diff options
author | Roman Podoliaka <roman.podoliaka@gmail.com> | 2016-11-04 00:31:05 +0200 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2016-11-10 12:09:27 -0500 |
commit | d1e31ab1582e2d9275c70a89b72efc2a8651df3f (patch) | |
tree | 36518daa1d1a2468ba2d68e903b233978afbcab2 /lib/sqlalchemy/engine | |
parent | 6a688b736429e27a892bc02111414491fe4103b0 (diff) | |
download | sqlalchemy-d1e31ab1582e2d9275c70a89b72efc2a8651df3f.tar.gz |
Add support for server side cursors to mysqldb and pymysql
This allows to skip buffering of the results on the client side, e.g.
the following snippet:
table = sa.Table(
'testtbl', sa.MetaData(),
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('a', sa.Integer),
sa.Column('b', sa.String(512))
)
table.create(eng, checkfirst=True)
with eng.connect() as conn:
result = conn.execute(table.select().limit(1)).fetchone()
if result is None:
for _ in range(1000):
conn.execute(
table.insert(),
[{'a': random.randint(1, 100000),
'b': ''.join(random.choice(string.ascii_letters) for _ in range(100))}
for _ in range(1000)]
)
with eng.connect() as conn:
for row in conn.execution_options(stream_results=True).execute(table.select()):
pass
now uses ~23 MB of memory instead of ~327 MB on CPython 3.5.2 and
PyMySQL 0.7.9.
psycopg2 implementation and execution options (stream_results,
server_side_cursors) are reused.
Change-Id: I4dc23ce3094f027bdff51b896b050361991c62e2
Diffstat (limited to 'lib/sqlalchemy/engine')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 2 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/default.py | 46 |
2 files changed, 45 insertions, 3 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 1d23c66b3..f071abaa1 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -295,7 +295,7 @@ class Connection(Connectable): Indicate to the dialect that results should be "streamed" and not pre-buffered, if possible. This is a limitation of many DBAPIs. The flag is currently understood only by the - psycopg2 dialect. + psycopg2, mysqldb and pymysql dialects. :param schema_translate_map: Available on: Connection, Engine. A dictionary mapping schema names to schema names, that will be diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 3ee240383..719178f7e 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -27,6 +27,11 @@ AUTOCOMMIT_REGEXP = re.compile( r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)', re.I | re.UNICODE) +# When we're handed literal SQL, ensure it's a SELECT query +SERVER_SIDE_CURSOR_RE = re.compile( + r'\s*SELECT', + re.I | re.UNICODE) + class DefaultDialect(interfaces.Dialect): """Default implementation of Dialect""" @@ -108,6 +113,8 @@ class DefaultDialect(interfaces.Dialect): supports_empty_insert = True supports_multivalues_insert = False + supports_server_side_cursors = False + server_version_info = None construct_arguments = None @@ -780,8 +787,40 @@ class DefaultExecutionContext(interfaces.ExecutionContext): def should_autocommit_text(self, statement): return AUTOCOMMIT_REGEXP.match(statement) + def _use_server_side_cursor(self): + if not self.dialect.supports_server_side_cursors: + return False + + if self.dialect.server_side_cursors: + use_server_side = \ + self.execution_options.get('stream_results', True) and ( + (self.compiled and isinstance(self.compiled.statement, + expression.Selectable) + or + ( + (not self.compiled or + isinstance(self.compiled.statement, + expression.TextClause)) + and self.statement and SERVER_SIDE_CURSOR_RE.match( + self.statement)) + ) + ) + else: + use_server_side = \ + self.execution_options.get('stream_results', False) + + return use_server_side + def create_cursor(self): - return self._dbapi_connection.cursor() + if self._use_server_side_cursor(): + self._is_server_side = True + return self.create_server_side_cursor() + else: + self._is_server_side = False + return self._dbapi_connection.cursor() + + def create_server_side_cursor(self): + raise NotImplementedError() def pre_exec(self): pass @@ -831,7 +870,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext): pass def get_result_proxy(self): - return result.ResultProxy(self) + if self._is_server_side: + return result.BufferedRowResultProxy(self) + else: + return result.ResultProxy(self) @property def rowcount(self): |