Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-analytics-data-api
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-analytics-data-api
Commits
3963d379
Commit
3963d379
authored
May 06, 2014
by
Brian Wilson
Committed by
Gerrit Code Review
May 06, 2014
Browse files
Options
Browse Files
Download
Plain Diff
Merge "Calculate number of users per country."
parents
f0196c12
e6e930c0
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
694 additions
and
0 deletions
+694
-0
edx/analytics/tasks/tests/test_user_location.py
+345
-0
edx/analytics/tasks/user_location.py
+347
-0
requirements/default.txt
+1
-0
setup.cfg
+1
-0
No files found.
edx/analytics/tasks/tests/test_user_location.py
0 → 100644
View file @
3963d379
"""
Tests for user geolocation tasks.
"""
import
datetime
import
json
import
tempfile
import
os
import
shutil
import
textwrap
from
mock
import
Mock
,
MagicMock
,
patch
import
luigi.worker
from
edx.analytics.tasks.tests
import
unittest
from
edx.analytics.tasks.user_location
import
LastCountryForEachUser
from
edx.analytics.tasks.user_location
import
UsersPerCountry
from
edx.analytics.tasks.user_location
import
UsersPerCountryReport
from
edx.analytics.tasks.user_location
import
UsersPerCountryReportWorkflow
from
edx.analytics.tasks.user_location
import
UNKNOWN_COUNTRY
,
UNKNOWN_CODE
from
edx.analytics.tasks.tests.target
import
FakeTarget
class FakeGeoLocation(object):
    """Stand-in for a pygeoip.GeoIP() instance, for use in testing."""

    ip_address_1 = "123.45.67.89"
    ip_address_2 = "98.76.54.123"
    country_name_1 = "COUNTRY NAME 1"
    country_code_1 = "COUNTRY CODE 1"
    country_name_2 = "COUNTRY NAME 2"
    country_code_2 = "COUNTRY CODE 2"

    def country_name_by_addr(self, ip_address):
        """Return a country name for a recognized ip address, else None."""
        known_names = {
            self.ip_address_1: self.country_name_1,
            self.ip_address_2: self.country_name_2,
        }
        return known_names.get(ip_address)

    def country_code_by_addr(self, ip_address):
        """Return a country code for a recognized ip address, else None."""
        known_codes = {
            self.ip_address_1: self.country_code_1,
            self.ip_address_2: self.country_code_2,
        }
        return known_codes.get(ip_address)
class BaseUserLocationEventTestCase(unittest.TestCase):
    """Shared helpers that build tracking-log events for user-location task tests."""

    username = 'test_user'
    timestamp = "2013-12-17T15:38:32.805444"
    ip_address = FakeGeoLocation.ip_address_1

    def _create_event_log_line(self, **kwargs):
        """Create an event log entry with test values, serialized as a JSON string."""
        return json.dumps(self._create_event_dict(**kwargs))

    def _create_event_dict(self, **kwargs):
        """Create an event log entry with test values, as a dict."""
        # Start from the default entry, then let callers override or add fields.
        event = {
            "username": self.username,
            "time": "{0}+00:00".format(self.timestamp),
            "ip": self.ip_address,
        }
        event.update(**kwargs)
        return event
class LastCountryForEachUserMapperTestCase(BaseUserLocationEventTestCase):
    """Tests of LastCountryForEachUser.mapper()"""

    def setUp(self):
        # A locally-runnable task; URLs are fake since the mapper is called directly.
        self.task = LastCountryForEachUser(
            mapreduce_engine='local',
            name='test',
            src='test://input/',
            dest='test://output/',
            end_date=datetime.datetime.strptime('2014-04-01', '%Y-%m-%d').date(),
            geolocation_data='test://data/data.file',
        )

    def assert_no_output_for(self, line):
        """Assert that an input line generates no output."""
        self.assertEquals(tuple(self.task.mapper(line)), tuple())

    def test_non_enrollment_event(self):
        # Unparseable (non-JSON) input is silently dropped by the mapper.
        line = 'this is garbage'
        self.assert_no_output_for(line)

    def test_bad_datetime(self):
        line = self._create_event_log_line(time='this is a bogus time')
        self.assert_no_output_for(line)

    def test_after_end_date(self):
        # Events on or after the exclusive end datetime are filtered out.
        line = self._create_event_log_line(time="2015-12-17T15:38:32.805444")
        self.assert_no_output_for(line)

    def test_missing_username(self):
        event_dict = self._create_event_dict()
        del event_dict['username']
        line = json.dumps(event_dict)
        self.assert_no_output_for(line)

    def test_missing_ip_address(self):
        event_dict = self._create_event_dict()
        del event_dict['ip']
        line = json.dumps(event_dict)
        self.assert_no_output_for(line)

    def test_good_event(self):
        # A well-formed event maps to (username, (timestamp, ip_address)).
        line = self._create_event_log_line()
        event = tuple(self.task.mapper(line))
        expected = ((self.username, (self.timestamp, self.ip_address)),)
        self.assertEquals(event, expected)

    def test_username_with_newline(self):
        # The mapper strips surrounding whitespace from usernames.
        line = self._create_event_log_line(username="baduser\n")
        event = tuple(self.task.mapper(line))
        expected = (("baduser", (self.timestamp, self.ip_address)),)
        self.assertEquals(event, expected)
class LastCountryForEachUserReducerTestCase(unittest.TestCase):
    """Tests of LastCountryForEachUser.reducer()"""

    def setUp(self):
        self.username = "test_user"
        self.timestamp = "2013-12-17T15:38:32.805444"
        self.earlier_timestamp = "2013-12-15T15:38:32.805444"
        self.task = LastCountryForEachUser(
            mapreduce_engine='local',
            name='test',
            src='test://input/',
            dest='test://output/',
            end_date=datetime.datetime.strptime('2014-04-01', '%Y-%m-%d').date(),
            geolocation_data='test://data/data.file',
        )
        # Substitute a fake geolocation object so no real GeoIP data is needed.
        self.task.geoip = FakeGeoLocation()

    def _get_reducer_output(self, values):
        """Run reducer with provided values hardcoded key."""
        return tuple(self.task.reducer(self.username, values))

    def _check_output(self, inputs, expected):
        """Compare generated with expected output."""
        self.assertEquals(self._get_reducer_output(inputs), expected)

    def test_no_ip(self):
        self._check_output([], tuple())

    def test_single_ip(self):
        inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
        expected = (((FakeGeoLocation.country_name_1, FakeGeoLocation.country_code_1), self.username),)
        self._check_output(inputs, expected)

    def test_multiple_ip(self):
        # The latest timestamp's ip address determines the country.
        inputs = [
            (self.earlier_timestamp, FakeGeoLocation.ip_address_1),
            (self.timestamp, FakeGeoLocation.ip_address_2),
        ]
        expected = (((FakeGeoLocation.country_name_2, FakeGeoLocation.country_code_2), self.username),)
        self._check_output(inputs, expected)

    def test_multiple_ip_in_different_order(self):
        # Input order must not matter; only the timestamps do.
        inputs = [
            (self.timestamp, FakeGeoLocation.ip_address_2),
            (self.earlier_timestamp, FakeGeoLocation.ip_address_1),
        ]
        expected = (((FakeGeoLocation.country_name_2, FakeGeoLocation.country_code_2), self.username),)
        self._check_output(inputs, expected)

    def test_country_name_exception(self):
        # A geolocation failure falls back to UNKNOWN country and code.
        self.task.geoip.country_name_by_addr = Mock(side_effect=Exception)
        inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
        expected = (((UNKNOWN_COUNTRY, UNKNOWN_CODE), self.username),)
        self._check_output(inputs, expected)

    def test_country_code_exception(self):
        self.task.geoip.country_code_by_addr = Mock(side_effect=Exception)
        inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
        expected = (((UNKNOWN_COUNTRY, UNKNOWN_CODE), self.username),)
        self._check_output(inputs, expected)

    def test_missing_country_name(self):
        # A None country name is replaced, but the code lookup still succeeds.
        self.task.geoip.country_name_by_addr = Mock(return_value=None)
        inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
        expected = (((UNKNOWN_COUNTRY, FakeGeoLocation.country_code_1), self.username),)
        self._check_output(inputs, expected)

    def test_empty_country_name(self):
        # A whitespace-only country name is treated the same as a missing one.
        self.task.geoip.country_name_by_addr = Mock(return_value=" ")
        inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
        expected = (((UNKNOWN_COUNTRY, FakeGeoLocation.country_code_1), self.username),)
        self._check_output(inputs, expected)

    def test_missing_country_code(self):
        # An empty (but non-None) code passes through unchanged.
        self.task.geoip.country_code_by_addr = Mock(return_value="")
        inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
        expected = (((FakeGeoLocation.country_name_1, ""), self.username),)
        self._check_output(inputs, expected)
class UsersPerCountryTestCase(unittest.TestCase):
    """Tests of UsersPerCountry."""

    def setUp(self):
        # BUG FIX: the original had a trailing comma after the string
        # ("self.end_date = '2014-04-01',"), which bound a one-element
        # tuple rather than the intended date string.  The tests were
        # self-consistent either way (expected output reused the same
        # attribute), but the tuple silently leaked into the task's
        # end_date parameter.
        self.end_date = '2014-04-01'
        self.task = UsersPerCountry(
            mapreduce_engine='local',
            name='test',
            src='test://input/',
            dest='test://output/',
            end_date=self.end_date,
            geolocation_data='test://data/data.file',
        )

    def _create_input_line(self, country, code, username):
        """Generates input matching what LastCountryForEachUser.reducer() would produce."""
        return "{country}\t{code}\t{username}".format(country=country, code=code, username=username)

    def test_mapper_on_normal(self):
        # A normal line yields ((country, code), 1) for counting.
        line = self._create_input_line("COUNTRY", "CODE", "USER")
        self.assertEquals(tuple(self.task.mapper(line)), ((('COUNTRY', 'CODE'), 1),))

    def test_mapper_with_empty_country(self):
        # Lines with an empty country are dropped.
        line = self._create_input_line("", "CODE", "USER")
        self.assertEquals(tuple(self.task.mapper(line)), tuple())

    def test_reducer(self):
        # The reducer sums counts and appends the run's end date.
        key = ("Country_1", "Code_1")
        values = [34, 29, 102]
        expected = ((key, sum(values), self.end_date),)
        self.assertEquals(tuple(self.task.reducer(key, values)), expected)
class UsersPerCountryReportTestCase(unittest.TestCase):
    """Tests of UsersPerCountryReport."""

    def run_task(self, counts):
        """
        Run task with fake targets.

        Returns:
            the task output as a string.
        """
        task = UsersPerCountryReport(counts='fake_counts', report='fake_report')

        def reformat(string):
            """Reformat string to make it like a TSV."""
            return textwrap.dedent(string).strip().replace(' ', '\t')

        # Replace luigi's input/output targets with in-memory fakes.
        task.input = MagicMock(return_value=FakeTarget(reformat(counts)))
        output_target = FakeTarget()
        task.output = MagicMock(return_value=output_target)
        task.run()
        return output_target.buffer.read()

    def test_report(self):
        date = '2014-04-01'
        # Output counts in reverse order, to confirm that sorting works.
        counts = """
            Country_1 Code_1 34 {date}
            Country_2 Code_2 43 {date}
            """.format(date=date)
        output = self.run_task(counts)
        output_lines = output.split('\n')
        self.assertEquals(output_lines[0], UsersPerCountryReport.create_header(date))
        # Country_2 has the larger count (43 of 77 total), so it sorts first.
        self.assertEquals(
            output_lines[1],
            UsersPerCountryReport.create_csv_entry(float(43) / 77, 43, "Country_2", "Code_2")
        )
        self.assertEquals(
            output_lines[2],
            UsersPerCountryReport.create_csv_entry(float(34) / 77, 34, "Country_1", "Code_1")
        )
class UsersPerCountryReportWorkflowTestCase(BaseUserLocationEventTestCase):
    """Tests of UsersPerCountryReportWorkflow."""

    def setUp(self):
        # Define a real output directory, so it can
        # be removed if existing.
        def cleanup(dirname):
            """Remove the temp directory only if it exists."""
            if os.path.exists(dirname):
                shutil.rmtree(dirname)

        self.temp_rootdir = tempfile.mkdtemp()
        self.addCleanup(cleanup, self.temp_rootdir)

    def test_workflow(self):
        # set up directories:
        src_path = os.path.join(self.temp_rootdir, "src")
        os.mkdir(src_path)
        counts_path = os.path.join(self.temp_rootdir, "counts")
        os.mkdir(counts_path)
        report_path = os.path.join(self.temp_rootdir, "report.csv")
        data_filepath = os.path.join(self.temp_rootdir, "geoloc.dat")
        # The geolocation file only needs to exist; the GeoIP class is mocked below.
        with open(data_filepath, 'w') as data_file:
            data_file.write("Dummy geolocation data.")

        # create input:
        log_filepath = os.path.join(src_path, "tracking.log")
        with open(log_filepath, 'w') as log_file:
            log_file.write(self._create_event_log_line())
            log_file.write('\n')
            log_file.write(self._create_event_log_line(username="second_user", ip=FakeGeoLocation.ip_address_2))
            log_file.write('\n')

        end_date = '2014-04-01'
        task = UsersPerCountryReportWorkflow(
            mapreduce_engine='local',
            name='test',
            src=src_path,
            end_date=datetime.datetime.strptime(end_date, '%Y-%m-%d').date(),
            geolocation_data=data_filepath,
            counts=counts_path,
            report=report_path,
        )
        worker = luigi.worker.Worker()
        worker.add(task)
        # Patch pygeoip so the worker uses the fake geolocation object.
        with patch('edx.analytics.tasks.user_location.pygeoip') as mock_pygeoip:
            mock_pygeoip.GeoIP = Mock(return_value=FakeGeoLocation())
            worker.run()
        # NOTE(review): original indentation was lost in extraction; stop() is
        # assumed to run after the patch context exits -- confirm against history.
        worker.stop()

        output_lines = []
        with open(report_path) as report_file:
            output_lines = report_file.readlines()

        # Header plus one line per country (two users, one per country -> 50% each).
        self.assertEquals(len(output_lines), 3)
        self.assertEquals(output_lines[0].strip('\n'), UsersPerCountryReport.create_header(end_date))
        expected = UsersPerCountryReport.create_csv_entry(
            0.5, 1, FakeGeoLocation.country_name_1, FakeGeoLocation.country_code_1)
        self.assertEquals(output_lines[1].strip('\n'), expected)
        expected = UsersPerCountryReport.create_csv_entry(
            0.5, 1, FakeGeoLocation.country_name_2, FakeGeoLocation.country_code_2)
        self.assertEquals(output_lines[2].strip('\n'), expected)
edx/analytics/tasks/user_location.py
0 → 100644
View file @
3963d379
"""
Determine the number of users in each country.
"""
import
datetime
import
tempfile
import
luigi
import
pygeoip
import
edx.analytics.tasks.util.eventlog
as
eventlog
from
edx.analytics.tasks.mapreduce
import
MapReduceJobTask
from
edx.analytics.tasks.pathutil
import
PathSetTask
from
edx.analytics.tasks.url
import
get_target_from_url
,
url_path_join
,
ExternalURL
import
logging
# Module-level logger for this task module.
log = logging.getLogger(__name__)

# Placeholder values used when geolocation cannot resolve a country name or code.
UNKNOWN_COUNTRY = "UNKNOWN"
UNKNOWN_CODE = "UNKNOWN"
class BaseUserLocationTask(object):
    """
    Mixin declaring the parameters shared by the user-location tasks.

    Parameters:
        name: a unique identifier to distinguish one run from another.  It is used in
            the construction of output filenames, so each run will have distinct outputs.
        src: a URL to the root location of input tracking log files.
        dest: a URL to the root location to write output file(s).
        include: a list of patterns to be used to match input files, relative to `src` URL.
            The default value is ['*'].
        manifest: a URL to a file location that can store the complete set of input files.
        end_date: events before or on this date are kept, and after this date are filtered out.
        geolocation_data: a URL to the location of country-level geolocation data.
    """
    name = luigi.Parameter()
    src = luigi.Parameter()
    dest = luigi.Parameter()
    include = luigi.Parameter(is_list=True, default=('*',))
    # A manifest file is required by hadoop if there are too many
    # input paths.  It hits an operating system limit on the
    # number of arguments passed to the mapper process on the task nodes.
    manifest = luigi.Parameter(default=None)
    end_date = luigi.DateParameter()
    geolocation_data = luigi.Parameter()
class LastCountryForEachUser(MapReduceJobTask, BaseUserLocationTask):
    """Identifies the country of the last IP address associated with each user."""

    def __init__(self, *args, **kwargs):
        super(LastCountryForEachUser, self).__init__(*args, **kwargs)
        # end_datetime is midnight of the day after the day to be included.
        end_date_exclusive = self.end_date + datetime.timedelta(1)
        self.end_datetime = datetime.datetime(
            end_date_exclusive.year, end_date_exclusive.month, end_date_exclusive.day)

    def requires(self):
        """Depends on the set of input event files, plus the geolocation data file."""
        results = {
            'events': PathSetTask(self.src, self.include, self.manifest),
            'geoloc_data': ExternalURL(self.geolocation_data),
        }
        return results

    def requires_local(self):
        """The geolocation data file is needed locally (see init_reducer)."""
        return self.requires()['geoloc_data']

    def requires_hadoop(self):
        # Only pass the input files on to hadoop, not any data file.
        return self.requires()['events']

    def output(self):
        """Output directory under `dest`, made unique per run by `name`."""
        output_name = u'last_country_for_each_user_{name}/'.format(name=self.name)
        return get_target_from_url(url_path_join(self.dest, output_name))

    def mapper(self, line):
        """Yields (username, (timestamp, ip_address)) for each usable event line."""
        event = eventlog.parse_json_event(line)
        if event is None:
            return
        username = event.get('username')
        if not username:
            return
        # Usernames with surrounding whitespace are logged, then normalized.
        stripped_username = username.strip()
        if username != stripped_username:
            log.error("User '%s' has extra whitespace, which is being stripped. Event: %s", username, event)
            username = stripped_username
        timestamp_as_datetime = eventlog.get_event_time(event)
        if timestamp_as_datetime is None:
            return
        # Filter out events at or past the (exclusive) end datetime.
        if timestamp_as_datetime >= self.end_datetime:
            return
        timestamp = eventlog.datetime_to_timestamp(timestamp_as_datetime)
        ip_address = event.get('ip')
        if not ip_address:
            log.warning("No ip_address found for user '%s' on '%s'.", username, timestamp)
            return
        yield username, (timestamp, ip_address)

    def init_reducer(self):
        """Set up the GeoIP lookup object before the reducer runs."""
        # Copy the remote version of the geolocation data file to a local file.
        # This is required by the GeoIP call, which assumes that the data file is located
        # on a local file system.
        self.temporary_data_file = tempfile.NamedTemporaryFile(prefix='geolocation_data')
        with self.input()['geoloc_data'].open() as geolocation_data_input:
            while True:
                transfer_buffer = geolocation_data_input.read(1024)
                if transfer_buffer:
                    self.temporary_data_file.write(transfer_buffer)
                else:
                    break
        self.temporary_data_file.seek(0)
        self.geoip = pygeoip.GeoIP(self.temporary_data_file.name, pygeoip.STANDARD)

    def reducer(self, key, values):
        """Outputs country for last ip address associated with a user."""
        # DON'T presort input values (by timestamp).  The data potentially takes up too
        # much memory.  Scan the input values instead.
        # We assume the timestamp values (strings) are in ISO
        # representation, so that they can be compared as strings.
        username = key
        last_ip = None
        last_timestamp = ""
        for timestamp, ip_address in values:
            if timestamp > last_timestamp:
                last_ip = ip_address
                last_timestamp = timestamp
        if not last_ip:
            return
        # This ip address might not provide a country name.
        try:
            country = self.geoip.country_name_by_addr(last_ip)
            code = self.geoip.country_code_by_addr(last_ip)
        except Exception:
            # Deliberately broad: any geolocation failure maps to UNKNOWN
            # rather than killing the reduce task.
            log.exception("Encountered exception getting country: user '%s', last_ip '%s' on '%s'.",
                          username, last_ip, last_timestamp)
            country = UNKNOWN_COUNTRY
            code = UNKNOWN_CODE
        if country is None or len(country.strip()) <= 0:
            log.error("No country found for user '%s', last_ip '%s' on '%s'.",
                      username, last_ip, last_timestamp)
            # TODO: try earlier IP addresses, if we find this happens much.
            country = UNKNOWN_COUNTRY
        if code is None:
            log.error("No code found for user '%s', last_ip '%s', country '%s' on '%s'.",
                      username, last_ip, country, last_timestamp)
            # TODO: try earlier IP addresses, if we find this happens much.
            code = UNKNOWN_CODE
        # Add the username for debugging purposes.  (Not needed for counts.)
        yield (country, code), username

    def final_reducer(self):
        """Clean up after the reducer is done."""
        del self.geoip
        self.temporary_data_file.close()
        return tuple()

    def extra_modules(self):
        """Pygeoip is required by all tasks that load this file."""
        return [pygeoip]
class UsersPerCountry(MapReduceJobTask, BaseUserLocationTask):
    """
    Counts number of unique users per country, using a user's last IP address.

    Most parameters are passed through to :py:class:`LastCountryForEachUser`.

    Additional parameter:
        base_input_format: value of input_format to be passed to :py:class:`LastCountryForEachUser`.
    """
    base_input_format = luigi.Parameter(default=None)

    def requires(self):
        """Depends on the per-user country output over the same inputs."""
        return LastCountryForEachUser(
            mapreduce_engine=self.mapreduce_engine,
            lib_jar=self.lib_jar,
            input_format=self.base_input_format,
            n_reduce_tasks=self.n_reduce_tasks,
            src=self.src,
            dest=self.dest,
            include=self.include,
            name=self.name,
            manifest=self.manifest,
            geolocation_data=self.geolocation_data,
            end_date=self.end_date,
        )

    def output(self):
        """Output directory under `dest`, made unique per run by `name`."""
        per_country_dir = u'users_per_country_{name}/'.format(name=self.name)
        return get_target_from_url(url_path_join(self.dest, per_country_dir))

    def mapper(self, line):
        """Replace username with count of 1 for summing."""
        country, code, _ = line.split('\t')
        if len(country) > 0:
            yield (country, code), 1

    def reducer(self, key, values):
        """Sum counts over countries, and append date of current run to each entry."""
        total_for_country = sum(values)
        yield key, total_for_country, self.end_date

    # The size of mapper outputs can be shrunk by defining the
    # combiner to generate sums for each country coming out of each
    # mapper.  The reducer then only needs to sum the partial sums.
    combiner = reducer

    def extra_modules(self):
        """Pygeoip is required by all tasks that load this file."""
        return [pygeoip]
class UsersPerCountryReport(luigi.Task):
    """
    Calculates TSV file containing number of users per country.

    Parameters:
        counts: Location of counts per country.  The format is a hadoop
            tsv file, with fields country, count, and date.
        report: Location of the resulting report.  The output format is a
            excel csv file with country and count.
    """
    counts = luigi.Parameter()
    report = luigi.Parameter()

    def requires(self):
        """The counts input is produced externally (e.g. by UsersPerCountry)."""
        return ExternalURL(self.counts)

    def output(self):
        return get_target_from_url(self.report)

    @classmethod
    def create_header(cls, date):
        """Generate a header for CSV output."""
        fields = ['percent', 'count', 'country', 'code', 'date={date}'.format(date=date)]
        return '\t'.join(fields)

    @classmethod
    def create_csv_entry(cls, percent, count, country, code):
        """Generate a single entry in CSV format."""
        return "{percent:.2%}\t{count}\t{country}\t{code}".format(
            percent=percent, count=count, country=country, code=code)

    def run(self):
        """Read country counts, then write them out sorted by descending count."""
        # Provide default values for when no counts are available.
        counts = []
        date = "UNKNOWN"
        total = 0
        with self.input().open('r') as input_file:
            for line in input_file.readlines():
                # Each input line is country \t code \t count \t date;
                # the last-seen date wins (all lines carry the same run date).
                country, code, count, date = line.split('\t')
                counts.append((count, country, code))
                date = date.strip()
                total += int(count)

        # Write out the counts as a CSV, in reverse order of counts.
        with self.output().open('w') as output_file:
            output_file.write(self.create_header(date))
            output_file.write('\n')
            # Sort numerically on the (string) count field, largest first.
            for count, country, code in sorted(counts, reverse=True, key=lambda k: int(k[0])):
                percent = float(count) / float(total)
                output_file.write(self.create_csv_entry(percent, count, country, code))
                output_file.write('\n')

    def extra_modules(self):
        """Pygeoip is required by all tasks that load this file."""
        return [pygeoip]
class UsersPerCountryReportWorkflow(UsersPerCountryReport):
    """
    Generates report containing number of users per location (country).

    Most parameters are passed through to :py:class:`LastCountryForEachUser`
    via :py:class:`UsersPerCountry`.  These are:

        name: a unique identifier to distinguish one run from another.  It is used in
            the construction of output filenames, so each run will have distinct outputs.
        src: a URL to the root location of input tracking log files.
        include: a list of patterns to be used to match input files, relative to `src` URL.
            The default value is ['*'].
        dest: a URL to the root location to write output file(s).
        manifest: a URL to a file location that can store the complete set of input files.
        end_date: events before or on this date are kept, and after this date are filtered out.
        geolocation_data: a URL to the location of country-level geolocation data.

    Additional optional parameters are passed through to :py:class:`UsersPerCountry`:

        mapreduce_engine: 'hadoop' (the default) or 'local'.
        input_format: override the input_format for Hadoop job to use.  For example, when
            running with manifest file above, specify "oddjob.ManifestTextInputFormat" for input_format.
        lib_jar: points to jar defining input_format, if any.
        n_reduce_tasks: number of reducer tasks to use in upstream tasks.

    Additional parameters are passed through to :py:class:`UsersPerCountryReport`:

        counts: Location of counts per country.  The format is a hadoop
            tsv file, with fields country, count, and date.
        report: Location of the resulting report.  The output format is a
            excel csv file with country and count.
    """
    name = luigi.Parameter()
    src = luigi.Parameter()
    include = luigi.Parameter(is_list=True, default=('*',))
    manifest = luigi.Parameter(default=None)
    base_input_format = luigi.Parameter(default=None)
    end_date = luigi.DateParameter()
    geolocation_data = luigi.Parameter()
    mapreduce_engine = luigi.Parameter(
        default_from_config={'section': 'map-reduce', 'name': 'engine'}
    )
    # NOTE(review): a mutable [] default on a class-level Parameter is shared
    # across instances; luigi list parameters elsewhere in this file use a
    # tuple -- confirm whether () is acceptable here.
    lib_jar = luigi.Parameter(is_list=True, default=[])
    n_reduce_tasks = luigi.Parameter(default=25)

    def requires(self):
        """Depends on the per-country counts, written into the `counts` location."""
        return UsersPerCountry(
            mapreduce_engine=self.mapreduce_engine,
            lib_jar=self.lib_jar,
            base_input_format=self.base_input_format,
            n_reduce_tasks=self.n_reduce_tasks,
            src=self.src,
            # The counts location doubles as the upstream task's dest.
            dest=self.counts,
            include=self.include,
            name=self.name,
            manifest=self.manifest,
            geolocation_data=self.geolocation_data,
            end_date=self.end_date,
        )
requirements/default.txt
View file @
3963d379
...
@@ -7,6 +7,7 @@ numpy==1.8.0
...
@@ -7,6 +7,7 @@ numpy==1.8.0
oursql==0.9.3.1
oursql==0.9.3.1
pandas==0.13.0
pandas==0.13.0
pbr==0.5.23
pbr==0.5.23
pygeoip==0.3.1
python-cjson==1.0.5
python-cjson==1.0.5
stevedore==0.14.1
stevedore==0.14.1
tornado==3.1.1
tornado==3.1.1
...
...
setup.cfg
View file @
3963d379
...
@@ -32,6 +32,7 @@ edx.analytics.tasks =
...
@@ -32,6 +32,7 @@ edx.analytics.tasks =
sqoop-import = edx.analytics.tasks.sqoop:SqoopImportFromMysql
sqoop-import = edx.analytics.tasks.sqoop:SqoopImportFromMysql
dump-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseTask
dump-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseTask
export-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseAfterImportWorkflow
export-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseAfterImportWorkflow
last-country = edx.analytics.tasks.user_location:LastCountryForEachUser
mapreduce.engine =
mapreduce.engine =
hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner
hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment