summaryrefslogtreecommitdiff
path: root/test/engine/test_deprecations.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_deprecations.py')
-rw-r--r--test/engine/test_deprecations.py922
1 files changed, 29 insertions, 893 deletions
diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py
index 8bae599a9..b9c09de16 100644
--- a/test/engine/test_deprecations.py
+++ b/test/engine/test_deprecations.py
@@ -1,52 +1,65 @@
import re
-import time
import sqlalchemy as tsa
from sqlalchemy import column
from sqlalchemy import create_engine
-from sqlalchemy import engine_from_config
from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import inspect
-from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy import pool
from sqlalchemy import select
-from sqlalchemy import Sequence
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import TypeDecorator
-from sqlalchemy import VARCHAR
from sqlalchemy.engine.base import Engine
+from sqlalchemy.engine.mock import MockConnection
from sqlalchemy.interfaces import ConnectionProxy
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
-from sqlalchemy.testing.engines import testing_engine
-from sqlalchemy.testing.mock import call
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import lazy_gc
-from .test_parseconnect import mock_dbapi
-
-tlengine = None
class SomeException(Exception):
pass
-def _tlengine_deprecated():
- return testing.expect_deprecated(
- "The 'threadlocal' engine strategy is deprecated"
- )
+class CreateEngineTest(fixtures.TestBase):
+ def test_strategy_keyword_mock(self):
+ def executor(x, y):
+ pass
+
+ with testing.expect_deprecated(
+ "The create_engine.strategy keyword is deprecated, and the "
+ "only argument accepted is 'mock'"
+ ):
+ e = create_engine(
+ "postgresql://", strategy="mock", executor=executor
+ )
+
+ assert isinstance(e, MockConnection)
+
+ def test_strategy_keyword_unknown(self):
+ with testing.expect_deprecated(
+ "The create_engine.strategy keyword is deprecated, and the "
+ "only argument accepted is 'mock'"
+ ):
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "unknown strategy: 'threadlocal'",
+ create_engine,
+ "postgresql://",
+ strategy="threadlocal",
+ )
class TableNamesOrderByTest(fixtures.TestBase):
@@ -81,623 +94,6 @@ class TableNamesOrderByTest(fixtures.TestBase):
eq_(tnames, ["t1", "t2", "t3"])
-class CreateEngineTest(fixtures.TestBase):
- def test_pool_threadlocal_from_config(self):
- dbapi = mock_dbapi
-
- config = {
- "sqlalchemy.url": "postgresql://scott:tiger@somehost/test",
- "sqlalchemy.pool_threadlocal": "false",
- }
-
- e = engine_from_config(config, module=dbapi, _initialize=False)
- eq_(e.pool._use_threadlocal, False)
-
- config = {
- "sqlalchemy.url": "postgresql://scott:tiger@somehost/test",
- "sqlalchemy.pool_threadlocal": "true",
- }
-
- with testing.expect_deprecated(
- "The Pool.use_threadlocal parameter is deprecated"
- ):
- e = engine_from_config(config, module=dbapi, _initialize=False)
- eq_(e.pool._use_threadlocal, True)
-
-
-class RecycleTest(fixtures.TestBase):
- __backend__ = True
-
- def test_basic(self):
- with testing.expect_deprecated(
- "The Pool.use_threadlocal parameter is deprecated"
- ):
- engine = engines.reconnecting_engine(
- options={"pool_threadlocal": True}
- )
-
- with testing.expect_deprecated(
- r"The Engine.contextual_connect\(\) method is deprecated"
- ):
- conn = engine.contextual_connect()
- eq_(conn.execute(select([1])).scalar(), 1)
- conn.close()
-
- # set the pool recycle down to 1.
- # we aren't doing this inline with the
- # engine create since cx_oracle takes way
- # too long to create the 1st connection and don't
- # want to build a huge delay into this test.
-
- engine.pool._recycle = 1
-
- # kill the DB connection
- engine.test_shutdown()
-
- # wait until past the recycle period
- time.sleep(2)
-
- # can connect, no exception
- with testing.expect_deprecated(
- r"The Engine.contextual_connect\(\) method is deprecated"
- ):
- conn = engine.contextual_connect()
- eq_(conn.execute(select([1])).scalar(), 1)
- conn.close()
-
-
-class TLTransactionTest(fixtures.TestBase):
- __requires__ = ("ad_hoc_engines",)
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- global users, metadata, tlengine
-
- with _tlengine_deprecated():
- tlengine = testing_engine(options=dict(strategy="threadlocal"))
- metadata = MetaData()
- users = Table(
- "query_users",
- metadata,
- Column(
- "user_id",
- INT,
- Sequence("query_users_id_seq", optional=True),
- primary_key=True,
- ),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
- metadata.create_all(tlengine)
-
- def teardown(self):
- tlengine.execute(users.delete()).close()
-
- @classmethod
- def teardown_class(cls):
- tlengine.close()
- metadata.drop_all(tlengine)
- tlengine.dispose()
-
- def setup(self):
-
- # ensure tests start with engine closed
-
- tlengine.close()
-
- @testing.crashes(
- "oracle", "TNS error of unknown origin occurs on the buildbot."
- )
- def test_rollback_no_trans(self):
- with _tlengine_deprecated():
- tlengine = testing_engine(options=dict(strategy="threadlocal"))
-
- # shouldn't fail
- tlengine.rollback()
-
- tlengine.begin()
- tlengine.rollback()
-
- # shouldn't fail
- tlengine.rollback()
-
- def test_commit_no_trans(self):
- with _tlengine_deprecated():
- tlengine = testing_engine(options=dict(strategy="threadlocal"))
-
- # shouldn't fail
- tlengine.commit()
-
- tlengine.begin()
- tlengine.rollback()
-
- # shouldn't fail
- tlengine.commit()
-
- def test_prepare_no_trans(self):
- with _tlengine_deprecated():
- tlengine = testing_engine(options=dict(strategy="threadlocal"))
-
- # shouldn't fail
- tlengine.prepare()
-
- tlengine.begin()
- tlengine.rollback()
-
- # shouldn't fail
- tlengine.prepare()
-
- def test_connection_close(self):
- """test that when connections are closed for real, transactions
- are rolled back and disposed."""
-
- c = tlengine.contextual_connect()
- c.begin()
- assert c.in_transaction()
- c.close()
- assert not c.in_transaction()
-
- def test_transaction_close(self):
- c = tlengine.contextual_connect()
- t = c.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- t2 = c.begin()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- t2.close()
- result = c.execute("select * from query_users")
- assert len(result.fetchall()) == 4
- t.close()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 0
- finally:
- c.close()
- external_connection.close()
-
- def test_rollback(self):
- """test a basic rollback"""
-
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.rollback()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 0
- finally:
- external_connection.close()
-
- def test_commit(self):
- """test a basic commit"""
-
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.commit()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 3
- finally:
- external_connection.close()
-
- def test_with_interface(self):
- trans = tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- trans.commit()
-
- trans = tlengine.begin()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- trans.__exit__(Exception, "fake", None)
- trans = tlengine.begin()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- trans.__exit__(None, None, None)
- eq_(
- tlengine.execute(
- users.select().order_by(users.c.user_id)
- ).fetchall(),
- [(1, "user1"), (2, "user2"), (4, "user4")],
- )
-
- def test_commits(self):
- connection = tlengine.connect()
- assert (
- connection.execute("select count(*) from query_users").scalar()
- == 0
- )
- connection.close()
- connection = tlengine.contextual_connect()
- transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- transaction.commit()
- transaction = connection.begin()
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
- transaction.commit()
- transaction = connection.begin()
- result = connection.execute("select * from query_users")
- rows = result.fetchall()
- assert len(rows) == 3, "expected 3 got %d" % len(rows)
- transaction.commit()
- connection.close()
-
- def test_rollback_off_conn(self):
-
- # test that a TLTransaction opened off a TLConnection allows
- # that TLConnection to be aware of the transactional context
-
- conn = tlengine.contextual_connect()
- trans = conn.begin()
- conn.execute(users.insert(), user_id=1, user_name="user1")
- conn.execute(users.insert(), user_id=2, user_name="user2")
- conn.execute(users.insert(), user_id=3, user_name="user3")
- trans.rollback()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 0
- finally:
- conn.close()
- external_connection.close()
-
- def test_morerollback_off_conn(self):
-
- # test that an existing TLConnection automatically takes place
- # in a TLTransaction opened on a second TLConnection
-
- conn = tlengine.contextual_connect()
- conn2 = tlengine.contextual_connect()
- trans = conn2.begin()
- conn.execute(users.insert(), user_id=1, user_name="user1")
- conn.execute(users.insert(), user_id=2, user_name="user2")
- conn.execute(users.insert(), user_id=3, user_name="user3")
- trans.rollback()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 0
- finally:
- conn.close()
- conn2.close()
- external_connection.close()
-
- def test_commit_off_connection(self):
- conn = tlengine.contextual_connect()
- trans = conn.begin()
- conn.execute(users.insert(), user_id=1, user_name="user1")
- conn.execute(users.insert(), user_id=2, user_name="user2")
- conn.execute(users.insert(), user_id=3, user_name="user3")
- trans.commit()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 3
- finally:
- conn.close()
- external_connection.close()
-
- def test_nesting_rollback(self):
- """tests nesting of transactions, rollback at the end"""
-
- external_connection = tlengine.connect()
- self.assert_(
- external_connection.connection
- is not tlengine.contextual_connect().connection
- )
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.execute(users.insert(), user_id=5, user_name="user5")
- tlengine.commit()
- tlengine.rollback()
- try:
- self.assert_(
- external_connection.scalar("select count(*) from query_users")
- == 0
- )
- finally:
- external_connection.close()
-
- def test_nesting_commit(self):
- """tests nesting of transactions, commit at the end."""
-
- external_connection = tlengine.connect()
- self.assert_(
- external_connection.connection
- is not tlengine.contextual_connect().connection
- )
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.execute(users.insert(), user_id=5, user_name="user5")
- tlengine.commit()
- tlengine.commit()
- try:
- self.assert_(
- external_connection.scalar("select count(*) from query_users")
- == 5
- )
- finally:
- external_connection.close()
-
- def test_mixed_nesting(self):
- """tests nesting of transactions off the TLEngine directly
- inside of transactions off the connection from the TLEngine"""
-
- external_connection = tlengine.connect()
- self.assert_(
- external_connection.connection
- is not tlengine.contextual_connect().connection
- )
- conn = tlengine.contextual_connect()
- trans = conn.begin()
- trans2 = conn.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=5, user_name="user5")
- tlengine.execute(users.insert(), user_id=6, user_name="user6")
- tlengine.execute(users.insert(), user_id=7, user_name="user7")
- tlengine.commit()
- tlengine.execute(users.insert(), user_id=8, user_name="user8")
- tlengine.commit()
- trans2.commit()
- trans.rollback()
- conn.close()
- try:
- self.assert_(
- external_connection.scalar("select count(*) from query_users")
- == 0
- )
- finally:
- external_connection.close()
-
- def test_more_mixed_nesting(self):
- """tests nesting of transactions off the connection from the
- TLEngine inside of transactions off the TLEngine directly."""
-
- external_connection = tlengine.connect()
- self.assert_(
- external_connection.connection
- is not tlengine.contextual_connect().connection
- )
- tlengine.begin()
- connection = tlengine.contextual_connect()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.begin()
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
- trans = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
- connection.execute(users.insert(), user_id=5, user_name="user5")
- trans.commit()
- tlengine.commit()
- tlengine.rollback()
- connection.close()
- try:
- self.assert_(
- external_connection.scalar("select count(*) from query_users")
- == 0
- )
- finally:
- external_connection.close()
-
- @testing.requires.savepoints
- def test_nested_subtransaction_rollback(self):
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.begin_nested()
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.rollback()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.commit()
- tlengine.close()
- eq_(
- tlengine.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (3,)],
- )
- tlengine.close()
-
- @testing.requires.savepoints
- @testing.crashes(
- "oracle+zxjdbc",
- "Errors out and causes subsequent tests to " "deadlock",
- )
- def test_nested_subtransaction_commit(self):
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.begin_nested()
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.commit()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.commit()
- tlengine.close()
- eq_(
- tlengine.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (2,), (3,)],
- )
- tlengine.close()
-
- @testing.requires.savepoints
- def test_rollback_to_subtransaction(self):
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.begin_nested()
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.rollback()
- tlengine.rollback()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.commit()
- tlengine.close()
- eq_(
- tlengine.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (4,)],
- )
- tlengine.close()
-
- def test_connections(self):
- """tests that contextual_connect is threadlocal"""
-
- c1 = tlengine.contextual_connect()
- c2 = tlengine.contextual_connect()
- assert c1.connection is c2.connection
- c2.close()
- assert not c1.closed
- assert not tlengine.closed
-
- @testing.requires.independent_cursors
- def test_result_closing(self):
- """tests that contextual_connect is threadlocal"""
-
- r1 = tlengine.execute(select([1]))
- r2 = tlengine.execute(select([1]))
- r1.fetchone()
- r2.fetchone()
- r1.close()
- assert r2.connection is r1.connection
- assert not r2.connection.closed
- assert not tlengine.closed
-
- # close again, nothing happens since resultproxy calls close()
- # only once
-
- r1.close()
- assert r2.connection is r1.connection
- assert not r2.connection.closed
- assert not tlengine.closed
- r2.close()
- assert r2.connection.closed
- assert tlengine.closed
-
- @testing.crashes(
- "oracle+cx_oracle", "intermittent failures on the buildbot"
- )
- def test_dispose(self):
- with _tlengine_deprecated():
- eng = testing_engine(options=dict(strategy="threadlocal"))
- eng.execute(select([1]))
- eng.dispose()
- eng.execute(select([1]))
-
- @testing.requires.two_phase_transactions
- def test_two_phase_transaction(self):
- tlengine.begin_twophase()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.prepare()
- tlengine.commit()
- tlengine.begin_twophase()
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.commit()
- tlengine.begin_twophase()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.rollback()
- tlengine.begin_twophase()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.prepare()
- tlengine.rollback()
- eq_(
- tlengine.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (2,)],
- )
-
-
-class ConvenienceExecuteTest(fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- cls.table = Table(
- "exec_test",
- metadata,
- Column("a", Integer),
- Column("b", Integer),
- test_needs_acid=True,
- )
-
- def _trans_fn(self, is_transaction=False):
- def go(conn, x, value=None):
- if is_transaction:
- conn = conn.connection
- conn.execute(self.table.insert().values(a=x, b=value))
-
- return go
-
- def _trans_rollback_fn(self, is_transaction=False):
- def go(conn, x, value=None):
- if is_transaction:
- conn = conn.connection
- conn.execute(self.table.insert().values(a=x, b=value))
- raise SomeException("breakage")
-
- return go
-
- def _assert_no_data(self):
- eq_(
- testing.db.scalar(
- select([func.count("*")]).select_from(self.table)
- ),
- 0,
- )
-
- def _assert_fn(self, x, value=None):
- eq_(testing.db.execute(self.table.select()).fetchall(), [(x, value)])
-
- def test_transaction_tlocal_engine_ctx_commit(self):
- fn = self._trans_fn()
- with _tlengine_deprecated():
- engine = engines.testing_engine(
- options=dict(strategy="threadlocal", pool=testing.db.pool)
- )
- ctx = engine.begin()
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- self._assert_fn(5, value=8)
-
- def test_transaction_tlocal_engine_ctx_rollback(self):
- fn = self._trans_rollback_fn()
- with _tlengine_deprecated():
- engine = engines.testing_engine(
- options=dict(strategy="threadlocal", pool=testing.db.pool)
- )
- ctx = engine.begin()
- assert_raises_message(
- Exception,
- "breakage",
- testing.run_as_contextmanager,
- ctx,
- fn,
- 5,
- value=8,
- )
- self._assert_no_data()
-
-
def _proxy_execute_deprecated():
return (
testing.expect_deprecated("ConnectionProxy.execute is deprecated."),
@@ -767,21 +163,7 @@ class ProxyConnectionTest(fixtures.TestBase):
options=dict(implicit_returning=False, proxy=MyProxy())
)
- with testing.expect_deprecated(
- "ConnectionProxy.execute is deprecated.",
- "ConnectionProxy.cursor_execute is deprecated.",
- "The 'threadlocal' engine strategy is deprecated",
- ):
-
- tl_engine = engines.testing_engine(
- options=dict(
- implicit_returning=False,
- proxy=MyProxy(),
- strategy="threadlocal",
- )
- )
-
- for engine in (plain_engine, tl_engine):
+ for engine in (plain_engine,):
m = MetaData(engine)
t1 = Table(
"t1",
@@ -1030,44 +412,6 @@ class HandleInvalidatedOnConnectTest(fixtures.TestBase):
self.dbapi = dbapi
self.ProgrammingError = sqlite3.ProgrammingError
- def test_dont_touch_non_dbapi_exception_on_contextual_connect(self):
- dbapi = self.dbapi
- dbapi.connect = Mock(side_effect=TypeError("I'm not a DBAPI error"))
-
- e = create_engine("sqlite://", module=dbapi)
- e.dialect.is_disconnect = is_disconnect = Mock()
- with testing.expect_deprecated(
- r"The Engine.contextual_connect\(\) method is deprecated"
- ):
- assert_raises_message(
- TypeError, "I'm not a DBAPI error", e.contextual_connect
- )
- eq_(is_disconnect.call_count, 0)
-
- def test_invalidate_on_contextual_connect(self):
- """test that is_disconnect() is called during connect.
-
- interpretation of connection failures are not supported by
- every backend.
-
- """
-
- dbapi = self.dbapi
- dbapi.connect = Mock(
- side_effect=self.ProgrammingError(
- "Cannot operate on a closed database."
- )
- )
- e = create_engine("sqlite://", module=dbapi)
- try:
- with testing.expect_deprecated(
- r"The Engine.contextual_connect\(\) method is deprecated"
- ):
- e.contextual_connect()
- assert False
- except tsa.exc.DBAPIError as de:
- assert de.connection_invalidated
-
class HandleErrorTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
@@ -1516,214 +860,6 @@ class DeprecatedPoolListenerTest(PoolTestBase):
assert counts == [1, 2, 2]
-class PoolTest(PoolTestBase):
- def test_manager(self):
- with testing.expect_deprecated(
- r"The pool.manage\(\) function is deprecated,"
- ):
- manager = pool.manage(MockDBAPI(), use_threadlocal=True)
-
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- c1 = manager.connect("foo.db")
- c2 = manager.connect("foo.db")
- c3 = manager.connect("bar.db")
- c4 = manager.connect("foo.db", bar="bat")
- c5 = manager.connect("foo.db", bar="hoho")
- c6 = manager.connect("foo.db", bar="bat")
-
- assert c1.cursor() is not None
- assert c1 is c2
- assert c1 is not c3
- assert c4 is c6
- assert c4 is not c5
-
- def test_manager_with_key(self):
-
- dbapi = MockDBAPI()
-
- with testing.expect_deprecated(
- r"The pool.manage\(\) function is deprecated,"
- ):
- manager = pool.manage(dbapi, use_threadlocal=True)
-
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- c1 = manager.connect("foo.db", sa_pool_key="a")
- c2 = manager.connect("foo.db", sa_pool_key="b")
- c3 = manager.connect("bar.db", sa_pool_key="a")
-
- assert c1.cursor() is not None
- assert c1 is not c2
- assert c1 is c3
-
- eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])
-
- def test_bad_args(self):
- with testing.expect_deprecated(
- r"The pool.manage\(\) function is deprecated,"
- ):
- manager = pool.manage(MockDBAPI())
- manager.connect(None)
-
- def test_non_thread_local_manager(self):
- with testing.expect_deprecated(
- r"The pool.manage\(\) function is deprecated,"
- ):
- manager = pool.manage(MockDBAPI(), use_threadlocal=False)
-
- connection = manager.connect("foo.db")
- connection2 = manager.connect("foo.db")
-
- self.assert_(connection.cursor() is not None)
- self.assert_(connection is not connection2)
-
- def test_threadlocal_del(self):
- self._do_testthreadlocal(useclose=False)
-
- def test_threadlocal_close(self):
- self._do_testthreadlocal(useclose=True)
-
- def _do_testthreadlocal(self, useclose=False):
- dbapi = MockDBAPI()
-
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- for p in (
- pool.QueuePool(
- creator=dbapi.connect,
- pool_size=3,
- max_overflow=-1,
- use_threadlocal=True,
- ),
- pool.SingletonThreadPool(
- creator=dbapi.connect, use_threadlocal=True
- ),
- ):
- c1 = p.connect()
- c2 = p.connect()
- self.assert_(c1 is c2)
- c3 = p.unique_connection()
- self.assert_(c3 is not c1)
- if useclose:
- c2.close()
- else:
- c2 = None
- c2 = p.connect()
- self.assert_(c1 is c2)
- self.assert_(c3 is not c1)
- if useclose:
- c2.close()
- else:
- c2 = None
- lazy_gc()
- if useclose:
- c1 = p.connect()
- c2 = p.connect()
- c3 = p.connect()
- c3.close()
- c2.close()
- self.assert_(c1.connection is not None)
- c1.close()
- c1 = c2 = c3 = None
-
- # extra tests with QueuePool to ensure connections get
- # __del__()ed when dereferenced
-
- if isinstance(p, pool.QueuePool):
- lazy_gc()
- self.assert_(p.checkedout() == 0)
- c1 = p.connect()
- c2 = p.connect()
- if useclose:
- c2.close()
- c1.close()
- else:
- c2 = None
- c1 = None
- lazy_gc()
- self.assert_(p.checkedout() == 0)
-
- def test_mixed_close(self):
- pool._refs.clear()
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- p = self._queuepool_fixture(
- pool_size=3, max_overflow=-1, use_threadlocal=True
- )
- c1 = p.connect()
- c2 = p.connect()
- assert c1 is c2
- c1.close()
- c2 = None
- assert p.checkedout() == 1
- c1 = None
- lazy_gc()
- assert p.checkedout() == 0
- lazy_gc()
- assert not pool._refs
-
-
-class QueuePoolTest(PoolTestBase):
- def test_threadfairy(self):
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- p = self._queuepool_fixture(
- pool_size=3, max_overflow=-1, use_threadlocal=True
- )
- c1 = p.connect()
- c1.close()
- c2 = p.connect()
- assert c2.connection is not None
-
- def test_trick_the_counter(self):
- """this is a "flaw" in the connection pool; since threadlocal
- uses a single ConnectionFairy per thread with an open/close
- counter, you can fool the counter into giving you a
- ConnectionFairy with an ambiguous counter. i.e. its not true
- reference counting."""
-
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- p = self._queuepool_fixture(
- pool_size=3, max_overflow=-1, use_threadlocal=True
- )
- c1 = p.connect()
- c2 = p.connect()
- assert c1 is c2
- c1.close()
- c2 = p.connect()
- c2.close()
- self.assert_(p.checkedout() != 0)
- c2.close()
- self.assert_(p.checkedout() == 0)
-
- @testing.requires.predictable_gc
- def test_weakref_kaboom(self):
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- p = self._queuepool_fixture(
- pool_size=3, max_overflow=-1, use_threadlocal=True
- )
- c1 = p.connect()
- c2 = p.connect()
- c1.close()
- c2 = None
- del c1
- del c2
- gc_collect()
- assert p.checkedout() == 0
- c3 = p.connect()
- assert c3 is not None
-
-
class ExplicitAutoCommitDeprecatedTest(fixtures.TestBase):
"""test the 'autocommit' flag on select() and text() objects.