Commit 770e7c30 by brianhw

Merge pull request #3 from edx/brian/course_enroll

Add first pass at course enrollment calculation.
parents 80b4b413 7aa25025
# .coveragerc for analytics-tasks
[run]
data_file = .coverage
source = edx/analytics
[report]
ignore_errors = True
[html]
title = Analytics-Tasks Python Test Coverage Report
directory = report
[xml]
output = coverage.xml
[pep8]
ignore=E501
max_line_length=119
@@ -11,3 +11,21 @@ requirements:
test-requirements: requirements
pip install -r requirements/test.txt
test: test-requirements
# TODO: when we have better coverage, modify this to actually fail when coverage is too low.
rm -rf .coverage
python -m coverage run --rcfile=./.coveragerc `which nosetests`
coverage: test
coverage html
coverage xml -o coverage.xml
diff-cover coverage.xml --html-report diff_cover.html
# Compute quality
diff-quality --violations=pep8 --html-report diff_quality_pep8.html
diff-quality --violations=pylint --html-report diff_quality_pylint.html
# Compute style violations
pep8 > pep8.report || echo "Not pep8 clean"
pylint -f parseable edx > pylint.report || echo "Not pylint clean"
[core]
logging_conf_file=logging.cfg
[event-logs]
source = s3://edx-all-tracking-logs
destination = s3://edx-analytics-events/raw/by-server
include = prod-edx-*/tracking.log-*.gz
prod-edxapp-*/tracking.log-*.gz
"""
Luigi tasks for extracting course enrollment statistics from tracking log files.
"""
import luigi
import luigi.hadoop
import luigi.s3
import luigi.hdfs
import edx.analytics.util.eventlog as eventlog
from edx.analytics.tasks.pathutil import get_target_for_url, PathSetTask
import logging
logger = logging.getLogger(__name__)
UNENROLLED = -1
ENROLLED = 1
################################
# Task Map-Reduce definitions
################################
class CourseEnrollmentEventsPerDayMixin(object):
"""Calculates daily change in enrollment for a user in a course, given raw event log input."""
def mapper(self, line):
"""
Generates output values for explicit enrollment events.
Args:
line: text line from a tracking event log.
Yields:
(course_id, user_id), (timestamp, action_value)
where `timestamp` is in ISO format, with resolution to the microsecond,
and `action_value` = 1 (enrolled) or -1 (unenrolled).
Example:
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05.123456, 1)
"""
parsed_tuple_or_none = get_explicit_enrollment_output(line)
if parsed_tuple_or_none is not None:
yield parsed_tuple_or_none
def reducer(self, key, values):
"""
Calculates each user's enrollment status at the end of each day on which that status changed.
Args:
key: (course_id, user_id) tuple
values: iterator of (timestamp, action_value) tuples
Yields:
(course_id, datestamp), enrollment_change
where `datestamp` is in ISO format, with resolution to the day
and `enrollment_change` is the change on that date for an individual user.
Produced values are -1 or 1.
No output is yielded if a user enrolls and then unenrolls (or unenrolls and
then enrolls) on a given day. Only days whose end-of-day status differs
from the previous day's status produce output.
Note that we don't bother to output the user_id, since it's not
needed downstream (though it might sometimes be useful for debugging).
Also note that this could be greatly simplified if we had confidence
that there were no duplicate enrollment events. The mapper could then
emit (course_id, user_id, datestamp) as the key, with just the action
as the value, and the reducer would simply sum the actions for each
date and yield the days with non-zero sums. (See the sketch following
this method.)
Example output:
(edX/DemoX/Demo_Course, 2013-09-10), 1
(edX/DemoX/Demo_Course, 2013-09-12), -1
"""
logger.debug("Found key in reducer: %s", key)
course_id, user_id = key
# Sort input values (by timestamp) to easily detect the end of a day.
# Note that this assumes the timestamp values (strings) are in ISO
# representation, so that the tuples will be ordered in ascending time value.
sorted_values = sorted(values)
if not sorted_values:
return
# Convert timestamps to dates, so we can group them by day.
func = eventlog.timestamp_to_datestamp
values = [(func(timestamp), value) for timestamp, value in sorted_values]
# Add a stop item to ensure we process the last entry.
values = values + [(None, None)]
# Remove the first action and use it to initialize the state machine.
first_date, first_enrollment_status = values.pop(0)
last_reported_enrollment_status = UNENROLLED if first_enrollment_status == ENROLLED else ENROLLED
enrollment_status = first_enrollment_status
prev_date = first_date
for (this_date, new_enrollment_status) in values:
# Before processing a new date, report the enrollment status if it
# has changed since the last reported status.
if this_date != prev_date and enrollment_status != last_reported_enrollment_status:
logger.debug("outputting date and value: %s %s", prev_date, enrollment_status)
last_reported_enrollment_status = enrollment_status
yield (course_id, prev_date), enrollment_status
logger.debug("accumulating date and value: %s %s", this_date, new_enrollment_status)
# Consecutive changes of the same kind don't affect the state.
if new_enrollment_status != enrollment_status:
enrollment_status = new_enrollment_status
else:
logger.warning("WARNING: duplicate enrollment event {status} "
"for user_id {user_id} in course {course_id} on {date}".format(
status=new_enrollment_status, user_id=user_id, course_id=course_id, date=this_date))
prev_date = this_date
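# A sketch of the simpler formulation mentioned in the reducer docstring
# above. Illustrative only: it assumes (hypothetically) that enrollment
# events are never duplicated, and the class name is made up.
class SimpleCourseEnrollmentEventsPerDayMixin(object):
    """Hypothetical sketch: per-user daily net enrollment change, assuming no duplicate events."""

    def mapper(self, line):
        parsed = get_explicit_enrollment_output(line)
        if parsed is not None:
            (course_id, user_id), (timestamp, action_value) = parsed
            # Key directly on the date, so no per-day state machine is needed.
            datestamp = eventlog.timestamp_to_datestamp(timestamp)
            yield (course_id, user_id, datestamp), action_value

    def reducer(self, key, values):
        course_id, _user_id, datestamp = key
        # Sum the +1/-1 actions for the date; report only non-zero net changes.
        net_change = sum(int(v) for v in values)
        if net_change != 0:
            yield (course_id, datestamp), net_change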
class CourseEnrollmentChangesPerDayMixin(object):
"""Calculates daily changes in enrollment, given per-user net changes by date."""
def mapper(self, line):
"""
Args: tab-delimited values in a single text line
Yields: (course_id, datestamp), enrollment_change
Example:
(edX/DemoX/Demo_Course, 2013-09-10), 1
(edX/DemoX/Demo_Course, 2013-09-12), -1
"""
course_id, date, enrollment_change = line.split('\t')
yield (course_id, date), enrollment_change
def reducer(self, key, values):
"""
Reducer: sums enrollments for a given course on a particular date.
Args:
key: (course_id, datestamp) tuple
values: iterator of enrollment_changes
Input `enrollment_changes` are the enrollment changes on a day due to a specific user.
Each user with a change has a separate input, either -1 (unenroll) or 1 (enroll).
Yields:
(course_id, datestamp), enrollment_change
Output `enrollment_change` is summed across all users, one output per course.
"""
logger.debug("Found key in second reducer: %s", key)
count = sum(int(v) for v in values)
yield key, count
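# Illustration (these input lines are made up): given mapper input
#   "edX/DemoX/Demo_Course\t2013-09-10\t1"
#   "edX/DemoX/Demo_Course\t2013-09-10\t1"
#   "edX/DemoX/Demo_Course\t2013-09-10\t-1"
# the reducer is called with key ('edX/DemoX/Demo_Course', '2013-09-10')
# and values ['1', '1', '-1'], and yields that key with a net change of 1.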
##################################
# Task requires/output definitions
##################################
class BaseCourseEnrollmentTask(luigi.hadoop.JobTask):
"""
Base class for course enrollment calculations.
Parameters:
name: a unique identifier to distinguish one run from another. It is used in
the construction of output filenames, so each run will have distinct outputs.
src: a URL to the root location of input tracking log files.
dest: a URL to the root location to write output file(s).
include: a list of patterns to be used to match input files, relative to `src` URL.
The default value is ['*'].
run_locally: a boolean flag to indicate that the task should be run locally rather than
on a hadoop cluster. This is used only to change the interpretation of S3 URLs in src and/or dest.
"""
name = luigi.Parameter()
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
run_locally = luigi.BooleanParameter()
def extra_modules(self):
# The following are needed for (almost) every course enrollment task.
# Boto is used for S3 access, cjson for parsing log files, and util
# is used for parsing events and date conversion.
import boto
import cjson
import edx.analytics.util
return [boto, edx.analytics.util, cjson]
class CourseEnrollmentEventsPerDay(CourseEnrollmentEventsPerDayMixin, BaseCourseEnrollmentTask):
"""Calculates daily change in enrollment for a user in a course, given raw event log input."""
def requires(self):
return PathSetTask(self.src, self.include, self.run_locally)
def output(self):
output_name = 'course_enrollment_events_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
class CourseEnrollmentChangesPerDay(CourseEnrollmentChangesPerDayMixin, BaseCourseEnrollmentTask):
"""Calculates daily changes in enrollment, given per-user net changes by date."""
def requires(self):
return CourseEnrollmentEventsPerDay(self.name, self.src, self.dest, self.include, self.run_locally)
def output(self):
output_name = 'course_enrollment_changes_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
################################
# Helper methods
################################
def get_explicit_enrollment_output(line):
"""
Generates output values for explicit enrollment events.
Args:
line: text line from a tracking event log.
Returns:
(course_id, user_id), (timestamp, action_value)
where `action_value` = 1 (enrolled) or -1 (unenrolled)
and `timestamp` is in ISO format, with resolution to the microsecond;
or None if there is no valid enrollment event on the line.
Example:
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05.123456, 1)
"""
# Before parsing, check that the line contains something that
# suggests it's an enrollment event.
if 'edx.course.enrollment' not in line:
return None
# try to parse the line into a dict:
event = eventlog.parse_json_event(line)
if event is None:
# The line didn't parse. For this specific purpose,
# we can assume that all enrollment-related lines would parse,
# and these non-parsing lines would get skipped anyway.
return None
# get event type, and check that it exists:
event_type = event.get('event_type')
if event_type is None:
logger.error("encountered event with no event_type: %s", event)
return None
# convert the type to a value:
if event_type == 'edx.course.enrollment.activated':
action_value = ENROLLED
elif event_type == 'edx.course.enrollment.deactivated':
action_value = UNENROLLED
else:
# not an enrollment event...
return None
# get the timestamp:
datetime = eventlog.get_event_time(event)
if datetime is None:
logger.error("encountered event with bad datetime: %s", event)
return None
timestamp = eventlog.datetime_to_timestamp(datetime)
# Use the `user_id` from the event `data` field, since the
# `user_id` in the `context` field is the user who made the
# request but not necessarily the one who got enrolled. (The
# `course_id` should be the same in `context` as in `data`.)
# Get the event data:
event_data = eventlog.get_event_data(event)
if event_data is None:
# Assume it's already logged (and with more specifics).
return None
# Get the course_id from the data, and validate.
course_id = event_data.get('course_id')
if course_id is None or not eventlog.is_valid_course_id(course_id):
logger.error("encountered explicit enrollment event with bogus course_id: %s", event)
return None
# Get the user_id from the data:
user_id = event_data.get('user_id')
if user_id is None:
logger.error("encountered explicit enrollment event with no user_id: %s", event)
return None
# For now, ignore the enrollment 'mode' (e.g. 'honor').
return (course_id, user_id), (timestamp, action_value)
"""
Main method for running tasks.
Invoke a task by running `launch-task` with the task's classname and
arguments for Luigi and for the task. Use `remote-task` to submit
the task to run on an EMR cluster.
Example command lines for various tasks:
* CourseEnrollmentChangesPerDay:
launch-task --local-scheduler CourseEnrollmentChangesPerDay
--name mytest --src input --include 'tracking*' --include '2012*'
--dest output7
remote-task --job-flow-id <job-id> --branch <branch-name> --remote-name run-20140204
--local-scheduler CourseEnrollmentChangesPerDay
--name run-20140204 --src s3://edx-all-tracking-logs --include 'prod-edx*/tracking.*-201312*.gz'
--include 'prod-edx*/tracking.*-2014*.gz' --dest s3://edx-analytics-scratch/output
"""
import os.path
import logging
@@ -26,7 +48,7 @@ def main():
configuration.add_config_path(DEFAULT_CONFIGURATION_FILE)
if not os.path.isfile(DEFAULT_CONFIGURATION_FILE):
-log.warning('Default configuration file not found:', DEFAULT_CONFIGURATION_FILE)
+log.warning('Default configuration file not found: %s', DEFAULT_CONFIGURATION_FILE)
# Tell luigi what dependencies to pass to the Hadoop nodes
# - argparse is not included by default in python 2.6
...
"""
Helper classes to specify file dependencies for input and output.
Supports inputs from S3 and local FS.
Supports outputs to HDFS, S3, and local FS.
"""
import os
import boto
import glob
import luigi
import luigi.s3
import luigi.hdfs
import luigi.format
from edx.analytics.tasks.s3_util import join_as_s3_url, generate_s3_sources
class LocalPathTask(luigi.ExternalTask):
"""
An external task that requires the existence of
a path in a local file system.
Treats files ending with .gz as Gzip files.
"""
path = luigi.Parameter()
def output(self):
if self.path.endswith('.gz'):
yield luigi.LocalTarget(self.path, format=luigi.format.Gzip)
else:
yield luigi.LocalTarget(self.path)
class HdfsPathTask(luigi.ExternalTask):
"""
An external task that requires the existence of
a path in HDFS.
"""
path = luigi.Parameter()
def output(self):
return luigi.hdfs.HdfsTarget(self.path)
class PathSetTask(luigi.Task):
"""
A task to select a subset of files in an S3 bucket or local FS.
Parameters:
src: a URL pointing to a folder in s3:// or local FS.
include: a list of patterns to use to select. Multiple patterns are OR'd.
run_locally: if True, use S3PathTask instead of HdfsPathTask, to permit
reading S3 data when running in local mode.
"""
src = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
# TODO: modify this to get default values from a configuration file,
# and use that to determine whether running in a cluster or locally.
# It will be decoupled from the use of S3PathTask/HdfsPathTask.
# Instead, these will be distinguished by different protocol names.
run_locally = luigi.BooleanParameter()
def __init__(self, *args, **kwargs):
super(PathSetTask, self).__init__(*args, **kwargs)
self.s3_conn = None
def requires(self):
if self.src.startswith('s3'):
# connect lazily as needed:
if self.s3_conn is None:
self.s3_conn = boto.connect_s3()
for bucket, root, path in generate_s3_sources(self.s3_conn, self.src, self.include):
source = join_as_s3_url(bucket, root, path)
if self.run_locally:
yield luigi.s3.S3PathTask(source)
else:
yield HdfsPathTask(source)
else:
filelist = []
for include_val in self.include:
glob_pattern = "{src}/{include}".format(src=self.src, include=include_val)
filelist.extend(glob.glob(glob_pattern))
for filepath in filelist:
yield LocalPathTask(filepath)
def complete(self):
# An optimization: declare the task always complete, by definition,
# because its output is whatever existing files match the filter,
# not a set of files whose existence must be checked or generated again.
return True
def output(self):
return [task.output() for task in self.requires()]
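# Usage sketch (bucket and pattern are hypothetical): select gzipped
# tracking logs from S3 while running locally, so S3PathTask is used:
#
#   task = PathSetTask(src='s3://my-log-bucket/logs',
#                      include=['prod-*/tracking.log-*.gz'],
#                      run_locally=True)
#
# requires() then yields one S3PathTask per matching key; with a local
# `src` it globs the filesystem and yields LocalPathTask objects instead.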
def get_target_for_url(dest, output_name, run_locally=False):
"""
Generate an appropriate target for a given path, depending on protocol.
Parameters:
dest: a URL pointing to a folder in s3:// or hdfs:// or local FS.
output_name: name of file to be output.
run_locally: if True, use S3Target instead of HdfsTarget, to permit
writing S3 data when running in local mode.
"""
output_url = os.path.join(dest, output_name)
if output_url.startswith('s3://'):
if run_locally:
return luigi.s3.S3Target(output_url)
else:
return luigi.hdfs.HdfsTarget(output_url)
elif output_url.startswith('hdfs://'):
return luigi.hdfs.HdfsTarget(output_url)
else:
return luigi.LocalTarget(output_url)
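# Illustration of the URL-to-target mapping above (paths are hypothetical):
#   get_target_for_url('s3://bucket/out', 'report')        -> HdfsTarget
#   get_target_for_url('s3://bucket/out', 'report', True)  -> S3Target
#   get_target_for_url('hdfs://namenode/out', 'report')    -> HdfsTarget
#   get_target_for_url('/tmp/out', 'report')               -> LocalTarget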
import os.path
from fnmatch import fnmatch
from urlparse import urlparse
import boto
import luigi
import luigi.s3
from edx.analytics.tasks.s3_util import join_as_s3_url, get_s3_bucket_key_names, generate_s3_sources, get_s3_key
class S3Copy(luigi.Task):
"""
@@ -47,8 +47,8 @@ class S3Copy(luigi.Task):
if not dst.exists():
return False
-src_key = self._get_s3_key(src.path)
-dst_key = self._get_s3_key(dst.path)
+src_key = get_s3_key(self.s3, src.path)
+dst_key = get_s3_key(self.s3, dst.path)
if dst_key.size != src_key.size:
return False
@@ -63,19 +63,13 @@ class S3Copy(luigi.Task):
src_url = self.input().path
dst_url = self.output().path
-src_key = self._get_s3_key(src_url)
+src_key = get_s3_key(self.s3, src_url)
dst_bucket_name, dst_key_name = get_s3_bucket_key_names(dst_url)
-# The copy overwrittes the destination. The task checks if
+# The copy overwrites the destination. The task checks if
# that is necessary during the `complete()` call.
src_key.copy(dst_bucket_name, dst_key_name)
-def _get_s3_key(self, url):
-bucket_name, key_name = get_s3_bucket_key_names(url)
-bucket = self.s3.get_bucket(bucket_name)
-key = bucket.get_key(key_name)
-return key
class S3Sync(luigi.Task):
"""
@@ -112,7 +106,7 @@ class S3Sync(luigi.Task):
return [boto]
def requires(self):
-for bucket, root, path in self._generate_sources():
+for bucket, root, path in generate_s3_sources(self.s3, self.source, self.include):
source = join_as_s3_url(bucket, root, path)
destination = os.path.join(self.destination, path)
yield S3Copy(source, destination)
@@ -120,35 +114,3 @@ class S3Sync(luigi.Task):
def output(self):
for task in self.requires():
yield task.output()
-def _generate_sources(self):
-bucket_name, root = get_s3_bucket_key_names(self.source)
-bucket = self.s3.get_bucket(bucket_name)
-keys = (s.key for s in bucket.list(root) if s.size > 0)
-# Make paths relative by removing root
-paths = (k.lstrip(root).strip('/') for k in keys)
-# Filter only paths that match the include patterns
-paths = self._filter_matches(paths)
-return ((bucket.name, root, path) for path in paths)
-def _filter_matches(self, names):
-patterns = self.include
-# Return only key names that match any of the include patterns
-fn = lambda n: any(fnmatch(n, p) for p in patterns)
-return (n for n in names if fn(n))
-def get_s3_bucket_key_names(url):
-"""Extract the bucket and key names from a S3 URL"""
-parts = urlparse(url)
-return (parts.netloc.strip('/'), parts.path.strip('/'))
-def join_as_s3_url(bucket, root, path):
-"""Combine bucket name, root path and relative path into a S3 URL"""
-return 's3://{0}/{1}/{2}'.format(bucket, root, path)
"""
Utility methods for interacting with S3 via boto.
"""
from fnmatch import fnmatch
from urlparse import urlparse
def get_s3_bucket_key_names(url):
"""Extract the bucket and key names from a S3 URL"""
parts = urlparse(url)
return (parts.netloc.strip('/'), parts.path.strip('/'))
def join_as_s3_url(bucket, root, path):
"""Combine bucket name, root path and relative path into a S3 URL"""
return 's3://{0}/{1}/{2}'.format(bucket, root, path)
def get_s3_key(s3_conn, url):
"""Returns an S3 key for use in further boto actions."""
bucket_name, key_name = get_s3_bucket_key_names(url)
bucket = s3_conn.get_bucket(bucket_name)
key = bucket.get_key(key_name)
return key
def generate_s3_sources(s3_conn, source, patterns):
"""
Generates the S3 sources that match the given filters.
Args:
s3_conn: a boto connection to S3.
source: a url to S3.
patterns: a list of strings, each of which defines a pattern to match.
Yields:
(bucket, root, path) tuples for each matching file on S3.
where `bucket` and `root` are derived from the source url,
and `path` is a matching path relative to the `source`.
Does not include zero-length files.
"""
bucket_name, root = get_s3_bucket_key_names(source)
bucket = s3_conn.get_bucket(bucket_name)
# Skip keys that have zero size. This skips directories,
# but it also skips legitimate zero-length files.
keys = (s.key for s in bucket.list(root) if s.size > 0)
# Make paths relative by removing root
paths = (k[len(root):].lstrip('/') for k in keys)
# Filter only paths that match the include patterns
paths = _filter_matches(patterns, paths)
return ((bucket.name, root, path) for path in paths)
def _filter_matches(patterns, names):
"""Return only key names that match any of the include patterns."""
func = lambda n: any(fnmatch(n, p) for p in patterns)
return (n for n in names if func(n))
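# Usage sketch (bucket name and pattern are hypothetical):
#
#   import boto
#   s3_conn = boto.connect_s3()
#   for bucket, root, path in generate_s3_sources(
#           s3_conn, 's3://my-log-bucket/logs', ['prod-*/tracking.log-*.gz']):
#       print join_as_s3_url(bucket, root, path)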
"""
Tests for tasks that collect enrollment events.
"""
import unittest
import json
from edx.analytics.tasks.course_enroll import (
CourseEnrollmentEventsPerDayMixin,
CourseEnrollmentChangesPerDayMixin,
)
class CourseEnrollEventMapTest(unittest.TestCase):
"""
Tests to verify that event log parsing by mapper works correctly.
"""
def setUp(self):
self.task = CourseEnrollmentEventsPerDayMixin()
self.course_id = "MITx/8.02x/2013_Spring"
self.user_id = 21
self.timestamp = "2013-12-17T15:38:32.805444"
def _create_event_log_line(self, **kwargs):
"""Create an event log with test values, as a JSON string."""
return json.dumps(self._create_event_dict(**kwargs))
def _create_event_dict(self, **kwargs):
"""Create an event log with test values, as a dict."""
# Define default values for event log entry.
org_id = self.course_id.split('/')[0]
event_dict = {
"username": "test_user",
"host": "test_host",
"event_source": "server",
"event_type": "edx.course.enrollment.activated",
"context": {
"course_id": self.course_id,
"org_id": org_id,
"user_id": self.user_id,
},
"time": "{}+00:00".format(self.timestamp),
"ip": "127.0.0.1",
"event": {
"course_id": self.course_id,
"user_id": self.user_id,
"mode": "honor",
},
"agent": "blah, blah, blah",
"page": None
}
event_dict.update(**kwargs)
return event_dict
def assert_no_output_for(self, line):
"""Assert that an input line generates no output."""
self.assertEquals(tuple(self.task.mapper(line)), tuple())
def test_non_enrollment_event(self):
line = 'this is garbage'
self.assert_no_output_for(line)
def test_unparseable_enrollment_event(self):
line = 'this is garbage but contains edx.course.enrollment'
self.assert_no_output_for(line)
def test_missing_event_type(self):
event_dict = self._create_event_dict()
event_dict['old_event_type'] = event_dict['event_type']
del event_dict['event_type']
line = json.dumps(event_dict)
self.assert_no_output_for(line)
def test_nonenroll_event_type(self):
line = self._create_event_log_line(event_type='edx.course.enrollment.unknown')
self.assert_no_output_for(line)
def test_bad_datetime(self):
line = self._create_event_log_line(time='this is a bogus time')
self.assert_no_output_for(line)
def test_bad_event_data(self):
line = self._create_event_log_line(event=["not an event"])
self.assert_no_output_for(line)
def test_illegal_course_id(self):
line = self._create_event_log_line(event={"course_id": ";;;;bad/id/val", "user_id": self.user_id})
self.assert_no_output_for(line)
def test_missing_user_id(self):
line = self._create_event_log_line(event={"course_id": self.course_id})
self.assert_no_output_for(line)
def test_good_enroll_event(self):
line = self._create_event_log_line()
event = tuple(self.task.mapper(line))
expected = (((self.course_id, self.user_id), (self.timestamp, 1)),)
self.assertEquals(event, expected)
def test_good_unenroll_event(self):
line = self._create_event_log_line(event_type='edx.course.enrollment.deactivated')
event = tuple(self.task.mapper(line))
expected = (((self.course_id, self.user_id), (self.timestamp, -1)),)
self.assertEquals(event, expected)
class CourseEnrollEventReduceTest(unittest.TestCase):
"""
Tests to verify that events-per-day-per-user reducer works correctly.
"""
def setUp(self):
self.task = CourseEnrollmentEventsPerDayMixin()
self.key = ('course', 'user')
def _get_reducer_output(self, values):
"""Run reducer with provided values hardcoded key."""
return tuple(self.task.reducer(self.key, values))
def _check_output(self, inputs, expected):
"""Compare generated with expected output."""
self.assertEquals(self._get_reducer_output(inputs), expected)
def test_no_events(self):
self._check_output([], tuple())
def test_single_enrollment(self):
inputs = [('2013-01-01T00:00:01', 1), ]
expected = ((('course', '2013-01-01'), 1),)
self._check_output(inputs, expected)
def test_single_unenrollment(self):
inputs = [('2013-01-01T00:00:01', -1), ]
expected = ((('course', '2013-01-01'), -1),)
self._check_output(inputs, expected)
def test_multiple_events_on_same_day(self):
# run first with no output expected:
inputs = [
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:04', -1),
]
expected = tuple()
self._check_output(inputs, expected)
# then run with output expected:
inputs = [
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', -1),
('2013-01-01T00:00:04', 1),
]
expected = ((('course', '2013-01-01'), 1),)
self._check_output(inputs, expected)
def test_multiple_events_out_of_order(self):
# Make sure that events are sorted by the reducer.
inputs = [
('2013-01-01T00:00:04', -1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
]
expected = tuple()
self._check_output(inputs, expected)
def test_multiple_enroll_events_on_same_day(self):
inputs = [
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', 1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:04', 1),
]
expected = ((('course', '2013-01-01'), 1),)
self._check_output(inputs, expected)
def test_multiple_unenroll_events_on_same_day(self):
inputs = [
('2013-01-01T00:00:01', -1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', -1),
('2013-01-01T00:00:04', -1),
]
expected = ((('course', '2013-01-01'), -1),)
self._check_output(inputs, expected)
def test_multiple_enroll_events_on_many_days(self):
inputs = [
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', 1),
('2013-01-02T00:00:03', 1),
('2013-01-02T00:00:04', 1),
('2013-01-04T00:00:05', 1),
]
expected = ((('course', '2013-01-01'), 1),)
self._check_output(inputs, expected)
def test_multiple_events_on_many_days(self):
# Run with an arbitrary list of events.
inputs = [
('2013-01-01T1', 1),
('2013-01-01T2', -1),
('2013-01-01T3', 1),
('2013-01-01T4', -1),
('2013-01-02', 1),
('2013-01-03', 1),
('2013-01-04T1', 1),
('2013-01-04T2', -1),
('2013-01-05', -1),
('2013-01-06', -1),
('2013-01-07', 1),
('2013-01-08T1', 1),
('2013-01-08T2', 1),
('2013-01-09T1', -1),
('2013-01-09T2', -1),
]
expected = (
(('course', '2013-01-02'), 1),
(('course', '2013-01-04'), -1),
(('course', '2013-01-07'), 1),
(('course', '2013-01-09'), -1),
)
self._check_output(inputs, expected)
class CourseEnrollChangesReduceTest(unittest.TestCase):
"""
Verify that CourseEnrollmentChangesPerDayMixin.reducer() works correctly.
"""
def setUp(self):
self.task = CourseEnrollmentChangesPerDayMixin()
self.key = ('course', '2013-01-01')
def _get_reducer_output(self, values):
"""Run reducer with provided values hardcoded key."""
return tuple(self.task.reducer(self.key, values))
def test_no_user_counts(self):
self.assertEquals(self._get_reducer_output([]), ((self.key, 0),))
def test_single_user_count(self):
self.assertEquals(self._get_reducer_output([1]), ((self.key, 1),))
def test_multiple_user_count(self):
inputs = [1, 1, 1, -1, 1]
self.assertEquals(self._get_reducer_output(inputs), ((self.key, 3),))
"""Support for reading tracking event logs."""
import cjson
import datetime
import re
import logging
logger = logging.getLogger(__name__)
PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$')
# borrowed from modulestore/parsers.py:
ALLOWED_ID_CHARS = r'[a-zA-Z0-9_\-~.:]'
PATTERN_COURSEID = re.compile(r'^' + ALLOWED_ID_CHARS + r'+$')
def is_valid_course_id(course_id):
"""
Determines if a course_id from an event log is possibly legitimate.
Applies two tests:
* Course Id can be split into org/coursename/runname using '/' as delimiter.
* Components of id contain only "allowed" characters as defined in modulestore/parsers.py.
Note this will need to be updated as split-mongo changes are rolled out
that permit a broader set of id values.
"""
# TODO: [split-mongo] verify after course_id name changes.
components = course_id.split('/')
if len(components) != 3:
return False
return all(PATTERN_COURSEID.match(component) for component in components)
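# Examples: "MITx/8.02x/2013_Spring" passes both tests above, while
# "MITx:8.02x:2013_Spring" (wrong delimiter) and ids containing
# characters outside ALLOWED_ID_CHARS do not.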
def decode_json(line):
"""Wrapper to decode JSON string in an implementation-independent way."""
# TODO: Verify correctness of cjson
return cjson.decode(line)
def parse_json_event(line, nested=False):
"""
Parse a tracking log input line as JSON to create a dict representation.
Arguments:
* line: the eventlog text
* nested: boolean flag permitting this to be called recursively.
Apparently some eventlog entries are pure JSON, while others
have JSON preceded by a timestamp.
"""
try:
parsed = decode_json(line)
except Exception:
if not nested:
json_match = PATTERN_JSON.match(line)
if json_match:
return parse_json_event(json_match.group(1), nested=True)
# TODO: There are too many to be logged. It might be useful
# at some point to collect stats on the length of truncation
# and the counts for different event "names" (normalized
# event_type values).
# Note that empirically some seem to be truncated in input
# data at 10000 characters, 2043 for others...
return None
# TODO: add basic validation here.
return parsed
# Time-related terminology:
# * datetime: a datetime object.
# * timestamp: a string, with date and time (to millisecond), in ISO format.
# * datestamp: a string with only date information, in ISO format.
def datetime_to_timestamp(datetime_obj):
"""
Returns a string with the datetime value of the provided datetime object.
Note that if the datetime has zero microseconds, the microseconds will not be output.
"""
return datetime_obj.isoformat()
def datetime_to_datestamp(datetime_obj):
"""Returns a string with the date value of the provided datetime object."""
return datetime_obj.strftime('%Y-%m-%d')
def timestamp_to_datestamp(timestamp):
"""Returns a string with the date value of the provided ISO datetime string."""
return timestamp.split('T')[0]
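# Illustration of the terminology above, for a sample event time:
#   datetime(2013, 12, 17, 15, 38, 32, 805444)
#     -> timestamp "2013-12-17T15:38:32.805444"
#     -> datestamp "2013-12-17"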
def get_event_time(event):
"""Returns a datetime object from an event object, if present."""
try:
# Get entry, and strip off time zone information. Keep microseconds, if any.
raw_timestamp = event['time']
timestamp = raw_timestamp.split('+')[0]
if '.' not in timestamp:
timestamp = '{datetime}.000000'.format(datetime=timestamp)
return datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%f')
except Exception:
return None
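# Example of the normalization above: {"time": "2013-12-17T15:38:32+00:00"}
# has its timezone stripped and zero microseconds appended, so strptime
# parses "2013-12-17T15:38:32.000000" into a naive datetime object.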
def get_event_data(event):
"""
Returns event data from an event log entry as a dict object.
Returns None if not found.
"""
event_value = event.get('event')
if event_value is None:
logger.error("encountered event with missing event value: %s", event)
return None
if isinstance(event_value, basestring):
# If the value is a string, try to parse as JSON into a dict.
try:
event_value = decode_json(event_value)
except Exception:
logger.error("encountered event with unparsable event value: %s", event)
return None
if isinstance(event_value, dict):
# It's fine, just return.
return event_value
else:
logger.error("encountered event data with unrecognized type: %s", event)
return None
"""
Tests for utilities that parse event logs.
"""
import unittest
import edx.analytics.util.eventlog as eventlog
class CourseIdTest(unittest.TestCase):
"""
Verify that course_id filtering works correctly.
"""
def test_normal_course_id(self):
course_id = "org/course_id/course_run"
self.assertTrue(eventlog.is_valid_course_id(course_id))
def test_course_id_without_components(self):
course_id = "org:course_id:course_run"
self.assertFalse(eventlog.is_valid_course_id(course_id))
def test_course_id_with_nonascii(self):
course_id = u"org/course\ufffd_id/course_run"
self.assertFalse(eventlog.is_valid_course_id(course_id))
class ParseEventLogTest(unittest.TestCase):
"""
Verify that event log parsing works correctly.
"""
def test_parse_valid_json_event(self):
line = '{"username": "successful"}'
result = eventlog.parse_json_event(line)
self.assertTrue(isinstance(result, dict))
def test_parse_json_event_truncated(self):
line = '{"username": "unsuccessful'
result = eventlog.parse_json_event(line)
self.assertIsNone(result)
def test_parse_json_event_with_cruft(self):
line = 'leading cruft here {"username": "successful"} '
result = eventlog.parse_json_event(line)
self.assertTrue(isinstance(result, dict))
def test_parse_json_event_with_nonascii(self):
line = '{"username": "b\ufffdb"}'
result = eventlog.parse_json_event(line)
self.assertTrue(isinstance(result, dict))
self.assertEquals(result['username'], u'b\ufffdb')
class TimestampTest(unittest.TestCase):
"""Verify timestamp-related functions."""
def test_datestamp_from_timestamp(self):
timestamp = "2013-12-17T15:38:32.805444"
self.assertEquals(eventlog.timestamp_to_datestamp(timestamp), "2013-12-17")
def test_missing_datetime(self):
item = {"something else": "not an event"}
self.assertIsNone(eventlog.get_event_time(item))
def test_good_datetime_with_microseconds_and_timezone(self):
item = {"time": "2013-12-17T15:38:32.805444+00:00"}
dt_value = eventlog.get_event_time(item)
self.assertIsNotNone(dt_value)
self.assertEquals(eventlog.datetime_to_timestamp(dt_value), "2013-12-17T15:38:32.805444")
self.assertEquals(eventlog.datetime_to_datestamp(dt_value), "2013-12-17")
def test_good_datetime_with_timezone(self):
item = {"time": "2013-12-17T15:38:32+00:00"}
dt_value = eventlog.get_event_time(item)
self.assertIsNotNone(dt_value)
self.assertEquals(eventlog.datetime_to_timestamp(dt_value), "2013-12-17T15:38:32")
self.assertEquals(eventlog.datetime_to_datestamp(dt_value), "2013-12-17")
def test_good_datetime_with_microseconds(self):
item = {"time": "2013-12-17T15:38:32.805444"}
dt_value = eventlog.get_event_time(item)
self.assertIsNotNone(dt_value)
self.assertEquals(eventlog.datetime_to_timestamp(dt_value), "2013-12-17T15:38:32.805444")
self.assertEquals(eventlog.datetime_to_datestamp(dt_value), "2013-12-17")
def test_good_datetime_with_no_microseconds_or_timezone(self):
item = {"time": "2013-12-17T15:38:32"}
dt_value = eventlog.get_event_time(item)
self.assertIsNotNone(dt_value)
self.assertEquals(eventlog.datetime_to_timestamp(dt_value), "2013-12-17T15:38:32")
self.assertEquals(eventlog.datetime_to_datestamp(dt_value), "2013-12-17")
class GetEventDataTest(unittest.TestCase):
"""Verify that get_event_data works as expected."""
def test_missing_event_data(self):
item = {"something else": "not an event"}
self.assertIsNone(eventlog.get_event_data(item))
def test_get_bad_string_event_data(self):
item = {"event": "a string but not JSON"}
self.assertIsNone(eventlog.get_event_data(item))
def test_get_json_string_event_data(self):
item = {"event": '{ "a string": "that is JSON"}'}
self.assertEquals(eventlog.get_event_data(item), {"a string": "that is JSON"})
def test_event_data_with_unknown_type(self):
item = {"event": ["a list", "of strings"]}
self.assertIsNone(eventlog.get_event_data(item))
def test_get_dict_event_data(self):
item = {"event": {"a dict": "that has strings"}}
self.assertEquals(eventlog.get_event_data(item), {"a dict": "that has strings"})
#
# Define logging for use with analytics tasks.
#
# This defines handlers for logging coming from
# edx/analytics code, and from luigi code.
# Luigi messages go to stdout, while edx messages
# are routed to stderr.
[loggers]
keys=root,edx_analytics,luigi_interface
[handlers]
keys=stderrHandler,luigiHandler,localHandler
[formatters]
keys=standard,luigi_default
[logger_root]
level=DEBUG
handlers=localHandler
[logger_edx_analytics]
# Errors from edx/analytics get routed to stderr.
level=WARNING
handlers=stderrHandler
qualname=edx.analytics
propagate=0
[logger_luigi_interface]
# Errors from luigi-interface get routed to stdout.
level=INFO
handlers=luigiHandler
qualname=luigi-interface
propagate=0
[handler_stderrHandler]
class=StreamHandler
formatter=standard
args=(sys.stderr,)
[handler_luigiHandler]
# Define as in luigi/interface.py.
class=StreamHandler
formatter=luigi_default
args=(sys.stdout,)
[handler_localHandler]
# Define as in edx-platform/common/lib/logsettings.py (for dev logging, not syslog).
class=logging.handlers.RotatingFileHandler
formatter=standard
args=('edx_analytics.log', 'w')
# 'maxBytes': 1024 * 1024 * 2,
# 'backupCount': 5,
[formatter_standard]
# Define as in edx-platform/common/lib/logsettings.py (for dev logging, not syslog).
format=%(asctime)s %(levelname)s %(process)d [%(name)s] %(filename)s:%(lineno)d - %(message)s
[formatter_luigi_default]
# Define as in luigi/interface.py.
format=%(levelname)s: %(message)s
[MASTER]
# Specify a configuration file.
#rcfile=
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
# Profiled execution.
profile=no
# Add files or directories to the blacklist. They should be base names, not
# paths.
ignore=CVS, migrations
# Pickle collected data for later comparisons.
persistent=yes
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
[MESSAGES CONTROL]
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time.
#enable=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once).
disable=
# Never going to use these
# I0011: Locally disabling W0232
# W0141: Used builtin function 'map'
# W0142: Used * or ** magic
# R0921: Abstract class not referenced
# R0922: Abstract class is only referenced 1 times
I0011,W0141,W0142,R0921,R0922,
# Django makes classes that trigger these
# W0232: Class has no __init__ method
W0232,
# Might use these when the code is in better shape
# C0302: Too many lines in module
# R0201: Method could be a function
# R0901: Too many ancestors
# R0902: Too many instance attributes
# R0903: Too few public methods (1/2)
# R0904: Too many public methods
# R0911: Too many return statements
# R0912: Too many branches
# R0913: Too many arguments
# R0914: Too many local variables
C0302,R0201,R0901,R0902,R0903,R0904,R0911,R0912,R0913,R0914
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html
output-format=text
# Include message's id in output
include-ids=yes
# Put messages in a separate file for each module / package specified on the
# command line instead of printing them on stdout. Reports (if any) will be
# written in a file name "pylint_global.[txt|html]".
files-output=no
# Tells whether to display a full report or only the messages
reports=no
# Python expression which should return a note less than 10 (10 is the highest
# note). You have access to the variables errors warning, statement which
# respectively contain the number of errors / warnings messages and the total
# number of statements analyzed. This is used by the global evaluation report
# (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Add a comment according to your evaluation note. This is used by the global
# evaluation report (RP0004).
comment=no
[TYPECHECK]
# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).
ignored-classes=SQLObject
# When zope mode is activated, add a predefined set of Zope acquired attributes
# to generated-members.
zope=no
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E0201 when accessed. Python regular
# expressions are accepted.
generated-members=
REQUEST,
acl_users,
aq_parent,
objects,
DoesNotExist,
can_read,
can_write,
get_url,
size,
content,
status_code,
# For factory_boy factories
create
[BASIC]
# Required attributes for module, separated by a comma
required-attributes=
# List of builtins function names that should not be used, separated by a comma
bad-functions=map,filter,apply,input
# Regular expression which should only match correct module names
module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Regular expression which should only match correct module level names
const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__)|log|urlpatterns)$
# Regular expression which should only match correct class names
class-rgx=[A-Z_][a-zA-Z0-9]+$
# Regular expression which should only match correct function names
function-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct method names
method-rgx=([a-z_][a-z0-9_]{2,60}|setUp|set[Uu]pClass|tearDown|tear[Dd]ownClass|assert[A-Z]\w*)$
# Regular expression which should only match correct instance attribute names
attr-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct argument names
argument-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct variable names
variable-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct list comprehension /
# generator expression variable names
inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
# Good variable names which should always be accepted, separated by a comma
good-names=i,j,k,ex,Run,_
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata
# Regular expression which should only match functions or classes name which do
# not require a docstring
no-docstring-rgx=__.*__|test_.*|setUp|tearDown
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,XXX,TODO
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=120
# Maximum number of lines in a module
max-module-lines=1000
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '
[SIMILARITIES]
# Minimum lines number of a similarity.
min-similarity-lines=4
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=no
# A regular expression matching the beginning of the name of dummy variables
# (i.e. not used).
dummy-variables-rgx=_|dummy|unused|.*_unused
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=
[IMPORTS]
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=regsub,string,TERMIOS,Bastion,rexec
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled)
import-graph=
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled)
ext-import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled)
int-import-graph=
[DESIGN]
# Maximum number of arguments for function / method
max-args=5
# Argument names that match this expression will be ignored. Default to name
# with leading underscore
ignored-argument-names=_.*
# Maximum number of locals for function / method body
max-locals=15
# Maximum number of return / yield for function / method body
max-returns=6
# Maximum number of branch for function / method body
max-branchs=12
# Maximum number of statements in function / method body
max-statements=50
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of attributes for a class (see R0902).
max-attributes=7
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
[CLASSES]
# List of interface methods to ignore, separated by a comma. This is used for
# instance to not check methods defines in Zope's Interface base class.
ignore-iface-methods=isImplementedBy,deferred,extends,names,namesAndDescriptions,queryDescriptionFor,getBases,getDescriptionFor,getDoc,getName,getTaggedValue,getTaggedValueTags,isEqualOrExtendedBy,setTaggedValue,isImplementedByInstancesOf,adaptWith,is_implemented_by
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,__new__,setUp
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "Exception"
overgeneral-exceptions=Exception
@@ -6,5 +6,6 @@ pbr==0.5.23
stevedore==0.13
tornado==3.1.1
ansible==1.4.4
python-cjson==1.0.5
-e git+https://github.com/spotify/luigi.git@a33756c781b9bf7e51384f0eb19d6a25050ef136#egg=luigi
-nose
+nose==1.3.0
nose-ignore-docstring==0.2
coverage==3.7
pep8==1.4.5
pylint==0.28
diff-cover >= 0.2.1
mock==1.0.1
@@ -19,8 +19,10 @@ data_files =
console_scripts =
launch-task = edx.analytics.tasks.main:main
remote-task = edx.analytics.tasks.remote:main
edx.analytics.tasks =
s3-copy = edx.analytics.tasks.s3:S3Copy
s3-sync = edx.analytics.tasks.s3:S3Sync
sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
enrollments-report = edx.analytics.reports.enrollments:EnrollmentsByWeek
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay