Commit 1a50fdf4 by Brian Wilson

Use BigQuery-friendly sqoop settings for mysql to Vertica loading.

parent dd30cd4b
......@@ -38,22 +38,45 @@ class MysqlToWarehouseTaskMixin(WarehouseMixin):
)
class LoadMysqlToVerticaTableTask(MysqlToWarehouseTaskMixin, VerticaCopyTask):
class MysqlToVerticaTaskMixin(MysqlToWarehouseTaskMixin):
"""
Task to import a table from mysql into vertica.
Parameters for importing a mysql database into Vertica.
"""
table_name = luigi.Parameter(
description='The name of the table.',
exclude_field = luigi.ListParameter(
default=(),
description='List of regular expression patterns for matching "tablename.fieldname" fields that should not be output.',
)
date = luigi.DateParameter(
default=datetime.datetime.utcnow().date(),
)
warehouse_subdirectory = luigi.Parameter(
default='import_mysql_to_vertica',
description='Subdirectory under warehouse_path to store intermediate data.'
)
class LoadMysqlToVerticaTableTask(MysqlToVerticaTaskMixin, VerticaCopyTask):
"""
Task to import a table from mysql into vertica.
"""
table_name = luigi.Parameter(
description='The name of the table.',
)
def __init__(self, *args, **kwargs):
super(LoadMysqlToVerticaTableTask, self).__init__(*args, **kwargs)
self.table_schema = []
self.deleted_fields = []
self.undeleted_fields = []
def should_exclude_field(self, field_name):
"""Determines whether to exclude an individual field during the import, matching against 'table.field'."""
full_name = "{}.{}".format(self.table_name, field_name)
if any(re.match(pattern, full_name) for pattern in self.exclude_field):
return True
return False
def requires(self):
if self.required_tasks is None:
......@@ -74,7 +97,9 @@ class LoadMysqlToVerticaTableTask(MysqlToWarehouseTaskMixin, VerticaCopyTask):
field_null = result[2].strip()
types_with_parentheses = ['tinyint', 'smallint', 'int', 'bigint', 'datetime']
if any(_type in field_type for _type in types_with_parentheses):
if field_type == 'tinyint(1)':
field_type = 'BOOLEAN'
elif any(_type in field_type for _type in types_with_parentheses):
field_type = field_type.rsplit('(')[0]
elif field_type == 'longtext':
field_type = 'LONG VARCHAR'
......@@ -86,32 +111,50 @@ class LoadMysqlToVerticaTableTask(MysqlToWarehouseTaskMixin, VerticaCopyTask):
if field_null == "NO":
field_type = field_type + " NOT NULL"
field_name = "\"{}\"".format(field_name)
self.table_schema.append((field_name, field_type))
if self.should_exclude_field(field_name):
self.deleted_fields.append(field_name)
else:
self.undeleted_fields.append(field_name)
field_name = "\"{}\"".format(field_name)
self.table_schema.append((field_name, field_type))
return self.table_schema
@property
def field_delimiter(self):
"""The delimiter in the data to be copied."""
return '\x01'
@property
def null_marker(self):
"""The null sequence in the data to be copied."""
return 'NNULLL'
@property
def quote_character(self):
return ''
@property
def copy_delimiter(self):
"""The delimiter in the data to be copied."""
return "','"
return "'{}'".format(self.field_delimiter)
@property
def copy_null_sequence(self):
"""The null sequence in the data to be copied."""
return "'NULL'"
return "'{}'".format(self.null_marker)
@property
def enclosed_by(self):
return "''''"
# Hopefully, setting this to a zero-length string will disable this.
return "''"
@property
def insert_source_task(self):
partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
destination = url_path_join(
self.warehouse_path,
"database_import",
self.warehouse_subdirectory,
self.database,
self.table_name,
partition_path_spec
......@@ -122,7 +165,12 @@ class LoadMysqlToVerticaTableTask(MysqlToWarehouseTaskMixin, VerticaCopyTask):
database=self.database,
destination=destination,
overwrite=self.overwrite,
mysql_delimiters=True,
mysql_delimiters=False,
fields_terminated_by=self.field_delimiter,
null_string=self.null_marker,
delimiter_replacement=' ',
direct=False,
columns=self.undeleted_fields,
)
@property
......@@ -196,7 +244,7 @@ class PostImportDatabaseTask(SchemaManagementTask):
)
class ImportMysqlToVerticaTask(MysqlToWarehouseTaskMixin, luigi.WrapperTask):
class ImportMysqlToVerticaTask(MysqlToVerticaTaskMixin, luigi.WrapperTask):
"""Provides entry point for importing a mysql database into Vertica."""
schema = luigi.Parameter(
......@@ -207,9 +255,6 @@ class ImportMysqlToVerticaTask(MysqlToWarehouseTaskMixin, luigi.WrapperTask):
config_path={'section': 'vertica-export', 'name': 'credentials'},
description='Path to the external access credentials file.',
)
date = luigi.DateParameter(
default=datetime.datetime.utcnow().date(),
)
overwrite = luigi.BoolParameter(
default=False,
significant=False,
......@@ -367,6 +412,13 @@ class LoadMysqlToBigQueryTableTask(MysqlToBigQueryTaskMixin, BigQueryLoadTask):
return True
return False
def should_exclude_field(self, field_name):
"""Determines whether to exclude an individual field during the import, matching against 'table.field'."""
full_name = "{}.{}".format(self.table_name, field_name)
if any(re.match(pattern, full_name) for pattern in self.exclude_field):
return True
return False
def get_bigquery_schema(self):
"""Transforms mysql table schema into a vertica compliant schema."""
......@@ -421,7 +473,6 @@ class LoadMysqlToBigQueryTableTask(MysqlToBigQueryTaskMixin, BigQueryLoadTask):
def insert_source_task(self):
# Make sure yet again that columns have been calculated.
columns = [field.name for field in self.schema]
partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
destination = url_path_join(
self.warehouse_path,
......@@ -475,6 +526,10 @@ class ImportMysqlDatabaseToBigQueryDatasetTask(MysqlToBigQueryTaskMixin, BigQuer
default=(),
description='List of regular expressions matching database table names that should not be imported from MySQL to BigQuery.'
)
exclude_field = luigi.ListParameter(
default=(),
description='List of regular expression patterns for matching "tablename.fieldname" fields that should not be output.',
)
def __init__(self, *args, **kwargs):
super(ImportMysqlDatabaseToBigQueryDatasetTask, self).__init__(*args, **kwargs)
......
......@@ -46,6 +46,7 @@ class LoadMysqlToVerticaTableTaskTest(TestCase):
('name', 'varchar(255)', 'NO', 'MUL', None, ''),
('meta', 'longtext', 'NO', '', None, ''),
('width', 'smallint(6)', 'YES', 'MUL', None, ''),
('test_tiny', 'tinyint(4)', 'YES', 'MUL', None, ''),
('allow_certificate', 'tinyint(1)', 'NO', '', None, ''),
('user_id', 'bigint(20) unsigned', 'NO', '', None, ''),
('profile_image_uploaded_at', 'datetime', 'YES', '', None, ''),
......@@ -64,7 +65,8 @@ class LoadMysqlToVerticaTableTaskTest(TestCase):
('"name"', 'varchar(255) NOT NULL'),
('"meta"', 'LONG VARCHAR NOT NULL'),
('"width"', 'smallint'),
('"allow_certificate"', 'tinyint NOT NULL'),
('"test_tiny"', 'tinyint'),
('"allow_certificate"', 'BOOLEAN NOT NULL'),
('"user_id"', 'bigint NOT NULL'),
('"profile_image_uploaded_at"', 'datetime'),
('"change_date"', 'datetime'),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment