Commit c1164ce5 by Brian Wilson

Add tasks to calculate course enrollment.

parent 80b4b413
# .coveragerc for analytics-tasks
[run]
data_file = .coverage
source = edx/analytics
[report]
ignore_errors = True
[html]
title = Analytics-Tasks Python Test Coverage Report
directory = report
[xml]
output = coverage.xml
[pep8]
ignore=E501
\ No newline at end of file
......@@ -11,3 +11,20 @@ requirements:
test-requirements: requirements
pip install -r requirements/test.txt
test: test-requirements
rm -rf .coverage
python -m coverage run --rcfile=./.coveragerc `which nosetests`
coverage: test
coverage html
coverage xml -o coverage.xml
diff-cover coverage.xml --html-report diff_cover.html
# Compute quality
diff-quality --violations=pep8 --html-report diff_quality_pep8.html
diff-quality --violations=pylint --html-report diff_quality_pylint.html
# Compute style violations
pep8 > pep8.report || echo "Not pep8 clean"
pylint -f parseable edx > pylint.report || echo "Not pylint clean"
"""
Helper classes to specify file dependencies for input and output.
Supports inputs from S3 and local FS.
Supports outputs to HDFS, S3, and local FS.
"""
import os
import boto
import glob
from urlparse import urlparse
from fnmatch import fnmatch
import luigi
import luigi.s3
import luigi.hdfs
import luigi.format
def get_s3_bucket_key_names(url):
    """Return (bucket_name, root) extracted from an S3 URL."""
    parsed = urlparse(url)
    bucket_name = parsed.netloc.strip('/')
    root = parsed.path.strip('/')
    return (bucket_name, root)
def join_as_s3_url(bucket, root, path):
    """Construct a URL for accessing S3, given its components."""
    # Inverse of get_s3_bucket_key_names(): reassemble the pieces with the
    # s3:// scheme.
    return 's3://' + bucket + '/' + root + '/' + path
class LocalPathTask(luigi.ExternalTask):
    """
    An external task that requires the existence of a path
    in a local file system.

    Treats files ending with .gz as Gzip files.
    """
    path = luigi.Parameter()

    def output(self):
        if not self.path.endswith('.gz'):
            yield luigi.LocalTarget(self.path)
        else:
            # Compressed logs need the Gzip format wrapper so reads decompress.
            yield luigi.LocalTarget(self.path, format=luigi.format.Gzip)
class HdfsPathTask(luigi.ExternalTask):
    """
    An external task that requires the existence of a path in HDFS.
    """
    path = luigi.Parameter()

    def output(self):
        return luigi.hdfs.HdfsTarget(self.path)
class PathSetTask(luigi.Task):
    """
    A task to select a subset of files in an S3 bucket or local FS.

    Parameters:
      src: a URL pointing to a folder in s3:// or local FS.
      include: a list of patterns to use to select.  Multiple patterns are OR'd.
      run_locally: if True, use S3PathTask instead of HdfsPathTask, to permit
        reading S3 data when running in local mode.
    """
    src = luigi.Parameter()
    include = luigi.Parameter(is_list=True, default=('*',))
    run_locally = luigi.BooleanParameter()

    def __init__(self, *args, **kwargs):
        super(PathSetTask, self).__init__(*args, **kwargs)
        self.s3 = boto.connect_s3()

    def requires(self):
        if self.src.startswith('s3'):
            for bucket, root, path in self._generate_sources():
                source = join_as_s3_url(bucket, root, path)
                if self.run_locally:
                    yield luigi.s3.S3PathTask(source)
                else:
                    yield HdfsPathTask(source)
        else:
            # Local FS: expand each include pattern under src with glob.
            filelist = []
            for include_val in self.include:
                glob_pattern = "{src}/{include}".format(src=self.src, include=include_val)
                filelist.extend(glob.glob(glob_pattern))
            for filepath in filelist:
                yield LocalPathTask(filepath)

    def complete(self):
        # An optimization: just declare that the task is always
        # complete, by definition, because it is whatever files were
        # requested that match the filter, not a set of files whose
        # existence needs to be checked or generated again.
        return True

    def output(self):
        return [task.output() for task in self.requires()]

    def _generate_sources(self):
        """Yield (bucket_name, root, relative_path) for matching S3 keys."""
        bucket_name, root = get_s3_bucket_key_names(self.src)
        bucket = self.s3.get_bucket(bucket_name)
        # Skip zero-length keys (e.g. "directory" placeholder objects).
        keys = (s.key for s in bucket.list(root) if s.size > 0)
        # Remove the root prefix from each key.  Note that str.lstrip(root)
        # would be wrong here: lstrip strips any leading characters that
        # appear anywhere in its argument (it is a character set, not a
        # prefix), and so could also eat into the start of the remainder.
        paths = (key[len(root):].strip('/') for key in keys)
        paths = self._filter_matches(paths)
        return ((bucket.name, root, path) for path in paths)

    def _filter_matches(self, names):
        """Filter generator to names matching any of the include patterns."""
        patterns = self.include
        fn = lambda n: any(fnmatch(n, p) for p in patterns)
        return (n for n in names if fn(n))
def get_target_for_url(dest, output_name, run_locally=False):
    """
    Generate an appropriate target for a given path, depending on protocol.

    Parameters:
      dest: a URL pointing to a folder in s3:// or hdfs:// or local FS.
      output_name: name of file to be output.
      run_locally: if True, use S3Target instead of HdfsTarget, to permit
        writing S3 data when running in local mode.
    """
    output_url = os.path.join(dest, output_name)
    # The scheme prefixes are mutually exclusive, so check them as guards.
    if output_url.startswith('hdfs://'):
        return luigi.hdfs.HdfsTarget(output_url)
    if output_url.startswith('s3://'):
        if run_locally:
            return luigi.s3.S3Target(output_url)
        return luigi.hdfs.HdfsTarget(output_url)
    return luigi.LocalTarget(output_url)
"""Support for reading tracking event logs."""
import sys
import cjson
import datetime
import re
PATTERN_JSON = re.compile(r'^.*?(\{.*\})\s*$')
def get_datetime_string(timestamp):
    """Format a datetime as an ISO-style 'YYYY-MM-DDTHH:MM:SS' string."""
    return '{0:%Y-%m-%dT%H:%M:%S}'.format(timestamp)
def get_date_string(timestamp):
    """Format a datetime as a 'YYYY-MM-DD' date string."""
    return '{0:%Y-%m-%d}'.format(timestamp)
def get_date_from_datetime(datetime_string):
    """Return the date portion of an ISO datetime string (text before 'T')."""
    date_part, _, _ = datetime_string.partition('T')
    return date_part
def json_decode(line):
    """Wrapper to decode a JSON string in an implementation-independent way."""
    # Centralizing the call lets the JSON library be swapped in one place.
    decoded = cjson.decode(line)
    return decoded
def parse_eventlog_item(line, nested=False):
    """
    Parse a tracking log input line as JSON to create a dict representation.

    If the whole line is not valid JSON, makes one attempt to salvage a JSON
    object embedded in surrounding cruft (e.g. a logging prefix) by matching
    PATTERN_JSON.  Returns the parsed dict, or None if nothing parses.
    """
    try:
        return json_decode(line)
    except Exception:
        # Narrowed from a bare "except:" so KeyboardInterrupt/SystemExit are
        # not swallowed.  (cjson raises its own error type, so catch the
        # general Exception rather than ValueError.)
        if not nested:
            json_match = PATTERN_JSON.match(line)
            if json_match:
                return parse_eventlog_item(json_match.group(1), nested=True)
        # Lines seem to be truncated in input data at 10000 chars for some
        # log files, 2043 for others; there are too many to log individually.
        # TODO: Might be good going forward to collect stats on the length of
        # truncation and the counts for different event "names" (normalized
        # event_type values).
        return None
def log_item(msg, item, level='ERROR'):
    """Write a leveled message about a problematic item to stderr."""
    output = "{level}: {msg}: {item}\n".format(msg=msg, item=item, level=level)
    sys.stderr.write(output)
def get_timestamp(item):
    """
    Return the event time of `item` as a datetime, or None if unavailable.

    Expects item['time'] to be an ISO-format string (e.g.
    '2013-12-17T15:38:32.805444'); any fractional-second part is dropped.
    """
    try:
        timestamp = item['time']
        removed_ms = timestamp.split('.')[0]
        return datetime.datetime.strptime(removed_ms, '%Y-%m-%dT%H:%M:%S')
    except (KeyError, TypeError, AttributeError, ValueError):
        # Missing 'time' key, non-string value, or malformed timestamp all
        # mean "no usable timestamp" to the caller.  Narrowed from a bare
        # "except:" so KeyboardInterrupt/SystemExit are not swallowed.
        return None
def get_event_data(item):
    """
    Return the 'event' payload of `item` as a dict, or None if unusable.

    The payload may already be a dict, or may be a JSON-encoded string that
    is decoded here.  Problems are logged to stderr via log_item().
    """
    event_value = item.get('event')
    if event_value is None:
        log_item("encountered event with missing event value", item)
        return None
    if isinstance(event_value, basestring):
        # If the value is a string, try to parse it as JSON into a dict.
        try:
            event_value = json_decode(event_value)
        except Exception:
            # Narrowed from a bare "except:" so KeyboardInterrupt/SystemExit
            # are not swallowed.
            log_item("encountered event with unparsable event value", item)
            return None
    if isinstance(event_value, dict):
        # It's fine, just return.
        return event_value
    log_item("encountered event data with unrecognized type", item)
    return None
"""
Tests for utilities that parse event logs.
"""
import unittest
import edx.analytics.util.eventlog as eventlog
class EventLogTest(unittest.TestCase):
    """
    Tests to verify that event log parsing works correctly.
    """
    def test_parse_valid_eventlog_item(self):
        source_line = '{"username": "successful"}'
        parsed = eventlog.parse_eventlog_item(source_line)
        self.assertTrue(isinstance(parsed, dict))

    def test_parse_eventlog_item_truncated(self):
        # A truncated line is unparseable and should yield None.
        source_line = '{"username": "unsuccessful'
        parsed = eventlog.parse_eventlog_item(source_line)
        self.assertIsNone(parsed)

    def test_parse_eventlog_item_with_cruft(self):
        # The embedded JSON object should be salvaged from surrounding text.
        source_line = 'leading cruft here {"username": "successful"} '
        parsed = eventlog.parse_eventlog_item(source_line)
        self.assertTrue(isinstance(parsed, dict))

    def test_parse_eventlog_item_with_nonascii(self):
        source_line = '{"username": "b\ufffdb"}'
        parsed = eventlog.parse_eventlog_item(source_line)
        self.assertTrue(isinstance(parsed, dict))
        self.assertEquals(parsed['username'], u'b\ufffdb')
[MASTER]
# Specify a configuration file.
#rcfile=
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
# Profiled execution.
profile=no
# Add files or directories to the blacklist. They should be base names, not
# paths.
ignore=CVS, migrations
# Pickle collected data for later comparisons.
persistent=yes
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
[MESSAGES CONTROL]
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time.
#enable=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once).
disable=
# Never going to use these
# I0011: Locally disabling W0232
# W0141: Used builtin function 'map'
# W0142: Used * or ** magic
# R0921: Abstract class not referenced
# R0922: Abstract class is only referenced 1 times
I0011,W0141,W0142,R0921,R0922,
# Django makes classes that trigger these
# W0232: Class has no __init__ method
W0232,
# Might use these when the code is in better shape
# C0302: Too many lines in module
# R0201: Method could be a function
# R0901: Too many ancestors
# R0902: Too many instance attributes
# R0903: Too few public methods (1/2)
# R0904: Too many public methods
# R0911: Too many return statements
# R0912: Too many branches
# R0913: Too many arguments
# R0914: Too many local variables
C0302,R0201,R0901,R0902,R0903,R0904,R0911,R0912,R0913,R0914
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html
output-format=text
# Include message's id in output
include-ids=yes
# Put messages in a separate file for each module / package specified on the
# command line instead of printing them on stdout. Reports (if any) will be
# written in a file name "pylint_global.[txt|html]".
files-output=no
# Tells whether to display a full report or only the messages
reports=no
# Python expression which should return a note less than 10 (10 is the highest
# note). You have access to the variables errors warning, statement which
# respectively contain the number of errors / warnings messages and the total
# number of statements analyzed. This is used by the global evaluation report
# (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Add a comment according to your evaluation note. This is used by the global
# evaluation report (RP0004).
comment=no
[TYPECHECK]
# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).
ignored-classes=SQLObject
# When zope mode is activated, add a predefined set of Zope acquired attributes
# to generated-members.
zope=no
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E0201 when accessed. Python regular
# expressions are accepted.
generated-members=
REQUEST,
acl_users,
aq_parent,
objects,
DoesNotExist,
can_read,
can_write,
get_url,
size,
content,
status_code,
# For factory_boy factories
create
[BASIC]
# Required attributes for module, separated by a comma
required-attributes=
# List of builtins function names that should not be used, separated by a comma
bad-functions=map,filter,apply,input
# Regular expression which should only match correct module names
module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Regular expression which should only match correct module level names
const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__)|log|urlpatterns)$
# Regular expression which should only match correct class names
class-rgx=[A-Z_][a-zA-Z0-9]+$
# Regular expression which should only match correct function names
function-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct method names
method-rgx=([a-z_][a-z0-9_]{2,60}|setUp|set[Uu]pClass|tearDown|tear[Dd]ownClass|assert[A-Z]\w*)$
# Regular expression which should only match correct instance attribute names
attr-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct argument names
argument-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct variable names
variable-rgx=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match correct list comprehension /
# generator expression variable names
inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
# Good variable names which should always be accepted, separated by a comma
good-names=i,j,k,ex,Run,_
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata
# Regular expression which should only match functions or classes name which do
# not require a docstring
no-docstring-rgx=__.*__|test_.*|setUp|tearDown
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,XXX,TODO
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=120
# Maximum number of lines in a module
max-module-lines=1000
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '
[SIMILARITIES]
# Minimum lines number of a similarity.
min-similarity-lines=4
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=no
# A regular expression matching the beginning of the name of dummy variables
# (i.e. not used).
dummy-variables-rgx=_|dummy|unused|.*_unused
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=
[IMPORTS]
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=regsub,string,TERMIOS,Bastion,rexec
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled)
import-graph=
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled)
ext-import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled)
int-import-graph=
[DESIGN]
# Maximum number of arguments for function / method
max-args=5
# Argument names that match this expression will be ignored. Default to name
# with leading underscore
ignored-argument-names=_.*
# Maximum number of locals for function / method body
max-locals=15
# Maximum number of return / yield for function / method body
max-returns=6
# Maximum number of branch for function / method body
max-branchs=12
# Maximum number of statements in function / method body
max-statements=50
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of attributes for a class (see R0902).
max-attributes=7
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
[CLASSES]
# List of interface methods to ignore, separated by a comma. This is used for
# instance to not check methods defines in Zope's Interface base class.
ignore-iface-methods=isImplementedBy,deferred,extends,names,namesAndDescriptions,queryDescriptionFor,getBases,getDescriptionFor,getDoc,getName,getTaggedValue,getTaggedValueTags,isEqualOrExtendedBy,setTaggedValue,isImplementedByInstancesOf,adaptWith,is_implemented_by
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,__new__,setUp
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "Exception"
overgeneral-exceptions=Exception
......@@ -6,5 +6,6 @@ pbr==0.5.23
stevedore==0.13
tornado==3.1.1
ansible==1.4.4
python-cjson==1.0.5
-e git+https://github.com/spotify/luigi.git@a33756c781b9bf7e51384f0eb19d6a25050ef136#egg=luigi
nose
nose-ignore-docstring
coverage==3.7
pep8==1.4.5
pylint==0.28
diff-cover >= 0.2.1
......@@ -19,8 +19,10 @@ data_files =
console_scripts =
launch-task = edx.analytics.tasks.main:main
remote-task = edx.analytics.tasks.remote:main
edx.analytics.tasks =
s3-copy = edx.analytics.tasks.s3:S3Copy
s3-sync = edx.analytics.tasks.s3:S3Sync
sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
enrollments-report = edx.analytics.reports.enrollments:EnrollmentsByWeek
\ No newline at end of file
enrollments-report = edx.analytics.reports.enrollments:EnrollmentsByWeek
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentTotalsPerDay
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment