summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/redis.py
blob: 8f6c2c3704b8bbe9d4212a87b9086a1285dd39ea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from __future__ import annotations

from threading import Thread

import attrs
import tenacity
from redis import ConnectionError, ConnectionPool, Redis, RedisCluster
from redis.client import PubSub

from .. import RetrySettings
from .._events import Event
from ..abc import Serializer
from ..serializers.json import JSONSerializer
from .base import DistributedEventBrokerMixin
from .local import LocalEventBroker


@attrs.define(eq=False)
class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
    """
    An event broker that uses a Redis server to broadcast events.

    Requires the redis_ library to be installed.

    .. _redis: https://pypi.org/project/redis/

    :param client: a (synchronous) Redis client
    :param serializer: the serializer used to (de)serialize events for transport
    :param channel: channel on which to send the messages
    :param stop_check_interval: interval on which to poll for new messages (higher
        values mean slower reaction time but less CPU use); this is passed as the
        timeout of each poll, so it also bounds how long the listener thread takes
        to notice that the broker has been stopped
    :param retry_settings: the ``stop`` and ``wait`` policies applied when a Redis
        operation fails with a connection error
    """

    client: Redis | RedisCluster
    serializer: Serializer = attrs.field(factory=JSONSerializer)
    channel: str = attrs.field(kw_only=True, default="apscheduler")
    stop_check_interval: float = attrs.field(kw_only=True, default=1)
    retry_settings: RetrySettings = attrs.field(default=RetrySettings())
    # Flag polled by the listener thread; True whenever the broker is not running
    _stopped: bool = attrs.field(init=False, default=True)
    # Listener thread; only assigned by start(), so it may be unset before that
    _thread: Thread = attrs.field(init=False)

    @classmethod
    def from_url(cls, url: str, **kwargs) -> RedisEventBroker:
        """
        Create a new event broker from a URL.

        :param url: a Redis URL (``redis://...``)
        :param kwargs: keyword arguments to pass to the initializer of this class
        :return: the newly created event broker
        """
        pool = ConnectionPool.from_url(url)
        client = Redis(connection_pool=pool)
        return cls(client, **kwargs)

    def _retry(self) -> tenacity.Retrying:
        """
        Build a retry controller for operations that talk to the Redis server.

        Only :class:`redis.ConnectionError` triggers a retry; the stop and wait
        policies come from ``retry_settings``. Once the policy gives up, the last
        exception is re-raised as is (``reraise=True``).

        :return: a fresh :class:`tenacity.Retrying` instance
        """

        def after_attempt(retry_state: tenacity.RetryCallState) -> None:
            self._logger.warning(
                "%s: connection failure (attempt %s): %s",
                self.__class__.__name__,
                retry_state.attempt_number,
                retry_state.outcome.exception(),
            )

        return tenacity.Retrying(
            stop=self.retry_settings.stop,
            wait=self.retry_settings.wait,
            retry=tenacity.retry_if_exception_type(ConnectionError),
            after=after_attempt,
            reraise=True,
        )

    def start(self) -> None:
        """
        Subscribe to the channel and start the background listener thread.

        If subscribing or starting the parent broker fails, the pub/sub object is
        closed and the broker is marked as stopped again before the exception
        propagates.
        """
        pubsub = self.client.pubsub()
        try:
            pubsub.subscribe(self.channel)
            self._stopped = False
            super().start()
        except BaseException:
            # Don't leak the pub/sub connection or leave a misleading "running"
            # flag behind when startup fails half way
            self._stopped = True
            pubsub.close()
            raise

        self._thread = Thread(
            target=self._listen_messages,
            args=[pubsub],
            daemon=True,
            name="Redis subscriber",
        )
        self._thread.start()

    def stop(self, *, force: bool = False) -> None:
        """
        Signal the listener thread to exit and stop the parent broker.

        :param force: if ``True``, don't wait for the listener thread to finish;
            otherwise wait up to 5 seconds for it
        """
        self._stopped = True
        # The thread attribute is unset if start() never got far enough to create
        # it, so look it up defensively instead of raising AttributeError here
        thread = getattr(self, "_thread", None)
        if not force and thread is not None and thread.is_alive():
            thread.join(5)

        super().stop(force=force)

    def _listen_messages(self, pubsub: PubSub) -> None:
        """
        Poll the subscription and publish received events locally until stopped.

        Runs in the listener thread. Each poll is retried on connection errors
        according to ``retry_settings``. Any other failure (or running out of
        retries) is logged and re-raised, ending the thread. The pub/sub object
        is closed on every exit path.

        :param pubsub: the subscribed pub/sub object to read messages from
        """
        try:
            while not self._stopped:
                msg = None
                for attempt in self._retry():
                    with attempt:
                        msg = pubsub.get_message(
                            ignore_subscribe_messages=True,
                            timeout=self.stop_check_interval,
                        )

                # Only binary payloads are event notifications
                if msg and isinstance(msg["data"], bytes):
                    event = self.reconstitute_event(msg["data"])
                    if event is not None:
                        self.publish_local(event)
        except Exception:
            self._logger.exception("%s listener crashed", self.__class__.__name__)
            raise
        finally:
            pubsub.close()

    def publish(self, event: Event) -> None:
        """
        Serialize the event and broadcast it on the configured Redis channel.

        The publish call is retried on connection errors according to
        ``retry_settings``.

        :param event: the event to publish
        """
        notification = self.generate_notification(event)
        for attempt in self._retry():
            with attempt:
                self.client.publish(self.channel, notification)