summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2018-08-02 15:05:20 +0300
committerGitHub <noreply@github.com>2018-08-02 15:05:20 +0300
commit79d679883f0b198511ea5eeaaf53f1f625e8d938 (patch)
tree52e9bb0bd2dd016b76d9e1dcd89fbba8c1edae87
parentd5e4e1b386306fb1e4118ae7bdf52a459328a18f (diff)
parent0e6aab4a3d6ae62c7f0791c7c4e85f93eba2958e (diff)
downloadpysaml2-79d679883f0b198511ea5eeaaf53f1f625e8d938.tar.gz
Merge pull request #519 from c00kiemon5ter/fix-aes-ctr-ecb-CVE-2017-1000246
Fix AES IV reuse - drop support for CTR and ECB - address CVE-2017-1000246
-rw-r--r--src/saml2/aes.py56
-rw-r--r--src/saml2/authn.py2
-rw-r--r--src/saml2/server.py3
-rw-r--r--tests/test_92_aes.py74
4 files changed, 83 insertions, 52 deletions
diff --git a/src/saml2/aes.py b/src/saml2/aes.py
index ee6a944a..54808e67 100644
--- a/src/saml2/aes.py
+++ b/src/saml2/aes.py
@@ -11,36 +11,27 @@ from cryptography.hazmat.primitives.ciphers import modes
POSTFIX_MODE = {
'cbc': modes.CBC,
'cfb': modes.CFB,
- 'ecb': modes.ECB,
}
AES_BLOCK_SIZE = int(algorithms.AES.block_size / 8)
class AESCipher(object):
- def __init__(self, key, iv=None):
+ def __init__(self, key):
"""
:param key: The encryption key
- :param iv: Init vector
:return: AESCipher instance
"""
self.key = key
- self.iv = iv
- def build_cipher(self, iv=None, alg='aes_128_cbc'):
+ def build_cipher(self, alg='aes_128_cbc'):
"""
- :param iv: init vector
:param alg: cipher algorithm
:return: A Cipher instance
"""
typ, bits, cmode = alg.lower().split('_')
bits = int(bits)
-
- if not iv:
- if self.iv:
- iv = self.iv
- else:
- iv = os.urandom(AES_BLOCK_SIZE)
+ iv = os.urandom(AES_BLOCK_SIZE)
if len(iv) != AES_BLOCK_SIZE:
raise Exception('Wrong iv size: {}'.format(len(iv)))
@@ -63,11 +54,10 @@ class AESCipher(object):
return cipher, iv
- def encrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',
- b64enc=True, block_size=AES_BLOCK_SIZE):
+ def encrypt(self, msg, alg='aes_128_cbc', padding='PKCS#7', b64enc=True,
+ block_size=AES_BLOCK_SIZE):
"""
:param key: The encryption key
- :param iv: init vector
:param msg: Message to be encrypted
:param padding: Which padding that should be used
:param b64enc: Whether the result should be base64encoded
@@ -87,7 +77,7 @@ class AESCipher(object):
c = chr(plen).encode()
msg += c * plen
- cipher, iv = self.build_cipher(iv, alg)
+ cipher, iv = self.build_cipher(alg)
encryptor = cipher.encryptor()
cmsg = iv + encryptor.update(msg) + encryptor.finalize()
@@ -98,48 +88,18 @@ class AESCipher(object):
return enc_msg
- def decrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',
- b64dec=True):
+ def decrypt(self, msg, alg='aes_128_cbc', padding='PKCS#7', b64dec=True):
"""
:param key: The encryption key
- :param iv: init vector
:param msg: Base64 encoded message to be decrypted
:return: The decrypted message
"""
data = b64decode(msg) if b64dec else msg
- _iv = data[:AES_BLOCK_SIZE]
- if iv:
- assert iv == _iv
- cipher, iv = self.build_cipher(iv, alg=alg)
+ cipher, iv = self.build_cipher(alg=alg)
decryptor = cipher.decryptor()
res = decryptor.update(data)[AES_BLOCK_SIZE:] + decryptor.finalize()
if padding in ['PKCS#5', 'PKCS#7']:
idx = bytearray(res)[-1]
res = res[:-idx]
return res
-
-
-def run_test():
- key = b'1234523451234545' # 16 byte key
- iv = os.urandom(AES_BLOCK_SIZE)
- # Iff padded, the message doesn't have to be multiple of 16 in length
- original_msg = b'ToBeOrNotTobe W.S.'
- aes = AESCipher(key)
-
- encrypted_msg = aes.encrypt(original_msg, iv)
- decrypted_msg = aes.decrypt(encrypted_msg, iv)
- assert decrypted_msg == original_msg
-
- encrypted_msg = aes.encrypt(original_msg)
- decrypted_msg = aes.decrypt(encrypted_msg)
- assert decrypted_msg == original_msg
-
- aes = AESCipher(key, iv)
- encrypted_msg = aes.encrypt(original_msg)
- decrypted_msg = aes.decrypt(encrypted_msg)
- assert decrypted_msg == original_msg
-
-
-if __name__ == '__main__':
- run_test()
diff --git a/src/saml2/authn.py b/src/saml2/authn.py
index 32f91247..049622e7 100644
--- a/src/saml2/authn.py
+++ b/src/saml2/authn.py
@@ -120,7 +120,7 @@ class UsernamePasswordMako(UserAuthnMethod):
self.return_to = return_to
self.active = {}
self.query_param = "upm_answer"
- self.aes = AESCipher(self.srv.symkey.encode(), srv.iv)
+ self.aes = AESCipher(self.srv.symkey.encode())
def __call__(self, cookie=None, policy_url=None, logo_url=None,
query="", **kwargs):
diff --git a/src/saml2/server.py b/src/saml2/server.py
index 0e7e0403..0a2943f2 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -83,12 +83,9 @@ class Server(Entity):
self.init_config(stype)
self.cache = cache
self.ticket = {}
- #
self.session_db = self.choose_session_storage()
- # Needed for
self.symkey = symkey
self.seed = rndstr()
- self.iv = os.urandom(16)
self.lock = threading.Lock()
def getvalid_certificate_str(self):
diff --git a/tests/test_92_aes.py b/tests/test_92_aes.py
new file mode 100644
index 00000000..564260bc
--- /dev/null
+++ b/tests/test_92_aes.py
@@ -0,0 +1,74 @@
+import os
+
+import saml2.aes
+
+
+class TestAES():
+ def test_aes_defaults(self):
+ original_msg = b'ToBeOrNotTobe W.S.'
+ key = os.urandom(16)
+ aes = saml2.aes.AESCipher(key)
+
+ encrypted_msg = aes.encrypt(original_msg)
+ decrypted_msg = aes.decrypt(encrypted_msg)
+ assert decrypted_msg == original_msg
+
+ def test_aes_128_cbc(self):
+ original_msg = b'ToBeOrNotTobe W.S.'
+ key = os.urandom(16)
+ aes = saml2.aes.AESCipher(key)
+ alg = 'aes_128_cbc'
+
+ encrypted_msg = aes.encrypt(original_msg, alg=alg)
+ decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
+ assert decrypted_msg == original_msg
+
+ def test_aes_128_cfb(self):
+ original_msg = b'ToBeOrNotTobe W.S.'
+ key = os.urandom(16)
+ aes = saml2.aes.AESCipher(key)
+ alg = 'aes_128_cfb'
+
+ encrypted_msg = aes.encrypt(original_msg, alg=alg)
+ decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
+ assert decrypted_msg == original_msg
+
+ def test_aes_192_cbc(self):
+ original_msg = b'ToBeOrNotTobe W.S.'
+ key = os.urandom(24)
+ aes = saml2.aes.AESCipher(key)
+ alg = 'aes_192_cbc'
+
+ encrypted_msg = aes.encrypt(original_msg, alg=alg)
+ decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
+ assert decrypted_msg == original_msg
+
+ def test_aes_192_cfb(self):
+ original_msg = b'ToBeOrNotTobe W.S.'
+ key = os.urandom(24)
+ aes = saml2.aes.AESCipher(key)
+ alg = 'aes_192_cfb'
+
+ encrypted_msg = aes.encrypt(original_msg, alg=alg)
+ decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
+ assert decrypted_msg == original_msg
+
+ def test_aes_256_cbc(self):
+ original_msg = b'ToBeOrNotTobe W.S.'
+ key = os.urandom(32)
+ aes = saml2.aes.AESCipher(key)
+ alg = 'aes_256_cbc'
+
+ encrypted_msg = aes.encrypt(original_msg, alg=alg)
+ decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
+ assert decrypted_msg == original_msg
+
+ def test_aes_256_cfb(self):
+ original_msg = b'ToBeOrNotTobe W.S.'
+ key = os.urandom(32)
+ aes = saml2.aes.AESCipher(key)
+ alg = 'aes_256_cfb'
+
+ encrypted_msg = aes.encrypt(original_msg, alg=alg)
+ decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
+ assert decrypted_msg == original_msg