Commit 84e51419 by muhammad-ammar Committed by muzaffaryousaf

update cielo24 integration

parent 6f0afecb
......@@ -10,7 +10,7 @@ from rest_framework import routers
from django.conf.urls import patterns, include, url
from django.contrib import admin
from VEDA_OS01 import views
from VEDA_OS01 import views, transcripts
router = routers.DefaultRouter()
admin.autodiscover()
......@@ -33,5 +33,10 @@ urlpatterns = [
url(r'^api/', include(router.urls)),
url(r'^api-auth/', include('rest_framework.urls', namespace='rest_framework')),
# Cheap auth server
url(r'^veda_auth/', views.token_auth)
url(r'^veda_auth/', views.token_auth),
url(
regex=r'^cielo24/transcript_completed/(?P<token>[\w]+)$',
view=transcripts.Cielo24CallbackHandlerView.as_view(),
name='cielo24_transcript_completed'
),
]
"""
Tests common utils
"""
from unittest import TestCase
from ddt import data, ddt, unpack
from VEDA_OS01 import utils
@ddt
class UtilTests(TestCase):
    """
    Tests for the helpers exposed by ``VEDA_OS01.utils``.
    """

    @data(
        {
            'urls': ('http://api.cielo24/', '/add/job'),
            'params': {},
            'expected_url': 'http://api.cielo24/add/job'
        },
        {
            'urls': ('http://api.cielo24', '/add/job'),
            'params': {'a': 1, 'b': 2},
            'expected_url': 'http://api.cielo24/add/job?a=1&b=2'
        },
        {
            'urls': ('http://api.cielo24/', 'add/job'),
            'params': {'c': 3, 'd': 4},
            'expected_url': 'http://api.cielo24/add/job?c=3&d=4'
        },
        {
            'urls': ('http://api.cielo24', 'add/job'),
            'params': {'p': 100},
            'expected_url': 'http://api.cielo24/add/job?p=100'
        },
        {
            'urls': ('http://api.cielo24', 'add/job', 'media'),
            'params': {'p': 100},
            'expected_url': 'http://api.cielo24/add/job/media?p=100'
        }
    )
    @unpack
    def test_build_url(self, urls, params, expected_url):
        """
        Tests that utils.build_url joins url parts and appends query params.
        """
        self.assertEqual(utils.build_url(*urls, **params), expected_url)

    @data(
        {
            'course_id': 'course-v1:MITx+4.605x+3T2017',
            'expected_org': 'MITx'
        },
        {
            'course_id': 'WestonHS/PFLC1x/3T2015',
            'expected_org': 'WestonHS'
        },
        {
            'course_id': '',
            'expected_org': None
        },
    )
    @unpack
    def test_extract_course_org(self, course_id, expected_org):
        """
        Tests that utils.extract_course_org returns the org, or None for an invalid course id.
        """
        self.assertEqual(utils.extract_course_org(course_id), expected_org)

    def test_get_config(self):
        """
        Tests that utils.get_config returns a non-empty config.
        """
        self.assertNotEqual(utils.get_config(), {})
"""
Transcript handlers.
"""
import json
import logging
import uuid
import boto
import django.dispatch
import requests
from boto.s3.key import Key
from pysrt import SubRipFile
from requests.packages.urllib3.exceptions import InsecurePlatformWarning
from rest_framework import status
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
from control.veda_val import VALAPICall
from VEDA_OS01 import utils
from VEDA_OS01.models import (TranscriptPreferences, TranscriptProcessMetadata,
TranscriptProvider, TranscriptStatus,
VideoStatus)
# Suppress urllib3's InsecurePlatformWarning for requests made from this module.
requests.packages.urllib3.disable_warnings(InsecurePlatformWarning)
LOGGER = logging.getLogger(__name__)
# Transcript format name reported to VAL for uploaded transcripts.
TRANSCRIPT_SJSON = 'sjson'
# Signal sent by Cielo24CallbackHandlerView and handled by cielo24_transcript_callback.
CIELO24_TRANSCRIPT_COMPLETED = django.dispatch.Signal(providing_args=['job_id', 'lang_code', 'org', 'video_id'])
# Cielo24 endpoint used to download the caption (SRT) for a job.
CIELO24_GET_CAPTION_URL = 'https://api.cielo24.com/api/job/get_caption'
# Instance configuration, read once at import time.
CONFIG = utils.get_config()
class TranscriptError(Exception):
    """
    Base class for all errors raised while processing a transcript.
    """


class TranscriptFetchError(TranscriptError):
    """
    An error occurred during fetching transcript from cielo24.
    """


class TranscriptConversionError(TranscriptError):
    """
    An error occurred during srt to sjson conversion.
    """


class TranscriptUploadError(TranscriptError):
    """
    An error occurred during sjson upload to s3.
    """
class AllowValidTranscriptProvider(AllowAny):
    """
    Permission class that admits a request only when the token captured
    from the URL equals the configured transcript provider token.
    """

    def has_permission(self, request, view):
        """
        Return True when the URL token matches the configured token.

        A missing config entry or a missing `token` URL kwarg denies access.
        """
        try:
            expected_token = CONFIG['transcript_provider_request_token']
            received_token = view.kwargs['token']
        except KeyError:
            return False

        return expected_token == received_token
class Cielo24CallbackHandlerView(APIView):
    """
    View to handle Cielo24 callback requests.
    """
    permission_classes = (AllowValidTranscriptProvider,)

    def get(self, request, **kwargs):
        """
        Handle Cielo24 callback request.

        Responds with 400 when any required query param is absent; otherwise
        sends the CIELO24_TRANSCRIPT_COMPLETED signal and responds with 200.
        """
        params = request.query_params
        required = ('job_id', 'lang_code', 'org', 'video_id')

        for name in required:
            if name not in params:
                LOGGER.warn('[CIELO24 HANDLER] Required params are missing %s', params.keys())
                return Response({}, status=status.HTTP_400_BAD_REQUEST)

        org = params['org']
        job_id = params['job_id']
        video_id = params['video_id']
        lang_code = params['lang_code']

        # send_robust keeps a failing receiver from turning the callback into a 500.
        CIELO24_TRANSCRIPT_COMPLETED.send_robust(
            sender=self,
            org=org,
            job_id=job_id,
            video_id=video_id,
            lang_code=lang_code,
        )
        return Response()
