Commit 17326634 by chrisndodge

add restricted DOT OAUTH2 client with reduced privileges

parent 6e23a2ca
...@@ -331,7 +331,3 @@ FEATURES['CUSTOM_COURSES_EDX'] = True ...@@ -331,7 +331,3 @@ FEATURES['CUSTOM_COURSES_EDX'] = True
# API access management -- needed for simple-history to run. # API access management -- needed for simple-history to run.
INSTALLED_APPS += ('openedx.core.djangoapps.api_admin',) INSTALLED_APPS += ('openedx.core.djangoapps.api_admin',)
# Set the default Oauth2 Provider Model so that migrations can run in
# verbose mode
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
...@@ -450,9 +450,14 @@ OAUTH2_PROVIDER = { ...@@ -450,9 +450,14 @@ OAUTH2_PROVIDER = {
'read': 'Read scope', 'read': 'Read scope',
'write': 'Write scope', 'write': 'Write scope',
'email': 'Email scope', 'email': 'Email scope',
'profile': 'Profile scope', # conform profile scope message that is presented to end-user
} # to lms/templates/provider/authorize.html. This may be revised later.
'profile': 'Read your user profile',
},
} }
# This is required for the migrations in oauth_dispatch.models
# otherwise it fails saying this attribute is not present in Settings
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
################################## TEMPLATE CONFIGURATION ##################################### ################################## TEMPLATE CONFIGURATION #####################################
# Mako templating # Mako templating
......
...@@ -582,10 +582,6 @@ JWT_AUTH.update({ ...@@ -582,10 +582,6 @@ JWT_AUTH.update({
'JWT_AUDIENCE': 'test-key', 'JWT_AUDIENCE': 'test-key',
}) })
# Set the default Oauth2 Provider Model so that migrations can run in
# verbose mode
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
COURSE_CATALOG_API_URL = 'https://catalog.example.com/api/v1' COURSE_CATALOG_API_URL = 'https://catalog.example.com/api/v1'
COMPREHENSIVE_THEME_DIRS = [REPO_ROOT / "themes", REPO_ROOT / "common/test"] COMPREHENSIVE_THEME_DIRS = [REPO_ROOT / "themes", REPO_ROOT / "common/test"]
......
...@@ -15,3 +15,41 @@ ...@@ -15,3 +15,41 @@
} }
} }
// For the django-oauth-toolkit Authorization view
.wrapper-authorize {
background: $white;
width: 60%;
padding-right: 140px;
padding-left: 140px;
font-family: $sans-serif;
h1 {
@extend %t-title4;
margin-bottom: 0;
margin-left: 0;
padding: $baseline;
padding-left: 0px;
@include text-align(left);
}
p {
@extend %t-copy-base;
margin: $baseline/2 0;
}
.control-group {
float: right;
}
.btn-authorization-allow {
@extend %btn-primary-blue;
margin-left: 20px;
line-height: 0.7em;
}
.btn-authorization-cancel {
@extend %btn-secondary-blue-outline;
}
}
{% extends "main_django.html" %}
{% load i18n configuration %}
{% block title %}
{% trans "Authorize" %} | {% platform_name %}
{% endblock %}
{% block body %}
<main id="main" aria-label="Content" tabindex="-1">
<section class="container authorize {{ selected_tab }}" id="authorize-content">
<div class="wrapper-authorize">
<div class="block-center">
{% if not error %}
<form id="authorizationForm" method="post">
<h1 class="block-center-heading">{% trans "Authorize" %} {{ application.name }}?</h1>
{% csrf_token %}
{% for field in form %}
{% if field.is_hidden %}
{{ field }}
{% endif %}
{% endfor %}
<p>{% trans "The above application requests the following permissions from your account:" %}</p>
<ul>
{% for scope in scopes_descriptions %}
<li>{{ scope }}</li>
{% endfor %}
</ul>
<p>{% trans "Please click the 'Allow' button to grant these permissions to the above application. Otherwise, to withhold these permissions, please click the 'Cancel' button." %}
</p>
{{ form.errors }}
{{ form.non_field_errors }}
<div class="control-group">
<div class="controls">
<button type="submit" class="btn btn-authorization-cancel" name="cancel"/>{% trans "Cancel" %}</button><button type="submit" class="btn btn-authorization-allow" name="allow" value="Authorize"/>{% trans "Allow" %}</button>
</div>
</div>
</form>
{% else %}
<h2>{% trans "Error" %}: {{ error.error }}</h2>
<p>{{ error.description }}</p>
{% endif %}
</div>
</div>
</section>
</main>
{% endblock %}
...@@ -5,6 +5,8 @@ Override admin configuration for django-oauth-toolkit ...@@ -5,6 +5,8 @@ Override admin configuration for django-oauth-toolkit
from django.contrib.admin import ModelAdmin, site from django.contrib.admin import ModelAdmin, site
from oauth2_provider import models from oauth2_provider import models
from .models import RestrictedApplication
def reregister(model_class): def reregister(model_class):
""" """
...@@ -71,3 +73,13 @@ class DOTGrantAdmin(ModelAdmin): ...@@ -71,3 +73,13 @@ class DOTGrantAdmin(ModelAdmin):
list_filter = [u'application'] list_filter = [u'application']
raw_id_fields = [u'user'] raw_id_fields = [u'user']
search_fields = [u'code', u'user__username'] search_fields = [u'code', u'user__username']
class RestrictedApplicationAdmin(ModelAdmin):
"""
ModelAdmin for the Restricted Application
"""
list_display = [u'application']
site.register(RestrictedApplication, RestrictedApplicationAdmin)
...@@ -3,10 +3,32 @@ Classes that override default django-oauth-toolkit behavior ...@@ -3,10 +3,32 @@ Classes that override default django-oauth-toolkit behavior
""" """
from __future__ import unicode_literals from __future__ import unicode_literals
from .models import RestrictedApplication
from datetime import datetime
from django.contrib.auth import authenticate, get_user_model from django.contrib.auth import authenticate, get_user_model
from django.db.models.signals import pre_save
from django.dispatch import receiver
from pytz import utc
from oauth2_provider.models import AccessToken
from oauth2_provider.oauth2_validators import OAuth2Validator from oauth2_provider.oauth2_validators import OAuth2Validator
@receiver(pre_save, sender=AccessToken)
def on_access_token_presave(sender, instance, *args, **kwargs): # pylint: disable=unused-argument
"""
A hook on the AccessToken. Since we do not have protected scopes, we must mark all
AccessTokens as expired for 'restricted applications'.
We do this as a pre-save hook on the ORM
"""
is_application_restricted = RestrictedApplication.objects.filter(application=instance.application).exists()
if is_application_restricted:
RestrictedApplication.set_access_token_as_expired(instance)
class EdxOAuth2Validator(OAuth2Validator): class EdxOAuth2Validator(OAuth2Validator):
""" """
Validator class that implements edX-specific custom behavior: Validator class that implements edX-specific custom behavior:
...@@ -61,6 +83,23 @@ class EdxOAuth2Validator(OAuth2Validator): ...@@ -61,6 +83,23 @@ class EdxOAuth2Validator(OAuth2Validator):
super(EdxOAuth2Validator, self).save_bearer_token(token, request, *args, **kwargs) super(EdxOAuth2Validator, self).save_bearer_token(token, request, *args, **kwargs)
is_application_restricted = RestrictedApplication.objects.filter(application=request.client).exists()
if is_application_restricted:
# Since RestrictedApplications will override the DOT defined expiry, so that access_tokens
# are always expired, we need to re-read the token from the database and then calculate the
# expires_in (in seconds) from what we stored in the database. This value should be a negative
#value, meaning that it is already expired
access_token = AccessToken.objects.get(token=token['access_token'])
utc_now = datetime.utcnow().replace(tzinfo=utc)
expires_in = (access_token.expires - utc_now).total_seconds()
# assert that RestriectedApplications only issue expired tokens
# blow up processing if we see otherwise
assert expires_in < 0
token['expires_in'] = expires_in
# Restore the original request attributes # Restore the original request attributes
request.grant_type = grant_type request.grant_type = grant_type
request.user = user request.user = user
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
from django.conf import settings
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.OAUTH2_PROVIDER_APPLICATION_MODEL),
]
operations = [
migrations.CreateModel(
name='RestrictedApplication',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('application', models.ForeignKey(to=settings.OAUTH2_PROVIDER_APPLICATION_MODEL)),
],
),
]
"""
Specialized models for oauth_dispatch djangoapp
"""
from datetime import datetime
from django.db import models
from pytz import utc
from oauth2_provider.settings import oauth2_settings
class RestrictedApplication(models.Model):
"""
This model lists which django-oauth-toolkit Applications are considered 'restricted'
and thus have a limited ability to use various APIs.
A restricted Application will only get expired token/JWT payloads
so that they cannot be used to call into APIs.
"""
application = models.ForeignKey(oauth2_settings.APPLICATION_MODEL, null=False)
def __unicode__(self):
"""
Return a unicode representation of this object
"""
return u"<RestrictedApplication '{name}'>".format(
name=self.application.name
)
@classmethod
def set_access_token_as_expired(cls, access_token):
"""
For access_tokens for RestrictedApplications, put the expire timestamp into the beginning of the epoch
which is Jan. 1, 1970
"""
access_token.expires = datetime(1970, 1, 1, tzinfo=utc)
@classmethod
def verify_access_token_as_expired(cls, access_token):
"""
For access_tokens for RestrictedApplications, make sure that the expiry date
is set at the beginning of the epoch which is Jan. 1, 1970
"""
return access_token.expires == datetime(1970, 1, 1, tzinfo=utc)
...@@ -3,3 +3,4 @@ Constants for testing purposes ...@@ -3,3 +3,4 @@ Constants for testing purposes
""" """
DUMMY_REDIRECT_URL = u'https://example.com/edx/redirect' DUMMY_REDIRECT_URL = u'https://example.com/edx/redirect'
DUMMY_REDIRECT_URL2 = u'https://example.com/edx/other-redirect'
""" """
OAuth Dispatch test mixins OAuth Dispatch test mixins
""" """
import jwt
from django.conf import settings from django.conf import settings
import jwt
from jwt.exceptions import ExpiredSignatureError
from student.models import UserProfile, anonymous_id_for_user from student.models import UserProfile, anonymous_id_for_user
class AccessTokenMixin(object): class AccessTokenMixin(object):
""" Mixin for tests dealing with OAuth 2 access tokens. """ """ Mixin for tests dealing with OAuth 2 access tokens. """
def assert_valid_jwt_access_token(self, access_token, user, scopes=None): def assert_valid_jwt_access_token(self, access_token, user, scopes=None, should_be_expired=False):
""" """
Verify the specified JWT access token is valid, and belongs to the specified user. Verify the specified JWT access token is valid, and belongs to the specified user.
Args: Args:
access_token (str): JWT access_token (str): JWT
user (User): User whose information is contained in the JWT payload. user (User): User whose information is contained in the JWT payload.
(optional) should_be_expired: indicates if the passed in JWT token is expected to be expired
Returns: Returns:
dict: Decoded JWT payload dict: Decoded JWT payload
...@@ -24,13 +28,27 @@ class AccessTokenMixin(object): ...@@ -24,13 +28,27 @@ class AccessTokenMixin(object):
scopes = scopes or [] scopes = scopes or []
audience = settings.JWT_AUTH['JWT_AUDIENCE'] audience = settings.JWT_AUTH['JWT_AUDIENCE']
issuer = settings.JWT_AUTH['JWT_ISSUER'] issuer = settings.JWT_AUTH['JWT_ISSUER']
payload = jwt.decode(
access_token, def _decode_jwt(verify_expiration):
settings.JWT_AUTH['JWT_SECRET_KEY'], """
algorithms=[settings.JWT_AUTH['JWT_ALGORITHM']], Helper method to decode a JWT with the ability to
audience=audience, verify the expiration of said token
issuer=issuer """
) return jwt.decode(
access_token,
settings.JWT_AUTH['JWT_SECRET_KEY'],
algorithms=[settings.JWT_AUTH['JWT_ALGORITHM']],
audience=audience,
issuer=issuer,
verify_expiration=verify_expiration
)
# Note that if we expect the claims to have expired
# then we ask the JWT library not to verify expiration
# as that would throw a ExpiredSignatureError and
# halt other verifications steps. We'll do a manual
# expiry verification later on
payload = _decode_jwt(verify_expiration=not should_be_expired)
expected = { expected = {
'aud': audience, 'aud': audience,
...@@ -54,4 +72,13 @@ class AccessTokenMixin(object): ...@@ -54,4 +72,13 @@ class AccessTokenMixin(object):
self.assertDictContainsSubset(expected, payload) self.assertDictContainsSubset(expected, payload)
# Since we suppressed checking of expiry
# in the claim in the above check, because we want
# to fully examine the claims outside of the expiry,
# now we should assert that the claim is indeed
# expired
if should_be_expired:
with self.assertRaises(ExpiredSignatureError):
_decode_jwt(verify_expiration=True)
return payload return payload
...@@ -5,17 +5,21 @@ Tests for DOT Adapter ...@@ -5,17 +5,21 @@ Tests for DOT Adapter
from datetime import timedelta from datetime import timedelta
import ddt import ddt
from django.conf import settings
from django.test import TestCase from django.test import TestCase
from django.utils.timezone import now from django.utils.timezone import now
from oauth2_provider import models from oauth2_provider import models
import unittest
from student.tests.factories import UserFactory from student.tests.factories import UserFactory
from ..adapters import DOTAdapter from ..adapters import DOTAdapter
from .constants import DUMMY_REDIRECT_URL from .constants import DUMMY_REDIRECT_URL, DUMMY_REDIRECT_URL2
from ..models import RestrictedApplication
@ddt.ddt @ddt.ddt
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class DOTAdapterTestCase(TestCase): class DOTAdapterTestCase(TestCase):
""" """
Test class for DOTAdapter. Test class for DOTAdapter.
...@@ -38,6 +42,21 @@ class DOTAdapterTestCase(TestCase): ...@@ -38,6 +42,21 @@ class DOTAdapterTestCase(TestCase):
redirect_uri=DUMMY_REDIRECT_URL, redirect_uri=DUMMY_REDIRECT_URL,
client_id='confidential-client-id', client_id='confidential-client-id',
) )
self.restricted_client = self.adapter.create_confidential_client(
name='restricted app',
user=self.user,
redirect_uri=DUMMY_REDIRECT_URL2,
client_id='restricted-client-id',
)
self.restricted_app = RestrictedApplication.objects.create(application=self.restricted_client)
def test_restricted_app_unicode(self):
"""
Make sure unicode representation of RestrictedApplication is correct
"""
self.assertEqual(unicode(self.restricted_app), u"<RestrictedApplication '{name}'>".format(
name=self.restricted_client.name
))
@ddt.data( @ddt.data(
('confidential', models.Application.CLIENT_CONFIDENTIAL), ('confidential', models.Application.CLIENT_CONFIDENTIAL),
...@@ -51,7 +70,14 @@ class DOTAdapterTestCase(TestCase): ...@@ -51,7 +70,14 @@ class DOTAdapterTestCase(TestCase):
self.assertEqual(client.client_type, client_type) self.assertEqual(client.client_type, client_type)
def test_get_client(self): def test_get_client(self):
client = self.adapter.get_client(client_type=models.Application.CLIENT_CONFIDENTIAL) """
Read back one of the confidential clients (there are two)
and verify that we get back what we expected
"""
client = self.adapter.get_client(
redirect_uris=DUMMY_REDIRECT_URL,
client_type=models.Application.CLIENT_CONFIDENTIAL
)
self.assertIsInstance(client, models.Application) self.assertIsInstance(client, models.Application)
self.assertEqual(client.client_type, models.Application.CLIENT_CONFIDENTIAL) self.assertEqual(client.client_type, models.Application.CLIENT_CONFIDENTIAL)
...@@ -74,3 +100,18 @@ class DOTAdapterTestCase(TestCase): ...@@ -74,3 +100,18 @@ class DOTAdapterTestCase(TestCase):
expires=now() + timedelta(days=30), expires=now() + timedelta(days=30),
) )
self.assertEqual(self.adapter.get_access_token(token_string='token-id'), token) self.assertEqual(self.adapter.get_access_token(token_string='token-id'), token)
def test_get_restricted_access_token(self):
"""
Make sure when generating an access_token for a restricted client
that the token is immediately expired
"""
models.AccessToken.objects.create(
token='expired-token-id',
application=self.restricted_client,
user=self.user,
expires=now() + timedelta(days=30),
)
readback_token = self.adapter.get_access_token(token_string='expired-token-id')
self.assertTrue(RestrictedApplication.verify_access_token_as_expired(readback_token))
# pylint: disable=missing-docstring # pylint: disable=missing-docstring
from django.conf import settings
from django.test import TestCase from django.test import TestCase
from oauth2_provider.models import Application, AccessToken, RefreshToken from oauth2_provider.models import Application, AccessToken, RefreshToken
import unittest
from openedx.core.djangoapps.oauth_dispatch.tests import factories from openedx.core.djangoapps.oauth_dispatch.tests import factories
from student.tests.factories import UserFactory from student.tests.factories import UserFactory
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class TestClientFactory(TestCase): class TestClientFactory(TestCase):
def setUp(self): def setUp(self):
super(TestClientFactory, self).setUp() super(TestClientFactory, self).setUp()
...@@ -18,6 +21,7 @@ class TestClientFactory(TestCase): ...@@ -18,6 +21,7 @@ class TestClientFactory(TestCase):
self.assertEqual(actual_application, expected_application) self.assertEqual(actual_application, expected_application)
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class TestAccessTokenFactory(TestCase): class TestAccessTokenFactory(TestCase):
def setUp(self): def setUp(self):
super(TestAccessTokenFactory, self).setUp() super(TestAccessTokenFactory, self).setUp()
...@@ -30,6 +34,7 @@ class TestAccessTokenFactory(TestCase): ...@@ -30,6 +34,7 @@ class TestAccessTokenFactory(TestCase):
self.assertEqual(actual_access_token, expected_access_token) self.assertEqual(actual_access_token, expected_access_token)
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class TestRefreshTokenFactory(TestCase): class TestRefreshTokenFactory(TestCase):
def setUp(self): def setUp(self):
super(TestRefreshTokenFactory, self).setUp() super(TestRefreshTokenFactory, self).setUp()
......
...@@ -9,6 +9,7 @@ from django.conf import settings ...@@ -9,6 +9,7 @@ from django.conf import settings
from django.test import RequestFactory, TestCase from django.test import RequestFactory, TestCase
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
import httpretty import httpretty
from oauth2_provider import models as dot_models
from provider import constants from provider import constants
import unittest import unittest
...@@ -16,10 +17,49 @@ from student.tests.factories import UserFactory ...@@ -16,10 +17,49 @@ from student.tests.factories import UserFactory
from third_party_auth.tests.utils import ThirdPartyOAuthTestMixin, ThirdPartyOAuthTestMixinGoogle from third_party_auth.tests.utils import ThirdPartyOAuthTestMixin, ThirdPartyOAuthTestMixinGoogle
from .constants import DUMMY_REDIRECT_URL from .constants import DUMMY_REDIRECT_URL
from . import mixins
from .. import adapters from .. import adapters
from .. import models
if settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"): if settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"):
from .. import views from .. import views
from . import mixins
class AccessTokenLoginMixin(object):
"""
Shared helper class to assert proper access levels when using access_tokens
"""
def setUp(self):
"""
Initialize mixin
"""
super(AccessTokenLoginMixin, self).setUp()
self.login_with_access_token_url = reverse("login_with_access_token")
def login_with_access_token(self, access_token=None):
"""
Login with access token and return response.
You can optionally send in an accss_token to override
the object's attribute
"""
return self.client.post(
self.login_with_access_token_url,
HTTP_AUTHORIZATION="Bearer {0}".format(access_token if access_token else self.access_token)
)
def _assert_access_token_is_valid(self, access_token=None):
"""
Asserts that oauth assigned access_token is valid and usable
"""
self.assertEqual(self.login_with_access_token(access_token=access_token).status_code, 204)
def _assert_access_token_invalidated(self, access_token=None):
"""
Asserts that oauth assigned access_token is not valid
"""
self.assertEqual(self.login_with_access_token(access_token=access_token).status_code, 401)
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled") @unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
...@@ -48,6 +88,16 @@ class _DispatchingViewTestCase(TestCase): ...@@ -48,6 +88,16 @@ class _DispatchingViewTestCase(TestCase):
client_id='dop-app-client-id', client_id='dop-app-client-id',
) )
# Create a "restricted" DOT Application which means any AccessToken/JWT
# generated for this application will be immediately expired
self.restricted_dot_app = self.dot_adapter.create_public_client(
name='test restricted dot application',
user=self.user,
redirect_uri=DUMMY_REDIRECT_URL,
client_id='dot-restricted-app-client-id',
)
models.RestrictedApplication.objects.create(application=self.restricted_dot_app)
def _post_request(self, user, client, token_type=None): def _post_request(self, user, client, token_type=None):
""" """
Call the view with a POST request objectwith the appropriate format, Call the view with a POST request objectwith the appropriate format,
...@@ -63,14 +113,14 @@ class _DispatchingViewTestCase(TestCase): ...@@ -63,14 +113,14 @@ class _DispatchingViewTestCase(TestCase):
@ddt.ddt @ddt.ddt
class TestAccessTokenView(mixins.AccessTokenMixin, _DispatchingViewTestCase): class TestAccessTokenView(AccessTokenLoginMixin, mixins.AccessTokenMixin, _DispatchingViewTestCase):
""" """
Test class for AccessTokenView Test class for AccessTokenView
""" """
def setUp(self): def setUp(self):
super(TestAccessTokenView, self).setUp()
self.url = reverse('access_token') self.url = reverse('access_token')
self.view_class = views.AccessTokenView self.view_class = views.AccessTokenView
super(TestAccessTokenView, self).setUp()
def _post_body(self, user, client, token_type=None): def _post_body(self, user, client, token_type=None):
""" """
...@@ -99,6 +149,22 @@ class TestAccessTokenView(mixins.AccessTokenMixin, _DispatchingViewTestCase): ...@@ -99,6 +149,22 @@ class TestAccessTokenView(mixins.AccessTokenMixin, _DispatchingViewTestCase):
self.assertIn('scope', data) self.assertIn('scope', data)
self.assertIn('token_type', data) self.assertIn('token_type', data)
def test_restricted_access_token_fields(self):
response = self._post_request(self.user, self.restricted_dot_app)
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
self.assertIn('access_token', data)
self.assertIn('expires_in', data)
self.assertIn('scope', data)
self.assertIn('token_type', data)
# Restricted applications have immediately expired tokens
self.assertLess(data['expires_in'], 0)
# double check that the token stored in the DB is marked as expired
access_token = dot_models.AccessToken.objects.get(token=data['access_token'])
self.assertTrue(models.RestrictedApplication.verify_access_token_as_expired(access_token))
@ddt.data('dop_app', 'dot_app') @ddt.data('dop_app', 'dot_app')
def test_jwt_access_token(self, client_attr): def test_jwt_access_token(self, client_attr):
client = getattr(self, client_attr) client = getattr(self, client_attr)
...@@ -109,6 +175,47 @@ class TestAccessTokenView(mixins.AccessTokenMixin, _DispatchingViewTestCase): ...@@ -109,6 +175,47 @@ class TestAccessTokenView(mixins.AccessTokenMixin, _DispatchingViewTestCase):
self.assertEqual(data['token_type'], 'JWT') self.assertEqual(data['token_type'], 'JWT')
self.assert_valid_jwt_access_token(data['access_token'], self.user, data['scope'].split(' ')) self.assert_valid_jwt_access_token(data['access_token'], self.user, data['scope'].split(' '))
def test_restricted_jwt_access_token(self):
"""
Verify that when requesting a JWT token from a restricted Application
within the DOT subsystem, that our claims is marked as already expired
(i.e. expiry set to Jan 1, 1970)
"""
response = self._post_request(self.user, self.restricted_dot_app, token_type='jwt')
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
self.assertIn('expires_in', data)
# jwt must indicate that it is already expired
self.assertLess(data['expires_in'], 0)
self.assertEqual(data['token_type'], 'JWT')
self.assert_valid_jwt_access_token(
data['access_token'],
self.user,
data['scope'].split(' '),
should_be_expired=True
)
def test_restricted_access_token(self):
"""
Verify that an access_token generated for a RestrictedApplication fails when
submitted to an API endpoint
"""
response = self._post_request(self.user, self.restricted_dot_app)
self.assertEqual(response.status_code, 200)
data = json.loads(response.content)
self.assertIn('expires_in', data)
self.assertIn('access_token', data)
# the payload should indicate that the token is expired
self.assertLess(data['expires_in'], 0)
# try submitting this expired access_token to an API,
# and assert that it fails
self._assert_access_token_invalidated(data['access_token'])
def test_dot_access_token_provides_refresh_token(self): def test_dot_access_token_provides_refresh_token(self):
response = self._post_request(self.user, self.dot_app) response = self._post_request(self.user, self.dot_app)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
...@@ -197,6 +304,47 @@ class TestAuthorizationView(_DispatchingViewTestCase): ...@@ -197,6 +304,47 @@ class TestAuthorizationView(_DispatchingViewTestCase):
check_response = getattr(self, '_check_{}_response'.format(client_type)) check_response = getattr(self, '_check_{}_response'.format(client_type))
check_response(response) check_response(response)
def test_check_dot_authorization_page_get(self):
"""
Make sure we get the overridden Authorization page - not
the default django-oauth-toolkit when we perform a page load
"""
self.client.login(username=self.user.username, password='test')
response = self.client.get(
'/oauth2/authorize/',
{
'client_id': self.dot_app.client_id,
'response_type': 'code',
'state': 'random_state_string',
'redirect_uri': DUMMY_REDIRECT_URL,
'scope': 'profile'
},
follow=True,
)
# are the requested scopes on the page? We only requested 'profile', lets make
# sure the page only lists that one
self.assertContains(response, settings.OAUTH2_PROVIDER['SCOPES']['profile'])
self.assertNotContains(response, settings.OAUTH2_PROVIDER['SCOPES']['read'])
self.assertNotContains(response, settings.OAUTH2_PROVIDER['SCOPES']['write'])
self.assertNotContains(response, settings.OAUTH2_PROVIDER['SCOPES']['email'])
# is the application name specified?
self.assertContains(
response,
"Authorize {name}".format(name=self.dot_app.name)
)
# are the cancel and allow buttons on the page?
self.assertContains(
response,
'<button type="submit" class="btn btn-authorization-cancel" name="cancel"/>Cancel</button>'
)
self.assertContains(
response,
'<button type="submit" class="btn btn-authorization-allow" name="allow" value="Authorize"/>Allow</button>'
)
def _check_dot_response(self, response): def _check_dot_response(self, response):
""" """
Check that django-oauth-toolkit gives an appropriate authorization response. Check that django-oauth-toolkit gives an appropriate authorization response.
...@@ -327,12 +475,11 @@ class TestViewDispatch(TestCase): ...@@ -327,12 +475,11 @@ class TestViewDispatch(TestCase):
self.assertRaises(KeyError, view_object.get_view_for_backend, None) self.assertRaises(KeyError, view_object.get_view_for_backend, None)
class TestRevokeTokenView(_DispatchingViewTestCase): # pylint: disable=abstract-method class TestRevokeTokenView(AccessTokenLoginMixin, _DispatchingViewTestCase): # pylint: disable=abstract-method
""" """
Test class for RevokeTokenView Test class for RevokeTokenView
""" """
def setUp(self): def setUp(self):
self.login_with_access_token_url = reverse("login_with_access_token")
self.revoke_token_url = reverse('revoke_token') self.revoke_token_url = reverse('revoke_token')
self.access_token_url = reverse('access_token') self.access_token_url = reverse('access_token')
...@@ -374,27 +521,6 @@ class TestRevokeTokenView(_DispatchingViewTestCase): # pylint: disable=abstract ...@@ -374,27 +521,6 @@ class TestRevokeTokenView(_DispatchingViewTestCase): # pylint: disable=abstract
'token': token, 'token': token,
} }
def login_with_access_token(self):
"""
Login with access token and return response
"""
return self.client.post(
self.login_with_access_token_url,
HTTP_AUTHORIZATION="Bearer {0}".format(self.access_token)
)
def _assert_access_token_is_valid(self):
"""
Asserts that oauth assigned access_token is valid and usable
"""
self.assertEqual(self.login_with_access_token().status_code, 204)
def _assert_access_token_invalidated(self):
"""
Asserts that oauth assigned access_token is not valid
"""
self.assertEqual(self.login_with_access_token().status_code, 401)
def _assert_refresh_token_invalidated(self): def _assert_refresh_token_invalidated(self):
""" """
Asserts that oauth assigned refresh_token is not valid Asserts that oauth assigned refresh_token is not valid
......
...@@ -11,6 +11,7 @@ from collections import namedtuple ...@@ -11,6 +11,7 @@ from collections import namedtuple
import ddt import ddt
from datetime import datetime, timedelta from datetime import datetime, timedelta
from django.conf import settings
from django.conf.urls import patterns, url, include from django.conf.urls import patterns, url, include
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.http import HttpResponse from django.http import HttpResponse
...@@ -26,6 +27,7 @@ from rest_framework.test import APIRequestFactory, APIClient ...@@ -26,6 +27,7 @@ from rest_framework.test import APIRequestFactory, APIClient
from rest_framework.views import APIView from rest_framework.views import APIView
from rest_framework_oauth import permissions from rest_framework_oauth import permissions
from rest_framework_oauth.compat import oauth2_provider, oauth2_provider_scope from rest_framework_oauth.compat import oauth2_provider, oauth2_provider_scope
import unittest
from openedx.core.djangoapps.oauth_dispatch import adapters from openedx.core.djangoapps.oauth_dispatch import adapters
from openedx.core.lib.api import authentication from openedx.core.lib.api import authentication
...@@ -73,6 +75,7 @@ urlpatterns = patterns( ...@@ -73,6 +75,7 @@ urlpatterns = patterns(
@attr(shard=2) @attr(shard=2)
@ddt.ddt @ddt.ddt
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class OAuth2Tests(TestCase): class OAuth2Tests(TestCase):
"""OAuth 2.0 authentication""" """OAuth 2.0 authentication"""
urls = 'openedx.core.lib.api.tests.test_authentication' urls = 'openedx.core.lib.api.tests.test_authentication'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment