diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2015-06-14 16:43:16 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2015-06-14 16:43:16 -0400 |
commit | 9ccdea3a0fe57931e779b44eb2c278b78eea3d95 (patch) | |
tree | 0e77479d782446f0b35e367bc38e97d215800f4c | |
parent | e15d58695d6eff9a1d53e31e5ae3666434a4a1af (diff) | |
download | sqlalchemy-9ccdea3a0fe57931e779b44eb2c278b78eea3d95.tar.gz |
- add test cases for pullreq github:182, where we add a new
"max_row_buffer" execution option for BufferedRowResultProxy
- also add documentation, changelog and version notes
- rework the max_row_buffer argument to be interpreted from
the execution options upfront when the BufferedRowResultProxy
is first initialized.
-rw-r--r-- | doc/build/changelog/changelog_10.rst | 11 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/psycopg2.py | 13 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/result.py | 21 | ||||
-rw-r--r-- | test/engine/test_execute.py | 157 | ||||
-rw-r--r-- | test/orm/test_query.py | 6 |
5 files changed, 155 insertions, 53 deletions
diff --git a/doc/build/changelog/changelog_10.rst b/doc/build/changelog/changelog_10.rst index f44886559..155302388 100644 --- a/doc/build/changelog/changelog_10.rst +++ b/doc/build/changelog/changelog_10.rst @@ -19,6 +19,17 @@ :version: 1.0.6 .. change:: + :tags: feature, postgresql + :pullreq: github:182 + + Added new execution option ``max_row_buffer`` which is interpreted + by the psycopg2 dialect when the ``stream_results`` option is + used, which sets a limit on the size of the row buffer that may be + allocated. This value is also provided based on the integer + value sent to :meth:`.Query.yield_per`. Pull request courtesy + mcclurem. + + .. change:: :tags: bug, orm :tickets: 3448 diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 35de41fef..36a9d7bf7 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -74,6 +74,8 @@ See also: `PQconnectdbParams <http://www.postgresql.org/docs/9.1/static/\ libpq-connect.html#LIBPQ-PQCONNECTDBPARAMS>`_ +.. _psycopg2_execution_options: + Per-Statement/Connection Execution Options ------------------------------------------- @@ -81,16 +83,23 @@ The following DBAPI-specific options are respected when used with :meth:`.Connection.execution_options`, :meth:`.Executable.execution_options`, :meth:`.Query.execution_options`, in addition to those not specific to DBAPIs: -* isolation_level - Set the transaction isolation level for the lifespan of a +* ``isolation_level`` - Set the transaction isolation level for the lifespan of a :class:`.Connection` (can only be set on a connection, not a statement or query). See :ref:`psycopg2_isolation_level`. -* stream_results - Enable or disable usage of psycopg2 server side cursors - +* ``stream_results`` - Enable or disable usage of psycopg2 server side cursors - this feature makes use of "named" cursors in combination with special result handling methods so that result rows are not fully buffered. If ``None`` or not set, the ``server_side_cursors`` option of the :class:`.Engine` is used. +* ``max_row_buffer`` - when using ``stream_results``, an integer value that + specifies the maximum number of rows to buffer at a time. This is + interpreted by the :class:`.BufferedRowResultProxy`, and if omitted the + buffer will grow to ultimately store 1000 rows at a time. + + .. versionadded:: 1.0.6 + .. _psycopg2_unicode: Unicode with Psycopg2 diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index 41b30c983..b2b78dee8 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -1068,9 +1068,26 @@ class BufferedRowResultProxy(ResultProxy): The pre-fetching behavior fetches only one row initially, and then grows its buffer size by a fixed amount with each successive need for additional rows up to a size of 1000. + + The size argument is configurable using the ``max_row_buffer`` + execution option:: + + with psycopg2_engine.connect() as conn: + + result = conn.execution_options( + stream_results=True, max_row_buffer=50 + ).execute("select * from table") + + .. versionadded:: 1.0.6 Added the ``max_row_buffer`` option. + + .. seealso:: + + :ref:`psycopg2_execution_options` """ def _init_metadata(self): + self._max_row_buffer = self.context.execution_options.get( + 'max_row_buffer', None) self.__buffer_rows() super(BufferedRowResultProxy, self)._init_metadata() @@ -1095,8 +1112,8 @@ class BufferedRowResultProxy(ResultProxy): size = getattr(self, '_bufsize', 1) self.__rowbuffer = collections.deque(self.cursor.fetchmany(size)) self._bufsize = self.size_growth.get(size, size) - if self.context.execution_options.get('max_row_buffer') is not None: - self._bufsize = min(self.context.execution_options['max_row_buffer'], self._bufsize) + if self._max_row_buffer is not None: + self._bufsize = min(self._max_row_buffer, self._bufsize) def _soft_close(self, **kw): self.__rowbuffer.clear() diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index c7b524335..5ddf4ad09 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,7 +1,7 @@ # coding: utf-8 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \ - config, is_, is_not_ + config, is_, is_not_, le_ import re from sqlalchemy.testing.util import picklers from sqlalchemy.interfaces import ConnectionProxy @@ -1047,76 +1047,91 @@ class ExecutionOptionsTest(fixtures.TestBase): ) -class AlternateResultProxyTest(fixtures.TestBase): +class AlternateResultProxyTest(fixtures.TablesTest): __requires__ = ('sqlite', ) @classmethod - def setup_class(cls): + def setup_bind(cls): cls.engine = engine = testing_engine('sqlite://') - m = MetaData() - cls.table = t = Table('test', m, - Column('x', Integer, primary_key=True), - Column('y', String(50, convert_unicode='force')) - ) - m.create_all(engine) - engine.execute(t.insert(), [ + return engine + + @classmethod + def define_tables(cls, metadata): + Table( + 'test', metadata, + Column('x', Integer, primary_key=True), + Column('y', String(50, convert_unicode='force')) + ) + + @classmethod + def insert_data(cls): + cls.engine.execute(cls.tables.test.insert(), [ {'x': i, 'y': "t_%d" % i} for i in range(1, 12) ]) - def _test_proxy(self, cls): + @contextmanager + def _proxy_fixture(self, cls): + self.table = self.tables.test + class ExcCtx(default.DefaultExecutionContext): def get_result_proxy(self): return cls(self) - self.engine.dialect.execution_ctx_cls = ExcCtx - rows = [] - r = self.engine.execute(select([self.table])) - assert isinstance(r, cls) - for i in range(5): - rows.append(r.fetchone()) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + self.patcher = patch.object( + self.engine.dialect, "execution_ctx_cls", ExcCtx) + with self.patcher: + yield - rows = r.fetchmany(3) - eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) + def _test_proxy(self, cls): + with self._proxy_fixture(cls): + rows = [] + r = self.engine.execute(select([self.table])) + assert isinstance(r, cls) + for i in range(5): + rows.append(r.fetchone()) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - rows = r.fetchall() - eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) + rows = r.fetchmany(3) + eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) - r = self.engine.execute(select([self.table])) - rows = r.fetchmany(None) - eq_(rows[0], (1, "t_1")) - # number of rows here could be one, or the whole thing - assert len(rows) == 1 or len(rows) == 11 + rows = r.fetchall() + eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) - r = self.engine.execute(select([self.table]).limit(1)) - r.fetchone() - eq_(r.fetchone(), None) + r = self.engine.execute(select([self.table])) + rows = r.fetchmany(None) + eq_(rows[0], (1, "t_1")) + # number of rows here could be one, or the whole thing + assert len(rows) == 1 or len(rows) == 11 - r = self.engine.execute(select([self.table]).limit(5)) - rows = r.fetchmany(6) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + r = self.engine.execute(select([self.table]).limit(1)) + r.fetchone() + eq_(r.fetchone(), None) - # result keeps going just fine with blank results... - eq_(r.fetchmany(2), []) + r = self.engine.execute(select([self.table]).limit(5)) + rows = r.fetchmany(6) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - eq_(r.fetchmany(2), []) + # result keeps going just fine with blank results... + eq_(r.fetchmany(2), []) - eq_(r.fetchall(), []) + eq_(r.fetchmany(2), []) - eq_(r.fetchone(), None) + eq_(r.fetchall(), []) - # until we close - r.close() + eq_(r.fetchone(), None) - self._assert_result_closed(r) + # until we close + r.close() - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.first(), (1, "t_1")) - self._assert_result_closed(r) + self._assert_result_closed(r) - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.scalar(), 1) - self._assert_result_closed(r) + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.first(), (1, "t_1")) + self._assert_result_closed(r) + + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.scalar(), 1) + self._assert_result_closed(r) def _assert_result_closed(self, r): assert_raises_message( @@ -1149,6 +1164,54 @@ class AlternateResultProxyTest(fixtures.TestBase): def test_buffered_column_result_proxy(self): self._test_proxy(_result.BufferedColumnResultProxy) + def test_buffered_row_growth(self): + with self._proxy_fixture(_result.BufferedRowResultProxy): + with self.engine.connect() as conn: + conn.execute(self.table.insert(), [ + {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) + ]) + result = conn.execute(self.table.select()) + checks = { + 0: 5, 1: 10, 9: 20, 135: 250, 274: 500, + 1351: 1000 + } + for idx, row in enumerate(result, 0): + if idx in checks: + eq_(result._bufsize, checks[idx]) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 1000 + ) + + def test_max_row_buffer_option(self): + with self._proxy_fixture(_result.BufferedRowResultProxy): + with self.engine.connect() as conn: + conn.execute(self.table.insert(), [ + {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) + ]) + result = conn.execute(self.table.select()) + checks = { + 0: 5, 1: 10, 9: 20, + } + for idx, row in enumerate(result, 0): + if idx in checks: + eq_(result._bufsize, checks[idx]) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 1000 + ) + + result = conn.execution_options(max_row_buffer=27).execute( + self.table.select() + ) + for idx, row in enumerate(result, 0): + if idx in (16, 70, 150, 250): + eq_(result._bufsize, 27) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 27 + ) + class EngineEventsTest(fixtures.TestBase): __requires__ = 'ad_hoc_engines', diff --git a/test/orm/test_query.py b/test/orm/test_query.py index 41c0e2a21..62c97ec90 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -2675,10 +2675,12 @@ class YieldTest(_fixtures.FixtureTest): User = self.classes.User sess = create_session() - q = sess.query(User).yield_per(1) + q = sess.query(User).yield_per(15) q = q.execution_options(foo='bar') assert q._yield_per - eq_(q._execution_options, {"stream_results": True, "foo": "bar"}) + eq_( + q._execution_options, + {"stream_results": True, "foo": "bar", "max_row_buffer": 15}) def test_no_joinedload_opt(self): self._eagerload_mappings() |