summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-12-18 16:33:22 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2022-12-26 13:48:55 -0500
commit4338213935b4133e36d593ceec75f7fe36c13f66 (patch)
tree526982c75061dfe2fea8d0c0f5093a4ebdadf587 /lib/sqlalchemy/ext
parentce8c0013169bdbe377ca21389f85051525814264 (diff)
downloadsqlalchemy-4338213935b4133e36d593ceec75f7fe36c13f66.tar.gz
reorganize pre_session_exec around do_orm_execute
Allow do_orm_execute() events to both receive the complete state of bind_argments, load_options, update_delete_options as they do already, but also allow them to *change* all those things via new execution options. Options like autoflush, populate_existing etc. can now be updated within a do_orm_execute() hook and those changes will take effect all the way through. Took a few tries to get something that covers every case here, in particular horizontal sharding which is consuming those options as well as using context.invoke(), without excess complexity. The good news seems to be that a simple reorg and replacing the "reentrant" boolean with "is this before do_orm_execute is invoked" was all that was needed. As part of this we add a new "identity_token" option allowing this option to be controlled from do_orm_execute() as well as from the outside. WIP Fixes: #7837 Change-Id: I087728215edec8d1b1712322ab389e3f52ff76ba
Diffstat (limited to 'lib/sqlalchemy/ext')
-rw-r--r--lib/sqlalchemy/ext/horizontal_shard.py138
1 files changed, 80 insertions, 58 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py
index 69767ad6c..fd53c6046 100644
--- a/lib/sqlalchemy/ext/horizontal_shard.py
+++ b/lib/sqlalchemy/ext/horizontal_shard.py
@@ -13,11 +13,14 @@ distribute queries and persistence operations across multiple databases.
For a usage example, see the :ref:`examples_sharding` example included in
the source distribution.
-.. legacy:: The horizontal sharding API is not fully updated for the
- SQLAlchemy 2.0 API, and still relies in part on the
- legacy :class:`.Query` architecture, in particular as part of the
- signature for the :paramref:`.ShardedSession.id_chooser` parameter.
- This may change in a future release.
+.. deepalchemy:: The horizontal sharding extension is an advanced feature,
+ involving a complex statement -> database interaction as well as
+ use of semi-public APIs for non-trivial cases. Simpler approaches to
+ refering to multiple database "shards", most commonly using a distinct
+ :class:`_orm.Session` per "shard", should always be considered first
+ before using this more complex and less-production-tested system.
+
+
"""
from __future__ import annotations
@@ -38,8 +41,11 @@ from .. import exc
from .. import inspect
from .. import util
from ..orm import PassiveFlag
+from ..orm._typing import OrmExecuteOptionsParameter
from ..orm.mapper import Mapper
from ..orm.query import Query
+from ..orm.session import _BindArguments
+from ..orm.session import _PKIdentityArgument
from ..orm.session import Session
from ..util.typing import Protocol
@@ -80,6 +86,20 @@ class ShardChooser(Protocol):
...
+class IdentityChooser(Protocol):
+ def __call__(
+ self,
+ mapper: Mapper[_T],
+ primary_key: _PKIdentityArgument,
+ *,
+ lazy_loaded_from: Optional[InstanceState[Any]],
+ execution_options: OrmExecuteOptionsParameter,
+ bind_arguments: _BindArguments,
+ **kw: Any,
+ ) -> Any:
+ ...
+
+
class ShardedQuery(Query[_T]):
"""Query class used with :class:`.ShardedSession`.
@@ -94,8 +114,7 @@ class ShardedQuery(Query[_T]):
super().__init__(*args, **kwargs)
assert isinstance(self.session, ShardedSession)
- self.id_chooser = self.session.id_chooser
- self.query_chooser = self.session.query_chooser
+ self.identity_chooser = self.session.identity_chooser
self.execute_chooser = self.session.execute_chooser
self._shard_id = None
@@ -119,19 +138,22 @@ class ShardedQuery(Query[_T]):
class ShardedSession(Session):
shard_chooser: ShardChooser
- id_chooser: Callable[[Query[Any], Iterable[Any]], Iterable[Any]]
+ identity_chooser: IdentityChooser
execute_chooser: Callable[[ORMExecuteState], Iterable[Any]]
def __init__(
self,
shard_chooser: ShardChooser,
- id_chooser: Callable[[Query[_T], Iterable[_T]], Iterable[Any]],
+ identity_chooser: Optional[IdentityChooser] = None,
execute_chooser: Optional[
Callable[[ORMExecuteState], Iterable[Any]]
] = None,
shards: Optional[Dict[str, Any]] = None,
query_cls: Type[Query[_T]] = ShardedQuery,
*,
+ id_chooser: Optional[
+ Callable[[Query[_T], Iterable[_T]], Iterable[Any]]
+ ] = None,
query_chooser: Optional[Callable[[Executable], Iterable[Any]]] = None,
**kwargs: Any,
) -> None:
@@ -171,12 +193,41 @@ class ShardedSession(Session):
self, "do_orm_execute", execute_and_instances, retval=True
)
self.shard_chooser = shard_chooser
- self.id_chooser = id_chooser
+
+ if id_chooser:
+ _id_chooser = id_chooser
+ util.warn_deprecated(
+ "The ``id_chooser`` parameter is deprecated; "
+ "please use ``identity_chooser``.",
+ "2.0",
+ )
+
+ def _legacy_identity_chooser(
+ mapper: Mapper[_T],
+ primary_key: _PKIdentityArgument,
+ *,
+ lazy_loaded_from: Optional[InstanceState[Any]],
+ execution_options: OrmExecuteOptionsParameter,
+ bind_arguments: _BindArguments,
+ **kw: Any,
+ ) -> Any:
+ q = self.query(mapper)
+ if lazy_loaded_from:
+ q = q._set_lazyload_from(lazy_loaded_from)
+ return _id_chooser(q, primary_key)
+
+ self.identity_chooser = _legacy_identity_chooser
+ elif identity_chooser:
+ self.identity_chooser = identity_chooser
+ else:
+ raise exc.ArgumentError(
+ "identity_chooser or id_chooser is required"
+ )
if query_chooser:
_query_chooser = query_chooser
util.warn_deprecated(
- "The ``query_choser`` parameter is deprecated; "
+ "The ``query_chooser`` parameter is deprecated; "
"please use ``execute_chooser``.",
"1.4",
)
@@ -199,7 +250,6 @@ class ShardedSession(Session):
"execute_chooser or query_chooser is required"
)
self.execute_chooser = execute_chooser
- self.query_chooser = query_chooser
self.__shards: Dict[_ShardKey, _SessionBind] = {}
if shards is not None:
for k in shards:
@@ -212,6 +262,8 @@ class ShardedSession(Session):
identity_token: Optional[Any] = None,
passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
lazy_loaded_from: Optional[InstanceState[Any]] = None,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
**kw: Any,
) -> Union[Optional[_O], LoaderCallableStatus]:
"""override the default :meth:`.Session._identity_lookup` method so
@@ -233,10 +285,13 @@ class ShardedSession(Session):
return obj
else:
- q = self.query(mapper)
- if lazy_loaded_from:
- q = q._set_lazyload_from(lazy_loaded_from)
- for shard_id in self.id_chooser(q, primary_key_identity):
+ for shard_id in self.identity_chooser(
+ mapper,
+ primary_key_identity,
+ lazy_loaded_from=lazy_loaded_from,
+ execution_options=execution_options,
+ bind_arguments=dict(bind_arguments) if bind_arguments else {},
+ ):
obj2 = super()._identity_lookup(
mapper,
primary_key_identity,
@@ -325,11 +380,6 @@ class ShardedSession(Session):
def execute_and_instances(
orm_context: ORMExecuteState,
) -> Union[Result[_T], IteratorResult[_TP]]:
- update_options: Union[
- None,
- BulkUDCompileState.default_update_options,
- Type[BulkUDCompileState.default_update_options],
- ]
active_options: Union[
None,
QueryContext.default_load_options,
@@ -337,58 +387,30 @@ def execute_and_instances(
BulkUDCompileState.default_update_options,
Type[BulkUDCompileState.default_update_options],
]
- load_options: Union[
- None,
- QueryContext.default_load_options,
- Type[QueryContext.default_load_options],
- ]
if orm_context.is_select:
- load_options = active_options = orm_context.load_options
- update_options = None
+ active_options = orm_context.load_options
elif orm_context.is_update or orm_context.is_delete:
- load_options = None
- update_options = active_options = orm_context.update_delete_options
+ active_options = orm_context.update_delete_options
else:
- load_options = update_options = active_options = None
+ active_options = None
session = orm_context.session
assert isinstance(session, ShardedSession)
def iter_for_shard(
shard_id: str,
- load_options: Union[
- None,
- QueryContext.default_load_options,
- Type[QueryContext.default_load_options],
- ],
- update_options: Union[
- None,
- BulkUDCompileState.default_update_options,
- Type[BulkUDCompileState.default_update_options],
- ],
) -> Union[Result[_T], IteratorResult[_TP]]:
- execution_options = dict(orm_context.local_execution_options)
bind_arguments = dict(orm_context.bind_arguments)
bind_arguments["shard_id"] = shard_id
- if orm_context.is_select:
- assert load_options is not None
- load_options += {"_refresh_identity_token": shard_id}
- execution_options["_sa_orm_load_options"] = load_options
- elif orm_context.is_update or orm_context.is_delete:
- assert update_options is not None
- update_options += {"_refresh_identity_token": shard_id}
- execution_options["_sa_orm_update_options"] = update_options
-
- return orm_context.invoke_statement(
- bind_arguments=bind_arguments, execution_options=execution_options
- )
+ orm_context.update_execution_options(identity_token=shard_id)
+ return orm_context.invoke_statement(bind_arguments=bind_arguments)
- if active_options and active_options._refresh_identity_token is not None:
- shard_id = active_options._refresh_identity_token
+ if active_options and active_options._identity_token is not None:
+ shard_id = active_options._identity_token
elif "_sa_shard_id" in orm_context.execution_options:
shard_id = orm_context.execution_options["_sa_shard_id"]
elif "shard_id" in orm_context.bind_arguments:
@@ -397,10 +419,10 @@ def execute_and_instances(
shard_id = None
if shard_id is not None:
- return iter_for_shard(shard_id, load_options, update_options)
+ return iter_for_shard(shard_id)
else:
partial = []
for shard_id in session.execute_chooser(orm_context):
- result_ = iter_for_shard(shard_id, load_options, update_options)
+ result_ = iter_for_shard(shard_id)
partial.append(result_)
return partial[0].merge(*partial[1:])