Commit d0cfcdbc by James Cammarata

Getting dynamic includes working a bit better on v2

parent f12695f2
...@@ -39,6 +39,18 @@ class HostState: ...@@ -39,6 +39,18 @@ class HostState:
self.fail_state = PlayIterator.FAILED_NONE self.fail_state = PlayIterator.FAILED_NONE
self.pending_setup = False self.pending_setup = False
def __repr__(self):
    # Render every field of the per-host iteration state on one line so
    # debug output can be grepped while stepping through the play.
    fields = (
        self.cur_block,
        self.cur_regular_task,
        self.cur_rescue_task,
        self.cur_always_task,
        self.cur_role,
        self.run_state,
        self.fail_state,
        self.pending_setup,
    )
    return ("HOST STATE: block=%d, task=%d, rescue=%d, always=%d, "
            "role=%s, run_state=%d, fail_state=%d, pending_setup=%s") % fields
def get_current_block(self): def get_current_block(self):
return self._blocks[self.cur_block] return self._blocks[self.cur_block]
...@@ -55,6 +67,7 @@ class HostState: ...@@ -55,6 +67,7 @@ class HostState:
return new_state return new_state
class PlayIterator: class PlayIterator:
# the primary running states for the play iteration # the primary running states for the play iteration
ITERATING_SETUP = 0 ITERATING_SETUP = 0
ITERATING_TASKS = 1 ITERATING_TASKS = 1
...@@ -193,7 +206,8 @@ class PlayIterator: ...@@ -193,7 +206,8 @@ class PlayIterator:
the different processes, and not all data structures are preserved. This method the different processes, and not all data structures are preserved. This method
allows us to find the original task passed into the executor engine. allows us to find the original task passed into the executor engine.
''' '''
for block in self._blocks: s = self.get_host_state(host)
for block in s._blocks:
if block.block: if block.block:
for t in block.block: for t in block.block:
if t._uuid == task._uuid: if t._uuid == task._uuid:
...@@ -208,20 +222,23 @@ class PlayIterator: ...@@ -208,20 +222,23 @@ class PlayIterator:
return t return t
return None return None
def add_tasks(self, task_list): def add_tasks(self, host, task_list):
if self._run_state == self.ITERATING_TASKS: s = self.get_host_state(host)
before = self._task_list[:self._cur_task_pos + self._tasks_added] target_block = s._blocks[s.cur_block].copy()
after = self._task_list[self._cur_task_pos + self._tasks_added:]
self._task_list = before + task_list + after if s.run_state == self.ITERATING_TASKS:
elif self._run_state == self.ITERATING_RESCUE: before = target_block.block[:s.cur_regular_task]
before = self._cur_block.rescue[:self._cur_rescue_pos + self._tasks_added] after = target_block.block[s.cur_regular_task:]
after = self._cur_block.rescue[self._cur_rescue_pos + self._tasks_added:] target_block.block = before + task_list + after
self._cur_block.rescue = before + task_list + after elif s.run_state == self.ITERATING_RESCUE:
elif self._run_state == self.ITERATING_ALWAYS: before = target_block.rescue[:s.cur_rescue_task]
before = self._cur_block.always[:self._cur_always_pos + self._tasks_added] after = target_block.rescue[s.cur_rescue_task:]
after = self._cur_block.always[self._cur_always_pos + self._tasks_added:] target_block.rescue = before + task_list + after
self._cur_block.always = before + task_list + after elif s.run_state == self.ITERATING_ALWAYS:
before = target_block.always[:s.cur_always_task]
# set this internal flag now so we know if after = target_block.always[s.cur_always_task:]
self._tasks_added += len(task_list) target_block.always = before + task_list + after
s._blocks[s.cur_block] = target_block
self._host_states[host.name] = s
...@@ -36,6 +36,9 @@ class Host: ...@@ -36,6 +36,9 @@ class Host:
def __setstate__(self, data): def __setstate__(self, data):
return self.deserialize(data) return self.deserialize(data)
def __eq__(self, other):
return self.name == other.name
def serialize(self): def serialize(self):
groups = [] groups = []
for group in self.groups: for group in self.groups:
......
...@@ -118,6 +118,8 @@ class StrategyBase: ...@@ -118,6 +118,8 @@ class StrategyBase:
based on the result (executing callbacks, updating state, etc.). based on the result (executing callbacks, updating state, etc.).
''' '''
ret_results = []
while not self._final_q.empty() and not self._tqm._terminated: while not self._final_q.empty() and not self._tqm._terminated:
try: try:
result = self._final_q.get(block=False) result = self._final_q.get(block=False)
...@@ -156,6 +158,8 @@ class StrategyBase: ...@@ -156,6 +158,8 @@ class StrategyBase:
if entry == hashed_entry : if entry == hashed_entry :
role_obj._had_task_run = True role_obj._had_task_run = True
ret_results.append(task_result)
#elif result[0] == 'include': #elif result[0] == 'include':
# host = result[1] # host = result[1]
# task = result[2] # task = result[2]
...@@ -211,19 +215,26 @@ class StrategyBase: ...@@ -211,19 +215,26 @@ class StrategyBase:
except Queue.Empty: except Queue.Empty:
pass pass
return ret_results
def _wait_on_pending_results(self, iterator): def _wait_on_pending_results(self, iterator):
''' '''
Wait for the shared counter to drop to zero, using a short sleep Wait for the shared counter to drop to zero, using a short sleep
between checks to ensure we don't spin lock between checks to ensure we don't spin lock
''' '''
ret_results = []
while self._pending_results > 0 and not self._tqm._terminated: while self._pending_results > 0 and not self._tqm._terminated:
debug("waiting for pending results (%d left)" % self._pending_results) debug("waiting for pending results (%d left)" % self._pending_results)
self._process_pending_results(iterator) results = self._process_pending_results(iterator)
ret_results.extend(results)
if self._tqm._terminated: if self._tqm._terminated:
break break
time.sleep(0.01) time.sleep(0.01)
return ret_results
def _add_host(self, host_info): def _add_host(self, host_info):
''' '''
Helper function to add a new host to inventory based on a task result. Helper function to add a new host to inventory based on a task result.
...@@ -292,22 +303,21 @@ class StrategyBase: ...@@ -292,22 +303,21 @@ class StrategyBase:
# and add the host to the group # and add the host to the group
new_group.add_host(actual_host) new_group.add_host(actual_host)
def _load_included_file(self, task, include_file, include_vars): def _load_included_file(self, included_file):
''' '''
Loads an included YAML file of tasks, applying the optional set of variables. Loads an included YAML file of tasks, applying the optional set of variables.
''' '''
data = self._loader.load_from_file(include_file) data = self._loader.load_from_file(included_file._filename)
if not isinstance(data, list): if not isinstance(data, list):
raise AnsibleParsingError("included task files must contain a list of tasks", obj=ds) raise AnsibleParsingError("included task files must contain a list of tasks", obj=included_file._task._ds)
is_handler = isinstance(task, Handler)
is_handler = isinstance(included_file._task, Handler)
block_list = load_list_of_blocks( block_list = load_list_of_blocks(
data, data,
parent_block=task._block, parent_block=included_file._task._block,
task_include=task, task_include=included_file._task,
role=task._role, role=included_file._task._role,
use_handlers=is_handler, use_handlers=is_handler,
loader=self._loader loader=self._loader
) )
...@@ -317,7 +327,7 @@ class StrategyBase: ...@@ -317,7 +327,7 @@ class StrategyBase:
# set the vars for this task from those specified as params to the include # set the vars for this task from those specified as params to the include
for t in task_list: for t in task_list:
t.vars = include_vars.copy() t.vars = included_file._args.copy()
return task_list return task_list
......
...@@ -139,6 +139,7 @@ class StrategyModule(StrategyBase): ...@@ -139,6 +139,7 @@ class StrategyModule(StrategyBase):
callback_sent = False callback_sent = False
work_to_do = False work_to_do = False
host_results = []
host_tasks = self._get_next_task_lockstep(hosts_left, iterator) host_tasks = self._get_next_task_lockstep(hosts_left, iterator)
for (host, task) in host_tasks: for (host, task) in host_tasks:
if not task: if not task:
...@@ -151,9 +152,14 @@ class StrategyModule(StrategyBase): ...@@ -151,9 +152,14 @@ class StrategyModule(StrategyBase):
# sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we # sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we
# will only send this task to the first host in the list. # will only send this task to the first host in the list.
action = action_loader.get(task.action, class_only=True) try:
if task.run_once or getattr(action, 'BYPASS_HOST_LOOP', False): action = action_loader.get(task.action, class_only=True)
run_once = True if task.run_once or getattr(action, 'BYPASS_HOST_LOOP', False):
run_once = True
except KeyError:
# we don't care here, because the action may simply not have a
# corresponding action plugin
pass
debug("getting variables") debug("getting variables")
task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task)
...@@ -178,6 +184,7 @@ class StrategyModule(StrategyBase): ...@@ -178,6 +184,7 @@ class StrategyModule(StrategyBase):
meta_action = task.args.get('_raw_params') meta_action = task.args.get('_raw_params')
if meta_action == 'noop': if meta_action == 'noop':
# FIXME: issue a callback for the noop here? # FIXME: issue a callback for the noop here?
print("%s => NOOP" % host)
continue continue
elif meta_action == 'flush_handlers': elif meta_action == 'flush_handlers':
self.run_handlers(iterator, connection_info) self.run_handlers(iterator, connection_info)
...@@ -191,18 +198,78 @@ class StrategyModule(StrategyBase): ...@@ -191,18 +198,78 @@ class StrategyModule(StrategyBase):
self._blocked_hosts[host.get_name()] = True self._blocked_hosts[host.get_name()] = True
self._queue_task(host, task, task_vars, connection_info) self._queue_task(host, task, task_vars, connection_info)
self._process_pending_results(iterator) results = self._process_pending_results(iterator)
host_results.extend(results)
# if we're bypassing the host loop, break out now # if we're bypassing the host loop, break out now
if run_once: if run_once:
break break
debug("done queuing things up, now waiting for results queue to drain") debug("done queuing things up, now waiting for results queue to drain")
self._wait_on_pending_results(iterator) results = self._wait_on_pending_results(iterator)
host_results.extend(results)
# FIXME: MAKE PENDING RESULTS RETURN RESULTS PROCESSED AND USE THEM
# TO TAKE ACTION, ie. FOR INCLUDE STATEMENTS TO PRESERVE THE class IncludedFile:
# LOCK STEP OPERATION def __init__(self, filename, args, task):
self._filename = filename
self._args = args
self._task = task
self._hosts = []
def add_host(self, host):
    # Record each host at most once, preserving first-seen order.
    if host in self._hosts:
        return
    self._hosts.append(host)
def __eq__(self, other):
    # Include entries are identical when both the file path and the
    # include arguments match; the accumulated host list is ignored.
    return self._filename == other._filename and self._args == other._args
def __repr__(self):
    # Summary form: "<filename> (<args>): <hosts>" for debug logging.
    summary = (self._filename, self._args, self._hosts)
    return "%s (%s): %s" % summary
# Collect the include results from this pass and group them into
# IncludedFile entries, deduplicated on (filename, args), so that all
# hosts sharing the same include are expanded together below.
included_files = []
for res in host_results:
    if res._task.action == 'include':
        # A looped include yields one result per loop item; normalize
        # both cases to a list so the handling below is uniform.
        if res._task.loop:
            include_results = res._result['results']
        else:
            include_results = [res._result]

        for include_result in include_results:
            original_task = iterator.get_original_task(res._host, res._task)

            # BUG FIX: the role branch previously passed `include_file`
            # to path_dwim_relative before it was ever assigned, raising
            # UnboundLocalError for any include inside a role. Resolve
            # the raw include target first, then apply role-relative or
            # plain path expansion.
            include_target = res._task.args.get('_raw_params')
            if original_task and original_task._role:
                include_file = self._loader.path_dwim_relative(
                    original_task._role._role_path, 'tasks', include_target)
            else:
                include_file = self._loader.path_dwim(include_target)

            include_variables = include_result.get('include_variables', dict())
            inc_file = IncludedFile(include_file, include_variables, original_task)

            # Reuse an existing entry with the same (filename, args) so
            # hosts accumulate on one IncludedFile; otherwise register it.
            try:
                pos = included_files.index(inc_file)
                inc_file = included_files[pos]
            except ValueError:
                included_files.append(inc_file)

            inc_file.add_host(res._host)
# If any includes were collected, load each one and splice the resulting
# tasks into every host's iterator now. Hosts that did not run the include
# receive an equal-length list of `meta: noop` placeholders so that all
# hosts stay aligned in lock-step iteration.
if len(included_files) > 0:
    # A single shared placeholder task is reused for every padding slot.
    noop_task = Task()
    noop_task.action = 'meta'
    noop_task.args['_raw_params'] = 'noop'
    noop_task.set_loader(iterator._play._loader)

    # Per-host accumulator covering every host still in the play.
    all_tasks = dict((host, []) for host in hosts_left)
    for included_file in included_files:
        # included hosts get the task list while those excluded get an equal-length
        # list of noop tasks, to make sure that they continue running in lock-step
        new_tasks = self._load_included_file(included_file)
        # NOTE(review): the same noop_task object is repeated, not copied —
        # presumed safe because meta:noop is skipped; confirm if tasks mutate.
        noop_tasks = [noop_task for t in new_tasks]
        for host in hosts_left:
            if host in included_file._hosts:
                all_tasks[host].extend(new_tasks)
            else:
                all_tasks[host].extend(noop_tasks)

    # Push each host's combined (real + noop) task list into the iterator.
    for host in hosts_left:
        iterator.add_tasks(host, all_tasks[host])
debug("results queue empty") debug("results queue empty")
except (IOError, EOFError), e: except (IOError, EOFError), e:
......
- debug: msg="this is the common include"
- debug: msg="this is the l1 include"
- include: common_include.yml
- debug: msg="this is the l2 include"
- debug: msg="a second task for l2"
- include: common_include.yml
- debug: msg="this is the l3 include"
- debug: msg="a second task for l3"
- debug: msg="a third task for l3"
- include: common_include.yml
- hosts: localhost - hosts: all
gather_facts: no gather_facts: no
tasks: tasks:
- block: - block:
- include: include.yml
when: 1 == 2
- include: include.yml a=1 - include: include.yml a=1
when: 1 == 1 when: 1 == 1
notify: foo notify: foo
...@@ -12,10 +10,11 @@ ...@@ -12,10 +10,11 @@
- foo - foo
- bar - bar
- bam - bam
- include: "{{inventory_hostname}}_include.yml"
- fail: - fail:
#rescue: rescue:
#- include: include.yml a=rescue - include: include.yml a=rescue
always: always:
- include: include.yml a=always - include: include.yml a=always
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment