diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2011-02-17 19:59:45 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2011-02-17 19:59:45 -0500 |
commit | 7dab4ae6a9ca057bb99be2f01efc26610be12f63 (patch) | |
tree | 69e2a7a7bca8899c6fa44616c919cf3b232cb3a0 /lib/sqlalchemy/engine/base.py | |
parent | e7c5fd7b22dd21ec1c1cac177b9ee611779903e3 (diff) | |
download | sqlalchemy-7dab4ae6a9ca057bb99be2f01efc26610be12f63.tar.gz |
- rename EngineEvents to ConnectionEvents
- simplify connection event model to be inline inside Connection, don't use ad-hoc
subclasses (technically would leak memory for the app that keeps creating engines
and adding events)
- not doing listen-per-connection yet. this is closer. overall things
are much simpler now (until we put listen-per-connection in...)
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 227 |
1 files changed, 104 insertions, 123 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index ae29ac40b..9eb5a38ff 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -847,6 +847,7 @@ class Connection(Connectable): self.__savepoint_seq = 0 self.__branch = _branch self.__invalid = False + self._has_events = engine._has_events self._echo = self.engine._should_log_info() if _execution_options: self._execution_options =\ @@ -1107,6 +1108,10 @@ class Connection(Connectable): def _begin_impl(self): if self._echo: self.engine.logger.info("BEGIN (implicit)") + + if self._has_events: + self.engine.dispatch.begin(self) + try: self.engine.dialect.do_begin(self.connection) except Exception, e: @@ -1114,6 +1119,9 @@ class Connection(Connectable): raise def _rollback_impl(self): + if self._has_events: + self.engine.dispatch.rollback(self) + if not self.closed and not self.invalidated and \ self._connection_is_valid: if self._echo: @@ -1128,6 +1136,9 @@ class Connection(Connectable): self.__transaction = None def _commit_impl(self): + if self._has_events: + self.engine.dispatch.commit(self) + if self._echo: self.engine.logger.info("COMMIT") try: @@ -1138,6 +1149,9 @@ class Connection(Connectable): raise def _savepoint_impl(self, name=None): + if self._has_events: + self.engine.dispatch.savepoint(self, name) + if name is None: self.__savepoint_seq += 1 name = 'sa_savepoint_%s' % self.__savepoint_seq @@ -1146,31 +1160,49 @@ class Connection(Connectable): return name def _rollback_to_savepoint_impl(self, name, context): + if self._has_events: + self.engine.dispatch.rollback_savepoint(self, name, context) + if self._connection_is_valid: self.engine.dialect.do_rollback_to_savepoint(self, name) self.__transaction = context def _release_savepoint_impl(self, name, context): + if self._has_events: + self.engine.dispatch.release_savepoint(self, name, context) + if self._connection_is_valid: self.engine.dialect.do_release_savepoint(self, name) self.__transaction = context def _begin_twophase_impl(self, xid): + if self._has_events: + self.engine.dispatch.begin_twophase(self, xid) + if self._connection_is_valid: self.engine.dialect.do_begin_twophase(self, xid) def _prepare_twophase_impl(self, xid): + if self._has_events: + self.engine.dispatch.prepare_twophase(self, xid) + if self._connection_is_valid: assert isinstance(self.__transaction, TwoPhaseTransaction) self.engine.dialect.do_prepare_twophase(self, xid) def _rollback_twophase_impl(self, xid, is_prepared): + if self._has_events: + self.engine.dispatch.rollback_twophase(self, xid, is_prepared) + if self._connection_is_valid: assert isinstance(self.__transaction, TwoPhaseTransaction) self.engine.dialect.do_rollback_twophase(self, xid, is_prepared) self.__transaction = None def _commit_twophase_impl(self, xid, is_prepared): + if self._has_events: + self.engine.dispatch.commit_twophase(self, xid, is_prepared) + if self._connection_is_valid: assert isinstance(self.__transaction, TwoPhaseTransaction) self.engine.dialect.do_commit_twophase(self, xid, is_prepared) @@ -1218,7 +1250,6 @@ class Connection(Connectable): * a :class:`.Compiled` object """ - for c in type(object).__mro__: if c in Connection.executors: return Connection.executors[c]( @@ -1272,6 +1303,11 @@ class Connection(Connectable): def _execute_default(self, default, multiparams, params): """Execute a schema.ColumnDefault object.""" + if self._has_events: + for fn in self.engine.dispatch.before_execute: + default, multiparams, params = \ + fn(self, default, multiparams, params) + try: try: conn = self.__connection @@ -1288,83 +1324,121 @@ class Connection(Connectable): ret = ctx._exec_default(default, None) if self.should_close_with_result: self.close() + + if self._has_events: + self.engine.dispatch.after_execute(self, + default, multiparams, params, ret) + return ret - def _execute_ddl(self, ddl, params, multiparams): + def _execute_ddl(self, ddl, multiparams, params): """Execute a schema.DDL object.""" + if self._has_events: + for fn in self.engine.dispatch.before_execute: + ddl, multiparams, params = \ + fn(self, ddl, multiparams, params) + dialect = self.dialect compiled = ddl.compile(dialect=dialect) - return self._execute_context( + ret = self._execute_context( dialect, dialect.execution_ctx_cls._init_ddl, compiled, None, compiled ) + if self._has_events: + self.engine.dispatch.after_execute(self, + ddl, multiparams, params, ret) + return ret def _execute_clauseelement(self, elem, multiparams, params): """Execute a sql.ClauseElement object.""" - params = self.__distill_params(multiparams, params) - if params: - keys = params[0].keys() + if self._has_events: + for fn in self.engine.dispatch.before_execute: + elem, multiparams, params = \ + fn(self, elem, multiparams, params) + + distilled_params = self.__distill_params(multiparams, params) + if distilled_params: + keys = distilled_params[0].keys() else: keys = [] dialect = self.dialect if 'compiled_cache' in self._execution_options: - key = dialect, elem, tuple(keys), len(params) > 1 + key = dialect, elem, tuple(keys), len(distilled_params) > 1 if key in self._execution_options['compiled_cache']: compiled_sql = self._execution_options['compiled_cache'][key] else: compiled_sql = elem.compile( dialect=dialect, column_keys=keys, - inline=len(params) > 1) + inline=len(distilled_params) > 1) self._execution_options['compiled_cache'][key] = compiled_sql else: compiled_sql = elem.compile( dialect=dialect, column_keys=keys, - inline=len(params) > 1) + inline=len(distilled_params) > 1) - return self._execute_context( + ret = self._execute_context( dialect, dialect.execution_ctx_cls._init_compiled, compiled_sql, - params, - compiled_sql, params + distilled_params, + compiled_sql, distilled_params ) + if self._has_events: + self.engine.dispatch.after_execute(self, + elem, multiparams, params, ret) + return ret def _execute_compiled(self, compiled, multiparams, params): """Execute a sql.Compiled object.""" + if self._has_events: + for fn in self.engine.dispatch.before_execute: + compiled, multiparams, params = \ + fn(self, compiled, multiparams, params) + dialect = self.dialect parameters=self.__distill_params(multiparams, params) - return self._execute_context( + ret = self._execute_context( dialect, dialect.execution_ctx_cls._init_compiled, compiled, parameters, compiled, parameters ) + if self._has_events: + self.engine.dispatch.after_execute(self, + compiled, multiparams, params, ret) + return ret def _execute_text(self, statement, multiparams, params): """Execute a string SQL statement.""" + if self._has_events: + for fn in self.engine.dispatch.before_execute: + statement, multiparams, params = \ + fn(self, statement, multiparams, params) + dialect = self.dialect parameters = self.__distill_params(multiparams, params) - return self._execute_context( + ret = self._execute_context( dialect, dialect.execution_ctx_cls._init_statement, statement, parameters, statement, parameters ) - - _before_cursor_execute = None - _after_cursor_execute = None + if self._has_events: + self.engine.dispatch.after_execute(self, + statement, multiparams, params, ret) + return ret def _execute_context(self, dialect, constructor, statement, parameters, @@ -1395,12 +1469,11 @@ class Connection(Connectable): if not context.executemany: parameters = parameters[0] - if self._before_cursor_execute: - statement, parameters = self._before_cursor_execute( - context, - cursor, - statement, - parameters) + if self._has_events: + for fn in self.engine.dispatch.before_cursor_execute: + statement, parameters = \ + fn(self, cursor, statement, parameters, + context, context.executemany) if self._echo: self.engine.logger.info(statement) @@ -1428,9 +1501,12 @@ class Connection(Connectable): raise - if self._after_cursor_execute: - self._after_cursor_execute(context, cursor, - statement, parameters) + if self._has_events: + self.engine.dispatch.after_cursor_execute(self, cursor, + statement, + parameters, + context, + context.executemany) if context.compiled: context.post_exec() @@ -1757,6 +1833,7 @@ class Engine(Connectable, log.Identified): """ _execution_options = util.immutabledict() + _has_events = False Connection = Connection def __init__(self, pool, dialect, url, @@ -1783,8 +1860,7 @@ class Engine(Connectable, log.Identified): ) self.update_execution_options(**execution_options) - - dispatch = event.dispatcher(events.EngineEvents) + dispatch = event.dispatcher(events.ConnectionEvents) def update_execution_options(self, **opt): """update the execution_options dictionary of this :class:`Engine`. @@ -2028,101 +2104,6 @@ class Engine(Connectable, log.Identified): return self.pool.unique_connection() -def _listener_connection_cls(cls, dispatch): - """Produce a wrapper for :class:`.Connection` which will apply event - dispatch to each method. - - :class:`.Connection` does not provide event dispatch built in so that - method call overhead is avoided in the absense of any listeners. - - """ - class EventListenerConnection(cls): - def execute(self, clauseelement, *multiparams, **params): - for fn in dispatch.before_execute: - clauseelement, multiparams, params = \ - fn(self, clauseelement, multiparams, params) - - ret = super(EventListenerConnection, self).\ - execute(clauseelement, *multiparams, **params) - - for fn in dispatch.after_execute: - fn(self, clauseelement, multiparams, params, ret) - - return ret - - def _execute_clauseelement(self, clauseelement, - multiparams=None, params=None): - return self.execute(clauseelement, - *(multiparams or []), - **(params or {})) - - def _before_cursor_execute(self, context, cursor, - statement, parameters): - for fn in dispatch.before_cursor_execute: - statement, parameters = \ - fn(self, cursor, statement, parameters, - context, context.executemany) - return statement, parameters - - def _after_cursor_execute(self, context, cursor, - statement, parameters): - dispatch.after_cursor_execute(self, cursor, - statement, - parameters, - context, - context.executemany) - - def _begin_impl(self): - dispatch.begin(self) - return super(EventListenerConnection, self).\ - _begin_impl() - - def _rollback_impl(self): - dispatch.rollback(self) - return super(EventListenerConnection, self).\ - _rollback_impl() - - def _commit_impl(self): - dispatch.commit(self) - return super(EventListenerConnection, self).\ - _commit_impl() - - def _savepoint_impl(self, name=None): - dispatch.savepoint(self, name) - return super(EventListenerConnection, self).\ - _savepoint_impl(name=name) - - def _rollback_to_savepoint_impl(self, name, context): - dispatch.rollback_savepoint(self, name, context) - return super(EventListenerConnection, self).\ - _rollback_to_savepoint_impl(name, context) - - def _release_savepoint_impl(self, name, context): - dispatch.release_savepoint(self, name, context) - return super(EventListenerConnection, self).\ - _release_savepoint_impl(name, context) - - def _begin_twophase_impl(self, xid): - dispatch.begin_twophase(self, xid) - return super(EventListenerConnection, self).\ - _begin_twophase_impl(xid) - - def _prepare_twophase_impl(self, xid): - dispatch.prepare_twophase(self, xid) - return super(EventListenerConnection, self).\ - _prepare_twophase_impl(xid) - - def _rollback_twophase_impl(self, xid, is_prepared): - dispatch.rollback_twophase(self, xid) - return super(EventListenerConnection, self).\ - _rollback_twophase_impl(xid, is_prepared) - - def _commit_twophase_impl(self, xid, is_prepared): - dispatch.commit_twophase(self, xid, is_prepared) - return super(EventListenerConnection, self).\ - _commit_twophase_impl(xid, is_prepared) - - return EventListenerConnection # This reconstructor is necessary so that pickles with the C extension or # without use the same Binary format. |