diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-07-24 20:35:03 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-07-24 20:35:03 -0400 |
commit | d9a6641dc8dfea02936d37d58bc446cb4fa1f1b5 (patch) | |
tree | 07cd2890e3ed2cd6e1cd3c50d6b9cbde68ec7636 /lib/sqlalchemy/engine/base.py | |
parent | 5f7d70124ed6999e25bfaba948738a99bae2044e (diff) | |
download | sqlalchemy-d9a6641dc8dfea02936d37d58bc446cb4fa1f1b5.tar.gz |
- got engine events partially working, needs work on return value considerations
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 196 |
1 files changed, 95 insertions, 101 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 264a71bef..a5f99022f 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1546,20 +1546,6 @@ class TwoPhaseTransaction(Transaction): def _do_commit(self): self.connection._commit_twophase_impl(self.xid, self._is_prepared) -class _EngineDispatch(event.Events): - def append(self, fn, identifier, target): - if isinstance(target.Connection, Connection): - target.Connection = _proxy_connection_cls(target.Connection, self) - event.Dispatch.append(self, fn, identifier) - - def exec_(self, identifier, orig, kw): - for fn in getattr(self, identifier): - r = fn(**kw) - if r: - return r - else: - return orig() - class Engine(Connectable, log.Identified): """ Connects a :class:`~sqlalchemy.pool.Pool` and @@ -1574,7 +1560,6 @@ class Engine(Connectable, log.Identified): _execution_options = util.frozendict() Connection = Connection - def __init__(self, pool, dialect, url, logging_name=None, echo=None, proxy=None, execution_options=None @@ -1592,41 +1577,50 @@ class Engine(Connectable, log.Identified): if execution_options: self.update_execution_options(**execution_options) - class events(_EngineDispatch): - def execute(self, conn, execute, clauseelement, *multiparams, **params): + class events(event.Events): + @classmethod + def listen(cls, target, fn, identifier): + if issubclass(target.Connection, Connection): + target.Connection = _proxy_connection_cls( + Connection, + target.events) + event.Events.listen(target, fn, identifier) + + def on_execute(self, conn, execute, clauseelement, *multiparams, **params): """Intercept high level execute() events.""" - def cursor_execute(self, execute, cursor, statement, parameters, context, executemany): + def on_cursor_execute(self, conn, execute, cursor, statement, + parameters, context, executemany): """Intercept low-level cursor execute() events.""" - def begin(self, conn, begin): + def on_begin(self, conn, begin): """Intercept begin() events.""" - def rollback(self, conn, rollback): + def on_rollback(self, conn, rollback): """Intercept rollback() events.""" - def commit(self, conn, commit): + def on_commit(self, conn, commit): """Intercept commit() events.""" - def savepoint(self, conn, savepoint, name=None): + def on_savepoint(self, conn, savepoint, name=None): """Intercept savepoint() events.""" - def rollback_savepoint(self, conn, rollback_savepoint, name, context): + def on_rollback_savepoint(self, conn, rollback_savepoint, name, context): """Intercept rollback_savepoint() events.""" - def release_savepoint(self, conn, release_savepoint, name, context): + def on_release_savepoint(self, conn, release_savepoint, name, context): """Intercept release_savepoint() events.""" - def begin_twophase(self, conn, begin_twophase, xid): + def on_begin_twophase(self, conn, begin_twophase, xid): """Intercept begin_twophase() events.""" - def prepare_twophase(self, conn, prepare_twophase, xid): + def on_prepare_twophase(self, conn, prepare_twophase, xid): """Intercept prepare_twophase() events.""" - def rollback_twophase(self, conn, rollback_twophase, xid, is_prepared): + def on_rollback_twophase(self, conn, rollback_twophase, xid, is_prepared): """Intercept rollback_twophase() events.""" - def commit_twophase(self, conn, commit_twophase, xid, is_prepared): + def on_commit_twophase(self, conn, commit_twophase, xid, is_prepared): """Intercept commit_twophase() events.""" events = event.dispatcher(events) @@ -1847,106 +1841,106 @@ class Engine(Connectable, log.Identified): return self.pool.unique_connection() def _proxy_connection_cls(cls, dispatch): + def _exec_recursive(conn, fns, orig): + if not fns: + return orig + def go(*arg, **kw): + nested = _exec_recursive(conn, fns[1:], orig) + ret = fns[0](conn, nested, *arg, **kw) + # TODO: need to get consistent way to check + # for "they called the fn, they didn't", or otherwise + # make some decision here how this is to work + #if ret is None: + # return nested(*arg, **kw) + #else: + return ret + return go + class ProxyConnection(cls): - def _exec_recursive(self, fns, orig): - if not fns: - return orig - def go(*arg, **kw): - nested = self._exec_recursive(fns[1:], orig) - ret = fns[0](self, nested, *arg, **kw) - if ret is None: - return nested(*arg, **kw) - else: - return ret - return go - - def _exec_recursive_minus_self(self, fns, orig): - if not fns: - return orig - def go(*arg, **kw): - nested = self._exec_recursive(fns[1:], orig) - ret = fns[0](nested, *arg, **kw) - if ret is None: - return nested(*arg, **kw) - else: - return ret - return go - def execute(self, clauseelement, *multiparams, **params): - - orig = super(ProxyConnection, self).execute - - g = self._exec_recursive( - dispatch.on_execute, - orig) + g = _exec_recursive(self, dispatch.on_execute, + super(ProxyConnection, self).execute) return g(clauseelement, *multiparams, **params) - def _execute_clauseelement(self, clauseelement, multiparams=None, params=None): return self.execute(clauseelement, *(multiparams or []), **(params or {})) + # TODO : this is all wrong, cursor_execute() and + # cursor_executemany() don't have a return value, need to find some + # other way to check for executed on these + def _cursor_execute(self, cursor, statement, parameters, context=None): - orig = super(ProxyConnection, self)._cursor_execute - g = self._exec_recursive_minus_self( - dispatch.on_cursor_execute, - orig - ) - return g(cursor, statement, parameters, context=None) - - # these are all TODO - def _cursor_executemany(self, cursor, statement, - parameters, context=None): - return proxy.cursor_execute( - super(ProxyConnection, self)._cursor_executemany, - cursor, statement, parameters, context, True) - + g = _exec_recursive(self, dispatch.on_cursor_execute, + self._cursor_exec) + return g(cursor, statement, parameters, context, False) + + def _cursor_executemany(self, cursor, statement, parameters, + context=None, ): + g = _exec_recursive(self, dispatch.on_cursor_execute, + self._cursor_exec) + return g(cursor, statement, parameters, context, True) + + def _cursor_exec(self, cursor, statement, parameters, context, + executemany): + if executemany: + return super(ProxyConnection, + self)._cursor_executemany(cursor, + statement, parameters, context) + else: + return super(ProxyConnection, + self)._cursor_execute(cursor, statement, + parameters, context) + def _begin_impl(self): - return proxy.begin(self, super(ProxyConnection, self)._begin_impl) + g = _exec_recursive(self, dispatch.on_begin, + super(ProxyConnection, self)._begin_impl) + return g() def _rollback_impl(self): - return proxy.rollback(self, - super(ProxyConnection, self)._rollback_impl) + g = _exec_recursive(self, dispatch.on_rollback, + super(ProxyConnection, self)._rollback_impl) + return g() def _commit_impl(self): - return proxy.commit(self, - super(ProxyConnection, self)._commit_impl) + g = _exec_recursive(self, dispatch.on_commit, + super(ProxyConnection, self)._commit_impl) + return g() def _savepoint_impl(self, name=None): - return proxy.savepoint(self, - super(ProxyConnection, self)._savepoint_impl, - name=name) + g = _exec_recursive(self, dispatch.on_savepoint, + super(ProxyConnection, self)._savepoint_impl) + return g(name=name) def _rollback_to_savepoint_impl(self, name, context): - return proxy.rollback_savepoint(self, - super(ProxyConnection, - self)._rollback_to_savepoint_impl, - name, context) + g = _exec_recursive(self, dispatch.on_rollback_savepoint, + super(ProxyConnection, self)._rollback_to_savepoint_impl) + return g(name, context) def _release_savepoint_impl(self, name, context): - return proxy.release_savepoint(self, - super(ProxyConnection, self)._release_savepoint_impl, - name, context) - + g = _exec_recursive(self, dispatch.on_release_savepoint, + super(ProxyConnection, self)._release_savepoint_impl) + return g(name, context) + def _begin_twophase_impl(self, xid): - return proxy.begin_twophase(self, - super(ProxyConnection, self)._begin_twophase_impl, - xid) + g = _exec_recursive(self, dispatch.on_begin_twophase, + super(ProxyConnection, self)._begin_twophase_impl) + return g(xid) def _prepare_twophase_impl(self, xid): - return proxy.prepare_twophase(self, - super(ProxyConnection, self)._prepare_twophase_impl, - xid) + g = _exec_recursive(self, dispatch.on_prepare_twophase, + super(ProxyConnection, self)._prepare_twophase_impl) + return g(xid) def _rollback_twophase_impl(self, xid, is_prepared): - return proxy.rollback_twophase(self, - super(ProxyConnection, self)._rollback_twophase_impl, - xid, is_prepared) + g = _exec_recursive(self, dispatch.on_rollback_twophase, + super(ProxyConnection, self)._rollback_twophase_impl) + return g(xid, is_prepared) def _commit_twophase_impl(self, xid, is_prepared): - return proxy.commit_twophase(self, - super(ProxyConnection, self)._commit_twophase_impl, - xid, is_prepared) + g = _exec_recursive(self, dispatch.on_commit_twophase, + super(ProxyConnection, self)._commit_twophase_impl) + return g(xid, is_prepared) return ProxyConnection |