diff options
Diffstat (limited to 'lib/sqlalchemy/testing/engines.py')
-rw-r--r-- | lib/sqlalchemy/testing/engines.py | 179 |
1 files changed, 100 insertions, 79 deletions
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py index a4c1f3973..8b334fde2 100644 --- a/lib/sqlalchemy/testing/engines.py +++ b/lib/sqlalchemy/testing/engines.py @@ -7,6 +7,7 @@ from __future__ import absolute_import +import collections import re import warnings import weakref @@ -20,26 +21,29 @@ from .. import pool class ConnectionKiller(object): def __init__(self): self.proxy_refs = weakref.WeakKeyDictionary() - self.testing_engines = weakref.WeakKeyDictionary() - self.conns = set() + self.testing_engines = collections.defaultdict(set) + self.dbapi_connections = set() def add_pool(self, pool): - event.listen(pool, "connect", self.connect) - event.listen(pool, "checkout", self.checkout) - event.listen(pool, "invalidate", self.invalidate) - - def add_engine(self, engine): - self.add_pool(engine.pool) - self.testing_engines[engine] = True + event.listen(pool, "checkout", self._add_conn) + event.listen(pool, "checkin", self._remove_conn) + event.listen(pool, "close", self._remove_conn) + event.listen(pool, "close_detached", self._remove_conn) + # note we are keeping "invalidated" here, as those are still + # opened connections we would like to roll back + + def _add_conn(self, dbapi_con, con_record, con_proxy): + self.dbapi_connections.add(dbapi_con) + self.proxy_refs[con_proxy] = True - def connect(self, dbapi_conn, con_record): - self.conns.add((dbapi_conn, con_record)) + def _remove_conn(self, dbapi_conn, *arg): + self.dbapi_connections.discard(dbapi_conn) - def checkout(self, dbapi_con, con_record, con_proxy): - self.proxy_refs[con_proxy] = True + def add_engine(self, engine, scope): + self.add_pool(engine.pool) - def invalidate(self, dbapi_con, con_record, exception): - self.conns.discard((dbapi_con, con_record)) + assert scope in ("class", "global", "function", "fixture") + self.testing_engines[scope].add(engine) def _safe(self, fn): try: @@ -54,53 +58,76 @@ class ConnectionKiller(object): if rec is not None and rec.is_valid: self._safe(rec.rollback) - def close_all(self): + def checkin_all(self): + # run pool.checkin() for all ConnectionFairy instances we have + # tracked. + for rec in list(self.proxy_refs): if rec is not None and rec.is_valid: - self._safe(rec._close) - - def _after_test_ctx(self): - # this can cause a deadlock with pg8000 - pg8000 acquires - # prepared statement lock inside of rollback() - if async gc - # is collecting in finalize_fairy, deadlock. - # not sure if this should be for non-cpython only. - # note that firebird/fdb definitely needs this though - for conn, rec in list(self.conns): - if rec.connection is None: - # this is a hint that the connection is closed, which - # is causing segfaults on mysqlclient due to - # https://github.com/PyMySQL/mysqlclient-python/issues/270; - # try to work around here - continue - self._safe(conn.rollback) - - def _stop_test_ctx(self): - if config.options.low_connections: - self._stop_test_ctx_minimal() - else: - self._stop_test_ctx_aggressive() + self.dbapi_connections.discard(rec.connection) + self._safe(rec._checkin) - def _stop_test_ctx_minimal(self): - self.close_all() + # for fairy refs that were GCed and could not close the connection, + # such as asyncio, roll back those remaining connections + for con in self.dbapi_connections: + self._safe(con.rollback) + self.dbapi_connections.clear() - self.conns = set() + def close_all(self): + self.checkin_all() - for rec in list(self.testing_engines): - if rec is not config.db: - rec.dispose() + def prepare_for_drop_tables(self, connection): + # don't do aggressive checks for third party test suites + if not config.bootstrapped_as_sqlalchemy: + return - def _stop_test_ctx_aggressive(self): - self.close_all() - for conn, rec in list(self.conns): - self._safe(conn.close) - rec.connection = None + from . import provision + + provision.prepare_for_drop_tables(connection.engine.url, connection) + + def _drop_testing_engines(self, scope): + eng = self.testing_engines[scope] + for rec in list(eng): + for proxy_ref in list(self.proxy_refs): + if proxy_ref is not None and proxy_ref.is_valid: + if ( + proxy_ref._pool is not None + and proxy_ref._pool is rec.pool + ): + self._safe(proxy_ref._checkin) + rec.dispose() + eng.clear() + + def after_test(self): + self._drop_testing_engines("function") + + def after_test_outside_fixtures(self, test): + # don't do aggressive checks for third party test suites + if not config.bootstrapped_as_sqlalchemy: + return + + if test.__class__.__leave_connections_for_teardown__: + return - self.conns = set() - for rec in list(self.testing_engines): - if hasattr(rec, "sync_engine"): - rec.sync_engine.dispose() - else: - rec.dispose() + self.checkin_all() + + # on PostgreSQL, this will test for any "idle in transaction" + # connections. useful to identify tests with unusual patterns + # that can't be cleaned up correctly. + from . import provision + + with config.db.connect() as conn: + provision.prepare_for_drop_tables(conn.engine.url, conn) + + def stop_test_class_inside_fixtures(self): + self.checkin_all() + self._drop_testing_engines("function") + self._drop_testing_engines("class") + + def final_cleanup(self): + self.checkin_all() + for scope in self.testing_engines: + self._drop_testing_engines(scope) def assert_all_closed(self): for rec in self.proxy_refs: @@ -111,20 +138,6 @@ class ConnectionKiller(object): testing_reaper = ConnectionKiller() -def drop_all_tables(metadata, bind): - testing_reaper.close_all() - if hasattr(bind, "close"): - bind.close() - - if not config.db.dialect.supports_alter: - from . import assertions - - with assertions.expect_warnings("Can't sort tables", assert_=False): - metadata.drop_all(bind) - else: - metadata.drop_all(bind) - - @decorator def assert_conns_closed(fn, *args, **kw): try: @@ -147,7 +160,7 @@ def rollback_open_connections(fn, *args, **kw): def close_first(fn, *args, **kw): """Decorator that closes all connections before fn execution.""" - testing_reaper.close_all() + testing_reaper.checkin_all() fn(*args, **kw) @@ -157,7 +170,7 @@ def close_open_connections(fn, *args, **kw): try: fn(*args, **kw) finally: - testing_reaper.close_all() + testing_reaper.checkin_all() def all_dialects(exclude=None): @@ -239,12 +252,14 @@ def reconnecting_engine(url=None, options=None): return engine -def testing_engine(url=None, options=None, future=False, asyncio=False): +def testing_engine(url=None, options=None, future=None, asyncio=False): """Produce an engine configured by --options with optional overrides.""" if asyncio: from sqlalchemy.ext.asyncio import create_async_engine as create_engine - elif future or config.db and config.db._is_future: + elif future or ( + config.db and config.db._is_future and future is not False + ): from sqlalchemy.future import create_engine else: from sqlalchemy import create_engine @@ -252,8 +267,10 @@ def testing_engine(url=None, options=None, future=False, asyncio=False): if not options: use_reaper = True + scope = "function" else: use_reaper = options.pop("use_reaper", True) + scope = options.pop("scope", "function") url = url or config.db.url @@ -268,16 +285,20 @@ def testing_engine(url=None, options=None, future=False, asyncio=False): default_opt.update(options) engine = create_engine(url, **options) - if asyncio: - engine.sync_engine._has_events = True - else: - engine._has_events = True # enable event blocks, helps with profiling + + if scope == "global": + if asyncio: + engine.sync_engine._has_events = True + else: + engine._has_events = ( + True # enable event blocks, helps with profiling + ) if isinstance(engine.pool, pool.QueuePool): engine.pool._timeout = 0 - engine.pool._max_overflow = 5 + engine.pool._max_overflow = 0 if use_reaper: - testing_reaper.add_engine(engine) + testing_reaper.add_engine(engine, scope) return engine |