Commit 8395cd3d by Brian Wilson, committed by Gerrit Code Review

Merge "Add marker file output to MultiOutputMapReduceJobTask."

parents d6181177 4d62487d
[hadoop]
version = cdh3
[core]
logging_conf_file=logging.cfg
......
@@ -664,18 +664,18 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask):
Groups answer distributions by course, producing a different file for each.
Most parameters are passed through to :py:class:`AnswerDistributionPerCourse`.
One additional parameter is defined:
Additional parameters are defined by :py:class:`MultiOutputMapReduceJobTask`:
output_root: location where the one-file-per-course outputs
are written. This is distinct from `dest`, which is where
intermediate output is written.
delete_output_root: if True, recursively deletes the output_root at task creation.
"""
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
name = luigi.Parameter(default='periodic')
output_root = luigi.Parameter()
answer_metadata = luigi.Parameter(default=None)
manifest = luigi.Parameter(default=None)
base_input_format = luigi.Parameter(default=None)
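For context, a hedged sketch of how this task might be instantiated; the parameter values below are hypothetical and not taken from this commit:

task = AnswerDistributionOneFilePerCourseTask(
    src='s3://tracking-logs/',                # hypothetical input location
    dest='s3://intermediate/',                # where intermediate output is written
    output_root='s3://reports/answer-dist/',  # one-file-per-course output location
    delete_output_root=True,                  # wipe stale files and the old success marker
)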
......
@@ -25,7 +25,6 @@ log = logging.getLogger(__name__)
FIELD_SIZE_LIMIT = 4 * 1024 * 1024 # 4 MB
csv.field_size_limit(FIELD_SIZE_LIMIT)
# Helpers for the courseware student module table.
STUDENT_MODULE_FIELDS = [
@@ -52,11 +51,9 @@ class StudentModulePerCourseTask(MultiOutputMapReduceJobTask):
Parameters:
dump_root: a URL location of the database dump.
output_root: a URL location where the split files will be stored.
output_suffix: added to the filenames for identification.
"""
dump_root = luigi.Parameter()
output_root = luigi.Parameter()
output_suffix = luigi.Parameter(default=None)
def requires(self):
@@ -113,6 +110,7 @@ class StudentModulePerCourseAfterImportWorkflow(StudentModulePerCourseTask):
dump_root: a URL location of the database dump.
output_root: a URL location where the split files will be stored.
output_suffix: added to the filenames for identification.
delete_output_root: if True, recursively deletes the output_root at task creation.
credentials: Path to the external access credentials file.
num_mappers: The number of map tasks to ask Sqoop to use.
where: A 'where' clause to be passed to Sqoop.
......
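As an aside, the shape of the per-course output path these parameters produce can be inferred from the test expectation further below ('test://output/Sample-Course-ID-courseware_studentmodule-test-analytics.sql'). A minimal sketch of that construction; the helper name is hypothetical, and the 'test-analytics' suffix value is inferred from the expected filename:

def output_path_for_key_sketch(output_root, course_id, output_suffix=None):
    # Replace slashes so the course id is filename-safe.
    filename_safe_course = course_id.replace('/', '-')
    # The optional suffix is appended for identification, per the docstring above.
    suffix = '-{0}'.format(output_suffix) if output_suffix else ''
    return '{0}/{1}-courseware_studentmodule{2}.sql'.format(
        output_root.rstrip('/'), filename_safe_course, suffix)

# output_path_for_key_sketch('test://output', 'Sample/Course/ID', 'test-analytics')
# -> 'test://output/Sample-Course-ID-courseware_studentmodule-test-analytics.sql'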
@@ -8,7 +8,11 @@ import luigi.hdfs
import luigi.hadoop
from luigi import configuration
from edx.analytics.tasks.url import get_target_from_url, IgnoredTarget
from edx.analytics.tasks.url import get_target_from_url, url_path_join
# Name of the marker file that appears in the output directory of a MultiOutputMapReduceJobTask to indicate success.
MARKER_FILENAME = 'job_success'
class MapReduceJobTask(luigi.hadoop.JobTask):
@@ -73,11 +77,16 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
reduce tasks must not write to the same file. Since all values for a given mapper output key are guaranteed to be
processed by the same reduce task, we only allow a single file to be output per key for safety. In the future, the
reducer output key could be used to determine the output file name; for now, only one output file per key is supported.
Parameters:
output_root: a URL location where the split files will be stored.
delete_output_root: if True, recursively deletes the output_root at task creation.
"""
output_root = luigi.Parameter()
delete_output_root = luigi.BooleanParameter(default=False)
def output(self):
# Unfortunately, Luigi requires an output.
return IgnoredTarget()
return get_target_from_url(url_path_join(self.output_root, MARKER_FILENAME))
def reducer(self, key, values):
"""
@@ -104,3 +113,15 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
from this function.
"""
return None
def __init__(self, *args, **kwargs):
super(MultiOutputMapReduceJobTask, self).__init__(*args, **kwargs)
if self.delete_output_root:
# If requested, make sure that the output directory is empty. This gets rid
# of any generated data files from a previous run (that might not get
# regenerated in this run). It also makes sure that the marker file
# (i.e. the output target) will be removed, so that external functionality
# will know that the generation of data files is not complete.
output_dir_target = get_target_from_url(self.output_root)
if output_dir_target.exists():
output_dir_target.remove()
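Taken together, these two changes make a multi-output job restartable: output() now returns a real marker target inside output_root, so Luigi's completeness check looks for the marker file, and delete_output_root removes both stale data files and the old marker. A minimal sketch, using a hypothetical subclass modeled on the TestJobTask defined in the tests below (paths are hypothetical):

import os
from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask

class ReportTask(MultiOutputMapReduceJobTask):
    """Hypothetical subclass, for illustration only."""
    def output_path_for_key(self, key):
        return os.path.join(self.output_root, key)

    def multi_output_reducer(self, key, values, output_file):
        for value in values:
            output_file.write(value)

task = ReportTask(mapreduce_engine='local', output_root='/tmp/report')
print(task.output().path)  # '/tmp/report/job_success', i.e. output_root + MARKER_FILENAME
print(task.complete())     # False until the marker file exists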
"""
Tests for database export tasks
"""
from mock import Mock
from edx.analytics.tasks.database_exports import StudentModulePerCourseTask
from edx.analytics.tasks.database_exports import STUDENT_MODULE_FIELDS
from edx.analytics.tasks.tests import unittest
STATE_MYSQLDUMP = '\'{\\"answer\\": {\\"code\\": \\"print(\\\'hello world\\\')\\\\r\\\\n\\\\t\\", \\"score\\": 1.0}} ' \
'\\"msg\\": \\"\\\\n<div class=\\\\\\"test\\\\\\">\\\\nTest\\\\n</div>\\\\n\\", \\"num\\": 100}\''
@@ -81,11 +79,11 @@ class StudentModulePerCourseTestCase(unittest.TestCase):
expected = 'test://output/Sample-Course-ID-courseware_studentmodule-test-analytics.sql'
self.assertEqual(filename, expected)
def test_empty_output_path(self):
def test_empty_output_suffix(self):
task = StudentModulePerCourseTask(
mapreduce_engine='local',
dump_root='test://dump_root',
output_root='test://output'
output_root='test://output/',
)
course_id = 'Sample/Course/ID'
......
@@ -3,6 +3,9 @@
from __future__ import absolute_import
from mock import patch, call
import os
import tempfile
import shutil
from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask
from edx.analytics.tasks.tests import unittest
@@ -12,14 +15,15 @@ class MultiOutputMapReduceJobTaskTest(unittest.TestCase):
"""Tests for MultiOutputMapReduceJobTask."""
def setUp(self):
self.task = TestJobTask(
mapreduce_engine='local'
)
patcher = patch('edx.analytics.tasks.mapreduce.get_target_from_url')
self.mock_get_target = patcher.start()
self.addCleanup(patcher.stop)
self.task = TestJobTask(
mapreduce_engine='local',
output_root='/any/path',
)
def test_reducer(self):
self.assert_values_written_to_file('foo', ['bar', 'baz'])
@@ -40,11 +44,54 @@ class MultiOutputMapReduceJobTaskTest(unittest.TestCase):
self.assert_values_written_to_file('foo2', ['bar2'])
class MultiOutputMapReduceJobTaskOutputRootTest(unittest.TestCase):
"""Tests for output_root behavior of MultiOutputMapReduceJobTask."""
def setUp(self):
# Use a real output directory, so it can
# be removed if it already exists.
def cleanup(dirname):
"""Remove the temp directory only if it exists."""
if os.path.exists(dirname):
shutil.rmtree(dirname)
self.output_root = tempfile.mkdtemp()
self.addCleanup(cleanup, self.output_root)
def test_no_delete_output_root(self):
self.assertTrue(os.path.exists(self.output_root))
TestJobTask(
mapreduce_engine='local',
output_root=self.output_root,
)
self.assertTrue(os.path.exists(self.output_root))
def test_delete_output_root(self):
# We create a task in order to get the output path.
task = TestJobTask(
mapreduce_engine='local',
output_root=self.output_root,
)
output_marker = task.output().path
open(output_marker, 'a').close()
self.assertTrue(task.complete())
# Once the output marker is created, we can
# confirm that it gets cleaned up.
task = TestJobTask(
mapreduce_engine='local',
output_root=self.output_root,
delete_output_root="true",
)
self.assertFalse(task.complete())
self.assertFalse(os.path.exists(self.output_root))
class TestJobTask(MultiOutputMapReduceJobTask):
"""Dummy task to use for testing."""
def output_path_for_key(self, key):
return '/any/path/' + key
return os.path.join(self.output_root, key)
def multi_output_reducer(self, key, values, output_file):
for value in values:
......