Commit 42a5d33b by Gabe Mulley

Export tracking logs

- Refactor acceptance tests
- Update version of Luigi

Change-Id: Icd064e41d3e174e4c3b40a328538f312909a7447
parent 3b87fd6a
@@ -42,3 +42,6 @@ diff_*.html
report
edx_analytics.log
venv
# Override config files
override.cfg
# TODO: Maybe deploy this configuration with ansible instead of checking it in, to avoid installation-specific values.
[hadoop]
version = apache1
[core]
logging_conf_file=logging.cfg
hdfs-tmp-dir=/tmp/luigi/partial
[map-reduce]
engine = hadoop
marker = s3://edx-analytics-scratch/marker/
[event-logs]
# The environment represents a consistent key space where joins can be performed without fear of consistency issues
environment = prod
expand_interval = 2 days
source = s3://edx-all-tracking-logs/
# TODO: Figure out a better way to store a per-environment mapping like this.
[environment:prod]
pattern = .*?prod-(?:edx(?:app)?|worker)-\d{3}/tracking.log-(?P<date>\d{8}).*\.gz
[environment:edge]
pattern = .*?prod-edge-(?:edx(?:app)?|worker)-\d{3}/tracking.log-(?P<date>\d{8}).*\.gz
[event-export]
config = s3://edx-analytics-data/export-config.yaml
[manifest]
threshold = 500
input_format = oddjob.ManifestTextInputFormat
# TODO: Operationalize the build process for this jar.
lib_jar = s3://edx-analytics-packages/oddjob-1.0.1-standalone.jar
path = s3://edx-analytics-scratch/manifest/
[hadoop]
version = cdh3
[core]
logging_conf_file=logging.cfg
[map-reduce]
engine = hadoop
[event-logs]
source = s3://edx-all-tracking-logs
destination = s3://edx-analytics-events/raw/by-server
include = prod-edx-*/tracking.log-*.gz
prod-edxapp-*/tracking.log-*.gz
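As a minimal sketch of how the per-environment pattern above is applied, the named "date" capture group is what later drives the date filtering; the key path below is invented for illustration.

import re

# Pattern copied from the [environment:prod] section above.
PATTERN = r'.*?prod-(?:edx(?:app)?|worker)-\d{3}/tracking.log-(?P<date>\d{8}).*\.gz'

# Hypothetical key path; real paths come from listing the source bucket.
url = 's3://edx-all-tracking-logs/prod-edxapp-004/tracking.log-20140318.gz'
match = re.match(PATTERN, url)
if match:
    print(match.group('date'))  # '20140318'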
@@ -13,7 +13,7 @@ from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask
from edx.analytics.tasks.pathutil import PathSetTask
from edx.analytics.tasks.sqoop import SqoopImportFromMysql
from edx.analytics.tasks.util import csv_util
from edx.analytics.tasks.url import url_path_join
from edx.analytics.tasks.url import url_path_join, get_target_from_url
log = logging.getLogger(__name__)
@@ -44,6 +44,10 @@ STUDENT_MODULE_FIELDS = [
StudentModuleRecord = namedtuple('StudentModuleRecord', STUDENT_MODULE_FIELDS)
# Name of marker file to appear in output directory of MultiOutputMapReduceJobTask to indicate success.
MARKER_FILENAME = 'job_success'
class StudentModulePerCourseTask(MultiOutputMapReduceJobTask):
"""
Separates a raw SQL dump of a courseware_studentmodule table into
@@ -59,6 +63,9 @@ class StudentModulePerCourseTask(MultiOutputMapReduceJobTask):
def requires(self):
return PathSetTask(self.dump_root)
def output(self):
return get_target_from_url(url_path_join(self.output_root, MARKER_FILENAME))
def mapper(self, line):
"""
Extract course and reformat each line.
......
"""Group events by institution and export them for research purposes"""
import logging
import os
import luigi
import luigi.configuration
import yaml
from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask
from edx.analytics.tasks.pathutil import EventLogSelectionTask
from edx.analytics.tasks.url import url_path_join, ExternalURL
from edx.analytics.tasks.util import eventlog
log = logging.getLogger(__name__)
class EventExportTask(MultiOutputMapReduceJobTask):
"""
Group events by institution and export them for research purposes.
Parameters:
output_root: Directory to store the output in.
config: A URL to a YAML file that contains the list of organizations and servers to export events for.
source: A URL to a path that contains log files that contain the events.
environment: A list of short strings that describe the environment that generated the events. Only include
events from this list of environments.
interval: The range of dates to export logs for.
pattern: A regex with a named capture group for the date that approximates the date that the events within were
emitted. Note that the search interval is expanded, so events don't have to be in exactly the right file
in order for them to be processed.
"""
output_root = luigi.Parameter(
default_from_config={'section': 'event-export', 'name': 'output_root'}
)
config = luigi.Parameter(
default_from_config={'section': 'event-export', 'name': 'config'}
)
source = luigi.Parameter(
default_from_config={'section': 'event-logs', 'name': 'source'}
)
environment = luigi.Parameter(is_list=True, default=['prod', 'edge'])
interval = luigi.DateIntervalParameter()
pattern = luigi.Parameter(default=None)
def requires(self):
tasks = []
for env in self.environment:
tasks.append(
EventLogSelectionTask(
source=self.source,
environment=env,
interval=self.interval,
pattern=self.pattern,
)
)
return tasks
def requires_local(self):
return ExternalURL(url=self.config)
def init_mapper(self):
with self.input_local().open() as config_input:
config_data = yaml.load(config_input)
self.organizations = config_data['organizations']
self.org_id_whitelist = set(self.organizations.keys())
for _org_id, org_config in self.organizations.iteritems():
for alias in org_config.get('other_names', []):
self.org_id_whitelist.add(alias)
log.debug('Using org_id whitelist ["%s"]', '", "'.join(self.org_id_whitelist))
self.server_name_whitelist = set()
for env in self.environment:
server_list = config_data.get('environments', {}).get(env, {}).get('servers', [])
self.server_name_whitelist.update(server_list)
log.debug('Using server_id whitelist ["%s"]', '", "'.join(self.server_name_whitelist))
self.lower_bound_date_string = self.interval.date_a.strftime('%Y-%m-%d')
self.upper_bound_date_string = self.interval.date_b.strftime('%Y-%m-%d')
def mapper(self, line):
event = eventlog.parse_json_event(line)
if event is None:
return
try:
event_time = event['time']
except KeyError:
self.incr_counter('Event', 'Missing Time Field', 1)
return
# Don't use strptime to parse the date; it is extremely slow. Instead, rely on alphanumeric comparisons.
# The timestamp is ISO8601 formatted, so dates will look like %Y-%m-%d. For example: 2014-05-20.
date_string = event_time.split("T")[0]
if date_string < self.lower_bound_date_string or date_string >= self.upper_bound_date_string:
return
server_id = self.get_server_id()
org_id = self.get_org_id(event)
if org_id not in self.org_id_whitelist:
log.debug('Unrecognized organization: server_id=%s org_id=%s', server_id or '', org_id or '')
return
if server_id not in self.server_name_whitelist:
log.debug('Unrecognized server: server_id=%s org_id=%s', server_id or '', org_id or '')
return
yield (date_string, org_id, server_id), line
def get_server_id(self):
"""
Attempt to determine the server the event was emitted from.
This method may return incorrect results, so a white list of valid server names is used to filter out the noise.
"""
try:
# Hadoop sets an environment variable with the full URL of the input file. This url will be something like:
# s3://bucket/root/host1/tracking.log.gz. In this example, assume self.source is "s3://bucket/root".
input_file_name = os.environ['map_input_file']
except KeyError:
log.warn('map_input_file not defined in os.environ, unable to determine server_id')
return None
# Even if len(self.source) > len(input_file_name) the slice will return ''
# lstrip is a noop on an empty string
relative_path = input_file_name[len(self.source):].lstrip('/')
# relative_path = "host1/tracking.log.gz"
# Assume the server name is the first directory in the relative path
path_elements = relative_path.split('/')
# The result of string.split() is always a list with at least one element
server = path_elements[0]
# server = "host1"
return server
# This is copied verbatim (only comments changed) from the legacy event log export script.
def get_org_id(self, item):
"""
Attempt to determine the institution that is associated with this particular event.
This method may return incorrect results, so a white list of valid institution names is used to filter out the
noise.
"""
try:
if item['event_source'] == 'server':
institution = item.get('context', {}).get('org_id')
if institution:
return institution
# Try to infer the institution from the event data
evt_type = item['event_type']
if '/courses/' in evt_type:
institution = evt_type.split('/')[2]
return institution
elif '/' in evt_type:
return "Global"
else:
# Specific server logging. One-off parser for each type. A survey of logs showed 4 event types:
# reset_problem, save_problem_check, save_problem_check_fail, save_problem_fail. All four of these
# have a problem_id, from which we extract the institution.
try:
return item['event']['problem_id'].split('/')[2]
except Exception: # pylint: disable=broad-except
return "Unhandled"
elif item['event_source'] == 'browser':
page = item['page']
if 'courses' in page:
institution = page.split('/')[4]
return institution
else:
return "BGE"
except Exception: # pylint: disable=broad-except
log.exception('Unable to determine institution for event: %s', unicode(item).encode('utf8'))
return "Exception"
def output_path_for_key(self, key):
date, org_id, server_id = key
# This is the structure currently produced by the existing tracking log export script
return url_path_join(
self.output_root,
org_id,
server_id,
'{date}_{org}.log'.format(
date=date,
org=org_id,
)
)
def multi_output_reducer(self, _key, values, output_file):
for value in values:
output_file.write(value.strip())
output_file.write('\n')
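For illustration, a standalone sketch of the key-to-path mapping implemented by output_path_for_key above; the output root and key values here are invented.

def export_path(output_root, key):
    # Mirrors output_path_for_key: <root>/<org>/<server>/<date>_<org>.log
    date, org_id, server_id = key
    return '{root}/{org}/{server}/{date}_{org}.log'.format(
        root=output_root.rstrip('/'), org=org_id, server=server_id, date=date)

print(export_path('s3://example-output', ('2014-05-15', 'FooX', 'prod-edxapp-001')))
# s3://example-output/FooX/prod-edxapp-001/2014-05-15_FooX.log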
import luigi
import luigi.hdfs
from edx.analytics.tasks.s3 import S3Sync
class SyncEventLogs(luigi.Task):
"""
Copies the gzipped raw event logs to a new location.
The directory structure of the source is preserved.
The parameters will default to the values set in the Luigi
configuration.
Parameters:
`source`: root S3 path containing the raw event logs
`destination`: root S3 path to which the events will be copied
`include`: list of glob expressions of the keys to include.
"""
source = luigi.Parameter(
default_from_config={'section': 'event-logs', 'name': 'source'}
)
destination = luigi.Parameter(
default_from_config={'section': 'event-logs', 'name': 'destination'}
)
include = luigi.Parameter(
default_from_config={'section': 'event-logs', 'name': 'include'},
is_list=True
)
def requires(self):
return S3Sync(self.source, self.destination, self.include)
def output(self):
for output in self.requires().output():
yield luigi.hdfs.HdfsTarget(output.path)
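A sketch only, assuming SyncEventLogs has been imported from this module: constructing the task explicitly with the sample values from the configuration above instead of relying on the [event-logs] defaults.

task = SyncEventLogs(
    source='s3://edx-all-tracking-logs',
    destination='s3://edx-analytics-events/raw/by-server',
    include=['prod-edx-*/tracking.log-*.gz', 'prod-edxapp-*/tracking.log-*.gz'],
)
# requires() delegates the copy to S3Sync; output() re-exposes each copied key
# as an HdfsTarget so downstream Hadoop jobs can consume it.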
@@ -36,21 +36,20 @@ from stevedore.extension import ExtensionManager
log = logging.getLogger(__name__)
DEFAULT_CONFIGURATION_FILE = 'default.cfg'
OVERRIDE_CONFIGURATION_FILE = 'override.cfg'
def main():
# In order to see errors during extension loading, you can uncomment the next line.
# logging.basicConfig(level=logging.DEBUG)
# Load tasks configured using entry_points
# TODO: launch tasks by their entry_point name
ExtensionManager('edx.analytics.tasks')
# Include default configuration file with task defaults
# TODO: add a config argument to specify the location of the file
configuration = luigi.configuration.get_config()
configuration.add_config_path(DEFAULT_CONFIGURATION_FILE)
if not os.path.isfile(DEFAULT_CONFIGURATION_FILE):
log.warning('Default configuration file not found: %s', DEFAULT_CONFIGURATION_FILE)
if os.path.exists(OVERRIDE_CONFIGURATION_FILE):
configuration.add_config_path(OVERRIDE_CONFIGURATION_FILE)
# Tell luigi what dependencies to pass to the Hadoop nodes
# - argparse is not included by default in python 2.6, but is required by luigi.
......
@@ -22,6 +22,7 @@ def main():
parser.add_argument('--verbose', action='store_true', help='display very verbose output')
parser.add_argument('--log-path', help='download luigi output streams after completing the task', default=None)
parser.add_argument('--user', help='remote user name to connect as', default=None)
parser.add_argument('--override-config', help='config file to use to run the job', default=None)
arguments, extra_args = parser.parse_known_args()
arguments.launch_task_arguments = extra_args
@@ -69,6 +70,8 @@ def convert_args_to_extra_vars(arguments, uid):
extra_vars['wait_for_task'] = True
if arguments.log_path:
extra_vars['local_log_dir'] = arguments.log_path
if arguments.override_config:
extra_vars['override_config'] = arguments.override_config
return ' '.join(["{}='{}'".format(k, extra_vars[k]) for k in extra_vars])
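For example (values invented), the join above renders the collected settings as single-quoted key=value pairs:

extra_vars = {'local_log_dir': '/var/log/analytics-tasks', 'override_config': 'override.cfg'}
print(' '.join(["{}='{}'".format(k, extra_vars[k]) for k in extra_vars]))
# e.g. local_log_dir='/var/log/analytics-tasks' override_config='override.cfg'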
......
@@ -3,16 +3,23 @@ Support executing map reduce tasks.
"""
from __future__ import absolute_import
import gzip
from hashlib import md5
import os
import StringIO
import luigi
import luigi.configuration
import luigi.hdfs
import luigi.hadoop
import luigi.task
from luigi import configuration
from edx.analytics.tasks.url import get_target_from_url, url_path_join
from edx.analytics.tasks.util.manifest import convert_tasks_to_manifest_if_necessary
# Name of marker file to appear in output directory of MultiOutputMapReduceJobTask to indicate success.
MARKER_FILENAME = 'job_success'
DEFAULT_MARKER_ROOT = 'hdfs:///tmp/marker'
class MapReduceJobTask(luigi.hadoop.JobTask):
@@ -22,14 +29,16 @@ class MapReduceJobTask(luigi.hadoop.JobTask):
"""
mapreduce_engine = luigi.Parameter(
default_from_config={'section': 'map-reduce', 'name': 'engine'}
default_from_config={'section': 'map-reduce', 'name': 'engine'},
significant=False
)
input_format = luigi.Parameter(default=None)
lib_jar = luigi.Parameter(is_list=True, default=[])
# TODO: remove these parameters
input_format = luigi.Parameter(default=None, significant=False)
lib_jar = luigi.Parameter(is_list=True, default=[], significant=False)
# Override the parent class definition of this parameter. This typically wants to scale with the cluster size so the
# user should be able to tweak it depending on their particular configuration.
n_reduce_tasks = luigi.Parameter(default=25)
n_reduce_tasks = luigi.Parameter(default=25, significant=False)
def job_runner(self):
# Lazily import this since this module will be loaded on hadoop worker nodes however stevedore will not be
@@ -43,10 +52,39 @@ class MapReduceJobTask(luigi.hadoop.JobTask):
raise KeyError('A map reduce engine must be specified in order to run MapReduceJobTasks')
if issubclass(engine_class, MapReduceJobRunner):
return engine_class(libjars_in_hdfs=self.lib_jar, input_format=self.input_format)
engine_kwargs = self._get_engine_parameters_from_targets()
return engine_class(**engine_kwargs)
else:
return engine_class()
def _get_engine_parameters_from_targets(self):
"""
Determine the set of job parameters that should be used to process the input.
Some types of input may not be simple files that Hadoop can process natively out of the box; they might require
special handling by custom input formats. Allow dynamic loading of input formats and the jars that contain them
by setting attributes on the input target.
"""
lib_jar = list(self.lib_jar)
input_format = self.input_format
for input_target in luigi.task.flatten(self.input_hadoop()):
if hasattr(input_target, 'lib_jar'):
lib_jar.extend(input_target.lib_jar)
if hasattr(input_target, 'input_format') and input_target.input_format is not None:
if input_format is not None and input_target.input_format != input_format:
raise RuntimeError('Multiple distinct input formats specified on input targets.')
input_format = input_target.input_format
return {
'libjars_in_hdfs': lib_jar,
'input_format': input_format,
}
def requires_hadoop(self):
return convert_tasks_to_manifest_if_necessary(self.requires())
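To make the docstring above concrete, here is a sketch of an input target that advertises the jar and input format needed to read it, which _get_engine_parameters_from_targets then merges into the job configuration. The target path is invented; the jar and format values are the ones from the [manifest] configuration above.

import luigi.hdfs

example_target = luigi.hdfs.HdfsTarget('/tmp/example.manifest')
example_target.lib_jar = ['s3://edx-analytics-packages/oddjob-1.0.1-standalone.jar']
example_target.input_format = 'oddjob.ManifestTextInputFormat'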
class MapReduceJobRunner(luigi.hadoop.HadoopJobRunner):
"""
@@ -60,14 +98,89 @@ class MapReduceJobRunner(luigi.hadoop.HadoopJobRunner):
def __init__(self, libjars_in_hdfs=None, input_format=None):
libjars_in_hdfs = libjars_in_hdfs or []
config = configuration.get_config()
streaming_jar = config.get('hadoop', 'streaming-jar')
streaming_jar = config.get('hadoop', 'streaming-jar', '/tmp/hadoop-streaming.jar')
if config.has_section('job-conf'):
job_confs = dict(config.items('job-conf'))
else:
job_confs = {}
super(MapReduceJobRunner, self).__init__(
streaming_jar,
input_format=input_format,
libjars_in_hdfs=libjars_in_hdfs
libjars_in_hdfs=libjars_in_hdfs,
jobconfs=job_confs,
)
class EmulatedMapReduceJobRunner(luigi.hadoop.JobRunner):
"""
Execute map reduce tasks in process on the machine that is running luigi.
This is a modified version of luigi.hadoop.LocalJobRunner. The key differences are:
* It gracefully handles .gz input files, decompressing them and streaming them directly to the mapper. This mirrors
the behavior of hadoop's default file input format. Note this only works for files that support `tell()` and
`seek()` since those methods are used by the gzip decompression library.
* It detects ".manifest" files and assumes that they are in fact just a file that contains paths to the real files
that should be processed by the task. It makes use of this information to "do the right thing". This mirrors the
behavior of a manifest input format in hadoop.
* It sets the "map_input_file" environment variable when running the mapper just like the hadoop streaming library.
Other than that it should behave identically to LocalJobRunner.
"""
def group(self, input):
output = StringIO.StringIO()
lines = []
for i, line in enumerate(input):
parts = line.rstrip('\n').split('\t')
blob = md5(str(i)).hexdigest() # pseudo-random blob to make sure the input isn't sorted
lines.append((parts[:-1], blob, line))
for k, _, line in sorted(lines):
output.write(line)
output.seek(0)
return output
def run_job(self, job):
job.init_hadoop()
job.init_mapper()
map_output = StringIO.StringIO()
input_targets = luigi.task.flatten(job.input_hadoop())
for input_target in input_targets:
with input_target.open('r') as input_file:
# S3 files not yet supported since they don't support tell() and seek()
if input_target.path.endswith('.gz'):
input_file = gzip.GzipFile(fileobj=input_file)
elif input_target.path.endswith('.manifest'):
for url in input_file:
input_targets.append(get_target_from_url(url.strip()))
continue
os.environ['map_input_file'] = input_target.path
try:
outputs = job._map_input((line[:-1] for line in input_file))
job.internal_writer(outputs, map_output)
finally:
del os.environ['map_input_file']
map_output.seek(0)
reduce_input = self.group(map_output)
try:
reduce_output = job.output().open('w')
except Exception:
reduce_output = StringIO.StringIO()
try:
job._run_reducer(reduce_input, reduce_output)
finally:
try:
reduce_output.close()
except Exception:
pass
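For context on the manifest handling above, a sketch of what a ".manifest" input looks like: it is simply a newline-delimited list of URLs (the paths below are invented), each of which is appended to the set of input targets.

manifest_lines = [
    's3://edx-all-tracking-logs/prod-edxapp-004/tracking.log-20140318.gz',
    's3://edx-all-tracking-logs/prod-worker-001/tracking.log-20140416-1397643421.gz',
]
with open('/tmp/example.manifest', 'w') as manifest_file:
    manifest_file.write('\n'.join(manifest_lines) + '\n')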
class MultiOutputMapReduceJobTask(MapReduceJobTask):
"""
@@ -83,10 +196,12 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
delete_output_root: if True, recursively deletes the output_root at task creation.
"""
output_root = luigi.Parameter()
delete_output_root = luigi.BooleanParameter(default=False)
delete_output_root = luigi.BooleanParameter(default=False, significant=False)
def output(self):
return get_target_from_url(url_path_join(self.output_root, MARKER_FILENAME))
marker_base_url = luigi.configuration.get_config().get('map-reduce', 'marker', DEFAULT_MARKER_ROOT)
marker_url = url_path_join(marker_base_url, str(hash(self)))
return get_target_from_url(marker_url)
def reducer(self, key, values):
"""
@@ -123,5 +238,6 @@ class MultiOutputMapReduceJobTask(MapReduceJobTask):
# (i.e. the output target) will be removed, so that external functionality
# will know that the generation of data files is not complete.
output_dir_target = get_target_from_url(self.output_root)
if output_dir_target.exists():
output_dir_target.remove()
for target in [self.output(), output_dir_target]:
if target.exists():
target.remove()
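A minimal sketch of the marker location computed by the new output() above; the base URL normally comes from the [map-reduce] "marker" setting (falling back to DEFAULT_MARKER_ROOT), and the task hash shown here is invented.

marker_base_url = 's3://edx-analytics-scratch/marker/'
marker_url = '{base}/{task_hash}'.format(
    base=marker_base_url.rstrip('/'), task_hash=hash('ExampleTask'))
print(marker_url)  # e.g. s3://edx-analytics-scratch/marker/<some integer>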
@@ -7,15 +7,25 @@ Supports outputs to HDFS, S3, and local FS.
"""
import boto
import datetime
import glob
import logging
import os
import re
import luigi
import luigi.s3
import luigi.configuration
import luigi.hdfs
import luigi.format
import luigi.task
from edx.analytics.tasks.s3_util import generate_s3_sources
from edx.analytics.tasks.url import ExternalURL, url_path_join, get_target_from_url
from luigi.date_interval import DateInterval
from edx.analytics.tasks.s3_util import generate_s3_sources, get_s3_bucket_key_names
from edx.analytics.tasks.url import ExternalURL, UncheckedExternalURL, url_path_join, get_target_from_url
log = logging.getLogger(__name__)
class PathSetTask(luigi.Task):
@@ -78,3 +88,119 @@ class PathSetTask(luigi.Task):
def output(self):
return [task.output() for task in self.requires()]
class EventLogSelectionTask(luigi.WrapperTask):
"""
Select all relevant event log input files from a directory.
Recursively list all files in the directory which is expected to contain the input files organized in such a way
that a pattern can be used to find them. Filenames are expected to contain a date which represents an approximation
of the date found in the events themselves.
Parameters:
environment: A list of short strings that describe the environment that generated the events. Only include
events from this list of environments.
source: A URL to a path that contains log files that contain the events.
interval: The range of dates to export logs for.
expand_interval: A time interval to add to the beginning and end of the interval to expand the windows of files
captured.
pattern: A regex with a named capture group for the date that approximates the date that the events within were
emitted. Note that the search interval is expanded, so events don't have to be in exactly the right file
in order for them to be processed.
"""
# TODO: make this a list parameter, support pulling files for multiple environments at once
environment = luigi.Parameter(
default_from_config={'section': 'event-logs', 'name': 'environment'}
)
source = luigi.Parameter(
default_from_config={'section': 'event-logs', 'name': 'source'}
)
interval = luigi.DateIntervalParameter()
expand_interval = luigi.TimeDeltaParameter(
default_from_config={'section': 'event-logs', 'name': 'expand_interval'}
)
pattern = luigi.Parameter(default=None)
def __init__(self, *args, **kwargs):
super(EventLogSelectionTask, self).__init__(*args, **kwargs)
self.interval = DateInterval(
self.interval.date_a - self.expand_interval,
self.interval.date_b + self.expand_interval
)
configuration = luigi.configuration.get_config()
if self.pattern is None:
self.pattern = configuration.get('environment:' + self.environment, 'pattern')
self.requirements = None
def requires(self):
# This method gets called several times. Avoid making multiple round trips to S3 by caching the first result.
if self.requirements is None:
log.debug('No saved requirements found, refreshing requirements list.')
self.requirements = self._get_requirements()
else:
log.debug('Using cached requirements.')
return self.requirements
def _get_requirements(self):
"""
Gather the set of requirements needed to run the task.
This can be a rather expensive operation that requires usage of the S3 API to list all files in the source
bucket and select the ones that are applicable to the given date range.
"""
if self.source.startswith('s3'):
urls = self._get_s3_urls()
else:
urls = self._get_local_urls()
log.debug('Matching urls using pattern="%s"', self.pattern)
log.debug(
'Date interval: %s <= date < %s', self.interval.date_a.isoformat(), self.interval.date_b.isoformat()
)
return [UncheckedExternalURL(url) for url in urls if self.should_include_url(url)]
def _get_s3_urls(self):
"""Recursively list all files inside the source URL directory."""
s3_conn = boto.connect_s3()
bucket_name, root = get_s3_bucket_key_names(self.source)
bucket = s3_conn.get_bucket(bucket_name)
for key_metadata in bucket.list(root):
if key_metadata.size > 0:
key_path = key_metadata.key[len(root):].lstrip('/')
yield url_path_join(self.source, key_path)
def _get_local_urls(self):
"""Recursively list all files inside the source directory on the local filesystem."""
for directory_path, _subdir_paths, filenames in os.walk(self.source):
for filename in filenames:
yield os.path.join(directory_path, filename)
def should_include_url(self, url):
"""
Determine whether the file pointed to by the URL should be included in the set of files used for analysis.
Presently filters first on pattern match and then on the datestamp extracted from the file name.
"""
match = re.match(self.pattern, url)
if not match:
log.debug('Excluding due to pattern mismatch: %s', url)
return False
# TODO: support patterns that don't contain a "date" group
parsed_datetime = datetime.datetime.strptime(match.group('date'), '%Y%m%d')
parsed_date = datetime.date(parsed_datetime.year, parsed_datetime.month, parsed_datetime.day)
should_include = parsed_date in self.interval
if should_include:
log.debug('Including: %s', url)
else:
log.debug('Excluding due to date interval: %s', url)
return should_include
def output(self):
return [task.output() for task in self.requires()]
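A condensed sketch of the filtering done by should_include_url above, using the prod pattern from the sample configuration and a half-open one-month interval; the URL is invented.

import datetime
import re

pattern = r'.*?prod-(?:edx(?:app)?|worker)-\d{3}/tracking.log-(?P<date>\d{8}).*\.gz'
url = 's3://edx-all-tracking-logs/prod-edxapp-004/tracking.log-20140318.gz'
lower, upper = datetime.date(2014, 3, 1), datetime.date(2014, 4, 1)

match = re.match(pattern, url)
if match:
    parsed_date = datetime.datetime.strptime(match.group('date'), '%Y%m%d').date()
    print(lower <= parsed_date < upper)  # True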
@@ -288,7 +288,6 @@ class TestWeeklyAllUsersAndEnrollments(unittest.TestCase):
history = requires['history'].output()
self.assertIsInstance(history, luigi.File)
self.assertEqual(history.format, luigi.format.Gzip)
registrations = requires['registrations'].output()
self.assertIsInstance(requires['registrations'], UserRegistrationsPerDay)
......
import os.path
import boto
import luigi
import luigi.s3
from edx.analytics.tasks.s3_util import join_as_s3_url, get_s3_bucket_key_names, generate_s3_sources, get_s3_key
class S3Copy(luigi.Task):
"""
Copy a file from one S3 location to another.
Files in the destination are overwritten unless they already have the same content as the source.
The copy is done using boto.
Parameters:
`source`: location of original s3 file
`destination`: location where to copy the file
"""
source = luigi.Parameter()
destination = luigi.Parameter()
def __init__(self, *args, **kwargs):
super(S3Copy, self).__init__(*args, **kwargs)
self.s3 = boto.connect_s3()
def requires(self):
return luigi.s3.S3PathTask(self.source)
def output(self):
return luigi.s3.S3Target(self.destination)
def complete(self):
# Check if the destination file has been copied already by
# verifying its existence, and if so, determining if it has
# the same content as the source by using md5 hashes.
src = self.input()
dst = self.output()
if not dst.exists():
return False
src_key = get_s3_key(self.s3, src.path)
dst_key = get_s3_key(self.s3, dst.path)
if dst_key.size != src_key.size:
return False
# Check the md5 hashes of the keys.
if dst_key.etag != src_key.etag:
return False
return True
def run(self):
src_url = self.input().path
dst_url = self.output().path
src_key = get_s3_key(self.s3, src_url)
dst_bucket_name, dst_key_name = get_s3_bucket_key_names(dst_url)
# The copy overwrites the destination. The task checks if
# that is necessary during the `complete()` call.
src_key.copy(dst_bucket_name, dst_key_name)
class S3Sync(luigi.Task):
"""
Synchronizes a s3 root path with another.
The destination file paths are relative to the source and destination
roots. For example if:
source: s3://source-bucket/foo/bar
destination: s3://destination-bucket/baz
include = ['*.gz']
The file s3://source-bucket/foo/bar/zoo/lion.gz will be copied to
s3://destination-bucket/baz/zoo/lion.gz
Parameters:
`source`: root S3 path of the keys to be copied
`destination`: root S3 path to which the keys will be copied
`include`: list of glob expressions of the keys to include.
default is ['*']
"""
source = luigi.Parameter()
destination = luigi.Parameter()
include = luigi.Parameter(is_list=True, default=('*',))
def __init__(self, *args, **kwargs):
super(S3Sync, self).__init__(*args, **kwargs)
self.s3 = boto.connect_s3()
def requires(self):
for bucket, root, path in generate_s3_sources(self.s3, self.source, self.include):
source = join_as_s3_url(bucket, root, path)
destination = os.path.join(self.destination, path)
yield S3Copy(source, destination)
def output(self):
for task in self.requires():
yield task.output()
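To illustrate the docstring above, here is a sketch of the include/relative-path behavior using plain fnmatch; whether the real s3_util helpers match full keys or basenames is not shown in this diff, so treat this only as an approximation.

import fnmatch
import os

destination = 's3://destination-bucket/baz'
relative_keys = ['zoo/lion.gz', 'zoo/notes.txt']  # paths relative to the source root
include = ['*.gz']

for path in relative_keys:
    if any(fnmatch.fnmatch(path, glob) for glob in include):
        print(os.path.join(destination, path))
# s3://destination-bucket/baz/zoo/lion.gz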
{"username": "anonymous", "host": "", "event_source": "server", "event_type": "edx.course.enrollment.activated", "context": {"course_id": "edX/Open_DemoX/edx_demo_course", "org_id": "edX", "command": "./manage.py create_user"}, "time": "2014-04-18T17:09:59.941616+00:00", "ip": "", "event": {"course_id": "edX/Open_DemoX/edx_demo_course", "user_id": 1, "mode": "honor"}, "agent": "", "page": null}
{"username": "anonymous", "host": "", "event_source": "server", "event_type": "edx.course.enrollment.activated", "context": {"course_id": "edX/Open_DemoX/edx_demo_course", "org_id": "edX", "command": "./manage.py create_user"}, "time": "2014-05-15T17:09:59.941616+00:00", "ip": "", "event": {"course_id": "edX/Open_DemoX/edx_demo_course", "user_id": 1, "mode": "honor"}, "agent": "", "page": null}
{"username": "", "host": "acceptance.m.sandbox.edx.org", "event_source": "server", "event_type": "/", "context": {"username": "", "user_id": "", "ip": "127.0.0.1", "org_id": "", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "host": "acceptance.m.sandbox.edx.org", "session": null, "course_id": "", "path": "/"}, "time": "2014-05-15T18:23:56.781588+00:00", "ip": "127.0.0.1", "event": "{\"POST\": {}, \"GET\": {}}", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "page": null}
{"username": "staff", "host": "acceptance.m.sandbox.edx.org", "event_source": "server", "event_type": "/courses/edX/Open_DemoX/edx_demo_course/info", "context": {"username": "staff", "course_user_tags": {}, "user_id": 4, "ip": "127.0.0.1", "org_id": "edX", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "host": "acceptance.m.sandbox.edx.org", "session": "3759e59e1164d199537519a77d0e9246", "course_id": "edX/Open_DemoX/edx_demo_course", "path": "/courses/edX/Open_DemoX/edx_demo_course/info"}, "time": "2014-05-16T18:52:20.302431+00:00", "ip": "127.0.0.1", "event": "{\"POST\": {}, \"GET\": {}}", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "page": null}
{"username": "staff", "event_type": "seek_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "host": "acceptance.m.sandbox.edx.org", "session": "3759e59e1164d199537519a77d0e9246", "event": "{\"id\":\"i4x-AcceptanceX-A101-video-0b9e39477cf34507a7a48f74be381fdd\",\"old_time\":38.910057,\"new_time\":147,\"type\":\"onSlideSeek\",\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"username": "staff", "user_id": 4, "ip": "127.0.0.1", "org_id": "AcceptanceX", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "host": "acceptance.m.sandbox.edx.org", "session": "3759e59e1164d199537519a77d0e9246", "course_id": "AcceptanceX/A101/T12014", "path": "/event"}, "time": "2014-05-15T19:03:19.487373+00:00", "page": "http://acceptance.m.sandbox.edx.org/courses/AcceptanceX/A101/T12014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"}
{"username": "staff", "event_type": "play_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "host": "acceptance.m.sandbox.edx.org", "session": "3759e59e1164d199537519a77d0e9246", "event": "{\"id\":\"i4x-AcceptanceX-A101-video-0b9e39477cf34507a7a48f74be381fdd\",\"currentTime\":38.910057,\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"username": "staff", "user_id": 4, "ip": "127.0.0.1", "org_id": "AcceptanceX", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "host": "acceptance.m.sandbox.edx.org", "session": "3759e59e1164d199537519a77d0e9246", "course_id": "AcceptanceX/A101/T12014", "path": "/event"}, "time": "2014-06-01T19:03:19.553862+00:00", "page": "http://acceptance.m.sandbox.edx.org/courses/AcceptanceX/A101/T12014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"}
{"username": "staff", "event_type": "seek_video", "ip": "127.0.0.1", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "host": "acceptance.m.sandbox.edx.org", "session": "3759e59e1164d199537519a77d0e9246", "event": "{\"id\":\"i4x-AcceptanceX-A101-video-0b9e39477cf34507a7a48f74be381fdd\",\"old_time\":38.910057,\"new_time\":147,\"type\":\"onSlideSeek\",\"code\":\"b7xgknqkQk8\"}", "event_source": "browser", "context": {"username": "staff", "user_id": 4, "ip": "127.0.0.1", "org_id": "AcceptanceX", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "host": "acceptance.m.sandbox.edx.org", "session": "3759e59e1164d199537519a77d0e9246", "course_id": "AcceptanceX/A101/T12014", "path": "/event"}, "time": "2014-05-15T19:03:19.487373+00:00", "page": "http://acceptance.m.sandbox.edx.org/courses/AcceptanceX/A101/T12014/courseware/d8a6192ade314473a78242dfeedfbf5b/edx_introduction/"}
{"username": "anonymous", "host": "", "event_source": "server", "event_type": "edx.course.enrollment.activated", "context": {"course_id": "edX/Open_DemoX/edx_demo_course", "org_id": "edX", "command": "./manage.py create_user"}, "time": "2014-05-15T17:09:59.941616+00:00", "ip": "", "event": {"course_id": "edX/Open_DemoX/edx_demo_course", "user_id": 1, "mode": "honor"}, "agent": "", "page": null}
{"username": "staff", "host": "acceptance.m.sandbox.edx.org", "event_source": "server", "event_type": "/courses/edX/Open_DemoX/edx_demo_course/info", "context": {"username": "staff", "course_user_tags": {}, "user_id": 4, "ip": "127.0.0.1", "org_id": "edX", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "host": "acceptance.m.sandbox.edx.org", "session": "3759e59e1164d199537519a77d0e9246", "course_id": "edX/Open_DemoX/edx_demo_course", "path": "/courses/edX/Open_DemoX/edx_demo_course/info"}, "time": "2014-05-16T18:52:20.302431+00:00", "ip": "127.0.0.1", "event": "{\"POST\": {}, \"GET\": {}}", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.132 Safari/537.36", "page": null}
"""
End to end test of event exports.
"""
import gzip
import os
import logging
import tempfile
import textwrap
import time
from luigi.s3 import S3Client, S3Target
from edx.analytics.tasks.tests.acceptance import AcceptanceTestCase
from edx.analytics.tasks.url import url_path_join
log = logging.getLogger(__name__)
class EventExportAcceptanceTest(AcceptanceTestCase):
"""Validate data flow for bulk export of events for research purposes."""
INPUT_FILE = 'event_export_tracking.log'
PROD_SERVER_NAME = 'prod-edxapp-001'
EDGE_SERVER_NAME = 'prod-edge-edxapp-002'
NUM_REDUCERS = 1
def setUp(self):
super(EventExportAcceptanceTest, self).setUp()
# The name of an existing job flow to run the test on
assert('job_flow_name' in self.config)
# The git URL of the repository to checkout analytics-tasks from.
assert('tasks_repo' in self.config)
# The branch of the analytics-tasks repository to test. Note this can differ from the branch that is currently
# checked out and running this code.
assert('tasks_branch' in self.config)
# Where to store logs generated by analytics-tasks.
assert('tasks_log_path' in self.config)
# The user to connect to the job flow over SSH with.
assert('connection_user' in self.config)
# Where analytics-tasks should output data, should be a URL pointing to a directory.
assert('tasks_output_url' in self.config)
# Allow for parallel execution of the test by specifying a different identifier. Using an identical identifier
# allows for old virtualenvs to be reused etc, which is why a random one is not simply generated with each run.
assert('identifier' in self.config)
url = self.config['tasks_output_url']
identifier = self.config['identifier']
self.s3_client = S3Client()
self.test_root = url_path_join(url, identifier, 'event_export')
self.s3_client.remove(self.test_root, recursive=True)
self.test_src = url_path_join(self.test_root, 'src')
self.test_out = url_path_join(self.test_root, 'out')
self.test_config = url_path_join(self.test_root, 'config', 'default.yaml')
self.input_paths = {
'prod': url_path_join(self.test_src, self.PROD_SERVER_NAME, 'tracking.log-20140515.gz'),
'edge': url_path_join(self.test_src, self.EDGE_SERVER_NAME, 'tracking.log-20140516-12345456.gz')
}
self.upload_data()
self.write_config()
def upload_data(self):
src = os.path.join(self.data_dir, 'input', self.INPUT_FILE)
with tempfile.NamedTemporaryFile(suffix='.gz') as temp_file:
gzip_file = gzip.open(temp_file.name, 'wb')
try:
with open(src, 'r') as input_file:
for line in input_file:
gzip_file.write(line)
finally:
gzip_file.close()
temp_file.flush()
# Upload test data file
for dst in self.input_paths.values():
self.s3_client.put(temp_file.name, dst)
def write_config(self):
with S3Target(self.test_config).open('w') as target_file:
target_file.write(
textwrap.dedent(
"""
---
environments:
prod:
servers:
- {server_1}
edge:
servers:
- {server_2}
organizations:
edX:
recipient: automation@example.com
AcceptanceX:
recipient: automation@example.com
"""
.format(
server_1=self.PROD_SERVER_NAME,
server_2=self.EDGE_SERVER_NAME
)
)
)
def test_event_log_exports_using_manifest(self):
with tempfile.NamedTemporaryFile() as temp_config_file:
temp_config_file.write(
textwrap.dedent(
"""
[manifest]
threshold = 1
"""
)
)
temp_config_file.flush()
self.launch_task(config=temp_config_file.name)
self.validate_output()
def launch_task(self, config=None):
command = [
os.getenv('REMOTE_TASK'),
'--job-flow-name', self.config.get('job_flow_name'),
'--branch', self.config.get('tasks_branch'),
'--repo', self.config.get('tasks_repo'),
'--remote-name', self.config.get('identifier'),
'--wait',
'--log-path', self.config.get('tasks_log_path'),
'--user', self.config.get('connection_user'),
]
if config:
command.extend(['--override-config', config])
command.extend(
[
'EventExportTask',
'--local-scheduler',
'--source', self.test_src,
'--output-root', self.test_out,
'--delete-output-root',
'--config', self.test_config,
'--environment', 'prod',
'--environment', 'edge',
'--interval', '2014-05',
'--n-reduce-tasks', str(self.NUM_REDUCERS),
]
)
self.call_subprocess(command)
def validate_output(self):
# TODO: a lot of duplication here
comparisons = [
('2014-05-15_edX.log', url_path_join(self.test_out, 'edX', self.PROD_SERVER_NAME, '2014-05-15_edX.log')),
('2014-05-16_edX.log', url_path_join(self.test_out, 'edX', self.PROD_SERVER_NAME, '2014-05-16_edX.log')),
('2014-05-15_edX.log', url_path_join(self.test_out, 'edX', self.EDGE_SERVER_NAME, '2014-05-15_edX.log')),
('2014-05-16_edX.log', url_path_join(self.test_out, 'edX', self.EDGE_SERVER_NAME, '2014-05-16_edX.log')),
('2014-05-15_AcceptanceX.log', url_path_join(self.test_out, 'AcceptanceX', self.EDGE_SERVER_NAME, '2014-05-15_AcceptanceX.log')),
('2014-05-15_AcceptanceX.log', url_path_join(self.test_out, 'AcceptanceX', self.PROD_SERVER_NAME, '2014-05-15_AcceptanceX.log')),
]
for local_file_name, remote_url in comparisons:
with open(os.path.join(self.data_dir, 'output', local_file_name), 'r') as local_file:
remote_target = S3Target(remote_url)
# Files won't appear in S3 instantaneously; wait for them to appear.
# TODO: exponential backoff
found = False
for _i in range(30):
if remote_target.exists():
found = True
break
else:
time.sleep(2)
if not found:
self.fail('Unable to find expected output file {0}'.format(remote_url))
with remote_target.open('r') as remote_file:
local_contents = local_file.read()
remote_contents = remote_file.read()
self.assertEquals(local_contents, remote_contents)
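One possible shape for the exponential backoff hinted at by the TODO above; the helper name and limits here are invented.

import time

def wait_for_target(target, max_tries=8, initial_delay=1, max_delay=60):
    """Poll target.exists() with exponentially increasing waits."""
    delay = initial_delay
    for _ in range(max_tries):
        if target.exists():
            return True
        time.sleep(delay)
        delay = min(delay * 2, max_delay)
    return False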
"""Support modifying luigi configuration settings in tests."""
from functools import wraps
from luigi.configuration import LuigiConfigParser
def with_luigi_config(*decorator_args):
"""
Decorator constructor that temporarily overrides a config file setting while executing the method.
Can be passed a special value of edx.analytics.tasks.tests.config.OPTION_REMOVED which will ensure that the given
option is *not* present in the configuration.
Examples::
@with_luigi_config('foo', 'bar', 'baz')
def test_something(self):
value = luigi.configuration.get_config().get('foo', 'bar')
assert value == 'baz'
@with_luigi_config(('foo', 'bar', 'baz'), ('x', 'y', 'z'))
def test_something_else(self):
config = luigi.configuration.get_config()
value = config.get('foo', 'bar')
assert value == 'baz'
other_value = config.get('x', 'y')
assert other_value == 'z'
from edx.analytics.tasks.tests.config import OPTION_REMOVED
@with_luigi_config('foo', 'bar', OPTION_REMOVED)
def test_no_option(self):
try:
luigi.configuration.get_config().get('foo', 'bar')
except NoOptionError:
# This will always be executed regardless of whether or not a 'foo' section exists in the config files
# and contains a 'bar' option.
pass
"""
def config_decorator(func):
"""Actual function decorator"""
@wraps(func)
def function_config_wrapper(*args, **kwargs):
"""Overrides the given settings while executing the wrapped function."""
# Save the current configuration
current_config_instance = LuigiConfigParser._instance
# This will force the configuration to be reloaded from disk.
LuigiConfigParser._instance = None
try:
# Get a brand new configuration object by loading from disk.
new_instance = LuigiConfigParser.instance()
def modify_config(section, option, value):
if value == OPTION_REMOVED:
new_instance.remove_option(section, option)
else:
new_instance.set(section, option, str(value))
# Support the single override case: @with_luigi_config('section', 'option', 'value')
if isinstance(decorator_args[0], basestring):
section, option, value = decorator_args
modify_config(section, option, value)
else:
# As well as the generic case that allows multiple overrides at once:
# @with_luigi_config(('sec', 'opt', 'val'), ('foo', 'bar', 'baz'))
for section, option, value in decorator_args:
modify_config(section, option, value)
return func(*args, **kwargs)
finally:
# Restore the saved configuration
LuigiConfigParser._instance = current_config_instance
return function_config_wrapper
return config_decorator
OPTION_REMOVED = object()
@@ -841,22 +841,7 @@ class AnswerDistributionOneFilePerCourseTaskOutputRootTest(unittest.TestCase):
self.assertTrue(os.path.exists(self.output_root))
def test_delete_output_root(self):
# We create a task in order to get the output path.
task = AnswerDistributionOneFilePerCourseTask(
mapreduce_engine='local',
src=None,
dest=None,
name='name',
include=None,
output_root=self.output_root,
)
# Writing to the output path will not mark this task
# as complete.
output_marker = task.output().path
open(output_marker, 'a').close()
self.assertFalse(task.complete())
# But it's still possible to use the delete option
# It's still possible to use the delete option
# to get rid of the output_root directory.
task = AnswerDistributionOneFilePerCourseTask(
mapreduce_engine='local',
......
"""
Tests for event export tasks
"""
import datetime
from textwrap import dedent
from cStringIO import StringIO
from luigi.date_interval import Year
from mock import MagicMock, patch
import yaml
from edx.analytics.tasks.event_exports import EventExportTask
from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.tests.target import FakeTarget
class EventExportTestCase(unittest.TestCase):
"""Tests for EventExportTask."""
EXAMPLE_EVENT = '{"context":{"org_id": "FooX"}, "time": "2014-05-20T00:10:30+00:00","event_source": "server"}'
SERVER_NAME_1 = 'prod-app-001'
SERVER_NAME_2 = 'prod-app-002'
EXAMPLE_TIME = '2014-05-20T00:10:30+00:00'
EXAMPLE_DATE = '2014-05-20'
# Include some non-standard spacing in this JSON to ensure that the data is not modified in any way.
EVENT_TEMPLATE = \
'{{"context":{{"org_id": "{org_id}"}}, "time": "{time}","event_source": "server"}}' # pep8: disable=E231
CONFIG_DICT = {
'environments': {
'prod': {
'servers': [
SERVER_NAME_1,
SERVER_NAME_2
]
}
},
'organizations': {
'FooX': {
'recipient': 'automation@example.com'
},
'BarX': {
'recipient': 'automation@example.com',
'other_names': [
'BazX',
'bar'
]
}
}
}
CONFIGURATION = yaml.dump(CONFIG_DICT)
def setUp(self):
self.task = EventExportTask(
mapreduce_engine='local',
output_root='test://output/',
config='test://config/default.yaml',
source='test://input/',
environment=['edge', 'prod'],
interval=Year.parse('2014'),
)
self.task.input_local = MagicMock(return_value=FakeTarget(self.CONFIGURATION))
def test_org_whitelist_capture(self):
self.task.init_mapper()
self.assertItemsEqual(self.task.org_id_whitelist, ['FooX', 'BarX', 'BazX', 'bar'])
def test_server_whitelist_capture(self):
self.task.init_mapper()
self.assertItemsEqual(self.task.server_name_whitelist, [self.SERVER_NAME_1, self.SERVER_NAME_2])
def test_mapper(self):
expected_output = [
(
(self.EXAMPLE_DATE, 'FooX', self.SERVER_NAME_1),
self.EVENT_TEMPLATE.format(org_id='FooX', time=self.EXAMPLE_TIME)
),
(
(self.EXAMPLE_DATE, 'BarX', self.SERVER_NAME_1),
self.EVENT_TEMPLATE.format(org_id='BarX', time=self.EXAMPLE_TIME)
),
(
(self.EXAMPLE_DATE, 'BazX', self.SERVER_NAME_1),
self.EVENT_TEMPLATE.format(org_id='BazX', time=self.EXAMPLE_TIME)
),
(
(self.EXAMPLE_DATE, 'BazX', self.SERVER_NAME_2),
self.EVENT_TEMPLATE.format(org_id='BazX', time=self.EXAMPLE_TIME)
),
]
excluded_events = [
(
(self.EXAMPLE_DATE, 'OtherOrgX', self.SERVER_NAME_1),
self.EVENT_TEMPLATE.format(org_id='OtherOrgX', time=self.EXAMPLE_TIME)
),
(
(datetime.date(2013, 12, 31), 'bar', self.SERVER_NAME_1),
self.EVENT_TEMPLATE.format(org_id='bar', time='2013-12-31T23:59:59+00:00')
),
(
(datetime.date(2015, 1, 1), 'bar', self.SERVER_NAME_1),
self.EVENT_TEMPLATE.format(org_id='bar', time='2015-01-01T00:00:00+00:00')
),
(
(datetime.date(2015, 1, 1), 'bar', self.SERVER_NAME_1),
'{invalid json'
)
]
input_events = expected_output + excluded_events
self.task.init_mapper()
results = []
for key, event_string in input_events:
results.extend(self.run_mapper_for_server_file(key[2], event_string))
self.assertItemsEqual(results, expected_output)
def run_mapper_for_server_file(self, server, event_string):
"""Emulate execution of the map function on data emitted by the given server."""
return self.run_mapper_for_file_path('test://input/{0}/tracking.log'.format(server), event_string)
def run_mapper_for_file_path(self, path, event_string):
"""Emulate execution of the map function on data read from the given file path."""
with patch.dict('os.environ', {'map_input_file': path}):
return [output for output in self.task.mapper(event_string) if output is not None]
def test_institution_from_context(self):
event = {
'event_source': 'server',
'context': {
'org_id': 'FooX'
}
}
self.assertEquals('FooX', self.task.get_org_id(event))
def test_empty_institution_from_context(self):
event = {
'event_source': 'server',
'context': {
'org_id': ''
}
}
self.assertNotEquals('FooX', self.task.get_org_id(event))
def test_missing_context(self):
event = {
'event_source': 'server'
}
self.assertNotEquals('FooX', self.task.get_org_id(event))
def test_institution_from_course_url(self):
event = {
'event_source': 'server',
'event_type': '/courses/FooX/LearningMath/2014T2/content'
}
self.assertEquals('FooX', self.task.get_org_id(event))
def test_implicit_event_without_course_url(self):
event = {
'event_source': 'server',
'event_type': '/any/page'
}
self.assertNotEquals('FooX', self.task.get_org_id(event))
def test_institution_from_problem_event(self):
event = {
'event_source': 'server',
'event_type': 'problem_check',
'event': {
'problem_id': 'i4x://FooX/LearningMath/Otherthings'
}
}
self.assertEquals('FooX', self.task.get_org_id(event))
def test_problem_without_id(self):
event = {
'event_source': 'server',
'event_type': 'problem_check',
'event': {
}
}
self.assertNotEquals('FooX', self.task.get_org_id(event))
def test_institution_from_page(self):
event = {
'event_source': 'browser',
'page': 'http://courses.example.com/courses/FooX/LearningMath/2014T2/content'
}
self.assertEquals('FooX', self.task.get_org_id(event))
def test_no_course_in_page_url(self):
event = {
'event_source': 'browser',
'page': 'http://foo.example.com/any/page'
}
self.assertNotEquals('FooX', self.task.get_org_id(event))
def test_no_event_source(self):
event = {
'foo': 'bar'
}
self.assertNotEquals('FooX', self.task.get_org_id(event))
def test_output_path_for_key(self):
path = self.task.output_path_for_key((datetime.date(2015, 1, 1), 'OrgX', 'prod-app-001'))
self.assertEquals('test://output/OrgX/prod-app-001/2015-01-01_OrgX.log', path)
def test_output_path_for_key_casing(self):
path = self.task.output_path_for_key((datetime.date(2015, 1, 1), 'orgX', 'prod-app-001'))
self.assertEquals('test://output/orgX/prod-app-001/2015-01-01_orgX.log', path)
def test_multi_output_reducer(self):
output = StringIO()
self.task.multi_output_reducer(None, ['a\t', 'b', 'c'], output)
output.seek(0)
self.assertEquals('a\nb\nc\n', output.read())
def test_local_requirements(self):
self.assertEquals(self.task.requires_local().url, 'test://config/default.yaml')
def test_hadoop_requirements(self):
requirements = self.task.requires_hadoop()
for task in requirements:
if hasattr(task, 'url') and task.url == 'test://config/default.yaml':
self.fail('Expected config task to be excluded from the hadoop requirements.')
self.assertEquals(2, len(requirements))
for i in range(2):
task = requirements[i]
self.assertEquals('test://input/', task.source)
self.assertEquals('edge' if i == 0 else 'prod', task.environment)
# Pattern is difficult to validate since it's read from the config
# Interval is also difficult to validate since it is expanded by the initializer
# Some coverage missing here, but it's probably good enough for now
def test_unrecognized_environment(self):
self.task.init_mapper()
for server in ['prod-app-001', 'prod-app-002']:
expected_output = [((self.EXAMPLE_DATE, 'FooX', server), self.EXAMPLE_EVENT)]
self.assertItemsEqual(self.run_mapper_for_server_file(server, self.EXAMPLE_EVENT), expected_output)
self.assertItemsEqual(self.run_mapper_for_server_file('foobar', self.EXAMPLE_EVENT), [])
def test_odd_file_paths(self):
self.task.init_mapper()
for path in ['something.gz', 'test://input/something.gz']:
self.assertItemsEqual(self.run_mapper_for_file_path(path, self.EXAMPLE_EVENT), [])
def test_missing_environment_variable(self):
self.task.init_mapper()
self.assertItemsEqual([output for output in self.task.mapper(self.EXAMPLE_EVENT) if output is not None], [])
@@ -7,10 +7,83 @@ import os
import tempfile
import shutil
from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask
import luigi
import luigi.hdfs
from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask, MapReduceJobTask
from edx.analytics.tasks.tests import unittest
class MapReduceJobTaskTest(unittest.TestCase):
"""Tests for MapReduceJobTask"""
def test_job_with_special_input_targets(self):
lib_jar_path = ['hdfs:///tmp/something.jar']
input_format = 'com.example.SpecialInputFormat'
required_job = TaskWithSpecialOutputs(lib_jar_path=lib_jar_path, input_format=input_format)
job = DynamicRequirementsJob()
job.requirements = [required_job]
runner = job.job_runner()
self.assertItemsEqual(runner.libjars_in_hdfs, lib_jar_path)
self.assertEquals(runner.input_format, input_format)
def test_job_with_different_input_formats(self):
job = DynamicRequirementsJob()
job.requirements = [
TaskWithSpecialOutputs(input_format='foo'),
TaskWithSpecialOutputs(input_format='bar')
]
with self.assertRaises(RuntimeError):
job.job_runner()
def test_multiple_lib_jars(self):
job = DynamicRequirementsJob()
job.requirements = [
TaskWithSpecialOutputs(lib_jar_path=['foo', 'bar']),
TaskWithSpecialOutputs(lib_jar_path=['baz'])
]
runner = job.job_runner()
self.assertItemsEqual(runner.libjars_in_hdfs, ['foo', 'bar', 'baz'])
def test_missing_input_format(self):
job = DynamicRequirementsJob()
job.requirements = [
TaskWithSpecialOutputs(lib_jar_path=['foo'], input_format='com.example.Foo'),
TaskWithSpecialOutputs(lib_jar_path=['baz'])
]
runner = job.job_runner()
self.assertItemsEqual(runner.libjars_in_hdfs, ['foo', 'baz'])
self.assertEquals(runner.input_format, 'com.example.Foo')
class TaskWithSpecialOutputs(luigi.ExternalTask):
"""A task with a single output that requires the use of a configurable library jar and input format."""
lib_jar_path = luigi.Parameter(default=[], is_list=True)
input_format = luigi.Parameter(default=None)
def output(self):
target = luigi.hdfs.HdfsTarget('/tmp/foo')
target.lib_jar = self.lib_jar_path
target.input_format = self.input_format
return target
class DynamicRequirementsJob(MapReduceJobTask):
"""A task with configurable requirements."""
def requires(self):
return self.requirements
class MultiOutputMapReduceJobTaskTest(unittest.TestCase):
"""Tests for MultiOutputMapReduceJobTask."""
@@ -58,6 +131,10 @@ class MultiOutputMapReduceJobTaskOutputRootTest(unittest.TestCase):
self.output_root = tempfile.mkdtemp()
self.addCleanup(cleanup, self.output_root)
patcher = patch('edx.analytics.tasks.mapreduce.luigi.configuration.get_config')
self.mock_get_config = patcher.start()
self.addCleanup(patcher.stop)
def test_no_delete_output_root(self):
self.assertTrue(os.path.exists(self.output_root))
TestJobTask(
@@ -67,6 +144,10 @@ class MultiOutputMapReduceJobTaskOutputRootTest(unittest.TestCase):
self.assertTrue(os.path.exists(self.output_root))
def test_delete_output_root(self):
temporary_file_path = tempfile.mkdtemp()
self.mock_get_config.return_value.get.return_value = temporary_file_path
self.addCleanup(shutil.rmtree, temporary_file_path)
# We create a task in order to get the output path.
task = TestJobTask(
mapreduce_engine='local',
......
"""Test selection of event log files."""
import datetime
from mock import patch
from luigi.date_interval import Month
from edx.analytics.tasks.pathutil import EventLogSelectionTask
from edx.analytics.tasks.url import UncheckedExternalURL
from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.tests.config import with_luigi_config
class EventLogSelectionTaskTest(unittest.TestCase):
"""Test selection of event log files."""
SOURCE = 's3://collection-bucket/'
SAMPLE_KEY_PATHS = [
'2012-prod-edx-001',
'2012-prod-edx-001/edx.log-20120912.gz',
'2012-prod-edx-001/.tracking_17438.log.gz.JscfpA',
'2012-prod-edx-001/edx.log-20120912.gz',
'2012-prod-edx-001/mnt',
'2012-prod-edx-002/nginx/old_logs/error.log.34.gz',
'2012-prod-edx-003/tracking_14602.log.gz',
'processed/BerkeleyX/prod-edx-002/2012-09-24_BerkeleyX.log.gz',
'processed/BerkeleyX/prod-edx-002/2012-09-24_BerkeleyX.log.gz.gpg',
'processed/exclude.txt',
'processed/testing',
'prod-edge-edxapp-001',
'prod-edge-edxapp-001/tracking.log',
'prod-edge-edxapp-001/tracking.log-20130301.gz',
'prod-edge-edxapp-001/tracking.log-20130823',
'prod-edge-edxapp-001/tracking.log-20140324-1395670621.gz',
'prod-edx-002/tracking.log-20130331.gz',
'prod-edx-006',
'prod-edxapp-004/tracking.log-20140227.gz',
'prod-edxapp-004/tracking.log-20140228.gz',
'prod-edxapp-004/tracking.log-20140318.gz',
'prod-edxapp-004/tracking.log-20140318',
'prod-edxapp-004/tracking.log-20140319-1395256622.gz',
'prod-edxapp-004/tracking.log-20140401-1395254574.gz',
'prod-edxapp-004/tracking.log-20140402-1395645654.gz',
'prod-worker-001',
'prod-worker-001/tracking.log',
'prod-worker-001/tracking.log-20131126.gz',
'prod-worker-001/tracking.log-20140416-1397643421.gz',
'test',
'test/tracking.log.gz',
'tmp/prod-edx-001-mnt-logs.tar.gz',
'tracking.log'
]
COMPLETE_SOURCE_PATHS = [SOURCE + path for path in SAMPLE_KEY_PATHS]
@patch('edx.analytics.tasks.pathutil.boto.connect_s3')
def test_requires(self, connect_s3_mock):
s3_conn_mock = connect_s3_mock.return_value
bucket_mock = s3_conn_mock.get_bucket.return_value
class FakeKey(object):
"""A test double of the structure returned by boto when listing keys in an S3 bucket."""
def __init__(self, path):
self.key = path
self.size = 10
bucket_mock.list.return_value = [FakeKey(path) for path in self.SAMPLE_KEY_PATHS]
task = EventLogSelectionTask(
source=self.SOURCE,
interval=Month.parse('2014-03'),
pattern=r'.*?prod-(?:edx(?:app)?|worker)-\d{3}/tracking.log-(?P<date>\d{8}).*\.gz',
expand_interval=datetime.timedelta(0),
)
expected_paths = [
'prod-edxapp-004/tracking.log-20140318.gz',
'prod-edxapp-004/tracking.log-20140319-1395256622.gz',
]
self.assertItemsEqual(task.requires(), [UncheckedExternalURL(self.SOURCE + path) for path in expected_paths])
def test_filtering_of_urls(self):
task = EventLogSelectionTask(
source=self.SOURCE,
interval=Month.parse('2014-03'),
pattern=r'.*?prod-(?:edx(?:app)?|worker)-\d{3}/tracking.log-(?P<date>\d{8}).*\.gz',
expand_interval=datetime.timedelta(0),
)
self.assert_only_matched(task, [
'prod-edxapp-004/tracking.log-20140318.gz',
'prod-edxapp-004/tracking.log-20140319-1395256622.gz',
])
def assert_only_matched(self, task, paths):
"""Assert that the task only includes the given paths in the selected set of files."""
matched_urls = []
for url in self.COMPLETE_SOURCE_PATHS:
if task.should_include_url(url):
matched_urls.append(url)
expected_urls = [self.SOURCE + path for path in paths]
self.assertItemsEqual(matched_urls, expected_urls)
def test_edge_urls(self):
task = EventLogSelectionTask(
source=self.SOURCE,
interval=Month.parse('2014-03'),
pattern=r'.*?prod-edge-(?:edx(?:app)?|worker)-\d{3}/tracking.log-(?P<date>\d{8}).*\.gz',
expand_interval=datetime.timedelta(0),
)
self.assert_only_matched(task, [
'prod-edge-edxapp-001/tracking.log-20140324-1395670621.gz',
])
def test_expanded_interval(self):
task = EventLogSelectionTask(
source=self.SOURCE,
interval=Month.parse('2014-03'),
pattern=r'.*?prod-(?:edx(?:app)?|worker)-\d{3}/tracking.log-(?P<date>\d{8}).*\.gz',
expand_interval=datetime.timedelta(1),
)
self.assert_only_matched(task, [
'prod-edxapp-004/tracking.log-20140228.gz',
'prod-edxapp-004/tracking.log-20140318.gz',
'prod-edxapp-004/tracking.log-20140319-1395256622.gz',
'prod-edxapp-004/tracking.log-20140401-1395254574.gz',
])
@with_luigi_config('environment:test', 'pattern', 'foobar')
def test_pattern_from_config(self):
task = EventLogSelectionTask(
environment='test',
interval=Month.parse('2014-03')
)
self.assertEquals(task.pattern, 'foobar')
@with_luigi_config('environment:test', 'pattern', 'foobar')
def test_pattern_override(self):
task = EventLogSelectionTask(
environment='test',
interval=Month.parse('2014-03'),
pattern='baz'
)
self.assertEquals(task.pattern, 'baz')
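The patterns exercised above are ordinary Python regular expressions with a named date group, which the task uses to decide whether a log file falls inside the requested interval. A minimal, standalone sketch (outside the task machinery) of how such a pattern picks the date out of one of the sample keys listed in SAMPLE_KEY_PATHS:
import re
# The same pattern used in the tests above.
PATTERN = r'.*?prod-(?:edx(?:app)?|worker)-\d{3}/tracking.log-(?P<date>\d{8}).*\.gz'
match = re.match(PATTERN, 'prod-edxapp-004/tracking.log-20140318.gz')
if match:
    print(match.group('date'))  # -> 20140318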
......@@ -11,12 +11,17 @@ from edx.analytics.tasks.tests import unittest
class TargetFromUrlTestCase(unittest.TestCase):
"""Tests for get_target_from_url()."""
def test_hdfs_scheme(self):
for test_url in ['s3://foo/bar', 'hdfs://foo/bar', 's3n://foo/bar']:
def test_s3_scheme(self):
for test_url in ['s3://foo/bar', 's3n://foo/bar']:
target = url.get_target_from_url(test_url)
self.assertIsInstance(target, luigi.hdfs.HdfsTarget)
self.assertEquals(target.path, test_url)
def test_hdfs_scheme(self):
target = url.get_target_from_url('hdfs:///foo/bar')
self.assertIsInstance(target, luigi.hdfs.HdfsTarget)
self.assertEquals(target.path, '/foo/bar')
def test_file_scheme(self):
path = '/foo/bar'
for test_url in [path, 'file://' + path]:
......@@ -37,13 +42,6 @@ class TargetFromUrlTestCase(unittest.TestCase):
self.assertEquals(target.path, test_url[:-1])
self.assertEquals(target.format, luigi.hdfs.PlainDir)
def test_gzip_local_file(self):
test_url = '/foo/bar.gz'
target = url.get_target_from_url(test_url)
self.assertIsInstance(target, luigi.LocalTarget)
self.assertEquals(target.path, test_url)
self.assertEquals(target.format, luigi.format.Gzip)
class UrlPathJoinTestCase(unittest.TestCase):
"""Tests for url_path_join()."""
......
......@@ -14,6 +14,7 @@ import os
import urlparse
import luigi
import luigi.configuration
import luigi.format
import luigi.hdfs
import luigi.s3
......@@ -29,6 +30,13 @@ class ExternalURL(luigi.ExternalTask):
return get_target_from_url(self.url)
class UncheckedExternalURL(ExternalURL):
"""A ExternalURL task that does not verify if the source file exists, which can be expensive for S3 URLs."""
def complete(self):
return True
class IgnoredTarget(luigi.hdfs.HdfsTarget):
"""Dummy target for use in Hadoop jobs that produce no explicit output file."""
def __init__(self):
......@@ -58,10 +66,10 @@ def get_target_from_url(url):
kwargs = {}
if issubclass(target_class, luigi.hdfs.HdfsTarget) and url.endswith('/'):
kwargs['format'] = luigi.hdfs.PlainDir
if issubclass(target_class, luigi.LocalTarget):
if issubclass(target_class, luigi.LocalTarget) or parsed_url.scheme == 'hdfs':
# LocalTarget and HdfsTarget both expect bare paths, without any scheme or netloc. So strip
# everything else off the URL and pass just the bare path to the target.
url = parsed_url.path
if url.endswith('.gz'):
kwargs['format'] = luigi.format.Gzip
if issubclass(target_class, luigi.s3.S3Target):
kwargs['client'] = ScalableS3Client()
......
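Taken together with the url tests above, the intended behavior of get_target_from_url after this change can be sketched as follows. This is a rough illustration mirroring the test expectations, assuming the same url module import the test module uses, not an exhaustive description of the function:
from edx.analytics.tasks import url
# hdfs URLs produce an HdfsTarget whose path has the scheme stripped off.
hdfs_target = url.get_target_from_url('hdfs:///foo/bar')      # path == '/foo/bar'
# s3/s3n URLs also produce HdfsTargets, but keep the full URL as the path.
s3_target = url.get_target_from_url('s3://foo/bar')           # path == 's3://foo/bar'
# Local .gz paths produce a LocalTarget configured with the Gzip format.
gzip_target = url.get_target_from_url('/foo/bar.gz')          # format == luigi.format.Gzip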
"""Support running map reduce jobs using a manifest file to store the input paths."""
import logging
import luigi
from luigi import configuration
from luigi import task
from edx.analytics.tasks.url import url_path_join, get_target_from_url, UncheckedExternalURL
CONFIG_SECTION = 'manifest'
log = logging.getLogger(__name__)
class URLManifestTask(luigi.Task):
"""
Support running map reduce jobs using a manifest file to store the input paths.
The operating system has a hard limit on the number and length of arguments that can be passed to a process. When
presented with a very large number of input paths, it is easy to exceed that limit, which prevents the hadoop
streaming system from launching the python subprocess it uses to actually process the data.
The workaround is to instead provide hadoop with a single input path: a file that contains the list of actual input
paths. A custom input format parses this file and expands the list of paths it contains into first-class hadoop job
input paths. This file is called a "manifest" file.
This task requires a (presumably large) set of input paths and provides a manifest for those paths. The output of
this task can be used as the input of a hadoop job as long as the custom input format is used. A sketch of the
manifest contents follows this class definition.
"""
urls = luigi.Parameter(is_list=True, default=[])
def requires(self):
return [UncheckedExternalURL(url) for url in self.urls]
def output(self):
config = configuration.get_config()
base_url = config.get(CONFIG_SECTION, 'path')
target = get_target_from_url(url_path_join(base_url, str(hash(self))) + '.manifest')
lib_jar = config.get(CONFIG_SECTION, 'lib_jar', None)
if lib_jar:
target.lib_jar = [lib_jar]
input_format = config.get(CONFIG_SECTION, 'input_format', None)
if input_format:
target.input_format = input_format
return target
def run(self):
with self.output().open('w') as manifest_file:
for url in self.urls:
manifest_file.write(url)
manifest_file.write('\n')
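A minimal usage sketch for the task above, assuming a "manifest" config section whose "path" option points at some writable location; the bucket and file names here are purely hypothetical:
# Sketch only: construct the task with a couple of hypothetical input URLs.
task = URLManifestTask(urls=[
    's3://example-bucket/tracking.log-20140318.gz',
    's3://example-bucket/tracking.log-20140319.gz',
])
task.run()
# The manifest target written by run() now contains one input URL per line, and output()
# annotates it with lib_jar/input_format when those options are configured.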
def convert_tasks_to_manifest_if_necessary(input_tasks): # pylint: disable=invalid-name
"""
Provide a manifest for the input paths if there are too many of them.
The configuration section "manifest" can contain a "threshold" option; when the number of input targets meets or
exceeds it, this function returns a single URLManifestTask in place of the original input_tasks, as illustrated in
the sketch after this function.
"""
all_input_tasks = task.flatten(input_tasks)
targets = task.flatten(task.getpaths(all_input_tasks))
threshold = configuration.get_config().getint(CONFIG_SECTION, 'threshold', -1)
if threshold > 0 and len(targets) >= threshold:
log.debug(
'Using manifest since %d inputs are greater than or equal to the threshold %d', len(targets), threshold
)
return [URLManifestTask(urls=[target.path for target in targets])]
else:
log.debug(
'Directly processing files since %d inputs are less than the threshold %d', len(targets), threshold
)
return all_input_tasks
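A hedged sketch of the threshold behavior described above, assuming a "manifest" section with "threshold" configured; the URLs and the many_urls name are hypothetical:
# With e.g. [manifest] threshold = 2 configured, two or more inputs collapse into one manifest task.
many_urls = ['s3://example-bucket/a.gz', 's3://example-bucket/b.gz', 's3://example-bucket/c.gz']
input_tasks = [UncheckedExternalURL(url) for url in many_urls]
tasks = convert_tasks_to_manifest_if_necessary(input_tasks)
# len(tasks) == 1 and isinstance(tasks[0], URLManifestTask)
# With the threshold unset or negative, the original tasks are returned unchanged.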
"""Ensure manifest files are created appropriately."""
import luigi
from mock import patch
from edx.analytics.tasks.url import UncheckedExternalURL
from edx.analytics.tasks.util.manifest import URLManifestTask, convert_tasks_to_manifest_if_necessary
from edx.analytics.tasks.tests import unittest
from edx.analytics.tasks.tests.config import with_luigi_config, OPTION_REMOVED
from edx.analytics.tasks.tests.target import FakeTarget
class URLManifestTaskTest(unittest.TestCase):
"""Ensure manifest files are created appropriately."""
SOURCE_URL = 's3://foo/bar'
MANIFEST_BASE_PATH = '/tmp/manifest'
def setUp(self):
self.task = URLManifestTask(urls=[self.SOURCE_URL])
self.expected_path = '{0}/{1}.manifest'.format(self.MANIFEST_BASE_PATH, hash(self.task))
@with_luigi_config(
('manifest', 'path', MANIFEST_BASE_PATH),
('manifest', 'lib_jar', '/tmp/foo.jar'),
('manifest', 'input_format', 'com.example.SimpleFormat')
)
def test_annotate_output_target(self):
target = self.task.output()
self.assertEquals(target.path, self.expected_path)
self.assertEquals(target.lib_jar, ['/tmp/foo.jar'])
self.assertEquals(target.input_format, 'com.example.SimpleFormat')
@with_luigi_config(
('manifest', 'path', MANIFEST_BASE_PATH),
('manifest', 'lib_jar', OPTION_REMOVED),
('manifest', 'input_format', OPTION_REMOVED)
)
def test_parameters_not_configured(self):
target = self.task.output()
self.assertEquals(target.path, self.expected_path)
self.assertFalse(hasattr(target, 'lib_jar'))
self.assertFalse(hasattr(target, 'input_format'))
@patch('edx.analytics.tasks.util.manifest.get_target_from_url')
def test_manifest_file_construction(self, get_target_from_url_mock):
fake_target = FakeTarget()
get_target_from_url_mock.return_value = fake_target
self.task.run()
content = fake_target.buffer.read()
self.assertEquals(content, self.SOURCE_URL + '\n')
def test_requirements(self):
self.assertItemsEqual(self.task.requires(), [UncheckedExternalURL(self.SOURCE_URL)])
class ConversionTest(unittest.TestCase):
"""Ensure large numbers of inputs are correctly converted into manifest tasks when appropriate."""
@with_luigi_config('manifest', 'threshold', 1)
def test_over_threshold(self):
tasks = convert_tasks_to_manifest_if_necessary([FakeTask(), FakeTask()])
self.assertEquals(len(tasks), 1)
self.assertIsInstance(tasks[0], URLManifestTask)
@with_luigi_config('manifest', 'threshold', 3)
def test_under_threshold(self):
tasks = convert_tasks_to_manifest_if_necessary([FakeTask(), FakeTask()])
self.assertEquals(len(tasks), 2)
self.assertIsInstance(tasks[0], FakeTask)
self.assertIsInstance(tasks[1], FakeTask)
@with_luigi_config('manifest', 'threshold', 2)
def test_task_with_many_targets(self):
class MultiTargetTask(luigi.ExternalTask):
"""A fake task with multiple outputs."""
def output(self):
return [
luigi.LocalTarget('/tmp/foo'),
luigi.LocalTarget('/tmp/bar'),
]
tasks = convert_tasks_to_manifest_if_necessary(MultiTargetTask())
self.assertEquals(len(tasks), 1)
self.assertIsInstance(tasks[0], URLManifestTask)
@with_luigi_config('manifest', 'threshold', -1)
def test_negative_threshold(self):
tasks = convert_tasks_to_manifest_if_necessary([FakeTask(), FakeTask()])
self.assertEquals(len(tasks), 2)
self.assertIsInstance(tasks[0], FakeTask)
self.assertIsInstance(tasks[1], FakeTask)
@with_luigi_config('manifest', 'threshold', OPTION_REMOVED)
def test_threshold_not_set(self):
tasks = convert_tasks_to_manifest_if_necessary([FakeTask(), FakeTask()])
self.assertEquals(len(tasks), 2)
self.assertIsInstance(tasks[0], FakeTask)
self.assertIsInstance(tasks[1], FakeTask)
class FakeTask(luigi.ExternalTask):
"""A fake task with a single output target."""
def output(self):
return luigi.LocalTarget('/tmp/foo')
......@@ -21,8 +21,8 @@ handlers=localHandler
[logger_edx_analytics]
# Errors from edx/analytics get routed to stderr.
level=WARNING
handlers=stderrHandler
level=DEBUG
handlers=stderrHandler,localHandler
qualname=edx.analytics
propagate=0
......
......@@ -57,7 +57,11 @@ disable=
# R0912: Too many branches
# R0913: Too many arguments
# R0914: Too many local variables
C0302,R0201,R0901,R0902,R0903,R0904,R0911,R0912,R0913,R0914,E1103
C0302,R0201,R0901,R0902,R0903,R0904,R0911,R0912,R0913,R0914,E1103,
# These don't really make sense in Luigi code
# W0201: Attribute 'foo' defined outside of __init__
W0201
[REPORTS]
......
......@@ -21,9 +21,6 @@ console_scripts =
remote-task = edx.analytics.tasks.launchers.remote:main
edx.analytics.tasks =
s3-copy = edx.analytics.tasks.s3:S3Copy
s3-sync = edx.analytics.tasks.s3:S3Sync
sync-events = edx.analytics.tasks.eventlogs:SyncEventLogs
enrollments-report = edx.analytics.tasks.reports.enrollments:EnrollmentsByWeek
total-enrollments-report = edx.analytics.tasks.reports.total_enrollments:WeeklyAllUsersAndEnrollments
inc-enrollments-report = edx.analytics.tasks.reports.incremental_enrollments:WeeklyIncrementalUsersAndEnrollments
......@@ -33,8 +30,10 @@ edx.analytics.tasks =
dump-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseTask
export-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseAfterImportWorkflow
last-country = edx.analytics.tasks.user_location:LastCountryForEachUser
export-events = edx.analytics.tasks.event_exports:EventExportTask
encrypt-exports = edx.analytics.tasks.encrypt:FakeEventExportWithEncryptionTask
mapreduce.engine =
hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner
local = luigi.hadoop:LocalJobRunner
emu = edx.analytics.tasks.mapreduce:EmulatedMapReduceJobRunner
......@@ -21,8 +21,8 @@ handlers=localHandler
[logger_edx_analytics]
# Errors from edx/analytics get routed to stderr.
level=WARNING
handlers=stderrHandler
level=DEBUG
handlers=stderrHandler,localHandler
qualname=edx.analytics
propagate=0
......
......@@ -21,17 +21,23 @@
- working_repo_dir: "{{ working_dir }}/repo"
- working_venv_dir: "{{ working_dir }}/venv"
- task_arguments: ''
- git_server_hostname: github.com
- git_server_ip_address: 207.97.227.239
- git_server_public_key: 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkccKrpp0yVhp5HdEIcKr6pLlVDBfOLX9QUsyCOV0wzfjIJNlGEYsdlLJizHhbn2mUjvSAHQqZETYP81eFzLQNnPHt4EVVUh7VfDESU84KezmD5QlWpXLmvU31/yMf+Se8xhHTvKSCZIFImWwoG6mbUoWf9nzpIoaSjB+weqqUUmpaaasXVal72J+UX2B+2RPW3RcT0eOzQgqlJL3RKrTJvdsjE3JEAvGq3lGHSZXy28G3skua2SmVi/w4yCE6gbODqnTWlg7+wC604ydGXA8VJiS5ap43JXiUFFAaQ=='
- git_servers:
# TODO: edx specific, move elsewhere
- hostname: github.com
public_key: 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkccKrpp0yVhp5HdEIcKr6pLlVDBfOLX9QUsyCOV0wzfjIJNlGEYsdlLJizHhbn2mUjvSAHQqZETYP81eFzLQNnPHt4EVVUh7VfDESU84KezmD5QlWpXLmvU31/yMf+Se8xhHTvKSCZIFImWwoG6mbUoWf9nzpIoaSjB+weqqUUmpaaasXVal72J+UX2B+2RPW3RcT0eOzQgqlJL3RKrTJvdsjE3JEAvGq3lGHSZXy28G3skua2SmVi/w4yCE6gbODqnTWlg7+wC604ydGXA8VJiS5ap43JXiUFFAaQ=='
- hostname: review.edx.org
public_key: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDP6AOufe+CnkwwQs+zZtsDgiWnJykJ0TLoOICqQvimLRynsADmCuJpfMs4QvlYQQwWwSxBiO5wKhL0RmnPDV2ela+AHUD+SR7yPXCc1oSIAnm9k6IE1s0aFNKDdUv2nxnD8AiCQ6pGihSwth4bHgzdsTtpOPx+fgaAwuGdFCDMKQ=='
- wait_for_task: False
- local_log_dir: build/logs
- install_env:
# TODO: edx specific, move elsewhere
# EMR runs a modified version of Debian 6 (squeeze)
WHEEL_URL: http://edx-wheelhouse.s3-website-us-east-1.amazonaws.com/Debian/squeeze
# EMR ships with python 2.6 (unfortunately)
WHEEL_PYVER: 2.6
# - override_config: path/to/config.cfg (optionally adds a luigi config override)
tasks:
- name: known_hosts file exists
command: touch /home/{{ ansible_ssh_user }}/.ssh/known_hosts creates=/home/{{ ansible_ssh_user }}/.ssh/known_hosts
......@@ -39,8 +45,9 @@
- name: git server in known_hosts file
lineinfile: >
dest=/home/{{ ansible_ssh_user }}/.ssh/known_hosts
regexp=^github.com
line="{{ git_server_hostname }},{{ git_server_ip_address }} {{ git_server_public_key }}"
regexp=^{{item.hostname}}
line="{{ item.hostname }} {{ item.public_key }}"
with_items: git_servers
- name: root directories created
file: path={{ item }} state=directory owner=root group=root
......@@ -101,6 +108,13 @@
- name: logging configured
template: src=logging.cfg.j2 dest={{ working_repo_dir }}/logging.cfg
- name: configuration override removed
file: path={{ working_repo_dir }}/override.cfg state=absent
- name: configuration override installed
copy: src={{ override_config }} dest={{ working_repo_dir }}/override.cfg mode=644
when: override_config is defined
# Unfortunately, we cannot make the poll value a variable because of this open issue:
# https://github.com/ansible/ansible/issues/255
# As a workaround, we define two versions of this play, then choose
......