Commit 940fd7cb by Brian Wilson

Add support for JSON event output.

It coexists with regular event output, and is controlled by
an optional parameter.  By default it runs with
event_record_type equal to 'EventRecord', but can
be overridden by running with --event-record-type 'JsonEventRecord'.

Includes a bug fix to timestamp handling: add validation that rejects dates before 1900.

Also includes support for event loading to BigQuery, by adding
support for partitioning to bigquery_load.

* Use records for warehouse loading where defined.
* Check bigquery availability in load code.

Add support for loading to S3 by interval or by date.  PerDate loading
checks whether the data already exists, which is good for incremental runs.
Bulk loading just runs over an interval, and assumes the data isn't already
present on S3.  This is better for processing many days more efficiently.

To address issue with loading into BigQuery, null characters in column values
are encoded as the string '\0'.
parent 04aca542
...@@ -5,6 +5,12 @@ import re ...@@ -5,6 +5,12 @@ import re
import datetime import datetime
import itertools import itertools
try:
from google.cloud.bigquery import SchemaField
bigquery_available = True # pylint: disable=invalid-name
except ImportError:
bigquery_available = False # pylint: disable=invalid-name
DEFAULT_NULL_VALUE = '\\N' # This is the default string used by Hive to represent a NULL value. DEFAULT_NULL_VALUE = '\\N' # This is the default string used by Hive to represent a NULL value.
...@@ -352,6 +358,28 @@ class Record(object): ...@@ -352,6 +358,28 @@ class Record(object):
return schema return schema
@classmethod @classmethod
def get_bigquery_schema(cls):
    """
    Return a skeleton schema of the BigQuery table that could store this data.

    Accepted types for legacy tables are 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN',
    'TIMESTAMP', 'BYTES', 'DATE', 'TIME', 'DATETIME'.
    Accepted types for standard tables are 'STRING', 'INT64', 'FLOAT64', 'BOOL',
    'TIMESTAMP', 'BYTES', 'DATE', 'TIME', 'DATETIME'.
    Going with legacy values for now.
    Accepted modes are 'NULLABLE' or 'REQUIRED'.

    Returns: A list of BigQuery SchemaField objects.

    Raises: ImportError if the google.cloud.bigquery library could not be imported.
    """
    if not bigquery_available:
        raise ImportError('Bigquery library not available')
    schema = []
    for field_name, field_obj in cls.get_fields().items():
        # Nullable record fields map onto BigQuery's NULLABLE mode; everything
        # else must be present, i.e. REQUIRED.
        mode = 'NULLABLE' if field_obj.nullable else 'REQUIRED'
        # Not every Field subclass carries a description, so default to None.
        description = getattr(field_obj, 'description', None)
        schema.append(SchemaField(field_name, field_obj.bigquery_type, description=description, mode=mode))
    return schema
@classmethod
def get_elasticsearch_properties(cls): def get_elasticsearch_properties(cls):
""" """
An elasticsearch mapping that could store this data. An elasticsearch mapping that could store this data.
...@@ -517,6 +545,11 @@ class Field(object): ...@@ -517,6 +545,11 @@ class Field(object):
raise NotImplementedError raise NotImplementedError
@property @property
def bigquery_type(self):
    """Returns the BigQuery data type for this type of field.

    Abstract fallback: concrete Field subclasses shadow this property with a
    `bigquery_type` class attribute (e.g. 'STRING', 'INTEGER', 'TIMESTAMP').
    """
    # Typo fix: this was defined as `biqquery_type`, which would never match
    # the `bigquery_type` attribute the subclasses define and that
    # Record.get_bigquery_schema() reads, so the NotImplementedError fallback
    # could never trigger for a subclass that forgot to set the attribute.
    raise NotImplementedError
@property
def elasticsearch_type(self): def elasticsearch_type(self):
"""Returns the elasticsearch type for this type of field.""" """Returns the elasticsearch type for this type of field."""
raise NotImplementedError raise NotImplementedError
...@@ -526,6 +559,7 @@ class StringField(Field): # pylint: disable=abstract-method ...@@ -526,6 +559,7 @@ class StringField(Field): # pylint: disable=abstract-method
"""Represents a field that contains a relatively short string.""" """Represents a field that contains a relatively short string."""
hive_type = 'STRING' hive_type = 'STRING'
bigquery_type = 'STRING'
elasticsearch_type = 'string' elasticsearch_type = 'string'
def validate_parameters(self): def validate_parameters(self):
...@@ -569,6 +603,7 @@ class DelimitedStringField(Field): ...@@ -569,6 +603,7 @@ class DelimitedStringField(Field):
"""Represents a list of strings, stored as a single delimited string.""" """Represents a list of strings, stored as a single delimited string."""
hive_type = 'STRING' hive_type = 'STRING'
bigquery_type = 'STRING'
sql_base_type = 'VARCHAR' sql_base_type = 'VARCHAR'
elasticsearch_type = 'string' elasticsearch_type = 'string'
delimiter = '\0' delimiter = '\0'
...@@ -595,6 +630,7 @@ class BooleanField(Field): ...@@ -595,6 +630,7 @@ class BooleanField(Field):
"""Represents a field that contains a boolean.""" """Represents a field that contains a boolean."""
hive_type = 'TINYINT' hive_type = 'TINYINT'
bigquery_type = 'BOOLEAN'
sql_base_type = 'BOOLEAN' sql_base_type = 'BOOLEAN'
elasticsearch_type = 'boolean' elasticsearch_type = 'boolean'
...@@ -622,6 +658,7 @@ class IntegerField(Field): # pylint: disable=abstract-method ...@@ -622,6 +658,7 @@ class IntegerField(Field): # pylint: disable=abstract-method
"""Represents a field that contains an integer.""" """Represents a field that contains an integer."""
hive_type = sql_base_type = 'INT' hive_type = sql_base_type = 'INT'
bigquery_type = 'INTEGER'
elasticsearch_type = 'integer' elasticsearch_type = 'integer'
def validate(self, value): def validate(self, value):
...@@ -638,6 +675,7 @@ class DateField(Field): # pylint: disable=abstract-method ...@@ -638,6 +675,7 @@ class DateField(Field): # pylint: disable=abstract-method
"""Represents a field that contains a date.""" """Represents a field that contains a date."""
hive_type = 'STRING' hive_type = 'STRING'
bigquery_type = 'DATE'
sql_base_type = 'DATE' sql_base_type = 'DATE'
elasticsearch_type = 'date' elasticsearch_type = 'date'
...@@ -655,6 +693,7 @@ class DateTimeField(Field): # pylint: disable=abstract-method ...@@ -655,6 +693,7 @@ class DateTimeField(Field): # pylint: disable=abstract-method
"""Represents a field that contains a date and time.""" """Represents a field that contains a date and time."""
hive_type = 'TIMESTAMP' hive_type = 'TIMESTAMP'
bigquery_type = 'TIMESTAMP'
sql_base_type = 'DATETIME' sql_base_type = 'DATETIME'
elasticsearch_type = 'date' elasticsearch_type = 'date'
elasticsearch_format = 'yyyy-MM-dd HH:mm:ss.SSSSSS' elasticsearch_format = 'yyyy-MM-dd HH:mm:ss.SSSSSS'
...@@ -689,6 +728,10 @@ class DateTimeField(Field): # pylint: disable=abstract-method ...@@ -689,6 +728,10 @@ class DateTimeField(Field): # pylint: disable=abstract-method
validation_errors.append('The value is a naive datetime.') validation_errors.append('The value is a naive datetime.')
elif value.utcoffset().total_seconds() != 0: elif value.utcoffset().total_seconds() != 0:
validation_errors.append('The value must use UTC timezone.') validation_errors.append('The value must use UTC timezone.')
elif value.year < 1900:
# https://docs.python.org/2/library/datetime.html?highlight=strftime#strftime-strptime-behavior
# "The exact range of years for which strftime() works also varies across platforms. Regardless of platform, years before 1900 cannot be used."
validation_errors.append('The value must be a date after 1900.')
return validation_errors return validation_errors
...@@ -710,6 +753,7 @@ class FloatField(Field): # pylint: disable=abstract-method ...@@ -710,6 +753,7 @@ class FloatField(Field): # pylint: disable=abstract-method
"""Represents a field that contains a floating point number.""" """Represents a field that contains a floating point number."""
hive_type = sql_base_type = 'FLOAT' hive_type = sql_base_type = 'FLOAT'
bigquery_type = 'FLOAT'
elasticsearch_type = 'float' elasticsearch_type = 'float'
def validate(self, value): def validate(self, value):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment