diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-05-02 10:26:29 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-03 21:34:40 +0300 |
commit | 38e257e4b30f6affadebf60b6c7a0ee0282d9fe3 (patch) | |
tree | 63006682500e4bf169ea810a7209bbd0bfc3dea6 /src | |
parent | f975db3e5e5883d4d15a4513dd3d8d725c4e8428 (diff) | |
download | apscheduler-38e257e4b30f6affadebf60b6c7a0ee0282d9fe3.tar.gz |
Added async Redis event broker and made the sync one resistant to connectivity failures
Diffstat (limited to 'src')
-rw-r--r-- | src/apscheduler/eventbrokers/async_redis.py | 117 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/redis.py | 87 |
2 files changed, 169 insertions, 35 deletions
diff --git a/src/apscheduler/eventbrokers/async_redis.py b/src/apscheduler/eventbrokers/async_redis.py new file mode 100644 index 0000000..faa5438 --- /dev/null +++ b/src/apscheduler/eventbrokers/async_redis.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import anyio +import attrs +import tenacity +from redis import ConnectionError +from redis.asyncio import Redis, RedisCluster +from redis.asyncio.client import PubSub +from redis.asyncio.connection import ConnectionPool + +from .. import RetrySettings +from .._events import Event +from ..abc import Serializer +from ..serializers.json import JSONSerializer +from .async_local import LocalAsyncEventBroker +from .base import DistributedEventBrokerMixin + + +@attrs.define(eq=False) +class AsyncRedisEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): + """ + An event broker that uses a Redis server to broadcast events. + + Requires the redis_ library to be installed. + + .. _redis: https://pypi.org/project/redis/ + + :param client: an asynchronous Redis client + :param serializer: the serializer used to (de)serialize events for transport + :param channel: channel on which to send the messages + :param retry_settings: Tenacity settings for retrying operations in case of a + broker connectivity problem + :param stop_check_interval: interval on which the channel listener should check if + it + values mean slower reaction time but less CPU use) + """ + + client: Redis | RedisCluster + serializer: Serializer = attrs.field(factory=JSONSerializer) + channel: str = attrs.field(kw_only=True, default="apscheduler") + retry_settings: RetrySettings = attrs.field(default=RetrySettings()) + stop_check_interval: float = attrs.field(kw_only=True, default=1) + _stopped: bool = attrs.field(init=False, default=True) + + @classmethod + def from_url(cls, url: str, **kwargs) -> AsyncRedisEventBroker: + """ + Create a new event broker from a URL. + + :param url: a Redis URL (```redis://...```) + :param kwargs: keyword arguments to pass to the initializer of this class + :return: the newly created event broker + + """ + pool = ConnectionPool.from_url(url) + client = Redis(connection_pool=pool) + return cls(client, **kwargs) + + def _retry(self) -> tenacity.AsyncRetrying: + def after_attempt(retry_state: tenacity.RetryCallState) -> None: + self._logger.warning( + f"{self.__class__.__name__}: connection failure " + f"(attempt {retry_state.attempt_number}): " + f"{retry_state.outcome.exception()}", + ) + + return tenacity.AsyncRetrying( + stop=self.retry_settings.stop, + wait=self.retry_settings.wait, + retry=tenacity.retry_if_exception_type(ConnectionError), + after=after_attempt, + sleep=anyio.sleep, + reraise=True, + ) + + async def start(self) -> None: + await super().start() + pubsub = self.client.pubsub() + try: + await pubsub.subscribe(self.channel) + except Exception: + await self.stop(force=True) + raise + + self._stopped = False + self._task_group.start_soon( + self._listen_messages, pubsub, name="Redis subscriber" + ) + + async def stop(self, *, force: bool = False) -> None: + self._stopped = True + await super().stop(force=force) + + async def _listen_messages(self, pubsub: PubSub) -> None: + while not self._stopped: + try: + async for attempt in self._retry(): + with attempt: + msg = await pubsub.get_message( + ignore_subscribe_messages=True, + timeout=self.stop_check_interval, + ) + + if msg and isinstance(msg["data"], bytes): + event = self.reconstitute_event(msg["data"]) + if event is not None: + await self.publish_local(event) + except Exception: + self._logger.exception(f"{self.__class__.__name__} listener crashed") + await pubsub.close() + raise + + async def publish(self, event: Event) -> None: + notification = self.generate_notification(event) + async for attempt in self._retry(): + with attempt: + await self.client.publish(self.channel, notification) diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py index 0b7593f..8f6c2c3 100644 --- a/src/apscheduler/eventbrokers/redis.py +++ b/src/apscheduler/eventbrokers/redis.py @@ -1,11 +1,13 @@ from __future__ import annotations -from concurrent.futures import Future from threading import Thread import attrs -from redis import ConnectionPool, Redis +import tenacity +from redis import ConnectionError, ConnectionPool, Redis, RedisCluster +from redis.client import PubSub +from .. import RetrySettings from .._events import Event from ..abc import Serializer from ..serializers.json import JSONSerializer @@ -25,16 +27,16 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): :param client: a (synchronous) Redis client :param serializer: the serializer used to (de)serialize events for transport :param channel: channel on which to send the messages - :param message_poll_interval: interval on which to poll for new messages (higher + :param stop_check_interval: interval on which to poll for new messages (higher values mean slower reaction time but less CPU use) """ - client: Redis + client: Redis | RedisCluster serializer: Serializer = attrs.field(factory=JSONSerializer) channel: str = attrs.field(kw_only=True, default="apscheduler") - message_poll_interval: float = attrs.field(kw_only=True, default=0.05) + stop_check_interval: float = attrs.field(kw_only=True, default=1) + retry_settings: RetrySettings = attrs.field(default=RetrySettings()) _stopped: bool = attrs.field(init=False, default=True) - _ready_future: Future[None] = attrs.field(init=False) _thread: Thread = attrs.field(init=False) @classmethod @@ -47,19 +49,38 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): :return: the newly created event broker """ - pool = ConnectionPool.from_url(url, **kwargs) + pool = ConnectionPool.from_url(url) client = Redis(connection_pool=pool) - return cls(client) + return cls(client, **kwargs) + + def _retry(self) -> tenacity.Retrying: + def after_attempt(retry_state: tenacity.RetryCallState) -> None: + self._logger.warning( + f"{self.__class__.__name__}: connection failure " + f"(attempt {retry_state.attempt_number}): " + f"{retry_state.outcome.exception()}", + ) + + return tenacity.Retrying( + stop=self.retry_settings.stop, + wait=self.retry_settings.wait, + retry=tenacity.retry_if_exception_type(ConnectionError), + after=after_attempt, + reraise=True, + ) def start(self) -> None: + pubsub = self.client.pubsub() + pubsub.subscribe(self.channel) self._stopped = False - self._ready_future = Future() + super().start() self._thread = Thread( - target=self._listen_messages, daemon=True, name="Redis subscriber" + target=self._listen_messages, + args=[pubsub], + daemon=True, + name="Redis subscriber", ) self._thread.start() - self._ready_future.result(10) - super().start() def stop(self, *, force: bool = False) -> None: self._stopped = True @@ -68,33 +89,29 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): super().stop(force=force) - def _listen_messages(self) -> None: + def _listen_messages(self, pubsub: PubSub) -> None: while not self._stopped: try: - pubsub = self.client.pubsub() - pubsub.subscribe(self.channel) - except BaseException as exc: - if not self._ready_future.done(): - self._ready_future.set_exception(exc) - + for attempt in self._retry(): + with attempt: + msg = pubsub.get_message( + ignore_subscribe_messages=True, + timeout=self.stop_check_interval, + ) + + if msg and isinstance(msg["data"], bytes): + event = self.reconstitute_event(msg["data"]) + if event is not None: + self.publish_local(event) + except Exception: + self._logger.exception(f"{self.__class__.__name__} listener crashed") + pubsub.close() raise - else: - if not self._ready_future.done(): - self._ready_future.set_result(None) - try: - while not self._stopped: - msg = pubsub.get_message(timeout=self.message_poll_interval) - if msg and isinstance(msg["data"], bytes): - event = self.reconstitute_event(msg["data"]) - if event is not None: - self.publish_local(event) - except BaseException: - self._logger.exception("Subscriber crashed") - raise - finally: - pubsub.close() + pubsub.close() def publish(self, event: Event) -> None: notification = self.generate_notification(event) - self.client.publish(self.channel, notification) + for attempt in self._retry(): + with attempt: + self.client.publish(self.channel, notification) |