summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2013-07-02 13:14:21 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2013-07-02 13:14:21 -0400
commitd3d10c982c8a44c85a0114c491207297eac7611d (patch)
treebdfda394fb23cc8d65c0acb77ca070937d93580a
parent38c5e870a7883df0ae104df828217e326f6cff6a (diff)
downloadsqlalchemy-d3d10c982c8a44c85a0114c491207297eac7611d.tar.gz
- refactor pool a bit so that intent between ConnectionRecord/ConnectionFairy is clear;
make sure that the DBAPI connection passed to the reset-on-return events/dialect hooks is also a "fairy", so that dictionaries like "info" are available. [ticket:2770] - rework the execution_options system so that the dialect is given the job of making any immediate adjustments based on a set event. move the "isolation level" logic to use this new system. Also work things out so that even engine-level execution options can be used for things like isolation level; the dialect attaches a connect-event handler in this case to handle the task. - to support this new system as well as further extensibiltiy of execution options add events engine_connect(), set_connection_execution_options(), set_engine_execution_options()
-rw-r--r--doc/build/changelog/changelog_09.rst10
-rw-r--r--lib/sqlalchemy/engine/base.py28
-rw-r--r--lib/sqlalchemy/engine/default.py19
-rw-r--r--lib/sqlalchemy/engine/strategies.py9
-rw-r--r--lib/sqlalchemy/events.py101
-rw-r--r--lib/sqlalchemy/pool.py198
-rw-r--r--test/engine/test_execute.py140
-rw-r--r--test/engine/test_pool.py7
-rw-r--r--test/engine/test_transaction.py22
-rw-r--r--test/profiles.txt78
10 files changed, 419 insertions, 193 deletions
diff --git a/doc/build/changelog/changelog_09.rst b/doc/build/changelog/changelog_09.rst
index 65e05d89b..989dc433d 100644
--- a/doc/build/changelog/changelog_09.rst
+++ b/doc/build/changelog/changelog_09.rst
@@ -7,6 +7,16 @@
:version: 0.9.0
.. change::
+ :tags: feature, engine
+ :tickets: 2770
+
+ New events added to :class:`.ConnectionEvents`:
+
+ * :meth:`.ConnectionEvents.engine_connect`
+ * :meth:`.ConnectionEvents.set_connection_execution_options`
+ * :meth:`.ConnectionEvents.set_engine_execution_options`
+
+ .. change::
:tags: feature, firebird
:tickets: 2763
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 2d9f3af94..f69bd3d4b 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -46,7 +46,7 @@ class Connection(Connectable):
def __init__(self, engine, connection=None, close_with_result=False,
_branch=False, _execution_options=None,
_dispatch=None,
- _has_events=False):
+ _has_events=None):
"""Construct a new Connection.
The constructor here is not public and is only called only by an
@@ -67,7 +67,8 @@ class Connection(Connectable):
self.dispatch = _dispatch
elif engine._has_events:
self.dispatch = self.dispatch._join(engine.dispatch)
- self._has_events = _has_events or engine._has_events
+ self._has_events = _has_events or (
+ _has_events is None and engine._has_events)
self._echo = self.engine._should_log_info()
if _execution_options:
@@ -76,6 +77,9 @@ class Connection(Connectable):
else:
self._execution_options = engine._execution_options
+ if self._has_events:
+ self.dispatch.engine_connect(self, _branch)
+
def _branch(self):
"""Return a new Connection which references this Connection's
engine and connection; but does not have close_with_result enabled,
@@ -200,16 +204,11 @@ class Connection(Connectable):
"""
c = self._clone()
c._execution_options = c._execution_options.union(opt)
- if 'isolation_level' in opt:
- c._set_isolation_level()
+ if self._has_events:
+ self.dispatch.set_connection_execution_options(c, opt)
+ self.dialect.set_connection_execution_options(c, opt)
return c
- def _set_isolation_level(self):
- self.dialect.set_isolation_level(self.connection,
- self._execution_options['isolation_level'])
- self.connection._connection_record.finalize_callback = \
- self.dialect.reset_isolation_level
-
@property
def closed(self):
"""Return True if this connection is closed."""
@@ -1336,15 +1335,10 @@ class Engine(Connectable, log.Identified):
:meth:`.Engine.execution_options`
"""
- if 'isolation_level' in opt:
- raise exc.ArgumentError(
- "'isolation_level' execution option may "
- "only be specified on Connection.execution_options(). "
- "To set engine-wide isolation level, "
- "use the isolation_level argument to create_engine()."
- )
self._execution_options = \
self._execution_options.union(opt)
+ self.dispatch.set_engine_execution_options(self, opt)
+ self.dialect.set_engine_execution_options(self, opt)
def execution_options(self, **opt):
"""Return a new :class:`.Engine` that will provide
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 2ad7002c4..3e8e96a42 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -19,6 +19,7 @@ from ..sql import compiler, expression
from .. import exc, types as sqltypes, util, pool, processors
import codecs
import weakref
+from .. import event
AUTOCOMMIT_REGEXP = re.compile(
r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)',
@@ -289,6 +290,24 @@ class DefaultDialect(interfaces.Dialect):
opts.update(url.query)
return [[], opts]
+ def set_engine_execution_options(self, engine, opts):
+ if 'isolation_level' in opts:
+ isolation_level = opts['isolation_level']
+ @event.listens_for(engine, "engine_connect")
+ def set_isolation(connection, branch):
+ if not branch:
+ self._set_connection_isolation(connection, isolation_level)
+
+ def set_connection_execution_options(self, connection, opts):
+ if 'isolation_level' in opts:
+ self._set_connection_isolation(connection, opts['isolation_level'])
+
+ def _set_connection_isolation(self, connection, level):
+ self.set_isolation_level(connection.connection, level)
+ connection.connection._connection_record.\
+ finalize_callback.append(self.reset_isolation_level)
+
+
def do_begin(self, dbapi_connection):
pass
diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py
index c65986ca2..3ca91968b 100644
--- a/lib/sqlalchemy/engine/strategies.py
+++ b/lib/sqlalchemy/engine/strategies.py
@@ -150,13 +150,8 @@ class DefaultEngineStrategy(EngineStrategy):
event.listen(pool, 'connect', on_connect)
def first_connect(dbapi_connection, connection_record):
- c = base.Connection(engine, connection=dbapi_connection)
-
- # TODO: removing this allows the on connect activities
- # to generate events. tests currently assume these aren't
- # sent. do we want users to get all the initial connect
- # activities as events ?
- c._has_events = False
+ c = base.Connection(engine, connection=dbapi_connection,
+ _has_events=False)
dialect.initialize(c)
event.listen(pool, 'first_connect', first_connect)
diff --git a/lib/sqlalchemy/events.py b/lib/sqlalchemy/events.py
index ae2e4ed93..7f11232ac 100644
--- a/lib/sqlalchemy/events.py
+++ b/lib/sqlalchemy/events.py
@@ -319,6 +319,10 @@ class PoolEvents(event.Events):
connection will be disposed and a fresh connection retrieved.
Processing of all checkout listeners will abort and restart
using the new connection.
+
+ .. seealso:: :meth:`.ConnectionEvents.connect` - a similar event
+ which occurs upon creation of a new :class:`.Connection`.
+
"""
def checkin(self, dbapi_connection, connection_record):
@@ -615,6 +619,103 @@ class ConnectionEvents(event.Events):
"""
+ def engine_connect(self, conn, branch):
+ """Intercept the creation of a new :class:`.Connection`.
+
+ This event is called typically as the direct result of calling
+ the :meth:`.Engine.connect` method.
+
+ It differs from the :meth:`.PoolEvents.connect` method, which
+ refers to the actual connection to a database at the DBAPI level;
+ a DBAPI connection may be pooled and reused for many operations.
+ In contrast, this event refers only to the production of a higher level
+ :class:`.Connection` wrapper around such a DBAPI connection.
+
+ It also differs from the :meth:`.PoolEvents.checkout` event
+ in that it is specific to the :class:`.Connection` object, not the
+ DBAPI connection that :meth:`.PoolEvents.checkout` deals with, although
+ this DBAPI connection is available here via the :attr:`.Connection.connection`
+ attribute. But note there can in fact
+ be multiple :meth:`.PoolEvents.checkout` events within the lifespan
+ of a single :class:`.Connection` object, if that :class:`.Connection`
+ is invalidated and re-established. There can also be multiple
+ :class:`.Connection` objects generated for the same already-checked-out
+ DBAPI connection, in the case that a "branch" of a :class:`.Connection`
+ is produced.
+
+ :param conn: :class:`.Connection` object.
+ :param branch: if True, this is a "branch" of an existing
+ :class:`.Connection`. A branch is generated within the course
+ of a statement execution to invoke supplemental statements, most
+ typically to pre-execute a SELECT of a default value for the purposes
+ of an INSERT statement.
+
+ .. versionadded:: 0.9.0
+
+ .. seealso::
+
+ :meth:`.PoolEvents.checkout` the lower-level pool checkout event
+ for an individual DBAPI connection
+
+ :meth:`.ConnectionEvents.set_connection_execution_options` - a copy of a
+ :class:`.Connection` is also made when the
+ :meth:`.Connection.execution_options` method is called.
+
+ """
+
+ def set_connection_execution_options(self, conn, opts):
+ """Intercept when the :meth:`.Connection.execution_options`
+ method is called.
+
+ This method is called after the new :class:`.Connection` has been
+ produced, with the newly updated execution options collection, but
+ before the :class:`.Dialect` has acted upon any of those new options.
+
+ Note that this method is not called when a new :class:`.Connection`
+ is produced which is inheriting execution options from its parent
+ :class:`.Engine`; to intercept this condition, use the
+ :meth:`.ConnectionEvents.connect` event.
+
+ :param conn: The newly copied :class:`.Connection` object
+
+ :param opts: dictionary of options that were passed to the
+ :meth:`.Connection.execution_options` method.
+
+ .. versionadded:: 0.9.0
+
+ .. seealso::
+
+ :meth:`.ConnectionEvents.set_engine_execution_options` - event
+ which is called when :meth:`.Engine.execution_options` is called.
+
+
+ """
+
+ def set_engine_execution_options(self, engine, opts):
+ """Intercept when the :meth:`.Engine.execution_options`
+ method is called.
+
+ The :meth:`.Engine.execution_options` method produces a shallow
+ copy of the :class:`.Engine` which stores the new options. That new
+ :class:`.Engine` is passed here. A particular application of this
+ method is to add a :meth:`.ConnectionEvents.engine_connect` event
+ handler to the given :class:`.Engine` which will perform some per-
+ :class:`.Connection` task specific to these execution options.
+
+ :param conn: The newly copied :class:`.Engine` object
+
+ :param opts: dictionary of options that were passed to the
+ :meth:`.Connection.execution_options` method.
+
+ .. versionadded:: 0.9.0
+
+ .. seealso::
+
+ :meth:`.ConnectionEvents.set_connection_execution_options` - event
+ which is called when :meth:`.Connection.execution_options` is called.
+
+ """
+
def begin(self, conn):
"""Intercept begin() events.
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index dcf3d9e39..97411dd3a 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -25,6 +25,7 @@ from .util import queue as sqla_queue
from .util import threading, memoized_property, \
chop_traceback
+from collections import deque
proxies = {}
@@ -217,7 +218,7 @@ class Pool(log.Identified):
"""
- return _ConnectionFairy(self).checkout()
+ return _ConnectionFairy.checkout(self)
def _create_connection(self):
"""Called by subclasses to create a new ConnectionRecord."""
@@ -269,18 +270,16 @@ class Pool(log.Identified):
"""
if not self._use_threadlocal:
- return _ConnectionFairy(self).checkout()
+ return _ConnectionFairy.checkout(self)
try:
rec = self._threadconns.current()
- if rec:
- return rec.checkout()
except AttributeError:
pass
+ else:
+ return rec.checkout_existing()
- agent = _ConnectionFairy(self)
- self._threadconns.current = weakref.ref(agent)
- return agent.checkout()
+ return _ConnectionFairy.checkout(self, self._threadconns)
def _return_conn(self, record):
"""Given a _ConnectionRecord, return it to the :class:`.Pool`.
@@ -311,11 +310,11 @@ class Pool(log.Identified):
class _ConnectionRecord(object):
- finalize_callback = None
def __init__(self, pool):
self.__pool = pool
self.connection = self.__connect()
+ self.finalize_callback = deque()
pool.dispatch.first_connect.\
for_modify(pool.dispatch).\
@@ -326,6 +325,36 @@ class _ConnectionRecord(object):
def info(self):
return {}
+ @classmethod
+ def checkout(cls, pool):
+ rec = pool._do_get()
+ dbapi_connection = rec.get_connection()
+ fairy = _ConnectionFairy(dbapi_connection, rec)
+ rec.fairy_ref = weakref.ref(
+ fairy,
+ lambda ref: _finalize_fairy and \
+ _finalize_fairy(
+ dbapi_connection,
+ rec, pool, ref, pool._echo)
+ )
+ _refs.add(rec)
+ if pool._echo:
+ pool.logger.debug("Connection %r checked out from pool",
+ dbapi_connection)
+ return fairy
+
+ def checkin(self):
+ self.fairy_ref = None
+ connection = self.connection
+ pool = self.__pool
+ while self.finalize_callback:
+ finalizer = self.finalize_callback.pop()
+ finalizer(connection)
+ if pool.dispatch.checkin:
+ pool.dispatch.checkin(connection, self)
+ pool._return_conn(self)
+
+
def close(self):
if self.connection is not None:
self.__pool._close_connection(self.connection)
@@ -373,11 +402,15 @@ class _ConnectionRecord(object):
raise
-def _finalize_fairy(connection, connection_record, pool, ref, echo):
+def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
+ """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
+ been garbage collected.
+
+ """
_refs.discard(connection_record)
if ref is not None and \
- connection_record.fairy is not ref:
+ connection_record.fairy_ref is not ref:
return
if connection is not None:
@@ -386,35 +419,31 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo):
connection)
try:
+ fairy = fairy or _ConnectionFairy(connection, connection_record)
if pool.dispatch.reset:
- pool.dispatch.reset(connection, connection_record)
+ pool.dispatch.reset(fairy, connection_record)
if pool._reset_on_return is reset_rollback:
if echo:
pool.logger.debug("Connection %s rollback-on-return",
connection)
- pool._dialect.do_rollback(connection)
+ pool._dialect.do_rollback(fairy)
elif pool._reset_on_return is reset_commit:
if echo:
- pool.logger.debug("Conneciton %s commit-on-return",
+ pool.logger.debug("Connection %s commit-on-return",
connection)
- pool._dialect.do_commit(connection)
+ pool._dialect.do_commit(fairy)
+
# Immediately close detached instances
- if connection_record is None:
+ if not connection_record:
pool._close_connection(connection)
except Exception as e:
- if connection_record is not None:
+ if connection_record:
connection_record.invalidate(e=e)
if isinstance(e, (SystemExit, KeyboardInterrupt)):
raise
- if connection_record is not None:
- connection_record.fairy = None
- if connection_record.finalize_callback:
- connection_record.finalize_callback(connection)
- del connection_record.finalize_callback
- if pool.dispatch.checkin:
- pool.dispatch.checkin(connection, connection_record)
- pool._return_conn(connection_record)
+ if connection_record:
+ connection_record.checkin()
_refs = set()
@@ -424,27 +453,58 @@ class _ConnectionFairy(object):
"""Proxies a DB-API connection and provides return-on-dereference
support."""
- def __init__(self, pool):
- self._pool = pool
- self.__counter = 0
- self._echo = _echo = pool._should_log_debug()
- try:
- rec = self._connection_record = pool._do_get()
- conn = self.connection = self._connection_record.get_connection()
- rec.fairy = weakref.ref(
- self,
- lambda ref: _finalize_fairy and \
- _finalize_fairy(conn, rec, pool, ref, _echo)
- )
- _refs.add(rec)
- except:
- # helps with endless __getattr__ loops later on
- self.connection = None
- self._connection_record = None
- raise
- if self._echo:
- self._pool.logger.debug("Connection %r checked out from pool",
- self.connection)
+ def __init__(self, dbapi_connection, connection_record):
+ self.connection = dbapi_connection
+ self._connection_record = connection_record
+
+ @classmethod
+ def checkout(cls, pool, threadconns=None, fairy=None):
+ if not fairy:
+ fairy = _ConnectionRecord.checkout(pool)
+
+ fairy._pool = pool
+ fairy._counter = 0
+ fairy._echo = pool._should_log_debug()
+
+ if threadconns is not None:
+ threadconns.current = weakref.ref(fairy)
+
+ if fairy.connection is None:
+ raise exc.InvalidRequestError("This connection is closed")
+ fairy._counter += 1
+
+ if not pool.dispatch.checkout or fairy._counter != 1:
+ return fairy
+
+ # Pool listeners can trigger a reconnection on checkout
+ attempts = 2
+ while attempts > 0:
+ try:
+ pool.dispatch.checkout(fairy.connection,
+ fairy._connection_record,
+ fairy)
+ return fairy
+ except exc.DisconnectionError as e:
+ pool.logger.info(
+ "Disconnection detected on checkout: %s", e)
+ fairy._connection_record.invalidate(e)
+ fairy.connection = fairy._connection_record.get_connection()
+ attempts -= 1
+
+ pool.logger.info("Reconnection attempts exhausted on checkout")
+ fairy.invalidate()
+ raise exc.InvalidRequestError("This connection is closed")
+
+ def checkout_existing(self):
+ return _ConnectionFairy.checkout(self._pool, fairy=self)
+
+ def checkin(self):
+ _finalize_fairy(self.connection, self._connection_record,
+ self._pool, None, self._echo, fairy=self)
+ self.connection = None
+ self._connection_record = None
+
+ _close = checkin
@property
def _logger(self):
@@ -465,10 +525,7 @@ class _ConnectionFairy(object):
in subsequent instances of :class:`.ConnectionFairy`.
"""
- try:
- return self._connection_record.info
- except AttributeError:
- raise exc.InvalidRequestError("This connection is closed")
+ return self._connection_record.info
def invalidate(self, e=None):
"""Mark this connection as invalidated.
@@ -479,10 +536,10 @@ class _ConnectionFairy(object):
if self.connection is None:
raise exc.InvalidRequestError("This connection is closed")
- if self._connection_record is not None:
+ if self._connection_record:
self._connection_record.invalidate(e=e)
self.connection = None
- self._close()
+ self.checkin()
def cursor(self, *args, **kwargs):
return self.connection.cursor(*args, **kwargs)
@@ -490,32 +547,6 @@ class _ConnectionFairy(object):
def __getattr__(self, key):
return getattr(self.connection, key)
- def checkout(self):
- if self.connection is None:
- raise exc.InvalidRequestError("This connection is closed")
- self.__counter += 1
-
- if not self._pool.dispatch.checkout or self.__counter != 1:
- return self
-
- # Pool listeners can trigger a reconnection on checkout
- attempts = 2
- while attempts > 0:
- try:
- self._pool.dispatch.checkout(self.connection,
- self._connection_record,
- self)
- return self
- except exc.DisconnectionError as e:
- self._pool.logger.info(
- "Disconnection detected on checkout: %s", e)
- self._connection_record.invalidate(e)
- self.connection = self._connection_record.get_connection()
- attempts -= 1
-
- self._pool.logger.info("Reconnection attempts exhausted on checkout")
- self.invalidate()
- raise exc.InvalidRequestError("This connection is closed")
def detach(self):
"""Separate this connection from its Pool.
@@ -532,22 +563,17 @@ class _ConnectionFairy(object):
if self._connection_record is not None:
_refs.remove(self._connection_record)
- self._connection_record.fairy = None
+ self._connection_record.fairy_ref = None
self._connection_record.connection = None
self._pool._do_return_conn(self._connection_record)
self.info = self.info.copy()
self._connection_record = None
def close(self):
- self.__counter -= 1
- if self.__counter == 0:
- self._close()
+ self._counter -= 1
+ if self._counter == 0:
+ self.checkin()
- def _close(self):
- _finalize_fairy(self.connection, self._connection_record,
- self._pool, None, self._echo)
- self.connection = None
- self._connection_record = None
class SingletonThreadPool(Pool):
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 9795e4c10..1d2aebf97 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1014,6 +1014,53 @@ class ResultProxyTest(fixtures.TestBase):
finally:
r.close()
+class ExecutionOptionsTest(fixtures.TestBase):
+ def test_dialect_conn_options(self):
+ engine = testing_engine("sqlite://")
+ engine.dialect = Mock()
+ conn = engine.connect()
+ c2 = conn.execution_options(foo="bar")
+ eq_(
+ engine.dialect.set_connection_execution_options.mock_calls,
+ [call(c2, {"foo": "bar"})]
+ )
+
+ def test_dialect_engine_options(self):
+ engine = testing_engine("sqlite://")
+ engine.dialect = Mock()
+ e2 = engine.execution_options(foo="bar")
+ eq_(
+ engine.dialect.set_engine_execution_options.mock_calls,
+ [call(e2, {"foo": "bar"})]
+ )
+
+ def test_dialect_engine_construction_options(self):
+ dialect = Mock()
+ engine = Engine(Mock(), dialect, Mock(),
+ execution_options={"foo": "bar"})
+ eq_(
+ dialect.set_engine_execution_options.mock_calls,
+ [call(engine, {"foo": "bar"})]
+ )
+
+ def test_propagate_engine_to_connection(self):
+ engine = testing_engine("sqlite://",
+ options=dict(execution_options={"foo": "bar"}))
+ conn = engine.connect()
+ eq_(conn._execution_options, {"foo": "bar"})
+
+ def test_propagate_option_engine_to_connection(self):
+ e1 = testing_engine("sqlite://",
+ options=dict(execution_options={"foo": "bar"}))
+ e2 = e1.execution_options(bat="hoho")
+ c1 = e1.connect()
+ c2 = e2.connect()
+ eq_(c1._execution_options, {"foo": "bar"})
+ eq_(c2._execution_options, {"foo": "bar", "bat": "hoho"})
+
+
+
+
class AlternateResultProxyTest(fixtures.TestBase):
__requires__ = ('sqlite', )
@@ -1101,63 +1148,58 @@ class EngineEventsTest(fixtures.TestBase):
e1 = testing_engine(config.db_url)
e2 = testing_engine(config.db_url)
- canary = []
- def before_exec(conn, stmt, *arg):
- canary.append(stmt)
- event.listen(e1, "before_execute", before_exec)
+ canary = Mock()
+ event.listen(e1, "before_execute", canary)
s1 = select([1])
s2 = select([2])
e1.execute(s1)
e2.execute(s2)
- eq_(canary, [s1])
- event.listen(e2, "before_execute", before_exec)
+ eq_(
+ [arg[1][1] for arg in canary.mock_calls], [s1]
+ )
+ event.listen(e2, "before_execute", canary)
e1.execute(s1)
e2.execute(s2)
- eq_(canary, [s1, s1, s2])
+ eq_([arg[1][1] for arg in canary.mock_calls], [s1, s1, s2])
def test_per_engine_plus_global(self):
- canary = []
- def be1(conn, stmt, *arg):
- canary.append('be1')
- def be2(conn, stmt, *arg):
- canary.append('be2')
- def be3(conn, stmt, *arg):
- canary.append('be3')
-
- event.listen(Engine, "before_execute", be1)
+ canary = Mock()
+ event.listen(Engine, "before_execute", canary.be1)
e1 = testing_engine(config.db_url)
e2 = testing_engine(config.db_url)
- event.listen(e1, "before_execute", be2)
+ event.listen(e1, "before_execute", canary.be2)
- event.listen(Engine, "before_execute", be3)
+ event.listen(Engine, "before_execute", canary.be3)
e1.connect()
e2.connect()
- canary[:] = []
+
e1.execute(select([1]))
+ canary.be1.assert_call_count(1)
+ canary.be2.assert_call_count(1)
+
e2.execute(select([1]))
- eq_(canary, ['be1', 'be3', 'be2', 'be1', 'be3'])
+ canary.be1.assert_call_count(2)
+ canary.be2.assert_call_count(1)
+ canary.be3.assert_call_count(2)
def test_per_connection_plus_engine(self):
- canary = []
- def be1(conn, stmt, *arg):
- canary.append('be1')
- def be2(conn, stmt, *arg):
- canary.append('be2')
+ canary = Mock()
e1 = testing_engine(config.db_url)
- event.listen(e1, "before_execute", be1)
+ event.listen(e1, "before_execute", canary.be1)
conn = e1.connect()
- event.listen(conn, "before_execute", be2)
- canary[:] = []
+ event.listen(conn, "before_execute", canary.be2)
conn.execute(select([1]))
- eq_(canary, ['be2', 'be1'])
+ canary.be1.assert_call_count(1)
+ canary.be2.assert_call_count(1)
conn._branch().execute(select([1]))
- eq_(canary, ['be2', 'be1', 'be2', 'be1'])
+ canary.be1.assert_call_count(2)
+ canary.be2.assert_call_count(2)
def test_argument_format_execute(self):
def before_execute(conn, clauseelement, multiparams, params):
@@ -1320,6 +1362,44 @@ class EngineEventsTest(fixtures.TestBase):
canary, ['execute', 'cursor_execute']
)
+ def test_engine_connect(self):
+ engine = engines.testing_engine()
+
+ tracker = Mock()
+ event.listen(engine, "engine_connect", tracker)
+
+ c1 = engine.connect()
+ c2 = c1._branch()
+ c1.close()
+ eq_(
+ tracker.mock_calls,
+ [call(c1, False), call(c2, True)]
+ )
+
+ def test_execution_options(self):
+ engine = engines.testing_engine()
+
+ engine_tracker = Mock()
+ conn_tracker = Mock()
+
+ event.listen(engine, "set_engine_execution_options", engine_tracker)
+ event.listen(engine, "set_connection_execution_options", conn_tracker)
+
+ e2 = engine.execution_options(e1='opt_e1')
+ c1 = engine.connect()
+ c2 = c1.execution_options(c1='opt_c1')
+ c3 = e2.connect()
+ c4 = c3.execution_options(c3='opt_c3')
+ eq_(
+ engine_tracker.mock_calls,
+ [call(e2, {'e1': 'opt_e1'})]
+ )
+ eq_(
+ conn_tracker.mock_calls,
+ [call(c2, {"c1": "opt_c1"}), call(c4, {"c3": "opt_c3"})]
+ )
+
+
@testing.requires.sequences
@testing.provide_metadata
def test_cursor_execute(self):
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 5b64a9f31..8c4bcd8b5 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -879,9 +879,10 @@ class QueuePoolTest(PoolTestBase):
pool_size=2, timeout=timeout,
max_overflow=max_overflow)
def waiter(p):
+ success_key = (timeout, max_overflow)
conn = p.connect()
time.sleep(.5)
- success.append(True)
+ success.append(success_key)
conn.close()
time.sleep(.2)
@@ -896,8 +897,8 @@ class QueuePoolTest(PoolTestBase):
c1.invalidate()
c2.invalidate()
p2 = p._replace()
- time.sleep(2)
- eq_(len(success), 12)
+ time.sleep(1)
+ eq_(len(success), 12, "successes: %s" % success)
@testing.requires.threading_with_mock
@testing.requires.python26
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index 789c15444..ffc12b5b9 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -1279,15 +1279,15 @@ class IsolationLevelTest(fixtures.TestBase):
)
- def test_per_engine_bzzt(self):
- assert_raises_message(
- exc.ArgumentError,
- r"'isolation_level' execution option may "
- r"only be specified on Connection.execution_options\(\). "
- r"To set engine-wide isolation level, "
- r"use the isolation_level argument to create_engine\(\).",
- create_engine,
- testing.db.url,
- execution_options={'isolation_level':
- self._non_default_isolation_level}
+ def test_per_engine(self):
+ # new in 0.9
+ eng = create_engine(testing.db.url,
+ execution_options={
+ 'isolation_level':
+ self._non_default_isolation_level()}
+ )
+ conn = eng.connect()
+ eq_(
+ eng.dialect.get_isolation_level(conn.connection),
+ self._non_default_isolation_level()
)
diff --git a/test/profiles.txt b/test/profiles.txt
index 4d8964639..915cfadb4 100644
--- a/test/profiles.txt
+++ b/test/profiles.txt
@@ -168,19 +168,19 @@ test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.3_sqlite_pysqlite_noc
# TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.6_sqlite_pysqlite_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_mysql_mysqldb_cextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_mysql_mysqldb_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_oracle_cx_oracle_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_postgresql_psycopg2_cextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_postgresql_psycopg2_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_sqlite_pysqlite_cextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_sqlite_pysqlite_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.2_postgresql_psycopg2_nocextensions 70
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.2_sqlite_pysqlite_nocextensions 70
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_oracle_cx_oracle_nocextensions 69
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_postgresql_psycopg2_nocextensions 69
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_sqlite_pysqlite_nocextensions 69
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.6_sqlite_pysqlite_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_mysql_mysqldb_cextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_mysql_mysqldb_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_oracle_cx_oracle_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_postgresql_psycopg2_cextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_postgresql_psycopg2_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_sqlite_pysqlite_cextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_sqlite_pysqlite_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.2_postgresql_psycopg2_nocextensions 75
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.2_sqlite_pysqlite_nocextensions 75
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_oracle_cx_oracle_nocextensions 74
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_postgresql_psycopg2_nocextensions 74
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_sqlite_pysqlite_nocextensions 74
# TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect
@@ -200,19 +200,19 @@ test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect 3.3_sqlite_pysqli
# TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.6_sqlite_pysqlite_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_mysql_mysqldb_cextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_mysql_mysqldb_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_oracle_cx_oracle_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_postgresql_psycopg2_cextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_postgresql_psycopg2_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_sqlite_pysqlite_cextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_sqlite_pysqlite_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.2_postgresql_psycopg2_nocextensions 7
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.2_sqlite_pysqlite_nocextensions 7
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_oracle_cx_oracle_nocextensions 7
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_postgresql_psycopg2_nocextensions 7
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_sqlite_pysqlite_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.6_sqlite_pysqlite_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_mysql_mysqldb_cextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_mysql_mysqldb_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_oracle_cx_oracle_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_postgresql_psycopg2_cextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_postgresql_psycopg2_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_sqlite_pysqlite_cextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_sqlite_pysqlite_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.2_postgresql_psycopg2_nocextensions 8
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.2_sqlite_pysqlite_nocextensions 8
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_oracle_cx_oracle_nocextensions 8
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_postgresql_psycopg2_nocextensions 8
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_sqlite_pysqlite_nocextensions 8
# TEST: test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute
@@ -232,19 +232,19 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute
# TEST: test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.6_sqlite_pysqlite_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_cextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_oracle_cx_oracle_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_cextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_cextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.2_postgresql_psycopg2_nocextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.2_sqlite_pysqlite_nocextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_oracle_cx_oracle_nocextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_postgresql_psycopg2_nocextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_sqlite_pysqlite_nocextensions 66
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.6_sqlite_pysqlite_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_cextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_oracle_cx_oracle_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_cextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_cextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.2_postgresql_psycopg2_nocextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.2_sqlite_pysqlite_nocextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_oracle_cx_oracle_nocextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_postgresql_psycopg2_nocextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_sqlite_pysqlite_nocextensions 71
# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile