Commit 6dda6f12 by Michael DeHaan

Applying callback model to runner, and using that in playbooks, so output can be…

Applying callback model to runner, and using that in playbooks, so output can be more immediate in playbooks.

(Runner still does not use callbacks for default output)
parent 3052d85a
...@@ -29,6 +29,7 @@ import ansible.runner ...@@ -29,6 +29,7 @@ import ansible.runner
import ansible.constants as C import ansible.constants as C
from ansible import utils from ansible import utils
from ansible import errors from ansible import errors
from ansible import callbacks
######################################################## ########################################################
...@@ -38,7 +39,8 @@ class Cli(object): ...@@ -38,7 +39,8 @@ class Cli(object):
# ---------------------------------------------- # ----------------------------------------------
def __init__(self): def __init__(self):
pass self.stats = callbacks.AggregateStats()
self.callbacks = callbacks.DefaultRunnerCallbacks()
# ---------------------------------------------- # ----------------------------------------------
...@@ -98,6 +100,7 @@ class Cli(object): ...@@ -98,6 +100,7 @@ class Cli(object):
forks=options.forks, forks=options.forks,
background=options.seconds, background=options.seconds,
pattern=pattern, pattern=pattern,
callbacks=self.callbacks,
verbose=True, verbose=True,
) )
return (runner, runner.run()) return (runner, runner.run())
...@@ -116,6 +119,7 @@ class Cli(object): ...@@ -116,6 +119,7 @@ class Cli(object):
timeout=old_runner.timeout, timeout=old_runner.timeout,
forks=old_runner.forks, forks=old_runner.forks,
pattern='*', pattern='*',
callbacks=self.callbacks,
verbose=True, verbose=True,
) )
...@@ -178,6 +182,7 @@ class Cli(object): ...@@ -178,6 +182,7 @@ class Cli(object):
utils.write_tree_file(options.tree, hostname, utils.bigjson(utils.contacted_host_result(results, hostname))) utils.write_tree_file(options.tree, hostname, utils.bigjson(utils.contacted_host_result(results, hostname)))
buf += msg buf += msg
# TODO: remove, callbacks now do this
if utils.has_dark_hosts(results): if utils.has_dark_hosts(results):
buf += utils.dark_hosts_msg(results) buf += utils.dark_hosts_msg(results)
......
...@@ -28,14 +28,6 @@ from ansible import errors ...@@ -28,14 +28,6 @@ from ansible import errors
from ansible import utils from ansible import utils
from ansible import callbacks from ansible import callbacks
def summarize(results):
''' print out per host statistics '''
print "PLAY RECAP ******************************\n"
hosts = sorted(results.keys())
for host in hosts:
print "%s : %s" % (host, utils.smjson(results[host]))
def main(args): def main(args):
''' run ansible-playbook operations ''' ''' run ansible-playbook operations '''
...@@ -70,6 +62,11 @@ def main(args): ...@@ -70,6 +62,11 @@ def main(args):
# run all playbooks specified on the command line # run all playbooks specified on the command line
for playbook in args: for playbook in args:
stats = callbacks.AggregateStats()
playbook_cb = callbacks.PlaybookCallbacks()
runner_cb = callbacks.PlaybookRunnerCallbacks(stats)
pb = ansible.playbook.PlayBook( pb = ansible.playbook.PlayBook(
playbook=playbook, playbook=playbook,
host_list=options.inventory, host_list=options.inventory,
...@@ -77,13 +74,24 @@ def main(args): ...@@ -77,13 +74,24 @@ def main(args):
forks=options.forks, forks=options.forks,
verbose=True, verbose=True,
remote_pass=sshpass, remote_pass=sshpass,
callbacks=callbacks.PlaybookCallbacks(), callbacks=playbook_cb,
runner_callbacks=runner_cb,
stats=stats,
timeout=options.timeout, timeout=options.timeout,
override_hosts=override_hosts, override_hosts=override_hosts,
) )
try: try:
results = pb.run() results = pb.run()
summarize(results) hosts = sorted(pb.stats.processed.keys())
print "\n\nPLAY RECAP **********************\n\n"
for h in hosts:
t = pb.stats.summarize(h)
print "%-30s : ok=%4s changed=%4s unreachable=%4s failed=%4s " % (h,
t['ok'], t['changed'], t['unreachable'], t['failures']
)
print "\n"
except errors.AnsibleError, e: except errors.AnsibleError, e:
print >>sys.stderr, "ERROR: %s" % e print >>sys.stderr, "ERROR: %s" % e
return 1 return 1
......
...@@ -23,25 +23,70 @@ import utils ...@@ -23,25 +23,70 @@ import utils
####################################################### #######################################################
class PlaybookCallbacks(object): class AggregateStats(object):
def __init__(self):
self.processed = {}
self.failures = {}
self.ok = {}
self.dark = {}
self.changed = {}
self.skipped = {}
def _increment(self, what, host):
self.processed[host] = 1
prev = (getattr(self, what)).get(host, 0)
getattr(self, what)[host] = prev+1
def compute(self, runner_results, setup=False, poll=False):
for (host, value) in runner_results.get('contacted', {}).iteritems():
if ('failed' in value and bool(value['failed'])) or ('rc' in value and value['rc'] != 0):
self._increment('failures', host)
elif 'skipped' in value and bool(value['skipped']):
self._increment('skipped', host)
elif 'changed' in value and bool(value['changed']):
if not setup:
self._increment('changed', host)
self._increment('ok', host)
else:
if not poll or ('finished' in value and bool(value['finished'])):
self._increment('ok', host)
for (host, value) in runner_results.get('dark', {}).iteritems():
self._increment('dark', host)
def summarize(self, host):
return dict(
ok = self.ok.get(host, 0),
failures = self.failures.get(host, 0),
unreachable = self.dark.get(host,0),
changed = self.changed.get(host, 0),
skipped = self.skipped.get(host, 0)
)
class DefaultRunnerCallbacks(object):
def __init__(self): def __init__(self):
pass pass
def set_playbook(self, playbook): def on_failed(self, host, res):
self.playbook = playbook pass
def on_start(self): def on_ok(self, host, res):
print "\n" pass
def on_task_start(self, name, is_conditional): def on_skipped(self, host):
print utils.task_start_msg(name, is_conditional) pass
def on_setup_primary(self): def on_unreachable(self, host, res):
print "SETUP PHASE ****************************\n" pass
def on_setup_secondary(self): class PlaybookRunnerCallbacks(DefaultRunnerCallbacks):
print "\nVARIABLE IMPORT PHASE ******************\n"
def __init__(self, stats):
self.stats = stats
def on_unreachable(self, host, msg): def on_unreachable(self, host, msg):
print "unreachable: [%s] => %s" % (host, msg) print "unreachable: [%s] => %s" % (host, msg)
...@@ -55,7 +100,9 @@ class PlaybookCallbacks(object): ...@@ -55,7 +100,9 @@ class PlaybookCallbacks(object):
def on_ok(self, host, host_result): def on_ok(self, host, host_result):
invocation = host_result.get('invocation',None) invocation = host_result.get('invocation',None)
if not invocation or invocation.startswith('setup ') or invocation.startswith('async_status '): if invocation.startswith('async_status'):
pass
elif not invocation or invocation.startswith('setup '):
print "ok: [%s]\n" % (host) print "ok: [%s]\n" % (host)
else: else:
print "ok: [%s] => %s\n" % (host, invocation) print "ok: [%s] => %s\n" % (host, invocation)
...@@ -63,6 +110,30 @@ class PlaybookCallbacks(object): ...@@ -63,6 +110,30 @@ class PlaybookCallbacks(object):
def on_skipped(self, host): def on_skipped(self, host):
print "skipping: [%s]\n" % host print "skipping: [%s]\n" % host
class PlaybookCallbacks(object):
def __init__(self):
pass
# TOOD: -- remove this
def set_playbook(self, playbook):
self.playbook = playbook
def on_start(self):
print "\n"
def on_notify(self, host, handler):
pass
def on_task_start(self, name, is_conditional):
print utils.task_start_msg(name, is_conditional)
def on_setup_primary(self):
print "SETUP PHASE ****************************\n"
def on_setup_secondary(self):
print "\nVARIABLE IMPORT PHASE ******************\n"
def on_import_for_host(self, host, imported_file): def on_import_for_host(self, host, imported_file):
print "%s: importing %s" % (host, imported_file) print "%s: importing %s" % (host, imported_file)
...@@ -78,6 +149,3 @@ class PlaybookCallbacks(object): ...@@ -78,6 +149,3 @@ class PlaybookCallbacks(object):
def on_async_poll(self, jid, host, clock, host_result): def on_async_poll(self, jid, host, clock, host_result):
print utils.async_poll_status(jid, host, clock, host_result) print utils.async_poll_status(jid, host, clock, host_result)
def on_dark_host(self, host, msg):
print "exception: [%s] => %s" % (host, msg)
...@@ -32,6 +32,7 @@ import ansible.constants as C ...@@ -32,6 +32,7 @@ import ansible.constants as C
import ansible.connection import ansible.connection
from ansible import utils from ansible import utils
from ansible import errors from ansible import errors
from ansible import callbacks as ans_callbacks
################################################ ################################################
...@@ -46,13 +47,9 @@ def _executor_hook(job_queue, result_queue): ...@@ -46,13 +47,9 @@ def _executor_hook(job_queue, result_queue):
result_queue.put(runner._executor(host)) result_queue.put(runner._executor(host))
except Queue.Empty: except Queue.Empty:
pass pass
except errors.AnsibleError, ae: except:
result_queue.put([host, False, str(ae)]) traceback.print_exc()
except Exception:
# probably should include the full trace
result_queue.put([host, False, traceback.format_exc()])
################################################ ################################################
class Runner(object): class Runner(object):
...@@ -64,13 +61,17 @@ class Runner(object): ...@@ -64,13 +61,17 @@ class Runner(object):
forks=C.DEFAULT_FORKS, timeout=C.DEFAULT_TIMEOUT, pattern=C.DEFAULT_PATTERN, forks=C.DEFAULT_FORKS, timeout=C.DEFAULT_TIMEOUT, pattern=C.DEFAULT_PATTERN,
remote_user=C.DEFAULT_REMOTE_USER, remote_pass=C.DEFAULT_REMOTE_PASS, remote_user=C.DEFAULT_REMOTE_USER, remote_pass=C.DEFAULT_REMOTE_PASS,
background=0, basedir=None, setup_cache=None, transport='paramiko', background=0, basedir=None, setup_cache=None, transport='paramiko',
conditional='True', groups={}, verbose=False): conditional='True', groups={}, callbacks=None, verbose=False):
if setup_cache is None: if setup_cache is None:
setup_cache = {} setup_cache = {}
if basedir is None: if basedir is None:
basedir = os.getcwd() basedir = os.getcwd()
if callbacks is None:
callbacks = ans_callbacks.DefaultRunnerCallbacks()
self.callbacks = callbacks
self.generated_jid = str(random.randint(0, 999999999999)) self.generated_jid = str(random.randint(0, 999999999999))
self.connector = ansible.connection.Connection(self, transport) self.connector = ansible.connection.Connection(self, transport)
...@@ -492,6 +493,18 @@ class Runner(object): ...@@ -492,6 +493,18 @@ class Runner(object):
# ***************************************************** # *****************************************************
def _executor(self, host): def _executor(self, host):
try:
return self._executor_internal(host)
except errors.AnsibleError, ae:
msg = str(ae)
self.callbacks.on_unreachable(host, msg)
return [host, False, msg]
except Exception:
msg = traceback.format_exc()
self.callbacks.on_unreachable(host, msg)
return [host, False, msg]
def _executor_internal(self, host):
''' callback executed in parallel for each host. returns (hostname, connected_ok, extra) ''' ''' callback executed in parallel for each host. returns (hostname, connected_ok, extra) '''
ok, conn = self._connect(host) ok, conn = self._connect(host)
...@@ -515,6 +528,18 @@ class Runner(object): ...@@ -515,6 +528,18 @@ class Runner(object):
self._delete_remote_files(conn, tmp) self._delete_remote_files(conn, tmp)
conn.close() conn.close()
(host, connect_ok, data) = result
if not connect_ok:
self.callbacks.on_unreachable(host, data)
else:
if 'failed' in data or 'rc' in data and str(data['rc']) != '0':
self.callbacks.on_failed(host, data)
elif 'skipped' in data:
self.callbacks.on_skipped(host)
else:
self.callbacks.on_ok(host, data)
return result return result
# ***************************************************** # *****************************************************
...@@ -566,10 +591,10 @@ class Runner(object): ...@@ -566,10 +591,10 @@ class Runner(object):
''' handles mulitprocessing when more than 1 fork is required ''' ''' handles mulitprocessing when more than 1 fork is required '''
job_queue = multiprocessing.Manager().Queue() job_queue = multiprocessing.Manager().Queue()
result_queue = multiprocessing.Manager().Queue()
[job_queue.put(i) for i in hosts] [job_queue.put(i) for i in hosts]
result_queue = multiprocessing.Manager().Queue()
workers = [] workers = []
for i in range(self.forks): for i in range(self.forks):
prc = multiprocessing.Process(target=_executor_hook, prc = multiprocessing.Process(target=_executor_hook,
...@@ -597,6 +622,9 @@ class Runner(object): ...@@ -597,6 +622,9 @@ class Runner(object):
results2 = dict(contacted={}, dark={}) results2 = dict(contacted={}, dark={})
if results is None:
return None
for result in results: for result in results:
(host, contacted_ok, result) = result (host, contacted_ok, result) = result
if contacted_ok: if contacted_ok:
...@@ -622,10 +650,11 @@ class Runner(object): ...@@ -622,10 +650,11 @@ class Runner(object):
return dict(contacted={}, dark={}) return dict(contacted={}, dark={})
hosts = [ (self,x) for x in hosts ] hosts = [ (self,x) for x in hosts ]
results = None
if self.forks > 1: if self.forks > 1:
results = self._parallel_exec(hosts) results = self._parallel_exec(hosts)
else: else:
results = [ x._executor(h) for (x,h) in hosts ] results = [ self._executor(h[1]) for h in hosts ]
return self._partition_results(results) return self._partition_results(results)
...@@ -87,7 +87,6 @@ except Exception, e: ...@@ -87,7 +87,6 @@ except Exception, e:
print json.dumps({ print json.dumps({
"results_file" : log_path, "results_file" : log_path,
"ansible_job_id" : jid, "ansible_job_id" : jid,
"traceback" : str(e),
"started" : 1, "started" : 1,
}) })
else: else:
......
...@@ -7,6 +7,7 @@ import unittest ...@@ -7,6 +7,7 @@ import unittest
import getpass import getpass
import ansible.playbook import ansible.playbook
import ansible.utils as utils import ansible.utils as utils
import ansible.callbacks as ans_callbacks
import os import os
import shutil import shutil
import time import time
...@@ -15,63 +16,69 @@ try: ...@@ -15,63 +16,69 @@ try:
except: except:
import simplejson as json import simplejson as json
EVENTS = []
class TestCallbacks(object): class TestCallbacks(object):
# using same callbacks class for both runner and playbook
def __init__(self): def __init__(self):
self.events = [] pass
def set_playbook(self, playbook): def set_playbook(self, playbook):
self.playbook = playbook self.playbook = playbook
def on_start(self): def on_start(self):
self.events.append('start') EVENTS.append('start')
def on_setup_primary(self): def on_setup_primary(self):
self.events.append([ 'primary_setup' ]) EVENTS.append([ 'primary_setup' ])
def on_setup_secondary(self): def on_setup_secondary(self):
self.events.append([ 'secondary_setup' ]) EVENTS.append([ 'secondary_setup' ])
def on_skipped(self, host): def on_skipped(self, host):
self.events.append([ 'skipped', [ host ]]) EVENTS.append([ 'skipped', [ host ]])
def on_import_for_host(self, host, filename): def on_import_for_host(self, host, filename):
self.events.append([ 'import', [ host, filename ]]) EVENTS.append([ 'import', [ host, filename ]])
def on_not_import_for_host(self, host, missing_filename): def on_not_import_for_host(self, host, missing_filename):
pass pass
def on_notify(self, host, handler):
EVENTS.append([ 'notify', [ host, handler ]])
def on_task_start(self, name, is_conditional): def on_task_start(self, name, is_conditional):
self.events.append([ 'task start', [ name, is_conditional ]]) EVENTS.append([ 'task start', [ name, is_conditional ]])
def on_unreachable(self, host, msg): def on_unreachable(self, host, msg):
self.events.append([ 'unreachable', [ host, msg ]]) EVENTS.append([ 'unreachable', [ host, msg ]])
def on_failed(self, host, results): def on_failed(self, host, results):
self.events.append([ 'failed', [ host, results ]]) EVENTS.append([ 'failed', [ host, results ]])
def on_ok(self, host, result): def on_ok(self, host, result):
# delete certain info from host_result to make test comparisons easier # delete certain info from host_result to make test comparisons easier
host_result = result.copy() host_result = result.copy()
for k in [ 'ansible_job_id', 'invocation', 'md5sum', 'delta', 'start', 'end' ]: for k in [ 'ansible_job_id', 'results_file', 'invocation', 'md5sum', 'delta', 'start', 'end' ]:
if k in host_result: if k in host_result:
del host_result[k] del host_result[k]
for k in host_result.keys(): for k in host_result.keys():
if k.startswith('facter_') or k.startswith('ohai_'): if k.startswith('facter_') or k.startswith('ohai_'):
del host_result[k] del host_result[k]
self.events.append([ 'ok', [ host, host_result ]]) EVENTS.append([ 'ok', [ host, host_result ]])
def on_play_start(self, pattern): def on_play_start(self, pattern):
self.events.append([ 'play start', [ pattern ]]) EVENTS.append([ 'play start', [ pattern ]])
def on_async_confused(self, msg): def on_async_confused(self, msg):
self.events.append([ 'async confused', [ msg ]]) EVENTS.append([ 'async confused', [ msg ]])
def on_async_poll(self, jid, host, clock, host_result): def on_async_poll(self, jid, host, clock, host_result):
self.events.append([ 'async poll', [ host ]]) EVENTS.append([ 'async poll', [ host ]])
def on_dark_host(self, host, msg): def on_unreachable(self, host, msg):
self.events.append([ 'failed/dark', [ host, msg ]]) EVENTS.append([ 'failed/dark', [ host, msg ]])
def on_setup_primary(self): def on_setup_primary(self):
pass pass
...@@ -125,12 +132,14 @@ class TestRunner(unittest.TestCase): ...@@ -125,12 +132,14 @@ class TestRunner(unittest.TestCase):
remote_user = self.user, remote_user = self.user,
remote_pass = None, remote_pass = None,
verbose = False, verbose = False,
callbacks = self.test_callbacks stats = ans_callbacks.AggregateStats(),
callbacks = self.test_callbacks,
runner_callbacks = self.test_callbacks
) )
results = self.playbook.run() results = self.playbook.run()
return dict( return dict(
results = results, results = results,
events = self.test_callbacks.events, events = EVENTS
) )
def test_one(self): def test_one(self):
......
...@@ -152,6 +152,13 @@ ...@@ -152,6 +152,13 @@
] ]
], ],
[ [
"notify",
[
"127.0.0.1",
"on change 1"
]
],
[
"task start", "task start",
[ [
"test template", "test template",
...@@ -173,6 +180,20 @@ ...@@ -173,6 +180,20 @@
] ]
], ],
[ [
"notify",
[
"127.0.0.1",
"on change 1"
]
],
[
"notify",
[
"127.0.0.1",
"on change 2"
]
],
[
"task start", "task start",
[ [
"async poll test", "async poll test",
...@@ -180,9 +201,21 @@ ...@@ -180,9 +201,21 @@
] ]
], ],
[ [
"async poll", "ok",
[ [
"127.0.0.1" "127.0.0.1",
{
"started": 1
}
]
],
[
"ok",
[
"127.0.0.1",
{
"started": 1
}
] ]
], ],
[ [
...@@ -192,6 +225,15 @@ ...@@ -192,6 +225,15 @@
] ]
], ],
[ [
"ok",
[
"127.0.0.1",
{
"started": 1
}
]
],
[
"async poll", "async poll",
[ [
"127.0.0.1" "127.0.0.1"
...@@ -235,6 +277,18 @@ ...@@ -235,6 +277,18 @@
[ [
"127.0.0.1", "127.0.0.1",
{ {
"cmd": "echo this should fire once ",
"rc": 0,
"stderr": "",
"stdout": "this should fire once"
}
]
],
[
"ok",
[
"127.0.0.1",
{
"cmd": "echo this should fire once ", "cmd": "echo this should fire once ",
"rc": 0, "rc": 0,
"stderr": "", "stderr": "",
...@@ -265,10 +319,10 @@ ...@@ -265,10 +319,10 @@
"results": { "results": {
"127.0.0.1": { "127.0.0.1": {
"changed": 2, "changed": 2,
"dark": 0, "failures": 0,
"failed": 0, "ok": 12,
"resources": 11, "skipped": 1,
"skipped": 1 "unreachable": 0
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment