| field | value | date |
|---|---|---|
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-01-21 20:10:23 -0500 |
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-01-21 20:10:23 -0500 |
| commit | 07fb90c6cc14de6d02cf4be592c57d56831f59f7 (patch) | |
| tree | 050ef65db988559c60f7aa40f2d0bfe24947e548 /test/engine | |
| parent | 560fd1d5ed643a1b0f95296f3b840c1963bbe67f (diff) | |
| parent | ee1f4d21037690ad996c5eacf7e1200e92f2fbaa (diff) | |
| download | sqlalchemy-ticket_2501.tar.gz | |
Merge branch 'master' into ticket_2501

Conflicts:
	lib/sqlalchemy/orm/mapper.py
Diffstat (limited to 'test/engine')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | test/engine/test_bind.py | 42 |
| -rw-r--r-- | test/engine/test_ddlemit.py | 176 |
| -rw-r--r-- | test/engine/test_execute.py | 146 |
| -rw-r--r-- | test/engine/test_parseconnect.py | 183 |
| -rw-r--r-- | test/engine/test_pool.py | 248 |
| -rw-r--r-- | test/engine/test_reconnect.py | 10 |
| -rw-r--r-- | test/engine/test_reflection.py | 139 |
| -rw-r--r-- | test/engine/test_transaction.py | 142 |
8 files changed, 709 insertions, 377 deletions
diff --git a/test/engine/test_bind.py b/test/engine/test_bind.py index 973cf4d84..8f6c547f1 100644 --- a/test/engine/test_bind.py +++ b/test/engine/test_bind.py @@ -1,7 +1,7 @@ """tests the "bind" attribute/argument across schema and SQL, including the deprecated versions of these arguments""" -from sqlalchemy.testing import eq_, assert_raises +from sqlalchemy.testing import assert_raises, assert_raises_message from sqlalchemy import engine, exc from sqlalchemy import MetaData, ThreadLocalMetaData from sqlalchemy import Integer, text @@ -44,7 +44,7 @@ class BindTest(fixtures.TestBase): testing.db.connect() ): for args in [ - ([], {'bind':bind}), + ([], {'bind': bind}), ([bind], {}) ]: metadata.create_all(*args[0], **args[1]) @@ -56,18 +56,13 @@ class BindTest(fixtures.TestBase): def test_create_drop_err_metadata(self): metadata = MetaData() - table = Table('test_table', metadata, Column('foo', Integer)) + Table('test_table', metadata, Column('foo', Integer)) for meth in [metadata.create_all, metadata.drop_all]: - try: - meth() - assert False - except exc.UnboundExecutionError as e: - eq_(str(e), - "The MetaData is not bound to an Engine or " - "Connection. Execution can not proceed without a " - "database to execute against. Either execute with " - "an explicit connection or assign the MetaData's " - ".bind to enable implicit execution.") + assert_raises_message( + exc.UnboundExecutionError, + "MetaData object is not bound to an Engine or Connection.", + meth + ) def test_create_drop_err_table(self): metadata = MetaData() @@ -79,23 +74,16 @@ class BindTest(fixtures.TestBase): table.create, table.drop, ]: - try: - meth() - assert False - except exc.UnboundExecutionError as e: - eq_( - str(e), - "The Table 'test_table' " - "is not bound to an Engine or Connection. " - "Execution can not proceed without a database to execute " - "against. 
Either execute with an explicit connection or " - "assign this Table's .metadata.bind to enable implicit " - "execution.") + assert_raises_message( + exc.UnboundExecutionError, + "Table object 'test_table' is not bound to an Engine or Connection.", + meth + ) @testing.uses_deprecated() def test_create_drop_bound(self): - for meta in (MetaData,ThreadLocalMetaData): + for meta in (MetaData, ThreadLocalMetaData): for bind in ( testing.db, testing.db.connect() @@ -136,7 +124,7 @@ class BindTest(fixtures.TestBase): try: for args in ( ([bind], {}), - ([], {'bind':bind}), + ([], {'bind': bind}), ): metadata = MetaData(*args[0], **args[1]) table = Table('test_table', metadata, diff --git a/test/engine/test_ddlemit.py b/test/engine/test_ddlemit.py deleted file mode 100644 index e773d0ced..000000000 --- a/test/engine/test_ddlemit.py +++ /dev/null @@ -1,176 +0,0 @@ -from sqlalchemy.testing import fixtures -from sqlalchemy.engine.ddl import SchemaGenerator, SchemaDropper -from sqlalchemy.engine import default -from sqlalchemy import MetaData, Table, Column, Integer, Sequence -from sqlalchemy import schema -from sqlalchemy.testing.mock import Mock - -class EmitDDLTest(fixtures.TestBase): - def _mock_connection(self, item_exists): - def has_item(connection, name, schema): - return item_exists(name) - - return Mock(dialect=Mock( - supports_sequences=True, - has_table=Mock(side_effect=has_item), - has_sequence=Mock(side_effect=has_item) - ) - ) - - def _mock_create_fixture(self, checkfirst, tables, - item_exists=lambda item: False): - connection = self._mock_connection(item_exists) - - return SchemaGenerator(connection.dialect, connection, - checkfirst=checkfirst, - tables=tables) - - def _mock_drop_fixture(self, checkfirst, tables, - item_exists=lambda item: True): - connection = self._mock_connection(item_exists) - - return SchemaDropper(connection.dialect, connection, - checkfirst=checkfirst, - tables=tables) - - def _table_fixture(self): - m = MetaData() - - return (m, ) + tuple( - Table('t%d' % i, m, Column('x', Integer)) - for i in range(1, 6) - ) - - def _table_seq_fixture(self): - m = MetaData() - - s1 = Sequence('s1') - s2 = Sequence('s2') - t1 = Table('t1', m, Column("x", Integer, s1, primary_key=True)) - t2 = Table('t2', m, Column("x", Integer, s2, primary_key=True)) - - return m, t1, t2, s1, s2 - - - def test_create_seq_checkfirst(self): - m, t1, t2, s1, s2 = self._table_seq_fixture() - generator = self._mock_create_fixture(True, [t1, t2], - item_exists=lambda t: t not in ("t1", "s1") - ) - - self._assert_create([t1, s1], generator, m) - - - def test_drop_seq_checkfirst(self): - m, t1, t2, s1, s2 = self._table_seq_fixture() - generator = self._mock_drop_fixture(True, [t1, t2], - item_exists=lambda t: t in ("t1", "s1") - ) - - self._assert_drop([t1, s1], generator, m) - - def test_create_collection_checkfirst(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_create_fixture(True, [t2, t3, t4], - item_exists=lambda t: t not in ("t2", "t4") - ) - - self._assert_create_tables([t2, t4], generator, m) - - def test_drop_collection_checkfirst(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_drop_fixture(True, [t2, t3, t4], - item_exists=lambda t: t in ("t2", "t4") - ) - - self._assert_drop_tables([t2, t4], generator, m) - - def test_create_collection_nocheck(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_create_fixture(False, [t2, t3, t4], - item_exists=lambda t: t not in ("t2", "t4") - ) - - 
self._assert_create_tables([t2, t3, t4], generator, m) - - def test_create_empty_collection(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_create_fixture(True, [], - item_exists=lambda t: t not in ("t2", "t4") - ) - - self._assert_create_tables([], generator, m) - - def test_drop_empty_collection(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_drop_fixture(True, [], - item_exists=lambda t: t in ("t2", "t4") - ) - - self._assert_drop_tables([], generator, m) - - def test_drop_collection_nocheck(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_drop_fixture(False, [t2, t3, t4], - item_exists=lambda t: t in ("t2", "t4") - ) - - self._assert_drop_tables([t2, t3, t4], generator, m) - - def test_create_metadata_checkfirst(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_create_fixture(True, None, - item_exists=lambda t: t not in ("t2", "t4") - ) - - self._assert_create_tables([t2, t4], generator, m) - - def test_drop_metadata_checkfirst(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_drop_fixture(True, None, - item_exists=lambda t: t in ("t2", "t4") - ) - - self._assert_drop_tables([t2, t4], generator, m) - - def test_create_metadata_nocheck(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_create_fixture(False, None, - item_exists=lambda t: t not in ("t2", "t4") - ) - - self._assert_create_tables([t1, t2, t3, t4, t5], generator, m) - - def test_drop_metadata_nocheck(self): - m, t1, t2, t3, t4, t5 = self._table_fixture() - generator = self._mock_drop_fixture(False, None, - item_exists=lambda t: t in ("t2", "t4") - ) - - self._assert_drop_tables([t1, t2, t3, t4, t5], generator, m) - - def _assert_create_tables(self, elements, generator, argument): - self._assert_ddl(schema.CreateTable, elements, generator, argument) - - def _assert_drop_tables(self, elements, generator, argument): - self._assert_ddl(schema.DropTable, elements, generator, argument) - - def _assert_create(self, elements, generator, argument): - self._assert_ddl( - (schema.CreateTable, schema.CreateSequence), - elements, generator, argument) - - def _assert_drop(self, elements, generator, argument): - self._assert_ddl( - (schema.DropTable, schema.DropSequence), - elements, generator, argument) - - def _assert_ddl(self, ddl_cls, elements, generator, argument): - generator.traverse_single(argument) - for call_ in generator.connection.execute.mock_calls: - c = call_[1][0] - assert isinstance(c, ddl_cls) - assert c.element in elements, "element %r was not expected"\ - % c.element - elements.remove(c.element) - assert not elements, "elements remain in list: %r" % elements diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 1d2aebf97..d3bd3c2cd 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,4 +1,4 @@ - +# coding: utf-8 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \ config, is_ @@ -17,9 +17,9 @@ from sqlalchemy.testing.engines import testing_engine import logging.handlers from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam from sqlalchemy.engine import result as _result, default -from sqlalchemy.engine.base import Connection, Engine +from sqlalchemy.engine.base import Engine from sqlalchemy.testing import fixtures -from sqlalchemy.testing.mock import Mock, call +from sqlalchemy.testing.mock import Mock, call, patch users, metadata, users_autoinc = None, None, 
None @@ -29,11 +29,11 @@ class ExecuteTest(fixtures.TestBase): global users, users_autoinc, metadata metadata = MetaData(testing.db) users = Table('users', metadata, - Column('user_id', INT, primary_key = True, autoincrement=False), + Column('user_id', INT, primary_key=True, autoincrement=False), Column('user_name', VARCHAR(20)), ) users_autoinc = Table('users_autoinc', metadata, - Column('user_id', INT, primary_key = True, + Column('user_id', INT, primary_key=True, test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) @@ -59,10 +59,9 @@ class ExecuteTest(fixtures.TestBase): scalar(stmt) eq_(result, '%') - @testing.fails_on_everything_except('firebird', 'maxdb', + @testing.fails_on_everything_except('firebird', 'sqlite', '+pyodbc', - '+mxodbc', '+zxjdbc', 'mysql+oursql', - 'informix+informixdb') + '+mxodbc', '+zxjdbc', 'mysql+oursql') def test_raw_qmark(self): def go(conn): conn.execute('insert into users (user_id, user_name) ' @@ -182,7 +181,7 @@ class ExecuteTest(fixtures.TestBase): finally: conn.close() - @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle', 'informix+informixdb') + @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle') def test_raw_named(self): def go(conn): conn.execute('insert into users (user_id, user_name) ' @@ -204,19 +203,36 @@ class ExecuteTest(fixtures.TestBase): finally: conn.close() + @testing.engines.close_open_connections def test_exception_wrapping_dbapi(self): - def go(conn): + conn = testing.db.connect() + for _c in testing.db, conn: assert_raises_message( tsa.exc.DBAPIError, r"not_a_valid_statement", - conn.execute, 'not_a_valid_statement' + _c.execute, 'not_a_valid_statement' ) - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() + + @testing.requires.sqlite + def test_exception_wrapping_non_dbapi_error(self): + e = create_engine('sqlite://') + e.dialect.is_disconnect = is_disconnect = Mock() + + with e.connect() as c: + c.connection.cursor = Mock( + return_value=Mock( + execute=Mock( + side_effect=TypeError("I'm not a DBAPI error") + )) + ) + + assert_raises_message( + TypeError, + "I'm not a DBAPI error", + c.execute, "select " + ) + eq_(is_disconnect.call_count, 0) + def test_exception_wrapping_non_dbapi_statement(self): class MyType(TypeDecorator): @@ -227,7 +243,7 @@ class ExecuteTest(fixtures.TestBase): def _go(conn): assert_raises_message( tsa.exc.StatementError, - r"nope \(original cause: Exception: nope\) 'SELECT 1 ", + r"nope \(original cause: Exception: nope\) u?'SELECT 1 ", conn.execute, select([1]).\ where( @@ -241,6 +257,25 @@ class ExecuteTest(fixtures.TestBase): finally: conn.close() + def test_stmt_exception_non_ascii(self): + name = util.u('méil') + with testing.db.connect() as conn: + assert_raises_message( + tsa.exc.StatementError, + util.u( + "A value is required for bind parameter 'uname'" + r'.*SELECT users.user_name AS .m\\xe9il.') if util.py2k + else + util.u( + "A value is required for bind parameter 'uname'" + '.*SELECT users.user_name AS .méil.') + , + conn.execute, + select([users.c.user_name.label(name)]).where( + users.c.user_name == bindparam("uname")), + {'uname_incorrect': 'foo'} + ) + def test_stmt_exception_pickleable_no_dbapi(self): self._test_stmt_exception_pickleable(Exception("hello world")) @@ -326,17 +361,17 @@ class ExecuteTest(fixtures.TestBase): def test_engine_level_options(self): eng = engines.testing_engine(options={'execution_options': {'foo': 'bar'}}) - conn = eng.contextual_connect() - eq_(conn._execution_options['foo'], 'bar') 
- eq_(conn.execution_options(bat='hoho')._execution_options['foo' - ], 'bar') - eq_(conn.execution_options(bat='hoho')._execution_options['bat' - ], 'hoho') - eq_(conn.execution_options(foo='hoho')._execution_options['foo' - ], 'hoho') - eng.update_execution_options(foo='hoho') - conn = eng.contextual_connect() - eq_(conn._execution_options['foo'], 'hoho') + with eng.contextual_connect() as conn: + eq_(conn._execution_options['foo'], 'bar') + eq_(conn.execution_options(bat='hoho')._execution_options['foo' + ], 'bar') + eq_(conn.execution_options(bat='hoho')._execution_options['bat' + ], 'hoho') + eq_(conn.execution_options(foo='hoho')._execution_options['foo' + ], 'hoho') + eng.update_execution_options(foo='hoho') + conn = eng.contextual_connect() + eq_(conn._execution_options['foo'], 'hoho') @testing.requires.ad_hoc_engines def test_generative_engine_execution_options(self): @@ -383,8 +418,8 @@ class ExecuteTest(fixtures.TestBase): event.listen(eng, "before_execute", l2) event.listen(eng1, "before_execute", l3) - eng.execute(select([1])) - eng1.execute(select([1])) + eng.execute(select([1])).close() + eng1.execute(select([1])).close() eq_(canary, ["l1", "l2", "l3", "l1", "l2"]) @@ -892,45 +927,44 @@ class ResultProxyTest(fixtures.TestBase): def test_no_rowcount_on_selects_inserts(self): """assert that rowcount is only called on deletes and updates. - This because cursor.rowcount can be expensive on some dialects - such as Firebird. + This because cursor.rowcount may can be expensive on some dialects + such as Firebird, however many dialects require it be called + before the cursor is closed. """ metadata = self.metadata engine = engines.testing_engine() - metadata.bind = engine t = Table('t1', metadata, Column('data', String(10)) ) - metadata.create_all() + metadata.create_all(engine) - class BreakRowcountMixin(object): - @property - def rowcount(self): - assert False + with patch.object(engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount: + mock_rowcount.__get__ = Mock() + engine.execute(t.insert(), + {'data': 'd1'}, + {'data': 'd2'}, + {'data': 'd3'}) - execution_ctx_cls = engine.dialect.execution_ctx_cls - engine.dialect.execution_ctx_cls = type("FakeCtx", - (BreakRowcountMixin, - execution_ctx_cls), - {}) + eq_(len(mock_rowcount.__get__.mock_calls), 0) - try: - r = t.insert().execute({'data': 'd1'}, {'data': 'd2'}, - {'data': 'd3'}) - eq_(t.select().execute().fetchall(), [('d1', ), ('d2', ), - ('d3', )]) - assert_raises(AssertionError, t.update().execute, {'data' - : 'd4'}) - assert_raises(AssertionError, t.delete().execute) - finally: - engine.dialect.execution_ctx_cls = execution_ctx_cls + eq_( + engine.execute(t.select()).fetchall(), + [('d1', ), ('d2', ), ('d3', )] + ) + eq_(len(mock_rowcount.__get__.mock_calls), 0) + + engine.execute(t.update(), {'data': 'd4'}) + + eq_(len(mock_rowcount.__get__.mock_calls), 1) + + engine.execute(t.delete()) + eq_(len(mock_rowcount.__get__.mock_calls), 2) - @testing.requires.python26 def test_rowproxy_is_sequence(self): import collections from sqlalchemy.engine import RowProxy @@ -1016,7 +1050,7 @@ class ResultProxyTest(fixtures.TestBase): class ExecutionOptionsTest(fixtures.TestBase): def test_dialect_conn_options(self): - engine = testing_engine("sqlite://") + engine = testing_engine("sqlite://", options=dict(_initialize=False)) engine.dialect = Mock() conn = engine.connect() c2 = conn.execution_options(foo="bar") diff --git a/test/engine/test_parseconnect.py b/test/engine/test_parseconnect.py index 106bd0782..391b92144 100644 --- 
a/test/engine/test_parseconnect.py +++ b/test/engine/test_parseconnect.py @@ -1,13 +1,12 @@ -from sqlalchemy.testing import assert_raises, eq_ +from sqlalchemy.testing import assert_raises, eq_, assert_raises_message from sqlalchemy.util.compat import configparser, StringIO import sqlalchemy.engine.url as url from sqlalchemy import create_engine, engine_from_config, exc, pool -from sqlalchemy.engine.util import _coerce_config from sqlalchemy.engine.default import DefaultDialect import sqlalchemy as tsa from sqlalchemy.testing import fixtures from sqlalchemy import testing -from sqlalchemy.testing.mock import Mock +from sqlalchemy.testing.mock import Mock, MagicMock, patch class ParseConnectTest(fixtures.TestBase): @@ -15,6 +14,7 @@ class ParseConnectTest(fixtures.TestBase): for text in ( 'dbtype://username:password@hostspec:110//usr/db_file.db', 'dbtype://username:password@hostspec/database', + 'dbtype+apitype://username:password@hostspec/database', 'dbtype://username:password@hostspec', 'dbtype://username:password@/database', 'dbtype://username@hostspec', @@ -22,25 +22,53 @@ class ParseConnectTest(fixtures.TestBase): 'dbtype://hostspec/database', 'dbtype://hostspec', 'dbtype://hostspec/?arg1=val1&arg2=val2', - 'dbtype:///database', + 'dbtype+apitype:///database', 'dbtype:///:memory:', 'dbtype:///foo/bar/im/a/file', 'dbtype:///E:/work/src/LEM/db/hello.db', 'dbtype:///E:/work/src/LEM/db/hello.db?foo=bar&hoho=lala', 'dbtype://', - 'dbtype://username:password@/db', - 'dbtype:////usr/local/mailman/lists/_xtest@example.com/memb' - 'ers.db', - 'dbtype://username:apples%2Foranges@hostspec/mydatabase', + 'dbtype://username:password@/database', + 'dbtype:////usr/local/_xtest@example.com/members.db', + 'dbtype://username:apples%2Foranges@hostspec/database', + 'dbtype://username:password@[2001:da8:2004:1000:202:116:160:90]/database?foo=bar', + 'dbtype://username:password@[2001:da8:2004:1000:202:116:160:90]:80/database?foo=bar' ): u = url.make_url(text) - assert u.drivername == 'dbtype' - assert u.username == 'username' or u.username is None - assert u.password == 'password' or u.password \ - == 'apples/oranges' or u.password is None - assert u.host == 'hostspec' or u.host == '127.0.0.1' \ - or not u.host - assert str(u) == text + + assert u.drivername in ('dbtype', 'dbtype+apitype') + assert u.username in ('username', None) + assert u.password in ('password', 'apples/oranges', None) + assert u.host in ('hostspec', '127.0.0.1', + '2001:da8:2004:1000:202:116:160:90', '', None), u.host + assert u.database in ('database', + '/usr/local/_xtest@example.com/members.db', + '/usr/db_file.db', ':memory:', '', + 'foo/bar/im/a/file', + 'E:/work/src/LEM/db/hello.db', None), u.database + eq_(str(u), text) + + def test_rfc1738_password(self): + u = url.make_url("dbtype://user:pass word + other%3Awords@host/dbname") + eq_(u.password, "pass word + other:words") + eq_(str(u), "dbtype://user:pass word + other%3Awords@host/dbname") + + u = url.make_url('dbtype://username:apples%2Foranges@hostspec/database') + eq_(u.password, "apples/oranges") + eq_(str(u), 'dbtype://username:apples%2Foranges@hostspec/database') + + u = url.make_url('dbtype://username:apples%40oranges%40%40@hostspec/database') + eq_(u.password, "apples@oranges@@") + eq_(str(u), 'dbtype://username:apples%40oranges%40%40@hostspec/database') + + u = url.make_url('dbtype://username%40:@hostspec/database') + eq_(u.password, '') + eq_(u.username, "username@") + eq_(str(u), 'dbtype://username%40:@hostspec/database') + + u = 
url.make_url('dbtype://username:pass%2Fword@hostspec/database') + eq_(u.password, 'pass/word') + eq_(str(u), 'dbtype://username:pass%2Fword@hostspec/database') class DialectImportTest(fixtures.TestBase): def test_import_base_dialects(self): @@ -81,50 +109,6 @@ class CreateEngineTest(fixtures.TestBase): module=dbapi, _initialize=False) c = e.connect() - def test_coerce_config(self): - raw = r""" -[prefixed] -sqlalchemy.url=postgresql://scott:tiger@somehost/test?fooz=somevalue -sqlalchemy.convert_unicode=0 -sqlalchemy.echo=false -sqlalchemy.echo_pool=1 -sqlalchemy.max_overflow=2 -sqlalchemy.pool_recycle=50 -sqlalchemy.pool_size=2 -sqlalchemy.pool_threadlocal=1 -sqlalchemy.pool_timeout=10 -[plain] -url=postgresql://scott:tiger@somehost/test?fooz=somevalue -convert_unicode=0 -echo=0 -echo_pool=1 -max_overflow=2 -pool_recycle=50 -pool_size=2 -pool_threadlocal=1 -pool_timeout=10 -""" - ini = configparser.ConfigParser() - ini.readfp(StringIO(raw)) - - expected = { - 'url': 'postgresql://scott:tiger@somehost/test?fooz=somevalue', - 'convert_unicode': 0, - 'echo': False, - 'echo_pool': True, - 'max_overflow': 2, - 'pool_recycle': 50, - 'pool_size': 2, - 'pool_threadlocal': True, - 'pool_timeout': 10, - } - - prefixed = dict(ini.items('prefixed')) - self.assert_(_coerce_config(prefixed, 'sqlalchemy.') - == expected) - - plain = dict(ini.items('plain')) - self.assert_(_coerce_config(plain, '') == expected) def test_engine_from_config(self): dbapi = mock_dbapi @@ -141,19 +125,35 @@ pool_timeout=10 'z=somevalue') assert e.echo is True - for param, values in [ - ('convert_unicode', ('true', 'false', 'force')), - ('echo', ('true', 'false', 'debug')), - ('echo_pool', ('true', 'false', 'debug')), - ('use_native_unicode', ('true', 'false')), - ]: - for value in values: - config = { - 'sqlalchemy.url': 'postgresql://scott:tiger@somehost/test', - 'sqlalchemy.%s' % param : value - } - cfg = _coerce_config(config, 'sqlalchemy.') - assert cfg[param] == {'true':True, 'false':False}.get(value, value) + + def test_engine_from_config_custom(self): + from sqlalchemy import util + from sqlalchemy.dialects import registry + tokens = __name__.split(".") + + class MyDialect(MockDialect): + engine_config_types = { + "foobar": int, + "bathoho": util.bool_or_str('force') + } + + def __init__(self, foobar=None, bathoho=None, **kw): + self.foobar = foobar + self.bathoho = bathoho + + global dialect + dialect = MyDialect + registry.register("mockdialect.barb", + ".".join(tokens[0:-1]), tokens[-1]) + + config = { + "sqlalchemy.url": "mockdialect+barb://", + "sqlalchemy.foobar": "5", + "sqlalchemy.bathoho": "false" + } + e = engine_from_config(config, _initialize=False) + eq_(e.dialect.foobar, 5) + eq_(e.dialect.bathoho, False) def test_custom(self): @@ -227,17 +227,38 @@ pool_timeout=10 @testing.requires.sqlite def test_wraps_connect_in_dbapi(self): - # sqlite uses SingletonThreadPool which doesnt have max_overflow + e = create_engine('sqlite://') + sqlite3 = e.dialect.dbapi - assert_raises(TypeError, create_engine, 'sqlite://', - max_overflow=5, module=mock_sqlite_dbapi) - e = create_engine('sqlite://', connect_args={'use_unicode' - : True}, convert_unicode=True) + dbapi = MockDBAPI() + dbapi.Error = sqlite3.Error, + dbapi.ProgrammingError = sqlite3.ProgrammingError + dbapi.connect = Mock(side_effect=sqlite3.ProgrammingError("random error")) try: - e.connect() + create_engine('sqlite://', module=dbapi).connect() + assert False except tsa.exc.DBAPIError as de: assert not de.connection_invalidated + + @testing.requires.sqlite + 
def test_dont_touch_non_dbapi_exception_on_connect(self): + e = create_engine('sqlite://') + sqlite3 = e.dialect.dbapi + + dbapi = MockDBAPI() + dbapi.Error = sqlite3.Error, + dbapi.ProgrammingError = sqlite3.ProgrammingError + dbapi.connect = Mock(side_effect=TypeError("I'm not a DBAPI error")) + e = create_engine('sqlite://', module=dbapi) + e.dialect.is_disconnect = is_disconnect = Mock() + assert_raises_message( + TypeError, + "I'm not a DBAPI error", + e.connect + ) + eq_(is_disconnect.call_count, 0) + def test_ensure_dialect_does_is_disconnect_no_conn(self): """test that is_disconnect() doesn't choke if no connection, cursor given.""" dialect = testing.db.dialect @@ -277,6 +298,10 @@ pool_timeout=10 assert e.url.drivername == e2.url.drivername == 'mysql' assert e.url.username == e2.url.username == 'scott' assert e2.url is u + assert str(u) == 'mysql://scott:tiger@localhost/test' + assert repr(u) == 'mysql://scott:***@localhost/test' + assert repr(e) == 'Engine(mysql://scott:***@localhost/test)' + assert repr(e2) == 'Engine(mysql://scott:***@localhost/test)' def test_poolargs(self): """test that connection pool args make it thru""" @@ -363,7 +388,7 @@ def MockDBAPI(**assert_kwargs): ) return connection - return Mock( + return MagicMock( sqlite_version_info=(99, 9, 9,), version_info=(99, 9, 9,), sqlite_version='99.9.9', diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 05c0487f8..2e4c2dc48 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -10,6 +10,8 @@ from sqlalchemy.testing import fixtures from sqlalchemy.testing.mock import Mock, call +join_timeout = 10 + def MockDBAPI(): def cursor(): while True: @@ -306,6 +308,13 @@ class PoolEventsTest(PoolTestBase): return p, canary + def _invalidate_event_fixture(self): + p = self._queuepool_fixture() + canary = Mock() + event.listen(p, 'invalidate', canary) + + return p, canary + def test_first_connect_event(self): p, canary = self._first_connect_event_fixture() @@ -409,6 +418,31 @@ class PoolEventsTest(PoolTestBase): c1.close() eq_(canary, ['reset']) + def test_invalidate_event_no_exception(self): + p, canary = self._invalidate_event_fixture() + + c1 = p.connect() + c1.close() + assert not canary.called + c1 = p.connect() + dbapi_con = c1.connection + c1.invalidate() + assert canary.call_args_list[0][0][0] is dbapi_con + assert canary.call_args_list[0][0][2] is None + + def test_invalidate_event_exception(self): + p, canary = self._invalidate_event_fixture() + + c1 = p.connect() + c1.close() + assert not canary.called + c1 = p.connect() + dbapi_con = c1.connection + exc = Exception("hi") + c1.invalidate(exc) + assert canary.call_args_list[0][0][0] is dbapi_con + assert canary.call_args_list[0][0][2] is exc + def test_checkin_event_gc(self): p, canary = self._checkin_event_fixture() @@ -827,7 +861,7 @@ class QueuePoolTest(PoolTestBase): th.start() threads.append(th) for th in threads: - th.join() + th.join(join_timeout) assert len(timeouts) > 0 for t in timeouts: @@ -864,22 +898,109 @@ class QueuePoolTest(PoolTestBase): th.start() threads.append(th) for th in threads: - th.join() + th.join(join_timeout) self.assert_(max(peaks) <= max_overflow) lazy_gc() assert not pool._refs + + def test_overflow_reset_on_failed_connect(self): + dbapi = Mock() + + def failing_dbapi(): + time.sleep(2) + raise Exception("connection failed") + + creator = dbapi.connect + def create(): + return creator() + + p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3) + c1 = p.connect() + c2 = p.connect() + c3 = 
p.connect() + eq_(p._overflow, 1) + creator = failing_dbapi + assert_raises(Exception, p.connect) + eq_(p._overflow, 1) + + @testing.requires.threading_with_mock + def test_hanging_connect_within_overflow(self): + """test that a single connect() call which is hanging + does not block other connections from proceeding.""" + + dbapi = Mock() + mutex = threading.Lock() + + def hanging_dbapi(): + time.sleep(2) + with mutex: + return dbapi.connect() + + def fast_dbapi(): + with mutex: + return dbapi.connect() + + creator = threading.local() + + def create(): + return creator.mock_connector() + + def run_test(name, pool, should_hang): + if should_hang: + creator.mock_connector = hanging_dbapi + else: + creator.mock_connector = fast_dbapi + + conn = pool.connect() + conn.operation(name) + time.sleep(1) + conn.close() + + p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3) + + threads = [ + threading.Thread( + target=run_test, args=("success_one", p, False)), + threading.Thread( + target=run_test, args=("success_two", p, False)), + threading.Thread( + target=run_test, args=("overflow_one", p, True)), + threading.Thread( + target=run_test, args=("overflow_two", p, False)), + threading.Thread( + target=run_test, args=("overflow_three", p, False)) + ] + for t in threads: + t.start() + time.sleep(.2) + + for t in threads: + t.join(timeout=join_timeout) + eq_( + dbapi.connect().operation.mock_calls, + [call("success_one"), call("success_two"), + call("overflow_two"), call("overflow_three"), + call("overflow_one")] + ) + + @testing.requires.threading_with_mock def test_waiters_handled(self): """test that threads waiting for connections are handled when the pool is replaced. """ + mutex = threading.Lock() dbapi = MockDBAPI() def creator(): - return dbapi.connect() + mutex.acquire() + try: + return dbapi.connect() + finally: + mutex.release() success = [] for timeout in (None, 30): @@ -897,21 +1018,27 @@ class QueuePoolTest(PoolTestBase): c1 = p.connect() c2 = p.connect() + threads = [] for i in range(2): t = threading.Thread(target=waiter, args=(p, timeout, max_overflow)) - t.setDaemon(True) # so the tests dont hang if this fails + t.daemon = True t.start() + threads.append(t) - c1.invalidate() - c2.invalidate() - p2 = p._replace() + # this sleep makes sure that the + # two waiter threads hit upon wait() + # inside the queue, before we invalidate the other + # two conns time.sleep(.2) + p2 = p._replace() + + for t in threads: + t.join(join_timeout) eq_(len(success), 12, "successes: %s" % success) @testing.requires.threading_with_mock - @testing.requires.python26 def test_notify_waiters(self): dbapi = MockDBAPI() canary = [] @@ -924,9 +1051,7 @@ class QueuePoolTest(PoolTestBase): p1 = pool.QueuePool(creator=creator1, pool_size=1, timeout=None, max_overflow=0) - p2 = pool.QueuePool(creator=creator2, - pool_size=1, timeout=None, - max_overflow=-1) + p2 = pool.NullPool(creator=creator2) def waiter(p): conn = p.connect() time.sleep(.5) @@ -934,14 +1059,18 @@ class QueuePoolTest(PoolTestBase): c1 = p1.connect() + threads = [] for i in range(5): t = threading.Thread(target=waiter, args=(p1, )) - t.setDaemon(True) t.start() + threads.append(t) time.sleep(.5) eq_(canary, [1]) p1._pool.abort(p2) - time.sleep(1) + + for t in threads: + t.join(join_timeout) + eq_(canary, [1, 2, 2, 2, 2, 2]) def test_dispose_closes_pooled(self): @@ -987,6 +1116,7 @@ class QueuePoolTest(PoolTestBase): self._test_overflow(40, 5) def test_mixed_close(self): + pool._refs.clear() p = self._queuepool_fixture(pool_size=3, 
max_overflow=-1, use_threadlocal=True) c1 = p.connect() c2 = p.connect() @@ -1198,6 +1328,96 @@ class QueuePoolTest(PoolTestBase): c2 = p.connect() assert c2.connection is not None +class ResetOnReturnTest(PoolTestBase): + def _fixture(self, **kw): + dbapi = Mock() + return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw) + + def test_plain_rollback(self): + dbapi, p = self._fixture(reset_on_return='rollback') + + c1 = p.connect() + c1.close() + assert dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + def test_plain_commit(self): + dbapi, p = self._fixture(reset_on_return='commit') + + c1 = p.connect() + c1.close() + assert not dbapi.connect().rollback.called + assert dbapi.connect().commit.called + + def test_plain_none(self): + dbapi, p = self._fixture(reset_on_return=None) + + c1 = p.connect() + c1.close() + assert not dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + def test_agent_rollback(self): + dbapi, p = self._fixture(reset_on_return='rollback') + + class Agent(object): + def __init__(self, conn): + self.conn = conn + + def rollback(self): + self.conn.special_rollback() + + def commit(self): + self.conn.special_commit() + + c1 = p.connect() + c1._reset_agent = Agent(c1) + c1.close() + + assert dbapi.connect().special_rollback.called + assert not dbapi.connect().special_commit.called + + assert not dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + c1 = p.connect() + c1.close() + eq_(dbapi.connect().special_rollback.call_count, 1) + eq_(dbapi.connect().special_commit.call_count, 0) + + assert dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + def test_agent_commit(self): + dbapi, p = self._fixture(reset_on_return='commit') + + class Agent(object): + def __init__(self, conn): + self.conn = conn + + def rollback(self): + self.conn.special_rollback() + + def commit(self): + self.conn.special_commit() + + c1 = p.connect() + c1._reset_agent = Agent(c1) + c1.close() + assert not dbapi.connect().special_rollback.called + assert dbapi.connect().special_commit.called + + assert not dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + c1 = p.connect() + c1.close() + + eq_(dbapi.connect().special_rollback.call_count, 0) + eq_(dbapi.connect().special_commit.call_count, 1) + assert not dbapi.connect().rollback.called + assert dbapi.connect().commit.called + class SingletonThreadPoolTest(PoolTestBase): @testing.requires.threading_with_mock @@ -1245,7 +1465,7 @@ class SingletonThreadPoolTest(PoolTestBase): th.start() threads.append(th) for th in threads: - th.join() + th.join(join_timeout) assert len(p._all_conns) == 3 if strong_refs: diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 0a964cf63..ba336a1bf 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -413,8 +413,6 @@ class RealReconnectTest(fixtures.TestBase): def teardown(self): self.engine.dispose() - @testing.fails_on('+informixdb', - "Wrong error thrown, fix in informixdb?") def test_reconnect(self): conn = self.engine.connect() @@ -539,8 +537,6 @@ class RealReconnectTest(fixtures.TestBase): # pool was recreated assert engine.pool is not p1 - @testing.fails_on('+informixdb', - "Wrong error thrown, fix in informixdb?") def test_null_pool(self): engine = \ engines.reconnecting_engine(options=dict(poolclass=pool.NullPool)) @@ -554,8 +550,6 @@ class RealReconnectTest(fixtures.TestBase): 
eq_(conn.execute(select([1])).scalar(), 1) assert not conn.invalidated - @testing.fails_on('+informixdb', - "Wrong error thrown, fix in informixdb?") def test_close(self): conn = self.engine.connect() eq_(conn.execute(select([1])).scalar(), 1) @@ -569,8 +563,6 @@ class RealReconnectTest(fixtures.TestBase): conn = self.engine.connect() eq_(conn.execute(select([1])).scalar(), 1) - @testing.fails_on('+informixdb', - "Wrong error thrown, fix in informixdb?") def test_with_transaction(self): conn = self.engine.connect() trans = conn.begin() @@ -651,8 +643,6 @@ class InvalidateDuringResultTest(fixtures.TestBase): '+cymysql', '+pymysql', '+pg8000' ], "Buffers the result set and doesn't check for " "connection close") - @testing.fails_on('+informixdb', - "Wrong error thrown, fix in informixdb?") def test_invalidate_on_results(self): conn = self.engine.connect() result = conn.execute('select * from sometable') diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index 52cbc15e6..2f311f7e7 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -361,6 +361,27 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): self.assert_(isinstance(table.c.col4.type, sa.String)) @testing.provide_metadata + def test_override_upgrade_pk_flag(self): + meta = self.metadata + table = Table( + 'override_test', meta, + Column('col1', sa.Integer), + Column('col2', sa.String(20)), + Column('col3', sa.Numeric) + ) + table.create() + + meta2 = MetaData(testing.db) + table = Table( + 'override_test', meta2, + Column('col1', sa.Integer, primary_key=True), + autoload=True) + + eq_(list(table.primary_key), [table.c.col1]) + eq_(table.c.col1.primary_key, True) + + + @testing.provide_metadata def test_override_pkfk(self): """test that you can override columns which contain foreign keys to other reflected tables, where the foreign key column is also @@ -602,6 +623,55 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): is a2.c.user_id assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) + @testing.only_on(['postgresql', 'mysql']) + @testing.provide_metadata + def test_fk_options(self): + """test that foreign key reflection includes options (on + backends with {dialect}.get_foreign_keys() support)""" + + if testing.against('postgresql'): + test_attrs = ('match', 'onupdate', 'ondelete', 'deferrable', 'initially') + addresses_user_id_fkey = sa.ForeignKey( + # Each option is specifically not a Postgres default, or + # it won't be returned by PG's inspection + 'users.id', + name = 'addresses_user_id_fkey', + match='FULL', + onupdate='RESTRICT', + ondelete='RESTRICT', + deferrable=True, + initially='DEFERRED' + ) + elif testing.against('mysql'): + # MATCH, DEFERRABLE, and INITIALLY cannot be defined for MySQL + # ON UPDATE and ON DELETE have defaults of RESTRICT, which are + # elided by MySQL's inspection + addresses_user_id_fkey = sa.ForeignKey( + 'users.id', + name = 'addresses_user_id_fkey', + onupdate='CASCADE', + ondelete='CASCADE' + ) + test_attrs = ('onupdate', 'ondelete') + + meta = self.metadata + Table('users', meta, + Column('id', sa.Integer, primary_key=True), + Column('name', sa.String(30)), + test_needs_fk=True) + Table('addresses', meta, + Column('id', sa.Integer, primary_key=True), + Column('user_id', sa.Integer, addresses_user_id_fkey), + test_needs_fk=True) + meta.create_all() + + meta2 = MetaData() + meta2.reflect(testing.db) + for fk in meta2.tables['addresses'].foreign_keys: + ref = addresses_user_id_fkey + for attr in test_attrs: + 
eq_(getattr(fk, attr), getattr(ref, attr)) + def test_pks_not_uniques(self): """test that primary key reflection not tripped up by unique indexes""" @@ -705,10 +775,6 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): @testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on') - @testing.fails_on('+informixdb', - "FIXME: should be supported via the " - "DELIMITED env var but that breaks " - "everything else for now") @testing.provide_metadata def test_reserved(self): @@ -725,7 +791,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): # There's currently no way to calculate identifier case # normalization in isolation, so... - if testing.against('firebird', 'oracle', 'maxdb'): + if testing.against('firebird', 'oracle'): check_col = 'TRUE' else: check_col = 'true' @@ -778,6 +844,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): def test_reflect_uses_bind_engine_reflect(self): self._test_reflect_uses_bind(lambda e: MetaData().reflect(e)) + @testing.provide_metadata def test_reflect_all(self): existing = testing.db.table_names() @@ -833,6 +900,18 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): m8.reflect ) + m8_e1 = MetaData(testing.db) + rt_c = Table('rt_c', m8_e1) + m8_e1.reflect(extend_existing=True) + eq_(set(m8_e1.tables.keys()), set(names)) + eq_(rt_c.c.keys(), ['id']) + + m8_e2 = MetaData(testing.db) + rt_c = Table('rt_c', m8_e2) + m8_e2.reflect(extend_existing=True, only=['rt_a', 'rt_c']) + eq_(set(m8_e2.tables.keys()), set(['rt_a', 'rt_c'])) + eq_(rt_c.c.keys(), ['id']) + if existing: print("Other tables present in database, skipping some checks.") else: @@ -1423,6 +1502,7 @@ class CaseSensitiveTest(fixtures.TablesTest): class ColumnEventsTest(fixtures.TestBase): + @classmethod def setup_class(cls): cls.metadata = MetaData() @@ -1430,7 +1510,16 @@ class ColumnEventsTest(fixtures.TestBase): 'to_reflect', cls.metadata, Column('x', sa.Integer, primary_key=True), + Column('y', sa.Integer), + test_needs_fk=True ) + cls.related = Table( + 'related', + cls.metadata, + Column('q', sa.Integer, sa.ForeignKey('to_reflect.x')), + test_needs_fk=True + ) + sa.Index("some_index", cls.to_reflect.c.y) cls.metadata.create_all(testing.db) @classmethod @@ -1440,7 +1529,7 @@ class ColumnEventsTest(fixtures.TestBase): def teardown(self): events.SchemaEventTarget.dispatch._clear() - def _do_test(self, col, update, assert_): + def _do_test(self, col, update, assert_, tablename="to_reflect"): # load the actual Table class, not the test # wrapper from sqlalchemy.schema import Table @@ -1450,22 +1539,54 @@ class ColumnEventsTest(fixtures.TestBase): if column_info['name'] == col: column_info.update(update) - t = Table('to_reflect', m, autoload=True, listeners=[ + t = Table(tablename, m, autoload=True, listeners=[ ('column_reflect', column_reflect), ]) assert_(t) m = MetaData(testing.db) event.listen(Table, 'column_reflect', column_reflect) - t2 = Table('to_reflect', m, autoload=True) + t2 = Table(tablename, m, autoload=True) assert_(t2) def test_override_key(self): + def assertions(table): + eq_(table.c.YXZ.name, "x") + eq_(set(table.primary_key), set([table.c.YXZ])) + self._do_test( "x", {"key": "YXZ"}, - lambda table: eq_(table.c.YXZ.name, "x") + assertions ) + def test_override_index(self): + def assertions(table): + idx = list(table.indexes)[0] + eq_(idx.columns, [table.c.YXZ]) + + self._do_test( + "y", {"key": "YXZ"}, + assertions + ) + + def test_override_key_fk(self): + m = MetaData(testing.db) + def column_reflect(insp, table, column_info): + + if 
column_info['name'] == 'q': + column_info['key'] = 'qyz' + elif column_info['name'] == 'x': + column_info['key'] = 'xyz' + + to_reflect = Table("to_reflect", m, autoload=True, listeners=[ + ('column_reflect', column_reflect), + ]) + related = Table("related", m, autoload=True, listeners=[ + ('column_reflect', column_reflect), + ]) + + assert related.c.qyz.references(to_reflect.c.xyz) + def test_override_type(self): def assert_(table): assert isinstance(table.c.x.type, sa.String) diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index ffc12b5b9..c373133d1 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -3,6 +3,7 @@ from sqlalchemy.testing import eq_, assert_raises, \ import sys import time import threading +from sqlalchemy import event from sqlalchemy.testing.engines import testing_engine from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, \ select, Integer, String, func, text, exc @@ -29,7 +30,6 @@ class TransactionTest(fixtures.TestBase): testing.db.execute(users.delete()).close() @classmethod - @testing.crashes('mysql+cymysql', 'deadlock') def teardown_class(cls): users.drop(testing.db) @@ -342,7 +342,8 @@ class TransactionTest(fixtures.TestBase): transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.prepare() - connection.close() + connection.invalidate() + connection2 = testing.db.connect() eq_(connection2.execute(select([users.c.user_id]). order_by(users.c.user_id)).fetchall(), @@ -379,6 +380,138 @@ class TransactionTest(fixtures.TestBase): eq_(result.fetchall(), [('user1', ), ('user4', )]) conn.close() + @testing.requires.two_phase_transactions + def test_reset_rollback_two_phase_no_rollback(self): + # test [ticket:2907], essentially that the + # TwoPhaseTransaction is given the job of "reset on return" + # so that picky backends like MySQL correctly clear out + # their state when a connection is closed without handling + # the transaction explicitly. + + eng = testing_engine() + + # MySQL raises if you call straight rollback() on + # a connection with an XID present + @event.listens_for(eng, "invalidate") + def conn_invalidated(dbapi_con, con_record, exception): + dbapi_con.close() + raise exception + + with eng.connect() as conn: + rec = conn.connection._connection_record + raw_dbapi_con = rec.connection + xa = conn.begin_twophase() + conn.execute(users.insert(), user_id=1, user_name='user1') + + assert rec.connection is raw_dbapi_con + + with eng.connect() as conn: + result = \ + conn.execute(select([users.c.user_name]). 
+ order_by(users.c.user_id)) + eq_(result.fetchall(), []) + +class ResetAgentTest(fixtures.TestBase): + def test_begin_close(self): + with testing.db.connect() as connection: + trans = connection.begin() + assert connection.connection._reset_agent is trans + assert not trans.is_active + + def test_begin_rollback(self): + with testing.db.connect() as connection: + trans = connection.begin() + assert connection.connection._reset_agent is trans + trans.rollback() + assert connection.connection._reset_agent is None + + def test_begin_commit(self): + with testing.db.connect() as connection: + trans = connection.begin() + assert connection.connection._reset_agent is trans + trans.commit() + assert connection.connection._reset_agent is None + + @testing.requires.savepoints + def test_begin_nested_close(self): + with testing.db.connect() as connection: + trans = connection.begin_nested() + assert connection.connection._reset_agent is trans + assert not trans.is_active + + @testing.requires.savepoints + def test_begin_begin_nested_close(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin_nested() + assert connection.connection._reset_agent is trans + assert trans2.is_active # was never closed + assert not trans.is_active + + @testing.requires.savepoints + def test_begin_begin_nested_rollback_commit(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin_nested() + assert connection.connection._reset_agent is trans + trans2.rollback() + assert connection.connection._reset_agent is trans + trans.commit() + assert connection.connection._reset_agent is None + + @testing.requires.savepoints + def test_begin_begin_nested_rollback_rollback(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin_nested() + assert connection.connection._reset_agent is trans + trans2.rollback() + assert connection.connection._reset_agent is trans + trans.rollback() + assert connection.connection._reset_agent is None + + def test_begin_begin_rollback_rollback(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin() + assert connection.connection._reset_agent is trans + trans2.rollback() + assert connection.connection._reset_agent is None + trans.rollback() + assert connection.connection._reset_agent is None + + def test_begin_begin_commit_commit(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin() + assert connection.connection._reset_agent is trans + trans2.commit() + assert connection.connection._reset_agent is trans + trans.commit() + assert connection.connection._reset_agent is None + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase(self): + with testing.db.connect() as connection: + trans = connection.begin_twophase() + assert connection.connection._reset_agent is trans + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase_commit(self): + with testing.db.connect() as connection: + trans = connection.begin_twophase() + assert connection.connection._reset_agent is trans + trans.commit() + assert connection.connection._reset_agent is None + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase_rollback(self): + with testing.db.connect() as connection: + trans = connection.begin_twophase() + assert connection.connection._reset_agent is trans + trans.rollback() + 
assert connection.connection._reset_agent is None + class AutoRollbackTest(fixtures.TestBase): @classmethod @@ -504,7 +637,7 @@ class ExplicitAutoCommitTest(fixtures.TestBase): conn2.close() @testing.uses_deprecated(r'autocommit on select\(\) is deprecated', - r'autocommit\(\) is deprecated') + r'``autocommit\(\)`` is deprecated') def test_explicit_compiled_deprecated(self): conn1 = testing.db.connect() conn2 = testing.db.connect() @@ -1036,7 +1169,6 @@ class ForUpdateTest(fixtures.TestBase): @testing.crashes('mssql', 'FIXME: unknown') @testing.crashes('firebird', 'FIXME: unknown') @testing.crashes('sybase', 'FIXME: unknown') - @testing.crashes('access', 'FIXME: unknown') @testing.requires.independent_connections def test_queued_update(self): """Test SELECT FOR UPDATE with concurrent modifications. @@ -1101,7 +1233,6 @@ class ForUpdateTest(fixtures.TestBase): @testing.crashes('mssql', 'FIXME: unknown') @testing.crashes('firebird', 'FIXME: unknown') @testing.crashes('sybase', 'FIXME: unknown') - @testing.crashes('access', 'FIXME: unknown') @testing.requires.independent_connections def test_queued_select(self): """Simple SELECT FOR UPDATE conflict test""" @@ -1113,7 +1244,6 @@ class ForUpdateTest(fixtures.TestBase): @testing.fails_on('mysql', 'No support for NOWAIT') @testing.crashes('firebird', 'FIXME: unknown') @testing.crashes('sybase', 'FIXME: unknown') - @testing.crashes('access', 'FIXME: unknown') @testing.requires.independent_connections def test_nowait_select(self): """Simple SELECT FOR UPDATE NOWAIT conflict test""" |