@django.dispatch.receiver(CIELO24_TRANSCRIPT_COMPLETED, dispatch_uid="cielo24_transcript_completed")
def cielo24_transcript_callback(sender, **kwargs):
    """
    Receiver for the CIELO24_TRANSCRIPT_COMPLETED signal.

    * download transcript(SRT) from Cielo24
    * convert SRT to SJSON
    * upload SJSON to AWS S3
    * update transcript status in VAL

    Arguments:
        sender: object that sent the signal (unused).
        kwargs: must contain `org`, `job_id`, `video_id` and `lang_code`.

    Raises:
        Exception: any error raised during SRT conversion or S3 upload is
            logged and re-raised.
    """
    process_metadata = None
    transcript_prefs = None

    org = kwargs['org']
    job_id = kwargs['job_id']
    video_id = kwargs['video_id']
    lang_code = kwargs['lang_code']

    LOGGER.info(
        '[CIELO24 TRANSCRIPTS] Transcript complete request received for video=%s -- org=%s -- lang=%s -- job_id=%s',
        video_id,
        org,
        lang_code,
        job_id
    )

    # get transcript preferences for an organization
    try:
        transcript_prefs = TranscriptPreferences.objects.get(
            org=org,
            provider=TranscriptProvider.CIELO24,
        )
    except TranscriptPreferences.DoesNotExist:
        LOGGER.exception('[CIELO24 TRANSCRIPTS] Unable to get transcript preferences for job_id=%s', job_id)

    # get the latest process metadata for this job and language
    try:
        process_metadata = TranscriptProcessMetadata.objects.filter(
            provider=TranscriptProvider.CIELO24,
            process_id=job_id,
            lang_code=lang_code
        ).latest('modified')
    except TranscriptProcessMetadata.DoesNotExist:
        LOGGER.exception(
            '[CIELO24 TRANSCRIPTS] Unable to get transcript process metadata for job_id=%s',
            job_id
        )

    # if transcript preferences are missing then we can do nothing
    if not transcript_prefs and process_metadata:
        process_metadata.status = TranscriptStatus.FAILED
        process_metadata.save()

    # both the preferences and the process metadata are needed to continue
    if not (transcript_prefs and process_metadata):
        return

    api_key = transcript_prefs.api_key
    try:
        srt_data = fetch_srt_data(
            CIELO24_GET_CAPTION_URL,
            v=1,
            job_id=job_id,
            api_token=api_key,
            caption_format='SRT'
        )
    except TranscriptFetchError:
        process_metadata.status = TranscriptStatus.FAILED
        process_metadata.save()
        LOGGER.exception(
            '[CIELO24 TRANSCRIPTS] Fetch request failed for video=%s -- lang=%s -- job_id=%s',
            video_id,
            lang_code,
            job_id
        )
        return

    # mark the transcript for a particular language as ready
    process_metadata.status = TranscriptStatus.READY
    process_metadata.save()

    try:
        sjson = convert_srt_to_sjson(srt_data)
        sjson_file_name = upload_sjson_to_s3(CONFIG, sjson)
    except Exception:
        # The format string must have exactly one placeholder per argument;
        # LOGGER.exception already appends the traceback and error message.
        LOGGER.exception(
            '[CIELO24 TRANSCRIPTS] Request failed for video=%s -- lang=%s -- job_id=%s',
            video_id,
            lang_code,
            job_id
        )
        raise

    # update edx-val with completed transcript information
    val_api = VALAPICall(process_metadata.video, val_status=None)
    val_api.update_val_transcript(
        video_id=process_metadata.video.studio_id,
        lang_code=lang_code,
        name=sjson_file_name,
        transcript_format=TRANSCRIPT_SJSON,
        provider=TranscriptProvider.CIELO24
    )

    # update transcript status for video in edx-val only if all language transcripts are ready
    video_jobs = TranscriptProcessMetadata.objects.filter(video__studio_id=video_id)
    if all(video_job.status == TranscriptStatus.READY for video_job in video_jobs):
        val_api.update_video_status(process_metadata.video.studio_id, VideoStatus.TRANSCRIPTION_READY)
def fetch_srt_data(url, **request_params):
    """
    Fetch srt data from transcript provider.

    Arguments:
        url (str): provider endpoint
        request_params (dict): query params appended to `url`

    Returns:
        response body text

    Raises:
        TranscriptFetchError: when the provider response is not ok.
    """
    request_url = utils.build_url(url, **request_params)
    response = requests.get(request_url)

    if response.ok:
        return response.text

    error_message = '[TRANSCRIPT FETCH ERROR] status={} -- text={}'.format(
        response.status_code,
        response.text
    )
    raise TranscriptFetchError(error_message)
def convert_srt_to_sjson(srt_data):
    """
    Convert SRT to SJSON

    Arguments:
        srt_data: unicode, content of source subs.

    Returns:
        dict: SJSON data with parallel `start`, `end` and `text` lists,
        one entry per subtitle.
    """
    sjson = {'start': [], 'end': [], 'text': []}

    for subtitle in SubRipFile.from_string(srt_data):
        sjson['start'].append(subtitle.start.ordinal)
        sjson['end'].append(subtitle.end.ordinal)
        # sjson keeps each subtitle on a single line
        sjson['text'].append(subtitle.text.replace('\n', ' '))

    return sjson
def upload_sjson_to_s3(config, sjson_data):
    """
    Upload sjson data to s3.

    Arguments:
        config (dict): must provide `transcript_bucket_name` and
            `transcript_bucket_directory`
        sjson_data (dict): transcript data, serialized as JSON before upload

    Returns:
        str: key of the uploaded object
    """
    bucket = boto.connect_s3().get_bucket(config['transcript_bucket_name'])

    sjson_key = Key(bucket)
    sjson_key.content_type = 'application/json'
    # a random hex name is generated for every uploaded transcript
    sjson_key.key = '{directory}{uuid}.sjson'.format(
        directory=config['transcript_bucket_directory'],
        uuid=uuid.uuid4().hex
    )
    sjson_key.set_contents_from_string(json.dumps(sjson_data))

    return sjson_key.key
"""
Common utils.
"""
import os
import urllib
import yaml
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
def get_config(yaml_config_file='instance_config.yaml'):
    """
    Read yaml config file.

    The file name is resolved against the parent of the directory that
    contains this module.

    Arguments:
        yaml_config_file (str): yaml config file name

    Returns:
        dict: yaml config; an empty dict when the file is empty or cannot
        be parsed.

    Raises:
        IOError: when the config file cannot be opened.
    """
    import logging

    config_path = os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        yaml_config_file
    )

    with open(config_path, 'r') as config:
        try:
            # safe_load: bare yaml.load(stream) is deprecated and requires an
            # explicit Loader on recent PyYAML. An empty document parses to
            # None, so fall back to an empty dict.
            return yaml.safe_load(config) or {}
        except yaml.YAMLError:
            # Keep the best-effort contract, but leave a trace of the failure.
            logging.getLogger(__name__).exception(
                'Unable to parse yaml config file %s', config_path
            )
            return {}
def extract_course_org(course_id):
    """
    Extract video organization from a course id.

    Returns:
        the `org` of the parsed course key, or None when `course_id`
        is not a valid course key.
    """
    try:
        return CourseKey.from_string(course_id).org
    except InvalidKeyError:
        return None
def build_url(*urls, **query_params):
    """
    Build a url from specified params.

    Arguments:
        urls (tuple): url parts, joined with a single `/` after stripping
            leading and trailing slashes from each part
        query_params (dict): query params, url-encoded and appended after `?`

    Returns:
        absolute url
    """
    joined = '/'.join([part.strip('/') for part in urls])
    if not query_params:
        return joined

    return '{}?{}'.format(joined, urllib.urlencode(query_params))
"""
Cielo24 transcription testing
"""
from unittest import TestCase
import responses
from ddt import ddt
from mock import patch
from control.veda_deliver_cielo import Cielo24Transcript
from VEDA_OS01.models import (Cielo24Fidelity, Cielo24Turnaround, Course,
TranscriptProcessMetadata, TranscriptStatus,
Video)
from VEDA_OS01.utils import build_url
# Sample instance configuration values for the cielo24 tests.
CONFIG_DATA = {
    'cielo24_get_caption_url': 'http://api.cielo24.com/job/get_caption',
    'transcript_bucket_access_key': 'bucket_access_key',
    'transcript_bucket_secret_key': 'bucket_secret_key',
    'transcript_bucket_name': 'bucket_name',
    'val_token_url': 'http://val.edx.org/token',
    'val_username': 'username',
    'val_password': 'password',
    'val_client_id': 'client',
    'val_secret_key': 'secret',
    'val_transcript_create_url': 'http://val.edx.org/transcript/create',
    'val_video_transcript_status_url': 'http://val.edx.org/video/status',
    'veda_base_url': 'https://veda.edx.org',
    'transcript_provider_request_token': '1234a5a67cr890'
}

