Commit 82cf41d6 by chrisndodge

Merge pull request #25 from edx-solutions/login_audit_log_mckin_817

Create new Account/Login Audit Log
parents 8772a67f 7bb0a649
# pylint: disable=E1101
""" API implementation for session-oriented interactions. """
import logging
from django.conf import settings
from django.contrib.auth import authenticate, login
......@@ -21,6 +22,8 @@ from api_manager.permissions import ApiKeyHeaderPermission
from api_manager.serializers import UserSerializer
from student.models import LoginFailures
AUDIT_LOG = logging.getLogger("audit")
def _generate_base_uri(request):
"""
......@@ -82,6 +85,9 @@ class SessionsList(APIView):
response_data['user'] = user_dto.data
response_data['uri'] = '{}/{}'.format(base_uri, request.session.session_key)
response_status = status.HTTP_201_CREATED
# add to audit log
AUDIT_LOG.info(u"API::User logged in successfully with user-id - {0}".format(user.id))
else:
response_status = status.HTTP_401_UNAUTHORIZED
else:
......@@ -91,7 +97,9 @@ class SessionsList(APIView):
LoginFailures.increment_lockout_counter(existing_user)
response_status = status.HTTP_401_UNAUTHORIZED
AUDIT_LOG.warn(u"API::User authentication failed with user-id - {0}".format(existing_user.id))
else:
AUDIT_LOG.warn(u"API::Failed login attempt with unknown email/username")
response_status = status.HTTP_404_NOT_FOUND
return Response(response_data, status=response_status)
......@@ -131,5 +139,9 @@ class SessionsDetail(APIView):
base_uri = _generate_base_uri(request)
engine = import_module(settings.SESSION_ENGINE)
session = engine.SessionStore(session_id)
user_id = session[SESSION_KEY]
AUDIT_LOG.info(u"API::User session terminated for user-id - {0}".format(user_id))
session.flush()
return Response(response_data, status=status.HTTP_204_NO_CONTENT)
return Response(response_data, status=status.HTTP_204_NO_CONTENT)
......@@ -3,7 +3,6 @@ Tests for session api with advance security features
"""
import json
import uuid
import unittest
from mock import patch
from datetime import datetime, timedelta
from freezegun import freeze_time
......@@ -13,7 +12,6 @@ from django.test import TestCase
from django.test.client import Client
from django.test.utils import override_settings
from django.utils.translation import ugettext as _
from django.conf import settings
from django.core.cache import cache
from student.tests.factories import UserFactory
......@@ -50,11 +48,11 @@ class SessionApiSecurityTest(TestCase):
"""
for i in xrange(9):
password = u'test_password{0}'.format(i)
response = self._do_post_request(self.session_url, 'test', password, secure=True)
response, mock_audit_log = self._do_request(self.session_url, 'test', password, secure=True)
self.assertEqual(response.status_code, 401)
# now try logging in with a valid password and check status
response = self._do_post_request(self.session_url, 'test', 'test_password', secure=True)
response, mock_audit_log = self._do_request(self.session_url, 'test', 'test_password', secure=True)
self._assert_response(response, status=201)
@override_settings(MAX_FAILED_LOGIN_ATTEMPTS_ALLOWED=10)
......@@ -65,11 +63,11 @@ class SessionApiSecurityTest(TestCase):
"""
for i in xrange(10):
password = u'test_password{0}'.format(i)
response = self._do_post_request(self.session_url, 'test', password, secure=True)
response, mock_audit_log = self._do_request(self.session_url, 'test', password, secure=True)
self.assertEqual(response.status_code, 401)
# check to see if this response indicates blockout
response = self._do_post_request(self.session_url, 'test', 'test_password', secure=True)
response, mock_audit_log = self._do_request(self.session_url, 'test', 'test_password', secure=True)
message = _('This account has been temporarily locked due to excessive login failures. Try again later.')
self._assert_response(response, status=403, message=message)
......@@ -83,18 +81,21 @@ class SessionApiSecurityTest(TestCase):
"""
for i in xrange(10):
password = u'test_password{0}'.format(i)
response = self._do_post_request(self.session_url, 'test', password, secure=True)
response, mock_audit_log = self._do_request(self.session_url, 'test', password, secure=True)
self.assertEqual(response.status_code, 401)
self._assert_audit_log(mock_audit_log, 'warn',
[u"API::User authentication failed with user-id - {0}".format(self.user.id)])
self._assert_not_in_audit_log(mock_audit_log, 'warn', [u'test'])
# check to see if this response indicates blockout
response = self._do_post_request(self.session_url, 'test', 'test_password', secure=True)
response, mock_audit_log = self._do_request(self.session_url, 'test', 'test_password', secure=True)
message = _('This account has been temporarily locked due to excessive login failures. Try again later.')
self._assert_response(response, status=403, message=message)
# now reset the time to 30 from now in future
reset_time = datetime.now(UTC) + timedelta(seconds=1800)
with freeze_time(reset_time):
response = self._do_post_request(self.session_url, 'test', 'test_password', secure=True)
response, mock_audit_log = self._do_request(self.session_url, 'test', 'test_password', secure=True)
self._assert_response(response, status=201)
@override_settings(PASSWORD_MIN_LENGTH=4)
......@@ -102,7 +103,7 @@ class SessionApiSecurityTest(TestCase):
"""
Try (and fail) user creation with shorter password
"""
response = self._do_post_request(self.user_url, 'test', 'abc', email='test@edx.org',
response, mock_audit_log = self._do_request(self.user_url, 'test', 'abc', email='test@edx.org',
first_name='John', last_name='Doe', secure=True)
message = _('Password: Invalid Length (must be 4 characters or more)')
self._assert_response(response, status=400, message=message)
......@@ -112,7 +113,7 @@ class SessionApiSecurityTest(TestCase):
"""
Try (and fail) user creation with longer password
"""
response = self._do_post_request(self.user_url, 'test', 'test_password', email='test@edx.org',
response, mock_audit_log = self._do_request(self.user_url, 'test', 'test_password', email='test@edx.org',
first_name='John', last_name='Doe', secure=True)
message = _('Password: Invalid Length (must be 12 characters or less)')
self._assert_response(response, status=400, message=message)
......@@ -123,7 +124,7 @@ class SessionApiSecurityTest(TestCase):
Try (and fail) user creation since password should have atleast
2 upper characters
"""
response = self._do_post_request(self.user_url, 'test', 'test.pa64!', email='test@edx.org',
response, mock_audit_log = self._do_request(self.user_url, 'test', 'test.pa64!', email='test@edx.org',
first_name='John', last_name='Doe', secure=True)
message = _('Password: Must be more complex (must contain 2 or more uppercase characters)')
self._assert_response(response, status=400, message=message)
......@@ -134,7 +135,7 @@ class SessionApiSecurityTest(TestCase):
Try (and fail) user creation without any numeric characters
in password
"""
response = self._do_post_request(self.user_url, 'test', 'TEST.PA64!', email='test@edx.org',
response, mock_audit_log = self._do_request(self.user_url, 'test', 'TEST.PA64!', email='test@edx.org',
first_name='John', last_name='Doe', secure=True)
message = _('Password: Must be more complex (must contain 2 or more lowercase characters)')
self._assert_response(response, status=400, message=message)
......@@ -144,7 +145,7 @@ class SessionApiSecurityTest(TestCase):
"""
Try (and fail) user creation without any punctuation in password
"""
response = self._do_post_request(self.user_url, 'test', 'test64SS', email='test@edx.org',
response, mock_audit_log = self._do_request(self.user_url, 'test', 'test64SS', email='test@edx.org',
first_name='John', last_name='Doe', secure=True)
message = _('Password: Must be more complex (must contain 2 or more uppercase characters,'
' must contain 2 or more punctuation characters)')
......@@ -155,7 +156,7 @@ class SessionApiSecurityTest(TestCase):
"""
Try (and fail) user creation without any numeric characters in password
"""
response = self._do_post_request(self.user_url, 'test', 'test.paSS!', email='test@edx.org',
response, mock_audit_log = self._do_request(self.user_url, 'test', 'test.paSS!', email='test@edx.org',
first_name='John', last_name='Doe', secure=True)
message = _('Password: Must be more complex (must contain 2 or more uppercase characters,'
' must contain 2 or more digits)')
......@@ -166,15 +167,19 @@ class SessionApiSecurityTest(TestCase):
"""
This should pass since it has everything needed for a complex password
"""
response = self._do_post_request(self.user_url, str(uuid.uuid4()), 'Test.Me64!', email='test@edx.org',
first_name='John', last_name='Doe', secure=True)
response, mock_audit_log = self._do_request(self.user_url, str(uuid.uuid4()), 'Test.Me64!',
email='test@edx.org', first_name='John',
last_name='Doe', secure=True,
patched_audit_log='api_manager.users_views.AUDIT_LOG')
self._assert_response(response, status=201)
self._assert_audit_log(mock_audit_log, 'info', [u'API::New account created with user-id'])
self._assert_not_in_audit_log(mock_audit_log, 'info', [u'test@edx.org'])
def test_user_with_invalid_email(self):
"""
Try (and fail) user creation with invalid email address
"""
response = self._do_post_request(self.user_url, 'test', 'Test.Me64!', email='test-edx.org',
response, mock_audit_log = self._do_request(self.user_url, 'test', 'Test.Me64!', email='test-edx.org',
first_name='John', last_name='Doe', secure=True)
message = _('Valid e-mail is required.')
self._assert_response(response, status=400, message=message)
......@@ -183,27 +188,65 @@ class SessionApiSecurityTest(TestCase):
"""
Try (and fail) user creation with invalid username
"""
response = self._do_post_request(self.user_url, 'user name', 'Test.Me64!', email='test@edx.org',
response, mock_audit_log = self._do_request(self.user_url, 'user name', 'Test.Me64!', email='test@edx.org',
first_name='John', last_name='Doe', secure=True)
message = _('Username should only consist of A-Z and 0-9, with no spaces.')
self._assert_response(response, status=400, message=message)
def _do_post_request(self, url, username, password, **kwargs):
def test_user_with_unknown_username(self):
"""
Try (and fail) user login with unknown credentials
"""
response, mock_audit_log = self._do_request(self.session_url, 'unknown', 'UnKnown.Pass', secure=True)
self._assert_response(response, status=404)
self._assert_audit_log(mock_audit_log, 'warn', [u'API::Failed login attempt with unknown email/username'])
def test_successful_logout(self):
"""
Try login of user first and then logout user successfully and test audit log
"""
response, mock_audit_log = self._do_request(self.session_url, 'test', 'test_password', secure=True)
self._assert_response(response, status=201)
self._assert_audit_log(mock_audit_log, 'info',
[u"API::User logged in successfully with user-id - {0}".format(self.user.id)])
self._assert_not_in_audit_log(mock_audit_log, 'info', [u'test'])
response_dict = json.loads(response.content)
response, mock_audit_log = self._do_request(self.session_url + '/' + response_dict['token'], 'test',
'test_password', secure=True, request_method='DELETE')
self._assert_response(response, status=204)
self._assert_audit_log(mock_audit_log, 'info',
[u'API::User session terminated for user-id - {0}'.format(self.user.id)])
def _do_request(self, url, username, password, **kwargs):
"""
Post the login info
Make Post/Delete/Get requests with params
"""
post_params, extra = {'username': username, 'password': password}, {}
post_params, extra, = {'username': username, 'password': password}, {}
patched_audit_log = 'api_manager.sessions_views.AUDIT_LOG'
request_method = kwargs.get('request_method', 'POST')
if kwargs.get('email'):
post_params['email'] = kwargs.get('email')
if kwargs.get('first_name'):
post_params['first_name'] = kwargs.get('first_name')
if kwargs.get('last_name'):
post_params['last_name'] = kwargs.get('last_name')
headers = {'X-Edx-Api-Key': TEST_API_KEY, 'Content-Type': 'application/json'}
if kwargs.get('secure', False):
extra['wsgi.url_scheme'] = 'https'
return self.client.post(url, post_params, headers=headers, **extra)
if kwargs.get('patched_audit_log'):
patched_audit_log = kwargs.get('patched_audit_log')
headers = {'X-Edx-Api-Key': TEST_API_KEY, 'Content-Type': 'application/json'}
with patch(patched_audit_log) as mock_audit_log:
if request_method == 'POST':
result = self.client.post(url, post_params, headers=headers, **extra)
elif request_method == 'DELETE':
result = self.client.delete(url, post_params, headers=headers, **extra)
else:
result = self.client.get(url, post_params, headers=headers, **extra)
return result, mock_audit_log
def _assert_response(self, response, status=200, success=None, message=None):
"""
......@@ -218,6 +261,10 @@ class SessionApiSecurityTest(TestCase):
"""
self.assertEqual(response.status_code, status)
# Return if response has not content
if response.status_code == 204:
return
try:
response_dict = json.loads(response.content)
except ValueError:
......@@ -231,3 +278,27 @@ class SessionApiSecurityTest(TestCase):
msg = ("'%s' did not contain '%s'" %
(response_dict['message'], message))
self.assertTrue(message in response_dict['message'], msg)
def _assert_audit_log(self, mock_audit_log, level, log_strings):
"""
Check that the audit log has received the expected call as its last call.
"""
method_calls = mock_audit_log.method_calls
name, args, _kwargs = method_calls[-1]
self.assertEquals(name, level)
self.assertEquals(len(args), 1)
format_string = args[0]
for log_string in log_strings:
self.assertIn(log_string, format_string)
def _assert_not_in_audit_log(self, mock_audit_log, level, log_strings):
"""
Check that the audit log has received the expected call as its last call.
"""
method_calls = mock_audit_log.method_calls
name, args, _kwargs = method_calls[-1]
self.assertEquals(name, level)
self.assertEquals(len(args), 1)
format_string = args[0]
for log_string in log_strings:
self.assertNotIn(log_string, format_string)
......@@ -27,7 +27,7 @@ from util.password_policy_validators import (
)
log = logging.getLogger(__name__)
AUDIT_LOG = logging.getLogger("audit")
def _generate_base_uri(request):
"""
......@@ -144,6 +144,9 @@ class UsersList(APIView):
password_history_entry = PasswordHistory()
password_history_entry.create(user)
# add to audit log
AUDIT_LOG.info(u"API::New account created with user-id - {0}".format(user.id))
# CDODGE: @TODO: We will have to extend this to look in the CourseEnrollmentAllowed table and
# auto-enroll students when they create a new account. Also be sure to remove from
# the CourseEnrollmentAllow table after the auto-registration has taken place
......@@ -155,6 +158,7 @@ class UsersList(APIView):
status_code = status.HTTP_409_CONFLICT
response_data['message'] = "User '%s' already exists", username
response_data['field_conflict'] = "username"
return Response(response_data, status=status_code)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment