Commit 6c5dcf74 by Andrew Zafft

Make Hive partition inserts overwrite by default and add a marker file check for DE-490

parent 4709a8e7
@@ -6,7 +6,7 @@ import textwrap
 import luigi
 from luigi.configuration import get_config
 from luigi.hive import HivePartitionTarget, HiveQueryRunner, HiveQueryTask, HiveTableTarget
-from luigi.parameter import Parameter
+from luigi.parameter import Parameter, BooleanParameter
 
 from edx.analytics.tasks.common.mysql_load import MysqlInsertTask
 from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
@@ -426,6 +426,13 @@ class OverwriteAwareHiveQueryDataTask(WarehouseMixin, OverwriteOutputMixin, Hive
     A generalized Data task whose output is a hive table populated from a hive query.
     """
 
+    overwrite_target_partition = BooleanParameter(
+        significant=False,
+        description='Overwrite the target partition, deleting any existing data. This will not impact other '
+                    'partitions. Do not use with incrementally built partitions.',
+        default=True
+    )
+
     @property
     def insert_query(self):
         """The query builder that controls the structure and fields inserted into the new table. This insert_query()
@@ -437,18 +444,26 @@ class OverwriteAwareHiveQueryDataTask(WarehouseMixin, OverwriteOutputMixin, Hive
         """The HivePartitionTask that needs to be generated."""
         raise NotImplementedError
 
+    @property
+    def data_modification_sql_text(self):
+        """Returns the appropriate SQL text for the chosen overwrite_target_partition strategy."""
+        if self.overwrite_target_partition:
+            return "OVERWRITE"
+        else:
+            return "INTO"
+
     def query(self):  # pragma: no cover
         full_insert_query = """
             USE {database_name};
-            INSERT INTO TABLE {table}
+            INSERT {into_or_overwrite} TABLE {table}
             PARTITION ({partition.query_spec})
             {insert_query};
         """.format(database_name=hive_database_name(),
+                   into_or_overwrite=self.data_modification_sql_text,
                    table=self.partition_task.hive_table_task.table,
                    partition=self.partition,
                    insert_query=self.insert_query.strip(),  # pylint: disable=no-member
                    )
 
         return textwrap.dedent(full_insert_query)
 
     @property
...
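For readers who want to see what the new parameter actually changes, here is a minimal sketch of the INTO/OVERWRITE switch in isolation. The database, table, partition spec, and SELECT statement below are hypothetical placeholders, not values from this repository; only the choice between INSERT OVERWRITE and INSERT INTO comes from the commit.

import textwrap

def build_insert_query(overwrite_target_partition, table, partition_spec, select_sql):
    # Mirror of the data_modification_sql_text property above: OVERWRITE replaces the
    # partition's existing data, INTO appends to it (the pre-commit behavior).
    into_or_overwrite = "OVERWRITE" if overwrite_target_partition else "INTO"
    query = """
        USE example_database;
        INSERT {into_or_overwrite} TABLE {table}
        PARTITION ({partition_spec})
        {select_sql};
    """.format(into_or_overwrite=into_or_overwrite, table=table,
               partition_spec=partition_spec, select_sql=select_sql)
    return textwrap.dedent(query)

# Default (overwrite_target_partition=True) renders "INSERT OVERWRITE TABLE example_table ...":
print(build_insert_query(True, "example_table", "dt='2016-01-01'", "SELECT * FROM example_table_raw"))
# Opting out (overwrite_target_partition=False) renders "INSERT INTO TABLE example_table ...":
print(build_insert_query(False, "example_table", "dt='2016-01-01'", "SELECT * FROM example_table_raw"))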
@@ -10,7 +10,9 @@ Examples::
 """
 from __future__ import absolute_import
 
+import logging
 import os
+import time
 import urlparse
 
 import luigi
@@ -23,11 +25,16 @@ from luigi.s3 import S3Target
 from edx.analytics.tasks.util.s3_util import S3HdfsTarget, ScalableS3Client
 
+log = logging.getLogger(__name__)
+
 
 class MarkerMixin(object):
     """This mixin handles Targets that cannot accurately be measured by the existence of data files, and instead need
     another positive marker to indicate Task success."""
 
+    # Check if the marker file is readable after being written, and if not then block for up to 10 minutes until a read
+    # is successful.
+    confirm_marker_file_after_writing = True
+
     def exists(self):  # pragma: no cover
         """Completion of this target is based solely on the existence of the marker file."""
         return self.fs.exists(self.path + "/_SUCCESS")
@@ -37,6 +44,20 @@ class MarkerMixin(object):
         marker = self.__class__(path=self.path + "/_SUCCESS")
         marker.open("w").close()
 
+        if self.confirm_marker_file_after_writing:
+            read_attempts = 10
+            marker_exists = False
+            while read_attempts > 0 and not marker_exists:
+                marker_exists = self.exists()
+                if not marker_exists:
+                    log.debug("Marker file %s does not exist, sleeping for 60 seconds", marker)
+                    time.sleep(60)
+                    # Count down so the loop gives up after at most ten one-minute waits.
+                    read_attempts -= 1
+            if not marker_exists:
+                log.error("Marker file %s should have been created but could not be read!", marker)
 
 
 class S3MarkerTarget(MarkerMixin, S3Target):
     """An S3 Target that uses a marker file to indicate success."""
...
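A minimal usage sketch of the marker-based target follows, assuming S3MarkerTarget is importable from the module patched above; the import path, bucket, and task name are guesses rather than part of the commit. exists() only looks for <path>/_SUCCESS, and once the marker has been written the new confirm_marker_file_after_writing check re-reads it, sleeping for up to ten one-minute intervals before logging an error.

import luigi

# Import path is assumed; the diff does not name the module being patched.
from edx.analytics.tasks.util.url import S3MarkerTarget


class ExampleMarkerAwareTask(luigi.Task):
    # Hypothetical task that treats a directory of data files as complete only
    # when the _SUCCESS marker is present.
    output_url = luigi.Parameter(default='s3://example-bucket/warehouse/example_table/dt=2016-01-01')

    def output(self):
        # luigi calls output().exists() to decide completeness; with a marker
        # target that means checking for <output_url>/_SUCCESS, not the data files.
        return S3MarkerTarget(path=self.output_url)

    def run(self):
        # ... write the data files under self.output_url, then write the _SUCCESS
        # marker via the marker-writing method shown in the hunk above; with
        # confirm_marker_file_after_writing = True that method re-reads the marker
        # before returning.
        pass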