diff options
author | Legrandin <helderijs@gmail.com> | 2013-08-18 23:07:32 +0200 |
---|---|---|
committer | Dwayne Litzenberger <dlitz@dlitz.net> | 2013-10-20 13:30:22 -0700 |
commit | 2062e5f843fa54d9da45e5d53652f3fb5dbbd97e (patch) | |
tree | 46251d5d19d1a2c8563f42eed95905416e134043 /lib | |
parent | 965871a72773457d73fda6a1a2970a4279dcbe6f (diff) | |
download | pycrypto-2062e5f843fa54d9da45e5d53652f3fb5dbbd97e.tar.gz |
Add encrypt_and_digest() and decrypt_and_verify()
This patch adds encrypt_and_digest() and decrypt_and_verify()
methods to a cipher object.
In most cases they are just shortcuts to the existing functions.
For SIV mode, decrypt_and_verify() replaces decrypt().
[dlitz@dlitz.net: Squashed with bugfix commit:]
Bug in encrypt_and_digest() (all AEAD modes)
decrypt() was being called instead of encrypt().
Added also a unit test to validate that composition
of encrypt_and_digest() and decrypt_and_verify()
is the identity function.
[dlitz@dlitz.net: Included changes from the following commit from the author's pull request:]
- [9c13f9c] Rename 'IV' parameter to 'nonce' for AEAD modes.
[dlitz@dlitz.net: Whitespace fixed with "git rebase --whitespace=fix"]
[dlitz@dlitz.net: Replaced MacMismatchError with ValueError]
[dlitz@dlitz.net: Replaced ApiUsageError with TypeError]
Diffstat (limited to 'lib')
-rw-r--r-- | lib/Crypto/Cipher/blockalgo.py | 74 | ||||
-rw-r--r-- | lib/Crypto/SelfTest/Cipher/common.py | 69 |
2 files changed, 103 insertions, 40 deletions
diff --git a/lib/Crypto/Cipher/blockalgo.py b/lib/Crypto/Cipher/blockalgo.py index 0d21f44..2864f6b 100644 --- a/lib/Crypto/Cipher/blockalgo.py +++ b/lib/Crypto/Cipher/blockalgo.py @@ -822,7 +822,11 @@ class BlockAlgo: res = self._cipher.decrypt(ciphertext) return res - if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV, MODE_GCM): + if self.mode == MODE_SIV: + raise TypeError("decrypt() not allowed for SIV mode." + " Use decrypt_and_verify() instead.") + + if self.mode in (MODE_CCM, MODE_EAX, MODE_GCM): if self.decrypt not in self._next: raise TypeError("decrypt() can only be called after initialization or an update()") @@ -849,27 +853,11 @@ class BlockAlgo: if self.mode == MODE_EAX: self._omac[2].update(ciphertext) - if self.mode == MODE_SIV: - self._next = [self.verify] - - # Take the MAC and start the cipher for decryption - self._mac = ciphertext[-self._factory.block_size:] - self._cipher = self._siv_ctr_cipher(self._mac) - - # Remove MAC from ciphertext - ciphertext = ciphertext[:-self._factory.block_size] - pt = self._cipher.decrypt(ciphertext) if self.mode == MODE_CCM: self._cipherMAC.update(pt) - if self.mode == MODE_SIV: - if self.nonce: - self._cipherMAC.update(self.nonce) - if pt: - self._cipherMAC.update(pt) - return pt def digest(self): @@ -987,3 +975,55 @@ class BlockAlgo: """ self.verify(unhexlify(hex_mac_tag)) + + def encrypt_and_digest(self, plaintext): + """Perform encrypt() and digest() in one step. + + :Parameters: + plaintext : byte string + The piece of data to encrypt. + :Return: + a tuple with two byte strings: + + - the encrypted data + - the MAC + """ + + return self.encrypt(plaintext), self.digest() + + def decrypt_and_verify(self, ciphertext, mac_tag): + """Perform decrypt() and verify() in one step. + + :Parameters: + ciphertext : byte string + The piece of data to decrypt. + mac_tag : byte string + This is the *binary* MAC, as received from the sender. + + :Return: the decrypted data (byte string). + :Raises ValueError: + if the MAC does not match. The message has been tampered with + or the key is incorrect. + """ + + if self.mode == MODE_SIV: + if self.decrypt not in self._next: + raise TypeError("decrypt() can only be called" + " after initialization or an update()") + self._next = [self.verify] + + # Take the MAC and start the cipher for decryption + self._mac = mac_tag + self._cipher = self._siv_ctr_cipher(self._mac) + + pt = self._cipher.decrypt(ciphertext) + + if self.nonce: + self._cipherMAC.update(self.nonce) + if pt: + self._cipherMAC.update(pt) + else: + pt = self.decrypt(ciphertext) + + self.verify(mac_tag) + return pt diff --git a/lib/Crypto/SelfTest/Cipher/common.py b/lib/Crypto/SelfTest/Cipher/common.py index 727086d..420b6ff 100644 --- a/lib/Crypto/SelfTest/Cipher/common.py +++ b/lib/Crypto/SelfTest/Cipher/common.py @@ -162,7 +162,7 @@ class CipherSelfTest(unittest.TestCase): ctX = b2a_hex(cipher.encrypt(plaintext)) if self.isMode("SIV"): - ptX = b2a_hex(decipher.decrypt(ciphertext+a2b_hex(self.mac))) + ptX = b2a_hex(decipher.decrypt_and_verify(ciphertext, a2b_hex(self.mac))) else: ptX = b2a_hex(decipher.decrypt(ciphertext)) @@ -427,11 +427,7 @@ class AEADTests(unittest.TestCase): # Decrypt and verify that MAC is accepted decipher = self.module.new(self.key, self.mode, self.iv) decipher.update(ad_ref) - if not self.isMode("SIV"): - pt = decipher.decrypt(ct_ref) - else: - pt = decipher.decrypt(ct_ref+mac_ref) - decipher.verify(mac_ref) + pt = decipher.decrypt_and_verify(ct_ref, mac_ref) self.assertEqual(pt, pt_ref) # Verify that hexverify work @@ -456,8 +452,8 @@ class AEADTests(unittest.TestCase): wrong_mac = strxor_c(mac_ref, 255) decipher = self.module.new(self.key, self.mode, self.iv) decipher.update(ad_ref) - pt = decipher.decrypt(ct_ref) - self.assertRaises(ValueError, decipher.verify, wrong_mac) + self.assertRaises(ValueError, decipher.decrypt_and_verify, + ct_ref, wrong_mac) def zero_data(self): """Verify transition from INITIALIZED to FINISHED""" @@ -522,10 +518,12 @@ class AEADTests(unittest.TestCase): cipher.encrypt(b("PT")*40) self.assertRaises(TypeError, cipher.decrypt, b("XYZ")*40) - # Calling encrypt after decrypt raises an exception - cipher = self.module.new(self.key, self.mode, self.iv) - cipher.decrypt(b("CT")*40) - self.assertRaises(TypeError, cipher.encrypt, b("XYZ")*40) + # Calling encrypt() after decrypt() raises an exception + # (excluded for SIV, since decrypt() is not valid) + if not self.isMode("SIV"): + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.decrypt(b("CT")*40) + self.assertRaises(TypeError, cipher.encrypt, b("XYZ")*40) # Calling verify after encrypt raises an exception cipher = self.module.new(self.key, self.mode, self.iv) @@ -533,11 +531,13 @@ class AEADTests(unittest.TestCase): self.assertRaises(TypeError, cipher.verify, b("XYZ")) self.assertRaises(TypeError, cipher.hexverify, "12") - # Calling digest after decrypt raises an exception - cipher = self.module.new(self.key, self.mode, self.iv) - cipher.decrypt(b("CT")*40) - self.assertRaises(TypeError, cipher.digest) - self.assertRaises(TypeError, cipher.hexdigest) + # Calling digest() after decrypt() raises an exception + # (excluded for SIV, since decrypt() is not valid) + if not self.isMode("SIV"): + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.decrypt(b("CT")*40) + self.assertRaises(TypeError, cipher.digest) + self.assertRaises(TypeError, cipher.hexdigest) def no_late_update(self): """Verify that update cannot be called after encrypt or decrypt""" @@ -551,11 +551,33 @@ class AEADTests(unittest.TestCase): cipher.encrypt(b("PT")*40) self.assertRaises(TypeError, cipher.update, b("XYZ")) - # Calling update after decrypt raises an exception - cipher = self.module.new(self.key, self.mode, self.iv) - cipher.update(b("XX")) - cipher.decrypt(b("CT")*40) - self.assertRaises(TypeError, cipher.update, b("XYZ")) + # Calling update() after decrypt() raises an exception + # (excluded for SIV, since decrypt() is not valid) + if not self.isMode("SIV"): + cipher = self.module.new(self.key, self.mode, self.iv) + cipher.update(b("XX")) + cipher.decrypt(b("CT")*40) + self.assertRaises(TypeError, cipher.update, b("XYZ")) + + def loopback(self): + """Verify composition of encrypt_and_digest() and decrypt_and_verify() + is the identity function.""" + + self.description = "Lookback test decrypt_and_verify(encrypt_and_digest)"\ + "for %s in %s" % (self.mode_name, + self.module.__name__) + + enc_cipher = self.module.new(self.key, self.mode, self.iv) + dec_cipher = self.module.new(self.key, self.mode, self.iv) + + enc_cipher.update(b("XXX")) + dec_cipher.update(b("XXX")) + + plaintext = b("Reference") * 10 + ct, mac = enc_cipher.encrypt_and_digest(plaintext) + pt = dec_cipher.decrypt_and_verify(ct, mac) + + self.assertEqual(plaintext, pt) def runTest(self): self.right_mac_test() @@ -564,6 +586,7 @@ class AEADTests(unittest.TestCase): self.multiple_updates() self.no_mix_encrypt_decrypt() self.no_late_update() + self.loopback() def shortDescription(self): return self.description @@ -585,7 +608,7 @@ class RoundtripTest(unittest.TestCase): for mode in (self.module.MODE_ECB, self.module.MODE_CBC, self.module.MODE_CFB, self.module.MODE_OFB, self.module.MODE_OPENPGP): encryption_cipher = self.module.new(a2b_hex(self.key), mode, self.iv) ciphertext = encryption_cipher.encrypt(self.plaintext) - + if mode != self.module.MODE_OPENPGP: decryption_cipher = self.module.new(a2b_hex(self.key), mode, self.iv) else: |