diff options
Diffstat (limited to 'lib/sqlalchemy/orm/loading.py')
-rw-r--r-- | lib/sqlalchemy/orm/loading.py | 231 |
1 files changed, 191 insertions, 40 deletions
diff --git a/lib/sqlalchemy/orm/loading.py b/lib/sqlalchemy/orm/loading.py index 48641685e..616e757a3 100644 --- a/lib/sqlalchemy/orm/loading.py +++ b/lib/sqlalchemy/orm/loading.py @@ -26,6 +26,7 @@ from .base import _SET_DEFERRED_EXPIRED from .util import _none_set from .util import state_str from .. import exc as sa_exc +from .. import future from .. import util from ..engine import result_tuple from ..engine.result import ChunkedIteratorResult @@ -36,8 +37,20 @@ from ..sql import util as sql_util _new_runid = util.counter() -def instances(query, cursor, context): - """Return an ORM result as an iterator.""" +def instances(cursor, context): + """Return a :class:`.Result` given an ORM query context. + + :param cursor: a :class:`.CursorResult`, generated by a statement + which came from :class:`.ORMCompileState` + + :param context: a :class:`.QueryContext` object + + :return: a :class:`.Result` object representing ORM results + + .. versionchanged:: 1.4 The instances() function now uses + :class:`.Result` objects and has an all new interface. + + """ context.runid = _new_runid() context.post_load_paths = {} @@ -80,7 +93,7 @@ def instances(query, cursor, context): ], ) - def chunks(size, as_tuples): + def chunks(size): while True: yield_per = size @@ -94,7 +107,7 @@ def instances(query, cursor, context): else: fetch = cursor.fetchall() - if not as_tuples: + if single_entity: proc = process[0] rows = [proc(row) for row in fetch] else: @@ -111,20 +124,62 @@ def instances(query, cursor, context): break result = ChunkedIteratorResult( - row_metadata, chunks, source_supports_scalars=single_entity + row_metadata, chunks, source_supports_scalars=single_entity, raw=cursor + ) + + result._attributes = result._attributes.union( + dict(filtered=filtered, is_single_entity=single_entity) ) + if context.yield_per: result.yield_per(context.yield_per) - if single_entity: - result = result.scalars() + return result - filtered = context.compile_state._has_mapper_entities - if filtered: - result = result.unique() +@util.preload_module("sqlalchemy.orm.context") +def merge_frozen_result(session, statement, frozen_result, load=True): + querycontext = util.preloaded.orm_context - return result + if load: + # flush current contents if we expect to load data + session._autoflush() + + ctx = querycontext.ORMSelectCompileState._create_entities_collection( + statement + ) + + autoflush = session.autoflush + try: + session.autoflush = False + mapped_entities = [ + i + for i, e in enumerate(ctx._entities) + if isinstance(e, querycontext._MapperEntity) + ] + keys = [ent._label_name for ent in ctx._entities] + + keyed_tuple = result_tuple( + keys, [ent._extra_entities for ent in ctx._entities] + ) + + result = [] + for newrow in frozen_result.rewrite_rows(): + for i in mapped_entities: + if newrow[i] is not None: + newrow[i] = session._merge( + attributes.instance_state(newrow[i]), + attributes.instance_dict(newrow[i]), + load=load, + _recursive={}, + _resolve_conflict_map={}, + ) + + result.append(keyed_tuple(newrow)) + + return frozen_result.with_new_rows(result) + finally: + session.autoflush = autoflush @util.preload_module("sqlalchemy.orm.context") @@ -145,9 +200,7 @@ def merge_result(query, iterator, load=True): else: frozen_result = None - ctx = querycontext.QueryCompileState._create_for_legacy_query( - query, entities_only=True - ) + ctx = querycontext.ORMSelectCompileState._create_entities_collection(query) autoflush = session.autoflush try: @@ -235,12 +288,15 @@ def get_from_identity(session, mapper, key, passive): def load_on_ident( - query, + session, + statement, key, + load_options=None, refresh_state=None, with_for_update=None, only_load_props=None, no_autoflush=False, + bind_arguments=util.immutabledict(), ): """Load the given identity key from the database.""" if key is not None: @@ -249,38 +305,59 @@ def load_on_ident( else: ident = identity_token = None - if no_autoflush: - query = query.autoflush(False) - return load_on_pk_identity( - query, + session, + statement, ident, + load_options=load_options, refresh_state=refresh_state, with_for_update=with_for_update, only_load_props=only_load_props, identity_token=identity_token, + no_autoflush=no_autoflush, + bind_arguments=bind_arguments, ) def load_on_pk_identity( - query, + session, + statement, primary_key_identity, + load_options=None, refresh_state=None, with_for_update=None, only_load_props=None, identity_token=None, + no_autoflush=False, + bind_arguments=util.immutabledict(), ): """Load the given primary key identity from the database.""" + query = statement + q = query._clone() + + # TODO: fix these imports .... + from .context import QueryContext, ORMCompileState + + if load_options is None: + load_options = QueryContext.default_load_options + + compile_options = ORMCompileState.default_compile_options.merge( + q.compile_options + ) + + # checking that query doesnt have criteria on it + # just delete it here w/ optional assertion? since we are setting a + # where clause also if refresh_state is None: - q = query._clone() - q._get_condition() - else: - q = query._clone() + _no_criterion_assertion(q, "get", order_by=False, distinct=False) if primary_key_identity is not None: - mapper = query._only_full_mapper_zero("load_on_pk_identity") + # mapper = query._only_full_mapper_zero("load_on_pk_identity") + + # TODO: error checking? + mapper = query._raw_columns[0]._annotations["parententity"] (_get_clause, _get_params) = mapper._get_clause @@ -320,9 +397,8 @@ def load_on_pk_identity( ] ) - q.load_options += {"_params": params} + load_options += {"_params": params} - # with_for_update needs to be query.LockmodeArg() if with_for_update is not None: version_check = True q._for_update_arg = with_for_update @@ -333,11 +409,15 @@ def load_on_pk_identity( version_check = False if refresh_state and refresh_state.load_options: - # if refresh_state.load_path.parent: - q = q._with_current_path(refresh_state.load_path.parent) - q = q.options(refresh_state.load_options) + compile_options += {"_current_path": refresh_state.load_path.parent} + q = q.options(*refresh_state.load_options) - q._get_options( + # TODO: most of the compile_options that are not legacy only involve this + # function, so try to see if handling of them can mostly be local to here + + q.compile_options, load_options = _set_get_options( + compile_options, + load_options, populate_existing=bool(refresh_state), version_check=version_check, only_load_props=only_load_props, @@ -346,12 +426,76 @@ def load_on_pk_identity( ) q._order_by = None + if no_autoflush: + load_options += {"_autoflush": False} + + result = ( + session.execute( + q, + params=load_options._params, + execution_options={"_sa_orm_load_options": load_options}, + bind_arguments=bind_arguments, + ) + .unique() + .scalars() + ) + try: - return q.one() + return result.one() except orm_exc.NoResultFound: return None +def _no_criterion_assertion(stmt, meth, order_by=True, distinct=True): + if ( + stmt._where_criteria + or stmt.compile_options._statement is not None + or stmt._from_obj + or stmt._legacy_setup_joins + or stmt._limit_clause is not None + or stmt._offset_clause is not None + or stmt._group_by_clauses + or (order_by and stmt._order_by_clauses) + or (distinct and stmt._distinct) + ): + raise sa_exc.InvalidRequestError( + "Query.%s() being called on a " + "Query with existing criterion. " % meth + ) + + +def _set_get_options( + compile_opt, + load_opt, + populate_existing=None, + version_check=None, + only_load_props=None, + refresh_state=None, + identity_token=None, +): + + compile_options = {} + load_options = {} + if version_check: + load_options["_version_check"] = version_check + if populate_existing: + load_options["_populate_existing"] = populate_existing + if refresh_state: + load_options["_refresh_state"] = refresh_state + compile_options["_for_refresh_state"] = True + if only_load_props: + compile_options["_only_load_props"] = frozenset(only_load_props) + if identity_token: + load_options["_refresh_identity_token"] = identity_token + + if load_options: + load_opt += load_options + if compile_options: + compile_opt += compile_options + + return compile_opt, load_opt + + def _setup_entity_query( compile_state, mapper, @@ -487,7 +631,7 @@ def _instance_processor( context, path, mapper, result, adapter, populators ) - propagate_options = context.propagate_options + propagated_loader_options = context.propagated_loader_options load_path = ( context.compile_state.current_path + path if context.compile_state.current_path.path @@ -639,8 +783,8 @@ def _instance_processor( # be conservative about setting load_path when populate_existing # is in effect; want to maintain options from the original # load. see test_expire->test_refresh_maintains_deferred_options - if isnew and (propagate_options or not populate_existing): - state.load_options = propagate_options + if isnew and (propagated_loader_options or not populate_existing): + state.load_options = propagated_loader_options state.load_path = load_path _populate_full( @@ -1055,7 +1199,7 @@ def load_scalar_attributes(mapper, state, attribute_names, passive): result = False - no_autoflush = passive & attributes.NO_AUTOFLUSH + no_autoflush = bool(passive & attributes.NO_AUTOFLUSH) # in the case of inheritance, particularly concrete and abstract # concrete inheritance, the class manager might have some keys @@ -1080,10 +1224,16 @@ def load_scalar_attributes(mapper, state, attribute_names, passive): # note: using from_statement() here means there is an adaption # with adapt_on_names set up. the other option is to make the # aliased() against a subquery which affects the SQL. + + from .query import FromStatement + + stmt = FromStatement(mapper, statement).options( + strategy_options.Load(mapper).undefer("*") + ) + result = load_on_ident( - session.query(mapper) - .options(strategy_options.Load(mapper).undefer("*")) - .from_statement(statement), + session, + stmt, None, only_load_props=attribute_names, refresh_state=state, @@ -1121,7 +1271,8 @@ def load_scalar_attributes(mapper, state, attribute_names, passive): return result = load_on_ident( - session.query(mapper), + session, + future.select(mapper).apply_labels(), identity_key, refresh_state=state, only_load_props=attribute_names, |