Commit bde0453c by Gregory Martin

Update Celery Queuenames, Bugfixes

parent bddc6252
...@@ -66,7 +66,7 @@ class DeliverCli: ...@@ -66,7 +66,7 @@ class DeliverCli:
'worker', 'worker',
'--loglevel=info', '--loglevel=info',
'--concurrency=' + str(auth_dict['celery_threads']), '--concurrency=' + str(auth_dict['celery_threads']),
'-Q ' + auth_dict['celery_stat_queue'], '-Q ' + auth_dict['celery_deliver_queue'],
'-n deliver.%h' '-n deliver.%h'
)) ))
) )
......
...@@ -163,22 +163,14 @@ class VedaIngest: ...@@ -163,22 +163,14 @@ class VedaIngest:
if len(self.encode_list) == 0: if len(self.encode_list) == 0:
return None return None
""" # Enqueue
send job to queue
"""
if self.video_proto.filesize > self.auth_dict['largefile_queue_barrier']:
cel_queue = self.auth_dict['largefile_celery_queue']
else:
cel_queue = self.auth_dict['main_celery_queue']
for e in self.encode_list: for e in self.encode_list:
# print e
veda_id = self.video_proto.veda_id veda_id = self.video_proto.veda_id
encode_profile = e encode_profile = e
jobid = uuid.uuid1().hex[0:10] jobid = uuid.uuid1().hex[0:10]
celeryapp.worker_task_fire.apply_async( celeryapp.worker_task_fire.apply_async(
(veda_id, encode_profile, jobid), (veda_id, encode_profile, jobid),
queue=cel_queue queue=self.auth_dict['celery_worker_queue']
) )
""" """
......
...@@ -81,23 +81,15 @@ class VedaHeal(object): ...@@ -81,23 +81,15 @@ class VedaHeal(object):
VAC.call() VAC.call()
self.val_status = None self.val_status = None
if len(encode_list) > 0: # Enqueue
""" for e in encode_list:
send job to queue veda_id = v.edx_id
""" encode_profile = e
if v.video_orig_filesize > self.auth_dict['largefile_queue_barrier']: jobid = uuid.uuid1().hex[0:10]
cel_queue = self.auth_dict['largefile_celery_queue'] celeryapp.worker_task_fire.apply_async(
else: (veda_id, encode_profile, jobid),
cel_queue = self.auth_dict['main_celery_queue'] queue=self.auth_dict['celery_worker_queue']
)
for e in encode_list:
veda_id = v.edx_id
encode_profile = e
jobid = uuid.uuid1().hex[0:10]
celeryapp.worker_task_fire.apply_async(
(veda_id, encode_profile, jobid),
queue=cel_queue
)
def determine_fault(self, video_object): def determine_fault(self, video_object):
""" """
......
...@@ -39,7 +39,10 @@ with open(auth_yaml, 'r') as stream: ...@@ -39,7 +39,10 @@ with open(auth_yaml, 'r') as stream:
def report_status(status, abvid_serial, youtube_id): def report_status(status, abvid_serial, youtube_id):
v1 = VedaUpload.objects.filter(video_serial=abvid_serial).latest() try:
v1 = VedaUpload.objects.filter(video_serial=abvid_serial).latest()
except ObjectDoesNotExist:
return
if len(youtube_id) > 0: if len(youtube_id) > 0:
excuse = '' excuse = ''
......
...@@ -82,13 +82,11 @@ val_username: ...@@ -82,13 +82,11 @@ val_username:
# --- # ---
celery_app_name: veda_production celery_app_name: veda_production
# can do multiple queues like so: foo,bar,baz # can do multiple queues like so: foo,bar,baz
main_celery_queue: encode_worker celery_worker_queue: encode_worker
celery_receiver_queue: encode_worker celery_deliver_queue: deliver_worker
largefile_celery_queue: large_encode_worker
celery_stat_queue: transcode_stat
largefile_queue_barrier: 1000000000
celery_threads: 1 celery_threads: 1
rabbitmq_broker: rabbitmq_broker:
rabbitmq_pass: rabbitmq_pass:
rabbitmq_user: rabbitmq_user:
......
...@@ -144,7 +144,7 @@ def domxml_parser(file): ...@@ -144,7 +144,7 @@ def domxml_parser(file):
""" """
if 'status-' not in file: if 'status-' not in file:
return None return
upload_data = { upload_data = {
'datetime': None, 'datetime': None,
...@@ -157,7 +157,9 @@ def domxml_parser(file): ...@@ -157,7 +157,9 @@ def domxml_parser(file):
try: try:
tree = ET.parse(os.path.join(workdir, file)) tree = ET.parse(os.path.join(workdir, file))
except ET.ParseError: except ET.ParseError:
return None return
except IOError:
return
root = tree.getroot() root = tree.getroot()
for child in root: for child in root:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment