summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/build/changelog/unreleased_14/6993.rst13
-rw-r--r--doc/build/dialects/mysql.rst9
-rw-r--r--lib/sqlalchemy/dialects/mysql/__init__.py3
-rw-r--r--lib/sqlalchemy/dialects/mysql/aiomysql.py12
-rw-r--r--lib/sqlalchemy/dialects/mysql/asyncmy.py340
-rw-r--r--lib/sqlalchemy/dialects/mysql/base.py5
-rw-r--r--lib/sqlalchemy/testing/suite/test_results.py2
-rw-r--r--setup.cfg15
-rw-r--r--test/dialect/mysql/test_types.py4
-rw-r--r--test/engine/test_reconnect.py1
-rw-r--r--tox.ini5
11 files changed, 388 insertions, 21 deletions
diff --git a/doc/build/changelog/unreleased_14/6993.rst b/doc/build/changelog/unreleased_14/6993.rst
new file mode 100644
index 000000000..fd2122eef
--- /dev/null
+++ b/doc/build/changelog/unreleased_14/6993.rst
@@ -0,0 +1,13 @@
+.. change::
+ :tags: feature, asyncio, mysql
+ :tickets: 6993
+
+ Added initial support for the ``asyncmy`` asyncio database driver for MySQL
+ and MariaDB. This driver is very new, however appears to be the only
+ current alternative to the ``aiomysql`` driver which currently appears to
+ be unmaintained and is not working with current Python versions. Much
+ thanks to long2ice for the pull request for this dialect.
+
+ .. seealso::
+
+ :ref:`asyncmy`
diff --git a/doc/build/dialects/mysql.rst b/doc/build/dialects/mysql.rst
index 573c2598c..9eb7f5a74 100644
--- a/doc/build/dialects/mysql.rst
+++ b/doc/build/dialects/mysql.rst
@@ -189,6 +189,14 @@ MySQL-Connector
.. automodule:: sqlalchemy.dialects.mysql.mysqlconnector
+.. _asyncmy:
+
+asyncmy
+-------
+
+.. automodule:: sqlalchemy.dialects.mysql.asyncmy
+
+
.. _aiomysql:
aiomysql
@@ -210,4 +218,3 @@ pyodbc
------
.. automodule:: sqlalchemy.dialects.mysql.pyodbc
-
diff --git a/lib/sqlalchemy/dialects/mysql/__init__.py b/lib/sqlalchemy/dialects/mysql/__init__.py
index 8631ea3b9..c83fec0c3 100644
--- a/lib/sqlalchemy/dialects/mysql/__init__.py
+++ b/lib/sqlalchemy/dialects/mysql/__init__.py
@@ -54,12 +54,11 @@ from ...util import compat
if compat.py3k:
from . import aiomysql # noqa
-
+ from . import asyncmy # noqa
# default dialect
base.dialect = dialect = mysqldb.dialect
-
__all__ = (
"BIGINT",
"BINARY",
diff --git a/lib/sqlalchemy/dialects/mysql/aiomysql.py b/lib/sqlalchemy/dialects/mysql/aiomysql.py
index 3275d1888..c9a87145e 100644
--- a/lib/sqlalchemy/dialects/mysql/aiomysql.py
+++ b/lib/sqlalchemy/dialects/mysql/aiomysql.py
@@ -11,6 +11,10 @@ r"""
:connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...]
:url: https://github.com/aio-libs/aiomysql
+.. warning:: The aiomysql dialect as of September, 2021 appears to be unmaintained
+ and no longer functions for Python version 3.10. Please refer to the
+ :ref:`asyncmy` dialect for current MySQL asyncio functionality.
+
The aiomysql dialect is SQLAlchemy's second Python asyncio dialect.
Using a special asyncio mediation layer, the aiomysql dialect is usable
@@ -21,13 +25,7 @@ This dialect should normally be used only with the
:func:`_asyncio.create_async_engine` engine creation function::
from sqlalchemy.ext.asyncio import create_async_engine
- engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname")
-
-Unicode
--------
-
-Please see :ref:`mysql_unicode` for current recommendations on unicode
-handling.
+ engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname?charset=utf8mb4")
""" # noqa
diff --git a/lib/sqlalchemy/dialects/mysql/asyncmy.py b/lib/sqlalchemy/dialects/mysql/asyncmy.py
new file mode 100644
index 000000000..f312cf79b
--- /dev/null
+++ b/lib/sqlalchemy/dialects/mysql/asyncmy.py
@@ -0,0 +1,340 @@
+# mysql/asyncmy.py
+# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors <see AUTHORS
+# file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+r"""
+.. dialect:: mysql+asyncmy
+ :name: asyncmy
+ :dbapi: asyncmy
+ :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...]
+ :url: https://github.com/long2ice/asyncmy
+
+.. note:: The asyncmy dialect as of September, 2021 was added to provide
+ MySQL/MariaDB asyncio compatibility given that the :ref:`aiomysql` database
+ driver has become unmaintained, however asyncmy is itself very new.
+
+Using a special asyncio mediation layer, the asyncmy dialect is usable
+as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
+extension package.
+
+This dialect should normally be used only with the
+:func:`_asyncio.create_async_engine` engine creation function::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ engine = create_async_engine("mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4")
+
+
+""" # noqa
+
+import contextlib
+
+from .pymysql import MySQLDialect_pymysql
+from ... import pool
+from ... import util
+from ...util.concurrency import asyncio
+from ...util.concurrency import await_fallback
+from ...util.concurrency import await_only
+
+
+class AsyncAdapt_asyncmy_cursor:
+ server_side = False
+ __slots__ = (
+ "_adapt_connection",
+ "_connection",
+ "await_",
+ "_cursor",
+ "_rows",
+ )
+
+ def __init__(self, adapt_connection):
+ self._adapt_connection = adapt_connection
+ self._connection = adapt_connection._connection
+ self.await_ = adapt_connection.await_
+
+ cursor = self._connection.cursor()
+
+ self._cursor = self.await_(cursor.__aenter__())
+ self._rows = []
+
+ @property
+ def description(self):
+ return self._cursor.description
+
+ @property
+ def rowcount(self):
+ return self._cursor.rowcount
+
+ @property
+ def arraysize(self):
+ return self._cursor.arraysize
+
+ @arraysize.setter
+ def arraysize(self, value):
+ self._cursor.arraysize = value
+
+ @property
+ def lastrowid(self):
+ return self._cursor.lastrowid
+
+ def close(self):
+ # note we aren't actually closing the cursor here,
+ # we are just letting GC do it. to allow this to be async
+ # we would need the Result to change how it does "Safe close cursor".
+ # MySQL "cursors" don't actually have state to be "closed" besides
+ # exhausting rows, which we already have done for sync cursor.
+ # another option would be to emulate aiosqlite dialect and assign
+ # cursor only if we are doing server side cursor operation.
+ self._rows[:] = []
+
+ def execute(self, operation, parameters=None):
+ return self.await_(self._execute_async(operation, parameters))
+
+ def executemany(self, operation, seq_of_parameters):
+ return self.await_(
+ self._executemany_async(operation, seq_of_parameters)
+ )
+
+ async def _execute_async(self, operation, parameters):
+ async with self._adapt_connection._mutex_and_adapt_errors():
+ if parameters is None:
+ result = await self._cursor.execute(operation)
+ else:
+ result = await self._cursor.execute(operation, parameters)
+
+ if not self.server_side:
+ # asyncmy has a "fake" async result, so we have to pull it out
+ # of that here since our default result is not async.
+ # we could just as easily grab "_rows" here and be done with it
+ # but this is safer.
+ self._rows = list(await self._cursor.fetchall())
+ return result
+
+ async def _executemany_async(self, operation, seq_of_parameters):
+ async with self._adapt_connection._mutex_and_adapt_errors():
+ return await self._cursor.executemany(operation, seq_of_parameters)
+
+ def setinputsizes(self, *inputsizes):
+ pass
+
+ def __iter__(self):
+ while self._rows:
+ yield self._rows.pop(0)
+
+ def fetchone(self):
+ if self._rows:
+ return self._rows.pop(0)
+ else:
+ return None
+
+ def fetchmany(self, size=None):
+ if size is None:
+ size = self.arraysize
+
+ retval = self._rows[0:size]
+ self._rows[:] = self._rows[size:]
+ return retval
+
+ def fetchall(self):
+ retval = self._rows[:]
+ self._rows[:] = []
+ return retval
+
+
+class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor):
+ __slots__ = ()
+ server_side = True
+
+ def __init__(self, adapt_connection):
+ self._adapt_connection = adapt_connection
+ self._connection = adapt_connection._connection
+ self.await_ = adapt_connection.await_
+
+ adapt_connection._ss_cursors.add(self)
+
+ cursor = self._connection.cursor(
+ adapt_connection.dbapi.asyncmy.cursors.SSCursor
+ )
+
+ self._cursor = self.await_(cursor.__aenter__())
+
+ def close(self):
+ try:
+ if self._cursor is not None:
+ self.await_(self._cursor.fetchall())
+ self.await_(self._cursor.close())
+ self._cursor = None
+ finally:
+ self._adapt_connection._ss_cursors.discard(self)
+
+ def fetchone(self):
+ return self.await_(self._cursor.fetchone())
+
+ def fetchmany(self, size=None):
+ return self.await_(self._cursor.fetchmany(size=size))
+
+ def fetchall(self):
+ return self.await_(self._cursor.fetchall())
+
+
+class AsyncAdapt_asyncmy_connection:
+ await_ = staticmethod(await_only)
+ __slots__ = ("dbapi", "_connection", "_execute_mutex", "_ss_cursors")
+
+ def __init__(self, dbapi, connection):
+ self.dbapi = dbapi
+ self._connection = connection
+ self._execute_mutex = asyncio.Lock()
+ self._ss_cursors = set()
+
+ @contextlib.asynccontextmanager
+ async def _mutex_and_adapt_errors(self):
+ async with self._execute_mutex:
+ try:
+ yield
+ except AttributeError:
+ raise self.dbapi.InternalError(
+ "network operation failed due to asyncmy attribute error"
+ )
+
+ def ping(self, reconnect):
+ assert not reconnect
+ return self.await_(self._do_ping())
+
+ async def _do_ping(self):
+ async with self._mutex_and_adapt_errors():
+ return await self._connection.ping(False)
+
+ def character_set_name(self):
+ return self._connection.character_set_name()
+
+ def autocommit(self, value):
+ self.await_(self._connection.autocommit(value))
+
+ def cursor(self, server_side=False):
+ if server_side:
+ return AsyncAdapt_asyncmy_ss_cursor(self)
+ else:
+ return AsyncAdapt_asyncmy_cursor(self)
+
+ def _shutdown_ss_cursors(self):
+ for curs in list(self._ss_cursors):
+ curs.close()
+
+ def rollback(self):
+ self._shutdown_ss_cursors()
+ self.await_(self._connection.rollback())
+
+ def commit(self):
+ self._shutdown_ss_cursors()
+ self.await_(self._connection.commit())
+
+ def close(self):
+ self._shutdown_ss_cursors()
+ # it's not awaitable.
+ self._connection.close()
+
+
+class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection):
+ __slots__ = ()
+
+ await_ = staticmethod(await_fallback)
+
+
+class AsyncAdapt_asyncmy_dbapi:
+ def __init__(self, asyncmy, pymysql):
+ self.asyncmy = asyncmy
+ self.pymysql = pymysql
+ self.paramstyle = "format"
+ self._init_dbapi_attributes()
+
+ def _init_dbapi_attributes(self):
+ for name in (
+ "Warning",
+ "Error",
+ "InterfaceError",
+ "DataError",
+ "DatabaseError",
+ "OperationalError",
+ "InterfaceError",
+ "IntegrityError",
+ "ProgrammingError",
+ "InternalError",
+ "NotSupportedError",
+ ):
+ setattr(self, name, getattr(self.asyncmy.errors, name))
+
+ for name in (
+ "NUMBER",
+ "STRING",
+ "DATETIME",
+ "BINARY",
+ "TIMESTAMP",
+ "Binary",
+ ):
+ setattr(self, name, getattr(self.pymysql, name))
+
+ def connect(self, *arg, **kw):
+ async_fallback = kw.pop("async_fallback", False)
+
+ if util.asbool(async_fallback):
+ return AsyncAdaptFallback_asyncmy_connection(
+ self,
+ await_fallback(self.asyncmy.connect(*arg, **kw)),
+ )
+ else:
+ return AsyncAdapt_asyncmy_connection(
+ self,
+ await_only(self.asyncmy.connect(*arg, **kw)),
+ )
+
+
+class MySQLDialect_asyncmy(MySQLDialect_pymysql):
+ driver = "asyncmy"
+ supports_statement_cache = True
+
+ supports_server_side_cursors = True
+ _sscursor = AsyncAdapt_asyncmy_ss_cursor
+
+ is_async = True
+
+ @classmethod
+ def dbapi(cls):
+ return AsyncAdapt_asyncmy_dbapi(
+ __import__("asyncmy"), __import__("pymysql")
+ )
+
+ @classmethod
+ def get_pool_class(cls, url):
+
+ async_fallback = url.query.get("async_fallback", False)
+
+ if util.asbool(async_fallback):
+ return pool.FallbackAsyncAdaptedQueuePool
+ else:
+ return pool.AsyncAdaptedQueuePool
+
+ def create_connect_args(self, url):
+ return super(MySQLDialect_asyncmy, self).create_connect_args(
+ url, _translate_args=dict(username="user", database="db")
+ )
+
+ def is_disconnect(self, e, connection, cursor):
+ if super(MySQLDialect_asyncmy, self).is_disconnect(
+ e, connection, cursor
+ ):
+ return True
+ else:
+ str_e = str(e).lower()
+ return (
+ "not connected" in str_e or "network operation failed" in str_e
+ )
+
+ def _found_rows_client_flag(self):
+ from pymysql.constants import CLIENT
+
+ return CLIENT.FOUND_ROWS
+
+
+dialect = MySQLDialect_asyncmy
diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py
index 9bf12e194..04b0c1b6d 100644
--- a/lib/sqlalchemy/dialects/mysql/base.py
+++ b/lib/sqlalchemy/dialects/mysql/base.py
@@ -2915,7 +2915,10 @@ class MySQLDialect(default.DefaultDialect):
"WHERE TABLE_TYPE='SEQUENCE' and TABLE_NAME=:name AND "
"TABLE_SCHEMA=:schema_name"
),
- dict(name=sequence_name, schema_name=schema),
+ dict(
+ name=util.text_type(sequence_name),
+ schema_name=util.text_type(schema),
+ ),
)
return cursor.first() is not None
diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py
index 982ac498d..c41a55025 100644
--- a/lib/sqlalchemy/testing/suite/test_results.py
+++ b/lib/sqlalchemy/testing/suite/test_results.py
@@ -234,7 +234,7 @@ class ServerSideCursorsTest(
elif self.engine.dialect.driver == "pymysql":
sscursor = __import__("pymysql.cursors").cursors.SSCursor
return isinstance(cursor, sscursor)
- elif self.engine.dialect.driver == "aiomysql":
+ elif self.engine.dialect.driver in ("aiomysql", "asyncmy"):
return cursor.server_side
elif self.engine.dialect.driver == "mysqldb":
sscursor = __import__("MySQLdb.cursors").cursors.SSCursor
diff --git a/setup.cfg b/setup.cfg
index 689539687..8512e7abc 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -74,6 +74,9 @@ pymysql =
aiomysql =
%(asyncio)s
aiomysql;python_version>="3"
+asyncmy =
+ %(asyncio)s
+ asyncmy;python_version>="3"
aiosqlite =
%(asyncio)s
aiosqlite;python_version>="3"
@@ -111,11 +114,11 @@ exclude = .venv,.git,.tox,dist,doc,*egg,build
import-order-style = google
application-import-names = sqlalchemy,test
per-file-ignores =
- **/__init__.py:F401
- lib/sqlalchemy/events.py:F401
- lib/sqlalchemy/schema.py:F401
- lib/sqlalchemy/types.py:F401
- lib/sqlalchemy/sql/expression.py:F401
+ **/__init__.py:F401
+ lib/sqlalchemy/events.py:F401
+ lib/sqlalchemy/schema.py:F401
+ lib/sqlalchemy/types.py:F401
+ lib/sqlalchemy/sql/expression.py:F401
[mypy]
# min mypy version 0.800
@@ -164,6 +167,8 @@ mysql = mysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
pymysql = mysql+pymysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
aiomysql = mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
aiomysql_fallback = mysql+aiomysql://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true
+asyncmy = mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
+asyncmy_fallback = mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4&async_fallback=true
mariadb = mariadb://scott:tiger@127.0.0.1:3306/test
mssql = mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server
mssql_pymssql = mssql+pymssql://scott:tiger@ms_2008
diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py
index 0d466e26d..7bdf6f8ce 100644
--- a/test/dialect/mysql/test_types.py
+++ b/test/dialect/mysql/test_types.py
@@ -549,7 +549,7 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults):
([0, 0, 0, 0, i, i, i, i], None),
([0, 0, 0, 0, 0, j, j, j], None),
([0, 0, 0, 0, 0, 0, k, k], None),
- ([0, 0, 0, 0, 0, 0, 0, l], None),
+ ([0, 0, 0, 0, 0, 0, 0, l], None, testing.fails_if("+asyncmy")),
argnames="store, expected",
)
def test_bit_50_roundtrip(self, connection, bit_table, store, expected):
@@ -569,7 +569,7 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults):
([0, 0, 0, 0, i, i, i, i], None),
([0, 0, 0, 0, 0, j, j, j], None),
([0, 0, 0, 0, 0, 0, k, k], None),
- ([0, 0, 0, 0, 0, 0, 0, l], None),
+ ([0, 0, 0, 0, 0, 0, 0, l], None, testing.fails_if("+asyncmy")),
argnames="store, expected",
)
def test_bit_50_roundtrip_reflected(
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index 538ff8b89..ebcb8d520 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -1386,6 +1386,7 @@ class InvalidateDuringResultTest(fixtures.TestBase):
"+asyncpg",
"+aiosqlite",
"+aiomysql",
+ "+asyncmy",
],
"Buffers the result set and doesn't check for connection close",
)
diff --git a/tox.ini b/tox.ini
index 544b27491..67731240a 100644
--- a/tox.ini
+++ b/tox.ini
@@ -31,6 +31,7 @@ deps=
mysql: .[mysql]
mysql: .[pymysql]
mysql: git+https://github.com/sqlalchemy/aiomysql@sqlalchemy_tox; python_version >= '3'
+ mysql: .[asyncmy]; python_version >= '3'
mysql: .[mariadb_connector]; python_version >= '3'
oracle: .[oracle]
@@ -102,9 +103,9 @@ setenv=
py2{,7}-mysql: MYSQL={env:TOX_MYSQL_PY2K:{env:TOX_MYSQL:--db mysql}}
mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql}
- py3{,5,6,7,8,9}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver aiomysql}
+ py3{,5,6,7,8,9}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver aiomysql --dbdriver asyncmy}
# omit aiomysql for Python 3.10
- py3{,10,11}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector}
+ py3{,10,11}-mysql: EXTRA_MYSQL_DRIVERS={env:EXTRA_MYSQL_DRIVERS:--dbdriver mysqldb --dbdriver pymysql --dbdriver mariadbconnector --dbdriver asyncmy}
mssql: MSSQL={env:TOX_MSSQL:--db mssql}