1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
|
# ext/asyncio/session.py
# Copyright (C) 2020-2021 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from . import engine
from . import result as _result
from .base import StartableContext
from ... import util
from ...orm import Session
from ...util.concurrency import greenlet_spawn
@util.create_proxy_methods(
    Session,
    ":class:`_orm.Session`",
    ":class:`_asyncio.AsyncSession`",
    classmethods=["object_session", "identity_key"],
    methods=[
        "__contains__",
        "__iter__",
        "add",
        "add_all",
        "expire",
        "expire_all",
        "expunge",
        "expunge_all",
        "get_bind",
        "is_modified",
        "in_transaction",
    ],
    attributes=[
        "dirty",
        "deleted",
        "new",
        "identity_map",
        "is_active",
        "autoflush",
        "no_autoflush",
        "info",
    ],
)
class AsyncSession:
    """Asyncio version of :class:`_orm.Session`.

    An :class:`_orm.Session` is created internally and stored as
    ``sync_session``; the awaitable methods on this class run the
    corresponding method of that session through ``greenlet_spawn``.

    .. versionadded:: 1.4

    """

    __slots__ = (
        "binds",
        "bind",
        "sync_session",
        "_proxied",
        "_slots_dispatch",
    )

    dispatch = None

    def __init__(self, bind=None, binds=None, **kw):
        """Construct a new :class:`_asyncio.AsyncSession`.

        :param bind: optional async engine or connection.  It is stored
         as given on ``self.bind``; its sync counterpart is passed to the
         underlying :class:`_orm.Session`.
        :param binds: optional dictionary whose values are async engines
         or connections.  It is stored as given on ``self.binds``; a
         dictionary with the same keys and the sync counterparts as
         values is passed to the underlying :class:`_orm.Session`.
        :param \\**kw: additional keyword arguments passed to the
         :class:`_orm.Session` constructor.  ``future`` is always
         overridden to ``True``.

        """
        kw["future"] = True

        # Always assign the slots: an unassigned ``__slots__`` entry raises
        # AttributeError on access, so ``session.bind`` must exist (as
        # ``None``) even when no bind was given.
        self.bind = bind
        self.binds = binds

        if bind:
            bind = engine._get_sync_engine_or_connection(bind)

        if binds:
            binds = {
                key: engine._get_sync_engine_or_connection(b)
                for key, b in binds.items()
            }

        self.sync_session = self._proxied = Session(
            bind=bind, binds=binds, **kw
        )

    async def refresh(
        self, instance, attribute_names=None, with_for_update=None
    ):
        """Expire and refresh the attributes on the given instance.

        A query will be issued to the database and all attributes will be
        refreshed with their current database value.

        This is the async version of the :meth:`_orm.Session.refresh` method.
        See that method for a complete description of all options.

        """
        return await greenlet_spawn(
            self.sync_session.refresh,
            instance,
            attribute_names=attribute_names,
            with_for_update=with_for_update,
        )

    async def run_sync(self, fn, *arg, **kw):
        """Invoke the given sync callable passing sync self as the first
        argument.

        This method maintains the asyncio event loop all the way through
        to the database connection by running the given callable in a
        specially instrumented greenlet.

        E.g.::

            async with AsyncSession(async_engine) as session:
                await session.run_sync(some_business_method)

        .. note::

            The provided callable is invoked inline within the asyncio event
            loop, and will block on traditional IO calls.  IO within this
            callable should only call into SQLAlchemy's asyncio database
            APIs which will be properly adapted to the greenlet context.

        .. seealso::

            :ref:`session_run_sync`

        """
        return await greenlet_spawn(fn, self.sync_session, *arg, **kw)

    async def execute(
        self,
        statement,
        params=None,
        execution_options=util.EMPTY_DICT,
        bind_arguments=None,
        **kw
    ):
        """Execute a statement and return a buffered
        :class:`_engine.Result` object.

        The ``prebuffer_rows`` execution option is always set to ``True``
        in addition to any options passed by the caller.

        """
        # Merge onto EMPTY_DICT first so that a plain ``dict`` passed by the
        # caller is accepted as well as an immutable dict.
        execution_options = util.EMPTY_DICT.union(execution_options).union(
            {"prebuffer_rows": True}
        )

        return await greenlet_spawn(
            self.sync_session.execute,
            statement,
            params=params,
            execution_options=execution_options,
            bind_arguments=bind_arguments,
            **kw
        )

    async def scalar(
        self,
        statement,
        params=None,
        execution_options=util.EMPTY_DICT,
        bind_arguments=None,
        **kw
    ):
        """Execute a statement and return a scalar result.

        Arguments are passed through to
        :meth:`_asyncio.AsyncSession.execute`; ``scalar()`` is then called
        on the returned result.

        """
        result = await self.execute(
            statement,
            params=params,
            execution_options=execution_options,
            bind_arguments=bind_arguments,
            **kw
        )
        return result.scalar()

    async def get(
        self,
        entity,
        ident,
        options=None,
        populate_existing=False,
        with_for_update=None,
        identity_token=None,
    ):
        """Return an instance based on the given primary key identifier,
        or ``None`` if not found.

        This is the async version of :meth:`_orm.Session.get`; all
        arguments are passed through to it.

        """
        return await greenlet_spawn(
            self.sync_session.get,
            entity,
            ident,
            options=options,
            populate_existing=populate_existing,
            with_for_update=with_for_update,
            identity_token=identity_token,
        )

    async def stream(
        self,
        statement,
        params=None,
        execution_options=util.EMPTY_DICT,
        bind_arguments=None,
        **kw
    ):
        """Execute a statement and return a streaming
        :class:`_asyncio.AsyncResult` object.

        The ``stream_results`` execution option is always set to ``True``
        in addition to any options passed by the caller.

        """
        # Merge onto EMPTY_DICT first so that a plain ``dict`` passed by the
        # caller is accepted as well as an immutable dict.
        execution_options = util.EMPTY_DICT.union(execution_options).union(
            {"stream_results": True}
        )

        result = await greenlet_spawn(
            self.sync_session.execute,
            statement,
            params=params,
            execution_options=execution_options,
            bind_arguments=bind_arguments,
            **kw
        )
        return _result.AsyncResult(result)

    async def delete(self, instance):
        """Mark an instance as deleted.

        The database delete operation occurs upon ``flush()``.

        As this operation may need to cascade along unloaded relationships,
        it is awaitable to allow for those queries to take place.

        """
        return await greenlet_spawn(self.sync_session.delete, instance)

    async def merge(self, instance, load=True):
        """Copy the state of a given instance into a corresponding instance
        within this :class:`_asyncio.AsyncSession`.

        This is the async version of :meth:`_orm.Session.merge`.

        """
        return await greenlet_spawn(
            self.sync_session.merge, instance, load=load
        )

    async def flush(self, objects=None):
        """Flush all the object changes to the database.

        .. seealso::

            :meth:`_orm.Session.flush`

        """
        await greenlet_spawn(self.sync_session.flush, objects=objects)

    async def connection(self):
        r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
        this :class:`.Session` object's transactional state.

        A new :class:`_asyncio.AsyncConnection` and
        :class:`_asyncio.AsyncEngine` proxy are constructed around the sync
        connection on each call.

        """

        # POSSIBLY TODO: here, we see that the sync engine / connection
        # that are generated from AsyncEngine / AsyncConnection don't
        # provide any backlink from those sync objects back out to the
        # async ones.   it's not *too* big a deal since AsyncEngine/Connection
        # are just proxies and all the state is actually in the sync
        # version of things.  However!  it has to stay that way :)
        sync_connection = await greenlet_spawn(self.sync_session.connection)
        return engine.AsyncConnection(
            engine.AsyncEngine(sync_connection.engine), sync_connection
        )

    def begin(self, **kw):
        """Return an :class:`_asyncio.AsyncSessionTransaction` object.

        The underlying :class:`_orm.Session` will perform the
        "begin" action when the :class:`_asyncio.AsyncSessionTransaction`
        object is entered::

            async with async_session.begin():
                # .. ORM transaction is begun

        Note that database IO will not normally occur when the session-level
        transaction is begun, as database transactions begin on an
        on-demand basis.  However, the begin block is async to accommodate
        for a :meth:`_orm.SessionEvents.after_transaction_create`
        event hook that may perform IO.

        For a general description of ORM begin, see
        :meth:`_orm.Session.begin`.

        Keyword arguments are accepted but are currently not used.

        """

        return AsyncSessionTransaction(self)

    def begin_nested(self, **kw):
        """Return an :class:`_asyncio.AsyncSessionTransaction` object
        which will begin a "nested" transaction, e.g. SAVEPOINT.

        Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`.

        For a general description of ORM begin nested, see
        :meth:`_orm.Session.begin_nested`.

        Keyword arguments are accepted but are currently not used.

        """

        return AsyncSessionTransaction(self, nested=True)

    async def rollback(self):
        """Rollback the current transaction in progress."""
        return await greenlet_spawn(self.sync_session.rollback)

    async def commit(self):
        """Commit the current transaction in progress."""
        return await greenlet_spawn(self.sync_session.commit)

    async def close(self):
        """Close this :class:`_asyncio.AsyncSession`."""
        return await greenlet_spawn(self.sync_session.close)

    @classmethod
    async def close_all(cls):
        """Close all :class:`_asyncio.AsyncSession` sessions."""
        # This is a classmethod, so there is no instance and therefore no
        # ``sync_session`` to go through (on the class that name is only the
        # slot descriptor); call the sync Session classmethod directly.
        return await greenlet_spawn(Session.close_all)

    async def __aenter__(self):
        """Enter the async context; returns this session."""
        return self

    async def __aexit__(self, type_, value, traceback):
        """Exit the async context by closing this session."""
        await self.close()

    def _maker_context_manager(self):
        """Return an async context manager that begins a transaction on
        this session when entered, and on exit ends that transaction and
        then closes the session.
        """
        # no @contextlib.asynccontextmanager until python3.7, gr
        return _AsyncSessionContextManager(self)
class _AsyncSessionContextManager:
    """Async context manager pairing a session with a transaction.

    On enter, ``begin()`` is called on the given session and the resulting
    transaction object is entered; the session itself is returned.  On exit
    the transaction is exited first, then the session's ``__aexit__`` is
    awaited.

    """

    def __init__(self, async_session):
        """Store the session that will be managed.

        :param async_session: object providing ``begin()`` and
         ``__aexit__()``.

        """
        self.async_session = async_session

    async def __aenter__(self):
        """Begin and enter a transaction, returning the session."""
        self.trans = self.async_session.begin()
        await self.trans.__aenter__()
        return self.async_session

    async def __aexit__(self, type_, value, traceback):
        """Exit the transaction, then always exit the session.

        Any exception raised while exiting the transaction still
        propagates to the caller.

        """
        try:
            await self.trans.__aexit__(type_, value, traceback)
        finally:
            # Exit the session even when leaving the transaction raised
            # (e.g. a failed commit), so it is not left open.
            await self.async_session.__aexit__(type_, value, traceback)
class AsyncSessionTransaction(StartableContext):
    """Async wrapper around the ORM :class:`_orm.SessionTransaction`.

    Instances are handed out by :meth:`_asyncio.AsyncSession.begin` and
    :meth:`_asyncio.AsyncSession.begin_nested` so that callers have an
    object representing the transaction.

    The transaction may be ended explicitly with
    :meth:`_asyncio.AsyncSessionTransaction.commit` or
    :meth:`_asyncio.AsyncSessionTransaction.rollback`, or the object may be
    used as an async context manager.

    .. versionadded:: 1.4

    """

    __slots__ = ("session", "sync_transaction", "nested")

    def __init__(self, session, nested=False):
        """Record the owning session; no transaction is begun yet.

        :param session: the :class:`_asyncio.AsyncSession` whose
         ``sync_session`` will begin the transaction in :meth:`.start`.
        :param nested: when true, :meth:`.start` uses ``begin_nested()``
         rather than ``begin()``.

        """
        self.sync_transaction = None
        self.nested = nested
        self.session = session

    @property
    def is_active(self):
        """The ``is_active`` value of the underlying sync transaction."""
        sync_trans = self._sync_transaction()
        return sync_trans is not None and sync_trans.is_active

    def _sync_transaction(self):
        """Return the sync transaction, invoking
        ``_raise_for_not_started()`` first if none has been set.
        """
        if self.sync_transaction:
            return self.sync_transaction
        self._raise_for_not_started()
        return self.sync_transaction

    async def rollback(self):
        """Roll back this :class:`_asyncio.AsyncSessionTransaction`."""
        sync_rollback = self._sync_transaction().rollback
        await greenlet_spawn(sync_rollback)

    async def commit(self):
        """Commit this :class:`_asyncio.AsyncSessionTransaction`."""
        sync_commit = self._sync_transaction().commit
        await greenlet_spawn(sync_commit)

    async def start(self, is_ctxmanager=False):
        """Begin the sync transaction and return this object.

        :param is_ctxmanager: when true, ``__enter__()`` is additionally
         called on the newly begun sync transaction.

        """
        sync_session = self.session.sync_session
        if self.nested:
            begin_fn = sync_session.begin_nested
        else:
            begin_fn = sync_session.begin

        self.sync_transaction = await greenlet_spawn(begin_fn)

        if is_ctxmanager:
            self.sync_transaction.__enter__()
        return self

    async def __aexit__(self, type_, value, traceback):
        """Run the sync transaction's ``__exit__`` with the given
        exception details.
        """
        sync_exit = self._sync_transaction().__exit__
        await greenlet_spawn(sync_exit, type_, value, traceback)
|