Commit 24c12d74 by Brian Wilson

Add export-archiving task.

* Includes test.
* Includes workflow task to tie it to EventExport task.
* Some cleanup of pylint errors.

Change-Id: I4db83f9ba9fcec9e09a085d3df156af45c0ddbd2
parent 7873b7a0
......@@ -399,7 +399,7 @@ class AnswerDistributionPerCourseMixin(object):
made.
"""
self.answer_metadata_dict = json.load(answer_metadata_file) # pylint: disable=attribute-defined-outside-init
self.answer_metadata_dict = json.load(answer_metadata_file)
def add_metadata_to_answer(self, answer_id, answer):
"""
......
"""
Create archive files, one per organization, containing that organization's log files.
"""
import datetime
import logging
import os
import shutil
import tempfile
import tarfile
import luigi
import yaml
from edx.analytics.tasks.event_exports import EventExportTask
from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask
from edx.analytics.tasks.url import get_target_from_url, url_path_join, ExternalURL, IgnoredTarget
from edx.analytics.tasks.util.tempdir import make_temp_directory
from edx.analytics.tasks.pathutil import PathSetTask
log = logging.getLogger(__name__)
class ArchiveExportTask(MultiOutputMapReduceJobTask):
"""
Groups source URLs by destination organization and outputs to a single tar file per org.
Parameters:
eventlog_output_root: Directory containing the intermediate event-logs-by-org output.
config: URL path to the configuration file that defines organizations and their aliases.
output_root: URL path to the location where output archives are written.
temp_dir: Optional path to a local directory used for building archives.
"""
eventlog_output_root = luigi.Parameter(
default_from_config={'section': 'event-export', 'name': 'output_root'}
)
config = luigi.Parameter(
default_from_config={'section': 'event-export', 'name': 'config'}
)
output_root = luigi.Parameter(
default_from_config={'section': 'archive-event-export', 'name': 'output_root'}
)
temp_dir = luigi.Parameter(default=None)
def requires(self):
return ExternalURL(self.config)
def output(self):
# Because this task writes to a shared directory, we don't
# want to include a marker for job success. Use a special
# target that always triggers new runs and never writes out.
return IgnoredTarget()
def input_hadoop(self):
# Pass the contents of the config file to the mapper, so that the mapper
# gets called, but without a lot of wasted I/O.  We do not want the source
# files (generated by an upstream workflow) to be passed to the mapper,
# so keep this separate from requires().
return get_target_from_url(self.config)
def jobconfs(self):
# Make sure that only one mapper runs. The mapper generates all
# reducer input in a single call, so it should be called once
# from a single mapper node. Appending should override any previous
settings of the parameter (as with the Java command line).
jcs = super(ArchiveExportTask, self).jobconfs()
jcs.append('mapred.map.tasks=1')
return jcs
def init_local(self):
self._get_organization_info()
self.date_label = "{date}".format(date=datetime.date.today())
def init_mapper(self):
self.mapper_done = False
def _generate_log_files_for_org(self, org_name):
"""
A generator that yields log files for the specified org.
Yields tuples of (absolute path, path relative to the org subdirectory in the source).
"""
org_source = url_path_join(self.eventlog_output_root, org_name)
# Only include paths that contain ".log", so that directory names are excluded.
task = PathSetTask(src=org_source, include=['*.log*'])
for target in task.output():
target_url = target.path
relative_url = target_url[len(org_source):].lstrip('/')
yield (target_url, relative_url)
def mapper(self, _line):
# Enforce that this code gets run once and only once.
if self.mapper_done:
return
for org_primary_name in self.organizations:
for target_url, relative_url in self._generate_log_files_for_org(org_primary_name):
yield org_primary_name, (target_url, relative_url)
for alt_name in self.organizations[org_primary_name].get('other_names', []):
for target_url, relative_url in self._generate_log_files_for_org(alt_name):
yield org_primary_name, (target_url, relative_url)
self.mapper_done = True
def extra_modules(self):
return [yaml]
def output_path_for_key(self, key):
primary_org_id = key
tar_name = '{0}-{1}-tracking.tar'.format(self.date_label, primary_org_id)
return url_path_join(self.output_root, tar_name)
def multi_output_reducer(self, key, values, output_file):
log.debug("Calling reducer for key %s.", key)
primary_org_id = key
# Make sure that parent temp directory exists. We're going to leave it around
# when done, because it may be used for other things.
if not os.path.isdir(self.temp_dir):
log.info("Local temporary directory does not exist, creating: %s", self.temp_dir)
os.makedirs(self.temp_dir)
# Create a temp file to contain the archive output locally.
with make_temp_directory(prefix="archive", dir=self.temp_dir) as tar_temp_dir:
temp_tar_filepath = os.path.join(tar_temp_dir, "{org}.tar".format(org=primary_org_id))
# tarfile objects don't support the context-manager protocol in Python 2.6, so close explicitly.
tar_output_file = tarfile.open(temp_tar_filepath, mode='w')
try:
self._write_files_to_tarfile(tar_output_file, values, tar_temp_dir)
finally:
tar_output_file.close()
# When done writing the file locally, move it to its final destination.
with open(temp_tar_filepath, 'r') as src_file:
shutil.copyfileobj(src_file, output_file)
def _get_organization_info(self):
"""Pulls organization configuration out of .yaml file."""
with self.input_hadoop().open() as config_input:
config_data = yaml.load(config_input)
self.organizations = config_data['organizations']
def _write_files_to_tarfile(self, tar_output_file, urls, tar_temp_dir):
"""Writes files to open tarfile, using temp directory for local storage."""
for target_url, relative_url in urls:
log.debug("Processing %s", target_url)
source_target = get_target_from_url(target_url)
# Copy file to local temp file.
with tempfile.NamedTemporaryFile(dir=tar_temp_dir, delete=False) as temp_input_file:
local_temp_filepath = temp_input_file.name
with source_target.open('r') as source_file:
shutil.copyfileobj(source_file, temp_input_file)
# Once we have added the temp file to the tarfile, we don't need it anymore.
tar_output_file.add(name=local_temp_filepath, arcname=relative_url)
os.remove(local_temp_filepath)
class ArchivedEventExportWorkflow(ArchiveExportTask):
"""
Generates archive (tar) files containing log files per organization.
A few parameters are shared by :py:class:`EventExportTask` and :py:class:`ArchiveExportTask`:
eventlog_output_root: Directory to write intermediate event-logs-by-org to.
config: A URL to a YAML file that contains the list of organizations and servers to export events for.
Additional parameters are passed through to :py:class:`ArchiveExportTask`:
output_root: URL path to the location where output archives are written.
temp_dir: Optional path to a local directory used for building archives.
Several parameters are passed through to :py:class:`EventExportTask`. These are:
source: A URL to a path that contains log files that contain the events.
environment: A list of short strings that describe the environment that generated the events. Only include
events from this list of environments.
interval: The range of dates to export logs for.
pattern: A regex with a named capture group for the date that approximates the date that the events within were
emitted. Note that the search interval is expanded, so events don't have to be in exactly the right file
in order for them to be processed.
Additional optional parameters are passed through to :py:class:`MapReduceJobTask`:
mapreduce_engine: 'hadoop' (the default) or 'local'.
n_reduce_tasks: number of reducer tasks to use in upstream tasks.
Additional optional parameters are passed through to :py:class:`MultiOutputMapReduceJobTask`:
delete_output_root: if True, recursively deletes the output_root at task creation.
"""
# Define parameters that are used only for EventExportTask.
source = luigi.Parameter(
default_from_config={'section': 'event-logs', 'name': 'source'}
)
environment = luigi.Parameter(is_list=True, default=['prod', 'edge'])
interval = luigi.DateIntervalParameter()
pattern = luigi.Parameter(default=None)
def requires(self):
return EventExportTask(
output_root=self.eventlog_output_root,
config=self.config,
source=self.source,
environment=self.environment,
interval=self.interval,
pattern=self.pattern,
mapreduce_engine=self.mapreduce_engine,
n_reduce_tasks=self.n_reduce_tasks,
delete_output_root=self.delete_output_root,
)
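Not part of the commit: a minimal sketch of driving the new task from Python with a local Luigi worker, mirroring the pattern used in the unit tests further down. All paths below are hypothetical placeholders.

# Minimal usage sketch (assumes the 'local' mapreduce engine, i.e. no Hadoop cluster);
# the paths are hypothetical placeholders, not values from the commit.
import luigi.worker

from edx.analytics.tasks.archive import ArchiveExportTask

task = ArchiveExportTask(
    mapreduce_engine='local',
    config='/tmp/org_config.yaml',              # YAML file with an 'organizations' mapping
    eventlog_output_root='/tmp/event-export',   # output_root of a prior EventExportTask run
    output_root='/tmp/archives',                # destination for the per-org tar files
    temp_dir='/tmp/archive-scratch',            # local scratch space for building archives
)

worker = luigi.worker.Worker()
worker.add(task)
worker.run()
worker.stop()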
......@@ -9,7 +9,6 @@ import os
import StringIO
import luigi
import luigi.configuration
import luigi.hdfs
import luigi.hadoop
import luigi.task
......@@ -112,6 +111,7 @@ class MapReduceJobRunner(luigi.hadoop.HadoopJobRunner):
jobconfs=job_confs,
)
class EmulatedMapReduceJobRunner(luigi.hadoop.JobRunner):
"""
Execute map reduce tasks in process on the machine that is running luigi.
......@@ -189,7 +189,7 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
The mapper output tuple key is used to determine the name of the file that reducer results are written to. Different
reduce tasks must not write to the same file. Since all values for a given mapper output key are guaranteed to be
processed by the same reduce task, we only allow a single file to be output per key for safety. In the future, the
reducer output key could be used to determine the output file name, however,
reducer output key could also be used to determine the output file name.
Parameters:
output_root: a URL location where the split files will be stored.
......@@ -199,7 +199,7 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
delete_output_root = luigi.BooleanParameter(default=False, significant=False)
def output(self):
marker_base_url = luigi.configuration.get_config().get('map-reduce', 'marker', DEFAULT_MARKER_ROOT)
marker_base_url = configuration.get_config().get('map-reduce', 'marker', DEFAULT_MARKER_ROOT)
marker_url = url_path_join(marker_base_url, str(hash(self)))
return get_target_from_url(marker_url)
......
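To make the contract described in that docstring concrete, here is a minimal sketch of a MultiOutputMapReduceJobTask subclass (hypothetical class and field layout, modeled on the TestJobTask used in the unit tests): each distinct mapper/reducer key is routed to its own file via output_path_for_key().

import os

from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask


class SplitByKeyTask(MultiOutputMapReduceJobTask):
    """Hypothetical example: writes all values for a given key to a single file named after the key."""

    def mapper(self, line):
        # Key on the first tab-separated field; the remainder is the value.
        key, value = line.split('\t', 1)
        yield key, value

    def output_path_for_key(self, key):
        # Exactly one output file per distinct mapper key, under output_root.
        return os.path.join(self.output_root, key)

    def multi_output_reducer(self, _key, values, output_file):
        # All values for this key land in the single file chosen above.
        for value in values:
            output_file.write(value + '\n')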
......@@ -8,7 +8,7 @@ Supports outputs to HDFS, S3, and local FS.
import boto
import datetime
import glob
import fnmatch
import logging
import os
import re
......@@ -47,21 +47,22 @@ class PathSetTask(luigi.Task):
self.s3_conn = None
def generate_file_list(self):
"""Yield each individual path given a source folder and a set of glob expressions."""
"""Yield each individual path given a source folder and a set of file-matching expressions."""
if self.src.startswith('s3'):
# connect lazily as needed:
if self.s3_conn is None:
self.s3_conn = boto.connect_s3()
for _bucket, root, path in generate_s3_sources(self.s3_conn, self.src, self.include):
for _bucket, _root, path in generate_s3_sources(self.s3_conn, self.src, self.include):
source = url_path_join(self.src, path)
yield ExternalURL(source)
else:
filelist = []
for include_val in self.include:
glob_pattern = "{src}/{include}".format(src=self.src, include=include_val)
filelist.extend(glob.glob(glob_pattern))
for filepath in filelist:
yield ExternalURL(filepath)
# Apply the include patterns to the relative path below the src directory.
for dirpath, _dirnames, files in os.walk(self.src):
for filename in files:
filepath = os.path.join(dirpath, filename)
relpath = os.path.relpath(filepath, self.src)
if any(fnmatch.fnmatch(relpath, include_val) for include_val in self.include):
yield ExternalURL(filepath)
def manifest_file_list(self):
"""Write each individual path to a manifest file and yield the path to that file."""
......
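One behavioral note on the glob-to-fnmatch switch above: fnmatch's '*' also matches path separators, so an include pattern such as '*.log*' now matches files at any depth below src. A small standalone sketch with made-up paths (not part of the commit):

import fnmatch

# Relative paths as os.walk() would yield them below src (hypothetical names).
relative_paths = [
    '2014-05-09_edX.log',
    'prod-edxapp-011/2014-05-09_edX.log.gz',
    'prod-edxapp-011/manifest.txt',
]
include = ['*.log*']
matches = [path for path in relative_paths
           if any(fnmatch.fnmatch(path, pattern) for pattern in include)]
# matches == ['2014-05-09_edX.log', 'prod-edxapp-011/2014-05-09_edX.log.gz']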
......@@ -77,13 +77,17 @@ def generate_s3_sources(s3_conn, source, patterns):
bucket = s3_conn.get_bucket(bucket_name)
# Make sure that the listing is done on a "folder" boundary,
# since list() just looks for matching prefixes.
root_with_slash = root + '/' if root[-1] != '/' else root
# Skip keys that have zero size.  This skips directories,
# but unfortunately also skips legitimate zero-length files.
keys = (s.key for s in bucket.list(root) if s.size > 0)
keys = (s.key for s in bucket.list(root_with_slash) if s.size > 0)
# Make paths relative by removing root
paths = (k[len(root):].lstrip('/') for k in keys)
paths = (k[len(root_with_slash):].lstrip('/') for k in keys)
# Filter only paths that match the include patterns
paths = _filter_matches(patterns, paths)
......
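The trailing-slash normalization above matters because bucket.list() does plain prefix matching: listing the bare prefix 'data/2014' would also return keys under a sibling such as 'data/2014-archive/'. A short standalone sketch of the normalization and relative-path logic, using made-up key names:

root = 'data/2014'
root_with_slash = root + '/' if root[-1] != '/' else root

keys = [
    'data/2014/prod-edxapp-011/tracking.log',
    'data/2014-archive/old.log',
]
# With the bare root as prefix, both keys would match; with the slash, only the first does.
matched = [key for key in keys if key.startswith(root_with_slash)]
paths = [key[len(root_with_slash):].lstrip('/') for key in matched]
# paths == ['prod-edxapp-011/tracking.log']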
......@@ -62,11 +62,11 @@ class AnswerDistributionAcceptanceTest(AcceptanceTestCase):
'--user', self.config.get('connection_user'),
'AnswerDistributionOneFilePerCourseTask',
'--local-scheduler',
'--src', self.test_src,
'--src', self.test_src,
'--dest', url_path_join(self.test_root, 'dst'),
'--name', 'test',
'--output-root', self.test_out,
'--include', '"*"',
'--include', '"*"',
'--manifest', url_path_join(self.test_root, 'manifest.txt'),
'--base-input-format', self.INPUT_FORMAT,
'--lib-jar', self.oddjob_jar,
......@@ -90,4 +90,4 @@ class AnswerDistributionAcceptanceTest(AcceptanceTestCase):
# Check that at least one of the count columns is non-zero
get_count = lambda line: int(line.split(',')[3])
self.assertTrue(any(get_count(l) > 0 for l in lines ))
self.assertTrue(any(get_count(l) > 0 for l in lines))
......@@ -150,7 +150,6 @@ class ExportAcceptanceTest(AcceptanceTestCase):
cursor.execute(line)
def run_export_task(self):
"""
Preconditions: Populated courseware_studentmodule table in the MySQL database.
......
"""Tests of utilities to archive files."""
import os
import shutil
import tempfile
import tarfile
import luigi.configuration
import luigi.worker
import yaml
from edx.analytics.tasks.archive import ArchiveExportTask
from edx.analytics.tasks.tests import unittest
# Define test data.
SERVERS = ['prod-edge-edxapp-001', 'prod-edxapp-011']
DATES = ['2014-05-09', '2014-05-10']
TEST_ORGS = {
'edX': ['edX'],
'HSchoolX': ['HSchoolX', 'HKSchool', 'HSchool'],
}
class ArchiveExportTaskTestCase(unittest.TestCase):
"""Tests of ArchiveExportTask."""
def setUp(self):
# Set up temporary root directory that will get cleaned up.
def cleanup(dirname):
"""Remove the temp directory only if it exists."""
if os.path.exists(dirname):
shutil.rmtree(dirname)
self.temp_rootdir = tempfile.mkdtemp()
self.addCleanup(cleanup, self.temp_rootdir)
# Set up input and output directories. Define but don't create
# the output directories.
self.src_path = os.path.join(self.temp_rootdir, "src")
os.mkdir(self.src_path)
self.output_root_path = os.path.join(self.temp_rootdir, "output")
self.archive_temp_path = os.path.join(self.temp_rootdir, "temp")
def _create_config_file(self):
"""
Create a .yaml file that contains organization data in the form used by exports.
Assumes that organization names are in mixed-case, not all-lowercase.
"""
config_filepath = os.path.join(self.temp_rootdir, "config.yaml")
org_data = {'organizations': {}}
for org_name in TEST_ORGS:
org_dict = {'recipient': "person@{org}.org".format(org=org_name.lower())}
others = [org for org in TEST_ORGS[org_name] if org != org_name]
if others:
org_dict['other_names'] = others
org_data['organizations'][org_name] = org_dict
with open(config_filepath, 'w') as config_file:
yaml.dump(org_data, config_file)
return config_filepath
def _create_file_contents(self, org, server, log_date):
"""Create file contents specific to a given org, server, and log date."""
return "This log was written for {org} on {server} on {date}\n".format(org=org, server=server, date=log_date)
def _create_input_data(self, src_path, orgs=None):
"""Create tar files in the input directory."""
if orgs is None:
orgs = TEST_ORGS
for org_key in orgs:
for org in orgs[org_key]:
org_dir = os.path.join(src_path, org)
os.mkdir(org_dir)
for server in SERVERS:
server_dir = os.path.join(org_dir, server)
os.mkdir(server_dir)
for log_date in DATES:
log_filepath = os.path.join(server_dir, "{date}_{org}.log".format(date=log_date, org=org))
with open(log_filepath, 'w') as log_file:
log_file.write(self._create_file_contents(org, server, log_date))
def _parse_log_file_name(self, logfile_name):
"""
Extract parameters from log file name.
Expects a name of the form "prod-edge-edxapp-001/2014-05-09_HSchoolX.log".
"""
server, name = logfile_name.split('/')
date, org = name.split('_')
org = org[:-4]  # strip the ".log" extension
return org, server, date
def _check_tar_file_contents(self, tarfile_path):
"""Confirm that tar file contains the expected input."""
self.assertTrue(tarfile.is_tarfile(tarfile_path))
# Output names have the form "{YYYY}-{MM}-{DD}-{org}-tracking.tar", so index 3 of the '-'-split is the org.
org_name = tarfile_path.split('-')[3]
self.assertIn(org_name, TEST_ORGS)
tar_file = tarfile.open(tarfile_path)
for member_info in tar_file.getmembers():
org, server, log_date = self._parse_log_file_name(member_info.name)
member_file = tar_file.extractfile(member_info)
actual = member_file.read()
self.assertIn(org, TEST_ORGS[org_name])
self.assertEquals(actual, self._create_file_contents(org, server, log_date))
tar_file.close()
def _run_task(self, config_filepath):
"""Define and run ArchiveExportTask locally in Luigi."""
# Define and run the task.
task = ArchiveExportTask(
mapreduce_engine='local',
config=config_filepath,
eventlog_output_root=self.src_path,
output_root=self.output_root_path,
temp_dir=self.archive_temp_path,
)
worker = luigi.worker.Worker()
worker.add(task)
worker.run()
worker.stop()
def test_normal_task(self):
self._create_input_data(self.src_path)
config_filepath = self._create_config_file()
self._run_task(config_filepath)
# Confirm that the temp directory was created as needed, but was left empty.
self.assertTrue(os.path.isdir(self.archive_temp_path))
self.assertEquals(os.listdir(self.archive_temp_path), [])
# Confirm that the job succeeded.
output_files = os.listdir(self.output_root_path)
# Confirm that the output files were correctly tarred.
for output_file in output_files:
tarfile_path = os.path.join(self.output_root_path, output_file)
self._check_tar_file_contents(tarfile_path)
def test_orgs_with_wrong_case(self):
# Create data for orgs that differ only by case.
test_orgs = {
'HSchoolX': ['HSchoolx', 'HSchoolX'],
}
self._create_input_data(self.src_path, orgs=test_orgs)
config_filepath = self._create_config_file()
self._run_task(config_filepath)
# Confirm that output files were tarred for only
# one of the test orgs (even though the config file
# contained more).
output_files = os.listdir(self.output_root_path)
self.assertEquals(len(output_files), 1)
output_file = output_files[0]
tarfile_path = os.path.join(self.output_root_path, output_file)
self._check_tar_file_contents(tarfile_path)
tar_file = tarfile.open(tarfile_path)
self.assertEquals(len(tar_file.getmembers()), len(SERVERS) * len(DATES))
tar_file.close()
......@@ -3,7 +3,6 @@ Tests for event export tasks
"""
import datetime
from textwrap import dedent
from cStringIO import StringIO
from luigi.date_interval import Year
......
......@@ -174,6 +174,6 @@ class TestJobTask(MultiOutputMapReduceJobTask):
def output_path_for_key(self, key):
return os.path.join(self.output_root, key)
def multi_output_reducer(self, key, values, output_file):
def multi_output_reducer(self, _key, values, output_file):
for value in values:
output_file.write(value + '\n')
"""Tests for S3--related utility functionality."""
from mock import MagicMock
from mock import patch
from mock import sentinel
import luigi
import luigi.format
import luigi.hdfs
import luigi.s3
from edx.analytics.tasks import s3_util
from edx.analytics.tasks.tests import unittest
......@@ -16,12 +9,14 @@ class GenerateS3SourcesTestCase(unittest.TestCase):
"""Tests for generate_s3_sources()."""
def _make_key(self, keyname, size):
"""Makes a dummy key object, providing the necessary accessors."""
s3_key = MagicMock()
s3_key.key = keyname
s3_key.size = size
return s3_key
def _make_s3_generator(self, bucket_name, root, path_info, patterns):
"""Generates a list of matching S3 sources using a mock S3 connection."""
s3_conn = MagicMock()
s3_bucket = MagicMock()
s3_conn.get_bucket = MagicMock(return_value=s3_bucket)
......@@ -31,7 +26,7 @@ class GenerateS3SourcesTestCase(unittest.TestCase):
print [(k.key, k.size) for k in target_list]
s3_bucket.name = bucket_name
source = "s3://{bucket}/{root}/".format(bucket=bucket_name, root=root)
source = "s3://{bucket}/{root}".format(bucket=bucket_name, root=root)
generator = s3_util.generate_s3_sources(s3_conn, source, patterns)
output = list(generator)
return output
......@@ -73,6 +68,21 @@ class GenerateS3SourcesTestCase(unittest.TestCase):
self.assertEquals(len(output), 1)
self.assertEquals(output, [(bucket_name, root, "subdir1/path1")])
def test_generate_with_trailing_slash(self):
bucket_name = "bucket_name"
root = "root1/root2/"
path_info = {
"subdir1/path1": 1000,
"path2": 2000,
}
patterns = ['*']
output = self._make_s3_generator(bucket_name, root, path_info, patterns)
self.assertEquals(len(output), 2)
self.assertEquals(set(output), set([
(bucket_name, root.rstrip('/'), "subdir1/path1"),
(bucket_name, root.rstrip('/'), "path2")
]))
class ScalableS3ClientTestCase(unittest.TestCase):
"""Tests for ScalableS3Client class."""
......@@ -81,6 +91,7 @@ class ScalableS3ClientTestCase(unittest.TestCase):
self.client = s3_util.ScalableS3Client()
def _assert_get_chunk_specs(self, source_size_bytes, expected_num_chunks, expected_chunk_size):
"""Asserts that _get_chunk_specs returns the expected values."""
number_of_chunks, bytes_per_chunk = self.client._get_chunk_specs(source_size_bytes)
self.assertEquals(number_of_chunks, expected_num_chunks)
self.assertEquals(bytes_per_chunk, expected_chunk_size)
......
......@@ -31,6 +31,7 @@ edx.analytics.tasks =
export-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseAfterImportWorkflow
last-country = edx.analytics.tasks.user_location:LastCountryForEachUser
export-events = edx.analytics.tasks.event_exports:EventExportTask
archive-exports = edx.analytics.tasks.archive:ArchiveExportTask
mapreduce.engine =
hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner
......
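For context, task names in the edx.analytics.tasks group are presumably resolved via entry points at launch time; a minimal sketch (assuming pkg_resources and an installed package) of how the new 'archive-exports' entry would be looked up:

import pkg_resources

# Hypothetical lookup of the task class registered above (assumes the package is installed).
for entry_point in pkg_resources.iter_entry_points('edx.analytics.tasks', name='archive-exports'):
    task_class = entry_point.load()
    print task_class  # -> edx.analytics.tasks.archive.ArchiveExportTask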