Commit d23b6a98 by Brian Wilson

Change AnswerDistributionOneFilePerCourseTask to not write a marker file.

Change-Id: I89a9a3882cdc4199de16272c1577374f61b2250f
parent 8395cd3d
...@@ -15,7 +15,7 @@ import luigi.s3 ...@@ -15,7 +15,7 @@ import luigi.s3
import edx.analytics.tasks.util.eventlog as eventlog import edx.analytics.tasks.util.eventlog as eventlog
from edx.analytics.tasks.mapreduce import MapReduceJobTask, MultiOutputMapReduceJobTask from edx.analytics.tasks.mapreduce import MapReduceJobTask, MultiOutputMapReduceJobTask
from edx.analytics.tasks.pathutil import PathSetTask from edx.analytics.tasks.pathutil import PathSetTask
from edx.analytics.tasks.url import ExternalURL from edx.analytics.tasks.url import ExternalURL, IgnoredTarget
from edx.analytics.tasks.url import get_target_from_url, url_path_join from edx.analytics.tasks.url import get_target_from_url, url_path_join
import logging import logging
...@@ -680,6 +680,12 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask): ...@@ -680,6 +680,12 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask):
manifest = luigi.Parameter(default=None) manifest = luigi.Parameter(default=None)
base_input_format = luigi.Parameter(default=None) base_input_format = luigi.Parameter(default=None)
def output(self):
    """
    Return an IgnoredTarget instead of a success-marker target.

    This task writes its results into a shared directory, so no marker
    for job success should be placed there.
    """
    # IgnoredTarget is a special target that always triggers new runs
    # and never writes anything out.
    return IgnoredTarget()
def requires(self): def requires(self):
return AnswerDistributionPerCourse( return AnswerDistributionPerCourse(
mapreduce_engine=self.mapreduce_engine, mapreduce_engine=self.mapreduce_engine,
......
...@@ -5,6 +5,9 @@ Tests for tasks that calculate answer distributions. ...@@ -5,6 +5,9 @@ Tests for tasks that calculate answer distributions.
import json import json
import StringIO import StringIO
import hashlib import hashlib
import os
import tempfile
import shutil
from mock import Mock, call from mock import Mock, call
...@@ -807,3 +810,62 @@ class AnswerDistributionOneFilePerCourseTaskTest(unittest.TestCase): ...@@ -807,3 +810,62 @@ class AnswerDistributionOneFilePerCourseTaskTest(unittest.TestCase):
output_path = task.output_path_for_key(course_id) output_path = task.output_path_for_key(course_id)
expected_output_path = '/tmp/{0}/foo_bar_baz_answer_distribution.csv'.format(hashed_course_id) expected_output_path = '/tmp/{0}/foo_bar_baz_answer_distribution.csv'.format(hashed_course_id)
self.assertEquals(output_path, expected_output_path) self.assertEquals(output_path, expected_output_path)
class AnswerDistributionOneFilePerCourseTaskOutputRootTest(unittest.TestCase):
"""Tests for output_root behavior of AnswerDistributionOneFilePerCourseTask."""
def setUp(self):
# Define a real output directory, so it can
# be removed if existing.
def cleanup(dirname):
"""Remove the temp directory only if it exists."""
if os.path.exists(dirname):
shutil.rmtree(dirname)
self.output_root = tempfile.mkdtemp()
self.addCleanup(cleanup, self.output_root)
def test_no_delete_output_root(self):
# Not using the delete_output_root option will
# not delete the output_root.
self.assertTrue(os.path.exists(self.output_root))
AnswerDistributionOneFilePerCourseTask(
mapreduce_engine='local',
src=None,
dest=None,
name='name',
include=None,
output_root=self.output_root,
)
self.assertTrue(os.path.exists(self.output_root))
def test_delete_output_root(self):
# We create a task in order to get the output path.
task = AnswerDistributionOneFilePerCourseTask(
mapreduce_engine='local',
src=None,
dest=None,
name='name',
include=None,
output_root=self.output_root,
)
# Write to the output path will not mark this task
# as complete.
output_marker = task.output().path
open(output_marker, 'a').close()
self.assertFalse(task.complete())
# But it's still possible to use the delete option
# to get rid of the output_root directory.
task = AnswerDistributionOneFilePerCourseTask(
mapreduce_engine='local',
src=None,
dest=None,
name='name',
include=None,
output_root=self.output_root,
delete_output_root="true",
)
self.assertFalse(task.complete())
self.assertFalse(os.path.exists(self.output_root))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment