1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
"""
Example demonstrating use with ASGI (raw ASGI application, no framework).
Requires the "postgresql" service to be running.
To install prerequisites: pip install sqlalchemy asyncpg uvicorn
To run: uvicorn asgi_noframework:app
It should print a line on the console on a one-second interval while running a
basic web app at http://localhost:8000.
"""
from __future__ import annotations
from datetime import datetime
from sqlalchemy.ext.asyncio import create_async_engine
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger
def tick():
print("Hello, the time is", datetime.now())
async def original_app(scope, receive, send):
"""Trivial example of an ASGI application."""
if scope["type"] == "http":
await receive()
await send(
{
"type": "http.response.start",
"status": 200,
"headers": [
[b"content-type", b"text/plain"],
],
}
)
await send(
{
"type": "http.response.body",
"body": b"Hello, world!",
"more_body": False,
}
)
elif scope["type"] == "lifespan":
while True:
message = await receive()
if message["type"] == "lifespan.startup":
await send({"type": "lifespan.startup.complete"})
elif message["type"] == "lifespan.shutdown":
await send({"type": "lifespan.shutdown.complete"})
return
async def scheduler_middleware(scope, receive, send):
if scope["type"] == "lifespan":
engine = create_async_engine(
"postgresql+asyncpg://postgres:secret@localhost/testdb"
)
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
async with AsyncScheduler(data_store, event_broker) as scheduler:
await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
await scheduler.start_in_background()
await original_app(scope, receive, send)
else:
await original_app(scope, receive, send)
# This is just for consistency with the other ASGI examples
app = scheduler_middleware
|