summaryrefslogtreecommitdiff
path: root/lib/Crypto/Cipher/blockalgo.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Crypto/Cipher/blockalgo.py')
-rw-r--r--lib/Crypto/Cipher/blockalgo.py171
1 files changed, 132 insertions, 39 deletions
diff --git a/lib/Crypto/Cipher/blockalgo.py b/lib/Crypto/Cipher/blockalgo.py
index 57738c4..be00832 100644
--- a/lib/Crypto/Cipher/blockalgo.py
+++ b/lib/Crypto/Cipher/blockalgo.py
@@ -31,7 +31,9 @@ from binascii import unhexlify
from Crypto.Util import Counter
from Crypto.Util.strxor import strxor
-from Crypto.Util.number import long_to_bytes
+from Crypto.Util.number import long_to_bytes, bytes_to_long
+import Crypto.Util.Counter
+from Crypto.Hash import CMAC
#: *Electronic Code Book (ECB)*.
#: This is the simplest encryption mode. Each of the plaintext blocks
@@ -184,6 +186,31 @@ MODE_OPENPGP = 7
#: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html
MODE_CCM = 8
+#: *EAX*. This is an Authenticated Encryption with Associated Data
+#: (`AEAD`_) mode. It provides both confidentiality and authenticity.
+#:
+#: The header of the message may be left in the clear, if needed, and it will
+#: still be subject to authentication.
+#:
+#: The decryption step tells the receiver if the message comes from a source
+#: that really knowns the secret key.
+#: Additionally, decryption detects if any part of the message - including the
+#: header - has been modified or corrupted.
+#:
+#: This mode requires a nonce. The nonce shall never repeat for two
+#: different messages encrypted with the same key, but it does not need to
+#: be random.
+#
+#: This mode is only available for ciphers that operate on 64 or
+#: 128 bits blocks.
+#:
+#: There are no official standards defining EAX. The implementation is based on
+#: `a proposal`__ that was presented to NIST.
+#:
+#: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html
+#: .. __: http://csrc.nist.gov/groups/ST/toolkit/BCM/documents/proposedmodes/eax/eax-spec.pdf
+MODE_EAX = 9
+
def _getParameter(name, index, args, kwargs, default=None):
"""Find a parameter in tuple and dictionary arguments a function receives"""
@@ -236,10 +263,44 @@ class BlockAlgo:
elif self.mode == MODE_OPENPGP:
self._start_PGP(factory, key, *args, **kwargs)
+ elif self.mode == MODE_EAX:
+ self._start_eax(factory, key, *args, **kwargs)
else:
self._cipher = factory.new(key, *args, **kwargs)
self.IV = self._cipher.IV
+ def _start_eax(self, factory, key, *args, **kwargs):
+
+ self.nonce = _getParameter('nonce', 1, args, kwargs)
+ if not self.nonce:
+ raise ValueError("MODE_EAX requires a nonce")
+
+ # Allowed transitions after initialization
+ self._next = [self.update, self.encrypt, self.decrypt,
+ self.digest, self.verify]
+
+ self._mac_len = kwargs.get('mac_len', self.block_size)
+ if not (self._mac_len and 4 <= self._mac_len <= self.block_size):
+ raise ValueError("Parameter 'mac_len' must not be larger than %d"
+ % self.block_size)
+
+ self._omac = [
+ CMAC.new(key, bchr(0) * (self.block_size - 1) + bchr(i),
+ ciphermod=factory)
+ for i in xrange(0, 3)
+ ]
+
+ # Compute MAC of nonce
+ self._omac[0].update(self.nonce)
+
+ # MAC of the nonce is also the initial counter for CTR encryption
+ counter_int = bytes_to_long(self._omac[0].digest())
+ counter_obj = Crypto.Util.Counter.new(
+ self.block_size * 8,
+ initial_value=counter_int,
+ allow_wraparound=True)
+ self._cipher = factory.new(key, MODE_CTR, counter=counter_obj)
+
def _start_PGP(self, factory, key, *args, **kwargs):
# OPENPGP mode. For details, see 13.9 in RCC4880.
#
@@ -348,7 +409,7 @@ class BlockAlgo:
def update(self, assoc_data):
"""Protect associated data
- When using an AEAD mode like CCM, and if there is any associated data,
+ When using an AEAD mode like CCM or EAX, and if there is any associated data,
the caller has to invoke this function one or more times, before
using ``decrypt`` or ``encrypt``.
@@ -356,7 +417,7 @@ class BlockAlgo:
will not be encrypted and will be transmitted in the clear.
However, the receiver is still able to detect any modification to it.
In CCM, the *associated data* is also called *additional authenticated
- data*.
+ data*. In EAX, the *associated data* is called *header*.
If there is no associated data, this method must not be called.
@@ -368,11 +429,11 @@ class BlockAlgo:
A piece of associated data. There are no restrictions on its size.
"""
- if self.mode == MODE_CCM:
+ if self.mode in (MODE_CCM, MODE_EAX):
if self.update not in self._next:
raise TypeError("update() can only be called immediately after initialization")
- self._next = [ self.update, self.encrypt, self.decrypt,
- self.digest, self.verify ]
+ self._next = [self.update, self.encrypt, self.decrypt,
+ self.digest, self.verify]
return self._update(assoc_data)
def _update(self, assoc_data, do_zero_padding=False):
@@ -398,6 +459,11 @@ class BlockAlgo:
self._assoc_buffer = [buf[aligned_data:]]
self._assoc_buffer_len -= aligned_data
return
+
+ if self.mode == MODE_EAX:
+ self._omac[1].update(assoc_data)
+ return
+
raise ValueError("update() not supported by this mode of operation")
def encrypt(self, plaintext):
@@ -425,8 +491,8 @@ class BlockAlgo:
- For `MODE_CFB`, *plaintext* length (in bytes) must be a multiple
of *segment_size*/8.
- - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM` *plaintext* can be
- of any length.
+ - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM` and `MODE_EAX`
+ *plaintext* can be of any length.
- For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*,
unless it is the last chunk of the message.
@@ -460,12 +526,12 @@ class BlockAlgo:
self._done_first_block = True
return res
- if self.mode == MODE_CCM:
-
+ if self.mode in (MODE_CCM, MODE_EAX):
if self.encrypt not in self._next:
raise TypeError("encrypt() can only be called after initialization or an update()")
self._next = [self.encrypt, self.digest]
+ if self.mode == MODE_CCM:
if self._assoc_len is None:
self._start_ccm(assoc_len=self._assoc_buffer_len)
if self._msg_len is None:
@@ -477,7 +543,12 @@ class BlockAlgo:
self._update(plaintext)
- return self._cipher.encrypt(plaintext)
+ ct = self._cipher.encrypt(plaintext)
+
+ if self.mode == MODE_EAX:
+ self._omac[2].update(ct)
+
+ return ct
def decrypt(self, ciphertext):
"""Decrypt data with the key and the parameters set at initialization.
@@ -504,7 +575,7 @@ class BlockAlgo:
- For `MODE_CFB`, *ciphertext* length (in bytes) must be a multiple
of *segment_size*/8.
- - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM`, *ciphertext* can be
+ - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM`, and `MODE_EAX`, *ciphertext* can be
of any length.
- For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*,
@@ -532,20 +603,24 @@ class BlockAlgo:
res = self._cipher.decrypt(ciphertext)
return res
- if self.mode == MODE_CCM:
+ if self.mode in (MODE_CCM, MODE_EAX):
if self.decrypt not in self._next:
raise TypeError("decrypt() can only be called after initialization or an update()")
self._next = [self.decrypt, self.verify]
- if self._assoc_len is None:
- self._start_ccm(assoc_len=self._assoc_buffer_len)
- if self._msg_len is None:
- self._start_ccm(msg_len=len(ciphertext))
- self._next = [self.verify]
- if not self._done_assoc_data:
- self._update(b(""), do_zero_padding=True)
- self._done_assoc_data = True
+ if self.mode == MODE_CCM:
+ if self._assoc_len is None:
+ self._start_ccm(assoc_len=self._assoc_buffer_len)
+ if self._msg_len is None:
+ self._start_ccm(msg_len=len(ciphertext))
+ self._next = [self.verify]
+ if not self._done_assoc_data:
+ self._update(b(""), do_zero_padding=True)
+ self._done_assoc_data = True
+
+ if self.mode == MODE_EAX:
+ self._omac[2].update(ciphertext)
pt = self._cipher.decrypt(ciphertext)
@@ -557,8 +632,8 @@ class BlockAlgo:
def digest(self):
"""Compute the *binary* MAC tag in an AEAD mode.
- When using an AEAD mode like CCM, the caller invokes this function
- at the very end.
+ When using an AEAD mode like CCM or EAX, the caller invokes
+ this function at the very end.
This method returns the MAC that shall be sent to the receiver,
together with the ciphertext.
@@ -566,19 +641,28 @@ class BlockAlgo:
:Return: the MAC, as a byte string.
"""
- if self.mode == MODE_CCM:
+ if self.mode in (MODE_CCM, MODE_EAX):
if self.digest not in self._next:
raise TypeError("digest() cannot be called when decrypting or validating a message")
self._next = [self.digest]
- if self._assoc_len is None:
- self._start_ccm(assoc_len=self._assoc_buffer_len)
- if self._msg_len is None:
- self._start_ccm(msg_len=0)
- self._update(b(""), do_zero_padding=True)
+ if self.mode == MODE_CCM:
+
+ if self._assoc_len is None:
+ self._start_ccm(assoc_len=self._assoc_buffer_len)
+ if self._msg_len is None:
+ self._start_ccm(msg_len=0)
+ self._update(b(""), do_zero_padding=True)
+ tag = strxor(self._t, self._s_0)[:self._mac_len]
+
+ if self.mode == MODE_EAX:
- return strxor(self._t, self._s_0)[:self._mac_len]
+ tag = bchr(0) * self.block_size
+ for i in xrange(3):
+ tag = strxor(tag, self._omac[i].digest())
+
+ return tag
raise TypeError("digest() not supported by this mode of operation")
@@ -594,8 +678,8 @@ class BlockAlgo:
def verify(self, mac_tag):
"""Validate the *binary* MAC tag in an AEAD mode.
- When using an AEAD mode like CCM, the caller invokes this function
- at the very end.
+ When using an AEAD mode like CCM or EAX, the caller invokes
+ this function at the very end.
This method checks if the decrypted message is indeed valid
(that is, if the key is correct) and it has not been
@@ -609,17 +693,26 @@ class BlockAlgo:
or the key is incorrect.
"""
- if self.mode == MODE_CCM:
+ if self.mode in (MODE_CCM, MODE_EAX):
if self.verify not in self._next:
raise TypeError("verify() cannot be called when encrypting a message")
self._next = [self.verify]
- if self._assoc_len is None:
- self._start_ccm(assoc_len=self._assoc_buffer_len)
- if self._msg_len is None:
- self._start_ccm(msg_len=0)
- self._update(b(""), do_zero_padding=True)
- u = strxor(self._t, self._s_0)[:self._mac_len]
+ if self.mode == MODE_CCM:
+
+ if self._assoc_len is None:
+ self._start_ccm(assoc_len=self._assoc_buffer_len)
+ if self._msg_len is None:
+ self._start_ccm(msg_len=0)
+ self._update(b(""), do_zero_padding=True)
+ u = strxor(self._t, self._s_0)[:self._mac_len]
+
+ if self.mode == MODE_EAX:
+
+ u = bchr(0)*self.block_size
+ for i in xrange(3):
+ u = strxor(u, self._omac[i].digest())
+ u = u[:self._mac_len]
res = 0
# Constant-time comparison