summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/base.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2011-02-17 19:59:45 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2011-02-17 19:59:45 -0500
commit7dab4ae6a9ca057bb99be2f01efc26610be12f63 (patch)
tree69e2a7a7bca8899c6fa44616c919cf3b232cb3a0 /lib/sqlalchemy/engine/base.py
parente7c5fd7b22dd21ec1c1cac177b9ee611779903e3 (diff)
downloadsqlalchemy-7dab4ae6a9ca057bb99be2f01efc26610be12f63.tar.gz
- rename EngineEvents to ConnectionEvents
- simplify connection event model to be inline inside Connection, don't use ad-hoc subclasses (technically would leak memory for the app that keeps creating engines and adding events) - not doing listen-per-connection yet. this is closer. overall things are much simpler now (until we put listen-per-connection in...)
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r--lib/sqlalchemy/engine/base.py227
1 files changed, 104 insertions, 123 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index ae29ac40b..9eb5a38ff 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -847,6 +847,7 @@ class Connection(Connectable):
self.__savepoint_seq = 0
self.__branch = _branch
self.__invalid = False
+ self._has_events = engine._has_events
self._echo = self.engine._should_log_info()
if _execution_options:
self._execution_options =\
@@ -1107,6 +1108,10 @@ class Connection(Connectable):
def _begin_impl(self):
if self._echo:
self.engine.logger.info("BEGIN (implicit)")
+
+ if self._has_events:
+ self.engine.dispatch.begin(self)
+
try:
self.engine.dialect.do_begin(self.connection)
except Exception, e:
@@ -1114,6 +1119,9 @@ class Connection(Connectable):
raise
def _rollback_impl(self):
+ if self._has_events:
+ self.engine.dispatch.rollback(self)
+
if not self.closed and not self.invalidated and \
self._connection_is_valid:
if self._echo:
@@ -1128,6 +1136,9 @@ class Connection(Connectable):
self.__transaction = None
def _commit_impl(self):
+ if self._has_events:
+ self.engine.dispatch.commit(self)
+
if self._echo:
self.engine.logger.info("COMMIT")
try:
@@ -1138,6 +1149,9 @@ class Connection(Connectable):
raise
def _savepoint_impl(self, name=None):
+ if self._has_events:
+ self.engine.dispatch.savepoint(self, name)
+
if name is None:
self.__savepoint_seq += 1
name = 'sa_savepoint_%s' % self.__savepoint_seq
@@ -1146,31 +1160,49 @@ class Connection(Connectable):
return name
def _rollback_to_savepoint_impl(self, name, context):
+ if self._has_events:
+ self.engine.dispatch.rollback_savepoint(self, name, context)
+
if self._connection_is_valid:
self.engine.dialect.do_rollback_to_savepoint(self, name)
self.__transaction = context
def _release_savepoint_impl(self, name, context):
+ if self._has_events:
+ self.engine.dispatch.release_savepoint(self, name, context)
+
if self._connection_is_valid:
self.engine.dialect.do_release_savepoint(self, name)
self.__transaction = context
def _begin_twophase_impl(self, xid):
+ if self._has_events:
+ self.engine.dispatch.begin_twophase(self, xid)
+
if self._connection_is_valid:
self.engine.dialect.do_begin_twophase(self, xid)
def _prepare_twophase_impl(self, xid):
+ if self._has_events:
+ self.engine.dispatch.prepare_twophase(self, xid)
+
if self._connection_is_valid:
assert isinstance(self.__transaction, TwoPhaseTransaction)
self.engine.dialect.do_prepare_twophase(self, xid)
def _rollback_twophase_impl(self, xid, is_prepared):
+ if self._has_events:
+ self.engine.dispatch.rollback_twophase(self, xid, is_prepared)
+
if self._connection_is_valid:
assert isinstance(self.__transaction, TwoPhaseTransaction)
self.engine.dialect.do_rollback_twophase(self, xid, is_prepared)
self.__transaction = None
def _commit_twophase_impl(self, xid, is_prepared):
+ if self._has_events:
+ self.engine.dispatch.commit_twophase(self, xid, is_prepared)
+
if self._connection_is_valid:
assert isinstance(self.__transaction, TwoPhaseTransaction)
self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
@@ -1218,7 +1250,6 @@ class Connection(Connectable):
* a :class:`.Compiled` object
"""
-
for c in type(object).__mro__:
if c in Connection.executors:
return Connection.executors[c](
@@ -1272,6 +1303,11 @@ class Connection(Connectable):
def _execute_default(self, default, multiparams, params):
"""Execute a schema.ColumnDefault object."""
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ default, multiparams, params = \
+ fn(self, default, multiparams, params)
+
try:
try:
conn = self.__connection
@@ -1288,83 +1324,121 @@ class Connection(Connectable):
ret = ctx._exec_default(default, None)
if self.should_close_with_result:
self.close()
+
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ default, multiparams, params, ret)
+
return ret
- def _execute_ddl(self, ddl, params, multiparams):
+ def _execute_ddl(self, ddl, multiparams, params):
"""Execute a schema.DDL object."""
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ ddl, multiparams, params = \
+ fn(self, ddl, multiparams, params)
+
dialect = self.dialect
compiled = ddl.compile(dialect=dialect)
- return self._execute_context(
+ ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_ddl,
compiled,
None,
compiled
)
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ ddl, multiparams, params, ret)
+ return ret
def _execute_clauseelement(self, elem, multiparams, params):
"""Execute a sql.ClauseElement object."""
- params = self.__distill_params(multiparams, params)
- if params:
- keys = params[0].keys()
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ elem, multiparams, params = \
+ fn(self, elem, multiparams, params)
+
+ distilled_params = self.__distill_params(multiparams, params)
+ if distilled_params:
+ keys = distilled_params[0].keys()
else:
keys = []
dialect = self.dialect
if 'compiled_cache' in self._execution_options:
- key = dialect, elem, tuple(keys), len(params) > 1
+ key = dialect, elem, tuple(keys), len(distilled_params) > 1
if key in self._execution_options['compiled_cache']:
compiled_sql = self._execution_options['compiled_cache'][key]
else:
compiled_sql = elem.compile(
dialect=dialect, column_keys=keys,
- inline=len(params) > 1)
+ inline=len(distilled_params) > 1)
self._execution_options['compiled_cache'][key] = compiled_sql
else:
compiled_sql = elem.compile(
dialect=dialect, column_keys=keys,
- inline=len(params) > 1)
+ inline=len(distilled_params) > 1)
- return self._execute_context(
+ ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_compiled,
compiled_sql,
- params,
- compiled_sql, params
+ distilled_params,
+ compiled_sql, distilled_params
)
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ elem, multiparams, params, ret)
+ return ret
def _execute_compiled(self, compiled, multiparams, params):
"""Execute a sql.Compiled object."""
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ compiled, multiparams, params = \
+ fn(self, compiled, multiparams, params)
+
dialect = self.dialect
parameters=self.__distill_params(multiparams, params)
- return self._execute_context(
+ ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_compiled,
compiled,
parameters,
compiled, parameters
)
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ compiled, multiparams, params, ret)
+ return ret
def _execute_text(self, statement, multiparams, params):
"""Execute a string SQL statement."""
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ statement, multiparams, params = \
+ fn(self, statement, multiparams, params)
+
dialect = self.dialect
parameters = self.__distill_params(multiparams, params)
- return self._execute_context(
+ ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_statement,
statement,
parameters,
statement, parameters
)
-
- _before_cursor_execute = None
- _after_cursor_execute = None
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ statement, multiparams, params, ret)
+ return ret
def _execute_context(self, dialect, constructor,
statement, parameters,
@@ -1395,12 +1469,11 @@ class Connection(Connectable):
if not context.executemany:
parameters = parameters[0]
- if self._before_cursor_execute:
- statement, parameters = self._before_cursor_execute(
- context,
- cursor,
- statement,
- parameters)
+ if self._has_events:
+ for fn in self.engine.dispatch.before_cursor_execute:
+ statement, parameters = \
+ fn(self, cursor, statement, parameters,
+ context, context.executemany)
if self._echo:
self.engine.logger.info(statement)
@@ -1428,9 +1501,12 @@ class Connection(Connectable):
raise
- if self._after_cursor_execute:
- self._after_cursor_execute(context, cursor,
- statement, parameters)
+ if self._has_events:
+ self.engine.dispatch.after_cursor_execute(self, cursor,
+ statement,
+ parameters,
+ context,
+ context.executemany)
if context.compiled:
context.post_exec()
@@ -1757,6 +1833,7 @@ class Engine(Connectable, log.Identified):
"""
_execution_options = util.immutabledict()
+ _has_events = False
Connection = Connection
def __init__(self, pool, dialect, url,
@@ -1783,8 +1860,7 @@ class Engine(Connectable, log.Identified):
)
self.update_execution_options(**execution_options)
-
- dispatch = event.dispatcher(events.EngineEvents)
+ dispatch = event.dispatcher(events.ConnectionEvents)
def update_execution_options(self, **opt):
"""update the execution_options dictionary of this :class:`Engine`.
@@ -2028,101 +2104,6 @@ class Engine(Connectable, log.Identified):
return self.pool.unique_connection()
-def _listener_connection_cls(cls, dispatch):
- """Produce a wrapper for :class:`.Connection` which will apply event
- dispatch to each method.
-
- :class:`.Connection` does not provide event dispatch built in so that
- method call overhead is avoided in the absense of any listeners.
-
- """
- class EventListenerConnection(cls):
- def execute(self, clauseelement, *multiparams, **params):
- for fn in dispatch.before_execute:
- clauseelement, multiparams, params = \
- fn(self, clauseelement, multiparams, params)
-
- ret = super(EventListenerConnection, self).\
- execute(clauseelement, *multiparams, **params)
-
- for fn in dispatch.after_execute:
- fn(self, clauseelement, multiparams, params, ret)
-
- return ret
-
- def _execute_clauseelement(self, clauseelement,
- multiparams=None, params=None):
- return self.execute(clauseelement,
- *(multiparams or []),
- **(params or {}))
-
- def _before_cursor_execute(self, context, cursor,
- statement, parameters):
- for fn in dispatch.before_cursor_execute:
- statement, parameters = \
- fn(self, cursor, statement, parameters,
- context, context.executemany)
- return statement, parameters
-
- def _after_cursor_execute(self, context, cursor,
- statement, parameters):
- dispatch.after_cursor_execute(self, cursor,
- statement,
- parameters,
- context,
- context.executemany)
-
- def _begin_impl(self):
- dispatch.begin(self)
- return super(EventListenerConnection, self).\
- _begin_impl()
-
- def _rollback_impl(self):
- dispatch.rollback(self)
- return super(EventListenerConnection, self).\
- _rollback_impl()
-
- def _commit_impl(self):
- dispatch.commit(self)
- return super(EventListenerConnection, self).\
- _commit_impl()
-
- def _savepoint_impl(self, name=None):
- dispatch.savepoint(self, name)
- return super(EventListenerConnection, self).\
- _savepoint_impl(name=name)
-
- def _rollback_to_savepoint_impl(self, name, context):
- dispatch.rollback_savepoint(self, name, context)
- return super(EventListenerConnection, self).\
- _rollback_to_savepoint_impl(name, context)
-
- def _release_savepoint_impl(self, name, context):
- dispatch.release_savepoint(self, name, context)
- return super(EventListenerConnection, self).\
- _release_savepoint_impl(name, context)
-
- def _begin_twophase_impl(self, xid):
- dispatch.begin_twophase(self, xid)
- return super(EventListenerConnection, self).\
- _begin_twophase_impl(xid)
-
- def _prepare_twophase_impl(self, xid):
- dispatch.prepare_twophase(self, xid)
- return super(EventListenerConnection, self).\
- _prepare_twophase_impl(xid)
-
- def _rollback_twophase_impl(self, xid, is_prepared):
- dispatch.rollback_twophase(self, xid)
- return super(EventListenerConnection, self).\
- _rollback_twophase_impl(xid, is_prepared)
-
- def _commit_twophase_impl(self, xid, is_prepared):
- dispatch.commit_twophase(self, xid, is_prepared)
- return super(EventListenerConnection, self).\
- _commit_twophase_impl(xid, is_prepared)
-
- return EventListenerConnection
# This reconstructor is necessary so that pickles with the C extension or
# without use the same Binary format.