Unverified commit 7de77557 by M. Rehan, committed by GitHub

Merge pull request #69 from edx/mrehan/course-hex-dependency

[File Discovery]  Course can be fetched/created without a course_hex
parents b89c731c 6a32abcb
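The change means file discovery can resolve a course straight from the course key in the S3 metadata when no course_video_upload_token (course_hex) is present. A minimal sketch of the key parsing this relies on, using opaque_keys as the code below does (the course id is illustrative):

from opaque_keys.edx.keys import CourseKey

course_key = CourseKey.from_string('course-v1:MAx+123+test_run')
# org and course number drive the Course lookup/creation when no course_hex is set.
assert (course_key.org, course_key.course) == ('MAx', '123')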
@@ -398,6 +398,22 @@ class Course(TimeStampedModel):
return org
@property
def course_runs(self):
"""
Returns the Studio course runs associated with this course. Ideally, there should be a single entry
in this model covering all of the course's runs in Studio.
"""
course_runs = []
if self.local_storedir:
course_runs = [
course_id.strip()
for course_id in self.local_storedir.split(',')
if course_id
]
return course_runs
def __unicode__(self):
return u'{institution} {edx_class_id} {course_name}'.format(
institution=self.institution,
......
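For illustration, a rough sketch of how the new course_runs property reads local_storedir (the run ids below are hypothetical, not taken from this diff):

course = Course.objects.create(
    course_name='MAx 123', institution='MAx', edx_classid='123',
    local_storedir='course-v1:MAx+123+run1, course-v1:MAx+123+run2'
)
assert course.course_runs == ['course-v1:MAx+123+run1', 'course-v1:MAx+123+run2']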
@@ -79,7 +79,7 @@ class DaemonCli:
node_work_directory=node_work_directory
)
FD.discover_studio_ingested_videos()
FD.about_video_ingest()
reset_queries()
x += 1
......
"""
Unit Tests for File Discovery Phase.
"""
from contextlib import contextmanager
import ddt
import json
import os
import shutil
import tempfile
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.exception import S3ResponseError, S3DataError
from django.test import TestCase
from mock import ANY, Mock, patch
from moto import mock_s3_deprecated
from opaque_keys.edx.keys import CourseKey
from control.veda_file_discovery import FileDiscovery
from VEDA.utils import get_config
from VEDA_OS01.models import Course, TranscriptCredentials, TranscriptProvider
CONFIG_DATA = get_config('test_config.yaml')
TEST_FILES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_files')
# Default S3 metadata
S3_METADATA = {
'course_video_upload_token': 'xxx',
'client_video_id': 'OVTESTFILE_01.mp4',
'course_key': 'course-v1:MAx+123+test_run',
'transcript_preferences': json.dumps({})
}
# Default course data
COURSE_DATA = {
'course_name': 'MAx 123',
'institution': 'MAx',
'edx_classid': '123',
'local_storedir': ''
}
@contextmanager
def temporary_directory():
"""
Context manager for tempfile.mkdtemp() so it's usable with the "with" statement.
"""
name = tempfile.mkdtemp()
yield name
shutil.rmtree(name)
@ddt.ddt
@mock_s3_deprecated
@patch('control.veda_file_discovery.get_config', Mock(return_value=CONFIG_DATA))
class TestFileDiscovery(TestCase):
"""
Tests for file discovery phase.
"""
def setUp(self):
self.file_name = u'OVTESTFILE_01.mp4'
self.video_file_path = os.path.join(TEST_FILES_DIR, self.file_name)
# Create s3 bucket -- all this is happening in moto virtual environment
connection = S3Connection()
connection.create_bucket(CONFIG_DATA['edx_s3_ingest_bucket'])
def upload_video_with_metadata(self, **metadata):
"""
Uploads the video file to the ingest bucket and sets the given metadata on its S3 key.
"""
# Upload the video file to ingest bucket
connection = S3Connection()
self.ingest_bucket = connection.get_bucket(CONFIG_DATA['edx_s3_ingest_bucket'])
key_name = os.path.join(CONFIG_DATA['edx_s3_ingest_prefix'], self.file_name)
self.video_key = Key(self.ingest_bucket, key_name)
for metadata_name, value in dict(S3_METADATA, **metadata).iteritems():
if value is not None:
self.video_key.set_metadata(metadata_name, value)
self.video_key.set_contents_from_filename(self.video_file_path)
def setup_course(self, **course_data):
"""
Sets up a course.
Arguments:
course_data(dict): A dict containing the course properties.
"""
return Course.objects.create(**dict(COURSE_DATA, **course_data))
def assert_video_location(self, filename, expected_directory):
"""
Asserts that video file is in the expected directory.
Arguments:
filename: Name of the file.
expected_directory: A prefix in which the file with filename is expected.
"""
videos = list(self.ingest_bucket.list(expected_directory, '/'))
self.assertEqual(len(videos), 1)
self.assertEqual(os.path.basename(videos[0].name), filename)
@ddt.data(
(
'course-v1:MAx+123+test_run',
json.dumps({'provider': TranscriptProvider.THREE_PLAY}),
{'provider': TranscriptProvider.THREE_PLAY}
),
(
'invalid_course_key',
json.dumps({'provider': TranscriptProvider.THREE_PLAY}),
None
),
(
'course-v1:MAx+123+test_run',
'invalid_json_data',
None
),
)
@ddt.unpack
def test_parse_transcript_preferences(self, course_id, transcript_preferences, expected_preferences):
"""
Tests that 'FileDiscovery.parse_transcript_preferences' works as expected.
"""
# create test credentials
TranscriptCredentials.objects.create(
org='MAx',
provider=TranscriptProvider.THREE_PLAY,
api_key='test-api-key',
api_secret='test-api-secret'
)
file_discovery = FileDiscovery()
actual_preferences = file_discovery.parse_transcript_preferences(course_id, transcript_preferences)
# Assert the transcript preferences.
assert actual_preferences == expected_preferences
@patch('control.veda_file_discovery.VALAPICall.call')
def test_reject_file_and_update_val(self, mock_val_api):
"""
Tests that 'FileDiscovery.reject_file_and_update_val' works as expected.
"""
self.upload_video_with_metadata()
# instantiate file discovery instance with the ingest bucket.
file_discovery_instance = FileDiscovery()
file_discovery_instance.bucket = self.ingest_bucket
# rejecting a key will move it to the 'prod-edx/rejected/' directory in the bucket.
file_discovery_instance.reject_file_and_update_val(self.video_key, ANY, ANY, ANY)
self.assertTrue(mock_val_api.called)
# assert that the video file is no longer in the ingest directory.
ingested_videos = list(self.ingest_bucket.list(CONFIG_DATA['edx_s3_ingest_prefix'], '/'))
self.assertEqual(ingested_videos, [])
# assert that video file is now among rejected videos.
self.assert_video_location(self.file_name, CONFIG_DATA['edx_s3_rejected_prefix'])
@patch('control.veda_file_discovery.FileDiscovery.validate_metadata_and_feed_to_ingest')
def test_discover_studio_ingested_videos(self, mock_validate_and_feed_to_ingest):
"""
Tests that 'FileDiscovery.discover_studio_ingested_videos' works as expected.
"""
self.upload_video_with_metadata()
with temporary_directory() as node_work_directory:
file_discovery_instance = FileDiscovery(node_work_directory=node_work_directory)
file_discovery_instance.discover_studio_ingested_videos()
self.assertTrue(mock_validate_and_feed_to_ingest.called)
@ddt.data(
('veda/working', '[File Ingest] S3 Ingest Connection Failure'),
(None, '[File Ingest] No Working Node directory')
)
@ddt.unpack
@patch('control.veda_file_discovery.ErrorObject.print_error')
@patch('boto.s3.connection.S3Connection')
def test_discover_studio_ingested_video_exceptions(self, work_dir, error_message, mocked_s3_conn, mock_error):
""" """
Check a known file for validity Tests 'FileDiscovery.discover_studio_ingested_videos' exception cases.
""" """
self.assertTrue(True) mocked_s3_conn.side_effect = S3ResponseError('Error', 'Timeout')
file_discovery_instance = FileDiscovery(node_work_directory=work_dir)
file_discovery_instance.discover_studio_ingested_videos()
mock_error.assert_called_with(message=error_message)
@ddt.data(
(None, 'invalid_course_key'),
('non-existent-hex', None)
)
@ddt.unpack
@patch('control.veda_file_discovery.VALAPICall.call')
def test_validate_metadata_and_feed_to_ingest_invalid_course(self, course_hex, course_key, mock_val_api):
"""
Tests 'validate_metadata_and_feed_to_ingest' with a non-existent course hex and an invalid
course key; neither should create a course.
"""
self.upload_video_with_metadata(course_video_upload_token=course_hex, course_key=course_key)
with temporary_directory() as node_work_directory:
file_discovery_instance = FileDiscovery(node_work_directory=node_work_directory)
file_discovery_instance.discover_studio_ingested_videos()
# assert that the video file is now among rejected videos.
self.assert_video_location(self.file_name, CONFIG_DATA['edx_s3_rejected_prefix'])
self.assertTrue(mock_val_api.called)
@ddt.data(
'course-v1:MAx+123+test_run',
'course-v1:new_org+new_number+new_run'
)
@patch('control.veda_file_discovery.VedaIngest', Mock(complete=True))
@patch('control.veda_file_discovery.FileDiscovery.parse_transcript_preferences', Mock(return_value={}))
def test_validate_metadata_and_feed_to_ingest_happy_flow(self, course_id):
"""
Tests 'validate_metadata_and_feed_to_ingest' once with an existing course and then with a valid
course key for a new course, in both cases without a course_hex set.
"""
self.setup_course()
self.upload_video_with_metadata(course_video_upload_token=None, course_key=course_id)
with temporary_directory() as node_work_directory:
file_discovery_instance = FileDiscovery(node_work_directory=node_work_directory)
file_discovery_instance.discover_studio_ingested_videos()
# Assert the course in the database.
course_key = CourseKey.from_string(course_id)
course = Course.objects.get(institution=course_key.org, edx_classid=course_key.course)
self.assertEqual(course.course_name, ' '.join([course_key.org, course_key.course]))
self.assertEqual(course.local_storedir, course_id)
# assert that video file has been ingested successfully.
self.assert_video_location(self.file_name, CONFIG_DATA['edx_s3_processed_prefix'])
@patch('boto.s3.key.Key.get_contents_to_filename', Mock(side_effect=S3DataError('Unable to download.')))
def test_validate_metadata_and_feed_to_ingest_with_download_failure(self):
"""
Tests 'validate_metadata_and_feed_to_ingest' with a video download failure from S3 to the working directory.
"""
self.setup_course()
self.upload_video_with_metadata(course_video_upload_token=None)
with temporary_directory() as node_work_directory:
file_discovery_instance = FileDiscovery(node_work_directory=node_work_directory)
file_discovery_instance.discover_studio_ingested_videos()
# assert that the video file is now among rejected videos.
self.assert_video_location(self.file_name, CONFIG_DATA['edx_s3_rejected_prefix'])
import json
import logging
import os.path
import boto
import boto.s3
from boto.exception import S3ResponseError, S3DataError
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
from VEDA.utils import extract_course_org, get_config
from VEDA_OS01.models import TranscriptCredentials
try:
boto.config.add_section('Boto')
@@ -125,162 +127,209 @@ class FileDiscovery(object):
reset_queries()
def move_video(self, key, destination_dir):
"""
Moves an S3 video key to the destination directory within the same bucket.
Arguments:
key: An S3 file key.
destination_dir: target directory where the key will eventually be moved.
"""
new_key_name = os.path.join(destination_dir, os.path.basename(key.name))
key.copy(self.bucket, new_key_name)
key.delete()
def reject_file_and_update_val(self, key, s3_filename, client_title, course_id):
"""
Moves a video file to rejected videos and updates edx-val to 'invalid_token'.
Arguments:
key: An S3 key to be moved to /rejected
s3_filename: Name of the file
client_title: client title from the key's S3 metadata
course_id: course run identifier
"""
video_proto = VideoProto(
s3_filename=s3_filename,
client_title=client_title,
file_extension='',
platform_course_url=course_id
)
# Update val status to 'invalid_token'
VALAPICall(video_proto=video_proto, val_status=u'invalid_token').call()
# Move the video file to the 'prod-edx/rejected' directory.
self.move_video(key, destination_dir=self.auth_dict['edx_s3_rejected_prefix'])
""" def get_or_create_course(self, course_id, course_hex=None):
Call VAL Api """
""" Retrieves a course associated with course_hex, course_id or a creates new one.
val_status = 'invalid_token'
VAC = VALAPICall(
video_proto=V,
val_status=val_status
)
VAC.call()
new_key = 'prod-edx/rejected/' + key.name[::-1].split('/')[0][::-1] Arguments:
key.copy(self.bucket, new_key) course_id: course id identifying a course run
key.delete() course_hex: studio_hex identifying course runs
return
file_extension = client_title[::-1].split('.')[0][::-1] Details:
- if course_hex is there, try getting course with course_hex.
- otherwise try making use of course_id to get the associated course
and if no course is associated with the course_id, try creating
a new course with course_name, institution, edx_classid and
local_storedir.
""" """
download file if not course_hex:
"""
if len(file_extension) == 3:
try: try:
meta.get_contents_to_filename( course_key = CourseKey.from_string(course_id)
os.path.join( except InvalidKeyError:
self.node_work_directory, return
edx_filename + '.' + file_extension
) course = Course.objects.filter(institution=course_key.org, edx_classid=course_key.course).first()
if course:
course_runs = course.course_runs
if course_id not in course_runs:
course_runs.append(course_id)
course.local_storedir = ','.join(course_runs)
course.save()
else:
course_name = '{org} {number}'.format(org=course_key.org, number=course_key.course)
course = Course.objects.create(
course_name=course_name,
institution=course_key.org,
edx_classid=course_key.course,
local_storedir=course_id,
) )
else:
try:
course = Course.objects.get(studio_hex=course_hex)
except Course.DoesNotExist:
return
return course
def download_video_to_working_directory(self, key, file_name, file_extension):
"""
Downloads the video from S3 to the working directory and
returns whether it was successfully downloaded or not.
Arguments:
key: An S3 key whose content is going to be downloaded
file_name: Name of the file once it is in the working directory
file_extension: extension of this file.
"""
if len(file_extension) == 3:
file_name = u'{file_name}.{ext}'.format(file_name=file_name, ext=file_extension)
try:
key.get_contents_to_filename(os.path.join(self.node_work_directory, file_name))
file_ingested = True
except S3DataError:
file_ingested = False
LOGGER.exception('[File Ingest] Error downloading the file into node working directory.')
return file_ingested
def parse_transcript_preferences(self, course_id, transcript_preferences):
"""
Parses and validates transcript preferences.
Arguments:
course_id: course id identifying a course run.
transcript_preferences: A serialized dict containing third party transcript preferences.
"""
try:
transcript_preferences = json.loads(transcript_preferences)
TranscriptCredentials.objects.get(
org=extract_course_org(course_id),
provider=transcript_preferences.get('provider')
)
except (TypeError, TranscriptCredentials.DoesNotExist):
# when the preferences are not set, OR are set to data in an invalid format, OR don't
# have associated 3rd party transcription provider API keys.
transcript_preferences = None
except ValueError:
LOGGER.exception('[File Discovery] Invalid transcripts preferences=%s', transcript_preferences)
transcript_preferences = None
return transcript_preferences
def discover_studio_ingested_videos(self):
"""
Discovers Studio-ingested videos for further validation and processing.
"""
if self.node_work_directory:
try:
connection = boto.connect_s3()
self.bucket = connection.get_bucket(self.auth_dict['edx_s3_ingest_bucket'])
for video_s3_key in self.bucket.list(self.auth_dict['edx_s3_ingest_prefix'], '/'):
self.validate_metadata_and_feed_to_ingest(video_s3_key=self.bucket.get_key(video_s3_key.name))
except S3ResponseError:
ErrorObject.print_error(message='[File Ingest] S3 Ingest Connection Failure')
else:
ErrorObject.print_error(message='[File Ingest] No Working Node directory')
def validate_metadata_and_feed_to_ingest(self, video_s3_key):
"""
Validates the video key and feeds it to the ingestion phase.
Arguments:
video_s3_key: An S3 Key associated with a (to be ingested) video file.
Process/Steps:
1 - Get or create an associated course for the video.
2 - Download the video from S3 to the node working directory.
3 - Check whether this video has a valid 3rd party transcript provider along with the preferences.
4 - Set up an ingest instance and insert the video into the ingestion phase.
5 - On completing ingestion, mark the video file as processed.
Note:
Failure at any discovery point will cause the video file to be marked as rejected.
"""
client_title = video_s3_key.get_metadata('client_video_id')
course_hex = video_s3_key.get_metadata('course_video_upload_token')
course_id = video_s3_key.get_metadata('course_key')
transcript_preferences = video_s3_key.get_metadata('transcript_preferences')
filename = os.path.basename(video_s3_key.name)
# Try getting course based on the S3 metadata set on the video file.
course = self.get_or_create_course(course_id, course_hex=course_hex)
if course:
# Download video file from S3 into node working directory.
file_extension = os.path.splitext(client_title)[1][1:]
file_downloaded = self.download_video_to_working_directory(video_s3_key, filename, file_extension)
if not file_downloaded:
# S3 bucket ingest failed, move the file to the rejected directory.
self.move_video(video_s3_key, destination_dir=self.auth_dict['edx_s3_rejected_prefix'])
return
# Prepare to ingest.
video_metadata = dict(
s3_filename=filename,
client_title=client_title,
file_extension=file_extension,
platform_course_url=course_id,
)
# Check if this video also has valid 3rd party transcription preferences.
transcript_preferences = self.parse_transcript_preferences(course_id, transcript_preferences)
if transcript_preferences is not None:
video_metadata.update({
'process_transcription': True,
'provider': transcript_preferences.get('provider'),
'three_play_turnaround': transcript_preferences.get('three_play_turnaround'),
'cielo24_turnaround': transcript_preferences.get('cielo24_turnaround'),
'cielo24_fidelity': transcript_preferences.get('cielo24_fidelity'),
'preferred_languages': transcript_preferences.get('preferred_languages'),
'source_language': transcript_preferences.get('video_source_language'),
})
ingest = VedaIngest(
course_object=course,
video_proto=VideoProto(**video_metadata),
node_work_directory=self.node_work_directory
)
ingest.insert()
if ingest.complete:
# Move the video file into the 'prod-edx/processed' directory, if ingestion is complete.
self.move_video(video_s3_key, destination_dir=self.auth_dict['edx_s3_processed_prefix'])
else:
# Reject the video file and update val status to 'invalid_token'
self.reject_file_and_update_val(video_s3_key, filename, client_title, course_id)
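To make the discovery flow above concrete, here is a rough end-to-end sketch of what a Studio upload looks like to this code, using boto and the same S3 metadata keys the tests set (the bucket name, local path and course id are illustrative, not taken from this diff):

import json
import os
from boto.s3.connection import S3Connection
from boto.s3.key import Key

connection = S3Connection()
bucket = connection.get_bucket('s3_ingest_bucket')  # illustrative bucket name

# A Studio-style upload: the video lands under the ingest prefix with its
# metadata attached to the key; course_video_upload_token may be absent.
key = Key(bucket, os.path.join('ingest/', 'OVTESTFILE_01.mp4'))
key.set_metadata('client_video_id', 'OVTESTFILE_01.mp4')
key.set_metadata('course_key', 'course-v1:MAx+123+test_run')
key.set_metadata('transcript_preferences', json.dumps({}))
key.set_contents_from_filename('/path/to/OVTESTFILE_01.mp4')  # illustrative local path

# Discovery then walks the ingest prefix and validates each key:
# fd = FileDiscovery(node_work_directory='/tmp/veda-work')
# fd.discover_studio_ingested_videos()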
---
# This configuration should only have static settings.
# s3 bucket static prefixes
edx_s3_processed_prefix: prod-edx/processed/
edx_s3_rejected_prefix: prod-edx/rejected/
# Celery Info
onsite_worker: False
celery_threads: 1
......
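The static prefixes above are what move_video joins with a file's basename to build destination key names; a small sketch of that mapping, loading configuration the same way the tests do (the file name is illustrative):

import os
from VEDA.utils import get_config

config = get_config('test_config.yaml')
# With the static prefix above this yields 'prod-edx/rejected/OVTESTFILE_01.mp4'.
rejected_key = os.path.join(config['edx_s3_rejected_prefix'], 'OVTESTFILE_01.mp4')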
---
veda_s3_hotstore_bucket: s3_hotstore_bucket
veda_deliverable_bucket: s3_deliverable_bucket
edx_s3_ingest_prefix: ingest/
edx_s3_ingest_bucket: s3_ingest_bucket
edx_s3_endpoint_bucket: s3_deliverable_bucket
multi_upload_barrier: 2000000000
veda_base_url: https://veda.edx.org
......