Commit 3aded9ec by Gabe Mulley

Support manifest-based input file lists

Change-Id: Ic059030e633848fcde13216d5a96b8b03e5c4b55
parent b16d3b46
@@ -578,11 +578,15 @@ class BaseAnswerDistributionTask(MapReduceJobTask):
         dest: a URL to the root location to write output file(s).
         include: a list of patterns to be used to match input files, relative to `src` URL.
             The default value is ['*'].
+        manifest: a URL to a file location that can store the complete set of input files.
     """
     name = luigi.Parameter()
     src = luigi.Parameter()
     dest = luigi.Parameter()
     include = luigi.Parameter(is_list=True, default=('*',))
+    # A manifest file is required by hadoop if there are too many input paths. It hits an operating
+    # system limit on the number of arguments passed to the mapper process on the task nodes.
+    manifest = luigi.Parameter(default=None)

     def extra_modules(self):
         # Boto is used for S3 access and cjson for parsing log files.
@@ -596,7 +600,7 @@ class LastProblemCheckEvent(LastProblemCheckEventMixin, BaseAnswerDistributionTask):
     """Identifies last problem_check event for a user on a problem in a course, given raw event log input."""

     def requires(self):
-        return PathSetTask(self.src, self.include)
+        return PathSetTask(self.src, self.include, self.manifest)

     def output(self):
         output_name = u'last_problem_check_events_{name}/'.format(name=self.name)
@@ -611,13 +615,26 @@ class AnswerDistributionPerCourse(AnswerDistributionPerCourseMixin, BaseAnswerDistributionTask):
     Additional Parameters:
         answer_metadata: optional file to provide information about particular answers.
             Includes problem_display_name, input_type, response_type, and question.
+        base_input_format: The input format to use on the first map reduce job in the chain. This job takes in
+            the most input and may need a custom input format.
     """
     answer_metadata = luigi.Parameter(default=None)
+    base_input_format = luigi.Parameter(default=None)

     def requires(self):
         results = {
-            'events': LastProblemCheckEvent(self.mapreduce_engine, self.name, self.src, self.dest, self.include),
+            'events': LastProblemCheckEvent(
+                mapreduce_engine=self.mapreduce_engine,
+                input_format=self.base_input_format,
+                lib_jar=self.lib_jar,
+                n_reduce_tasks=self.n_reduce_tasks,
+                name=self.name,
+                src=self.src,
+                dest=self.dest,
+                include=self.include,
+                manifest=self.manifest,
+            ),
         }

         if self.answer_metadata:
@@ -660,15 +677,21 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask):
     name = luigi.Parameter(default='periodic')
     output_root = luigi.Parameter()
     answer_metadata = luigi.Parameter(default=None)
+    manifest = luigi.Parameter(default=None)
+    base_input_format = luigi.Parameter(default=None)

     def requires(self):
         return AnswerDistributionPerCourse(
             mapreduce_engine=self.mapreduce_engine,
+            lib_jar=self.lib_jar,
+            base_input_format=self.base_input_format,
+            n_reduce_tasks=self.n_reduce_tasks,
             src=self.src,
             dest=self.dest,
             include=self.include,
             name=self.name,
-            answer_metadata=self.answer_metadata
+            answer_metadata=self.answer_metadata,
+            manifest=self.manifest,
         )

     def mapper(self, line):
...
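For context on the `manifest` parameter introduced above: hadoop streaming passes every input path on the command line of the process it launches, so a large enough set of input files overruns the operating system's argument-length limit. Writing the paths to a single manifest file and handing the job that one path sidesteps the limit. A minimal sketch of the idea, with hypothetical bucket and file names:

```python
# Sketch only: collapse many input paths into a single manifest file.
# The paths and the manifest file name here are hypothetical.
input_paths = [
    's3://example-bucket/logs/tracking.log-2014010{day}'.format(day=day)
    for day in range(1, 8)
]

with open('events.manifest', 'w') as manifest_file:
    for path in input_paths:
        # One path per line, mirroring what PathSetTask.manifest_file_list() writes.
        manifest_file.write(path + '\n')
```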
@@ -175,11 +175,15 @@ class BaseCourseEnrollmentTask(MapReduceJobTask):
         dest: a URL to the root location to write output file(s).
         include: a list of patterns to be used to match input files, relative to `src` URL.
             The default value is ['*'].
+        manifest: a URL to a file location that can store the complete set of input files.
     """
     name = luigi.Parameter()
     src = luigi.Parameter()
     dest = luigi.Parameter()
     include = luigi.Parameter(is_list=True, default=('*',))
+    # A manifest file is required by hadoop if there are too many input paths. It hits an operating
+    # system limit on the number of arguments passed to the mapper process on the task nodes.
+    manifest = luigi.Parameter(default=None)

     def extra_modules(self):
         # The following are needed for (almost) every course enrollment task.
@@ -194,7 +198,7 @@ class CourseEnrollmentEventsPerDay(CourseEnrollmentEventsPerDayMixin, BaseCourseEnrollmentTask):
     """Calculates daily change in enrollment for a user in a course, given raw event log input."""

     def requires(self):
-        return PathSetTask(self.src, self.include)
+        return PathSetTask(self.src, self.include, self.manifest)

     def output(self):
         output_name = 'course_enrollment_events_per_day_{name}'.format(name=self.name)
@@ -204,8 +208,20 @@ class CourseEnrollmentEventsPerDay(CourseEnrollmentEventsPerDayMixin, BaseCourseEnrollmentTask):
 class CourseEnrollmentChangesPerDay(CourseEnrollmentChangesPerDayMixin, BaseCourseEnrollmentTask):
     """Calculates daily changes in enrollment, given per-user net changes by date."""

+    base_input_format = luigi.Parameter(default=None)
+
     def requires(self):
-        return CourseEnrollmentEventsPerDay(self.mapreduce_engine, self.name, self.src, self.dest, self.include)
+        return CourseEnrollmentEventsPerDay(
+            mapreduce_engine=self.mapreduce_engine,
+            input_format=self.base_input_format,
+            lib_jar=self.lib_jar,
+            n_reduce_tasks=self.n_reduce_tasks,
+            name=self.name,
+            src=self.src,
+            dest=self.dest,
+            include=self.include,
+            manifest=self.manifest
+        )

     def output(self):
         output_name = 'course_enrollment_changes_per_day_{name}'.format(name=self.name)
...
@@ -6,6 +6,7 @@ from __future__ import absolute_import
 import luigi
 import luigi.hdfs
 import luigi.hadoop
+from luigi import configuration

 from edx.analytics.tasks.url import get_target_from_url, IgnoredTarget
@@ -19,6 +20,12 @@ class MapReduceJobTask(luigi.hadoop.JobTask):
     mapreduce_engine = luigi.Parameter(
         default_from_config={'section': 'map-reduce', 'name': 'engine'}
     )
+    input_format = luigi.Parameter(default=None)
+    lib_jar = luigi.Parameter(is_list=True, default=[])
+    # Override the parent class definition of this parameter. This typically wants to scale with the cluster
+    # size, so the user should be able to tweak it depending on their particular configuration.
+    n_reduce_tasks = luigi.Parameter(default=25)

     def job_runner(self):
         # Lazily import this since this module will be loaded on hadoop worker nodes, however stevedore will not be
@@ -31,7 +38,31 @@ class MapReduceJobTask(luigi.hadoop.JobTask):
         except KeyError:
             raise KeyError('A map reduce engine must be specified in order to run MapReduceJobTasks')

-        return engine_class()
+        if issubclass(engine_class, MapReduceJobRunner):
+            return engine_class(libjars_in_hdfs=self.lib_jar, input_format=self.input_format)
+        else:
+            return engine_class()
+
+
+class MapReduceJobRunner(luigi.hadoop.HadoopJobRunner):
+    """
+    Support more customization of the streaming command.
+
+    Args:
+        libjars_in_hdfs (list): An optional list of library jars that the hadoop job can make use of.
+        input_format (str): An optional full class name of a hadoop input format to use.
+    """
+
+    def __init__(self, libjars_in_hdfs=None, input_format=None):
+        libjars_in_hdfs = libjars_in_hdfs or []
+        config = configuration.get_config()
+        streaming_jar = config.get('hadoop', 'streaming-jar')
+
+        super(MapReduceJobRunner, self).__init__(
+            streaming_jar,
+            input_format=input_format,
+            libjars_in_hdfs=libjars_in_hdfs
+        )


 class MultiOutputMapReduceJobTask(MapReduceJobTask):
...
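The `job_runner()` branch above only passes the extra streaming options to engines that understand them; stock luigi runners such as `LocalJobRunner` are still constructed with no arguments. A sketch of exercising the new runner directly, assuming the luigi configuration has a `[hadoop]` section with a `streaming-jar` entry (which the constructor reads); the jar path and input format class below are hypothetical:

```python
# Sketch: construct the customized runner directly. luigi's HadoopJobRunner
# turns these values into the corresponding hadoop streaming options.
runner = MapReduceJobRunner(
    libjars_in_hdfs=['hdfs:///lib/custom-input-formats.jar'],  # hypothetical jar
    input_format='org.example.ManifestTextInputFormat',        # hypothetical class name
)
```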
@@ -15,7 +15,7 @@ import luigi.hdfs
 import luigi.format

 from edx.analytics.tasks.s3_util import generate_s3_sources
-from edx.analytics.tasks.url import ExternalURL, url_path_join
+from edx.analytics.tasks.url import ExternalURL, url_path_join, get_target_from_url


 class PathSetTask(luigi.Task):
@@ -26,15 +26,18 @@ class PathSetTask(luigi.Task):
         src: a URL pointing to a folder in s3:// or local FS.
         include: a list of patterns to use to select. Multiple patterns are OR'd.
+        manifest: a URL pointing to a manifest file location.
     """
     src = luigi.Parameter()
     include = luigi.Parameter(is_list=True, default=('*',))
+    manifest = luigi.Parameter(default=None)

     def __init__(self, *args, **kwargs):
         super(PathSetTask, self).__init__(*args, **kwargs)
         self.s3_conn = None

-    def requires(self):
+    def generate_file_list(self):
+        """Yield each individual path given a source folder and a set of glob expressions."""
         if self.src.startswith('s3'):
             # connect lazily as needed:
             if self.s3_conn is None:
@@ -50,6 +53,23 @@ class PathSetTask(luigi.Task):
         for filepath in filelist:
             yield ExternalURL(filepath)

+    def manifest_file_list(self):
+        """Write each individual path to a manifest file and yield the path to that file."""
+        manifest_target = get_target_from_url(self.manifest)
+        if not manifest_target.exists():
+            with manifest_target.open('w') as manifest_file:
+                for external_url_task in self.generate_file_list():
+                    manifest_file.write(external_url_task.url + '\n')
+
+        yield ExternalURL(self.manifest)
+
+    def requires(self):
+        if self.manifest is not None:
+            return self.manifest_file_list()
+        else:
+            return self.generate_file_list()
+
     def complete(self):
         # An optimization: just declare that the task is always
         # complete, by definition, because it is whatever files were
...
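With this change `PathSetTask` has two modes: without a manifest it yields one `ExternalURL` requirement per matched file; with a manifest it materializes the matched paths into the manifest file once (only if that file does not already exist) and yields a single `ExternalURL` for the manifest itself. A usage sketch with hypothetical URLs:

```python
# Without a manifest: one requirement per matched input file.
PathSetTask(src='s3://example-bucket/logs/', include=['*tracking.log*'])

# With a manifest: the matched paths are written to the manifest file and
# the task requires only that single file.
PathSetTask(
    src='s3://example-bucket/logs/',
    include=['*tracking.log*'],
    manifest='s3://example-bucket/manifests/events.manifest',  # hypothetical URL
)
```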
@@ -6,7 +6,9 @@ from fnmatch import fnmatch
 from urlparse import urlparse

 from boto.s3.key import Key
-from luigi.s3 import S3Client
+from luigi.s3 import S3Client, AtomicS3File, ReadableS3File, FileNotFoundException
+
+import luigi.hdfs


 def get_s3_bucket_key_names(url):
@@ -94,3 +96,31 @@ class RestrictedPermissionsS3Client(S3Client):
         s3_key = Key(s3_bucket)
         s3_key.key = key
         s3_key.set_contents_from_filename(local_path, policy='bucket-owner-full-control')
+
+
+class S3HdfsTarget(luigi.hdfs.HdfsTarget):
+    """HDFS target that supports writing and reading files directly in S3."""
+
+    # Luigi does not support HDFS targets that point to complete URLs like "s3://foo/bar"; it only supports
+    # HDFS paths that look like standard file paths, e.g. "/foo/bar". Once this bug is fixed, this class is
+    # no longer necessary.
+    # TODO: Fix the upstream bug in luigi that prevents writing to HDFS files that are specified by complete URLs.
+
+    def __init__(self, path=None, format=luigi.hdfs.Plain, is_tmp=False):
+        super(S3HdfsTarget, self).__init__(path=path, format=format, is_tmp=is_tmp)
+        self.s3_client = RestrictedPermissionsS3Client()
+
+    def open(self, mode='r'):
+        if mode not in ('r', 'w'):
+            raise ValueError("Unsupported open mode '{mode}'".format(mode=mode))
+
+        safe_path = self.path.replace('s3n://', 's3://')
+        if mode == 'r':
+            s3_key = self.s3_client.get_key(safe_path)
+            if s3_key:
+                return ReadableS3File(s3_key)
+            else:
+                raise FileNotFoundException("Could not find file at %s" % safe_path)
+        else:
+            return AtomicS3File(safe_path, self.s3_client)
...
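A short usage sketch of the new target, assuming boto credentials are configured for the S3 client; the URL is hypothetical:

```python
# Sketch: read and write an S3 object through the HDFS-style target.
target = S3HdfsTarget('s3://example-bucket/reports/answers.json')

with target.open('w') as output_file:
    output_file.write('{"answers": []}\n')

with target.open('r') as input_file:
    print(input_file.read())
```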
@@ -18,7 +18,7 @@ import luigi.format
 import luigi.hdfs
 import luigi.s3

-from edx.analytics.tasks.s3_util import RestrictedPermissionsS3Client
+from edx.analytics.tasks.s3_util import RestrictedPermissionsS3Client, S3HdfsTarget


 class ExternalURL(luigi.ExternalTask):
@@ -44,8 +44,8 @@ class IgnoredTarget(luigi.hdfs.HdfsTarget):
 DEFAULT_TARGET_CLASS = luigi.LocalTarget
 URL_SCHEME_TO_TARGET_CLASS = {
     'hdfs': luigi.hdfs.HdfsTarget,
-    's3': luigi.hdfs.HdfsTarget,
-    's3n': luigi.hdfs.HdfsTarget,
+    's3': S3HdfsTarget,
+    's3n': S3HdfsTarget,
     'file': luigi.LocalTarget,
     's3+https': luigi.s3.S3Target,
 }
...
@@ -31,5 +31,5 @@ edx.analytics.tasks =
     answer_dist = edx.analytics.tasks.answer_dist:AnswerDistributionPerCourse

 mapreduce.engine =
-    hadoop = luigi.hadoop:DefaultHadoopJobRunner
+    hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner
     local = luigi.hadoop:LocalJobRunner
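The setup.cfg change points the `hadoop` engine name at the new runner class, so the lazy lookup in `job_runner()` now resolves to `MapReduceJobRunner` and takes the customized branch. If the lookup uses stevedore, as the comment in the mapreduce module suggests, it would be roughly equivalent to this sketch (not the project's actual code):

```python
# Sketch, assuming stevedore resolves the entry point; names mirror setup.cfg.
from stevedore import driver

mgr = driver.DriverManager(namespace='mapreduce.engine', name='hadoop')
engine_class = mgr.driver  # -> edx.analytics.tasks.mapreduce:MapReduceJobRunner
```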