diff options
Diffstat (limited to 'lib/Crypto/Cipher/blockalgo.py')
-rw-r--r-- | lib/Crypto/Cipher/blockalgo.py | 441 |
1 files changed, 393 insertions, 48 deletions
diff --git a/lib/Crypto/Cipher/blockalgo.py b/lib/Crypto/Cipher/blockalgo.py index e20ec5a..57738c4 100644 --- a/lib/Crypto/Cipher/blockalgo.py +++ b/lib/Crypto/Cipher/blockalgo.py @@ -24,8 +24,15 @@ import sys if sys.version_info[0] == 2 and sys.version_info[1] == 1: from Crypto.Util.py21compat import * + from Crypto.Util.py3compat import * +from binascii import unhexlify + +from Crypto.Util import Counter +from Crypto.Util.strxor import strxor +from Crypto.Util.number import long_to_bytes + #: *Electronic Code Book (ECB)*. #: This is the simplest encryption mode. Each of the plaintext blocks #: is directly encrypted into a ciphertext block, independently of @@ -123,6 +130,61 @@ MODE_CTR = 6 #: .. _OpenPGP: http://tools.ietf.org/html/rfc4880 MODE_OPENPGP = 7 +#: *Counter with CBC-MAC (CCM)*. This is an Authenticated Encryption with +#: Associated Data (`AEAD`_) mode. It provides both confidentiality and +#: authenticity. +#: The header of the message may be left in the clear, if needed, and it will +#: still be subject to authentication. The decryption step tells the receiver +#: if the message comes from a source that really knowns the secret key. +#: Additionally, decryption detects if any part of the message - including the +#: header - has been modified or corrupted. +#: +#: This mode requires a nonce. The nonce shall never repeat for two +#: different messages encrypted with the same key, but it does not need +#: to be random. +#: Note that there is a trade-off between the size of the nonce and the +#: maximum size of a single message you can encrypt. +#: +#: It is important to use a large nonce if the key is reused across several +#: messages and the nonce is chosen randomly. +#: +#: It is acceptable to us a short nonce if the key is only used a few times or +#: if the nonce is taken from a counter. +#: +#: The following table shows the trade-off when the nonce is chosen at +#: random. The column on the left shows how many messages it takes +#: for the keystream to repeat **on average**. In practice, you will want to +#: stop using the key way before that. +#: +#: +--------------------+---------------+-------------------+ +#: | Avg. # of messages | nonce | Max. message | +#: | before keystream | size | size | +#: | repeats | (bytes) | (bytes) | +#: +====================+===============+===================+ +#: | 2**52 | 13 | 64K | +#: +--------------------+---------------+-------------------+ +#: | 2**48 | 12 | 16M | +#: +--------------------+---------------+-------------------+ +#: | 2**44 | 11 | 4G | +#: +--------------------+---------------+-------------------+ +#: | 2**40 | 10 | 1T | +#: +--------------------+---------------+-------------------+ +#: | 2**36 | 9 | 64P | +#: +--------------------+---------------+-------------------+ +#: | 2**32 | 8 | 16E | +#: +--------------------+---------------+-------------------+ +#: +#: This mode is only available for ciphers that operate on 128 bits blocks +#: (e.g. AES but not TDES). +#: +#: See `NIST SP800-38C`_ or RFC3610_ . +#: +#: .. _`NIST SP800-38C`: http://csrc.nist.gov/publications/nistpubs/800-38C/SP800-38C.pdf +#: .. _RFC3610: https://tools.ietf.org/html/rfc3610 +#: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html +MODE_CCM = 8 + + def _getParameter(name, index, args, kwargs, default=None): """Find a parameter in tuple and dictionary arguments a function receives""" @@ -132,68 +194,220 @@ def _getParameter(name, index, args, kwargs, default=None): raise ValueError("Parameter '%s' is specified twice" % name) param = args[index] return param or default - + + class BlockAlgo: """Class modelling an abstract block cipher.""" def __init__(self, factory, key, *args, **kwargs): self.mode = _getParameter('mode', 0, args, kwargs, default=MODE_ECB) self.block_size = factory.block_size - - if self.mode != MODE_OPENPGP: + self._factory = factory + + if self.mode == MODE_CCM: + if self.block_size != 16: + raise ValueError("CCM mode is only available for ciphers that operate on 128 bits blocks") + + self._mac_len = kwargs.get('mac_len', 16) # t + if self._mac_len not in (4, 6, 8, 10, 12, 14, 16): + raise ValueError("Parameter 'mac_len' must be even and in the range 4..16") + + self.nonce = _getParameter('nonce', 1, args, kwargs) # N + if not (self.nonce and 7 <= len(self.nonce) <= 13): + raise ValueError("Length of parameter 'nonce' must be" + " in the range 7..13 bytes") + + self._key = key + self._msg_len = kwargs.get('msg_len', None) # p + self._assoc_len = kwargs.get('assoc_len', None) # a + + self._assoc_buffer = [] + self._assoc_buffer_len = 0 + self._cipherCBC = None # To be used for MAC + self._done_assoc_data = False # True when all associated data + # has been processed + + # Allowed transitions after initialization + self._next = [self.update, self.encrypt, self.decrypt, + self.digest, self.verify] + + # Try to start CCM + self._start_ccm() + + elif self.mode == MODE_OPENPGP: + self._start_PGP(factory, key, *args, **kwargs) + else: self._cipher = factory.new(key, *args, **kwargs) self.IV = self._cipher.IV + + def _start_PGP(self, factory, key, *args, **kwargs): + # OPENPGP mode. For details, see 13.9 in RCC4880. + # + # A few members are specifically created for this mode: + # - _encrypted_iv, set in this constructor + # - _done_first_block, set to True after the first encryption + # - _done_last_block, set to True after a partial block is processed + + self._done_first_block = False + self._done_last_block = False + self.IV = _getParameter('iv', 1, args, kwargs) + if not self.IV: + raise ValueError("MODE_OPENPGP requires an IV") + + # Instantiate a temporary cipher to process the IV + IV_cipher = factory.new( + key, + MODE_CFB, + b('\x00') * self.block_size, # IV for CFB + segment_size=self.block_size * 8) + + # The cipher will be used for... + if len(self.IV) == self.block_size: + # ... encryption + self._encrypted_IV = IV_cipher.encrypt( + self.IV + self.IV[-2:] + # Plaintext + b('\x00') * (self.block_size - 2) # Padding + )[:self.block_size + 2] + elif len(self.IV) == self.block_size + 2: + # ... decryption + self._encrypted_IV = self.IV + self.IV = IV_cipher.decrypt( + self.IV + # Ciphertext + b('\x00') * (self.block_size - 2) # Padding + )[:self.block_size + 2] + if self.IV[-2:] != self.IV[-4:-2]: + raise ValueError("Failed integrity check for OPENPGP IV") + self.IV = self.IV[:-2] else: - # OPENPGP mode. For details, see 13.9 in RCC4880. - # - # A few members are specifically created for this mode: - # - _encrypted_iv, set in this constructor - # - _done_first_block, set to True after the first encryption - # - _done_last_block, set to True after a partial block is processed - - self._done_first_block = False - self._done_last_block = False - self.IV = _getParameter('iv', 1, args, kwargs) - if not self.IV: - raise ValueError("MODE_OPENPGP requires an IV") - - # Instantiate a temporary cipher to process the IV - IV_cipher = factory.new(key, MODE_CFB, - b('\x00')*self.block_size, # IV for CFB - segment_size=self.block_size*8) - - # The cipher will be used for... - if len(self.IV) == self.block_size: - # ... encryption - self._encrypted_IV = IV_cipher.encrypt( - self.IV + self.IV[-2:] + # Plaintext - b('\x00')*(self.block_size-2) # Padding - )[:self.block_size+2] - elif len(self.IV) == self.block_size+2: - # ... decryption - self._encrypted_IV = self.IV - self.IV = IV_cipher.decrypt(self.IV + # Ciphertext - b('\x00')*(self.block_size-2) # Padding - )[:self.block_size+2] - if self.IV[-2:] != self.IV[-4:-2]: - raise ValueError("Failed integrity check for OPENPGP IV") - self.IV = self.IV[:-2] + raise ValueError("Length of IV must be %d or %d bytes for MODE_OPENPGP" + % (self.block_size, self.block_size+2)) + + # Instantiate the cipher for the real PGP data + self._cipher = factory.new( + key, + MODE_CFB, + self._encrypted_IV[-self.block_size:], + segment_size=self.block_size * 8 + ) + + def _start_ccm(self, assoc_len=None, msg_len=None): + # CCM mode. This method creates the 2 ciphers used for the MAC + # (self._cipherCBC) and for the encryption/decryption (self._cipher). + # + # Member _assoc_buffer may already contain user data that needs to be + # authenticated. + + if self._cipherCBC: + # Already started + return + if assoc_len is not None: + self._assoc_len = assoc_len + if msg_len is not None: + self._msg_len = msg_len + if None in (self._assoc_len, self._msg_len): + return + + # q is the length of Q, the encoding of the message length + q = 15 - len(self.nonce) + + ## Compute B_0 + flags = ( + 64 * (self._assoc_len > 0) + + 8 * divmod(self._mac_len - 2, 2)[0] + + (q - 1) + ) + b_0 = bchr(flags) + self.nonce + long_to_bytes(self._msg_len, q) + self._assoc_buffer.insert(0, b_0) + self._assoc_buffer_len += 16 + + # Start CBC MAC with zero IV + # Mind that self._assoc_buffer may already contain some data + self._cipherCBC = self._factory.new(self._key, MODE_CBC, bchr(0)*16) + assoc_len_encoded = b('') + if self._assoc_len > 0: + if self._assoc_len < (2 ** 16 - 2 ** 8): + enc_size = 2 + elif self._assoc_len < (2L ** 32): + assoc_len_encoded = b('\xFF\xFE') + enc_size = 4 else: - raise ValueError("Length of IV must be %d or %d bytes for MODE_OPENPGP" - % (self.block_size, self.block_size+2)) + assoc_len_encoded = b('\xFF\xFF') + enc_size = 8 + assoc_len_encoded += long_to_bytes(self._assoc_len, enc_size) + self._assoc_buffer.insert(1, assoc_len_encoded) + self._assoc_buffer_len += len(assoc_len_encoded) + + # Start CTR cipher + flags = q - 1 + prefix = bchr(flags) + self.nonce + ctr = Counter.new(128 - len(prefix) * 8, prefix, initial_value=0) + self._cipher = self._factory.new(self._key, MODE_CTR, counter=ctr) + # Will XOR against CBC MAC + self._s_0 = self._cipher.encrypt(bchr(0) * 16) + + def update(self, assoc_data): + """Protect associated data + + When using an AEAD mode like CCM, and if there is any associated data, + the caller has to invoke this function one or more times, before + using ``decrypt`` or ``encrypt``. + + By *associated data* it is meant any data (e.g. packet headers) that + will not be encrypted and will be transmitted in the clear. + However, the receiver is still able to detect any modification to it. + In CCM, the *associated data* is also called *additional authenticated + data*. + + If there is no associated data, this method must not be called. + + The caller may split associated data in segments of any size, and + invoke this method multiple times, each time with the next segment. - # Instantiate the cipher for the real PGP data - self._cipher = factory.new(key, MODE_CFB, - self._encrypted_IV[-self.block_size:], - segment_size=self.block_size*8) + :Parameters: + assoc_data : byte string + A piece of associated data. There are no restrictions on its size. + """ + + if self.mode == MODE_CCM: + if self.update not in self._next: + raise TypeError("update() can only be called immediately after initialization") + self._next = [ self.update, self.encrypt, self.decrypt, + self.digest, self.verify ] + return self._update(assoc_data) + + def _update(self, assoc_data, do_zero_padding=False): + """Equivalent to update(), but without FSM checks.""" + + if self.mode == MODE_CCM: + self._assoc_buffer.append(assoc_data) + self._assoc_buffer_len += len(assoc_data) + + if not self._cipherCBC: + return + + if do_zero_padding and (self._assoc_buffer_len & 15): + npad = 16 - self._assoc_buffer_len & 15 + self._assoc_buffer.append(bchr(0) * npad) + self._assoc_buffer_len += npad + + # Feed data into CBC MAC + aligned_data = 16 * divmod(self._assoc_buffer_len, 16)[0] + if aligned_data > 0: + buf = b("").join(self._assoc_buffer) + self._t = self._cipherCBC.encrypt(buf[:aligned_data])[-16:] + self._assoc_buffer = [buf[aligned_data:]] + self._assoc_buffer_len -= aligned_data + return + raise ValueError("update() not supported by this mode of operation") def encrypt(self, plaintext): """Encrypt data with the key and the parameters set at initialization. - + The cipher object is stateful; encryption of a long block of data can be broken up in two or more calls to `encrypt()`. + That is, the statement: - + >>> c.encrypt(a) + c.encrypt(b) is always equivalent to: @@ -211,7 +425,8 @@ class BlockAlgo: - For `MODE_CFB`, *plaintext* length (in bytes) must be a multiple of *segment_size*/8. - - For `MODE_OFB` and `MODE_CTR`, *plaintext* can be of any length. + - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM` *plaintext* can be + of any length. - For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*, unless it is the last chunk of the message. @@ -245,6 +460,23 @@ class BlockAlgo: self._done_first_block = True return res + if self.mode == MODE_CCM: + + if self.encrypt not in self._next: + raise TypeError("encrypt() can only be called after initialization or an update()") + self._next = [self.encrypt, self.digest] + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=len(plaintext)) + self._next = [self.digest] + if not self._done_assoc_data: + self._update(b(""), do_zero_padding=True) + self._done_assoc_data = True + + self._update(plaintext) + return self._cipher.encrypt(plaintext) def decrypt(self, ciphertext): @@ -272,7 +504,8 @@ class BlockAlgo: - For `MODE_CFB`, *ciphertext* length (in bytes) must be a multiple of *segment_size*/8. - - For `MODE_OFB` and `MODE_CTR`, *ciphertext* can be of any length. + - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM`, *ciphertext* can be + of any length. - For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*, unless it is the last chunk of the message. @@ -282,6 +515,7 @@ class BlockAlgo: The piece of data to decrypt. :Return: the decrypted data (byte string, as long as *ciphertext*). """ + if self.mode == MODE_OPENPGP: padding_length = (self.block_size - len(ciphertext) % self.block_size) % self.block_size if padding_length > 0: @@ -298,5 +532,116 @@ class BlockAlgo: res = self._cipher.decrypt(ciphertext) return res - return self._cipher.decrypt(ciphertext) + if self.mode == MODE_CCM: + + if self.decrypt not in self._next: + raise TypeError("decrypt() can only be called after initialization or an update()") + self._next = [self.decrypt, self.verify] + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=len(ciphertext)) + self._next = [self.verify] + if not self._done_assoc_data: + self._update(b(""), do_zero_padding=True) + self._done_assoc_data = True + + pt = self._cipher.decrypt(ciphertext) + + if self.mode == MODE_CCM: + self._update(pt) + + return pt + + def digest(self): + """Compute the *binary* MAC tag in an AEAD mode. + + When using an AEAD mode like CCM, the caller invokes this function + at the very end. + + This method returns the MAC that shall be sent to the receiver, + together with the ciphertext. + + :Return: the MAC, as a byte string. + """ + + if self.mode == MODE_CCM: + + if self.digest not in self._next: + raise TypeError("digest() cannot be called when decrypting or validating a message") + self._next = [self.digest] + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=0) + self._update(b(""), do_zero_padding=True) + + return strxor(self._t, self._s_0)[:self._mac_len] + + raise TypeError("digest() not supported by this mode of operation") + + def hexdigest(self): + """Compute the *printable* MAC tag in an AEAD mode. + + This method is like `digest`. + + :Return: the MAC, as a hexadecimal string. + """ + return "".join(["%02x" % bord(x) for x in self.digest()]) + + def verify(self, mac_tag): + """Validate the *binary* MAC tag in an AEAD mode. + + When using an AEAD mode like CCM, the caller invokes this function + at the very end. + + This method checks if the decrypted message is indeed valid + (that is, if the key is correct) and it has not been + tampered with while in transit. + + :Parameters: + mac_tag : byte string + This is the *binary* MAC, as received from the sender. + :Raises ValueError: + if the MAC does not match. The message has been tampered with + or the key is incorrect. + """ + + if self.mode == MODE_CCM: + if self.verify not in self._next: + raise TypeError("verify() cannot be called when encrypting a message") + self._next = [self.verify] + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=0) + self._update(b(""), do_zero_padding=True) + u = strxor(self._t, self._s_0)[:self._mac_len] + + res = 0 + # Constant-time comparison + for x,y in zip(u, mac_tag): + res |= bord(x) ^ bord(y) + if res or len(mac_tag)!=self._mac_len: + raise ValueError("MAC check failed") + return + + raise TypeError("verify() not supported by this mode of operation") + + def hexverify(self, hex_mac_tag): + """Validate the *printable* MAC tag in an AEAD mode. + + This method is like `verify`. + + :Parameters: + hex_mac_tag : string + This is the *printable* MAC, as received from the sender. + :Raises ValueError: + if the MAC does not match. The message has been tampered with + or the key is incorrect. + """ + self.verify(unhexlify(hex_mac_tag)) |