Commit e72d3ac3 by Gabe Mulley

Gather user registration data

* Add support for gathering data from MySQL databases.
* Gather user registration data from a database that is expected to conform to the LMS schema.

Fixes: AN-515

Change-Id: Ic0cfd2d608d209f33a29a192dac041a9810805ba
parent 54f60b75
...@@ -6,6 +6,10 @@ install: requirements ...@@ -6,6 +6,10 @@ install: requirements
develop: requirements develop: requirements
python setup.py develop python setup.py develop
system-requirements:
sudo apt-get update -q
sudo apt-get install -y -q libmysqlclient-dev
requirements: requirements:
pip install -r requirements/default.txt pip install -r requirements/default.txt
......
"""
Gather data using SQL queries run on MySQL databases.
"""
from __future__ import absolute_import
from contextlib import closing
import csv
import datetime
import json
import luigi
import oursql
from edx.analytics.tasks.url import ExternalURL
from edx.analytics.tasks.url import get_target_from_url
from edx.analytics.tasks.url import url_path_join
class MysqlTask(luigi.Task):
"""
A task that reads data out of a MySQL database and writes it to a file in TSV format. In order to protect the
database access credentials they are loaded from an external file which can be secured appropriately. The
credentials file is expected to be JSON formatted and contain a simple map specifying the host, port, username
password and database.
Parameters:
credentials: Path to the external access credentials file.
destination: The directory to write the TSV file to.
Example Credentials File::
{
"host": "db.example.com",
"port": "3306",
"username": "exampleuser",
"password": "example password",
"database": "exampledata"
}
"""
# TODO: Defaults from config file
credentials = luigi.Parameter()
destination = luigi.Parameter()
converters = [
(datetime.date, lambda date: date.strftime('%Y-%m-%d'))
]
@property
def query(self):
"""
The SQL query to execute on the database. Results will be streamed in to the TSV output file. Parameters
can be marked by a question mark (?) and are populated using the tuple returned by the property
`query_parameters`.
Returns:
A string containing the SQL query.
"""
return "SELECT 1"
@property
def query_parameters(self):
"""
Parameters to pass in to the query. Each element of the tuple will be substituted in to the corresponding
? parameter locator in the query. The first element will replace the first question mark, the second will
replace the second etc.
Returns:
A tuple.
"""
return tuple()
@property
def filename(self):
"""
The name of the output file in the destination directory.
Returns:
A string containing the file name.
"""
raise NotImplementedError # pragma: no cover
def requires(self):
return {
'credentials': ExternalURL(url=self.credentials)
}
def output(self):
url_with_filename = url_path_join(self.destination, self.filename)
return get_target_from_url(url_with_filename)
def run(self):
with self.connect() as conn:
with conn.cursor() as cursor:
cursor.execute(self.query, self.query_parameters)
with self.output().open('w') as output_file: # pylint: disable=maybe-no-member
self.write_results_to_tsv(cursor, output_file)
def connect(self):
"""
Gathers the secure connection parameters from an external file and uses them to establish a connection to the
MySQL database specified in the secure parameters.
Returns:
A context manager that holds open a connection to the database while inside the scope.
"""
cred = {}
with self.input()['credentials'].open('r') as credentials_file:
cred = json.load(credentials_file)
conn = oursql.connect(
host=cred['host'],
user=cred['username'],
passwd=cred['password'],
db=cred['database'],
port=int(cred['port'])
)
return closing(conn)
def write_results_to_tsv(self, cursor, output_file):
"""
Streams results from the cursor to the output file in TSV format.
Args:
cursor (oursql.Cursor): A cursor that has already been used to execute an SQL query and now can be used
to fetch results.
output_file (file): A file-like object that the records will be written to.
"""
writer = csv.writer(output_file, delimiter="\t", quoting=csv.QUOTE_NONE)
while True:
row = cursor.fetchone()
if row is None:
break
# column values are native python objects, convert them to strings before writing them to the file
writer.writerow([self.convert(v) for v in row])
def convert(self, value):
"""
Converts a python object in to a string that is safe to write out to the TSV file. If an explicit converter
is not available for a given type then it is converted using `str()`.
Args:
value (obj): A python object.
Returns:
A string representation of the object or - if `value` is None.
"""
converted_value = u'-'
if value is not None:
converter = lambda x: x # Noop
for converter_spec in self.converters:
if isinstance(value, converter_spec[0]):
converter = converter_spec[1]
break
converted_value = converter(value)
return unicode(converted_value).encode('utf-8')
def mysql_datetime(datetime_object):
"""
Convert a python datetime object to a string that can be parsed by MySQL as a timestamp.
Args:
datetime_object (datetime.datetime): The native python representation of the timestamp.
Returns:
A string representation of the timestamp in the form "YYYY-MM-DD hh:mm:ss" (e.g. "2014-01-30 22:30:05").
"""
return datetime_object.strftime('%Y-%m-%d %H:%M:%S')
from contextlib import contextmanager
import datetime import datetime
import textwrap import textwrap
from StringIO import StringIO from StringIO import StringIO
...@@ -10,28 +9,11 @@ from numpy import isnan ...@@ -10,28 +9,11 @@ from numpy import isnan
import pandas import pandas
from edx.analytics.tasks.tests import unittest from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.tests.target import FakeTarget
from edx.analytics.tasks.reports.enrollments import EnrollmentsByWeek from edx.analytics.tasks.reports.enrollments import EnrollmentsByWeek
from edx.analytics.tasks.reports.enrollments import ExternalURL from edx.analytics.tasks.reports.enrollments import ExternalURL
class FakeTarget(object):
"""
Fake luigi like target that saves data in memory, using a
StringIO buffer.
"""
def __init__(self, value=''):
self.buffer = StringIO(value)
# Rewind the buffer head so the value can be read
self.buffer.seek(0)
@contextmanager
def open(self, *args, **kwargs):
yield self.buffer
# Rewind the head for easy reading
self.buffer.seek(0)
class TestEnrollmentsByWeek(unittest.TestCase): class TestEnrollmentsByWeek(unittest.TestCase):
def run_task(self, source, date, weeks, offset=None, statuses=None): def run_task(self, source, date, weeks, offset=None, statuses=None):
......
"""Tests for Total Users and Enrollment report.""" """Tests for Total Users and Enrollment report."""
from contextlib import contextmanager
import datetime import datetime
import textwrap import textwrap
from StringIO import StringIO from StringIO import StringIO
...@@ -13,23 +12,7 @@ import pandas ...@@ -13,23 +12,7 @@ import pandas
from edx.analytics.tasks.reports.total_enrollments import TotalUsersAndEnrollmentsByWeek, TOTAL_ENROLLMENT_ROWNAME from edx.analytics.tasks.reports.total_enrollments import TotalUsersAndEnrollmentsByWeek, TOTAL_ENROLLMENT_ROWNAME
from edx.analytics.tasks.tests import unittest from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.tests.target import FakeTarget
class FakeTarget(object):
"""
Fake luigi like target that saves data in memory, using a
StringIO buffer.
"""
def __init__(self, value=''):
self.buffer = StringIO(value)
# Rewind the buffer head so the value can be read
self.buffer.seek(0)
@contextmanager
def open(self, *args, **kwargs):
yield self.buffer
# Rewind the head for easy reading
self.buffer.seek(0)
class TestTotalUsersAndEnrollmentsByWeek(unittest.TestCase): class TestTotalUsersAndEnrollmentsByWeek(unittest.TestCase):
......
"""
Emulates a luigi target, storing all data in memory.
"""
from contextlib import contextmanager
from StringIO import StringIO
class FakeTarget(object):
"""
Fake luigi like target that saves data in memory, using a
StringIO buffer.
"""
def __init__(self, value=''):
self.buffer = StringIO(value)
# Rewind the buffer head so the value can be read
self.buffer.seek(0)
@contextmanager
def open(self, *args, **kwargs): # pylint: disable=unused-argument
"""
Returns:
A file-like object that can be used to read the data that is stored in the buffer.
"""
try:
yield self.buffer
finally:
self.buffer.seek(0)
"""
Ensure we can read from MySQL data sources.
"""
from __future__ import absolute_import
import datetime
import textwrap
import luigi
from mock import MagicMock
from mock import patch
from mock import sentinel
from pandas import read_csv
from edx.analytics.tasks.mysql import MysqlTask
from edx.analytics.tasks.mysql import mysql_datetime
from edx.analytics.tasks.url import ExternalURL
from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.tests.target import FakeTarget
class ConversionTestCase(unittest.TestCase):
"""
Ensure we can reliably convert native python data types to strings.
"""
def setUp(self):
self.task = MysqlTask(
credentials=sentinel.ignored,
destination=sentinel.ignored
)
def test_convert_datetime(self):
self.assert_converted_string_equals(
datetime.datetime.strptime('2014-01-02', '%Y-%m-%d').date(), '2014-01-02'
)
def assert_converted_string_equals(self, obj, expected_string):
"""
Args:
obj (mixed): Any object.
expected_string (str): The expected string representation of `obj`.
Raises:
AssertionError: iff the string resulting from converting `obj` to a string does not match the
expected string.
"""
self.assertEquals(self.task.convert(obj), expected_string)
def test_convert_integer(self):
self.assert_converted_string_equals(
10, '10'
)
def test_convert_none(self):
self.assert_converted_string_equals(
None, '-'
)
def test_convert_unicode(self):
self.assert_converted_string_equals(
u'\u0669(\u0361\u0e4f\u032f\u0361\u0e4f)\u06f6',
u'\u0669(\u0361\u0e4f\u032f\u0361\u0e4f)\u06f6'.encode('utf-8')
)
class MysqlTaskTestCase(unittest.TestCase):
"""
Ensure we can connect to and read data from MySQL data sources.
"""
def setUp(self):
patcher = patch('edx.analytics.tasks.mysql.oursql')
self.mock_oursql = patcher.start()
self.addCleanup(patcher.stop)
mock_conn = self.mock_oursql.connect.return_value # pylint: disable=maybe-no-member
mock_cursor_ctx = mock_conn.cursor.return_value
self.mock_cursor = mock_cursor_ctx.__enter__.return_value
# By default, emulate 0 results returned
self.mock_cursor.fetchone.return_value = None
def run_task(self, credentials=None, query=None):
"""
Emulate execution of a generic MysqlTask.
"""
if not credentials:
credentials = '''\
{
"host": "db.example.com",
"port": "3306",
"username": "exampleuser",
"password": "example password",
"database": "exampledata"
}'''
if not query:
query = 'SELECT 1'
# Create a dummy task that simply returns the parameters given
class TestTask(MysqlTask):
"""A generic MysqlTask that wraps the parameters from the enclosing function"""
@property
def query(self):
return query
@property
def filename(self):
return None # pragma: no cover
task = TestTask(
credentials=sentinel.ignored,
destination=sentinel.ignored
)
fake_input = {
'credentials': FakeTarget(textwrap.dedent(credentials))
}
task.input = MagicMock(return_value=fake_input)
output_target = FakeTarget()
task.output = MagicMock(return_value=output_target)
task.run()
try:
parsed = read_csv(output_target.buffer,
header=None,
sep="\t",
na_values=['-'],
encoding='utf-8')
except ValueError:
parsed = None
return parsed
def test_connect_with_missing_credentials(self):
with self.assertRaises(KeyError):
self.run_task('{}')
def test_connect_with_credential_syntax_error(self):
with self.assertRaises(ValueError):
self.run_task('{')
def test_connect_with_complete_credentials(self):
self.run_task()
def test_execute_query(self):
self.mock_cursor.fetchone.side_effect = [
(2L,),
(3L,),
(10L,),
None
]
output = self.run_task(query=sentinel.query)
self.mock_cursor.execute.assert_called_once_with(sentinel.query, tuple())
self.assertEquals(output[0][0], 2)
self.assertEquals(output[0][1], 3)
self.assertEquals(output[0][2], 10)
def test_unicode_results(self):
unicode_string = u'\u0669(\u0361\u0e4f\u032f\u0361\u0e4f)\u06f6'
self.mock_cursor.fetchone.side_effect = [
(unicode_string,),
None
]
output = self.run_task(query=sentinel.query)
self.assertEquals(output[0][0], unicode_string)
def test_default_attributes(self):
destination = 'file:///tmp/foo'
class GenericTask(MysqlTask):
"""A dummy task used to ensure defaults are reasonable"""
@property
def filename(self):
return 'bar'
task = GenericTask(
credentials=sentinel.credentials,
destination=destination
)
self.assertEquals(task.credentials, sentinel.credentials)
self.assertEquals(task.destination, destination)
self.assertEquals(task.query, 'SELECT 1')
self.assertEquals(task.query_parameters, tuple())
self.assertIsInstance(task.requires()['credentials'], ExternalURL)
self.assertEquals(task.requires()['credentials'].url, sentinel.credentials)
self.assertIsInstance(task.output(), luigi.LocalTarget)
self.assertEquals(task.output().path, '/tmp/foo/bar') # pylint: disable=maybe-no-member
def test_mysql_timestamp(self):
string_timestamp = '2014-01-02 13:10:11'
timestamp = datetime.datetime.strptime(string_timestamp, '%Y-%m-%d %H:%M:%S')
self.assertEquals(mysql_datetime(timestamp), string_timestamp)
"""
Ensure the user registrations task is setup properly.
"""
from __future__ import absolute_import
from luigi import date_interval
from mock import sentinel
from edx.analytics.tasks.user_registrations import UserRegistrationsPerDay
from edx.analytics.tasks.tests import unittest
class UserRegistrationsPerDayTestCase(unittest.TestCase): # pylint: disable=missing-docstring
def test_day(self):
task = self.create_task(date_interval.Date.parse('2014-01-01'))
self.assertEquals(task.query_parameters, ('2014-01-01 00:00:00', '2014-01-02 00:00:00'))
def create_task(self, interval):
"""
Args:
interval (luigi.date_interval.DateInterval): The interval of dates to gather data for.
Returns:
A dummy task that uses the provided `interval` as it's date_interval.
"""
return UserRegistrationsPerDay(
credentials=sentinel.ignored,
destination=sentinel.ignored,
date_interval=interval
)
def test_week(self):
task = self.create_task(date_interval.Week.parse('2014-W01'))
self.assertEquals(task.query_parameters, ('2013-12-30 00:00:00', '2014-01-06 00:00:00'))
def test_month(self):
task = self.create_task(date_interval.Month.parse('2014-01'))
self.assertEquals(task.query_parameters, ('2014-01-01 00:00:00', '2014-02-01 00:00:00'))
def test_year(self):
task = self.create_task(date_interval.Year.parse('2014'))
self.assertEquals(task.query_parameters, ('2014-01-01 00:00:00', '2015-01-01 00:00:00'))
def test_static_parameters(self):
task = self.create_task(date_interval.Year.parse('2014'))
self.assertEquals(task.query, "SELECT DATE(date_joined), COUNT(1) FROM `auth_user`"
" WHERE `date_joined` >= ? AND `date_joined` < ? GROUP BY DATE(date_joined)"
" ORDER BY 1 ASC")
self.assertEquals(task.filename, 'user_registrations_2014.tsv')
"""
Determine the number of users that registered accounts each day.
"""
from __future__ import absolute_import
import datetime
import luigi
from edx.analytics.tasks.mysql import MysqlTask
from edx.analytics.tasks.mysql import mysql_datetime
class UserRegistrationsPerDay(MysqlTask):
"""
Determine the number of users that registered accounts each day.
Parameters:
date_interval: The range of dates to gather data for.
"""
date_interval = luigi.DateIntervalParameter()
@property
def query(self):
return ("SELECT DATE(date_joined), COUNT(1) FROM `auth_user`"
" WHERE `date_joined` >= ? AND `date_joined` < ? GROUP BY DATE(date_joined) ORDER BY 1 ASC")
@property
def query_parameters(self):
dates = self.date_interval.dates() # pylint: disable=no-member
start_date = dates[0]
# Note that we could probably use the end date at 23:59:59, however, it's easier to just add a day and use the
# next day as an excluded upper bound on the interval. So we actually select all data earlier than
# 00:00:00.000 on the day following the last day in the interval.
end_date = dates[-1] + datetime.timedelta(1)
return (
mysql_datetime(start_date),
mysql_datetime(end_date)
)
@property
def filename(self):
return 'user_registrations_{0}.tsv'.format(self.date_interval)
...@@ -7,5 +7,6 @@ stevedore==0.13 ...@@ -7,5 +7,6 @@ stevedore==0.13
tornado==3.1.1 tornado==3.1.1
ansible==1.4.4 ansible==1.4.4
python-cjson==1.0.5 python-cjson==1.0.5
oursql==0.9.3.1
-e git+https://github.com/spotify/luigi.git@a33756c781b9bf7e51384f0eb19d6a25050ef136#egg=luigi -e git+https://github.com/spotify/luigi.git@a33756c781b9bf7e51384f0eb19d6a25050ef136#egg=luigi
...@@ -26,6 +26,7 @@ edx.analytics.tasks = ...@@ -26,6 +26,7 @@ edx.analytics.tasks =
sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
enrollments-report = edx.analytics.tasks.reports.enrollments:EnrollmentsByWeek enrollments-report = edx.analytics.tasks.reports.enrollments:EnrollmentsByWeek
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay
users-per-day = edx.analytics.tasks.user_registrations:UserRegistrationsPerDay
mapreduce.engine = mapreduce.engine =
hadoop = luigi.hadoop:DefaultHadoopJobRunner hadoop = luigi.hadoop:DefaultHadoopJobRunner
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment