Commit c6d849a8 by Carlos Andrés Rocha Committed by Carlos Andrés Rocha

Add skeleton for daily enrollments and registrations report

Change-Id: I37c37c1c95fefae275b1bcede6cb0f55b956295c
parent 642687f8
...@@ -174,3 +174,126 @@ class WeeklyIncrementalUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountM ...@@ -174,3 +174,126 @@ class WeeklyIncrementalUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountM
] ]
) )
return weekly_report return weekly_report
class DailyRegistrationsEnrollmentsAndCourses(luigi.Task,
                                              AllCourseEnrollmentCountMixin):
    """
    Reports daily user registrations and daily enrollment changes summed
    across courses.

    Parameters:
        registrations: Location of daily registrations per date. The format is a
            TSV file, with fields date and count.
        enrollments: Location of daily enrollments per date. The format is a
            TSV file, with fields course_id, date and count.
        destination: Location of the resulting report. The output format is an
            excel-compatible CSV file.
        date: Last day included in the report.
        days: Number of days to report, counting back from (and including)
            `date`.
        blacklist: Optional location of the course blacklist. When set, it is
            added to the task requirements and the courses read from it are
            filtered out before enrollments are summed.

    Output:
        Excel-compatible CSV file with a header row.
        Columns are the days requested.
        First row is number of user registrations.
        Second row is the number of total course enrollments.

        A row for the number of live courses is anticipated by
        ROW_LABELS['courses'], but assemble_report() does not produce it yet.
    """

    ROW_LABELS = {
        'header': 'name',
        'registrations': 'Daily New Users',
        'enrollments': 'Daily Course Enrollment Changes',
        'courses': 'Total Live Courses',
    }

    registrations = luigi.Parameter()
    enrollments = luigi.Parameter()
    destination = luigi.Parameter()
    date = luigi.DateParameter()
    days = luigi.IntParameter(default=28)
    blacklist = luigi.Parameter(default=None)

    @property
    def date_range(self):
        """The `days` consecutive dates ending on (and including) `date`."""
        end = self.date
        # The report covers the half-open interval (end - days, end].
        # Starting one day after the left bound expresses that directly,
        # without date_range's `closed` keyword, which newer pandas releases
        # no longer accept.  A non-positive `days` yields an empty range.
        first_day = end - timedelta(self.days) + timedelta(days=1)
        return pandas.date_range(first_day, end).date

    def requires(self):
        """Declare the external inputs; the blacklist only when configured."""
        results = {
            'enrollments': ExternalURL(self.enrollments),
            'registrations': ExternalURL(self.registrations),
        }
        if self.blacklist:
            results.update({'blacklist': ExternalURL(self.blacklist)})

        return results

    def output(self):
        """Target for the report, derived from the `destination` URL."""
        return get_target_from_url(self.destination)

    def run(self):
        """Read both inputs, assemble the report and write it to the output."""
        daily_registrations = self.read_registrations()
        daily_enrollments = self.read_enrollments()

        report = self.assemble_report(
            daily_registrations,
            daily_enrollments,
        )

        with self.output().open('w') as output_file:
            self.save_output(report, output_file)

    def read_registrations(self):
        """
        Read the daily user registration counts.

        Returns:
            The per-date counts parsed by read_incremental_count_tsv(),
            reindexed to the report's date range.  Dates in the range that
            the parsed data does not cover come out as NaN.
        """
        with self.input()['registrations'].open('r') as input_file:
            daily_registration_changes = self.read_incremental_count_tsv(input_file)

        return daily_registration_changes.reindex(self.date_range)

    def read_enrollments(self):
        """
        Read the per-course enrollment counts and total them per day.

        Returns:
            Pandas series with, for each date in the report's date range, the
            sum of the counts over all courses that are not blacklisted.
            Dates that the parsed data does not cover come out as NaN.
        """
        with self.input()['enrollments'].open('r') as input_file:
            course_date_count_data = self.read_course_date_count_tsv(input_file)

        enrollments = self.initialize_daily_count(course_date_count_data)

        # The return value is not used, so filter_out_courses presumably
        # drops the blacklisted courses from `enrollments` in place.
        course_blacklist = self.read_course_blacklist()
        self.filter_out_courses(enrollments, course_blacklist)

        # Sum counts for all courses
        result = enrollments.sum(axis=1)

        return result.reindex(self.date_range)

    def assemble_report(self, registrations, enrollments):
        """
        Create a dataframe that represents the final report.

        Args:
            registrations: Pandas series, with date as index.
            enrollments: Pandas series, with date as index.

        Returns:
            A Pandas dataframe, with date as index and two columns
            (registrations first, then enrollments).
        """
        registrations_label = self.ROW_LABELS['registrations']
        enrollments_label = self.ROW_LABELS['enrollments']

        # Build from a dict and pin the column order explicitly.  This gives
        # the same frame as DataFrame.from_items(), which has been removed
        # from newer pandas releases.
        report = pandas.DataFrame(
            {
                registrations_label: registrations,
                enrollments_label: enrollments,
            },
            columns=[registrations_label, enrollments_label]
        )

        return report
