summaryrefslogtreecommitdiff
path: root/lib/Crypto/SelfTest
diff options
context:
space:
mode:
authorDwayne Litzenberger <dlitz@dlitz.net>2013-10-20 13:28:46 -0700
committerDwayne Litzenberger <dlitz@dlitz.net>2013-10-20 13:28:46 -0700
commitd044a478332682c253c379db87d444b056e4ab37 (patch)
treea72a64c0e89c926a23cd8ffb8400b84189ff12d5 /lib/Crypto/SelfTest
parentf9a0fc77e1c8847c1a17503e5a1b86a409b8cb2d (diff)
parent7fd528d03b5eae58eef6fd219af5d9ac9c83fa50 (diff)
downloadpycrypto-d044a478332682c253c379db87d444b056e4ab37.tar.gz
Merge tag 'v2.6.1' (fix CVE-2013-1445)
This is the PyCrypto 2.6.1 release. Dwayne Litzenberger (4): Random: Make Crypto.Random.atfork() set last_reseed=None (CVE-2013-1445) Fortuna: Add comments for reseed_interval and min_pool_size to FortunaAccumulator Update the ChangeLog Release v2.6.1
Diffstat (limited to 'lib/Crypto/SelfTest')
-rw-r--r--lib/Crypto/SelfTest/Random/__init__.py1
-rw-r--r--lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py171
2 files changed, 172 insertions, 0 deletions
diff --git a/lib/Crypto/SelfTest/Random/__init__.py b/lib/Crypto/SelfTest/Random/__init__.py
index 48d84ff..f972bf0 100644
--- a/lib/Crypto/SelfTest/Random/__init__.py
+++ b/lib/Crypto/SelfTest/Random/__init__.py
@@ -32,6 +32,7 @@ def get_tests(config={}):
from Crypto.SelfTest.Random import OSRNG; tests += OSRNG.get_tests(config=config)
from Crypto.SelfTest.Random import test_random; tests += test_random.get_tests(config=config)
from Crypto.SelfTest.Random import test_rpoolcompat; tests += test_rpoolcompat.get_tests(config=config)
+ from Crypto.SelfTest.Random import test__UserFriendlyRNG; tests += test__UserFriendlyRNG.get_tests(config=config)
return tests
if __name__ == '__main__':
diff --git a/lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py b/lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py
new file mode 100644
index 0000000..771a663
--- /dev/null
+++ b/lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py
@@ -0,0 +1,171 @@
+# -*- coding: utf-8 -*-
+# Self-tests for the user-friendly Crypto.Random interface
+#
+# Written in 2013 by Dwayne C. Litzenberger <dlitz@dlitz.net>
+#
+# ===================================================================
+# The contents of this file are dedicated to the public domain. To
+# the extent that dedication to the public domain is not available,
+# everyone is granted a worldwide, perpetual, royalty-free,
+# non-exclusive license to exercise all rights associated with the
+# contents of this file for any purpose whatsoever.
+# No rights are reserved.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+# ===================================================================
+
+"""Self-test suite for generic Crypto.Random stuff """
+
+from __future__ import nested_scopes
+
+__revision__ = "$Id$"
+
+import binascii
+import pprint
+import unittest
+import os
+import time
+import sys
+if sys.version_info[0] == 2 and sys.version_info[1] == 1:
+ from Crypto.Util.py21compat import *
+from Crypto.Util.py3compat import *
+
+try:
+ import multiprocessing
+except ImportError:
+ multiprocessing = None
+
+import Crypto.Random._UserFriendlyRNG
+import Crypto.Random.random
+
+class RNGForkTest(unittest.TestCase):
+
+ def _get_reseed_count(self):
+ """
+ Get `FortunaAccumulator.reseed_count`, the global count of the
+ number of times that the PRNG has been reseeded.
+ """
+ rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton()
+ rng_singleton._lock.acquire()
+ try:
+ return rng_singleton._fa.reseed_count
+ finally:
+ rng_singleton._lock.release()
+
+ def runTest(self):
+ # Regression test for CVE-2013-1445. We had a bug where, under the
+ # right conditions, two processes might see the same random sequence.
+
+ if sys.platform.startswith('win'): # windows can't fork
+ assert not hasattr(os, 'fork') # ... right?
+ return
+
+ # Wait 150 ms so that we don't trigger the rate-limit prematurely.
+ time.sleep(0.15)
+
+ reseed_count_before = self._get_reseed_count()
+
+ # One or both of these calls together should trigger a reseed right here.
+ Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
+ Crypto.Random.get_random_bytes(1)
+
+ reseed_count_after = self._get_reseed_count()
+ self.assertNotEqual(reseed_count_before, reseed_count_after) # sanity check: test should reseed parent before forking
+
+ rfiles = []
+ for i in range(10):
+ rfd, wfd = os.pipe()
+ if os.fork() == 0:
+ # child
+ os.close(rfd)
+ f = os.fdopen(wfd, "wb")
+
+ Crypto.Random.atfork()
+
+ data = Crypto.Random.get_random_bytes(16)
+
+ f.write(data)
+ f.close()
+ os._exit(0)
+ # parent
+ os.close(wfd)
+ rfiles.append(os.fdopen(rfd, "rb"))
+
+ results = []
+ results_dict = {}
+ for f in rfiles:
+ data = binascii.hexlify(f.read())
+ results.append(data)
+ results_dict[data] = 1
+ f.close()
+
+ if len(results) != len(results_dict.keys()):
+ raise AssertionError("RNG output duplicated across fork():\n%s" %
+ (pprint.pformat(results)))
+
+
+# For RNGMultiprocessingForkTest
+def _task_main(q):
+ a = Crypto.Random.get_random_bytes(16)
+ time.sleep(0.1) # wait 100 ms
+ b = Crypto.Random.get_random_bytes(16)
+ q.put(binascii.b2a_hex(a))
+ q.put(binascii.b2a_hex(b))
+ q.put(None) # Wait for acknowledgment
+
+
+class RNGMultiprocessingForkTest(unittest.TestCase):
+
+ def runTest(self):
+ # Another regression test for CVE-2013-1445. This is basically the
+ # same as RNGForkTest, but less compatible with old versions of Python,
+ # and a little easier to read.
+
+ n_procs = 5
+ manager = multiprocessing.Manager()
+ queues = [manager.Queue(1) for i in range(n_procs)]
+
+ # Reseed the pool
+ time.sleep(0.15)
+ Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
+ Crypto.Random.get_random_bytes(1)
+
+ # Start the child processes
+ pool = multiprocessing.Pool(processes=n_procs, initializer=Crypto.Random.atfork)
+ map_result = pool.map_async(_task_main, queues)
+
+ # Get the results, ensuring that no pool processes are reused.
+ aa = [queues[i].get(30) for i in range(n_procs)]
+ bb = [queues[i].get(30) for i in range(n_procs)]
+ res = list(zip(aa, bb))
+
+ # Shut down the pool
+ map_result.get(30)
+ pool.close()
+ pool.join()
+
+ # Check that the results are unique
+ if len(set(aa)) != len(aa) or len(set(res)) != len(res):
+ raise AssertionError("RNG output duplicated across fork():\n%s" %
+ (pprint.pformat(res),))
+
+
+def get_tests(config={}):
+ tests = []
+ tests += [RNGForkTest()]
+ if multiprocessing is not None:
+ tests += [RNGMultiprocessingForkTest()]
+ return tests
+
+if __name__ == '__main__':
+ suite = lambda: unittest.TestSuite(get_tests())
+ unittest.main(defaultTest='suite')
+
+# vim:set ts=4 sw=4 sts=4 expandtab: