summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/cursor.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-08-07 12:14:19 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-09-24 11:18:01 -0400
commita8029f5a7e3e376ec57f1614ab0294b717d53c05 (patch)
tree84b1a3b3a6d3f4c9d6e8054f9cdfa190344436cb /lib/sqlalchemy/engine/cursor.py
parent2bcc97da424eef7db9a5d02f81d02344925415ee (diff)
downloadsqlalchemy-a8029f5a7e3e376ec57f1614ab0294b717d53c05.tar.gz
ORM bulk insert via execute
* ORM Insert now includes "bulk" mode that will run essentially the same process as session.bulk_insert_mappings; interprets the given list of values as ORM attributes for key names * ORM UPDATE has a similar feature, without RETURNING support, for session.bulk_update_mappings * Added support for upserts to do RETURNING ORM objects as well * ORM UPDATE/DELETE with list of parameters + WHERE criteria is a not implemented; use connection * ORM UPDATE/DELETE defaults to "auto" synchronize_session; use fetch if RETURNING is present, evaluate if not, as "fetch" is much more efficient (no expired object SELECT problem) and less error prone if RETURNING is available UPDATE: howver this is inefficient! please continue to use evaluate for simple cases, auto can move to fetch if criteria not evaluable * "Evaluate" criteria will now not preemptively unexpire and SELECT attributes that were individually expired. Instead, if evaluation of the criteria indicates that the necessary attrs were expired, we expire the object completely (delete) or expire the SET attrs unconditionally (update). This keeps the object in the same unloaded state where it will refresh those attrs on the next pass, for this generally unusual case. (originally #5664) * Core change! update/delete rowcount comes from len(rows) if RETURNING was used. SQLite at least otherwise did not support this. adjusted test_rowcount accordingly * ORM DELETE with a list of parameters at all is also a not implemented as this would imply "bulk", and there is no bulk_delete_mappings (could be, but we dont have that) * ORM insert().values() with single or multi-values translates key names based on ORM attribute names * ORM returning() implemented for insert, update, delete; explcit returning clauses now interpret rows in an ORM context, with support for qualifying loader options as well * session.bulk_insert_mappings() assigns polymorphic identity if not set. * explicit RETURNING + synchronize_session='fetch' is now supported with UPDATE and DELETE. * expanded return_defaults() to work with DELETE also. * added support for composite attributes to be present in the dictionaries used by bulk_insert_mappings and bulk_update_mappings, which is also the new ORM bulk insert/update feature, that will expand the composite values into their individual mapped attributes the way they'd be on a mapped instance. * bulk UPDATE supports "synchronize_session=evaluate", is the default. this does not apply to session.bulk_update_mappings, just the new version * both bulk UPDATE and bulk INSERT, the latter with or without RETURNING, support *heterogenous* parameter sets. session.bulk_insert/update_mappings did this, so this feature is maintained. now cursor result can be both horizontally and vertically spliced :) This is now a long story with a lot of options, which in itself is a problem to be able to document all of this in some way that makes sense. raising exceptions for use cases we haven't supported is pretty important here too, the tradition of letting unsupported things just not work is likely not a good idea at this point, though there are still many cases that aren't easily avoidable Fixes: #8360 Fixes: #7864 Fixes: #7865 Change-Id: Idf28379f8705e403a3c6a937f6a798a042ef2540
Diffstat (limited to 'lib/sqlalchemy/engine/cursor.py')
-rw-r--r--lib/sqlalchemy/engine/cursor.py390
1 files changed, 336 insertions, 54 deletions
diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py
index 8840b5916..07e782296 100644
--- a/lib/sqlalchemy/engine/cursor.py
+++ b/lib/sqlalchemy/engine/cursor.py
@@ -23,12 +23,14 @@ from typing import Iterator
from typing import List
from typing import NoReturn
from typing import Optional
+from typing import overload
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
+from .result import IteratorResult
from .result import MergedResult
from .result import Result
from .result import ResultMetaData
@@ -62,36 +64,80 @@ if typing.TYPE_CHECKING:
from .interfaces import ExecutionContext
from .result import _KeyIndexType
from .result import _KeyMapRecType
+ from .result import _KeyMapType
from .result import _KeyType
from .result import _ProcessorsType
+ from .result import _TupleGetterType
from ..sql.type_api import _ResultProcessorType
_T = TypeVar("_T", bound=Any)
+
# metadata entry tuple indexes.
# using raw tuple is faster than namedtuple.
-MD_INDEX: Literal[0] = 0 # integer index in cursor.description
-MD_RESULT_MAP_INDEX: Literal[
- 1
-] = 1 # integer index in compiled._result_columns
-MD_OBJECTS: Literal[
- 2
-] = 2 # other string keys and ColumnElement obj that can match
-MD_LOOKUP_KEY: Literal[
- 3
-] = 3 # string key we usually expect for key-based lookup
-MD_RENDERED_NAME: Literal[4] = 4 # name that is usually in cursor.description
-MD_PROCESSOR: Literal[5] = 5 # callable to process a result value into a row
-MD_UNTRANSLATED: Literal[6] = 6 # raw name from cursor.description
+# these match up to the positions in
+# _CursorKeyMapRecType
+MD_INDEX: Literal[0] = 0
+"""integer index in cursor.description
+
+"""
+
+MD_RESULT_MAP_INDEX: Literal[1] = 1
+"""integer index in compiled._result_columns"""
+
+MD_OBJECTS: Literal[2] = 2
+"""other string keys and ColumnElement obj that can match.
+
+This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
+
+"""
+
+MD_LOOKUP_KEY: Literal[3] = 3
+"""string key we usually expect for key-based lookup
+
+this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
+"""
+
+
+MD_RENDERED_NAME: Literal[4] = 4
+"""name that is usually in cursor.description
+
+this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
+"""
+
+
+MD_PROCESSOR: Literal[5] = 5
+"""callable to process a result value into a row"""
+
+MD_UNTRANSLATED: Literal[6] = 6
+"""raw name from cursor.description"""
_CursorKeyMapRecType = Tuple[
- int, int, List[Any], str, str, Optional["_ResultProcessorType"], str
+ Optional[int], # MD_INDEX, None means the record is ambiguously named
+ int, # MD_RESULT_MAP_INDEX
+ List[Any], # MD_OBJECTS
+ str, # MD_LOOKUP_KEY
+ str, # MD_RENDERED_NAME
+ Optional["_ResultProcessorType"], # MD_PROCESSOR
+ Optional[str], # MD_UNTRANSLATED
]
_CursorKeyMapType = Dict["_KeyType", _CursorKeyMapRecType]
+# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
+# not None
+_NonAmbigCursorKeyMapRecType = Tuple[
+ int,
+ int,
+ List[Any],
+ str,
+ str,
+ Optional["_ResultProcessorType"],
+ str,
+]
+
class CursorResultMetaData(ResultMetaData):
"""Result metadata for DBAPI cursors."""
@@ -127,38 +173,112 @@ class CursorResultMetaData(ResultMetaData):
extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
)
- def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
- recs = cast(
- "List[_CursorKeyMapRecType]", list(self._metadata_for_keys(keys))
+ def _make_new_metadata(
+ self,
+ *,
+ unpickled: bool,
+ processors: _ProcessorsType,
+ keys: Sequence[str],
+ keymap: _KeyMapType,
+ tuplefilter: Optional[_TupleGetterType],
+ translated_indexes: Optional[List[int]],
+ safe_for_cache: bool,
+ keymap_by_result_column_idx: Any,
+ ) -> CursorResultMetaData:
+ new_obj = self.__class__.__new__(self.__class__)
+ new_obj._unpickled = unpickled
+ new_obj._processors = processors
+ new_obj._keys = keys
+ new_obj._keymap = keymap
+ new_obj._tuplefilter = tuplefilter
+ new_obj._translated_indexes = translated_indexes
+ new_obj._safe_for_cache = safe_for_cache
+ new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
+ return new_obj
+
+ def _remove_processors(self) -> CursorResultMetaData:
+ assert not self._tuplefilter
+ return self._make_new_metadata(
+ unpickled=self._unpickled,
+ processors=[None] * len(self._processors),
+ tuplefilter=None,
+ translated_indexes=None,
+ keymap={
+ key: value[0:5] + (None,) + value[6:]
+ for key, value in self._keymap.items()
+ },
+ keys=self._keys,
+ safe_for_cache=self._safe_for_cache,
+ keymap_by_result_column_idx=self._keymap_by_result_column_idx,
)
+ def _splice_horizontally(
+ self, other: CursorResultMetaData
+ ) -> CursorResultMetaData:
+
+ assert not self._tuplefilter
+
+ keymap = self._keymap.copy()
+ offset = len(self._keys)
+ keymap.update(
+ {
+ key: (
+ # int index should be None for ambiguous key
+ value[0] + offset
+ if value[0] is not None and key not in keymap
+ else None,
+ value[1] + offset,
+ *value[2:],
+ )
+ for key, value in other._keymap.items()
+ }
+ )
+
+ return self._make_new_metadata(
+ unpickled=self._unpickled,
+ processors=self._processors + other._processors, # type: ignore
+ tuplefilter=None,
+ translated_indexes=None,
+ keys=self._keys + other._keys, # type: ignore
+ keymap=keymap,
+ safe_for_cache=self._safe_for_cache,
+ keymap_by_result_column_idx={
+ metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
+ for metadata_entry in keymap.values()
+ },
+ )
+
+ def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
+ recs = list(self._metadata_for_keys(keys))
+
indexes = [rec[MD_INDEX] for rec in recs]
new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
if self._translated_indexes:
indexes = [self._translated_indexes[idx] for idx in indexes]
tup = tuplegetter(*indexes)
-
- new_metadata = self.__class__.__new__(self.__class__)
- new_metadata._unpickled = self._unpickled
- new_metadata._processors = self._processors
- new_metadata._keys = new_keys
- new_metadata._tuplefilter = tup
- new_metadata._translated_indexes = indexes
-
new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
- new_metadata._keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
+ keymap: _KeyMapType = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
# TODO: need unit test for:
# result = connection.execute("raw sql, no columns").scalars()
# without the "or ()" it's failing because MD_OBJECTS is None
- new_metadata._keymap.update(
+ keymap.update(
(e, new_rec)
for new_rec in new_recs
for e in new_rec[MD_OBJECTS] or ()
)
- return new_metadata
+ return self._make_new_metadata(
+ unpickled=self._unpickled,
+ processors=self._processors,
+ keys=new_keys,
+ tuplefilter=tup,
+ translated_indexes=indexes,
+ keymap=keymap,
+ safe_for_cache=self._safe_for_cache,
+ keymap_by_result_column_idx=self._keymap_by_result_column_idx,
+ )
def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData:
"""When using a cached Compiled construct that has a _result_map,
@@ -168,6 +288,7 @@ class CursorResultMetaData(ResultMetaData):
as matched to those of the cached statement.
"""
+
if not context.compiled or not context.compiled._result_columns:
return self
@@ -189,7 +310,6 @@ class CursorResultMetaData(ResultMetaData):
# make a copy and add the columns from the invoked statement
# to the result map.
- md = self.__class__.__new__(self.__class__)
keymap_by_position = self._keymap_by_result_column_idx
@@ -201,26 +321,26 @@ class CursorResultMetaData(ResultMetaData):
for metadata_entry in self._keymap.values()
}
- md._keymap = compat.dict_union(
- self._keymap,
- {
- new: keymap_by_position[idx]
- for idx, new in enumerate(
- invoked_statement._all_selected_columns
- )
- if idx in keymap_by_position
- },
- )
-
- md._unpickled = self._unpickled
- md._processors = self._processors
assert not self._tuplefilter
- md._tuplefilter = None
- md._translated_indexes = None
- md._keys = self._keys
- md._keymap_by_result_column_idx = self._keymap_by_result_column_idx
- md._safe_for_cache = self._safe_for_cache
- return md
+ return self._make_new_metadata(
+ keymap=compat.dict_union(
+ self._keymap,
+ {
+ new: keymap_by_position[idx]
+ for idx, new in enumerate(
+ invoked_statement._all_selected_columns
+ )
+ if idx in keymap_by_position
+ },
+ ),
+ unpickled=self._unpickled,
+ processors=self._processors,
+ tuplefilter=None,
+ translated_indexes=None,
+ keys=self._keys,
+ safe_for_cache=self._safe_for_cache,
+ keymap_by_result_column_idx=self._keymap_by_result_column_idx,
+ )
def __init__(
self,
@@ -683,7 +803,27 @@ class CursorResultMetaData(ResultMetaData):
untranslated,
)
- def _key_fallback(self, key, err, raiseerr=True):
+ @overload
+ def _key_fallback(
+ self, key: Any, err: Exception, raiseerr: Literal[True] = ...
+ ) -> NoReturn:
+ ...
+
+ @overload
+ def _key_fallback(
+ self, key: Any, err: Exception, raiseerr: Literal[False] = ...
+ ) -> None:
+ ...
+
+ @overload
+ def _key_fallback(
+ self, key: Any, err: Exception, raiseerr: bool = ...
+ ) -> Optional[NoReturn]:
+ ...
+
+ def _key_fallback(
+ self, key: Any, err: Exception, raiseerr: bool = True
+ ) -> Optional[NoReturn]:
if raiseerr:
if self._unpickled and isinstance(key, elements.ColumnElement):
@@ -714,9 +854,9 @@ class CursorResultMetaData(ResultMetaData):
try:
rec = self._keymap[key]
except KeyError as ke:
- rec = self._key_fallback(key, ke, raiseerr)
- if rec is None:
- return None
+ x = self._key_fallback(key, ke, raiseerr)
+ assert x is None
+ return None
index = rec[0]
@@ -734,7 +874,7 @@ class CursorResultMetaData(ResultMetaData):
def _metadata_for_keys(
self, keys: Sequence[Any]
- ) -> Iterator[_CursorKeyMapRecType]:
+ ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
for key in keys:
if int in key.__class__.__mro__:
key = self._keys[key]
@@ -750,7 +890,7 @@ class CursorResultMetaData(ResultMetaData):
if index is None:
self._raise_for_ambiguous_column_name(rec)
- yield rec
+ yield cast(_NonAmbigCursorKeyMapRecType, rec)
def __getstate__(self):
return {
@@ -1237,6 +1377,12 @@ _NO_RESULT_METADATA = _NoResultMetaData()
SelfCursorResult = TypeVar("SelfCursorResult", bound="CursorResult[Any]")
+def null_dml_result() -> IteratorResult[Any]:
+ it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
+ it._soft_close()
+ return it
+
+
class CursorResult(Result[_T]):
"""A Result that is representing state from a DBAPI cursor.
@@ -1586,6 +1732,142 @@ class CursorResult(Result[_T]):
"""
return self.context.returned_default_rows
+ def splice_horizontally(self, other):
+ """Return a new :class:`.CursorResult` that "horizontally splices"
+ together the rows of this :class:`.CursorResult` with that of another
+ :class:`.CursorResult`.
+
+ .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
+ not intended for general use.
+
+ "horizontally splices" means that for each row in the first and second
+ result sets, a new row that concatenates the two rows together is
+ produced, which then becomes the new row. The incoming
+ :class:`.CursorResult` must have the identical number of rows. It is
+ typically expected that the two result sets come from the same sort
+ order as well, as the result rows are spliced together based on their
+ position in the result.
+
+ The expected use case here is so that multiple INSERT..RETURNING
+ statements against different tables can produce a single result
+ that looks like a JOIN of those two tables.
+
+ E.g.::
+
+ r1 = connection.execute(
+ users.insert().returning(users.c.user_name, users.c.user_id),
+ user_values
+ )
+
+ r2 = connection.execute(
+ addresses.insert().returning(
+ addresses.c.address_id,
+ addresses.c.address,
+ addresses.c.user_id,
+ ),
+ address_values
+ )
+
+ rows = r1.splice_horizontally(r2).all()
+ assert (
+ rows ==
+ [
+ ("john", 1, 1, "foo@bar.com", 1),
+ ("jack", 2, 2, "bar@bat.com", 2),
+ ]
+ )
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :meth:`.CursorResult.splice_vertically`
+
+
+ """
+
+ clone = self._generate()
+ total_rows = [
+ tuple(r1) + tuple(r2)
+ for r1, r2 in zip(
+ list(self._raw_row_iterator()),
+ list(other._raw_row_iterator()),
+ )
+ ]
+
+ clone._metadata = clone._metadata._splice_horizontally(other._metadata)
+
+ clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
+ None,
+ initial_buffer=total_rows,
+ )
+ clone._reset_memoizations()
+ return clone
+
+ def splice_vertically(self, other):
+ """Return a new :class:`.CursorResult` that "vertically splices",
+ i.e. "extends", the rows of this :class:`.CursorResult` with that of
+ another :class:`.CursorResult`.
+
+ .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
+ not intended for general use.
+
+ "vertically splices" means the rows of the given result are appended to
+ the rows of this cursor result. The incoming :class:`.CursorResult`
+ must have rows that represent the identical list of columns in the
+ identical order as they are in this :class:`.CursorResult`.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :ref:`.CursorResult.splice_horizontally`
+
+ """
+ clone = self._generate()
+ total_rows = list(self._raw_row_iterator()) + list(
+ other._raw_row_iterator()
+ )
+
+ clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
+ None,
+ initial_buffer=total_rows,
+ )
+ clone._reset_memoizations()
+ return clone
+
+ def _rewind(self, rows):
+ """rewind this result back to the given rowset.
+
+ this is used internally for the case where an :class:`.Insert`
+ construct combines the use of
+ :meth:`.Insert.return_defaults` along with the
+ "supplemental columns" feature.
+
+ """
+
+ if self._echo:
+ self.context.connection._log_debug(
+ "CursorResult rewound %d row(s)", len(rows)
+ )
+
+ # the rows given are expected to be Row objects, so we
+ # have to clear out processors which have already run on these
+ # rows
+ self._metadata = cast(
+ CursorResultMetaData, self._metadata
+ )._remove_processors()
+
+ self.cursor_strategy = FullyBufferedCursorFetchStrategy(
+ None,
+ # TODO: if these are Row objects, can we save on not having to
+ # re-make new Row objects out of them a second time? is that
+ # what's actually happening right now? maybe look into this
+ initial_buffer=rows,
+ )
+ self._reset_memoizations()
+ return self
+
@property
def returned_defaults(self):
"""Return the values of default columns that were fetched using