Commit c6fa4873 by Diana Huang

Handle issues decoding requests more gracefully

Clean up some pep8/pylint violations as well
parent 894d44a0
...@@ -59,18 +59,15 @@ class MyFetcher(HTTPFetcher): ...@@ -59,18 +59,15 @@ class MyFetcher(HTTPFetcher):
final_url=final_url, final_url=final_url,
headers=response_headers, headers=response_headers,
status=status, status=status,
) )
class OpenIdProviderTest(TestCase): class OpenIdProviderTest(TestCase):
"""
Tests of the OpenId login
"""
# def setUp(self): def test_begin_login_with_xrds_url(self):
# username = 'viewtest'
# email = 'view@test.com'
# password = 'foo'
# user = User.objects.create_user(username, email, password)
def testBeginLoginWithXrdsUrl(self):
# skip the test if openid is not enabled (as in cms.envs.test): # skip the test if openid is not enabled (as in cms.envs.test):
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'): if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
return return
...@@ -99,7 +96,7 @@ class OpenIdProviderTest(TestCase): ...@@ -99,7 +96,7 @@ class OpenIdProviderTest(TestCase):
"got code {0} for url '{1}'. Expected code {2}" "got code {0} for url '{1}'. Expected code {2}"
.format(resp.status_code, url, code)) .format(resp.status_code, url, code))
def testBeginLoginWithLoginUrl(self): def test_begin_login_with_login_url(self):
# skip the test if openid is not enabled (as in cms.envs.test): # skip the test if openid is not enabled (as in cms.envs.test):
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'): if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
return return
...@@ -150,45 +147,77 @@ class OpenIdProviderTest(TestCase): ...@@ -150,45 +147,77 @@ class OpenIdProviderTest(TestCase):
# <input name="openid.return_to" type="hidden" value="http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H" /> # <input name="openid.return_to" type="hidden" value="http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H" />
# <input name="openid.assoc_handle" type="hidden" value="{HMAC-SHA1}{50ff8120}{rh87+Q==}" /> # <input name="openid.assoc_handle" type="hidden" value="{HMAC-SHA1}{50ff8120}{rh87+Q==}" />
def test_open_id_setup(self):
def testOpenIdSetup(self):
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'): if not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
return return
url = reverse('openid-provider-login') url = reverse('openid-provider-login')
post_args = { post_args = {
"openid.mode": "checkid_setup", "openid.mode": "checkid_setup",
"openid.return_to": "http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H", "openid.return_to": "http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H",
"openid.assoc_handle": "{HMAC-SHA1}{50ff8120}{rh87+Q==}", "openid.assoc_handle": "{HMAC-SHA1}{50ff8120}{rh87+Q==}",
"openid.claimed_id": "http://specs.openid.net/auth/2.0/identifier_select", "openid.claimed_id": "http://specs.openid.net/auth/2.0/identifier_select",
"openid.ns": "http://specs.openid.net/auth/2.0", "openid.ns": "http://specs.openid.net/auth/2.0",
"openid.realm": "http://testserver/", "openid.realm": "http://testserver/",
"openid.identity": "http://specs.openid.net/auth/2.0/identifier_select", "openid.identity": "http://specs.openid.net/auth/2.0/identifier_select",
"openid.ns.ax": "http://openid.net/srv/ax/1.0", "openid.ns.ax": "http://openid.net/srv/ax/1.0",
"openid.ax.mode": "fetch_request", "openid.ax.mode": "fetch_request",
"openid.ax.required": "email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname", "openid.ax.required": "email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname",
"openid.ax.type.fullname": "http://axschema.org/namePerson", "openid.ax.type.fullname": "http://axschema.org/namePerson",
"openid.ax.type.lastname": "http://axschema.org/namePerson/last", "openid.ax.type.lastname": "http://axschema.org/namePerson/last",
"openid.ax.type.firstname": "http://axschema.org/namePerson/first", "openid.ax.type.firstname": "http://axschema.org/namePerson/first",
"openid.ax.type.nickname": "http://axschema.org/namePerson/friendly", "openid.ax.type.nickname": "http://axschema.org/namePerson/friendly",
"openid.ax.type.email": "http://axschema.org/contact/email", "openid.ax.type.email": "http://axschema.org/contact/email",
"openid.ax.type.old_email": "http://schema.openid.net/contact/email", "openid.ax.type.old_email": "http://schema.openid.net/contact/email",
"openid.ax.type.old_nickname": "http://schema.openid.net/namePerson/friendly", "openid.ax.type.old_nickname": "http://schema.openid.net/namePerson/friendly",
"openid.ax.type.old_fullname": "http://schema.openid.net/namePerson", "openid.ax.type.old_fullname": "http://schema.openid.net/namePerson",
} }
resp = self.client.post(url, post_args) resp = self.client.post(url, post_args)
code = 200 code = 200
self.assertEqual(resp.status_code, code, self.assertEqual(resp.status_code, code,
"got code {0} for url '{1}'. Expected code {2}" "got code {0} for url '{1}'. Expected code {2}"
.format(resp.status_code, url, code)) .format(resp.status_code, url, code))
def test_invalid_namespace(self):
""" Test for 403 error code when the namespace of the request is invalid"""
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
return
url = reverse('openid-provider-login')
post_args = {
"openid.mode": "checkid_setup",
"openid.return_to": "http://testserver/openid/complete/?janrain_nonce=2013-01-23T06%3A20%3A17ZaN7j6H",
"openid.assoc_handle": "{HMAC-SHA1}{50ff8120}{rh87+Q==}",
"openid.claimed_id": "http://specs.openid.net/auth/2.0/identifier_select",
"openid.ns": "http%3A%2F%2Fspecs.openid.net%2Fauth%2F2.0",
"openid.realm": "http://testserver/",
"openid.identity": "http://specs.openid.net/auth/2.0/identifier_select",
"openid.ns.ax": "http://openid.net/srv/ax/1.0",
"openid.ax.mode": "fetch_request",
"openid.ax.required": "email,fullname,old_email,firstname,old_nickname,lastname,old_fullname,nickname",
"openid.ax.type.fullname": "http://axschema.org/namePerson",
"openid.ax.type.lastname": "http://axschema.org/namePerson/last",
"openid.ax.type.firstname": "http://axschema.org/namePerson/first",
"openid.ax.type.nickname": "http://axschema.org/namePerson/friendly",
"openid.ax.type.email": "http://axschema.org/contact/email",
"openid.ax.type.old_email": "http://schema.openid.net/contact/email",
"openid.ax.type.old_nickname": "http://schema.openid.net/namePerson/friendly",
"openid.ax.type.old_fullname": "http://schema.openid.net/namePerson",
}
resp = self.client.post(url, post_args)
code = 403
self.assertEqual(resp.status_code, code,
"got code {0} for url '{1}'. Expected code {2}"
.format(resp.status_code, url, code))
# In order for this absolute URL to work (i.e. to get xrds, then authentication)
# in the test environment, we either need a live server that works with the default
# fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
# Here we do the former.
class OpenIdProviderLiveServerTest(LiveServerTestCase): class OpenIdProviderLiveServerTest(LiveServerTestCase):
"""
In order for this absolute URL to work (i.e. to get xrds, then authentication)
in the test environment, we either need a live server that works with the default
fetcher (i.e. urlopen2), or a test server that is reached through a custom fetcher.
Here we do the former.
"""
def testBeginLogin(self): def test_begin_login(self):
# skip the test if openid is not enabled (as in cms.envs.test): # skip the test if openid is not enabled (as in cms.envs.test):
if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'): if not settings.MITX_FEATURES.get('AUTH_USE_OPENID') or not settings.MITX_FEATURES.get('AUTH_USE_OPENID_PROVIDER'):
return return
......
...@@ -36,7 +36,7 @@ import django_openid_auth.views as openid_views ...@@ -36,7 +36,7 @@ import django_openid_auth.views as openid_views
from django_openid_auth import auth as openid_auth from django_openid_auth import auth as openid_auth
from openid.consumer.consumer import SUCCESS from openid.consumer.consumer import SUCCESS
from openid.server.server import Server from openid.server.server import Server, ProtocolError
from openid.server.trustroot import TrustRoot from openid.server.trustroot import TrustRoot
from openid.extensions import ax, sreg from openid.extensions import ax, sreg
...@@ -102,7 +102,7 @@ def openid_login_complete(request, ...@@ -102,7 +102,7 @@ def openid_login_complete(request,
oid_backend = openid_auth.OpenIDBackend() oid_backend = openid_auth.OpenIDBackend()
details = oid_backend._extract_user_details(openid_response) details = oid_backend._extract_user_details(openid_response)
log.debug('openid success, details=%s' % details) log.debug('openid success, details={0}'.format(details))
url = getattr(settings, 'OPENID_SSO_SERVER_URL', None) url = getattr(settings, 'OPENID_SSO_SERVER_URL', None)
external_domain = "openid:%s" % url external_domain = "openid:%s" % url
...@@ -132,7 +132,7 @@ def external_login_or_signup(request, ...@@ -132,7 +132,7 @@ def external_login_or_signup(request,
try: try:
eamap = ExternalAuthMap.objects.get(external_id=external_id, eamap = ExternalAuthMap.objects.get(external_id=external_id,
external_domain=external_domain) external_domain=external_domain)
log.debug('Found eamap=%s' % eamap) log.debug('Found eamap={0}'.format(eamap))
except ExternalAuthMap.DoesNotExist: except ExternalAuthMap.DoesNotExist:
# go render form for creating edX user # go render form for creating edX user
eamap = ExternalAuthMap(external_id=external_id, eamap = ExternalAuthMap(external_id=external_id,
...@@ -141,11 +141,11 @@ def external_login_or_signup(request, ...@@ -141,11 +141,11 @@ def external_login_or_signup(request,
eamap.external_email = email eamap.external_email = email
eamap.external_name = fullname eamap.external_name = fullname
eamap.internal_password = generate_password() eamap.internal_password = generate_password()
log.debug('Created eamap=%s' % eamap) log.debug('Created eamap={0}'.format(eamap))
eamap.save() eamap.save()
log.info("External_Auth login_or_signup for %s : %s : %s : %s" % (external_domain, external_id, email, fullname)) log.info(u"External_Auth login_or_signup for {0} : {1} : {2} : {3}".format(external_domain, external_id, email, fullname))
internal_user = eamap.user internal_user = eamap.user
if internal_user is None: if internal_user is None:
if settings.MITX_FEATURES.get('AUTH_USE_SHIB'): if settings.MITX_FEATURES.get('AUTH_USE_SHIB'):
...@@ -157,7 +157,7 @@ def external_login_or_signup(request, ...@@ -157,7 +157,7 @@ def external_login_or_signup(request,
eamap.user = link_user eamap.user = link_user
eamap.save() eamap.save()
internal_user = link_user internal_user = link_user
log.info('SHIB: Linking existing account for %s' % eamap.external_email) log.info('SHIB: Linking existing account for {0}'.format(eamap.external_email))
# now pass through to log in # now pass through to log in
else: else:
# otherwise, there must have been an error, b/c we've already linked a user with these external # otherwise, there must have been an error, b/c we've already linked a user with these external
...@@ -168,10 +168,10 @@ def external_login_or_signup(request, ...@@ -168,10 +168,10 @@ def external_login_or_signup(request,
% getattr(settings, 'TECH_SUPPORT_EMAIL', 'techsupport@class.stanford.edu'))) % getattr(settings, 'TECH_SUPPORT_EMAIL', 'techsupport@class.stanford.edu')))
return default_render_failure(request, failure_msg) return default_render_failure(request, failure_msg)
except User.DoesNotExist: except User.DoesNotExist:
log.info('SHIB: No user for %s yet, doing signup' % eamap.external_email) log.info('SHIB: No user for {0} yet, doing signup'.format(eamap.external_email))
return signup(request, eamap) return signup(request, eamap)
else: else:
log.info('No user for %s yet, doing signup' % eamap.external_email) log.info('No user for {0} yet.formatdoing signup'.format(eamap.external_email))
return signup(request, eamap) return signup(request, eamap)
# We trust shib's authentication, so no need to authenticate using the password again # We trust shib's authentication, so no need to authenticate using the password again
...@@ -183,17 +183,17 @@ def external_login_or_signup(request, ...@@ -183,17 +183,17 @@ def external_login_or_signup(request,
else: else:
auth_backend = 'django.contrib.auth.backends.ModelBackend' auth_backend = 'django.contrib.auth.backends.ModelBackend'
user.backend = auth_backend user.backend = auth_backend
log.info('SHIB: Logging in linked user %s' % user.email) log.info('SHIB: Logging in linked user {0}'.format(user.email))
else: else:
uname = internal_user.username uname = internal_user.username
user = authenticate(username=uname, password=eamap.internal_password) user = authenticate(username=uname, password=eamap.internal_password)
if user is None: if user is None:
log.warning("External Auth Login failed for %s / %s" % log.warning("External Auth Login failed for {0} / {1}".format(
(uname, eamap.internal_password)) uname, eamap.internal_password))
return signup(request, eamap) return signup(request, eamap)
if not user.is_active: if not user.is_active:
log.warning("User %s is not active" % (uname)) log.warning("User {0} is not active".format(uname))
# TODO: improve error page # TODO: improve error page
msg = 'Account not yet activated: please look for link in your email' msg = 'Account not yet activated: please look for link in your email'
return default_render_failure(request, msg) return default_render_failure(request, msg)
...@@ -261,7 +261,7 @@ def signup(request, eamap=None): ...@@ -261,7 +261,7 @@ def signup(request, eamap=None):
except ValidationError: except ValidationError:
context['ask_for_email'] = True context['ask_for_email'] = True
log.info('EXTAUTH: Doing signup for %s' % eamap.external_id) log.info('EXTAUTH: Doing signup for {0}'.format(eamap.external_id))
return student_views.register_user(request, extra_context=context) return student_views.register_user(request, extra_context=context)
...@@ -405,7 +405,7 @@ def shib_login(request): ...@@ -405,7 +405,7 @@ def shib_login(request):
shib['sn'] = shib['sn'].split(";")[0].strip().capitalize().decode('utf-8') shib['sn'] = shib['sn'].split(";")[0].strip().capitalize().decode('utf-8')
shib['givenName'] = shib['givenName'].split(";")[0].strip().capitalize().decode('utf-8') shib['givenName'] = shib['givenName'].split(";")[0].strip().capitalize().decode('utf-8')
log.info("SHIB creds returned: %r" % shib) log.info("SHIB creds returned: {0}".format(shib))
return external_login_or_signup(request, return external_login_or_signup(request,
external_id=shib['REMOTE_USER'], external_id=shib['REMOTE_USER'],
...@@ -640,7 +640,10 @@ def provider_login(request): ...@@ -640,7 +640,10 @@ def provider_login(request):
error = False error = False
if 'openid.mode' in request.GET or 'openid.mode' in request.POST: if 'openid.mode' in request.GET or 'openid.mode' in request.POST:
# decode request # decode request
openid_request = server.decodeRequest(querydict) try:
openid_request = server.decodeRequest(querydict)
except ProtocolError:
return default_render_failure(request, "Invalid OpenID request")
if not openid_request: if not openid_request:
return default_render_failure(request, "Invalid OpenID request") return default_render_failure(request, "Invalid OpenID request")
...@@ -720,8 +723,6 @@ def provider_login(request): ...@@ -720,8 +723,6 @@ def provider_login(request):
if 'openid_error' in request.session: if 'openid_error' in request.session:
del request.session['openid_error'] del request.session['openid_error']
# fullname field comes from user profile
profile = UserProfile.objects.get(user=user)
log.info("OpenID login success - {0} ({1})".format(user.username, log.info("OpenID login success - {0} ({1})".format(user.username,
user.email)) user.email))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment