summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLegrandin <helderijs@gmail.com>2013-05-22 22:18:35 +0200
committerDwayne Litzenberger <dlitz@dlitz.net>2013-10-20 13:30:21 -0700
commit199a9741a1849066d070b114333fcf90bc73c55a (patch)
treec2330517d32c7fcdf654605a079e6bb4c0854ad0
parent8bdbdb8168511018d44ef014ae21da619ae73c24 (diff)
downloadpycrypto-199a9741a1849066d070b114333fcf90bc73c55a.tar.gz
Add support for SIV (Synthetic IV) mode
This patch add supports for SIV, an AEAD block cipher mode defined in RFC5297. SIV is only valid for AES. The PRF of SIV (S2V) is factored out in the Protocol.KDF module. See the following example to get a feeling of the API (slightly different than other AEAD mode, during decryption). Encryption (Python 2): >>> from Crypto.Cipher import AES >>> key = b'0'*32 >>> siv = AES.new(key, AES.MODE_SIV) >>> ct = siv.encrypt(b'Message') >>> mac = siv.digest() Decryption (Python 2): >>> from Crypto.Cipher import AES, MacMismatchError >>> key = b'0'*32 >>> siv = AES.new(key, AES.MODE_SIV) >>> pt = siv.decrypt(ct + mac) >>> try: >>> siv.verify(mac) >>> print "Plaintext", pt >>> except MacMismatchError: >>> print "Error" This change also fixes the description/design of AEAD API. With SIV (RFC5297), decryption can only start when the MAC is known. The original AEAD API did not support that. For SIV the MAC is now exceptionally passed together with the ciphertext to the decrypt() method. [dlitz@dlitz.net: Included changes from the following commits from the author's pull request:] - [9c13f9c] Rename 'IV' parameter to 'nonce' for AEAD modes. - [d7727fb] Fix description/design of AEAD API. - [fb62fae] ApiUsageError becomes TypeError [whitespace] - [4ec64d8] Removed last references to ApiUsageError [whitespace] - [ee46922] Removed most 'import *' statements - [ca460a7] Made blockalgo.py more PEP-8 compliant; The second parameter of the _GHASH constructor is now the length of the block (block_size) and not the full module. [dlitz@dlitz.net: A conflict that was not resolved in the previous commit was originally resolved here. Moved the resolution to the previous commit.] [dlitz@dlitz.net: Replaced MacMismatchError with ValueError] [dlitz@dlitz.net: Replaced ApiUsageError with TypeError] [dlitz@dlitz.net: Whitespace fixed with "git rebase --whitespace=fix"]
-rw-r--r--Doc/AEAD_API.txt73
-rw-r--r--lib/Crypto/Cipher/AES.py6
-rw-r--r--lib/Crypto/Cipher/blockalgo.py168
-rw-r--r--lib/Crypto/Protocol/KDF.py90
-rw-r--r--lib/Crypto/SelfTest/Cipher/common.py74
-rw-r--r--lib/Crypto/SelfTest/Cipher/test_AES.py36
-rw-r--r--lib/Crypto/SelfTest/Protocol/test_KDF.py60
7 files changed, 444 insertions, 63 deletions
diff --git a/Doc/AEAD_API.txt b/Doc/AEAD_API.txt
index 0dcc147..8a4c363 100644
--- a/Doc/AEAD_API.txt
+++ b/Doc/AEAD_API.txt
@@ -17,9 +17,13 @@ packet header.
In this file, that is called "associated data" (AD) even though terminology
may vary from mode to mode.
+The term "MAC" is used to indicate the authentication tag generated as part
+of the encryption process. The MAC is consumed by the receiver to tell
+whether the message has been tampered with.
+
=== Goals of the AEAD API
-1. Any piece of data (AD, ciphertext, plaintext) is passed only once.
+1. Any piece of data (AD, ciphertext, plaintext, MAC) is passed only once.
2. It is possible to encrypt/decrypt huge files withe little memory cost.
To this end, authentication, encryption, and decryption are always
incremental. That is, the caller may split data in any way, and the end
@@ -116,7 +120,7 @@ In addition to the existing Cipher methods new(), encrypt() and decrypt(),
- hexdigest() as digest(), but the output is ASCII hexadecimal.
- verify() to evaluate if the binary MAC is correct. It can only be
- called at the end of decryption.
+ called at the end of decryption. It takes the MAC as input.
- hexverify() as verify(), but the input is ASCII hexadecimal.
@@ -128,13 +132,13 @@ IV/nonce to never repeat, this method may be misused and lead to security
holes.
Since MAC validation (decryption mode) is subject to timing attacks,
-the (hex)verify() method accepts as input the expected MAC and raises a
-ValueError exception if it does not match.
-Internally, the method can perform the validation using time-independent code.
+the (hex)verify() method internally performs comparison of received
+and computed MACs using time-independent code.
+A ValueError exception is issued if the MAC does not match.
=== Padding
-The proposed API only supports AEAD modes than do not require padding
+The proposed API only supports AEAD modes that do not require padding
of the plaintext. Adding support for that would make the API more complex
(for instance, an external "finished" flag to pass encrypt()).
@@ -153,8 +157,8 @@ it such results across several encryptions/decryptions.
The proposed API supports single-pass, online AEAD modes.
A "single-pass" mode processes each block of AD or data only once.
-Compare that to "2-pass" modes, where each block of data (not AD) twice:
-once for authentication and one for enciphering.
+Compare that to "2-pass" modes, where each block of data (not AD)
+is processed twice: once for authentication and once for enciphering.
An "online" mode does not need to know the size of the message before
encryption starts. As a consequence:
@@ -169,12 +173,27 @@ are in widespread use.
That is still OK, provided that encryption can start *before* authentication is
completed. If that's the case (e.g. GCM, EAX) encrypt()/decrypt() will also
take care of completing the authentication over the plaintext.
-If that's NOT the case (e.g. SIV), encrypt()/decrypt() can only be called once.
A similar problem arises with non-online modes (e.g. CCM): they have to wait to see
the end of the plaintext before encryption can start. The only way to achieve
that is to only allow encrypt()/decrypt() to be called once.
+To summarize:
+
+Single-pass and online:
+ Fully supported by the API. The mode is most likely patented.
+
+Single-pass and not-online:
+ encrypt()/decrypt() can only be called once. The mode is most likely patented.
+
+2-pass and online:
+ Fully supported by the API, but only if encryption or decryption can start
+ *before* authentication is finished.
+
+2-pass and not-online:
+ Fully supported by the API, but only if encryption or decryption can start
+ *before* authentication is finished. encrypt()/decrypt() can only be called once.
+
=== Associated Data
Depending on the mode, the associated data AD can be defined as:
@@ -199,6 +218,13 @@ For modes of the 2nd type (e.g. SIV), the API assumes that each call
to update() ingests one full item of the vector. The two examples above
are now different.
+=== MAC
+
+During encryption, the MAC is computed and returned by digest().
+
+During decryption, the received MAC is passed as a parameter to verify()
+at the very end.
+
=== CCM mode
Number of keys required: 1
Compatible with ciphers: AES (may be used with others if block length
@@ -206,6 +232,8 @@ Compatible with ciphers: AES (may be used with others if block length
Counter/IV/nonce: 1 nonce required (length 8..13 bytes)
Single-pass: no
Online: no
+Encryption can start before
+authentication is complete: yes
Term for AD: "associate data" (NIST) or "additional
authenticated data" (RFC)
Term for MAC: part of ciphertext (NIST) or
@@ -228,40 +256,49 @@ Compatible with ciphers: AES (may be used with others if block length
Counter/IV/nonce: 1 IV required (any length)
Single-pass: no
Online: yes
+Encryption can start before
+authentication is complete: yes
Term for AD: "additional authenticated data"
Term for MAC: "authentication tag" or "tag"
Padding required: no
Pre-processing of AD: possible
-Encryption can start before authentication, so encrypt()/decrypt() can always
-be called multiple times.
-
=== EAX mode
Number of keys required: 1
Compatible with ciphers: any
Counter/IV/nonce: 1 IV required (any length)
Single-pass: no
Online: yes
+Encryption can start before
+authentication is complete: yes
Term for AD: "header"
Term for MAC: "tag"
Padding required: no
Pre-processing of AD: possible
-Encryption can start before authentication, so encrypt()/decrypt() can always
-be called multiple times.
-
=== SIV
Number of keys required: 2
Compatible with ciphers: AES
Counter/IV/nonce: 1 IV required (any length)
Single-pass: no
Online: no
+Encryption can start before
+authentication is complete: no
Term for AD: "associated data" (vector)
Term for MAC: "tag"
Padding required: no
Pre-processing of AD: possible
-Encryption can only start before authentication is ended, so encrypt()/decrypt()
-an only be called once. SIV is not suitable for big files or streaming.
-
AD is a vector of strings. One item in the vector is the (optional) IV.
+
+One property of SIV is that encryption or decryption can only start
+as soon as the MAC is available.
+
+That is not a problem for encryption: since encrypt() can only be called
+once, it can internally compute the MAC first, perform the encryption,
+and return the ciphertext. The MAC is cached for the subsequent call to digest().
+
+The major problem is decryption: decrypt() cannot produce any output because
+the MAC is meant to be passed to verify() only later.
+To overcome this limitation, decrypt() *exceptionally* accepts the ciphertext
+concatenated to the MAC. The MAC check is still performed with verify().
diff --git a/lib/Crypto/Cipher/AES.py b/lib/Crypto/Cipher/AES.py
index 5cbd04b..0a53f54 100644
--- a/lib/Crypto/Cipher/AES.py
+++ b/lib/Crypto/Cipher/AES.py
@@ -120,6 +120,8 @@ def new(key, *args, **kwargs):
key : byte string
The secret key to use in the symmetric cipher.
It must be 16 (*AES-128*), 24 (*AES-192*), or 32 (*AES-256*) bytes long.
+
+ Only in `MODE_SIV`, it needs to be 32, 48, or 64 bytes long.
:Keywords:
mode : a *MODE_** constant
The chaining mode to use for encryption or decryption.
@@ -138,7 +140,7 @@ def new(key, *args, **kwargs):
For all other modes, it must be 16 bytes long.
nonce : byte string
- (*Only* `MODE_CCM`, `MODE_EAX`).
+ (*Only* `MODE_CCM`, `MODE_EAX`, `MODE_SIV`).
A mandatory value that must never be reused for any other encryption.
@@ -194,6 +196,8 @@ MODE_OPENPGP = 7
MODE_CCM = 8
#: EAX Mode. See `blockalgo.MODE_EAX`.
MODE_EAX = 9
+#: Syntethic Initialization Vector (SIV). See `blockalgo.MODE_SIV`.
+MODE_SIV = 10
#: Size of a data block (in bytes)
block_size = 16
#: Size of a key (in bytes)
diff --git a/lib/Crypto/Cipher/blockalgo.py b/lib/Crypto/Cipher/blockalgo.py
index be00832..aab7873 100644
--- a/lib/Crypto/Cipher/blockalgo.py
+++ b/lib/Crypto/Cipher/blockalgo.py
@@ -34,6 +34,7 @@ from Crypto.Util.strxor import strxor
from Crypto.Util.number import long_to_bytes, bytes_to_long
import Crypto.Util.Counter
from Crypto.Hash import CMAC
+from Crypto.Protocol.KDF import S2V
#: *Electronic Code Book (ECB)*.
#: This is the simplest encryption mode. Each of the plaintext blocks
@@ -211,6 +212,51 @@ MODE_CCM = 8
#: .. __: http://csrc.nist.gov/groups/ST/toolkit/BCM/documents/proposedmodes/eax/eax-spec.pdf
MODE_EAX = 9
+#: *Synthetic Initialization Vector*. This is an Authenticated Encryption with
+#: Associated Data (`AEAD`_) mode. It provides both confidentiality and
+#: authenticity.
+#: The header of the message may be left in the clear, if needed, and it will
+#: still be subject to authentication. The decryption step tells the receiver
+#: if the message comes from a source that really knowns the secret key.
+#: Additionally, decryption detects if any part of the message - including the
+#: header - has been modified or corrupted.
+#:
+#: If the data being encrypted is completely unpredictable to an adversary
+#: (e.g. a secret key, for key wrapping purposes) a nonce is not strictly
+#: required.
+#:
+#: Otherwise, a nonce has to be provided; the nonce shall never repeat
+#: for two different messages encrypted with the same key, but it does not
+#: need to be random.
+#:
+#: Unlike other AEAD modes such as CCM, EAX or GCM, accidental reuse of a
+#: nonce is not catastrophic for the confidentiality of the message. The only
+#: effect is that an attacker can tell when the same plaintext (and same
+#: associated data) is protected with the same key.
+#:
+#: The length of the MAC is fixed to the block size of the underlying cipher.
+#: The key size is twice the length of the key of the underlying cipher.
+#:
+#: This mode is only available for AES ciphers.
+#:
+#: +--------------------+---------------+-------------------+
+#: | Cipher | SIV MAC size | SIV key length |
+#: | | (bytes) | (bytes) |
+#: +====================+===============+===================+
+#: | AES-128 | 16 | 32 |
+#: +--------------------+---------------+-------------------+
+#: | AES-192 | 16 | 48 |
+#: +--------------------+---------------+-------------------+
+#: | AES-256 | 16 | 64 |
+#: +--------------------+---------------+-------------------+
+#:
+#: See `RFC5297`_ and the `original paper`__.
+#:
+#: .. _RFC5297: https://tools.ietf.org/html/rfc5297
+#: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html
+#: .. __: http://www.cs.ucdavis.edu/~rogaway/papers/keywrap.pdf
+MODE_SIV = 10
+
def _getParameter(name, index, args, kwargs, default=None):
"""Find a parameter in tuple and dictionary arguments a function receives"""
@@ -265,10 +311,40 @@ class BlockAlgo:
self._start_PGP(factory, key, *args, **kwargs)
elif self.mode == MODE_EAX:
self._start_eax(factory, key, *args, **kwargs)
+ elif self.mode == MODE_SIV:
+ self._start_siv(factory, key, *args, **kwargs)
else:
self._cipher = factory.new(key, *args, **kwargs)
self.IV = self._cipher.IV
+ def _start_siv(self, factory, key, *args, **kwargs):
+
+ subkey_size, rem = divmod(len(key), 2)
+ if rem:
+ raise ValueError("MODE_SIV requires a key twice as long as for the underlying cipher")
+
+ # IV is optional
+ self.nonce = _getParameter('nonce', 1, args, kwargs)
+
+ self._prf = S2V(key[:subkey_size], ciphermod=factory)
+ self._subkey_ctr = key[subkey_size:]
+ self._mac_len = factory.block_size
+
+ # Allowed transitions after initialization
+ self._next = [self.update, self.encrypt, self.decrypt,
+ self.digest, self.verify]
+
+ def _siv_ctr_cipher(self, tag):
+ """Create a new CTR cipher from the MAC in SIV mode"""
+
+ tag_int = bytes_to_long(tag)
+ init_counter = tag_int ^ (tag_int & 0x8000000080000000L)
+ ctr = Counter.new(self._factory.block_size * 8,
+ initial_value=init_counter,
+ allow_wraparound=True)
+
+ return self._factory.new(self._subkey_ctr, MODE_CTR, counter=ctr)
+
def _start_eax(self, factory, key, *args, **kwargs):
self.nonce = _getParameter('nonce', 1, args, kwargs)
@@ -409,9 +485,10 @@ class BlockAlgo:
def update(self, assoc_data):
"""Protect associated data
- When using an AEAD mode like CCM or EAX, and if there is any associated data,
- the caller has to invoke this function one or more times, before
- using ``decrypt`` or ``encrypt``.
+ When using an AEAD mode like CCM, EAX or SIV, and
+ if there is any associated data, the caller has to invoke
+ this function one or more times, before using
+ ``decrypt`` or ``encrypt``.
By *associated data* it is meant any data (e.g. packet headers) that
will not be encrypted and will be transmitted in the clear.
@@ -429,7 +506,7 @@ class BlockAlgo:
A piece of associated data. There are no restrictions on its size.
"""
- if self.mode in (MODE_CCM, MODE_EAX):
+ if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
if self.update not in self._next:
raise TypeError("update() can only be called immediately after initialization")
self._next = [self.update, self.encrypt, self.decrypt,
@@ -464,19 +541,30 @@ class BlockAlgo:
self._omac[1].update(assoc_data)
return
+ if self.mode == MODE_SIV:
+ self._prf.update(assoc_data)
+ return
+
raise ValueError("update() not supported by this mode of operation")
def encrypt(self, plaintext):
"""Encrypt data with the key and the parameters set at initialization.
- The cipher object is stateful; encryption of a long block
- of data can be broken up in two or more calls to `encrypt()`.
+ A cipher object is stateful: once you have encrypted a message
+ you cannot encrypt (or decrypt) another message using the same
+ object.
+
+ For `MODE_SIV` (always) and `MODE_CCM` (when ``msg_len`` was not
+ passed at initialization), this method can be called only **once**.
+
+ For all other modes, the data to encrypt can be broken up in two or
+ more pieces and `encrypt` can be called multiple times.
That is, the statement:
>>> c.encrypt(a) + c.encrypt(b)
- is always equivalent to:
+ is equivalent to:
>>> c.encrypt(a+b)
@@ -491,7 +579,7 @@ class BlockAlgo:
- For `MODE_CFB`, *plaintext* length (in bytes) must be a multiple
of *segment_size*/8.
- - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM` and `MODE_EAX`
+ - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM`, `MODE_EAX` and `MODE_SIV`
*plaintext* can be of any length.
- For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*,
@@ -526,7 +614,7 @@ class BlockAlgo:
self._done_first_block = True
return res
- if self.mode in (MODE_CCM, MODE_EAX):
+ if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
if self.encrypt not in self._next:
raise TypeError("encrypt() can only be called after initialization or an update()")
self._next = [self.encrypt, self.digest]
@@ -543,6 +631,15 @@ class BlockAlgo:
self._update(plaintext)
+ if self.mode == MODE_SIV:
+ self._next = [self.digest]
+
+ if self.nonce:
+ self._prf.update(self.nonce)
+
+ self._prf.update(plaintext)
+ self._cipher = self._siv_ctr_cipher(self._prf.derive())
+
ct = self._cipher.encrypt(plaintext)
if self.mode == MODE_EAX:
@@ -553,14 +650,21 @@ class BlockAlgo:
def decrypt(self, ciphertext):
"""Decrypt data with the key and the parameters set at initialization.
- The cipher object is stateful; decryption of a long block
- of data can be broken up in two or more calls to `decrypt()`.
+ A cipher object is stateful: once you have decrypted a message
+ you cannot decrypt (or encrypt) another message with the same
+ object.
+
+ For `MODE_SIV` (always) and `MODE_CCM` (when ``msg_len`` was not
+ passed at initialization), this method can be called only **once**.
+
+ For all other modes, the data to decrypt can be broken up in two or
+ more pieces and `decrypt` can be called multiple times.
That is, the statement:
>>> c.decrypt(a) + c.decrypt(b)
- is always equivalent to:
+ is equivalent to:
>>> c.decrypt(a+b)
@@ -575,16 +679,20 @@ class BlockAlgo:
- For `MODE_CFB`, *ciphertext* length (in bytes) must be a multiple
of *segment_size*/8.
- - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM`, and `MODE_EAX`, *ciphertext* can be
- of any length.
+ - For `MODE_OFB`, `MODE_CTR`, `MODE_CCM` and `MODE_EAX`
+ *ciphertext* can be of any length.
- For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*,
unless it is the last chunk of the message.
+ - For `MODE_SIV`, *ciphertext* can be of any length, but it must also
+ include the MAC (concatenated at the end).
+
:Parameters:
ciphertext : byte string
- The piece of data to decrypt.
- :Return: the decrypted data (byte string, as long as *ciphertext*).
+ The piece of data to decrypt (plus the MAC, for `MODE_SIV` only).
+
+ :Return: the decrypted data (byte string).
"""
if self.mode == MODE_OPENPGP:
@@ -603,7 +711,7 @@ class BlockAlgo:
res = self._cipher.decrypt(ciphertext)
return res
- if self.mode in (MODE_CCM, MODE_EAX):
+ if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
if self.decrypt not in self._next:
raise TypeError("decrypt() can only be called after initialization or an update()")
@@ -622,11 +730,27 @@ class BlockAlgo:
if self.mode == MODE_EAX:
self._omac[2].update(ciphertext)
+ if self.mode == MODE_SIV:
+ self._next = [self.verify]
+
+ # Take the MAC and start the cipher for decryption
+ self._mac = ciphertext[-self._factory.block_size:]
+ self._cipher = self._siv_ctr_cipher(self._mac)
+
+ # Remove MAC from ciphertext
+ ciphertext = ciphertext[:-self._factory.block_size]
+
pt = self._cipher.decrypt(ciphertext)
if self.mode == MODE_CCM:
self._update(pt)
+ if self.mode == MODE_SIV:
+ if self.nonce:
+ self._prf.update(self.nonce)
+ if pt:
+ self._prf.update(pt)
+
return pt
def digest(self):
@@ -641,7 +765,7 @@ class BlockAlgo:
:Return: the MAC, as a byte string.
"""
- if self.mode in (MODE_CCM, MODE_EAX):
+ if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
if self.digest not in self._next:
raise TypeError("digest() cannot be called when decrypting or validating a message")
@@ -662,6 +786,9 @@ class BlockAlgo:
for i in xrange(3):
tag = strxor(tag, self._omac[i].digest())
+ if self.mode == MODE_SIV:
+ tag = self._prf.derive()
+
return tag
raise TypeError("digest() not supported by this mode of operation")
@@ -693,7 +820,7 @@ class BlockAlgo:
or the key is incorrect.
"""
- if self.mode in (MODE_CCM, MODE_EAX):
+ if self.mode in (MODE_CCM, MODE_EAX, MODE_SIV):
if self.verify not in self._next:
raise TypeError("verify() cannot be called when encrypting a message")
self._next = [self.verify]
@@ -714,6 +841,9 @@ class BlockAlgo:
u = strxor(u, self._omac[i].digest())
u = u[:self._mac_len]
+ if self.mode == MODE_SIV:
+ u = self._prf.derive()
+
res = 0
# Constant-time comparison
for x,y in zip(u, mac_tag):
diff --git a/lib/Crypto/Protocol/KDF.py b/lib/Crypto/Protocol/KDF.py
index b13562a..0a92c15 100644
--- a/lib/Crypto/Protocol/KDF.py
+++ b/lib/Crypto/Protocol/KDF.py
@@ -38,16 +38,21 @@ __revision__ = "$Id$"
import math
import struct
+import sys
+if sys.version_info[0] == 2 and sys.version_info[1] == 1:
+ from Crypto.Util.py21compat import *
from Crypto.Util.py3compat import *
-from Crypto.Hash import SHA1, HMAC
+
+from Crypto.Hash import SHA1, HMAC, CMAC
from Crypto.Util.strxor import strxor
+from Crypto.Util.number import long_to_bytes, bytes_to_long
def PBKDF1(password, salt, dkLen, count=1000, hashAlgo=None):
"""Derive one key from a password (or passphrase).
This function performs key derivation according an old version of
the PKCS#5 standard (v1.5).
-
+
This algorithm is called ``PBKDF1``. Even though it is still described
in the latest version of the PKCS#5 standard (version 2, or RFC2898),
newer applications should use the more secure and versatile `PBKDF2` instead.
@@ -121,3 +126,84 @@ def PBKDF2(password, salt, dkLen=16, count=1000, prf=None):
i = i + 1
return key[:dkLen]
+class S2V(object):
+ """String-to-vector PRF as defined in `RFC5297`_.
+
+ This class implements a pseudorandom function family
+ based on CMAC that takes as input a vector of strings.
+
+ .. _RFC5297: http://tools.ietf.org/html/rfc5297
+ """
+
+ def __init__(self, key, ciphermod):
+ """Initialize the S2V PRF.
+
+ :Parameters:
+ key : byte string
+ A secret that can be used as key for CMACs
+ based on ciphers from ``ciphermod``.
+ ciphermod : module
+ A block cipher module from `Crypto.Cipher`.
+ """
+
+ self._key = key
+ self._ciphermod = ciphermod
+ self._last_string = self._cache = bchr(0)*ciphermod.block_size
+ self._n_updates = ciphermod.block_size*8-1
+
+ def new(key, ciphermod):
+ """Create a new S2V PRF.
+
+ :Parameters:
+ key : byte string
+ A secret that can be used as key for CMACs
+ based on ciphers from ``ciphermod``.
+ ciphermod : module
+ A block cipher module from `Crypto.Cipher`.
+ """
+ return S2V(key, ciphermod)
+ new = staticmethod(new)
+
+ def _double(self, bs):
+ doubled = bytes_to_long(bs)<<1
+ if bord(bs[0]) & 0x80:
+ doubled ^= 0x87
+ return long_to_bytes(doubled, len(bs))[-len(bs):]
+
+ def update(self, item):
+ """Pass the next component of the vector.
+
+ The maximum number of components you can pass is equal to the block
+ length of the cipher (in bits) minus 1.
+
+ :Parameters:
+ item : byte string
+ The next component of the vector.
+ :Raise TypeError: when the limit on the number of components has been reached.
+ :Raise ValueError: when the component is empty
+ """
+
+ if not item:
+ raise ValueError("A component cannot be empty")
+
+ if self._n_updates==0:
+ raise TypeError("Too many components passed to S2V")
+ self._n_updates -= 1
+
+ mac = CMAC.new(self._key, msg=self._last_string, ciphermod=self._ciphermod)
+ self._cache = strxor(self._double(self._cache), mac.digest())
+ self._last_string = item
+
+ def derive(self):
+ """"Derive a secret from the vector of components.
+
+ :Return: a byte string, as long as the block length of the cipher.
+ """
+
+ if len(self._last_string)>=16:
+ final = self._last_string[:-16] + strxor(self._last_string[-16:], self._cache)
+ else:
+ padded = (self._last_string + bchr(0x80)+ bchr(0)*15)[:16]
+ final = strxor(padded, self._double(self._cache))
+ mac = CMAC.new(self._key, msg=final, ciphermod=self._ciphermod)
+ return mac.digest()
diff --git a/lib/Crypto/SelfTest/Cipher/common.py b/lib/Crypto/SelfTest/Cipher/common.py
index 603ab54..ed612b4 100644
--- a/lib/Crypto/SelfTest/Cipher/common.py
+++ b/lib/Crypto/SelfTest/Cipher/common.py
@@ -29,8 +29,12 @@ from __future__ import nested_scopes
__revision__ = "$Id$"
import sys
+if sys.version_info[0] == 2 and sys.version_info[1] == 1:
+ from Crypto.Util.py21compat import *
+
import unittest
from binascii import a2b_hex, b2a_hex, hexlify
+
from Crypto.Util.py3compat import *
from Crypto.Util.strxor import strxor_c
@@ -70,8 +74,6 @@ class CipherSelfTest(unittest.TestCase):
self.ciphertext = b(_extract(params, 'ciphertext'))
self.module_name = _extract(params, 'module_name', None)
self.assoc_data = _extract(params, 'assoc_data', None)
- if self.assoc_data:
- self.assoc_data = b(self.assoc_data)
self.mac = _extract(params, 'mac', None)
if self.assoc_data:
self.mac = b(self.mac)
@@ -130,12 +132,17 @@ class CipherSelfTest(unittest.TestCase):
else:
return self.module.new(a2b_hex(self.key), self.mode, a2b_hex(self.iv), **params)
+ def isMode(self, name):
+ if not hasattr(self.module, "MODE_"+name):
+ return False
+ return self.mode == getattr(self.module, "MODE_"+name)
+
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
- assoc_data = None
+ assoc_data = []
if self.assoc_data:
- assoc_data = a2b_hex(self.assoc_data)
+ assoc_data = [ a2b_hex(b(x)) for x in self.assoc_data]
ct = None
pt = None
@@ -149,19 +156,22 @@ class CipherSelfTest(unittest.TestCase):
decipher = self._new(1)
# Only AEAD modes
- if self.assoc_data:
- cipher.update(assoc_data)
- decipher.update(assoc_data)
+ for comp in assoc_data:
+ cipher.update(comp)
+ decipher.update(comp)
ctX = b2a_hex(cipher.encrypt(plaintext))
- ptX = b2a_hex(decipher.decrypt(ciphertext))
+ if self.isMode("SIV"):
+ ptX = b2a_hex(decipher.decrypt(ciphertext+a2b_hex(self.mac)))
+ else:
+ ptX = b2a_hex(decipher.decrypt(ciphertext))
if ct:
self.assertEqual(ct, ctX)
self.assertEqual(pt, ptX)
ct, pt = ctX, ptX
- if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
+ if self.isMode("OPENPGP"):
# In PGP mode, data returned by the first encrypt()
# is prefixed with the encrypted IV.
# Here we check it and then remove it from the ciphertexts.
@@ -387,10 +397,18 @@ class AEADTests(unittest.TestCase):
self.module = module
self.mode_name = mode_name
self.mode = getattr(module, mode_name)
- self.key = b('\xFF')*key_size
+ if not self.isMode("SIV"):
+ self.key = b('\xFF')*key_size
+ else:
+ self.key = b('\xFF')*key_size*2
self.iv = b('\x00')*10
self.description = "AEAD Test"
+ def isMode(self, name):
+ if not hasattr(self.module, "MODE_"+name):
+ return False
+ return self.mode == getattr(self.module, "MODE_"+name)
+
def right_mac_test(self):
"""Positive tests for MAC"""
@@ -409,7 +427,10 @@ class AEADTests(unittest.TestCase):
# Decrypt and verify that MAC is accepted
decipher = self.module.new(self.key, self.mode, self.iv)
decipher.update(ad_ref)
- pt = decipher.decrypt(ct_ref)
+ if not self.isMode("SIV"):
+ pt = decipher.decrypt(ct_ref)
+ else:
+ pt = decipher.decrypt(ct_ref+mac_ref)
decipher.verify(mac_ref)
self.assertEqual(pt, pt_ref)
@@ -452,6 +473,15 @@ class AEADTests(unittest.TestCase):
self.description = "Test for multiple updates in %s of %s" % \
(self.mode_name, self.module.__name__)
+ # In all modes other than SIV, the associated data is a single
+ # component that can be arbitrarilly split and submitted to update().
+ #
+ # In SIV, associated data is instead organized in a vector or multiple
+ # components. Each component is passed to update() as a whole.
+ # This test is therefore not meaningful to SIV.
+ if self.isMode("SIV"):
+ return
+
ad = b("").join([bchr(x) for x in xrange(0,128)])
mac1, mac2, mac3 = (None,)*3
@@ -489,23 +519,23 @@ class AEADTests(unittest.TestCase):
# Calling decrypt after encrypt raises an exception
cipher = self.module.new(self.key, self.mode, self.iv)
- cipher.encrypt(b("PT"))
- self.assertRaises(TypeError, cipher.decrypt, b("XYZ"))
+ cipher.encrypt(b("PT")*40)
+ self.assertRaises(TypeError, cipher.decrypt, b("XYZ")*40)
# Calling encrypt after decrypt raises an exception
cipher = self.module.new(self.key, self.mode, self.iv)
- cipher.decrypt(b("CT"))
- self.assertRaises(TypeError, cipher.encrypt, b("XYZ"))
+ cipher.decrypt(b("CT")*40)
+ self.assertRaises(TypeError, cipher.encrypt, b("XYZ")*40)
# Calling verify after encrypt raises an exception
cipher = self.module.new(self.key, self.mode, self.iv)
- cipher.encrypt(b("PT"))
+ cipher.encrypt(b("PT")*40)
self.assertRaises(TypeError, cipher.verify, b("XYZ"))
self.assertRaises(TypeError, cipher.hexverify, "12")
# Calling digest after decrypt raises an exception
cipher = self.module.new(self.key, self.mode, self.iv)
- cipher.decrypt(b("CT"))
+ cipher.decrypt(b("CT")*40)
self.assertRaises(TypeError, cipher.digest)
self.assertRaises(TypeError, cipher.hexdigest)
@@ -518,13 +548,13 @@ class AEADTests(unittest.TestCase):
# Calling update after encrypt raises an exception
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.update(b("XX"))
- cipher.encrypt(b("PT"))
+ cipher.encrypt(b("PT")*40)
self.assertRaises(TypeError, cipher.update, b("XYZ"))
# Calling update after decrypt raises an exception
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.update(b("XX"))
- cipher.decrypt(b("CT"))
+ cipher.decrypt(b("CT")*40)
self.assertRaises(TypeError, cipher.update, b("XYZ"))
def runTest(self):
@@ -658,10 +688,10 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):
extra_tests_added = 1
# Extract associated data and MAC for AEAD modes
- if p_mode in ('CCM', 'EAX'):
+ if p_mode in ('CCM', 'EAX', 'SIV'):
assoc_data, params['plaintext'] = params['plaintext'].split('|')
assoc_data2, params['ciphertext'], params['mac'] = params['ciphertext'].split('|')
- params['assoc_data'] = assoc_data
+ params['assoc_data'] = assoc_data.split("-")
params['mac_len'] = len(params['mac'])>>1
# Add the current test to the test suite
@@ -687,7 +717,7 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):
CCMMACLengthTest(module),
CCMSplitEncryptionTest(module),
]
- for aead_mode in ("MODE_CCM","MODE_EAX"):
+ for aead_mode in ("MODE_CCM","MODE_EAX", "MODE_SIV"):
if hasattr(module, aead_mode):
key_sizes = []
try:
diff --git a/lib/Crypto/SelfTest/Cipher/test_AES.py b/lib/Crypto/SelfTest/Cipher/test_AES.py
index 53d60c0..89243c0 100644
--- a/lib/Crypto/SelfTest/Cipher/test_AES.py
+++ b/lib/Crypto/SelfTest/Cipher/test_AES.py
@@ -1759,6 +1759,42 @@ test_data = [
'EAX spec Appendix G',
dict(mode='EAX', nonce='22E7ADD93CFC6393C57EC0B3C17D6B44')
),
+
+ # Test vectors for SIV taken from RFC5297
+ # This is a list of tuples with 5 items:
+ #
+ # 1. Header + '|' + plaintext
+ # 2. Header + '|' + ciphertext + '|' + MAC
+ # 3. AES-128 key
+ # 4. Description
+ # 5. Dictionary of parameters to be passed to AES.new().
+ # It must include the nonce.
+ #
+ # A "Header" is a dash ('-') separated sequece of components.
+ #
+ ( '101112131415161718191a1b1c1d1e1f2021222324252627|112233445566778899aabbccddee',
+ '101112131415161718191a1b1c1d1e1f2021222324252627|40c02b9690c4dc04daef7f6afe5c|' +
+ '85632d07c6e8f37f950acd320a2ecc93',
+ 'fffefdfcfbfaf9f8f7f6f5f4f3f2f1f0f0f1f2f3f4f5f6f7f8f9fafbfcfdfeff',
+ 'RFC5297 A.1',
+ dict(mode='SIV', nonce=None)
+ ),
+
+ ( '00112233445566778899aabbccddeeffdeaddadadeaddadaffeeddccbbaa9988' +
+ '7766554433221100-102030405060708090a0|' +
+ '7468697320697320736f6d6520706c61696e7465787420746f20656e63727970' +
+ '74207573696e67205349562d414553',
+
+ '00112233445566778899aabbccddeeffdeaddadadeaddadaffeeddccbbaa9988' +
+ '7766554433221100-102030405060708090a0|' +
+ 'cb900f2fddbe404326601965c889bf17dba77ceb094fa663b7a3f748ba8af829' +
+ 'ea64ad544a272e9c485b62a3fd5c0d|' +
+ '7bdb6e3b432667eb06f4d14bff2fbd0f',
+
+ '7f7e7d7c7b7a79787776757473727170404142434445464748494a4b4c4d4e4f',
+ 'RFC5297 A.2',
+ dict(mode='SIV', nonce='09f911029d74e35bd84156c5635688c0')
+ ),
]
def get_tests(config={}):
diff --git a/lib/Crypto/SelfTest/Protocol/test_KDF.py b/lib/Crypto/SelfTest/Protocol/test_KDF.py
index 75bb8f2..37cfb55 100644
--- a/lib/Crypto/SelfTest/Protocol/test_KDF.py
+++ b/lib/Crypto/SelfTest/Protocol/test_KDF.py
@@ -29,8 +29,9 @@ from Crypto.Util.py3compat import *
from Crypto.SelfTest.st_common import list_test_cases
from Crypto.Hash import SHA1, HMAC
+from Crypto.Cipher import AES, DES3
-from Crypto.Protocol.KDF import PBKDF1, PBKDF2
+from Crypto.Protocol.KDF import PBKDF1, PBKDF2, S2V
def t2b(t): return unhexlify(b(t))
@@ -87,10 +88,67 @@ class PBKDF2_Tests(unittest.TestCase):
self.assertEqual(res, t2b(v[4]))
self.assertEqual(res, res2)
+class S2V_Tests(unittest.TestCase):
+
+ # Sequence of test vectors.
+ # Each test vector is made up by:
+ # Item #0: a tuple of strings
+ # Item #1: an AES key
+ # Item #2: the result
+ # Item #3: the cipher module S2V is based on
+ # Everything is hex encoded
+ _testData = [
+
+ # RFC5297, A.1
+ (
+ ( '101112131415161718191a1b1c1d1e1f2021222324252627',
+ '112233445566778899aabbccddee' ),
+ 'fffefdfcfbfaf9f8f7f6f5f4f3f2f1f0',
+ '85632d07c6e8f37f950acd320a2ecc93',
+ AES
+ ),
+
+ # RFC5297, A.2
+ (
+ ( '00112233445566778899aabbccddeeffdeaddadadeaddadaffeeddcc'+
+ 'bbaa99887766554433221100',
+ '102030405060708090a0',
+ '09f911029d74e35bd84156c5635688c0',
+ '7468697320697320736f6d6520706c61'+
+ '696e7465787420746f20656e63727970'+
+ '74207573696e67205349562d414553'),
+ '7f7e7d7c7b7a79787776757473727170',
+ '7bdb6e3b432667eb06f4d14bff2fbd0f',
+ AES
+ ),
+
+ ]
+
+ def test1(self):
+ """Verify correctness of test vector"""
+ for tv in self._testData:
+ s2v = S2V.new(t2b(tv[1]), tv[3])
+ for s in tv[0]:
+ s2v.update(t2b(s))
+ result = s2v.derive()
+ self.assertEqual(result, t2b(tv[2]))
+
+ def test2(self):
+ """Verify that no more than 127(AES) and 63(TDES)
+ components are accepted."""
+ key = bchr(0)*16
+ for module in (AES, DES3):
+ s2v = S2V.new(key, module)
+ max_comps = module.block_size*8-1
+ for i in xrange(max_comps):
+ s2v.update(b("XX"))
+ self.assertRaises(TypeError, s2v.update, b("YY"))
+
def get_tests(config={}):
tests = []
tests += list_test_cases(PBKDF1_Tests)
tests += list_test_cases(PBKDF2_Tests)
+ tests += list_test_cases(S2V_Tests)
return tests
if __name__ == '__main__':