Commit 0d6f6ad2 by James Tanner Committed by James Cammarata

Implement new default cipher class AES256

parent ba0fec4f
......@@ -52,7 +52,7 @@ def build_option_parser(action):
sys.exit()
# options for all actions
#parser.add_option('-c', '--cipher', dest='cipher', default="AES", help="cipher to use")
#parser.add_option('-c', '--cipher', dest='cipher', default="AES256", help="cipher to use")
parser.add_option('--debug', dest='debug', action="store_true", help="debug")
parser.add_option('--vault-password-file', dest='password_file',
help="vault password file")
......@@ -119,7 +119,7 @@ def execute_create(args, options, parser):
else:
password = _read_password(options.password_file)
cipher = 'AES'
cipher = 'AES256'
if hasattr(options, 'cipher'):
cipher = options.cipher
......@@ -133,7 +133,7 @@ def execute_decrypt(args, options, parser):
else:
password = _read_password(options.password_file)
cipher = 'AES'
cipher = 'AES256'
if hasattr(options, 'cipher'):
cipher = options.cipher
......@@ -169,7 +169,7 @@ def execute_encrypt(args, options, parser):
else:
password = _read_password(options.password_file)
cipher = 'AES'
cipher = 'AES256'
if hasattr(options, 'cipher'):
cipher = options.cipher
......
......@@ -12,6 +12,21 @@ from nose.plugins.skip import SkipTest
from ansible import errors
from ansible.utils.vault import VaultLib
# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
try:
from Crypto.Util import Counter
HAS_COUNTER = True
except ImportError:
HAS_COUNTER = False
# KDF import fails for 2.0.1, requires >= 2.6.1 from pip
try:
from Crypto.Protocol.KDF import PBKDF2
HAS_PBKDF2 = True
except ImportError:
HAS_PBKDF2 = False
# AES IMPORTS
try:
from Crypto.Cipher import AES as AES
......@@ -26,8 +41,8 @@ class TestVaultLib(TestCase):
slots = ['is_encrypted',
'encrypt',
'decrypt',
'_add_headers_and_hexify_encrypted_data',
'_split_headers_and_get_unhexified_data',]
'_add_header',
'_split_header',]
for slot in slots:
assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
......@@ -41,8 +56,7 @@ class TestVaultLib(TestCase):
v = VaultLib('ansible')
v.cipher_name = "TEST"
sensitive_data = "ansible"
sensitive_hex = hexlify(sensitive_data)
data = v._add_headers_and_hexify_encrypted_data(sensitive_data)
data = v._add_header(sensitive_data)
lines = data.split('\n')
assert len(lines) > 1, "failed to properly add header"
header = lines[0]
......@@ -52,19 +66,18 @@ class TestVaultLib(TestCase):
assert header_parts[0] == '$ANSIBLE_VAULT', "header does not start with $ANSIBLE_VAULT"
assert header_parts[1] == v.version, "header version is incorrect"
assert header_parts[2] == 'TEST', "header does end with cipher name"
assert lines[1] == sensitive_hex
def test_remove_header(self):
def test_split_header(self):
v = VaultLib('ansible')
data = "$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify("ansible")
rdata = v._split_headers_and_get_unhexified_data(data)
data = "$ANSIBLE_VAULT;9.9;TEST\nansible"
rdata = v._split_header(data)
lines = rdata.split('\n')
assert lines[0] == "ansible"
assert v.cipher_name == 'TEST', "cipher name was not set"
assert v.version == "9.9"
def test_encyrpt_decrypt(self):
if not HAS_AES:
def test_encrypt_decrypt_aes(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('ansible')
v.cipher_name = 'AES'
......@@ -73,8 +86,18 @@ class TestVaultLib(TestCase):
assert enc_data != "foobar", "encryption failed"
assert dec_data == "foobar", "decryption failed"
def test_encrypt_decrypt_aes256(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('ansible')
v.cipher_name = 'AES256'
enc_data = v.encrypt("foobar")
dec_data = v.decrypt(enc_data)
assert enc_data != "foobar", "encryption failed"
assert dec_data == "foobar", "decryption failed"
def test_encrypt_encrypted(self):
if not HAS_AES:
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('ansible')
v.cipher_name = 'AES'
......@@ -87,7 +110,7 @@ class TestVaultLib(TestCase):
assert error_hit, "No error was thrown when trying to encrypt data with a header"
def test_decrypt_decrypted(self):
if not HAS_AES:
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('ansible')
data = "ansible"
......@@ -99,7 +122,8 @@ class TestVaultLib(TestCase):
assert error_hit, "No error was thrown when trying to decrypt data without a header"
def test_cipher_not_set(self):
if not HAS_AES:
# not setting the cipher should default to AES256
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
v = VaultLib('ansible')
data = "ansible"
......@@ -108,6 +132,5 @@ class TestVaultLib(TestCase):
enc_data = v.encrypt(data)
except errors.AnsibleError, e:
error_hit = True
assert error_hit, "No error was thrown when trying to encrypt data without the cipher set"
assert not error_hit, "An error was thrown when trying to encrypt data without the cipher set"
assert v.cipher_name == "AES256", "cipher name is not set to AES256: %s" % v.cipher_name
#!/usr/bin/env python
from unittest import TestCase
import getpass
import os
import shutil
import time
import tempfile
from binascii import unhexlify
from binascii import hexlify
from nose.plugins.skip import SkipTest
from ansible import errors
from ansible.utils.vault import VaultLib
from ansible.utils.vault import VaultEditor
# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
try:
from Crypto.Util import Counter
HAS_COUNTER = True
except ImportError:
HAS_COUNTER = False
# KDF import fails for 2.0.1, requires >= 2.6.1 from pip
try:
from Crypto.Protocol.KDF import PBKDF2
HAS_PBKDF2 = True
except ImportError:
HAS_PBKDF2 = False
# AES IMPORTS
try:
from Crypto.Cipher import AES as AES
HAS_AES = True
except ImportError:
HAS_AES = False
class TestVaultEditor(TestCase):
def test_methods_exist(self):
v = VaultEditor(None, None, None)
slots = ['create_file',
'decrypt_file',
'edit_file',
'encrypt_file',
'rekey_file',
'read_data',
'write_data',
'shuffle_files']
for slot in slots:
assert hasattr(v, slot), "VaultLib is missing the %s method" % slot
def test_decrypt_1_0(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
dirpath = tempfile.mkdtemp()
filename = os.path.join(dirpath, "foo-ansible-1.0.yml")
shutil.rmtree(dirpath)
shutil.copytree("vault_test_data", dirpath)
ve = VaultEditor(None, "ansible", filename)
# make sure the password functions for the cipher
error_hit = False
try:
ve.decrypt_file()
except errors.AnsibleError, e:
error_hit = True
# verify decrypted content
f = open(filename, "rb")
fdata = f.read()
f.close()
shutil.rmtree(dirpath)
assert error_hit == False, "error decrypting 1.0 file"
assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip()
def test_decrypt_1_1(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
dirpath = tempfile.mkdtemp()
filename = os.path.join(dirpath, "foo-ansible-1.1.yml")
shutil.rmtree(dirpath)
shutil.copytree("vault_test_data", dirpath)
ve = VaultEditor(None, "ansible", filename)
# make sure the password functions for the cipher
error_hit = False
try:
ve.decrypt_file()
except errors.AnsibleError, e:
error_hit = True
# verify decrypted content
f = open(filename, "rb")
fdata = f.read()
f.close()
shutil.rmtree(dirpath)
assert error_hit == False, "error decrypting 1.0 file"
assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip()
def test_rekey_migration(self):
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
raise SkipTest
dirpath = tempfile.mkdtemp()
filename = os.path.join(dirpath, "foo-ansible-1.0.yml")
shutil.rmtree(dirpath)
shutil.copytree("vault_test_data", dirpath)
ve = VaultEditor(None, "ansible", filename)
# make sure the password functions for the cipher
error_hit = False
try:
ve.rekey_file('ansible2')
except errors.AnsibleError, e:
error_hit = True
# verify decrypted content
f = open(filename, "rb")
fdata = f.read()
f.close()
shutil.rmtree(dirpath)
assert error_hit == False, "error rekeying 1.0 file to 1.1"
# ensure filedata can be decrypted, is 1.1 and is AES256
vl = VaultLib("ansible2")
dec_data = None
error_hit = False
try:
dec_data = vl.decrypt(fdata)
except errors.AnsibleError, e:
error_hit = True
assert vl.cipher_name == "AES256", "wrong cipher name set after rekey: %s" % vl.cipher_name
assert error_hit == False, "error decrypting migrated 1.0 file"
assert dec_data.strip() == "foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data
$ANSIBLE_VAULT;1.0;AES
53616c7465645f5fd0026926a2d415a28a2622116273fbc90e377225c12a347e1daf4456d36a77f9
9ad98d59f61d06a4b66718d855f16fb7bdfe54d1ec8aeaa4d06c2dc1fa630ae1846a029877f0eeb1
83c62ffb04c2512995e815de4b4d29ed
$ANSIBLE_VAULT;1.1;AES256
62303130653266653331306264616235333735323636616539316433666463323964623162386137
3961616263373033353631316333623566303532663065310a393036623466376263393961326530
64336561613965383835646464623865663966323464653236343638373165343863623638316664
3631633031323837340a396530313963373030343933616133393566366137363761373930663833
3739
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment