diff options
Diffstat (limited to 'passlib/tests/utils.py')
-rw-r--r-- | passlib/tests/utils.py | 105 |
1 files changed, 66 insertions, 39 deletions
diff --git a/passlib/tests/utils.py b/passlib/tests/utils.py index 353bf39..21e954a 100644 --- a/passlib/tests/utils.py +++ b/passlib/tests/utils.py @@ -154,6 +154,12 @@ def get_alt_backend(*args, **kwds): return backend return None +def unwrap_handler(handler): + """return original handler, removing any wrapper objects""" + while hasattr(handler, "wrapped"): + handler = handler.wrapped + return handler + #============================================================================= # misc helpers #============================================================================= @@ -368,7 +374,7 @@ class TestCase(_TestCase): def __exit__(self, *exc_info): self.__super.__exit__(*exc_info) - if not exc_info or exc_info[0] is None: + if exc_info[0] is None: self.case.assertWarningList(self.log, **self.kwds) def assertWarningList(self, wlist=None, desc=None, msg=None): @@ -1278,7 +1284,15 @@ class HandlerCase(TestCase): # helpers #------------------------------------- handler = self.handler + + if handler.name == "bsdi_crypt": + # hack to bypass bsdi-crypt's "odd rounds only" behavior, messes up this test + orig_handler = handler + handler = handler.using() + handler._generate_rounds = lambda self: super(orig_handler, self)._generate_rounds() + def effective_rounds(cls, rounds=None): + cls = unwrap_handler(cls) return cls(rounds=rounds, use_defaults=True).rounds # create some fake values to test with @@ -1322,9 +1336,10 @@ class HandlerCase(TestCase): #------------------------------------- # .using() should clip values below valid minimum, w/ warning - with self.assertWarningList([PasslibHashWarning]): - temp = handler.using(min_desired_rounds=orig_min_rounds - 1) - self.assertEqual(temp.min_desired_rounds, orig_min_rounds) + if orig_min_rounds > 0: + with self.assertWarningList([PasslibHashWarning]): + temp = handler.using(min_desired_rounds=orig_min_rounds - 1) + self.assertEqual(temp.min_desired_rounds, orig_min_rounds) # .using() should clip values above valid maximum, w/ warning if orig_max_rounds: @@ -1360,9 +1375,10 @@ class HandlerCase(TestCase): #------------------------------------- # .using() should clip values below valid minimum w/ warning - with self.assertWarningList([PasslibHashWarning]): - temp = handler.using(max_desired_rounds=orig_min_rounds - 1) - self.assertEqual(temp.max_desired_rounds, orig_min_rounds) + if orig_min_rounds > 0: + with self.assertWarningList([PasslibHashWarning]): + temp = handler.using(max_desired_rounds=orig_min_rounds - 1) + self.assertEqual(temp.max_desired_rounds, orig_min_rounds) # .using() should clip values above valid maximum, w/ warning if orig_max_rounds: @@ -1431,6 +1447,8 @@ class HandlerCase(TestCase): # TODO: HasRounds -- using() -- linear & log vary_rounds. # borrow code from CryptContext's test_51_linear_vary_rounds & friends + # TODO: HasRounds.needs_update() -- min_desired_rounds / max_desired_rounds checks. + #=================================================================== # idents #=================================================================== @@ -1498,12 +1516,14 @@ class HandlerCase(TestCase): handler = self.handler orig_ident = handler.default_ident for alt_ident in handler.ident_values: - if alt_ident != handler.default_ident: + if alt_ident != orig_ident: break else: - raise AssertionError("expected to find alternate ident") + raise AssertionError("expected to find alternate ident: default=%r values=%r" % + (orig_ident, handler.ident_values)) def effective_ident(cls): + cls = unwrap_handler(cls) return cls(use_defaults=True).ident # keep default if nothing else specified @@ -1898,6 +1918,10 @@ class HandlerCase(TestCase): * runs output of selected backend against other available backends (if any) to detect errors occurring between different backends. * runs output against other "external" verifiers such as OS crypt() + + :param report_thread_state: + if true, writes state of loop to current_thread().passlib_fuzz_state. + used to help debug multi-threaded fuzz test issues (below) """ if self.is_disabled_handler: raise self.skipTest("not applicable") @@ -1993,14 +2017,23 @@ class HandlerCase(TestCase): threads = [launch(n) for n in irange(thread_count)] # wait until all threads exit - # XXX: should this have a maximum timeout set? + timeout = self.max_fuzz_time * 4 + stalled = 0 for thread in threads: - thread.join() + thread.join(timeout) + if not thread.is_alive(): + continue + # XXX: not sure why this is happening, main one seems 1/4 times for sun_md5_crypt + log.error("%s timed out after %f seconds", thread.name, timeout) + stalled += 1 # if any thread threw an error, raise one ourselves. if failed[0]: raise self.fail("%d/%d threads failed concurrent fuzz testing " "(see error log for details)" % (failed[0], thread_count)) + if stalled: + raise self.fail("%d/%d threads stalled during concurrent fuzz testing " + "(see error log for details)" % (stalled, thread_count)) #--------------------------------------------------------------- # fuzz constants & helpers @@ -2033,10 +2066,8 @@ class HandlerCase(TestCase): return value elif TEST_MODE(max="quick"): return 0 - elif TEST_MODE(max="default"): - return 10 else: - return 20 + return 10 #--------------------------------------------------------------- # fuzz verifiers @@ -2226,9 +2257,7 @@ class OsCryptMixin(HandlerCase): # find handler that generates safe_crypt() compatible hash handler = cls.alt_safe_crypt_handler if not handler: - handler = cls.handler - while hasattr(handler, "wrapped"): - handler = handler.wrapped + handler = unwrap_handler(cls.handler) # hack to prevent recursion issue when .has_backend() is called handler.get_backend() @@ -2536,6 +2565,7 @@ class EncodingHandlerMixin(HandlerCase): #============================================================================= class reset_warnings(warnings.catch_warnings): """catch_warnings() wrapper which clears warning registry & filters""" + def __init__(self, reset_filter="always", reset_registry=".*", **kwds): super(reset_warnings, self).__init__(**kwds) self._reset_filter = reset_filter @@ -2554,37 +2584,34 @@ class reset_warnings(warnings.catch_warnings): # that match the 'reset' pattern. pattern = self._reset_registry if pattern: - orig = self._orig_registry = {} - for name, mod in sys.modules.items(): - if pattern.match(name): - reg = getattr(mod, "__warningregistry__", None) - if reg: - orig[name] = reg.copy() - reg.clear() + backup = self._orig_registry = {} + for name, mod in list(sys.modules.items()): + if mod is None or not pattern.match(name): + continue + reg = getattr(mod, "__warningregistry__", None) + if reg: + backup[name] = reg.copy() + reg.clear() return ret def __exit__(self, *exc_info): # restore warning registry for all modules pattern = self._reset_registry if pattern: - # restore archived registry data - orig = self._orig_registry - for name, content in iteritems(orig): - mod = sys.modules.get(name) - if mod is None: + # restore registry backup, clearing all registry entries that we didn't archive + backup = self._orig_registry + for name, mod in list(sys.modules.items()): + if mod is None or not pattern.match(name): continue reg = getattr(mod, "__warningregistry__", None) - if reg is None: - setattr(mod, "__warningregistry__", content) - else: + if reg: reg.clear() - reg.update(content) - # clear all registry entries that we didn't archive - for name, mod in sys.modules.items(): - if pattern.match(name) and name not in orig: - reg = getattr(mod, "__warningregistry__", None) - if reg: - reg.clear() + orig = backup.get(name) + if orig: + if reg is None: + setattr(mod, "__warningregistry__", orig) + else: + reg.update(orig) super(reset_warnings, self).__exit__(*exc_info) #============================================================================= |