Diffstat (limited to 'lib/sqlalchemy/engine/result.py')
-rw-r--r--  lib/sqlalchemy/engine/result.py | 2298
1 file changed, 799 insertions(+), 1499 deletions(-)
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py
index bc3cdbb9a..a3a9cc489 100644
--- a/lib/sqlalchemy/engine/result.py
+++ b/lib/sqlalchemy/engine/result.py
@@ -5,1753 +5,997 @@
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Define result set constructs including :class:`.Result`"""
+"""Define generic result set constructs."""
-import collections
import functools
+import itertools
import operator
from .row import _baserow_usecext
-from .row import BaseRow # noqa
-from .row import LegacyRow # noqa
-from .row import Row # noqa
-from .row import RowMapping # noqa
-from .row import RowProxy # noqa
-from .row import rowproxy_reconstructor # noqa
+from .row import Row
from .. import exc
from .. import util
-from ..sql import expression
-from ..sql import sqltypes
-from ..sql import util as sql_util
-from ..sql.compiler import RM_NAME
-from ..sql.compiler import RM_OBJECTS
-from ..sql.compiler import RM_RENDERED_NAME
-from ..sql.compiler import RM_TYPE
+from ..sql.base import _generative
+from ..sql.base import HasMemoized
+from ..sql.base import InPlaceGenerative
+from ..util import collections_abc
if _baserow_usecext:
- from sqlalchemy.cresultproxy import tuplegetter as _tuplegetter
+ from sqlalchemy.cresultproxy import tuplegetter
-_UNPICKLED = util.symbol("unpickled")
+ _row_as_tuple = tuplegetter
+else:
-# cyclical import for sqlalchemy.future
-_future_Result = None
+ def tuplegetter(*indexes):
+ it = operator.itemgetter(*indexes)
-# metadata entry tuple indexes.
-# using raw tuple is faster than namedtuple.
-MD_INDEX = 0 # integer index in cursor.description
-MD_OBJECTS = 1 # other string keys and ColumnElement obj that can match
-MD_LOOKUP_KEY = 2 # string key we usually expect for key-based lookup
-MD_RENDERED_NAME = 3 # name that is usually in cursor.description
-MD_PROCESSOR = 4 # callable to process a result value into a row
-MD_UNTRANSLATED = 5 # raw name from cursor.description
+ if len(indexes) > 1:
+ return it
+ else:
+ return lambda row: (it(row),)
+
+ def _row_as_tuple(*indexes):
+ getters = [
+ operator.methodcaller("_get_by_key_impl_mapping", index)
+ for index in indexes
+ ]
+ return lambda rec: tuple(getter(rec) for getter in getters)
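An editor's sketch (not part of the patch) of why the single-index case above
needs wrapping: ``operator.itemgetter`` returns a bare value for one index but
a tuple for several, while ``tuplegetter`` must always produce a tuple::

    import operator

    row = ("a", "b", "c")
    assert operator.itemgetter(0, 2)(row) == ("a", "c")  # already a tuple
    assert operator.itemgetter(1)(row) == "b"            # bare value
    assert tuplegetter(1)(row) == ("b",)                 # hence the lambda wrap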
class ResultMetaData(object):
+ """Base for metadata about result rows."""
+
__slots__ = ()
- def _has_key(self, key):
- return key in self._keymap
+ _tuplefilter = None
+ _translated_indexes = None
+ _unique_filters = None
- def _key_fallback(self, key, err):
+ @property
+ def keys(self):
+ return RMKeyView(self)
+
+ def _for_freeze(self):
+ raise NotImplementedError()
+
+ def _key_fallback(self, key, err, raiseerr=True):
+ assert raiseerr
if isinstance(key, int):
util.raise_(IndexError(key), replace_context=err)
else:
util.raise_(KeyError(key), replace_context=err)
+ def _warn_for_nonint(self, key):
+ raise TypeError(
+ "TypeError: tuple indices must be integers or slices, not %s"
+ % type(key).__name__
+ )
-class SimpleResultMetaData(ResultMetaData):
- __slots__ = "keys", "_keymap", "_processors"
-
- def __init__(self, keys, extra=None):
- self.keys = list(keys)
-
- len_keys = len(keys)
-
- self._keymap = {
- name: (index, name) for index, name in enumerate(self.keys)
- }
- if not _baserow_usecext:
- self._keymap.update(
- {
- index: (index, None, self.keys[index])
- for index in range(len_keys)
- }
- )
- # TODO: negative indexes? test coverage?
- if extra:
- for key, ex in zip(keys, extra):
- rec = self._keymap[key]
- self._keymap.update({e: rec for e in ex})
- self._processors = [None] * len(keys)
-
- def __getstate__(self):
- return {"keys": self.keys}
+ def _index_for_key(self, keys, raiseerr):
+ raise NotImplementedError()
- def __setstate__(self, state):
- self.__init__(state["keys"])
+ def _metadata_for_keys(self, key):
+ raise NotImplementedError()
- def _has_key(self, key):
- return key in self._keymap
+ def _reduce(self, keys):
+ raise NotImplementedError()
- def _contains(self, value, row):
- return value in row._data
+ def _getter(self, key, raiseerr=True):
+ index = self._index_for_key(key, raiseerr)
-def result_tuple(fields, extra=None):
- parent = SimpleResultMetaData(fields, extra)
- return functools.partial(Row, parent, parent._processors, parent._keymap)
+ if index is not None:
+ return operator.methodcaller("_get_by_key_impl_mapping", index)
+ else:
+ return None
+ def _row_as_tuple_getter(self, keys):
+ indexes = list(self._indexes_for_keys(keys))
+ return _row_as_tuple(*indexes)
-class CursorResultMetaData(ResultMetaData):
- """Handle cursor.description, applying additional info from an execution
- context."""
- __slots__ = (
- "_keymap",
- "case_sensitive",
- "matched_on_name",
- "_processors",
- "keys",
- )
+class RMKeyView(collections_abc.KeysView):
+ __slots__ = ("_parent", "_keys")
- def _adapt_to_context(self, context):
- """When using a cached result metadata against a new context,
- we need to rewrite the _keymap so that it has the specific
- Column objects in the new context inside of it. this accommodates
- for select() constructs that contain anonymized columns and
- are cached.
+ def __init__(self, parent):
+ self._parent = parent
+ self._keys = [k for k in parent._keys if k is not None]
- """
- if not context.compiled._result_columns:
- return self
-
- compiled_statement = context.compiled.statement
- invoked_statement = context.invoked_statement
-
- # same statement was invoked as the one we cached against,
- # return self
- if compiled_statement is invoked_statement:
- return self
-
- # make a copy and add the columns from the invoked statement
- # to the result map.
- md = self.__class__.__new__(self.__class__)
-
- md._keymap = self._keymap.copy()
-
- # match up new columns positionally to the result columns
- for existing, new in zip(
- context.compiled._result_columns,
- invoked_statement._exported_columns_iterator(),
- ):
- md._keymap[new] = md._keymap[existing[RM_NAME]]
-
- md.case_sensitive = self.case_sensitive
- md.matched_on_name = self.matched_on_name
- md._processors = self._processors
- md.keys = self.keys
- return md
-
- def __init__(self, parent, cursor_description):
- context = parent.context
- dialect = context.dialect
- self.case_sensitive = dialect.case_sensitive
- self.matched_on_name = False
-
- if context.result_column_struct:
- (
- result_columns,
- cols_are_ordered,
- textual_ordered,
- loose_column_name_matching,
- ) = context.result_column_struct
- num_ctx_cols = len(result_columns)
- else:
- result_columns = (
- cols_are_ordered
- ) = (
- num_ctx_cols
- ) = loose_column_name_matching = textual_ordered = False
-
- # merge cursor.description with the column info
- # present in the compiled structure, if any
- raw = self._merge_cursor_description(
- context,
- cursor_description,
- result_columns,
- num_ctx_cols,
- cols_are_ordered,
- textual_ordered,
- loose_column_name_matching,
- )
+ def __len__(self):
+ return len(self._keys)
- self._keymap = {}
- if not _baserow_usecext:
- # keymap indexes by integer index: this is only used
- # in the pure Python BaseRow.__getitem__
- # implementation to avoid an expensive
- # isinstance(key, util.int_types) in the most common
- # case path
+ def __repr__(self):
+ return "{0.__class__.__name__}({0._keys!r})".format(self)
- len_raw = len(raw)
+ def __iter__(self):
+ return iter(self._keys)
- self._keymap.update(
- [
- (metadata_entry[MD_INDEX], metadata_entry)
- for metadata_entry in raw
- ]
- + [
- (metadata_entry[MD_INDEX] - len_raw, metadata_entry)
- for metadata_entry in raw
- ]
- )
+ def __contains__(self, item):
+ if not _baserow_usecext and isinstance(item, int):
+ return False
- # processors in key order for certain per-row
- # views like __iter__ and slices
- self._processors = [
- metadata_entry[MD_PROCESSOR] for metadata_entry in raw
- ]
+ # note this also includes special key fallback behaviors,
+ # which don't seem to be tested in test_resultset right now
+ return self._parent._has_key(item)
- # keymap by primary string...
- by_key = dict(
- [
- (metadata_entry[MD_LOOKUP_KEY], metadata_entry)
- for metadata_entry in raw
- ]
- )
+ def __eq__(self, other):
+ return list(other) == list(self)
- # for compiled SQL constructs, copy additional lookup keys into
- # the key lookup map, such as Column objects, labels,
- # column keys and other names
- if num_ctx_cols:
-
- # if by-primary-string dictionary smaller (or bigger?!) than
- # number of columns, assume we have dupes, rewrite
- # dupe records with "None" for index which results in
- # ambiguous column exception when accessed.
- if len(by_key) != num_ctx_cols:
- # new in 1.4: get the complete set of all possible keys,
- # strings, objects, whatever, that are dupes across two
- # different records, first.
- index_by_key = {}
- dupes = set()
- for metadata_entry in raw:
- for key in (metadata_entry[MD_RENDERED_NAME],) + (
- metadata_entry[MD_OBJECTS] or ()
- ):
- if not self.case_sensitive and isinstance(
- key, util.string_types
- ):
- key = key.lower()
- idx = metadata_entry[MD_INDEX]
- # if this key has been associated with more than one
- # positional index, it's a dupe
- if index_by_key.setdefault(key, idx) != idx:
- dupes.add(key)
-
- # then put everything we have into the keymap excluding only
- # those keys that are dupes.
- self._keymap.update(
- [
- (obj_elem, metadata_entry)
- for metadata_entry in raw
- if metadata_entry[MD_OBJECTS]
- for obj_elem in metadata_entry[MD_OBJECTS]
- if obj_elem not in dupes
- ]
- )
+ def __ne__(self, other):
+ return list(other) != list(self)
- # then for the dupe keys, put the "ambiguous column"
- # record into by_key.
- by_key.update({key: (None, (), key) for key in dupes})
- else:
- # no dupes - copy secondary elements from compiled
- # columns into self._keymap
- self._keymap.update(
- [
- (obj_elem, metadata_entry)
- for metadata_entry in raw
- if metadata_entry[MD_OBJECTS]
- for obj_elem in metadata_entry[MD_OBJECTS]
- ]
- )
+class SimpleResultMetaData(ResultMetaData):
+ """result metadata for in-memory collections."""
- # update keymap with primary string names taking
- # precedence
- self._keymap.update(by_key)
-
- # update keymap with "translated" names (sqlite-only thing)
- if not num_ctx_cols and context._translate_colname:
- self._keymap.update(
- [
- (
- metadata_entry[MD_UNTRANSLATED],
- self._keymap[metadata_entry[MD_LOOKUP_KEY]],
- )
- for metadata_entry in raw
- if metadata_entry[MD_UNTRANSLATED]
- ]
- )
+ __slots__ = (
+ "_keys",
+ "_keymap",
+ "_processors",
+ "_tuplefilter",
+ "_translated_indexes",
+ "_unique_filters",
+ )
- def _merge_cursor_description(
+ def __init__(
self,
- context,
- cursor_description,
- result_columns,
- num_ctx_cols,
- cols_are_ordered,
- textual_ordered,
- loose_column_name_matching,
+ keys,
+ extra=None,
+ _processors=None,
+ _tuplefilter=None,
+ _translated_indexes=None,
+ _unique_filters=None,
):
- """Merge a cursor.description with compiled result column information.
-
- There are at least four separate strategies used here, selected
- depending on the type of SQL construct used to start with.
-
- The most common case is that of the compiled SQL expression construct,
- which generated the column names present in the raw SQL string and
- which has the identical number of columns as were reported by
- cursor.description. In this case, we assume a 1-1 positional mapping
- between the entries in cursor.description and the compiled object.
- This is also the most performant case as we disregard extracting /
- decoding the column names present in cursor.description since we
- already have the desired name we generated in the compiled SQL
- construct.
-
- The next common case is that of the completely raw string SQL,
- such as passed to connection.execute(). In this case we have no
- compiled construct to work with, so we extract and decode the
- names from cursor.description and index those as the primary
- result row target keys.
-
- The remaining fairly common case is that of the textual SQL
- that includes at least partial column information; this is when
- we use a :class:`_expression.TextualSelect` construct.
- This construct may have
- unordered or ordered column information. In the ordered case, we
- merge the cursor.description and the compiled construct's information
- positionally, and warn if there are additional description names
- present, however we still decode the names in cursor.description
- as we don't have a guarantee that the names in the columns match
- on these. In the unordered case, we match names in cursor.description
- to that of the compiled construct based on name matching.
- In both of these cases, the cursor.description names and the column
- expression objects and names are indexed as result row target keys.
-
- The final case is much less common, where we have a compiled
- non-textual SQL expression construct, but the number of columns
- in cursor.description doesn't match what's in the compiled
- construct. We make the guess here that there might be textual
- column expressions in the compiled construct that themselves include
- a comma in them causing them to split. We do the same name-matching
- as with textual non-ordered columns.
-
- The name-matched system of merging is the same as that used by
- SQLAlchemy for all cases up through the 0.9 series. Positional
- matching for compiled SQL expressions was introduced in 1.0 as a
- major performance feature, and positional matching for textual
- :class:`_expression.TextualSelect` objects in 1.1.
- As name matching is no longer
- a common case, it was acceptable to factor it into smaller generator-
- oriented methods that are easier to understand, but incur slightly
- more performance overhead.
-
- """
+ self._keys = list(keys)
+ self._tuplefilter = _tuplefilter
+ self._translated_indexes = _translated_indexes
+ self._unique_filters = _unique_filters
+ len_keys = len(self._keys)
- case_sensitive = context.dialect.case_sensitive
-
- if (
- num_ctx_cols
- and cols_are_ordered
- and not textual_ordered
- and num_ctx_cols == len(cursor_description)
- ):
- self.keys = [elem[0] for elem in result_columns]
- # pure positional 1-1 case; doesn't need to read
- # the names from cursor.description
- return [
+ if extra:
+ recs_names = [
(
- idx,
- rmap_entry[RM_OBJECTS],
- rmap_entry[RM_NAME].lower()
- if not case_sensitive
- else rmap_entry[RM_NAME],
- rmap_entry[RM_RENDERED_NAME],
- context.get_result_processor(
- rmap_entry[RM_TYPE],
- rmap_entry[RM_RENDERED_NAME],
- cursor_description[idx][1],
- ),
- None,
+ (index, name, index - len_keys) + extras,
+ (index, name, extras),
)
- for idx, rmap_entry in enumerate(result_columns)
+ for index, (name, extras) in enumerate(zip(self._keys, extra))
]
else:
- # name-based or text-positional cases, where we need
- # to read cursor.description names
- if textual_ordered:
- # textual positional case
- raw_iterator = self._merge_textual_cols_by_position(
- context, cursor_description, result_columns
- )
- elif num_ctx_cols:
- # compiled SQL with a mismatch of description cols
- # vs. compiled cols, or textual w/ unordered columns
- raw_iterator = self._merge_cols_by_name(
- context,
- cursor_description,
- result_columns,
- loose_column_name_matching,
- )
- else:
- # no compiled SQL, just a raw string
- raw_iterator = self._merge_cols_by_none(
- context, cursor_description
- )
-
- return [
- (
- idx,
- obj,
- cursor_colname,
- cursor_colname,
- context.get_result_processor(
- mapped_type, cursor_colname, coltype
- ),
- untranslated,
- )
- for (
- idx,
- cursor_colname,
- mapped_type,
- coltype,
- obj,
- untranslated,
- ) in raw_iterator
+ recs_names = [
+ ((index, name, index - len_keys), (index, name, ()))
+ for index, name in enumerate(self._keys)
]
- def _colnames_from_description(self, context, cursor_description):
- """Extract column names and data types from a cursor.description.
-
- Applies unicode decoding, column translation, "normalization",
- and case sensitivity rules to the names based on the dialect.
-
- """
-
- dialect = context.dialect
- case_sensitive = dialect.case_sensitive
- translate_colname = context._translate_colname
- description_decoder = (
- dialect._description_decoder
- if dialect.description_encoding
- else None
- )
- normalize_name = (
- dialect.normalize_name if dialect.requires_name_normalize else None
- )
- untranslated = None
-
- self.keys = []
-
- for idx, rec in enumerate(cursor_description):
- colname = rec[0]
- coltype = rec[1]
-
- if description_decoder:
- colname = description_decoder(colname)
+ self._keymap = {key: rec for keys, rec in recs_names for key in keys}
- if translate_colname:
- colname, untranslated = translate_colname(colname)
-
- if normalize_name:
- colname = normalize_name(colname)
-
- self.keys.append(colname)
- if not case_sensitive:
- colname = colname.lower()
-
- yield idx, colname, untranslated, coltype
-
- def _merge_textual_cols_by_position(
- self, context, cursor_description, result_columns
- ):
- num_ctx_cols = len(result_columns) if result_columns else None
-
- if num_ctx_cols > len(cursor_description):
- util.warn(
- "Number of columns in textual SQL (%d) is "
- "smaller than number of columns requested (%d)"
- % (num_ctx_cols, len(cursor_description))
- )
- seen = set()
- for (
- idx,
- colname,
- untranslated,
- coltype,
- ) in self._colnames_from_description(context, cursor_description):
- if idx < num_ctx_cols:
- ctx_rec = result_columns[idx]
- obj = ctx_rec[RM_OBJECTS]
- mapped_type = ctx_rec[RM_TYPE]
- if obj[0] in seen:
- raise exc.InvalidRequestError(
- "Duplicate column expression requested "
- "in textual SQL: %r" % obj[0]
- )
- seen.add(obj[0])
- else:
- mapped_type = sqltypes.NULLTYPE
- obj = None
- yield idx, colname, mapped_type, coltype, obj, untranslated
-
- def _merge_cols_by_name(
- self,
- context,
- cursor_description,
- result_columns,
- loose_column_name_matching,
- ):
- dialect = context.dialect
- case_sensitive = dialect.case_sensitive
- match_map = self._create_description_match_map(
- result_columns, case_sensitive, loose_column_name_matching
+ if _processors is None:
+ self._processors = [None] * len_keys
+ else:
+ self._processors = _processors
+
+ def _for_freeze(self):
+ unique_filters = self._unique_filters
+ if unique_filters and self._tuplefilter:
+ unique_filters = self._tuplefilter(unique_filters)
+
+ # TODO: are we freezing the result with or without uniqueness
+ # applied?
+ return SimpleResultMetaData(
+ self._keys,
+ extra=[self._keymap[key][2] for key in self._keys],
+ _unique_filters=unique_filters,
)
- self.matched_on_name = True
- for (
- idx,
- colname,
- untranslated,
- coltype,
- ) in self._colnames_from_description(context, cursor_description):
- try:
- ctx_rec = match_map[colname]
- except KeyError:
- mapped_type = sqltypes.NULLTYPE
- obj = None
- else:
- obj = ctx_rec[1]
- mapped_type = ctx_rec[2]
- yield idx, colname, mapped_type, coltype, obj, untranslated
-
- @classmethod
- def _create_description_match_map(
- cls,
- result_columns,
- case_sensitive=True,
- loose_column_name_matching=False,
- ):
- """when matching cursor.description to a set of names that are present
- in a Compiled object, as is the case with TextualSelect, get all the
- names we expect might match those in cursor.description.
- """
-
- d = {}
- for elem in result_columns:
- key = elem[RM_RENDERED_NAME]
-
- if not case_sensitive:
- key = key.lower()
- if key in d:
- # conflicting keyname - just add the column-linked objects
- # to the existing record. if there is a duplicate column
- # name in the cursor description, this will allow all of those
- # objects to raise an ambiguous column error
- e_name, e_obj, e_type = d[key]
- d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type
- else:
- d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE])
-
- if loose_column_name_matching:
- # when using a textual statement with an unordered set
- # of columns that line up, we are expecting the user
- # to be using label names in the SQL that match to the column
- # expressions. Enable more liberal matching for this case;
- # duplicate keys that are ambiguous will be fixed later.
- for r_key in elem[RM_OBJECTS]:
- d.setdefault(
- r_key, (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE])
- )
-
- return d
-
- def _merge_cols_by_none(self, context, cursor_description):
- for (
- idx,
- colname,
- untranslated,
- coltype,
- ) in self._colnames_from_description(context, cursor_description):
- yield idx, colname, sqltypes.NULLTYPE, coltype, None, untranslated
+ def __getstate__(self):
+ return {
+ "_keys": self._keys,
+ "_translated_indexes": self._translated_indexes,
+ }
- def _key_fallback(self, key, err, raiseerr=True):
- if raiseerr:
- util.raise_(
- exc.NoSuchColumnError(
- "Could not locate column in row for column '%s'"
- % util.string_or_unprintable(key)
- ),
- replace_context=err,
- )
+ def __setstate__(self, state):
+ if state["_translated_indexes"]:
+ _translated_indexes = state["_translated_indexes"]
+ _tuplefilter = tuplegetter(*_translated_indexes)
else:
- return None
-
- def _raise_for_ambiguous_column_name(self, rec):
- raise exc.InvalidRequestError(
- "Ambiguous column name '%s' in "
- "result set column descriptions" % rec[MD_LOOKUP_KEY]
+ _translated_indexes = _tuplefilter = None
+ self.__init__(
+ state["_keys"],
+ _translated_indexes=_translated_indexes,
+ _tuplefilter=_tuplefilter,
)
- def _warn_for_nonint(self, key):
- raise TypeError(
- "TypeError: tuple indices must be integers or slices, not %s"
- % type(key).__name__
- )
+ def _contains(self, value, row):
+ return value in row._data
- def _getter(self, key, raiseerr=True):
+ def _index_for_key(self, key, raiseerr=True):
try:
rec = self._keymap[key]
except KeyError as ke:
rec = self._key_fallback(key, ke, raiseerr)
- if rec is None:
- return None
-
- index, obj = rec[0:2]
-
- if index is None:
- self._raise_for_ambiguous_column_name(rec)
-
- return operator.methodcaller("_get_by_key_impl_mapping", index)
-
- def _tuple_getter(self, keys, raiseerr=True):
- """Given a list of keys, return a callable that will deliver a tuple.
-
- This is strictly used by the ORM and the keys are Column objects.
- However, this might be some nice-ish feature if we could find a very
- clean way of presenting it.
- note that in the new world of "row._mapping", this is a mapping-getter.
- maybe the name should indicate that somehow.
+ return rec[0]
+ def _indexes_for_keys(self, keys):
+ for rec in self._metadata_for_keys(keys):
+ yield rec[0]
- """
- indexes = []
+ def _metadata_for_keys(self, keys):
for key in keys:
- if isinstance(key, int):
- indexes.append(key)
- continue
try:
rec = self._keymap[key]
except KeyError as ke:
- rec = self._key_fallback(key, ke, raiseerr)
- if rec is None:
- return None
-
- index, obj = rec[0:2]
-
- if index is None:
- self._raise_for_ambiguous_column_name(obj)
- indexes.append(index)
-
- if _baserow_usecext:
- return _tuplegetter(*indexes)
- else:
- return self._pure_py_tuplegetter(*indexes)
-
- def _pure_py_tuplegetter(self, *indexes):
- getters = [
- operator.methodcaller("_get_by_key_impl_mapping", index)
- for index in indexes
- ]
- return lambda rec: tuple(getter(rec) for getter in getters)
-
- def __getstate__(self):
- return {
- "_keymap": {
- key: (rec[MD_INDEX], _UNPICKLED, key)
- for key, rec in self._keymap.items()
- if isinstance(key, util.string_types + util.int_types)
- },
- "keys": self.keys,
- "case_sensitive": self.case_sensitive,
- "matched_on_name": self.matched_on_name,
- }
+ rec = self._key_fallback(key, ke, True)
- def __setstate__(self, state):
- self._processors = [None for _ in range(len(state["keys"]))]
- self._keymap = state["_keymap"]
+ yield rec
- self.keys = state["keys"]
- self.case_sensitive = state["case_sensitive"]
- self.matched_on_name = state["matched_on_name"]
+ def _reduce(self, keys):
+ try:
+ metadata_for_keys = [self._keymap[key] for key in keys]
+ except KeyError as ke:
+ self._key_fallback(ke.args[0], ke, True)
+ indexes, new_keys, extra = zip(*metadata_for_keys)
-class LegacyCursorResultMetaData(CursorResultMetaData):
- def _contains(self, value, row):
- key = value
- if key in self._keymap:
- util.warn_deprecated_20(
- "Using the 'in' operator to test for string or column "
- "keys, or integer indexes, in a :class:`.Row` object is "
- "deprecated and will "
- "be removed in a future release. "
- "Use the `Row._fields` or `Row._mapping` attribute, i.e. "
- "'key in row._fields'",
- )
- return True
- else:
- return self._key_fallback(key, None, False) is not None
+ if self._translated_indexes:
+ indexes = [self._translated_indexes[idx] for idx in indexes]
- def _key_fallback(self, key, err, raiseerr=True):
- map_ = self._keymap
- result = None
-
- if isinstance(key, util.string_types):
- result = map_.get(key if self.case_sensitive else key.lower())
- elif isinstance(key, expression.ColumnElement):
- if (
- key._label
- and (key._label if self.case_sensitive else key._label.lower())
- in map_
- ):
- result = map_[
- key._label if self.case_sensitive else key._label.lower()
- ]
- elif (
- hasattr(key, "name")
- and (key.name if self.case_sensitive else key.name.lower())
- in map_
- ):
- # match is only on name.
- result = map_[
- key.name if self.case_sensitive else key.name.lower()
- ]
+ tup = tuplegetter(*indexes)
- # search extra hard to make sure this
- # isn't a column/label name overlap.
- # this check isn't currently available if the row
- # was unpickled.
- if result is not None and result[MD_OBJECTS] not in (
- None,
- _UNPICKLED,
- ):
- for obj in result[MD_OBJECTS]:
- if key._compare_name_for_result(obj):
- break
- else:
- result = None
- if result is not None:
- if result[MD_OBJECTS] is _UNPICKLED:
- util.warn_deprecated(
- "Retreiving row values using Column objects from a "
- "row that was unpickled is deprecated; adequate "
- "state cannot be pickled for this to be efficient. "
- "This usage will raise KeyError in a future release.",
- version="1.4",
- )
- else:
- util.warn_deprecated(
- "Retreiving row values using Column objects with only "
- "matching names as keys is deprecated, and will raise "
- "KeyError in a future release; only Column "
- "objects that are explicitly part of the statement "
- "object should be used.",
- version="1.4",
- )
- if result is None:
- if raiseerr:
- util.raise_(
- exc.NoSuchColumnError(
- "Could not locate column in row for column '%s'"
- % util.string_or_unprintable(key)
- ),
- replace_context=err,
- )
- else:
- return None
- else:
- map_[key] = result
- return result
-
- def _warn_for_nonint(self, key):
- util.warn_deprecated_20(
- "Using non-integer/slice indices on Row is deprecated and will "
- "be removed in version 2.0; please use row._mapping[<key>], or "
- "the mappings() accessor on the sqlalchemy.future result object.",
- stacklevel=4,
+ new_metadata = SimpleResultMetaData(
+ new_keys,
+ extra=extra,
+ _tuplefilter=tup,
+ _translated_indexes=indexes,
+ _processors=self._processors,
+ _unique_filters=self._unique_filters,
)
- def _has_key(self, key):
- if key in self._keymap:
- return True
- else:
- return self._key_fallback(key, None, False) is not None
-
-
-class CursorFetchStrategy(object):
- """Define a cursor strategy for a result object.
-
- Subclasses define different ways of fetching rows, typically but
- not necessarily using a DBAPI cursor object.
+ return new_metadata
- .. versionadded:: 1.4
- """
-
- __slots__ = ("dbapi_cursor", "cursor_description")
-
- def __init__(self, dbapi_cursor, cursor_description):
- self.dbapi_cursor = dbapi_cursor
- self.cursor_description = cursor_description
-
- @classmethod
- def create(cls, result):
- raise NotImplementedError()
-
- def soft_close(self, result):
- raise NotImplementedError()
-
- def hard_close(self, result):
- raise NotImplementedError()
-
- def fetchone(self):
- raise NotImplementedError()
-
- def fetchmany(self, size=None):
- raise NotImplementedError()
-
- def fetchall(self):
- raise NotImplementedError()
-
-
-class NoCursorDQLFetchStrategy(CursorFetchStrategy):
- """Cursor strategy for a DQL result that has no open cursor.
-
- This is a result set that can return rows, i.e. for a SELECT, or for an
- INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
- where the cursor is closed and no rows remain available. The owning result
- object may or may not be "hard closed", which determines if the fetch
- methods send empty results or raise for closed result.
-
- """
-
- __slots__ = ("closed",)
-
- def __init__(self, closed):
- self.closed = closed
- self.cursor_description = None
-
- def soft_close(self, result):
- pass
-
- def hard_close(self, result):
- self.closed = True
-
- def fetchone(self):
- return self._non_result(None)
-
- def fetchmany(self, size=None):
- return self._non_result([])
+def result_tuple(fields, extra=None):
+ parent = SimpleResultMetaData(fields, extra)
+ return functools.partial(Row, parent, parent._processors, parent._keymap)
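A hedged usage sketch for ``result_tuple`` (the field names are hypothetical):
the returned factory builds lightweight named :class:`.Row` objects from plain
tuples, assuming Row's standard index and ``_mapping`` accessors::

    make_row = result_tuple(["id", "name"])
    row = make_row((1, "spongebob"))
    assert row[0] == 1
    assert row._mapping["name"] == "spongebob"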
- def fetchall(self):
- return self._non_result([])
- def _non_result(self, default, err=None):
- if self.closed:
- util.raise_(
- exc.ResourceClosedError("This result object is closed."),
- replace_context=err,
- )
- else:
- return default
+# a symbol that indicates to internal Result methods that
+# "no row is returned". We can't use None for those cases where a scalar
+# filter is applied to rows.
+_NO_ROW = util.symbol("NO_ROW")
-class NoCursorDMLFetchStrategy(CursorFetchStrategy):
- """Cursor strategy for a DML result that has no open cursor.
+class Result(InPlaceGenerative):
+ """Represent a set of database results.
- This is a result set that does not return rows, i.e. for an INSERT,
- UPDATE, DELETE that does not include RETURNING.
+ .. versionadded:: 1.4 The :class:`.Result` object provides a completely
+ updated usage model and calling facade for SQLAlchemy Core and
+ SQLAlchemy ORM. In Core, it forms the basis of the
+ :class:`.CursorResult` object which replaces the previous
+ :class:`.ResultProxy` interface.
"""
- __slots__ = ("closed",)
+ _process_row = Row
- def __init__(self, closed):
- self.closed = closed
- self.cursor_description = None
+ _row_logging_fn = None
- def soft_close(self, result):
- pass
+ _column_slice_filter = None
+ _post_creational_filter = None
+ _unique_filter_state = None
+ _no_scalar_onerow = False
+ _yield_per = None
- def hard_close(self, result):
- self.closed = True
+ def __init__(self, cursor_metadata):
+ self._metadata = cursor_metadata
- def fetchone(self):
- return self._non_result(None)
+ def _soft_close(self, hard=False):
+ raise NotImplementedError()
- def fetchmany(self, size=None):
- return self._non_result([])
+ def keys(self):
+ """Return an iterable view which yields the string keys that would
+ be represented by each :class:`.Row`.
- def fetchall(self):
- return self._non_result([])
+ The view also can be tested for key containment using the Python
+ ``in`` operator, which will test both for the string keys represented
+ in the view, as well as for alternate keys such as column objects.
- def _non_result(self, default, err=None):
- util.raise_(
- exc.ResourceClosedError(
- "This result object does not return rows. "
- "It has been closed automatically."
- ),
- replace_context=err,
- )
+ .. versionchanged:: 1.4 a key view object is returned rather than a
+ plain list.
-class DefaultCursorFetchStrategy(CursorFetchStrategy):
- """Call fetch methods from a DBAPI cursor.
+ """
+ return self._metadata.keys
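An illustrative sketch of the view semantics described above, assuming a
``conn`` connection, ``text`` from sqlalchemy, and a hypothetical two-column
query::

    result = conn.execute(text("SELECT id, name FROM user_account"))
    keys = result.keys()
    assert list(keys) == ["id", "name"]
    assert "name" in keys  # containment works for string keys too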
- Alternate versions of this class may instead buffer the rows from
- cursors or not use cursors at all.
+ @_generative
+ def yield_per(self, num):
+ """Configure the row-fetching strategy to fetch num rows at a time.
- """
+ This impacts the underlying behavior of the result when iterating over
+ the result object, or otherwise making use of methods such as
+ :meth:`_engine.Result.fetchone` that return one row at a time. Data
+ from the underlying cursor or other data source will be buffered up to
+ this many rows in memory, and the buffered collection will then be
+ yielded out one row at a time, or as many rows as are requested. Each time
+ the buffer clears, it will be refreshed to this many rows, or as many
+ rows as remain if fewer remain.
- @classmethod
- def create(cls, result):
- dbapi_cursor = result.cursor
- description = dbapi_cursor.description
+ The :meth:`_engine.Result.yield_per` method is generally used in
+ conjunction with the
+ :paramref:`_engine.Connection.execution_options.stream_results`
+ execution option, which will allow the database dialect in use to make
+ use of a server side cursor, if the DBAPI supports it.
- if description is None:
- return NoCursorDMLFetchStrategy(False)
- else:
- return cls(dbapi_cursor, description)
+ Most DBAPIs do not use server side cursors by default, which means all
+ rows will be fetched upfront from the database regardless of the
+ :meth:`_engine.Result.yield_per` setting. However,
+ :meth:`_engine.Result.yield_per` may still be useful in that it batches
+ the SQLAlchemy-side processing of the raw data from the database, and
+ additionally when used for ORM scenarios will batch the conversion of
+ database rows into ORM entity rows.
- def soft_close(self, result):
- result.cursor_strategy = NoCursorDQLFetchStrategy(False)
- def hard_close(self, result):
- result.cursor_strategy = NoCursorDQLFetchStrategy(True)
+ .. versionadded:: 1.4
- def fetchone(self):
- return self.dbapi_cursor.fetchone()
+ :param num: number of rows to fetch each time the buffer is refilled.
+ If set to a value below 1, fetches all rows for the next buffer.
- def fetchmany(self, size=None):
- if size is None:
- return self.dbapi_cursor.fetchmany()
- else:
- return self.dbapi_cursor.fetchmany(size)
+ """
+ self._yield_per = num
+
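A minimal sketch of the pairing with ``stream_results`` described above
(the engine, table name, and ``process`` callable are hypothetical)::

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            text("SELECT * FROM big_table")
        )
        for row in result.yield_per(100):
            process(row)  # rows are buffered 100 at a time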
+ @_generative
+ def unique(self, strategy=None):
+ """Apply unique filtering to the objects returned by this
+ :class:`_engine.Result`.
+
+ When this filter is applied with no arguments, the rows or objects
+ returned will be filtered such that each row is returned uniquely. The
+ algorithm used to determine this uniqueness is by default the Python
+ hashing identity of the whole tuple. In some cases a specialized
+ per-entity hashing scheme may be used; when using the ORM, for example,
+ a scheme is applied which works against the primary key identity of
+ returned objects.
+
+ The unique filter is applied **after all other filters**, which means
+ if the columns returned have been refined using a method such as the
+ :meth:`_engine.Result.columns` or :meth:`_engine.Result.scalars`
+ method, the uniquing is applied to **only the column or columns
+ returned**. This occurs regardless of the order in which these
+ methods have been called upon the :class:`_engine.Result` object.
+
+ The unique filter also changes the calculus used for methods like
+ :meth:`_engine.Result.fetchmany` and :meth:`_engine.Result.partitions`.
+ When using :meth:`_engine.Result.unique`, these methods will continue
+ to yield the number of rows or objects requested, after uniquing
+ has been applied. However, this necessarily impacts the buffering
+ behavior of the underlying cursor or datasource, such that multiple
+ underlying calls to ``cursor.fetchmany()`` may be necessary in order
+ to accumulate enough objects to provide a unique collection
+ of the requested size.
+
+ :param strategy: a callable that will be applied to rows or objects
+ being iterated, which should return an object that represents the
+ unique value of the row. A Python ``set()`` is used to store
+ these identities. If not passed, a default uniqueness strategy
+ is used which may have been assembled by the source of this
+ :class:`_engine.Result` object.
- def fetchall(self):
- return self.dbapi_cursor.fetchall()
+ """
+ self._unique_filter_state = (set(), strategy)
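A hedged sketch of the ordering rule described above: uniquing applies after
the :meth:`_engine.Result.scalars` refinement regardless of call order, so
only the selected column participates (the statement is hypothetical)::

    result = conn.execute(text("SELECT name FROM user_account"))
    names = list(result.scalars().unique())  # each name appears once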
+ @HasMemoized.memoized_attribute
+ def _unique_strategy(self):
+ uniques, strategy = self._unique_filter_state
-class BufferedRowCursorFetchStrategy(DefaultCursorFetchStrategy):
- """A cursor fetch strategy with row buffering behavior.
+ if not strategy and self._metadata._unique_filters:
+ filters = self._metadata._unique_filters
+ if self._metadata._tuplefilter:
+ filters = self._metadata._tuplefilter(filters)
- This strategy buffers the contents of a selection of rows
- before ``fetchone()`` is called. This is to allow the results of
- ``cursor.description`` to be available immediately, when
- interfacing with a DB-API that requires rows to be consumed before
- this information is available (currently psycopg2, when used with
- server-side cursors).
+ strategy = operator.methodcaller("_filter_on_values", filters)
+ return uniques, strategy
- The pre-fetching behavior fetches only one row initially, and then
- grows its buffer size by a fixed amount with each successive need
- for additional rows up the ``max_row_buffer`` size, which defaults
- to 1000::
+ def columns(self, *col_expressions):
+ r"""Establish the columns that should be returned in each row.
- with psycopg2_engine.connect() as conn:
+ This method may be used to limit the columns returned as well
+ as to reorder them. The given expressions are normally
+ a series of integers or string key names. They may also be
+ appropriate :class:`.ColumnElement` objects which correspond to
+ a given statement construct.
- result = conn.execution_options(
- stream_results=True, max_row_buffer=50
- ).execute(text("select * from table"))
+ E.g.::
- .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
+ statement = select(table.c.x, table.c.y, table.c.z)
+ result = connection.execute(statement)
- .. seealso::
+ for z, y in result.columns('z', 'y'):
+ # ...
- :ref:`psycopg2_execution_options`
- """
- __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize")
+ Example of using the column objects from the statement itself::
- def __init__(
- self, max_row_buffer, dbapi_cursor, description, initial_buffer
- ):
- super(BufferedRowCursorFetchStrategy, self).__init__(
- dbapi_cursor, description
- )
+ for z, y in result.columns(
+ statement.selected_columns.c.z,
+ statement.selected_columns.c.y
+ ):
+ # ...
- self._max_row_buffer = max_row_buffer
- self._growth_factor = 5
- self._rowbuffer = initial_buffer
+ .. versionadded:: 1.4
- self._bufsize = min(self._max_row_buffer, self._growth_factor)
+ :param \*col_expressions: indicates columns to be returned. Elements
+ may be integer row indexes, string column names, or appropriate
+ :class:`.ColumnElement` objects corresponding to a select construct.
- @classmethod
- def create(cls, result):
- """Buffered row strategy has to buffer the first rows *before*
- cursor.description is fetched so that it works with named cursors
- correctly
+ :return: this :class:`_engine.Result` object with the modifications
+ given.
"""
+ return self._column_slices(col_expressions)
- dbapi_cursor = result.cursor
+ def partitions(self, size=None):
+ """Iterate through sub-lists of rows of the size given.
- initial_buffer = collections.deque(dbapi_cursor.fetchmany(1))
+ Each list will be of the size given, excluding the last list to
+ be yielded, which may have a smaller number of rows. No empty
+ lists will be yielded.
- description = dbapi_cursor.description
+ The result object is automatically closed when the iterator
+ is fully consumed.
- if description is None:
- return NoCursorDMLFetchStrategy(False)
- else:
- max_row_buffer = result.context.execution_options.get(
- "max_row_buffer", 1000
- )
- return cls(
- max_row_buffer, dbapi_cursor, description, initial_buffer
- )
+ Note that the backend driver will usually buffer the entire result
+ ahead of time unless the
+ :paramref:`.Connection.execution_options.stream_results` execution
+ option is used indicating that the driver should not pre-buffer
+ results, if possible. Not all drivers support this option, and
+ the option is silently ignored for those that do not.
- def __buffer_rows(self):
- size = self._bufsize
- self._rowbuffer = collections.deque(self.dbapi_cursor.fetchmany(size))
- if size < self._max_row_buffer:
- self._bufsize = min(
- self._max_row_buffer, size * self._growth_factor
- )
+ .. versionadded:: 1.4
- def soft_close(self, result):
- self._rowbuffer.clear()
- super(BufferedRowCursorFetchStrategy, self).soft_close(result)
+ :param size: indicate the maximum number of rows to be present
+ in each list yielded. If None, makes use of the value set by
+ :meth:`_engine.Result.yield_per`, if present, otherwise uses the
+ :meth:`_engine.Result.fetchmany` default which may be backend
+ specific.
- def hard_close(self, result):
- self._rowbuffer.clear()
- super(BufferedRowCursorFetchStrategy, self).hard_close(result)
+ :return: iterator of lists
- def fetchone(self):
- if not self._rowbuffer:
- self.__buffer_rows()
- if not self._rowbuffer:
- return None
- return self._rowbuffer.popleft()
+ """
+ getter = self._manyrow_getter
- def fetchmany(self, size=None):
- if size is None:
- return self.fetchall()
- result = []
- for x in range(0, size):
- row = self.fetchone()
- if row is None:
+ while True:
+ partition = getter(self, size)
+ if partition:
+ yield partition
+ else:
break
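A usage sketch under the same assumptions as the docstring (``stmt`` and
``process`` are hypothetical; ``stream_results`` keeps the driver from
pre-buffering where supported)::

    result = conn.execution_options(stream_results=True).execute(stmt)
    for partition in result.partitions(500):
        # each partition is a list of at most 500 rows
        for row in partition:
            process(row)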
- result.append(row)
- return result
- def fetchall(self):
- self._rowbuffer.extend(self.dbapi_cursor.fetchall())
- ret = self._rowbuffer
- self._rowbuffer = collections.deque()
- return ret
+ def scalars(self, index=0):
+ """Apply a scalars filter to returned rows.
+ When this filter is applied, fetching results will return Python scalar
+ objects from exactly one column of each row, rather than :class:`.Row`
+ objects or mappings.
-class FullyBufferedCursorFetchStrategy(DefaultCursorFetchStrategy):
- """A cursor strategy that buffers rows fully upon creation.
+ This filter cancels out other filters that may be established such
+ as that of :meth:`_engine.Result.mappings`.
- Used for operations where a result is to be delivered
- after the database conversation can not be continued,
- such as MSSQL INSERT...OUTPUT after an autocommit.
+ .. versionadded:: 1.4
- """
+ :param index: integer or row key indicating the column to be fetched
+ from each row, defaults to ``0`` indicating the first column.
- __slots__ = ("_rowbuffer",)
+ :return: this :class:`_engine.Result` object with modifications.
- def __init__(self, dbapi_cursor, description, initial_buffer=None):
- super(FullyBufferedCursorFetchStrategy, self).__init__(
- dbapi_cursor, description
- )
- if initial_buffer is not None:
- self._rowbuffer = collections.deque(initial_buffer)
- else:
- self._rowbuffer = self._buffer_rows()
-
- @classmethod
- def create_from_buffer(cls, dbapi_cursor, description, buffer):
- return cls(dbapi_cursor, description, buffer)
-
- def _buffer_rows(self):
- return collections.deque(self.dbapi_cursor.fetchall())
-
- def soft_close(self, result):
- self._rowbuffer.clear()
- super(FullyBufferedCursorFetchStrategy, self).soft_close(result)
-
- def hard_close(self, result):
- self._rowbuffer.clear()
- super(FullyBufferedCursorFetchStrategy, self).hard_close(result)
-
- def fetchone(self):
- if self._rowbuffer:
- return self._rowbuffer.popleft()
- else:
- return None
-
- def fetchmany(self, size=None):
- if size is None:
- return self.fetchall()
- result = []
- for x in range(0, size):
- row = self.fetchone()
- if row is None:
- break
- result.append(row)
+ """
+ result = self._column_slices([index])
+ result._post_creational_filter = operator.itemgetter(0)
+ result._no_scalar_onerow = True
return result
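An illustrative sketch (the table and data are hypothetical)::

    result = conn.execute(select(user_table.c.name))
    assert list(result.scalars()) == ["spongebob", "patrick"]
    # without the filter, rows would be Row tuples: [("spongebob",), ...]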
- def fetchall(self):
- ret = self._rowbuffer
- self._rowbuffer = collections.deque()
- return ret
-
-
-class BaseResult(object):
- """Base class for database result objects.
-
-
- :class:`.BaseResult` is the base class for the 1.x style
- :class:`_engine.ResultProxy` class as well as the 2.x style
- :class:`_future.Result` class.
-
- """
-
- out_parameters = None
- _metadata = None
- _soft_closed = False
- closed = False
-
- @classmethod
- def _create_for_context(cls, context):
- if context._is_future_result:
- obj = object.__new__(_future_Result)
- else:
- obj = object.__new__(ResultProxy)
- obj.__init__(context)
- return obj
-
- def __init__(self, context):
- self.context = context
- self.dialect = context.dialect
- self.cursor = context.cursor
- self.connection = context.root_connection
- self._echo = (
- self.connection._echo and context.engine._should_log_debug()
- )
- self._init_metadata()
-
- def _init_metadata(self):
- self.cursor_strategy = strat = self.context.get_result_cursor_strategy(
- self
- )
-
- if strat.cursor_description is not None:
- if self.context.compiled:
- if self.context.compiled._cached_metadata:
- cached_md = self.context.compiled._cached_metadata
- self._metadata = cached_md._adapt_to_context(self.context)
-
- else:
- self._metadata = (
- self.context.compiled._cached_metadata
- ) = self._cursor_metadata(self, strat.cursor_description)
- else:
- self._metadata = self._cursor_metadata(
- self, strat.cursor_description
- )
- if self._echo:
- self.context.engine.logger.debug(
- "Col %r", tuple(x[0] for x in strat.cursor_description)
- )
- # leave cursor open so that execution context can continue
- # setting up things like rowcount
-
- def keys(self):
- """Return the list of string keys that would represented by each
- :class:`.Row`."""
-
- if self._metadata:
- return self._metadata.keys
- else:
- return []
+ @_generative
+ def _column_slices(self, indexes):
+ self._metadata = self._metadata._reduce(indexes)
def _getter(self, key, raiseerr=True):
- try:
- getter = self._metadata._getter
- except AttributeError as err:
- return self.cursor_strategy._non_result(None, err)
- else:
- return getter(key, raiseerr)
-
- def _tuple_getter(self, key, raiseerr=True):
- try:
- getter = self._metadata._tuple_getter
- except AttributeError as err:
- return self.cursor_strategy._non_result(None, err)
- else:
- return getter(key, raiseerr)
-
- def _has_key(self, key):
- try:
- has_key = self._metadata._has_key
- except AttributeError as err:
- return self.cursor_strategy._non_result(None, err)
- else:
- return has_key(key)
-
- def _soft_close(self, hard=False):
- """Soft close this :class:`_engine.ResultProxy`.
+ """return a callable that will retrieve the given key from a
+ :class:`.Row`.
- This releases all DBAPI cursor resources, but leaves the
- ResultProxy "open" from a semantic perspective, meaning the
- fetchXXX() methods will continue to return empty results.
-
- This method is called automatically when:
+ """
+ return self._metadata._getter(key, raiseerr)
- * all result rows are exhausted using the fetchXXX() methods.
- * cursor.description is None.
+ def _tuple_getter(self, keys):
+ """return a callable that will retrieve the given keys from a
+ :class:`.Row`.
- This method is **not public**, but is documented in order to clarify
- the "autoclose" process used.
+ """
+ return self._metadata._row_as_tuple_getter(keys)
- .. versionadded:: 1.0.0
+ @_generative
+ def mappings(self):
+ """Apply a mappings filter to returned rows.
- .. seealso::
+ When this filter is applied, fetching rows will return
+ :class:`.RowMapping` objects instead of :class:`.Row` objects.
- :meth:`_engine.ResultProxy.close`
+ This filter cancels out other filters that may be established such
+ as that of :meth:`_engine.Result.scalars`.
+ .. versionadded:: 1.4
+ :return: this :class:`_engine.Result` object with modifications.
"""
+ self._post_creational_filter = operator.attrgetter("_mapping")
+ self._no_scalar_onerow = False
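A short sketch of the mappings filter (``stmt`` is hypothetical)::

    for row_mapping in conn.execute(stmt).mappings():
        # dict-like access by string key
        print(row_mapping["id"], row_mapping["name"])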
- if (not hard and self._soft_closed) or (hard and self.closed):
- return
+ def _row_getter(self):
+ process_row = self._process_row
+ metadata = self._metadata
- if hard:
- self.closed = True
- self.cursor_strategy.hard_close(self)
- else:
- self.cursor_strategy.soft_close(self)
-
- if not self._soft_closed:
- cursor = self.cursor
- self.cursor = None
- self.connection._safe_close_cursor(cursor)
- self._soft_closed = True
-
- @util.memoized_property
- def inserted_primary_key(self):
- """Return the primary key for the row just inserted.
-
- The return value is a list of scalar values
- corresponding to the list of primary key columns
- in the target table.
-
- This only applies to single row :func:`_expression.insert`
- constructs which did not explicitly specify
- :meth:`_expression.Insert.returning`.
-
- Note that primary key columns which specify a
- server_default clause,
- or otherwise do not qualify as "autoincrement"
- columns (see the notes at :class:`_schema.Column`), and were
- generated using the database-side default, will
- appear in this list as ``None`` unless the backend
- supports "returning" and the insert statement executed
- with the "implicit returning" enabled.
-
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an insert() construct.
+ keymap = metadata._keymap
+ processors = metadata._processors
+ tf = metadata._tuplefilter
- """
+ if tf:
+ processors = tf(processors)
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled " "expression construct."
- )
- elif not self.context.isinsert:
- raise exc.InvalidRequestError(
- "Statement is not an insert() " "expression construct."
+ _make_row_orig = functools.partial(
+ process_row, metadata, processors, keymap
)
- elif self.context._is_explicit_returning:
- raise exc.InvalidRequestError(
- "Can't call inserted_primary_key "
- "when returning() "
- "is used."
- )
-
- return self.context.inserted_primary_key
- def last_updated_params(self):
- """Return the collection of updated parameters from this
- execution.
+ def make_row(row):
+ return _make_row_orig(tf(row))
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an update() construct.
-
- """
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled " "expression construct."
- )
- elif not self.context.isupdate:
- raise exc.InvalidRequestError(
- "Statement is not an update() " "expression construct."
- )
- elif self.context.executemany:
- return self.context.compiled_parameters
else:
- return self.context.compiled_parameters[0]
-
- def last_inserted_params(self):
- """Return the collection of inserted parameters from this
- execution.
+ make_row = functools.partial(
+ process_row, metadata, processors, keymap
+ )
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an insert() construct.
+ fns = ()
- """
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled " "expression construct."
- )
- elif not self.context.isinsert:
- raise exc.InvalidRequestError(
- "Statement is not an insert() " "expression construct."
- )
- elif self.context.executemany:
- return self.context.compiled_parameters
+ if self._row_logging_fn:
+ fns = (self._row_logging_fn,)
else:
- return self.context.compiled_parameters[0]
+ fns = ()
- @property
- def returned_defaults(self):
- """Return the values of default columns that were fetched using
- the :meth:`.ValuesBase.return_defaults` feature.
+ if self._column_slice_filter:
+ fns += (self._column_slice_filter,)
- The value is an instance of :class:`.Row`, or ``None``
- if :meth:`.ValuesBase.return_defaults` was not used or if the
- backend does not support RETURNING.
+ if fns:
+ _make_row = make_row
- .. versionadded:: 0.9.0
+ def make_row(row):
+ row = _make_row(row)
+ for fn in fns:
+ row = fn(row)
+ return row
- .. seealso::
+ return make_row
- :meth:`.ValuesBase.return_defaults`
+ def _raw_row_iterator(self):
+ """Return a safe iterator that yields raw row data.
+
+ This is used by the :meth:`_engine.Result.merge` method
+ to merge multiple compatible results together.
"""
- return self.context.returned_defaults
+ raise NotImplementedError()
- def lastrow_has_defaults(self):
- """Return ``lastrow_has_defaults()`` from the underlying
- :class:`.ExecutionContext`.
+ def freeze(self):
+ """Return a callable object that will produce copies of this
+ :class:`.Result` when invoked.
- See :class:`.ExecutionContext` for details.
+ This is used for result set caching. The method must be called
+ on the result while it is still unconsumed, and calling the method
+ will consume the result fully.
"""
+ return FrozenResult(self)
- return self.context.lastrow_has_defaults()
+ def merge(self, *others):
+ """Merge this :class:`.Result` with other compatible result
+ objects.
- def postfetch_cols(self):
- """Return ``postfetch_cols()`` from the underlying
- :class:`.ExecutionContext`.
+ The object returned is an instance of :class:`.MergedResult`,
+ which will be composed of iterators from the given result
+ objects.
- See :class:`.ExecutionContext` for details.
-
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an insert() or update() construct.
+ The new result will use the metadata from this result object.
+ The subsequent result objects must be against an identical
+ set of result / cursor metadata, otherwise the behavior is
+ undefined.
"""
+ return MergedResult(self._metadata, (self,) + others)
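A hedged sketch combining the two methods above; it assumes the
:class:`.FrozenResult` returned by :meth:`.Result.freeze` is callable and
produces fresh :class:`.Result` copies, as its use for result caching
implies::

    frozen = conn.execute(stmt).freeze()  # fully consumes the result
    r1, r2 = frozen(), frozen()           # two independent replays
    merged = r1.merge(r2)                 # one Result over both iterators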
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled " "expression construct."
- )
- elif not self.context.isinsert and not self.context.isupdate:
- raise exc.InvalidRequestError(
- "Statement is not an insert() or update() "
- "expression construct."
- )
- return self.context.postfetch_cols
+ @HasMemoized.memoized_attribute
+ def _iterator_getter(self):
- def prefetch_cols(self):
- """Return ``prefetch_cols()`` from the underlying
- :class:`.ExecutionContext`.
+ make_row = self._row_getter()
- See :class:`.ExecutionContext` for details.
+ post_creational_filter = self._post_creational_filter
- Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
- statement is not a compiled expression construct
- or is not an insert() or update() construct.
+ if self._unique_filter_state:
+ uniques, strategy = self._unique_strategy
- """
+ def iterrows(self):
+ for row in self._fetchiter_impl():
+ obj = make_row(row)
+ hashed = strategy(obj) if strategy else obj
+ if hashed in uniques:
+ continue
+ uniques.add(hashed)
+ if post_creational_filter:
+ obj = post_creational_filter(obj)
+ yield obj
- if not self.context.compiled:
- raise exc.InvalidRequestError(
- "Statement is not a compiled " "expression construct."
- )
- elif not self.context.isinsert and not self.context.isupdate:
- raise exc.InvalidRequestError(
- "Statement is not an insert() or update() "
- "expression construct."
- )
- return self.context.prefetch_cols
+ else:
- def supports_sane_rowcount(self):
- """Return ``supports_sane_rowcount`` from the dialect.
+ def iterrows(self):
+ for row in self._fetchiter_impl():
+ row = make_row(row)
+ if post_creational_filter:
+ row = post_creational_filter(row)
+ yield row
- See :attr:`_engine.ResultProxy.rowcount` for background.
+ return iterrows
- """
+ @HasMemoized.memoized_attribute
+ def _allrow_getter(self):
- return self.dialect.supports_sane_rowcount
+ make_row = self._row_getter()
- def supports_sane_multi_rowcount(self):
- """Return ``supports_sane_multi_rowcount`` from the dialect.
+ post_creational_filter = self._post_creational_filter
- See :attr:`_engine.ResultProxy.rowcount` for background.
+ if self._unique_filter_state:
+ uniques, strategy = self._unique_strategy
- """
+ def allrows(self):
+ rows = self._fetchall_impl()
+ rows = [
+ made_row
+ for made_row, sig_row in [
+ (
+ made_row,
+ strategy(made_row) if strategy else made_row,
+ )
+ for made_row in [make_row(row) for row in rows]
+ ]
+ if sig_row not in uniques and not uniques.add(sig_row)
+ ]
- return self.dialect.supports_sane_multi_rowcount
+ if post_creational_filter:
+ rows = [post_creational_filter(row) for row in rows]
+ return rows
- @util.memoized_property
- def rowcount(self):
- """Return the 'rowcount' for this result.
+ else:
- The 'rowcount' reports the number of rows *matched*
- by the WHERE criterion of an UPDATE or DELETE statement.
+ def allrows(self):
+ rows = self._fetchall_impl()
+ if post_creational_filter:
+ rows = [
+ post_creational_filter(make_row(row)) for row in rows
+ ]
+ else:
+ rows = [make_row(row) for row in rows]
+ return rows
+
+ return allrows
+
+ @HasMemoized.memoized_attribute
+ def _onerow_getter(self):
+ make_row = self._row_getter()
+
+ # TODO: this is a lot for results that are only one row.
+ # all of this could be in _only_one_row except for fetchone()
+ # and maybe __next__
+
+ post_creational_filter = self._post_creational_filter
+
+ if self._unique_filter_state:
+ uniques, strategy = self._unique_strategy
+
+ def onerow(self):
+ _onerow = self._fetchone_impl
+ while True:
+ row = _onerow()
+ if row is None:
+ return _NO_ROW
+ else:
+ obj = make_row(row)
+ hashed = strategy(obj) if strategy else obj
+ if hashed in uniques:
+ continue
+ else:
+ uniques.add(hashed)
+ if post_creational_filter:
+ obj = post_creational_filter(obj)
+ return obj
- .. note::
+ else:
- Notes regarding :attr:`_engine.ResultProxy.rowcount`:
+ def onerow(self):
+ row = self._fetchone_impl()
+ if row is None:
+ return _NO_ROW
+ else:
+ row = make_row(row)
+ if post_creational_filter:
+ row = post_creational_filter(row)
+ return row
+
+ return onerow
+
+ @HasMemoized.memoized_attribute
+ def _manyrow_getter(self):
+ make_row = self._row_getter()
+
+ post_creational_filter = self._post_creational_filter
+
+ if self._unique_filter_state:
+ uniques, strategy = self._unique_strategy
+
+ def filterrows(make_row, rows, strategy, uniques):
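+                # same set.add() side-effect idiom as in _allrow_getter:
+                # keep a made row only the first time its signature is seen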
+ return [
+ made_row
+ for made_row, sig_row in [
+ (
+ made_row,
+ strategy(made_row) if strategy else made_row,
+ )
+ for made_row in [make_row(row) for row in rows]
+ ]
+ if sig_row not in uniques and not uniques.add(sig_row)
+ ]
+ def manyrows(self, num):
+ collect = []
+
+ _manyrows = self._fetchmany_impl
+
+ if num is None:
+                    # if None is passed, we don't know the default
+                    # fetchmany size; the DBAPI has this as
+                    # cursor.arraysize, and different DBAPIs / fetch
+                    # strategies may use different defaults.  do a fetch
+                    # to find out what the number is; if only fewer rows
+                    # remain, it doesn't matter.
+ if self._yield_per:
+ num_required = num = self._yield_per
+ else:
+ rows = _manyrows(num)
+ num = len(rows)
+ collect.extend(
+ filterrows(make_row, rows, strategy, uniques)
+ )
+ num_required = num - len(collect)
+ else:
+ num_required = num
- * This attribute returns the number of rows *matched*,
- which is not necessarily the same as the number of rows
- that were actually *modified* - an UPDATE statement, for example,
- may have no net change on a given row if the SET values
- given are the same as those present in the row already.
- Such a row would be matched but not modified.
- On backends that feature both styles, such as MySQL,
- rowcount is configured by default to return the match
- count in all cases.
+ while num_required:
+ rows = _manyrows(num_required)
+ if not rows:
+ break
- * :attr:`_engine.ResultProxy.rowcount`
- is *only* useful in conjunction
- with an UPDATE or DELETE statement. Contrary to what the Python
- DBAPI says, it does *not* return the
- number of rows available from the results of a SELECT statement
- as DBAPIs cannot support this functionality when rows are
- unbuffered.
+ collect.extend(
+ filterrows(make_row, rows, strategy, uniques)
+ )
+ num_required = num - len(collect)
- * :attr:`_engine.ResultProxy.rowcount`
- may not be fully implemented by
- all dialects. In particular, most DBAPIs do not support an
- aggregate rowcount result from an executemany call.
- The :meth:`_engine.ResultProxy.supports_sane_rowcount` and
- :meth:`_engine.ResultProxy.supports_sane_multi_rowcount` methods
- will report from the dialect if each usage is known to be
- supported.
+ if post_creational_filter:
+ collect = [post_creational_filter(row) for row in collect]
+ return collect
- * Statements that use RETURNING may not return a correct
- rowcount.
+ else:
- """
- try:
- return self.context.rowcount
- except BaseException as e:
- self.connection._handle_dbapi_exception(
- e, None, None, self.cursor, self.context
- )
+ def manyrows(self, num):
+ if num is None:
+ num = self._yield_per
- @property
- def lastrowid(self):
- """return the 'lastrowid' accessor on the DBAPI cursor.
+ rows = self._fetchmany_impl(num)
+ rows = [make_row(row) for row in rows]
+ if post_creational_filter:
+ rows = [post_creational_filter(row) for row in rows]
+ return rows
- This is a DBAPI specific method and is only functional
- for those backends which support it, for statements
-        where it is appropriate. Its behavior is not
- consistent across backends.
+ return manyrows
- Usage of this method is normally unnecessary when
- using insert() expression constructs; the
- :attr:`~ResultProxy.inserted_primary_key` attribute provides a
- tuple of primary key values for a newly inserted row,
- regardless of database backend.
+ def _fetchiter_impl(self):
+ raise NotImplementedError()
- """
- try:
- return self.context.get_lastrowid()
- except BaseException as e:
- self.connection._handle_dbapi_exception(
- e, None, None, self.cursor, self.context
- )
+ def _fetchone_impl(self):
+ raise NotImplementedError()
- @property
- def returns_rows(self):
- """True if this :class:`_engine.ResultProxy` returns rows.
+ def _fetchall_impl(self):
+ raise NotImplementedError()
- I.e. if it is legal to call the methods
- :meth:`_engine.ResultProxy.fetchone`,
-        :meth:`_engine.ResultProxy.fetchmany`,
- :meth:`_engine.ResultProxy.fetchall`.
+ def _fetchmany_impl(self, size=None):
+ raise NotImplementedError()
- """
- return self._metadata is not None
+ def __iter__(self):
+ return self._iterator_getter(self)
- @property
- def is_insert(self):
- """True if this :class:`_engine.ResultProxy` is the result
-        of executing an expression language compiled
- :func:`_expression.insert` construct.
+ def __next__(self):
+ row = self._onerow_getter(self)
+ if row is _NO_ROW:
+ raise StopIteration()
+ else:
+ return row
- When True, this implies that the
- :attr:`inserted_primary_key` attribute is accessible,
- assuming the statement did not include
- a user defined "returning" construct.
+ next = __next__
- """
- return self.context.isinsert
+ def fetchall(self):
+ """A synonym for the :meth:`_engine.Result.all` method."""
+ return self._allrow_getter(self)
-class ResultProxy(BaseResult):
- """A facade around a DBAPI cursor object.
+ def fetchone(self):
+ """Fetch one row.
- Returns database rows via the :class:`.Row` class, which provides
- additional API features and behaviors on top of the raw data returned
- by the DBAPI.
+ When all rows are exhausted, returns None.
- Within the scope of the 1.x series of SQLAlchemy, the
- :class:`_engine.ResultProxy`
- will in fact return instances of the :class:`.LegacyRow` class, which
- maintains Python mapping (i.e. dictionary) like behaviors upon the object
- itself. Going forward, the :attr:`.Row._mapping` attribute should be used
- for dictionary behaviors.
+ .. note:: This method is not compatible with the
+ :meth:`_result.Result.scalars`
+            filter, as there is no way to distinguish between a data value
+            of ``None`` and the ``None`` that signals the end of rows.
+            Prefer to use iterative / collection methods which support
+            scalar ``None`` values.
- .. seealso::
+            This method is provided for backwards compatibility with
+ SQLAlchemy 1.x.x.
- :ref:`coretutorial_selecting` - introductory material for accessing
- :class:`_engine.ResultProxy` and :class:`.Row` objects.
+ To fetch the first row of a result only, use the
+ :meth:`_engine.Result.first` method. To iterate through all
+ rows, iterate the :class:`_engine.Result` object directly.
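+
+        E.g., a sketch of the ``None``-terminated fetch loop (``result``
+        here is an assumed :class:`_engine.Result` instance and
+        ``handle`` a hypothetical per-row callable)::
+
+            row = result.fetchone()
+            while row is not None:
+                handle(row)
+                row = result.fetchone()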
- """
+ :return: a :class:`.Row` object if no filters are applied, or None
+ if no rows remain.
+ When filters are applied, such as :meth:`_engine.Result.mappings`
+            or :meth:`_engine.Result.scalars`, different kinds of objects
+ may be returned.
- _autoclose_connection = False
- _process_row = LegacyRow
- _cursor_metadata = LegacyCursorResultMetaData
- _cursor_strategy_cls = DefaultCursorFetchStrategy
+ """
+ if self._no_scalar_onerow:
+ raise exc.InvalidRequestError(
+ "Can't use fetchone() when returning scalar values; there's "
+ "no way to distinguish between end of results and None"
+ )
+ row = self._onerow_getter(self)
+ if row is _NO_ROW:
+ return None
+ else:
+ return row
- def __iter__(self):
- """Implement iteration protocol."""
+ def fetchmany(self, size=None):
+ """Fetch many rows.
- while True:
- row = self.fetchone()
- if row is None:
- return
- else:
- yield row
+ When all rows are exhausted, returns an empty list.
- def close(self):
- """Close this :class:`_engine.ResultProxy`.
-
- This closes out the underlying DBAPI cursor corresponding
- to the statement execution, if one is still present. Note that the
- DBAPI cursor is automatically released when the
- :class:`_engine.ResultProxy`
- exhausts all available rows. :meth:`_engine.ResultProxy.close`
- is generally
- an optional method except in the case when discarding a
- :class:`_engine.ResultProxy`
- that still has additional rows pending for fetch.
-
- In the case of a result that is the product of
- :ref:`connectionless execution <dbengine_implicit>`,
- the underlying :class:`_engine.Connection` object is also closed,
- which
- :term:`releases` DBAPI connection resources.
-
- .. deprecated:: 2.0 "connectionless" execution is deprecated and will
- be removed in version 2.0. Version 2.0 will feature the
- :class:`_future.Result`
- object that will no longer affect the status
- of the originating connection in any case.
-
- After this method is called, it is no longer valid to call upon
- the fetch methods, which will raise a :class:`.ResourceClosedError`
- on subsequent use.
+            This method is provided for backwards compatibility with
+ SQLAlchemy 1.x.x.
- .. seealso::
+            To fetch rows in groups, use the :meth:`_result.Result.partitions`
+ method.
- :ref:`connections_toplevel`
+ :return: a list of :class:`.Row` objects if no filters are applied.
+ When filters are applied, such as :meth:`_engine.Result.mappings`
+            or :meth:`_engine.Result.scalars`, different kinds of objects
+ may be returned.
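+
+        E.g., a batching sketch (``result`` is an assumed
+        :class:`_engine.Result` instance and ``handle_batch`` a
+        hypothetical consumer)::
+
+            while True:
+                batch = result.fetchmany(100)
+                if not batch:
+                    break
+                handle_batch(batch)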
"""
- self._soft_close(hard=True)
+ return self._manyrow_getter(self, size)
- def _soft_close(self, hard=False):
- soft_closed = self._soft_closed
- super(ResultProxy, self)._soft_close(hard=hard)
- if (
- not soft_closed
- and self._soft_closed
- and self._autoclose_connection
- ):
- self.connection.close()
+ def all(self):
+ """Return all rows in a list.
- def __next__(self):
- """Implement the Python next() protocol.
+ Closes the result set after invocation. Subsequent invocations
+ will return an empty list.
- This method, mirrored as both ``.next()`` and ``.__next__()``, is part
- of Python's API for producing iterator-like behavior.
+ .. versionadded:: 1.4
- .. versionadded:: 1.2
+ :return: a list of :class:`.Row` objects if no filters are applied.
+ When filters are applied, such as :meth:`_engine.Result.mappings`
+            or :meth:`_engine.Result.scalars`, different kinds of objects
+ may be returned.
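+
+        E.g. (``result`` is an assumed :class:`_engine.Result`
+        instance)::
+
+            rows = result.all()
+            assert result.all() == []  # the result is now exhausted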
"""
- row = self.fetchone()
- if row is None:
- raise StopIteration()
+ return self._allrow_getter(self)
+
+ def _only_one_row(self, raise_for_second_row, raise_for_none):
+ row = self._onerow_getter(self)
+ if row is _NO_ROW:
+ if raise_for_none:
+ self._soft_close(hard=True)
+ raise exc.NoResultFound(
+ "No row was found when one was required"
+ )
+ else:
+ return None
else:
- return row
+ if raise_for_second_row:
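+                # fetch a second row so that an ambiguous result can be
+                # detected before the result is closed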
+ next_row = self._onerow_getter(self)
+ else:
+ next_row = _NO_ROW
+ self._soft_close(hard=True)
+ if next_row is not _NO_ROW:
+ raise exc.MultipleResultsFound(
+ "Multiple rows were found when exactly one was required"
+ if raise_for_none
+ else "Multiple rows were found when one or none "
+ "was required"
+ )
+ else:
+ return row
- next = __next__
+ def first(self):
+ """Fetch the first row or None if no row is present.
- def process_rows(self, rows):
- process_row = self._process_row
- metadata = self._metadata
- keymap = metadata._keymap
- processors = metadata._processors
+ Closes the result set and discards remaining rows.
- if self._echo:
- log = self.context.engine.logger.debug
- l = []
- for row in rows:
- log("Row %r", sql_util._repr_row(row))
- l.append(process_row(metadata, processors, keymap, row))
- return l
- else:
- return [
- process_row(metadata, processors, keymap, row) for row in rows
- ]
+ .. comment: A warning is emitted if additional rows remain.
- def fetchall(self):
- """Fetch all rows, just like DB-API ``cursor.fetchall()``.
+ :return: a :class:`.Row` object if no filters are applied, or None
+ if no rows remain.
+ When filters are applied, such as :meth:`_engine.Result.mappings`
+            or :meth:`_engine.Result.scalars`, different kinds of objects
+ may be returned.
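+
+        E.g. (``result`` is an assumed :class:`_engine.Result`
+        instance)::
+
+            row = result.first()
+            if row is None:
+                ...  # no rows; the result is closed either way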
- After all rows have been exhausted, the underlying DBAPI
- cursor resource is released, and the object may be safely
- discarded.
+ """
+ return self._only_one_row(False, False)
- Subsequent calls to :meth:`_engine.ResultProxy.fetchall` will return
- an empty list. After the :meth:`_engine.ResultProxy.close` method is
- called, the method will raise :class:`.ResourceClosedError`.
+ def one_or_none(self):
+ """Return at most one result or raise an exception.
- :return: a list of :class:`.Row` objects
+ Returns ``None`` if the result has no rows.
+ Raises :class:`.MultipleResultsFound`
+ if multiple rows are returned.
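+
+        E.g. (``result`` is an assumed :class:`_engine.Result`
+        instance)::
+
+            row = result.one_or_none()  # a Row, None, or an exception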
- """
+ .. versionadded:: 1.4
- try:
- l = self.process_rows(self.cursor_strategy.fetchall())
- self._soft_close()
- return l
- except BaseException as e:
- self.connection._handle_dbapi_exception(
- e, None, None, self.cursor, self.context
- )
+ :return: The first :class:`.Row` or None if no row is available.
+ When filters are applied, such as :meth:`_engine.Result.mappings`
+            or :meth:`_engine.Result.scalars`, different kinds of objects
+ may be returned.
- def fetchmany(self, size=None):
- """Fetch many rows, just like DB-API
- ``cursor.fetchmany(size=cursor.arraysize)``.
+ :raises: :class:`.MultipleResultsFound`
- After all rows have been exhausted, the underlying DBAPI
- cursor resource is released, and the object may be safely
- discarded.
+ .. seealso::
- Calls to :meth:`_engine.ResultProxy.fetchmany`
- after all rows have been
- exhausted will return
- an empty list. After the :meth:`_engine.ResultProxy.close` method is
- called, the method will raise :class:`.ResourceClosedError`.
+ :meth:`_result.Result.first`
- :return: a list of :class:`.Row` objects
+ :meth:`_result.Result.one`
"""
+ return self._only_one_row(True, False)
- try:
- l = self.process_rows(self.cursor_strategy.fetchmany(size))
- if len(l) == 0:
- self._soft_close()
- return l
- except BaseException as e:
- self.connection._handle_dbapi_exception(
- e, None, None, self.cursor, self.context
- )
+ def one(self):
+ """Return exactly one result or raise an exception.
- def _onerow(self):
- return self.fetchone()
+ Raises :class:`.NoResultFound` if the result returns no
+ rows, or :class:`.MultipleResultsFound` if multiple rows
+ would be returned.
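+
+        E.g., a sketch of the error contract (``result`` is an assumed
+        :class:`_engine.Result` instance; the exceptions are those from
+        ``sqlalchemy.exc``)::
+
+            try:
+                row = result.one()
+            except exc.NoResultFound:
+                ...  # zero rows were present
+            except exc.MultipleResultsFound:
+                ...  # more than one row was present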
- def fetchone(self):
- """Fetch one row, just like DB-API ``cursor.fetchone()``.
+ .. versionadded:: 1.4
- After all rows have been exhausted, the underlying DBAPI
- cursor resource is released, and the object may be safely
- discarded.
+ :return: The first :class:`.Row`.
+ When filters are applied, such as :meth:`_engine.Result.mappings`
+            or :meth:`_engine.Result.scalars`, different kinds of objects
+ may be returned.
- Calls to :meth:`_engine.ResultProxy.fetchone` after all rows have
- been exhausted will return ``None``.
- After the :meth:`_engine.ResultProxy.close` method is
- called, the method will raise :class:`.ResourceClosedError`.
+ :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound`
- :return: a :class:`.Row` object, or None if no rows remain
-
- """
- try:
- row = self.cursor_strategy.fetchone()
- if row is not None:
- return self.process_rows([row])[0]
- else:
- self._soft_close()
- return None
- except BaseException as e:
- self.connection._handle_dbapi_exception(
- e, None, None, self.cursor, self.context
- )
-
- def first(self):
- """Fetch the first row and then close the result set unconditionally.
+ .. seealso::
- After calling this method, the object is fully closed,
- e.g. the :meth:`_engine.ResultProxy.close`
- method will have been called.
+ :meth:`_result.Result.first`
- :return: a :class:`.Row` object, or None if no rows remain
+ :meth:`_result.Result.one_or_none`
"""
- try:
- row = self.cursor_strategy.fetchone()
- except BaseException as e:
- self.connection._handle_dbapi_exception(
- e, None, None, self.cursor, self.context
- )
-
- try:
- if row is not None:
- return self.process_rows([row])[0]
- else:
- return None
- finally:
- self.close()
+ return self._only_one_row(True, True)
def scalar(self):
"""Fetch the first column of the first row, and close the result set.
After calling this method, the object is fully closed,
- e.g. the :meth:`_engine.ResultProxy.close`
+ e.g. the :meth:`_engine.CursorResult.close`
method will have been called.
        :return: a Python scalar value, or None if no rows remain
@@ -1764,38 +1008,94 @@ class ResultProxy(BaseResult):
return None
-class BufferedRowResultProxy(ResultProxy):
- """A ResultProxy with row buffering behavior.
+class FrozenResult(object):
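+    # captures the metadata and all remaining rows of a Result so that
+    # equivalent, replayable results can be produced later; calling the
+    # FrozenResult returns a new IteratorResult over the stored data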
+ def __init__(self, result):
+ self.metadata = result._metadata._for_freeze()
+ self._post_creational_filter = result._post_creational_filter
+ result._post_creational_filter = None
- .. deprecated:: 1.4 this class is now supplied using a strategy object.
- See :class:`.BufferedRowCursorFetchStrategy`.
+ self.data = result.fetchall()
- """
+ def with_data(self, data):
+ fr = FrozenResult.__new__(FrozenResult)
+ fr.metadata = self.metadata
+ fr._post_creational_filter = self._post_creational_filter
+ fr.data = data
+ return fr
- _cursor_strategy_cls = BufferedRowCursorFetchStrategy
+ def __call__(self):
+ result = IteratorResult(self.metadata, iter(self.data))
+ result._post_creational_filter = self._post_creational_filter
+ return result
-class FullyBufferedResultProxy(ResultProxy):
- """A result proxy that buffers rows fully upon creation.
+class IteratorResult(Result):
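+    # a Result that proceeds from a plain Python iterator of row-like
+    # tuples, using the given metadata for key/column lookups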
+ def __init__(self, cursor_metadata, iterator):
+ self._metadata = cursor_metadata
+ self.iterator = iterator
- .. deprecated:: 1.4 this class is now supplied using a strategy object.
- See :class:`.FullyBufferedCursorFetchStrategy`.
+ def _soft_close(self, **kw):
+ self.iterator = iter([])
- """
+ def _raw_row_iterator(self):
+ return self.iterator
- _cursor_strategy_cls = FullyBufferedCursorFetchStrategy
+ def _fetchiter_impl(self):
+ return self.iterator
+ def _fetchone_impl(self):
+ try:
+ return next(self.iterator)
+ except StopIteration:
+ self._soft_close()
+ return None
-class BufferedColumnRow(LegacyRow):
- """Row is now BufferedColumn in all cases"""
+ def _fetchall_impl(self):
+ try:
+ return list(self.iterator)
+ finally:
+ self._soft_close()
+ def _fetchmany_impl(self, size=None):
+ return list(itertools.islice(self.iterator, 0, size))
-class BufferedColumnResultProxy(ResultProxy):
- """A ResultProxy with column buffering behavior.
- .. versionchanged:: 1.4 This is now the default behavior of the Row
- and this class does not change behavior in any way.
+class ChunkedIteratorResult(IteratorResult):
+ def __init__(self, cursor_metadata, chunks):
+ self._metadata = cursor_metadata
+ self.chunks = chunks
- """
+ self.iterator = itertools.chain.from_iterable(self.chunks(None))
+
+ @_generative
+ def yield_per(self, num):
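+        # re-chain the chunk source so that it produces groups of
+        # ``num`` rows from this point forward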
+ self._yield_per = num
+ self.iterator = itertools.chain.from_iterable(self.chunks(num))
- _process_row = BufferedColumnRow
+
+class MergedResult(IteratorResult):
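+    # a single Result chained from multiple compatible Results; unique
+    # filtering, post-creational filter and yield_per state are copied
+    # from the first result in the series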
+ closed = False
+
+ def __init__(self, cursor_metadata, results):
+ self._results = results
+ super(MergedResult, self).__init__(
+ cursor_metadata,
+ itertools.chain.from_iterable(
+ r._raw_row_iterator() for r in results
+ ),
+ )
+
+ self._unique_filter_state = results[0]._unique_filter_state
+ self._post_creational_filter = results[0]._post_creational_filter
+ self._no_scalar_onerow = results[0]._no_scalar_onerow
+ self._yield_per = results[0]._yield_per
+
+ def close(self):
+ self._soft_close(hard=True)
+
+ def _soft_close(self, hard=False):
+ for r in self._results:
+ r._soft_close(hard=hard)
+
+ if hard:
+ self.closed = True