summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/local.py
blob: 25ff2dd5ab4b26ad1da1dd8960d8bc55b302291b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from __future__ import annotations

from asyncio import iscoroutinefunction
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
from threading import Lock
from typing import Any, Callable, Iterable

import attrs

from .._events import Event
from ..abc import EventBroker, Subscription
from .base import BaseEventBroker


@attrs.define(eq=False)
class LocalEventBroker(EventBroker, BaseEventBroker):
    """
    Synchronous, local event broker.

    This event broker only broadcasts within the process it runs in, and is therefore
    not suitable for multi-node or multiprocess use cases.

    Does not serialize events.

    Callbacks are not invoked on the publishing thread: each matching callback is
    submitted to a :class:`~concurrent.futures.ThreadPoolExecutor` with a single
    worker, which is created by :meth:`start` and torn down by :meth:`stop`.
    """

    # Created in start(), shut down and removed again in stop(); the attribute does
    # not exist outside of that window.
    _executor: ThreadPoolExecutor = attrs.field(init=False)
    # NOTE(review): not referenced by any method in this class -- possibly a
    # leftover; confirm before removing.
    _exit_stack: ExitStack = attrs.field(init=False)
    # Serializes access to the subscription registry between subscribe(),
    # unsubscribe() and publish_local().
    _subscriptions_lock: Lock = attrs.field(init=False, factory=Lock)

    def start(self) -> None:
        """Create the single-worker thread pool used for delivering events."""
        self._executor = ThreadPoolExecutor(1)

    def stop(self, *, force: bool = False) -> None:
        """
        Shut down the delivery thread pool.

        Calling this on a broker that has not been started (or has already been
        stopped) is a no-op.

        :param force: if ``True``, don't wait for the executor to finish its work
            before returning

        """
        executor = getattr(self, "_executor", None)
        if executor is None:
            # Never started, or already stopped: nothing to shut down
            return

        executor.shutdown(wait=not force)
        del self._executor

    def subscribe(
        self,
        callback: Callable[[Event], Any],
        event_types: Iterable[type[Event]] | None = None,
        *,
        one_shot: bool = False,
    ) -> Subscription:
        """
        Register a callback to be called when a matching event is published.

        :param callback: callable that receives the event as its only argument
        :param event_types: event classes to deliver to the callback, or ``None``
            to deliver events of every type
        :param one_shot: if ``True``, the subscription is removed once the first
            matching event has been scheduled for delivery
        :return: the subscription object created by the base class
        :raises ValueError: if ``callback`` is a coroutine function

        """
        if iscoroutinefunction(callback):
            raise ValueError(
                "Coroutine functions are not supported as callbacks on a synchronous "
                "event source"
            )

        with self._subscriptions_lock:
            return super().subscribe(callback, event_types, one_shot=one_shot)

    def unsubscribe(self, token: object) -> None:
        """
        Remove the subscription identified by ``token``.

        :param token: the token of the subscription to remove

        """
        with self._subscriptions_lock:
            super().unsubscribe(token)

    def publish(self, event: Event) -> None:
        """
        Publish an event to the subscribers of this broker.

        This broker is local-only, so this simply delegates to
        :meth:`publish_local`.

        :param event: the event to publish

        """
        self.publish_local(event)

    def publish_local(self, event: Event) -> None:
        """
        Schedule delivery of an event to every matching subscriber.

        A subscription matches if it has no event type filter or if the exact class
        of ``event`` is among its event types. Delivery happens on the executor
        created by :meth:`start`, so the broker must have been started first.

        :param event: the event to deliver

        """
        event_type = type(event)
        # The lock is held for the whole pass so that subscribe()/unsubscribe(),
        # which take the same lock, cannot run while the registry is being iterated
        with self._subscriptions_lock:
            one_shot_tokens: list[object] = []
            for subscription in self._subscriptions.values():
                if (
                    subscription.event_types is None
                    or event_type in subscription.event_types
                ):
                    self._executor.submit(
                        self._deliver_event, subscription.callback, event
                    )
                    if subscription.one_shot:
                        one_shot_tokens.append(subscription.token)

            # One-shot subscriptions are removed only after the loop, presumably
            # because removal mutates the mapping being iterated above. The base
            # class implementation is called directly since the lock is already held
            # here and our own unsubscribe() would try to acquire it again.
            for token in one_shot_tokens:
                super().unsubscribe(token)

    def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None:
        """
        Call a subscriber callback with the event, logging any error it raises.

        This is the function submitted to the executor by :meth:`publish_local`.

        :param func: the subscriber callback
        :param event: the event to pass to the callback

        """
        try:
            func(event)
        except BaseException:
            # Nothing is re-raised: the failure is only logged, along with the
            # traceback, under the event's class name
            self._logger.exception(
                "Error delivering %s event", event.__class__.__name__
            )