diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-25 22:36:44 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-28 14:38:56 -0400 |
commit | 77f1b7d236dba6b1c859bb428ef32d118ec372e6 (patch) | |
tree | 7fae8eaaf303d6ce02bd423abf216550001e2f7b /lib/sqlalchemy/engine/cursor.py | |
parent | 366e88ea0e5c5417184c1dd4776cff752560631d (diff) | |
download | sqlalchemy-77f1b7d236dba6b1c859bb428ef32d118ec372e6.tar.gz |
callcount reductions and refinement for cached queries
This commit includes that we've removed the "_orm_query"
attribute from compile state as well as query context.
The attribute created reference cycles and also added
method call overhead. As part of this change,
the interface for ORMExecuteState changes a bit, as well
as the interface for the horizontal sharding extension
which now deprecates the "query_chooser" callable
in favor of "execute_chooser", which receives the contextual
object. This will also work more nicely when we implement
the new execution path for bulk updates and deletes.
Pre-merge execution options for statement, connection,
arguments all up front in Connection. that way they
can be passed to the before_execute / after_execute events,
and the ExecutionContext doesn't have to merge as second
time. Core execute is pretty close to 1.3 now.
baked wasn't using the new one()/first()/one_or_none() methods,
fixed that.
Convert non-buffered cursor strategy to be a stateless
singleton. inline all the paths by which the strategy
gets chosen, oracle and SQL Server dialects make use of the
already-invoked post_exec() hook to establish the alternate
strategies, and this is actually much nicer than it was before.
Add caching to mapper instance processor for getters.
Identified a reference cycle per query that was showing
up as a lot of gc cleanup, fixed that.
After all that, performance not budging much. Even
test_baked_query now runs with significantly fewer function
calls than 1.3, still 40% slower.
Basically something about the new patterns just makes
this slower and while I've walked a whole bunch of them
back, it hardly makes a dent. that said, the performance
issues are relatively small, in the 20-40% time increase
range, and the new caching feature
does provide for regular ORM and Core queries that
are cached, and they are faster than non-cached.
Change-Id: I7b0b0d8ca550c05f79e82f75cd8eff0bbfade053
Diffstat (limited to 'lib/sqlalchemy/engine/cursor.py')
-rw-r--r-- | lib/sqlalchemy/engine/cursor.py | 345 |
1 files changed, 164 insertions, 181 deletions
diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py index fdbf826ed..c32427644 100644 --- a/lib/sqlalchemy/engine/cursor.py +++ b/lib/sqlalchemy/engine/cursor.py @@ -10,12 +10,12 @@ import collections +import functools from .result import Result from .result import ResultMetaData from .result import SimpleResultMetaData from .result import tuplegetter -from .row import _baserow_usecext from .row import LegacyRow from .. import exc from .. import util @@ -89,14 +89,6 @@ class CursorResultMetaData(ResultMetaData): for index, rec in enumerate(self._metadata_for_keys(keys)) ] new_metadata._keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs} - if not _baserow_usecext: - # TODO: can consider assembling ints + negative ints here - new_metadata._keymap.update( - { - index: (index, new_keys[index], ()) - for index in range(len(new_keys)) - } - ) # TODO: need unit test for: # result = connection.execute("raw sql, no columns").scalars() @@ -186,25 +178,6 @@ class CursorResultMetaData(ResultMetaData): ) self._keymap = {} - if not _baserow_usecext: - # keymap indexes by integer index: this is only used - # in the pure Python BaseRow.__getitem__ - # implementation to avoid an expensive - # isinstance(key, util.int_types) in the most common - # case path - - len_raw = len(raw) - - self._keymap.update( - [ - (metadata_entry[MD_INDEX], metadata_entry) - for metadata_entry in raw - ] - + [ - (metadata_entry[MD_INDEX] - len_raw, metadata_entry) - for metadata_entry in raw - ] - ) # processors in key order for certain per-row # views like __iter__ and slices @@ -623,20 +596,23 @@ class CursorResultMetaData(ResultMetaData): return index def _indexes_for_keys(self, keys): - for rec in self._metadata_for_keys(keys): - yield rec[0] + + try: + return [self._keymap[key][0] for key in keys] + except KeyError as ke: + # ensure it raises + CursorResultMetaData._key_fallback(self, ke.args[0], ke) def _metadata_for_keys(self, keys): for key in keys: - # TODO: can consider pre-loading ints and negative ints - # into _keymap - if isinstance(key, int): + if int in key.__class__.__mro__: key = self._keys[key] try: rec = self._keymap[key] except KeyError as ke: - rec = self._key_fallback(key, ke) + # ensure it raises + CursorResultMetaData._key_fallback(self, ke.args[0], ke) index = rec[0] @@ -786,25 +762,27 @@ class ResultFetchStrategy(object): __slots__ = () - def soft_close(self, result): + alternate_cursor_description = None + + def soft_close(self, result, dbapi_cursor): raise NotImplementedError() - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): raise NotImplementedError() - def yield_per(self, result, num): + def yield_per(self, result, dbapi_cursor, num): return - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): raise NotImplementedError() - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): raise NotImplementedError() def fetchall(self, result): raise NotImplementedError() - def handle_exception(self, result, err): + def handle_exception(self, result, dbapi_cursor, err): raise err @@ -819,21 +797,19 @@ class NoCursorFetchStrategy(ResultFetchStrategy): __slots__ = () - cursor_description = None - - def soft_close(self, result): + def soft_close(self, result, dbapi_cursor): pass - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): pass - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): return self._non_result(result, None) - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): return self._non_result(result, []) - def fetchall(self, result): + def fetchall(self, result, dbapi_cursor): return self._non_result(result, []) def _non_result(self, result, default, err=None): @@ -893,71 +869,59 @@ class CursorFetchStrategy(ResultFetchStrategy): """ - __slots__ = ("dbapi_cursor", "cursor_description") - - def __init__(self, dbapi_cursor, cursor_description): - self.dbapi_cursor = dbapi_cursor - self.cursor_description = cursor_description - - @classmethod - def create(cls, result): - dbapi_cursor = result.cursor - description = dbapi_cursor.description - - if description is None: - return _NO_CURSOR_DML - else: - return cls(dbapi_cursor, description) + __slots__ = () - def soft_close(self, result): + def soft_close(self, result, dbapi_cursor): result.cursor_strategy = _NO_CURSOR_DQL - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): result.cursor_strategy = _NO_CURSOR_DQL - def handle_exception(self, result, err): + def handle_exception(self, result, dbapi_cursor, err): result.connection._handle_dbapi_exception( - err, None, None, self.dbapi_cursor, result.context + err, None, None, dbapi_cursor, result.context ) - def yield_per(self, result, num): + def yield_per(self, result, dbapi_cursor, num): result.cursor_strategy = BufferedRowCursorFetchStrategy( - self.dbapi_cursor, - self.cursor_description, - num, - collections.deque(), + dbapi_cursor, + {"max_row_buffer": num}, + initial_buffer=collections.deque(), growth_factor=0, ) - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): try: - row = self.dbapi_cursor.fetchone() + row = dbapi_cursor.fetchone() if row is None: result._soft_close(hard=hard_close) return row except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): try: if size is None: - l = self.dbapi_cursor.fetchmany() + l = dbapi_cursor.fetchmany() else: - l = self.dbapi_cursor.fetchmany(size) + l = dbapi_cursor.fetchmany(size) if not l: result._soft_close() return l except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) - def fetchall(self, result): + def fetchall(self, result, dbapi_cursor): try: - rows = self.dbapi_cursor.fetchall() + rows = dbapi_cursor.fetchall() result._soft_close() return rows except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) + + +_DEFAULT_FETCH = CursorFetchStrategy() class BufferedRowCursorFetchStrategy(CursorFetchStrategy): @@ -993,18 +957,18 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): def __init__( self, dbapi_cursor, - description, - max_row_buffer, - initial_buffer, + execution_options, growth_factor=5, + initial_buffer=None, ): - super(BufferedRowCursorFetchStrategy, self).__init__( - dbapi_cursor, description - ) - self._max_row_buffer = max_row_buffer + self._max_row_buffer = execution_options.get("max_row_buffer", 1000) + + if initial_buffer is not None: + self._rowbuffer = initial_buffer + else: + self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1)) self._growth_factor = growth_factor - self._rowbuffer = initial_buffer if growth_factor: self._bufsize = min(self._max_row_buffer, self._growth_factor) @@ -1013,39 +977,19 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): @classmethod def create(cls, result): - """Buffered row strategy has to buffer the first rows *before* - cursor.description is fetched so that it works with named cursors - correctly - - """ - - dbapi_cursor = result.cursor - - # TODO: is create() called within a handle_error block externally? - # can this be guaranteed / tested / etc - initial_buffer = collections.deque(dbapi_cursor.fetchmany(1)) - - description = dbapi_cursor.description - - if description is None: - return _NO_CURSOR_DML - else: - max_row_buffer = result.context.execution_options.get( - "max_row_buffer", 1000 - ) - return cls( - dbapi_cursor, description, max_row_buffer, initial_buffer - ) + return BufferedRowCursorFetchStrategy( + result.cursor, result.context.execution_options, + ) - def _buffer_rows(self, result): + def _buffer_rows(self, result, dbapi_cursor): size = self._bufsize try: if size < 1: - new_rows = self.dbapi_cursor.fetchall() + new_rows = dbapi_cursor.fetchall() else: - new_rows = self.dbapi_cursor.fetchmany(size) + new_rows = dbapi_cursor.fetchmany(size) except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) if not new_rows: return @@ -1055,21 +999,25 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): self._max_row_buffer, size * self._growth_factor ) - def yield_per(self, result, num): + def yield_per(self, result, dbapi_cursor, num): self._growth_factor = 0 self._max_row_buffer = self._bufsize = num - def soft_close(self, result): + def soft_close(self, result, dbapi_cursor): self._rowbuffer.clear() - super(BufferedRowCursorFetchStrategy, self).soft_close(result) + super(BufferedRowCursorFetchStrategy, self).soft_close( + result, dbapi_cursor + ) - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): self._rowbuffer.clear() - super(BufferedRowCursorFetchStrategy, self).hard_close(result) + super(BufferedRowCursorFetchStrategy, self).hard_close( + result, dbapi_cursor + ) - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): if not self._rowbuffer: - self._buffer_rows(result) + self._buffer_rows(result, dbapi_cursor) if not self._rowbuffer: try: result._soft_close(hard=hard_close) @@ -1078,15 +1026,15 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): return None return self._rowbuffer.popleft() - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): if size is None: - return self.fetchall(result) + return self.fetchall(result, dbapi_cursor) buf = list(self._rowbuffer) lb = len(buf) if size > lb: try: - buf.extend(self.dbapi_cursor.fetchmany(size - lb)) + buf.extend(dbapi_cursor.fetchmany(size - lb)) except BaseException as e: self.handle_exception(result, e) @@ -1094,14 +1042,14 @@ class BufferedRowCursorFetchStrategy(CursorFetchStrategy): self._rowbuffer = collections.deque(buf[size:]) return result - def fetchall(self, result): + def fetchall(self, result, dbapi_cursor): try: - ret = list(self._rowbuffer) + list(self.dbapi_cursor.fetchall()) + ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall()) self._rowbuffer.clear() result._soft_close() return ret except BaseException as e: - self.handle_exception(result, e) + self.handle_exception(result, dbapi_cursor, e) class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): @@ -1113,42 +1061,42 @@ class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): """ - __slots__ = ("_rowbuffer",) + __slots__ = ("_rowbuffer", "alternate_cursor_description") - def __init__(self, dbapi_cursor, description, initial_buffer=None): - super(FullyBufferedCursorFetchStrategy, self).__init__( - dbapi_cursor, description - ) + def __init__( + self, dbapi_cursor, alternate_description, initial_buffer=None + ): + self.alternate_cursor_description = alternate_description if initial_buffer is not None: self._rowbuffer = collections.deque(initial_buffer) else: - self._rowbuffer = collections.deque(self.dbapi_cursor.fetchall()) - - @classmethod - def create_from_buffer(cls, dbapi_cursor, description, buffer): - return cls(dbapi_cursor, description, buffer) + self._rowbuffer = collections.deque(dbapi_cursor.fetchall()) - def yield_per(self, result, num): + def yield_per(self, result, dbapi_cursor, num): pass - def soft_close(self, result): + def soft_close(self, result, dbapi_cursor): self._rowbuffer.clear() - super(FullyBufferedCursorFetchStrategy, self).soft_close(result) + super(FullyBufferedCursorFetchStrategy, self).soft_close( + result, dbapi_cursor + ) - def hard_close(self, result): + def hard_close(self, result, dbapi_cursor): self._rowbuffer.clear() - super(FullyBufferedCursorFetchStrategy, self).hard_close(result) + super(FullyBufferedCursorFetchStrategy, self).hard_close( + result, dbapi_cursor + ) - def fetchone(self, result, hard_close=False): + def fetchone(self, result, dbapi_cursor, hard_close=False): if self._rowbuffer: return self._rowbuffer.popleft() else: result._soft_close(hard=hard_close) return None - def fetchmany(self, result, size=None): + def fetchmany(self, result, dbapi_cursor, size=None): if size is None: - return self.fetchall(result) + return self.fetchall(result, dbapi_cursor) buf = list(self._rowbuffer) rows = buf[0:size] @@ -1157,7 +1105,7 @@ class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): result._soft_close() return rows - def fetchall(self, result): + def fetchall(self, result, dbapi_cursor): ret = self._rowbuffer self._rowbuffer = collections.deque() result._soft_close() @@ -1210,40 +1158,53 @@ class BaseCursorResult(object): _soft_closed = False closed = False - @classmethod - def _create_for_context(cls, context): - - if context._is_future_result: - obj = CursorResult(context) - else: - obj = LegacyCursorResult(context) - return obj - - def __init__(self, context): + def __init__(self, context, cursor_strategy, cursor_description): self.context = context self.dialect = context.dialect self.cursor = context.cursor + self.cursor_strategy = cursor_strategy self.connection = context.root_connection self._echo = echo = ( self.connection._echo and context.engine._should_log_debug() ) - if echo: - log = self.context.engine.logger.debug + if cursor_description is not None: + # inline of Result._row_getter(), set up an initial row + # getter assuming no transformations will be called as this + # is the most common case + + if echo: + log = self.context.engine.logger.debug + + def log_row(row): + log("Row %r", sql_util._repr_row(row)) + return row - def log_row(row): - log("Row %r", sql_util._repr_row(row)) - return row + self._row_logging_fn = log_row + else: + log_row = None + + metadata = self._init_metadata(context, cursor_description) + + keymap = metadata._keymap + processors = metadata._processors + process_row = self._process_row + key_style = process_row._default_key_style + _make_row = functools.partial( + process_row, metadata, processors, keymap, key_style + ) + if log_row: - self._row_logging_fn = log_row + def make_row(row): + made_row = _make_row(row) + log_row(made_row) + return made_row - # this is a hook used by dialects to change the strategy, - # so for the moment we have to keep calling this every time - # :( - self.cursor_strategy = strat = context.get_result_cursor_strategy(self) + self._row_getter = make_row + else: + make_row = _make_row + self._set_memoized_attribute("_row_getter", make_row) - if strat.cursor_description is not None: - self._init_metadata(context, strat.cursor_description) else: self._metadata = _NO_RESULT_METADATA @@ -1251,19 +1212,41 @@ class BaseCursorResult(object): if context.compiled: if context.compiled._cached_metadata: cached_md = self.context.compiled._cached_metadata - self._metadata = cached_md self._metadata_from_cache = True + # result rewrite/ adapt step. two translations can occur here. + # one is if we are invoked against a cached statement, we want + # to rewrite the ResultMetaData to reflect the column objects + # that are in our current selectable, not the cached one. the + # other is, the CompileState can return an alternative Result + # object. Finally, CompileState might want to tell us to not + # actually do the ResultMetaData adapt step if it in fact has + # changed the selected columns in any case. + compiled = context.compiled + if ( + compiled + and not compiled._rewrites_selected_columns + and compiled.statement is not context.invoked_statement + ): + cached_md = cached_md._adapt_to_context(context) + + self._metadata = metadata = cached_md + else: self._metadata = ( - context.compiled._cached_metadata - ) = self._cursor_metadata(self, cursor_description) + metadata + ) = context.compiled._cached_metadata = self._cursor_metadata( + self, cursor_description + ) else: - self._metadata = self._cursor_metadata(self, cursor_description) + self._metadata = metadata = self._cursor_metadata( + self, cursor_description + ) if self._echo: context.engine.logger.debug( "Col %r", tuple(x[0] for x in cursor_description) ) + return metadata def _soft_close(self, hard=False): """Soft close this :class:`_engine.CursorResult`. @@ -1294,9 +1277,9 @@ class BaseCursorResult(object): if hard: self.closed = True - self.cursor_strategy.hard_close(self) + self.cursor_strategy.hard_close(self, self.cursor) else: - self.cursor_strategy.soft_close(self) + self.cursor_strategy.soft_close(self, self.cursor) if not self._soft_closed: cursor = self.cursor @@ -1632,19 +1615,19 @@ class CursorResult(BaseCursorResult, Result): fetchone = self.cursor_strategy.fetchone while True: - row = fetchone(self) + row = fetchone(self, self.cursor) if row is None: break yield row def _fetchone_impl(self, hard_close=False): - return self.cursor_strategy.fetchone(self, hard_close) + return self.cursor_strategy.fetchone(self, self.cursor, hard_close) def _fetchall_impl(self): - return self.cursor_strategy.fetchall(self) + return self.cursor_strategy.fetchall(self, self.cursor) def _fetchmany_impl(self, size=None): - return self.cursor_strategy.fetchmany(self, size) + return self.cursor_strategy.fetchmany(self, self.cursor, size) def _raw_row_iterator(self): return self._fetchiter_impl() @@ -1674,7 +1657,7 @@ class CursorResult(BaseCursorResult, Result): @_generative def yield_per(self, num): self._yield_per = num - self.cursor_strategy.yield_per(self, num) + self.cursor_strategy.yield_per(self, self.cursor, num) class LegacyCursorResult(CursorResult): |