Commit a628b62d by brianhw

Merge pull request #511 from edx/feature/brian/audit-log

Use audit logger for logging of logins in external_auth...
parents 3d87dd8c 635d36fc
......@@ -5,6 +5,8 @@ These are notable changes in edx-platform. This is a rolling list of changes,
in roughly chronological order, most recent first. Add your entries at or near
the top. Include a label indicating the component affected.
Common: Add additional logging to cover login attempts and logouts.
Studio: Send e-mails to new Studio users (on edge only) when their course creator
status has changed. This will not be in use until the course creator table
is enabled.
......
......@@ -3,6 +3,7 @@ Tests for Shibboleth Authentication
@jbau
"""
import unittest
from mock import patch
from django.conf import settings
from django.http import HttpResponseRedirect
......@@ -10,7 +11,6 @@ from django.test.client import RequestFactory, Client as DjangoTestClient
from django.test.utils import override_settings
from django.core.urlresolvers import reverse
from django.contrib.auth.models import AnonymousUser, User
from django.contrib.sessions.backends.base import SessionBase
from django.utils.importlib import import_module
from xmodule.modulestore.tests.factories import CourseFactory
......@@ -27,11 +27,11 @@ from student.views import create_account, change_enrollment
from student.models import UserProfile, Registration, CourseEnrollment
from student.tests.factories import UserFactory
#Shib is supposed to provide 'REMOTE_USER', 'givenName', 'sn', 'mail', 'Shib-Identity-Provider'
#attributes via request.META. We can count on 'Shib-Identity-Provider', and 'REMOTE_USER' being present
#b/c of how mod_shib works but should test the behavior with the rest of the attributes present/missing
# Shib is supposed to provide 'REMOTE_USER', 'givenName', 'sn', 'mail', 'Shib-Identity-Provider'
# attributes via request.META. We can count on 'Shib-Identity-Provider', and 'REMOTE_USER' being present
# b/c of how mod_shib works but should test the behavior with the rest of the attributes present/missing
#For the sake of python convention we'll make all of these variable names ALL_CAPS
# For the sake of python convention we'll make all of these variable names ALL_CAPS
IDP = 'https://idp.stanford.edu/'
REMOTE_USER = 'test_user@stanford.edu'
MAILS = [None, '', 'test_user@stanford.edu']
......@@ -93,6 +93,13 @@ class ShibSPTest(ModuleStoreTestCase):
self.assertEqual(no_idp_response.status_code, 403)
self.assertIn("identity server did not return your ID information", no_idp_response.content)
def _assert_shib_login_is_logged(self, audit_log_call, remote_user):
"""Asserts that shibboleth login attempt is being logged"""
method_name, args, _kwargs = audit_log_call
self.assertEquals(method_name, 'info')
self.assertEquals(len(args), 2)
self.assertIn(u'logged in via Shibboleth', args[0])
self.assertEquals(remote_user, args[1])
@unittest.skipUnless(settings.MITX_FEATURES.get('AUTH_USE_SHIB'), True)
def test_shib_login(self):
......@@ -140,26 +147,57 @@ class ShibSPTest(ModuleStoreTestCase):
'REMOTE_USER': remote_user,
'mail': remote_user})
request.user = AnonymousUser()
response = shib_login(request)
with patch('external_auth.views.AUDIT_LOG') as mock_audit_log:
response = shib_login(request)
audit_log_calls = mock_audit_log.method_calls
if idp == "https://idp.stanford.edu/" and remote_user == 'withmap@stanford.edu':
self.assertIsInstance(response, HttpResponseRedirect)
self.assertEqual(request.user, user_w_map)
self.assertEqual(response['Location'], '/')
# verify logging:
self.assertEquals(len(audit_log_calls), 2)
self._assert_shib_login_is_logged(audit_log_calls[0], remote_user)
method_name, args, _kwargs = audit_log_calls[1]
self.assertEquals(method_name, 'info')
self.assertEquals(len(args), 3)
self.assertIn(u'Login success', args[0])
self.assertEquals(remote_user, args[2])
elif idp == "https://idp.stanford.edu/" and remote_user == 'inactive@stanford.edu':
self.assertEqual(response.status_code, 403)
self.assertIn("Account not yet activated: please look for link in your email", response.content)
# verify logging:
self.assertEquals(len(audit_log_calls), 2)
self._assert_shib_login_is_logged(audit_log_calls[0], remote_user)
method_name, args, _kwargs = audit_log_calls[1]
self.assertEquals(method_name, 'warning')
self.assertEquals(len(args), 2)
self.assertIn(u'is not active after external login', args[0])
# self.assertEquals(remote_user, args[1])
elif idp == "https://idp.stanford.edu/" and remote_user == 'womap@stanford.edu':
self.assertIsNotNone(ExternalAuthMap.objects.get(user=user_wo_map))
self.assertIsInstance(response, HttpResponseRedirect)
self.assertEqual(request.user, user_wo_map)
self.assertEqual(response['Location'], '/')
# verify logging:
self.assertEquals(len(audit_log_calls), 2)
self._assert_shib_login_is_logged(audit_log_calls[0], remote_user)
method_name, args, _kwargs = audit_log_calls[1]
self.assertEquals(method_name, 'info')
self.assertEquals(len(args), 3)
self.assertIn(u'Login success', args[0])
self.assertEquals(remote_user, args[2])
elif idp == "https://someother.idp.com/" and remote_user in \
['withmap@stanford.edu', 'womap@stanford.edu', 'inactive@stanford.edu']:
self.assertEqual(response.status_code, 403)
self.assertIn("You have already created an account using an external login", response.content)
# no audit logging calls
self.assertEquals(len(audit_log_calls), 0)
else:
self.assertEqual(response.status_code, 200)
self.assertContains(response, "<title>Register for")
# no audit logging calls
self.assertEquals(len(audit_log_calls), 0)
@unittest.skipUnless(settings.MITX_FEATURES.get('AUTH_USE_SHIB'), True)
def test_registration_form(self):
......@@ -187,7 +225,7 @@ class ShibSPTest(ModuleStoreTestCase):
else:
self.assertNotContains(response, fullname_input_HTML)
#clean up b/c we don't want existing ExternalAuthMap for the next run
# clean up b/c we don't want existing ExternalAuthMap for the next run
client.session['ExternalAuthMap'].delete()
@unittest.skipUnless(settings.MITX_FEATURES.get('AUTH_USE_SHIB'), True)
......@@ -200,25 +238,47 @@ class ShibSPTest(ModuleStoreTestCase):
Uses django test client for its session support
"""
for identity in gen_all_identities():
#First we pop the registration form
# First we pop the registration form
client = DjangoTestClient()
response1 = client.get(path='/shib-login/', data={}, follow=False, **identity)
#Then we have the user answer the registration form
# Then we have the user answer the registration form
postvars = {'email': 'post_email@stanford.edu',
'username': 'post_username',
'password': 'post_password',
'name': 'post_name',
'terms_of_service': 'true',
'honor_code': 'true'}
#use RequestFactory instead of TestClient here because we want access to request.user
# use RequestFactory instead of TestClient here because we want access to request.user
request2 = self.request_factory.post('/create_account', data=postvars)
request2.session = client.session
request2.user = AnonymousUser()
response2 = create_account(request2)
with patch('student.views.AUDIT_LOG') as mock_audit_log:
_response2 = create_account(request2)
user = request2.user
mail = identity.get('mail')
#check that the created user has the right email, either taken from shib or user input
# verify logging of login happening during account creation:
audit_log_calls = mock_audit_log.method_calls
self.assertEquals(len(audit_log_calls), 3)
method_name, args, _kwargs = audit_log_calls[0]
self.assertEquals(method_name, 'info')
self.assertEquals(len(args), 1)
self.assertIn(u'Login success on new account creation', args[0])
self.assertIn(u'post_username', args[0])
method_name, args, _kwargs = audit_log_calls[1]
self.assertEquals(method_name, 'info')
self.assertEquals(len(args), 2)
self.assertIn(u'User registered with external_auth', args[0])
self.assertEquals(u'post_username', args[1])
method_name, args, _kwargs = audit_log_calls[2]
self.assertEquals(method_name, 'info')
self.assertEquals(len(args), 3)
self.assertIn(u'Updated ExternalAuthMap for ', args[0])
self.assertEquals(u'post_username', args[1])
self.assertEquals(u'test_user@stanford.edu', args[2].external_id)
# check that the created user has the right email, either taken from shib or user input
if mail:
self.assertEqual(user.email, mail)
self.assertEqual(list(User.objects.filter(email=postvars['email'])), [])
......@@ -228,7 +288,7 @@ class ShibSPTest(ModuleStoreTestCase):
self.assertEqual(list(User.objects.filter(email=mail)), [])
self.assertIsNotNone(User.objects.get(email=postvars['email'])) # get enforces only 1 such user
#check that the created user profile has the right name, either taken from shib or user input
# check that the created user profile has the right name, either taken from shib or user input
profile = UserProfile.objects.get(user=user)
sn_empty = not identity.get('sn')
given_name_empty = not identity.get('givenName')
......@@ -236,7 +296,7 @@ class ShibSPTest(ModuleStoreTestCase):
self.assertEqual(profile.name, postvars['name'])
else:
self.assertEqual(profile.name, request2.session['ExternalAuthMap'].external_name)
#clean up for next loop
# clean up for next loop
request2.session['ExternalAuthMap'].delete()
UserProfile.objects.filter(user=user).delete()
Registration.objects.filter(user=user).delete()
......@@ -251,17 +311,17 @@ class ShibSPTest(ModuleStoreTestCase):
# Test for cases where course is found
for domain in ["", "shib:https://idp.stanford.edu/"]:
#set domains
# set domains
course.enrollment_domain = domain
metadata = own_metadata(course)
metadata['enrollment_domain'] = domain
self.store.update_metadata(course.location.url(), metadata)
#setting location to test that GET params get passed through
# setting location to test that GET params get passed through
login_request = self.request_factory.get('/course_specific_login/MITx/999/Robot_Super_Course' +
'?course_id=MITx/999/Robot_Super_Course' +
'&enrollment_action=enroll')
reg_request = self.request_factory.get('/course_specific_register/MITx/999/Robot_Super_Course' +
_reg_request = self.request_factory.get('/course_specific_register/MITx/999/Robot_Super_Course' +
'?course_id=MITx/999/course/Robot_Super_Course' +
'&enrollment_action=enroll')
......@@ -292,11 +352,11 @@ class ShibSPTest(ModuleStoreTestCase):
'&enrollment_action=enroll')
# Now test for non-existent course
#setting location to test that GET params get passed through
# setting location to test that GET params get passed through
login_request = self.request_factory.get('/course_specific_login/DNE/DNE/DNE' +
'?course_id=DNE/DNE/DNE' +
'&enrollment_action=enroll')
reg_request = self.request_factory.get('/course_specific_register/DNE/DNE/DNE' +
_reg_request = self.request_factory.get('/course_specific_register/DNE/DNE/DNE' +
'?course_id=DNE/DNE/DNE/Robot_Super_Course' +
'&enrollment_action=enroll')
......@@ -321,7 +381,7 @@ class ShibSPTest(ModuleStoreTestCase):
the proper external auth
"""
#create 2 course, one with limited enrollment one without
# create 2 course, one with limited enrollment one without
shib_course = CourseFactory.create(org='Stanford', number='123', display_name='Shib Only')
shib_course.enrollment_domain = 'shib:https://idp.stanford.edu/'
metadata = own_metadata(shib_course)
......@@ -360,7 +420,7 @@ class ShibSPTest(ModuleStoreTestCase):
int_student.email = "teststudent3@gmail.com"
int_student.save()
#Tests the two case for courses, limited and not
# Tests the two case for courses, limited and not
for course in [shib_course, open_enroll_course]:
for student in [shib_student, other_ext_student, int_student]:
request = self.request_factory.post('/change_enrollment')
......@@ -368,11 +428,11 @@ class ShibSPTest(ModuleStoreTestCase):
'course_id': course.id})
request.user = student
response = change_enrollment(request)
#if course is not limited or student has correct shib extauth then enrollment should be allowed
# If course is not limited or student has correct shib extauth then enrollment should be allowed
if course is open_enroll_course or student is shib_student:
self.assertEqual(response.status_code, 200)
self.assertEqual(CourseEnrollment.objects.filter(user=student, course_id=course.id).count(), 1)
#clean up
# Clean up
CourseEnrollment.objects.filter(user=student, course_id=course.id).delete()
else:
self.assertEqual(response.status_code, 400)
......@@ -383,9 +443,6 @@ class ShibSPTest(ModuleStoreTestCase):
"""
A functionality test that a student with an existing shib login can auto-enroll in a class with GET params
"""
if not settings.MITX_FEATURES.get('AUTH_USE_SHIB'):
return
student = UserFactory.create()
extauth = ExternalAuthMap(external_id='testuser@stanford.edu',
external_email='',
......@@ -403,8 +460,8 @@ class ShibSPTest(ModuleStoreTestCase):
metadata['enrollment_domain'] = course.enrollment_domain
self.store.update_metadata(course.location.url(), metadata)
#use django test client for sessions and url processing
#no enrollment before trying
# use django test client for sessions and url processing
# no enrollment before trying
self.assertEqual(CourseEnrollment.objects.filter(user=student, course_id=course.id).count(), 0)
self.client.logout()
request_kwargs = {'path': '/shib-login/',
......@@ -413,8 +470,8 @@ class ShibSPTest(ModuleStoreTestCase):
'REMOTE_USER': 'testuser@stanford.edu',
'Shib-Identity-Provider': 'https://idp.stanford.edu/'}
response = self.client.get(**request_kwargs)
#successful login is a redirect to "/"
# successful login is a redirect to "/"
self.assertEqual(response.status_code, 302)
self.assertEqual(response['location'], 'http://testserver/')
#now there is enrollment
# now there is enrollment
self.assertEqual(CourseEnrollment.objects.filter(user=student, course_id=course.id).count(), 1)
......@@ -17,9 +17,9 @@ from django.core.urlresolvers import reverse
from django.core.validators import validate_email
from django.core.exceptions import ValidationError
from student.models import UserProfile, TestCenterUser, TestCenterRegistration
from student.models import TestCenterUser, TestCenterRegistration
from django.http import HttpResponse, HttpResponseRedirect, HttpRequest
from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, HttpResponseForbidden
from django.utils.http import urlquote
from django.shortcuts import redirect
from django.utils.translation import ugettext as _
......@@ -50,7 +50,10 @@ from xmodule.modulestore import Location
from xmodule.modulestore.exceptions import ItemNotFoundError
log = logging.getLogger("mitx.external_auth")
AUDIT_LOG = logging.getLogger("audit")
SHIBBOLETH_DOMAIN_PREFIX = 'shib:'
OPENID_DOMAIN_PREFIX = 'openid:'
# -----------------------------------------------------------------------------
# OpenID Common
......@@ -81,7 +84,7 @@ def default_render_failure(request,
def generate_password(length=12, chars=string.letters + string.digits):
"""Generate internal password for externally authenticated user"""
choice = random.SystemRandom().choice
return ''.join([choice(chars) for i in range(length)])
return ''.join([choice(chars) for _i in range(length)])
@csrf_exempt
......@@ -105,27 +108,29 @@ def openid_login_complete(request,
log.debug('openid success, details=%s', details)
url = getattr(settings, 'OPENID_SSO_SERVER_URL', None)
external_domain = "openid:%s" % url
external_domain = "{0}{1}".format(OPENID_DOMAIN_PREFIX, url)
fullname = '%s %s' % (details.get('first_name', ''),
details.get('last_name', ''))
return external_login_or_signup(request,
external_id,
external_domain,
details,
details.get('email', ''),
fullname)
return _external_login_or_signup(
request,
external_id,
external_domain,
details,
details.get('email', ''),
fullname
)
return render_failure(request, 'Openid failure')
def external_login_or_signup(request,
external_id,
external_domain,
credentials,
email,
fullname,
retfun=None):
def _external_login_or_signup(request,
external_id,
external_domain,
credentials,
email,
fullname,
retfun=None):
"""Generic external auth login or signup"""
# see if we have a map from this external_id to an edX username
......@@ -142,13 +147,13 @@ def external_login_or_signup(request,
eamap.external_name = fullname
eamap.internal_password = generate_password()
log.debug('Created eamap=%s', eamap)
eamap.save()
log.info(u"External_Auth login_or_signup for %s : %s : %s : %s", external_domain, external_id, email, fullname)
uses_shibboleth = settings.MITX_FEATURES.get('AUTH_USE_SHIB') and external_domain.startswith(SHIBBOLETH_DOMAIN_PREFIX)
internal_user = eamap.user
if internal_user is None:
if settings.MITX_FEATURES.get('AUTH_USE_SHIB'):
if uses_shibboleth:
# if we are using shib, try to link accounts using email
try:
link_user = User.objects.get(email=eamap.external_email)
......@@ -169,14 +174,14 @@ def external_login_or_signup(request,
return default_render_failure(request, failure_msg)
except User.DoesNotExist:
log.info('SHIB: No user for %s yet, doing signup', eamap.external_email)
return signup(request, eamap)
return _signup(request, eamap)
else:
log.info('No user for %s yet. doing signup', eamap.external_email)
return signup(request, eamap)
return _signup(request, eamap)
# We trust shib's authentication, so no need to authenticate using the password again
if settings.MITX_FEATURES.get('AUTH_USE_SHIB'):
uname = internal_user.username
uname = internal_user.username
if uses_shibboleth:
user = internal_user
# Assuming this 'AUTHENTICATION_BACKENDS' is set in settings, which I think is safe
if settings.AUTHENTICATION_BACKENDS:
......@@ -184,32 +189,32 @@ def external_login_or_signup(request,
else:
auth_backend = 'django.contrib.auth.backends.ModelBackend'
user.backend = auth_backend
log.info('SHIB: Logging in linked user %s', user.email)
AUDIT_LOG.info('Linked user "%s" logged in via Shibboleth', user.email)
else:
uname = internal_user.username
user = authenticate(username=uname, password=eamap.internal_password)
if user is None:
log.warning("External Auth Login failed for %s / %s",
uname, eamap.internal_password)
return signup(request, eamap)
# we want to log the failure, but don't want to log the password attempted:
AUDIT_LOG.warning('External Auth Login failed for "%s"', uname)
return _signup(request, eamap)
if not user.is_active:
log.warning("User %s is not active", uname)
AUDIT_LOG.warning('User "%s" is not active after external login', uname)
# TODO: improve error page
msg = 'Account not yet activated: please look for link in your email'
return default_render_failure(request, msg)
login(request, user)
request.session.set_expiry(0)
# Now to try enrollment
# Need to special case Shibboleth here because it logs in via a GET.
# testing request.method for extra paranoia
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and 'shib:' in external_domain and request.method == 'GET':
enroll_request = make_shib_enrollment_request(request)
if uses_shibboleth and request.method == 'GET':
enroll_request = _make_shib_enrollment_request(request)
student_views.try_change_enrollment(enroll_request)
else:
student_views.try_change_enrollment(request)
log.info("Login success - %s (%s)", user.username, user.email)
AUDIT_LOG.info("Login success - %s (%s)", user.username, user.email)
if retfun is None:
return redirect('/')
return retfun()
......@@ -217,20 +222,16 @@ def external_login_or_signup(request,
@ensure_csrf_cookie
@cache_if_anonymous
def signup(request, eamap=None):
def _signup(request, eamap):
"""
Present form to complete for signup via external authentication.
Even though the user has external credentials, he/she still needs
to create an account on the edX system, and fill in the user
registration form.
eamap is an ExteralAuthMap object, specifying the external user
eamap is an ExternalAuthMap object, specifying the external user
for which to complete the signup.
"""
if eamap is None:
pass
# save this for use by student.views.create_account
request.session['ExternalAuthMap'] = eamap
......@@ -248,8 +249,9 @@ def signup(request, eamap=None):
# Some openEdX instances can't have terms of service for shib users, like
# according to Stanford's Office of General Counsel
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and settings.MITX_FEATURES.get('SHIB_DISABLE_TOS') and \
('shib' in eamap.external_domain):
uses_shibboleth = (settings.MITX_FEATURES.get('AUTH_USE_SHIB') and
eamap.external_domain.startswith(SHIBBOLETH_DOMAIN_PREFIX))
if uses_shibboleth and settings.MITX_FEATURES.get('SHIB_DISABLE_TOS'):
context['ask_for_tos'] = False
# detect if full name is blank and ask for it from user
......@@ -272,19 +274,19 @@ def signup(request, eamap=None):
# -----------------------------------------------------------------------------
def ssl_dn_extract_info(dn):
def _ssl_dn_extract_info(dn_string):
"""
Extract username, email address (may be anyuser@anydomain.com) and
full name from the SSL DN string. Return (user,email,fullname) if
successful, and None otherwise.
"""
ss = re.search('/emailAddress=(.*)@([^/]+)', dn)
ss = re.search('/emailAddress=(.*)@([^/]+)', dn_string)
if ss:
user = ss.group(1)
email = "%s@%s" % (user, ss.group(2))
else:
return None
ss = re.search('/CN=([^/]+)/', dn)
ss = re.search('/CN=([^/]+)/', dn_string)
if ss:
fullname = ss.group(1)
else:
......@@ -292,7 +294,7 @@ def ssl_dn_extract_info(dn):
return (user, email, fullname)
def ssl_get_cert_from_request(request):
def _ssl_get_cert_from_request(request):
"""
Extract user information from certificate, if it exists, returning (user, email, fullname).
Else return None.
......@@ -311,9 +313,6 @@ def ssl_get_cert_from_request(request):
return cert
(user, email, fullname) = ssl_dn_extract_info(cert)
return (user, email, fullname)
def ssl_login_shortcut(fn):
"""
......@@ -324,24 +323,26 @@ def ssl_login_shortcut(fn):
if not settings.MITX_FEATURES['AUTH_USE_MIT_CERTIFICATES']:
return fn(*args, **kwargs)
request = args[0]
cert = ssl_get_cert_from_request(request)
cert = _ssl_get_cert_from_request(request)
if not cert: # no certificate information - show normal login window
return fn(*args, **kwargs)
(user, email, fullname) = ssl_dn_extract_info(cert)
return external_login_or_signup(request,
external_id=email,
external_domain="ssl:MIT",
credentials=cert,
email=email,
fullname=fullname)
(_user, email, fullname) = _ssl_dn_extract_info(cert)
return _external_login_or_signup(
request,
external_id=email,
external_domain="ssl:MIT",
credentials=cert,
email=email,
fullname=fullname
)
return wrapped
@csrf_exempt
def ssl_login(request):
"""
This is called by student.views.index when
This is called by branding.views.index when
MITX_FEATURES['AUTH_USE_MIT_CERTIFICATES'] = True
Used for MIT user authentication. This presumes the web server
......@@ -355,22 +356,28 @@ def ssl_login(request):
Else continues on with student.views.index, and no authentication.
"""
cert = ssl_get_cert_from_request(request)
# Just to make sure we're calling this only at MIT:
if not settings.MITX_FEATURES['AUTH_USE_MIT_CERTIFICATES']:
return HttpResponseForbidden()
cert = _ssl_get_cert_from_request(request)
if not cert:
# no certificate information - go onward to main index
return student_views.index(request)
(user, email, fullname) = ssl_dn_extract_info(cert)
(_user, email, fullname) = _ssl_dn_extract_info(cert)
retfun = functools.partial(student_views.index, request)
return external_login_or_signup(request,
external_id=email,
external_domain="ssl:MIT",
credentials=cert,
email=email,
fullname=fullname,
retfun=retfun)
return _external_login_or_signup(
request,
external_id=email,
external_domain="ssl:MIT",
credentials=cert,
email=email,
fullname=fullname,
retfun=retfun
)
# -----------------------------------------------------------------------------
......@@ -396,28 +403,30 @@ def shib_login(request):
log.error("SHIB: no Shib-Identity-Provider in request.META")
return default_render_failure(request, shib_error_msg)
else:
#if we get here, the user has authenticated properly
# If we get here, the user has authenticated properly
shib = {attr: request.META.get(attr, '')
for attr in ['REMOTE_USER', 'givenName', 'sn', 'mail', 'Shib-Identity-Provider']}
#Clean up first name, last name, and email address
#TODO: Make this less hardcoded re: format, but split will work
#even if ";" is not present since we are accessing 1st element
# Clean up first name, last name, and email address
# TODO: Make this less hardcoded re: format, but split will work
# even if ";" is not present, since we are accessing 1st element
shib['sn'] = shib['sn'].split(";")[0].strip().capitalize().decode('utf-8')
shib['givenName'] = shib['givenName'].split(";")[0].strip().capitalize().decode('utf-8')
# TODO: should we be logging creds here, at info level?
log.info("SHIB creds returned: %r", shib)
return external_login_or_signup(request,
external_id=shib['REMOTE_USER'],
external_domain="shib:" + shib['Shib-Identity-Provider'],
credentials=shib,
email=shib['mail'],
fullname=u'%s %s' % (shib['givenName'], shib['sn']),
)
return _external_login_or_signup(
request,
external_id=shib['REMOTE_USER'],
external_domain=SHIBBOLETH_DOMAIN_PREFIX + shib['Shib-Identity-Provider'],
credentials=shib,
email=shib['mail'],
fullname=u'%s %s' % (shib['givenName'], shib['sn']),
)
def make_shib_enrollment_request(request):
def _make_shib_enrollment_request(request):
"""
Need this hack function because shibboleth logins don't happen over POST
but change_enrollment expects its request to be a POST, with
......@@ -452,14 +461,14 @@ def course_specific_login(request, course_id):
try:
course = course_from_id(course_id)
except ItemNotFoundError:
#couldn't find the course, will just return vanilla signin page
# couldn't find the course, will just return vanilla signin page
return redirect_with_querystring('signin_user', query_string)
#now the dispatching conditionals. Only shib for now
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and 'shib:' in course.enrollment_domain:
# now the dispatching conditionals. Only shib for now
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and course.enrollment_domain.startswith(SHIBBOLETH_DOMAIN_PREFIX):
return redirect_with_querystring('shib-login', query_string)
#Default fallthrough to normal signin page
# Default fallthrough to normal signin page
return redirect_with_querystring('signin_user', query_string)
......@@ -473,15 +482,15 @@ def course_specific_register(request, course_id):
try:
course = course_from_id(course_id)
except ItemNotFoundError:
#couldn't find the course, will just return vanilla registration page
# couldn't find the course, will just return vanilla registration page
return redirect_with_querystring('register_user', query_string)
#now the dispatching conditionals. Only shib for now
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and 'shib:' in course.enrollment_domain:
#shib-login takes care of both registration and login flows
# now the dispatching conditionals. Only shib for now
if settings.MITX_FEATURES.get('AUTH_USE_SHIB') and course.enrollment_domain.startswith(SHIBBOLETH_DOMAIN_PREFIX):
# shib-login takes care of both registration and login flows
return redirect_with_querystring('shib-login', query_string)
#Default fallthrough to normal registration page
# Default fallthrough to normal registration page
return redirect_with_querystring('register_user', query_string)
......@@ -702,7 +711,7 @@ def provider_login(request):
except User.DoesNotExist:
request.session['openid_error'] = True
msg = "OpenID login failed - Unknown user email: %s"
log.warning(msg, email)
AUDIT_LOG.warning(msg, email)
return HttpResponseRedirect(openid_request_url)
# attempt to authenticate user (but not actually log them in...)
......@@ -713,7 +722,7 @@ def provider_login(request):
if user is None:
request.session['openid_error'] = True
msg = "OpenID login failed - password for %s is invalid"
log.warning(msg, email)
AUDIT_LOG.warning(msg, email)
return HttpResponseRedirect(openid_request_url)
# authentication succeeded, so fetch user information
......@@ -723,8 +732,8 @@ def provider_login(request):
if 'openid_error' in request.session:
del request.session['openid_error']
log.info("OpenID login success - %s (%s)",
user.username, user.email)
AUDIT_LOG.info("OpenID login success - %s (%s)",
user.username, user.email)
# redirect user to return_to location
url = endpoint + urlquote(user.username)
......@@ -755,7 +764,7 @@ def provider_login(request):
# the account is not active, so redirect back to the login page:
request.session['openid_error'] = True
msg = "Login failed - Account not active for user %s"
log.warning(msg, username)
AUDIT_LOG.warning(msg, username)
return HttpResponseRedirect(openid_request_url)
# determine consumer domain if applicable
......@@ -856,9 +865,11 @@ def test_center_login(request):
try:
testcenteruser = TestCenterUser.objects.get(client_candidate_id=client_candidate_id)
except TestCenterUser.DoesNotExist:
log.error("not able to find demographics for cand ID {}".format(client_candidate_id))
AUDIT_LOG.error("not able to find demographics for cand ID {}".format(client_candidate_id))
return HttpResponseRedirect(makeErrorURL(error_url, "invalidClientCandidateID"))
AUDIT_LOG.info("Attempting to log in test-center user '{}' for test of cand {}".format(testcenteruser.user.username, client_candidate_id))
# find testcenter_registration that matches the provided exam code:
# Note that we could rely in future on either the registrationId or the exam code,
# or possibly both. But for now we know what to do with an ExamSeriesCode,
......@@ -867,13 +878,13 @@ def test_center_login(request):
# we are not allowed to make up a new error code, according to Pearson,
# so instead of "missingExamSeriesCode", we use a valid one that is
# inaccurate but at least distinct. (Sigh.)
log.error("missing exam series code for cand ID {}".format(client_candidate_id))
AUDIT_LOG.error("missing exam series code for cand ID {}".format(client_candidate_id))
return HttpResponseRedirect(makeErrorURL(error_url, "missingPartnerID"))
exam_series_code = request.POST.get('vueExamSeriesCode')
registrations = TestCenterRegistration.objects.filter(testcenter_user=testcenteruser, exam_series_code=exam_series_code)
if not registrations:
log.error("not able to find exam registration for exam {} and cand ID {}".format(exam_series_code, client_candidate_id))
AUDIT_LOG.error("not able to find exam registration for exam {} and cand ID {}".format(exam_series_code, client_candidate_id))
return HttpResponseRedirect(makeErrorURL(error_url, "noTestsAssigned"))
# TODO: figure out what to do if there are more than one registrations....
......@@ -883,14 +894,14 @@ def test_center_login(request):
course_id = registration.course_id
course = course_from_id(course_id) # assume it will be found....
if not course:
log.error("not able to find course from ID {} for cand ID {}".format(course_id, client_candidate_id))
AUDIT_LOG.error("not able to find course from ID {} for cand ID {}".format(course_id, client_candidate_id))
return HttpResponseRedirect(makeErrorURL(error_url, "incorrectCandidateTests"))
exam = course.get_test_center_exam(exam_series_code)
if not exam:
log.error("not able to find exam {} for course ID {} and cand ID {}".format(exam_series_code, course_id, client_candidate_id))
AUDIT_LOG.error("not able to find exam {} for course ID {} and cand ID {}".format(exam_series_code, course_id, client_candidate_id))
return HttpResponseRedirect(makeErrorURL(error_url, "incorrectCandidateTests"))
location = exam.exam_url
log.info("proceeding with test of cand {} on exam {} for course {}: URL = {}".format(client_candidate_id, exam_series_code, course_id, location))
log.info("Proceeding with test of cand {} on exam {} for course {}: URL = {}".format(client_candidate_id, exam_series_code, course_id, location))
# check if the test has already been taken
timelimit_descriptor = modulestore().get_instance(course_id, Location(location))
......@@ -907,7 +918,7 @@ def test_center_login(request):
return HttpResponseRedirect(makeErrorURL(error_url, "missingClientProgram"))
if timelimit_module and timelimit_module.has_ended:
log.warning("cand {} on exam {} for course {}: test already over at {}".format(client_candidate_id, exam_series_code, course_id, timelimit_module.ending_at))
AUDIT_LOG.warning("cand {} on exam {} for course {}: test already over at {}".format(client_candidate_id, exam_series_code, course_id, timelimit_module.ending_at))
return HttpResponseRedirect(makeErrorURL(error_url, "allTestsTaken"))
# check if we need to provide an accommodation:
......@@ -922,7 +933,7 @@ def test_center_login(request):
if time_accommodation_code:
timelimit_module.accommodation_code = time_accommodation_code
log.info("cand {} on exam {} for course {}: receiving accommodation {}".format(client_candidate_id, exam_series_code, course_id, time_accommodation_code))
AUDIT_LOG.info("cand {} on exam {} for course {}: receiving accommodation {}".format(client_candidate_id, exam_series_code, course_id, time_accommodation_code))
# UGLY HACK!!!
# Login assumes that authentication has occurred, and that there is a
......@@ -936,6 +947,7 @@ def test_center_login(request):
# testcenteruser.user.backend = "%s.%s" % (backend.__module__, backend.__class__.__name__)
testcenteruser.user.backend = "%s.%s" % ("TestcenterAuthenticationModule", "TestcenterAuthenticationClass")
login(request, testcenteruser.user)
AUDIT_LOG.info("Logged in user '{}' for test of cand {} on exam {} for course {}: URL = {}".format(testcenteruser.user.username, client_candidate_id, exam_series_code, course_id, location))
# And start the test:
return jump_to(request, course_id, location)
......@@ -20,6 +20,7 @@ from random import randint
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.db import models
from django.db.models.signals import post_save
from django.dispatch import receiver
......@@ -30,6 +31,7 @@ from pytz import UTC
log = logging.getLogger(__name__)
AUDIT_LOG = logging.getLogger("audit")
class UserProfile(models.Model):
......@@ -779,3 +781,20 @@ def update_user_information(sender, instance, created, **kwargs):
log = logging.getLogger("mitx.discussion")
log.error(unicode(e))
log.error("update user info to discussion failed for user with id: " + str(instance.id))
# Define login and logout handlers here in the models file, instead of the views file,
# so that they are more likely to be loaded when a Studio user brings up the Studio admin
# page to login. These are currently the only signals available, so we need to continue
# identifying and logging failures separately (in views).
@receiver(user_logged_in)
def log_successful_login(sender, request, user, **kwargs):
"""Handler to log when logins have occurred successfully."""
AUDIT_LOG.info(u"Login success - {0} ({1})".format(user.username, user.email))
@receiver(user_logged_out)
def log_successful_logout(sender, request, user, **kwargs):
"""Handler to log when logouts have occurred successfully."""
AUDIT_LOG.info(u"Logout - {0}".format(request.user))
'''
Tests for student activation and login
'''
import json
from mock import patch
from django.test import TestCase
from django.test.client import Client
from django.core.urlresolvers import reverse
from courseware.tests.factories import UserFactory, RegistrationFactory, UserProfileFactory
import json
from django.core.urlresolvers import reverse, NoReverseMatch
from student.tests.factories import UserFactory, RegistrationFactory, UserProfileFactory
class LoginTest(TestCase):
......@@ -29,30 +31,37 @@ class LoginTest(TestCase):
self.client = Client()
# Store the login url
self.url = reverse('login')
try:
self.url = reverse('login_post')
except NoReverseMatch:
self.url = reverse('login')
def test_login_success(self):
response = self._login_response('test@edx.org', 'test_password')
response, mock_audit_log = self._login_response('test@edx.org', 'test_password', patched_audit_log='student.models.AUDIT_LOG')
self._assert_response(response, success=True)
self._assert_audit_log(mock_audit_log, 'info', [u'Login success', u'test@edx.org'])
def test_login_success_unicode_email(self):
unicode_email = u'test@edx.org' + unichr(40960)
unicode_email = u'test' + unichr(40960) + u'@edx.org'
self.user.email = unicode_email
self.user.save()
response = self._login_response(unicode_email, 'test_password')
response, mock_audit_log = self._login_response(unicode_email, 'test_password', patched_audit_log='student.models.AUDIT_LOG')
self._assert_response(response, success=True)
self._assert_audit_log(mock_audit_log, 'info', [u'Login success', unicode_email])
def test_login_fail_no_user_exists(self):
response = self._login_response('not_a_user@edx.org', 'test_password')
nonexistent_email = u'not_a_user@edx.org'
response, mock_audit_log = self._login_response(nonexistent_email, 'test_password')
self._assert_response(response, success=False,
value='Email or password is incorrect')
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'Unknown user email', nonexistent_email])
def test_login_fail_wrong_password(self):
response = self._login_response('test@edx.org', 'wrong_password')
response, mock_audit_log = self._login_response('test@edx.org', 'wrong_password')
self._assert_response(response, success=False,
value='Email or password is incorrect')
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'password for', u'test@edx.org', u'invalid'])
def test_login_not_activated(self):
# De-activate the user
......@@ -60,24 +69,38 @@ class LoginTest(TestCase):
self.user.save()
# Should now be unable to login
response = self._login_response('test@edx.org', 'test_password')
response, mock_audit_log = self._login_response('test@edx.org', 'test_password')
self._assert_response(response, success=False,
value="This account has not been activated")
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'Account not active for user'])
def test_login_unicode_email(self):
unicode_email = u'test@edx.org' + unichr(40960)
response = self._login_response(unicode_email, 'test_password')
response, mock_audit_log = self._login_response(unicode_email, 'test_password')
self._assert_response(response, success=False)
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', unicode_email])
def test_login_unicode_password(self):
unicode_password = u'test_password' + unichr(1972)
response = self._login_response('test@edx.org', unicode_password)
response, mock_audit_log = self._login_response('test@edx.org', unicode_password)
self._assert_response(response, success=False)
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'password for', u'test@edx.org', u'invalid'])
def test_logout_logging(self):
response, _ = self._login_response('test@edx.org', 'test_password')
self._assert_response(response, success=True)
logout_url = reverse('logout')
with patch('student.models.AUDIT_LOG') as mock_audit_log:
response = self.client.post(logout_url)
self.assertEqual(response.status_code, 302)
self._assert_audit_log(mock_audit_log, 'info', [u'Logout', u'test'])
def _login_response(self, email, password):
def _login_response(self, email, password, patched_audit_log='student.views.AUDIT_LOG'):
''' Post the login info '''
post_params = {'email': email, 'password': password}
return self.client.post(self.url, post_params)
with patch(patched_audit_log) as mock_audit_log:
result = self.client.post(self.url, post_params)
return result, mock_audit_log
def _assert_response(self, response, success=None, value=None):
'''
......@@ -105,3 +128,16 @@ class LoginTest(TestCase):
msg = ("'%s' did not contain '%s'" %
(str(response_dict['value']), str(value)))
self.assertTrue(value in response_dict['value'], msg)
def _assert_audit_log(self, mock_audit_log, level, log_strings):
"""
Check that the audit log has received the expected call.
"""
method_calls = mock_audit_log.method_calls
self.assertEquals(len(method_calls), 1)
name, args, _kwargs = method_calls[0]
self.assertEquals(name, level)
self.assertEquals(len(args), 1)
format_string = args[0]
for log_string in log_strings:
self.assertIn(log_string, format_string)
......@@ -56,6 +56,8 @@ from statsd import statsd
from pytz import UTC
log = logging.getLogger("mitx.student")
AUDIT_LOG = logging.getLogger("audit")
Article = namedtuple('Article', 'title url author image deck publication publish_date')
......@@ -104,8 +106,7 @@ day_pattern = re.compile(r'\s\d+,\s')
multimonth_pattern = re.compile(r'\s?\-\s?\S+\s')
def get_date_for_press(publish_date):
import datetime
def _get_date_for_press(publish_date):
# strip off extra months, and just use the first:
date = re.sub(multimonth_pattern, ", ", publish_date)
if re.search(day_pattern, date):
......@@ -126,7 +127,7 @@ def press(request):
json_articles = json.loads(content)
cache.set("student_press_json_articles", json_articles)
articles = [Article(**article) for article in json_articles]
articles.sort(key=lambda item: get_date_for_press(item.publish_date), reverse=True)
articles.sort(key=lambda item: _get_date_for_press(item.publish_date), reverse=True)
return render_to_response('static_templates/press.html', {'articles': articles})
......@@ -230,7 +231,7 @@ def signin_user(request):
@ensure_csrf_cookie
def register_user(request, extra_context={}):
def register_user(request, extra_context=None):
"""
This view will display the non-modal registration form
"""
......@@ -241,7 +242,8 @@ def register_user(request, extra_context={}):
'course_id': request.GET.get('course_id'),
'enrollment_action': request.GET.get('enrollment_action')
}
context.update(extra_context)
if extra_context is not None:
context.update(extra_context)
return render_to_response('register.html', context)
......@@ -374,7 +376,7 @@ def change_enrollment(request):
"run:{0}".format(run)])
try:
enrollment, created = CourseEnrollment.objects.get_or_create(user=user, course_id=course.id)
enrollment, _created = CourseEnrollment.objects.get_or_create(user=user, course_id=course.id)
except IntegrityError:
# If we've already created this enrollment in a separate transaction,
# then just continue
......@@ -418,19 +420,21 @@ def login_user(request, error=""):
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
log.warning(u"Login failed - Unknown user email: {0}".format(email))
AUDIT_LOG.warning(u"Login failed - Unknown user email: {0}".format(email))
return HttpResponse(json.dumps({'success': False,
'value': _('Email or password is incorrect.')})) # TODO: User error message
username = user.username
user = authenticate(username=username, password=password)
if user is None:
log.warning(u"Login failed - password for {0} is invalid".format(email))
AUDIT_LOG.warning(u"Login failed - password for {0} is invalid".format(email))
return HttpResponse(json.dumps({'success': False,
'value': _('Email or password is incorrect.')}))
if user is not None and user.is_active:
try:
# We do not log here, because we have a handler registered
# to perform logging on successful logins.
login(request, user)
if request.POST.get('remember') == 'true':
request.session.set_expiry(604800)
......@@ -438,14 +442,14 @@ def login_user(request, error=""):
else:
request.session.set_expiry(0)
except Exception as e:
AUDIT_LOG.critical("Login failed - Could not create session. Is memcached running?")
log.critical("Login failed - Could not create session. Is memcached running?")
log.exception(e)
log.info(u"Login success - {0} ({1})".format(username, email))
raise
try_change_enrollment(request)
statsd.increment(_("common.student.successful_login"))
statsd.increment("common.student.successful_login")
response = HttpResponse(json.dumps({'success': True}))
# set the login cookie for the edx marketing site
......@@ -469,7 +473,7 @@ def login_user(request, error=""):
return response
log.warning(u"Login failed - Account not active for user {0}, resending activation".format(username))
AUDIT_LOG.warning(u"Login failed - Account not active for user {0}, resending activation".format(username))
reactivation_email_for_user(user)
not_activated_msg = _("This account has not been activated. We have sent another activation message. Please check your e-mail for the activation instructions.")
......@@ -484,7 +488,8 @@ def logout_user(request):
Deletes both the CSRF and sessionid cookies so the marketing
site can determine the logged in state of the user
'''
# We do not log here, because we have a handler registered
# to perform logging on successful logouts.
logout(request)
response = redirect('/')
response.delete_cookie(settings.EDXMKTG_COOKIE_NAME,
......@@ -591,7 +596,7 @@ def create_account(request, post_override=None):
password = eamap.internal_password
post_vars = dict(post_vars.items())
post_vars.update(dict(email=email, name=name, password=password))
log.info('In create_account with external_auth: post_vars = %s' % post_vars)
log.debug(u'In create_account with external_auth: user = %s, email=%s', name, email)
# Confirm we have a properly formed request
for a in ['username', 'email', 'password', 'name']:
......@@ -624,7 +629,7 @@ def create_account(request, post_override=None):
required_post_vars = ['username', 'email', 'name', 'password', 'terms_of_service', 'honor_code']
if tos_not_required:
required_post_vars = ['username', 'email', 'name', 'password', 'honor_code']
required_post_vars = ['username', 'email', 'name', 'password', 'honor_code']
for a in required_post_vars:
if len(post_vars[a]) < 2:
......@@ -677,7 +682,7 @@ def create_account(request, post_override=None):
'-' * 80 + '\n\n' + message)
send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [dest_addr], fail_silently=False)
else:
res = user.email_user(subject, message, settings.DEFAULT_FROM_EMAIL)
_res = user.email_user(subject, message, settings.DEFAULT_FROM_EMAIL)
except:
log.warning('Unable to send activation email to user', exc_info=True)
js['value'] = _('Could not send activation e-mail.')
......@@ -690,17 +695,23 @@ def create_account(request, post_override=None):
login(request, login_user)
request.session.set_expiry(0)
# TODO: there is no error checking here to see that the user actually logged in successfully,
# and is not yet an active user.
if login_user is not None:
AUDIT_LOG.info(u"Login success on new account creation - {0}".format(login_user.username))
if DoExternalAuth:
eamap.user = login_user
eamap.dtsignup = datetime.datetime.now(UTC)
eamap.save()
log.info("User registered with external_auth %s" % post_vars['username'])
log.info('Updated ExternalAuthMap for %s to be %s' % (post_vars['username'], eamap))
AUDIT_LOG.info("User registered with external_auth %s", post_vars['username'])
AUDIT_LOG.info('Updated ExternalAuthMap for %s to be %s', post_vars['username'], eamap)
if settings.MITX_FEATURES.get('BYPASS_ACTIVATION_EMAIL_FOR_EXTAUTH'):
log.info('bypassing activation email')
login_user.is_active = True
login_user.save()
AUDIT_LOG.info(u"Login activated on extauth account - {0} ({1})".format(login_user.username, login_user.email))
try_change_enrollment(request)
......@@ -957,14 +968,14 @@ def activate_account(request, key):
r[0].activate()
already_active = False
#Enroll student in any pending courses he/she may have if auto_enroll flag is set
# Enroll student in any pending courses he/she may have if auto_enroll flag is set
student = User.objects.filter(id=r[0].user_id)
if student:
ceas = CourseEnrollmentAllowed.objects.filter(email=student[0].email)
for cea in ceas:
if cea.auto_enroll:
course_id = cea.course_id
enrollment, created = CourseEnrollment.objects.get_or_create(user_id=student[0].id, course_id=course_id)
_enrollment, _created = CourseEnrollment.objects.get_or_create(user_id=student[0].id, course_id=course_id)
resp = render_to_response("registration/activation_complete.html", {'user_logged_in': user_logged_in, 'already_active': already_active})
return resp
......@@ -996,7 +1007,7 @@ def password_reset_confirm_wrapper(request, uidb36=None, token=None):
''' A wrapper around django.contrib.auth.views.password_reset_confirm.
Needed because we want to set the user as active at this step.
'''
#cribbed from django.contrib.auth.views.password_reset_confirm
# cribbed from django.contrib.auth.views.password_reset_confirm
try:
uid_int = base36_to_int(uidb36)
user = User.objects.get(id=uid_int)
......@@ -1022,7 +1033,7 @@ def reactivation_email_for_user(user):
message = render_to_string('emails/activation_email.txt', d)
try:
res = user.email_user(subject, message, settings.DEFAULT_FROM_EMAIL)
_res = user.email_user(subject, message, settings.DEFAULT_FROM_EMAIL)
except:
log.warning('Unable to send reactivation email', exc_info=True)
return HttpResponse(json.dumps({'success': False, 'error': _('Unable to send reactivation email')}))
......@@ -1080,7 +1091,7 @@ def change_email_request(request):
subject = ''.join(subject.splitlines())
message = render_to_string('emails/email_change.txt', d)
res = send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [pec.new_email])
_res = send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, [pec.new_email])
return HttpResponse(json.dumps({'success': True}))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment