1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
from __future__ import annotations
from concurrent.futures import Future
from typing import Any
import attrs
from paho.mqtt.client import Client, MQTTMessage
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCodes
from .._events import Event
from ..abc import Serializer
from ..serializers.json import JSONSerializer
from .base import DistributedEventBrokerMixin
from .local import LocalEventBroker
@attrs.define(eq=False)
class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
"""
An event broker that uses an MQTT (v3.1 or v5) broker to broadcast events.
Requires the paho-mqtt_ library to be installed.
.. _paho-mqtt: https://pypi.org/project/paho-mqtt/
:param client: a paho-mqtt client
:param serializer: the serializer used to (de)serialize events for transport
:param host: host name or IP address to connect to
:param port: TCP port number to connect to
:param topic: topic on which to send the messages
:param subscribe_qos: MQTT QoS to use for subscribing messages
:param publish_qos: MQTT QoS to use for publishing messages
"""
client: Client
serializer: Serializer = attrs.field(factory=JSONSerializer)
host: str = attrs.field(kw_only=True, default="localhost")
port: int = attrs.field(kw_only=True, default=1883)
topic: str = attrs.field(kw_only=True, default="apscheduler")
subscribe_qos: int = attrs.field(kw_only=True, default=0)
publish_qos: int = attrs.field(kw_only=True, default=0)
_ready_future: Future[None] = attrs.field(init=False)
def start(self) -> None:
super().start()
self._ready_future = Future()
self.client.enable_logger(self._logger)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_subscribe = self._on_subscribe
self.client.connect(self.host, self.port)
self.client.loop_start()
self._ready_future.result(10)
def stop(self, *, force: bool = False) -> None:
self.client.disconnect()
self.client.loop_stop(force=force)
super().stop()
def _on_connect(
self,
client: Client,
userdata: Any,
flags: dict[str, Any],
rc: ReasonCodes | int,
properties: Properties | None = None,
) -> None:
try:
client.subscribe(self.topic, qos=self.subscribe_qos)
except Exception as exc:
self._ready_future.set_exception(exc)
raise
def _on_subscribe(
self, client: Client, userdata: Any, mid, granted_qos: list[int]
) -> None:
self._ready_future.set_result(None)
def _on_message(self, client: Client, userdata: Any, msg: MQTTMessage) -> None:
event = self.reconstitute_event(msg.payload)
if event is not None:
self.publish_local(event)
def publish(self, event: Event) -> None:
notification = self.generate_notification(event)
self.client.publish(self.topic, notification, qos=self.publish_qos)
|