diff options
-rw-r--r-- | Doc/AEAD_API.txt | 267 | ||||
-rw-r--r-- | lib/Crypto/Cipher/AES.py | 63 | ||||
-rw-r--r-- | lib/Crypto/Cipher/blockalgo.py | 441 | ||||
-rw-r--r-- | lib/Crypto/SelfTest/Cipher/common.py | 340 | ||||
-rw-r--r-- | lib/Crypto/SelfTest/Cipher/test_AES.py | 234 | ||||
-rw-r--r-- | pct-speedtest.py | 4 |
6 files changed, 1281 insertions, 68 deletions
diff --git a/Doc/AEAD_API.txt b/Doc/AEAD_API.txt new file mode 100644 index 0000000..0dcc147 --- /dev/null +++ b/Doc/AEAD_API.txt @@ -0,0 +1,267 @@ +AEAD (Authenticated Encryption with Associated Data) modes of operations +for symmetric block ciphers provide authentication and integrity in addition +to confidentiality. + +Traditional modes like CBC or CTR only guarantee confidentiality; one +has to combine them with cryptographic MACs in order to have assurance of +authenticity. It is not trivial to implement such generic composition +without introducing subtle security flaws. + +AEAD modes are designed to be more secure and often more efficient than +ad-hoc constructions. Widespread AEAD modes are GCM, CCM, EAX, SIV, OCB. + +Most AEAD modes allow to optionally authenticate the plaintext with some +piece of arbitrary data that will not be encrypted, for instance a +packet header. + +In this file, that is called "associated data" (AD) even though terminology +may vary from mode to mode. + +=== Goals of the AEAD API + +1. Any piece of data (AD, ciphertext, plaintext) is passed only once. +2. It is possible to encrypt/decrypt huge files withe little memory cost. + To this end, authentication, encryption, and decryption are always + incremental. That is, the caller may split data in any way, and the end + result does not depend on how splitting is done. + Data is processed immediately without being internally buffered. +3. API is similar to a Crypto.Hash MAC object, to the point that when only + AD is present, the object behaves exactly like a MAC. +4. API is similar to a classic cipher object, to the point that when no AD is present, + the object behaves exactly like a classic cipher mode (e.g. CBC). +5. MACs are produced and handled separately from ciphertext/plaintext. +6. The API is intuitive and hard to misuse. Exceptions are raised immediately + in case of misuse. +7. Caller does not have to add or remove padding. + +The following state diagram shows the proposed API for AEAD modes. +In general, it is applicable to all block ciphers in the Crypto.Cipher +module, even though certain modes may put restrictions on certain +cipher properties (e.g. block size). + + .--------. + | START | + '--------' + | + | .new(key, mode, iv, ...) + v + .-------------. +.-------------.-------------| INITIALIZED |---------------+-------------. +| | '-------------' | | +| | | | | +| |.encrypt() |.update() |.decrypt() | +| | | | | +| | v | | +| | .--------------.___ .update() | | +| | | CLEAR HEADER |<--' | | +| | '--------------' | | +| | | | | | | | +| | .encrypt()| | | |.decrypt() | | +| v | | | | v | +| .--------------. | | | | .--------------. | +| | ENCRYPTING |<----' | | '---->| DECRYPTING | | +| '--------------' | | '--------------' | +| | ^ | | | | ^ | | +| hex/digest()| | |.encrypt()* | | | | |.decrypt()* | +| | ''' | | | ''' | +| | / \ | | +| | hex/digest()/ \hex/verify()|hex/verify() | +|hex/digest() | / \ | | +| | / \ | hex/verify()| +| v / \ v | +| .--------------. / \ .--------------. | +'------->| FINISHED/ENC |<-- -->| FINISHED/DEC |<---------' + '--------------' '--------------' + ^ | ^ | + | |hex/digest() | |hex/verify() + ''' ''' + + [*] this transition is not always allowed for non-online or 2-pass modes + +The following states are defined: + + - INITIALIZED. The cipher has been instantiated with a key, possibly + a nonce (or IV), and other parameters. + The cipher object may be used for encryption or decryption. + + - CLEAR HEADER. The cipher is authenticating the associated data. + + - ENCRYPTING. It has been decided that the cipher has to be used for + encrypting data. At least one piece of ciphertext has been produced. + + - DECRYPTING. It has been decided that the cipher has to be used for + decrypting data. At least one piece of candidate plaintext has + been produced. + + - FINISHED/ENC. Authenticated encryption has completed. + The final MAC has been produced. + + - FINISHED/DEC. Authenticated decryption has completed. + It is now established if the associated data together the plaintext + are authentic. + +In addition to the existing Cipher methods new(), encrypt() and decrypt(), +5 new methods are added to a Cipher object: + + - update() to consume all associated data. It takes as input + a byte string, and it can be invoked multiple times. + Ciphertext / plaintext must not be passed to update(). + For simplicity, update() can only be called before encryption + or decryption starts. If no associated data is used, update() + is not called. + + - digest() to output the binary MAC. It can only be called at the end + of the encryption. + + - hexdigest() as digest(), but the output is ASCII hexadecimal. + + - verify() to evaluate if the binary MAC is correct. It can only be + called at the end of decryption. + + - hexverify() as verify(), but the input is ASCII hexadecimal. + +The syntax of update(), digest(), and hexdigest() are consistent with +the API of a Hash object. + +IMPORTANT: method copy() is not present. Since most AEAD modes require +IV/nonce to never repeat, this method may be misused and lead to security +holes. + +Since MAC validation (decryption mode) is subject to timing attacks, +the (hex)verify() method accepts as input the expected MAC and raises a +ValueError exception if it does not match. +Internally, the method can perform the validation using time-independent code. + +=== Padding + +The proposed API only supports AEAD modes than do not require padding +of the plaintext. Adding support for that would make the API more complex +(for instance, an external "finished" flag to pass encrypt()). + +=== Pre-processing of associated data + +The proposed API does not support pre-processing of AD. + +Sometimes, a lot of encryptions/decryptions are performed with +the same key and the same AD, and different nonces and plaintext. + +Certain modes like GCM allow to cache processing of AD, and reuse +it such results across several encryptions/decryptions. + +=== Singe/2-pass modes and online/non-online modes + +The proposed API supports single-pass, online AEAD modes. + +A "single-pass" mode processes each block of AD or data only once. +Compare that to "2-pass" modes, where each block of data (not AD) twice: +once for authentication and one for enciphering. + +An "online" mode does not need to know the size of the message before +encryption starts. As a consequence: + - memory usage doesn't depend on the plaintext/ciphertext size. + - when encrypting, partial ciphertexts can be immediately sent to the receiver. + - when decrypting, partial (unauthenticated) plaintexts can be obtained from + the cipher. + +Most single-pass modes are patented, and only the less efficient 2-pass modes +are in widespread use. + +That is still OK, provided that encryption can start *before* authentication is +completed. If that's the case (e.g. GCM, EAX) encrypt()/decrypt() will also +take care of completing the authentication over the plaintext. +If that's NOT the case (e.g. SIV), encrypt()/decrypt() can only be called once. + +A similar problem arises with non-online modes (e.g. CCM): they have to wait to see +the end of the plaintext before encryption can start. The only way to achieve +that is to only allow encrypt()/decrypt() to be called once. + +=== Associated Data + +Depending on the mode, the associated data AD can be defined as: + 1. a single binary string or + 2. a vector of binary strings + +For modes of the 1st type (e.g. CCM), the API allows the AD to be split +in any number of segments, and fed to update() in multiple calls. +The resulting AD does not depend on how splitting is performed. +It is responsability of the caller to ensure that concatenation of strings +is secure. For instance, there is no difference between: + + c.update(b"builtin") + c.update(b"securely") + +and: + + c.update(b"built") + c.update(b"insecurely") + +For modes of the 2nd type (e.g. SIV), the API assumes that each call +to update() ingests one full item of the vector. The two examples above +are now different. + +=== CCM mode +Number of keys required: 1 +Compatible with ciphers: AES (may be used with others if block length + is 128 bits) +Counter/IV/nonce: 1 nonce required (length 8..13 bytes) +Single-pass: no +Online: no +Term for AD: "associate data" (NIST) or "additional + authenticated data" (RFC) +Term for MAC: part of ciphertext (NIST) or + "encrypted authentication value" (RFC). +Padding required: no +Pre-processing of AD: not possible + +In order to allow encrypt()/decrypt() to be called multiple times, +and to reduce the memory usage, the new() method of the Cipher module +optionally accepts the following parameters: + + - 'assoc_len', the total length (in bytes) of the AD + + - 'msg_len', the total length (in bytes) of the plaintext or ciphertext + +=== GCM mode +Number of keys required: 1 +Compatible with ciphers: AES (may be used with others if block length + is 128 bits) +Counter/IV/nonce: 1 IV required (any length) +Single-pass: no +Online: yes +Term for AD: "additional authenticated data" +Term for MAC: "authentication tag" or "tag" +Padding required: no +Pre-processing of AD: possible + +Encryption can start before authentication, so encrypt()/decrypt() can always +be called multiple times. + +=== EAX mode +Number of keys required: 1 +Compatible with ciphers: any +Counter/IV/nonce: 1 IV required (any length) +Single-pass: no +Online: yes +Term for AD: "header" +Term for MAC: "tag" +Padding required: no +Pre-processing of AD: possible + +Encryption can start before authentication, so encrypt()/decrypt() can always +be called multiple times. + +=== SIV +Number of keys required: 2 +Compatible with ciphers: AES +Counter/IV/nonce: 1 IV required (any length) +Single-pass: no +Online: no +Term for AD: "associated data" (vector) +Term for MAC: "tag" +Padding required: no +Pre-processing of AD: possible + +Encryption can only start before authentication is ended, so encrypt()/decrypt() +an only be called once. SIV is not suitable for big files or streaming. + +AD is a vector of strings. One item in the vector is the (optional) IV. diff --git a/lib/Crypto/Cipher/AES.py b/lib/Crypto/Cipher/AES.py index 351b954..5316700 100644 --- a/lib/Crypto/Cipher/AES.py +++ b/lib/Crypto/Cipher/AES.py @@ -31,15 +31,46 @@ encryption. As an example, encryption can be done as follows: >>> from Crypto.Cipher import AES - >>> from Crypto import Random + >>> from Crypto.Random import get_random_bytes >>> >>> key = b'Sixteen byte key' - >>> iv = Random.new().read(AES.block_size) + >>> iv = get_random_bytes(16) >>> cipher = AES.new(key, AES.MODE_CFB, iv) >>> msg = iv + cipher.encrypt(b'Attack at dawn') +A more complicated example is based on CCM, (see `MODE_CCM`) an `AEAD`_ mode +that provides both confidentiality and authentication for a message. +It also allows message for the header to remain in the clear, whilst still +being authenticated. The encryption is done as follows: + + >>> from Crypto.Cipher import AES + >>> from Crypto.Random import get_random_bytes + >>> + >>> + >>> hdr = b'To your eyes only' + >>> plaintext = b'Attack at dawn' + >>> key = b'Sixteen byte key' + >>> nonce = get_random_bytes(11) + >>> cipher = AES.new(key, AES.MODE_CCM, nonce) + >>> cipher.update(hdr) + >>> msg = nonce, hdr, cipher.encrypt(plaintext), cipher.digest() + +We assume that the tuple ``msg`` is transmitted to the receiver: + + >>> nonce, hdr, ciphertext, mac = msg + >>> key = b'Sixteen byte key' + >>> cipher = AES.new(key, AES.MODE_CCM, nonce) + >>> cipher.update(hdr) + >>> plaintext = cipher.decrypt(ciphertext) + >>> try: + >>> cipher.verify(mac) + >>> print "The message is authentic: hdr=%s, pt=%s" % (hdr, plaintext) + >>> except ValueError: + >>> print "Key incorrect or message corrupted" + .. __: http://en.wikipedia.org/wiki/Advanced_Encryption_Standard .. _NIST: http://csrc.nist.gov/publications/fips/fips197/fips-197.pdf +.. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html :undocumented: __revision__, __package__ """ @@ -67,7 +98,7 @@ class AESCipher (blockalgo.BlockAlgo): def __init__(self, key, *args, **kwargs): """Initialize an AES cipher object - + See also `new()` at the module level.""" # Check if the use_aesni was specified. @@ -94,6 +125,8 @@ def new(key, *args, **kwargs): The chaining mode to use for encryption or decryption. Default is `MODE_ECB`. IV : byte string + (*Only* `MODE_CBC`, `MODE_CFB`, `MODE_OFB`, `MODE_OPENPGP`). + The initialization vector to use for encryption or decryption. It is ignored for `MODE_ECB` and `MODE_CTR`. @@ -102,8 +135,17 @@ def new(key, *args, **kwargs): and `block_size` +2 bytes for decryption (in the latter case, it is actually the *encrypted* IV which was prefixed to the ciphertext). It is mandatory. - - For all other modes, it must be `block_size` bytes longs. + + For all other modes, it must be 16 bytes long. + nonce : byte string + (*Only* `MODE_CCM`). + + A mandatory value that must never be reused for any other encryption. + + For `MODE_CCM`, its length must be in the range ``[7..13]``. + 11 or 12 bytes are reasonable values in general. Bear in + mind that with CCM there is a trade-off between nonce length and + maximum message size. counter : callable (*Only* `MODE_CTR`). A stateful function that returns the next *counter block*, which is a byte string of `block_size` bytes. @@ -112,6 +154,15 @@ def new(key, *args, **kwargs): (*Only* `MODE_CFB`).The number of bits the plaintext and ciphertext are segmented in. It must be a multiple of 8. If 0 or not specified, it will be assumed to be 8. + mac_len : integer + (*Only* `MODE_CCM`). Length of the MAC, in bytes. It must be even and in + the range ``[4..16]``. The default is 16. + msg_len : integer + (*Only* `MODE_CCM`). Length of the message to (de)cipher. + If not specified, ``encrypt`` or ``decrypt`` may only be called once. + assoc_len : integer + (*Only* `MODE_CCM`). Length of the associated data. + If not specified, all data is internally buffered. use_aesni : boolean Use AES-NI if available. @@ -133,6 +184,8 @@ MODE_OFB = 5 MODE_CTR = 6 #: OpenPGP Mode. See `blockalgo.MODE_OPENPGP`. MODE_OPENPGP = 7 +#: Counter with CBC-MAC (CCM) Mode. See `blockalgo.MODE_CCM`. +MODE_CCM = 8 #: Size of a data block (in bytes) block_size = 16 #: Size of a key (in bytes) diff --git a/lib/Crypto/Cipher/blockalgo.py b/lib/Crypto/Cipher/blockalgo.py index e20ec5a..57738c4 100644 --- a/lib/Crypto/Cipher/blockalgo.py +++ b/lib/Crypto/Cipher/blockalgo.py @@ -24,8 +24,15 @@ import sys if sys.version_info[0] == 2 and sys.version_info[1] == 1: from Crypto.Util.py21compat import * + from Crypto.Util.py3compat import * +from binascii import unhexlify + +from Crypto.Util import Counter +from Crypto.Util.strxor import strxor +from Crypto.Util.number import long_to_bytes + #: *Electronic Code Book (ECB)*. #: This is the simplest encryption mode. Each of the plaintext blocks #: is directly encrypted into a ciphertext block, independently of @@ -123,6 +130,61 @@ MODE_CTR = 6 #: .. _OpenPGP: http://tools.ietf.org/html/rfc4880 MODE_OPENPGP = 7 +#: *Counter with CBC-MAC (CCM)*. This is an Authenticated Encryption with +#: Associated Data (`AEAD`_) mode. It provides both confidentiality and +#: authenticity. +#: The header of the message may be left in the clear, if needed, and it will +#: still be subject to authentication. The decryption step tells the receiver +#: if the message comes from a source that really knowns the secret key. +#: Additionally, decryption detects if any part of the message - including the +#: header - has been modified or corrupted. +#: +#: This mode requires a nonce. The nonce shall never repeat for two +#: different messages encrypted with the same key, but it does not need +#: to be random. +#: Note that there is a trade-off between the size of the nonce and the +#: maximum size of a single message you can encrypt. +#: +#: It is important to use a large nonce if the key is reused across several +#: messages and the nonce is chosen randomly. +#: +#: It is acceptable to us a short nonce if the key is only used a few times or +#: if the nonce is taken from a counter. +#: +#: The following table shows the trade-off when the nonce is chosen at +#: random. The column on the left shows how many messages it takes +#: for the keystream to repeat **on average**. In practice, you will want to +#: stop using the key way before that. +#: +#: +--------------------+---------------+-------------------+ +#: | Avg. # of messages | nonce | Max. message | +#: | before keystream | size | size | +#: | repeats | (bytes) | (bytes) | +#: +====================+===============+===================+ +#: | 2**52 | 13 | 64K | +#: +--------------------+---------------+-------------------+ +#: | 2**48 | 12 | 16M | +#: +--------------------+---------------+-------------------+ +#: | 2**44 | 11 | 4G | +#: +--------------------+---------------+-------------------+ +#: | 2**40 | 10 | 1T | +#: +--------------------+---------------+-------------------+ +#: | 2**36 | 9 | 64P | +#: +--------------------+---------------+-------------------+ +#: | 2**32 | 8 | 16E | +#: +--------------------+---------------+-------------------+ +#: +#: This mode is only available for ciphers that operate on 128 bits blocks +#: (e.g. AES but not TDES). +#: +#: See `NIST SP800-38C`_ or RFC3610_ . +#: +#: .. _`NIST SP800-38C`: http://csrc.nist.gov/publications/nistpubs/800-38C/SP800-38C.pdf +#: .. _RFC3610: https://tools.ietf.org/html/rfc3610 +#: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html +MODE_CCM = 8 + + def _getParameter(name, index, args, kwargs, default=None): """Find a parameter in tuple and dictionary arguments a function receives""" @@ -132,68 +194,220 @@ def _getParameter(name, index, args, kwargs, default=None): raise ValueError("Parameter '%s' is specified twice" % name) param = args[index] return param or default - + + class BlockAlgo: """Class modelling an abstract block cipher.""" def __init__(self, factory, key, *args, **kwargs): self.mode = _getParameter('mode', 0, args, kwargs, default=MODE_ECB) self.block_size = factory.block_size - - if self.mode != MODE_OPENPGP: + self._factory = factory + + if self.mode == MODE_CCM: + if self.block_size != 16: + raise ValueError("CCM mode is only available for ciphers that operate on 128 bits blocks") + + self._mac_len = kwargs.get('mac_len', 16) # t + if self._mac_len not in (4, 6, 8, 10, 12, 14, 16): + raise ValueError("Parameter 'mac_len' must be even and in the range 4..16") + + self.nonce = _getParameter('nonce', 1, args, kwargs) # N + if not (self.nonce and 7 <= len(self.nonce) <= 13): + raise ValueError("Length of parameter 'nonce' must be" + " in the range 7..13 bytes") + + self._key = key + self._msg_len = kwargs.get('msg_len', None) # p + self._assoc_len = kwargs.get('assoc_len', None) # a + + self._assoc_buffer = [] + self._assoc_buffer_len = 0 + self._cipherCBC = None # To be used for MAC + self._done_assoc_data = False # True when all associated data + # has been processed + + # Allowed transitions after initialization + self._next = [self.update, self.encrypt, self.decrypt, + self.digest, self.verify] + + # Try to start CCM + self._start_ccm() + + elif self.mode == MODE_OPENPGP: + self._start_PGP(factory, key, *args, **kwargs) + else: self._cipher = factory.new(key, *args, **kwargs) self.IV = self._cipher.IV + + def _start_PGP(self, factory, key, *args, **kwargs): + # OPENPGP mode. For details, see 13.9 in RCC4880. + # + # A few members are specifically created for this mode: + # - _encrypted_iv, set in this constructor + # - _done_first_block, set to True after the first encryption + # - _done_last_block, set to True after a partial block is processed + + self._done_first_block = False + self._done_last_block = False + self.IV = _getParameter('iv', 1, args, kwargs) + if not self.IV: + raise ValueError("MODE_OPENPGP requires an IV") + + # Instantiate a temporary cipher to process the IV + IV_cipher = factory.new( + key, + MODE_CFB, + b('\x00') * self.block_size, # IV for CFB + segment_size=self.block_size * 8) + + # The cipher will be used for... + if len(self.IV) == self.block_size: + # ... encryption + self._encrypted_IV = IV_cipher.encrypt( + self.IV + self.IV[-2:] + # Plaintext + b('\x00') * (self.block_size - 2) # Padding + )[:self.block_size + 2] + elif len(self.IV) == self.block_size + 2: + # ... decryption + self._encrypted_IV = self.IV + self.IV = IV_cipher.decrypt( + self.IV + # Ciphertext + b('\x00') * (self.block_size - 2) # Padding + )[:self.block_size + 2] + if self.IV[-2:] != self.IV[-4:-2]: + raise ValueError("Failed integrity check for OPENPGP IV") + self.IV = self.IV[:-2] else: - # OPENPGP mode. For details, see 13.9 in RCC4880. - # - # A few members are specifically created for this mode: - # - _encrypted_iv, set in this constructor - # - _done_first_block, set to True after the first encryption - # - _done_last_block, set to True after a partial block is processed - - self._done_first_block = False - self._done_last_block = False - self.IV = _getParameter('iv', 1, args, kwargs) - if not self.IV: - raise ValueError("MODE_OPENPGP requires an IV") - - # Instantiate a temporary cipher to process the IV - IV_cipher = factory.new(key, MODE_CFB, - b('\x00')*self.block_size, # IV for CFB - segment_size=self.block_size*8) - - # The cipher will be used for... - if len(self.IV) == self.block_size: - # ... encryption - self._encrypted_IV = IV_cipher.encrypt( - self.IV + self.IV[-2:] + # Plaintext - b('\x00')*(self.block_size-2) # Padding - )[:self.block_size+2] - elif len(self.IV) == self.block_size+2: - # ... decryption - self._encrypted_IV = self.IV - self.IV = IV_cipher.decrypt(self.IV + # Ciphertext - b('\x00')*(self.block_size-2) # Padding - )[:self.block_size+2] - if self.IV[-2:] != self.IV[-4:-2]: - raise ValueError("Failed integrity check for OPENPGP IV") - self.IV = self.IV[:-2] + raise ValueError("Length of IV must be %d or %d bytes for MODE_OPENPGP" + % (self.block_size, self.block_size+2)) + + # Instantiate the cipher for the real PGP data + self._cipher = factory.new( + key, + MODE_CFB, + self._encrypted_IV[-self.block_size:], + segment_size=self.block_size * 8 + ) + + def _start_ccm(self, assoc_len=None, msg_len=None): + # CCM mode. This method creates the 2 ciphers used for the MAC + # (self._cipherCBC) and for the encryption/decryption (self._cipher). + # + # Member _assoc_buffer may already contain user data that needs to be + # authenticated. + + if self._cipherCBC: + # Already started + return + if assoc_len is not None: + self._assoc_len = assoc_len + if msg_len is not None: + self._msg_len = msg_len + if None in (self._assoc_len, self._msg_len): + return + + # q is the length of Q, the encoding of the message length + q = 15 - len(self.nonce) + + ## Compute B_0 + flags = ( + 64 * (self._assoc_len > 0) + + 8 * divmod(self._mac_len - 2, 2)[0] + + (q - 1) + ) + b_0 = bchr(flags) + self.nonce + long_to_bytes(self._msg_len, q) + self._assoc_buffer.insert(0, b_0) + self._assoc_buffer_len += 16 + + # Start CBC MAC with zero IV + # Mind that self._assoc_buffer may already contain some data + self._cipherCBC = self._factory.new(self._key, MODE_CBC, bchr(0)*16) + assoc_len_encoded = b('') + if self._assoc_len > 0: + if self._assoc_len < (2 ** 16 - 2 ** 8): + enc_size = 2 + elif self._assoc_len < (2L ** 32): + assoc_len_encoded = b('\xFF\xFE') + enc_size = 4 else: - raise ValueError("Length of IV must be %d or %d bytes for MODE_OPENPGP" - % (self.block_size, self.block_size+2)) + assoc_len_encoded = b('\xFF\xFF') + enc_size = 8 + assoc_len_encoded += long_to_bytes(self._assoc_len, enc_size) + self._assoc_buffer.insert(1, assoc_len_encoded) + self._assoc_buffer_len += len(assoc_len_encoded) + + # Start CTR cipher + flags = q - 1 + prefix = bchr(flags) + self.nonce + ctr = Counter.new(128 - len(prefix) * 8, prefix, initial_value=0) + self._cipher = self._factory.new(self._key, MODE_CTR, counter=ctr) + # Will XOR against CBC MAC + self._s_0 = self._cipher.encrypt(bchr(0) * 16) + + def update(self, assoc_data): + """Protect associated data + + When using an AEAD mode like CCM, and if there is any associated data, + the caller has to invoke this function one or more times, before + using ``decrypt`` or ``encrypt``. + + By *associated data* it is meant any data (e.g. packet headers) that + will not be encrypted and will be transmitted in the clear. + However, the receiver is still able to detect any modification to it. + In CCM, the *associated data* is also called *additional authenticated + data*. + + If there is no associated data, this method must not be called. + + The caller may split associated data in segments of any size, and + invoke this method multiple times, each time with the next segment. - # Instantiate the cipher for the real PGP data - self._cipher = factory.new(key, MODE_CFB, - self._encrypted_IV[-self.block_size:], - segment_size=self.block_size*8) + :Parameters: + assoc_data : byte string + A piece of associated data. There are no restrictions on its size. + """ + + if self.mode == MODE_CCM: + if self.update not in self._next: + raise TypeError("update() can only be called immediately after initialization") + self._next = [ self.update, self.encrypt, self.decrypt, + self.digest, self.verify ] + return self._update(assoc_data) + + def _update(self, assoc_data, do_zero_padding=False): + """Equivalent to update(), but without FSM checks.""" + + if self.mode == MODE_CCM: + self._assoc_buffer.append(assoc_data) + self._assoc_buffer_len += len(assoc_data) + + if not self._cipherCBC: + return + + if do_zero_padding and (self._assoc_buffer_len & 15): + npad = 16 - self._assoc_buffer_len & 15 + self._assoc_buffer.append(bchr(0) * npad) + self._assoc_buffer_len += npad + + # Feed data into CBC MAC + aligned_data = 16 * divmod(self._assoc_buffer_len, 16)[0] + if aligned_data > 0: + buf = b("").join(self._assoc_buffer) + self._t = self._cipherCBC.encrypt(buf[:aligned_data])[-16:] + self._assoc_buffer = [buf[aligned_data:]] + self._assoc_buffer_len -= aligned_data + return + raise ValueError("update() not supported by this mode of operation") def encrypt(self, plaintext): """Encrypt data with the key and the parameters set at initialization. - + The cipher object is stateful; encryption of a long block of data can be broken up in two or more calls to `encrypt()`. + That is, the statement: - + >>> c.encrypt(a) + c.encrypt(b) is always equivalent to: @@ -211,7 +425,8 @@ class BlockAlgo: - For `MODE_CFB`, *plaintext* length (in bytes) must be a multiple of *segment_size*/8. - - For `MODE_OFB` and `MODE_CTR`, *plaintext* can be of any length. + - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM` *plaintext* can be + of any length. - For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*, unless it is the last chunk of the message. @@ -245,6 +460,23 @@ class BlockAlgo: self._done_first_block = True return res + if self.mode == MODE_CCM: + + if self.encrypt not in self._next: + raise TypeError("encrypt() can only be called after initialization or an update()") + self._next = [self.encrypt, self.digest] + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=len(plaintext)) + self._next = [self.digest] + if not self._done_assoc_data: + self._update(b(""), do_zero_padding=True) + self._done_assoc_data = True + + self._update(plaintext) + return self._cipher.encrypt(plaintext) def decrypt(self, ciphertext): @@ -272,7 +504,8 @@ class BlockAlgo: - For `MODE_CFB`, *ciphertext* length (in bytes) must be a multiple of *segment_size*/8. - - For `MODE_OFB` and `MODE_CTR`, *ciphertext* can be of any length. + - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM`, *ciphertext* can be + of any length. - For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*, unless it is the last chunk of the message. @@ -282,6 +515,7 @@ class BlockAlgo: The piece of data to decrypt. :Return: the decrypted data (byte string, as long as *ciphertext*). """ + if self.mode == MODE_OPENPGP: padding_length = (self.block_size - len(ciphertext) % self.block_size) % self.block_size if padding_length > 0: @@ -298,5 +532,116 @@ class BlockAlgo: res = self._cipher.decrypt(ciphertext) return res - return self._cipher.decrypt(ciphertext) + if self.mode == MODE_CCM: + + if self.decrypt not in self._next: + raise TypeError("decrypt() can only be called after initialization or an update()") + self._next = [self.decrypt, self.verify] + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=len(ciphertext)) + self._next = [self.verify] + if not self._done_assoc_data: + self._update(b(""), do_zero_padding=True) + self._done_assoc_data = True + + pt = self._cipher.decrypt(ciphertext) + + if self.mode == MODE_CCM: + self._update(pt) + + return pt + + def digest(self): + """Compute the *binary* MAC tag in an AEAD mode. + + When using an AEAD mode like CCM, the caller invokes this function + at the very end. + + This method returns the MAC that shall be sent to the receiver, + together with the ciphertext. + + :Return: the MAC, as a byte string. + """ + + if self.mode == MODE_CCM: + + if self.digest not in self._next: + raise TypeError("digest() cannot be called when decrypting or validating a message") + self._next = [self.digest] + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=0) + self._update(b(""), do_zero_padding=True) + + return strxor(self._t, self._s_0)[:self._mac_len] + + raise TypeError("digest() not supported by this mode of operation") + + def hexdigest(self): + """Compute the *printable* MAC tag in an AEAD mode. + + This method is like `digest`. + + :Return: the MAC, as a hexadecimal string. + """ + return "".join(["%02x" % bord(x) for x in self.digest()]) + + def verify(self, mac_tag): + """Validate the *binary* MAC tag in an AEAD mode. + + When using an AEAD mode like CCM, the caller invokes this function + at the very end. + + This method checks if the decrypted message is indeed valid + (that is, if the key is correct) and it has not been + tampered with while in transit. + + :Parameters: + mac_tag : byte string + This is the *binary* MAC, as received from the sender. + :Raises ValueError: + if the MAC does not match. The message has been tampered with + or the key is incorrect. + """ + + if self.mode == MODE_CCM: + if self.verify not in self._next: + raise TypeError("verify() cannot be called when encrypting a message") + self._next = [self.verify] + + if self._assoc_len is None: + self._start_ccm(assoc_len=self._assoc_buffer_len) + if self._msg_len is None: + self._start_ccm(msg_len=0) + self._update(b(""), do_zero_padding=True) + u = strxor(self._t, self._s_0)[:self._mac_len] + + res = 0 + # Constant-time comparison + for x,y in zip(u, mac_tag): + res |= bord(x) ^ bord(y) + if res or len(mac_tag)!=self._mac_len: + raise ValueError("MAC check failed") + return + + raise TypeError("verify() not supported by this mode of operation") + + def hexverify(self, hex_mac_tag): + """Validate the *printable* MAC tag in an AEAD mode. + + This method is like `verify`. + + :Parameters: + hex_mac_tag : string + This is the *printable* MAC, as received from the sender. + :Raises ValueError: + if the MAC does not match. The message has been tampered with + or the key is incorrect. + """ + self.verify(unhexlify(hex_mac_tag)) diff --git a/lib/Crypto/SelfTest/Cipher/common.py b/lib/Crypto/SelfTest/Cipher/common.py index a20a3aa..e52a781 100644 --- a/lib/Crypto/SelfTest/Cipher/common.py +++ b/lib/Crypto/SelfTest/Cipher/common.py @@ -30,8 +30,9 @@ __revision__ = "$Id$" import sys import unittest -from binascii import a2b_hex, b2a_hex +from binascii import a2b_hex, b2a_hex, hexlify from Crypto.Util.py3compat import * +from Crypto.Util.strxor import strxor_c # For compatibility with Python 2.1 and Python 2.2 if sys.hexversion < 0x02030000: @@ -68,14 +69,24 @@ class CipherSelfTest(unittest.TestCase): self.plaintext = b(_extract(params, 'plaintext')) self.ciphertext = b(_extract(params, 'ciphertext')) self.module_name = _extract(params, 'module_name', None) + self.assoc_data = _extract(params, 'assoc_data', None) + if self.assoc_data: + self.assoc_data = b(self.assoc_data) + self.mac = _extract(params, 'mac', None) + if self.assoc_data: + self.mac = b(self.mac) mode = _extract(params, 'mode', None) self.mode_name = str(mode) if mode is not None: # Block cipher self.mode = getattr(self.module, "MODE_" + mode) + self.iv = _extract(params, 'iv', None) - if self.iv is not None: self.iv = b(self.iv) + if self.iv is None: + self.iv = _extract(params, 'nonce', None) + if self.iv is not None: + self.iv = b(self.iv) # Only relevant for OPENPGP mode self.encrypted_iv = _extract(params, 'encrypted_iv', None) @@ -122,26 +133,49 @@ class CipherSelfTest(unittest.TestCase): def runTest(self): plaintext = a2b_hex(self.plaintext) ciphertext = a2b_hex(self.ciphertext) - - ct1 = b2a_hex(self._new().encrypt(plaintext)) - pt1 = b2a_hex(self._new(1).decrypt(ciphertext)) - ct2 = b2a_hex(self._new().encrypt(plaintext)) - pt2 = b2a_hex(self._new(1).decrypt(ciphertext)) + assoc_data = None + if self.assoc_data: + assoc_data = a2b_hex(self.assoc_data) + + ct = None + pt = None + + # + # Repeat the same encryption or decryption twice and verify + # that the result is always the same + # + for i in xrange(2): + cipher = self._new() + decipher = self._new(1) + + # Only AEAD modes + if self.assoc_data: + cipher.update(assoc_data) + decipher.update(assoc_data) + + ctX = b2a_hex(cipher.encrypt(plaintext)) + ptX = b2a_hex(decipher.decrypt(ciphertext)) + + if ct: + self.assertEqual(ct, ctX) + self.assertEqual(pt, ptX) + ct, pt = ctX, ptX if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP: # In PGP mode, data returned by the first encrypt() # is prefixed with the encrypted IV. # Here we check it and then remove it from the ciphertexts. eilen = len(self.encrypted_iv) - self.assertEqual(self.encrypted_iv, ct1[:eilen]) - self.assertEqual(self.encrypted_iv, ct2[:eilen]) - ct1 = ct1[eilen:] - ct2 = ct2[eilen:] + self.assertEqual(self.encrypted_iv, ct[:eilen]) + ct = ct[eilen:] - self.assertEqual(self.ciphertext, ct1) # encrypt - self.assertEqual(self.ciphertext, ct2) # encrypt (second time) - self.assertEqual(self.plaintext, pt1) # decrypt - self.assertEqual(self.plaintext, pt2) # decrypt (second time) + self.assertEqual(self.ciphertext, ct) # encrypt + self.assertEqual(self.plaintext, pt) # decrypt + + if self.mac: + mac = b2a_hex(cipher.digest()) + self.assertEqual(self.mac, mac) + decipher.verify(a2b_hex(self.mac)) class CipherStreamingSelfTest(CipherSelfTest): @@ -252,6 +286,258 @@ class CFBSegmentSizeTest(unittest.TestCase): self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), self.module.MODE_CFB, segment_size=i) self.module.new(a2b_hex(self.key), self.module.MODE_CFB, "\0"*self.module.block_size, segment_size=8) # should succeed +class CCMMACLengthTest(unittest.TestCase): + """CCM specific tests about MAC""" + + def __init__(self, module): + unittest.TestCase.__init__(self) + self.module = module + self.key = b('\xFF')*16 + self.iv = b('\x00')*10 + + def shortDescription(self): + return self.description + + def runTest(self): + """Verify that MAC can only be 4,6,8,..,16 bytes long.""" + for i in range(3,16,2): + self.description = "CCM MAC length check (%d bytes)" % i + self.assertRaises(ValueError, self.module.new, self.key, + self.module.MODE_CCM, self.iv, msg_len=10, mac_len=i) + + """Verify that default MAC length is 16.""" + self.description = "CCM default MAC length check" + cipher = self.module.new(self.key, self.module.MODE_CCM, + self.iv, msg_len=4) + cipher.encrypt(b('z')*4) + self.assertEqual(len(cipher.digest()), 16) + +class CCMSplitEncryptionTest(unittest.TestCase): + """CCM specific tests to validate how encrypt() + decrypt() can be called multiple times on the + same object.""" + + def __init__(self, module): + unittest.TestCase.__init__(self) + self.module = module + self.key = b('\xFF')*16 + self.iv = b('\x00')*10 + self.description = "CCM Split Encryption Test" + + def shortDescription(self): + return self.description + + def runTest(self): + """Verify that CCM update()/encrypt() can be called multiple times, + provided that lengths are declared beforehand""" + + data = b("AUTH DATA") + pt1 = b("PLAINTEXT1") # Short + pt2 = b("PLAINTEXT2") # Long + pt_ref = pt1+pt2 + + # REFERENCE: Run with 1 update() and 1 encrypt() + cipher = self.module.new(self.key, self.module.MODE_CCM, + self.iv) + cipher.update(data) + ct_ref = cipher.encrypt(pt_ref) + mac_ref = cipher.digest() + + # Verify that calling CCM encrypt()/decrypt() twice is not + # possible without the 'msg_len' parameter and regardless + # of the 'assoc_len' parameter + for ad_len in None, len(data): + cipher = self.module.new(self.key, self.module.MODE_CCM, + self.iv, assoc_len=ad_len) + cipher.update(data) + cipher.encrypt(pt1) + self.assertRaises(TypeError, cipher.encrypt, pt2) + + cipher = self.module.new(self.key, self.module.MODE_CCM, + self.iv, assoc_len=ad_len) + cipher.update(data) + cipher.decrypt(ct_ref[:len(pt1)]) + self.assertRaises(TypeError, cipher.decrypt, ct_ref[len(pt1):]) + + # Run with 2 encrypt()/decrypt(). Results must be the same + # regardless of the 'assoc_len' parameter + for ad_len in None, len(data): + cipher = self.module.new(self.key, self.module.MODE_CCM, + self.iv, assoc_len=ad_len, msg_len=len(pt_ref)) + cipher.update(data) + ct = cipher.encrypt(pt1) + ct += cipher.encrypt(pt2) + mac = cipher.digest() + self.assertEqual(ct_ref, ct) + self.assertEqual(mac_ref, mac) + + cipher = self.module.new(self.key, self.module.MODE_CCM, + self.iv, msg_len=len(pt1+pt2)) + cipher.update(data) + pt = cipher.decrypt(ct[:len(pt1)]) + pt += cipher.decrypt(ct[len(pt1):]) + mac = cipher.verify(mac_ref) + self.assertEqual(pt_ref, pt) + +class AEADTests(unittest.TestCase): + """Tests generic to all AEAD modes""" + + def __init__(self, module, mode_name): + unittest.TestCase.__init__(self) + self.module = module + self.mode_name = mode_name + self.mode = getattr(module, mode_name) + self.key = b('\xFF')*16 + self.iv = b('\x00')*10 + self.description = "AEAD Test" + + def right_mac_test(self): + """Positive tests for MAC""" + + self.description = "Test for right MAC in %s of %s" % \ + (self.mode_name, self.module.__name__) + + ad_ref = b("Reference AD") + pt_ref = b("Reference plaintext") + + # Encrypt and create the reference MAC + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.update(ad_ref) + ct_ref = cipher.encrypt(pt_ref) + mac_ref = cipher.digest() + + # Decrypt and verify that MAC is accepted + decipher = self.module.new(self.key, self.mode, self.iv) + decipher.update(ad_ref) + pt = decipher.decrypt(ct_ref) + decipher.verify(mac_ref) + self.assertEqual(pt, pt_ref) + + # Verify that hexverify work + decipher.hexverify(hexlify(mac_ref)) + + def wrong_mac_test(self): + """Negative tests for MAC""" + + self.description = "Test for wrong MAC in %s of %s" % \ + (self.mode_name, self.module.__name__) + + ad_ref = b("Reference AD") + pt_ref = b("Reference plaintext") + + # Encrypt and create the reference MAC + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.update(ad_ref) + ct_ref = cipher.encrypt(pt_ref) + mac_ref = cipher.digest() + + # Modify the MAC and verify it is NOT ACCEPTED + wrong_mac = strxor_c(mac_ref, 255) + decipher = self.module.new(self.key, self.mode, self.iv) + decipher.update(ad_ref) + pt = decipher.decrypt(ct_ref) + self.assertRaises(ValueError, decipher.verify, wrong_mac) + + def zero_data(self): + """Verify transition from INITIALIZED to FINISHED""" + + self.description = "Test for zero data in %s of %s" % \ + (self.mode_name, self.module.__name__) + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.digest() + + def multiple_updates(self): + """Verify that update() can be called multiple times""" + + self.description = "Test for multiple updates in %s of %s" % \ + (self.mode_name, self.module.__name__) + + ad = b("").join([bchr(x) for x in xrange(0,128)]) + + mac1, mac2, mac3 = (None,)*3 + for chunk_length in 1,10,40,80,128: + chunks = [ad[i:i+chunk_length] for i in range(0, len(ad), chunk_length)] + + # No encryption/decryption + cipher = self.module.new(self.key, self.mode, self.iv) + for c in chunks: + cipher.update(c) + if mac1: + cipher.verify(mac1) + else: + mac1 = cipher.digest() + + # Encryption + cipher = self.module.new(self.key, self.mode, self.iv) + for c in chunks: + cipher.update(c) + ct = cipher.encrypt(b("PT")) + mac2 = cipher.digest() + + # Decryption + cipher = self.module.new(self.key, self.mode, self.iv) + for c in chunks: + cipher.update(c) + cipher.decrypt(ct) + cipher.verify(mac2) + + def no_mix_encrypt_decrypt(self): + """Verify that encrypt and decrypt cannot be mixed up""" + + self.description = "Test for mix of encrypt and decrypt in %s of %s" % \ + (self.mode_name, self.module.__name__) + + # Calling decrypt after encrypt raises an exception + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.encrypt(b("PT")) + self.assertRaises(TypeError, cipher.decrypt, b("XYZ")) + + # Calling encrypt after decrypt raises an exception + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.decrypt(b("CT")) + self.assertRaises(TypeError, cipher.encrypt, b("XYZ")) + + # Calling verify after encrypt raises an exception + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.encrypt(b("PT")) + self.assertRaises(TypeError, cipher.verify, b("XYZ")) + self.assertRaises(TypeError, cipher.hexverify, "12") + + # Calling digest after decrypt raises an exception + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.decrypt(b("CT")) + self.assertRaises(TypeError, cipher.digest) + self.assertRaises(TypeError, cipher.hexdigest) + + def no_late_update(self): + """Verify that update cannot be called after encrypt or decrypt""" + + self.description = "Test for late update in %s of %s" % \ + (self.mode_name, self.module.__name__) + + # Calling update after encrypt raises an exception + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.update(b("XX")) + cipher.encrypt(b("PT")) + self.assertRaises(TypeError, cipher.update, b("XYZ")) + + # Calling update after decrypt raises an exception + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.update(b("XX")) + cipher.decrypt(b("CT")) + self.assertRaises(TypeError, cipher.update, b("XYZ")) + + def runTest(self): + self.right_mac_test() + self.wrong_mac_test() + self.zero_data() + self.multiple_updates() + self.no_mix_encrypt_decrypt() + self.no_late_update() + + def shortDescription(self): + return self.description + class RoundtripTest(unittest.TestCase): def __init__(self, module, params): from Crypto import Random @@ -310,6 +596,10 @@ class IVLengthTest(unittest.TestCase): self.module.MODE_OFB, "") self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), self.module.MODE_OPENPGP, "") + if hasattr(self.module, "MODE_CCM"): + for ivlen in (0,6,14): + self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), + self.module.MODE_CCM, bchr(0)*ivlen, msg_len=10) self.module.new(a2b_hex(self.key), self.module.MODE_ECB, "") self.module.new(a2b_hex(self.key), self.module.MODE_CTR, "", counter=self._dummy_counter) @@ -367,6 +657,13 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()): ] extra_tests_added = 1 + # Extract associated data and MAC for AEAD modes + if p_mode == 'CCM': + assoc_data, params['plaintext'] = params['plaintext'].split('|') + assoc_data2, params['ciphertext'], params['mac'] = params['ciphertext'].split('|') + params['assoc_data'] = assoc_data + params['mac_len'] = len(params['mac'])>>1 + # Add the current test to the test suite tests.append(CipherSelfTest(module, params)) @@ -383,6 +680,19 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()): if not params2['ctr_params'].has_key('disable_shortcut'): params2['ctr_params']['disable_shortcut'] = 1 tests.append(CipherSelfTest(module, params2)) + + # Add tests that don't use test vectors + if hasattr(module, "MODE_CCM"): + tests += [ + CCMMACLengthTest(module), + CCMSplitEncryptionTest(module), + ] + for aead_mode in ("MODE_CCM",): + if hasattr(module, aead_mode): + tests += [ + AEADTests(module, aead_mode), + ] + return tests def make_stream_tests(module, module_name, test_data): diff --git a/lib/Crypto/SelfTest/Cipher/test_AES.py b/lib/Crypto/SelfTest/Cipher/test_AES.py index 8fd4a6f..878f56b 100644 --- a/lib/Crypto/SelfTest/Cipher/test_AES.py +++ b/lib/Crypto/SelfTest/Cipher/test_AES.py @@ -1445,6 +1445,240 @@ test_data = [ '5baa61e4c9b93f3f0682250b6cf8331b', # Key (hash of 'password') 'GPG Test Vector #1', dict(mode='OPENPGP', iv='3d7d3e62282add7eb203eeba5c800733', encrypted_iv='fd934601ef49cb58b6d9aebca6056bdb96ef' ) ), + + # NIST SP 800-38C test vectors for CCM + # This is a list of tuples with 5 items: + # + # 1. Associated data + '|' + plaintext + # 2. Associated data + '|' + ciphertext + '|' + MAC + # 3. AES-128 key + # 4. Description + # 5. Dictionary of parameters to be passed to AES.new(). + # It must include the nonce. + # + ( '0001020304050607|20212223', + '0001020304050607|7162015b|4dac255d', + '404142434445464748494a4b4c4d4e4f', + 'NIST SP 800-38C Appex C.1', + dict(mode='CCM', nonce='10111213141516') + ), + ( '000102030405060708090a0b0c0d0e0f|202122232425262728292a2b2c2d2e2f', + '000102030405060708090a0b0c0d0e0f|d2a1f0e051ea5f62081a7792073d593d|1fc64fbfaccd', + '404142434445464748494a4b4c4d4e4f', + 'NIST SP 800-38C Appex C.2', + dict(mode='CCM', nonce='1011121314151617') + ), + ( '000102030405060708090a0b0c0d0e0f10111213|'+ + '202122232425262728292a2b2c2d2e2f3031323334353637', + '000102030405060708090a0b0c0d0e0f10111213|'+ + 'e3b201a9f5b71a7a9b1ceaeccd97e70b6176aad9a4428aa5|484392fbc1b09951', + '404142434445464748494a4b4c4d4e4f', + 'NIST SP 800-38C Appex C.3', + dict(mode='CCM', nonce='101112131415161718191a1b') + ), + ( + (''.join(["%02X" % (x*16+y) for x in xrange(0,16) for y in xrange(0,16)]))*256+'|'+ + '202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f', + (''.join(["%02X" % (x*16+y) for x in xrange(0,16) for y in xrange(0,16)]))*256+'|'+ + '69915dad1e84c6376a68c2967e4dab615ae0fd1faec44cc484828529463ccf72|'+ + 'b4ac6bec93e8598e7f0dadbcea5b', + '404142434445464748494a4b4c4d4e4f', + 'NIST SP 800-38C Appex C.4', + dict(mode='CCM', nonce='101112131415161718191a1b1c') + ), + # RFC3610 test vectors + ( + '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e', + '0001020304050607|588c979a61c663d2f066d0c2c0f989806d5f6b61dac384|'+ + '17e8d12cfdf926e0', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #1', + dict(mode='CCM', nonce='00000003020100a0a1a2a3a4a5') + ), + ( + '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f', + '0001020304050607|72c91a36e135f8cf291ca894085c87e3cc15c439c9e43a3b|'+ + 'a091d56e10400916', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #2', + dict(mode='CCM', nonce='00000004030201a0a1a2a3a4a5') + ), + ( + '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20', + '0001020304050607|51b1e5f44a197d1da46b0f8e2d282ae871e838bb64da859657|'+ + '4adaa76fbd9fb0c5', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #3', + dict(mode='CCM', nonce='00000005040302A0A1A2A3A4A5') + ), + ( + '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e', + '000102030405060708090a0b|a28c6865939a9a79faaa5c4c2a9d4a91cdac8c|'+ + '96c861b9c9e61ef1', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #4', + dict(mode='CCM', nonce='00000006050403a0a1a2a3a4a5') + ), + ( + '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e1f', + '000102030405060708090a0b|dcf1fb7b5d9e23fb9d4e131253658ad86ebdca3e|'+ + '51e83f077d9c2d93', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #5', + dict(mode='CCM', nonce='00000007060504a0a1a2a3a4a5') + ), + ( + '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e1f20', + '000102030405060708090a0b|6fc1b011f006568b5171a42d953d469b2570a4bd87|'+ + '405a0443ac91cb94', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #6', + dict(mode='CCM', nonce='00000008070605a0a1a2a3a4a5') + ), + ( + '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e', + '0001020304050607|0135d1b2c95f41d5d1d4fec185d166b8094e999dfed96c|'+ + '048c56602c97acbb7490', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #7', + dict(mode='CCM', nonce='00000009080706a0a1a2a3a4a5') + ), + ( + '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f', + '0001020304050607|7b75399ac0831dd2f0bbd75879a2fd8f6cae6b6cd9b7db24|'+ + 'c17b4433f434963f34b4', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #8', + dict(mode='CCM', nonce='0000000a090807a0a1a2a3a4a5') + ), + ( + '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20', + '0001020304050607|82531a60cc24945a4b8279181ab5c84df21ce7f9b73f42e197|'+ + 'ea9c07e56b5eb17e5f4e', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #9', + dict(mode='CCM', nonce='0000000b0a0908a0a1a2a3a4a5') + ), + ( + '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e', + '000102030405060708090a0b|07342594157785152b074098330abb141b947b|'+ + '566aa9406b4d999988dd', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #10', + dict(mode='CCM', nonce='0000000c0b0a09a0a1a2a3a4a5') + ), + ( + '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e1f', + '000102030405060708090a0b|676bb20380b0e301e8ab79590a396da78b834934|'+ + 'f53aa2e9107a8b6c022c', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #11', + dict(mode='CCM', nonce='0000000d0c0b0aa0a1a2a3a4a5') + ), + ( + '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e1f20', + '000102030405060708090a0b|c0ffa0d6f05bdb67f24d43a4338d2aa4bed7b20e43|'+ + 'cd1aa31662e7ad65d6db', + 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf', + 'RFC3610 Packet Vector #12', + dict(mode='CCM', nonce='0000000e0d0c0ba0a1a2a3a4a5') + ), + ( + '0be1a88bace018b1|08e8cf97d820ea258460e96ad9cf5289054d895ceac47c', + '0be1a88bace018b1|4cb97f86a2a4689a877947ab8091ef5386a6ffbdd080f8|'+ + 'e78cf7cb0cddd7b3', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #13', + dict(mode='CCM', nonce='00412b4ea9cdbe3c9696766cfa') + ), + ( + '63018f76dc8a1bcb|9020ea6f91bdd85afa0039ba4baff9bfb79c7028949cd0ec', + '63018f76dc8a1bcb|4ccb1e7ca981befaa0726c55d378061298c85c92814abc33|'+ + 'c52ee81d7d77c08a', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #14', + dict(mode='CCM', nonce='0033568ef7b2633c9696766cfa') + ), + ( + 'aa6cfa36cae86b40|b916e0eacc1c00d7dcec68ec0b3bbb1a02de8a2d1aa346132e', + 'aa6cfa36cae86b40|b1d23a2220ddc0ac900d9aa03c61fcf4a559a4417767089708|'+ + 'a776796edb723506', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #15', + dict(mode='CCM', nonce='00103fe41336713c9696766cfa') + ), + ( + 'd0d0735c531e1becf049c244|12daac5630efa5396f770ce1a66b21f7b2101c', + 'd0d0735c531e1becf049c244|14d253c3967b70609b7cbb7c49916028324526|'+ + '9a6f49975bcadeaf', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #16', + dict(mode='CCM', nonce='00764c63b8058e3c9696766cfa') + ), + ( + '77b60f011c03e1525899bcae|e88b6a46c78d63e52eb8c546efb5de6f75e9cc0d', + '77b60f011c03e1525899bcae|5545ff1a085ee2efbf52b2e04bee1e2336c73e3f|'+ + '762c0c7744fe7e3c', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #17', + dict(mode='CCM', nonce='00f8b678094e3b3c9696766cfa') + ), + ( + 'cd9044d2b71fdb8120ea60c0|6435acbafb11a82e2f071d7ca4a5ebd93a803ba87f', + 'cd9044d2b71fdb8120ea60c0|009769ecabdf48625594c59251e6035722675e04c8|'+ + '47099e5ae0704551', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #18', + dict(mode='CCM', nonce='00d560912d3f703c9696766cfa') + ), + ( + 'd85bc7e69f944fb8|8a19b950bcf71a018e5e6701c91787659809d67dbedd18', + 'd85bc7e69f944fb8|bc218daa947427b6db386a99ac1aef23ade0b52939cb6a|'+ + '637cf9bec2408897c6ba', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #19', + dict(mode='CCM', nonce='0042fff8f1951c3c9696766cfa') + ), + ( + '74a0ebc9069f5b37|1761433c37c5a35fc1f39f406302eb907c6163be38c98437', + '74a0ebc9069f5b37|5810e6fd25874022e80361a478e3e9cf484ab04f447efff6|'+ + 'f0a477cc2fc9bf548944', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #20', + dict(mode='CCM', nonce='00920f40e56cdc3c9696766cfa') + ), + ( + '44a3aa3aae6475ca|a434a8e58500c6e41530538862d686ea9e81301b5ae4226bfa', + '44a3aa3aae6475ca|f2beed7bc5098e83feb5b31608f8e29c38819a89c8e776f154|'+ + '4d4151a4ed3a8b87b9ce', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #21', + dict(mode='CCM', nonce='0027ca0c7120bc3c9696766cfa') + ), + ( + 'ec46bb63b02520c33c49fd70|b96b49e21d621741632875db7f6c9243d2d7c2', + 'ec46bb63b02520c33c49fd70|31d750a09da3ed7fddd49a2032aabf17ec8ebf|'+ + '7d22c8088c666be5c197', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #22', + dict(mode='CCM', nonce='005b8ccbcd9af83c9696766cfa') + ), + ( + '47a65ac78b3d594227e85e71|e2fcfbb880442c731bf95167c8ffd7895e337076', + '47a65ac78b3d594227e85e71|e882f1dbd38ce3eda7c23f04dd65071eb41342ac|'+ + 'df7e00dccec7ae52987d', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #23', + dict(mode='CCM', nonce='003ebe94044b9a3c9696766cfa') + ), + ( + '6e37a6ef546d955d34ab6059|abf21c0b02feb88f856df4a37381bce3cc128517d4', + '6e37a6ef546d955d34ab6059|f32905b88a641b04b9c9ffb58cc390900f3da12ab1|'+ + '6dce9e82efa16da62059', + 'd7828d13b2b0bdc325a76236df93cc6b', + 'RFC3610 Packet Vector #24', + dict(mode='CCM', nonce='008d493b30ae8b3c9696766cfa') + ), ] def get_tests(config={}): diff --git a/pct-speedtest.py b/pct-speedtest.py index e84cf81..378604f 100644 --- a/pct-speedtest.py +++ b/pct-speedtest.py @@ -167,6 +167,8 @@ class Benchmark: elif mode == "CTR-LE": from Crypto.Util import Counter cipher = module.new(key, module.MODE_CTR, counter=Counter.new(module.block_size*8, little_endian=True)) + elif hasattr(module, 'MODE_CCM') and mode==module.MODE_CCM: + cipher = module.new(key, mode, iv[:8], msg_len=len(rand)*len(blocks)) elif mode==module.MODE_CTR: ctr = Crypto.Util.Counter.new(module.block_size*8, initial_value=bytes_to_long(iv), @@ -359,6 +361,8 @@ class Benchmark: self.test_encryption("%s-OPENPGP" % (cipher_name,), module, key_bytes, module.MODE_OPENPGP) self.test_encryption("%s-CTR-BE" % (cipher_name,), module, key_bytes, "CTR-BE") self.test_encryption("%s-CTR-LE" % (cipher_name,), module, key_bytes, "CTR-LE") + if hasattr(module, "MODE_CCM"): + self.test_encryption("%s-CCM" % (cipher_name,), module, key_bytes, module.MODE_CCM) # Crypto.Cipher (stream ciphers) for cipher_name, module, key_bytes in stream_specs: |