path: root/src/apscheduler/eventbrokers/redis.py
from __future__ import annotations

from asyncio import CancelledError
from contextlib import AsyncExitStack

import anyio
import attrs
import tenacity
from redis import ConnectionError
from redis.asyncio import Redis, RedisCluster
from redis.asyncio.client import PubSub
from redis.asyncio.connection import ConnectionPool

from .._events import Event
from .base import BaseExternalEventBroker


@attrs.define(eq=False)
class RedisEventBroker(BaseExternalEventBroker):
    """
    An event broker that uses a Redis server to broadcast events.

    Requires the redis_ library to be installed.

    .. _redis: https://pypi.org/project/redis/

    :param client: an asynchronous Redis client
    :param channel: channel on which to send the messages
    :param stop_check_interval: interval (in seconds) at which the channel listener
        checks whether it should stop (higher values mean slower reaction time but
        less CPU use)
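
    Usage sketch (assumes a reachable Redis server; the URL and channel name are
    placeholders)::

        from redis.asyncio import Redis
        from apscheduler.eventbrokers.redis import RedisEventBroker

        client = Redis.from_url("redis://localhost:6379")
        event_broker = RedisEventBroker(client, channel="my-app-events")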
    """

    client: Redis | RedisCluster
    channel: str = attrs.field(kw_only=True, default="apscheduler")
    stop_check_interval: float = attrs.field(kw_only=True, default=1)
    _stopped: bool = attrs.field(init=False, default=True)

    @classmethod
    def from_url(cls, url: str, **kwargs) -> RedisEventBroker:
        """
        Create a new event broker from a URL.

        :param url: a Redis URL (``redis://...``)
        :param kwargs: keyword arguments to pass to the initializer of this class
        :return: the newly created event broker
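
        Example (placeholder URL)::

            event_broker = RedisEventBroker.from_url("redis://localhost:6379/0")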

        """
        pool = ConnectionPool.from_url(url)
        client = Redis(connection_pool=pool)
        return cls(client, **kwargs)

    def _retry(self) -> tenacity.AsyncRetrying:
        def after_attempt(retry_state: tenacity.RetryCallState) -> None:
            self._logger.warning(
                f"{self.__class__.__name__}: connection failure "
                f"(attempt {retry_state.attempt_number}): "
                f"{retry_state.outcome.exception()}",
            )

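        # Retry only on Redis connection errors, with the stop/wait policies taken
        # from the broker's retry settings and anyio.sleep used for the waits
        # between attempts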
        return tenacity.AsyncRetrying(
            stop=self.retry_settings.stop,
            wait=self.retry_settings.wait,
            retry=tenacity.retry_if_exception_type(ConnectionError),
            after=after_attempt,
            sleep=anyio.sleep,
            reraise=True,
        )

    async def start(self, exit_stack: AsyncExitStack) -> None:
        await super().start(exit_stack)
        pubsub = self.client.pubsub()
        await pubsub.subscribe(self.channel)

        self._stopped = False
        exit_stack.callback(setattr, self, "_stopped", True)
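        # Run the subscriber loop as a background task for the lifetime of the broker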
        self._task_group.start_soon(
            self._listen_messages, pubsub, name="Redis subscriber"
        )

    async def _listen_messages(self, pubsub: PubSub) -> None:
        while not self._stopped:
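            # get_message() returns None after stop_check_interval seconds if no
            # message has arrived, letting the loop re-check the _stopped flag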
            try:
                async for attempt in self._retry():
                    with attempt:
                        msg = await pubsub.get_message(
                            ignore_subscribe_messages=True,
                            timeout=self.stop_check_interval,
                        )

                if msg and isinstance(msg["data"], bytes):
                    event = self.reconstitute_event(msg["data"])
                    if event is not None:
                        await self.publish_local(event)
            except Exception as exc:
                # CancelledError is a subclass of Exception in Python 3.7
                if not isinstance(exc, CancelledError):
                    self._logger.exception(
                        f"{self.__class__.__name__} listener crashed"
                    )

                await pubsub.close()
                raise

    async def publish(self, event: Event) -> None:
        notification = self.generate_notification(event)
        async for attempt in self._retry():
            with attempt:
                await self.client.publish(self.channel, notification)