Commit e7708e5a by Peter Pinch

Merge pull request #8942 from mitocw/bdero/import-export-reprise

Public Course Import/Export API (continued)
parents 5c178900 7bf8e2d6
......@@ -345,7 +345,7 @@ class CourseKeyVerificationTestCase(CourseTestCase):
resp = self.client.get_html(url)
self.assertEqual(resp.status_code, status_code)
url = '/import_status/{course_key}/{filename}'.format(
url = '/api/import_export/v1/courses/{course_key}/import_status/{filename}'.format(
course_key=course_key,
filename='xyz.tar.gz'
)
......
......@@ -2,485 +2,166 @@
These views handle all actions in Studio related to import and exporting of
courses
"""
import base64
import logging
import os
from opaque_keys import InvalidKeyError
import re
import shutil
import tarfile
from path import Path as path
from tempfile import mkdtemp
from django.conf import settings
from contentstore.utils import reverse_course_url, reverse_library_url, reverse_usage_url
from django.contrib.auth.decorators import login_required
from django.core.exceptions import SuspiciousOperation, PermissionDenied
from django.core.files.temp import NamedTemporaryFile
from django.core.servers.basehttp import FileWrapper
from django.http import HttpResponse, HttpResponseNotFound
from django.core.exceptions import PermissionDenied
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext as _
from django.views.decorators.csrf import ensure_csrf_cookie
from django.views.decorators.http import require_http_methods, require_GET
from django.views.decorators.http import require_http_methods
import dogstats_wrapper as dog_stats_api
from django.views.decorators.csrf import ensure_csrf_cookie
from edxmako.shortcuts import render_to_response
from xmodule.contentstore.django import contentstore
from xmodule.exceptions import SerializationError
from xmodule.modulestore.django import modulestore
from opaque_keys.edx.keys import CourseKey
from opaque_keys.edx.locator import LibraryLocator
from xmodule.modulestore.xml_importer import import_course_from_xml, import_library_from_xml
from xmodule.modulestore.xml_exporter import export_course_to_xml, export_library_to_xml
from xmodule.modulestore import COURSE_ROOT, LIBRARY_ROOT
from student.auth import has_course_author_access
from openedx.core.lib.extract_tar import safetar_extractall
from util.json_request import JsonResponse
from util.views import ensure_valid_course_key
from xmodule.modulestore.django import modulestore
from contentstore.utils import reverse_course_url, reverse_usage_url, reverse_library_url
from urllib import urlencode
__all__ = [
'import_handler', 'import_status_handler',
'export_handler',
]
__all__ = ["import_handler", "export_handler"]
log = logging.getLogger(__name__)
# Regex to capture Content-Range header ranges.
CONTENT_RE = re.compile(r"(?P<start>\d{1,11})-(?P<stop>\d{1,11})/(?P<end>\d{1,11})")
CONTENT_RE = re.compile(
r"(?P<start>\d{1,11})-(?P<stop>\d{1,11})/(?P<end>\d{1,11})"
)
# pylint: disable=unused-argument
@login_required
@ensure_csrf_cookie
@require_http_methods(("GET", "POST", "PUT"))
@require_http_methods(("GET",))
@ensure_valid_course_key
def import_handler(request, course_key_string):
"""
The restful handler for importing a course.
The restful handler for the import page.
GET
html: return html page for import page
json: not supported
POST or PUT
json: import a course via the .tar.gz file specified in request.FILES
"""
courselike_key = CourseKey.from_string(course_key_string)
library = isinstance(courselike_key, LibraryLocator)
if library:
root_name = LIBRARY_ROOT
successful_url = reverse_library_url('library_handler', courselike_key)
context_name = 'context_library'
successful_url = reverse_library_url("library_handler", courselike_key)
courselike_module = modulestore().get_library(courselike_key)
import_func = import_library_from_xml
context_name = "context_library"
else:
root_name = COURSE_ROOT
successful_url = reverse_course_url('course_handler', courselike_key)
context_name = 'context_course'
successful_url = reverse_course_url("course_handler", courselike_key)
courselike_module = modulestore().get_course(courselike_key)
import_func = import_course_from_xml
return _import_handler(
request, courselike_key, root_name, successful_url, context_name, courselike_module, import_func
)
context_name = "context_course"
def _import_handler(request, courselike_key, root_name, successful_url, context_name, courselike_module, import_func):
"""
Parameterized function containing the meat of import_handler.
"""
if not has_course_author_access(request.user, courselike_key):
raise PermissionDenied()
if 'application/json' in request.META.get('HTTP_ACCEPT', 'application/json'):
if request.method == 'GET':
raise NotImplementedError('coming soon')
else:
# Do everything in a try-except block to make sure everything is properly cleaned up.
try:
data_root = path(settings.GITHUB_REPO_ROOT)
subdir = base64.urlsafe_b64encode(repr(courselike_key))
course_dir = data_root / subdir
filename = request.FILES['course-data'].name
# Use sessions to keep info about import progress
session_status = request.session.setdefault("import_status", {})
courselike_string = unicode(courselike_key) + filename
_save_request_status(request, courselike_string, 0)
if not filename.endswith('.tar.gz'):
_save_request_status(request, courselike_string, -1)
return JsonResponse(
{
'ErrMsg': _('We only support uploading a .tar.gz file.'),
'Stage': -1
},
status=415
)
temp_filepath = course_dir / filename
if not course_dir.isdir():
os.mkdir(course_dir)
logging.debug('importing course to {0}'.format(temp_filepath))
# Get upload chunks byte ranges
try:
matches = CONTENT_RE.search(request.META["HTTP_CONTENT_RANGE"])
content_range = matches.groupdict()
except KeyError: # Single chunk
# no Content-Range header, so make one that will work
content_range = {'start': 0, 'stop': 1, 'end': 2}
# stream out the uploaded files in chunks to disk
if int(content_range['start']) == 0:
mode = "wb+"
else:
mode = "ab+"
size = os.path.getsize(temp_filepath)
# Check to make sure we haven't missed a chunk
# This shouldn't happen, even if different instances are handling
# the same session, but it's always better to catch errors earlier.
if size < int(content_range['start']):
_save_request_status(request, courselike_string, -1)
log.warning(
"Reported range %s does not match size downloaded so far %s",
content_range['start'],
size
)
return JsonResponse(
{
'ErrMsg': _('File upload corrupted. Please try again'),
'Stage': -1
},
status=409
)
# The last request sometimes comes twice. This happens because
# nginx sends a 499 error code when the response takes too long.
elif size > int(content_range['stop']) and size == int(content_range['end']):
return JsonResponse({'ImportStatus': 1})
with open(temp_filepath, mode) as temp_file:
for chunk in request.FILES['course-data'].chunks():
temp_file.write(chunk)
size = os.path.getsize(temp_filepath)
if int(content_range['stop']) != int(content_range['end']) - 1:
# More chunks coming
return JsonResponse({
"files": [{
"name": filename,
"size": size,
"deleteUrl": "",
"deleteType": "",
"url": reverse_course_url('import_handler', courselike_key),
"thumbnailUrl": ""
}]
})
# Send errors to client with stage at which error occurred.
except Exception as exception: # pylint: disable=broad-except
_save_request_status(request, courselike_string, -1)
if course_dir.isdir():
shutil.rmtree(course_dir)
log.info("Course import %s: Temp data cleared", courselike_key)
log.exception(
"error importing course"
)
return JsonResponse(
{
'ErrMsg': str(exception),
'Stage': -1
},
status=400
)
# try-finally block for proper clean up after receiving last chunk.
try:
# This was the last chunk.
log.info("Course import %s: Upload complete", courselike_key)
_save_request_status(request, courselike_string, 1)
tar_file = tarfile.open(temp_filepath)
try:
safetar_extractall(tar_file, (course_dir + '/').encode('utf-8'))
except SuspiciousOperation as exc:
_save_request_status(request, courselike_string, -1)
return JsonResponse(
{
'ErrMsg': 'Unsafe tar file. Aborting import.',
'SuspiciousFileOperationMsg': exc.args[0],
'Stage': -1
},
status=400
)
finally:
tar_file.close()
log.info("Course import %s: Uploaded file extracted", courselike_key)
_save_request_status(request, courselike_string, 2)
# find the 'course.xml' file
def get_all_files(directory):
"""
For each file in the directory, yield a 2-tuple of (file-name,
directory-path)
"""
for dirpath, _dirnames, filenames in os.walk(directory):
for filename in filenames:
yield (filename, dirpath)
def get_dir_for_fname(directory, filename):
"""
Returns the dirpath for the first file found in the directory
with the given name. If there is no file in the directory with
the specified name, return None.
"""
for fname, dirpath in get_all_files(directory):
if fname == filename:
return dirpath
return None
dirpath = get_dir_for_fname(course_dir, root_name)
if not dirpath:
_save_request_status(request, courselike_string, -2)
return JsonResponse(
{
'ErrMsg': _('Could not find the {0} file in the package.').format(root_name),
'Stage': -2
},
status=415
)
dirpath = os.path.relpath(dirpath, data_root)
logging.debug('found %s at %s', root_name, dirpath)
log.info("Course import %s: Extracted file verified", courselike_key)
_save_request_status(request, courselike_string, 3)
with dog_stats_api.timer(
'courselike_import.time',
tags=[u"courselike:{}".format(courselike_key)]
):
courselike_items = import_func(
modulestore(), request.user.id,
settings.GITHUB_REPO_ROOT, [dirpath],
load_error_modules=False,
static_content_store=contentstore(),
target_id=courselike_key
)
new_location = courselike_items[0].location
logging.debug('new course at %s', new_location)
log.info("Course import %s: Course import successful", courselike_key)
_save_request_status(request, courselike_string, 4)
# Send errors to client with stage at which error occurred.
except Exception as exception: # pylint: disable=broad-except
log.exception(
"error importing course"
)
return JsonResponse(
{
'ErrMsg': str(exception),
'Stage': -session_status[courselike_string]
},
status=400
)
finally:
if course_dir.isdir():
shutil.rmtree(course_dir)
log.info("Course import %s: Temp data cleared", courselike_key)
# set failed stage number with negative sign in case of unsuccessful import
if session_status[courselike_string] != 4:
_save_request_status(request, courselike_string, -abs(session_status[courselike_string]))
return JsonResponse({'Status': 'OK'})
elif request.method == 'GET': # assume html
status_url = reverse_course_url(
"import_status_handler", courselike_key, kwargs={'filename': "fillerName"}
)
return render_to_response('import.html', {
return render_to_response("import.html", {
context_name: courselike_module,
'successful_import_redirect_url': successful_url,
'import_status_url': status_url,
'library': isinstance(courselike_key, LibraryLocator)
})
else:
return HttpResponseNotFound()
def _save_request_status(request, key, status):
"""
Save import status for a course in request session
"""
session_status = request.session.get('import_status')
if session_status is None:
session_status = request.session.setdefault("import_status", {})
session_status[key] = status
request.session.save()
# pylint: disable=unused-argument
@require_GET
@ensure_csrf_cookie
@login_required
@ensure_valid_course_key
def import_status_handler(request, course_key_string, filename=None):
"""
Returns an integer corresponding to the status of a file import. These are:
-X : Import unsuccessful due to some error with X as stage [0-3]
0 : No status info found (import done or upload still in progress)
1 : Extracting file
2 : Validating.
3 : Importing to mongo
4 : Import successful
"""
course_key = CourseKey.from_string(course_key_string)
if not has_course_author_access(request.user, course_key):
raise PermissionDenied()
try:
session_status = request.session["import_status"]
status = session_status[course_key_string + filename]
except KeyError:
status = 0
return JsonResponse({"ImportStatus": status})
def create_export_tarball(course_module, course_key, context):
"""
Generates the export tarball, or returns None if there was an error.
Updates the context with any error information if applicable.
"""
name = course_module.url_name
export_file = NamedTemporaryFile(prefix=name + '.', suffix=".tar.gz")
root_dir = path(mkdtemp())
try:
if isinstance(course_key, LibraryLocator):
export_library_to_xml(modulestore(), contentstore(), course_key, root_dir, name)
else:
export_course_to_xml(modulestore(), contentstore(), course_module.id, root_dir, name)
logging.debug(u'tar file being generated at %s', export_file.name)
with tarfile.open(name=export_file.name, mode='w:gz') as tar_file:
tar_file.add(root_dir / name, arcname=name)
except SerializationError as exc:
log.exception(u'There was an error exporting %s', course_key)
unit = None
failed_item = None
parent = None
try:
failed_item = modulestore().get_item(exc.location)
parent_loc = modulestore().get_parent_location(failed_item.location)
if parent_loc is not None:
parent = modulestore().get_item(parent_loc)
if parent.location.category == 'vertical':
unit = parent
except: # pylint: disable=bare-except
# if we have a nested exception, then we'll show the more generic error message
pass
context.update({
'in_err': True,
'raw_err_msg': str(exc),
'failed_module': failed_item,
'unit': unit,
'edit_unit_url': reverse_usage_url("container_handler", parent.location) if parent else "",
"successful_import_redirect_url": successful_url,
"import_status_url": reverse(
"course_import_status_handler",
kwargs={
"course_key_string": unicode(courselike_key),
"filename": "fillerName"
}
),
"import_url": reverse(
"course_import_export_handler",
kwargs={
"course_key_string": unicode(courselike_key),
}
),
"library": library
})
raise
except Exception as exc:
log.exception('There was an error exporting %s', course_key)
context.update({
'in_err': True,
'unit': None,
'raw_err_msg': str(exc)})
raise
finally:
shutil.rmtree(root_dir / name)
return export_file
def send_tarball(tarball):
"""
Renders a tarball to response, for use when sending a tar.gz file to the user.
"""
wrapper = FileWrapper(tarball)
response = HttpResponse(wrapper, content_type='application/x-tgz')
response['Content-Disposition'] = 'attachment; filename=%s' % os.path.basename(tarball.name.encode('utf-8'))
response['Content-Length'] = os.path.getsize(tarball.name)
return response
# pylint: disable=unused-argument
@ensure_csrf_cookie
@login_required
@require_http_methods(("GET",))
@ensure_valid_course_key
def export_handler(request, course_key_string):
"""
The restful handler for exporting a course.
The restful handler for the export page.
GET
html: return html page for import page
application/x-tgz: return tar.gz file containing exported course
json: not supported
"""
error = request.GET.get("error", None)
error_message = request.GET.get("error_message", None)
failed_module = request.GET.get("failed_module", None)
unit = request.GET.get("unit", None)
Note that there are 2 ways to request the tar.gz file. The request header can specify
application/x-tgz via HTTP_ACCEPT, or a query parameter can be used (?_accept=application/x-tgz).
courselike_key = CourseKey.from_string(course_key_string)
library = isinstance(courselike_key, LibraryLocator)
if library:
successful_url = reverse_library_url("library_handler", courselike_key)
courselike_module = modulestore().get_library(courselike_key)
context_name = "context_library"
else:
successful_url = reverse_course_url("course_handler", courselike_key)
courselike_module = modulestore().get_course(courselike_key)
context_name = "context_course"
If the tar.gz file has been requested but the export operation fails, an HTML page will be returned
which describes the error.
"""
course_key = CourseKey.from_string(course_key_string)
export_url = reverse_course_url('export_handler', course_key)
if not has_course_author_access(request.user, course_key):
if not has_course_author_access(request.user, courselike_key):
raise PermissionDenied()
if isinstance(course_key, LibraryLocator):
courselike_module = modulestore().get_library(course_key)
context = {
'context_library': courselike_module,
'courselike_home_url': reverse_library_url("library_handler", course_key),
'library': True
}
else:
courselike_module = modulestore().get_course(course_key)
context = {
'context_course': courselike_module,
'courselike_home_url': reverse_course_url("course_handler", course_key),
'library': False
export_url = reverse(
"course_import_export_handler",
kwargs={
"course_key_string": unicode(courselike_key),
}
) + "?accept=application/x-tgz"
context['export_url'] = export_url + '?_accept=application/x-tgz'
# an _accept URL parameter will be preferred over HTTP_ACCEPT in the header.
requested_format = request.REQUEST.get('_accept', request.META.get('HTTP_ACCEPT', 'text/html'))
export_url += "&{0}".format(
urlencode({
"redirect": reverse_course_url(
"export_handler",
unicode(courselike_key)
)
})
)
if 'application/x-tgz' in requested_format:
if unit:
try:
tarball = create_export_tarball(courselike_module, course_key, context)
except SerializationError:
return render_to_response('export.html', context)
return send_tarball(tarball)
edit_unit_url = reverse_usage_url("container_handler", unit)
except (InvalidKeyError, AttributeError):
log.error("Invalid parent key supplied to export view: %s", unit)
elif 'text/html' in requested_format:
return render_to_response('export.html', context)
return render_to_response("export.html", {
context_name: courselike_module,
"export_url": export_url,
"raw_err_msg": _(
"An invalid parent key was supplied: \"{supplied_key}\" "
"is not a valid course unit."
).format(supplied_key=unit),
"library": library
})
else:
edit_unit_url = ""
if error:
return render_to_response('export.html', {
context_name: courselike_module,
"export_url": export_url,
"in_err": error,
"unit": unit,
"failed_module": failed_module,
"edit_unit_url": edit_unit_url,
"courselike_home_url": successful_url,
"raw_err_msg": error_message,
"library": library
})
else:
# Only HTML or x-tgz request formats are supported (no JSON).
return HttpResponse(status=406)
return render_to_response("export.html", {
context_name: courselike_module,
"export_url": export_url,
"library": library
})
"""
Unit tests for course import and export
"""
import copy
import json
import logging
import lxml
import os
import shutil
import tarfile
import tempfile
from path import Path as path
from uuid import uuid4
from django.test.utils import override_settings
from django.conf import settings
from xmodule.contentstore.django import contentstore
from xmodule.modulestore.xml_exporter import export_library_to_xml
from xmodule.modulestore.xml_importer import import_library_from_xml
from xmodule.modulestore import LIBRARY_ROOT
from contentstore.utils import reverse_course_url
from xmodule.modulestore.tests.factories import ItemFactory, LibraryFactory
from contentstore.tests.utils import CourseTestCase
from openedx.core.lib.extract_tar import safetar_extractall
from student import auth
from student.roles import CourseInstructorRole, CourseStaffRole
TEST_DATA_CONTENTSTORE = copy.deepcopy(settings.CONTENTSTORE)
TEST_DATA_CONTENTSTORE['DOC_STORE_CONFIG']['db'] = 'test_xcontent_%s' % uuid4().hex
TEST_DATA_DIR = settings.COMMON_TEST_DATA_ROOT
from contentstore.utils import reverse_course_url
log = logging.getLogger(__name__)
@override_settings(CONTENTSTORE=TEST_DATA_CONTENTSTORE)
class ImportTestCase(CourseTestCase):
"""
Unit tests for importing a course or Library
"""
def setUp(self):
super(ImportTestCase, self).setUp()
self.url = reverse_course_url('import_handler', self.course.id)
self.content_dir = path(tempfile.mkdtemp())
self.addCleanup(shutil.rmtree, self.content_dir)
def touch(name):
""" Equivalent to shell's 'touch'"""
with file(name, 'a'):
os.utime(name, None)
# Create tar test files -----------------------------------------------
# OK course:
good_dir = tempfile.mkdtemp(dir=self.content_dir)
# test course being deeper down than top of tar file
embedded_dir = os.path.join(good_dir, "grandparent", "parent")
os.makedirs(os.path.join(embedded_dir, "course"))
with open(os.path.join(embedded_dir, "course.xml"), "w+") as f:
f.write('<course url_name="2013_Spring" org="EDx" course="0.00x"/>')
with open(os.path.join(embedded_dir, "course", "2013_Spring.xml"), "w+") as f:
f.write('<course></course>')
self.good_tar = os.path.join(self.content_dir, "good.tar.gz")
with tarfile.open(self.good_tar, "w:gz") as gtar:
gtar.add(good_dir)
# Bad course (no 'course.xml' file):
bad_dir = tempfile.mkdtemp(dir=self.content_dir)
touch(os.path.join(bad_dir, "bad.xml"))
self.bad_tar = os.path.join(self.content_dir, "bad.tar.gz")
with tarfile.open(self.bad_tar, "w:gz") as btar:
btar.add(bad_dir)
self.unsafe_common_dir = path(tempfile.mkdtemp(dir=self.content_dir))
def test_no_coursexml(self):
"""
Check that the response for a tar.gz import without a course.xml is
correct.
"""
with open(self.bad_tar) as btar:
resp = self.client.post(
self.url,
{
"name": self.bad_tar,
"course-data": [btar]
})
self.assertEquals(resp.status_code, 415)
# Check that `import_status` returns the appropriate stage (i.e., the
# stage at which import failed).
resp_status = self.client.get(
reverse_course_url(
'import_status_handler',
self.course.id,
kwargs={'filename': os.path.split(self.bad_tar)[1]}
)
)
self.assertEquals(json.loads(resp_status.content)["ImportStatus"], -2)
def test_with_coursexml(self):
"""
Check that the response for a tar.gz import with a course.xml is
correct.
"""
with open(self.good_tar) as gtar:
args = {"name": self.good_tar, "course-data": [gtar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 200)
def test_import_in_existing_course(self):
"""
Check that course is imported successfully in existing course and users have their access roles
"""
# Create a non_staff user and add it to course staff only
__, nonstaff_user = self.create_non_staff_authed_user_client()
auth.add_users(self.user, CourseStaffRole(self.course.id), nonstaff_user)
course = self.store.get_course(self.course.id)
self.assertIsNotNone(course)
display_name_before_import = course.display_name
# Check that global staff user can import course
with open(self.good_tar) as gtar:
args = {"name": self.good_tar, "course-data": [gtar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 200)
course = self.store.get_course(self.course.id)
self.assertIsNotNone(course)
display_name_after_import = course.display_name
# Check that course display name have changed after import
self.assertNotEqual(display_name_before_import, display_name_after_import)
# Now check that non_staff user has his same role
self.assertFalse(CourseInstructorRole(self.course.id).has_user(nonstaff_user))
self.assertTrue(CourseStaffRole(self.course.id).has_user(nonstaff_user))
# Now course staff user can also successfully import course
self.client.login(username=nonstaff_user.username, password='foo')
with open(self.good_tar) as gtar:
args = {"name": self.good_tar, "course-data": [gtar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 200)
# Now check that non_staff user has his same role
self.assertFalse(CourseInstructorRole(self.course.id).has_user(nonstaff_user))
self.assertTrue(CourseStaffRole(self.course.id).has_user(nonstaff_user))
## Unsafe tar methods #####################################################
# Each of these methods creates a tarfile with a single type of unsafe
# content.
def _fifo_tar(self):
"""
Tar file with FIFO
"""
fifop = self.unsafe_common_dir / "fifo.file"
fifo_tar = self.unsafe_common_dir / "fifo.tar.gz"
os.mkfifo(fifop)
with tarfile.open(fifo_tar, "w:gz") as tar:
tar.add(fifop)
return fifo_tar
def _symlink_tar(self):
"""
Tarfile with symlink to path outside directory.
"""
outsidep = self.unsafe_common_dir / "unsafe_file.txt"
symlinkp = self.unsafe_common_dir / "symlink.txt"
symlink_tar = self.unsafe_common_dir / "symlink.tar.gz"
outsidep.symlink(symlinkp)
with tarfile.open(symlink_tar, "w:gz") as tar:
tar.add(symlinkp)
return symlink_tar
def _outside_tar(self):
"""
Tarfile with file that extracts to outside directory.
Extracting this tarfile in directory <dir> will put its contents
directly in <dir> (rather than <dir/tarname>).
"""
outside_tar = self.unsafe_common_dir / "unsafe_file.tar.gz"
with tarfile.open(outside_tar, "w:gz") as tar:
tar.addfile(tarfile.TarInfo(str(self.content_dir / "a_file")))
return outside_tar
def _outside_tar2(self):
"""
Tarfile with file that extracts to outside directory.
The path here matches the basename (`self.unsafe_common_dir`), but
then "cd's out". E.g. "/usr/../etc" == "/etc", but the naive basename
of the first (but not the second) is "/usr"
Extracting this tarfile in directory <dir> will also put its contents
directly in <dir> (rather than <dir/tarname>).
"""
outside_tar = self.unsafe_common_dir / "unsafe_file.tar.gz"
with tarfile.open(outside_tar, "w:gz") as tar:
tar.addfile(tarfile.TarInfo(str(self.unsafe_common_dir / "../a_file")))
return outside_tar
def _edx_platform_tar(self):
"""
Tarfile with file that extracts to edx-platform directory.
Extracting this tarfile in directory <dir> will also put its contents
directly in <dir> (rather than <dir/tarname>).
"""
outside_tar = self.unsafe_common_dir / "unsafe_file.tar.gz"
with tarfile.open(outside_tar, "w:gz") as tar:
tar.addfile(tarfile.TarInfo(os.path.join(os.path.abspath("."), "a_file")))
return outside_tar
def test_unsafe_tar(self):
"""
Check that safety measure work.
This includes:
'tarbombs' which include files or symlinks with paths
outside or directly in the working directory,
'special files' (character device, block device or FIFOs),
all raise exceptions/400s.
"""
def try_tar(tarpath):
""" Attempt to tar an unacceptable file """
with open(tarpath) as tar:
args = {"name": tarpath, "course-data": [tar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 400)
self.assertTrue("SuspiciousFileOperation" in resp.content)
try_tar(self._fifo_tar())
try_tar(self._symlink_tar())
try_tar(self._outside_tar())
try_tar(self._outside_tar2())
try_tar(self._edx_platform_tar())
# test trying to open a tar outside of the normal data directory
with self.settings(DATA_DIR='/not/the/data/dir'):
try_tar(self._edx_platform_tar())
# Check that `import_status` returns the appropriate stage (i.e.,
# either 3, indicating all previous steps are completed, or 0,
# indicating no upload in progress)
resp_status = self.client.get(
reverse_course_url(
'import_status_handler',
self.course.id,
kwargs={'filename': os.path.split(self.good_tar)[1]}
)
)
import_status = json.loads(resp_status.content)["ImportStatus"]
self.assertIn(import_status, (0, 3))
def test_library_import(self):
"""
Try importing a known good library archive, and verify that the
contents of the library have completely replaced the old contents.
"""
# Create some blocks to overwrite
library = LibraryFactory.create(modulestore=self.store)
lib_key = library.location.library_key
test_block = ItemFactory.create(
category="vertical",
parent_location=library.location,
user_id=self.user.id,
publish_item=False,
)
test_block2 = ItemFactory.create(
category="vertical",
parent_location=library.location,
user_id=self.user.id,
publish_item=False
)
# Create a library and blocks that should remain unmolested.
unchanged_lib = LibraryFactory.create()
unchanged_key = unchanged_lib.location.library_key
test_block3 = ItemFactory.create(
category="vertical",
parent_location=unchanged_lib.location,
user_id=self.user.id,
publish_item=False
)
test_block4 = ItemFactory.create(
category="vertical",
parent_location=unchanged_lib.location,
user_id=self.user.id,
publish_item=False
)
# Refresh library.
library = self.store.get_library(lib_key)
children = [self.store.get_item(child).url_name for child in library.children]
self.assertEqual(len(children), 2)
self.assertIn(test_block.url_name, children)
self.assertIn(test_block2.url_name, children)
unchanged_lib = self.store.get_library(unchanged_key)
children = [self.store.get_item(child).url_name for child in unchanged_lib.children]
self.assertEqual(len(children), 2)
self.assertIn(test_block3.url_name, children)
self.assertIn(test_block4.url_name, children)
extract_dir = path(tempfile.mkdtemp(dir=settings.DATA_DIR))
# the extract_dir needs to be passed as a relative dir to
# import_library_from_xml
extract_dir_relative = path.relpath(extract_dir, settings.DATA_DIR)
try:
with tarfile.open(path(TEST_DATA_DIR) / 'imports' / 'library.HhJfPD.tar.gz') as tar:
safetar_extractall(tar, extract_dir)
library_items = import_library_from_xml(
self.store,
self.user.id,
settings.GITHUB_REPO_ROOT,
[extract_dir_relative / 'library'],
load_error_modules=False,
static_content_store=contentstore(),
target_id=lib_key
)
finally:
shutil.rmtree(extract_dir)
self.assertEqual(lib_key, library_items[0].location.library_key)
library = self.store.get_library(lib_key)
children = [self.store.get_item(child).url_name for child in library.children]
self.assertEqual(len(children), 3)
self.assertNotIn(test_block.url_name, children)
self.assertNotIn(test_block2.url_name, children)
unchanged_lib = self.store.get_library(unchanged_key)
children = [self.store.get_item(child).url_name for child in unchanged_lib.children]
self.assertEqual(len(children), 2)
self.assertIn(test_block3.url_name, children)
self.assertIn(test_block4.url_name, children)
@override_settings(CONTENTSTORE=TEST_DATA_CONTENTSTORE)
class ExportTestCase(CourseTestCase):
class ImportExportTestCase(CourseTestCase):
"""
Tests for export_handler.
"""
......@@ -356,114 +17,34 @@ class ExportTestCase(CourseTestCase):
"""
Sets up the test course.
"""
super(ExportTestCase, self).setUp()
self.url = reverse_course_url('export_handler', self.course.id)
def test_export_html(self):
"""
Get the HTML for the page.
"""
resp = self.client.get_html(self.url)
self.assertEquals(resp.status_code, 200)
self.assertContains(resp, "Export My Course Content")
def test_export_json_unsupported(self):
"""
JSON is unsupported.
"""
resp = self.client.get(self.url, HTTP_ACCEPT='application/json')
self.assertEquals(resp.status_code, 406)
super(ImportExportTestCase, self).setUp()
self.import_url = reverse_course_url('import_handler', self.course.id)
self.export_url = reverse_course_url('export_handler', self.course.id)
def test_export_targz(self):
def test_import_html(self):
"""
Get tar.gz file, using HTTP_ACCEPT.
Get the HTML for the import page.
"""
resp = self.client.get(self.url, HTTP_ACCEPT='application/x-tgz')
self._verify_export_succeeded(resp)
def test_export_targz_urlparam(self):
"""
Get tar.gz file, using URL parameter.
"""
resp = self.client.get(self.url + '?_accept=application/x-tgz')
self._verify_export_succeeded(resp)
def _verify_export_succeeded(self, resp):
""" Export success helper method. """
resp = self.client.get_html(self.import_url)
self.assertEquals(resp.status_code, 200)
self.assertTrue(resp.get('Content-Disposition').startswith('attachment'))
self.assertContains(resp, "Replace Your Course Content")
def test_export_failure_top_level(self):
"""
Export failure.
"""
fake_xblock = ItemFactory.create(parent_location=self.course.location, category='aawefawef')
self.store.publish(fake_xblock.location, self.user.id)
self._verify_export_failure(u'/container/{}'.format(self.course.location))
def test_export_failure_subsection_level(self):
def test_export_html(self):
"""
Slightly different export failure.
Get the HTML for the export page.
"""
vertical = ItemFactory.create(parent_location=self.course.location, category='vertical', display_name='foo')
ItemFactory.create(
parent_location=vertical.location,
category='aawefawef'
)
self._verify_export_failure(u'/container/{}'.format(vertical.location))
def _verify_export_failure(self, expected_text):
""" Export failure helper method. """
resp = self.client.get(self.url, HTTP_ACCEPT='application/x-tgz')
resp = self.client.get_html(self.export_url)
self.assertEquals(resp.status_code, 200)
self.assertIsNone(resp.get('Content-Disposition'))
self.assertContains(resp, 'Unable to create xml for module')
self.assertContains(resp, expected_text)
def test_library_export(self):
"""
Verify that useable library data can be exported.
"""
youtube_id = "qS4NO9MNC6w"
library = LibraryFactory.create(modulestore=self.store)
video_block = ItemFactory.create(
category="video",
parent_location=library.location,
user_id=self.user.id,
publish_item=False,
youtube_id_1_0=youtube_id
)
name = library.url_name
lib_key = library.location.library_key
root_dir = path(tempfile.mkdtemp())
try:
export_library_to_xml(self.store, contentstore(), lib_key, root_dir, name)
lib_xml = lxml.etree.XML(open(root_dir / name / LIBRARY_ROOT).read())
self.assertEqual(lib_xml.get('org'), lib_key.org)
self.assertEqual(lib_xml.get('library'), lib_key.library)
block = lib_xml.find('video')
self.assertIsNotNone(block)
self.assertEqual(block.get('url_name'), video_block.url_name)
video_xml = lxml.etree.XML(open(root_dir / name / 'video' / video_block.url_name + '.xml').read())
self.assertEqual(video_xml.tag, 'video')
self.assertEqual(video_xml.get('youtube_id_1_0'), youtube_id)
finally:
shutil.rmtree(root_dir / name)
self.assertContains(resp, "Export My Course Content")
def test_export_success_with_custom_tag(self):
def test_permission_denied(self):
"""
Verify that course export with customtag
Test if the views handle unauthorized requests properly
"""
xml_string = '<impl>slides</impl>'
vertical = ItemFactory.create(
parent_location=self.course.location, category='vertical', display_name='foo'
)
ItemFactory.create(
parent_location=vertical.location,
category='customtag',
display_name='custom_tag_foo',
data=xml_string
# pylint: disable=unused-variable
client, user = self.create_non_staff_authed_user_client(
authenticate=True
)
self.test_export_targz_urlparam()
for url in [self.import_url, self.export_url]:
resp = client.get(url)
self.assertEquals(resp.status_code, 403)
......@@ -319,6 +319,19 @@ SESSION_INACTIVITY_TIMEOUT_IN_SECONDS = AUTH_TOKENS.get("SESSION_INACTIVITY_TIME
##### X-Frame-Options response header settings #####
X_FRAME_OPTIONS = ENV_TOKENS.get('X_FRAME_OPTIONS', X_FRAME_OPTIONS)
##### OAUTH2 Provider ##############
if FEATURES.get('ENABLE_OAUTH2_PROVIDER'):
OAUTH_OIDC_ISSUER_PATH = ENV_TOKENS.get('OAUTH_OIDC_ISSUER_PATH', 'oauth2')
OAUTH_OIDC_ISSUER = ENV_TOKENS.get(
'OAUTH_OIDC_ISSUER',
'https://{0}/{1}'.format(
SITE_NAME,
OAUTH_OIDC_ISSUER_PATH
)
)
OAUTH_ENFORCE_SECURE = ENV_TOKENS.get('OAUTH_ENFORCE_SECURE', True)
OAUTH_ENFORCE_CLIENT_SECURE = ENV_TOKENS.get('OAUTH_ENFORCE_CLIENT_SECURE', True)
##### ADVANCED_SECURITY_CONFIG #####
ADVANCED_SECURITY_CONFIG = ENV_TOKENS.get('ADVANCED_SECURITY_CONFIG', {})
......
......@@ -72,6 +72,9 @@ FEATURES = {
'AUTH_USE_CERTIFICATES': False,
# Toggles OAuth2 authentication provider
'ENABLE_OAUTH2_PROVIDER': False,
# email address for studio staff (eg to request course creation)
'STUDIO_REQUEST_EMAIL': '',
......@@ -209,6 +212,29 @@ sys.path.append(COMMON_ROOT / 'djangoapps')
GEOIP_PATH = REPO_ROOT / "common/static/data/geoip/GeoIP.dat"
GEOIPV6_PATH = REPO_ROOT / "common/static/data/geoip/GeoIPv6.dat"
############################ OAUTH2 Provider ###################################
# OpenID Connect issuer ID. Normally the URL of the authentication endpoint.
OAUTH_OIDC_ISSUER_PATH = 'oauth2'
OAUTH_OIDC_ISSUER = 'https:/example.com/oauth2'
# OpenID Connect claim handlers
OAUTH_OIDC_ID_TOKEN_HANDLERS = (
'oauth2_provider.oidc.handlers.BasicIDTokenHandler',
'oauth2_provider.oidc.handlers.ProfileHandler',
'oauth2_provider.oidc.handlers.EmailHandler',
'oauth2_handler.IDTokenHandler'
)
OAUTH_OIDC_USERINFO_HANDLERS = (
'oauth2_provider.oidc.handlers.BasicUserInfoHandler',
'oauth2_provider.oidc.handlers.ProfileHandler',
'oauth2_provider.oidc.handlers.EmailHandler',
'oauth2_handler.UserInfoHandler'
)
############################# WEB CONFIGURATION #############################
# This is where we stick our compiled template files.
import tempfile
......@@ -254,7 +280,8 @@ LMS_BASE = None
# These are standard regexes for pulling out info like course_ids, usage_ids, etc.
# They are used so that URLs with deprecated-format strings still work.
from lms.envs.common import (
COURSE_KEY_PATTERN, COURSE_ID_PATTERN, USAGE_KEY_PATTERN, ASSET_KEY_PATTERN
COURSE_KEY_PATTERN, COURSELIKE_KEY_PATTERN, COURSE_ID_PATTERN,
USAGE_KEY_PATTERN, ASSET_KEY_PATTERN
)
######################### CSRF #########################################
......@@ -750,6 +777,11 @@ INSTALLED_APPS = (
# Theming
'openedx.core.djangoapps.theming',
# OAuth2 Provider
'provider',
'provider.oauth2',
'oauth2_provider',
# comment common
'django_comment_common',
......@@ -786,6 +818,10 @@ INSTALLED_APPS = (
# Credit courses
'openedx.core.djangoapps.credit',
# Import/Export API
'rest_framework',
'openedx.core.djangoapps.import_export',
'xblock_django',
# edX Proctoring
......
......@@ -33,6 +33,30 @@ else:
require(["js/factories/export"], function(ExportFactory) {
ExportFactory(hasUnit, editUnitUrl, courselikeHomeUrl, is_library, errMsg);
});
## Even though there isn't an export error, we should still show contextual
## error popups if supplied.
%elif raw_err_msg:
var errMsg = ${json.dumps(raw_err_msg)};
require(['gettext', 'js/views/feedback_prompt'], function(gettext, PromptView) {
dialog = new PromptView({
title: gettext('There has been an error.'),
message: errMsg,
intent: 'error',
actions: {
primary: {
text: gettext('Continue'),
click: function(view) {
view.hide();
}
}
}
});
$('body').addClass('js');
dialog.show();
});
%endif
</%block>
......
......@@ -53,7 +53,7 @@ else:
</div>
<form id="fileupload" method="post" enctype="multipart/form-data" class="import-form">
<form id="fileupload" method="post" action="${import_url}" enctype="multipart/form-data" class="import-form">
## Translators: ".tar.gz" is a file extension, and files with that extension are called "gzipped tar files": these terms should not be translated
<h2 class="title">
......
......@@ -7,10 +7,6 @@ admin.autodiscover()
# pylint: disable=bad-continuation
# Pattern to match a course key or a library key
COURSELIKE_KEY_PATTERN = r'(?P<course_key_string>({}|{}))'.format(
r'[^/]+/[^/]+/[^/]+', r'[^/:]+:[^/+]+\+[^/+]+(\+[^/]+)?'
)
# Pattern to match a library key only
LIBRARY_KEY_PATTERN = r'(?P<library_key_string>library-v1:[^/+]+\+[^/+]+)'
......@@ -74,7 +70,7 @@ urlpatterns += patterns(
url(r'^signin$', 'login_page', name='login'),
url(r'^request_course_creator$', 'request_course_creator'),
url(r'^course_team/{}(?:/(?P<email>.+))?$'.format(COURSELIKE_KEY_PATTERN), 'course_team_handler'),
url(r'^course_team/{}(?:/(?P<email>.+))?$'.format(settings.COURSELIKE_KEY_PATTERN), 'course_team_handler'),
url(r'^course_info/{}$'.format(settings.COURSE_KEY_PATTERN), 'course_info_handler'),
url(
r'^course_info_update/{}/(?P<provided_id>\d+)?$'.format(settings.COURSE_KEY_PATTERN),
......@@ -94,9 +90,8 @@ urlpatterns += patterns(
url(r'^checklists/{}/(?P<checklist_index>\d+)?$'.format(settings.COURSE_KEY_PATTERN), 'checklists_handler'),
url(r'^orphan/{}$'.format(settings.COURSE_KEY_PATTERN), 'orphan_handler'),
url(r'^assets/{}/{}?$'.format(settings.COURSE_KEY_PATTERN, settings.ASSET_KEY_PATTERN), 'assets_handler'),
url(r'^import/{}$'.format(COURSELIKE_KEY_PATTERN), 'import_handler'),
url(r'^import_status/{}/(?P<filename>.+)$'.format(COURSELIKE_KEY_PATTERN), 'import_status_handler'),
url(r'^export/{}$'.format(COURSELIKE_KEY_PATTERN), 'export_handler'),
url(r'^import/{}$'.format(settings.COURSELIKE_KEY_PATTERN), 'import_handler'),
url(r'^export/{}$'.format(settings.COURSELIKE_KEY_PATTERN), 'export_handler'),
url(r'^xblock/outline/{}$'.format(settings.USAGE_KEY_PATTERN), 'xblock_outline_handler'),
url(r'^xblock/container/{}$'.format(settings.USAGE_KEY_PATTERN), 'xblock_container_handler'),
url(r'^xblock/{}/(?P<view_name>[^/]+)$'.format(settings.USAGE_KEY_PATTERN), 'xblock_view_handler'),
......@@ -112,7 +107,11 @@ urlpatterns += patterns(
url(r'^group_configurations/{}$'.format(settings.COURSE_KEY_PATTERN), 'group_configurations_list_handler'),
url(r'^group_configurations/{}/(?P<group_configuration_id>\d+)(/)?(?P<group_id>\d+)?$'.format(
settings.COURSE_KEY_PATTERN), 'group_configurations_detail_handler'),
url(r'^api/val/v0/', include('edxval.urls')),
# Import/Export API
url(r'^api/import_export/v1/', include('openedx.core.djangoapps.import_export.urls')),
)
JS_INFO_DICT = {
......@@ -156,6 +155,12 @@ if settings.FEATURES.get('AUTH_USE_CAS'):
url(r'^cas-auth/logout/$', 'django_cas.views.logout', {'next_page': '/'}, name="cas-logout"),
)
if settings.FEATURES.get('ENABLE_OAUTH2_PROVIDER'):
urlpatterns += (
url(r'^oauth2/', include('oauth2_provider.urls', namespace='oauth2')),
)
urlpatterns += patterns('', url(r'^admin/', include(admin.site.urls)),)
# enable automatic login
......
......@@ -138,6 +138,10 @@ DEFAULT_COURSE_ABOUT_IMAGE_URL = ENV_TOKENS.get('DEFAULT_COURSE_ABOUT_IMAGE_URL'
MEDIA_ROOT = ENV_TOKENS.get('MEDIA_ROOT', MEDIA_ROOT)
MEDIA_URL = ENV_TOKENS.get('MEDIA_URL', MEDIA_URL)
# GITHUB_REPO_ROOT is the base directory
# for course data
GITHUB_REPO_ROOT = ENV_TOKENS.get('GITHUB_REPO_ROOT', GITHUB_REPO_ROOT)
PLATFORM_NAME = ENV_TOKENS.get('PLATFORM_NAME', PLATFORM_NAME)
# For displaying on the receipt. At Stanford PLATFORM_NAME != MERCHANT_NAME, but PLATFORM_NAME is a fine default
PLATFORM_TWITTER_ACCOUNT = ENV_TOKENS.get('PLATFORM_TWITTER_ACCOUNT', PLATFORM_TWITTER_ACCOUNT)
......@@ -585,7 +589,14 @@ if FEATURES.get('ENABLE_THIRD_PARTY_AUTH'):
##### OAUTH2 Provider ##############
if FEATURES.get('ENABLE_OAUTH2_PROVIDER'):
OAUTH_OIDC_ISSUER = ENV_TOKENS['OAUTH_OIDC_ISSUER']
OAUTH_OIDC_ISSUER_PATH = ENV_TOKENS.get('OAUTH_OIDC_ISSUER_PATH', 'oauth2')
OAUTH_OIDC_ISSUER = ENV_TOKENS.get(
'OAUTH_OIDC_ISSUER',
'https://{0}/{1}'.format(
SITE_NAME,
OAUTH_OIDC_ISSUER_PATH
)
)
OAUTH_ENFORCE_SECURE = ENV_TOKENS.get('OAUTH_ENFORCE_SECURE', True)
OAUTH_ENFORCE_CLIENT_SECURE = ENV_TOKENS.get('OAUTH_ENFORCE_CLIENT_SECURE', True)
......
......@@ -400,6 +400,9 @@ FEATURES = {
# Credit course API
'ENABLE_CREDIT_API': True,
# Full Course/Library Import/Export API
'ENABLE_IMPORT_EXPORT_LMS': False,
# The block types to disable need to be specified in "x block disable config" in django admin.
'ENABLE_DISABLING_XBLOCK_TYPES': True,
......@@ -428,6 +431,7 @@ PROJECT_ROOT = path(__file__).abspath().dirname().dirname() # /edx-platform/lms
REPO_ROOT = PROJECT_ROOT.dirname()
COMMON_ROOT = REPO_ROOT / "common"
ENV_ROOT = REPO_ROOT.dirname() # virtualenv dir /edx-platform is in
GITHUB_REPO_ROOT = ENV_ROOT / "data"
COURSES_ROOT = ENV_ROOT / "data"
DATA_DIR = COURSES_ROOT
......@@ -465,6 +469,7 @@ OPENID_PROVIDER_TRUSTED_ROOTS = ['cs50.net', '*.cs50.net']
# OpenID Connect issuer ID. Normally the URL of the authentication endpoint.
OAUTH_OIDC_ISSUER_PATH = 'oauth2'
OAUTH_OIDC_ISSUER = 'https:/example.com/oauth2'
# OpenID Connect claim handlers
......@@ -587,6 +592,12 @@ COURSE_KEY_PATTERN = r'(?P<course_key_string>[^/+]+(/|\+)[^/+]+(/|\+)[^/?]+)'
COURSE_ID_PATTERN = COURSE_KEY_PATTERN.replace('course_key_string', 'course_id')
COURSE_KEY_REGEX = COURSE_KEY_PATTERN.replace('P<course_key_string>', ':')
# Pattern to match a course key or a library key
COURSELIKE_KEY_PATTERN = r'(?P<course_key_string>({}|{}))'.format(
r'[^/:+]+/[^/:+]+/[^/:+]+',
r'[^/:]+:[^/+]+\+[^/+]+(\+[^/]+)?',
)
USAGE_KEY_PATTERN = r'(?P<usage_key_string>(?:i4x://?[^/]+/[^/]+/[^/]+/[^@]+(?:@[^/]+)?)|(?:[^/]+))'
ASSET_KEY_PATTERN = r'(?P<asset_key_string>(?:/?c4x(:/)?/[^/]+/[^/]+/[^/]+/[^@]+(?:@[^/]+)?)|(?:[^/]+))'
USAGE_ID_PATTERN = r'(?P<usage_id>(?:i4x://?[^/]+/[^/]+/[^/]+/[^@]+(?:@[^/]+)?)|(?:[^/]+))'
......@@ -1963,6 +1974,9 @@ INSTALLED_APPS = (
# Course teams
'teams',
# Import/Export API
'openedx.core.djangoapps.import_export',
'xblock_django',
)
......
......@@ -110,7 +110,7 @@ DATA_DIR = COURSES_ROOT
COMMON_TEST_DATA_ROOT = COMMON_ROOT / "test" / "data"
# Where the content data is checked out. This may not exist on jenkins.
GITHUB_REPO_ROOT = ENV_ROOT / "data"
GITHUB_REPO_ROOT = TEST_ROOT / "data"
USE_I18N = True
LANGUAGE_CODE = 'en' # tests assume they will get English.
......@@ -532,3 +532,6 @@ AUTHENTICATION_BACKENDS += ('lti_provider.users.LtiBackend',)
# ORGANIZATIONS
FEATURES['ORGANIZATIONS_APP'] = True
# Enable the Full Course/Library Import/Export API
FEATURES['ENABLE_IMPORT_EXPORT_LMS'] = True
......@@ -94,6 +94,12 @@ urlpatterns = (
url(r'^api/commerce/', include('commerce.api.urls', namespace='commerce_api')),
)
# Full Course/Library Import/Export API
if settings.FEATURES["ENABLE_IMPORT_EXPORT_LMS"]:
urlpatterns += (
url(r'^api/import_export/v1/', include('openedx.core.djangoapps.import_export.urls')),
)
if settings.FEATURES["ENABLE_COMBINED_LOGIN_REGISTRATION"]:
# Backwards compatibility with old URL structure, but serve the new views
urlpatterns += (
......
"""
A models.py is required to make this an app (until we move to Django 1.7)
"""
"""
Unit tests for course import and export
"""
import copy
import json
import logging
import lxml
import os
import tarfile
import tempfile
from path import path # pylint: disable=no-name-in-module
from uuid import uuid4
from django.test.utils import override_settings
from django.conf import settings
from xmodule.contentstore.django import contentstore
from xmodule.modulestore.xml_exporter import export_library_to_xml
from xmodule.modulestore.xml_importer import import_library_from_xml
from xmodule.modulestore import LIBRARY_ROOT
from django.core.urlresolvers import reverse
from xmodule.modulestore.tests.factories import ItemFactory, LibraryFactory
from .utils import CourseTestCase
from openedx.core.lib.extract_tar import safetar_extractall
from openedx.core.lib.tempdir import mkdtemp_clean
from student import auth
from student.roles import CourseInstructorRole, CourseStaffRole
TEST_DATA_CONTENTSTORE = copy.deepcopy(settings.CONTENTSTORE)
TEST_DATA_CONTENTSTORE['DOC_STORE_CONFIG']['db'] = 'test_xcontent_{}'.format(
uuid4().hex
)
TEST_DATA_DIR = settings.COMMON_TEST_DATA_ROOT
log = logging.getLogger(__name__)
def course_url(handler, course_key, **kwargs):
"""
Reverse a handler that uses a course key.
:param handler: a URL handler name
:param course_key: a CourseKey
:return: the reversed URL string of the handler with the given course key
"""
kwargs_for_reverse = {'course_key_string': course_key.id}
if kwargs:
kwargs_for_reverse.update(kwargs)
return reverse(
handler,
kwargs=kwargs_for_reverse
)
@override_settings(CONTENTSTORE=TEST_DATA_CONTENTSTORE)
class ImportTestCase(CourseTestCase):
"""
Unit tests for importing a course or library
"""
def setUp(self):
super(ImportTestCase, self).setUp()
self.url = course_url('course_import_export_handler', self.course)
self.content_dir = path(mkdtemp_clean())
# Create tar test files -----------------------------------------------
# OK course:
good_dir = tempfile.mkdtemp(dir=self.content_dir)
# test course being deeper down than top of tar file
embedded_dir = os.path.join(good_dir, "grandparent", "parent")
os.makedirs(os.path.join(embedded_dir, "course"))
with open(os.path.join(embedded_dir, "course.xml"), "w+") as f:
f.write('<course url_name="2013_Spring" org="EDx" course="0.00x"/>')
with open(os.path.join(embedded_dir, "course", "2013_Spring.xml"), "w+") as f:
f.write('<course></course>')
self.good_tar = os.path.join(self.content_dir, "good.tar.gz")
with tarfile.open(self.good_tar, "w:gz") as gtar:
gtar.add(good_dir)
# Bad course (no 'course.xml' file):
bad_dir = tempfile.mkdtemp(dir=self.content_dir)
path.joinpath(bad_dir, "bad.xml").touch()
self.bad_tar = os.path.join(self.content_dir, "bad.tar.gz")
with tarfile.open(self.bad_tar, "w:gz") as btar:
btar.add(bad_dir)
self.unsafe_common_dir = path(tempfile.mkdtemp(dir=self.content_dir))
def test_no_coursexml(self):
"""
Check that the response for a tar.gz import without a course.xml is
correct.
"""
with open(self.bad_tar) as btar:
resp = self.client.post(
self.url,
{
"name": self.bad_tar,
"course-data": [btar]
})
self.assertEquals(resp.status_code, 415)
# Check that `ImportStatus` returns the appropriate stage (i.e., the
# stage at which import failed).
resp_status = self.client.get(
course_url(
'course_import_status_handler',
self.course,
filename=os.path.split(self.bad_tar)[1]
)
)
obj = json.loads(resp_status.content)
self.assertIn("ImportStatus", obj)
self.assertEquals(obj["ImportStatus"], -2)
def test_with_coursexml(self):
"""
Check that the response for a tar.gz import with a course.xml is
correct.
"""
with open(self.good_tar) as gtar:
args = {"name": self.good_tar, "course-data": [gtar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 200)
def test_import_in_existing_course(self):
"""
Check that course is imported successfully in existing course and users
have their access roles
"""
# Create a non_staff user and add it to course staff only
__, nonstaff_user = self.create_non_staff_authed_user_client()
auth.add_users(
self.user,
CourseStaffRole(self.course.id),
nonstaff_user
)
course = self.store.get_course(self.course.id)
self.assertIsNotNone(course)
display_name_before_import = course.display_name
# Check that global staff user can import course
with open(self.good_tar) as gtar:
args = {"name": self.good_tar, "course-data": [gtar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 200)
course = self.store.get_course(self.course.id)
self.assertIsNotNone(course)
display_name_after_import = course.display_name
# Check that course display name have changed after import
self.assertNotEqual(
display_name_before_import,
display_name_after_import
)
# Now check that non_staff user has his same role
self.assertFalse(
CourseInstructorRole(self.course.id).has_user(nonstaff_user)
)
self.assertTrue(
CourseStaffRole(self.course.id).has_user(nonstaff_user)
)
# Now course staff user can also successfully import course
self.client.login(username=nonstaff_user.username, password='foo')
with open(self.good_tar) as gtar:
args = {"name": self.good_tar, "course-data": [gtar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 200)
# Now check that non_staff user has his same role
self.assertFalse(
CourseInstructorRole(self.course.id).has_user(nonstaff_user)
)
self.assertTrue(
CourseStaffRole(self.course.id).has_user(nonstaff_user)
)
## Unsafe tar methods #####################################################
# Each of these methods creates a tarfile with a single type of unsafe
# content.
def _create_tar_with_fifo(self):
"""
Tar file with FIFO
"""
fifop = self.unsafe_common_dir / "fifo.file"
fifo_tar = self.unsafe_common_dir / "fifo.tar.gz"
os.mkfifo(fifop)
with tarfile.open(fifo_tar, "w:gz") as tar:
tar.add(fifop)
return fifo_tar
def _create_tar_with_symlink(self):
"""
Tarfile with symlink to path outside directory.
"""
outsidep = self.unsafe_common_dir / "unsafe_file.txt"
symlinkp = self.unsafe_common_dir / "symlink.txt"
symlink_tar = self.unsafe_common_dir / "symlink.tar.gz"
outsidep.symlink(symlinkp) # pylint: disable=no-value-for-parameter
with tarfile.open(symlink_tar, "w:gz") as tar:
tar.add(symlinkp)
return symlink_tar
def _create_tar_file_outside(self, parent=False):
"""
Tarfile that extracts to outside directory.
If parent is False:
The path of the file will match the basename
(`self.unsafe_common_dir`), but then "cd's out".
E.g. "/usr/../etc" == "/etc", but the naive basename of the first
(but not the second) is "/usr"
Extracting this tarfile in directory <dir> will put its contents
directly in <dir> (rather than <dir/tarname>).
"""
outside_tar = self.unsafe_common_dir / "unsafe_file.tar.gz"
tarfile_path = str(
self.unsafe_common_dir / "../a_file" if parent
else self.content_dir / "a_file"
)
with tarfile.open(outside_tar, "w:gz") as tar:
tar.addfile(
tarfile.TarInfo(tarfile_path)
)
return outside_tar
def _create_edx_platform_tar(self):
"""
Tarfile with file that extracts to edx-platform directory.
Extracting this tarfile in directory <dir> will also put its contents
directly in <dir> (rather than <dir/tarname>).
"""
outside_tar = self.unsafe_common_dir / "unsafe_file.tar.gz"
with tarfile.open(outside_tar, "w:gz") as tar:
tar.addfile(tarfile.TarInfo(os.path.join(os.path.abspath("."), "a_file")))
return outside_tar
def test_unsafe_tar(self):
"""
Check that safety measure work.
This includes:
'tarbombs' which include files or symlinks with paths
outside or directly in the working directory,
'special files' (character device, block device or FIFOs),
all raise exceptions/400s.
"""
def try_tar(tarpath):
""" Attempt to tar an unacceptable file """
with open(tarpath) as tar:
args = {"name": tarpath, "course-data": [tar]}
resp = self.client.post(self.url, args)
self.assertEquals(resp.status_code, 400)
self.assertTrue("suspicious_operation_message" in resp.content)
try_tar(self._create_tar_with_fifo())
try_tar(self._create_tar_with_symlink())
try_tar(self._create_tar_file_outside())
try_tar(self._create_tar_file_outside(True))
try_tar(self._create_edx_platform_tar())
# test trying to open a tar outside of the normal data directory
with self.settings(DATA_DIR='/not/the/data/dir'):
try_tar(self._create_edx_platform_tar())
# Check that `ImportStatus` returns the appropriate stage (i.e.,
# either 3, indicating all previous steps are completed, or 0,
# indicating no upload in progress)
resp_status = self.client.get(
course_url(
'course_import_status_handler',
self.course,
filename=os.path.split(self.good_tar)[1]
)
)
import_status = json.loads(resp_status.content)["ImportStatus"]
self.assertIn(import_status, (0, 3))
@override_settings(MODULESTORE_BRANCH='published')
def test_library_import(self):
"""
Try importing a known good library archive, and verify that the
contents of the library have completely replaced the old contents.
"""
# Create some blocks to overwrite
library = LibraryFactory.create(modulestore=self.store)
lib_key = library.location.library_key
test_block = ItemFactory.create(
category="vertical",
parent_location=library.location,
user_id=self.user.id,
publish_item=False,
)
test_block2 = ItemFactory.create(
category="vertical",
parent_location=library.location,
user_id=self.user.id,
publish_item=False
)
# Create a library and blocks that should remain unmolested.
unchanged_lib = LibraryFactory.create()
unchanged_key = unchanged_lib.location.library_key
test_block3 = ItemFactory.create(
category="vertical",
parent_location=unchanged_lib.location,
user_id=self.user.id,
publish_item=False
)
test_block4 = ItemFactory.create(
category="vertical",
parent_location=unchanged_lib.location,
user_id=self.user.id,
publish_item=False
)
# Refresh library.
library = self.store.get_library(lib_key)
children = [self.store.get_item(child).url_name for child in library.children]
self.assertEqual(len(children), 2)
self.assertIn(test_block.url_name, children)
self.assertIn(test_block2.url_name, children)
unchanged_lib = self.store.get_library(unchanged_key)
children = [self.store.get_item(child).url_name for child in unchanged_lib.children]
self.assertEqual(len(children), 2)
self.assertIn(test_block3.url_name, children)
self.assertIn(test_block4.url_name, children)
extract_dir = path(mkdtemp_clean(dir=settings.DATA_DIR))
# the extract_dir needs to be passed as a relative dir to
# import_library_from_xml
extract_dir_relative = path.relpath(extract_dir, settings.DATA_DIR)
with tarfile.open(path(TEST_DATA_DIR) / 'imports' / 'library.HhJfPD.tar.gz') as tar:
safetar_extractall(tar, extract_dir)
library_items = import_library_from_xml(
self.store,
self.user.id,
settings.GITHUB_REPO_ROOT,
[extract_dir_relative / 'library'],
load_error_modules=False,
static_content_store=contentstore(),
target_id=lib_key
)
self.assertEqual(lib_key, library_items[0].location.library_key)
library = self.store.get_library(lib_key)
children = [self.store.get_item(child).url_name for child in library.children]
self.assertEqual(len(children), 3)
self.assertNotIn(test_block.url_name, children)
self.assertNotIn(test_block2.url_name, children)
unchanged_lib = self.store.get_library(unchanged_key)
children = [self.store.get_item(child).url_name for child in unchanged_lib.children]
self.assertEqual(len(children), 2)
self.assertIn(test_block3.url_name, children)
self.assertIn(test_block4.url_name, children)
@override_settings(CONTENTSTORE=TEST_DATA_CONTENTSTORE)
class ExportTestCase(CourseTestCase):
"""
Tests for export_handler.
"""
def setUp(self):
"""
Sets up the test course.
"""
super(ExportTestCase, self).setUp()
self.url = course_url('course_import_export_handler', self.course)
def test_export_html_unsupported(self):
"""
HTML is unsupported
"""
resp = self.client.get(self.url, HTTP_ACCEPT='text/html')
self.assertEquals(resp.status_code, 406)
def test_export_json_supported(self):
"""
JSON is supported.
"""
resp = self.client.get(self.url, HTTP_ACCEPT='application/json')
self.assertEquals(resp.status_code, 200)
def test_export_targz(self):
"""
Get tar.gz file, using HTTP_ACCEPT.
"""
resp = self.client.get(self.url, HTTP_ACCEPT='application/x-tgz')
self._verify_export_succeeded(resp)
def test_export_targz_urlparam(self):
"""
Get tar.gz file, using URL parameter.
"""
resp = self.client.get(self.url + '?accept=application/x-tgz')
self._verify_export_succeeded(resp)
def _verify_export_succeeded(self, resp):
""" Export success helper method. """
self.assertEquals(resp.status_code, 200)
self.assertTrue(
resp.get('Content-Disposition').startswith('attachment')
)
@override_settings(MODULESTORE_BRANCH='draft-preferred')
def test_export_failure_top_level(self):
"""
Export failure.
"""
fake_xblock = ItemFactory.create(
parent_location=self.course.location,
category='aawefawef'
)
self.store.publish(fake_xblock.location, self.user.id)
self._verify_export_failure(u'{}'.format(self.course.location))
def test_export_failure_subsection_level(self):
"""
Slightly different export failure.
"""
vertical = ItemFactory.create(
parent_location=self.course.location,
category='vertical',
display_name='foo')
ItemFactory.create(
parent_location=vertical.location,
category='aawefawef'
)
self._verify_export_failure(u'{}'.format(vertical.location))
def _verify_export_failure(self, expected_text):
""" Export failure helper method. """
resp = self.client.get(self.url, HTTP_ACCEPT='application/x-tgz')
self.assertEquals(resp.status_code, 200)
self.assertNotIn('Content-Disposition', resp)
self.assertContains(resp, 'Unable to create xml for module')
self.assertContains(resp, expected_text)
def test_library_export(self):
"""
Verify that useable library data can be exported.
"""
youtube_id = "qS4NO9MNC6w"
library = LibraryFactory.create(modulestore=self.store)
video_block = ItemFactory.create(
category="video",
parent_location=library.location,
user_id=self.user.id,
publish_item=False,
youtube_id_1_0=youtube_id
)
name = library.url_name
lib_key = library.location.library_key
root_dir = path(mkdtemp_clean())
export_library_to_xml(self.store, contentstore(), lib_key, root_dir, name)
lib_xml = lxml.etree.XML(open(root_dir / name / LIBRARY_ROOT).read()) # pylint: disable=no-member
self.assertEqual(lib_xml.get('org'), lib_key.org)
self.assertEqual(lib_xml.get('library'), lib_key.library)
block = lib_xml.find('video')
self.assertIsNotNone(block)
self.assertEqual(block.get('url_name'), video_block.url_name)
video_xml = lxml.etree.XML( # pylint: disable=no-member
open(root_dir / name / 'video' / video_block.url_name + '.xml').read()
)
self.assertEqual(video_xml.tag, 'video')
self.assertEqual(video_xml.get('youtube_id_1_0'), youtube_id)
def test_export_success_with_custom_tag(self):
"""
Verify that course export with customtag
"""
xml_string = '<impl>slides</impl>'
vertical = ItemFactory.create(
parent_location=self.course.location, category='vertical', display_name='foo'
)
ItemFactory.create(
parent_location=vertical.location,
category='customtag',
display_name='custom_tag_foo',
data=xml_string
)
self.test_export_targz_urlparam()
'''
Utilities for contentstore tests
'''
from datetime import timedelta
from django.conf import settings
from django.utils import timezone
from provider.oauth2.models import AccessToken, Client as OAuth2Client
from provider import constants
from rest_framework.test import APIClient
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
TEST_DATA_DIR = settings.COMMON_TEST_DATA_ROOT
def create_oauth2_client(user):
"""
Create an OAuth2 client associated with the given user and generate an
access token for said client.
:param user:
:return: a Client (provider.oauth2) and an AccessToken
"""
# Register an OAuth2 Client
client = OAuth2Client(
user=user,
name=user.username,
url="http://127.0.0.1/",
redirect_uri="http://127.0.0.1/",
client_type=constants.CONFIDENTIAL
)
client.save()
# Generate an access token for the client
access_token = AccessToken(
user=user,
client=client,
# Set the access token to expire one day from now
expires=timezone.now() + timedelta(1, 0),
scope=constants.READ_WRITE
)
access_token.save()
return client, access_token
def use_access_token(client, access_token):
"""
Make an APIClient pass an access token for all requests
:param client: an APIClient
:param access_token: an AccessToken
"""
client.credentials(
HTTP_AUTHORIZATION="Bearer {}".format(access_token.token)
)
return client
class CourseTestCase(ModuleStoreTestCase):
"""
Extendable base for test cases dealing with courses
"""
def setUp(self):
"""
These tests need a user in the DB so that the django Test Client can
log them in.
The test user is created in the ModuleStoreTestCase setUp method.
They inherit from the ModuleStoreTestCase class so that the mongodb
collection will be cleared out before each test case execution and
deleted afterwards.
"""
self.user_password = super(CourseTestCase, self).setUp()
# Create an APIClient to simulate requests (like the Django Client, but
# without CSRF)
api_client = APIClient()
# Register an OAuth2 Client
_oauth2_client, access_token = create_oauth2_client(self.user)
self.client = use_access_token(api_client, access_token)
self.course = CourseFactory.create()
def create_non_staff_authed_user_client(self):
"""
Create a non-staff user, log them in (if authenticate=True), and return
the client, user to use for testing.
"""
nonstaff, _password = self.create_non_staff_user()
client = APIClient()
return client, nonstaff
"""
URLs for course publishing API
"""
from django.conf.urls import patterns, url
from django.conf import settings
from .views import FullCourseImportExport, FullCourseImportStatus
urlpatterns = patterns(
'api.courses.views',
url(
r'^{}$'.format(settings.COURSELIKE_KEY_PATTERN),
FullCourseImportExport.as_view(),
name='course_import_export_handler',
),
url(
r'^{}/import_status/(?P<filename>.+)$'.format(
settings.COURSELIKE_KEY_PATTERN
),
FullCourseImportStatus.as_view(),
name='course_import_status_handler',
),
)
"""
These views handle all actions in Studio related to import and exporting of
courses
"""
import base64
import logging
from opaque_keys import InvalidKeyError
import os
import re
import shutil
import tarfile
from path import path # pylint: disable=no-name-in-module
from django.conf import settings
from django.core.cache import cache
from django.core.exceptions import SuspiciousOperation
from django.core.files.temp import NamedTemporaryFile
from django.core.servers.basehttp import FileWrapper
from django.http import HttpResponse, Http404
from django.utils.translation import ugettext as _
from django.shortcuts import redirect
from rest_framework import renderers
from rest_framework.authentication import OAuth2Authentication, \
SessionAuthentication
from rest_framework.decorators import renderer_classes \
as renderer_classes_decorator
from rest_framework.permissions import IsAuthenticated, BasePermission
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.views import APIView
import dogstats_wrapper as dog_stats_api
from xmodule.contentstore.django import contentstore
from xmodule.exceptions import SerializationError
from xmodule.modulestore.django import modulestore
from opaque_keys.edx.keys import CourseKey
from opaque_keys.edx.locator import LibraryLocator
from xmodule.modulestore.xml_importer import import_course_from_xml, import_library_from_xml
from xmodule.modulestore.xml_exporter import export_course_to_xml, export_library_to_xml
from xmodule.modulestore import COURSE_ROOT, LIBRARY_ROOT
from student.auth import has_course_author_access
from openedx.core.lib.extract_tar import safetar_extractall
from openedx.core.lib.tempdir import mkdtemp_clean
from util.json_request import JsonResponse
from util.views import ensure_valid_course_key
from urllib import urlencode
log = logging.getLogger(__name__)
# Regex to capture Content-Range header ranges.
CONTENT_RE = re.compile(
r"(?P<start>\d{1,11})-(?P<stop>\d{1,11})/(?P<end>\d{1,11})"
)
class HasCourseWriteAccess(BasePermission):
"""
Permission that checks to see if the request user has permission to access
all course content of the requested course
"""
def has_permission(self, request, view):
course_key_string = view.kwargs['course_key_string']
try:
course_key = CourseKey.from_string(course_key_string)
except InvalidKeyError:
raise Http404
return has_course_author_access(request.user, course_key)
class ArchiveRenderer(renderers.BaseRenderer):
"""
A Renderer for compressed tars. It gets used at the content negotiation
stage, but "render" never actually gets used.
"""
media_type = "application/x-tgz"
format = None
render_style = "binary"
def render(self, data, _media_type=None, _render_context=None):
return data
class FullCourseImportStatus(APIView):
"""
View the import status of a full course import.
"""
authentication_classes = (OAuth2Authentication, SessionAuthentication)
permission_classes = (IsAuthenticated, HasCourseWriteAccess)
@ensure_valid_course_key
def get(self, request, course_key_string, filename=None):
"""
Returns an integer corresponding to the status of a file import.
These are:
-X : Import unsuccessful due to some error with X as stage [0-3]
0 : No status info found (import done or upload still in progress)
1 : Extracting file
2 : Validating.
3 : Importing to mongo
4 : Import successful
"""
status_key = "import_export.import.status:{}|{}{}".format(
request.user.username,
course_key_string,
filename
)
status = cache.get(status_key, 0)
return Response({"ImportStatus": status})
class FullCourseImportExport(APIView):
"""
Import or export a full course archive.
"""
authentication_classes = (OAuth2Authentication, SessionAuthentication)
permission_classes = (IsAuthenticated, HasCourseWriteAccess)
renderer_classes = (ArchiveRenderer, JSONRenderer)
def _save_request_status(self, request, key, status):
"""
Save import status for a course in request session
"""
cache.set(
"import_export.import.status:{}|{}".format(request.user.username, key),
status
)
def _export_error_response(self, params, redirect_url=None):
"""
Reasons about what to do when an export error is encountered. If there
was a redirect URL supplied in the request, pass error information in
the redirect URL. Otherwise, return the information in a JSON response.
"""
if redirect_url:
return redirect("{0}?{1}".format(
redirect_url,
urlencode(params)
))
else:
return JsonResponse(params)
@ensure_valid_course_key
@renderer_classes_decorator((ArchiveRenderer,))
def get(self, request, course_key_string):
"""
The restful handler for exporting a full course or content library.
GET
application/x-tgz: return tar.gz file containing exported course
json: not supported
Note that there are 2 ways to request the tar.gz file. The request
header can specify application/x-tgz via HTTP_ACCEPT, or a query
parameter can be used (?accept=application/x-tgz).
If the tar.gz file has been requested but the export operation fails,
a JSON string will be returned which describes the error
"""
redirect_url = request.QUERY_PARAMS.get('redirect', None)
courselike_key = CourseKey.from_string(course_key_string)
library = isinstance(courselike_key, LibraryLocator)
if library:
courselike_module = modulestore().get_library(courselike_key)
else:
courselike_module = modulestore().get_course(courselike_key)
name = courselike_module.url_name
export_file = NamedTemporaryFile(prefix=name + '.', suffix=".tar.gz")
root_dir = path(mkdtemp_clean())
try:
if library:
export_library_to_xml(
modulestore(),
contentstore(),
courselike_key,
root_dir,
name
)
else:
export_course_to_xml(
modulestore(),
contentstore(),
courselike_module.id,
root_dir,
name
)
logging.debug(
u'tar file being generated at %s', export_file.name
)
with tarfile.open(name=export_file.name, mode='w:gz') as tar_file:
tar_file.add(root_dir / name, arcname=name)
except SerializationError as exc:
log.exception(
u'There was an error exporting course %s',
courselike_key
)
unit = None
failed_item = None
parent = None
try:
failed_item = modulestore().get_item(exc.location)
parent_loc = modulestore().get_parent_location(
failed_item.location
)
if parent_loc is not None:
parent = modulestore().get_item(parent_loc)
if parent.location.category == 'vertical':
unit = parent
except Exception: # pylint: disable=broad-except
# if we have a nested exception, then we'll show the more
# generic error message
pass
return self._export_error_response(
{
"context_course": str(courselike_module.location),
"error": True,
"error_message": str(exc),
"failed_module":
str(failed_item.location) if failed_item else "",
"unit":
str(unit.location) if unit else ""
},
redirect_url=redirect_url
)
except Exception as exc: # pylint: disable=broad-except
log.exception(
'There was an error exporting course %s',
courselike_key
)
return self._export_error_response(
{
"context_course": courselike_module.url_name,
"error": True,
"error_message": str(exc),
"unit": ""
},
redirect_url=redirect_url
)
# The course is all set; return the tar.gz
wrapper = FileWrapper(export_file)
response = HttpResponse(wrapper, content_type='application/x-tgz')
response['Content-Disposition'] = 'attachment; filename={}'.format(
os.path.basename(
export_file.name.encode('utf-8')
)
)
response['Content-Length'] = os.path.getsize(export_file.name)
return response
@ensure_valid_course_key
@renderer_classes_decorator((JSONRenderer,))
def post(self, request, course_key_string):
"""
The restful handler for importing a course.
GET
json: return json import status
POST or PUT
json: import a course via the .tar.gz file specified inrequest.FILES
"""
courselike_key = CourseKey.from_string(course_key_string)
library = isinstance(courselike_key, LibraryLocator)
if library:
root_name = LIBRARY_ROOT
import_func = import_library_from_xml
else:
root_name = COURSE_ROOT
import_func = import_course_from_xml
filename = request.FILES['course-data'].name
courselike_string = unicode(courselike_key) + filename
data_root = path(settings.GITHUB_REPO_ROOT)
subdir = base64.urlsafe_b64encode(repr(courselike_key))
course_dir = data_root / subdir
status_key = "import_export.import.status:{}|{}".format(
request.user.username,
courselike_string
)
# Do everything in a try-except block to make sure everything is
# properly cleaned up.
try:
# Cache the import progress
self._save_request_status(request, courselike_string, 0)
if not filename.endswith('.tar.gz'):
self._save_request_status(request, courselike_string, -1)
return JsonResponse(
{
'error_message': _(
'We only support uploading a .tar.gz file.'
),
'stage': -1
},
status=415
)
temp_filepath = course_dir / filename
# Only handle exceptions caused by the directory already existing,
# to avoid a potential race condition caused by the "check and go"
# method.
try:
os.makedirs(course_dir)
except OSError as exc:
if exc.errno != exc.EEXIST:
raise
logging.debug('importing course to %s', temp_filepath)
# Get upload chunks byte ranges
try:
matches = CONTENT_RE.search(request.META["HTTP_CONTENT_RANGE"])
content_range = matches.groupdict()
except KeyError: # Single chunk
# no Content-Range header, so make one that will work
content_range = {'start': 0, 'stop': 1, 'end': 2}
# stream out the uploaded files in chunks to disk
if int(content_range['start']) == 0:
mode = "wb+"
else:
mode = "ab+"
size = os.path.getsize(temp_filepath)
# Check to make sure we haven't missed a chunk
# This shouldn't happen, even if different instances are
# handling the same session, but it's always better to catch
# errors earlier.
if size < int(content_range['start']):
self._save_request_status(request, courselike_string, -1)
log.warning(
"Reported range %s does not match size downloaded so "
"far %s",
content_range['start'],
size
)
return JsonResponse(
{
'error_message': _(
'File upload corrupted. Please try again'
),
'stage': -1
},
status=409
)
# The last request sometimes comes twice. This happens because
# nginx sends a 499 error code when the response takes too long.
elif size > int(content_range['stop']) \
and size == int(content_range['end']):
return JsonResponse({'ImportStatus': 1})
with open(temp_filepath, mode) as temp_file:
for chunk in request.FILES['course-data'].chunks():
temp_file.write(chunk)
size = os.path.getsize(temp_filepath)
if int(content_range['stop']) != int(content_range['end']) - 1:
# More chunks coming
return JsonResponse({
"files": [{
"name": filename,
"size": size,
"delete_url": "",
"delete_type": "",
"thumbnail_url": ""
}]
})
# Send errors to client with stage at which error occurred.
except Exception as exception: # pylint: disable=broad-except
self._save_request_status(request, courselike_string, -1)
if course_dir.isdir(): # pylint: disable=no-value-for-parameter
shutil.rmtree(course_dir)
log.info(
"Course import %s: Temp data cleared", courselike_key
)
log.exception("error importing course")
return JsonResponse(
{
'error_message': str(exception),
'stage': -1
},
status=400
)
# try-finally block for proper clean up after receiving last chunk.
try:
# This was the last chunk.
log.info("Course import %s: Upload complete", courselike_key)
self._save_request_status(request, courselike_string, 1)
tar_file = tarfile.open(temp_filepath)
try:
safetar_extractall(
tar_file,
(course_dir + '/').encode('utf-8'))
except SuspiciousOperation as exc:
self._save_request_status(request, courselike_string, -1)
return JsonResponse(
{
'error_message': 'Unsafe tar file. Aborting import.',
'suspicious_operation_message': exc.args[0],
'stage': -1
},
status=400
)
finally:
tar_file.close()
log.info(
"Course import %s: Uploaded file extracted", courselike_key
)
self._save_request_status(request, courselike_string, 2)
# find the 'course.xml' file
def get_all_files(directory):
"""
For each file in the directory, yield a 2-tuple of (file-name,
directory-path)
"""
for dirpath, _dirnames, filenames in os.walk(directory):
for filename in filenames:
yield (filename, dirpath)
def get_dir_for_fname(directory, filename):
"""
Returns the dirpath for the first file found in the directory
with the given name. If there is no file in the directory with
the specified name, return None.
"""
for fname, dirpath in get_all_files(directory):
if fname == filename:
return dirpath
return None
dirpath = get_dir_for_fname(course_dir, root_name)
if not dirpath:
self._save_request_status(request, courselike_string, -2)
return JsonResponse(
{
'error_message': _(
'Could not find the {root_xml_file} file in the package.'
).format(root_xml_file=root_name),
'stage': -2
},
status=415
)
dirpath = os.path.relpath(dirpath, data_root)
logging.debug('found %s at %s', root_name, dirpath)
log.info(
"Course import %s: Extracted file verified",
courselike_key
)
self._save_request_status(request, courselike_string, 3)
with dog_stats_api.timer(
'courselike_import.time',
tags=[u"courselike:{}".format(courselike_key)]
):
courselike_items = import_func(
modulestore(),
request.user.id,
settings.GITHUB_REPO_ROOT,
[dirpath],
load_error_modules=False,
static_content_store=contentstore(),
target_id=courselike_key,
)
new_location = courselike_items[0].location
logging.debug('new course at %s', new_location)
log.info(
"Course import %s: Course import successful", courselike_key
)
self._save_request_status(request, courselike_string, 4)
# Send errors to client with stage at which error occurred.
except Exception as exception: # pylint: disable=broad-except
log.exception(
"error importing course"
)
return JsonResponse(
{
'error_message': str(exception),
'stage': -cache.get(status_key)
},
status=400
)
finally:
if course_dir.isdir(): # pylint: disable=no-value-for-parameter
shutil.rmtree(course_dir)
log.info(
"Course import %s: Temp data cleared", courselike_key # pylint: disable=no-value-for-parameter
)
# set failed stage number with negative sign in case of an
# unsuccessful import
if cache.get(status_key) != 4:
self._save_request_status(
request,
courselike_string,
-abs(cache.get(status_key))
)
return JsonResponse({'status': 'OK'})
"""
A models.py is required to make this an app (until we move to Django 1.7)
"""
"""
URLs for the public API
"""
from django.conf.urls import patterns, url, include
urlpatterns = patterns(
'',
# Import/Export API
url(
r'^courses/',
include('openedx.core.djangoapps.import_export.courses.urls')
),
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment