author     Mike Bayer <mike_mp@zzzcomputing.com>  2015-06-14 16:43:16 -0400
committer  Mike Bayer <mike_mp@zzzcomputing.com>  2015-06-14 16:43:16 -0400
commit     9ccdea3a0fe57931e779b44eb2c278b78eea3d95 (patch)
tree       0e77479d782446f0b35e367bc38e97d215800f4c
parent     e15d58695d6eff9a1d53e31e5ae3666434a4a1af (diff)
- add test cases for pullreq github:182, where we add a new
  "max_row_buffer" execution option for BufferedRowResultProxy
- also add documentation, changelog and version notes
- rework the max_row_buffer argument to be interpreted from the
  execution options upfront when the BufferedRowResultProxy is
  first initialized.
-rw-r--r--  doc/build/changelog/changelog_10.rst             11
-rw-r--r--  lib/sqlalchemy/dialects/postgresql/psycopg2.py   13
-rw-r--r--  lib/sqlalchemy/engine/result.py                  21
-rw-r--r--  test/engine/test_execute.py                     157
-rw-r--r--  test/orm/test_query.py                            6
5 files changed, 155 insertions(+), 53 deletions(-)
diff --git a/doc/build/changelog/changelog_10.rst b/doc/build/changelog/changelog_10.rst
index f44886559..155302388 100644
--- a/doc/build/changelog/changelog_10.rst
+++ b/doc/build/changelog/changelog_10.rst
@@ -19,6 +19,17 @@
:version: 1.0.6
.. change::
+ :tags: feature, postgresql
+ :pullreq: github:182
+
+ Added new execution option ``max_row_buffer`` which is interpreted
+ by the psycopg2 dialect when the ``stream_results`` option is
+ used, which sets a limit on the size of the row buffer that may be
+ allocated. This value is also provided based on the integer
+ value sent to :meth:`.Query.yield_per`. Pull request courtesy
+ mcclurem.
+
+ .. change::
:tags: bug, orm
:tickets: 3448
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index 35de41fef..36a9d7bf7 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -74,6 +74,8 @@ See also:
`PQconnectdbParams <http://www.postgresql.org/docs/9.1/static/\
libpq-connect.html#LIBPQ-PQCONNECTDBPARAMS>`_
+.. _psycopg2_execution_options:
+
Per-Statement/Connection Execution Options
-------------------------------------------
@@ -81,16 +83,23 @@ The following DBAPI-specific options are respected when used with
:meth:`.Connection.execution_options`, :meth:`.Executable.execution_options`,
:meth:`.Query.execution_options`, in addition to those not specific to DBAPIs:
-* isolation_level - Set the transaction isolation level for the lifespan of a
+* ``isolation_level`` - Set the transaction isolation level for the lifespan of a
:class:`.Connection` (can only be set on a connection, not a statement
or query). See :ref:`psycopg2_isolation_level`.
-* stream_results - Enable or disable usage of psycopg2 server side cursors -
+* ``stream_results`` - Enable or disable usage of psycopg2 server side cursors -
this feature makes use of "named" cursors in combination with special
result handling methods so that result rows are not fully buffered.
If ``None`` or not set, the ``server_side_cursors`` option of the
:class:`.Engine` is used.
+* ``max_row_buffer`` - when using ``stream_results``, an integer value that
+ specifies the maximum number of rows to buffer at a time. This is
+ interpreted by the :class:`.BufferedRowResultProxy`, and if omitted the
+ buffer will grow to ultimately store 1000 rows at a time.
+
+ .. versionadded:: 1.0.6
+
.. _psycopg2_unicode:
Unicode with Psycopg2
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py
index 41b30c983..b2b78dee8 100644
--- a/lib/sqlalchemy/engine/result.py
+++ b/lib/sqlalchemy/engine/result.py
@@ -1068,9 +1068,26 @@ class BufferedRowResultProxy(ResultProxy):
The pre-fetching behavior fetches only one row initially, and then
grows its buffer size by a fixed amount with each successive need
for additional rows up to a size of 1000.
+
+ The size argument is configurable using the ``max_row_buffer``
+ execution option::
+
+ with psycopg2_engine.connect() as conn:
+
+ result = conn.execution_options(
+ stream_results=True, max_row_buffer=50
+ ).execute("select * from table")
+
+ .. versionadded:: 1.0.6 Added the ``max_row_buffer`` option.
+
+ .. seealso::
+
+ :ref:`psycopg2_execution_options`
"""
def _init_metadata(self):
+ self._max_row_buffer = self.context.execution_options.get(
+ 'max_row_buffer', None)
self.__buffer_rows()
super(BufferedRowResultProxy, self)._init_metadata()
@@ -1095,8 +1112,8 @@ class BufferedRowResultProxy(ResultProxy):
size = getattr(self, '_bufsize', 1)
self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
self._bufsize = self.size_growth.get(size, size)
- if self.context.execution_options.get('max_row_buffer') is not None:
- self._bufsize = min(self.context.execution_options['max_row_buffer'], self._bufsize)
+ if self._max_row_buffer is not None:
+ self._bufsize = min(self._max_row_buffer, self._bufsize)
def _soft_close(self, **kw):
self.__rowbuffer.clear()
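The growth schedule that ``max_row_buffer`` caps can be illustrated in
isolation. The mapping below is inferred from the buffer sizes asserted
in the tests added by this commit (1 → 5 → 10 → 20 → 50 → 100 → 250 →
500 → 1000); it is a sketch of the behavior, not the library's literal
code::

    # inferred growth mapping: each fetchmany() size maps to the next
    size_growth = {1: 5, 5: 10, 10: 20, 20: 50, 50: 100,
                   100: 250, 250: 500, 500: 1000}

    def buffer_sizes(max_row_buffer=None, fetches=10):
        # yield the successive fetchmany() sizes the proxy would
        # request, applying the max_row_buffer cap as in __buffer_rows
        size = 1
        for _ in range(fetches):
            yield size
            size = size_growth.get(size, size)
            if max_row_buffer is not None:
                size = min(max_row_buffer, size)

    print(list(buffer_sizes()))
    # [1, 5, 10, 20, 50, 100, 250, 500, 1000, 1000]
    print(list(buffer_sizes(max_row_buffer=27)))
    # [1, 5, 10, 20, 27, 27, 27, 27, 27, 27]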
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index c7b524335..5ddf4ad09 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1,7 +1,7 @@
# coding: utf-8
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
- config, is_, is_not_
+ config, is_, is_not_, le_
import re
from sqlalchemy.testing.util import picklers
from sqlalchemy.interfaces import ConnectionProxy
@@ -1047,76 +1047,91 @@ class ExecutionOptionsTest(fixtures.TestBase):
)
-class AlternateResultProxyTest(fixtures.TestBase):
+class AlternateResultProxyTest(fixtures.TablesTest):
__requires__ = ('sqlite', )
@classmethod
- def setup_class(cls):
+ def setup_bind(cls):
cls.engine = engine = testing_engine('sqlite://')
- m = MetaData()
- cls.table = t = Table('test', m,
- Column('x', Integer, primary_key=True),
- Column('y', String(50, convert_unicode='force'))
- )
- m.create_all(engine)
- engine.execute(t.insert(), [
+ return engine
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'test', metadata,
+ Column('x', Integer, primary_key=True),
+ Column('y', String(50, convert_unicode='force'))
+ )
+
+ @classmethod
+ def insert_data(cls):
+ cls.engine.execute(cls.tables.test.insert(), [
{'x': i, 'y': "t_%d" % i} for i in range(1, 12)
])
- def _test_proxy(self, cls):
+ @contextmanager
+ def _proxy_fixture(self, cls):
+ self.table = self.tables.test
+
class ExcCtx(default.DefaultExecutionContext):
def get_result_proxy(self):
return cls(self)
- self.engine.dialect.execution_ctx_cls = ExcCtx
- rows = []
- r = self.engine.execute(select([self.table]))
- assert isinstance(r, cls)
- for i in range(5):
- rows.append(r.fetchone())
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+ self.patcher = patch.object(
+ self.engine.dialect, "execution_ctx_cls", ExcCtx)
+ with self.patcher:
+ yield
- rows = r.fetchmany(3)
- eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
+ def _test_proxy(self, cls):
+ with self._proxy_fixture(cls):
+ rows = []
+ r = self.engine.execute(select([self.table]))
+ assert isinstance(r, cls)
+ for i in range(5):
+ rows.append(r.fetchone())
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
- rows = r.fetchall()
- eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
+ rows = r.fetchmany(3)
+ eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
- r = self.engine.execute(select([self.table]))
- rows = r.fetchmany(None)
- eq_(rows[0], (1, "t_1"))
- # number of rows here could be one, or the whole thing
- assert len(rows) == 1 or len(rows) == 11
+ rows = r.fetchall()
+ eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
- r = self.engine.execute(select([self.table]).limit(1))
- r.fetchone()
- eq_(r.fetchone(), None)
+ r = self.engine.execute(select([self.table]))
+ rows = r.fetchmany(None)
+ eq_(rows[0], (1, "t_1"))
+ # number of rows here could be one, or the whole thing
+ assert len(rows) == 1 or len(rows) == 11
- r = self.engine.execute(select([self.table]).limit(5))
- rows = r.fetchmany(6)
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+ r = self.engine.execute(select([self.table]).limit(1))
+ r.fetchone()
+ eq_(r.fetchone(), None)
- # result keeps going just fine with blank results...
- eq_(r.fetchmany(2), [])
+ r = self.engine.execute(select([self.table]).limit(5))
+ rows = r.fetchmany(6)
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
- eq_(r.fetchmany(2), [])
+ # result keeps going just fine with blank results...
+ eq_(r.fetchmany(2), [])
- eq_(r.fetchall(), [])
+ eq_(r.fetchmany(2), [])
- eq_(r.fetchone(), None)
+ eq_(r.fetchall(), [])
- # until we close
- r.close()
+ eq_(r.fetchone(), None)
- self._assert_result_closed(r)
+ # until we close
+ r.close()
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.first(), (1, "t_1"))
- self._assert_result_closed(r)
+ self._assert_result_closed(r)
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.scalar(), 1)
- self._assert_result_closed(r)
+ r = self.engine.execute(select([self.table]).limit(5))
+ eq_(r.first(), (1, "t_1"))
+ self._assert_result_closed(r)
+
+ r = self.engine.execute(select([self.table]).limit(5))
+ eq_(r.scalar(), 1)
+ self._assert_result_closed(r)
def _assert_result_closed(self, r):
assert_raises_message(
@@ -1149,6 +1164,54 @@ class AlternateResultProxyTest(fixtures.TestBase):
def test_buffered_column_result_proxy(self):
self._test_proxy(_result.BufferedColumnResultProxy)
+ def test_buffered_row_growth(self):
+ with self._proxy_fixture(_result.BufferedRowResultProxy):
+ with self.engine.connect() as conn:
+ conn.execute(self.table.insert(), [
+ {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+ ])
+ result = conn.execute(self.table.select())
+ checks = {
+ 0: 5, 1: 10, 9: 20, 135: 250, 274: 500,
+ 1351: 1000
+ }
+ for idx, row in enumerate(result, 0):
+ if idx in checks:
+ eq_(result._bufsize, checks[idx])
+ le_(
+ len(result._BufferedRowResultProxy__rowbuffer),
+ 1000
+ )
+
+ def test_max_row_buffer_option(self):
+ with self._proxy_fixture(_result.BufferedRowResultProxy):
+ with self.engine.connect() as conn:
+ conn.execute(self.table.insert(), [
+ {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+ ])
+ result = conn.execute(self.table.select())
+ checks = {
+ 0: 5, 1: 10, 9: 20,
+ }
+ for idx, row in enumerate(result, 0):
+ if idx in checks:
+ eq_(result._bufsize, checks[idx])
+ le_(
+ len(result._BufferedRowResultProxy__rowbuffer),
+ 1000
+ )
+
+ result = conn.execution_options(max_row_buffer=27).execute(
+ self.table.select()
+ )
+ for idx, row in enumerate(result, 0):
+ if idx in (16, 70, 150, 250):
+ eq_(result._bufsize, 27)
+ le_(
+ len(result._BufferedRowResultProxy__rowbuffer),
+ 27
+ )
+
class EngineEventsTest(fixtures.TestBase):
__requires__ = 'ad_hoc_engines',
diff --git a/test/orm/test_query.py b/test/orm/test_query.py
index 41c0e2a21..62c97ec90 100644
--- a/test/orm/test_query.py
+++ b/test/orm/test_query.py
@@ -2675,10 +2675,12 @@ class YieldTest(_fixtures.FixtureTest):
User = self.classes.User
sess = create_session()
- q = sess.query(User).yield_per(1)
+ q = sess.query(User).yield_per(15)
q = q.execution_options(foo='bar')
assert q._yield_per
- eq_(q._execution_options, {"stream_results": True, "foo": "bar"})
+ eq_(
+ q._execution_options,
+ {"stream_results": True, "foo": "bar", "max_row_buffer": 15})
def test_no_joinedload_opt(self):
self._eagerload_mappings()
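As the ``test_query.py`` change above indicates, :meth:`.Query.yield_per`
now forwards its count into the execution options as ``max_row_buffer``.
A minimal self-contained sketch; the ``User`` mapping here is
illustrative, standing in for the test fixtures::

    from sqlalchemy import Column, Integer, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Session

    Base = declarative_base()

    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)

    session = Session(bind=create_engine('sqlite://'))

    # yield_per(15) now populates max_row_buffer along with
    # stream_results in the query's execution options
    q = session.query(User).yield_per(15)
    print(q._execution_options)
    # expected contents, per the test above:
    # {'stream_results': True, 'max_row_buffer': 15}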