Commit f6c8db32 by Brian Wilson

Refactor course enrollment calculations.

Omit days with no change in user status.  Add tests for reduce.  Rename time-related functions.
parent c1164ce5
......@@ -26,172 +26,93 @@ import edx.analytics.util.eventlog as eventlog
from edx.analytics.tasks.pathutil import get_target_for_url, PathSetTask
################################
# Task Map-Reduce definitions
################################
class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask):
"""Calculates daily change in enrollment for a user in a course, given raw event log input."""
def get_implicit_enrollment_output(self, item):
def get_explicit_enrollment_output(line):
"""
Generates output values for implicit enrollment events.
Generates output values for explicit enrollment events.
Output format: (course_id, username), (datetime, action_value)
Output format: (course_id, user_id), (timestamp, action_value)
where action_value = 1 (enrolled) or -1 (unenrolled)
and timestamp is in ISO format, with resolution to the second.
Returns None if the enrollment event on the line is not valid.
Returns None if there is no valid enrollment event on the line.
"""
event_data = eventlog.get_event_data(item)
if event_data is None:
# Assume it's already logged (and with more specifics).
return None
# The args are part of the POST request.
# The course_id is stored in a list, so just take the first value:
post_args = event_data['POST']
if 'course_id' not in post_args:
eventlog.log_item("encountered event with no course_id in post args", item)
return None
course_id = post_args['course_id'][0]
if len(course_id) == 0:
eventlog.log_item("encountered event with zero-length course_id in post args", item)
# Before parsing, check that the line contains something that
# suggests it's an enrollment event.
if 'edx.course.enrollment' not in line:
return None
# This is a hack, due to a bug in luigi/hadoop.py:
# In JobTask.writer(), it calls "\t".join(map(str, flatten(output)))
# which returns a UnicodeEncodeError when output contains non-ascii characters.
# For now, just log and skip such course_ids. Create a separate story in future
# to make sure that Luigi handles non-ascii characters in general.
try:
str(course_id)
except:
eventlog.log_item("encountered event with non-ascii course_id in post args", item)
# try to parse the line into a dict:
item = eventlog.parse_eventlog_item(line)
if item is None:
# The line didn't parse. For this specific purpose,
# we can assume that all enrollment-related lines would parse,
# and these non-parsing lines would get skipped anyway.
return None
# The value of action is expected to be 'enroll' or 'unenroll', but is
# stored in a list. We just take the first value (but log if there are more).
if 'enrollment_action' not in post_args:
eventlog.log_item("encountered event with no enrollment_action in post args", item)
# get event type, and check that it exists:
event_type = item.get('event_type')
if event_type is None:
eventlog.log_item("encountered event with no event_type", item)
return None
actions = post_args['enrollment_action']
if len(actions) != 1:
eventlog.log_item("encountered event with multiple enrollment_actions in post args", item, "WARNING")
action = actions[0]
if action == 'enroll':
# convert the type to a value:
if event_type == 'edx.course.enrollment.activated':
action_value = 1
elif action == 'unenroll':
elif event_type == 'edx.course.enrollment.deactivated':
action_value = -1
else:
eventlog.log_item("encountered event with unrecognized value for enrollment_action in post args", item, "WARNING")
return None
# get additional data: timestamp and username:
timestamp = eventlog.get_timestamp(item)
if timestamp is None:
# bad format?
eventlog.log_item("encountered event with bad timestamp", item)
# not an enrollment event...
return None
if 'username' not in item:
# bad format?
eventlog.log_item("encountered implicit enrollment event with no username", item, "WARNING")
# get the timestamp:
datetime = eventlog.get_datetime(item)
if datetime is None:
eventlog.log_item("encountered event with bad datetime", item)
return None
timestamp = eventlog.get_timestamp(datetime)
username = item['username']
return (course_id, username), (eventlog.get_datetime_string(timestamp), action_value)
def get_explicit_enrollment_output(self, item, event_type):
"""
Generates output values for explicit enrollment events.
Output format: (course_id, username), (datetime, action_value)
where action_value = 1 (enrolled) or -1 (unenrolled)
Returns None if the enrollment event on the line is not valid.
"""
# convert the type to a value:
if event_type == 'edx.course.enrollment.activated':
action_value = 1
elif event_type == 'edx.course.enrollment.deactivated':
action_value = -1
# Enrollment parameters like course_id and user_id may be stored
# in the context and also in the data. Pick the data. For
# course_id, we expect the values to be the same. However, for
# user_id, the values may not be the same. This is because the
# context contains the name and id of the user making a particular
# request, but it is not necessarily the id of the user being
# enrolled. For example, Studio provides authors with the ability
# to add staff to their courses, and these staff get enrolled in
# the course while the author is listed in the context.
# Data is stored in the context, but it's also in the data.
# Pick one.
# Get the event data:
event_data = eventlog.get_event_data(item)
if event_data is None:
# Assume it's already logged (and with more specifics).
return None
# Get the course_id from the data, and validate.
course_id = event_data['course_id']
# for now, ignore the enrollment 'mode' (e.g. 'honor')
# get additional data:
timestamp = eventlog.get_timestamp(item)
if timestamp is None:
# bad format?
eventlog.log_item("encountered event with bad timestamp", item)
if not eventlog.is_valid_course_id(course_id):
eventlog.log_item("encountered explicit enrollment event with bogus course_id", item)
return None
# there is also a user_id in the event_data, but who knows if
# it's the same as the username? But for old events, we don't have
# such a user_id, and I don't think we're planning on loading such a mapping.
if 'username' not in item:
# bad format?
eventlog.log_item("encountered explicit enrollment event with no username", item)
# Get the user_id from the data:
if 'user_id' not in event_data:
eventlog.log_item("encountered explicit enrollment event with no user_id", item)
return None
username = item['username']
return (course_id, username), (eventlog.get_datetime_string(timestamp), action_value)
def get_enrollment_event(self, line):
"""
Generates output values for explicit enrollment events.
Output format: (course_id, username), (datetime, action_value)
where action_value = 1 (enrolled) or -1 (unenrolled)
user_id = event_data['user_id']
Returns None if there is no enrollment event on the line.
"""
# Before parsing, check that the line contains something that
# suggests it's an enrollment event.
if 'edx.course.enrollment' not in line and '/change_enrollment' not in line:
return None
# For now, ignore the enrollment 'mode' (e.g. 'honor').
# try to parse the line into a dict:
item = eventlog.parse_eventlog_item(line)
if item is None:
# The line didn't parse. For this specific purpose,
# we can assume that all enrollment-related lines would parse,
# and these non-parsing lines would get skipped anyway.
return None
return (course_id, user_id), (timestamp, action_value)
# get event type, and check that it exists:
event_type = item.get('event_type')
if event_type is None:
eventlog.log_item("encountered event with no event_type", item)
return None
# check if it is an 'explicit' enrollment event:
if (event_type == 'edx.course.enrollment.activated' or
event_type == 'edx.course.enrollment.deactivated'):
return self.get_explicit_enrollment_output(item, event_type)
################################
# Task Map-Reduce definitions
################################
# check if it is an 'implicit' enrollment event:
if event_type == '/change_enrollment':
return self.get_implicit_enrollment_output(item)
# Not an enrollment event...
return None
class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask):
"""Calculates daily change in enrollment for a user in a course, given raw event log input."""
def mapper(self, line):
"""
......@@ -205,7 +126,7 @@ class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask):
edX/DemoX/Demo_Course dummyuser 2013-09-10 -1
"""
parsed_tuple = self.get_enrollment_event(line)
parsed_tuple = get_explicit_enrollment_output(line)
if parsed_tuple is not None:
# sys.stderr.write("Found tuple in mapper: " + str(parsed_tuple) + '\n')
yield parsed_tuple
......@@ -241,30 +162,45 @@ class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask):
"""
# sys.stderr.write("Found key in reducer: " + str(key) + '\n')
course_id, username = key
# Sort input values (by timestamp) to easily detect the end of a day.
sorted_values = sorted(values)
prev_date = None
prev_change = 0
net_change = 0
for (datetime, change_value) in sorted_values:
# get the day's date from the event timestamp:
this_date = eventlog.get_date_from_datetime(datetime)
# if the date is different, then output the previous date:
if this_date != prev_date and prev_date is not None:
# sys.stderr.write("outputting date and value: " + str(prev_date) + " " + str(net_change) + '\n')
yield (course_id, prev_date), net_change
net_change = 0
# Convert timestamps to dates, so we can group them by day.
fn = eventlog.get_datestamp_from_timestamp
values = [(fn(timestamp), value) for timestamp, value in sorted_values]
# sys.stderr.write("accumulating date and value: " + str(this_date) + " " + str(change_value) + '\n')
# accumulate the new numbers:
prev_date = this_date
if change_value != prev_change:
net_change += change_value
prev_change = change_value
# Add a stop item to ensure we process the last entry.
values = values + [(None, None)]
if prev_date is not None:
yield (course_id, prev_date), net_change
# The enrollment state for each student: {1 : enrolled, -1: unenrolled}
# Assume students start in an unknown state, so that whatever happens on
# the first day will get output.
state, prev_state = 0, 0
prev_date = None
import sys
for (this_date, action) in values:
# Before we process a new date, report the state if it has
# changed from the previously reported, if any.
if this_date != prev_date and prev_date is not None:
if state != prev_state:
# sys.stderr.write("outputting date and value: " + str(prev_date) + " " + str(state) + '\n')
prev_state = state
yield (course_id, prev_date), state
# sys.stderr.write("accumulating date and value: " + str(this_date) + " " + str(action) + '\n')
# Consecutive changes of the same kind don't affect the state.
if action != state:
state = action
# If this is the first entry, then we need to infer what
# the previous state was before the first entry arrives.
# For this, we take the opposite of the first entry.
if prev_date is None:
prev_state = -1 if action == 1 else 1
prev_date = this_date
class BaseCourseEnrollmentChangesPerDay(luigi.hadoop.JobTask):
"""Calculates daily changes in enrollment, given per-user net changes by date."""
......@@ -300,10 +236,10 @@ class BaseCourseEnrollmentChangesPerDay(luigi.hadoop.JobTask):
Output value: sum(changes)
"""
# sys.stderr.write("Found key in second reducer: " + str(key) + '\n')
sum_value = 0
count = 0
for value in values:
sum_value += int(value)
yield key, sum_value
count += int(value)
yield key, count
class BaseCourseEnrollmentTotalsPerDay(luigi.hadoop.JobTask):
......@@ -453,9 +389,10 @@ class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay):
# sys.stderr.write("Found key in reducer: " + str(key) + '\n')
course_id, username = key
sorted_values = sorted(values)
for (datetime, change_value) in sorted_values:
# get the day's date from the event timestamp:
this_date = eventlog.get_date_from_datetime(datetime)
for (timestamp, change_value) in sorted_values:
# get the day's date from the event's timestamp:
this_date = eventlog.get_datestamp_from_timestamp(timestamp)
# if it's an enrollment, output it and we're done.
if change_value > 0:
yield (course_id, this_date), change_value
return
......@@ -483,40 +420,12 @@ class FirstCourseEnrollmentChangesPerDay(BaseCourseEnrollmentChangesPerDay):
import edx.analytics.util
return [boto, edx.analytics.util, cjson]
class FirstCourseEnrollmentTotalsPerDay(BaseCourseEnrollmentTotalsPerDay):
name = luigi.Parameter()
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
run_locally = luigi.BooleanParameter()
def requires(self):
return FirstCourseEnrollmentChangesPerDay(self.name, self.src, self.dest, self.include, self.run_locally)
def output(self):
# generate a single output file
output_name = 'first_course_enrollment_totals_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
def extra_modules(self):
import boto
import cjson
import edx.analytics.util
return [boto, edx.analytics.util, cjson]
################################
# Running tasks
################################
def main():
import argparse
import boto
import cjson
luigi.hadoop.attach(boto, argparse, cjson)
luigi.run()
......
......@@ -72,7 +72,7 @@ class PathSetTask(luigi.Task):
def __init__(self, *args, **kwargs):
super(PathSetTask, self).__init__(*args, **kwargs)
self.s3 = boto.connect_s3()
self.s3 = None
def requires(self):
if self.src.startswith('s3'):
......@@ -103,6 +103,10 @@ class PathSetTask(luigi.Task):
def _generate_sources(self):
bucket_name, root = get_s3_bucket_key_names(self.src)
# connect lazily, only if necessary:
if self.s3 is None:
self.s3 = boto.connect_s3()
bucket = self.s3.get_bucket(bucket_name)
keys = (s.key for s in bucket.list(root) if s.size > 0)
......
"""
Tests for tasks that collect enrollment events.
"""
import unittest
from edx.analytics.tasks.course_enroll import (
BaseCourseEnrollmentEventsPerDay,
BaseCourseEnrollmentChangesPerDay,
BaseCourseEnrollmentTotalsPerDay,
)
from datetime import datetime
class CourseEnrollEventReduceTest(unittest.TestCase):
"""
Tests to verify that event log parsing works correctly.
"""
def setUp(self):
self.task = BaseCourseEnrollmentEventsPerDay()
self.key = ('course', 'user')
def _get_reducer_output(self, values):
"""Run reducer with provided values hardcoded key."""
return list(self.task.reducer(self.key, values))
def test_no_events(self):
self.assertEquals(self._get_reducer_output([]), [])
def test_single_enrollment(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
]),
[
(('course', '2013-01-01'), 1),
])
def test_single_unenrollment(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', -1),
]),
[
(('course', '2013-01-01'), -1),
])
def test_multiple_events_on_same_day(self):
# run first with no output expected:
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:04', -1),
]),
[
])
# then run with output expected:
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', -1),
('2013-01-01T00:00:04', 1),
]),
[
(('course', '2013-01-01'), 1),
])
def test_multiple_events_out_of_order(self):
# Make sure that events are sorted by the reducer.
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:04', -1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
]),
[
])
def test_multiple_enroll_events_on_same_day(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', 1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:04', 1),
]),
[
(('course', '2013-01-01'), 1),
])
def test_multiple_unenroll_events_on_same_day(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', -1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', -1),
('2013-01-01T00:00:04', -1),
]),
[
(('course', '2013-01-01'), -1),
])
def test_multiple_enroll_events_on_many_days(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', 1),
('2013-01-02T00:00:03', 1),
('2013-01-02T00:00:04', 1),
('2013-01-04T00:00:05', 1),
]),
[
(('course', '2013-01-01'), 1),
])
def test_multiple_events_on_many_days(self):
# Run with an arbitrary list of events.
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T1', 1),
('2013-01-01T2', -1),
('2013-01-01T3', 1),
('2013-01-01T4', -1),
('2013-01-02', 1),
('2013-01-03', 1),
('2013-01-04T1', 1),
('2013-01-04T2', -1),
('2013-01-05', -1),
('2013-01-06', -1),
('2013-01-07', 1),
('2013-01-08T1', 1),
('2013-01-08T2', 1),
('2013-01-09T1', -1),
('2013-01-09T2', -1),
]),
[
(('course', '2013-01-02'), 1),
(('course', '2013-01-04'), -1),
(('course', '2013-01-07'), 1),
(('course', '2013-01-09'), -1),
])
class CourseEnrollChangesReduceTest(unittest.TestCase):
"""
Verify that BaseCourseEnrollmentChangesPerDay.reduce() works correctly.
"""
def setUp(self):
self.task = BaseCourseEnrollmentChangesPerDay()
self.key = ('course', '2013-01-01')
def _get_reducer_output(self, values):
"""Run reducer with provided values hardcoded key."""
return list(self.task.reducer(self.key, values))
def test_no_user_counts(self):
self.assertEquals(self._get_reducer_output([]), [(self.key, 0)])
def test_single_user_count(self):
self.assertEquals(self._get_reducer_output([1]), [(self.key, 1)])
def test_multiple_user_count(self):
inputs = [1, 1, 1, -1, 1]
self.assertEquals(self._get_reducer_output(inputs), [(self.key, 3)])
class CourseEnrollTotalsReduceTest(unittest.TestCase):
"""
Verify that BaseCourseEnrollmentTotalsPerDay.reduce() works correctly.
"""
def setUp(self):
self.task = BaseCourseEnrollmentTotalsPerDay()
self.key = 'course'
def _get_reducer_output(self, values):
"""Run reducer with provided values hardcoded key."""
return list(self.task.reducer(self.key, values))
def test_no_user_counts(self):
self.assertEquals(self._get_reducer_output([]), [])
def test_single_user_count(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01', 5),
]),
[
(self.key, '2013-01-01', 5),
])
def test_multiple_user_count(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01', 5),
('2013-01-02', 8),
('2013-01-03', 4),
('2013-01-04', 9),
]),
[
(self.key, '2013-01-01', 5),
(self.key, '2013-01-02', 13),
(self.key, '2013-01-03', 17),
(self.key, '2013-01-04', 26),
])
......@@ -8,26 +8,44 @@ import re
PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$')
# borrowed from modulestore/parsers.py:
ALLOWED_ID_CHARS = r'[a-zA-Z0-9_\-~.:]'
PATTERN_COURSEID = re.compile(r'^' + ALLOWED_ID_CHARS + r'+$')
def get_datetime_string(timestamp):
return timestamp.strftime('%Y-%m-%dT%H:%M:%S')
def is_valid_course_id(course_id):
"""
Determines if a course_id from an event log is possibly legitimate.
Applies two tests:
def get_date_string(timestamp):
return timestamp.strftime('%Y-%m-%d')
* Course Id can be split into org/coursename/runname using '/' as delimiter.
* Components of id contain only "allowed" characters as defined in modulestore/parsers.py.
def get_date_from_datetime(datetime_string):
return datetime_string.split('T')[0]
Note this will need to be updated as split-mongo changes are rolled out
that permit a broader set of id values.
"""
components = course_id.split('/')
if len(components) != 3:
return False
return all(PATTERN_COURSEID.match(component) for component in components)
def json_decode(line):
"""Wrapper to decode JSON string in implementation-independent way."""
"""Wrapper to decode JSON string in an implementation-independent way."""
return cjson.decode(line)
def parse_eventlog_item(line, nested=False):
""" Parse a tracking log input line as JSON to create a dict representation."""
"""
Parse a tracking log input line as JSON to create a dict representation.
Arguments:
* line: the eventlog text
* nested: boolean flag permitting this to be called recursively.
Apparently some eventlog entries are pure JSON, while others are
JSON that are prepended by a timestamp.
"""
try:
parsed = json_decode(line)
except:
......@@ -48,11 +66,32 @@ def parse_eventlog_item(line, nested=False):
def log_item(msg, item, level='ERROR'):
"""Writes a message about an eventlog item."""
sys.stderr.write("{level}: {msg}: {item}\n".format(msg=msg, item=item, level=level))
def get_timestamp(item):
# Time-related terminology:
# * datetime: a datetime object.
# * timestamp: a string, with date and time (to second), in ISO format.
# * datestamp: a string with only date information, in ISO format.
def get_timestamp(datetime):
"""Returns a string with the datetime value of the provided datetime object."""
return datetime.strftime('%Y-%m-%dT%H:%M:%S')
def get_datestamp(datetime):
"""Returns a string with the date value of the provided datetime object."""
return datetime.strftime('%Y-%m-%d')
def get_datestamp_from_timestamp(timestamp):
"""Returns a string with the date value of the provided ISO datetime string."""
return timestamp.split('T')[0]
def get_datetime(item):
"""Returns a datetime object from an event item, if present."""
try:
timestamp = item['time']
removed_ms = timestamp.split('.')[0]
......@@ -62,6 +101,11 @@ def get_timestamp(item):
def get_event_data(item):
"""
Returns event data from an event log item as a dict object.
Returns None if not found.
"""
event_value = item.get('event')
if event_value is None:
......@@ -69,7 +113,7 @@ def get_event_data(item):
return None
if isinstance(event_value, basestring):
# If the value is a string, try to parse as JSON into a dict:.
# If the value is a string, try to parse as JSON into a dict.
try:
event_value = json_decode(event_value)
except:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment