edx / edx-analytics-data-api · Commits · 9cce0818

Commit 9cce0818 authored Jan 29, 2014 by Carlos Andrés Rocha
Add report of enrollments per week.
parent 5a2313f6
Showing 5 changed files with 405 additions and 0 deletions
edx/analytics/reports/__init__.py                 +0   -0
edx/analytics/reports/enrollments.py              +199 -0
edx/analytics/reports/tests/test_enrollments.py   +202 -0
requirements/default.txt                          +2   -0
setup.cfg                                         +2   -0
edx/analytics/reports/__init__.py  0 → 100644
edx/analytics/reports/enrollments.py  0 → 100644
"""Enrollment related reports"""
import
csv
from
datetime
import
timedelta
import
luigi
import
luigi.hdfs
import
numpy
import
pandas
class
EnrollmentsByWeek
(
luigi
.
Task
):
"""Calculates cumulative enrollments per week per course.
Parameters:
source: Location of daily enrollments per date. The format is a hadoop
tsv file, with fields course_id, date and count.
destination: Location of the resulting report. The output format is a
excel csv file with course_id and one column per requested week.
offsets: Location of seed values for each course. The format is a
hadoop tsv file, with fields course_id, date and offset.
date: End date of the last week requested.
weeks: Number of weeks from the end date to request.
Output:
Excel CSV file with one row per course. The columns are
the cumulative enrollments counts for each week requested.
"""
    source = luigi.Parameter()
    destination = luigi.Parameter()
    offsets = luigi.Parameter(default=None)
    date = luigi.DateParameter()
    weeks = luigi.IntParameter(default=10)

    def requires(self):
        results = {'source': ExternalURL(self.source)}
        if self.offsets:
            results.update({'offsets': ExternalURL(self.offsets)})

        return results

    def output(self):
        return get_target_from_url(self.destination)

    def run(self):
        # Load the data into a pandas dataframe
        count_by_day = self.read_source()

        offsets = self.read_offsets()

        if offsets is not None:
            self.include_offsets(count_by_day, offsets)

        cumulative_by_week = self.accumulate(count_by_day)

        with self.output().open('w') as output_file:
            self.save_output(cumulative_by_week, output_file)

    def read_source(self):
        """
        Read source into a pandas DataFrame.

        Returns:
            Pandas dataframe with one column per course_id. Indexed
            for the time interval available in the source data.

        """
        with self.input()['source'].open('r') as input_file:
            data = self.read_tsv(input_file)

        # Reorganize the data. One column per course_id, with
        # shared date index.
        data = data.pivot(index='date', columns='course_id', values='count')

        # Complete the range of data to include all days between
        # the dates of the first and last events.
        date_range = pandas.date_range(min(data.index), max(data.index))
        data = data.reindex(date_range)
        data = data.fillna(0)

        return data

    def read_offsets(self):
        """
        Read offsets into a pandas DataFrame.

        Returns:
            Pandas dataframe with one row per course_id and
            columns for the date and count of the offset.

            Returns None if no offset was specified.

        """
        data = None

        if self.input().get('offsets'):
            with self.input()['offsets'].open('r') as offset_file:
                data = self.read_tsv(offset_file)

        return data

    def read_tsv(self, input_file):
        """Read hadoop formatted tsv file into a pandas DataFrame."""
        names = ['course_id', 'date', 'count']

        # Not assuming any encoding, course_id will be read as plain string
        data = pandas.read_csv(input_file, names=names,
                               quoting=csv.QUOTE_NONE,
                               encoding=None,
                               delimiter='\t')

        data.date = pandas.to_datetime(data.date)

        return data

    def include_offsets(self, count_by_day, offsets):
        """
        Add offsets to a dataframe inplace.

        Args:
            count_by_day: Dataframe with format from `read_source`
            offsets: Dataframe with format from `read_offsets`.

        """
        for n, (course_id, date, count) in offsets.iterrows():
            if course_id in count_by_day.columns:
                # The offsets are computed to the beginning of that day. We
                # add them to the counts by the end of that day to
                # get the correct count for the day.
                count_by_day.loc[date, course_id] += count

                # Flag values before the offset day with NaN,
                # since they are not "available".
                not_available = count_by_day.index < date
                count_by_day.loc[not_available, course_id] = numpy.NaN

    def accumulate(self, count_by_day):
        # Calculate the cumulative sum per day of the input.
        # Entries with NaN stay NaN.
        # At this stage only the data prior to the offset should contain NaN.
        cumulative_sum = count_by_day.cumsum()

        # List the dates of the last day of each week requested.
        start, weeks = self.date, self.weeks
        days = [start - timedelta(i * 7) for i in reversed(xrange(0, weeks))]
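        # For example (mirrors test_week_grouping below): with
        # date=2013-01-21 and weeks=4, `days` is
        # [2012-12-31, 2013-01-07, 2013-01-14, 2013-01-21].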
        # Sample the cumulative data on the requested days.
        # Result is NaN if there is no data available for that date.
        results = cumulative_sum.loc[days]

        return results

    def save_output(self, results, output_file):
        # Make a row per course_id
        results = results.transpose()

        # List of fieldnames for the report
        fieldnames = ['course_id'] + list(results.columns)

        writer = csv.DictWriter(output_file, fieldnames)
        writer.writerow(dict((k, k) for k in fieldnames))  # Write header

        def format_counts(counts_dict):
            for k, v in counts_dict.iteritems():
                yield k, '-' if numpy.isnan(v) else int(v)

        for index, series in results.iterrows():
            values = {'course_id': index}
            by_week_values = format_counts(series.to_dict())
            values.update(by_week_values)
            writer.writerow(values)


class ExternalURL(luigi.ExternalTask):
    """Simple Task that returns a target based on its URL"""
    url = luigi.Parameter()

    def output(self):
        return get_target_from_url(self.url)


def get_target_from_url(url):
    """Returns a luigi target based on the url scheme"""
    # TODO: Make external utility to resolve target by URL,
    # including s3, s3n, etc.
    if url.startswith('hdfs://') or url.startswith('s3://'):
        if url.endswith('/'):
            return luigi.hdfs.HdfsTarget(url, format=luigi.hdfs.PlainDir)
        else:
            return luigi.hdfs.HdfsTarget(url)
    else:
        return luigi.LocalTarget(url)
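For local experimentation, the task can be driven with luigi's Python API directly, since paths without an hdfs:// or s3:// scheme resolve to luigi.LocalTarget. A minimal sketch, not part of this commit, assuming a local tab-separated file of daily counts and a luigi version that exposes luigi.build:

# Hypothetical local run of the report (sketch only).
import datetime

import luigi

from edx.analytics.reports.enrollments import EnrollmentsByWeek

task = EnrollmentsByWeek(
    source='daily_enrollments.tsv',        # assumed local input: course_id<TAB>date<TAB>count
    destination='enrollments_report.csv',  # report is written as a local CSV file
    date=datetime.date(2014, 1, 27),       # end date of the last requested week
    weeks=4,
)
luigi.build([task], local_scheduler=True)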
edx/analytics/reports/tests/test_enrollments.py  0 → 100644
from contextlib import contextmanager
import datetime
import textwrap
from StringIO import StringIO
from unittest import TestCase

import luigi
import luigi.hdfs
from mock import MagicMock
from numpy import isnan
import pandas

from edx.analytics.reports.enrollments import EnrollmentsByWeek
from edx.analytics.reports.enrollments import ExternalURL


class FakeTarget(object):
    """
    Fake luigi like target that saves data in memory, using a
    StringIO buffer.
    """
    def __init__(self, value=''):
        self.buffer = StringIO(value)
        # Rewind the buffer head so the value can be read
        self.buffer.seek(0)

    @contextmanager
    def open(self, *args, **kwargs):
        yield self.buffer
        # Rewind the head for easy reading
        self.buffer.seek(0)


class TestEnrollmentsByWeek(TestCase):
    def run_task(self, source, date, weeks, offset=None):
        """
        Run task with fake targets.

        Returns:
            the task output as a pandas dataframe.
        """
        parsed_date = datetime.datetime.strptime(date, '%Y-%m-%d').date()

        # Make offsets None if it was not specified.
        task = EnrollmentsByWeek(source='fake_source',
                                 offsets='fake_offsets' if offset else None,
                                 destination='fake_destination',
                                 date=parsed_date,
                                 weeks=weeks)

        # Mock the input and output targets

        def reformat(string):
            # Reformat string to make it like a hadoop tsv
            return textwrap.dedent(string).strip().replace(' ', '\t')

        input_targets = {
            'source': FakeTarget(reformat(source)),
        }

        # Mock offsets only if specified.
        if offset:
            input_targets.update({'offsets': FakeTarget(reformat(offset))})

        task.input = MagicMock(return_value=input_targets)

        output_target = FakeTarget()
        task.output = MagicMock(return_value=output_target)

        # Run the task and parse the output into a pandas dataframe

        task.run()

        data = output_target.buffer.read()

        result = pandas.read_csv(StringIO(data),
                                 na_values=['-'],
                                 index_col='course_id')

        return result

    def test_parse_source(self):
        source = """
        course_1 2013-01-01 10
        course_1 2013-01-02 10
        course_1 2013-01-03 10
        course_1 2013-01-09 10
        course_1 2013-01-17 10
        course_2 2013-01-01 10
        course_3 2013-01-01 10
        """

        res = self.run_task(source, '2013-01-17', 3)

        self.assertEqual(set(['course_1', 'course_2', 'course_3']),
                         set(res.index))

        self.assertEqual(res.loc['course_1']['2013-01-03'], 30)
        self.assertEqual(res.loc['course_1']['2013-01-10'], 40)
        self.assertEqual(res.loc['course_1']['2013-01-17'], 50)
        self.assertEqual(res.loc['course_2']['2013-01-03'], 10)
        self.assertEqual(res.loc['course_3']['2013-01-03'], 10)

    def test_week_grouping(self):
        source = """
        course_1 2013-01-06 10
        course_1 2013-01-14 10
        """

        res = self.run_task(source, '2013-01-21', 4)

        weeks = set(['2012-12-31', '2013-01-07', '2013-01-14', '2013-01-21'])
        self.assertEqual(weeks, set(str(w) for w in res.columns))

        course_1 = res.loc['course_1']
        self.assertTrue(isnan(course_1['2012-12-31']))  # no data
        self.assertEqual(course_1['2013-01-07'], 10)
        self.assertEqual(course_1['2013-01-14'], 20)
        self.assertTrue(isnan(course_1['2013-01-21']))  # no data

    def test_cumulative(self):
        source = """
        course_1 2013-02-01 4
        course_1 2013-02-04 4
        course_1 2013-02-08 5
        course_1 2013-02-12 -4
        course_1 2013-02-16 6
        course_1 2013-02-18 6
        course_2 2013-02-12 2
        course_2 2013-02-14 3
        course_2 2013-02-15 -2
        """

        res = self.run_task(source, '2013-02-18', 2)

        course_1 = res.loc['course_1']
        self.assertEqual(course_1['2013-02-11'], 13)
        self.assertEqual(course_1['2013-02-18'], 21)

        course_2 = res.loc['course_2']
        self.assertEqual(course_2['2013-02-11'], 0)
        self.assertEqual(course_2['2013-02-18'], 3)

    def test_offsets(self):
        source = """
        course_1 2013-03-01 1
        course_1 2013-03-30 2
        course_2 2013-03-07 1
        course_2 2013-03-08 1
        course_2 2013-03-10 1
        course_2 2013-03-13 1
        course_3 2013-03-15 1
        course_3 2013-03-18 1
        course_3 2013-03-19 1
        """

        offset = """
        course_2 2013-03-07 8
        course_3 2013-03-15 6
        """

        res = self.run_task(source, '2013-03-28', 4, offset=offset)

        course_2 = res.loc['course_2']
        self.assertEqual(course_2['2013-03-07'], 9)
        self.assertEqual(course_2['2013-03-14'], 12)

        course_3 = res.loc['course_3']
        self.assertTrue(isnan(course_3['2013-03-07']))  # no data
        self.assertTrue(isnan(course_3['2013-03-14']))  # no data
        self.assertEqual(course_3['2013-03-21'], 9)

    def test_unicode(self):
        course_id = u'course_\u2603'

        source = u"""
        {course_id} 2013-04-01 1
        {course_id} 2013-04-02 1
        """.format(course_id=course_id)

        res = self.run_task(source.encode('utf-8'), '2013-04-02', 1)

        self.assertEqual(res.loc[course_id.encode('utf-8')]['2013-04-02'], 2)

    def test_task_urls(self):
        date = datetime.date(2013, 01, 20)

        task = EnrollmentsByWeek(source='s3://bucket/path/',
                                 offsets='s3://bucket/file.txt',
                                 destination='file://path/file.txt',
                                 date=date)

        requires = task.requires()

        source = requires['source'].output()
        self.assertIsInstance(source, luigi.hdfs.HdfsTarget)
        self.assertEqual(source.format, luigi.hdfs.PlainDir)

        offsets = requires['offsets'].output()
        self.assertIsInstance(offsets, luigi.hdfs.HdfsTarget)
        self.assertEqual(offsets.format, luigi.hdfs.Plain)

        destination = task.output()
        self.assertIsInstance(destination, luigi.File)
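The tests run entirely against in-memory FakeTarget buffers, so they need no Hadoop or S3 setup. A minimal sketch for running just this module with the standard library runner; the repository's own test command may differ:

# Sketch only; assumes the package is importable (e.g. installed with pip install -e .).
import unittest

from edx.analytics.reports.tests import test_enrollments

suite = unittest.defaultTestLoader.loadTestsFromModule(test_enrollments)
unittest.TextTestRunner(verbosity=2).run(suite)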
requirements/default.txt
argparse==1.2.1
boto==2.22.1
numpy==1.8.0
pandas==0.13.0
pbr==0.5.23
stevedore==0.13
tornado==3.1.1
...
...
setup.cfg
...
...
@@ -23,3 +23,4 @@ edx.analytics.tasks =
     s3-copy = edx.analytics.tasks.s3:S3Copy
     s3-sync = edx.analytics.tasks.s3:S3Sync
     sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
+    enrollments-report = edx.analytics.reports.enrollments:EnrollmentsByWeek
\ No newline at end of file
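The new setup.cfg line registers EnrollmentsByWeek in the edx.analytics.tasks entry-point group alongside the existing S3 and event-log tasks. A minimal sketch, not part of this commit, of how such a registration can be discovered at runtime through setuptools entry points (assuming the package is installed):

# Sketch only: enumerate tasks registered under the edx.analytics.tasks group.
from pkg_resources import iter_entry_points

for entry_point in iter_entry_points('edx.analytics.tasks'):
    task_class = entry_point.load()  # e.g. 'enrollments-report' -> EnrollmentsByWeek
    print entry_point.name, task_class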