Commit 7935ba87 by Andrew Zafft

Removing HiveQueryToMysqlTask from the codebase and adding controls for overwrite_hive and overwrite_mysql
parent b36b68a6
@@ -101,6 +101,8 @@ Notes
* It *does not* require the "enrollment-reports" section. That section is used to generate static CSV reports.
* The interval here should essentially begin at the start of time: the task computes enrollment by observing state changes from the beginning of time.
* ``$FROM_DATE`` can be any string that is accepted by the unix utility ``date``. Here are a few examples: "today", "yesterday", and "2016-05-01".
* ``overwrite-mysql`` controls whether the MySQL tables are replaced in a transaction during processing. Set this flag to fully replace the tables; it defaults to false.
* ``overwrite-hive`` controls whether the Hive intermediate table metadata is removed and recreated during processing. Set this flag to fully recreate the metadata; it defaults to false.
Task
~~~~
@@ -109,7 +111,9 @@ Task
ImportEnrollmentsIntoMysql --local-scheduler \
--interval $(date +%Y-%m-%d -d "$FROM_DATE")-$(date +%Y-%m-%d -d "$TO_DATE") \
--n-reduce-tasks $NUM_REDUCE_TASKS
--n-reduce-tasks $NUM_REDUCE_TASKS \
--overwrite-mysql \
--overwrite-hive
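
For orientation, here is a minimal Python sketch of how these flags could surface as luigi task parameters. The parameter names mirror the CLI flags above but are illustrative assumptions, not code from this patch; luigi turns a bare ``--overwrite-mysql`` on the command line into ``overwrite_mysql=True`` on the task.

import luigi


class ImportEnrollmentsIntoMysqlSketch(luigi.WrapperTask):
    """Hypothetical stand-in for ImportEnrollmentsIntoMysql, showing only the flag plumbing."""

    interval = luigi.DateIntervalParameter(
        description='Date interval to process, e.g. 2016-05-01-2016-06-01.',
    )
    n_reduce_tasks = luigi.IntParameter(default=25)
    # Both flags default to False, matching the documented default behavior above.
    overwrite_mysql = luigi.BooleanParameter(
        default=False,
        description='If True, replace the MySQL tables in a transaction during processing.',
    )
    overwrite_hive = luigi.BooleanParameter(
        default=False,
        description='If True, remove and recreate the intermediate Hive table metadata.',
    )

Because both flags default to False, omitting them preserves the previous non-overwriting behavior.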
Incremental implementation
~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -18,7 +18,7 @@ from edx.analytics.tasks.insights.calendar_task import CalendarTableTask
from edx.analytics.tasks.insights.database_imports import (
ImportAuthUserTask, ImportCourseUserGroupTask, ImportCourseUserGroupUsersTask
)
from edx.analytics.tasks.insights.enrollments import CourseEnrollmentTableTask
from edx.analytics.tasks.insights.enrollments import CourseEnrollmentPartitionTask
from edx.analytics.tasks.util import eventlog
from edx.analytics.tasks.util.hive import (
BareHiveTableTask,
@@ -455,7 +455,7 @@ class JoinedStudentEngagementTableTask(StudentEngagementTableDownstreamMixin, Hi
ImportAuthUserTask(**kwargs_for_db_import),
ImportCourseUserGroupTask(**kwargs_for_db_import),
ImportCourseUserGroupUsersTask(**kwargs_for_db_import),
CourseEnrollmentTableTask(**kwargs_for_enrollment),
CourseEnrollmentPartitionTask(**kwargs_for_enrollment),
)
# Only the weekly requires use of the calendar.
if self.interval_type == "weekly":
@@ -19,7 +19,7 @@ from edx.analytics.tasks.common.pathutil import EventLogSelectionDownstreamMixin
from edx.analytics.tasks.insights.database_imports import (
ImportAuthUserProfileTask, ImportAuthUserTask, ImportCourseUserGroupTask, ImportCourseUserGroupUsersTask
)
from edx.analytics.tasks.insights.enrollments import ExternalCourseEnrollmentTableTask
from edx.analytics.tasks.insights.enrollments import ExternalCourseEnrollmentPartitionTask
from edx.analytics.tasks.util import eventlog
from edx.analytics.tasks.util.decorators import workflow_entry_point
from edx.analytics.tasks.util.hive import BareHiveTableTask, HivePartitionTask, WarehouseMixin, hive_database_name
@@ -1216,7 +1216,7 @@ class ModuleEngagementRosterPartitionTask(WeekIntervalMixin, ModuleEngagementDow
overwrite=self.overwrite,
overwrite_from_date=self.overwrite_from_date,
),
ExternalCourseEnrollmentTableTask(
ExternalCourseEnrollmentPartitionTask(
interval_end=self.date
),
ImportAuthUserTask(**kwargs_for_db_import),
@@ -9,7 +9,7 @@ import luigi
from edx.analytics.tasks.common.tests.map_reduce_mixins import MapperTestMixin, ReducerTestMixin
from edx.analytics.tasks.insights.enrollments import (
ACTIVATED, DEACTIVATED, MODE_CHANGED, CourseEnrollmentEventsTask, CourseEnrollmentSummaryTask, CourseEnrollmentTask,
ImportCourseSummaryEnrollmentsIntoMysql
CourseMetaSummaryEnrollmentIntoMysql
)
from edx.analytics.tasks.util.tests.opaque_key_mixins import InitializeLegacyKeysMixin, InitializeOpaqueKeysMixin
@@ -562,13 +562,14 @@ class CourseEnrollmentSummaryTaskReducerTest(ReducerTestMixin, TestCase):
class TestImportCourseSummaryEnrollmentsIntoMysql(TestCase):
"""Test that the correct columns are in the Course Summary Enrollments test set."""
def test_query(self):
expected_columns = ('course_id', 'catalog_course_title', 'catalog_course', 'start_time', 'end_time',
'pacing_type', 'availability', 'mode', 'count', 'count_change_7_days',
'cumulative_count', 'passing_users',)
import_task = ImportCourseSummaryEnrollmentsIntoMysql(
import_task = CourseMetaSummaryEnrollmentIntoMysql(
date=datetime(2017, 1, 1), warehouse_path='/tmp/foo'
)
select_clause = import_task.query.partition('FROM')[0]
select_clause = import_task.insert_source_task.query().partition('FROM')[0]
for column in expected_columns:
assert column in select_clause
@@ -34,7 +34,7 @@ class ModuleEngagementAcceptanceTest(AcceptanceTestCase):
self.task.launch(
[
'CourseEnrollmentTableTask',
'CourseEnrollmentPartitionTask',
'--interval-end', '2015-04-17',
'--n-reduce-tasks', str(self.NUM_REDUCERS),
]
@@ -230,7 +230,7 @@ class LuigiTaskDescription(object):
task_name = match.group('name')
raw_params = match.group('params')
param_parser = default_parameter_parser
if task_name == 'HiveTableFromQueryTask' or task_name == 'HiveTableFromParameterQueryTask':
if task_name == 'HiveTableFromQueryTask':
param_parser = hive_parameter_parser
if task_name == 'SqoopImportFromMysql':
param_parser = sqoop_parameter_parser
@@ -421,89 +421,64 @@ class HiveTableFromQueryTask(HiveTableTask): # pylint: disable=abstract-method
raise NotImplementedError
class HiveTableFromParameterQueryTask(HiveTableFromQueryTask): # pylint: disable=abstract-method
"""Creates a hive table from the results of a hive query, given parameters instead of properties."""
insert_query = luigi.Parameter()
table = luigi.Parameter()
columns = luigi.Parameter(is_list=True)
partition = HivePartitionParameter()
class HiveQueryToMysqlTask(WarehouseMixin, MysqlInsertTask):
"""Populates a MySQL table with the results of a hive query."""
overwrite = luigi.BooleanParameter(
default=True,
description='If True, overwrite the MySQL data.',
)
hive_overwrite = luigi.BooleanParameter(
default=False,
description='If True, overwrite the hive data.',
)
SQL_TO_HIVE_TYPE = {
'varchar': 'STRING',
'datetime': 'TIMESTAMP',
'date': 'STRING',
'integer': 'INT',
'int': 'INT',
'double': 'DOUBLE',
'tinyint': 'TINYINT',
'longtext': 'STRING',
}
@property
def insert_source_task(self):
return HiveTableFromParameterQueryTask(
warehouse_path=self.warehouse_path,
insert_query=self.query,
table=self.table,
columns=self.hive_columns,
partition=self.partition,
overwrite=self.hive_overwrite,
)
def requires(self):
# MysqlInsertTask customizes requires() somewhat, so don't clobber that logic. Instead allow subclasses to
# extend the requirements with their own.
requirements = super(HiveQueryToMysqlTask, self).requires()
requirements['other_tables'] = self.required_table_tasks
return requirements
@property
def table(self):
raise NotImplementedError
class OverwriteAwareHiveQueryDataTask(WarehouseMixin, OverwriteOutputMixin, HiveQueryTask):
"""
A generalized Data task whose output is a hive table populated from a hive query.
"""
@property
def query(self):
"""Hive query to run."""
def insert_query(self):
"""The query builder that controls the structure and fields inserted into the new table. This insert_query()
is used as part of the query() function below."""
raise NotImplementedError
@property
def columns(self):
def hive_partition_task(self):
"""The HivePartitionTask that needs to be generated."""
raise NotImplementedError
@property
def partition(self):
"""HivePartition object specifying the partition to store the data in."""
raise NotImplementedError
def query(self): # pragma: no cover
full_insert_query = """
USE {database_name};
INSERT INTO TABLE {table}
PARTITION ({partition.query_spec})
{insert_query};
""".format(database_name=hive_database_name(),
table=self.partition_task.hive_table_task.table,
partition=self.partition,
insert_query=self.insert_query.strip(), # pylint: disable=no-member
)
return textwrap.dedent(full_insert_query)
@property
def required_table_tasks(self):
"""List of tasks that generate any tables needed to run the query."""
return []
def partition_task(self): # pragma: no cover
"""The task that creates the partition used by this job."""
if not hasattr(self, '_partition_task'):
self._partition_task = self.hive_partition_task
return self._partition_task
@property
def hive_columns(self):
"""Convert MySQL column data types to hive data types and return hive column specs as (name, type) tuples."""
hive_cols = []
for column in self.columns:
column_name, sql_type = column
raw_sql_type = sql_type.split(' ')[0]
unparam_sql_type = raw_sql_type.split('(')[0]
hive_type = self.SQL_TO_HIVE_TYPE[unparam_sql_type.lower()]
hive_cols.append((column_name, hive_type))
return hive_cols
def partition(self): # pragma: no cover
"""A shorthand for the partition information on the upstream partition task."""
return self.partition_task.partition # pylint: disable=no-member
def output(self): # pragma: no cover
output_root = url_path_join(self.warehouse_path,
self.partition_task.hive_table_task.table,
self.partition.path_spec + '/')
return get_target_from_url(output_root, marker=True)
def on_success(self): # pragma: no cover
"""Overload the success method to touch the _SUCCESS file. Any class that uses a separate Marker file from the
data file will need to override the base on_success() call to create this marker."""
self.output().touch_marker()
def run(self):
self.remove_output_on_overwrite()
return super(OverwriteAwareHiveQueryDataTask, self).run()
def requires(self): # pragma: no cover
for requirement in super(OverwriteAwareHiveQueryDataTask, self).requires():
yield requirement
yield self.partition_task
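
To make the new contract concrete, here is a hypothetical subclass. Every name below (ExampleCountsTableTask, example_counts, course_enrollment, and so on) is invented for illustration, and the BareHiveTableTask/HivePartitionTask contracts shown are assumptions based on the imports earlier in this commit; treat it as a sketch rather than code from the patch.

import luigi

from edx.analytics.tasks.util.hive import (
    BareHiveTableTask, HivePartitionTask, OverwriteAwareHiveQueryDataTask
)


class ExampleCountsTableTask(BareHiveTableTask):
    """Hypothetical Hive table definition (not part of this patch)."""

    @property
    def partition_by(self):
        return 'dt'

    @property
    def table(self):
        return 'example_counts'

    @property
    def columns(self):
        return [('course_id', 'STRING'), ('count', 'INT')]


class ExampleCountsPartitionTask(HivePartitionTask):
    """Hypothetical partition task wrapping the table above."""

    date = luigi.DateParameter()

    @property
    def hive_table_task(self):
        return ExampleCountsTableTask(warehouse_path=self.warehouse_path, overwrite=self.overwrite)

    @property
    def partition_value(self):
        # Matches the HivePartition('dt', 'YYYY-MM-DD') convention used elsewhere in the codebase.
        return self.date.isoformat()


class ExampleCountsDataTask(OverwriteAwareHiveQueryDataTask):
    """Hypothetical subclass showing the two members every concrete task must supply."""

    date = luigi.DateParameter()

    @property
    def insert_query(self):
        # Only the SELECT body; query() wraps it in USE / INSERT INTO TABLE / PARTITION.
        return "SELECT course_id, COUNT(*) FROM course_enrollment GROUP BY course_id"

    @property
    def hive_partition_task(self):
        return ExampleCountsPartitionTask(
            date=self.date,
            warehouse_path=self.warehouse_path,
            overwrite=self.overwrite,
        )

With these definitions, query() renders roughly USE <database>; INSERT INTO TABLE example_counts PARTITION (dt='<date>') <select body>;, requires() yields the partition task alongside any other requirements, and run() honors the overwrite flag by removing existing output before re-running.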
@@ -37,55 +37,6 @@ class HivePartitionParameterTest(TestCase):
self.assertEquals(str(partition), 'dt=2014-01-01')
class HiveQueryToMysqlTaskTest(TestCase):
"""Test some of the tricky logic in HiveQueryToMysqlTask"""
def test_hive_columns(self):
class TestQuery(hive.HiveQueryToMysqlTask): # pylint: disable=abstract-method
"""Sample task with just a column definition."""
columns = [
('one', 'VARCHAR(255) NOT NULL AUTO_INCREMENT UNIQUE'),
('two', 'VARCHAR'),
('three', 'DATETIME NOT NULL'),
('four', 'DATE'),
('five', 'INTEGER'),
('six', 'INT'),
('seven', 'DOUBLE'),
('eight', 'tinyint'),
('nine', 'longtext')
]
self.assertEquals(TestQuery().hive_columns, [
('one', 'STRING'),
('two', 'STRING'),
('three', 'TIMESTAMP'),
('four', 'STRING'),
('five', 'INT'),
('six', 'INT'),
('seven', 'DOUBLE'),
('eight', 'TINYINT'),
('nine', 'STRING')
])
def test_other_tables(self):
class TestOtherTables(hive.HiveQueryToMysqlTask): # pylint: disable=abstract-method
"""Sample task that relies on other tables."""
@property
def required_table_tasks(self):
return (
sentinel.table_1,
sentinel.table_2
)
query = 'SELECT 1'
table = 'test_table'
columns = [('one', 'VARCHAR')]
partition = hive.HivePartition('dt', '2014-01-01')
requirements = TestOtherTables().requires()
self.assertEquals(requirements['other_tables'], (sentinel.table_1, sentinel.table_2))
class HiveWarehouseMixinTest(TestCase):
"""Test the partition path generation"""