Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-video-pipeline
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-video-pipeline
Commits
1af219aa
Unverified
Commit
1af219aa
authored
Mar 15, 2018
by
Gregory Martin
Committed by
GitHub
Mar 15, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #92 from edx/yro/EDUCATOR-2416
Yro/EDUCATOR 2416
parents
47c2d63d
6dafcbf3
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
110 additions
and
107 deletions
+110
-107
bin/heal
+3
-4
control/celeryapp.py
+11
-9
control/control_env.py
+6
-0
control/tests/test_heal.py
+2
-1
control/veda_auth.yaml
+0
-0
control/veda_file_discovery.py
+5
-8
control/veda_file_ingest.py
+32
-39
control/veda_heal.py
+40
-28
control/veda_val.py
+10
-9
requirements.txt
+1
-1
static_config.yaml
+0
-4
test_config.yaml
+0
-4
No files found.
bin/heal
View file @
1af219aa
...
...
@@ -107,7 +107,7 @@ def main():
HC
=
HealCli
()
HC
.
schedule
()
return
None
return
if
veda_id
is
not
None
:
VH
=
VedaHeal
(
...
...
@@ -116,7 +116,7 @@ def main():
)
)
VH
.
send_encodes
()
return
None
return
if
course_id
is
not
None
:
VH
=
VedaHeal
(
...
...
@@ -128,11 +128,10 @@ def main():
)
)
VH
.
send_encodes
()
return
None
return
# TODO: Data backup
# TODO: API key purge
if
schedule
is
True
:
HC
=
HealCli
()
HC
.
schedule
()
...
...
control/celeryapp.py
View file @
1af219aa
...
...
@@ -22,26 +22,28 @@ CEL_BROKER = 'amqp://{rabbitmq_user}:{rabbitmq_pass}@{rabbitmq_broker}:5672//'.f
rabbitmq_broker
=
auth_dict
[
'rabbitmq_broker'
]
)
CEL_BACKEND
=
'amqp://{rabbitmq_user}:{rabbitmq_pass}@{rabbitmq_broker}:5672//'
.
format
(
rabbitmq_user
=
auth_dict
[
'rabbitmq_user'
],
rabbitmq_pass
=
auth_dict
[
'rabbitmq_pass'
],
rabbitmq_broker
=
auth_dict
[
'rabbitmq_broker'
]
)
app
=
Celery
(
auth_dict
[
'celery_app_name'
],
broker
=
CEL_BROKER
,
backend
=
CEL_BACKEND
,
include
=
[])
app
=
Celery
(
auth_dict
[
'celery_app_name'
],
broker
=
CEL_BROKER
,
include
=
[
'celeryapp'
])
app
.
conf
.
update
(
BROKER_CONNECTION_TIMEOUT
=
60
,
CELERY_IGNORE_RESULT
=
True
,
CELERY_TASK_RESULT_EXPIRES
=
10
,
CELERYD_PREFETCH_MULTIPLIER
=
1
,
CELERY_ACCEPT_CONTENT
=
[
'pickle'
,
'json'
,
'msgpack'
,
'yaml'
]
CELERY_ACCEPT_CONTENT
=
[
'json'
],
CELERY_TASK_PUBLISH_RETRY
=
True
,
CELERY_TASK_PUBLISH_RETRY_POLICY
=
{
"max_retries"
:
3
,
"interval_start"
:
0
,
"interval_step"
:
1
,
"interval_max"
:
5
}
)
@app.task
(
name
=
'worker_encode'
)
def
worker_task_fire
(
veda_id
,
encode_profile
,
jobid
):
pass
print
'[ENCODE] Misfire : {id} : {encode}'
.
format
(
id
=
veda_id
,
encode
=
encode_profile
)
return
1
@app.task
(
name
=
'supervisor_deliver'
)
...
...
control/control_env.py
View file @
1af219aa
...
...
@@ -61,3 +61,9 @@ NODE_COLORS_BLUE = '\033[94m'
NODE_COLORS_GREEN
=
'
\033
[92m'
NODE_COLORS_RED
=
'
\033
[91m'
NODE_COLORS_END
=
'
\033
[0m'
"""
Heal process start and end query times for 'video_trans_start' in hours
"""
HEAL_START
=
6
HEAL_END
=
144
control/tests/test_heal.py
View file @
1af219aa
...
...
@@ -12,6 +12,7 @@ import responses
from
django.utils.timezone
import
utc
from
mock
import
PropertyMock
,
patch
from
control_env
import
HEAL_START
from
control.veda_heal
import
VedaHeal
from
VEDA_OS01.models
import
URL
,
Course
,
Destination
,
Encode
,
Video
,
TranscriptStatus
from
VEDA_OS01.utils
import
ValTranscriptStatus
...
...
@@ -50,7 +51,7 @@ class HealTests(TestCase):
studio_id
=
self
.
video_id
,
edx_id
=
'XXXXXXXX2014-V00TES1'
,
video_trans_start
=
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
)
-
timedelta
(
hours
=
CONFIG_DATA
[
'heal_start'
]
hours
=
HEAL_START
),
video_trans_end
=
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
)
...
...
control/veda_auth.yaml
deleted
100644 → 0
View file @
47c2d63d
control/veda_file_discovery.py
View file @
1af219aa
...
...
@@ -143,7 +143,8 @@ class FileDiscovery(object):
s3_filename
=
s3_filename
,
client_title
=
client_title
,
file_extension
=
''
,
platform_course_url
=
course_id
platform_course_url
=
course_id
,
video_orig_duration
=
0.0
)
# Update val status to 'invalid_token'
VALAPICall
(
video_proto
=
video_proto
,
val_status
=
u'invalid_token'
)
.
call
()
...
...
@@ -196,7 +197,7 @@ class FileDiscovery(object):
return
course
def
download_video_to_working_directory
(
self
,
key
,
file_name
,
file_extension
):
def
download_video_to_working_directory
(
self
,
key
,
file_name
):
"""
Downloads the video to working directory from S3 and
returns whether its successfully downloaded or not.
...
...
@@ -204,17 +205,13 @@ class FileDiscovery(object):
Arguments:
key: An S3 key whose content is going to be downloaded
file_name: Name of the file when its in working directory
file_extension: extension of this file.
"""
if
len
(
file_extension
)
==
3
:
file_name
=
u'{file_name}.{ext}'
.
format
(
file_name
=
file_name
,
ext
=
file_extension
)
file_ingested
=
False
try
:
key
.
get_contents_to_filename
(
os
.
path
.
join
(
self
.
node_work_directory
,
file_name
))
file_ingested
=
True
except
S3DataError
:
file_ingested
=
False
LOGGER
.
exception
(
'[File Ingest] Error downloading the file into node working directory.'
)
return
file_ingested
def
parse_transcript_preferences
(
self
,
course_id
,
transcript_preferences
):
...
...
@@ -287,7 +284,7 @@ class FileDiscovery(object):
if
course
:
# Download video file from S3 into node working directory.
file_extension
=
os
.
path
.
splitext
(
client_title
)[
1
][
1
:]
file_downloaded
=
self
.
download_video_to_working_directory
(
video_s3_key
,
filename
,
file_extension
)
file_downloaded
=
self
.
download_video_to_working_directory
(
video_s3_key
,
filename
)
if
not
file_downloaded
:
# S3 Bucket ingest failed, move the file rejected directory.
self
.
move_video
(
video_s3_key
,
destination_dir
=
self
.
auth_dict
[
'edx_s3_rejected_prefix'
])
...
...
control/veda_file_ingest.py
View file @
1af219aa
...
...
@@ -109,8 +109,9 @@ class VedaIngest(object):
# TODO: Break heal method listed here out into helper util
encode_instance
=
VedaHeal
(
video_query
=
Video
.
objects
.
filter
(
edx_id
=
self
.
video_proto
.
veda_id
)
edx_id
=
self
.
video_proto
.
veda_id
.
strip
()
),
val_status
=
'transcode_queue'
)
encode_instance
.
send_encodes
()
...
...
@@ -156,29 +157,27 @@ class VedaIngest(object):
def
database_record
(
self
):
"""
Start DB Inserts, Get
I
nformation
Start DB Inserts, Get
Basic File name i
nformation
"""
if
self
.
video_proto
.
s3_filename
is
not
None
:
if
self
.
video_proto
.
s3_filename
:
self
.
full_filename
=
'/'
.
join
((
self
.
node_work_directory
,
self
.
video_proto
.
s3_filename
))
if
self
.
video_proto
.
abvid_serial
is
not
None
:
if
self
.
video_proto
.
abvid_serial
:
self
.
full_filename
=
'/'
.
join
((
self
.
node_work_directory
,
self
.
video_proto
.
client_title
))
if
len
(
self
.
video_proto
.
file_extension
)
>
2
:
self
.
full_filename
+=
"."
+
self
.
video_proto
.
file_extension
if
self
.
full_filename
is
Non
e
:
if
not
self
.
full_filenam
e
:
self
.
full_filename
=
'/'
.
join
((
self
.
node_work_directory
,
self
.
video_proto
.
client_title
))
if
len
(
self
.
video_proto
.
file_extension
)
==
3
:
self
.
full_filename
+=
"."
+
self
.
video_proto
.
file_extension
if
not
os
.
path
.
exists
(
self
.
full_filename
):
LOGGER
.
exception
(
'[VIDEO_INGEST] File Not Found
%
s'
,
self
.
video_proto
.
veda_id
)
return
...
...
@@ -192,9 +191,17 @@ class VedaIngest(object):
if
self
.
video_proto
.
valid
is
True
:
self
.
_gather_metadata
()
"""
DB Inserts
"""
# DB Inserts
if
self
.
video_proto
.
s3_filename
:
video
=
Video
.
objects
.
filter
(
studio_id
=
self
.
video_proto
.
s3_filename
)
.
first
()
if
video
:
# Protect against crash/duplicate inserts, won't insert object
self
.
video_proto
.
veda_id
=
video
[
0
]
.
edx_id
self
.
video_proto
.
video_orig_duration
=
video
[
0
]
.
video_orig_duration
self
.
complete
=
True
return
v1
=
Video
(
inst_class
=
self
.
course_object
)
"""
Generate veda_id / update course record
...
...
@@ -301,7 +308,7 @@ class VedaIngest(object):
raise
def
val_insert
(
self
):
if
self
.
video_proto
.
abvid_serial
is
not
None
:
if
self
.
video_proto
.
abvid_serial
:
return
None
if
self
.
video_proto
.
valid
is
False
:
...
...
@@ -331,38 +338,24 @@ class VedaIngest(object):
def
rename
(
self
):
"""
Rename to VEDA ID,
Backup in Hotstore
"""
if
self
.
video_proto
.
veda_id
is
None
:
self
.
video_proto
.
valid
=
False
return
None
return
if
self
.
video_proto
.
file_extension
is
None
:
os
.
rename
(
self
.
full_filename
,
os
.
path
.
join
(
self
.
node_work_directory
,
self
.
video_proto
.
veda_id
)
)
self
.
full_filename
=
os
.
path
.
join
(
veda_filename
=
self
.
video_proto
.
veda_id
if
self
.
video_proto
.
file_extension
:
veda_filename
+=
'.{ext}'
.
format
(
ext
=
self
.
video_proto
.
file_extension
)
os
.
rename
(
self
.
full_filename
,
os
.
path
.
join
(
self
.
node_work_directory
,
self
.
video_proto
.
veda_id
)
else
:
os
.
rename
(
self
.
full_filename
,
os
.
path
.
join
(
self
.
node_work_directory
,
self
.
video_proto
.
veda_id
+
'.'
+
self
.
video_proto
.
file_extension
)
veda_filename
)
self
.
full_filename
=
os
.
path
.
join
(
self
.
node_work_directory
,
self
.
video_proto
.
veda_id
+
'.'
+
self
.
video_proto
.
file_extension
)
)
self
.
full_filename
=
os
.
path
.
join
(
self
.
node_work_directory
,
veda_filename
)
os
.
system
(
'chmod ugo+rwx '
+
self
.
full_filename
)
return
def
store
(
self
):
"""
...
...
control/veda_heal.py
View file @
1af219aa
...
...
@@ -20,7 +20,7 @@ from VEDA_OS01.models import Encode, URL, Video
from
VEDA_OS01.utils
import
VAL_TRANSCRIPT_STATUS_MAP
import
celeryapp
from
control_env
import
WORK_DIRECTORY
from
control_env
import
WORK_DIRECTORY
,
HEAL_START
,
HEAL_END
from
veda_encode
import
VedaEncode
from
veda_val
import
VALAPICall
from
VEDA.utils
import
get_config
...
...
@@ -49,10 +49,10 @@ class VedaHeal(object):
def
discovery
(
self
):
self
.
video_query
=
Video
.
objects
.
filter
(
video_trans_start__lt
=
self
.
current_time
-
timedelta
(
hours
=
self
.
auth_dict
[
'heal_start'
]
hours
=
HEAL_START
),
video_trans_start__gt
=
self
.
current_time
-
timedelta
(
hours
=
self
.
auth_dict
[
'heal_end'
]
hours
=
HEAL_END
)
)
...
...
@@ -62,36 +62,49 @@ class VedaHeal(object):
for
v
in
self
.
video_query
:
encode_list
=
self
.
determine_fault
(
video_object
=
v
)
# Using the 'Video Proto' Model
if
self
.
val_status
is
not
None
:
# Update to VAL is also happening for those videos which are already marked complete,
# All these retries are for the data-parity between VAL and VEDA, as calls to VAL api are
# unreliable and times out. For a completed Video, VEDA heal will keep doing this unless
# the Video is old enough and escapes from the time-span that HEAL is picking up on.
# cc Greg Martin
VAC
=
VALAPICall
(
video_proto
=
None
,
video_object
=
v
,
val_status
=
self
.
val_status
,
)
VAC
.
call
()
self
.
val_status
=
None
# Update to VAL is also happening for those videos which are already marked complete,
# All these retries are for the data-parity between VAL and VEDA, as calls to VAL api are
# unreliable and times out. For a completed Video, VEDA heal will keep doing this unless
# the Video is old enough and escapes from the time-span that HEAL is picking up on.
# cc Greg Martin
if
len
(
encode_list
)
>
0
:
self
.
val_status
=
'transcode_queue'
api_call
=
VALAPICall
(
video_proto
=
None
,
video_object
=
v
,
val_status
=
self
.
val_status
,
)
api_call
.
call
()
# Enqueue
if
self
.
auth_dict
[
'rabbitmq_broker'
]
is
not
None
:
for
e
in
encode_list
:
veda_id
=
v
.
edx_id
encode_profile
=
e
jobid
=
uuid
.
uuid1
()
.
hex
[
0
:
10
]
celeryapp
.
worker_task_fire
.
apply_async
(
(
veda_id
,
encode_profile
,
jobid
),
queue
=
self
.
auth_dict
[
'celery_worker_queue'
]
)
if
not
self
.
auth_dict
[
'rabbitmq_broker'
]:
return
for
encode
in
encode_list
:
veda_id
=
v
.
edx_id
encode_profile
=
encode
job_id
=
uuid
.
uuid1
()
.
hex
[
0
:
10
]
task_result
=
celeryapp
.
worker_task_fire
.
apply_async
(
(
veda_id
,
encode_profile
,
job_id
),
queue
=
self
.
auth_dict
[
'celery_worker_queue'
]
.
strip
(),
connect_timeout
=
3
)
# Misqueued Task
if
task_result
==
1
:
LOGGER
.
error
(
'[ENQUEUE ERROR] : {id}'
.
format
(
id
=
v
.
edx_id
))
continue
# Update Status
LOGGER
.
info
(
'[ENQUEUE] : {id}'
.
format
(
id
=
v
.
edx_id
))
Video
.
objects
.
filter
(
edx_id
=
v
.
edx_id
)
.
update
(
video_trans_status
=
'Queue'
)
def
determine_fault
(
self
,
video_object
):
"""
Determine expected and completed encodes
"""
LOGGER
.
info
(
'[
HEAL
] : {id}'
.
format
(
id
=
video_object
.
edx_id
))
LOGGER
.
info
(
'[
ENQUEUE
] : {id}'
.
format
(
id
=
video_object
.
edx_id
))
if
self
.
freezing_bug
is
True
:
if
video_object
.
video_trans_status
==
'Corrupt File'
:
self
.
val_status
=
'file_corrupt'
...
...
@@ -122,7 +135,7 @@ class VedaHeal(object):
pass
requeued_encodes
=
self
.
differentiate_encodes
(
uncompleted_encodes
,
expected_encodes
,
video_object
)
LOGGER
.
info
(
'[
HEAL
] : {id} : {status} : {encodes}'
.
format
(
LOGGER
.
info
(
'[
ENQUEUE
] : {id} : {status} : {encodes}'
.
format
(
id
=
video_object
.
edx_id
,
status
=
self
.
val_status
,
encodes
=
requeued_encodes
...
...
@@ -223,7 +236,6 @@ class VedaHeal(object):
def
purge
(
self
):
"""
Purge Work Directory
"""
for
file
in
os
.
listdir
(
WORK_DIRECTORY
):
full_filepath
=
os
.
path
.
join
(
WORK_DIRECTORY
,
file
)
...
...
control/veda_val.py
View file @
1af219aa
...
...
@@ -174,7 +174,10 @@ class VALAPICall():
self
.
video_object
.
video_orig_duration
=
0
self
.
video_object
.
duration
=
0.0
if
not
isinstance
(
self
.
video_proto
.
duration
,
float
):
except
AttributeError
:
pass
if
not
isinstance
(
self
.
video_proto
.
duration
,
float
)
and
self
.
val_status
!=
'invalid_token'
:
self
.
video_proto
.
duration
=
Output
.
_seconds_from_string
(
duration
=
self
.
video_object
.
video_orig_duration
)
...
...
@@ -369,15 +372,13 @@ class VALAPICall():
headers
=
self
.
headers
,
timeout
=
self
.
auth_dict
[
'global_timeout'
]
)
LOGGER
.
info
(
'[VAL] : {id} : {status} : {code}'
.
format
(
id
=
self
.
video_proto
.
val_id
,
status
=
self
.
val_status
,
code
=
r4
.
status_code
)
)
if
r4
.
status_code
>
299
:
ErrorObject
.
print_error
(
message
=
'
%
s
\n
%
s
\n
%
s
\n
'
%
(
'R4 : VAL POST/PUT Fail: VAL'
,
'Check VAL Config'
,
r4
.
status_code
)
)
LOGGER
.
error
(
'[VAL] : POST/PUT Fail : Check Config : {status}'
.
format
(
status
=
r4
.
status_code
))
def
update_val_transcript
(
self
,
video_id
,
lang_code
,
name
,
transcript_format
,
provider
):
"""
...
...
requirements.txt
View file @
1af219aa
...
...
@@ -13,7 +13,7 @@ pysftp
boto
pyyaml
requests
==2.18.1
celery
==
3.1.18
celery
==
4.1.0
pysrt
==1.1.1
MySQL-python
==1.2.5
gunicorn
==0.17.4
...
...
static_config.yaml
View file @
1af219aa
...
...
@@ -62,8 +62,4 @@ val_profile_dict:
hls
:
-
hls
# Heal settings
heal_start
:
1
heal_end
:
144
global_timeout
:
60
test_config.yaml
View file @
1af219aa
...
...
@@ -81,10 +81,6 @@ val_profile_dict:
hls
:
-
hls
# Heal settings
heal_start
:
2
heal_end
:
50
global_timeout
:
40
instance_prefix
:
'
127.0.0.1'
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment