Commit e6e930c0 by Brian Wilson

Calculate number of users per country.

Based on IP geolocation of the last event seen for each user.
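
For illustration, the report produced at the end of this workflow has a header row followed
by one row per country, in descending order of user count. Columns are tab-separated in the
actual file; the counts below are invented:

percent	count	country	code	date=2014-04-01
55.84%	43	Country_2	Code_2
44.16%	34	Country_1	Code_1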

Change-Id: Ib85b42ed6062e4cd9cd0efcb96dd4729b6feccff
parent 3929cee0
"""
Tests for user geolocation tasks.
"""
import datetime
import json
import tempfile
import os
import shutil
import textwrap
from mock import Mock, MagicMock, patch
import luigi.worker
from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.user_location import LastCountryForEachUser
from edx.analytics.tasks.user_location import UsersPerCountry
from edx.analytics.tasks.user_location import UsersPerCountryReport
from edx.analytics.tasks.user_location import UsersPerCountryReportWorkflow
from edx.analytics.tasks.user_location import UNKNOWN_COUNTRY, UNKNOWN_CODE
from edx.analytics.tasks.tests.target import FakeTarget
class FakeGeoLocation(object):
"""Fake version of pygeoip.GeoIp() instance for use in testing."""
ip_address_1 = "123.45.67.89"
ip_address_2 = "98.76.54.123"
country_name_1 = "COUNTRY NAME 1"
country_code_1 = "COUNTRY CODE 1"
country_name_2 = "COUNTRY NAME 2"
country_code_2 = "COUNTRY CODE 2"
def country_name_by_addr(self, ip_address):
"""Generates a country name if ip address is recognized."""
country_name_map = {
self.ip_address_1: self.country_name_1,
self.ip_address_2: self.country_name_2,
}
return country_name_map.get(ip_address)
def country_code_by_addr(self, ip_address):
"""Generates a country code if ip address is recognized."""
country_code_map = {
self.ip_address_1: self.country_code_1,
self.ip_address_2: self.country_code_2,
}
return country_code_map.get(ip_address)
class BaseUserLocationEventTestCase(unittest.TestCase):
"""Provides create-event functionality for testing user location tasks."""
username = 'test_user'
timestamp = "2013-12-17T15:38:32.805444"
ip_address = FakeGeoLocation.ip_address_1
def _create_event_log_line(self, **kwargs):
"""Create an event log with test values, as a JSON string."""
return json.dumps(self._create_event_dict(**kwargs))
def _create_event_dict(self, **kwargs):
"""Create an event log with test values, as a dict."""
# Define default values for event log entry.
event_dict = {
"username": self.username,
"time": "{0}+00:00".format(self.timestamp),
"ip": self.ip_address,
}
event_dict.update(**kwargs)
return event_dict
class LastCountryForEachUserMapperTestCase(BaseUserLocationEventTestCase):
"""Tests of LastCountryForEachUser.mapper()"""
def setUp(self):
self.task = LastCountryForEachUser(
mapreduce_engine='local',
name='test',
src='test://input/',
dest='test://output/',
end_date=datetime.datetime.strptime('2014-04-01', '%Y-%m-%d').date(),
geolocation_data='test://data/data.file',
)
def assert_no_output_for(self, line):
"""Assert that an input line generates no output."""
self.assertEquals(tuple(self.task.mapper(line)), tuple())
def test_unparseable_event(self):
line = 'this is garbage'
self.assert_no_output_for(line)
def test_bad_datetime(self):
line = self._create_event_log_line(time='this is a bogus time')
self.assert_no_output_for(line)
def test_after_end_date(self):
line = self._create_event_log_line(time="2015-12-17T15:38:32.805444")
self.assert_no_output_for(line)
def test_missing_username(self):
event_dict = self._create_event_dict()
del event_dict['username']
line = json.dumps(event_dict)
self.assert_no_output_for(line)
def test_missing_ip_address(self):
event_dict = self._create_event_dict()
del event_dict['ip']
line = json.dumps(event_dict)
self.assert_no_output_for(line)
def test_good_event(self):
line = self._create_event_log_line()
event = tuple(self.task.mapper(line))
expected = ((self.username, (self.timestamp, self.ip_address)),)
self.assertEquals(event, expected)
def test_username_with_newline(self):
line = self._create_event_log_line(username="baduser\n")
event = tuple(self.task.mapper(line))
expected = (("baduser", (self.timestamp, self.ip_address)),)
self.assertEquals(event, expected)
class LastCountryForEachUserReducerTestCase(unittest.TestCase):
"""Tests of LastCountryForEachUser.reducer()"""
def setUp(self):
self.username = "test_user"
self.timestamp = "2013-12-17T15:38:32.805444"
self.earlier_timestamp = "2013-12-15T15:38:32.805444"
self.task = LastCountryForEachUser(
mapreduce_engine='local',
name='test',
src='test://input/',
dest='test://output/',
end_date=datetime.datetime.strptime('2014-04-01', '%Y-%m-%d').date(),
geolocation_data='test://data/data.file',
)
self.task.geoip = FakeGeoLocation()
def _get_reducer_output(self, values):
"""Run reducer with provided values hardcoded key."""
return tuple(self.task.reducer(self.username, values))
def _check_output(self, inputs, expected):
"""Compare generated with expected output."""
self.assertEquals(self._get_reducer_output(inputs), expected)
def test_no_ip(self):
self._check_output([], tuple())
def test_single_ip(self):
inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
expected = (((FakeGeoLocation.country_name_1, FakeGeoLocation.country_code_1), self.username),)
self._check_output(inputs, expected)
def test_multiple_ip(self):
inputs = [
(self.earlier_timestamp, FakeGeoLocation.ip_address_1),
(self.timestamp, FakeGeoLocation.ip_address_2),
]
expected = (((FakeGeoLocation.country_name_2, FakeGeoLocation.country_code_2), self.username),)
self._check_output(inputs, expected)
def test_multiple_ip_in_different_order(self):
inputs = [
(self.timestamp, FakeGeoLocation.ip_address_2),
(self.earlier_timestamp, FakeGeoLocation.ip_address_1),
]
expected = (((FakeGeoLocation.country_name_2, FakeGeoLocation.country_code_2), self.username),)
self._check_output(inputs, expected)
def test_country_name_exception(self):
self.task.geoip.country_name_by_addr = Mock(side_effect=Exception)
inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
expected = (((UNKNOWN_COUNTRY, UNKNOWN_CODE), self.username),)
self._check_output(inputs, expected)
def test_country_code_exception(self):
self.task.geoip.country_code_by_addr = Mock(side_effect=Exception)
inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
expected = (((UNKNOWN_COUNTRY, UNKNOWN_CODE), self.username),)
self._check_output(inputs, expected)
def test_missing_country_name(self):
self.task.geoip.country_name_by_addr = Mock(return_value=None)
inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
expected = (((UNKNOWN_COUNTRY, FakeGeoLocation.country_code_1), self.username),)
self._check_output(inputs, expected)
def test_empty_country_name(self):
self.task.geoip.country_name_by_addr = Mock(return_value=" ")
inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
expected = (((UNKNOWN_COUNTRY, FakeGeoLocation.country_code_1), self.username),)
self._check_output(inputs, expected)
def test_missing_country_code(self):
self.task.geoip.country_code_by_addr = Mock(return_value="")
inputs = [(self.timestamp, FakeGeoLocation.ip_address_1)]
expected = (((FakeGeoLocation.country_name_1, ""), self.username),)
self._check_output(inputs, expected)
class UsersPerCountryTestCase(unittest.TestCase):
"""Tests of UsersPerCountry."""
def setUp(self):
self.end_date = '2014-04-01'
self.task = UsersPerCountry(
mapreduce_engine='local',
name='test',
src='test://input/',
dest='test://output/',
end_date=self.end_date,
geolocation_data='test://data/data.file',
)
def _create_input_line(self, country, code, username):
"""Generates input matching what LastCountryForEachUser.reducer() would produce."""
return "{country}\t{code}\t{username}".format(country=country, code=code, username=username)
def test_mapper_on_normal(self):
line = self._create_input_line("COUNTRY", "CODE", "USER")
self.assertEquals(tuple(self.task.mapper(line)), ((('COUNTRY', 'CODE'), 1),))
def test_mapper_with_empty_country(self):
line = self._create_input_line("", "CODE", "USER")
self.assertEquals(tuple(self.task.mapper(line)), tuple())
def test_reducer(self):
key = ("Country_1", "Code_1")
values = [34, 29, 102]
expected = ((key, sum(values), self.end_date),)
self.assertEquals(tuple(self.task.reducer(key, values)), expected)
class UsersPerCountryReportTestCase(unittest.TestCase):
"""Tests of UsersPerCountryReport."""
def run_task(self, counts):
"""
Run task with fake targets.
Returns:
the task output as a string.
"""
task = UsersPerCountryReport(counts='fake_counts', report='fake_report')
def reformat(string):
"""Reformat string to make it like a TSV."""
return textwrap.dedent(string).strip().replace(' ', '\t')
task.input = MagicMock(return_value=FakeTarget(reformat(counts)))
output_target = FakeTarget()
task.output = MagicMock(return_value=output_target)
task.run()
return output_target.buffer.read()
def test_report(self):
date = '2014-04-01'
# Provide the counts in ascending order, to confirm that the report sorts them in descending order.
counts = """
Country_1 Code_1 34 {date}
Country_2 Code_2 43 {date}
""".format(date=date)
output = self.run_task(counts)
output_lines = output.split('\n')
self.assertEquals(output_lines[0], UsersPerCountryReport.create_header(date))
self.assertEquals(
output_lines[1], UsersPerCountryReport.create_csv_entry(float(43) / 77, 43, "Country_2", "Code_2")
)
self.assertEquals(
output_lines[2], UsersPerCountryReport.create_csv_entry(float(34) / 77, 34, "Country_1", "Code_1")
)
class UsersPerCountryReportWorkflowTestCase(BaseUserLocationEventTestCase):
"""Tests of UsersPerCountryReportWorkflow."""
def setUp(self):
# Create a real temporary directory, and remove it (if it still exists) when the test finishes.
def cleanup(dirname):
"""Remove the temp directory only if it exists."""
if os.path.exists(dirname):
shutil.rmtree(dirname)
self.temp_rootdir = tempfile.mkdtemp()
self.addCleanup(cleanup, self.temp_rootdir)
def test_workflow(self):
# set up directories:
src_path = os.path.join(self.temp_rootdir, "src")
os.mkdir(src_path)
counts_path = os.path.join(self.temp_rootdir, "counts")
os.mkdir(counts_path)
report_path = os.path.join(self.temp_rootdir, "report.csv")
data_filepath = os.path.join(self.temp_rootdir, "geoloc.dat")
with open(data_filepath, 'w') as data_file:
data_file.write("Dummy geolocation data.")
# create input:
log_filepath = os.path.join(src_path, "tracking.log")
with open(log_filepath, 'w') as log_file:
log_file.write(self._create_event_log_line())
log_file.write('\n')
log_file.write(self._create_event_log_line(username="second_user", ip=FakeGeoLocation.ip_address_2))
log_file.write('\n')
end_date = '2014-04-01'
task = UsersPerCountryReportWorkflow(
mapreduce_engine='local',
name='test',
src=src_path,
end_date=datetime.datetime.strptime(end_date, '%Y-%m-%d').date(),
geolocation_data=data_filepath,
counts=counts_path,
report=report_path,
)
worker = luigi.worker.Worker()
worker.add(task)
with patch('edx.analytics.tasks.user_location.pygeoip') as mock_pygeoip:
mock_pygeoip.GeoIP = Mock(return_value=FakeGeoLocation())
worker.run()
worker.stop()
output_lines = []
with open(report_path) as report_file:
output_lines = report_file.readlines()
self.assertEquals(len(output_lines), 3)
self.assertEquals(output_lines[0].strip('\n'), UsersPerCountryReport.create_header(end_date))
expected = UsersPerCountryReport.create_csv_entry(
0.5, 1, FakeGeoLocation.country_name_1, FakeGeoLocation.country_code_1
)
self.assertEquals(output_lines[1].strip('\n'), expected)
expected = UsersPerCountryReport.create_csv_entry(
0.5, 1, FakeGeoLocation.country_name_2, FakeGeoLocation.country_code_2
)
self.assertEquals(output_lines[2].strip('\n'), expected)
"""
Determine the number of users in each country.
"""
import datetime
import tempfile
import luigi
import pygeoip
import edx.analytics.tasks.util.eventlog as eventlog
from edx.analytics.tasks.mapreduce import MapReduceJobTask
from edx.analytics.tasks.pathutil import PathSetTask
from edx.analytics.tasks.url import get_target_from_url, url_path_join, ExternalURL
import logging
log = logging.getLogger(__name__)
UNKNOWN_COUNTRY = "UNKNOWN"
UNKNOWN_CODE = "UNKNOWN"
class BaseUserLocationTask(object):
"""
Parameters:
name: a unique identifier to distinguish one run from another. It is used in
the construction of output filenames, so each run will have distinct outputs.
src: a URL to the root location of input tracking log files.
dest: a URL to the root location to write output file(s).
include: a list of patterns to be used to match input files, relative to `src` URL.
The default value is ['*'].
manifest: a URL to a file location that can store the complete set of input files.
end_date: events before or on this date are kept, and after this date are filtered out.
geolocation_data: a URL to the location of country-level geolocation data.
"""
name = luigi.Parameter()
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
# A manifest file is required by Hadoop when there are too many
# input paths; without one, the job hits an operating-system limit on the
# number of arguments passed to the mapper process on the task nodes.
manifest = luigi.Parameter(default=None)
end_date = luigi.DateParameter()
geolocation_data = luigi.Parameter()
class LastCountryForEachUser(MapReduceJobTask, BaseUserLocationTask):
"""Identifies the country of the last IP address associated with each user."""
def __init__(self, *args, **kwargs):
super(LastCountryForEachUser, self).__init__(*args, **kwargs)
# end_datetime is midnight of the day after the day to be included.
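# For example, an end_date of 2014-04-01 gives an end_datetime of 2014-04-02 00:00:00, so all events on 2014-04-01 are included.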
end_date_exclusive = self.end_date + datetime.timedelta(1)
self.end_datetime = datetime.datetime(end_date_exclusive.year, end_date_exclusive.month, end_date_exclusive.day)
def requires(self):
results = {
'events': PathSetTask(self.src, self.include, self.manifest),
'geoloc_data': ExternalURL(self.geolocation_data),
}
return results
def requires_local(self):
return self.requires()['geoloc_data']
def requires_hadoop(self):
# Only pass the input files on to hadoop, not any data file.
return self.requires()['events']
def output(self):
output_name = u'last_country_for_each_user_{name}/'.format(name=self.name)
return get_target_from_url(url_path_join(self.dest, output_name))
def mapper(self, line):
event = eventlog.parse_json_event(line)
if event is None:
return
username = event.get('username')
if not username:
return
stripped_username = username.strip()
if username != stripped_username:
log.error("User '%s' has extra whitespace, which is being stripped. Event: %s", username, event)
username = stripped_username
timestamp_as_datetime = eventlog.get_event_time(event)
if timestamp_as_datetime is None:
return
if timestamp_as_datetime >= self.end_datetime:
return
timestamp = eventlog.datetime_to_timestamp(timestamp_as_datetime)
ip_address = event.get('ip')
if not ip_address:
log.warning("No ip_address found for user '%s' on '%s'.", username, timestamp)
return
yield username, (timestamp, ip_address)
def init_reducer(self):
# Copy the remote version of the geolocation data file to a local file.
# This is required by the GeoIP call, which assumes that the data file is located
# on a local file system.
self.temporary_data_file = tempfile.NamedTemporaryFile(prefix='geolocation_data')
with self.input()['geoloc_data'].open() as geolocation_data_input:
while True:
transfer_buffer = geolocation_data_input.read(1024)
if transfer_buffer:
self.temporary_data_file.write(transfer_buffer)
else:
break
self.temporary_data_file.seek(0)
self.geoip = pygeoip.GeoIP(self.temporary_data_file.name, pygeoip.STANDARD)
def reducer(self, key, values):
"""Outputs country for last ip address associated with a user."""
# DON'T presort input values (by timestamp). The data potentially takes up too
# much memory. Scan the input values instead.
# We assume the timestamp values (strings) are in ISO
# representation, so that they can be compared as strings.
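# For example, "2013-12-15T15:38:32" < "2013-12-17T15:38:32" holds for the strings as well as for the times they represent.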
username = key
last_ip = None
last_timestamp = ""
for timestamp, ip_address in values:
if timestamp > last_timestamp:
last_ip = ip_address
last_timestamp = timestamp
if not last_ip:
return
# This ip address might not provide a country name.
try:
country = self.geoip.country_name_by_addr(last_ip)
code = self.geoip.country_code_by_addr(last_ip)
except Exception:
log.exception("Encountered exception getting country: user '%s', last_ip '%s' on '%s'.",
username, last_ip, last_timestamp)
country = UNKNOWN_COUNTRY
code = UNKNOWN_CODE
if country is None or len(country.strip()) <= 0:
log.error("No country found for user '%s', last_ip '%s' on '%s'.", username, last_ip, last_timestamp)
# TODO: try earlier IP addresses, if we find this happens much.
country = UNKNOWN_COUNTRY
if code is None:
log.error("No code found for user '%s', last_ip '%s', country '%s' on '%s'.",
username, last_ip, country, last_timestamp)
# TODO: try earlier IP addresses, if we find this happens much.
code = UNKNOWN_CODE
# Add the username for debugging purposes. (Not needed for counts.)
yield (country, code), username
def final_reducer(self):
"""Clean up after the reducer is done."""
del self.geoip
self.temporary_data_file.close()
return tuple()
def extra_modules(self):
"""Pygeoip is required by all tasks that load this file."""
return [pygeoip]
class UsersPerCountry(MapReduceJobTask, BaseUserLocationTask):
"""
Counts the number of unique users per country, using each user's last-seen IP address.
Most parameters are passed through to :py:class:`LastCountryForEachUser`.
Additional parameter:
base_input_format: value of input_format to be passed to :py:class:`LastCountryForEachUser`.
"""
base_input_format = luigi.Parameter(default=None)
def requires(self):
return LastCountryForEachUser(
mapreduce_engine=self.mapreduce_engine,
lib_jar=self.lib_jar,
input_format=self.base_input_format,
n_reduce_tasks=self.n_reduce_tasks,
src=self.src,
dest=self.dest,
include=self.include,
name=self.name,
manifest=self.manifest,
geolocation_data=self.geolocation_data,
end_date=self.end_date,
)
def output(self):
output_name = u'users_per_country_{name}/'.format(name=self.name)
return get_target_from_url(url_path_join(self.dest, output_name))
def mapper(self, line):
"""Replace username with count of 1 for summing."""
country, code, _username = line.split('\t')
if len(country) > 0:
yield (country, code), 1
def reducer(self, key, values):
"""Sum counts over countries, and append date of current run to each entry."""
yield key, sum(values), self.end_date
# The size of mapper outputs can be shrunk by defining the
# combiner to generate sums for each country coming out of each
# mapper. The reducer then only needs to sum the partial sums.
combiner = reducer
def extra_modules(self):
"""Pygeoip is required by all tasks that load this file."""
return [pygeoip]
class UsersPerCountryReport(luigi.Task):
"""
Generates a TSV report containing the number of users per country.
Parameters:
counts: Location of counts per country. The format is a Hadoop
TSV file, with fields country, code, count, and date.
report: Location of the resulting report. The output format is an
Excel-compatible CSV file with fields percent, count, country, and code.
"""
counts = luigi.Parameter()
report = luigi.Parameter()
def requires(self):
return ExternalURL(self.counts)
def output(self):
return get_target_from_url(self.report)
@classmethod
def create_header(cls, date):
"""Generate a header for CSV output."""
fields = ['percent', 'count', 'country', 'code', 'date={date}'.format(date=date)]
return '\t'.join(fields)
@classmethod
def create_csv_entry(cls, percent, count, country, code):
"""Generate a single entry in CSV format."""
return "{percent:.2%}\t{count}\t{country}\t{code}".format(
percent=percent, count=count, country=country, code=code
)
def run(self):
# Provide default values for when no counts are available.
counts = []
date = "UNKNOWN"
total = 0
with self.input().open('r') as input_file:
for line in input_file.readlines():
country, code, count, date = line.split('\t')
counts.append((count, country, code))
date = date.strip()
total += int(count)
# Write out the counts as CSV entries, in descending order of count.
with self.output().open('w') as output_file:
output_file.write(self.create_header(date))
output_file.write('\n')
for count, country, code in sorted(counts, reverse=True, key=lambda k: int(k[0])):
percent = float(count) / float(total)
output_file.write(self.create_csv_entry(percent, count, country, code))
output_file.write('\n')
def extra_modules(self):
"""Pygeoip is required by all tasks that load this file."""
return [pygeoip]
class UsersPerCountryReportWorkflow(UsersPerCountryReport):
"""
Generates a report containing the number of users per location (country).
Most parameters are passed through to :py:class:`LastCountryForEachUser`
via :py:class:`UsersPerCountry`. These are:
name: a unique identifier to distinguish one run from another. It is used in
the construction of output filenames, so each run will have distinct outputs.
src: a URL to the root location of input tracking log files.
include: a list of patterns to be used to match input files, relative to `src` URL.
The default value is ['*'].
dest: a URL to the root location to write output file(s).
manifest: a URL to a file location that can store the complete set of input files.
end_date: events before or on this date are kept, and after this date are filtered out.
geolocation_data: a URL to the location of country-level geolocation data.
Additional optional parameters are passed through to :py:class:`UsersPerCountry`:
mapreduce_engine: 'hadoop' (the default) or 'local'.
base_input_format: override the input_format for the Hadoop job to use. For example, when
running with the manifest file above, specify "oddjob.ManifestTextInputFormat" as the input format.
lib_jar: points to the jar defining the input_format, if any.
n_reduce_tasks: number of reducer tasks to use in upstream tasks.
Additional parameters are passed through to :py:class:`UsersPerCountryReport`:
counts: Location of counts per country. The format is a Hadoop
TSV file, with fields country, code, count, and date.
report: Location of the resulting report. The output format is an
Excel-compatible CSV file with fields percent, count, country, and code.
"""
name = luigi.Parameter()
src = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
manifest = luigi.Parameter(default=None)
base_input_format = luigi.Parameter(default=None)
end_date = luigi.DateParameter()
geolocation_data = luigi.Parameter()
mapreduce_engine = luigi.Parameter(
default_from_config={'section': 'map-reduce', 'name': 'engine'}
)
lib_jar = luigi.Parameter(is_list=True, default=[])
n_reduce_tasks = luigi.Parameter(default=25)
def requires(self):
return UsersPerCountry(
mapreduce_engine=self.mapreduce_engine,
lib_jar=self.lib_jar,
base_input_format=self.base_input_format,
n_reduce_tasks=self.n_reduce_tasks,
src=self.src,
dest=self.counts,
include=self.include,
name=self.name,
manifest=self.manifest,
geolocation_data=self.geolocation_data,
end_date=self.end_date,
)
@@ -7,6 +7,7 @@ numpy==1.8.0
 oursql==0.9.3.1
 pandas==0.13.0
 pbr==0.5.23
+pygeoip==0.3.1
 python-cjson==1.0.5
 stevedore==0.14.1
 tornado==3.1.1
@@ -32,6 +32,7 @@ edx.analytics.tasks =
 sqoop-import = edx.analytics.tasks.sqoop:SqoopImportFromMysql
 dump-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseTask
 export-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseAfterImportWorkflow
+last-country = edx.analytics.tasks.user_location:LastCountryForEachUser
 mapreduce.engine =
 hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner