diff options
Diffstat (limited to 'lib/sqlalchemy/testing/assertions.py')
-rw-r--r-- | lib/sqlalchemy/testing/assertions.py | 105 |
1 files changed, 64 insertions, 41 deletions
diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py index 0cf0cbc7a..986dbb5e9 100644 --- a/lib/sqlalchemy/testing/assertions.py +++ b/lib/sqlalchemy/testing/assertions.py @@ -133,6 +133,11 @@ def uses_deprecated(*messages): return decorate +_FILTERS = None +_SEEN = None +_EXC_CLS = None + + @contextlib.contextmanager def _expect_warnings( exc_cls, @@ -143,58 +148,76 @@ def _expect_warnings( raise_on_any_unexpected=False, ): + global _FILTERS, _SEEN, _EXC_CLS + if regex: filters = [re.compile(msg, re.I | re.S) for msg in messages] else: - filters = messages - - seen = set(filters) + filters = list(messages) + + if _FILTERS is not None: + # nested call; update _FILTERS and _SEEN, return. outer + # block will assert our messages + assert _SEEN is not None + assert _EXC_CLS is not None + _FILTERS.extend(filters) + _SEEN.update(filters) + _EXC_CLS += (exc_cls,) + yield + else: + seen = _SEEN = set(filters) + _FILTERS = filters + _EXC_CLS = (exc_cls,) - if raise_on_any_unexpected: + if raise_on_any_unexpected: - def real_warn(msg, *arg, **kw): - raise AssertionError("Got unexpected warning: %r" % msg) + def real_warn(msg, *arg, **kw): + raise AssertionError("Got unexpected warning: %r" % msg) - else: - real_warn = warnings.warn - - def our_warn(msg, *arg, **kw): - if isinstance(msg, exc_cls): - exception = type(msg) - msg = str(msg) - elif arg: - exception = arg[0] else: - exception = None + real_warn = warnings.warn - if not exception or not issubclass(exception, exc_cls): - return real_warn(msg, *arg, **kw) + def our_warn(msg, *arg, **kw): - if not filters and not raise_on_any_unexpected: - return + if isinstance(msg, _EXC_CLS): + exception = type(msg) + msg = str(msg) + elif arg: + exception = arg[0] + else: + exception = None - for filter_ in filters: - if (regex and filter_.match(msg)) or ( - not regex and filter_ == msg - ): - seen.discard(filter_) - break - else: - real_warn(msg, *arg, **kw) - - with mock.patch("warnings.warn", our_warn), mock.patch( - "sqlalchemy.util.SQLALCHEMY_WARN_20", True - ), mock.patch( - "sqlalchemy.util.deprecations.SQLALCHEMY_WARN_20", True - ), mock.patch( - "sqlalchemy.engine.row.LegacyRow._default_key_style", 2 - ): - yield + if not exception or not issubclass(exception, _EXC_CLS): + return real_warn(msg, *arg, **kw) - if assert_ and (not py2konly or not compat.py3k): - assert not seen, "Warnings were not seen: %s" % ", ".join( - "%r" % (s.pattern if regex else s) for s in seen - ) + if not filters and not raise_on_any_unexpected: + return + + for filter_ in filters: + if (regex and filter_.match(msg)) or ( + not regex and filter_ == msg + ): + seen.discard(filter_) + break + else: + real_warn(msg, *arg, **kw) + + with mock.patch("warnings.warn", our_warn), mock.patch( + "sqlalchemy.util.SQLALCHEMY_WARN_20", True + ), mock.patch( + "sqlalchemy.util.deprecations.SQLALCHEMY_WARN_20", True + ), mock.patch( + "sqlalchemy.engine.row.LegacyRow._default_key_style", 2 + ): + try: + yield + finally: + _SEEN = _FILTERS = _EXC_CLS = None + + if assert_ and (not py2konly or not compat.py3k): + assert not seen, "Warnings were not seen: %s" % ", ".join( + "%r" % (s.pattern if regex else s) for s in seen + ) def global_cleanup_assertions(): |