summaryrefslogtreecommitdiff
path: root/test/engine
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-01-21 20:10:23 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2014-01-21 20:10:23 -0500
commit07fb90c6cc14de6d02cf4be592c57d56831f59f7 (patch)
tree050ef65db988559c60f7aa40f2d0bfe24947e548 /test/engine
parent560fd1d5ed643a1b0f95296f3b840c1963bbe67f (diff)
parentee1f4d21037690ad996c5eacf7e1200e92f2fbaa (diff)
downloadsqlalchemy-ticket_2501.tar.gz
Merge branch 'master' into ticket_2501ticket_2501
Conflicts: lib/sqlalchemy/orm/mapper.py
Diffstat (limited to 'test/engine')
-rw-r--r--test/engine/test_bind.py42
-rw-r--r--test/engine/test_ddlemit.py176
-rw-r--r--test/engine/test_execute.py146
-rw-r--r--test/engine/test_parseconnect.py183
-rw-r--r--test/engine/test_pool.py248
-rw-r--r--test/engine/test_reconnect.py10
-rw-r--r--test/engine/test_reflection.py139
-rw-r--r--test/engine/test_transaction.py142
8 files changed, 709 insertions, 377 deletions
diff --git a/test/engine/test_bind.py b/test/engine/test_bind.py
index 973cf4d84..8f6c547f1 100644
--- a/test/engine/test_bind.py
+++ b/test/engine/test_bind.py
@@ -1,7 +1,7 @@
"""tests the "bind" attribute/argument across schema and SQL,
including the deprecated versions of these arguments"""
-from sqlalchemy.testing import eq_, assert_raises
+from sqlalchemy.testing import assert_raises, assert_raises_message
from sqlalchemy import engine, exc
from sqlalchemy import MetaData, ThreadLocalMetaData
from sqlalchemy import Integer, text
@@ -44,7 +44,7 @@ class BindTest(fixtures.TestBase):
testing.db.connect()
):
for args in [
- ([], {'bind':bind}),
+ ([], {'bind': bind}),
([bind], {})
]:
metadata.create_all(*args[0], **args[1])
@@ -56,18 +56,13 @@ class BindTest(fixtures.TestBase):
def test_create_drop_err_metadata(self):
metadata = MetaData()
- table = Table('test_table', metadata, Column('foo', Integer))
+ Table('test_table', metadata, Column('foo', Integer))
for meth in [metadata.create_all, metadata.drop_all]:
- try:
- meth()
- assert False
- except exc.UnboundExecutionError as e:
- eq_(str(e),
- "The MetaData is not bound to an Engine or "
- "Connection. Execution can not proceed without a "
- "database to execute against. Either execute with "
- "an explicit connection or assign the MetaData's "
- ".bind to enable implicit execution.")
+ assert_raises_message(
+ exc.UnboundExecutionError,
+ "MetaData object is not bound to an Engine or Connection.",
+ meth
+ )
def test_create_drop_err_table(self):
metadata = MetaData()
@@ -79,23 +74,16 @@ class BindTest(fixtures.TestBase):
table.create,
table.drop,
]:
- try:
- meth()
- assert False
- except exc.UnboundExecutionError as e:
- eq_(
- str(e),
- "The Table 'test_table' "
- "is not bound to an Engine or Connection. "
- "Execution can not proceed without a database to execute "
- "against. Either execute with an explicit connection or "
- "assign this Table's .metadata.bind to enable implicit "
- "execution.")
+ assert_raises_message(
+ exc.UnboundExecutionError,
+ "Table object 'test_table' is not bound to an Engine or Connection.",
+ meth
+ )
@testing.uses_deprecated()
def test_create_drop_bound(self):
- for meta in (MetaData,ThreadLocalMetaData):
+ for meta in (MetaData, ThreadLocalMetaData):
for bind in (
testing.db,
testing.db.connect()
@@ -136,7 +124,7 @@ class BindTest(fixtures.TestBase):
try:
for args in (
([bind], {}),
- ([], {'bind':bind}),
+ ([], {'bind': bind}),
):
metadata = MetaData(*args[0], **args[1])
table = Table('test_table', metadata,
diff --git a/test/engine/test_ddlemit.py b/test/engine/test_ddlemit.py
deleted file mode 100644
index e773d0ced..000000000
--- a/test/engine/test_ddlemit.py
+++ /dev/null
@@ -1,176 +0,0 @@
-from sqlalchemy.testing import fixtures
-from sqlalchemy.engine.ddl import SchemaGenerator, SchemaDropper
-from sqlalchemy.engine import default
-from sqlalchemy import MetaData, Table, Column, Integer, Sequence
-from sqlalchemy import schema
-from sqlalchemy.testing.mock import Mock
-
-class EmitDDLTest(fixtures.TestBase):
- def _mock_connection(self, item_exists):
- def has_item(connection, name, schema):
- return item_exists(name)
-
- return Mock(dialect=Mock(
- supports_sequences=True,
- has_table=Mock(side_effect=has_item),
- has_sequence=Mock(side_effect=has_item)
- )
- )
-
- def _mock_create_fixture(self, checkfirst, tables,
- item_exists=lambda item: False):
- connection = self._mock_connection(item_exists)
-
- return SchemaGenerator(connection.dialect, connection,
- checkfirst=checkfirst,
- tables=tables)
-
- def _mock_drop_fixture(self, checkfirst, tables,
- item_exists=lambda item: True):
- connection = self._mock_connection(item_exists)
-
- return SchemaDropper(connection.dialect, connection,
- checkfirst=checkfirst,
- tables=tables)
-
- def _table_fixture(self):
- m = MetaData()
-
- return (m, ) + tuple(
- Table('t%d' % i, m, Column('x', Integer))
- for i in range(1, 6)
- )
-
- def _table_seq_fixture(self):
- m = MetaData()
-
- s1 = Sequence('s1')
- s2 = Sequence('s2')
- t1 = Table('t1', m, Column("x", Integer, s1, primary_key=True))
- t2 = Table('t2', m, Column("x", Integer, s2, primary_key=True))
-
- return m, t1, t2, s1, s2
-
-
- def test_create_seq_checkfirst(self):
- m, t1, t2, s1, s2 = self._table_seq_fixture()
- generator = self._mock_create_fixture(True, [t1, t2],
- item_exists=lambda t: t not in ("t1", "s1")
- )
-
- self._assert_create([t1, s1], generator, m)
-
-
- def test_drop_seq_checkfirst(self):
- m, t1, t2, s1, s2 = self._table_seq_fixture()
- generator = self._mock_drop_fixture(True, [t1, t2],
- item_exists=lambda t: t in ("t1", "s1")
- )
-
- self._assert_drop([t1, s1], generator, m)
-
- def test_create_collection_checkfirst(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_create_fixture(True, [t2, t3, t4],
- item_exists=lambda t: t not in ("t2", "t4")
- )
-
- self._assert_create_tables([t2, t4], generator, m)
-
- def test_drop_collection_checkfirst(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_drop_fixture(True, [t2, t3, t4],
- item_exists=lambda t: t in ("t2", "t4")
- )
-
- self._assert_drop_tables([t2, t4], generator, m)
-
- def test_create_collection_nocheck(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_create_fixture(False, [t2, t3, t4],
- item_exists=lambda t: t not in ("t2", "t4")
- )
-
- self._assert_create_tables([t2, t3, t4], generator, m)
-
- def test_create_empty_collection(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_create_fixture(True, [],
- item_exists=lambda t: t not in ("t2", "t4")
- )
-
- self._assert_create_tables([], generator, m)
-
- def test_drop_empty_collection(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_drop_fixture(True, [],
- item_exists=lambda t: t in ("t2", "t4")
- )
-
- self._assert_drop_tables([], generator, m)
-
- def test_drop_collection_nocheck(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_drop_fixture(False, [t2, t3, t4],
- item_exists=lambda t: t in ("t2", "t4")
- )
-
- self._assert_drop_tables([t2, t3, t4], generator, m)
-
- def test_create_metadata_checkfirst(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_create_fixture(True, None,
- item_exists=lambda t: t not in ("t2", "t4")
- )
-
- self._assert_create_tables([t2, t4], generator, m)
-
- def test_drop_metadata_checkfirst(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_drop_fixture(True, None,
- item_exists=lambda t: t in ("t2", "t4")
- )
-
- self._assert_drop_tables([t2, t4], generator, m)
-
- def test_create_metadata_nocheck(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_create_fixture(False, None,
- item_exists=lambda t: t not in ("t2", "t4")
- )
-
- self._assert_create_tables([t1, t2, t3, t4, t5], generator, m)
-
- def test_drop_metadata_nocheck(self):
- m, t1, t2, t3, t4, t5 = self._table_fixture()
- generator = self._mock_drop_fixture(False, None,
- item_exists=lambda t: t in ("t2", "t4")
- )
-
- self._assert_drop_tables([t1, t2, t3, t4, t5], generator, m)
-
- def _assert_create_tables(self, elements, generator, argument):
- self._assert_ddl(schema.CreateTable, elements, generator, argument)
-
- def _assert_drop_tables(self, elements, generator, argument):
- self._assert_ddl(schema.DropTable, elements, generator, argument)
-
- def _assert_create(self, elements, generator, argument):
- self._assert_ddl(
- (schema.CreateTable, schema.CreateSequence),
- elements, generator, argument)
-
- def _assert_drop(self, elements, generator, argument):
- self._assert_ddl(
- (schema.DropTable, schema.DropSequence),
- elements, generator, argument)
-
- def _assert_ddl(self, ddl_cls, elements, generator, argument):
- generator.traverse_single(argument)
- for call_ in generator.connection.execute.mock_calls:
- c = call_[1][0]
- assert isinstance(c, ddl_cls)
- assert c.element in elements, "element %r was not expected"\
- % c.element
- elements.remove(c.element)
- assert not elements, "elements remain in list: %r" % elements
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 1d2aebf97..d3bd3c2cd 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1,4 +1,4 @@
-
+# coding: utf-8
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
config, is_
@@ -17,9 +17,9 @@ from sqlalchemy.testing.engines import testing_engine
import logging.handlers
from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
from sqlalchemy.engine import result as _result, default
-from sqlalchemy.engine.base import Connection, Engine
+from sqlalchemy.engine.base import Engine
from sqlalchemy.testing import fixtures
-from sqlalchemy.testing.mock import Mock, call
+from sqlalchemy.testing.mock import Mock, call, patch
users, metadata, users_autoinc = None, None, None
@@ -29,11 +29,11 @@ class ExecuteTest(fixtures.TestBase):
global users, users_autoinc, metadata
metadata = MetaData(testing.db)
users = Table('users', metadata,
- Column('user_id', INT, primary_key = True, autoincrement=False),
+ Column('user_id', INT, primary_key=True, autoincrement=False),
Column('user_name', VARCHAR(20)),
)
users_autoinc = Table('users_autoinc', metadata,
- Column('user_id', INT, primary_key = True,
+ Column('user_id', INT, primary_key=True,
test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
)
@@ -59,10 +59,9 @@ class ExecuteTest(fixtures.TestBase):
scalar(stmt)
eq_(result, '%')
- @testing.fails_on_everything_except('firebird', 'maxdb',
+ @testing.fails_on_everything_except('firebird',
'sqlite', '+pyodbc',
- '+mxodbc', '+zxjdbc', 'mysql+oursql',
- 'informix+informixdb')
+ '+mxodbc', '+zxjdbc', 'mysql+oursql')
def test_raw_qmark(self):
def go(conn):
conn.execute('insert into users (user_id, user_name) '
@@ -182,7 +181,7 @@ class ExecuteTest(fixtures.TestBase):
finally:
conn.close()
- @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle', 'informix+informixdb')
+ @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle')
def test_raw_named(self):
def go(conn):
conn.execute('insert into users (user_id, user_name) '
@@ -204,19 +203,36 @@ class ExecuteTest(fixtures.TestBase):
finally:
conn.close()
+ @testing.engines.close_open_connections
def test_exception_wrapping_dbapi(self):
- def go(conn):
+ conn = testing.db.connect()
+ for _c in testing.db, conn:
assert_raises_message(
tsa.exc.DBAPIError,
r"not_a_valid_statement",
- conn.execute, 'not_a_valid_statement'
+ _c.execute, 'not_a_valid_statement'
)
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
+
+ @testing.requires.sqlite
+ def test_exception_wrapping_non_dbapi_error(self):
+ e = create_engine('sqlite://')
+ e.dialect.is_disconnect = is_disconnect = Mock()
+
+ with e.connect() as c:
+ c.connection.cursor = Mock(
+ return_value=Mock(
+ execute=Mock(
+ side_effect=TypeError("I'm not a DBAPI error")
+ ))
+ )
+
+ assert_raises_message(
+ TypeError,
+ "I'm not a DBAPI error",
+ c.execute, "select "
+ )
+ eq_(is_disconnect.call_count, 0)
+
def test_exception_wrapping_non_dbapi_statement(self):
class MyType(TypeDecorator):
@@ -227,7 +243,7 @@ class ExecuteTest(fixtures.TestBase):
def _go(conn):
assert_raises_message(
tsa.exc.StatementError,
- r"nope \(original cause: Exception: nope\) 'SELECT 1 ",
+ r"nope \(original cause: Exception: nope\) u?'SELECT 1 ",
conn.execute,
select([1]).\
where(
@@ -241,6 +257,25 @@ class ExecuteTest(fixtures.TestBase):
finally:
conn.close()
+ def test_stmt_exception_non_ascii(self):
+ name = util.u('méil')
+ with testing.db.connect() as conn:
+ assert_raises_message(
+ tsa.exc.StatementError,
+ util.u(
+ "A value is required for bind parameter 'uname'"
+ r'.*SELECT users.user_name AS .m\\xe9il.') if util.py2k
+ else
+ util.u(
+ "A value is required for bind parameter 'uname'"
+ '.*SELECT users.user_name AS .méil.')
+ ,
+ conn.execute,
+ select([users.c.user_name.label(name)]).where(
+ users.c.user_name == bindparam("uname")),
+ {'uname_incorrect': 'foo'}
+ )
+
def test_stmt_exception_pickleable_no_dbapi(self):
self._test_stmt_exception_pickleable(Exception("hello world"))
@@ -326,17 +361,17 @@ class ExecuteTest(fixtures.TestBase):
def test_engine_level_options(self):
eng = engines.testing_engine(options={'execution_options':
{'foo': 'bar'}})
- conn = eng.contextual_connect()
- eq_(conn._execution_options['foo'], 'bar')
- eq_(conn.execution_options(bat='hoho')._execution_options['foo'
- ], 'bar')
- eq_(conn.execution_options(bat='hoho')._execution_options['bat'
- ], 'hoho')
- eq_(conn.execution_options(foo='hoho')._execution_options['foo'
- ], 'hoho')
- eng.update_execution_options(foo='hoho')
- conn = eng.contextual_connect()
- eq_(conn._execution_options['foo'], 'hoho')
+ with eng.contextual_connect() as conn:
+ eq_(conn._execution_options['foo'], 'bar')
+ eq_(conn.execution_options(bat='hoho')._execution_options['foo'
+ ], 'bar')
+ eq_(conn.execution_options(bat='hoho')._execution_options['bat'
+ ], 'hoho')
+ eq_(conn.execution_options(foo='hoho')._execution_options['foo'
+ ], 'hoho')
+ eng.update_execution_options(foo='hoho')
+ conn = eng.contextual_connect()
+ eq_(conn._execution_options['foo'], 'hoho')
@testing.requires.ad_hoc_engines
def test_generative_engine_execution_options(self):
@@ -383,8 +418,8 @@ class ExecuteTest(fixtures.TestBase):
event.listen(eng, "before_execute", l2)
event.listen(eng1, "before_execute", l3)
- eng.execute(select([1]))
- eng1.execute(select([1]))
+ eng.execute(select([1])).close()
+ eng1.execute(select([1])).close()
eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
@@ -892,45 +927,44 @@ class ResultProxyTest(fixtures.TestBase):
def test_no_rowcount_on_selects_inserts(self):
"""assert that rowcount is only called on deletes and updates.
- This because cursor.rowcount can be expensive on some dialects
- such as Firebird.
+ This because cursor.rowcount may can be expensive on some dialects
+ such as Firebird, however many dialects require it be called
+ before the cursor is closed.
"""
metadata = self.metadata
engine = engines.testing_engine()
- metadata.bind = engine
t = Table('t1', metadata,
Column('data', String(10))
)
- metadata.create_all()
+ metadata.create_all(engine)
- class BreakRowcountMixin(object):
- @property
- def rowcount(self):
- assert False
+ with patch.object(engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount:
+ mock_rowcount.__get__ = Mock()
+ engine.execute(t.insert(),
+ {'data': 'd1'},
+ {'data': 'd2'},
+ {'data': 'd3'})
- execution_ctx_cls = engine.dialect.execution_ctx_cls
- engine.dialect.execution_ctx_cls = type("FakeCtx",
- (BreakRowcountMixin,
- execution_ctx_cls),
- {})
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
- try:
- r = t.insert().execute({'data': 'd1'}, {'data': 'd2'},
- {'data': 'd3'})
- eq_(t.select().execute().fetchall(), [('d1', ), ('d2', ),
- ('d3', )])
- assert_raises(AssertionError, t.update().execute, {'data'
- : 'd4'})
- assert_raises(AssertionError, t.delete().execute)
- finally:
- engine.dialect.execution_ctx_cls = execution_ctx_cls
+ eq_(
+ engine.execute(t.select()).fetchall(),
+ [('d1', ), ('d2', ), ('d3', )]
+ )
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
+
+ engine.execute(t.update(), {'data': 'd4'})
+
+ eq_(len(mock_rowcount.__get__.mock_calls), 1)
+
+ engine.execute(t.delete())
+ eq_(len(mock_rowcount.__get__.mock_calls), 2)
- @testing.requires.python26
def test_rowproxy_is_sequence(self):
import collections
from sqlalchemy.engine import RowProxy
@@ -1016,7 +1050,7 @@ class ResultProxyTest(fixtures.TestBase):
class ExecutionOptionsTest(fixtures.TestBase):
def test_dialect_conn_options(self):
- engine = testing_engine("sqlite://")
+ engine = testing_engine("sqlite://", options=dict(_initialize=False))
engine.dialect = Mock()
conn = engine.connect()
c2 = conn.execution_options(foo="bar")
diff --git a/test/engine/test_parseconnect.py b/test/engine/test_parseconnect.py
index 106bd0782..391b92144 100644
--- a/test/engine/test_parseconnect.py
+++ b/test/engine/test_parseconnect.py
@@ -1,13 +1,12 @@
-from sqlalchemy.testing import assert_raises, eq_
+from sqlalchemy.testing import assert_raises, eq_, assert_raises_message
from sqlalchemy.util.compat import configparser, StringIO
import sqlalchemy.engine.url as url
from sqlalchemy import create_engine, engine_from_config, exc, pool
-from sqlalchemy.engine.util import _coerce_config
from sqlalchemy.engine.default import DefaultDialect
import sqlalchemy as tsa
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
-from sqlalchemy.testing.mock import Mock
+from sqlalchemy.testing.mock import Mock, MagicMock, patch
class ParseConnectTest(fixtures.TestBase):
@@ -15,6 +14,7 @@ class ParseConnectTest(fixtures.TestBase):
for text in (
'dbtype://username:password@hostspec:110//usr/db_file.db',
'dbtype://username:password@hostspec/database',
+ 'dbtype+apitype://username:password@hostspec/database',
'dbtype://username:password@hostspec',
'dbtype://username:password@/database',
'dbtype://username@hostspec',
@@ -22,25 +22,53 @@ class ParseConnectTest(fixtures.TestBase):
'dbtype://hostspec/database',
'dbtype://hostspec',
'dbtype://hostspec/?arg1=val1&arg2=val2',
- 'dbtype:///database',
+ 'dbtype+apitype:///database',
'dbtype:///:memory:',
'dbtype:///foo/bar/im/a/file',
'dbtype:///E:/work/src/LEM/db/hello.db',
'dbtype:///E:/work/src/LEM/db/hello.db?foo=bar&hoho=lala',
'dbtype://',
- 'dbtype://username:password@/db',
- 'dbtype:////usr/local/mailman/lists/_xtest@example.com/memb'
- 'ers.db',
- 'dbtype://username:apples%2Foranges@hostspec/mydatabase',
+ 'dbtype://username:password@/database',
+ 'dbtype:////usr/local/_xtest@example.com/members.db',
+ 'dbtype://username:apples%2Foranges@hostspec/database',
+ 'dbtype://username:password@[2001:da8:2004:1000:202:116:160:90]/database?foo=bar',
+ 'dbtype://username:password@[2001:da8:2004:1000:202:116:160:90]:80/database?foo=bar'
):
u = url.make_url(text)
- assert u.drivername == 'dbtype'
- assert u.username == 'username' or u.username is None
- assert u.password == 'password' or u.password \
- == 'apples/oranges' or u.password is None
- assert u.host == 'hostspec' or u.host == '127.0.0.1' \
- or not u.host
- assert str(u) == text
+
+ assert u.drivername in ('dbtype', 'dbtype+apitype')
+ assert u.username in ('username', None)
+ assert u.password in ('password', 'apples/oranges', None)
+ assert u.host in ('hostspec', '127.0.0.1',
+ '2001:da8:2004:1000:202:116:160:90', '', None), u.host
+ assert u.database in ('database',
+ '/usr/local/_xtest@example.com/members.db',
+ '/usr/db_file.db', ':memory:', '',
+ 'foo/bar/im/a/file',
+ 'E:/work/src/LEM/db/hello.db', None), u.database
+ eq_(str(u), text)
+
+ def test_rfc1738_password(self):
+ u = url.make_url("dbtype://user:pass word + other%3Awords@host/dbname")
+ eq_(u.password, "pass word + other:words")
+ eq_(str(u), "dbtype://user:pass word + other%3Awords@host/dbname")
+
+ u = url.make_url('dbtype://username:apples%2Foranges@hostspec/database')
+ eq_(u.password, "apples/oranges")
+ eq_(str(u), 'dbtype://username:apples%2Foranges@hostspec/database')
+
+ u = url.make_url('dbtype://username:apples%40oranges%40%40@hostspec/database')
+ eq_(u.password, "apples@oranges@@")
+ eq_(str(u), 'dbtype://username:apples%40oranges%40%40@hostspec/database')
+
+ u = url.make_url('dbtype://username%40:@hostspec/database')
+ eq_(u.password, '')
+ eq_(u.username, "username@")
+ eq_(str(u), 'dbtype://username%40:@hostspec/database')
+
+ u = url.make_url('dbtype://username:pass%2Fword@hostspec/database')
+ eq_(u.password, 'pass/word')
+ eq_(str(u), 'dbtype://username:pass%2Fword@hostspec/database')
class DialectImportTest(fixtures.TestBase):
def test_import_base_dialects(self):
@@ -81,50 +109,6 @@ class CreateEngineTest(fixtures.TestBase):
module=dbapi, _initialize=False)
c = e.connect()
- def test_coerce_config(self):
- raw = r"""
-[prefixed]
-sqlalchemy.url=postgresql://scott:tiger@somehost/test?fooz=somevalue
-sqlalchemy.convert_unicode=0
-sqlalchemy.echo=false
-sqlalchemy.echo_pool=1
-sqlalchemy.max_overflow=2
-sqlalchemy.pool_recycle=50
-sqlalchemy.pool_size=2
-sqlalchemy.pool_threadlocal=1
-sqlalchemy.pool_timeout=10
-[plain]
-url=postgresql://scott:tiger@somehost/test?fooz=somevalue
-convert_unicode=0
-echo=0
-echo_pool=1
-max_overflow=2
-pool_recycle=50
-pool_size=2
-pool_threadlocal=1
-pool_timeout=10
-"""
- ini = configparser.ConfigParser()
- ini.readfp(StringIO(raw))
-
- expected = {
- 'url': 'postgresql://scott:tiger@somehost/test?fooz=somevalue',
- 'convert_unicode': 0,
- 'echo': False,
- 'echo_pool': True,
- 'max_overflow': 2,
- 'pool_recycle': 50,
- 'pool_size': 2,
- 'pool_threadlocal': True,
- 'pool_timeout': 10,
- }
-
- prefixed = dict(ini.items('prefixed'))
- self.assert_(_coerce_config(prefixed, 'sqlalchemy.')
- == expected)
-
- plain = dict(ini.items('plain'))
- self.assert_(_coerce_config(plain, '') == expected)
def test_engine_from_config(self):
dbapi = mock_dbapi
@@ -141,19 +125,35 @@ pool_timeout=10
'z=somevalue')
assert e.echo is True
- for param, values in [
- ('convert_unicode', ('true', 'false', 'force')),
- ('echo', ('true', 'false', 'debug')),
- ('echo_pool', ('true', 'false', 'debug')),
- ('use_native_unicode', ('true', 'false')),
- ]:
- for value in values:
- config = {
- 'sqlalchemy.url': 'postgresql://scott:tiger@somehost/test',
- 'sqlalchemy.%s' % param : value
- }
- cfg = _coerce_config(config, 'sqlalchemy.')
- assert cfg[param] == {'true':True, 'false':False}.get(value, value)
+
+ def test_engine_from_config_custom(self):
+ from sqlalchemy import util
+ from sqlalchemy.dialects import registry
+ tokens = __name__.split(".")
+
+ class MyDialect(MockDialect):
+ engine_config_types = {
+ "foobar": int,
+ "bathoho": util.bool_or_str('force')
+ }
+
+ def __init__(self, foobar=None, bathoho=None, **kw):
+ self.foobar = foobar
+ self.bathoho = bathoho
+
+ global dialect
+ dialect = MyDialect
+ registry.register("mockdialect.barb",
+ ".".join(tokens[0:-1]), tokens[-1])
+
+ config = {
+ "sqlalchemy.url": "mockdialect+barb://",
+ "sqlalchemy.foobar": "5",
+ "sqlalchemy.bathoho": "false"
+ }
+ e = engine_from_config(config, _initialize=False)
+ eq_(e.dialect.foobar, 5)
+ eq_(e.dialect.bathoho, False)
def test_custom(self):
@@ -227,17 +227,38 @@ pool_timeout=10
@testing.requires.sqlite
def test_wraps_connect_in_dbapi(self):
- # sqlite uses SingletonThreadPool which doesnt have max_overflow
+ e = create_engine('sqlite://')
+ sqlite3 = e.dialect.dbapi
- assert_raises(TypeError, create_engine, 'sqlite://',
- max_overflow=5, module=mock_sqlite_dbapi)
- e = create_engine('sqlite://', connect_args={'use_unicode'
- : True}, convert_unicode=True)
+ dbapi = MockDBAPI()
+ dbapi.Error = sqlite3.Error,
+ dbapi.ProgrammingError = sqlite3.ProgrammingError
+ dbapi.connect = Mock(side_effect=sqlite3.ProgrammingError("random error"))
try:
- e.connect()
+ create_engine('sqlite://', module=dbapi).connect()
+ assert False
except tsa.exc.DBAPIError as de:
assert not de.connection_invalidated
+
+ @testing.requires.sqlite
+ def test_dont_touch_non_dbapi_exception_on_connect(self):
+ e = create_engine('sqlite://')
+ sqlite3 = e.dialect.dbapi
+
+ dbapi = MockDBAPI()
+ dbapi.Error = sqlite3.Error,
+ dbapi.ProgrammingError = sqlite3.ProgrammingError
+ dbapi.connect = Mock(side_effect=TypeError("I'm not a DBAPI error"))
+ e = create_engine('sqlite://', module=dbapi)
+ e.dialect.is_disconnect = is_disconnect = Mock()
+ assert_raises_message(
+ TypeError,
+ "I'm not a DBAPI error",
+ e.connect
+ )
+ eq_(is_disconnect.call_count, 0)
+
def test_ensure_dialect_does_is_disconnect_no_conn(self):
"""test that is_disconnect() doesn't choke if no connection, cursor given."""
dialect = testing.db.dialect
@@ -277,6 +298,10 @@ pool_timeout=10
assert e.url.drivername == e2.url.drivername == 'mysql'
assert e.url.username == e2.url.username == 'scott'
assert e2.url is u
+ assert str(u) == 'mysql://scott:tiger@localhost/test'
+ assert repr(u) == 'mysql://scott:***@localhost/test'
+ assert repr(e) == 'Engine(mysql://scott:***@localhost/test)'
+ assert repr(e2) == 'Engine(mysql://scott:***@localhost/test)'
def test_poolargs(self):
"""test that connection pool args make it thru"""
@@ -363,7 +388,7 @@ def MockDBAPI(**assert_kwargs):
)
return connection
- return Mock(
+ return MagicMock(
sqlite_version_info=(99, 9, 9,),
version_info=(99, 9, 9,),
sqlite_version='99.9.9',
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 05c0487f8..2e4c2dc48 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -10,6 +10,8 @@ from sqlalchemy.testing import fixtures
from sqlalchemy.testing.mock import Mock, call
+join_timeout = 10
+
def MockDBAPI():
def cursor():
while True:
@@ -306,6 +308,13 @@ class PoolEventsTest(PoolTestBase):
return p, canary
+ def _invalidate_event_fixture(self):
+ p = self._queuepool_fixture()
+ canary = Mock()
+ event.listen(p, 'invalidate', canary)
+
+ return p, canary
+
def test_first_connect_event(self):
p, canary = self._first_connect_event_fixture()
@@ -409,6 +418,31 @@ class PoolEventsTest(PoolTestBase):
c1.close()
eq_(canary, ['reset'])
+ def test_invalidate_event_no_exception(self):
+ p, canary = self._invalidate_event_fixture()
+
+ c1 = p.connect()
+ c1.close()
+ assert not canary.called
+ c1 = p.connect()
+ dbapi_con = c1.connection
+ c1.invalidate()
+ assert canary.call_args_list[0][0][0] is dbapi_con
+ assert canary.call_args_list[0][0][2] is None
+
+ def test_invalidate_event_exception(self):
+ p, canary = self._invalidate_event_fixture()
+
+ c1 = p.connect()
+ c1.close()
+ assert not canary.called
+ c1 = p.connect()
+ dbapi_con = c1.connection
+ exc = Exception("hi")
+ c1.invalidate(exc)
+ assert canary.call_args_list[0][0][0] is dbapi_con
+ assert canary.call_args_list[0][0][2] is exc
+
def test_checkin_event_gc(self):
p, canary = self._checkin_event_fixture()
@@ -827,7 +861,7 @@ class QueuePoolTest(PoolTestBase):
th.start()
threads.append(th)
for th in threads:
- th.join()
+ th.join(join_timeout)
assert len(timeouts) > 0
for t in timeouts:
@@ -864,22 +898,109 @@ class QueuePoolTest(PoolTestBase):
th.start()
threads.append(th)
for th in threads:
- th.join()
+ th.join(join_timeout)
self.assert_(max(peaks) <= max_overflow)
lazy_gc()
assert not pool._refs
+
+ def test_overflow_reset_on_failed_connect(self):
+ dbapi = Mock()
+
+ def failing_dbapi():
+ time.sleep(2)
+ raise Exception("connection failed")
+
+ creator = dbapi.connect
+ def create():
+ return creator()
+
+ p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3)
+ c1 = p.connect()
+ c2 = p.connect()
+ c3 = p.connect()
+ eq_(p._overflow, 1)
+ creator = failing_dbapi
+ assert_raises(Exception, p.connect)
+ eq_(p._overflow, 1)
+
+ @testing.requires.threading_with_mock
+ def test_hanging_connect_within_overflow(self):
+ """test that a single connect() call which is hanging
+ does not block other connections from proceeding."""
+
+ dbapi = Mock()
+ mutex = threading.Lock()
+
+ def hanging_dbapi():
+ time.sleep(2)
+ with mutex:
+ return dbapi.connect()
+
+ def fast_dbapi():
+ with mutex:
+ return dbapi.connect()
+
+ creator = threading.local()
+
+ def create():
+ return creator.mock_connector()
+
+ def run_test(name, pool, should_hang):
+ if should_hang:
+ creator.mock_connector = hanging_dbapi
+ else:
+ creator.mock_connector = fast_dbapi
+
+ conn = pool.connect()
+ conn.operation(name)
+ time.sleep(1)
+ conn.close()
+
+ p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3)
+
+ threads = [
+ threading.Thread(
+ target=run_test, args=("success_one", p, False)),
+ threading.Thread(
+ target=run_test, args=("success_two", p, False)),
+ threading.Thread(
+ target=run_test, args=("overflow_one", p, True)),
+ threading.Thread(
+ target=run_test, args=("overflow_two", p, False)),
+ threading.Thread(
+ target=run_test, args=("overflow_three", p, False))
+ ]
+ for t in threads:
+ t.start()
+ time.sleep(.2)
+
+ for t in threads:
+ t.join(timeout=join_timeout)
+ eq_(
+ dbapi.connect().operation.mock_calls,
+ [call("success_one"), call("success_two"),
+ call("overflow_two"), call("overflow_three"),
+ call("overflow_one")]
+ )
+
+
@testing.requires.threading_with_mock
def test_waiters_handled(self):
"""test that threads waiting for connections are
handled when the pool is replaced.
"""
+ mutex = threading.Lock()
dbapi = MockDBAPI()
def creator():
- return dbapi.connect()
+ mutex.acquire()
+ try:
+ return dbapi.connect()
+ finally:
+ mutex.release()
success = []
for timeout in (None, 30):
@@ -897,21 +1018,27 @@ class QueuePoolTest(PoolTestBase):
c1 = p.connect()
c2 = p.connect()
+ threads = []
for i in range(2):
t = threading.Thread(target=waiter,
args=(p, timeout, max_overflow))
- t.setDaemon(True) # so the tests dont hang if this fails
+ t.daemon = True
t.start()
+ threads.append(t)
- c1.invalidate()
- c2.invalidate()
- p2 = p._replace()
+ # this sleep makes sure that the
+ # two waiter threads hit upon wait()
+ # inside the queue, before we invalidate the other
+ # two conns
time.sleep(.2)
+ p2 = p._replace()
+
+ for t in threads:
+ t.join(join_timeout)
eq_(len(success), 12, "successes: %s" % success)
@testing.requires.threading_with_mock
- @testing.requires.python26
def test_notify_waiters(self):
dbapi = MockDBAPI()
canary = []
@@ -924,9 +1051,7 @@ class QueuePoolTest(PoolTestBase):
p1 = pool.QueuePool(creator=creator1,
pool_size=1, timeout=None,
max_overflow=0)
- p2 = pool.QueuePool(creator=creator2,
- pool_size=1, timeout=None,
- max_overflow=-1)
+ p2 = pool.NullPool(creator=creator2)
def waiter(p):
conn = p.connect()
time.sleep(.5)
@@ -934,14 +1059,18 @@ class QueuePoolTest(PoolTestBase):
c1 = p1.connect()
+ threads = []
for i in range(5):
t = threading.Thread(target=waiter, args=(p1, ))
- t.setDaemon(True)
t.start()
+ threads.append(t)
time.sleep(.5)
eq_(canary, [1])
p1._pool.abort(p2)
- time.sleep(1)
+
+ for t in threads:
+ t.join(join_timeout)
+
eq_(canary, [1, 2, 2, 2, 2, 2])
def test_dispose_closes_pooled(self):
@@ -987,6 +1116,7 @@ class QueuePoolTest(PoolTestBase):
self._test_overflow(40, 5)
def test_mixed_close(self):
+ pool._refs.clear()
p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
@@ -1198,6 +1328,96 @@ class QueuePoolTest(PoolTestBase):
c2 = p.connect()
assert c2.connection is not None
+class ResetOnReturnTest(PoolTestBase):
+ def _fixture(self, **kw):
+ dbapi = Mock()
+ return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
+
+ def test_plain_rollback(self):
+ dbapi, p = self._fixture(reset_on_return='rollback')
+
+ c1 = p.connect()
+ c1.close()
+ assert dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ def test_plain_commit(self):
+ dbapi, p = self._fixture(reset_on_return='commit')
+
+ c1 = p.connect()
+ c1.close()
+ assert not dbapi.connect().rollback.called
+ assert dbapi.connect().commit.called
+
+ def test_plain_none(self):
+ dbapi, p = self._fixture(reset_on_return=None)
+
+ c1 = p.connect()
+ c1.close()
+ assert not dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ def test_agent_rollback(self):
+ dbapi, p = self._fixture(reset_on_return='rollback')
+
+ class Agent(object):
+ def __init__(self, conn):
+ self.conn = conn
+
+ def rollback(self):
+ self.conn.special_rollback()
+
+ def commit(self):
+ self.conn.special_commit()
+
+ c1 = p.connect()
+ c1._reset_agent = Agent(c1)
+ c1.close()
+
+ assert dbapi.connect().special_rollback.called
+ assert not dbapi.connect().special_commit.called
+
+ assert not dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ c1 = p.connect()
+ c1.close()
+ eq_(dbapi.connect().special_rollback.call_count, 1)
+ eq_(dbapi.connect().special_commit.call_count, 0)
+
+ assert dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ def test_agent_commit(self):
+ dbapi, p = self._fixture(reset_on_return='commit')
+
+ class Agent(object):
+ def __init__(self, conn):
+ self.conn = conn
+
+ def rollback(self):
+ self.conn.special_rollback()
+
+ def commit(self):
+ self.conn.special_commit()
+
+ c1 = p.connect()
+ c1._reset_agent = Agent(c1)
+ c1.close()
+ assert not dbapi.connect().special_rollback.called
+ assert dbapi.connect().special_commit.called
+
+ assert not dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ c1 = p.connect()
+ c1.close()
+
+ eq_(dbapi.connect().special_rollback.call_count, 0)
+ eq_(dbapi.connect().special_commit.call_count, 1)
+ assert not dbapi.connect().rollback.called
+ assert dbapi.connect().commit.called
+
class SingletonThreadPoolTest(PoolTestBase):
@testing.requires.threading_with_mock
@@ -1245,7 +1465,7 @@ class SingletonThreadPoolTest(PoolTestBase):
th.start()
threads.append(th)
for th in threads:
- th.join()
+ th.join(join_timeout)
assert len(p._all_conns) == 3
if strong_refs:
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index 0a964cf63..ba336a1bf 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -413,8 +413,6 @@ class RealReconnectTest(fixtures.TestBase):
def teardown(self):
self.engine.dispose()
- @testing.fails_on('+informixdb',
- "Wrong error thrown, fix in informixdb?")
def test_reconnect(self):
conn = self.engine.connect()
@@ -539,8 +537,6 @@ class RealReconnectTest(fixtures.TestBase):
# pool was recreated
assert engine.pool is not p1
- @testing.fails_on('+informixdb',
- "Wrong error thrown, fix in informixdb?")
def test_null_pool(self):
engine = \
engines.reconnecting_engine(options=dict(poolclass=pool.NullPool))
@@ -554,8 +550,6 @@ class RealReconnectTest(fixtures.TestBase):
eq_(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
- @testing.fails_on('+informixdb',
- "Wrong error thrown, fix in informixdb?")
def test_close(self):
conn = self.engine.connect()
eq_(conn.execute(select([1])).scalar(), 1)
@@ -569,8 +563,6 @@ class RealReconnectTest(fixtures.TestBase):
conn = self.engine.connect()
eq_(conn.execute(select([1])).scalar(), 1)
- @testing.fails_on('+informixdb',
- "Wrong error thrown, fix in informixdb?")
def test_with_transaction(self):
conn = self.engine.connect()
trans = conn.begin()
@@ -651,8 +643,6 @@ class InvalidateDuringResultTest(fixtures.TestBase):
'+cymysql', '+pymysql', '+pg8000'
], "Buffers the result set and doesn't check for "
"connection close")
- @testing.fails_on('+informixdb',
- "Wrong error thrown, fix in informixdb?")
def test_invalidate_on_results(self):
conn = self.engine.connect()
result = conn.execute('select * from sometable')
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index 52cbc15e6..2f311f7e7 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -361,6 +361,27 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
self.assert_(isinstance(table.c.col4.type, sa.String))
@testing.provide_metadata
+ def test_override_upgrade_pk_flag(self):
+ meta = self.metadata
+ table = Table(
+ 'override_test', meta,
+ Column('col1', sa.Integer),
+ Column('col2', sa.String(20)),
+ Column('col3', sa.Numeric)
+ )
+ table.create()
+
+ meta2 = MetaData(testing.db)
+ table = Table(
+ 'override_test', meta2,
+ Column('col1', sa.Integer, primary_key=True),
+ autoload=True)
+
+ eq_(list(table.primary_key), [table.c.col1])
+ eq_(table.c.col1.primary_key, True)
+
+
+ @testing.provide_metadata
def test_override_pkfk(self):
"""test that you can override columns which contain foreign keys
to other reflected tables, where the foreign key column is also
@@ -602,6 +623,55 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
is a2.c.user_id
assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
+ @testing.only_on(['postgresql', 'mysql'])
+ @testing.provide_metadata
+ def test_fk_options(self):
+ """test that foreign key reflection includes options (on
+ backends with {dialect}.get_foreign_keys() support)"""
+
+ if testing.against('postgresql'):
+ test_attrs = ('match', 'onupdate', 'ondelete', 'deferrable', 'initially')
+ addresses_user_id_fkey = sa.ForeignKey(
+ # Each option is specifically not a Postgres default, or
+ # it won't be returned by PG's inspection
+ 'users.id',
+ name = 'addresses_user_id_fkey',
+ match='FULL',
+ onupdate='RESTRICT',
+ ondelete='RESTRICT',
+ deferrable=True,
+ initially='DEFERRED'
+ )
+ elif testing.against('mysql'):
+ # MATCH, DEFERRABLE, and INITIALLY cannot be defined for MySQL
+ # ON UPDATE and ON DELETE have defaults of RESTRICT, which are
+ # elided by MySQL's inspection
+ addresses_user_id_fkey = sa.ForeignKey(
+ 'users.id',
+ name = 'addresses_user_id_fkey',
+ onupdate='CASCADE',
+ ondelete='CASCADE'
+ )
+ test_attrs = ('onupdate', 'ondelete')
+
+ meta = self.metadata
+ Table('users', meta,
+ Column('id', sa.Integer, primary_key=True),
+ Column('name', sa.String(30)),
+ test_needs_fk=True)
+ Table('addresses', meta,
+ Column('id', sa.Integer, primary_key=True),
+ Column('user_id', sa.Integer, addresses_user_id_fkey),
+ test_needs_fk=True)
+ meta.create_all()
+
+ meta2 = MetaData()
+ meta2.reflect(testing.db)
+ for fk in meta2.tables['addresses'].foreign_keys:
+ ref = addresses_user_id_fkey
+ for attr in test_attrs:
+ eq_(getattr(fk, attr), getattr(ref, attr))
+
def test_pks_not_uniques(self):
"""test that primary key reflection not tripped up by unique
indexes"""
@@ -705,10 +775,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
@testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on')
- @testing.fails_on('+informixdb',
- "FIXME: should be supported via the "
- "DELIMITED env var but that breaks "
- "everything else for now")
@testing.provide_metadata
def test_reserved(self):
@@ -725,7 +791,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
# There's currently no way to calculate identifier case
# normalization in isolation, so...
- if testing.against('firebird', 'oracle', 'maxdb'):
+ if testing.against('firebird', 'oracle'):
check_col = 'TRUE'
else:
check_col = 'true'
@@ -778,6 +844,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
def test_reflect_uses_bind_engine_reflect(self):
self._test_reflect_uses_bind(lambda e: MetaData().reflect(e))
+
@testing.provide_metadata
def test_reflect_all(self):
existing = testing.db.table_names()
@@ -833,6 +900,18 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
m8.reflect
)
+ m8_e1 = MetaData(testing.db)
+ rt_c = Table('rt_c', m8_e1)
+ m8_e1.reflect(extend_existing=True)
+ eq_(set(m8_e1.tables.keys()), set(names))
+ eq_(rt_c.c.keys(), ['id'])
+
+ m8_e2 = MetaData(testing.db)
+ rt_c = Table('rt_c', m8_e2)
+ m8_e2.reflect(extend_existing=True, only=['rt_a', 'rt_c'])
+ eq_(set(m8_e2.tables.keys()), set(['rt_a', 'rt_c']))
+ eq_(rt_c.c.keys(), ['id'])
+
if existing:
print("Other tables present in database, skipping some checks.")
else:
@@ -1423,6 +1502,7 @@ class CaseSensitiveTest(fixtures.TablesTest):
class ColumnEventsTest(fixtures.TestBase):
+
@classmethod
def setup_class(cls):
cls.metadata = MetaData()
@@ -1430,7 +1510,16 @@ class ColumnEventsTest(fixtures.TestBase):
'to_reflect',
cls.metadata,
Column('x', sa.Integer, primary_key=True),
+ Column('y', sa.Integer),
+ test_needs_fk=True
)
+ cls.related = Table(
+ 'related',
+ cls.metadata,
+ Column('q', sa.Integer, sa.ForeignKey('to_reflect.x')),
+ test_needs_fk=True
+ )
+ sa.Index("some_index", cls.to_reflect.c.y)
cls.metadata.create_all(testing.db)
@classmethod
@@ -1440,7 +1529,7 @@ class ColumnEventsTest(fixtures.TestBase):
def teardown(self):
events.SchemaEventTarget.dispatch._clear()
- def _do_test(self, col, update, assert_):
+ def _do_test(self, col, update, assert_, tablename="to_reflect"):
# load the actual Table class, not the test
# wrapper
from sqlalchemy.schema import Table
@@ -1450,22 +1539,54 @@ class ColumnEventsTest(fixtures.TestBase):
if column_info['name'] == col:
column_info.update(update)
- t = Table('to_reflect', m, autoload=True, listeners=[
+ t = Table(tablename, m, autoload=True, listeners=[
('column_reflect', column_reflect),
])
assert_(t)
m = MetaData(testing.db)
event.listen(Table, 'column_reflect', column_reflect)
- t2 = Table('to_reflect', m, autoload=True)
+ t2 = Table(tablename, m, autoload=True)
assert_(t2)
def test_override_key(self):
+ def assertions(table):
+ eq_(table.c.YXZ.name, "x")
+ eq_(set(table.primary_key), set([table.c.YXZ]))
+
self._do_test(
"x", {"key": "YXZ"},
- lambda table: eq_(table.c.YXZ.name, "x")
+ assertions
)
+ def test_override_index(self):
+ def assertions(table):
+ idx = list(table.indexes)[0]
+ eq_(idx.columns, [table.c.YXZ])
+
+ self._do_test(
+ "y", {"key": "YXZ"},
+ assertions
+ )
+
+ def test_override_key_fk(self):
+ m = MetaData(testing.db)
+ def column_reflect(insp, table, column_info):
+
+ if column_info['name'] == 'q':
+ column_info['key'] = 'qyz'
+ elif column_info['name'] == 'x':
+ column_info['key'] = 'xyz'
+
+ to_reflect = Table("to_reflect", m, autoload=True, listeners=[
+ ('column_reflect', column_reflect),
+ ])
+ related = Table("related", m, autoload=True, listeners=[
+ ('column_reflect', column_reflect),
+ ])
+
+ assert related.c.qyz.references(to_reflect.c.xyz)
+
def test_override_type(self):
def assert_(table):
assert isinstance(table.c.x.type, sa.String)
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index ffc12b5b9..c373133d1 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -3,6 +3,7 @@ from sqlalchemy.testing import eq_, assert_raises, \
import sys
import time
import threading
+from sqlalchemy import event
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, \
select, Integer, String, func, text, exc
@@ -29,7 +30,6 @@ class TransactionTest(fixtures.TestBase):
testing.db.execute(users.delete()).close()
@classmethod
- @testing.crashes('mysql+cymysql', 'deadlock')
def teardown_class(cls):
users.drop(testing.db)
@@ -342,7 +342,8 @@ class TransactionTest(fixtures.TestBase):
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name='user1')
transaction.prepare()
- connection.close()
+ connection.invalidate()
+
connection2 = testing.db.connect()
eq_(connection2.execute(select([users.c.user_id]).
order_by(users.c.user_id)).fetchall(),
@@ -379,6 +380,138 @@ class TransactionTest(fixtures.TestBase):
eq_(result.fetchall(), [('user1', ), ('user4', )])
conn.close()
+ @testing.requires.two_phase_transactions
+ def test_reset_rollback_two_phase_no_rollback(self):
+ # test [ticket:2907], essentially that the
+ # TwoPhaseTransaction is given the job of "reset on return"
+ # so that picky backends like MySQL correctly clear out
+ # their state when a connection is closed without handling
+ # the transaction explicitly.
+
+ eng = testing_engine()
+
+ # MySQL raises if you call straight rollback() on
+ # a connection with an XID present
+ @event.listens_for(eng, "invalidate")
+ def conn_invalidated(dbapi_con, con_record, exception):
+ dbapi_con.close()
+ raise exception
+
+ with eng.connect() as conn:
+ rec = conn.connection._connection_record
+ raw_dbapi_con = rec.connection
+ xa = conn.begin_twophase()
+ conn.execute(users.insert(), user_id=1, user_name='user1')
+
+ assert rec.connection is raw_dbapi_con
+
+ with eng.connect() as conn:
+ result = \
+ conn.execute(select([users.c.user_name]).
+ order_by(users.c.user_id))
+ eq_(result.fetchall(), [])
+
+class ResetAgentTest(fixtures.TestBase):
+ def test_begin_close(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ assert connection.connection._reset_agent is trans
+ assert not trans.is_active
+
+ def test_begin_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
+ def test_begin_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ @testing.requires.savepoints
+ def test_begin_nested_close(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin_nested()
+ assert connection.connection._reset_agent is trans
+ assert not trans.is_active
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_close(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is trans
+ assert trans2.is_active # was never closed
+ assert not trans.is_active
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_rollback_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is trans
+ trans2.rollback()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_rollback_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is trans
+ trans2.rollback()
+ assert connection.connection._reset_agent is trans
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
+ def test_begin_begin_rollback_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans2.rollback()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
+ def test_begin_begin_commit_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans2.commit()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is trans
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is trans
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
class AutoRollbackTest(fixtures.TestBase):
@classmethod
@@ -504,7 +637,7 @@ class ExplicitAutoCommitTest(fixtures.TestBase):
conn2.close()
@testing.uses_deprecated(r'autocommit on select\(\) is deprecated',
- r'autocommit\(\) is deprecated')
+ r'``autocommit\(\)`` is deprecated')
def test_explicit_compiled_deprecated(self):
conn1 = testing.db.connect()
conn2 = testing.db.connect()
@@ -1036,7 +1169,6 @@ class ForUpdateTest(fixtures.TestBase):
@testing.crashes('mssql', 'FIXME: unknown')
@testing.crashes('firebird', 'FIXME: unknown')
@testing.crashes('sybase', 'FIXME: unknown')
- @testing.crashes('access', 'FIXME: unknown')
@testing.requires.independent_connections
def test_queued_update(self):
"""Test SELECT FOR UPDATE with concurrent modifications.
@@ -1101,7 +1233,6 @@ class ForUpdateTest(fixtures.TestBase):
@testing.crashes('mssql', 'FIXME: unknown')
@testing.crashes('firebird', 'FIXME: unknown')
@testing.crashes('sybase', 'FIXME: unknown')
- @testing.crashes('access', 'FIXME: unknown')
@testing.requires.independent_connections
def test_queued_select(self):
"""Simple SELECT FOR UPDATE conflict test"""
@@ -1113,7 +1244,6 @@ class ForUpdateTest(fixtures.TestBase):
@testing.fails_on('mysql', 'No support for NOWAIT')
@testing.crashes('firebird', 'FIXME: unknown')
@testing.crashes('sybase', 'FIXME: unknown')
- @testing.crashes('access', 'FIXME: unknown')
@testing.requires.independent_connections
def test_nowait_select(self):
"""Simple SELECT FOR UPDATE NOWAIT conflict test"""