summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine
diff options
context:
space:
mode:
authorRoman Podoliaka <roman.podoliaka@gmail.com>2016-11-04 00:31:05 +0200
committerMike Bayer <mike_mp@zzzcomputing.com>2016-11-10 12:09:27 -0500
commitd1e31ab1582e2d9275c70a89b72efc2a8651df3f (patch)
tree36518daa1d1a2468ba2d68e903b233978afbcab2 /lib/sqlalchemy/engine
parent6a688b736429e27a892bc02111414491fe4103b0 (diff)
downloadsqlalchemy-d1e31ab1582e2d9275c70a89b72efc2a8651df3f.tar.gz
Add support for server side cursors to mysqldb and pymysql
This allows to skip buffering of the results on the client side, e.g. the following snippet: table = sa.Table( 'testtbl', sa.MetaData(), sa.Column('id', sa.Integer, primary_key=True), sa.Column('a', sa.Integer), sa.Column('b', sa.String(512)) ) table.create(eng, checkfirst=True) with eng.connect() as conn: result = conn.execute(table.select().limit(1)).fetchone() if result is None: for _ in range(1000): conn.execute( table.insert(), [{'a': random.randint(1, 100000), 'b': ''.join(random.choice(string.ascii_letters) for _ in range(100))} for _ in range(1000)] ) with eng.connect() as conn: for row in conn.execution_options(stream_results=True).execute(table.select()): pass now uses ~23 MB of memory instead of ~327 MB on CPython 3.5.2 and PyMySQL 0.7.9. psycopg2 implementation and execution options (stream_results, server_side_cursors) are reused. Change-Id: I4dc23ce3094f027bdff51b896b050361991c62e2
Diffstat (limited to 'lib/sqlalchemy/engine')
-rw-r--r--lib/sqlalchemy/engine/base.py2
-rw-r--r--lib/sqlalchemy/engine/default.py46
2 files changed, 45 insertions, 3 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 1d23c66b3..f071abaa1 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -295,7 +295,7 @@ class Connection(Connectable):
Indicate to the dialect that results should be
"streamed" and not pre-buffered, if possible. This is a limitation
of many DBAPIs. The flag is currently understood only by the
- psycopg2 dialect.
+ psycopg2, mysqldb and pymysql dialects.
:param schema_translate_map: Available on: Connection, Engine.
A dictionary mapping schema names to schema names, that will be
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 3ee240383..719178f7e 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -27,6 +27,11 @@ AUTOCOMMIT_REGEXP = re.compile(
r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)',
re.I | re.UNICODE)
+# When we're handed literal SQL, ensure it's a SELECT query
+SERVER_SIDE_CURSOR_RE = re.compile(
+ r'\s*SELECT',
+ re.I | re.UNICODE)
+
class DefaultDialect(interfaces.Dialect):
"""Default implementation of Dialect"""
@@ -108,6 +113,8 @@ class DefaultDialect(interfaces.Dialect):
supports_empty_insert = True
supports_multivalues_insert = False
+ supports_server_side_cursors = False
+
server_version_info = None
construct_arguments = None
@@ -780,8 +787,40 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
def should_autocommit_text(self, statement):
return AUTOCOMMIT_REGEXP.match(statement)
+ def _use_server_side_cursor(self):
+ if not self.dialect.supports_server_side_cursors:
+ return False
+
+ if self.dialect.server_side_cursors:
+ use_server_side = \
+ self.execution_options.get('stream_results', True) and (
+ (self.compiled and isinstance(self.compiled.statement,
+ expression.Selectable)
+ or
+ (
+ (not self.compiled or
+ isinstance(self.compiled.statement,
+ expression.TextClause))
+ and self.statement and SERVER_SIDE_CURSOR_RE.match(
+ self.statement))
+ )
+ )
+ else:
+ use_server_side = \
+ self.execution_options.get('stream_results', False)
+
+ return use_server_side
+
def create_cursor(self):
- return self._dbapi_connection.cursor()
+ if self._use_server_side_cursor():
+ self._is_server_side = True
+ return self.create_server_side_cursor()
+ else:
+ self._is_server_side = False
+ return self._dbapi_connection.cursor()
+
+ def create_server_side_cursor(self):
+ raise NotImplementedError()
def pre_exec(self):
pass
@@ -831,7 +870,10 @@ class DefaultExecutionContext(interfaces.ExecutionContext):
pass
def get_result_proxy(self):
- return result.ResultProxy(self)
+ if self._is_server_side:
+ return result.BufferedRowResultProxy(self)
+ else:
+ return result.ResultProxy(self)
@property
def rowcount(self):