diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-05-02 10:26:29 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-03 21:34:40 +0300 |
commit | 38e257e4b30f6affadebf60b6c7a0ee0282d9fe3 (patch) | |
tree | 63006682500e4bf169ea810a7209bbd0bfc3dea6 | |
parent | f975db3e5e5883d4d15a4513dd3d8d725c4e8428 (diff) | |
download | apscheduler-38e257e4b30f6affadebf60b6c7a0ee0282d9fe3.tar.gz |
Added async Redis event broker and made the sync one resistant to connectivity failures
-rw-r--r-- | docs/versionhistory.rst | 5 | ||||
-rw-r--r-- | pyproject.toml | 2 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/async_redis.py | 117 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/redis.py | 87 | ||||
-rw-r--r-- | tests/test_eventbrokers.py | 20 |
5 files changed, 193 insertions, 38 deletions
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 308b5be..7c6e65c 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -4,6 +4,11 @@ Version history To find out how to migrate your application from a previous version of APScheduler, see the :doc:`migration section <migration>`. +**UNRELEASED** + +- Added an async Redis event broker +- Added automatic reconnection to the Redis event brokers (sync and async) + **4.0.0a1** This was a major rewrite/redesign of most parts of the project. See the diff --git a/pyproject.toml b/pyproject.toml index c89d6c1..e9eb5fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,7 @@ test = [ "pytest-freezegun", "pytest-lazy-fixture", "pytest-mock", - "redis[hiredis] >= 4.0", + "redis[hiredis] >= 4.4.0rc1", "sqlalchemy >= 1.4.22", "trio", ] diff --git a/src/apscheduler/eventbrokers/async_redis.py b/src/apscheduler/eventbrokers/async_redis.py new file mode 100644 index 0000000..faa5438 --- /dev/null +++ b/src/apscheduler/eventbrokers/async_redis.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import anyio +import attrs +import tenacity +from redis import ConnectionError +from redis.asyncio import Redis, RedisCluster +from redis.asyncio.client import PubSub +from redis.asyncio.connection import ConnectionPool + +from .. import RetrySettings +from .._events import Event +from ..abc import Serializer +from ..serializers.json import JSONSerializer +from .async_local import LocalAsyncEventBroker +from .base import DistributedEventBrokerMixin + + +@attrs.define(eq=False) +class AsyncRedisEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): + """ + An event broker that uses a Redis server to broadcast events. + + Requires the redis_ library to be installed. + + .. _redis: https://pypi.org/project/redis/ + + :param client: an asynchronous Redis client + :param serializer: the serializer used to (de)serialize events for transport + :param channel: channel on which to send the messages + :param retry_settings: Tenacity settings for retrying operations in case of a + broker connectivity problem + :param stop_check_interval: interval on which the channel listener should check if + it + values mean slower reaction time but less CPU use) + """ + + client: Redis | RedisCluster + serializer: Serializer = attrs.field(factory=JSONSerializer) + channel: str = attrs.field(kw_only=True, default="apscheduler") + retry_settings: RetrySettings = attrs.field(default=RetrySettings()) + stop_check_interval: float = attrs.field(kw_only=True, default=1) + _stopped: bool = attrs.field(init=False, default=True) + + @classmethod + def from_url(cls, url: str, **kwargs) -> AsyncRedisEventBroker: + """ + Create a new event broker from a URL. + + :param url: a Redis URL (```redis://...```) + :param kwargs: keyword arguments to pass to the initializer of this class + :return: the newly created event broker + + """ + pool = ConnectionPool.from_url(url) + client = Redis(connection_pool=pool) + return cls(client, **kwargs) + + def _retry(self) -> tenacity.AsyncRetrying: + def after_attempt(retry_state: tenacity.RetryCallState) -> None: + self._logger.warning( + f"{self.__class__.__name__}: connection failure " + f"(attempt {retry_state.attempt_number}): " + f"{retry_state.outcome.exception()}", + ) + + return tenacity.AsyncRetrying( + stop=self.retry_settings.stop, + wait=self.retry_settings.wait, + retry=tenacity.retry_if_exception_type(ConnectionError), + after=after_attempt, + sleep=anyio.sleep, + reraise=True, + ) + + async def start(self) -> None: + await super().start() + pubsub = self.client.pubsub() + try: + await pubsub.subscribe(self.channel) + except Exception: + await self.stop(force=True) + raise + + self._stopped = False + self._task_group.start_soon( + self._listen_messages, pubsub, name="Redis subscriber" + ) + + async def stop(self, *, force: bool = False) -> None: + self._stopped = True + await super().stop(force=force) + + async def _listen_messages(self, pubsub: PubSub) -> None: + while not self._stopped: + try: + async for attempt in self._retry(): + with attempt: + msg = await pubsub.get_message( + ignore_subscribe_messages=True, + timeout=self.stop_check_interval, + ) + + if msg and isinstance(msg["data"], bytes): + event = self.reconstitute_event(msg["data"]) + if event is not None: + await self.publish_local(event) + except Exception: + self._logger.exception(f"{self.__class__.__name__} listener crashed") + await pubsub.close() + raise + + async def publish(self, event: Event) -> None: + notification = self.generate_notification(event) + async for attempt in self._retry(): + with attempt: + await self.client.publish(self.channel, notification) diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py index 0b7593f..8f6c2c3 100644 --- a/src/apscheduler/eventbrokers/redis.py +++ b/src/apscheduler/eventbrokers/redis.py @@ -1,11 +1,13 @@ from __future__ import annotations -from concurrent.futures import Future from threading import Thread import attrs -from redis import ConnectionPool, Redis +import tenacity +from redis import ConnectionError, ConnectionPool, Redis, RedisCluster +from redis.client import PubSub +from .. import RetrySettings from .._events import Event from ..abc import Serializer from ..serializers.json import JSONSerializer @@ -25,16 +27,16 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): :param client: a (synchronous) Redis client :param serializer: the serializer used to (de)serialize events for transport :param channel: channel on which to send the messages - :param message_poll_interval: interval on which to poll for new messages (higher + :param stop_check_interval: interval on which to poll for new messages (higher values mean slower reaction time but less CPU use) """ - client: Redis + client: Redis | RedisCluster serializer: Serializer = attrs.field(factory=JSONSerializer) channel: str = attrs.field(kw_only=True, default="apscheduler") - message_poll_interval: float = attrs.field(kw_only=True, default=0.05) + stop_check_interval: float = attrs.field(kw_only=True, default=1) + retry_settings: RetrySettings = attrs.field(default=RetrySettings()) _stopped: bool = attrs.field(init=False, default=True) - _ready_future: Future[None] = attrs.field(init=False) _thread: Thread = attrs.field(init=False) @classmethod @@ -47,19 +49,38 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): :return: the newly created event broker """ - pool = ConnectionPool.from_url(url, **kwargs) + pool = ConnectionPool.from_url(url) client = Redis(connection_pool=pool) - return cls(client) + return cls(client, **kwargs) + + def _retry(self) -> tenacity.Retrying: + def after_attempt(retry_state: tenacity.RetryCallState) -> None: + self._logger.warning( + f"{self.__class__.__name__}: connection failure " + f"(attempt {retry_state.attempt_number}): " + f"{retry_state.outcome.exception()}", + ) + + return tenacity.Retrying( + stop=self.retry_settings.stop, + wait=self.retry_settings.wait, + retry=tenacity.retry_if_exception_type(ConnectionError), + after=after_attempt, + reraise=True, + ) def start(self) -> None: + pubsub = self.client.pubsub() + pubsub.subscribe(self.channel) self._stopped = False - self._ready_future = Future() + super().start() self._thread = Thread( - target=self._listen_messages, daemon=True, name="Redis subscriber" + target=self._listen_messages, + args=[pubsub], + daemon=True, + name="Redis subscriber", ) self._thread.start() - self._ready_future.result(10) - super().start() def stop(self, *, force: bool = False) -> None: self._stopped = True @@ -68,33 +89,29 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): super().stop(force=force) - def _listen_messages(self) -> None: + def _listen_messages(self, pubsub: PubSub) -> None: while not self._stopped: try: - pubsub = self.client.pubsub() - pubsub.subscribe(self.channel) - except BaseException as exc: - if not self._ready_future.done(): - self._ready_future.set_exception(exc) - + for attempt in self._retry(): + with attempt: + msg = pubsub.get_message( + ignore_subscribe_messages=True, + timeout=self.stop_check_interval, + ) + + if msg and isinstance(msg["data"], bytes): + event = self.reconstitute_event(msg["data"]) + if event is not None: + self.publish_local(event) + except Exception: + self._logger.exception(f"{self.__class__.__name__} listener crashed") + pubsub.close() raise - else: - if not self._ready_future.done(): - self._ready_future.set_result(None) - try: - while not self._stopped: - msg = pubsub.get_message(timeout=self.message_poll_interval) - if msg and isinstance(msg["data"], bytes): - event = self.reconstitute_event(msg["data"]) - if event is not None: - self.publish_local(event) - except BaseException: - self._logger.exception("Subscriber crashed") - raise - finally: - pubsub.close() + pubsub.close() def publish(self, event: Event) -> None: notification = self.generate_notification(event) - self.client.publish(self.channel, notification) + for attempt in self._retry(): + with attempt: + self.client.publish(self.channel, notification) diff --git a/tests/test_eventbrokers.py b/tests/test_eventbrokers.py index d2974c0..451b848 100644 --- a/tests/test_eventbrokers.py +++ b/tests/test_eventbrokers.py @@ -34,8 +34,19 @@ def local_async_broker() -> AsyncEventBroker: def redis_broker(serializer: Serializer) -> EventBroker: from apscheduler.eventbrokers.redis import RedisEventBroker - broker = RedisEventBroker.from_url("redis://localhost:6379") - broker.serializer = serializer + broker = RedisEventBroker.from_url( + "redis://localhost:6379", serializer=serializer, stop_check_interval=0.05 + ) + return broker + + +@pytest.fixture +async def async_redis_broker(serializer: Serializer) -> AsyncEventBroker: + from apscheduler.eventbrokers.async_redis import AsyncRedisEventBroker + + broker = AsyncRedisEventBroker.from_url( + "redis://localhost:6379", serializer=serializer, stop_check_interval=0.05 + ) return broker @@ -87,6 +98,11 @@ def broker(request: SubRequest) -> Generator[EventBroker, Any, None]: id="asyncpg", marks=[pytest.mark.external_service], ), + pytest.param( + lazy_fixture("async_redis_broker"), + id="async_redis", + marks=[pytest.mark.external_service], + ), ] ) async def raw_async_broker(request: SubRequest) -> AsyncEventBroker: |