Commit a5f9b310 by Andrew Zafft

Integrated test of Sqoop from Vertica

parent 16780bdb
@@ -92,7 +92,7 @@ class SqoopImportTask(OverwriteOutputMixin, SqoopImportMixin, luigi.contrib.hado
    )
    fields_terminated_by = luigi.Parameter(
        default=None,
-       description='Defines the file separator to use on output.',
+       description='Defines the field separator to use on output.',
    )
    delimiter_replacement = luigi.Parameter(
        default=None,
@@ -197,7 +197,7 @@ class SqoopImportTask(OverwriteOutputMixin, SqoopImportMixin, luigi.contrib.hado
class SqoopImportFromMysql(SqoopImportTask):
    """
-   An abstract task that uses Sqoop to read data out of a database and writes it to a file in CSV format.
+   An abstract task that uses Sqoop to read data out of a MySQL database and writes it to a file in CSV format.
    By default, the output format is defined by meaning of --mysql-delimiters option, which defines defaults used by
    mysqldump tool:
@@ -283,3 +283,75 @@ class SqoopImportRunner(luigi.contrib.hadoop.JobRunner):
        except Exception:
            log.exception("Unable to dump metadata information.")
            pass
+
+
+class SqoopImportFromVertica(SqoopImportTask):
+    """
+    An abstract task that uses Sqoop to read data out of a Vertica database and writes it to a file in CSV format.
+
+    * fields delimited by comma
+    * lines delimited by \n
+    * fields optionally enclosed by single quotes (')
+    """
+    # Direct is not supported by the Vertica JDBC connector.
+    direct = None
+    # A bug in the Sqoop process with this JDBC connector makes this parameter useless.
+    num_mappers = None
+
+    schema_name = luigi.Parameter(
+        description='The schema that contains the table being exported.'
+    )
+    timezone_adjusted_column_list = luigi.ListParameter(
+        default=[],
+        description='The list of columns that need to have their times manually adjusted to UTC.'
+    )
+
+    def connection_url(self, cred):
+        """Construct connection URL from provided credentials."""
+        return 'jdbc:vertica://{host}/{database}?searchpath={schema}'.format(
+            host=cred['host'],
+            database=self.database,
+            schema=self.schema_name
+        )
+
+    def import_args(self):
+        if self.columns is None or len(self.columns) == 0:
+            raise RuntimeError('Error: Vertica\'s connector requires specific columns to be listed. No columns were supplied.')
+
+        arglist = [
+            '--target-dir', self.destination,
+            '--driver', 'com.vertica.jdbc.Driver',
+        ]
+
+        # The Vertica JDBC client does not handle timestamptz fields or column names that are reserved words.
+        column_list = []
+        for column in self.columns:
+            if column in self.timezone_adjusted_column_list:
+                column_list.append('"{}" AT TIME ZONE \'UTC\''.format(column))
+            else:
+                column_list.append('"{}"'.format(column))
+        query = 'SELECT {cols} FROM {tbl} WHERE $CONDITIONS'.format(cols=','.join(column_list), tbl=self.table_name)
+        arglist.extend(['--query', query])
+
+        # There appears to be a bug in the handling of the --num-mappers and --split-by options. If --num-mappers is
+        # omitted Sqoop terminates with errors. If --num-mappers is included with any value other than 1 then Sqoop
+        # terminates requiring the --split-by field also be defined. If --num-mappers is included and a valid
+        # --split-by field is included then in one example Sqoop exported only 1/3 of the proper number of records.
+        # For now disable num_mappers and default the parallelism to 1.
+        arglist.extend(['--num-mappers', '1'])
+
+        if self.verbose:
+            arglist.append('--verbose')
+        if self.null_string is not None:
+            arglist.extend(['--null-string', self.null_string, '--null-non-string', self.null_string])
+        if self.fields_terminated_by is not None:
+            arglist.extend(['--fields-terminated-by', self.fields_terminated_by])
+        # At some point we may want to promote this sqoop option to a luigi parameter option.
+        arglist.extend(['--optionally-enclosed-by', '\''])
+        if self.delimiter_replacement is not None:
+            arglist.extend(['--hive-delims-replacement', self.delimiter_replacement])
+        arglist.extend(['--lines-terminated-by', '\n'])
+
+        return arglist
@@ -69,6 +69,7 @@ edx.analytics.tasks =
    push_to_vertica_lms_courseware_link_clicked = edx.analytics.tasks.warehouse.lms_courseware_link_clicked:PushToVerticaLMSCoursewareLinkClickedTask
    run-vertica-sql-script = edx.analytics.tasks.warehouse.run_vertica_sql_script:RunVerticaSqlScriptTask
    run-vertica-sql-scripts = edx.analytics.tasks.warehouse.run_vertica_sql_scripts:RunVerticaSqlScriptTask
+   test-vertica-sqoop = edx.analytics.tasks.common.vertica_export:VerticaSchemaToBigQueryTask
    # financial:
    cybersource = edx.analytics.tasks.warehouse.financial.cybersource:DailyPullFromCybersourceTask
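
As a quick reference, here is a minimal standalone sketch (not part of this commit) of the SELECT statement that SqoopImportFromVertica.import_args() assembles; the column names, timezone-adjusted column, and table name below are hypothetical stand-ins for the task's parameters:

# Hypothetical values standing in for self.columns, self.timezone_adjusted_column_list
# and self.table_name; the quoting logic mirrors import_args() above.
columns = ['user_id', 'course_id', 'created']
timezone_adjusted_column_list = ['created']
table_name = 'production.auth_user'

column_list = []
for column in columns:
    if column in timezone_adjusted_column_list:
        # timestamptz columns are converted to UTC inside the query itself
        column_list.append('"{}" AT TIME ZONE \'UTC\''.format(column))
    else:
        column_list.append('"{}"'.format(column))

query = 'SELECT {cols} FROM {tbl} WHERE $CONDITIONS'.format(cols=','.join(column_list), tbl=table_name)
print(query)
# SELECT "user_id","course_id","created" AT TIME ZONE 'UTC' FROM production.auth_user WHERE $CONDITIONS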