Commit 1e4f1b58 by Diana Huang

Allow the enrollment API to be accessed via API keys.

parent 28a3e9f5
......@@ -16,11 +16,13 @@ from util.testing import UrlResetMixin
from enrollment import api
from enrollment.errors import CourseEnrollmentError
from openedx.core.djangoapps.user_api.models import UserOrgTag
from django.test.utils import override_settings
from student.tests.factories import UserFactory, CourseModeFactory
from student.models import CourseEnrollment
from embargo.test_utils import restrict_course
@override_settings(EDX_API_KEY="i am a key")
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class EnrollmentTest(ModuleStoreTestCase, APITestCase):
......@@ -30,12 +32,14 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
EMAIL = ""
PASSWORD = "edx"
API_KEY = "i am a key"
def setUp(self):
""" Create a course and user, then log in. """
super(EnrollmentTest, self).setUp()
self.course = CourseFactory.create()
self.user = UserFactory.create(username=self.USERNAME, email=self.EMAIL, password=self.PASSWORD)
self.other_user = UserFactory.create()
self.client.login(username=self.USERNAME, password=self.PASSWORD)
......@@ -179,7 +183,10 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
self._create_enrollment(username='not_the_user', expected_status=status.HTTP_404_NOT_FOUND)
self._create_enrollment(username=self.other_user.username, expected_status=status.HTTP_404_NOT_FOUND)
# Verify that the server still has access to this endpoint.
self._create_enrollment(username=self.other_user.username, as_server=True)
def test_user_does_not_match_param_for_list(self):
......@@ -187,8 +194,14 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
resp = self.client.get(reverse('courseenrollments'), {"user": "not_the_user"})
resp = self.client.get(reverse('courseenrollments'), {"user": self.other_user.username})
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
# Verify that the server still has access to this endpoint.
resp = self.client.get(
reverse('courseenrollments'), {"user": self.other_user.username}, **{'HTTP_X_EDX_API_KEY': self.API_KEY}
self.assertEqual(resp.status_code, status.HTTP_200_OK)
def test_user_does_not_match_param(self):
......@@ -197,9 +210,16 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
resp = self.client.get(
reverse('courseenrollment', kwargs={"user": "not_the_user", "course_id": unicode(})
reverse('courseenrollment', kwargs={"user": self.other_user.username, "course_id": unicode(})
# Verify that the server still has access to this endpoint.
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
resp = self.client.get(
reverse('courseenrollment', kwargs={"user": self.other_user.username, "course_id": unicode(}),
self.assertEqual(resp.status_code, status.HTTP_200_OK)
def test_get_course_details(self):
......@@ -237,7 +257,26 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
def _create_enrollment(self, course_id=None, username=None, expected_status=status.HTTP_200_OK, email_opt_in=None):
def test_enrollment_already_enrolled(self):
response = self._create_enrollment()
repeat_response = self._create_enrollment()
self.assertEqual(json.loads(response.content), json.loads(repeat_response.content))
def test_get_enrollment_with_invalid_key(self):
resp =
'course_details': {
'course_id': 'invalidcourse'
'user': self.user.username
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("No course ", resp.content)
def _create_enrollment(self, course_id=None, username=None, expected_status=status.HTTP_200_OK, email_opt_in=None, as_server=False):
"""Enroll in the course and verify the URL we are sent to. """
course_id = unicode( if course_id is None else course_id
username = self.user.username if username is None else username
......@@ -250,7 +289,11 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
if email_opt_in is not None:
params['email_opt_in'] = email_opt_in
resp ='courseenrollments'), params, format='json')
if as_server:
resp ='courseenrollments'), params, format='json', **{'HTTP_X_EDX_API_KEY': self.API_KEY})
resp ='courseenrollments'), params, format='json')
self.assertEqual(resp.status_code, expected_status)
if expected_status == status.HTTP_200_OK:
......@@ -260,25 +303,6 @@ class EnrollmentTest(ModuleStoreTestCase, APITestCase):
return resp
def test_enrollment_already_enrolled(self):
response = self._create_enrollment()
repeat_response = self._create_enrollment()
self.assertEqual(json.loads(response.content), json.loads(repeat_response.content))
def test_get_enrollment_with_invalid_key(self):
resp =
'course_details': {
'course_id': 'invalidcourse'
'user': self.user.username
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("No course ", resp.content)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class EnrollmentEmbargoTest(UrlResetMixin, ModuleStoreTestCase):
......@@ -8,6 +8,7 @@ from django.conf import settings
from opaque_keys import InvalidKeyError
from opaque_keys.edx.locator import CourseLocator
from openedx.core.djangoapps.user_api import api as user_api
from openedx.core.lib.api.permissions import ApiKeyHeaderPermission, ApiKeyHeaderPermissionIsAuthenticated
from rest_framework import status
from rest_framework import permissions
from rest_framework.response import Response
......@@ -30,8 +31,27 @@ class EnrollmentUserThrottle(UserRateThrottle):
rate = '50/second'
class ApiKeyPermissionMixIn(object):
This mixin is used to provide a convenience function for doing individual permission checks
for the presence of API keys.
def has_api_key_permissions(self, request):
Checks to see if the request was made by a server with an API key.
request (Request): the request being made into the view
True if the request has been made with a valid API key
False otherwise
return ApiKeyHeaderPermission().has_permission(request, self)
class EnrollmentView(APIView):
class EnrollmentView(APIView, ApiKeyPermissionMixIn):
**Use Cases**
......@@ -73,7 +93,7 @@ class EnrollmentView(APIView):
authentication_classes = OAuth2AuthenticationAllowInactiveUser, SessionAuthenticationAllowInactiveUser
permission_classes = permissions.IsAuthenticated,
permission_classes = ApiKeyHeaderPermissionIsAuthenticated,
throttle_classes = EnrollmentUserThrottle,
def get(self, request, course_id=None, user=None):
......@@ -94,7 +114,7 @@ class EnrollmentView(APIView):
user = user if user else request.user.username
if request.user.username != user:
if request.user.username != user and not self.has_api_key_permissions(request):
# Return a 404 instead of a 403 (Unauthorized). If one user is looking up
# other users, do not let them deduce the existence of an enrollment.
return Response(status=status.HTTP_404_NOT_FOUND)
......@@ -182,7 +202,7 @@ class EnrollmentCourseDetailView(APIView):
class EnrollmentListView(APIView):
class EnrollmentListView(APIView, ApiKeyPermissionMixIn):
**Use Cases**
......@@ -245,7 +265,7 @@ class EnrollmentListView(APIView):
authentication_classes = OAuth2AuthenticationAllowInactiveUser, SessionAuthenticationAllowInactiveUser
permission_classes = permissions.IsAuthenticated,
permission_classes = ApiKeyHeaderPermissionIsAuthenticated,
throttle_classes = EnrollmentUserThrottle,
def get(self, request):
......@@ -253,7 +273,7 @@ class EnrollmentListView(APIView):
Gets a list of all course enrollments for the currently logged in user.
user = request.GET.get('user', request.user.username)
if request.user.username != user:
if request.user.username != user and not self.has_api_key_permissions(request):
# Return a 404 instead of a 403 (Unauthorized). If one user is looking up
# other users, do not let them deduce the existence of an enrollment.
return Response(status=status.HTTP_404_NOT_FOUND)
......@@ -276,7 +296,7 @@ class EnrollmentListView(APIView):
user = request.DATA.get('user', request.user.username)
if not user:
user = request.user.username
if user != request.user.username:
if user != request.user.username and not self.has_api_key_permissions(request):
# Return a 404 instead of a 403 (Unauthorized). If one user is looking up
# other users, do not let them deduce the existence of an enrollment.
return Response(status=status.HTTP_404_NOT_FOUND)
......@@ -21,6 +21,19 @@ class ApiKeyHeaderPermission(permissions.BasePermission):
class ApiKeyHeaderPermissionIsAuthenticated(ApiKeyHeaderPermission, permissions.IsAuthenticated):
Allow someone to access the view if they have the API key OR they are authenticated.
See ApiKeyHeaderPermission for more information how the API key portion is implemented.
def has_permission(self, request, view):
#TODO We can optimize this later on when we know which of these methods is used more often.
api_permissions = ApiKeyHeaderPermission.has_permission(self, request, view)
is_authenticated_permissions = permissions.IsAuthenticated.has_permission(self, request, view)
return api_permissions or is_authenticated_permissions
class IsAuthenticatedOrDebug(permissions.BasePermission):
Allows access only to authenticated users, or anyone if debug mode is enabled.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment