Commit c474ce64 by Clinton Blackburn

Merge pull request #10634 from edx/clintonb/credit-api-drf

Rewrote Credit API
parents 996da3d7 92153752
......@@ -393,9 +393,6 @@ FEATURES = {
# Enable OpenBadge support. See the BADGR_* settings later in this file.
'ENABLE_OPENBADGES': False,
# Credit course API
'ENABLE_CREDIT_API': True,
# The block types to disable need to be specified in "x block disable config" in django admin.
'ENABLE_DISABLING_XBLOCK_TYPES': True,
......
......@@ -68,8 +68,6 @@ FEATURES['ENABLE_SHOPPING_CART'] = True
FEATURES['ENABLE_VERIFIED_CERTIFICATES'] = True
FEATURES['ENABLE_CREDIT_API'] = True
# Enable this feature for course staff grade downloads, to enable acceptance tests
FEATURES['ENABLE_S3_GRADE_DOWNLOADS'] = True
FEATURES['ALLOW_COURSE_STAFF_GRADE_DOWNLOADS'] = True
......
......@@ -16,6 +16,8 @@ var edx = edx || {};
headers: {
'X-CSRFToken': $.cookie('csrftoken')
},
dataType: 'json',
contentType: 'application/json',
data: JSON.stringify({
'course_key': courseKey,
'username': username
......
......@@ -140,7 +140,11 @@ var edx = edx || {};
return $.ajax({
url: _.sprintf(providerUrl, providerId),
type: 'GET',
dataType: 'json'
dataType: 'json',
contentType: 'application/json',
headers: {
'X-CSRFToken': $.cookie('csrftoken')
}
}).retry({times: 5, timeout: 2000, statusCodes: [404]});
},
/**
......
......@@ -7,8 +7,8 @@
</div>
<div class="provider-more-info">
<%= interpolate(
gettext("To finalize course credit, %(provider_id)s requires %(platform_name)s learners to submit a credit request."),
{ provider_id: provider_id.toUpperCase(), platform_name: platformName }, true
gettext("To finalize course credit, %(display_name)s requires %(platform_name)s learners to submit a credit request."),
{ display_name: display_name, platform_name: platformName }, true
) %>
</div>
<div class="provider-instructions">
......@@ -21,7 +21,7 @@
<%= interpolate("<img src='%s' alt='%s'></image>", [thumbnail_url, display_name]) %>
</div>
<div class="complete-order">
<%= interpolate('<button data-provider="%s" data-course-key="%s" data-username="%s" class="complete-course" onClick=completeOrder(this)>%s</button>', [provider_id, course_key, username,
<%= interpolate('<button data-provider="%s" data-course-key="%s" data-username="%s" class="complete-course" onClick=completeOrder(this)>%s</button>', [id, course_key, username,
gettext( "Get Credit")]) %>
</div>
</div>
......@@ -98,6 +98,7 @@ urlpatterns = (
url(r'^api/val/v0/', include('edxval.urls')),
url(r'^api/commerce/', include('commerce.api.urls', namespace='commerce_api')),
url(r'^api/credit/', include('openedx.core.djangoapps.credit.urls', app_name="credit", namespace='credit')),
)
if settings.FEATURES["ENABLE_COMBINED_LOGIN_REGISTRATION"]:
......@@ -115,12 +116,6 @@ else:
url(r'^register$', 'student.views.register_user', name="register_user"),
)
if settings.FEATURES.get("ENABLE_CREDIT_API"):
# Credit API end-points
urlpatterns += (
url(r'^api/credit/', include('openedx.core.djangoapps.credit.urls', app_name="credit", namespace='credit')),
)
if settings.FEATURES["ENABLE_MOBILE_REST_API"]:
urlpatterns += (
url(r'^api/mobile/v0.5/', include('mobile_api.urls')),
......
......@@ -5,6 +5,8 @@ whether a user has satisfied those requirements.
import logging
from opaque_keys.edx.keys import CourseKey
from openedx.core.djangoapps.credit.exceptions import InvalidCreditRequirements, InvalidCreditCourse
from openedx.core.djangoapps.credit.email_utils import send_credit_notifications
from openedx.core.djangoapps.credit.models import (
......@@ -14,8 +16,7 @@ from openedx.core.djangoapps.credit.models import (
CreditEligibility,
)
from opaque_keys.edx.keys import CourseKey
# TODO: Cleanup this mess! ECOM-2908
log = logging.getLogger(__name__)
......@@ -246,8 +247,7 @@ def set_credit_requirement_status(username, course_key, req_namespace, req_name,
# Find the requirement we're trying to set
req_to_update = next((
req for req in reqs
if req.namespace == req_namespace
and req.name == req_name
if req.namespace == req_namespace and req.name == req_name
), None)
# If we can't find the requirement, then the most likely explanation
......
......@@ -4,12 +4,12 @@ API for initiating and tracking requests for credit from a provider.
import datetime
import logging
import pytz
import uuid
import pytz
from django.db import transaction
from lms.djangoapps.django_comment_client.utils import JsonResponse
from lms.djangoapps.django_comment_client.utils import JsonResponse
from openedx.core.djangoapps.credit.exceptions import (
UserIsNotEligible,
CreditProviderNotConfigured,
......@@ -28,6 +28,8 @@ from student.models import User
from util.date_utils import to_timestamp
# TODO: Cleanup this mess! ECOM-2908
log = logging.getLogger(__name__)
......@@ -257,12 +259,10 @@ def create_credit_request(course_key, provider_id, username):
final_grade = unicode(final_grade)
except (CreditRequirementStatus.DoesNotExist, TypeError, KeyError):
log.exception(
"Could not retrieve final grade from the credit eligibility table "
"for user %s in course %s.",
user.id, course_key
)
raise UserIsNotEligible
msg = 'Could not retrieve final grade from the credit eligibility table for ' \
'user [{user_id}] in course [{course_key}].'.format(user_id=user.id, course_key=course_key)
log.exception(msg)
raise UserIsNotEligible(msg)
parameters = {
"request_uuid": credit_request.uuid,
......
"""Exceptions raised by the credit API. """
from __future__ import unicode_literals
from django.utils.translation import ugettext_lazy as _
from rest_framework import status
from rest_framework.exceptions import APIException
# TODO: Cleanup this mess! ECOM-2908
class CreditApiBadRequest(Exception):
......@@ -56,3 +62,25 @@ class InvalidCreditStatus(CreditApiBadRequest):
The status is not either "approved" or "rejected".
"""
pass
class InvalidCreditRequest(APIException):
""" API request is invalid. """
status_code = status.HTTP_400_BAD_REQUEST
class UserNotEligibleException(InvalidCreditRequest):
""" User not eligible for credit for a given course. """
def __init__(self, course_key, username):
detail = _('[{username}] is not eligible for credit for [{course_key}].').format(username=username,
course_key=course_key)
super(UserNotEligibleException, self).__init__(detail)
class InvalidCourseKey(InvalidCreditRequest):
""" Course key is invalid. """
def __init__(self, course_key):
detail = _('[{course_key}] is not a valid course key.').format(course_key=course_key)
super(InvalidCourseKey, self).__init__(detail)
......@@ -25,6 +25,7 @@ from xmodule_django.models import CourseKeyField
from django.utils.translation import ugettext_lazy
CREDIT_PROVIDER_ID_REGEX = r"[a-z,A-Z,0-9,\-]+"
log = logging.getLogger(__name__)
......@@ -42,7 +43,7 @@ class CreditProvider(TimeStampedModel):
unique=True,
validators=[
RegexValidator(
regex=r"^[a-z,A-Z,0-9,\-]+$",
regex=CREDIT_PROVIDER_ID_REGEX,
message="Only alphanumeric characters and hyphens (-) are allowed",
code="invalid_provider_id",
)
......@@ -498,10 +499,7 @@ def default_deadline_for_credit_eligibility(): # pylint: disable=invalid-name
class CreditEligibility(TimeStampedModel):
"""
A record of a user's eligibility for credit from a specific credit
provider for a specific course.
"""
""" A record of a user's eligibility for credit for a specific course. """
username = models.CharField(max_length=255, db_index=True)
course = models.ForeignKey(CreditCourse, related_name="eligibilities")
......
""" Credit API Serializers """
from rest_framework import serializers
from __future__ import unicode_literals
import datetime
import logging
from opaque_keys.edx.keys import CourseKey
from django.conf import settings
from opaque_keys import InvalidKeyError
from openedx.core.djangoapps.credit.models import CreditCourse
from opaque_keys.edx.keys import CourseKey
import pytz
from rest_framework import serializers
from rest_framework.exceptions import PermissionDenied
from openedx.core.djangoapps.credit.models import CreditCourse, CreditProvider, CreditEligibility, CreditRequest
from openedx.core.djangoapps.credit.signature import get_shared_secret_key, signature
from util.date_utils import from_timestamp
log = logging.getLogger(__name__)
class CourseKeyField(serializers.Field):
"""
Serializer field for a model CourseKey field.
"""
""" Serializer field for a model CourseKey field. """
def to_representation(self, data):
"""Convert a course key to unicode. """
......@@ -32,3 +41,81 @@ class CreditCourseSerializer(serializers.ModelSerializer):
class Meta(object):
model = CreditCourse
exclude = ('id',)
class CreditProviderSerializer(serializers.ModelSerializer):
""" CreditProvider """
id = serializers.CharField(source='provider_id') # pylint:disable=invalid-name
description = serializers.CharField(source='provider_description')
status_url = serializers.URLField(source='provider_status_url')
url = serializers.URLField(source='provider_url')
class Meta(object):
model = CreditProvider
fields = ('id', 'display_name', 'url', 'status_url', 'description', 'enable_integration',
'fulfillment_instructions', 'thumbnail_url',)
class CreditEligibilitySerializer(serializers.ModelSerializer):
""" CreditEligibility serializer. """
course_key = serializers.SerializerMethodField()
def get_course_key(self, obj):
""" Returns the course key associated with the course. """
return unicode(obj.course.course_key)
class Meta(object):
model = CreditEligibility
fields = ('username', 'course_key', 'deadline',)
class CreditProviderCallbackSerializer(serializers.Serializer): # pylint:disable=abstract-method
"""
Serializer for input to the CreditProviderCallback view.
This is used solely for validating the input.
"""
request_uuid = serializers.CharField(required=True)
status = serializers.ChoiceField(required=True, choices=CreditRequest.REQUEST_STATUS_CHOICES)
timestamp = serializers.IntegerField(required=True)
signature = serializers.CharField(required=True)
def __init__(self, **kwargs):
self.provider = kwargs.pop('provider', None)
super(CreditProviderCallbackSerializer, self).__init__(**kwargs)
def validate_timestamp(self, value):
""" Ensure the request has been received in a timely manner. """
date_time = from_timestamp(value)
# Ensure we converted the timestamp to a datetime
if not date_time:
msg = '[{}] is not a valid timestamp'.format(value)
log.warning(msg)
raise serializers.ValidationError(msg)
elapsed = (datetime.datetime.now(pytz.UTC) - date_time).total_seconds()
if elapsed > settings.CREDIT_PROVIDER_TIMESTAMP_EXPIRATION:
msg = '[{value}] is too far in the past (over [{elapsed}] seconds).'.format(value=value, elapsed=elapsed)
log.warning(msg)
raise serializers.ValidationError(msg)
return value
def validate_signature(self, value):
""" Validate the signature and ensure the provider is setup properly. """
provider_id = self.provider.provider_id
secret_key = get_shared_secret_key(provider_id)
if secret_key is None:
msg = 'Could not retrieve secret key for credit provider [{}]. ' \
'Unable to validate requests from provider.'.format(provider_id)
log.error(msg)
raise PermissionDenied(msg)
data = self.initial_data
actual_signature = data["signature"]
if signature(data, secret_key) != actual_signature:
msg = 'Request from credit provider [{}] had an invalid signature.'.format(provider_id)
raise PermissionDenied(msg)
return value
......@@ -59,5 +59,5 @@ def signature(params, shared_secret):
for key in sorted(params.keys())
if key != u"signature"
])
hasher = hmac.new(shared_secret, encoded_params.encode('utf-8'), hashlib.sha256)
hasher = hmac.new(shared_secret.encode('utf-8'), encoded_params.encode('utf-8'), hashlib.sha256)
return hasher.hexdigest()
......@@ -2,13 +2,9 @@
This file contains celery tasks for credit course views.
"""
import datetime
from pytz import UTC
from django.conf import settings
from celery import task
from celery.utils.log import get_task_logger
from django.conf import settings
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey, UsageKey
......
# pylint:disable=missing-docstring,no-member
import datetime
import json
import uuid # pylint:disable=unused-import
from django.contrib.auth.models import User
import factory
from factory.fuzzy import FuzzyText
import pytz
from openedx.core.djangoapps.credit.models import CreditProvider, CreditEligibility, CreditCourse, CreditRequest
from util.date_utils import to_timestamp
class CreditCourseFactory(factory.DjangoModelFactory):
class Meta(object):
model = CreditCourse
course_key = FuzzyText(prefix='fake.org/', suffix='/fake.run')
enabled = True
class CreditProviderFactory(factory.DjangoModelFactory):
class Meta(object):
model = CreditProvider
provider_id = FuzzyText(length=5)
provider_url = FuzzyText(prefix='http://')
class CreditEligibilityFactory(factory.DjangoModelFactory):
class Meta(object):
model = CreditEligibility
course = factory.SubFactory(CreditCourseFactory)
class CreditRequestFactory(factory.DjangoModelFactory):
class Meta(object):
model = CreditRequest
uuid = factory.LazyAttribute(lambda o: uuid.uuid4().hex) # pylint: disable=undefined-variable
# pylint: disable=access-member-before-definition,attribute-defined-outside-init,no-self-argument,unused-argument
@factory.post_generation
def post(obj, create, extracted, **kwargs):
"""
Post-generation handler.
Sets up parameters field.
"""
if not obj.parameters:
course_key = obj.course.course_key
user = User.objects.get(username=obj.username)
user_profile = user.profile
# pylint:disable=access-member-before-definition
obj.parameters = json.dumps({
"request_uuid": obj.uuid,
"timestamp": to_timestamp(datetime.datetime.now(pytz.UTC)),
"course_org": course_key.org,
"course_num": course_key.course,
"course_run": course_key.run,
"final_grade": '0.96',
"user_username": user.username,
"user_email": user.email,
"user_full_name": user_profile.name,
"user_mailing_address": "",
"user_country": user_profile.country.code or "",
})
obj.save()
......@@ -2,17 +2,13 @@
Tests for the API functions in the credit app.
"""
import datetime
import json
import unittest
import ddt
from django.conf import settings
from django.core import mail
from django.test import TestCase
from django.test.utils import override_settings
from django.db import connection, transaction
from django.core.urlresolvers import reverse, NoReverseMatch
from mock import patch
from opaque_keys.edx.keys import CourseKey
import pytz
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
......@@ -39,8 +35,6 @@ from student.tests.factories import UserFactory
TEST_CREDIT_PROVIDER_SECRET_KEY = "931433d583c84ca7ba41784bad3232e6"
from util.testing import UrlResetMixin
@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
"hogwarts": TEST_CREDIT_PROVIDER_SECRET_KEY,
......@@ -880,119 +874,3 @@ class CreditProviderIntegrationApiTests(CreditApiTestBase):
"""Check the user's credit status. """
statuses = api.get_credit_requests_for_user(self.USER_INFO["username"])
self.assertEqual(statuses[0]["status"], expected_status)
class CreditApiFeatureFlagTest(UrlResetMixin, TestCase):
"""
Base class to test the credit api urls.
"""
def setUp(self, **kwargs):
enable_credit_api = kwargs.get('enable_credit_api', False)
with patch.dict('django.conf.settings.FEATURES', {'ENABLE_CREDIT_API': enable_credit_api}):
super(CreditApiFeatureFlagTest, self).setUp('lms.urls')
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class CreditApiFeatureFlagDisabledTests(CreditApiFeatureFlagTest):
"""
Test Python API for credit provider api with feature flag
'ENABLE_CREDIT_API' disabled.
"""
PROVIDER_ID = "hogwarts"
def setUp(self):
super(CreditApiFeatureFlagDisabledTests, self).setUp(enable_credit_api=False)
def test_get_credit_provider_details(self):
"""
Test that 'get_provider_info' api url not found.
"""
with self.assertRaises(NoReverseMatch):
reverse('credit:get_provider_info', args=[self.PROVIDER_ID])
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class CreditApiFeatureFlagEnabledTests(CreditApiFeatureFlagTest, CreditApiTestBase):
"""
Test Python API for credit provider api with feature flag
'ENABLE_CREDIT_API' enabled.
"""
USER_INFO = {
"username": "bob",
"email": "bob@example.com",
"full_name": "Bob",
"mailing_address": "123 Fake Street, Cambridge MA",
"country": "US",
}
FINAL_GRADE = 0.95
def setUp(self):
super(CreditApiFeatureFlagEnabledTests, self).setUp(enable_credit_api=True)
self.user = UserFactory(
username=self.USER_INFO['username'],
email=self.USER_INFO['email'],
)
self.user.profile.name = self.USER_INFO['full_name']
self.user.profile.mailing_address = self.USER_INFO['mailing_address']
self.user.profile.country = self.USER_INFO['country']
self.user.profile.save()
# By default, configure the database so that there is a single
# credit requirement that the user has satisfied (minimum grade)
self._configure_credit()
def test_get_credit_provider_details(self):
"""Test that credit api method 'test_get_credit_provider_details'
returns dictionary data related to provided credit provider.
"""
expected_result = {
"provider_id": self.PROVIDER_ID,
"display_name": self.PROVIDER_NAME,
"provider_url": self.PROVIDER_URL,
"provider_status_url": self.PROVIDER_STATUS_URL,
"provider_description": self.PROVIDER_DESCRIPTION,
"enable_integration": self.ENABLE_INTEGRATION,
"fulfillment_instructions": self.FULFILLMENT_INSTRUCTIONS,
"thumbnail_url": self.THUMBNAIL_URL
}
path = reverse('credit:get_provider_info', kwargs={'provider_id': self.PROVIDER_ID})
result = self.client.get(path)
result = json.loads(result.content)
self.assertEqual(result, expected_result)
# now test that user gets empty dict for non existent credit provider
path = reverse('credit:get_provider_info', kwargs={'provider_id': 'fake_provider_id'})
result = self.client.get(path)
result = json.loads(result.content)
self.assertEqual(result, {})
def _configure_credit(self):
"""
Configure a credit course and its requirements.
By default, add a single requirement (minimum grade)
that the user has satisfied.
"""
credit_course = self.add_credit_course()
requirement = CreditRequirement.objects.create(
course=credit_course,
namespace="grade",
name="grade",
active=True
)
status = CreditRequirementStatus.objects.create(
username=self.USER_INFO["username"],
requirement=requirement,
)
status.status = "satisfied"
status.reason = {"final_grade": self.FINAL_GRADE}
status.save()
CreditEligibility.objects.create(
username=self.USER_INFO['username'],
course=CreditCourse.objects.get(course_key=self.course_key)
)
......@@ -5,7 +5,6 @@ Tests for credit course models.
import ddt
from django.test import TestCase
from opaque_keys.edx.keys import CourseKey
from openedx.core.djangoapps.credit.models import CreditCourse, CreditRequirement
......@@ -17,7 +16,7 @@ class CreditEligibilityModelTests(TestCase):
Tests for credit models used to track credit eligibility.
"""
def setUp(self, **kwargs):
def setUp(self):
super(CreditEligibilityModelTests, self).setUp()
self.course_key = CourseKey.from_string("edX/DemoX/Demo_Course")
......
""" Tests for Credit API serializers. """
# pylint: disable=no-member
from __future__ import unicode_literals
from django.test import TestCase
from openedx.core.djangoapps.credit import serializers
from openedx.core.djangoapps.credit.tests.factories import CreditProviderFactory, CreditEligibilityFactory
from student.tests.factories import UserFactory
class CreditProviderSerializerTests(TestCase):
""" CreditProviderSerializer tests. """
def test_data(self):
""" Verify the correct fields are serialized. """
provider = CreditProviderFactory(active=False)
serializer = serializers.CreditProviderSerializer(provider)
expected = {
'id': provider.provider_id,
'display_name': provider.display_name,
'url': provider.provider_url,
'status_url': provider.provider_status_url,
'description': provider.provider_description,
'enable_integration': provider.enable_integration,
'fulfillment_instructions': provider.fulfillment_instructions,
'thumbnail_url': provider.thumbnail_url,
}
self.assertDictEqual(serializer.data, expected)
class CreditEligibilitySerializerTests(TestCase):
""" CreditEligibilitySerializer tests. """
def test_data(self):
""" Verify the correct fields are serialized. """
user = UserFactory()
eligibility = CreditEligibilityFactory(username=user.username)
serializer = serializers.CreditEligibilitySerializer(eligibility)
expected = {
'course_key': unicode(eligibility.course.course_key),
'deadline': eligibility.deadline.strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
'username': user.username,
}
self.assertDictEqual(serializer.data, expected)
"""
Tests for credit app views.
"""
# pylint: disable=no-member
from __future__ import unicode_literals
import datetime
import json
import unittest
......@@ -10,397 +14,98 @@ from django.conf import settings
from django.core.urlresolvers import reverse
from django.test import TestCase, Client
from django.test.utils import override_settings
from mock import patch
from oauth2_provider.tests.factories import AccessTokenFactory, ClientFactory
from opaque_keys.edx.keys import CourseKey
import pytz
from student.tests.factories import UserFactory
from util.date_utils import to_timestamp
from util.testing import UrlResetMixin
from openedx.core.djangoapps.credit import api
from openedx.core.djangoapps.credit.signature import signature
from openedx.core.djangoapps.credit.serializers import CreditProviderSerializer, CreditEligibilitySerializer
from openedx.core.djangoapps.credit.tests.factories import (
CreditProviderFactory,
CreditEligibilityFactory,
CreditCourseFactory, CreditRequestFactory)
from student.tests.factories import UserFactory, AdminFactory
from openedx.core.djangoapps.credit.models import (
CreditCourse,
CreditProvider,
CreditRequirement,
CreditRequirementStatus,
CreditEligibility,
CreditRequest,
)
CreditProvider, CreditRequest, CreditRequirement, CreditRequirementStatus)
from util.date_utils import to_timestamp
JSON = 'application/json'
TEST_CREDIT_PROVIDER_SECRET_KEY = "931433d583c84ca7ba41784bad3232e6"
@ddt.ddt
@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
"hogwarts": TEST_CREDIT_PROVIDER_SECRET_KEY
})
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class CreditProviderViewTests(UrlResetMixin, TestCase):
"""
Tests for HTTP end-points used to issue requests to credit providers
and receive responses approving or denying requests.
"""
USERNAME = "ron"
USER_FULL_NAME = "Ron Weasley"
PASSWORD = "password"
PROVIDER_ID = "hogwarts"
PROVIDER_URL = "https://credit.example.com/request"
COURSE_KEY = CourseKey.from_string("edX/DemoX/Demo_Course")
FINAL_GRADE = 0.95
@patch.dict(settings.FEATURES, {"ENABLE_CREDIT_API": True})
def setUp(self):
"""
Configure a credit course.
"""
super(CreditProviderViewTests, self).setUp()
# Create the test user and log in
self.user = UserFactory(username=self.USERNAME, password=self.PASSWORD)
self.user.profile.name = self.USER_FULL_NAME
self.user.profile.save()
class ApiTestCaseMixin(object):
""" Mixin to aid with API testing. """
success = self.client.login(username=self.USERNAME, password=self.PASSWORD)
self.assertTrue(success, msg="Could not log in")
def assert_error_response(self, response, msg, status_code=400):
""" Validate the response's status and detail message. """
self.assertEqual(response.status_code, status_code)
self.assertDictEqual(response.data, {'detail': msg})
# Enable the course for credit
credit_course = CreditCourse.objects.create(
course_key=self.COURSE_KEY,
enabled=True,
)
# Configure a credit provider for the course
CreditProvider.objects.create(
provider_id=self.PROVIDER_ID,
enable_integration=True,
provider_url=self.PROVIDER_URL,
)
# Add a single credit requirement (final grade)
requirement = CreditRequirement.objects.create(
course=credit_course,
namespace="grade",
name="grade",
)
# Mark the user as having satisfied the requirement
# and eligible for credit.
CreditRequirementStatus.objects.create(
username=self.USERNAME,
requirement=requirement,
status="satisfied",
reason={"final_grade": self.FINAL_GRADE}
)
CreditEligibility.objects.create(
username=self.USERNAME,
course=credit_course,
)
def test_credit_request_and_response(self):
# Initiate a request
response = self._create_credit_request(self.USERNAME, self.COURSE_KEY)
self.assertEqual(response.status_code, 200)
class UserMixin(object):
""" Test mixin that creates, and authenticates, a new user for every test. """
password = 'password'
list_path = None
# Check that the user's request status is pending
requests = api.get_credit_requests_for_user(self.USERNAME)
self.assertEqual(len(requests), 1)
self.assertEqual(requests[0]["status"], "pending")
def setUp(self):
super(UserMixin, self).setUp()
# Check request parameters
content = json.loads(response.content)
self.assertEqual(content["url"], self.PROVIDER_URL)
self.assertEqual(content["method"], "POST")
self.assertEqual(len(content["parameters"]["request_uuid"]), 32)
self.assertEqual(content["parameters"]["course_org"], "edX")
self.assertEqual(content["parameters"]["course_num"], "DemoX")
self.assertEqual(content["parameters"]["course_run"], "Demo_Course")
self.assertEqual(content["parameters"]["final_grade"], unicode(self.FINAL_GRADE))
self.assertEqual(content["parameters"]["user_username"], self.USERNAME)
self.assertEqual(content["parameters"]["user_full_name"], self.USER_FULL_NAME)
self.assertEqual(content["parameters"]["user_mailing_address"], "")
self.assertEqual(content["parameters"]["user_country"], "")
# This value must be set here, as setting it outside of a method results in issues with CMS/Studio tests.
if self.list_path:
self.path = reverse(self.list_path)
# The signature is going to change each test run because the request
# is assigned a different UUID each time.
# For this reason, we use the signature function directly
# (the "signature" parameter will be ignored when calculating the signature).
# Other unit tests verify that the signature function is working correctly.
self.assertEqual(
content["parameters"]["signature"],
signature(content["parameters"], TEST_CREDIT_PROVIDER_SECRET_KEY)
)
# Create a user and login, so that we can use session auth for the
# tests that aren't specifically testing authentication or authorization.
self.user = UserFactory(password=self.password, is_staff=True)
self.client.login(username=self.user.username, password=self.password)
# Simulate a response from the credit provider
response = self._credit_provider_callback(
content["parameters"]["request_uuid"],
"approved"
)
self.assertEqual(response.status_code, 200)
# Check that the user's status is approved
requests = api.get_credit_requests_for_user(self.USERNAME)
self.assertEqual(len(requests), 1)
self.assertEqual(requests[0]["status"], "approved")
class AuthMixin(object):
""" Test mixin with methods to test OAuth 2.0 and session authentication. """
def test_request_credit_anonymous_user(self):
def test_authentication_required(self):
""" Verify the endpoint requires authentication. """
self.client.logout()
response = self._create_credit_request(self.USERNAME, self.COURSE_KEY)
self.assertEqual(response.status_code, 403)
def test_request_credit_for_another_user(self):
response = self._create_credit_request("another_user", self.COURSE_KEY)
self.assertEqual(response.status_code, 403)
@ddt.data(
# Invalid JSON
"{",
# Missing required parameters
json.dumps({"username": USERNAME}),
json.dumps({"course_key": unicode(COURSE_KEY)}),
# Invalid course key format
json.dumps({"username": USERNAME, "course_key": "invalid"}),
)
def test_create_credit_request_invalid_parameters(self, request_data):
url = reverse("credit:create_request", args=[self.PROVIDER_ID])
response = self.client.post(url, data=request_data, content_type=JSON)
self.assertEqual(response.status_code, 400)
def test_credit_provider_callback_validates_signature(self):
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
# Simulate a callback from the credit provider with an invalid signature
# Since the signature is invalid, we respond with a 403 Not Authorized.
response = self._credit_provider_callback(request_uuid, "approved", sig="invalid")
self.assertEqual(response.status_code, 403)
def test_credit_provider_callback_validates_timestamp(self):
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
# Simulate a callback from the credit provider with a timestamp too far in the past
# (slightly more than 15 minutes)
# Since the message isn't timely, respond with a 403.
timestamp = to_timestamp(datetime.datetime.now(pytz.UTC) - datetime.timedelta(0, 60 * 15 + 1))
response = self._credit_provider_callback(request_uuid, "approved", timestamp=timestamp)
self.assertEqual(response.status_code, 403)
def test_credit_provider_callback_handles_string_timestamp(self):
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
response = self.client.get(self.path)
self.assertEqual(response.status_code, 401)
# Simulate a callback from the credit provider with a timestamp
# encoded as a string instead of an integer.
timestamp = str(to_timestamp(datetime.datetime.now(pytz.UTC)))
response = self._credit_provider_callback(request_uuid, "approved", timestamp=timestamp)
def test_oauth(self):
""" Verify the endpoint supports authentication via OAuth 2.0. """
access_token = AccessTokenFactory(user=self.user, client=ClientFactory()).token
headers = {
'HTTP_AUTHORIZATION': 'Bearer ' + access_token
}
self.client.logout()
response = self.client.get(self.path, **headers)
self.assertEqual(response.status_code, 200)
def test_credit_provider_callback_is_idempotent(self):
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
# Initially, the status should be "pending"
self._assert_request_status(request_uuid, "pending")
# First call sets the status to approved
self._credit_provider_callback(request_uuid, "approved")
self._assert_request_status(request_uuid, "approved")
# Second call succeeds as well; status is still approved
self._credit_provider_callback(request_uuid, "approved")
self._assert_request_status(request_uuid, "approved")
@ddt.data(
# Invalid JSON
"{",
# Not a dictionary
"4",
# Invalid timestamp format
json.dumps({
"request_uuid": "557168d0f7664fe59097106c67c3f847",
"status": "approved",
"timestamp": "invalid",
"signature": "7685ae1c8f763597ee7ce526685c5ac24353317dbfe087f0ed32a699daf7dc63",
}),
)
def test_credit_provider_callback_invalid_parameters(self, request_data):
url = reverse("credit:provider_callback", args=[self.PROVIDER_ID])
response = self.client.post(url, data=request_data, content_type=JSON)
self.assertEqual(response.status_code, 400)
def test_credit_provider_invalid_status(self):
response = self._credit_provider_callback("557168d0f7664fe59097106c67c3f847", "invalid")
self.assertEqual(response.status_code, 400)
def test_credit_provider_key_not_configured(self):
# Cannot initiate a request because we can't sign it
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
response = self._create_credit_request(self.USERNAME, self.COURSE_KEY)
self.assertEqual(response.status_code, 400)
# Create the request with the secret key configured
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
# Callback from the provider is not authorized, because
# the shared secret isn't configured.
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
response = self._credit_provider_callback(request_uuid, "approved")
self.assertEqual(response.status_code, 403)
def test_request_associated_with_another_provider(self):
other_provider_id = "other_provider"
other_provider_secret_key = "1d01f067a5a54b0b8059f7095a7c636d"
# Create an additional credit provider
CreditProvider.objects.create(provider_id=other_provider_id, enable_integration=True)
# Initiate a credit request with the first provider
request_uuid = self._create_credit_request_and_get_uuid(self.USERNAME, self.COURSE_KEY)
# Attempt to update the request status for a different provider
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={other_provider_id: other_provider_secret_key}):
response = self._credit_provider_callback(
request_uuid,
"approved",
provider_id=other_provider_id,
secret_key=other_provider_secret_key,
)
# Response should be a 404 to avoid leaking request UUID values to other providers.
self.assertEqual(response.status_code, 404)
# Request status should still be "pending"
self._assert_request_status(request_uuid, "pending")
def _create_credit_request(self, username, course_key):
"""
Initiate a request for credit.
"""
url = reverse("credit:create_request", args=[self.PROVIDER_ID])
return self.client.post(
url,
data=json.dumps({
"username": username,
"course_key": unicode(course_key),
}),
content_type=JSON,
)
def _create_credit_request_and_get_uuid(self, username, course_key):
"""
Initiate a request for credit and return the request UUID.
"""
response = self._create_credit_request(username, course_key)
def test_session_auth(self):
""" Verify the endpoint supports authentication via session. """
self.client.logout()
self.client.login(username=self.user.username, password=self.password)
response = self.client.get(self.path)
self.assertEqual(response.status_code, 200)
return json.loads(response.content)["parameters"]["request_uuid"]
def _credit_provider_callback(self, request_uuid, status, **kwargs):
"""
Simulate a response from the credit provider approving
or rejecting the credit request.
Arguments:
request_uuid (str): The UUID of the credit request.
status (str): The status of the credit request.
Keyword Arguments:
provider_id (str): Identifier for the credit provider.
secret_key (str): Shared secret key for signing messages.
timestamp (datetime): Timestamp of the message.
sig (str): Digital signature to use on messages.
"""
provider_id = kwargs.get("provider_id", self.PROVIDER_ID)
secret_key = kwargs.get("secret_key", TEST_CREDIT_PROVIDER_SECRET_KEY)
timestamp = kwargs.get("timestamp", to_timestamp(datetime.datetime.now(pytz.UTC)))
url = reverse("credit:provider_callback", args=[provider_id])
parameters = {
"request_uuid": request_uuid,
"status": status,
"timestamp": timestamp,
}
parameters["signature"] = kwargs.get("sig", signature(parameters, secret_key))
return self.client.post(url, data=json.dumps(parameters), content_type=JSON)
def _assert_request_status(self, uuid, expected_status):
"""
Check the status of a credit request.
"""
request = CreditRequest.objects.get(uuid=uuid)
self.assertEqual(request.status, expected_status)
def test_get_providers_detail(self):
"""Verify that the method 'get_provider_detail' returns provider with
the given provide in 'provider_ids'.
"""
url = reverse("credit:providers_detail") + "?provider_ids=hogwarts"
response = self.client.get(url)
expected = [
{
'enable_integration': True,
'description': '',
'url': 'https://credit.example.com/request',
'status_url': '',
'thumbnail_url': '',
'fulfillment_instructions': None,
'display_name': '',
'id': 'hogwarts'
}
]
self.assertListEqual(json.loads(response.content), expected)
def test_get_providers_with_multiple_provider_ids(self):
"""Test that the method 'get_provider_detail' returns multiple
providers with given 'provider_ids' or when no provider in
'provider_ids' is given.
"""
# Add another credit provider for the course
CreditProvider.objects.create(
provider_id='dummy_id',
enable_integration=True,
provider_url='https://example.com',
)
# verify that all the matching providers are returned when provider ids
# are given in parameter 'provider_ids'
url = reverse("credit:providers_detail") + "?provider_ids=hogwarts,dummy_id"
response = self.client.get(url)
self.assertEquals(len(json.loads(response.content)), 2)
@ddt.ddt
class ReadOnlyMixin(object):
""" Test mixin for read-only API endpoints. """
# verify that all providers are returned when no provider id in
# parameter 'provider_ids' is provided
url = reverse("credit:providers_detail")
response = self.client.get(url)
self.assertEquals(len(json.loads(response.content)), 2)
@ddt.data('delete', 'post', 'put')
def test_readonly(self, method):
""" Verify the viewset does not allow CreditProvider objects to be created or modified. """
response = getattr(self.client, method)(self.path)
self.assertEqual(response.status_code, 405)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class CreditCourseViewSetTests(TestCase):
class CreditCourseViewSetTests(UserMixin, TestCase):
""" Tests for the CreditCourse endpoints.
GET/POST /api/v1/credit/creditcourse/
GET/PUT /api/v1/credit/creditcourse/:course_id/
"""
password = 'password'
def setUp(self):
super(CreditCourseViewSetTests, self).setUp()
# This value must be set here, as setting it outside of a method results in issues with CMS/Studio tests.
self.path = reverse('credit:creditcourse-list')
# Create a user and login, so that we can use session auth for the
# tests that aren't specifically testing authentication or authorization.
user = UserFactory(password=self.password, is_staff=True)
self.client.login(username=user.username, password=self.password)
list_path = 'credit:creditcourse-list'
def _serialize_credit_course(self, credit_course):
""" Serializes a CreditCourse to a Python dict. """
......@@ -442,7 +147,7 @@ class CreditCourseViewSetTests(TestCase):
self.assertIn('CSRF', response.content)
# Retrieve a CSRF token
response = client.get('/dashboard')
response = client.get('/')
csrf_token = response.cookies[settings.CSRF_COOKIE_NAME].value # pylint: disable=no-member
self.assertGreater(len(csrf_token), 0)
......@@ -552,3 +257,410 @@ class CreditCourseViewSetTests(TestCase):
# Verify the data was persisted
credit_course = CreditCourse.objects.get(course_key=credit_course.course_key)
self.assertTrue(credit_course.enabled)
@ddt.ddt
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class CreditProviderViewSetTests(ApiTestCaseMixin, ReadOnlyMixin, AuthMixin, UserMixin, TestCase):
""" Tests for CreditProviderViewSet. """
list_path = 'credit:creditprovider-list'
@classmethod
def setUpClass(cls):
super(CreditProviderViewSetTests, cls).setUpClass()
cls.bayside = CreditProviderFactory(provider_id='bayside')
cls.hogwarts = CreditProviderFactory(provider_id='hogwarts')
cls.starfleet = CreditProviderFactory(provider_id='starfleet')
def test_list(self):
""" Verify the endpoint returns a list of all CreditProvider objects. """
response = self.client.get(self.path)
self.assertEqual(response.status_code, 200)
expected = CreditProviderSerializer(CreditProvider.objects.all(), many=True).data
self.assertEqual(response.data, expected)
@ddt.data(
('bayside',),
('hogwarts', 'starfleet')
)
def test_list_filtering(self, provider_ids):
""" Verify the endpoint returns a list of all CreditProvider objects, filtered to contain only those objects
associated with the given IDs. """
url = '{}?provider_ids={}'.format(self.path, ','.join(provider_ids))
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
expected = CreditProviderSerializer(CreditProvider.objects.filter(provider_id__in=provider_ids),
many=True).data
self.assertEqual(response.data, expected)
def test_retrieve(self):
""" Verify the endpoint returns the details for a single CreditProvider. """
url = reverse('credit:creditprovider-detail', kwargs={'provider_id': self.bayside.provider_id})
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data, CreditProviderSerializer(self.bayside).data)
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class CreditProviderRequestCreateViewTests(ApiTestCaseMixin, UserMixin, TestCase):
""" Tests for CreditProviderRequestCreateView. """
@classmethod
def setUpClass(cls):
super(CreditProviderRequestCreateViewTests, cls).setUpClass()
cls.provider = CreditProviderFactory()
def setUp(self):
super(CreditProviderRequestCreateViewTests, self).setUp()
self.path = reverse('credit:create_request', kwargs={'provider_id': self.provider.provider_id})
self.eligibility = CreditEligibilityFactory(username=self.user.username)
def post_credit_request(self, username, course_id):
""" Create a credit request for the given user and course. """
data = {
'username': username,
'course_key': unicode(course_id)
}
return self.client.post(self.path, json.dumps(data), content_type=JSON)
def test_post_with_provider_integration(self):
""" Verify the endpoint can create a new credit request. """
username = self.user.username
course = self.eligibility.course
course_key = course.course_key
final_grade = 0.95
# Enable provider integration
self.provider.enable_integration = True
self.provider.save()
# Add a single credit requirement (final grade)
requirement = CreditRequirement.objects.create(
course=course,
namespace='grade',
name='grade',
)
# Mark the user as having satisfied the requirement and eligible for credit.
CreditRequirementStatus.objects.create(
username=username,
requirement=requirement,
status='satisfied',
reason={'final_grade': final_grade}
)
secret_key = 'secret'
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={self.provider.provider_id: secret_key}):
response = self.post_credit_request(username, course_key)
self.assertEqual(response.status_code, 200)
# Check that the user's request status is pending
request = CreditRequest.objects.get(username=username, course__course_key=course_key)
self.assertEqual(request.status, 'pending')
# Check request parameters
content = json.loads(response.content)
parameters = content['parameters']
self.assertEqual(content['url'], self.provider.provider_url)
self.assertEqual(content['method'], 'POST')
self.assertEqual(len(parameters['request_uuid']), 32)
self.assertEqual(parameters['course_org'], course_key.org)
self.assertEqual(parameters['course_num'], course_key.course)
self.assertEqual(parameters['course_run'], course_key.run)
self.assertEqual(parameters['final_grade'], unicode(final_grade))
self.assertEqual(parameters['user_username'], username)
self.assertEqual(parameters['user_full_name'], self.user.get_full_name())
self.assertEqual(parameters['user_mailing_address'], '')
self.assertEqual(parameters['user_country'], '')
# The signature is going to change each test run because the request
# is assigned a different UUID each time.
# For this reason, we use the signature function directly
# (the "signature" parameter will be ignored when calculating the signature).
# Other unit tests verify that the signature function is working correctly.
self.assertEqual(parameters['signature'], signature(parameters, secret_key))
def test_post_invalid_provider(self):
""" Verify the endpoint returns HTTP 404 if the credit provider is not valid. """
path = reverse('credit:create_request', kwargs={'provider_id': 'fake'})
response = self.client.post(path, {})
self.assertEqual(response.status_code, 404)
def test_post_no_username(self):
""" Verify the endpoint returns HTTP 400 if no username is supplied. """
response = self.post_credit_request(None, 'a/b/c')
self.assert_error_response(response, 'A username must be specified.')
def test_post_invalid_course_key(self):
""" Verify the endpoint returns HTTP 400 if the course is not a valid course key. """
course_key = 'not-a-course-id'
response = self.post_credit_request(self.user.username, course_key)
self.assert_error_response(response, '[{}] is not a valid course key.'.format(course_key))
def test_post_user_not_eligible(self):
""" Verify the endpoint returns HTTP 400 if the user is not eligible for credit for the course. """
credit_course = CreditCourseFactory()
username = 'ineligible-user'
course_key = credit_course.course_key
response = self.post_credit_request(username, course_key)
msg = '[{username}] is not eligible for credit for [{course_key}].'.format(username=username,
course_key=course_key)
self.assert_error_response(response, msg)
def test_post_permissions_staff(self):
""" Verify staff users can create requests for any user. """
admin = AdminFactory(password=self.password)
self.client.logout()
self.client.login(username=admin.username, password=self.password)
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
self.assertEqual(response.status_code, 200)
def test_post_other_user(self):
""" Verify non-staff users cannot create requests for other users. """
user = UserFactory(password=self.password)
self.client.logout()
self.client.login(username=user.username, password=self.password)
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
self.assertEqual(response.status_code, 403)
def test_post_no_provider_integration(self):
""" Verify the endpoint returns the provider URL if provider integration is not enabled. """
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
self.assertEqual(response.status_code, 200)
expected = {
'url': self.provider.provider_url,
'method': 'GET',
'parameters': {},
}
self.assertEqual(response.data, expected)
def test_post_secret_key_not_set(self):
""" Verify the endpoint returns HTTP 400 if we attempt to create a
request for a provider with no secret key set. """
# Enable provider integration
self.provider.enable_integration = True
self.provider.save()
# Cannot initiate a request because we cannot sign it
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
self.assertEqual(response.status_code, 400)
@ddt.ddt
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class CreditProviderCallbackViewTests(UserMixin, TestCase):
""" Tests for CreditProviderCallbackView. """
def setUp(self):
super(CreditProviderCallbackViewTests, self).setUp()
# Authentication should NOT be required for this endpoint.
self.client.logout()
self.provider = CreditProviderFactory()
self.path = reverse('credit:provider_callback', args=[self.provider.provider_id])
self.eligibility = CreditEligibilityFactory(username=self.user.username)
def _credit_provider_callback(self, request_uuid, status, **kwargs):
"""
Simulate a response from the credit provider approving
or rejecting the credit request.
Arguments:
request_uuid (str): The UUID of the credit request.
status (str): The status of the credit request.
Keyword Arguments:
provider_id (str): Identifier for the credit provider.
secret_key (str): Shared secret key for signing messages.
timestamp (datetime): Timestamp of the message.
sig (str): Digital signature to use on messages.
keys (dict): Override for CREDIT_PROVIDER_SECRET_KEYS setting.
"""
provider_id = kwargs.get('provider_id', self.provider.provider_id)
secret_key = kwargs.get('secret_key', '931433d583c84ca7ba41784bad3232e6')
timestamp = kwargs.get('timestamp', to_timestamp(datetime.datetime.now(pytz.UTC)))
keys = kwargs.get('keys', {self.provider.provider_id: secret_key})
url = reverse('credit:provider_callback', args=[provider_id])
parameters = {
'request_uuid': request_uuid,
'status': status,
'timestamp': timestamp,
}
parameters['signature'] = kwargs.get('sig', signature(parameters, secret_key))
with override_settings(CREDIT_PROVIDER_SECRET_KEYS=keys):
return self.client.post(url, data=json.dumps(parameters), content_type=JSON)
def _create_credit_request_and_get_uuid(self, username=None, course_key=None):
""" Initiate a request for credit and return the request UUID. """
username = username or self.user.username
course = CreditCourse.objects.get(course_key=course_key) if course_key else self.eligibility.course
credit_request = CreditRequestFactory(username=username, course=course, provider=self.provider)
return credit_request.uuid
def _assert_request_status(self, uuid, expected_status):
""" Check the status of a credit request. """
request = CreditRequest.objects.get(uuid=uuid)
self.assertEqual(request.status, expected_status)
def test_post_invalid_provider_id(self):
""" Verify the endpoint returns HTTP 404 if the provider does not exist. """
provider_id = 'fakey-provider'
self.assertFalse(CreditProvider.objects.filter(provider_id=provider_id).exists())
path = reverse('credit:provider_callback', args=[provider_id])
response = self.client.post(path, {})
self.assertEqual(response.status_code, 404)
def test_post_with_invalid_signature(self):
""" Verify the endpoint returns HTTP 403 if a request is received with an invalid signature. """
request_uuid = self._create_credit_request_and_get_uuid()
# Simulate a callback from the credit provider with an invalid signature
# Since the signature is invalid, we respond with a 403 Not Authorized.
response = self._credit_provider_callback(request_uuid, "approved", sig="invalid")
self.assertEqual(response.status_code, 403)
@ddt.data(
to_timestamp(datetime.datetime.now(pytz.UTC) - datetime.timedelta(0, 60 * 15 + 1)),
'invalid'
)
def test_post_with_invalid_timestamp(self, timestamp):
""" Verify HTTP 400 is returned for requests with an invalid timestamp. """
request_uuid = self._create_credit_request_and_get_uuid()
response = self._credit_provider_callback(request_uuid, 'approved', timestamp=timestamp)
self.assertEqual(response.status_code, 400)
def test_post_with_string_timestamp(self):
""" Verify the endpoint supports timestamps transmitted as strings instead of integers. """
request_uuid = self._create_credit_request_and_get_uuid()
timestamp = str(to_timestamp(datetime.datetime.now(pytz.UTC)))
response = self._credit_provider_callback(request_uuid, 'approved', timestamp=timestamp)
self.assertEqual(response.status_code, 200)
def test_credit_provider_callback_is_idempotent(self):
""" Verify clients can make subsequent calls with the same status. """
request_uuid = self._create_credit_request_and_get_uuid()
# Initially, the status should be "pending"
self._assert_request_status(request_uuid, "pending")
# First call sets the status to approved
self._credit_provider_callback(request_uuid, 'approved')
self._assert_request_status(request_uuid, "approved")
# Second call succeeds as well; status is still approved
self._credit_provider_callback(request_uuid, 'approved')
self._assert_request_status(request_uuid, "approved")
def test_credit_provider_invalid_status(self):
""" Verify requests with an invalid status value return HTTP 400. """
request_uuid = self._create_credit_request_and_get_uuid()
response = self._credit_provider_callback(request_uuid, 'invalid')
self.assertEqual(response.status_code, 400)
def test_request_associated_with_another_provider(self):
""" Verify the endpoint returns HTTP 404 if a request is received for the incorrect provider. """
other_provider_id = 'other-provider'
other_provider_secret_key = '1d01f067a5a54b0b8059f7095a7c636d'
# Create an additional credit provider
CreditProvider.objects.create(provider_id=other_provider_id, enable_integration=True)
# Initiate a credit request with the first provider
request_uuid = self._create_credit_request_and_get_uuid()
# Attempt to update the request status for a different provider
response = self._credit_provider_callback(
request_uuid,
'approved',
provider_id=other_provider_id,
secret_key=other_provider_secret_key,
keys={other_provider_id: other_provider_secret_key}
)
# Response should be a 404 to avoid leaking request UUID values to other providers.
self.assertEqual(response.status_code, 404)
# Request status should still be 'pending'
self._assert_request_status(request_uuid, 'pending')
def test_credit_provider_key_not_configured(self):
""" Verify the endpoint returns HTTP 403 if the provider has no key configured. """
request_uuid = self._create_credit_request_and_get_uuid()
# Callback from the provider is not authorized, because the shared secret isn't configured.
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={}):
response = self._credit_provider_callback(request_uuid, 'approved', keys={})
self.assertEqual(response.status_code, 403)
@ddt.ddt
@unittest.skipUnless(settings.ROOT_URLCONF == 'lms.urls', 'Test only valid in lms')
class CreditEligibilityViewTests(AuthMixin, UserMixin, ReadOnlyMixin, TestCase):
""" Tests for CreditEligibilityView. """
view_name = 'credit:eligibility_details'
def setUp(self):
super(CreditEligibilityViewTests, self).setUp()
self.eligibility = CreditEligibilityFactory(username=self.user.username)
self.path = self.create_url(self.eligibility)
def create_url(self, eligibility):
""" Returns a URL that can be used to view eligibility data. """
return '{path}?username={username}&course_key={course_key}'.format(
path=reverse(self.view_name),
username=eligibility.username,
course_key=eligibility.course.course_key
)
def assert_valid_get_response(self, eligibility):
""" Ensure the endpoint returns the correct eligibility data. """
url = self.create_url(eligibility)
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
self.assertListEqual(response.data, CreditEligibilitySerializer([eligibility], many=True).data)
def test_get(self):
""" Verify the endpoint returns eligibility information for the give user and course. """
self.assert_valid_get_response(self.eligibility)
def test_get_with_missing_parameters(self):
""" Verify the endpoint returns HTTP status 400 if either the username or course_key querystring argument
is not provided. """
response = self.client.get(reverse(self.view_name))
self.assertEqual(response.status_code, 400)
self.assertDictEqual(response.data,
{'detail': 'Both the course_key and username querystring parameters must be supplied.'})
def test_get_with_invalid_course_key(self):
""" Verify the endpoint returns HTTP status 400 if the provided course_key is not an actual CourseKey. """
url = '{}?username=edx&course_key=a'.format(reverse(self.view_name))
response = self.client.get(url)
self.assertEqual(response.status_code, 400)
self.assertDictEqual(response.data, {'detail': '[a] is not a valid course key.'})
def test_staff_can_view_all(self):
""" Verify that staff users can view eligibility data for all users. """
staff = AdminFactory(password=self.password)
self.client.logout()
self.client.login(username=staff.username, password=self.password)
self.assert_valid_get_response(self.eligibility)
def test_nonstaff_can_only_view_own_data(self):
""" Verify that non-staff users can only view their own eligibility data. """
user = UserFactory(password=self.password)
self.client.logout()
self.client.login(username=user.username, password=self.password)
response = self.client.get(self.path)
self.assertEqual(response.status_code, 403)
......@@ -3,47 +3,25 @@ URLs for the credit app.
"""
from django.conf.urls import patterns, url, include
from openedx.core.djangoapps.credit import views, routers
from openedx.core.djangoapps.credit.api.provider import get_credit_provider_info
from openedx.core.djangoapps.credit import views, routers, models
PROVIDER_ID_PATTERN = r'(?P<provider_id>[^/]+)'
PROVIDER_ID_PATTERN = r'(?P<provider_id>{})'.format(models.CREDIT_PROVIDER_ID_REGEX)
V1_URLS = patterns(
PROVIDER_URLS = patterns(
'',
url(r'^request/$', views.CreditProviderRequestCreateView.as_view(), name='create_request'),
url(r'^callback/?$', views.CreditProviderCallbackView.as_view(), name='provider_callback'),
)
url(
r'^providers/$',
views.get_providers_detail,
name='providers_detail'
),
url(
r'^providers/{provider_id}/$'.format(provider_id=PROVIDER_ID_PATTERN),
get_credit_provider_info,
name='get_provider_info'
),
url(
r'^providers/{provider_id}/request/$'.format(provider_id=PROVIDER_ID_PATTERN),
views.create_credit_request,
name='create_request'
),
url(
r'^providers/{provider_id}/callback/?$'.format(provider_id=PROVIDER_ID_PATTERN),
views.credit_provider_callback,
name='provider_callback'
),
url(
r'^eligibility/$',
views.get_eligibility_for_user,
name='eligibility_details'
),
V1_URLS = patterns(
'',
url(r'^providers/{}/'.format(PROVIDER_ID_PATTERN), include(PROVIDER_URLS)),
url(r'^eligibility/$', views.CreditEligibilityView.as_view(), name='eligibility_details'),
)
router = routers.SimpleRouter() # pylint: disable=invalid-name
router.register(r'courses', views.CreditCourseViewSet)
router.register(r'providers', views.CreditProviderViewSet)
V1_URLS += router.urls
urlpatterns = patterns(
......
"""
Views for the credit Django app.
"""
import datetime
import json
from __future__ import unicode_literals
import logging
import datetime
from django.conf import settings
from django.http import (
HttpResponse,
HttpResponseBadRequest,
HttpResponseForbidden,
Http404
)
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST, require_GET
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
import pytz
from rest_framework import viewsets, mixins, permissions
from rest_framework import viewsets, mixins, permissions, views, generics
from rest_framework.authentication import SessionAuthentication
from rest_framework.exceptions import ValidationError
from rest_framework.response import Response
from rest_framework_oauth.authentication import OAuth2Authentication
from openedx.core.djangoapps.credit import api
from openedx.core.djangoapps.credit.exceptions import CreditApiBadRequest, CreditRequestNotFound
from openedx.core.djangoapps.credit.models import CreditCourse
from openedx.core.djangoapps.credit.serializers import CreditCourseSerializer
from openedx.core.djangoapps.credit.signature import signature, get_shared_secret_key
from openedx.core.djangoapps.credit.api import create_credit_request
from openedx.core.djangoapps.credit.exceptions import (UserNotEligibleException, InvalidCourseKey, CreditApiBadRequest,
InvalidCreditRequest)
from openedx.core.djangoapps.credit.models import (CreditCourse, CreditProvider, CREDIT_PROVIDER_ID_REGEX,
CreditEligibility, CreditRequest)
from openedx.core.djangoapps.credit.serializers import (CreditCourseSerializer, CreditProviderSerializer,
CreditEligibilitySerializer, CreditProviderCallbackSerializer)
from openedx.core.lib.api.mixins import PutAsCreateMixin
from util.date_utils import from_timestamp
from util.json_request import JsonResponse
from openedx.core.lib.api.permissions import IsStaffOrOwner
log = logging.getLogger(__name__)
@require_GET
def get_providers_detail(request):
"""
**User Cases**
Returns details of the credit providers filtered by provided query parameters.
**Parameters:**
* provider_id (list of provider ids separated with ","): The identifiers for the providers for which
user requested
**Example Usage:**
GET /api/credit/v1/providers?provider_id=asu,hogwarts
"response": [
"id": "hogwarts",
"display_name": "Hogwarts School of Witchcraft and Wizardry",
"url": "https://credit.example.com/",
"status_url": "https://credit.example.com/status/",
"description": "A new model for the Witchcraft and Wizardry School System.",
"enable_integration": false,
"fulfillment_instructions": "
<p>In order to fulfill credit, Hogwarts School of Witchcraft and Wizardry requires learners to:</p>
<ul>
<li>Sample instruction abc</li>
<li>Sample instruction xyz</li>
</ul>",
},
...
]
**Responses:**
* 200 OK: The request was created successfully. Returned content
is a JSON-encoded dictionary describing what the client should
send to the credit provider.
* 404 Not Found: The provider does not exist.
"""
provider_ids = request.GET.get("provider_ids", None)
providers_list = provider_ids.split(",") if provider_ids else None
providers = api.get_credit_providers(providers_list)
return JsonResponse(providers)
@require_POST
def create_credit_request(request, provider_id):
"""
Initiate a request for credit in a course.
This end-point will get-or-create a record in the database to track
the request. It will then calculate the parameters to send to
the credit provider and digitally sign the parameters, using a secret
key shared with the credit provider.
The user's browser is responsible for POSTing these parameters
directly to the credit provider.
**Example Usage:**
POST /api/credit/v1/providers/hogwarts/request/
{
"username": "ron",
"course_key": "edX/DemoX/Demo_Course"
}
Response: 200 OK
Content-Type: application/json
{
"url": "http://example.com/request-credit",
"method": "POST",
"parameters": {
request_uuid: "557168d0f7664fe59097106c67c3f847"
timestamp: 1434631630,
course_org: "ASUx"
course_num: "DemoX"
course_run: "1T2015"
final_grade: "0.95",
user_username: "john",
user_email: "john@example.com"
user_full_name: "John Smith"
user_mailing_address: "",
user_country: "US",
signature: "cRCNjkE4IzY+erIjRwOQCpRILgOvXx4q2qvx141BCqI="
}
}
**Parameters:**
* username (unicode): The username of the user requesting credit.
* course_key (unicode): The identifier for the course for which the user
is requesting credit.
**Responses:**
* 200 OK: The request was created successfully. Returned content
is a JSON-encoded dictionary describing what the client should
send to the credit provider.
* 400 Bad Request:
- The provided course key did not correspond to a valid credit course.
- The user already has a completed credit request for this course and provider.
* 403 Not Authorized:
- The username does not match the name of the logged in user.
- The user is not eligible for credit in the course.
* 404 Not Found:
- The provider does not exist.
"""
response, parameters = _validate_json_parameters(request.body, ["username", "course_key"])
if response is not None:
return response
try:
course_key = CourseKey.from_string(parameters["course_key"])
except InvalidKeyError:
return HttpResponseBadRequest(
u'Could not parse "{course_key}" as a course key'.format(
course_key=parameters["course_key"]
)
)
# Check user authorization
if not (request.user and request.user.username == parameters["username"]):
log.warning(
u'User with ID %s attempted to initiate a credit request for user with username "%s"',
request.user.id if request.user else "[Anonymous]",
parameters["username"]
)
return HttpResponseForbidden("Users are not allowed to initiate credit requests for other users.")
# Initiate the request
try:
credit_request = api.create_credit_request(course_key, provider_id, parameters["username"])
except CreditApiBadRequest as ex:
return HttpResponseBadRequest(ex)
else:
return JsonResponse(credit_request)
@require_POST
@csrf_exempt
def credit_provider_callback(request, provider_id):
"""
Callback end-point used by credit providers to approve or reject
a request for credit.
**Example Usage:**
POST /api/credit/v1/providers/{provider-id}/callback
{
"request_uuid": "557168d0f7664fe59097106c67c3f847",
"status": "approved",
"timestamp": 1434631630,
"signature": "cRCNjkE4IzY+erIjRwOQCpRILgOvXx4q2qvx141BCqI="
}
Response: 200 OK
**Parameters:**
* request_uuid (string): The UUID of the request.
* status (string): Either "approved" or "rejected".
* timestamp (int or string): The datetime at which the POST request was made, represented
as the number of seconds since January 1, 1970 00:00:00 UTC.
If the timestamp is a string, it will be converted to an integer.
* signature (string): A digital signature of the request parameters,
created using a secret key shared with the credit provider.
**Responses:**
* 200 OK: The user's status was updated successfully.
* 400 Bad request: The provided parameters were not valid.
Response content will be a JSON-encoded string describing the error.
class CreditProviderViewSet(viewsets.ReadOnlyModelViewSet):
""" Credit provider endpoints. """
* 403 Forbidden: Signature was invalid or timestamp was too far in the past.
* 404 Not Found: Could not find a request with the specified UUID associated with this provider.
"""
response, parameters = _validate_json_parameters(request.body, [
"request_uuid", "status", "timestamp", "signature"
])
if response is not None:
return response
# Validate the digital signature of the request.
# This ensures that the message came from the credit provider
# and hasn't been tampered with.
response = _validate_signature(parameters, provider_id)
if response is not None:
return response
# Validate the timestamp to ensure that the request is timely.
response = _validate_timestamp(parameters["timestamp"], provider_id)
if response is not None:
return response
# Update the credit request status
try:
api.update_credit_request_status(parameters["request_uuid"], provider_id, parameters["status"])
except CreditRequestNotFound:
raise Http404
except CreditApiBadRequest as ex:
return HttpResponseBadRequest(ex)
else:
return HttpResponse()
@require_GET
def get_eligibility_for_user(request):
"""
**User Cases**
Retrieve user eligibility against course.
**Parameters:**
lookup_field = 'provider_id'
lookup_value_regex = CREDIT_PROVIDER_ID_REGEX
authentication_classes = (OAuth2Authentication, SessionAuthentication,)
pagination_class = None
permission_classes = (permissions.IsAuthenticated,)
queryset = CreditProvider.objects.all()
serializer_class = CreditProviderSerializer
* course_key (unicode): Identifier of course.
* username (unicode): Username of current User.
def filter_queryset(self, queryset):
queryset = super(CreditProviderViewSet, self).filter_queryset(queryset)
**Example Usage:**
# Filter by provider ID
provider_ids = self.request.GET.get('provider_ids', None)
GET /api/credit/v1/eligibility?username=user&course_key=edX/Demo_101/Fall
"response": {
"course_key": "edX/Demo_101/Fall",
"deadline": "2015-10-23"
}
if provider_ids:
provider_ids = provider_ids.split(',')
queryset = queryset.filter(provider_id__in=provider_ids)
**Responses:**
return queryset
* 200 OK: The request was created successfully.
* 404 Not Found: The provider does not exist.
class CreditProviderRequestCreateView(views.APIView):
""" Creates a credit request for the given user and course, if the user is eligible for credit."""
"""
course_key = request.GET.get("course_key", None)
username = request.GET.get("username", None)
return JsonResponse(api.get_eligibilities_for_user(username=username, course_key=course_key))
authentication_classes = (OAuth2Authentication, SessionAuthentication,)
permission_classes = (permissions.IsAuthenticated, IsStaffOrOwner,)
def post(self, request, provider_id):
""" POST handler. """
# Get the provider, or return HTTP 404 if it doesn't exist
provider = generics.get_object_or_404(CreditProvider, provider_id=provider_id)
def _validate_json_parameters(params_string, expected_parameters):
"""
Load the request parameters as a JSON dictionary and check that
all required paramters are present.
# Validate the course key
course_key = request.data.get('course_key')
try:
course_key = CourseKey.from_string(course_key)
except InvalidKeyError:
raise InvalidCourseKey(course_key)
Arguments:
params_string (unicode): The JSON-encoded parameter dictionary.
expected_parameters (list): Required keys of the parameters dictionary.
# Validate the username
username = request.data.get('username')
if not username:
raise ValidationError({'detail': 'A username must be specified.'})
Returns: tuple of (HttpResponse, dict)
# Ensure the user is actually eligible to receive credit
if not CreditEligibility.is_user_eligible_for_credit(course_key, username):
raise UserNotEligibleException(course_key, username)
"""
try:
parameters = json.loads(params_string)
except (TypeError, ValueError):
return HttpResponseBadRequest("Could not parse the request body as JSON."), None
try:
credit_request = create_credit_request(course_key, provider.provider_id, username)
return Response(credit_request)
except CreditApiBadRequest as ex:
raise InvalidCreditRequest(ex.message)
if not isinstance(parameters, dict):
return HttpResponseBadRequest("Request parameters must be a JSON-encoded dictionary."), None
missing_params = set(expected_parameters) - set(parameters.keys())
if missing_params:
msg = u"Required parameters are missing: {missing}".format(missing=u", ".join(missing_params))
return HttpResponseBadRequest(msg), None
class CreditProviderCallbackView(views.APIView):
""" Callback used by credit providers to update credit request status. """
return None, parameters
# This endpoint should be open to all external credit providers.
authentication_classes = ()
permission_classes = ()
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
return super(CreditProviderCallbackView, self).dispatch(request, *args, **kwargs)
def post(self, request, provider_id):
""" POST handler. """
provider = generics.get_object_or_404(CreditProvider, provider_id=provider_id)
data = request.data
# Ensure the input data is valid
serializer = CreditProviderCallbackSerializer(data=data, provider=provider)
serializer.is_valid(raise_exception=True)
# Update the credit request status
request_uuid = data['request_uuid']
new_status = data['status']
credit_request = generics.get_object_or_404(CreditRequest, uuid=request_uuid, provider=provider)
old_status = credit_request.status
credit_request.status = new_status
credit_request.save()
log.info(
'Updated [%s] CreditRequest [%s] from status [%s] to [%s].',
provider_id, request_uuid, old_status, new_status
)
def _validate_signature(parameters, provider_id):
"""
Check that the signature from the credit provider is valid.
return Response()
Arguments:
parameters (dict): Parameters received from the credit provider.
provider_id (unicode): Identifier for the credit provider.
Returns:
HttpResponseForbidden or None
class CreditEligibilityView(generics.ListAPIView):
""" Returns eligibility for a user-course combination. """
"""
secret_key = get_shared_secret_key(provider_id)
if secret_key is None:
log.error(
(
u'Could not retrieve secret key for credit provider with ID "%s". '
u'Since no key has been configured, we cannot validate requests from the credit provider.'
), provider_id
)
return HttpResponseForbidden("Credit provider credentials have not been configured.")
if signature(parameters, secret_key) != parameters["signature"]:
log.warning(u'Request from credit provider with ID "%s" had an invalid signature', parameters["signature"])
return HttpResponseForbidden("Invalid signature.")
def _validate_timestamp(timestamp_value, provider_id):
"""
Check that the timestamp of the request is recent.
Arguments:
timestamp (int or string): Number of seconds since Jan. 1, 1970 UTC.
If specified as a string, it will be converted to an integer.
provider_id (unicode): Identifier for the credit provider.
Returns:
HttpResponse or None
"""
timestamp = from_timestamp(timestamp_value)
if timestamp is None:
msg = u'"{timestamp}" is not a valid timestamp'.format(timestamp=timestamp_value)
log.warning(msg)
return HttpResponseBadRequest(msg)
# Check that the timestamp is recent
elapsed_seconds = (datetime.datetime.now(pytz.UTC) - timestamp).total_seconds()
if elapsed_seconds > settings.CREDIT_PROVIDER_TIMESTAMP_EXPIRATION:
log.warning(
(
u'Timestamp %s is too far in the past (%s seconds), '
u'so we are rejecting the notification from the credit provider "%s".'
),
timestamp_value, elapsed_seconds, provider_id,
authentication_classes = (OAuth2Authentication, SessionAuthentication,)
pagination_class = None
permission_classes = (permissions.IsAuthenticated, IsStaffOrOwner)
serializer_class = CreditEligibilitySerializer
queryset = CreditEligibility.objects.all()
def filter_queryset(self, queryset):
username = self.request.GET.get('username')
course_key = self.request.GET.get('course_key')
if not (username and course_key):
raise ValidationError(
{'detail': 'Both the course_key and username querystring parameters must be supplied.'})
course_key = unicode(course_key)
try:
course_key = CourseKey.from_string(course_key)
except InvalidKeyError:
raise ValidationError({'detail': '[{}] is not a valid course key.'.format(course_key)})
return queryset.filter(
username=username,
course__course_key=course_key,
deadline__gt=datetime.datetime.now(pytz.UTC)
)
return HttpResponseForbidden(u"Timestamp is too far in the past.")
class CreditCourseViewSet(PutAsCreateMixin, mixins.UpdateModelMixin, viewsets.ReadOnlyModelViewSet):
......
......@@ -80,3 +80,16 @@ class IsStaffOrReadOnly(permissions.BasePermission):
return (request.user.is_staff or
CourseStaffRole(obj.course_id).has_user(request.user) or
request.method in permissions.SAFE_METHODS)
class IsStaffOrOwner(permissions.BasePermission):
"""
Permission that allows access to admin users or the owner of an object.
The owner is considered the User object represented by obj.user.
"""
def has_object_permission(self, request, view, obj):
return request.user.is_staff or obj.user == request.user
def has_permission(self, request, view):
user = request.user
return user.is_staff or (user.username == request.data.get('username'))
""" Tests for API permissions classes. """
from django.test import TestCase, RequestFactory
from openedx.core.lib.api.permissions import IsStaffOrOwner
from student.tests.factories import UserFactory
class TestObject(object):
""" Fake class for object permission tests. """
user = None
class IsStaffOrOwnerTests(TestCase):
""" Tests for IsStaffOrOwner permission class. """
def setUp(self):
super(IsStaffOrOwnerTests, self).setUp()
self.permission = IsStaffOrOwner()
self.request = RequestFactory().get('/')
self.obj = TestObject()
def assert_user_has_object_permission(self, user, permitted):
"""
Asserts whether or not the user has permission to access an object.
Arguments
user (User)
permitted (boolean)
"""
self.request.user = user
self.assertEqual(self.permission.has_object_permission(self.request, None, self.obj), permitted)
def test_staff_user(self):
""" Staff users should be permitted. """
user = UserFactory.create(is_staff=True)
self.assert_user_has_object_permission(user, True)
def test_owner(self):
""" Owners should be permitted. """
user = UserFactory.create()
self.obj.user = user
self.assert_user_has_object_permission(user, True)
def test_non_staff_test_non_owner_or_staff_user(self):
""" Non-staff and non-owner users should not be permitted. """
user = UserFactory.create()
self.assert_user_has_object_permission(user, False)
def test_has_permission_as_staff(self):
""" Staff users always have permission. """
self.request.user = UserFactory.create(is_staff=True)
self.assertTrue(self.permission.has_permission(self.request, None))
def test_has_permission_as_owner(self):
""" Owners always have permission. """
user = UserFactory.create()
request = RequestFactory().get('/?username={}'.format(user.username))
request.user = user
self.assertTrue(self.permission.has_permission(request, None))
def test_has_permission_as_non_owner(self):
""" Non-owners should not have permission. """
user = UserFactory.create()
request = RequestFactory().get('/?username={}'.format(user.username))
request.user = UserFactory.create()
self.assertFalse(self.permission.has_permission(request, None))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment