diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-04-24 15:34:19 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-02 11:18:13 -0400 |
commit | bbf644862ab05734d153d74abf59aa3492278563 (patch) | |
tree | 0a563147603ff2906422ed1b07e9f5f03e7584c8 /lib/sqlalchemy/orm/loading.py | |
parent | 7acf9af1ce74a0bda4c4d29af7da543b5c42b3f8 (diff) | |
download | sqlalchemy-bbf644862ab05734d153d74abf59aa3492278563.tar.gz |
Integrate new Result into ORM query
The next step in the 2.0 ORM changes is to have the
ORM integrate with the new Result object fully.
this patch uses Result to represent ORM objects rather
than lists. public API to get at this Result is not
added yet. dogpile.cache and horizontal sharding
recipe/extensions have small adjustments to accommodate
this change.
Callcounts have fluctuated, some slightly better and
some slightly worse. A few have gone up by a bit,
however as the codebase is still in flux it is anticipated
there will be some performance gains later on as
ORM fetching is refined to no longer need to accommodate
for extensive aliasing. The addition of caching
will then change the entire story.
References: #5087
References: #4395
Change-Id: If1a23824ffb77d8d58cf2338cf35dd6b5963b17f
Diffstat (limited to 'lib/sqlalchemy/orm/loading.py')
-rw-r--r-- | lib/sqlalchemy/orm/loading.py | 100 |
1 files changed, 62 insertions, 38 deletions
diff --git a/lib/sqlalchemy/orm/loading.py b/lib/sqlalchemy/orm/loading.py index d781df980..10d937945 100644 --- a/lib/sqlalchemy/orm/loading.py +++ b/lib/sqlalchemy/orm/loading.py @@ -29,9 +29,11 @@ from .util import state_str from .. import exc as sa_exc from .. import util from ..engine import result_tuple +from ..engine.result import ChunkedIteratorResult +from ..engine.result import FrozenResult +from ..engine.result import SimpleResultMetaData from ..sql import util as sql_util - _new_runid = util.counter() @@ -41,20 +43,7 @@ def instances(query, cursor, context): context.runid = _new_runid() context.post_load_paths = {} - filtered = query._has_mapper_entities - - single_entity = query.is_single_entity - - if filtered: - if single_entity: - filter_fn = id - else: - - def filter_fn(row): - return tuple( - id(item) if ent.use_id_for_hash else item - for ent, item in zip(query._entities, row) - ) + single_entity = context.is_single_entity try: (process, labels, extra) = list( @@ -66,42 +55,66 @@ def instances(query, cursor, context): ) ) - if not single_entity: - keyed_tuple = result_tuple(labels, extra) + if query._yield_per and ( + context.loaders_require_buffering + or context.loaders_require_uniquing + ): + raise sa_exc.InvalidRequestError( + "Can't use yield_per with eager loaders that require uniquing " + "or row buffering, e.g. joinedload() against collections " + "or subqueryload(). Consider the selectinload() strategy " + "for better flexibility in loading objects." + ) + + except Exception: + with util.safe_reraise(): + cursor.close() + + row_metadata = SimpleResultMetaData( + labels, + extra, + _unique_filters=[ + id if ent.use_id_for_hash else None for ent in query._entities + ], + ) + def chunks(size): while True: + yield_per = size + context.partials = {} - if query._yield_per: - fetch = cursor.fetchmany(query._yield_per) + if yield_per: + fetch = cursor.fetchmany(yield_per) if not fetch: break else: fetch = cursor.fetchall() - if single_entity: - proc = process[0] - rows = [proc(row) for row in fetch] - else: - rows = [ - keyed_tuple([proc(row) for proc in process]) - for row in fetch - ] + rows = [tuple([proc(row) for proc in process]) for row in fetch] for path, post_load in context.post_load_paths.items(): post_load.invoke(context, path) - if filtered: - rows = util.unique_list(rows, filter_fn) + yield rows - for row in rows: - yield row - - if not query._yield_per: + if not yield_per: break - except Exception: - with util.safe_reraise(): - cursor.close() + + result = ChunkedIteratorResult(row_metadata, chunks) + if query._yield_per: + result.yield_per(query._yield_per) + + if single_entity: + result = result.scalars() + + # filtered = context.loaders_require_uniquing + filtered = query._has_mapper_entities + + if filtered: + result = result.unique() + + return result @util.preload_module("sqlalchemy.orm.query") @@ -114,10 +127,18 @@ def merge_result(query, iterator, load=True): # flush current contents if we expect to load data session._autoflush() + # TODO: need test coverage and documentation for the FrozenResult + # use case. + if isinstance(iterator, FrozenResult): + frozen_result = iterator + iterator = iter(frozen_result.data) + else: + frozen_result = None + autoflush = session.autoflush try: session.autoflush = False - single_entity = len(query._entities) == 1 + single_entity = not frozen_result and len(query._entities) == 1 if single_entity: if isinstance(query._entities[0], querylib._MapperEntity): result = [ @@ -156,7 +177,10 @@ def merge_result(query, iterator, load=True): ) result.append(keyed_tuple(newrow)) - return iter(result) + if frozen_result: + return frozen_result.with_data(result) + else: + return iter(result) finally: session.autoflush = autoflush |