Commit f747a669 by Joel Barciauskas

Add course import REST API to Studio

parent 5e51c5cf
"""
Tests for the course import API views
"""
import os
import shutil
import tarfile
import tempfile
from datetime import datetime
from urllib import urlencode
from django.core.urlresolvers import reverse
from path import Path as path
from mock import patch
from rest_framework import status
from rest_framework.test import APITestCase
from lms.djangoapps.courseware.tests.factories import GlobalStaffFactory, StaffFactory
from student.tests.factories import UserFactory
from user_tasks.models import UserTaskStatus
from xmodule.modulestore.tests.django_utils import TEST_DATA_SPLIT_MODULESTORE, SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
class CourseImportViewTest(SharedModuleStoreTestCase, APITestCase):
"""
Test importing courses via a RESTful API (POST method only)
"""
MODULESTORE = TEST_DATA_SPLIT_MODULESTORE
@classmethod
def setUpClass(cls):
super(CourseImportViewTest, cls).setUpClass()
cls.course = CourseFactory.create(display_name='test course', run="Testing_course")
cls.course_key = cls.course.id
cls.restricted_course = CourseFactory.create(display_name='restricted test course', run="Restricted_course")
cls.restricted_course_key = cls.restricted_course.id
cls.password = 'test'
cls.student = UserFactory(username='dummy', password=cls.password)
cls.staff = StaffFactory(course_key=cls.course.id, password=cls.password)
cls.restricted_staff = StaffFactory(course_key=cls.restricted_course.id, password=cls.password)
cls.content_dir = path(tempfile.mkdtemp())
# Create tar test files -----------------------------------------------
# OK course:
good_dir = tempfile.mkdtemp(dir=cls.content_dir)
# test course being deeper down than top of tar file
embedded_dir = os.path.join(good_dir, "grandparent", "parent")
os.makedirs(os.path.join(embedded_dir, "course"))
with open(os.path.join(embedded_dir, "course.xml"), "w+") as f:
f.write('<course url_name="2013_Spring" org="EDx" course="0.00x"/>')
with open(os.path.join(embedded_dir, "course", "2013_Spring.xml"), "w+") as f:
f.write('<course></course>')
cls.good_tar_filename = "good.tar.gz"
cls.good_tar_fullpath = os.path.join(cls.content_dir, cls.good_tar_filename)
with tarfile.open(cls.good_tar_fullpath, "w:gz") as gtar:
gtar.add(good_dir)
def get_url(self, course_id):
"""
Helper function to create the url
"""
return reverse(
'courses_api:course_import',
kwargs={
'course_id': course_id
}
)
def test_anonymous_import_fails(self):
"""
Test that an anonymous user cannot access the API and an error is received.
"""
with open(self.good_tar_fullpath, 'rb') as fp:
resp = self.client.post(self.get_url(self.course_key), {'course_data': fp}, format='multipart')
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
def test_student_import_fails(self):
"""
Test that a student user cannot access the API and an error is received.
"""
self.client.login(username=self.student.username, password=self.password)
with open(self.good_tar_fullpath, 'rb') as fp:
resp = self.client.post(self.get_url(self.course_key), {'course_data': fp}, format='multipart')
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
def test_staff_with_access_import_succeeds(self):
"""
Test that a staff user can access the API and successfully upload a course
"""
self.client.login(username=self.staff.username, password=self.password)
with open(self.good_tar_fullpath, 'rb') as fp:
resp = self.client.post(self.get_url(self.course_key), {'course_data': fp}, format='multipart')
self.assertEqual(resp.status_code, status.HTTP_200_OK)
def test_staff_has_no_access_import_fails(self):
"""
Test that a staff user can't access another course via the API
"""
self.client.login(username=self.staff.username, password=self.password)
with open(self.good_tar_fullpath, 'rb') as fp:
resp = self.client.post(self.get_url(self.restricted_course_key), {'course_data': fp}, format='multipart')
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
def test_student_get_status_fails(self):
"""
Test that a student user cannot access the API and an error is received.
"""
self.client.login(username=self.student.username, password=self.password)
resp = self.client.get(self.get_url(self.course_key), {'task_id': '1234', 'filename': self.good_tar_filename})
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
def test_anonymous_get_status_fails(self):
"""
Test that an anonymous user cannot access the API and an error is received.
"""
resp = self.client.get(self.get_url(self.course_key), {'task_id': '1234', 'filename': self.good_tar_filename})
self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
def test_staff_get_status_succeeds(self):
"""
Test that an import followed by a get status results in success
Note: This relies on the fact that we process imports synchronously during testing
"""
self.client.login(username=self.staff.username, password=self.password)
with open(self.good_tar_fullpath, 'rb') as fp:
resp = self.client.post(self.get_url(self.course_key), {'course_data': fp}, format='multipart')
self.assertEqual(resp.status_code, status.HTTP_200_OK)
resp = self.client.get(
self.get_url(self.course_key),
{'task_id': resp.data['task_id'], 'filename': self.good_tar_filename},
format='multipart'
)
self.assertEqual(resp.data['state'], UserTaskStatus.SUCCEEDED)
def test_staff_no_access_get_status_fails(self):
"""
Test that an import followed by a get status as an unauthorized staff fails
Note: This relies on the fact that we process imports synchronously during testing
"""
self.client.login(username=self.staff.username, password=self.password)
with open(self.good_tar_fullpath, 'rb') as fp:
resp = self.client.post(self.get_url(self.course_key), {'course_data': fp}, format='multipart')
self.assertEqual(resp.status_code, status.HTTP_200_OK)
task_id = resp.data['task_id']
resp = self.client.get(
self.get_url(self.course_key),
{'task_id': task_id, 'filename': self.good_tar_filename},
format='multipart'
)
self.assertEqual(resp.data['state'], UserTaskStatus.SUCCEEDED)
self.client.logout()
self.client.login(username=self.restricted_staff.username, password=self.password)
resp = self.client.get(
self.get_url(self.course_key),
{'task_id': task_id, 'filename': self.good_tar_filename},
format='multipart'
)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
def test_course_task_mismatch_get_status_fails(self):
"""
Test that an import followed by a get status as an unauthorized staff fails
Note: This relies on the fact that we process imports synchronously during testing
"""
self.client.login(username=self.staff.username, password=self.password)
with open(self.good_tar_fullpath, 'rb') as fp:
resp = self.client.post(self.get_url(self.course_key), {'course_data': fp}, format='multipart')
self.assertEqual(resp.status_code, status.HTTP_200_OK)
task_id = resp.data['task_id']
resp = self.client.get(
self.get_url(self.restricted_course_key),
{'task_id': task_id, 'filename': self.good_tar_filename},
format='multipart'
)
self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
""" Course Import API URLs. """
from django.conf import settings
from django.conf.urls import (
patterns,
url,
)
from cms.djangoapps.contentstore.api import views
urlpatterns = patterns(
'',
url(
r'^v0/import/{course_id}/$'.format(
course_id=settings.COURSE_ID_PATTERN,
),
views.CourseImportView.as_view(), name='course_import'
),
)
""" API v0 views. """
import base64
import logging
import os
from path import Path as path
from six import text_type
from django.conf import settings
from django.contrib.auth import get_user_model
from django.views.decorators.csrf import csrf_exempt
from django.core.files import File
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
from rest_framework import status
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from user_tasks.models import UserTaskArtifact, UserTaskStatus
from student.auth import has_course_author_access
from contentstore.storage import course_import_export_storage
from contentstore.tasks import CourseImportTask, import_olx
from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin, view_auth_classes
log = logging.getLogger(__name__)
@view_auth_classes()
class CourseImportExportViewMixin(DeveloperErrorViewMixin):
"""
Mixin class for course import/export related views.
"""
def perform_authentication(self, request):
"""
Ensures that the user is authenticated (e.g. not an AnonymousUser)
"""
super(CourseImportExportViewMixin, self).perform_authentication(request)
if request.user.is_anonymous():
raise AuthenticationFailed
class CourseImportView(CourseImportExportViewMixin, GenericAPIView):
"""
**Use Case**
* Start an asynchronous task to import a course from a .tar.gz file into
the specified course ID, overwriting the existing course
* Get a status on an asynchronous task import
**Example Requests**
POST /api/courses/v0/import/{course_id}/
GET /api/courses/v0/import/{course_id}/?task_id={task_id}
**POST Parameters**
A POST request must include the following parameters.
* course_id: (required) A string representation of a Course ID,
e.g., course-v1:edX+DemoX+Demo_Course
* course_data: (required) The course .tar.gz file to import
**POST Response Values**
If the import task is started successfully, an HTTP 200 "OK" response is
returned.
The HTTP 200 response has the following values.
* task_id: UUID of the created task, usable for checking status
* filename: string of the uploaded filename
**Example POST Response**
{
"task_id": "4b357bb3-2a1e-441d-9f6c-2210cf76606f"
}
**GET Parameters**
A GET request must include the following parameters.
* task_id: (required) The UUID of the task to check, e.g. "4b357bb3-2a1e-441d-9f6c-2210cf76606f"
* filename: (required) The filename of the uploaded course .tar.gz
**GET Response Values**
If the import task is found successfully by the UUID provided, an HTTP
200 "OK" response is returned.
The HTTP 200 response has the following values.
* state: String description of the state of the task
**Example GET Response**
{
"state": "Succeeded"
}
"""
def post(self, request, course_id):
"""
Kicks off an asynchronous course import and returns an ID to be used to check
the task's status
"""
courselike_key = CourseKey.from_string(course_id)
if not has_course_author_access(request.user, courselike_key):
return self.make_error_response(
status_code=status.HTTP_403_FORBIDDEN,
developer_message='The user requested does not have the required permissions.',
error_code='user_mismatch'
)
try:
if 'course_data' not in request.FILES:
return self.make_error_response(
status_code=status.HTTP_400_BAD_REQUEST,
developer_message='Missing required parameter',
error_code='internal_error',
field_errors={'course_data': '"course_data" parameter is required, and must be a .tar.gz file'}
)
filename = request.FILES['course_data'].name
if not filename.endswith('.tar.gz'):
return self.make_error_response(
status_code=status.HTTP_400_BAD_REQUEST,
developer_message='Parameter in the wrong format',
error_code='internal_error',
field_errors={'course_data': '"course_data" parameter is required, and must be a .tar.gz file'}
)
course_dir = path(settings.GITHUB_REPO_ROOT) / base64.urlsafe_b64encode(repr(courselike_key))
temp_filepath = course_dir / filename
if not course_dir.isdir(): # pylint: disable=no-value-for-parameter
os.mkdir(course_dir)
log.debug('importing course to {0}'.format(temp_filepath))
with open(temp_filepath, "wb+") as temp_file:
for chunk in request.FILES['course_data'].chunks():
temp_file.write(chunk)
log.info("Course import %s: Upload complete", courselike_key)
with open(temp_filepath, 'rb') as local_file:
django_file = File(local_file)
storage_path = course_import_export_storage.save(u'olx_import/' + filename, django_file)
async_result = import_olx.delay(
request.user.id, text_type(courselike_key), storage_path, filename, request.LANGUAGE_CODE)
return Response({
'task_id': async_result.task_id
})
except Exception as e:
return self.make_error_response(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
developer_message=str(e),
error_code='internal_error'
)
def get(self, request, course_id):
"""
Check the status of the specified task
"""
courselike_key = CourseKey.from_string(course_id)
if not has_course_author_access(request.user, courselike_key):
return self.make_error_response(
status_code=status.HTTP_403_FORBIDDEN,
developer_message='The user requested does not have the required permissions.',
error_code='user_mismatch'
)
try:
task_id = request.GET['task_id']
filename = request.GET['filename']
args = {u'course_key_string': course_id, u'archive_name': filename}
name = CourseImportTask.generate_name(args)
task_status = UserTaskStatus.objects.filter(name=name, task_id=task_id).first()
return Response({
'state': task_status.state
})
except Exception as e:
return self.make_error_response(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
developer_message=str(e),
error_code='internal_error'
)
......@@ -98,6 +98,11 @@ urlpatterns += patterns(
url(r'^assets/{}/{}?$'.format(settings.COURSE_KEY_PATTERN, settings.ASSET_KEY_PATTERN), 'assets_handler'),
url(r'^import/{}$'.format(COURSELIKE_KEY_PATTERN), 'import_handler'),
url(r'^import_status/{}/(?P<filename>.+)$'.format(COURSELIKE_KEY_PATTERN), 'import_status_handler'),
# rest api for course import/export
url(
r'^api/courses/',
include('cms.djangoapps.contentstore.api.urls', namespace='courses_api')
),
url(r'^export/{}$'.format(COURSELIKE_KEY_PATTERN), 'export_handler'),
url(r'^export_output/{}$'.format(COURSELIKE_KEY_PATTERN), 'export_output_handler'),
url(r'^export_status/{}$'.format(COURSELIKE_KEY_PATTERN), 'export_status_handler'),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment