Commit e7cdcb9b by Renzo Lucioni Committed by GitHub

Merge pull request #12954 from edx/renzo/extract-token-generation

Unify JWT generation code
parents a873bf7c f6d7371d
"""Programs views for use with Studio.""" """Programs views for use with Studio."""
from django.conf import settings from django.conf import settings
from django.contrib.auth.decorators import login_required from django.contrib.auth.decorators import login_required
from django.core.exceptions import ImproperlyConfigured
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from django.http import Http404, JsonResponse from django.http import Http404, JsonResponse
from django.utils.decorators import method_decorator from django.utils.decorators import method_decorator
from django.views.generic import View from django.views.generic import View
from provider.oauth2.models import Client
from edxmako.shortcuts import render_to_response from edxmako.shortcuts import render_to_response
from openedx.core.djangoapps.programs.models import ProgramsApiConfig from openedx.core.djangoapps.programs.models import ProgramsApiConfig
from openedx.core.lib.token_utils import get_id_token from openedx.core.lib.token_utils import JwtBuilder
class ProgramAuthoringView(View): class ProgramAuthoringView(View):
...@@ -44,7 +46,24 @@ class ProgramsIdTokenView(View): ...@@ -44,7 +46,24 @@ class ProgramsIdTokenView(View):
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
"""Generate and return a token, if the integration is enabled.""" """Generate and return a token, if the integration is enabled."""
if ProgramsApiConfig.current().is_studio_tab_enabled: if ProgramsApiConfig.current().is_studio_tab_enabled:
id_token = get_id_token(request.user, 'programs') # TODO: Use the system's JWT_AUDIENCE and JWT_SECRET_KEY instead of client ID and name.
return JsonResponse({'id_token': id_token}) client_name = 'programs'
try:
client = Client.objects.get(name=client_name)
except Client.DoesNotExist:
raise ImproperlyConfigured(
'OAuth2 Client with name [{}] does not exist.'.format(client_name)
)
scopes = ['email', 'profile']
expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION
jwt = JwtBuilder(request.user, secret=client.client_secret).build_token(
scopes,
expires_in,
aud=client.client_id
)
return JsonResponse({'id_token': jwt})
else: else:
raise Http404 raise Http404
...@@ -147,18 +147,17 @@ class TestProgramsIdTokenView(ProgramsApiConfigMixin, SharedModuleStoreTestCase) ...@@ -147,18 +147,17 @@ class TestProgramsIdTokenView(ProgramsApiConfigMixin, SharedModuleStoreTestCase)
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
self.assertIn(settings.LOGIN_URL, response['Location']) self.assertIn(settings.LOGIN_URL, response['Location'])
@mock.patch('cms.djangoapps.contentstore.views.program.get_id_token', return_value='test-id-token') @mock.patch('cms.djangoapps.contentstore.views.program.JwtBuilder.build_token')
def test_config_enabled(self, mock_get_id_token): def test_config_enabled(self, mock_build_token):
""" """
Ensure the endpoint responds with a valid JSON payload when authoring Ensure the endpoint responds with a valid JSON payload when authoring
is enabled. is enabled.
""" """
mock_build_token.return_value = 'test-id-token'
ClientFactory(name=ProgramsApiConfig.OAUTH2_CLIENT_NAME, client_type=CONFIDENTIAL)
self.create_programs_config() self.create_programs_config()
response = self.client.get(self.path) response = self.client.get(self.path)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
payload = json.loads(response.content) payload = json.loads(response.content)
self.assertEqual(payload, {"id_token": "test-id-token"}) self.assertEqual(payload, {'id_token': 'test-id-token'})
# this comparison is a little long-handed because we need to compare user instances directly
user, client_name = mock_get_id_token.call_args[0]
self.assertEqual(user, self.user)
self.assertEqual(client_name, "programs")
...@@ -448,6 +448,9 @@ MICROSITE_DATABASE_TEMPLATE_CACHE_TTL = ENV_TOKENS.get( ...@@ -448,6 +448,9 @@ MICROSITE_DATABASE_TEMPLATE_CACHE_TTL = ENV_TOKENS.get(
# OpenID Connect issuer ID. Normally the URL of the authentication endpoint. # OpenID Connect issuer ID. Normally the URL of the authentication endpoint.
OAUTH_OIDC_ISSUER = ENV_TOKENS['OAUTH_OIDC_ISSUER'] OAUTH_OIDC_ISSUER = ENV_TOKENS['OAUTH_OIDC_ISSUER']
#### JWT configuration ####
JWT_AUTH.update(ENV_TOKENS.get('JWT_AUTH', {}))
######################## CUSTOM COURSES for EDX CONNECTOR ###################### ######################## CUSTOM COURSES for EDX CONNECTOR ######################
if FEATURES.get('CUSTOM_COURSES_EDX'): if FEATURES.get('CUSTOM_COURSES_EDX'):
INSTALLED_APPS += ('openedx.core.djangoapps.ccxcon',) INSTALLED_APPS += ('openedx.core.djangoapps.ccxcon',)
......
...@@ -75,6 +75,8 @@ from lms.envs.common import ( ...@@ -75,6 +75,8 @@ from lms.envs.common import (
# constants for redirects app # constants for redirects app
REDIRECT_CACHE_TIMEOUT, REDIRECT_CACHE_TIMEOUT,
REDIRECT_CACHE_KEY_PREFIX, REDIRECT_CACHE_KEY_PREFIX,
JWT_AUTH,
) )
from path import Path as path from path import Path as path
from warnings import simplefilter from warnings import simplefilter
......
...@@ -35,6 +35,7 @@ from lms.envs.test import ( ...@@ -35,6 +35,7 @@ from lms.envs.test import (
MEDIA_ROOT, MEDIA_ROOT,
MEDIA_URL, MEDIA_URL,
COMPREHENSIVE_THEME_DIRS, COMPREHENSIVE_THEME_DIRS,
JWT_AUTH,
) )
# mongo connection settings # mongo connection settings
......
...@@ -18,12 +18,13 @@ from django.conf import settings ...@@ -18,12 +18,13 @@ from django.conf import settings
from django.core.exceptions import ImproperlyConfigured from django.core.exceptions import ImproperlyConfigured
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from provider.oauth2.models import Client
from edxnotes.exceptions import EdxNotesParseError, EdxNotesServiceUnavailable from edxnotes.exceptions import EdxNotesParseError, EdxNotesServiceUnavailable
from edxnotes.plugins import EdxNotesTab from edxnotes.plugins import EdxNotesTab
from courseware.views.views import get_current_child from courseware.views.views import get_current_child
from courseware.access import has_access from courseware.access import has_access
from openedx.core.lib.token_utils import get_id_token from openedx.core.lib.token_utils import JwtBuilder
from student.models import anonymous_id_for_user from student.models import anonymous_id_for_user
from util.date_utils import get_default_time_display from util.date_utils import get_default_time_display
from xmodule.modulestore.django import modulestore from xmodule.modulestore.django import modulestore
...@@ -52,7 +53,19 @@ def get_edxnotes_id_token(user): ...@@ -52,7 +53,19 @@ def get_edxnotes_id_token(user):
""" """
Returns generated ID Token for edxnotes. Returns generated ID Token for edxnotes.
""" """
return get_id_token(user, CLIENT_NAME) # TODO: Use the system's JWT_AUDIENCE and JWT_SECRET_KEY instead of client ID and name.
try:
client = Client.objects.get(name=CLIENT_NAME)
except Client.DoesNotExist:
raise ImproperlyConfigured(
'OAuth2 Client with name [{}] does not exist.'.format(CLIENT_NAME)
)
scopes = ['email', 'profile']
expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION
jwt = JwtBuilder(user, secret=client.client_secret).build_token(scopes, expires_in, aud=client.client_id)
return jwt
def get_token_url(course_id): def get_token_url(course_id):
......
...@@ -4,6 +4,8 @@ OAuth Dispatch test mixins ...@@ -4,6 +4,8 @@ OAuth Dispatch test mixins
import jwt import jwt
from django.conf import settings from django.conf import settings
from student.models import UserProfile, anonymous_id_for_user
class AccessTokenMixin(object): class AccessTokenMixin(object):
""" Mixin for tests dealing with OAuth 2 access tokens. """ """ Mixin for tests dealing with OAuth 2 access tokens. """
...@@ -35,11 +37,21 @@ class AccessTokenMixin(object): ...@@ -35,11 +37,21 @@ class AccessTokenMixin(object):
'iss': issuer, 'iss': issuer,
'preferred_username': user.username, 'preferred_username': user.username,
'scopes': scopes, 'scopes': scopes,
'sub': anonymous_id_for_user(user, None),
} }
if 'email' in scopes: if 'email' in scopes:
expected['email'] = user.email expected['email'] = user.email
if 'profile' in scopes:
try:
name = UserProfile.objects.get(user=user).name
except UserProfile.DoesNotExist:
name = None
expected['name'] = name
expected['administrator'] = user.is_staff
self.assertDictContainsSubset(expected, payload) self.assertDictContainsSubset(expected, payload)
return payload return payload
...@@ -17,6 +17,7 @@ from edx_oauth2_provider import views as dop_views # django-oauth2-provider vie ...@@ -17,6 +17,7 @@ from edx_oauth2_provider import views as dop_views # django-oauth2-provider vie
from oauth2_provider import models as dot_models, views as dot_views # django-oauth-toolkit from oauth2_provider import models as dot_models, views as dot_views # django-oauth-toolkit
from openedx.core.djangoapps.theming import helpers from openedx.core.djangoapps.theming import helpers
from openedx.core.lib.token_utils import JwtBuilder
from . import adapters from . import adapters
...@@ -87,15 +88,6 @@ class AccessTokenView(_DispatchingView): ...@@ -87,15 +88,6 @@ class AccessTokenView(_DispatchingView):
dot_view = dot_views.TokenView dot_view = dot_views.TokenView
dop_view = dop_views.AccessTokenView dop_view = dop_views.AccessTokenView
@cached_property
def claim_handlers(self):
""" Returns a dictionary mapping scopes to methods that will add claims to the JWT payload. """
return {
'email': self._attach_email_claim,
'profile': self._attach_profile_claim
}
def dispatch(self, request, *args, **kwargs): def dispatch(self, request, *args, **kwargs):
response = super(AccessTokenView, self).dispatch(request, *args, **kwargs) response = super(AccessTokenView, self).dispatch(request, *args, **kwargs)
...@@ -103,7 +95,7 @@ class AccessTokenView(_DispatchingView): ...@@ -103,7 +95,7 @@ class AccessTokenView(_DispatchingView):
expires_in, scopes, user = self._decompose_access_token_response(request, response) expires_in, scopes, user = self._decompose_access_token_response(request, response)
content = { content = {
'access_token': self._generate_jwt(user, scopes, expires_in), 'access_token': JwtBuilder(user).build_token(scopes, expires_in),
'expires_in': expires_in, 'expires_in': expires_in,
'token_type': 'JWT', 'token_type': 'JWT',
'scope': ' '.join(scopes), 'scope': ' '.join(scopes),
...@@ -123,43 +115,6 @@ class AccessTokenView(_DispatchingView): ...@@ -123,43 +115,6 @@ class AccessTokenView(_DispatchingView):
expires_in = content['expires_in'] expires_in = content['expires_in']
return expires_in, scopes, user return expires_in, scopes, user
def _generate_jwt(self, user, scopes, expires_in):
""" Returns a JWT access token. """
now = int(time())
jwt_auth = helpers.get_value("JWT_AUTH", settings.JWT_AUTH)
payload = {
'iss': jwt_auth['JWT_ISSUER'],
'aud': jwt_auth['JWT_AUDIENCE'],
'exp': now + expires_in,
'iat': now,
'preferred_username': user.username,
'scopes': scopes,
}
for scope in scopes:
handler = self.claim_handlers.get(scope)
if handler:
handler(payload, user)
secret = jwt_auth['JWT_SECRET_KEY']
token = jwt.encode(payload, secret, algorithm=jwt_auth['JWT_ALGORITHM'])
return token
def _attach_email_claim(self, payload, user):
""" Add the email claim details to the JWT payload. """
payload['email'] = user.email
def _attach_profile_claim(self, payload, user):
""" Add the profile claim details to the JWT payload. """
payload.update({
'family_name': user.last_name,
'name': user.get_full_name(),
'given_name': user.first_name,
'administrator': user.is_staff,
})
class AuthorizationView(_DispatchingView): class AuthorizationView(_DispatchingView):
""" """
......
""" Course Discovery API Service. """ """ Course Discovery API Service. """
import datetime
import jwt
from django.conf import settings from django.conf import settings
from edx_rest_api_client.client import EdxRestApiClient from edx_rest_api_client.client import EdxRestApiClient
from openedx.core.djangoapps.theming import helpers from openedx.core.lib.token_utils import JwtBuilder
from student.models import UserProfile, anonymous_id_for_user
def get_id_token(user):
"""
Return a JWT for `user`, suitable for use with the course discovery service.
Arguments:
user (User): User for whom to generate the JWT.
Returns:
str: The JWT.
"""
try:
# Service users may not have user profiles.
full_name = UserProfile.objects.get(user=user).name
except UserProfile.DoesNotExist:
full_name = None
now = datetime.datetime.utcnow()
expires_in = getattr(settings, 'OAUTH_ID_TOKEN_EXPIRATION', 30)
payload = {
'preferred_username': user.username,
'name': full_name,
'email': user.email,
'administrator': user.is_staff,
'iss': helpers.get_value('OAUTH_OIDC_ISSUER', settings.OAUTH_OIDC_ISSUER),
'exp': now + datetime.timedelta(seconds=expires_in),
'iat': now,
'aud': helpers.get_value('JWT_AUTH', settings.JWT_AUTH)['JWT_AUDIENCE'],
'sub': anonymous_id_for_user(user, None),
}
secret_key = helpers.get_value('JWT_AUTH', settings.JWT_AUTH)['JWT_SECRET_KEY']
return jwt.encode(payload, secret_key).decode('utf-8')
def course_discovery_api_client(user): def course_discovery_api_client(user):
""" Returns a Course Discovery API client setup with authentication for the specified user. """ """ Returns a Course Discovery API client setup with authentication for the specified user. """
return EdxRestApiClient(settings.COURSE_CATALOG_API_URL, jwt=get_id_token(user)) scopes = ['email', 'profile']
expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION
jwt = JwtBuilder(user).build_token(scopes, expires_in)
return EdxRestApiClient(settings.COURSE_CATALOG_API_URL, jwt=jwt)
...@@ -5,13 +5,15 @@ from celery import task ...@@ -5,13 +5,15 @@ from celery import task
from celery.utils.log import get_task_logger # pylint: disable=no-name-in-module, import-error from celery.utils.log import get_task_logger # pylint: disable=no-name-in-module, import-error
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import ImproperlyConfigured
from edx_rest_api_client.client import EdxRestApiClient from edx_rest_api_client.client import EdxRestApiClient
from provider.oauth2.models import Client
from openedx.core.djangoapps.credentials.models import CredentialsApiConfig from openedx.core.djangoapps.credentials.models import CredentialsApiConfig
from openedx.core.djangoapps.credentials.utils import get_user_credentials from openedx.core.djangoapps.credentials.utils import get_user_credentials
from openedx.core.djangoapps.programs.models import ProgramsApiConfig from openedx.core.djangoapps.programs.models import ProgramsApiConfig
from openedx.core.djangoapps.programs.utils import ProgramProgressMeter from openedx.core.djangoapps.programs.utils import ProgramProgressMeter
from openedx.core.lib.token_utils import get_id_token from openedx.core.lib.token_utils import JwtBuilder
LOGGER = get_task_logger(__name__) LOGGER = get_task_logger(__name__)
...@@ -31,8 +33,21 @@ def get_api_client(api_config, student): ...@@ -31,8 +33,21 @@ def get_api_client(api_config, student):
EdxRestApiClient EdxRestApiClient
""" """
id_token = get_id_token(student, api_config.OAUTH2_CLIENT_NAME) # TODO: Use the system's JWT_AUDIENCE and JWT_SECRET_KEY instead of client ID and name.
return EdxRestApiClient(api_config.internal_api_url, jwt=id_token) client_name = api_config.OAUTH2_CLIENT_NAME
try:
client = Client.objects.get(name=client_name)
except Client.DoesNotExist:
raise ImproperlyConfigured(
'OAuth2 Client with name [{}] does not exist.'.format(client_name)
)
scopes = ['email', 'profile']
expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION
jwt = JwtBuilder(student, secret=client.client_secret).build_token(scopes, expires_in, aud=client.client_id)
return EdxRestApiClient(api_config.internal_api_url, jwt=jwt)
def get_completed_programs(student): def get_completed_programs(student):
......
...@@ -34,8 +34,8 @@ class GetApiClientTestCase(TestCase, ProgramsApiConfigMixin): ...@@ -34,8 +34,8 @@ class GetApiClientTestCase(TestCase, ProgramsApiConfigMixin):
Test the get_api_client function Test the get_api_client function
""" """
@mock.patch(TASKS_MODULE + '.get_id_token') @mock.patch(TASKS_MODULE + '.JwtBuilder.build_token')
def test_get_api_client(self, mock_get_id_token): def test_get_api_client(self, mock_build_token):
""" """
Ensure the function is making the right API calls based on inputs Ensure the function is making the right API calls based on inputs
""" """
...@@ -45,10 +45,9 @@ class GetApiClientTestCase(TestCase, ProgramsApiConfigMixin): ...@@ -45,10 +45,9 @@ class GetApiClientTestCase(TestCase, ProgramsApiConfigMixin):
internal_service_url='http://foo', internal_service_url='http://foo',
api_version_number=99, api_version_number=99,
) )
mock_get_id_token.return_value = 'test-token' mock_build_token.return_value = 'test-token'
api_client = tasks.get_api_client(api_config, student) api_client = tasks.get_api_client(api_config, student)
self.assertEqual(mock_get_id_token.call_args[0], (student, 'programs'))
self.assertEqual(api_client._store['base_url'], 'http://foo/api/v99/') # pylint: disable=protected-access self.assertEqual(api_client._store['base_url'], 'http://foo/api/v99/') # pylint: disable=protected-access
self.assertEqual(api_client._store['session'].auth.token, 'test-token') # pylint: disable=protected-access self.assertEqual(api_client._store['session'].auth.token, 'test-token') # pylint: disable=protected-access
......
...@@ -2,10 +2,13 @@ ...@@ -2,10 +2,13 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import logging import logging
from django.conf import settings
from django.core.cache import cache from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured
from edx_rest_api_client.client import EdxRestApiClient from edx_rest_api_client.client import EdxRestApiClient
from provider.oauth2.models import Client
from openedx.core.lib.token_utils import get_id_token from openedx.core.lib.token_utils import JwtBuilder
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -48,7 +51,20 @@ def get_edx_api_data(api_config, user, resource, ...@@ -48,7 +51,20 @@ def get_edx_api_data(api_config, user, resource,
try: try:
if not api: if not api:
jwt = get_id_token(user, api_config.OAUTH2_CLIENT_NAME) # TODO: Use the system's JWT_AUDIENCE and JWT_SECRET_KEY instead of client ID and name.
client_name = api_config.OAUTH2_CLIENT_NAME
try:
client = Client.objects.get(name=client_name)
except Client.DoesNotExist:
raise ImproperlyConfigured(
'OAuth2 Client with name [{}] does not exist.'.format(client_name)
)
scopes = ['email', 'profile']
expires_in = settings.OAUTH_ID_TOKEN_EXPIRATION
jwt = JwtBuilder(user, secret=client.client_secret).build_token(scopes, expires_in, aud=client.client_id)
api = EdxRestApiClient(api_config.internal_api_url, jwt=jwt) api = EdxRestApiClient(api_config.internal_api_url, jwt=jwt)
except: # pylint: disable=bare-except except: # pylint: disable=bare-except
log.exception('Failed to initialize the %s API client.', api_config.API_NAME) log.exception('Failed to initialize the %s API client.', api_config.API_NAME)
......
"""Tests covering utilities for working with ID tokens.""" """Tests covering JWT construction utilities."""
import calendar
import datetime
import ddt import ddt
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.test import TestCase from django.test import TestCase
from django.test.utils import override_settings
import freezegun
import jwt import jwt
from nose.plugins.attrib import attr from nose.plugins.attrib import attr
from edx_oauth2_provider.tests.factories import ClientFactory
from provider.constants import CONFIDENTIAL
from openedx.core.lib.token_utils import get_id_token from lms.djangoapps.oauth_dispatch.tests import mixins
from student.models import anonymous_id_for_user from openedx.core.lib.token_utils import JwtBuilder
from student.tests.factories import UserFactory, UserProfileFactory from student.tests.factories import UserFactory, UserProfileFactory
@attr('shard_2') @attr('shard_2')
@ddt.ddt @ddt.ddt
class TestIdTokenGeneration(TestCase): class TestJwtBuilder(mixins.AccessTokenMixin, TestCase):
"""Tests covering ID token generation.""" """
client_name = 'edx-dummy-client' Test class for JwtBuilder.
"""
def setUp(self):
super(TestIdTokenGeneration, self).setUp()
self.oauth2_client = ClientFactory(name=self.client_name, client_type=CONFIDENTIAL)
self.user = UserFactory.build()
self.user.save()
@override_settings(OAUTH_OIDC_ISSUER='test-issuer', OAUTH_ID_TOKEN_EXPIRATION=1)
@freezegun.freeze_time('2015-01-01 12:00:00')
@ddt.data(True, False)
def test_get_id_token(self, has_profile):
"""Verify that ID tokens are signed with the correct secret and generated with the correct claims."""
full_name = UserProfileFactory(user=self.user).name if has_profile else None
token = get_id_token(self.user, self.client_name) expires_in = 10
payload = jwt.decode( def setUp(self):
token, super(TestJwtBuilder, self).setUp()
self.oauth2_client.client_secret,
audience=self.oauth2_client.client_id, self.user = UserFactory()
issuer=settings.OAUTH_OIDC_ISSUER, self.profile = UserProfileFactory(user=self.user)
)
@ddt.data(
now = datetime.datetime.utcnow() [],
expiration = now + datetime.timedelta(seconds=settings.OAUTH_ID_TOKEN_EXPIRATION) ['email'],
['profile'],
expected_payload = { ['email', 'profile'],
'preferred_username': self.user.username, )
'name': full_name, def test_jwt_construction(self, scopes):
'email': self.user.email, """
'administrator': self.user.is_staff, Verify that a valid JWT is built, including claims for the requested scopes.
'iss': settings.OAUTH_OIDC_ISSUER, """
'exp': calendar.timegm(expiration.utctimetuple()), token = JwtBuilder(self.user).build_token(scopes, self.expires_in)
'iat': calendar.timegm(now.utctimetuple()), self.assert_valid_jwt_access_token(token, self.user, scopes)
'aud': self.oauth2_client.client_id,
'sub': anonymous_id_for_user(self.user, None), def test_user_profile_missing(self):
} """
Verify that token construction succeeds if the UserProfile is missing.
self.assertEqual(payload, expected_payload) """
self.profile.delete() # pylint: disable=no-member
def test_get_id_token_invalid_client(self):
"""Verify that ImproperlyConfigured is raised when an invalid client name is provided.""" scopes = ['profile']
with self.assertRaises(ImproperlyConfigured): token = JwtBuilder(self.user).build_token(scopes, self.expires_in)
get_id_token(self.user, 'does-not-exist') self.assert_valid_jwt_access_token(token, self.user, scopes)
def test_override_secret_and_audience(self):
"""
Verify that the signing key and audience can be overridden.
"""
secret = 'avoid-this'
audience = 'avoid-this-too'
scopes = []
token = JwtBuilder(self.user, secret=secret).build_token(scopes, self.expires_in, aud=audience)
jwt.decode(token, secret, audience=audience)
"""Utilities for working with ID tokens.""" """Utilities for working with ID tokens."""
import datetime from time import time
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import load_pem_private_key
from django.conf import settings from django.conf import settings
from django.core.exceptions import ImproperlyConfigured from django.utils.functional import cached_property
import jwt import jwt
from provider.oauth2.models import Client
from openedx.core.djangoapps.theming import helpers
from student.models import UserProfile, anonymous_id_for_user from student.models import UserProfile, anonymous_id_for_user
def get_id_token(user, client_name, secret_key=None): class JwtBuilder(object):
"""Construct a JWT for use with the named client. """Utility for building JWTs.
The JWT is signed with the named client's secret, and includes the following claims: Unifies diverse approaches to JWT creation in a single class. This utility defaults to using the system's
JWT configuration.
preferred_username (str): The user's username. The claim name is borrowed from edx-oauth2-provider. NOTE: This utility class will allow you to override the signing key and audience claim to support those
name (str): The user's full name. clients which still require this. This approach to JWT creation is DEPRECATED. Avoid doing this for new clients.
email (str): The user's email address.
administrator (Boolean): Whether the user has staff permissions.
iss (str): Registered claim. Identifies the principal that issued the JWT.
exp (int): Registered claim. Identifies the expiration time on or after which
the JWT must NOT be accepted for processing.
iat (int): Registered claim. Identifies the time at which the JWT was issued.
aud (str): Registered claim. Identifies the recipients that the JWT is intended for. This implementation
uses the named client's ID.
sub (int): Registered claim. Identifies the user. This implementation uses the raw user id.
Arguments: Arguments:
user (User): User for which to generate the JWT. user (User): User for which to generate the JWT.
client_name (unicode): Name of the OAuth2 Client for which the token is intended.
secret_key (str): Optional secret key for signing the JWT. Defaults to the configured client secret
if not provided.
Returns:
str: the JWT
Raises:
ImproperlyConfigured: If no OAuth2 Client with the provided name exists.
"""
try:
client = Client.objects.get(name=client_name)
except Client.DoesNotExist:
raise ImproperlyConfigured('OAuth2 Client with name [%s] does not exist' % client_name)
try:
# Service users may not have user profiles.
full_name = UserProfile.objects.get(user=user).name
except UserProfile.DoesNotExist:
full_name = None
now = datetime.datetime.utcnow()
expires_in = getattr(settings, 'OAUTH_ID_TOKEN_EXPIRATION', 30)
payload = {
'preferred_username': user.username,
'name': full_name,
'email': user.email,
'administrator': user.is_staff,
'iss': settings.OAUTH_OIDC_ISSUER,
'exp': now + datetime.timedelta(seconds=expires_in),
'iat': now,
'aud': client.client_id,
'sub': anonymous_id_for_user(user, None),
}
if secret_key is None:
secret_key = client.client_secret
return jwt.encode(payload, secret_key)
def get_asymmetric_token(user, client_id):
"""Construct a JWT signed with this app's private key.
The JWT includes the following claims:
preferred_username (str): The user's username. The claim name is borrowed from edx-oauth2-provider.
name (str): The user's full name.
email (str): The user's email address.
administrator (Boolean): Whether the user has staff permissions.
iss (str): Registered claim. Identifies the principal that issued the JWT.
exp (int): Registered claim. Identifies the expiration time on or after which
the JWT must NOT be accepted for processing.
iat (int): Registered claim. Identifies the time at which the JWT was issued.
sub (int): Registered claim. Identifies the user. This implementation uses the raw user id.
Arguments:
user (User): User for which to generate the JWT.
Returns:
str: the JWT
Keyword Arguments:
asymmetric (Boolean): Whether the JWT should be signed with this app's private key.
secret (string): Overrides configured JWT secret (signing) key. Unused if an asymmetric signature is requested.
""" """
private_key = load_pem_private_key(settings.PRIVATE_RSA_KEY, None, default_backend()) def __init__(self, user, asymmetric=False, secret=None):
self.user = user
try: self.asymmetric = asymmetric
# Service users may not have user profiles. self.secret = secret
full_name = UserProfile.objects.get(user=user).name self.jwt_auth = helpers.get_value('JWT_AUTH', settings.JWT_AUTH)
except UserProfile.DoesNotExist:
full_name = None def build_token(self, scopes, expires_in, aud=None):
"""Returns a JWT access token.
now = datetime.datetime.utcnow()
expires_in = getattr(settings, 'OAUTH_ID_TOKEN_EXPIRATION', 30) Arguments:
scopes (list): Scopes controlling which optional claims are included in the token.
payload = { expires_in (int): Time to token expiry, specified in seconds.
'preferred_username': user.username,
'name': full_name, Keyword Arguments:
'email': user.email, aud (string): Overrides configured JWT audience claim.
'administrator': user.is_staff, """
'iss': settings.OAUTH_OIDC_ISSUER, now = int(time())
'exp': now + datetime.timedelta(seconds=expires_in), payload = {
'iat': now, 'aud': aud if aud else self.jwt_auth['JWT_AUDIENCE'],
'aud': client_id, 'exp': now + expires_in,
'sub': anonymous_id_for_user(user, None), 'iat': now,
} 'iss': self.jwt_auth['JWT_ISSUER'],
'preferred_username': self.user.username,
return jwt.encode(payload, private_key, algorithm='RS512') 'scopes': scopes,
'sub': anonymous_id_for_user(self.user, None),
}
for scope in scopes:
handler = self.claim_handlers.get(scope)
if handler:
handler(payload)
return self.encode(payload)
@cached_property
def claim_handlers(self):
"""Returns a dictionary mapping scopes to methods that will add claims to the JWT payload."""
return {
'email': self.attach_email_claim,
'profile': self.attach_profile_claim
}
def attach_email_claim(self, payload):
"""Add the email claim details to the JWT payload."""
payload['email'] = self.user.email
def attach_profile_claim(self, payload):
"""Add the profile claim details to the JWT payload."""
try:
# Some users (e.g., service users) may not have user profiles.
name = UserProfile.objects.get(user=self.user).name
except UserProfile.DoesNotExist:
name = None
payload.update({
'name': name,
'administrator': self.user.is_staff,
})
def encode(self, payload):
"""Encode the provided payload."""
if self.asymmetric:
secret = load_pem_private_key(settings.PRIVATE_RSA_KEY, None, default_backend())
algorithm = 'RS512'
else:
secret = self.secret if self.secret else self.jwt_auth['JWT_SECRET_KEY']
algorithm = self.jwt_auth['JWT_ALGORITHM']
return jwt.encode(payload, secret, algorithm=algorithm)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment