From c736eef8b35841af89ec19469aa496585efd3865 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Wed, 9 Dec 2020 08:52:57 -0500 Subject: Revert "Merge "add aiomysql support"" This reverts commit 23343f87f3297ad31d7315ac0e5312db10ef7592, reversing changes made to c5831b1abd98c46ef7eab7ee82ead18756aea112. The crashes that occur in jenkins have not been solved and are now impacting master. I am not able to reproduce the failure, including running on the CI machines directly, and a few runs where I sat there for 20 minutes and watched, it didn't happen. it is the ultimate heisenbug. Additionally, there's a reference to "arraysize" that doesn't exist in fetchmany() and there seem to be no tests that exercise this for any DBAPI which is also a major bug to be fixed. References: #5747 --- lib/sqlalchemy/dialects/mysql/aiomysql.py | 270 ------------------------------ 1 file changed, 270 deletions(-) delete mode 100644 lib/sqlalchemy/dialects/mysql/aiomysql.py (limited to 'lib/sqlalchemy/dialects/mysql/aiomysql.py') diff --git a/lib/sqlalchemy/dialects/mysql/aiomysql.py b/lib/sqlalchemy/dialects/mysql/aiomysql.py deleted file mode 100644 index 2eabb91e4..000000000 --- a/lib/sqlalchemy/dialects/mysql/aiomysql.py +++ /dev/null @@ -1,270 +0,0 @@ -# mysql/aiomysql.py -# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors -# -# This module is part of SQLAlchemy and is released under -# the MIT License: http://www.opensource.org/licenses/mit-license.php -r""" -.. dialect:: mysql+aiomysql - :name: aiomysql - :dbapi: aiomysql - :connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...] - :url: https://github.com/aio-libs/aiomysql - -The aiomysql dialect is SQLAlchemy's second Python asyncio dialect. - -Using a special asyncio mediation layer, the aiomysql dialect is usable -as the backend for the :ref:`SQLAlchemy asyncio ` -extension package. - -This dialect should normally be used only with the -:func:`_asyncio.create_async_engine` engine creation function:: - - from sqlalchemy.ext.asyncio import create_async_engine - engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname") - -Unicode -------- - -Please see :ref:`mysql_unicode` for current recommendations on unicode -handling. - - -""" # noqa - -from .pymysql import MySQLDialect_pymysql -from ... import pool -from ...util.concurrency import await_fallback -from ...util.concurrency import await_only - - -class AsyncAdapt_aiomysql_cursor: - server_side = False - - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - - cursor = self._connection.cursor() - - # see https://github.com/aio-libs/aiomysql/issues/543 - self._cursor = self.await_(cursor.__aenter__()) - self._rows = [] - - @property - def description(self): - return self._cursor.description - - @property - def rowcount(self): - return self._cursor.rowcount - - @property - def lastrowid(self): - return self._cursor.lastrowid - - def close(self): - self._rows[:] = [] - - def execute(self, operation, parameters=None): - if parameters is None: - result = self.await_(self._cursor.execute(operation)) - else: - result = self.await_(self._cursor.execute(operation, parameters)) - - if not self.server_side: - # aiomysql has a "fake" async result, so we have to pull it out - # of that here since our default result is not async. - # we could just as easily grab "_rows" here and be done with it - # but this is safer. - self._rows = list(self.await_(self._cursor.fetchall())) - return result - - def executemany(self, operation, seq_of_parameters): - return self.await_( - self._cursor.executemany(operation, seq_of_parameters) - ) - - def setinputsizes(self, *inputsizes): - pass - - def __iter__(self): - while self._rows: - yield self._rows.pop(0) - - def fetchone(self): - if self._rows: - return self._rows.pop(0) - else: - return None - - def fetchmany(self, size=None): - if size is None: - size = self.arraysize - - retval = self._rows[0:size] - self._rows[:] = self._rows[size:] - return retval - - def fetchall(self): - retval = self._rows[:] - self._rows[:] = [] - return retval - - -class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor): - - server_side = True - - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - - cursor = self._connection.cursor( - adapt_connection.dbapi.aiomysql.SSCursor - ) - - self._cursor = self.await_(cursor.__aenter__()) - - def close(self): - if self._cursor is not None: - self.await_(self._cursor.close()) - self._cursor = None - - def fetchone(self): - return self.await_(self._cursor.fetchone()) - - def fetchmany(self, size=None): - return self.await_(self._cursor.fetchmany(size=size)) - - def fetchall(self): - return self.await_(self._cursor.fetchall()) - - -class AsyncAdapt_aiomysql_connection: - await_ = staticmethod(await_only) - - def __init__(self, dbapi, connection): - self.dbapi = dbapi - self._connection = connection - - def ping(self, reconnect): - return self.await_(self._connection.ping(reconnect)) - - def character_set_name(self): - return self._connection.character_set_name() - - def autocommit(self, value): - self.await_(self._connection.autocommit(value)) - - def cursor(self, server_side=False): - if server_side: - return AsyncAdapt_aiomysql_ss_cursor(self) - else: - return AsyncAdapt_aiomysql_cursor(self) - - def rollback(self): - self.await_(self._connection.rollback()) - - def commit(self): - self.await_(self._connection.commit()) - - def close(self): - # it's not awaitable. - self._connection.close() - - -class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection): - __slots__ = () - - await_ = staticmethod(await_fallback) - - -class AsyncAdapt_aiomysql_dbapi: - def __init__(self, aiomysql, pymysql): - self.aiomysql = aiomysql - self.pymysql = pymysql - self.paramstyle = "format" - self._init_dbapi_attributes() - - def _init_dbapi_attributes(self): - for name in ( - "Warning", - "Error", - "InterfaceError", - "DataError", - "DatabaseError", - "OperationalError", - "InterfaceError", - "IntegrityError", - "ProgrammingError", - "InternalError", - "NotSupportedError", - ): - setattr(self, name, getattr(self.aiomysql, name)) - - for name in ( - "NUMBER", - "STRING", - "DATETIME", - "BINARY", - "TIMESTAMP", - "Binary", - ): - setattr(self, name, getattr(self.pymysql, name)) - - def connect(self, *arg, **kw): - async_fallback = kw.pop("async_fallback", False) - - if async_fallback: - return AsyncAdaptFallback_aiomysql_connection( - self, - await_fallback(self.aiomysql.connect(*arg, **kw)), - ) - else: - return AsyncAdapt_aiomysql_connection( - self, - await_only(self.aiomysql.connect(*arg, **kw)), - ) - - -class MySQLDialect_aiomysql(MySQLDialect_pymysql): - driver = "aiomysql" - - supports_server_side_cursors = True - _sscursor = AsyncAdapt_aiomysql_ss_cursor - - @classmethod - def dbapi(cls): - return AsyncAdapt_aiomysql_dbapi( - __import__("aiomysql"), __import__("pymysql") - ) - - @classmethod - def get_pool_class(self, url): - return pool.AsyncAdaptedQueuePool - - def create_connect_args(self, url): - args, kw = super(MySQLDialect_aiomysql, self).create_connect_args(url) - if "passwd" in kw: - kw["password"] = kw.pop("passwd") - return args, kw - - def is_disconnect(self, e, connection, cursor): - if super(MySQLDialect_aiomysql, self).is_disconnect( - e, connection, cursor - ): - return True - else: - str_e = str(e).lower() - return "not connected" in str_e - - def _found_rows_client_flag(self): - from pymysql.constants import CLIENT - - return CLIENT.FOUND_ROWS - - -dialect = MySQLDialect_aiomysql -- cgit v1.2.1