Commit a03baee2 by Will Daly

Merge pull request #4976 from edx/will/ecom-128

Embargo based on country code in the user's profile.
parents 12e2d442 31a65661
......@@ -12,7 +12,7 @@ This middleware allows you to:
* Embargoing site (access restriction of the main site)
Embargo can restrict by states and whitelist/blacklist (IP Addresses
(ie. 10.0.0.0) or Networks (ie. 10.0.0.0/24)).
(ie. 10.0.0.0), Networks (ie. 10.0.0.0/24)), or the user profile country.
Usage:
......@@ -30,16 +30,20 @@ EMBARGO_SITE_REDIRECT_URL = 'https://www.edx.org/'
# if EMBARGO_SITE_REDIRECT_URL is missing, a HttpResponseForbidden is returned.
"""
from functools import partial
import logging
import pygeoip
from lazy import lazy
from django.core.exceptions import MiddlewareNotUsed
from django.core.cache import cache
from django.conf import settings
from django.shortcuts import redirect
from django.http import HttpResponseRedirect, HttpResponseForbidden
from ipware.ip import get_ip
from util.request import course_id_from_url
from student.models import unique_id_for_user
from embargo.models import EmbargoedCourse, EmbargoedState, IPFilter
log = logging.getLogger(__name__)
......@@ -52,16 +56,27 @@ class EmbargoMiddleware(object):
This is configured by creating ``EmbargoedCourse``, ``EmbargoedState``, and
optionally ``IPFilter`` rows in the database, using the django admin site.
"""
# Reasons a user might be blocked.
# These are used to generate info messages in the logs.
REASONS = {
"ip_blacklist": u"Restricting IP address {ip_addr} {from_course} because IP is blacklisted.",
"ip_country": u"Restricting IP address {ip_addr} {from_course} because IP is from country {ip_country}.",
"profile_country": (
u"Restricting user {user_id} {from_course} because "
u"the user set the profile country to {profile_country}."
)
}
def __init__(self):
self.site_enabled = settings.FEATURES.get('SITE_EMBARGOED', False)
# If embargoing is turned off, make this middleware do nothing
if not settings.FEATURES.get('EMBARGO', False) and \
not self.site_enabled:
if not settings.FEATURES.get('EMBARGO', False) and not self.site_enabled:
raise MiddlewareNotUsed()
def process_request(self, request):
"""
Processes embargo requests
Processes embargo requests.
"""
url = request.path
course_id = course_id_from_url(url)
......@@ -69,41 +84,216 @@ class EmbargoMiddleware(object):
# If they're trying to access a course that cares about embargoes
if self.site_enabled or course_is_embargoed:
response = redirect('embargo')
# Set the proper response if site is enabled
if self.site_enabled:
redirect_url = getattr(settings, 'EMBARGO_SITE_REDIRECT_URL', None)
response = HttpResponseRedirect(redirect_url) if redirect_url \
else HttpResponseForbidden('Access Denied')
# If we're having performance issues, add caching here
ip_addr = get_ip(request)
# if blacklisted, immediately fail
if ip_addr in IPFilter.current().blacklist_ips:
if course_is_embargoed:
msg = "Embargo: Restricting IP address %s to course %s because IP is blacklisted." % \
(ip_addr, course_id)
else:
msg = "Embargo: Restricting IP address %s because IP is blacklisted." % ip_addr
log.info(msg)
return response
# ipv6 support
if ip_addr.find(':') >= 0:
country_code_from_ip = pygeoip.GeoIP(settings.GEOIPV6_PATH).country_code_by_addr(ip_addr)
# Construct the list of functions that check whether the user is embargoed.
# We wrap each of these functions in a decorator that logs the reason the user
# was blocked.
# Each function should return `True` iff the user is blocked by an embargo.
check_functions = [
self._log_embargo_reason(check_func, course_id, course_is_embargoed)
for check_func in [
partial(self._is_embargoed_by_ip, get_ip(request)),
partial(self._is_embargoed_by_profile_country, request.user)
]
]
# Perform each of the checks
# If the user fails any of the checks, immediately redirect them
# and skip later checks.
for check_func in check_functions:
if check_func():
return self._embargo_redirect_response
# If all the check functions pass, implicitly return None
# so that the middleware processor can continue processing
# the response.
def _is_embargoed_by_ip(self, ip_addr, course_id=u"", course_is_embargoed=False):
"""
Check whether the user is embargoed based on the IP address.
Args:
ip_addr (str): The IP address the request originated from.
Keyword Args:
course_id (unicode): The course the user is trying to access.
course_is_embargoed (boolean): Whether the course the user is accessing has been embargoed.
Returns:
A unicode message if the user is embargoed, otherwise `None`
"""
# If blacklisted, immediately fail
if ip_addr in IPFilter.current().blacklist_ips:
return self.REASONS['ip_blacklist'].format(
ip_addr=ip_addr,
from_course=self._from_course_msg(course_id, course_is_embargoed)
)
# If we're white-listed, then allow access
if ip_addr in IPFilter.current().whitelist_ips:
return None
# Retrieve the country code from the IP address
# and check it against the list of embargoed countries
ip_country = self._country_code_from_ip(ip_addr)
if ip_country in self._embargoed_countries:
return self.REASONS['ip_country'].format(
ip_addr=ip_addr,
ip_country=ip_country,
from_course=self._from_course_msg(course_id, course_is_embargoed)
)
# If none of the other checks caught anything,
# implicitly return None to indicate that the user can access the course
def _is_embargoed_by_profile_country(self, user, course_id="", course_is_embargoed=False):
"""
Check whether the user is embargoed based on the country code in the user's profile.
Args:
user (User): The user attempting to access courseware.
Keyword Args:
course_id (unicode): The course the user is trying to access.
course_is_embargoed (boolean): Whether the course the user is accessing has been embargoed.
Returns:
A unicode message if the user is embargoed, otherwise `None`
"""
cache_key = u'user.{user_id}.profile.country'.format(user_id=user.id)
profile_country = cache.get(cache_key)
if profile_country is None:
profile = getattr(user, 'profile', None)
if profile is not None:
profile_country = profile.country.code.upper()
else:
country_code_from_ip = pygeoip.GeoIP(settings.GEOIP_PATH).country_code_by_addr(ip_addr)
is_embargoed = country_code_from_ip in EmbargoedState.current().embargoed_countries_list
# Fail if country is embargoed and the ip address isn't explicitly
# whitelisted
if is_embargoed and ip_addr not in IPFilter.current().whitelist_ips:
if course_is_embargoed:
msg = "Embargo: Restricting IP address %s to course %s because IP is from country %s." % \
(ip_addr, course_id, country_code_from_ip)
else:
msg = "Embargo: Restricting IP address %s because IP is from country %s." % \
(ip_addr, country_code_from_ip)
profile_country = ""
cache.set(cache_key, profile_country)
if profile_country in self._embargoed_countries:
return self.REASONS['profile_country'].format(
user_id=unique_id_for_user(user),
profile_country=profile_country,
from_course=self._from_course_msg(course_id, course_is_embargoed)
)
else:
return None
def _country_code_from_ip(self, ip_addr):
"""
Return the country code associated with an IP address.
Handles both IPv4 and IPv6 addresses.
Args:
ip_addr (str): The IP address to look up.
Returns:
str: A 2-letter country code.
"""
if ip_addr.find(':') >= 0:
return pygeoip.GeoIP(settings.GEOIPV6_PATH).country_code_by_addr(ip_addr)
else:
return pygeoip.GeoIP(settings.GEOIP_PATH).country_code_by_addr(ip_addr)
@property
def _embargo_redirect_response(self):
"""
The HTTP response to send when the user is blocked from a course.
This will either be a redirect to a URL configured in Django settings
or a forbidden response.
Returns:
HTTPResponse
"""
response = redirect('embargo')
# Set the proper response if site is enabled
if self.site_enabled:
redirect_url = getattr(settings, 'EMBARGO_SITE_REDIRECT_URL', None)
response = (
HttpResponseRedirect(redirect_url)
if redirect_url
else HttpResponseForbidden('Access Denied')
)
return response
@lazy
def _embargoed_countries(self):
"""
Return the list of 2-letter country codes for embargoed countries.
The result is cached within the scope of the response.
Returns:
list
"""
return EmbargoedState.current().embargoed_countries_list
def _from_course_msg(self, course_id, course_is_embargoed):
"""
Format a message indicating whether the user was blocked from a specific course.
This can be used in info messages, but should not be used in user-facing messages.
Args:
course_id (unicode): The ID of the course being accessed.
course_is_embarged (boolean): Whether the course being accessed is embargoed.
Returns:
unicode
"""
return (
u"from course {course_id}".format(course_id=course_id)
if course_is_embargoed
else u""
)
def _log_embargo_reason(self, check_func, course_id, course_is_embargoed):
"""
Decorator for embargo check functions that will:
* execute the check function
* check whether the user is blocked by an embargo, and if so, log the reason
* return a boolean indicating whether the user was blocked.
Args:
check_func (partial): A function that should return unicode reason if the user
was blocked, otherwise should return None. This function will be passed
`course_id` and `course_is_embarged` kwargs so it can format a detailed
reason message.
course_id (unicode): The ID of the course the user is trying to access.
course_is_embargoed (boolean): Whether the course the user is trying
to access is under an embargo.
Returns:
boolean: True iff the user was blocked by an embargo
"""
def _inner():
# Perform the check and retrieve the reason string.
# The reason will be `None` if the user passes the check and can access the course.
# We pass in the course ID and whether the course is embargoed
# so that the check function can fill in the "reason" message with more specific details.
reason = check_func(
course_id=course_id,
course_is_embargoed=course_is_embargoed
)
# If the reason was `None`, indicate that the user was not blocked.
if reason is None:
return False
# Otherwise, log the reason the user was blocked
# and return True.
else:
msg = u"Embargo: {reason}".format(reason=reason)
log.info(msg)
return response
return True
return _inner
......@@ -6,26 +6,35 @@ import mock
import pygeoip
import unittest
from django.core.urlresolvers import reverse
from django.conf import settings
from django.test import TestCase, Client
from django.test.utils import override_settings
from courseware.tests.tests import TEST_DATA_MONGO_MODULESTORE
import ddt
from student.models import CourseEnrollment
from student.tests.factories import UserFactory
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.django_utils import (
ModuleStoreTestCase, mixed_store_config
)
# Explicitly import the cache from ConfigurationModel so we can reset it after each test
from config_models.models import cache
from embargo.models import EmbargoedCourse, EmbargoedState, IPFilter
@override_settings(MODULESTORE=TEST_DATA_MONGO_MODULESTORE)
class EmbargoMiddlewareTests(TestCase):
# Since we don't need any XML course fixtures, use a modulestore configuration
# that disables the XML modulestore.
MODULESTORE_CONFIG = mixed_store_config(settings.COMMON_TEST_DATA_ROOT, {}, include_xml=False)
@ddt.ddt
@override_settings(MODULESTORE=MODULESTORE_CONFIG)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class EmbargoMiddlewareTests(ModuleStoreTestCase):
"""
Tests of EmbargoMiddleware
"""
def setUp(self):
self.client = Client()
self.user = UserFactory(username='fred', password='secret')
self.client.login(username='fred', password='secret')
self.embargo_course = CourseFactory.create()
......@@ -69,7 +78,6 @@ class EmbargoMiddlewareTests(TestCase):
}
return ip_dict.get(ip_addr, 'US')
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
def test_countries(self):
# Accessing an embargoed page from a blocked IP should cause a redirect
response = self.client.get(self.embargoed_page, HTTP_X_FORWARDED_FOR='1.0.0.0', REMOTE_ADDR='1.0.0.0')
......@@ -95,7 +103,6 @@ class EmbargoMiddlewareTests(TestCase):
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='5.0.0.0', REMOTE_ADDR='5.0.0.0')
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
def test_countries_ipv6(self):
# Accessing an embargoed page from a blocked IP should cause a redirect
response = self.client.get(self.embargoed_page, HTTP_X_FORWARDED_FOR='2001:1340::', REMOTE_ADDR='2001:1340::')
......@@ -121,7 +128,6 @@ class EmbargoMiddlewareTests(TestCase):
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='2001:250::', REMOTE_ADDR='2001:250::')
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
def test_ip_exceptions(self):
# Explicitly whitelist/blacklist some IPs
IPFilter(
......@@ -157,7 +163,6 @@ class EmbargoMiddlewareTests(TestCase):
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='5.0.0.0', REMOTE_ADDR='5.0.0.0')
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
def test_ip_network_exceptions(self):
# Explicitly whitelist/blacklist some IP networks
IPFilter(
......@@ -213,7 +218,54 @@ class EmbargoMiddlewareTests(TestCase):
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='5.0.0.0', REMOTE_ADDR='5.0.0.0')
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
@ddt.data(
("", False),
("us", False),
("CU", True),
("Ir", True),
("sy", True),
("sd", True)
)
@ddt.unpack
def test_embargo_profile_country(self, profile_country, is_embargoed):
# Set the country in the user's profile
profile = self.user.profile
profile.country = profile_country
profile.save()
# Attempt to access an embargoed course
response = self.client.get(self.embargoed_page)
# If the user is from an embargoed country, verify that
# they are redirected to the embargo page.
if is_embargoed:
embargo_url = reverse('embargo')
self.assertRedirects(response, embargo_url)
# Otherwise, verify that the student can access the page
else:
self.assertEqual(response.status_code, 200)
# For non-embargoed courses, the student should be able to access
# the page, even if he/she is from an embargoed country.
response = self.client.get(self.regular_page)
self.assertEqual(response.status_code, 200)
def test_embargo_profile_country_cache(self):
# Set the country in the user's profile
profile = self.user.profile
profile.country = "us"
profile.save()
# Warm the cache
with self.assertNumQueries(18):
self.client.get(self.embargoed_page)
# Access the page multiple times, but expect that we hit
# the database to check the user's profile only once
with self.assertNumQueries(12):
self.client.get(self.embargoed_page)
@mock.patch.dict(settings.FEATURES, {'EMBARGO': False})
def test_countries_embargo_off(self):
# When the middleware is turned off, all requests should go through
......@@ -242,7 +294,6 @@ class EmbargoMiddlewareTests(TestCase):
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='5.0.0.0', REMOTE_ADDR='5.0.0.0')
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
@mock.patch.dict(settings.FEATURES, {'EMBARGO': False, 'SITE_EMBARGOED': True})
def test_embargo_off_embargo_site_on(self):
# When the middleware is turned on with SITE, main site access should be restricted
......@@ -254,7 +305,6 @@ class EmbargoMiddlewareTests(TestCase):
response = self.client.get(self.regular_page, HTTP_X_FORWARDED_FOR='5.0.0.0', REMOTE_ADDR='5.0.0.0')
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
@mock.patch.dict(settings.FEATURES, {'EMBARGO': False, 'SITE_EMBARGOED': True})
@override_settings(EMBARGO_SITE_REDIRECT_URL='https://www.edx.org/')
def test_embargo_off_embargo_site_on_with_redirect_url(self):
......
......@@ -724,7 +724,7 @@ class CourseEnrollment(models.Model):
else:
unenroll_done.send(sender=None, course_enrollment=self)
self.emit_event(EVENT_NAME_ENROLLMENT_DEACTIVATED)
dog_stats_api.increment(
......
......@@ -4,5 +4,5 @@
<%block name="pagetitle">${_("This Course Unavailable In Your Country")}</%block>
<section class="outside-app">
<p>${_("Our system indicates that you are trying to access an edX course from an IP address associated with a country currently subjected to U.S. economic and trade sanctions. Unfortunately, at this time edX must comply with export controls, and we cannot allow you to access this particular course. Feel free to browse our catalogue to find other courses you may be interested in taking.")}</p>
<p>${_("Our system indicates that you are trying to access an edX course from a country currently subject to U.S. economic and trade sanctions. Unfortunately, at this time edX must comply with export controls, and we cannot allow you to access this particular course. Feel free to browse our catalogue to find other courses you may be interested in taking.")}</p>
</section>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment