Commit ff18827f by Muhammad Shoaib Committed by Chris Dodge

User must reset password functionality added

User must reset password dunctionality added

Added user password reset functionality

Add password reset api and implemented Password history

Remove unused imports, password reset message text
parent 848961c5
...@@ -12,7 +12,6 @@ from django.utils.importlib import import_module ...@@ -12,7 +12,6 @@ from django.utils.importlib import import_module
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from rest_framework import status from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.views import APIView from rest_framework.views import APIView
...@@ -20,8 +19,9 @@ from util.bad_request_rate_limiter import BadRequestRateLimiter ...@@ -20,8 +19,9 @@ from util.bad_request_rate_limiter import BadRequestRateLimiter
from api_manager.permissions import ApiKeyHeaderPermission from api_manager.permissions import ApiKeyHeaderPermission
from api_manager.users.serializers import UserSerializer from api_manager.users.serializers import UserSerializer
from student.models import LoginFailures from student.models import (
LoginFailures, PasswordHistory
)
AUDIT_LOG = logging.getLogger("audit") AUDIT_LOG = logging.getLogger("audit")
...@@ -69,6 +69,15 @@ class SessionsList(APIView): ...@@ -69,6 +69,15 @@ class SessionsList(APIView):
'Try again later.') 'Try again later.')
return Response(response_data, status=response_status) return Response(response_data, status=response_status)
# see if the user must reset his/her password due to any policy settings
if existing_user and PasswordHistory.should_user_reset_password_now(existing_user):
response_status = status.HTTP_403_FORBIDDEN
response_data['message'] = _(
'Your password has expired due to password policy on this account. '
'You must reset your password before you can log in again.'
)
return Response(response_data, status=response_status)
if existing_user: if existing_user:
user = authenticate(username=existing_user.username, password=request.DATA['password']) user = authenticate(username=existing_user.username, password=request.DATA['password'])
if user is not None: if user is not None:
...@@ -142,4 +151,4 @@ class SessionsDetail(APIView): ...@@ -142,4 +151,4 @@ class SessionsDetail(APIView):
user_id = session[SESSION_KEY] user_id = session[SESSION_KEY]
AUDIT_LOG.info(u"API::User session terminated for user-id - {0}".format(user_id)) AUDIT_LOG.info(u"API::User session terminated for user-id - {0}".format(user_id))
session.flush() session.flush()
return Response(response_data, status=status.HTTP_204_NO_CONTENT) return Response(response_data, status=status.HTTP_204_NO_CONTENT)
\ No newline at end of file
"""
Tests for session api with advance security features
"""
import json
import uuid
from mock import patch
from django.test import TestCase
from django.test.utils import override_settings
from django.utils import timezone
from django.utils.translation import ugettext as _
from django.core.cache import cache
from datetime import datetime, timedelta
from freezegun import freeze_time
from pytz import UTC
TEST_API_KEY = str(uuid.uuid4())
@override_settings(EDX_API_KEY=TEST_API_KEY)
@patch.dict("django.conf.settings.FEATURES", {'ENFORCE_PASSWORD_POLICY': True})
@patch.dict("django.conf.settings.FEATURES", {'ADVANCED_SECURITY': True})
@override_settings(PASSWORD_MIN_LENGTH=4, PASSWORD_MAX_LENGTH=12,
PASSWORD_COMPLEXITY={'UPPER': 2, 'LOWER': 2, 'PUNCTUATION': 2, 'DIGITS': 2})
class UserPasswordResetTest(TestCase):
"""
Test api_manager.session.session_list view
"""
def setUp(self):
"""
setup the api urls
"""
self.session_url = '/api/sessions'
self.user_url = '/api/users'
cache.clear()
@override_settings(ADVANCED_SECURITY_CONFIG={'MIN_DAYS_FOR_STUDENT_ACCOUNTS_PASSWORD_RESETS': 5})
def test_user_must_reset_password_after_n_days(self):
"""
Test to ensure that User session login fails
after N days. User must reset his/her
password after N days to login again
"""
response = self._do_post_request(
self.user_url, 'test2', 'Test.Me64!', email='test@edx.org',
first_name='John', last_name='Doe', secure=True
)
self._assert_response(response, status=201)
user_id = response.data['id']
response = self._do_post_request(self.session_url, 'test2', 'Test.Me64!', secure=True)
self.assertEqual(response.status_code, 201)
reset_time = timezone.now() + timedelta(days=5)
with patch.object(timezone, 'now', return_value=reset_time):
response = self._do_post_request(self.session_url, 'test2', 'Test.Me64!', secure=True)
message =_(
'Your password has expired due to password policy on this account. '
'You must reset your password before you can log in again.'
)
self._assert_response(response, status=403, message=message)
#reset the password and then try login
pass_reset_url = "%s/%s" % (self.user_url, str(user_id))
response = self._do_post_pass_reset_request(
pass_reset_url, password='Test.Me64@', secure=True
)
self.assertEqual(response.status_code, 201)
#login successful after reset password
response = self._do_post_request(self.session_url, 'test2', 'Test.Me64@', secure=True)
self.assertEqual(response.status_code, 201)
@override_settings(ADVANCED_SECURITY_CONFIG={'MIN_DIFFERENT_STUDENT_PASSWORDS_BEFORE_REUSE': 4,
'MIN_TIME_IN_DAYS_BETWEEN_ALLOWED_RESETS': 0})
def test_password_reset_not_allowable_reuse(self):
"""
Try resetting user password < 4 and > 4 times and
then use one of the passwords that you have used
before
"""
response = self._do_post_request(
self.user_url, 'test2', 'Test.Me64!', email='test@edx.org',
first_name='John', last_name='Doe', secure=True
)
self._assert_response(response, status=201)
user_id = response.data['id']
pass_reset_url = "%s/%s" % (self.user_url, str(user_id))
response = self._do_post_pass_reset_request(
pass_reset_url, password='Test.Me64#', secure=True
)
message = 'Password Reset Successful'
self._assert_response(response, status=201, message=message)
response = self._do_post_pass_reset_request(
pass_reset_url, password='Test.Me64@', secure=True
)
message = 'Password Reset Successful'
self._assert_response(response, status=201, message=message)
response = self._do_post_pass_reset_request(
pass_reset_url, password='Test.Me64^', secure=True
)
message = 'Password Reset Successful'
self._assert_response(response, status=201, message=message)
#now use previously used password
response = self._do_post_pass_reset_request(
pass_reset_url, password='Test.Me64!', secure=True
)
message = _(
"You are re-using a password that you have used recently. You must "
"have 4 distinct password(s) before reusing a previous password."
)
self._assert_response(response, status=403, message=message)
response = self._do_post_pass_reset_request(
pass_reset_url, password='Test.Me64&', secure=True
)
message = 'Password Reset Successful'
self._assert_response(response, status=201, message=message)
#now use previously used password
response = self._do_post_pass_reset_request(
pass_reset_url, password='Test.Me64!', secure=True
)
message = 'Password Reset Successful'
self._assert_response(response, status=201, message=message)
@override_settings(ADVANCED_SECURITY_CONFIG={'MIN_TIME_IN_DAYS_BETWEEN_ALLOWED_RESETS': 1})
def test_is_password_reset_too_frequent(self):
"""
Try reset user password before
and after the MIN_TIME_IN_DAYS_BETWEEN_ALLOWED_RESETS
"""
response = self._do_post_request(
self.user_url, 'test2', 'Test.Me64!', email='test@edx.org',
first_name='John', last_name='Doe', secure=True
)
self._assert_response(response, status=201)
user_id = response.data['id']
pass_reset_url = "%s/%s" % (self.user_url, str(user_id))
response = self._do_post_pass_reset_request(
pass_reset_url, password='NewP@ses34!', secure=True
)
message = _(
"You are resetting passwords too frequently. Due to security policies, "
"1 day(s) must elapse between password resets"
)
self._assert_response(response, status=403, message=message)
reset_time = timezone.now() + timedelta(days=1)
with patch.object(timezone, 'now', return_value=reset_time):
response = self._do_post_pass_reset_request(
pass_reset_url, password='NewP@ses34!', secure=True
)
message = 'Password Reset Successful'
self._assert_response(response, status=201, message=message)
@override_settings(ADVANCED_SECURITY_CONFIG={'MIN_TIME_IN_DAYS_BETWEEN_ALLOWED_RESETS': 0})
def test_password_reset_rate_limiting_unblock(self):
"""
Try (and fail) login user 30 times on invalid password
and then unblock it after 5 minutes
"""
response = self._do_post_request(
self.user_url, 'test2', 'Test.Me64!', email='test@edx.org',
first_name='John', last_name='Doe', secure=True
)
self._assert_response(response, status=201)
user_id = response.data['id']
pass_reset_url = '{}/{}'.format(self.user_url, user_id)
for i in xrange(30):
password = u'test_password{0}'.format(i)
response = self._do_post_pass_reset_request(
'{}/{}'.format(self.user_url, i+200), password=password, secure=True
)
self._assert_response(response, status=404)
response = self._do_post_pass_reset_request(
'{}/{}'.format(self.user_url, '31'), password='Test.Me64@', secure=True
)
message = _('Rate limit exceeded in password_reset.')
self._assert_response(response, status=403, message=message)
# now reset the time to 5 mins from now in future in order to unblock
reset_time = datetime.now(UTC) + timedelta(seconds=300)
with freeze_time(reset_time):
response = self._do_post_pass_reset_request(
pass_reset_url, password='Test.Me64@', secure=True
)
self._assert_response(response, status=201)
def _do_post_request(self, url, username, password, **kwargs):
"""
Post the login info
"""
post_params, extra = {'username': username, 'password': password}, {}
if kwargs.get('email'):
post_params['email'] = kwargs.get('email')
if kwargs.get('first_name'):
post_params['first_name'] = kwargs.get('first_name')
if kwargs.get('last_name'):
post_params['last_name'] = kwargs.get('last_name')
headers = {'X-Edx-Api-Key': TEST_API_KEY, 'Content-Type': 'application/json'}
if kwargs.get('secure', False):
extra['wsgi.url_scheme'] = 'https'
return self.client.post(url, post_params, headers=headers, **extra)
def _do_post_pass_reset_request(self, url, password, **kwargs):
"""
Post the Password Reset info
"""
post_params, extra = {'password': password}, {}
headers = {'X-Edx-Api-Key': TEST_API_KEY, 'Content-Type': 'application/json'}
if kwargs.get('secure', False):
extra['wsgi.url_scheme'] = 'https'
return self.client.post(url, post_params, headers=headers, **extra)
def _assert_response(self, response, status=200, success=None, message=None):
"""
Assert that the response had status 200 and returned a valid
JSON-parseable dict.
If success is provided, assert that the response had that
value for 'success' in the JSON dict.
If message is provided, assert that the response contained that
value for 'message' in the JSON dict.
"""
self.assertEqual(response.status_code, status)
response_dict = json.loads(response.content)
if message is not None:
msg = ("'%s' did not contain '%s'" %
(response_dict['message'], message))
self.assertTrue(message in response_dict['message'], msg)
...@@ -8,9 +8,7 @@ from django.db import IntegrityError ...@@ -8,9 +8,7 @@ from django.db import IntegrityError
from django.core.validators import validate_email, validate_slug, ValidationError from django.core.validators import validate_email, validate_slug, ValidationError
from django.conf import settings from django.conf import settings
from django.utils.translation import get_language, ugettext_lazy as _ from django.utils.translation import get_language, ugettext_lazy as _
from rest_framework import status from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.views import APIView from rest_framework.views import APIView
...@@ -27,6 +25,7 @@ from util.password_policy_validators import ( ...@@ -27,6 +25,7 @@ from util.password_policy_validators import (
validate_password_length, validate_password_complexity, validate_password_length, validate_password_complexity,
validate_password_dictionary validate_password_dictionary
) )
from util.bad_request_rate_limiter import BadRequestRateLimiter
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
AUDIT_LOG = logging.getLogger("audit") AUDIT_LOG = logging.getLogger("audit")
...@@ -45,7 +44,6 @@ def _generate_base_uri(request): ...@@ -45,7 +44,6 @@ def _generate_base_uri(request):
) )
return resource_uri return resource_uri
def _serialize_user(response_data, user): def _serialize_user(response_data, user):
""" """
Loads the object data into the response dict Loads the object data into the response dict
...@@ -58,7 +56,6 @@ def _serialize_user(response_data, user): ...@@ -58,7 +56,6 @@ def _serialize_user(response_data, user):
response_data['id'] = user.id response_data['id'] = user.id
return response_data return response_data
def _save_module_position(request, user, course_id, course_descriptor, position): def _save_module_position(request, user, course_id, course_descriptor, position):
""" """
Records the indicated position for the specified course Records the indicated position for the specified course
...@@ -92,6 +89,7 @@ def _save_module_position(request, user, course_id, course_descriptor, position) ...@@ -92,6 +89,7 @@ def _save_module_position(request, user, course_id, course_descriptor, position)
saved_module = get_current_child(parent_module) saved_module = get_current_child(parent_module)
return saved_module.id return saved_module.id
class UsersList(APIView): class UsersList(APIView):
permission_classes = (ApiKeyHeaderPermission,) permission_classes = (ApiKeyHeaderPermission,)
...@@ -207,6 +205,84 @@ class UsersDetail(APIView): ...@@ -207,6 +205,84 @@ class UsersDetail(APIView):
pass pass
return Response(response_data, status=status.HTTP_204_NO_CONTENT) return Response(response_data, status=status.HTTP_204_NO_CONTENT)
def post(self, request, user_id, format=None):
response_data = {}
base_uri = _generate_base_uri(request)
# Add some rate limiting here by re-using the RateLimitMixin as a helper class
limiter = BadRequestRateLimiter()
if limiter.is_rate_limit_exceeded(request):
AUDIT_LOG.warning("API::Rate limit exceeded in password_reset")
response_data['message'] = _('Rate limit exceeded in password_reset.')
status_code = status.HTTP_403_FORBIDDEN
return Response(response_data, status=status_code)
try:
existing_user = User.objects.get(id=user_id)
old_password_hash = existing_user.password
_serialize_user(response_data, existing_user)
password = request.DATA['password']
if existing_user:
if settings.FEATURES.get('ENFORCE_PASSWORD_POLICY', False):
try:
validate_password_length(password)
validate_password_complexity(password)
validate_password_dictionary(password)
except ValidationError, err:
# bad user? tick the rate limiter counter
AUDIT_LOG.warning("API::Bad password in password_reset.")
status_code = status.HTTP_400_BAD_REQUEST
response_data['message'] = _('Password: ') + '; '.join(err.messages)
return Response(response_data, status=status_code)
# also, check the password reuse policy
err_msg = None
if not PasswordHistory.is_allowable_password_reuse(existing_user, password):
if existing_user.is_staff:
num_distinct = settings.ADVANCED_SECURITY_CONFIG['MIN_DIFFERENT_STAFF_PASSWORDS_BEFORE_REUSE']
else:
num_distinct = settings.ADVANCED_SECURITY_CONFIG['MIN_DIFFERENT_STUDENT_PASSWORDS_BEFORE_REUSE']
err_msg = _(
"You are re-using a password that you have used recently. You must "
"have {0} distinct password(s) before reusing a previous password."
).format(num_distinct)
# also, check to see if passwords are getting reset too frequent
if PasswordHistory.is_password_reset_too_soon(existing_user):
num_days = settings.ADVANCED_SECURITY_CONFIG['MIN_TIME_IN_DAYS_BETWEEN_ALLOWED_RESETS']
err_msg = _(
"You are resetting passwords too frequently. Due to security policies, "
"{0} day(s) must elapse between password resets"
).format(num_days)
if err_msg:
# We have an password reset attempt which violates some security policy,
status_code = status.HTTP_403_FORBIDDEN
response_data['message'] = err_msg
return Response(response_data, status=status_code)
existing_user.is_active = True
existing_user.set_password(password)
existing_user.save()
update_user_password_hash = existing_user.password
if update_user_password_hash != old_password_hash:
# add this account creation to password history
# NOTE, this will be a NOP unless the feature has been turned on in configuration
password_history_entry = PasswordHistory()
password_history_entry.create(existing_user)
status_code = status.HTTP_201_CREATED
response_data['uri'] = base_uri
response_data['message'] = 'Password Reset Successful'
else:
status_code = status.HTTP_404_NOT_FOUND
response_data['message'] = 'User not exist'
except ObjectDoesNotExist:
limiter.tick_bad_request_counter(request)
return Response(response_data, status=status.HTTP_404_NOT_FOUND)
return Response(response_data, status=status_code)
class UsersGroupsList(APIView): class UsersGroupsList(APIView):
permission_classes = (ApiKeyHeaderPermission,) permission_classes = (ApiKeyHeaderPermission,)
...@@ -265,7 +341,6 @@ class UsersGroupsList(APIView): ...@@ -265,7 +341,6 @@ class UsersGroupsList(APIView):
return Response(response_data, status=response_status) return Response(response_data, status=response_status)
class UsersGroupsDetail(APIView): class UsersGroupsDetail(APIView):
permission_classes = (ApiKeyHeaderPermission,) permission_classes = (ApiKeyHeaderPermission,)
...@@ -430,4 +505,4 @@ class UsersCoursesDetail(APIView): ...@@ -430,4 +505,4 @@ class UsersCoursesDetail(APIView):
user = None user = None
if user: if user:
CourseEnrollment.unenroll(user, course_id) CourseEnrollment.unenroll(user, course_id)
return Response({}, status=status.HTTP_204_NO_CONTENT) return Response({}, status=status.HTTP_204_NO_CONTENT)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment