Commit b9a9cad9 by Brian Wilson

Get answer distributions to run as a remote task.

* Log unknown answer_id values that should be hidden.
* Fix handling of answers with extended characters.
* Add "output_root" argument to AnswerDistributionOneFilePerCourseTask.
* Modify IgnoredTarget to work in remote tasks.
* Fix some more Pep8 and pylint errors.

Change-Id: I8bccc8f9a7028c54773af288d4ac110110c495c3
parent f691aa2b
......@@ -161,6 +161,19 @@ class LastProblemCheckEventMixin(object):
for answer_id in answers:
if not self.is_hidden_answer(answer_id):
answer_value = answers[answer_id]
# Argh. It seems that sometimes we're encountering
# bogus answer_id values. In particular, one that
# is including the possible choice values, instead
# of any actual values selected by the student.
# For now, let's just dump an error and skip it,
# so that it becomes the equivalent of a hidden
# answer. Eventually we would probably want to treat
# it explicitly as a hidden answer.
if answer_id not in correct_map:
log.error("Unexpected answer_id %s not in correct_map: %s", answer_id, event)
continue
correct_entry = correct_map[answer_id]
# We do not know the values for 'input_type',
......@@ -459,7 +472,7 @@ class AnswerDistributionPerCourseMixin(object):
# answer_value may be a list of multiple values, so we need to
# convert it to a string that can be used as an index (i.e. to
# increment a previous occurrence).
return '{value}_{variant}'.format(value=self.stringify(answer_value), variant=variant)
return u'{value}_{variant}'.format(value=self.stringify(answer_value), variant=variant)
@staticmethod
def stringify(answer_value):
......@@ -484,7 +497,7 @@ class AnswerDistributionPerCourseMixin(object):
if isinstance(answer_value, basestring):
return answer_value
elif isinstance(answer_value, list):
return '[{list_val}]'.format(list_val='|'.join(answer_value))
return u'[{list_val}]'.format(list_val=u'|'.join(answer_value))
else:
# unexpected type:
log.error("Unexpected type for an answer_value: %s", answer_value)
......@@ -526,7 +539,7 @@ class LastProblemCheckEvent(LastProblemCheckEventMixin, BaseAnswerDistributionTa
return PathSetTask(self.src, self.include)
def output(self):
output_name = 'last_problem_check_events_{name}'.format(name=self.name)
output_name = u'last_problem_check_events_{name}/'.format(name=self.name)
return get_target_from_url(url_path_join(self.dest, output_name))
......@@ -557,7 +570,7 @@ class AnswerDistributionPerCourse(AnswerDistributionPerCourseMixin, BaseAnswerDi
return self.requires()['events']
def output(self):
output_name = 'answer_distribution_per_course_{name}'.format(name=self.name)
output_name = u'answer_distribution_per_course_{name}/'.format(name=self.name)
return get_target_from_url(url_path_join(self.dest, output_name))
def run(self):
......@@ -573,13 +586,19 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask):
"""
Groups answer distributions by course, producing a different file for each.
All parameters are passed through to :py:class:`AnswerDistributionPerCourse`.
Most parameters are passed through to :py:class:`AnswerDistributionPerCourse`.
One additional parameter is defined:
output_root: location where the one-file-per-course outputs
are written. This is distinct from `dest`, which is where
intermediate output is written.
"""
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
name = luigi.Parameter(default='periodic')
output_root = luigi.Parameter()
answer_metadata = luigi.Parameter(default=None)
def requires(self):
......@@ -616,10 +635,10 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask):
"""
hashed_course_id = hashlib.sha1(course_id).hexdigest()
filename_safe_course_id = course_id.replace('/', '_')
filename = '{course_id}_answer_distribution.csv'.format(course_id=filename_safe_course_id)
return url_path_join(self.dest, hashed_course_id, filename)
filename = u'{course_id}_answer_distribution.csv'.format(course_id=filename_safe_course_id)
return url_path_join(self.output_root, hashed_course_id, filename)
def multi_output_reducer(self, course_id, values, output_file):
def multi_output_reducer(self, _course_id, values, output_file):
"""
Each entry should be written to the output file in csv format.
......
......@@ -61,11 +61,11 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
# Luigi requires the reducer to return an iterable
return iter(tuple())
def multi_output_reducer(self, key, values, output_file):
def multi_output_reducer(self, _key, _values, _output_file):
"""Returns an iterable of strings that are written out to the appropriate output file for this key."""
return iter(tuple())
def output_path_for_key(self, key):
def output_path_for_key(self, _key):
"""
Returns a URL that is unique to the given key.
......
......@@ -6,7 +6,6 @@ Supports outputs to HDFS, S3, and local FS.
"""
import os
import boto
import glob
......@@ -15,7 +14,7 @@ import luigi.s3
import luigi.hdfs
import luigi.format
from edx.analytics.tasks.s3_util import join_as_s3_url, generate_s3_sources
from edx.analytics.tasks.s3_util import generate_s3_sources
from edx.analytics.tasks.url import ExternalURL, url_path_join
......@@ -40,7 +39,7 @@ class PathSetTask(luigi.Task):
# connect lazily as needed:
if self.s3_conn is None:
self.s3_conn = boto.connect_s3()
for bucket, root, path in generate_s3_sources(self.s3_conn, self.src, self.include):
for _bucket, root, path in generate_s3_sources(self.s3_conn, self.src, self.include):
source = url_path_join(self.src, root, path)
yield ExternalURL(source)
else:
......
......@@ -372,11 +372,25 @@ class LastProblemCheckEventReduceTest(LastProblemCheckEventBaseTest):
answer_data = self._get_answer_data()
self._check_output([input_data], {self.answer_id: answer_data})
def test_bogus_choice_event(self):
    """Student answers absent from the correct_map are skipped, not fatal."""
    # Real event data contained student_answers keys that never appeared
    # in the correct_map (ids built from the possible choice values);
    # such answers must be dropped as if they were hidden.
    problem_data = self._create_problem_data_dict()
    del problem_data['answers'][self.answer_id]
    for suffix in ('choice_1', 'choice_2', 'choice_3'):
        bogus_answer_id = "{answer_id}_{suffix}".format(
            answer_id=self.answer_id, suffix=suffix,
        )
        problem_data['answers'][bogus_answer_id] = suffix
    input_data = (self.timestamp, json.dumps(problem_data))
    # Every remaining answer is bogus, so no output should be produced.
    self._check_output([input_data], {})
def test_problem_display_name(self):
problem_data = self._create_problem_data_dict()
problem_data['context']['module'] = {'display_name': "Display Name"}
problem_data['context']['module'] = {'display_name': u"Displ\u0101y Name"}
input_data = (self.timestamp, json.dumps(problem_data))
answer_data = self._get_answer_data(problem_display_name="Display Name")
answer_data = self._get_answer_data(problem_display_name=u"Displ\u0101y Name")
self._check_output([input_data], {self.answer_id: answer_data})
......@@ -413,13 +427,13 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
def _get_answer_data(self, **kwargs):
"""Returns answer data for input with submission information."""
answer_data = {
"answer": "3",
"answer": u"\u00b2",
"problem_display_name": None,
"variant": None,
"correct": False,
"problem_id": self.problem_id,
"input_type": "formulaequationinput",
"question": "Enter the number of fingers on a human hand",
"question": u"Enter the number(\u00ba) of fingers on a human hand",
"response_type": "numericalresponse",
}
answer_data.update(**kwargs)
......@@ -428,7 +442,7 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
def _get_non_submission_answer_data(self, **kwargs):
"""Returns answer data for input without submission information."""
answer_data = {
"answer_value_id": "3",
"answer_value_id": u'\u00b2',
"problem_display_name": None,
"variant": None,
"correct": False,
......@@ -486,14 +500,14 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
def test_multiple_choice_answer(self):
answer_data = self._get_answer_data(
answer_value_id=['choice_1', 'choice_2', 'choice_4'],
answer=['First Choice', 'Second Choice', 'Fourth Choice'],
answer=[u'First Ch\u014dice', u'Second Ch\u014dice', u'Fourth Ch\u014dice'],
response_type="multiplechoiceresponse",
)
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(
answer_data,
ValueID='[choice_1|choice_2|choice_4]',
AnswerValue='[First Choice|Second Choice|Fourth Choice]'
AnswerValue=u'[First Ch\u014dice|Second Ch\u014dice|Fourth Ch\u014dice]'
)
self._check_output([input_data], (expected_output,))
......@@ -553,7 +567,7 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
"""Defines some metadata for test answer."""
metadata_dict = {
self.answer_id: {
"question": "Pick One or Two",
"question": u"Pick One or \u00b2",
"response_type": "multiplechoiceresponse",
"input_type": "my_input_type",
"problem_display_name": self.problem_display_name,
......@@ -565,7 +579,7 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
def test_non_submission_choice_with_metadata(self):
self._load_metadata(
answer_value_id_map={"choice_1": "First Choice", "choice_2": "Second Choice"}
answer_value_id_map={"choice_1": u"First Ch\u014dice", "choice_2": u"Second Ch\u014dice"}
)
answer_data = self._get_non_submission_answer_data(
answer_value_id='choice_1',
......@@ -574,8 +588,8 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
expected_output = self._get_expected_output(
answer_data,
ValueID='choice_1',
AnswerValue='First Choice',
Question="Pick One or Two",
AnswerValue=u'First Ch\u014dice',
Question=u"Pick One or \u00b2",
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
......@@ -592,7 +606,7 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
answer_data,
ValueID='[choice_1|choice_2]',
AnswerValue='[First Choice|Second Choice]',
Question="Pick One or Two",
Question=u"Pick One or \u00b2",
)
expected_output["Problem Display Name"] = self.problem_display_name
......@@ -608,7 +622,7 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
answer_data,
ValueID='[choice_1|choice_2]',
AnswerValue='',
Question="Pick One or Two",
Question=u"Pick One or \u00b2",
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
......@@ -623,7 +637,7 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
answer_data,
ValueID='choice_1',
AnswerValue='',
Question="Pick One or Two",
Question=u"Pick One or \u00b2",
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
......@@ -634,14 +648,15 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(
answer_data,
AnswerValue='3',
Question="Pick One or Two",
AnswerValue=u'\u00b2',
Question=u"Pick One or \u00b2",
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
class AnswerDistributionOneFilePerCourseTaskTest(unittest.TestCase):
"""Tests for AnswerDistributionOneFilePerCourseTask class."""
def setUp(self):
self.task = AnswerDistributionOneFilePerCourseTask(
......@@ -650,6 +665,7 @@ class AnswerDistributionOneFilePerCourseTaskTest(unittest.TestCase):
dest=None,
name=None,
include=None,
output_root=None,
)
def test_map_single_value(self):
......@@ -684,10 +700,11 @@ class AnswerDistributionOneFilePerCourseTaskTest(unittest.TestCase):
task = AnswerDistributionOneFilePerCourseTask(
mapreduce_engine='local',
src=None,
dest='/tmp',
dest=None,
name='name',
include=None,
output_root='/tmp',
)
output_path = task.output_path_for_key(course_id)
self.assertEquals(output_path,
'/tmp/{0}/foo_bar_baz_answer_distribution.csv'.format(hashed_course_id))
expected_output_path = '/tmp/{0}/foo_bar_baz_answer_distribution.csv'.format(hashed_course_id)
self.assertEquals(output_path, expected_output_path)
"""Tests for classes defined in mapreduce.py."""
from __future__ import absolute_import
from mock import patch, call
......@@ -7,6 +9,7 @@ from edx.analytics.tasks.tests import unittest
class MultiOutputMapReduceJobTaskTest(unittest.TestCase):
"""Tests for MultiOutputMapReduceJobTask."""
def setUp(self):
self.task = TestJobTask(
......@@ -21,13 +24,14 @@ class MultiOutputMapReduceJobTaskTest(unittest.TestCase):
self.assert_values_written_to_file('foo', ['bar', 'baz'])
def assert_values_written_to_file(self, key, values):
"""Confirm that values passed to reducer appear in output file."""
self.assertItemsEqual(self.task.reducer(key, values), [])
self.mock_get_target.assert_called_once_with('/any/path/' + key)
mock_target = self.mock_get_target.return_value
mock_file = mock_target.open.return_value.__enter__.return_value
mock_file.write.assert_has_calls([ call(v + '\n') for v in values ])
mock_file.write.assert_has_calls([call(v + '\n') for v in values])
self.mock_get_target.reset_mock()
......@@ -37,6 +41,7 @@ class MultiOutputMapReduceJobTaskTest(unittest.TestCase):
class TestJobTask(MultiOutputMapReduceJobTask):
"""Dummy task to use for testing."""
def output_path_for_key(self, key):
return '/any/path/' + key
......
"""Tests for URL-related functionality."""
import luigi
import luigi.format
import luigi.hdfs
......@@ -8,6 +9,7 @@ from edx.analytics.tasks.tests import unittest
class TargetFromUrlTestCase(unittest.TestCase):
"""Tests for get_target_from_url()."""
def test_hdfs_scheme(self):
for test_url in ['s3://foo/bar', 'hdfs://foo/bar', 's3n://foo/bar']:
......@@ -32,7 +34,7 @@ class TargetFromUrlTestCase(unittest.TestCase):
test_url = 's3://foo/bar/'
target = url.get_target_from_url(test_url)
self.assertIsInstance(target, luigi.hdfs.HdfsTarget)
self.assertEquals(target.path, test_url)
self.assertEquals(target.path, test_url[:-1])
self.assertEquals(target.format, luigi.hdfs.PlainDir)
def test_gzip_local_file(self):
......@@ -44,6 +46,7 @@ class TargetFromUrlTestCase(unittest.TestCase):
class UrlPathJoinTestCase(unittest.TestCase):
"""Tests for url_path_join()."""
def test_relative(self):
self.assertEquals(url.url_path_join('s3://foo/bar', 'baz'), 's3://foo/bar/baz')
......@@ -65,9 +68,6 @@ class UrlPathJoinTestCase(unittest.TestCase):
self.assertEquals(url.url_path_join('s3://foo/bar', '///baz'), 's3://foo///baz')
self.assertEquals(url.url_path_join('s3://foo/bar', 'baz//bar'), 's3://foo/bar/baz//bar')
def test_extra_separators(self):
self.assertEquals(url.url_path_join('s3://foo/bar', '///baz'), 's3://foo///baz')
def test_multiple_elements(self):
self.assertEquals(url.url_path_join('s3://foo', 'bar', 'baz'), 's3://foo/bar/baz')
self.assertEquals(url.url_path_join('s3://foo', 'bar/bing', 'baz'), 's3://foo/bar/bing/baz')
......@@ -18,8 +18,6 @@ import luigi.format
import luigi.hdfs
import luigi.s3
from luigi.target import Target
class ExternalURL(luigi.ExternalTask):
"""Simple Task that returns a target based on its URL"""
......@@ -29,7 +27,10 @@ class ExternalURL(luigi.ExternalTask):
return get_target_from_url(self.url)
class IgnoredTarget(Target):
class IgnoredTarget(luigi.hdfs.HdfsTarget):
    """Dummy target for use in Hadoop jobs that produce no explicit output file."""

    def __init__(self):
        # Back the target with a temporary HDFS location (is_tmp=True) so it
        # remains usable when the task runs remotely — presumably the reason
        # the base class changed from luigi.target.Target; confirm against
        # the commit description ("Modify IgnoredTarget to work in remote
        # tasks").
        super(IgnoredTarget, self).__init__(is_tmp=True)

    def exists(self):
        # Always report missing so scheduling never treats this placeholder
        # output as already complete.
        return False
......@@ -60,6 +61,7 @@ def get_target_from_url(url):
if url.endswith('.gz'):
kwargs['format'] = luigi.format.Gzip
url = url.rstrip('/')
return target_class(url, **kwargs)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment