Commit 4645c6ec by Braden MacDonald

Allow using a custom login/register form with third_party_auth

parent 1371b2e9
......@@ -57,6 +57,10 @@ rather than spreading them across two functions in the pipeline.
See http://psa.matiasaguirre.net/docs/pipeline.html for more docs.
"""
import base64
import hashlib
import hmac
import json
import random
import string
from collections import OrderedDict
......@@ -104,6 +108,18 @@ AUTH_ENTRY_ACCOUNT_SETTINGS = 'account_settings'
AUTH_ENTRY_LOGIN_API = 'login_api'
AUTH_ENTRY_REGISTER_API = 'register_api'
# AUTH_ENTRY_CUSTOM: Custom auth entry point for post-auth integrations.
# This should be a dict where the key is a word passed via ?auth_entry=, and the
# value is a dict with an arbitrary 'secret_key' and a 'url'.
# This can be used as an extension point to inject custom behavior into the auth
# process, replacing the registration/login form that would normally be seen
# immediately after the user has authenticated with the third party provider.
# If a custom 'auth_entry' query parameter is used, then once the user has
# authenticated with a specific backend/provider, they will be redirected to the
# URL specified with this setting, rather than to the built-in
# registration/login form/logic.
AUTH_ENTRY_CUSTOM = getattr(settings, 'THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS', {})
def is_api(auth_entry):
"""Returns whether the auth entry point is via an API call."""
......@@ -128,7 +144,7 @@ _AUTH_ENTRY_CHOICES = frozenset([
AUTH_ENTRY_ACCOUNT_SETTINGS,
AUTH_ENTRY_LOGIN_API,
AUTH_ENTRY_REGISTER_API,
])
] + AUTH_ENTRY_CUSTOM.keys())
_DEFAULT_RANDOM_PASSWORD_LENGTH = 12
_PASSWORD_CHARSET = string.letters + string.digits
......@@ -445,6 +461,33 @@ def set_pipeline_timeout(strategy, user, *args, **kwargs):
# choice of the user.
def redirect_to_custom_form(request, auth_entry, user_details):
"""
If auth_entry is found in AUTH_ENTRY_CUSTOM, this is used to send provider
data to an external server's registration/login page.
The data is sent as a base64-encoded values in a POST request and includes
a cryptographic checksum in case the integrity of the data is important.
"""
form_info = AUTH_ENTRY_CUSTOM[auth_entry]
secret_key = form_info['secret_key']
if isinstance(secret_key, unicode):
secret_key = secret_key.encode('utf-8')
custom_form_url = form_info['url']
data_str = json.dumps({
"user_details": user_details
})
digest = hmac.new(secret_key, msg=data_str, digestmod=hashlib.sha256).digest()
# Store the data in the session temporarily, then redirect to a page that will POST it to
# the custom login/register page.
request.session['tpa_custom_auth_entry_data'] = {
'data': base64.b64encode(data_str),
'hmac': base64.b64encode(digest),
'post_url': custom_form_url,
}
return redirect(reverse('tpa_post_to_custom_auth_form'))
@partial.partial
def ensure_user_information(strategy, auth_entry, backend=None, user=None, social=None,
allow_inactive_user=False, *args, **kwargs):
......@@ -492,6 +535,9 @@ def ensure_user_information(strategy, auth_entry, backend=None, user=None, socia
return dispatch_to_register()
elif auth_entry == AUTH_ENTRY_ACCOUNT_SETTINGS:
raise AuthEntryError(backend, 'auth_entry is wrong. Settings requires a user.')
elif auth_entry in AUTH_ENTRY_CUSTOM:
# Pass the username, email, etc. via query params to the custom entry page:
return redirect_to_custom_form(strategy.request, auth_entry, kwargs['details'])
else:
raise AuthEntryError(backend, 'auth_entry invalid')
......
......@@ -3,6 +3,7 @@ A custom Strategy for python-social-auth that allows us to fetch configuration f
ConfigurationModels rather than django.settings
"""
from .models import OAuth2ProviderConfig
from .pipeline import AUTH_ENTRY_CUSTOM
from social.backends.oauth import OAuthAuth
from social.strategies.django_strategy import DjangoStrategy
......@@ -31,6 +32,15 @@ class ConfigurationModelStrategy(DjangoStrategy):
return provider_config.get_setting(name)
except KeyError:
pass
# special case handling of login error URL if we're using a custom auth entry point:
if name == 'LOGIN_ERROR_URL':
auth_entry = self.request.session.get('auth_entry')
if auth_entry and auth_entry in AUTH_ENTRY_CUSTOM:
error_url = AUTH_ENTRY_CUSTOM[auth_entry].get('error_url')
if error_url:
return error_url
# At this point, we know 'name' is not set in a [OAuth2|LTI|SAML]ProviderConfig row.
# It's probably a global Django setting like 'FIELDS_STORED_IN_SESSION':
return super(ConfigurationModelStrategy, self).setting(name, default, backend)
{% load i18n %}
<!DOCTYPE html>
<html lang="en">
<head>
<title>{% trans "Please wait" %}</title>
<style type="text/css">
#djDebug {display:none;}
</style>
</head>
<body>
<form id="sso-data-form" action="{{post_url}}" method="post">
{% csrf_token %}
<input type="hidden" name="sso_data" value="{{data}}">
<input type="hidden" name="sso_data_hmac" value="{{hmac}}">
<noscript>
<input id="submit-button" type="submit" value="Click to continue" autofocus>
</noscript>
</form>
<script>
document.getElementById('sso-data-form').submit();
</script>
</body>
</html>
"""Integration tests for Google providers."""
import base64
import hashlib
import hmac
from django.conf import settings
from django.core.urlresolvers import reverse
import json
from mock import patch
from social.exceptions import AuthException
from student.tests.factories import UserFactory
from third_party_auth import pipeline
from third_party_auth.tests.specs import base
......@@ -34,3 +43,90 @@ class GoogleOauth2IntegrationTest(base.Oauth2IntegrationTest):
def get_username(self):
return self.get_response_data().get('email').split('@')[0]
def assert_redirect_to_provider_looks_correct(self, response):
super(GoogleOauth2IntegrationTest, self).assert_redirect_to_provider_looks_correct(response)
self.assertIn('google.com', response['Location'])
def test_custom_form(self):
"""
Use the Google provider to test the custom login/register form feature.
"""
# The pipeline starts by a user GETting /auth/login/google-oauth2/?auth_entry=custom1
# Synthesize that request and check that it redirects to the correct
# provider page.
auth_entry = 'custom1' # See definition in lms/envs/test.py
login_url = pipeline.get_login_url(self.provider.provider_id, auth_entry)
login_url += "&next=/misc/final-destination"
self.assert_redirect_to_provider_looks_correct(self.client.get(login_url))
def fake_auth_complete(inst, *args, **kwargs):
""" Mock the backend's auth_complete() method """
kwargs.update({'response': self.get_response_data(), 'backend': inst})
return inst.strategy.authenticate(*args, **kwargs)
# Next, the provider makes a request against /auth/complete/<provider>.
complete_url = pipeline.get_complete_url(self.provider.backend_name)
with patch.object(self.provider.backend_class, 'auth_complete', fake_auth_complete):
response = self.client.get(complete_url)
# This should redirect to the custom login/register form:
self.assertEqual(response.status_code, 302)
self.assertEqual(response['Location'], 'http://example.none/auth/custom_auth_entry')
response = self.client.get(response['Location'])
self.assertEqual(response.status_code, 200)
self.assertIn('action="/misc/my-custom-registration-form" method="post"', response.content)
data_decoded = base64.b64decode(response.context['data']) # pylint: disable=no-member
data_parsed = json.loads(data_decoded)
# The user's details get passed to the custom page as a base64 encoded query parameter:
self.assertEqual(data_parsed, {
'user_details': {
'username': 'email_value',
'email': 'email_value@example.com',
'fullname': 'name_value',
'first_name': 'given_name_value',
'last_name': 'family_name_value',
}
})
# Check the hash that is used to confirm the user's data in the GET parameter is correct
secret_key = settings.THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS['custom1']['secret_key']
hmac_expected = hmac.new(secret_key, msg=data_decoded, digestmod=hashlib.sha256).digest()
self.assertEqual(base64.b64decode(response.context['hmac']), hmac_expected) # pylint: disable=no-member
# Now our custom registration form creates or logs in the user:
email, password = data_parsed['user_details']['email'], 'random_password'
created_user = UserFactory(email=email, password=password)
login_response = self.client.post(reverse('login'), {'email': email, 'password': password})
self.assertEqual(login_response.status_code, 200)
# Now our custom login/registration page must resume the pipeline:
response = self.client.get(complete_url)
self.assertEqual(response.status_code, 302)
self.assertEqual(response['Location'], 'http://example.none/misc/final-destination')
_, strategy = self.get_request_and_strategy()
self.assert_social_auth_exists_for_user(created_user, strategy)
def test_custom_form_error(self):
"""
Use the Google provider to test the custom login/register failure redirects.
"""
# The pipeline starts by a user GETting /auth/login/google-oauth2/?auth_entry=custom1
# Synthesize that request and check that it redirects to the correct
# provider page.
auth_entry = 'custom1' # See definition in lms/envs/test.py
login_url = pipeline.get_login_url(self.provider.provider_id, auth_entry)
login_url += "&next=/misc/final-destination"
self.assert_redirect_to_provider_looks_correct(self.client.get(login_url))
def fake_auth_complete_error(_inst, *_args, **_kwargs):
""" Mock the backend's auth_complete() method """
raise AuthException("Mock login failed")
# Next, the provider makes a request against /auth/complete/<provider>.
complete_url = pipeline.get_complete_url(self.provider.backend_name)
with patch.object(self.provider.backend_class, 'auth_complete', fake_auth_complete_error):
response = self.client.get(complete_url)
# This should redirect to the custom error URL
self.assertEqual(response.status_code, 302)
self.assertEqual(response['Location'], 'http://example.none/misc/my-custom-sso-error-page')
......@@ -2,11 +2,12 @@
from django.conf.urls import include, patterns, url
from .views import inactive_user_view, saml_metadata_view, lti_login_and_complete_view
from .views import inactive_user_view, saml_metadata_view, lti_login_and_complete_view, post_to_custom_auth_form
urlpatterns = patterns(
'',
url(r'^auth/inactive', inactive_user_view, name="third_party_inactive_redirect"),
url(r'^auth/custom_auth_entry', post_to_custom_auth_form, name='tpa_post_to_custom_auth_form'),
url(r'^auth/saml/metadata.xml', saml_metadata_view),
url(r'^auth/login/(?P<backend>lti)/$', lti_login_and_complete_view),
url(r'^auth/', include('social.apps.django_app.urls', namespace='social')),
......
......@@ -4,7 +4,7 @@ Extra views required for SSO
from django.conf import settings
from django.core.urlresolvers import reverse
from django.http import HttpResponse, HttpResponseServerError, Http404, HttpResponseNotAllowed
from django.shortcuts import redirect
from django.shortcuts import redirect, render
from django.views.decorators.csrf import csrf_exempt
import social
from social.apps.django_app.views import complete
......@@ -59,3 +59,26 @@ def lti_login_and_complete_view(request, backend, *args, **kwargs):
request.backend.start()
return complete(request, backend, *args, **kwargs)
def post_to_custom_auth_form(request):
"""
Redirect to a custom login/register page.
Since we can't do a redirect-to-POST, this view is used to pass SSO data from
the third_party_auth pipeline to a custom login/register form (possibly on another server).
"""
pipeline_data = request.session.pop('tpa_custom_auth_entry_data', None)
if not pipeline_data:
raise Http404
# Verify the format of pipeline_data:
data = {
'post_url': pipeline_data['post_url'],
# The user's name, email, etc. as base64 encoded JSON
# It's base64 encoded because it's signed cryptographically and we don't want whitespace
# or ordering issues affecting the hash/signature.
'data': pipeline_data['data'],
# The cryptographic hash of user_data:
'hmac': pipeline_data['hmac'],
}
return render(request, 'third_party_auth/post_custom_auth_entry.html', data)
......@@ -592,6 +592,11 @@ if FEATURES.get('ENABLE_THIRD_PARTY_AUTH'):
'schedule': datetime.timedelta(hours=ENV_TOKENS.get('THIRD_PARTY_AUTH_SAML_FETCH_PERIOD_HOURS', 24)),
}
# The following can be used to integrate a custom login form with third_party_auth.
# It should be a dict where the key is a word passed via ?auth_entry=, and the value is a
# dict with an arbitrary 'secret_key' and a 'url'.
THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS = AUTH_TOKENS.get('THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS', {})
##### OAUTH2 Provider ##############
if FEATURES.get('ENABLE_OAUTH2_PROVIDER'):
OAUTH_OIDC_ISSUER = ENV_TOKENS['OAUTH_OIDC_ISSUER']
......
......@@ -273,6 +273,14 @@ AUTHENTICATION_BACKENDS = (
'third_party_auth.lti.LTIAuthBackend',
) + AUTHENTICATION_BACKENDS
THIRD_PARTY_AUTH_CUSTOM_AUTH_FORMS = {
'custom1': {
'secret_key': 'opensesame',
'url': '/misc/my-custom-registration-form',
'error_url': '/misc/my-custom-sso-error-page'
},
}
################################## OPENID #####################################
FEATURES['AUTH_USE_OPENID'] = True
FEATURES['AUTH_USE_OPENID_PROVIDER'] = True
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment