Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
E
edx-analytics-pipeline
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
edx
edx-analytics-pipeline
Commits
c5d4ce31
Commit
c5d4ce31
authored
May 30, 2018
by
Andrew Zafft
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Correcting coerced timestamp columns so that Sqoop specifies a column name
parent
e558abd3
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
24 additions
and
3 deletions
+24
-3
edx/analytics/tasks/common/sqoop.py
+4
-1
edx/analytics/tasks/common/tests/test_sqoop.py
+20
-2
No files found.
edx/analytics/tasks/common/sqoop.py
View file @
c5d4ce31
...
@@ -327,7 +327,10 @@ class SqoopImportFromVertica(SqoopImportTask):
...
@@ -327,7 +327,10 @@ class SqoopImportFromVertica(SqoopImportTask):
column_list
=
[]
column_list
=
[]
for
column
in
self
.
columns
:
for
column
in
self
.
columns
:
if
column
in
self
.
timezone_adjusted_column_list
:
if
column
in
self
.
timezone_adjusted_column_list
:
column_list
.
append
(
'"{}" AT TIME ZONE
\'
UTC
\'
'
.
format
(
column
))
column_list
.
append
(
'"{source_column}" AT TIME ZONE
\'
UTC
\'
AS "{exported_column_name}"'
.
format
(
source_column
=
column
,
exported_column_name
=
column
))
else
:
else
:
column_list
.
append
(
'"{}"'
.
format
(
column
))
column_list
.
append
(
'"{}"'
.
format
(
column
))
...
...
edx/analytics/tasks/common/tests/test_sqoop.py
View file @
c5d4ce31
...
@@ -48,7 +48,8 @@ class SqoopImportTestCase(unittest.TestCase):
...
@@ -48,7 +48,8 @@ class SqoopImportTestCase(unittest.TestCase):
return
task
return
task
def
create_vertica_task
(
self
,
num_mappers
=
None
,
where
=
None
,
table_name
=
None
,
schema_name
=
None
,
columns
=
None
,
def
create_vertica_task
(
self
,
num_mappers
=
None
,
where
=
None
,
table_name
=
None
,
schema_name
=
None
,
columns
=
None
,
null_string
=
None
,
fields_terminated_by
=
None
,
delimiter_replacement
=
None
,
overwrite
=
False
):
null_string
=
None
,
fields_terminated_by
=
None
,
delimiter_replacement
=
None
,
overwrite
=
False
,
timezone_adjusted_column_list
=
[]):
"""A generic task generator for Sqoop on Vertica"""
"""A generic task generator for Sqoop on Vertica"""
if
columns
is
None
:
if
columns
is
None
:
column_list
=
[]
column_list
=
[]
...
@@ -70,6 +71,7 @@ class SqoopImportTestCase(unittest.TestCase):
...
@@ -70,6 +71,7 @@ class SqoopImportTestCase(unittest.TestCase):
"fields_terminated_by"
:
fields_terminated_by
,
"fields_terminated_by"
:
fields_terminated_by
,
"delimiter_replacement"
:
delimiter_replacement
,
"delimiter_replacement"
:
delimiter_replacement
,
"overwrite"
:
overwrite
,
"overwrite"
:
overwrite
,
"timezone_adjusted_column_list"
:
timezone_adjusted_column_list
,
}
}
# remove options marked as None
# remove options marked as None
trimmed_kws
=
{
k
:
v
for
k
,
v
in
kw_args
.
iteritems
()
if
v
is
not
None
}
trimmed_kws
=
{
k
:
v
for
k
,
v
in
kw_args
.
iteritems
()
if
v
is
not
None
}
...
@@ -116,7 +118,7 @@ class SqoopImportTestCase(unittest.TestCase):
...
@@ -116,7 +118,7 @@ class SqoopImportTestCase(unittest.TestCase):
def
create_and_run_vertica_task
(
self
,
credentials
=
None
,
num_mappers
=
None
,
where
=
None
,
table_name
=
None
,
def
create_and_run_vertica_task
(
self
,
credentials
=
None
,
num_mappers
=
None
,
where
=
None
,
table_name
=
None
,
schema_name
=
None
,
columns
=
None
,
null_string
=
None
,
fields_terminated_by
=
None
,
schema_name
=
None
,
columns
=
None
,
null_string
=
None
,
fields_terminated_by
=
None
,
delimiter_replacement
=
None
,
overwrite
=
False
):
delimiter_replacement
=
None
,
overwrite
=
False
,
timezone_adjusted_column_list
=
[]
):
"""Create a SqoopImportFromVertica task with specified options, and then run it."""
"""Create a SqoopImportFromVertica task with specified options, and then run it."""
task
=
self
.
create_vertica_task
(
task
=
self
.
create_vertica_task
(
num_mappers
=
num_mappers
,
num_mappers
=
num_mappers
,
...
@@ -128,6 +130,7 @@ class SqoopImportTestCase(unittest.TestCase):
...
@@ -128,6 +130,7 @@ class SqoopImportTestCase(unittest.TestCase):
fields_terminated_by
=
fields_terminated_by
,
fields_terminated_by
=
fields_terminated_by
,
delimiter_replacement
=
delimiter_replacement
,
delimiter_replacement
=
delimiter_replacement
,
overwrite
=
overwrite
,
overwrite
=
overwrite
,
timezone_adjusted_column_list
=
timezone_adjusted_column_list
,
)
)
self
.
run_task
(
task
,
credentials
)
self
.
run_task
(
task
,
credentials
)
...
@@ -275,6 +278,21 @@ class SqoopImportTestCase(unittest.TestCase):
...
@@ -275,6 +278,21 @@ class SqoopImportTestCase(unittest.TestCase):
self
.
assertEquals
(
arglist
,
expected_arglist
)
self
.
assertEquals
(
arglist
,
expected_arglist
)
self
.
assertTrue
(
self
.
mock_sqoop_password_target
()
.
remove
.
called
)
self
.
assertTrue
(
self
.
mock_sqoop_password_target
()
.
remove
.
called
)
def
test_success_vertica_with_timestamp_field
(
self
):
self
.
create_and_run_vertica_task
(
table_name
=
'example_table'
,
schema_name
=
'fake_schema'
,
delimiter_replacement
=
' '
,
fields_terminated_by
=
','
,
columns
=
[
'field1'
,
'field2'
,
'field3'
],
timezone_adjusted_column_list
=
[
'field1'
])
arglist
=
self
.
get_call_args_after_run
()
self
.
assertTrue
(
self
.
mock_run
.
called
)
generated_query
=
'SELECT "field1" AT TIME ZONE
\'
UTC
\'
AS "field1","field2","field3" FROM example_table '
\
'WHERE $CONDITIONS'
self
.
assertEquals
(
arglist
[
12
],
'--query'
)
self
.
assertEquals
(
arglist
[
13
],
generated_query
)
def
test_success_vertica_with_custom_delimiters
(
self
):
def
test_success_vertica_with_custom_delimiters
(
self
):
self
.
create_and_run_vertica_task
(
table_name
=
'example_table'
,
schema_name
=
'fake_schema'
,
self
.
create_and_run_vertica_task
(
table_name
=
'example_table'
,
schema_name
=
'fake_schema'
,
columns
=
[
'field1'
,
'field2'
,
'field3'
],
fields_terminated_by
=
'
\a
'
,
columns
=
[
'field1'
,
'field2'
,
'field3'
],
fields_terminated_by
=
'
\a
'
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment