Commit 96b8181e by Greg Price

Merge pull request #24 from edx/gprice/remove-username-cipher

Remove unnecessary UsernameCipher code
parents e3a25eee 10a41239
......@@ -14,7 +14,7 @@ from django.utils.html import strip_tags
from django.utils.translation import ugettext as _, activate, deactivate
from statsd import statsd
from notifier.user import UsernameCipher, LANGUAGE_PREFERENCE_KEY
from notifier.user import DIGEST_NOTIFICATION_PREFERENCE_KEY, LANGUAGE_PREFERENCE_KEY
# maximum number of threads to display per course
MAX_COURSE_THREADS = 30
......@@ -147,14 +147,14 @@ def _get_thread_url(course_id, thread_id, commentable_id):
return _get_course_url(course_id) + thread_path
def _get_unsubscribe_url(username):
def _get_unsubscribe_url(user):
"""
Formatting helper.
Generate a click-through url to unsubscribe a user from digest notifications,
using an encrypted token based on the username.
using the encrypted token contained in the user's preference.
"""
token = UsernameCipher.encrypt(username)
token = user["preferences"][DIGEST_NOTIFICATION_PREFERENCE_KEY]
return '{}/notification_prefs/unsubscribe/{}/'.format(settings.LMS_URL_BASE, token)
......@@ -227,7 +227,7 @@ def render_digest(user, digest, title, description):
'course_names': _make_text_list([course.title for course in digest.courses]),
'thread_count': sum(course.thread_count for course in digest.courses),
'logo_image_url': settings.LOGO_IMAGE_URL,
'unsubscribe_url': _get_unsubscribe_url(user['username']),
'unsubscribe_url': _get_unsubscribe_url(user),
'postal_address': settings.EMAIL_SENDER_POSTAL_ADDRESS,
})
......
......@@ -63,10 +63,6 @@ EMAIL_USE_TLS = os.getenv('EMAIL_USE_TLS')
# email settings independent of backend
EMAIL_REWRITE_RECIPIENT = os.getenv('EMAIL_REWRITE_RECIPIENT')
# secret key for generating unsub tokens
# this MUST be changed in production envs, and MUST match the LMS' secret key
SECRET_KEY = os.getenv('SECRET_KEY', '85920908f28904ed733fe576320db18cabd7b6cd')
# LMS links, images, etc
LMS_URL_BASE = os.getenv('LMS_URL_BASE', 'http://localhost:8000')
......
# coding=utf-8
from uuid import uuid4
from unittest import skip
from django.test import TestCase
from mock import patch
from notifier import settings
from notifier.digest import Digest, DigestCourse, DigestItem, DigestThread, render_digest
from notifier.user import LANGUAGE_PREFERENCE_KEY
from notifier.user import DIGEST_NOTIFICATION_PREFERENCE_KEY, LANGUAGE_PREFERENCE_KEY
TEST_COURSE_ID = "test_org/test_num/test_course"
TEST_COMMENTABLE = "test_commentable"
......@@ -77,8 +80,9 @@ class RenderDigestTestCase(TestCase):
def setUp(self):
self.user = {
"id": "0",
"username": "test_user",
"preferences": {}
"preferences": {
DIGEST_NOTIFICATION_PREFERENCE_KEY: uuid4(),
}
}
self.set_digest("test title")
......@@ -132,3 +136,12 @@ class RenderDigestTestCase(TestCase):
del self.user["preferences"][LANGUAGE_PREFERENCE_KEY]
render_digest(self.user, self.digest, "dummy", "dummy")
mock_activate.assert_not_called()
def test_unsubscribe_url(self):
text, html = render_digest(self.user, self.digest, "dummy", "dummy")
expected_url = "{lms_url_base}/notification_prefs/unsubscribe/{token}/".format(
lms_url_base=settings.LMS_URL_BASE,
token=self.user["preferences"][DIGEST_NOTIFICATION_PREFERENCE_KEY]
)
self.assertIn(expected_url, text)
self.assertIn(expected_url, html)
"""
Functions in support of generating formatted digest emails of forums activity.
"""
from base64 import urlsafe_b64encode, urlsafe_b64decode
from hashlib import sha256
import logging
import sys
from Crypto.Cipher import AES
from Crypto import Random
from dogapi import dog_stats_api
from django.conf import settings
import requests
......@@ -84,77 +80,3 @@ def get_user(user_id):
raise Exception(
'unhandled response from user service: %s %s' %
(r.status_code, r.reason))
# implementation mirrors that in
# https://github.com/edx/edx-platform/blob/master/lms/djangoapps/notification_prefs/views.py
class UsernameCipher(object):
"""
A transformation of a username to/from an opaque token
The purpose of the token is to make one-click unsubscribe links that don't
require the user to log in. To prevent users from unsubscribing other users,
we must ensure the token cannot be computed by anyone who has this
source code. The token must also be embeddable in a URL.
Thus, we take the following steps to encode (and do the inverse to decode):
1. Pad the UTF-8 encoding of the username with PKCS#7 padding to match the
AES block length
2. Generate a random AES block length initialization vector
3. Use AES-256 (with a hash of settings.SECRET_KEY as the encryption key)
in CBC mode to encrypt the username
4. Prepend the IV to the encrypted value to allow for initialization of the
decryption cipher
5. base64url encode the result
"""
@staticmethod
def _get_aes_cipher(initialization_vector):
hash_ = sha256()
hash_.update(settings.SECRET_KEY)
return AES.new(hash_.digest(), AES.MODE_CBC, initialization_vector)
@staticmethod
def _add_padding(input_str):
"""Return `input_str` with PKCS#7 padding added to match AES block length"""
padding_len = AES.block_size - len(input_str) % AES.block_size
return input_str + padding_len * chr(padding_len)
@staticmethod
def _remove_padding(input_str):
"""Return `input_str` with PKCS#7 padding trimmed to match AES block length"""
num_pad_bytes = ord(input_str[-1])
if num_pad_bytes < 1 or num_pad_bytes > AES.block_size or num_pad_bytes >= len(input_str):
raise UsernameDecryptionException("padding")
return input_str[:-num_pad_bytes]
@staticmethod
def encrypt(username):
initialization_vector = Random.new().read(AES.block_size)
aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
return urlsafe_b64encode(
initialization_vector +
aes_cipher.encrypt(UsernameCipher._add_padding(username.encode("utf-8")))
)
@staticmethod
def decrypt(token):
try:
base64_decoded = urlsafe_b64decode(token)
except TypeError:
raise UsernameDecryptionException("base64url")
if len(base64_decoded) < AES.block_size:
raise UsernameDecryptionException("initialization_vector")
initialization_vector = base64_decoded[:AES.block_size]
aes_encrypted = base64_decoded[AES.block_size:]
aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
try:
decrypted = aes_cipher.decrypt(aes_encrypted)
except ValueError:
raise UsernameDecryptionException("aes")
return UsernameCipher._remove_padding(decrypted)
......@@ -16,7 +16,6 @@ logilab-astng==0.24.3
logilab-common==0.59.1
mock==1.0.1
pep8==1.4.6
pycrypto>=2.6
pylint==0.28.0
python-dateutil==2.1
pytz==2013b
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment