From e47af449940a29452a88f61c173e153139ed5b14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 4 Sep 2022 12:18:15 +0300 Subject: Fixed resource warnings in the asyncpg event broker when the listener task is cancelled --- src/apscheduler/eventbrokers/asyncpg.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py index 27b1bed..7e3045e 100644 --- a/src/apscheduler/eventbrokers/asyncpg.py +++ b/src/apscheduler/eventbrokers/asyncpg.py @@ -145,13 +145,25 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): self._logger.info("Stopped event broker") async def _listen_notifications(self, *, task_status=TASK_STATUS_IGNORED) -> None: - def callback( + conn: Connection + + def listen_callback( connection: Connection, pid: int, channel: str, payload: str ) -> None: event = self.reconstitute_event_str(payload) if event is not None: self._task_group.start_soon(self.publish_local, event) + async def close_connection() -> None: + if not conn.is_closed(): + with move_on_after(3, shield=True): + await conn.close() + + async def unsubscribe() -> None: + if not conn.is_closed(): + with move_on_after(3, shield=True): + await conn.remove_listener(self.channel, listen_callback) + task_started_sent = False send, receive = create_memory_object_stream(100, str) while True: @@ -160,13 +172,11 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): with attempt: conn = await self.connection_factory() - exit_stack.push_async_callback(conn.close) + exit_stack.push_async_callback(close_connection) self._logger.info("Connection established") try: - await conn.add_listener(self.channel, callback) - exit_stack.push_async_callback( - conn.remove_listener, self.channel, callback - ) + await conn.add_listener(self.channel, listen_callback) + exit_stack.push_async_callback(unsubscribe) if not task_started_sent: task_status.started(send) task_started_sent = True -- cgit v1.2.1