Commit f724d3de by Brian Wilson

Add test coverage for course_enroll and eventlog. Fix pylint/pep8 in PR.

parent f6c8db32
...@@ -16,6 +16,7 @@ Example command lines: ...@@ -16,6 +16,7 @@ Example command lines:
--dest s3://edx-analytics-scratch/output --dest s3://edx-analytics-scratch/output
""" """
import sys
import luigi import luigi
import luigi.hadoop import luigi.hadoop
...@@ -25,85 +26,8 @@ import luigi.hdfs ...@@ -25,85 +26,8 @@ import luigi.hdfs
import edx.analytics.util.eventlog as eventlog import edx.analytics.util.eventlog as eventlog
from edx.analytics.tasks.pathutil import get_target_for_url, PathSetTask from edx.analytics.tasks.pathutil import get_target_for_url, PathSetTask
import logging
def get_explicit_enrollment_output(line): logger = logging.getLogger(__name__)
"""
Generates output values for explicit enrollment events.
Output format: (course_id, user_id), (timestamp, action_value)
where action_value = 1 (enrolled) or -1 (unenrolled)
and timestamp is in ISO format, with resolution to the second.
Returns None if there is no valid enrollment event on the line.
"""
# Before parsing, check that the line contains something that
# suggests it's an enrollment event.
if 'edx.course.enrollment' not in line:
return None
# try to parse the line into a dict:
item = eventlog.parse_eventlog_item(line)
if item is None:
# The line didn't parse. For this specific purpose,
# we can assume that all enrollment-related lines would parse,
# and these non-parsing lines would get skipped anyway.
return None
# get event type, and check that it exists:
event_type = item.get('event_type')
if event_type is None:
eventlog.log_item("encountered event with no event_type", item)
return None
# convert the type to a value:
if event_type == 'edx.course.enrollment.activated':
action_value = 1
elif event_type == 'edx.course.enrollment.deactivated':
action_value = -1
else:
# not an enrollment event...
return None
# get the timestamp:
datetime = eventlog.get_datetime(item)
if datetime is None:
eventlog.log_item("encountered event with bad datetime", item)
return None
timestamp = eventlog.get_timestamp(datetime)
# Enrollment parameters like course_id and user_id may be stored
# in the context and also in the data. Pick the data. For
# course_id, we expect the values to be the same. However, for
# user_id, the values may not be the same. This is because the
# context contains the name and id of the user making a particular
# request, but it is not necessarily the id of the user being
# enrolled. For example, Studio provides authors with the ability
# to add staff to their courses, and these staff get enrolled in
# the course while the author is listed in the context.
# Get the event data:
event_data = eventlog.get_event_data(item)
if event_data is None:
# Assume it's already logged (and with more specifics).
return None
# Get the course_id from the data, and validate.
course_id = event_data['course_id']
if not eventlog.is_valid_course_id(course_id):
eventlog.log_item("encountered explicit enrollment event with bogus course_id", item)
return None
# Get the user_id from the data:
if 'user_id' not in event_data:
eventlog.log_item("encountered explicit enrollment event with no user_id", item)
return None
user_id = event_data['user_id']
# For now, ignore the enrollment 'mode' (e.g. 'honor').
return (course_id, user_id), (timestamp, action_value)
################################ ################################
...@@ -111,64 +35,72 @@ def get_explicit_enrollment_output(line): ...@@ -111,64 +35,72 @@ def get_explicit_enrollment_output(line):
################################ ################################
class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask): class CourseEnrollmentEventsPerDayMixin(object):
"""Calculates daily change in enrollment for a user in a course, given raw event log input.""" """Calculates daily change in enrollment for a user in a course, given raw event log input."""
def mapper(self, line): def mapper(self, line):
""" """
Output format: (course_id, username), (datetime, action_value) Generates output values for explicit enrollment events.
where action_value = 1 (enrolled) or -1 (unenrolled) Args:
Example: line: text line from a tracking event log.
edX/DemoX/Demo_Course dummyuser 2013-09-10 1
edX/DemoX/Demo_Course dummyuser 2013-09-10 1
edX/DemoX/Demo_Course dummyuser 2013-09-10 -1
Yields:
(course_id, user_id), (timestamp, action_value)
where `timestamp` is in ISO format, with resolution to the millisecond
and `action_value` = 1 (enrolled) or -1 (unenrolled).
Example:
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05, 1)
""" """
parsed_tuple = get_explicit_enrollment_output(line) parsed_tuple_or_none = get_explicit_enrollment_output(line)
if parsed_tuple is not None: if parsed_tuple_or_none is not None:
# sys.stderr.write("Found tuple in mapper: " + str(parsed_tuple) + '\n') yield parsed_tuple_or_none
yield parsed_tuple
def reducer(self, key, values): def reducer(self, key, values):
""" """
Calculate status for each user on the end of each day where they changed their status. Calculate status for each user on the end of each day where they changed their status.
Output key: (course_id, date) Args:
Output value: net enrollment change on that date for an individual user.
Expected values are -1, 0 (no change), 1
Note that we don't bother to actually output the username, key: (course_id, user_id) tuple
since it's not needed downstream. value: (timestamp, action_value) tuple
Yields:
(course_id, datestamp), enrollment_change
If the user were already enrolled (or attempted enrollment), where `datestamp` is in ISO format, with resolution to the day
the net change from a subsequent enrollment is zero. Same to and `enrollment_change` is the change on that date for an individual user.
unenroll after an unenroll. This is true whether they occur Produced values are -1 or 1.
on the same day or on widely disparate days. For implicit
enrollment events, we don't know when they succeed, so we
assume they succeed the first time, and ignore subsequent
attempts. Likewise for implicit enrollment events followed by
explicit enrollment events.
An unenroll following an enroll on the same day will also No output is yielded if a user enrolls and then unenrolls (or unenrolls and
result in zero change. then enrolls) on a given day. Only days with a change at the end of the day
when compared with the previous day are output.
Note that we don't bother to actually output the user_id,
since it's not needed downstream (though it might be sometimes useful
for debugging).
Example: Example:
edX/DemoX/Demo_Course 2013-09-10 1 (edX/DemoX/Demo_Course, 2013-09-10), 1
edX/DemoX/Demo_Course 2013-09-10 -1 (edX/DemoX/Demo_Course, 2013-09-12), -1
edX/DemoX/Demo_Course 2013-09-10 0
""" """
# sys.stderr.write("Found key in reducer: " + str(key) + '\n') # sys.stderr.write("Found key in reducer: " + str(key) + '\n')
course_id, username = key course_id, user_id = key
# Sort input values (by timestamp) to easily detect the end of a day. # Sort input values (by timestamp) to easily detect the end of a day.
# Note that this assumes the timestamp values (strings) are in ISO
# representation, so that the tuples will be ordered in ascending time value.
sorted_values = sorted(values) sorted_values = sorted(values)
# Convert timestamps to dates, so we can group them by day. # Convert timestamps to dates, so we can group them by day.
fn = eventlog.get_datestamp_from_timestamp func = eventlog.timestamp_to_datestamp
values = [(fn(timestamp), value) for timestamp, value in sorted_values] values = [(func(timestamp), value) for timestamp, value in sorted_values]
# Add a stop item to ensure we process the last entry. # Add a stop item to ensure we process the last entry.
values = values + [(None, None)] values = values + [(None, None)]
...@@ -179,7 +111,6 @@ class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask): ...@@ -179,7 +111,6 @@ class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask):
state, prev_state = 0, 0 state, prev_state = 0, 0
prev_date = None prev_date = None
import sys
for (this_date, action) in values: for (this_date, action) in values:
# Before we process a new date, report the state if it has # Before we process a new date, report the state if it has
# changed from the previously reported, if any. # changed from the previously reported, if any.
...@@ -193,6 +124,10 @@ class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask): ...@@ -193,6 +124,10 @@ class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask):
# Consecutive changes of the same kind don't affect the state. # Consecutive changes of the same kind don't affect the state.
if action != state: if action != state:
state = action state = action
else:
sys.stderr.write("WARNING: duplicate enrollment event {action} "
"for user_id {user_id} in course {course_id} on {date}".format(
action=action, user_id=user_id, course_id=course_id, date=this_date))
# If this is the first entry, then we need to infer what # If this is the first entry, then we need to infer what
# the previous state was before the first entry arrives. # the previous state was before the first entry arrives.
...@@ -202,161 +137,108 @@ class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask): ...@@ -202,161 +137,108 @@ class BaseCourseEnrollmentEventsPerDay(luigi.hadoop.JobTask):
prev_date = this_date prev_date = this_date
class BaseCourseEnrollmentChangesPerDay(luigi.hadoop.JobTask):
class CourseEnrollmentChangesPerDayMixin(object):
"""Calculates daily changes in enrollment, given per-user net changes by date.""" """Calculates daily changes in enrollment, given per-user net changes by date."""
def mapper(self, line): def mapper(self, line):
""" """
Output key: (course_id, date) Args: tab-delimited values in a single text line
Output value: net enrollment change on that date for an individual user.
Expected values are -1, 0 (no change), 1 Yields: (course_id, datestamp), enrollment_change
Example: Example:
edX/DemoX/Demo_Course 2013-09-10 1 (edX/DemoX/Demo_Course, 2013-09-10), 1
edX/DemoX/Demo_Course 2013-09-10 -1 (edX/DemoX/Demo_Course, 2013-09-12), -1
edX/DemoX/Demo_Course 2013-09-10 0
""" """
# yield line course_id, date, enrollment_change = line.split('\t')
inputs = line.split('\t') yield (course_id, date), enrollment_change
if len(inputs) == 3:
yield (inputs[0], inputs[1]), inputs[2]
def reducer(self, key, values): def reducer(self, key, values):
""" """
Reducer: sums enrollments for a given course on a particular date. Reducer: sums enrollments for a given course on a particular date.
Inputs are enrollments changes on a day due to a specific user. Args:
Outputs are enrollment changes on a day summed across all users.
Key: (course_id, date)
Values: individual enrollment changes, represented as -1 or 1.
Output key: (course_id, date) (course_id, datestamp), enrollment_changes
Output value: sum(changes)
"""
# sys.stderr.write("Found key in second reducer: " + str(key) + '\n')
count = 0
for value in values:
count += int(value)
yield key, count
Input `enrollment_changes` are the enrollment changes on a day due to a specific user.
Each user with a change has a separate input, either -1 (unenroll) or 1 (enroll).
class BaseCourseEnrollmentTotalsPerDay(luigi.hadoop.JobTask): Yields:
"""Calculates cumulative changes in enrollment, given net changes by date."""
def mapper(self, line): (course_id, datestamp), enrollment_change
"""
Key: course_id
Values: (date, net enrollment change on that date)
Example:
edX/DemoX/Demo_Course 2013-09-10 5
edX/DemoX/Demo_Course 2013-09-11 -3
"""
# yield line
inputs = line.split('\t')
if len(inputs) == 3:
yield inputs[0], (inputs[1], inputs[2])
def reducer(self, key, values):
"""
Reducer: sums enrollments for a given course through a particular date.
Key: course_id Output `enrollment_change` is summed across all users, one output per course.
Values: date, and enrollment changes per day
Output key: course_id
Output value: date, accum(changes)
""" """
# sys.stderr.write("Found key in third reducer: " + str(key) + '\n') # sys.stderr.write("Found key in second reducer: " + str(key) + '\n')
sorted_values = sorted(values) count = sum(int(v) for v in values)
accum_count = 0 yield key, count
for date, count in sorted_values:
accum_count += int(count)
yield key, date, accum_count
################################## ##################################
# Task requires/output definitions # Task requires/output definitions
################################## ##################################
class CourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay): class BaseCourseEnrollmentTask(luigi.hadoop.JobTask):
"""
Base class for course enrollment calculations.
Parameters:
name: a unique identifier to distinguish one run from another. It is used in
the construction of output filenames, so each run will have distinct outputs.
src: a URL to the root location of input tracking log files.
dest: a URL to the root location to write output file(s).
include: a list of patterns to be used to match input files, relative to `src` URL.
The default value is ['*'].
run_locally: a boolean flag to indicate that the task should be run locally rather than
on a hadoop cluster. This is used only to change the intepretation of S3 URLs in src and/or dest.
"""
name = luigi.Parameter() name = luigi.Parameter()
src = luigi.Parameter() src = luigi.Parameter()
dest = luigi.Parameter() dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',)) include = luigi.Parameter(is_list=True, default=('*',))
run_locally = luigi.BooleanParameter() run_locally = luigi.BooleanParameter()
def requires(self):
return PathSetTask(self.src, self.include, self.run_locally)
def output(self):
# generate a single output file
output_name = 'course_enrollment_events_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
def extra_modules(self): def extra_modules(self):
# The following are needed for (almost) every course enrollment task.
# Boto is used for S3 access, cjson for parsing log files, and util
# is used for parsing events and date conversion.
import boto import boto
import cjson import cjson
import edx.analytics.util import edx.analytics.util
return [boto, edx.analytics.util, cjson] return [boto, edx.analytics.util, cjson]
class CourseEnrollmentChangesPerDay(BaseCourseEnrollmentChangesPerDay): class CourseEnrollmentEventsPerDay(CourseEnrollmentEventsPerDayMixin, BaseCourseEnrollmentTask):
"""Calculates daily change in enrollment for a user in a course, given raw event log input."""
name = luigi.Parameter()
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
run_locally = luigi.BooleanParameter()
def requires(self): def requires(self):
return CourseEnrollmentEventsPerDay(self.name, self.src, self.dest, self.include, self.run_locally) return PathSetTask(self.src, self.include, self.run_locally)
def output(self): def output(self):
# generate a single output file # generate a single output file
output_name = 'course_enrollment_changes_per_day_{name}'.format(name=self.name) output_name = 'course_enrollment_events_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally) return get_target_for_url(self.dest, output_name, self.run_locally)
def extra_modules(self):
import boto
import cjson
import edx.analytics.util
return [boto, edx.analytics.util, cjson]
class CourseEnrollmentTotalsPerDay(BaseCourseEnrollmentTotalsPerDay):
name = luigi.Parameter() class CourseEnrollmentChangesPerDay(CourseEnrollmentChangesPerDayMixin, BaseCourseEnrollmentTask):
src = luigi.Parameter() """Calculates daily changes in enrollment, given per-user net changes by date."""
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
run_locally = luigi.BooleanParameter()
def requires(self): def requires(self):
return CourseEnrollmentChangesPerDay(self.name, self.src, self.dest, self.include, self.run_locally) return CourseEnrollmentEventsPerDay(self.name, self.src, self.dest, self.include, self.run_locally)
def output(self): def output(self):
# generate a single output file # generate a single output file
output_name = 'course_enrollment_totals_per_day_{name}'.format(name=self.name) output_name = 'course_enrollment_changes_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally) return get_target_for_url(self.dest, output_name, self.run_locally)
def extra_modules(self):
import boto
import cjson
import edx.analytics.util
return [boto, edx.analytics.util, cjson]
class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay): class FirstCourseEnrollmentEventsPerDay(CourseEnrollmentEventsPerDayMixin, BaseCourseEnrollmentTask):
"""Calculate number of "first" course enrollments per-user, per-course, per-day."""
name = luigi.Parameter()
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
run_locally = luigi.BooleanParameter()
def requires(self): def requires(self):
return PathSetTask(self.src, self.include, self.run_locally) return PathSetTask(self.src, self.include, self.run_locally)
...@@ -366,11 +248,28 @@ class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay): ...@@ -366,11 +248,28 @@ class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay):
output_name = 'first_course_enrollment_events_per_day_{name}'.format(name=self.name) output_name = 'first_course_enrollment_events_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally) return get_target_for_url(self.dest, output_name, self.run_locally)
def extra_modules(self): def mapper(self, line):
import boto """
import cjson Generates output values for explicit enrollment events.
import edx.analytics.util
return [boto, edx.analytics.util, cjson] Args:
line: text line from a tracking event log.
Yields:
(course_id, user_id), (timestamp, action_value)
where action_value = 1 (enrolled) or -1 (unenrolled)
and timestamp is in ISO format, with resolution to the millisecond.
Example:
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05, 1)
"""
parsed_tuple_or_none = get_explicit_enrollment_output(line)
if parsed_tuple_or_none is not None:
yield parsed_tuple_or_none
def reducer(self, key, values): def reducer(self, key, values):
""" """
...@@ -379,7 +278,7 @@ class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay): ...@@ -379,7 +278,7 @@ class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay):
Output key: (course_id, date) Output key: (course_id, date)
Output value: 1 on the first date the user enrolls. Output value: 1 on the first date the user enrolls.
Note that we don't bother to actually output the username, Note that we don't bother to actually output the user_id,
since it's not needed downstream. since it's not needed downstream.
Example: Example:
...@@ -387,7 +286,7 @@ class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay): ...@@ -387,7 +286,7 @@ class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay):
""" """
# sys.stderr.write("Found key in reducer: " + str(key) + '\n') # sys.stderr.write("Found key in reducer: " + str(key) + '\n')
course_id, username = key course_id, _user_id = key
sorted_values = sorted(values) sorted_values = sorted(values)
for (timestamp, change_value) in sorted_values: for (timestamp, change_value) in sorted_values:
# get the day's date from the event's timestamp: # get the day's date from the event's timestamp:
...@@ -398,13 +297,8 @@ class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay): ...@@ -398,13 +297,8 @@ class FirstCourseEnrollmentEventsPerDay(BaseCourseEnrollmentEventsPerDay):
return return
class FirstCourseEnrollmentChangesPerDay(BaseCourseEnrollmentChangesPerDay): class FirstCourseEnrollmentChangesPerDay(CourseEnrollmentChangesPerDayMixin, BaseCourseEnrollmentTask):
"""Calculate changes in "first" course enrollments per-course, per-day."""
name = luigi.Parameter()
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
run_locally = luigi.BooleanParameter()
def requires(self): def requires(self):
return FirstCourseEnrollmentEventsPerDay(self.name, self.src, self.dest, self.include, self.run_locally) return FirstCourseEnrollmentEventsPerDay(self.name, self.src, self.dest, self.include, self.run_locally)
...@@ -414,11 +308,95 @@ class FirstCourseEnrollmentChangesPerDay(BaseCourseEnrollmentChangesPerDay): ...@@ -414,11 +308,95 @@ class FirstCourseEnrollmentChangesPerDay(BaseCourseEnrollmentChangesPerDay):
output_name = 'first_course_enrollment_changes_per_day_{name}'.format(name=self.name) output_name = 'first_course_enrollment_changes_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally) return get_target_for_url(self.dest, output_name, self.run_locally)
def extra_modules(self):
import boto ################################
import cjson # Helper methods
import edx.analytics.util
return [boto, edx.analytics.util, cjson] ################################
def get_explicit_enrollment_output(line):
"""
Generates output values for explicit enrollment events.
Args:
line: text line from a tracking event log.
Returns:
(course_id, user_id), (timestamp, action_value)
where action_value = 1 (enrolled) or -1 (unenrolled)
and timestamp is in ISO format, with resolution to the millisecond.
or None if there is no valid enrollment event on the line.
Example:
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05, 1)
"""
# Before parsing, check that the line contains something that
# suggests it's an enrollment event.
if 'edx.course.enrollment' not in line:
return None
# try to parse the line into a dict:
item = eventlog.parse_eventlog_item(line)
if item is None:
# The line didn't parse. For this specific purpose,
# we can assume that all enrollment-related lines would parse,
# and these non-parsing lines would get skipped anyway.
return None
# get event type, and check that it exists:
event_type = item.get('event_type')
if event_type is None:
eventlog.log_item("encountered event with no event_type", item)
return None
# convert the type to a value:
if event_type == 'edx.course.enrollment.activated':
action_value = 1
elif event_type == 'edx.course.enrollment.deactivated':
action_value = -1
else:
# not an enrollment event...
return None
# get the timestamp:
datetime = eventlog.get_event_time(item)
if datetime is None:
eventlog.log_item("encountered event with bad datetime", item)
return None
timestamp = eventlog.datetime_to_timestamp(datetime)
# Use the `user_id` from the event `data` field, since the
# `user_id` in the `context` field is the user who made the
# request but not necessarily the one who got enrolled. (The
# `course_id` should be the same in `context` as in `data`.)
# Get the event data:
event_data = eventlog.get_event_data(item)
if event_data is None:
# Assume it's already logged (and with more specifics).
return None
# Get the course_id from the data, and validate.
course_id = event_data['course_id']
if not eventlog.is_valid_course_id(course_id):
eventlog.log_item("encountered explicit enrollment event with bogus course_id", item)
return None
# Get the user_id from the data:
user_id = event_data.get('user_id')
if user_id is None:
eventlog.log_item("encountered explicit enrollment event with no user_id", item)
return None
# For now, ignore the enrollment 'mode' (e.g. 'honor').
return (course_id, user_id), (timestamp, action_value)
################################ ################################
# Running tasks # Running tasks
...@@ -426,6 +404,7 @@ class FirstCourseEnrollmentChangesPerDay(BaseCourseEnrollmentChangesPerDay): ...@@ -426,6 +404,7 @@ class FirstCourseEnrollmentChangesPerDay(BaseCourseEnrollmentChangesPerDay):
def main(): def main():
"""Mainline for command-line testing."""
luigi.run() luigi.run()
......
...@@ -17,15 +17,7 @@ import luigi.s3 ...@@ -17,15 +17,7 @@ import luigi.s3
import luigi.hdfs import luigi.hdfs
import luigi.format import luigi.format
def get_s3_bucket_key_names(url): from s3_util import join_as_s3_url, generate_s3_sources
"""Extract bucket_name and root from S3 URL."""
parts = urlparse(url)
return (parts.netloc.strip('/'), parts.path.strip('/'))
def join_as_s3_url(bucket, root, path):
"""Construct a URL for accessing S3, given its components."""
return 's3://{bucket}/{root}/{path}'.format(bucket=bucket, root=root, path=path)
class LocalPathTask(luigi.ExternalTask): class LocalPathTask(luigi.ExternalTask):
...@@ -68,15 +60,22 @@ class PathSetTask(luigi.Task): ...@@ -68,15 +60,22 @@ class PathSetTask(luigi.Task):
""" """
src = luigi.Parameter() src = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',)) include = luigi.Parameter(is_list=True, default=('*',))
# TODO: modify this to get default values from a configuration file,
# and use that to determine whether running in a cluster or locally.
# It will be decoupled from the use of S3PathTask/HDFSPathTask.
# Instead, these will be distinguished by different protocol names.
run_locally = luigi.BooleanParameter() run_locally = luigi.BooleanParameter()
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(PathSetTask, self).__init__(*args, **kwargs) super(PathSetTask, self).__init__(*args, **kwargs)
self.s3 = None self.s3_conn = None
def requires(self): def requires(self):
if self.src.startswith('s3'): if self.src.startswith('s3'):
for bucket, root, path in self._generate_sources(): # connect lazily as needed:
if self.s3_conn is None:
self.s3_conn = boto.connect_s3()
for bucket, root, path in generate_s3_sources(self.s3_conn, self.src, self.include):
source = join_as_s3_url(bucket, root, path) source = join_as_s3_url(bucket, root, path)
if self.run_locally: if self.run_locally:
yield luigi.s3.S3PathTask(source) yield luigi.s3.S3PathTask(source)
...@@ -100,27 +99,6 @@ class PathSetTask(luigi.Task): ...@@ -100,27 +99,6 @@ class PathSetTask(luigi.Task):
def output(self): def output(self):
return [task.output() for task in self.requires()] return [task.output() for task in self.requires()]
def _generate_sources(self):
bucket_name, root = get_s3_bucket_key_names(self.src)
# connect lazily, only if necessary:
if self.s3 is None:
self.s3 = boto.connect_s3()
bucket = self.s3.get_bucket(bucket_name)
keys = (s.key for s in bucket.list(root) if s.size > 0)
# remove root
paths = (k.lstrip(root).strip('/') for k in keys)
paths = self._filter_matches(paths)
return ((bucket.name, root, path) for path in paths)
def _filter_matches(self, names):
patterns = self.include
fn = lambda n: any(fnmatch(n, p) for p in patterns)
return (n for n in names if fn(n))
def get_target_for_url(dest, output_name, run_locally=False): def get_target_for_url(dest, output_name, run_locally=False):
""" """
......
import os.path import os.path
from fnmatch import fnmatch
from urlparse import urlparse
import boto import boto
import luigi import luigi
import luigi.s3 import luigi.s3
from s3_util import join_as_s3_url, get_s3_bucket_key_names, generate_s3_sources, get_s3_key
class S3Copy(luigi.Task): class S3Copy(luigi.Task):
""" """
...@@ -47,8 +46,8 @@ class S3Copy(luigi.Task): ...@@ -47,8 +46,8 @@ class S3Copy(luigi.Task):
if not dst.exists(): if not dst.exists():
return False return False
src_key = self._get_s3_key(src.path) src_key = get_s3_key(self.s3, src.path)
dst_key = self._get_s3_key(dst.path) dst_key = get_s3_key(self.s3, dst.path)
if dst_key.size != src_key.size: if dst_key.size != src_key.size:
return False return False
...@@ -63,19 +62,13 @@ class S3Copy(luigi.Task): ...@@ -63,19 +62,13 @@ class S3Copy(luigi.Task):
src_url = self.input().path src_url = self.input().path
dst_url = self.output().path dst_url = self.output().path
src_key = self._get_s3_key(src_url) src_key = get_s3_key(self.s3, src_url)
dst_bucket_name, dst_key_name = get_s3_bucket_key_names(dst_url) dst_bucket_name, dst_key_name = get_s3_bucket_key_names(dst_url)
# The copy overwrittes the destination. The task checks if # The copy overwrites the destination. The task checks if
# that is necessary during the `complete()` call. # that is necessary during the `complete()` call.
src_key.copy(dst_bucket_name, dst_key_name) src_key.copy(dst_bucket_name, dst_key_name)
def _get_s3_key(self, url):
bucket_name, key_name = get_s3_bucket_key_names(url)
bucket = self.s3.get_bucket(bucket_name)
key = bucket.get_key(key_name)
return key
class S3Sync(luigi.Task): class S3Sync(luigi.Task):
""" """
...@@ -112,7 +105,7 @@ class S3Sync(luigi.Task): ...@@ -112,7 +105,7 @@ class S3Sync(luigi.Task):
return [boto] return [boto]
def requires(self): def requires(self):
for bucket, root, path in self._generate_sources(): for bucket, root, path in generate_s3_sources(self.s3, self.source, self.include):
source = join_as_s3_url(bucket, root, path) source = join_as_s3_url(bucket, root, path)
destination = os.path.join(self.destination, path) destination = os.path.join(self.destination, path)
yield S3Copy(source, destination) yield S3Copy(source, destination)
...@@ -121,34 +114,3 @@ class S3Sync(luigi.Task): ...@@ -121,34 +114,3 @@ class S3Sync(luigi.Task):
for task in self.requires(): for task in self.requires():
yield task.output() yield task.output()
def _generate_sources(self):
bucket_name, root = get_s3_bucket_key_names(self.source)
bucket = self.s3.get_bucket(bucket_name)
keys = (s.key for s in bucket.list(root) if s.size > 0)
# Make paths relative by removing root
paths = (k.lstrip(root).strip('/') for k in keys)
# Filter only paths that match the include patterns
paths = self._filter_matches(paths)
return ((bucket.name, root, path) for path in paths)
def _filter_matches(self, names):
patterns = self.include
# Return only key names that match any of the include patterns
fn = lambda n: any(fnmatch(n, p) for p in patterns)
return (n for n in names if fn(n))
def get_s3_bucket_key_names(url):
"""Extract the bucket and key names from a S3 URL"""
parts = urlparse(url)
return (parts.netloc.strip('/'), parts.path.strip('/'))
def join_as_s3_url(bucket, root, path):
"""Combine bucket name, root path and relative path into a S3 URL"""
return 's3://{0}/{1}/{2}'.format(bucket, root, path)
"""
Utility methods for interacting with S3 via boto.
"""
from fnmatch import fnmatch
from urlparse import urlparse
import boto
def get_s3_bucket_key_names(url):
"""Extract the bucket and key names from a S3 URL"""
parts = urlparse(url)
return (parts.netloc.strip('/'), parts.path.strip('/'))
def join_as_s3_url(bucket, root, path):
"""Combine bucket name, root path and relative path into a S3 URL"""
return 's3://{0}/{1}/{2}'.format(bucket, root, path)
def get_s3_key(s3_conn, url):
"""Returns an S3 key for use in further boto actions."""
bucket_name, key_name = get_s3_bucket_key_names(url)
bucket = s3_conn.get_bucket(bucket_name)
key = bucket.get_key(key_name)
return key
def generate_s3_sources(s3_conn, source, patterns):
"""
Returns a list of S3 sources that match filters.
Args:
s3_conn: a boto connection to S3.
source: a url to S3.
patterns: a list of strings, each of which defines a pattern to match.
Yields:
(bucket, root, path) tuples for each matching file on S3.
where `bucket` and `root` are derived from the source url,
and `path` is a matching path relative to the `source`.
Does not include zero-length files.
"""
bucket_name, root = get_s3_bucket_key_names(source)
bucket = s3_conn.get_bucket(bucket_name)
# Skip keys that have zero size. This allows directories
# to be skipped, but also skips legitimate files that are
# also zero-length.
keys = (s.key for s in bucket.list(root) if s.size > 0)
# Make paths relative by removing root
paths = (k[len(root):].lstrip('/') for k in keys)
# Filter only paths that match the include patterns
paths = _filter_matches(patterns, paths)
return ((bucket.name, root, path) for path in paths)
def _filter_matches(patterns, names):
"""Return only key names that match any of the include patterns."""
fn = lambda n: any(fnmatch(n, p) for p in patterns)
return (n for n in names if fn(n))
...@@ -3,24 +3,111 @@ Tests for tasks that collect enrollment events. ...@@ -3,24 +3,111 @@ Tests for tasks that collect enrollment events.
""" """
import unittest import unittest
import json
from edx.analytics.tasks.course_enroll import ( from edx.analytics.tasks.course_enroll import (
BaseCourseEnrollmentEventsPerDay, CourseEnrollmentEventsPerDayMixin,
BaseCourseEnrollmentChangesPerDay, CourseEnrollmentChangesPerDayMixin,
BaseCourseEnrollmentTotalsPerDay,
) )
from datetime import datetime
class CourseEnrollEventMapTest(unittest.TestCase):
    """
    Tests to verify that event log parsing by mapper works correctly.

    Each test feeds a single event-log line to the mapper and checks
    the (key, value) pairs it yields.
    """
    def setUp(self):
        # Mixin under test, plus fixture values reused when building events.
        self.task = CourseEnrollmentEventsPerDayMixin()
        self.course_id = "MITx/8.02x/2013_Spring"
        self.user_id = 21
        self.timestamp = "2013-12-17T15:38:32"
    def _create_event_log_line(self, **kwargs):
        """Create an event log with test values, as a JSON string."""
        return json.dumps(self._create_event_dict(**kwargs))
    def _create_event_dict(self, **kwargs):
        """Create an event log with test values, as a dict.

        Keyword arguments override the corresponding default entries.
        """
        # Define default values for event log entry.
        org_id = self.course_id.split('/')[0]
        event_dict = {
            "username": "test_user",
            "host": "test_host",
            "event_source": "server",
            "event_type": "edx.course.enrollment.activated",
            "context": {
                "course_id": self.course_id,
                "org_id": org_id,
                "user_id": self.user_id,
            },
            # Fractional seconds and UTC offset are appended so the
            # mapper's truncation to whole seconds is exercised.
            "time": "{}.805444+00:00".format(self.timestamp),
            "ip": "127.0.0.1",
            "event": {
                "course_id": self.course_id,
                "user_id": self.user_id,
                "mode": "honor",
            },
            "agent": "blah, blah, blah",
            "page": None
        }
        event_dict.update(**kwargs)
        return event_dict
    def assert_no_output_for(self, line):
        """Assert that the mapper yields nothing for the given line."""
        self.assertEquals(list(self.task.mapper(line)), [])
    def test_non_enrollment_event(self):
        """Lines without the enrollment marker string are skipped outright."""
        line = 'this is garbage'
        self.assert_no_output_for(line)
    def test_unparseable_enrollment_event(self):
        """Lines mentioning enrollment but not parseable as JSON yield nothing."""
        line = 'this is garbage but contains edx.course.enrollment'
        self.assert_no_output_for(line)
    def test_missing_event_type(self):
        """An event without an event_type field is dropped."""
        event_dict = self._create_event_dict()
        del event_dict['event_type']
        line = json.dumps(event_dict)
        self.assert_no_output_for(line)
    def test_nonenroll_event_type(self):
        """Enrollment-namespaced but unrecognized event types are ignored."""
        line = self._create_event_log_line(event_type='edx.course.enrollment.unknown')
        self.assert_no_output_for(line)
    def test_bad_datetime(self):
        """An unparseable 'time' value causes the event to be dropped."""
        line = self._create_event_log_line(time='this is a bogus time')
        self.assert_no_output_for(line)
    def test_bad_event_data(self):
        """Event payloads that are not dicts are rejected."""
        line = self._create_event_log_line(event=["not an event"])
        self.assert_no_output_for(line)
    def test_illegal_course_id(self):
        """Course ids containing disallowed characters are rejected."""
        line = self._create_event_log_line(event={"course_id": ";;;;bad/id/val", "user_id": self.user_id})
        self.assert_no_output_for(line)
    def test_missing_user_id(self):
        """An event payload without a user_id yields nothing."""
        line = self._create_event_log_line(event={"course_id": self.course_id})
        self.assert_no_output_for(line)
    def test_good_enroll_event(self):
        """A valid activation event maps to ((course, user), (timestamp, 1))."""
        line = self._create_event_log_line()
        event = list(self.task.mapper(line))
        expected = [((self.course_id, self.user_id), (self.timestamp, 1))]
        self.assertEquals(event, expected)
    def test_good_unenroll_event(self):
        """A valid deactivation event maps to ((course, user), (timestamp, -1))."""
        line = self._create_event_log_line(event_type='edx.course.enrollment.deactivated')
        event = list(self.task.mapper(line))
        expected = [((self.course_id, self.user_id), (self.timestamp, -1))]
        self.assertEquals(event, expected)
class CourseEnrollEventReduceTest(unittest.TestCase): class CourseEnrollEventReduceTest(unittest.TestCase):
""" """
Tests to verify that event log parsing works correctly. Tests to verify that events-per-day-per-user reducer works correctly.
""" """
def setUp(self): def setUp(self):
self.task = BaseCourseEnrollmentEventsPerDay() self.task = CourseEnrollmentEventsPerDayMixin()
self.key = ('course', 'user') self.key = ('course', 'user')
def _get_reducer_output(self, values): def _get_reducer_output(self, values):
...@@ -71,7 +158,6 @@ class CourseEnrollEventReduceTest(unittest.TestCase): ...@@ -71,7 +158,6 @@ class CourseEnrollEventReduceTest(unittest.TestCase):
(('course', '2013-01-01'), 1), (('course', '2013-01-01'), 1),
]) ])
def test_multiple_events_out_of_order(self): def test_multiple_events_out_of_order(self):
# Make sure that events are sorted by the reducer. # Make sure that events are sorted by the reducer.
self.assertEquals(self._get_reducer_output( self.assertEquals(self._get_reducer_output(
...@@ -96,7 +182,6 @@ class CourseEnrollEventReduceTest(unittest.TestCase): ...@@ -96,7 +182,6 @@ class CourseEnrollEventReduceTest(unittest.TestCase):
(('course', '2013-01-01'), 1), (('course', '2013-01-01'), 1),
]) ])
def test_multiple_unenroll_events_on_same_day(self): def test_multiple_unenroll_events_on_same_day(self):
self.assertEquals(self._get_reducer_output( self.assertEquals(self._get_reducer_output(
[ [
...@@ -122,7 +207,6 @@ class CourseEnrollEventReduceTest(unittest.TestCase): ...@@ -122,7 +207,6 @@ class CourseEnrollEventReduceTest(unittest.TestCase):
(('course', '2013-01-01'), 1), (('course', '2013-01-01'), 1),
]) ])
def test_multiple_events_on_many_days(self): def test_multiple_events_on_many_days(self):
# Run with an arbitrary list of events. # Run with an arbitrary list of events.
self.assertEquals(self._get_reducer_output( self.assertEquals(self._get_reducer_output(
...@@ -153,10 +237,10 @@ class CourseEnrollEventReduceTest(unittest.TestCase): ...@@ -153,10 +237,10 @@ class CourseEnrollEventReduceTest(unittest.TestCase):
class CourseEnrollChangesReduceTest(unittest.TestCase): class CourseEnrollChangesReduceTest(unittest.TestCase):
""" """
Verify that BaseCourseEnrollmentChangesPerDay.reduce() works correctly. Verify that CourseEnrollmentChangesPerDayMixin.reduce() works correctly.
""" """
def setUp(self): def setUp(self):
self.task = BaseCourseEnrollmentChangesPerDay() self.task = CourseEnrollmentChangesPerDayMixin()
self.key = ('course', '2013-01-01') self.key = ('course', '2013-01-01')
def _get_reducer_output(self, values): def _get_reducer_output(self, values):
...@@ -172,44 +256,3 @@ class CourseEnrollChangesReduceTest(unittest.TestCase): ...@@ -172,44 +256,3 @@ class CourseEnrollChangesReduceTest(unittest.TestCase):
def test_multiple_user_count(self): def test_multiple_user_count(self):
inputs = [1, 1, 1, -1, 1] inputs = [1, 1, 1, -1, 1]
self.assertEquals(self._get_reducer_output(inputs), [(self.key, 3)]) self.assertEquals(self._get_reducer_output(inputs), [(self.key, 3)])
class CourseEnrollTotalsReduceTest(unittest.TestCase):
    """
    Verify that BaseCourseEnrollmentTotalsPerDay.reduce() works correctly.
    """
    def setUp(self):
        # Task under test, plus the course key shared by all tests.
        self.task = BaseCourseEnrollmentTotalsPerDay()
        self.key = 'course'
    def _get_reducer_output(self, values):
        """Run the reducer on `values` with the hardcoded course key."""
        return list(self.task.reducer(self.key, values))
    def test_no_user_counts(self):
        """No input counts produce no output."""
        self.assertEquals(self._get_reducer_output([]), [])
    def test_single_user_count(self):
        """A single (date, count) pair comes through with the course key prepended."""
        self.assertEquals(self._get_reducer_output(
            [
                ('2013-01-01', 5),
            ]),
            [
                (self.key, '2013-01-01', 5),
            ])
    def test_multiple_user_count(self):
        """Daily counts are accumulated into a running total per date."""
        self.assertEquals(self._get_reducer_output(
            [
                ('2013-01-01', 5),
                ('2013-01-02', 8),
                ('2013-01-03', 4),
                ('2013-01-04', 9),
            ]),
            [
                (self.key, '2013-01-01', 5),
                (self.key, '2013-01-02', 13),
                (self.key, '2013-01-03', 17),
                (self.key, '2013-01-04', 26),
            ])
...@@ -12,6 +12,7 @@ PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$') ...@@ -12,6 +12,7 @@ PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$')
ALLOWED_ID_CHARS = r'[a-zA-Z0-9_\-~.:]' ALLOWED_ID_CHARS = r'[a-zA-Z0-9_\-~.:]'
PATTERN_COURSEID = re.compile(r'^' + ALLOWED_ID_CHARS + r'+$') PATTERN_COURSEID = re.compile(r'^' + ALLOWED_ID_CHARS + r'+$')
def is_valid_course_id(course_id): def is_valid_course_id(course_id):
""" """
Determines if a course_id from an event log is possibly legitimate. Determines if a course_id from an event log is possibly legitimate.
...@@ -24,6 +25,7 @@ def is_valid_course_id(course_id): ...@@ -24,6 +25,7 @@ def is_valid_course_id(course_id):
Note this will need to be updated as split-mongo changes are rolled out Note this will need to be updated as split-mongo changes are rolled out
that permit a broader set of id values. that permit a broader set of id values.
""" """
# TODO: [split-mongo] verify after course_id name changes.
components = course_id.split('/') components = course_id.split('/')
if len(components) != 3: if len(components) != 3:
return False return False
...@@ -32,6 +34,7 @@ def is_valid_course_id(course_id): ...@@ -32,6 +34,7 @@ def is_valid_course_id(course_id):
def json_decode(line): def json_decode(line):
"""Wrapper to decode JSON string in an implementation-independent way.""" """Wrapper to decode JSON string in an implementation-independent way."""
# TODO: Verify correctness of cjson
return cjson.decode(line) return cjson.decode(line)
...@@ -48,25 +51,26 @@ def parse_eventlog_item(line, nested=False): ...@@ -48,25 +51,26 @@ def parse_eventlog_item(line, nested=False):
""" """
try: try:
parsed = json_decode(line) parsed = json_decode(line)
except: except Exception:
if not nested: if not nested:
json_match = PATTERN_JSON.match(line) json_match = PATTERN_JSON.match(line)
if json_match: if json_match:
return parse_eventlog_item(json_match.group(1), nested=True) return parse_eventlog_item(json_match.group(1), nested=True)
# Seem to be truncated in input data at 10000 for some log files, 2043 for others... # TODO: There are too many to be logged. It might be useful
# First filter out common ones: # at some point to collect stats on the length of truncation
# if 'save_problem_check' not in line: # and the counts for different event "names" (normalized
# sys.stderr.write("ERROR: encountered event with bad json: length = {len} start={start}\n".format(len=len(line), start=line[:40])) # event_type values).
# Even that leaves too many to log.
# TODO: Might be good going forward to collect stats on the length of truncation and the counts for # Note that empirically some seem to be truncated in input
# different event "names" (normalized event_type values). # data at 10000 characters, 2043 for others...
return None return None
return parsed return parsed
def log_item(msg, item, level='ERROR'): def log_item(msg, item, level='ERROR'):
"""Writes a message about an eventlog item.""" """Writes a message about an eventlog item."""
# TODO: replace this with real logging.
sys.stderr.write("{level}: {msg}: {item}\n".format(msg=msg, item=item, level=level)) sys.stderr.write("{level}: {msg}: {item}\n".format(msg=msg, item=item, level=level))
...@@ -75,28 +79,28 @@ def log_item(msg, item, level='ERROR'): ...@@ -75,28 +79,28 @@ def log_item(msg, item, level='ERROR'):
# * timestamp: a string, with date and time (to second), in ISO format. # * timestamp: a string, with date and time (to second), in ISO format.
# * datestamp: a string with only date information, in ISO format. # * datestamp: a string with only date information, in ISO format.
def get_timestamp(datetime): def datetime_to_timestamp(datetime_obj):
"""Returns a string with the datetime value of the provided datetime object.""" """Returns a string with the datetime value of the provided datetime object."""
return datetime.strftime('%Y-%m-%dT%H:%M:%S') return datetime_obj.strftime('%Y-%m-%dT%H:%M:%S')
def get_datestamp(datetime): def datetime_to_datestamp(datetime_obj):
"""Returns a string with the date value of the provided datetime object.""" """Returns a string with the date value of the provided datetime object."""
return datetime.strftime('%Y-%m-%d') return datetime_obj.strftime('%Y-%m-%d')
def get_datestamp_from_timestamp(timestamp): def timestamp_to_datestamp(timestamp):
"""Returns a string with the date value of the provided ISO datetime string.""" """Returns a string with the date value of the provided ISO datetime string."""
return timestamp.split('T')[0] return timestamp.split('T')[0]
def get_datetime(item): def get_event_time(item):
"""Returns a datetime object from an event item, if present.""" """Returns a datetime object from an event item, if present."""
try: try:
timestamp = item['time'] timestamp = item['time']
removed_ms = timestamp.split('.')[0] removed_ms = timestamp.split('.')[0]
return datetime.datetime.strptime(removed_ms, '%Y-%m-%dT%H:%M:%S') return datetime.datetime.strptime(removed_ms, '%Y-%m-%dT%H:%M:%S')
except: except Exception:
return None return None
...@@ -116,7 +120,7 @@ def get_event_data(item): ...@@ -116,7 +120,7 @@ def get_event_data(item):
# If the value is a string, try to parse as JSON into a dict. # If the value is a string, try to parse as JSON into a dict.
try: try:
event_value = json_decode(event_value) event_value = json_decode(event_value)
except: except Exception:
log_item("encountered event with unparsable event value", item) log_item("encountered event with unparsable event value", item)
return None return None
...@@ -126,4 +130,3 @@ def get_event_data(item): ...@@ -126,4 +130,3 @@ def get_event_data(item):
else: else:
log_item("encountered event data with unrecognized type", item) log_item("encountered event data with unrecognized type", item)
return None return None
...@@ -7,9 +7,28 @@ import unittest ...@@ -7,9 +7,28 @@ import unittest
import edx.analytics.util.eventlog as eventlog import edx.analytics.util.eventlog as eventlog
class EventLogTest(unittest.TestCase):
class CourseIdTest(unittest.TestCase):
""" """
Tests to verify that event log parsing works correctly. Verify that course_id filtering works correctly.
"""
def test_normal_course_id(self):
course_id = "org/course_id/course_run"
self.assertTrue(eventlog.is_valid_course_id(course_id))
def test_course_id_without_components(self):
course_id = "org:course_id:course_run"
self.assertFalse(eventlog.is_valid_course_id(course_id))
def test_course_id_with_nonascii(self):
course_id = u"org/course\ufffd_id/course_run"
self.assertFalse(eventlog.is_valid_course_id(course_id))
class ParseEventLogTest(unittest.TestCase):
"""
Verify that event log parsing works correctly.
""" """
def test_parse_valid_eventlog_item(self): def test_parse_valid_eventlog_item(self):
...@@ -34,3 +53,43 @@ class EventLogTest(unittest.TestCase): ...@@ -34,3 +53,43 @@ class EventLogTest(unittest.TestCase):
self.assertEquals(result['username'], u'b\ufffdb') self.assertEquals(result['username'], u'b\ufffdb')
class TimestampTest(unittest.TestCase):
    """Verify timestamp-related functions."""
    def test_datestamp_from_timestamp(self):
        """The datestamp is the date portion of an ISO timestamp."""
        timestamp = "2013-12-17T15:38:32"
        self.assertEquals(eventlog.timestamp_to_datestamp(timestamp), "2013-12-17")
    def test_missing_datetime(self):
        """Items without a 'time' field produce None."""
        item = {"something else": "not an event"}
        self.assertIsNone(eventlog.get_event_time(item))
    def test_good_datetime(self):
        """A full ISO time with microseconds and offset is truncated to whole seconds."""
        item = {"time": "2013-12-17T15:38:32.805444+00:00"}
        dt_value = eventlog.get_event_time(item)
        self.assertEquals(eventlog.datetime_to_timestamp(dt_value), "2013-12-17T15:38:32")
        self.assertEquals(eventlog.datetime_to_datestamp(dt_value), "2013-12-17")
class GetEventDataTest(unittest.TestCase):
    """Verify that get_event_data works as expected."""
    def test_missing_event_data(self):
        """Items without an 'event' field produce None."""
        item = {"something else": "not an event"}
        self.assertIsNone(eventlog.get_event_data(item))
    def test_get_bad_string_event_data(self):
        """A string payload that is not valid JSON produces None."""
        item = {"event": "a string but not JSON"}
        self.assertIsNone(eventlog.get_event_data(item))
    def test_get_json_string_event_data(self):
        """A JSON-encoded string payload is decoded into a dict."""
        item = {"event": '{ "a string": "that is JSON"}'}
        self.assertEquals(eventlog.get_event_data(item), {"a string": "that is JSON"})
    def test_event_data_with_unknown_type(self):
        """Payloads that are neither strings nor dicts produce None."""
        item = {"event": ["a list", "of strings"]}
        self.assertIsNone(eventlog.get_event_data(item))
    def test_get_dict_event_data(self):
        """A dict payload is returned unchanged."""
        item = {"event": {"a dict": "that has strings"}}
        self.assertEquals(eventlog.get_event_data(item), {"a dict": "that has strings"})
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment