Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
ansible
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
OpenEdx
ansible
Commits
f3cfc449
Commit
f3cfc449
authored
Sep 30, 2013
by
James Cammarata
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add more verbose debugging options for accelerate
parent
c1e28b45
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
59 additions
and
19 deletions
+59
-19
lib/ansible/runner/connection_plugins/accelerate.py
+11
-2
library/utilities/accelerate
+48
-17
No files found.
lib/ansible/runner/connection_plugins/accelerate.py
View file @
f3cfc449
...
@@ -21,7 +21,7 @@ import base64
...
@@ -21,7 +21,7 @@ import base64
import
socket
import
socket
import
struct
import
struct
import
time
import
time
from
ansible.callbacks
import
vvv
from
ansible.callbacks
import
vvv
,
vvvv
from
ansible.runner.connection_plugins.ssh
import
Connection
as
SSHConnection
from
ansible.runner.connection_plugins.ssh
import
Connection
as
SSHConnection
from
ansible
import
utils
from
ansible
import
utils
from
ansible
import
errors
from
ansible
import
errors
...
@@ -73,12 +73,13 @@ class Connection(object):
...
@@ -73,12 +73,13 @@ class Connection(object):
utils
.
AES_KEYS
=
self
.
runner
.
aes_keys
utils
.
AES_KEYS
=
self
.
runner
.
aes_keys
def
_execute_accelerate_module
(
self
):
def
_execute_accelerate_module
(
self
):
args
=
"password=
%
s port=
%
s
"
%
(
base64
.
b64encode
(
self
.
key
.
__str__
()),
str
(
self
.
accport
))
args
=
"password=
%
s port=
%
s
debug=
%
d"
%
(
base64
.
b64encode
(
self
.
key
.
__str__
()),
str
(
self
.
accport
),
int
(
utils
.
VERBOSITY
))
inject
=
dict
(
password
=
self
.
key
)
inject
=
dict
(
password
=
self
.
key
)
if
self
.
runner
.
accelerate_inventory_host
:
if
self
.
runner
.
accelerate_inventory_host
:
inject
=
utils
.
combine_vars
(
inject
,
self
.
runner
.
inventory
.
get_variables
(
self
.
runner
.
accelerate_inventory_host
))
inject
=
utils
.
combine_vars
(
inject
,
self
.
runner
.
inventory
.
get_variables
(
self
.
runner
.
accelerate_inventory_host
))
else
:
else
:
inject
=
utils
.
combine_vars
(
inject
,
self
.
runner
.
inventory
.
get_variables
(
self
.
host
))
inject
=
utils
.
combine_vars
(
inject
,
self
.
runner
.
inventory
.
get_variables
(
self
.
host
))
vvvv
(
"attempting to start up the accelerate daemon..."
)
self
.
ssh
.
connect
()
self
.
ssh
.
connect
()
tmp_path
=
self
.
runner
.
_make_tmp_path
(
self
.
ssh
)
tmp_path
=
self
.
runner
.
_make_tmp_path
(
self
.
ssh
)
return
self
.
runner
.
_execute_module
(
self
.
ssh
,
tmp_path
,
'accelerate'
,
args
,
inject
=
inject
)
return
self
.
runner
.
_execute_module
(
self
.
ssh
,
tmp_path
,
'accelerate'
,
args
,
inject
=
inject
)
...
@@ -92,11 +93,13 @@ class Connection(object):
...
@@ -92,11 +93,13 @@ class Connection(object):
tries
=
3
tries
=
3
self
.
conn
=
socket
.
socket
()
self
.
conn
=
socket
.
socket
()
self
.
conn
.
settimeout
(
300.0
)
self
.
conn
.
settimeout
(
300.0
)
vvvv
(
"attempting connection to
%
s via the accelerated port
%
d"
%
(
self
.
host
,
self
.
accport
))
while
tries
>
0
:
while
tries
>
0
:
try
:
try
:
self
.
conn
.
connect
((
self
.
host
,
self
.
accport
))
self
.
conn
.
connect
((
self
.
host
,
self
.
accport
))
break
break
except
:
except
:
vvvv
(
"failed, retrying..."
)
time
.
sleep
(
0.1
)
time
.
sleep
(
0.1
)
tries
-=
1
tries
-=
1
if
tries
==
0
:
if
tries
==
0
:
...
@@ -122,18 +125,24 @@ class Connection(object):
...
@@ -122,18 +125,24 @@ class Connection(object):
header_len
=
8
# size of a packed unsigned long long
header_len
=
8
# size of a packed unsigned long long
data
=
b
""
data
=
b
""
try
:
try
:
vvvv
(
"
%
s: in recv_data(), waiting for the header"
%
self
.
host
)
while
len
(
data
)
<
header_len
:
while
len
(
data
)
<
header_len
:
d
=
self
.
conn
.
recv
(
1024
)
d
=
self
.
conn
.
recv
(
1024
)
if
not
d
:
if
not
d
:
vvvv
(
"
%
s: received nothing, bailing out"
%
self
.
host
)
return
None
return
None
data
+=
d
data
+=
d
vvvv
(
"
%
s: got the header, unpacking"
%
self
.
host
)
data_len
=
struct
.
unpack
(
'Q'
,
data
[:
header_len
])[
0
]
data_len
=
struct
.
unpack
(
'Q'
,
data
[:
header_len
])[
0
]
data
=
data
[
header_len
:]
data
=
data
[
header_len
:]
vvvv
(
"
%
s: data received so far (expecting
%
d):
%
d"
%
(
self
.
host
,
data_len
,
len
(
data
)))
while
len
(
data
)
<
data_len
:
while
len
(
data
)
<
data_len
:
d
=
self
.
conn
.
recv
(
1024
)
d
=
self
.
conn
.
recv
(
1024
)
if
not
d
:
if
not
d
:
vvvv
(
"
%
s: received nothing, bailing out"
%
self
.
host
)
return
None
return
None
data
+=
d
data
+=
d
vvvv
(
"
%
s: received all of the data, returning"
%
self
.
host
)
return
data
return
data
except
socket
.
timeout
:
except
socket
.
timeout
:
raise
errors
.
AnsibleError
(
"timed out while waiting to receive data"
)
raise
errors
.
AnsibleError
(
"timed out while waiting to receive data"
)
...
...
library/utilities/accelerate
View file @
f3cfc449
...
@@ -85,8 +85,22 @@ PIDFILE = os.path.expanduser("~/.accelerate.pid")
...
@@ -85,8 +85,22 @@ PIDFILE = os.path.expanduser("~/.accelerate.pid")
# which leaves room for the TCP/IP header
# which leaves room for the TCP/IP header
CHUNK_SIZE
=
10240
CHUNK_SIZE
=
10240
def
log
(
msg
):
# FIXME: this all should be moved to module_common, as it's
syslog
.
syslog
(
syslog
.
LOG_NOTICE
|
syslog
.
LOG_DAEMON
,
msg
)
# pretty much a copy from the callbacks/util code
DEBUG_LEVEL
=
0
def
log
(
msg
,
cap
=
0
):
global
DEBUG_LEVEL
if
cap
>=
DEBUG_LEVEL
:
syslog
.
syslog
(
syslog
.
LOG_NOTICE
|
syslog
.
LOG_DAEMON
,
msg
)
def
vv
(
msg
):
log
(
msg
,
cap
=
2
)
def
vvv
(
msg
):
log
(
msg
,
cap
=
3
)
def
vvvv
(
msg
):
log
(
msg
,
cap
=
4
)
if
os
.
path
.
exists
(
PIDFILE
):
if
os
.
path
.
exists
(
PIDFILE
):
try
:
try
:
...
@@ -114,7 +128,7 @@ def daemonize_self(module, password, port, minutes):
...
@@ -114,7 +128,7 @@ def daemonize_self(module, password, port, minutes):
try
:
try
:
pid
=
os
.
fork
()
pid
=
os
.
fork
()
if
pid
>
0
:
if
pid
>
0
:
log
(
"exiting pid
%
s"
%
pid
)
vvv
(
"exiting pid
%
s"
%
pid
)
# exit first parent
# exit first parent
module
.
exit_json
(
msg
=
"daemonized accelerate on port
%
s for
%
s minutes"
%
(
port
,
minutes
))
module
.
exit_json
(
msg
=
"daemonized accelerate on port
%
s for
%
s minutes"
%
(
port
,
minutes
))
except
OSError
,
e
:
except
OSError
,
e
:
...
@@ -134,7 +148,7 @@ def daemonize_self(module, password, port, minutes):
...
@@ -134,7 +148,7 @@ def daemonize_self(module, password, port, minutes):
pid_file
=
open
(
PIDFILE
,
"w"
)
pid_file
=
open
(
PIDFILE
,
"w"
)
pid_file
.
write
(
"
%
s"
%
pid
)
pid_file
.
write
(
"
%
s"
%
pid
)
pid_file
.
close
()
pid_file
.
close
()
log
(
"pidfile written"
)
vvv
(
"pidfile written"
)
sys
.
exit
(
0
)
sys
.
exit
(
0
)
except
OSError
,
e
:
except
OSError
,
e
:
log
(
"fork #2 failed:
%
d (
%
s)"
%
(
e
.
errno
,
e
.
strerror
))
log
(
"fork #2 failed:
%
d (
%
s)"
%
(
e
.
errno
,
e
.
strerror
))
...
@@ -162,52 +176,64 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
...
@@ -162,52 +176,64 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
def
recv_data
(
self
):
def
recv_data
(
self
):
header_len
=
8
# size of a packed unsigned long long
header_len
=
8
# size of a packed unsigned long long
data
=
b
""
data
=
b
""
vvvv
(
"in recv_data(), waiting for the header"
)
while
len
(
data
)
<
header_len
:
while
len
(
data
)
<
header_len
:
d
=
self
.
request
.
recv
(
1024
)
d
=
self
.
request
.
recv
(
1024
)
if
not
d
:
if
not
d
:
vvv
(
"received nothing, bailing out"
)
return
None
return
None
data
+=
d
data
+=
d
vvvv
(
"in recv_data(), got the header, unpacking"
)
data_len
=
struct
.
unpack
(
'Q'
,
data
[:
header_len
])[
0
]
data_len
=
struct
.
unpack
(
'Q'
,
data
[:
header_len
])[
0
]
data
=
data
[
header_len
:]
data
=
data
[
header_len
:]
vvvv
(
"data received so far (expecting
%
d):
%
d"
%
(
data_len
,
len
(
data
)))
while
len
(
data
)
<
data_len
:
while
len
(
data
)
<
data_len
:
d
=
self
.
request
.
recv
(
1024
)
d
=
self
.
request
.
recv
(
1024
)
if
not
d
:
if
not
d
:
vvv
(
"received nothing, bailing out"
)
return
None
return
None
data
+=
d
data
+=
d
vvvv
(
"received all of the data, returning"
)
return
data
return
data
def
handle
(
self
):
def
handle
(
self
):
while
True
:
while
True
:
#log
("waiting for data")
vvvv
(
"waiting for data"
)
data
=
self
.
recv_data
()
data
=
self
.
recv_data
()
if
not
data
:
if
not
data
:
vvvv
(
"received nothing back from recv_data(), breaking out"
)
break
break
try
:
try
:
#log
("got data, decrypting")
vvvv
(
"got data, decrypting"
)
data
=
self
.
server
.
key
.
Decrypt
(
data
)
data
=
self
.
server
.
key
.
Decrypt
(
data
)
#log
("decryption done")
vvvv
(
"decryption done"
)
except
:
except
:
log
(
"bad decrypt, skipping..."
)
vv
(
"bad decrypt, skipping..."
)
data2
=
json
.
dumps
(
dict
(
rc
=
1
))
data2
=
json
.
dumps
(
dict
(
rc
=
1
))
data2
=
self
.
server
.
key
.
Encrypt
(
data2
)
data2
=
self
.
server
.
key
.
Encrypt
(
data2
)
send_data
(
client
,
data2
)
send_data
(
client
,
data2
)
return
return
#log
("loading json from the data")
vvvv
(
"loading json from the data"
)
data
=
json
.
loads
(
data
)
data
=
json
.
loads
(
data
)
mode
=
data
[
'mode'
]
mode
=
data
[
'mode'
]
response
=
{}
response
=
{}
if
mode
==
'command'
:
if
mode
==
'command'
:
vvvv
(
"received a command request, running it"
)
response
=
self
.
command
(
data
)
response
=
self
.
command
(
data
)
elif
mode
==
'put'
:
elif
mode
==
'put'
:
vvvv
(
"received a put request, putting it"
)
response
=
self
.
put
(
data
)
response
=
self
.
put
(
data
)
elif
mode
==
'fetch'
:
elif
mode
==
'fetch'
:
vvvv
(
"received a fetch request, getting it"
)
response
=
self
.
fetch
(
data
)
response
=
self
.
fetch
(
data
)
data2
=
json
.
dumps
(
response
)
data2
=
json
.
dumps
(
response
)
data2
=
self
.
server
.
key
.
Encrypt
(
data2
)
data2
=
self
.
server
.
key
.
Encrypt
(
data2
)
vvvv
(
"sending the response back to the controller"
)
self
.
send_data
(
data2
)
self
.
send_data
(
data2
)
vvvv
(
"done sending the response"
)
def
command
(
self
,
data
):
def
command
(
self
,
data
):
if
'cmd'
not
in
data
:
if
'cmd'
not
in
data
:
...
@@ -217,14 +243,14 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
...
@@ -217,14 +243,14 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
if
'executable'
not
in
data
:
if
'executable'
not
in
data
:
return
dict
(
failed
=
True
,
msg
=
'internal error: executable is required'
)
return
dict
(
failed
=
True
,
msg
=
'internal error: executable is required'
)
#log
("executing: %s" % data['cmd'])
vvvv
(
"executing:
%
s"
%
data
[
'cmd'
])
rc
,
stdout
,
stderr
=
self
.
server
.
module
.
run_command
(
data
[
'cmd'
],
executable
=
data
[
'executable'
],
close_fds
=
True
)
rc
,
stdout
,
stderr
=
self
.
server
.
module
.
run_command
(
data
[
'cmd'
],
executable
=
data
[
'executable'
],
close_fds
=
True
)
if
stdout
is
None
:
if
stdout
is
None
:
stdout
=
''
stdout
=
''
if
stderr
is
None
:
if
stderr
is
None
:
stderr
=
''
stderr
=
''
#log
("got stdout: %s" % stdout)
vvvv
(
"got stdout:
%
s"
%
stdout
)
#log
("got stderr: %s" % stderr)
vvvv
(
"got stderr:
%
s"
%
stderr
)
return
dict
(
rc
=
rc
,
stdout
=
stdout
,
stderr
=
stderr
)
return
dict
(
rc
=
rc
,
stdout
=
stdout
,
stderr
=
stderr
)
...
@@ -235,7 +261,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
...
@@ -235,7 +261,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
try
:
try
:
fd
=
file
(
data
[
'in_path'
],
'rb'
)
fd
=
file
(
data
[
'in_path'
],
'rb'
)
fstat
=
os
.
stat
(
data
[
'in_path'
])
fstat
=
os
.
stat
(
data
[
'in_path'
])
log
(
"FETCH file is
%
d bytes"
%
fstat
.
st_size
)
vvv
(
"FETCH file is
%
d bytes"
%
fstat
.
st_size
)
while
fd
.
tell
()
<
fstat
.
st_size
:
while
fd
.
tell
()
<
fstat
.
st_size
:
data
=
fd
.
read
(
CHUNK_SIZE
)
data
=
fd
.
read
(
CHUNK_SIZE
)
last
=
False
last
=
False
...
@@ -276,7 +302,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
...
@@ -276,7 +302,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
final_path
=
None
final_path
=
None
final_user
=
None
final_user
=
None
if
'user'
in
data
and
data
.
get
(
'user'
)
!=
getpass
.
getuser
():
if
'user'
in
data
and
data
.
get
(
'user'
)
!=
getpass
.
getuser
():
log
(
"the target user doesn't match this user, we'll move the file into place via sudo"
)
vv
(
"the target user doesn't match this user, we'll move the file into place via sudo"
)
(
fd
,
out_path
)
=
tempfile
.
mkstemp
(
prefix
=
'ansible.'
,
dir
=
os
.
path
.
expanduser
(
'~/.ansible/tmp/'
))
(
fd
,
out_path
)
=
tempfile
.
mkstemp
(
prefix
=
'ansible.'
,
dir
=
os
.
path
.
expanduser
(
'~/.ansible/tmp/'
))
out_fd
=
os
.
fdopen
(
fd
,
'w'
,
0
)
out_fd
=
os
.
fdopen
(
fd
,
'w'
,
0
)
final_path
=
data
[
'out_path'
]
final_path
=
data
[
'out_path'
]
...
@@ -306,11 +332,11 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
...
@@ -306,11 +332,11 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
log
(
"failed to put the file:
%
s"
%
tb
)
log
(
"failed to put the file:
%
s"
%
tb
)
return
dict
(
failed
=
True
,
stdout
=
"Could not write the file"
)
return
dict
(
failed
=
True
,
stdout
=
"Could not write the file"
)
finally
:
finally
:
#log
("wrote %d bytes" % bytes)
vvvv
(
"wrote
%
d bytes"
%
bytes
)
out_fd
.
close
()
out_fd
.
close
()
if
final_path
:
if
final_path
:
log
(
"moving
%
s to
%
s"
%
(
out_path
,
final_path
))
vvv
(
"moving
%
s to
%
s"
%
(
out_path
,
final_path
))
args
=
[
'sudo'
,
'cp'
,
out_path
,
final_path
]
args
=
[
'sudo'
,
'cp'
,
out_path
,
final_path
]
rc
,
stdout
,
stderr
=
self
.
server
.
module
.
run_command
(
args
,
close_fds
=
True
)
rc
,
stdout
,
stderr
=
self
.
server
.
module
.
run_command
(
args
,
close_fds
=
True
)
if
rc
!=
0
:
if
rc
!=
0
:
...
@@ -334,7 +360,7 @@ def daemonize(module, password, port, minutes):
...
@@ -334,7 +360,7 @@ def daemonize(module, password, port, minutes):
server
=
ThreadedTCPServer
((
"0.0.0.0"
,
port
),
ThreadedTCPRequestHandler
,
module
,
password
)
server
=
ThreadedTCPServer
((
"0.0.0.0"
,
port
),
ThreadedTCPRequestHandler
,
module
,
password
)
server
.
allow_reuse_address
=
True
server
.
allow_reuse_address
=
True
log
(
"serving!"
)
vv
(
"serving!"
)
server
.
serve_forever
(
poll_interval
=
1.0
)
server
.
serve_forever
(
poll_interval
=
1.0
)
except
Exception
,
e
:
except
Exception
,
e
:
tb
=
traceback
.
format_exc
()
tb
=
traceback
.
format_exc
()
...
@@ -342,11 +368,13 @@ def daemonize(module, password, port, minutes):
...
@@ -342,11 +368,13 @@ def daemonize(module, password, port, minutes):
sys
.
exit
(
0
)
sys
.
exit
(
0
)
def
main
():
def
main
():
global
DEBUG_LEVEL
module
=
AnsibleModule
(
module
=
AnsibleModule
(
argument_spec
=
dict
(
argument_spec
=
dict
(
port
=
dict
(
required
=
False
,
default
=
5099
),
port
=
dict
(
required
=
False
,
default
=
5099
),
password
=
dict
(
required
=
True
),
password
=
dict
(
required
=
True
),
minutes
=
dict
(
required
=
False
,
default
=
30
),
minutes
=
dict
(
required
=
False
,
default
=
30
),
debug
=
dict
(
required
=
False
,
default
=
0
,
type
=
'int'
)
),
),
supports_check_mode
=
True
supports_check_mode
=
True
)
)
...
@@ -354,10 +382,13 @@ def main():
...
@@ -354,10 +382,13 @@ def main():
password
=
base64
.
b64decode
(
module
.
params
[
'password'
])
password
=
base64
.
b64decode
(
module
.
params
[
'password'
])
port
=
int
(
module
.
params
[
'port'
])
port
=
int
(
module
.
params
[
'port'
])
minutes
=
int
(
module
.
params
[
'minutes'
])
minutes
=
int
(
module
.
params
[
'minutes'
])
debug
=
int
(
module
.
params
[
'debug'
])
if
not
HAS_KEYCZAR
:
if
not
HAS_KEYCZAR
:
module
.
fail_json
(
msg
=
"keyczar is not installed"
)
module
.
fail_json
(
msg
=
"keyczar is not installed"
)
DEBUG_LEVEL
=
debug
daemonize
(
module
,
password
,
port
,
minutes
)
daemonize
(
module
,
password
,
port
,
minutes
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment