summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/redis.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/eventbrokers/redis.py')
-rw-r--r--src/apscheduler/eventbrokers/redis.py79
1 files changed, 35 insertions, 44 deletions
diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py
index 6683276..10d2343 100644
--- a/src/apscheduler/eventbrokers/redis.py
+++ b/src/apscheduler/eventbrokers/redis.py
@@ -1,22 +1,22 @@
from __future__ import annotations
-from threading import Thread
+from asyncio import CancelledError
+from contextlib import AsyncExitStack
+import anyio
import attrs
import tenacity
-from redis import ConnectionError, ConnectionPool, Redis, RedisCluster
-from redis.client import PubSub
+from redis import ConnectionError
+from redis.asyncio import Redis, RedisCluster
+from redis.asyncio.client import PubSub
+from redis.asyncio.connection import ConnectionPool
-from .. import RetrySettings
from .._events import Event
-from ..abc import Serializer
-from ..serializers.json import JSONSerializer
-from .base import DistributedEventBrokerMixin
-from .local import LocalEventBroker
+from .base import BaseExternalEventBroker
@attrs.define(eq=False)
-class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
+class RedisEventBroker(BaseExternalEventBroker):
"""
An event broker that uses a Redis server to broadcast events.
@@ -24,8 +24,7 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
.. _redis: https://pypi.org/project/redis/
- :param client: a (synchronous) Redis client
- :param serializer: the serializer used to (de)serialize events for transport
+ :param client: an asynchronous Redis client
:param channel: channel on which to send the messages
:param stop_check_interval: interval (in seconds) on which the channel listener
should check if it should stop (higher values mean slower reaction time but less
@@ -33,12 +32,9 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
"""
client: Redis | RedisCluster
- serializer: Serializer = attrs.field(factory=JSONSerializer)
channel: str = attrs.field(kw_only=True, default="apscheduler")
stop_check_interval: float = attrs.field(kw_only=True, default=1)
- retry_settings: RetrySettings = attrs.field(default=RetrySettings())
_stopped: bool = attrs.field(init=False, default=True)
- _thread: Thread = attrs.field(init=False)
@classmethod
def from_url(cls, url: str, **kwargs) -> RedisEventBroker:
@@ -54,7 +50,7 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
client = Redis(connection_pool=pool)
return cls(client, **kwargs)
- def _retry(self) -> tenacity.Retrying:
+ def _retry(self) -> tenacity.AsyncRetrying:
def after_attempt(retry_state: tenacity.RetryCallState) -> None:
self._logger.warning(
f"{self.__class__.__name__}: connection failure "
@@ -62,40 +58,32 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
f"{retry_state.outcome.exception()}",
)
- return tenacity.Retrying(
+ return tenacity.AsyncRetrying(
stop=self.retry_settings.stop,
wait=self.retry_settings.wait,
retry=tenacity.retry_if_exception_type(ConnectionError),
after=after_attempt,
+ sleep=anyio.sleep,
reraise=True,
)
- def start(self) -> None:
+ async def start(self, exit_stack: AsyncExitStack) -> None:
+ await super().start(exit_stack)
pubsub = self.client.pubsub()
- pubsub.subscribe(self.channel)
+ await pubsub.subscribe(self.channel)
+
self._stopped = False
- super().start()
- self._thread = Thread(
- target=self._listen_messages,
- args=[pubsub],
- daemon=True,
- name="Redis subscriber",
+ exit_stack.callback(setattr, self, "_stopped", True)
+ self._task_group.start_soon(
+ self._listen_messages, pubsub, name="Redis subscriber"
)
- self._thread.start()
-
- def stop(self, *, force: bool = False) -> None:
- self._stopped = True
- if not force:
- self._thread.join(5)
- super().stop(force=force)
-
- def _listen_messages(self, pubsub: PubSub) -> None:
+ async def _listen_messages(self, pubsub: PubSub) -> None:
while not self._stopped:
try:
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
- msg = pubsub.get_message(
+ msg = await pubsub.get_message(
ignore_subscribe_messages=True,
timeout=self.stop_check_interval,
)
@@ -103,16 +91,19 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
if msg and isinstance(msg["data"], bytes):
event = self.reconstitute_event(msg["data"])
if event is not None:
- self.publish_local(event)
- except Exception:
- self._logger.exception(f"{self.__class__.__name__} listener crashed")
- pubsub.close()
+ await self.publish_local(event)
+ except Exception as exc:
+ # CancelledError is a subclass of Exception in Python 3.7
+ if not isinstance(exc, CancelledError):
+ self._logger.exception(
+ f"{self.__class__.__name__} listener crashed"
+ )
+
+ await pubsub.close()
raise
- pubsub.close()
-
- def publish(self, event: Event) -> None:
+ async def publish(self, event: Event) -> None:
notification = self.generate_notification(event)
- for attempt in self._retry():
+ async for attempt in self._retry():
with attempt:
- self.client.publish(self.channel, notification)
+ await self.client.publish(self.channel, notification)