Commit 3eb78149 by Gregory Martin

control node

parent 1eacc561
from __future__ import absolute_import
import os
import sys
from celery import Celery
import yaml
"""
Start Celery Worker
"""
try:
from control.control_env import *
except ImportError:
from control_env import *
try:
from control.veda_deliver import VedaDelivery
except ImportError:
from veda_deliver import VedaDelivery
auth_yaml = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'veda_auth.yaml'
)
with open(auth_yaml, 'r') as stream:
try:
auth_dict = yaml.safe_load(stream)
except yaml.YAMLError as exc:
auth_dict = None
CEL_BROKER = 'amqp://' + auth_dict['rabbitmq_user'] + ':' + auth_dict['rabbitmq_pass'] + '@' \
+ auth_dict['rabbitmq_broker'] + ':5672//'
CEL_BACKEND = 'amqp://' + auth_dict['rabbitmq_user'] + ':' + auth_dict['rabbitmq_pass'] + \
'@' + auth_dict['rabbitmq_broker'] + ':5672//'
app = Celery(auth_dict['celery_app_name'], broker=CEL_BROKER, backend=CEL_BACKEND, include=[])
app.conf.update(
BROKER_CONNECTION_TIMEOUT=60,
CELERY_IGNORE_RESULT=True,
CELERY_TASK_RESULT_EXPIRES=10,
CELERYD_PREFETCH_MULTIPLIER=1,
CELERY_ACCEPT_CONTENT=['pickle', 'json', 'msgpack', 'yaml']
)
@app.task(name='worker_encode')
def worker_task_fire(veda_id, encode_profile, jobid):
pass  # no-op here; the actual encode runs on worker nodes that register this task name
@app.task(name='supervisor_deliver')
def deliverable_route(veda_id, encode_profile):
VD = VedaDelivery(
veda_id=veda_id,
encode_profile=encode_profile
)
VD.run()
@app.task
def node_test(command):
os.system(command)
@app.task(name='legacy_heal')
def maintainer_healer(command):
os.system(command)
if __name__ == '__main__':
app.start()
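# A minimal usage sketch (not part of the worker itself): firing an encode job
# at this app from the control node, mirroring veda_heal.send_encodes. The
# VEDA ID, profile, and queue name below are illustrative assumptions.
#
# import uuid
# import celeryapp
# celeryapp.worker_task_fire.apply_async(
#     ('XXXXXXXX2016-V00TEST', 'desktop_mp4', uuid.uuid1().hex[0:10]),
#     queue='encode_worker'
# )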
#!/usr/bin/env python
"""
VEDA Environment variables
"""
import os
import sys
import django
from django.utils.timezone import utc
from django.db import reset_queries
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
sys.path.append(project_path)
os.environ['DJANGO_SETTINGS_MODULE'] = 'common.settings'
django.setup()
from pipeline.models import Institution
from pipeline.models import Course
from pipeline.models import Video
from pipeline.models import Destination
from pipeline.models import Encode
from pipeline.models import URL
from pipeline.models import VedaUpload
"""
Central Config
"""
WORK_DIRECTORY = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
'VEDA_WORKING'
)
if not os.path.exists(WORK_DIRECTORY):
os.mkdir(WORK_DIRECTORY)
"""
Occasionally this throws an error in the env,
but on EC2 with v_videocompile it's no biggie
"""
FFPROBE = "ffprobe"
FFMPEG = "ffmpeg"
"""
Generalized display and such
"""
"""
TERM COLORS
"""
NODE_COLORS_BLUE = '\033[94m'
NODE_COLORS_GREEN = '\033[92m'
NODE_COLORS_RED = '\033[91m'
NODE_COLORS_END = '\033[0m'
from ..veda_deliver_cielo import Cielo24Transcript
'''
TEST
list_of_ids = [
'XXXC93BC2016-V000100'
]
for l in list_of_ids:
x = Cielo24Transcript(
veda_id = l
)
output = x.perform_transcription()
print output
'''
import os
import sys
import unittest
"""
Test encode profiler
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_env import *
from veda_encode import VedaEncode
class TestEncode(unittest.TestCase):
def setUp(self):
self.course_object = Course.objects.get(
institution='XXX',
edx_classid='XXXXX'
)
self.veda_id = 'XXXXXXXX2016-V00TEST'
self.E = VedaEncode(
course_object=self.course_object,
veda_id=self.veda_id
)
def test_encode_url(self):
"""
gen baseline, gen a url, test against baseline
"""
URL.objects.filter(
videoID=Video.objects.filter(edx_id=self.veda_id).latest()
).delete()
encode_list = self.E.determine_encodes()
baseline = len(encode_list)
self.assertTrue(isinstance(encode_list, list))
self.E.encode_list = []
U = URL(
videoID=Video.objects.filter(edx_id=self.veda_id).latest(),
encode_profile=Encode.objects.get(product_spec='mobile_low'),
encode_url='THIS Is A TEST'
)
U.save()
encode_list = self.E.determine_encodes()
self.assertTrue(len(encode_list) == baseline - 1)
self.E.encode_list = []
URL.objects.filter(
videoID=Video.objects.filter(edx_id=self.veda_id).latest(),
).delete()
encode_list = self.E.determine_encodes()
self.assertTrue(len(encode_list) == baseline)
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
'''
Save for poss future test
# import celeryapp
# co = Course.objects.get(institution='XXX', edx_classid='C93BC')
# vid = 'XXXC93BC2016-V003500'
# v = VedaEncode(course_object=co, veda_id=vid)
# encode_list = v.determine_encodes()
# for e in encode_list:
# veda_id = vid
# encode_profile = e
# jobid = uuid.uuid1().hex[0:10]
# # celeryapp.worker_task_fire.apply_async(
# # (veda_id, encode_profile, jobid),
# # queue='encode_worker'
# # )
'''
import os
import sys
import unittest
"""
Test VEDA API
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_file_discovery import FileDiscovery
class TestValidation(unittest.TestCase):
def setUp(self):
self.videofile = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'test_files',
'OVTESTFILE_01.mp4'
)
self.FD = FileDiscovery()
def test_build(self):
"""
Check a known file for validity
"""
self.assertTrue(True)
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
import requests
import ast
import yaml
"""
This is an API connection test
set to pass if instance_config.yaml is missing
"""
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from veda_file_ingest import VideoProto, VedaIngest
requests.packages.urllib3.disable_warnings()
class TestIngest(unittest.TestCase):
def setUp(self):
self.VP = VideoProto(
s3_filename=None,
client_title='OVTESTFILE_01',
file_extension='mp4'
)
self.VI = VedaIngest(
course_object=None,
video_proto=self.VP
)
self.auth_yaml = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_auth.yaml'
)
def test_file_ingest(self):
if not os.path.exists(self.auth_yaml):
self.assertTrue(self.VI.auth_dict is None)
return None
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
"""
Test heal processor
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_env import *
from veda_heal import VedaHeal
class TestEncode(unittest.TestCase):
def setUp(self):
U = URL(
videoID=Video.objects.filter(edx_id='XXXXXXXX2014-V00TES1').latest(),
encode_profile=Encode.objects.get(product_spec='mobile_low'),
encode_url='THIS Is A TEST')
U.save()
def test_encode_url(self):
H = VedaHeal()
H.discovery()
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
"""
Test upload processes
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_hotstore import Hotstore
from veda_file_ingest import VideoProto
from veda_env import *
class TestHotstore(unittest.TestCase):
def setUp(self):
VP = VideoProto()
VP.veda_id = 'XXXXXXXX2014-V00TEST'
self.upload_filepath = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'test_files',
'OVTESTFILE_01.mp4'
)
self.H1 = Hotstore(
video_object=VP,
upload_filepath=self.upload_filepath
)
def test_single_upload(self):
if self.H1.auth_dict is None:
self.assertTrue(self.H1.upload() is False)
return None
self.assertTrue(self.H1.upload())
def test_multi_upload(self):
if self.H1.auth_dict is None:
self.assertTrue(self.H1.upload() is False)
return None
self.H1.auth_dict['multi_upload_barrier'] = 0
self.assertTrue(self.H1.upload())
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
"""
A basic unittest for the "Course Addition Tool"
"""
sys.path.append(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
from veda_utils import Report
class TestReporting(unittest.TestCase):
def setUp(self):
self.R = Report(
status='Complete',
upload_serial="4939d60a60",
youtube_id='TEST'
)
def test_conn(self):
if self.R.auth_dict is None:
self.assertTrue(True)
return None
self.R.upload_status()
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
import requests
import ast
import yaml
requests.packages.urllib3.disable_warnings()
"""
This is an API connection test
set to pass if instance_config.yaml is missing
"""
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from veda_val import VALAPICall
from veda_file_ingest import VideoProto
class TestVALAPI(unittest.TestCase):
def setUp(self):
self.VP = VideoProto(
client_title='Test Title',
veda_id='TESTID'
)
self.VAC = VALAPICall(
video_proto=self.VP,
val_status='complete'
)
self.auth_yaml = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_auth.yaml'
)
def test_val_setup(self):
if not os.path.exists(self.auth_yaml):
self.assertTrue(self.VAC.auth_dict is None)
return None
salient_variables = [
'val_api_url',
'val_client_id',
'val_password',
'val_secret_key',
'val_username',
'val_token_url',
]
for s in salient_variables:
self.assertTrue(len(self.VAC.auth_dict[s]) > 0)
def test_val_connection(self):
if not os.path.exists(self.auth_yaml):
self.assertTrue(self.VAC.auth_dict is None)
return None
self.VAC.val_tokengen()
self.assertFalse(self.VAC.val_token is None)
s = requests.get(
self.VAC.auth_dict['val_api_url'],
headers=self.VAC.headers,
timeout=20
)
self.assertFalse(s.status_code == 404)
self.assertFalse(s.status_code > 299)
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
"""
Test VEDA API
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_video_validation import Validation
class TestValidation(unittest.TestCase):
def setUp(self):
self.videofile = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'test_files',
'OVTESTFILE_01.mp4'
)
self.VALID = Validation(
videofile=self.videofile
)
def test_validation(self):
"""
Check a known file for validity
"""
self.assertTrue(self.VALID.validate())
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
---
# Authentication keys
# VEDA AWS Account
veda_s3_upload_bucket:
veda_s3_hotstore_bucket:
veda_deliverable_bucket:
veda_access_key_id:
veda_secret_access_key:
multi_upload_barrier: 2000000000
# edX AWS Account
edx_aws_user:
edx_access_key_id:
edx_secret_access_key:
edx_s3_ingest_bucket:
edx_s3_endpoint_bucket:
edx_cloudfront_prefix:
# email vars
veda_noreply_email:
admin_email:
# VAL user creds
val_api_url:
val_client_id:
val_password:
val_secret_key:
val_token_url:
val_username:
## Celery Info
celery_app_name:
# can do multiple queues like so: foo,bar,baz
main_celery_queue: encode_worker
largefile_queue_barrier: 1000000000
largefile_celery_queue: large_encode_worker
celery_stat_queue: transcode_stat
celery_threads: 1
rabbitmq_broker:
rabbitmq_pass:
rabbitmq_user:
# Shotgun Variables
sg_server_path:
sg_script_name:
sg_script_key:
# Endpoints
threeplay_ftphost:
xuetang_api_url:
xuetang_api_shared_secret:
##---
# This is a list of encodes and their respective course
# boolean matches
encode_dict:
review_proc:
- review
mobile_override:
- override
s3_proc:
- mobile_high
- mobile_low
- audio_mp3
- desktop_webm
- desktop_mp4
- hls
yt_proc:
- youtube
##---
# This is a list of encode profiles and their val profile matches
# boolean matches
val_profile_dict:
mobile_low:
- mobile_low
desktop_mp4:
- desktop_mp4
override:
- desktop_mp4
- mobile_low
- mobile_high
mobile_high:
- mobile_high
audio_mp3:
- audio_mp3
desktop_webm:
- desktop_webm
youtube:
- youtube
review:
hls:
- hls
##---
# Heal settings
heal_start: 1
heal_end: 144
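# (assumption: both values are hours -- veda_heal.discovery re-examines videos
# whose transcode started between heal_start and heal_end hours ago)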
...
import os
import sys
import requests
from requests.auth import HTTPBasicAuth
import ast
import urllib
"""
Cielo24 API Job Start and Download
Options (reflected in Course.models):
transcription_fidelity =
Mechanical (75%),
Premium (95%)(3-72h),
Professional (99+%)(3-72h)
priority =
standard (24h),
priority (48h)
turnaround_hours = number, overrides 'priority' call, will change a standard to a priority silently
"""
from control_env import *
from veda_utils import ErrorObject, Output
requests.packages.urllib3.disable_warnings()
class Cielo24Transcript():
def __init__(self, veda_id):
self.veda_id = veda_id
'''Defaults'''
self.c24_site = 'https://api.cielo24.com/api'
self.c24_login = '/account/login'
self.c24_joblist = '/job/list'
self.c24_newjob = '/job/new'
self.add_media = '/job/add_media'
self.transcribe = '/job/perform_transcription'
'''Retrieve C24 Course-based defaults'''
self.c24_defaults = self.retrieve_defaults()
def perform_transcription(self):
if self.c24_defaults is None or self.c24_defaults['c24_user'] is None:
return None
'''
GET /api/job/perform_transcription?v=1 HTTP/1.1
&api_token=xxxx
&job_id=xxxx
&transcription_fidelity=PREMIUM&priority=STANDARD
Host: api.cielo24.com
'''
api_token = self.tokengenerator()
if api_token is None:
return None
job_id = self.generate_jobs(api_token)
task_id = self.embed_url(api_token, job_id)
r5 = requests.get(
''.join((
self.c24_site,
self.transcribe,
'?v=1&api_token=',
api_token,
'&job_id=',
job_id,
'&transcription_fidelity=',
self.c24_defaults['c24_fidelity'],
'&priority=',
self.c24_defaults['c24_speed']
))
)
return ast.literal_eval(r5.text)['TaskId']
def retrieve_defaults(self):
video_query = Video.objects.filter(
edx_id=self.veda_id
).latest()
if video_query.inst_class.mobile_override is True:
url_query = URL.objects.filter(
videoID=video_query,
encode_url__icontains='_LBO.mp4',
).latest()
else:
url_query = URL.objects.filter(
videoID=video_query,
encode_url__icontains='_DTH.mp4',
).latest()
if video_query.inst_class.c24_username is None:
ErrorObject.print_error(
message='Cielo24 Record Incomplete',
)
return None
c24_defaults = {
'c24_user': video_query.inst_class.c24_username,
'c24_pass': video_query.inst_class.c24_password,
'c24_speed': video_query.inst_class.c24_speed,
'c24_fidelity': video_query.inst_class.c24_fidelity,
'edx_id': self.veda_id,
'url': url_query.encode_url
}
return c24_defaults
def tokengenerator(self):
token_url = self.c24_site + self.c24_login + \
'?v=1&username=' + self.c24_defaults['c24_user'] + \
'&password=' + self.c24_defaults['c24_pass']
# Generate Token
r1 = requests.get(token_url)
if r1.status_code > 299:
ErrorObject.print_error(
message='Cielo24 API Access Error',
)
return None
api_token = ast.literal_eval(r1.text)["ApiToken"]
return api_token
def listjobs(self):
"""List Jobs"""
api_token = self.tokengenerator()
r2 = requests.get(
''.join((
self.c24_site,
self.c24_joblist,
'?v=1&api_token=',
api_token
))
)
job_list = r2.text
return job_list
def generate_jobs(self, api_token):
"""
'https://api.cielo24.com/job/new?v=1&\
api_token=xxx&job_name=xxx&language=en'
"""
r3 = requests.get(
''.join((
self.c24_site,
self.c24_newjob,
'?v=1&api_token=',
api_token,
'&job_name=',
self.c24_defaults['edx_id'],
'&language=en'
))
)
job_id = ast.literal_eval(r3.text)['JobId']
return job_id
def embed_url(self, api_token, job_id):
"""
GET /api/job/add_media?v=1&api_token=xxxx
&job_id=xxxxx
&media_url=http%3A%2F%2Fwww.domain.com%2Fvideo.mp4 HTTP/1.1
Host: api.cielo24.com
"""
r4 = requests.get(
''.join((
self.c24_site,
self.add_media,
'?v=1&api_token=',
api_token,
'&job_id=',
job_id,
'&media_url=',
urllib.quote_plus(self.c24_defaults['url'])
))
)
print str(r4.status_code) + ' : Cielo24 Status Code'
return ast.literal_eval(r4.text)['TaskId']
def main():
pass
if __name__ == "__main__":
sys.exit(main())
import os
import hashlib
import hmac
import base64
import datetime
import requests
import json
import time
from time import strftime
import yaml
"""
Authored by Ed Zarecor / edx DevOps
included by request
Some adaptations for VEDA:
-auth yaml
**VEDA Note: since this isn't a real CDN, and represents the
'least effort' response to getting video into China,
we shan't monitor for success**
"""
read_yaml = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'instance_config.yaml'
)
if os.path.exists(read_yaml):
with open(read_yaml, 'r') as stream:
auth_dict = yaml.safe_load(stream)
API_SHARED_SECRET = auth_dict['xuetang_api_shared_secret']
API_ENDPOINT = auth_dict['xuetang_api_url']
# Currently the API supports no query arguments, so this component of the
# signature will always be an empty string.
API_QUERY_STRING = ""
SEPERATOR = '*' * 10
"""
This script provides functions for accessing the Xuetang CDN API.
It expects that an environment variable named XUETANG_SHARED_SECRET is
available and refers to a valid secret provided by the Xuetang CDN team
(adapted here to read the secret from instance_config.yaml instead).
Running this script will cause a video hosted in cloudfront to be
uploaded to the CDN via the API.
The status of the video will be monitored in a loop, exiting when
the terminal status, available, has been reached.
Finally, the video will be deleted from the cache exercising the
delete functionality.
"""
def _pretty_print_request(req):
"""
Convenience function for pretty printing requests for debugging API
issues.
"""
print('\n'.join(
[SEPERATOR + ' start-request ' + SEPERATOR,
req.method + ' ' + req.url,
'\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()),
req.body,
SEPERATOR + ' end-request ' + SEPERATOR]))
def build_message(verb, uri, query_string, date, payload_hash):
"""
Builds a message conforming to the Xuetang format for mutual authentication. The
format is defined in their CDN API specification document.
"""
return os.linesep.join([verb, uri, query_string, date, payload_hash])
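# For example, a delete call signs the newline-joined message below (date and
# digest are illustrative; the empty query string leaves a blank line):
#
#     DELETE
#     /edxvideo
#
#     2017-05-01T12:00:00-0400
#     <sha256 hexdigest of the JSON payload>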
def sign_message(message, secret):
"""
Returns a hexdigest of HMAC generated using sha256. The value is included in
the HTTP headers and used for mutual authentication via a shared secret.
"""
return hmac.new(bytes(secret), bytes(message), digestmod=hashlib.sha256).hexdigest()
def hex_digest(payload):
"""
returns the sha256 hexdigest of the request payload, typically JSON.
"""
return hashlib.sha256(bytes(payload)).hexdigest()
def get_api_date():
"""
Returns an "RFC8601" date as specified in the Xuetang API specification
"""
return strftime("%Y-%m-%dT%H:%M:%S") + "-{0:04d}".format((time.timezone / 3600) * 100)
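# e.g. with time.timezone == 18000 (UTC-5), this yields a stamp like
# '2017-05-01T12:00:00-0500'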
def prepare_create_or_update_video(edx_url, download_urls, md5sum):
"""
Returns a prepared HTTP request for initially seeding or updating an edX video
in the Xuetang CDN.
"""
api_target = "/edxvideo"
payload = {'edx_url':edx_url, 'download_url': download_urls, 'md5sum':md5sum}
return _prepare_api_request("POST", api_target, payload)
def prepare_delete_video(edx_url):
"""
Returns a prepared HTTP request for deleting an edX video in the Xuetang CDN.
"""
api_target = "/edxvideo"
payload = {'edx_url':edx_url}
return _prepare_api_request("DELETE", api_target, payload)
def prepare_check_task_status(edx_url):
"""
Returns a prepared HTTP request for checking the status of an edX video
in the Xuetang CDN.
"""
api_target = "/edxtask"
payload = {'edx_url':edx_url}
return _prepare_api_request("POST", api_target, payload)
def _prepare_api_request(http_verb, api_target, payload):
"""
General convenience function for creating prepared HTTP requests that conform the
Xuetang API specificiation.
"""
payload_json = json.dumps(payload)
payload_sha256_hexdigest = hex_digest(payload_json)
date = get_api_date()
message = bytes(build_message(http_verb, api_target, API_QUERY_STRING, date, payload_sha256_hexdigest))
secret = bytes(API_SHARED_SECRET)
signature = sign_message(message, secret)
headers = {"Authentication": "edx {0}".format(signature), "Content-Type": "application/json", "Date": date}
req = requests.Request(http_verb, API_ENDPOINT + api_target, headers=headers, data=payload_json)
return req.prepare()
def _submit_prepared_request(prepared):
"""
General function for submitting prepared HTTP requests.
"""
# Suppress InsecurePlatform warning
requests.packages.urllib3.disable_warnings()
s = requests.Session()
# TODO: enable certificate verification after working through
# certificate issues with Xuetang
return s.send(prepared, timeout=20, verify=False)
if __name__ == '__main__':
# Download URL from the LMS
edx_url = "xxx"
# edx_url = "http://s3.amazonaws.com/edx-course-videos/ut-takemeds/UTXUT401T313-V000300_DTH.mp4"
# A list containing the same URL
download_urls = ["xxx"]
# The md5sum of the video from the s3 ETAG value
md5sum = "xxx"
#
# The code below is a simple test harness for the Xuetang API that
#
# - pushes a new video to the CDN
# - checks the status of the video in a loop until it is available
# - issues a delete request to remove the video from the CDN
#
# upload or update
# prepared = prepare_create_or_update_video(edx_url, download_urls, md5sum)
# _pretty_print_request(prepared)
# print os.linesep
# res = _submit_prepared_request(prepared)
# print res.text
# print os.linesep
# # check status
# while True:
# prepared = prepare_check_task_status(edx_url)
# _pretty_print_request(prepared)
# res = _submit_prepared_request(prepared)
# print res.text
# if res.json()['status'] == 'available':
# break
# time.sleep(5)
# delete file
prepared = prepare_delete_video(edx_url)
_pretty_print_request(prepared)
res = _submit_prepared_request(prepared)
print res.text
# check status
prepared = prepare_check_task_status(edx_url)
_pretty_print_request(prepared)
res = _submit_prepared_request(prepared)
print res.text
import os
import os.path
import sys
import time
import pysftp
"""
Youtube Dynamic Upload
Note: This represents early VEDA work, but is functional
"""
from control_env import *
def printTotals(transferred, toBeTransferred):
"""
try:
sys.stdout.write('\r')
sys.stdout.write("Transferred: {0}\tOut of: {1}\r".format(transferred, toBeTransferred))
sys.stdout.flush()
except:
print 'Callback Failing'
"""
return None
class DeliverYoutube():
def __init__(self, veda_id, encode_profile):
self.veda_id = veda_id
self.encode_profile = encode_profile
self.video = None
self.course = None
self.file = None
self.youtubekey = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'dependencies',
'youtubekey'
)
def upload(self):
self.video = Video.objects.filter(
edx_id=self.veda_id
).latest()
if self.encode_profile == 'review':
self.course = Course.objects.get(
institution='EDX',
edx_classid='RVW01'
)
self.file = self.veda_id + '_RVW.mp4'
else:
self.course = self.video.inst_class
self.file = self.veda_id + '_100.mp4'
self.csv_metadata()
self.batch_uploader()
def csv_metadata(self):
"""
Generate Youtube CMS CSV metadata sidecar file
Info: https://support.google.com/youtube/answer/6066171?hl=en (As of 05.2017)
Supported in favor of deprecated YT-XML
Fields in CSV are:
filename,
channel,
custom_id,
add_asset_labels,
title,
description,
keywords,
spoken_language,
caption_file,
caption_language,
category,
privacy,
notify_subscribers,
start_time,end_time,
custom_thumbnail,
ownership,
block_outside_ownership,
usage_policy,
enable_content_id,
reference_exclusions,
match_policy,ad_types,
ad_break_times,
playlist_id,
require_paid_subscription
"""
YOUTUBE_DEFAULT_CSV_COLUMNNAMES = [
'filename',
'channel',
'custom_id',
'add_asset_labels',
'title',
'description',
'keywords',
'spoken_language',
'caption_file',
'caption_language',
'category',
'privacy',
'notify_subscribers',
'start_time,end_time',
'custom_thumbnail',
'ownership',
'block_outside_ownership',
'usage_policy',
'enable_content_id',
'reference_exclusions',
'match_policy,ad_types',
'ad_break_times',
'playlist_id',
'require_paid_subscription'
]
print "%s : %s" % ("Generate CSV", str(self.video.edx_id))
# TODO: Refactor this into centrally located util for escaping bad chars
if self.video.client_title is not None:
try:
self.video.client_title.decode('ascii')
client_title = self.video.client_title
except UnicodeDecodeError:
client_title = ''
while len(self.video.client_title) > len(client_title):
try:
char = self.video.client_title[len(client_title)].decode('ascii')
except UnicodeDecodeError:
char = '-'
client_title += char
else:
client_title = self.file
"""
This is where we can add or subtract file attributes as needed
"""
print self.file
metadata_dict = {
'filename': self.file,
'channel': self.course.yt_channel,
'custom_id': self.video.edx_id,
'title': client_title.replace(',', ''),
'privacy': 'unlisted',
}
# Header Row
output = ','.join(YOUTUBE_DEFAULT_CSV_COLUMNNAMES) + '\n'
# Data Row
output += ','.join(metadata_dict.get(c, '') for c in YOUTUBE_DEFAULT_CSV_COLUMNNAMES)  # no trailing newline here (added at write time)
with open(os.path.join(WORK_DIRECTORY, self.video.edx_id + '_100.csv'), 'w') as c1:
c1.write('%s %s' % (output, '\n'))
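# The sidecar contents end up like this (illustrative values, most of the
# 24 columns left blank):
#
# filename,channel,custom_id,add_asset_labels,title,...,require_paid_subscription
# XXXXXXXX2016-V00TEST_100.mp4,ChannelName,XXXXXXXX2016-V00TEST,,Video Title,...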
def batch_uploader(self):
"""
To successfully upload files to the CMS,
upload file, upload sidecar metadata (xml)
THEN upload empty 'delivery.complete' file
NOTE / TODO:
this doesn't feel like the right solution,
-BUT-
We'll generate a unix timestamp directory,
then use the timestamp to find it later for the
youtube ID / status xml
"""
remote_directory = str(time.time()).split('.')[0]
if not os.path.exists(os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'dependencies',
'delivery.complete'
)):
with open(os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'dependencies',
'delivery.complete'
), 'w') as d1:
d1.write('')
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
with pysftp.Connection(
'partnerupload.google.com',
username=self.course.yt_logon,
private_key=self.youtubekey,
port=19321,
cnopts=cnopts
) as s1:
print "Go for YT : " + str(self.video.edx_id)
s1.mkdir(remote_directory, mode=660)
s1.cwd(remote_directory)
s1.put(
os.path.join(WORK_DIRECTORY, self.file),
callback=printTotals
)
print
s1.put(
os.path.join(WORK_DIRECTORY, self.video.edx_id + '_100.csv'),
callback=printTotals
)
print
s1.put(
os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'dependencies',
'delivery.complete'
),
callback=printTotals,
confirm=False,
preserve_mtime=False
)
print
os.remove(os.path.join(
WORK_DIRECTORY,
self.video.edx_id + '_100.csv'
))
def main():
pass
if __name__ == "__main__":
sys.exit(main())
import os
import sys
import django
import yaml
import uuid
import newrelic.agent
"""
Get a list of needed encodes from VEDA
* Protected against extant URLs *
"""
from control_env import *
from dependencies.shotgun_api3 import Shotgun
newrelic.agent.initialize(
os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_newrelic.ini'
)
)
class VedaEncode():
"""
This should create a scrubbed list of encode profiles for processing
--NOTE: post-review processing should take place on the mediateam node
"""
def __init__(self, course_object, **kwargs):
self.course_object = course_object
self.encode_list = set()
self.overencode = kwargs.get('overencode', False)
self.veda_id = kwargs.get('veda_id', None)
self.auth_yaml = kwargs.get(
'auth_yaml',
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'veda_auth.yaml'
),
)
auth_dict = self._READ_AUTH()
self.encode_dict = auth_dict['encode_dict']
self.sg_server_path = auth_dict['sg_server_path']
self.sg_script_name = auth_dict['sg_script_name']
self.sg_script_key = auth_dict['sg_script_key']
def _READ_AUTH(self):
if self.auth_yaml is None:
return None
if not os.path.exists(self.auth_yaml):
return None
with open(self.auth_yaml, 'r') as stream:
try:
auth_dict = yaml.safe_load(stream)
return auth_dict
except yaml.YAMLError as exc:
return None
@newrelic.agent.background_task()
def determine_encodes(self):
self.match_profiles()
for e in self.encode_list.copy():
enc_query = Encode.objects.filter(
product_spec=e
)
if len(enc_query) == 0:
self.encode_list.remove(e)
else:
if enc_query[0].profile_active is False:
self.encode_list.remove(e)
self.query_urls()
if len(self.encode_list) == 0:
return None
send_list = []
for s in self.encode_list.copy():
send_list.append(s)
return send_list
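# For a course with s3_proc=True and no extant URLs, this returns something
# like ['mobile_high', 'mobile_low', 'audio_mp3', 'desktop_webm',
# 'desktop_mp4', 'hls'] (per the encode_dict in the instance config)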
def match_profiles(self):
if self.course_object.review_proc is True and self.veda_id is not None:
"""
Here's where we check if this is review approved
"""
if self.check_review_approved() is False:
for e in self.encode_dict['review_proc']:
self.encode_list.add(e)
return None
if self.course_object.mobile_override is True:
for e in self.encode_dict['mobile_override']:
self.encode_list.add(e)
return None
for key, entry in self.encode_dict.iteritems():
if getattr(self.course_object, key) is True:
if key != 'review_proc':
for e in entry:
self.encode_list.add(e)
def query_urls(self):
"""
To allow the healing process & legacy imports
protection against double encoding -- will take a kwarg to
check against this 'overencode' / in case of a total redo
"""
if self.overencode is True:
return None
if self.veda_id is None:
return None
for l in self.encode_list.copy():
url_query = URL.objects.filter(
videoID=Video.objects.filter(edx_id=self.veda_id).latest(),
encode_profile=Encode.objects.get(product_spec=l.strip())
)
if len(url_query) > 0:
self.encode_list.remove(l)
def check_review_approved(self):
"""
** Mediateam only **
Check in with SG to see if this video
is authorized to go to final publishing
"""
video_object = Video.objects.filter(
edx_id=self.veda_id
).latest()
if video_object.inst_class.sg_projID is None:
return False
sg = Shotgun(
self.sg_server_path,
self.sg_script_name,
self.sg_script_key
)
fields = ['project', 'entity', 'sg_status_list']
filters = [
['step', 'is', {'type': 'Step', 'id': 7}],
['project', 'is', {
"type": "Project",
"id": video_object.inst_class.sg_projID
}],
]
tasks = sg.find("Task", filters, fields)
for t in tasks:
if t['entity']['name'] == self.veda_id.split('-')[-1]:
if t['sg_status_list'] != 'wtg':
return True
return False
def main():
pass
if __name__ == '__main__':
sys.exit(main())
import os.path
import boto
import yaml
from boto.s3.connection import S3Connection
import newrelic.agent
try:
boto.config.add_section('Boto')
except:
pass
boto.config.set('Boto', 'http_socket_timeout', '100')
newrelic.agent.initialize(
os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_newrelic.ini'
)
)
"""
multi-point videofile discovery
Currently:
FTP
Amazon S3 (studio-ingest as well as about/marketing
video ingest
)
Local (watchfolder w/o edit priv.)
"""
from control_env import *
from veda_utils import ErrorObject
from veda_file_ingest import VideoProto, VedaIngest
from veda_val import VALAPICall
class FileDiscovery():
def __init__(self, **kwargs):
self.video_info = {}
self.auth_dict = {}
self.auth_yaml = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'veda_auth.yaml'
)
with open(self.auth_yaml, 'r') as stream:
try:
self.auth_dict = yaml.safe_load(stream)
except yaml.YAMLError as exc:
pass
self.bucket = None
"""
FTP Server Vars
"""
self.ftp_key = None
self.ftp_follow_delay = str(5000)
self.ftp_log = "/Users/Shared/edX1/LG/Transfers.log"
self.wfm_log = "/Users/Shared/edX1/LG/WFM.log"
self.ftp_faillog = "/Users/Shared/edX1/LG/FailedTransfers.log"
self.node_work_directory = kwargs.get('node_work_directory', WORK_DIRECTORY)
@newrelic.agent.background_task()
def about_video_ingest(self):
if self.node_work_directory is None:
ErrorObject().print_error(
message='No Workdir'
)
return None
"""
Crawl ingest bucket looking for files
"""
conn = S3Connection(
self.auth_dict['veda_access_key_id'],
self.auth_dict['veda_secret_access_key']
)
"""
Occasional s3 Error
"""
try:
self.bucket = conn.get_bucket(self.auth_dict['veda_s3_upload_bucket'])
except:
return None
for key in self.bucket.list('upload/', '/'):
meta = self.bucket.get_key(key.name)
if meta.name != 'upload/':
self.about_video_validate(
meta=meta,
key=key
)
def about_video_validate(self, meta, key):
abvid_serial = meta.name.split('/')[1]
upload_query = VedaUpload.objects.filter(
video_serial=meta.name.split('/')[1]
)
if len(upload_query) == 0:
'''
Non serialized upload - reject
'''
return None
if upload_query[0].upload_filename is not None:
file_extension = upload_query[0].upload_filename.split('.')[-1]
else:
upload_query[0].upload_filename = 'null_file_name.mp4'
file_extension = 'mp4'
if len(file_extension) > 4:
file_extension = ''
meta.get_contents_to_filename(
os.path.join(
self.node_work_directory,
upload_query[0].upload_filename
)
)
course_query = Course.objects.get(institution='EDX', edx_classid='ABVID')
"""
Trigger Ingest Process
"""
V = VideoProto(
abvid_serial=abvid_serial,
client_title=upload_query[0].upload_filename.replace('.' + file_extension, ''),
file_extension=file_extension,
)
I = VedaIngest(
course_object=course_query,
video_proto=V,
node_work_directory=self.node_work_directory
)
I.insert()
"""
Move Key out of 'upload' folder
"""
new_key = '/'.join(('process', meta.name.split('/')[1]))
key.copy(self.bucket, new_key)
key.delete()
reset_queries()
@newrelic.agent.background_task()
def studio_s3_ingest(self):
if self.node_work_directory is None:
ErrorObject().print_error(
message='No Workdir'
)
return None
"""
Ingest files from studio upload endpoint
"""
conn = S3Connection(
self.auth_dict['edx_access_key_id'],
self.auth_dict['edx_secret_access_key']
)
"""Occassional s3 Error"""
try:
self.bucket = conn.get_bucket(self.auth_dict['edx_s3_ingest_bucket'])
except:
print 'S3: Ingest Conn Failure'
return None
for key in self.bucket.list('prod-edx/unprocessed/', '/'):
meta = self.bucket.get_key(key.name)
self.studio_s3_validate(
meta=meta,
key=key
)
def studio_s3_validate(self, meta, key):
if meta.get_metadata('course_video_upload_token') is None:
return None
client_title = meta.get_metadata('client_video_id')
course_hex = meta.get_metadata('course_video_upload_token')
course_url = meta.get_metadata('course_key')
edx_filename = key.name[::-1].split('/')[0][::-1]
if len(course_hex) == 0:
return None
course_query = Course.objects.filter(studio_hex=course_hex)
if len(course_query) == 0:
V = VideoProto(
s3_filename=edx_filename,
client_title=client_title,
file_extension='',
platform_course_url=course_url
)
"""
Call VAL Api
"""
val_status = 'invalid_token'
VAC = VALAPICall(
video_proto=V,
val_status=val_status
)
VAC.call()
new_key = 'prod-edx/rejected/' + key.name[::-1].split('/')[0][::-1]
key.copy(self.bucket, new_key)
key.delete()
return None
file_extension = client_title[::-1].split('.')[0][::-1]
"""
download file
"""
if len(file_extension) == 3:
try:
meta.get_contents_to_filename(
os.path.join(
self.node_work_directory,
edx_filename + '.' + file_extension
)
)
file_ingested = True
except:
print 'File Copy Fail: Studio S3 Ingest'
file_ingested = False
else:
try:
meta.get_contents_to_filename(
os.path.join(
self.node_work_directory,
edx_filename
)
)
file_ingested = True
except:
print 'File Copy Fail: Studio S3 Ingest'
file_ingested = False
file_extension = ''
if file_ingested is not True:
# 's3 Bucket ingest Fail'
new_key = 'prod-edx/rejected/' + key.name[::-1].split('/')[0][::-1]
try:
key.copy(self.bucket, new_key)
except:
key.copy(self.bucket, new_key)
key.delete()
return None
"""
Trigger Ingest Process
"""
V = VideoProto(
s3_filename=edx_filename,
client_title=client_title,
file_extension=file_extension,
platform_course_url=course_url
)
I = VedaIngest(
course_object=course_query[0],
video_proto=V,
node_work_directory=self.node_work_directory
)
I.insert()
if I.complete is False:
return None
"""
Delete Original After Copy
"""
new_key = 'prod-edx/processed/' + key.name[::-1].split('/')[0][::-1]
try:
key.copy(self.bucket, new_key)
except:
key.copy(self.bucket, new_key)
# key.copy(self.bucket, new_key)
key.delete()
def main():
pass
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import datetime
from datetime import timedelta
import yaml
import uuid
import newrelic.agent
newrelic.agent.initialize(
os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_newrelic.ini'
)
)
"""
Heal Process
Roll through videos, check for completion
- reencode endpoints
- fix data (if wrong), including on VAL
- reschedule self
# Heuristic
# Encode
# Activation
# Logistics
"""
from control_env import *
from veda_encode import VedaEncode
from veda_val import VALAPICall
import celeryapp
time_safetygap = datetime.datetime.utcnow().replace(tzinfo=utc) - timedelta(days=1)
class VedaHeal():
"""
"""
def __init__(self, **kwargs):
self.current_time = datetime.datetime.utcnow().replace(tzinfo=utc)
self.auth_yaml = kwargs.get(
'auth_yaml',
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'veda_auth.yaml'
),
)
self.auth_dict = self._READ_AUTH()
# for individuals
self.video_query = kwargs.get('video_query', None)
self.freezing_bug = kwargs.get('freezing_bug', True)
self.val_status = None
self.retry_barrier_hours = 48
def _READ_AUTH(self):
if self.auth_yaml is None:
return None
if not os.path.exists(self.auth_yaml):
return None
with open(self.auth_yaml, 'r') as stream:
try:
auth_dict = yaml.safe_load(stream)
return auth_dict
except yaml.YAMLError as exc:
return None
@newrelic.agent.background_task()
def discovery(self):
self.video_query = Video.objects.filter(
video_trans_start__lt=self.current_time - timedelta(
hours=self.auth_dict['heal_start']
),
video_trans_start__gt=self.current_time - timedelta(
hours=self.auth_dict['heal_end']
)
)
self.send_encodes()
def send_encodes(self):
for v in self.video_query:
encode_list = self.determine_fault(video_object=v)
"""
Using the 'Video Proto' Model
"""
if self.val_status is not None:
VAC = VALAPICall(
video_proto=None,
video_object=v,
val_status=self.val_status,
)
VAC.call()
self.val_status = None
if len(encode_list) > 0:
"""
send job to queue
"""
if v.video_orig_filesize > self.auth_dict['largefile_queue_barrier']:
cel_queue = self.auth_dict['largefile_celery_queue']
else:
cel_queue = self.auth_dict['main_celery_queue']
for e in encode_list:
veda_id = v.edx_id
encode_profile = e
jobid = uuid.uuid1().hex[0:10]
celeryapp.worker_task_fire.apply_async(
(veda_id, encode_profile, jobid),
queue=cel_queue
)
@newrelic.agent.background_task()
def determine_fault(self, video_object):
"""
Is there anything to do with this?
"""
if self.freezing_bug is True:
if video_object.video_trans_status == 'Corrupt File':
return []
if video_object.video_trans_status == 'Review Reject':
return []
if video_object.video_trans_status == 'Review Hold':
return []
if video_object.video_active is False:
return []
"""
Finally, determine encodes
"""
E = VedaEncode(
course_object=video_object.inst_class,
veda_id=video_object.edx_id
)
encode_list = E.determine_encodes()
try:
encode_list.remove('review')
except (AttributeError, ValueError):
# determine_encodes may return None; 'review' may be absent
pass
"""
Status Cleaning
"""
check_list = []
if encode_list is not None:
for e in encode_list:
if e != 'mobile_high' and e != 'audio_mp3' and e != 'review':
check_list.append(e)
if check_list is None or len(check_list) == 0:
self.val_status = 'file_complete'
"""
File is complete!
Check for data parity, and call done
"""
if video_object.video_trans_status != 'Complete':
Video.objects.filter(
edx_id=video_object.edx_id
).update(
video_trans_status='Complete',
video_trans_end=datetime.datetime.utcnow().replace(tzinfo=utc)
)
if encode_list is None or len(encode_list) == 0:
return []
"""
get baseline // if there are == encodes and baseline,
mark file corrupt -- just run the query again with
no veda_id
"""
"""
This overrides
"""
if self.freezing_bug is False:
if self.val_status != 'file_complete':
self.val_status = 'transcode_queue'
return encode_list
E2 = VedaEncode(
course_object=video_object.inst_class,
)
E2.determine_encodes()
if len(E2.encode_list) == len(encode_list) and len(encode_list) > 1:
"""
Mark File Corrupt, accounting for migrated URLs
"""
url_test = URL.objects.filter(
videoID=Video.objects.filter(
edx_id=video_object.edx_id
).latest()
)
if video_object.video_trans_start < datetime.datetime.utcnow().replace(tzinfo=utc) - \
timedelta(hours=self.retry_barrier_hours):
if len(url_test) == 0:
Video.objects.filter(
edx_id=video_object.edx_id
).update(
video_trans_status='Corrupt File',
video_trans_end=datetime.datetime.utcnow().replace(tzinfo=utc)
)
self.val_status = 'file_corrupt'
return []
if self.val_status != 'file_complete':
self.val_status = 'transcode_queue'
return encode_list
@newrelic.agent.background_task()
def purge(self):
"""
Purge Work Directory
"""
for file in os.listdir(WORK_DIRECTORY):
full_filepath = os.path.join(WORK_DIRECTORY, file)
filetime = datetime.datetime.utcfromtimestamp(
os.path.getmtime(
full_filepath
)
).replace(tzinfo=utc)
if filetime < time_safetygap:
print file + " : WORK PURGE"
os.remove(full_filepath)
def main():
VH = VedaHeal()
VH.discovery()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import boto
import boto.s3
from boto.s3.key import Key
import yaml
import shutil
from os.path import expanduser
import newrelic.agent
try:
boto.config.add_section('Boto')
except:
pass
boto.config.set('Boto', 'http_socket_timeout', '100')
newrelic.agent.initialize(
os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_newrelic.ini'
)
)
"""
Upload file to hotstore
"""
from veda_utils import ErrorObject, Output
homedir = expanduser("~")
class Hotstore():
def __init__(self, video_proto, upload_filepath, **kwargs):
self.video_proto = video_proto
self.upload_filepath = upload_filepath
# is this a final/encode file?
self.endpoint = kwargs.get('endpoint', False)
self.auth_yaml = kwargs.get(
'auth_yaml',
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'veda_auth.yaml'
),
)
self.auth_dict = self._READ_AUTH()
self.endpoint_url = None
def _READ_AUTH(self):
if self.auth_yaml is None:
return None
if not os.path.exists(self.auth_yaml):
return None
with open(self.auth_yaml, 'r') as stream:
try:
auth_dict = yaml.safe_load(stream)
return auth_dict
except yaml.YAMLError as exc:
return None
@newrelic.agent.background_task()
def upload(self):
if self.auth_dict is None:
return False
if not os.path.exists(self.upload_filepath):
ErrorObject().print_error(
message='Hotstore: File Not Found'
)
return False
self.upload_filesize = os.stat(self.upload_filepath).st_size
if self.upload_filesize < self.auth_dict['multi_upload_barrier']:
return self._BOTO_SINGLEPART()
else:
return self._BOTO_MULTIPART()
def _BOTO_SINGLEPART(self):
"""
Upload single part (under threshold in instance_auth)
self.auth_dict['multi_upload_barrier']
"""
if self.endpoint is False:
try:
conn = boto.connect_s3(
self.auth_dict['veda_access_key_id'],
self.auth_dict['veda_secret_access_key']
)
delv_bucket = conn.get_bucket(
self.auth_dict['veda_s3_hotstore_bucket']
)
except:
ErrorObject().print_error(
message='Hotstore: Bucket Connectivity'
)
return False
else:
try:
conn = boto.connect_s3(
self.auth_dict['edx_access_key_id'],
self.auth_dict['edx_secret_access_key']
)
delv_bucket = conn.get_bucket(
self.auth_dict['edx_s3_endpoint_bucket']
)
except:
ErrorObject().print_error(
message='Endpoint: Bucket Connectivity'
)
return False
upload_key = Key(delv_bucket)
upload_key.key = '.'.join((
self.video_proto.veda_id,
self.upload_filepath.split('.')[-1]
))
try:
upload_key.set_contents_from_filename(self.upload_filepath)
return True
except:
# single blind retry on transient s3 errors
upload_key.set_contents_from_filename(self.upload_filepath)
return True
def _BOTO_MULTIPART(self):
"""
Split file into chunks, upload chunks
NOTE: this should never happen, as your files should be much
smaller than this, but one never knows
"""
path_to_multipart = os.path.dirname(self.upload_filepath)
filename = os.path.basename(self.upload_filepath)
"""
This is modular
"""
if not os.path.exists(os.path.join(path_to_multipart, filename.split('.')[0])):
os.mkdir(os.path.join(path_to_multipart, filename.split('.')[0]))
os.chdir(os.path.join(path_to_multipart, filename.split('.')[0]))
"""
Split File into chunks
"""
split_command = 'split -b10m -a5'  # 10MB chunks, 5-character part suffixes
sys.stdout.write('%s : %s\n' % (filename, 'Generating Multipart'))
os.system(' '.join((split_command, self.upload_filepath)))
sys.stdout.flush()
"""
Connect to s3
"""
if self.endpoint is False:
try:
c = boto.connect_s3(
self.auth_dict['veda_access_key_id'],
self.auth_dict['veda_secret_access_key']
)
b = c.lookup(self.auth_dict['veda_s3_hotstore_bucket'])
except:
ErrorObject().print_error(
message='Hotstore: Bucket Connectivity'
)
return False
else:
try:
c = boto.connect_s3(
self.auth_dict['edx_access_key_id'],
self.auth_dict['edx_secret_access_key']
)
b = c.lookup(self.auth_dict['edx_s3_endpoint_bucket'])
except:
ErrorObject().print_error(
message='Endpoint: Bucket Connectivity'
)
return False
if b is None:
ErrorObject().print_error(
message='Deliverable Fail: s3 Bucket Error'
)
return False
"""
Upload and stitch parts // with a decent display
"""
mp = b.initiate_multipart_upload(
'.'.join((
self.video_proto.veda_id,
filename.split('.')[-1]
))
)
x = 1
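# coreutils split names parts 'xaaaaa', 'xaaaab', ... so the sorted()
# listing below preserves chunk order for the multipart assembly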
for file in sorted(os.listdir(
os.path.join(
path_to_multipart,
filename.split('.')[0]
)
)):
sys.stdout.write('%s : %s\r' % (file, 'uploading part'))
fp = open(file, 'rb')
mp.upload_part_from_file(fp, x)
fp.close()
sys.stdout.flush()
x += 1
sys.stdout.write('\n')
mp.complete_upload()
"""
Clean up multipart
"""
os.chdir(homedir)
shutil.rmtree(os.path.join(path_to_multipart, filename.split('.')[0]))
return True
def main():
pass
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import subprocess
import fnmatch
import django
import newrelic.agent
newrelic.agent.initialize(
os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_newrelic.ini'
)
)
"""
VEDA Intake/Product Final Testing Suite
Should Test for:
0 size
Corrupt Files
image files (which read as 0:00 duration or N/A)
Mismatched Durations (within 5 sec)
"""
from control_env import *
class Validation():
"""
Expects a full filepath
"""
def __init__(self, videofile, **kwargs):
self.videofile = videofile
self.mezzanine = kwargs.get('mezzanine', True)
self.veda_id = kwargs.get('veda_id', None)
def seconds_conversion(self, duration):
hours = float(duration.split(':')[0])
minutes = float(duration.split(':')[1])
seconds = float(duration.split(':')[2])
seconds_duration = (((hours * 60) + minutes) * 60) + seconds
return seconds_duration
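# e.g. seconds_conversion('00:01:30.50') -> ((0 * 60) + 1) * 60 + 30.5 = 90.5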
@newrelic.agent.background_task()
def validate(self):
"""
Test #1 - assumes file is in 'work' directory of node
"""
ff_command = ' '.join((
FFPROBE,
"\"" + self.videofile + "\""
))
"""
Test if size is zero
"""
if int(os.path.getsize(self.videofile)) == 0:
print 'Corrupt: Invalid'
return False
video_duration = None
p = subprocess.Popen(ff_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
for line in iter(p.stdout.readline, b''):
if "Invalid data found when processing input" in line:
print 'Corrupt: Invalid'
return False
if "multiple edit list entries, a/v desync might occur, patch welcome" in line:
return False
if "Duration: " in line:
if "Duration: 00:00:00.0" in line:
return False
elif "Duration: N/A, " in line:
return False
video_duration = line.split(',')[0][::-1].split(' ')[0][::-1]
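# e.g. '  Duration: 00:05:32.10, start: 0.000, ...' -> '00:05:32.10'
# (take everything before the first comma, then the last space-separated token)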
try:
str(video_duration)
except:
return False
p.kill()
"""
Compare Product to DB averages - pass within 5 sec
"""
if self.mezzanine is True:
return True
if self.veda_id is None:
print 'Error: Validation, encoded file no comparison ID'
return False
if video_duration is None:
return False
try:
video_query = Video.objects.filter(edx_id=self.veda_id).latest()
except:
return False
product_duration = float(
self.seconds_conversion(
duration=video_duration
)
)
data_duration = float(
self.seconds_conversion(
duration=video_query.video_orig_duration
)
)
"""
Final Test
"""
if (data_duration - 5) <= product_duration <= (data_duration + 5):
return True
else:
return False
def main():
pass
# V = Validation(videofile='/Users/ernst/VEDA_WORKING/fecf210f-0e94-4627-8ac3-46c2338e5897.mp4')
# print V.validate()
# # def __init__(self, videofile, **kwargs):
if __name__ == '__main__':
sys.exit(main())
#!/usr/bin/python
"""
Simple Watchdog Timer
"""
'''
Stolen From:
--------------------------------------------------------------------------------
Module Name: watchdog.py
Author: Jon Peterson (PIJ)
Description: This module implements a simple watchdog timer for Python.
--------------------------------------------------------------------------------
Copyright (c) 2012, Jon Peterson
--------------------------------------------------------------------------------
'''
# Imports
from time import sleep
from threading import Timer
import thread
# Watchdog Class
class Watchdog(object):
def __init__(self, time=1.0):
''' Class constructor. The "time" argument has the units of seconds. '''
self._time = time
return
def StartWatchdog(self):
''' Starts the watchdog timer. '''
self._timer = Timer(self._time, self._WatchdogEvent)
self._timer.daemon = True
self._timer.start()
return
def PetWatchdog(self):
''' Reset watchdog timer. '''
self.StopWatchdog()
self.StartWatchdog()
return
def _WatchdogEvent(self):
'''
This internal method gets called when the timer triggers. A keyboard
interrupt is generated on the main thread. The watchdog timer is stopped
when a previous event is tripped.
'''
print 'Watchdog event...'
self.StopWatchdog()
thread.interrupt_main()
# thread.interrupt_main()
# thread.interrupt_main()
return
def StopWatchdog(self):
''' Stops the watchdog timer. '''
self._timer.cancel()
def main():
''' This function is used to unit test the watchdog module. '''
w = Watchdog(1.0)
w.StartWatchdog()
for i in range(0, 11):
print 'Testing %d...' % i
try:
if (i % 3) == 0:
sleep(1.5)
else:
sleep(0.5)
except KeyboardInterrupt:
print 'MAIN THREAD KNOWS ABOUT WATCHDOG'
w.PetWatchdog()
w.StopWatchdog() # Not strictly necessary
return
if __name__ == '__main__':
main()
<?xml version="1.0"?>
<feed xmlns="http://www.youtube.com/schemas/cms/2.0" notification_email="greg@edx.org">
<asset type="web" tag="">
<title> </title>
<custom_id> </custom_id>
<!-- other asset details included in feed -->
</asset>
<file type="video" tag="">
<filename> </filename>
</file>
<ownership/>
<relationship>
<item path="/feed/asset[@tag='']"/>
<related_item path="/feed/ownership[1]"/>
</relationship>
<video tag="">
<title> </title>
<channel> </channel>
<description> </description>
<keyword>edX</keyword>
<genre>Education</genre>
<allow_embedding>True</allow_embedding>
<allow_comments>Approve</allow_comments>
<allow_ratings>True</allow_ratings>
<!-- <allow_responses>Approve</allow_responses> -->
<public>unlisted</public>
<!-- <start_time>1990-11-30T12:00:00Z</start_time> -->
</video>
<relationship>
<item path="/feed/file[@tag='']"/>
<related_item path="/feed/asset[@tag='']"/>
<related_item path="/feed/video[@tag='']"/>
</relationship>
<rights_admin type="usage" owner="True"/>
<rights_admin type="match" owner="True"/>
<!-- declare a one-off policy -->
<rights_policy>
<rule action="track"/>
</rights_policy>
<!-- Claim our uploaded video with the one-off policy -->
<claim type="audiovisual" video="/feed/video[@tag='']" asset="/feed/asset[@tag='']" rights_admin="/feed/rights_admin[@type='usage']" rights_policy="/feed/rights_policy[1]"/>
<relationship>
<item path="/feed/rights_admin[@type='match']"/>
<item path="/feed/rights_policy[1]"/>
<related_item path="/feed/asset[@tag='']"/>
</relationship>
</feed>
from shotgun import (Shotgun, ShotgunError, ShotgunFileDownloadError, Fault,
ProtocolError, ResponseError, Error, __version__)
from shotgun import SG_TIMEZONE as sg_timezone
"""
iri2uri
Converts an IRI to a URI.
"""
__author__ = "Joe Gregorio (joe@bitworking.org)"
__copyright__ = "Copyright 2006, Joe Gregorio"
__contributors__ = []
__version__ = "1.0.0"
__license__ = "MIT"
__history__ = """
"""
import urlparse
# Convert an IRI to a URI following the rules in RFC 3987
#
# The characters we need to encode and escape are defined in the spec:
#
# iprivate = %xE000-F8FF / %xF0000-FFFFD / %x100000-10FFFD
# ucschar = %xA0-D7FF / %xF900-FDCF / %xFDF0-FFEF
# / %x10000-1FFFD / %x20000-2FFFD / %x30000-3FFFD
# / %x40000-4FFFD / %x50000-5FFFD / %x60000-6FFFD
# / %x70000-7FFFD / %x80000-8FFFD / %x90000-9FFFD
# / %xA0000-AFFFD / %xB0000-BFFFD / %xC0000-CFFFD
# / %xD0000-DFFFD / %xE1000-EFFFD
escape_range = [
(0xA0, 0xD7FF),
(0xE000, 0xF8FF),
(0xF900, 0xFDCF),
(0xFDF0, 0xFFEF),
(0x10000, 0x1FFFD),
(0x20000, 0x2FFFD),
(0x30000, 0x3FFFD),
(0x40000, 0x4FFFD),
(0x50000, 0x5FFFD),
(0x60000, 0x6FFFD),
(0x70000, 0x7FFFD),
(0x80000, 0x8FFFD),
(0x90000, 0x9FFFD),
(0xA0000, 0xAFFFD),
(0xB0000, 0xBFFFD),
(0xC0000, 0xCFFFD),
(0xD0000, 0xDFFFD),
(0xE1000, 0xEFFFD),
(0xF0000, 0xFFFFD),
(0x100000, 0x10FFFD)
]
def encode(c):
retval = c
i = ord(c)
for low, high in escape_range:
if i < low:
break
if i >= low and i <= high:
retval = "".join(["%%%2X" % ord(o) for o in c.encode('utf-8')])
break
return retval
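# e.g. encode(u'\u2604') -> '%E2%98%84' (each UTF-8 octet percent-encoded);
# plain ASCII characters fall outside escape_range and pass through unchanged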
def iri2uri(uri):
"""Convert an IRI to a URI. Note that IRIs must be
passed in a unicode strings. That is, do not utf-8 encode
the IRI before passing it into the function."""
if isinstance(uri ,unicode):
(scheme, authority, path, query, fragment) = urlparse.urlsplit(uri)
authority = authority.encode('idna')
# For each character in 'ucschar' or 'iprivate'
# 1. encode as utf-8
# 2. then %-encode each octet of that utf-8
uri = urlparse.urlunsplit((scheme, authority, path, query, fragment))
uri = "".join([encode(c) for c in uri])
return uri
if __name__ == "__main__":
import unittest
class Test(unittest.TestCase):
def test_uris(self):
"""Test that URIs are invariant under the transformation."""
invariant = [
u"ftp://ftp.is.co.za/rfc/rfc1808.txt",
u"http://www.ietf.org/rfc/rfc2396.txt",
u"ldap://[2001:db8::7]/c=GB?objectClass?one",
u"mailto:John.Doe@example.com",
u"news:comp.infosystems.www.servers.unix",
u"tel:+1-816-555-1212",
u"telnet://192.0.2.16:80/",
u"urn:oasis:names:specification:docbook:dtd:xml:4.1.2" ]
for uri in invariant:
self.assertEqual(uri, iri2uri(uri))
def test_iri(self):
""" Test that the right type of escaping is done for each part of the URI."""
self.assertEqual("http://xn--o3h.com/%E2%98%84", iri2uri(u"http://\N{COMET}.com/\N{COMET}"))
self.assertEqual("http://bitworking.org/?fred=%E2%98%84", iri2uri(u"http://bitworking.org/?fred=\N{COMET}"))
self.assertEqual("http://bitworking.org/#%E2%98%84", iri2uri(u"http://bitworking.org/#\N{COMET}"))
self.assertEqual("#%E2%98%84", iri2uri(u"#\N{COMET}"))
self.assertEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}"))
self.assertEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}")))
self.assertNotEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}".encode('utf-8')))
unittest.main()
#! /opt/local/bin/python
# ----------------------------------------------------------------------------
# SG_TIMEZONE module
# this is rolled into this shotgun api file to avoid having to require
# current users of api2 to install new modules and modify PYTHONPATH info.
# ----------------------------------------------------------------------------
class SgTimezone(object):
from datetime import tzinfo, timedelta, datetime
import time as _time
ZERO = timedelta(0)
STDOFFSET = timedelta(seconds = -_time.timezone)
if _time.daylight:
DSTOFFSET = timedelta(seconds = -_time.altzone)
else:
DSTOFFSET = STDOFFSET
DSTDIFF = DSTOFFSET - STDOFFSET
def __init__(self):
self.utc = self.UTC()
self.local = self.LocalTimezone()
class UTC(tzinfo):
def utcoffset(self, dt):
return SgTimezone.ZERO
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return SgTimezone.ZERO
class LocalTimezone(tzinfo):
def utcoffset(self, dt):
if self._isdst(dt):
return SgTimezone.DSTOFFSET
else:
return SgTimezone.STDOFFSET
def dst(self, dt):
if self._isdst(dt):
return SgTimezone.DSTDIFF
else:
return SgTimezone.ZERO
def tzname(self, dt):
# _time lives on SgTimezone (imported in the class body), not at module scope
return SgTimezone._time.tzname[self._isdst(dt)]
def _isdst(self, dt):
tt = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.weekday(), 0, -1)
import time as _time
stamp = _time.mktime(tt)
tt = _time.localtime(stamp)
return tt.tm_isdst > 0
"""Drop-in replacement for collections.OrderedDict by Raymond Hettinger
http://code.activestate.com/recipes/576693/
"""
from UserDict import DictMixin
# Modified from original to support Python 2.4, see
# http://code.google.com/p/simplejson/issues/detail?id=53
try:
all
except NameError:
def all(seq):
for elem in seq:
if not elem:
return False
return True
class OrderedDict(dict, DictMixin):
def __init__(self, *args, **kwds):
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
try:
self.__end
except AttributeError:
self.clear()
self.update(*args, **kwds)
def clear(self):
self.__end = end = []
end += [None, end, end] # sentinel node for doubly linked list
self.__map = {} # key --> [key, prev, next]
dict.clear(self)
def __setitem__(self, key, value):
if key not in self:
end = self.__end
curr = end[1]
curr[2] = end[1] = self.__map[key] = [key, curr, end]
dict.__setitem__(self, key, value)
def __delitem__(self, key):
dict.__delitem__(self, key)
key, prev, next = self.__map.pop(key)
prev[2] = next
next[1] = prev
def __iter__(self):
end = self.__end
curr = end[2]
while curr is not end:
yield curr[0]
curr = curr[2]
def __reversed__(self):
end = self.__end
curr = end[1]
while curr is not end:
yield curr[0]
curr = curr[1]
def popitem(self, last=True):
if not self:
raise KeyError('dictionary is empty')
# Modified from original to support Python 2.4, see
# http://code.google.com/p/simplejson/issues/detail?id=53
if last:
key = reversed(self).next()
else:
key = iter(self).next()
value = self.pop(key)
return key, value
def __reduce__(self):
items = [[k, self[k]] for k in self]
tmp = self.__map, self.__end
del self.__map, self.__end
inst_dict = vars(self).copy()
self.__map, self.__end = tmp
if inst_dict:
return (self.__class__, (items,), inst_dict)
return self.__class__, (items,)
def keys(self):
return list(self)
setdefault = DictMixin.setdefault
update = DictMixin.update
pop = DictMixin.pop
values = DictMixin.values
items = DictMixin.items
iterkeys = DictMixin.iterkeys
itervalues = DictMixin.itervalues
iteritems = DictMixin.iteritems
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, self.items())
def copy(self):
return self.__class__(self)
@classmethod
def fromkeys(cls, iterable, value=None):
d = cls()
for key in iterable:
d[key] = value
return d
def __eq__(self, other):
if isinstance(other, OrderedDict):
return len(self)==len(other) and \
all(p==q for p, q in zip(self.items(), other.items()))
return dict.__eq__(self, other)
def __ne__(self, other):
return not self == other
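# A minimal usage sketch (editor's addition, not in the original recipe):
# insertion order is preserved, and popitem() pops LIFO by default.
if __name__ == '__main__':
    d = OrderedDict([('one', 1), ('two', 2)])
    d['three'] = 3
    assert d.keys() == ['one', 'two', 'three']
    assert d.popitem() == ('three', 3)          # last=True: LIFO
    assert d.popitem(last=False) == ('one', 1)  # last=False: FIFO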
"""JSON token scanner
"""
import re
def _import_c_make_scanner():
try:
from simplejson._speedups import make_scanner
return make_scanner
except ImportError:
return None
c_make_scanner = _import_c_make_scanner()
__all__ = ['make_scanner']
NUMBER_RE = re.compile(
r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?',
(re.VERBOSE | re.MULTILINE | re.DOTALL))
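# Editor's note: NUMBER_RE splits a JSON number into (integer, fraction,
# exponent) groups, which _scan_once below feeds to parse_int/parse_float:
assert NUMBER_RE.match('-3.14e10').groups() == ('-3', '.14', 'e10')
assert NUMBER_RE.match('42').groups() == ('42', None, None)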
def py_make_scanner(context):
parse_object = context.parse_object
parse_array = context.parse_array
parse_string = context.parse_string
match_number = NUMBER_RE.match
encoding = context.encoding
strict = context.strict
parse_float = context.parse_float
parse_int = context.parse_int
parse_constant = context.parse_constant
object_hook = context.object_hook
object_pairs_hook = context.object_pairs_hook
memo = context.memo
def _scan_once(string, idx):
try:
nextchar = string[idx]
except IndexError:
raise StopIteration
if nextchar == '"':
return parse_string(string, idx + 1, encoding, strict)
elif nextchar == '{':
return parse_object((string, idx + 1), encoding, strict,
_scan_once, object_hook, object_pairs_hook, memo)
elif nextchar == '[':
return parse_array((string, idx + 1), _scan_once)
elif nextchar == 'n' and string[idx:idx + 4] == 'null':
return None, idx + 4
elif nextchar == 't' and string[idx:idx + 4] == 'true':
return True, idx + 4
elif nextchar == 'f' and string[idx:idx + 5] == 'false':
return False, idx + 5
m = match_number(string, idx)
if m is not None:
integer, frac, exp = m.groups()
if frac or exp:
res = parse_float(integer + (frac or '') + (exp or ''))
else:
res = parse_int(integer)
return res, m.end()
elif nextchar == 'N' and string[idx:idx + 3] == 'NaN':
return parse_constant('NaN'), idx + 3
elif nextchar == 'I' and string[idx:idx + 8] == 'Infinity':
return parse_constant('Infinity'), idx + 8
elif nextchar == '-' and string[idx:idx + 9] == '-Infinity':
return parse_constant('-Infinity'), idx + 9
else:
raise StopIteration
def scan_once(string, idx):
try:
return _scan_once(string, idx)
finally:
memo.clear()
return scan_once
make_scanner = c_make_scanner or py_make_scanner
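# A minimal usage sketch (editor's addition): simplejson.JSONDecoder wires
# its scan_once up through make_scanner, so raw_decode exercises this module.
if __name__ == '__main__':
    import simplejson
    obj, end = simplejson.JSONDecoder().raw_decode('{"a": [1, 2]}')
    assert obj == {'a': [1, 2]}
    assert end == 13  # index just past the decoded object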
r"""Command-line tool to validate and pretty-print JSON
Usage::
$ echo '{"json":"obj"}' | python -m simplejson.tool
{
"json": "obj"
}
$ echo '{ 1.2:3.4}' | python -m simplejson.tool
Expecting property name: line 1 column 2 (char 2)
"""
import sys
import simplejson as json
def main():
if len(sys.argv) == 1:
infile = sys.stdin
outfile = sys.stdout
elif len(sys.argv) == 2:
infile = open(sys.argv[1], 'rb')
outfile = sys.stdout
elif len(sys.argv) == 3:
infile = open(sys.argv[1], 'rb')
outfile = open(sys.argv[2], 'wb')
else:
raise SystemExit(sys.argv[0] + " [infile [outfile]]")
try:
obj = json.load(infile,
object_pairs_hook=json.OrderedDict,
use_decimal=True)
    except ValueError as e:
        raise SystemExit(e)
json.dump(obj, outfile, sort_keys=True, indent=' ', use_decimal=True)
outfile.write('\n')
if __name__ == '__main__':
main()
import os
import sys
import logging
from shotgun_api3.lib.httplib2 import Http, ProxyInfo, socks
from shotgun_api3.lib.sgtimezone import SgTimezone
from shotgun_api3.lib.xmlrpclib import Error, ProtocolError, ResponseError
LOG = logging.getLogger("shotgun_api3")
LOG.setLevel(logging.WARN)
try:
import simplejson as json
except ImportError:
LOG.debug("simplejson not found, dropping back to json")
try:
        import json
except ImportError:
LOG.debug("json not found, dropping back to embedded simplejson")
# We need to munge the path so that the absolute imports in simplejson will work.
dir_path = os.path.dirname(__file__)
lib_path = os.path.join(dir_path, 'lib')
sys.path.append(lib_path)
import shotgun_api3.lib.simplejson as json
sys.path.pop()
import sys
import os
import logging
from .lib.httplib2 import Http, ProxyInfo, socks
from .lib.sgtimezone import SgTimezone
from .lib.xmlrpclib import Error, ProtocolError, ResponseError
LOG = logging.getLogger("shotgun_api3")
LOG.setLevel(logging.WARN)
try:
import simplejson as json
except ImportError:
LOG.debug("simplejson not found, dropping back to json")
try:
        import json
except ImportError:
LOG.debug("json not found, dropping back to embedded simplejson")
# We need to munge the path so that the absolute imports in simplejson will work.
dir_path = os.path.dirname(__file__)
lib_path = os.path.join(dir_path, 'lib')
sys.path.append(lib_path)
from .lib import simplejson as json
sys.path.pop()
import os
import sys
from email.mime.text import MIMEText
from datetime import date
import boto.ses
import yaml
'''
ABVID REPORTING - email / etc.
'''
from frontend_env import *
'''
v1 = Video.objects.filter(edx_id = upload_info['edx_id'])
if v1.abvid_serial != None:
sys.path.append(up_twodirectory + '/VEDA_OS/VEDA_FE/')
import abvid_reporting
abvid_reporting.report_status(
status="Youtube Duplicate",
abvid_serial = v1.abvid_serial,
youtube_id = ''
)
'''
"""
get auth keys from instance yaml
"""
auth_yaml = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'frontend_auth.yaml'
)
with open(auth_yaml, 'r') as stream:
    try:
        auth_dict = yaml.load(stream)
    except yaml.YAMLError as exc:
        # Bind auth_dict so later lookups fail loudly instead of raising
        # NameError on an unbound name
        auth_dict = None
        print 'AUTH ERROR'
def report_status(status, abvid_serial, youtube_id):
    v1 = VedaUpload.objects.filter(video_serial=abvid_serial).latest()
    # Defaults, so statuses matching no branch below can't raise NameError
    send_email = False
    final_success = ''
    excuse = ''
if 'Duplicate' in status or 'Corrupt' in status:
if v1.final_report is True:
send_email = False
# pass
else:
excuse = 'There has been a failure for the following reason : ' + status
final_success = 'FAILED'
youtube_id = ''
VedaUpload.objects.filter(
pk=v1.pk
).update(
file_complete=False,
final_report=True,
file_valid=False
)
send_email = True
elif 'Complete' in status:
excuse = 'This file is complete.'
final_success = 'SUCCESS'
VedaUpload.objects.filter(
pk=v1.pk
).update(
file_complete=True,
final_report=True,
file_valid=True,
youtube_id=youtube_id
)
send_email = True
if send_email is True:
email_subject = 'VEDA / edX About Video Status Update : ' + final_success
email_body = 'This is an auto generated message:\n\n'
email_body += 'An edX partner uploaded a new about video:\n\n'
email_body += 'STATUS : ' + excuse + '\n\n'
if len(youtube_id) > 0:
email_body += 'Youtube URL : https://www.youtube.com/watch?v=' + youtube_id + '\n\n'
        if v1.upload_filename is not None:
            email_body += 'Filename : ' + v1.upload_filename + '\n'
        email_body += 'Upload Date : ' + str(v1.upload_date) + ' (UTC)\n'
        # Optional fields may be None; coerce to '' to avoid TypeError
        email_body += 'Course Title (optional) : ' + (v1.client_information or '') + '\n'
        email_body += 'edX Studio Course URL : ' + (v1.edx_studio_url or '') + '\n\n'
email_body += 'Please do not reply to this email.\n\n <<EOM'
conn = boto.ses.connect_to_region('us-east-1')
conn.send_email(
auth_dict['veda_noreply_email'],
email_subject,
email_body,
[v1.status_email, auth_dict['admin_email']]
)
if __name__ == '__main__':
report_status(status='Complete', abvid_serial="5c34a85e5f", youtube_id='TEST')
'''
About Video Input and Validation
'''
import os
import sys
import datetime
from frontend_env import *
def create_record(upload_data):
"""
Create Record
"""
ul1 = VedaUpload(
video_serial=upload_data['abvid_serial'],
edx_studio_url=upload_data['studio_url'],
client_information=upload_data['course_name'],
status_email=upload_data['pm_email'],
file_valid=False,
file_complete=False,
)
    try:
        ul1.save()
        return True
    except Exception:
        # Narrowed from a bare except; save failures just report False
        return False
def validate_incoming(upload_data):
ul2 = VedaUpload.objects.filter(
video_serial=upload_data['abvid_serial']
).update(
upload_date=datetime.datetime.utcnow().replace(tzinfo=utc),
upload_filename=upload_data['orig_filename'],
)
return True
def send_to_pipeline(upload_data):
ul3 = VedaUpload.objects.filter(
video_serial=upload_data['abvid_serial']
).update(
file_valid=upload_data['success'],
)
    if upload_data['success'] == 'true':
        print 'Sending File to Pipeline'
        return True
    ul3 = VedaUpload.objects.filter(
        video_serial=upload_data['abvid_serial']
    ).update(
        comment='Failed Upload',
    )
    return False
if __name__ == '__main__':
upload_data = {}
upload_data['abvid_serial'] = '19e1e1c78e'
upload_data['success'] = 'true'
send_to_pipeline(upload_data)
from django.contrib import admin
# Register your models here.
---
# Instance Authentication
## Email Vars
veda_noreply_email:
admin_email:
## VEDA AWS Account
veda_access_key_id:
veda_secret_access_key:
veda_upload_bucket:
...
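# Editor's note: a minimal sketch of how this template is consumed by
# abvid_reporting.py and views.py in this commit; yaml.safe_load (rather
# than yaml.load) would avoid constructing arbitrary Python objects:
#   with open('frontend_auth.yaml') as stream:
#       auth_dict = yaml.safe_load(stream)
#   auth_dict['veda_noreply_email']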
#!/usr/bin/env python
"""
VEDA Environment variables
"""
import os
import sys
import django
from django.utils.timezone import utc
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
sys.path.append(project_path)
os.environ['DJANGO_SETTINGS_MODULE'] = 'VEDA.settings'
django.setup()
from pipeline.models import Institution
from pipeline.models import Course
from pipeline.models import Video
from pipeline.models import URL
from pipeline.models import VedaUpload
"""
TERM COLORS
"""
NODE_COLORS_BLUE = '\033[94m'
NODE_COLORS_GREEN = '\033[92m'
NODE_COLORS_RED = '\033[91m'
NODE_COLORS_END = '\033[0m'
VEDA_UTCOFFSET = -4
from django.db import models
import os
import sys
import unittest
"""
A basic unittest for the "Course Addition Tool"
"""
sys.path.append(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
import abvid_reporting
# VEDACat is exercised by TestVariables.setUp below but was never imported
from course_validate import VEDACat
class TestVariables(unittest.TestCase):
def setUp(self):
self.VCT = VEDACat()
def test_config(self):
self.assertTrue(len(self.VCT.veda_model) > 0)
def test_institution_valid(self):
self.VCT.inst_code = '111'
self.assertTrue(self.VCT.institution_name() == 'Error')
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
"""
A basic unittest for the "Course Addition Tool"
"""
sys.path.append(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
from course_validate import VEDACat
class TestVariables(unittest.TestCase):
def setUp(self):
self.VCT = VEDACat()
def test_config(self):
self.assertTrue(len(self.VCT.veda_model) > 0)
def test_institution_valid(self):
self.VCT.inst_code = '111'
self.assertTrue(self.VCT.institution_name() == 'Error')
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
from django.conf.urls import *
from django.contrib import admin
admin.autodiscover()
from django.views.generic import TemplateView
import views
urlpatterns = patterns(
'',
(r'^$', views.index),
(r'^robots\.txt$', TemplateView.as_view(template_name='robots.txt', content_type='text/plain')),
(r'^admin/', include(admin.site.urls)),
# Input Form
(r'cat/', views.input_form),
# Data Validation
(r'institution_validator/', views.institution_name),
(r'inst_id_validate/', views.inst_id_validate),
(r'institution_data/', views.institution_data),
(r'new_institution/', views.new_institution),
(r'course_id_validate/', views.course_id_validate),
(r'course_add/', views.course_add),
# Uploads
(r'upload/', views.upload_alpha_1),
(r'upload_success/', views.upload_success),
(r'about_input/', views.about_input),
)
---
# Course Models YAML
models_nottoget:
- id
- video
- ingest
- course_hold
- semesterid
- last_vid_number
- previous_statechange
- yt_proc
- s3_dir
- studio_hex
- sg_projID
- c24_hours
#boolean (checkbox)
bools:
- course_hold
- proc_loc
- review_proc
- mobile_override
- yt_proc
- tp_proc
- c24_proc
- s3_proc
- xue
- xuetang_proc
# dropdown boxes
dropdowns:
- tp_speed
- c24_speed
- c24_fidelity
# mandatory fields
must_haves:
- local_storedir
- course_name
- yt_channel
# advanced settings/institutional fields
organizational:
local_storedir: main
course_name: main
yt_channel: encode
institution: main
institution_name: main
edx_classid: main
parent_ID: main
yt_proc: encode
s3_proc: encode
mobile_override: encode
review_proc: encode
proc_loc: encode
xue: encode
xuetang_proc: encode
yt_logon: encode
tp_proc: transcribe
tp_speed: transcribe
tp_username: transcribe
tp_pass: transcribe
tp_apikey: transcribe
tp_password: transcribe
...
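# Editor's note: a minimal sketch (filename and variable names are assumed,
# not from this commit) of regrouping the 'organizational' map by section
# for form rendering:
#   with open('course_models.yaml') as stream:
#       model = yaml.safe_load(stream)
#   by_section = {}
#   for field, section in model['organizational'].items():
#       by_section.setdefault(section, []).append(field)
#   # e.g. by_section['transcribe'] -> ['tp_proc', 'tp_speed', ...]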
# VEDA F/E Views
import json
import datetime
from datetime import timedelta
import yaml
import base64
import hmac
import hashlib
import uuid
from django.http import HttpResponse
from django.template import RequestContext, loader
from django.http import HttpResponseRedirect
from frontend_env import *
from course_validate import VEDACat
from abvid_validate import validate_incoming, create_record, send_to_pipeline
auth_yaml = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'frontend_auth.yaml'
)
"""
Here's the links for the main Page
:::
"""
links = {
'VEDA_CAT': '../../cat/',
'Upload Page': '../../upload/',
}
def index(request):
if not request.user.is_authenticated():
auth = 'NO'
linkers = ''
else:
auth = 'YES'
linkers = links
template = loader.get_template('index.html')
context = RequestContext(request, ({
'links': linkers,
'auth': auth
}))
return HttpResponse(template.render(context))
def input_form(request):
"""
Course Addition Tool Endpoints
"""
if not request.user.is_authenticated():
return HttpResponseRedirect('/admin/login/?next=%s' % request.path)
VC1 = VEDACat()
VC1.institution_list()
inst_list = json.dumps(VC1.inst_list)
template = loader.get_template('course_form.html')
context = RequestContext(request, ({
'institution_list': inst_list
}))
return HttpResponse(template.render(context))
def institution_name(request):
if request.method == 'POST' and request.POST['input_text'] != 'NEWINST':
inst_code = request.POST['input_text']
VC1 = VEDACat(inst_code=inst_code)
institution_name = VC1.institution_name()
else:
institution_name = ''
return HttpResponse(
json.dumps(institution_name),
content_type="application/json"
)
def institution_data(request):
inst_code = request.POST['inst_code']
if inst_code != 'NEWINST':
VC1 = VEDACat(inst_code=inst_code[0:3])
else:
VC1 = VEDACat(inst_code=inst_code)
VC1.institution_data()
data = VC1.return_fields
return HttpResponse(
json.dumps(data),
content_type="application/json"
)
def inst_id_validate(request):
    if request.method == 'POST':
        try:
            VC1 = VEDACat(inst_code=request.POST['input_text'])
            data = VC1.validate_inst()
        except Exception:
            data = ''
    else:
        data = ''
return HttpResponse(
json.dumps(data),
content_type="application/json"
)
def new_institution(request):
data = ''
return HttpResponse(
json.dumps(data),
content_type="application/json"
)
def course_id_validate(request):
if request.method == 'POST' and 'edx_classid' in request.POST:
inst_code = request.POST['institution']
course_code = request.POST['edx_classid']
VC1 = VEDACat(inst_code=inst_code[0:3])
data = VC1.validate_code(course_code=course_code[0:5])
else:
data = False
return HttpResponse(
json.dumps(data),
content_type="application/json"
)
def course_add(request):
if request.method == 'POST':
return_data = request.POST['return_data']
VC1 = VEDACat()
course_data = VC1.course_add(
return_data=return_data
)
else:
course_data = ''
return HttpResponse(
json.dumps(course_data),
content_type="application/json"
)
###############
# UPLOAD PAGE #
###############
def upload_alpha_1(request):
"""
TODO:
Get This to expire in 24h / 1 Time URL
Generate metadata From Fields
Auth?
"""
    with open(auth_yaml, 'r') as stream:
        try:
            auth_dict = yaml.load(stream)
        except yaml.YAMLError as exc:
            # Bind auth_dict so the lookups below fail loudly rather than
            # with NameError
            auth_dict = None
            print 'AUTH ERROR'
    policy_expiration = datetime.datetime.utcnow() + timedelta(hours=24)
    policy_exp = str(policy_expiration).replace(' ', 'T').split('.')[0] + 'Z'
    # Build the S3 POST policy as real JSON rather than hand-escaped string
    # concatenation; the signature below is computed over the base64 of
    # whatever bytes are emitted here
    policy_document = json.dumps({
        "expiration": policy_exp,
        "conditions": [
            {"bucket": auth_dict['veda_upload_bucket']},
            ["starts-with", "$key", ""],
            {"acl": "private"},
            {"success_action_redirect": "../upload_success/"},
            ["starts-with", "$Content-Type", ""],
            ["content-length-range", 0, 500000000000],
        ],
    })
abvid_serial = uuid.uuid1().hex[0:10]
policy = base64.b64encode(policy_document)
signature = base64.b64encode(hmac.new(
auth_dict['veda_secret_access_key'],
policy,
hashlib.sha1
).digest())
template = loader.get_template('upload_video.html')
context = RequestContext(
request, ({
'policy': policy,
'signature': signature,
'abvid_serial': abvid_serial,
'access_key': auth_dict['veda_access_key_id']
})
)
return HttpResponse(template.render(context))
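# Editor's note (an assumption, not confirmed by this commit): with S3
# browser-based POST uploads, the signed policy travels as hidden form
# fields, so upload_video.html presumably renders something like:
#   AWSAccessKeyId          = access_key
#   policy                  = policy     (base64 policy document)
#   signature               = signature  (base64 HMAC-SHA1 over policy)
#   acl                     = private
#   success_action_redirect = ../upload_success/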
def upload_success(request):
template = loader.get_template('upload_success.html')
context = RequestContext(
request, ({})
)
return HttpResponse(template.render(context))
def about_input(request):
if request.method == 'POST':
upload_data = {}
if 'success' in request.POST:
upload_data['abvid_serial'] = request.POST['abvid_serial']
upload_data['success'] = request.POST['success']
goahead = send_to_pipeline(upload_data)
elif 'orig_filename' in request.POST:
upload_data['abvid_serial'] = request.POST['abvid_serial']
upload_data['orig_filename'] = request.POST['orig_filename']
upload_data['goahead'] = False
goahead = validate_incoming(upload_data=upload_data)
else:
upload_data['abvid_serial'] = request.POST['abvid_serial']
upload_data['pm_email'] = request.POST['pm_email']
upload_data['studio_url'] = request.POST['studio_url']
upload_data['course_name'] = request.POST['course_name']
goahead = create_record(upload_data=upload_data)
else:
goahead = False
return HttpResponse(
json.dumps(goahead),
content_type="application/json"
)