Commit f073aabb by Brian Wilson

Define StudentModulePerCourseTask and StudentModulePerCourseAfterImportWorkflow.

* Update StudentModulePerCourseTask to use multipart upload.

* Move per-task extra_modules dependencies into the launcher's attach
  call: attach boto, cjson, and filechunkio alongside argparse.

* Fix bug in PathSetTask.generate_file_list() with redundant root in
  returned path.

Change-Id: I8861ccfb02ee10c5188e11c9ceb86eca9f7b3363
parent 26818e3f
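For the PathSetTask fix in the third bullet, here is a hedged illustration of the redundant-root behavior, using the sample bucket/root/path values from the new s3_util tests further below (not output from a real run):

# generate_s3_sources() yields (bucket, root, path) where `path` is already relative to `root`,
# and self.src already ends with the root, so joining all three repeated the root segment.
src = 's3://bucket_name/root1/root2/'    # sample values borrowed from the tests below
root = 'root1/root2'
path = 'subdir1/path1'
# Before: url_path_join(src, root, path)
#   -> 's3://bucket_name/root1/root2/root1/root2/subdir1/path1'   (root appears twice)
# After:  url_path_join(src, path)
#   -> 's3://bucket_name/root1/root2/subdir1/path1'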
@@ -592,11 +592,8 @@ class BaseAnswerDistributionTask(MapReduceJobTask):
manifest = luigi.Parameter(default=None)
def extra_modules(self):
-# Boto is used for S3 access and cjson for parsing log files.
-import boto
-import cjson
import six
-return [boto, cjson, html5lib, six]
+return [html5lib, six]
class LastProblemCheckEvent(LastProblemCheckEventMixin, BaseAnswerDistributionTask):
@@ -747,10 +744,8 @@ class AnswerDistributionOneFilePerCourseTask(MultiOutputMapReduceJobTask):
writer.writerow(encoded_dict)
def extra_modules(self):
-import cjson
-import boto
import six
-return [boto, cjson, html5lib, six]
+return [html5lib, six]
################################
......
@@ -185,14 +185,6 @@ class BaseCourseEnrollmentTask(MapReduceJobTask):
# number of arguments passed to the mapper process on the task nodes.
manifest = luigi.Parameter(default=None)
-def extra_modules(self):
-# The following are needed for (almost) every course enrollment task.
-# Boto is used for S3 access, cjson for parsing log files, and util
-# is used for parsing events and date conversion.
-import boto
-import cjson
-return [boto, cjson]
class CourseEnrollmentEventsPerDay(CourseEnrollmentEventsPerDayMixin, BaseCourseEnrollmentTask):
"""Calculates daily change in enrollment for a user in a course, given raw event log input."""
......
"""
Tasks to split database exports into different groups, per class,
per organization, etc.
"""
import csv
from collections import namedtuple
import logging
import luigi
from edx.analytics.tasks.mapreduce import MultiOutputMapReduceJobTask
from edx.analytics.tasks.pathutil import PathSetTask
from edx.analytics.tasks.sqoop import SqoopImportFromMysql
from edx.analytics.tasks.util import csv_util
from edx.analytics.tasks.url import url_path_join
log = logging.getLogger(__name__)
# Increase the maximum number of characters per field, since we have
# entries that easily exceed the default limit of 128 KB (131072).
FIELD_SIZE_LIMIT = 4 * 1024 * 1024 # 4 MB
csv.field_size_limit(FIELD_SIZE_LIMIT)
# Helpers for the courseware student module table.
STUDENT_MODULE_FIELDS = [
'id',
'module_type',
'module_id',
'student_id',
'state',
'grade',
'created',
'modified',
'max_grade',
'done',
'course_id'
]
StudentModuleRecord = namedtuple('StudentModuleRecord', STUDENT_MODULE_FIELDS)
class StudentModulePerCourseTask(MultiOutputMapReduceJobTask):
"""
Separates a raw SQL dump of a courseware_studentmodule table into
a different tsv file for each course.
Parameters:
dump_root: a URL location of the database dump.
output_root: a URL location where the split files will be stored.
output_suffix: added to the filenames for identification.
"""
dump_root = luigi.Parameter()
output_root = luigi.Parameter()
output_suffix = luigi.Parameter(default=None)
def requires(self):
return PathSetTask(self.dump_root)
def mapper(self, line):
"""
Extract course and reformat each line.
Returns:
key: course_id
value: tab separated row data
"""
values = csv_util.parse_line(line, dialect='mysqldump')
record = StudentModuleRecord(*values)
course_id = record.course_id
# Convert to a tab separated row
tab_separated_row = csv_util.to_csv_line(record, dialect='mysqlpipe')
yield course_id, tab_separated_row
def multi_output_reducer(self, _key, rows, output_file):
"""
Save one file per course_id.
"""
header = '\t'.join(STUDENT_MODULE_FIELDS)
output_file.write(header)
output_file.write('\n')
for row in rows:
output_file.write(row)
output_file.write('\n')
def output_path_for_key(self, course_id):
template = "{course_id}-courseware_studentmodule-{suffix}analytics.sql"
filename = template.format(
course_id=course_id.replace('/', '-'),
suffix=(self.output_suffix + '-') if self.output_suffix else ''
)
return url_path_join(self.output_root, filename)
class StudentModulePerCourseAfterImportWorkflow(StudentModulePerCourseTask):
"""
Generates a raw SQL dump of a courseware_studentmodule table
and separates it into a different tsv file for each course.
Parameters:
dump_root: a URL location of the database dump.
output_root: a URL location where the split files will be stored.
output_suffix: added to the filenames for identification.
credentials: Path to the external access credentials file.
num_mappers: The number of map tasks to ask Sqoop to use.
where: A 'where' clause to be passed to Sqoop.
"""
credentials = luigi.Parameter() # TODO: move to config
num_mappers = luigi.Parameter(default=None) # TODO: move to config
where = luigi.Parameter(default=None)
def requires(self):
return SqoopImportFromMysql(
credentials=self.credentials,
destination=self.dump_root,
table_name='courseware_studentmodule',
num_mappers=self.num_mappers,
where=self.where
)
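As a quick end-to-end illustration of the mapper/reducer pair above, here is a hedged, dependency-free sketch (plain dicts stand in for the luigi/Hadoop machinery; split_by_course is an illustrative name, not part of this commit):

# Hedged sketch: group tab-separated courseware_studentmodule rows by course_id and
# build one header-plus-rows blob per course, mirroring mapper() and multi_output_reducer().
from collections import defaultdict

FIELDS = ['id', 'module_type', 'module_id', 'student_id', 'state', 'grade',
          'created', 'modified', 'max_grade', 'done', 'course_id']

def split_by_course(tsv_lines):
    """Return {course_id: tsv_text} for rows that are already tab-separated."""
    by_course = defaultdict(list)
    for line in tsv_lines:
        record = dict(zip(FIELDS, line.rstrip('\n').split('\t')))
        by_course[record['course_id']].append(line.rstrip('\n'))
    header = '\t'.join(FIELDS)
    return {course: header + '\n' + '\n'.join(rows) + '\n'
            for course, rows in by_course.items()}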
@@ -25,6 +25,8 @@ import logging
import boto
import argparse
+import filechunkio
+import cjson
import luigi
import luigi.configuration
@@ -51,8 +53,11 @@ def main():
log.warning('Default configuration file not found: %s', DEFAULT_CONFIGURATION_FILE)
# Tell luigi what dependencies to pass to the Hadoop nodes
-# - argparse is not included by default in python 2.6
-luigi.hadoop.attach(argparse)
+# - argparse is not included by default in python 2.6, but is required by luigi.
+# - boto is used for all direct interactions with s3.
+# - cjson is used for parsing event logs.
+# - filechunkio is used for multipart uploads of large files to s3.
+luigi.hadoop.attach(argparse, boto, cjson, filechunkio)
# TODO: setup logging for tasks or configured logging mechanism
......
@@ -43,7 +43,7 @@ class PathSetTask(luigi.Task):
if self.s3_conn is None:
self.s3_conn = boto.connect_s3()
for _bucket, root, path in generate_s3_sources(self.s3_conn, self.src, self.include):
-source = url_path_join(self.src, root, path)
+source = url_path_join(self.src, path)
yield ExternalURL(source)
else:
filelist = []
......
@@ -27,9 +27,6 @@ class S3Copy(luigi.Task):
super(S3Copy, self).__init__(*args, **kwargs)
self.s3 = boto.connect_s3()
-def extra_modules(self):
-return [boto]
def requires(self):
return luigi.s3.S3PathTask(self.source)
@@ -102,9 +99,6 @@ class S3Sync(luigi.Task):
super(S3Sync, self).__init__(*args, **kwargs)
self.s3 = boto.connect_s3()
-def extra_modules(self):
-return [boto]
def requires(self):
for bucket, root, path in generate_s3_sources(self.s3, self.source, self.include):
source = join_as_s3_url(bucket, root, path)
......
""" """
Utility methods for interacting with S3 via boto. Utility methods for interacting with S3 via boto.
""" """
import os
import math
import logging
from fnmatch import fnmatch from fnmatch import fnmatch
from urlparse import urlparse from urlparse import urlparse
from boto.s3.key import Key from boto.s3.key import Key
from filechunkio import FileChunkIO
from luigi.s3 import S3Client, AtomicS3File from luigi.s3 import S3Client, AtomicS3File
from luigi.hdfs import HdfsTarget, Plain
import luigi.hdfs
log = logging.getLogger(__name__)
# S3 does not permit using "put" for files larger than 5 GB, and
# returns a socket error. There is also a chance that smaller files
# might also fail. Arbitrarily choose a threshold so that files
# larger than 1GB should use multipart upload instead of a single put.
MULTIPART_UPLOAD_THRESHOLD = 1 * 1024 * 1024 * 1024
# Multipart upload algorithm taken from
# https://gist.github.com/fabiant7t/924094, which
# defines a minimum chunk size for multipart upload.
MINIMUM_BYTES_PER_CHUNK = 5242880
# By default, AWS does not apply an ACL to keys that are put into a
# bucket from another account. Having no ACL at all effectively
# renders the object useless since it cannot be read or anything. The
# only workaround we found was to explicitly set the ACL policy when
# putting the object. Define here what that policy will be.
DEFAULT_KEY_ACCESS_POLICY = 'bucket-owner-full-control'
def get_s3_bucket_key_names(url):
@@ -73,43 +97,105 @@ def _filter_matches(patterns, names):
return (n for n in names if func(n))
-class RestrictedPermissionsS3Client(S3Client):
+class ScalableS3Client(S3Client):
"""
-S3 client that requires minimal permissions to write objects to a bucket.
-It should only require PutObject and PutObjectAcl permissions in order to write to the target bucket.
+S3 client that adds support for multipart uploads and requires minimal permissions.
+Uses S3 multipart upload API for large files, and regular S3 puts for smaller files.
+This client should only require PutObject and PutObjectAcl permissions in order to write to the target bucket.
"""
# TODO: Make this behavior configurable and submit this change upstream.
def put(self, local_path, destination_s3_path):
"""Put an object stored locally to an S3 path."""
+# parse path into bucket and key
(bucket, key) = self._path_to_bucket_and_key(destination_s3_path)
-# Boto will list all of the keys in the bucket if it is passed "validate=True" this requires an additional
-# permission. We want to minimize the set of required permissions so we get a reference to the bucket without
-# validating that it exists.
+# If Boto is passed "validate=True", it will require an
+# additional permission to be present when asked to list all
+# of the keys in the bucket. We want to minimize the set of
+# required permissions so we get a reference to the bucket
+# without validating that it exists. It should only require
+# PutObject and PutObjectAcl permissions in order to write to
+# the target bucket.
s3_bucket = self.s3.get_bucket(bucket, validate=False)
-# By default, AWS does not apply an ACL to keys that are put into a bucket from another account. Having no ACL
-# at all effectively renders the object useless since it cannot be read or anything. The only workaround we
-# found was to explicitly set the ACL policy when putting the object.
+# Check first if we should be doing a multipart upload.
+source_size_bytes = os.stat(local_path).st_size
+if source_size_bytes < MULTIPART_UPLOAD_THRESHOLD:
+self._upload_single(local_path, s3_bucket, key)
+else:
+log.info("File '%s' has size %d, exceeding threshold %d for using put -- using multipart upload.",
+destination_s3_path, source_size_bytes, MULTIPART_UPLOAD_THRESHOLD)
+self._upload_multipart(local_path, destination_s3_path, s3_bucket, key, source_size_bytes)
+def _upload_single(self, local_path, s3_bucket, key):
+"""
+Write a local file to an S3 key using single PUT.
+This only works for files < 5GB in size.
+"""
s3_key = Key(s3_bucket)
s3_key.key = key
-s3_key.set_contents_from_filename(local_path, policy='bucket-owner-full-control')
+# Explicitly set the ACL policy when putting the object, so
+# that it has an ACL when AWS writes to keys from another account.
+s3_key.set_contents_from_filename(local_path, policy=DEFAULT_KEY_ACCESS_POLICY)
+def _upload_multipart(self, local_path, destination_s3_path, s3_bucket, key, source_size_bytes):
+"""Upload a large local file to an S3 path, using S3's multipart upload API."""
+# Explicitly set the ACL policy when putting the object, so
+# that it has an ACL when AWS writes to keys from another account.
+multipart = s3_bucket.initiate_multipart_upload(key, policy=DEFAULT_KEY_ACCESS_POLICY)
+number_of_chunks, bytes_per_chunk = self._get_chunk_specs(source_size_bytes)
+log.info("Uploading file '%s' with size %d in %d parts, with chunksize of %d.",
+destination_s3_path, source_size_bytes, number_of_chunks, bytes_per_chunk)
+chunk_generator = self._generate_chunks(source_size_bytes, number_of_chunks, bytes_per_chunk)
+for part_num, chunk_byte_offset, num_bytes in chunk_generator:
+with FileChunkIO(local_path, 'r', offset=chunk_byte_offset, bytes=num_bytes) as chunk:
+multipart.upload_part_from_file(fp=chunk, part_num=part_num)
+if len(multipart.get_all_parts()) == number_of_chunks:
+multipart.complete_upload()
+else:
+multipart.cancel_upload()
+def _get_chunk_specs(self, source_size_bytes):
+"""Returns number of chunks and bytes-per-chunk given a filesize."""
+# Select a chunk size, so that the chunk size grows with the overall size, but
+# more slowly. (Scale so that it equals the minimum chunk size.)
+bytes_per_chunk = int(math.sqrt(MINIMUM_BYTES_PER_CHUNK) * math.sqrt(source_size_bytes))
+bytes_per_chunk = min(max(bytes_per_chunk, MINIMUM_BYTES_PER_CHUNK), MULTIPART_UPLOAD_THRESHOLD)
+number_of_chunks = int(math.ceil(source_size_bytes / float(bytes_per_chunk)))
+return number_of_chunks, bytes_per_chunk
+def _generate_chunks(self, source_size_bytes, number_of_chunks, bytes_per_chunk):
+"""Returns the index, offset, and size of chunks."""
+for chunk_index in range(number_of_chunks):
+chunk_byte_offset = chunk_index * bytes_per_chunk
+remaining_bytes_in_file = source_size_bytes - chunk_byte_offset
+num_bytes = min([bytes_per_chunk, remaining_bytes_in_file])
+# indexing of parts is one-based.
+yield chunk_index + 1, chunk_byte_offset, num_bytes
-class S3HdfsTarget(luigi.hdfs.HdfsTarget):
+class S3HdfsTarget(HdfsTarget):
"""HDFS target that supports writing and reading files directly in S3."""
# Luigi does not support writing to HDFS targets that point to complete URLs like "s3://foo/bar" it only supports
-# HDFS paths that look like standard file paths "/foo/bar". Once this bug is fixed this class is no longer
-# necessary.
+# HDFS paths that look like standard file paths "/foo/bar".
+# (This class also provides a customized implementation for S3Client.)
# TODO: Fix the upstream bug in luigi that prevents writing to HDFS files that are specified by complete URLs
-def __init__(self, path=None, format=luigi.hdfs.Plain, is_tmp=False):
+def __init__(self, path=None, format=Plain, is_tmp=False):
super(S3HdfsTarget, self).__init__(path=path, format=format, is_tmp=is_tmp)
-self.s3_client = RestrictedPermissionsS3Client()
+self.s3_client = ScalableS3Client()
def open(self, mode='r'):
if mode not in ('r', 'w'):
......
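To make the chunk sizing in _get_chunk_specs() concrete, a small hedged sketch that reproduces the arithmetic outside the class (constants copied from the diff above; the example sizes are arbitrary):

# Hedged sketch: part size grows with the square root of the file size, clamped to
# [MINIMUM_BYTES_PER_CHUNK, MULTIPART_UPLOAD_THRESHOLD], and the part count follows from it.
import math

MULTIPART_UPLOAD_THRESHOLD = 1 * 1024 * 1024 * 1024   # 1 GB, from the diff
MINIMUM_BYTES_PER_CHUNK = 5242880                      # 5 MB minimum part size

def chunk_specs(source_size_bytes):
    bytes_per_chunk = int(math.sqrt(MINIMUM_BYTES_PER_CHUNK) * math.sqrt(source_size_bytes))
    bytes_per_chunk = min(max(bytes_per_chunk, MINIMUM_BYTES_PER_CHUNK), MULTIPART_UPLOAD_THRESHOLD)
    number_of_chunks = int(math.ceil(source_size_bytes / float(bytes_per_chunk)))
    return number_of_chunks, bytes_per_chunk

for size in (MINIMUM_BYTES_PER_CHUNK, 2 * 1024 ** 3, 50 * 1024 ** 3):
    print(chunk_specs(size))   # roughly: one 5 MB part, ~100 MB parts, ~500 MB parts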
"""
Tests for database export tasks
"""
from edx.analytics.tasks.database_exports import StudentModulePerCourseTask
from edx.analytics.tasks.database_exports import STUDENT_MODULE_FIELDS
from edx.analytics.tasks.tests import unittest
from mock import Mock
STATE_MYSQLDUMP = '\'{\\"answer\\": {\\"code\\": \\"print(\\\'hello world\\\')\\\\r\\\\n\\\\t\\", \\"score\\": 1.0}} ' \
'\\"msg\\": \\"\\\\n<div class=\\\\\\"test\\\\\\">\\\\nTest\\\\n</div>\\\\n\\", \\"num\\": 100}\''
STATE_EXPORT = '{"answer": {"code": "print(\'hello world\')\\\\r\\\\n\\\\t", "score": 1.0}} ' \
'"msg": "\\\\n<div class=\\\\"test\\\\">\\\\nTest\\\\n</div>\\\\n", "num": 100}'
STUDENT_MODULE_MYSQLDUMP = {
'id': 10,
'module_type': 'problem',
'module_id': 'i4x://a/module/id',
'student_id': 20,
'state': STATE_MYSQLDUMP,
'grade': 'NULL',
'created': '2012-08-23 18:31:56',
'modified': '2012-08-23 18:31:56',
'max_grade': 3,
'done': 'na',
'course_id': 'a/course/id'
}
class StudentModulePerCourseTestCase(unittest.TestCase):
"""Tests for StudentModulePerCourseTask."""
def setUp(self):
self.task = StudentModulePerCourseTask(
mapreduce_engine='local',
dump_root='test://dump_root',
output_root='test://output/',
output_suffix='test',
)
def test_mapper(self):
data = STUDENT_MODULE_MYSQLDUMP
line = ','.join(str(data[k]) for k in STUDENT_MODULE_FIELDS)
key, value = self.task.mapper(line).next()
course_id = data['course_id']
self.assertEqual(key, course_id)
data['state'] = STATE_EXPORT
export = '\t'.join(str(data[k]) for k in STUDENT_MODULE_FIELDS)
self.assertEqual(value, export)
def test_multi_output_reducer(self):
mock_output_file = Mock()
rows = [str(i) for i in xrange(5)]
self.task.multi_output_reducer('key', rows, mock_output_file)
# Verify addition of new lines at the end of each row
calls = mock_output_file.write.mock_calls
get_argument = lambda call: call[1][0]
result = ''.join(get_argument(c) for c in calls).split('\n')
result_header = result[0]
result_body = '\n'.join(result[1:])
expected_header = '\t'.join(STUDENT_MODULE_FIELDS)
self.assertEqual(result_header, expected_header)
expected_body = ''.join(r + '\n' for r in rows)
self.assertEqual(result_body, expected_body)
def test_output_path(self):
course_id = 'Sample/Course/ID'
filename = self.task.output_path_for_key(course_id)
expected = 'test://output/Sample-Course-ID-courseware_studentmodule-test-analytics.sql'
self.assertEqual(filename, expected)
def test_empty_output_path(self):
task = StudentModulePerCourseTask(
mapreduce_engine='local',
dump_root='test://dump_root',
output_root='test://output'
)
course_id = 'Sample/Course/ID'
filename = task.output_path_for_key(course_id)
expected = 'test://output/Sample-Course-ID-courseware_studentmodule-analytics.sql'
self.assertEqual(filename, expected)
"""Tests for S3--related utility functionality."""
from mock import MagicMock
from mock import patch
from mock import sentinel
import luigi
import luigi.format
import luigi.hdfs
import luigi.s3
from edx.analytics.tasks import s3_util
from edx.analytics.tasks.tests import unittest
class GenerateS3SourcesTestCase(unittest.TestCase):
"""Tests for generate_s3_sources()."""
def _make_key(self, keyname, size):
s3_key = MagicMock()
s3_key.key = keyname
s3_key.size = size
return s3_key
def _make_s3_generator(self, bucket_name, root, path_info, patterns):
s3_conn = MagicMock()
s3_bucket = MagicMock()
s3_conn.get_bucket = MagicMock(return_value=s3_bucket)
target_list = [self._make_key("{root}/{path}".format(root=root, path=path), size)
for path, size in path_info.iteritems()]
s3_bucket.list = MagicMock(return_value=target_list)
print [(k.key, k.size) for k in target_list]
s3_bucket.name = bucket_name
source = "s3://{bucket}/{root}/".format(bucket=bucket_name, root=root)
generator = s3_util.generate_s3_sources(s3_conn, source, patterns)
output = list(generator)
return output
def test_normal_generate(self):
bucket_name = "bucket_name"
root = "root1/root2"
path_info = {
"subdir1/path1": 1000,
"path2": 2000,
}
patterns = ['*']
output = self._make_s3_generator(bucket_name, root, path_info, patterns)
self.assertEquals(len(output), 2)
self.assertEquals(set(output),
set([(bucket_name, root, "subdir1/path1"), (bucket_name, root, "path2")]))
def test_generate_with_pattern_filtering(self):
bucket_name = "bucket_name"
root = "root1/root2"
path_info = {
"subdir1/path1": 1000,
"path2": 2000,
}
patterns = ['*1']
output = self._make_s3_generator(bucket_name, root, path_info, patterns)
self.assertEquals(len(output), 1)
self.assertEquals(output, [(bucket_name, root, "subdir1/path1")])
def test_generate_with_size_filtering(self):
bucket_name = "bucket_name"
root = "root1/root2"
path_info = {
"subdir1/path1": 1000,
"path2": 0,
}
patterns = ['*1']
output = self._make_s3_generator(bucket_name, root, path_info, patterns)
self.assertEquals(len(output), 1)
self.assertEquals(output, [(bucket_name, root, "subdir1/path1")])
class ScalableS3ClientTestCase(unittest.TestCase):
"""Tests for ScalableS3Client class."""
def setUp(self):
self.client = s3_util.ScalableS3Client()
def _assert_get_chunk_specs(self, source_size_bytes, expected_num_chunks, expected_chunk_size):
number_of_chunks, bytes_per_chunk = self.client._get_chunk_specs(source_size_bytes)
self.assertEquals(number_of_chunks, expected_num_chunks)
self.assertEquals(bytes_per_chunk, expected_chunk_size)
def test_get_minimum_chunk_specs(self):
self._assert_get_chunk_specs(1, 1, s3_util.MINIMUM_BYTES_PER_CHUNK)
self._assert_get_chunk_specs(s3_util.MINIMUM_BYTES_PER_CHUNK, 1, s3_util.MINIMUM_BYTES_PER_CHUNK)
def test_get_maximum_chunk_specs(self):
size = ((s3_util.MULTIPART_UPLOAD_THRESHOLD * s3_util.MULTIPART_UPLOAD_THRESHOLD)
/ s3_util.MINIMUM_BYTES_PER_CHUNK) + 1000
self._assert_get_chunk_specs(size, 205, s3_util.MULTIPART_UPLOAD_THRESHOLD)
size *= 2
self._assert_get_chunk_specs(size, 410, s3_util.MULTIPART_UPLOAD_THRESHOLD)
def test_generate_even_chunks(self):
generator = self.client._generate_chunks(1000, 4, 250)
output = list(generator)
expected_output = [(1, 0, 250), (2, 250, 250), (3, 500, 250), (4, 750, 250)]
self.assertEquals(output, expected_output)
def test_generate_uneven_chunks(self):
generator = self.client._generate_chunks(900, 4, 250)
output = list(generator)
expected_output = [(1, 0, 250), (2, 250, 250), (3, 500, 250), (4, 750, 150)]
self.assertEquals(output, expected_output)
@@ -18,7 +18,7 @@ import luigi.format
import luigi.hdfs
import luigi.s3
-from edx.analytics.tasks.s3_util import RestrictedPermissionsS3Client, S3HdfsTarget
+from edx.analytics.tasks.s3_util import ScalableS3Client, S3HdfsTarget
class ExternalURL(luigi.ExternalTask):
@@ -63,7 +63,7 @@ def get_target_from_url(url):
if url.endswith('.gz'):
kwargs['format'] = luigi.format.Gzip
if issubclass(target_class, luigi.s3.S3Target):
-kwargs['client'] = RestrictedPermissionsS3Client()
+kwargs['client'] = ScalableS3Client()
url = url.rstrip('/')
return target_class(url, **kwargs)
......
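The practical effect of the url.py change is that every S3 target built through get_target_from_url() now writes through the multipart-capable client. A hedged usage sketch (the bucket and path are made up):

# Hedged sketch: the S3 destination below is hypothetical.
from edx.analytics.tasks.url import get_target_from_url

target = get_target_from_url('s3://my-bucket/exports/output.tsv')
# Per the diff above, `target` is a luigi.s3.S3Target whose client is ScalableS3Client.
output_file = target.open('w')
output_file.write('course_id\tcount\n')
output_file.close()   # closing uploads the data; ScalableS3Client.put() then picks single vs. multipart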
"""
Simple CSV utilities.
"""
import csv
from StringIO import StringIO
class MySQLDumpDialect(csv.Dialect):
"""CSV dialect for files created by mysqldump"""
delimiter = ','
doublequote = False
escapechar = "\\"
lineterminator = "\n"
quotechar = "\'"
quoting = csv.QUOTE_ALL
skipinitialspace = False
strict = True
class MySQLPipeDialect(csv.Dialect):
"""
CSV Dialect for files created by piping the output of a mysql
command query to a file.
"""
delimiter = '\t'
doublequote = False
escapechar = "\\"
lineterminator = "\n"
quotechar = None
quoting = csv.QUOTE_NONE
skipinitialspace = False
strict = True
DIALECTS = {
'mysqldump': MySQLDumpDialect,
'mysqlpipe': MySQLPipeDialect
}
for dialect_name, dialect_class in DIALECTS.iteritems():
csv.register_dialect(dialect_name, dialect_class)
def parse_line(line, dialect='excel'):
"""Parse one line of CSV in the dialect specified."""
# csv.reader requires an iterable per row, so we wrap the line in a list
parsed = csv.reader([line], dialect=dialect).next()
return parsed
def to_csv_line(row, dialect='excel'):
"""Return a CSV line by joining the values in row in the dialect specified."""
output = StringIO()
csv.writer(output, dialect=dialect).writerow(row)
output.seek(0)
return output.read().strip()
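A hedged round-trip example for the two dialects above (the sample line is made up; behavior follows the standard csv module):

# Hedged sketch: parse a mysqldump-style line, then re-emit it as a tab-separated line.
from edx.analytics.tasks.util import csv_util

dump_line = "10,'problem','i4x://a/module/id',20,'{}',NULL,'2012-08-23 18:31:56','2012-08-23 18:31:56',3,'na','a/course/id'"
values = csv_util.parse_line(dump_line, dialect='mysqldump')
# values[1] == 'problem', values[5] == 'NULL' (quotes stripped, backslash escapes honored)

tsv_line = csv_util.to_csv_line(values, dialect='mysqlpipe')
# tsv_line == '10\tproblem\ti4x://a/module/id\t...' -- fields joined with tabs, no quoting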
@@ -9,5 +9,5 @@ ansible==1.4.4
python-cjson==1.0.5
oursql==0.9.3.1
html5lib==1.0b3
+filechunkio==1.5
-e git+https://github.com/spotify/luigi.git@a33756c781b9bf7e51384f0eb19d6a25050ef136#egg=luigi
@@ -28,8 +28,10 @@ edx.analytics.tasks =
total-enrollments-report = edx.analytics.tasks.reports.total_enrollments:WeeklyAllUsersAndEnrollments
inc-enrollments-report = edx.analytics.tasks.reports.incremental_enrollments:WeeklyIncrementalUsersAndEnrollments
course-enroll = edx.analytics.tasks.course_enroll:CourseEnrollmentChangesPerDay
-answer_dist = edx.analytics.tasks.answer_dist:AnswerDistributionPerCourse
+answer-dist = edx.analytics.tasks.answer_dist:AnswerDistributionPerCourse
sqoop-import = edx.analytics.tasks.sqoop:SqoopImportFromMysql
+dump-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseTask
+export-student-module = edx.analytics.tasks.database_exports:StudentModulePerCourseAfterImportWorkflow
mapreduce.engine =
hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner
......