Commit 6f6c7605 by Brian Wilson, committed by Andrew Zafft

Fix isort and pep8 errors.

There were two cases of "E731 do not assign a lambda expression, use a
def", but I just added E731 to the exclusion list rather than changing
those assignments.
parent baac6d29
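For readers unfamiliar with the E731 check mentioned above: pycodestyle flags assigning a lambda expression to a name and asks for a def instead. The sketch below is illustrative only; the names are made up, and the only occurrence actually visible in this diff is the existing DEFAULT_HADOOP_COUNTER_FUNC = lambda x: None assignment, which is left alone now that E731 is ignored.

# Flagged by pycodestyle E731: a lambda expression bound to a name.
default_counter_func = lambda value: None  # illustrative name, not from the repo

# The equivalent form E731 would ask for: a def with the same behaviour.
def default_counter(value):
    """Do nothing with the reported counter value."""
    return None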
@@ -28,6 +28,7 @@ except ImportError:
RETRY_LIMIT = 500
WAIT_DURATION = 5


def wait_for_job(job, check_error_result=True):
    counter = 0
    while True:
@@ -60,13 +61,13 @@ class BigQueryTarget(luigi.Target):
        self.create_marker_table()
        dataset = self.client.dataset(self.dataset_id)
        table = dataset.table('table_updates')
        table.reload()  # Load the schema
        # Use a tempfile for loading data into table_updates
        # We deliberately don't use table.insert_data as we cannot use delete on
        # a bigquery table with streaming inserts.
        tmp = tempfile.NamedTemporaryFile(delete=False)
        table_update_row = (self.update_id, "{dataset}.{table}".format(dataset=self.dataset_id, table=self.table))
        tmp.write(','.join(table_update_row))
        tmp.close()
@@ -138,6 +139,7 @@ class BigQueryTarget(luigi.Target):
        return len(query.rows) == 1


class BigQueryLoadDownstreamMixin(OverwriteOutputMixin):
    dataset_id = luigi.Parameter()
...
@@ -346,7 +346,7 @@ class MysqlInsertTask(MysqlInsertTaskMixin, luigi.Task):
            # commit only if both operations completed successfully.
            connection.commit()
-        except:
+        except Exception:
            connection.rollback()
            raise
        finally:
...
@@ -287,7 +287,7 @@ class EventLogSelectionMixin(EventLogSelectionDownstreamMixin):
        date_string = event_time.split("T")[0]
        if date_string < self.lower_bound_date_string or date_string >= self.upper_bound_date_string:
-            ## self.incr_counter('Event', 'Discard Outside Date Interval', 1)
+            # Slow: self.incr_counter('Event', 'Discard Outside Date Interval', 1)
            return None
        return event, date_string
...
@@ -471,7 +471,6 @@ ENABLE;""".format(schema=self.schema, table=self.table, column=column, expressio
            else:
                raise

    def run(self):
        """
        Inserts data generated by the copy command into target table.
...
@@ -11,8 +11,10 @@ from itertools import groupby
from operator import itemgetter

import luigi
+from luigi.hive import HiveQueryTask

from edx.analytics.tasks.common.mapreduce import MapReduceJobTask, MapReduceJobTaskMixin, MultiOutputMapReduceJobTask
+from edx.analytics.tasks.common.mysql_load import MysqlInsertTask
from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin, EventLogSelectionMixin
from edx.analytics.tasks.insights.calendar_task import CalendarTableTask
from edx.analytics.tasks.insights.database_imports import (
@@ -21,19 +23,12 @@ from edx.analytics.tasks.insights.database_imports import (
from edx.analytics.tasks.insights.enrollments import CourseEnrollmentPartitionTask
from edx.analytics.tasks.util import eventlog
from edx.analytics.tasks.util.hive import (
-    BareHiveTableTask,
-    HivePartition,
-    HiveTableFromQueryTask,
-    HiveTableTask,
-    WarehouseMixin,
-    HivePartitionTask,
+    BareHiveTableTask, HivePartition, HivePartitionTask, HiveTableFromQueryTask, HiveTableTask, WarehouseMixin,
    hive_database_name
)
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
+from edx.analytics.tasks.util.record import DateField, IntegerField, Record, StringField
from edx.analytics.tasks.util.url import get_target_from_url, url_path_join
-from edx.analytics.tasks.common.mysql_load import MysqlInsertTask
-from edx.analytics.tasks.util.record import Record, StringField, IntegerField, DateField
-from luigi.hive import HiveQueryTask

log = logging.getLogger(__name__)
@@ -46,7 +41,7 @@ class StudentEngagementIntervalTypeRecord(Record):
    """
    end_date = DateField(description='End date of the interval being analyzed.')
-    course_id = StringField( nullable=False, length=255, description='Identifier of course run.')
+    course_id = StringField(nullable=False, length=255, description='Identifier of course run.')
    username = StringField(
        nullable=False,
        length=255,
@@ -655,7 +650,7 @@ class StudentEngagementDataTask(StudentEngagementTableDownstreamMixin, HiveQuery
            end_date=self.interval.date_b.isoformat()
        )

    def query(self):  # pragma: no cover
        full_insert_query = """
        USE {database_name};
...
@@ -18,10 +18,12 @@ log = logging.getLogger(__name__)
class DatabaseImportMixin(SqoopImportMixin):
    """Provides parameters for accessing RDBMS databases and determining date to assign to Hive partition."""

    import_date = luigi.DateParameter(
        default=None,
        description='Date to assign to Hive partition. Default is today\'s date, UTC.',
    )

    def __init__(self, *args, **kwargs):
        super(DatabaseImportMixin, self).__init__(*args, **kwargs)
...
@@ -815,10 +815,10 @@ class EnrollmentByGenderDataTask(CourseEnrollmentDownstreamMixin, OverwriteAware
    def hive_partition_task(self):  # pragma: no cover
        """The task that creates the partition used by this job."""
        return EnrollmentByGenderHivePartitionTask(
            date=self.interval.date_b,
            warehouse_path=self.warehouse_path,
            overwrite=self.overwrite,
        )

    def requires(self):  # pragma: no cover
        for requirement in super(EnrollmentByGenderDataTask, self).requires():
@@ -884,6 +884,7 @@ class EnrollmentByGenderMysqlTask(OverwriteHiveAndMysqlDownstreamMixin, CourseEn
            ('course_id', 'date'),
        ]


class EnrollmentByBirthYearRecord(Record):
    """Summarizes a course's enrollments by birth year and date."""
    date = StringField(length=255, nullable=False, description='Enrollment date.')
@@ -957,10 +958,10 @@ class EnrollmentByBirthYearDataTask(CourseEnrollmentDownstreamMixin, OverwriteAw
    def hive_partition_task(self):
        """Returns Task that creates partition on `course_enrollment_birth_year_daily`."""
        return EnrollmentByBirthYearPartitionTask(
            date=self.interval.date_b,
            warehouse_path=self.warehouse_path,
            overwrite=self.overwrite
        )

    def requires(self):  # pragma: no cover
        for requirement in super(EnrollmentByBirthYearDataTask, self).requires():
@@ -1045,6 +1046,7 @@ class EnrollmentByEducationLevelTableTask(BareHiveTableTask): # pragma: no cove
    def columns(self):
        return EnrollmentByEducationLevelRecord.get_hive_schema()


class EnrollmentByEducationLevelPartitionTask(HivePartitionTask):  # pragma: no cover
    """Creates storage partition for the `course_enrollment_education_level_daily` Hive table."""
@@ -1117,10 +1119,10 @@ class EnrollmentByEducationLevelDataTask(CourseEnrollmentDownstreamMixin, Overwr
    def hive_partition_task(self):
        """Returns Task that creates partition on `course_enrollment_education_level_daily`."""
        return EnrollmentByEducationLevelPartitionTask(
            date=self.interval.date_b,
            warehouse_path=self.warehouse_path,
            overwrite=self.overwrite
        )

    def requires(self):  # pragma: no cover
        for requirement in super(EnrollmentByEducationLevelDataTask, self).requires():
@@ -1145,7 +1147,8 @@ class EnrollmentByEducationLevelDataTask(CourseEnrollmentDownstreamMixin, Overwr
class EnrollmentByEducationLevelMysqlTask(
        OverwriteHiveAndMysqlDownstreamMixin,
        CourseEnrollmentDownstreamMixin,
-        MysqlInsertTask):
+        MysqlInsertTask
+):
    """
    Breakdown of enrollments by education level as reported by the user.
@@ -1260,10 +1263,10 @@ class EnrollmentByModeDataTask(CourseEnrollmentDownstreamMixin, OverwriteAwareHi
    def hive_partition_task(self):
        """Returns Task that creates partition on `course_enrollment_mode_daily`."""
        return EnrollmentByModePartitionTask(
            date=self.interval.date_b,
            warehouse_path=self.warehouse_path,
            overwrite=self.overwrite
        )

    def requires(self):  # pragma: no cover
        for requirement in super(EnrollmentByModeDataTask, self).requires():
@@ -1396,10 +1399,10 @@ class EnrollmentDailyDataTask(CourseEnrollmentDownstreamMixin, OverwriteAwareHiv
    def hive_partition_task(self):
        """Returns Task that creates partition on `course_enrollment_daily`."""
        return EnrollmentDailyPartitionTask(
            date=self.interval.date_b,
            warehouse_path=self.warehouse_path,
            overwrite=self.overwrite,
        )

    def requires(self):  # pragma: no cover
        for requirement in super(EnrollmentDailyDataTask, self).requires():
@@ -1532,7 +1535,8 @@ class CourseMetaSummaryEnrollmentDataTask(
        OverwriteHiveAndMysqlDownstreamMixin,
        CourseSummaryEnrollmentDownstreamMixin,
        LoadInternalReportingCourseCatalogMixin,
-        OverwriteAwareHiveQueryDataTask):  # pragma: no cover
+        OverwriteAwareHiveQueryDataTask
+):  # pragma: no cover
    """
    Aggregates data from the various course_enrollment tables into `course_meta_summary_enrollment` Hive table.
@@ -1585,10 +1589,10 @@ class CourseMetaSummaryEnrollmentDataTask(
    def hive_partition_task(self):
        """Returns Task that creates partition on `course_meta_summary_enrollment`."""
        return CourseMetaSummaryEnrollmentPartitionTask(
            date=self.interval.date_b,
            warehouse_path=self.warehouse_path,
            overwrite=self.overwrite_hive,
        )

    def requires(self):  # pragma: no cover
        for requirement in super(CourseMetaSummaryEnrollmentDataTask, self).requires():
@@ -1654,7 +1658,8 @@ class CourseMetaSummaryEnrollmentDataTask(
class CourseMetaSummaryEnrollmentIntoMysql(
        OverwriteHiveAndMysqlDownstreamMixin,
        CourseSummaryEnrollmentDownstreamMixin,
-        MysqlInsertTask):
+        MysqlInsertTask
+):
    """
    Creates the course_meta_summary_enrollment sql table.
@@ -1754,10 +1759,10 @@ class CourseProgramMetadataDataTask(CourseSummaryEnrollmentDownstreamMixin, Over
    def hive_partition_task(self):
        """Returns Task that creates partition on `course_program_metadata`."""
        return CourseProgramMetadataPartitionTask(
            date=self.date,
            warehouse_path=self.warehouse_path,
            overwrite=self.overwrite,
        )

    def requires(self):
        for requirement in super(CourseProgramMetadataDataTask, self).requires():
@@ -1887,10 +1892,10 @@ class CourseGradeByModeDataTask(CourseSummaryEnrollmentDownstreamMixin, Overwrit
    def hive_partition_task(self):
        """Returns Task that creates partition on `course_grade_by_mode`."""
        return CourseGradeByModePartitionTask(
            date=self.date,
            warehouse_path=self.warehouse_path,
            overwrite=self.overwrite,
        )

    def requires(self):
        for requirement in super(CourseGradeByModeDataTask, self).requires():
...
@@ -33,7 +33,6 @@ except ImportError:
    numpy = None  # pylint: disable=invalid-name

log = logging.getLogger(__name__)
@@ -181,7 +180,7 @@ class ModuleEngagementDataTask(EventLogSelectionMixin, OverwriteOutputMixin, Map
        elif event_type == 'play_video':
            entity_type = 'video'
            user_actions.append('viewed')
-            entity_id = event_data.get('id', '').strip()  # we have seen id values with leading newlines
+            entity_id = event_data.get('id', '').strip()  # We have seen id values with leading newlines.
        elif event_type.startswith('edx.forum.'):
            entity_type = 'discussion'
            if event_type.endswith('.created'):
...
@@ -6,9 +6,10 @@ from edx.analytics.tasks.insights.enrollments import CourseGradeByModeDataTask,
class TestCourseGradeByModeDataTask(TestCase):

    def test_requires(self):
-        """The CourseGradeByModeDataTask should require the CourseGradeByModePartitionTask
-        and the ImportPersistentCourseGradeTask."""
+        # The CourseGradeByModeDataTask should require the CourseGradeByModePartitionTask
+        # and the ImportPersistentCourseGradeTask.
        a_date = datetime(2017, 1, 1)
        the_warehouse_path = '/tmp/foo'
        data_task = CourseGradeByModeDataTask(date=a_date, warehouse_path=the_warehouse_path)
...
@@ -41,7 +41,7 @@ class UserVideoViewingTaskMapTest(InitializeOpaqueKeysMixin, MapperTestMixin, Te
            },
            "time": "{0}+00:00".format(self.DEFAULT_TIMESTAMP),
            "ip": "127.0.0.1",
-            "event": '{"id": "%s", "currentTime": 23.4398, "code": "87389iouhdfh", "duration": %s}' %(
+            "event": '{"id": "%s", "currentTime": 23.4398, "code": "87389iouhdfh", "duration": %s}' % (
                self.video_id,
                self.video_duration
            ),
@@ -60,7 +60,7 @@ class UserVideoViewingTaskMapTest(InitializeOpaqueKeysMixin, MapperTestMixin, Te
            },
            "time": "{0}+00:00".format(self.DEFAULT_TIMESTAMP),
            "ip": "127.0.0.1",
-            "event": '{"id": "%s", "currentTime": 28, "code": "87389iouhdfh", "duration": %s}' %(
+            "event": '{"id": "%s", "currentTime": 28, "code": "87389iouhdfh", "duration": %s}' % (
                self.video_id,
                self.video_duration
            ),
@@ -79,7 +79,7 @@ class UserVideoViewingTaskMapTest(InitializeOpaqueKeysMixin, MapperTestMixin, Te
            },
            "time": "{0}+00:00".format(self.DEFAULT_TIMESTAMP),
            "ip": "127.0.0.1",
-            "event": '{"id": "%s", "currentTime": 100, "code": "87389iouhdfh", "duration": %s}' %(
+            "event": '{"id": "%s", "currentTime": 100, "code": "87389iouhdfh", "duration": %s}' % (
                self.video_id,
                self.video_duration
            ),
@@ -98,7 +98,7 @@ class UserVideoViewingTaskMapTest(InitializeOpaqueKeysMixin, MapperTestMixin, Te
            },
            "time": "{0}+00:00".format(self.DEFAULT_TIMESTAMP),
            "ip": "127.0.0.1",
-            "event": '{"id": "%s", "old_time": 14, "new_time": 10, "code": "87389iouhdfh", "duration": %s}' %(
+            "event": '{"id": "%s", "old_time": 14, "new_time": 10, "code": "87389iouhdfh", "duration": %s}' % (
                self.video_id,
                self.video_duration
            ),
...
@@ -42,7 +42,6 @@ class UserActivityTask(OverwriteOutputMixin, WarehouseMixin, EventLogSelectionMi
    output_root = None

    def mapper(self, line):
        value = self.get_event_and_date_string(line)
        if value is None:
...
@@ -44,8 +44,12 @@ OVERRIDE_CONFIGURATION_FILE = 'override.cfg'
def main():
    parser = argparse.ArgumentParser()
-    parser.add_argument('--additional-config', help='additional configuration file to be loaded after default/override',
-                        default=None, action='append')
+    parser.add_argument(
+        '--additional-config',
+        help='additional configuration file to be loaded after default/override',
+        default=None,
+        action='append'
+    )
    arguments, _extra_args = parser.parse_known_args()

    # We get a cleaned command-line arguments list, free of the arguments *we* care about, since Luigi will throw
@@ -76,8 +80,7 @@ def main():
        else:
            log.debug('Configuration file \'%s\' does not exist!', additional_config)

-    # Tell luigi what dependencies to pass to the Hadoop nodes
+    # Tell luigi what dependencies to pass to the Hadoop nodes:
    # - edx.analytics.tasks is used to load the pipeline code, since we cannot trust all will be loaded automatically.
    # - boto is used for all direct interactions with s3.
    # - cjson is used for all parsing event logs.
@@ -103,19 +106,19 @@ def main():
def get_cleaned_command_line_args():
    """
    Gets a list of command-line arguments after removing local launcher-specific parameters.
    """
    arg_list = sys.argv[1:]
    modified_arg_list = arg_list
    for i, v in enumerate(arg_list):
        if v == '--additional-config':
            # Clear out the flag, and clear out the value attached to it.
            modified_arg_list[i] = None
-            modified_arg_list[i+1] = None
+            modified_arg_list[i + 1] = None

    return list(filter(lambda x: x is not None, modified_arg_list))


@contextmanager
...
@@ -20,6 +20,7 @@ REMOTE_LOG_DIR = '/var/log/analytics-tasks'
REMOTE_CONFIG_DIR_BASE = 'config'
REMOTE_CODE_DIR_BASE = 'repo'


def main():
    """Parse arguments and run the remote task."""
    parser = argparse.ArgumentParser()
@@ -57,13 +58,13 @@ def main():
    # Push in any secure config values that we got.
    if arguments.secure_config:
        for config_path in arguments.secure_config:
            # We construct an absolute path here because the parameter that comes in is simply
            # relative to the checkout of the configuration repository, but the local scheduler
            # shouldn't have to know that, which in turn makes --additional-config agnostic of
            # how we're using it for edX's purposes (with a repository).
            arguments.launch_task_arguments.append('--additional-config')
            arguments.launch_task_arguments.append(os.path.join(REMOTE_DATA_DIR, uid, REMOTE_CONFIG_DIR_BASE, config_path))

    if arguments.vagrant_path:
        parse_vagrant_ssh_config(arguments)
...
@@ -28,7 +28,7 @@ class DatabaseService(object):
        with closing(conn.cursor()) as cur:
            try:
                yield cur
-            except:
+            except Exception:
                conn.rollback()
                raise
            else:
...
@@ -34,7 +34,7 @@ class VerticaService(object):
        with closing(conn.cursor()) as cur:
            try:
                yield cur
-            except:
+            except Exception:
                conn.rollback()
                raise
            else:
...
@@ -73,7 +73,7 @@ def analyze_log_file(filename):
        parser = LogFileParser(log_file, message_pattern=MESSAGE_START_PATTERN, message_factory=create_log_message)
        try:
            return analyze_log(parser)
-        except:
+        except Exception:
            sys.stderr.write('Exception on line {0}\n'.format(parser.line_number))
            raise
@@ -188,9 +188,9 @@ def analyze_hadoop_job(starting_message, parser):
    while message:
        message = parser.next_message()
-        if ('Job complete:' in message.content or
-                'completed successfully' in message.content or 'Ended Job = ' in message.content):
-            if 'Job complete:' in message.content or 'completed successfully' in message.content:
+        job_complete = ('Job complete:' in message.content or 'completed successfully' in message.content)
+        if job_complete or 'Ended Job = ' in message.content:
+            if job_complete:
                move_measure = analyze_output_move(parser)
                if move_measure:
                    yield move_measure
@@ -254,5 +254,6 @@ def sqoop_parameter_parser(raw_params):
    if table_param_match:
        return {'table': table_param_match.group('name')}


if __name__ == '__main__':
    analyze()
@@ -150,6 +150,7 @@ def create_directory(output_dir):
        elif exc.errno != errno.EEXIST or os.path.isdir(output_dir):
            raise

# These event_type values are known to have the possibility that the
# user_id in context be different from the user_id in event payload.
# In these cases, the context user_id represents the user performing the
...
@@ -17,6 +17,7 @@ key_cache = {}  # pylint: disable=invalid-name
DEFAULT_HADOOP_COUNTER_FUNC = lambda x: None


def get_key_from_target(key_file_target):
    """Get the contents of the key file pointed to by the target"""
...
@@ -24,7 +24,7 @@ def copy_file_to_file(src_file, output_file, progress=None):
            if progress:
                try:
                    progress(len(transfer_buffer))
-                except:  # pylint: disable=bare-except
+                except Exception:  # pylint: disable=bare-except
                    pass
        else:
            break
...
@@ -419,6 +419,7 @@ def find_email_context(text, log_context=DEFAULT_LOG_CONTEXT):
    """Development: Find context phrases that might indicate the presence of an email address nearby."""
    return find_all_matches(EMAIL_CONTEXT, text, "EMAIL_CONTEXT", log_context)


# Find names.
NAME_CONTEXT = re.compile(
    r'\b(hi|hello|sincerely|yours truly|Dear|Mr|Ms|Mrs|regards|cordially|best wishes|cheers|my name)\b',
...
@@ -375,7 +375,7 @@ class Record(object):
        schema = []
        for field_name, field_obj in cls.get_fields().items():
            mode = 'NULLABLE' if field_obj.nullable else 'REQUIRED'
-            description=getattr(field_obj, 'description', None)
+            description = getattr(field_obj, 'description', None)
            schema.append(SchemaField(field_name, field_obj.bigquery_type, description=description, mode=mode))
        return schema
...
@@ -224,7 +224,7 @@ class OrderTableTask(DatabaseImportMixin, HiveTableFromQueryTask):
            -- Partner information
            LEFT OUTER JOIN partner_partner partner ON partner.id = ol.partner_id

            -- Get course entitlement data
            LEFT OUTER JOIN entitlements_courseentitlement entitlements ON entitlements.order_number = o.number
            LEFT OUTER JOIN student_courseenrollment enrollments ON enrollments.id = entitlements.enrollment_course_run_id
@@ -301,7 +301,7 @@ class OrderTableTask(DatabaseImportMixin, HiveTableFromQueryTask):
            -- The partner short code is extracted from the course ID during order reconciliation.
            '' AS partner_short_code,

            -- These fields are not relevant to shoppingcart orders
            NULL AS course_uuid,
            NULL AS expiration_date
...
@@ -662,8 +662,8 @@ class CourseDataTask(BaseCourseRunMetadataTask):
                marketing_url=course_run.get('marketing_url'),
                min_effort=course_run.get('min_effort'),
                max_effort=course_run.get('max_effort'),
-                announcement_time = DateTimeField().deserialize_from_string(course_run.get('announcement')),
-                reporting_type = course_run.get('reporting_type'),
+                announcement_time=DateTimeField().deserialize_from_string(course_run.get('announcement')),
+                reporting_type=course_run.get('reporting_type'),
            )
            output_file.write(record.to_separated_values(sep=u'\t'))
            output_file.write('\n')
...
@@ -871,7 +871,6 @@ class TrackingEventRecordDataTask(EventLogSelectionMixin, BaseEventRecordDataTas
        project_name = self.PROJECT_NAME

        event_dict = {}
        self.add_calculated_event_entry(event_dict, 'input_file', self.get_map_input_file())
        self.add_calculated_event_entry(event_dict, 'event_type', event_type)
@@ -1079,7 +1078,7 @@ class SegmentEventRecordDataTask(SegmentEventLogSelectionMixin, BaseEventRecordD
                pass
            # Skip values that are explicitly set or calculated for JSONEventRecord:
            elif field_key in ['emitter_type', 'source', 'raw_event']:
                pass
            # Map values that are top-level:
            elif field_key in ['channel']:
                add_event_mapping_entry(u"root.{}".format(field_key))
@@ -1233,7 +1232,6 @@ class SegmentEventRecordDataTask(SegmentEventLogSelectionMixin, BaseEventRecordD
        self.add_calculated_event_entry(event_dict, 'event_source', event_source)
        self.add_calculated_event_entry(event_dict, 'event_category', event_category)

        event_mapping = self.get_event_mapping()
        self.add_event_info(event_dict, event_mapping, event)
...
@@ -102,7 +102,7 @@ class TestCourseSubjects(TestCase):
    def test_course_no_subjects(self):
        """With a course with no subjects, we expect a row with NULLs."""
-        course_with_no_subjects = [{"course_runs": [ {"key": "foo"} ], "subjects": [{}]}]
+        course_with_no_subjects = [{"course_runs": [{"key": "foo"}], "subjects": [{}]}]
        data = self.run_task(course_with_no_subjects)
        # We expect an entry in the list of courses, since there is a course in the catalog.
        self.assertEquals(data.shape[0], 1)
@@ -119,11 +119,11 @@ class TestCourseSubjects(TestCase):
    def test_course_with_one_subject(self):
        """With a course with one subject, we expect to see that subject."""
        input_data = [
            {
-                "course_runs": [ {"key": "foo"} ],
+                "course_runs": [{"key": "foo"}],
                "subjects": [{"slug": "testing", "name": "Testing"}]
            }
        ]
        data = self.run_task(input_data)
        # We expect to see this course with the mock_subject information.
@@ -140,14 +140,14 @@ class TestCourseSubjects(TestCase):
    def test_course_with_two_subjects(self):
        """With a course with two subjects, we expect to see both of those subjects."""
        input_data = [
            {
-                "course_runs": [ {"key": "foo"} ],
+                "course_runs": [{"key": "foo"}],
                "subjects": [
                    {"slug": "testing", "name": "Testing"},
                    {"slug": "bar", "name": "Bar"},
                ]
            }
        ]
        data = self.run_task(input_data)
        # We expect to see this course with two subjects of information.
@@ -173,11 +173,11 @@ class TestCourseSubjects(TestCase):
    def test_multiple_courses(self):
        """With multiple courses, we expect to see subject information for all of them."""
        input_data = [
            {
-                "course_runs": [ {"key": "foo"}, {"key": "bar"} ],
+                "course_runs": [{"key": "foo"}, {"key": "bar"}],
                "subjects": [{"slug": "testing", "name": "Testing"}]
            },
        ]
        data = self.run_task(input_data)
        # We expect to see two courses.
        self.assertEquals(data.shape[0], 2)
@@ -201,15 +201,15 @@ class TestCourseSubjects(TestCase):
    def test_multiple_courses(self):
        """With multiple courses, we expect to see subject information for all of them."""
        input_data = [
            {
-                "course_runs": [ {"key": "foo"} ],
+                "course_runs": [{"key": "foo"}],
                "subjects": [{"slug": "testing", "name": "Testing"}]
            },
            {
-                "course_runs": [ {"key": "bar"} ],
+                "course_runs": [{"key": "bar"}],
                "subjects": [{"slug": "testing", "name": "Testing"}]
            }
        ]
        data = self.run_task(input_data)
        # We expect to see two courses.
        self.assertEquals(data.shape[0], 2)
@@ -237,14 +237,14 @@ class TestCourseSubjects(TestCase):
        - null values if portions of the subject data are missing
        """
        input_data = [
            {
                "subjects": [{"slug": "testing", "name": "Testing"}]
            },
            {
-                "course_runs": [ {"key": "bar"} ],
+                "course_runs": [{"key": "bar"}],
                "subjects": [{"slug": "testing"}]
            }
        ]
        data = self.run_task(input_data)
        expected = {
            'course_id': 'bar',
...
@@ -91,6 +91,7 @@ class BaseTrackingEventRecordTaskMapTest(InitializeOpaqueKeysMixin, MapperTestMi
        )
        self.task.init_local()


@ddt
class TrackingEventRecordTaskMapTest(BaseTrackingEventRecordTaskMapTest, unittest.TestCase):
    """Test class for emission of tracking log events in EventRecord format."""
@@ -170,6 +171,7 @@ class TrackingEventRecordTaskMapTest(BaseTrackingEventRecordTaskMapTest, unittes
        expected_value = EventRecord(**expected_dict).to_separated_values()
        self.assert_single_map_output(event, expected_key, expected_value)


@ddt
class TrackingJsonEventRecordTaskMapTest(BaseTrackingEventRecordTaskMapTest, unittest.TestCase):
    """Test class for emission of tracking log events in JsonEventRecord format."""
@@ -412,8 +414,8 @@ class SegmentEventRecordTaskMapTest(BaseSegmentEventRecordTaskMapTest, unittest.
    @data(
        ({'receivedAt': "2013-12-17T15:38:32.805444Z", 'requestTime': "2014-12-18T15:38:32.805444Z"}, "2013-12-17T15:38:32.805444+00:00"),
        ({'requestTime': "2014-12-01T15:38:32.805444Z"}, '2014-12-01T15:38:32.805444+00:00'),  # default to requestTime
        ({}, '2013-12-17T15:38:32.796000+00:00'),  # default to timestamp
    )
    @unpack
    def test_defaulting_arrival_timestamps(self, kwargs, expected_timestamp):
...
@@ -99,5 +99,5 @@ mapreduce.engine =
emu = edx.analytics.tasks.common.mapreduce:EmulatedMapReduceJobRunner

[pycodestyle]
-ignore=E501
+ignore=E501,E731
max_line_length=119