Commit 38e55a4e by chrisndodge

Merge pull request #2668 from edx/cdodge/no-pii-in-logs

add new feature to make sure we aren't writing PII into the application ...
parents 4516f9e9 fe3ebca1
......@@ -78,6 +78,9 @@ FEATURES = {
# Allow editing of short description in course settings in cms
'EDITABLE_SHORT_DESCRIPTION': True,
# Hide any Personally Identifiable Information from application logs
'SQUELCH_PII_IN_LOGS': False,
}
ENABLE_JASMINE = False
......
......@@ -103,9 +103,9 @@ class ShibSPTest(ModuleStoreTestCase):
"""Asserts that shibboleth login attempt is being logged"""
method_name, args, _kwargs = audit_log_call
self.assertEquals(method_name, 'info')
self.assertEquals(len(args), 2)
self.assertEquals(len(args), 1)
self.assertIn(u'logged in via Shibboleth', args[0])
self.assertEquals(remote_user, args[1])
self.assertIn(remote_user, args[0])
@unittest.skipUnless(settings.FEATURES.get('AUTH_USE_SHIB'), "AUTH_USE_SHIB not set")
def test_shib_login(self):
......@@ -166,9 +166,9 @@ class ShibSPTest(ModuleStoreTestCase):
self._assert_shib_login_is_logged(audit_log_calls[0], remote_user)
method_name, args, _kwargs = audit_log_calls[1]
self.assertEquals(method_name, 'info')
self.assertEquals(len(args), 3)
self.assertEquals(len(args), 1)
self.assertIn(u'Login success', args[0])
self.assertEquals(remote_user, args[2])
self.assertIn(remote_user, args[0])
elif idp == "https://idp.stanford.edu/" and remote_user == 'inactive@stanford.edu':
self.assertEqual(response.status_code, 403)
self.assertIn("Account not yet activated: please look for link in your email", response.content)
......@@ -177,7 +177,7 @@ class ShibSPTest(ModuleStoreTestCase):
self._assert_shib_login_is_logged(audit_log_calls[0], remote_user)
method_name, args, _kwargs = audit_log_calls[1]
self.assertEquals(method_name, 'warning')
self.assertEquals(len(args), 2)
self.assertEquals(len(args), 1)
self.assertIn(u'is not active after external login', args[0])
# self.assertEquals(remote_user, args[1])
elif idp == "https://idp.stanford.edu/" and remote_user == 'womap@stanford.edu':
......@@ -190,9 +190,9 @@ class ShibSPTest(ModuleStoreTestCase):
self._assert_shib_login_is_logged(audit_log_calls[0], remote_user)
method_name, args, _kwargs = audit_log_calls[1]
self.assertEquals(method_name, 'info')
self.assertEquals(len(args), 3)
self.assertEquals(len(args), 1)
self.assertIn(u'Login success', args[0])
self.assertEquals(remote_user, args[2])
self.assertIn(remote_user, args[0])
elif idp == "https://someother.idp.com/" and remote_user in \
['withmap@stanford.edu', 'womap@stanford.edu', 'inactive@stanford.edu']:
self.assertEqual(response.status_code, 403)
......
......@@ -193,21 +193,33 @@ def _external_login_or_signup(request,
else:
auth_backend = 'django.contrib.auth.backends.ModelBackend'
user.backend = auth_backend
AUDIT_LOG.info('Linked user "%s" logged in via Shibboleth', user.email)
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.info('Linked user.id: {0} logged in via Shibboleth'.format(user.id))
else:
AUDIT_LOG.info('Linked user "{0}" logged in via Shibboleth'.format(user.email))
elif uses_certs:
# Certificates are trusted, so just link the user and log the action
user = internal_user
user.backend = 'django.contrib.auth.backends.ModelBackend'
AUDIT_LOG.info('Linked user "%s" logged in via SSL certificate', user.email)
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.info('Linked user_id {0} logged in via SSL certificate'.format(user.id))
else:
AUDIT_LOG.info('Linked user "{0}" logged in via SSL certificate'.format(user.email))
else:
user = authenticate(username=uname, password=eamap.internal_password, request=request)
if user is None:
# we want to log the failure, but don't want to log the password attempted:
AUDIT_LOG.warning('External Auth Login failed for "%s"', uname)
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.warning('External Auth Login failed')
else:
AUDIT_LOG.warning('External Auth Login failed for "{0}"'.format(uname))
return _signup(request, eamap, retfun)
if not user.is_active:
AUDIT_LOG.warning('User "%s" is not active after external login', uname)
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.warning('User {0} is not active after external login'.format(user.id))
else:
AUDIT_LOG.warning('User "{0}" is not active after external login'.format(uname))
# TODO: improve error page
msg = 'Account not yet activated: please look for link in your email'
return default_render_failure(request, msg)
......@@ -223,7 +235,10 @@ def _external_login_or_signup(request,
student.views.try_change_enrollment(enroll_request)
else:
student.views.try_change_enrollment(request)
AUDIT_LOG.info("Login success - %s (%s)", user.username, user.email)
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.info("Login success - user.id: {0}".format(user.id))
else:
AUDIT_LOG.info("Login success - {0} ({1})".format(user.username, user.email))
if retfun is None:
return redirect('/')
return retfun()
......@@ -812,8 +827,11 @@ def provider_login(request):
user = User.objects.get(email=email)
except User.DoesNotExist:
request.session['openid_error'] = True
msg = "OpenID login failed - Unknown user email: %s"
AUDIT_LOG.warning(msg, email)
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.warning("OpenID login failed - Unknown user email")
else:
msg = "OpenID login failed - Unknown user email: {0}".format(email)
AUDIT_LOG.warning(msg)
return HttpResponseRedirect(openid_request_url)
# attempt to authenticate user (but not actually log them in...)
......@@ -828,8 +846,11 @@ def provider_login(request):
if user is None:
request.session['openid_error'] = True
msg = "OpenID login failed - password for %s is invalid"
AUDIT_LOG.warning(msg, email)
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.warning("OpenID login failed - invalid password")
else:
msg = "OpenID login failed - password for {0} is invalid".format(email)
AUDIT_LOG.warning(msg)
return HttpResponseRedirect(openid_request_url)
# authentication succeeded, so fetch user information
......@@ -839,8 +860,11 @@ def provider_login(request):
if 'openid_error' in request.session:
del request.session['openid_error']
AUDIT_LOG.info("OpenID login success - %s (%s)",
user.username, user.email)
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.info("OpenID login success - user.id: {0}".format(user.id))
else:
AUDIT_LOG.info("OpenID login success - {0} ({1})".format(
user.username, user.email))
# redirect user to return_to location
url = endpoint + urlquote(user.username)
......@@ -859,8 +883,11 @@ def provider_login(request):
# the account is not active, so redirect back to the login page:
request.session['openid_error'] = True
msg = "Login failed - Account not active for user %s"
AUDIT_LOG.warning(msg, username)
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.warning("Login failed - Account not active for user.id {0}".format(user.id))
else:
msg = "Login failed - Account not active for user {0}".format(username)
AUDIT_LOG.warning(msg)
return HttpResponseRedirect(openid_request_url)
# determine consumer domain if applicable
......
......@@ -845,10 +845,16 @@ def update_user_information(sender, instance, created, **kwargs):
@receiver(user_logged_in)
def log_successful_login(sender, request, user, **kwargs):
"""Handler to log when logins have occurred successfully."""
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.info(u"Login success - user.id: {0}".format(user.id))
else:
AUDIT_LOG.info(u"Login success - {0} ({1})".format(user.username, user.email))
@receiver(user_logged_out)
def log_successful_logout(sender, request, user, **kwargs):
"""Handler to log when logouts have occurred successfully."""
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.info(u"Logout - user.id: {0}".format(request.user.id))
else:
AUDIT_LOG.info(u"Logout - {0}".format(request.user))
......@@ -56,6 +56,13 @@ class LoginTest(TestCase):
self._assert_response(response, success=True)
self._assert_audit_log(mock_audit_log, 'info', [u'Login success', u'test@edx.org'])
@patch.dict("django.conf.settings.FEATURES", {'SQUELCH_PII_IN_LOGS': True})
def test_login_success_no_pii(self):
response, mock_audit_log = self._login_response('test@edx.org', 'test_password', patched_audit_log='student.models.AUDIT_LOG')
self._assert_response(response, success=True)
self._assert_audit_log(mock_audit_log, 'info', [u'Login success'])
self._assert_not_in_audit_log(mock_audit_log, 'info', [u'test@edx.org'])
def test_login_success_unicode_email(self):
unicode_email = u'test' + unichr(40960) + u'@edx.org'
self.user.email = unicode_email
......@@ -72,12 +79,29 @@ class LoginTest(TestCase):
value='Email or password is incorrect')
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'Unknown user email', nonexistent_email])
@patch.dict("django.conf.settings.FEATURES", {'SQUELCH_PII_IN_LOGS': True})
def test_login_fail_no_user_exists_no_pii(self):
nonexistent_email = u'not_a_user@edx.org'
response, mock_audit_log = self._login_response(nonexistent_email, 'test_password')
self._assert_response(response, success=False,
value='Email or password is incorrect')
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'Unknown user email'])
self._assert_not_in_audit_log(mock_audit_log, 'warning', [nonexistent_email])
def test_login_fail_wrong_password(self):
response, mock_audit_log = self._login_response('test@edx.org', 'wrong_password')
self._assert_response(response, success=False,
value='Email or password is incorrect')
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'password for', u'test@edx.org', u'invalid'])
@patch.dict("django.conf.settings.FEATURES", {'SQUELCH_PII_IN_LOGS': True})
def test_login_fail_wrong_password_no_pii(self):
response, mock_audit_log = self._login_response('test@edx.org', 'wrong_password')
self._assert_response(response, success=False,
value='Email or password is incorrect')
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'password for', u'invalid'])
self._assert_not_in_audit_log(mock_audit_log, 'warning', [u'test@edx.org'])
def test_login_not_activated(self):
# De-activate the user
self.user.is_active = False
......@@ -89,6 +113,19 @@ class LoginTest(TestCase):
value="This account has not been activated")
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'Account not active for user'])
@patch.dict("django.conf.settings.FEATURES", {'SQUELCH_PII_IN_LOGS': True})
def test_login_not_activated_no_pii(self):
# De-activate the user
self.user.is_active = False
self.user.save()
# Should now be unable to login
response, mock_audit_log = self._login_response('test@edx.org', 'test_password')
self._assert_response(response, success=False,
value="This account has not been activated")
self._assert_audit_log(mock_audit_log, 'warning', [u'Login failed', u'Account not active for user'])
self._assert_not_in_audit_log(mock_audit_log, 'warning', [u'test'])
def test_login_unicode_email(self):
unicode_email = u'test@edx.org' + unichr(40960)
response, mock_audit_log = self._login_response(unicode_email, 'test_password')
......@@ -110,6 +147,17 @@ class LoginTest(TestCase):
self.assertEqual(response.status_code, 302)
self._assert_audit_log(mock_audit_log, 'info', [u'Logout', u'test'])
@patch.dict("django.conf.settings.FEATURES", {'SQUELCH_PII_IN_LOGS': True})
def test_logout_logging_no_pii(self):
response, _ = self._login_response('test@edx.org', 'test_password')
self._assert_response(response, success=True)
logout_url = reverse('logout')
with patch('student.models.AUDIT_LOG') as mock_audit_log:
response = self.client.post(logout_url)
self.assertEqual(response.status_code, 302)
self._assert_audit_log(mock_audit_log, 'info', [u'Logout'])
self._assert_not_in_audit_log(mock_audit_log, 'info', [u'test'])
def test_login_ratelimited_success(self):
# Try (and fail) logging in with fewer attempts than the limit of 30
# and verify that you can still successfully log in afterwards.
......@@ -177,6 +225,18 @@ class LoginTest(TestCase):
for log_string in log_strings:
self.assertIn(log_string, format_string)
def _assert_not_in_audit_log(self, mock_audit_log, level, log_strings):
"""
Check that the audit log has received the expected call as its last call.
"""
method_calls = mock_audit_log.method_calls
name, args, _kwargs = method_calls[-1]
self.assertEquals(name, level)
self.assertEquals(len(args), 1)
format_string = args[0]
for log_string in log_strings:
self.assertNotIn(log_string, format_string)
class UtilFnTest(TestCase):
"""
......
......@@ -702,6 +702,9 @@ def login_user(request, error=""):
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.warning(u"Login failed - Unknown user email")
else:
AUDIT_LOG.warning(u"Login failed - Unknown user email: {0}".format(email))
user = None
......@@ -749,6 +752,10 @@ def login_user(request, error=""):
# if we didn't find this username earlier, the account for this email
# doesn't exist, and doesn't have a corresponding password
if username != "":
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
loggable_id = user_found_by_email_lookup.id if user_found_by_email_lookup else "<unknown>"
AUDIT_LOG.warning(u"Login failed - password for user.id: {0} is invalid".format(loggable_id))
else:
AUDIT_LOG.warning(u"Login failed - password for {0} is invalid".format(email))
return JsonResponse({
"success": False,
......@@ -803,6 +810,9 @@ def login_user(request, error=""):
return response
if settings.FEATURES['SQUELCH_PII_IN_LOGS']:
AUDIT_LOG.warning(u"Login failed - Account not active for user.id: {0}, resending activation".format(user.id))
else:
AUDIT_LOG.warning(u"Login failed - Account not active for user {0}, resending activation".format(username))
reactivation_email_for_user(user)
......
......@@ -218,6 +218,9 @@ FEATURES = {
# Turn off account locking if failed login attempts exceeds a limit
'ENABLE_MAX_FAILED_LOGIN_ATTEMPTS': False,
# Hide any Personally Identifiable Information from application logs
'SQUELCH_PII_IN_LOGS': False,
}
# Used for A/B testing
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment