summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/mysql/aiomysql.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2021-02-22 21:49:09 -0500
committermike bayer <mike_mp@zzzcomputing.com>2021-02-25 20:49:52 +0000
commita5b76f7b07e620ece882137ceb51bace3898fad5 (patch)
tree03aac63d2e9af851a317503e577e6bea2ff35793 /lib/sqlalchemy/dialects/mysql/aiomysql.py
parentdc615763d39916e9c037c7c376db1817cdf02764 (diff)
downloadsqlalchemy-a5b76f7b07e620ece882137ceb51bace3898fad5.tar.gz
mutex asyncpg / aiomysql connection state changes
Added an ``asyncio.Lock()`` within SQLAlchemy's emulated DBAPI cursor, local to the connection, for the asyncpg dialect, so that the space between the call to ``prepare()`` and ``fetch()`` is prevented from allowing concurrent executions on the connection from causing interface error exceptions, as well as preventing race conditions when starting a new transaction. Other PostgreSQL DBAPIs are threadsafe at the connection level so this intends to provide a similar behavior, outside the realm of server side cursors. Apply the same idea to the aiomysql dialect which also would otherwise be subject to corruption if the connection were used concurrently. While this is an issue which can also occur with the threaded connection libraries, we anticipate asyncio users are more likely to attempt using the same connection in multiple awaitables at a time, even though this won't achieve concurrency for that use case, as the asyncio programming style is very encouraging of this. As the failure modes are also more complicated under asyncio, we'd rather not have this being reported. Fixes: #5967 Change-Id: I3670ba0c8f0b593c587c5aa7c6c61f9e8c5eb93a
Diffstat (limited to 'lib/sqlalchemy/dialects/mysql/aiomysql.py')
-rw-r--r--lib/sqlalchemy/dialects/mysql/aiomysql.py38
1 files changed, 24 insertions, 14 deletions
diff --git a/lib/sqlalchemy/dialects/mysql/aiomysql.py b/lib/sqlalchemy/dialects/mysql/aiomysql.py
index 6c968a1e7..cab6df499 100644
--- a/lib/sqlalchemy/dialects/mysql/aiomysql.py
+++ b/lib/sqlalchemy/dialects/mysql/aiomysql.py
@@ -35,6 +35,7 @@ handling.
from .pymysql import MySQLDialect_pymysql
from ... import pool
from ... import util
+from ...util.concurrency import asyncio
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
@@ -84,24 +85,32 @@ class AsyncAdapt_aiomysql_cursor:
self._rows[:] = []
def execute(self, operation, parameters=None):
- if parameters is None:
- result = self.await_(self._cursor.execute(operation))
- else:
- result = self.await_(self._cursor.execute(operation, parameters))
-
- if not self.server_side:
- # aiomysql has a "fake" async result, so we have to pull it out
- # of that here since our default result is not async.
- # we could just as easily grab "_rows" here and be done with it
- # but this is safer.
- self._rows = list(self.await_(self._cursor.fetchall()))
- return result
+ return self.await_(self._execute_async(operation, parameters))
def executemany(self, operation, seq_of_parameters):
return self.await_(
- self._cursor.executemany(operation, seq_of_parameters)
+ self._executemany_async(operation, seq_of_parameters)
)
+ async def _execute_async(self, operation, parameters):
+ async with self._adapt_connection._execute_mutex:
+ if parameters is None:
+ result = await self._cursor.execute(operation)
+ else:
+ result = await self._cursor.execute(operation, parameters)
+
+ if not self.server_side:
+ # aiomysql has a "fake" async result, so we have to pull it out
+ # of that here since our default result is not async.
+ # we could just as easily grab "_rows" here and be done with it
+ # but this is safer.
+ self._rows = list(await self._cursor.fetchall())
+ return result
+
+ async def _executemany_async(self, operation, seq_of_parameters):
+ async with self._adapt_connection._execute_mutex:
+ return await self._cursor.executemany(operation, seq_of_parameters)
+
def setinputsizes(self, *inputsizes):
pass
@@ -161,11 +170,12 @@ class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor):
class AsyncAdapt_aiomysql_connection:
await_ = staticmethod(await_only)
- __slots__ = ("dbapi", "_connection")
+ __slots__ = ("dbapi", "_connection", "_execute_mutex")
def __init__(self, dbapi, connection):
self.dbapi = dbapi
self._connection = connection
+ self._execute_mutex = asyncio.Lock()
def ping(self, reconnect):
return self.await_(self._connection.ping(reconnect))