Unverified Commit 2d9c855a by Gregory Martin Committed by GitHub

Merge pull request #19 from edx/yro/add_onsite_workers

Reintegrate onsite GPU Workers / Update Celery
parents 8eaf8b34 157fe217
--- ---
## NOTE!
# NEVER COMMIT A SECRET HERE!
#
# This file is meant as a guide, but should not be assumed to deploy to production.
# To deploy changes to this file to production,
# change in @edx/veda-secure/shared/instance_config.yaml
#
#
# --- # ----------
# AWS onsite_worker: False
# --- veda_s3_hotstore_bucket: ""
veda_s3_hotstore_bucket: ##---
veda_deliverable_bucket: # This is a list of encodes and their respective course
edx_s3_endpoint_bucket: # boolean matches
aws_video_images_bucket: encode_dict:
aws_video_images_prefix: '' review_proc:
- review
# --- mobile_override:
# Celery - override
# --- s3_proc:
celery_threads: 1 - mobile_low
- audio_mp3
# --- - desktop_mp4
# VAL user creds - hls
# ---
val_api_url: yt_proc:
val_video_images_url: - youtube
val_client_id:
val_password: ##---
val_secret_key: # This is a list of encode profiles and their val profile matches
val_token_url: # boolean matches
val_username: val_profile_dict:
mobile_low:
## VEDA API Auth - mobile_low
veda_api_url:
veda_auth_url: desktop_mp4:
veda_client_id: - desktop_mp4
veda_secret_key:
veda_token_url: override:
- desktop_mp4
## CLOUD - mobile_low
rabbitmq_broker: ""
rabbitmq_pass: "" audio_mp3:
rabbitmq_user: "" - audio_mp3
instance_prefix: "" youtube:
- youtube
review:
hls:
- hls
#--
# Global settings
heal_start: 3
heal_end: 144
global_timeout: 40
ffmpeg_compiled: 'ffmpeg'
ffprobe_compiled: 'ffprobe'
...
...@@ -2,8 +2,8 @@ PyYAML==3.11 ...@@ -2,8 +2,8 @@ PyYAML==3.11
boto==2.39.0 boto==2.39.0
nose==1.3.3 nose==1.3.3
requests==2.10.0 requests==2.10.0
celery==3.1.18 celery==4.1.0
newrelic newrelic
# TODO! https://openedx.atlassian.net/browse/EDUCATOR-2279 # TODO! https://openedx.atlassian.net/browse/EDUCATOR-2279
# this way is required when installing using `pip install requirements.txt` so that `encode_profiles.json` is accessible # this way is required when installing using `pip install requirements.txt` so that `encode_profiles.json` is accessible
-e git+https://github.com/yro/chunkey.git@14f9c330d4ba5a4290dd0d17429b142618bc2c9f#egg=chunkey -e git+https://github.com/yro/chunkey.git@f659c95552413d340ccab7bd27e8d63a874fd316#egg=chunkey
...@@ -18,7 +18,14 @@ from video_worker.api_communicate import UpdateAPIStatus ...@@ -18,7 +18,14 @@ from video_worker.api_communicate import UpdateAPIStatus
from celeryapp import deliverable_route from celeryapp import deliverable_route
from video_worker.generate_encode import CommandGenerate from video_worker.generate_encode import CommandGenerate
from video_worker.generate_delivery import Deliverable from video_worker.generate_delivery import Deliverable
from video_worker.global_vars import ENCODE_WORK_DIR, VAL_TRANSCODE_STATUS, NODE_TRANSCODE_STATUS
from video_worker.global_vars import (
HOME_DIR,
ENCODE_WORK_DIR,
VAL_TRANSCODE_STATUS,
NODE_TRANSCODE_STATUS,
BOTO_TIMEOUT
)
from video_worker.reporting import Output from video_worker.reporting import Output
from video_worker.validate import ValidateVideo from video_worker.validate import ValidateVideo
from video_worker.video_images import VideoImages from video_worker.video_images import VideoImages
...@@ -29,9 +36,8 @@ try: ...@@ -29,9 +36,8 @@ try:
except: except:
pass pass
boto.config.set('Boto', 'http_socket_timeout', '10') boto.config.set('Boto', 'http_socket_timeout', BOTO_TIMEOUT)
logging.basicConfig(level=logging.INFO)
logging.basicConfig()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -45,17 +51,14 @@ class VideoWorker(object): ...@@ -45,17 +51,14 @@ class VideoWorker(object):
self.encode_profile = kwargs.get('encode_profile', None) self.encode_profile = kwargs.get('encode_profile', None)
self.VideoObject = kwargs.get('VideoObject', None) self.VideoObject = kwargs.get('VideoObject', None)
# Working Dir Config self.instance_yaml = kwargs.get(
self.workdir = kwargs.get('workdir', None) 'instance_yaml',
if self.workdir is None: os.path.join(
if self.jobid is None: os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
self.workdir = ENCODE_WORK_DIR 'instance_config.yaml'
else: )
self.workdir = os.path.join(ENCODE_WORK_DIR, self.jobid) )
self.workdir = kwargs.get('workdir', self.determine_workdir())
if not os.path.exists(ENCODE_WORK_DIR):
os.mkdir(ENCODE_WORK_DIR)
self.ffcommand = None self.ffcommand = None
self.source_file = kwargs.get('source_file', None) self.source_file = kwargs.get('source_file', None)
self.output_file = None self.output_file = None
...@@ -64,6 +67,14 @@ class VideoWorker(object): ...@@ -64,6 +67,14 @@ class VideoWorker(object):
self.encoded = False self.encoded = False
self.delivered = False self.delivered = False
def determine_workdir(self):
if not os.path.exists(ENCODE_WORK_DIR):
os.mkdir(ENCODE_WORK_DIR)
if self.jobid is None:
return ENCODE_WORK_DIR
else:
return os.path.join(ENCODE_WORK_DIR, self.jobid)
def test(self): def test(self):
""" """
Run tests Run tests
...@@ -128,6 +139,9 @@ class VideoWorker(object): ...@@ -128,6 +139,9 @@ class VideoWorker(object):
# generate video images command and update S3 and edxval # generate video images command and update S3 and edxval
# run against 'hls' encode only # run against 'hls' encode only
if self.encode_profile == 'hls': if self.encode_profile == 'hls':
# Run HLS encode
self._hls_pipeline()
# Auto-video Images
VideoImages( VideoImages(
video_object=self.VideoObject, video_object=self.VideoObject,
work_dir=self.workdir, work_dir=self.workdir,
...@@ -135,8 +149,7 @@ class VideoWorker(object): ...@@ -135,8 +149,7 @@ class VideoWorker(object):
jobid=self.jobid, jobid=self.jobid,
settings=self.settings settings=self.settings
).create_and_update() ).create_and_update()
# Run HLS encode
self._hls_pipeline()
else: else:
self._static_pipeline() self._static_pipeline()
...@@ -179,14 +192,23 @@ class VideoWorker(object): ...@@ -179,14 +192,23 @@ class VideoWorker(object):
os.chdir(self.workdir) os.chdir(self.workdir)
V1 = Chunkey( if self.settings['onsite_worker'] is True:
mezz_file=os.path.join(self.workdir, self.source_file), hls_chunk_instance = Chunkey(
DELIVER_BUCKET=self.settings['edx_s3_endpoint_bucket'], mezz_file=os.path.join(self.workdir, self.source_file),
clean=False DELIVER_BUCKET=self.settings['edx_s3_endpoint_bucket'],
) clean=False,
ACCESS_KEY_ID=self.settings['edx_access_key_id'],
SECRET_ACCESS_KEY=self.settings['edx_secret_access_key']
)
else:
hls_chunk_instance = Chunkey(
mezz_file=os.path.join(self.workdir, self.source_file),
DELIVER_BUCKET=self.settings['edx_s3_endpoint_bucket'],
clean=False,
)
if V1.complete: if hls_chunk_instance.complete:
self.endpoint_url = V1.manifest_url self.endpoint_url = hls_chunk_instance.manifest_url
def _engine_intake(self): def _engine_intake(self):
""" """
...@@ -197,7 +219,13 @@ class VideoWorker(object): ...@@ -197,7 +219,13 @@ class VideoWorker(object):
return return
if self.source_file is None: if self.source_file is None:
conn = S3Connection() if self.settings['onsite_worker'] is True:
conn = S3Connection(
self.settings['veda_access_key_id'],
self.settings['veda_secret_access_key']
)
else:
conn = S3Connection()
try: try:
bucket = conn.get_bucket(self.settings['veda_s3_hotstore_bucket']) bucket = conn.get_bucket(self.settings['veda_s3_hotstore_bucket'])
except S3ResponseError: except S3ResponseError:
...@@ -266,6 +294,9 @@ class VideoWorker(object): ...@@ -266,6 +294,9 @@ class VideoWorker(object):
if not os.path.exists(os.path.join(self.workdir, self.source_file)): if not os.path.exists(os.path.join(self.workdir, self.source_file)):
logger.error('[VIDEO_WORKER] Source File (local) NOT FOUND - Input') logger.error('[VIDEO_WORKER] Source File (local) NOT FOUND - Input')
return return
print '%s : %s' % (self.VideoObject.veda_id, self.encode_profile)
# to be polite
print
process = subprocess.Popen( process = subprocess.Popen(
self.ffcommand, self.ffcommand,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
...@@ -273,7 +304,6 @@ class VideoWorker(object): ...@@ -273,7 +304,6 @@ class VideoWorker(object):
shell=True, shell=True,
universal_newlines=True universal_newlines=True
) )
print '%s : %s' % (self.VideoObject.veda_id, self.encode_profile)
Output.status_bar(process=process) Output.status_bar(process=process)
# to be polite # to be polite
print print
...@@ -288,11 +318,15 @@ class VideoWorker(object): ...@@ -288,11 +318,15 @@ class VideoWorker(object):
Validate encode by matching (w/in 5 sec) encode duration, Validate encode by matching (w/in 5 sec) encode duration,
as well as standard validation tests as well as standard validation tests
""" """
self.encoded = ValidateVideo( if self.output_file is None:
filepath=os.path.join(self.workdir, self.output_file), self.encoded = False
product_file=True, return
VideoObject=self.VideoObject else:
).valid self.encoded = ValidateVideo(
filepath=os.path.join(self.workdir, self.output_file),
product_file=True,
VideoObject=self.VideoObject
).valid
def _deliver_file(self): def _deliver_file(self):
""" """
......
...@@ -7,7 +7,6 @@ Start Celery Worker (if VEDA-attached node) ...@@ -7,7 +7,6 @@ Start Celery Worker (if VEDA-attached node)
from celery import Celery from celery import Celery
import os import os
import shutil import shutil
import sys
from video_worker.utils import get_config from video_worker.utils import get_config
from video_worker.global_vars import ENCODE_WORK_DIR from video_worker.global_vars import ENCODE_WORK_DIR
...@@ -16,16 +15,12 @@ from video_worker.global_vars import ENCODE_WORK_DIR ...@@ -16,16 +15,12 @@ from video_worker.global_vars import ENCODE_WORK_DIR
settings = get_config() settings = get_config()
def cel_start():
def cel_Start():
app = Celery( app = Celery(
settings.setdefault('celery_app_name', ''), settings.setdefault('celery_app_name', ''),
broker='amqp://' + settings.setdefault('rabbitmq_user', '') + broker='amqp://' + settings.setdefault('rabbitmq_user', '') +
':' + settings.setdefault('rabbitmq_pass', '') + ':' + settings.setdefault('rabbitmq_pass', '') +
'@' + settings.setdefault('rabbitmq_broker', '') + ':5672//', '@' + settings.setdefault('rabbitmq_broker', '') + ':5672//',
backend='amqp://' + settings.setdefault('rabbitmq_user', '') +
':' + settings.setdefault('rabbitmq_pass', '') +
'@' + settings.setdefault('rabbitmq_broker', '') + ':5672//',
include=['celeryapp'] include=['celeryapp']
) )
...@@ -34,13 +29,20 @@ def cel_Start(): ...@@ -34,13 +29,20 @@ def cel_Start():
CELERY_IGNORE_RESULT=True, CELERY_IGNORE_RESULT=True,
CELERY_TASK_RESULT_EXPIRES=10, CELERY_TASK_RESULT_EXPIRES=10,
CELERYD_PREFETCH_MULTIPLIER=1, CELERYD_PREFETCH_MULTIPLIER=1,
CELERY_ACCEPT_CONTENT=['pickle', 'json', 'msgpack', 'yaml'] CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_PUBLISH_RETRY=True,
CELERY_TASK_PUBLISH_RETRY_POLICY={
"max_retries": 3,
"interval_start": 0,
"interval_step": 1,
"interval_max": 5
}
) )
return app return app
app = cel_Start() app = cel_start()
@app.task(name='worker_encode') @app.task(name='worker_encode')
...@@ -98,5 +100,5 @@ def test_command(message): ...@@ -98,5 +100,5 @@ def test_command(message):
if __name__ == '__main__': if __name__ == '__main__':
app = cel_Start() app = cel_start()
app.start() app.start()
...@@ -83,15 +83,14 @@ class Deliverable(): ...@@ -83,15 +83,14 @@ class Deliverable():
Upload single part (under threshold in node_config) Upload single part (under threshold in node_config)
node_config MULTI_UPLOAD_BARRIER node_config MULTI_UPLOAD_BARRIER
""" """
try: if settings['onsite_worker'] is True:
conn = boto.connect_s3() conn = boto.connect_s3(
delv_bucket = conn.get_bucket(settings['veda_deliverable_bucket']) settings['veda_access_key_id'],
settings['veda_secret_access_key']
except S3ResponseError:
ErrorObject().print_error(
message='Deliverable Fail: s3 Connection Error - Singleton'
) )
return False else:
conn = boto.connect_s3()
delv_bucket = conn.get_bucket(settings['veda_deliverable_bucket'])
upload_key = Key(delv_bucket) upload_key = Key(delv_bucket)
upload_key.key = self.output_file upload_key.key = self.output_file
...@@ -129,14 +128,14 @@ class Deliverable(): ...@@ -129,14 +128,14 @@ class Deliverable():
sys.stdout.flush() sys.stdout.flush()
# Connect to s3 # Connect to s3
try: if settings['onsite_worker'] is True:
c = boto.connect_s3() conn = boto.connect_s3(
b = c.lookup(settings['veda_deliverable_bucket']) settings['veda_access_key_id'],
except S3ResponseError: settings['veda_secret_access_key']
ErrorObject().print_error(
message='Deliverable Fail: s3 Connection Error - Multipart'
) )
return False else:
conn = boto.connect_s3()
b = conn.lookup(settings['veda_deliverable_bucket'])
if b is None: if b is None:
ErrorObject().print_error( ErrorObject().print_error(
......
...@@ -5,7 +5,7 @@ Globals ...@@ -5,7 +5,7 @@ Globals
""" """
import os import os
from os.path import expanduser
from video_worker.utils import get_config, ROOT_DIR from video_worker.utils import get_config, ROOT_DIR
DEFAULT_ENCODE_WORK_DIR = os.path.join(ROOT_DIR, 'ENCODE_WORKDIR') DEFAULT_ENCODE_WORK_DIR = os.path.join(ROOT_DIR, 'ENCODE_WORKDIR')
...@@ -28,7 +28,7 @@ HLS_SUBSTITUTE = 'mobile_low' ...@@ -28,7 +28,7 @@ HLS_SUBSTITUTE = 'mobile_low'
# For BOTO Multipart uploader # For BOTO Multipart uploader
MULTI_UPLOAD_BARRIER = 2000000000 MULTI_UPLOAD_BARRIER = 2000000000
BOTO_TIMEOUT = 60 BOTO_TIMEOUT = '60'
# Settings for testing # Settings for testing
TEST_VIDEO_DIR = os.path.join( TEST_VIDEO_DIR = os.path.join(
...@@ -42,3 +42,5 @@ NODE_COLORS_BLUE = '\033[94m' ...@@ -42,3 +42,5 @@ NODE_COLORS_BLUE = '\033[94m'
NODE_COLORS_GREEN = '\033[92m' NODE_COLORS_GREEN = '\033[92m'
NODE_COLORS_RED = '\033[91m' NODE_COLORS_RED = '\033[91m'
NODE_COLORS_END = '\033[0m' NODE_COLORS_END = '\033[0m'
HOME_DIR = expanduser("~")
#---# #---#
# Test instance_config for edx-video-worker # Test instance_config for edx-video-worker
#---# #---#
# Onsite
onsite_worker: False
# VEDA credentials # VEDA credentials
veda_api_url: 'http://dummy-veda-api-url' veda_api_url: 'http://dummy-veda-api-url'
veda_token_url: 'http://dummy-veda-token-url' veda_token_url: 'http://dummy-veda-token-url'
......
...@@ -14,9 +14,6 @@ from video_worker.abstractions import Video ...@@ -14,9 +14,6 @@ from video_worker.abstractions import Video
from video_worker.global_vars import ENCODE_WORK_DIR from video_worker.global_vars import ENCODE_WORK_DIR
from video_worker.utils import get_config from video_worker.utils import get_config
from utils import TEST_INSTANCE_YAML
worker_settings = get_config() worker_settings = get_config()
......
...@@ -122,7 +122,13 @@ class VideoImages(object): ...@@ -122,7 +122,13 @@ class VideoImages(object):
""" """
Upload auto generated images to S3. Upload auto generated images to S3.
""" """
s3_connection = S3Connection() if self.settings['onsite_worker'] is True:
s3_connection = S3Connection(
self.settings['edx_access_key_id'],
self.settings['edx_secret_access_key']
)
else:
s3_connection = S3Connection()
try: try:
bucket = s3_connection.get_bucket(self.settings['aws_video_images_bucket']) bucket = s3_connection.get_bucket(self.settings['aws_video_images_bucket'])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment