Commit 4d62487d by Brian Wilson

Add marker file output to MultiOutputMapReduceJobTask.

Includes deleting the output_root directory at task creation.

Change-Id: I0ba51f43e79e5794fd50ddf8000ea6c60d72590f
parent 13e60cca
+[hadoop]
+version = cdh3
 [core]
 logging_conf_file=logging.cfg
...
@@ -664,18 +664,18 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask):
     Groups answer distributions by course, producing a different file for each.

     Most parameters are passed through to :py:class:`AnswerDistributionPerCourse`.
-    One additional parameter is defined:
+    Additional parameters are defined by :py:class:`MultiOutputMapReduceJobTask`:

         output_root: location where the one-file-per-course outputs
             are written.  This is distinct from `dest`, which is where
             intermediate output is written.
+        delete_output_root: if True, recursively deletes the output_root at task creation.
     """
     src = luigi.Parameter()
     dest = luigi.Parameter()
     include = luigi.Parameter(is_list=True, default=('*',))
     name = luigi.Parameter(default='periodic')
-    output_root = luigi.Parameter()
     answer_metadata = luigi.Parameter(default=None)
     manifest = luigi.Parameter(default=None)
     base_input_format = luigi.Parameter(default=None)
...
@@ -25,7 +25,6 @@ log = logging.getLogger(__name__)
 FIELD_SIZE_LIMIT = 4 * 1024 * 1024  # 4 MB
 csv.field_size_limit(FIELD_SIZE_LIMIT)
-
 # Helpers for the courseware student module table.
 STUDENT_MODULE_FIELDS = [
@@ -52,11 +51,9 @@ class StudentModulePerCourseTask(MultiOutputMapReduceJobTask):
     Parameters:
         dump_root: a URL location of the database dump.
-        output_root: a URL location where the split files will be stored.
         output_suffix: added to the filenames for identification.
     """
     dump_root = luigi.Parameter()
-    output_root = luigi.Parameter()
     output_suffix = luigi.Parameter(default=None)

     def requires(self):
@@ -113,6 +110,7 @@ class StudentModulePerCourseAfterImportWorkflow(StudentModulePerCourseTask):
         dump_root: a URL location of the database dump.
         output_root: a URL location where the split files will be stored.
         output_suffix: added to the filenames for identification.
+        delete_output_root: if True, recursively deletes the output_root at task creation.
         credentials: Path to the external access credentials file.
         num_mappers: The number of map tasks to ask Sqoop to use.
         where: A 'where' clause to be passed to Sqoop.
...
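Since `output_root` and `delete_output_root` are now declared once on `MultiOutputMapReduceJobTask` (see the mapreduce.py hunks below), subclasses such as `StudentModulePerCourseTask` accept both without redeclaring anything. A hypothetical instantiation, with placeholder URLs:

```python
# Hypothetical usage; the URLs are placeholders, the parameter names are from this diff.
from edx.analytics.tasks.database_exports import StudentModulePerCourseTask

task = StudentModulePerCourseTask(
    mapreduce_engine='local',
    dump_root='s3://mybucket/dumps/',       # placeholder URL
    output_root='s3://mybucket/exports/',   # placeholder URL
    delete_output_root=True,                # inherited from MultiOutputMapReduceJobTask
)
```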
@@ -8,7 +8,11 @@
 import luigi.hdfs
 import luigi.hadoop
 from luigi import configuration
-from edx.analytics.tasks.url import get_target_from_url, IgnoredTarget
+from edx.analytics.tasks.url import get_target_from_url, url_path_join
+
+# Name of marker file to appear in output directory of MultiOutputMapReduceJobTask to indicate success.
+MARKER_FILENAME = 'job_success'


 class MapReduceJobTask(luigi.hadoop.JobTask):
@@ -73,11 +77,16 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
     reduce tasks must not write to the same file.  Since all values for a given mapper output key are guaranteed to be
     processed by the same reduce task, we only allow a single file to be output per key for safety.  In the future, the
     reducer output key could be used to determine the output file name, however,

+    Parameters:
+        output_root: a URL location where the split files will be stored.
+        delete_output_root: if True, recursively deletes the output_root at task creation.
     """
+    output_root = luigi.Parameter()
+    delete_output_root = luigi.BooleanParameter(default=False)

     def output(self):
-        # Unfortunately, Luigi requires an output.
-        return IgnoredTarget()
+        return get_target_from_url(url_path_join(self.output_root, MARKER_FILENAME))

     def reducer(self, key, values):
         """
@@ -104,3 +113,15 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
         from this function.
         """
         return None
+
+    def __init__(self, *args, **kwargs):
+        super(MultiOutputMapReduceJobTask, self).__init__(*args, **kwargs)
+        if self.delete_output_root:
+            # If requested, make sure that the output directory is empty.  This gets rid
+            # of any generated data files from a previous run (that might not get
+            # regenerated in this run).  It also makes sure that the marker file
+            # (i.e. the output target) will be removed, so that external functionality
+            # will know that the generation of data files is not complete.
+            output_dir_target = get_target_from_url(self.output_root)
+            if output_dir_target.exists():
+                output_dir_target.remove()
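For readers unfamiliar with Luigi's completion model, the marker file matters because a task is judged complete when its output target exists: writing `job_success` after all reducers finish signals that every per-key file was generated, and removing it (as `__init__` does when `delete_output_root` is set) forces a re-run. A minimal sketch of that contract, assuming Luigi's default `complete()` behavior:

```python
# Minimal sketch, not part of this commit: Luigi's default complete() simply
# checks output().exists(), so creating or deleting the marker file toggles
# the task between complete and incomplete.
import os
import luigi

class MarkerBackedTask(luigi.Task):
    output_root = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(os.path.join(self.output_root, 'job_success'))

    def run(self):
        # ... write the real data files under output_root here ...
        with self.output().open('w') as marker:
            marker.write('')  # the file's existence is the success signal
```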
""" """
Tests for database export tasks Tests for database export tasks
""" """
from mock import Mock
from edx.analytics.tasks.database_exports import StudentModulePerCourseTask from edx.analytics.tasks.database_exports import StudentModulePerCourseTask
from edx.analytics.tasks.database_exports import STUDENT_MODULE_FIELDS from edx.analytics.tasks.database_exports import STUDENT_MODULE_FIELDS
from edx.analytics.tasks.tests import unittest from edx.analytics.tasks.tests import unittest
from mock import Mock
STATE_MYSQLDUMP = '\'{\\"answer\\": {\\"code\\": \\"print(\\\'hello world\\\')\\\\r\\\\n\\\\t\\", \\"score\\": 1.0}} ' \ STATE_MYSQLDUMP = '\'{\\"answer\\": {\\"code\\": \\"print(\\\'hello world\\\')\\\\r\\\\n\\\\t\\", \\"score\\": 1.0}} ' \
'\\"msg\\": \\"\\\\n<div class=\\\\\\"test\\\\\\">\\\\nTest\\\\n</div>\\\\n\\", \\"num\\": 100}\'' '\\"msg\\": \\"\\\\n<div class=\\\\\\"test\\\\\\">\\\\nTest\\\\n</div>\\\\n\\", \\"num\\": 100}\''
@@ -81,11 +79,11 @@ class StudentModulePerCourseTestCase(unittest.TestCase):
         expected = 'test://output/Sample-Course-ID-courseware_studentmodule-test-analytics.sql'
         self.assertEqual(filename, expected)

-    def test_empty_output_path(self):
+    def test_empty_output_suffix(self):
         task = StudentModulePerCourseTask(
             mapreduce_engine='local',
             dump_root='test://dump_root',
-            output_root='test://output'
+            output_root='test://output/',
         )

         course_id = 'Sample/Course/ID'
...
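The trailing slash added to `output_root` in `test_empty_output_suffix` presumably exercises the `url_path_join` helper imported in mapreduce.py above; under the assumption that it joins URL segments the way `os.path.join` joins filesystem paths (without doubling separators), both spellings should produce the same output path:

```python
# Hedged sketch: the exact semantics of url_path_join are inferred here,
# not shown in this diff.
from edx.analytics.tasks.url import url_path_join

url_path_join('test://output', 'file.sql')   # expected: 'test://output/file.sql'
url_path_join('test://output/', 'file.sql')  # expected: 'test://output/file.sql'
```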
@@ -3,6 +3,9 @@
 from __future__ import absolute_import

 from mock import patch, call
+import os
+import tempfile
+import shutil

 from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask
 from edx.analytics.tasks.tests import unittest
@@ -12,14 +15,15 @@ class MultiOutputMapReduceJobTaskTest(unittest.TestCase):
     """Tests for MultiOutputMapReduceJobTask."""

     def setUp(self):
-        self.task = TestJobTask(
-            mapreduce_engine='local'
-        )
         patcher = patch('edx.analytics.tasks.mapreduce.get_target_from_url')
         self.mock_get_target = patcher.start()
         self.addCleanup(patcher.stop)

+        self.task = TestJobTask(
+            mapreduce_engine='local',
+            output_root='/any/path',
+        )

     def test_reducer(self):
         self.assert_values_written_to_file('foo', ['bar', 'baz'])
@@ -40,11 +44,54 @@ class MultiOutputMapReduceJobTaskTest(unittest.TestCase):
         self.assert_values_written_to_file('foo2', ['bar2'])


+class MultiOutputMapReduceJobTaskOutputRootTest(unittest.TestCase):
+    """Tests for output_root behavior of MultiOutputMapReduceJobTask."""
+
+    def setUp(self):
+        # Define a real output directory, so that it can
+        # be removed if it already exists.
+        def cleanup(dirname):
+            """Remove the temp directory only if it exists."""
+            if os.path.exists(dirname):
+                shutil.rmtree(dirname)
+
+        self.output_root = tempfile.mkdtemp()
+        self.addCleanup(cleanup, self.output_root)
+
+    def test_no_delete_output_root(self):
+        self.assertTrue(os.path.exists(self.output_root))
+        TestJobTask(
+            mapreduce_engine='local',
+            output_root=self.output_root,
+        )
+        self.assertTrue(os.path.exists(self.output_root))
+
+    def test_delete_output_root(self):
+        # We create a task in order to get the output path.
+        task = TestJobTask(
+            mapreduce_engine='local',
+            output_root=self.output_root,
+        )
+        output_marker = task.output().path
+        open(output_marker, 'a').close()
+        self.assertTrue(task.complete())
+
+        # Once the output path is created, we can
+        # then confirm that it gets cleaned up.
+        task = TestJobTask(
+            mapreduce_engine='local',
+            output_root=self.output_root,
+            delete_output_root="true",
+        )
+        self.assertFalse(task.complete())
+        self.assertFalse(os.path.exists(self.output_root))
+

 class TestJobTask(MultiOutputMapReduceJobTask):
     """Dummy task to use for testing."""

     def output_path_for_key(self, key):
-        return '/any/path/' + key
+        return os.path.join(self.output_root, key)

     def multi_output_reducer(self, key, values, output_file):
         for value in values:
...
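One nuance in `test_delete_output_root` above: `delete_output_root` is passed as the string `"true"` rather than `True`. Whether Luigi parses that string or simply treats any non-empty string as truthy, the `if self.delete_output_root:` branch in `__init__` fires, so the marker file and any stale data files are removed and `complete()` returns False, as the assertions expect. A tiny sketch of the truthiness point (an assumption about in-process parameter handling, not something this diff asserts):

```python
# Sketch: when a task is constructed in-process, a string value like "true"
# is at minimum truthy, so the deletion branch runs just as it would for True.
for flag in (True, "true"):
    if flag:  # same shape as `if self.delete_output_root:` in __init__
        print("output_root would be removed for flag=%r" % (flag,))
```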