Commit 126ae6f7 by Brian Wilson

Define logging via config file.

Also other refactoring based on PR feedback.
parent f724d3de
[pep8]
ignore=E501
\ No newline at end of file
ignore=E501
max_line_length=119
......@@ -13,6 +13,7 @@ test-requirements: requirements
pip install -r requirements/test.txt
test: test-requirements
# TODO: when we have better coverage, modify this to actually fail when coverage is too low.
rm -rf .coverage
python -m coverage run --rcfile=./.coveragerc `which nosetests`
......
[core]
logging_conf_file=logging.cfg
[event-logs]
source = s3://edx-all-tracking-logs
destination = s3://edx-analytics-events/raw/by-server
include = prod-edx-*/tracking.log-*.gz
prod-edxapp-*/tracking.log-*.gz
\ No newline at end of file
prod-edxapp-*/tracking.log-*.gz
"""
Luigi tasks for extracting course enrollment statistics from tracking log files.
Example command lines:
(local)
python course_enroll.py --local-scheduler CourseEnrollmentTotalsPerDay
--name mytest --src input --include 'tracking*' --include '2012*'
--dest output7
(local using s3)
python course_enroll.py --local-scheduler CourseEnrollmentTotalsPerDay
--name mytest --src s3://edx-analytics-test-data/data --include 'tracking*'
--dest s3://edx-analytics-scratch/output
"""
import sys
import luigi
import luigi.hadoop
import luigi.s3
......@@ -29,6 +12,8 @@ from edx.analytics.tasks.pathutil import get_target_for_url, PathSetTask
import logging
logger = logging.getLogger(__name__)
UNENROLLED = -1
ENROLLED = 1
################################
# Task Map-Reduce definitions
......@@ -43,18 +28,16 @@ class CourseEnrollmentEventsPerDayMixin(object):
Generates output values for explicit enrollment events.
Args:
line: text line from a tracking event log.
line: text line from a tracking event log.
Yields:
(course_id, user_id), (timestamp, action_value)
(course_id, user_id), (timestamp, action_value)
where `timestamp` is in ISO format, with resolution to the microsecond
and `action_value` = 1 (enrolled) or -1 (unenrolled).
Example:
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05, 1)
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05.123456, 1)
"""
parsed_tuple_or_none = get_explicit_enrollment_output(line)
if parsed_tuple_or_none is not None:
......@@ -65,13 +48,11 @@ class CourseEnrollmentEventsPerDayMixin(object):
Calculate status for each user on the end of each day where they changed their status.
Args:
key: (course_id, user_id) tuple
value: (timestamp, action_value) tuple
key: (course_id, user_id) tuple
values: iterator of (timestamp, action_value) tuples
Yields:
(course_id, datestamp), enrollment_change
(course_id, datestamp), enrollment_change
where `datestamp` is in ISO format, with resolution to the day
and `enrollment_change` is the change on that date for an individual user.
......@@ -83,20 +64,29 @@ class CourseEnrollmentEventsPerDayMixin(object):
Note that we don't bother to actually output the user_id,
since it's not needed downstream (though it might be sometimes useful
for debugging).
for debugging).
Example:
Also note that this can be greatly simplified if we have
confidence that there are no duplicate enrollment events. Then
we could just have the mapper generate output with the key as
(course_id, user_id, datestamp), with a value of just the
action. Then the reducer just sums the actions for the date,
and yields those days with non-zero values.
Example output:
(edX/DemoX/Demo_Course, 2013-09-10), 1
(edX/DemoX/Demo_Course, 2013-09-12), -1
"""
# sys.stderr.write("Found key in reducer: " + str(key) + '\n')
logger.debug("Found key in reducer: %s", key)
course_id, user_id = key
# Sort input values (by timestamp) to easily detect the end of a day.
# Note that this assumes the timestamp values (strings) are in ISO
# representation, so that the tuples will be ordered in ascending time value.
sorted_values = sorted(values)
if len(sorted_values) <= 0:
return
# Convert timestamps to dates, so we can group them by day.
func = eventlog.timestamp_to_datestamp
......@@ -105,35 +95,28 @@ class CourseEnrollmentEventsPerDayMixin(object):
# Add a stop item to ensure we process the last entry.
values = values + [(None, None)]
# The enrollment state for each student: {1 : enrolled, -1: unenrolled}
# Assume students start in an unknown state, so that whatever happens on
# the first day will get output.
state, prev_state = 0, 0
# Remove the first action and use it to initialize the state machine.
first_date, first_enrollment_status = values.pop(0)
last_reported_enrollment_status = UNENROLLED if first_enrollment_status == ENROLLED else ENROLLED
enrollment_status = first_enrollment_status
prev_date = first_date
prev_date = None
for (this_date, action) in values:
for (this_date, new_enrollment_status) in values:
# Before we process a new date, report the state if it has
# changed from the previously reported, if any.
if this_date != prev_date and prev_date is not None:
if state != prev_state:
# sys.stderr.write("outputting date and value: " + str(prev_date) + " " + str(state) + '\n')
prev_state = state
yield (course_id, prev_date), state
if this_date != prev_date and enrollment_status != last_reported_enrollment_status:
logger.debug("outputting date and value: %s %s", prev_date, enrollment_status)
last_reported_enrollment_status = enrollment_status
yield (course_id, prev_date), enrollment_status
# sys.stderr.write("accumulating date and value: " + str(this_date) + " " + str(action) + '\n')
logger.debug("accumulating date and value: %s %s", this_date, new_enrollment_status)
# Consecutive changes of the same kind don't affect the state.
if action != state:
state = action
if new_enrollment_status != enrollment_status:
enrollment_status = new_enrollment_status
else:
sys.stderr.write("WARNING: duplicate enrollment event {action} "
logger.warning("WARNING: duplicate enrollment event {status} "
"for user_id {user_id} in course {course_id} on {date}".format(
action=action, user_id=user_id, course_id=course_id, date=this_date))
# If this is the first entry, then we need to infer what
# the previous state was before the first entry arrives.
# For this, we take the opposite of the first entry.
if prev_date is None:
prev_state = -1 if action == 1 else 1
status=new_enrollment_status, user_id=user_id, course_id=course_id, date=this_date))
prev_date = this_date
......@@ -160,20 +143,19 @@ class CourseEnrollmentChangesPerDayMixin(object):
Reducer: sums enrollments for a given course on a particular date.
Args:
key: (course_id, datestamp) tuple
values: iterator of enrollment_changes
(course_id, datestamp), enrollment_changes
Input `enrollment_changes` are the enrollment changes on a day due to a specific user.
Each user with a change has a separate input, either -1 (unenroll) or 1 (enroll).
Input `enrollment_changes` are the enrollment changes on a day due to a specific user.
Each user with a change has a separate input, either -1 (unenroll) or 1 (enroll).
Yields:
(course_id, datestamp), enrollment_change
(course_id, datestamp), enrollment_change
Output `enrollment_change` is summed across all users, one output per course.
Output `enrollment_change` is summed across all users, one output per course.
"""
# sys.stderr.write("Found key in second reducer: " + str(key) + '\n')
logger.debug("Found key in second reducer: %s", key)
count = sum(int(v) for v in values)
yield key, count
......@@ -187,7 +169,6 @@ class BaseCourseEnrollmentTask(luigi.hadoop.JobTask):
Base class for course enrollment calculations.
Parameters:
name: a unique identifier to distinguish one run from another. It is used in
the construction of output filenames, so each run will have distinct outputs.
src: a URL to the root location of input tracking log files.
......@@ -220,7 +201,6 @@ class CourseEnrollmentEventsPerDay(CourseEnrollmentEventsPerDayMixin, BaseCourse
return PathSetTask(self.src, self.include, self.run_locally)
def output(self):
# generate a single output file
output_name = 'course_enrollment_events_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
......@@ -232,86 +212,12 @@ class CourseEnrollmentChangesPerDay(CourseEnrollmentChangesPerDayMixin, BaseCour
return CourseEnrollmentEventsPerDay(self.name, self.src, self.dest, self.include, self.run_locally)
def output(self):
# generate a single output file
output_name = 'course_enrollment_changes_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
class FirstCourseEnrollmentEventsPerDay(CourseEnrollmentEventsPerDayMixin, BaseCourseEnrollmentTask):
"""Calculate number of "first" course enrollments per-user, per-course, per-day."""
def requires(self):
return PathSetTask(self.src, self.include, self.run_locally)
def output(self):
# generate a single output file
output_name = 'first_course_enrollment_events_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
def mapper(self, line):
"""
Generates output values for explicit enrollment events.
Args:
line: text line from a tracking event log.
Yields:
(course_id, user_id), (timestamp, action_value)
where action_value = 1 (enrolled) or -1 (unenrolled)
and timestamp is in ISO format, with resolution to the millisecond.
Example:
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05, 1)
"""
parsed_tuple_or_none = get_explicit_enrollment_output(line)
if parsed_tuple_or_none is not None:
yield parsed_tuple_or_none
def reducer(self, key, values):
"""
Calculate first time each user enrolls in a course.
Output key: (course_id, date)
Output value: 1 on the first date the user enrolls.
Note that we don't bother to actually output the user_id,
since it's not needed downstream.
Example:
edX/DemoX/Demo_Course 2013-09-10 1
"""
# sys.stderr.write("Found key in reducer: " + str(key) + '\n')
course_id, _user_id = key
sorted_values = sorted(values)
for (timestamp, change_value) in sorted_values:
# get the day's date from the event's timestamp:
this_date = eventlog.get_datestamp_from_timestamp(timestamp)
# if it's an enrollment, output it and we're done.
if change_value > 0:
yield (course_id, this_date), change_value
return
class FirstCourseEnrollmentChangesPerDay(CourseEnrollmentChangesPerDayMixin, BaseCourseEnrollmentTask):
"""Calculate changes in "first" course enrollments per-course, per-day."""
def requires(self):
return FirstCourseEnrollmentEventsPerDay(self.name, self.src, self.dest, self.include, self.run_locally)
def output(self):
# generate a single output file
output_name = 'first_course_enrollment_changes_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
################################
# Helper methods
################################
def get_explicit_enrollment_output(line):
......@@ -332,7 +238,7 @@ def get_explicit_enrollment_output(line):
or None if there is no valid enrollment event on the line.
Example:
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05, 1)
(edX/DemoX/Demo_Course, dummy_userid), (2013-09-10T00:01:05.123456, 1)
"""
# Before parsing, check that the line contains something that
......@@ -341,32 +247,32 @@ def get_explicit_enrollment_output(line):
return None
# try to parse the line into a dict:
item = eventlog.parse_eventlog_item(line)
if item is None:
event = eventlog.parse_json_event(line)
if event is None:
# The line didn't parse. For this specific purpose,
# we can assume that all enrollment-related lines would parse,
# and these non-parsing lines would get skipped anyway.
return None
# get event type, and check that it exists:
event_type = item.get('event_type')
event_type = event.get('event_type')
if event_type is None:
eventlog.log_item("encountered event with no event_type", item)
logger.error("encountered event with no event_type: %s", event)
return None
# convert the type to a value:
if event_type == 'edx.course.enrollment.activated':
action_value = 1
action_value = ENROLLED
elif event_type == 'edx.course.enrollment.deactivated':
action_value = -1
action_value = UNENROLLED
else:
# not an enrollment event...
return None
# get the timestamp:
datetime = eventlog.get_event_time(item)
datetime = eventlog.get_event_time(event)
if datetime is None:
eventlog.log_item("encountered event with bad datetime", item)
logger.error("encountered event with bad datetime: %s", event)
return None
timestamp = eventlog.datetime_to_timestamp(datetime)
......@@ -376,7 +282,7 @@ def get_explicit_enrollment_output(line):
# `course_id` should be the same in `context` as in `data`.)
# Get the event data:
event_data = eventlog.get_event_data(item)
event_data = eventlog.get_event_data(event)
if event_data is None:
# Assume it's already logged (and with more specifics).
return None
......@@ -384,29 +290,15 @@ def get_explicit_enrollment_output(line):
# Get the course_id from the data, and validate.
course_id = event_data['course_id']
if not eventlog.is_valid_course_id(course_id):
eventlog.log_item("encountered explicit enrollment event with bogus course_id", item)
logger.error("encountered explicit enrollment event with bogus course_id: %s", event)
return None
# Get the user_id from the data:
user_id = event_data.get('user_id')
if user_id is None:
eventlog.log_item("encountered explicit enrollment event with no user_id", item)
logger.error("encountered explicit enrollment event with no user_id: %s", event)
return None
# For now, ignore the enrollment 'mode' (e.g. 'honor').
return (course_id, user_id), (timestamp, action_value)
################################
# Running tasks
################################
def main():
"""Mainline for command-line testing."""
luigi.run()
if __name__ == '__main__':
main()
"""
Main method for running tasks.
Invoke a task by running `launch-task` with the task's classname and
arguments for Luigi and for the task. Use `remote-task` to
submit the task to run on an EMR cluster.
Example command lines for various tasks:
* CourseEnrollmentChangesPerDay:
launch-task --local-scheduler CourseEnrollmentChangesPerDay
--name mytest --src input --include 'tracking*' --include '2012*'
--dest output7
remote-task --job-flow-id <job-id> --branch <branch-name> --remote-name run-20140204
--local-scheduler CourseEnrollmentChangesPerDay
--name run-20140204 --src s3://edx-all-tracking-logs --include 'prod-edx*/tracking.*-201312*.gz'
--include 'prod-edx*/tracking.*-2014*.gz' --dest s3://edx-analytics-scratch/output
"""
import os.path
import logging
......@@ -26,7 +48,7 @@ def main():
configuration.add_config_path(DEFAULT_CONFIGURATION_FILE)
if not os.path.isfile(DEFAULT_CONFIGURATION_FILE):
log.warning('Default configuration file not found:', DEFAULT_CONFIGURATION_FILE)
log.warning('Default configuration file not found: %s', DEFAULT_CONFIGURATION_FILE)
# Tell luigi what dependencies to pass to the Hadoop nodes
# - argparse is not included by default in python 2.6
......
......@@ -9,15 +9,13 @@ Supports outputs to HDFS, S3, and local FS.
import os
import boto
import glob
from urlparse import urlparse
from fnmatch import fnmatch
import luigi
import luigi.s3
import luigi.hdfs
import luigi.format
from s3_util import join_as_s3_url, generate_s3_sources
from edx.analytics.tasks.s3_util import join_as_s3_url, generate_s3_sources
class LocalPathTask(luigi.ExternalTask):
......
......@@ -19,7 +19,7 @@ def main():
change_directory_to_ansible_script_home()
extra_vars = convert_cli_arguments_to_ansible_extra_vars(arguments)
run_ansible_playbook(arguments.verbose, extra_vars)
......@@ -43,7 +43,7 @@ def convert_cli_arguments_to_ansible_extra_vars(arguments):
def run_ansible_playbook(verbose, extra_vars):
ansible_playbook_path = os.path.join(sys.prefix, 'bin', 'ansible-playbook')
command = [
ansible_playbook_path, '-i', 'ec2.py', 'task.yml', '-e', extra_vars
ansible_playbook_path, '-i', 'ec2.py', 'task.yml', '-e', extra_vars
]
if verbose:
command.append('-vvvv')
......
......@@ -4,7 +4,8 @@ import boto
import luigi
import luigi.s3
from s3_util import join_as_s3_url, get_s3_bucket_key_names, generate_s3_sources, get_s3_key
from edx.analytics.tasks.s3_util import join_as_s3_url, get_s3_bucket_key_names, generate_s3_sources, get_s3_key
class S3Copy(luigi.Task):
"""
......@@ -113,4 +114,3 @@ class S3Sync(luigi.Task):
def output(self):
for task in self.requires():
yield task.output()
......@@ -5,8 +5,6 @@ Utility methods for interacting with S3 via boto.
from fnmatch import fnmatch
from urlparse import urlparse
import boto
def get_s3_bucket_key_names(url):
"""Extract the bucket and key names from a S3 URL"""
......@@ -18,6 +16,7 @@ def join_as_s3_url(bucket, root, path):
"""Combine bucket name, root path and relative path into a S3 URL"""
return 's3://{0}/{1}/{2}'.format(bucket, root, path)
def get_s3_key(s3_conn, url):
"""Returns an S3 key for use in further boto actions."""
bucket_name, key_name = get_s3_bucket_key_names(url)
......@@ -25,6 +24,7 @@ def get_s3_key(s3_conn, url):
key = bucket.get_key(key_name)
return key
def generate_s3_sources(s3_conn, source, patterns):
"""
Returns a list of S3 sources that match filters.
......@@ -61,8 +61,8 @@ def generate_s3_sources(s3_conn, source, patterns):
return ((bucket.name, root, path) for path in paths)
def _filter_matches(patterns, names):
"""Return only key names that match any of the include patterns."""
fn = lambda n: any(fnmatch(n, p) for p in patterns)
return (n for n in names if fn(n))
func = lambda n: any(fnmatch(n, p) for p in patterns)
return (n for n in names if func(n))
......@@ -19,7 +19,7 @@ class CourseEnrollEventMapTest(unittest.TestCase):
self.task = CourseEnrollmentEventsPerDayMixin()
self.course_id = "MITx/8.02x/2013_Spring"
self.user_id = 21
self.timestamp = "2013-12-17T15:38:32"
self.timestamp = "2013-12-17T15:38:32.805444"
def _create_event_log_line(self, **kwargs):
"""Create an event log with test values, as a JSON string."""
......@@ -39,7 +39,7 @@ class CourseEnrollEventMapTest(unittest.TestCase):
"org_id": org_id,
"user_id": self.user_id,
},
"time": "{}.805444+00:00".format(self.timestamp),
"time": "{}+00:00".format(self.timestamp),
"ip": "127.0.0.1",
"event": {
"course_id": self.course_id,
......@@ -53,7 +53,8 @@ class CourseEnrollEventMapTest(unittest.TestCase):
return event_dict
def assert_no_output_for(self, line):
self.assertEquals(list(self.task.mapper(line)), [])
"""Assert that an input line generates no output."""
self.assertEquals(tuple(self.task.mapper(line)), tuple())
def test_non_enrollment_event(self):
line = 'this is garbage'
......@@ -91,14 +92,14 @@ class CourseEnrollEventMapTest(unittest.TestCase):
def test_good_enroll_event(self):
line = self._create_event_log_line()
event = list(self.task.mapper(line))
expected = [((self.course_id, self.user_id), (self.timestamp, 1))]
event = tuple(self.task.mapper(line))
expected = (((self.course_id, self.user_id), (self.timestamp, 1)),)
self.assertEquals(event, expected)
def test_good_unenroll_event(self):
line = self._create_event_log_line(event_type='edx.course.enrollment.deactivated')
event = list(self.task.mapper(line))
expected = [((self.course_id, self.user_id), (self.timestamp, -1))]
event = tuple(self.task.mapper(line))
expected = (((self.course_id, self.user_id), (self.timestamp, -1)),)
self.assertEquals(event, expected)
......@@ -112,127 +113,114 @@ class CourseEnrollEventReduceTest(unittest.TestCase):
def _get_reducer_output(self, values):
"""Run reducer with provided values and a hardcoded key."""
return list(self.task.reducer(self.key, values))
return tuple(self.task.reducer(self.key, values))
def _check_output(self, inputs, expected):
"""Compare generated with expected output."""
self.assertEquals(self._get_reducer_output(inputs), expected)
def test_no_events(self):
self.assertEquals(self._get_reducer_output([]), [])
self._check_output([], tuple())
def test_single_enrollment(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
]),
[
(('course', '2013-01-01'), 1),
])
inputs = [('2013-01-01T00:00:01', 1), ]
expected = ((('course', '2013-01-01'), 1),)
self._check_output(inputs, expected)
def test_single_unenrollment(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', -1),
]),
[
(('course', '2013-01-01'), -1),
])
inputs = [('2013-01-01T00:00:01', -1), ]
expected = ((('course', '2013-01-01'), -1),)
self._check_output(inputs, expected)
def test_multiple_events_on_same_day(self):
# run first with no output expected:
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:04', -1),
]),
[
])
inputs = [
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:04', -1),
]
expected = tuple()
self._check_output(inputs, expected)
# then run with output expected:
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', -1),
('2013-01-01T00:00:04', 1),
]),
[
(('course', '2013-01-01'), 1),
])
inputs = [
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', -1),
('2013-01-01T00:00:04', 1),
]
expected = ((('course', '2013-01-01'), 1),)
self._check_output(inputs, expected)
def test_multiple_events_out_of_order(self):
# Make sure that events are sorted by the reducer.
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:04', -1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
]),
[
])
inputs = [
('2013-01-01T00:00:04', -1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', -1),
]
expected = tuple()
self._check_output(inputs, expected)
def test_multiple_enroll_events_on_same_day(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', 1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:04', 1),
]),
[
(('course', '2013-01-01'), 1),
])
inputs = [
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', 1),
('2013-01-01T00:00:03', 1),
('2013-01-01T00:00:04', 1),
]
expected = ((('course', '2013-01-01'), 1),)
self._check_output(inputs, expected)
def test_multiple_unenroll_events_on_same_day(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', -1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', -1),
('2013-01-01T00:00:04', -1),
]),
[
(('course', '2013-01-01'), -1),
])
inputs = [
('2013-01-01T00:00:01', -1),
('2013-01-01T00:00:02', -1),
('2013-01-01T00:00:03', -1),
('2013-01-01T00:00:04', -1),
]
expected = ((('course', '2013-01-01'), -1),)
self._check_output(inputs, expected)
def test_multiple_enroll_events_on_many_days(self):
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', 1),
('2013-01-02T00:00:03', 1),
('2013-01-02T00:00:04', 1),
('2013-01-04T00:00:05', 1),
]),
[
(('course', '2013-01-01'), 1),
])
inputs = [
('2013-01-01T00:00:01', 1),
('2013-01-01T00:00:02', 1),
('2013-01-02T00:00:03', 1),
('2013-01-02T00:00:04', 1),
('2013-01-04T00:00:05', 1),
]
expected = ((('course', '2013-01-01'), 1),)
self._check_output(inputs, expected)
def test_multiple_events_on_many_days(self):
# Run with an arbitrary list of events.
self.assertEquals(self._get_reducer_output(
[
('2013-01-01T1', 1),
('2013-01-01T2', -1),
('2013-01-01T3', 1),
('2013-01-01T4', -1),
('2013-01-02', 1),
('2013-01-03', 1),
('2013-01-04T1', 1),
('2013-01-04T2', -1),
('2013-01-05', -1),
('2013-01-06', -1),
('2013-01-07', 1),
('2013-01-08T1', 1),
('2013-01-08T2', 1),
('2013-01-09T1', -1),
('2013-01-09T2', -1),
]),
[
(('course', '2013-01-02'), 1),
(('course', '2013-01-04'), -1),
(('course', '2013-01-07'), 1),
(('course', '2013-01-09'), -1),
])
inputs = [
('2013-01-01T1', 1),
('2013-01-01T2', -1),
('2013-01-01T3', 1),
('2013-01-01T4', -1),
('2013-01-02', 1),
('2013-01-03', 1),
('2013-01-04T1', 1),
('2013-01-04T2', -1),
('2013-01-05', -1),
('2013-01-06', -1),
('2013-01-07', 1),
('2013-01-08T1', 1),
('2013-01-08T2', 1),
('2013-01-09T1', -1),
('2013-01-09T2', -1),
]
expected = (
(('course', '2013-01-02'), 1),
(('course', '2013-01-04'), -1),
(('course', '2013-01-07'), 1),
(('course', '2013-01-09'), -1),
)
self._check_output(inputs, expected)
class CourseEnrollChangesReduceTest(unittest.TestCase):
......@@ -245,14 +233,14 @@ class CourseEnrollChangesReduceTest(unittest.TestCase):
def _get_reducer_output(self, values):
"""Run reducer with provided values and a hardcoded key."""
return list(self.task.reducer(self.key, values))
return tuple(self.task.reducer(self.key, values))
def test_no_user_counts(self):
self.assertEquals(self._get_reducer_output([]), [(self.key, 0)])
self.assertEquals(self._get_reducer_output([]), ((self.key, 0),))
def test_single_user_count(self):
self.assertEquals(self._get_reducer_output([1]), [(self.key, 1)])
self.assertEquals(self._get_reducer_output([1]), ((self.key, 1),))
def test_multiple_user_count(self):
inputs = [1, 1, 1, -1, 1]
self.assertEquals(self._get_reducer_output(inputs), [(self.key, 3)])
self.assertEquals(self._get_reducer_output(inputs), ((self.key, 3),))
"""Support for reading tracking event logs."""
import sys
import cjson
import datetime
import re
import logging
logger = logging.getLogger(__name__)
PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$')
......@@ -32,13 +33,13 @@ def is_valid_course_id(course_id):
return all(PATTERN_COURSEID.match(component) for component in components)
def json_decode(line):
def decode_json(line):
"""Wrapper to decode JSON string in an implementation-independent way."""
# TODO: Verify correctness of cjson
return cjson.decode(line)
def parse_eventlog_item(line, nested=False):
def parse_json_event(line, nested=False):
"""
Parse a tracking log input line as JSON to create a dict representation.
......@@ -50,12 +51,12 @@ def parse_eventlog_item(line, nested=False):
JSON that are prepended by a timestamp.
"""
try:
parsed = json_decode(line)
parsed = decode_json(line)
except Exception:
if not nested:
json_match = PATTERN_JSON.match(line)
if json_match:
return parse_eventlog_item(json_match.group(1), nested=True)
return parse_json_event(json_match.group(1), nested=True)
# TODO: There are too many to be logged. It might be useful
# at some point to collect stats on the length of truncation
......@@ -65,23 +66,24 @@ def parse_eventlog_item(line, nested=False):
# Note that empirically some seem to be truncated in input
# data at 10000 characters, 2043 for others...
return None
return parsed
# TODO: add basic validation here.
def log_item(msg, item, level='ERROR'):
"""Writes a message about an eventlog item."""
# TODO: replace this with real logging.
sys.stderr.write("{level}: {msg}: {item}\n".format(msg=msg, item=item, level=level))
return parsed
# Time-related terminology:
# * datetime: a datetime object.
# * timestamp: a string, with date and time (to second), in ISO format.
# * timestamp: a string, with date and time (to microsecond), in ISO format.
# * datestamp: a string with only date information, in ISO format.
def datetime_to_timestamp(datetime_obj):
"""Returns a string with the datetime value of the provided datetime object."""
return datetime_obj.strftime('%Y-%m-%dT%H:%M:%S')
"""
Returns a string with the datetime value of the provided datetime object.
Note that if the datetime has zero microseconds, the microseconds will not be output.
"""
return datetime_obj.isoformat()
def datetime_to_datestamp(datetime_obj):
......@@ -94,39 +96,42 @@ def timestamp_to_datestamp(timestamp):
return timestamp.split('T')[0]
def get_event_time(item):
"""Returns a datetime object from an event item, if present."""
def get_event_time(event):
"""Returns a datetime object from an event object, if present."""
try:
timestamp = item['time']
removed_ms = timestamp.split('.')[0]
return datetime.datetime.strptime(removed_ms, '%Y-%m-%dT%H:%M:%S')
# Get entry, and strip off time zone information. Keep microseconds, if any.
raw_timestamp = event['time']
timestamp = raw_timestamp.split('+')[0]
if '.' not in timestamp:
timestamp = '{datetime}.000000'.format(datetime=timestamp)
return datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%f')
except Exception:
return None
def get_event_data(item):
def get_event_data(event):
"""
Returns event data from an event log item as a dict object.
Returns event data from an event log entry as a dict object.
Returns None if not found.
"""
event_value = item.get('event')
event_value = event.get('event')
if event_value is None:
log_item("encountered event with missing event value", item)
logger.error("encountered event with missing event value: %s", event)
return None
if isinstance(event_value, basestring):
# If the value is a string, try to parse as JSON into a dict.
try:
event_value = json_decode(event_value)
event_value = decode_json(event_value)
except Exception:
log_item("encountered event with unparsable event value", item)
logger.error("encountered event with unparsable event value: %s", event)
return None
if isinstance(event_value, dict):
# It's fine, just return.
return event_value
else:
log_item("encountered event data with unrecognized type", item)
logger.error("encountered event data with unrecognized type: %s", event)
return None
......@@ -31,24 +31,24 @@ class ParseEventLogTest(unittest.TestCase):
Verify that event log parsing works correctly.
"""
def test_parse_valid_eventlog_item(self):
def test_parse_valid_json_event(self):
line = '{"username": "successful"}'
result = eventlog.parse_eventlog_item(line)
result = eventlog.parse_json_event(line)
self.assertTrue(isinstance(result, dict))
def test_parse_eventlog_item_truncated(self):
def test_parse_json_event_truncated(self):
line = '{"username": "unsuccessful'
result = eventlog.parse_eventlog_item(line)
result = eventlog.parse_json_event(line)
self.assertIsNone(result)
def test_parse_eventlog_item_with_cruft(self):
def test_parse_json_event_with_cruft(self):
line = 'leading cruft here {"username": "successful"} '
result = eventlog.parse_eventlog_item(line)
result = eventlog.parse_json_event(line)
self.assertTrue(isinstance(result, dict))
def test_parse_eventlog_item_with_nonascii(self):
def test_parse_json_event_with_nonascii(self):
line = '{"username": "b\ufffdb"}'
result = eventlog.parse_eventlog_item(line)
result = eventlog.parse_json_event(line)
self.assertTrue(isinstance(result, dict))
self.assertEquals(result['username'], u'b\ufffdb')
......@@ -57,16 +57,38 @@ class TimestampTest(unittest.TestCase):
"""Verify timestamp-related functions."""
def test_datestamp_from_timestamp(self):
timestamp = "2013-12-17T15:38:32"
timestamp = "2013-12-17T15:38:32.805444"
self.assertEquals(eventlog.timestamp_to_datestamp(timestamp), "2013-12-17")
def test_missing_datetime(self):
item = {"something else": "not an event"}
self.assertIsNone(eventlog.get_event_time(item))
def test_good_datetime(self):
def test_good_datetime_with_microseconds_and_timezone(self):
item = {"time": "2013-12-17T15:38:32.805444+00:00"}
dt_value = eventlog.get_event_time(item)
self.assertIsNotNone(dt_value)
self.assertEquals(eventlog.datetime_to_timestamp(dt_value), "2013-12-17T15:38:32.805444")
self.assertEquals(eventlog.datetime_to_datestamp(dt_value), "2013-12-17")
def test_good_datetime_with_timezone(self):
item = {"time": "2013-12-17T15:38:32+00:00"}
dt_value = eventlog.get_event_time(item)
self.assertIsNotNone(dt_value)
self.assertEquals(eventlog.datetime_to_timestamp(dt_value), "2013-12-17T15:38:32")
self.assertEquals(eventlog.datetime_to_datestamp(dt_value), "2013-12-17")
def test_good_datetime_with_microseconds(self):
item = {"time": "2013-12-17T15:38:32.805444"}
dt_value = eventlog.get_event_time(item)
self.assertIsNotNone(dt_value)
self.assertEquals(eventlog.datetime_to_timestamp(dt_value), "2013-12-17T15:38:32.805444")
self.assertEquals(eventlog.datetime_to_datestamp(dt_value), "2013-12-17")
def test_good_datetime_with_no_microseconds_or_timezone(self):
item = {"time": "2013-12-17T15:38:32"}
dt_value = eventlog.get_event_time(item)
self.assertIsNotNone(dt_value)
self.assertEquals(eventlog.datetime_to_timestamp(dt_value), "2013-12-17T15:38:32")
self.assertEquals(eventlog.datetime_to_datestamp(dt_value), "2013-12-17")
......
#
# Define logging for use with analytics tasks.
#
# This defines handlers for logging coming from
# edx/analytics code, and from luigi code.
# Luigi messages go to stdout, while edx messages
# are routed to stderr.
[loggers]
keys=root,edx_analytics,luigi_interface
[handlers]
keys=stderrHandler,luigiHandler,localHandler
[formatters]
keys=standard,luigi_default
[logger_root]
level=DEBUG
handlers=localHandler
[logger_edx_analytics]
# Messages at WARNING level and above from edx/analytics get routed to stderr.
level=WARNING
handlers=stderrHandler
qualname=edx.analytics
propagate=0
[logger_luigi_interface]
# Messages at INFO level and above from luigi-interface get routed to stdout.
level=INFO
handlers=luigiHandler
qualname=luigi-interface
propagate=0
[handler_stderrHandler]
class=StreamHandler
formatter=standard
args=(sys.stderr,)
[handler_luigiHandler]
# Define as in luigi/interface.py.
class=StreamHandler
formatter=luigi_default
args=(sys.stdout,)
[handler_localHandler]
# Define as in edx-platform/common/lib/logsettings.py (for dev logging, not syslog).
class=logging.handlers.RotatingFileHandler
formatter=standard
args=('edx_analytics.log', 'w')
# 'maxBytes': 1024 * 1024 * 2,
# 'backupCount': 5,
[formatter_standard]
# Define as in edx-platform/common/lib/logsettings.py (for dev logging, not syslog).
format=%(asctime)s %(levelname)s %(process)d [%(name)s] %(filename)s:%(lineno)d - %(message)s
[formatter_luigi_default]
# Define as in luigi/interface.py.
format=%(levelname)s: %(message)s
......@@ -25,4 +25,4 @@ edx.analytics.tasks =
s3-sync = edx.analytics.tasks.s3:S3Sync
sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
enrollments-report = edx.analytics.reports.enrollments:EnrollmentsByWeek
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentTotalsPerDay
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment