diff options
Diffstat (limited to 'lib/Crypto/Cipher/blockalgo.py')
-rw-r--r-- | lib/Crypto/Cipher/blockalgo.py | 171 |
1 files changed, 132 insertions, 39 deletions
diff --git a/lib/Crypto/Cipher/blockalgo.py b/lib/Crypto/Cipher/blockalgo.py index 57738c4..be00832 100644 --- a/lib/Crypto/Cipher/blockalgo.py +++ b/lib/Crypto/Cipher/blockalgo.py @@ -31,7 +31,9 @@ from binascii import unhexlify from Crypto.Util import Counter from Crypto.Util.strxor import strxor -from Crypto.Util.number import long_to_bytes +from Crypto.Util.number import long_to_bytes, bytes_to_long +import Crypto.Util.Counter +from Crypto.Hash import CMAC #: *Electronic Code Book (ECB)*. #: This is the simplest encryption mode. Each of the plaintext blocks @@ -184,6 +186,31 @@ MODE_OPENPGP = 7 #: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html MODE_CCM = 8 +#: *EAX*. This is an Authenticated Encryption with Associated Data +#: (`AEAD`_) mode. It provides both confidentiality and authenticity. +#: +#: The header of the message may be left in the clear, if needed, and it will +#: still be subject to authentication. +#: +#: The decryption step tells the receiver if the message comes from a source +#: that really knowns the secret key. +#: Additionally, decryption detects if any part of the message - including the +#: header - has been modified or corrupted. +#: +#: This mode requires a nonce. The nonce shall never repeat for two +#: different messages encrypted with the same key, but it does not need to +#: be random. +# +#: This mode is only available for ciphers that operate on 64 or +#: 128 bits blocks. +#: +#: There are no official standards defining EAX. The implementation is based on +#: `a proposal`__ that was presented to NIST. +#: +#: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html +#: .. __: http://csrc.nist.gov/groups/ST/toolkit/BCM/documents/proposedmodes/eax/eax-spec.pdf +MODE_EAX = 9 + def _getParameter(name, index, args, kwargs, default=None): """Find a parameter in tuple and dictionary arguments a function receives""" @@ -236,10 +263,44 @@ class BlockAlgo: elif self.mode == MODE_OPENPGP: self._start_PGP(factory, key, *args, **kwargs) + elif self.mode == MODE_EAX: + self._start_eax(factory, key, *args, **kwargs) else: self._cipher = factory.new(key, *args, **kwargs) self.IV = self._cipher.IV + def _start_eax(self, factory, key, *args, **kwargs): + + self.nonce = _getParameter('nonce', 1, args, kwargs) + if not self.nonce: + raise ValueError("MODE_EAX requires a nonce") + + # Allowed transitions after initialization + self._next = [self.update, self.encrypt, self.decrypt, + self.digest, self.verify] + + self._mac_len = kwargs.get('mac_len', self.block_size) + if not (self._mac_len and 4 <= self._mac_len <= self.block_size): + raise ValueError("Parameter 'mac_len' must not be larger than %d" + % self.block_size) + + self._omac = [ + CMAC.new(key, bchr(0) * (self.block_size - 1) + bchr(i), + ciphermod=factory) + for i in xrange(0, 3) + ] + + # Compute MAC of nonce + self._omac[0].update(self.nonce) + + # MAC of the nonce is also the initial counter for CTR encryption + counter_int = bytes_to_long(self._omac[0].digest()) + counter_obj = Crypto.Util.Counter.new( + self.block_size * 8, + initial_value=counter_int, + allow_wraparound=True) + self._cipher = factory.new(key, MODE_CTR, counter=counter_obj) + def _start_PGP(self, factory, key, *args, **kwargs): # OPENPGP mode. For details, see 13.9 in RCC4880. # @@ -348,7 +409,7 @@ class BlockAlgo: def update(self, assoc_data): """Protect associated data - When using an AEAD mode like CCM, and if there is any associated data, + When using an AEAD mode like CCM or EAX, and if there is any associated data, the caller has to invoke this function one or more times, before using ``decrypt`` or ``encrypt``. @@ -356,7 +417,7 @@ class BlockAlgo: will not be encrypted and will be transmitted in the clear. However, the receiver is still able to detect any modification to it. In CCM, the *associated data* is also called *additional authenticated - data*. + data*. In EAX, the *associated data* is called *header*. If there is no associated data, this method must not be called. @@ -368,11 +429,11 @@ class BlockAlgo: A piece of associated data. There are no restrictions on its size. """ - if self.mode == MODE_CCM: + if self.mode in (MODE_CCM, MODE_EAX): if self.update not in self._next: raise TypeError("update() can only be called immediately after initialization") - self._next = [ self.update, self.encrypt, self.decrypt, - self.digest, self.verify ] + self._next = [self.update, self.encrypt, self.decrypt, + self.digest, self.verify] return self._update(assoc_data) def _update(self, assoc_data, do_zero_padding=False): @@ -398,6 +459,11 @@ class BlockAlgo: self._assoc_buffer = [buf[aligned_data:]] self._assoc_buffer_len -= aligned_data return + + if self.mode == MODE_EAX: + self._omac[1].update(assoc_data) + return + raise ValueError("update() not supported by this mode of operation") def encrypt(self, plaintext): @@ -425,8 +491,8 @@ class BlockAlgo: - For `MODE_CFB`, *plaintext* length (in bytes) must be a multiple of *segment_size*/8. - - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM` *plaintext* can be - of any length. + - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM` and `MODE_EAX` + *plaintext* can be of any length. - For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*, unless it is the last chunk of the message. @@ -460,12 +526,12 @@ class BlockAlgo: self._done_first_block = True return res - if self.mode == MODE_CCM: - + if self.mode in (MODE_CCM, MODE_EAX): if self.encrypt not in self._next: raise TypeError("encrypt() can only be called after initialization or an update()") self._next = [self.encrypt, self.digest] + if self.mode == MODE_CCM: if self._assoc_len is None: self._start_ccm(assoc_len=self._assoc_buffer_len) if self._msg_len is None: @@ -477,7 +543,12 @@ class BlockAlgo: self._update(plaintext) - return self._cipher.encrypt(plaintext) + ct = self._cipher.encrypt(plaintext) + + if self.mode == MODE_EAX: + self._omac[2].update(ct) + + return ct def decrypt(self, ciphertext): """Decrypt data with the key and the parameters set at initialization. @@ -504,7 +575,7 @@ class BlockAlgo: - For `MODE_CFB`, *ciphertext* length (in bytes) must be a multiple of *segment_size*/8. - - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM`, *ciphertext* can be + - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM`, and `MODE_EAX`, *ciphertext* can be of any length. - For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*, @@ -532,20 +603,24 @@ class BlockAlgo: res = self._cipher.decrypt(ciphertext) return res - if self.mode == MODE_CCM: + if self.mode in (MODE_CCM, MODE_EAX): if self.decrypt not in self._next: raise TypeError("decrypt() can only be called after initialization or an update()") self._next = [self.decrypt, self.verify] - if self._assoc_len is None: - self._start_ccm(assoc_len=self._assoc_buffer_len) - if self._msg_len is None: - self._start_ccm(msg_len=len(ciphertext)) - self._next = [self.verify] - if not self._done_assoc_data: - self._update(b(""), do_zero_padding=True) - self._done_assoc_data = True + if self.mode == MODE_CCM: + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=len(ciphertext)) + self._next = [self.verify] + if not self._done_assoc_data: + self._update(b(""), do_zero_padding=True) + self._done_assoc_data = True + + if self.mode == MODE_EAX: + self._omac[2].update(ciphertext) pt = self._cipher.decrypt(ciphertext) @@ -557,8 +632,8 @@ class BlockAlgo: def digest(self): """Compute the *binary* MAC tag in an AEAD mode. - When using an AEAD mode like CCM, the caller invokes this function - at the very end. + When using an AEAD mode like CCM or EAX, the caller invokes + this function at the very end. This method returns the MAC that shall be sent to the receiver, together with the ciphertext. @@ -566,19 +641,28 @@ class BlockAlgo: :Return: the MAC, as a byte string. """ - if self.mode == MODE_CCM: + if self.mode in (MODE_CCM, MODE_EAX): if self.digest not in self._next: raise TypeError("digest() cannot be called when decrypting or validating a message") self._next = [self.digest] - if self._assoc_len is None: - self._start_ccm(assoc_len=self._assoc_buffer_len) - if self._msg_len is None: - self._start_ccm(msg_len=0) - self._update(b(""), do_zero_padding=True) + if self.mode == MODE_CCM: + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=0) + self._update(b(""), do_zero_padding=True) + tag = strxor(self._t, self._s_0)[:self._mac_len] + + if self.mode == MODE_EAX: - return strxor(self._t, self._s_0)[:self._mac_len] + tag = bchr(0) * self.block_size + for i in xrange(3): + tag = strxor(tag, self._omac[i].digest()) + + return tag raise TypeError("digest() not supported by this mode of operation") @@ -594,8 +678,8 @@ class BlockAlgo: def verify(self, mac_tag): """Validate the *binary* MAC tag in an AEAD mode. - When using an AEAD mode like CCM, the caller invokes this function - at the very end. + When using an AEAD mode like CCM or EAX, the caller invokes + this function at the very end. This method checks if the decrypted message is indeed valid (that is, if the key is correct) and it has not been @@ -609,17 +693,26 @@ class BlockAlgo: or the key is incorrect. """ - if self.mode == MODE_CCM: + if self.mode in (MODE_CCM, MODE_EAX): if self.verify not in self._next: raise TypeError("verify() cannot be called when encrypting a message") self._next = [self.verify] - if self._assoc_len is None: - self._start_ccm(assoc_len=self._assoc_buffer_len) - if self._msg_len is None: - self._start_ccm(msg_len=0) - self._update(b(""), do_zero_padding=True) - u = strxor(self._t, self._s_0)[:self._mac_len] + if self.mode == MODE_CCM: + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=0) + self._update(b(""), do_zero_padding=True) + u = strxor(self._t, self._s_0)[:self._mac_len] + + if self.mode == MODE_EAX: + + u = bchr(0)*self.block_size + for i in xrange(3): + u = strxor(u, self._omac[i].digest()) + u = u[:self._mac_len] res = 0 # Constant-time comparison |