summaryrefslogtreecommitdiff
path: root/passlib/tests/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'passlib/tests/utils.py')
-rw-r--r--passlib/tests/utils.py105
1 files changed, 66 insertions, 39 deletions
diff --git a/passlib/tests/utils.py b/passlib/tests/utils.py
index 353bf39..21e954a 100644
--- a/passlib/tests/utils.py
+++ b/passlib/tests/utils.py
@@ -154,6 +154,12 @@ def get_alt_backend(*args, **kwds):
return backend
return None
+def unwrap_handler(handler):
+ """return original handler, removing any wrapper objects"""
+ while hasattr(handler, "wrapped"):
+ handler = handler.wrapped
+ return handler
+
#=============================================================================
# misc helpers
#=============================================================================
@@ -368,7 +374,7 @@ class TestCase(_TestCase):
def __exit__(self, *exc_info):
self.__super.__exit__(*exc_info)
- if not exc_info or exc_info[0] is None:
+ if exc_info[0] is None:
self.case.assertWarningList(self.log, **self.kwds)
def assertWarningList(self, wlist=None, desc=None, msg=None):
@@ -1278,7 +1284,15 @@ class HandlerCase(TestCase):
# helpers
#-------------------------------------
handler = self.handler
+
+ if handler.name == "bsdi_crypt":
+ # hack to bypass bsdi-crypt's "odd rounds only" behavior, messes up this test
+ orig_handler = handler
+ handler = handler.using()
+ handler._generate_rounds = lambda self: super(orig_handler, self)._generate_rounds()
+
def effective_rounds(cls, rounds=None):
+ cls = unwrap_handler(cls)
return cls(rounds=rounds, use_defaults=True).rounds
# create some fake values to test with
@@ -1322,9 +1336,10 @@ class HandlerCase(TestCase):
#-------------------------------------
# .using() should clip values below valid minimum, w/ warning
- with self.assertWarningList([PasslibHashWarning]):
- temp = handler.using(min_desired_rounds=orig_min_rounds - 1)
- self.assertEqual(temp.min_desired_rounds, orig_min_rounds)
+ if orig_min_rounds > 0:
+ with self.assertWarningList([PasslibHashWarning]):
+ temp = handler.using(min_desired_rounds=orig_min_rounds - 1)
+ self.assertEqual(temp.min_desired_rounds, orig_min_rounds)
# .using() should clip values above valid maximum, w/ warning
if orig_max_rounds:
@@ -1360,9 +1375,10 @@ class HandlerCase(TestCase):
#-------------------------------------
# .using() should clip values below valid minimum w/ warning
- with self.assertWarningList([PasslibHashWarning]):
- temp = handler.using(max_desired_rounds=orig_min_rounds - 1)
- self.assertEqual(temp.max_desired_rounds, orig_min_rounds)
+ if orig_min_rounds > 0:
+ with self.assertWarningList([PasslibHashWarning]):
+ temp = handler.using(max_desired_rounds=orig_min_rounds - 1)
+ self.assertEqual(temp.max_desired_rounds, orig_min_rounds)
# .using() should clip values above valid maximum, w/ warning
if orig_max_rounds:
@@ -1431,6 +1447,8 @@ class HandlerCase(TestCase):
# TODO: HasRounds -- using() -- linear & log vary_rounds.
# borrow code from CryptContext's test_51_linear_vary_rounds & friends
+ # TODO: HasRounds.needs_update() -- min_desired_rounds / max_desired_rounds checks.
+
#===================================================================
# idents
#===================================================================
@@ -1498,12 +1516,14 @@ class HandlerCase(TestCase):
handler = self.handler
orig_ident = handler.default_ident
for alt_ident in handler.ident_values:
- if alt_ident != handler.default_ident:
+ if alt_ident != orig_ident:
break
else:
- raise AssertionError("expected to find alternate ident")
+ raise AssertionError("expected to find alternate ident: default=%r values=%r" %
+ (orig_ident, handler.ident_values))
def effective_ident(cls):
+ cls = unwrap_handler(cls)
return cls(use_defaults=True).ident
# keep default if nothing else specified
@@ -1898,6 +1918,10 @@ class HandlerCase(TestCase):
* runs output of selected backend against other available backends
(if any) to detect errors occurring between different backends.
* runs output against other "external" verifiers such as OS crypt()
+
+ :param report_thread_state:
+ if true, writes state of loop to current_thread().passlib_fuzz_state.
+ used to help debug multi-threaded fuzz test issues (below)
"""
if self.is_disabled_handler:
raise self.skipTest("not applicable")
@@ -1993,14 +2017,23 @@ class HandlerCase(TestCase):
threads = [launch(n) for n in irange(thread_count)]
# wait until all threads exit
- # XXX: should this have a maximum timeout set?
+ timeout = self.max_fuzz_time * 4
+ stalled = 0
for thread in threads:
- thread.join()
+ thread.join(timeout)
+ if not thread.is_alive():
+ continue
+ # XXX: not sure why this is happening, main one seems 1/4 times for sun_md5_crypt
+ log.error("%s timed out after %f seconds", thread.name, timeout)
+ stalled += 1
# if any thread threw an error, raise one ourselves.
if failed[0]:
raise self.fail("%d/%d threads failed concurrent fuzz testing "
"(see error log for details)" % (failed[0], thread_count))
+ if stalled:
+ raise self.fail("%d/%d threads stalled during concurrent fuzz testing "
+ "(see error log for details)" % (stalled, thread_count))
#---------------------------------------------------------------
# fuzz constants & helpers
@@ -2033,10 +2066,8 @@ class HandlerCase(TestCase):
return value
elif TEST_MODE(max="quick"):
return 0
- elif TEST_MODE(max="default"):
- return 10
else:
- return 20
+ return 10
#---------------------------------------------------------------
# fuzz verifiers
@@ -2226,9 +2257,7 @@ class OsCryptMixin(HandlerCase):
# find handler that generates safe_crypt() compatible hash
handler = cls.alt_safe_crypt_handler
if not handler:
- handler = cls.handler
- while hasattr(handler, "wrapped"):
- handler = handler.wrapped
+ handler = unwrap_handler(cls.handler)
# hack to prevent recursion issue when .has_backend() is called
handler.get_backend()
@@ -2536,6 +2565,7 @@ class EncodingHandlerMixin(HandlerCase):
#=============================================================================
class reset_warnings(warnings.catch_warnings):
"""catch_warnings() wrapper which clears warning registry & filters"""
+
def __init__(self, reset_filter="always", reset_registry=".*", **kwds):
super(reset_warnings, self).__init__(**kwds)
self._reset_filter = reset_filter
@@ -2554,37 +2584,34 @@ class reset_warnings(warnings.catch_warnings):
# that match the 'reset' pattern.
pattern = self._reset_registry
if pattern:
- orig = self._orig_registry = {}
- for name, mod in sys.modules.items():
- if pattern.match(name):
- reg = getattr(mod, "__warningregistry__", None)
- if reg:
- orig[name] = reg.copy()
- reg.clear()
+ backup = self._orig_registry = {}
+ for name, mod in list(sys.modules.items()):
+ if mod is None or not pattern.match(name):
+ continue
+ reg = getattr(mod, "__warningregistry__", None)
+ if reg:
+ backup[name] = reg.copy()
+ reg.clear()
return ret
def __exit__(self, *exc_info):
# restore warning registry for all modules
pattern = self._reset_registry
if pattern:
- # restore archived registry data
- orig = self._orig_registry
- for name, content in iteritems(orig):
- mod = sys.modules.get(name)
- if mod is None:
+ # restore registry backup, clearing all registry entries that we didn't archive
+ backup = self._orig_registry
+ for name, mod in list(sys.modules.items()):
+ if mod is None or not pattern.match(name):
continue
reg = getattr(mod, "__warningregistry__", None)
- if reg is None:
- setattr(mod, "__warningregistry__", content)
- else:
+ if reg:
reg.clear()
- reg.update(content)
- # clear all registry entries that we didn't archive
- for name, mod in sys.modules.items():
- if pattern.match(name) and name not in orig:
- reg = getattr(mod, "__warningregistry__", None)
- if reg:
- reg.clear()
+ orig = backup.get(name)
+ if orig:
+ if reg is None:
+ setattr(mod, "__warningregistry__", orig)
+ else:
+ reg.update(orig)
super(reset_warnings, self).__exit__(*exc_info)
#=============================================================================