diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-07-04 12:21:36 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-08-13 18:41:53 -0400 |
commit | 5fb0138a3220161703e6ab1087319a669d14e7f4 (patch) | |
tree | 25d006b30830ce6bc71f7a69bed9b570e1ae9654 /test/ext/asyncio/test_session_py3k.py | |
parent | cd03b8f0cecbf72ecd6c99c4d3a6338c8278b40d (diff) | |
download | sqlalchemy-5fb0138a3220161703e6ab1087319a669d14e7f4.tar.gz |
Implement rudimentary asyncio support w/ asyncpg
Using the approach introduced at
https://gist.github.com/zzzeek/6287e28054d3baddc07fa21a7227904e
We can now create asyncio endpoints that are then handled
in "implicit IO" form within the majority of the Core internals.
Then coroutines are re-exposed at the point at which we call
into asyncpg methods.
Patch includes:
* asyncpg dialect
* asyncio package
* engine, result, ORM session classes
* new test fixtures, tests
* some work with pep-484 and a short plugin for the
pyannotate package, which seems to have so-so results
Change-Id: Idbcc0eff72c4cad572914acdd6f40ddb1aef1a7d
Fixes: #3414
Diffstat (limited to 'test/ext/asyncio/test_session_py3k.py')
-rw-r--r-- | test/ext/asyncio/test_session_py3k.py | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/test/ext/asyncio/test_session_py3k.py b/test/ext/asyncio/test_session_py3k.py new file mode 100644 index 000000000..e8caaca3e --- /dev/null +++ b/test/ext/asyncio/test_session_py3k.py @@ -0,0 +1,200 @@ +from sqlalchemy import exc +from sqlalchemy import func +from sqlalchemy import select +from sqlalchemy import testing +from sqlalchemy import update +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy.orm import selectinload +from sqlalchemy.testing import async_test +from sqlalchemy.testing import eq_ +from sqlalchemy.testing import is_ +from ...orm import _fixtures + + +class AsyncFixture(_fixtures.FixtureTest): + __requires__ = ("async_dialect",) + + @classmethod + def setup_mappers(cls): + cls._setup_stock_mapping() + + @testing.fixture + def async_engine(self): + return create_async_engine(testing.db.url) + + @testing.fixture + def async_session(self, async_engine): + return AsyncSession(async_engine) + + +class AsyncSessionTest(AsyncFixture): + def test_requires_async_engine(self, async_engine): + testing.assert_raises_message( + exc.ArgumentError, + "AsyncEngine expected, got Engine", + AsyncSession, + bind=async_engine.sync_engine, + ) + + +class AsyncSessionQueryTest(AsyncFixture): + @async_test + async def test_execute(self, async_session): + User = self.classes.User + + stmt = ( + select(User) + .options(selectinload(User.addresses)) + .order_by(User.id) + ) + + result = await async_session.execute(stmt) + eq_(result.scalars().all(), self.static.user_address_result) + + @async_test + async def test_stream_partitions(self, async_session): + User = self.classes.User + + stmt = ( + select(User) + .options(selectinload(User.addresses)) + .order_by(User.id) + ) + + result = await async_session.stream(stmt) + + assert_result = [] + async for partition in result.scalars().partitions(3): + assert_result.append(partition) + + eq_( + assert_result, + [ + self.static.user_address_result[0:3], + self.static.user_address_result[3:], + ], + ) + + +class AsyncSessionTransactionTest(AsyncFixture): + run_inserts = None + + @async_test + async def test_trans(self, async_session, async_engine): + async with async_engine.connect() as outer_conn: + + User = self.classes.User + + async with async_session.begin(): + + eq_(await outer_conn.scalar(select(func.count(User.id))), 0) + + u1 = User(name="u1") + + async_session.add(u1) + + result = await async_session.execute(select(User)) + eq_(result.scalar(), u1) + + eq_(await outer_conn.scalar(select(func.count(User.id))), 1) + + @async_test + async def test_commit_as_you_go(self, async_session, async_engine): + async with async_engine.connect() as outer_conn: + + User = self.classes.User + + eq_(await outer_conn.scalar(select(func.count(User.id))), 0) + + u1 = User(name="u1") + + async_session.add(u1) + + result = await async_session.execute(select(User)) + eq_(result.scalar(), u1) + + await async_session.commit() + + eq_(await outer_conn.scalar(select(func.count(User.id))), 1) + + @async_test + async def test_trans_noctx(self, async_session, async_engine): + async with async_engine.connect() as outer_conn: + + User = self.classes.User + + trans = await async_session.begin() + try: + eq_(await outer_conn.scalar(select(func.count(User.id))), 0) + + u1 = User(name="u1") + + async_session.add(u1) + + result = await async_session.execute(select(User)) + eq_(result.scalar(), u1) + finally: + await trans.commit() + + eq_(await outer_conn.scalar(select(func.count(User.id))), 1) + + @async_test + async def test_flush(self, async_session): + User = self.classes.User + + async with async_session.begin(): + u1 = User(name="u1") + + async_session.add(u1) + + conn = await async_session.connection() + + eq_(await conn.scalar(select(func.count(User.id))), 0) + + await async_session.flush() + + eq_(await conn.scalar(select(func.count(User.id))), 1) + + @async_test + async def test_refresh(self, async_session): + User = self.classes.User + + async with async_session.begin(): + u1 = User(name="u1") + + async_session.add(u1) + await async_session.flush() + + conn = await async_session.connection() + + await conn.execute( + update(User) + .values(name="u2") + .execution_options(synchronize_session=None) + ) + + eq_(u1.name, "u1") + + await async_session.refresh(u1) + + eq_(u1.name, "u2") + + eq_(await conn.scalar(select(func.count(User.id))), 1) + + @async_test + async def test_merge(self, async_session): + User = self.classes.User + + async with async_session.begin(): + u1 = User(id=1, name="u1") + + async_session.add(u1) + + async with async_session.begin(): + new_u = User(id=1, name="new u1") + + new_u_merged = await async_session.merge(new_u) + + is_(new_u_merged, u1) + eq_(u1.name, "new u1") |