summaryrefslogtreecommitdiff
path: root/test/units/parsing/vault/test_vault.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/units/parsing/vault/test_vault.py')
-rw-r--r--test/units/parsing/vault/test_vault.py136
1 files changed, 87 insertions, 49 deletions
diff --git a/test/units/parsing/vault/test_vault.py b/test/units/parsing/vault/test_vault.py
index e4a25f0a16..c657e82936 100644
--- a/test/units/parsing/vault/test_vault.py
+++ b/test/units/parsing/vault/test_vault.py
@@ -26,7 +26,7 @@ import io
import os
from binascii import hexlify
-from nose.plugins.skip import SkipTest
+import pytest
from ansible.compat.tests import unittest
@@ -37,28 +37,6 @@ from ansible.parsing.vault import VaultLib
from ansible.parsing import vault
-# Counter import fails for 2.0.1, requires >= 2.6.1 from pip
-try:
- from Crypto.Util import Counter
- HAS_COUNTER = True
-except ImportError:
- HAS_COUNTER = False
-
-# KDF import fails for 2.0.1, requires >= 2.6.1 from pip
-try:
- from Crypto.Protocol.KDF import PBKDF2
- HAS_PBKDF2 = True
-except ImportError:
- HAS_PBKDF2 = False
-
-# AES IMPORTS
-try:
- from Crypto.Cipher import AES as AES
- HAS_AES = True
-except ImportError:
- HAS_AES = False
-
-
class TestVaultIsEncrypted(unittest.TestCase):
def test_bytes_not_encrypted(self):
b_data = b"foobar"
@@ -151,6 +129,8 @@ class TestVaultIsEncryptedFile(unittest.TestCase):
self.assertTrue(vault.is_encrypted_file(b_data_fo, start_pos=4, count=vault_length))
+@pytest.mark.skipif(not vault.HAS_CRYPTOGRAPHY,
+ reason="Skipping cryptography tests because cryptography is not installed")
class TestVaultCipherAes256(unittest.TestCase):
def setUp(self):
self.vault_cipher = vault.VaultAES256()
@@ -159,26 +139,71 @@ class TestVaultCipherAes256(unittest.TestCase):
self.assertIsInstance(self.vault_cipher, vault.VaultAES256)
# TODO: tag these as slow tests
- def test_create_key(self):
+ def test_create_key_cryptography(self):
b_password = b'hunter42'
b_salt = os.urandom(32)
- b_key = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
- self.assertIsInstance(b_key, six.binary_type)
+ b_key_cryptography = self.vault_cipher._create_key_cryptography(b_password, b_salt, key_length=32, iv_length=16)
+ self.assertIsInstance(b_key_cryptography, six.binary_type)
+
+ @pytest.mark.skipif(not vault.HAS_PYCRYPTO, reason='Not testing pycrypto key as pycrypto is not installed')
+ def test_create_key_pycrypto(self):
+ b_password = b'hunter42'
+ b_salt = os.urandom(32)
+
+ b_key_pycrypto = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
+ self.assertIsInstance(b_key_pycrypto, six.binary_type)
+
+ @pytest.mark.skipif(not vault.HAS_PYCRYPTO,
+ reason='Not comparing cryptography key to pycrypto key as pycrypto is not installed')
+ def test_compare_new_keys(self):
+ b_password = b'hunter42'
+ b_salt = os.urandom(32)
+ b_key_cryptography = self.vault_cipher._create_key_cryptography(b_password, b_salt, key_length=32, iv_length=16)
+
+ b_key_pycrypto = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
+ self.assertEqual(b_key_cryptography, b_key_pycrypto)
+
+ def test_create_key_known_cryptography(self):
+ b_password = b'hunter42'
+
+ # A fixed salt
+ b_salt = b'q' * 32 # q is the most random letter.
+ b_key_1 = self.vault_cipher._create_key_cryptography(b_password, b_salt, key_length=32, iv_length=16)
+ self.assertIsInstance(b_key_1, six.binary_type)
- def test_create_key_known(self):
+ # verify we get the same answer
+ # we could potentially run a few iterations of this and time it to see if it's roughly constant time
+ # and or that it exceeds some minimal time, but that would likely cause unreliable fails, esp in CI
+ b_key_2 = self.vault_cipher._create_key_cryptography(b_password, b_salt, key_length=32, iv_length=16)
+ self.assertIsInstance(b_key_2, six.binary_type)
+ self.assertEqual(b_key_1, b_key_2)
+
+ # And again with pycrypto
+ b_key_3 = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
+ self.assertIsInstance(b_key_3, six.binary_type)
+
+ # verify we get the same answer
+ # we could potentially run a few iterations of this and time it to see if it's roughly constant time
+ # and or that it exceeds some minimal time, but that would likely cause unreliable fails, esp in CI
+ b_key_4 = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
+ self.assertIsInstance(b_key_4, six.binary_type)
+ self.assertEqual(b_key_3, b_key_4)
+ self.assertEqual(b_key_1, b_key_4)
+
+ def test_create_key_known_pycrypto(self):
b_password = b'hunter42'
# A fixed salt
b_salt = b'q' * 32 # q is the most random letter.
- b_key = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
- self.assertIsInstance(b_key, six.binary_type)
+ b_key_3 = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
+ self.assertIsInstance(b_key_3, six.binary_type)
# verify we get the same answer
# we could potentially run a few iterations of this and time it to see if it's roughly constant time
# and or that it exceeds some minimal time, but that would likely cause unreliable fails, esp in CI
- b_key_2 = self.vault_cipher._create_key(b_password, b_salt, keylength=32, ivlength=16)
- self.assertIsInstance(b_key, six.binary_type)
- self.assertEqual(b_key, b_key_2)
+ b_key_4 = self.vault_cipher._create_key_pycrypto(b_password, b_salt, key_length=32, iv_length=16)
+ self.assertIsInstance(b_key_4, six.binary_type)
+ self.assertEqual(b_key_3, b_key_4)
def test_is_equal_is_equal(self):
self.assertTrue(self.vault_cipher._is_equal(b'abcdefghijklmnopqrstuvwxyz', b'abcdefghijklmnopqrstuvwxyz'))
@@ -213,6 +238,21 @@ class TestVaultCipherAes256(unittest.TestCase):
self.assertRaises(TypeError, self.vault_cipher._is_equal, b"blue fish", 2)
+@pytest.mark.skipif(not vault.HAS_PYCRYPTO,
+ reason="Skipping Pycrypto tests because pycrypto is not installed")
+class TestVaultCipherAes256PyCrypto(TestVaultCipherAes256):
+ def setUp(self):
+ self.has_cryptography = vault.HAS_CRYPTOGRAPHY
+ vault.HAS_CRYPTOGRAPHY = False
+ super(TestVaultCipherAes256PyCrypto, self).setUp()
+
+ def tearDown(self):
+ vault.HAS_CRYPTOGRAPHY = self.has_cryptography
+ super(TestVaultCipherAes256PyCrypto, self).tearDown()
+
+
+@pytest.mark.skipif(not vault.HAS_CRYPTOGRAPHY,
+ reason="Skipping cryptography tests because cryptography is not installed")
class TestVaultLib(unittest.TestCase):
def setUp(self):
self.v = VaultLib('test-vault-password')
@@ -266,8 +306,6 @@ class TestVaultLib(unittest.TestCase):
self.assertEqual(self.v.b_version, b"9.9", msg="version was not properly set")
def test_encrypt_decrypt_aes(self):
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
self.v.cipher_name = u'AES'
self.v.b_password = b'ansible'
# AES encryption code has been removed, so this is old output for
@@ -281,8 +319,6 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
self.assertEqual(b_plaintext, b"foobar", msg="decryption failed")
def test_encrypt_decrypt_aes256(self):
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
self.v.cipher_name = u'AES256'
plaintext = u"foobar"
b_vaulttext = self.v.encrypt(plaintext)
@@ -291,8 +327,6 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
self.assertEqual(b_plaintext, b"foobar", msg="decryption failed")
def test_encrypt_decrypt_aes256_existing_vault(self):
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
self.v.cipher_name = u'AES256'
b_orig_plaintext = b"Setec Astronomy"
vaulttext = u'''$ANSIBLE_VAULT;1.1;AES256
@@ -309,12 +343,10 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
b_plaintext = self.v.decrypt(b_vaulttext)
self.assertEqual(b_plaintext, b_orig_plaintext, msg="decryption failed")
+ # FIXME This test isn't working quite yet.
+ @pytest.mark.skip(reason='This test is not ready yet')
def test_encrypt_decrypt_aes256_bad_hmac(self):
- # FIXME This test isn't working quite yet.
- raise SkipTest
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
self.v.cipher_name = 'AES256'
# plaintext = "Setec Astronomy"
enc_data = '''$ANSIBLE_VAULT;1.1;AES256
@@ -349,8 +381,6 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
self.v.decrypt(b_invalid_ciphertext)
def test_encrypt_encrypted(self):
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
self.v.cipher_name = u'AES'
b_vaulttext = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible")
vaulttext = to_text(b_vaulttext, errors='strict')
@@ -358,8 +388,6 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
self.assertRaises(errors.AnsibleError, self.v.encrypt, vaulttext)
def test_decrypt_decrypted(self):
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
plaintext = u"ansible"
self.assertRaises(errors.AnsibleError, self.v.decrypt, plaintext)
@@ -367,9 +395,19 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e
self.assertRaises(errors.AnsibleError, self.v.decrypt, b_plaintext)
def test_cipher_not_set(self):
- # not setting the cipher should default to AES256
- if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2:
- raise SkipTest
plaintext = u"ansible"
self.v.encrypt(plaintext)
self.assertEquals(self.v.cipher_name, "AES256")
+
+
+@pytest.mark.skipif(not vault.HAS_PYCRYPTO,
+ reason="Skipping Pycrypto tests because pycrypto is not installed")
+class TestVaultLibPyCrypto(TestVaultLib):
+ def setUp(self):
+ self.has_cryptography = vault.HAS_CRYPTOGRAPHY
+ vault.HAS_CRYPTOGRAPHY = False
+ super(TestVaultLibPyCrypto, self).setUp()
+
+ def tearDown(self):
+ vault.HAS_CRYPTOGRAPHY = self.has_cryptography
+ super(TestVaultLibPyCrypto, self).tearDown()