diff options
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 228 |
1 files changed, 117 insertions, 111 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 73c35c38f..cf0689626 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -72,7 +72,7 @@ class Connection(Connectable): # want to handle any of the engine's events in that case. self.dispatch = self.dispatch._join(engine.dispatch) self._has_events = _has_events or ( - _has_events is None and engine._has_events) + _has_events is None and engine._has_events) self._echo = self.engine._should_log_info() if _execution_options: @@ -94,11 +94,11 @@ class Connection(Connectable): """ return self.engine._connection_cls( - self.engine, - self.__connection, - _branch=True, - _has_events=self._has_events, - _dispatch=self.dispatch) + self.engine, + self.__connection, + _branch=True, + _has_events=self._has_events, + _dispatch=self.dispatch) def _clone(self): """Create a shallow copy of this Connection. @@ -239,8 +239,8 @@ class Connection(Connectable): if self.__can_reconnect and self.__invalid: if self.__transaction is not None: raise exc.InvalidRequestError( - "Can't reconnect until invalid " - "transaction is rolled back") + "Can't reconnect until invalid " + "transaction is rolled back") self.__connection = self.engine.raw_connection() self.__invalid = False return self.__connection @@ -324,10 +324,10 @@ class Connection(Connectable): :meth:`.Connection.invalidate` method is called, at the DBAPI level all state associated with this transaction is lost, as the DBAPI connection is closed. The :class:`.Connection` - will not allow a reconnection to proceed until the :class:`.Transaction` - object is ended, by calling the :meth:`.Transaction.rollback` - method; until that point, any attempt at continuing to use the - :class:`.Connection` will raise an + will not allow a reconnection to proceed until the + :class:`.Transaction` object is ended, by calling the + :meth:`.Transaction.rollback` method; until that point, any attempt at + continuing to use the :class:`.Connection` will raise an :class:`~sqlalchemy.exc.InvalidRequestError`. This is to prevent applications from accidentally continuing an ongoing transactional operations despite the @@ -335,8 +335,8 @@ class Connection(Connectable): invalidation. The :meth:`.Connection.invalidate` method, just like auto-invalidation, - will at the connection pool level invoke the :meth:`.PoolEvents.invalidate` - event. + will at the connection pool level invoke the + :meth:`.PoolEvents.invalidate` event. .. seealso:: @@ -585,7 +585,8 @@ class Connection(Connectable): if self._still_open_and_connection_is_valid: assert isinstance(self.__transaction, TwoPhaseTransaction) try: - self.engine.dialect.do_rollback_twophase(self, xid, is_prepared) + self.engine.dialect.do_rollback_twophase( + self, xid, is_prepared) finally: if self.connection._reset_agent is self.__transaction: self.connection._reset_agent = None @@ -722,8 +723,8 @@ class Connection(Connectable): meth = object._execute_on_connection except AttributeError: raise exc.InvalidRequestError( - "Unexecutable object type: %s" % - type(object)) + "Unexecutable object type: %s" % + type(object)) else: return meth(self, multiparams, params) @@ -731,7 +732,7 @@ class Connection(Connectable): """Execute a sql.FunctionElement object.""" return self._execute_clauseelement(func.select(), - multiparams, params) + multiparams, params) def _execute_default(self, default, multiparams, params): """Execute a schema.ColumnDefault object.""" @@ -749,7 +750,7 @@ class Connection(Connectable): dialect = self.dialect ctx = dialect.execution_ctx_cls._init_default( - dialect, self, conn) + dialect, self, conn) except Exception as e: self._handle_dbapi_exception(e, None, None, None, None) @@ -759,7 +760,7 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.after_execute(self, - default, multiparams, params, ret) + default, multiparams, params, ret) return ret @@ -783,7 +784,7 @@ class Connection(Connectable): ) if self._has_events or self.engine._has_events: self.dispatch.after_execute(self, - ddl, multiparams, params, ret) + ddl, multiparams, params, ret) return ret def _execute_clauseelement(self, elem, multiparams, params): @@ -809,13 +810,13 @@ class Connection(Connectable): compiled_sql = self._execution_options['compiled_cache'][key] else: compiled_sql = elem.compile( - dialect=dialect, column_keys=keys, - inline=len(distilled_params) > 1) + dialect=dialect, column_keys=keys, + inline=len(distilled_params) > 1) self._execution_options['compiled_cache'][key] = compiled_sql else: compiled_sql = elem.compile( - dialect=dialect, column_keys=keys, - inline=len(distilled_params) > 1) + dialect=dialect, column_keys=keys, + inline=len(distilled_params) > 1) ret = self._execute_context( dialect, @@ -826,7 +827,7 @@ class Connection(Connectable): ) if self._has_events or self.engine._has_events: self.dispatch.after_execute(self, - elem, multiparams, params, ret) + elem, multiparams, params, ret) return ret def _execute_compiled(self, compiled, multiparams, params): @@ -848,7 +849,7 @@ class Connection(Connectable): ) if self._has_events or self.engine._has_events: self.dispatch.after_execute(self, - compiled, multiparams, params, ret) + compiled, multiparams, params, ret) return ret def _execute_text(self, statement, multiparams, params): @@ -870,12 +871,12 @@ class Connection(Connectable): ) if self._has_events or self.engine._has_events: self.dispatch.after_execute(self, - statement, multiparams, params, ret) + statement, multiparams, params, ret) return ret def _execute_context(self, dialect, constructor, - statement, parameters, - *args): + statement, parameters, + *args): """Create an :class:`.ExecutionContext` and execute, returning a :class:`.ResultProxy`.""" @@ -888,15 +889,15 @@ class Connection(Connectable): context = constructor(dialect, self, conn, *args) except Exception as e: self._handle_dbapi_exception(e, - util.text_type(statement), parameters, - None, None) + util.text_type(statement), parameters, + None, None) if context.compiled: context.pre_exec() cursor, statement, parameters = context.cursor, \ - context.statement, \ - context.parameters + context.statement, \ + context.parameters if not context.executemany: parameters = parameters[0] @@ -904,62 +905,64 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: for fn in self.dispatch.before_cursor_execute: statement, parameters = \ - fn(self, cursor, statement, parameters, - context, context.executemany) + fn(self, cursor, statement, parameters, + context, context.executemany) if self._echo: self.engine.logger.info(statement) - self.engine.logger.info("%r", - sql_util._repr_params(parameters, batches=10)) + self.engine.logger.info( + "%r", + sql_util._repr_params(parameters, batches=10) + ) try: if context.executemany: for fn in () if not self.dialect._has_events \ - else self.dialect.dispatch.do_executemany: + else self.dialect.dispatch.do_executemany: if fn(cursor, statement, parameters, context): break else: self.dialect.do_executemany( - cursor, - statement, - parameters, - context) + cursor, + statement, + parameters, + context) elif not parameters and context.no_parameters: for fn in () if not self.dialect._has_events \ - else self.dialect.dispatch.do_execute_no_params: + else self.dialect.dispatch.do_execute_no_params: if fn(cursor, statement, context): break else: self.dialect.do_execute_no_params( - cursor, - statement, - context) + cursor, + statement, + context) else: for fn in () if not self.dialect._has_events \ - else self.dialect.dispatch.do_execute: + else self.dialect.dispatch.do_execute: if fn(cursor, statement, parameters, context): break else: self.dialect.do_execute( - cursor, - statement, - parameters, - context) + cursor, + statement, + parameters, + context) except Exception as e: self._handle_dbapi_exception( - e, - statement, - parameters, - cursor, - context) + e, + statement, + parameters, + cursor, + context) if self._has_events or self.engine._has_events: self.dispatch.after_cursor_execute(self, cursor, - statement, - parameters, - context, - context.executemany) + statement, + parameters, + context, + context.executemany) if context.compiled: context.post_exec() @@ -1012,38 +1015,38 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: for fn in self.dispatch.before_cursor_execute: statement, parameters = \ - fn(self, cursor, statement, parameters, - context, - False) + fn(self, cursor, statement, parameters, + context, + False) if self._echo: self.engine.logger.info(statement) self.engine.logger.info("%r", parameters) try: for fn in () if not self.dialect._has_events \ - else self.dialect.dispatch.do_execute: + else self.dialect.dispatch.do_execute: if fn(cursor, statement, parameters, context): break else: self.dialect.do_execute( - cursor, - statement, - parameters, - context) + cursor, + statement, + parameters, + context) except Exception as e: self._handle_dbapi_exception( - e, - statement, - parameters, - cursor, - context) + e, + statement, + parameters, + cursor, + context) if self._has_events or self.engine._has_events: self.dispatch.after_cursor_execute(self, cursor, - statement, - parameters, - context, - False) + statement, + parameters, + context, + False) def _safe_close_cursor(self, cursor): """Close the given cursor, catching exceptions @@ -1057,17 +1060,17 @@ class Connection(Connectable): except Exception: # log the error through the connection pool's logger. self.engine.pool.logger.error( - "Error closing cursor", exc_info=True) + "Error closing cursor", exc_info=True) _reentrant_error = False _is_disconnect = False def _handle_dbapi_exception(self, - e, - statement, - parameters, - cursor, - context): + e, + statement, + parameters, + cursor, + context): exc_info = sys.exc_info() @@ -1084,12 +1087,12 @@ class Connection(Connectable): if self._reentrant_error: util.raise_from_cause( - exc.DBAPIError.instance(statement, - parameters, - e, - self.dialect.dbapi.Error), - exc_info - ) + exc.DBAPIError.instance(statement, + parameters, + e, + self.dialect.dbapi.Error), + exc_info + ) self._reentrant_error = True try: # non-DBAPI error - if we already got a context, @@ -1113,11 +1116,11 @@ class Connection(Connectable): # legacy dbapi_error event if should_wrap and context: self.dispatch.dbapi_error(self, - cursor, - statement, - parameters, - context, - e) + cursor, + statement, + parameters, + context, + e) # new handle_error event ctx = ExceptionContextImpl( @@ -1153,9 +1156,9 @@ class Connection(Connectable): util.raise_from_cause(newraise, exc_info) elif should_wrap: util.raise_from_cause( - sqlalchemy_exception, - exc_info - ) + sqlalchemy_exception, + exc_info + ) else: util.reraise(*exc_info) @@ -1240,15 +1243,15 @@ class Connection(Connectable): def _run_visitor(self, visitorcallable, element, **kwargs): visitorcallable(self.dialect, self, - **kwargs).traverse_single(element) + **kwargs).traverse_single(element) class ExceptionContextImpl(ExceptionContext): """Implement the :class:`.ExceptionContext` interface.""" def __init__(self, exception, sqlalchemy_exception, - connection, cursor, statement, parameters, - context, is_disconnect): + connection, cursor, statement, parameters, + context, is_disconnect): self.connection = connection self.sqlalchemy_exception = sqlalchemy_exception self.original_exception = exception @@ -1371,6 +1374,7 @@ class NestedTransaction(Transaction): The interface is the same as that of :class:`.Transaction`. """ + def __init__(self, connection, parent): super(NestedTransaction, self).__init__(connection, parent) self._savepoint = self.connection._savepoint_impl() @@ -1378,12 +1382,12 @@ class NestedTransaction(Transaction): def _do_rollback(self): if self.is_active: self.connection._rollback_to_savepoint_impl( - self._savepoint, self._parent) + self._savepoint, self._parent) def _do_commit(self): if self.is_active: self.connection._release_savepoint_impl( - self._savepoint, self._parent) + self._savepoint, self._parent) class TwoPhaseTransaction(Transaction): @@ -1396,6 +1400,7 @@ class TwoPhaseTransaction(Transaction): with the addition of the :meth:`prepare` method. """ + def __init__(self, connection, xid): super(TwoPhaseTransaction, self).__init__(connection, None) self._is_prepared = False @@ -1442,9 +1447,9 @@ class Engine(Connectable, log.Identified): _connection_cls = Connection def __init__(self, pool, dialect, url, - logging_name=None, echo=None, proxy=None, - execution_options=None - ): + logging_name=None, echo=None, proxy=None, + execution_options=None + ): self.pool = pool self.url = url self.dialect = dialect @@ -1477,7 +1482,7 @@ class Engine(Connectable, log.Identified): """ self._execution_options = \ - self._execution_options.union(opt) + self._execution_options.union(opt) self.dispatch.set_engine_execution_options(self, opt) self.dialect.set_engine_execution_options(self, opt) @@ -1526,7 +1531,8 @@ class Engine(Connectable, log.Identified): shards = {"default": "base", shard_1: "db1", "shard_2": "db2"} @event.listens_for(Engine, "before_cursor_execute") - def _switch_shard(conn, cursor, stmt, params, context, executemany): + def _switch_shard(conn, cursor, stmt, + params, context, executemany): shard_id = conn._execution_options.get('shard_id', "default") current_shard = conn.info.get("current_shard", None) @@ -1606,7 +1612,7 @@ class Engine(Connectable, log.Identified): yield connection def _run_visitor(self, visitorcallable, element, - connection=None, **kwargs): + connection=None, **kwargs): with self._optional_conn_ctx_manager(connection) as conn: conn._run_visitor(visitorcallable, element, **kwargs) @@ -1813,8 +1819,8 @@ class Engine(Connectable, log.Identified): .. seealso:: - :ref:`metadata_reflection_inspector` - detailed schema inspection using - the :class:`.Inspector` interface. + :ref:`metadata_reflection_inspector` - detailed schema inspection + using the :class:`.Inspector` interface. :class:`.quoted_name` - used to pass quoting information along with a schema identifier. |