OpenEdx / ansible · Commits

Commit 64673cc9, authored May 11, 2012 by John Kleint
Add test for large output; fix indentation.
Parent: 0956aa96
Showing 1 changed file with 203 additions and 199 deletions:

test/TestRunner.py (+203, -199)
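To exercise the change locally, the suite is meant to be driven with nose (the file imports nose.plugins.skip, and its comments assume nosetests). A minimal sketch, assuming nose is installed and the command is run from the repository root so that library/ and the test/ansible_hosts inventory (which must map 127.0.0.2 to the local machine) resolve; this example does not create those prerequisites:

    # Hedged sketch: drive test/TestRunner.py through nose's Python API.
    # Assumes `pip install nose` and a working test/ansible_hosts inventory.
    import nose
    nose.run(argv=["nosetests", "-v", "test/TestRunner.py"])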
@@ -10,9 +10,9 @@ import os
import shutil
import time
try:
    import json
except:
    import simplejson as json
from nose.plugins.skip import SkipTest
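Aside: the try/except import above keeps the suite running on Python 2.5, where the standard library has no json module (it was added in 2.6); simplejson exposes the same dumps/loads API, so the rest of the file can use the name json either way. An illustrative variant of the same fallback pattern, narrowed to ImportError rather than a bare except:

    # Illustrative only, not part of the diff: stdlib-or-simplejson fallback.
    try:
        import json
    except ImportError:
        # stdlib json is missing on Python < 2.6
        import simplejson as json

    print json.dumps({"a": 1})  # works identically with either module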
@@ -25,202 +25,206 @@ def get_binary(name):
Removed lines (the previous version):

class TestRunner(unittest.TestCase):

    def setUp(self):
        self.user = getpass.getuser()
        self.runner = ansible.runner.Runner(
            module_name='ping',
            module_path='library/',
            module_args='',
            remote_user=self.user,
            remote_pass=None,
            host_list='test/ansible_hosts',
            timeout=5,
            forks=1,
            background=0,
            pattern='all',
        )
        self.cwd = os.getcwd()
        self.test_dir = os.path.join(self.cwd, 'test')
        self.stage_dir = self._prepare_stage_dir()

    def _prepare_stage_dir(self):
        stage_path = os.path.join(self.test_dir, 'test_data')
        if os.path.exists(stage_path):
            shutil.rmtree(stage_path, ignore_errors=False)
            assert not os.path.exists(stage_path)
        os.makedirs(stage_path)
        assert os.path.exists(stage_path)
        return stage_path

    def _get_test_file(self, filename):
        # get a file inside the test input directory
        filename = os.path.join(self.test_dir, filename)
        assert os.path.exists(filename)
        return filename

    def _get_stage_file(self, filename):
        # get a file inside the test output directory
        filename = os.path.join(self.stage_dir, filename)
        return filename

    def _run(self, module_name, module_args, background=0):
        ''' run a module and get the localhost results '''
        self.runner.module_name = module_name
        args = ' '.join(module_args)
        print "DEBUG: using args=%s" % args
        self.runner.module_args = args
        self.runner.background = background
        results = self.runner.run()
        # when using nosetests this will only show up on failure
        # which is pretty useful
        print "RESULTS=%s" % results
        assert "127.0.0.2" in results['contacted']
        return results['contacted']['127.0.0.2']

    def test_ping(self):
        result = self._run('ping', [])
        assert "ping" in result

    def test_facter(self):
        if not get_binary("facter"):
            raise SkipTest
        result = self._run('facter', [])
        assert "hostname" in result

    # temporarily disabled since it occasionally hangs
    # ohai's fault, setup module doesn't actually run this
    # to get ohai's "facts" anyway
    #
    #def test_ohai(self):
    #    if not get_binary("facter"):
    #        raise SkipTest
    #    result = self._run('ohai', [])
    #    assert "hostname" in result

    def test_copy(self):
        # test copy module, change trigger, etc
        pass

    def test_copy(self):
        input = self._get_test_file('sample.j2')
        output = self._get_stage_file('sample.out')
        assert not os.path.exists(output)
        result = self._run('copy', [
            "src=%s" % input,
            "dest=%s" % output,
        ])
        assert os.path.exists(output)
        data_in = file(input).read()
        data_out = file(output).read()
        assert data_in == data_out
        assert 'failed' not in result
        assert result['changed'] == True
        assert 'md5sum' in result
        result = self._run('copy', [
            "src=%s" % input,
            "dest=%s" % output,
        ])
        assert result['changed'] == False

    def test_template(self):
        input = self._get_test_file('sample.j2')
        metadata = self._get_test_file('metadata.json')
        output = self._get_stage_file('sample.out')
        result = self._run('template', [
            "src=%s" % input,
            "dest=%s" % output,
            "metadata=%s" % metadata
        ])
        assert os.path.exists(output)
        out = file(output).read()
        assert out.find("duck") != -1
        assert result['changed'] == True
        assert 'md5sum' in result
        assert 'failed' not in result
        result = self._run('template', [
            "src=%s" % input,
            "dest=%s" % output,
            "metadata=%s" % metadata
        ])
        assert result['changed'] == False

    def test_command(self):
        # test command module, change trigger, etc
        result = self._run('command', ["/bin/echo", "hi"])
        assert "failed" not in result
        assert "msg" not in result
        assert result['rc'] == 0
        assert result['stdout'] == 'hi'
        assert result['stderr'] == ''

        result = self._run('command', ["/bin/false"])
        assert result['rc'] == 1
        assert 'failed' not in result

        result = self._run('command', ["/usr/bin/this_does_not_exist", "splat"])
        assert 'msg' in result
        assert 'failed' in result
        assert 'rc' not in result

        result = self._run('shell', ["/bin/echo", "$HOME"])
        assert 'failed' not in result
        assert result['rc'] == 0

    def test_setup(self):
        output = self._get_stage_file('output.json')
        result = self._run('setup', ["metadata=%s" % output, "a=2", "b=3", "c=4"])
        assert 'failed' not in result
        assert 'md5sum' in result
        assert result['changed'] == True
        outds = json.loads(file(output).read())
        assert outds['c'] == '4'
        # not bothering to test change hooks here since ohai/facter results change
        # almost every time so changed is always true, this just tests that
        # rewriting the file is ok
        result = self._run('setup', ["metadata=%s" % output, "a=2", "b=3", "c=4"])
        print "RAW RESULT=%s" % result
        assert 'md5sum' in result

    def test_async(self):
        # test async launch and job status
        # of any particular module
        result = self._run('command', [get_binary("sleep"), "3"], background=20)
        assert 'ansible_job_id' in result
        assert 'started' in result
        jid = result['ansible_job_id']
        # no real chance of this op taking a while, but whatever
        time.sleep(5)
        # CLI will abstract this (when polling), but this is how it works internally
        result = self._run('async_status', ["jid=%s" % jid])
        # TODO: would be nice to have tests for supervisory process
        # killing job after X seconds
        assert 'finished' in result
        assert 'failed' not in result
        assert 'rc' in result
        assert 'stdout' in result
        assert result['ansible_job_id'] == jid

    def test_fetch(self):
        input = self._get_test_file('sample.j2')
        output = os.path.join(self.stage_dir, '127.0.0.2', input)
        result = self._run('fetch', ["src=%s" % input, "dest=%s" % self.stage_dir])
        assert os.path.exists(output)
        assert open(input).read() == open(output).read()

    def test_yum(self):
        if not get_binary("yum"):
            raise SkipTest
        result = self._run('yum', ["list=repos"])
        assert 'failed' not in result

    def test_git(self):
        # TODO: tests for the git module
        pass

    def test_service(self):
        # TODO: tests for the service module
        pass
Added lines (the updated version, with the duplicate test_copy stub merged, the builtin-shadowing input renamed to input_, and test_large_output added):

class TestRunner(unittest.TestCase):

    def setUp(self):
        self.user = getpass.getuser()
        self.runner = ansible.runner.Runner(
            module_name='ping',
            module_path='library/',
            module_args='',
            remote_user=self.user,
            remote_pass=None,
            host_list='test/ansible_hosts',
            timeout=5,
            forks=1,
            background=0,
            pattern='all',
        )
        self.cwd = os.getcwd()
        self.test_dir = os.path.join(self.cwd, 'test')
        self.stage_dir = self._prepare_stage_dir()

    def _prepare_stage_dir(self):
        stage_path = os.path.join(self.test_dir, 'test_data')
        if os.path.exists(stage_path):
            shutil.rmtree(stage_path, ignore_errors=False)
            assert not os.path.exists(stage_path)
        os.makedirs(stage_path)
        assert os.path.exists(stage_path)
        return stage_path

    def _get_test_file(self, filename):
        # get a file inside the test input directory
        filename = os.path.join(self.test_dir, filename)
        assert os.path.exists(filename)
        return filename

    def _get_stage_file(self, filename):
        # get a file inside the test output directory
        filename = os.path.join(self.stage_dir, filename)
        return filename

    def _run(self, module_name, module_args, background=0):
        ''' run a module and get the localhost results '''
        self.runner.module_name = module_name
        args = ' '.join(module_args)
        print "DEBUG: using args=%s" % args
        self.runner.module_args = args
        self.runner.background = background
        results = self.runner.run()
        # when using nosetests this will only show up on failure
        # which is pretty useful
        print "RESULTS=%s" % results
        assert "127.0.0.2" in results['contacted']
        return results['contacted']['127.0.0.2']

    def test_ping(self):
        result = self._run('ping', [])
        assert "ping" in result

    def test_facter(self):
        if not get_binary("facter"):
            raise SkipTest
        result = self._run('facter', [])
        assert "hostname" in result

    # temporarily disabled since it occasionally hangs
    # ohai's fault, setup module doesn't actually run this
    # to get ohai's "facts" anyway
    #
    #def test_ohai(self):
    #    if not get_binary("facter"):
    #        raise SkipTest
    #    result = self._run('ohai', [])
    #    assert "hostname" in result

    def test_copy(self):
        # test copy module, change trigger, etc
        input_ = self._get_test_file('sample.j2')
        output = self._get_stage_file('sample.out')
        assert not os.path.exists(output)
        result = self._run('copy', [
            "src=%s" % input_,
            "dest=%s" % output,
        ])
        assert os.path.exists(output)
        data_in = file(input_).read()
        data_out = file(output).read()
        assert data_in == data_out
        assert 'failed' not in result
        assert result['changed'] == True
        assert 'md5sum' in result
        result = self._run('copy', [
            "src=%s" % input_,
            "dest=%s" % output,
        ])
        assert result['changed'] == False

    def test_template(self):
        input_ = self._get_test_file('sample.j2')
        metadata = self._get_test_file('metadata.json')
        output = self._get_stage_file('sample.out')
        result = self._run('template', [
            "src=%s" % input_,
            "dest=%s" % output,
            "metadata=%s" % metadata
        ])
        assert os.path.exists(output)
        out = file(output).read()
        assert out.find("duck") != -1
        assert result['changed'] == True
        assert 'md5sum' in result
        assert 'failed' not in result
        result = self._run('template', [
            "src=%s" % input_,
            "dest=%s" % output,
            "metadata=%s" % metadata
        ])
        assert result['changed'] == False

    def test_command(self):
        # test command module, change trigger, etc
        result = self._run('command', ["/bin/echo", "hi"])
        assert "failed" not in result
        assert "msg" not in result
        assert result['rc'] == 0
        assert result['stdout'] == 'hi'
        assert result['stderr'] == ''

        result = self._run('command', ["/bin/false"])
        assert result['rc'] == 1
        assert 'failed' not in result

        result = self._run('command', ["/usr/bin/this_does_not_exist", "splat"])
        assert 'msg' in result
        assert 'failed' in result
        assert 'rc' not in result

        result = self._run('shell', ["/bin/echo", "$HOME"])
        assert 'failed' not in result
        assert result['rc'] == 0

    def test_large_output(self):
        # Ensure reading a large amount of output from a command doesn't hang.
        result = self._run('command', ["/bin/cat", "/usr/share/dict/words"])
        assert "failed" not in result
        assert "msg" not in result
        assert result['rc'] == 0
        assert len(result['stdout']) > 100000
        assert result['stderr'] == ''

    def test_setup(self):
        output = self._get_stage_file('output.json')
        result = self._run('setup', ["metadata=%s" % output, "a=2", "b=3", "c=4"])
        assert 'failed' not in result
        assert 'md5sum' in result
        assert result['changed'] == True
        outds = json.loads(file(output).read())
        assert outds['c'] == '4'
        # not bothering to test change hooks here since ohai/facter results change
        # almost every time so changed is always true, this just tests that
        # rewriting the file is ok
        result = self._run('setup', ["metadata=%s" % output, "a=2", "b=3", "c=4"])
        print "RAW RESULT=%s" % result
        assert 'md5sum' in result

    def test_async(self):
        # test async launch and job status
        # of any particular module
        result = self._run('command', [get_binary("sleep"), "3"], background=20)
        assert 'ansible_job_id' in result
        assert 'started' in result
        jid = result['ansible_job_id']
        # no real chance of this op taking a while, but whatever
        time.sleep(5)
        # CLI will abstract this (when polling), but this is how it works internally
        result = self._run('async_status', ["jid=%s" % jid])
        # TODO: would be nice to have tests for supervisory process
        # killing job after X seconds
        assert 'finished' in result
        assert 'failed' not in result
        assert 'rc' in result
        assert 'stdout' in result
        assert result['ansible_job_id'] == jid

    def test_fetch(self):
        input_ = self._get_test_file('sample.j2')
        output = os.path.join(self.stage_dir, '127.0.0.2', input_)
        result = self._run('fetch', ["src=%s" % input_, "dest=%s" % self.stage_dir])
        assert os.path.exists(output)
        assert open(input_).read() == open(output).read()

    def test_yum(self):
        if not get_binary("yum"):
            raise SkipTest
        result = self._run('yum', ["list=repos"])
        assert 'failed' not in result

    def test_git(self):
        # TODO: tests for the git module
        pass

    def test_service(self):
        # TODO: tests for the service module
        pass
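Why test_large_output matters: if a runner executes a command via a subprocess with a piped stdout and waits for exit before draining the pipe, any output larger than the OS pipe buffer (commonly around 64 KB) blocks the child, so the wait never returns. A minimal sketch of that failure mode and the standard stdlib fix; this is illustrative only, not ansible's actual runner code:

    import subprocess

    def run_hangs(cmd):
        # Deadlock-prone: once the pipe buffer fills, the child blocks on
        # write while the parent blocks in wait(), and neither progresses.
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        p.wait()
        return p.stdout.read()

    def run_ok(cmd):
        # communicate() drains stdout while waiting for exit, so a command
        # like /bin/cat /usr/share/dict/words (as in the test) completes.
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        out, _ = p.communicate()
        return out

    print len(run_ok(["/bin/cat", "/usr/share/dict/words"]))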