Commit c99f5f6d by Gabe Mulley

Include registration data in one year report

Change-Id: Ib73d66d635eda6f7c0d372996a5e136011d97609
parent 30be9af3
......@@ -10,7 +10,8 @@ from mock import MagicMock
from numpy import isnan
import pandas
from edx.analytics.tasks.reports.total_enrollments import WeeklyAllUsersAndEnrollments, TOTAL_ENROLLMENT_ROWNAME
from edx.analytics.tasks.user_registrations import UserRegistrationsPerDay
from edx.analytics.tasks.reports.total_enrollments import WeeklyAllUsersAndEnrollments
from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.tests.target import FakeTarget
......@@ -18,7 +19,11 @@ from edx.analytics.tasks.tests.target import FakeTarget
class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
"""Tests for WeeklyAllUsersAndEnrollments class."""
def run_task(self, source, date, weeks, offset=None, history=None):
def setUp(self):
self.enrollment_label = WeeklyAllUsersAndEnrollments.ROW_LABELS['enrollments']
self.registrations_label = WeeklyAllUsersAndEnrollments.ROW_LABELS['registrations']
def run_task(self, registrations, enrollments, date, weeks, offset=None, history=None):
"""
Run task with fake targets.
......@@ -30,12 +35,13 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
# Make offsets None if it was not specified.
task = WeeklyAllUsersAndEnrollments(
source='fake_source',
enrollments='fake_enrollments',
offsets='fake_offsets' if offset else None,
history='fake_history' if history else None,
destination='fake_destination',
date=parsed_date,
weeks=weeks
weeks=weeks,
credentials=None
)
# Mock the input and output targets
......@@ -44,8 +50,8 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
"""Reformat string to make it like a TSV."""
return textwrap.dedent(string).strip().replace(' ', '\t')
if source is None:
source = """
if enrollments is None:
enrollments = """
course_1 2013-03-01 1
course_1 2013-03-30 2
course_2 2013-03-07 1
......@@ -58,7 +64,8 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
"""
input_targets = {
'source': FakeTarget(reformat(source)),
'enrollments': FakeTarget(reformat(enrollments)),
'registrations': FakeTarget(reformat(registrations))
}
# Mock offsets only if specified.
......@@ -85,8 +92,8 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
return result
def test_parse_source(self):
source = """
def test_parsing(self):
enrollments = """
course_1 2013-01-01 10
course_1 2013-01-02 10
course_1 2013-01-03 10
......@@ -95,31 +102,49 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
course_2 2013-01-01 10
course_3 2013-01-01 10
"""
res = self.run_task(source, '2013-01-17', 3)
# self.assertEqual(set(['name']), set(res.index))
registrations = """
2013-01-01 30
2013-01-08 10
2013-01-17 10
"""
res = self.run_task(registrations, enrollments, '2013-01-17', 3)
self.assertEqual(set(['2013-01-03', '2013-01-10', '2013-01-17']),
set(res.columns))
self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-01-03'], 50)
self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-01-10'], 60)
self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-01-17'], 70)
total_enrollment = res.loc[self.enrollment_label]
self.assertEqual(total_enrollment['2013-01-03'], 50)
self.assertEqual(total_enrollment['2013-01-10'], 60)
self.assertEqual(total_enrollment['2013-01-17'], 70)
reg_row = res.loc[self.registrations_label]
self.assertEqual(reg_row['2013-01-03'], 30)
self.assertEqual(reg_row['2013-01-10'], 40)
self.assertEqual(reg_row['2013-01-17'], 50)
def test_week_grouping(self):
source = """
enrollments = """
course_1 2013-01-06 10
course_1 2013-01-14 10
"""
res = self.run_task(source, '2013-01-21', 4)
registrations = """
2013-01-06 10
2013-01-14 10
"""
res = self.run_task(registrations, enrollments, '2013-01-21', 4)
weeks = set(['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21'])
self.assertEqual(weeks, set(str(w) for w in res.columns))
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME]
total_enrollment = res.loc[self.enrollment_label]
self.assertTrue(isnan(total_enrollment['2012-12-31'])) # no data
self.assertEqual(total_enrollment['2013-01-07'], 10)
self.assertEqual(total_enrollment['2013-01-14'], 20)
self.assertTrue(isnan(total_enrollment['2013-01-21'])) # no data
reg_row = res.loc[self.registrations_label]
self.assertEqual(reg_row['2013-01-07'], 10)
self.assertEqual(reg_row['2013-01-14'], 20)
def test_cumulative(self):
source = """
enrollments = """
course_1 2013-02-01 4
course_1 2013-02-04 4
course_1 2013-02-08 5
......@@ -130,8 +155,8 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
course_2 2013-02-14 3
course_2 2013-02-15 -2
"""
res = self.run_task(source, '2013-02-18', 2)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME]
res = self.run_task('', enrollments, '2013-02-18', 2)
total_enrollment = res.loc[self.enrollment_label]
self.assertEqual(total_enrollment['2013-02-11'], 13)
self.assertEqual(total_enrollment['2013-02-18'], 24)
......@@ -140,8 +165,8 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
course_2 2013-03-07 8
course_3 2013-03-15 6
"""
res = self.run_task(None, '2013-03-28', 6, offset=offset)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME]
res = self.run_task('', None, '2013-03-28', 6, offset=offset)
total_enrollment = res.loc[self.enrollment_label]
self.assertTrue(isnan(total_enrollment['2013-02-21'])) # no data
self.assertTrue(isnan(total_enrollment['2013-02-28'])) # no data
self.assertEqual(total_enrollment['2013-03-07'], 10)
......@@ -160,8 +185,9 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
2013-02-21 4
2013-02-28 10
"""
res = self.run_task(None, '2013-03-28', 6, offset=offset, history=history)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME]
res = self.run_task('', None, '2013-03-28', 6, offset=offset, history=history)
total_enrollment = res.loc[self.enrollment_label]
self.assertEqual(total_enrollment['2013-02-21'], 4)
self.assertEqual(total_enrollment['2013-02-28'], 10)
self.assertEqual(total_enrollment['2013-03-07'], 10)
......@@ -180,8 +206,8 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
2013-02-18 4
2013-03-21 22
"""
res = self.run_task(None, '2013-03-28', 6, offset=offset, history=history)
total_enrollment = res.loc[TOTAL_ENROLLMENT_ROWNAME]
res = self.run_task('', None, '2013-03-28', 6, offset=offset, history=history)
total_enrollment = res.loc[self.enrollment_label]
print total_enrollment
self.assertEqual(total_enrollment['2013-02-21'], 5)
self.assertEqual(total_enrollment['2013-02-28'], 9)
......@@ -193,37 +219,46 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
def test_unicode(self):
course_id = u'course_\u2603'
source = u"""
enrollments = u"""
{course_id} 2013-04-01 1
{course_id} 2013-04-02 1
""".format(course_id=course_id)
res = self.run_task(source.encode('utf-8'), '2013-04-02', 1)
res = self.run_task('', enrollments.encode('utf-8'), '2013-04-02', 1)
self.assertEqual(res.loc[TOTAL_ENROLLMENT_ROWNAME]['2013-04-02'], 2)
self.assertEqual(res.loc[self.enrollment_label]['2013-04-02'], 2)
def test_task_urls(self):
def test_task_configuration(self):
date = datetime.date(2013, 01, 20)
task = WeeklyAllUsersAndEnrollments(source='s3://bucket/path/',
task = WeeklyAllUsersAndEnrollments(enrollments='s3://bucket/path/',
offsets='s3://bucket/file.txt',
destination='file://path/file.txt',
destination='s3://path/',
history='file://path/history/file.gz',
date=date)
date=date,
credentials='s3://bucket/cred.json')
requires = task.requires()
source = requires['source'].output()
self.assertIsInstance(source, luigi.hdfs.HdfsTarget)
self.assertEqual(source.format, luigi.hdfs.PlainDir)
enrollments = requires['enrollments'].output()
self.assertIsInstance(enrollments, luigi.hdfs.HdfsTarget)
self.assertEqual(enrollments.format, luigi.hdfs.PlainDir)
offsets = requires['offsets'].output()
self.assertIsInstance(offsets, luigi.hdfs.HdfsTarget)
self.assertEqual(offsets.format, luigi.hdfs.Plain)
offsets = requires['history'].output()
self.assertIsInstance(offsets, luigi.File)
self.assertEqual(offsets.format, luigi.format.Gzip)
history = requires['history'].output()
self.assertIsInstance(history, luigi.File)
self.assertEqual(history.format, luigi.format.Gzip)
registrations = requires['registrations'].output()
self.assertIsInstance(requires['registrations'], UserRegistrationsPerDay)
self.assertEqual(registrations.path, 's3://path/user_registrations_1900-01-01-2013-01-21.tsv')
self.assertIsInstance(registrations, luigi.hdfs.HdfsTarget)
self.assertEqual(registrations.format, luigi.hdfs.Plain)
destination = task.output()
self.assertIsInstance(destination, luigi.File)
self.assertEqual(destination.path, 's3://path/total_users_and_enrollments_2012-01-22-2013-01-20.csv')
self.assertIsInstance(offsets, luigi.hdfs.HdfsTarget)
self.assertEqual(offsets.format, luigi.hdfs.Plain)
"""Total Enrollment related reports"""
import csv
from datetime import timedelta
from datetime import timedelta, date
import luigi
import luigi.hdfs
from luigi.date_interval import Custom
import numpy
import pandas
from edx.analytics.tasks.util.tsv import read_tsv
from edx.analytics.tasks.url import ExternalURL, get_target_from_url
from edx.analytics.tasks.url import ExternalURL, get_target_from_url, url_path_join
from edx.analytics.tasks.user_registrations import UserRegistrationsPerDay
from edx.analytics.tasks.reports.enrollments import CourseEnrollmentCountMixin
ROWNAME_HEADER = 'name'
TOTAL_ENROLLMENT_ROWNAME = 'Total Enrollment'
MINIMUM_DATE = date(1900, 1, 1)
class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin):
ROWNAME_HEADER = 'name'
def read_date_count_tsv(self, input_file):
"""
Read TSV containing dates and corresponding counts into a pandas Series.
......@@ -89,7 +94,7 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin):
results = results.transpose()
# List of fieldnames for the report
fieldnames = [ROWNAME_HEADER] + list(results.columns)
fieldnames = [self.ROWNAME_HEADER] + list(results.columns)
writer = csv.DictWriter(output_file, fieldnames)
writer.writerow(dict((k, k) for k in fieldnames)) # Write header
......@@ -101,7 +106,7 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin):
for series_name, series in results.iterrows():
values = {
ROWNAME_HEADER: series_name,
self.ROWNAME_HEADER: series_name,
}
by_week_values = format_counts(series.to_dict())
values.update(by_week_values)
......@@ -113,10 +118,10 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
Calculates total users and enrollments across all (known) courses per week.
Parameters:
source: Location of daily enrollments per date. The format is a
enrollments: Location of daily enrollments per date. The format is a
TSV file, with fields course_id, date and count.
destination: Location of the resulting report. The output format is an
excel-compatible CSV file.
destination: Directory to store the resulting report and intermediate
results. The output format is an excel-compatible CSV file.
history: Location of historical values for total course enrollment.
The format is a TSV file, with fields "date" and "enrollments".
offsets: Location of seed values for each course. The format is a
......@@ -133,18 +138,47 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
enrollments at the end of each week.
"""
# TODO: add the first (total users) row later, when we have access to total
# user counts (e.g. queried from and reconstructed from a production database).
source = luigi.Parameter()
enrollments = luigi.Parameter()
destination = luigi.Parameter()
offsets = luigi.Parameter(default=None)
history = luigi.Parameter(default=None)
date = luigi.DateParameter()
weeks = luigi.IntParameter(default=52)
credentials = luigi.Parameter()
ROW_LABELS = {
'header': 'name',
'enrollments': 'Total Enrollment',
'registrations': 'Total Registrations',
}
@property
def start_date(self):
"""
Returns:
The first date to include in the result.
"""
return self.date - timedelta(self.weeks * 7)
def requires(self):
results = {'source': ExternalURL(self.source)}
# The end date is not included in the result, so we have to add a day
# to the provided date in order to ensure user registration data is
# gathered for that date.
end_date = self.date + timedelta(1)
# In order to compute the cumulative sum of user registrations we need
# all changes in registrations up to (and including) the provided date.
registrations = UserRegistrationsPerDay(
credentials=self.credentials,
destination=self.destination,
date_interval=Custom(MINIMUM_DATE, end_date)
)
results = {
'enrollments': ExternalURL(self.enrollments),
'registrations': registrations
}
if self.offsets:
results.update({'offsets': ExternalURL(self.offsets)})
if self.history:
......@@ -153,11 +187,16 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
return results
def output(self):
return get_target_from_url(self.destination)
return get_target_from_url(
url_path_join(
self.destination,
'total_users_and_enrollments_{0}-{1}.csv'.format(self.start_date, self.date)
)
)
def run(self):
# Load the explicit enrollment data into a pandas dataframe.
daily_enrollment_changes = self.read_source()
daily_enrollment_changes = self.read_enrollments()
# Add enrollment offsets to allow totals to be calculated
# for explicit enrollments.
......@@ -177,13 +216,20 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
if overall_enrollment_history is not None:
daily_overall_enrollment = self.prepend_history(daily_overall_enrollment, overall_enrollment_history)
# TODO: get user counts, as another series.
daily_overall_enrollment.name = self.ROW_LABELS['enrollments']
daily_user_registration_totals = self.read_user_registrations()
# TODO: Combine the two series into a single DataFrame, indexed by date.
# For now, put the single series into a data frame, so that
# it can be sampled and output in a consistent way.
daily_overall_enrollment.name = TOTAL_ENROLLMENT_ROWNAME
total_counts_by_day = pandas.DataFrame(daily_overall_enrollment)
# Because the registration data index is the requested date range
# use it as the canonical index and left join in the enrollment
# counts.
total_counts_by_day = pandas.merge(
daily_user_registration_totals,
pandas.DataFrame(daily_overall_enrollment),
how='left',
left_index=True,
right_index=True
)
# Select values from DataFrame to display per-week.
total_counts_by_week = self.select_weekly_values(
......@@ -195,16 +241,16 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
with self.output().open('w') as output_file:
self.save_output(total_counts_by_week, output_file)
def read_source(self):
def read_enrollments(self):
"""
Read source into a pandas DataFrame.
Read enrollments into a pandas DataFrame.
Returns:
Pandas dataframe with one column per course_id. Indexed
for the time interval available in the source data.
for the time interval available in the enrollments data.
"""
with self.input()['source'].open('r') as input_file:
with self.input()['enrollments'].open('r') as input_file:
course_date_count_data = self.read_course_date_count_tsv(input_file)
data = self.initialize_daily_count(course_date_count_data)
return data
......@@ -244,6 +290,31 @@ class WeeklyAllUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
return data
def read_user_registrations(self):
"""
Read history of user registrations.
Returns:
Pandas DataFrame indexed by date with a single column
representing the number of users who have accounts at
the end of that day.
"""
with self.input()['registrations'].open('r') as registrations_file:
# The column name here will be converted in to a row name later when
# the data is transposed.
registration_changes = read_tsv(registrations_file, ['date', self.ROW_LABELS['registrations']])
registration_changes.date = pandas.to_datetime(registration_changes.date)
registration_changes.set_index(['date'], inplace=True)
cumulative_registrations = registration_changes.cumsum()
# Restrict the index to only the date range requested
date_range = pandas.date_range(self.start_date, self.date)
# Forward fill gaps because those dates have no change in registrations
cumulative_registrations = cumulative_registrations.reindex(date_range, method='ffill')
return cumulative_registrations
def prepend_history(self, count_by_day, history):
"""
Add history to a series in-place.
......
......@@ -28,7 +28,6 @@ edx.analytics.tasks =
total-enrollments-report = edx.analytics.tasks.reports.total_enrollments:WeeklyAllUsersAndEnrollments
inc-enrollments-report = edx.analytics.tasks.reports.incremental_enrollments:WeeklyIncrementalUsersAndEnrollments
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay
users-per-day = edx.analytics.tasks.user_registrations:UserRegistrationsPerDay
mapreduce.engine =
hadoop = luigi.hadoop:DefaultHadoopJobRunner
......
......@@ -50,6 +50,9 @@
- name: branch checked out
command: git checkout FETCH_HEAD chdir={{ working_repo_dir }}
- name: ensure system packages are installed
command: make system-requirements chdir={{ working_repo_dir }}
- name: bootstrap pip
command: sudo apt-get install -q -y python-pip
sudo: True
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment