diff options
Diffstat (limited to 'lib/sqlalchemy/dialects/mysql/aiomysql.py')
-rw-r--r-- | lib/sqlalchemy/dialects/mysql/aiomysql.py | 38 |
1 files changed, 24 insertions, 14 deletions
diff --git a/lib/sqlalchemy/dialects/mysql/aiomysql.py b/lib/sqlalchemy/dialects/mysql/aiomysql.py index 6c968a1e7..cab6df499 100644 --- a/lib/sqlalchemy/dialects/mysql/aiomysql.py +++ b/lib/sqlalchemy/dialects/mysql/aiomysql.py @@ -35,6 +35,7 @@ handling. from .pymysql import MySQLDialect_pymysql from ... import pool from ... import util +from ...util.concurrency import asyncio from ...util.concurrency import await_fallback from ...util.concurrency import await_only @@ -84,24 +85,32 @@ class AsyncAdapt_aiomysql_cursor: self._rows[:] = [] def execute(self, operation, parameters=None): - if parameters is None: - result = self.await_(self._cursor.execute(operation)) - else: - result = self.await_(self._cursor.execute(operation, parameters)) - - if not self.server_side: - # aiomysql has a "fake" async result, so we have to pull it out - # of that here since our default result is not async. - # we could just as easily grab "_rows" here and be done with it - # but this is safer. - self._rows = list(self.await_(self._cursor.fetchall())) - return result + return self.await_(self._execute_async(operation, parameters)) def executemany(self, operation, seq_of_parameters): return self.await_( - self._cursor.executemany(operation, seq_of_parameters) + self._executemany_async(operation, seq_of_parameters) ) + async def _execute_async(self, operation, parameters): + async with self._adapt_connection._execute_mutex: + if parameters is None: + result = await self._cursor.execute(operation) + else: + result = await self._cursor.execute(operation, parameters) + + if not self.server_side: + # aiomysql has a "fake" async result, so we have to pull it out + # of that here since our default result is not async. + # we could just as easily grab "_rows" here and be done with it + # but this is safer. + self._rows = list(await self._cursor.fetchall()) + return result + + async def _executemany_async(self, operation, seq_of_parameters): + async with self._adapt_connection._execute_mutex: + return await self._cursor.executemany(operation, seq_of_parameters) + def setinputsizes(self, *inputsizes): pass @@ -161,11 +170,12 @@ class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor): class AsyncAdapt_aiomysql_connection: await_ = staticmethod(await_only) - __slots__ = ("dbapi", "_connection") + __slots__ = ("dbapi", "_connection", "_execute_mutex") def __init__(self, dbapi, connection): self.dbapi = dbapi self._connection = connection + self._execute_mutex = asyncio.Lock() def ping(self, reconnect): return self.await_(self._connection.ping(reconnect)) |