# Field values used to create the Video fixture in setUp.
VIDEO_DATA = {
    'studio_id': '12345'
}
@ddt
class Cielo24TranscriptTests(TestCase):
    """
    Cielo24 transcription tests
    """
    def setUp(self):
        """
        Tests setup: create the course and video fixtures and the
        transcript preferences passed to Cielo24Transcript.
        """
        self.course = Course.objects.create(
            course_name='Intro to VEDA',
            institution='MAx',
            edx_classid='123'
        )
        self.video = Video.objects.create(
            inst_class=self.course,
            **VIDEO_DATA
        )
        self.video_transcript_preferences = {
            'org': 'MAx',
            'api_key': 'cielo24_api_key',
            'turnaround': Cielo24Turnaround.PRIORITY,
            'fidelity': Cielo24Fidelity.PROFESSIONAL,
            'preferred_languages': ['en', 'ur'],
            's3_video_url': 'https://s3.amazonaws.com/bkt/video.mp4',
            'callback_base_url': 'https://veda.edx.org/cielo24/transcript_completed/1234567890',
        }

    def tearDown(self):
        """
        Test cleanup
        """
        TranscriptProcessMetadata.objects.all().delete()

    def cielo24_url(self, cielo24, endpoint):
        """
        Return absolute url

        Arguments:
            cielo24 (Cielo24Transcript), object
            endpoint (str): url endpoint

        Returns:
            absolute url
        """
        return build_url(cielo24.cielo24_site, endpoint)

    def assert_request(self, received_request, expected_request):
        """
        Verify that `received_request` matches `expected_request`
        on method, url and body.
        """
        self.assertEqual(received_request.method, expected_request['method'])
        self.assertEqual(received_request.url, expected_request['url'])
        self.assertEqual(received_request.body, expected_request['body'])

    @responses.activate
    def test_transcript_flow(self):
        """
        Verify cielo24 transcription flow
        """
        job_id = '000-111-222'

        cielo24 = Cielo24Transcript(
            video=self.video,
            **self.video_transcript_preferences
        )

        # Register canned responses for the three cielo24 endpoints.
        responses.add(
            responses.GET,
            self.cielo24_url(cielo24, cielo24.cielo24_new_job),
            body={'JobId': job_id},
            status=200
        )
        responses.add(
            responses.GET,
            self.cielo24_url(cielo24, cielo24.cielo24_add_media),
            body={'TaskId': '000-000-111'},
            status=200
        )
        responses.add(
            responses.GET,
            self.cielo24_url(cielo24, cielo24.cielo24_perform_transcription),
            body={'TaskId': '000-000-000'},
            status=200
        )

        cielo24.start_transcription_flow()

        # Total of 6 HTTP requests are made
        # 3 cielo24 requests for first language(en)
        # 3 cielo24 requests for second language(ur)
        self.assertEqual(len(responses.calls), 6)

        # pylint: disable=line-too-long
        expected_data = [
            {
                'url': 'https://api.cielo24.com/api/job/new?api_token=cielo24_api_key&job_name=12345&language=en&v=1',
                'body': None,
                'method': 'GET'
            },
            {
                'url': 'https://api.cielo24.com/api/job/add_media?media_url=https%253A%252F%252Fs3.amazonaws.com%252Fbkt%252Fvideo.mp4&api_token=cielo24_api_key&job_id=000-111-222&v=1',
                'body': None,
                'method': 'GET'
            },
            {
                'url': 'https://api.cielo24.com/api/job/perform_transcription?transcription_fidelity=PROFESSIONAL&job_id=000-111-222&v=1&priority=PRIORITY&api_token=cielo24_api_key&callback_url=https%253A%252F%252Fveda.edx.org%252Fcielo24%252Ftranscript_completed%252F1234567890%253Flang_code%253D{}%2526video_id%253D12345%2526job_id%253D000-111-222%2526org%253DMAx&target_language={}',
                'body': None,
                'method': 'GET'
            }
        ]

        # Recorded calls are compared in order: the three expected requests
        # repeat once per preferred language.
        received_request_index = 0
        for preferred_language in self.video_transcript_preferences['preferred_languages']:
            for request_data in expected_data:
                # replace target language with appropriate value
                if 'api/job/perform_transcription' in request_data['url']:
                    # copy first so the shared template keeps its `{}` placeholders
                    request_data = dict(request_data)
                    request_data['url'] = request_data['url'].format(preferred_language, preferred_language)

                self.assert_request(
                    responses.calls[received_request_index].request,
                    request_data
                )
                received_request_index += 1

    @patch('control.veda_deliver_cielo.LOGGER')
    @responses.activate
    def test_transcript_flow_exceptions(self, mock_logger):
        """
        Verify that cielo24 transcription flow works as expected in case of bad response from cielo24
        """
        job_id = '010-010-010'
        bad_request_message = 'Bad request data'

        # use a single language so only one flow is attempted
        preferences = dict(self.video_transcript_preferences)
        preferences['preferred_languages'] = ['en']

        cielo24 = Cielo24Transcript(
            video=self.video,
            **preferences
        )

        responses.add(
            responses.GET,
            self.cielo24_url(cielo24, cielo24.cielo24_new_job),
            body={'JobId': job_id},
            status=200
        )
        # the add_media request is answered with a 400
        responses.add(
            responses.GET,
            self.cielo24_url(cielo24, cielo24.cielo24_add_media),
            body=bad_request_message,
            status=400
        )

        cielo24.start_transcription_flow()

        mock_logger.exception.assert_called_with(
            '[CIELO24] Request failed for video=%s -- lang=%s -- job_id=%s',
            self.video.studio_id,
            preferences['preferred_languages'][0],
            job_id
        )

        # Total of 2 HTTP requests are made for cielo24
        self.assertEqual(len(responses.calls), 2)

        process_metadata = TranscriptProcessMetadata.objects.all()
        self.assertEqual(process_metadata.count(), 1)
        self.assertEqual(process_metadata.first().status, TranscriptStatus.FAILED)
from ..veda_deliver_cielo import Cielo24Transcript
'''
TEST
list_of_ids = [
'XXXC93BC2016-V000100'
]
for l in list_of_ids:
x = Cielo24Transcript(
veda_id = l
)
output = x.perform_transcription()
print output
'''
import datetime
import ftplib
import logging
import os
import sys
import yaml
from os.path import expanduser
import boto
import boto.s3
from boto.s3.key import Key
from boto.exception import S3ResponseError
from os.path import expanduser
import requests
import datetime
import ftplib
import shutil
import yaml
from boto.exception import S3ResponseError
from boto.s3.key import Key
from django.core.urlresolvers import reverse
import veda_deliver_xuetang
from control_env import *
from veda_deliver_cielo import Cielo24Transcript
from veda_deliver_youtube import DeliverYoutube
from VEDA_OS01 import utils
from VEDA_OS01.models import TranscriptPreferences, VideoStatus, TranscriptProvider
from VEDA_OS01.utils import build_url
from veda_utils import ErrorObject, Metadata, Output, VideoProto
from veda_val import VALAPICall
from veda_video_validation import Validation
from watchdog import Watchdog
LOGGER = logging.getLogger(__name__)
try:
......@@ -28,14 +45,6 @@ and upload to the appropriate endpoint via the approp. methods
"""
homedir = expanduser("~")
from control_env import *
from veda_utils import ErrorObject, Output, Metadata, VideoProto
from veda_video_validation import Validation
from veda_val import VALAPICall
from veda_deliver_cielo import Cielo24Transcript
import veda_deliver_xuetang
from veda_deliver_youtube import DeliverYoutube
from watchdog import Watchdog
watchdog_time = 10.0
......@@ -171,7 +180,18 @@ class VedaDelivery:
Transcript, Xuetang
"""
self._THREEPLAY_UPLOAD()
self._CIELO24_UPLOAD()
# Transcription Process
# We only want to generate transcripts for `desktop_mp4` profile.
if self.encode_profile == 'desktop_mp4' and self.video_query.process_transcription:
# 3PlayMedia
if self.video_query.provider == TranscriptProvider.THREE_PLAY:
self.start_3play_transcription_process()
# Cielo24
if self.video_query.provider == TranscriptProvider.CIELO24:
self.cielo24_transcription_flow()
self._XUETANG_ROUTE()
self.status = self._DETERMINE_STATUS()
......@@ -507,21 +527,48 @@ class VedaDelivery:
os.chdir(homedir)
return True
def _CIELO24_UPLOAD(self):
if self.video_query.inst_class.c24_proc is False:
def cielo24_transcription_flow(self):
"""
Cielo24 transcription flow.
"""
org = utils.extract_course_org(self.video_proto.platform_course_url[0])
try:
api_key = TranscriptPreferences.objects.get(org=org, provider=self.video_query.provider).api_key
except TranscriptPreferences.DoesNotExist:
LOGGER.warn('[cielo24] Unable to find api_key for org=%s', org)
return None
if self.video_query.inst_class.mobile_override is False:
if self.encode_profile != 'desktop_mp4':
return None
s3_video_url = build_url(
self.auth_dict['s3_base_url'],
self.auth_dict['edx_s3_endpoint_bucket'],
self.encoded_file
)
C24 = Cielo24Transcript(
veda_id=self.video_query.edx_id
callback_base_url = build_url(
self.auth_dict['veda_base_url'],
reverse(
'cielo24_transcript_completed',
args=[self.auth_dict['transcript_provider_request_token']]
)
)
output = C24.perform_transcription()
print '[ %s ] : %s' % (
'Cielo24 JOB', self.video_query.edx_id
# update transcript status for video in edx-val
VALAPICall(video_proto=None, val_status=None).update_video_status(
self.video_query.studio_id, VideoStatus.TRANSCRIPTION_IN_PROGRESS
)
cielo24 = Cielo24Transcript(
self.video_query,
org,
api_key,
self.video_query.cielo24_turnaround,
self.video_query.cielo24_fidelity,
self.video_query.preferred_languages,
s3_video_url,
callback_base_url
)
cielo24.start_transcription_flow()
def _THREEPLAY_UPLOAD(self):
......
import logging
import os
import sys
import requests
......@@ -7,6 +8,8 @@ import json
import datetime
import yaml
LOGGER = logging.getLogger(__name__)
requests.packages.urllib3.disable_warnings()
......@@ -386,6 +389,63 @@ class VALAPICall():
)
)
def update_val_transcript(self, video_id, lang_code, name, transcript_format, provider):
    """
    Update status for a completed transcript.

    Posts the transcript details to the configured
    `val_transcript_create_url`; a non-ok response is logged as an error.

    Arguments:
        video_id: id of the video, sent as `video_id`
        lang_code: transcript language, sent as `language`
        name: transcript file name, sent as `transcript_url`
        transcript_format: transcript format, sent as `transcript_format`
        provider: transcript provider, sent as `provider`
    """
    # a token is generated on demand before the request is made
    if self.val_token is None:
        self.val_tokengen()

    post_data = {
        'video_id': video_id,
        'language': lang_code,
        'transcript_url': name,
        'transcript_format': transcript_format,
        'provider': provider,
    }

    response = requests.post(
        self.auth_dict['val_transcript_create_url'],
        json=post_data,
        headers=self.headers,
        timeout=20
    )

    if not response.ok:
        # `provider=%s`: without the conversion character, `% -- s` parses
        # as a flagged %s and swallows the "s" of "status" in the message.
        LOGGER.error(
            'update_val_transcript failed -- video_id=%s -- provider=%s -- status=%s',
            video_id,
            provider,
            response.status_code
        )
def update_video_status(self, video_id, status):
    """
    Update video transcript status.

    Patches the configured `val_video_transcript_status_url` with the
    video id and its new status; a non-ok response is logged as an error.
    """
    # a token is generated on demand before the request is made
    if self.val_token is None:
        self.val_tokengen()

    payload = {
        'edx_video_id': video_id,
        'status': status
    }
    endpoint = self.auth_dict['val_video_transcript_status_url']
    response = requests.patch(endpoint, json=payload, headers=self.headers, timeout=20)

    if response.ok:
        return

    LOGGER.error(
        'update_video_status failed -- video_id=%s -- status=%s -- text=%s',
        video_id,
        response.status_code,
        response.text
    )
def main():
    """
    Placeholder entry point; intentionally does nothing.
    """
    return None
......
......@@ -43,6 +43,17 @@ veda_s3_hotstore_bucket:
veda_deliverable_bucket:
# Settings
multi_upload_barrier: 2000000000
veda_base_url:
s3_base_url: https://s3.amazonaws.com
# transcript bucket config
transcript_bucket_name:
transcript_bucket_directory: video-transcripts/
# a token identifying a valid request from transcript provider
transcript_provider_request_token:
# Ingest Secret
# TODO: Eliminate access key after AWS Support ticket 08/20/17 regarding cross-account IAM role access.
......@@ -76,6 +87,8 @@ val_client_id:
val_secret_key:
val_password:
val_username:
val_transcript_create_url:
val_video_transcript_status_url:
# ---
# Celery Info
......
......@@ -12,3 +12,5 @@ boto
pyyaml
requests==2.18.1
celery==3.1.18
pysrt==1.1.1
edx-opaque-keys==0.4
\ No newline at end of file
......@@ -2,3 +2,7 @@
codecov==2.0.9
pep8==1.7.0
coverage==3.7.1
isort==4.2.15
ddt==1.1.1
moto==1.0.1
responses==0.6.1
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment