Commit ce0b6407 by John Cox

Add third-party auth implementation and tests

parent 83853098
......@@ -141,3 +141,4 @@ William Desloge <william.desloge@ionis-group.com>
Marco Re <mrc.re@tiscali.it>
Jonas Jelten <jelten@in.tum.de>
Christine Lytwynec <clytwynec@edx.org>
John Cox <johncox@google.com>
......@@ -16,6 +16,7 @@ from django.contrib.auth import logout, authenticate, login
from django.contrib.auth.models import User, AnonymousUser
from django.contrib.auth.decorators import login_required
from django.contrib.auth.views import password_reset_confirm
from django.contrib import messages
from django.core.cache import cache
from django.core.context_processors import csrf
from django.core.mail import send_mail
......@@ -85,6 +86,8 @@ from util.password_policy_validators import (
validate_password_dictionary
)
from third_party_auth import pipeline, provider
log = logging.getLogger("edx.student")
AUDIT_LOG = logging.getLogger("audit")
......@@ -363,11 +366,16 @@ def signin_user(request):
context = {
'course_id': request.GET.get('course_id'),
'enrollment_action': request.GET.get('enrollment_action'),
# Bool injected into JS to submit form if we're inside a running third-
# party auth pipeline; distinct from the actual instance of the running
# pipeline, if any.
'pipeline_running': 'true' if pipeline.running(request) else 'false',
'platform_name': microsite.get_value(
'platform_name',
settings.PLATFORM_NAME
),
}
return render_to_response('login.html', context)
......@@ -385,17 +393,34 @@ def register_user(request, extra_context=None):
context = {
'course_id': request.GET.get('course_id'),
'email': '',
'enrollment_action': request.GET.get('enrollment_action'),
'name': '',
'running_pipeline': None,
'platform_name': microsite.get_value(
'platform_name',
settings.PLATFORM_NAME
),
'selected_provider': '',
'username': '',
}
if extra_context is not None:
context.update(extra_context)
if context.get("extauth_domain", '').startswith(external_auth.views.SHIBBOLETH_DOMAIN_PREFIX):
return render_to_response('register-shib.html', context)
# If third-party auth is enabled, prepopulate the form with data from the
# selected provider.
if settings.FEATURES.get('ENABLE_THIRD_PARTY_AUTH') and pipeline.running(request):
running_pipeline = pipeline.get(request)
current_provider = provider.Registry.get_by_backend_name(running_pipeline.get('backend'))
overrides = current_provider.get_register_form_data(running_pipeline.get('kwargs'))
overrides['running_pipeline'] = running_pipeline
overrides['selected_provider'] = current_provider.NAME
context.update(overrides)
return render_to_response('register.html', context)
......@@ -532,8 +557,17 @@ def dashboard(request):
'language_options': language_options,
'current_language': current_language,
'current_language_code': cur_lang_code,
'user': user,
'duplicate_provider': None,
'logout_url': reverse(logout_user),
'platform_name': settings.PLATFORM_NAME,
'provider_states': [],
}
if settings.FEATURES.get('ENABLE_THIRD_PARTY_AUTH'):
context['duplicate_provider'] = pipeline.get_duplicate_provider(messages.get_messages(request))
context['provider_user_states'] = pipeline.get_provider_user_states(user)
return render_to_response('dashboard.html', context)
......@@ -690,6 +724,7 @@ def accounts_login(request):
return external_auth.views.course_specific_login(request, course_id)
context = {
'pipeline_running': 'false',
'platform_name': settings.PLATFORM_NAME,
}
return render_to_response('login.html', context)
......@@ -697,24 +732,62 @@ def accounts_login(request):
# Need different levels of logging
@ensure_csrf_cookie
def login_user(request, error=""):
def login_user(request, error=""): # pylint: disable-msg=too-many-statements,unused-argument
"""AJAX request to log in the user."""
if 'email' not in request.POST or 'password' not in request.POST:
return JsonResponse({
"success": False,
"value": _('There was an error receiving your login information. Please email us.'), # TODO: User error message
}) # TODO: this should be status code 400 # pylint: disable=fixme
email = request.POST['email']
password = request.POST['password']
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.warning(u"Login failed - Unknown user email")
else:
AUDIT_LOG.warning(u"Login failed - Unknown user email: {0}".format(email))
user = None
backend_name = None
email = None
password = None
redirect_url = None
response = None
running_pipeline = None
third_party_auth_requested = settings.FEATURES.get('ENABLE_THIRD_PARTY_AUTH') and pipeline.running(request)
third_party_auth_successful = False
trumped_by_first_party_auth = bool(request.POST.get('email')) or bool(request.POST.get('password'))
user = None
if third_party_auth_requested and not trumped_by_first_party_auth:
# The user has already authenticated via third-party auth and has not
# asked to do first party auth by supplying a username or password. We
# now want to put them through the same logging and cookie calculation
# logic as with first-party auth.
running_pipeline = pipeline.get(request)
username = running_pipeline['kwargs'].get('username')
backend_name = running_pipeline['backend']
requested_provider = provider.Registry.get_by_backend_name(backend_name)
try:
user = pipeline.get_authenticated_user(username, backend_name)
third_party_auth_successful = True
except User.DoesNotExist:
AUDIT_LOG.warning(
u'Login failed - user with username {username} has no social auth with backend_name {backend_name}'.format(
username=username, backend_name=backend_name))
return JsonResponse({
"success": False,
# Translators: provider_name is the name of an external, third-party user authentication service (like
# Google or LinkedIn).
"value": _('There is no {platform_name} account associated with your {provider_name} account. Please use your {platform_name} credentials or pick another provider.').format(
platform_name=settings.PLATFORM_NAME, provider_name=requested_provider.NAME)
}) # TODO: this should be a status code 401 # pylint: disable=fixme
else:
if 'email' not in request.POST or 'password' not in request.POST:
return JsonResponse({
"success": False,
"value": _('There was an error receiving your login information. Please email us.'), # TODO: User error message
}) # TODO: this should be status code 400 # pylint: disable=fixme
email = request.POST['email']
password = request.POST['password']
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.warning(u"Login failed - Unknown user email")
else:
AUDIT_LOG.warning(u"Login failed - Unknown user email: {0}".format(email))
# check if the user has a linked shibboleth account, if so, redirect the user to shib-login
# This behavior is pretty much like what gmail does for shibboleth. Try entering some @stanford.edu
......@@ -753,14 +826,17 @@ def login_user(request, error=""):
# username so that authentication is guaranteed to fail and we can take
# advantage of the ratelimited backend
username = user.username if user else ""
try:
user = authenticate(username=username, password=password, request=request)
# this occurs when there are too many attempts from the same IP address
except RateLimitException:
return JsonResponse({
"success": False,
"value": _('Too many failed login attempts. Try again later.'),
}) # TODO: this should be status code 429 # pylint: disable=fixme
if not third_party_auth_successful:
try:
user = authenticate(username=username, password=password, request=request)
# this occurs when there are too many attempts from the same IP address
except RateLimitException:
return JsonResponse({
"success": False,
"value": _('Too many failed login attempts. Try again later.'),
}) # TODO: this should be status code 429 # pylint: disable=fixme
if user is None:
# tick the failed login counters if the user exists in the database
if user_found_by_email_lookup and LoginFailures.is_feature_enabled():
......@@ -801,7 +877,9 @@ def login_user(request, error=""):
redirect_url = try_change_enrollment(request)
dog_stats_api.increment("common.student.successful_login")
if third_party_auth_successful:
redirect_url = pipeline.get_complete_url(backend_name)
response = JsonResponse({
"success": True,
"redirect_url": redirect_url,
......@@ -1027,16 +1105,20 @@ def _do_create_account(post_vars):
@ensure_csrf_cookie
def create_account(request, post_override=None):
def create_account(request, post_override=None): # pylint: disable-msg=too-many-statements
"""
JSON call to create new edX account.
Used by form in signup_modal.html, which is included into navigation.html
"""
js = {'success': False}
js = {'success': False} # pylint: disable-msg=invalid-name
post_vars = post_override if post_override else request.POST
extra_fields = getattr(settings, 'REGISTRATION_EXTRA_FIELDS', {})
if settings.FEATURES.get('ENABLE_THIRD_PARTY_AUTH') and pipeline.running(request):
post_vars = dict(post_vars.items())
post_vars.update({'password': pipeline.make_random_password()})
# if doing signup for an external authorization, then get email, password, name from the eamap
# don't use the ones from the form, since the user could have hacked those
# unless originally we didn't get a valid email or name from the external auth
......@@ -1234,9 +1316,17 @@ def create_account(request, post_override=None):
login_user.save()
AUDIT_LOG.info(u"Login activated on extauth account - {0} ({1})".format(login_user.username, login_user.email))
dog_stats_api.increment("common.student.account_created")
redirect_url = try_change_enrollment(request)
# Resume the third-party-auth pipeline if necessary.
if settings.FEATURES.get('ENABLE_THIRD_PARTY_AUTH') and pipeline.running(request):
running_pipeline = pipeline.get(request)
redirect_url = pipeline.get_complete_url(running_pipeline['backend'])
response = JsonResponse({
'success': True,
'redirect_url': try_change_enrollment(request),
'redirect_url': redirect_url,
})
# set the login cookie for the edx marketing site
......
"""Middleware classes for third_party_auth."""
from social.apps.django_app.middleware import SocialAuthExceptionMiddleware
from . import pipeline
class ExceptionMiddleware(SocialAuthExceptionMiddleware):
"""Custom middleware that handles conditional redirection."""
def get_redirect_uri(self, request, exception):
# Safe because it's already been validated by
# pipeline.parse_query_params. If that pipeline step ever moves later
# in the pipeline stack, we'd need to validate this value because it
# would be an injection point for attacker data.
auth_entry = request.session.get(pipeline.AUTH_ENTRY_KEY)
# Fall back to django settings's SOCIAL_AUTH_LOGIN_ERROR_URL.
return '/' + auth_entry if auth_entry else super(ExceptionMiddleware, self).get_redirect_uri(request, exception)
"""Auth pipeline definitions."""
"""Auth pipeline definitions.
Auth pipelines handle the process of authenticating a user. They involve a
consumer system and a provider service. The general pattern is:
1. The consumer system exposes a URL endpoint that starts the process.
2. When a user visits that URL, the client system redirects the user to a
page served by the provider. The user authenticates with the provider.
The provider handles authentication failure however it wants.
3. On success, the provider POSTs to a URL endpoint on the consumer to
invoke the pipeline. It sends back an arbitrary payload of data about
the user.
4. The pipeline begins, executing each function in its stack. The stack is
defined on django's settings object's SOCIAL_AUTH_PIPELINE. This is done
in settings._set_global_settings.
5. Each pipeline function is variadic. Most pipeline functions are part of
the pythons-social-auth library; our extensions are defined below. The
pipeline is the same no matter what provider is used.
6. Pipeline functions can return a dict to add arguments to the function
invoked next. They can return None if this is not necessary.
7. Pipeline functions may be decorated with @partial.partial. This pauses
the pipeline and serializes its state onto the request's session. When
this is done they may redirect to other edX handlers to execute edX
account registration/sign in code.
8. In that code, redirecting to get_complete_url() resumes the pipeline.
This happens by hitting a handler exposed by the consumer system.
9. In this way, execution moves between the provider, the pipeline, and
arbitrary consumer system code.
Gotcha alert!:
Bear in mind that when pausing and resuming a pipeline function decorated with
@partial.partial, execution resumes by re-invoking the decorated function
instead of invoking the next function in the pipeline stack. For example, if
you have a pipeline of
A
B
C
with an implementation of
@partial.partial
def B(*args, **kwargs):
[...]
B will be invoked twice: once when initially proceeding through the pipeline
before it is paused, and once when other code finishes and the pipeline
resumes. Consequently, many decorated functions will first invoke a predicate
to determine if they are in their first or second execution (usually by
checking side-effects from the first run).
This is surprising but important behavior, since it allows a single function in
the pipeline to consolidate all the operations needed to establish invariants
rather than spreading them across two functions in the pipeline.
See http://psa.matiasaguirre.net/docs/pipeline.html for more docs.
"""
import random
import string # pylint: disable-msg=deprecated-module
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
from django.shortcuts import redirect
from social.apps.django_app.default import models
from social.exceptions import AuthException
from social.pipeline import partial
from . import provider
AUTH_ENTRY_KEY = 'auth_entry'
AUTH_ENTRY_DASHBOARD = 'dashboard'
AUTH_ENTRY_LOGIN = 'login'
AUTH_ENTRY_REGISTER = 'register'
_AUTH_ENTRY_CHOICES = frozenset([
AUTH_ENTRY_DASHBOARD,
AUTH_ENTRY_LOGIN,
AUTH_ENTRY_REGISTER
])
_DEFAULT_RANDOM_PASSWORD_LENGTH = 12
_PASSWORD_CHARSET = string.letters + string.digits
class AuthEntryError(AuthException):
"""Raised when auth_entry is missing or invalid on URLs.
auth_entry tells us whether the auth flow was initiated to register a new
user (in which case it has the value of AUTH_ENTRY_REGISTER) or log in an
existing user (in which case it has the value of AUTH_ENTRY_LOGIN).
This is necessary because the edX code we hook into the pipeline to
redirect to the existing auth flows needs to know what case we are in in
order to format its output correctly (for example, the register code is
invoked earlier than the login code, and it needs to know if the login flow
was requested to dispatch correctly).
"""
class ProviderUserState(object):
"""Object representing the provider state (attached or not) for a user.
This is intended only for use when rendering templates. See for example
lms/templates/dashboard.html.
"""
def __init__(self, enabled_provider, user, state):
# Boolean. Whether the user has an account associated with the provider
self.has_account = state
# provider.BaseProvider child. Callers must verify that the provider is
# enabled.
self.provider = enabled_provider
# django.contrib.auth.models.User.
self.user = user
def get_unlink_form_name(self):
"""Gets the name used in HTML forms that unlink a provider account."""
return self.provider.NAME + '_unlink_form'
def get(request):
"""Gets the running pipeline from the passed request."""
return request.session.get('partial_pipeline')
def get_authenticated_user(username, backend_name):
"""Gets a saved user authenticated by a particular backend.
Between pipeline steps User objects are not saved. We need to reconstitute
the user and set its .backend, which is ordinarily monkey-patched on by
Django during authenticate(), so it will function like a user returned by
authenticate().
Args:
username: string. Username of user to get.
backend_name: string. The name of the third-party auth backend from
the running pipeline.
Returns:
User if user is found and has a social auth from the passed
backend_name.
Raises:
User.DoesNotExist: if no user matching user is found, or the matching
user has no social auth associated with the given backend.
AssertionError: if the user is not authenticated.
"""
user = models.DjangoStorage.user.user_model().objects.get(username=username)
match = models.DjangoStorage.user.get_social_auth_for_user(user, provider=backend_name)
if not match:
raise User.DoesNotExist
user.backend = provider.Registry.get_by_backend_name(backend_name).get_authentication_backend()
return user
def _get_enabled_provider_by_name(provider_name):
"""Gets an enabled provider by its NAME member or throws."""
enabled_provider = provider.Registry.get(provider_name)
if not enabled_provider:
raise ValueError('Provider %s not enabled' % provider_name)
return enabled_provider
def _get_url(view_name, backend_name, auth_entry=None):
"""Creates a URL to hook into social auth endpoints."""
kwargs = {'backend': backend_name}
url = reverse(view_name, kwargs=kwargs)
if auth_entry:
url += '?%s=%s' % (AUTH_ENTRY_KEY, auth_entry)
return url
def get_complete_url(backend_name):
"""Gets URL for the endpoint that returns control to the auth pipeline.
Args:
backend_name: string. Name of the python-social-auth backend from the
currently-running pipeline.
Returns:
String. URL that finishes the auth pipeline for a provider.
Raises:
ValueError: if no provider is enabled with the given backend_name.
"""
enabled_provider = provider.Registry.get_by_backend_name(backend_name)
if not enabled_provider:
raise ValueError('Provider with backend %s not enabled' % backend_name)
return _get_url('social:complete', backend_name)
def get_disconnect_url(provider_name):
"""Gets URL for the endpoint that starts the disconnect pipeline.
Args:
provider_name: string. Name of the provider.BaseProvider child you want
to disconnect from.
Returns:
String. URL that starts the disconnection pipeline.
Raises:
ValueError: if no provider is enabled with the given backend_name.
"""
enabled_provider = _get_enabled_provider_by_name(provider_name)
return _get_url('social:disconnect', enabled_provider.BACKEND_CLASS.name)
def get_login_url(provider_name, auth_entry):
"""Gets the login URL for the endpoint that kicks off auth with a provider.
Args:
provider_name: string. The name of the provider.Provider that has been
enabled.
auth_entry: string. Query argument specifying the desired entry point
for the auth pipeline. Used by the pipeline for later branching.
Must be one of _AUTH_ENTRY_CHOICES.
Returns:
String. URL that starts the auth pipeline for a provider.
Raises:
ValueError: if no provider is enabled with the given provider_name.
"""
assert auth_entry in _AUTH_ENTRY_CHOICES
enabled_provider = _get_enabled_provider_by_name(provider_name)
return _get_url('social:begin', enabled_provider.BACKEND_CLASS.name, auth_entry=auth_entry)
def get_duplicate_provider(messages):
"""Gets provider from message about social account already in use.
python-social-auth's exception middleware uses the messages module to
record details about duplicate account associations. It records exactly one
message there is a request to associate a social account S with an edX
account E if S is already associated with an edX account E'.
Returns:
provider.BaseProvider child instance. The provider of the duplicate
account, or None if there is no duplicate (and hence no error).
"""
social_auth_messages = [m for m in messages if m.extra_tags.startswith('social-auth')]
if not social_auth_messages:
return
assert len(social_auth_messages) == 1
return provider.Registry.get_by_backend_name(social_auth_messages[0].extra_tags.split()[1])
def get_provider_user_states(user):
"""Gets list of states of provider-user combinations.
Args:
django.contrib.auth.User. The user to get states for.
Returns:
List of ProviderUserState. The list of states of a user's account with
each enabled provider.
"""
states = []
found_user_backends = [
social_auth.provider for social_auth in models.DjangoStorage.user.get_social_auth_for_user(user)
]
for enabled_provider in provider.Registry.enabled():
states.append(
ProviderUserState(enabled_provider, user, enabled_provider.BACKEND_CLASS.name in found_user_backends)
)
return states
def make_random_password(length=None, choice_fn=random.SystemRandom().choice):
"""Makes a random password.
When a user creates an account via a social provider, we need to create a
placeholder password for them to satisfy the ORM's consistency and
validation requirements. Users don't know (and hence cannot sign in with)
this password; that's OK because they can always use the reset password
flow to set it to a known value.
Args:
choice_fn: function or method. Takes an iterable and returns a random
element.
length: int. Number of chars in the returned value. None to use default.
Returns:
String. The resulting password.
"""
length = length if length is not None else _DEFAULT_RANDOM_PASSWORD_LENGTH
return ''.join(choice_fn(_PASSWORD_CHARSET) for _ in xrange(length))
def running(request):
"""Returns True iff request is running a third-party auth pipeline."""
return request.session.get('partial_pipeline') is not None # Avoid False for {}.
# Pipeline functions.
# Signatures are set by python-social-auth; prepending 'unused_' causes
# TypeError on dispatch to the auth backend's authenticate().
# pylint: disable-msg=unused-argument
def parse_query_params(strategy, response, *args, **kwargs):
"""Reads whitelisted query params, transforms them into pipeline args."""
auth_entry = strategy.session.get(AUTH_ENTRY_KEY)
if not (auth_entry and auth_entry in _AUTH_ENTRY_CHOICES):
raise AuthEntryError(strategy.backend, 'auth_entry missing or invalid')
return {
# Whether the auth pipeline entered from /dashboard.
'is_dashboard': auth_entry == AUTH_ENTRY_DASHBOARD,
# Whether the auth pipeline entered from /login.
'is_login': auth_entry == AUTH_ENTRY_LOGIN,
# Whether the auth pipeline entered from /register.
'is_register': auth_entry == AUTH_ENTRY_REGISTER,
}
@partial.partial
def step(*args, **kwargs):
"""Fake pipeline step; just throws loudly for now."""
raise NotImplementedError('%s, %s' % (args, kwargs))
def redirect_to_supplementary_form(strategy, details, response, uid, is_dashboard=None, is_login=None, is_register=None, user=None, *args, **kwargs):
"""Dispatches user to views outside the pipeline if necessary."""
# We're deliberately verbose here to make it clear what the intended
# dispatch behavior is for the three pipeline entry points, given the
# current state of the pipeline. Keep in mind the pipeline is re-entrant
# and values will change on repeated invocations (for example, the first
# time through the login flow the user will be None so we dispatch to the
# login form; the second time it will have a value so we continue to the
# next pipeline step directly).
#
# It is important that we always execute the entire pipeline. Even if
# behavior appears correct without executing a step, it means important
# invariants have been violated and future misbehavior is likely.
if is_dashboard:
return
if is_login and user is None:
return redirect('/login', name='signin_user')
if is_register and user is None:
return redirect('/register', name='register_user')
......@@ -4,6 +4,10 @@ Loaded by Django's settings mechanism. Consequently, this module must not
invoke the Django armature.
"""
from social.backends import google, linkedin
_DEFAULT_ICON_CLASS = 'icon-signin'
class BaseProvider(object):
"""Abstract base class for third-party auth providers.
......@@ -12,13 +16,14 @@ class BaseProvider(object):
in the provider Registry.
"""
# String. Dot-delimited module.Class. The name of the backend
# implementation to load.
AUTHENTICATION_BACKEND = None
# Class. The provider's backing social.backends.base.BaseAuth child.
BACKEND_CLASS = None
# String. Name of the FontAwesome glyph to use for sign in buttons (or the
# name of a user-supplied custom glyph that is present at runtime).
ICON_CLASS = _DEFAULT_ICON_CLASS
# String. User-facing name of the provider. Must be unique across all
# enabled providers.
# enabled providers. Will be presented in the UI.
NAME = None
# Dict of string -> object. Settings that will be merged into Django's
# settings instance. In most cases the value will be None, since real
# values are merged from .json files (foo.auth.json; foo.env.json) onto the
......@@ -26,8 +31,81 @@ class BaseProvider(object):
SETTINGS = {}
@classmethod
def get_authentication_backend(cls):
"""Gets associated Django settings.AUTHENTICATION_BACKEND string."""
return '%s.%s' % (cls.BACKEND_CLASS.__module__, cls.BACKEND_CLASS.__name__)
@classmethod
def get_email(cls, unused_provider_details):
"""Gets user's email address.
Provider responses can contain arbitrary data. This method can be
overridden to extract an email address from the provider details
extracted by the social_details pipeline step.
Args:
unused_provider_details: dict of string -> string. Data about the
user passed back by the provider.
Returns:
String or None. The user's email address, if any.
"""
return None
@classmethod
def get_name(cls, unused_provider_details):
"""Gets user's name.
Provider responses can contain arbitrary data. This method can be
overridden to extract a full name for a user from the provider details
extracted by the social_details pipeline step.
Args:
unused_provider_details: dict of string -> string. Data about the
user passed back by the provider.
Returns:
String or None. The user's full name, if any.
"""
return None
@classmethod
def get_register_form_data(cls, pipeline_kwargs):
"""Gets dict of data to display on the register form.
common.djangoapps.student.views.register_user uses this to populate the
new account creation form with values supplied by the user's chosen
provider, preventing duplicate data entry.
Args:
pipeline_kwargs: dict of string -> object. Keyword arguments
accumulated by the pipeline thus far.
Returns:
Dict of string -> string. Keys are names of form fields; values are
values for that field. Where there is no value, the empty string
must be used.
"""
# Details about the user sent back from the provider.
details = pipeline_kwargs.get('details')
# Get the username separately to take advantage of the de-duping logic
# built into the pipeline. The provider cannot de-dupe because it can't
# check the state of taken usernames in our system. Note that there is
# technically a data race between the creation of this value and the
# creation of the user object, so it is still possible for users to get
# an error on submit.
suggested_username = pipeline_kwargs.get('username')
return {
'email': cls.get_email(details) or '',
'name': cls.get_name(details) or '',
'username': suggested_username,
}
@classmethod
def merge_onto(cls, settings):
"""Merge class-level settings onto a django `settings` module."""
"""Merge class-level settings onto a django settings module."""
for key, value in cls.SETTINGS.iteritems():
setattr(settings, key, value)
......@@ -35,30 +113,41 @@ class BaseProvider(object):
class GoogleOauth2(BaseProvider):
"""Provider for Google's Oauth2 auth system."""
AUTHENTICATION_BACKEND = 'social.backends.google.GoogleOAuth2'
BACKEND_CLASS = google.GoogleOAuth2
ICON_CLASS = 'icon-google-plus'
NAME = 'Google'
SETTINGS = {
'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY': None,
'SOCIAL_AUTH_GOOGLE_OAUTH2_SECRET': None,
}
@classmethod
def get_email(cls, provider_details):
return provider_details.get('email')
@classmethod
def get_name(cls, provider_details):
return provider_details.get('fullname')
class LinkedInOauth2(BaseProvider):
"""Provider for LinkedIn's Oauth2 auth system."""
AUTHENTICATION_BACKEND = 'social.backends.linkedin.LinkedinOAuth2'
BACKEND_CLASS = linkedin.LinkedinOAuth2
ICON_CLASS = 'icon-linkedin'
NAME = 'LinkedIn'
SETTINGS = {
'SOCIAL_AUTH_LINKEDIN_OAUTH2_KEY': None,
'SOCIAL_AUTH_LINKEDIN_OAUTH2_SECRET': None,
}
@classmethod
def get_email(cls, provider_details):
return provider_details.get('email')
class MozillaPersona(BaseProvider):
"""Provider for Mozilla's Persona auth system."""
AUTHENTICATION_BACKEND = 'social.backends.persona.PersonaAuth'
NAME = 'Mozilla Persona'
@classmethod
def get_name(cls, provider_details):
return provider_details.get('fullname')
class Registry(object):
......@@ -84,7 +173,7 @@ class Registry(object):
@classmethod
def _enable(cls, provider):
"""Enables a single `provider`."""
"""Enables a single provider."""
if provider.NAME in cls._ENABLED:
raise ValueError('Provider %s already enabled' % provider.NAME)
cls._ENABLED[provider.NAME] = provider
......@@ -93,10 +182,17 @@ class Registry(object):
def configure_once(cls, provider_names):
"""Configures providers.
Takes `provider_names`, a list of string.
Args:
provider_names: list of string. The providers to configure.
Raises:
ValueError: if the registry has already been configured, or if any
of the passed provider_names does not have a corresponding
BaseProvider child implementation.
"""
if cls._CONFIGURED:
raise ValueError('Provider registry already configured')
# Flip the bit eagerly -- configure() should not be re-callable if one
# _enable call fails.
cls._CONFIGURED = True
......@@ -114,11 +210,28 @@ class Registry(object):
@classmethod
def get(cls, provider_name):
"""Gets provider named `provider_name` string if enabled, else None."""
"""Gets provider named provider_name string if enabled, else None."""
cls._check_configured()
return cls._ENABLED.get(provider_name)
@classmethod
def get_by_backend_name(cls, backend_name):
"""Gets provider (or None) by backend name.
Args:
backend_name: string. The python-social-auth
backends.base.BaseAuth.name (for example, 'google-oauth2') to
try and get a provider for.
Raises:
RuntimeError: if the registry has not been configured.
"""
cls._check_configured()
for enabled in cls._ENABLED.values():
if enabled.BACKEND_CLASS.name == backend_name:
return enabled
@classmethod
def _reset(cls):
"""Returns the registry to an unconfigured state; for tests only."""
cls._CONFIGURED = False
......
......@@ -46,8 +46,15 @@ If true, it:
from . import provider
_FIELDS_STORED_IN_SESSION = ['auth_entry']
_MIDDLEWARE_CLASSES = (
'third_party_auth.middleware.ExceptionMiddleware',
)
_SOCIAL_AUTH_LOGIN_REDIRECT_URL = '/dashboard'
def _merge_auth_info(django_settings, auth_info):
"""Merge `auth_info` dict onto `django_settings` module."""
"""Merge auth_info dict onto django_settings module."""
enabled_provider_names = []
to_merge = []
......@@ -66,39 +73,77 @@ def _merge_auth_info(django_settings, auth_info):
def _set_global_settings(django_settings):
"""Set provider-independent settings."""
# Whitelisted URL query parameters retrained in the pipeline session.
# Params not in this whitelist will be silently dropped.
django_settings.FIELDS_STORED_IN_SESSION = _FIELDS_STORED_IN_SESSION
# Register and configure python-social-auth with Django.
django_settings.INSTALLED_APPS += (
'social.apps.django_app.default',
'third_party_auth',
)
django_settings.TEMPLATE_CONTEXT_PROCESSORS += (
'social.apps.django_app.context_processors.backends',
'social.apps.django_app.context_processors.login_redirect',
)
# Inject exception middleware to make redirects fire.
django_settings.MIDDLEWARE_CLASSES += _MIDDLEWARE_CLASSES
# Where to send the user if there's an error during social authentication
# and we cannot send them to a more specific URL
# (see middleware.ExceptionMiddleware).
django_settings.SOCIAL_AUTH_LOGIN_ERROR_URL = '/'
# Where to send the user once social authentication is successful.
django_settings.SOCIAL_AUTH_LOGIN_REDIRECT_URL = _SOCIAL_AUTH_LOGIN_REDIRECT_URL
# Inject our customized auth pipeline. All auth backends must work with
# this pipeline.
django_settings.SOCIAL_AUTH_PIPELINE = (
'third_party_auth.pipeline.step',
'third_party_auth.pipeline.parse_query_params',
'social.pipeline.social_auth.social_details',
'social.pipeline.social_auth.social_uid',
'social.pipeline.social_auth.auth_allowed',
'social.pipeline.social_auth.social_user',
'social.pipeline.user.get_username',
'third_party_auth.pipeline.redirect_to_supplementary_form',
'social.pipeline.user.create_user',
'social.pipeline.social_auth.associate_user',
'social.pipeline.social_auth.load_extra_data',
'social.pipeline.user.user_details',
)
# We let the user specify their email address during signup.
django_settings.SOCIAL_AUTH_PROTECTED_USER_FIELDS = ['email']
# Disable exceptions by default for prod so you get redirect behavior
# instead of a Django error page. During development you may want to
# enable this when you want to get stack traces rather than redirections.
django_settings.SOCIAL_AUTH_RAISE_EXCEPTIONS = False
# Context processors required under Django.
django_settings.SOCIAL_AUTH_UUID_LENGTH = 4
django_settings.TEMPLATE_CONTEXT_PROCESSORS += (
'social.apps.django_app.context_processors.backends',
'social.apps.django_app.context_processors.login_redirect',
)
def _set_provider_settings(django_settings, enabled_providers, auth_info):
"""Set provider-specific settings."""
"""Sets provider-specific settings."""
# Must prepend here so we get called first.
django_settings.AUTHENTICATION_BACKENDS = (
tuple(enabled_provider.AUTHENTICATION_BACKEND for enabled_provider in enabled_providers) +
tuple(enabled_provider.get_authentication_backend() for enabled_provider in enabled_providers) +
django_settings.AUTHENTICATION_BACKENDS)
# Merge settings from provider classes, and configure all placeholders.
for enabled_provider in enabled_providers:
enabled_provider.merge_onto(django_settings)
# Merge settings from <deployment>.auth.json.
# Merge settings from <deployment>.auth.json, overwriting placeholders.
_merge_auth_info(django_settings, auth_info)
def apply_settings(auth_info, django_settings):
"""Apply settings from `auth_info` dict to `django_settings` module."""
"""Applies settings from auth_info dict to django_settings module."""
provider_names = auth_info.keys()
provider.Registry.configure_once(provider_names)
enabled_providers = provider.Registry.enabled()
......
"""Base integration test for provider implementations."""
import re
import unittest
import json
import mock
from django import test
from django.contrib import auth
from django.contrib.auth import models as auth_models
from django.contrib.messages.storage import fallback
from django.contrib.sessions.backends import cache
from django.test import utils as django_utils
from django.conf import settings as django_settings
from social import actions, exceptions
from social.apps.django_app import utils as social_utils
from social.apps.django_app import views as social_views
from student import models as student_models
from student import views as student_views
from third_party_auth import middleware, pipeline
from third_party_auth import settings as auth_settings
from third_party_auth.tests import testutil
@unittest.skipUnless(
testutil.AUTH_FEATURES_KEY in django_settings.FEATURES, testutil.AUTH_FEATURES_KEY + ' not in settings.FEATURES')
@django_utils.override_settings() # For settings reversion on a method-by-method basis.
class IntegrationTest(testutil.TestCase, test.TestCase):
"""Abstract base class for provider integration tests."""
# Configuration. You will need to override these values in your test cases.
# Class. The third_party_auth.provider.BaseProvider child we are testing.
PROVIDER_CLASS = None
# Dict of string -> object. Settings that will be merged onto Django's
# settings object before test execution. In most cases, this is
# PROVIDER_CLASS.SETTINGS with test values.
PROVIDER_SETTINGS = {}
# Methods you must override in your children.
def get_response_data(self):
"""Gets a dict of response data of the form given by the provider.
To determine what the provider returns, drop into a debugger in your
provider's do_auth implementation. Providers may merge different kinds
of data (for example, data about the user and data about the user's
credentials).
"""
raise NotImplementedError
def get_username(self):
"""Gets username based on response data from a provider.
Each provider has different logic for username generation. Sadly,
this is not extracted into its own method in python-social-auth, so we
must provide a getter ourselves.
Note that this is the *initial* value the framework will attempt to use.
If it collides, the pipeline will generate a new username. We extract
it here so we can force collisions in a polymorphic way.
"""
raise NotImplementedError
# Asserts you can optionally override and make more specific.
def assert_redirect_to_provider_looks_correct(self, response):
"""Asserts the redirect to the provider's site looks correct.
When we hit /auth/login/<provider>, we should be redirected to the
provider's site. Here we check that we're redirected, but we don't know
enough about the provider to check what we're redirected to. Child test
implementations may optionally strengthen this assertion with, for
example, more details about the format of the Location header.
"""
self.assertEqual(302, response.status_code)
self.assertTrue(response.has_header('Location'))
def assert_register_response_in_pipeline_looks_correct(self, response, pipeline_kwargs):
"""Performs spot checks of the rendered register.html page.
When we display the new account registration form after the user signs
in with a third party, we prepopulate the form with values sent back
from the provider. The exact set of values varies on a provider-by-
provider basis and is generated by
provider.BaseProvider.get_register_form_data. We provide some stock
assertions based on the provider's implementation; if you want more
assertions in your test, override this method.
"""
self.assertEqual(200, response.status_code)
# Check that the correct provider was selected.
self.assertIn('successfully signed in with <strong>%s</strong>' % self.PROVIDER_CLASS.NAME, response.content)
# Expect that each truthy value we've prepopulated the register form
# with is actually present.
for prepopulated_form_value in self.PROVIDER_CLASS.get_register_form_data(pipeline_kwargs).values():
if prepopulated_form_value:
self.assertIn(prepopulated_form_value, response.content)
# Implementation details and actual tests past this point -- no more
# configuration needed.
def setUp(self):
super(IntegrationTest, self).setUp()
self.configure_runtime()
self.backend_name = self.PROVIDER_CLASS.BACKEND_CLASS.name
self.client = test.Client()
self.request_factory = test.RequestFactory()
def assert_dashboard_response_looks_correct(self, response, user, duplicate=False, linked=None):
"""Asserts the user's dashboard is in the expected state.
We check unconditionally that the dashboard 200s and contains the
user's info. If duplicate is True, we expect the duplicate account
association error to be present. If linked is passed, we conditionally
check the content and controls in the Account Links section of the
sidebar.
"""
duplicate_account_error_needle = '<section class="dashboard-banner third-party-auth">'
assert_duplicate_presence_fn = self.assertIn if duplicate else self.assertNotIn
self.assertEqual(200, response.status_code)
self.assertIn(user.email, response.content)
self.assertIn(user.username, response.content)
assert_duplicate_presence_fn(duplicate_account_error_needle, response.content)
if linked is not None:
if linked:
expected_control_text = pipeline.ProviderUserState(
self.PROVIDER_CLASS, user, False).get_unlink_form_name()
else:
expected_control_text = pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_DASHBOARD)
icon_state = re.search(r'third-party-auth.+icon icon-(\w+)', response.content, re.DOTALL).groups()[0]
provider_name = re.search(r'<span class="provider">([^<]+)', response.content, re.DOTALL).groups()[0]
self.assertIn(expected_control_text, response.content)
self.assertEqual('link' if linked else 'unlink', icon_state)
self.assertEqual(self.PROVIDER_CLASS.NAME, provider_name)
def assert_exception_redirect_looks_correct(self, auth_entry=None):
"""Tests middleware conditional redirection.
middleware.ExceptionMiddleware makes sure the user ends up in the right
place when they cancel authentication via the provider's UX.
"""
exception_middleware = middleware.ExceptionMiddleware()
request, _ = self.get_request_and_strategy(auth_entry=auth_entry)
response = exception_middleware.process_exception(
request, exceptions.AuthCanceled(request.social_strategy.backend))
location = response.get('Location')
self.assertEqual(302, response.status_code)
self.assertIn('canceled', location)
self.assertIn(self.backend_name, location)
if auth_entry:
# Custom redirection to form.
self.assertTrue(location.startswith('/' + auth_entry))
else:
# Stock framework redirection to root.
self.assertTrue(location.startswith('/?'))
def assert_first_party_auth_trumps_third_party_auth(self, email=None, password=None, success=None):
"""Asserts first party auth was used in place of third party auth.
Args:
email: string. The user's email. If not None, will be set on POST.
password: string. The user's password. If not None, will be set on
POST.
success: None or bool. Whether we expect auth to be successful. Set
to None to indicate we expect the request to be invalid (meaning
one of username or password will be missing).
"""
_, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
self.create_user_models_for_existing_account(
strategy, email, password, self.get_username(), skip_social_auth=True)
strategy.request.POST = dict(strategy.request.POST)
if email:
strategy.request.POST['email'] = email
if password:
strategy.request.POST['password'] = 'bad_' + password if success is False else password
self.assert_pipeline_running(strategy.request)
payload = json.loads(student_views.login_user(strategy.request).content)
if success is None:
# Request malformed -- just one of email/password given.
self.assertFalse(payload.get('success'))
self.assertIn('There was an error receiving your login information', payload.get('value'))
elif success:
# Request well-formed and credentials good.
self.assertTrue(payload.get('success'))
else:
# Request well-formed but credentials bad.
self.assertFalse(payload.get('success'))
self.assertIn('incorrect', payload.get('value'))
def assert_javascript_would_submit_login_form(self, boolean, response):
"""Asserts we pass form submit JS the right boolean string."""
argument_string = re.search(
r'function\ post_form_if_pipeline_running.*\(([a-z]+)\)', response.content, re.DOTALL).groups()[0]
self.assertIn(argument_string, ['true', 'false'])
self.assertEqual(boolean, True if argument_string == 'true' else False)
def assert_json_failure_response_is_missing_social_auth(self, response):
"""Asserts failure on /login for missing social auth looks right."""
self.assertEqual(200, response.status_code) # Yes, it's a 200 even though it's a failure.
payload = json.loads(response.content)
self.assertFalse(payload.get('success'))
self.assertIn('associated with your %s account' % self.PROVIDER_CLASS.NAME, payload.get('value'))
def assert_json_failure_response_is_username_collision(self, response):
"""Asserts the json response indicates a username collision."""
self.assertEqual(400, response.status_code)
payload = json.loads(response.content)
self.assertFalse(payload.get('success'))
self.assertIn('already exists', payload.get('value'))
def assert_json_success_response_looks_correct(self, response):
"""Asserts the json response indicates success and redirection."""
self.assertEqual(200, response.status_code)
payload = json.loads(response.content)
self.assertTrue(payload.get('success'))
self.assertEqual(pipeline.get_complete_url(self.PROVIDER_CLASS.BACKEND_CLASS.name), payload.get('redirect_url'))
def assert_login_response_before_pipeline_looks_correct(self, response):
"""Asserts a GET of /login not in the pipeline looks correct."""
self.assertEqual(200, response.status_code)
self.assertIn('Sign in with ' + self.PROVIDER_CLASS.NAME, response.content)
self.assert_javascript_would_submit_login_form(False, response)
self.assert_signin_button_looks_functional(response.content, pipeline.AUTH_ENTRY_LOGIN)
def assert_login_response_in_pipeline_looks_correct(self, response):
"""Asserts a GET of /login in the pipeline looks correct."""
self.assertEqual(200, response.status_code)
# Make sure the form submit JS is told to submit the form:
self.assert_javascript_would_submit_login_form(True, response)
def assert_password_overridden_by_pipeline(self, username, password):
"""Verifies that the given password is not correct.
The pipeline overrides POST['password'], if any, with random data.
"""
self.assertIsNone(auth.authenticate(password=password, username=username))
def assert_pipeline_running(self, request):
"""Makes sure the given request is running an auth pipeline."""
self.assertTrue(pipeline.running(request))
def assert_redirect_to_dashboard_looks_correct(self, response):
"""Asserts a response would redirect to /dashboard."""
self.assertEqual(302, response.status_code)
# pylint: disable-msg=protected-access
self.assertEqual(auth_settings._SOCIAL_AUTH_LOGIN_REDIRECT_URL, response.get('Location'))
def assert_redirect_to_login_looks_correct(self, response):
"""Asserts a response would redirect to /login."""
self.assertEqual(302, response.status_code)
self.assertEqual('/' + pipeline.AUTH_ENTRY_LOGIN, response.get('Location'))
def assert_redirect_to_register_looks_correct(self, response):
"""Asserts a response would redirect to /register."""
self.assertEqual(302, response.status_code)
self.assertEqual('/' + pipeline.AUTH_ENTRY_REGISTER, response.get('Location'))
def assert_register_response_before_pipeline_looks_correct(self, response):
"""Asserts a GET of /register not in the pipeline looks correct."""
self.assertEqual(200, response.status_code)
self.assertIn('Sign in with ' + self.PROVIDER_CLASS.NAME, response.content)
self.assert_signin_button_looks_functional(response.content, pipeline.AUTH_ENTRY_REGISTER)
def assert_signin_button_looks_functional(self, content, auth_entry):
"""Asserts JS is available to signin buttons and has the right args."""
self.assertTrue(re.search(r'function thirdPartySignin', content))
self.assertEqual(
pipeline.get_login_url(self.PROVIDER_CLASS.NAME, auth_entry),
re.search(r"thirdPartySignin\(event, '([^']+)", content).groups()[0])
def assert_social_auth_does_not_exist_for_user(self, user, strategy):
"""Asserts a user does not have an auth with the expected provider."""
social_auths = strategy.storage.user.get_social_auth_for_user(
user, provider=self.PROVIDER_CLASS.BACKEND_CLASS.name)
self.assertEqual(0, len(social_auths))
def assert_social_auth_exists_for_user(self, user, strategy):
"""Asserts a user has a social auth with the expected provider."""
social_auths = strategy.storage.user.get_social_auth_for_user(
user, provider=self.PROVIDER_CLASS.BACKEND_CLASS.name)
self.assertEqual(1, len(social_auths))
self.assertEqual(self.backend_name, social_auths[0].provider)
def configure_runtime(self):
"""Configures settings details."""
auth_settings.apply_settings({self.PROVIDER_CLASS.NAME: self.PROVIDER_SETTINGS}, django_settings)
# Force settings to propagate into cached members on
# social.apps.django_app.utils.
reload(social_utils)
def create_user_models_for_existing_account(self, strategy, email, password, username, skip_social_auth=False):
"""Creates user, profile, registration, and (usually) social auth.
This synthesizes what happens during /register.
See student.views.register and student.views._do_create_account.
"""
response_data = self.get_response_data()
uid = strategy.backend.get_user_id(response_data, response_data)
user = social_utils.Storage.user.create_user(email=email, password=password, username=username)
profile = student_models.UserProfile(user=user)
profile.save()
registration = student_models.Registration()
registration.register(user)
registration.save()
if not skip_social_auth:
social_utils.Storage.user.create_social_auth(user, uid, self.PROVIDER_CLASS.BACKEND_CLASS.name)
return user
def fake_auth_complete(self, strategy):
"""Fake implementation of social.backends.BaseAuth.auth_complete.
Unlike what the docs say, it does not need to return a user instance.
Sometimes (like when directing users to the /register form) it instead
returns a response that 302s to /register.
"""
args = ()
kwargs = {
'request': strategy.request,
'backend': strategy.backend,
'user': None,
'response': self.get_response_data(),
}
return strategy.authenticate(*args, **kwargs)
def get_registration_post_vars(self, overrides=None):
"""POST vars generated by the registration form."""
defaults = {
'username': 'username',
'name': 'First Last',
'gender': '',
'year_of_birth': '',
'level_of_education': '',
'goals': '',
'honor_code': 'true',
'terms_of_service': 'true',
'password': 'password',
'mailing_address': '',
'email': 'user@email.com',
}
if overrides:
defaults.update(overrides)
return defaults
def get_request_and_strategy(self, auth_entry=None, redirect_uri=None):
"""Gets a fully-configured request and strategy.
These two objects contain circular references, so we create them
together. The references themselves are a mixture of normal __init__
stuff and monkey-patching done by python-social-auth. See, for example,
social.apps.django_apps.utils.strategy().
"""
request = self.request_factory.get(
pipeline.get_complete_url(self.backend_name) +
'?redirect_state=redirect_state_value&code=code_value&state=state_value')
request.user = auth_models.AnonymousUser()
request.session = cache.SessionStore()
request.session[self.backend_name + '_state'] = 'state_value'
if auth_entry:
request.session[pipeline.AUTH_ENTRY_KEY] = auth_entry
strategy = social_utils.load_strategy(backend=self.backend_name, redirect_uri=redirect_uri, request=request)
request.social_strategy = strategy
return request, strategy
def get_user_by_email(self, strategy, email):
"""Gets a user by email, using the given strategy."""
return strategy.storage.user.user_model().objects.get(email=email)
# Actual tests, executed once per child.
def test_canceling_authentication_redirects_to_login_when_auth_entry_login(self):
self.assert_exception_redirect_looks_correct(auth_entry=pipeline.AUTH_ENTRY_LOGIN)
def test_canceling_authentication_redirects_to_register_when_auth_entry_register(self):
self.assert_exception_redirect_looks_correct(auth_entry=pipeline.AUTH_ENTRY_REGISTER)
def test_canceling_authentication_redirects_to_root_when_auth_entry_not_set(self):
self.assert_exception_redirect_looks_correct()
def test_full_pipeline_succeeds_for_linking_account(self):
# First, create, the request and strategy that store pipeline state,
# configure the backend, and mock out wire traffic.
request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
request.user = self.create_user_models_for_existing_account(
strategy, 'user@example.com', 'password', self.get_username(), skip_social_auth=True)
# Instrument the pipeline to get to the dashboard with the full
# expected state.
self.client.get(
pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN))
actions.do_complete(strategy, social_views._do_login) # pylint: disable-msg=protected-access
student_views.signin_user(strategy.request)
student_views.login_user(strategy.request)
actions.do_complete(strategy, social_views._do_login) # pylint: disable-msg=protected-access
# First we expect that we're in the unlinked state, and that there
# really is no association in the backend.
self.assert_dashboard_response_looks_correct(student_views.dashboard(request), request.user, linked=False)
self.assert_social_auth_does_not_exist_for_user(request.user, strategy)
# Fire off the auth pipeline to link.
self.assert_redirect_to_dashboard_looks_correct(actions.do_complete(
request.social_strategy, social_views._do_login, request.user, None, # pylint: disable-msg=protected-access
redirect_field_name=auth.REDIRECT_FIELD_NAME))
# Now we expect to be in the linked state, with a backend entry.
self.assert_social_auth_exists_for_user(request.user, strategy)
self.assert_dashboard_response_looks_correct(student_views.dashboard(request), request.user, linked=True)
def test_full_pipeline_succeeds_for_unlinking_account(self):
# First, create, the request and strategy that store pipeline state,
# configure the backend, and mock out wire traffic.
request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
user = self.create_user_models_for_existing_account(
strategy, 'user@example.com', 'password', self.get_username())
self.assert_social_auth_exists_for_user(user, strategy)
# Instrument the pipeline to get to the dashboard with the full
# expected state.
self.client.get(
pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN))
actions.do_complete(strategy, social_views._do_login) # pylint: disable-msg=protected-access
student_views.signin_user(strategy.request)
student_views.login_user(strategy.request)
actions.do_complete(strategy, social_views._do_login, user=user) # pylint: disable-msg=protected-access
# First we expect that we're in the linked state, with a backend entry.
self.assert_dashboard_response_looks_correct(student_views.dashboard(request), user, linked=True)
self.assert_social_auth_exists_for_user(request.user, strategy)
# Fire off the disconnect pipeline to unlink.
self.assert_redirect_to_dashboard_looks_correct(actions.do_disconnect(
request.social_strategy, request.user, None, redirect_field_name=auth.REDIRECT_FIELD_NAME))
# Now we expect to be in the unlinked state, with no backend entry.
self.assert_dashboard_response_looks_correct(student_views.dashboard(request), user, linked=False)
self.assert_social_auth_does_not_exist_for_user(user, strategy)
def test_linking_already_associated_account_raises_auth_already_associated(self):
# This is of a piece with
# test_already_associated_exception_populates_dashboard_with_error. It
# verifies the exception gets raised when we expect; the latter test
# covers exception handling.
email = 'user@example.com'
password = 'password'
username = self.get_username()
_, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
linked_user = self.create_user_models_for_existing_account(strategy, email, password, username)
unlinked_user = social_utils.Storage.user.create_user(
email='other_' + email, password=password, username='other_' + username)
self.assert_social_auth_exists_for_user(linked_user, strategy)
self.assert_social_auth_does_not_exist_for_user(unlinked_user, strategy)
with self.assertRaises(exceptions.AuthAlreadyAssociated):
actions.do_complete(strategy, social_views._do_login, user=unlinked_user) # pylint: disable-msg=protected-access
def test_already_associated_exception_populates_dashboard_with_error(self):
# Instrument the pipeline with an exception. We test that the
# exception is raised correctly separately, so it's ok that we're
# raising it artificially here. This makes the linked=True artificial
# in the final assert because in practice the account would be
# unlinked, but getting that behavior is cumbersome here and already
# covered in other tests. Using linked=True does, however, let us test
# that the duplicate error has no effect on the state of the controls.
request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
user = self.create_user_models_for_existing_account(
strategy, 'user@example.com', 'password', self.get_username())
self.assert_social_auth_exists_for_user(user, strategy)
self.client.get('/login')
self.client.get(pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN))
actions.do_complete(strategy, social_views._do_login) # pylint: disable-msg=protected-access
student_views.signin_user(strategy.request)
student_views.login_user(strategy.request)
actions.do_complete(strategy, social_views._do_login, user=user) # pylint: disable-msg=protected-access
# Monkey-patch storage for messaging; pylint: disable-msg=protected-access
request._messages = fallback.FallbackStorage(request)
middleware.ExceptionMiddleware().process_exception(
request, exceptions.AuthAlreadyAssociated(self.PROVIDER_CLASS.BACKEND_CLASS.name, 'duplicate'))
self.assert_dashboard_response_looks_correct(
student_views.dashboard(request), user, duplicate=True, linked=True)
def test_full_pipeline_succeeds_for_signing_in_to_existing_account(self):
# First, create, the request and strategy that store pipeline state,
# configure the backend, and mock out wire traffic.
request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
user = self.create_user_models_for_existing_account(
strategy, 'user@example.com', 'password', self.get_username())
self.assert_social_auth_exists_for_user(user, strategy)
# Begin! Ensure that the login form contains expected controls before
# the user starts the pipeline.
self.assert_login_response_before_pipeline_looks_correct(self.client.get('/login'))
# The pipeline starts by a user GETting /auth/login/<provider>.
# Synthesize that request and check that it redirects to the correct
# provider page.
self.assert_redirect_to_provider_looks_correct(self.client.get(
pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN)))
# Next, the provider makes a request against /auth/complete/<provider>
# to resume the pipeline.
# pylint: disable-msg=protected-access
self.assert_redirect_to_login_looks_correct(actions.do_complete(strategy, social_views._do_login))
# At this point we know the pipeline has resumed correctly. Next we
# fire off the view that displays the login form and posts it via JS.
self.assert_login_response_in_pipeline_looks_correct(student_views.signin_user(strategy.request))
# Next, we invoke the view that handles the POST, and expect it
# redirects to /auth/complete. In the browser ajax handlers will
# redirect the user to the dashboard; we invoke it manually here.
self.assert_json_success_response_looks_correct(student_views.login_user(strategy.request))
self.assert_redirect_to_dashboard_looks_correct(
actions.do_complete(strategy, social_views._do_login, user=user))
self.assert_dashboard_response_looks_correct(student_views.dashboard(request), user)
def test_signin_fails_if_no_account_associated(self):
_, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
self.create_user_models_for_existing_account(
strategy, 'user@example.com', 'password', self.get_username(), skip_social_auth=True)
self.assert_json_failure_response_is_missing_social_auth(student_views.login_user(strategy.request))
def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_email_in_request(self):
self.assert_first_party_auth_trumps_third_party_auth(email='user@example.com')
def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_password_in_request(self):
self.assert_first_party_auth_trumps_third_party_auth(password='password')
def test_first_party_auth_trumps_third_party_auth_and_fails_when_credentials_bad(self):
self.assert_first_party_auth_trumps_third_party_auth(
email='user@example.com', password='password', success=False)
def test_first_party_auth_trumps_third_party_auth_and_succeeds_when_credentials_good(self):
self.assert_first_party_auth_trumps_third_party_auth(
email='user@example.com', password='password', success=True)
def test_full_pipeline_succeeds_registering_new_account(self):
# First, create, the request and strategy that store pipeline state.
# Mock out wire traffic.
request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
# Begin! Grab the registration page and check the login control on it.
self.assert_register_response_before_pipeline_looks_correct(self.client.get('/register'))
# The pipeline starts by a user GETting /auth/login/<provider>.
# Synthesize that request and check that it redirects to the correct
# provider page.
self.assert_redirect_to_provider_looks_correct(self.client.get(
pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN)))
# Next, the provider makes a request against /auth/complete/<provider>.
# pylint:disable-msg=protected-access
self.assert_redirect_to_register_looks_correct(actions.do_complete(strategy, social_views._do_login))
# At this point we know the pipeline has resumed correctly. Next we
# fire off the view that displays the registration form.
self.assert_register_response_in_pipeline_looks_correct(
student_views.register_user(strategy.request), pipeline.get(request)['kwargs'])
# Next, we invoke the view that handles the POST. Not all providers
# supply email. Manually add it as the user would have to; this
# also serves as a test of overriding provider values. Always provide a
# password for us to check that we override it properly.
overridden_password = strategy.request.POST.get('password')
email = 'new@example.com'
if not strategy.request.POST.get('email'):
strategy.request.POST = self.get_registration_post_vars({'email': email})
# The user must not exist yet...
with self.assertRaises(auth_models.User.DoesNotExist):
self.get_user_by_email(strategy, email)
# ...but when we invoke create_account the existing edX view will make
# it, but not social auths. The pipeline creates those later.
self.assert_json_success_response_looks_correct(student_views.create_account(strategy.request))
# We've overridden the user's password, so authenticate() with the old
# value won't work:
created_user = self.get_user_by_email(strategy, email)
self.assert_password_overridden_by_pipeline(overridden_password, created_user.username)
# Last step in the pipeline: we re-invoke the pipeline and expect to
# end up on /dashboard, with the correct social auth object now in the
# backend and the correct user's data on display.
self.assert_redirect_to_dashboard_looks_correct(
actions.do_complete(strategy, social_views._do_login, user=created_user))
self.assert_social_auth_exists_for_user(created_user, strategy)
self.assert_dashboard_response_looks_correct(student_views.dashboard(request), created_user)
def test_new_account_registration_assigns_distinct_username_on_collision(self):
original_username = self.get_username()
request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete')
# Create a colliding username in the backend, then proceed with
# assignment via pipeline to make sure a distinct username is created.
strategy.storage.user.create_user(username=self.get_username(), email='user@email.com', password='password')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
# pylint:disable-msg=protected-access
self.assert_redirect_to_register_looks_correct(actions.do_complete(strategy, social_views._do_login))
distinct_username = pipeline.get(request)['kwargs']['username']
self.assertNotEqual(original_username, distinct_username)
def test_new_account_registration_fails_if_email_exists(self):
request, strategy = self.get_request_and_strategy(
auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete')
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
# pylint:disable-msg=protected-access
self.assert_redirect_to_register_looks_correct(actions.do_complete(strategy, social_views._do_login))
self.assert_register_response_in_pipeline_looks_correct(
student_views.register_user(strategy.request), pipeline.get(request)['kwargs'])
strategy.request.POST = self.get_registration_post_vars()
# Create twice: once successfully, and once causing a collision.
student_views.create_account(strategy.request)
self.assert_json_failure_response_is_username_collision(student_views.create_account(strategy.request))
def test_pipeline_raises_auth_entry_error_if_auth_entry_invalid(self):
auth_entry = 'invalid'
self.assertNotIn(auth_entry, pipeline._AUTH_ENTRY_CHOICES) # pylint: disable-msg=protected-access
_, strategy = self.get_request_and_strategy(auth_entry=auth_entry, redirect_uri='social:complete')
with self.assertRaises(pipeline.AuthEntryError):
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
def test_pipeline_raises_auth_entry_error_if_auth_entry_missing(self):
_, strategy = self.get_request_and_strategy(auth_entry=None, redirect_uri='social:complete')
with self.assertRaises(pipeline.AuthEntryError):
strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
class Oauth2IntegrationTest(IntegrationTest): # pylint: disable-msg=abstract-method
"""Base test case for integration tests of Oauth2 providers."""
# Dict of string -> object. Information about the token granted to the
# user. Override with test values in subclass; None to force a throw.
TOKEN_RESPONSE_DATA = None
# Dict of string -> object. Information about the user themself. Override
# with test values in subclass; None to force a throw.
USER_RESPONSE_DATA = None
def get_response_data(self):
"""Gets dict (string -> object) of merged data about the user."""
response_data = dict(self.TOKEN_RESPONSE_DATA)
response_data.update(self.USER_RESPONSE_DATA)
return response_data
"""Integration tests for Google providers."""
from third_party_auth import provider
from third_party_auth.tests.specs import base
class GoogleOauth2IntegrationTest(base.Oauth2IntegrationTest):
"""Integration tests for provider.GoogleOauth2."""
PROVIDER_CLASS = provider.GoogleOauth2
PROVIDER_SETTINGS = {
'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY': 'google_oauth2_key',
'SOCIAL_AUTH_GOOGLE_OAUTH2_SECRET': 'google_oauth2_secret',
}
TOKEN_RESPONSE_DATA = {
'access_token': 'access_token_value',
'expires_in': 'expires_in_value',
'id_token': 'id_token_value',
'token_type': 'token_type_value',
}
USER_RESPONSE_DATA = {
'email': 'email_value@example.com',
'family_name': 'family_name_value',
'given_name': 'given_name_value',
'id': 'id_value',
'link': 'link_value',
'locale': 'locale_value',
'name': 'name_value',
'picture': 'picture_value',
'verified_email': 'verified_email_value',
}
def get_username(self):
return self.get_response_data().get('email').split('@')[0]
"""Integration tests for LinkedIn providers."""
from third_party_auth import provider
from third_party_auth.tests.specs import base
class LinkedInOauth2IntegrationTest(base.Oauth2IntegrationTest):
"""Integration tests for provider.LinkedInOauth2."""
PROVIDER_CLASS = provider.LinkedInOauth2
PROVIDER_SETTINGS = {
'SOCIAL_AUTH_LINKEDIN_OAUTH2_KEY': 'linkedin_oauth2_key',
'SOCIAL_AUTH_LINKEDIN_OAUTH2_SECRET': 'linkedin_oauth2_secret',
}
TOKEN_RESPONSE_DATA = {
'access_token': 'access_token_value',
'expires_in': 'expires_in_value',
}
USER_RESPONSE_DATA = {
'lastName': 'lastName_value',
'id': 'id_value',
'firstName': 'firstName_value',
}
def get_username(self):
response_data = self.get_response_data()
return response_data.get('firstName') + response_data.get('lastName')
"""Unit tests for third_party_auth/pipeline.py."""
import random
from third_party_auth import pipeline, provider
from third_party_auth.tests import testutil
# Allow tests access to protected methods (or module-protected methods) under
# test. pylint: disable-msg=protected-access
class MakeRandomPasswordTest(testutil.TestCase):
"""Tests formation of random placeholder passwords."""
def setUp(self):
super(MakeRandomPasswordTest, self).setUp()
self.seed = 1
def test_default_args(self):
self.assertEqual(pipeline._DEFAULT_RANDOM_PASSWORD_LENGTH, len(pipeline.make_random_password()))
def test_probably_only_uses_charset(self):
# This is ultimately probablistic since we could randomly select a good character 100000 consecutive times.
for char in pipeline.make_random_password(length=100000):
self.assertIn(char, pipeline._PASSWORD_CHARSET)
def test_pseudorandomly_picks_chars_from_charset(self):
random_instance = random.Random(self.seed)
expected = ''.join(
random_instance.choice(pipeline._PASSWORD_CHARSET)
for _ in xrange(pipeline._DEFAULT_RANDOM_PASSWORD_LENGTH))
random_instance.seed(self.seed)
self.assertEqual(expected, pipeline.make_random_password(choice_fn=random_instance.choice))
class ProviderUserStateTestCase(testutil.TestCase):
"""Tests ProviderUserState behavior."""
def test_get_unlink_form_name(self):
state = pipeline.ProviderUserState(provider.GoogleOauth2, object(), False)
self.assertEqual(provider.GoogleOauth2.NAME + '_unlink_form', state.get_unlink_form_name())
"""Integration tests for pipeline.py."""
import unittest
from django.conf import settings
from django import test
from django.contrib.auth import models
from third_party_auth import pipeline, provider
from third_party_auth.tests import testutil
from social.apps.django_app.default import models as social_models
# Get Django User model by reference from python-social-auth. Not a type
# constant, pylint.
User = social_models.DjangoStorage.user.user_model() # pylint: disable-msg=invalid-name
class TestCase(testutil.TestCase, test.TestCase):
"""Base test case."""
def setUp(self):
super(TestCase, self).setUp()
self.enabled_provider_name = provider.GoogleOauth2.NAME
provider.Registry.configure_once([self.enabled_provider_name])
self.enabled_provider = provider.Registry.get(self.enabled_provider_name)
@unittest.skipUnless(
testutil.AUTH_FEATURES_KEY in settings.FEATURES, testutil.AUTH_FEATURES_KEY + ' not in settings.FEATURES')
class GetAuthenticatedUserTestCase(TestCase):
"""Tests for get_authenticated_user."""
def setUp(self):
super(GetAuthenticatedUserTestCase, self).setUp()
self.user = social_models.DjangoStorage.user.create_user(username='username', password='password')
def get_by_username(self, username):
"""Gets a User by username."""
return social_models.DjangoStorage.user.user_model().objects.get(username=username)
def test_raises_does_not_exist_if_user_missing(self):
with self.assertRaises(models.User.DoesNotExist):
pipeline.get_authenticated_user('new_' + self.user.username, 'backend')
def test_raises_does_not_exist_if_user_found_but_no_association(self):
backend_name = 'backend'
self.assertIsNotNone(self.get_by_username(self.user.username))
self.assertIsNone(provider.Registry.get_by_backend_name(backend_name))
with self.assertRaises(models.User.DoesNotExist):
pipeline.get_authenticated_user(self.user.username, 'backend')
def test_raises_does_not_exist_if_user_and_association_found_but_no_match(self):
self.assertIsNotNone(self.get_by_username(self.user.username))
social_models.DjangoStorage.user.create_social_auth(
self.user, 'uid', 'other_' + self.enabled_provider.BACKEND_CLASS.name)
with self.assertRaises(models.User.DoesNotExist):
pipeline.get_authenticated_user(self.user.username, self.enabled_provider.BACKEND_CLASS.name)
def test_returns_user_with_is_authenticated_and_backend_set_if_match(self):
social_models.DjangoStorage.user.create_social_auth(self.user, 'uid', self.enabled_provider.BACKEND_CLASS.name)
user = pipeline.get_authenticated_user(self.user.username, self.enabled_provider.BACKEND_CLASS.name)
self.assertEqual(self.user, user)
self.assertEqual(self.enabled_provider.get_authentication_backend(), user.backend)
@unittest.skipUnless(
testutil.AUTH_FEATURES_KEY in settings.FEATURES, testutil.AUTH_FEATURES_KEY + ' not in settings.FEATURES')
class GetProviderUserStatesTestCase(testutil.TestCase, test.TestCase):
"""Tests generation of ProviderUserStates."""
def setUp(self):
super(GetProviderUserStatesTestCase, self).setUp()
self.user = social_models.DjangoStorage.user.create_user(username='username', password='password')
def test_returns_empty_list_if_no_enabled_providers(self):
provider.Registry.configure_once([])
self.assertEquals([], pipeline.get_provider_user_states(self.user))
def test_state_not_returned_for_disabled_provider(self):
disabled_provider = provider.GoogleOauth2
enabled_provider = provider.LinkedInOauth2
provider.Registry.configure_once([enabled_provider.NAME])
social_models.DjangoStorage.user.create_social_auth(self.user, 'uid', disabled_provider.BACKEND_CLASS.name)
states = pipeline.get_provider_user_states(self.user)
self.assertEqual(1, len(states))
self.assertNotIn(disabled_provider, (state.provider for state in states))
def test_states_for_enabled_providers_user_has_accounts_associated_with(self):
provider.Registry.configure_once([provider.GoogleOauth2.NAME, provider.LinkedInOauth2.NAME])
social_models.DjangoStorage.user.create_social_auth(self.user, 'uid', provider.GoogleOauth2.BACKEND_CLASS.name)
social_models.DjangoStorage.user.create_social_auth(
self.user, 'uid', provider.LinkedInOauth2.BACKEND_CLASS.name)
states = pipeline.get_provider_user_states(self.user)
self.assertEqual(2, len(states))
google_state = [state for state in states if state.provider == provider.GoogleOauth2][0]
linkedin_state = [state for state in states if state.provider == provider.LinkedInOauth2][0]
self.assertTrue(google_state.has_account)
self.assertEqual(provider.GoogleOauth2, google_state.provider)
self.assertEqual(self.user, google_state.user)
self.assertTrue(linkedin_state.has_account)
self.assertEqual(provider.LinkedInOauth2, linkedin_state.provider)
self.assertEqual(self.user, linkedin_state.user)
def test_states_for_enabled_providers_user_has_no_account_associated_with(self):
provider.Registry.configure_once([provider.GoogleOauth2.NAME, provider.LinkedInOauth2.NAME])
states = pipeline.get_provider_user_states(self.user)
self.assertEqual([], [x for x in social_models.DjangoStorage.user.objects.all()])
self.assertEqual(2, len(states))
google_state = [state for state in states if state.provider == provider.GoogleOauth2][0]
linkedin_state = [state for state in states if state.provider == provider.LinkedInOauth2][0]
self.assertFalse(google_state.has_account)
self.assertEqual(provider.GoogleOauth2, google_state.provider)
self.assertEqual(self.user, google_state.user)
self.assertFalse(linkedin_state.has_account)
self.assertEqual(provider.LinkedInOauth2, linkedin_state.provider)
self.assertEqual(self.user, linkedin_state.user)
@unittest.skipUnless(
testutil.AUTH_FEATURES_KEY in settings.FEATURES, testutil.AUTH_FEATURES_KEY + ' not in settings.FEATURES')
class UrlFormationTestCase(TestCase):
"""Tests formation of URLs for pipeline hook points."""
def test_complete_url_raises_value_error_if_provider_not_enabled(self):
provider_name = 'not_enabled'
self.assertIsNone(provider.Registry.get(provider_name))
with self.assertRaises(ValueError):
pipeline.get_complete_url(provider_name)
def test_complete_url_returns_expected_format(self):
complete_url = pipeline.get_complete_url(self.enabled_provider.BACKEND_CLASS.name)
self.assertTrue(complete_url.startswith('/auth/complete'))
self.assertIn(self.enabled_provider.BACKEND_CLASS.name, complete_url)
def test_disconnect_url_raises_value_error_if_provider_not_enabled(self):
provider_name = 'not_enabled'
self.assertIsNone(provider.Registry.get(provider_name))
with self.assertRaises(ValueError):
pipeline.get_disconnect_url(provider_name)
def test_disconnect_url_returns_expected_format(self):
disconnect_url = pipeline.get_disconnect_url(self.enabled_provider.NAME)
self.assertTrue(disconnect_url.startswith('/auth/disconnect'))
self.assertIn(self.enabled_provider.BACKEND_CLASS.name, disconnect_url)
def test_login_url_raises_value_error_if_provider_not_enabled(self):
provider_name = 'not_enabled'
self.assertIsNone(provider.Registry.get(provider_name))
with self.assertRaises(ValueError):
pipeline.get_login_url(provider_name, pipeline.AUTH_ENTRY_LOGIN)
def test_login_url_returns_expected_format(self):
login_url = pipeline.get_login_url(self.enabled_provider.NAME, pipeline.AUTH_ENTRY_LOGIN)
self.assertTrue(login_url.startswith('/auth/login'))
self.assertIn(self.enabled_provider.BACKEND_CLASS.name, login_url)
self.assertTrue(login_url.endswith(pipeline.AUTH_ENTRY_LOGIN))
"""
Test configuration of providers.
"""
"""Unit tests for provider.py."""
from third_party_auth import provider
from third_party_auth.tests import testutil
......@@ -10,8 +8,7 @@ class RegistryTest(testutil.TestCase):
"""Tests registry discovery and operation."""
# Allow access to protected methods (or module-protected methods) under
# test.
# pylint: disable-msg=protected-access
# test. pylint: disable-msg=protected-access
def test_calling_configure_once_twice_raises_value_error(self):
provider.Registry.configure_once([provider.GoogleOauth2.NAME])
......@@ -68,4 +65,18 @@ class RegistryTest(testutil.TestCase):
def test_get_returns_none_if_provider_not_enabled(self):
provider.Registry.configure_once([])
self.assertIsNone(provider.Registry.get(provider.MozillaPersona.NAME))
self.assertIsNone(provider.Registry.get(provider.LinkedInOauth2.NAME))
def test_get_by_backend_name_raises_runtime_error_if_not_configured(self):
with self.assertRaisesRegexp(RuntimeError, '^.*not configured$'):
provider.Registry.get_by_backend_name('')
def test_get_by_backend_name_returns_enabled_provider(self):
provider.Registry.configure_once([provider.GoogleOauth2.NAME])
self.assertIs(
provider.GoogleOauth2,
provider.Registry.get_by_backend_name(provider.GoogleOauth2.BACKEND_CLASS.name))
def test_get_by_backend_name_returns_none_if_provider_not_enabled(self):
provider.Registry.configure_once([])
self.assertIsNone(provider.Registry.get_by_backend_name(provider.GoogleOauth2.BACKEND_CLASS.name))
"""
Unit tests for settings code.
"""
"""Unit tests for settings.py."""
from third_party_auth import provider
from third_party_auth import settings
from third_party_auth import provider, settings
from third_party_auth.tests import testutil
_ORIGINAL_AUTHENTICATION_BACKENDS = ('first_authentication_backend',)
_ORIGINAL_INSTALLED_APPS = ('first_installed_app',)
_ORIGINAL_MIDDLEWARE_CLASSES = ('first_middleware_class',)
_ORIGINAL_TEMPLATE_CONTEXT_PROCESSORS = ('first_template_context_preprocessor',)
_SETTINGS_MAP = {
'AUTHENTICATION_BACKENDS': _ORIGINAL_AUTHENTICATION_BACKENDS,
'INSTALLED_APPS': _ORIGINAL_INSTALLED_APPS,
'MIDDLEWARE_CLASSES': _ORIGINAL_MIDDLEWARE_CLASSES,
'TEMPLATE_CONTEXT_PROCESSORS': _ORIGINAL_TEMPLATE_CONTEXT_PROCESSORS,
}
......@@ -20,6 +19,8 @@ _SETTINGS_MAP = {
class SettingsUnitTest(testutil.TestCase):
"""Unit tests for settings management code."""
# Allow access to protected methods (or module-protected methods) under
# test. pylint: disable-msg=protected-access
# Suppress sprurious no-member warning on fakes.
# pylint: disable-msg=no-member
......@@ -27,6 +28,15 @@ class SettingsUnitTest(testutil.TestCase):
super(SettingsUnitTest, self).setUp()
self.settings = testutil.FakeDjangoSettings(_SETTINGS_MAP)
def test_apply_settings_adds_exception_middleware(self):
settings.apply_settings({}, self.settings)
for middleware_name in settings._MIDDLEWARE_CLASSES:
self.assertIn(middleware_name, self.settings.MIDDLEWARE_CLASSES)
def test_apply_settings_adds_fields_stored_in_session(self):
settings.apply_settings({}, self.settings)
self.assertEqual(settings._FIELDS_STORED_IN_SESSION, self.settings.FIELDS_STORED_IN_SESSION)
def test_apply_settings_adds_third_party_auth_to_installed_apps(self):
settings.apply_settings({}, self.settings)
self.assertIn('third_party_auth', self.settings.INSTALLED_APPS)
......@@ -50,9 +60,9 @@ class SettingsUnitTest(testutil.TestCase):
def test_apply_settings_prepends_auth_backends(self):
self.assertEqual(_ORIGINAL_AUTHENTICATION_BACKENDS, self.settings.AUTHENTICATION_BACKENDS)
settings.apply_settings({provider.GoogleOauth2.NAME: {}, provider.MozillaPersona.NAME: {}}, self.settings)
settings.apply_settings({provider.GoogleOauth2.NAME: {}, provider.LinkedInOauth2.NAME: {}}, self.settings)
self.assertEqual((
provider.GoogleOauth2.AUTHENTICATION_BACKEND, provider.MozillaPersona.AUTHENTICATION_BACKEND) +
provider.GoogleOauth2.get_authentication_backend(), provider.LinkedInOauth2.get_authentication_backend()) +
_ORIGINAL_AUTHENTICATION_BACKENDS,
self.settings.AUTHENTICATION_BACKENDS)
......@@ -66,3 +76,9 @@ class SettingsUnitTest(testutil.TestCase):
}
with self.assertRaisesRegexp(ValueError, '^.*not initialized$'):
settings.apply_settings(auth_info, self.settings)
def test_apply_settings_turns_off_raising_social_exceptions(self):
# Guard against submitting a conf change that's convenient in dev but
# bad in prod.
settings.apply_settings({}, self.settings)
self.assertFalse(self.settings.SOCIAL_AUTH_RAISE_EXCEPTIONS)
"""
Integration tests for settings code.
"""
import mock
import unittest
"""Integration tests for settings.py."""
from django.conf import settings
......@@ -11,29 +6,22 @@ from third_party_auth import provider
from third_party_auth import settings as auth_settings
from third_party_auth.tests import testutil
_AUTH_FEATURES_KEY = 'ENABLE_THIRD_PARTY_AUTH'
class SettingsIntegrationTest(testutil.TestCase):
"""Integration tests of auth settings pipeline."""
"""Integration tests of auth settings pipeline.
@unittest.skipUnless(_AUTH_FEATURES_KEY in settings.FEATURES, _AUTH_FEATURES_KEY + ' not in settings.FEATURES')
def test_enable_third_party_auth_is_disabled_by_default(self):
self.assertIs(False, settings.FEATURES.get(_AUTH_FEATURES_KEY))
Note that ENABLE_THIRD_PARTY_AUTH is True in lms/envs/test.py and False in
cms/envs/test.py. This implicitly gives us coverage of the full settings
mechanism with both values, so we do not have explicit test methods as they
are superfluous.
"""
@mock.patch.dict(settings.FEATURES, {'ENABLE_THIRD_PARTY_AUTH': True})
def test_can_enable_google_oauth2(self):
auth_settings.apply_settings({'Google': {'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY': 'google_key'}}, settings)
self.assertEqual([provider.GoogleOauth2], provider.Registry.enabled())
self.assertEqual('google_key', settings.SOCIAL_AUTH_GOOGLE_OAUTH2_KEY)
@mock.patch.dict(settings.FEATURES, {'ENABLE_THIRD_PARTY_AUTH': True})
def test_can_enable_linkedin_oauth2(self):
auth_settings.apply_settings({'LinkedIn': {'SOCIAL_AUTH_LINKEDIN_OAUTH2_KEY': 'linkedin_key'}}, settings)
self.assertEqual([provider.LinkedInOauth2], provider.Registry.enabled())
self.assertEqual('linkedin_key', settings.SOCIAL_AUTH_LINKEDIN_OAUTH2_KEY)
@mock.patch.dict(settings.FEATURES, {'ENABLE_THIRD_PARTY_ATUH': True})
def test_can_enable_mozilla_persona(self):
auth_settings.apply_settings({'Mozilla Persona': {}}, settings)
self.assertEqual([provider.MozillaPersona], provider.Registry.enabled())
......@@ -9,11 +9,14 @@ import unittest
from third_party_auth import provider
AUTH_FEATURES_KEY = 'ENABLE_THIRD_PARTY_AUTH'
class FakeDjangoSettings(object):
"""A fake for Django settings."""
def __init__(self, mappings):
"""Initializes the fake from `mappings`, a dict."""
"""Initializes the fake from mappings dict."""
for key, value in mappings.iteritems():
setattr(self, key, value)
......
......@@ -2,6 +2,8 @@
from django.conf.urls import include, patterns, url
urlpatterns = patterns(
'', url(r'^auth/', include('social.apps.django_app.urls', namespace='social')),
'',
url(r'^auth/', include('social.apps.django_app.urls', namespace='social')),
)
......@@ -180,6 +180,9 @@ SECRET_KEY = '85920908f28904ed733fe576320db18cabd7b6cd'
# hide ratelimit warnings while running tests
filterwarnings('ignore', message='No request passed to the backend, unable to rate-limit')
######### Third-party auth ##########
FEATURES['ENABLE_THIRD_PARTY_AUTH'] = True
################################## OPENID #####################################
FEATURES['AUTH_USE_OPENID'] = True
FEATURES['AUTH_USE_OPENID_PROVIDER'] = True
......
......@@ -450,6 +450,20 @@
}
}
// forms - third-party auth
.form-third-party-auth {
margin-bottom: $baseline;
button {
margin-right: $baseline;
.icon {
color: inherit;
margin-right: $baseline/2;
}
}
}
// forms - messages/status
.status {
@include box-sizing(border-box);
......
......@@ -130,6 +130,23 @@
white-space: nowrap;
text-overflow: ellipsis;
overflow: hidden;
.third-party-auth {
color: inherit;
font-weight: inherit;
.control {
float: right;
}
.icon {
margin-top: 4px;
}
.provider {
display: inline;
}
}
}
}
}
......
<%! from django.utils.translation import ugettext as _ %>
<%! from django.template import RequestContext %>
<%! from third_party_auth import pipeline %>
<%!
from django.core.urlresolvers import reverse
......@@ -194,6 +195,13 @@
</section>
%endif
% if duplicate_provider:
<section class="dashboard-banner third-party-auth">
## Translators: this message is displayed when a user tries to link their account with a third-party authentication provider (for example, Google or LinkedIn) with a given edX account, but their third-party account is already associated with another edX account. provider_name is the name of the third-party authentication provider, and platform_name is the name of the edX deployment.
${_('The selected {provider_name} account is already linked to another {platform_name} account. Please {link_start}log out{link_end}, then log in with your {provider_name} account.').format(link_end='</a>', link_start='<a href="%s">' % logout_url, provider_name='<strong>%s</strong>' % duplicate_provider.NAME, platform_name=platform_name)}
</section>
% endif
<section class="profile-sidebar">
<header class="profile">
<h1 class="user-name">${ user.username }</h1>
......@@ -215,6 +223,53 @@
<%include file='dashboard/_dashboard_info_language.html' />
%endif
% if settings.FEATURES.get('ENABLE_THIRD_PARTY_AUTH'):
<li class="controls--account">
<span class="title">
<div class="icon icon-gears"></div>
## Translators: this section lists all the third-party authentication providers (for example, Google and LinkedIn) the user can link with or unlink from their edX account.
${_("Account Links")}
</span>
<span class="data">
<span class="third-party-auth">
% for state in provider_user_states:
<div>
% if state.has_account:
<span class="icon icon-link pull-left"></span>
% else:
<span class="icon icon-unlink pull-left"></span>
% endif
<span class="provider">${state.provider.NAME}</span>
<span class="control">
% if state.has_account:
<form
action="${pipeline.get_disconnect_url(state.provider.NAME)}"
method="post"
name="${state.get_unlink_form_name()}">
<input type="hidden" name="csrfmiddlewaretoken" value="${csrf_token}">
</form>
<a href="#" onclick="document.${state.get_unlink_form_name()}.submit()">
## Translators: clicking on this removes the link between a user's edX account and their account with an external authentication provider (like Google or LinkedIn).
${_("unlink")}
</a>
% else:
<a href="${pipeline.get_login_url(state.provider.NAME, pipeline.AUTH_ENTRY_DASHBOARD)}">
## Translators: clicking on this creates a link between a user's edX account and their account with an external authentication provider (like Google or LinkedIn).
${_("link")}
</a>
% endif
</span>
</div>
% endfor
</span>
</li>
% endif
% if external_auth_map is None or 'shib' not in external_auth_map.external_domain:
<li class="controls--account">
<span class="title"><div class="icon"></div><a href="#password_reset_complete" rel="leanModal" id="pwd_reset_button">${_("Reset Password")}</a></span>
......
......@@ -4,6 +4,7 @@
<%! from django.core.urlresolvers import reverse %>
<%! from django.utils.translation import ugettext as _ %>
<%! from third_party_auth import provider, pipeline %>
<%block name="pagetitle">${_("Log into your {platform_name} Account").format(platform_name=platform_name)}</%block>
......@@ -93,6 +94,22 @@
text("${_(u'Processing your account information…')}");
}
}
function thirdPartySignin(event, url) {
event.preventDefault();
window.location.href = url;
}
(function post_form_if_pipeline_running(pipeline_running) {
// If the pipeline is running, the user has already authenticated via a
// third-party provider. We want to invoke /login_ajax to loop in the
// code that does logging and sets cookies on the request. It is most
// consistent to do that by using the same mechanism that is used when
// the use does first-party sign-in: POSTing the sign-in form.
if (pipeline_running) {
$('#login-form').submit();
}
})(${pipeline_running})
</script>
</%block>
......@@ -164,6 +181,28 @@
<button name="submit" type="submit" id="submit" class="action action-primary action-update"></button>
</div>
</form>
% if settings.FEATURES.get('ENABLE_THIRD_PARTY_AUTH'):
<hr />
<p class="instructions">
## Developers: this is a sentence fragment, which is usually frowned upon. The design of the pags uses this fragment to provide an "else" clause underneath a number of choices. It's OK to leave it.
## Translators: this is the last choice of a number of choices of how to log in to the site.
${_('or, if you have connected one of these providers, log in below.')}
</p>
<div class="form-actions form-third-party-auth">
% for enabled in provider.Registry.enabled():
## Translators: provider_name is the name of an external, third-party user authentication provider (like Google or LinkedIn).
<button type="submit" class="button button-primary" onclick="thirdPartySignin(event, '${pipeline.get_login_url(enabled.NAME, pipeline.AUTH_ENTRY_LOGIN)}');"><span class="icon ${enabled.ICON_CLASS}"></span>${_('Sign in with {provider_name}').format(provider_name=enabled.NAME)}</button>
% endfor
</div>
% endif
</section>
<aside role="complementary">
......
......@@ -12,6 +12,7 @@
<%! from django.utils.translation import ugettext as _ %>
<%! from student.models import UserProfile %>
<%! from datetime import date %>
<%! from third_party_auth import pipeline, provider %>
<%! import calendar %>
<%block name="pagetitle">${_("Register for {platform_name}").format(platform_name=platform_name)}</%block>
......@@ -67,6 +68,11 @@
});
})(this);
function thirdPartySignin(event, url) {
event.preventDefault();
window.location.href = url;
}
function toggleSubmitButton(enable) {
var $submitButton = $('form .form-actions #submit');
......@@ -110,11 +116,46 @@
<ul class="message-copy"> </ul>
</div>
% if settings.FEATURES.get('ENABLE_THIRD_PARTY_AUTH'):
% if not running_pipeline:
<p class="instructions">
${_("Register to start learning today!")}
</p>
<div class="form-actions form-third-party-auth">
% for enabled in provider.Registry.enabled():
## Translators: provider_name is the name of an external, third-party user authentication service (like Google or LinkedIn).
<button type="submit" class="button button-primary" onclick="thirdPartySignin(event, '${pipeline.get_login_url(enabled.NAME, pipeline.AUTH_ENTRY_REGISTER)}');"><span class="icon ${enabled.ICON_CLASS}"></span>${_('Sign in with {provider_name}').format(provider_name=enabled.NAME)}</button>
% endfor
</div>
<p class="instructions">
${_('or create your own {platform_name} account by completing all <strong>required*</strong> fields below.').format(platform_name=platform_name)}
</p>
% else:
<p class="instructions">
## Translators: selected_provider is the name of an external, third-party user authentication service (like Google or LinkedIn).
${_("You've successfully signed in with {selected_provider}.").format(selected_provider='<strong>%s</strong>' % selected_provider)}<br />
${_("Finish your account registration below to start learning.")}
</p>
% endif
% else:
<p class="instructions">
${_("Please complete the following fields to register for an account. ")}<br />
${_('Required fields are noted by <strong class="indicator">bold text and an asterisk (*)</strong>.')}
</p>
% endif
<div class="group group-form group-form-requiredinformation">
<h2 class="sr">${_('Required Information')}</h2>
......@@ -123,20 +164,33 @@
<ol class="list-input">
<li class="field required text" id="field-email">
<label for="email">${_('E-mail')}</label>
<input class="" id="email" type="email" name="email" value="" placeholder="${_('example: username@domain.com')}" required aria-required="true" />
<input class="" id="email" type="email" name="email" value="${email}" placeholder="${_('example: username@domain.com')}" required aria-required="true" />
</li>
% if settings.FEATURES.get('ENABLE_THIRD_PARTY_AUTH') and running_pipeline:
<li class="is-disabled field optional password" id="field-password" hidden>
<label for="password">${_('Password')}</label>
<input id="password" type="password" name="password" value="" disabled required aria-required="true" />
</li>
% else:
<li class="field required password" id="field-password">
<label for="password">${_('Password')}</label>
<input id="password" type="password" name="password" value="" required aria-required="true" />
</li>
% endif
<li class="field required text" id="field-username">
<label for="username">${_('Public Username')}</label>
<input id="username" type="text" name="username" value="" placeholder="${_('example: JaneDoe')}" required aria-required="true" aria-describedby="username-tip"/>
<input id="username" type="text" name="username" value="${username}" placeholder="${_('example: JaneDoe')}" required aria-required="true" aria-describedby="username-tip"/>
<span class="tip tip-input" id="username-tip">${_('Will be shown in any discussions or forums you participate in')} <strong>(${_('cannot be changed later')})</strong></span>
</li>
<li class="field required text" id="field-name">
<label for="name">${_('Full Name')}</label>
<input id="name" type="text" name="name" value="" placeholder="${_('example: Jane Doe')}" required aria-required="true" aria-describedby="name-tip" />
<input id="name" type="text" name="name" value="${name}" placeholder="${_('example: Jane Doe')}" required aria-required="true" aria-describedby="name-tip" />
<span class="tip tip-input" id="name-tip">${_("Needed for any certificates you may earn")}</span>
</li>
</ol>
......
......@@ -60,7 +60,7 @@ pyparsing==2.0.1
python-memcached==1.48
python-openid==2.2.5
python-dateutil==2.1
python-social-auth==0.1.21
python-social-auth==0.1.23
pytz==2012h
pysrt==0.4.7
PyYAML==3.10
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment