diff options
Diffstat (limited to 'lib/sqlalchemy/engine/result.py')
-rw-r--r-- | lib/sqlalchemy/engine/result.py | 2298 |
1 files changed, 799 insertions, 1499 deletions
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index bc3cdbb9a..a3a9cc489 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -5,1753 +5,997 @@ # This module is part of SQLAlchemy and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php -"""Define result set constructs including :class:`.Result`""" +"""Define generic result set constructs.""" -import collections import functools +import itertools import operator from .row import _baserow_usecext -from .row import BaseRow # noqa -from .row import LegacyRow # noqa -from .row import Row # noqa -from .row import RowMapping # noqa -from .row import RowProxy # noqa -from .row import rowproxy_reconstructor # noqa +from .row import Row from .. import exc from .. import util -from ..sql import expression -from ..sql import sqltypes -from ..sql import util as sql_util -from ..sql.compiler import RM_NAME -from ..sql.compiler import RM_OBJECTS -from ..sql.compiler import RM_RENDERED_NAME -from ..sql.compiler import RM_TYPE +from ..sql.base import _generative +from ..sql.base import HasMemoized +from ..sql.base import InPlaceGenerative +from ..util import collections_abc if _baserow_usecext: - from sqlalchemy.cresultproxy import tuplegetter as _tuplegetter + from sqlalchemy.cresultproxy import tuplegetter -_UNPICKLED = util.symbol("unpickled") + _row_as_tuple = tuplegetter +else: -# cyclical import for sqlalchemy.future -_future_Result = None + def tuplegetter(*indexes): + it = operator.itemgetter(*indexes) -# metadata entry tuple indexes. -# using raw tuple is faster than namedtuple. -MD_INDEX = 0 # integer index in cursor.description -MD_OBJECTS = 1 # other string keys and ColumnElement obj that can match -MD_LOOKUP_KEY = 2 # string key we usually expect for key-based lookup -MD_RENDERED_NAME = 3 # name that is usually in cursor.description -MD_PROCESSOR = 4 # callable to process a result value into a row -MD_UNTRANSLATED = 5 # raw name from cursor.description + if len(indexes) > 1: + return it + else: + return lambda row: (it(row),) + + def _row_as_tuple(*indexes): + getters = [ + operator.methodcaller("_get_by_key_impl_mapping", index) + for index in indexes + ] + return lambda rec: tuple(getter(rec) for getter in getters) class ResultMetaData(object): + """Base for metadata about result rows.""" + __slots__ = () - def _has_key(self, key): - return key in self._keymap + _tuplefilter = None + _translated_indexes = None + _unique_filters = None - def _key_fallback(self, key, err): + @property + def keys(self): + return RMKeyView(self) + + def _for_freeze(self): + raise NotImplementedError() + + def _key_fallback(self, key, err, raiseerr=True): + assert raiseerr if isinstance(key, int): util.raise_(IndexError(key), replace_context=err) else: util.raise_(KeyError(key), replace_context=err) + def _warn_for_nonint(self, key): + raise TypeError( + "TypeError: tuple indices must be integers or slices, not %s" + % type(key).__name__ + ) -class SimpleResultMetaData(ResultMetaData): - __slots__ = "keys", "_keymap", "_processors" - - def __init__(self, keys, extra=None): - self.keys = list(keys) - - len_keys = len(keys) - - self._keymap = { - name: (index, name) for index, name in enumerate(self.keys) - } - if not _baserow_usecext: - self._keymap.update( - { - index: (index, None, self.keys[index]) - for index in range(len_keys) - } - ) - # TODO: negative indexes? test coverage? - if extra: - for key, ex in zip(keys, extra): - rec = self._keymap[key] - self._keymap.update({e: rec for e in ex}) - self._processors = [None] * len(keys) - - def __getstate__(self): - return {"keys": self.keys} + def _index_for_key(self, keys, raiseerr): + raise NotImplementedError() - def __setstate__(self, state): - self.__init__(state["keys"]) + def _metadata_for_keys(self, key): + raise NotImplementedError() - def _has_key(self, key): - return key in self._keymap + def _reduce(self, keys): + raise NotImplementedError() - def _contains(self, value, row): - return value in row._data + def _getter(self, key, raiseerr=True): + index = self._index_for_key(key, raiseerr) -def result_tuple(fields, extra=None): - parent = SimpleResultMetaData(fields, extra) - return functools.partial(Row, parent, parent._processors, parent._keymap) + if index is not None: + return operator.methodcaller("_get_by_key_impl_mapping", index) + else: + return None + def _row_as_tuple_getter(self, keys): + indexes = list(self._indexes_for_keys(keys)) + return _row_as_tuple(*indexes) -class CursorResultMetaData(ResultMetaData): - """Handle cursor.description, applying additional info from an execution - context.""" - __slots__ = ( - "_keymap", - "case_sensitive", - "matched_on_name", - "_processors", - "keys", - ) +class RMKeyView(collections_abc.KeysView): + __slots__ = ("_parent", "_keys") - def _adapt_to_context(self, context): - """When using a cached result metadata against a new context, - we need to rewrite the _keymap so that it has the specific - Column objects in the new context inside of it. this accommodates - for select() constructs that contain anonymized columns and - are cached. + def __init__(self, parent): + self._parent = parent + self._keys = [k for k in parent._keys if k is not None] - """ - if not context.compiled._result_columns: - return self - - compiled_statement = context.compiled.statement - invoked_statement = context.invoked_statement - - # same statement was invoked as the one we cached against, - # return self - if compiled_statement is invoked_statement: - return self - - # make a copy and add the columns from the invoked statement - # to the result map. - md = self.__class__.__new__(self.__class__) - - md._keymap = self._keymap.copy() - - # match up new columns positionally to the result columns - for existing, new in zip( - context.compiled._result_columns, - invoked_statement._exported_columns_iterator(), - ): - md._keymap[new] = md._keymap[existing[RM_NAME]] - - md.case_sensitive = self.case_sensitive - md.matched_on_name = self.matched_on_name - md._processors = self._processors - md.keys = self.keys - return md - - def __init__(self, parent, cursor_description): - context = parent.context - dialect = context.dialect - self.case_sensitive = dialect.case_sensitive - self.matched_on_name = False - - if context.result_column_struct: - ( - result_columns, - cols_are_ordered, - textual_ordered, - loose_column_name_matching, - ) = context.result_column_struct - num_ctx_cols = len(result_columns) - else: - result_columns = ( - cols_are_ordered - ) = ( - num_ctx_cols - ) = loose_column_name_matching = textual_ordered = False - - # merge cursor.description with the column info - # present in the compiled structure, if any - raw = self._merge_cursor_description( - context, - cursor_description, - result_columns, - num_ctx_cols, - cols_are_ordered, - textual_ordered, - loose_column_name_matching, - ) + def __len__(self): + return len(self._keys) - self._keymap = {} - if not _baserow_usecext: - # keymap indexes by integer index: this is only used - # in the pure Python BaseRow.__getitem__ - # implementation to avoid an expensive - # isinstance(key, util.int_types) in the most common - # case path + def __repr__(self): + return "{0.__class__.__name__}({0._keys!r})".format(self) - len_raw = len(raw) + def __iter__(self): + return iter(self._keys) - self._keymap.update( - [ - (metadata_entry[MD_INDEX], metadata_entry) - for metadata_entry in raw - ] - + [ - (metadata_entry[MD_INDEX] - len_raw, metadata_entry) - for metadata_entry in raw - ] - ) + def __contains__(self, item): + if not _baserow_usecext and isinstance(item, int): + return False - # processors in key order for certain per-row - # views like __iter__ and slices - self._processors = [ - metadata_entry[MD_PROCESSOR] for metadata_entry in raw - ] + # note this also includes special key fallback behaviors + # which also don't seem to be tested in test_resultset right now + return self._parent._has_key(item) - # keymap by primary string... - by_key = dict( - [ - (metadata_entry[MD_LOOKUP_KEY], metadata_entry) - for metadata_entry in raw - ] - ) + def __eq__(self, other): + return list(other) == list(self) - # for compiled SQL constructs, copy additional lookup keys into - # the key lookup map, such as Column objects, labels, - # column keys and other names - if num_ctx_cols: - - # if by-primary-string dictionary smaller (or bigger?!) than - # number of columns, assume we have dupes, rewrite - # dupe records with "None" for index which results in - # ambiguous column exception when accessed. - if len(by_key) != num_ctx_cols: - # new in 1.4: get the complete set of all possible keys, - # strings, objects, whatever, that are dupes across two - # different records, first. - index_by_key = {} - dupes = set() - for metadata_entry in raw: - for key in (metadata_entry[MD_RENDERED_NAME],) + ( - metadata_entry[MD_OBJECTS] or () - ): - if not self.case_sensitive and isinstance( - key, util.string_types - ): - key = key.lower() - idx = metadata_entry[MD_INDEX] - # if this key has been associated with more than one - # positional index, it's a dupe - if index_by_key.setdefault(key, idx) != idx: - dupes.add(key) - - # then put everything we have into the keymap excluding only - # those keys that are dupes. - self._keymap.update( - [ - (obj_elem, metadata_entry) - for metadata_entry in raw - if metadata_entry[MD_OBJECTS] - for obj_elem in metadata_entry[MD_OBJECTS] - if obj_elem not in dupes - ] - ) + def __ne__(self, other): + return list(other) != list(self) - # then for the dupe keys, put the "ambiguous column" - # record into by_key. - by_key.update({key: (None, (), key) for key in dupes}) - else: - # no dupes - copy secondary elements from compiled - # columns into self._keymap - self._keymap.update( - [ - (obj_elem, metadata_entry) - for metadata_entry in raw - if metadata_entry[MD_OBJECTS] - for obj_elem in metadata_entry[MD_OBJECTS] - ] - ) +class SimpleResultMetaData(ResultMetaData): + """result metadata for in-memory collections.""" - # update keymap with primary string names taking - # precedence - self._keymap.update(by_key) - - # update keymap with "translated" names (sqlite-only thing) - if not num_ctx_cols and context._translate_colname: - self._keymap.update( - [ - ( - metadata_entry[MD_UNTRANSLATED], - self._keymap[metadata_entry[MD_LOOKUP_KEY]], - ) - for metadata_entry in raw - if metadata_entry[MD_UNTRANSLATED] - ] - ) + __slots__ = ( + "_keys", + "_keymap", + "_processors", + "_tuplefilter", + "_translated_indexes", + "_unique_filters", + ) - def _merge_cursor_description( + def __init__( self, - context, - cursor_description, - result_columns, - num_ctx_cols, - cols_are_ordered, - textual_ordered, - loose_column_name_matching, + keys, + extra=None, + _processors=None, + _tuplefilter=None, + _translated_indexes=None, + _unique_filters=None, ): - """Merge a cursor.description with compiled result column information. - - There are at least four separate strategies used here, selected - depending on the type of SQL construct used to start with. - - The most common case is that of the compiled SQL expression construct, - which generated the column names present in the raw SQL string and - which has the identical number of columns as were reported by - cursor.description. In this case, we assume a 1-1 positional mapping - between the entries in cursor.description and the compiled object. - This is also the most performant case as we disregard extracting / - decoding the column names present in cursor.description since we - already have the desired name we generated in the compiled SQL - construct. - - The next common case is that of the completely raw string SQL, - such as passed to connection.execute(). In this case we have no - compiled construct to work with, so we extract and decode the - names from cursor.description and index those as the primary - result row target keys. - - The remaining fairly common case is that of the textual SQL - that includes at least partial column information; this is when - we use a :class:`_expression.TextualSelect` construct. - This construct may have - unordered or ordered column information. In the ordered case, we - merge the cursor.description and the compiled construct's information - positionally, and warn if there are additional description names - present, however we still decode the names in cursor.description - as we don't have a guarantee that the names in the columns match - on these. In the unordered case, we match names in cursor.description - to that of the compiled construct based on name matching. - In both of these cases, the cursor.description names and the column - expression objects and names are indexed as result row target keys. - - The final case is much less common, where we have a compiled - non-textual SQL expression construct, but the number of columns - in cursor.description doesn't match what's in the compiled - construct. We make the guess here that there might be textual - column expressions in the compiled construct that themselves include - a comma in them causing them to split. We do the same name-matching - as with textual non-ordered columns. - - The name-matched system of merging is the same as that used by - SQLAlchemy for all cases up through te 0.9 series. Positional - matching for compiled SQL expressions was introduced in 1.0 as a - major performance feature, and positional matching for textual - :class:`_expression.TextualSelect` objects in 1.1. - As name matching is no longer - a common case, it was acceptable to factor it into smaller generator- - oriented methods that are easier to understand, but incur slightly - more performance overhead. - - """ + self._keys = list(keys) + self._tuplefilter = _tuplefilter + self._translated_indexes = _translated_indexes + self._unique_filters = _unique_filters + len_keys = len(self._keys) - case_sensitive = context.dialect.case_sensitive - - if ( - num_ctx_cols - and cols_are_ordered - and not textual_ordered - and num_ctx_cols == len(cursor_description) - ): - self.keys = [elem[0] for elem in result_columns] - # pure positional 1-1 case; doesn't need to read - # the names from cursor.description - return [ + if extra: + recs_names = [ ( - idx, - rmap_entry[RM_OBJECTS], - rmap_entry[RM_NAME].lower() - if not case_sensitive - else rmap_entry[RM_NAME], - rmap_entry[RM_RENDERED_NAME], - context.get_result_processor( - rmap_entry[RM_TYPE], - rmap_entry[RM_RENDERED_NAME], - cursor_description[idx][1], - ), - None, + (index, name, index - len_keys) + extras, + (index, name, extras), ) - for idx, rmap_entry in enumerate(result_columns) + for index, (name, extras) in enumerate(zip(self._keys, extra)) ] else: - # name-based or text-positional cases, where we need - # to read cursor.description names - if textual_ordered: - # textual positional case - raw_iterator = self._merge_textual_cols_by_position( - context, cursor_description, result_columns - ) - elif num_ctx_cols: - # compiled SQL with a mismatch of description cols - # vs. compiled cols, or textual w/ unordered columns - raw_iterator = self._merge_cols_by_name( - context, - cursor_description, - result_columns, - loose_column_name_matching, - ) - else: - # no compiled SQL, just a raw string - raw_iterator = self._merge_cols_by_none( - context, cursor_description - ) - - return [ - ( - idx, - obj, - cursor_colname, - cursor_colname, - context.get_result_processor( - mapped_type, cursor_colname, coltype - ), - untranslated, - ) - for ( - idx, - cursor_colname, - mapped_type, - coltype, - obj, - untranslated, - ) in raw_iterator + recs_names = [ + ((index, name, index - len_keys), (index, name, ())) + for index, name in enumerate(self._keys) ] - def _colnames_from_description(self, context, cursor_description): - """Extract column names and data types from a cursor.description. - - Applies unicode decoding, column translation, "normalization", - and case sensitivity rules to the names based on the dialect. - - """ - - dialect = context.dialect - case_sensitive = dialect.case_sensitive - translate_colname = context._translate_colname - description_decoder = ( - dialect._description_decoder - if dialect.description_encoding - else None - ) - normalize_name = ( - dialect.normalize_name if dialect.requires_name_normalize else None - ) - untranslated = None - - self.keys = [] - - for idx, rec in enumerate(cursor_description): - colname = rec[0] - coltype = rec[1] - - if description_decoder: - colname = description_decoder(colname) + self._keymap = {key: rec for keys, rec in recs_names for key in keys} - if translate_colname: - colname, untranslated = translate_colname(colname) - - if normalize_name: - colname = normalize_name(colname) - - self.keys.append(colname) - if not case_sensitive: - colname = colname.lower() - - yield idx, colname, untranslated, coltype - - def _merge_textual_cols_by_position( - self, context, cursor_description, result_columns - ): - num_ctx_cols = len(result_columns) if result_columns else None - - if num_ctx_cols > len(cursor_description): - util.warn( - "Number of columns in textual SQL (%d) is " - "smaller than number of columns requested (%d)" - % (num_ctx_cols, len(cursor_description)) - ) - seen = set() - for ( - idx, - colname, - untranslated, - coltype, - ) in self._colnames_from_description(context, cursor_description): - if idx < num_ctx_cols: - ctx_rec = result_columns[idx] - obj = ctx_rec[RM_OBJECTS] - mapped_type = ctx_rec[RM_TYPE] - if obj[0] in seen: - raise exc.InvalidRequestError( - "Duplicate column expression requested " - "in textual SQL: %r" % obj[0] - ) - seen.add(obj[0]) - else: - mapped_type = sqltypes.NULLTYPE - obj = None - yield idx, colname, mapped_type, coltype, obj, untranslated - - def _merge_cols_by_name( - self, - context, - cursor_description, - result_columns, - loose_column_name_matching, - ): - dialect = context.dialect - case_sensitive = dialect.case_sensitive - match_map = self._create_description_match_map( - result_columns, case_sensitive, loose_column_name_matching + if _processors is None: + self._processors = [None] * len_keys + else: + self._processors = _processors + + def _for_freeze(self): + unique_filters = self._unique_filters + if unique_filters and self._tuplefilter: + unique_filters = self._tuplefilter(unique_filters) + + # TODO: are we freezing the result with or without uniqueness + # applied? + return SimpleResultMetaData( + self._keys, + extra=[self._keymap[key][2] for key in self._keys], + _unique_filters=unique_filters, ) - self.matched_on_name = True - for ( - idx, - colname, - untranslated, - coltype, - ) in self._colnames_from_description(context, cursor_description): - try: - ctx_rec = match_map[colname] - except KeyError: - mapped_type = sqltypes.NULLTYPE - obj = None - else: - obj = ctx_rec[1] - mapped_type = ctx_rec[2] - yield idx, colname, mapped_type, coltype, obj, untranslated - - @classmethod - def _create_description_match_map( - cls, - result_columns, - case_sensitive=True, - loose_column_name_matching=False, - ): - """when matching cursor.description to a set of names that are present - in a Compiled object, as is the case with TextualSelect, get all the - names we expect might match those in cursor.description. - """ - - d = {} - for elem in result_columns: - key = elem[RM_RENDERED_NAME] - - if not case_sensitive: - key = key.lower() - if key in d: - # conflicting keyname - just add the column-linked objects - # to the existing record. if there is a duplicate column - # name in the cursor description, this will allow all of those - # objects to raise an ambiguous column error - e_name, e_obj, e_type = d[key] - d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type - else: - d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE]) - - if loose_column_name_matching: - # when using a textual statement with an unordered set - # of columns that line up, we are expecting the user - # to be using label names in the SQL that match to the column - # expressions. Enable more liberal matching for this case; - # duplicate keys that are ambiguous will be fixed later. - for r_key in elem[RM_OBJECTS]: - d.setdefault( - r_key, (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE]) - ) - - return d - - def _merge_cols_by_none(self, context, cursor_description): - for ( - idx, - colname, - untranslated, - coltype, - ) in self._colnames_from_description(context, cursor_description): - yield idx, colname, sqltypes.NULLTYPE, coltype, None, untranslated + def __getstate__(self): + return { + "_keys": self._keys, + "_translated_indexes": self._translated_indexes, + } - def _key_fallback(self, key, err, raiseerr=True): - if raiseerr: - util.raise_( - exc.NoSuchColumnError( - "Could not locate column in row for column '%s'" - % util.string_or_unprintable(key) - ), - replace_context=err, - ) + def __setstate__(self, state): + if state["_translated_indexes"]: + _translated_indexes = state["_translated_indexes"] + _tuplefilter = tuplegetter(*_translated_indexes) else: - return None - - def _raise_for_ambiguous_column_name(self, rec): - raise exc.InvalidRequestError( - "Ambiguous column name '%s' in " - "result set column descriptions" % rec[MD_LOOKUP_KEY] + _translated_indexes = _tuplefilter = None + self.__init__( + state["_keys"], + _translated_indexes=_translated_indexes, + _tuplefilter=_tuplefilter, ) - def _warn_for_nonint(self, key): - raise TypeError( - "TypeError: tuple indices must be integers or slices, not %s" - % type(key).__name__ - ) + def _contains(self, value, row): + return value in row._data - def _getter(self, key, raiseerr=True): + def _index_for_key(self, key, raiseerr=True): try: rec = self._keymap[key] except KeyError as ke: rec = self._key_fallback(key, ke, raiseerr) - if rec is None: - return None - - index, obj = rec[0:2] - - if index is None: - self._raise_for_ambiguous_column_name(rec) - - return operator.methodcaller("_get_by_key_impl_mapping", index) - - def _tuple_getter(self, keys, raiseerr=True): - """Given a list of keys, return a callable that will deliver a tuple. - - This is strictly used by the ORM and the keys are Column objects. - However, this might be some nice-ish feature if we could find a very - clean way of presenting it. - note that in the new world of "row._mapping", this is a mapping-getter. - maybe the name should indicate that somehow. + return rec[0] + def _indexes_for_keys(self, keys): + for rec in self._metadata_for_keys(keys): + yield rec[0] - """ - indexes = [] + def _metadata_for_keys(self, keys): for key in keys: - if isinstance(key, int): - indexes.append(key) - continue try: rec = self._keymap[key] except KeyError as ke: - rec = self._key_fallback(key, ke, raiseerr) - if rec is None: - return None - - index, obj = rec[0:2] - - if index is None: - self._raise_for_ambiguous_column_name(obj) - indexes.append(index) - - if _baserow_usecext: - return _tuplegetter(*indexes) - else: - return self._pure_py_tuplegetter(*indexes) - - def _pure_py_tuplegetter(self, *indexes): - getters = [ - operator.methodcaller("_get_by_key_impl_mapping", index) - for index in indexes - ] - return lambda rec: tuple(getter(rec) for getter in getters) - - def __getstate__(self): - return { - "_keymap": { - key: (rec[MD_INDEX], _UNPICKLED, key) - for key, rec in self._keymap.items() - if isinstance(key, util.string_types + util.int_types) - }, - "keys": self.keys, - "case_sensitive": self.case_sensitive, - "matched_on_name": self.matched_on_name, - } + rec = self._key_fallback(key, ke, True) - def __setstate__(self, state): - self._processors = [None for _ in range(len(state["keys"]))] - self._keymap = state["_keymap"] + yield rec - self.keys = state["keys"] - self.case_sensitive = state["case_sensitive"] - self.matched_on_name = state["matched_on_name"] + def _reduce(self, keys): + try: + metadata_for_keys = [self._keymap[key] for key in keys] + except KeyError as ke: + self._key_fallback(ke.args[0], ke, True) + indexes, new_keys, extra = zip(*metadata_for_keys) -class LegacyCursorResultMetaData(CursorResultMetaData): - def _contains(self, value, row): - key = value - if key in self._keymap: - util.warn_deprecated_20( - "Using the 'in' operator to test for string or column " - "keys, or integer indexes, in a :class:`.Row` object is " - "deprecated and will " - "be removed in a future release. " - "Use the `Row._fields` or `Row._mapping` attribute, i.e. " - "'key in row._fields'", - ) - return True - else: - return self._key_fallback(key, None, False) is not None + if self._translated_indexes: + indexes = [self._translated_indexes[idx] for idx in indexes] - def _key_fallback(self, key, err, raiseerr=True): - map_ = self._keymap - result = None - - if isinstance(key, util.string_types): - result = map_.get(key if self.case_sensitive else key.lower()) - elif isinstance(key, expression.ColumnElement): - if ( - key._label - and (key._label if self.case_sensitive else key._label.lower()) - in map_ - ): - result = map_[ - key._label if self.case_sensitive else key._label.lower() - ] - elif ( - hasattr(key, "name") - and (key.name if self.case_sensitive else key.name.lower()) - in map_ - ): - # match is only on name. - result = map_[ - key.name if self.case_sensitive else key.name.lower() - ] + tup = tuplegetter(*indexes) - # search extra hard to make sure this - # isn't a column/label name overlap. - # this check isn't currently available if the row - # was unpickled. - if result is not None and result[MD_OBJECTS] not in ( - None, - _UNPICKLED, - ): - for obj in result[MD_OBJECTS]: - if key._compare_name_for_result(obj): - break - else: - result = None - if result is not None: - if result[MD_OBJECTS] is _UNPICKLED: - util.warn_deprecated( - "Retreiving row values using Column objects from a " - "row that was unpickled is deprecated; adequate " - "state cannot be pickled for this to be efficient. " - "This usage will raise KeyError in a future release.", - version="1.4", - ) - else: - util.warn_deprecated( - "Retreiving row values using Column objects with only " - "matching names as keys is deprecated, and will raise " - "KeyError in a future release; only Column " - "objects that are explicitly part of the statement " - "object should be used.", - version="1.4", - ) - if result is None: - if raiseerr: - util.raise_( - exc.NoSuchColumnError( - "Could not locate column in row for column '%s'" - % util.string_or_unprintable(key) - ), - replace_context=err, - ) - else: - return None - else: - map_[key] = result - return result - - def _warn_for_nonint(self, key): - util.warn_deprecated_20( - "Using non-integer/slice indices on Row is deprecated and will " - "be removed in version 2.0; please use row._mapping[<key>], or " - "the mappings() accessor on the sqlalchemy.future result object.", - stacklevel=4, + new_metadata = SimpleResultMetaData( + new_keys, + extra=extra, + _tuplefilter=tup, + _translated_indexes=indexes, + _processors=self._processors, + _unique_filters=self._unique_filters, ) - def _has_key(self, key): - if key in self._keymap: - return True - else: - return self._key_fallback(key, None, False) is not None - - -class CursorFetchStrategy(object): - """Define a cursor strategy for a result object. - - Subclasses define different ways of fetching rows, typically but - not necessarily using a DBAPI cursor object. + return new_metadata - .. versionadded:: 1.4 - """ - - __slots__ = ("dbapi_cursor", "cursor_description") - - def __init__(self, dbapi_cursor, cursor_description): - self.dbapi_cursor = dbapi_cursor - self.cursor_description = cursor_description - - @classmethod - def create(cls, result): - raise NotImplementedError() - - def soft_close(self, result): - raise NotImplementedError() - - def hard_close(self, result): - raise NotImplementedError() - - def fetchone(self): - raise NotImplementedError() - - def fetchmany(self, size=None): - raise NotImplementedError() - - def fetchall(self): - raise NotImplementedError() - - -class NoCursorDQLFetchStrategy(CursorFetchStrategy): - """Cursor strategy for a DQL result that has no open cursor. - - This is a result set that can return rows, i.e. for a SELECT, or for an - INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state - where the cursor is closed and no rows remain available. The owning result - object may or may not be "hard closed", which determines if the fetch - methods send empty results or raise for closed result. - - """ - - __slots__ = ("closed",) - - def __init__(self, closed): - self.closed = closed - self.cursor_description = None - - def soft_close(self, result): - pass - - def hard_close(self, result): - self.closed = True - - def fetchone(self): - return self._non_result(None) - - def fetchmany(self, size=None): - return self._non_result([]) +def result_tuple(fields, extra=None): + parent = SimpleResultMetaData(fields, extra) + return functools.partial(Row, parent, parent._processors, parent._keymap) - def fetchall(self): - return self._non_result([]) - def _non_result(self, default, err=None): - if self.closed: - util.raise_( - exc.ResourceClosedError("This result object is closed."), - replace_context=err, - ) - else: - return default +# a symbol that indicates to internal Result methods that +# "no row is returned". We can't use None for those cases where a scalar +# filter is applied to rows. +_NO_ROW = util.symbol("NO_ROW") -class NoCursorDMLFetchStrategy(CursorFetchStrategy): - """Cursor strategy for a DML result that has no open cursor. +class Result(InPlaceGenerative): + """Represent a set of database results. - This is a result set that does not return rows, i.e. for an INSERT, - UPDATE, DELETE that does not include RETURNING. + .. versionadded:: 1.4 The :class:`.Result` object provides a completely + updated usage model and calling facade for SQLAlchemy Core and + SQLAlchemy ORM. In Core, it forms the basis of the + :class:`.CursorResult` object which replaces the previous + :class:`.ResultProxy` interface. """ - __slots__ = ("closed",) + _process_row = Row - def __init__(self, closed): - self.closed = closed - self.cursor_description = None + _row_logging_fn = None - def soft_close(self, result): - pass + _column_slice_filter = None + _post_creational_filter = None + _unique_filter_state = None + _no_scalar_onerow = False + _yield_per = None - def hard_close(self, result): - self.closed = True + def __init__(self, cursor_metadata): + self._metadata = cursor_metadata - def fetchone(self): - return self._non_result(None) + def _soft_close(self, hard=False): + raise NotImplementedError() - def fetchmany(self, size=None): - return self._non_result([]) + def keys(self): + """Return an iterable view which yields the string keys that would + be represented by each :class:`.Row`. - def fetchall(self): - return self._non_result([]) + The view also can be tested for key containment using the Python + ``in`` operator, which will test both for the string keys represented + in the view, as well as for alternate keys such as column objects. - def _non_result(self, default, err=None): - util.raise_( - exc.ResourceClosedError( - "This result object does not return rows. " - "It has been closed automatically." - ), - replace_context=err, - ) + .. versionchanged:: 1.4 a key view object is returned rather than a + plain list. -class DefaultCursorFetchStrategy(CursorFetchStrategy): - """Call fetch methods from a DBAPI cursor. + """ + return self._metadata.keys - Alternate versions of this class may instead buffer the rows from - cursors or not use cursors at all. + @_generative + def yield_per(self, num): + """Configure the row-fetching strategy to fetch num rows at a time. - """ + This impacts the underlying behavior of the result when iterating over + the result object, or otherwise making use of methods such as + :meth:`_engine.Result.fetchone` that return one row at a time. Data + from the underlying cursor or other data source will be buffered up to + this many rows in memory, and the buffered collection will then be + yielded out one row at at time or as many rows are requested. Each time + the buffer clears, it will be refreshed to this many rows or as many + rows remain if fewer remain. - @classmethod - def create(cls, result): - dbapi_cursor = result.cursor - description = dbapi_cursor.description + The :meth:`_engine.Result.yield_per` method is generally used in + conjunction with the + :paramref:`_engine.Connection.execution_options.stream_results` + execution option, which will allow the database dialect in use to make + use of a server side cursor, if the DBAPI supports it. - if description is None: - return NoCursorDMLFetchStrategy(False) - else: - return cls(dbapi_cursor, description) + Most DBAPIs do not use server side cursors by default, which means all + rows will be fetched upfront from the database regardless of the + :meth:`_engine.Result.yield_per` setting. However, + :meth:`_engine.Result.yield_per` may still be useful in that it batches + the SQLAlchemy-side processing of the raw data from the database, and + additionally when used for ORM scenarios will batch the conversion of + database rows into ORM entity rows. - def soft_close(self, result): - result.cursor_strategy = NoCursorDQLFetchStrategy(False) - def hard_close(self, result): - result.cursor_strategy = NoCursorDQLFetchStrategy(True) + .. versionadded:: 1.4 - def fetchone(self): - return self.dbapi_cursor.fetchone() + :param num: number of rows to fetch each time the buffer is refilled. + If set to a value below 1, fetches all rows for the next buffer. - def fetchmany(self, size=None): - if size is None: - return self.dbapi_cursor.fetchmany() - else: - return self.dbapi_cursor.fetchmany(size) + """ + self._yield_per = num + + @_generative + def unique(self, strategy=None): + """Apply unique filtering to the objects returned by this + :class:`_engine.Result`. + + When this filter is applied with no arguments, the rows or objects + returned will filtered such that each row is returned uniquely. The + algorithm used to determine this uniqueness is by default the Python + hashing identity of the whole tuple. In some cases a specialized + per-entity hashing scheme may be used, such as when using the ORM, a + scheme is applied which works against the primary key identity of + returned objects. + + The unique filter is applied **after all other filters**, which means + if the columns returned have been refined using a method such as the + :meth:`_engine.Result.columns` or :meth:`_engine.Result.scalars` + method, the uniquing is applied to **only the column or columns + returned**. This occurs regardless of the order in which these + methods have been called upon the :class:`_engine.Result` object. + + The unique filter also changes the calculus used for methods like + :meth:`_engine.Result.fetchmany` and :meth:`_engine.Result.partitions`. + When using :meth:`_engine.Result.unique`, these methods will continue + to yield the number of rows or objects requested, after uniquing + has been applied. However, this necessarily impacts the buffering + behavior of the underlying cursor or datasource, such that multiple + underlying calls to ``cursor.fetchmany()`` may be necessary in order + to accumulate enough objects in order to provide a unique collection + of the requested size. + + :param strategy: a callable that will be applied to rows or objects + being iterated, which should return an object that represents the + unique value of the row. A Python ``set()`` is used to store + these identities. If not passed, a default uniqueness strategy + is used which may have been assembled by the source of this + :class:`_engine.Result` object. - def fetchall(self): - return self.dbapi_cursor.fetchall() + """ + self._unique_filter_state = (set(), strategy) + @HasMemoized.memoized_attribute + def _unique_strategy(self): + uniques, strategy = self._unique_filter_state -class BufferedRowCursorFetchStrategy(DefaultCursorFetchStrategy): - """A cursor fetch strategy with row buffering behavior. + if not strategy and self._metadata._unique_filters: + filters = self._metadata._unique_filters + if self._metadata._tuplefilter: + filters = self._metadata._tuplefilter(filters) - This strategy buffers the contents of a selection of rows - before ``fetchone()`` is called. This is to allow the results of - ``cursor.description`` to be available immediately, when - interfacing with a DB-API that requires rows to be consumed before - this information is available (currently psycopg2, when used with - server-side cursors). + strategy = operator.methodcaller("_filter_on_values", filters) + return uniques, strategy - The pre-fetching behavior fetches only one row initially, and then - grows its buffer size by a fixed amount with each successive need - for additional rows up the ``max_row_buffer`` size, which defaults - to 1000:: + def columns(self, *col_expressions): + r"""Establish the columns that should be returned in each row. - with psycopg2_engine.connect() as conn: + This method may be used to limit the columns returned as well + as to reorder them. The given list of expressions are normally + a series of integers or string key names. They may also be + appropriate :class:`.ColumnElement` objects which correspond to + a given statement construct. - result = conn.execution_options( - stream_results=True, max_row_buffer=50 - ).execute(text("select * from table")) + E.g.:: - .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows. + statement = select(table.c.x, table.c.y, table.c.z) + result = connection.execute(statement) - .. seealso:: + for z, y in result.columns('z', 'y'): + # ... - :ref:`psycopg2_execution_options` - """ - __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize") + Example of using the column objects from the statement itself:: - def __init__( - self, max_row_buffer, dbapi_cursor, description, initial_buffer - ): - super(BufferedRowCursorFetchStrategy, self).__init__( - dbapi_cursor, description - ) + for z, y in result.columns( + statement.selected_columns.c.z, + statement.selected_columns.c.y + ): + # ... - self._max_row_buffer = max_row_buffer - self._growth_factor = 5 - self._rowbuffer = initial_buffer + .. versionadded:: 1.4 - self._bufsize = min(self._max_row_buffer, self._growth_factor) + :param \*col_expressions: indicates columns to be returned. Elements + may be integer row indexes, string column names, or appropriate + :class:`.ColumnElement` objects corresponding to a select construct. - @classmethod - def create(cls, result): - """Buffered row strategy has to buffer the first rows *before* - cursor.description is fetched so that it works with named cursors - correctly + :return: this :class:`_engine.Result` object with the modifications + given. """ + return self._column_slices(col_expressions) - dbapi_cursor = result.cursor + def partitions(self, size=None): + """Iterate through sub-lists of rows of the size given. - initial_buffer = collections.deque(dbapi_cursor.fetchmany(1)) + Each list will be of the size given, excluding the last list to + be yielded, which may have a small number of rows. No empty + lists will be yielded. - description = dbapi_cursor.description + The result object is automatically closed when the iterator + is fully consumed. - if description is None: - return NoCursorDMLFetchStrategy(False) - else: - max_row_buffer = result.context.execution_options.get( - "max_row_buffer", 1000 - ) - return cls( - max_row_buffer, dbapi_cursor, description, initial_buffer - ) + Note that the backend driver will usually buffer the entire result + ahead of time unless the + :paramref:`.Connection.execution_options.stream_results` execution + option is used indicating that the driver should not pre-buffer + results, if possible. Not all drivers support this option and + the option is silently ignored for those who do. - def __buffer_rows(self): - size = self._bufsize - self._rowbuffer = collections.deque(self.dbapi_cursor.fetchmany(size)) - if size < self._max_row_buffer: - self._bufsize = min( - self._max_row_buffer, size * self._growth_factor - ) + .. versionadded:: 1.4 - def soft_close(self, result): - self._rowbuffer.clear() - super(BufferedRowCursorFetchStrategy, self).soft_close(result) + :param size: indicate the maximum number of rows to be present + in each list yielded. If None, makes use of the value set by + :meth:`_engine.Result.yield_per`, if present, otherwise uses the + :meth:`_engine.Result.fetchmany` default which may be backend + specific. - def hard_close(self, result): - self._rowbuffer.clear() - super(BufferedRowCursorFetchStrategy, self).hard_close(result) + :return: iterator of lists - def fetchone(self): - if not self._rowbuffer: - self.__buffer_rows() - if not self._rowbuffer: - return None - return self._rowbuffer.popleft() + """ + getter = self._manyrow_getter - def fetchmany(self, size=None): - if size is None: - return self.fetchall() - result = [] - for x in range(0, size): - row = self.fetchone() - if row is None: + while True: + partition = getter(self, size) + if partition: + yield partition + else: break - result.append(row) - return result - def fetchall(self): - self._rowbuffer.extend(self.dbapi_cursor.fetchall()) - ret = self._rowbuffer - self._rowbuffer = collections.deque() - return ret + def scalars(self, index=0): + """Apply a scalars filter to returned rows. + When this filter is applied, fetching results will return Python scalar + objects from exactly one column of each row, rather than :class:`.Row` + objects or mappings. -class FullyBufferedCursorFetchStrategy(DefaultCursorFetchStrategy): - """A cursor strategy that buffers rows fully upon creation. + This filter cancels out other filters that may be established such + as that of :meth:`_engine.Result.mappings`. - Used for operations where a result is to be delivered - after the database conversation can not be continued, - such as MSSQL INSERT...OUTPUT after an autocommit. + .. versionadded:: 1.4 - """ + :param index: integer or row key indicating the column to be fetched + from each row, defaults to ``0`` indicating the first column. - __slots__ = ("_rowbuffer",) + :return: this :class:`_engine.Result` object with modifications. - def __init__(self, dbapi_cursor, description, initial_buffer=None): - super(FullyBufferedCursorFetchStrategy, self).__init__( - dbapi_cursor, description - ) - if initial_buffer is not None: - self._rowbuffer = collections.deque(initial_buffer) - else: - self._rowbuffer = self._buffer_rows() - - @classmethod - def create_from_buffer(cls, dbapi_cursor, description, buffer): - return cls(dbapi_cursor, description, buffer) - - def _buffer_rows(self): - return collections.deque(self.dbapi_cursor.fetchall()) - - def soft_close(self, result): - self._rowbuffer.clear() - super(FullyBufferedCursorFetchStrategy, self).soft_close(result) - - def hard_close(self, result): - self._rowbuffer.clear() - super(FullyBufferedCursorFetchStrategy, self).hard_close(result) - - def fetchone(self): - if self._rowbuffer: - return self._rowbuffer.popleft() - else: - return None - - def fetchmany(self, size=None): - if size is None: - return self.fetchall() - result = [] - for x in range(0, size): - row = self.fetchone() - if row is None: - break - result.append(row) + """ + result = self._column_slices([index]) + result._post_creational_filter = operator.itemgetter(0) + result._no_scalar_onerow = True return result - def fetchall(self): - ret = self._rowbuffer - self._rowbuffer = collections.deque() - return ret - - -class BaseResult(object): - """Base class for database result objects. - - - :class:`.BaseResult` is the base class for the 1.x style - :class:`_engine.ResultProxy` class as well as the 2.x style - :class:`_future.Result` class. - - """ - - out_parameters = None - _metadata = None - _soft_closed = False - closed = False - - @classmethod - def _create_for_context(cls, context): - if context._is_future_result: - obj = object.__new__(_future_Result) - else: - obj = object.__new__(ResultProxy) - obj.__init__(context) - return obj - - def __init__(self, context): - self.context = context - self.dialect = context.dialect - self.cursor = context.cursor - self.connection = context.root_connection - self._echo = ( - self.connection._echo and context.engine._should_log_debug() - ) - self._init_metadata() - - def _init_metadata(self): - self.cursor_strategy = strat = self.context.get_result_cursor_strategy( - self - ) - - if strat.cursor_description is not None: - if self.context.compiled: - if self.context.compiled._cached_metadata: - cached_md = self.context.compiled._cached_metadata - self._metadata = cached_md._adapt_to_context(self.context) - - else: - self._metadata = ( - self.context.compiled._cached_metadata - ) = self._cursor_metadata(self, strat.cursor_description) - else: - self._metadata = self._cursor_metadata( - self, strat.cursor_description - ) - if self._echo: - self.context.engine.logger.debug( - "Col %r", tuple(x[0] for x in strat.cursor_description) - ) - # leave cursor open so that execution context can continue - # setting up things like rowcount - - def keys(self): - """Return the list of string keys that would represented by each - :class:`.Row`.""" - - if self._metadata: - return self._metadata.keys - else: - return [] + @_generative + def _column_slices(self, indexes): + self._metadata = self._metadata._reduce(indexes) def _getter(self, key, raiseerr=True): - try: - getter = self._metadata._getter - except AttributeError as err: - return self.cursor_strategy._non_result(None, err) - else: - return getter(key, raiseerr) - - def _tuple_getter(self, key, raiseerr=True): - try: - getter = self._metadata._tuple_getter - except AttributeError as err: - return self.cursor_strategy._non_result(None, err) - else: - return getter(key, raiseerr) - - def _has_key(self, key): - try: - has_key = self._metadata._has_key - except AttributeError as err: - return self.cursor_strategy._non_result(None, err) - else: - return has_key(key) - - def _soft_close(self, hard=False): - """Soft close this :class:`_engine.ResultProxy`. + """return a callable that will retrieve the given key from a + :class:`.Row`. - This releases all DBAPI cursor resources, but leaves the - ResultProxy "open" from a semantic perspective, meaning the - fetchXXX() methods will continue to return empty results. - - This method is called automatically when: + """ + return self._metadata._getter(key, raiseerr) - * all result rows are exhausted using the fetchXXX() methods. - * cursor.description is None. + def _tuple_getter(self, keys): + """return a callable that will retrieve the given keys from a + :class:`.Row`. - This method is **not public**, but is documented in order to clarify - the "autoclose" process used. + """ + return self._metadata._row_as_tuple_getter(keys) - .. versionadded:: 1.0.0 + @_generative + def mappings(self): + """Apply a mappings filter to returned rows. - .. seealso:: + When this filter is applied, fetching rows will return + :class:`.RowMapping` objects instead of :class:`.Row` objects. - :meth:`_engine.ResultProxy.close` + This filter cancels out other filters that may be established such + as that of :meth:`_engine.Result.scalars`. + .. versionadded:: 1.4 + :return: this :class:`._engine.Result` object with modifications. """ + self._post_creational_filter = operator.attrgetter("_mapping") + self._no_scalar_onerow = False - if (not hard and self._soft_closed) or (hard and self.closed): - return + def _row_getter(self): + process_row = self._process_row + metadata = self._metadata - if hard: - self.closed = True - self.cursor_strategy.hard_close(self) - else: - self.cursor_strategy.soft_close(self) - - if not self._soft_closed: - cursor = self.cursor - self.cursor = None - self.connection._safe_close_cursor(cursor) - self._soft_closed = True - - @util.memoized_property - def inserted_primary_key(self): - """Return the primary key for the row just inserted. - - The return value is a list of scalar values - corresponding to the list of primary key columns - in the target table. - - This only applies to single row :func:`_expression.insert` - constructs which did not explicitly specify - :meth:`_expression.Insert.returning`. - - Note that primary key columns which specify a - server_default clause, - or otherwise do not qualify as "autoincrement" - columns (see the notes at :class:`_schema.Column`), and were - generated using the database-side default, will - appear in this list as ``None`` unless the backend - supports "returning" and the insert statement executed - with the "implicit returning" enabled. - - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an insert() construct. + keymap = metadata._keymap + processors = metadata._processors + tf = metadata._tuplefilter - """ + if tf: + processors = tf(processors) - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled " "expression construct." - ) - elif not self.context.isinsert: - raise exc.InvalidRequestError( - "Statement is not an insert() " "expression construct." + _make_row_orig = functools.partial( + process_row, metadata, processors, keymap ) - elif self.context._is_explicit_returning: - raise exc.InvalidRequestError( - "Can't call inserted_primary_key " - "when returning() " - "is used." - ) - - return self.context.inserted_primary_key - def last_updated_params(self): - """Return the collection of updated parameters from this - execution. + def make_row(row): + return _make_row_orig(tf(row)) - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an update() construct. - - """ - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled " "expression construct." - ) - elif not self.context.isupdate: - raise exc.InvalidRequestError( - "Statement is not an update() " "expression construct." - ) - elif self.context.executemany: - return self.context.compiled_parameters else: - return self.context.compiled_parameters[0] - - def last_inserted_params(self): - """Return the collection of inserted parameters from this - execution. + make_row = functools.partial( + process_row, metadata, processors, keymap + ) - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an insert() construct. + fns = () - """ - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled " "expression construct." - ) - elif not self.context.isinsert: - raise exc.InvalidRequestError( - "Statement is not an insert() " "expression construct." - ) - elif self.context.executemany: - return self.context.compiled_parameters + if self._row_logging_fn: + fns = (self._row_logging_fn,) else: - return self.context.compiled_parameters[0] + fns = () - @property - def returned_defaults(self): - """Return the values of default columns that were fetched using - the :meth:`.ValuesBase.return_defaults` feature. + if self._column_slice_filter: + fns += (self._column_slice_filter,) - The value is an instance of :class:`.Row`, or ``None`` - if :meth:`.ValuesBase.return_defaults` was not used or if the - backend does not support RETURNING. + if fns: + _make_row = make_row - .. versionadded:: 0.9.0 + def make_row(row): + row = _make_row(row) + for fn in fns: + row = fn(row) + return row - .. seealso:: + return make_row - :meth:`.ValuesBase.return_defaults` + def _raw_row_iterator(self): + """Return a safe iterator that yields raw row data. + + This is used by the :meth:`._engine.Result.merge` method + to merge multiple compatible results together. """ - return self.context.returned_defaults + raise NotImplementedError() - def lastrow_has_defaults(self): - """Return ``lastrow_has_defaults()`` from the underlying - :class:`.ExecutionContext`. + def freeze(self): + """Return a callable object that will produce copies of this + :class:`.Result` when invoked. - See :class:`.ExecutionContext` for details. + This is used for result set caching. The method must be called + on the result when it has been unconsumed, and calling the method + will consume the result fully. """ + return FrozenResult(self) - return self.context.lastrow_has_defaults() + def merge(self, *others): + """Merge this :class:`.Result` with other compatible result + objects. - def postfetch_cols(self): - """Return ``postfetch_cols()`` from the underlying - :class:`.ExecutionContext`. + The object returned is an instance of :class:`.MergedResult`, + which will be composed of iterators from the given result + objects. - See :class:`.ExecutionContext` for details. - - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an insert() or update() construct. + The new result will use the metadata from this result object. + The subsequent result objects must be against an identical + set of result / cursor metadata, otherwise the behavior is + undefined. """ + return MergedResult(self._metadata, (self,) + others) - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled " "expression construct." - ) - elif not self.context.isinsert and not self.context.isupdate: - raise exc.InvalidRequestError( - "Statement is not an insert() or update() " - "expression construct." - ) - return self.context.postfetch_cols + @HasMemoized.memoized_attribute + def _iterator_getter(self): - def prefetch_cols(self): - """Return ``prefetch_cols()`` from the underlying - :class:`.ExecutionContext`. + make_row = self._row_getter() - See :class:`.ExecutionContext` for details. + post_creational_filter = self._post_creational_filter - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an insert() or update() construct. + if self._unique_filter_state: + uniques, strategy = self._unique_strategy - """ + def iterrows(self): + for row in self._fetchiter_impl(): + obj = make_row(row) + hashed = strategy(obj) if strategy else obj + if hashed in uniques: + continue + uniques.add(hashed) + if post_creational_filter: + obj = post_creational_filter(obj) + yield obj - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled " "expression construct." - ) - elif not self.context.isinsert and not self.context.isupdate: - raise exc.InvalidRequestError( - "Statement is not an insert() or update() " - "expression construct." - ) - return self.context.prefetch_cols + else: - def supports_sane_rowcount(self): - """Return ``supports_sane_rowcount`` from the dialect. + def iterrows(self): + for row in self._fetchiter_impl(): + row = make_row(row) + if post_creational_filter: + row = post_creational_filter(row) + yield row - See :attr:`_engine.ResultProxy.rowcount` for background. + return iterrows - """ + @HasMemoized.memoized_attribute + def _allrow_getter(self): - return self.dialect.supports_sane_rowcount + make_row = self._row_getter() - def supports_sane_multi_rowcount(self): - """Return ``supports_sane_multi_rowcount`` from the dialect. + post_creational_filter = self._post_creational_filter - See :attr:`_engine.ResultProxy.rowcount` for background. + if self._unique_filter_state: + uniques, strategy = self._unique_strategy - """ + def allrows(self): + rows = self._fetchall_impl() + rows = [ + made_row + for made_row, sig_row in [ + ( + made_row, + strategy(made_row) if strategy else made_row, + ) + for made_row in [make_row(row) for row in rows] + ] + if sig_row not in uniques and not uniques.add(sig_row) + ] - return self.dialect.supports_sane_multi_rowcount + if post_creational_filter: + rows = [post_creational_filter(row) for row in rows] + return rows - @util.memoized_property - def rowcount(self): - """Return the 'rowcount' for this result. + else: - The 'rowcount' reports the number of rows *matched* - by the WHERE criterion of an UPDATE or DELETE statement. + def allrows(self): + rows = self._fetchall_impl() + if post_creational_filter: + rows = [ + post_creational_filter(make_row(row)) for row in rows + ] + else: + rows = [make_row(row) for row in rows] + return rows + + return allrows + + @HasMemoized.memoized_attribute + def _onerow_getter(self): + make_row = self._row_getter() + + # TODO: this is a lot for results that are only one row. + # all of this could be in _only_one_row except for fetchone() + # and maybe __next__ + + post_creational_filter = self._post_creational_filter + + if self._unique_filter_state: + uniques, strategy = self._unique_strategy + + def onerow(self): + _onerow = self._fetchone_impl + while True: + row = _onerow() + if row is None: + return _NO_ROW + else: + obj = make_row(row) + hashed = strategy(obj) if strategy else obj + if hashed in uniques: + continue + else: + uniques.add(hashed) + if post_creational_filter: + obj = post_creational_filter(obj) + return obj - .. note:: + else: - Notes regarding :attr:`_engine.ResultProxy.rowcount`: + def onerow(self): + row = self._fetchone_impl() + if row is None: + return _NO_ROW + else: + row = make_row(row) + if post_creational_filter: + row = post_creational_filter(row) + return row + + return onerow + + @HasMemoized.memoized_attribute + def _manyrow_getter(self): + make_row = self._row_getter() + + post_creational_filter = self._post_creational_filter + + if self._unique_filter_state: + uniques, strategy = self._unique_strategy + + def filterrows(make_row, rows, strategy, uniques): + return [ + made_row + for made_row, sig_row in [ + ( + made_row, + strategy(made_row) if strategy else made_row, + ) + for made_row in [make_row(row) for row in rows] + ] + if sig_row not in uniques and not uniques.add(sig_row) + ] + def manyrows(self, num): + collect = [] + + _manyrows = self._fetchmany_impl + + if num is None: + # if None is passed, we don't know the default + # manyrows number, DBAPI has this as cursor.arraysize + # different DBAPIs / fetch strategies may be different. + # do a fetch to find what the number is. if there are + # only fewer rows left, then it doesn't matter. + if self._yield_per: + num_required = num = self._yield_per + else: + rows = _manyrows(num) + num = len(rows) + collect.extend( + filterrows(make_row, rows, strategy, uniques) + ) + num_required = num - len(collect) + else: + num_required = num - * This attribute returns the number of rows *matched*, - which is not necessarily the same as the number of rows - that were actually *modified* - an UPDATE statement, for example, - may have no net change on a given row if the SET values - given are the same as those present in the row already. - Such a row would be matched but not modified. - On backends that feature both styles, such as MySQL, - rowcount is configured by default to return the match - count in all cases. + while num_required: + rows = _manyrows(num_required) + if not rows: + break - * :attr:`_engine.ResultProxy.rowcount` - is *only* useful in conjunction - with an UPDATE or DELETE statement. Contrary to what the Python - DBAPI says, it does *not* return the - number of rows available from the results of a SELECT statement - as DBAPIs cannot support this functionality when rows are - unbuffered. + collect.extend( + filterrows(make_row, rows, strategy, uniques) + ) + num_required = num - len(collect) - * :attr:`_engine.ResultProxy.rowcount` - may not be fully implemented by - all dialects. In particular, most DBAPIs do not support an - aggregate rowcount result from an executemany call. - The :meth:`_engine.ResultProxy.supports_sane_rowcount` and - :meth:`_engine.ResultProxy.supports_sane_multi_rowcount` methods - will report from the dialect if each usage is known to be - supported. + if post_creational_filter: + collect = [post_creational_filter(row) for row in collect] + return collect - * Statements that use RETURNING may not return a correct - rowcount. + else: - """ - try: - return self.context.rowcount - except BaseException as e: - self.connection._handle_dbapi_exception( - e, None, None, self.cursor, self.context - ) + def manyrows(self, num): + if num is None: + num = self._yield_per - @property - def lastrowid(self): - """return the 'lastrowid' accessor on the DBAPI cursor. + rows = self._fetchmany_impl(num) + rows = [make_row(row) for row in rows] + if post_creational_filter: + rows = [post_creational_filter(row) for row in rows] + return rows - This is a DBAPI specific method and is only functional - for those backends which support it, for statements - where it is appropriate. It's behavior is not - consistent across backends. + return manyrows - Usage of this method is normally unnecessary when - using insert() expression constructs; the - :attr:`~ResultProxy.inserted_primary_key` attribute provides a - tuple of primary key values for a newly inserted row, - regardless of database backend. + def _fetchiter_impl(self): + raise NotImplementedError() - """ - try: - return self.context.get_lastrowid() - except BaseException as e: - self.connection._handle_dbapi_exception( - e, None, None, self.cursor, self.context - ) + def _fetchone_impl(self): + raise NotImplementedError() - @property - def returns_rows(self): - """True if this :class:`_engine.ResultProxy` returns rows. + def _fetchall_impl(self): + raise NotImplementedError() - I.e. if it is legal to call the methods - :meth:`_engine.ResultProxy.fetchone`, - :meth:`_engine.ResultProxy.fetchmany` - :meth:`_engine.ResultProxy.fetchall`. + def _fetchmany_impl(self, size=None): + raise NotImplementedError() - """ - return self._metadata is not None + def __iter__(self): + return self._iterator_getter(self) - @property - def is_insert(self): - """True if this :class:`_engine.ResultProxy` is the result - of a executing an expression language compiled - :func:`_expression.insert` construct. + def __next__(self): + row = self._onerow_getter(self) + if row is _NO_ROW: + raise StopIteration() + else: + return row - When True, this implies that the - :attr:`inserted_primary_key` attribute is accessible, - assuming the statement did not include - a user defined "returning" construct. + next = __next__ - """ - return self.context.isinsert + def fetchall(self): + """A synonym for the :meth:`_engine.Result.all` method.""" + return self._allrow_getter(self) -class ResultProxy(BaseResult): - """A facade around a DBAPI cursor object. + def fetchone(self): + """Fetch one row. - Returns database rows via the :class:`.Row` class, which provides - additional API features and behaviors on top of the raw data returned - by the DBAPI. + When all rows are exhausted, returns None. - Within the scope of the 1.x series of SQLAlchemy, the - :class:`_engine.ResultProxy` - will in fact return instances of the :class:`.LegacyRow` class, which - maintains Python mapping (i.e. dictionary) like behaviors upon the object - itself. Going forward, the :attr:`.Row._mapping` attribute should be used - for dictionary behaviors. + .. note:: This method is not compatible with the + :meth:`_result.Result.scalars` + filter, as there is no way to distinguish between a data value of + None and the ending value. Prefer to use iterative / collection + methods which support scalar None values. - .. seealso:: + this method is provided for backwards compatibility with + SQLAlchemy 1.x.x. - :ref:`coretutorial_selecting` - introductory material for accessing - :class:`_engine.ResultProxy` and :class:`.Row` objects. + To fetch the first row of a result only, use the + :meth:`_engine.Result.first` method. To iterate through all + rows, iterate the :class:`_engine.Result` object directly. - """ + :return: a :class:`.Row` object if no filters are applied, or None + if no rows remain. + When filters are applied, such as :meth:`_engine.Result.mappings` + or :meth:`._engine.Result.scalar`, different kinds of objects + may be returned. - _autoclose_connection = False - _process_row = LegacyRow - _cursor_metadata = LegacyCursorResultMetaData - _cursor_strategy_cls = DefaultCursorFetchStrategy + """ + if self._no_scalar_onerow: + raise exc.InvalidRequestError( + "Can't use fetchone() when returning scalar values; there's " + "no way to distinguish between end of results and None" + ) + row = self._onerow_getter(self) + if row is _NO_ROW: + return None + else: + return row - def __iter__(self): - """Implement iteration protocol.""" + def fetchmany(self, size=None): + """Fetch many rows. - while True: - row = self.fetchone() - if row is None: - return - else: - yield row + When all rows are exhausted, returns an empty list. - def close(self): - """Close this :class:`_engine.ResultProxy`. - - This closes out the underlying DBAPI cursor corresponding - to the statement execution, if one is still present. Note that the - DBAPI cursor is automatically released when the - :class:`_engine.ResultProxy` - exhausts all available rows. :meth:`_engine.ResultProxy.close` - is generally - an optional method except in the case when discarding a - :class:`_engine.ResultProxy` - that still has additional rows pending for fetch. - - In the case of a result that is the product of - :ref:`connectionless execution <dbengine_implicit>`, - the underlying :class:`_engine.Connection` object is also closed, - which - :term:`releases` DBAPI connection resources. - - .. deprecated:: 2.0 "connectionless" execution is deprecated and will - be removed in version 2.0. Version 2.0 will feature the - :class:`_future.Result` - object that will no longer affect the status - of the originating connection in any case. - - After this method is called, it is no longer valid to call upon - the fetch methods, which will raise a :class:`.ResourceClosedError` - on subsequent use. + this method is provided for backwards compatibility with + SQLAlchemy 1.x.x. - .. seealso:: + To fetch rows in groups, use the :meth:`._result.Result.partitions` + method. - :ref:`connections_toplevel` + :return: a list of :class:`.Row` objects if no filters are applied. + When filters are applied, such as :meth:`_engine.Result.mappings` + or :meth:`._engine.Result.scalar`, different kinds of objects + may be returned. """ - self._soft_close(hard=True) + return self._manyrow_getter(self, size) - def _soft_close(self, hard=False): - soft_closed = self._soft_closed - super(ResultProxy, self)._soft_close(hard=hard) - if ( - not soft_closed - and self._soft_closed - and self._autoclose_connection - ): - self.connection.close() + def all(self): + """Return all rows in a list. - def __next__(self): - """Implement the Python next() protocol. + Closes the result set after invocation. Subsequent invocations + will return an empty list. - This method, mirrored as both ``.next()`` and ``.__next__()``, is part - of Python's API for producing iterator-like behavior. + .. versionadded:: 1.4 - .. versionadded:: 1.2 + :return: a list of :class:`.Row` objects if no filters are applied. + When filters are applied, such as :meth:`_engine.Result.mappings` + or :meth:`._engine.Result.scalar`, different kinds of objects + may be returned. """ - row = self.fetchone() - if row is None: - raise StopIteration() + return self._allrow_getter(self) + + def _only_one_row(self, raise_for_second_row, raise_for_none): + row = self._onerow_getter(self) + if row is _NO_ROW: + if raise_for_none: + self._soft_close(hard=True) + raise exc.NoResultFound( + "No row was found when one was required" + ) + else: + return None else: - return row + if raise_for_second_row: + next_row = self._onerow_getter(self) + else: + next_row = _NO_ROW + self._soft_close(hard=True) + if next_row is not _NO_ROW: + raise exc.MultipleResultsFound( + "Multiple rows were found when exactly one was required" + if raise_for_none + else "Multiple rows were found when one or none " + "was required" + ) + else: + return row - next = __next__ + def first(self): + """Fetch the first row or None if no row is present. - def process_rows(self, rows): - process_row = self._process_row - metadata = self._metadata - keymap = metadata._keymap - processors = metadata._processors + Closes the result set and discards remaining rows. - if self._echo: - log = self.context.engine.logger.debug - l = [] - for row in rows: - log("Row %r", sql_util._repr_row(row)) - l.append(process_row(metadata, processors, keymap, row)) - return l - else: - return [ - process_row(metadata, processors, keymap, row) for row in rows - ] + .. comment: A warning is emitted if additional rows remain. - def fetchall(self): - """Fetch all rows, just like DB-API ``cursor.fetchall()``. + :return: a :class:`.Row` object if no filters are applied, or None + if no rows remain. + When filters are applied, such as :meth:`_engine.Result.mappings` + or :meth:`._engine.Result.scalar`, different kinds of objects + may be returned. - After all rows have been exhausted, the underlying DBAPI - cursor resource is released, and the object may be safely - discarded. + """ + return self._only_one_row(False, False) - Subsequent calls to :meth:`_engine.ResultProxy.fetchall` will return - an empty list. After the :meth:`_engine.ResultProxy.close` method is - called, the method will raise :class:`.ResourceClosedError`. + def one_or_none(self): + """Return at most one result or raise an exception. - :return: a list of :class:`.Row` objects + Returns ``None`` if the result has no rows. + Raises :class:`.MultipleResultsFound` + if multiple rows are returned. - """ + .. versionadded:: 1.4 - try: - l = self.process_rows(self.cursor_strategy.fetchall()) - self._soft_close() - return l - except BaseException as e: - self.connection._handle_dbapi_exception( - e, None, None, self.cursor, self.context - ) + :return: The first :class:`.Row` or None if no row is available. + When filters are applied, such as :meth:`_engine.Result.mappings` + or :meth:`._engine.Result.scalar`, different kinds of objects + may be returned. - def fetchmany(self, size=None): - """Fetch many rows, just like DB-API - ``cursor.fetchmany(size=cursor.arraysize)``. + :raises: :class:`.MultipleResultsFound` - After all rows have been exhausted, the underlying DBAPI - cursor resource is released, and the object may be safely - discarded. + .. seealso:: - Calls to :meth:`_engine.ResultProxy.fetchmany` - after all rows have been - exhausted will return - an empty list. After the :meth:`_engine.ResultProxy.close` method is - called, the method will raise :class:`.ResourceClosedError`. + :meth:`_result.Result.first` - :return: a list of :class:`.Row` objects + :meth:`_result.Result.one` """ + return self._only_one_row(True, False) - try: - l = self.process_rows(self.cursor_strategy.fetchmany(size)) - if len(l) == 0: - self._soft_close() - return l - except BaseException as e: - self.connection._handle_dbapi_exception( - e, None, None, self.cursor, self.context - ) + def one(self): + """Return exactly one result or raise an exception. - def _onerow(self): - return self.fetchone() + Raises :class:`.NoResultFound` if the result returns no + rows, or :class:`.MultipleResultsFound` if multiple rows + would be returned. - def fetchone(self): - """Fetch one row, just like DB-API ``cursor.fetchone()``. + .. versionadded:: 1.4 - After all rows have been exhausted, the underlying DBAPI - cursor resource is released, and the object may be safely - discarded. + :return: The first :class:`.Row`. + When filters are applied, such as :meth:`_engine.Result.mappings` + or :meth:`._engine.Result.scalar`, different kinds of objects + may be returned. - Calls to :meth:`_engine.ResultProxy.fetchone` after all rows have - been exhausted will return ``None``. - After the :meth:`_engine.ResultProxy.close` method is - called, the method will raise :class:`.ResourceClosedError`. + :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound` - :return: a :class:`.Row` object, or None if no rows remain - - """ - try: - row = self.cursor_strategy.fetchone() - if row is not None: - return self.process_rows([row])[0] - else: - self._soft_close() - return None - except BaseException as e: - self.connection._handle_dbapi_exception( - e, None, None, self.cursor, self.context - ) - - def first(self): - """Fetch the first row and then close the result set unconditionally. + .. seealso:: - After calling this method, the object is fully closed, - e.g. the :meth:`_engine.ResultProxy.close` - method will have been called. + :meth:`_result.Result.first` - :return: a :class:`.Row` object, or None if no rows remain + :meth:`_result.Result.one_or_none` """ - try: - row = self.cursor_strategy.fetchone() - except BaseException as e: - self.connection._handle_dbapi_exception( - e, None, None, self.cursor, self.context - ) - - try: - if row is not None: - return self.process_rows([row])[0] - else: - return None - finally: - self.close() + return self._only_one_row(True, True) def scalar(self): """Fetch the first column of the first row, and close the result set. After calling this method, the object is fully closed, - e.g. the :meth:`_engine.ResultProxy.close` + e.g. the :meth:`_engine.CursorResult.close` method will have been called. :return: a Python scalar value , or None if no rows remain @@ -1764,38 +1008,94 @@ class ResultProxy(BaseResult): return None -class BufferedRowResultProxy(ResultProxy): - """A ResultProxy with row buffering behavior. +class FrozenResult(object): + def __init__(self, result): + self.metadata = result._metadata._for_freeze() + self._post_creational_filter = result._post_creational_filter + result._post_creational_filter = None - .. deprecated:: 1.4 this class is now supplied using a strategy object. - See :class:`.BufferedRowCursorFetchStrategy`. + self.data = result.fetchall() - """ + def with_data(self, data): + fr = FrozenResult.__new__(FrozenResult) + fr.metadata = self.metadata + fr._post_creational_filter = self._post_creational_filter + fr.data = data + return fr - _cursor_strategy_cls = BufferedRowCursorFetchStrategy + def __call__(self): + result = IteratorResult(self.metadata, iter(self.data)) + result._post_creational_filter = self._post_creational_filter + return result -class FullyBufferedResultProxy(ResultProxy): - """A result proxy that buffers rows fully upon creation. +class IteratorResult(Result): + def __init__(self, cursor_metadata, iterator): + self._metadata = cursor_metadata + self.iterator = iterator - .. deprecated:: 1.4 this class is now supplied using a strategy object. - See :class:`.FullyBufferedCursorFetchStrategy`. + def _soft_close(self, **kw): + self.iterator = iter([]) - """ + def _raw_row_iterator(self): + return self.iterator - _cursor_strategy_cls = FullyBufferedCursorFetchStrategy + def _fetchiter_impl(self): + return self.iterator + def _fetchone_impl(self): + try: + return next(self.iterator) + except StopIteration: + self._soft_close() + return None -class BufferedColumnRow(LegacyRow): - """Row is now BufferedColumn in all cases""" + def _fetchall_impl(self): + try: + return list(self.iterator) + finally: + self._soft_close() + def _fetchmany_impl(self, size=None): + return list(itertools.islice(self.iterator, 0, size)) -class BufferedColumnResultProxy(ResultProxy): - """A ResultProxy with column buffering behavior. - .. versionchanged:: 1.4 This is now the default behavior of the Row - and this class does not change behavior in any way. +class ChunkedIteratorResult(IteratorResult): + def __init__(self, cursor_metadata, chunks): + self._metadata = cursor_metadata + self.chunks = chunks - """ + self.iterator = itertools.chain.from_iterable(self.chunks(None)) + + @_generative + def yield_per(self, num): + self._yield_per = num + self.iterator = itertools.chain.from_iterable(self.chunks(num)) - _process_row = BufferedColumnRow + +class MergedResult(IteratorResult): + closed = False + + def __init__(self, cursor_metadata, results): + self._results = results + super(MergedResult, self).__init__( + cursor_metadata, + itertools.chain.from_iterable( + r._raw_row_iterator() for r in results + ), + ) + + self._unique_filter_state = results[0]._unique_filter_state + self._post_creational_filter = results[0]._post_creational_filter + self._no_scalar_onerow = results[0]._no_scalar_onerow + self._yield_per = results[0]._yield_per + + def close(self): + self._soft_close(hard=True) + + def _soft_close(self, hard=False): + for r in self._results: + r._soft_close(hard=hard) + + if hard: + self.closed = True |