Commit f0196c12 by Carlos Andrés Rocha

Add verbose option to SqoopImportFromMysql

Change-Id: Ifa66f0cd3534132d94715d4214c8dc35132e0b60
parent 3929cee0
......@@ -114,11 +114,13 @@ class StudentModulePerCourseAfterImportWorkflow(StudentModulePerCourseTask):
credentials: Path to the external access credentials file.
num_mappers: The number of map tasks to ask Sqoop to use.
where: A 'where' clause to be passed to Sqoop.
verbose: Sqoop prints more information while working.
"""
credentials = luigi.Parameter() # TODO: move to config
num_mappers = luigi.Parameter(default=None) # TODO: move to config
where = luigi.Parameter(default=None)
verbose = luigi.BooleanParameter(default=False)
def requires(self):
return SqoopImportFromMysql(
......@@ -126,5 +128,6 @@ class StudentModulePerCourseAfterImportWorkflow(StudentModulePerCourseTask):
destination=self.dump_root,
table_name='courseware_studentmodule',
num_mappers=self.num_mappers,
where=self.where
where=self.where,
verbose=self.verbose
)
......@@ -37,6 +37,7 @@ class SqoopImportTask(luigi.hadoop.BaseHadoopJobTask):
where: A 'where' clause to be passed to Sqoop. Note that
no spaces should be embedded and special characters should
be escaped. For example: --where "id\<50".
verbose: Print more information while working.
Example Credentials File::
......@@ -54,6 +55,7 @@ class SqoopImportTask(luigi.hadoop.BaseHadoopJobTask):
table_name = luigi.Parameter()
num_mappers = luigi.Parameter(default=None)
where = luigi.Parameter(default=None)
verbose = luigi.BooleanParameter(default=False)
def requires(self):
return {
......@@ -81,6 +83,9 @@ class SqoopImportTask(luigi.hadoop.BaseHadoopJobTask):
url = self.connection_url(cred)
generic_args = ['--connect', url, '--username', cred['username']]
if self.verbose:
generic_args.append('--verbose')
# write password to temp file object, and pass name of file to Sqoop:
with password_target.open('w') as password_file:
password_file.write(cred['password'])
......
......@@ -26,7 +26,7 @@ class SqoopImportFromMysqlTestCase(unittest.TestCase):
self.mock_run = patcher2.start()
self.addCleanup(patcher2.stop)
def run_task(self, credentials=None, num_mappers=None, where=None):
def run_task(self, credentials=None, num_mappers=None, where=None, verbose=False):
"""
Emulate execution of a generic MysqlTask.
"""
......@@ -45,7 +45,8 @@ class SqoopImportFromMysqlTestCase(unittest.TestCase):
destination="/fake/destination",
table_name="example_table",
num_mappers=num_mappers,
where=where
where=where,
verbose=verbose
)
fake_input = {
......@@ -92,6 +93,10 @@ class SqoopImportFromMysqlTestCase(unittest.TestCase):
self.assertEquals(arglist, expected_arglist)
self.assertTrue(self.mock_hdfstarget().remove.called)
def test_verbose_arguments(self):
arglist = self.run_task(verbose=True)
self.assertIn('--verbose', arglist)
def test_connect_with_where_args(self):
arglist = self.run_task(where='id < 50')
self.assertEquals(arglist[-4], '--where')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment