Commit 696b67f9 by Michael DeHaan

Fix async to use the new argfiles method (wrapping brain around rock, really must write module development guide)
parent aeea4667
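
The diff below converts the async path to the argsfile convention: instead of passing module arguments on the remote command line, the runner writes them to an 'arguments' file in the remote temp directory and invokes the module with that file's path as its single argument. A minimal sketch of what a module's argument handling looks like under this convention (names and the key=value parsing are illustrative, modeled on the hunks below, not a module shipped in this commit):

#!/usr/bin/python
# illustrative sketch only -- not a real module from this commit

import json
import shlex
import sys

# the runner transfers an 'arguments' file and passes its remote path
# as the module's single positional argument
argsfile = sys.argv[1]
items = shlex.split(file(argsfile).read())

# modules that take key=value pairs fold them into a params dict
params = {}
for x in items:
    (k, v) = x.split("=")
    params[k] = v

print json.dumps({ "params" : params })
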
@@ -40,6 +40,9 @@ import StringIO
 # FIXME: stop importing *, use as utils/errors
 from ansible.utils import *
 from ansible.errors import *
+
+# should be True except in debug
+CLEANUP_FILES = True
 
 ################################################
@@ -221,7 +224,8 @@ class Runner(object):
         for filename in files:
             if not filename.startswith('/tmp/'):
                 raise Exception("not going to happen")
-            self._exec_command(conn, "rm -rf %s" % filename)
+            if CLEANUP_FILES:
+                self._exec_command(conn, "rm -rf %s" % filename)
 
     # *****************************************************
@@ -255,13 +259,15 @@ class Runner(object):
         args_fo.flush()
         args_fo.close()
         args_remote = os.path.join(tmp, 'arguments')
-        self._transfer_file(conn, args_file, 'arguments')
-        os.unlink(args_file)
+        self._transfer_file(conn, args_file, args_remote)
+        if CLEANUP_FILES:
+            os.unlink(args_file)
         return args_remote
 
     # *****************************************************
 
-    def _execute_module(self, conn, tmp, remote_module_path, module_args):
+    def _execute_module(self, conn, tmp, remote_module_path, module_args,
+        async_jid=None, async_module=None, async_limit=None):
         '''
         runs a module that has already been transferred, but first
         modifies the command using setup_cache variables (see playbook)
@@ -286,7 +292,11 @@ class Runner(object):
         args = template.render(inject_vars)
         argsfile = self._transfer_argsfile(conn, tmp, args)
-        cmd = "%s %s" % (remote_module_path, 'arguments')
+        if async_jid is None:
+            cmd = "%s %s" % (remote_module_path, argsfile)
+        else:
+            args = [str(x) for x in [remote_module_path, async_jid, async_limit, async_module, argsfile]]
+            cmd = " ".join(args)
         result = self._exec_command(conn, cmd)
         return result
@@ -324,10 +334,11 @@ class Runner(object):
         async = self._transfer_module(conn, tmp, 'async_wrapper')
         module = self._transfer_module(conn, tmp, self.module_name)
-        new_args = []
-        new_args = [ self.generated_jid, self.background, module ]
-        new_args.extend(self.module_args)
-        result = self._execute_module(conn, tmp, async, new_args)
+        result = self._execute_module(conn, tmp, async, self.module_args,
+            async_module=module,
+            async_jid=self.generated_jid,
+            async_limit=self.background
+        )
         return self._return_from_module(conn, host, result)
 
     # *****************************************************
@@ -454,8 +465,7 @@ class Runner(object):
         msg = '%s: %s' % (self.module_name, cmd)
         self.remote_log(conn, msg)
         stdin, stdout, stderr = conn.exec_command(cmd)
-        results = "\n".join(stdout.readlines())
-        return results
+        return "\n".join(stdout.readlines())
 
     # *****************************************************
...
@@ -30,12 +30,12 @@ import datetime
 import traceback
 
 # ===========================================
-# convert arguments of form a=b c=d
-# to a dictionary
-# FIXME: make more idiomatic
-
-args = " ".join(sys.argv[1:])
-items = shlex.split(args)
+# FIXME: better error handling
+
+argsfile = sys.argv[1]
+items = shlex.split(file(argsfile).read())
 
 params = {}
 for x in items:
     (k, v) = x.split("=")
...
@@ -66,16 +66,15 @@ def daemonize_self():
 if len(sys.argv) < 3:
     print json.dumps({
         "failed" : True,
-        "msg" : "usage: async_wrapper <jid> <module_script> <time_limit> <args>. Humans, do not call directly!"
+        "msg" : "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile>. Humans, do not call directly!"
     })
     sys.exit(1)
 
 jid = sys.argv[1]
 time_limit = sys.argv[2]
 wrapped_module = sys.argv[3]
-args = sys.argv[4:]
-
-cmd = "%s %s" % (wrapped_module, " ".join(args))
+argsfile = sys.argv[4]
+cmd = "%s %s" % (wrapped_module, argsfile)
 
 # setup logging directory
 logdir = os.path.expanduser("~/.ansible_async")
@@ -92,20 +91,20 @@ if not os.path.exists(logdir):
 
 def _run_command(wrapped_cmd, jid, log_path):
 
-    print "RUNNING: %s" % wrapped_cmd
     logfile = open(log_path, "w")
     logfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid }))
     logfile.close()
     logfile = open(log_path, "w")
     result = {}
+    outdata = ''
 
     try:
         cmd = shlex.split(wrapped_cmd)
         script = subprocess.Popen(cmd, shell=False,
             stdin=None, stdout=logfile, stderr=logfile)
         script.communicate()
-        #result = json.loads(out)
-        result = json.loads(file(log_path).read())
+        outdata = file(log_path).read()
+        result = json.loads(outdata)
     except (OSError, IOError), e:
         result = {
@@ -119,6 +118,7 @@ def _run_command(wrapped_cmd, jid, log_path):
         result = {
             "failed" : 1,
             "cmd" : wrapped_cmd,
+            "data" : outdata,  # temporary debug only
             "msg" : traceback.format_exc()
         }
         result['ansible_job_id'] = jid
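
The job file the wrapper maintains under ~/.ansible_async/<jid> starts out as the {"started": 1, "ansible_job_id": jid} marker and is later overwritten with the wrapped module's JSON output; that file is what the async_status check in the test below reads back. A rough sketch of polling that file directly, assuming only the contract the hunks above establish (this is not the actual async_status module):

import json
import os
import time

def poll_job(jid, timeout=30, interval=1):
    # the wrapper writes the job log to ~/.ansible_async/<jid> (see logdir above)
    log_path = os.path.join(os.path.expanduser("~/.ansible_async"), jid)
    for _ in range(timeout / interval):
        try:
            result = json.loads(file(log_path).read())
        except (IOError, ValueError):
            # file not written yet, or only partially written
            time.sleep(interval)
            continue
        if set(result) <= set(["started", "ansible_job_id"]):
            # still just the "started" marker; the module has not finished
            time.sleep(interval)
            continue
        return result
    return { "failed" : 1, "msg" : "timed out waiting on job %s" % jid }
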
...
@@ -30,32 +30,10 @@ import traceback
 import shlex
 import os
 
-if len(sys.argv) == 1:
-    print json.dumps({
-        "failed" : True,
-        "msg" : "the command module requires arguments (-a)"
-    })
-    sys.exit(1)
-
 argfile = sys.argv[1]
-if not os.path.exists(argfile):
-    print json.dumps({
-        "failed" : True,
-        "msg" : "Argument file not found"
-    })
-    sys.exit(1)
-
 args = open(argfile, 'r').read()
 args = shlex.split(args)
-
-if not len(args):
-    print json.dumps({
-        "failed" : True,
-        "msg" : "the command module requires arguments (-a)"
-    })
-    sys.exit(1)
 
 startd = datetime.datetime.now()
 
 try:
...
@@ -159,14 +159,18 @@ class TestRunner(unittest.TestCase):
     def test_async(self):
         # test async launch and job status
         # of any particular module
+        print "firing command..."
         result = self._run('command', [ "/bin/sleep", "3" ], background=20)
+        print "back..."
         assert 'ansible_job_id' in result
         assert 'started' in result
         jid = result['ansible_job_id']
         # no real chance of this op taking a while, but whatever
         time.sleep(5)
         # CLI will abstract this, but this is how it works internally
+        print "checking status..."
         result = self._run('async_status', [ "jid=%s" % jid ])
+        print "back..."
         # TODO: would be nice to have tests for supervisory process
         # killing job after X seconds
         assert 'finished' in result
...