Commit 80b4b413 by Gabe Mulley

Merge pull request #9 from mulby/gabe/course-status

Include status and org_id in enrollment report
parents a5040553 7c840978
......@@ -32,6 +32,7 @@ class EnrollmentsByWeek(luigi.Task):
source = luigi.Parameter()
destination = luigi.Parameter()
offsets = luigi.Parameter(default=None)
statuses = luigi.Parameter(default=None)
date = luigi.DateParameter()
weeks = luigi.IntParameter(default=10)
......@@ -39,6 +40,8 @@ class EnrollmentsByWeek(luigi.Task):
results = {'source': ExternalURL(self.source)}
if self.offsets:
results.update({'offsets': ExternalURL(self.offsets)})
if self.statuses:
results.update({'statuses': ExternalURL(self.statuses)})
return results
......@@ -55,8 +58,10 @@ class EnrollmentsByWeek(luigi.Task):
cumulative_by_week = self.accumulate(count_by_day)
statuses = self.read_statuses()
with self.output().open('w') as output_file:
self.save_output(cumulative_by_week, output_file)
self.save_output(cumulative_by_week, statuses, output_file)
def read_source(self):
"""
......@@ -68,7 +73,7 @@ class EnrollmentsByWeek(luigi.Task):
"""
with self.input()['source'].open('r') as input_file:
data = self.read_tsv(input_file)
data = self.read_date_count_tsv(input_file)
# Reorganize the data. One column per course_id, with
# shared date index.
......@@ -100,25 +105,61 @@ class EnrollmentsByWeek(luigi.Task):
if self.input().get('offsets'):
with self.input()['offsets'].open('r') as offset_file:
data = self.read_tsv(offset_file)
data = self.read_date_count_tsv(offset_file)
return data
def read_tsv(self, input_file):
def read_date_count_tsv(self, input_file):
"""Read hadoop formatted tsv file into a pandas DataFrame."""
names = ['course_id', 'date', 'count']
# Not assuming any encoding, course_id will be read as plain string
data = pandas.read_csv(input_file,
names=names,
quoting=csv.QUOTE_NONE,
encoding=None,
delimiter='\t')
data = self.read_tsv(input_file, names)
data.date = pandas.to_datetime(data.date)
return data
def read_statuses(self):
    """
    Load the optional course status input into a pandas DataFrame.

    Returns:
        A pandas DataFrame with one row per course_id and a 'status'
        column, indexed on course_id. The status should be either
        "past", "current" or "new".

        Returns None if no statuses input was specified.
    """
    # The statuses input is optional; without it there is nothing to read.
    if not self.input().get('statuses'):
        return None

    with self.input()['statuses'].open('r') as status_file:
        frame = self.read_tsv(status_file, ['course_id', 'status'])

    return frame.set_index('course_id')
def read_tsv(self, input_file, names):
    """
    Load tab-separated data into a pandas DataFrame.

    Args:
        input_file: The tab-separated input; it is handed to
            pandas.read_csv unchanged.
        names (list): The column names to assign, in file order.

    Returns:
        A pandas DataFrame holding the parsed contents of input_file.
    """
    parse_options = {
        'names': names,
        # Quote characters are not interpreted; they stay part of the data.
        'quoting': csv.QUOTE_NONE,
        'encoding': None,
        'delimiter': '\t',
    }
    frame = pandas.read_csv(input_file, **parse_options)
    return frame
def include_offsets(self, count_by_day, offsets):
"""
Add offsets to a dataframe inplace.
......@@ -157,12 +198,11 @@ class EnrollmentsByWeek(luigi.Task):
return results
def save_output(self, results, output_file):
# Make a row per course_id
def save_output(self, results, statuses, output_file):
results = results.transpose()
# List of fieldnames for the report
fieldnames = ['course_id'] + list(results.columns)
fieldnames = ['status', 'course_id', 'org_id'] + list(results.columns)
writer = csv.DictWriter(output_file, fieldnames)
writer.writerow(dict((k, k) for k in fieldnames)) # Write header
......@@ -171,12 +211,44 @@ class EnrollmentsByWeek(luigi.Task):
for k, v in counts_dict.iteritems():
yield k, '-' if numpy.isnan(v) else int(v)
for index, series in results.iterrows():
values = {'course_id': index}
for course_id, series in results.iterrows():
values = {
'course_id': course_id,
'status': self.get_status_for_course(course_id, statuses),
'org_id': self.get_org_id_for_course(course_id),
}
by_week_values = format_counts(series.to_dict())
values.update(by_week_values)
writer.writerow(values)
def get_status_for_course(self, course_id, statuses):
    """
    Look up the status recorded for a course.

    Args:
        course_id(str): The identifier for the course. Should be formatted
            as <org_id>/<name>/<run>.
        statuses(pandas.DataFrame): A pandas DataFrame mapping course_ids
            to course statuses. It is expected to be indexed on course_id
            and to carry a 'status' column. May be None.

    Returns:
        The course's status as a string, or '-' when no statuses were
        given, the course is not listed, or its status is missing.
    """
    if statuses is None or course_id not in statuses.index:
        return '-'

    status = statuses.loc[course_id, 'status']
    if isinstance(status, pandas.Series):
        # A course_id that appears more than once makes .loc return every
        # matching row; keep the last one so the report gets one scalar.
        status = status.iloc[-1]

    # A missing status value is reported with the same placeholder used
    # for courses that are not listed at all.
    return '-' if pandas.isnull(status) else status
def get_org_id_for_course(self, course_id):
    """
    Extract the organization identifier from a course identifier.

    Args:
        course_id(str): The identifier for the course. Should be formatted
            as <org_id>/<name>/<run>.

    Returns:
        The org_id extracted from the course_id, or '-' when the
        course_id is not a string, does not have exactly three
        '/'-separated parts, or has an empty org_id part.
    """
    try:
        parts = course_id.split('/')
    except AttributeError:
        # Not a string (for example a numeric or missing label), so there
        # is no org_id to extract.
        return '-'

    if len(parts) != 3 or not parts[0]:
        return '-'
    return parts[0]
class ExternalURL(luigi.ExternalTask):
"""Simple Task that returns a target based on its URL"""
......
......@@ -34,7 +34,7 @@ class FakeTarget(object):
class TestEnrollmentsByWeek(TestCase):
def run_task(self, source, date, weeks, offset=None):
def run_task(self, source, date, weeks, offset=None, statuses=None):
"""
Run task with fake targets.
......@@ -65,6 +65,10 @@ class TestEnrollmentsByWeek(TestCase):
if offset:
input_targets.update({'offsets': FakeTarget(reformat(offset))})
# Mock statuses only if specified.
if statuses:
input_targets.update({'statuses': FakeTarget(reformat(statuses))})
task.input = MagicMock(return_value=input_targets)
output_target = FakeTarget()
......@@ -110,7 +114,7 @@ class TestEnrollmentsByWeek(TestCase):
"""
res = self.run_task(source, '2013-01-21', 4)
weeks = set(['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21'])
self.assertEqual(weeks, set(str(w) for w in res.columns))
self.assertEqual(weeks | set(['org_id', 'status']), set(str(w) for w in res.columns))
course_1 = res.loc['course_1']
self.assertTrue(isnan(course_1['2012-12-31'])) # no data
......@@ -200,3 +204,34 @@ class TestEnrollmentsByWeek(TestCase):
destination = task.output()
self.assertIsInstance(destination, luigi.File)
def test_statuses(self):
    """Statuses from the status input are reported next to each course."""
    source = """
course_1 2013-03-01 1
course_2 2013-03-07 1
course_3 2013-03-15 1
"""
    statuses = """
course_2 new
course_3 past
"""
    res = self.run_task(source, '2013-03-28', 4, statuses=statuses)
    # course_1 has no entry in the statuses input.
    self.assertTrue(isnan(res.loc['course_1']['status']))
    # assertEqual matches the rest of this test case; the assertEquals
    # alias is deprecated.
    self.assertEqual(res.loc['course_2']['status'], 'new')
    self.assertEqual(res.loc['course_3']['status'], 'past')
def test_organization_mapping(self):
    """The org_id column is derived from <org_id>/<name>/<run> course ids."""
    source = """
foo/course_1/run 2013-03-01 1
bar/course_2/run 2013-03-07 1
baz/course_3 2013-03-15 1
course_4 2013-03-16 1
"""
    res = self.run_task(source, '2013-03-28', 4)
    # assertEqual matches the rest of this test case; the assertEquals
    # alias is deprecated.
    self.assertEqual(res.loc['foo/course_1/run']['org_id'], 'foo')
    self.assertEqual(res.loc['bar/course_2/run']['org_id'], 'bar')
    # Course ids without exactly three parts have no org_id.
    self.assertTrue(isnan(res.loc['baz/course_3']['org_id']))
    self.assertTrue(isnan(res.loc['course_4']['org_id']))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment