summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/horizontal_shard.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-05-25 22:36:44 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-05-28 14:38:56 -0400
commit77f1b7d236dba6b1c859bb428ef32d118ec372e6 (patch)
tree7fae8eaaf303d6ce02bd423abf216550001e2f7b /lib/sqlalchemy/ext/horizontal_shard.py
parent366e88ea0e5c5417184c1dd4776cff752560631d (diff)
downloadsqlalchemy-77f1b7d236dba6b1c859bb428ef32d118ec372e6.tar.gz
callcount reductions and refinement for cached queries
This commit includes that we've removed the "_orm_query" attribute from compile state as well as query context. The attribute created reference cycles and also added method call overhead. As part of this change, the interface for ORMExecuteState changes a bit, as well as the interface for the horizontal sharding extension which now deprecates the "query_chooser" callable in favor of "execute_chooser", which receives the contextual object. This will also work more nicely when we implement the new execution path for bulk updates and deletes. Pre-merge execution options for statement, connection, arguments all up front in Connection. that way they can be passed to the before_execute / after_execute events, and the ExecutionContext doesn't have to merge as second time. Core execute is pretty close to 1.3 now. baked wasn't using the new one()/first()/one_or_none() methods, fixed that. Convert non-buffered cursor strategy to be a stateless singleton. inline all the paths by which the strategy gets chosen, oracle and SQL Server dialects make use of the already-invoked post_exec() hook to establish the alternate strategies, and this is actually much nicer than it was before. Add caching to mapper instance processor for getters. Identified a reference cycle per query that was showing up as a lot of gc cleanup, fixed that. After all that, performance not budging much. Even test_baked_query now runs with significantly fewer function calls than 1.3, still 40% slower. Basically something about the new patterns just makes this slower and while I've walked a whole bunch of them back, it hardly makes a dent. that said, the performance issues are relatively small, in the 20-40% time increase range, and the new caching feature does provide for regular ORM and Core queries that are cached, and they are faster than non-cached. Change-Id: I7b0b0d8ca550c05f79e82f75cd8eff0bbfade053
Diffstat (limited to 'lib/sqlalchemy/ext/horizontal_shard.py')
-rw-r--r--lib/sqlalchemy/ext/horizontal_shard.py53
1 files changed, 38 insertions, 15 deletions
diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py
index 1375a24cd..c3ac71c10 100644
--- a/lib/sqlalchemy/ext/horizontal_shard.py
+++ b/lib/sqlalchemy/ext/horizontal_shard.py
@@ -15,8 +15,10 @@ the source distribution.
"""
-from sqlalchemy import event
+from .. import event
+from .. import exc
from .. import inspect
+from .. import util
from ..orm.query import Query
from ..orm.session import Session
@@ -28,6 +30,7 @@ class ShardedQuery(Query):
super(ShardedQuery, self).__init__(*args, **kwargs)
self.id_chooser = self.session.id_chooser
self.query_chooser = self.session.query_chooser
+ self.execute_chooser = self.session.execute_chooser
self._shard_id = None
def set_shard(self, shard_id):
@@ -45,10 +48,7 @@ class ShardedQuery(Query):
)
"""
-
- q = self._clone()
- q._shard_id = shard_id
- return q
+ return self.execution_options(_sa_shard_id=shard_id)
def _execute_crud(self, stmt, mapper):
def exec_for_shard(shard_id):
@@ -68,7 +68,8 @@ class ShardedQuery(Query):
else:
rowcount = 0
results = []
- for shard_id in self.query_chooser(self):
+ # TODO: this will have to be the new object
+ for shard_id in self.execute_chooser(self):
result = exec_for_shard(shard_id)
rowcount += result.rowcount
results.append(result)
@@ -107,7 +108,7 @@ class ShardedSession(Session):
self,
shard_chooser,
id_chooser,
- query_chooser,
+ execute_chooser=None,
shards=None,
query_cls=ShardedQuery,
**kwargs
@@ -125,14 +126,19 @@ class ShardedSession(Session):
values, which should return a list of shard ids where the ID might
reside. The databases will be queried in the order of this listing.
- :param query_chooser: For a given Query, returns the list of shard_ids
+ :param execute_chooser: For a given :class:`.ORMExecuteState`,
+ returns the list of shard_ids
where the query should be issued. Results from all shards returned
will be combined together into a single listing.
+ .. versionchanged:: 1.4 The ``execute_chooser`` paramter
+ supersedes the ``query_chooser`` parameter.
+
:param shards: A dictionary of string shard names
to :class:`~sqlalchemy.engine.Engine` objects.
"""
+ query_chooser = kwargs.pop("query_chooser", None)
super(ShardedSession, self).__init__(query_cls=query_cls, **kwargs)
event.listen(
@@ -140,6 +146,25 @@ class ShardedSession(Session):
)
self.shard_chooser = shard_chooser
self.id_chooser = id_chooser
+
+ if query_chooser:
+ util.warn_deprecated(
+ "The ``query_choser`` parameter is deprecated; "
+ "please use ``execute_chooser``.",
+ "1.4",
+ )
+ if execute_chooser:
+ raise exc.ArgumentError(
+ "Can't pass query_chooser and execute_chooser "
+ "at the same time."
+ )
+
+ def execute_chooser(orm_context):
+ return query_chooser(orm_context.statement)
+
+ self.execute_chooser = execute_chooser
+ else:
+ self.execute_chooser = execute_chooser
self.query_chooser = query_chooser
self.__binds = {}
if shards is not None:
@@ -241,13 +266,13 @@ def execute_and_instances(orm_context):
load_options = orm_context.load_options
session = orm_context.session
- orm_query = orm_context.orm_query
+ # orm_query = orm_context.orm_query
if params is None:
params = load_options._params
def iter_for_shard(shard_id, load_options):
- execution_options = dict(orm_context.execution_options)
+ execution_options = dict(orm_context.local_execution_options)
bind_arguments = dict(orm_context.bind_arguments)
bind_arguments["_horizontal_shard"] = True
@@ -265,8 +290,8 @@ def execute_and_instances(orm_context):
if load_options._refresh_identity_token is not None:
shard_id = load_options._refresh_identity_token
- elif orm_query is not None and orm_query._shard_id is not None:
- shard_id = orm_query._shard_id
+ elif "_sa_shard_id" in orm_context.merged_execution_options:
+ shard_id = orm_context.merged_execution_options["_sa_shard_id"]
elif "shard_id" in orm_context.bind_arguments:
shard_id = orm_context.bind_arguments["shard_id"]
else:
@@ -276,9 +301,7 @@ def execute_and_instances(orm_context):
return iter_for_shard(shard_id, load_options)
else:
partial = []
- for shard_id in session.query_chooser(
- orm_query if orm_query is not None else orm_context.statement
- ):
+ for shard_id in session.execute_chooser(orm_context):
result_ = iter_for_shard(shard_id, load_options)
partial.append(result_)