views.py 7.21 KB
Newer Older
1
import json
2 3
from base64 import urlsafe_b64decode, urlsafe_b64encode
from hashlib import sha256
4 5

from Crypto import Random
6
from Crypto.Cipher import AES
7 8 9 10 11 12
from django.conf import settings
from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied
from django.http import Http404, HttpResponse
from django.views.decorators.http import require_GET, require_POST

David Baumgold committed
13
from edxmako.shortcuts import render_to_response
14
from notification_prefs import NOTIFICATION_PREF_KEY
15
from openedx.core.djangoapps.user_api.models import UserPreference
16
from openedx.core.djangoapps.user_api.preferences.api import delete_user_preference
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93


class UsernameDecryptionException(Exception):
    """Raised when an unsubscribe token cannot be turned back into a username."""


class UsernameCipher(object):
    """
    A transformation of a username to/from an opaque token

    The purpose of the token is to make one-click unsubscribe links that don't
    require the user to log in. To prevent users from unsubscribing other users,
    we must ensure the token cannot be computed by anyone who has this
    source code. The token must also be embeddable in a URL.

    Thus, we take the following steps to encode (and do the inverse to decode):
    1. Pad the UTF-8 encoding of the username with PKCS#7 padding to match the
       AES block length
    2. Generate a random AES block length initialization vector
    3. Use AES-256 (with a hash of settings.SECRET_KEY as the encryption key)
       in CBC mode to encrypt the username
    4. Prepend the IV to the encrypted value to allow for initialization of the
       decryption cipher
    5. base64url encode the result
    """

    @staticmethod
    def _get_aes_cipher(initialization_vector):
        """
        Return an AES cipher in CBC mode using `initialization_vector`.

        The key is the SHA-256 digest of settings.SECRET_KEY.
        """
        hash_ = sha256()
        hash_.update(settings.SECRET_KEY)
        return AES.new(hash_.digest(), AES.MODE_CBC, initialization_vector)

    @staticmethod
    def _add_padding(input_str):
        """
        Return `input_str` with PKCS#7 padding added to match AES block length

        Between 1 and AES.block_size padding bytes are always appended, so an
        input that is already block-aligned gains one full block of padding.
        """
        padding_len = AES.block_size - len(input_str) % AES.block_size
        return input_str + padding_len * chr(padding_len)

    @staticmethod
    def _remove_padding(input_str):
        """
        Return `input_str` with its trailing PKCS#7 padding removed

        Raises UsernameDecryptionException("padding") if `input_str` is empty,
        if the padding length is outside 1..AES.block_size, if the padding
        would consume the whole input (leaving no username), or if the
        padding bytes are not all equal to the padding length.
        """
        # An empty plaintext (token that contains only an IV) has no final
        # byte to inspect; report it as bad padding instead of letting an
        # IndexError escape to the caller.
        if not input_str:
            raise UsernameDecryptionException("padding")

        # Slice rather than index so a one-character string is obtained
        # regardless of how the string type behaves under indexing.
        last_byte = input_str[-1:]
        num_pad_bytes = ord(last_byte)
        if num_pad_bytes < 1 or num_pad_bytes > AES.block_size or num_pad_bytes >= len(input_str):
            raise UsernameDecryptionException("padding")

        # PKCS#7 requires every padding byte to carry the padding length,
        # not just the final one.
        if input_str[-num_pad_bytes:] != last_byte * num_pad_bytes:
            raise UsernameDecryptionException("padding")

        return input_str[:-num_pad_bytes]

    @staticmethod
    def encrypt(username):
        """
        Return the base64url-encoded token for `username`.

        The token is a freshly generated random IV followed by the AES-CBC
        encryption of the padded UTF-8 encoding of `username`.
        """
        initialization_vector = Random.new().read(AES.block_size)
        aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)
        return urlsafe_b64encode(
            initialization_vector +
            aes_cipher.encrypt(UsernameCipher._add_padding(username.encode("utf-8")))
        )

    @staticmethod
    def decrypt(token):
        """
        Return the decrypted, unpadded contents of `token`.

        Raises UsernameDecryptionException with one of these messages:
        - "base64url": `token` is not valid base64url
        - "initialization_vector": the decoded token is shorter than one AES block
        - "aes": the cipher rejected the encrypted portion
        - "padding": the decrypted value does not end in valid padding
        """
        try:
            base64_decoded = urlsafe_b64decode(token)
        except (TypeError, ValueError):
            # Depending on the interpreter, malformed input is reported as
            # TypeError or as binascii.Error (a ValueError subclass).
            raise UsernameDecryptionException("base64url")

        if len(base64_decoded) < AES.block_size:
            raise UsernameDecryptionException("initialization_vector")

        initialization_vector = base64_decoded[:AES.block_size]
        aes_encrypted = base64_decoded[AES.block_size:]
        aes_cipher = UsernameCipher._get_aes_cipher(initialization_vector)

        try:
            decrypted = aes_cipher.decrypt(aes_encrypted)
        except ValueError:
            raise UsernameDecryptionException("aes")

        return UsernameCipher._remove_padding(decrypted)


94 95 96 97 98
def enable_notifications(user):
    """
    Turn notifications on for `user`.

    Ensures a notification preference row exists for the user; when one has
    to be created, its value is the encrypted form of the user's username.
    Currently only used for daily forum digests.
    """
    # UserPreference is used directly because this helper has more than one
    # caller and `user` is not guaranteed to be the user behind the request.
    unsubscribe_token = UsernameCipher.encrypt(user.username)
    UserPreference.objects.get_or_create(
        defaults={"value": unsubscribe_token},
        key=NOTIFICATION_PREF_KEY,
        user=user
    )


110 111 112 113 114 115 116 117
@require_POST
def ajax_enable(request):
    """
    Enable notifications for the authenticated user making the request.

    Intended to be called through an AJAX POST. Responds with status 204
    (no content) on success; raises PermissionDenied when the requesting user
    is not authenticated. Enabling is a no-op when notifications are already
    on; otherwise a preference is stored whose value is the unsubscribe token
    (an encryption of the username).
    """
    current_user = request.user
    if current_user.is_authenticated():
        enable_notifications(current_user)
        return HttpResponse(status=204)

    raise PermissionDenied


@require_POST
def ajax_disable(request):
    """
    Disable notifications for the authenticated user making the request.

    Intended to be called through an AJAX POST. Responds with status 204
    (no content) on success; raises PermissionDenied when the requesting user
    is not authenticated.
    """
    current_user = request.user
    authenticated = current_user.is_authenticated()
    if not authenticated:
        raise PermissionDenied

    # Removing the stored preference is what turns notifications off.
    delete_user_preference(current_user, NOTIFICATION_PREF_KEY)
    no_content = HttpResponse(status=204)
    return no_content

143

144
@require_GET
def ajax_status(request):
    """
    A view that retrieves notifications status for the authenticated user.

    This view should be invoked by an AJAX GET call. It returns status 200,
    with a JSON-formatted payload, or an error. The payload is an object whose
    "status" entry is the number of notification preference rows stored for
    the requesting user (0 when notifications are disabled).

    Raises PermissionDenied if the requesting user is not authenticated.
    """
    if not request.user.is_authenticated():
        raise PermissionDenied

    # Only the number of matching rows is needed, so let the database count
    # them instead of loading every row just to call len() on the result.
    pref_count = UserPreference.objects.filter(
        user=request.user,
        key=NOTIFICATION_PREF_KEY
    ).count()

    return HttpResponse(json.dumps({"status": pref_count}), content_type="application/json")
161 162


163
@require_GET
def set_subscription(request, token, subscribe):  # pylint: disable=unused-argument
    """
    A view that disables or re-enables notifications for a user who may not be authenticated

    This view is meant to be the target of an unsubscribe link. The request
    must be a GET, and the `token` parameter must decrypt to a valid username.
    The subscribe flag feature controls whether the view subscribes or unsubscribes the user, with subscribe=True
    used to "undo" accidentally clicking on the unsubscribe link

    A 405 will be returned if the request method is not GET. A 404 will be
    returned if the token parameter does not decrypt to a valid username. On
    success, the response will contain a page indicating success.
    """
    try:
        # decrypt is a staticmethod, so no UsernameCipher instance is needed.
        username = UsernameCipher.decrypt(token.encode())
        user = User.objects.get(username=username)
    except UnicodeError:
        # Covers both a token that cannot be encoded and decrypted bytes that
        # cannot be decoded into a username for the lookup.
        raise Http404("base64url")
    except UsernameDecryptionException as exn:
        # str() gives the exception's message text without relying on the
        # deprecated `message` attribute.
        raise Http404(str(exn))
    except User.DoesNotExist:
        raise Http404("username")

    # Calling UserPreference directly because the fact that the user is passed in the token implies
    # that it may not match request.user.
    if subscribe:
        UserPreference.objects.get_or_create(user=user,
                                             key=NOTIFICATION_PREF_KEY,
                                             defaults={
                                                 "value": UsernameCipher.encrypt(user.username)
                                             })
        return render_to_response("resubscribe.html", {'token': token})
    else:
        UserPreference.objects.filter(user=user, key=NOTIFICATION_PREF_KEY).delete()
        return render_to_response("unsubscribe.html", {'token': token})