Commit d5c5fb34 by Diana Huang

Merge pull request #290 from edx/diana/open-id-namespace

Handle issues decoding openid requests more gracefully
parents 4a41ce80 f33bfd1c
...@@ -9,9 +9,11 @@ from urlparse import parse_qs ...@@ -9,9 +9,11 @@ from urlparse import parse_qs
from django.conf import settings from django.conf import settings
from django.test import TestCase, LiveServerTestCase from django.test import TestCase, LiveServerTestCase
from django.test.utils import override_settings
# from django.contrib.auth.models import User # from django.contrib.auth.models import User
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from django.test.client import RequestFactory from django.test.client import RequestFactory
from unittest import skipUnless
class MyFetcher(HTTPFetcher): class MyFetcher(HTTPFetcher):
...@@ -59,21 +61,17 @@ class MyFetcher(HTTPFetcher): ...@@ -59,21 +61,17 @@ class MyFetcher(HTTPFetcher):
final_url=final_url, final_url=final_url,
headers=response_headers, headers=response_headers,
status=status, status=status,
) )
class OpenIdProviderTest(TestCase): class OpenIdProviderTest(TestCase):
"""
Tests of the OpenId login
"""
# def setUp(self): @skipUnless(settings.MITX_FEATURES.get('AUTH_USE_OPENID') or
# username = 'viewtest' settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'), True)
# email = 'view@test.com' def test_begin_login_with_xrds_url(self):
# password = 'foo'
# user = User.objects.create_user(username, email, password)
def testBeginLoginWithXrdsUrl(self):
# skip the test if openid is not enabled (as in cms.envs.test):
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
return
# the provider URL must be converted to an absolute URL in order to be # the provider URL must be converted to an absolute URL in order to be
# used as an openid provider. # used as an openid provider.
...@@ -99,10 +97,9 @@ class OpenIdProviderTest(TestCase): ...@@ -99,10 +97,9 @@ class OpenIdProviderTest(TestCase):
"got code {0} for url '{1}'. Expected code {2}" "got code {0} for url '{1}'. Expected code {2}"
.format(resp.status_code, url, code)) .format(resp.status_code, url, code))
def testBeginLoginWithLoginUrl(self): @skipUnless(settings.MITX_FEATURES.get('AUTH_USE_OPENID') or
# skip the test if openid is not enabled (as in cms.envs.test): settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'), True)
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'): def test_begin_login_with_login_url(self):
return
# the provider URL must be converted to an absolute URL in order to be # the provider URL must be converted to an absolute URL in order to be
# used as an openid provider. # used as an openid provider.
...@@ -150,49 +147,70 @@ class OpenIdProviderTest(TestCase): ...@@ -150,49 +147,70 @@ class OpenIdProviderTest(TestCase):
# <input name="openid.return_to" type="hidden" value="http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H" /> # <input name="openid.return_to" type="hidden" value="http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H" />
# <input name="openid.assoc_handle" type="hidden" value="{HMAC-SHA1}{50ff8120}{rh87+Q==}" /> # <input name="openid.assoc_handle" type="hidden" value="{HMAC-SHA1}{50ff8120}{rh87+Q==}" />
def attempt_login(self, expected_code, **kwargs):
def testOpenIdSetup(self): """ Attempt to log in through the open id provider login """
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
return
url = reverse('openid-provider-login') url = reverse('openid-provider-login')
post_args = { post_args = {
"openid.mode": "checkid_setup", "openid.mode": "checkid_setup",
"openid.return_to": "http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H", "openid.return_to": "http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H",
"openid.assoc_handle": "{HMAC-SHA1}{50ff8120}{rh87+Q==}", "openid.assoc_handle": "{HMAC-SHA1}{50ff8120}{rh87+Q==}",
"openid.claimed_id": "http://specs.openid.net/auth/2.0/identifier_select", "openid.claimed_id": "http://specs.openid.net/auth/2.0/identifier_select",
"openid.ns": "http://specs.openid.net/auth/2.0", "openid.ns": "http://specs.openid.net/auth/2.0",
"openid.realm": "http://testserver/", "openid.realm": "http://testserver/",
"openid.identity": "http://specs.openid.net/auth/2.0/identifier_select", "openid.identity": "http://specs.openid.net/auth/2.0/identifier_select",
"openid.ns.ax": "http://openid.net/srv/ax/1.0", "openid.ns.ax": "http://openid.net/srv/ax/1.0",
"openid.ax.mode": "fetch_request", "openid.ax.mode": "fetch_request",
"openid.ax.required": "email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname", "openid.ax.required": "email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname",
"openid.ax.type.fullname": "http://axschema.org/namePerson", "openid.ax.type.fullname": "http://axschema.org/namePerson",
"openid.ax.type.lastname": "http://axschema.org/namePerson/last", "openid.ax.type.lastname": "http://axschema.org/namePerson/last",
"openid.ax.type.firstname": "http://axschema.org/namePerson/first", "openid.ax.type.firstname": "http://axschema.org/namePerson/first",
"openid.ax.type.nickname": "http://axschema.org/namePerson/friendly", "openid.ax.type.nickname": "http://axschema.org/namePerson/friendly",
"openid.ax.type.email": "http://axschema.org/contact/email", "openid.ax.type.email": "http://axschema.org/contact/email",
"openid.ax.type.old_email": "http://schema.openid.net/contact/email", "openid.ax.type.old_email": "http://schema.openid.net/contact/email",
"openid.ax.type.old_nickname": "http://schema.openid.net/namePerson/friendly", "openid.ax.type.old_nickname": "http://schema.openid.net/namePerson/friendly",
"openid.ax.type.old_fullname": "http://schema.openid.net/namePerson", "openid.ax.type.old_fullname": "http://schema.openid.net/namePerson",
} }
# override the default args with any given arguments
for key in kwargs:
post_args["openid." + key] = kwargs[key]
resp = self.client.post(url, post_args) resp = self.client.post(url, post_args)
code = 200 code = expected_code
self.assertEqual(resp.status_code, code, self.assertEqual(resp.status_code, code,
"got code {0} for url '{1}'. Expected code {2}" "got code {0} for url '{1}'. Expected code {2}"
.format(resp.status_code, url, code)) .format(resp.status_code, url, code))
@skipUnless(settings.MITX_FEATURES.get('AUTH_USE_OPENID') or
settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'), True)
def test_open_id_setup(self):
""" Attempt a standard successful login """
self.attempt_login(200)
# In order for this absolute URL to work (i.e. to get xrds, then authentication) @skipUnless(settings.MITX_FEATURES.get('AUTH_USE_OPENID') or
# in the test environment, we either need a live server that works with the default settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'), True)
# fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher. def test_invalid_namespace(self):
# Here we do the former. """ Test for 403 error code when the namespace of the request is invalid"""
class OpenIdProviderLiveServerTest(LiveServerTestCase): self.attempt_login(403, ns="http%3A%2F%2Fspecs.openid.net%2Fauth%2F2.0")
def testBeginLogin(self): @override_settings(OPENID_PROVIDER_TRUSTED_ROOTS=['http://apps.cs50.edx.org'])
# skip the test if openid is not enabled (as in cms.envs.test): @skipUnless(settings.MITX_FEATURES.get('AUTH_USE_OPENID') or
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'): settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'), True)
return def test_invalid_return_url(self):
""" Test for 403 error code when the url"""
self.attempt_login(403, return_to="http://apps.cs50.edx.or")
class OpenIdProviderLiveServerTest(LiveServerTestCase):
"""
In order for this absolute URL to work (i.e. to get xrds, then authentication)
in the test environment, we either need a live server that works with the default
fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
Here we do the former.
"""
@skipUnless(settings.MITX_FEATURES.get('AUTH_USE_OPENID') or
settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'), True)
def test_begin_login(self):
# the provider URL must be converted to an absolute URL in order to be # the provider URL must be converted to an absolute URL in order to be
# used as an openid provider. # used as an openid provider.
provider_url = reverse('openid-provider-xrds') provider_url = reverse('openid-provider-xrds')
......
...@@ -36,7 +36,7 @@ import django_openid_auth.views as openid_views ...@@ -36,7 +36,7 @@ import django_openid_auth.views as openid_views
from django_openid_auth import auth as openid_auth from django_openid_auth import auth as openid_auth
from openid.consumer.consumer import SUCCESS from openid.consumer.consumer import SUCCESS
from openid.server.server import Server from openid.server.server import Server, ProtocolError, UntrustedReturnURL
from openid.server.trustroot import TrustRoot from openid.server.trustroot import TrustRoot
from openid.extensions import ax, sreg from openid.extensions import ax, sreg
...@@ -102,7 +102,7 @@ def openid_login_complete(request, ...@@ -102,7 +102,7 @@ def openid_login_complete(request,
oid_backend = openid_auth.OpenIDBackend() oid_backend = openid_auth.OpenIDBackend()
details = oid_backend._extract_user_details(openid_response) details = oid_backend._extract_user_details(openid_response)
log.debug('openid success, details=%s' % details) log.debug('openid success, details=%s', details)
url = getattr(settings, 'OPENID_SSO_SERVER_URL', None) url = getattr(settings, 'OPENID_SSO_SERVER_URL', None)
external_domain = "openid:%s" % url external_domain = "openid:%s" % url
...@@ -132,7 +132,7 @@ def external_login_or_signup(request, ...@@ -132,7 +132,7 @@ def external_login_or_signup(request,
try: try:
eamap = ExternalAuthMap.objects.get(external_id=external_id, eamap = ExternalAuthMap.objects.get(external_id=external_id,
external_domain=external_domain) external_domain=external_domain)
log.debug('Found eamap=%s' % eamap) log.debug('Found eamap=%s', eamap)
except ExternalAuthMap.DoesNotExist: except ExternalAuthMap.DoesNotExist:
# go render form for creating edX user # go render form for creating edX user
eamap = ExternalAuthMap(external_id=external_id, eamap = ExternalAuthMap(external_id=external_id,
...@@ -141,11 +141,11 @@ def external_login_or_signup(request, ...@@ -141,11 +141,11 @@ def external_login_or_signup(request,
eamap.external_email = email eamap.external_email = email
eamap.external_name = fullname eamap.external_name = fullname
eamap.internal_password = generate_password() eamap.internal_password = generate_password()
log.debug('Created eamap=%s' % eamap) log.debug('Created eamap=%s', eamap)
eamap.save() eamap.save()
log.info("External_Auth login_or_signup for %s : %s : %s : %s" % (external_domain, external_id, email, fullname)) log.info(u"External_Auth login_or_signup for %s : %s : %s : %s", external_domain, external_id, email, fullname)
internal_user = eamap.user internal_user = eamap.user
if internal_user is None: if internal_user is None:
if settings.MITX_FEATURES.get('AUTH_USE_SHIB'): if settings.MITX_FEATURES.get('AUTH_USE_SHIB'):
...@@ -157,7 +157,7 @@ def external_login_or_signup(request, ...@@ -157,7 +157,7 @@ def external_login_or_signup(request,
eamap.user = link_user eamap.user = link_user
eamap.save() eamap.save()
internal_user = link_user internal_user = link_user
log.info('SHIB: Linking existing account for %s' % eamap.external_email) log.info('SHIB: Linking existing account for %s', eamap.external_email)
# now pass through to log in # now pass through to log in
else: else:
# otherwise, there must have been an error, b/c we've already linked a user with these external # otherwise, there must have been an error, b/c we've already linked a user with these external
...@@ -168,10 +168,10 @@ def external_login_or_signup(request, ...@@ -168,10 +168,10 @@ def external_login_or_signup(request,
% getattr(settings, 'TECH_SUPPORT_EMAIL', 'techsupport@class.stanford.edu'))) % getattr(settings, 'TECH_SUPPORT_EMAIL', 'techsupport@class.stanford.edu')))
return default_render_failure(request, failure_msg) return default_render_failure(request, failure_msg)
except User.DoesNotExist: except User.DoesNotExist:
log.info('SHIB: No user for %s yet, doing signup' % eamap.external_email) log.info('SHIB: No user for %s yet, doing signup', eamap.external_email)
return signup(request, eamap) return signup(request, eamap)
else: else:
log.info('No user for %s yet, doing signup' % eamap.external_email) log.info('No user for %s yet. doing signup', eamap.external_email)
return signup(request, eamap) return signup(request, eamap)
# We trust shib's authentication, so no need to authenticate using the password again # We trust shib's authentication, so no need to authenticate using the password again
...@@ -183,17 +183,17 @@ def external_login_or_signup(request, ...@@ -183,17 +183,17 @@ def external_login_or_signup(request,
else: else:
auth_backend = 'django.contrib.auth.backends.ModelBackend' auth_backend = 'django.contrib.auth.backends.ModelBackend'
user.backend = auth_backend user.backend = auth_backend
log.info('SHIB: Logging in linked user %s' % user.email) log.info('SHIB: Logging in linked user %s', user.email)
else: else:
uname = internal_user.username uname = internal_user.username
user = authenticate(username=uname, password=eamap.internal_password) user = authenticate(username=uname, password=eamap.internal_password)
if user is None: if user is None:
log.warning("External Auth Login failed for %s / %s" % log.warning("External Auth Login failed for %s / %s",
(uname, eamap.internal_password)) uname, eamap.internal_password)
return signup(request, eamap) return signup(request, eamap)
if not user.is_active: if not user.is_active:
log.warning("User %s is not active" % (uname)) log.warning("User %s is not active", uname)
# TODO: improve error page # TODO: improve error page
msg = 'Account not yet activated: please look for link in your email' msg = 'Account not yet activated: please look for link in your email'
return default_render_failure(request, msg) return default_render_failure(request, msg)
...@@ -208,7 +208,7 @@ def external_login_or_signup(request, ...@@ -208,7 +208,7 @@ def external_login_or_signup(request,
student_views.try_change_enrollment(enroll_request) student_views.try_change_enrollment(enroll_request)
else: else:
student_views.try_change_enrollment(request) student_views.try_change_enrollment(request)
log.info("Login success - {0} ({1})".format(user.username, user.email)) log.info("Login success - %s (%s)", user.username, user.email)
if retfun is None: if retfun is None:
return redirect('/') return redirect('/')
return retfun() return retfun()
...@@ -261,7 +261,7 @@ def signup(request, eamap=None): ...@@ -261,7 +261,7 @@ def signup(request, eamap=None):
except ValidationError: except ValidationError:
context['ask_for_email'] = True context['ask_for_email'] = True
log.info('EXTAUTH: Doing signup for %s' % eamap.external_id) log.info('EXTAUTH: Doing signup for %s', eamap.external_id)
return student_views.register_user(request, extra_context=context) return student_views.register_user(request, extra_context=context)
...@@ -405,7 +405,7 @@ def shib_login(request): ...@@ -405,7 +405,7 @@ def shib_login(request):
shib['sn'] = shib['sn'].split(";")[0].strip().capitalize().decode('utf-8') shib['sn'] = shib['sn'].split(";")[0].strip().capitalize().decode('utf-8')
shib['givenName'] = shib['givenName'].split(";")[0].strip().capitalize().decode('utf-8') shib['givenName'] = shib['givenName'].split(";")[0].strip().capitalize().decode('utf-8')
log.info("SHIB creds returned: %r" % shib) log.info("SHIB creds returned: %r", shib)
return external_login_or_signup(request, return external_login_or_signup(request,
external_id=shib['REMOTE_USER'], external_id=shib['REMOTE_USER'],
...@@ -640,7 +640,10 @@ def provider_login(request): ...@@ -640,7 +640,10 @@ def provider_login(request):
error = False error = False
if 'openid.mode' in request.GET or 'openid.mode' in request.POST: if 'openid.mode' in request.GET or 'openid.mode' in request.POST:
# decode request # decode request
openid_request = server.decodeRequest(querydict) try:
openid_request = server.decodeRequest(querydict)
except (UntrustedReturnURL, ProtocolError):
openid_request = None
if not openid_request: if not openid_request:
return default_render_failure(request, "Invalid OpenID request") return default_render_failure(request, "Invalid OpenID request")
...@@ -697,8 +700,8 @@ def provider_login(request): ...@@ -697,8 +700,8 @@ def provider_login(request):
user = User.objects.get(email=email) user = User.objects.get(email=email)
except User.DoesNotExist: except User.DoesNotExist:
request.session['openid_error'] = True request.session['openid_error'] = True
msg = "OpenID login failed - Unknown user email: {0}".format(email) msg = "OpenID login failed - Unknown user email: %s"
log.warning(msg) log.warning(msg, email)
return HttpResponseRedirect(openid_request_url) return HttpResponseRedirect(openid_request_url)
# attempt to authenticate user (but not actually log them in...) # attempt to authenticate user (but not actually log them in...)
...@@ -708,9 +711,8 @@ def provider_login(request): ...@@ -708,9 +711,8 @@ def provider_login(request):
user = authenticate(username=username, password=password) user = authenticate(username=username, password=password)
if user is None: if user is None:
request.session['openid_error'] = True request.session['openid_error'] = True
msg = "OpenID login failed - password for {0} is invalid" msg = "OpenID login failed - password for %s is invalid"
msg = msg.format(email) log.warning(msg, email)
log.warning(msg)
return HttpResponseRedirect(openid_request_url) return HttpResponseRedirect(openid_request_url)
# authentication succeeded, so fetch user information # authentication succeeded, so fetch user information
...@@ -720,10 +722,8 @@ def provider_login(request): ...@@ -720,10 +722,8 @@ def provider_login(request):
if 'openid_error' in request.session: if 'openid_error' in request.session:
del request.session['openid_error'] del request.session['openid_error']
# fullname field comes from user profile log.info("OpenID login success - %s (%s)",
profile = UserProfile.objects.get(user=user) user.username, user.email)
log.info("OpenID login success - {0} ({1})".format(user.username,
user.email))
# redirect user to return_to location # redirect user to return_to location
url = endpoint + urlquote(user.username) url = endpoint + urlquote(user.username)
...@@ -753,8 +753,8 @@ def provider_login(request): ...@@ -753,8 +753,8 @@ def provider_login(request):
# the account is not active, so redirect back to the login page: # the account is not active, so redirect back to the login page:
request.session['openid_error'] = True request.session['openid_error'] = True
msg = "Login failed - Account not active for user {0}".format(username) msg = "Login failed - Account not active for user %s"
log.warning(msg) log.warning(msg, username)
return HttpResponseRedirect(openid_request_url) return HttpResponseRedirect(openid_request_url)
# determine consumer domain if applicable # determine consumer domain if applicable
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment