From 87d6efbda4c1b5c5a0d502bdd37bf72189054892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 3 Sep 2022 21:01:10 +0300 Subject: Improved the "separate worker/scheduler" examples Added async versions and alternate event broker options. --- examples/separate_worker/async_scheduler.py | 47 +++++++++++++++++++++++++++++ examples/separate_worker/async_worker.py | 42 ++++++++++++++++++++++++++ examples/separate_worker/sync_scheduler.py | 8 +++++ examples/separate_worker/sync_worker.py | 8 +++++ 4 files changed, 105 insertions(+) create mode 100644 examples/separate_worker/async_scheduler.py create mode 100644 examples/separate_worker/async_worker.py diff --git a/examples/separate_worker/async_scheduler.py b/examples/separate_worker/async_scheduler.py new file mode 100644 index 0000000..27eb37a --- /dev/null +++ b/examples/separate_worker/async_scheduler.py @@ -0,0 +1,47 @@ +""" +Example demonstrating the separation of scheduler and worker. +This script runs the scheduler part. You need to be running both this and the worker +script simultaneously in order for the scheduled task to be run. + +Requires the "postgresql" service to be running. +To install prerequisites: pip install sqlalchemy asyncpg +To run: python async_scheduler.py + +When run together with async_worker.py, it should print a line on the console +on a one-second interval. +""" + +from __future__ import annotations + +import asyncio +import logging + +from example_tasks import tick +from sqlalchemy.ext.asyncio import create_async_engine + +from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore +from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker +from apscheduler.schedulers.async_ import AsyncScheduler +from apscheduler.triggers.interval import IntervalTrigger + + +async def main(): + engine = create_async_engine( + "postgresql+asyncpg://postgres:secret@localhost/testdb" + ) + data_store = AsyncSQLAlchemyDataStore(engine) + event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine) + + # Uncomment the next two lines to use the Redis event broker instead + # from apscheduler.eventbrokers.async_redis import AsyncRedisEventBroker + # event_broker = AsyncRedisEventBroker.from_url("redis://localhost") + + async with AsyncScheduler( + data_store, event_broker, start_worker=False + ) as scheduler: + await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick") + await scheduler.wait_until_stopped() + + +logging.basicConfig(level=logging.INFO) +asyncio.run(main()) diff --git a/examples/separate_worker/async_worker.py b/examples/separate_worker/async_worker.py new file mode 100644 index 0000000..700720e --- /dev/null +++ b/examples/separate_worker/async_worker.py @@ -0,0 +1,42 @@ +""" +Example demonstrating the separation of scheduler and worker. +This script runs the scheduler part. You need to be running both this and the worker +script simultaneously in order for the scheduled task to be run. + +Requires the "postgresql" service to be running. +To install prerequisites: pip install sqlalchemy asyncpg +To run: python async_worker.py + +When run together with async_scheduler.py, it should print a line on the console +on a one-second interval. +""" + +from __future__ import annotations + +import asyncio +import logging + +from sqlalchemy.ext.asyncio import create_async_engine + +from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore +from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker +from apscheduler.workers.async_ import AsyncWorker + + +async def main(): + engine = create_async_engine( + "postgresql+asyncpg://postgres:secret@localhost/testdb" + ) + data_store = AsyncSQLAlchemyDataStore(engine) + event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine) + + # Uncomment the next two lines to use the Redis event broker instead + # from apscheduler.eventbrokers.async_redis import AsyncRedisEventBroker + # event_broker = AsyncRedisEventBroker.from_url("redis://localhost") + + worker = AsyncWorker(data_store, event_broker) + await worker.run_until_stopped() + + +logging.basicConfig(level=logging.INFO) +asyncio.run(main()) diff --git a/examples/separate_worker/sync_scheduler.py b/examples/separate_worker/sync_scheduler.py index aeba93c..9c40032 100644 --- a/examples/separate_worker/sync_scheduler.py +++ b/examples/separate_worker/sync_scheduler.py @@ -13,6 +13,8 @@ on a one-second interval. from __future__ import annotations +import logging + from example_tasks import tick from sqlalchemy.future import create_engine @@ -21,9 +23,15 @@ from apscheduler.eventbrokers.redis import RedisEventBroker from apscheduler.schedulers.sync import Scheduler from apscheduler.triggers.interval import IntervalTrigger +logging.basicConfig(level=logging.INFO) engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb") data_store = SQLAlchemyDataStore(engine) event_broker = RedisEventBroker.from_url("redis://localhost") + +# Uncomment the next two lines to use the MQTT event broker instead +# from apscheduler.eventbrokers.mqtt import MQTTEventBroker +# event_broker = MQTTEventBroker() + with Scheduler(data_store, event_broker, start_worker=False) as scheduler: scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick") scheduler.wait_until_stopped() diff --git a/examples/separate_worker/sync_worker.py b/examples/separate_worker/sync_worker.py index 30bdb78..e57be64 100644 --- a/examples/separate_worker/sync_worker.py +++ b/examples/separate_worker/sync_worker.py @@ -13,14 +13,22 @@ console on a one-second interval. from __future__ import annotations +import logging + from sqlalchemy.future import create_engine from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore from apscheduler.eventbrokers.redis import RedisEventBroker from apscheduler.workers.sync import Worker +logging.basicConfig(level=logging.INFO) engine = create_engine("postgresql+psycopg2://postgres:secret@localhost/testdb") data_store = SQLAlchemyDataStore(engine) event_broker = RedisEventBroker.from_url("redis://localhost") + +# Uncomment the next two lines to use the MQTT event broker instead +# from apscheduler.eventbrokers.mqtt import MQTTEventBroker +# event_broker = MQTTEventBroker() + worker = Worker(data_store, event_broker) worker.run_until_stopped() -- cgit v1.2.1