Commit 6a17d980 by Brian Wilson

AnswerDistributionOneFilePerCourseTask changed to output rows in sorted order.

Change-Id: I441c50fc917edae695314bad0e67c6ee336fd62e
parent 5cbcb347
...@@ -5,6 +5,7 @@ tracking log files. ...@@ -5,6 +5,7 @@ tracking log files.
import hashlib import hashlib
import json import json
import csv import csv
from operator import itemgetter
import luigi import luigi
import luigi.hdfs import luigi.hdfs
...@@ -355,13 +356,13 @@ class AnswerDistributionPerCourseMixin(object): ...@@ -355,13 +356,13 @@ class AnswerDistributionPerCourseMixin(object):
return [ return [
'ModuleID', 'ModuleID',
'PartID', 'PartID',
'Correct Answer',
'Count',
'ValueID', 'ValueID',
'AnswerValue', 'AnswerValue',
'Variant', 'Variant',
'Problem Display Name', 'Problem Display Name',
'Question', 'Question',
'Correct Answer',
'Count',
] ]
def load_answer_metadata(self, answer_metadata_file): def load_answer_metadata(self, answer_metadata_file):
...@@ -662,13 +663,22 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask): ...@@ -662,13 +663,22 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask):
writer.writerow(dict( writer.writerow(dict(
(k, k) for k in field_names (k, k) for k in field_names
)) ))
# Collect in memory the list of dicts to be output.
row_data = []
for content_tuple in values: for content_tuple in values:
# Restore tabs that were removed in the map task. Tabs are special characters to hadoop, so they were # Restore tabs that were removed in the map task. Tabs
# removed to prevent interpretation of them. Restore them here. # are special characters to hadoop, so they were removed
# to prevent interpretation of them. Restore them here.
tab_separated_content = '\t'.join(content_tuple) tab_separated_content = '\t'.join(content_tuple)
row_data.append(json.loads(tab_separated_content))
# Sort the list of dicts by their field names before encoding.
row_data = sorted(row_data, key=itemgetter(*field_names))
for row_dict in row_data:
encoded_dict = dict() encoded_dict = dict()
for key, value in json.loads(tab_separated_content).iteritems(): for key, value in row_dict.iteritems():
encoded_dict[key] = unicode(value).encode('utf8') encoded_dict[key] = unicode(value).encode('utf8')
writer.writerow(encoded_dict) writer.writerow(encoded_dict)
......
...@@ -694,19 +694,27 @@ class AnswerDistributionOneFilePerCourseTaskTest(unittest.TestCase): ...@@ -694,19 +694,27 @@ class AnswerDistributionOneFilePerCourseTaskTest(unittest.TestCase):
def test_reduce_multiple_values(self): def test_reduce_multiple_values(self):
field_names = AnswerDistributionPerCourseMixin.get_column_order() field_names = AnswerDistributionPerCourseMixin.get_column_order()
column_values = [(k, unicode(k) + u'\u2603') for k in field_names]
column_values[3] = (column_values[3][0], 10) # To test sorting, the first sample is made to sort after the
sample_input = json.dumps(dict(column_values)) # second sample.
column_values_2 = [(k, unicode(k) + u'\u2603') for k in field_names]
column_values_2[3] = (column_values_2[3][0], 10)
column_values_1 = list(column_values_2)
column_values_1[4] = (column_values_1[4][0], u'ZZZZZZZZZZZ')
sample_input_1 = json.dumps(dict(column_values_1))
sample_input_2 = json.dumps(dict(column_values_2))
mock_output_file = Mock() mock_output_file = Mock()
self.task.multi_output_reducer('foo', iter([(sample_input,), (sample_input,)]), mock_output_file) self.task.multi_output_reducer('foo', iter([(sample_input_1,), (sample_input_2,)]), mock_output_file)
expected_header_string = ','.join(field_names) + '\r\n' expected_header_string = ','.join(field_names) + '\r\n'
self.assertEquals(mock_output_file.write.mock_calls[0], call(expected_header_string)) self.assertEquals(mock_output_file.write.mock_calls[0], call(expected_header_string))
expected_row_string = ','.join(unicode(v[1]).encode('utf8') for v in column_values) + '\r\n' # Confirm that the second sample appears before the first.
self.assertEquals(mock_output_file.write.mock_calls[1], call(expected_row_string)) expected_row_1 = ','.join(unicode(v[1]).encode('utf8') for v in column_values_2) + '\r\n'
self.assertEquals(mock_output_file.write.mock_calls[2], call(expected_row_string)) self.assertEquals(mock_output_file.write.mock_calls[1], call(expected_row_1))
expected_row_2 = ','.join(unicode(v[1]).encode('utf8') for v in column_values_1) + '\r\n'
self.assertEquals(mock_output_file.write.mock_calls[2], call(expected_row_2))
def test_output_path_for_key(self): def test_output_path_for_key(self):
course_id = 'foo/bar/baz' course_id = 'foo/bar/baz'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment