Commit 98ee3a53 by Will Daly

Implement IP filtering in embargo middleware.

Add history table for course access rule changes.

Provide test utility for simulating restricted access.

Provide `redirect_if_blocked` method for integration with other
parts of the system (will be used for blocking enrollment).

Add info-level logging explaining when and why users are blocked.
parent 268280df
......@@ -10,12 +10,114 @@ import pygeoip
from django.core.cache import cache
from django.conf import settings
from embargo.models import CountryAccessRule, RestrictedCourse
log = logging.getLogger(__name__)
def get_user_country_from_profile(user):
def redirect_if_blocked(course_key, access_point='enrollment', **kwargs):
"""Redirect if the user does not have access to the course.
Arguments:
course_key (CourseKey): Location of the course the user is trying to access.
Keyword Arguments:
Same as `check_course_access` and `message_url_path`
"""
if settings.FEATURES.get('ENABLE_COUNTRY_ACCESS'):
is_blocked = not check_course_access(course_key, **kwargs)
if is_blocked:
return message_url_path(course_key, access_point)
def check_course_access(course_key, user=None, ip_address=None, url=None):
"""
Check is the user with this ip_address has access to the given course
Arguments:
course_key (CourseKey): Location of the course the user is trying to access.
Keyword Arguments:
user (User): The user making the request. Can be None, in which case
the user's profile country will not be checked.
ip_address (str): The IP address of the request.
url (str): The URL the user is trying to access. Used in
log messages.
Returns:
Boolean: True if the user has access to the course; False otherwise
"""
# First, check whether there are any restrictions on the course.
# If not, then we do not need to do any further checks
course_is_restricted = RestrictedCourse.is_restricted_course(course_key)
if not course_is_restricted:
return True
if ip_address is not None:
# Retrieve the country code from the IP address
# and check it against the allowed countries list for a course
user_country_from_ip = _country_code_from_ip(ip_address)
if not CountryAccessRule.check_country_access(course_key, user_country_from_ip):
log.info(
(
u"Blocking user %s from accessing course %s at %s "
u"because the user's IP address %s appears to be "
u"located in %s."
),
getattr(user, 'id', '<Not Authenticated>'),
course_key,
url,
ip_address,
user_country_from_ip
)
return False
if user is not None:
# Retrieve the country code from the user's profile
# and check it against the allowed countries list for a course.
user_country_from_profile = _get_user_country_from_profile(user)
if not CountryAccessRule.check_country_access(course_key, user_country_from_profile):
log.info(
(
u"Blocking user %s from accessing course %s at %s "
u"because the user's profile country is %s."
),
user.id, course_key, url, user_country_from_profile
)
return False
return True
def message_url_path(course_key, access_point):
"""Determine the URL path for the message explaining why the user was blocked.
This is configured per-course. See `RestrictedCourse` in the `embargo.models`
module for more details.
Arguments:
course_key (CourseKey): The location of the course.
access_point (str): How the user was trying to access the course.
Can be either "enrollment" or "courseware".
Returns:
unicode: The URL path to a page explaining why the user was blocked.
Raises:
InvalidAccessPoint: Raised if access_point is not a supported value.
"""
return RestrictedCourse.message_url_path(course_key, access_point)
def _get_user_country_from_profile(user):
"""
Check whether the user is embargoed based on the country code in the user's profile.
......@@ -55,40 +157,3 @@ def _country_code_from_ip(ip_addr):
return pygeoip.GeoIP(settings.GEOIPV6_PATH).country_code_by_addr(ip_addr)
else:
return pygeoip.GeoIP(settings.GEOIP_PATH).country_code_by_addr(ip_addr)
def check_course_access(user, ip_address, course_key):
"""
Check is the user with this ip_address has access to the given course
Params:
user (User): Currently logged in user object
ip_address (str): The ip_address of user
course_key (CourseLocator): CourseLocator object the user is trying to access
Returns:
The return will be True if the user has access on the course.
if any constraints fails it will return the False
"""
course_is_restricted = RestrictedCourse.is_restricted_course(course_key)
# If they're trying to access a course that cares about embargoes
# If course is not restricted then return immediately return True
# no need for further checking
if not course_is_restricted:
return True
# Retrieve the country code from the IP address
# and check it against the allowed countries list for a course
user_country_from_ip = _country_code_from_ip(ip_address)
# if user country has access to course return True
if not CountryAccessRule.check_country_access(course_key, user_country_from_ip):
return False
# Retrieve the country code from the user profile.
user_country_from_profile = get_user_country_from_profile(user)
# if profile country has access return True
if not CountryAccessRule.check_country_access(course_key, user_country_from_profile):
return False
return True
"""Exceptions for the embargo app."""
class InvalidAccessPoint(Exception):
"""The requested access point is not supported. """
def __init__(self, access_point, *args, **kwargs):
msg = (
u"Access point '{access_point}' should be either 'enrollment' or 'courseware'"
).format(access_point=access_point)
super(InvalidAccessPoint, self).__init__(msg, *args, **kwargs)
......@@ -32,11 +32,13 @@ EMBARGO_SITE_REDIRECT_URL = 'https://www.edx.org/'
"""
from functools import partial
import logging
import re
import pygeoip
from lazy import lazy
from django.core.exceptions import MiddlewareNotUsed
from django.core.cache import cache
from django.core.urlresolvers import reverse
from django.conf import settings
from django.shortcuts import redirect
from django.http import HttpResponseRedirect, HttpResponseForbidden
......@@ -45,7 +47,7 @@ from util.request import course_id_from_url
from student.models import unique_id_for_user
from embargo.models import EmbargoedCourse, EmbargoedState, IPFilter
from embargo.api import check_course_access
from embargo import api as embargo_api
log = logging.getLogger(__name__)
......@@ -58,6 +60,17 @@ class EmbargoMiddleware(object):
optionally ``IPFilter`` rows in the database, using the django admin site.
"""
ALLOW_URL_PATTERNS = [
# Don't block the embargo message pages; otherwise we'd
# end up in an infinite redirect loop.
re.compile(r'^/embargo/blocked-message/'),
# Don't block the Django admin pages. Otherwise, we might
# accidentally lock ourselves out of Django admin
# during testing.
re.compile(r'^/admin/'),
]
# Reasons a user might be blocked.
# These are used to generate info messages in the logs.
REASONS = {
......@@ -71,20 +84,81 @@ class EmbargoMiddleware(object):
def __init__(self):
self.site_enabled = settings.FEATURES.get('SITE_EMBARGOED', False)
self.enable_country_access = settings.FEATURES.get('ENABLE_COUNTRY_ACCESS', False)
# If embargoing is turned off, make this middleware do nothing
if not settings.FEATURES.get('EMBARGO', False) and not self.site_enabled:
disable_middleware = not (
settings.FEATURES.get('EMBARGO') or
self.site_enabled or
self.enable_country_access
)
if disable_middleware:
raise MiddlewareNotUsed()
self.enable_country_access = settings.FEATURES.get('ENABLE_COUNTRY_ACCESS', False)
def process_request(self, request):
"""Block requests based on embargo rules.
In the new ENABLE_COUNTRY_ACCESS implmentation,
this will perform the following checks:
1) If the user's IP address is blacklisted, block.
2) If the user's IP address is whitelisted, allow.
3) If the user's country (inferred from their IP address) is blocked for
a courseware page, block.
4) If the user's country (retrieved from the user's profile) is blocked
for a courseware page, block.
5) Allow access.
"""
Processes embargo requests.
"""
# If the feature flag is set, use the new "country access" implementation.
# This is a more flexible implementation of the embargo feature that allows
# per-course country access rules.
if self.enable_country_access:
if self.country_access_rules(request):
# Never block certain patterns by IP address
for pattern in self.ALLOW_URL_PATTERNS:
if pattern.match(request.path) is not None:
return None
ip_address = get_ip(request)
ip_filter = IPFilter.current()
if ip_filter.enabled and ip_address in ip_filter.blacklist_ips:
log.info(
(
u"User %s was blocked from accessing %s "
u"because IP address %s is blacklisted."
), request.user.id, request.path, ip_address
)
# If the IP is blacklisted, reject.
# This applies to any request, not just courseware URLs.
ip_blacklist_url = reverse(
'embargo_blocked_message',
kwargs={
'access_point': 'courseware',
'message_key': 'embargo'
}
)
return redirect(ip_blacklist_url)
elif ip_filter.enabled and ip_address in ip_filter.whitelist_ips:
log.info(
(
u"User %s was allowed access to %s because "
u"IP address %s is whitelisted."
),
request.user.id, request.path, ip_address
)
# If the IP is whitelisted, then allow access,
# skipping later checks.
return None
else:
return self._embargo_redirect_response
# Otherwise, perform the country access checks.
# This applies only to courseware URLs.
return self.country_access_rules(request.user, ip_address, request.path)
url = request.path
course_id = course_id_from_url(url)
......@@ -306,19 +380,30 @@ class EmbargoMiddleware(object):
return _inner
def country_access_rules(self, request):
def country_access_rules(self, user, ip_address, url_path):
"""
check the country access rules for a given course.
if course id is invalid return True
Check the country access rules for a given course.
Applies only to courseware URLs.
Args:
request
user (User): The user making the current request.
ip_address (str): The IP address from which the request originated.
url_path (str): The request path.
Return:
boolean: True if the user has access else false.
Returns:
HttpResponse or None
"""
url = request.path
course_id = course_id_from_url(url)
if course_id is None:
return True
return check_course_access(request.user, get_ip(request), course_id)
course_id = course_id_from_url(url_path)
if course_id:
redirect_url = embargo_api.redirect_if_blocked(
course_id,
user=user,
ip_address=ip_address,
url=url_path,
access_point='courseware'
)
if redirect_url:
return redirect(redirect_url)
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding model 'CourseAccessRuleHistory'
db.create_table('embargo_courseaccessrulehistory', (
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('timestamp', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, db_index=True, blank=True)),
('course_key', self.gf('xmodule_django.models.CourseKeyField')(max_length=255, db_index=True)),
('snapshot', self.gf('django.db.models.fields.TextField')(null=True, blank=True)),
))
db.send_create_signal('embargo', ['CourseAccessRuleHistory'])
def backwards(self, orm):
# Deleting model 'CourseAccessRuleHistory'
db.delete_table('embargo_courseaccessrulehistory')
models = {
'auth.group': {
'Meta': {'object_name': 'Group'},
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}),
'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'})
},
'auth.permission': {
'Meta': {'ordering': "('content_type__app_label', 'content_type__model', 'codename')", 'unique_together': "(('content_type', 'codename'),)", 'object_name': 'Permission'},
'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['contenttypes.ContentType']"}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
},
'auth.user': {
'Meta': {'object_name': 'User'},
'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}),
'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}),
'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'})
},
'contenttypes.contenttype': {
'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"},
'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '100'})
},
'embargo.country': {
'Meta': {'ordering': "['country']", 'object_name': 'Country'},
'country': ('django_countries.fields.CountryField', [], {'unique': 'True', 'max_length': '2', 'db_index': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
},
'embargo.countryaccessrule': {
'Meta': {'unique_together': "(('restricted_course', 'country'),)", 'object_name': 'CountryAccessRule'},
'country': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['embargo.Country']"}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'restricted_course': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['embargo.RestrictedCourse']"}),
'rule_type': ('django.db.models.fields.CharField', [], {'default': "'blacklist'", 'max_length': '255'})
},
'embargo.courseaccessrulehistory': {
'Meta': {'object_name': 'CourseAccessRuleHistory'},
'course_key': ('xmodule_django.models.CourseKeyField', [], {'max_length': '255', 'db_index': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'snapshot': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'timestamp': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'db_index': 'True', 'blank': 'True'})
},
'embargo.embargoedcourse': {
'Meta': {'object_name': 'EmbargoedCourse'},
'course_id': ('xmodule_django.models.CourseKeyField', [], {'unique': 'True', 'max_length': '255', 'db_index': 'True'}),
'embargoed': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
},
'embargo.embargoedstate': {
'Meta': {'object_name': 'EmbargoedState'},
'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}),
'embargoed_countries': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
},
'embargo.ipfilter': {
'Meta': {'object_name': 'IPFilter'},
'blacklist': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'change_date': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
'changed_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'on_delete': 'models.PROTECT'}),
'enabled': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'whitelist': ('django.db.models.fields.TextField', [], {'blank': 'True'})
},
'embargo.restrictedcourse': {
'Meta': {'object_name': 'RestrictedCourse'},
'access_msg_key': ('django.db.models.fields.CharField', [], {'default': "'default'", 'max_length': '255'}),
'course_key': ('xmodule_django.models.CourseKeyField', [], {'unique': 'True', 'max_length': '255', 'db_index': 'True'}),
'enroll_msg_key': ('django.db.models.fields.CharField', [], {'default': "'default'", 'max_length': '255'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'})
}
}
complete_apps = ['embargo']
\ No newline at end of file
......@@ -12,10 +12,14 @@ file and check it in at the same time as your model changes. To do that,
"""
import ipaddr
import json
import logging
from django.db import models
from django.utils.translation import ugettext as _, ugettext_lazy
from django.core.cache import cache
from django.core.urlresolvers import reverse
from django.db.models.signals import post_save, post_delete
from django_countries.fields import CountryField
from django_countries import countries
......@@ -23,11 +27,11 @@ from django_countries import countries
from config_models.models import ConfigurationModel
from xmodule_django.models import CourseKeyField, NoneToEmptyManager
from embargo.exceptions import InvalidAccessPoint
from embargo.messages import ENROLL_MESSAGES, COURSEWARE_MESSAGES
WHITE_LIST = 'whitelist'
BLACK_LIST = 'blacklist'
log = logging.getLogger(__name__)
class EmbargoedCourse(models.Model):
......@@ -100,6 +104,9 @@ class RestrictedCourse(models.Model):
These displayed on pages served by the embargo app.
"""
COURSE_LIST_CACHE_KEY = 'embargo.restricted_courses'
MESSAGE_URL_CACHE_KEY = 'embargo.message_url_path.{access_point}.{course_key}'
ENROLL_MSG_KEY_CHOICES = tuple([
(msg_key, msg.description)
for msg_key, msg in ENROLL_MESSAGES.iteritems()
......@@ -130,11 +137,6 @@ class RestrictedCourse(models.Model):
)
@classmethod
def cache_key_name(cls):
"""Return the name of the key to use to cache the current restricted course list"""
return 'embargo/RestrictedCourse/courses'
@classmethod
def is_restricted_course(cls, course_id):
"""
Check if the course is in restricted list
......@@ -153,25 +155,172 @@ class RestrictedCourse(models.Model):
"""
Cache all restricted courses and returns the list of course_keys that are restricted
"""
restricted_courses = cache.get(cls.cache_key_name())
restricted_courses = cache.get(cls.COURSE_LIST_CACHE_KEY)
if not restricted_courses:
restricted_courses = list(RestrictedCourse.objects.values_list('course_key', flat=True))
cache.set(cls.cache_key_name(), restricted_courses)
cache.set(cls.COURSE_LIST_CACHE_KEY, restricted_courses)
return restricted_courses
def snapshot(self):
"""Return a snapshot of all access rules for this course.
This is useful for recording an audit trail of rule changes.
The returned dictionary is JSON-serializable.
Returns:
dict
Example Usage:
>>> restricted_course.snapshot()
{
'enroll_msg': 'default',
'access_msg': 'default',
'country_rules': [
{'country': 'IR', 'rule_type': 'blacklist'},
{'country': 'CU', 'rule_type': 'blacklist'}
]
}
"""
country_rules_for_course = (
CountryAccessRule.objects
).select_related('restricted_country').filter(restricted_course=self)
return {
'enroll_msg': self.enroll_msg_key,
'access_msg': self.access_msg_key,
'country_rules': [
{
'country': unicode(rule.country.country),
'rule_type': rule.rule_type
}
for rule in country_rules_for_course
]
}
def message_key_for_access_point(self, access_point):
"""Determine which message to show the user.
The message can be configured per-course and depends
on how the user is trying to access the course
(trying to enroll or accessing courseware).
Arguments:
access_point (str): Either "courseware" or "enrollment"
Returns:
str: The message key. If the access point is not valid,
returns None instead.
"""
if access_point == 'enrollment':
return self.enroll_msg_key
elif access_point == 'courseware':
return self.access_msg_key
def __unicode__(self):
return unicode(self.course_key)
def save(self, *args, **kwargs):
@classmethod
def message_url_path(cls, course_key, access_point):
"""Determine the URL path for the message explaining why the user was blocked.
This is configured per-course. See `RestrictedCourse` in the `embargo.models`
module for more details.
Arguments:
course_key (CourseKey): The location of the course.
access_point (str): How the user was trying to access the course.
Can be either "enrollment" or "courseware".
Returns:
unicode: The URL path to a page explaining why the user was blocked.
Raises:
InvalidAccessPoint: Raised if access_point is not a supported value.
"""
Clear the cached value when saving a RestrictedCourse entry
if access_point not in ['enrollment', 'courseware']:
raise InvalidAccessPoint(access_point)
# First check the cache to see if we already have
# a URL for this (course_key, access_point) tuple
cache_key = cls.MESSAGE_URL_CACHE_KEY.format(
access_point=access_point,
course_key=course_key
)
url = cache.get(cache_key)
# If there's a cache miss, we'll need to retrieve the message
# configuration from the database
if url is None:
url = cls._get_message_url_path_from_db(course_key, access_point)
cache.set(cache_key, url)
return url
@classmethod
def _get_message_url_path_from_db(cls, course_key, access_point):
"""Retrieve the "blocked" message from the database.
Arguments:
course_key (CourseKey): The location of the course.
access_point (str): How the user was trying to access the course.
Can be either "enrollment" or "courseware".
Returns:
unicode: The URL path to a page explaining why the user was blocked.
"""
super(RestrictedCourse, self).save(*args, **kwargs)
cache.delete(self.cache_key_name())
# Fallback in case we're not able to find a message path
# Presumably if the caller is requesting a URL, the caller
# has already determined that the user should be blocked.
# We use generic messaging unless we find something more specific,
# but *always* return a valid URL path.
default_path = reverse(
'embargo_blocked_message',
kwargs={
'access_point': 'courseware',
'message_key': 'default'
}
)
def delete(self, using=None):
super(RestrictedCourse, self).delete()
cache.delete(self.cache_key_name())
# First check whether this is a restricted course.
# The list of restricted courses is cached, so this does
# not require a database query.
if not cls.is_restricted_course(course_key):
return default_path
# Retrieve the message key from the restricted course
# for this access point, then determine the URL.
try:
course = cls.objects.get(course_key=course_key)
msg_key = course.message_key_for_access_point(access_point)
return reverse(
'embargo_blocked_message',
kwargs={
'access_point': access_point,
'message_key': msg_key
}
)
except cls.DoesNotExist:
# This occurs only if there's a race condition
# between cache invalidation and database access.
return default_path
@classmethod
def invalidate_cache_for_course(cls, course_key):
"""Invalidate the caches for the restricted course. """
cache.delete(cls.COURSE_LIST_CACHE_KEY)
log.info("Invalidated cached list of restricted courses.")
for access_point in ['enrollment', 'courseware']:
msg_cache_key = cls.MESSAGE_URL_CACHE_KEY.format(
access_point=access_point,
course_key=course_key
)
cache.delete(msg_cache_key)
log.info("Invalidated cached messaging URLs ")
class Country(models.Model):
......@@ -216,15 +365,18 @@ class CountryAccessRule(models.Model):
"""
WHITELIST_RULE = 'whitelist'
BLACKLIST_RULE = 'blacklist'
RULE_TYPE_CHOICES = (
(WHITE_LIST, 'Whitelist (allow only these countries)'),
(BLACK_LIST, 'Blacklist (block these countries)'),
(WHITELIST_RULE, 'Whitelist (allow only these countries)'),
(BLACKLIST_RULE, 'Blacklist (block these countries)'),
)
rule_type = models.CharField(
max_length=255,
choices=RULE_TYPE_CHOICES,
default=BLACK_LIST,
default=BLACKLIST_RULE,
help_text=ugettext_lazy(
u"Whether to include or exclude the given course. "
u"If whitelist countries are specified, then ONLY users from whitelisted countries "
......@@ -243,15 +395,7 @@ class CountryAccessRule(models.Model):
help_text=ugettext_lazy(u"The country to which this rule applies.")
)
@classmethod
def cache_key_for_consolidated_countries(cls, course_id):
"""
Args:
course_id (str): course_id to look for
Returns:
Consolidated list of accessible countries for given course
"""
return "{}/allowed/countries".format(course_id)
CACHE_KEY = u"embargo.allowed_countries.{course_key}"
@classmethod
def check_country_access(cls, course_id, country):
......@@ -267,10 +411,11 @@ class CountryAccessRule(models.Model):
True if country found in allowed country
otherwise check given country exists in list
"""
allowed_countries = cache.get(cls.cache_key_for_consolidated_countries(course_id))
cache_key = cls.CACHE_KEY.format(course_key=course_id)
allowed_countries = cache.get(cache_key)
if not allowed_countries:
allowed_countries = cls._get_country_access_list(course_id)
cache.set(cls.cache_key_for_consolidated_countries(course_id), allowed_countries)
cache.set(cache_key, allowed_countries)
return country == '' or country in allowed_countries
......@@ -298,9 +443,9 @@ class CountryAccessRule(models.Model):
# Filter the rules into a whitelist and blacklist in one pass
for rule in rules_for_course:
if rule.rule_type == 'whitelist':
if rule.rule_type == cls.WHITELIST_RULE:
whitelist_countries.add(rule.country.country.code)
elif rule.rule_type == 'blacklist':
elif rule.rule_type == cls.BLACKLIST_RULE:
blacklist_countries.add(rule.country.country.code)
# If there are no whitelist countries, default to all countries
......@@ -312,30 +457,23 @@ class CountryAccessRule(models.Model):
return list(whitelist_countries - blacklist_countries)
def __unicode__(self):
if self.rule_type == WHITE_LIST:
if self.rule_type == self.WHITELIST_RULE:
return _(u"Whitelist {country} for {course}").format(
course=unicode(self.restricted_course.course_key),
country=unicode(self.country),
)
elif self.rule_type == BLACK_LIST:
elif self.rule_type == self.BLACKLIST_RULE:
return _(u"Blacklist {country} for {course}").format(
course=unicode(self.restricted_course.course_key),
country=unicode(self.country),
)
def save(self, *args, **kwargs):
"""
Clear the cached value when saving a entry
"""
super(CountryAccessRule, self).save(*args, **kwargs)
cache.delete(self.cache_key_for_consolidated_countries(unicode(self.restricted_course.course_key)))
def delete(self, using=None):
"""
clear the cached value when saving a entry
"""
super(CountryAccessRule, self).delete()
cache.delete(self.cache_key_for_consolidated_countries(unicode(self.restricted_course.course_key)))
@classmethod
def invalidate_cache_for_course(cls, course_key):
"""Invalidate the cache. """
cache_key = cls.CACHE_KEY.format(course_key=course_key)
cache.delete(cache_key)
log.info("Invalidated country access list for course %s", course_key)
class Meta:
"""a course can be added with either black or white list. """
......@@ -347,6 +485,132 @@ class CountryAccessRule(models.Model):
)
def invalidate_country_rule_cache(sender, instance, **kwargs): # pylint: disable=unused-argument
"""Invalidate cached rule information on changes to the rule models.
We need to handle this in a Django receiver, because Django admin
doesn't always call the model's `delete()` method directly during
a bulk delete operation.
Arguments:
sender: Not used, but required by Django receivers.
instance (RestrictedCourse or CountryAccessRule): The instance
being saved or deleted.
"""
if isinstance(instance, RestrictedCourse):
# If a restricted course changed, we need to update the list
# of which courses are restricted as well as any rules
# associated with the course.
RestrictedCourse.invalidate_cache_for_course(instance.course_key)
CountryAccessRule.invalidate_cache_for_course(instance.course_key)
if isinstance(instance, CountryAccessRule):
try:
restricted_course = instance.restricted_course
except RestrictedCourse.DoesNotExist:
# If the restricted course and its rules are being deleted,
# the restricted course may not exist at this point.
# However, the cache should have been invalidated
# when the restricted course was deleted.
pass
else:
# Invalidate the cache of countries for the course.
CountryAccessRule.invalidate_cache_for_course(restricted_course.course_key)
# Hook up the cache invalidation receivers to the appropriate
# post_save and post_delete signals.
post_save.connect(invalidate_country_rule_cache, sender=CountryAccessRule)
post_save.connect(invalidate_country_rule_cache, sender=RestrictedCourse)
post_delete.connect(invalidate_country_rule_cache, sender=CountryAccessRule)
post_delete.connect(invalidate_country_rule_cache, sender=RestrictedCourse)
class CourseAccessRuleHistory(models.Model):
"""History of course access rule changes. """
timestamp = models.DateTimeField(db_index=True, auto_now_add=True)
course_key = CourseKeyField(max_length=255, db_index=True)
snapshot = models.TextField(null=True, blank=True)
DELETED_PLACEHOLDER = "DELETED"
@classmethod
def save_snapshot(cls, restricted_course, deleted=False):
"""Save a snapshot of access rules for a course.
Arguments:
restricted_course (RestrictedCourse)
Keyword Arguments:
deleted (boolean): If True, the restricted course
is about to be deleted. Create a placeholder
snapshot recording that the course and all its
rules was deleted.
Returns:
None
"""
course_key = restricted_course.course_key
# At the point this is called, the access rules may not have
# been deleted yet. When the rules *are* deleted, the
# restricted course entry may no longer exist, so we
# won't be able to take a snapshot of the rules.
# To handle this, we save a placeholder "DELETED" entry
# so that it's clear in the audit that the restricted
# course (along with all its rules) was deleted.
snapshot = (
CourseAccessRuleHistory.DELETED_PLACEHOLDER if deleted
else json.dumps(restricted_course.snapshot())
)
cls.objects.create(
course_key=course_key,
snapshot=snapshot
)
@staticmethod
def snapshot_post_save_receiver(sender, instance, **kwargs): # pylint: disable=unused-argument
"""Create a snapshot of course access rules when the rules are updated. """
if isinstance(instance, RestrictedCourse):
CourseAccessRuleHistory.save_snapshot(instance)
elif isinstance(instance, CountryAccessRule):
CourseAccessRuleHistory.save_snapshot(instance.restricted_course)
@staticmethod
def snapshot_post_delete_receiver(sender, instance, **kwargs): # pylint: disable=unused-argument
"""Create a snapshot of course access rules when rules are deleted. """
if isinstance(instance, RestrictedCourse):
CourseAccessRuleHistory.save_snapshot(instance, deleted=True)
elif isinstance(instance, CountryAccessRule):
try:
restricted_course = instance.restricted_course
except RestrictedCourse.DoesNotExist:
# When Django admin deletes a restricted course, it will
# also delete the rules associated with that course.
# At this point, we can't access the restricted course
# from the rule beause it may already have been deleted.
# If this happens, we don't need to record anything,
# since we already record a placeholder "DELETED"
# entry when the restricted course record is deleted.
pass
else:
CourseAccessRuleHistory.save_snapshot(restricted_course)
class Meta: # pylint: disable=missing-docstring,old-style-class
get_latest_by = 'timestamp'
# Connect the signals to the receivers so we record a history
# of changes to the course access rules.
post_save.connect(CourseAccessRuleHistory.snapshot_post_save_receiver, sender=RestrictedCourse)
post_save.connect(CourseAccessRuleHistory.snapshot_post_save_receiver, sender=CountryAccessRule)
post_delete.connect(CourseAccessRuleHistory.snapshot_post_delete_receiver, sender=RestrictedCourse)
post_delete.connect(CourseAccessRuleHistory.snapshot_post_delete_receiver, sender=CountryAccessRule)
class IPFilter(ConfigurationModel):
"""
Register specific IP addresses to explicitly block or unblock.
......
"""Utilities for writing unit tests that involve course embargos. """
import contextlib
import mock
import pygeoip
from django.core.urlresolvers import reverse
from django.core.cache import cache
from embargo.models import Country, CountryAccessRule, RestrictedCourse
@contextlib.contextmanager
def restrict_course(course_key, access_point="enrollment"):
"""Simulate that a course is restricted.
This does two things:
1) Configures country access rules so that the course is restricted.
2) Mocks the GeoIP call so the user appears to be coming
from a country that's blocked from the course.
This is useful for tests that need to verify
that restricted users won't be able to access
particular views.
Arguments:
course_key (CourseKey): The location of the course to block.
Keyword Arguments:
access_point (str): Either "courseware" or "enrollment"
Yields:
str: A URL to the page in the embargo app that explains
why the user was blocked.
Example Usage:
>>> with restrict_course(course_key) as redirect_url:
>>> # The client will appear to be coming from
>>> # an IP address that is blocked.
>>> resp = self.client.get(url)
>>> self.assertRedirects(resp, redirect_url)
"""
# Clear the cache to ensure that previous tests don't interfere
# with this test.
cache.clear()
with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip:
# Remove all existing rules for the course
CountryAccessRule.objects.all().delete()
# Create the country object
# Ordinarily, we'd create models for every country,
# but that would slow down the test suite.
country, __ = Country.objects.get_or_create(country='IR')
# Create a model for the restricted course
restricted_course, __ = RestrictedCourse.objects.get_or_create(course_key=course_key)
restricted_course.enroll_msg_key = 'default'
restricted_course.access_msg_key = 'default'
restricted_course.save()
# Ensure that there is a blacklist rule for the country
CountryAccessRule.objects.get_or_create(
restricted_course=restricted_course,
country=country,
rule_type='blacklist'
)
# Simulate that the user is coming from the blacklisted country
mock_ip.return_value = 'IR'
# Yield the redirect url so the tests don't need to know
# the embargo messaging URL structure.
redirect_url = reverse(
'embargo_blocked_message',
kwargs={
'access_point': access_point,
'message_key': 'default'
}
)
yield redirect_url
"""
Tests for EmbargoMiddleware
"""
import mock
import unittest
import pygeoip
import ddt
from django.conf import settings
from django.test.utils import override_settings
from django.core.cache import cache
from django.db import connection, transaction
from student.tests.factories import UserFactory
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.django_utils import (
ModuleStoreTestCase, mixed_store_config
)
from embargo.models import (
RestrictedCourse, Country, CountryAccessRule,
)
from util.testing import UrlResetMixin
from embargo import api as embargo_api
from embargo.exceptions import InvalidAccessPoint
from mock import patch
# Since we don't need any XML course fixtures, use a modulestore configuration
# that disables the XML modulestore.
MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {}, include_xml=False)
@ddt.ddt
@override_settings(MODULESTORE=MODULESTORE_CONFIG)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class EmbargoCheckAccessApiTests(ModuleStoreTestCase):
"""Test the embargo API calls to determine whether a user has access. """
def setUp(self):
super(EmbargoCheckAccessApiTests, self).setUp()
self.course = CourseFactory.create()
self.user = UserFactory.create()
self.restricted_course = RestrictedCourse.objects.create(course_key=self.course.id)
Country.objects.create(country='US')
Country.objects.create(country='IR')
Country.objects.create(country='CU')
# Clear the cache to prevent interference between tests
cache.clear()
@ddt.data(
# IP country, profile_country, blacklist, whitelist, allow_access
('US', None, [], [], True),
('IR', None, ['IR', 'CU'], [], False),
('US', 'IR', ['IR', 'CU'], [], False),
('IR', 'IR', ['IR', 'CU'], [], False),
('US', None, [], ['US'], True),
('IR', None, [], ['US'], False),
('US', 'IR', [], ['US'], False),
)
@ddt.unpack
def test_country_access_rules(self, ip_country, profile_country, blacklist, whitelist, allow_access):
# Configure the access rules
for whitelist_country in whitelist:
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.WHITELIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country=whitelist_country)
)
for blacklist_country in blacklist:
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.BLACKLIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country=blacklist_country)
)
# Configure the user's profile country
if profile_country is not None:
self.user.profile.country = profile_country
self.user.profile.save()
# Appear to make a request from an IP in a particular country
with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip:
mock_ip.return_value = ip_country
# Call the API. Note that the IP address we pass in doesn't
# matter, since we're injecting a mock for geo-location
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
# Verify that the access rules were applied correctly
self.assertEqual(result, allow_access)
def test_no_user_has_access(self):
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.BLACKLIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country='US')
)
# The user is set to None, because the user has not been authenticated.
result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0')
self.assertTrue(result)
def test_no_user_blocked(self):
CountryAccessRule.objects.create(
rule_type=CountryAccessRule.BLACKLIST_RULE,
restricted_course=self.restricted_course,
country=Country.objects.get(country='US')
)
with mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr') as mock_ip:
mock_ip.return_value = 'US'
# The user is set to None, because the user has not been authenticated.
result = embargo_api.check_course_access(self.course.id, ip_address='0.0.0.0')
self.assertFalse(result)
def test_course_not_restricted(self):
# No restricted course model for this course key,
# so all access checks should be skipped.
unrestricted_course = CourseFactory.create()
with self.assertNumQueries(1):
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0')
# The second check should require no database queries
with self.assertNumQueries(0):
embargo_api.check_course_access(unrestricted_course.id, user=self.user, ip_address='0.0.0.0')
def test_ip_v6(self):
# Test the scenario that will go through every check
# (restricted course, but pass all the checks)
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='FE80::0202:B3FF:FE1E:8329')
self.assertTrue(result)
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
def test_profile_country_db_null(self):
# Django country fields treat NULL values inconsistently.
# When saving a profile with country set to None, Django saves an empty string to the database.
# However, when the country field loads a NULL value from the database, it sets
# `country.code` to `None`. This caused a bug in which country values created by
# the original South schema migration -- which defaulted to NULL -- caused a runtime
# exception when the embargo middleware treated the value as a string.
# In order to simulate this behavior, we can't simply set `profile.country = None`.
# (because when we save it, it will set the database field to an empty string instead of NULL)
query = "UPDATE auth_userprofile SET country = NULL WHERE id = %s"
connection.cursor().execute(query, [str(self.user.profile.id)])
transaction.commit_unless_managed()
# Verify that we can check the user's access without error
result = embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
self.assertTrue(result)
def test_caching(self):
# Test the scenario that will go through every check
# (restricted course, but pass all the checks)
# This is the worst case, so it will hit all of the
# caching code.
with self.assertNumQueries(3):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
with self.assertNumQueries(0):
embargo_api.check_course_access(self.course.id, user=self.user, ip_address='0.0.0.0')
@ddt.ddt
@override_settings(MODULESTORE=MODULESTORE_CONFIG)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class EmbargoMessageUrlApiTests(UrlResetMixin, ModuleStoreTestCase):
"""Test the embargo API calls for retrieving the blocking message URLs. """
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
def setUp(self):
super(EmbargoMessageUrlApiTests, self).setUp('embargo')
self.course = CourseFactory.create()
def tearDown(self):
cache.clear()
@ddt.data(
('enrollment', '/embargo/blocked-message/enrollment/embargo/'),
('courseware', '/embargo/blocked-message/courseware/embargo/')
)
@ddt.unpack
def test_message_url_path(self, access_point, expected_url_path):
self._restrict_course(self.course.id)
# Retrieve the URL to the blocked message page
url_path = embargo_api.message_url_path(self.course.id, access_point)
self.assertEqual(url_path, expected_url_path)
def test_message_url_path_caching(self):
self._restrict_course(self.course.id)
# The first time we retrieve the message, we'll need
# to hit the database.
with self.assertNumQueries(2):
embargo_api.message_url_path(self.course.id, "enrollment")
# The second time, we should be using cached values
with self.assertNumQueries(0):
embargo_api.message_url_path(self.course.id, "enrollment")
@ddt.data('enrollment', 'courseware')
def test_message_url_path_no_restrictions_for_course(self, access_point):
# No restrictions for the course
url_path = embargo_api.message_url_path(self.course.id, access_point)
# Use a default path
self.assertEqual(url_path, '/embargo/blocked-message/courseware/default/')
def test_invalid_access_point(self):
with self.assertRaises(InvalidAccessPoint):
embargo_api.message_url_path(self.course.id, "invalid")
def test_message_url_stale_cache(self):
# Retrieve the URL once, populating the cache with the list
# of restricted courses.
self._restrict_course(self.course.id)
embargo_api.message_url_path(self.course.id, 'courseware')
# Delete the restricted course entry
RestrictedCourse.objects.get(course_key=self.course.id).delete()
# Clear the message URL cache
message_cache_key = (
'embargo.message_url_path.courseware.{course_key}'
).format(course_key=self.course.id)
cache.delete(message_cache_key)
# Try again. Even though the cache results are stale,
# we should still get a valid URL.
url_path = embargo_api.message_url_path(self.course.id, 'courseware')
self.assertEqual(url_path, '/embargo/blocked-message/courseware/default/')
def _restrict_course(self, course_key):
"""Restrict the user from accessing the course. """
country = Country.objects.create(country='us')
restricted_course = RestrictedCourse.objects.create(
course_key=course_key,
enroll_msg_key='embargo',
access_msg_key='embargo'
)
CountryAccessRule.objects.create(
restricted_course=restricted_course,
rule_type=CountryAccessRule.BLACKLIST_RULE,
country=country
)
......@@ -2,27 +2,24 @@
Tests for EmbargoMiddleware with CountryAccessRules
"""
import mock
import pygeoip
import unittest
from mock import patch
import ddt
from django.db import connection, transaction
from django.core.urlresolvers import reverse
from django.conf import settings
import ddt
from django.core.cache import cache as django_cache
from util.testing import UrlResetMixin
from student.tests.factories import UserFactory
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.django_utils import (
ModuleStoreTestCase, mixed_store_config
)
from config_models.models import cache as config_cache
# Explicitly import the cache from ConfigurationModel so we can reset it after each test
from config_models.models import cache
from embargo.models import (
RestrictedCourse, Country, CountryAccessRule, WHITE_LIST, BLACK_LIST
)
from django_countries import countries
from embargo.models import RestrictedCourse, IPFilter
from embargo.test_utils import restrict_course
# Since we don't need any XML course fixtures, use a modulestore configuration
......@@ -32,256 +29,147 @@ MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {}, incl
@ddt.ddt
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class EmbargoCountryAccessRulesTests(ModuleStoreTestCase):
"""
Tests of EmbargoApi
"""
def setUp(self):
super(EmbargoCountryAccessRulesTests, self).setUp()
self.user = UserFactory(username='fred', password='secret')
self.client.login(username='fred', password='secret')
self.embargo_course1 = CourseFactory.create()
self.embargo_course1.save()
self.embargo_course2 = CourseFactory.create()
self.embargo_course2.save()
self.regular_course = CourseFactory.create(org="Regular")
self.regular_course.save()
self.embargoed_course_whitelisted = '/courses/' + self.embargo_course1.id.to_deprecated_string() + '/info'
self.embargoed_course_blacklisted = '/courses/' + self.embargo_course2.id.to_deprecated_string() + '/info'
self.regular_page = '/courses/' + self.regular_course.id.to_deprecated_string() + '/info'
restricted_course_1 = RestrictedCourse.objects.create(course_key=self.embargo_course1.id)
restricted_course_2 = RestrictedCourse.objects.create(course_key=self.embargo_course2.id)
all_countries = [Country(country=code[0]) for code in list(countries)]
Country.objects.bulk_create(all_countries)
country_access_white_rules = [
CountryAccessRule(
restricted_course=restricted_course_1,
rule_type=WHITE_LIST,
country=Country.objects.get(country='US')
),
CountryAccessRule(
restricted_course=restricted_course_1,
rule_type=WHITE_LIST,
country=Country.objects.get(country='NZ')
)
]
CountryAccessRule.objects.bulk_create(country_access_white_rules)
country_access_black_rules = [
CountryAccessRule(
restricted_course=restricted_course_2,
rule_type=BLACK_LIST,
country=Country.objects.get(country='CU')
),
CountryAccessRule(
restricted_course=restricted_course_2,
rule_type=BLACK_LIST,
country=Country.objects.get(country='IR')
)
]
CountryAccessRule.objects.bulk_create(country_access_black_rules)
# Text from lms/templates/static_templates/embargo.html
self.embargo_text = "Unfortunately, at this time edX must comply with export controls, and we cannot allow you to access this course." # pylint: disable=line-too-long
self.patcher = mock.patch.object(pygeoip.GeoIP, 'country_code_by_addr', self.mock_country_code_by_addr)
self.patcher.start()
class EmbargoMiddlewareAccessTests(UrlResetMixin, ModuleStoreTestCase):
"""Tests of embargo middleware country access rules.
def tearDown(self):
# Explicitly clear ConfigurationModel's cache so tests have a clear cache
# and don't interfere with each other
cache.clear()
self.patcher.stop()
There are detailed unit tests for the rule logic in
`test_api.py`; here, we're mainly testing the integration
with middleware
def mock_country_code_by_addr(self, ip_addr):
"""
making a lists of countries which will be use in country access rules.
if incoming request's ip belongs to this dict then related country will return.
for one course CU and IR added as blacklist in course access rules.
for one course US and NZ added as whitelist in course access rules.
"""
ip_dict = {
'1.0.0.0': 'CU',
'2.0.0.0': 'IR',
'3.0.0.0': 'US',
'4.0.0.0': 'NZ'
}
return ip_dict.get(ip_addr, 'FR')
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
@ddt.data('1.0.0.0', '2.0.0.0')
def test_course_access_rules_with_black_rule_country_by_user_ip(self, ip_address):
# Accessing an embargoed page from a user ip whose origin is added as
# blacklist in course access rules should be redirected.
# any other IP should be success
"""
USERNAME = 'fred'
PASSWORD = 'secret'
# Following the redirect should give us the embargo page
response = self.client.get(
self.embargoed_course_blacklisted,
HTTP_X_FORWARDED_FOR=ip_address,
REMOTE_ADDR=ip_address
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
def setUp(self):
super(EmbargoMiddlewareAccessTests, self).setUp('embargo')
self.user = UserFactory(username=self.USERNAME, password=self.PASSWORD)
self.course = CourseFactory.create()
self.client.login(username=self.USERNAME, password=self.PASSWORD)
self.courseware_url = reverse(
'course_root',
kwargs={'course_id': unicode(self.course.id)}
)
self.assertEqual(response.status_code, 302)
self.non_courseware_url = reverse('dashboard')
# Clear the cache to avoid interference between tests
django_cache.clear()
config_cache.clear()
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
def test_blocked(self):
with restrict_course(self.course.id, access_point='courseware') as redirect_url:
response = self.client.get(self.courseware_url)
self.assertRedirects(response, redirect_url)
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
def test_allowed(self):
# Add the course to the list of restricted courses
# but don't create any access rules
RestrictedCourse.objects.create(course_key=self.course.id)
# Expect that we can access courseware
response = self.client.get(self.courseware_url)
self.assertEqual(response.status_code, 200)
# Following the redirect should give us the embargo page
response = self.client.get(
self.embargoed_course_blacklisted,
HTTP_X_FORWARDED_FOR=ip_address,
REMOTE_ADDR=ip_address,
follow=True
)
self.assertIn(self.embargo_text, response.content)
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
def test_non_courseware_url(self):
with restrict_course(self.course.id):
response = self.client.get(self.non_courseware_url)
self.assertEqual(response.status_code, 200)
# accesssing blacklist course from any other country ip should be success
response = self.client.get(
self.embargoed_course_blacklisted,
HTTP_X_FORWARDED_FOR='5.0.0.1',
REMOTE_ADDR='5.0.0.1'
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
@ddt.data(
# request_ip, blacklist, whitelist, is_enabled, allow_access
('173.194.123.35', ['173.194.123.35'], [], True, False),
('173.194.123.35', ['173.194.0.0/16'], [], True, False),
('173.194.123.35', ['127.0.0.0/32', '173.194.0.0/16'], [], True, False),
('173.195.10.20', ['173.194.0.0/16'], [], True, True),
('173.194.123.35', ['173.194.0.0/16'], ['173.194.0.0/16'], True, False),
('173.194.123.35', [], ['173.194.0.0/16'], True, True),
('192.178.2.3', [], ['173.194.0.0/16'], True, True),
('173.194.123.35', ['173.194.123.35'], [], False, True),
)
@ddt.unpack
def test_ip_access_rules(self, request_ip, blacklist, whitelist, is_enabled, allow_access):
# Ensure that IP blocking works for anonymous users
self.client.logout()
# Set up the IP rules
IPFilter.objects.create(
blacklist=", ".join(blacklist),
whitelist=", ".join(whitelist),
enabled=is_enabled
)
self.assertEqual(response.status_code, 200)
# accesssing whitelist course from these should give us the embargo page
# Check that access is enforced
response = self.client.get(
self.embargoed_course_whitelisted,
HTTP_X_FORWARDED_FOR=ip_address,
REMOTE_ADDR=ip_address
"/",
HTTP_X_FORWARDED_FOR=request_ip,
REMOTE_ADDR=request_ip
)
self.assertEqual(response.status_code, 302)
# Accessing a regular page from these IP should be success
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR=ip_address, REMOTE_ADDR=ip_address)
self.assertEqual(response.status_code, 200)
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
@ddt.data('3.0.0.0', '4.0.0.0', "7.0.0.1", "2001:250::")
def test_course_access_rules_with_white_rule_country_by_user_ip(self, ip_address):
# Accessing an embargoed page from a user ip whose origin is added as
# white in course access rules should succeed. any other ip should be fail
response = self.client.get(
self.embargoed_course_whitelisted,
HTTP_X_FORWARDED_FOR=ip_address,
REMOTE_ADDR=ip_address
)
if ip_address in ['3.0.0.0', '4.0.0.0']:
if allow_access:
self.assertEqual(response.status_code, 200)
else:
self.assertEqual(response.status_code, 302)
redirect_url = reverse(
'embargo_blocked_message',
kwargs={
'access_point': 'courseware',
'message_key': 'embargo'
}
)
self.assertRedirects(response, redirect_url)
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
@ddt.data(
('courseware', 'default'),
('courseware', 'embargo'),
('enrollment', 'default'),
('enrollment', 'embargo')
)
@ddt.unpack
def test_always_allow_access_to_embargo_messages(self, access_point, msg_key):
# Blacklist an IP address
IPFilter.objects.create(
blacklist="192.168.10.20",
enabled=True
)
# access the blacklisted course should give success
url = reverse(
'embargo_blocked_message',
kwargs={
'access_point': access_point,
'message_key': msg_key
}
)
response = self.client.get(
self.embargoed_course_blacklisted,
HTTP_X_FORWARDED_FOR=ip_address,
REMOTE_ADDR=ip_address
url,
HTTP_X_FORWARDED_FOR="192.168.10.20",
REMOTE_ADDR="192.168.10.20"
)
self.assertEqual(response.status_code, 200)
# Accessing a regular page should success
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR=ip_address, REMOTE_ADDR=ip_address)
self.assertEqual(response.status_code, 200)
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
def test_embargo_profile_country_db_null(self):
# Django country fields treat NULL values inconsistently.
# When saving a profile with country set to None, Django saves an empty string to the database.
# However, when the country field loads a NULL value from the database, it sets
# `country.code` to `None`. This caused a bug in which country values created by
# the original South schema migration -- which defaulted to NULL -- caused a runtime
# exception when the embargo middleware treated the value as a string.
# In order to simulate this behavior, we can't simply set `profile.country = None`.
# (because when we save it, it will set the database field to an empty string instead of NULL)
query = "UPDATE auth_userprofile SET country = NULL WHERE id = %s"
connection.cursor().execute(query, [str(self.user.profile.id)])
transaction.commit_unless_managed()
# Attempt to access an embargoed course
# Verify that the student can access the page without an error
response = self.client.get(self.embargoed_course_blacklisted)
self.assertEqual(response.status_code, 200)
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
@ddt.data("", "US", "CA", "AF", "NZ", "IR")
def test_regular_course_accessible_from_every_where(self, profile_country):
# regular course is accessible even when ENABLE_COUNTRY_ACCESS flag is true
profile = self.user.profile
profile.country = profile_country
profile.save()
response = self.client.get(self.regular_page)
# Course is accessible from all countries.
self.assertEqual(response.status_code, 200)
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
@ddt.data("", "US", "CA", "AF", "NZ", "IR")
def test_embargo_course_whitelisted_with_profile_country(self, profile_country):
# course is emabargoed and has white list countries.
# but user ip belongs to US but profile country is blacklist
# only white list country can access the course.
profile = self.user.profile
profile.country = profile_country
profile.save()
# adding the US IP so the _country_code_from_ip() get passed
response = self.client.get(
self.embargoed_course_whitelisted,
HTTP_X_FORWARDED_FOR='3.0.0.0',
REMOTE_ADDR='3.0.0.0'
@patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
def test_whitelist_ip_skips_country_access_checks(self):
# Whitelist an IP address
IPFilter.objects.create(
whitelist="192.168.10.20",
enabled=True
)
# Course is whitelisted against US,NZ so all other countries will be disallowed
if profile_country in ["CA", "AF", "IR"]:
self.assertRedirects(response, reverse('embargo'))
self.assertEqual(response.status_code, 302)
else:
self.assertEqual(response.status_code, 200)
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
@ddt.data("", "US", "CA", "NZ", "IR", "CU")
def test_embargo_course_blacklisted_with_profile_country(self, profile_country):
# if course is emabargoed and has black list countries ( CU , IR ).
# then users from these countries can't access this course.
# any user from other than these countries can access.
profile = self.user.profile
profile.country = profile_country
profile.save()
response = self.client.get(self.embargoed_course_blacklisted)
if profile_country in ["", "US", "CA", "NZ"]:
self.assertEqual(response.status_code, 200)
else:
embargo_url = reverse('embargo')
self.assertRedirects(response, embargo_url)
self.assertEqual(response.status_code, 302)
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
@ddt.data("", "US", "CA", "NZ", "IR", "CU")
def test_embargo_course_without_any_rules_list(self, profile_country):
# if course is emabargoed but without whitelist and blacklist
# then course can be accessible from any where
profile = self.user.profile
profile.country = profile_country
profile.save()
embargo_course3 = CourseFactory.create()
embargo_course3.save()
RestrictedCourse(course_key=embargo_course3.id).save()
embargoed_course_page = '/courses/' + embargo_course3.id.to_deprecated_string() + '/info'
# Set up country access rules so the user would
# be restricted from the course.
with restrict_course(self.course.id):
# Make a request from the whitelisted IP address
response = self.client.get(
self.courseware_url,
HTTP_X_FORWARDED_FOR="192.168.10.20",
REMOTE_ADDR="192.168.10.20"
)
response = self.client.get(embargoed_course_page)
# Expect that we were still able to access the page,
# even though we would have been blocked by country
# access rules.
self.assertEqual(response.status_code, 200)
@mock.patch.dict(settings.FEATURES, {'ENABLE_COUNTRY_ACCESS': True})
def test_embargo_profile_country_cache(self):
# Warm the cache
with self.assertNumQueries(24):
self.client.get(self.embargoed_course_blacklisted)
# Access the page multiple times, but expect that we hit
# the database to check the user's profile only once
with self.assertNumQueries(9):
self.client.get(self.embargoed_course_blacklisted)
"""Test of models for embargo middleware app"""
"""Test of models for embargo app"""
import json
from django.test import TestCase
from django.db.utils import IntegrityError
from opaque_keys.edx.locations import SlashSeparatedCourseKey
from opaque_keys.edx.locator import CourseLocator
from embargo.models import (
EmbargoedCourse, EmbargoedState, IPFilter, RestrictedCourse,
Country, CountryAccessRule, WHITE_LIST, BLACK_LIST
Country, CountryAccessRule, CourseAccessRuleHistory
)
class EmbargoModelsTest(TestCase):
"""Test each of the 3 models in embargo.models"""
def test_course_embargo(self):
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
course_id = CourseLocator('abc', '123', 'doremi')
# Test that course is not authorized by default
self.assertFalse(EmbargoedCourse.is_embargoed(course_id))
......@@ -22,8 +23,8 @@ class EmbargoModelsTest(TestCase):
# Now, course should be embargoed
self.assertTrue(EmbargoedCourse.is_embargoed(course_id))
self.assertEquals(
cauth.__unicode__(),
"Course 'abc/123/doremi' is Embargoed"
unicode(cauth),
u"Course '{course_id}' is Embargoed".format(course_id=course_id)
)
# Unauthorize by explicitly setting email_enabled to False
......@@ -32,8 +33,8 @@ class EmbargoModelsTest(TestCase):
# Test that course is now unauthorized
self.assertFalse(EmbargoedCourse.is_embargoed(course_id))
self.assertEquals(
cauth.__unicode__(),
"Course 'abc/123/doremi' is Not Embargoed"
unicode(cauth),
u"Course '{course_id}' is Not Embargoed".format(course_id=course_id)
)
def test_state_embargo(self):
......@@ -101,18 +102,18 @@ class EmbargoModelsTest(TestCase):
class RestrictedCourseTest(TestCase):
"""Test unicode values tests and cache functionality"""
"""Test RestrictedCourse model. """
def test_unicode_values(self):
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
course_id = CourseLocator('abc', '123', 'doremi')
restricted_course = RestrictedCourse.objects.create(course_key=course_id)
self.assertEquals(
restricted_course.__unicode__(),
"abc/123/doremi"
unicode(restricted_course),
unicode(course_id)
)
def test_restricted_course_cache_with_save_delete(self):
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
course_id = CourseLocator('abc', '123', 'doremi')
RestrictedCourse.objects.create(course_key=course_id)
# Warm the cache
......@@ -124,7 +125,7 @@ class RestrictedCourseTest(TestCase):
RestrictedCourse.is_restricted_course(course_id)
# add new the course so the cache must get delete and again hit the db
new_course_id = SlashSeparatedCourseKey('def', '123', 'doremi')
new_course_id = CourseLocator('def', '123', 'doremi')
RestrictedCourse.objects.create(course_key=new_course_id)
with self.assertNumQueries(1):
RestrictedCourse.is_restricted_course(new_course_id)
......@@ -146,45 +147,42 @@ class RestrictedCourseTest(TestCase):
class CountryTest(TestCase):
"""Test unicode values test"""
"""Test Country model. """
def test_unicode_values(self):
country = Country.objects.create(country='NZ')
self.assertEquals(
country.__unicode__(),
"New Zealand (NZ)"
)
self.assertEquals(unicode(country), "New Zealand (NZ)")
class CountryAccessRuleTest(TestCase):
"""Test unicode values tests and unique-together contraint"""
"""Test CountryAccessRule model. """
def test_unicode_values(self):
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
course_id = CourseLocator('abc', '123', 'doremi')
country = Country.objects.create(country='NZ')
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
access_rule = CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=WHITE_LIST,
rule_type=CountryAccessRule.WHITELIST_RULE,
country=country
)
self.assertEquals(
access_rule.__unicode__(),
"Whitelist New Zealand (NZ) for abc/123/doremi"
unicode(access_rule),
u"Whitelist New Zealand (NZ) for {course_key}".format(course_key=course_id)
)
course_id = SlashSeparatedCourseKey('def', '123', 'doremi')
course_id = CourseLocator('def', '123', 'doremi')
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
access_rule = CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=BLACK_LIST,
rule_type=CountryAccessRule.BLACKLIST_RULE,
country=country
)
self.assertEquals(
access_rule.__unicode__(),
"Blacklist New Zealand (NZ) for def/123/doremi"
unicode(access_rule),
u"Blacklist New Zealand (NZ) for {course_key}".format(course_key=course_id)
)
def test_unique_together_constraint(self):
......@@ -192,31 +190,31 @@ class CountryAccessRuleTest(TestCase):
Course with specific country can be added either as whitelist or blacklist
trying to add with both types will raise error
"""
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
course_id = CourseLocator('abc', '123', 'doremi')
country = Country.objects.create(country='NZ')
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=WHITE_LIST,
rule_type=CountryAccessRule.WHITELIST_RULE,
country=country
)
with self.assertRaises(IntegrityError):
CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=BLACK_LIST,
rule_type=CountryAccessRule.BLACKLIST_RULE,
country=country
)
def test_country_access_list_cache_with_save_delete(self):
course_id = SlashSeparatedCourseKey('abc', '123', 'doremi')
course_id = CourseLocator('abc', '123', 'doremi')
country = Country.objects.create(country='NZ')
restricted_course1 = RestrictedCourse.objects.create(course_key=course_id)
course = CountryAccessRule.objects.create(
restricted_course=restricted_course1,
rule_type=WHITE_LIST,
rule_type=CountryAccessRule.WHITELIST_RULE,
country=country
)
......@@ -227,8 +225,123 @@ class CountryAccessRuleTest(TestCase):
with self.assertNumQueries(0):
CountryAccessRule.check_country_access(course_id, 'NZ')
# deleting an object will delete cache also.and hit db on
# get the country access lists for course
# Deleting an object will invalidate the cache
course.delete()
with self.assertNumQueries(1):
CountryAccessRule.check_country_access(course_id, 'NZ')
class CourseAccessRuleHistoryTest(TestCase):
"""Test course access rule history. """
def setUp(self):
self.course_key = CourseLocator('edx', 'DemoX', 'Demo_Course')
self.restricted_course = RestrictedCourse.objects.create(course_key=self.course_key)
self.countries = {
'US': Country.objects.create(country='US'),
'AU': Country.objects.create(country='AU')
}
def test_course_access_history_no_rules(self):
self._assert_history([])
self.restricted_course.delete()
self._assert_history_deleted()
def test_course_access_history_with_rules(self):
# Add one rule
us_rule = CountryAccessRule.objects.create(
restricted_course=self.restricted_course,
country=self.countries['US'],
rule_type=CountryAccessRule.WHITELIST_RULE
)
self._assert_history([('US', 'whitelist')])
# Add another rule
au_rule = CountryAccessRule.objects.create(
restricted_course=self.restricted_course,
country=self.countries['AU'],
rule_type=CountryAccessRule.BLACKLIST_RULE
)
self._assert_history([
('US', 'whitelist'),
('AU', 'blacklist')
])
# Delete the first rule
us_rule.delete()
self._assert_history([('AU', 'blacklist')])
# Delete the second rule
au_rule.delete()
self._assert_history([])
def test_course_access_history_delete_all(self):
# Create a rule
CountryAccessRule.objects.create(
restricted_course=self.restricted_course,
country=self.countries['US'],
rule_type=CountryAccessRule.WHITELIST_RULE
)
# Delete the course (and, implicitly, all the rules)
self.restricted_course.delete()
self._assert_history_deleted()
def test_course_access_history_change_message(self):
# Change the message key
self.restricted_course.enroll_msg_key = 'embargo'
self.restricted_course.access_msg_key = 'embargo'
self.restricted_course.save()
# Expect a history entry with the changed keys
self._assert_history([], enroll_msg='embargo', access_msg='embargo')
def _assert_history(self, country_rules, enroll_msg='default', access_msg='default'):
"""Check the latest history entry.
Arguments:
country_rules (list): List of rules, each of which are tuples
of the form `(country_code, rule_type)`.
Keyword Arguments:
enroll_msg (str): The expected enrollment message key.
access_msg (str): The expected access message key.
Raises:
AssertionError
"""
record = CourseAccessRuleHistory.objects.latest()
# Check that the record is for the correct course
self.assertEqual(record.course_key, self.course_key)
# Load the history entry and verify the message keys
snapshot = json.loads(record.snapshot)
self.assertEqual(snapshot['enroll_msg'], enroll_msg)
self.assertEqual(snapshot['access_msg'], access_msg)
# For each rule, check that there is an entry
# in the history record.
for (country, rule_type) in country_rules:
self.assertIn(
{
'country': country,
'rule_type': rule_type
},
snapshot['country_rules']
)
# Check that there are no duplicate entries
self.assertEqual(len(snapshot['country_rules']), len(country_rules))
def _assert_history_deleted(self):
"""Check the latest history entry for a 'DELETED' placeholder.
Raises:
AssertionError
"""
record = CourseAccessRuleHistory.objects.latest()
self.assertEqual(record.course_key, self.course_key)
self.assertEqual(record.snapshot, "DELETED")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment