Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-platform
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-platform
Commits
1469cee9
Commit
1469cee9
authored
Apr 28, 2014
by
Asad Iqbal
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Adding ratelimiting on login api
parent
3732bd16
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
122 additions
and
0 deletions
+122
-0
lms/djangoapps/api_manager/sessions_views.py
+8
-0
lms/djangoapps/api_manager/tests/test_session_login_ratelimit.py
+114
-0
No files found.
lms/djangoapps/api_manager/sessions_views.py
View file @
1469cee9
...
@@ -13,6 +13,7 @@ from django.utils.translation import ugettext as _
...
@@ -13,6 +13,7 @@ from django.utils.translation import ugettext as _
from
rest_framework
import
status
from
rest_framework
import
status
from
rest_framework.decorators
import
api_view
,
permission_classes
from
rest_framework.decorators
import
api_view
,
permission_classes
from
rest_framework.response
import
Response
from
rest_framework.response
import
Response
from
util.bad_request_rate_limiter
import
BadRequestRateLimiter
from
api_manager.permissions
import
ApiKeyHeaderPermission
from
api_manager.permissions
import
ApiKeyHeaderPermission
from
api_manager.serializers
import
UserSerializer
from
api_manager.serializers
import
UserSerializer
...
@@ -42,6 +43,12 @@ def session_list(request):
...
@@ -42,6 +43,12 @@ def session_list(request):
1. Open edX username/password
1. Open edX username/password
"""
"""
response_data
=
{}
response_data
=
{}
# Add some rate limiting here by re-using the RateLimitMixin as a helper class
limiter
=
BadRequestRateLimiter
()
if
limiter
.
is_rate_limit_exceeded
(
request
):
response_data
[
'message'
]
=
_
(
'Rate limit exceeded in api login.'
)
return
Response
(
response_data
,
status
=
status
.
HTTP_403_FORBIDDEN
)
base_uri
=
_generate_base_uri
(
request
)
base_uri
=
_generate_base_uri
(
request
)
try
:
try
:
existing_user
=
User
.
objects
.
get
(
username
=
request
.
DATA
[
'username'
])
existing_user
=
User
.
objects
.
get
(
username
=
request
.
DATA
[
'username'
])
...
@@ -75,6 +82,7 @@ def session_list(request):
...
@@ -75,6 +82,7 @@ def session_list(request):
else
:
else
:
response_status
=
status
.
HTTP_403_FORBIDDEN
response_status
=
status
.
HTTP_403_FORBIDDEN
else
:
else
:
limiter
.
tick_bad_request_counter
(
request
)
# tick the failed login counters if the user exists in the database
# tick the failed login counters if the user exists in the database
if
LoginFailures
.
is_feature_enabled
():
if
LoginFailures
.
is_feature_enabled
():
LoginFailures
.
increment_lockout_counter
(
existing_user
)
LoginFailures
.
increment_lockout_counter
(
existing_user
)
...
...
lms/djangoapps/api_manager/tests/test_session_login_ratelimit.py
0 → 100644
View file @
1469cee9
import
json
import
uuid
import
unittest
from
mock
import
patch
from
datetime
import
datetime
,
timedelta
from
freezegun
import
freeze_time
from
pytz
import
UTC
from
django.test
import
TestCase
from
django.test.client
import
Client
from
django.test.utils
import
override_settings
from
django.utils.translation
import
ugettext
as
_
from
django.conf
import
settings
from
django.core.cache
import
cache
from
student.tests.factories
import
UserFactory
TEST_API_KEY
=
str
(
uuid
.
uuid4
())
@override_settings
(
EDX_API_KEY
=
TEST_API_KEY
)
@patch.dict
(
"django.conf.settings.FEATURES"
,
{
'ENABLE_MAX_FAILED_LOGIN_ATTEMPTS'
:
False
})
class
SessionApiRateLimitingProtectionTest
(
TestCase
):
"""
Test api_manager.session.login.ratelimit
"""
def
setUp
(
self
):
"""
Create one user and save it to the database
"""
self
.
user
=
UserFactory
.
build
(
username
=
'test'
,
email
=
'test@edx.org'
)
self
.
user
.
set_password
(
'test_password'
)
self
.
user
.
save
()
# Create the test client
self
.
client
=
Client
()
cache
.
clear
()
self
.
session_url
=
'/api/sessions'
def
test_login_ratelimiting_protection
(
self
):
""" Try (and fail) login user 30 times on invalid password """
for
i
in
xrange
(
30
):
password
=
u'test_password{0}'
.
format
(
i
)
response
=
self
.
_do_post_request
(
self
.
session_url
,
'test'
,
password
,
secure
=
True
)
self
.
assertEqual
(
response
.
status_code
,
401
)
# then the rate limiter should kick in and give a HttpForbidden response
response
=
self
.
_do_post_request
(
self
.
session_url
,
'test'
,
'test_password'
,
secure
=
True
)
message
=
_
(
'Rate limit exceeded in api login.'
)
self
.
_assert_response
(
response
,
status
=
403
,
message
=
message
)
def
test_login_ratelimiting_unblock
(
self
):
""" Try (and fail) login user 30 times on invalid password """
for
i
in
xrange
(
30
):
password
=
u'test_password{0}'
.
format
(
i
)
response
=
self
.
_do_post_request
(
self
.
session_url
,
'test'
,
password
,
secure
=
True
)
self
.
assertEqual
(
response
.
status_code
,
401
)
# then the rate limiter should kick in and give a HttpForbidden response
response
=
self
.
_do_post_request
(
self
.
session_url
,
'test'
,
'test_password'
,
secure
=
True
)
message
=
_
(
'Rate limit exceeded in api login.'
)
self
.
_assert_response
(
response
,
status
=
403
,
message
=
message
)
# now reset the time to 5 mins from now in future in order to unblock
reset_time
=
datetime
.
now
(
UTC
)
+
timedelta
(
seconds
=
300
)
with
freeze_time
(
reset_time
):
response
=
self
.
_do_post_request
(
self
.
session_url
,
'test'
,
'test_password'
,
secure
=
True
)
self
.
_assert_response
(
response
,
status
=
201
)
def
_do_post_request
(
self
,
url
,
username
,
password
,
**
kwargs
):
"""
Post the login info
"""
post_params
,
extra
=
{
'username'
:
username
,
'password'
:
password
},
{}
if
kwargs
.
get
(
'email'
):
post_params
[
'email'
]
=
kwargs
.
get
(
'email'
)
if
kwargs
.
get
(
'first_name'
):
post_params
[
'first_name'
]
=
kwargs
.
get
(
'first_name'
)
if
kwargs
.
get
(
'last_name'
):
post_params
[
'last_name'
]
=
kwargs
.
get
(
'last_name'
)
headers
=
{
'X-Edx-Api-Key'
:
TEST_API_KEY
,
'Content-Type'
:
'application/json'
}
if
kwargs
.
get
(
'secure'
,
False
):
extra
[
'wsgi.url_scheme'
]
=
'https'
return
self
.
client
.
post
(
url
,
post_params
,
headers
=
headers
,
**
extra
)
def
_assert_response
(
self
,
response
,
status
=
200
,
success
=
None
,
message
=
None
):
"""
Assert that the response had status 200 and returned a valid
JSON-parseable dict.
If success is provided, assert that the response had that
value for 'success' in the JSON dict.
If message is provided, assert that the response contained that
value for 'message' in the JSON dict.
"""
self
.
assertEqual
(
response
.
status_code
,
status
)
try
:
response_dict
=
json
.
loads
(
response
.
content
)
except
ValueError
:
self
.
fail
(
"Could not parse response content as JSON:
%
s"
%
str
(
response
.
content
))
if
success
is
not
None
:
self
.
assertEqual
(
response_dict
[
'success'
],
success
)
if
message
is
not
None
:
msg
=
(
"'
%
s' did not contain '
%
s'"
%
(
response_dict
[
'message'
],
message
))
self
.
assertTrue
(
message
in
response_dict
[
'message'
],
msg
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment