diff options
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r-- | test/engine/test_execute.py | 112 |
1 files changed, 56 insertions, 56 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 90d2bc578..9379e207f 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -29,7 +29,7 @@ class ExecuteTest(TestBase): @engines.close_first def teardown(self): testing.db.connect().execute(users.delete()) - + @classmethod def teardown_class(cls): metadata.drop_all() @@ -152,7 +152,7 @@ class ExecuteTest(TestBase): eng.update_execution_options(foo='hoho') conn = eng.contextual_connect() eq_(conn._execution_options['foo'], 'hoho') - + class CompiledCacheTest(TestBase): @classmethod @@ -169,23 +169,23 @@ class CompiledCacheTest(TestBase): @engines.close_first def teardown(self): testing.db.connect().execute(users.delete()) - + @classmethod def teardown_class(cls): metadata.drop_all() - + def test_cache(self): conn = testing.db.connect() cache = {} cached_conn = conn.execution_options(compiled_cache=cache) - + ins = users.insert() cached_conn.execute(ins, {'user_name':'u1'}) cached_conn.execute(ins, {'user_name':'u2'}) cached_conn.execute(ins, {'user_name':'u3'}) assert len(cache) == 1 eq_(conn.execute("select count(*) from users").scalar(), 3) - + class LoggingNameTest(TestBase): def _assert_names_in_execute(self, eng, eng_name, pool_name): eng.execute(select([1])) @@ -203,7 +203,7 @@ class LoggingNameTest(TestBase): 'sqlalchemy.engine.base.Engine', 'sqlalchemy.pool.%s' % eng.pool.__class__.__name__ ) - + def _named_engine(self, **kw): options = { 'logging_name':'myenginename', @@ -229,24 +229,24 @@ class LoggingNameTest(TestBase): logging.getLogger('sqlalchemy.pool') ]: log.removeHandler(self.buf) - + def test_named_logger_names(self): eng = self._named_engine() eq_(eng.logging_name, "myenginename") eq_(eng.pool.logging_name, "mypoolname") - + def test_named_logger_names_after_dispose(self): eng = self._named_engine() eng.execute(select([1])) eng.dispose() eq_(eng.logging_name, "myenginename") eq_(eng.pool.logging_name, "mypoolname") - + def test_unnamed_logger_names(self): eng = self._unnamed_engine() eq_(eng.logging_name, None) eq_(eng.pool.logging_name, None) - + def test_named_logger_execute(self): eng = self._named_engine() self._assert_names_in_execute(eng, "myenginename", "mypoolname") @@ -270,27 +270,27 @@ class LoggingNameTest(TestBase): self._assert_no_name_in_execute(eng) class EchoTest(TestBase): - + def setup(self): self.level = logging.getLogger('sqlalchemy.engine').level logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN) self.buf = logging.handlers.BufferingHandler(100) logging.getLogger('sqlalchemy.engine').addHandler(self.buf) - + def teardown(self): logging.getLogger('sqlalchemy.engine').removeHandler(self.buf) logging.getLogger('sqlalchemy.engine').setLevel(self.level) - + def testing_engine(self): e = engines.testing_engine() - + # do an initial execute to clear out 'first connect' # messages e.execute(select([10])) self.buf.flush() - + return e - + def test_levels(self): e1 = engines.testing_engine() @@ -310,40 +310,40 @@ class EchoTest(TestBase): eq_(e1._should_log_debug(), True) eq_(e1.logger.isEnabledFor(logging.DEBUG), True) eq_(e1.logger.getEffectiveLevel(), logging.DEBUG) - + e1.echo = False eq_(e1._should_log_info(), False) eq_(e1._should_log_debug(), False) eq_(e1.logger.isEnabledFor(logging.INFO), False) eq_(e1.logger.getEffectiveLevel(), logging.WARN) - + def test_echo_flag_independence(self): """test the echo flag's independence to a specific engine.""" e1 = self.testing_engine() e2 = self.testing_engine() - + e1.echo = True e1.execute(select([1])) e2.execute(select([2])) - + e1.echo = False e1.execute(select([3])) e2.execute(select([4])) - + e2.echo = True e1.execute(select([5])) e2.execute(select([6])) - + assert self.buf.buffer[0].getMessage().startswith("SELECT 1") assert self.buf.buffer[2].getMessage().startswith("SELECT 6") assert len(self.buf.buffer) == 4 - + class ResultProxyTest(TestBase): def test_nontuple_row(self): """ensure the C version of BaseRowProxy handles duck-type-dependent rows.""" - + from sqlalchemy.engine import RowProxy class MyList(object): @@ -373,7 +373,7 @@ class ResultProxyTest(TestBase): engine = engines.testing_engine() metadata.bind = engine - + t = Table('t1', metadata, Column('data', String(10)) ) @@ -383,7 +383,7 @@ class ResultProxyTest(TestBase): @property def rowcount(self): assert False - + execution_ctx_cls = engine.dialect.execution_ctx_cls engine.dialect.execution_ctx_cls = type("FakeCtx", (BreakRowcountMixin, @@ -409,13 +409,13 @@ class ResultProxyTest(TestBase): row = RowProxy(object(), ['value'], [None], {'key' : (None, 0), 0: (None, 0)}) assert isinstance(row, collections.Sequence) - + @testing.requires.cextensions def test_row_c_sequence_check(self): import csv import collections from StringIO import StringIO - + metadata = MetaData() metadata.bind = 'sqlite://' users = Table('users', metadata, @@ -470,7 +470,7 @@ class EngineEventsTest(TestBase): ]: event.listen(engine, 'before_execute', execute) event.listen(engine, 'before_cursor_execute', cursor_execute) - + m = MetaData(engine) t1 = Table('t1', m, Column('c1', Integer, primary_key=True), @@ -523,10 +523,10 @@ class EngineEventsTest(TestBase): canary = [] def execute(conn, *args, **kw): canary.append('execute') - + def cursor_execute(conn, *args, **kw): canary.append('cursor_execute') - + engine = engines.testing_engine() event.listen(engine, 'before_execute', execute) event.listen(engine, 'before_cursor_execute', cursor_execute) @@ -548,42 +548,42 @@ class EngineEventsTest(TestBase): def execute(conn, clauseelement, multiparams, params): canary.append('execute') return clauseelement, multiparams, params - + def cursor_execute(conn, cursor, statement, parameters, context, executemany): canary.append('cursor_execute') return statement, parameters - + engine = engines.testing_engine() - + assert_raises( tsa.exc.ArgumentError, event.listen, engine, "begin", tracker("begin"), retval=True ) - + event.listen(engine, "before_execute", execute, retval=True) event.listen(engine, "before_cursor_execute", cursor_execute, retval=True) engine.execute(select([1])) eq_( canary, ['execute', 'cursor_execute'] ) - - - + + + def test_transactional(self): canary = [] def tracker(name): def go(conn, *args, **kw): canary.append(name) return go - + engine = engines.testing_engine() event.listen(engine, 'before_execute', tracker('execute')) event.listen(engine, 'before_cursor_execute', tracker('cursor_execute')) event.listen(engine, 'begin', tracker('begin')) event.listen(engine, 'commit', tracker('commit')) event.listen(engine, 'rollback', tracker('rollback')) - + conn = engine.connect() trans = conn.begin() conn.execute(select([1])) @@ -605,7 +605,7 @@ class EngineEventsTest(TestBase): def go(*args, **kw): canary.append(name) return go - + engine = engines.testing_engine() for name in ['begin', 'savepoint', 'rollback_savepoint', 'release_savepoint', @@ -634,21 +634,21 @@ class EngineEventsTest(TestBase): 'rollback', 'begin_twophase', 'prepare_twophase', 'commit_twophase'] ) - - + + class ProxyConnectionTest(TestBase): """These are the same tests as EngineEventsTest, except using the deprecated ConnectionProxy interface. - + """ - + @testing.uses_deprecated(r'.*Use event.listen') @testing.fails_on('firebird', 'Data type unknown') def test_proxy(self): - + stmts = [] cursor_stmts = [] - + class MyProxy(ConnectionProxy): def execute( self, @@ -672,7 +672,7 @@ class ProxyConnectionTest(TestBase): ): cursor_stmts.append((str(statement), parameters, None)) return execute(cursor, statement, parameters, context) - + def assert_stmts(expected, received): for stmt, params, posn in expected: if not received: @@ -739,7 +739,7 @@ class ProxyConnectionTest(TestBase): # be incorrect assert_stmts(compiled, stmts) assert_stmts(cursor, cursor_stmts) - + @testing.uses_deprecated(r'.*Use event.listen') def test_options(self): canary = [] @@ -758,8 +758,8 @@ class ProxyConnectionTest(TestBase): c3 = c2.execution_options(bar='bat') eq_(c3._execution_options, {'foo':'bar', 'bar':'bat'}) eq_(canary, ['execute', 'cursor_execute']) - - + + @testing.uses_deprecated(r'.*Use event.listen') def test_transactional(self): canary = [] @@ -779,12 +779,12 @@ class ProxyConnectionTest(TestBase): trans = conn.begin() conn.execute(select([1])) trans.commit() - + eq_(canary, [ 'begin', 'execute', 'cursor_execute', 'rollback', 'begin', 'execute', 'cursor_execute', 'commit', ]) - + @testing.uses_deprecated(r'.*Use event.listen') @testing.requires.savepoints @testing.requires.two_phase_transactions @@ -800,7 +800,7 @@ class ProxyConnectionTest(TestBase): engine = engines.testing_engine(options={'proxy':TrackProxy()}) conn = engine.connect() - + trans = conn.begin() trans2 = conn.begin_nested() conn.execute(select([1])) @@ -809,7 +809,7 @@ class ProxyConnectionTest(TestBase): conn.execute(select([1])) trans2.commit() trans.rollback() - + trans = conn.begin_twophase() conn.execute(select([1])) trans.prepare() |