1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
from __future__ import annotations
import anyio
import attrs
import tenacity
from redis import ConnectionError
from redis.asyncio import Redis, RedisCluster
from redis.asyncio.client import PubSub
from redis.asyncio.connection import ConnectionPool
from .. import RetrySettings
from .._events import Event
from ..abc import Serializer
from ..serializers.json import JSONSerializer
from .async_local import LocalAsyncEventBroker
from .base import DistributedEventBrokerMixin
@attrs.define(eq=False)
class AsyncRedisEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
    """
    An event broker that uses a Redis server to broadcast events.

    Requires the redis_ library to be installed.

    .. _redis: https://pypi.org/project/redis/

    :param client: an asynchronous Redis client
    :param serializer: the serializer used to (de)serialize events for transport
    :param channel: channel on which to send the messages
    :param retry_settings: Tenacity settings for retrying operations in case of a
        broker connectivity problem
    :param stop_check_interval: interval (in seconds) on which the channel listener
        should check if it should stop (higher values mean slower reaction time but
        less CPU use)
    """

    client: Redis | RedisCluster
    serializer: Serializer = attrs.field(factory=JSONSerializer)
    channel: str = attrs.field(kw_only=True, default="apscheduler")
    # NOTE: the default ``RetrySettings()`` instance is created once, when the class
    # body is evaluated, and is therefore shared by every broker that does not pass
    # its own settings.
    retry_settings: RetrySettings = attrs.field(default=RetrySettings())
    stop_check_interval: float = attrs.field(kw_only=True, default=1)
    # True until start() has subscribed to the channel; set back to True by stop().
    # The listener loop polls this flag to know when to exit.
    _stopped: bool = attrs.field(init=False, default=True)

    @classmethod
    def from_url(cls, url: str, **kwargs) -> AsyncRedisEventBroker:
        """
        Create a new event broker from a URL.

        :param url: a Redis URL (``redis://...``)
        :param kwargs: keyword arguments to pass to the initializer of this class
        :return: the newly created event broker

        """
        pool = ConnectionPool.from_url(url)
        client = Redis(connection_pool=pool)
        return cls(client, **kwargs)

    def _retry(self) -> tenacity.AsyncRetrying:
        """
        Build a Tenacity retry controller based on :attr:`retry_settings`.

        Only :class:`redis.ConnectionError` triggers a retry; every failed attempt
        is logged as a warning, and the last exception is re-raised once the stop
        condition is reached.

        :return: a fresh retry controller to be used with ``async for``

        """

        def after_attempt(retry_state: tenacity.RetryCallState) -> None:
            self._logger.warning(
                f"{self.__class__.__name__}: connection failure "
                f"(attempt {retry_state.attempt_number}): "
                f"{retry_state.outcome.exception()}",
            )

        return tenacity.AsyncRetrying(
            stop=self.retry_settings.stop,
            wait=self.retry_settings.wait,
            retry=tenacity.retry_if_exception_type(ConnectionError),
            after=after_attempt,
            # Use anyio's sleep so waiting between attempts works on any backend
            sleep=anyio.sleep,
            reraise=True,
        )

    async def start(self) -> None:
        """
        Start the broker, subscribe to the channel and launch the listener task.

        If subscribing fails, the broker is force-stopped, the half-initialized
        subscription is closed and the original exception is re-raised.

        """
        await super().start()
        pubsub = self.client.pubsub()
        try:
            await pubsub.subscribe(self.channel)
        except Exception:
            try:
                await self.stop(force=True)
            finally:
                # The listener task was never started, so nobody else would release
                # the connection held by this subscription
                await pubsub.close()

            raise

        self._stopped = False
        self._task_group.start_soon(
            self._listen_messages, pubsub, name="Redis subscriber"
        )

    async def stop(self, *, force: bool = False) -> None:
        """
        Signal the listener loop to exit and stop the broker.

        :param force: passed through to the superclass ``stop()``

        """
        self._stopped = True
        await super().stop(force=force)

    async def _listen_messages(self, pubsub: PubSub) -> None:
        """
        Receive messages from the channel and publish them as local events.

        Runs until :attr:`_stopped` becomes true, an unrecoverable error occurs or
        the task is cancelled. The subscription is closed in all of those cases.

        :param pubsub: the subscription object, already subscribed to the channel

        """
        try:
            while not self._stopped:
                # The timeout bounds how long we wait before re-checking _stopped
                async for attempt in self._retry():
                    with attempt:
                        msg = await pubsub.get_message(
                            ignore_subscribe_messages=True,
                            timeout=self.stop_check_interval,
                        )

                if msg and isinstance(msg["data"], bytes):
                    event = self.reconstitute_event(msg["data"])
                    if event is not None:
                        await self.publish_local(event)
        except Exception:
            self._logger.exception(f"{self.__class__.__name__} listener crashed")
            raise
        finally:
            # Close the subscription on every exit path (graceful stop, crash and
            # cancellation alike). The shield lets the close complete even when
            # the enclosing scope has already been cancelled.
            with anyio.CancelScope(shield=True):
                await pubsub.close()

    async def publish(self, event: Event) -> None:
        """
        Broadcast an event on the Redis channel, retrying on connection failures.

        :param event: the event to publish

        """
        notification = self.generate_notification(event)
        async for attempt in self._retry():
            with attempt:
                await self.client.publish(self.channel, notification)
|