Commit d0307a38 by Brian Wilson, committed by Gerrit Code Review

Merge "Implement skeleton version of Total Enrollment and Users report."

parents 8b555648 2365e97e
@@ -9,10 +9,117 @@ import luigi.hdfs
import numpy
import pandas

from edx.analytics.tasks.util.tsv import read_tsv
from edx.analytics.tasks.url import ExternalURL, get_target_from_url


class CourseEnrollmentCountMixin(object):
"""Provides methods useful for generating reports using course enrollment counts."""
def read_course_date_count_tsv(self, input_file):
"""Read TSV file with hard-coded column names into a pandas DataFrame."""
names = ['course_id', 'date', 'count']
        # No encoding is assumed; course_id is read as a plain string.
data = read_tsv(input_file, names)
data.date = pandas.to_datetime(data.date)
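        # For example, a line "course_1\t2013-01-01\t10" becomes one row
        # whose date is parsed to a datetime64 and whose count is an int.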
return data
def initialize_daily_count(self, course_date_count_data):
"""
Reorganize a course-date-count data table to index by date.
        Args:
            course_date_count_data: Pandas dataframe with one row per
                course_id/date pair, and columns for the date and count.
Returns:
Pandas dataframe with one column per course_id, and
indexed rows for the date. Counts are set to zero for
dates that are missing.
"""
data = course_date_count_data.pivot(
index='date',
columns='course_id',
values='count',
)
# Complete the range of data to include all days between
# the dates of the first and last events.
date_range = pandas.date_range(min(data.index), max(data.index))
data = data.reindex(date_range)
data = data.fillna(0)
return data
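
    # Illustration (not from the source): given input rows
    # ('c1', 2013-01-01, 5), ('c1', 2013-01-03, 2), ('c2', 2013-01-02, 7),
    # the pivot plus reindex above yields:
    #
    #                  c1   c2
    #   2013-01-01      5    0
    #   2013-01-02      0    7
    #   2013-01-03      2    0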
def add_offsets_to_daily_count(self, count_by_day, offsets):
"""
Add offsets to a dataframe in-place.
Args:
count_by_day: Pandas dataframe with one column per course_id, and
indexed rows for the date.
offsets: Pandas dataframe with one row per course_id and
columns for the date and count of the offset.
"""
for _, (course_id, date, count) in offsets.iterrows():
if course_id in count_by_day.columns:
                # The offsets are computed as of the beginning of that day.
                # We add them to the counts by the end of that day to
                # get the correct count for the day.
count_by_day.loc[date, course_id] += count
# Flag values before the offset day with NaN,
# since they are not "available".
not_available = count_by_day.index < date
count_by_day.loc[not_available, course_id] = numpy.NaN
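
    # Worked example (illustrative only): if course_1 changes by 1 on each of
    # 2013-03-01 through 2013-03-03, and offsets contains
    # ('course_1', '2013-03-02', 10), the course_1 column becomes
    # NaN, 11, 1 for those three days; a later cumsum() then yields
    # NaN, 11, 12, so totals are only "available" from the offset date onward.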
def calculate_total_enrollment(self, count_by_day, offsets=None):
"""
Accumulate enrollment changes per day to find total enrollment per day.
Args:
count_by_day: Pandas dataframe with one column per course_id, and
indexed rows for the date. Counts are net changes in enrollment
during the day for each course.
offsets: Pandas dataframe with one row per course_id and
columns for the date and count of the offset. The offset
for a course is used to provide total enrollment counts
at a point in time right before the timeframe covered by count_by_day.
"""
if offsets is not None:
self.add_offsets_to_daily_count(count_by_day, offsets)
# Calculate the cumulative sum per day of the input.
# Entries with NaN stay NaN.
# At this stage only the data prior to the offset should contain NaN.
cumulative_sum = count_by_day.cumsum()
return cumulative_sum
def select_weekly_values(self, daily_values, start, weeks):
"""
Sample daily values on a weekly basis.
Args:
daily_values: Pandas dataframe with one column per course_id, and
indexed rows for the date.
start: last day to request.
weeks: number of weeks to sample (including the last day)
"""
# List the dates of the last day of each week requested.
days = [start - timedelta(i * 7) for i in reversed(xrange(0, weeks))]
# Sample the cumulative data on the requested days.
# Result is NaN if there is no data available for that date.
results = daily_values.loc[days]
return results
class EnrollmentsByWeek(luigi.Task, CourseEnrollmentCountMixin):
"""Calculates cumulative enrollments per week per course. """Calculates cumulative enrollments per week per course.
Parameters: Parameters:
...@@ -51,19 +158,24 @@ class EnrollmentsByWeek(luigi.Task): ...@@ -51,19 +158,24 @@ class EnrollmentsByWeek(luigi.Task):
return get_target_from_url(self.destination) return get_target_from_url(self.destination)
def run(self): def run(self):
# Load the data into a pandas dataframe # Load the data into pandas dataframes
count_by_day = self.read_source() daily_enrollment_changes = self.read_source()
offsets = self.read_offsets() offsets = self.read_offsets()
if offsets is not None:
self.include_offsets(count_by_day, offsets)
cumulative_by_week = self.accumulate(count_by_day) daily_enrollment_totals = self.calculate_total_enrollment(daily_enrollment_changes, offsets)
# Sample the cumulative data on the requested days.
# Result is NaN if there is no data available for that date.
weekly_enrollment_totals = self.select_weekly_values(
daily_enrollment_totals,
self.date,
self.weeks
)
statuses = self.read_statuses() statuses = self.read_statuses()
with self.output().open('w') as output_file: with self.output().open('w') as output_file:
self.save_output(cumulative_by_week, statuses, output_file) self.save_output(weekly_enrollment_totals, statuses, output_file)
    def read_source(self):
        """
@@ -75,19 +187,8 @@ class EnrollmentsByWeek(luigi.Task):
        """
        with self.input()['source'].open('r') as input_file:
            course_date_count_data = self.read_course_date_count_tsv(input_file)
            data = self.initialize_daily_count(course_date_count_data)
        return data
@@ -102,24 +203,12 @@ class EnrollmentsByWeek(luigi.Task):
        Returns None if no offset was specified.
        """
        data = None
        if self.input().get('offsets'):
            with self.input()['offsets'].open('r') as offset_file:
                data = self.read_course_date_count_tsv(offset_file)
        return data
    def read_statuses(self):
@@ -139,67 +228,11 @@ class EnrollmentsByWeek(luigi.Task):
        if self.input().get('statuses'):
            with self.input()['statuses'].open('r') as status_file:
                data = read_tsv(status_file, names)
        data = data.set_index('course_id')
        return data
    def save_output(self, results, statuses, output_file):
        results = results.transpose()
...
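
For reviewers who want to exercise the new mixin directly, here is a minimal
sketch (not part of this commit) of how its methods compose. It assumes the
package is importable and substitutes a hand-built frame for the usual TSV
input:

import datetime

import pandas

from edx.analytics.tasks.reports.enrollments import CourseEnrollmentCountMixin

mixin = CourseEnrollmentCountMixin()
raw = pandas.DataFrame({
    'course_id': ['course_1', 'course_1', 'course_2'],
    'date': pandas.to_datetime(['2013-01-01', '2013-01-08', '2013-01-03']),
    'count': [10, 5, 7],
})
daily_changes = mixin.initialize_daily_count(raw)
daily_totals = mixin.calculate_total_enrollment(daily_changes)
weekly = mixin.select_weekly_values(daily_totals, datetime.date(2013, 1, 8), 2)
print weekly  # rows 2013-01-01 and 2013-01-08; columns course_1, course_2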
"""Tests for Total Users and Enrollment report."""
from contextlib import contextmanager
import datetime
import textwrap
from StringIO import StringIO
from unittest import TestCase
import luigi
import luigi.hdfs
from mock import MagicMock
from numpy import isnan
import pandas
from edx.analytics.tasks.reports.total_enrollments import TotalUsersAndEnrollmentsByWeek, TOTAL_ENROLLMENT_ROWNAME
class FakeTarget(object):
"""
    Fake luigi-like target that saves data in memory, using a
StringIO buffer.
"""
def __init__(self, value=''):
self.buffer = StringIO(value)
# Rewind the buffer head so the value can be read
self.buffer.seek(0)
@contextmanager
def open(self, *args, **kwargs):
yield self.buffer
# Rewind the head for easy reading
self.buffer.seek(0)
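
# For orientation (not part of the original tests): FakeTarget mimics just
# enough of luigi's target API for these tests, e.g.
#
#   target = FakeTarget('hello')
#   with target.open('r') as handle:
#       assert handle.read() == 'hello'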
class TestTotalUsersAndEnrollmentsByWeek(TestCase):
"""Tests for TotalUsersAndEnrollmentsByWeek class."""
def run_task(self, source, date, weeks, offset=None, history=None):
"""
Run task with fake targets.
Returns:
the task output as a pandas dataframe.
"""
parsed_date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
# Make offsets None if it was not specified.
task = TotalUsersAndEnrollmentsByWeek(
source='fake_source',
offsets='fake_offsets' if offset else None,
history='fake_history' if history else None,
destination='fake_destination',
date=parsed_date,
weeks=weeks
)
# Mock the input and output targets
def reformat(string):
            # Reformat the string to look like a Hadoop TSV.
return textwrap.dedent(string).strip().replace(' ', '\t')
input_targets = {
'source': FakeTarget(reformat(source)),
}
# Mock offsets only if specified.
if offset:
input_targets.update({'offsets': FakeTarget(reformat(offset))})
# Mock history only if specified.
if history:
input_targets.update({'history': FakeTarget(reformat(history))})
task.input = MagicMock(return_value=input_targets)
output_target = FakeTarget()
task.output = MagicMock(return_value=output_target)
# Run the task and parse the output into a pandas dataframe
task.run()
data = output_target.buffer.read()
result = pandas.read_csv(StringIO(data),
na_values=['-'],
index_col='name')
return result
def test_parse_source(self):
source = """
course_1 2013-01-01 10
course_1 2013-01-02 10
course_1 2013-01-03 10
course_1 2013-01-09 10
course_1 2013-01-17 10
course_2 2013-01-01 10
course_3 2013-01-01 10
"""
res = self.run_task(source, '2013-01-17', 3)
# self.assertEqual(set(['name']), set(res.index))
self.assertEqual(set(['2013-01-03', '2013-01-10', '2013-01-17']),
set(res.columns))
self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-01-03'], 50)
self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-01-10'], 60)
self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-01-17'], 70)
def test_week_grouping(self):
source = """
course_1 2013-01-06 10
course_1 2013-01-14 10
"""
res = self.run_task(source, '2013-01-21', 4)
weeks = set(['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21'])
self.assertEqual(weeks, set(str(w) for w in res.columns))
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME]
self.assertTrue(isnan(total_enrollment['2012-12-31'])) # no data
self.assertEqual(total_enrollment['2013-01-07'], 10)
self.assertEqual(total_enrollment['2013-01-14'], 20)
self.assertTrue(isnan(total_enrollment['2013-01-21'])) # no data
def test_cumulative(self):
source = """
course_1 2013-02-01 4
course_1 2013-02-04 4
course_1 2013-02-08 5
course_1 2013-02-12 -4
course_1 2013-02-16 6
course_1 2013-02-18 6
course_2 2013-02-12 2
course_2 2013-02-14 3
course_2 2013-02-15 -2
"""
res = self.run_task(source, '2013-02-18', 2)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME]
self.assertEqual(total_enrollment['2013-02-11'], 13)
self.assertEqual(total_enrollment['2013-02-18'], 24)
def test_offsets(self):
source = """
course_1 2013-03-01 1
course_1 2013-03-30 2
course_2 2013-03-07 1
course_2 2013-03-08 1
course_2 2013-03-10 1
course_2 2013-03-13 1
course_3 2013-03-15 1
course_3 2013-03-18 1
course_3 2013-03-19 1
"""
offset = """
course_2 2013-03-07 8
course_3 2013-03-15 6
"""
res = self.run_task(source, '2013-03-28', 4, offset=offset)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME]
self.assertEqual(total_enrollment['2013-03-07'], 10)
self.assertEqual(total_enrollment['2013-03-14'], 13)
self.assertEqual(total_enrollment['2013-03-21'], 22)
self.assertEqual(total_enrollment['2013-03-28'], 22)
def test_unicode(self):
course_id = u'course_\u2603'
source = u"""
{course_id} 2013-04-01 1
{course_id} 2013-04-02 1
""".format(course_id=course_id)
res = self.run_task(source.encode('utf-8'), '2013-04-02', 1)
self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-04-02'], 2)
def test_task_urls(self):
date = datetime.date(2013, 01, 20)
task = TotalUsersAndEnrollmentsByWeek(source='s3://bucket/path/',
offsets='s3://bucket/file.txt',
destination='file://path/file.txt',
date=date)
requires = task.requires()
source = requires['source'].output()
self.assertIsInstance(source, luigi.hdfs.HdfsTarget)
self.assertEqual(source.format, luigi.hdfs.PlainDir)
offsets = requires['offsets'].output()
self.assertIsInstance(offsets, luigi.hdfs.HdfsTarget)
self.assertEqual(offsets.format, luigi.hdfs.Plain)
destination = task.output()
self.assertIsInstance(destination, luigi.File)
"""Total Enrollment related reports"""
import csv
import luigi
import luigi.hdfs
import numpy
import pandas
from edx.analytics.tasks.url import ExternalURL, get_target_from_url
from edx.analytics.tasks.reports.enrollments import CourseEnrollmentCountMixin
ROWNAME_HEADER = 'name'
TOTAL_ENROLLMENT_ROWNAME = 'Total Enrollment'
class TotalUsersAndEnrollmentsByWeek(luigi.Task, CourseEnrollmentCountMixin):
"""
Calculates total users and enrollments across all (known) courses per week.
Parameters:
source: Location of daily enrollments per date. The format is a
TSV file, with fields course_id, date and count.
destination: Location of the resulting report. The output format is an
excel-compatible CSV file.
history: Location of historical values for total course enrollment.
The format is a TSV file, with fields "date" and "enrollments".
offsets: Location of seed values for each course. The format is a
Hadoop TSV file, with fields "course_id", "date" and "offset".
date: End date of the last week requested.
weeks: Number of weeks from the end date to request.
Output:
Excel-compatible CSV file with a header row and two non-header
rows. The first column is a title for the row, and subsequent
columns are the total counts for each week requested. The
first non-header row contains the total users at the end of
each week. The second row contains the total course
enrollments at the end of each week.
"""
    # TODO: add the first (total users) row later, when we have access to total
    # user counts (e.g. queried or reconstructed from a production database).
source = luigi.Parameter()
destination = luigi.Parameter()
offsets = luigi.Parameter(default=None)
history = luigi.Parameter(default=None)
date = luigi.DateParameter()
weeks = luigi.IntParameter(default=52)
def requires(self):
results = {'source': ExternalURL(self.source)}
if self.offsets:
results.update({'offsets': ExternalURL(self.offsets)})
if self.history:
results.update({'history': ExternalURL(self.history)})
return results
def output(self):
return get_target_from_url(self.destination)
def run(self):
# Load the explicit enrollment data into a pandas dataframe.
daily_enrollment_changes = self.read_source()
# Add enrollment offsets to allow totals to be calculated
# for explicit enrollments.
offsets = self.read_offsets()
daily_enrollment_totals = self.calculate_total_enrollment(daily_enrollment_changes, offsets)
        # Remove (or merge) data for courses that would
        # otherwise result in duplicate counts.
self.filter_duplicate_courses(daily_enrollment_totals)
# Sum per-course counts to create a single series
# of total enrollment counts per day.
daily_overall_enrollment = daily_enrollment_totals.sum(axis=1)
daily_overall_enrollment.name = TOTAL_ENROLLMENT_ROWNAME
# Prepend total enrollment history.
overall_enrollment_history = self.read_history()
if overall_enrollment_history is not None:
self.prepend_history(daily_overall_enrollment, overall_enrollment_history)
# TODO: get user counts, as another series.
# TODO: Combine the two series into a single DataFrame, indexed by date.
# For now, put the single series into a data frame, so that
# it can be sampled and output in a consistent way.
total_counts_by_day = pandas.DataFrame(daily_overall_enrollment)
# Select values from DataFrame to display per-week.
total_counts_by_week = self.select_weekly_values(
total_counts_by_day,
self.date,
self.weeks,
)
with self.output().open('w') as output_file:
self.save_output(total_counts_by_week, output_file)
def read_source(self):
"""
Read source into a pandas DataFrame.
Returns:
Pandas dataframe with one column per course_id. Indexed
for the time interval available in the source data.
"""
with self.input()['source'].open('r') as input_file:
course_date_count_data = self.read_course_date_count_tsv(input_file)
data = self.initialize_daily_count(course_date_count_data)
return data
def read_offsets(self):
"""
Read offsets into a pandas DataFrame.
Returns:
Pandas dataframe with one row per course_id and
columns for the date and count of the offset.
Returns None if no offset was specified.
"""
data = None
if self.input().get('offsets'):
with self.input()['offsets'].open('r') as offset_file:
data = self.read_course_date_count_tsv(offset_file)
return data
def read_history(self):
"""
Read course total enrollment history into a pandas DataFrame.
Returns:
Pandas Series, indexed by date, containing total
enrollment counts by date.
Returns None if no history was specified.
"""
# TODO: implement this for real. (This is just a placeholder.)
data = None
if self.input().get('history'):
with self.input()['history'].open('r') as history_file:
# TODO: read input file and convert to a Series.
pass
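                # A possible shape for this (a sketch, not part of the commit):
                # parse the "date"/"enrollments" TSV described in the class
                # docstring into a date-indexed Series. It assumes read_tsv is
                # imported from edx.analytics.tasks.util.tsv:
                #
                #     history_data = read_tsv(history_file, ['date', 'enrollments'])
                #     history_data.date = pandas.to_datetime(history_data.date)
                #     data = pandas.Series(
                #         history_data['enrollments'].values,
                #         index=history_data['date'],
                #     )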
return data
def prepend_history(self, count_by_day, history):
"""
Add history to a series in-place.
Args:
count_by_day: pandas Series
history: pandas Series, also of counts indexed by date.
"""
# TODO: implement this for real. (This is just a placeholder.)
# Check that entry doesn't already exist in count_by_day
# before adding value from history.
# For gaps in history, values should be extrapolated.
        # Also may need to reindex, since new dates are being added.
pass
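
        # One possible shape (illustrative, not the final design):
        #
        #     combined = count_by_day.combine_first(history)
        #
        # with history first reindexed over its full date range and gaps
        # interpolated. Note combine_first returns a new Series, so the
        # caller would need to use the return value rather than relying
        # on in-place mutation.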
def filter_duplicate_courses(self, daily_enrollment_totals):
# TODO: implement this for real. (This is just a placeholder.)
# At this point we should remove data for courses that are
# represented by other courses, because the students have been
# moved to the new course. Perhaps this should actually
# perform a merge of the two courses, since we would want the
# history of one before the move date, and the history of the
# second after that date.
# Note that this is not the same filtering that would be applied
# to the EnrollmentsByWeek report.
pass
def save_output(self, results, output_file):
"""
Write output to CSV file.
Args:
results: a pandas DataFrame object containing series data
per row to be output.
"""
# transpose the dataframe so that weeks are columns, and output:
results = results.transpose()
# List of fieldnames for the report
fieldnames = [ROWNAME_HEADER] + list(results.columns)
writer = csv.DictWriter(output_file, fieldnames)
writer.writerow(dict((k, k) for k in fieldnames)) # Write header
def format_counts(counts_dict):
for k, v in counts_dict.iteritems():
yield k, '-' if numpy.isnan(v) else int(v)
for series_name, series in results.iterrows():
values = {
ROWNAME_HEADER: series_name,
}
by_week_values = format_counts(series.to_dict())
values.update(by_week_values)
writer.writerow(values)
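
# A minimal sketch (not part of this commit) of running the task locally,
# assuming reachable input URLs and luigi's local scheduler:
#
#   import datetime
#   import luigi
#   from edx.analytics.tasks.reports.total_enrollments import (
#       TotalUsersAndEnrollmentsByWeek,
#   )
#
#   task = TotalUsersAndEnrollmentsByWeek(
#       source='s3://bucket/enrollments/',
#       destination='file:///tmp/total_enrollments.csv',
#       date=datetime.date(2013, 4, 1),
#       weeks=4,
#   )
#   luigi.build([task], local_scheduler=True)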
"""Helpers for reading TSV files."""
import csv
import pandas
def read_tsv(input_file, names):
"""
Reads a tab-separated file into a DataFrame.
Args:
input_file (str): Path to the input file.
names (list): The names of the columns in the input file.
Returns:
A pandas DataFrame read from the file contents of the file.
"""
return pandas.read_csv(
input_file,
names=names,
quoting=csv.QUOTE_NONE,
encoding=None,
delimiter='\t'
)
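
# Example usage (illustrative): parse a single tab-separated line.
#
#   from StringIO import StringIO
#   frame = read_tsv(StringIO('course_1\t2013-01-01\t10\n'),
#                    ['course_id', 'date', 'count'])
#   print frame  # one row: course_1, 2013-01-01, 10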