summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r--lib/sqlalchemy/engine/base.py228
1 files changed, 117 insertions, 111 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 73c35c38f..cf0689626 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -72,7 +72,7 @@ class Connection(Connectable):
# want to handle any of the engine's events in that case.
self.dispatch = self.dispatch._join(engine.dispatch)
self._has_events = _has_events or (
- _has_events is None and engine._has_events)
+ _has_events is None and engine._has_events)
self._echo = self.engine._should_log_info()
if _execution_options:
@@ -94,11 +94,11 @@ class Connection(Connectable):
"""
return self.engine._connection_cls(
- self.engine,
- self.__connection,
- _branch=True,
- _has_events=self._has_events,
- _dispatch=self.dispatch)
+ self.engine,
+ self.__connection,
+ _branch=True,
+ _has_events=self._has_events,
+ _dispatch=self.dispatch)
def _clone(self):
"""Create a shallow copy of this Connection.
@@ -239,8 +239,8 @@ class Connection(Connectable):
if self.__can_reconnect and self.__invalid:
if self.__transaction is not None:
raise exc.InvalidRequestError(
- "Can't reconnect until invalid "
- "transaction is rolled back")
+ "Can't reconnect until invalid "
+ "transaction is rolled back")
self.__connection = self.engine.raw_connection()
self.__invalid = False
return self.__connection
@@ -324,10 +324,10 @@ class Connection(Connectable):
:meth:`.Connection.invalidate` method is called, at the DBAPI
level all state associated with this transaction is lost, as
the DBAPI connection is closed. The :class:`.Connection`
- will not allow a reconnection to proceed until the :class:`.Transaction`
- object is ended, by calling the :meth:`.Transaction.rollback`
- method; until that point, any attempt at continuing to use the
- :class:`.Connection` will raise an
+ will not allow a reconnection to proceed until the
+ :class:`.Transaction` object is ended, by calling the
+ :meth:`.Transaction.rollback` method; until that point, any attempt at
+ continuing to use the :class:`.Connection` will raise an
:class:`~sqlalchemy.exc.InvalidRequestError`.
This is to prevent applications from accidentally
continuing an ongoing transactional operations despite the
@@ -335,8 +335,8 @@ class Connection(Connectable):
invalidation.
The :meth:`.Connection.invalidate` method, just like auto-invalidation,
- will at the connection pool level invoke the :meth:`.PoolEvents.invalidate`
- event.
+ will at the connection pool level invoke the
+ :meth:`.PoolEvents.invalidate` event.
.. seealso::
@@ -585,7 +585,8 @@ class Connection(Connectable):
if self._still_open_and_connection_is_valid:
assert isinstance(self.__transaction, TwoPhaseTransaction)
try:
- self.engine.dialect.do_rollback_twophase(self, xid, is_prepared)
+ self.engine.dialect.do_rollback_twophase(
+ self, xid, is_prepared)
finally:
if self.connection._reset_agent is self.__transaction:
self.connection._reset_agent = None
@@ -722,8 +723,8 @@ class Connection(Connectable):
meth = object._execute_on_connection
except AttributeError:
raise exc.InvalidRequestError(
- "Unexecutable object type: %s" %
- type(object))
+ "Unexecutable object type: %s" %
+ type(object))
else:
return meth(self, multiparams, params)
@@ -731,7 +732,7 @@ class Connection(Connectable):
"""Execute a sql.FunctionElement object."""
return self._execute_clauseelement(func.select(),
- multiparams, params)
+ multiparams, params)
def _execute_default(self, default, multiparams, params):
"""Execute a schema.ColumnDefault object."""
@@ -749,7 +750,7 @@ class Connection(Connectable):
dialect = self.dialect
ctx = dialect.execution_ctx_cls._init_default(
- dialect, self, conn)
+ dialect, self, conn)
except Exception as e:
self._handle_dbapi_exception(e, None, None, None, None)
@@ -759,7 +760,7 @@ class Connection(Connectable):
if self._has_events or self.engine._has_events:
self.dispatch.after_execute(self,
- default, multiparams, params, ret)
+ default, multiparams, params, ret)
return ret
@@ -783,7 +784,7 @@ class Connection(Connectable):
)
if self._has_events or self.engine._has_events:
self.dispatch.after_execute(self,
- ddl, multiparams, params, ret)
+ ddl, multiparams, params, ret)
return ret
def _execute_clauseelement(self, elem, multiparams, params):
@@ -809,13 +810,13 @@ class Connection(Connectable):
compiled_sql = self._execution_options['compiled_cache'][key]
else:
compiled_sql = elem.compile(
- dialect=dialect, column_keys=keys,
- inline=len(distilled_params) > 1)
+ dialect=dialect, column_keys=keys,
+ inline=len(distilled_params) > 1)
self._execution_options['compiled_cache'][key] = compiled_sql
else:
compiled_sql = elem.compile(
- dialect=dialect, column_keys=keys,
- inline=len(distilled_params) > 1)
+ dialect=dialect, column_keys=keys,
+ inline=len(distilled_params) > 1)
ret = self._execute_context(
dialect,
@@ -826,7 +827,7 @@ class Connection(Connectable):
)
if self._has_events or self.engine._has_events:
self.dispatch.after_execute(self,
- elem, multiparams, params, ret)
+ elem, multiparams, params, ret)
return ret
def _execute_compiled(self, compiled, multiparams, params):
@@ -848,7 +849,7 @@ class Connection(Connectable):
)
if self._has_events or self.engine._has_events:
self.dispatch.after_execute(self,
- compiled, multiparams, params, ret)
+ compiled, multiparams, params, ret)
return ret
def _execute_text(self, statement, multiparams, params):
@@ -870,12 +871,12 @@ class Connection(Connectable):
)
if self._has_events or self.engine._has_events:
self.dispatch.after_execute(self,
- statement, multiparams, params, ret)
+ statement, multiparams, params, ret)
return ret
def _execute_context(self, dialect, constructor,
- statement, parameters,
- *args):
+ statement, parameters,
+ *args):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`.ResultProxy`."""
@@ -888,15 +889,15 @@ class Connection(Connectable):
context = constructor(dialect, self, conn, *args)
except Exception as e:
self._handle_dbapi_exception(e,
- util.text_type(statement), parameters,
- None, None)
+ util.text_type(statement), parameters,
+ None, None)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = context.cursor, \
- context.statement, \
- context.parameters
+ context.statement, \
+ context.parameters
if not context.executemany:
parameters = parameters[0]
@@ -904,62 +905,64 @@ class Connection(Connectable):
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = \
- fn(self, cursor, statement, parameters,
- context, context.executemany)
+ fn(self, cursor, statement, parameters,
+ context, context.executemany)
if self._echo:
self.engine.logger.info(statement)
- self.engine.logger.info("%r",
- sql_util._repr_params(parameters, batches=10))
+ self.engine.logger.info(
+ "%r",
+ sql_util._repr_params(parameters, batches=10)
+ )
try:
if context.executemany:
for fn in () if not self.dialect._has_events \
- else self.dialect.dispatch.do_executemany:
+ else self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
break
else:
self.dialect.do_executemany(
- cursor,
- statement,
- parameters,
- context)
+ cursor,
+ statement,
+ parameters,
+ context)
elif not parameters and context.no_parameters:
for fn in () if not self.dialect._has_events \
- else self.dialect.dispatch.do_execute_no_params:
+ else self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
break
else:
self.dialect.do_execute_no_params(
- cursor,
- statement,
- context)
+ cursor,
+ statement,
+ context)
else:
for fn in () if not self.dialect._has_events \
- else self.dialect.dispatch.do_execute:
+ else self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
break
else:
self.dialect.do_execute(
- cursor,
- statement,
- parameters,
- context)
+ cursor,
+ statement,
+ parameters,
+ context)
except Exception as e:
self._handle_dbapi_exception(
- e,
- statement,
- parameters,
- cursor,
- context)
+ e,
+ statement,
+ parameters,
+ cursor,
+ context)
if self._has_events or self.engine._has_events:
self.dispatch.after_cursor_execute(self, cursor,
- statement,
- parameters,
- context,
- context.executemany)
+ statement,
+ parameters,
+ context,
+ context.executemany)
if context.compiled:
context.post_exec()
@@ -1012,38 +1015,38 @@ class Connection(Connectable):
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = \
- fn(self, cursor, statement, parameters,
- context,
- False)
+ fn(self, cursor, statement, parameters,
+ context,
+ False)
if self._echo:
self.engine.logger.info(statement)
self.engine.logger.info("%r", parameters)
try:
for fn in () if not self.dialect._has_events \
- else self.dialect.dispatch.do_execute:
+ else self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
break
else:
self.dialect.do_execute(
- cursor,
- statement,
- parameters,
- context)
+ cursor,
+ statement,
+ parameters,
+ context)
except Exception as e:
self._handle_dbapi_exception(
- e,
- statement,
- parameters,
- cursor,
- context)
+ e,
+ statement,
+ parameters,
+ cursor,
+ context)
if self._has_events or self.engine._has_events:
self.dispatch.after_cursor_execute(self, cursor,
- statement,
- parameters,
- context,
- False)
+ statement,
+ parameters,
+ context,
+ False)
def _safe_close_cursor(self, cursor):
"""Close the given cursor, catching exceptions
@@ -1057,17 +1060,17 @@ class Connection(Connectable):
except Exception:
# log the error through the connection pool's logger.
self.engine.pool.logger.error(
- "Error closing cursor", exc_info=True)
+ "Error closing cursor", exc_info=True)
_reentrant_error = False
_is_disconnect = False
def _handle_dbapi_exception(self,
- e,
- statement,
- parameters,
- cursor,
- context):
+ e,
+ statement,
+ parameters,
+ cursor,
+ context):
exc_info = sys.exc_info()
@@ -1084,12 +1087,12 @@ class Connection(Connectable):
if self._reentrant_error:
util.raise_from_cause(
- exc.DBAPIError.instance(statement,
- parameters,
- e,
- self.dialect.dbapi.Error),
- exc_info
- )
+ exc.DBAPIError.instance(statement,
+ parameters,
+ e,
+ self.dialect.dbapi.Error),
+ exc_info
+ )
self._reentrant_error = True
try:
# non-DBAPI error - if we already got a context,
@@ -1113,11 +1116,11 @@ class Connection(Connectable):
# legacy dbapi_error event
if should_wrap and context:
self.dispatch.dbapi_error(self,
- cursor,
- statement,
- parameters,
- context,
- e)
+ cursor,
+ statement,
+ parameters,
+ context,
+ e)
# new handle_error event
ctx = ExceptionContextImpl(
@@ -1153,9 +1156,9 @@ class Connection(Connectable):
util.raise_from_cause(newraise, exc_info)
elif should_wrap:
util.raise_from_cause(
- sqlalchemy_exception,
- exc_info
- )
+ sqlalchemy_exception,
+ exc_info
+ )
else:
util.reraise(*exc_info)
@@ -1240,15 +1243,15 @@ class Connection(Connectable):
def _run_visitor(self, visitorcallable, element, **kwargs):
visitorcallable(self.dialect, self,
- **kwargs).traverse_single(element)
+ **kwargs).traverse_single(element)
class ExceptionContextImpl(ExceptionContext):
"""Implement the :class:`.ExceptionContext` interface."""
def __init__(self, exception, sqlalchemy_exception,
- connection, cursor, statement, parameters,
- context, is_disconnect):
+ connection, cursor, statement, parameters,
+ context, is_disconnect):
self.connection = connection
self.sqlalchemy_exception = sqlalchemy_exception
self.original_exception = exception
@@ -1371,6 +1374,7 @@ class NestedTransaction(Transaction):
The interface is the same as that of :class:`.Transaction`.
"""
+
def __init__(self, connection, parent):
super(NestedTransaction, self).__init__(connection, parent)
self._savepoint = self.connection._savepoint_impl()
@@ -1378,12 +1382,12 @@ class NestedTransaction(Transaction):
def _do_rollback(self):
if self.is_active:
self.connection._rollback_to_savepoint_impl(
- self._savepoint, self._parent)
+ self._savepoint, self._parent)
def _do_commit(self):
if self.is_active:
self.connection._release_savepoint_impl(
- self._savepoint, self._parent)
+ self._savepoint, self._parent)
class TwoPhaseTransaction(Transaction):
@@ -1396,6 +1400,7 @@ class TwoPhaseTransaction(Transaction):
with the addition of the :meth:`prepare` method.
"""
+
def __init__(self, connection, xid):
super(TwoPhaseTransaction, self).__init__(connection, None)
self._is_prepared = False
@@ -1442,9 +1447,9 @@ class Engine(Connectable, log.Identified):
_connection_cls = Connection
def __init__(self, pool, dialect, url,
- logging_name=None, echo=None, proxy=None,
- execution_options=None
- ):
+ logging_name=None, echo=None, proxy=None,
+ execution_options=None
+ ):
self.pool = pool
self.url = url
self.dialect = dialect
@@ -1477,7 +1482,7 @@ class Engine(Connectable, log.Identified):
"""
self._execution_options = \
- self._execution_options.union(opt)
+ self._execution_options.union(opt)
self.dispatch.set_engine_execution_options(self, opt)
self.dialect.set_engine_execution_options(self, opt)
@@ -1526,7 +1531,8 @@ class Engine(Connectable, log.Identified):
shards = {"default": "base", shard_1: "db1", "shard_2": "db2"}
@event.listens_for(Engine, "before_cursor_execute")
- def _switch_shard(conn, cursor, stmt, params, context, executemany):
+ def _switch_shard(conn, cursor, stmt,
+ params, context, executemany):
shard_id = conn._execution_options.get('shard_id', "default")
current_shard = conn.info.get("current_shard", None)
@@ -1606,7 +1612,7 @@ class Engine(Connectable, log.Identified):
yield connection
def _run_visitor(self, visitorcallable, element,
- connection=None, **kwargs):
+ connection=None, **kwargs):
with self._optional_conn_ctx_manager(connection) as conn:
conn._run_visitor(visitorcallable, element, **kwargs)
@@ -1813,8 +1819,8 @@ class Engine(Connectable, log.Identified):
.. seealso::
- :ref:`metadata_reflection_inspector` - detailed schema inspection using
- the :class:`.Inspector` interface.
+ :ref:`metadata_reflection_inspector` - detailed schema inspection
+ using the :class:`.Inspector` interface.
:class:`.quoted_name` - used to pass quoting information along
with a schema identifier.