diff options
Diffstat (limited to 'test/ext/asyncio/test_session_py3k.py')
-rw-r--r-- | test/ext/asyncio/test_session_py3k.py | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/test/ext/asyncio/test_session_py3k.py b/test/ext/asyncio/test_session_py3k.py index e97e2563a..1f5c95054 100644 --- a/test/ext/asyncio/test_session_py3k.py +++ b/test/ext/asyncio/test_session_py3k.py @@ -1,11 +1,14 @@ from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import func +from sqlalchemy import inspect from sqlalchemy import select from sqlalchemy import Table from sqlalchemy import testing from sqlalchemy import update +from sqlalchemy.ext.asyncio import async_object_session from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio.base import ReversibleProxy from sqlalchemy.orm import relationship from sqlalchemy.orm import selectinload from sqlalchemy.orm import sessionmaker @@ -503,3 +506,159 @@ class AsyncEventTest(AsyncFixture): canary.mock_calls, [mock.call(async_session.sync_session)], ) + + +class AsyncProxyTest(AsyncFixture): + @async_test + async def test_get_connection_engine_bound(self, async_session): + c1 = await async_session.connection() + + c2 = await async_session.connection() + + is_(c1, c2) + is_(c1.engine, c2.engine) + + @async_test + async def test_get_connection_connection_bound(self, async_engine): + async with async_engine.begin() as conn: + async_session = AsyncSession(conn) + + c1 = await async_session.connection() + + is_(c1, conn) + is_(c1.engine, conn.engine) + + @async_test + async def test_get_transaction(self, async_session): + + is_(async_session.get_transaction(), None) + is_(async_session.get_nested_transaction(), None) + + t1 = await async_session.begin() + + is_(async_session.get_transaction(), t1) + is_(async_session.get_nested_transaction(), None) + + n1 = await async_session.begin_nested() + + is_(async_session.get_transaction(), t1) + is_(async_session.get_nested_transaction(), n1) + + await n1.commit() + + is_(async_session.get_transaction(), t1) + is_(async_session.get_nested_transaction(), None) + + await t1.commit() + + is_(async_session.get_transaction(), None) + is_(async_session.get_nested_transaction(), None) + + @async_test + async def test_async_object_session(self, async_engine): + User = self.classes.User + + s1 = AsyncSession(async_engine) + + s2 = AsyncSession(async_engine) + + u1 = await s1.get(User, 7) + + u2 = User(name="n1") + + s2.add(u2) + + u3 = User(name="n2") + + is_(async_object_session(u1), s1) + is_(async_object_session(u2), s2) + + is_(async_object_session(u3), None) + + await s2.close() + is_(async_object_session(u2), None) + + @async_test + async def test_async_object_session_custom(self, async_engine): + User = self.classes.User + + class MyCustomAsync(AsyncSession): + pass + + s1 = MyCustomAsync(async_engine) + + u1 = await s1.get(User, 7) + + assert isinstance(async_object_session(u1), MyCustomAsync) + + @testing.requires.predictable_gc + @async_test + async def test_async_object_session_del(self, async_engine): + User = self.classes.User + + s1 = AsyncSession(async_engine) + + u1 = await s1.get(User, 7) + + is_(async_object_session(u1), s1) + + await s1.rollback() + del s1 + is_(async_object_session(u1), None) + + @async_test + async def test_inspect_session(self, async_engine): + User = self.classes.User + + s1 = AsyncSession(async_engine) + + s2 = AsyncSession(async_engine) + + u1 = await s1.get(User, 7) + + u2 = User(name="n1") + + s2.add(u2) + + u3 = User(name="n2") + + is_(inspect(u1).async_session, s1) + is_(inspect(u2).async_session, s2) + + is_(inspect(u3).async_session, None) + + def test_inspect_session_no_asyncio_used(self): + from sqlalchemy.orm import Session + + User = self.classes.User + + s1 = Session(testing.db) + u1 = s1.get(User, 7) + + is_(inspect(u1).async_session, None) + + def test_inspect_session_no_asyncio_imported(self): + from sqlalchemy.orm import Session + + with mock.patch("sqlalchemy.orm.state._async_provider", None): + + User = self.classes.User + + s1 = Session(testing.db) + u1 = s1.get(User, 7) + + is_(inspect(u1).async_session, None) + + @testing.requires.predictable_gc + def test_gc(self, async_engine): + ReversibleProxy._proxy_objects.clear() + + eq_(len(ReversibleProxy._proxy_objects), 0) + + async_session = AsyncSession(async_engine) + + eq_(len(ReversibleProxy._proxy_objects), 1) + + del async_session + + eq_(len(ReversibleProxy._proxy_objects), 0) |