summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorLegrandin <helderijs@gmail.com>2013-05-28 23:57:56 +0200
committerDwayne Litzenberger <dlitz@dlitz.net>2013-10-20 13:30:21 -0700
commit5d7ab24c513fb43a604ad23b23d040a53069c4db (patch)
treeb024034c16e14454e94756690c88b9acb52e0422 /lib
parent199a9741a1849066d070b114333fcf90bc73c55a (diff)
downloadpycrypto-5d7ab24c513fb43a604ad23b23d040a53069c4db.tar.gz
Add support for GCM mode (AES only).
The main change done by this commit is adding support for MODE_GCM (NIST SP 800 38D). Test vectors are included. The mode uses a C extension (Crypto.Util.galois._ghash) to compute the GHASH step. The C implementation is the most basic one and it is still significantly (5x times) slower than CTR. Optimizations can be introduced using tables (CPU/memory trade-off) or even AES NI instructions on newer x86 CPUs. This patch also simplifies Crypto.Cipher.blockalgo.py by: * removing duplicated code previously shared by digest() and verify(). * removing duplicated code previously shared by Crypto.Hash.CMAC and Crypto.Cipher.block_algo (management of internal buffers for MACs that can only operate on block aligned data, like CMAC, CBCMAC, and now also GHASH). [dlitz@dlitz.net: Included changes from the following commits from the author's pull request:] - [9c13f9c] Rename 'IV' parameter to 'nonce' for AEAD modes. - [ca460a7] Made blockalgo.py more PEP-8 compliant; The second parameter of the _GHASH constructor is now the length of the block (block_size) and not the full module. [dlitz@dlitz.net: Replaced MacMismatchError with ValueError] [dlitz@dlitz.net: Replaced ApiUsageError with TypeError] [dlitz@dlitz.net: Replaced renamed variable `ht` with original `h`] [dlitz@dlitz.net: Whitespace fixed with "git rebase --whitespace=fix"]
Diffstat (limited to 'lib')
-rw-r--r--lib/Crypto/Cipher/AES.py6
-rw-r--r--lib/Crypto/Cipher/blockalgo.py353
-rw-r--r--lib/Crypto/Hash/CMAC.py135
-rw-r--r--lib/Crypto/SelfTest/Cipher/common.py4
-rw-r--r--lib/Crypto/SelfTest/Cipher/test_AES.py197
5 files changed, 537 insertions, 158 deletions
diff --git a/lib/Crypto/Cipher/AES.py b/lib/Crypto/Cipher/AES.py
index 0a53f54..b18b7d0 100644
--- a/lib/Crypto/Cipher/AES.py
+++ b/lib/Crypto/Cipher/AES.py
@@ -140,7 +140,7 @@ def new(key, *args, **kwargs):
For all other modes, it must be 16 bytes long.
nonce : byte string
- (*Only* `MODE_CCM`, `MODE_EAX`, `MODE_SIV`).
+ (*Only* `MODE_CCM`, `MODE_EAX`, `MODE_GCM`, `MODE_SIV`).
A mandatory value that must never be reused for any other encryption.
@@ -163,7 +163,7 @@ def new(key, *args, **kwargs):
(*Only* `MODE_CCM`). Length of the MAC, in bytes. It must be even and in
the range ``[4..16]``. The default is 16.
- (*Only* `MODE_EAX`). Length of the MAC, in bytes. It must be no
+ (*Only* `MODE_EAX` and `MODE_GCM`). Length of the MAC, in bytes. It must be no
larger than 16 bytes (which is the default).
msg_len : integer
(*Only* `MODE_CCM`). Length of the message to (de)cipher.
@@ -198,6 +198,8 @@ MODE_CCM = 8
MODE_EAX = 9
#: Syntethic Initialization Vector (SIV). See `blockalgo.MODE_SIV`.
MODE_SIV = 10
+#: Galois Counter Mode (GCM). See `blockalgo.MODE_GCM`.
+MODE_GCM = 11
#: Size of a data block (in bytes)
block_size = 16
#: Size of a key (in bytes)
diff --git a/lib/Crypto/Cipher/blockalgo.py b/lib/Crypto/Cipher/blockalgo.py
index aab7873..7ab892a 100644
--- a/lib/Crypto/Cipher/blockalgo.py
+++ b/lib/Crypto/Cipher/blockalgo.py
@@ -34,8 +34,11 @@ from Crypto.Util.strxor import strxor
from Crypto.Util.number import long_to_bytes, bytes_to_long
import Crypto.Util.Counter
from Crypto.Hash import CMAC
+from Crypto.Hash.CMAC import _SmoothMAC
from Crypto.Protocol.KDF import S2V
+from Crypto.Util.galois import _ghash
+
#: *Electronic Code Book (ECB)*.
#: This is the simplest encryption mode. Each of the plaintext blocks
#: is directly encrypted into a ciphertext block, independently of
@@ -257,6 +260,28 @@ MODE_EAX = 9
#: .. __: http://www.cs.ucdavis.edu/~rogaway/papers/keywrap.pdf
MODE_SIV = 10
+#: *Galois/Counter Mode (GCM)*. This is an Authenticated Encryption with
+#: Associated Data (`AEAD`_) mode. It provides both confidentiality and
+#: authenticity.
+#: The header of the message may be left in the clear, if needed, and it will
+#: still be subject to authentication. The decryption step tells the receiver
+#: if the message comes from a source that really knowns the secret key.
+#: Additionally, decryption detects if any part of the message - including the
+#: header - has been modified or corrupted.
+#:
+#: This mode requires a nonce. The nonce shall never repeat for two
+#: different messages encrypted with the same key, but it does not need to
+#: be random.
+#:
+#: This mode is only available for ciphers that operate on 128 bits blocks
+#: (e.g. AES but not TDES).
+#:
+#: See `NIST SP800-38D`_ .
+#:
+#: .. _`NIST SP800-38D`: http://csrc.nist.gov/publications/nistpubs/800-38D/SP-800-38D.pdf
+#: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html
+MODE_GCM = 11
+
def _getParameter(name, index, args, kwargs, default=None):
"""Find a parameter in tuple and dictionary arguments a function receives"""
@@ -269,6 +294,60 @@ def _getParameter(name, index, args, kwargs, default=None):
return param or default
+class _CBCMAC(_SmoothMAC):
+
+ def __init__(self, key, ciphermod):
+ _SmoothMAC.__init__(self, ciphermod.block_size, None, 0)
+ self._key = key
+ self._factory = ciphermod
+
+ def ignite(self, data):
+ if self._mac:
+ raise TypeError("ignite() cannot be called twice")
+
+ self._buffer.insert(0, data)
+ self._buffer_len += len(data)
+ self._mac = self._factory.new(self._key, MODE_CBC, bchr(0) * 16)
+ self.update(b(""))
+
+ def _update(self, block_data):
+ self._t = self._mac.encrypt(block_data)[-16:]
+
+ def _digest(self, left_data):
+ return self._t
+
+
+class _GHASH(_SmoothMAC):
+ """GHASH function defined in NIST SP 800-38D, Algorithm 2.
+
+ If X_1, X_2, .. X_m are the blocks of input data, the function
+ computes:
+
+ X_1*H^{m} + X_2*H^{m-1} + ... + X_m*H
+
+ in the Galois field GF(2^256) using the reducing polynomial
+ (x^128 + x^7 + x^2 + x + 1).
+ """
+
+ def __init__(self, hash_subkey, block_size):
+ _SmoothMAC.__init__(self, block_size, None, 0)
+ self._hash_subkey = hash_subkey
+ self._last_y = bchr(0) * 16
+ self._mac = _ghash
+
+ def copy(self):
+ clone = _GHASH(self._hash_subkey, self._bs)
+ _SmoothMAC._deep_copy(self, clone)
+ clone._last_y = self._last_y
+ return clone
+
+ def _update(self, block_data):
+ self._last_y = _ghash(block_data, self._last_y, self._hash_subkey)
+
+ def _digest(self, left_data):
+ return self._last_y
+
+
class BlockAlgo:
"""Class modelling an abstract block cipher."""
@@ -276,6 +355,7 @@ class BlockAlgo:
self.mode = _getParameter('mode', 0, args, kwargs, default=MODE_ECB)
self.block_size = factory.block_size
self._factory = factory
+ self._tag = None
if self.mode == MODE_CCM:
if self.block_size != 16:
@@ -294,9 +374,7 @@ class BlockAlgo:
self._msg_len = kwargs.get('msg_len', None) # p
self._assoc_len = kwargs.get('assoc_len', None) # a
- self._assoc_buffer = []
- self._assoc_buffer_len = 0
- self._cipherCBC = None # To be used for MAC
+ self._cipherMAC = _CBCMAC(key, factory)
self._done_assoc_data = False # True when all associated data
# has been processed
@@ -313,10 +391,63 @@ class BlockAlgo:
self._start_eax(factory, key, *args, **kwargs)
elif self.mode == MODE_SIV:
self._start_siv(factory, key, *args, **kwargs)
+ elif self.mode == MODE_GCM:
+ self._start_gcm(factory, key, *args, **kwargs)
else:
self._cipher = factory.new(key, *args, **kwargs)
self.IV = self._cipher.IV
+ def _start_gcm(self, factory, key, *args, **kwargs):
+
+ if self.block_size != 16:
+ raise ValueError("GCM mode is only available for ciphers that operate on 128 bits blocks")
+
+ self.nonce = _getParameter('nonce', 1, args, kwargs)
+ if not self.nonce:
+ raise ValueError("MODE_GCM requires a nonce")
+
+ self._mac_len = kwargs.get('mac_len', 16)
+ if not (self._mac_len and 4 <= self._mac_len <= 16):
+ raise ValueError("Parameter 'mac_len' must not be larger than 16 bytes")
+
+ # Allowed transitions after initialization
+ self._next = [self.update, self.encrypt, self.decrypt,
+ self.digest, self.verify]
+
+ self._done_assoc_data = False
+
+ # Length of the ciphertext or plaintext
+ self._msg_len = 0
+
+ # Step 1 in SP800-38D, Algorithm 4 (encryption) - Compute H
+ # See also Algorithm 5 (decryption)
+ hash_subkey = factory.new(key).encrypt(bchr(0) * 16)
+
+ # Step 2 - Compute J0 (integer, not byte string!)
+ if len(self.nonce) == 12:
+ self._j0 = bytes_to_long(self.nonce + b("\x00\x00\x00\x01"))
+ else:
+ fill = (16 - (len(self.nonce) % 16)) % 16 + 8
+ ghash_in = (self.nonce +
+ bchr(0) * fill +
+ long_to_bytes(8 * len(self.nonce), 8))
+
+ mac = _GHASH(hash_subkey, factory.block_size)
+ mac.update(ghash_in)
+ self._j0 = bytes_to_long(mac.digest())
+
+ # Step 3 - Prepare GCTR cipher for encryption/decryption
+ ctr = Counter.new(128, initial_value=self._j0 + 1,
+ allow_wraparound=True)
+ self._cipher = self._factory.new(key, MODE_CTR, counter=ctr)
+
+ # Step 5 - Bootstrat GHASH
+ self._cipherMAC = _GHASH(hash_subkey, factory.block_size)
+
+ # Step 6 - Prepare GCTR cipher for GMAC
+ ctr = Counter.new(128, initial_value=self._j0, allow_wraparound=True)
+ self._tag_cipher = self._factory.new(key, MODE_CTR, counter=ctr)
+
def _start_siv(self, factory, key, *args, **kwargs):
subkey_size, rem = divmod(len(key), 2)
@@ -326,10 +457,12 @@ class BlockAlgo:
# IV is optional
self.nonce = _getParameter('nonce', 1, args, kwargs)
- self._prf = S2V(key[:subkey_size], ciphermod=factory)
+ self._cipherMAC = S2V(key[:subkey_size], ciphermod=factory)
self._subkey_ctr = key[subkey_size:]
self._mac_len = factory.block_size
+ self._cipherMAC = self._cipherMAC
+
# Allowed transitions after initialization
self._next = [self.update, self.encrypt, self.decrypt,
self.digest, self.verify]
@@ -369,6 +502,8 @@ class BlockAlgo:
# Compute MAC of nonce
self._omac[0].update(self.nonce)
+ self._cipherMAC = self._omac[1]
+
# MAC of the nonce is also the initial counter for CTR encryption
counter_int = bytes_to_long(self._omac[0].digest())
counter_obj = Crypto.Util.Counter.new(
@@ -429,12 +564,12 @@ class BlockAlgo:
def _start_ccm(self, assoc_len=None, msg_len=None):
# CCM mode. This method creates the 2 ciphers used for the MAC
- # (self._cipherCBC) and for the encryption/decryption (self._cipher).
+ # (self._cipherMAC) and for the encryption/decryption (self._cipher).
#
# Member _assoc_buffer may already contain user data that needs to be
# authenticated.
- if self._cipherCBC:
+ if self._cipherMAC.can_reduce():
# Already started
return
if assoc_len is not None:
@@ -454,12 +589,8 @@ class BlockAlgo:
(q - 1)
)
b_0 = bchr(flags) + self.nonce + long_to_bytes(self._msg_len, q)
- self._assoc_buffer.insert(0, b_0)
- self._assoc_buffer_len += 16
# Start CBC MAC with zero IV
- # Mind that self._assoc_buffer may already contain some data
- self._cipherCBC = self._factory.new(self._key, MODE_CBC, bchr(0)*16)
assoc_len_encoded = b('')
if self._assoc_len > 0:
if self._assoc_len < (2 ** 16 - 2 ** 8):
@@ -471,12 +602,10 @@ class BlockAlgo:
assoc_len_encoded = b('\xFF\xFF')
enc_size = 8
assoc_len_encoded += long_to_bytes(self._assoc_len, enc_size)
- self._assoc_buffer.insert(1, assoc_len_encoded)
- self._assoc_buffer_len += len(assoc_len_encoded)
+ self._cipherMAC.ignite(b_0 + assoc_len_encoded)
# Start CTR cipher
- flags = q - 1
- prefix = bchr(flags) + self.nonce
+ prefix = bchr(q - 1) + self.nonce
ctr = Counter.new(128 - len(prefix) * 8, prefix, initial_value=0)
self._cipher = self._factory.new(self._key, MODE_CTR, counter=ctr)
# Will XOR against CBC MAC
@@ -485,7 +614,7 @@ class BlockAlgo:
def update(self, assoc_data):
"""Protect associated data
- When using an AEAD mode like CCM, EAX or SIV, and
+ When using an AEAD mode like CCM, EAX, GCM or SIV, and
if there is any associated data, the caller has to invoke
this function one or more times, before using
``decrypt`` or ``encrypt``.
@@ -493,8 +622,9 @@ class BlockAlgo:
By *associated data* it is meant any data (e.g. packet headers) that
will not be encrypted and will be transmitted in the clear.
However, the receiver is still able to detect any modification to it.
- In CCM, the *associated data* is also called *additional authenticated
- data*. In EAX, the *associated data* is called *header*.
+ In CCM and GCM, the *associated data* is also called
+ *additional authenticated data* (AAD).
+ In EAX, the *associated data* is called *header*.
If there is no associated data, this method must not be called.
@@ -506,46 +636,16 @@ class BlockAlgo:
A piece of associated data. There are no restrictions on its size.
"""
- if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
- if self.update not in self._next:
- raise TypeError("update() can only be called immediately after initialization")
- self._next = [self.update, self.encrypt, self.decrypt,
- self.digest, self.verify]
- return self._update(assoc_data)
-
- def _update(self, assoc_data, do_zero_padding=False):
- """Equivalent to update(), but without FSM checks."""
+ if self.mode not in (MODE_CCM, MODE_EAX, MODE_SIV, MODE_GCM):
+ raise ValueError("update() not supported by this mode of operation")
- if self.mode == MODE_CCM:
- self._assoc_buffer.append(assoc_data)
- self._assoc_buffer_len += len(assoc_data)
-
- if not self._cipherCBC:
- return
-
- if do_zero_padding and (self._assoc_buffer_len & 15):
- npad = 16 - self._assoc_buffer_len & 15
- self._assoc_buffer.append(bchr(0) * npad)
- self._assoc_buffer_len += npad
-
- # Feed data into CBC MAC
- aligned_data = 16 * divmod(self._assoc_buffer_len, 16)[0]
- if aligned_data > 0:
- buf = b("").join(self._assoc_buffer)
- self._t = self._cipherCBC.encrypt(buf[:aligned_data])[-16:]
- self._assoc_buffer = [buf[aligned_data:]]
- self._assoc_buffer_len -= aligned_data
- return
+ if self.update not in self._next:
+ raise TypeError("update() can only be called immediately after initialization")
- if self.mode == MODE_EAX:
- self._omac[1].update(assoc_data)
- return
-
- if self.mode == MODE_SIV:
- self._prf.update(assoc_data)
- return
+ self._next = [self.update, self.encrypt, self.decrypt,
+ self.digest, self.verify]
- raise ValueError("update() not supported by this mode of operation")
+ return self._cipherMAC.update(assoc_data)
def encrypt(self, plaintext):
"""Encrypt data with the key and the parameters set at initialization.
@@ -579,7 +679,7 @@ class BlockAlgo:
- For `MODE_CFB`, *plaintext* length (in bytes) must be a multiple
of *segment_size*/8.
- - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM`, `MODE_EAX` and `MODE_SIV`
+ - For `MODE_OFB`, `MODE_CTR` and all AEAD modes
*plaintext* can be of any length.
- For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*,
@@ -614,37 +714,44 @@ class BlockAlgo:
self._done_first_block = True
return res
- if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
+ if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV, MODE_GCM):
if self.encrypt not in self._next:
raise TypeError("encrypt() can only be called after initialization or an update()")
self._next = [self.encrypt, self.digest]
if self.mode == MODE_CCM:
if self._assoc_len is None:
- self._start_ccm(assoc_len=self._assoc_buffer_len)
+ self._start_ccm(assoc_len=self._cipherMAC.get_len())
if self._msg_len is None:
self._start_ccm(msg_len=len(plaintext))
self._next = [self.digest]
if not self._done_assoc_data:
- self._update(b(""), do_zero_padding=True)
+ self._cipherMAC.zero_pad()
self._done_assoc_data = True
- self._update(plaintext)
+ self._cipherMAC.update(plaintext)
if self.mode == MODE_SIV:
self._next = [self.digest]
if self.nonce:
- self._prf.update(self.nonce)
+ self._cipherMAC.update(self.nonce)
- self._prf.update(plaintext)
- self._cipher = self._siv_ctr_cipher(self._prf.derive())
+ self._cipherMAC.update(plaintext)
+ self._cipher = self._siv_ctr_cipher(self._cipherMAC.derive())
ct = self._cipher.encrypt(plaintext)
if self.mode == MODE_EAX:
self._omac[2].update(ct)
+ if self.mode == MODE_GCM:
+ if not self._done_assoc_data:
+ self._cipherMAC.zero_pad()
+ self._done_assoc_data = True
+ self._cipherMAC.update(ct)
+ self._msg_len += len(plaintext)
+
return ct
def decrypt(self, ciphertext):
@@ -679,7 +786,7 @@ class BlockAlgo:
- For `MODE_CFB`, *ciphertext* length (in bytes) must be a multiple
of *segment_size*/8.
- - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM` and `MODE_EAX`
+ - For `MODE_OFB`, `MODE_CTR` and all AEAD modes
*ciphertext* can be of any length.
- For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*,
@@ -711,7 +818,7 @@ class BlockAlgo:
res = self._cipher.decrypt(ciphertext)
return res
- if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
+ if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV, MODE_GCM):
if self.decrypt not in self._next:
raise TypeError("decrypt() can only be called after initialization or an update()")
@@ -719,14 +826,22 @@ class BlockAlgo:
if self.mode == MODE_CCM:
if self._assoc_len is None:
- self._start_ccm(assoc_len=self._assoc_buffer_len)
+ self._start_ccm(assoc_len=self._cipherMAC.get_len())
if self._msg_len is None:
self._start_ccm(msg_len=len(ciphertext))
self._next = [self.verify]
if not self._done_assoc_data:
- self._update(b(""), do_zero_padding=True)
+ self._cipherMAC.zero_pad()
+ self._done_assoc_data = True
+
+ if self.mode == MODE_GCM:
+ if not self._done_assoc_data:
+ self._cipherMAC.zero_pad()
self._done_assoc_data = True
+ self._cipherMAC.update(ciphertext)
+ self._msg_len += len(ciphertext)
+
if self.mode == MODE_EAX:
self._omac[2].update(ciphertext)
@@ -743,13 +858,13 @@ class BlockAlgo:
pt = self._cipher.decrypt(ciphertext)
if self.mode == MODE_CCM:
- self._update(pt)
+ self._cipherMAC.update(pt)
if self.mode == MODE_SIV:
if self.nonce:
- self._prf.update(self.nonce)
+ self._cipherMAC.update(self.nonce)
if pt:
- self._prf.update(pt)
+ self._cipherMAC.update(pt)
return pt
@@ -765,33 +880,53 @@ class BlockAlgo:
:Return: the MAC, as a byte string.
"""
- if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
+ if self.mode not in (MODE_CCM, MODE_EAX, MODE_SIV, MODE_GCM):
+ raise TypeError("digest() not supported by this mode of operation")
- if self.digest not in self._next:
- raise TypeError("digest() cannot be called when decrypting or validating a message")
- self._next = [self.digest]
+ if self.digest not in self._next:
+ raise TypeError("digest() cannot be called when decrypting or validating a message")
+ self._next = [self.digest]
- if self.mode == MODE_CCM:
+ return self._compute_mac()
- if self._assoc_len is None:
- self._start_ccm(assoc_len=self._assoc_buffer_len)
- if self._msg_len is None:
- self._start_ccm(msg_len=0)
- self._update(b(""), do_zero_padding=True)
- tag = strxor(self._t, self._s_0)[:self._mac_len]
+ def _compute_mac(self):
+ """Compute MAC without any FSM checks."""
- if self.mode == MODE_EAX:
+ if self._tag:
+ return self._tag
- tag = bchr(0) * self.block_size
- for i in xrange(3):
- tag = strxor(tag, self._omac[i].digest())
+ if self.mode == MODE_CCM:
- if self.mode == MODE_SIV:
- tag = self._prf.derive()
+ if self._assoc_len is None:
+ self._start_ccm(assoc_len=self._cipherMAC.get_len())
+ if self._msg_len is None:
+ self._start_ccm(msg_len=0)
+ self._cipherMAC.zero_pad()
+ self._tag = strxor(self._cipherMAC.digest(),
+ self._s_0)[:self._mac_len]
+
+ if self.mode == MODE_GCM:
- return tag
+ # Step 5 in NIST SP 800-38D, Algorithm 4 - Compute S
+ self._cipherMAC.zero_pad()
+ auth_len = self._cipherMAC.get_len() - self._msg_len
+ for tlen in (auth_len, self._msg_len):
+ self._cipherMAC.update(long_to_bytes(8 * tlen, 8))
+ s_tag = self._cipherMAC.digest()
- raise TypeError("digest() not supported by this mode of operation")
+ # Step 6 - Compute T
+ self._tag = self._tag_cipher.encrypt(s_tag)[:self._mac_len]
+
+ if self.mode == MODE_EAX:
+ tag = bchr(0) * self.block_size
+ for i in xrange(3):
+ tag = strxor(tag, self._omac[i].digest())
+ self._tag = tag[:self._mac_len]
+
+ if self.mode == MODE_SIV:
+ self._tag = self._cipherMAC.derive()
+
+ return self._tag
def hexdigest(self):
"""Compute the *printable* MAC tag in an AEAD mode.
@@ -820,39 +955,19 @@ class BlockAlgo:
or the key is incorrect.
"""
- if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
- if self.verify not in self._next:
- raise TypeError("verify() cannot be called when encrypting a message")
- self._next = [self.verify]
-
- if self.mode == MODE_CCM:
-
- if self._assoc_len is None:
- self._start_ccm(assoc_len=self._assoc_buffer_len)
- if self._msg_len is None:
- self._start_ccm(msg_len=0)
- self._update(b(""), do_zero_padding=True)
- u = strxor(self._t, self._s_0)[:self._mac_len]
-
- if self.mode == MODE_EAX:
-
- u = bchr(0)*self.block_size
- for i in xrange(3):
- u = strxor(u, self._omac[i].digest())
- u = u[:self._mac_len]
+ if self.mode not in (MODE_CCM, MODE_EAX, MODE_SIV, MODE_GCM):
+ raise TypeError("verify() not supported by this mode of operation")
- if self.mode == MODE_SIV:
- u = self._prf.derive()
-
- res = 0
- # Constant-time comparison
- for x,y in zip(u, mac_tag):
- res |= bord(x) ^ bord(y)
- if res or len(mac_tag)!=self._mac_len:
- raise ValueError("MAC check failed")
- return
+ if self.verify not in self._next:
+ raise TypeError("verify() cannot be called when encrypting a message")
+ self._next = [self.verify]
- raise TypeError("verify() not supported by this mode of operation")
+ res = 0
+ # Constant-time comparison
+ for x, y in zip(self._compute_mac(), mac_tag):
+ res |= bord(x) ^ bord(y)
+ if res or len(mac_tag) != self._mac_len:
+ raise ValueError("MAC check failed")
def hexverify(self, hex_mac_tag):
"""Validate the *printable* MAC tag in an AEAD mode.
diff --git a/lib/Crypto/Hash/CMAC.py b/lib/Crypto/Hash/CMAC.py
index 1e3cf53..ac86729 100644
--- a/lib/Crypto/Hash/CMAC.py
+++ b/lib/Crypto/Hash/CMAC.py
@@ -79,7 +79,88 @@ def _shift_bytes(bs, xor_lsb=0):
num = (bytes_to_long(bs)<<1) ^ xor_lsb
return long_to_bytes(num, len(bs))[-len(bs):]
-class CMAC(object):
+class _SmoothMAC(object):
+ """Turn a MAC that only operates on aligned blocks of data
+ into a MAC with granularity of 1 byte."""
+
+ def __init__(self, block_size, msg=b(""), min_digest=0):
+ self._bs = block_size
+ #: Data waiting to be MAC-ed
+ self._buffer = []
+ self._buffer_len = 0
+ #: Data received via update()
+ self._total_len = 0
+ #: Minimum amount of bytes required by the final digest step
+ self._min_digest = min_digest
+ #: Block MAC object
+ self._mac = None
+ #: Cached digest
+ self._tag = None
+ if msg:
+ self.update(msg)
+
+ def can_reduce(self):
+ return (self._mac is not None)
+
+ def get_len(self):
+ return self._total_len
+
+ def zero_pad(self):
+ if self._buffer_len & (self._bs-1):
+ npad = self._bs - self._buffer_len & (self._bs-1)
+ self._buffer.append(bchr(0)*npad)
+ self._buffer_len += npad
+
+ def update(self, data):
+ # Optimization (try not to copy data if possible)
+ if self._buffer_len==0 and self.can_reduce() and\
+ self._min_digest==0 and len(data)%self._bs==0:
+ self._update(data)
+ self._total_len += len(data)
+ return
+
+ self._buffer.append(data)
+ self._buffer_len += len(data)
+ self._total_len += len(data)
+
+ # Feed data into MAC
+ blocks, rem = divmod(self._buffer_len, self._bs)
+ if rem<self._min_digest:
+ blocks -= 1
+ if blocks>0 and self.can_reduce():
+ aligned_data = blocks*self._bs
+ buf = b("").join(self._buffer)
+ self._update(buf[:aligned_data])
+ self._buffer = [ buf[aligned_data:] ]
+ self._buffer_len -= aligned_data
+
+ def _deep_copy(self, target):
+ # Copy everything by self._mac, since we don't know how to
+ target._buffer = self._buffer[:]
+ for m in [ '_bs', '_buffer_len', '_total_len', '_min_digest', '_tag' ]:
+ setattr(target, m, getattr(self, m))
+
+ def _update(self, data_block):
+ """Delegate to the implementation the update
+ of the MAC state given some new *block aligned* data."""
+ raise NotImplementedError("_update() must be still implemented")
+
+ def _digest(self, left_data):
+ """Delegate to the implementation the computation
+ of the final MAC given the current MAC state
+ and the last piece of data (not block aligned)."""
+ raise NotImplementedError("_digest() must be still implemented")
+
+ def digest(self):
+ if self._tag:
+ return self._tag
+ if self._buffer_len>0:
+ self.update(b(""))
+ left_data = b("").join(self._buffer)
+ self._tag = self._digest(left_data)
+ return self._tag
+
+class CMAC(_SmoothMAC):
"""Class that implements CMAC"""
#: The size of the authentication tag produced by the MAC.
@@ -105,6 +186,8 @@ class CMAC(object):
if ciphermod is None:
raise ValueError("ciphermod must be specified (try AES)")
+ _SmoothMAC.__init__(self, ciphermod.block_size, msg, 1)
+
self._key = key
self._factory = ciphermod
@@ -117,9 +200,6 @@ class CMAC(object):
raise ValueError("For CMAC, block length of the selected cipher must be 8 or 16 bytes")
self.digest_size = ciphermod.block_size
- # MAC cache
- self._tag = None
-
# Compute sub-keys
cipher = ciphermod.new(key, ciphermod.MODE_ECB)
l = cipher.encrypt(bchr(0)*ciphermod.block_size)
@@ -134,13 +214,7 @@ class CMAC(object):
# Initialize CBC cipher with zero IV
self._IV = bchr(0)*ciphermod.block_size
- self._cipherCBC = ciphermod.new(key, ciphermod.MODE_CBC, self._IV)
-
- self._buffer = []
- self._buffer_len = 0
-
- if msg is not None:
- self.update(msg)
+ self._mac = ciphermod.new(key, ciphermod.MODE_CBC, self._IV)
def update(self, msg):
"""Continue authentication of a message by consuming the next chunk of data.
@@ -159,18 +233,10 @@ class CMAC(object):
The next chunk of the message being authenticated
"""
- self._buffer += [ msg ]
- self._buffer_len += len(msg)
+ _SmoothMAC.update(self, msg)
- # MAC data as you go but leave at least 1 byte in the buffer
- bsize = self._cipherCBC.block_size
- if self._buffer_len>bsize:
- data = b("").join(self._buffer)
- self._buffer_len = self._buffer_len&(bsize-1)
- if self._buffer_len==0:
- self._buffer_len=bsize
- self._buffer = [ data[-self._buffer_len:] ]
- self._IV = self._cipherCBC.encrypt(data[:-self._buffer_len])[-bsize:]
+ def _update(self, data_block):
+ self._IV = self._mac.encrypt(data_block)[-self._mac.block_size:]
def copy(self):
"""Return a copy ("clone") of the MAC object.
@@ -184,10 +250,10 @@ class CMAC(object):
"""
obj = CMAC(self._key, ciphermod=self._factory)
- # Deep copy
- for m in [ '_tag', '_buffer', '_buffer_len', '_k1', '_k2', '_IV']:
+ _SmoothMAC._deep_copy(self, obj)
+ obj._mac = self._factory.new(self._key, self._factory.MODE_CBC, self._IV)
+ for m in [ '_tag', '_k1', '_k2', '_IV']:
setattr(obj, m, getattr(self, m))
- obj._cipherCBC = self._factory.new(self._key, self._factory.MODE_CBC, self._IV)
return obj
def digest(self):
@@ -200,17 +266,16 @@ class CMAC(object):
:Return: A byte string of `digest_size` bytes. It may contain non-ASCII
characters, including null bytes.
"""
+ return _SmoothMAC.digest(self)
- if not self._tag:
- data = b("").join(self._buffer)
- bsize = self._cipherCBC.block_size
- if len(data)==bsize:
- last_block = strxor(data, self._k1)
- else:
- last_block = strxor(data+bchr(128)+bchr(0)*(bsize-1-len(data)), self._k2)
- self._tag = self._cipherCBC.encrypt(last_block)
-
- return self._tag
+ def _digest(self, last_data):
+ if len(last_data)==self._bs:
+ last_block = strxor(last_data, self._k1)
+ else:
+ last_block = strxor(last_data+bchr(128)+
+ bchr(0)*(self._bs-1-len(last_data)), self._k2)
+ tag = self._mac.encrypt(last_block)
+ return tag
def hexdigest(self):
"""Return the **printable** MAC of the message that has been
diff --git a/lib/Crypto/SelfTest/Cipher/common.py b/lib/Crypto/SelfTest/Cipher/common.py
index ed612b4..727086d 100644
--- a/lib/Crypto/SelfTest/Cipher/common.py
+++ b/lib/Crypto/SelfTest/Cipher/common.py
@@ -688,7 +688,7 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):
extra_tests_added = 1
# Extract associated data and MAC for AEAD modes
- if p_mode in ('CCM', 'EAX', 'SIV'):
+ if p_mode in ('CCM', 'EAX', 'SIV', 'GCM'):
assoc_data, params['plaintext'] = params['plaintext'].split('|')
assoc_data2, params['ciphertext'], params['mac'] = params['ciphertext'].split('|')
params['assoc_data'] = assoc_data.split("-")
@@ -717,7 +717,7 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):
CCMMACLengthTest(module),
CCMSplitEncryptionTest(module),
]
- for aead_mode in ("MODE_CCM","MODE_EAX", "MODE_SIV"):
+ for aead_mode in ("MODE_CCM","MODE_EAX", "MODE_SIV", "MODE_GCM"):
if hasattr(module, aead_mode):
key_sizes = []
try:
diff --git a/lib/Crypto/SelfTest/Cipher/test_AES.py b/lib/Crypto/SelfTest/Cipher/test_AES.py
index 89243c0..f54f473 100644
--- a/lib/Crypto/SelfTest/Cipher/test_AES.py
+++ b/lib/Crypto/SelfTest/Cipher/test_AES.py
@@ -1795,6 +1795,203 @@ test_data = [
'RFC5297 A.2',
dict(mode='SIV', nonce='09f911029d74e35bd84156c5635688c0')
),
+
+ # Test vectors for GCM taken from
+ # http://csrc.nist.gov/groups/ST/toolkit/BCM/documents/proposedmodes/gcm/gcm-revised-spec.pdf
+ # This is a list of tuples with 5 items:
+ #
+ # 1. Header + '|' + plaintext
+ # 2. Header + '|' + ciphertext + '|' + MAC
+ # 3. AES-128 key
+ # 4. Description
+ # 5. Dictionary of parameters to be passed to AES.new().
+ # It must include the nonce.
+ #
+ ( '|',
+ '||58e2fccefa7e3061367f1d57a4e7455a',
+ '00000000000000000000000000000000',
+ 'GCM Test Case 1',
+ dict(mode='GCM', nonce='000000000000000000000000')
+ ),
+
+ ( '|00000000000000000000000000000000',
+ '|0388dace60b6a392f328c2b971b2fe78|ab6e47d42cec13bdf53a67b21257bddf',
+ '00000000000000000000000000000000',
+ 'GCM Test Case 2',
+ dict(mode='GCM', nonce='000000000000000000000000')
+ ),
+
+ ( '|d9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b391aafd255',
+ '|42831ec2217774244b7221b784d0d49ce3aa212f2c02a4e035c17e2329aca12e' +
+ '21d514b25466931c7d8f6a5aac84aa051ba30b396a0aac973d58e091473f5985|' +
+ '4d5c2af327cd64a62cf35abd2ba6fab4',
+ 'feffe9928665731c6d6a8f9467308308',
+ 'GCM Test Case 3',
+ dict(mode='GCM', nonce='cafebabefacedbaddecaf888')
+ ),
+
+ ( 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39',
+ 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ '42831ec2217774244b7221b784d0d49ce3aa212f2c02a4e035c17e2329aca12e' +
+ '21d514b25466931c7d8f6a5aac84aa051ba30b396a0aac973d58e091|' +
+ '5bc94fbc3221a5db94fae95ae7121a47',
+ 'feffe9928665731c6d6a8f9467308308',
+ 'GCM Test Case 4',
+ dict(mode='GCM', nonce='cafebabefacedbaddecaf888')
+ ),
+
+ ( 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39',
+ 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ '61353b4c2806934a777ff51fa22a4755699b2a714fcdc6f83766e5f97b6c7423' +
+ '73806900e49f24b22b097544d4896b424989b5e1ebac0f07c23f4598|' +
+ '3612d2e79e3b0785561be14aaca2fccb',
+ 'feffe9928665731c6d6a8f9467308308',
+ 'GCM Test Case 5',
+ dict(mode='GCM', nonce='cafebabefacedbad')
+ ),
+
+ ( 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39',
+ 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ '8ce24998625615b603a033aca13fb894be9112a5c3a211a8ba262a3cca7e2ca7' +
+ '01e4a9a4fba43c90ccdcb281d48c7c6fd62875d2aca417034c34aee5|' +
+ '619cc5aefffe0bfa462af43c1699d050',
+ 'feffe9928665731c6d6a8f9467308308',
+ 'GCM Test Case 6',
+ dict(mode='GCM', nonce='9313225df88406e555909c5aff5269aa'+
+ '6a7a9538534f7da1e4c303d2a318a728c3c0c95156809539fcf0e2429a6b5254'+
+ '16aedbf5a0de6a57a637b39b' )
+ ),
+
+ ( '|',
+ '||cd33b28ac773f74ba00ed1f312572435',
+ '000000000000000000000000000000000000000000000000',
+ 'GCM Test Case 7',
+ dict(mode='GCM', nonce='000000000000000000000000')
+ ),
+
+ ( '|00000000000000000000000000000000',
+ '|98e7247c07f0fe411c267e4384b0f600|2ff58d80033927ab8ef4d4587514f0fb',
+ '000000000000000000000000000000000000000000000000',
+ 'GCM Test Case 8',
+ dict(mode='GCM', nonce='000000000000000000000000')
+ ),
+
+ ( '|d9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b391aafd255',
+ '|3980ca0b3c00e841eb06fac4872a2757859e1ceaa6efd984628593b40ca1e19c' +
+ '7d773d00c144c525ac619d18c84a3f4718e2448b2fe324d9ccda2710acade256|' +
+ '9924a7c8587336bfb118024db8674a14',
+ 'feffe9928665731c6d6a8f9467308308feffe9928665731c',
+ 'GCM Test Case 9',
+ dict(mode='GCM', nonce='cafebabefacedbaddecaf888')
+ ),
+
+ ( 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39',
+ 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ '3980ca0b3c00e841eb06fac4872a2757859e1ceaa6efd984628593b40ca1e19c' +
+ '7d773d00c144c525ac619d18c84a3f4718e2448b2fe324d9ccda2710|' +
+ '2519498e80f1478f37ba55bd6d27618c',
+ 'feffe9928665731c6d6a8f9467308308feffe9928665731c',
+ 'GCM Test Case 10',
+ dict(mode='GCM', nonce='cafebabefacedbaddecaf888')
+ ),
+
+ ( 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39',
+ 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ '0f10f599ae14a154ed24b36e25324db8c566632ef2bbb34f8347280fc4507057' +
+ 'fddc29df9a471f75c66541d4d4dad1c9e93a19a58e8b473fa0f062f7|' +
+ '65dcc57fcf623a24094fcca40d3533f8',
+ 'feffe9928665731c6d6a8f9467308308feffe9928665731c',
+ 'GCM Test Case 11',
+ dict(mode='GCM', nonce='cafebabefacedbad')
+ ),
+
+ ( 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39',
+ 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd27e88681ce3243c4830165a8fdcf9ff1de9a1d8e6b447ef6ef7b79828666e45' +
+ '81e79012af34ddd9e2f037589b292db3e67c036745fa22e7e9b7373b|' +
+ 'dcf566ff291c25bbb8568fc3d376a6d9',
+ 'feffe9928665731c6d6a8f9467308308feffe9928665731c',
+ 'GCM Test Case 12',
+ dict(mode='GCM', nonce='9313225df88406e555909c5aff5269aa'+
+ '6a7a9538534f7da1e4c303d2a318a728c3c0c95156809539fcf0e2429a6b5254'+
+ '16aedbf5a0de6a57a637b39b' )
+ ),
+
+ ( '|',
+ '||530f8afbc74536b9a963b4f1c4cb738b',
+ '0000000000000000000000000000000000000000000000000000000000000000',
+ 'GCM Test Case 13',
+ dict(mode='GCM', nonce='000000000000000000000000')
+ ),
+
+ ( '|00000000000000000000000000000000',
+ '|cea7403d4d606b6e074ec5d3baf39d18|d0d1c8a799996bf0265b98b5d48ab919',
+ '0000000000000000000000000000000000000000000000000000000000000000',
+ 'GCM Test Case 14',
+ dict(mode='GCM', nonce='000000000000000000000000')
+ ),
+
+ ( '|d9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b391aafd255',
+ '|522dc1f099567d07f47f37a32a84427d643a8cdcbfe5c0c97598a2bd2555d1aa' +
+ '8cb08e48590dbb3da7b08b1056828838c5f61e6393ba7a0abcc9f662898015ad|' +
+ 'b094dac5d93471bdec1a502270e3cc6c',
+ 'feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308',
+ 'GCM Test Case 15',
+ dict(mode='GCM', nonce='cafebabefacedbaddecaf888')
+ ),
+
+ ( 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39',
+ 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ '522dc1f099567d07f47f37a32a84427d643a8cdcbfe5c0c97598a2bd2555d1aa' +
+ '8cb08e48590dbb3da7b08b1056828838c5f61e6393ba7a0abcc9f662|' +
+ '76fc6ece0f4e1768cddf8853bb2d551b',
+ 'feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308',
+ 'GCM Test Case 16',
+ dict(mode='GCM', nonce='cafebabefacedbaddecaf888')
+ ),
+
+ ( 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39',
+ 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'c3762df1ca787d32ae47c13bf19844cbaf1ae14d0b976afac52ff7d79bba9de0' +
+ 'feb582d33934a4f0954cc2363bc73f7862ac430e64abe499f47c9b1f|' +
+ '3a337dbf46a792c45e454913fe2ea8f2',
+ 'feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308',
+ 'GCM Test Case 17',
+ dict(mode='GCM', nonce='cafebabefacedbad')
+ ),
+
+ ( 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ 'd9313225f88406e5a55909c5aff5269a86a7a9531534f7da2e4c303d8a318a72' +
+ '1c3c0c95956809532fcf0e2449a6b525b16aedf5aa0de657ba637b39',
+ 'feedfacedeadbeeffeedfacedeadbeefabaddad2|' +
+ '5a8def2f0c9e53f1f75d7853659e2a20eeb2b22aafde6419a058ab4f6f746bf4' +
+ '0fc0c3b780f244452da3ebf1c5d82cdea2418997200ef82e44ae7e3f|' +
+ 'a44a8266ee1c8eb0c8b5d4cf5ae9f19a',
+ 'feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308',
+ 'GCM Test Case 18',
+ dict(mode='GCM', nonce='9313225df88406e555909c5aff5269aa'+
+ '6a7a9538534f7da1e4c303d2a318a728c3c0c95156809539fcf0e2429a6b5254'+
+ '16aedbf5a0de6a57a637b39b' )
+ ),
]
def get_tests(config={}):