diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-06-23 14:45:47 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2012-06-23 14:45:47 -0400 |
commit | 0efac1d93e27220f2aa1ec09b282fb08a7bb02be (patch) | |
tree | dbd806874a223253d0c8f52465a77f7e0754c7a8 /lib/sqlalchemy/orm/loading.py | |
parent | df62f4501ee1ec37113477eb6a97068cc07faf5d (diff) | |
download | sqlalchemy-0efac1d93e27220f2aa1ec09b282fb08a7bb02be.tar.gz |
- move all of orm to use absolute imports
- break out key mechanics of loading objects
into new "orm.loading" module, removing implementation
details from both mapper.py and query.py. is analogous
to persistence.py
- some other cleanup and old cruft removal
Diffstat (limited to 'lib/sqlalchemy/orm/loading.py')
-rw-r--r-- | lib/sqlalchemy/orm/loading.py | 533 |
1 file changed, 533 insertions, 0 deletions
# orm/loading.py
# Copyright (C) 2005-2012 the SQLAlchemy authors and contributors <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""private module containing functions used to convert database
rows into object instances and associated state.

the functions here are called primarily by Query, Mapper,
as well as some of the attribute loading strategies.

"""
from __future__ import absolute_import

from .. import util
from . import attributes, exc as orm_exc
from .interfaces import EXT_CONTINUE
from ..sql import util as sql_util
from .util import _none_set, state_str

# monotonically increasing id used to tag each load operation, so that
# instance_processor() can tell whether a state was touched by the
# current result iteration (state.runid != context.runid => new load).
_new_runid = util.counter()


def instances(query, cursor, context):
    """Return an ORM result as an iterator.

    :param query: the :class:`.Query` being executed.
    :param cursor: DBAPI-ish cursor/result to fetch rows from.
    :param context: the QueryContext carrying per-execution state
     (progress/partials maps, refresh_state, runid).

    Yields mapped instances, scalars, or NamedTuples depending on the
    entities present in the query.
    """
    session = query.session

    context.runid = _new_runid()

    filter_fns = [ent.filter_fn
                  for ent in query._entities]
    # the builtin ``id`` is used as the filter_fn marker for full-entity
    # results which should be uniqued by identity.
    filtered = id in filter_fns

    single_entity = filtered and len(query._entities) == 1

    if filtered:
        if single_entity:
            filter_fn = id
        else:
            def filter_fn(row):
                return tuple(fn(x) for x, fn in zip(row, filter_fns))

    # custom_rows is True when a single mapper entity has an
    # append_result event listener established; the processor then
    # appends into the result list itself.
    custom_rows = single_entity and \
        query._entities[0].mapper.dispatch.append_result

    (process, labels) = \
        zip(*[
            query_entity.row_processor(query,
                                       context, custom_rows)
            for query_entity in query._entities
        ])

    while True:
        context.progress = {}
        context.partials = {}

        if query._yield_per:
            fetch = cursor.fetchmany(query._yield_per)
            if not fetch:
                break
        else:
            fetch = cursor.fetchall()

        if custom_rows:
            rows = []
            for row in fetch:
                process[0](row, rows)
        elif single_entity:
            rows = [process[0](row, None) for row in fetch]
        else:
            rows = [util.NamedTuple([proc(row, None) for proc in process],
                                    labels) for row in fetch]

        if filtered:
            rows = util.unique_list(rows, filter_fn)

        # during a partial refresh, commit only the requested attributes
        # on the refresh target rather than finalizing the full state.
        if context.refresh_state and query._only_load_props \
                and context.refresh_state in context.progress:
            context.refresh_state.commit(
                context.refresh_state.dict, query._only_load_props)
            context.progress.pop(context.refresh_state)

        session._finalize_loaded(context.progress)

        for ii, (dict_, attrs) in context.partials.iteritems():
            ii.commit(dict_, attrs)

        for row in rows:
            yield row

        if not query._yield_per:
            break


def merge_result(query, iterator, load=True):
    """Merge a result into this :class:`.Query` object's Session.

    :param query: the :class:`.Query` whose session receives the merge.
    :param iterator: iterable of instances or result rows to merge.
    :param load: when True, autoflush first and perform a full merge;
     when False, states are merged without database load.
    :return: an iterator over the merged result.
    """

    # local import to avoid a circular dependency with orm.query
    from . import query as querylib

    session = query.session
    if load:
        # flush current contents if we expect to load data
        session._autoflush()

    autoflush = session.autoflush
    try:
        # disable autoflush for the duration of the merge; restored
        # in the finally block.
        session.autoflush = False
        single_entity = len(query._entities) == 1
        if single_entity:
            if isinstance(query._entities[0], querylib._MapperEntity):
                result = [session._merge(
                    attributes.instance_state(instance),
                    attributes.instance_dict(instance),
                    load=load, _recursive={})
                    for instance in iterator]
            else:
                result = list(iterator)
        else:
            # merge only the mapped-entity columns of each row; other
            # columns (scalars, expressions) pass through unchanged.
            mapped_entities = [i for i, e in enumerate(query._entities)
                               if isinstance(e, querylib._MapperEntity)]
            result = []
            for row in iterator:
                newrow = list(row)
                for i in mapped_entities:
                    newrow[i] = session._merge(
                        attributes.instance_state(newrow[i]),
                        attributes.instance_dict(newrow[i]),
                        load=load, _recursive={})
                result.append(util.NamedTuple(newrow, row._labels))

        return iter(result)
    finally:
        session.autoflush = autoflush


def get_from_identity(session, key, passive):
    """Look up the given key in the given session's identity map,
    check the object for expired state if found.

    :return: the instance, ``None`` if not present (or deleted upon
     expired-state check), or :data:`.attributes.PASSIVE_NO_RESULT`
     when the instance is expired but SQL emission is not allowed.
    """
    instance = session.identity_map.get(key)
    if instance is not None:

        state = attributes.instance_state(instance)

        # expired - ensure it still exists
        if state.expired:
            if not passive & attributes.SQL_OK:
                # TODO: no coverage here
                return attributes.PASSIVE_NO_RESULT
            elif not passive & attributes.RELATED_OBJECT_OK:
                # this mode is used within a flush and the instance's
                # expired state will be checked soon enough, if necessary
                return instance
            try:
                state(passive)
            except orm_exc.ObjectDeletedError:
                session._remove_newly_deleted([state])
                return None
        return instance
    else:
        return None


def load_on_ident(query, key,
                  refresh_state=None, lockmode=None,
                  only_load_props=None):
    """Load the given identity key from the database.

    :param query: a :class:`.Query` against the target mapper.
    :param key: an identity key tuple, or ``None``.
    :param refresh_state: optional InstanceState being refreshed;
     when present, the query's existing criterion is reused.
    :param lockmode: optional lock mode overriding the query's.
    :param only_load_props: optional set of attribute names to load.
    :return: the loaded instance or ``None`` if no row was found.
    """

    lockmode = lockmode or query._lockmode

    if key is not None:
        ident = key[1]
    else:
        ident = None

    if refresh_state is None:
        q = query._clone()
        q._get_condition()
    else:
        q = query._clone()

    if ident is not None:
        mapper = query._mapper_zero()

        (_get_clause, _get_params) = mapper._get_clause

        # None present in ident - turn those comparisons
        # into "IS NULL"
        if None in ident:
            nones = set([
                _get_params[col].key for col, value in
                zip(mapper.primary_key, ident) if value is None
            ])
            _get_clause = sql_util.adapt_criterion_to_null(
                _get_clause, nones)

        _get_clause = q._adapt_clause(_get_clause, True, False)
        q._criterion = _get_clause

        params = dict([
            (_get_params[primary_key].key, id_val)
            for id_val, primary_key in zip(ident, mapper.primary_key)
        ])

        q._params = params

    if lockmode is not None:
        q._lockmode = lockmode
    q._get_options(
        populate_existing=bool(refresh_state),
        version_check=(lockmode is not None),
        only_load_props=only_load_props,
        refresh_state=refresh_state)
    # ORDER BY is irrelevant for a single-identity fetch
    q._order_by = None

    try:
        return q.one()
    except orm_exc.NoResultFound:
        return None


def instance_processor(mapper, context, path, adapter,
                       polymorphic_from=None,
                       only_load_props=None,
                       refresh_state=None,
                       polymorphic_discriminator=None):

    """Produce a mapper level row processor callable
    which processes rows into mapped instances."""

    # note that this method, most of which exists in a closure
    # called _instance(), resists being broken out, as
    # attempts to do so tend to add significant function
    # call overhead.  _instance() is the most
    # performance-critical section in the whole ORM.

    pk_cols = mapper.primary_key

    if polymorphic_from or refresh_state:
        polymorphic_on = None
    else:
        if polymorphic_discriminator is not None:
            polymorphic_on = polymorphic_discriminator
        else:
            polymorphic_on = mapper.polymorphic_on
        # lazily-populated cache of per-discriminator sub-processors
        polymorphic_instances = util.PopulateDict(
            _configure_subclass_mapper(
                mapper,
                context, path, adapter)
        )

    version_id_col = mapper.version_id_col

    if adapter:
        pk_cols = [adapter.columns[c] for c in pk_cols]
        if polymorphic_on is not None:
            polymorphic_on = adapter.columns[polymorphic_on]
        if version_id_col is not None:
            version_id_col = adapter.columns[version_id_col]

    identity_class = mapper._identity_class

    new_populators = []
    existing_populators = []
    eager_populators = []

    load_path = context.query._current_path + path \
        if context.query._current_path.path \
        else path

    def populate_state(state, dict_, row, isnew, only_load_props):
        if isnew:
            if context.propagate_options:
                state.load_options = context.propagate_options
            if state.load_options:
                state.load_path = load_path

        if not new_populators:
            _populators(mapper, context, path, row, adapter,
                        new_populators,
                        existing_populators,
                        eager_populators
                        )

        if isnew:
            populators = new_populators
        else:
            populators = existing_populators

        if only_load_props is None:
            for key, populator in populators:
                populator(state, dict_, row)
        elif only_load_props:
            for key, populator in populators:
                if key in only_load_props:
                    populator(state, dict_, row)

    session_identity_map = context.session.identity_map

    listeners = mapper.dispatch

    # event collections are truthy only when listeners are registered;
    # normalize empties to None so _instance() can test them cheaply.
    translate_row = listeners.translate_row or None
    create_instance = listeners.create_instance or None
    populate_instance = listeners.populate_instance or None
    append_result = listeners.append_result or None
    populate_existing = context.populate_existing or mapper.always_refresh
    invoke_all_eagers = context.invoke_all_eagers

    if mapper.allow_partial_pks:
        is_not_primary_key = _none_set.issuperset
    else:
        is_not_primary_key = _none_set.issubset

    def _instance(row, result):
        if not new_populators and invoke_all_eagers:
            _populators(mapper, context, path, row, adapter,
                        new_populators,
                        existing_populators,
                        eager_populators
                        )

        if translate_row:
            for fn in translate_row:
                ret = fn(mapper, context, row)
                if ret is not EXT_CONTINUE:
                    row = ret
                    break

        if polymorphic_on is not None:
            discriminator = row[polymorphic_on]
            if discriminator is not None:
                _instance = polymorphic_instances[discriminator]
                if _instance:
                    return _instance(row, result)

        # determine identity key
        if refresh_state:
            identitykey = refresh_state.key
            if identitykey is None:
                # super-rare condition; a refresh is being called
                # on a non-instance-key instance; this is meant to only
                # occur within a flush()
                identitykey = mapper._identity_key_from_state(refresh_state)
        else:
            identitykey = (
                identity_class,
                tuple([row[column] for column in pk_cols])
            )

        instance = session_identity_map.get(identitykey)
        if instance is not None:
            state = attributes.instance_state(instance)
            dict_ = attributes.instance_dict(instance)

            isnew = state.runid != context.runid
            currentload = not isnew
            loaded_instance = False

            if not currentload and \
                    version_id_col is not None and \
                    context.version_check and \
                    mapper._get_state_attr_by_column(
                        state,
                        dict_,
                        mapper.version_id_col) != \
                    row[version_id_col]:

                raise orm_exc.StaleDataError(
                    "Instance '%s' has version id '%s' which "
                    "does not match database-loaded version id '%s'."
                    % (state_str(state),
                       mapper._get_state_attr_by_column(
                           state, dict_,
                           mapper.version_id_col),
                       row[version_id_col]))
        elif refresh_state:
            # out of band refresh_state detected (i.e. its not in the
            # session.identity_map) honor it anyway.  this can happen
            # if a _get() occurs within save_obj(), such as
            # when eager_defaults is True.
            state = refresh_state
            instance = state.obj()
            dict_ = attributes.instance_dict(instance)
            isnew = state.runid != context.runid
            currentload = True
            loaded_instance = False
        else:
            # check for non-NULL values in the primary key columns,
            # else no entity is returned for the row
            if is_not_primary_key(identitykey[1]):
                return None

            isnew = True
            currentload = True
            loaded_instance = True

            if create_instance:
                for fn in create_instance:
                    instance = fn(mapper, context,
                                  row, mapper.class_)
                    if instance is not EXT_CONTINUE:
                        manager = attributes.manager_of_class(
                            instance.__class__)
                        # TODO: if manager is None, raise a friendly error
                        # about returning instances of unmapped types
                        manager.setup_instance(instance)
                        break
                else:
                    instance = mapper.class_manager.new_instance()
            else:
                instance = mapper.class_manager.new_instance()

            dict_ = attributes.instance_dict(instance)
            state = attributes.instance_state(instance)
            state.key = identitykey

            # attach instance to session.
            state.session_id = context.session.hash_key
            session_identity_map.add(state)

        if currentload or populate_existing:
            # state is being fully loaded, so populate.
            # add to the "context.progress" collection.
            if isnew:
                state.runid = context.runid
                context.progress[state] = dict_

            if populate_instance:
                for fn in populate_instance:
                    ret = fn(mapper, context, row, state,
                             only_load_props=only_load_props,
                             instancekey=identitykey, isnew=isnew)
                    if ret is not EXT_CONTINUE:
                        break
                else:
                    populate_state(state, dict_, row, isnew, only_load_props)
            else:
                populate_state(state, dict_, row, isnew, only_load_props)

            if loaded_instance:
                state.manager.dispatch.load(state, context)
            elif isnew:
                state.manager.dispatch.refresh(state, context,
                                               only_load_props)

        elif state in context.partials or state.unloaded or eager_populators:
            # state is having a partial set of its attributes
            # refreshed.  Populate those attributes,
            # and add to the "context.partials" collection.
            if state in context.partials:
                isnew = False
                (d_, attrs) = context.partials[state]
            else:
                isnew = True
                attrs = state.unloaded
                context.partials[state] = (dict_, attrs)

            if populate_instance:
                for fn in populate_instance:
                    ret = fn(mapper, context, row, state,
                             only_load_props=attrs,
                             instancekey=identitykey, isnew=isnew)
                    if ret is not EXT_CONTINUE:
                        break
                else:
                    populate_state(state, dict_, row, isnew, attrs)
            else:
                populate_state(state, dict_, row, isnew, attrs)

            for key, pop in eager_populators:
                if key not in state.unloaded:
                    pop(state, dict_, row)

            if isnew:
                state.manager.dispatch.refresh(state, context, attrs)

        if result is not None:
            if append_result:
                for fn in append_result:
                    if fn(mapper, context, row, state,
                          result, instancekey=identitykey,
                          isnew=isnew) is not EXT_CONTINUE:
                        break
                else:
                    result.append(instance)
            else:
                result.append(instance)

        return instance
    return _instance


def _populators(mapper, context, path, row, adapter,
                new_populators, existing_populators, eager_populators):
    """Produce a collection of attribute level row processor
    callables.

    Mutates the three list arguments in place; delayed populators are
    appended to new_populators after all properties are processed.
    """

    delayed_populators = []
    # ordering here must match the tuple returned by each property's
    # create_row_processor(): (new, existing, delayed, eager)
    pops = (new_populators, existing_populators, delayed_populators,
            eager_populators)
    for prop in mapper._props.itervalues():
        for i, pop in enumerate(prop.create_row_processor(
                context, path,
                mapper, row, adapter)):
            if pop is not None:
                pops[i].append((prop.key, pop))

    if delayed_populators:
        new_populators.extend(delayed_populators)


def _configure_subclass_mapper(mapper, context, path, adapter):
    """Produce a mapper level row processor callable factory for mappers
    inheriting this one."""

    def configure_subclass_mapper(discriminator):
        try:
            sub_mapper = mapper.polymorphic_map[discriminator]
        except KeyError:
            raise AssertionError(
                "No such polymorphic_identity %r is defined" %
                discriminator)
        if sub_mapper is mapper:
            return None

        # replace the tip of the path info with the subclass mapper
        # being used, that way accurate "load_path" info is available
        # for options invoked during deferred loads, e.g.
        # query(Person).options(defer(Engineer.machines, Machine.name)).
        # for AliasedClass paths, disregard this step (new in 0.8).
        return instance_processor(
            sub_mapper,
            context,
            path.parent[sub_mapper]
            if not path.is_aliased_class
            else path,
            adapter,
            polymorphic_from=mapper)
    return configure_subclass_mapper