...@@ -10,6 +10,7 @@ import pandas ...@@ -10,6 +10,7 @@ import pandas
from edx.analytics.tasks.tests import unittest from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.tests.target import FakeTarget from edx.analytics.tasks.tests.target import FakeTarget
from edx.analytics.tasks.reports.incremental_enrollments import WeeklyIncrementalUsersAndEnrollments from edx.analytics.tasks.reports.incremental_enrollments import WeeklyIncrementalUsersAndEnrollments
from edx.analytics.tasks.reports.incremental_enrollments import DailyRegistrationsEnrollmentsAndCourses
class TestWeeklyIncrementalUsersAndEnrollments(unittest.TestCase): class TestWeeklyIncrementalUsersAndEnrollments(unittest.TestCase):
...@@ -274,3 +275,138 @@ class TestWeeklyIncrementalUsersAndEnrollments(unittest.TestCase): ...@@ -274,3 +275,138 @@ class TestWeeklyIncrementalUsersAndEnrollments(unittest.TestCase):
res = self.run_task(None, enrollments, '2013-01-15', 2, blacklist=blacklist) res = self.run_task(None, enrollments, '2013-01-15', 2, blacklist=blacklist)
self.assertEqual(res.loc[self.row_label('enrollment_change')]['2013-01-08'], 4) self.assertEqual(res.loc[self.row_label('enrollment_change')]['2013-01-08'], 4)
self.assertEqual(res.loc[self.row_label('enrollment_change')]['2013-01-15'], 2) self.assertEqual(res.loc[self.row_label('enrollment_change')]['2013-01-15'], 2)
class TestDailyRegistrationsEnrollmentsAndCourses(unittest.TestCase):
    """Exercises DailyRegistrationsEnrollmentsAndCourses against in-memory targets."""

    def setUp(self):
        labels = DailyRegistrationsEnrollmentsAndCourses.ROW_LABELS
        self.enrollment_label = labels['enrollments']
        self.registrations_label = labels['registrations']

    def run_task(self, registrations, enrollments, date, days, blacklist=None):
        """
        Run the task with its input and output targets replaced by fakes.

        `registrations`, `enrollments` and `blacklist` are whitespace
        separated fixture text; passing None for either of the first two
        selects a small built-in default.

        Returns:
            the CSV written by the task, parsed into a pandas dataframe
            indexed by the 'name' column.
        """
        # Fixture rows are kept flush-left; to_hadoop_tsv() dedents and strips
        # the text before it is used, so the layout here does not matter.
        if registrations is None:
            registrations = """
2013-01-01 10
2013-01-10 20
"""

        if enrollments is None:
            enrollments = """
course_1 2013-01-06 10
course_1 2013-01-14 10
"""

        def to_hadoop_tsv(text):
            # Turn the readable fixture text into tab separated lines.
            return textwrap.dedent(text).strip().replace(' ', '\t')

        task = DailyRegistrationsEnrollmentsAndCourses(
            registrations='fake_registrations',
            enrollments='fake_enrollments',
            destination='fake_destination',
            date=datetime.datetime.strptime(date, '%Y-%m-%d').date(),
            days=days,
            blacklist=blacklist
        )

        # Fake inputs; the blacklist target exists only when one was given.
        fake_inputs = {
            'enrollments': FakeTarget(to_hadoop_tsv(enrollments)),
            'registrations': FakeTarget(to_hadoop_tsv(registrations)),
        }
        if blacklist:
            fake_inputs['blacklist'] = FakeTarget(to_hadoop_tsv(blacklist))
        task.input = MagicMock(return_value=fake_inputs)

        # Fake output that captures whatever the task writes.
        fake_output = FakeTarget()
        task.output = MagicMock(return_value=fake_output)

        task.run()

        written = fake_output.buffer.read()
        return pandas.read_csv(StringIO(written),
                               na_values=['-'],
                               index_col='name')

    def test_incremental_registration(self):
        registrations = """
2013-02-15 -2
2013-02-16 6
2013-02-18 6
"""
        res = self.run_task(registrations, None, '2013-02-19', 6)

        expected_days = set([
            '2013-02-14',
            '2013-02-15',
            '2013-02-16',
            '2013-02-17',
            '2013-02-18',
            '2013-02-19',
        ])
        self.assertEqual(expected_days, set(str(col) for col in res.columns))

        registration_row = res.loc[self.registrations_label]
        self.assertTrue(isnan(registration_row['2013-02-14']))
        expected_counts = [
            ('2013-02-15', -2),
            ('2013-02-16', 6),
            ('2013-02-17', 0),
            ('2013-02-18', 6),
        ]
        for day, count in expected_counts:
            self.assertEqual(registration_row[day], count)
        self.assertTrue(isnan(registration_row['2013-02-19']))

    def test_incremental_enrollment(self):
        enrollments = """
course_1 2013-02-01 4
course_1 2013-02-18 6
course_2 2013-02-17 3
course_2 2013-02-18 -2
"""
        res = self.run_task(None, enrollments, '2013-02-19', 4)

        expected_days = set(['2013-02-16', '2013-02-17', '2013-02-18', '2013-02-19'])
        self.assertEqual(expected_days, set(str(d) for d in res.columns))

        enrollment_row = res.loc[self.enrollment_label]
        expected_counts = [
            ('2013-02-16', 0),
            ('2013-02-17', 3),
            ('2013-02-18', 4),
        ]
        for day, count in expected_counts:
            self.assertEqual(enrollment_row[day], count)
        self.assertTrue(isnan(enrollment_row['2013-02-19']))

    def test_output_row_order(self):
        res = self.run_task(None, None, '2013-02-18', 2)

        self.assertEqual(
            res.index.tolist(),
            [self.registrations_label, self.enrollment_label]
        )

    def test_blacklist(self):
        enrollments = """
course_1 2013-01-02 1
course_2 2013-01-02 2
course_3 2013-01-02 4
course_2 2013-01-09 1
course_3 2013-01-15 2
"""
        blacklist = """
course_1
course_2
"""
        res = self.run_task(None, enrollments, '2013-01-15', 20, blacklist=blacklist)

        # Only course_3 survives the blacklist.
        enrollment_row = res.loc[self.enrollment_label]
        expected_counts = [
            ('2013-01-02', 4),
            ('2013-01-10', 0),
            ('2013-01-15', 2),
        ]
        for day, count in expected_counts:
            self.assertEqual(enrollment_row[day], count)
...@@ -297,6 +297,7 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase): ...@@ -297,6 +297,7 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
self.assertEqual(registrations.format, luigi.hdfs.Plain) self.assertEqual(registrations.format, luigi.hdfs.Plain)
destination = task.output() destination = task.output()
self.assertEqual(destination.path, 's3://path/total_users_and_enrollments_2012-01-22-2013-01-20.csv') self.assertEqual(destination.path, 's3://path/total_users_and_enrollments_2012-01-22-2013-01-20.csv')
self.assertIsInstance(offsets, luigi.hdfs.HdfsTarget) self.assertIsInstance(offsets, luigi.hdfs.HdfsTarget)
self.assertEqual(offsets.format, luigi.hdfs.Plain) self.assertEqual(offsets.format, luigi.hdfs.Plain)
"""Total Enrollment related reports""" """Total Enrollment related reports"""
import csv import csv
from datetime import timedelta, date from datetime import timedelta, date
import luigi import luigi
...@@ -38,10 +37,11 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin): ...@@ -38,10 +37,11 @@ class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin):
data.date = pandas.to_datetime(data.date) data.date = pandas.to_datetime(data.date)
data = data.set_index('date') data = data.set_index('date')
# Ensure a continuous date range
date_range = pandas.date_range(min(data.index), max(data.index)) date_range = pandas.date_range(min(data.index), max(data.index))
data = data.reindex(date_range) data = data.reindex(date_range)
# return as a Series # Return as a Series
return data['count'] return data['count']
def read_incremental_count_tsv(self, input_file): def read_incremental_count_tsv(self, input_file):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment