Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
ansible
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
OpenEdx
ansible
Commits
624a8dd1
Commit
624a8dd1
authored
Feb 01, 2015
by
Toshio Kuratomi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Replace large if-elif-else blocks with a dict-dispatcher
parent
616fda57
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
308 additions
and
222 deletions
+308
-222
lib/ansible/runner/filter_plugins/ipaddr.py
+308
-222
No files found.
lib/ansible/runner/filter_plugins/ipaddr.py
View file @
624a8dd1
...
...
@@ -28,16 +28,287 @@ else:
mac_linux
.
word_fmt
=
'
%.2
x'
# ---- IP address and network query helpers ----
def
_empty_ipaddr_query
(
v
,
vtype
):
# We don't have any query to process, so just check what type the user
# expects, and return the IP address in a correct format
if
v
:
if
vtype
==
'address'
:
return
str
(
v
.
ip
)
elif
vtype
==
'network'
:
return
str
(
v
)
def
_6to4_query
(
v
,
vtype
,
value
):
if
v
.
version
==
4
:
if
v
.
size
==
1
:
ipconv
=
str
(
v
.
ip
)
elif
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
ipconv
=
str
(
v
.
ip
)
else
:
ipconv
=
False
if
ipaddr
(
ipconv
,
'public'
):
numbers
=
list
(
map
(
int
,
ipconv
.
split
(
'.'
)))
try
:
return
'2002:{:02x}{:02x}:{:02x}{:02x}::1/48'
.
format
(
*
numbers
)
except
:
return
False
elif
v
.
version
==
6
:
if
vtype
==
'address'
:
if
ipaddr
(
str
(
v
),
'2002::/16'
):
return
value
elif
vtype
==
'network'
:
if
v
.
ip
!=
v
.
network
:
if
ipaddr
(
str
(
v
.
ip
),
'2002::/16'
):
return
value
else
:
return
False
def
_ip_query
(
v
):
if
v
.
size
==
1
:
return
str
(
v
.
ip
)
if
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
return
str
(
v
.
ip
)
def
_gateway_query
(
v
):
if
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
return
str
(
v
.
ip
)
+
'/'
+
str
(
v
.
prefixlen
)
def
_bool_ipaddr_query
(
v
):
if
v
:
return
True
def
_broadcast_query
(
v
):
if
v
.
size
>
1
:
return
str
(
v
.
broadcast
)
def
_cidr_query
(
v
):
return
str
(
v
)
def
_cidr_lookup_query
(
v
,
iplist
,
value
):
try
:
if
v
in
iplist
:
return
value
except
:
return
False
def
_host_query
(
v
):
if
v
.
size
==
1
:
return
str
(
v
)
elif
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
return
str
(
v
.
ip
)
+
'/'
+
str
(
v
.
prefixlen
)
def
_hostmask_query
(
v
):
return
str
(
v
.
hostmask
)
def
_int_query
(
v
,
vtype
):
if
vtype
==
'address'
:
return
int
(
v
.
ip
)
elif
vtype
==
'network'
:
return
str
(
int
(
v
.
ip
))
+
'/'
+
str
(
int
(
v
.
prefixlen
))
def
_ipv4_query
(
v
,
value
):
if
v
.
version
==
6
:
try
:
return
str
(
v
.
ipv4
())
except
:
return
False
else
:
return
value
def
_ipv6_query
(
v
,
value
):
if
v
.
version
==
4
:
return
str
(
v
.
ipv6
())
else
:
return
value
def
_link_local_query
(
v
,
value
):
v_ip
=
netaddr
.
IPAddress
(
str
(
v
.
ip
))
if
v
.
version
==
4
:
if
ipaddr
(
str
(
v_ip
),
'169.254.0.0/24'
):
return
value
elif
v
.
version
==
6
:
if
ipaddr
(
str
(
v_ip
),
'fe80::/10'
):
return
value
def
_loopback_query
(
v
,
value
):
v_ip
=
netaddr
.
IPAddress
(
str
(
v
.
ip
))
if
v_ip
.
is_loopback
():
return
value
def
_multicast_query
(
v
,
value
):
if
v
.
is_multicast
():
return
value
def
_net_query
(
v
):
if
v
.
size
>
1
:
if
v
.
ip
==
v
.
network
:
return
str
(
v
.
network
)
+
'/'
+
str
(
v
.
prefixlen
)
def
_netmask_query
(
v
):
if
v
.
size
>
1
:
return
str
(
v
.
netmask
)
def
_network_query
(
v
):
if
v
.
size
>
1
:
return
str
(
v
.
network
)
def
_prefix_query
(
v
):
return
int
(
v
.
prefixlen
)
def
_private_query
(
v
,
value
):
if
v
.
is_private
():
return
value
def
_public_query
(
v
,
value
):
v_ip
=
netaddr
.
IPAddress
(
str
(
v
.
ip
))
if
v_ip
.
is_unicast
()
and
not
v_ip
.
is_private
()
and
\
not
v_ip
.
is_loopback
()
and
not
v_ip
.
is_netmask
()
and
\
not
v_ip
.
is_hostmask
():
return
value
def
_revdns_query
(
v
):
v_ip
=
netaddr
.
IPAddress
(
str
(
v
.
ip
))
return
v_ip
.
reverse_dns
def
_size_query
(
v
):
return
v
.
size
def
_subnet_query
(
v
):
return
str
(
v
.
cidr
)
def
_type_query
(
v
):
if
v
.
size
==
1
:
return
'address'
if
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
return
'address'
else
:
return
'network'
def
_unicast_query
(
v
,
value
):
if
v
.
is_unicast
():
return
value
def
_version_query
(
v
):
return
v
.
version
def
_wrap_query
(
v
,
vtype
,
value
):
if
v
.
version
==
6
:
if
vtype
==
'address'
:
return
'['
+
str
(
v
.
ip
)
+
']'
elif
vtype
==
'network'
:
return
'['
+
str
(
v
.
ip
)
+
']/'
+
str
(
v
.
prefixlen
)
else
:
return
value
# ---- HWaddr query helpers ----
def
_bare_query
(
v
):
v
.
dialect
=
netaddr
.
mac_bare
return
str
(
v
)
def
_bool_hwaddr_query
(
v
):
if
v
:
return
True
def
_cisco_query
(
v
):
v
.
dialect
=
netaddr
.
mac_cisco
return
str
(
v
)
def
_empty_hwaddr_query
(
v
,
value
):
if
v
:
return
value
def
_linux_query
(
v
):
v
.
dialect
=
mac_linux
return
str
(
v
)
def
_postgresql_query
(
v
):
v
.
dialect
=
netaddr
.
mac_pgsql
return
str
(
v
)
def
_unix_query
(
v
):
v
.
dialect
=
netaddr
.
mac_unix
return
str
(
v
)
def
_win_query
(
v
):
v
.
dialect
=
netaddr
.
mac_eui48
return
str
(
v
)
# ---- IP address and network filters ----
def
ipaddr
(
value
,
query
=
''
,
version
=
False
,
alias
=
'ipaddr'
):
''' Check if string is an IP address or network and filter it '''
query_types
=
[
'type'
,
'bool'
,
'int'
,
'version'
,
'size'
,
'address'
,
'ip'
,
'host'
,
\
'network'
,
'subnet'
,
'prefix'
,
'broadcast'
,
'netmask'
,
'hostmask'
,
\
'unicast'
,
'multicast'
,
'private'
,
'public'
,
'loopback'
,
'lo'
,
\
'revdns'
,
'wrap'
,
'ipv6'
,
'v6'
,
'ipv4'
,
'v4'
,
'cidr'
,
'net'
,
\
'hostnet'
,
'router'
,
'gateway'
,
'gw'
,
'host/prefix'
,
'address/prefix'
]
query_func_extra_args
=
{
''
:
(
'vtype'
,),
'6to4'
:
(
'vtype'
,
'value'
),
'cidr_lookup'
:
(
'iplist'
,
'value'
),
'int'
:
(
'vtype'
,),
'ipv4'
:
(
'value'
,),
'ipv6'
:
(
'value'
,),
'link-local'
:
(
'value'
,),
'loopback'
:
(
'value'
,),
'lo'
:
(
'value'
,),
'multicast'
:
(
'value'
,),
'private'
:
(
'value'
,),
'public'
:
(
'value'
,),
'unicast'
:
(
'value'
,),
'wrap'
:
(
'vtype'
,
'value'
),
}
query_func_map
=
{
''
:
_empty_ipaddr_query
,
'6to4'
:
_6to4_query
,
'address'
:
_ip_query
,
'address/prefix'
:
_gateway_query
,
'bool'
:
_bool_ipaddr_query
,
'broadcast'
:
_broadcast_query
,
'cidr'
:
_cidr_query
,
'cidr_lookup'
:
_cidr_lookup_query
,
'gateway'
:
_gateway_query
,
'gw'
:
_gateway_query
,
'host'
:
_host_query
,
'host/prefix'
:
_gateway_query
,
'hostmask'
:
_hostmask_query
,
'hostnet'
:
_gateway_query
,
'int'
:
_int_query
,
'ip'
:
_ip_query
,
'ipv4'
:
_ipv4_query
,
'ipv6'
:
_ipv6_query
,
'link-local'
:
_link_local_query
,
'lo'
:
_loopback_query
,
'loopback'
:
_loopback_query
,
'multicast'
:
_multicast_query
,
'net'
:
_net_query
,
'netmask'
:
_netmask_query
,
'network'
:
_network_query
,
'prefix'
:
_prefix_query
,
'private'
:
_private_query
,
'public'
:
_public_query
,
'revdns'
:
_revdns_query
,
'router'
:
_gateway_query
,
'size'
:
_size_query
,
'subnet'
:
_subnet_query
,
'type'
:
_type_query
,
'unicast'
:
_unicast_query
,
'v4'
:
_ipv4_query
,
'v6'
:
_ipv6_query
,
'version'
:
_version_query
,
'wrap'
:
_wrap_query
,
}
vtype
=
None
if
not
value
:
return
False
...
...
@@ -136,13 +407,12 @@ def ipaddr(value, query = '', version = False, alias = 'ipaddr'):
value
=
str
(
v
)
vtype
=
'network'
v_ip
=
netaddr
.
IPAddress
(
str
(
v
.
ip
))
# We have a query string but it's not in the known query types. Check if
# that string is a valid subnet, if so, we can check later if given IP
# address/network is inside that specific subnet
try
:
if
query
and
query
not
in
query_types
and
ipaddr
(
query
,
'network'
):
### ?? 6to4 and link-local were True here before. Should they still?
if
query
and
(
query
not
in
query_func_map
or
query
==
'cidr_lookup'
)
and
ipaddr
(
query
,
'network'
):
iplist
=
netaddr
.
IPSet
([
netaddr
.
IPNetwork
(
query
)])
query
=
'cidr_lookup'
except
:
...
...
@@ -154,185 +424,12 @@ def ipaddr(value, query = '', version = False, alias = 'ipaddr'):
if
version
and
v
.
version
!=
version
:
return
False
# We don't have any query to process, so just check what type the user
# expects, and return the IP address in a correct format
if
not
query
:
if
v
:
if
vtype
==
'address'
:
return
str
(
v
.
ip
)
elif
vtype
==
'network'
:
return
str
(
v
)
elif
query
==
'type'
:
if
v
.
size
==
1
:
return
'address'
if
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
return
'address'
else
:
return
'network'
elif
query
==
'bool'
:
if
v
:
return
True
elif
query
==
'int'
:
if
vtype
==
'address'
:
return
int
(
v
.
ip
)
elif
vtype
==
'network'
:
return
str
(
int
(
v
.
ip
))
+
'/'
+
str
(
int
(
v
.
prefixlen
))
elif
query
==
'version'
:
return
v
.
version
elif
query
==
'size'
:
return
v
.
size
elif
query
in
[
'address'
,
'ip'
]:
if
v
.
size
==
1
:
return
str
(
v
.
ip
)
if
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
return
str
(
v
.
ip
)
elif
query
==
'host'
:
if
v
.
size
==
1
:
return
str
(
v
)
elif
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
return
str
(
v
.
ip
)
+
'/'
+
str
(
v
.
prefixlen
)
elif
query
==
'net'
:
if
v
.
size
>
1
:
if
v
.
ip
==
v
.
network
:
return
str
(
v
.
network
)
+
'/'
+
str
(
v
.
prefixlen
)
elif
query
in
[
'hostnet'
,
'router'
,
'gateway'
,
'gw'
,
'host/prefix'
,
'address/prefix'
]:
if
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
return
str
(
v
.
ip
)
+
'/'
+
str
(
v
.
prefixlen
)
elif
query
==
'network'
:
if
v
.
size
>
1
:
return
str
(
v
.
network
)
elif
query
==
'subnet'
:
return
str
(
v
.
cidr
)
elif
query
==
'cidr'
:
return
str
(
v
)
elif
query
==
'prefix'
:
return
int
(
v
.
prefixlen
)
elif
query
==
'broadcast'
:
if
v
.
size
>
1
:
return
str
(
v
.
broadcast
)
elif
query
==
'netmask'
:
if
v
.
size
>
1
:
return
str
(
v
.
netmask
)
elif
query
==
'hostmask'
:
return
str
(
v
.
hostmask
)
elif
query
==
'unicast'
:
if
v
.
is_unicast
():
return
value
elif
query
==
'multicast'
:
if
v
.
is_multicast
():
return
value
elif
query
==
'link-local'
:
if
v
.
version
==
4
:
if
ipaddr
(
str
(
v_ip
),
'169.254.0.0/24'
):
return
value
elif
v
.
version
==
6
:
if
ipaddr
(
str
(
v_ip
),
'fe80::/10'
):
return
value
elif
query
==
'private'
:
if
v
.
is_private
():
return
value
elif
query
==
'public'
:
if
v_ip
.
is_unicast
()
and
not
v_ip
.
is_private
()
and
\
not
v_ip
.
is_loopback
()
and
not
v_ip
.
is_netmask
()
and
\
not
v_ip
.
is_hostmask
():
return
value
elif
query
in
[
'loopback'
,
'lo'
]:
if
v_ip
.
is_loopback
():
return
value
elif
query
==
'revdns'
:
return
v_ip
.
reverse_dns
elif
query
==
'wrap'
:
if
v
.
version
==
6
:
if
vtype
==
'address'
:
return
'['
+
str
(
v
.
ip
)
+
']'
elif
vtype
==
'network'
:
return
'['
+
str
(
v
.
ip
)
+
']/'
+
str
(
v
.
prefixlen
)
else
:
return
value
elif
query
in
[
'ipv6'
,
'v6'
]:
if
v
.
version
==
4
:
return
str
(
v
.
ipv6
())
else
:
return
value
elif
query
in
[
'ipv4'
,
'v4'
]:
if
v
.
version
==
6
:
try
:
return
str
(
v
.
ipv4
())
except
:
return
False
else
:
return
value
elif
query
==
'6to4'
:
if
v
.
version
==
4
:
if
v
.
size
==
1
:
ipconv
=
str
(
v
.
ip
)
elif
v
.
size
>
1
:
if
v
.
ip
!=
v
.
network
:
ipconv
=
str
(
v
.
ip
)
else
:
ipconv
=
False
if
ipaddr
(
ipconv
,
'public'
):
numbers
=
list
(
map
(
int
,
ipconv
.
split
(
'.'
)))
try
:
return
'2002:{:02x}{:02x}:{:02x}{:02x}::1/48'
.
format
(
*
numbers
)
except
:
return
False
elif
v
.
version
==
6
:
if
vtype
==
'address'
:
if
ipaddr
(
str
(
v
),
'2002::/16'
):
return
value
elif
vtype
==
'network'
:
if
v
.
ip
!=
v
.
network
:
if
ipaddr
(
str
(
v
.
ip
),
'2002::/16'
):
return
value
else
:
return
False
elif
query
==
'cidr_lookup'
:
try
:
if
v
in
iplist
:
return
value
except
:
return
False
else
:
extras
=
[]
for
arg
in
query_func_extra_args
.
get
(
query
,
tuple
()):
extras
.
append
(
locals
()[
arg
])
try
:
return
query_func_map
[
query
](
v
,
*
extras
)
except
KeyError
:
try
:
float
(
query
)
if
v
.
size
==
1
:
...
...
@@ -462,45 +559,35 @@ def ipsubnet(value, query = '', index = 'x'):
def
hwaddr
(
value
,
query
=
''
,
alias
=
'hwaddr'
):
''' Check if string is a HW/MAC address and filter it '''
query_func_extra_args
=
{
''
:
(
'value'
,),
}
query_func_map
=
{
''
:
_empty_hwaddr_query
,
'bare'
:
_bare_query
,
'bool'
:
_bool_hwaddr_query
,
'cisco'
:
_cisco_query
,
'eui48'
:
_win_query
,
'linux'
:
_linux_query
,
'pgsql'
:
_postgresql_query
,
'postgresql'
:
_postgresql_query
,
'psql'
:
_postgresql_query
,
'unix'
:
_unix_query
,
'win'
:
_win_query
,
}
try
:
v
=
netaddr
.
EUI
(
value
)
except
:
if
query
and
query
not
in
[
'bool'
]
:
if
query
and
query
!=
'bool'
:
raise
errors
.
AnsibleFilterError
(
alias
+
': not a hardware address:
%
s'
%
value
)
if
not
query
:
if
v
:
return
value
elif
query
==
'bool'
:
if
v
:
return
True
elif
query
in
[
'win'
,
'eui48'
]:
v
.
dialect
=
netaddr
.
mac_eui48
return
str
(
v
)
elif
query
==
'unix'
:
v
.
dialect
=
netaddr
.
mac_unix
return
str
(
v
)
elif
query
in
[
'pgsql'
,
'postgresql'
,
'psql'
]:
v
.
dialect
=
netaddr
.
mac_pgsql
return
str
(
v
)
elif
query
==
'cisco'
:
v
.
dialect
=
netaddr
.
mac_cisco
return
str
(
v
)
elif
query
==
'bare'
:
v
.
dialect
=
netaddr
.
mac_bare
return
str
(
v
)
elif
query
==
'linux'
:
v
.
dialect
=
mac_linux
return
str
(
v
)
else
:
extras
=
[]
for
arg
in
query_func_extra_args
.
get
(
query
,
tuple
()):
extras
.
append
(
locals
()[
arg
])
try
:
return
query_func_map
[
query
](
v
,
*
extras
)
except
KeyError
:
raise
errors
.
AnsibleFilterError
(
alias
+
': unknown filter type:
%
s'
%
query
)
return
False
...
...
@@ -508,7 +595,6 @@ def hwaddr(value, query = '', alias = 'hwaddr'):
def
macaddr
(
value
,
query
=
''
):
return
hwaddr
(
value
,
query
,
alias
=
'macaddr'
)
def
_need_netaddr
(
*
args
,
**
kwargs
):
raise
errors
.
AnsibleFilterError
(
'python-netaddr package is not installed'
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment