Commit 68abaf6e by Brian Wilson

Add history to WeeklyAllUsersAndEnrollments report.

Change-Id: Iccd7c7ec7970375fd8e5151320575277674e949a
parent a7128612
...@@ -41,9 +41,22 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase): ...@@ -41,9 +41,22 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
# Mock the input and output targets # Mock the input and output targets
def reformat(string): def reformat(string):
# Reformat string to make it like a hadoop tsv """Reformat string to make it like a TSV."""
return textwrap.dedent(string).strip().replace(' ', '\t') return textwrap.dedent(string).strip().replace(' ', '\t')
if source is None:
source = """
course_1 2013-03-01 1
course_1 2013-03-30 2
course_2 2013-03-07 1
course_2 2013-03-08 1
course_2 2013-03-10 1
course_2 2013-03-13 1
course_3 2013-03-15 1
course_3 2013-03-18 1
course_3 2013-03-19 1
"""
input_targets = { input_targets = {
'source': FakeTarget(reformat(source)), 'source': FakeTarget(reformat(source)),
} }
...@@ -98,8 +111,8 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase): ...@@ -98,8 +111,8 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
""" """
res = self.run_task(source, '2013-01-21', 4) res = self.run_task(source, '2013-01-21', 4)
weeks = set(['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21']) weeks = set(['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21'])
self.assertEqual(weeks, set(str(w) for w in res.columns)) self.assertEqual(weeks, set(str(w) for w in res.columns)) # pylint: disable=maybe-no-member
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME] total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME] # pylint: disable=maybe-no-member
self.assertTrue(isnan(total_enrollment['2012-12-31'])) # no data self.assertTrue(isnan(total_enrollment['2012-12-31'])) # no data
self.assertEqual(total_enrollment['2013-01-07'], 10) self.assertEqual(total_enrollment['2013-01-07'], 10)
self.assertEqual(total_enrollment['2013-01-14'], 20) self.assertEqual(total_enrollment['2013-01-14'], 20)
...@@ -118,29 +131,60 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase): ...@@ -118,29 +131,60 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
course_2 2013-02-15 -2 course_2 2013-02-15 -2
""" """
res = self.run_task(source, '2013-02-18', 2) res = self.run_task(source, '2013-02-18', 2)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME] total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME] # pylint: disable=maybe-no-member
self.assertEqual(total_enrollment['2013-02-11'], 13) self.assertEqual(total_enrollment['2013-02-11'], 13)
self.assertEqual(total_enrollment['2013-02-18'], 24) self.assertEqual(total_enrollment['2013-02-18'], 24)
def test_offsets(self): def test_offsets(self):
source = """ offset = """
course_1 2013-03-01 1 course_2 2013-03-07 8
course_1 2013-03-30 2 course_3 2013-03-15 6
course_2 2013-03-07 1
course_2 2013-03-08 1
course_2 2013-03-10 1
course_2 2013-03-13 1
course_3 2013-03-15 1
course_3 2013-03-18 1
course_3 2013-03-19 1
""" """
res = self.run_task(None, '2013-03-28', 6, offset=offset)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME] # pylint: disable=maybe-no-member
self.assertTrue(isnan(total_enrollment['2013-02-21'])) # no data
self.assertTrue(isnan(total_enrollment['2013-02-28'])) # no data
self.assertEqual(total_enrollment['2013-03-07'], 10)
self.assertEqual(total_enrollment['2013-03-14'], 13)
self.assertEqual(total_enrollment['2013-03-21'], 22)
self.assertEqual(total_enrollment['2013-03-28'], 22)
def test_non_overlapping_history(self):
offset = """ offset = """
course_2 2013-03-07 8 course_2 2013-03-07 8
course_3 2013-03-15 6 course_3 2013-03-15 6
""" """
res = self.run_task(source, '2013-03-28', 4, offset=offset) # Choose history so that it ends right before
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME] # source data begins (on 3/1).
history = """
2013-02-21 4
2013-02-28 10
"""
res = self.run_task(None, '2013-03-28', 6, offset=offset, history=history)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME] # pylint: disable=maybe-no-member
self.assertEqual(total_enrollment['2013-02-21'], 4)
self.assertEqual(total_enrollment['2013-02-28'], 10)
self.assertEqual(total_enrollment['2013-03-07'], 10)
self.assertEqual(total_enrollment['2013-03-14'], 13)
self.assertEqual(total_enrollment['2013-03-21'], 22)
self.assertEqual(total_enrollment['2013-03-28'], 22)
def test_overlapping_history(self):
offset = """
course_2 2013-03-07 8
course_3 2013-03-15 6
"""
# Choose history so that it overlaps
# with when source data begins (on 3/1).
history = """
2013-02-18 4
2013-03-21 22
"""
res = self.run_task(None, '2013-03-28', 6, offset=offset, history=history)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME] # pylint: disable=maybe-no-member
print total_enrollment
self.assertEqual(total_enrollment['2013-02-21'], 5)
self.assertEqual(total_enrollment['2013-02-28'], 9)
self.assertEqual(total_enrollment['2013-03-07'], 10) self.assertEqual(total_enrollment['2013-03-07'], 10)
self.assertEqual(total_enrollment['2013-03-14'], 13) self.assertEqual(total_enrollment['2013-03-14'], 13)
self.assertEqual(total_enrollment['2013-03-21'], 22) self.assertEqual(total_enrollment['2013-03-21'], 22)
...@@ -156,7 +200,7 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase): ...@@ -156,7 +200,7 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
res = self.run_task(source.encode('utf-8'), '2013-04-02', 1) res = self.run_task(source.encode('utf-8'), '2013-04-02', 1)
self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-04-02'], 2) self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-04-02'], 2) # pylint: disable=maybe-no-member
def test_task_urls(self): def test_task_urls(self):
date = datetime.date(2013, 01, 20) date = datetime.date(2013, 01, 20)
...@@ -164,6 +208,7 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase): ...@@ -164,6 +208,7 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
task = WeeklyAllUsersAndEnrollments(source='s3://bucket/path/', task = WeeklyAllUsersAndEnrollments(source='s3://bucket/path/',
offsets='s3://bucket/file.txt', offsets='s3://bucket/file.txt',
destination='file://path/file.txt', destination='file://path/file.txt',
history='file://path/history/file.gz',
date=date) date=date)
requires = task.requires() requires = task.requires()
...@@ -176,5 +221,9 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase): ...@@ -176,5 +221,9 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
self.assertIsInstance(offsets, luigi.hdfs.HdfsTarget) self.assertIsInstance(offsets, luigi.hdfs.HdfsTarget)
self.assertEqual(offsets.format, luigi.hdfs.Plain) self.assertEqual(offsets.format, luigi.hdfs.Plain)
offsets = requires['history'].output()
self.assertIsInstance(offsets, luigi.File)
self.assertEqual(offsets.format, luigi.format.Gzip)
destination = task.output() destination = task.output()
self.assertIsInstance(destination, luigi.File) self.assertIsInstance(destination, luigi.File)
"""Total Enrollment related reports""" """Total Enrollment related reports"""
import csv import csv
from datetime import timedelta
import luigi import luigi
import luigi.hdfs import luigi.hdfs
...@@ -29,8 +30,8 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin): ...@@ -29,8 +30,8 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin):
names = ['date', 'count'] names = ['date', 'count']
data = read_tsv(input_file, names) data = read_tsv(input_file, names)
data.date = pandas.to_datetime(data.date) data.date = pandas.to_datetime(data.date) # pylint: disable=maybe-no-member
data = data.set_index('date') data = data.set_index('date') # pylint: disable=maybe-no-member
date_range = pandas.date_range(min(data.index), max(data.index)) date_range = pandas.date_range(min(data.index), max(data.index))
data = data.reindex(date_range) data = data.reindex(date_range)
...@@ -40,18 +41,27 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin): ...@@ -40,18 +41,27 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin):
def read_incremental_count_tsv(self, input_file): def read_incremental_count_tsv(self, input_file):
""" """
Read TSV containing dates and corresponding counts into a pandas Series. Read TSV containing dates and incremental counts.
Args:
input_file: TSV file with dates and incremental counts.
Interstitial incremental counts are filled as zeroes. Returns:
pandas Series containing daily counts. Counts for missing days are set to zero.
""" """
return self.read_date_count_tsv(input_file).fillna(0) return self.read_date_count_tsv(input_file).fillna(0)
def read_total_count_tsv(self, input_file): def read_total_count_tsv(self, input_file):
# TODO: this is a placeholder for reading in historical counts, """
# such as total enrollment numbers. It will Read TSV containing dates and total counts.
# need to interpolate the interstitial NANs.
data = self.read_date_count_tsv(input_file) Args:
return data input_file: TSV file with dates and total counts.
Returns:
pandas Series containing daily counts. Counts for missing days are interpolated.
"""
return self.read_date_count_tsv(input_file).interpolate(method='time')
def filter_duplicate_courses(self, daily_enrollment_totals): def filter_duplicate_courses(self, daily_enrollment_totals):
# TODO: implement this for real. (This is just a placeholder.) # TODO: implement this for real. (This is just a placeholder.)
...@@ -85,6 +95,7 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin): ...@@ -85,6 +95,7 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin):
writer.writerow(dict((k, k) for k in fieldnames)) # Write header writer.writerow(dict((k, k) for k in fieldnames)) # Write header
def format_counts(counts_dict): def format_counts(counts_dict):
"""Replace NaN with dashes."""
for k, v in counts_dict.iteritems(): for k, v in counts_dict.iteritems():
yield k, '-' if numpy.isnan(v) else int(v) yield k, '-' if numpy.isnan(v) else int(v)
...@@ -160,18 +171,18 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin): ...@@ -160,18 +171,18 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
# Sum per-course counts to create a single series # Sum per-course counts to create a single series
# of total enrollment counts per day. # of total enrollment counts per day.
daily_overall_enrollment = daily_enrollment_totals.sum(axis=1) daily_overall_enrollment = daily_enrollment_totals.sum(axis=1)
daily_overall_enrollment.name = TOTAL_ENROLLMENT_ROWNAME
# Prepend total enrollment history. # Prepend total enrollment history.
overall_enrollment_history = self.read_history() overall_enrollment_history = self.read_history()
if overall_enrollment_history is not None: if overall_enrollment_history is not None:
self.prepend_history(daily_overall_enrollment, overall_enrollment_history) daily_overall_enrollment = self.prepend_history(daily_overall_enrollment, overall_enrollment_history)
# TODO: get user counts, as another series. # TODO: get user counts, as another series.
# TODO: Combine the two series into a single DataFrame, indexed by date. # TODO: Combine the two series into a single DataFrame, indexed by date.
# For now, put the single series into a data frame, so that # For now, put the single series into a data frame, so that
# it can be sampled and output in a consistent way. # it can be sampled and output in a consistent way.
daily_overall_enrollment.name = TOTAL_ENROLLMENT_ROWNAME
total_counts_by_day = pandas.DataFrame(daily_overall_enrollment) total_counts_by_day = pandas.DataFrame(daily_overall_enrollment)
# Select values from DataFrame to display per-week. # Select values from DataFrame to display per-week.
...@@ -181,7 +192,7 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin): ...@@ -181,7 +192,7 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
self.weeks, self.weeks,
) )
with self.output().open('w') as output_file: with self.output().open('w') as output_file: # pylint: disable=maybe-no-member
self.save_output(total_counts_by_week, output_file) self.save_output(total_counts_by_week, output_file)
def read_source(self): def read_source(self):
...@@ -211,7 +222,7 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin): ...@@ -211,7 +222,7 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
""" """
data = None data = None
if self.input().get('offsets'): if self.input().get('offsets'):
with self.input()['offsets'].open('r') as offset_file: with self.input()['offsets'].open('r') as offset_file: # pylint: disable=maybe-no-member
data = self.read_course_date_count_tsv(offset_file) data = self.read_course_date_count_tsv(offset_file)
return data return data
...@@ -226,12 +237,11 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin): ...@@ -226,12 +237,11 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
Returns None if no history was specified. Returns None if no history was specified.
""" """
# TODO: implement this for real. (This is just a placeholder.)
data = None data = None
if self.input().get('history'): if self.input().get('history'):
with self.input()['history'].open('r') as history_file: with self.input()['history'].open('r') as history_file: # pylint: disable=maybe-no-member
# TODO: read input file and convert to a Series. data = self.read_total_count_tsv(history_file)
pass
return data return data
def prepend_history(self, count_by_day, history): def prepend_history(self, count_by_day, history):
...@@ -243,9 +253,10 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin): ...@@ -243,9 +253,10 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
history: pandas Series, also of counts indexed by date. history: pandas Series, also of counts indexed by date.
""" """
# TODO: implement this for real. (This is just a placeholder.) # Get history dates that are not in the regular count data so there is no overlap.
# Check that entry doesn't already exist in count_by_day last_day_of_history = count_by_day.index[0] - timedelta(1)
# before adding value from history. truncated_history = history[:last_day_of_history]
# For gaps in history, values should be extrapolated.
# Also may to need to reindex, since new dates are being added. result = count_by_day.append(truncated_history, verify_integrity=True)
pass
return result
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment