summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-05-02 10:26:29 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-03 21:34:40 +0300
commit38e257e4b30f6affadebf60b6c7a0ee0282d9fe3 (patch)
tree63006682500e4bf169ea810a7209bbd0bfc3dea6 /src
parentf975db3e5e5883d4d15a4513dd3d8d725c4e8428 (diff)
downloadapscheduler-38e257e4b30f6affadebf60b6c7a0ee0282d9fe3.tar.gz
Added async Redis event broker and made the sync one resistant to connectivity failures
Diffstat (limited to 'src')
-rw-r--r--src/apscheduler/eventbrokers/async_redis.py117
-rw-r--r--src/apscheduler/eventbrokers/redis.py87
2 files changed, 169 insertions, 35 deletions
diff --git a/src/apscheduler/eventbrokers/async_redis.py b/src/apscheduler/eventbrokers/async_redis.py
new file mode 100644
index 0000000..faa5438
--- /dev/null
+++ b/src/apscheduler/eventbrokers/async_redis.py
@@ -0,0 +1,117 @@
+from __future__ import annotations
+
+import anyio
+import attrs
+import tenacity
+from redis import ConnectionError
+from redis.asyncio import Redis, RedisCluster
+from redis.asyncio.client import PubSub
+from redis.asyncio.connection import ConnectionPool
+
+from .. import RetrySettings
+from .._events import Event
+from ..abc import Serializer
+from ..serializers.json import JSONSerializer
+from .async_local import LocalAsyncEventBroker
+from .base import DistributedEventBrokerMixin
+
+
+@attrs.define(eq=False)
+class AsyncRedisEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
+ """
+ An event broker that uses a Redis server to broadcast events.
+
+ Requires the redis_ library to be installed.
+
+ .. _redis: https://pypi.org/project/redis/
+
+ :param client: an asynchronous Redis client
+ :param serializer: the serializer used to (de)serialize events for transport
+ :param channel: channel on which to send the messages
+ :param retry_settings: Tenacity settings for retrying operations in case of a
+ broker connectivity problem
+ :param stop_check_interval: interval on which the channel listener should check if
+ it
+ values mean slower reaction time but less CPU use)
+ """
+
+ client: Redis | RedisCluster
+ serializer: Serializer = attrs.field(factory=JSONSerializer)
+ channel: str = attrs.field(kw_only=True, default="apscheduler")
+ retry_settings: RetrySettings = attrs.field(default=RetrySettings())
+ stop_check_interval: float = attrs.field(kw_only=True, default=1)
+ _stopped: bool = attrs.field(init=False, default=True)
+
+ @classmethod
+ def from_url(cls, url: str, **kwargs) -> AsyncRedisEventBroker:
+ """
+ Create a new event broker from a URL.
+
+ :param url: a Redis URL (```redis://...```)
+ :param kwargs: keyword arguments to pass to the initializer of this class
+ :return: the newly created event broker
+
+ """
+ pool = ConnectionPool.from_url(url)
+ client = Redis(connection_pool=pool)
+ return cls(client, **kwargs)
+
+ def _retry(self) -> tenacity.AsyncRetrying:
+ def after_attempt(retry_state: tenacity.RetryCallState) -> None:
+ self._logger.warning(
+ f"{self.__class__.__name__}: connection failure "
+ f"(attempt {retry_state.attempt_number}): "
+ f"{retry_state.outcome.exception()}",
+ )
+
+ return tenacity.AsyncRetrying(
+ stop=self.retry_settings.stop,
+ wait=self.retry_settings.wait,
+ retry=tenacity.retry_if_exception_type(ConnectionError),
+ after=after_attempt,
+ sleep=anyio.sleep,
+ reraise=True,
+ )
+
+ async def start(self) -> None:
+ await super().start()
+ pubsub = self.client.pubsub()
+ try:
+ await pubsub.subscribe(self.channel)
+ except Exception:
+ await self.stop(force=True)
+ raise
+
+ self._stopped = False
+ self._task_group.start_soon(
+ self._listen_messages, pubsub, name="Redis subscriber"
+ )
+
+ async def stop(self, *, force: bool = False) -> None:
+ self._stopped = True
+ await super().stop(force=force)
+
+ async def _listen_messages(self, pubsub: PubSub) -> None:
+ while not self._stopped:
+ try:
+ async for attempt in self._retry():
+ with attempt:
+ msg = await pubsub.get_message(
+ ignore_subscribe_messages=True,
+ timeout=self.stop_check_interval,
+ )
+
+ if msg and isinstance(msg["data"], bytes):
+ event = self.reconstitute_event(msg["data"])
+ if event is not None:
+ await self.publish_local(event)
+ except Exception:
+ self._logger.exception(f"{self.__class__.__name__} listener crashed")
+ await pubsub.close()
+ raise
+
+ async def publish(self, event: Event) -> None:
+ notification = self.generate_notification(event)
+ async for attempt in self._retry():
+ with attempt:
+ await self.client.publish(self.channel, notification)
diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py
index 0b7593f..8f6c2c3 100644
--- a/src/apscheduler/eventbrokers/redis.py
+++ b/src/apscheduler/eventbrokers/redis.py
@@ -1,11 +1,13 @@
from __future__ import annotations
-from concurrent.futures import Future
from threading import Thread
import attrs
-from redis import ConnectionPool, Redis
+import tenacity
+from redis import ConnectionError, ConnectionPool, Redis, RedisCluster
+from redis.client import PubSub
+from .. import RetrySettings
from .._events import Event
from ..abc import Serializer
from ..serializers.json import JSONSerializer
@@ -25,16 +27,16 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
:param client: a (synchronous) Redis client
:param serializer: the serializer used to (de)serialize events for transport
:param channel: channel on which to send the messages
- :param message_poll_interval: interval on which to poll for new messages (higher
+ :param stop_check_interval: interval on which to poll for new messages (higher
values mean slower reaction time but less CPU use)
"""
- client: Redis
+ client: Redis | RedisCluster
serializer: Serializer = attrs.field(factory=JSONSerializer)
channel: str = attrs.field(kw_only=True, default="apscheduler")
- message_poll_interval: float = attrs.field(kw_only=True, default=0.05)
+ stop_check_interval: float = attrs.field(kw_only=True, default=1)
+ retry_settings: RetrySettings = attrs.field(default=RetrySettings())
_stopped: bool = attrs.field(init=False, default=True)
- _ready_future: Future[None] = attrs.field(init=False)
_thread: Thread = attrs.field(init=False)
@classmethod
@@ -47,19 +49,38 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
:return: the newly created event broker
"""
- pool = ConnectionPool.from_url(url, **kwargs)
+ pool = ConnectionPool.from_url(url)
client = Redis(connection_pool=pool)
- return cls(client)
+ return cls(client, **kwargs)
+
+ def _retry(self) -> tenacity.Retrying:
+ def after_attempt(retry_state: tenacity.RetryCallState) -> None:
+ self._logger.warning(
+ f"{self.__class__.__name__}: connection failure "
+ f"(attempt {retry_state.attempt_number}): "
+ f"{retry_state.outcome.exception()}",
+ )
+
+ return tenacity.Retrying(
+ stop=self.retry_settings.stop,
+ wait=self.retry_settings.wait,
+ retry=tenacity.retry_if_exception_type(ConnectionError),
+ after=after_attempt,
+ reraise=True,
+ )
def start(self) -> None:
+ pubsub = self.client.pubsub()
+ pubsub.subscribe(self.channel)
self._stopped = False
- self._ready_future = Future()
+ super().start()
self._thread = Thread(
- target=self._listen_messages, daemon=True, name="Redis subscriber"
+ target=self._listen_messages,
+ args=[pubsub],
+ daemon=True,
+ name="Redis subscriber",
)
self._thread.start()
- self._ready_future.result(10)
- super().start()
def stop(self, *, force: bool = False) -> None:
self._stopped = True
@@ -68,33 +89,29 @@ class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
super().stop(force=force)
- def _listen_messages(self) -> None:
+ def _listen_messages(self, pubsub: PubSub) -> None:
while not self._stopped:
try:
- pubsub = self.client.pubsub()
- pubsub.subscribe(self.channel)
- except BaseException as exc:
- if not self._ready_future.done():
- self._ready_future.set_exception(exc)
-
+ for attempt in self._retry():
+ with attempt:
+ msg = pubsub.get_message(
+ ignore_subscribe_messages=True,
+ timeout=self.stop_check_interval,
+ )
+
+ if msg and isinstance(msg["data"], bytes):
+ event = self.reconstitute_event(msg["data"])
+ if event is not None:
+ self.publish_local(event)
+ except Exception:
+ self._logger.exception(f"{self.__class__.__name__} listener crashed")
+ pubsub.close()
raise
- else:
- if not self._ready_future.done():
- self._ready_future.set_result(None)
- try:
- while not self._stopped:
- msg = pubsub.get_message(timeout=self.message_poll_interval)
- if msg and isinstance(msg["data"], bytes):
- event = self.reconstitute_event(msg["data"])
- if event is not None:
- self.publish_local(event)
- except BaseException:
- self._logger.exception("Subscriber crashed")
- raise
- finally:
- pubsub.close()
+ pubsub.close()
def publish(self, event: Event) -> None:
notification = self.generate_notification(event)
- self.client.publish(self.channel, notification)
+ for attempt in self._retry():
+ with attempt:
+ self.client.publish(self.channel, notification)