Commit 014e57f4 by Gabe Mulley

Support processing a smaller list of orgs

Change-Id: I0ae887baa749a06b4457d3c591cbd1e6a2ddf138
parent 2598c1cb
...@@ -29,6 +29,8 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask): ...@@ -29,6 +29,8 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask):
config: url path to configuration file that defines organizations and their aliases. config: url path to configuration file that defines organizations and their aliases.
output_root: url path to location where output archives get written. output_root: url path to location where output archives get written.
temp_dir: optional path to local file directory to use to create archives. temp_dir: optional path to local file directory to use to create archives.
org_id: A list of organizations to process data for. If provided, only these organizations will be processed.
Otherwise, all valid organizations will be processed.
""" """
eventlog_output_root = luigi.Parameter( eventlog_output_root = luigi.Parameter(
...@@ -41,6 +43,7 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask): ...@@ -41,6 +43,7 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask):
default_from_config={'section': 'archive-event-export', 'name': 'output_root'} default_from_config={'section': 'archive-event-export', 'name': 'output_root'}
) )
temp_dir = luigi.Parameter(default=None) temp_dir = luigi.Parameter(default=None)
org_id = luigi.Parameter(is_list=True, default=[])
# Force this job to flush each counter increment instead of # Force this job to flush each counter increment instead of
# batching them. The tasks does not output data directly through # batching them. The tasks does not output data directly through
...@@ -91,6 +94,11 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask): ...@@ -91,6 +94,11 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask):
Yields tuple of absolute and relative paths (relative to org subdirectory in source). Yields tuple of absolute and relative paths (relative to org subdirectory in source).
""" """
# If org_ids are specified, restrict the processed files to that set.
if len(self.org_id) > 0 and org_name not in self.org_id:
return
org_source = url_path_join(self.eventlog_output_root, org_name) org_source = url_path_join(self.eventlog_output_root, org_name)
# Only include paths that include ".log" so that directory names are not included. # Only include paths that include ".log" so that directory names are not included.
...@@ -236,6 +244,7 @@ class ArchivedEventExportWorkflow(ArchiveExportTask): ...@@ -236,6 +244,7 @@ class ArchivedEventExportWorkflow(ArchiveExportTask):
environment=self.environment, environment=self.environment,
interval=self.interval, interval=self.interval,
pattern=self.pattern, pattern=self.pattern,
org_id=self.org_id,
mapreduce_engine=self.mapreduce_engine, mapreduce_engine=self.mapreduce_engine,
n_reduce_tasks=self.n_reduce_tasks, n_reduce_tasks=self.n_reduce_tasks,
delete_output_root=self.delete_output_root, delete_output_root=self.delete_output_root,
......
...@@ -31,6 +31,8 @@ class EventExportTask(MultiOutputMapReduceJobTask): ...@@ -31,6 +31,8 @@ class EventExportTask(MultiOutputMapReduceJobTask):
pattern: A regex with a named capture group for the date that approximates the date that the events within were pattern: A regex with a named capture group for the date that approximates the date that the events within were
emitted. Note that the search interval is expanded, so events don't have to be in exactly the right file emitted. Note that the search interval is expanded, so events don't have to be in exactly the right file
in order for them to be processed. in order for them to be processed.
org_id: A list of organizations to process data for. If provided, only these organizations will be processed.
Otherwise, all valid organizations will be processed.
""" """
output_root = luigi.Parameter( output_root = luigi.Parameter(
...@@ -45,6 +47,7 @@ class EventExportTask(MultiOutputMapReduceJobTask): ...@@ -45,6 +47,7 @@ class EventExportTask(MultiOutputMapReduceJobTask):
environment = luigi.Parameter(is_list=True, default=['prod', 'edge']) environment = luigi.Parameter(is_list=True, default=['prod', 'edge'])
interval = luigi.DateIntervalParameter() interval = luigi.DateIntervalParameter()
pattern = luigi.Parameter(default=None) pattern = luigi.Parameter(default=None)
org_id = luigi.Parameter(is_list=True, default=[])
gpg_key_dir = luigi.Parameter( gpg_key_dir = luigi.Parameter(
default_from_config={'section': 'event-export', 'name': 'gpg_key_dir'} default_from_config={'section': 'event-export', 'name': 'gpg_key_dir'}
...@@ -87,7 +90,12 @@ class EventExportTask(MultiOutputMapReduceJobTask): ...@@ -87,7 +90,12 @@ class EventExportTask(MultiOutputMapReduceJobTask):
for alias in org_config.get('other_names', []): for alias in org_config.get('other_names', []):
self.recipient_for_org_id[alias] = recipient self.recipient_for_org_id[alias] = recipient
self.org_id_whitelist = self.recipient_for_org_id.keys() self.org_id_whitelist = set(self.recipient_for_org_id.keys())
# If org_ids are specified, restrict the processed files to that set.
if len(self.org_id) > 0:
self.org_id_whitelist.intersection_update(self.org_id)
log.debug('Using org_id whitelist ["%s"]', '", "'.join(self.org_id_whitelist)) log.debug('Using org_id whitelist ["%s"]', '", "'.join(self.org_id_whitelist))
self.server_name_whitelist = set() self.server_name_whitelist = set()
......
...@@ -106,7 +106,7 @@ class ArchiveExportTaskTestCase(unittest.TestCase): ...@@ -106,7 +106,7 @@ class ArchiveExportTaskTestCase(unittest.TestCase):
self.assertEquals(actual, self._create_file_contents(org, server, log_date)) self.assertEquals(actual, self._create_file_contents(org, server, log_date))
tar_file.close() tar_file.close()
def _run_task(self, config_filepath): def _run_task(self, config_filepath, **kwargs):
"""Define and run ArchiveExportTask locally in Luigi.""" """Define and run ArchiveExportTask locally in Luigi."""
# Define and run the task. # Define and run the task.
task = ArchiveExportTask( task = ArchiveExportTask(
...@@ -115,6 +115,7 @@ class ArchiveExportTaskTestCase(unittest.TestCase): ...@@ -115,6 +115,7 @@ class ArchiveExportTaskTestCase(unittest.TestCase):
eventlog_output_root=self.src_path, eventlog_output_root=self.src_path,
output_root=self.output_root_path, output_root=self.output_root_path,
temp_dir=self.archive_temp_path, temp_dir=self.archive_temp_path,
**kwargs
) )
worker = luigi.worker.Worker() worker = luigi.worker.Worker()
worker.add(task) worker.add(task)
...@@ -160,3 +161,21 @@ class ArchiveExportTaskTestCase(unittest.TestCase): ...@@ -160,3 +161,21 @@ class ArchiveExportTaskTestCase(unittest.TestCase):
tar_file = tarfile.open(tarfile_path) tar_file = tarfile.open(tarfile_path)
self.assertEquals(len(tar_file.getmembers()), len(SERVERS) * len(DATES)) self.assertEquals(len(tar_file.getmembers()), len(SERVERS) * len(DATES))
tar_file.close() tar_file.close()
def test_limited_orgs(self):
    """When org_id is supplied, only archives for the listed orgs are produced."""
    self._create_input_data(self.src_path)
    config_filepath = self._create_config_file()

    self._run_task(config_filepath, org_id=['edX'])

    # The job should succeed and emit exactly one archive: the one for 'edX'.
    produced = os.listdir(self.output_root_path)
    self.assertEquals(1, len(produced))

    archive_name = produced[0]
    # Filename segments are dash-separated; the fourth segment is the org name.
    self.assertEquals('edX', archive_name.split('-')[3])

    # Verify the archive's contents were tarred correctly.
    self._check_tar_file_contents(os.path.join(self.output_root_path, archive_name))
...@@ -50,7 +50,10 @@ class EventExportTestCase(unittest.TestCase): ...@@ -50,7 +50,10 @@ class EventExportTestCase(unittest.TestCase):
CONFIGURATION = yaml.dump(CONFIG_DICT) CONFIGURATION = yaml.dump(CONFIG_DICT)
def setUp(self): def setUp(self):
self.task = EventExportTask( self.task = self._create_export_task()
def _create_export_task(self, **kwargs):
task = EventExportTask(
mapreduce_engine='local', mapreduce_engine='local',
output_root='test://output/', output_root='test://output/',
config='test://config/default.yaml', config='test://config/default.yaml',
...@@ -58,15 +61,22 @@ class EventExportTestCase(unittest.TestCase): ...@@ -58,15 +61,22 @@ class EventExportTestCase(unittest.TestCase):
environment=['edge', 'prod'], environment=['edge', 'prod'],
interval=Year.parse('2014'), interval=Year.parse('2014'),
gpg_key_dir='test://config/gpg-keys/', gpg_key_dir='test://config/gpg-keys/',
gpg_master_key='skeleton.key@example.com' gpg_master_key='skeleton.key@example.com',
**kwargs
) )
self.task.input_local = MagicMock(return_value=FakeTarget(self.CONFIGURATION)) task.input_local = MagicMock(return_value=FakeTarget(self.CONFIGURATION))
return task
def test_org_whitelist_capture(self): def test_org_whitelist_capture(self):
self.task.init_local() self.task.init_local()
self.assertItemsEqual(self.task.org_id_whitelist, ['FooX', 'BarX', 'BazX', 'bar']) self.assertItemsEqual(self.task.org_id_whitelist, ['FooX', 'BarX', 'BazX', 'bar'])
def test_limited_orgs(self):
    """The org whitelist is narrowed to the intersection with the org_id parameter."""
    limited_task = self._create_export_task(org_id=['FooX', 'bar'])
    limited_task.init_local()
    # Only the requested orgs should survive out of the configured whitelist.
    self.assertItemsEqual(['FooX', 'bar'], limited_task.org_id_whitelist)
def test_server_whitelist_capture(self): def test_server_whitelist_capture(self):
self.task.init_local() self.task.init_local()
self.assertItemsEqual(self.task.server_name_whitelist, [self.SERVER_NAME_1, self.SERVER_NAME_2]) self.assertItemsEqual(self.task.server_name_whitelist, [self.SERVER_NAME_1, self.SERVER_NAME_2])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment