Commit c274b537 by Brian Wilson

Add --dependency-tree option.

parent 94da7300
# -*- coding: utf-8 -*-
"""
Main method for running tasks on a local machine.
......@@ -11,6 +12,7 @@ import argparse
import logging
import os
import sys
import warnings
from contextlib import contextmanager
import boto
......@@ -52,6 +54,11 @@ def main():
default=None,
action='append'
)
parser.add_argument(
'--dependency-tree',
help='display dependency tree instead of running the job',
action='store_true',
)
arguments, _extra_args = parser.parse_known_args()
# We get a cleaned command-line arguments list, free of the arguments *we* care about, since Luigi will throw
......@@ -102,9 +109,11 @@ def main():
# TODO: setup logging for tasks or configured logging mechanism
# Launch Luigi using the default builder
with profile_if_necessary(os.getenv('WORKFLOW_PROFILER', ''), os.getenv('WORKFLOW_PROFILER_PATH', '')):
luigi.retcodes.run_with_retcodes(cmdline_args)
if arguments.dependency_tree:
output_dependency_tree(cmdline_args)
else:
with profile_if_necessary(os.getenv('WORKFLOW_PROFILER', ''), os.getenv('WORKFLOW_PROFILER_PATH', '')):
luigi.retcodes.run_with_retcodes(cmdline_args)
def get_cleaned_command_line_args():
......@@ -119,6 +128,8 @@ def get_cleaned_command_line_args():
# Clear out the flag, and clear out the value attached to it.
modified_arg_list[i] = None
modified_arg_list[i + 1] = None
elif v == '--dependency-tree':
modified_arg_list[i] = None
return list(filter(lambda x: x is not None, modified_arg_list))
......@@ -137,5 +148,35 @@ def profile_if_necessary(profiler_name, file_path):
profiler.save(filename=os.path.join(file_path, 'launch-task.trace'))
def print_dependency_tree(task, indent='', last=True):
"""Return a string representation of the tasks, their statuses/parameters in a dependency tree format."""
# Don't bother printing out warnings about tasks with no output.
with warnings.catch_warnings():
warnings.filterwarnings(action='ignore', message='Task .* without outputs has no custom complete\(\) method')
is_task_complete = task.complete()
is_complete = 'COMPLETE' if is_task_complete else 'PENDING'
name = task.__class__.__name__
params = task.to_str_params(only_significant=True)
result = '\n' + indent
if last:
result += '└─--'
indent += ' '
else:
result += '|--'
indent += '| '
result += '[{0}-{1} ({2})]'.format(name, params, is_complete)
children = task.deps()
for index, child in enumerate(children):
result += print_dependency_tree(child, indent, (index + 1) == len(children))
return result
def output_dependency_tree(cmdline_args):
"""Print out a tree representation of the dependencies of the given task."""
with luigi.cmdline_parser.CmdlineParser.global_instance(cmdline_args) as command_parser:
task = command_parser.get_task_obj()
print print_dependency_tree(task)
if __name__ == '__main__':
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment