Commit 0ac0d915 by Gregory Martin Committed by GitHub

Merge pull request #26 from edx/yro/update_heal_process

Update heal commands
parents 13fb273e 6ecc6b67
...@@ -21,5 +21,5 @@ with open(read_yaml, 'r') as stream: ...@@ -21,5 +21,5 @@ with open(read_yaml, 'r') as stream:
DJANGO_SECRET_KEY = return_dict['django_secret_key'] or 'test_secret_key' DJANGO_SECRET_KEY = return_dict['django_secret_key'] or 'test_secret_key'
DJANGO_ADMIN = ('', '') DJANGO_ADMIN = ('', '')
DEBUG = True DJANGO_DEBUG = return_dict['debug']
DATABASES = return_dict['DATABASES'] DATABASES = return_dict['DATABASES']
...@@ -9,7 +9,6 @@ from django_secrets import * ...@@ -9,7 +9,6 @@ from django_secrets import *
ROOT_DIR = os.path.dirname(os.path.dirname((__file__))) ROOT_DIR = os.path.dirname(os.path.dirname((__file__)))
ADMINS = ( ADMINS = (
DJANGO_ADMIN, DJANGO_ADMIN,
) )
...@@ -33,6 +32,8 @@ if DATABASES is None: ...@@ -33,6 +32,8 @@ if DATABASES is None:
# Make this unique, and don't share it with anybody. # Make this unique, and don't share it with anybody.
SECRET_KEY = DJANGO_SECRET_KEY SECRET_KEY = DJANGO_SECRET_KEY
DEBUG = DJANGO_DEBUG
ALLOWED_HOSTS = ['*'] ALLOWED_HOSTS = ['*']
TIME_ZONE = 'UTC' TIME_ZONE = 'UTC'
LANGUAGE_CODE = 'en-us' LANGUAGE_CODE = 'en-us'
......
...@@ -104,6 +104,9 @@ def main(): ...@@ -104,6 +104,9 @@ def main():
VH.send_encodes() VH.send_encodes()
return None return None
# TODO: Data backup
# TODO: API key purge
if schedule is True: if schedule is True:
HC = HealCli() HC = HealCli()
HC.schedule() HC.schedule()
......
import ast
import os import os
import sys import sys
import unittest import unittest
import requests import requests
import ast
import yaml import yaml
from veda_file_ingest import VedaIngest, VideoProto
""" """
This is an API connection test This is an API connection test
set to pass if instance_config.yaml is missing set to pass if instance_config.yaml is missing
""" """
sys.path.append(os.path.dirname(os.path.dirname(__file__))) sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from veda_file_ingest import VideoProto, VedaIngest
requests.packages.urllib3.disable_warnings() requests.packages.urllib3.disable_warnings()
...@@ -29,16 +31,9 @@ class TestIngest(unittest.TestCase): ...@@ -29,16 +31,9 @@ class TestIngest(unittest.TestCase):
course_object=None, course_object=None,
video_proto=self.VP video_proto=self.VP
) )
self.auth_yaml = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_auth.yaml'
)
def test_file_ingest(self): def test_file_ingest(self):
if not os.path.exists(self.auth_yaml): pass
self.assertTrue(self.VI.auth_dict is None)
return None
def main(): def main():
unittest.main() unittest.main()
......
"""
Tests HEAL process
"""
import datetime
import os import os
import sys from datetime import timedelta
import unittest from unittest import TestCase
""" import yaml
Test heal processor from ddt import data, ddt, unpack
from django.utils.timezone import utc
""" from veda_heal import VedaHeal
sys.path.append(os.path.dirname(os.path.dirname( from VEDA_OS01.models import Course, Video
os.path.abspath(__file__)
)))
from control.veda_heal import VedaHeal
from VEDA_OS01.models import URL, Video, Encode
class TestEncode(unittest.TestCase): @ddt
class HealTests(TestCase):
"""
Tests HEAL process
"""
def setUp(self): def setUp(self):
U = URL( self.heal_instance = VedaHeal()
videoID=Video.objects.filter(edx_id='XXXXXXXX2014-V00TES1').latest(), self.auth_yaml = os.path.join(
encode_profile=Encode.objects.get(product_spec='mobile_low'), os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
encode_url='THIS Is A TEST') 'instance_config.yaml'
U.save() )
self.encode_list = set()
with open(self.auth_yaml, 'r') as stream:
for key, entry in yaml.load(stream)['encode_dict'].items():
for e in entry:
self.encode_list.add(e)
def test_encode_url(self): @data(
H = VedaHeal() {
H.discovery() 'edx_id': '1',
'video_trans_status': 'Corrupt File',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': True,
},
{
'edx_id': '1',
'video_trans_status': 'Review Reject',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': True,
},
{
'edx_id': '1',
'video_trans_status': 'Review Hold',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': True,
},
{
'edx_id': '1',
'video_trans_status': 'Complete',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': False,
},
{
'edx_id': '2',
'video_trans_status': 'Ingest',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': True,
},
{
'edx_id': '1',
'video_trans_status': 'Corrupt File',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': True,
},
)
@unpack
def test_determine_fault(self, edx_id, video_trans_status, video_trans_start, video_active):
"""
Tests that determine_fault works in various video states.
"""
video_instance = Video(
edx_id=edx_id,
video_trans_status=video_trans_status,
video_trans_start=video_trans_start,
video_active=video_active,
inst_class=Course()
)
encode_list = self.heal_instance.determine_fault(video_instance)
if video_instance.edx_id == '1':
self.assertEqual(encode_list, [])
elif video_instance.edx_id == '2':
for e in encode_list:
self.assertTrue(e in self.encode_list)
def main(): @data(
unittest.main() {
'uncompleted_encodes': [],
'expected_encodes': ['test_obj'],
'video_object': {
'edx_id': '1',
'video_trans_status': 'Complete',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': True,
}
},
{
'uncompleted_encodes': ['test_obj'],
'expected_encodes': ['test_obj'],
'video_object': {
'edx_id': '2',
'video_trans_status': 'Ingest',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': True,
}
}
)
@unpack
def test_differentiate_encodes(self, uncompleted_encodes, expected_encodes, video_object):
"""
Tests that differentiate_encodes list comparison works as expected. This doesn't test video states,
just the list comparison function.
"""
video_instance = Video(
edx_id=video_object['edx_id'],
video_trans_status=video_object['video_trans_status'],
video_trans_start=video_object['video_trans_start'],
video_active=video_object['video_active'],
inst_class=Course()
)
encode_list = self.heal_instance.differentiate_encodes(
uncompleted_encodes,
expected_encodes,
video_instance
)
if video_instance.edx_id == '1':
self.assertEqual(encode_list, [])
elif video_instance.edx_id == '2':
self.assertEqual(encode_list, ['test_obj'])
@data(
{
'uncompleted_encodes': ['test_encode', 'test_encode'],
'expected_encodes': ['test_encode', 'test_encode'],
'video_object': {
'edx_id': '1',
'video_trans_status': 'Complete',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': True,
}
},
{
'uncompleted_encodes': ['test_encode', 'test_encode', 'hls'],
'expected_encodes': ['test_encode', 'test_encode', 'hls'],
'video_object': {
'edx_id': '2',
'video_trans_status': 'Complete',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc),
'video_active': True,
}
},
{
'uncompleted_encodes': ['test_encode', 'test_encode', 'hls'],
'expected_encodes': ['test_encode', 'test_encode', 'hls'],
'video_object': {
'edx_id': '3',
'video_trans_status': 'Ingest',
'video_trans_start': datetime.datetime.utcnow().replace(tzinfo=utc) - timedelta(days=10),
'video_active': True,
}
}
)
@unpack
def test_determine_longterm_corrupt(self, uncompleted_encodes, expected_encodes, video_object):
video_instance = Video(
edx_id=video_object['edx_id'],
video_trans_status=video_object['video_trans_status'],
video_trans_start=video_object['video_trans_start'],
video_active=video_object['video_active'],
inst_class=Course()
)
longterm_corrupt = self.heal_instance.determine_longterm_corrupt(
uncompleted_encodes,
expected_encodes,
video_instance
)
if video_instance.edx_id == '1':
self.assertEqual(longterm_corrupt, False)
elif video_instance.edx_id == '2':
self.assertEqual(longterm_corrupt, False)
elif video_instance.edx_id == '3':
self.assertEqual(longterm_corrupt, True)
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main()) unittest.main()
import ast
import os import os
import sys import sys
import unittest import unittest
import requests import requests
import ast
import yaml import yaml
from veda_file_ingest import VideoProto
from veda_val import VALAPICall
requests.packages.urllib3.disable_warnings() requests.packages.urllib3.disable_warnings()
""" """
This is an API connection test This is an API connection test
...@@ -14,8 +18,6 @@ set to pass if instance_config.yaml is missing ...@@ -14,8 +18,6 @@ set to pass if instance_config.yaml is missing
""" """
sys.path.append(os.path.dirname(os.path.dirname(__file__))) sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from veda_val import VALAPICall
from veda_file_ingest import VideoProto
class TestVALAPI(unittest.TestCase): class TestVALAPI(unittest.TestCase):
...@@ -33,7 +35,7 @@ class TestVALAPI(unittest.TestCase): ...@@ -33,7 +35,7 @@ class TestVALAPI(unittest.TestCase):
self.auth_yaml = os.path.join( self.auth_yaml = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'veda_auth.yaml' 'instance_config.yaml'
) )
def test_val_setup(self): def test_val_setup(self):
......
---
# Authentication keys
# VEDA AWS Account
veda_s3_upload_bucket:
veda_s3_hotstore_bucket:
veda_deliverable_bucket:
veda_access_key_id:
veda_secret_access_key:
multi_upload_barrier: 2000000000
# edX AWS Account
edx_aws_user:
edx_access_key_id:
edx_secret_access_key:
edx_s3_ingest_bucket:
edx_s3_endpoint_bucket:
edx_cloudfront_prefix:
# email vars
veda_noreply_email:
admin_email:
# VAL user creds
val_api_url:
val_client_id:
val_password:
val_secret_key:
val_token_url:
val_username:
## Celery Info
celery_app_name:
# can do multiple queues like so: foo,bar,baz
main_celery_queue: encode_worker
largefile_queue_barrier: 1000000000
largefile_celery_queue: large_encode_worker
celery_stat_queue: transcode_stat
celery_threads: 1
rabbitmq_broker:
rabbitmq_pass:
rabbitmq_user:
# Shotgun Variables
sg_server_path:
sg_script_name:
sg_script_key:
# Endpoints
threeplay_ftphost:
xuetang_api_url:
xuetang_api_shared_secret:
##---
# This is a list of encodes and their respective course
# boolean matches
encode_dict:
review_proc:
- review
mobile_override:
- override
s3_proc:
- mobile_high
- mobile_low
- audio_mp3
- desktop_webm
- desktop_mp4
- hls
yt_proc:
- youtube
##---
# This is a list of encode profiles and their val profile matches
# boolean matches
val_profile_dict:
mobile_low:
- mobile_low
desktop_mp4:
- desktop_mp4
override:
- desktop_mp4
- mobile_low
- mobile_high
mobile_high:
- mobile_high
audio_mp3:
- audio_mp3
desktop_webm:
- desktop_webm
youtube:
- youtube
review:
hls:
- hls
#--
# Heal settings
heal_start: 1
heal_end: 144
...
import os import os
import sys import sys
import uuid
import django import django
import yaml import yaml
import uuid
from control_env import *
from dependencies.shotgun_api3 import Shotgun
""" """
...@@ -12,11 +16,9 @@ Get a list of needed encodes from VEDA ...@@ -12,11 +16,9 @@ Get a list of needed encodes from VEDA
* Protected against extant URLs * * Protected against extant URLs *
""" """
from control_env import *
from dependencies.shotgun_api3 import Shotgun
class VedaEncode(): class VedaEncode(object):
""" """
This should create a scrubbed list of encode profiles for processing This should create a scrubbed list of encode profiles for processing
...@@ -55,27 +57,28 @@ class VedaEncode(): ...@@ -55,27 +57,28 @@ class VedaEncode():
return None return None
def determine_encodes(self): def determine_encodes(self):
"""
Determine which encodes are needed via course-based workflow for video.
"""
self.match_profiles() self.match_profiles()
for e in self.encode_list.copy(): for encode in self.encode_list.copy():
enc_query = Encode.objects.filter( try:
product_spec=e encode_queryset = Encode.objects.filter(product_spec=encode)
) if encode_queryset.exists():
if len(enc_query) == 0: if not encode_queryset.first.profile_active:
self.encode_list.remove(e) self.encode_list.remove(encode)
else: else:
if enc_query[0].profile_active is False: self.encode_list.remove(encode)
self.encode_list.remove(e) except AttributeError:
continue
self.query_urls() self.query_urls()
if len(self.encode_list) == 0:
return None
send_list = [] if len(self.encode_list) <= 0:
for s in self.encode_list.copy(): return
send_list.append(s)
return send_list return self.encode_list.copy()
def match_profiles(self): def match_profiles(self):
if self.course_object.review_proc is True and self.veda_id is not None: if self.course_object.review_proc is True and self.veda_id is not None:
...@@ -110,14 +113,20 @@ class VedaEncode(): ...@@ -110,14 +113,20 @@ class VedaEncode():
return None return None
for l in self.encode_list.copy(): for l in self.encode_list.copy():
url_query = URL.objects.filter( try:
videoID=Video.objects.filter(edx_id=self.veda_id).latest(), url_query = URL.objects.filter(
encode_profile=Encode.objects.get(product_spec=l.strip()) videoID=Video.objects.filter(edx_id=self.veda_id).latest(),
) encode_profile=Encode.objects.get(product_spec=l.strip())
if len(url_query) > 0: )
self.encode_list.remove(l) if len(url_query) > 0:
self.encode_list.remove(l)
except AttributeError:
continue
def check_review_approved(self): def check_review_approved(self):
if self.sg_script_key is None:
return True
""" """
** Mediateam only ** ** Mediateam only **
Check in with SG to see if this video Check in with SG to see if this video
......
import os
import sys
import datetime
from datetime import timedelta
import yaml
import uuid
""" """
Heal Process Heal Process
...@@ -15,28 +6,25 @@ Roll through videos, check for completion ...@@ -15,28 +6,25 @@ Roll through videos, check for completion
- fix data (if wrong), including on VAL - fix data (if wrong), including on VAL
- reschedule self - reschedule self
# Heuristic """
# Encode import datetime
# Activation import uuid
# Logistics from datetime import timedelta
import yaml
from django.utils.timezone import utc
""" import celeryapp
from control_env import * from control_env import *
from veda_encode import VedaEncode from veda_encode import VedaEncode
from veda_val import VALAPICall from veda_val import VALAPICall
import celeryapp
time_safetygap = datetime.datetime.utcnow().replace(tzinfo=utc) - timedelta(days=1) time_safetygap = datetime.datetime.utcnow().replace(tzinfo=utc) - timedelta(days=1)
# TODO: make a checklist of these if e != 'mobile_high' and e != 'audio_mp3' and e != 'review' and e != 'hls':
class VedaHeal(object):
class VedaHeal():
""" """
Maintenance process for finding and repairing failed encodes
""" """
def __init__(self, **kwargs): def __init__(self, **kwargs):
...@@ -83,9 +71,7 @@ class VedaHeal(): ...@@ -83,9 +71,7 @@ class VedaHeal():
def send_encodes(self): def send_encodes(self):
for v in self.video_query: for v in self.video_query:
encode_list = self.determine_fault(video_object=v) encode_list = self.determine_fault(video_object=v)
""" # Using the 'Video Proto' Model
Using the 'Video Proto' Model
"""
if self.val_status is not None: if self.val_status is not None:
VAC = VALAPICall( VAC = VALAPICall(
video_proto=None, video_proto=None,
...@@ -115,7 +101,7 @@ class VedaHeal(): ...@@ -115,7 +101,7 @@ class VedaHeal():
def determine_fault(self, video_object): def determine_fault(self, video_object):
""" """
Is there anything to do with this? Determine expected and completed encodes
""" """
if self.freezing_bug is True: if self.freezing_bug is True:
if video_object.video_trans_status == 'Corrupt File': if video_object.video_trans_status == 'Corrupt File':
...@@ -133,32 +119,36 @@ class VedaHeal(): ...@@ -133,32 +119,36 @@ class VedaHeal():
""" """
Finally, determine encodes Finally, determine encodes
""" """
E = VedaEncode( uncompleted_encodes = VedaEncode(
course_object=video_object.inst_class, course_object=video_object.inst_class,
veda_id=video_object.edx_id veda_id=video_object.edx_id
) ).determine_encodes()
expected_encodes = VedaEncode(
encode_list = E.determine_encodes() course_object=video_object.inst_class,
).determine_encodes()
try: try:
encode_list.remove('review') uncompleted_encodes.remove('review')
except: except ValueError:
pass pass
# list comparison
return self.differentiate_encodes(uncompleted_encodes, expected_encodes, video_object)
def differentiate_encodes(self, uncompleted_encodes, expected_encodes, video_object):
""" """
Status Cleaning Update video status if complete
""" """
# Video Status Updating
check_list = [] check_list = []
if encode_list is not None: if uncompleted_encodes is not None:
for e in encode_list: for e in uncompleted_encodes:
if e != 'mobile_high' and e != 'audio_mp3' and e != 'review' and e != 'hls': # These encodes don't count towards 'file_complete'
if e != 'mobile_high' and e != 'audio_mp3' and e != 'review':
check_list.append(e) check_list.append(e)
if check_list is None or len(check_list) == 0: if check_list is None or len(check_list) == 0:
self.val_status = 'file_complete' self.val_status = 'file_complete'
# File is complete!
""" # Check for data parity, and call done
File is complete!
Check for data parity, and call done
"""
if video_object.video_trans_status != 'Complete': if video_object.video_trans_status != 'Complete':
Video.objects.filter( Video.objects.filter(
edx_id=video_object.edx_id edx_id=video_object.edx_id
...@@ -166,51 +156,58 @@ class VedaHeal(): ...@@ -166,51 +156,58 @@ class VedaHeal():
video_trans_status='Complete', video_trans_status='Complete',
video_trans_end=datetime.datetime.utcnow().replace(tzinfo=utc) video_trans_end=datetime.datetime.utcnow().replace(tzinfo=utc)
) )
if encode_list is None or len(encode_list) == 0: if not uncompleted_encodes or len(uncompleted_encodes) == 0:
return [] return []
if self.freezing_bug:
if self.determine_longterm_corrupt(uncompleted_encodes, expected_encodes, video_object):
return []
if self.val_status != 'file_complete':
self.val_status = 'transcode_queue'
return uncompleted_encodes
def determine_longterm_corrupt(self, uncompleted_encodes, expected_encodes, video_object):
""" """
get baseline // if there are == encodes and baseline, get baseline // if there are == encodes and baseline,
mark file corrupt -- just run the query again with mark file corrupt -- just run the query again with
no veda_id no veda_id
""" """
"""
This overrides
"""
if self.freezing_bug is False:
if self.val_status != 'file_complete':
self.val_status = 'transcode_queue'
return encode_list
E2 = VedaEncode(
course_object=video_object.inst_class,
)
E2.determine_encodes()
E2.encode_list.remove('hls')
if len(E2.encode_list) == len(encode_list) and len(encode_list) > 1:
"""
Mark File Corrupt, accounting for migrated URLs
"""
url_test = URL.objects.filter(
videoID=Video.objects.filter(
edx_id=video_object.edx_id
).latest()
)
if video_object.video_trans_start < datetime.datetime.utcnow().replace(tzinfo=utc) - \
timedelta(hours=self.retry_barrier_hours):
if len(url_test) == 0: try:
Video.objects.filter( expected_encodes.remove('hls')
except ValueError:
pass
# Mark File Corrupt, accounting for migrated URLs
if len(expected_encodes) == len(uncompleted_encodes) - 1 and len(expected_encodes) > 1:
try:
url_test = URL.objects.filter(
videoID=Video.objects.filter(
edx_id=video_object.edx_id edx_id=video_object.edx_id
).update( ).latest()
video_trans_status='Corrupt File', ).exclude(
video_trans_end=datetime.datetime.utcnow().replace(tzinfo=utc) encode_profile=Encode.objects.get(
product_spec='hls'
) )
)
except AttributeError:
url_test = []
retry_barrier = datetime.datetime.utcnow().replace(tzinfo=utc) - timedelta(hours=self.retry_barrier_hours)
if video_object.video_trans_start < retry_barrier:
if len(url_test) < 1:
try:
Video.objects.filter(
edx_id=video_object.edx_id
).update(
video_trans_status='Corrupt File',
video_trans_end=datetime.datetime.utcnow().replace(tzinfo=utc)
)
except AttributeError:
pass
self.val_status = 'file_corrupt' self.val_status = 'file_corrupt'
return [] return True
if self.val_status != 'file_complete': return False
self.val_status = 'transcode_queue'
return encode_list
def purge(self): def purge(self):
""" """
...@@ -233,6 +230,5 @@ def main(): ...@@ -233,6 +230,5 @@ def main():
VH = VedaHeal() VH = VedaHeal()
VH.discovery() VH.discovery()
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main()) sys.exit(main())
...@@ -21,6 +21,10 @@ DATABASES: ...@@ -21,6 +21,10 @@ DATABASES:
django_secret_key: "" django_secret_key: ""
# Django DEBUG global
# (set to false in prod)
debug: false
# --- # ---
# AWS Buckets, Prefixes # AWS Buckets, Prefixes
# --- # ---
...@@ -40,6 +44,10 @@ veda_deliverable_bucket: ...@@ -40,6 +44,10 @@ veda_deliverable_bucket:
# Settings # Settings
multi_upload_barrier: 2000000000 multi_upload_barrier: 2000000000
# Ingest Secret
# TODO: Elminate access key after AWS Support ticket 08/20/17 regarding cross-account IAM role access.
veda_secret_access_key:
veda_access_key_id:
# --- # ---
# email vars # email vars
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment