path: root/lib/sqlalchemy/engine/cursor.py
author     Mike Bayer <mike_mp@zzzcomputing.com>    2020-05-25 22:36:44 -0400
committer  Mike Bayer <mike_mp@zzzcomputing.com>    2020-05-28 14:38:56 -0400
commit     77f1b7d236dba6b1c859bb428ef32d118ec372e6 (patch)
tree       7fae8eaaf303d6ce02bd423abf216550001e2f7b /lib/sqlalchemy/engine/cursor.py
parent     366e88ea0e5c5417184c1dd4776cff752560631d (diff)
download   sqlalchemy-77f1b7d236dba6b1c859bb428ef32d118ec372e6.tar.gz
callcount reductions and refinement for cached queries
This commit removes the "_orm_query" attribute from compile state as well as query context. The attribute created reference cycles and also added method call overhead. As part of this change, the interface for ORMExecuteState changes a bit, as does the interface for the horizontal sharding extension, which now deprecates the "query_chooser" callable in favor of "execute_chooser", which receives the contextual object. This will also work more nicely when we implement the new execution path for bulk updates and deletes.

Pre-merge execution options for statement, connection, and arguments all up front in Connection. That way they can be passed to the before_execute / after_execute events, and the ExecutionContext doesn't have to merge them a second time. Core execute is pretty close to 1.3 now.

baked wasn't using the new one()/first()/one_or_none() methods; fixed that.

Convert the non-buffered cursor strategy to a stateless singleton. Inline all the paths by which the strategy gets chosen; the Oracle and SQL Server dialects make use of the already-invoked post_exec() hook to establish the alternate strategies, and this is actually much nicer than it was before.

Add caching to the mapper instance processor for getters.

Identified a reference cycle per query that was showing up as a lot of gc cleanup; fixed that.

After all that, performance is not budging much. Even test_baked_query now runs with significantly fewer function calls than 1.3, yet is still 40% slower. Basically something about the new patterns just makes this slower, and while I've walked a whole bunch of them back, it hardly makes a dent. That said, the performance issues are relatively small, in the 20-40% time increase range, and the new caching feature does provide caching for regular ORM and Core queries; when cached, they are faster than when non-cached.

Change-Id: I7b0b0d8ca550c05f79e82f75cd8eff0bbfade053
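On the reference-cycle point above: a quick way to confirm that a code path creates cycles (and hence cyclic-GC work) is to run it with the collector paused and count what a manual collection reclaims. This harness illustrates the diagnosis technique only; it is not code from the commit, and the class and attribute names are stand-ins:

import gc


class QueryContext(object):
    # stand-in object; the real attribute removed by this commit
    # was "_orm_query" on compile state / query context
    pass


def run_query():
    ctx, query = QueryContext(), QueryContext()
    ctx._orm_query = query   # the two objects reference each other,
    query._context = ctx     # creating one cycle per "query"


gc.collect()   # start from a clean slate
gc.disable()   # keep the cyclic collector out of the way
for _ in range(100):
    run_query()
print(gc.collect(), "objects reclaimed")   # nonzero => cycles per call
gc.enable()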
Diffstat (limited to 'lib/sqlalchemy/engine/cursor.py')
-rw-r--r--  lib/sqlalchemy/engine/cursor.py | 345
1 file changed, 164 insertions(+), 181 deletions(-)
diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py
index fdbf826ed..c32427644 100644
--- a/lib/sqlalchemy/engine/cursor.py
+++ b/lib/sqlalchemy/engine/cursor.py
@@ -10,12 +10,12 @@
import collections
+import functools
from .result import Result
from .result import ResultMetaData
from .result import SimpleResultMetaData
from .result import tuplegetter
-from .row import _baserow_usecext
from .row import LegacyRow
from .. import exc
from .. import util
@@ -89,14 +89,6 @@ class CursorResultMetaData(ResultMetaData):
for index, rec in enumerate(self._metadata_for_keys(keys))
]
new_metadata._keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
- if not _baserow_usecext:
- # TODO: can consider assembling ints + negative ints here
- new_metadata._keymap.update(
- {
- index: (index, new_keys[index], ())
- for index in range(len(new_keys))
- }
- )
# TODO: need unit test for:
# result = connection.execute("raw sql, no columns").scalars()
@@ -186,25 +178,6 @@ class CursorResultMetaData(ResultMetaData):
)
self._keymap = {}
- if not _baserow_usecext:
- # keymap indexes by integer index: this is only used
- # in the pure Python BaseRow.__getitem__
- # implementation to avoid an expensive
- # isinstance(key, util.int_types) in the most common
- # case path
-
- len_raw = len(raw)
-
- self._keymap.update(
- [
- (metadata_entry[MD_INDEX], metadata_entry)
- for metadata_entry in raw
- ]
- + [
- (metadata_entry[MD_INDEX] - len_raw, metadata_entry)
- for metadata_entry in raw
- ]
- )
# processors in key order for certain per-row
# views like __iter__ and slices
@@ -623,20 +596,23 @@ class CursorResultMetaData(ResultMetaData):
return index
def _indexes_for_keys(self, keys):
- for rec in self._metadata_for_keys(keys):
- yield rec[0]
+
+ try:
+ return [self._keymap[key][0] for key in keys]
+ except KeyError as ke:
+ # ensure it raises
+ CursorResultMetaData._key_fallback(self, ke.args[0], ke)
def _metadata_for_keys(self, keys):
for key in keys:
- # TODO: can consider pre-loading ints and negative ints
- # into _keymap
- if isinstance(key, int):
+ if int in key.__class__.__mro__:
key = self._keys[key]
try:
rec = self._keymap[key]
except KeyError as ke:
- rec = self._key_fallback(key, ke)
+ # ensure it raises
+ CursorResultMetaData._key_fallback(self, ke.args[0], ke)
index = rec[0]
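A note on the type check introduced in the hunk above, where `int in key.__class__.__mro__` replaces `isinstance(key, int)`: both answer "is this an integer index rather than a string key?" (including for bool, which inherits from int), but the MRO containment test avoids isinstance() call overhead on this hot path. A rough, interpreter-dependent way to compare the two; the harness is mine, not part of the commit:

import timeit

key = "user_id"   # the common case: a string key, not an integer index

print(timeit.timeit(lambda: isinstance(key, int)))
print(timeit.timeit(lambda: int in key.__class__.__mro__))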
@@ -786,25 +762,27 @@ class ResultFetchStrategy(object):
__slots__ = ()
- def soft_close(self, result):
+ alternate_cursor_description = None
+
+ def soft_close(self, result, dbapi_cursor):
raise NotImplementedError()
- def hard_close(self, result):
+ def hard_close(self, result, dbapi_cursor):
raise NotImplementedError()
- def yield_per(self, result, num):
+ def yield_per(self, result, dbapi_cursor, num):
return
- def fetchone(self, result, hard_close=False):
+ def fetchone(self, result, dbapi_cursor, hard_close=False):
raise NotImplementedError()
- def fetchmany(self, result, size=None):
+ def fetchmany(self, result, dbapi_cursor, size=None):
raise NotImplementedError()
def fetchall(self, result):
raise NotImplementedError()
- def handle_exception(self, result, err):
+ def handle_exception(self, result, dbapi_cursor, err):
raise err
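The signature changes in this hunk are what let the default strategy become the stateless module-level singleton seen further down (`_DEFAULT_FETCH = CursorFetchStrategy()`): the DBAPI cursor now arrives as an argument on every call instead of living on the strategy instance. A minimal sketch of the pattern, with invented names:

class StatelessFetchStrategy(object):
    # empty __slots__ guarantees no per-instance state can sneak in,
    # which is what makes one shared instance safe across all results
    __slots__ = ()

    def fetchone(self, result, dbapi_cursor, hard_close=False):
        row = dbapi_cursor.fetchone()
        if row is None:
            result._soft_close(hard=hard_close)
        return row


# a single instance serves every result, mirroring _DEFAULT_FETCH
DEFAULT_FETCH = StatelessFetchStrategy()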
@@ -819,21 +797,19 @@ class NoCursorFetchStrategy(ResultFetchStrategy):
__slots__ = ()
- cursor_description = None
-
- def soft_close(self, result):
+ def soft_close(self, result, dbapi_cursor):
pass
- def hard_close(self, result):
+ def hard_close(self, result, dbapi_cursor):
pass
- def fetchone(self, result, hard_close=False):
+ def fetchone(self, result, dbapi_cursor, hard_close=False):
return self._non_result(result, None)
- def fetchmany(self, result, size=None):
+ def fetchmany(self, result, dbapi_cursor, size=None):
return self._non_result(result, [])
- def fetchall(self, result):
+ def fetchall(self, result, dbapi_cursor):
return self._non_result(result, [])
def _non_result(self, result, default, err=None):
@@ -893,71 +869,59 @@ class CursorFetchStrategy(ResultFetchStrategy):
"""
- __slots__ = ("dbapi_cursor", "cursor_description")
-
- def __init__(self, dbapi_cursor, cursor_description):
- self.dbapi_cursor = dbapi_cursor
- self.cursor_description = cursor_description
-
- @classmethod
- def create(cls, result):
- dbapi_cursor = result.cursor
- description = dbapi_cursor.description
-
- if description is None:
- return _NO_CURSOR_DML
- else:
- return cls(dbapi_cursor, description)
+ __slots__ = ()
- def soft_close(self, result):
+ def soft_close(self, result, dbapi_cursor):
result.cursor_strategy = _NO_CURSOR_DQL
- def hard_close(self, result):
+ def hard_close(self, result, dbapi_cursor):
result.cursor_strategy = _NO_CURSOR_DQL
- def handle_exception(self, result, err):
+ def handle_exception(self, result, dbapi_cursor, err):
result.connection._handle_dbapi_exception(
- err, None, None, self.dbapi_cursor, result.context
+ err, None, None, dbapi_cursor, result.context
)
- def yield_per(self, result, num):
+ def yield_per(self, result, dbapi_cursor, num):
result.cursor_strategy = BufferedRowCursorFetchStrategy(
- self.dbapi_cursor,
- self.cursor_description,
- num,
- collections.deque(),
+ dbapi_cursor,
+ {"max_row_buffer": num},
+ initial_buffer=collections.deque(),
growth_factor=0,
)
- def fetchone(self, result, hard_close=False):
+ def fetchone(self, result, dbapi_cursor, hard_close=False):
try:
- row = self.dbapi_cursor.fetchone()
+ row = dbapi_cursor.fetchone()
if row is None:
result._soft_close(hard=hard_close)
return row
except BaseException as e:
- self.handle_exception(result, e)
+ self.handle_exception(result, dbapi_cursor, e)
- def fetchmany(self, result, size=None):
+ def fetchmany(self, result, dbapi_cursor, size=None):
try:
if size is None:
- l = self.dbapi_cursor.fetchmany()
+ l = dbapi_cursor.fetchmany()
else:
- l = self.dbapi_cursor.fetchmany(size)
+ l = dbapi_cursor.fetchmany(size)
if not l:
result._soft_close()
return l
except BaseException as e:
- self.handle_exception(result, e)
+ self.handle_exception(result, dbapi_cursor, e)
- def fetchall(self, result):
+ def fetchall(self, result, dbapi_cursor):
try:
- rows = self.dbapi_cursor.fetchall()
+ rows = dbapi_cursor.fetchall()
result._soft_close()
return rows
except BaseException as e:
- self.handle_exception(result, e)
+ self.handle_exception(result, dbapi_cursor, e)
+
+
+_DEFAULT_FETCH = CursorFetchStrategy()
class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
@@ -993,18 +957,18 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
def __init__(
self,
dbapi_cursor,
- description,
- max_row_buffer,
- initial_buffer,
+ execution_options,
growth_factor=5,
+ initial_buffer=None,
):
- super(BufferedRowCursorFetchStrategy, self).__init__(
- dbapi_cursor, description
- )
- self._max_row_buffer = max_row_buffer
+ self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
+
+ if initial_buffer is not None:
+ self._rowbuffer = initial_buffer
+ else:
+ self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
self._growth_factor = growth_factor
- self._rowbuffer = initial_buffer
if growth_factor:
self._bufsize = min(self._max_row_buffer, self._growth_factor)
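Read together with the `_buffer_rows()` hunk below, which resets `self._bufsize = min(self._max_row_buffer, size * self._growth_factor)` after each refill, the buffer size follows a geometric schedule. A standalone illustration assuming the defaults visible in the diff (1000 and 5):

max_row_buffer, growth_factor = 1000, 5   # defaults from the diff

size = min(max_row_buffer, growth_factor)
schedule = [size]
while size < max_row_buffer:
    # mirror _buffer_rows(): grow geometrically, capped at the maximum
    size = min(max_row_buffer, size * growth_factor)
    schedule.append(size)

print(schedule)   # [5, 25, 125, 625, 1000]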
@@ -1013,39 +977,19 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
@classmethod
def create(cls, result):
- """Buffered row strategy has to buffer the first rows *before*
- cursor.description is fetched so that it works with named cursors
- correctly
-
- """
-
- dbapi_cursor = result.cursor
-
- # TODO: is create() called within a handle_error block externally?
- # can this be guaranteed / tested / etc
- initial_buffer = collections.deque(dbapi_cursor.fetchmany(1))
-
- description = dbapi_cursor.description
-
- if description is None:
- return _NO_CURSOR_DML
- else:
- max_row_buffer = result.context.execution_options.get(
- "max_row_buffer", 1000
- )
- return cls(
- dbapi_cursor, description, max_row_buffer, initial_buffer
- )
+ return BufferedRowCursorFetchStrategy(
+ result.cursor, result.context.execution_options,
+ )
- def _buffer_rows(self, result):
+ def _buffer_rows(self, result, dbapi_cursor):
size = self._bufsize
try:
if size < 1:
- new_rows = self.dbapi_cursor.fetchall()
+ new_rows = dbapi_cursor.fetchall()
else:
- new_rows = self.dbapi_cursor.fetchmany(size)
+ new_rows = dbapi_cursor.fetchmany(size)
except BaseException as e:
- self.handle_exception(result, e)
+ self.handle_exception(result, dbapi_cursor, e)
if not new_rows:
return
@@ -1055,21 +999,25 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
self._max_row_buffer, size * self._growth_factor
)
- def yield_per(self, result, num):
+ def yield_per(self, result, dbapi_cursor, num):
self._growth_factor = 0
self._max_row_buffer = self._bufsize = num
- def soft_close(self, result):
+ def soft_close(self, result, dbapi_cursor):
self._rowbuffer.clear()
- super(BufferedRowCursorFetchStrategy, self).soft_close(result)
+ super(BufferedRowCursorFetchStrategy, self).soft_close(
+ result, dbapi_cursor
+ )
- def hard_close(self, result):
+ def hard_close(self, result, dbapi_cursor):
self._rowbuffer.clear()
- super(BufferedRowCursorFetchStrategy, self).hard_close(result)
+ super(BufferedRowCursorFetchStrategy, self).hard_close(
+ result, dbapi_cursor
+ )
- def fetchone(self, result, hard_close=False):
+ def fetchone(self, result, dbapi_cursor, hard_close=False):
if not self._rowbuffer:
- self._buffer_rows(result)
+ self._buffer_rows(result, dbapi_cursor)
if not self._rowbuffer:
try:
result._soft_close(hard=hard_close)
@@ -1078,15 +1026,15 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
return None
return self._rowbuffer.popleft()
- def fetchmany(self, result, size=None):
+ def fetchmany(self, result, dbapi_cursor, size=None):
if size is None:
- return self.fetchall(result)
+ return self.fetchall(result, dbapi_cursor)
buf = list(self._rowbuffer)
lb = len(buf)
if size > lb:
try:
- buf.extend(self.dbapi_cursor.fetchmany(size - lb))
+ buf.extend(dbapi_cursor.fetchmany(size - lb))
except BaseException as e:
self.handle_exception(result, e)
@@ -1094,14 +1042,14 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
self._rowbuffer = collections.deque(buf[size:])
return result
- def fetchall(self, result):
+ def fetchall(self, result, dbapi_cursor):
try:
- ret = list(self._rowbuffer) + list(self.dbapi_cursor.fetchall())
+ ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
self._rowbuffer.clear()
result._soft_close()
return ret
except BaseException as e:
- self.handle_exception(result, e)
+ self.handle_exception(result, dbapi_cursor, e)
class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
@@ -1113,42 +1061,42 @@ class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
"""
- __slots__ = ("_rowbuffer",)
+ __slots__ = ("_rowbuffer", "alternate_cursor_description")
- def __init__(self, dbapi_cursor, description, initial_buffer=None):
- super(FullyBufferedCursorFetchStrategy, self).__init__(
- dbapi_cursor, description
- )
+ def __init__(
+ self, dbapi_cursor, alternate_description, initial_buffer=None
+ ):
+ self.alternate_cursor_description = alternate_description
if initial_buffer is not None:
self._rowbuffer = collections.deque(initial_buffer)
else:
- self._rowbuffer = collections.deque(self.dbapi_cursor.fetchall())
-
- @classmethod
- def create_from_buffer(cls, dbapi_cursor, description, buffer):
- return cls(dbapi_cursor, description, buffer)
+ self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
- def yield_per(self, result, num):
+ def yield_per(self, result, dbapi_cursor, num):
pass
- def soft_close(self, result):
+ def soft_close(self, result, dbapi_cursor):
self._rowbuffer.clear()
- super(FullyBufferedCursorFetchStrategy, self).soft_close(result)
+ super(FullyBufferedCursorFetchStrategy, self).soft_close(
+ result, dbapi_cursor
+ )
- def hard_close(self, result):
+ def hard_close(self, result, dbapi_cursor):
self._rowbuffer.clear()
- super(FullyBufferedCursorFetchStrategy, self).hard_close(result)
+ super(FullyBufferedCursorFetchStrategy, self).hard_close(
+ result, dbapi_cursor
+ )
- def fetchone(self, result, hard_close=False):
+ def fetchone(self, result, dbapi_cursor, hard_close=False):
if self._rowbuffer:
return self._rowbuffer.popleft()
else:
result._soft_close(hard=hard_close)
return None
- def fetchmany(self, result, size=None):
+ def fetchmany(self, result, dbapi_cursor, size=None):
if size is None:
- return self.fetchall(result)
+ return self.fetchall(result, dbapi_cursor)
buf = list(self._rowbuffer)
rows = buf[0:size]
@@ -1157,7 +1105,7 @@ class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
result._soft_close()
return rows
- def fetchall(self, result):
+ def fetchall(self, result, dbapi_cursor):
ret = self._rowbuffer
self._rowbuffer = collections.deque()
result._soft_close()
@@ -1210,40 +1158,53 @@ class BaseCursorResult(object):
_soft_closed = False
closed = False
- @classmethod
- def _create_for_context(cls, context):
-
- if context._is_future_result:
- obj = CursorResult(context)
- else:
- obj = LegacyCursorResult(context)
- return obj
-
- def __init__(self, context):
+ def __init__(self, context, cursor_strategy, cursor_description):
self.context = context
self.dialect = context.dialect
self.cursor = context.cursor
+ self.cursor_strategy = cursor_strategy
self.connection = context.root_connection
self._echo = echo = (
self.connection._echo and context.engine._should_log_debug()
)
- if echo:
- log = self.context.engine.logger.debug
+ if cursor_description is not None:
+ # inline of Result._row_getter(), set up an initial row
+ # getter assuming no transformations will be called as this
+ # is the most common case
+
+ if echo:
+ log = self.context.engine.logger.debug
+
+ def log_row(row):
+ log("Row %r", sql_util._repr_row(row))
+ return row
- def log_row(row):
- log("Row %r", sql_util._repr_row(row))
- return row
+ self._row_logging_fn = log_row
+ else:
+ log_row = None
+
+ metadata = self._init_metadata(context, cursor_description)
+
+ keymap = metadata._keymap
+ processors = metadata._processors
+ process_row = self._process_row
+ key_style = process_row._default_key_style
+ _make_row = functools.partial(
+ process_row, metadata, processors, keymap, key_style
+ )
+ if log_row:
- self._row_logging_fn = log_row
+ def make_row(row):
+ made_row = _make_row(row)
+ log_row(made_row)
+ return made_row
- # this is a hook used by dialects to change the strategy,
- # so for the moment we have to keep calling this every time
- # :(
- self.cursor_strategy = strat = context.get_result_cursor_strategy(self)
+ self._row_getter = make_row
+ else:
+ make_row = _make_row
+ self._set_memoized_attribute("_row_getter", make_row)
- if strat.cursor_description is not None:
- self._init_metadata(context, strat.cursor_description)
else:
self._metadata = _NO_RESULT_METADATA
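The `functools.partial` call in this hunk pre-binds every argument that is invariant for the whole result, so producing a row costs a single positional call. A toy stand-in for the idea; the real `process_row` is the internal Row constructor, and this version only mimics its shape:

import functools


def process_row(metadata, processors, keymap, key_style, raw_row):
    # toy stand-in: apply each column's processor, if any
    return tuple(
        proc(value) if proc is not None else value
        for proc, value in zip(processors, raw_row)
    )


# bind the invariant arguments once per result, not once per row
processors = [str.upper, None]
make_row = functools.partial(process_row, None, processors, {}, None)

print(make_row(("alice", 42)))   # ('ALICE', 42)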
@@ -1251,19 +1212,41 @@ class BaseCursorResult(object):
if context.compiled:
if context.compiled._cached_metadata:
cached_md = self.context.compiled._cached_metadata
- self._metadata = cached_md
self._metadata_from_cache = True
+ # result rewrite/ adapt step. two translations can occur here.
+ # one is if we are invoked against a cached statement, we want
+ # to rewrite the ResultMetaData to reflect the column objects
+ # that are in our current selectable, not the cached one. the
+ # other is, the CompileState can return an alternative Result
+ # object. Finally, CompileState might want to tell us to not
+ # actually do the ResultMetaData adapt step if it in fact has
+ # changed the selected columns in any case.
+ compiled = context.compiled
+ if (
+ compiled
+ and not compiled._rewrites_selected_columns
+ and compiled.statement is not context.invoked_statement
+ ):
+ cached_md = cached_md._adapt_to_context(context)
+
+ self._metadata = metadata = cached_md
+
else:
self._metadata = (
- context.compiled._cached_metadata
- ) = self._cursor_metadata(self, cursor_description)
+ metadata
+ ) = context.compiled._cached_metadata = self._cursor_metadata(
+ self, cursor_description
+ )
else:
- self._metadata = self._cursor_metadata(self, cursor_description)
+ self._metadata = metadata = self._cursor_metadata(
+ self, cursor_description
+ )
if self._echo:
context.engine.logger.debug(
"Col %r", tuple(x[0] for x in cursor_description)
)
+ return metadata
def _soft_close(self, hard=False):
"""Soft close this :class:`_engine.CursorResult`.
@@ -1294,9 +1277,9 @@ class BaseCursorResult(object):
if hard:
self.closed = True
- self.cursor_strategy.hard_close(self)
+ self.cursor_strategy.hard_close(self, self.cursor)
else:
- self.cursor_strategy.soft_close(self)
+ self.cursor_strategy.soft_close(self, self.cursor)
if not self._soft_closed:
cursor = self.cursor
@@ -1632,19 +1615,19 @@ class CursorResult(BaseCursorResult, Result):
fetchone = self.cursor_strategy.fetchone
while True:
- row = fetchone(self)
+ row = fetchone(self, self.cursor)
if row is None:
break
yield row
def _fetchone_impl(self, hard_close=False):
- return self.cursor_strategy.fetchone(self, hard_close)
+ return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
def _fetchall_impl(self):
- return self.cursor_strategy.fetchall(self)
+ return self.cursor_strategy.fetchall(self, self.cursor)
def _fetchmany_impl(self, size=None):
- return self.cursor_strategy.fetchmany(self, size)
+ return self.cursor_strategy.fetchmany(self, self.cursor, size)
def _raw_row_iterator(self):
return self._fetchiter_impl()
@@ -1674,7 +1657,7 @@ class CursorResult(BaseCursorResult, Result):
@_generative
def yield_per(self, num):
self._yield_per = num
- self.cursor_strategy.yield_per(self, num)
+ self.cursor_strategy.yield_per(self, self.cursor, num)
class LegacyCursorResult(CursorResult):
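To close, a usage sketch of the `yield_per()` hunk above: it re-points `cursor_strategy` at a `BufferedRowCursorFetchStrategy` with `growth_factor=0`, so rows come off the DBAPI cursor in fixed batches. The engine URL and query below are placeholders, assuming the 1.4 API this commit is part of:

from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")   # placeholder engine

with engine.connect() as conn:
    result = conn.execute(text("select 1 union all select 2"))
    # after yield_per(), each buffer refill pulls at most 100 rows
    # from the DBAPI cursor
    for row in result.yield_per(100):
        print(row)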