Commit b82f7a8e by Brian Wilson

Add implementation of AnswerDistributionByCourse.

Implemented as two tasks.

Change-Id: I9219e42f7e347242b4cea2515f49480b2390ed16
parent c6d849a8
"""
Luigi tasks for extracting problem answer distribution statistics from
tracking log files.
"""
import json
import luigi
import luigi.s3
import edx.analytics.tasks.util.eventlog as eventlog
from edx.analytics.tasks.mapreduce import MapReduceJobTask
from edx.analytics.tasks.pathutil import PathSetTask
from edx.analytics.tasks.url import ExternalURL
from edx.analytics.tasks.url import get_target_from_url, url_path_join
import logging
log = logging.getLogger(__name__)
################################
# Task Map-Reduce definitions
################################
UNKNOWN_ANSWER_VALUE = ''
UNMAPPED_ANSWER_VALUE = ''
class LastProblemCheckEventMixin(object):
"""Identifies last problem_check event for a user on a problem in a course, given raw event log input."""
def mapper(self, line):
"""
Yields output values for explicit problem_check events.
Args:
line: text line from a tracking event log.
Yields:
(course_id, problem_id, username), (timestamp, problem_check_info)
where timestamp is in ISO format, with resolution to the microsecond,
and problem_check_info is a JSON-serialized dict
containing the contents of the problem_check event's
'event' field, augmented with entries for 'timestamp',
'username', and 'context' from the event.
Yields nothing if there is no valid problem_check event on the line.
Example:
(edX/DemoX/Demo_Course, i4x://edX/DemoX/Demo_Course/problem/PS1_P1, dummy_username), (2013-09-10T00:01:05.123456, blah)
"""
parsed_tuple_or_none = get_problem_check_event(line)
if parsed_tuple_or_none is not None:
yield parsed_tuple_or_none
def reducer(self, _key, values):
"""
Calculate a list of answers from the final response of a user to a problem in a course.
Args:
key: (course_id, problem_id, username)
values: iterator of (timestamp, problem_check_info)
Yields:
answer data tuples, each consisting of:
(course_id, answer_id), (timestamp, answer_data)
where answer_data is a json-encoded dict, containing:
'problem_id': the id of the problem (i4x)
'problem_display_name': the display name for the problem
'answer': if an event with 'submission' information,
this is the text of the answer. For events with no
'submission' information, this is not defined.
'answer_value_id': if an event with 'submission'
information, this is the moniker for the answer, and
is not defined if there is no moniker. For events
with no 'submission' information, this holds either
the moniker (if used) or the answer (if no moniker is
used).
'question': the display text for the problem part being answered, if available.
'correct': whether the answer is correct: a boolean for
events with 'submission' information, or the correctness
string (e.g. 'incorrect') for older events.
'variant': seed value
"""
# Sort input values (by timestamp) to easily detect the most
# recent answer to a problem by a particular user. Note that
# this assumes the timestamp values (strings) are in ISO
# representation, so that the tuples will be ordered in
# ascending time value.
values = sorted(values)
if not values:
return
# Get the last entry.
_timestamp, most_recent_event = values[-1]
for answer in self._generate_answers(most_recent_event):
yield answer
def _generate_answers(self, event_string):
"""
Generates a list of answers given a problem_check event.
Args:
event_string: a json-encoded string version of an event's data.
Returns:
list of answer data tuples.
See docstring for reducer() for more details.
"""
event = json.loads(event_string)
# Get context information:
course_id = event.get('context').get('course_id')
timestamp = event.get('timestamp')
problem_id = event.get('problem_id')
problem_display_name = event.get('context').get('module', {}).get('display_name', None)
result = []
def append_submission(answer_id, submission):
"""Convert submission to result to be returned."""
# First augment submission with problem-level information
# not found in the submission:
submission['problem_id'] = problem_id
submission['problem_display_name'] = problem_display_name
# Add the timestamp so that all responses can be sorted in order.
# We want to use the "latest" values for some fields.
output_key = (course_id, answer_id)
output_value = (timestamp, json.dumps(submission))
result.append((output_key, output_value))
answers = event.get('answers')
if 'submission' in event:
submissions = event.get('submission')
for answer_id in submissions:
if not self.is_hidden_answer(answer_id):
submission = submissions.get(answer_id)
# The submission dict doesn't contain the moniker value for
# the answer, so we check the raw answers and see if the raw
# value differs. If so, we assume the raw value is a moniker.
answer_value = answers[answer_id]
if answer_value != submission.get('answer'):
submission['answer_value_id'] = answer_value
append_submission(answer_id, submission)
else:
# Otherwise, it's an older event with no 'submission'
# information, so parse it as well as possible.
correct_map = event.get('correct_map')
for answer_id in answers:
if not self.is_hidden_answer(answer_id):
answer_value = answers[answer_id]
correct_entry = correct_map[answer_id]
# We do not know the values for 'input_type',
# 'response_type', or 'question'. We also don't know if
# answer_value should be identified as 'answer_value_id' or
# 'answer', so we choose to use 'answer_value_id' here and
# never define 'answer'. This allows disambiguation from
# events with a submission field, which will always have
# an 'answer' and only sometimes have an 'answer_value_id'.
submission = {
'answer_value_id': answer_value,
'correct': correct_entry.get('correctness'),
'variant': event.get('seed'),
}
append_submission(answer_id, submission)
return result
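# Illustrative output of _generate_answers() (ids and values invented, not
# taken from real data): a submission-bearing event might produce:
#
#   (('edX/DemoX/Demo_Course', 'i4x-edX-DemoX-problem-abc123_2_1'),
#    ('2013-09-10T00:01:05.123456',
#     '{"answer": "3", "correct": false, "variant": 629, ...}'))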
def is_hidden_answer(self, answer_id):
"""Check Id to identify hidden kinds of values."""
# some problems have additional answers that have '_dynamath' appended
# to the regular answer_id. In this case, the contents seem to contain
# something like:
#
# <math xmlns="http://www.w3.org/1998/Math/MathML">
# <mstyle displaystyle="true">
# <mo></mo>
# </mstyle>
# </math>
if answer_id.endswith('_dynamath'):
return True
# Others seem to end with _comment, and I don't know yet what these
# look like.
if answer_id.endswith('_comment'):
return True
return False
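# For example (invented ids), both of the following are treated as hidden:
#   'i4x-edX-DemoX-problem-abc123_2_1_dynamath'
#   'i4x-edX-DemoX-problem-abc123_2_1_comment'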
class AnswerDistributionPerCourseMixin(object):
"""Calculates answer distribution on a problem in a course, given per-user answers by date."""
def mapper(self, line):
"""
Args: tab-delimited values in a single text line
Yields: (course_id, answer_id), (timestamp, answer_data)
Example:
(edX/DemoX/Demo_Course, i4x-edX-DemoX-problem-c554538a57664fac80783b99d9d6da7c_2_1),
(2013-09-10T01:10:25.012345, TBD)
"""
course_id, answer_id, timestamp, answer_data = line.split('\t')
yield (course_id, answer_id), (timestamp, answer_data)
def reducer(self, key, values):
"""
Calculate a JSON dict for each unique answer to a problem in a course.
Args:
key: (course_id, answer_id)
values: iterator of (timestamp, answer_data)
Yields:
answer data tuples, each consisting of:
(course_id, answer_json)
where answer_json is a JSON string corresponding to a
particular response value to a particular "answer" within
a problem. The JSON includes metadata about the particular
answer, the value of the answer, and the count of how many
users for whom it was an answer.
"""
course_id, answer_id = key
values = sorted(values)
if not values:
return
# Get the last entry. We will use its values to provide
# metadata about the particular answer.
_timestamp, most_recent_answer_string = values[-1]
most_recent_answer = json.loads(most_recent_answer_string)
self.add_metadata_to_answer(answer_id, most_recent_answer)
# Determine if any answers should be included based on
# information in the most recent answer.
if not self.should_include_answer(most_recent_answer):
return
# Now construct answer distribution for this input.
problem_id = most_recent_answer.get('problem_id')
problem_display_name = most_recent_answer.get('problem_display_name')
most_recent_question = most_recent_answer.get('question', '')
answer_uses_code = ('answer_value_id' in most_recent_answer)
answer_dist = {}
for _timestamp, value_string in reversed(values):
answer = json.loads(value_string)
self.add_metadata_to_answer(answer_id, answer)
answer_grouping_key = self.get_answer_grouping_key(answer)
# TODO: add check here to see if the number of distinct
# variants for the problem is high enough to trigger
# abandoning the output of the distribution.
# If this is the first entry we find that has this value,
# then save out the relevant metadata about this value.
# We only want this from the most recent answer that has
# this value.
if answer_grouping_key not in answer_dist:
if answer_uses_code:
# The most recent overall answer indicates that
# the code should be returned as such. If this
# particular answer did not have 'submission'
# information, it may not have an answer_value, so
# we flag it.
value_id = answer['answer_value_id']
answer_value = answer.get('answer', UNKNOWN_ANSWER_VALUE)
else:
# There should be no value_id returned. If the
# current answer did not have 'submission'
# information, then move the value from the
# 'answer_value_id' to the 'answer' field.
value_id = ""
answer_value = answer.get('answer', answer.get('answer_value_id'))
# These values may be lists, so convert to output format.
value_id = self.stringify(value_id)
answer_value = self.stringify(answer_value)
# If there is a variant, then the question might not be
# the same for all variants presented to students. So
# we take the value (if any) provided in this variant.
# If there is no variant, then the question should be
# the same, and we want to go with the most recently
# defined value.
if answer.get('variant'):
question = answer.get('question', '')
else:
question = most_recent_question
# Key values here should match those used in get_column_order().
answer_dist[answer_grouping_key] = {
'ModuleID': problem_id,
'PartID': answer_id,
'ValueID': value_id,
'AnswerValue': answer_value,
'Variant': answer.get('variant'),
'Problem Display Name': problem_display_name,
'Question': question,
'Correct Answer': '1' if answer.get('correct') else '0',
'Count': 0,
}
# For most cases, just increment a counter:
answer_dist[answer_grouping_key]['Count'] += 1
# Finally dispatch the answers, providing the course_id as a
# key so that the answers belonging to a course will be
# gathered downstream into a report.
for answer_entry in answer_dist.values():
# Transform the entry into a form suitable for output.
yield course_id, json.dumps(answer_entry)
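# Illustrative reducer output (values invented): one (course_id, JSON) pair
# per distinct answer value, e.g.:
#
#   ('edX/DemoX/Demo_Course',
#    '{"ModuleID": "i4x://edX/DemoX/Demo_Course/problem/PS1_P1",
#      "PartID": "i4x-edX-DemoX-problem-abc123_2_1",
#      "ValueID": "choice_1", "AnswerValue": "First Choice",
#      "Variant": null, "Problem Display Name": null, "Question": "...",
#      "Correct Answer": "0", "Count": 42}')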
@classmethod
def get_column_order(cls):
"""Return column order to use for Answer Distribution report."""
# Key values here should match those used in the answer dict being output.
return [
'ModuleID',
'PartID',
'ValueID',
'AnswerValue',
'Variant',
'Problem Display Name',
'Question',
'Correct Answer',
'Count',
]
def load_answer_metadata(self, answer_metadata_file):
"""
Load metadata for answers that may lack it in problem_check events.
Information is read from a JSON file, with dict keyed by
answer_id, where "answer_id" is the i4x identifier for
particular answer.
Expected fields in dict are:
"problem_display_name": contains display name of containing Problem.
"input_type": xml element name for the input type.
"response_type": xml element name for the response type.
"question": contains question displayed to user.
"answer_value_id_map": dict with key equal to 'answer_value_id' values,
and displayed text as its value.
Stores data internally as a dict, keyed on answer_id.
This information was later added to problem_check events
themselves; this mechanism supplies it for problem_check
events that occurred before that addition was made.
"""
self.answer_metadata_dict = json.load(answer_metadata_file) # pylint: disable=attribute-defined-outside-init
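# An illustrative answer_metadata file (all values invented) might contain:
#
#   {
#       "i4x-edX-DemoX-problem-abc123_2_1": {
#           "problem_display_name": "Problem 1",
#           "input_type": "choicegroup",
#           "response_type": "multiplechoiceresponse",
#           "question": "Pick one.",
#           "answer_value_id_map": {"choice_0": "Apple", "choice_1": "Banana"}
#       }
#   }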
def add_metadata_to_answer(self, answer_id, answer):
"""
Add externally-provided metadata for answers that lack it.
See docstring for load_answer_metadata() for list of fields.
Adds these fields to the answer if the answer lacks a
non-empty value. Uses the answer_value_id_map to provide a
corresponding 'answer' when only an 'answer_value_id' is
available. This is done for answers that are derived from
problem_check events that lack these fields, because they
occurred before the information was added to events.
"""
# The 'answer_metadata_dict' should only exist if load_answer_metadata() is called.
answer_metadata = getattr(self, 'answer_metadata_dict', {}).get(answer_id)
if answer_metadata is not None:
for key, value in answer_metadata.iteritems():
# Should only add values that are not already present
# (and non-null). Also skips over values that are not
# strings (such as the answer_value_id_map), as this is
# handled separately below.
if not answer.get(key) and isinstance(value, basestring):
answer[key] = value
if 'answer' not in answer:
response_type = answer.get('response_type')
if response_type in ['choiceresponse', 'multiplechoiceresponse']:
# We leave what we have in 'answer_value_id', and look
# up the 'answer' to use from the
# answer_metadata_dict, based on the value(s) in
# 'answer_value_id'.
if 'answer_value_id_map' in answer_metadata:
answer_value_id = answer['answer_value_id']
answer_value_id_map = answer_metadata['answer_value_id_map']
get_answer_value = lambda code: answer_value_id_map.get(code, UNMAPPED_ANSWER_VALUE)
if isinstance(answer_value_id, basestring):
answer['answer'] = get_answer_value(answer_value_id)
elif isinstance(answer_value_id, list):
answer['answer'] = [get_answer_value(code) for code in answer_value_id]
else:
# The 'answer_value_id' is really the 'answer', so move it.
answer['answer'] = answer['answer_value_id']
del answer['answer_value_id']
def should_include_answer(self, answer):
"""Determine if a problem "part" should be included in the distribution."""
response_type = answer.get('response_type')
# For problems which only have old responses, we don't
# have information about whether to include their answers.
if response_type is None:
return False
# At some point, we could make this more parameterized, but
# support for other types would likely require special
# handling here anyway.
valid_types = set([
'choiceresponse',
'optionresponse',
'multiplechoiceresponse',
'numericalresponse',
'stringresponse',
'formularesponse',
])
if response_type in valid_types:
return True
return False
def get_answer_grouping_key(self, answer):
"""Return value to use for uniquely identify an answer value in the distribution."""
variant = answer.get('variant', 'NO_VARIANT')
# Events that lack 'submission' information will have a value
# for 'answer_value_id' and none for 'answer'. Events with
# 'submission' information will have the reverse situation
# most of the time, but both values filled in for multiple
# choice. In the latter case, we need to use the
# answer_value_id for comparison.
if 'answer_value_id' in answer:
answer_value = answer.get('answer_value_id')
else:
answer_value = answer.get('answer')
# answer_value may be a list of multiple values, so we need to
# convert it to a string that can be used as an index (i.e. to
# increment a previous occurrence).
return '{value}_{variant}'.format(value=self.stringify(answer_value), variant=variant)
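# For example (invented values):
#   {'answer': 'yellow'} -> 'yellow_NO_VARIANT'
#   {'answer_value_id': ['choice_1', 'choice_3'], 'variant': 629}
#       -> '[choice_1|choice_3]_629'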
@staticmethod
def stringify(answer_value):
"""
Convert answer value to a canonical string representation.
If answer_value is a list, then returns list values
surrounded by square brackets and delimited by pipes
(e.g. "[choice_1|choice_3|choice_4]").
If answer_value is a string, just returns as-is.
"""
# If it's a list, convert to a string. Note that it's not
# enough to call str() or unicode(), as this will appear as
# "[u'choice_5']".
# TODO: also need to strip out XML tags here, that may be
# appearing for the answer text displayed for choices. But
# what happens if the answer is not a choice, but is a text
# string, and that string happens to have XML-like markup in
# it?
if isinstance(answer_value, basestring):
return answer_value
elif isinstance(answer_value, list):
return '[{list_val}]'.format(list_val='|'.join(answer_value))
else:
# unexpected type:
log.error("Unexpected type for an answer_value: %s", answer_value)
return unicode(answer_value)
##################################
# Task requires/output definitions
##################################
class BaseAnswerDistributionTask(MapReduceJobTask):
"""
Base class for answer distribution calculations.
Parameters:
name: a unique identifier to distinguish one run from another. It is used in
the construction of output filenames, so each run will have distinct outputs.
src: a URL to the root location of input tracking log files.
dest: a URL to the root location to write output file(s).
include: a list of patterns to be used to match input files, relative to `src` URL.
The default value is ['*'].
"""
name = luigi.Parameter()
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
def extra_modules(self):
# Boto is used for S3 access and cjson for parsing log files.
import boto
import cjson
return [boto, cjson]
class LastProblemCheckEvent(LastProblemCheckEventMixin, BaseAnswerDistributionTask):
"""Identifies last problem_check event for a user on a problem in a course, given raw event log input."""
def requires(self):
return PathSetTask(self.src, self.include)
def output(self):
output_name = 'last_problem_check_events_{name}'.format(name=self.name)
return get_target_from_url(url_path_join(self.dest, output_name))
class AnswerDistributionPerCourse(AnswerDistributionPerCourseMixin, BaseAnswerDistributionTask):
"""
Calculates answer distribution on a problem in a course, given per-user answers by date.
Additional Parameters:
answer_metadata: optional file to provide information about particular answers.
Includes problem_display_name, input_type, response_type, and question.
"""
answer_metadata = luigi.Parameter(default=None)
def requires(self):
results = {
'events': LastProblemCheckEvent(self.mapreduce_engine, self.name, self.src, self.dest, self.include),
}
if self.answer_metadata:
results.update({'answer_metadata': ExternalURL(self.answer_metadata)})
return results
def requires_hadoop(self):
# Only pass the input files on to hadoop, not any metadata file.
return self.requires()['events']
def output(self):
output_name = 'answer_distribution_per_course_{name}'.format(name=self.name)
return get_target_from_url(url_path_join(self.dest, output_name))
def run(self):
# Define answer_metadata on the object if specified.
if 'answer_metadata' in self.input():
with self.input()['answer_metadata'].open('r') as answer_metadata_file:
self.load_answer_metadata(answer_metadata_file)
super(AnswerDistributionPerCourse, self).run()
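# A hypothetical command-line invocation of this task (the launcher script
# and exact flag spellings are assumptions, not confirmed by this commit):
#
#   launch-task AnswerDistributionPerCourse --local-scheduler \
#       --name test_run \
#       --src s3://mybucket/tracking-logs \
#       --dest s3://mybucket/output \
#       --include '*tracking.log*'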
################################
# Helper methods
################################
def get_problem_check_event(line):
"""
Returns a key/value pair for an explicit problem_check event.
Args:
line: text line from a tracking event log.
Returns:
(course_id, problem_id, username), (timestamp, problem_check_info)
where timestamp is in ISO format, with resolution to the microsecond,
and problem_check_info is a JSON-serialized dict containing
the contents of the problem_check event's 'event' field,
augmented with entries for 'timestamp', 'username', and
'context' from the event.
Returns None if there is no valid problem_check event on the line.
Example:
(edX/DemoX/Demo_Course, i4x://edX/DemoX/Demo_Course/problem/PS1_P1, dummy_username), (2013-09-10T00:01:05.123456, blah)
"""
# Parse the line into a dict.
event = eventlog.parse_json_server_event(line, 'problem_check')
if event is None:
return None
# Get the "problem data". This is the event data, the context, and anything else that would
# be useful further downstream. (We could just pass the entire event dict?)
# Get the user from the username, not from the user_id in the
# context. While we are currently requiring context (as described
# above), we might not in future. Older events will not have
# context information, so we can't rely on user_id from there.
# And we don't expect problem_check events to occur without a
# username, and don't expect them to occur with the wrong user
# (i.e. one user acting on behalf of another, as in an instructor
# acting on behalf of a student).
augmented_data_fields = ['context', 'username', 'timestamp']
problem_data = eventlog.get_augmented_event_data(event, augmented_data_fields)
if problem_data is None:
return None
# Get the course_id from context. We won't work with older events
# that do not have context information, since they do not directly
# provide course_id information. (The problem_id/answer_id values
# contain the org and course name, but not the run.) Course_id
# information could be found from other events, but it would
# require expanding the events being selected.
course_id = problem_data.get('context').get('course_id')
if course_id is None:
log.error("encountered explicit problem_check event with missing course_id: %s", event)
return None
if not eventlog.is_valid_course_id(course_id):
log.error("encountered explicit problem_check event with bogus course_id: %s", event)
return None
# Get the problem_id from the event data.
problem_id = problem_data.get('problem_id')
if problem_id is None:
log.error("encountered explicit problem_check event with bogus problem_id: %s", event)
return None
problem_data_json = json.dumps(problem_data)
key = (course_id, problem_id, problem_data.get('username'))
value = (problem_data.get('timestamp'), problem_data_json)
return key, value
"""
Tests for tasks that calculate answer distributions.
"""
import json
import StringIO
from edx.analytics.tasks.answer_dist import (
LastProblemCheckEventMixin,
AnswerDistributionPerCourseMixin,
)
from edx.analytics.tasks.tests import unittest
class LastProblemCheckEventBaseTest(unittest.TestCase):
"""Base test class for testing LastProblemCheckEventMixin."""
def setUp(self):
self.task = LastProblemCheckEventMixin()
self.course_id = "MITx/7.00x/2013_Spring"
self.org_id = self.course_id.split('/')[0]
self.problem_id = "i4x://MITx/7.00x/2013_Spring/problem/PSet1:PS1_Q1"
self.answer_id = "i4x-MITx-7_00x-problem-PSet1_PS1_Q1_2_1"
self.username = 'test_user'
self.user_id = 24
self.timestamp = "2013-12-17T15:38:32.805444"
self.key = (self.course_id, self.problem_id, self.username)
def _create_event_log_line(self, **kwargs):
"""Create an event log with test values, as a JSON string."""
return json.dumps(self._create_event_dict(**kwargs))
def _create_event_data_dict(self, **kwargs):
event_data = {
"problem_id": self.problem_id,
"seed": 1,
"attempts": 2,
"answers": {self.answer_id: "3"},
"correct_map": {
self.answer_id: {
"queuestate": None,
"npoints": None,
"msg": "",
"correctness": "incorrect",
"hintmode": None,
"hint": ""
},
},
"state": {
"input_state": {self.answer_id: None},
"correct_map": None,
"done": False,
"seed": 1,
"student_answers": {self.answer_id: "1"},
},
"grade": 0,
"max_grade": 1,
"success": "incorrect",
}
self._update_with_kwargs(event_data, **kwargs)
return event_data
@staticmethod
def _update_with_kwargs(data_dict, **kwargs):
"""Update data_dict from kwargs, but only where it modifies a top-level value."""
for key, value in kwargs.iteritems():
if key in data_dict:
data_dict[key] = value
def _create_event_context(self, **kwargs):
context = {
"course_id": self.course_id,
"org_id": self.org_id,
"user_id": self.user_id,
}
self._update_with_kwargs(context, **kwargs)
return context
def _create_problem_data_dict(self, **kwargs):
problem_data = self._create_event_data_dict(**kwargs)
problem_data['timestamp'] = self.timestamp
problem_data['username'] = self.username
problem_data['context'] = self._create_event_context(**kwargs)
self._update_with_kwargs(problem_data, **kwargs)
return problem_data
def _create_event_dict(self, **kwargs):
"""Create an event log with test values, as a dict."""
# Define default values for event log entry.
event_dict = {
"username": self.username,
"host": "test_host",
"event_source": "server",
"event_type": "problem_check",
"context": self._create_event_context(**kwargs),
"time": "{0}+00:00".format(self.timestamp),
"ip": "127.0.0.1",
"event": self._create_event_data_dict(**kwargs),
"agent": "blah, blah, blah",
"page": None
}
self._update_with_kwargs(event_dict, **kwargs)
return event_dict
class LastProblemCheckEventMapTest(LastProblemCheckEventBaseTest):
"""Tests to verify that event log parsing by mapper works correctly."""
def assert_no_output_for(self, line):
"""Assert that an input line generates no output."""
self.assertEquals(tuple(self.task.mapper(line)), tuple())
def test_non_problem_check_event(self):
line = 'this is garbage'
self.assert_no_output_for(line)
def test_unparseable_problem_check_event(self):
line = 'this is garbage but contains problem_check'
self.assert_no_output_for(line)
def test_browser_event_source(self):
line = self._create_event_log_line(event_source='browser')
self.assert_no_output_for(line)
def test_missing_event_source(self):
line = self._create_event_log_line(event_source=None)
self.assert_no_output_for(line)
def test_missing_username(self):
line = self._create_event_log_line(username=None)
self.assert_no_output_for(line)
def test_missing_event_type(self):
event_dict = self._create_event_dict()
event_dict['old_event_type'] = event_dict['event_type']
del event_dict['event_type']
line = json.dumps(event_dict)
self.assert_no_output_for(line)
def test_implicit_problem_check_event_type(self):
line = self._create_event_log_line(event_type='implicit/event/ending/with/problem_check')
self.assert_no_output_for(line)
def test_bad_datetime(self):
line = self._create_event_log_line(time='this is a bogus time')
self.assert_no_output_for(line)
def test_bad_event_data(self):
line = self._create_event_log_line(event=["not an event"])
self.assert_no_output_for(line)
def test_missing_course_id(self):
line = self._create_event_log_line(context={})
self.assert_no_output_for(line)
def test_illegal_course_id(self):
line = self._create_event_log_line(course_id=";;;;bad/id/val")
self.assert_no_output_for(line)
def test_missing_problem_id(self):
line = self._create_event_log_line(problem_id=None)
self.assert_no_output_for(line)
def test_missing_context(self):
line = self._create_event_log_line(context=None)
self.assert_no_output_for(line)
def test_good_problem_check_event(self):
event = self._create_event_dict()
line = json.dumps(event)
mapper_output = tuple(self.task.mapper(line))
expected_data = self._create_problem_data_dict()
expected_key = self.key
self.assertEquals(len(mapper_output), 1)
self.assertEquals(len(mapper_output[0]), 2)
self.assertEquals(mapper_output[0][0], expected_key)
self.assertEquals(len(mapper_output[0][1]), 2)
self.assertEquals(mapper_output[0][1][0], self.timestamp)
# Apparently, the output of json.dumps() is not consistent enough
# to compare, due to ordering issues. So compare the dicts
# rather than the JSON strings.
actual_info = mapper_output[0][1][1]
actual_data = json.loads(actual_info)
self.assertEquals(actual_data, expected_data)
class LastProblemCheckEventReduceTest(LastProblemCheckEventBaseTest):
"""
Verify that LastProblemCheckEventMixin.reduce() works correctly.
"""
def _get_reducer_output(self, values):
"""Run reducer with provided values hardcoded key."""
return tuple(self.task.reducer(self.key, values))
def _check_output(self, inputs, expected):
"""Compare generated with expected output."""
reducer_output = self._get_reducer_output(inputs)
self.assertEquals(len(reducer_output), len(expected))
for i, reducer_value in enumerate(reducer_output):
expected_value = expected[i]
self.assertEquals(len(reducer_value), 2)
self.assertEquals(reducer_value[0], expected_value[0])
self.assertEquals(len(reducer_value[1]), 2)
self.assertEquals(reducer_value[1][0], self.timestamp)
# Apparently, the output of json.dumps() is not consistent enough
# to compare, due to ordering issues. So compare the dicts
# rather than the JSON strings.
actual_info = reducer_value[1][1]
actual_data = json.loads(actual_info)
expected_data = expected_value[1][1]
self.assertEquals(actual_data, expected_data)
def test_no_events(self):
self._check_output([], tuple())
def test_one_answer_event(self):
problem_data = self._create_problem_data_dict()
input_data = (self.timestamp, json.dumps(problem_data))
answer_data = {
"answer_value_id": "3",
"problem_display_name": None,
"variant": 1,
"correct": "incorrect",
"problem_id": self.problem_id,
}
expected_key = (self.course_id, self.answer_id)
expected_value = (self.timestamp, answer_data)
self._check_output([input_data], [(expected_key, expected_value)])
def test_one_submission_event(self):
problem_data = self._create_problem_data_dict()
problem_data['submission'] = {
self.answer_id: {
"input_type": "formulaequationinput",
"question": "Enter the number of fingers on a human hand",
"response_type": "numericalresponse",
"answer": "3",
"variant": 629,
"correct": False
},
}
input_data = (self.timestamp, json.dumps(problem_data))
answer_data = {
u"answer": u"3",
u"problem_display_name": None,
u"variant": 629,
u"correct": False,
u"problem_id": unicode(self.problem_id),
u"input_type": u"formulaequationinput",
u"question": u"Enter the number of fingers on a human hand",
u"response_type": u"numericalresponse",
}
expected_key = (self.course_id, self.answer_id)
expected_value = (self.timestamp, answer_data)
self._check_output([input_data], [(expected_key, expected_value)])
# TODO: test adding problem_display_name to context.
# TODO: test with multiple answers from the same problem.
# TODO: add hidden Ids
# TODO: add submissions with answer_value_ids.
# TODO: test multiple answers from the same user (w/different times).
# (and different orders).
class AnswerDistributionPerCourseReduceTest(unittest.TestCase):
"""
Verify that AnswerDistributionPerCourseMixin.reduce() works correctly.
"""
def setUp(self):
self.task = AnswerDistributionPerCourseMixin()
self.course_id = "MITx/7.00x/2013_Spring"
self.problem_id = "i4x://MITx/7.00x/2013_Spring/problem/PSet1:PS1_Q1"
self.answer_id = "i4x-MITx-7_00x-problem-PSet1_PS1_Q1_2_1"
self.timestamp = "2013-12-17T15:38:32.805444"
self.earlier_timestamp = "2013-12-15T15:38:32.805444"
self.key = (self.course_id, self.answer_id)
self.problem_display_name = "This is the Problem for You!"
def _get_reducer_output(self, values):
"""Run reducer with provided values hardcoded key."""
return tuple(self.task.reducer(self.key, values))
def _check_output(self, inputs, expected):
"""Compare generated with expected output."""
reducer_output = self._get_reducer_output(inputs)
self.assertEquals(len(reducer_output), len(expected))
for course_id, _output in reducer_output:
self.assertEquals(course_id, self.course_id)
# We don't know what order the outputs will be dumped for a given
# set of inputs, so we have to compare sets.
reducer_outputs = set([frozenset(json.loads(output).items()) for _, output in reducer_output])
expected_outputs = set([frozenset(output.items()) for output in expected])
self.assertEquals(reducer_outputs, expected_outputs)
def _get_answer_data(self, **kwargs):
answer_data = {
"answer": "3",
"problem_display_name": None,
"variant": None,
"correct": False,
"problem_id": self.problem_id,
"input_type": "formulaequationinput",
"question": "Enter the number of fingers on a human hand",
"response_type": "numericalresponse",
}
answer_data.update(**kwargs)
return answer_data
def _get_non_submission_answer_data(self, **kwargs):
answer_data = {
"answer_value_id": "3",
"problem_display_name": None,
"variant": None,
"correct": False,
"problem_id": self.problem_id,
}
answer_data.update(**kwargs)
return answer_data
def _get_expected_output(self, answer_data, **kwargs):
"""Get an output based on the input."""
expected_output = {
"Problem Display Name": answer_data.get('problem_display_name'),
"Count": 1,
"PartID": self.answer_id,
"Question": answer_data.get('question'),
"AnswerValue": answer_data.get('answer'),
"ValueID": "",
"Variant": answer_data.get('variant'),
"Correct Answer": "1" if answer_data['correct'] else '0',
"ModuleID": self.problem_id,
}
expected_output.update(**kwargs)
return expected_output
def test_no_user_counts(self):
self.assertEquals(self._get_reducer_output([]), tuple())
def test_one_answer_event(self):
answer_data = self._get_answer_data()
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data)
self._check_output([input_data], (expected_output,))
def test_event_with_variant(self):
answer_data = self._get_answer_data(variant=629)
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data)
self._check_output([input_data], (expected_output,))
def test_event_with_problem_name(self):
answer_data = self._get_answer_data(problem_display_name=self.problem_display_name)
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data)
self._check_output([input_data], (expected_output,))
def test_choice_answer(self):
answer_data = self._get_answer_data(
answer_value_id='choice_1',
answer='First Choice',
)
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data,
ValueID='choice_1',
AnswerValue='First Choice'
)
self._check_output([input_data], (expected_output,))
def test_multiple_choice_answer(self):
answer_data = self._get_answer_data(
answer_value_id=['choice_1', 'choice_2', 'choice_4'],
answer=['First Choice', 'Second Choice', 'Fourth Choice'],
response_type="multiplechoiceresponse",
)
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data,
ValueID='[choice_1|choice_2|choice_4]',
AnswerValue='[First Choice|Second Choice|Fourth Choice]'
)
self._check_output([input_data], (expected_output,))
def test_filtered_response_type(self):
answer_data = self._get_answer_data(
response_type="customresponse",
)
input_data = (self.timestamp, json.dumps(answer_data))
self.assertEquals(self._get_reducer_output([input_data]), tuple())
def test_filtered_non_submission_answer(self):
answer_data = self._get_non_submission_answer_data()
input_data = (self.timestamp, json.dumps(answer_data))
self.assertEquals(self._get_reducer_output([input_data]), tuple())
def test_two_answer_event_same(self):
answer_data = self._get_answer_data()
input_data_1 = (self.earlier_timestamp, json.dumps(answer_data))
input_data_2 = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data, Count=2)
self._check_output([input_data_1, input_data_2], (expected_output,))
def test_two_answer_event_same_reversed(self):
answer_data = self._get_answer_data()
input_data_1 = (self.earlier_timestamp, json.dumps(answer_data))
input_data_2 = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data, Count=2)
self._check_output([input_data_2, input_data_1], (expected_output,))
def test_two_answer_event_same_old_and_new(self):
answer_data_1 = self._get_non_submission_answer_data()
answer_data_2 = self._get_answer_data()
input_data_1 = (self.earlier_timestamp, json.dumps(answer_data_1))
input_data_2 = (self.timestamp, json.dumps(answer_data_2))
expected_output = self._get_expected_output(answer_data_2, Count=2)
self._check_output([input_data_1, input_data_2], (expected_output,))
def test_two_answer_event_different_answer(self):
answer_data_1 = self._get_answer_data(answer="first")
answer_data_2 = self._get_answer_data(answer="second")
input_data_1 = (self.earlier_timestamp, json.dumps(answer_data_1))
input_data_2 = (self.timestamp, json.dumps(answer_data_2))
expected_output_1 = self._get_expected_output(answer_data_1)
expected_output_2 = self._get_expected_output(answer_data_2)
self._check_output([input_data_1, input_data_2], (expected_output_1, expected_output_2))
def test_two_answer_event_different_variant(self):
answer_data_1 = self._get_answer_data(variant=123)
answer_data_2 = self._get_answer_data(variant=456)
input_data_1 = (self.earlier_timestamp, json.dumps(answer_data_1))
input_data_2 = (self.timestamp, json.dumps(answer_data_2))
expected_output_1 = self._get_expected_output(answer_data_1)
expected_output_2 = self._get_expected_output(answer_data_2)
self._check_output([input_data_1, input_data_2], (expected_output_1, expected_output_2))
def _load_metadata(self, **kwargs):
"""Defines some metadata for test answer."""
metadata_dict = {
self.answer_id: {
"question": "Pick One or Two",
"response_type": "multiplechoiceresponse",
"input_type": "my_input_type",
"problem_display_name": self.problem_display_name,
}
}
metadata_dict[self.answer_id].update(**kwargs)
answer_metadata = StringIO.StringIO(json.dumps(metadata_dict))
self.task.load_answer_metadata(answer_metadata)
def test_non_submission_choice_with_metadata(self):
self._load_metadata(
answer_value_id_map={"choice_1": "First Choice", "choice_2": "Second Choice"}
)
answer_data = self._get_non_submission_answer_data(
answer_value_id='choice_1',
)
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data,
ValueID='choice_1',
AnswerValue='First Choice',
Question="Pick One or Two",
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
def test_non_submission_multichoice_with_metadata(self):
self._load_metadata(
answer_value_id_map={"choice_1": "First Choice", "choice_2": "Second Choice"}
)
answer_data = self._get_non_submission_answer_data(
answer_value_id=['choice_1', 'choice_2']
)
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data,
ValueID='[choice_1|choice_2]',
AnswerValue='[First Choice|Second Choice]',
Question="Pick One or Two",
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
def test_non_submission_nonmapped_multichoice_with_metadata(self):
self._load_metadata()
answer_data = self._get_non_submission_answer_data(
answer_value_id=['choice_1', 'choice_2']
)
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data,
ValueID='[choice_1|choice_2]',
AnswerValue='',
Question="Pick One or Two",
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
def test_non_submission_nonmapped_choice_with_metadata(self):
self._load_metadata()
answer_data = self._get_non_submission_answer_data(
answer_value_id='choice_1'
)
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data,
ValueID='choice_1',
AnswerValue='',
Question="Pick One or Two",
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
def test_non_submission_nonmapped_nonchoice_with_metadata(self):
self._load_metadata(response_type="optionresponse")
answer_data = self._get_non_submission_answer_data()
input_data = (self.timestamp, json.dumps(answer_data))
expected_output = self._get_expected_output(answer_data,
AnswerValue='3',
Question="Pick One or Two",
)
expected_output["Problem Display Name"] = self.problem_display_name
self._check_output([input_data], (expected_output,))
@@ -5,7 +5,7 @@ import datetime
import re
import logging
log = logging.getLogger(__name__)
PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$')
@@ -72,6 +72,53 @@ def parse_json_event(line, nested=False):
return parsed
def parse_json_server_event(line, requested_event_type):
"""
Parse a tracking log input line as JSON to create a dict representation.
Arguments:
line: the eventlog text
requested_event_type: string representing the requested event_type
Returns:
tracking event log entry as a dict, if line corresponds to a server
event with the requested event_type.
Returns None if an error is encountered or if it doesn't match.
"""
# Before parsing, check that the line contains something that
# suggests it's an event of the requested type.
if requested_event_type not in line:
return None
# Parse the line into a dict.
event = parse_json_event(line)
if event is None:
# The line didn't parse. We know that some significant number
# of server lines do not parse because of line length issues,
# so log these here.
log.error("encountered event line that did not parse: %s", line)
return None
# We are only interested in server events, not browser events.
event_source = event.get('event_source')
if event_source is None:
log.error("encountered event with no event_source: %s", event)
return None
if event_source != 'server':
return None
# We only want the explicit event, not the implicit form.
event_type = event.get('event_type')
if event_type is None:
log.error("encountered event with no event_type: %s", event)
return None
if event_type != requested_event_type:
return None
return event
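# Illustrative usage: given a raw tracking-log line,
#   event = parse_json_server_event(line, 'problem_check')
# returns the parsed event dict only for an explicit server-side
# problem_check event, and None otherwise.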
# Time-related terminology:
# * datetime: a datetime object.
# * timestamp: a string, with date and time (to millisecond), in ISO format.
@@ -118,7 +165,7 @@ def get_event_data(event):
event_value = event.get('event')
if event_value is None:
log.error("encountered event with missing event value: %s", event)
return None
if isinstance(event_value, basestring):
@@ -126,12 +173,59 @@ def get_event_data(event):
try:
event_value = decode_json(event_value)
except Exception:
log.error("encountered event with unparsable event value: %s", event)
return None
if isinstance(event_value, dict):
# It's fine, just return.
return event_value
else:
log.error("encountered event data with unrecognized type: %s", event)
return None
def get_augmented_event_data(event, fields_to_augment):
"""
Returns event data from an event log entry, and adds additional fields.
Args:
event: event log entry as a dict object
fields_to_augment: list of field names to use as keys.
Returns:
dict containing event data, with the keys listed in `fields_to_augment`
pulled from the event dict and placed in the returned dict.
Returns None if the event data or any requested field is missing or invalid.
"""
# Get the event data.
event_data = get_event_data(event)
if event_data is None:
# Assume it's already logged (and with more specifics).
return None
if 'timestamp' in fields_to_augment:
# Get the timestamp as an object.
datetime_obj = get_event_time(event)
if datetime_obj is None:
log.error("encountered event with bad datetime: %s", event)
return None
timestamp = datetime_to_timestamp(datetime_obj)
event_data['timestamp'] = timestamp
if 'context' in fields_to_augment:
# Get the event context.
context = event.get('context')
if context is None:
# Too common -- do not log here.
return None
event_data['context'] = context
if 'username' in fields_to_augment:
username = event.get('username')
if username is None:
log.error("encountered event with unexpected missing username: %s", event)
return None
event_data['username'] = username
return event_data
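# Illustrative usage (the field list matches the caller in answer_dist.py):
#   problem_data = get_augmented_event_data(event, ['context', 'username', 'timestamp'])
# returns the event's 'event' dict with 'context' and 'username' copied in
# and a normalized 'timestamp' added, or None if any of them is unavailable.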
@@ -28,6 +28,7 @@ edx.analytics.tasks =
total-enrollments-report = edx.analytics.tasks.reports.total_enrollments:WeeklyAllUsersAndEnrollments
inc-enrollments-report = edx.analytics.tasks.reports.incremental_enrollments:WeeklyIncrementalUsersAndEnrollments
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay
answer_dist = edx.analytics.tasks.answer_dist:AnswerDistributionPerCourse
mapreduce.engine =
hadoop = luigi.hadoop:DefaultHadoopJobRunner
...