Commit 7832943b by Carlos Andrés Rocha

Use Hadoop counters in the event export archive task as a keep-alive.

Change-Id: I0658a81486b5007c2e71697768cdc7e67090c3f1
parent f8265ccd
@@ -42,6 +42,16 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask):
     )
     temp_dir = luigi.Parameter(default=None)
 
+    # Force this job to flush each counter increment instead of
+    # batching them. The task does not output data directly through
+    # Hadoop. Since there is no other way for Hadoop to verify that
+    # the task is progressing, we use counters as a keep-alive.
+    batch_counter_default = 1
+
+    # For the reason described above, we copy to S3 in chunks,
+    # incrementing a counter between chunks. This is the size of each chunk.
+    chunk_size = 16 * 1024 * 1024  # 16 MB
+
     def requires(self):
         return ExternalURL(self.config)
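For context: counters in a Hadoop Streaming job (which is typically how luigi runs a task like this) are reported by writing specially formatted lines to stderr, and each line that reaches the tasktracker also counts as a sign of life. A minimal sketch of what `batch_counter_default = 1` effectively buys; the function name is illustrative, since luigi's `incr_counter` handles this internally:

import sys

def report_counter(group, counter, amount):
    # Hadoop Streaming convention: a counter update is a line on stderr of
    # the form 'reporter:counter:<group>,<counter>,<amount>'. Writing and
    # flushing it immediately, rather than batching, means every increment
    # also resets Hadoop's task progress timeout -- the keep-alive effect
    # the comment above describes.
    sys.stderr.write('reporter:counter:%s,%s,%d\n' % (group, counter, amount))
    sys.stderr.flush()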
@@ -126,6 +136,7 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask):
         # Create a temp file to contain the archive output locally.
         with make_temp_directory(prefix="archive", dir=self.temp_dir) as tar_temp_dir:
             temp_tar_filepath = os.path.join(tar_temp_dir, "{org}.tar".format(org=primary_org_id))
+
             # Python 2.6 doesn't support the context manager form, so do it ourselves.
             tar_output_file = tarfile.open(temp_tar_filepath, mode='w')
             try:
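The "Python 2.6" comment refers to `tarfile.TarFile` only gaining context manager support in Python 2.7. A small illustration of the two forms; the paths and function name here are placeholders:

import tarfile

def write_tar(tar_path, member_path):
    # Python 2.7+ would allow:
    #     with tarfile.open(tar_path, mode='w') as tar:
    #         tar.add(member_path)
    # On Python 2.6 the archive must be closed explicitly, as the diff does:
    tar = tarfile.open(tar_path, mode='w')
    try:
        tar.add(member_path)
    finally:
        tar.close()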
@@ -135,7 +146,7 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask):
 
             # When done writing the file locally, move it to its final destination.
             with open(temp_tar_filepath, 'r') as src_file:
-                shutil.copyfileobj(src_file, output_file)
+                self._copy_file(src_file, output_file)
 
     def _get_organization_info(self):
         """Pulls organization configuration out of .yaml file."""
@@ -153,11 +164,26 @@ class ArchiveExportTask(MultiOutputMapReduceJobTask):
             with tempfile.NamedTemporaryFile(dir=tar_temp_dir, delete=False) as temp_input_file:
                 local_temp_filepath = temp_input_file.name
                 with source_target.open('r') as source_file:
-                    shutil.copyfileobj(source_file, temp_input_file)
+                    self._copy_file(source_file, temp_input_file)
 
             # Once we have added the temp file to the tarfile, we don't need it anymore.
             tar_output_file.add(name=local_temp_filepath, arcname=relative_url)
             os.remove(local_temp_filepath)
+
+            # See comment for `batch_counter_default` above.
+            self.incr_counter('Archive exported events', 'Files archived', 1)
+
+    def _copy_file(self, source, destination):
+        """Copy one file object to another in chunks of a predetermined size."""
+        while True:
+            data = source.read(self.chunk_size)
+            if data:
+                # See comment for `batch_counter_default` above.
+                self.incr_counter('Archive exported events', 'Bytes transferred', len(data))
+                destination.write(data)
+            else:
+                break
+
 
 class ArchivedEventExportWorkflow(ArchiveExportTask):
     """
......
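For readers who want the chunked copy as a standalone utility: the `_copy_file` loop above is the classic read-in-chunks pattern, and can equivalently be written with the two-argument `iter` form. The `progress` callback below is an illustrative stand-in for the counter increment, not part of the task's API:

import functools

def copy_in_chunks(source, destination, chunk_size=16 * 1024 * 1024, progress=None):
    # iter(callable, sentinel) keeps calling source.read(chunk_size) until it
    # returns the sentinel -- the empty string at EOF for a Python 2 file
    # opened in text mode, as in the diff above (use b'' for binary files).
    for chunk in iter(functools.partial(source.read, chunk_size), ''):
        destination.write(chunk)
        if progress is not None:
            progress(len(chunk))  # e.g. an incr_counter call per chunk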