Commit 2c38e2dd by Andrew Zafft

Integration test of sqoop from Vertica

parent 2ce0ee9e
...@@ -283,3 +283,79 @@ class SqoopImportRunner(luigi.contrib.hadoop.JobRunner):
        except Exception:
            log.exception("Unable to dump metadata information.")
            pass
class SqoopImportFromVertica(SqoopImportTask):
    """
    An abstract task that uses Sqoop to read data out of a Vertica database and write it to a file in CSV format:

    * fields delimited by commas
    * lines delimited by \n
    * delimiters escaped by backslashes
    * fields optionally enclosed by single quotes (')
    """
    # The Vertica connector does not support --direct, so an explicit column list is now required.
    direct = luigi.BoolParameter(
        default=None,
    )
    columns = luigi.ListParameter(
        description='A list of column names to be included.  The Vertica connector requires the columns '
                    'to be listed explicitly.',
    )
    schema_name = luigi.Parameter(
        description='The Vertica schema containing the table to be exported.',
    )
    null_string = luigi.Parameter(
        default=None,
        description='String to use to represent NULL values in output data.',
    )
    fields_terminated_by = luigi.Parameter(
        default=',',
        description='Defines the field separator to use on output.',
    )
    delimiter_replacement = luigi.Parameter(
        default='\\',
        description='Defines a character to use as replacement for delimiters '
                    'that appear within data values, for use with Hive.',
    )

    def connection_url(self, cred):
        """Construct the connection URL from the provided credentials."""
        return 'jdbc:vertica://{host}/{database}'.format(host=cred['host'], database=self.database)
    def import_args(self):
        """Returns a list of arguments to pass to sqoop-import."""
        if self.columns is None or len(self.columns) == 0:
            raise RuntimeError(
                "Vertica's connector requires that specific columns be listed or a wildcard be present; "
                "no columns were supplied."
            )

        arglist = [
            '--target-dir', self.destination,
            '--driver', 'com.vertica.jdbc.Driver',
        ]
        if self.where is not None:
            where_qry = "({}) and $CONDITIONS".format(self.where)
        else:
            where_qry = "$CONDITIONS"
        # columns is a ListParameter, so join the names into the SELECT list.
        select_qry = "SELECT {columns} FROM {schema}.{table} WHERE {where_qry}".format(
            columns=','.join(self.columns),
            schema=self.schema_name,
            table=self.table_name,
            where_qry=where_qry,
        )
        arglist.extend(['--query', select_qry])
        if self.num_mappers is not None:
            arglist.extend(['--num-mappers', str(self.num_mappers)])
        else:
            arglist.extend(['--num-mappers', '1'])
        if self.verbose:
            arglist.append('--verbose')
        if self.null_string is not None:
            arglist.extend(['--null-string', self.null_string, '--null-non-string', self.null_string])
        if self.fields_terminated_by is not None:
            arglist.extend(['--fields-terminated-by', self.fields_terminated_by])
        arglist.extend(['--lines-terminated-by', '\n'])
        if self.delimiter_replacement is not None:
            arglist.extend(['--escaped-by', self.delimiter_replacement])
        arglist.extend(['--optionally-enclosed-by', "'"])

        return arglist
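What follows is the new edx.analytics.tasks.common.vertica_export module that drives this task. First, for reference, a hedged sketch (not part of the commit) of the argument list import_args() would build; the schema, table, columns, and destination values below are hypothetical:

# Hypothetical illustration only: the sqoop-import arguments that import_args() would
# produce for schema_name='production', table_name='d_course',
# columns=['course_id', 'course_name'], where=None, num_mappers=None, verbose=False,
# null_string=None, fields_terminated_by=',', delimiter_replacement='\\',
# destination='s3://example-bucket/warehouse/d_course/'.
expected_arglist = [
    '--target-dir', 's3://example-bucket/warehouse/d_course/',
    '--driver', 'com.vertica.jdbc.Driver',
    '--query', 'SELECT course_id,course_name FROM production.d_course WHERE $CONDITIONS',
    '--num-mappers', '1',
    '--fields-terminated-by', ',',
    '--lines-terminated-by', '\n',
    '--escaped-by', '\\',
    '--optionally-enclosed-by', "'",
]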
"""
Supports exporting data from Vertica.
"""
import logging
import luigi
from edx.analytics.tasks.common.sqoop import SqoopImportFromVertica
from edx.analytics.tasks.util.decorators import workflow_entry_point
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
from edx.analytics.tasks.util.url import ExternalURL, url_path_join
log = logging.getLogger(__name__)
class VerticaSourcedSqoopMixin(OverwriteOutputMixin):
    """A collection of parameters used by the sqoop command."""

    credentials = luigi.Parameter(
        config_path={'section': 'vertica-export', 'name': 'credentials'},
        description='Path to the external access credentials file.',
    )
    database = luigi.Parameter(
        default='warehouse',
        description='The Vertica database that is the source of the sqoop command.',
    )
    schema = luigi.Parameter(
        default=None,
        description='The Vertica schema that is the source of the sqoop command.',
    )
    table = luigi.Parameter(
        default=None,
        description='The Vertica table that is the source of the sqoop command.',
    )
    warehouse_path = luigi.Parameter(
        config_path={'section': 'vertica-export-sqoop', 'name': 'warehouse_path'},
        description='A URL location of the data warehouse.',
    )
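The two config_path parameters above fall back to the luigi configuration file when they are not passed on the command line. A minimal sketch of the relevant sections, assuming hypothetical S3 locations:

# Hypothetical luigi override.cfg sections; both paths are placeholders.
[vertica-export]
credentials = s3://example-bucket/config/vertica_creds.json

[vertica-export-sqoop]
warehouse_path = s3://example-bucket/warehouse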
class LoadVerticaToS3TableTask(VerticaSourcedSqoopMixin, luigi.Task):
    """
    A sample loader that reads a table from Vertica and persists it to S3.

    In order to use SqoopImportFromVertica the caller must already know the Vertica schema, table name, and
    column names; discovering those automatically should be added in future development cycles.
    """
    required_tasks = None

    def __init__(self, *args, **kwargs):
        super(LoadVerticaToS3TableTask, self).__init__(*args, **kwargs)

    def requires(self):
        if self.required_tasks is None:
            self.required_tasks = {
                'credentials': ExternalURL(url=self.credentials),
                'insert_source': self.insert_source_task,
            }
        return self.required_tasks

    def complete(self):
        return self.insert_source_task.complete()

    @property
    def insert_source_task(self):
        """The sqoop command that manages the connection to the source datasource."""
        target_url = url_path_join(self.warehouse_path, self.database, self.schema, self.table)
        return SqoopImportFromVertica(
            schema_name=self.schema,
            table_name=self.table,
            credentials=self.credentials,
            database=self.database,
            columns=['course_id', 'course_org_id', 'course_number', 'course_run', 'course_start', 'course_end',
                     'course_name'],
            destination=target_url,
            overwrite=self.overwrite,
        )
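To illustrate how the destination above is composed, here is a hedged sketch with hypothetical values:

# Hypothetical illustration of the target_url built by insert_source_task:
# url_path_join('s3://example-bucket/warehouse', 'warehouse', 'production', 'd_course')
# -> 's3://example-bucket/warehouse/warehouse/production/d_course'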
@workflow_entry_point
class ImportVerticaToS3Workflow(VerticaSourcedSqoopMixin, luigi.WrapperTask):
    """
    A sample workflow to transfer data from Vertica to S3.

    This is a sample workflow used for manual testing and to act as an example of a workflow to copy Vertica data.
    In the final version the table name should be a list, and should be optional.  Additionally, table exclusions
    should be a run-time parameter.
    """

    def __init__(self, *args, **kwargs):
        super(ImportVerticaToS3Workflow, self).__init__(*args, **kwargs)

    def requires(self):
        return LoadVerticaToS3TableTask(
            database=self.database,
            schema=self.schema,
            table=self.table,
            credentials=self.credentials,
            overwrite=self.overwrite,
            warehouse_path=self.warehouse_path,
        )
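As a rough usage sketch (not part of the commit), the workflow could be driven through luigi's Python API; every parameter value here is a placeholder:

import luigi

from edx.analytics.tasks.common.vertica_export import ImportVerticaToS3Workflow

# Hypothetical invocation; schema, table, credentials, and warehouse_path are placeholders.
if __name__ == '__main__':
    luigi.build(
        [
            ImportVerticaToS3Workflow(
                database='warehouse',
                schema='production',
                table='d_course',
                credentials='s3://example-bucket/config/vertica_creds.json',
                warehouse_path='s3://example-bucket/warehouse',
                overwrite=False,
            )
        ],
        local_scheduler=True,
    )

Within the repository itself the same workflow is exposed through the test-vertica-sqoop entry point added to setup.cfg below.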
...@@ -69,6 +69,7 @@ edx.analytics.tasks =
    push_to_vertica_lms_courseware_link_clicked = edx.analytics.tasks.warehouse.lms_courseware_link_clicked:PushToVerticaLMSCoursewareLinkClickedTask
    run-vertica-sql-script = edx.analytics.tasks.warehouse.run_vertica_sql_script:RunVerticaSqlScriptTask
    run-vertica-sql-scripts = edx.analytics.tasks.warehouse.run_vertica_sql_scripts:RunVerticaSqlScriptTask
    test-vertica-sqoop = edx.analytics.tasks.common.vertica_export:ImportVerticaToS3Workflow
    # financial:
    cybersource = edx.analytics.tasks.warehouse.financial.cybersource:DailyPullFromCybersourceTask
...