summaryrefslogtreecommitdiff
path: root/examples/web/asgi_noframework.py
blob: 4b57c308442005d9e05442ec07c248762bc20a0d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""
Example demonstrating use with ASGI (raw ASGI application, no framework).

Requires the "postgresql" service to be running.
To install prerequisites: pip install sqlalchemy asyncpg uvicorn
To run: uvicorn asgi_noframework:app

It should print a line on the console on a one-second interval while running a
basic web app at http://localhost:8000.
"""

from __future__ import annotations

from datetime import datetime

from sqlalchemy.ext.asyncio import create_async_engine

from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger


def tick():
    """Print a greeting followed by the current local date and time."""
    current_time = datetime.now()
    print("Hello, the time is", current_time)


async def original_app(scope, receive, send):
    """
    Trivial example of an ASGI application.

    For an ``http`` scope, one event is awaited from ``receive`` and a fixed
    ``200`` plain-text response with the body ``Hello, world!`` is sent.

    For a ``lifespan`` scope, startup and shutdown events are acknowledged with
    their matching ``.complete`` messages; the function returns once the
    shutdown event has been acknowledged.

    Any other scope type is ignored.

    :param scope: the ASGI connection scope; only its ``"type"`` key is read
    :param receive: awaitable callable yielding the next incoming ASGI event
    :param send: awaitable callable accepting an outgoing ASGI event

    """
    scope_type = scope["type"]

    if scope_type == "http":
        # Take one request event off the channel before replying
        await receive()
        response_start = {
            "type": "http.response.start",
            "status": 200,
            "headers": [
                [b"content-type", b"text/plain"],
            ],
        }
        response_body = {
            "type": "http.response.body",
            "body": b"Hello, world!",
            "more_body": False,
        }
        await send(response_start)
        await send(response_body)
        return

    if scope_type != "lifespan":
        return

    # Maps each lifespan event we react to onto the reply it requires
    acknowledgements = {
        "lifespan.startup": "lifespan.startup.complete",
        "lifespan.shutdown": "lifespan.shutdown.complete",
    }
    while True:
        event_type = (await receive())["type"]
        reply_type = acknowledgements.get(event_type)
        if reply_type is None:
            # Not an event this application reacts to; wait for the next one
            continue

        await send({"type": reply_type})
        if event_type == "lifespan.shutdown":
            return


async def scheduler_middleware(scope, receive, send):
    """
    ASGI middleware that runs a scheduler for the duration of the lifespan scope.

    For a ``lifespan`` scope, a SQLAlchemy async engine is created, a data store
    and an event broker are built on top of it, and an ``AsyncScheduler`` is
    entered as an async context manager. Inside it, the ``tick`` schedule (one
    second interval, ID ``"tick"``) is added, the scheduler is started in the
    background and control is handed to ``original_app``, which keeps running
    until the lifespan shutdown has been acknowledged. The engine is disposed
    afterwards, whether the wrapped application finished normally or raised.

    Every other scope type is passed straight through to ``original_app``.

    :param scope: the ASGI connection scope; only its ``"type"`` key is read here
    :param receive: awaitable callable yielding the next incoming ASGI event
    :param send: awaitable callable accepting an outgoing ASGI event

    """
    if scope["type"] != "lifespan":
        await original_app(scope, receive, send)
        return

    engine = create_async_engine(
        "postgresql+asyncpg://postgres:secret@localhost/testdb"
    )
    try:
        data_store = AsyncSQLAlchemyDataStore(engine)
        event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
        async with AsyncScheduler(data_store, event_broker) as scheduler:
            await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
            await scheduler.start_in_background()
            await original_app(scope, receive, send)
    finally:
        # The engine was created in this function, so it is this function's job
        # to close its pooled connections once the scheduler has shut down
        await engine.dispose()


# Expose the middleware under the name "app", which is the name the run command
# in the module docstring (uvicorn asgi_noframework:app) refers to.
# This is just for consistency with the other ASGI examples
app = scheduler_middleware