diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-03 20:27:15 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-03 21:34:40 +0300 |
commit | 22b41e7bc632ba57ae0a625aec29d616eb87b43b (patch) | |
tree | bd9292410f3f169c4f6934b9ff33e2fd77152778 | |
parent | 02dd92a159b9ce99fe725949ab955d02f9450a75 (diff) | |
download | apscheduler-22b41e7bc632ba57ae0a625aec29d616eb87b43b.tar.gz |
Improved the MQTT event broker
* Connection success, failure and disconnection events are now logged
* The client parameter is now optional
-rw-r--r-- | docs/versionhistory.rst | 1 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/mqtt.py | 23 |
2 files changed, 21 insertions, 3 deletions
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 4c91c58..7f997f1 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -12,6 +12,7 @@ APScheduler, see the :doc:`migration section <migration>`. - Replaced ``from_asyncpg_pool()`` with ``from_dsn()`` in the asyncpg event broker - Changed ``from_async_sqla_engine()`` in asyncpg event broker to only copy the connection options instead of directly using the engine +- Simplified the MQTT event broker by providing a default ``client`` instance if omitted **4.0.0a1** diff --git a/src/apscheduler/eventbrokers/mqtt.py b/src/apscheduler/eventbrokers/mqtt.py index ab5dc88..10ac605 100644 --- a/src/apscheduler/eventbrokers/mqtt.py +++ b/src/apscheduler/eventbrokers/mqtt.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys from concurrent.futures import Future from typing import Any @@ -33,7 +34,7 @@ class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin): :param publish_qos: MQTT QoS to use for publishing messages """ - client: Client + client: Client = attrs.field(factory=Client) serializer: Serializer = attrs.field(factory=JSONSerializer) host: str = attrs.field(kw_only=True, default="localhost") port: int = attrs.field(kw_only=True, default=1883) @@ -45,8 +46,9 @@ class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin): def start(self) -> None: super().start() self._ready_future = Future() - self.client.enable_logger(self._logger) self.client.on_connect = self._on_connect + self.client.on_connect_fail = self._on_connect_fail + self.client.on_disconnect = self._on_disconnect self.client.on_message = self._on_message self.client.on_subscribe = self._on_subscribe self.client.connect(self.host, self.port) @@ -66,15 +68,30 @@ class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin): rc: ReasonCodes | int, properties: Properties | None = None, ) -> None: + self._logger.info(f"{self.__class__.__name__}: Connected") try: client.subscribe(self.topic, qos=self.subscribe_qos) except Exception as exc: self._ready_future.set_exception(exc) raise + def _on_connect_fail(self, client: Client, userdata: Any) -> None: + exc = sys.exc_info()[1] + self._logger.error(f"{self.__class__.__name__}: Connection failed ({exc})") + + def _on_disconnect( + self, + client: Client, + userdata: Any, + rc: ReasonCodes | int, + properties: Properties | None = None, + ) -> None: + self._logger.error(f"{self.__class__.__name__}: Disconnected (code: {rc})") + def _on_subscribe( - self, client: Client, userdata: Any, mid, granted_qos: list[int] + self, client: Client, userdata: Any, mid: int, granted_qos: list[int] ) -> None: + self._logger.info(f"{self.__class__.__name__}: Subscribed") self._ready_future.set_result(None) def _on_message(self, client: Client, userdata: Any, msg: MQTTMessage) -> None: |