Commit a5040553 by Carlos Andrés Rocha

Merge pull request #7 from rocha/enrollments-report

Enrollments CSV Report
parents 5a2313f6 9cce0818
"""Enrollment related reports"""
import csv
from datetime import timedelta
import luigi
import luigi.hdfs
import numpy
import pandas


class EnrollmentsByWeek(luigi.Task):
    """Calculates cumulative enrollments per week per course.

    Parameters:
        source: Location of daily enrollments per date. The format is a
            Hadoop TSV file, with fields course_id, date and count.
        destination: Location of the resulting report. The output format
            is an Excel-compatible CSV file with course_id and one column
            per requested week.
        offsets: Location of seed values for each course. The format is a
            Hadoop TSV file, with fields course_id, date and offset.
        date: End date of the last week requested.
        weeks: Number of weeks from the end date to request.

    Output:
        Excel-compatible CSV file with one row per course. The columns are
        the cumulative enrollment counts for each week requested.

    """
    source = luigi.Parameter()
    destination = luigi.Parameter()
    offsets = luigi.Parameter(default=None)
    date = luigi.DateParameter()
    weeks = luigi.IntParameter(default=10)
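    # A minimal invocation sketch. The launcher command and paths here are
    # hypothetical; the task is registered as the `enrollments-report`
    # entry point in the hunk at the end of this diff:
    #
    #   launch-task enrollments-report --local-scheduler \
    #       --source hdfs://data/enrollments.tsv \
    #       --destination s3://bucket/enrollments_report.csv \
    #       --date 2013-01-21 --weeks 4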

    def requires(self):
        results = {'source': ExternalURL(self.source)}
        if self.offsets:
            results.update({'offsets': ExternalURL(self.offsets)})
        return results

    def output(self):
        return get_target_from_url(self.destination)

    def run(self):
        # Load the data into a pandas dataframe
        count_by_day = self.read_source()

        offsets = self.read_offsets()
        if offsets is not None:
            self.include_offsets(count_by_day, offsets)

        cumulative_by_week = self.accumulate(count_by_day)

        with self.output().open('w') as output_file:
            self.save_output(cumulative_by_week, output_file)

    def read_source(self):
        """
        Read source into a pandas DataFrame.

        Returns:
            Pandas dataframe with one column per course_id. Indexed
            for the time interval available in the source data.

        """
        with self.input()['source'].open('r') as input_file:
            data = self.read_tsv(input_file)

        # Reorganize the data. One column per course_id, with
        # shared date index.
        data = data.pivot(index='date',
                          columns='course_id',
                          values='count')
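        # For example, input rows of (course_id, date, count) such as:
        #     course_1  2013-01-01  10
        #     course_2  2013-01-01  10
        # pivot into one column per course on a shared date index:
        #                 course_1  course_2
        #     2013-01-01        10        10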

        # Complete the range of data to include all days between
        # the dates of the first and last events.
        date_range = pandas.date_range(min(data.index), max(data.index))
        data = data.reindex(date_range)
        data = data.fillna(0)

        return data

    def read_offsets(self):
        """
        Read offsets into a pandas DataFrame.

        Returns:
            Pandas dataframe with one row per course_id and
            columns for the date and count of the offset.

            Returns None if no offset was specified.

        """
        data = None

        if self.input().get('offsets'):
            with self.input()['offsets'].open('r') as offset_file:
                data = self.read_tsv(offset_file)

        return data

    def read_tsv(self, input_file):
        """Read hadoop formatted tsv file into a pandas DataFrame."""
        names = ['course_id', 'date', 'count']

        # Not assuming any encoding, course_id will be read as plain string
        data = pandas.read_csv(input_file,
                               names=names,
                               quoting=csv.QUOTE_NONE,
                               encoding=None,
                               delimiter='\t')
        data.date = pandas.to_datetime(data.date)

        return data

    def include_offsets(self, count_by_day, offsets):
        """
        Add offsets to a dataframe in place.

        Args:
            count_by_day: Dataframe with format from `read_source`
            offsets: Dataframe with format from `read_offsets`

        """
        for n, (course_id, date, count) in offsets.iterrows():
            if course_id in count_by_day.columns:
                # The offsets are computed to the beginning of that day. We
                # add them to the counts by the end of that day to
                # get the correct count for the day.
                count_by_day.loc[date, course_id] += count
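                # For example, with a daily count of 1 on 2013-03-07 and
                # an offset of 8 on the same date, the cumulative count
                # for that day becomes 9 (see test_offsets below).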

                # Flag values before the offset day with NaN,
                # since they are not "available".
                not_available = count_by_day.index < date
                count_by_day.loc[not_available, course_id] = numpy.NaN

    def accumulate(self, count_by_day):
        # Calculate the cumulative sum per day of the input.
        # Entries with NaN stay NaN.
        # At this stage only the data prior to the offset should contain NaN.
        cumulative_sum = count_by_day.cumsum()

        # List the dates of the last day of each week requested.
        start, weeks = self.date, self.weeks
        days = [start - timedelta(i * 7) for i in reversed(xrange(0, weeks))]
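        # For example, date=2013-01-21 and weeks=4 sample
        # 2012-12-31, 2013-01-07, 2013-01-14 and 2013-01-21
        # (see test_week_grouping below).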

        # Sample the cumulative data on the requested days.
        # Result is NaN if there is no data available for that date.
        results = cumulative_sum.loc[days]

        return results

    def save_output(self, results, output_file):
        # Make a row per course_id
        results = results.transpose()

        # List of fieldnames for the report
        fieldnames = ['course_id'] + list(results.columns)

        writer = csv.DictWriter(output_file, fieldnames)
        writer.writerow(dict((k, k) for k in fieldnames))  # Write header
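        # The resulting CSV has one row per course and one column per
        # sampled week, with '-' standing in for NaN, for example:
        #   course_id,2013-01-07,2013-01-14
        #   course_1,10,20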

        def format_counts(counts_dict):
            for k, v in counts_dict.iteritems():
                yield k, '-' if numpy.isnan(v) else int(v)

        for index, series in results.iterrows():
            values = {'course_id': index}
            by_week_values = format_counts(series.to_dict())
            values.update(by_week_values)
            writer.writerow(values)


class ExternalURL(luigi.ExternalTask):
    """Simple Task that returns a target based on its URL"""
    url = luigi.Parameter()

    def output(self):
        return get_target_from_url(self.url)


def get_target_from_url(url):
    """Returns a luigi target based on the url scheme"""
    # TODO: Make external utility to resolve target by URL,
    # including s3, s3n, etc.
    if url.startswith('hdfs://') or url.startswith('s3://'):
        if url.endswith('/'):
            return luigi.hdfs.HdfsTarget(url, format=luigi.hdfs.PlainDir)
        else:
            return luigi.hdfs.HdfsTarget(url)
    else:
        return luigi.LocalTarget(url)
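
# How the resolution above behaves (mirrors test_task_urls below):
#   get_target_from_url('s3://bucket/path/')     -> HdfsTarget, PlainDir format
#   get_target_from_url('s3://bucket/file.txt')  -> HdfsTarget
#   get_target_from_url('file://path/file.txt')  -> LocalTarget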


# Tests for EnrollmentsByWeek (separate test module in this commit)
from contextlib import contextmanager
import datetime
import textwrap
from StringIO import StringIO
from unittest import TestCase

import luigi
import luigi.hdfs
from mock import MagicMock
from numpy import isnan
import pandas

from edx.analytics.reports.enrollments import EnrollmentsByWeek
from edx.analytics.reports.enrollments import ExternalURL


class FakeTarget(object):
    """
    Fake luigi like target that saves data in memory, using a
    StringIO buffer.
    """
    def __init__(self, value=''):
        self.buffer = StringIO(value)
        # Rewind the buffer head so the value can be read
        self.buffer.seek(0)

    @contextmanager
    def open(self, *args, **kwargs):
        yield self.buffer

        # Rewind the head for easy reading
        self.buffer.seek(0)


class TestEnrollmentsByWeek(TestCase):
    def run_task(self, source, date, weeks, offset=None):
        """
        Run task with fake targets.

        Returns:
            the task output as a pandas dataframe.

        """
        parsed_date = datetime.datetime.strptime(date, '%Y-%m-%d').date()

        # Make offsets None if it was not specified.
        task = EnrollmentsByWeek(source='fake_source',
                                 offsets='fake_offsets' if offset else None,
                                 destination='fake_destination',
                                 date=parsed_date,
                                 weeks=weeks)

        # Mock the input and output targets
        def reformat(string):
            # Reformat string to make it like a hadoop tsv
            return textwrap.dedent(string).strip().replace(' ', '\t')

        input_targets = {
            'source': FakeTarget(reformat(source)),
        }

        # Mock offsets only if specified.
        if offset:
            input_targets.update({'offsets': FakeTarget(reformat(offset))})

        task.input = MagicMock(return_value=input_targets)

        output_target = FakeTarget()
        task.output = MagicMock(return_value=output_target)

        # Run the task and parse the output into a pandas dataframe
        task.run()

        data = output_target.buffer.read()
        result = pandas.read_csv(StringIO(data),
                                 na_values=['-'],
                                 index_col='course_id')

        return result

    def test_parse_source(self):
        source = """
        course_1 2013-01-01 10
        course_1 2013-01-02 10
        course_1 2013-01-03 10
        course_1 2013-01-09 10
        course_1 2013-01-17 10
        course_2 2013-01-01 10
        course_3 2013-01-01 10
        """
        res = self.run_task(source, '2013-01-17', 3)

        self.assertEqual(set(['course_1', 'course_2', 'course_3']),
                         set(res.index))

        self.assertEqual(res.loc['course_1']['2013-01-03'], 30)
        self.assertEqual(res.loc['course_1']['2013-01-10'], 40)
        self.assertEqual(res.loc['course_1']['2013-01-17'], 50)
        self.assertEqual(res.loc['course_2']['2013-01-03'], 10)
        self.assertEqual(res.loc['course_3']['2013-01-03'], 10)

    def test_week_grouping(self):
        source = """
        course_1 2013-01-06 10
        course_1 2013-01-14 10
        """
        res = self.run_task(source, '2013-01-21', 4)

        weeks = set(['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21'])
        self.assertEqual(weeks, set(str(w) for w in res.columns))

        course_1 = res.loc['course_1']
        self.assertTrue(isnan(course_1['2012-12-31']))  # no data
        self.assertEqual(course_1['2013-01-07'], 10)
        self.assertEqual(course_1['2013-01-14'], 20)
        self.assertTrue(isnan(course_1['2013-01-21']))  # no data

    def test_cumulative(self):
        source = """
        course_1 2013-02-01 4
        course_1 2013-02-04 4
        course_1 2013-02-08 5
        course_1 2013-02-12 -4
        course_1 2013-02-16 6
        course_1 2013-02-18 6
        course_2 2013-02-12 2
        course_2 2013-02-14 3
        course_2 2013-02-15 -2
        """
        res = self.run_task(source, '2013-02-18', 2)

        course_1 = res.loc['course_1']
        self.assertEqual(course_1['2013-02-11'], 13)
        self.assertEqual(course_1['2013-02-18'], 21)

        course_2 = res.loc['course_2']
        self.assertEqual(course_2['2013-02-11'], 0)
        self.assertEqual(course_2['2013-02-18'], 3)

    def test_offsets(self):
        source = """
        course_1 2013-03-01 1
        course_1 2013-03-30 2
        course_2 2013-03-07 1
        course_2 2013-03-08 1
        course_2 2013-03-10 1
        course_2 2013-03-13 1
        course_3 2013-03-15 1
        course_3 2013-03-18 1
        course_3 2013-03-19 1
        """
        offset = """
        course_2 2013-03-07 8
        course_3 2013-03-15 6
        """
        res = self.run_task(source, '2013-03-28', 4, offset=offset)

        course_2 = res.loc['course_2']
        self.assertEqual(course_2['2013-03-07'], 9)
        self.assertEqual(course_2['2013-03-14'], 12)

        course_3 = res.loc['course_3']
        self.assertTrue(isnan(course_3['2013-03-07']))  # no data
        self.assertTrue(isnan(course_3['2013-03-14']))  # no data
        self.assertEqual(course_3['2013-03-21'], 9)

    def test_unicode(self):
        course_id = u'course_\u2603'

        source = u"""
        {course_id} 2013-04-01 1
        {course_id} 2013-04-02 1
        """.format(course_id=course_id)

        res = self.run_task(source.encode('utf-8'), '2013-04-02', 1)

        self.assertEqual(res.loc[course_id.encode('utf-8')]['2013-04-02'], 2)

    def test_task_urls(self):
        date = datetime.date(2013, 1, 20)

        task = EnrollmentsByWeek(source='s3://bucket/path/',
                                 offsets='s3://bucket/file.txt',
                                 destination='file://path/file.txt',
                                 date=date)

        requires = task.requires()

        source = requires['source'].output()
        self.assertIsInstance(source, luigi.hdfs.HdfsTarget)
        self.assertEqual(source.format, luigi.hdfs.PlainDir)

        offsets = requires['offsets'].output()
        self.assertIsInstance(offsets, luigi.hdfs.HdfsTarget)
        self.assertEqual(offsets.format, luigi.hdfs.Plain)

        destination = task.output()
        self.assertIsInstance(destination, luigi.File)


# Pinned dependencies (likely requirements.txt)
argparse==1.2.1
boto==2.22.1
numpy==1.8.0
pandas==0.13.0
pbr==0.5.23
stevedore==0.13
tornado==3.1.1

# Entry-point registration hunk (likely setup.cfg), adding the new report task:
@@ -23,3 +23,4 @@ edx.analytics.tasks =
     s3-copy = edx.analytics.tasks.s3:S3Copy
     s3-sync = edx.analytics.tasks.s3:S3Sync
     sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
+    enrollments-report = edx.analytics.reports.enrollments:EnrollmentsByWeek
\ No newline at end of file