Commit 3141cbbe by Gregory Martin Committed by GitHub

Merge pull request #1 from edx/yro/swap-repo

Yro/swap repo
parents 27476b22 0d61645e
.idea/
*.pyc
static/admin/
sandbox.db
[pep8]
max-line-length = 120
ignore = E402
exclude = dependencies, watchdog.py, veda_deliver_xuetang.py, scripts
language: python
python:
- "2.7"
sudo: required
install:
- pip install -r requirements.txt
- pip install pep8
# build tests
script:
- python common/tests/test_build.py
- pep8
- cd control/tests && nosetests
===========
edx-video-pipeline
===========
Video encode automation django app/control node for edx-platform
----------------------------------------------------------------
**The video pipeline performs the following tasks**
- Ingest (Discovery, Cataloging, Sending tasks to worker cluster)
- Delivery
- Storage
- Maintenance
The video pipeline seeks modularity between parts, and for each part to operate as cleanly and independently as possible.
Each course's workflow operates independently, and workflows can be configured to serve a variety of endpoints.
INGEST:
Currently we ingest remote video from edx-platform via the Studio video upload tool. The videos are discovered by the video pipeline and ingested upon successful upload, renamed to an internal ID schema, and routed to the appropriate transcode task cluster.
TRANSCODE:
code for this is housed at https://github.com/edx/edx-video-worker
DELIVERY:
Uploads product videos to specific third-party destinations (YT, AWS, 3Play, cielo24), retrieves URLs/Statuses/products.
STORAGE:
A specified AWS S3 bucket
MAINTENANCE:
Logging, Data dumping, Celery node status and queue information
.. image:: https://travis-ci.org/edx/edx-video-pipeline.svg?branch=master
:target: https://travis-ci.org/edx/edx-video-pipeline
from django.contrib import admin
from VEDA_OS01.models import Course
from VEDA_OS01.models import Video
from VEDA_OS01.models import Encode
from VEDA_OS01.models import URL
from VEDA_OS01.models import Destination
from VEDA_OS01.models import Institution
from VEDA_OS01.models import VedaUpload
class CourseAdmin(admin.ModelAdmin):
    """Admin config for Course: institution-ordered list with search."""
    save_as = True
    ordering = ['institution']
    list_filter = ['institution']
    list_display = [
        'course_name',
        'course_hold',
        'institution',
        'edx_classid',
        'last_vid_number',
        'previous_statechange',
    ]
    search_fields = [
        'course_name',
        'edx_classid',
        'institution',
        'studio_hex',
    ]
class VideoAdmin(admin.ModelAdmin):
    """Admin config for Video records, filterable by owning institution."""
    model = Video
    list_filter = ['inst_class__institution']
    search_fields = ['edx_id', 'client_title', 'studio_id']
    list_display = [
        'edx_id',
        'client_title',
        'studio_id',
        'video_trans_start',
        'video_trans_status',
        'video_active',
    ]
class EncodeAdmin(admin.ModelAdmin):
    """Admin config for Encode profiles, ordered by profile name."""
    model = Encode
    save_as = True
    ordering = ['encode_name']
    list_display = [
        'encode_name',
        'profile_active',
        'encode_filetype',
        'get_destination',
        'encode_suffix',
        'encode_bitdepth',
        'product_spec',
    ]

    def get_destination(self, obj):
        """List-column helper: display name of the related destination."""
        return obj.encode_destination.destination_name

    get_destination.short_description = 'Destination'
class URLAdmin(admin.ModelAdmin):
    """Admin config for delivered-URL records."""
    model = URL
    list_filter = ['videoID__inst_class__institution']
    list_display = [
        'videoID',
        'encode_get',
        'encode_url',
        'url_date',
        'val_input',
        'xuetang_input',
    ]
    search_fields = [
        'videoID__edx_id',
        'videoID__client_title',
        'encode_url',
    ]

    def encode_get(self, obj):
        """List-column helper: name of the related encode profile."""
        return obj.encode_profile.encode_name
class DestinationAdmin(admin.ModelAdmin):
    """Admin config for delivery Destination records."""
    model = Destination
    list_display = ['destination_name', 'destination_active']
class InstitutionAdmin(admin.ModelAdmin):
    """Admin config for Institution records."""
    model = Institution
    list_display = ['institution_name', 'institution_code']
class VideoUploadAdmin(admin.ModelAdmin):
    """Admin config for manual VedaUpload records."""
    model = VedaUpload
    list_display = [
        'client_information',
        'upload_filename',
        'status_email',
        'file_complete',
        'youtube_id',
    ]
# Register every pipeline model with its customized ModelAdmin.
for _model, _model_admin in (
    (Course, CourseAdmin),
    (Video, VideoAdmin),
    (Encode, EncodeAdmin),
    (URL, URLAdmin),
    (Destination, DestinationAdmin),
    (Institution, InstitutionAdmin),
    (VedaUpload, VideoUploadAdmin),
):
    admin.site.register(_model, _model_admin)
"""
Pipeline API METHODS
1. cheap-o token authorizer
This is a super hacky way to finish the Oauth2 Flow, but I need to move on
will get the token id from a url view, auth it, then push forward with a success bool
"""
import os
import sys
import oauth2_provider
from oauth2_provider import models
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import User
from pipeline_env import *
primary_directory = os.path.dirname(__file__)
sys.path.append(primary_directory)
def token_finisher(token_id):
    """
    Complete the OAuth2 flow for a freshly issued access token.

    Looks up the AccessToken by its token string, pins it to the primary
    superuser (pk=1), and returns a DRF auth token key for that user.

    Returns:
        str: the DRF token key on success.
        bool: False when ``token_id`` matches no AccessToken.
    """
    try:
        access_token = oauth2_provider.models.AccessToken.objects.get(token=token_id)
    except oauth2_provider.models.AccessToken.DoesNotExist:
        # Narrowed from a bare ``except``: a missing token is the only
        # expected failure here; anything else should surface loudly.
        return False
    access_token.user = User.objects.get(pk=1)
    access_token.save()
    # DRF tokens are one-per-user: get_or_create replaces the previous
    # create-then-fall-back-to-get race under two bare excepts.
    token, _created = Token.objects.get_or_create(user=access_token.user)
    return token.key
if __name__ == '__main__':
pass
#!/usr/bin/env python
import os
import sys
import django
"""
VEDA Environment variables
"""
"""
Import Django Shit
"""
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
sys.path.append(project_path)
os.environ['DJANGO_SETTINGS_MODULE'] = 'common.settings'
django.setup()
from VEDA_OS01.models import Institution
from VEDA_OS01.models import Course
from VEDA_OS01.models import Video
from VEDA_OS01.models import URL
from VEDA_OS01.models import VedaUpload
#! usr/bin/env python
from rest_framework import serializers
from VEDA_OS01.models import Course, Video, URL, Encode
class CourseSerializer(serializers.ModelSerializer):
    """Serializer for Course records exposed through the pipeline API."""

    # Only these fields may be changed through update(); everything else
    # in Meta.fields is effectively read-only on update.
    _MUTABLE_FIELDS = (
        'course_name',
        'course_hold',
        'last_vid_number',
        'previous_statechange',
    )

    class Meta:
        model = Course
        fields = (
            'id',
            'course_hold',
            'review_proc',
            'yt_proc',
            's3_proc',
            'mobile_override',
            'course_name',
            'institution',
            'edx_classid',
            'semesterid',
            'last_vid_number',
            'previous_statechange',
            'studio_hex',
            'proc_loc',
            'sg_projID'
        )

    def create(self, validated_data, partial=True):
        """Create and persist a new Course from validated input."""
        return Course.objects.create(**validated_data)

    def update(self, instance, validated_data, partial=True):
        """Copy any supplied mutable fields onto ``instance`` and save."""
        for field_name in self._MUTABLE_FIELDS:
            setattr(
                instance,
                field_name,
                validated_data.get(field_name, getattr(instance, field_name)),
            )
        instance.save()
        return instance
class VideoSerializer(serializers.ModelSerializer):
    """Serializer for Video records exposed through the pipeline API."""

    # Fields update() will copy from validated_data. Note: the orig
    # filesize/bitrate/resolution fields are intentionally absent, matching
    # the previous hand-written update().
    _MUTABLE_FIELDS = (
        'inst_class',
        'edx_id',
        'studio_id',
        'video_active',
        'client_title',
        'video_orig_duration',
        'video_orig_extension',
        'video_trans_start',
        'video_trans_end',
        'video_trans_status',
        'video_glacierid',
    )

    class Meta:
        model = Video
        fields = (
            'id',
            'inst_class',
            'edx_id',
            'studio_id',
            'video_active',
            'client_title',
            'video_orig_duration',
            'video_orig_filesize',
            'video_orig_bitrate',
            'video_orig_extension',
            'video_orig_resolution',
            'video_trans_start',
            'video_trans_end',
            'video_trans_status',
            'video_glacierid'
        )

    def create(self, validated_data):
        """Create and persist a new Video from validated input."""
        return Video.objects.create(**validated_data)

    def update(self, instance, validated_data):
        """Copy any supplied mutable fields onto ``instance`` and save."""
        for field_name in self._MUTABLE_FIELDS:
            setattr(
                instance,
                field_name,
                validated_data.get(field_name, getattr(instance, field_name)),
            )
        instance.save()
        return instance
class EncodeSerializer(serializers.ModelSerializer):
    """
    Read-only serializer for Encode profiles (no create/update hooks).
    """

    class Meta:
        model = Encode
        fields = (
            'id',
            'profile_active',
            'encode_suffix',
            'encode_filetype',
            'encode_bitdepth',
            'encode_resolution',
            'product_spec',
            'xuetang_proc',
        )
class URLSerializer(serializers.ModelSerializer):
    """Serializer for delivered-URL records (create-only; no update hook)."""

    class Meta:
        model = URL
        fields = (
            'id',
            'encode_profile',
            'videoID',
            'encode_url',
            'url_date',
            'encode_duration',
            'encode_bitdepth',
            'encode_size',
            'val_input',
            'xuetang_input',
            'md5_sum',
        )

    def create(self, validated_data):
        """Create and persist a new URL record from validated input."""
        return URL.objects.create(**validated_data)
"""views"""
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.views.decorators.csrf import csrf_exempt

from rest_framework import filters
from rest_framework import renderers
from rest_framework import viewsets
from rest_framework.decorators import detail_route
from rest_framework.response import Response

from api import token_finisher
from VEDA_OS01.models import Course, Video, URL, Encode
from VEDA_OS01.serializers import CourseSerializer
from VEDA_OS01.serializers import VideoSerializer
from VEDA_OS01.serializers import EncodeSerializer
from VEDA_OS01.serializers import URLSerializer
class CourseViewSet(viewsets.ModelViewSet):
    """API endpoint for listing, filtering, and mutating Course records."""
    queryset = Course.objects.all()
    serializer_class = CourseSerializer
    filter_backends = (filters.DjangoFilterBackend,)
    filter_fields = (
        'institution',
        'edx_classid',
        'proc_loc',
        'course_hold',
        'sg_projID',
    )

    @detail_route(renderer_classes=[renderers.StaticHTMLRenderer])
    def highlight(self, request, *args, **kwargs):
        """Render the object's ``highlighted`` text as static HTML."""
        return Response(self.get_object().highlighted)

    @csrf_exempt
    def perform_create(self, serializer):
        serializer.save()
class VideoViewSet(viewsets.ModelViewSet):
    """API endpoint for Video records, filterable by course and edx id."""
    queryset = Video.objects.all()
    serializer_class = VideoSerializer
    filter_backends = (filters.DjangoFilterBackend,)
    filter_fields = ('inst_class', 'edx_id')

    @detail_route(renderer_classes=[renderers.StaticHTMLRenderer])
    def highlight(self, request, *args, **kwargs):
        """Render the object's ``highlighted`` text as static HTML."""
        return Response(self.get_object().highlighted)

    @csrf_exempt
    def perform_create(self, serializer):
        serializer.save()
class EncodeViewSet(viewsets.ModelViewSet):
    """API endpoint for Encode profiles."""
    queryset = Encode.objects.all()
    serializer_class = EncodeSerializer
    filter_backends = (filters.DjangoFilterBackend,)
    filter_fields = ('encode_filetype', 'encode_suffix', 'product_spec')

    @detail_route(renderer_classes=[renderers.StaticHTMLRenderer])
    def highlight(self, request, *args, **kwargs):
        """Render the object's ``highlighted`` text as static HTML."""
        return Response(self.get_object().highlighted)

    @csrf_exempt
    def perform_create(self, serializer):
        serializer.save()
class URLViewSet(viewsets.ModelViewSet):
    """API endpoint for delivered-URL records."""
    queryset = URL.objects.all()
    serializer_class = URLSerializer
    filter_backends = (filters.DjangoFilterBackend,)
    filter_fields = (
        'videoID__edx_id',
        'encode_profile__encode_suffix',
        'encode_profile__encode_filetype',
    )

    @detail_route(renderer_classes=[renderers.StaticHTMLRenderer])
    def highlight(self, request, *args, **kwargs):
        """Render the object's ``highlighted`` text as static HTML."""
        return Response(self.get_object().highlighted)

    @csrf_exempt
    def perform_create(self, serializer):
        serializer.save()
@csrf_exempt
def token_auth(request):
    """
    This is a hack to override the "Authorize" step in token generation.

    POST with a ``data`` field carrying the OAuth2 token id; responds with
    the finished DRF token key (or ``False``). Non-POST methods get a 404.
    """
    if request.method != 'POST':
        return HttpResponse(status=404)
    token_id = request.POST.get('data')
    if token_id is None:
        # Malformed client request: previously this raised KeyError and
        # surfaced as an HTTP 500; a 400 is the honest answer.
        return HttpResponse(status=400)
    return HttpResponse(token_finisher(token_id))
def user_login(request):
    # Authenticated users reload the same path.
    # NOTE(review): if this view is mapped to /login/ itself, redirecting an
    # authenticated user back to request.path would loop -- confirm intent.
    if request.user.is_authenticated():
        return HttpResponseRedirect(request.path)
    else:
        # Unauthenticated users are bounced to the admin login page.
        return HttpResponseRedirect('../admin')  # settings.LOGIN_REDIRECT_URL)
if __name__ == "__main__":
    # Fixed: the original called course_view(), which is not defined anywhere
    # in this module and raised NameError when the file was run directly.
    pass
#!/usr/bin/env python
import os
import sys
import argparse
import yaml
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
sys.path.append(project_path)
class DeliverCli:
    """
    Deliver -- command line interface.

    Launches the celery delivery worker (through newrelic-admin) using the
    queue and concurrency settings read from instance_config.yaml.
    """

    def __init__(self, **kwargs):
        self.args = None
        self.test = False
        self.logging = kwargs.get('logging', True)
        # Repo root: two directories above this file.
        self.ROOT_DIR = os.path.dirname(os.path.dirname(
            os.path.abspath(__file__)
        ))
        self.auth_yaml = os.path.join(self.ROOT_DIR, 'instance_config.yaml')
        self.new_relic_config = os.path.join(self.ROOT_DIR, 'veda_newrelic.ini')
        self.celery_daemon = os.path.join(self.ROOT_DIR, 'control', 'celeryapp.py')

    def get_args(self):
        """Parse command line arguments into ``self.args``."""
        parser = argparse.ArgumentParser()
        parser.usage = '''
        {cmd} -l List
        [-l ]
        Use --help to see all options.
        '''.format(cmd=sys.argv[0])
        parser.add_argument(
            '-l', '--list',
            help='Unused, Exit',
            action='store_true'
        )
        self.args = parser.parse_args()
        self._parse_args()

    def _parse_args(self):
        self.list = self.args.list

    def run(self):
        """
        Launch the celery delivery worker.

        Reads instance_config.yaml; on a missing/unparsable config the
        worker is simply not started.
        """
        auth_dict = None
        with open(self.auth_yaml, 'r') as stream:
            try:
                # safe_load: a config file must never be able to construct
                # arbitrary python objects (yaml.load without an explicit
                # Loader is unsafe).
                auth_dict = yaml.safe_load(stream)
            except yaml.YAMLError:
                auth_dict = None
        if auth_dict is not None:
            os.system(
                ' '.join((
                    'NEW_RELIC_CONFIG_FILE=' + self.new_relic_config,
                    'newrelic-admin run-program python',
                    self.celery_daemon,
                    'worker',
                    '--loglevel=info',
                    '--concurrency=' + str(auth_dict['celery_threads']),
                    '-Q ' + auth_dict['celery_stat_queue'],
                    '-n deliver.%h'
                ))
            )
def main():
    """CLI entry point: parse args and launch the delivery worker."""
    cli = DeliverCli()
    cli.get_args()
    cli.run()
if __name__ == '__main__':
sys.exit(main())
#!/usr/bin/env python
import os
import sys
import argparse
import datetime
from datetime import timedelta
import pytz
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
sys.path.append(project_path)
from control.celeryapp import maintainer_healer
from control.veda_heal import VedaHeal
from VEDA_OS01.models import Video
"""
Deliver
Command Line Interface
"""
class HealCli:
    """Scheduler shim: queues the next nightly run of the heal script."""

    def __init__(self, **kwargs):
        self.logging = kwargs.get('logging', True)
        # Absolute path to the sibling 'heal' bin script handed to the worker.
        self.binscript = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'heal')

    def schedule(self):
        # ETA = next midnight US/Eastern, expressed in UTC. The replace()
        # before astimezone() is order-sensitive: zero out today's wall-clock
        # time first, convert to UTC, then push forward one day.
        go_time = datetime.datetime.now(pytz.timezone("America/New_York")) \
            .replace(hour=0, minute=0, second=0, microsecond=0) \
            .astimezone(pytz.utc) + timedelta(days=1)
        maintainer_healer.apply_async((self.binscript,), queue='transcode_stat', eta=go_time)
def main():
"""
Maintenance Daemon + ETA dialer for healing
"""
parser = argparse.ArgumentParser()
parser.usage = '''
{cmd} -i veda_id
{cmd} -c course_id
{cmd} -s schedule
[-i -c -s]
Use --help to see all options.
'''.format(cmd=sys.argv[0])
parser.add_argument(
'-i', '--veda_id', default=None,
help='VEDA ID'
)
parser.add_argument(
'-c', '--course_id',
help='Course ID',
)
parser.add_argument(
'-s', '--schedule',
help='Trigger Scheduler',
action='store_true'
)
args = parser.parse_args()
veda_id = args.veda_id
course_id = args.course_id
schedule = args.schedule
print '%s - %s: %s' % ('Healing', 'VEDA ID', veda_id)
print '%s - %s: %s' % ('Healing', 'Course', course_id)
if veda_id is None and course_id is None and schedule is False:
VH = VedaHeal()
VH.discovery()
VH.purge()
HC = HealCli()
HC.schedule()
return None
if veda_id is not None:
VH = VedaHeal(
video_query=Video.objects.filter(
edx_id=veda_id.strip()
)
)
VH.send_encodes()
return None
if course_id is not None:
VH = VedaHeal(
video_query=Video.objects.filter(
inst_class=Course.objects.filter(
institution=course_id[0:3],
edx_classid=course_id[3:8]
)
)
)
VH.send_encodes()
return None
if schedule is True:
HC = HealCli()
HC.schedule()
return None
if __name__ == '__main__':
sys.exit(main())
#!/usr/bin/env python
import os
import sys
import argparse
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
sys.path.append(project_path)
from control.veda_utils import EmailAlert
"""
Ingest
Command Line Interface
"""
class IngestCli():
    """
    Ingest -- command line interface.

    Runs the ingest loop (loop.py -i) and emails an admin alert if the
    normally endless loop ever exits.
    """

    def __init__(self, **kwargs):
        self.args = None
        self.test = False
        self.logging = kwargs.get('logging', True)
        self.course_list = []

    def get_args(self):
        """Parse command line arguments into ``self.args``."""
        parser = argparse.ArgumentParser()
        parser.usage = '''
        {cmd} -l List
        [-l ]
        Use --help to see all options.
        '''.format(cmd=sys.argv[0])
        parser.add_argument(
            '-l', '--list',
            help='List Eligible Courses, Exit',
            action='store_true'
        )
        parser.add_argument(
            '-c', '--courseid',
            # Fixed help text: it was a copy/paste of the -l description.
            # NOTE(review): store_true makes -c a boolean flag, not a value;
            # confirm whether it was ever meant to accept a course id string.
            help='Course ID flag',
            action='store_true'
        )
        self.args = parser.parse_args()
        self._parse_args()

    def _parse_args(self):
        self.course_id = self.args.courseid
        self.list = self.args.list

    def run(self):
        """
        Loop, constant video retreival from Remotes
        :return: If fault: Email admin list
        """
        runcommand = ' '.join((
            'python',
            os.path.join(os.path.dirname(os.path.abspath(__file__)), 'loop.py'),
            '-i'
        ))
        os.system(runcommand)
        # Reaching this line means the (normally endless) loop died.
        E1 = EmailAlert(message='Ingest Daemon Crash', subject='Ingest Daemon')
        E1.email()
def main():
    """CLI entry point: parse args and run the ingest loop."""
    cli = IngestCli()
    cli.get_args()
    cli.run()
if __name__ == '__main__':
sys.exit(main())
#!/usr/bin/env python
import os
import sys
import argparse
from django.db import reset_queries
import resource
import time
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
sys.path.append(project_path)
"""
This is a cheapo way to get a pager (using SES)
"""
from control.veda_file_discovery import FileDiscovery
from youtube_callback.daemon import generate_course_list
from youtube_callback.sftp_id_retrieve import callfunction
class DaemonCli:
    """
    Dispatcher for the two long-running daemons: ingest (-i) and youtube
    callback (-y). Each selected daemon loops forever.
    """

    def __init__(self):
        self.args = None
        self.ingest = False      # run the ingest loop
        self.youtube = False     # run the youtube-callback loop
        self.course_list = []

    def get_args(self):
        """Parse the -i / -y daemon-selection flags."""
        parser = argparse.ArgumentParser()
        parser.usage = '''
        {cmd} -ingest IngestDaemon
        {cmd} -youtube YoutubeCallbackDaemon
        [-i -y]
        Use --help to see all options.
        '''.format(cmd=sys.argv[0])
        parser.add_argument(
            '-i', '--ingest',
            help='Activate alerted ingest daemon',
            action='store_true'
        )
        parser.add_argument(
            '-y', '--youtube',
            help='Activate alerted youtube callback daemon',
            action='store_true'
        )
        self.args = parser.parse_args()
        self.ingest = self.args.ingest
        self.youtube = self.args.youtube

    def run(self):
        """Run whichever daemon loop(s) were selected; neither returns."""
        if self.ingest is True:
            self.ingest_daemon()
        if self.youtube is True:
            self.youtube_daemon()

    def ingest_daemon(self):
        # Endless poll: discover studio-S3 and about-video uploads.
        x = 0
        while True:
            # Work directory lives one level above the repo checkout.
            node_work_directory = os.path.join(
                os.path.dirname(os.path.dirname(os.path.dirname(
                    os.path.abspath(__file__)
                ))),
                'VEDA_WORKING'
            )
            FD = FileDiscovery(
                node_work_directory=node_work_directory
            )
            FD.studio_s3_ingest()
            FD.about_video_ingest()
            # Django accumulates per-query debug info; clear it so the
            # daemon's memory doesn't grow without bound.
            reset_queries()
            x += 1
            if x >= 100:
                # Periodic memory report, every 100 passes.
                print 'Memory usage: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                x = 0

    def youtube_daemon(self):
        # Endless poll: run the youtube callback for each eligible course.
        x = 0
        while True:
            self.course_list = generate_course_list()
            for course in self.course_list:
                print "%s%s: Callback" % (course.institution, course.edx_classid)
                callfunction(course)
            x += 1
            if x >= 100:
                # Periodic memory report, every 100 passes.
                print 'Memory usage: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                x = 0
            reset_queries()
            self.course_list = []
            time.sleep(10)
def main():
    """Entry point: parse daemon flags and run the selected loop(s)."""
    cli = DaemonCli()
    cli.get_args()
    cli.run()
if __name__ == '__main__':
sys.exit(main())
#!/usr/bin/env python
import os
import sys
import argparse
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
sys.path.append(project_path)
from control.veda_utils import EmailAlert
from youtube_callback.daemon import generate_course_list, get_course
from youtube_callback.sftp_id_retrieve import callfunction
"""
Youtube Callback
Command Line Interface
"""
class YoutubeCallbackCli():
    """
    Youtube Callback -- command line interface.

    Either lists eligible courses (-l), processes one course id (-c), or
    runs the callback loop (loop.py -y) with a crash-alert email on exit.
    """

    def __init__(self, **kwargs):
        self.args = None
        self.test = False
        self.logging = kwargs.get('logging', True)
        self.course_list = []

    def get_args(self):
        """Parse command line arguments into ``self.args``."""
        parser = argparse.ArgumentParser()
        parser.usage = '''
        {cmd} -l List
        {cmd} -c course
        [-l -c ]
        Use --help to see all options.
        '''.format(cmd=sys.argv[0])
        parser.add_argument(
            '-l', '--list',
            help='List Eligible Courses, Exit',
            action='store_true'
        )
        parser.add_argument(
            '-c', '--courseid',
            default=None,
            help='Parse Specific Course ID, Exit',
        )
        self.args = parser.parse_args()
        self._parse_args()

    def _parse_args(self):
        self.course_id = self.args.courseid
        self.list = self.args.list

    def run(self):
        # -l takes precedence over -c / daemon mode.
        if self.list is True:
            self.listcourses()
        else:
            self.loop()

    def loop(self):
        """
        Daemon Loop
        """
        # Single-course mode: process one course, then exit.
        if self.course_id is not None:
            course = get_course(course_id=self.course_id)
            if course is not None:
                callfunction(course)
            return None
        # Daemon mode: run loop.py -y; reaching the email means it crashed.
        runcommand = ' '.join((
            'python',
            os.path.join(os.path.dirname(os.path.abspath(__file__)), 'loop.py'),
            '-y'
        ))
        os.system(runcommand)
        E1 = EmailAlert(message='Youtube Callback Daemon Crash', subject='Youtube Callback Daemon')
        E1.email()

    def listcourses(self):
        """
        list and exit
        :return:
        """
        self.course_list = generate_course_list()
        for course in self.course_list:
            print course.institution
            print course.edx_classid
def main():
    """CLI entry point: parse args and dispatch list/course/daemon mode."""
    cli = YoutubeCallbackCli()
    cli.get_args()
    cli.run()
if __name__ == '__main__':
sys.exit(main())
"""
edx-video-pipeline's Django Secrets
NEVER SHARE ANYTHING IN HERE, like, EVER
--assume unchanged in git--
"""
import os
# Placeholder secrets -- filled in per deployment, never committed.
DJANGO_SECRET_KEY = ""
DJANGO_DB_USER = ""
DJANGO_DB_PASS = ""
DJANGO_ADMIN = ('', '')
# Sandbox detection: when set, any checkout whose path contains the token
# is treated as a sandbox (DEBUG on, sandbox DB host/name).
SANDBOX_TOKEN = None
if SANDBOX_TOKEN is not None and SANDBOX_TOKEN in os.path.dirname(__file__):
    DEBUG = True
    DBHOST = ''
    pipeline_dbname = ""
else:
    DEBUG = False
    DBHOST = ''
    pipeline_dbname = ""
"""
Settings
"""
DATABASES = None
import os
from django_secrets import *
ROOT_DIR = os.path.dirname(os.path.dirname((__file__)))
ADMINS = (
DJANGO_ADMIN,
)
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend' # Port the warnings to the backend
MANAGERS = ADMINS
if DATABASES is None:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': pipeline_dbname,
'USER': DJANGO_DB_USER,
'PASSWORD': DJANGO_DB_PASS,
'HOST': DBHOST,
'PORT': '3306',
}
}
# Make this unique, and don't share it with anybody.
SECRET_KEY = DJANGO_SECRET_KEY
ALLOWED_HOSTS = ['*']
TIME_ZONE = 'UTC'
LANGUAGE_CODE = 'en-us'
SITE_ID = 1
USE_I18N = True
USE_L10N = True
USE_TZ = True
# Absolute filesystem path to the directory that will hold user-uploaded files.
STATIC_ROOT = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "static")
STATIC_URL = '/static/'
# Additional locations of static files
STATICFILES_DIRS = (
os.path.join(ROOT_DIR, 'templates', 'admin'),
os.path.join(ROOT_DIR, 'templates'),
)
STATICFILES_FINDERS = (
'django.contrib.staticfiles.finders.FileSystemFinder',
'django.contrib.staticfiles.finders.AppDirectoriesFinder',
)
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [
os.path.join(ROOT_DIR, 'templates')
],
'OPTIONS': {
'context_processors': [
"django.contrib.auth.context_processors.auth",
],
'debug': DEBUG,
'loaders': [
'django.template.loaders.filesystem.Loader',
'django.template.loaders.app_directories.Loader'
]
}
},
]
MIDDLEWARE_CLASSES = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.security.SecurityMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
# Oauth2
'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
'oauth2_provider.middleware.OAuth2TokenMiddleware',
)
AUTHENTICATION_BACKENDS = (
# Oauth2
'django.contrib.auth.backends.ModelBackend',
'oauth2_provider.backends.OAuth2Backend',
)
OAUTH2_PROVIDER = {
# this is the list of available scopes
'SCOPES': {'read': 'Read scope', 'write': 'Write scope', 'groups': 'Access to your groups'}
}
X_FRAME_OPTIONS = 'DENY'
SECURE_SSL_REDIRECT = False
SECURE_CONTENT_TYPE_NOSNIFF = True
SECURE_BROWSER_XSS_FILTER = True
ROOT_URLCONF = 'common.urls'
# Python dotted path to the WSGI application used by Django's runserver.
WSGI_APPLICATION = 'common.wsgi.application'
REST_FRAMEWORK = {
'PAGE_SIZE': 10,
'DEFAULT_PARSER_CLASSES': (
'rest_framework.parsers.JSONParser',
),
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
),
'DEFAULT_AUTHENTICATION_CLASSES': (
'oauth2_provider.ext.rest_framework.OAuth2Authentication',
'rest_framework.authentication.BasicAuthentication',
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.TokenAuthentication',
),
'DEFAULT_FILTER_BACKENDS': (
'rest_framework.filters.DjangoFilterBackend',
),
}
INSTALLED_APPS = (
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django.contrib.admin',
'rest_framework.authtoken',
'oauth2_provider',
'rest_framework',
'corsheaders',
'frontend',
'VEDA_OS01',
)
CORS_ORIGIN_ALLOW_ALL = True
SESSION_SERIALIZER = 'django.contrib.sessions.serializers.JSONSerializer'
# A sample logging configuration. The only tangible logging
# performed by this configuration is to send an email to
# the site admins on every HTTP 500 error when DEBUG=False.
# See http://docs.djangoproject.com/en/dev/topics/logging for
# more details on how to customize your logging configuration.
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'filters': {
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse'
}
},
'handlers': {
'mail_admins': {
'level': 'ERROR',
'filters': ['require_debug_false'],
'class': 'django.utils.log.AdminEmailHandler'
}
},
'loggers': {
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': True,
},
}
}
"""
build test
"""
import sys
import unittest
class BuildTest(unittest.TestCase):
    """Trivial smoke test exercised by the CI build."""

    def setUp(self):
        # Sanity check only; there is no fixture state to prepare.
        self.assertTrue(True)

    def test_defaults(self):
        self.assertTrue(True)
def main():
    """Run the unittest discovery entry point."""
    unittest.main()


if __name__ == '__main__':
    sys.exit(main())
import sys
import os
sys.path.append(os.path.abspath(__file__))
os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'
from django.conf import settings
from rest_framework.routers import DefaultRouter
from django.conf.urls import patterns, include, url
from django.contrib import admin
from VEDA_OS01 import views
# DRF router: /api/<resource>/ endpoints for the pipeline models.
router = DefaultRouter()
admin.autodiscover()
router.register(r'courses', views.CourseViewSet)
router.register(r'videos', views.VideoViewSet)
router.register(r'encodes', views.EncodeViewSet)
router.register(r'urls', views.URLViewSet)
urlpatterns = [
    url(r'^admin/', admin.site.urls),
    # Static files served by django itself (dev-style serving).
    url(r'^static/(?P<path>.*)$', 'django.views.static.serve', {'document_root': settings.STATIC_ROOT}),
    # Front End
    # NOTE(review): r'^' includes frontend.urls ahead of the patterns below;
    # a catch-all inside frontend.urls would shadow them -- confirm ordering.
    url(r'^', include('frontend.urls')),
    # API
    url(r'^login/', views.user_login),
    url(r'^accounts/login/$', 'django.contrib.auth.views.login', ),
    url(r'^accounts/logout/$', 'django.contrib.auth.views.logout'),
    url(r'^o/', include('oauth2_provider.urls', namespace='oauth2_provider')),
    url(r'^api/', include(router.urls)),
    url(r'^api-auth/', include('rest_framework.urls', namespace='rest_framework')),
    # Cheap auth server
    url(r'^veda_auth/', views.token_auth)
]
"""
"""
import os
from django.core.wsgi import get_wsgi_application
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
os.environ.setdefault("PYTHON_EGG_CACHE", BASE_DIR + "/egg_cache")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "common.settings")
application = get_wsgi_application()
from __future__ import absolute_import
import os
import sys
from celery import Celery
import yaml
"""
Start Celery Worker
"""
# Import with a package-relative fallback so this module works both as part
# of the repo package and as a standalone script. Narrowed from bare
# ``except``: only a failed import should trigger the fallback.
try:
    from control.control_env import *
except ImportError:
    from control_env import *
try:
    from control.veda_deliver import VedaDelivery
except ImportError:
    from veda_deliver import VedaDelivery
# Broker credentials live in veda_auth.yaml next to this module.
auth_yaml = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'veda_auth.yaml'
)
with open(auth_yaml, 'r') as stream:
    try:
        # safe_load: a credentials file must never be able to construct
        # arbitrary python objects (yaml.load without a Loader is unsafe).
        auth_dict = yaml.safe_load(stream)
    except yaml.YAMLError:
        auth_dict = None
# amqp URLs of the form amqp://user:pass@broker:5672//
CEL_BROKER = 'amqp://' + auth_dict['rabbitmq_user'] + ':' + auth_dict['rabbitmq_pass'] + '@' \
    + auth_dict['rabbitmq_broker'] + ':5672//'
CEL_BACKEND = 'amqp://' + auth_dict['rabbitmq_user'] + ':' + auth_dict['rabbitmq_pass'] + \
    '@' + auth_dict['rabbitmq_broker'] + ':5672//'
app = Celery(auth_dict['celery_app_name'], broker=CEL_BROKER, backend=CEL_BACKEND, include=[])
app.conf.update(
    BROKER_CONNECTION_TIMEOUT=60,
    CELERY_IGNORE_RESULT=True,
    CELERY_TASK_RESULT_EXPIRES=10,
    CELERYD_PREFETCH_MULTIPLIER=1,
    CELERY_ACCEPT_CONTENT=['pickle', 'json', 'msgpack', 'yaml']
)
@app.task(name='worker_encode')
def worker_task_fire(veda_id, encode_profile, jobid):
    """
    Stub registration for the 'worker_encode' task name.

    The real implementation runs on the worker cluster (edx-video-worker);
    this placeholder only lets the control node route the task.
    """
    pass
@app.task(name='supervisor_deliver')
def deliverable_route(veda_id, encode_profile):
    """Deliver one finished encode for the given video/profile pair."""
    VD = VedaDelivery(
        veda_id=veda_id,
        encode_profile=encode_profile
    )
    VD.run()
@app.task
def node_test(command):
    # Diagnostic task: run an arbitrary shell command on the node.
    # NOTE(review): os.system on a broker-supplied string is a command
    # injection risk if the queue is not fully trusted.
    os.system(command)
@app.task(name='legacy_heal')
def maintainer_healer(command):
    # Scheduled heal run: executes the 'heal' bin script path passed by
    # HealCli.schedule(). Same os.system caveat as node_test applies.
    os.system(command)
if __name__ == '__main__':
app.start()
#!/usr/bin/env python
"""
VEDA Environment variables
"""
import os
import sys
import django
from django.utils.timezone import utc
from django.db import reset_queries
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
sys.path.append(project_path)
os.environ['DJANGO_SETTINGS_MODULE'] = 'common.settings'
django.setup()
from VEDA_OS01.models import Institution
from VEDA_OS01.models import Course
from VEDA_OS01.models import Video
from VEDA_OS01.models import Destination
from VEDA_OS01.models import Encode
from VEDA_OS01.models import URL
from VEDA_OS01.models import VedaUpload
"""
Central Config
"""
WORK_DIRECTORY = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
'VEDA_WORKING'
)
if not os.path.exists(WORK_DIRECTORY):
os.mkdir(WORK_DIRECTORY)
"""
Occasionally this throws an error in the env,
but on EC2 with v_videocompile it's no biggie
"""
FFPROBE = "ffprobe"
FFMPEG = "ffmpeg"
"""
Generalized display and such
"""
"""
TERM COLORS
"""
NODE_COLORS_BLUE = '\033[94m'
NODE_COLORS_GREEN = '\033[92m'
NODE_COLORS_RED = '\033[91m'
NODE_COLORS_END = '\033[0m'
from ..veda_deliver_cielo import Cielo24Transcript
'''
TEST
list_of_ids = [
'XXXC93BC2016-V000100'
]
for l in list_of_ids:
x = Cielo24Transcript(
veda_id = l
)
output = x.perform_transcription()
print output
'''
import os
import sys
import unittest
"""
Test encode profiler
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_env import *
from veda_encode import VedaEncode
class TestEncode(unittest.TestCase):
    """
    Exercises VedaEncode.determine_encodes against live fixture data.

    NOTE(review): requires database rows for the XXX test course/video --
    this is an integration test, not a unit test.
    """

    def setUp(self):
        self.course_object = Course.objects.get(
            institution='XXX',
            edx_classid='XXXXX'
        )
        self.veda_id = 'XXXXXXXX2016-V00TEST'
        self.E = VedaEncode(
            course_object=self.course_object,
            veda_id=self.veda_id
        )

    def test_encode_url(self):
        """
        gen baseline, gen a url, test against baseline
        """
        # Start clean: drop any URLs already recorded for the test video.
        URL.objects.filter(
            videoID=Video.objects.filter(edx_id=self.veda_id).latest()
        ).delete()
        encode_list = self.E.determine_encodes()
        baseline = len(encode_list)
        self.assertTrue(isinstance(encode_list, list))
        self.E.encode_list = []
        # Recording one delivered URL should remove exactly one needed encode.
        U = URL(
            videoID=Video.objects.filter(edx_id=self.veda_id).latest(),
            encode_profile=Encode.objects.get(product_spec='mobile_low'),
            encode_url='THIS Is A TEST'
        )
        U.save()
        encode_list = self.E.determine_encodes()
        self.assertTrue(len(encode_list) == baseline - 1)
        self.E.encode_list = []
        # Clean up and confirm we're back at the baseline.
        URL.objects.filter(
            videoID=Video.objects.filter(edx_id=self.veda_id).latest(),
        ).delete()
        encode_list = self.E.determine_encodes()
        self.assertTrue(len(encode_list) == baseline)
def main():
    """unittest entry point."""
    unittest.main()


if __name__ == '__main__':
    sys.exit(main())
'''
Save for poss future test
# import celeryapp
# co = Course.objects.get(institution='XXX', edx_classid='C93BC')
# vid = 'XXXC93BC2016-V003500'
# v = VedaEncode(course_object=co, veda_id=vid)
# encode_list = v.determine_encodes()
# for e in encode_list:
# veda_id = vid
# encode_profile = e
# jobid = uuid.uuid1().hex[0:10]
# # celeryapp.worker_task_fire.apply_async(
# # (veda_id, encode_profile, jobid),
# # queue='encode_worker'
# # )
'''
import os
import sys
import unittest
"""
Test VEDA API
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_file_discovery import FileDiscovery
class TestValidation(unittest.TestCase):
    """Placeholder validity check for FileDiscovery against a sample file."""

    def setUp(self):
        # Known-good sample clip shipped alongside the test suite.
        self.videofile = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'test_files',
            'OVTESTFILE_01.mp4'
        )
        self.FD = FileDiscovery()

    def test_build(self):
        """
        Check a known file for validity
        """
        # NOTE(review): currently a stub; self.videofile is never exercised.
        self.assertTrue(True)
def main():
    """unittest entry point."""
    unittest.main()


if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import unittest
import requests
import ast
import yaml
"""
This is an API connection test
set to pass if instance_config.yaml is missing
"""
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from veda_file_ingest import VideoProto, VedaIngest
requests.packages.urllib3.disable_warnings()
class TestIngest(unittest.TestCase):
    """API-connection smoke test; passes trivially without an auth file."""

    def setUp(self):
        self.VP = VideoProto(
            s3_filename=None,
            client_title='OVTESTFILE_01',
            file_extension='mp4'
        )
        self.VI = VedaIngest(course_object=None, video_proto=self.VP)
        repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.auth_yaml = os.path.join(repo_root, 'veda_auth.yaml')

    def test_file_ingest(self):
        """Without an auth file the ingest object must carry no auth data."""
        if os.path.exists(self.auth_yaml):
            return None
        self.assertTrue(self.VI.auth_dict is None)
def main():
    """Run this module's tests via the unittest CLI runner."""
    unittest.main()
# Allow running this test module directly as a script.
if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import unittest
"""
Test heal processor
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_env import *
from veda_heal import VedaHeal
class TestEncode(unittest.TestCase):
    """Exercise the heal processor's discovery pass against a seeded URL."""

    def setUp(self):
        # Seed one known encode URL so discovery has something to examine.
        seed_video = Video.objects.filter(edx_id='XXXXXXXX2014-V00TES1').latest()
        seed_url = URL(
            videoID=seed_video,
            encode_profile=Encode.objects.get(product_spec='mobile_low'),
            encode_url='THIS Is A TEST')
        seed_url.save()

    def test_encode_url(self):
        """Discovery should run end-to-end without raising."""
        healer = VedaHeal()
        healer.discovery()
def main():
    """Run this module's tests via the unittest CLI runner."""
    unittest.main()
# Allow running this test module directly as a script.
if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import unittest
"""
Test upload processes
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_hotstore import Hotstore
from veda_file_ingest import VideoProto
from veda_env import *
class TestHotstore(unittest.TestCase):
    """Hotstore upload tests; degrade gracefully when credentials are absent."""

    def setUp(self):
        proto = VideoProto()
        proto.veda_id = 'XXXXXXXX2014-V00TEST'
        here = os.path.dirname(os.path.abspath(__file__))
        self.upload_filepath = os.path.join(here, 'test_files', 'OVTESTFILE_01.mp4')
        self.H1 = Hotstore(video_object=proto, upload_filepath=self.upload_filepath)

    def test_single_upload(self):
        """Default (single-part) upload path."""
        if self.H1.auth_dict is None:
            # No credentials: upload must report failure.
            self.assertTrue(self.H1.upload() is False)
            return None
        self.assertTrue(self.H1.upload())

    def test_multi_upload(self):
        """Multipart upload path, forced by zeroing the size barrier."""
        if self.H1.auth_dict is None:
            self.assertTrue(self.H1.upload() is None)
            return None
        self.H1.auth_dict['multi_upload_barrier'] = 0
        self.assertTrue(self.H1.upload())
def main():
    """Run this module's tests via the unittest CLI runner."""
    unittest.main()
# Allow running this test module directly as a script.
if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import unittest
"""
A basic unittest for the "Course Addition Tool"
"""
sys.path.append(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
from veda_utils import Report
class TestReporting(unittest.TestCase):
    """Smoke test for the reporting / upload-status utility."""

    def setUp(self):
        self.R = Report(
            status='Complete',
            upload_serial="4939d60a60",
            youtube_id='TEST'
        )

    def test_conn(self):
        """Ping the status endpoint when credentials exist; else pass trivially."""
        if self.R.auth_dict is None:
            self.assertTrue(True)
            return None
        self.R.upload_status()
def main():
    """Run this module's tests via the unittest CLI runner."""
    unittest.main()
# Allow running this test module directly as a script.
if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import unittest
import requests
import ast
import yaml
requests.packages.urllib3.disable_warnings()
"""
This is an API connection test
set to pass if instance_config.yaml is missing
"""
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from veda_val import VALAPICall
from veda_file_ingest import VideoProto
class TestVALAPI(unittest.TestCase):
    """VAL API connection tests; pass trivially when no auth file exists."""

    def setUp(self):
        self.VP = VideoProto(client_title='Test Title', veda_id='TESTID')
        self.VAC = VALAPICall(video_proto=self.VP, val_status='complete')
        repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        self.auth_yaml = os.path.join(repo_root, 'veda_auth.yaml')

    def test_val_setup(self):
        """Every VAL credential field must be present and non-empty."""
        if not os.path.exists(self.auth_yaml):
            self.assertTrue(self.VAC.auth_dict is None)
            return None
        for field in ('val_api_url',
                      'val_client_id',
                      'val_password',
                      'val_secret_key',
                      'val_username',
                      'val_token_url'):
            self.assertTrue(len(self.VAC.auth_dict[field]) > 0)

    def test_val_connection(self):
        """Generate a token and hit the API root, expecting a 2xx response."""
        if not os.path.exists(self.auth_yaml):
            self.assertTrue(self.VAC.auth_dict is None)
            return None
        self.VAC.val_tokengen()
        self.assertFalse(self.VAC.val_token is None)
        response = requests.get(
            self.VAC.auth_dict['val_api_url'],
            headers=self.VAC.headers,
            timeout=20
        )
        self.assertFalse(response.status_code == 404)
        self.assertFalse(response.status_code > 299)
def main():
    """Run this module's tests via the unittest CLI runner."""
    unittest.main()
# Allow running this test module directly as a script.
if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import unittest
"""
Test VEDA API
"""
sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
)))
from veda_video_validation import Validation
class TestValidation(unittest.TestCase):
    """Validation checks against a known-good sample video file."""

    def setUp(self):
        here = os.path.dirname(os.path.abspath(__file__))
        self.videofile = os.path.join(here, 'test_files', 'OVTESTFILE_01.mp4')
        self.VALID = Validation(videofile=self.videofile)

    def test_validation(self):
        """A known-good file must validate cleanly."""
        self.assertTrue(self.VALID.validate())
def main():
    """Run this module's tests via the unittest CLI runner."""
    unittest.main()
# Allow running this test module directly as a script.
if __name__ == '__main__':
    sys.exit(main())
---
# Authentication keys
# VEDA AWS Account
veda_s3_upload_bucket:
veda_s3_hotstore_bucket:
veda_deliverable_bucket:
veda_access_key_id:
veda_secret_access_key:
multi_upload_barrier: 2000000000
# edX AWS Account
edx_aws_user:
edx_access_key_id:
edx_secret_access_key:
edx_s3_ingest_bucket:
edx_s3_endpoint_bucket:
edx_cloudfront_prefix:
# email vars
veda_noreply_email:
admin_email:
# VAL user creds
val_api_url:
val_client_id:
val_password:
val_secret_key:
val_token_url:
val_username:
## Celery Info
celery_app_name:
# can do multiple queues like so: foo,bar,baz
main_celery_queue: encode_worker
largefile_queue_barrier: 1000000000
largefile_celery_queue: large_encode_worker
celery_stat_queue: transcode_stat
celery_threads: 1
rabbitmq_broker:
rabbitmq_pass:
rabbitmq_user:
# Shotgun Variables
sg_server_path:
sg_script_name:
sg_script_key:
# Endpoints
threeplay_ftphost:
xuetang_api_url:
xuetang_api_shared_secret:
##---
# This is a list of encodes and their respective course
# boolean matches
encode_dict:
review_proc:
- review
mobile_override:
- override
s3_proc:
- mobile_high
- mobile_low
- audio_mp3
- desktop_webm
- desktop_mp4
- hls
yt_proc:
- youtube
##---
# This is a list of encode profiles and their val profile matches
# boolean matches
val_profile_dict:
mobile_low:
- mobile_low
desktop_mp4:
- desktop_mp4
override:
- desktop_mp4
- mobile_low
- mobile_high
mobile_high:
- mobile_high
audio_mp3:
- audio_mp3
desktop_webm:
- desktop_webm
youtube:
- youtube
review:
hls:
- hls
#--
# Heal settings
heal_start: 1
heal_end: 144
...
import os
import sys
import requests
from requests.auth import HTTPBasicAuth
import ast
import urllib
"""
Cielo24 API Job Start and Download
Options (reflected in Course.models):
transcription_fidelity =
Mechanical (75%),
Premium (95%)(3-72h),
Professional (99+%)(3-72h)
priority =
standard (24h),
priority (48h)
turnaround_hours = number, overrides 'priority' call, will change a standard to a priority silently
"""
from control_env import *
from veda_utils import ErrorObject, Output
requests.packages.urllib3.disable_warnings()
class Cielo24Transcript():
    """
    Start a Cielo24 transcription job for a delivered VEDA encode.

    Flow: login (API token) -> create job -> attach media URL ->
    perform_transcription, using the fidelity/turnaround defaults stored
    on the video's course record.
    """
    def __init__(self, veda_id):
        self.veda_id = veda_id
        '''Defaults'''
        self.c24_site = 'https://api.cielo24.com/api'
        self.c24_login = '/account/login'
        self.c24_joblist = '/job/list'
        self.c24_newjob = '/job/new'
        self.add_media = '/job/add_media'
        self.transcribe = '/job/perform_transcription'
        '''Retreive C24 Course-based defaults'''
        self.c24_defaults = self.retrieve_defaults()
    def perform_transcription(self):
        """Create a job, attach the media URL, start transcription; return TaskId."""
        # NOTE(review): retrieve_defaults() can return None, in which case
        # this subscript raises TypeError — confirm callers guard against it.
        if self.c24_defaults['c24_user'] is None:
            return None
        '''
        GET /api/job/perform_transcription?v=1 HTTP/1.1
        &api_token=xxxx
        &job_id=xxxx
        &transcription_fidelity=PREMIUM&priority=STANDARD
        Host: api.cielo24.com
        '''
        api_token = self.tokengenerator()
        if api_token is None:
            return None
        job_id = self.generate_jobs(api_token)
        task_id = self.embed_url(api_token, job_id)
        r5 = requests.get(
            ''.join((
                self.c24_site,
                self.transcribe,
                '?v=1&api_token=',
                api_token,
                '&job_id=',
                job_id,
                '&transcription_fidelity=',
                self.c24_defaults['c24_fidelity'],
                '&priority=',
                self.c24_defaults['c24_speed']
            ))
        )
        return ast.literal_eval(r5.text)['TaskId']
    def retrieve_defaults(self):
        """Collect per-course Cielo24 credentials/settings plus the media URL."""
        video_query = Video.objects.filter(
            edx_id=self.veda_id
        ).latest()
        # Pick the transcription source encode: LBO for mobile-override
        # courses, otherwise the desktop DTH encode.
        if video_query.inst_class.mobile_override is True:
            url_query = URL.objects.filter(
                videoID=video_query,
                encode_url__icontains='_LBO.mp4',
            ).latest()
        else:
            url_query = URL.objects.filter(
                videoID=video_query,
                encode_url__icontains='_DTH.mp4',
            ).latest()
        if video_query.inst_class.c24_username is None:
            ErrorObject.print_error(
                message='Cielo24 Record Incomplete',
            )
            return None
        c24_defaults = {
            'c24_user': video_query.inst_class.c24_username,
            'c24_pass': video_query.inst_class.c24_password,
            'c24_speed': video_query.inst_class.c24_speed,
            'c24_fidelity': video_query.inst_class.c24_fidelity,
            'edx_id': self.veda_id,
            'url': url_query.encode_url
        }
        return c24_defaults
    def tokengenerator(self):
        """Log in to Cielo24; return an API token, or None on HTTP error."""
        token_url = self.c24_site + self.c24_login + \
            '?v=1&username=' + self.c24_defaults['c24_user'] + \
            '&password=' + self.c24_defaults['c24_pass']
        # Generate Token
        r1 = requests.get(token_url)
        if r1.status_code > 299:
            ErrorObject.print_error(
                message='Cielo24 API Access Error',
            )
            return None
        api_token = ast.literal_eval(r1.text)["ApiToken"]
        return api_token
    def listjobs(self):
        """List Jobs"""
        # NOTE(review): tokengenerator() may return None here, which would
        # make the join below raise TypeError — confirm.
        api_token = self.tokengenerator()
        r2 = requests.get(
            ''.join((
                self.c24_site,
                self.c24_joblist,
                '?v=1&api_token=',
                api_token
            ))
        )
        job_list = r2.text
        return job_list
    def generate_jobs(self, api_token):
        """
        Create a new Cielo24 job named after the edx_id; return its JobId.

        'https://api.cielo24.com/job/new?v=1&\
        api_token=xxx&job_name=xxx&language=en'
        """
        r3 = requests.get(
            ''.join((
                self.c24_site,
                self.c24_newjob,
                '?v=1&api_token=',
                api_token,
                '&job_name=',
                self.c24_defaults['edx_id'],
                '&language=en'
            ))
        )
        job_id = ast.literal_eval(r3.text)['JobId']
        return job_id
    def embed_url(self, api_token, job_id):
        """
        Attach the encode URL to the job; return the add_media TaskId.

        GET /api/job/add_media?v=1&api_token=xxxx
            &job_id=xxxxx
            &media_url=http%3A%2F%2Fwww.domain.com%2Fvideo.mp4 HTTP/1.1
        Host: api.cielo24.com
        """
        r4 = requests.get(
            ''.join((
                self.c24_site,
                self.add_media,
                '?v=1&api_token=',
                api_token,
                '&job_id=',
                job_id,
                '&media_url=',
                urllib.quote_plus(self.c24_defaults['url'])
            ))
        )
        print str(r4.status_code) + ' : Cielo24 Status Code'
        return ast.literal_eval(r4.text)['TaskId']
def main():
    """No CLI behavior; this module is used as a library."""
    pass
if __name__ == "__main__":
    sys.exit(main())
import os
import hashlib
import hmac
import base64
import datetime
import requests
import json
import time
from time import strftime
import yaml
"""
Authored by Ed Zarecor / edx DevOps
included by request
Some adaptations for VEDA:
-auth yaml
**VEDA Note: since this isn't a real CDN, and represents the
'least effort' response to getting video into china,
we shan't monitor for success**
"""
# Resolve the repo-level instance_config.yaml relative to this file.
read_yaml = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    'instance_config.yaml'
)
# NOTE(review): if instance_config.yaml is absent, API_SHARED_SECRET and
# API_ENDPOINT are never bound and any later API call raises NameError —
# confirm this is the intended failure mode.
if os.path.exists(read_yaml):
    with open(read_yaml, 'r') as stream:
        auth_dict = yaml.load(stream)
        API_SHARED_SECRET = auth_dict['xuetang_api_shared_secret']
        API_ENDPOINT = auth_dict['xuetang_api_url']
# Currently the API supports no query arguments, so this component of the
# signature will always be an empty string.
API_QUERY_STRING = ""
SEPERATOR = '*' * 10
"""
This script provides functions for accessing the Xuetang CDN API.
It expects that an environment variable named XUETANG_SHARED_SECRET is
available and refers to a valid secret provided by the Xuetang CDN team.
Running this script will cause a video hosted in cloudfront to be
uploaded to the CDN via the API.
The status of the video will be monitored in a loop, exiting when
the terminal status, available, has been reached.
Finally, the video will be deleted from the cache exercising the
delete functionality.
"""
def _pretty_print_request(req):
    """
    Pretty-print a prepared HTTP request (method, URL, headers, body)
    between separator banners, for debugging API issues.
    """
    lines = [SEPERATOR + ' start-request ' + SEPERATOR]
    lines.append(req.method + ' ' + req.url)
    lines.append('\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()))
    lines.append(req.body)
    lines.append(SEPERATOR + ' end-request ' + SEPERATOR)
    print('\n'.join(lines))
def build_message(verb, uri, query_string, date, payload_hash):
    """
    Build the canonical string the Xuetang CDN signs: HTTP verb, URI,
    query string, date, and payload hash joined by os.linesep, per their
    CDN API specification document.
    """
    parts = (verb, uri, query_string, date, payload_hash)
    return os.linesep.join(parts)
def sign_message(message, secret):
    """
    Return the SHA-256 HMAC hexdigest of *message* keyed with *secret*.
    The value goes into the HTTP headers for mutual authentication via a
    shared secret.
    """
    mac = hmac.new(bytes(secret), bytes(message), digestmod=hashlib.sha256)
    return mac.hexdigest()
def hex_digest(payload):
    """
    Return the SHA-256 hexdigest of the request payload (typically JSON).
    """
    digest = hashlib.sha256(bytes(payload))
    return digest.hexdigest()
def get_api_date():
    """
    Return an "RFC8601"-style local timestamp with a numeric UTC offset,
    as specified in the Xuetang API spec (e.g. 2017-05-01T12:00:00-0500).

    Uses floor division (//) so the offset stays an integer: under
    Python 3 the original true division produced a float and broke the
    {0:04d} format. Behavior is identical under Python 2 int division.
    NOTE(review): time.timezone ignores DST — confirm the CDN accepts that.
    """
    return strftime("%Y-%m-%dT%H:%M:%S") + "-{0:04d}".format((time.timezone // 3600) * 100)
def prepare_create_or_update_video(edx_url, download_urls, md5sum):
    """
    Return a prepared HTTP request for initially seeding or updating an
    edX video in the Xuetang CDN.
    """
    payload = {
        'edx_url': edx_url,
        'download_url': download_urls,
        'md5sum': md5sum,
    }
    return _prepare_api_request("POST", "/edxvideo", payload)
def prepare_delete_video(edx_url):
    """
    Return a prepared HTTP request for deleting an edX video from the
    Xuetang CDN.
    """
    payload = {'edx_url': edx_url}
    return _prepare_api_request("DELETE", "/edxvideo", payload)
def prepare_check_task_status(edx_url):
    """
    Return a prepared HTTP request for checking the CDN-side status of an
    edX video.
    """
    payload = {'edx_url': edx_url}
    return _prepare_api_request("POST", "/edxtask", payload)
def _prepare_api_request(http_verb, api_target, payload):
    """
    Build a prepared request conforming to the Xuetang API specification:
    JSON body, payload hash, date header, and an HMAC signature computed
    over the canonical message.
    """
    body = json.dumps(payload)
    body_hash = hex_digest(body)
    date = get_api_date()
    canonical = bytes(build_message(http_verb, api_target, API_QUERY_STRING, date, body_hash))
    signature = sign_message(canonical, bytes(API_SHARED_SECRET))
    headers = {
        "Authentication": "edx {0}".format(signature),
        "Content-Type": "application/json",
        "Date": date,
    }
    request = requests.Request(http_verb, API_ENDPOINT + api_target, headers=headers, data=body)
    return request.prepare()
def _submit_prepared_request(prepared):
    """
    Send a prepared HTTP request and return the response object.
    """
    # Suppress InsecurePlatform warning
    requests.packages.urllib3.disable_warnings()
    session = requests.Session()
    # TODO: enable certificate verification after working through
    # certificate issues with Xuetang
    return session.send(prepared, timeout=20, verify=False)
if __name__ == '__main__':
    # Manual test harness for the Xuetang API (run by hand; not a unit test).
    # Download URL from the LMS
    edx_url = "xxx"
    # edx_url = "http://s3.amazonaws.com/edx-course-videos/ut-takemeds/UTXUT401T313-V000300_DTH.mp4"
    # A list containing the same URL
    download_urls = ["xxx"]
    # The md5sum of the video from the s3 ETAG value
    md5sum = "xxx"
    #
    # The code below is a simple test harness for the Xuetang API that
    #
    # - pushes a new video to the CDN
    # - checks the status of the video in a loop until it is available
    # - issues a delete request to remove the video from the CDN
    #
    # upload or update
    # prepared = prepare_create_or_update_video(edx_url, download_urls, md5sum)
    # _pretty_print_request(prepared)
    # print os.linesep
    # res = _submit_prepared_request(prepared)
    # print res.text
    # print os.linesep
    # # check status
    # while True:
    # prepared = prepare_check_task_status(edx_url)
    # _pretty_print_request(prepared)
    # res = _submit_prepared_request(prepared)
    # print res.text
    # if res.json()['status'] == 'available':
    # break
    # time.sleep(5)
    # delete file
    prepared = prepare_delete_video(edx_url)
    _pretty_print_request(prepared)
    res = _submit_prepared_request(prepared)
    print res.text
    # check status
    prepared = prepare_check_task_status(edx_url)
    _pretty_print_request(prepared)
    res = _submit_prepared_request(prepared)
    print res.text
import os
import os.path
import sys
import time
import pysftp
"""
Youtube Dynamic Upload
Note: This represents early VEDA work, but is functional
"""
from control_env import *
def printTotals(transferred, toBeTransferred):
    """
    Progress callback for pysftp transfers; intentionally a no-op.

    The original stdout progress writer is preserved here, disabled:
    try:
        sys.stdout.write('\r')
        sys.stdout.write("Transferred: {0}\tOut of: {1}\r".format(transferred, toBeTransferred))
        sys.stdout.flush()
    except:
        print 'Callback Failing'
    """
    return None
class DeliverYoutube():
def __init__(self, veda_id, encode_profile):
self.veda_id = veda_id
self.encode_profile = encode_profile
self.video = None
self.course = None
self.file = None
self.youtubekey = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'dependencies',
'youtubekey'
)
def upload(self):
self.video = Video.objects.filter(
edx_id=self.veda_id
).latest()
if self.encode_profile == 'review':
self.course = Course.objects.get(
institution='EDX',
edx_classid='RVW01'
)
self.file = self.veda_id + '_RVW.mp4'
else:
self.course = self.video.inst_class
self.file = self.veda_id + '_100.mp4'
self.csv_metadata()
self.batch_uploader()
def csv_metadata(self):
"""
Generate Youtube CMS CSV metadata sidecar file
Info: https://support.google.com/youtube/answer/6066171?hl=en (As of 05.2017)
Supported in favor of deprecated YT-XML
Fields in CSV are:
filename,
channel,
custom_id,
add_asset_labels,
title,
description,
keywords,
spoken_language,
caption_file,
caption_language,
category,
privacy,
notify_subscribers,
start_time,end_time,
custom_thumbnail,
ownership,
block_outside_ownership,
usage_policy,
enable_content_id,
reference_exclusions,
match_policy,ad_types,
ad_break_times,
playlist_id,
require_paid_subscription
"""
YOUTUBE_DEFAULT_CSV_COLUMNNAMES = [
'filename',
'channel',
'custom_id',
'add_asset_labels',
'title',
'description',
'keywords',
'spoken_language',
'caption_file',
'caption_language',
'category',
'privacy',
'notify_subscribers',
'start_time,end_time',
'custom_thumbnail',
'ownership',
'block_outside_ownership',
'usage_policy',
'enable_content_id',
'reference_exclusions',
'match_policy,ad_types',
'ad_break_times',
'playlist_id',
'require_paid_subscription'
]
print "%s : %s" % ("Generate CSV", str(self.video.edx_id))
# TODO: Refactor this into centrally located util for escaping bad chars
if self.video.client_title is not None:
try:
self.video.client_title.decode('ascii')
client_title = self.video.client_title
except:
client_title = ''
while len(self.video.client_title) > len(client_title):
try:
char = self.video.client_title[s1].decode('ascii')
except:
char = '-'
client_title += char
else:
client_title = self.file
"""
This is where we can add or subtract file attributes as needed
"""
print self.file
metadata_dict = {
'filename': self.file,
'channel': self.course.yt_channel,
'custom_id': self.video.edx_id,
'title': client_title.replace(',', ''),
'privacy': 'unlisted',
}
# Header Row
output = ','.join(([c for c in YOUTUBE_DEFAULT_CSV_COLUMNNAMES])) + '\n'
# Data Row
output += ','.join(([metadata_dict.get(c, '') for c in YOUTUBE_DEFAULT_CSV_COLUMNNAMES])) # + '\n' <--NO
with open(os.path.join(WORK_DIRECTORY, self.video.edx_id + '_100.csv'), 'w') as c1:
c1.write('%s %s' % (output, '\n'))
def batch_uploader(self):
"""
To successfully upload files to the CMS,
upload file, upload sidecar metadata (xml)
THEN upload empty 'delivery.complete' file
NOTE / TODO:
this doesn't feel like the right solution,
-BUT-
We'll generate a unix timestamp directory,
then use the timestamp to find it later for the
youtube ID / status xml
"""
remote_directory = str(time.time()).split('.')[0]
if not os.path.exists(os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'dependencies',
'delivery.complete'
)):
with open(os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'dependencies',
'delivery.complete'
), 'w') as d1:
d1.write('')
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
with pysftp.Connection(
'partnerupload.google.com',
username=self.course.yt_logon,
private_key=self.youtubekey,
port=19321,
cnopts=cnopts
) as s1:
print "Go for YT : " + str(self.video.edx_id)
s1.mkdir(remote_directory, mode=660)
s1.cwd(remote_directory)
s1.put(
os.path.join(WORK_DIRECTORY, self.file),
callback=printTotals
)
print
s1.put(
os.path.join(WORK_DIRECTORY, self.video.edx_id + '_100.csv'),
callback=printTotals
)
print
s1.put(
os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'dependencies',
'delivery.complete'
),
callback=printTotals,
confirm=False,
preserve_mtime=False
)
print
os.remove(os.path.join(
WORK_DIRECTORY,
self.video.edx_id + '_100.csv'
))
def main():
    """No CLI behavior; this module is used as a library."""
    pass
if __name__ == "__main__":
    sys.exit(main())
import os
import sys
import django
import yaml
import uuid
import newrelic.agent
"""
Get a list of needed encodes from VEDA
* Protected against extant URLs *
"""
from control_env import *
from dependencies.shotgun_api3 import Shotgun
# Attach New Relic monitoring using the repo-level ini file.
newrelic.agent.initialize(
    os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        'veda_newrelic.ini'
    )
)
class VedaEncode():
    """
    This should create a scrubbed list of encode profiles for processing

    --NOTE: post-review processing should take place on the mediateam node
    """
    def __init__(self, course_object, **kwargs):
        """
        Args:
            course_object: Course model instance whose boolean flags select
                the encode profiles to generate.
        Kwargs:
            overencode (bool): when True, re-encode even if URLs exist.
            veda_id (str): edx_id of the video being processed.
            auth_yaml (str): path to the auth/config YAML file.
        """
        self.course_object = course_object
        self.encode_list = set()
        self.overencode = kwargs.get('overencode', False)
        self.veda_id = kwargs.get('veda_id', None)
        self.auth_yaml = kwargs.get(
            'auth_yaml',
            os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                'veda_auth.yaml'
            ),
        )
        # Read the YAML once; the previous code re-read and re-parsed the
        # file for every field (four reads).
        auth_dict = self._READ_AUTH()
        self.encode_dict = auth_dict['encode_dict']
        self.sg_server_path = auth_dict['sg_server_path']
        self.sg_script_name = auth_dict['sg_script_name']
        self.sg_script_key = auth_dict['sg_script_key']
    def _READ_AUTH(self):
        """Load the auth YAML; return its dict, or None if missing/unparsable."""
        if self.auth_yaml is None:
            return None
        if not os.path.exists(self.auth_yaml):
            return None
        with open(self.auth_yaml, 'r') as stream:
            try:
                return yaml.load(stream)
            except yaml.YAMLError:
                return None
    @newrelic.agent.background_task()
    def determine_encodes(self):
        """
        Return the list of encode profile names still needed for this video,
        or None when nothing is pending.
        """
        self.match_profiles()
        # Scrub profiles that are unknown or deactivated in the Encode table.
        for e in self.encode_list.copy():
            enc_query = Encode.objects.filter(
                product_spec=e
            )
            if len(enc_query) == 0:
                self.encode_list.remove(e)
            elif enc_query[0].profile_active is False:
                self.encode_list.remove(e)
        # Drop profiles whose URLs already exist (unless overencode is set).
        self.query_urls()
        if len(self.encode_list) == 0:
            return None
        return list(self.encode_list)
    def match_profiles(self):
        """Populate encode_list from the course's boolean profile flags."""
        if self.course_object.review_proc is True and self.veda_id is not None:
            # Here's where we check if this is review approved
            if self.check_review_approved() is False:
                for e in self.encode_dict['review_proc']:
                    self.encode_list.add(e)
                return None
        if self.course_object.mobile_override is True:
            for e in self.encode_dict['mobile_override']:
                self.encode_list.add(e)
            return None
        for key, entry in self.encode_dict.iteritems():
            if getattr(self.course_object, key) is True:
                if key != 'review_proc':
                    for e in entry:
                        self.encode_list.add(e)
    def query_urls(self):
        """
        To allow the healing process & legacy imports
        protection against double encoding -- will take a kwarg to
        check against this 'overencode' / in case of a total redo
        """
        if self.overencode is True:
            return None
        if self.veda_id is None:
            return None
        for l in self.encode_list.copy():
            url_query = URL.objects.filter(
                videoID=Video.objects.filter(edx_id=self.veda_id).latest(),
                encode_profile=Encode.objects.get(product_spec=l.strip())
            )
            if len(url_query) > 0:
                self.encode_list.remove(l)
    def check_review_approved(self):
        """
        ** Mediateam only **
        Check in with SG to see if this video
        is authorized to go to final publishing
        """
        video_object = Video.objects.filter(
            edx_id=self.veda_id
        ).latest()
        if video_object.inst_class.sg_projID is None:
            return False
        sg = Shotgun(
            self.sg_server_path,
            self.sg_script_name,
            self.sg_script_key
        )
        fields = ['project', 'entity', 'sg_status_list']
        filters = [
            ['step', 'is', {'type': 'Step', 'id': 7}],
            ['project', 'is', {
                "type": "Project",
                "id": video_object.inst_class.sg_projID
            }],
        ]
        tasks = sg.find("Task", filters, fields)
        for t in tasks:
            # Task entity names carry the video serial (suffix of the edx_id).
            if t['entity']['name'] == self.veda_id.split('-')[-1]:
                if t['sg_status_list'] != 'wtg':
                    return True
        return False
def main():
    """No CLI behavior; this module is used as a library."""
    pass
if __name__ == '__main__':
    sys.exit(main())
import os.path
import boto
import yaml
from boto.s3.connection import S3Connection
import newrelic.agent
try:
    boto.config.add_section('Boto')
except:
    # Section already exists (boto raises on duplicates); safe to ignore.
    pass
# Lengthen boto's HTTP socket timeout for large S3 transfers.
boto.config.set('Boto', 'http_socket_timeout', '100')
# Attach New Relic monitoring using the repo-level ini file.
newrelic.agent.initialize(
    os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        'veda_newrelic.ini'
    )
)
"""
multi-point videofile discovery
Currently:
FTP
Amazon S3 (studio-ingest as well as about/marketing
video ingest
)
Local (watchfolder w/o edit priv.)
"""
from control_env import *
from veda_utils import ErrorObject
from veda_file_ingest import VideoProto, VedaIngest
from veda_val import VALAPICall
class FileDiscovery():
    """
    Multi-point videofile discovery.

    Crawls the S3 ingest buckets (about/marketing uploads and studio
    uploads), downloads candidate files to the node work directory, and
    hands them to VedaIngest.
    """
    def __init__(self, **kwargs):
        # Discovered-file metadata; not populated by the code visible here.
        self.video_info = {}
        self.auth_dict = {}
        self.auth_yaml = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'veda_auth.yaml'
        )
        with open(self.auth_yaml, 'r') as stream:
            try:
                self.auth_dict = yaml.load(stream)
            except yaml.YAMLError as exc:
                # Unparsable auth file: continue with an empty auth_dict.
                pass
        self.bucket = None
        """
        FTP Server Vars
        """
        self.ftp_key = None
        self.ftp_follow_delay = str(5000)
        self.ftp_log = "/Users/Shared/edX1/LG/Transfers.log"
        self.wfm_log = "/Users/Shared/edX1/LG/WFM.log"
        self.ftp_faillog = "/Users/Shared/edX1/LG/FailedTransfers.log"
        self.node_work_directory = kwargs.get('node_work_directory', WORK_DIRECTORY)
    @newrelic.agent.background_task()
    def about_video_ingest(self):
        """Crawl the VEDA upload bucket and validate about/marketing videos."""
        if self.node_work_directory is None:
            ErrorObject().print_error(
                message='No Workdir'
            )
            return None
        """
        Crawl ingest bucket looking for files
        """
        conn = S3Connection(
            self.auth_dict['veda_access_key_id'],
            self.auth_dict['veda_secret_access_key']
        )
        """
        Occassional s3 Error
        """
        try:
            self.bucket = conn.get_bucket(self.auth_dict['veda_s3_upload_bucket'])
        except:
            # NOTE(review): bare except silently hides all S3 errors — confirm.
            return None
        for key in self.bucket.list('upload/', '/'):
            meta = self.bucket.get_key(key.name)
            if meta.name != 'upload/':
                self.about_video_validate(
                    meta=meta,
                    key=key
                )
    def about_video_validate(self, meta, key):
        """Download a serialized about-video upload and trigger ingest."""
        abvid_serial = meta.name.split('/')[1]
        upload_query = VedaUpload.objects.filter(
            video_serial=meta.name.split('/')[1]
        )
        if len(upload_query) == 0:
            '''
            Non serialized upload - reject
            '''
            return None
        if upload_query[0].upload_filename is not None:
            file_extension = upload_query[0].upload_filename.split('.')[-1]
        else:
            upload_query[0].upload_filename = 'null_file_name.mp4'
            file_extension = 'mp4'
        if len(file_extension) > 4:
            # Suspiciously long suffix: treat the name as extension-less.
            file_extension = ''
        meta.get_contents_to_filename(
            os.path.join(
                self.node_work_directory,
                upload_query[0].upload_filename
            )
        )
        course_query = Course.objects.get(institution='EDX', edx_classid='ABVID')
        """
        Trigger Ingest Process
        """
        V = VideoProto(
            abvid_serial=abvid_serial,
            client_title=upload_query[0].upload_filename.replace('.' + file_extension, ''),
            file_extension=file_extension,
        )
        I = VedaIngest(
            course_object=course_query,
            video_proto=V,
            node_work_directory=self.node_work_directory
        )
        I.insert()
        """
        Move Key out of 'upload' folder
        """
        new_key = '/'.join(('process', meta.name.split('/')[1]))
        key.copy(self.bucket, new_key)
        key.delete()
        reset_queries()
    @newrelic.agent.background_task()
    def studio_s3_ingest(self):
        """Crawl the studio upload bucket and validate each unprocessed key."""
        if self.node_work_directory is None:
            ErrorObject().print_error(
                message='No Workdir'
            )
            return None
        """
        Ingest files from studio upload endpoint
        """
        conn = S3Connection(
            self.auth_dict['edx_access_key_id'],
            self.auth_dict['edx_secret_access_key']
        )
        """Occassional s3 Error"""
        try:
            self.bucket = conn.get_bucket(self.auth_dict['edx_s3_ingest_bucket'])
        except:
            print 'S3: Ingest Conn Failure'
            return None
        for key in self.bucket.list('prod-edx/unprocessed/', '/'):
            meta = self.bucket.get_key(key.name)
            self.studio_s3_validate(
                meta=meta,
                key=key
            )
    def studio_s3_validate(self, meta, key):
        """
        Validate one studio upload key: check its course token, download
        the file, fire ingest, and move the key to processed/ (or rejected/).
        """
        if meta.get_metadata('course_video_upload_token') is None:
            return None
        client_title = meta.get_metadata('client_video_id')
        course_hex = meta.get_metadata('course_video_upload_token')
        course_url = meta.get_metadata('course_key')
        # key.name is 'prefix/.../filename'; take the final path component.
        edx_filename = key.name[::-1].split('/')[0][::-1]
        if len(course_hex) == 0:
            return None
        course_query = Course.objects.filter(studio_hex=course_hex)
        if len(course_query) == 0:
            # Unknown course token: report to VAL and move key to rejected/.
            V = VideoProto(
                s3_filename=edx_filename,
                client_title=client_title,
                file_extension='',
                platform_course_url=course_url
            )
            """
            Call VAL Api
            """
            val_status = 'invalid_token'
            VAC = VALAPICall(
                video_proto=V,
                val_status=val_status
            )
            VAC.call()
            new_key = 'prod-edx/rejected/' + key.name[::-1].split('/')[0][::-1]
            key.copy(self.bucket, new_key)
            key.delete()
            return None
        file_extension = client_title[::-1].split('.')[0][::-1]
        """
        download file
        """
        if len(file_extension) == 3:
            try:
                meta.get_contents_to_filename(
                    os.path.join(
                        self.node_work_directory,
                        edx_filename + '.' + file_extension
                    )
                )
                file_ingested = True
            except:
                print 'File Copy Fail: Studio S3 Ingest'
                file_ingested = False
        else:
            try:
                meta.get_contents_to_filename(
                    os.path.join(
                        self.node_work_directory,
                        edx_filename
                    )
                )
                file_ingested = True
            except:
                print 'File Copy Fail: Studio S3 Ingest'
                file_ingested = False
            file_extension = ''
        if file_ingested is not True:
            # 's3 Bucket ingest Fail'
            new_key = 'prod-edx/rejected/' + key.name[::-1].split('/')[0][::-1]
            try:
                key.copy(self.bucket, new_key)
            except:
                # NOTE(review): single blind retry of the copy — confirm intent.
                key.copy(self.bucket, new_key)
            key.delete()
            return None
        """
        Trigger Ingest Process
        """
        V = VideoProto(
            s3_filename=edx_filename,
            client_title=client_title,
            file_extension=file_extension,
            platform_course_url=course_url
        )
        I = VedaIngest(
            course_object=course_query[0],
            video_proto=V,
            node_work_directory=self.node_work_directory
        )
        I.insert()
        if I.complete is False:
            # Ingest did not finish; leave the key in unprocessed/ for retry.
            return None
        """
        Delete Original After Copy
        """
        new_key = 'prod-edx/processed/' + key.name[::-1].split('/')[0][::-1]
        try:
            key.copy(self.bucket, new_key)
        except:
            # NOTE(review): single blind retry of the copy — confirm intent.
            key.copy(self.bucket, new_key)
        # key.copy(self.bucket, new_key)
        key.delete()
def main():
    """No CLI behavior; this module is used as a library."""
    pass
if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import datetime
from datetime import timedelta
import yaml
import uuid
import newrelic.agent
# Attach New Relic monitoring using the repo-level ini file.
newrelic.agent.initialize(
    os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        'veda_newrelic.ini'
    )
)
"""
Heal Process
Roll through videos, check for completion
- reencode endpoints
- fix data (if wrong), including on VAL
- reschedule self
# Heuristic
# Encode
# Activation
# Logistics
"""
from control_env import *
from veda_encode import VedaEncode
from veda_val import VALAPICall
import celeryapp
# Cutoff: videos newer than one day are left alone. NOTE(review): computed
# once at import time, not per heal run — confirm that is intended.
time_safetygap = datetime.datetime.utcnow().replace(tzinfo=utc) - timedelta(days=1)
class VedaHeal():
"""
"""
    def __init__(self, **kwargs):
        """
        Optional kwargs:
            auth_yaml (str): path to the auth/config YAML (defaults to
                veda_auth.yaml next to this file).
            video_query: pre-selected videos to heal; normally populated
                by discovery().
            freezing_bug (bool): when True, terminal/manual video states
                are skipped during fault determination.
        """
        self.current_time = datetime.datetime.utcnow().replace(tzinfo=utc)
        self.auth_yaml = kwargs.get(
            'auth_yaml',
            os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                'veda_auth.yaml'
            ),
        )
        self.auth_dict = self._READ_AUTH()
        # for individuals
        self.video_query = kwargs.get('video_query', None)
        self.freezing_bug = kwargs.get('freezing_bug', True)
        # VAL status to report after fault determination (set per video).
        self.val_status = None
        # Grace period before a stalled file may be marked corrupt.
        self.retry_barrier_hours = 48
    def _READ_AUTH(self):
        """Load the auth YAML; return its dict, or None if missing/unparsable."""
        if self.auth_yaml is None:
            return None
        if not os.path.exists(self.auth_yaml):
            return None
        with open(self.auth_yaml, 'r') as stream:
            try:
                auth_dict = yaml.load(stream)
                return auth_dict
            except yaml.YAMLError as exc:
                return None
    @newrelic.agent.background_task()
    def discovery(self):
        """Select videos inside the heal window and queue their missing encodes."""
        # Window: started more than heal_start hours ago but less than
        # heal_end hours ago (both configured in the auth YAML).
        self.video_query = Video.objects.filter(
            video_trans_start__lt=self.current_time - timedelta(
                hours=self.auth_dict['heal_start']
            ),
            video_trans_start__gt=self.current_time - timedelta(
                hours=self.auth_dict['heal_end']
            )
        )
        self.send_encodes()
    def send_encodes(self):
        """For each queried video: report VAL status, then queue needed encodes."""
        for v in self.video_query:
            encode_list = self.determine_fault(video_object=v)
            """
            Using the 'Video Proto' Model
            """
            if self.val_status is not None:
                VAC = VALAPICall(
                    video_proto=None,
                    video_object=v,
                    val_status=self.val_status,
                )
                VAC.call()
            # Reset per-video status so it never leaks to the next video.
            self.val_status = None
            if len(encode_list) > 0:
                """
                send job to queue
                """
                # Large source files go to the dedicated large-file queue.
                if v.video_orig_filesize > self.auth_dict['largefile_queue_barrier']:
                    cel_queue = self.auth_dict['largefile_celery_queue']
                else:
                    cel_queue = self.auth_dict['main_celery_queue']
                for e in encode_list:
                    veda_id = v.edx_id
                    encode_profile = e
                    jobid = uuid.uuid1().hex[0:10]
                    celeryapp.worker_task_fire.apply_async(
                        (veda_id, encode_profile, jobid),
                        queue=cel_queue
                    )
@newrelic.agent.background_task()
def determine_fault(self, video_object):
"""
Is there anything to do with this?
"""
if self.freezing_bug is True:
if video_object.video_trans_status == 'Corrupt File':
return []
if video_object.video_trans_status == 'Review Reject':
return []
if video_object.video_trans_status == 'Review Hold':
return []
if video_object.video_active is False:
return []
"""
Finally, determine encodes
"""
E = VedaEncode(
course_object=video_object.inst_class,
veda_id=video_object.edx_id
)
encode_list = E.determine_encodes()
try:
encode_list.remove('review')
except:
pass
"""
Status Cleaning
"""
check_list = []
if encode_list is not None:
for e in encode_list:
if e != 'mobile_high' and e != 'audio_mp3' and e != 'review':
check_list.append(e)
if check_list is None or len(check_list) == 0:
self.val_status = 'file_complete'
"""
File is complete!
Check for data parity, and call done
"""
if video_object.video_trans_status != 'Complete':
Video.objects.filter(
edx_id=video_object.edx_id
).update(
video_trans_status='Complete',
video_trans_end=datetime.datetime.utcnow().replace(tzinfo=utc)
)
if encode_list is None or len(encode_list) == 0:
return []
"""
get baseline // if there are == encodes and baseline,
mark file corrupt -- just run the query again with
no veda_id
"""
"""
This overrides
"""
if self.freezing_bug is False:
if self.val_status != 'file_complete':
self.val_status = 'transcode_queue'
return encode_list
E2 = VedaEncode(
course_object=video_object.inst_class,
)
E2.determine_encodes()
if len(E2.encode_list) == len(encode_list) and len(encode_list) > 1:
"""
Mark File Corrupt, accounting for migrated URLs
"""
url_test = URL.objects.filter(
videoID=Video.objects.filter(
edx_id=video_object.edx_id
).latest()
)
if video_object.video_trans_start < datetime.datetime.utcnow().replace(tzinfo=utc) - \
timedelta(hours=self.retry_barrier_hours):
if len(url_test) == 0:
Video.objects.filter(
edx_id=video_object.edx_id
).update(
video_trans_status='Corrupt File',
video_trans_end=datetime.datetime.utcnow().replace(tzinfo=utc)
)
self.val_status = 'file_corrupt'
return []
if self.val_status != 'file_complete':
self.val_status = 'transcode_queue'
return encode_list
@newrelic.agent.background_task()
def purge(self):
"""
Purge Work Directory
"""
for file in os.listdir(WORK_DIRECTORY):
full_filepath = os.path.join(WORK_DIRECTORY, file)
filetime = datetime.datetime.utcfromtimestamp(
os.path.getmtime(
full_filepath
)
).replace(tzinfo=utc)
if filetime < time_safetygap:
print file + " : WORK PURGE"
os.remove(full_filepath)
def main():
    """Entry point: run a single heal discovery pass."""
    healer = VedaHeal()
    healer.discovery()

if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import boto
import boto.s3
from boto.s3.key import Key
import yaml
import shutil
from os.path import expanduser
import newrelic.agent
# A 'Boto' section may already exist (e.g. from a boto config file);
# adding it twice raises, so this is best-effort.
try:
    boto.config.add_section('Boto')
except:
    pass
# Guard against S3 sockets hanging forever.
boto.config.set('Boto', 'http_socket_timeout', '100')
newrelic.agent.initialize(
    os.path.join(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
        'veda_newrelic.ini'
    )
)
"""
Upload file to hotstore
"""
from veda_utils import ErrorObject, Output
# Multipart upload chdir's into a scratch dir; this is where it returns to.
homedir = expanduser("~")
class Hotstore():
    """
    Uploads a file to S3 — either the VEDA intake 'hotstore' bucket or the
    edX delivery endpoint bucket — choosing single- or multi-part upload
    by file size.

    video_proto : object carrying veda_id, used to build the S3 key name
    upload_filepath : absolute path of the file to upload
    endpoint (kwarg) : False -> veda hotstore bucket, True -> edx endpoint bucket
    auth_yaml (kwarg) : override path to the instance auth yaml
    """
    def __init__(self, video_proto, upload_filepath, **kwargs):
        self.video_proto = video_proto
        self.upload_filepath = upload_filepath
        # is this a final/encode file?
        self.endpoint = kwargs.get('endpoint', False)
        self.auth_yaml = kwargs.get(
            'auth_yaml',
            os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                'veda_auth.yaml'
            ),
        )
        self.auth_dict = self._READ_AUTH()
        self.endpoint_url = None

    def _READ_AUTH(self):
        """Load the instance auth yaml; None when missing or unparseable."""
        if self.auth_yaml is None:
            return None
        if not os.path.exists(self.auth_yaml):
            return None
        with open(self.auth_yaml, 'r') as stream:
            try:
                # NOTE(review): yaml.load without Loader= is unsafe on
                # untrusted input; this reads instance-local config only.
                return yaml.load(stream)
            except yaml.YAMLError:
                return None

    @newrelic.agent.background_task()
    def upload(self):
        """Upload the file; True on success, False on config/file errors."""
        if self.auth_dict is None:
            return False
        if not os.path.exists(self.upload_filepath):
            ErrorObject().print_error(
                message='Hotstore: File Not Found'
            )
            return False
        self.upload_filesize = os.stat(self.upload_filepath).st_size
        if self.upload_filesize < self.auth_dict['multi_upload_barrier']:
            return self._BOTO_SINGLEPART()
        else:
            return self._BOTO_MULTIPART()

    def _BOTO_SINGLEPART(self):
        """
        Upload single part (under threshold in instance_auth)
        self.auth_dict['multi_upload_barrier']
        """
        # Was a bare `except:` (swallowed SystemExit/KeyboardInterrupt too);
        # narrowed to Exception throughout this class.
        if self.endpoint is False:
            try:
                conn = boto.connect_s3(
                    self.auth_dict['veda_access_key_id'],
                    self.auth_dict['veda_secret_access_key']
                )
                delv_bucket = conn.get_bucket(
                    self.auth_dict['veda_s3_hotstore_bucket']
                )
            except Exception:
                ErrorObject().print_error(
                    message='Hotstore: Bucket Connectivity'
                )
                return False
        else:
            try:
                conn = boto.connect_s3(
                    self.auth_dict['edx_access_key_id'],
                    self.auth_dict['edx_secret_access_key']
                )
                delv_bucket = conn.get_bucket(
                    self.auth_dict['edx_s3_endpoint_bucket']
                )
            except Exception:
                ErrorObject().print_error(
                    message='Endpoint: Bucket Connectivity'
                )
                return False
        upload_key = Key(delv_bucket)
        # Key is '<veda_id>.<original file extension>'.
        upload_key.key = '.'.join((
            self.video_proto.veda_id,
            self.upload_filepath.split('.')[-1]
        ))
        try:
            upload_key.set_contents_from_filename(self.upload_filepath)
            return True
        except Exception:
            # One retry for transient S3 hiccups; a second failure propagates.
            upload_key.set_contents_from_filename(self.upload_filepath)
            return True

    def _BOTO_MULTIPART(self):
        """
        Split file into chunks, upload chunks

        NOTE: this should never happen, as your files should be much
        smaller than this, but one never knows
        """
        path_to_multipart = os.path.dirname(self.upload_filepath)
        filename = os.path.basename(self.upload_filepath)
        """
        This is modular
        """
        # Scratch directory named after the file's basename holds the chunks.
        if not os.path.exists(os.path.join(path_to_multipart, filename.split('.')[0])):
            os.mkdir(os.path.join(path_to_multipart, filename.split('.')[0]))
        os.chdir(os.path.join(path_to_multipart, filename.split('.')[0]))
        """
        Split File into chunks
        """
        # NOTE(review): os.system with an interpolated path is shell-injection
        # prone; paths here are pipeline-generated, not user-supplied.
        split_command = 'split -b10m -a5'  # 10MB chunks, 5-char part suffixes
        sys.stdout.write('%s : %s\n' % (filename, 'Generating Multipart'))
        os.system(' '.join((split_command, self.upload_filepath)))
        sys.stdout.flush()
        """
        Connect to s3
        """
        if self.endpoint is False:
            try:
                c = boto.connect_s3(
                    self.auth_dict['veda_access_key_id'],
                    self.auth_dict['veda_secret_access_key']
                )
                b = c.lookup(self.auth_dict['veda_s3_hotstore_bucket'])
            except Exception:
                ErrorObject().print_error(
                    message='Hotstore: Bucket Connectivity'
                )
                return False
        else:
            try:
                c = boto.connect_s3(
                    self.auth_dict['edx_access_key_id'],
                    self.auth_dict['edx_secret_access_key']
                )
                b = c.lookup(self.auth_dict['edx_s3_endpoint_bucket'])
            except Exception:
                ErrorObject().print_error(
                    message='Endpoint: Bucket Connectivity'
                )
                return False
        if b is None:
            # lookup() returns None (rather than raising) for missing buckets.
            ErrorObject().print_error(
                message='Deliverable Fail: s3 Bucket Error'
            )
            return False
        """
        Upload and stitch parts // with a decent display
        """
        mp = b.initiate_multipart_upload(
            '.'.join((
                self.video_proto.veda_id,
                filename.split('.')[-1]
            ))
        )
        # Part numbers are 1-based; 'split' suffixes sort in creation order.
        x = 1
        for file in sorted(os.listdir(
            os.path.join(
                path_to_multipart,
                filename.split('.')[0]
            )
        )):
            sys.stdout.write('%s : %s\r' % (file, 'uploading part'))
            fp = open(file, 'rb')
            mp.upload_part_from_file(fp, x)
            fp.close()
            sys.stdout.flush()
            x += 1
        sys.stdout.write('\n')
        mp.complete_upload()
        """
        Clean up multipart
        """
        os.chdir(homedir)
        shutil.rmtree(os.path.join(path_to_multipart, filename.split('.')[0]))
        return True
def main():
    """Module is import-only; nothing to do when run directly."""
    return None

if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import subprocess
import fnmatch
import django
import newrelic.agent
newrelic.agent.initialize(
os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_newrelic.ini'
)
)
"""
VEDA Intake/Product Final Testing Suite
Should Test for:
0 size
Corrupt Files
image files (which read as 0:00 duration or N/A)
Mismatched Durations (within 5 sec)
"""
from control_env import *
class Validation():
"""
Expects a full filepath
"""
def __init__(self, videofile, **kwargs):
self.videofile = videofile
self.mezzanine = kwargs.get('mezzanine', True)
self.veda_id = kwargs.get('veda_id', False)
def seconds_conversion(self, duration):
hours = float(duration.split(':')[0])
minutes = float(duration.split(':')[1])
seconds = float(duration.split(':')[2])
seconds_duration = (((hours * 60) + minutes) * 60) + seconds
return seconds_duration
@newrelic.agent.background_task()
def validate(self):
"""
Test #1 - assumes file is in 'work' directory of node
"""
ff_command = ' '.join((
FFPROBE,
"\"" + self.videofile + "\""
))
"""
Test if size is zero
"""
if int(os.path.getsize(self.videofile)) == 0:
print 'Corrupt: Invalid'
return False
p = subprocess.Popen(ff_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
for line in iter(p.stdout.readline, b''):
if "Invalid data found when processing input" in line:
print 'Corrupt: Invalid'
return False
if "multiple edit list entries, a/v desync might occur, patch welcome" in line:
return False
if "Duration: " in line:
if "Duration: 00:00:00.0" in line:
return False
elif "Duration: N/A, " in line:
return False
video_duration = line.split(',')[0][::-1].split(' ')[0][::-1]
try:
str(video_duration)
except:
return False
p.kill()
"""
Compare Product to DB averages - pass within 5 sec
"""
if self.mezzanine is True:
return True
if self.veda_id is None:
print 'Error: Validation, encoded file no comparison ID'
return False
try:
video_query = Video.objects.filter(edx_id=self.veda_id).latest()
except:
return False
product_duration = float(
self.seconds_conversion(
duration=video_duration
)
)
data_duration = float(
self.seconds_conversion(
duration=video_query.video_orig_duration
)
)
"""
Final Test
"""
if (data_duration - 5) <= product_duration <= (data_duration + 5):
return True
else:
return False
def main():
    """Import-only module; no standalone CLI behavior."""
    return None

if __name__ == '__main__':
    sys.exit(main())
#!/usr/bin/python
"""
Simple Watchdog Timer
"""
'''
Stolen From:
--------------------------------------------------------------------------------
Module Name: watchdog.py
Author: Jon Peterson (PIJ)
Description: This module implements a simple watchdog timer for Python.
--------------------------------------------------------------------------------
Copyright (c) 2012, Jon Peterson
--------------------------------------------------------------------------------
'''
# Imports
from time import sleep
from threading import Timer
import thread
# Watchdog Class
class Watchdog(object):
def __init__(self, time=1.0):
''' Class constructor. The "time" argument has the units of seconds. '''
self._time = time
return
def StartWatchdog(self):
''' Starts the watchdog timer. '''
self._timer = Timer(self._time, self._WatchdogEvent)
self._timer.daemon = True
self._timer.start()
return
def PetWatchdog(self):
''' Reset watchdog timer. '''
self.StopWatchdog()
self.StartWatchdog()
return
def _WatchdogEvent(self):
'''
This internal method gets called when the timer triggers. A keyboard
interrupt is generated on the main thread. The watchdog timer is stopped
when a previous event is tripped.
'''
print 'Watchdog event...'
self.StopWatchdog()
thread.interrupt_main()
# thread.interrupt_main()
# thread.interrupt_main()
return
def StopWatchdog(self):
''' Stops the watchdog timer. '''
self._timer.cancel()
def main():
    ''' This function is used to unit test the watchdog module. '''
    w = Watchdog(1.0)
    w.StartWatchdog()
    for i in range(0, 11):
        print 'Testing %d...' % i
        try:
            # Every third pass sleeps past the 1.0s timeout to trip the dog.
            if (i % 3) == 0:
                sleep(1.5)
            else:
                sleep(0.5)
        except:
            # KeyboardInterrupt injected by the watchdog's timer thread.
            print 'MAIN THREAD KNOWS ABOUT WATCHDOG'
        w.PetWatchdog()
    w.StopWatchdog()  # Not strictly necessary
    return

if __name__ == '__main__':
    main()
<?xml version="1.0"?>
<feed xmlns="http://www.youtube.com/schemas/cms/2.0" notification_email="greg@edx.org">
<asset type="web" tag="">
<title> </title>
<custom_id> </custom_id>
<!-- other asset details included in feed -->
</asset>
<file type="video" tag="">
<filename> </filename>
</file>
<ownership/>
<relationship>
<item path="/feed/asset[@tag='']"/>
<related_item path="/feed/ownership[1]"/>
</relationship>
<video tag="">
<title> </title>
<channel> </channel>
<description> </description>
<keyword>edX</keyword>
<genre>Education</genre>
<allow_embedding>True</allow_embedding>
<allow_comments>Approve</allow_comments>
<allow_ratings>True</allow_ratings>
<!-- <allow_responses>Approve</allow_responses> -->
<public>unlisted</public>
<!-- <start_time>1990-11-30T12:00:00Z</start_time> -->
</video>
<relationship>
<item path="/feed/file[@tag='']"/>
<related_item path="/feed/asset[@tag='']"/>
<related_item path="/feed/video[@tag='']"/>
</relationship>
<rights_admin type="usage" owner="True"/>
<rights_admin type="match" owner="True"/>
<!-- declare a one-off policy -->
<rights_policy>
<rule action="track"/>
</rights_policy>
<!-- Claim our uploaded video with the one-off policy -->
<claim type="audiovisual" video="/feed/video[@tag='']" asset="/feed/asset[@tag='']" rights_admin="/feed/rights_admin[@type='usage']" rights_policy="/feed/rights_policy[1]"/>
<relationship>
<item path="/feed/rights_admin[@type='match']"/>
<item path="/feed/rights_policy[1]"/>
<related_item path="/feed/asset[@tag='']"/>
</relationship>
</feed>
\ No newline at end of file
from shotgun import (Shotgun, ShotgunError, ShotgunFileDownloadError, Fault,
ProtocolError, ResponseError, Error, __version__)
from shotgun import SG_TIMEZONE as sg_timezone
"""
iri2uri
Converts an IRI to a URI.
"""
__author__ = "Joe Gregorio (joe@bitworking.org)"
__copyright__ = "Copyright 2006, Joe Gregorio"
__contributors__ = []
__version__ = "1.0.0"
__license__ = "MIT"
__history__ = """
"""
import urlparse
# Convert an IRI to a URI following the rules in RFC 3987
#
# The characters we need to enocde and escape are defined in the spec:
#
# iprivate = %xE000-F8FF / %xF0000-FFFFD / %x100000-10FFFD
# ucschar = %xA0-D7FF / %xF900-FDCF / %xFDF0-FFEF
# / %x10000-1FFFD / %x20000-2FFFD / %x30000-3FFFD
# / %x40000-4FFFD / %x50000-5FFFD / %x60000-6FFFD
# / %x70000-7FFFD / %x80000-8FFFD / %x90000-9FFFD
# / %xA0000-AFFFD / %xB0000-BFFFD / %xC0000-CFFFD
# / %xD0000-DFFFD / %xE1000-EFFFD
# Unicode ranges (ucschar + iprivate from RFC 3987) whose characters must
# be percent-encoded when converting an IRI to a URI. Sorted ascending so
# encode() can stop scanning early.
escape_range = [
    (0xA0, 0xD7FF),
    (0xE000, 0xF8FF),
    (0xF900, 0xFDCF),
    (0xFDF0, 0xFFEF),
    (0x10000, 0x1FFFD),
    (0x20000, 0x2FFFD),
    (0x30000, 0x3FFFD),
    (0x40000, 0x4FFFD),
    (0x50000, 0x5FFFD),
    (0x60000, 0x6FFFD),
    (0x70000, 0x7FFFD),
    (0x80000, 0x8FFFD),
    (0x90000, 0x9FFFD),
    (0xA0000, 0xAFFFD),
    (0xB0000, 0xBFFFD),
    (0xC0000, 0xCFFFD),
    (0xD0000, 0xDFFFD),
    (0xE1000, 0xEFFFD),
    (0xF0000, 0xFFFFD),
    (0x100000, 0x10FFFD),
]

def encode(c):
    """Return `c` percent-encoded (as its UTF-8 octets) when it falls in an
    escape range, otherwise unchanged. Ranges are sorted, so the scan stops
    as soon as the code point is below a range's lower bound."""
    code_point = ord(c)
    for low, high in escape_range:
        if code_point < low:
            break
        if low <= code_point <= high:
            return "".join("%%%2X" % ord(octet) for octet in c.encode('utf-8'))
    return c
def iri2uri(uri):
    """Convert an IRI to a URI. Note that IRIs must be
    passed in a unicode strings. That is, do not utf-8 encode
    the IRI before passing it into the function."""
    if isinstance(uri ,unicode):
        # Split first so only the authority is IDNA-encoded; the remaining
        # components are percent-escaped character-by-character below.
        (scheme, authority, path, query, fragment) = urlparse.urlsplit(uri)
        authority = authority.encode('idna')
        # For each character in 'ucschar' or 'iprivate'
        # 1. encode as utf-8
        # 2. then %-encode each octet of that utf-8
        uri = urlparse.urlunsplit((scheme, authority, path, query, fragment))
        uri = "".join([encode(c) for c in uri])
    # Non-unicode input (already a byte-string URI) is returned untouched.
    return uri
# Self-test harness: run `python iri2uri.py` directly to execute it.
if __name__ == "__main__":
    import unittest

    class Test(unittest.TestCase):
        def test_uris(self):
            """Test that URIs are invariant under the transformation."""
            invariant = [
                u"ftp://ftp.is.co.za/rfc/rfc1808.txt",
                u"http://www.ietf.org/rfc/rfc2396.txt",
                u"ldap://[2001:db8::7]/c=GB?objectClass?one",
                u"mailto:John.Doe@example.com",
                u"news:comp.infosystems.www.servers.unix",
                u"tel:+1-816-555-1212",
                u"telnet://192.0.2.16:80/",
                u"urn:oasis:names:specification:docbook:dtd:xml:4.1.2" ]
            for uri in invariant:
                self.assertEqual(uri, iri2uri(uri))

        def test_iri(self):
            """ Test that the right type of escaping is done for each part of the URI."""
            self.assertEqual("http://xn--o3h.com/%E2%98%84", iri2uri(u"http://\N{COMET}.com/\N{COMET}"))
            self.assertEqual("http://bitworking.org/?fred=%E2%98%84", iri2uri(u"http://bitworking.org/?fred=\N{COMET}"))
            self.assertEqual("http://bitworking.org/#%E2%98%84", iri2uri(u"http://bitworking.org/#\N{COMET}"))
            self.assertEqual("#%E2%98%84", iri2uri(u"#\N{COMET}"))
            self.assertEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}"))
            self.assertEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}")))
            self.assertNotEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}".encode('utf-8')))

    unittest.main()
#! /opt/local/bin/python
# ----------------------------------------------------------------------------
# SG_TIMEZONE module
# this is rolled into the this shotgun api file to avoid having to require
# current users of api2 to install new modules and modify PYTHONPATH info.
# ----------------------------------------------------------------------------
class SgTimezone(object):
    """
    tzinfo helpers (UTC and the host's local timezone) bundled with the
    Shotgun API to avoid extra module dependencies on users' PYTHONPATH.
    Instantiate and use `.utc` / `.local`.
    """
    from datetime import tzinfo, timedelta, datetime
    import time as _time

    ZERO = timedelta(0)
    STDOFFSET = timedelta(seconds=-_time.timezone)
    if _time.daylight:
        DSTOFFSET = timedelta(seconds=-_time.altzone)
    else:
        DSTOFFSET = STDOFFSET
    DSTDIFF = DSTOFFSET - STDOFFSET

    def __init__(self):
        self.utc = self.UTC()
        self.local = self.LocalTimezone()

    class UTC(tzinfo):
        """Fixed-offset UTC tzinfo."""
        def utcoffset(self, dt):
            return SgTimezone.ZERO

        def tzname(self, dt):
            return "UTC"

        def dst(self, dt):
            return SgTimezone.ZERO

    class LocalTimezone(tzinfo):
        """tzinfo reflecting the host's local timezone and DST rules."""
        def utcoffset(self, dt):
            if self._isdst(dt):
                return SgTimezone.DSTOFFSET
            else:
                return SgTimezone.STDOFFSET

        def dst(self, dt):
            if self._isdst(dt):
                return SgTimezone.DSTDIFF
            else:
                return SgTimezone.ZERO

        def tzname(self, dt):
            # `_time` bound in the SgTimezone class body is NOT visible from
            # inside methods (class attributes aren't an enclosing scope), so
            # this previously raised NameError. Import locally, as _isdst does.
            import time as _time
            return _time.tzname[self._isdst(dt)]

        def _isdst(self, dt):
            """True when `dt` falls in local daylight-saving time."""
            tt = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.weekday(), 0, -1)
            import time as _time
            stamp = _time.mktime(tt)
            tt = _time.localtime(stamp)
            return tt.tm_isdst > 0
"""Drop-in replacement for collections.OrderedDict by Raymond Hettinger
http://code.activestate.com/recipes/576693/
"""
from UserDict import DictMixin
# Modified from original to support Python 2.4, see
# http://code.google.com/p/simplejson/issues/detail?id=53
try:
all
except NameError:
def all(seq):
for elem in seq:
if not elem:
return False
return True
class OrderedDict(dict, DictMixin):
    """
    dict subclass that remembers insertion order.

    Order is tracked in a circular doubly linked list whose nodes are
    [key, prev, next] triples; self.__map maps each key to its node and
    self.__end is the sentinel node.
    """
    def __init__(self, *args, **kwds):
        if len(args) > 1:
            raise TypeError('expected at most 1 arguments, got %d' % len(args))
        try:
            # Re-running __init__ on a live instance must not rebuild the list.
            self.__end
        except AttributeError:
            self.clear()
        self.update(*args, **kwds)

    def clear(self):
        self.__end = end = []
        end += [None, end, end]  # sentinel node for doubly linked list
        self.__map = {}          # key --> [key, prev, next]
        dict.clear(self)

    def __setitem__(self, key, value):
        if key not in self:
            # Link a new node just before the sentinel, i.e. at the end.
            end = self.__end
            curr = end[1]
            curr[2] = end[1] = self.__map[key] = [key, curr, end]
        dict.__setitem__(self, key, value)

    def __delitem__(self, key):
        dict.__delitem__(self, key)
        # Unlink the key's node from the order list.
        key, prev, next = self.__map.pop(key)
        prev[2] = next
        next[1] = prev

    def __iter__(self):
        # Walk forward from the sentinel.
        end = self.__end
        curr = end[2]
        while curr is not end:
            yield curr[0]
            curr = curr[2]

    def __reversed__(self):
        # Walk backward from the sentinel.
        end = self.__end
        curr = end[1]
        while curr is not end:
            yield curr[0]
            curr = curr[1]

    def popitem(self, last=True):
        if not self:
            raise KeyError('dictionary is empty')
        # Modified from original to support Python 2.4, see
        # http://code.google.com/p/simplejson/issues/detail?id=53
        if last:
            key = reversed(self).next()
        else:
            key = iter(self).next()
        value = self.pop(key)
        return key, value

    def __reduce__(self):
        # Pickle support: strip the linked-list internals and rebuild from
        # the ordered item pairs.
        items = [[k, self[k]] for k in self]
        tmp = self.__map, self.__end
        del self.__map, self.__end
        inst_dict = vars(self).copy()
        self.__map, self.__end = tmp
        if inst_dict:
            return (self.__class__, (items,), inst_dict)
        return self.__class__, (items,)

    def keys(self):
        return list(self)

    setdefault = DictMixin.setdefault
    update = DictMixin.update
    pop = DictMixin.pop
    values = DictMixin.values
    items = DictMixin.items
    iterkeys = DictMixin.iterkeys
    itervalues = DictMixin.itervalues
    iteritems = DictMixin.iteritems

    def __repr__(self):
        if not self:
            return '%s()' % (self.__class__.__name__,)
        return '%s(%r)' % (self.__class__.__name__, self.items())

    def copy(self):
        return self.__class__(self)

    @classmethod
    def fromkeys(cls, iterable, value=None):
        d = cls()
        for key in iterable:
            d[key] = value
        return d

    def __eq__(self, other):
        # Order-sensitive equality against another OrderedDict;
        # order-insensitive against a plain dict.
        if isinstance(other, OrderedDict):
            return len(self)==len(other) and \
                   all(p==q for p, q in zip(self.items(), other.items()))
        return dict.__eq__(self, other)

    def __ne__(self, other):
        return not self == other
"""JSON token scanner
"""
import re
def _import_c_make_scanner():
    """Return the C-accelerated scanner when _speedups is importable, else None."""
    try:
        from simplejson._speedups import make_scanner as c_scanner
    except ImportError:
        return None
    return c_scanner

c_make_scanner = _import_c_make_scanner()

__all__ = ['make_scanner']

# JSON number: integer part, optional fraction, optional exponent.
NUMBER_RE = re.compile(
    r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?',
    (re.VERBOSE | re.MULTILINE | re.DOTALL))

def py_make_scanner(context):
    """Build a pure-Python scan_once(string, idx) -> (obj, end_idx) closure
    for the given decoder context; raises StopIteration when nothing matches
    at idx (caught by the decoder)."""
    parse_object = context.parse_object
    parse_array = context.parse_array
    parse_string = context.parse_string
    match_number = NUMBER_RE.match
    encoding = context.encoding
    strict = context.strict
    parse_float = context.parse_float
    parse_int = context.parse_int
    parse_constant = context.parse_constant
    object_hook = context.object_hook
    object_pairs_hook = context.object_pairs_hook
    memo = context.memo

    def _scan_once(string, idx):
        try:
            nextchar = string[idx]
        except IndexError:
            raise StopIteration
        if nextchar == '"':
            return parse_string(string, idx + 1, encoding, strict)
        if nextchar == '{':
            return parse_object((string, idx + 1), encoding, strict,
                _scan_once, object_hook, object_pairs_hook, memo)
        if nextchar == '[':
            return parse_array((string, idx + 1), _scan_once)
        if nextchar == 'n' and string[idx:idx + 4] == 'null':
            return None, idx + 4
        if nextchar == 't' and string[idx:idx + 4] == 'true':
            return True, idx + 4
        if nextchar == 'f' and string[idx:idx + 5] == 'false':
            return False, idx + 5
        m = match_number(string, idx)
        if m is not None:
            integer, frac, exp = m.groups()
            if frac or exp:
                return parse_float(integer + (frac or '') + (exp or '')), m.end()
            return parse_int(integer), m.end()
        if nextchar == 'N' and string[idx:idx + 3] == 'NaN':
            return parse_constant('NaN'), idx + 3
        if nextchar == 'I' and string[idx:idx + 8] == 'Infinity':
            return parse_constant('Infinity'), idx + 8
        if nextchar == '-' and string[idx:idx + 9] == '-Infinity':
            return parse_constant('-Infinity'), idx + 9
        raise StopIteration

    def scan_once(string, idx):
        # memo caches string keys during a single scan; always clear it after.
        try:
            return _scan_once(string, idx)
        finally:
            memo.clear()

    return scan_once

# Prefer the C scanner when available, else the pure-Python fallback.
make_scanner = c_make_scanner or py_make_scanner
r"""Command-line tool to validate and pretty-print JSON
Usage::
$ echo '{"json":"obj"}' | python -m simplejson.tool
{
"json": "obj"
}
$ echo '{ 1.2:3.4}' | python -m simplejson.tool
Expecting property name: line 1 column 2 (char 2)
"""
import sys
import simplejson as json
def main():
    """Read JSON from argv[1] (or stdin), pretty-print it sorted to
    argv[2] (or stdout); exit with a message on parse failure."""
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'rb')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'rb')
        outfile = open(sys.argv[2], 'wb')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    try:
        # OrderedDict preserves key order; use_decimal avoids float rounding.
        obj = json.load(infile,
                        object_pairs_hook=json.OrderedDict,
                        use_decimal=True)
    except ValueError, e:
        raise SystemExit(e)
    json.dump(obj, outfile, sort_keys=True, indent='    ', use_decimal=True)
    outfile.write('\n')

if __name__ == '__main__':
    main()
import os
import sys
import logging
from shotgun_api3.lib.httplib2 import Http, ProxyInfo, socks
from shotgun_api3.lib.sgtimezone import SgTimezone
from shotgun_api3.lib.xmlrpclib import Error, ProtocolError, ResponseError
LOG = logging.getLogger("shotgun_api3")
LOG.setLevel(logging.WARN)
# json fallback chain: installed simplejson -> stdlib json -> bundled copy.
try:
    import simplejson as json
except ImportError:
    LOG.debug("simplejson not found, dropping back to json")
    try:
        import json as json
    except ImportError:
        LOG.debug("json not found, dropping back to embedded simplejson")
        # We need to munge the path so that the absolute imports in simplejson will work.
        dir_path = os.path.dirname(__file__)
        lib_path = os.path.join(dir_path, 'lib')
        sys.path.append(lib_path)
        import shotgun_api3.lib.simplejson as json
        sys.path.pop()
import sys
import os
import logging
from .lib.httplib2 import Http, ProxyInfo, socks
from .lib.sgtimezone import SgTimezone
from .lib.xmlrpclib import Error, ProtocolError, ResponseError
LOG = logging.getLogger("shotgun_api3")
LOG.setLevel(logging.WARN)
# json fallback chain: installed simplejson -> stdlib json -> bundled copy
# (same as the absolute-import variant, but package-relative).
try:
    import simplejson as json
except ImportError:
    LOG.debug("simplejson not found, dropping back to json")
    try:
        import json as json
    except ImportError:
        LOG.debug("json not found, dropping back to embedded simplejson")
        # We need to munge the path so that the absolute imports in simplejson will work.
        dir_path = os.path.dirname(__file__)
        lib_path = os.path.join(dir_path, 'lib')
        sys.path.append(lib_path)
        from .lib import simplejson as json
        sys.path.pop()
import os
import sys
from email.mime.text import MIMEText
from datetime import date
import boto.ses
import yaml
'''
ABVID REPORTING - email / etc.
'''
from frontend_env import *
'''
v1 = Video.objects.filter(edx_id = upload_info['edx_id'])
if v1.abvid_serial != None:
sys.path.append(up_twodirectory + '/VEDA_OS/VEDA_FE/')
import abvid_reporting
abvid_reporting.report_status(
status="Youtube Duplicate",
abvid_serial = v1.abvid_serial,
youtube_id = ''
)
'''
"""
get auth keys from instance yaml
"""
auth_yaml = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'frontend_auth.yaml'
)
with open(auth_yaml, 'r') as stream:
try:
auth_dict = yaml.load(stream)
except yaml.YAMLError as exc:
print 'AUTH ERROR'
def report_status(status, abvid_serial, youtube_id):
    """
    Record the outcome of an about-video upload and email the uploader.

    status : human-readable outcome ('Complete', '...Duplicate', '...Corrupt', ...)
    abvid_serial : serial of the VedaUpload row to update
    youtube_id : delivered YouTube id ('' when the upload failed)
    """
    v1 = VedaUpload.objects.filter(video_serial=abvid_serial).latest()
    # Defaults: statuses outside the branches below send no mail.
    # (Previously these names could be referenced before assignment,
    # raising UnboundLocalError for unrecognized statuses.)
    send_email = False
    excuse = ''
    final_success = ''
    if len(youtube_id) > 0:
        excuse = ''
    if 'Duplicate' in status or 'Corrupt' in status:
        if v1.final_report is True:
            # Already reported once; don't mail again.
            send_email = False
        else:
            excuse = 'There has been a failure for the following reason : ' + status
            final_success = 'FAILED'
            youtube_id = ''
            VedaUpload.objects.filter(
                pk=v1.pk
            ).update(
                file_complete=False,
                final_report=True,
                file_valid=False
            )
            send_email = True
    elif 'Complete' in status:
        excuse = 'This file is complete.'
        final_success = 'SUCCESS'
        VedaUpload.objects.filter(
            pk=v1.pk
        ).update(
            file_complete=True,
            final_report=True,
            file_valid=True,
            youtube_id=youtube_id
        )
        send_email = True
    if send_email is True:
        email_subject = 'VEDA / edX About Video Status Update : ' + final_success
        email_body = 'This is an auto generated message:\n\n'
        email_body += 'An edX partner uploaded a new about video:\n\n'
        email_body += 'STATUS : ' + excuse + '\n\n'
        if len(youtube_id) > 0:
            email_body += 'Youtube URL : https://www.youtube.com/watch?v=' + youtube_id + '\n\n'
        if v1.upload_filename is not None:
            email_body += 'Filename : ' + v1.upload_filename + '\n'
        email_body += 'Upload Date : ' + str(v1.upload_date) + '(UTC)\n'
        # NOTE(review): client_information / edx_studio_url of None would
        # break this concatenation — presumably enforced upstream; confirm.
        email_body += 'Course Title (optional) : ' + v1.client_information + '\n'
        email_body += 'edX Studio Course URL : ' + v1.edx_studio_url + '\n\n'
        email_body += 'Please do not reply to this email.\n\n <<EOM'
        conn = boto.ses.connect_to_region('us-east-1')
        conn.send_email(
            auth_dict['veda_noreply_email'],
            email_subject,
            email_body,
            [v1.status_email, auth_dict['admin_email']]
        )

if __name__ == '__main__':
    report_status(status='Complete', abvid_serial="5c34a85e5f", youtube_id='TEST')
'''
About Video Input and Validation
'''
import os
import sys
import datetime
from frontend_env import *
def create_record(upload_data):
    """
    Create the initial VedaUpload row for an about-video submission.
    Returns True on successful save, False on any database error.
    """
    ul1 = VedaUpload(
        video_serial=upload_data['abvid_serial'],
        edx_studio_url=upload_data['studio_url'],
        client_information=upload_data['course_name'],
        status_email=upload_data['pm_email'],
        file_valid=False,
        file_complete=False,
    )
    try:
        ul1.save()
        return True
    except Exception:
        # Was a bare `except:`, which also swallowed SystemExit/KeyboardInterrupt.
        return False
def validate_incoming(upload_data):
    """Stamp the upload row with receipt time and original filename."""
    # .update() returns a row count; the previous `ul2 =` binding was unused.
    VedaUpload.objects.filter(
        video_serial=upload_data['abvid_serial']
    ).update(
        upload_date=datetime.datetime.utcnow().replace(tzinfo=utc),
        upload_filename=upload_data['orig_filename'],
    )
    return True
def send_to_pipeline(upload_data):
    """Mark the upload valid/invalid; returns False on the failure path.

    NOTE(review): the success branch falls through and returns None (falsy),
    and `upload_data['success']` is compared as the string 'true' — both
    appear intentional for the current callers; confirm before changing.
    """
    ul3 = VedaUpload.objects.filter(
        video_serial=upload_data['abvid_serial']
    ).update(
        file_valid=upload_data['success'],
    )
    if upload_data['success'] == 'true':
        print 'Sending File to Pipeline'
    else:
        ul3 = VedaUpload.objects.filter(
            video_serial=upload_data['abvid_serial']
        ).update(
            comment='Failed Upload',
        )
        return False

if __name__ == '__main__':
    # Smoke test with a known serial.
    upload_data = {}
    upload_data['abvid_serial'] = '19e1e1c78e'
    upload_data['success'] = 'true'
    send_to_pipeline(upload_data)
from django.contrib import admin
# Register your models here.
---
# Instance Authentication
## Email Vars
veda_noreply_email:
admin_email:
## VEDA AWS Account
veda_access_key_id:
veda_secret_access_key:
veda_upload_bucket:
...
#!/usr/bin/env python
"""
VEDA Environment variables
"""
import os
import sys
import django
from django.utils.timezone import utc
# Make the project root importable when this module is run from a subdir.
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if project_path not in sys.path:
    sys.path.append(project_path)
# Settings must be configured before any model import below.
os.environ['DJANGO_SETTINGS_MODULE'] = 'VEDA.settings'
django.setup()
from VEDA_OS01.models import Institution
from VEDA_OS01.models import Course
from VEDA_OS01.models import Video
from VEDA_OS01.models import URL
from VEDA_OS01.models import VedaUpload
"""
TERM COLORS
"""
NODE_COLORS_BLUE = '\033[94m'
NODE_COLORS_GREEN = '\033[92m'
NODE_COLORS_RED = '\033[91m'
NODE_COLORS_END = '\033[0m'
# NOTE(review): fixed -4h display offset from UTC (no DST handling) —
# presumably US Eastern daylight time; confirm before relying on it.
VEDA_UTCOFFSET = -4
This diff is collapsed. Click to expand it.
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed. Click to expand it.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment