Commit 319069b5 by Gabe Mulley

centralize resolution of external URL resources

Change-Id: I215651f104b0f486c766abc94e7da21d9ab4c9e2
parent 064fc1b0
[core]
logging_conf_file=logging.cfg
[map-reduce]
engine = hadoop
[event-logs]
source = s3://edx-all-tracking-logs
destination = s3://edx-analytics-events/raw/by-server
......
......@@ -2,12 +2,12 @@
Luigi tasks for extracting course enrollment statistics from tracking log files.
"""
import luigi
import luigi.hadoop
import luigi.s3
import luigi.hdfs
import edx.analytics.tasks.util.eventlog as eventlog
from edx.analytics.tasks.pathutil import get_target_for_url, PathSetTask
from edx.analytics.tasks.mapreduce import MapReduceJobTask
from edx.analytics.tasks.pathutil import PathSetTask
from edx.analytics.tasks.url import get_target_from_url, url_path_join
import logging
logger = logging.getLogger(__name__)
......@@ -164,7 +164,7 @@ class CourseEnrollmentChangesPerDayMixin(object):
# Task requires/output definitions
##################################
class BaseCourseEnrollmentTask(luigi.hadoop.JobTask):
class BaseCourseEnrollmentTask(MapReduceJobTask):
"""
Base class for course enrollment calculations.
......@@ -175,14 +175,11 @@ class BaseCourseEnrollmentTask(luigi.hadoop.JobTask):
dest: a URL to the root location to write output file(s).
include: a list of patterns to be used to match input files, relative to `src` URL.
The default value is ['*'].
run_locally: a boolean flag to indicate that the task should be run locally rather than
on a hadoop cluster. This is used only to change the interpretation of S3 URLs in src and/or dest.
"""
name = luigi.Parameter()
src = luigi.Parameter()
dest = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
run_locally = luigi.BooleanParameter()
def extra_modules(self):
# The following are needed for (almost) every course enrollment task.
......@@ -197,22 +194,22 @@ class CourseEnrollmentEventsPerDay(CourseEnrollmentEventsPerDayMixin, BaseCourse
"""Calculates daily change in enrollment for a user in a course, given raw event log input."""
def requires(self):
return PathSetTask(self.src, self.include, self.run_locally)
return PathSetTask(self.src, self.include)
def output(self):
output_name = 'course_enrollment_events_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
return get_target_from_url(url_path_join(self.dest, output_name))
class CourseEnrollmentChangesPerDay(CourseEnrollmentChangesPerDayMixin, BaseCourseEnrollmentTask):
"""Calculates daily changes in enrollment, given per-user net changes by date."""
def requires(self):
return CourseEnrollmentEventsPerDay(self.name, self.src, self.dest, self.include, self.run_locally)
return CourseEnrollmentEventsPerDay(self.mapreduce_engine, self.name, self.src, self.dest, self.include)
def output(self):
output_name = 'course_enrollment_changes_per_day_{name}'.format(name=self.name)
return get_target_for_url(self.dest, output_name, self.run_locally)
return get_target_from_url(url_path_join(self.dest, output_name))
################################
......
"""
Support executing map reduce tasks.
"""
from __future__ import absolute_import
import luigi.hadoop
from stevedore import ExtensionManager
class MapReduceJobTask(luigi.hadoop.JobTask):
    """
    Run a map reduce job.

    The execution engine (typically Hadoop, but possibly an in-process
    runner) is selected by the `mapreduce_engine` parameter.
    """

    # Engine name; defaults to the [map-reduce] "engine" setting in the
    # configuration file.
    mapreduce_engine = luigi.Parameter(
        default_from_config={'section': 'map-reduce', 'name': 'engine'}
    )

    def job_runner(self):
        # Engines are registered as stevedore plugins under the
        # 'mapreduce.engine' namespace (see setup.cfg entry points).
        manager = ExtensionManager('mapreduce.engine')
        try:
            runner_class = manager[self.mapreduce_engine].plugin
        except KeyError:
            raise KeyError('A map reduce engine must be specified in order to run MapReduceJobTasks')
        return runner_class()
......@@ -16,33 +16,7 @@ import luigi.hdfs
import luigi.format
from edx.analytics.tasks.s3_util import join_as_s3_url, generate_s3_sources
class LocalPathTask(luigi.ExternalTask):
    """
    An external task that requires the existence of a path on the local
    file system.

    Files ending with .gz are treated as Gzip files.
    """
    path = luigi.Parameter()

    def output(self):
        # Only pass a format when the file is compressed; otherwise use
        # the LocalTarget defaults.
        target_kwargs = {}
        if self.path.endswith('.gz'):
            target_kwargs['format'] = luigi.format.Gzip
        yield luigi.LocalTarget(self.path, **target_kwargs)
class HdfsPathTask(luigi.ExternalTask):
    """An external task that requires the existence of a path in HDFS."""

    # URL/path of the HDFS location that must already exist.
    path = luigi.Parameter()

    def output(self):
        return luigi.hdfs.HdfsTarget(self.path)
from edx.analytics.tasks.url import ExternalURL, url_path_join
class PathSetTask(luigi.Task):
......@@ -53,16 +27,9 @@ class PathSetTask(luigi.Task):
src: a URL pointing to a folder in s3:// or local FS.
include: a list of patterns to use to select. Multiple patterns are OR'd.
run_locally: if True, use S3PathTask instead of HDFSPathTask, to permit
reading S3 data when running in local mode.
"""
src = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
# TODO: modify this to get default values from a configuration file,
# and use that to determine whether running in a cluster or locally.
# It will be decoupled from the use of S3PathTask/HDFSPathTask.
# Instead, these will be distinguished by different protocol names.
run_locally = luigi.BooleanParameter()
def __init__(self, *args, **kwargs):
super(PathSetTask, self).__init__(*args, **kwargs)
......@@ -74,18 +41,15 @@ class PathSetTask(luigi.Task):
if self.s3_conn is None:
self.s3_conn = boto.connect_s3()
for bucket, root, path in generate_s3_sources(self.s3_conn, self.src, self.include):
source = join_as_s3_url(bucket, root, path)
if self.run_locally:
yield luigi.s3.S3PathTask(source)
else:
yield HdfsPathTask(source)
source = url_path_join(self.src, root, path)
yield ExternalURL(source)
else:
filelist = []
for include_val in self.include:
glob_pattern = "{src}/{include}".format(src=self.src, include=include_val)
filelist.extend(glob.glob(glob_pattern))
for filepath in filelist:
yield LocalPathTask(filepath)
yield ExternalURL(filepath)
def complete(self):
# An optimization: just declare that the task is always
......@@ -96,27 +60,3 @@ class PathSetTask(luigi.Task):
def output(self):
return [task.output() for task in self.requires()]
def get_target_for_url(dest, output_name, run_locally=False):
    """
    Generate an appropriate target for a given path, depending on protocol.

    Parameters:
        dest: a URL pointing to a folder in s3:// or hdfs:// or local FS.
        output_name: name of file to be output.
        run_locally: if True, use S3Target instead of HdfsTarget, to permit
            writing S3 data when running in local mode.
    """
    output_url = os.path.join(dest, output_name)
    if output_url.startswith('s3://'):
        # S3 data is addressed directly in local mode, and through the
        # Hadoop S3 filesystem when running on a cluster.
        return luigi.s3.S3Target(output_url) if run_locally else luigi.hdfs.HdfsTarget(output_url)
    if output_url.startswith('hdfs://'):
        return luigi.hdfs.HdfsTarget(output_url)
    return luigi.LocalTarget(output_url)
......@@ -9,6 +9,8 @@ import luigi.hdfs
import numpy
import pandas
from edx.analytics.tasks.url import ExternalURL, get_target_from_url
class EnrollmentsByWeek(luigi.Task):
"""Calculates cumulative enrollments per week per course.
......@@ -248,24 +250,3 @@ class EnrollmentsByWeek(luigi.Task):
'''
split_course = course_id.split('/')
return '-' if len(split_course) != 3 else split_course[0]
class ExternalURL(luigi.ExternalTask):
    """Simple Task that returns a target based on its URL"""
    # The URL identifying the external resource this task represents.
    url = luigi.Parameter()

    def output(self):
        # Resolve the URL to a concrete luigi target (HDFS or local).
        return get_target_from_url(self.url)
def get_target_from_url(url):
    """Return a luigi target chosen from the URL's scheme."""
    # TODO: Make external utility to resolve target by URL,
    # including s3, s3n, etc.
    if url.startswith(('hdfs://', 's3://')):
        # A trailing slash denotes a directory of files rather than a
        # single file.
        hdfs_kwargs = {'format': luigi.hdfs.PlainDir} if url.endswith('/') else {}
        return luigi.hdfs.HdfsTarget(url, **hdfs_kwargs)
    return luigi.LocalTarget(url)
from unittest import TestCase
import luigi
import luigi.format
import luigi.hdfs
import luigi.s3
from edx.analytics.tasks import url
class TargetFromUrlTestCase(TestCase):
    """Tests that get_target_from_url maps URL schemes to the right target class."""

    def test_hdfs_scheme(self):
        # s3 and s3n URLs are also served by HdfsTarget (via Hadoop's S3 support).
        for test_url in ['s3://foo/bar', 'hdfs://foo/bar', 's3n://foo/bar']:
            target = url.get_target_from_url(test_url)
            self.assertIsInstance(target, luigi.hdfs.HdfsTarget)
            self.assertEquals(target.path, test_url)

    def test_file_scheme(self):
        # A file:// prefix is stripped; bare paths are passed through as-is.
        path = '/foo/bar'
        for test_url in [path, 'file://' + path]:
            target = url.get_target_from_url(test_url)
            self.assertIsInstance(target, luigi.LocalTarget)
            self.assertEquals(target.path, path)

    def test_s3_https_scheme(self):
        test_url = 's3+https://foo/bar'
        target = url.get_target_from_url(test_url)
        self.assertIsInstance(target, luigi.s3.S3Target)
        self.assertEquals(target.path, test_url)

    def test_hdfs_directory(self):
        # Trailing slash selects the PlainDir format (directory of files).
        test_url = 's3://foo/bar/'
        target = url.get_target_from_url(test_url)
        self.assertIsInstance(target, luigi.hdfs.HdfsTarget)
        self.assertEquals(target.path, test_url)
        self.assertEquals(target.format, luigi.hdfs.PlainDir)

    def test_gzip_local_file(self):
        # .gz suffix selects the Gzip format for local files.
        test_url = '/foo/bar.gz'
        target = url.get_target_from_url(test_url)
        self.assertIsInstance(target, luigi.LocalTarget)
        self.assertEquals(target.path, test_url)
        self.assertEquals(target.format, luigi.format.Gzip)
class UrlPathJoinTestCase(TestCase):
    """Tests for url.url_path_join()."""

    def test_relative(self):
        self.assertEquals(url.url_path_join('s3://foo/bar', 'baz'), 's3://foo/bar/baz')

    def test_absolute(self):
        # An absolute extra path replaces the URL's existing path.
        self.assertEquals(url.url_path_join('s3://foo/bar', '/baz'), 's3://foo/baz')

    def test_attempted_special_elements(self):
        # '.' and '..' are not resolved; they are kept verbatim.
        self.assertEquals(url.url_path_join('s3://foo/bar', './baz'), 's3://foo/bar/./baz')
        self.assertEquals(url.url_path_join('s3://foo/bar', '../baz'), 's3://foo/bar/../baz')

    def test_no_path(self):
        self.assertEquals(url.url_path_join('s3://foo', 'baz'), 's3://foo/baz')

    def test_no_netloc(self):
        self.assertEquals(url.url_path_join('file:///foo/bar', 'baz'), 'file:///foo/bar/baz')

    def test_extra_separators(self):
        # Fix: this method was defined twice, so the second definition
        # silently shadowed the first and the 'baz//bar' assertion never
        # ran.  The duplicate definitions are merged into one method.
        self.assertEquals(url.url_path_join('s3://foo/bar', '///baz'), 's3://foo///baz')
        self.assertEquals(url.url_path_join('s3://foo/bar', 'baz//bar'), 's3://foo/bar/baz//bar')

    def test_query_string(self):
        # The query component is preserved; only the path is extended.
        self.assertEquals(url.url_path_join('s3://foo/bar?x=y', 'baz'), 's3://foo/bar/baz?x=y')

    def test_multiple_elements(self):
        self.assertEquals(url.url_path_join('s3://foo', 'bar', 'baz'), 's3://foo/bar/baz')
        self.assertEquals(url.url_path_join('s3://foo', 'bar/bing', 'baz'), 's3://foo/bar/bing/baz')
"""
Support URLs. Specifically, we want to be able to refer to data stored in a
variety of locations and formats using a standard URL syntax.
Examples::
s3://some-bucket/path/to/file
/path/to/local/file.gz
hdfs://some/directory/
"""
from __future__ import absolute_import

import os
import posixpath
import urlparse

import luigi
import luigi.format
import luigi.hdfs
import luigi.s3
class ExternalURL(luigi.ExternalTask):
    """Simple Task that returns a target based on its URL"""
    # The URL identifying the external resource this task represents.
    url = luigi.Parameter()

    def output(self):
        # Resolve the URL scheme to a concrete luigi target class.
        return get_target_from_url(self.url)
# Fallback target class for URLs with no recognized scheme (bare local paths).
DEFAULT_TARGET_CLASS = luigi.LocalTarget

# Maps a URL scheme to the luigi target class used to read/write it.
# NOTE(review): s3/s3n map to HdfsTarget — presumably read through Hadoop's
# S3 filesystem support rather than directly via boto; confirm if changing.
URL_SCHEME_TO_TARGET_CLASS = {
    'hdfs': luigi.hdfs.HdfsTarget,
    's3': luigi.hdfs.HdfsTarget,
    's3n': luigi.hdfs.HdfsTarget,
    'file': luigi.LocalTarget,
    's3+https': luigi.s3.S3Target,
}
def get_target_from_url(url):
    """Return a luigi target whose class is selected by the URL's scheme."""
    parsed = urlparse.urlparse(url)
    target_class = URL_SCHEME_TO_TARGET_CLASS.get(parsed.scheme, DEFAULT_TARGET_CLASS)

    target_kwargs = {}
    if issubclass(target_class, luigi.hdfs.HdfsTarget) and url.endswith('/'):
        # A trailing slash denotes a directory of files rather than one file.
        target_kwargs['format'] = luigi.hdfs.PlainDir
    if issubclass(target_class, luigi.LocalTarget):
        # Strip any file:// prefix so the local target receives a plain path.
        url = parsed.path
        if url.endswith('.gz'):
            target_kwargs['format'] = luigi.format.Gzip
    return target_class(url, **target_kwargs)
def url_path_join(url, *extra_path):
    """
    Extend the path component of the given URL.  Relative paths extend the
    existing path, absolute paths replace it.  Special path elements like '.'
    and '..' are not treated any differently than any other path element.

    Examples:

        url=http://foo.com/bar, extra_path=baz -> http://foo.com/bar/baz
        url=http://foo.com/bar, extra_path=/baz -> http://foo.com/baz
        url=http://foo.com/bar, extra_path=../baz -> http://foo.com/bar/../baz

    Args:
        url (str): The URL to modify.
        extra_path (str): The path to join with the current URL path.

    Returns:
        The URL with the path component joined with `extra_path` argument.
    """
    (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)
    # Fix: use posixpath rather than os.path — URL paths always use '/'
    # separators, but os.path.join would insert '\\' on Windows.
    joined_path = posixpath.join(path, *extra_path)
    return urlparse.urlunparse((scheme, netloc, joined_path, params, query, fragment))
......@@ -26,3 +26,7 @@ edx.analytics.tasks =
sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
enrollments-report = edx.analytics.tasks.reports.enrollments:EnrollmentsByWeek
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay
mapreduce.engine =
hadoop = luigi.hadoop:DefaultHadoopJobRunner
local = luigi.hadoop:LocalJobRunner
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment