author     Mike Bayer <mike_mp@zzzcomputing.com>  2020-04-24 15:34:19 -0400
committer  Mike Bayer <mike_mp@zzzcomputing.com>  2020-05-02 11:18:13 -0400
commit     bbf644862ab05734d153d74abf59aa3492278563 (patch)
tree       0a563147603ff2906422ed1b07e9f5f03e7584c8 /lib/sqlalchemy/orm/loading.py
parent     7acf9af1ce74a0bda4c4d29af7da543b5c42b3f8 (diff)
download   sqlalchemy-bbf644862ab05734d153d74abf59aa3492278563.tar.gz
Integrate new Result into ORM query
The next step in the 2.0 ORM changes is to have the ORM integrate fully with the new Result object. This patch uses Result to represent ORM objects rather than lists; public API to get at this Result is not added yet.

The dogpile.cache and horizontal sharding recipes/extensions have small adjustments to accommodate this change.

Call counts have fluctuated, some slightly better and some slightly worse. A few have gone up by a bit; however, as the codebase is still in flux, it is anticipated that there will be some performance gains later on, as ORM fetching is refined to no longer need to accommodate extensive aliasing. The addition of caching will then change the entire story.

References: #5087
References: #4395

Change-Id: If1a23824ffb77d8d58cf2338cf35dd6b5963b17f
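
The heart of the change is in instances(), shown in the diff below: rather than yielding rows, it now builds a ChunkedIteratorResult over a chunks() generator and returns it. A minimal sketch of that engine-level pattern follows, using stand-in data; the User class, rows, and labels here are illustrative and not part of the patch:

    from sqlalchemy.engine.result import (
        ChunkedIteratorResult,
        SimpleResultMetaData,
    )

    class User:
        # hypothetical stand-in for a mapped class
        pass

    u1, u2 = User(), User()

    def chunks(size):
        # stand-in for the ORM fetch loop: each iteration yields a list
        # of row tuples, as instances() now does per cursor fetch
        yield [(u1,), (u1,), (u2,)]

    # uniquing by id() mirrors what the patch does for entities that
    # have use_id_for_hash set
    metadata = SimpleResultMetaData(["User"], _unique_filters=[id])

    result = ChunkedIteratorResult(metadata, chunks)

    # the same chained calls the patch applies at the end of instances()
    print(result.scalars().unique().all())  # the two distinct objects u1, u2
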
Diffstat (limited to 'lib/sqlalchemy/orm/loading.py')
-rw-r--r--  lib/sqlalchemy/orm/loading.py  100
1 file changed, 62 insertions, 38 deletions
diff --git a/lib/sqlalchemy/orm/loading.py b/lib/sqlalchemy/orm/loading.py
index d781df980..10d937945 100644
--- a/lib/sqlalchemy/orm/loading.py
+++ b/lib/sqlalchemy/orm/loading.py
@@ -29,9 +29,11 @@ from .util import state_str
 from .. import exc as sa_exc
 from .. import util
 from ..engine import result_tuple
+from ..engine.result import ChunkedIteratorResult
+from ..engine.result import FrozenResult
+from ..engine.result import SimpleResultMetaData
 from ..sql import util as sql_util
-
 
 _new_runid = util.counter()
@@ -41,20 +43,7 @@ def instances(query, cursor, context):
     context.runid = _new_runid()
     context.post_load_paths = {}
 
-    filtered = query._has_mapper_entities
-
-    single_entity = query.is_single_entity
-
-    if filtered:
-        if single_entity:
-            filter_fn = id
-        else:
-
-            def filter_fn(row):
-                return tuple(
-                    id(item) if ent.use_id_for_hash else item
-                    for ent, item in zip(query._entities, row)
-                )
+    single_entity = context.is_single_entity
 
     try:
         (process, labels, extra) = list(
@@ -66,42 +55,66 @@ def instances(query, cursor, context):
             )
         )
 
-        if not single_entity:
-            keyed_tuple = result_tuple(labels, extra)
+        if query._yield_per and (
+            context.loaders_require_buffering
+            or context.loaders_require_uniquing
+        ):
+            raise sa_exc.InvalidRequestError(
+                "Can't use yield_per with eager loaders that require uniquing "
+                "or row buffering, e.g. joinedload() against collections "
+                "or subqueryload(). Consider the selectinload() strategy "
+                "for better flexibility in loading objects."
+            )
+
+    except Exception:
+        with util.safe_reraise():
+            cursor.close()
+
+    row_metadata = SimpleResultMetaData(
+        labels,
+        extra,
+        _unique_filters=[
+            id if ent.use_id_for_hash else None for ent in query._entities
+        ],
+    )
+
+    def chunks(size):
         while True:
+            yield_per = size
+
             context.partials = {}
 
-            if query._yield_per:
-                fetch = cursor.fetchmany(query._yield_per)
+            if yield_per:
+                fetch = cursor.fetchmany(yield_per)
 
                 if not fetch:
                     break
             else:
                 fetch = cursor.fetchall()
 
-            if single_entity:
-                proc = process[0]
-                rows = [proc(row) for row in fetch]
-            else:
-                rows = [
-                    keyed_tuple([proc(row) for proc in process])
-                    for row in fetch
-                ]
+            rows = [tuple([proc(row) for proc in process]) for row in fetch]
 
             for path, post_load in context.post_load_paths.items():
                 post_load.invoke(context, path)
 
-            if filtered:
-                rows = util.unique_list(rows, filter_fn)
+            yield rows
 
-            for row in rows:
-                yield row
-
-            if not query._yield_per:
+            if not yield_per:
                 break
 
-    except Exception:
-        with util.safe_reraise():
-            cursor.close()
+
+    result = ChunkedIteratorResult(row_metadata, chunks)
+    if query._yield_per:
+        result.yield_per(query._yield_per)
+
+    if single_entity:
+        result = result.scalars()
+
+    # filtered = context.loaders_require_uniquing
+    filtered = query._has_mapper_entities
+
+    if filtered:
+        result = result.unique()
+
+    return result
@util.preload_module("sqlalchemy.orm.query")
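
The chunks() generator added above receives its batch size from the Result: yield_per() re-drives chunks() with the given size, and partitions() then surfaces each fetched batch. A small illustration over static data, assuming the engine-level Result API of this series; the ORM version reads from the DBAPI cursor instead, but looks the same from the consumer's side:

    from sqlalchemy.engine.result import (
        ChunkedIteratorResult,
        SimpleResultMetaData,
    )

    def chunks(size):
        # stand-in fetch loop over static data in place of the cursor
        data = [(n,) for n in range(5)]
        if size:
            for i in range(0, len(data), size):
                yield data[i : i + size]
        else:
            yield data

    metadata = SimpleResultMetaData(["n"])
    result = ChunkedIteratorResult(metadata, chunks)
    result.yield_per(2)  # same non-chained call style as the patch

    print(list(result.scalars().partitions()))
    # [[0, 1], [2, 3], [4]]
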
@@ -114,10 +127,18 @@ def merge_result(query, iterator, load=True):
     # flush current contents if we expect to load data
     session._autoflush()
 
+    # TODO: need test coverage and documentation for the FrozenResult
+    # use case.
+    if isinstance(iterator, FrozenResult):
+        frozen_result = iterator
+        iterator = iter(frozen_result.data)
+    else:
+        frozen_result = None
+
     autoflush = session.autoflush
     try:
         session.autoflush = False
-        single_entity = len(query._entities) == 1
+        single_entity = not frozen_result and len(query._entities) == 1
         if single_entity:
             if isinstance(query._entities[0], querylib._MapperEntity):
                 result = [
@@ -156,7 +177,10 @@ def merge_result(query, iterator, load=True):
                         )
                 result.append(keyed_tuple(newrow))
 
-        return iter(result)
+        if frozen_result:
+            return frozen_result.with_data(result)
+        else:
+            return iter(result)
     finally:
         session.autoflush = autoflush
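
The TODO in merge_result() above points at the FrozenResult round trip this enables: a Result can be frozen, stored (e.g. by the dogpile.cache recipe mentioned in the commit message), and replayed, and merge_result() now hands back a frozen result carrying the merged rows rather than a plain iterator. A rough sketch under the API as it stands at this commit; note the surface was still in flux, and with_data() here corresponds to what released 1.4 exposes as FrozenResult.with_new_rows():

    from sqlalchemy.engine.result import (
        ChunkedIteratorResult,
        FrozenResult,
        SimpleResultMetaData,
    )

    metadata = SimpleResultMetaData(["x", "y"])
    result = ChunkedIteratorResult(
        metadata, lambda size: iter([[(1, 2), (3, 4)]])
    )

    frozen = FrozenResult(result)  # consumes the result, keeps the rows
    print(frozen().all())          # calling it re-creates a live Result

    # swap merged rows in over the same metadata, as merge_result() does
    # when it is handed a FrozenResult
    print(frozen.with_data([(5, 6)])().all())
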