diff options
Diffstat (limited to 'src/apscheduler/eventbrokers/redis.py')
-rw-r--r-- | src/apscheduler/eventbrokers/redis.py | 79 |
1 files changed, 35 insertions, 44 deletions
diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py index 6683276..10d2343 100644 --- a/src/apscheduler/eventbrokers/redis.py +++ b/src/apscheduler/eventbrokers/redis.py @@ -1,22 +1,22 @@ from __future__ import annotations -from threading import Thread +from asyncio import CancelledError +from contextlib import AsyncExitStack +import anyio import attrs import tenacity -from redis import ConnectionError, ConnectionPool, Redis, RedisCluster -from redis.client import PubSub +from redis import ConnectionError +from redis.asyncio import Redis, RedisCluster +from redis.asyncio.client import PubSub +from redis.asyncio.connection import ConnectionPool -from .. import RetrySettings from .._events import Event -from ..abc import Serializer -from ..serializers.json import JSONSerializer -from .base import DistributedEventBrokerMixin -from .local import LocalEventBroker +from .base import BaseExternalEventBroker @attrs.define(eq=False) -class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): +class RedisEventBroker(BaseExternalEventBroker): """ An event broker that uses a Redis server to broadcast events. @@ -24,8 +24,7 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): .. _redis: https://pypi.org/project/redis/ - :param client: a (synchronous) Redis client - :param serializer: the serializer used to (de)serialize events for transport + :param client: an asynchronous Redis client :param channel: channel on which to send the messages :param stop_check_interval: interval (in seconds) on which the channel listener should check if it should stop (higher values mean slower reaction time but less @@ -33,12 +32,9 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): """ client: Redis | RedisCluster - serializer: Serializer = attrs.field(factory=JSONSerializer) channel: str = attrs.field(kw_only=True, default="apscheduler") stop_check_interval: float = attrs.field(kw_only=True, default=1) - retry_settings: RetrySettings = attrs.field(default=RetrySettings()) _stopped: bool = attrs.field(init=False, default=True) - _thread: Thread = attrs.field(init=False) @classmethod def from_url(cls, url: str, **kwargs) -> RedisEventBroker: @@ -54,7 +50,7 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): client = Redis(connection_pool=pool) return cls(client, **kwargs) - def _retry(self) -> tenacity.Retrying: + def _retry(self) -> tenacity.AsyncRetrying: def after_attempt(retry_state: tenacity.RetryCallState) -> None: self._logger.warning( f"{self.__class__.__name__}: connection failure " @@ -62,40 +58,32 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): f"{retry_state.outcome.exception()}", ) - return tenacity.Retrying( + return tenacity.AsyncRetrying( stop=self.retry_settings.stop, wait=self.retry_settings.wait, retry=tenacity.retry_if_exception_type(ConnectionError), after=after_attempt, + sleep=anyio.sleep, reraise=True, ) - def start(self) -> None: + async def start(self, exit_stack: AsyncExitStack) -> None: + await super().start(exit_stack) pubsub = self.client.pubsub() - pubsub.subscribe(self.channel) + await pubsub.subscribe(self.channel) + self._stopped = False - super().start() - self._thread = Thread( - target=self._listen_messages, - args=[pubsub], - daemon=True, - name="Redis subscriber", + exit_stack.callback(setattr, self, "_stopped", True) + self._task_group.start_soon( + self._listen_messages, pubsub, name="Redis subscriber" ) - self._thread.start() - - def stop(self, *, force: bool = False) -> None: - self._stopped = True - if not force: - self._thread.join(5) - super().stop(force=force) - - def _listen_messages(self, pubsub: PubSub) -> None: + async def _listen_messages(self, pubsub: PubSub) -> None: while not self._stopped: try: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: - msg = pubsub.get_message( + msg = await pubsub.get_message( ignore_subscribe_messages=True, timeout=self.stop_check_interval, ) @@ -103,16 +91,19 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin): if msg and isinstance(msg["data"], bytes): event = self.reconstitute_event(msg["data"]) if event is not None: - self.publish_local(event) - except Exception: - self._logger.exception(f"{self.__class__.__name__} listener crashed") - pubsub.close() + await self.publish_local(event) + except Exception as exc: + # CancelledError is a subclass of Exception in Python 3.7 + if not isinstance(exc, CancelledError): + self._logger.exception( + f"{self.__class__.__name__} listener crashed" + ) + + await pubsub.close() raise - pubsub.close() - - def publish(self, event: Event) -> None: + async def publish(self, event: Event) -> None: notification = self.generate_notification(event) - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: - self.client.publish(self.channel, notification) + await self.client.publish(self.channel, notification) |