1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
from __future__ import annotations
from threading import Thread
import attrs
import tenacity
from redis import ConnectionError, ConnectionPool, Redis, RedisCluster
from redis.client import PubSub
from .. import RetrySettings
from .._events import Event
from ..abc import Serializer
from ..serializers.json import JSONSerializer
from .base import DistributedEventBrokerMixin
from .local import LocalEventBroker
@attrs.define(eq=False)
class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
    """
    An event broker that uses a Redis server to broadcast events.

    Requires the redis_ library to be installed.

    .. _redis: https://pypi.org/project/redis/

    :param client: a (synchronous) Redis client
    :param serializer: the serializer used to (de)serialize events for transport
    :param channel: channel on which to send the messages
    :param stop_check_interval: timeout passed to each poll for a new message, and
        therefore also how often the listener re-checks whether it has been asked
        to stop (higher values mean slower reaction time but less CPU use)
    :param retry_settings: supplies the tenacity ``stop`` and ``wait`` policies that
        are applied when a Redis ``ConnectionError`` is raised while polling or
        publishing
    """

    client: Redis | RedisCluster
    serializer: Serializer = attrs.field(factory=JSONSerializer)
    channel: str = attrs.field(kw_only=True, default="apscheduler")
    stop_check_interval: float = attrs.field(kw_only=True, default=1)
    retry_settings: RetrySettings = attrs.field(default=RetrySettings())
    # Flag polled by the listener loop; True until start() and again after stop()
    _stopped: bool = attrs.field(init=False, default=True)
    # Subscriber thread; stays None until start() has successfully launched it, so
    # that stop() can tell "never started" apart from "running"
    _thread: Thread | None = attrs.field(init=False, default=None)

    @classmethod
    def from_url(cls, url: str, **kwargs) -> RedisEventBroker:
        """
        Create a new event broker from a URL.

        :param url: a Redis URL (``redis://...``)
        :param kwargs: keyword arguments to pass to the initializer of this class
        :return: the newly created event broker
        """
        pool = ConnectionPool.from_url(url)
        client = Redis(connection_pool=pool)
        return cls(client, **kwargs)

    def _retry(self) -> tenacity.Retrying:
        """
        Build a fresh tenacity retry controller for one Redis operation.

        Only ``ConnectionError`` triggers a retry; the stop and wait policies come
        from ``retry_settings``. Once the stop policy gives up, the last exception
        is re-raised as-is (``reraise=True``).

        :return: a ``tenacity.Retrying`` object to iterate over
        """

        def after_attempt(retry_state: tenacity.RetryCallState) -> None:
            # Log every failed attempt so that connection trouble is visible even
            # when a later attempt succeeds
            self._logger.warning(
                f"{self.__class__.__name__}: connection failure "
                f"(attempt {retry_state.attempt_number}): "
                f"{retry_state.outcome.exception()}",
            )

        return tenacity.Retrying(
            stop=self.retry_settings.stop,
            wait=self.retry_settings.wait,
            retry=tenacity.retry_if_exception_type(ConnectionError),
            after=after_attempt,
            reraise=True,
        )

    def start(self) -> None:
        """
        Subscribe to the channel and launch the background listener thread.

        If any step fails, the pub/sub object is closed and the broker is marked as
        stopped again before the exception propagates, so a failed start does not
        leak the subscription.
        """
        pubsub = self.client.pubsub()
        try:
            pubsub.subscribe(self.channel)
            self._stopped = False
            super().start()
            thread = Thread(
                target=self._listen_messages,
                args=[pubsub],
                daemon=True,
                name="Redis subscriber",
            )
            thread.start()
            # Only remember the thread once it is actually running; joining a
            # thread that was never started would raise RuntimeError in stop()
            self._thread = thread
        except BaseException:
            # The listener (which normally owns and closes the pubsub) never got to
            # run, so the cleanup has to happen here
            self._stopped = True
            pubsub.close()
            raise

    def stop(self, *, force: bool = False) -> None:
        """
        Signal the listener thread to exit and stop the broker.

        :param force: if ``True``, do not wait for the listener thread to finish;
            the flag is also forwarded to the parent class's ``stop()``
        """
        self._stopped = True
        thread = self._thread
        # The thread is None when start() was never called or did not complete
        if thread is not None and not force:
            # Bounded wait: the listener notices the flag after its current poll
            thread.join(5)

        super().stop(force=force)

    def _listen_messages(self, pubsub: PubSub) -> None:
        """
        Poll the subscription until stopped, publishing received events locally.

        Runs in the subscriber thread. Connection errors while polling are retried
        according to :meth:`_retry`; any other failure (or exhausted retries) is
        logged and re-raised, ending the thread. The pub/sub object is closed on
        every exit path.

        :param pubsub: the already subscribed pub/sub object to read from
        """
        try:
            while not self._stopped:
                msg = None
                for attempt in self._retry():
                    with attempt:
                        msg = pubsub.get_message(
                            ignore_subscribe_messages=True,
                            timeout=self.stop_check_interval,
                        )

                # Only byte payloads are treated as serialized events
                if msg and isinstance(msg["data"], bytes):
                    event = self.reconstitute_event(msg["data"])
                    if event is not None:
                        self.publish_local(event)
        except Exception:
            self._logger.exception(f"{self.__class__.__name__} listener crashed")
            raise
        finally:
            pubsub.close()

    def publish(self, event: Event) -> None:
        """
        Broadcast an event to all subscribers of the channel.

        The payload is whatever ``generate_notification()`` returns for the event.
        Connection errors are retried according to :meth:`_retry`.

        :param event: the event to publish
        """
        notification = self.generate_notification(event)
        for attempt in self._retry():
            with attempt:
                self.client.publish(self.channel, notification)
|