summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/engines.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/testing/engines.py')
-rw-r--r--lib/sqlalchemy/testing/engines.py179
1 files changed, 100 insertions, 79 deletions
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py
index a4c1f3973..8b334fde2 100644
--- a/lib/sqlalchemy/testing/engines.py
+++ b/lib/sqlalchemy/testing/engines.py
@@ -7,6 +7,7 @@
from __future__ import absolute_import
+import collections
import re
import warnings
import weakref
@@ -20,26 +21,29 @@ from .. import pool
class ConnectionKiller(object):
def __init__(self):
self.proxy_refs = weakref.WeakKeyDictionary()
- self.testing_engines = weakref.WeakKeyDictionary()
- self.conns = set()
+ self.testing_engines = collections.defaultdict(set)
+ self.dbapi_connections = set()
def add_pool(self, pool):
- event.listen(pool, "connect", self.connect)
- event.listen(pool, "checkout", self.checkout)
- event.listen(pool, "invalidate", self.invalidate)
-
- def add_engine(self, engine):
- self.add_pool(engine.pool)
- self.testing_engines[engine] = True
+ event.listen(pool, "checkout", self._add_conn)
+ event.listen(pool, "checkin", self._remove_conn)
+ event.listen(pool, "close", self._remove_conn)
+ event.listen(pool, "close_detached", self._remove_conn)
+ # note we are keeping "invalidated" here, as those are still
+ # opened connections we would like to roll back
+
+ def _add_conn(self, dbapi_con, con_record, con_proxy):
+ self.dbapi_connections.add(dbapi_con)
+ self.proxy_refs[con_proxy] = True
- def connect(self, dbapi_conn, con_record):
- self.conns.add((dbapi_conn, con_record))
+ def _remove_conn(self, dbapi_conn, *arg):
+ self.dbapi_connections.discard(dbapi_conn)
- def checkout(self, dbapi_con, con_record, con_proxy):
- self.proxy_refs[con_proxy] = True
+ def add_engine(self, engine, scope):
+ self.add_pool(engine.pool)
- def invalidate(self, dbapi_con, con_record, exception):
- self.conns.discard((dbapi_con, con_record))
+ assert scope in ("class", "global", "function", "fixture")
+ self.testing_engines[scope].add(engine)
def _safe(self, fn):
try:
@@ -54,53 +58,76 @@ class ConnectionKiller(object):
if rec is not None and rec.is_valid:
self._safe(rec.rollback)
- def close_all(self):
+ def checkin_all(self):
+ # run pool.checkin() for all ConnectionFairy instances we have
+ # tracked.
+
for rec in list(self.proxy_refs):
if rec is not None and rec.is_valid:
- self._safe(rec._close)
-
- def _after_test_ctx(self):
- # this can cause a deadlock with pg8000 - pg8000 acquires
- # prepared statement lock inside of rollback() - if async gc
- # is collecting in finalize_fairy, deadlock.
- # not sure if this should be for non-cpython only.
- # note that firebird/fdb definitely needs this though
- for conn, rec in list(self.conns):
- if rec.connection is None:
- # this is a hint that the connection is closed, which
- # is causing segfaults on mysqlclient due to
- # https://github.com/PyMySQL/mysqlclient-python/issues/270;
- # try to work around here
- continue
- self._safe(conn.rollback)
-
- def _stop_test_ctx(self):
- if config.options.low_connections:
- self._stop_test_ctx_minimal()
- else:
- self._stop_test_ctx_aggressive()
+ self.dbapi_connections.discard(rec.connection)
+ self._safe(rec._checkin)
- def _stop_test_ctx_minimal(self):
- self.close_all()
+ # for fairy refs that were GCed and could not close the connection,
+ # such as asyncio, roll back those remaining connections
+ for con in self.dbapi_connections:
+ self._safe(con.rollback)
+ self.dbapi_connections.clear()
- self.conns = set()
+ def close_all(self):
+ self.checkin_all()
- for rec in list(self.testing_engines):
- if rec is not config.db:
- rec.dispose()
+ def prepare_for_drop_tables(self, connection):
+ # don't do aggressive checks for third party test suites
+ if not config.bootstrapped_as_sqlalchemy:
+ return
- def _stop_test_ctx_aggressive(self):
- self.close_all()
- for conn, rec in list(self.conns):
- self._safe(conn.close)
- rec.connection = None
+ from . import provision
+
+ provision.prepare_for_drop_tables(connection.engine.url, connection)
+
+ def _drop_testing_engines(self, scope):
+ eng = self.testing_engines[scope]
+ for rec in list(eng):
+ for proxy_ref in list(self.proxy_refs):
+ if proxy_ref is not None and proxy_ref.is_valid:
+ if (
+ proxy_ref._pool is not None
+ and proxy_ref._pool is rec.pool
+ ):
+ self._safe(proxy_ref._checkin)
+ rec.dispose()
+ eng.clear()
+
+ def after_test(self):
+ self._drop_testing_engines("function")
+
+ def after_test_outside_fixtures(self, test):
+ # don't do aggressive checks for third party test suites
+ if not config.bootstrapped_as_sqlalchemy:
+ return
+
+ if test.__class__.__leave_connections_for_teardown__:
+ return
- self.conns = set()
- for rec in list(self.testing_engines):
- if hasattr(rec, "sync_engine"):
- rec.sync_engine.dispose()
- else:
- rec.dispose()
+ self.checkin_all()
+
+ # on PostgreSQL, this will test for any "idle in transaction"
+ # connections. useful to identify tests with unusual patterns
+ # that can't be cleaned up correctly.
+ from . import provision
+
+ with config.db.connect() as conn:
+ provision.prepare_for_drop_tables(conn.engine.url, conn)
+
+ def stop_test_class_inside_fixtures(self):
+ self.checkin_all()
+ self._drop_testing_engines("function")
+ self._drop_testing_engines("class")
+
+ def final_cleanup(self):
+ self.checkin_all()
+ for scope in self.testing_engines:
+ self._drop_testing_engines(scope)
def assert_all_closed(self):
for rec in self.proxy_refs:
@@ -111,20 +138,6 @@ class ConnectionKiller(object):
testing_reaper = ConnectionKiller()
-def drop_all_tables(metadata, bind):
- testing_reaper.close_all()
- if hasattr(bind, "close"):
- bind.close()
-
- if not config.db.dialect.supports_alter:
- from . import assertions
-
- with assertions.expect_warnings("Can't sort tables", assert_=False):
- metadata.drop_all(bind)
- else:
- metadata.drop_all(bind)
-
-
@decorator
def assert_conns_closed(fn, *args, **kw):
try:
@@ -147,7 +160,7 @@ def rollback_open_connections(fn, *args, **kw):
def close_first(fn, *args, **kw):
"""Decorator that closes all connections before fn execution."""
- testing_reaper.close_all()
+ testing_reaper.checkin_all()
fn(*args, **kw)
@@ -157,7 +170,7 @@ def close_open_connections(fn, *args, **kw):
try:
fn(*args, **kw)
finally:
- testing_reaper.close_all()
+ testing_reaper.checkin_all()
def all_dialects(exclude=None):
@@ -239,12 +252,14 @@ def reconnecting_engine(url=None, options=None):
return engine
-def testing_engine(url=None, options=None, future=False, asyncio=False):
+def testing_engine(url=None, options=None, future=None, asyncio=False):
"""Produce an engine configured by --options with optional overrides."""
if asyncio:
from sqlalchemy.ext.asyncio import create_async_engine as create_engine
- elif future or config.db and config.db._is_future:
+ elif future or (
+ config.db and config.db._is_future and future is not False
+ ):
from sqlalchemy.future import create_engine
else:
from sqlalchemy import create_engine
@@ -252,8 +267,10 @@ def testing_engine(url=None, options=None, future=False, asyncio=False):
if not options:
use_reaper = True
+ scope = "function"
else:
use_reaper = options.pop("use_reaper", True)
+ scope = options.pop("scope", "function")
url = url or config.db.url
@@ -268,16 +285,20 @@ def testing_engine(url=None, options=None, future=False, asyncio=False):
default_opt.update(options)
engine = create_engine(url, **options)
- if asyncio:
- engine.sync_engine._has_events = True
- else:
- engine._has_events = True # enable event blocks, helps with profiling
+
+ if scope == "global":
+ if asyncio:
+ engine.sync_engine._has_events = True
+ else:
+ engine._has_events = (
+ True # enable event blocks, helps with profiling
+ )
if isinstance(engine.pool, pool.QueuePool):
engine.pool._timeout = 0
- engine.pool._max_overflow = 5
+ engine.pool._max_overflow = 0
if use_reaper:
- testing_reaper.add_engine(engine)
+ testing_reaper.add_engine(engine, scope)
return engine