authentication.py 5.98 KB
Newer Older
Ahsan Ulhaq committed
1
""" Common Authentication Handlers used across projects. """
2

Ahsan Ulhaq committed
3
import logging
4

5
import django.utils.timezone
6 7
from oauth2_provider import models as dot_models
from provider.oauth2 import models as dop_models
8
from rest_framework.exceptions import AuthenticationFailed
9
from rest_framework.authentication import SessionAuthentication
10
from rest_framework_oauth.authentication import OAuth2Authentication
11

Ahsan Ulhaq committed
12

13 14 15 16 17 18
# Error codes reported under the `error_code` key of the AuthenticationFailed
# detail raised by OAuth2AuthenticationAllowInactiveUser.

# Generic fallback used when the failure message matches no more specific code.
OAUTH2_TOKEN_ERROR = u'token_error'
# The token was found but its `expires` timestamp is earlier than the current time.
OAUTH2_TOKEN_ERROR_EXPIRED = u'token_expired'
# The failure message contains 'Token string should not contain spaces'.
OAUTH2_TOKEN_ERROR_MALFORMED = u'token_malformed'
# The token string matched no access token in either OAuth2 library (DOT or DOP).
OAUTH2_TOKEN_ERROR_NONEXISTENT = u'token_nonexistent'
# The failure message contains 'No credentials provided'.
OAUTH2_TOKEN_ERROR_NOT_PROVIDED = u'token_not_provided'

stephensanchez committed
19

Ahsan Ulhaq committed
20 21 22
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


23
class SessionAuthenticationAllowInactiveUser(SessionAuthentication):
    """Ensure that the user is logged in, but do not require the account to be active.

    We use this in the special case that a user has created an account,
    but has not yet activated it.  We still want to allow the user to
    enroll in courses, so we remove the usual restriction
    on session authentication that requires an active account.

    You should use this authentication class ONLY for end-points that
    it's safe for an un-activated user to access.  For example,
    we can allow a user to update his/her own enrollments without
    activating an account.

    """
    def authenticate(self, request):
        """Authenticate the user, requiring a logged-in account and CSRF.

        This mirrors the `SessionAuthentication` implementation, with the
        `user.is_active` check removed and an explicit anonymous-user check
        added in its place.

        Args:
            request: The REST framework request wrapper; the underlying
                HttpRequest is read from its `_request` attribute.

        Returns:
            `(user, None)` when the request carries a logged-in user, or
            `None` when there is no user or the user is anonymous (so that
            other authentication classes may be tried).

        Raises:
            PermissionDenied: The CSRF token check failed.

        """
        # Get the underlying HttpRequest object
        request = request._request  # pylint: disable=protected-access
        user = getattr(request, 'user', None)

        # Unauthenticated, CSRF validation not required.
        # This is where regular `SessionAuthentication` checks that the user is active.
        # We have removed that check in this implementation, but we still reject
        # anonymous users since we require a logged-in account.
        if not user:
            return None

        # `is_anonymous` is a method on older Django releases and a plain
        # boolean property on newer ones, where calling it raises TypeError.
        # Only call it when it is actually callable so both flavours work.
        is_anonymous = user.is_anonymous
        if callable(is_anonymous):
            is_anonymous = is_anonymous()
        if is_anonymous:
            return None

        self.enforce_csrf(request)

        # CSRF passed with authenticated user
        return (user, None)
68 69


70
class OAuth2AuthenticationAllowInactiveUser(OAuth2Authentication):
    """
    OAuth2 authentication that accepts users whose account is not "is_active".

    This is a temporary workaround for as long as the user's is_active field
    is coupled with whether the user has verified ownership of their claimed
    email address.  Once is_active is decoupled from verified_email, this
    override will no longer be needed.

    Until then, use this class for OAuth2-accessible endpoints that should be
    reachable by users who have not verified their email yet, for example the
    mobile endpoints.
    """

    def authenticate(self, *args, **kwargs):
        """
        Delegate to the base class and normalize any authentication failure.

        Returns whatever the base implementation returns: a two-tuple of
        (user, token) if access token authentication succeeds, or None if the
        user did not try to authenticate using an access token.

        Raises AuthenticationFailed (HTTP 401) whose detail is always a dict
        with the keys `error_code` and `developer_message`.
        """
        # Substrings of the failure message mapped to the error code reported
        # for them; checked in order, first match wins.
        message_markers = (
            ('No credentials provided', OAUTH2_TOKEN_ERROR_NOT_PROVIDED),
            ('Token string should not contain spaces', OAUTH2_TOKEN_ERROR_MALFORMED),
        )

        try:
            return super(OAuth2AuthenticationAllowInactiveUser, self).authenticate(*args, **kwargs)
        except AuthenticationFailed as exc:
            detail = exc.detail
            if isinstance(detail, dict):
                # Already structured (e.g. raised by authenticate_credentials):
                # pass both fields through unchanged.
                developer_message = detail['developer_message']
                error_code = detail['error_code']
            else:
                # Unstructured message: derive the code from its text, falling
                # back to the generic token error.
                developer_message = detail
                error_code = next(
                    (code for marker, code in message_markers if marker in developer_message),
                    OAUTH2_TOKEN_ERROR,
                )
            raise AuthenticationFailed({
                u'error_code': error_code,
                u'developer_message': developer_message
            })

    def authenticate_credentials(self, request, access_token):
        """
        Resolve the access token string to a (user, token) pair.

        Differs from the base class implementation in that an inactive user
        does not cause a failure.

        Raises AuthenticationFailed when no matching token exists or when the
        matching token has expired.
        """
        token = self.get_access_token(access_token)

        if not token:
            raise AuthenticationFailed({
                u'error_code': OAUTH2_TOKEN_ERROR_NONEXISTENT,
                u'developer_message': u'The provided access token does not match any valid tokens.'
            })

        if token.expires < django.utils.timezone.now():
            raise AuthenticationFailed({
                u'error_code': OAUTH2_TOKEN_ERROR_EXPIRED,
                u'developer_message': u'The provided access token has expired and is no longer valid.',
            })

        return token.user, token

    def get_access_token(self, access_token):
        """
        Look the token up in each OAuth2 library, DOT first and then DOP.

        Returns the first match, or None if neither library has the token.
        """
        dot_token = self._get_dot_token(access_token)
        if dot_token:
            return dot_token
        return self._get_dop_token(access_token)

    def _get_dop_token(self, access_token):
        """
        Fetch the matching django-oauth2-provider (DOP) access token, with its
        user loaded in the same query, or None if there is no match.
        """
        return dop_models.AccessToken.objects.select_related('user').filter(token=access_token).first()

    def _get_dot_token(self, access_token):
        """
        Fetch the matching django-oauth-toolkit (DOT) access token, with its
        user loaded in the same query, or None if there is no match.
        """
        return dot_models.AccessToken.objects.select_related('user').filter(token=access_token).first()