Commit b24c2280 by Calen Pennington Committed by GitHub

Merge pull request #14047 from cpennington/safer-opaque-keys

Safer opaque keys
parents d2b6c9eb 42617545
......@@ -22,6 +22,7 @@ course.certificates: {
}
"""
import json
import logging
from django.conf import settings
from django.contrib.auth.decorators import login_required
......@@ -33,6 +34,7 @@ from django.views.decorators.http import require_http_methods
from contentstore.utils import reverse_course_url
from edxmako.shortcuts import render_to_response
from opaque_keys.edx.keys import CourseKey, AssetKey
from opaque_keys import InvalidKeyError
from eventtracking import tracker
from student.auth import has_studio_write_access
from student.roles import GlobalStaff
......@@ -49,6 +51,8 @@ from contentstore.utils import get_lms_link_for_certificate_web_view
CERTIFICATE_SCHEMA_VERSION = 1
CERTIFICATE_MINIMUM_ID = 100
LOGGER = logging.getLogger(__name__)
def _get_course_and_check_access(course_key, user, depth=0):
"""
......@@ -67,11 +71,32 @@ def _delete_asset(course_key, asset_key_string):
remove asset by calling delete_asset method of assets module.
"""
if asset_key_string:
# remove first slash in asset path
# otherwise it generates InvalidKeyError in case of split modulestore
if '/' == asset_key_string[0]:
asset_key_string = asset_key_string[1:]
asset_key = AssetKey.from_string(asset_key_string)
try:
asset_key = AssetKey.from_string(asset_key_string)
except InvalidKeyError:
# remove first slash in asset path
# otherwise it generates InvalidKeyError in case of split modulestore
if '/' == asset_key_string[0]:
asset_key_string = asset_key_string[1:]
try:
asset_key = AssetKey.from_string(asset_key_string)
except InvalidKeyError:
# Unable to parse the asset key, log and return
LOGGER.info(
"In course %r, unable to parse asset key %r, not attempting to delete signatory.",
course_key,
asset_key_string,
)
return
else:
# Unable to parse the asset key, log and return
LOGGER.info(
"In course %r, unable to parse asset key %r, not attempting to delete signatory.",
course_key,
asset_key_string,
)
return
try:
delete_asset(course_key, asset_key)
# If the asset was not found, it doesn't have to be deleted...
......
......@@ -21,7 +21,7 @@ from xmodule.services import SettingsService
from xmodule.modulestore.django import modulestore, ModuleI18nService
from xmodule.mixin import wrap_with_license
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.asides import AsideUsageKeyV1
from opaque_keys.edx.asides import AsideUsageKeyV1, AsideUsageKeyV2
from xmodule.x_module import ModuleSystem
from xblock.runtime import KvsFieldData
from xblock.django.request import webob_to_django_response, django_to_webob_request
......@@ -57,7 +57,7 @@ def preview_handler(request, usage_key_string, handler, suffix=''):
"""
usage_key = UsageKey.from_string(usage_key_string)
if isinstance(usage_key, AsideUsageKeyV1):
if isinstance(usage_key, (AsideUsageKeyV1, AsideUsageKeyV2)):
descriptor = modulestore().get_item(usage_key.usage_key)
for aside in descriptor.runtime.get_asides(descriptor):
if aside.scope_ids.block_type == usage_key.aside_type:
......
......@@ -3,6 +3,7 @@
"""
Certificates Tests.
"""
import itertools
import json
import mock
import ddt
......@@ -11,7 +12,6 @@ from django.conf import settings
from django.test.utils import override_settings
from opaque_keys.edx.keys import AssetKey
from opaque_keys.edx.locations import AssetLocation
from contentstore.utils import reverse_course_url
from contentstore.views.certificates import CERTIFICATE_SCHEMA_VERSION
......@@ -53,6 +53,9 @@ CERTIFICATE_JSON_WITH_SIGNATORIES = {
]
}
C4X_SIGNATORY_PATH = '/c4x/test/CSS101/asset/Signature{}.png'
SIGNATORY_PATH = 'asset-v1:test+CSS101+SP2017+type@asset+block@Signature{}.png'
# pylint: disable=no-member
class HelperMethods(object):
......@@ -70,7 +73,8 @@ class HelperMethods(object):
)
contentstore().save(content)
def _add_course_certificates(self, count=1, signatory_count=0, is_active=False):
def _add_course_certificates(self, count=1, signatory_count=0, is_active=False,
asset_path_format=C4X_SIGNATORY_PATH):
"""
Create certificate for the course.
"""
......@@ -78,18 +82,14 @@ class HelperMethods(object):
{
'name': 'Name ' + str(i),
'title': 'Title ' + str(i),
'signature_image_path': '/c4x/test/CSS101/asset/Signature{}.png'.format(i),
'signature_image_path': asset_path_format.format(i),
'id': i
} for i in xrange(signatory_count)
]
# create images for signatory signatures except the last signatory
for idx, signatory in enumerate(signatories):
if len(signatories) > 2 and idx == len(signatories) - 1:
continue
else:
self._create_fake_images([signatory['signature_image_path']])
self._create_fake_images(signatory['signature_image_path'] for signatory in signatories[:-1])
certificates = [
{
......@@ -557,11 +557,61 @@ class CertificatesDetailHandlerTestCase(
content = json.loads(response.content)
self.assertEqual(content, expected)
def test_can_delete_certificate_with_signatories(self):
@ddt.data(C4X_SIGNATORY_PATH, SIGNATORY_PATH)
def test_can_delete_certificate_with_signatories(self, signatory_path):
"""
Delete certificate
"""
self._add_course_certificates(count=2, signatory_count=1, asset_path_format=signatory_path)
response = self.client.delete(
self._url(cid=1),
content_type="application/json",
HTTP_ACCEPT="application/json",
HTTP_X_REQUESTED_WITH="XMLHttpRequest",
)
self.assertEqual(response.status_code, 204)
self.assert_event_emitted(
'edx.certificate.configuration.deleted',
course_id=unicode(self.course.id),
configuration_id='1',
)
self.reload_course()
# Verify that certificates are properly updated in the course.
certificates = self.course.certificates['certificates']
self.assertEqual(len(certificates), 1)
self.assertEqual(certificates[0].get('name'), 'Name 0')
self.assertEqual(certificates[0].get('description'), 'Description 0')
def test_can_delete_certificate_with_slash_prefix_signatory(self):
"""
Delete certificate
"""
self._add_course_certificates(count=2, signatory_count=1, asset_path_format="/" + SIGNATORY_PATH)
response = self.client.delete(
self._url(cid=1),
content_type="application/json",
HTTP_ACCEPT="application/json",
HTTP_X_REQUESTED_WITH="XMLHttpRequest",
)
self.assertEqual(response.status_code, 204)
self.assert_event_emitted(
'edx.certificate.configuration.deleted',
course_id=unicode(self.course.id),
configuration_id='1',
)
self.reload_course()
# Verify that certificates are properly updated in the course.
certificates = self.course.certificates['certificates']
self.assertEqual(len(certificates), 1)
self.assertEqual(certificates[0].get('name'), 'Name 0')
self.assertEqual(certificates[0].get('description'), 'Description 0')
@ddt.data("not_a_valid_asset_key{}.png", "/not_a_valid_asset_key{}.png")
def test_can_delete_certificate_with_invalid_signatory(self, signatory_path):
"""
Delete certificate
"""
self._add_course_certificates(count=2, signatory_count=1)
self._add_course_certificates(count=2, signatory_count=1, asset_path_format=signatory_path)
response = self.client.delete(
self._url(cid=1),
content_type="application/json",
......@@ -581,11 +631,12 @@ class CertificatesDetailHandlerTestCase(
self.assertEqual(certificates[0].get('name'), 'Name 0')
self.assertEqual(certificates[0].get('description'), 'Description 0')
def test_delete_certificate_without_write_permissions(self):
@ddt.data(C4X_SIGNATORY_PATH, SIGNATORY_PATH)
def test_delete_certificate_without_write_permissions(self, signatory_path):
"""
Tests certificate deletion without write permission on course.
"""
self._add_course_certificates(count=2, signatory_count=1)
self._add_course_certificates(count=2, signatory_count=1, asset_path_format=signatory_path)
user = UserFactory()
self.client.login(username=user.username, password='test')
response = self.client.delete(
......@@ -596,11 +647,12 @@ class CertificatesDetailHandlerTestCase(
)
self.assertEqual(response.status_code, 403)
def test_delete_certificate_without_global_staff_permissions(self):
@ddt.data(C4X_SIGNATORY_PATH, SIGNATORY_PATH)
def test_delete_certificate_without_global_staff_permissions(self, signatory_path):
"""
Tests deletion of an active certificate without global staff permission on course.
"""
self._add_course_certificates(count=2, signatory_count=1, is_active=True)
self._add_course_certificates(count=2, signatory_count=1, is_active=True, asset_path_format=signatory_path)
user = UserFactory()
for role in [CourseInstructorRole, CourseStaffRole]:
role(self.course.id).add_users(user)
......@@ -613,11 +665,12 @@ class CertificatesDetailHandlerTestCase(
)
self.assertEqual(response.status_code, 403)
def test_update_active_certificate_without_global_staff_permissions(self):
@ddt.data(C4X_SIGNATORY_PATH, SIGNATORY_PATH)
def test_update_active_certificate_without_global_staff_permissions(self, signatory_path):
"""
Tests update of an active certificate without global staff permission on course.
"""
self._add_course_certificates(count=2, signatory_count=1, is_active=True)
self._add_course_certificates(count=2, signatory_count=1, is_active=True, asset_path_format=signatory_path)
cert_data = {
u'id': 1,
u'version': CERTIFICATE_SCHEMA_VERSION,
......@@ -654,14 +707,15 @@ class CertificatesDetailHandlerTestCase(
)
self.assertEqual(response.status_code, 404)
def test_can_delete_signatory(self):
@ddt.data(C4X_SIGNATORY_PATH, SIGNATORY_PATH)
def test_can_delete_signatory(self, signatory_path):
"""
Delete an existing certificate signatory
"""
self._add_course_certificates(count=2, signatory_count=3)
self._add_course_certificates(count=2, signatory_count=3, asset_path_format=signatory_path)
certificates = self.course.certificates['certificates']
signatory = certificates[1].get("signatories")[1]
image_asset_location = AssetLocation.from_deprecated_string(signatory['signature_image_path'])
image_asset_location = AssetKey.from_string(signatory['signature_image_path'])
content = contentstore().find(image_asset_location)
self.assertIsNotNone(content)
test_url = '{}/signatories/1'.format(self._url(cid=1))
......@@ -680,11 +734,12 @@ class CertificatesDetailHandlerTestCase(
# make sure signatory signature image is deleted too
self.assertRaises(NotFoundError, contentstore().find, image_asset_location)
def test_deleting_signatory_without_signature(self):
@ddt.data(C4X_SIGNATORY_PATH, SIGNATORY_PATH)
def test_deleting_signatory_without_signature(self, signatory_path):
"""
Delete an signatory whose signature image is already removed or does not exist
"""
self._add_course_certificates(count=2, signatory_count=4)
self._add_course_certificates(count=2, signatory_count=4, asset_path_format=signatory_path)
test_url = '{}/signatories/3'.format(self._url(cid=1))
response = self.client.delete(
test_url,
......@@ -708,12 +763,13 @@ class CertificatesDetailHandlerTestCase(
)
self.assertEqual(response.status_code, 404)
def test_certificate_activation_success(self):
@ddt.data(C4X_SIGNATORY_PATH, SIGNATORY_PATH)
def test_certificate_activation_success(self, signatory_path):
"""
Activate and Deactivate the course certificate
"""
test_url = reverse_course_url('certificates.certificate_activation_handler', self.course.id)
self._add_course_certificates(count=1, signatory_count=2)
self._add_course_certificates(count=1, signatory_count=2, asset_path_format=signatory_path)
is_active = True
for i in range(2):
......@@ -736,14 +792,15 @@ class CertificatesDetailHandlerTestCase(
course_id=unicode(self.course.id),
)
@ddt.data(True, False)
def test_certificate_activation_without_write_permissions(self, activate):
@ddt.data(*itertools.product([True, False], [C4X_SIGNATORY_PATH, SIGNATORY_PATH]))
@ddt.unpack
def test_certificate_activation_without_write_permissions(self, activate, signatory_path):
"""
Tests certificate Activate and Deactivate should not be allowed if user
does not have write permissions on course.
"""
test_url = reverse_course_url('certificates.certificate_activation_handler', self.course.id)
self._add_course_certificates(count=1, signatory_count=2)
self._add_course_certificates(count=1, signatory_count=2, asset_path_format=signatory_path)
user = UserFactory()
self.client.login(username=user.username, password='test')
response = self.client.post(
......@@ -755,14 +812,15 @@ class CertificatesDetailHandlerTestCase(
)
self.assertEquals(response.status_code, 403)
@ddt.data(True, False)
def test_certificate_activation_without_global_staff_permissions(self, activate):
@ddt.data(*itertools.product([True, False], [C4X_SIGNATORY_PATH, SIGNATORY_PATH]))
@ddt.unpack
def test_certificate_activation_without_global_staff_permissions(self, activate, signatory_path):
"""
Tests certificate Activate and Deactivate should not be allowed if user
does not have global staff permissions on course.
"""
test_url = reverse_course_url('certificates.certificate_activation_handler', self.course.id)
self._add_course_certificates(count=1, signatory_count=2)
self._add_course_certificates(count=1, signatory_count=2, asset_path_format=signatory_path)
user = UserFactory()
for role in [CourseInstructorRole, CourseStaffRole]:
role(self.course.id).add_users(user)
......@@ -776,7 +834,8 @@ class CertificatesDetailHandlerTestCase(
)
self.assertEquals(response.status_code, 403)
def test_certificate_activation_failure(self):
@ddt.data(C4X_SIGNATORY_PATH, SIGNATORY_PATH)
def test_certificate_activation_failure(self, signatory_path):
"""
Certificate activation should fail when user has not read access to course then permission denied exception
should raised.
......@@ -784,7 +843,7 @@ class CertificatesDetailHandlerTestCase(
test_url = reverse_course_url('certificates.certificate_activation_handler', self.course.id)
test_user_client, test_user = self.create_non_staff_authed_user_client()
CourseEnrollment.enroll(test_user, self.course.id)
self._add_course_certificates(count=1, signatory_count=2)
self._add_course_certificates(count=1, signatory_count=2, asset_path_format=signatory_path)
response = test_user_client.post(
test_url,
data=json.dumps({"is_active": True}),
......
......@@ -2,6 +2,7 @@
Tests for the Studio Tagging XBlockAside
"""
import ddt
from xmodule.modulestore import ModuleStoreEnum
from xmodule.modulestore.django import modulestore
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
......@@ -17,13 +18,14 @@ from contentstore.utils import reverse_usage_url
from contentstore.tests.utils import AjaxEnabledTestClient
from django.test.client import RequestFactory
from student.tests.factories import UserFactory
from opaque_keys.edx.asides import AsideUsageKeyV1
from opaque_keys.edx.asides import AsideUsageKeyV1, AsideUsageKeyV2
from datetime import datetime
from pytz import UTC
from lxml import etree
from StringIO import StringIO
@ddt.ddt
class StructuredTagsAsideTestCase(ModuleStoreTestCase):
"""
Base class for tests of StructuredTagsAside (tagging.py)
......@@ -179,13 +181,14 @@ class StructuredTagsAsideTestCase(ModuleStoreTestCase):
video_html = get_preview_fragment(request, self.video, context).content
self.assertNotRegexpMatches(video_html, "<select")
def test_handle_requests(self):
@ddt.data(AsideUsageKeyV1, AsideUsageKeyV2)
def test_handle_requests(self, aside_key_class):
"""
Checks that handler to save tags in StructuredTagsAside works properly
"""
handler_url = reverse_usage_url(
'preview_handler',
'%s:%s::%s' % (AsideUsageKeyV1.CANONICAL_NAMESPACE, self.problem.location, self.aside_name),
unicode(aside_key_class(self.problem.location, self.aside_name)),
kwargs={'handler': 'save_tags'}
)
......
......@@ -55,7 +55,7 @@ setup(
'capa',
'path.py',
'webob',
'edx-opaque-keys>=0.3.4,<1.0.0',
'edx-opaque-keys>=0.4.0,<1.0.0',
],
package_data={
'xmodule': ['js/module/*'],
......
......@@ -200,7 +200,10 @@ class StaticContent(object):
if path.startswith('/static/'):
path = path[len('/static/'):]
path = path.lstrip('/')
# Old-style asset keys start with `/`, so don't try and strip it
# in that case.
if not path.startswith('/c4x'):
path = path.lstrip('/')
try:
return AssetKey.from_string(path)
......
......@@ -33,7 +33,7 @@ from xmodule.errortracker import exc_info_to_str
from xmodule.modulestore.exceptions import ItemNotFoundError
from opaque_keys.edx.keys import UsageKey
from opaque_keys.edx.asides import AsideUsageKeyV1, AsideDefinitionKeyV1
from opaque_keys.edx.asides import AsideUsageKeyV2, AsideDefinitionKeyV2
from xmodule.exceptions import UndefinedContext
import dogstats_wrapper as dog_stats_api
......@@ -155,8 +155,8 @@ class AsideKeyGenerator(IdGenerator):
Returns:
(aside_definition_id, aside_usage_id)
"""
def_key = AsideDefinitionKeyV1(definition_id, aside_type)
usage_key = AsideUsageKeyV1(usage_id, aside_type)
def_key = AsideDefinitionKeyV2(definition_id, aside_type)
usage_key = AsideUsageKeyV2(usage_id, aside_type)
return (def_key, usage_key)
def create_usage(self, def_id):
......
......@@ -33,7 +33,7 @@ from .models import (
import logging
from opaque_keys.edx.keys import CourseKey, UsageKey
from opaque_keys.edx.block_types import BlockTypeKeyV1
from opaque_keys.edx.asides import AsideUsageKeyV1
from opaque_keys.edx.asides import AsideUsageKeyV1, AsideUsageKeyV2
from contracts import contract, new_contract
from django.db import DatabaseError
......@@ -67,6 +67,7 @@ def _all_usage_keys(descriptors, aside_types):
for aside_type in aside_types:
usage_ids.add(AsideUsageKeyV1(descriptor.scope_ids.usage_id, aside_type))
usage_ids.add(AsideUsageKeyV2(descriptor.scope_ids.usage_id, aside_type))
return usage_ids
......
......@@ -26,7 +26,6 @@ __test__ = False # do not collect
@cmdopts([
("system=", "s", "System to act on"),
("test-id=", "t", "Test id"),
("failed", "f", "Run only failed tests"),
("fail-fast", "x", "Fail suite on first failed test"),
("fasttest", "a", "Run without collectstatic"),
make_option(
......
......@@ -48,7 +48,7 @@ edx-django-oauth2-provider==1.1.4
edx-django-sites-extensions==2.1.1
edx-enterprise==0.1.0
edx-oauth2-provider==1.2.0
edx-opaque-keys==0.3.4
edx-opaque-keys==0.4.0
edx-organizations==0.4.1
edx-rest-api-client==1.2.1
edx-search==0.1.2
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment