Commit 6dafcbf3 by Gregory Martin

Add logging, Attempt Celery Bugfix

parent 47c2d63d
......@@ -107,7 +107,7 @@ def main():
HC = HealCli()
HC.schedule()
return None
return
if veda_id is not None:
VH = VedaHeal(
......@@ -116,7 +116,7 @@ def main():
)
)
VH.send_encodes()
return None
return
if course_id is not None:
VH = VedaHeal(
......@@ -128,11 +128,10 @@ def main():
)
)
VH.send_encodes()
return None
return
# TODO: Data backup
# TODO: API key purge
if schedule is True:
HC = HealCli()
HC.schedule()
......
......@@ -22,26 +22,28 @@ CEL_BROKER = 'amqp://{rabbitmq_user}:{rabbitmq_pass}@{rabbitmq_broker}:5672//'.f
rabbitmq_broker=auth_dict['rabbitmq_broker']
)
CEL_BACKEND = 'amqp://{rabbitmq_user}:{rabbitmq_pass}@{rabbitmq_broker}:5672//'.format(
rabbitmq_user=auth_dict['rabbitmq_user'],
rabbitmq_pass=auth_dict['rabbitmq_pass'],
rabbitmq_broker=auth_dict['rabbitmq_broker']
)
app = Celery(auth_dict['celery_app_name'], broker=CEL_BROKER, backend=CEL_BACKEND, include=[])
app = Celery(auth_dict['celery_app_name'], broker=CEL_BROKER, include=['celeryapp'])
app.conf.update(
BROKER_CONNECTION_TIMEOUT=60,
CELERY_IGNORE_RESULT=True,
CELERY_TASK_RESULT_EXPIRES=10,
CELERYD_PREFETCH_MULTIPLIER=1,
CELERY_ACCEPT_CONTENT=['pickle', 'json', 'msgpack', 'yaml']
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_PUBLISH_RETRY=True,
CELERY_TASK_PUBLISH_RETRY_POLICY={
"max_retries": 3,
"interval_start": 0,
"interval_step": 1,
"interval_max": 5
}
)
@app.task(name='worker_encode')
def worker_task_fire(veda_id, encode_profile, jobid):
pass
print '[ENCODE] Misfire : {id} : {encode}'.format(id=veda_id, encode=encode_profile)
return 1
@app.task(name='supervisor_deliver')
......
......@@ -61,3 +61,9 @@ NODE_COLORS_BLUE = '\033[94m'
NODE_COLORS_GREEN = '\033[92m'
NODE_COLORS_RED = '\033[91m'
NODE_COLORS_END = '\033[0m'
"""
Heal process start and end query times for 'video_trans_start' in hours
"""
HEAL_START = 6
HEAL_END = 144
......@@ -12,6 +12,7 @@ import responses
from django.utils.timezone import utc
from mock import PropertyMock, patch
from control_env import HEAL_START
from control.veda_heal import VedaHeal
from VEDA_OS01.models import URL, Course, Destination, Encode, Video, TranscriptStatus
from VEDA_OS01.utils import ValTranscriptStatus
......@@ -50,7 +51,7 @@ class HealTests(TestCase):
studio_id=self.video_id,
edx_id='XXXXXXXX2014-V00TES1',
video_trans_start=datetime.datetime.utcnow().replace(tzinfo=utc) - timedelta(
hours=CONFIG_DATA['heal_start']
hours=HEAL_START
),
video_trans_end=datetime.datetime.utcnow().replace(tzinfo=utc),
)
......
......@@ -143,7 +143,8 @@ class FileDiscovery(object):
s3_filename=s3_filename,
client_title=client_title,
file_extension='',
platform_course_url=course_id
platform_course_url=course_id,
video_orig_duration=0.0
)
# Update val status to 'invalid_token'
VALAPICall(video_proto=video_proto, val_status=u'invalid_token').call()
......@@ -196,7 +197,7 @@ class FileDiscovery(object):
return course
def download_video_to_working_directory(self, key, file_name, file_extension):
def download_video_to_working_directory(self, key, file_name):
"""
Downloads the video to working directory from S3 and
returns whether its successfully downloaded or not.
......@@ -204,17 +205,13 @@ class FileDiscovery(object):
Arguments:
key: An S3 key whose content is going to be downloaded
file_name: Name of the file when its in working directory
file_extension: extension of this file.
"""
if len(file_extension) == 3:
file_name = u'{file_name}.{ext}'.format(file_name=file_name, ext=file_extension)
file_ingested = False
try:
key.get_contents_to_filename(os.path.join(self.node_work_directory, file_name))
file_ingested = True
except S3DataError:
file_ingested = False
LOGGER.exception('[File Ingest] Error downloading the file into node working directory.')
return file_ingested
def parse_transcript_preferences(self, course_id, transcript_preferences):
......@@ -287,7 +284,7 @@ class FileDiscovery(object):
if course:
# Download video file from S3 into node working directory.
file_extension = os.path.splitext(client_title)[1][1:]
file_downloaded = self.download_video_to_working_directory(video_s3_key, filename, file_extension)
file_downloaded = self.download_video_to_working_directory(video_s3_key, filename)
if not file_downloaded:
# S3 Bucket ingest failed, move the file rejected directory.
self.move_video(video_s3_key, destination_dir=self.auth_dict['edx_s3_rejected_prefix'])
......
......@@ -109,8 +109,9 @@ class VedaIngest(object):
# TODO: Break heal method listed here out into helper util
encode_instance = VedaHeal(
video_query=Video.objects.filter(
edx_id=self.video_proto.veda_id
)
edx_id=self.video_proto.veda_id.strip()
),
val_status='transcode_queue'
)
encode_instance.send_encodes()
......@@ -156,29 +157,27 @@ class VedaIngest(object):
def database_record(self):
"""
Start DB Inserts, Get Information
Start DB Inserts, Get Basic File name information
"""
if self.video_proto.s3_filename is not None:
if self.video_proto.s3_filename:
self.full_filename = '/'.join((
self.node_work_directory,
self.video_proto.s3_filename
))
if self.video_proto.abvid_serial is not None:
if self.video_proto.abvid_serial:
self.full_filename = '/'.join((
self.node_work_directory,
self.video_proto.client_title
))
if len(self.video_proto.file_extension) > 2:
self.full_filename += "." + self.video_proto.file_extension
if self.full_filename is None:
if not self.full_filename:
self.full_filename = '/'.join((
self.node_work_directory,
self.video_proto.client_title
))
if len(self.video_proto.file_extension) == 3:
self.full_filename += "." + self.video_proto.file_extension
if not os.path.exists(self.full_filename):
LOGGER.exception('[VIDEO_INGEST] File Not Found %s', self.video_proto.veda_id)
return
......@@ -192,9 +191,17 @@ class VedaIngest(object):
if self.video_proto.valid is True:
self._gather_metadata()
"""
DB Inserts
"""
# DB Inserts
if self.video_proto.s3_filename:
video = Video.objects.filter(studio_id=self.video_proto.s3_filename).first()
if video:
# Protect against crash/duplicate inserts, won't insert object
self.video_proto.veda_id = video[0].edx_id
self.video_proto.video_orig_duration = video[0].video_orig_duration
self.complete = True
return
v1 = Video(inst_class=self.course_object)
"""
Generate veda_id / update course record
......@@ -301,7 +308,7 @@ class VedaIngest(object):
raise
def val_insert(self):
if self.video_proto.abvid_serial is not None:
if self.video_proto.abvid_serial:
return None
if self.video_proto.valid is False:
......@@ -331,38 +338,24 @@ class VedaIngest(object):
def rename(self):
"""
Rename to VEDA ID,
Backup in Hotstore
"""
if self.video_proto.veda_id is None:
self.video_proto.valid = False
return None
return
if self.video_proto.file_extension is None:
os.rename(
self.full_filename, os.path.join(
self.node_work_directory,
self.video_proto.veda_id
)
)
self.full_filename = os.path.join(
veda_filename = self.video_proto.veda_id
if self.video_proto.file_extension:
veda_filename += '.{ext}'.format(ext=self.video_proto.file_extension)
os.rename(
self.full_filename, os.path.join(
self.node_work_directory,
self.video_proto.veda_id
)
else:
os.rename(
self.full_filename,
os.path.join(
self.node_work_directory,
self.video_proto.veda_id + '.' + self.video_proto.file_extension
)
veda_filename
)
self.full_filename = os.path.join(
self.node_work_directory,
self.video_proto.veda_id + '.' + self.video_proto.file_extension
)
)
self.full_filename = os.path.join(self.node_work_directory, veda_filename)
os.system('chmod ugo+rwx ' + self.full_filename)
return
def store(self):
"""
......
......@@ -20,7 +20,7 @@ from VEDA_OS01.models import Encode, URL, Video
from VEDA_OS01.utils import VAL_TRANSCRIPT_STATUS_MAP
import celeryapp
from control_env import WORK_DIRECTORY
from control_env import WORK_DIRECTORY, HEAL_START, HEAL_END
from veda_encode import VedaEncode
from veda_val import VALAPICall
from VEDA.utils import get_config
......@@ -49,10 +49,10 @@ class VedaHeal(object):
def discovery(self):
self.video_query = Video.objects.filter(
video_trans_start__lt=self.current_time - timedelta(
hours=self.auth_dict['heal_start']
hours=HEAL_START
),
video_trans_start__gt=self.current_time - timedelta(
hours=self.auth_dict['heal_end']
hours=HEAL_END
)
)
......@@ -62,36 +62,49 @@ class VedaHeal(object):
for v in self.video_query:
encode_list = self.determine_fault(video_object=v)
# Using the 'Video Proto' Model
if self.val_status is not None:
# Update to VAL is also happening for those videos which are already marked complete,
# All these retries are for the data-parity between VAL and VEDA, as calls to VAL api are
# unreliable and times out. For a completed Video, VEDA heal will keep doing this unless
# the Video is old enough and escapes from the time-span that HEAL is picking up on.
# cc Greg Martin
VAC = VALAPICall(
video_proto=None,
video_object=v,
val_status=self.val_status,
)
VAC.call()
self.val_status = None
# Update to VAL is also happening for those videos which are already marked complete,
# All these retries are for the data-parity between VAL and VEDA, as calls to VAL api are
# unreliable and times out. For a completed Video, VEDA heal will keep doing this unless
# the Video is old enough and escapes from the time-span that HEAL is picking up on.
# cc Greg Martin
if len(encode_list) > 0:
self.val_status = 'transcode_queue'
api_call = VALAPICall(
video_proto=None,
video_object=v,
val_status=self.val_status,
)
api_call.call()
# Enqueue
if self.auth_dict['rabbitmq_broker'] is not None:
for e in encode_list:
veda_id = v.edx_id
encode_profile = e
jobid = uuid.uuid1().hex[0:10]
celeryapp.worker_task_fire.apply_async(
(veda_id, encode_profile, jobid),
queue=self.auth_dict['celery_worker_queue']
)
if not self.auth_dict['rabbitmq_broker']:
return
for encode in encode_list:
veda_id = v.edx_id
encode_profile = encode
job_id = uuid.uuid1().hex[0:10]
task_result = celeryapp.worker_task_fire.apply_async(
(veda_id, encode_profile, job_id),
queue=self.auth_dict['celery_worker_queue'].strip(),
connect_timeout=3
)
# Misqueued Task
if task_result == 1:
LOGGER.error('[ENQUEUE ERROR] : {id}'.format(id=v.edx_id))
continue
# Update Status
LOGGER.info('[ENQUEUE] : {id}'.format(id=v.edx_id))
Video.objects.filter(edx_id=v.edx_id).update(
video_trans_status='Queue'
)
def determine_fault(self, video_object):
"""
Determine expected and completed encodes
"""
LOGGER.info('[HEAL] : {id}'.format(id=video_object.edx_id))
LOGGER.info('[ENQUEUE] : {id}'.format(id=video_object.edx_id))
if self.freezing_bug is True:
if video_object.video_trans_status == 'Corrupt File':
self.val_status = 'file_corrupt'
......@@ -122,7 +135,7 @@ class VedaHeal(object):
pass
requeued_encodes = self.differentiate_encodes(uncompleted_encodes, expected_encodes, video_object)
LOGGER.info('[HEAL] : {id} : {status} : {encodes}'.format(
LOGGER.info('[ENQUEUE] : {id} : {status} : {encodes}'.format(
id=video_object.edx_id,
status=self.val_status,
encodes=requeued_encodes
......@@ -223,7 +236,6 @@ class VedaHeal(object):
def purge(self):
"""
Purge Work Directory
"""
for file in os.listdir(WORK_DIRECTORY):
full_filepath = os.path.join(WORK_DIRECTORY, file)
......
......@@ -174,7 +174,10 @@ class VALAPICall():
self.video_object.video_orig_duration = 0
self.video_object.duration = 0.0
if not isinstance(self.video_proto.duration, float):
except AttributeError:
pass
if not isinstance(self.video_proto.duration, float) and self.val_status != 'invalid_token':
self.video_proto.duration = Output._seconds_from_string(
duration=self.video_object.video_orig_duration
)
......@@ -369,15 +372,13 @@ class VALAPICall():
headers=self.headers,
timeout=self.auth_dict['global_timeout']
)
LOGGER.info('[VAL] : {id} : {status} : {code}'.format(
id=self.video_proto.val_id,
status=self.val_status,
code=r4.status_code)
)
if r4.status_code > 299:
ErrorObject.print_error(
message='%s\n %s\n %s\n' % (
'R4 : VAL POST/PUT Fail: VAL',
'Check VAL Config',
r4.status_code
)
)
LOGGER.error('[VAL] : POST/PUT Fail : Check Config : {status}'.format(status=r4.status_code))
def update_val_transcript(self, video_id, lang_code, name, transcript_format, provider):
"""
......
......@@ -13,7 +13,7 @@ pysftp
boto
pyyaml
requests==2.18.1
celery==3.1.18
celery==4.1.0
pysrt==1.1.1
MySQL-python==1.2.5
gunicorn==0.17.4
......
......@@ -62,8 +62,4 @@ val_profile_dict:
hls:
- hls
# Heal settings
heal_start: 1
heal_end: 144
global_timeout: 60
......@@ -81,10 +81,6 @@ val_profile_dict:
hls:
- hls
# Heal settings
heal_start: 2
heal_end: 50
global_timeout: 40
instance_prefix: '127.0.0.1'
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment