diff options
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r-- | test/engine/test_execute.py | 157 |
1 files changed, 110 insertions, 47 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index c7b524335..5ddf4ad09 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,7 +1,7 @@ # coding: utf-8 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \ - config, is_, is_not_ + config, is_, is_not_, le_ import re from sqlalchemy.testing.util import picklers from sqlalchemy.interfaces import ConnectionProxy @@ -1047,76 +1047,91 @@ class ExecutionOptionsTest(fixtures.TestBase): ) -class AlternateResultProxyTest(fixtures.TestBase): +class AlternateResultProxyTest(fixtures.TablesTest): __requires__ = ('sqlite', ) @classmethod - def setup_class(cls): + def setup_bind(cls): cls.engine = engine = testing_engine('sqlite://') - m = MetaData() - cls.table = t = Table('test', m, - Column('x', Integer, primary_key=True), - Column('y', String(50, convert_unicode='force')) - ) - m.create_all(engine) - engine.execute(t.insert(), [ + return engine + + @classmethod + def define_tables(cls, metadata): + Table( + 'test', metadata, + Column('x', Integer, primary_key=True), + Column('y', String(50, convert_unicode='force')) + ) + + @classmethod + def insert_data(cls): + cls.engine.execute(cls.tables.test.insert(), [ {'x': i, 'y': "t_%d" % i} for i in range(1, 12) ]) - def _test_proxy(self, cls): + @contextmanager + def _proxy_fixture(self, cls): + self.table = self.tables.test + class ExcCtx(default.DefaultExecutionContext): def get_result_proxy(self): return cls(self) - self.engine.dialect.execution_ctx_cls = ExcCtx - rows = [] - r = self.engine.execute(select([self.table])) - assert isinstance(r, cls) - for i in range(5): - rows.append(r.fetchone()) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + self.patcher = patch.object( + self.engine.dialect, "execution_ctx_cls", ExcCtx) + with self.patcher: + yield - rows = r.fetchmany(3) - eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) + def _test_proxy(self, cls): + with self._proxy_fixture(cls): + rows = [] + r = self.engine.execute(select([self.table])) + assert isinstance(r, cls) + for i in range(5): + rows.append(r.fetchone()) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - rows = r.fetchall() - eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) + rows = r.fetchmany(3) + eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) - r = self.engine.execute(select([self.table])) - rows = r.fetchmany(None) - eq_(rows[0], (1, "t_1")) - # number of rows here could be one, or the whole thing - assert len(rows) == 1 or len(rows) == 11 + rows = r.fetchall() + eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) - r = self.engine.execute(select([self.table]).limit(1)) - r.fetchone() - eq_(r.fetchone(), None) + r = self.engine.execute(select([self.table])) + rows = r.fetchmany(None) + eq_(rows[0], (1, "t_1")) + # number of rows here could be one, or the whole thing + assert len(rows) == 1 or len(rows) == 11 - r = self.engine.execute(select([self.table]).limit(5)) - rows = r.fetchmany(6) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + r = self.engine.execute(select([self.table]).limit(1)) + r.fetchone() + eq_(r.fetchone(), None) - # result keeps going just fine with blank results... - eq_(r.fetchmany(2), []) + r = self.engine.execute(select([self.table]).limit(5)) + rows = r.fetchmany(6) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - eq_(r.fetchmany(2), []) + # result keeps going just fine with blank results... + eq_(r.fetchmany(2), []) - eq_(r.fetchall(), []) + eq_(r.fetchmany(2), []) - eq_(r.fetchone(), None) + eq_(r.fetchall(), []) - # until we close - r.close() + eq_(r.fetchone(), None) - self._assert_result_closed(r) + # until we close + r.close() - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.first(), (1, "t_1")) - self._assert_result_closed(r) + self._assert_result_closed(r) - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.scalar(), 1) - self._assert_result_closed(r) + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.first(), (1, "t_1")) + self._assert_result_closed(r) + + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.scalar(), 1) + self._assert_result_closed(r) def _assert_result_closed(self, r): assert_raises_message( @@ -1149,6 +1164,54 @@ class AlternateResultProxyTest(fixtures.TestBase): def test_buffered_column_result_proxy(self): self._test_proxy(_result.BufferedColumnResultProxy) + def test_buffered_row_growth(self): + with self._proxy_fixture(_result.BufferedRowResultProxy): + with self.engine.connect() as conn: + conn.execute(self.table.insert(), [ + {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) + ]) + result = conn.execute(self.table.select()) + checks = { + 0: 5, 1: 10, 9: 20, 135: 250, 274: 500, + 1351: 1000 + } + for idx, row in enumerate(result, 0): + if idx in checks: + eq_(result._bufsize, checks[idx]) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 1000 + ) + + def test_max_row_buffer_option(self): + with self._proxy_fixture(_result.BufferedRowResultProxy): + with self.engine.connect() as conn: + conn.execute(self.table.insert(), [ + {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) + ]) + result = conn.execute(self.table.select()) + checks = { + 0: 5, 1: 10, 9: 20, + } + for idx, row in enumerate(result, 0): + if idx in checks: + eq_(result._bufsize, checks[idx]) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 1000 + ) + + result = conn.execution_options(max_row_buffer=27).execute( + self.table.select() + ) + for idx, row in enumerate(result, 0): + if idx in (16, 70, 150, 250): + eq_(result._bufsize, 27) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 27 + ) + class EngineEventsTest(fixtures.TestBase): __requires__ = 'ad_hoc_engines', |