Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-analytics-data-api
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-analytics-data-api
Commits
a7128612
Commit
a7128612
authored
Feb 13, 2014
by
Brian Wilson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Define WeeklyIncrementalUsersAndEnrollments task.
Change-Id: I0c1e80771b8b7818096bf16aaf44da50518ad773
parent
4f71aadd
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
525 additions
and
55 deletions
+525
-55
edx/analytics/tasks/reports/enrollments.py
+3
-3
edx/analytics/tasks/reports/incremental_enrollments.py
+174
-0
edx/analytics/tasks/reports/tests/__init__.py
+0
-0
edx/analytics/tasks/reports/tests/test_incremental_enrollments.py
+256
-0
edx/analytics/tasks/reports/tests/test_total_enrollments.py
+8
-8
edx/analytics/tasks/reports/total_enrollments.py
+82
-44
setup.cfg
+2
-0
No files found.
edx/analytics/tasks/reports/enrollments.py
View file @
a7128612
...
@@ -23,7 +23,7 @@ class CourseEnrollmentCountMixin(object):
...
@@ -23,7 +23,7 @@ class CourseEnrollmentCountMixin(object):
# Not assuming any encoding, course_id will be read as plain string
# Not assuming any encoding, course_id will be read as plain string
data
=
read_tsv
(
input_file
,
names
)
data
=
read_tsv
(
input_file
,
names
)
data
.
date
=
pandas
.
to_datetime
(
data
.
date
)
data
.
date
=
pandas
.
to_datetime
(
data
.
date
)
# pylint: disable=maybe-no-member
return
data
return
data
def
initialize_daily_count
(
self
,
course_date_count_data
):
def
initialize_daily_count
(
self
,
course_date_count_data
):
...
@@ -75,7 +75,7 @@ class CourseEnrollmentCountMixin(object):
...
@@ -75,7 +75,7 @@ class CourseEnrollmentCountMixin(object):
# Flag values before the offset day with NaN,
# Flag values before the offset day with NaN,
# since they are not "available".
# since they are not "available".
not_available
=
count_by_day
.
index
<
date
not_available
=
count_by_day
.
index
<
date
count_by_day
.
loc
[
not_available
,
course_id
]
=
numpy
.
NaN
count_by_day
.
loc
[
not_available
,
course_id
]
=
numpy
.
NaN
# pylint: disable=maybe-no-member
def
calculate_total_enrollment
(
self
,
count_by_day
,
offsets
=
None
):
def
calculate_total_enrollment
(
self
,
count_by_day
,
offsets
=
None
):
"""
"""
...
@@ -174,7 +174,7 @@ class EnrollmentsByWeek(luigi.Task, CourseEnrollmentCountMixin):
...
@@ -174,7 +174,7 @@ class EnrollmentsByWeek(luigi.Task, CourseEnrollmentCountMixin):
statuses
=
self
.
read_statuses
()
statuses
=
self
.
read_statuses
()
with
self
.
output
()
.
open
(
'w'
)
as
output_file
:
with
self
.
output
()
.
open
(
'w'
)
as
output_file
:
# pylint: disable=maybe-no-member
self
.
save_output
(
weekly_enrollment_totals
,
statuses
,
output_file
)
self
.
save_output
(
weekly_enrollment_totals
,
statuses
,
output_file
)
def
read_source
(
self
):
def
read_source
(
self
):
...
...
edx/analytics/tasks/reports/incremental_enrollments.py
0 → 100644
View file @
a7128612
"""Reports about Incremental enrollment."""
from
datetime
import
timedelta
import
luigi
import
luigi.hdfs
import
pandas
from
edx.analytics.tasks.url
import
ExternalURL
,
get_target_from_url
from
edx.analytics.tasks.reports.total_enrollments
import
AllCourseEnrollmentCountMixin
class WeeklyIncrementalUsersAndEnrollments(luigi.Task, AllCourseEnrollmentCountMixin):
    """
    Reports week-by-week changes in registered users and course enrollments.

    Parameters:
        registrations: Location of daily registrations per date.  The format
            is a TSV file, with fields date and count.
        enrollments: Location of daily enrollments per date.  The format is a
            TSV file, with fields course_id, date and count.
        destination: Location of the resulting report.  The output format is
            an excel-compatible CSV file.
        date: End date of the last week requested.
        weeks: Number of weeks from the end date to request.

    Output:
        Excel-compatible CSV file made of a header row followed by four
        data rows.  Column one holds the row title; every further column
        holds the incremental count for one requested week.  Rows, in order:
        change in registered users per week, the per-day average of that
        change, change in total course enrollments per week, and the per-day
        average of that change.
    """

    registrations = luigi.Parameter()
    enrollments = luigi.Parameter()
    destination = luigi.Parameter()
    date = luigi.DateParameter()
    weeks = luigi.IntParameter(default=10)

    # Maps internal row names to the titles written in the report.
    ROW_LABELS = {
        'header': 'name',
        'registration_change': 'Registration Changes',
        'average_registration_change': 'Average Daily Registration Changes',
        'enrollment_change': 'Enrollment Changes',
        'average_enrollment_change': 'Average Daily Enrollment Changes',
    }

    def requires(self):
        """Both inputs are pre-existing external files, keyed by role."""
        return {
            'enrollments': ExternalURL(self.enrollments),
            'registrations': ExternalURL(self.registrations),
        }

    def output(self):
        """The report is written to the target named by `destination`."""
        return get_target_from_url(self.destination)

    def run(self):
        """Read both inputs, roll them up per week and write the report."""
        # Daily registration deltas as a pandas Series.
        registrations_target = self.input()['registrations']
        with registrations_target.open('r') as registrations_file:
            registrations_per_day = self.read_incremental_count_tsv(registrations_file)

        # Daily enrollment deltas, one column per course.
        enrollments_per_day = self.read_enrollments()

        # Remove (or merge or whatever) data for courses that
        # would otherwise result in duplicate counts.
        self.filter_duplicate_courses(enrollments_per_day)

        # Collapse the per-course columns into one overall count per day.
        overall_enrollments_per_day = enrollments_per_day.sum(axis=1)

        # Roll the daily series up into per-week sums.
        registrations_per_week = self.aggregate_per_week(
            registrations_per_day, self.date, self.weeks
        )
        enrollments_per_week = self.aggregate_per_week(
            overall_enrollments_per_day, self.date, self.weeks
        )

        # Arrange the weekly series in the layout used for output.
        report = self.assemble_report_dataframe(
            registrations_per_week, enrollments_per_week
        )

        with self.output().open('w') as report_file:  # pylint: disable=maybe-no-member
            self.save_output(report, report_file)

    def read_enrollments(self):
        """
        Read enrollments into a pandas DataFrame.

        Returns:
            Pandas dataframe with one column per course_id.  Indexed
            for the time interval available in the enrollments data.
        """
        enrollments_target = self.input()['enrollments']
        with enrollments_target.open('r') as enrollments_file:
            raw_counts = self.read_course_date_count_tsv(enrollments_file)
            return self.initialize_daily_count(raw_counts)

    def aggregate_per_week(self, daily_values, last_week_ending, weeks):
        """
        Aggregates daily values into weekly values.

        Args:
            daily_values: Pandas Series of daily values, indexed by date.
                All dates are assumed to be contiguous, though their values
                may be NaN.  Dates do not have to cover the periods being
                sampled.
            last_week_ending: last day of last week.
            weeks: number of weeks to sample (including the last day)

        Returns:
            Pandas Series with weekly values, indexed by date of last day of
            week.  Any day with NaN will result in the corresponding week
            also being NaN.  As a consequence, any week requested that is not
            completely covered by the input daily_values will be NaN.
        """
        # Each entry becomes the sum of that day and the six days before it.
        seven_day_totals = pandas.rolling_sum(daily_values, window=7)

        # End-of-week dates, oldest first, finishing on last_week_ending.
        # Requested dates outside the daily input come back as NaN.
        week_endings = [
            last_week_ending - timedelta(days=7 * weeks_back)
            for weeks_back in range(weeks - 1, -1, -1)
        ]
        return seven_day_totals.loc[week_endings]

    @staticmethod
    def row_label(row_name):
        """Returns label value for reference row, given its internal row name."""
        return WeeklyIncrementalUsersAndEnrollments.ROW_LABELS[row_name]

    def assemble_report_dataframe(self, weekly_registration_changes, weekly_enrollment_changes):
        """
        Create a dataframe that represents the final report.

        Args:
            weekly_registration_changes: Pandas series, with date as index.
            weekly_enrollment_changes: Pandas series, with date as index.

        Returns:
            A Pandas dataframe, with date as index and four columns.
        """
        registration_label = self.row_label('registration_change')
        average_registration_label = self.row_label('average_registration_change')
        enrollment_label = self.row_label('enrollment_change')
        average_enrollment_label = self.row_label('average_enrollment_change')

        series_by_label = {
            registration_label: weekly_registration_changes,
            average_registration_label: weekly_registration_changes / 7.,
            enrollment_label: weekly_enrollment_changes,
            average_enrollment_label: weekly_enrollment_changes / 7.,
        }

        # The explicit column list fixes the order of the report rows.
        column_order = [
            registration_label,
            average_registration_label,
            enrollment_label,
            average_enrollment_label,
        ]
        return pandas.DataFrame(series_by_label, columns=column_order)
edx/analytics/tasks/reports/tests/__init__.py
0 → 100644
View file @
a7128612
edx/analytics/tasks/reports/tests/test_incremental_enrollments.py
0 → 100644
View file @
a7128612
"""Tests for Weekly Incremental Users and Enrollments report."""
import
datetime
import
textwrap
from
StringIO
import
StringIO
from
mock
import
MagicMock
from
numpy
import
isnan
# pylint: disable=no-name-in-module
import
pandas
from
edx.analytics.tasks.tests
import
unittest
from
edx.analytics.tasks.tests.target
import
FakeTarget
from
edx.analytics.tasks.reports.incremental_enrollments
import
WeeklyIncrementalUsersAndEnrollments
class TestWeeklyIncrementalUsersAndEnrollments(unittest.TestCase):
    """Tests for WeeklyIncrementalUsersAndEnrollments class."""

    # pylint: disable=maybe-no-member

    @staticmethod
    def row_label(row_name):
        """Returns label value for reference row, given its internal row name."""
        return WeeklyIncrementalUsersAndEnrollments.ROW_LABELS[row_name]

    def get_row(self, res, row_name):
        """Return the report row identified by its internal row name."""
        return res.loc[self.row_label(row_name)]

    def assert_weeks(self, res, expected_weeks):
        """Check that the report has exactly the expected week columns."""
        self.assertEqual(set(expected_weeks), set(str(w) for w in res.columns))

    def assert_row(self, row, expected):
        """
        Check a report row against (week, value) pairs.

        A value of None means the week is expected to hold no data (NaN).
        """
        for week, value in expected:
            if value is None:
                self.assertTrue(isnan(row[week]))
            else:
                self.assertEqual(row[week], value)

    def run_task(self, registrations, enrollments, date, weeks):
        """
        Run task with fake targets.

        Args:
            registrations: space-separated "date count" lines, or None to use
                a default fixture.
            enrollments: space-separated "course_id date count" lines, or
                None to use a default fixture.
            date: end date of the last week, formatted as YYYY-MM-DD.
            weeks: number of weeks to request.

        Returns:
            the task output as a pandas dataframe.
        """
        parsed_date = datetime.datetime.strptime(date, '%Y-%m-%d').date()

        task = WeeklyIncrementalUsersAndEnrollments(
            registrations='fake_registrations',
            enrollments='fake_enrollments',
            destination='fake_destination',
            date=parsed_date,
            weeks=weeks
        )

        # Default missing inputs
        if registrations is None:
            registrations = """
                2013-01-01 10
                2013-01-10 20
                """
        if enrollments is None:
            enrollments = """
                course_1 2013-01-06 10
                course_1 2013-01-14 10
                """

        # Mock the input and output targets

        def reformat(string):
            """Reformat string to make it like a TSV."""
            return textwrap.dedent(string).strip().replace(' ', '\t')

        input_targets = {
            'enrollments': FakeTarget(reformat(enrollments)),
            'registrations': FakeTarget(reformat(registrations)),
        }
        task.input = MagicMock(return_value=input_targets)

        output_target = FakeTarget()
        task.output = MagicMock(return_value=output_target)

        # Run the task and parse the output into a pandas dataframe
        task.run()

        data = output_target.buffer.read()
        # '-' is what the report writes for weeks without data.
        return pandas.read_csv(
            StringIO(data),
            na_values=['-'],
            index_col=self.row_label('header')
        )

    def test_parse_registrations(self):
        registrations = """
            2012-12-20 1
            2013-01-01 10
            2013-01-02 11
            2013-01-03 12
            2013-01-09 13
            2013-01-17 14
            """
        res = self.run_task(registrations, None, '2013-01-17', 3)
        self.assert_weeks(res, ['2013-01-03', '2013-01-10', '2013-01-17'])
        self.assert_row(self.get_row(res, 'registration_change'), [
            ('2013-01-03', 33),
            ('2013-01-10', 13),
            ('2013-01-17', 14),
        ])

    def test_parse_enrollments(self):
        enrollments = """
            course_0 2012-12-20 1
            course_1 2013-01-01 10
            course_1 2013-01-02 11
            course_1 2013-01-03 12
            course_1 2013-01-09 13
            course_1 2013-01-17 14
            course_2 2013-01-01 15
            course_3 2013-01-01 16
            """
        res = self.run_task(None, enrollments, '2013-01-17', 3)
        self.assert_weeks(res, ['2013-01-03', '2013-01-10', '2013-01-17'])
        self.assert_row(self.get_row(res, 'enrollment_change'), [
            ('2013-01-03', 64),
            ('2013-01-10', 13),
            ('2013-01-17', 14),
        ])

    def test_week_grouping(self):
        # A range of valid data that starts on a week boundary:
        registrations = """
            2013-01-01 11
            2013-01-10 22
            """
        # A range of valid data that ends on a week boundary:
        enrollments = """
            course_1 2013-01-06 13
            course_1 2013-01-14 14
            """
        res = self.run_task(registrations, enrollments, '2013-01-21', 4)
        self.assert_weeks(res, ['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21'])
        self.assert_row(self.get_row(res, 'registration_change'), [
            ('2012-12-31', None),  # no data
            ('2013-01-07', 11),
            ('2013-01-14', None),  # no data
            ('2013-01-21', None),  # no data
        ])
        self.assert_row(self.get_row(res, 'enrollment_change'), [
            ('2012-12-31', None),  # no data
            ('2013-01-07', None),  # no data
            ('2013-01-14', 14),
            ('2013-01-21', None),  # no data
        ])

    def test_less_than_week(self):
        registrations = """
            2013-01-01 11
            2013-01-05 22
            """
        enrollments = """
            course_1 2013-01-15 13
            course_1 2013-01-17 14
            """
        res = self.run_task(registrations, enrollments, '2013-01-21', 4)
        weeks = ['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21']
        self.assert_weeks(res, weeks)
        inc_registration = self.get_row(res, 'registration_change')
        inc_enrollment = self.get_row(res, 'enrollment_change')
        # Neither input spans seven days, so no week has a value.
        for date in weeks:
            self.assertTrue(isnan(inc_registration[date]))
            self.assertTrue(isnan(inc_enrollment[date]))

    def test_non_overlapping_weeks(self):
        registrations = """
            2013-01-01 11
            2013-01-10 22
            """
        enrollments = """
            course_1 2013-01-15 13
            course_1 2013-01-21 14
            """
        res = self.run_task(registrations, enrollments, '2013-01-21', 4)
        self.assert_weeks(res, ['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21'])
        self.assert_row(self.get_row(res, 'registration_change'), [
            ('2012-12-31', None),  # no data
            ('2013-01-07', 11),
            ('2013-01-14', None),  # no data
            ('2013-01-21', None),  # no data
        ])
        self.assert_row(self.get_row(res, 'enrollment_change'), [
            ('2012-12-31', None),  # no data
            ('2013-01-07', None),  # no data
            ('2013-01-14', None),  # no data
            ('2013-01-21', 27),
        ])

    def test_incremental_registration(self):
        registrations = """
            2013-02-01 4
            2013-02-04 4
            2013-02-08 5
            2013-02-12 -2
            2013-02-14 3
            2013-02-15 -2
            2013-02-16 6
            2013-02-18 6
            """
        res = self.run_task(registrations, None, '2013-02-18', 2)
        self.assert_weeks(res, ['2013-02-11', '2013-02-18'])
        self.assert_row(self.get_row(res, 'registration_change'), [
            ('2013-02-11', 5),
            ('2013-02-18', 11),
        ])
        # also test averages:
        # The task's save_output writes int(value), so averages come back
        # truncated.  Floor division states that expectation explicitly
        # rather than relying on Python 2 integer `/`.
        self.assert_row(self.get_row(res, 'average_registration_change'), [
            ('2013-02-11', 5 // 7),
            ('2013-02-18', 11 // 7),
        ])

    def test_incremental_enrollment(self):
        enrollments = """
            course_1 2013-02-01 4
            course_1 2013-02-04 4
            course_1 2013-02-08 5
            course_1 2013-02-12 -4
            course_1 2013-02-16 6
            course_1 2013-02-18 6
            course_2 2013-02-12 2
            course_2 2013-02-14 3
            course_2 2013-02-15 -2
            """
        res = self.run_task(None, enrollments, '2013-02-18', 2)
        self.assert_weeks(res, ['2013-02-11', '2013-02-18'])
        self.assert_row(self.get_row(res, 'enrollment_change'), [
            ('2013-02-11', 5),
            ('2013-02-18', 11),
        ])
        # also test averages:
        # Truncated in the CSV (see int(value) in save_output), hence `//`.
        self.assert_row(self.get_row(res, 'average_enrollment_change'), [
            ('2013-02-11', 5 // 7),
            ('2013-02-18', 11 // 7),
        ])

    def test_output_row_order(self):
        res = self.run_task(None, None, '2013-02-18', 2)
        expected_rows = [
            self.row_label(row_name)
            for row_name in (
                'registration_change',
                'average_registration_change',
                'enrollment_change',
                'average_enrollment_change',
            )
        ]
        self.assertEqual(res.index.tolist(), expected_rows)

    def test_unicode_course_id(self):
        course_id = u'course_\u2603'
        enrollments = u"""
            {course_id} 2013-03-20 1
            {course_id} 2013-04-01 2
            {course_id} 2013-04-02 3
            """.format(course_id=course_id)
        # The task is fed encoded bytes, not a unicode object.
        res = self.run_task(None, enrollments.encode('utf-8'), '2013-04-02', 2)
        self.assertEqual(self.get_row(res, 'enrollment_change')['2013-04-02'], 5)
edx/analytics/tasks/reports/tests/test_total_enrollments.py
View file @
a7128612
...
@@ -10,13 +10,13 @@ from mock import MagicMock
...
@@ -10,13 +10,13 @@ from mock import MagicMock
from
numpy
import
isnan
from
numpy
import
isnan
import
pandas
import
pandas
from
edx.analytics.tasks.reports.total_enrollments
import
TotalUsersAndEnrollmentsByWeek
,
TOTAL_ENROLLMENT_ROWNAME
from
edx.analytics.tasks.reports.total_enrollments
import
WeeklyAllUsersAndEnrollments
,
TOTAL_ENROLLMENT_ROWNAME
from
edx.analytics.tasks.tests
import
unittest
from
edx.analytics.tasks.tests
import
unittest
from
edx.analytics.tasks.tests.target
import
FakeTarget
from
edx.analytics.tasks.tests.target
import
FakeTarget
class
Test
TotalUsersAndEnrollmentsByWeek
(
unittest
.
TestCase
):
class
Test
WeeklyAllUsersAndEnrollments
(
unittest
.
TestCase
):
"""Tests for
TotalUsersAndEnrollmentsByWeek
class."""
"""Tests for
WeeklyAllUsersAndEnrollments
class."""
def
run_task
(
self
,
source
,
date
,
weeks
,
offset
=
None
,
history
=
None
):
def
run_task
(
self
,
source
,
date
,
weeks
,
offset
=
None
,
history
=
None
):
"""
"""
...
@@ -29,7 +29,7 @@ class TestTotalUsersAndEnrollmentsByWeek(unittest.TestCase):
...
@@ -29,7 +29,7 @@ class TestTotalUsersAndEnrollmentsByWeek(unittest.TestCase):
parsed_date
=
datetime
.
datetime
.
strptime
(
date
,
'
%
Y-
%
m-
%
d'
)
.
date
()
parsed_date
=
datetime
.
datetime
.
strptime
(
date
,
'
%
Y-
%
m-
%
d'
)
.
date
()
# Make offsets None if it was not specified.
# Make offsets None if it was not specified.
task
=
TotalUsersAndEnrollmentsByWeek
(
task
=
WeeklyAllUsersAndEnrollments
(
source
=
'fake_source'
,
source
=
'fake_source'
,
offsets
=
'fake_offsets'
if
offset
else
None
,
offsets
=
'fake_offsets'
if
offset
else
None
,
history
=
'fake_history'
if
history
else
None
,
history
=
'fake_history'
if
history
else
None
,
...
@@ -161,10 +161,10 @@ class TestTotalUsersAndEnrollmentsByWeek(unittest.TestCase):
...
@@ -161,10 +161,10 @@ class TestTotalUsersAndEnrollmentsByWeek(unittest.TestCase):
def
test_task_urls
(
self
):
def
test_task_urls
(
self
):
date
=
datetime
.
date
(
2013
,
01
,
20
)
date
=
datetime
.
date
(
2013
,
01
,
20
)
task
=
TotalUsersAndEnrollmentsByWeek
(
source
=
's3://bucket/path/'
,
task
=
WeeklyAllUsersAndEnrollments
(
source
=
's3://bucket/path/'
,
offsets
=
's3://bucket/file.txt'
,
offsets
=
's3://bucket/file.txt'
,
destination
=
'file://path/file.txt'
,
destination
=
'file://path/file.txt'
,
date
=
date
)
date
=
date
)
requires
=
task
.
requires
()
requires
=
task
.
requires
()
...
...
edx/analytics/tasks/reports/total_enrollments.py
View file @
a7128612
...
@@ -8,6 +8,7 @@ import luigi.hdfs
...
@@ -8,6 +8,7 @@ import luigi.hdfs
import
numpy
import
numpy
import
pandas
import
pandas
from
edx.analytics.tasks.util.tsv
import
read_tsv
from
edx.analytics.tasks.url
import
ExternalURL
,
get_target_from_url
from
edx.analytics.tasks.url
import
ExternalURL
,
get_target_from_url
from
edx.analytics.tasks.reports.enrollments
import
CourseEnrollmentCountMixin
from
edx.analytics.tasks.reports.enrollments
import
CourseEnrollmentCountMixin
...
@@ -16,7 +17,87 @@ ROWNAME_HEADER = 'name'
...
@@ -16,7 +17,87 @@ ROWNAME_HEADER = 'name'
TOTAL_ENROLLMENT_ROWNAME
=
'Total Enrollment'
TOTAL_ENROLLMENT_ROWNAME
=
'Total Enrollment'
class
TotalUsersAndEnrollmentsByWeek
(
luigi
.
Task
,
CourseEnrollmentCountMixin
):
class AllCourseEnrollmentCountMixin(CourseEnrollmentCountMixin):
    """Helpers shared by reports that aggregate counts across all courses."""

    def read_date_count_tsv(self, input_file):
        """
        Read TSV containing dates and corresponding counts into a pandas Series.

        The result is reindexed over every day between the earliest and the
        latest date present in the file.  NANs are not filled in here, as
        more than one filling strategy is used with such files.
        """
        frame = read_tsv(input_file, ['date', 'count'])
        frame.date = pandas.to_datetime(frame.date)
        frame = frame.set_index('date')

        # Days absent from the file show up as NaN after the reindex.
        first_day = min(frame.index)
        last_day = max(frame.index)
        frame = frame.reindex(pandas.date_range(first_day, last_day))

        # Only the counts are of interest, so hand back a Series.
        return frame['count']

    def read_incremental_count_tsv(self, input_file):
        """
        Read TSV containing dates and corresponding counts into a pandas Series.

        Interstitial incremental counts are filled as zeroes.
        """
        counts = self.read_date_count_tsv(input_file)
        return counts.fillna(0)

    def read_total_count_tsv(self, input_file):
        """Read TSV of dates and counts into a pandas Series, NaNs left as-is."""
        # TODO: this is a placeholder for reading in historical counts,
        # such as total enrollment numbers.  It will
        # need to interpolate the interstitial NANs.
        return self.read_date_count_tsv(input_file)

    def filter_duplicate_courses(self, daily_enrollment_totals):
        """Placeholder: currently leaves `daily_enrollment_totals` untouched."""
        # TODO: implement this for real.  (This is just a placeholder.)
        # At this point we should remove data for courses that are
        # represented by other courses, because the students have been
        # moved to the new course.  Perhaps this should actually
        # perform a merge of the two courses, since we would want the
        # history of one before the move date, and the history of the
        # second after that date.

        # Note that this is not the same filtering that would be applied
        # to the EnrollmentsByWeek report.
        pass

    def save_output(self, results, output_file):
        """
        Write output to CSV file.

        Args:
            results: a pandas DataFrame object containing series data
                per row to be output.
            output_file: open, writable file-like object receiving the CSV.
        """
        # Flip the frame so each week becomes a column.
        by_row = results.transpose()

        # Leading column carries the row title, then one column per week.
        fieldnames = [ROWNAME_HEADER] + list(by_row.columns)

        writer = csv.DictWriter(output_file, fieldnames)
        # Header row: every field name mapped to itself.
        writer.writerow(dict(zip(fieldnames, fieldnames)))

        for series_name, series in by_row.iterrows():
            record = {ROWNAME_HEADER: series_name}
            for week, count in series.to_dict().iteritems():
                # NaN is written as '-'.
                # NOTE(review): int() drops any fractional part, so
                # non-integer rows (e.g. per-day averages) are truncated.
                record[week] = '-' if numpy.isnan(count) else int(count)
            writer.writerow(record)
class
WeeklyAllUsersAndEnrollments
(
luigi
.
Task
,
AllCourseEnrollmentCountMixin
):
"""
"""
Calculates total users and enrollments across all (known) courses per week.
Calculates total users and enrollments across all (known) courses per week.
...
@@ -168,46 +249,3 @@ class TotalUsersAndEnrollmentsByWeek(luigi.Task, CourseEnrollmentCountMixin):
...
@@ -168,46 +249,3 @@ class TotalUsersAndEnrollmentsByWeek(luigi.Task, CourseEnrollmentCountMixin):
# For gaps in history, values should be extrapolated.
# For gaps in history, values should be extrapolated.
# Also may need to reindex, since new dates are being added.
# Also may need to reindex, since new dates are being added.
pass
pass
def filter_duplicate_courses(self, daily_enrollment_totals):
    """
    Placeholder hook; does nothing to `daily_enrollment_totals` for now.

    TODO: implement this for real.  At this point we should remove data
    for courses that are represented by other courses, because the
    students have been moved to the new course.  Perhaps this should
    actually perform a merge of the two courses, since we would want the
    history of one before the move date, and the history of the second
    after that date.

    Note that this is not the same filtering that would be applied
    to the EnrollmentsByWeek report.
    """
    return None
def save_output(self, results, output_file):
    """
    Write output to CSV file.

    Args:
        results: a pandas DataFrame object containing series data
            per row to be output.
        output_file: open, writable file-like object receiving the CSV.
    """
    # Weeks become columns once the frame is transposed.
    transposed = results.transpose()

    # Row-title column first, followed by one column per week.
    fieldnames = [ROWNAME_HEADER] + list(transposed.columns)

    writer = csv.DictWriter(output_file, fieldnames)
    header_row = dict(zip(fieldnames, fieldnames))
    writer.writerow(header_row)

    def as_cell(count):
        """Render one value: '-' for NaN, otherwise its integer part."""
        if numpy.isnan(count):
            return '-'
        return int(count)

    for series_name, series in transposed.iterrows():
        row = {ROWNAME_HEADER: series_name}
        row.update(
            (week, as_cell(count))
            for week, count in series.to_dict().iteritems()
        )
        writer.writerow(row)
setup.cfg
View file @
a7128612
...
@@ -25,6 +25,8 @@ edx.analytics.tasks =
...
@@ -25,6 +25,8 @@ edx.analytics.tasks =
s3-sync = edx.analytics.tasks.s3:S3Sync
s3-sync = edx.analytics.tasks.s3:S3Sync
sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
enrollments-report = edx.analytics.tasks.reports.enrollments:EnrollmentsByWeek
enrollments-report = edx.analytics.tasks.reports.enrollments:EnrollmentsByWeek
total-enrollments-report = edx.analytics.tasks.reports.total_enrollments:WeeklyAllUsersAndEnrollments
inc-enrollments-report = edx.analytics.tasks.reports.incremental_enrollments:WeeklyIncrementalUsersAndEnrollments
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay
users-per-day = edx.analytics.tasks.user_registrations:UserRegistrationsPerDay
users-per-day = edx.analytics.tasks.user_registrations:UserRegistrationsPerDay
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment