Commit 16e7ea41 by ziafazal

Added Security to session api as requested in #785

parent 11225820
"""
Tests for session api with advance security features
"""
import json
import uuid
import unittest
from django.test import TestCase
from django.test.client import Client
from django.test.utils import override_settings
from django.utils.translation import ugettext as _
from django.conf import settings
from django.core.cache import cache
from student.tests.factories import UserFactory
TEST_API_KEY = str(uuid.uuid4())
@override_settings(EDX_API_KEY=TEST_API_KEY, ENABLE_MAX_FAILED_LOGIN_ATTEMPTS=True)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class SessionApiSecurityTest(TestCase):
"""
Test api_manager.session.session_list view
"""
def setUp(self):
# Create one user and save it to the database
self.user = UserFactory.build(username='test', email='test@edx.org')
self.user.set_password('test_password')
self.user.save()
# Create the test client
self.client = Client()
cache.clear()
self.url = '/api/sessions/'
@override_settings(MAX_FAILED_LOGIN_ATTEMPTS_ALLOWED=10)
def test_login_ratelimited_success(self):
# Try (and fail) logging in with fewer attempts than the limit of 10
# and verify that you can still successfully log in afterwards.
for i in xrange(9):
password = u'test_password{0}'.format(i)
response = self._login_response('test', password, secure=True)
self.assertEqual(response.status_code, 401)
# now try logging in with a valid password and check status
response = self._login_response('test', 'test_password', secure=True)
self._assert_response(response, status=201)
@override_settings(MAX_FAILED_LOGIN_ATTEMPTS_ALLOWED=10)
def test_login_blockout(self):
# Try (and fail) logging in with 10 attempts
# and verify that user is blocked out.
for i in xrange(10):
password = u'test_password{0}'.format(i)
response = self._login_response('test', password, secure=True)
self.assertEqual(response.status_code, 401)
# check to see if this response indicates blockout
response = self._login_response('test', 'test_password', secure=True)
message = _('This account has been temporarily locked due to excessive login failures. Try again later.')
self._assert_response(response, status=403, message=message)
def _login_response(self, username, password, secure=False):
"""
Post the login info
"""
post_params, extra = {'username': username, 'password': password}, {}
headers = {'X-Edx-Api-Key': TEST_API_KEY, 'Content-Type': 'application/json'}
if secure:
extra['wsgi.url_scheme'] = 'https'
return self.client.post(self.url, post_params, headers=headers, **extra)
def _assert_response(self, response, status=200, success=None, message=None):
"""
Assert that the response had status 200 and returned a valid
JSON-parseable dict.
If success is provided, assert that the response had that
value for 'success' in the JSON dict.
If message is provided, assert that the response contained that
value for 'message' in the JSON dict.
"""
self.assertEqual(response.status_code, status)
try:
response_dict = json.loads(response.content)
except ValueError:
self.fail("Could not parse response content as JSON: %s"
% str(response.content))
if success is not None:
self.assertEqual(response_dict['success'], success)
if message is not None:
msg = ("'%s' did not contain '%s'" %
(response_dict['message'], message))
self.assertTrue(message in response_dict['message'], msg)
......@@ -8,6 +8,7 @@ from django.contrib.auth import SESSION_KEY, BACKEND_SESSION_KEY, load_backend
from django.contrib.auth.models import AnonymousUser, User
from django.core.exceptions import ObjectDoesNotExist
from django.utils.importlib import import_module
from django.utils.translation import ugettext as _
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
......@@ -15,6 +16,7 @@ from rest_framework.response import Response
from api_manager.permissions import ApiKeyHeaderPermission
from api_manager.serializers import UserSerializer
from student.models import LoginFailures
def _generate_base_uri(request):
......@@ -45,9 +47,23 @@ def session_list(request):
existing_user = User.objects.get(username=request.DATA['username'])
except ObjectDoesNotExist:
existing_user = None
# see if account has been locked out due to excessive login failures
if existing_user and LoginFailures.is_feature_enabled():
if LoginFailures.is_user_locked_out(existing_user):
response_status = status.HTTP_403_FORBIDDEN
response_data['message'] = _('This account has been temporarily locked due to excessive login failures. '
'Try again later.')
return Response(response_data, status=response_status)
if existing_user:
user = authenticate(username=existing_user.username, password=request.DATA['password'])
if user is not None:
# successful login, clear failed login attempts counters, if applicable
if LoginFailures.is_feature_enabled():
LoginFailures.clear_lockout_counter(user)
if user.is_active:
login(request, user)
response_data['token'] = request.session.session_key
......@@ -59,6 +75,10 @@ def session_list(request):
else:
response_status = status.HTTP_403_FORBIDDEN
else:
# tick the failed login counters if the user exists in the database
if LoginFailures.is_feature_enabled():
LoginFailures.increment_lockout_counter(existing_user)
response_status = status.HTTP_401_UNAUTHORIZED
else:
response_status = status.HTTP_404_NOT_FOUND
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment