Commit f691aa2b by Gabe Mulley

Output a separate file for each course's answer distribution

Fixes: AN-589
Change-Id: I0156691780f9b9ba0d787d60ae301a32d51b7a26
parent 6e72728e
@@ -2,13 +2,16 @@
Luigi tasks for extracting problem answer distribution statistics from
tracking log files.
"""
import hashlib
import json
import csv
import luigi
import luigi.hdfs
import luigi.s3
import edx.analytics.tasks.util.eventlog as eventlog
from edx.analytics.tasks.mapreduce import MapReduceJobTask
from edx.analytics.tasks.mapreduce import MapReduceJobTask, MultiOutputMapReduceJobTask
from edx.analytics.tasks.pathutil import PathSetTask
from edx.analytics.tasks.url import ExternalURL
from edx.analytics.tasks.url import get_target_from_url, url_path_join
@@ -566,6 +569,83 @@ class AnswerDistributionPerCourse(AnswerDistributionPerCourseMixin, BaseAnswerDi
super(AnswerDistributionPerCourse, self).run()
class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask):
"""
Groups answer distributions by course, producing a different file for each.
All parameters are passed through to :py:class:`AnswerDistributionPerCourse`.
"""
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
name = luigi.Parameter(default='periodic')
answer_metadata = luigi.Parameter(default=None)
def requires(self):
return AnswerDistributionPerCourse(
mapreduce_engine=self.mapreduce_engine,
src=self.src,
dest=self.dest,
include=self.include,
name=self.name,
answer_metadata=self.answer_metadata
)
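# Illustration only (hypothetical parameter values, not part of this commit): instantiating the task passes the
# same values straight through to the required AnswerDistributionPerCourse task, e.g.
#
#     task = AnswerDistributionOneFilePerCourseTask(
#         mapreduce_engine='hadoop',
#         src='s3://example-bucket/tracking-logs/',
#         dest='s3://example-bucket/answer-distributions/',
#         include=('*tracking.log*',),
#         name='weekly',
#     )
#     task.requires()  # an AnswerDistributionPerCourse task built from the same parameter values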
def mapper(self, line):
"""
Groups inputs by course_id so that all records with the same course_id are written to the same output file.
Each input line is expected to consist of tab-separated columns. The first column is the course_id and is used
to group the entries. The course_id is stripped from the output, and the remaining columns are written to the
appropriate output file in the same tab-separated format in which they were read.
"""
split_line = line.split('\t')
# Ensure that the first column is interpreted as the grouping key by the hadoop streaming API. Note that since
# configuration values can change this behavior, the remaining tab-separated columns are encoded in a python
# structure before being returned to hadoop. They are decoded in the reducer.
course_id, content = split_line[0], split_line[1:]
yield course_id, tuple(content)
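# Illustration (not part of this commit): the input line 'foo\tbar\tbaz' yields ('foo', ('bar', 'baz')), as
# exercised by the tests below; multi_output_reducer re-joins such tuples with tabs before parsing them as JSON.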
def output_path_for_key(self, course_id):
"""
Match the course folder hierarchy that is expected by the instructor dashboard.
The instructor dashboard expects the file to be stored in a folder named sha1(course_id). All files in that
directory will be displayed on the instructor dashboard for that course.
"""
hashed_course_id = hashlib.sha1(course_id).hexdigest()
filename_safe_course_id = course_id.replace('/', '_')
filename = '{course_id}_answer_distribution.csv'.format(course_id=filename_safe_course_id)
return url_path_join(self.dest, hashed_course_id, filename)
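# Illustration (hypothetical dest value, not part of this commit): with dest='/tmp' and course_id='foo/bar/baz',
# this returns '/tmp/<sha1 hex digest of "foo/bar/baz">/foo_bar_baz_answer_distribution.csv', the layout the
# instructor dashboard expects (see test_output_path_for_key below).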
def multi_output_reducer(self, course_id, values, output_file):
"""
Write each entry to the output file in CSV format.
This output is visible to instructors, so use an Excel-friendly format (CSV).
"""
field_names = AnswerDistributionPerCourse.get_column_order()
writer = csv.DictWriter(output_file, field_names)
# Write the header row by mapping each field name to itself.
writer.writerow(dict(
(k, k) for k in field_names
))
for content_tuple in values:
# Tabs are special characters to hadoop, so the map task removed them to prevent them from being interpreted;
# restore them here by re-joining the columns before parsing the record as JSON.
tab_separated_content = '\t'.join(content_tuple)
encoded_dict = dict()
for key, value in json.loads(tab_separated_content).iteritems():
encoded_dict[key] = unicode(value).encode('utf8')
writer.writerow(encoded_dict)
def extra_modules(self):
import cjson
import boto
return [cjson, boto]
################################
# Helper methods
################################
......
@@ -3,8 +3,12 @@ Support executing map reduce tasks.
"""
from __future__ import absolute_import
import luigi
import luigi.hdfs
import luigi.hadoop
from edx.analytics.tasks.url import get_target_from_url, IgnoredTarget
class MapReduceJobTask(luigi.hadoop.JobTask):
"""
@@ -28,3 +32,44 @@ class MapReduceJobTask(luigi.hadoop.JobTask):
raise KeyError('A map reduce engine must be specified in order to run MapReduceJobTasks')
return engine_class()
class MultiOutputMapReduceJobTask(MapReduceJobTask):
"""
Produces multiple output files from a map reduce job.
The mapper output key is used to determine the name of the file that reducer results are written to. Different
reduce tasks must not write to the same file. Since all values for a given mapper output key are guaranteed to be
processed by the same reduce task, we only allow a single file to be output per key for safety. In the future, the
reducer output key could also be used to determine the output file name.
"""
def output(self):
# Unfortunately, Luigi requires an output.
return IgnoredTarget()
def reducer(self, key, values):
"""
Write out values from each key into different output files.
"""
output_path = self.output_path_for_key(key)
if output_path:
output_file_target = get_target_from_url(output_path)
with output_file_target.open('w') as output_file:
self.multi_output_reducer(key, values, output_file)
# Luigi requires the reducer to return an iterable
return iter(tuple())
def multi_output_reducer(self, key, values, output_file):
"""Returns an iterable of strings that are written out to the appropriate output file for this key."""
return iter(tuple())
def output_path_for_key(self, key):
"""
Returns a URL that is unique to the given key.
All values for the given key will be written to the file specified by the URL returned from this function.
"""
return None
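# A minimal sketch (illustrative only, not part of this commit) of how a subclass is expected to combine the two
# hooks above; the output URL prefix is a made-up assumption.
class ExampleLinePerKeyTask(MultiOutputMapReduceJobTask):

    def output_path_for_key(self, key):
        # Route each key's values to its own file under a hypothetical base location.
        return 's3://example-bucket/output/' + key

    def multi_output_reducer(self, key, values, output_file):
        # Write one value per line to the file that the base class reducer opened for this key.
        for value in values:
            output_file.write(value + '\n')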
@@ -4,10 +4,14 @@ Tests for tasks that calculate answer distributions.
"""
import json
import StringIO
import hashlib
from mock import Mock, call
from edx.analytics.tasks.answer_dist import (
LastProblemCheckEventMixin,
AnswerDistributionPerCourseMixin,
AnswerDistributionOneFilePerCourseTask,
)
from edx.analytics.tasks.tests import unittest
@@ -635,3 +639,55 @@ class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
class AnswerDistributionOneFilePerCourseTaskTest(unittest.TestCase):
"""Tests for AnswerDistributionOneFilePerCourseTask."""
def setUp(self):
self.task = AnswerDistributionOneFilePerCourseTask(
mapreduce_engine='local',
src=None,
dest=None,
name=None,
include=None,
)
def test_map_single_value(self):
key, value = next(self.task.mapper('foo\tbar'))
self.assertEquals(key, 'foo')
self.assertEquals(value, ('bar',))
def test_map_multiple_values(self):
key, value = next(self.task.mapper('foo\tbar\tbaz'))
self.assertEquals(key, 'foo')
self.assertEquals(value, ('bar', 'baz'))
def test_reduce_multiple_values(self):
field_names = AnswerDistributionPerCourseMixin.get_column_order()
column_values = [(k, unicode(k) + u'\u2603') for k in field_names]
column_values[3] = (column_values[3][0], 10)
sample_input = json.dumps(dict(column_values))
mock_output_file = Mock()
self.task.multi_output_reducer('foo', iter([(sample_input,), (sample_input,)]), mock_output_file)
expected_header_string = ','.join(field_names) + '\r\n'
self.assertEquals(mock_output_file.write.mock_calls[0], call(expected_header_string))
expected_row_string = ','.join(unicode(v[1]).encode('utf8') for v in column_values) + '\r\n'
self.assertEquals(mock_output_file.write.mock_calls[1], call(expected_row_string))
self.assertEquals(mock_output_file.write.mock_calls[2], call(expected_row_string))
def test_output_path_for_key(self):
course_id = 'foo/bar/baz'
hashed_course_id = hashlib.sha1(course_id).hexdigest()
task = AnswerDistributionOneFilePerCourseTask(
mapreduce_engine='local',
src=None,
dest='/tmp',
name='name',
include=None,
)
output_path = task.output_path_for_key(course_id)
self.assertEquals(output_path,
'/tmp/{0}/foo_bar_baz_answer_distribution.csv'.format(hashed_course_id))
from __future__ import absolute_import
from mock import patch, call
from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask
from edx.analytics.tasks.tests import unittest
class MultiOutputMapReduceJobTaskTest(unittest.TestCase):
"""Tests for the reducer behavior of MultiOutputMapReduceJobTask."""
def setUp(self):
self.task = TestJobTask(
mapreduce_engine='local'
)
patcher = patch('edx.analytics.tasks.mapreduce.get_target_from_url')
self.mock_get_target = patcher.start()
self.addCleanup(patcher.stop)
def test_reducer(self):
self.assert_values_written_to_file('foo', ['bar', 'baz'])
def assert_values_written_to_file(self, key, values):
self.assertItemsEqual(self.task.reducer(key, values), [])
self.mock_get_target.assert_called_once_with('/any/path/' + key)
mock_target = self.mock_get_target.return_value
mock_file = mock_target.open.return_value.__enter__.return_value
mock_file.write.assert_has_calls([ call(v + '\n') for v in values ])
self.mock_get_target.reset_mock()
def test_multiple_reducer_calls(self):
self.assert_values_written_to_file('foo', ['bar', 'baz'])
self.assert_values_written_to_file('foo2', ['bar2'])
class TestJobTask(MultiOutputMapReduceJobTask):
def output_path_for_key(self, key):
return '/any/path/' + key
def multi_output_reducer(self, key, values, output_file):
for value in values:
output_file.write(value + '\n')
@@ -18,6 +18,8 @@ import luigi.format
import luigi.hdfs
import luigi.s3
from luigi.target import Target
class ExternalURL(luigi.ExternalTask):
"""Simple Task that returns a target based on its URL"""
@@ -27,6 +29,15 @@ class ExternalURL(luigi.ExternalTask):
return get_target_from_url(self.url)
class IgnoredTarget(Target):
"""Dummy target whose exists() is always False and whose contents are discarded."""
def exists(self):
return False
def open(self, mode='r'):
return open('/dev/null', mode)
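# Illustration (not part of this commit): exists() always returns False, so Luigi never considers a task whose
# output() is an IgnoredTarget to be complete, and anything written through it is discarded, e.g.
#
#     target = IgnoredTarget()
#     assert not target.exists()
#     with target.open('w') as discarded:
#         discarded.write('never persisted\n')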
DEFAULT_TARGET_CLASS = luigi.LocalTarget
URL_SCHEME_TO_TARGET_CLASS = {
'hdfs': luigi.hdfs.HdfsTarget,
......