Commit 47b51e93 by Greg Price

Add endpoints to set user notification preference

Two endpoints are intended to be used by AJAX calls from the web UI with an
authenticated user. When enabled, the value of the preference is an encryption
of the username, to be used as a token for one-click unsubscribe links. The
third endpoint is the target of unsubscribe links, which displays a page with
an appropriate message to the user.
parent b4008f2e
......@@ -11,6 +11,9 @@ preferences. Access to the REST API is restricted by use of the
X-Edx-Api-Key HTTP header (which must match settings.EDX_API_KEY; if
the setting is not present, the API is disabled).
LMS: Added endpoints for AJAX requests to enable/disable notifications
(which are not yet implemented) and a one-click unsubscribe page.
Common: Added *experimental* support for jsinput type.
Common: Added setting to specify Celery Broker vhost
......
NOTIFICATION_PREF_KEY = "notification_pref"
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import PermissionDenied
from django.http import Http404
from django.test import TestCase
from django.test.client import Client, RequestFactory
from django.test.utils import override_settings
from mock import Mock, patch
from notification_prefs import NOTIFICATION_PREF_KEY
from notification_prefs.views import ajax_enable, ajax_disable, unsubscribe
from student.tests.factories import UserFactory
from user_api.models import UserPreference
@override_settings(SECRET_KEY="test secret key")
class NotificationPrefViewTest(TestCase):
INITIALIZATION_VECTOR = "\x00" * 16
@classmethod
def setUpClass(cls):
# Make sure global state is set up appropriately
Client().get("/")
def setUp(self):
self.user = UserFactory.create(username="testuser")
# Tokens are intentionally hard-coded instead of computed to help us
# avoid breaking existing links.
self.tokens = {
self.user: "AAAAAAAAAAAAAAAAAAAAAA8mMQo96FZfb1YKv1R5X6s=",
# Username with length equal to AES block length to test padding
UserFactory.create(username="sixteencharsuser"):
"AAAAAAAAAAAAAAAAAAAAAPxPWCuI2Ay9TATBVnfw7eIj-hUh6erQ_-VkbDqHqm8D",
# Even longer username
UserFactory.create(username="thisusernameissoveryverylong"):
"AAAAAAAAAAAAAAAAAAAAAPECbYqPI7_W4mRF8LbTaHuHt3tNXPggZ1Bke-zDyEiZ",
# Non-ASCII username
UserFactory.create(username=u"\u4e2d\u56fd"):
"AAAAAAAAAAAAAAAAAAAAAMjfGAhZKIZsI3L-Z7nflTA="
}
self.request_factory = RequestFactory()
def create_prefs(self):
"""Create all test preferences in the database"""
for (user, token) in self.tokens.items():
UserPreference.objects.create(user=user, key=NOTIFICATION_PREF_KEY, value=token)
def assertPrefValid(self, user):
"""Ensure that the correct preference for the user is persisted"""
self.assertEqual(
UserPreference.objects.get(user=user, key=NOTIFICATION_PREF_KEY).value,
self.tokens[user]
)
def assertNotPrefExists(self, user):
"""Ensure that the user does not have a persisted preference"""
self.assertFalse(
UserPreference.objects.filter(user=user, key=NOTIFICATION_PREF_KEY).exists()
)
# AJAX enable view
def test_ajax_enable_get(self):
request = self.request_factory.get("dummy")
request.user = self.user
response = ajax_enable(request)
self.assertEqual(response.status_code, 405)
self.assertNotPrefExists(self.user)
def test_ajax_enable_anon_user(self):
request = self.request_factory.post("dummy")
request.user = AnonymousUser()
self.assertRaises(PermissionDenied, ajax_enable, request)
self.assertNotPrefExists(self.user)
@patch("Crypto.Random.new")
def test_ajax_enable_success(self, mock_random_new):
mock_stream = Mock()
mock_stream.read.return_value = self.INITIALIZATION_VECTOR
mock_random_new.return_value = mock_stream
def test_user(user):
request = self.request_factory.post("dummy")
request.user = user
response = ajax_enable(request)
self.assertEqual(response.status_code, 204)
self.assertPrefValid(user)
for user in self.tokens.keys():
test_user(user)
def test_ajax_enable_already_enabled(self):
self.create_prefs()
request = self.request_factory.post("dummy")
request.user = self.user
response = ajax_enable(request)
self.assertEqual(response.status_code, 204)
self.assertPrefValid(self.user)
def test_ajax_enable_distinct_values(self):
request = self.request_factory.post("dummy")
request.user = self.user
ajax_enable(request)
other_user = UserFactory.create()
request.user = other_user
ajax_enable(request)
self.assertNotEqual(
UserPreference.objects.get(user=self.user, key=NOTIFICATION_PREF_KEY).value,
UserPreference.objects.get(user=other_user, key=NOTIFICATION_PREF_KEY).value
)
# AJAX disable view
def test_ajax_disable_get(self):
self.create_prefs()
request = self.request_factory.get("dummy")
request.user = self.user
response = ajax_disable(request)
self.assertEqual(response.status_code, 405)
self.assertPrefValid(self.user)
def test_ajax_disable_anon_user(self):
self.create_prefs()
request = self.request_factory.post("dummy")
request.user = AnonymousUser()
self.assertRaises(PermissionDenied, ajax_disable, request)
self.assertPrefValid(self.user)
def test_ajax_disable_success(self):
self.create_prefs()
request = self.request_factory.post("dummy")
request.user = self.user
response = ajax_disable(request)
self.assertEqual(response.status_code, 204)
self.assertNotPrefExists(self.user)
def test_ajax_disable_already_disabled(self):
request = self.request_factory.post("dummy")
request.user = self.user
response = ajax_disable(request)
self.assertEqual(response.status_code, 204)
self.assertNotPrefExists(self.user)
# Unsubscribe view
def test_unsubscribe_post(self):
request = self.request_factory.post("dummy")
response = unsubscribe(request, "dummy")
self.assertEqual(response.status_code, 405)
def test_unsubscribe_invalid_token(self):
def test_invalid_token(token, message):
request = self.request_factory.get("dummy")
self.assertRaisesRegexp(Http404, "^{}$".format(message), unsubscribe, request, token)
# Invalid base64 encoding
test_invalid_token("ZOMG INVALID BASE64 CHARS!!!", "base64url")
test_invalid_token("Non-ASCII\xff", "base64url")
test_invalid_token(self.tokens[self.user][:-1], "base64url")
# Token not long enough to contain initialization vector
test_invalid_token("AAAAAAAAAAA=", "initialization_vector")
# Token length not a multiple of AES block length
test_invalid_token(self.tokens[self.user][:-4], "aes")
# Invalid padding (ends in 0 byte)
# Encrypted value: "testuser" + "\x00" * 8
test_invalid_token("AAAAAAAAAAAAAAAAAAAAAMoazRI7ePLjEWXN1N7keLw=", "padding")
# Invalid padding (ends in byte > 16)
# Encrypted value: "testusertestuser"
test_invalid_token("AAAAAAAAAAAAAAAAAAAAAC6iLXGhjkFytJoJSBJZzJ4=", "padding")
# Invalid padding (entire string is padding)
# Encrypted value: "\x10" * 16
test_invalid_token("AAAAAAAAAAAAAAAAAAAAANRGw8HDEmlcLVFawgY9wI8=", "padding")
# Nonexistent user
# Encrypted value: "nonexistentuser\x01"
test_invalid_token("AAAAAAAAAAAAAAAAAAAAACpyUxTGIrUjnpuUsNi7mAY=", "username")
def test_unsubscribe_success(self):
self.create_prefs()
def test_user(user):
request = self.request_factory.get("dummy")
request.user = AnonymousUser()
response = unsubscribe(request, self.tokens[user])
self.assertEqual(response.status_code, 200)
self.assertNotPrefExists(user)
for user in self.tokens.keys():
test_user(user)
def test_unsubscribe_twice(self):
self.create_prefs()
request = self.request_factory.get("dummy")
request.user = AnonymousUser()
unsubscribe(request, self.tokens[self.user])
response = unsubscribe(request, self.tokens[self.user])
self.assertEqual(response.status_code, 200)
self.assertNotPrefExists(self.user)
from base64 import urlsafe_b64encode, urlsafe_b64decode
from hashlib import sha256
from Crypto.Cipher import AES
from Crypto import Random
from django.conf import settings
from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied
from django.http import Http404, HttpResponse
from django.views.decorators.http import require_GET, require_POST
from mitxmako.shortcuts import render_to_response
from notification_prefs import NOTIFICATION_PREF_KEY
from user_api.models import UserPreference
class UsernameDecryptionException(Exception):
pass
class UsernameCipher(object):
"""
A transformation of a username to/from an opaque token
The purpose of the token is to make one-click unsubscribe links that don't
require the user to log in. To prevent users from unsubscribing other users,
we must ensure the token cannot be computed by anyone who has this
source code. The token must also be embeddable in a URL.
Thus, we take the following steps to encode (and do the inverse to decode):
1. Pad the UTF-8 encoding of the username with PKCS#7 padding to match the
AES block length
2. Generate a random AES block length initialization vector
3. Use AES-256 (with a hash of settings.SECRET_KEY as the encryption key)
in CBC mode to encrypt the username
4. Prepend the IV to the encrypted value to allow for initialization of the
decryption cipher
5. base64url encode the result
"""
@staticmethod
def _get_aes_cipher(initialization_vector):
hash_ = sha256()
hash_.update(settings.SECRET_KEY)
return AES.new(hash_.digest(), AES.MODE_CBC, initialization_vector)
@staticmethod
def _add_padding(input_str):
"""Return `input_str` with PKCS#7 padding added to match AES block length"""
padding_len = AES.block_size - len(input_str) % AES.block_size
return input_str + padding_len * chr(padding_len)
@staticmethod
def _remove_padding(input_str):
"""Return `input_str` with PKCS#7 padding trimmed to match AES block length"""
num_pad_bytes = ord(input_str[-1])
if num_pad_bytes < 1 or num_pad_bytes > AES.block_size or num_pad_bytes >= len(input_str):
raise UsernameDecryptionException("padding")
return input_str[:-num_pad_bytes]
@staticmethod
def encrypt(username):
initialization_vector = Random.new().read(AES.block_size)
aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
return urlsafe_b64encode(
initialization_vector +
aes_cipher.encrypt(UsernameCipher._add_padding(username.encode("utf-8")))
)
@staticmethod
def decrypt(token):
try:
base64_decoded = urlsafe_b64decode(token)
except TypeError:
raise UsernameDecryptionException("base64url")
if len(base64_decoded) < AES.block_size:
raise UsernameDecryptionException("initialization_vector")
initialization_vector = base64_decoded[:AES.block_size]
aes_encrypted = base64_decoded[AES.block_size:]
aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
try:
decrypted = aes_cipher.decrypt(aes_encrypted)
except ValueError:
raise UsernameDecryptionException("aes")
return UsernameCipher._remove_padding(decrypted)
@require_POST
def ajax_enable(request):
"""
A view that enables notifications for the authenticated user
This view should be invoked by an AJAX POST call. It returns status 204
(no content) or an error. If notifications were already enabled for this
user, this has no effect. Otherwise, a preference is created with the
unsubscribe token (an ecnryption of the username) as the value.unsernam
"""
if not request.user.is_authenticated():
raise PermissionDenied
UserPreference.objects.get_or_create(
user=request.user,
key=NOTIFICATION_PREF_KEY,
defaults={
"value": UsernameCipher.encrypt(request.user.username)
}
)
return HttpResponse(status=204)
@require_POST
def ajax_disable(request):
"""
A view that disables notifications for the authenticated user
This view should be invoked by an AJAX POST call. It returns status 204
(no content) or an error.
"""
if not request.user.is_authenticated():
raise PermissionDenied
UserPreference.objects.filter(
user=request.user,
key=NOTIFICATION_PREF_KEY
).delete()
return HttpResponse(status=204)
@require_GET
def unsubscribe(request, token):
"""
A view that disables notifications for a user who may not be authenticated
This view is meant to be the target of an unsubscribe link. The request
must be a GET, and the `token` parameter must decrypt to a valid username.
A 405 will be returned if the request method is not GET. A 404 will be
returned if the token parameter does not decrypt to a valid username. On
success, the response will contain a page indicating success.
"""
try:
username = UsernameCipher().decrypt(token.encode())
user = User.objects.get(username=username)
except UnicodeDecodeError:
raise Http404("base64url")
except UsernameDecryptionException as exn:
raise Http404(exn.message)
except User.DoesNotExist:
raise Http404("username")
UserPreference.objects.filter(user=user, key=NOTIFICATION_PREF_KEY).delete()
return render_to_response("unsubscribe.html", {})
......@@ -753,6 +753,9 @@ INSTALLED_APPS = (
# User API
'rest_framework',
'user_api',
# Notification preferences setting
'notification_prefs',
)
######################### MARKETING SITE ###############################
......
......@@ -30,6 +30,7 @@
@import 'shared/course_filter';
@import 'shared/modal';
@import 'shared/activation_messages';
@import 'shared/unsubscribe';
@import 'multicourse/home';
@import 'multicourse/dashboard';
......
.container.unsubscribe {
padding: 60px 0px 120px;
h1 {
margin-bottom: 20px;
padding: 10px;
@extend .success-message-colors;
}
h1 + hr {
margin-bottom: 30px;
}
.message {
background: rgb(252,252,252);
border: 1px solid rgb(200,200,200);
box-shadow: 0 3px 20px 0 rgba(0,0,0, 0.2);
border-radius: 4px;
margin: 0 auto;
padding: 40px;
width: flex-grid(6);
}
}
<%! from django.core.urlresolvers import reverse %>
<%inherit file="main.html" />
<%namespace name='static' file='static_content.html'/>
<section class="container unsubscribe">
<section class="message">
<h1>Unsubscribe Successful!</h1>
<hr class="horizontal-divider">
<p>
You will no longer receive notification emails from edX.
Click <a href="${reverse('dashboard')}">here</a> to return to your dashboard.
</p>
</section>
</section>
......@@ -331,7 +331,10 @@ if settings.COURSEWARE_ENABLED:
url(r'^courses/(?P<course_id>[^/]+/[^/]+/[^/]+)/news$',
'courseware.views.news', name="news"),
url(r'^courses/(?P<course_id>[^/]+/[^/]+/[^/]+)/discussion/',
include('django_comment_client.urls'))
include('django_comment_client.urls')),
url(r'^notification_prefs/enable/', 'notification_prefs.views.ajax_enable'),
url(r'^notification_prefs/disable/', 'notification_prefs.views.ajax_disable'),
url(r'^notification_prefs/unsubscribe/(?P<token>[a-zA-Z0-9-_=]+)/', 'notification_prefs.views.unsubscribe'),
)
urlpatterns += (
# This MUST be the last view in the courseware--it's a catch-all for custom tabs.
......
......@@ -37,6 +37,7 @@ path.py==3.0.1
Pillow==1.7.8
pip>=1.3
polib==1.0.3
pycrypto>=2.6
pygments==1.5
pygraphviz==1.1
pymongo==2.4.1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment