summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/mysql/aiomysql.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/dialects/mysql/aiomysql.py')
-rw-r--r--lib/sqlalchemy/dialects/mysql/aiomysql.py38
1 files changed, 24 insertions, 14 deletions
diff --git a/lib/sqlalchemy/dialects/mysql/aiomysql.py b/lib/sqlalchemy/dialects/mysql/aiomysql.py
index 6c968a1e7..cab6df499 100644
--- a/lib/sqlalchemy/dialects/mysql/aiomysql.py
+++ b/lib/sqlalchemy/dialects/mysql/aiomysql.py
@@ -35,6 +35,7 @@ handling.
from .pymysql import MySQLDialect_pymysql
from ... import pool
from ... import util
+from ...util.concurrency import asyncio
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
@@ -84,24 +85,32 @@ class AsyncAdapt_aiomysql_cursor:
self._rows[:] = []
def execute(self, operation, parameters=None):
- if parameters is None:
- result = self.await_(self._cursor.execute(operation))
- else:
- result = self.await_(self._cursor.execute(operation, parameters))
-
- if not self.server_side:
- # aiomysql has a "fake" async result, so we have to pull it out
- # of that here since our default result is not async.
- # we could just as easily grab "_rows" here and be done with it
- # but this is safer.
- self._rows = list(self.await_(self._cursor.fetchall()))
- return result
+ return self.await_(self._execute_async(operation, parameters))
def executemany(self, operation, seq_of_parameters):
return self.await_(
- self._cursor.executemany(operation, seq_of_parameters)
+ self._executemany_async(operation, seq_of_parameters)
)
+ async def _execute_async(self, operation, parameters):
+ async with self._adapt_connection._execute_mutex:
+ if parameters is None:
+ result = await self._cursor.execute(operation)
+ else:
+ result = await self._cursor.execute(operation, parameters)
+
+ if not self.server_side:
+ # aiomysql has a "fake" async result, so we have to pull it out
+ # of that here since our default result is not async.
+ # we could just as easily grab "_rows" here and be done with it
+ # but this is safer.
+ self._rows = list(await self._cursor.fetchall())
+ return result
+
+ async def _executemany_async(self, operation, seq_of_parameters):
+ async with self._adapt_connection._execute_mutex:
+ return await self._cursor.executemany(operation, seq_of_parameters)
+
def setinputsizes(self, *inputsizes):
pass
@@ -161,11 +170,12 @@ class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor):
class AsyncAdapt_aiomysql_connection:
await_ = staticmethod(await_only)
- __slots__ = ("dbapi", "_connection")
+ __slots__ = ("dbapi", "_connection", "_execute_mutex")
def __init__(self, dbapi, connection):
self.dbapi = dbapi
self._connection = connection
+ self._execute_mutex = asyncio.Lock()
def ping(self, reconnect):
return self.await_(self._connection.ping(reconnect))