diff options
-rw-r--r-- | Doc/AEAD_API.txt | 73 | ||||
-rw-r--r-- | lib/Crypto/Cipher/AES.py | 6 | ||||
-rw-r--r-- | lib/Crypto/Cipher/blockalgo.py | 168 | ||||
-rw-r--r-- | lib/Crypto/Protocol/KDF.py | 90 | ||||
-rw-r--r-- | lib/Crypto/SelfTest/Cipher/common.py | 74 | ||||
-rw-r--r-- | lib/Crypto/SelfTest/Cipher/test_AES.py | 36 | ||||
-rw-r--r-- | lib/Crypto/SelfTest/Protocol/test_KDF.py | 60 |
7 files changed, 444 insertions, 63 deletions
diff --git a/Doc/AEAD_API.txt b/Doc/AEAD_API.txt index 0dcc147..8a4c363 100644 --- a/Doc/AEAD_API.txt +++ b/Doc/AEAD_API.txt @@ -17,9 +17,13 @@ packet header. In this file, that is called "associated data" (AD) even though terminology may vary from mode to mode. +The term "MAC" is used to indicate the authentication tag generated as part +of the encryption process. The MAC is consumed by the receiver to tell +whether the message has been tampered with. + === Goals of the AEAD API -1. Any piece of data (AD, ciphertext, plaintext) is passed only once. +1. Any piece of data (AD, ciphertext, plaintext, MAC) is passed only once. 2. It is possible to encrypt/decrypt huge files withe little memory cost. To this end, authentication, encryption, and decryption are always incremental. That is, the caller may split data in any way, and the end @@ -116,7 +120,7 @@ In addition to the existing Cipher methods new(), encrypt() and decrypt(), - hexdigest() as digest(), but the output is ASCII hexadecimal. - verify() to evaluate if the binary MAC is correct. It can only be - called at the end of decryption. + called at the end of decryption. It takes the MAC as input. - hexverify() as verify(), but the input is ASCII hexadecimal. @@ -128,13 +132,13 @@ IV/nonce to never repeat, this method may be misused and lead to security holes. Since MAC validation (decryption mode) is subject to timing attacks, -the (hex)verify() method accepts as input the expected MAC and raises a -ValueError exception if it does not match. -Internally, the method can perform the validation using time-independent code. +the (hex)verify() method internally performs comparison of received +and computed MACs using time-independent code. +A ValueError exception is issued if the MAC does not match. === Padding -The proposed API only supports AEAD modes than do not require padding +The proposed API only supports AEAD modes that do not require padding of the plaintext. Adding support for that would make the API more complex (for instance, an external "finished" flag to pass encrypt()). @@ -153,8 +157,8 @@ it such results across several encryptions/decryptions. The proposed API supports single-pass, online AEAD modes. A "single-pass" mode processes each block of AD or data only once. -Compare that to "2-pass" modes, where each block of data (not AD) twice: -once for authentication and one for enciphering. +Compare that to "2-pass" modes, where each block of data (not AD) +is processed twice: once for authentication and once for enciphering. An "online" mode does not need to know the size of the message before encryption starts. As a consequence: @@ -169,12 +173,27 @@ are in widespread use. That is still OK, provided that encryption can start *before* authentication is completed. If that's the case (e.g. GCM, EAX) encrypt()/decrypt() will also take care of completing the authentication over the plaintext. -If that's NOT the case (e.g. SIV), encrypt()/decrypt() can only be called once. A similar problem arises with non-online modes (e.g. CCM): they have to wait to see the end of the plaintext before encryption can start. The only way to achieve that is to only allow encrypt()/decrypt() to be called once. +To summarize: + +Single-pass and online: + Fully supported by the API. The mode is most likely patented. + +Single-pass and not-online: + encrypt()/decrypt() can only be called once. The mode is most likely patented. + +2-pass and online: + Fully supported by the API, but only if encryption or decryption can start + *before* authentication is finished. + +2-pass and not-online: + Fully supported by the API, but only if encryption or decryption can start + *before* authentication is finished. encrypt()/decrypt() can only be called once. + === Associated Data Depending on the mode, the associated data AD can be defined as: @@ -199,6 +218,13 @@ For modes of the 2nd type (e.g. SIV), the API assumes that each call to update() ingests one full item of the vector. The two examples above are now different. +=== MAC + +During encryption, the MAC is computed and returned by digest(). + +During decryption, the received MAC is passed as a parameter to verify() +at the very end. + === CCM mode Number of keys required: 1 Compatible with ciphers: AES (may be used with others if block length @@ -206,6 +232,8 @@ Compatible with ciphers: AES (may be used with others if block length Counter/IV/nonce: 1 nonce required (length 8..13 bytes) Single-pass: no Online: no +Encryption can start before +authentication is complete: yes Term for AD: "associate data" (NIST) or "additional authenticated data" (RFC) Term for MAC: part of ciphertext (NIST) or @@ -228,40 +256,49 @@ Compatible with ciphers: AES (may be used with others if block length Counter/IV/nonce: 1 IV required (any length) Single-pass: no Online: yes +Encryption can start before +authentication is complete: yes Term for AD: "additional authenticated data" Term for MAC: "authentication tag" or "tag" Padding required: no Pre-processing of AD: possible -Encryption can start before authentication, so encrypt()/decrypt() can always -be called multiple times. - === EAX mode Number of keys required: 1 Compatible with ciphers: any Counter/IV/nonce: 1 IV required (any length) Single-pass: no Online: yes +Encryption can start before +authentication is complete: yes Term for AD: "header" Term for MAC: "tag" Padding required: no Pre-processing of AD: possible -Encryption can start before authentication, so encrypt()/decrypt() can always -be called multiple times. - === SIV Number of keys required: 2 Compatible with ciphers: AES Counter/IV/nonce: 1 IV required (any length) Single-pass: no Online: no +Encryption can start before +authentication is complete: no Term for AD: "associated data" (vector) Term for MAC: "tag" Padding required: no Pre-processing of AD: possible -Encryption can only start before authentication is ended, so encrypt()/decrypt() -an only be called once. SIV is not suitable for big files or streaming. - AD is a vector of strings. One item in the vector is the (optional) IV. + +One property of SIV is that encryption or decryption can only start +as soon as the MAC is available. + +That is not a problem for encryption: since encrypt() can only be called +once, it can internally compute the MAC first, perform the encryption, +and return the ciphertext. The MAC is cached for the subsequent call to digest(). + +The major problem is decryption: decrypt() cannot produce any output because +the MAC is meant to be passed to verify() only later. +To overcome this limitation, decrypt() *exceptionally* accepts the ciphertext +concatenated to the MAC. The MAC check is still performed with verify(). diff --git a/lib/Crypto/Cipher/AES.py b/lib/Crypto/Cipher/AES.py index 5cbd04b..0a53f54 100644 --- a/lib/Crypto/Cipher/AES.py +++ b/lib/Crypto/Cipher/AES.py @@ -120,6 +120,8 @@ def new(key, *args, **kwargs): key : byte string The secret key to use in the symmetric cipher. It must be 16 (*AES-128*), 24 (*AES-192*), or 32 (*AES-256*) bytes long. + + Only in `MODE_SIV`, it needs to be 32, 48, or 64 bytes long. :Keywords: mode : a *MODE_** constant The chaining mode to use for encryption or decryption. @@ -138,7 +140,7 @@ def new(key, *args, **kwargs): For all other modes, it must be 16 bytes long. nonce : byte string - (*Only* `MODE_CCM`, `MODE_EAX`). + (*Only* `MODE_CCM`, `MODE_EAX`, `MODE_SIV`). A mandatory value that must never be reused for any other encryption. @@ -194,6 +196,8 @@ MODE_OPENPGP = 7 MODE_CCM = 8 #: EAX Mode. See `blockalgo.MODE_EAX`. MODE_EAX = 9 +#: Syntethic Initialization Vector (SIV). See `blockalgo.MODE_SIV`. +MODE_SIV = 10 #: Size of a data block (in bytes) block_size = 16 #: Size of a key (in bytes) diff --git a/lib/Crypto/Cipher/blockalgo.py b/lib/Crypto/Cipher/blockalgo.py index be00832..aab7873 100644 --- a/lib/Crypto/Cipher/blockalgo.py +++ b/lib/Crypto/Cipher/blockalgo.py @@ -34,6 +34,7 @@ from Crypto.Util.strxor import strxor from Crypto.Util.number import long_to_bytes, bytes_to_long import Crypto.Util.Counter from Crypto.Hash import CMAC +from Crypto.Protocol.KDF import S2V #: *Electronic Code Book (ECB)*. #: This is the simplest encryption mode. Each of the plaintext blocks @@ -211,6 +212,51 @@ MODE_CCM = 8 #: .. __: http://csrc.nist.gov/groups/ST/toolkit/BCM/documents/proposedmodes/eax/eax-spec.pdf MODE_EAX = 9 +#: *Synthetic Initialization Vector*. This is an Authenticated Encryption with +#: Associated Data (`AEAD`_) mode. It provides both confidentiality and +#: authenticity. +#: The header of the message may be left in the clear, if needed, and it will +#: still be subject to authentication. The decryption step tells the receiver +#: if the message comes from a source that really knowns the secret key. +#: Additionally, decryption detects if any part of the message - including the +#: header - has been modified or corrupted. +#: +#: If the data being encrypted is completely unpredictable to an adversary +#: (e.g. a secret key, for key wrapping purposes) a nonce is not strictly +#: required. +#: +#: Otherwise, a nonce has to be provided; the nonce shall never repeat +#: for two different messages encrypted with the same key, but it does not +#: need to be random. +#: +#: Unlike other AEAD modes such as CCM, EAX or GCM, accidental reuse of a +#: nonce is not catastrophic for the confidentiality of the message. The only +#: effect is that an attacker can tell when the same plaintext (and same +#: associated data) is protected with the same key. +#: +#: The length of the MAC is fixed to the block size of the underlying cipher. +#: The key size is twice the length of the key of the underlying cipher. +#: +#: This mode is only available for AES ciphers. +#: +#: +--------------------+---------------+-------------------+ +#: | Cipher | SIV MAC size | SIV key length | +#: | | (bytes) | (bytes) | +#: +====================+===============+===================+ +#: | AES-128 | 16 | 32 | +#: +--------------------+---------------+-------------------+ +#: | AES-192 | 16 | 48 | +#: +--------------------+---------------+-------------------+ +#: | AES-256 | 16 | 64 | +#: +--------------------+---------------+-------------------+ +#: +#: See `RFC5297`_ and the `original paper`__. +#: +#: .. _RFC5297: https://tools.ietf.org/html/rfc5297 +#: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html +#: .. __: http://www.cs.ucdavis.edu/~rogaway/papers/keywrap.pdf +MODE_SIV = 10 + def _getParameter(name, index, args, kwargs, default=None): """Find a parameter in tuple and dictionary arguments a function receives""" @@ -265,10 +311,40 @@ class BlockAlgo: self._start_PGP(factory, key, *args, **kwargs) elif self.mode == MODE_EAX: self._start_eax(factory, key, *args, **kwargs) + elif self.mode == MODE_SIV: + self._start_siv(factory, key, *args, **kwargs) else: self._cipher = factory.new(key, *args, **kwargs) self.IV = self._cipher.IV + def _start_siv(self, factory, key, *args, **kwargs): + + subkey_size, rem = divmod(len(key), 2) + if rem: + raise ValueError("MODE_SIV requires a key twice as long as for the underlying cipher") + + # IV is optional + self.nonce = _getParameter('nonce', 1, args, kwargs) + + self._prf = S2V(key[:subkey_size], ciphermod=factory) + self._subkey_ctr = key[subkey_size:] + self._mac_len = factory.block_size + + # Allowed transitions after initialization + self._next = [self.update, self.encrypt, self.decrypt, + self.digest, self.verify] + + def _siv_ctr_cipher(self, tag): + """Create a new CTR cipher from the MAC in SIV mode""" + + tag_int = bytes_to_long(tag) + init_counter = tag_int ^ (tag_int & 0x8000000080000000L) + ctr = Counter.new(self._factory.block_size * 8, + initial_value=init_counter, + allow_wraparound=True) + + return self._factory.new(self._subkey_ctr, MODE_CTR, counter=ctr) + def _start_eax(self, factory, key, *args, **kwargs): self.nonce = _getParameter('nonce', 1, args, kwargs) @@ -409,9 +485,10 @@ class BlockAlgo: def update(self, assoc_data): """Protect associated data - When using an AEAD mode like CCM or EAX, and if there is any associated data, - the caller has to invoke this function one or more times, before - using ``decrypt`` or ``encrypt``. + When using an AEAD mode like CCM, EAX or SIV, and + if there is any associated data, the caller has to invoke + this function one or more times, before using + ``decrypt`` or ``encrypt``. By *associated data* it is meant any data (e.g. packet headers) that will not be encrypted and will be transmitted in the clear. @@ -429,7 +506,7 @@ class BlockAlgo: A piece of associated data. There are no restrictions on its size. """ - if self.mode in (MODE_CCM, MODE_EAX): + if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV): if self.update not in self._next: raise TypeError("update() can only be called immediately after initialization") self._next = [self.update, self.encrypt, self.decrypt, @@ -464,19 +541,30 @@ class BlockAlgo: self._omac[1].update(assoc_data) return + if self.mode == MODE_SIV: + self._prf.update(assoc_data) + return + raise ValueError("update() not supported by this mode of operation") def encrypt(self, plaintext): """Encrypt data with the key and the parameters set at initialization. - The cipher object is stateful; encryption of a long block - of data can be broken up in two or more calls to `encrypt()`. + A cipher object is stateful: once you have encrypted a message + you cannot encrypt (or decrypt) another message using the same + object. + + For `MODE_SIV` (always) and `MODE_CCM` (when ``msg_len`` was not + passed at initialization), this method can be called only **once**. + + For all other modes, the data to encrypt can be broken up in two or + more pieces and `encrypt` can be called multiple times. That is, the statement: >>> c.encrypt(a) + c.encrypt(b) - is always equivalent to: + is equivalent to: >>> c.encrypt(a+b) @@ -491,7 +579,7 @@ class BlockAlgo: - For `MODE_CFB`, *plaintext* length (in bytes) must be a multiple of *segment_size*/8. - - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM` and `MODE_EAX` + - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM`, `MODE_EAX` and `MODE_SIV` *plaintext* can be of any length. - For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*, @@ -526,7 +614,7 @@ class BlockAlgo: self._done_first_block = True return res - if self.mode in (MODE_CCM, MODE_EAX): + if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV): if self.encrypt not in self._next: raise TypeError("encrypt() can only be called after initialization or an update()") self._next = [self.encrypt, self.digest] @@ -543,6 +631,15 @@ class BlockAlgo: self._update(plaintext) + if self.mode == MODE_SIV: + self._next = [self.digest] + + if self.nonce: + self._prf.update(self.nonce) + + self._prf.update(plaintext) + self._cipher = self._siv_ctr_cipher(self._prf.derive()) + ct = self._cipher.encrypt(plaintext) if self.mode == MODE_EAX: @@ -553,14 +650,21 @@ class BlockAlgo: def decrypt(self, ciphertext): """Decrypt data with the key and the parameters set at initialization. - The cipher object is stateful; decryption of a long block - of data can be broken up in two or more calls to `decrypt()`. + A cipher object is stateful: once you have decrypted a message + you cannot decrypt (or encrypt) another message with the same + object. + + For `MODE_SIV` (always) and `MODE_CCM` (when ``msg_len`` was not + passed at initialization), this method can be called only **once**. + + For all other modes, the data to decrypt can be broken up in two or + more pieces and `decrypt` can be called multiple times. That is, the statement: >>> c.decrypt(a) + c.decrypt(b) - is always equivalent to: + is equivalent to: >>> c.decrypt(a+b) @@ -575,16 +679,20 @@ class BlockAlgo: - For `MODE_CFB`, *ciphertext* length (in bytes) must be a multiple of *segment_size*/8. - - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM`, and `MODE_EAX`, *ciphertext* can be - of any length. + - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM` and `MODE_EAX` + *ciphertext* can be of any length. - For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*, unless it is the last chunk of the message. + - For `MODE_SIV`, *ciphertext* can be of any length, but it must also + include the MAC (concatenated at the end). + :Parameters: ciphertext : byte string - The piece of data to decrypt. - :Return: the decrypted data (byte string, as long as *ciphertext*). + The piece of data to decrypt (plus the MAC, for `MODE_SIV` only). + + :Return: the decrypted data (byte string). """ if self.mode == MODE_OPENPGP: @@ -603,7 +711,7 @@ class BlockAlgo: res = self._cipher.decrypt(ciphertext) return res - if self.mode in (MODE_CCM, MODE_EAX): + if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV): if self.decrypt not in self._next: raise TypeError("decrypt() can only be called after initialization or an update()") @@ -622,11 +730,27 @@ class BlockAlgo: if self.mode == MODE_EAX: self._omac[2].update(ciphertext) + if self.mode == MODE_SIV: + self._next = [self.verify] + + # Take the MAC and start the cipher for decryption + self._mac = ciphertext[-self._factory.block_size:] + self._cipher = self._siv_ctr_cipher(self._mac) + + # Remove MAC from ciphertext + ciphertext = ciphertext[:-self._factory.block_size] + pt = self._cipher.decrypt(ciphertext) if self.mode == MODE_CCM: self._update(pt) + if self.mode == MODE_SIV: + if self.nonce: + self._prf.update(self.nonce) + if pt: + self._prf.update(pt) + return pt def digest(self): @@ -641,7 +765,7 @@ class BlockAlgo: :Return: the MAC, as a byte string. """ - if self.mode in (MODE_CCM, MODE_EAX): + if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV): if self.digest not in self._next: raise TypeError("digest() cannot be called when decrypting or validating a message") @@ -662,6 +786,9 @@ class BlockAlgo: for i in xrange(3): tag = strxor(tag, self._omac[i].digest()) + if self.mode == MODE_SIV: + tag = self._prf.derive() + return tag raise TypeError("digest() not supported by this mode of operation") @@ -693,7 +820,7 @@ class BlockAlgo: or the key is incorrect. """ - if self.mode in (MODE_CCM, MODE_EAX): + if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV): if self.verify not in self._next: raise TypeError("verify() cannot be called when encrypting a message") self._next = [self.verify] @@ -714,6 +841,9 @@ class BlockAlgo: u = strxor(u, self._omac[i].digest()) u = u[:self._mac_len] + if self.mode == MODE_SIV: + u = self._prf.derive() + res = 0 # Constant-time comparison for x,y in zip(u, mac_tag): diff --git a/lib/Crypto/Protocol/KDF.py b/lib/Crypto/Protocol/KDF.py index b13562a..0a92c15 100644 --- a/lib/Crypto/Protocol/KDF.py +++ b/lib/Crypto/Protocol/KDF.py @@ -38,16 +38,21 @@ __revision__ = "$Id$" import math import struct +import sys +if sys.version_info[0] == 2 and sys.version_info[1] == 1: + from Crypto.Util.py21compat import * from Crypto.Util.py3compat import * -from Crypto.Hash import SHA1, HMAC + +from Crypto.Hash import SHA1, HMAC, CMAC from Crypto.Util.strxor import strxor +from Crypto.Util.number import long_to_bytes, bytes_to_long def PBKDF1(password, salt, dkLen, count=1000, hashAlgo=None): """Derive one key from a password (or passphrase). This function performs key derivation according an old version of the PKCS#5 standard (v1.5). - + This algorithm is called ``PBKDF1``. Even though it is still described in the latest version of the PKCS#5 standard (version 2, or RFC2898), newer applications should use the more secure and versatile `PBKDF2` instead. @@ -121,3 +126,84 @@ def PBKDF2(password, salt, dkLen=16, count=1000, prf=None): i = i + 1 return key[:dkLen] +class S2V(object): + """String-to-vector PRF as defined in `RFC5297`_. + + This class implements a pseudorandom function family + based on CMAC that takes as input a vector of strings. + + .. _RFC5297: http://tools.ietf.org/html/rfc5297 + """ + + def __init__(self, key, ciphermod): + """Initialize the S2V PRF. + + :Parameters: + key : byte string + A secret that can be used as key for CMACs + based on ciphers from ``ciphermod``. + ciphermod : module + A block cipher module from `Crypto.Cipher`. + """ + + self._key = key + self._ciphermod = ciphermod + self._last_string = self._cache = bchr(0)*ciphermod.block_size + self._n_updates = ciphermod.block_size*8-1 + + def new(key, ciphermod): + """Create a new S2V PRF. + + :Parameters: + key : byte string + A secret that can be used as key for CMACs + based on ciphers from ``ciphermod``. + ciphermod : module + A block cipher module from `Crypto.Cipher`. + """ + return S2V(key, ciphermod) + new = staticmethod(new) + + def _double(self, bs): + doubled = bytes_to_long(bs)<<1 + if bord(bs[0]) & 0x80: + doubled ^= 0x87 + return long_to_bytes(doubled, len(bs))[-len(bs):] + + def update(self, item): + """Pass the next component of the vector. + + The maximum number of components you can pass is equal to the block + length of the cipher (in bits) minus 1. + + :Parameters: + item : byte string + The next component of the vector. + :Raise TypeError: when the limit on the number of components has been reached. + :Raise ValueError: when the component is empty + """ + + if not item: + raise ValueError("A component cannot be empty") + + if self._n_updates==0: + raise TypeError("Too many components passed to S2V") + self._n_updates -= 1 + + mac = CMAC.new(self._key, msg=self._last_string, ciphermod=self._ciphermod) + self._cache = strxor(self._double(self._cache), mac.digest()) + self._last_string = item + + def derive(self): + """"Derive a secret from the vector of components. + + :Return: a byte string, as long as the block length of the cipher. + """ + + if len(self._last_string)>=16: + final = self._last_string[:-16] + strxor(self._last_string[-16:], self._cache) + else: + padded = (self._last_string + bchr(0x80)+ bchr(0)*15)[:16] + final = strxor(padded, self._double(self._cache)) + mac = CMAC.new(self._key, msg=final, ciphermod=self._ciphermod) + return mac.digest() diff --git a/lib/Crypto/SelfTest/Cipher/common.py b/lib/Crypto/SelfTest/Cipher/common.py index 603ab54..ed612b4 100644 --- a/lib/Crypto/SelfTest/Cipher/common.py +++ b/lib/Crypto/SelfTest/Cipher/common.py @@ -29,8 +29,12 @@ from __future__ import nested_scopes __revision__ = "$Id$" import sys +if sys.version_info[0] == 2 and sys.version_info[1] == 1: + from Crypto.Util.py21compat import * + import unittest from binascii import a2b_hex, b2a_hex, hexlify + from Crypto.Util.py3compat import * from Crypto.Util.strxor import strxor_c @@ -70,8 +74,6 @@ class CipherSelfTest(unittest.TestCase): self.ciphertext = b(_extract(params, 'ciphertext')) self.module_name = _extract(params, 'module_name', None) self.assoc_data = _extract(params, 'assoc_data', None) - if self.assoc_data: - self.assoc_data = b(self.assoc_data) self.mac = _extract(params, 'mac', None) if self.assoc_data: self.mac = b(self.mac) @@ -130,12 +132,17 @@ class CipherSelfTest(unittest.TestCase): else: return self.module.new(a2b_hex(self.key), self.mode, a2b_hex(self.iv), **params) + def isMode(self, name): + if not hasattr(self.module, "MODE_"+name): + return False + return self.mode == getattr(self.module, "MODE_"+name) + def runTest(self): plaintext = a2b_hex(self.plaintext) ciphertext = a2b_hex(self.ciphertext) - assoc_data = None + assoc_data = [] if self.assoc_data: - assoc_data = a2b_hex(self.assoc_data) + assoc_data = [ a2b_hex(b(x)) for x in self.assoc_data] ct = None pt = None @@ -149,19 +156,22 @@ class CipherSelfTest(unittest.TestCase): decipher = self._new(1) # Only AEAD modes - if self.assoc_data: - cipher.update(assoc_data) - decipher.update(assoc_data) + for comp in assoc_data: + cipher.update(comp) + decipher.update(comp) ctX = b2a_hex(cipher.encrypt(plaintext)) - ptX = b2a_hex(decipher.decrypt(ciphertext)) + if self.isMode("SIV"): + ptX = b2a_hex(decipher.decrypt(ciphertext+a2b_hex(self.mac))) + else: + ptX = b2a_hex(decipher.decrypt(ciphertext)) if ct: self.assertEqual(ct, ctX) self.assertEqual(pt, ptX) ct, pt = ctX, ptX - if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP: + if self.isMode("OPENPGP"): # In PGP mode, data returned by the first encrypt() # is prefixed with the encrypted IV. # Here we check it and then remove it from the ciphertexts. @@ -387,10 +397,18 @@ class AEADTests(unittest.TestCase): self.module = module self.mode_name = mode_name self.mode = getattr(module, mode_name) - self.key = b('\xFF')*key_size + if not self.isMode("SIV"): + self.key = b('\xFF')*key_size + else: + self.key = b('\xFF')*key_size*2 self.iv = b('\x00')*10 self.description = "AEAD Test" + def isMode(self, name): + if not hasattr(self.module, "MODE_"+name): + return False + return self.mode == getattr(self.module, "MODE_"+name) + def right_mac_test(self): """Positive tests for MAC""" @@ -409,7 +427,10 @@ class AEADTests(unittest.TestCase): # Decrypt and verify that MAC is accepted decipher = self.module.new(self.key, self.mode, self.iv) decipher.update(ad_ref) - pt = decipher.decrypt(ct_ref) + if not self.isMode("SIV"): + pt = decipher.decrypt(ct_ref) + else: + pt = decipher.decrypt(ct_ref+mac_ref) decipher.verify(mac_ref) self.assertEqual(pt, pt_ref) @@ -452,6 +473,15 @@ class AEADTests(unittest.TestCase): self.description = "Test for multiple updates in %s of %s" % \ (self.mode_name, self.module.__name__) + # In all modes other than SIV, the associated data is a single + # component that can be arbitrarilly split and submitted to update(). + # + # In SIV, associated data is instead organized in a vector or multiple + # components. Each component is passed to update() as a whole. + # This test is therefore not meaningful to SIV. + if self.isMode("SIV"): + return + ad = b("").join([bchr(x) for x in xrange(0,128)]) mac1, mac2, mac3 = (None,)*3 @@ -489,23 +519,23 @@ class AEADTests(unittest.TestCase): # Calling decrypt after encrypt raises an exception cipher = self.module.new(self.key, self.mode, self.iv) - cipher.encrypt(b("PT")) - self.assertRaises(TypeError, cipher.decrypt, b("XYZ")) + cipher.encrypt(b("PT")*40) + self.assertRaises(TypeError, cipher.decrypt, b("XYZ")*40) # Calling encrypt after decrypt raises an exception cipher = self.module.new(self.key, self.mode, self.iv) - cipher.decrypt(b("CT")) - self.assertRaises(TypeError, cipher.encrypt, b("XYZ")) + cipher.decrypt(b("CT")*40) + self.assertRaises(TypeError, cipher.encrypt, b("XYZ")*40) # Calling verify after encrypt raises an exception cipher = self.module.new(self.key, self.mode, self.iv) - cipher.encrypt(b("PT")) + cipher.encrypt(b("PT")*40) self.assertRaises(TypeError, cipher.verify, b("XYZ")) self.assertRaises(TypeError, cipher.hexverify, "12") # Calling digest after decrypt raises an exception cipher = self.module.new(self.key, self.mode, self.iv) - cipher.decrypt(b("CT")) + cipher.decrypt(b("CT")*40) self.assertRaises(TypeError, cipher.digest) self.assertRaises(TypeError, cipher.hexdigest) @@ -518,13 +548,13 @@ class AEADTests(unittest.TestCase): # Calling update after encrypt raises an exception cipher = self.module.new(self.key, self.mode, self.iv) cipher.update(b("XX")) - cipher.encrypt(b("PT")) + cipher.encrypt(b("PT")*40) self.assertRaises(TypeError, cipher.update, b("XYZ")) # Calling update after decrypt raises an exception cipher = self.module.new(self.key, self.mode, self.iv) cipher.update(b("XX")) - cipher.decrypt(b("CT")) + cipher.decrypt(b("CT")*40) self.assertRaises(TypeError, cipher.update, b("XYZ")) def runTest(self): @@ -658,10 +688,10 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()): extra_tests_added = 1 # Extract associated data and MAC for AEAD modes - if p_mode in ('CCM', 'EAX'): + if p_mode in ('CCM', 'EAX', 'SIV'): assoc_data, params['plaintext'] = params['plaintext'].split('|') assoc_data2, params['ciphertext'], params['mac'] = params['ciphertext'].split('|') - params['assoc_data'] = assoc_data + params['assoc_data'] = assoc_data.split("-") params['mac_len'] = len(params['mac'])>>1 # Add the current test to the test suite @@ -687,7 +717,7 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()): CCMMACLengthTest(module), CCMSplitEncryptionTest(module), ] - for aead_mode in ("MODE_CCM","MODE_EAX"): + for aead_mode in ("MODE_CCM","MODE_EAX", "MODE_SIV"): if hasattr(module, aead_mode): key_sizes = [] try: diff --git a/lib/Crypto/SelfTest/Cipher/test_AES.py b/lib/Crypto/SelfTest/Cipher/test_AES.py index 53d60c0..89243c0 100644 --- a/lib/Crypto/SelfTest/Cipher/test_AES.py +++ b/lib/Crypto/SelfTest/Cipher/test_AES.py @@ -1759,6 +1759,42 @@ test_data = [ 'EAX spec Appendix G', dict(mode='EAX', nonce='22E7ADD93CFC6393C57EC0B3C17D6B44') ), + + # Test vectors for SIV taken from RFC5297 + # This is a list of tuples with 5 items: + # + # 1. Header + '|' + plaintext + # 2. Header + '|' + ciphertext + '|' + MAC + # 3. AES-128 key + # 4. Description + # 5. Dictionary of parameters to be passed to AES.new(). + # It must include the nonce. + # + # A "Header" is a dash ('-') separated sequece of components. + # + ( '101112131415161718191a1b1c1d1e1f2021222324252627|112233445566778899aabbccddee', + '101112131415161718191a1b1c1d1e1f2021222324252627|40c02b9690c4dc04daef7f6afe5c|' + + '85632d07c6e8f37f950acd320a2ecc93', + 'fffefdfcfbfaf9f8f7f6f5f4f3f2f1f0f0f1f2f3f4f5f6f7f8f9fafbfcfdfeff', + 'RFC5297 A.1', + dict(mode='SIV', nonce=None) + ), + + ( '00112233445566778899aabbccddeeffdeaddadadeaddadaffeeddccbbaa9988' + + '7766554433221100-102030405060708090a0|' + + '7468697320697320736f6d6520706c61696e7465787420746f20656e63727970' + + '74207573696e67205349562d414553', + + '00112233445566778899aabbccddeeffdeaddadadeaddadaffeeddccbbaa9988' + + '7766554433221100-102030405060708090a0|' + + 'cb900f2fddbe404326601965c889bf17dba77ceb094fa663b7a3f748ba8af829' + + 'ea64ad544a272e9c485b62a3fd5c0d|' + + '7bdb6e3b432667eb06f4d14bff2fbd0f', + + '7f7e7d7c7b7a79787776757473727170404142434445464748494a4b4c4d4e4f', + 'RFC5297 A.2', + dict(mode='SIV', nonce='09f911029d74e35bd84156c5635688c0') + ), ] def get_tests(config={}): diff --git a/lib/Crypto/SelfTest/Protocol/test_KDF.py b/lib/Crypto/SelfTest/Protocol/test_KDF.py index 75bb8f2..37cfb55 100644 --- a/lib/Crypto/SelfTest/Protocol/test_KDF.py +++ b/lib/Crypto/SelfTest/Protocol/test_KDF.py @@ -29,8 +29,9 @@ from Crypto.Util.py3compat import * from Crypto.SelfTest.st_common import list_test_cases from Crypto.Hash import SHA1, HMAC +from Crypto.Cipher import AES, DES3 -from Crypto.Protocol.KDF import PBKDF1, PBKDF2 +from Crypto.Protocol.KDF import PBKDF1, PBKDF2, S2V def t2b(t): return unhexlify(b(t)) @@ -87,10 +88,67 @@ class PBKDF2_Tests(unittest.TestCase): self.assertEqual(res, t2b(v[4])) self.assertEqual(res, res2) +class S2V_Tests(unittest.TestCase): + + # Sequence of test vectors. + # Each test vector is made up by: + # Item #0: a tuple of strings + # Item #1: an AES key + # Item #2: the result + # Item #3: the cipher module S2V is based on + # Everything is hex encoded + _testData = [ + + # RFC5297, A.1 + ( + ( '101112131415161718191a1b1c1d1e1f2021222324252627', + '112233445566778899aabbccddee' ), + 'fffefdfcfbfaf9f8f7f6f5f4f3f2f1f0', + '85632d07c6e8f37f950acd320a2ecc93', + AES + ), + + # RFC5297, A.2 + ( + ( '00112233445566778899aabbccddeeffdeaddadadeaddadaffeeddcc'+ + 'bbaa99887766554433221100', + '102030405060708090a0', + '09f911029d74e35bd84156c5635688c0', + '7468697320697320736f6d6520706c61'+ + '696e7465787420746f20656e63727970'+ + '74207573696e67205349562d414553'), + '7f7e7d7c7b7a79787776757473727170', + '7bdb6e3b432667eb06f4d14bff2fbd0f', + AES + ), + + ] + + def test1(self): + """Verify correctness of test vector""" + for tv in self._testData: + s2v = S2V.new(t2b(tv[1]), tv[3]) + for s in tv[0]: + s2v.update(t2b(s)) + result = s2v.derive() + self.assertEqual(result, t2b(tv[2])) + + def test2(self): + """Verify that no more than 127(AES) and 63(TDES) + components are accepted.""" + key = bchr(0)*16 + for module in (AES, DES3): + s2v = S2V.new(key, module) + max_comps = module.block_size*8-1 + for i in xrange(max_comps): + s2v.update(b("XX")) + self.assertRaises(TypeError, s2v.update, b("YY")) + def get_tests(config={}): tests = [] tests += list_test_cases(PBKDF1_Tests) tests += list_test_cases(PBKDF2_Tests) + tests += list_test_cases(S2V_Tests) return tests if __name__ == '__main__': |