Commit 79a305e2 by Hassan Javeed

Inspect traceback for handling Luigi RuntimeError..

parent 2573d4bf
...@@ -3,6 +3,7 @@ Support for loading data into a Mysql database. ...@@ -3,6 +3,7 @@ Support for loading data into a Mysql database.
""" """
import json import json
import logging import logging
import traceback
from itertools import chain from itertools import chain
import luigi import luigi
...@@ -169,9 +170,19 @@ class MysqlInsertTask(MysqlInsertTaskMixin, luigi.Task): ...@@ -169,9 +170,19 @@ class MysqlInsertTask(MysqlInsertTaskMixin, luigi.Task):
def rows(self): def rows(self):
"""Return/yield tuples or lists corresponding to each row to be inserted """ """Return/yield tuples or lists corresponding to each row to be inserted """
with self.input()['insert_source'].open('r') as fobj: try:
for line in fobj: with self.input()['insert_source'].open('r') as fobj:
yield line.strip('\n').split('\t') for line in fobj:
yield line.strip('\n').split('\t')
except RuntimeError:
# While calling finish on an input target, Luigi throws a RuntimeError exception if the subprocess command
# to read the input returns a non-zero return code. As all of the data's been read already, we choose to ignore
# this exception.
traceback_str = traceback.format_exc()
if "self._finish()" in traceback_str:
log.debug("Luigi raised RuntimeError while calling _finish on input target.")
else:
raise
def update_id(self): def update_id(self):
"""This update id will be a unique identifier for this insert on this table.""" """This update id will be a unique identifier for this insert on this table."""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment