summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/mqtt.py
blob: ab5dc88890694330faceeffe0ccc0d9e87c2ee9a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from __future__ import annotations

from concurrent.futures import Future
from typing import Any

import attrs
from paho.mqtt.client import Client, MQTTMessage
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCodes

from .._events import Event
from ..abc import Serializer
from ..serializers.json import JSONSerializer
from .base import DistributedEventBrokerMixin
from .local import LocalEventBroker


@attrs.define(eq=False)
class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
    """
    An event broker that uses an MQTT (v3.1 or v5) broker to broadcast events.

    Requires the paho-mqtt_ library to be installed.

    .. _paho-mqtt: https://pypi.org/project/paho-mqtt/

    :param client: a paho-mqtt client
    :param serializer: the serializer used to (de)serialize events for transport
    :param host: host name or IP address to connect to
    :param port: TCP port number to connect to
    :param topic: topic on which to send the messages
    :param subscribe_qos: MQTT QoS to use for subscribing messages
    :param publish_qos: MQTT QoS to use for publishing messages
    """

    client: Client
    serializer: Serializer = attrs.field(factory=JSONSerializer)
    host: str = attrs.field(kw_only=True, default="localhost")
    port: int = attrs.field(kw_only=True, default=1883)
    topic: str = attrs.field(kw_only=True, default="apscheduler")
    subscribe_qos: int = attrs.field(kw_only=True, default=0)
    publish_qos: int = attrs.field(kw_only=True, default=0)
    # Completed by the paho callbacks once the topic subscription has been
    # acknowledged (or failed); start() blocks on it. Re-created on each start().
    _ready_future: Future[None] = attrs.field(init=False)

    def start(self) -> None:
        """
        Connect to the MQTT broker and wait until the topic has been subscribed to.

        Installs this broker's callbacks on the client, connects to
        ``host``:``port``, starts the client's network loop and then blocks for
        up to 10 seconds until the subscription is acknowledged.

        :raises concurrent.futures.TimeoutError: if the subscription was not
            acknowledged within 10 seconds
        :raises Exception: whatever ``client.subscribe()`` raised in the connect
            callback, if it failed
        """
        super().start()
        # A fresh future per start() so the broker can be started again after a stop
        self._ready_future = Future()
        self.client.enable_logger(self._logger)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_subscribe = self._on_subscribe
        self.client.connect(self.host, self.port)
        self.client.loop_start()
        self._ready_future.result(10)

    def stop(self, *, force: bool = False) -> None:
        """
        Disconnect from the MQTT broker and stop the client's network loop.

        :param force: passed through to the client's ``loop_stop()``
        """
        self.client.disconnect()
        self.client.loop_stop(force=force)
        super().stop()

    def _on_connect(
        self,
        client: Client,
        userdata: Any,
        flags: dict[str, Any],
        rc: ReasonCodes | int,
        properties: Properties | None = None,
    ) -> None:
        """
        Subscribe to the configured topic once the client reports a connection.

        If subscribing raises, the error is handed to the thread waiting in
        :meth:`start` and then re-raised.
        """
        try:
            client.subscribe(self.topic, qos=self.subscribe_qos)
        except Exception as exc:
            # paho invokes this callback again on every reconnect, at which point
            # the future is already completed; completing it a second time would
            # raise InvalidStateError and mask the original exception
            if not self._ready_future.done():
                self._ready_future.set_exception(exc)

            raise

    def _on_subscribe(
        self,
        client: Client,
        userdata: Any,
        mid: int,
        granted_qos: list[int] | list[ReasonCodes],
        properties: Properties | None = None,
    ) -> None:
        """
        Mark the broker as ready once the subscription has been acknowledged.

        ``properties`` is only passed by paho-mqtt when the client uses MQTT v5;
        it defaults to ``None`` so the same callback serves v3.x clients too.
        """
        # After a reconnect the topic is subscribed again and this callback fires
        # again; the future may only be completed once
        if not self._ready_future.done():
            self._ready_future.set_result(None)

    def _on_message(self, client: Client, userdata: Any, msg: MQTTMessage) -> None:
        """
        Turn an incoming MQTT message back into an event and publish it locally.

        Messages for which ``reconstitute_event()`` returns ``None`` are dropped.
        """
        event = self.reconstitute_event(msg.payload)
        if event is not None:
            self.publish_local(event)

    def publish(self, event: Event) -> None:
        """
        Publish the given event on the MQTT topic using ``publish_qos``.

        :param event: the event to broadcast
        """
        notification = self.generate_notification(event)
        self.client.publish(self.topic, notification, qos=self.publish_qos)