Commit 6d327594 by chrisndodge

add restricted DOT OAUTH2 client with reduced privileges

parent 0928cfde
......@@ -1201,3 +1201,10 @@ DOC_LINK_BASE_URL = None
# Theme directory locale paths
COMPREHENSIVE_THEME_LOCALE_PATHS = []
# This is required for the migrations in oauth_dispatch.models
# otherwise it fails saying this attribute is not present in Settings
# Although Studio does not exable OAuth2 Provider capability, the new approach
# to generating test databases will discover and try to create all tables
# and this setting needs to be present
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
......@@ -331,7 +331,3 @@ FEATURES['CUSTOM_COURSES_EDX'] = True
# API access management -- needed for simple-history to run.
INSTALLED_APPS += ('openedx.core.djangoapps.api_admin',)
# Set the default Oauth2 Provider Model so that migrations can run in
# verbose mode
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
......@@ -451,9 +451,14 @@ OAUTH2_PROVIDER = {
'read': 'Read scope',
'write': 'Write scope',
'email': 'Email scope',
'profile': 'Profile scope',
}
# conform profile scope message that is presented to end-user
# to lms/templates/provider/authorize.html. This may be revised later.
'profile': 'Read your user profile',
},
}
# This is required for the migrations in oauth_dispatch.models
# otherwise it fails saying this attribute is not present in Settings
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
################################## TEMPLATE CONFIGURATION #####################################
# Mako templating
......
......@@ -582,10 +582,6 @@ JWT_AUTH.update({
'JWT_AUDIENCE': 'test-key',
})
# Set the default Oauth2 Provider Model so that migrations can run in
# verbose mode
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
COURSE_CATALOG_API_URL = 'https://catalog.example.com/api/v1'
COMPREHENSIVE_THEME_DIRS = [REPO_ROOT / "themes", REPO_ROOT / "common/test"]
......
......@@ -15,3 +15,41 @@
}
}
// For the django-oauth-toolkit Authorization view
.wrapper-authorize {
background: $white;
width: 60%;
padding-right: 140px;
padding-left: 140px;
font-family: $sans-serif;
h1 {
@extend %t-title4;
margin-bottom: 0;
margin-left: 0;
padding: $baseline;
padding-left: 0px;
@include text-align(left);
}
p {
@extend %t-copy-base;
margin: $baseline/2 0;
}
.control-group {
float: right;
}
.btn-authorization-allow {
@extend %btn-primary-blue;
margin-left: 20px;
line-height: 0.7em;
}
.btn-authorization-cancel {
@extend %btn-secondary-blue-outline;
}
}
{% extends "main_django.html" %}
{% load i18n configuration %}
{% block title %}
{% trans "Authorize" %} | {% platform_name %}
{% endblock %}
{% block body %}
<main id="main" aria-label="Content" tabindex="-1">
<section class="container authorize {{ selected_tab }}" id="authorize-content">
<div class="wrapper-authorize">
<div class="block-center">
{% if not error %}
<form id="authorizationForm" method="post">
<h1 class="block-center-heading">{% trans "Authorize" %} {{ application.name }}?</h1>
{% csrf_token %}
{% for field in form %}
{% if field.is_hidden %}
{{ field }}
{% endif %}
{% endfor %}
<p>{% trans "The above application requests the following permissions from your account:" %}</p>
<ul>
{% for scope in scopes_descriptions %}
<li>{{ scope }}</li>
{% endfor %}
</ul>
<p>{% trans "Please click the 'Allow' button to grant these permissions to the above application. Otherwise, to withhold these permissions, please click the 'Cancel' button." %}
</p>
{{ form.errors }}
{{ form.non_field_errors }}
<div class="control-group">
<div class="controls">
<button type="submit" class="btn btn-authorization-cancel" name="cancel"/>{% trans "Cancel" %}</button><button type="submit" class="btn btn-authorization-allow" name="allow" value="Authorize"/>{% trans "Allow" %}</button>
</div>
</div>
</form>
{% else %}
<h2>{% trans "Error" %}: {{ error.error }}</h2>
<p>{{ error.description }}</p>
{% endif %}
</div>
</div>
</section>
</main>
{% endblock %}
......@@ -5,6 +5,8 @@ Override admin configuration for django-oauth-toolkit
from django.contrib.admin import ModelAdmin, site
from oauth2_provider import models
from .models import RestrictedApplication
def reregister(model_class):
"""
......@@ -71,3 +73,13 @@ class DOTGrantAdmin(ModelAdmin):
list_filter = [u'application']
raw_id_fields = [u'user']
search_fields = [u'code', u'user__username']
class RestrictedApplicationAdmin(ModelAdmin):
"""
ModelAdmin for the Restricted Application
"""
list_display = [u'application']
site.register(RestrictedApplication, RestrictedApplicationAdmin)
......@@ -3,10 +3,32 @@ Classes that override default django-oauth-toolkit behavior
"""
from __future__ import unicode_literals
from .models import RestrictedApplication
from datetime import datetime
from django.contrib.auth import authenticate, get_user_model
from django.db.models.signals import pre_save
from django.dispatch import receiver
from pytz import utc
from oauth2_provider.models import AccessToken
from oauth2_provider.oauth2_validators import OAuth2Validator
@receiver(pre_save, sender=AccessToken)
def on_access_token_presave(sender, instance, *args, **kwargs): # pylint: disable=unused-argument
"""
A hook on the AccessToken. Since we do not have protected scopes, we must mark all
AccessTokens as expired for 'restricted applications'.
We do this as a pre-save hook on the ORM
"""
is_application_restricted = RestrictedApplication.objects.filter(application=instance.application).exists()
if is_application_restricted:
RestrictedApplication.set_access_token_as_expired(instance)
class EdxOAuth2Validator(OAuth2Validator):
"""
Validator class that implements edX-specific custom behavior:
......@@ -61,6 +83,23 @@ class EdxOAuth2Validator(OAuth2Validator):
super(EdxOAuth2Validator, self).save_bearer_token(token, request, *args, **kwargs)
is_application_restricted = RestrictedApplication.objects.filter(application=request.client).exists()
if is_application_restricted:
# Since RestrictedApplications will override the DOT defined expiry, so that access_tokens
# are always expired, we need to re-read the token from the database and then calculate the
# expires_in (in seconds) from what we stored in the database. This value should be a negative
#value, meaning that it is already expired
access_token = AccessToken.objects.get(token=token['access_token'])
utc_now = datetime.utcnow().replace(tzinfo=utc)
expires_in = (access_token.expires - utc_now).total_seconds()
# assert that RestriectedApplications only issue expired tokens
# blow up processing if we see otherwise
assert expires_in < 0
token['expires_in'] = expires_in
# Restore the original request attributes
request.grant_type = grant_type
request.user = user
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
from django.conf import settings
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.OAUTH2_PROVIDER_APPLICATION_MODEL),
]
operations = [
migrations.CreateModel(
name='RestrictedApplication',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('application', models.ForeignKey(to=settings.OAUTH2_PROVIDER_APPLICATION_MODEL)),
],
),
]
"""
Specialized models for oauth_dispatch djangoapp
"""
from datetime import datetime
from django.db import models
from pytz import utc
from oauth2_provider.settings import oauth2_settings
class RestrictedApplication(models.Model):
"""
This model lists which django-oauth-toolkit Applications are considered 'restricted'
and thus have a limited ability to use various APIs.
A restricted Application will only get expired token/JWT payloads
so that they cannot be used to call into APIs.
"""
application = models.ForeignKey(oauth2_settings.APPLICATION_MODEL, null=False)
def __unicode__(self):
"""
Return a unicode representation of this object
"""
return u"<RestrictedApplication '{name}'>".format(
name=self.application.name
)
@classmethod
def set_access_token_as_expired(cls, access_token):
"""
For access_tokens for RestrictedApplications, put the expire timestamp into the beginning of the epoch
which is Jan. 1, 1970
"""
access_token.expires = datetime(1970, 1, 1, tzinfo=utc)
@classmethod
def verify_access_token_as_expired(cls, access_token):
"""
For access_tokens for RestrictedApplications, make sure that the expiry date
is set at the beginning of the epoch which is Jan. 1, 1970
"""
return access_token.expires == datetime(1970, 1, 1, tzinfo=utc)
......@@ -3,3 +3,4 @@ Constants for testing purposes
"""
DUMMY_REDIRECT_URL = u'https://example.com/edx/redirect'
DUMMY_REDIRECT_URL2 = u'https://example.com/edx/other-redirect'
"""
OAuth Dispatch test mixins
"""
import jwt
from django.conf import settings
import jwt
from jwt.exceptions import ExpiredSignatureError
from student.models import UserProfile, anonymous_id_for_user
class AccessTokenMixin(object):
""" Mixin for tests dealing with OAuth 2 access tokens. """
def assert_valid_jwt_access_token(self, access_token, user, scopes=None):
def assert_valid_jwt_access_token(self, access_token, user, scopes=None, should_be_expired=False):
"""
Verify the specified JWT access token is valid, and belongs to the specified user.
Args:
access_token (str): JWT
user (User): User whose information is contained in the JWT payload.
(optional) should_be_expired: indicates if the passed in JWT token is expected to be expired
Returns:
dict: Decoded JWT payload
......@@ -24,13 +28,27 @@ class AccessTokenMixin(object):
scopes = scopes or []
audience = settings.JWT_AUTH['JWT_AUDIENCE']
issuer = settings.JWT_AUTH['JWT_ISSUER']
payload = jwt.decode(
access_token,
settings.JWT_AUTH['JWT_SECRET_KEY'],
algorithms=[settings.JWT_AUTH['JWT_ALGORITHM']],
audience=audience,
issuer=issuer
)
def _decode_jwt(verify_expiration):
"""
Helper method to decode a JWT with the ability to
verify the expiration of said token
"""
return jwt.decode(
access_token,
settings.JWT_AUTH['JWT_SECRET_KEY'],
algorithms=[settings.JWT_AUTH['JWT_ALGORITHM']],
audience=audience,
issuer=issuer,
verify_expiration=verify_expiration
)
# Note that if we expect the claims to have expired
# then we ask the JWT library not to verify expiration
# as that would throw a ExpiredSignatureError and
# halt other verifications steps. We'll do a manual
# expiry verification later on
payload = _decode_jwt(verify_expiration=not should_be_expired)
expected = {
'aud': audience,
......@@ -54,4 +72,13 @@ class AccessTokenMixin(object):
self.assertDictContainsSubset(expected, payload)
# Since we suppressed checking of expiry
# in the claim in the above check, because we want
# to fully examine the claims outside of the expiry,
# now we should assert that the claim is indeed
# expired
if should_be_expired:
with self.assertRaises(ExpiredSignatureError):
_decode_jwt(verify_expiration=True)
return payload
......@@ -5,17 +5,21 @@ Tests for DOT Adapter
from datetime import timedelta
import ddt
from django.conf import settings
from django.test import TestCase
from django.utils.timezone import now
from oauth2_provider import models
import unittest
from student.tests.factories import UserFactory
from ..adapters import DOTAdapter
from .constants import DUMMY_REDIRECT_URL
from .constants import DUMMY_REDIRECT_URL, DUMMY_REDIRECT_URL2
from ..models import RestrictedApplication
@ddt.ddt
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class DOTAdapterTestCase(TestCase):
"""
Test class for DOTAdapter.
......@@ -38,6 +42,21 @@ class DOTAdapterTestCase(TestCase):
redirect_uri=DUMMY_REDIRECT_URL,
client_id='confidential-client-id',
)
self.restricted_client = self.adapter.create_confidential_client(
name='restricted app',
user=self.user,
redirect_uri=DUMMY_REDIRECT_URL2,
client_id='restricted-client-id',
)
self.restricted_app = RestrictedApplication.objects.create(application=self.restricted_client)
def test_restricted_app_unicode(self):
"""
Make sure unicode representation of RestrictedApplication is correct
"""
self.assertEqual(unicode(self.restricted_app), u"<RestrictedApplication '{name}'>".format(
name=self.restricted_client.name
))
@ddt.data(
('confidential', models.Application.CLIENT_CONFIDENTIAL),
......@@ -51,7 +70,14 @@ class DOTAdapterTestCase(TestCase):
self.assertEqual(client.client_type, client_type)
def test_get_client(self):
client = self.adapter.get_client(client_type=models.Application.CLIENT_CONFIDENTIAL)
"""
Read back one of the confidential clients (there are two)
and verify that we get back what we expected
"""
client = self.adapter.get_client(
redirect_uris=DUMMY_REDIRECT_URL,
client_type=models.Application.CLIENT_CONFIDENTIAL
)
self.assertIsInstance(client, models.Application)
self.assertEqual(client.client_type, models.Application.CLIENT_CONFIDENTIAL)
......@@ -74,3 +100,18 @@ class DOTAdapterTestCase(TestCase):
expires=now() + timedelta(days=30),
)
self.assertEqual(self.adapter.get_access_token(token_string='token-id'), token)
def test_get_restricted_access_token(self):
"""
Make sure when generating an access_token for a restricted client
that the token is immediately expired
"""
models.AccessToken.objects.create(
token='expired-token-id',
application=self.restricted_client,
user=self.user,
expires=now() + timedelta(days=30),
)
readback_token = self.adapter.get_access_token(token_string='expired-token-id')
self.assertTrue(RestrictedApplication.verify_access_token_as_expired(readback_token))
# pylint: disable=missing-docstring
from django.conf import settings
from django.test import TestCase
from oauth2_provider.models import Application, AccessToken, RefreshToken
import unittest
from openedx.core.djangoapps.oauth_dispatch.tests import factories
from student.tests.factories import UserFactory
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class TestClientFactory(TestCase):
def setUp(self):
super(TestClientFactory, self).setUp()
......@@ -18,6 +21,7 @@ class TestClientFactory(TestCase):
self.assertEqual(actual_application, expected_application)
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class TestAccessTokenFactory(TestCase):
def setUp(self):
super(TestAccessTokenFactory, self).setUp()
......@@ -30,6 +34,7 @@ class TestAccessTokenFactory(TestCase):
self.assertEqual(actual_access_token, expected_access_token)
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class TestRefreshTokenFactory(TestCase):
def setUp(self):
super(TestRefreshTokenFactory, self).setUp()
......
......@@ -11,6 +11,7 @@ from collections import namedtuple
import ddt
from datetime import datetime, timedelta
from django.conf import settings
from django.conf.urls import patterns, url, include
from django.contrib.auth.models import User
from django.http import HttpResponse
......@@ -26,6 +27,7 @@ from rest_framework.test import APIRequestFactory, APIClient
from rest_framework.views import APIView
from rest_framework_oauth import permissions
from rest_framework_oauth.compat import oauth2_provider, oauth2_provider_scope
import unittest
from openedx.core.djangoapps.oauth_dispatch import adapters
from openedx.core.lib.api import authentication
......@@ -73,6 +75,7 @@ urlpatterns = patterns(
@attr(shard=2)
@ddt.ddt
@unittest.skipUnless(settings.FEATURES.get("ENABLE_OAUTH2_PROVIDER"), "OAuth2 not enabled")
class OAuth2Tests(TestCase):
"""OAuth 2.0 authentication"""
urls = 'openedx.core.lib.api.tests.test_authentication'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment