summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-09-03 20:27:15 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-03 21:34:40 +0300
commit22b41e7bc632ba57ae0a625aec29d616eb87b43b (patch)
treebd9292410f3f169c4f6934b9ff33e2fd77152778
parent02dd92a159b9ce99fe725949ab955d02f9450a75 (diff)
downloadapscheduler-22b41e7bc632ba57ae0a625aec29d616eb87b43b.tar.gz
Improved the MQTT event broker
* Connection success, failure and disconnection events are now logged * The client parameter is now optional
-rw-r--r--docs/versionhistory.rst1
-rw-r--r--src/apscheduler/eventbrokers/mqtt.py23
2 files changed, 21 insertions, 3 deletions
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 4c91c58..7f997f1 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -12,6 +12,7 @@ APScheduler, see the :doc:`migration section <migration>`.
- Replaced ``from_asyncpg_pool()`` with ``from_dsn()`` in the asyncpg event broker
- Changed ``from_async_sqla_engine()`` in asyncpg event broker to only copy the
connection options instead of directly using the engine
+- Simplified the MQTT event broker by providing a default ``client`` instance if omitted
**4.0.0a1**
diff --git a/src/apscheduler/eventbrokers/mqtt.py b/src/apscheduler/eventbrokers/mqtt.py
index ab5dc88..10ac605 100644
--- a/src/apscheduler/eventbrokers/mqtt.py
+++ b/src/apscheduler/eventbrokers/mqtt.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import sys
from concurrent.futures import Future
from typing import Any
@@ -33,7 +34,7 @@ class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
:param publish_qos: MQTT QoS to use for publishing messages
"""
- client: Client
+ client: Client = attrs.field(factory=Client)
serializer: Serializer = attrs.field(factory=JSONSerializer)
host: str = attrs.field(kw_only=True, default="localhost")
port: int = attrs.field(kw_only=True, default=1883)
@@ -45,8 +46,9 @@ class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
def start(self) -> None:
super().start()
self._ready_future = Future()
- self.client.enable_logger(self._logger)
self.client.on_connect = self._on_connect
+ self.client.on_connect_fail = self._on_connect_fail
+ self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
self.client.on_subscribe = self._on_subscribe
self.client.connect(self.host, self.port)
@@ -66,15 +68,30 @@ class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
rc: ReasonCodes | int,
properties: Properties | None = None,
) -> None:
+ self._logger.info(f"{self.__class__.__name__}: Connected")
try:
client.subscribe(self.topic, qos=self.subscribe_qos)
except Exception as exc:
self._ready_future.set_exception(exc)
raise
+ def _on_connect_fail(self, client: Client, userdata: Any) -> None:
+ exc = sys.exc_info()[1]
+ self._logger.error(f"{self.__class__.__name__}: Connection failed ({exc})")
+
+ def _on_disconnect(
+ self,
+ client: Client,
+ userdata: Any,
+ rc: ReasonCodes | int,
+ properties: Properties | None = None,
+ ) -> None:
+ self._logger.error(f"{self.__class__.__name__}: Disconnected (code: {rc})")
+
def _on_subscribe(
- self, client: Client, userdata: Any, mid, granted_qos: list[int]
+ self, client: Client, userdata: Any, mid: int, granted_qos: list[int]
) -> None:
+ self._logger.info(f"{self.__class__.__name__}: Subscribed")
self._ready_future.set_result(None)
def _on_message(self, client: Client, userdata: Any, msg: MQTTMessage) -> None: