Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-video-pipeline
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-video-pipeline
Commits
e4a89c4a
Unverified
Commit
e4a89c4a
authored
Feb 09, 2018
by
Gregory Martin
Committed by
GitHub
Feb 09, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #85 from edx/yro/bugfix_0118
Yro/EDUCATOR-2152
parents
d096135b
6a2fc83d
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
119 additions
and
161 deletions
+119
-161
AUTHORS
+1
-0
control/tests/test_heal.py
+73
-28
control/veda_file_ingest.py
+32
-115
control/veda_heal.py
+10
-17
static_config.yaml
+1
-1
youtube_callback/sftp_id_retrieve.py
+2
-0
No files found.
AUTHORS
View file @
e4a89c4a
Mushtaq Ali <mushtaak@gmail.com>
Muhammad Ammar <mammar@gmail.com>
Gregory Martin <greg@edx.org>
control/tests/test_heal.py
View file @
e4a89c4a
...
...
@@ -107,49 +107,102 @@ class HealTests(TestCase):
@data
(
{
'edx_id'
:
'1'
,
'video_trans_status'
:
'Corrupt File'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
'expected_encodes'
:
set
([
'hls'
,
'mobile_low'
])
},
{
'edx_id'
:
'1'
,
'video_trans_status'
:
'Review Reject'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
'expected_encodes'
:
[]
},
{
'edx_id'
:
'1'
,
'video_trans_status'
:
'Review Hold'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
'expected_encodes'
:
[]
},
{
'edx_id'
:
'2'
,
'video_trans_status'
:
'Complete'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
False
,
'expected_encodes'
:
set
([
'hls'
,
'mobile_low'
])
},
{
'edx_id'
:
'2'
,
'video_trans_status'
:
'Ingest'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
'expected_encodes'
:
set
([
'hls'
,
'mobile_low'
])
},
)
@unpack
def
test_determine_fault
(
self
,
video_trans_status
,
video_trans_start
,
video_active
,
expected_encodes
):
"""
Tests that determine_fault works in various video states.
"""
video_instance
=
Video
(
edx_id
=
'test_id'
,
video_trans_status
=
video_trans_status
,
video_trans_start
=
video_trans_start
,
video_active
=
video_active
,
inst_class
=
self
.
course_object
)
video_instance
.
save
()
encodes
=
self
.
heal_instance
.
determine_fault
(
video_instance
)
self
.
assertEqual
(
encodes
,
expected_encodes
)
@data
(
{
'video_trans_status'
:
'Corrupt File'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
'expected_encodes'
:
[]
},
{
'video_trans_status'
:
'Review Reject'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
'expected_encodes'
:
[]
},
{
'video_trans_status'
:
'Review Hold'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
'expected_encodes'
:
[]
},
{
'video_trans_status'
:
'Complete'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
False
,
'expected_encodes'
:
set
([
'hls'
,
'mobile_low'
])
},
{
'video_trans_status'
:
'Ingest'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
'expected_encodes'
:
set
([
'hls'
,
'mobile_low'
])
},
{
'edx_id'
:
'1'
,
'video_trans_status'
:
'Corrupt File'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
'expected_encodes'
:
[]
},
)
@unpack
def
test_determine_fault
(
self
,
edx_id
,
video_trans_status
,
video_trans_start
,
video_active
):
"""
Tests that determine_fault works in various video states.
"""
def
test_determine_fault_freeze_bug
(
self
,
video_trans_status
,
video_trans_start
,
video_active
,
expected_encodes
):
video_instance
=
Video
(
edx_id
=
edx_id
,
edx_id
=
'test_id'
,
video_trans_status
=
video_trans_status
,
video_trans_start
=
video_trans_start
,
video_active
=
video_active
,
...
...
@@ -157,13 +210,10 @@ class HealTests(TestCase):
)
video_instance
.
save
()
encode_list
=
self
.
heal_instance
.
determine_fault
(
video_instance
)
heal_instance_two
=
VedaHeal
(
freezing_bug
=
True
)
encode_list
=
heal_instance_two
.
determine_fault
(
video_instance
)
self
.
assertEqual
(
encode_list
,
expected_encodes
)
if
video_instance
.
edx_id
==
'1'
:
self
.
assertEqual
(
encode_list
,
[])
elif
video_instance
.
edx_id
==
'2'
:
for
e
in
encode_list
:
self
.
assertTrue
(
e
in
self
.
encode_list
)
@data
(
{
...
...
@@ -271,8 +321,8 @@ class HealTests(TestCase):
{
'uncompleted_encodes'
:
[
'test_encode'
,
'test_encode'
],
'expected_encodes'
:
[
'test_encode'
,
'test_encode'
],
'expected_long_corrupt'
:
False
,
'video_object'
:
{
'edx_id'
:
'1'
,
'video_trans_status'
:
'Complete'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
...
...
@@ -281,8 +331,8 @@ class HealTests(TestCase):
{
'uncompleted_encodes'
:
[
'test_encode'
,
'test_encode'
,
'hls'
],
'expected_encodes'
:
[
'test_encode'
,
'test_encode'
,
'hls'
],
'expected_long_corrupt'
:
False
,
'video_object'
:
{
'edx_id'
:
'2'
,
'video_trans_status'
:
'Complete'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
),
'video_active'
:
True
,
...
...
@@ -291,8 +341,8 @@ class HealTests(TestCase):
{
'uncompleted_encodes'
:
[
'test_encode'
,
'test_encode'
,
'hls'
],
'expected_encodes'
:
[
'test_encode'
,
'test_encode'
,
'hls'
],
'expected_long_corrupt'
:
True
,
'video_object'
:
{
'edx_id'
:
'3'
,
'video_trans_status'
:
'Ingest'
,
'video_trans_start'
:
datetime
.
datetime
.
utcnow
()
.
replace
(
tzinfo
=
utc
)
-
timedelta
(
days
=
10
),
'video_active'
:
True
,
...
...
@@ -300,9 +350,9 @@ class HealTests(TestCase):
}
)
@unpack
def
test_determine_longterm_corrupt
(
self
,
uncompleted_encodes
,
expected_encodes
,
video_object
):
def
test_determine_longterm_corrupt
(
self
,
uncompleted_encodes
,
expected_encodes
,
video_object
,
expected_long_corrupt
):
video_instance
=
Video
(
edx_id
=
video_object
[
'edx_id'
]
,
edx_id
=
'test_id'
,
video_trans_status
=
video_object
[
'video_trans_status'
],
video_trans_start
=
video_object
[
'video_trans_start'
],
video_active
=
video_object
[
'video_active'
],
...
...
@@ -317,9 +367,4 @@ class HealTests(TestCase):
video_instance
)
if
video_instance
.
edx_id
==
'1'
:
self
.
assertEqual
(
longterm_corrupt
,
False
)
elif
video_instance
.
edx_id
==
'2'
:
self
.
assertEqual
(
longterm_corrupt
,
False
)
elif
video_instance
.
edx_id
==
'3'
:
self
.
assertEqual
(
longterm_corrupt
,
True
)
self
.
assertEqual
(
longterm_corrupt
,
expected_long_corrupt
)
control/veda_file_ingest.py
View file @
e4a89c4a
import
logging
import
os
import
sys
import
subprocess
import
datetime
from
datetime
import
timedelta
import
time
import
fnmatch
import
django
from
django.db.utils
import
DatabaseError
from
django.utils.timezone
import
utc
from
django.db
import
reset_queries
import
uuid
import
hashlib
from
VEDA.utils
import
get_config
"""
Discovered file ingest/insert/job triggering
**NOTE**
Local Files, Migrated files are eliminated
This just takes discovered
- About Vids
- Studio Uploads
- FTP Uploads
"""
import
datetime
import
logging
import
subprocess
from
django.db.utils
import
DatabaseError
from
control_env
import
*
from
VEDA.utils
import
get_config
from
veda_heal
import
VedaHeal
from
veda_hotstore
import
Hotstore
from
veda_video_validation
import
Validation
from
veda_utils
import
ErrorObject
,
Output
,
Report
from
veda_val
import
VALAPICall
from
veda_encode
import
VedaEncode
import
celeryapp
from
VEDA_OS01.models
import
TranscriptStatus
from
veda_utils
import
Report
from
veda_val
import
VALAPICall
from
veda_video_validation
import
Validation
LOGGER
=
logging
.
getLogger
(
__name__
)
...
...
@@ -56,7 +39,7 @@ if I.complete is False:
'''
class
VideoProto
():
class
VideoProto
(
object
):
def
__init__
(
self
,
**
kwargs
):
self
.
s3_filename
=
kwargs
.
get
(
's3_filename'
,
None
)
...
...
@@ -74,7 +57,7 @@ class VideoProto():
self
.
preferred_languages
=
kwargs
.
get
(
'preferred_languages'
,
[])
self
.
source_language
=
kwargs
.
get
(
'source_language'
,
None
)
# Determined Attributes
# Determined
Videofile
Attributes
self
.
valid
=
False
self
.
filesize
=
0
self
.
duration
=
0
...
...
@@ -83,27 +66,20 @@ class VideoProto():
self
.
veda_id
=
None
class
VedaIngest
:
class
VedaIngest
(
object
)
:
def
__init__
(
self
,
course_object
,
video_proto
,
**
kwargs
):
self
.
course_object
=
course_object
self
.
video_proto
=
video_proto
self
.
auth_dict
=
get_config
()
# --- #
self
.
node_work_directory
=
kwargs
.
get
(
'node_work_directory'
,
WORK_DIRECTORY
)
self
.
full_filename
=
kwargs
.
get
(
'full_filename'
,
None
)
self
.
complete
=
False
self
.
archived
=
False
def
insert
(
self
):
"""
NOTE:
eliminate Ingest Field
"""
self
.
database_record
()
self
.
val_insert
()
# --- #
self
.
rename
()
self
.
archived
=
self
.
store
()
...
...
@@ -114,12 +90,12 @@ class VedaIngest:
os
.
remove
(
self
.
full_filename
)
return
None
self
.
queue_job
()
print
'
%
s : [
%
s ] :
%
s'
%
(
str
(
datetime
.
datetime
.
utcnow
()),
self
.
video_proto
.
veda_id
,
'File Active'
LOGGER
.
info
(
'[VIDEO_INGEST : Ingested] {video_id} : {datetime}'
.
format
(
video_id
=
self
.
video_proto
.
veda_id
,
datetime
=
str
(
datetime
.
datetime
.
utcnow
()))
)
self
.
queue_job
()
Course
.
objects
.
filter
(
pk
=
self
.
course_object
.
pk
)
.
update
(
...
...
@@ -130,55 +106,18 @@ class VedaIngest:
self
.
complete
=
True
def
queue_job
(
self
):
print
'
%
s : [
%
s ] :
%
s'
%
(
str
(
datetime
.
datetime
.
utcnow
()),
self
.
video_proto
.
veda_id
,
'Remote Assimilate'
)
'''
nouvelle:
'''
if
self
.
auth_dict
is
None
:
ErrorObject
()
.
print_error
(
message
=
'No Auth YAML Found'
# TODO: Break heal method listed here out into helper util
encode_instance
=
VedaHeal
(
video_query
=
Video
.
objects
.
filter
(
edx_id
=
self
.
video_proto
.
veda_id
)
return
None
# WRITE JOB QUEUEING
En
=
VedaEncode
(
course_object
=
self
.
course_object
,
veda_id
=
self
.
video_proto
.
veda_id
)
self
.
encode_list
=
En
.
determine_encodes
()
if
len
(
self
.
encode_list
)
==
0
:
return
None
encode_instance
.
send_encodes
()
# Enqueue
for
e
in
self
.
encode_list
:
veda_id
=
self
.
video_proto
.
veda_id
encode_profile
=
e
jobid
=
uuid
.
uuid1
()
.
hex
[
0
:
10
]
celeryapp
.
worker_task_fire
.
apply_async
(
(
veda_id
,
encode_profile
,
jobid
),
queue
=
self
.
auth_dict
[
'celery_worker_queue'
]
.
split
(
','
)[
0
]
)
"""
Update Video Status
"""
Video
.
objects
.
filter
(
edx_id
=
self
.
video_proto
.
veda_id
)
.
update
(
video_trans_status
=
'Queue'
)
def
_METADATA
(
self
):
def
_gather_metadata
(
self
):
"""
use st filesize for filesize
Use "ffprobe" for other metadata
***
"""
self
.
video_proto
.
filesize
=
os
.
stat
(
self
.
full_filename
)
.
st_size
...
...
@@ -189,7 +128,6 @@ class VedaIngest:
p
=
subprocess
.
Popen
(
ff_command
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
,
shell
=
True
)
for
line
in
iter
(
p
.
stdout
.
readline
,
b
''
):
# print line
if
"Duration: "
in
line
:
self
.
video_proto
.
duration
=
line
.
split
(
','
)[
0
]
.
split
(
' '
)[
-
1
]
...
...
@@ -205,7 +143,6 @@ class VedaIngest:
vid_reso_break
=
vid_breakout
[
2
]
.
strip
()
.
split
(
' '
)
for
v
in
vid_reso_break
:
if
"x"
in
v
:
print
v
self
.
video_proto
.
resolution
=
v
.
strip
()
if
self
.
video_proto
.
resolution
is
None
:
self
.
video_proto
.
resolution
=
vid_breakout
[
3
]
.
strip
()
...
...
@@ -243,10 +180,8 @@ class VedaIngest:
self
.
full_filename
+=
"."
+
self
.
video_proto
.
file_extension
if
not
os
.
path
.
exists
(
self
.
full_filename
):
ErrorObject
()
.
print_error
(
message
=
'Ingest: File Not Found'
)
return
None
LOGGER
.
exception
(
'[VIDEO_INGEST] File Not Found
%
s'
,
self
.
video_proto
.
veda_id
)
return
"""
Validate File
...
...
@@ -256,7 +191,7 @@ class VedaIngest:
self
.
video_proto
.
valid
=
VV
.
validate
()
if
self
.
video_proto
.
valid
is
True
:
self
.
_
METADATA
()
self
.
_
gather_metadata
()
"""
DB Inserts
"""
...
...
@@ -362,7 +297,7 @@ class VedaIngest:
except
Exception
:
# Log the exception and raise.
LOGGER
.
exception
(
'[VIDEO
-PIPELINE] File Ingest
- Cataloging of video=
%
s failed.'
,
self
.
video_proto
.
veda_id
)
LOGGER
.
exception
(
'[VIDEO
_INGEST]
- Cataloging of video=
%
s failed.'
,
self
.
video_proto
.
veda_id
)
raise
def
val_insert
(
self
):
...
...
@@ -385,12 +320,12 @@ class VedaIngest:
if
self
.
video_proto
.
abvid_serial
is
None
:
return
None
R
=
Report
(
email_report
=
Report
(
status
=
"File Corrupt on Ingest"
,
upload_serial
=
self
.
video_proto
.
abvid_serial
,
youtube_id
=
''
)
R
.
upload_status
()
email_report
.
upload_status
()
self
.
complete
=
True
def
rename
(
self
):
...
...
@@ -438,21 +373,3 @@ class VedaIngest:
upload_filepath
=
self
.
full_filename
)
return
H1
.
upload
()
def
main
():
"""
VP = VideoProto()
VI = VedaIngest(
course_object='Mock',
video_proto=VP,
full_filename='/Users/gregmartin/Downloads/MIT15662T115-V016800.mov'
)
VI._METADATA()
print VI.video_proto.resolution
"""
pass
if
__name__
==
"__main__"
:
sys
.
exit
(
main
())
control/veda_heal.py
View file @
e4a89c4a
...
...
@@ -13,7 +13,6 @@ import logging
import
os
import
sys
import
uuid
import
yaml
from
django.utils.timezone
import
utc
...
...
@@ -43,7 +42,7 @@ class VedaHeal(object):
self
.
auth_dict
=
get_config
()
# for individuals
self
.
video_query
=
kwargs
.
get
(
'video_query'
,
None
)
self
.
freezing_bug
=
kwargs
.
get
(
'freezing_bug'
,
Tru
e
)
self
.
freezing_bug
=
kwargs
.
get
(
'freezing_bug'
,
Fals
e
)
self
.
val_status
=
None
self
.
retry_barrier_hours
=
24
...
...
@@ -85,7 +84,7 @@ class VedaHeal(object):
jobid
=
uuid
.
uuid1
()
.
hex
[
0
:
10
]
celeryapp
.
worker_task_fire
.
apply_async
(
(
veda_id
,
encode_profile
,
jobid
),
queue
=
self
.
auth_dict
[
'celery_worker_queue'
]
.
split
(
','
)[
0
]
queue
=
self
.
auth_dict
[
'celery_worker_queue'
]
)
def
determine_fault
(
self
,
video_object
):
...
...
@@ -98,8 +97,8 @@ class VedaHeal(object):
self
.
val_status
=
'file_corrupt'
return
[]
if
video_object
.
video_trans_status
==
'Review Reject'
or
video_object
.
video_trans_status
==
'Review Hold'
or
\
video_object
.
video_trans_status
==
'Review Hold'
:
if
video_object
.
video_trans_status
==
'Review Reject'
or
\
video_object
.
video_trans_status
==
'Review Hold'
:
return
[]
if
video_object
.
video_trans_status
==
'Youtube Duplicate'
:
...
...
@@ -123,9 +122,12 @@ class VedaHeal(object):
pass
requeued_encodes
=
self
.
differentiate_encodes
(
uncompleted_encodes
,
expected_encodes
,
video_object
)
LOGGER
.
info
(
'[HEAL] : Status: {status}, Encodes: {encodes}'
.
format
(
status
=
self
.
val_status
,
encodes
=
requeued_encodes
)
)
LOGGER
.
info
(
'[HEAL] : {id} : {status} : {encodes}'
.
format
(
id
=
video_object
.
edx_id
,
status
=
self
.
val_status
,
encodes
=
requeued_encodes
))
return
requeued_encodes
def
differentiate_encodes
(
self
,
uncompleted_encodes
,
expected_encodes
,
video_object
):
...
...
@@ -231,13 +233,4 @@ class VedaHeal(object):
)
)
.
replace
(
tzinfo
=
utc
)
if
filetime
<
time_safetygap
:
print
file
+
" : WORK PURGE"
os
.
remove
(
full_filepath
)
def
main
():
VH
=
VedaHeal
()
VH
.
discovery
()
if
__name__
==
'__main__'
:
sys
.
exit
(
main
())
static_config.yaml
View file @
e4a89c4a
...
...
@@ -9,7 +9,7 @@ edx_s3_rejected_prefix: prod-edx/rejected/
onsite_worker
:
False
celery_threads
:
1
celery_app_name
:
veda_production
celery_worker_queue
:
encode_worker
,large_encode_worker
celery_worker_queue
:
encode_worker
celery_deliver_queue
:
deliver_worker
celery_heal_queue
:
heal_queue
...
...
youtube_callback/sftp_id_retrieve.py
View file @
e4a89c4a
...
...
@@ -93,6 +93,8 @@ def xml_downloader(course):
LOGGER
.
info
(
"{inst}{clss} : Authentication Failed"
.
format
(
inst
=
course
.
institution
,
clss
=
course
.
edx_classid
))
except
SSHException
:
LOGGER
.
info
(
"{inst}{clss} : Authentication Failed"
.
format
(
inst
=
course
.
institution
,
clss
=
course
.
edx_classid
))
except
IOError
:
LOGGER
.
info
(
"{inst}{clss} : List Dir Failed"
.
format
(
inst
=
course
.
institution
,
clss
=
course
.
edx_classid
))
def
crawl_sftp
(
d
,
s1
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment