Commit 62d6530f by Gregory Martin

Change Encode Indentation

parent a94870ee
......@@ -18,8 +18,6 @@ install:
script: video_worker
script:
- python tests/test_api_connect.py
- python tests/test_build.py
- make validate
after_success:
......
---
#--
# Global settings
heal_start: 1
heal_end: 144
global_timeout: 40
# ---
# Database information
# ---
......@@ -88,9 +81,9 @@ celery_receiver_queue: encode_worker,large_encode_worker
celery_threads:
## CLOUD
rabbitmq_broker:
rabbitmq_pass:
rabbitmq_user:
rabbitmq_broker: ""
rabbitmq_pass: ""
rabbitmq_user: ""
# ---
# Shotgun Variables (internal mediateam)
......@@ -138,7 +131,6 @@ val_profile_dict:
override:
- desktop_mp4
- mobile_low
- mobile_high
audio_mp3:
- audio_mp3
......@@ -149,4 +141,13 @@ val_profile_dict:
hls:
- hls
#--
# Global settings
heal_start: 3
heal_end: 144
global_timeout: 40
ffmpeg_compiled: 'ffmpeg'
ffprobe_compiled: 'ffprobe'
...
import os
import sys
import unittest
import requests
# for Travis
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from video_worker.config import WorkerSetup
from video_worker.generate_apitoken import val_tokengen, veda_tokengen
"""
This is an API connection test
set to pass if instance_config.yaml is missing
"""
# Disable warning
requests.packages.urllib3.disable_warnings()
class TestAPIConnection(unittest.TestCase):
def setUp(self):
self.WS = WorkerSetup()
if os.path.exists(self.WS.instance_yaml):
self.WS.run()
self.settings = self.WS.settings_dict
def test_val_setup(self):
if not os.path.exists(self.WS.instance_yaml):
self.assertTrue(True)
return None
salient_variables = [
'val_api_url',
'val_client_id',
'val_password',
'val_secret_key',
'val_username',
'val_token_url',
]
for s in salient_variables:
self.assertTrue(len(self.WS.settings_dict[s]) > 0)
def test_veda_setup(self):
if not os.path.exists(self.WS.instance_yaml):
self.assertTrue(True)
return None
salient_variables = [
'veda_api_url',
'veda_auth_url',
'veda_client_id',
'veda_secret_key',
'veda_token_url',
]
for s in salient_variables:
self.assertTrue(len(self.WS.settings_dict[s]) > 0)
def test_val_connection(self):
if not os.path.exists(self.WS.instance_yaml):
self.assertTrue(True)
return None
val_token = val_tokengen()
self.assertFalse(val_token is None)
headers = {
'Authorization': 'Bearer ' + val_token,
'content-type': 'application/json'
}
s = requests.get(self.WS.settings_dict['val_api_url'], headers=headers, timeout=20)
self.assertFalse(s.status_code == 404)
self.assertFalse(s.status_code > 299)
def test_veda_connection(self):
if not os.path.exists(self.WS.instance_yaml):
self.assertTrue(True)
return None
veda_token = veda_tokengen()
self.assertFalse(veda_token is None)
headers = {
'Authorization': 'Token ' + veda_token,
'content-type': 'application/json'
}
s = requests.get(self.WS.settings_dict['veda_api_url'] + '/', headers=headers, timeout=20)
self.assertFalse(s.status_code == 404)
self.assertFalse(s.status_code > 299)
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
from boto.exception import S3ResponseError
from boto.s3.connection import S3Connection
import os
import unittest
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from video_worker.config import WorkerSetup
"""
Test for deliverable connection
set to pass if instance_config.yaml is missing
"""
class TestAssetConnection(unittest.TestCase):
def setUp(self):
self.WS = WorkerSetup()
if os.path.exists(self.WS.instance_yaml):
self.WS.run()
self.settings = self.WS.settings_dict
def test_storage_connection(self):
if not os.path.exists(self.WS.instance_yaml):
return None
conn = S3Connection()
try:
bucket = conn.get_bucket(self.settings['veda_s3_hotstore_bucket'])
self.assertTrue(True)
except S3ResponseError:
self.assertFalse(True)
def test_delivery_connection(self):
if not os.path.exists(self.WS.instance_yaml):
return None
conn = S3Connection()
try:
bucket = conn.get_bucket(self.settings['veda_deliverable_bucket'])
self.assertTrue(True)
except S3ResponseError:
self.assertFalse(True)
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from video_worker.abstractions import Video, Encode
from video_worker.config import WorkerSetup
from video_worker.generate_encode import CommandGenerate
"""
test Encode Abstraction and Command Gen
"""
class Test_Encode_Command(unittest.TestCase):
def setUp(self):
self.WS = WorkerSetup()
if os.path.exists(self.WS.instance_yaml):
self.WS.run()
self.settings = self.WS.settings_dict
self.encode_profile = 'desktop_mp4'
"""
Gen abstractions
"""
# Video
self.VideoObject = Video(
veda_id='XXXXXXXX2016-V00TEST',
)
self.VideoObject.activate()
# Encode
self.E = Encode(
VideoObject=self.VideoObject,
profile_name=self.encode_profile
)
self.E.pull_data()
self.ffcommand = None
def test_generate_command(self):
if not os.path.exists(self.WS.instance_yaml):
self.assertTrue(True)
return None
"""
Generate the (shell) command / Encode Object
"""
self.assertTrue(self.VideoObject.valid is True)
"""
Generate the Encode Object
"""
self.E.pull_data()
self.assertFalse(self.E.filetype is None)
"""
Generate the (shell) command
"""
self.ffcommand = CommandGenerate(
VideoObject=self.VideoObject,
EncodeObject=self.E
).generate()
self.assertFalse(self.ffcommand is None)
"""
TODO: More sophisticated encode tests
"""
# TODO: pillarbox, letterbox commands
# TODO: scalar commands
# TODO: destination file, etc.
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from video_worker.__init__ import VideoWorker
from video_worker.abstractions import Video, Encode
from video_worker.config import WorkerSetup
"""
file intake test
"""
class TestIntake(unittest.TestCase):
def setUp(self):
self.WS = WorkerSetup()
if os.path.exists(self.WS.instance_yaml):
self.WS.run()
self.settings = self.WS.settings_dict
self.encode_profile = 'desktop_mp4'
self.veda_id = 'XXXXXXXX2016-V00TEST'
self.jobid = 'xx4xx'
self.VW = VideoWorker(
veda_id=self.veda_id,
encode_profile=self.encode_profile,
jobid=self.jobid
)
def test_intake(self):
if not os.path.exists(self.WS.instance_yaml):
self.assertTrue(True)
return None
# copied from __init__
self.VW.VideoObject = Video(
veda_id=self.VW.veda_id
)
self.VW.VideoObject.activate()
self.assertTrue(self.VW.VideoObject.valid)
self.VW.settings = self.settings
self.VW._engine_intake()
print self.VW.VideoObject
self.assertTrue(self.VW.VideoObject.valid)
self.assertTrue(
os.path.exists(
os.path.join(
self.VW.workdir,
self.VW.source_file
)
)
)
self.assertTrue(self.VW.VideoObject.valid)
@unittest.skip("not implemented")
def tearDown(self):
if self.jobid is not None:
shutil.rmtree(self.VW.workdir)
else:
os.remove(
os.path.join(
self.VW.workdir,
self.VW.output_file
)
)
os.remove(
os.path.join(
self.VW.workdir,
self.VW.source_file
)
)
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
import os
import sys
import unittest
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from video_worker.abstractions import Video
"""
init end-to-end function tests
"""
class TestVideoAbstraction(unittest.TestCase):
def setUp(self):
self.VideoObject = Video()
self.VideoObject.activate()
def test_video_object(self):
self.assertTrue(os.path.exists(self.VideoObject.mezz_filepath))
def test_video_validate(self):
self.assertTrue(self.VideoObject.valid)
def main():
unittest.main()
if __name__ == '__main__':
sys.exit(main())
......@@ -92,9 +92,6 @@ class VideoWorker(object):
WS = WorkerSetup(
instance_yaml=self.instance_yaml
)
if self.setup:
WS.setup = True
WS.run()
self.settings = WS.settings_dict
......
......@@ -200,9 +200,9 @@ class Encode(object):
self.encode_suffix = e['encode_suffix']
self.encode_pk = e['id']
if self.encode_suffix is None:
# In the case of an API Error
self._default_encodes()
if self.encode_suffix is None:
# In the case of an API Error
self._default_encodes()
def _default_encodes(self):
"""
......
......@@ -6,6 +6,7 @@ based on variables within
import logging
import os
import sys
import yaml
logger = logging.getLogger(__name__)
......@@ -24,9 +25,6 @@ class WorkerSetup:
self.settings_dict = {}
def run(self):
"""
Read Extant Settings or Generate New Ones
"""
if not os.path.exists(self.instance_yaml):
logger.error('Not Configured')
return
......@@ -34,6 +32,6 @@ class WorkerSetup:
with open(self.instance_yaml, 'r') as stream:
try:
self.settings_dict = yaml.load(stream)
except yaml.YAMLError as exc:
except yaml.YAMLError:
logger.error('Config YAML read error')
return
......@@ -2,14 +2,13 @@
This file tests configuration settings for worker setup.
"""
import os
import unittest
import yaml
from ddt import ddt, data, unpack
from mock import Mock, patch, mock_open
from mock import patch
from utils import create_worker_setup, TEST_INSTANCE_YAML, DUMMY_INSTANCE_YAML
from utils import create_worker_setup, TEST_INSTANCE_YAML
@ddt
......@@ -17,45 +16,21 @@ class ConfigTest(unittest.TestCase):
"""
Config test class.
"""
@data(
(True, TEST_INSTANCE_YAML),
(False, TEST_INSTANCE_YAML),
(False, '/dummy-instance-config-path'),
(True, '/dummy-instance-config-path')
)
@unpack
@patch('video_worker.config.WorkerSetup._READ_SETTINGS')
@patch('video_worker.config.WorkerSetup._CONFIGURE')
def test_run(self, setup, instance_yaml, mock_configure, mock_read_settings):
"""
Tests that correct method is called based on worker setup configuration
"""
WS = create_worker_setup({
'instance_yaml': instance_yaml,
'setup': setup
})
WS.run()
instance_yaml_exists = os.path.exists(WS.instance_yaml)
self.assertEqual(mock_configure.called, setup or not instance_yaml_exists)
self.assertEqual(mock_read_settings.called, not setup and instance_yaml_exists)
@data(
('/dummy-instance-config-path', 'Not Configured'),
(TEST_INSTANCE_YAML, '')
)
@unpack
@patch('video_worker.config.logger')
def test_read_settings(self, instance_yaml, error_message, mock_logger):
def test_run(self, instance_yaml, error_message, mock_logger):
"""
Tests that `_READ_SETTINGS` method works correctly.
Tests that `_read_settings` method works correctly.
"""
WS = create_worker_setup({
'instance_yaml': instance_yaml
})
self.assertEqual(WS.settings_dict, {})
WS._READ_SETTINGS()
WS.run()
if error_message:
self.assertFalse(len(WS.settings_dict))
......@@ -64,55 +39,16 @@ class ConfigTest(unittest.TestCase):
self.assertNotEqual(WS.settings_dict, {})
self.assertTrue(len(WS.settings_dict))
@patch('video_worker.config.raw_input', create=True)
@patch('video_worker.config.yaml.load')
def test_configure(self, yaml_load, mock_raw_input):
"""
Tests that `_CONFIGURE` method works correctly.
"""
with open(DUMMY_INSTANCE_YAML, 'w+') as file:
mock_raw_input.return_value = 'dummy-value'
yaml_load.return_value = {
'dummy_key': 'dummy-value',
'empty_value_key': ''
}
WS = create_worker_setup({
'instance_yaml': DUMMY_INSTANCE_YAML
})
self.assertEqual(WS.settings_dict, {})
WS._CONFIGURE()
self.assertNotEqual(WS.settings_dict, {})
self.assertTrue(len(WS.settings_dict))
# Close and delete the file after used
file.close()
os.remove(DUMMY_INSTANCE_YAML)
@patch('video_worker.config.logger')
@patch('video_worker.config.yaml.load')
def test_read_settings_load_error(self, yaml_load, mock_logger):
def test_run_load_error(self, yaml_load, mock_logger):
"""
Tests that `_READ_SETTINGS` method raises correct log in case of yaml load error.
Tests that `_read_settings` method raises correct log in case of yaml load error.
"""
error_message = 'Config YAML read error'
yaml_load.side_effect = yaml.YAMLError(error_message)
WS = create_worker_setup()
self.assertEqual(WS.settings_dict, {})
WS._READ_SETTINGS()
mock_logger.error.assert_called_with(error_message)
self.assertEqual(WS.settings_dict, {})
@patch('video_worker.config.logger')
@patch('video_worker.config.yaml.load')
def test_configure_load_error(self, yaml_load, mock_logger):
"""
Tests that `_CONFIGURE` method raises correct log in case of yaml load error.
"""
error_message = 'default YAML read error'
yaml_load.side_effect = yaml.YAMLError(error_message)
WS = create_worker_setup()
self.assertEqual(WS.settings_dict, {})
WS._CONFIGURE()
WS.run()
mock_logger.error.assert_called_with(error_message)
self.assertEqual(WS.settings_dict, {})
......@@ -27,8 +27,14 @@ veda_s3_upload_bucket: 'dummy-upload-bucket'
veda_s3_hotstore_bucket: 'dummy-hotstore-bucket'
veda_deliverable_bucket: 'dummy-deliverable-bucket'
ffmpeg_compiled: 'ffmpeg'
val_client_id: None
val_video_images_url: 'https://www.testimg.com/update/images'
aws_video_images_prefix: 'video-images/'
aws_video_images_bucket: 'imagesbucket'
# Celery
celery_deliver_queue: ""
#--
# Global settings
global_timeout: 40
......@@ -82,17 +82,6 @@ class VideoWorkerTest(unittest.TestCase):
)
self.assertEqual(VW.workdir, expected_workdir)
@data(True, False)
@patch('nose.run')
def test_video_worker_test(self, nose_result, mock_nose):
"""
Test `test` method works correctly.
"""
# Mock nose.run
mock_nose.return_value = nose_result
result = self.VW.test()
self.assertEqual(result, nose_result)
@data(
(
{
......@@ -116,7 +105,7 @@ class VideoWorkerTest(unittest.TestCase):
(
{
'encode_profile': 'static-pipeline',
'path_exists': False
'exists_side_effects': [True, False]
}
),
# Success
......@@ -162,7 +151,7 @@ class VideoWorkerTest(unittest.TestCase):
# First 2 calls to os.path.exists will be called in WS.run() so we need to make sure our TEST_INSTANCE_YAML
# file path is check correctly.
mock_exists.side_effect = [True, True, path_exists]
mock_exists.side_effect = mock_data.get('exists_side_effects', [True, True, True])
video_images_setup_mock.return_value = WS.settings_dict
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment