summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/base.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-02-17 13:43:04 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2022-03-01 09:09:02 -0500
commita4bb502cf95ea3523e4d383c4377e50f402d7d52 (patch)
tree124400f741b6b91f0e9e582b510268607394dfaa /lib/sqlalchemy/engine/base.py
parent60fca2ac8cf44bdaf68552ab5c69854a6776c73c (diff)
downloadsqlalchemy-a4bb502cf95ea3523e4d383c4377e50f402d7d52.tar.gz
pep-484 for engine
All modules in sqlalchemy.engine are strictly typed with the exception of cursor, default, and reflection. cursor and default pass with non-strict typing, reflection is waiting on the multi-reflection refactor. Behavioral changes: * create_connect_args() methods return a tuple of list, dict, rather than a list of list, dict * removed allow_chars parameter from pyodbc connector ._get_server_version_info() method * the parameter list passed to do_executemany is now a list in all cases. previously, this was being run through dialect.execute_sequence_format, which defaults to tuple and was only intended for individual tuple params. * broke up dialect.dbapi into dialect.import_dbapi class method and dialect.dbapi module object. added a deprecation path for legacy dialects. it's not really feasible to type a single attr as a classmethod vs. module type. The "type_compiler" attribute also has this problem with greater ability to work around, left that one for now. * lots of constants changing to be Enum, so that we can type them. for fixed tuple-position constants in cursor.py / compiler.py (which are used to avoid the speed overhead of namedtuple), using Literal[value] which seems to work well * some tightening up in Row regarding __getitem__, which we can do since we are on full 2.0 style result use * altered the set_connection_execution_options and set_engine_execution_options event flows so that the dictionary of options may be mutated within the event hook, where it will then take effect as the actual options used. Previously, changing the dict would be silently ignored which seems counter-intuitive and not very useful. * A lot of DefaultDialect/DefaultExecutionContext methods and attributes, including underscored ones, move to interfaces. This is not fully ideal as it means the Dialect/ExecutionContext interfaces aren't publicly subclassable directly, but their current purpose is more of documentation for dialect authors who should (and certainly are) still be subclassing the DefaultXYZ versions in all cases Overall, Result was the most extremely difficult class hierarchy to type here as this hierarchy passes through largely amorphous "row" datatypes throughout, which can in fact by all kinds of different things, like raw DBAPI rows, or Row objects, or "scalar"/Any, but at the same time these types have meaning so I tried still maintaining some level of semantic markings for these, it highlights how complex Result is now, as it's trying to be extremely efficient and inlined while also being very open-ended and extensible. Change-Id: I98b75c0c09eab5355fc7a33ba41dd9874274f12a
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
-rw-r--r--lib/sqlalchemy/engine/base.py698
1 files changed, 445 insertions, 253 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 8c99f6309..5ce531338 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -10,13 +10,24 @@ import contextlib
import sys
import typing
from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import Iterator
+from typing import List
from typing import Mapping
+from typing import MutableMapping
+from typing import NoReturn
from typing import Optional
+from typing import Tuple
+from typing import Type
from typing import Union
from .interfaces import BindTyping
from .interfaces import ConnectionEventsTarget
+from .interfaces import DBAPICursor
from .interfaces import ExceptionContext
+from .interfaces import ExecutionContext
from .util import _distill_params_20
from .util import _distill_raw_params
from .util import TransactionalContext
@@ -26,22 +37,48 @@ from .. import log
from .. import util
from ..sql import compiler
from ..sql import util as sql_util
-from ..sql._typing import _ExecuteOptions
-from ..sql._typing import _ExecuteParams
+
+_CompiledCacheType = MutableMapping[Any, Any]
if typing.TYPE_CHECKING:
+ from . import Result
+ from . import ScalarResult
+ from .interfaces import _AnyExecuteParams
+ from .interfaces import _AnyMultiExecuteParams
+ from .interfaces import _AnySingleExecuteParams
+ from .interfaces import _CoreAnyExecuteParams
+ from .interfaces import _CoreMultiExecuteParams
+ from .interfaces import _CoreSingleExecuteParams
+ from .interfaces import _DBAPIAnyExecuteParams
+ from .interfaces import _DBAPIMultiExecuteParams
+ from .interfaces import _DBAPISingleExecuteParams
+ from .interfaces import _ExecuteOptions
+ from .interfaces import _ExecuteOptionsParameter
+ from .interfaces import _SchemaTranslateMapType
from .interfaces import Dialect
from .reflection import Inspector # noqa
from .url import URL
+ from ..event import dispatcher
+ from ..log import _EchoFlagType
+ from ..pool import _ConnectionFairy
from ..pool import Pool
from ..pool import PoolProxiedConnection
+ from ..sql import Executable
+ from ..sql.base import SchemaVisitor
+ from ..sql.compiler import Compiled
+ from ..sql.ddl import DDLElement
+ from ..sql.ddl import SchemaDropper
+ from ..sql.ddl import SchemaGenerator
+ from ..sql.functions import FunctionElement
+ from ..sql.schema import ColumnDefault
+ from ..sql.schema import HasSchemaAttr
"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.
"""
-_EMPTY_EXECUTION_OPTS = util.immutabledict()
-NO_OPTIONS = util.immutabledict()
+_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.immutabledict()
+NO_OPTIONS: Mapping[str, Any] = util.immutabledict()
class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
@@ -69,23 +106,32 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
+ dispatch: dispatcher[ConnectionEventsTarget]
+
_sqla_logger_namespace = "sqlalchemy.engine.Connection"
# used by sqlalchemy.engine.util.TransactionalContext
- _trans_context_manager = None
+ _trans_context_manager: Optional[TransactionalContext] = None
# legacy as of 2.0, should be eventually deprecated and
# removed. was used in the "pre_ping" recipe that's been in the docs
# a long time
should_close_with_result = False
+ _dbapi_connection: Optional[PoolProxiedConnection]
+
+ _execution_options: _ExecuteOptions
+
+ _transaction: Optional[RootTransaction]
+ _nested_transaction: Optional[NestedTransaction]
+
def __init__(
self,
- engine,
- connection=None,
- _has_events=None,
- _allow_revalidate=True,
- _allow_autobegin=True,
+ engine: Engine,
+ connection: Optional[PoolProxiedConnection] = None,
+ _has_events: Optional[bool] = None,
+ _allow_revalidate: bool = True,
+ _allow_autobegin: bool = True,
):
"""Construct a new Connection."""
self.engine = engine
@@ -125,14 +171,14 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self.dispatch.engine_connect(self)
@util.memoized_property
- def _message_formatter(self):
+ def _message_formatter(self) -> Any:
if "logging_token" in self._execution_options:
token = self._execution_options["logging_token"]
return lambda msg: "[%s] %s" % (token, msg)
else:
return None
- def _log_info(self, message, *arg, **kw):
+ def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
fmt = self._message_formatter
if fmt:
@@ -143,7 +189,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self.engine.logger.info(message, *arg, **kw)
- def _log_debug(self, message, *arg, **kw):
+ def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
fmt = self._message_formatter
if fmt:
@@ -155,19 +201,19 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self.engine.logger.debug(message, *arg, **kw)
@property
- def _schema_translate_map(self):
+ def _schema_translate_map(self) -> Optional[_SchemaTranslateMapType]:
return self._execution_options.get("schema_translate_map", None)
- def schema_for_object(self, obj):
+ def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
"""Return the schema name for the given schema item taking into
account current schema translate map.
"""
name = obj.schema
- schema_translate_map = self._execution_options.get(
- "schema_translate_map", None
- )
+ schema_translate_map: Optional[
+ Mapping[Optional[str], str]
+ ] = self._execution_options.get("schema_translate_map", None)
if (
schema_translate_map
@@ -178,13 +224,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
else:
return name
- def __enter__(self):
+ def __enter__(self) -> Connection:
return self
- def __exit__(self, type_, value, traceback):
+ def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
self.close()
- def execution_options(self, **opt):
+ def execution_options(self, **opt: Any) -> Connection:
r"""Set non-SQL options for the connection which take effect
during execution.
@@ -346,13 +392,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
ORM-specific execution options
""" # noqa
- self._execution_options = self._execution_options.union(opt)
if self._has_events or self.engine._has_events:
self.dispatch.set_connection_execution_options(self, opt)
+ self._execution_options = self._execution_options.union(opt)
self.dialect.set_connection_execution_options(self, opt)
return self
- def get_execution_options(self):
+ def get_execution_options(self) -> _ExecuteOptions:
"""Get the non-SQL options which will take effect during execution.
.. versionadded:: 1.3
@@ -364,14 +410,27 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return self._execution_options
@property
- def closed(self):
+ def _still_open_and_dbapi_connection_is_valid(self) -> bool:
+ pool_proxied_connection = self._dbapi_connection
+ return (
+ pool_proxied_connection is not None
+ and pool_proxied_connection.is_valid
+ )
+
+ @property
+ def closed(self) -> bool:
"""Return True if this connection is closed."""
return self._dbapi_connection is None and not self.__can_reconnect
@property
- def invalidated(self):
- """Return True if this connection was invalidated."""
+ def invalidated(self) -> bool:
+ """Return True if this connection was invalidated.
+
+ This does not indicate whether or not the connection was
+ invalidated at the pool level, however
+
+ """
# prior to 1.4, "invalid" was stored as a state independent of
# "closed", meaning an invalidated connection could be "closed",
@@ -382,10 +441,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
# "closed" does not need to be "invalid". So the state is now
# represented by the two facts alone.
- return self._dbapi_connection is None and not self.closed
+ pool_proxied_connection = self._dbapi_connection
+ return pool_proxied_connection is None and self.__can_reconnect
@property
- def connection(self) -> "PoolProxiedConnection":
+ def connection(self) -> PoolProxiedConnection:
"""The underlying DB-API connection managed by this Connection.
This is a SQLAlchemy connection-pool proxied connection
@@ -410,7 +470,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
else:
return self._dbapi_connection
- def get_isolation_level(self):
+ def get_isolation_level(self) -> str:
"""Return the current isolation level assigned to this
:class:`_engine.Connection`.
@@ -442,15 +502,15 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
- set per :class:`_engine.Connection` isolation level
"""
+ dbapi_connection = self.connection.dbapi_connection
+ assert dbapi_connection is not None
try:
- return self.dialect.get_isolation_level(
- self.connection.dbapi_connection
- )
+ return self.dialect.get_isolation_level(dbapi_connection)
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
@property
- def default_isolation_level(self):
+ def default_isolation_level(self) -> str:
"""The default isolation level assigned to this
:class:`_engine.Connection`.
@@ -482,7 +542,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
return self.dialect.default_isolation_level
- def _invalid_transaction(self):
+ def _invalid_transaction(self) -> NoReturn:
raise exc.PendingRollbackError(
"Can't reconnect until invalid %stransaction is rolled "
"back. Please rollback() fully before proceeding"
@@ -490,7 +550,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
code="8s2b",
)
- def _revalidate_connection(self):
+ def _revalidate_connection(self) -> PoolProxiedConnection:
if self.__can_reconnect and self.invalidated:
if self._transaction is not None:
self._invalid_transaction()
@@ -499,13 +559,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
raise exc.ResourceClosedError("This Connection is closed")
@property
- def _still_open_and_dbapi_connection_is_valid(self):
- return self._dbapi_connection is not None and getattr(
- self._dbapi_connection, "is_valid", False
- )
-
- @property
- def info(self):
+ def info(self) -> Dict[str, Any]:
"""Info dictionary associated with the underlying DBAPI connection
referred to by this :class:`_engine.Connection`, allowing user-defined
data to be associated with the connection.
@@ -518,7 +572,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return self.connection.info
- def invalidate(self, exception=None):
+ def invalidate(self, exception: Optional[BaseException] = None) -> None:
"""Invalidate the underlying DBAPI connection associated with
this :class:`_engine.Connection`.
@@ -567,14 +621,18 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if self.invalidated:
return
+ # MARKMARK
if self.closed:
raise exc.ResourceClosedError("This Connection is closed")
if self._still_open_and_dbapi_connection_is_valid:
- self._dbapi_connection.invalidate(exception)
+ pool_proxied_connection = self._dbapi_connection
+ assert pool_proxied_connection is not None
+ pool_proxied_connection.invalidate(exception)
+
self._dbapi_connection = None
- def detach(self):
+ def detach(self) -> None:
"""Detach the underlying DB-API connection from its connection pool.
E.g.::
@@ -600,13 +658,21 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
- self._dbapi_connection.detach()
+ if self.closed:
+ raise exc.ResourceClosedError("This Connection is closed")
- def _autobegin(self):
- if self._allow_autobegin:
+ pool_proxied_connection = self._dbapi_connection
+ if pool_proxied_connection is None:
+ raise exc.InvalidRequestError(
+ "Can't detach an invalidated Connection"
+ )
+ pool_proxied_connection.detach()
+
+ def _autobegin(self) -> None:
+ if self._allow_autobegin and not self.__in_begin:
self.begin()
- def begin(self):
+ def begin(self) -> RootTransaction:
"""Begin a transaction prior to autobegin occurring.
E.g.::
@@ -671,14 +737,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
:class:`_engine.Engine`
"""
- if self.__in_begin:
- # for dialects that emit SQL within the process of
- # dialect.do_begin() or dialect.do_begin_twophase(), this
- # flag prevents "autobegin" from being emitted within that
- # process, while allowing self._transaction to remain at None
- # until it's complete.
- return
- elif self._transaction is None:
+ if self._transaction is None:
self._transaction = RootTransaction(self)
return self._transaction
else:
@@ -689,7 +748,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"is called first."
)
- def begin_nested(self):
+ def begin_nested(self) -> NestedTransaction:
"""Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
handle that controls the scope of the SAVEPOINT.
@@ -765,7 +824,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return NestedTransaction(self)
- def begin_twophase(self, xid=None):
+ def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
"""Begin a two-phase or XA transaction and return a transaction
handle.
@@ -794,7 +853,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
xid = self.engine.dialect.create_xid()
return TwoPhaseTransaction(self, xid)
- def commit(self):
+ def commit(self) -> None:
"""Commit the transaction that is currently in progress.
This method commits the current transaction if one has been started.
@@ -819,7 +878,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if self._transaction:
self._transaction.commit()
- def rollback(self):
+ def rollback(self) -> None:
"""Roll back the transaction that is currently in progress.
This method rolls back the current transaction if one has been started.
@@ -845,33 +904,33 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if self._transaction:
self._transaction.rollback()
- def recover_twophase(self):
+ def recover_twophase(self) -> List[Any]:
return self.engine.dialect.do_recover_twophase(self)
- def rollback_prepared(self, xid, recover=False):
+ def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
- def commit_prepared(self, xid, recover=False):
+ def commit_prepared(self, xid: Any, recover: bool = False) -> None:
self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
- def in_transaction(self):
+ def in_transaction(self) -> bool:
"""Return True if a transaction is in progress."""
return self._transaction is not None and self._transaction.is_active
- def in_nested_transaction(self):
+ def in_nested_transaction(self) -> bool:
"""Return True if a transaction is in progress."""
return (
self._nested_transaction is not None
and self._nested_transaction.is_active
)
- def _is_autocommit(self):
- return (
+ def _is_autocommit_isolation(self) -> bool:
+ return bool(
self._execution_options.get("isolation_level", None)
== "AUTOCOMMIT"
)
- def get_transaction(self):
+ def get_transaction(self) -> Optional[RootTransaction]:
"""Return the current root transaction in progress, if any.
.. versionadded:: 1.4
@@ -880,7 +939,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return self._transaction
- def get_nested_transaction(self):
+ def get_nested_transaction(self) -> Optional[NestedTransaction]:
"""Return the current nested transaction in progress, if any.
.. versionadded:: 1.4
@@ -888,7 +947,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
return self._nested_transaction
- def _begin_impl(self, transaction):
+ def _begin_impl(self, transaction: RootTransaction) -> None:
if self._echo:
self._log_info("BEGIN (implicit)")
@@ -904,13 +963,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
finally:
self.__in_begin = False
- def _rollback_impl(self):
+ def _rollback_impl(self) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.rollback(self)
if self._still_open_and_dbapi_connection_is_valid:
if self._echo:
- if self._is_autocommit():
+ if self._is_autocommit_isolation():
self._log_info(
"ROLLBACK using DBAPI connection.rollback(), "
"DBAPI should ignore due to autocommit mode"
@@ -922,13 +981,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
- def _commit_impl(self):
+ def _commit_impl(self) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.commit(self)
if self._echo:
- if self._is_autocommit():
+ if self._is_autocommit_isolation():
self._log_info(
"COMMIT using DBAPI connection.commit(), "
"DBAPI should ignore due to autocommit mode"
@@ -940,58 +999,54 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
- def _savepoint_impl(self, name=None):
+ def _savepoint_impl(self, name: Optional[str] = None) -> str:
if self._has_events or self.engine._has_events:
self.dispatch.savepoint(self, name)
if name is None:
self.__savepoint_seq += 1
name = "sa_savepoint_%s" % self.__savepoint_seq
- if self._still_open_and_dbapi_connection_is_valid:
- self.engine.dialect.do_savepoint(self, name)
- return name
+ self.engine.dialect.do_savepoint(self, name)
+ return name
- def _rollback_to_savepoint_impl(self, name):
+ def _rollback_to_savepoint_impl(self, name: str) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.rollback_savepoint(self, name, None)
if self._still_open_and_dbapi_connection_is_valid:
self.engine.dialect.do_rollback_to_savepoint(self, name)
- def _release_savepoint_impl(self, name):
+ def _release_savepoint_impl(self, name: str) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.release_savepoint(self, name, None)
- if self._still_open_and_dbapi_connection_is_valid:
- self.engine.dialect.do_release_savepoint(self, name)
+ self.engine.dialect.do_release_savepoint(self, name)
- def _begin_twophase_impl(self, transaction):
+ def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
if self._echo:
self._log_info("BEGIN TWOPHASE (implicit)")
if self._has_events or self.engine._has_events:
self.dispatch.begin_twophase(self, transaction.xid)
- if self._still_open_and_dbapi_connection_is_valid:
- self.__in_begin = True
- try:
- self.engine.dialect.do_begin_twophase(self, transaction.xid)
- except BaseException as e:
- self._handle_dbapi_exception(e, None, None, None, None)
- finally:
- self.__in_begin = False
+ self.__in_begin = True
+ try:
+ self.engine.dialect.do_begin_twophase(self, transaction.xid)
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
+ finally:
+ self.__in_begin = False
- def _prepare_twophase_impl(self, xid):
+ def _prepare_twophase_impl(self, xid: Any) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.prepare_twophase(self, xid)
- if self._still_open_and_dbapi_connection_is_valid:
- assert isinstance(self._transaction, TwoPhaseTransaction)
- try:
- self.engine.dialect.do_prepare_twophase(self, xid)
- except BaseException as e:
- self._handle_dbapi_exception(e, None, None, None, None)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
+ try:
+ self.engine.dialect.do_prepare_twophase(self, xid)
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
- def _rollback_twophase_impl(self, xid, is_prepared):
+ def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.rollback_twophase(self, xid, is_prepared)
@@ -1004,18 +1059,17 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
- def _commit_twophase_impl(self, xid, is_prepared):
+ def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
if self._has_events or self.engine._has_events:
self.dispatch.commit_twophase(self, xid, is_prepared)
- if self._still_open_and_dbapi_connection_is_valid:
- assert isinstance(self._transaction, TwoPhaseTransaction)
- try:
- self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
- except BaseException as e:
- self._handle_dbapi_exception(e, None, None, None, None)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
+ try:
+ self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
- def close(self):
+ def close(self) -> None:
"""Close this :class:`_engine.Connection`.
This results in a release of the underlying database
@@ -1050,7 +1104,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
# as we just closed the transaction, close the connection
# pool connection without doing an additional reset
if skip_reset:
- conn._close_no_reset()
+ cast("_ConnectionFairy", conn)._close_no_reset()
else:
conn.close()
@@ -1061,7 +1115,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self._dbapi_connection = None
self.__can_reconnect = False
- def scalar(self, statement, parameters=None, execution_options=None):
+ def scalar(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Any:
r"""Executes a SQL statement construct and returns a scalar object.
This method is shorthand for invoking the
@@ -1074,7 +1133,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"""
return self.execute(statement, parameters, execution_options).scalar()
- def scalars(self, statement, parameters=None, execution_options=None):
+ def scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> ScalarResult:
"""Executes and returns a scalar result set, which yields scalar values
from the first column of each row.
@@ -1093,10 +1157,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
def execute(
self,
- statement,
- parameters: Optional[_ExecuteParams] = None,
- execution_options: Optional[_ExecuteOptions] = None,
- ):
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptionsParameter] = None,
+ ) -> Result:
r"""Executes a SQL statement construct and returns a
:class:`_engine.Result`.
@@ -1140,7 +1204,12 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
execution_options or NO_OPTIONS,
)
- def _execute_function(self, func, distilled_parameters, execution_options):
+ def _execute_function(
+ self,
+ func: FunctionElement[Any],
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Result:
"""Execute a sql.FunctionElement object."""
return self._execute_clauseelement(
@@ -1148,14 +1217,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
)
def _execute_default(
- self, default, distilled_parameters, execution_options
- ):
+ self,
+ default: ColumnDefault,
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Any:
"""Execute a schema.ColumnDefault object."""
execution_options = self._execution_options.merge_with(
execution_options
)
+ event_multiparams: Optional[_CoreMultiExecuteParams]
+ event_params: Optional[_CoreAnyExecuteParams]
+
# note for event handlers, the "distilled parameters" which is always
# a list of dicts is broken out into separate "multiparams" and
# "params" collections, which allows the handler to distinguish
@@ -1169,6 +1244,8 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
) = self._invoke_before_exec_event(
default, distilled_parameters, execution_options
)
+ else:
+ event_multiparams = event_params = None
try:
conn = self._dbapi_connection
@@ -1198,13 +1275,21 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return ret
- def _execute_ddl(self, ddl, distilled_parameters, execution_options):
+ def _execute_ddl(
+ self,
+ ddl: DDLElement,
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Result:
"""Execute a schema.DDL object."""
execution_options = ddl._execution_options.merge_with(
self._execution_options, execution_options
)
+ event_multiparams: Optional[_CoreMultiExecuteParams]
+ event_params: Optional[_CoreSingleExecuteParams]
+
if self._has_events or self.engine._has_events:
(
ddl,
@@ -1214,6 +1299,8 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
) = self._invoke_before_exec_event(
ddl, distilled_parameters, execution_options
)
+ else:
+ event_multiparams = event_params = None
exec_opts = self._execution_options.merge_with(execution_options)
schema_translate_map = exec_opts.get("schema_translate_map", None)
@@ -1243,8 +1330,19 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return ret
def _invoke_before_exec_event(
- self, elem, distilled_params, execution_options
- ):
+ self,
+ elem: Any,
+ distilled_params: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Tuple[
+ Any,
+ _CoreMultiExecuteParams,
+ _CoreMultiExecuteParams,
+ _CoreSingleExecuteParams,
+ ]:
+
+ event_multiparams: _CoreMultiExecuteParams
+ event_params: _CoreSingleExecuteParams
if len(distilled_params) == 1:
event_multiparams, event_params = [], distilled_params[0]
@@ -1275,8 +1373,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return elem, distilled_params, event_multiparams, event_params
def _execute_clauseelement(
- self, elem, distilled_parameters, execution_options
- ):
+ self,
+ elem: Executable,
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptions,
+ ) -> Result:
"""Execute a sql.ClauseElement object."""
execution_options = elem._execution_options.merge_with(
@@ -1309,7 +1410,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"schema_translate_map", None
)
- compiled_cache = execution_options.get(
+ compiled_cache: _CompiledCacheType = execution_options.get(
"compiled_cache", self.engine._compiled_cache
)
@@ -1346,10 +1447,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
def _execute_compiled(
self,
- compiled,
- distilled_parameters,
- execution_options=_EMPTY_EXECUTION_OPTS,
- ):
+ compiled: Compiled,
+ distilled_parameters: _CoreMultiExecuteParams,
+ execution_options: _ExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
+ ) -> Result:
"""Execute a sql.Compiled object.
TODO: why do we have this? likely deprecate or remove
@@ -1395,8 +1496,11 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
return ret
def exec_driver_sql(
- self, statement, parameters=None, execution_options=None
- ):
+ self,
+ statement: str,
+ parameters: Optional[_DBAPIAnyExecuteParams] = None,
+ execution_options: Optional[_ExecuteOptions] = None,
+ ) -> Result:
r"""Executes a SQL statement construct and returns a
:class:`_engine.CursorResult`.
@@ -1456,7 +1560,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
dialect,
dialect.execution_ctx_cls._init_statement,
statement,
- distilled_parameters,
+ None,
execution_options,
statement,
distilled_parameters,
@@ -1466,14 +1570,14 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
def _execute_context(
self,
- dialect,
- constructor,
- statement,
- parameters,
- execution_options,
- *args,
- **kw,
- ):
+ dialect: Dialect,
+ constructor: Callable[..., ExecutionContext],
+ statement: Union[str, Compiled],
+ parameters: Optional[_AnyMultiExecuteParams],
+ execution_options: _ExecuteOptions,
+ *args: Any,
+ **kw: Any,
+ ) -> Result:
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.CursorResult`."""
@@ -1491,7 +1595,6 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self._handle_dbapi_exception(
e, str(statement), parameters, None, None
)
- return # not reached
if (
self._transaction
@@ -1514,29 +1617,33 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if dialect.bind_typing is BindTyping.SETINPUTSIZES:
context._set_input_sizes()
- cursor, statement, parameters = (
+ cursor, str_statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
+ effective_parameters: Optional[_AnyExecuteParams]
+
if not context.executemany:
- parameters = parameters[0]
+ effective_parameters = parameters[0]
+ else:
+ effective_parameters = parameters
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
- statement, parameters = fn(
+ str_statement, effective_parameters = fn(
self,
cursor,
- statement,
- parameters,
+ str_statement,
+ effective_parameters,
context,
context.executemany,
)
if self._echo:
- self._log_info(statement)
+ self._log_info(str_statement)
stats = context._get_cache_stats()
@@ -1545,7 +1652,9 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
"[%s] %r",
stats,
sql_util._repr_params(
- parameters, batches=10, ismulti=context.executemany
+ effective_parameters,
+ batches=10,
+ ismulti=context.executemany,
),
)
else:
@@ -1554,45 +1663,61 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
% (stats,)
)
- evt_handled = False
+ evt_handled: bool = False
try:
if context.executemany:
+ effective_parameters = cast(
+ "_CoreMultiExecuteParams", effective_parameters
+ )
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
- if fn(cursor, statement, parameters, context):
+ if fn(
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
+ ):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
- cursor, statement, parameters, context
+ cursor, str_statement, effective_parameters, context
)
- elif not parameters and context.no_parameters:
+ elif not effective_parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
- if fn(cursor, statement, context):
+ if fn(cursor, str_statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
- cursor, statement, context
+ cursor, str_statement, context
)
else:
+ effective_parameters = cast(
+ "_CoreSingleExecuteParams", effective_parameters
+ )
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
- if fn(cursor, statement, parameters, context):
+ if fn(
+ cursor,
+ str_statement,
+ effective_parameters,
+ context,
+ ):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute(
- cursor, statement, parameters, context
+ cursor, str_statement, effective_parameters, context
)
if self._has_events or self.engine._has_events:
self.dispatch.after_cursor_execute(
self,
cursor,
- statement,
- parameters,
+ str_statement,
+ effective_parameters,
context,
context.executemany,
)
@@ -1603,12 +1728,18 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
except BaseException as e:
self._handle_dbapi_exception(
- e, statement, parameters, cursor, context
+ e, str_statement, effective_parameters, cursor, context
)
return result
- def _cursor_execute(self, cursor, statement, parameters, context=None):
+ def _cursor_execute(
+ self,
+ cursor: DBAPICursor,
+ statement: str,
+ parameters: _DBAPISingleExecuteParams,
+ context: Optional[ExecutionContext] = None,
+ ) -> None:
"""Execute a statement + params on the given cursor.
Adds appropriate logging and exception handling.
@@ -1648,7 +1779,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
self, cursor, statement, parameters, context, False
)
- def _safe_close_cursor(self, cursor):
+ def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
"""Close the given cursor, catching exceptions
and turning into log warnings.
@@ -1665,8 +1796,13 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
_is_disconnect = False
def _handle_dbapi_exception(
- self, e, statement, parameters, cursor, context
- ):
+ self,
+ e: BaseException,
+ statement: Optional[str],
+ parameters: Optional[_AnyExecuteParams],
+ cursor: Optional[DBAPICursor],
+ context: Optional[ExecutionContext],
+ ) -> NoReturn:
exc_info = sys.exc_info()
is_exit_exception = util.is_exit_exception(e)
@@ -1708,7 +1844,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
sqlalchemy_exception = exc.DBAPIError.instance(
statement,
parameters,
- e,
+ cast(Exception, e),
self.dialect.dbapi.Error,
hide_parameters=self.engine.hide_parameters,
connection_invalidated=self._is_disconnect,
@@ -1784,8 +1920,10 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if newraise:
raise newraise.with_traceback(exc_info[2]) from e
elif should_wrap:
+ assert sqlalchemy_exception is not None
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
else:
+ assert exc_info[1] is not None
raise exc_info[1].with_traceback(exc_info[2])
finally:
del self._reentrant_error
@@ -1793,15 +1931,20 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
del self._is_disconnect
if not self.invalidated:
dbapi_conn_wrapper = self._dbapi_connection
+ assert dbapi_conn_wrapper is not None
if invalidate_pool_on_disconnect:
self.engine.pool._invalidate(dbapi_conn_wrapper, e)
self.invalidate(e)
@classmethod
- def _handle_dbapi_exception_noconnection(cls, e, dialect, engine):
+ def _handle_dbapi_exception_noconnection(
+ cls, e: BaseException, dialect: Dialect, engine: Engine
+ ) -> NoReturn:
exc_info = sys.exc_info()
- is_disconnect = dialect.is_disconnect(e, None, None)
+ is_disconnect = isinstance(
+ e, dialect.dbapi.Error
+ ) and dialect.is_disconnect(e, None, None)
should_wrap = isinstance(e, dialect.dbapi.Error)
@@ -1809,7 +1952,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
sqlalchemy_exception = exc.DBAPIError.instance(
None,
None,
- e,
+ cast(Exception, e),
dialect.dbapi.Error,
hide_parameters=engine.hide_parameters,
connection_invalidated=is_disconnect,
@@ -1852,11 +1995,18 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
if newraise:
raise newraise.with_traceback(exc_info[2]) from e
elif should_wrap:
+ assert sqlalchemy_exception is not None
raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
else:
+ assert exc_info[1] is not None
raise exc_info[1].with_traceback(exc_info[2])
- def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
+ def _run_ddl_visitor(
+ self,
+ visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]],
+ element: DDLElement,
+ **kwargs: Any,
+ ) -> None:
"""run a DDL visitor.
This method is only here so that the MockConnection can change the
@@ -1871,16 +2021,16 @@ class ExceptionContextImpl(ExceptionContext):
def __init__(
self,
- exception,
- sqlalchemy_exception,
- engine,
- connection,
- cursor,
- statement,
- parameters,
- context,
- is_disconnect,
- invalidate_pool_on_disconnect,
+ exception: BaseException,
+ sqlalchemy_exception: Optional[exc.StatementError],
+ engine: Optional[Engine],
+ connection: Optional[Connection],
+ cursor: Optional[DBAPICursor],
+ statement: Optional[str],
+ parameters: Optional[_DBAPIAnyExecuteParams],
+ context: Optional[ExecutionContext],
+ is_disconnect: bool,
+ invalidate_pool_on_disconnect: bool,
):
self.engine = engine
self.connection = connection
@@ -1932,33 +2082,35 @@ class Transaction(TransactionalContext):
__slots__ = ()
- _is_root = False
+ _is_root: bool = False
+ is_active: bool
+ connection: Connection
- def __init__(self, connection):
+ def __init__(self, connection: Connection):
raise NotImplementedError()
@property
- def _deactivated_from_connection(self):
+ def _deactivated_from_connection(self) -> bool:
"""True if this transaction is totally deactivated from the connection
and therefore can no longer affect its state.
"""
raise NotImplementedError()
- def _do_close(self):
+ def _do_close(self) -> None:
raise NotImplementedError()
- def _do_rollback(self):
+ def _do_rollback(self) -> None:
raise NotImplementedError()
- def _do_commit(self):
+ def _do_commit(self) -> None:
raise NotImplementedError()
@property
- def is_valid(self):
+ def is_valid(self) -> bool:
return self.is_active and not self.connection.invalidated
- def close(self):
+ def close(self) -> None:
"""Close this :class:`.Transaction`.
If this transaction is the base transaction in a begin/commit
@@ -1974,7 +2126,7 @@ class Transaction(TransactionalContext):
finally:
assert not self.is_active
- def rollback(self):
+ def rollback(self) -> None:
"""Roll back this :class:`.Transaction`.
The implementation of this may vary based on the type of transaction in
@@ -1996,7 +2148,7 @@ class Transaction(TransactionalContext):
finally:
assert not self.is_active
- def commit(self):
+ def commit(self) -> None:
"""Commit this :class:`.Transaction`.
The implementation of this may vary based on the type of transaction in
@@ -2017,16 +2169,16 @@ class Transaction(TransactionalContext):
finally:
assert not self.is_active
- def _get_subject(self):
+ def _get_subject(self) -> Connection:
return self.connection
- def _transaction_is_active(self):
+ def _transaction_is_active(self) -> bool:
return self.is_active
- def _transaction_is_closed(self):
+ def _transaction_is_closed(self) -> bool:
return not self._deactivated_from_connection
- def _rollback_can_be_called(self):
+ def _rollback_can_be_called(self) -> bool:
# for RootTransaction / NestedTransaction, it's safe to call
# rollback() even if the transaction is deactive and no warnings
# will be emitted. tested in
@@ -2060,7 +2212,7 @@ class RootTransaction(Transaction):
__slots__ = ("connection", "is_active")
- def __init__(self, connection):
+ def __init__(self, connection: Connection):
assert connection._transaction is None
if connection._trans_context_manager:
TransactionalContext._trans_ctx_check(connection)
@@ -2070,7 +2222,7 @@ class RootTransaction(Transaction):
self.is_active = True
- def _deactivate_from_connection(self):
+ def _deactivate_from_connection(self) -> None:
if self.is_active:
assert self.connection._transaction is self
self.is_active = False
@@ -2079,19 +2231,19 @@ class RootTransaction(Transaction):
util.warn("transaction already deassociated from connection")
@property
- def _deactivated_from_connection(self):
+ def _deactivated_from_connection(self) -> bool:
return self.connection._transaction is not self
- def _connection_begin_impl(self):
+ def _connection_begin_impl(self) -> None:
self.connection._begin_impl(self)
- def _connection_rollback_impl(self):
+ def _connection_rollback_impl(self) -> None:
self.connection._rollback_impl()
- def _connection_commit_impl(self):
+ def _connection_commit_impl(self) -> None:
self.connection._commit_impl()
- def _close_impl(self, try_deactivate=False):
+ def _close_impl(self, try_deactivate: bool = False) -> None:
try:
if self.is_active:
self._connection_rollback_impl()
@@ -2107,13 +2259,13 @@ class RootTransaction(Transaction):
assert not self.is_active
assert self.connection._transaction is not self
- def _do_close(self):
+ def _do_close(self) -> None:
self._close_impl()
- def _do_rollback(self):
+ def _do_rollback(self) -> None:
self._close_impl(try_deactivate=True)
- def _do_commit(self):
+ def _do_commit(self) -> None:
if self.is_active:
assert self.connection._transaction is self
@@ -2176,7 +2328,9 @@ class NestedTransaction(Transaction):
__slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")
- def __init__(self, connection):
+ _savepoint: str
+
+ def __init__(self, connection: Connection):
assert connection._transaction is not None
if connection._trans_context_manager:
TransactionalContext._trans_ctx_check(connection)
@@ -2186,7 +2340,7 @@ class NestedTransaction(Transaction):
self._previous_nested = connection._nested_transaction
connection._nested_transaction = self
- def _deactivate_from_connection(self, warn=True):
+ def _deactivate_from_connection(self, warn: bool = True) -> None:
if self.connection._nested_transaction is self:
self.connection._nested_transaction = self._previous_nested
elif warn:
@@ -2195,10 +2349,10 @@ class NestedTransaction(Transaction):
)
@property
- def _deactivated_from_connection(self):
+ def _deactivated_from_connection(self) -> bool:
return self.connection._nested_transaction is not self
- def _cancel(self):
+ def _cancel(self) -> None:
# called by RootTransaction when the outer transaction is
# committed, rolled back, or closed to cancel all savepoints
# without any action being taken
@@ -2207,9 +2361,15 @@ class NestedTransaction(Transaction):
if self._previous_nested:
self._previous_nested._cancel()
- def _close_impl(self, deactivate_from_connection, warn_already_deactive):
+ def _close_impl(
+ self, deactivate_from_connection: bool, warn_already_deactive: bool
+ ) -> None:
try:
- if self.is_active and self.connection._transaction.is_active:
+ if (
+ self.is_active
+ and self.connection._transaction
+ and self.connection._transaction.is_active
+ ):
self.connection._rollback_to_savepoint_impl(self._savepoint)
finally:
self.is_active = False
@@ -2221,13 +2381,13 @@ class NestedTransaction(Transaction):
if deactivate_from_connection:
assert self.connection._nested_transaction is not self
- def _do_close(self):
+ def _do_close(self) -> None:
self._close_impl(True, False)
- def _do_rollback(self):
+ def _do_rollback(self) -> None:
self._close_impl(True, True)
- def _do_commit(self):
+ def _do_commit(self) -> None:
if self.is_active:
try:
self.connection._release_savepoint_impl(self._savepoint)
@@ -2261,12 +2421,14 @@ class TwoPhaseTransaction(RootTransaction):
__slots__ = ("xid", "_is_prepared")
- def __init__(self, connection, xid):
+ xid: Any
+
+ def __init__(self, connection: Connection, xid: Any):
self._is_prepared = False
self.xid = xid
super(TwoPhaseTransaction, self).__init__(connection)
- def prepare(self):
+ def prepare(self) -> None:
"""Prepare this :class:`.TwoPhaseTransaction`.
After a PREPARE, the transaction can be committed.
@@ -2277,13 +2439,13 @@ class TwoPhaseTransaction(RootTransaction):
self.connection._prepare_twophase_impl(self.xid)
self._is_prepared = True
- def _connection_begin_impl(self):
+ def _connection_begin_impl(self) -> None:
self.connection._begin_twophase_impl(self)
- def _connection_rollback_impl(self):
+ def _connection_rollback_impl(self) -> None:
self.connection._rollback_twophase_impl(self.xid, self._is_prepared)
- def _connection_commit_impl(self):
+ def _connection_commit_impl(self) -> None:
self.connection._commit_twophase_impl(self.xid, self._is_prepared)
@@ -2310,17 +2472,23 @@ class Engine(
"""
- _execution_options = _EMPTY_EXECUTION_OPTS
- _has_events = False
- _connection_cls = Connection
- _sqla_logger_namespace = "sqlalchemy.engine.Engine"
- _is_future = False
+ dispatch: dispatcher[ConnectionEventsTarget]
- _schema_translate_map = None
+ _compiled_cache: Optional[_CompiledCacheType]
+
+ _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
+ _has_events: bool = False
+ _connection_cls: Type[Connection] = Connection
+ _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
+ _is_future: bool = False
+
+ _schema_translate_map: Optional[_SchemaTranslateMapType] = None
+ _option_cls: Type[OptionEngine]
dialect: Dialect
pool: Pool
url: URL
+ hide_parameters: bool
def __init__(
self,
@@ -2328,7 +2496,7 @@ class Engine(
dialect: Dialect,
url: URL,
logging_name: Optional[str] = None,
- echo: Union[None, str, bool] = None,
+ echo: Optional[_EchoFlagType] = None,
query_cache_size: int = 500,
execution_options: Optional[Mapping[str, Any]] = None,
hide_parameters: bool = False,
@@ -2350,7 +2518,7 @@ class Engine(
if execution_options:
self.update_execution_options(**execution_options)
- def _lru_size_alert(self, cache):
+ def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
if self._should_log_info:
self.logger.info(
"Compiled cache size pruning from %d items to %d. "
@@ -2360,10 +2528,10 @@ class Engine(
)
@property
- def engine(self):
+ def engine(self) -> Engine:
return self
- def clear_compiled_cache(self):
+ def clear_compiled_cache(self) -> None:
"""Clear the compiled cache associated with the dialect.
This applies **only** to the built-in cache that is established
@@ -2377,7 +2545,7 @@ class Engine(
if self._compiled_cache:
self._compiled_cache.clear()
- def update_execution_options(self, **opt):
+ def update_execution_options(self, **opt: Any) -> None:
r"""Update the default execution_options dictionary
of this :class:`_engine.Engine`.
@@ -2394,11 +2562,11 @@ class Engine(
:meth:`_engine.Engine.execution_options`
"""
- self._execution_options = self._execution_options.union(opt)
self.dispatch.set_engine_execution_options(self, opt)
+ self._execution_options = self._execution_options.union(opt)
self.dialect.set_engine_execution_options(self, opt)
- def execution_options(self, **opt):
+ def execution_options(self, **opt: Any) -> OptionEngine:
"""Return a new :class:`_engine.Engine` that will provide
:class:`_engine.Connection` objects with the given execution options.
@@ -2478,7 +2646,7 @@ class Engine(
""" # noqa E501
return self._option_cls(self, opt)
- def get_execution_options(self):
+ def get_execution_options(self) -> _ExecuteOptions:
"""Get the non-SQL options which will take effect during execution.
.. versionadded: 1.3
@@ -2490,14 +2658,14 @@ class Engine(
return self._execution_options
@property
- def name(self):
+ def name(self) -> str:
"""String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
in use by this :class:`Engine`."""
return self.dialect.name
@property
- def driver(self):
+ def driver(self) -> str:
"""Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
in use by this :class:`Engine`."""
@@ -2505,10 +2673,10 @@ class Engine(
echo = log.echo_property()
- def __repr__(self):
+ def __repr__(self) -> str:
return "Engine(%r)" % (self.url,)
- def dispose(self):
+ def dispose(self) -> None:
"""Dispose of the connection pool used by this
:class:`_engine.Engine`.
@@ -2538,7 +2706,9 @@ class Engine(
self.dispatch.engine_disposed(self)
@contextlib.contextmanager
- def _optional_conn_ctx_manager(self, connection=None):
+ def _optional_conn_ctx_manager(
+ self, connection: Optional[Connection] = None
+ ) -> Iterator[Connection]:
if connection is None:
with self.connect() as conn:
yield conn
@@ -2546,7 +2716,7 @@ class Engine(
yield connection
@contextlib.contextmanager
- def begin(self):
+ def begin(self) -> Iterator[Connection]:
"""Return a context manager delivering a :class:`_engine.Connection`
with a :class:`.Transaction` established.
@@ -2576,11 +2746,16 @@ class Engine(
with conn.begin():
yield conn
- def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
+ def _run_ddl_visitor(
+ self,
+ visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]],
+ element: DDLElement,
+ **kwargs: Any,
+ ) -> None:
with self.begin() as conn:
conn._run_ddl_visitor(visitorcallable, element, **kwargs)
- def connect(self):
+ def connect(self) -> Connection:
"""Return a new :class:`_engine.Connection` object.
The :class:`_engine.Connection` acts as a Python context manager, so
@@ -2605,7 +2780,7 @@ class Engine(
return self._connection_cls(self)
- def raw_connection(self):
+ def raw_connection(self) -> PoolProxiedConnection:
"""Return a "raw" DBAPI connection from the connection pool.
The returned object is a proxied version of the DBAPI
@@ -2630,10 +2805,20 @@ class Engine(
return self.pool.connect()
-class OptionEngineMixin:
+class OptionEngineMixin(log.Identified):
_sa_propagate_class_events = False
- def __init__(self, proxied, execution_options):
+ dispatch: dispatcher[ConnectionEventsTarget]
+ _compiled_cache: Optional[_CompiledCacheType]
+ dialect: Dialect
+ pool: Pool
+ url: URL
+ hide_parameters: bool
+ echo: log.echo_property
+
+ def __init__(
+ self, proxied: Engine, execution_options: _ExecuteOptionsParameter
+ ):
self._proxied = proxied
self.url = proxied.url
self.dialect = proxied.dialect
@@ -2660,27 +2845,34 @@ class OptionEngineMixin:
self._execution_options = proxied._execution_options
self.update_execution_options(**execution_options)
- def _get_pool(self):
- return self._proxied.pool
+ def update_execution_options(self, **opt: Any) -> None:
+ raise NotImplementedError()
- def _set_pool(self, pool):
- self._proxied.pool = pool
+ if not typing.TYPE_CHECKING:
+ # https://github.com/python/typing/discussions/1095
- pool = property(_get_pool, _set_pool)
+ @property
+ def pool(self) -> Pool:
+ return self._proxied.pool
- def _get_has_events(self):
- return self._proxied._has_events or self.__dict__.get(
- "_has_events", False
- )
+ @pool.setter
+ def pool(self, pool: Pool) -> None:
+ self._proxied.pool = pool
- def _set_has_events(self, value):
- self.__dict__["_has_events"] = value
+ @property
+ def _has_events(self) -> bool:
+ return self._proxied._has_events or self.__dict__.get(
+ "_has_events", False
+ )
- _has_events = property(_get_has_events, _set_has_events)
+ @_has_events.setter
+ def _has_events(self, value: bool) -> None:
+ self.__dict__["_has_events"] = value
class OptionEngine(OptionEngineMixin, Engine):
- pass
+ def update_execution_options(self, **opt: Any) -> None:
+ Engine.update_execution_options(self, **opt)
Engine._option_cls = OptionEngine