diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-04 12:18:15 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-04 12:18:15 +0300 |
commit | e47af449940a29452a88f61c173e153139ed5b14 (patch) | |
tree | 504ba0631a0fe061a44d76ad244d98fb06340b1b | |
parent | 2a0eac6fb42391cd5f356a94fa77b901d4b6bcd8 (diff) | |
download | apscheduler-e47af449940a29452a88f61c173e153139ed5b14.tar.gz |
Fixed resource warnings in the asyncpg event broker when the listener task is cancelled
-rw-r--r-- | src/apscheduler/eventbrokers/asyncpg.py | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py index 27b1bed..7e3045e 100644 --- a/src/apscheduler/eventbrokers/asyncpg.py +++ b/src/apscheduler/eventbrokers/asyncpg.py @@ -145,13 +145,25 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): self._logger.info("Stopped event broker") async def _listen_notifications(self, *, task_status=TASK_STATUS_IGNORED) -> None: - def callback( + conn: Connection + + def listen_callback( connection: Connection, pid: int, channel: str, payload: str ) -> None: event = self.reconstitute_event_str(payload) if event is not None: self._task_group.start_soon(self.publish_local, event) + async def close_connection() -> None: + if not conn.is_closed(): + with move_on_after(3, shield=True): + await conn.close() + + async def unsubscribe() -> None: + if not conn.is_closed(): + with move_on_after(3, shield=True): + await conn.remove_listener(self.channel, listen_callback) + task_started_sent = False send, receive = create_memory_object_stream(100, str) while True: @@ -160,13 +172,11 @@ class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): with attempt: conn = await self.connection_factory() - exit_stack.push_async_callback(conn.close) + exit_stack.push_async_callback(close_connection) self._logger.info("Connection established") try: - await conn.add_listener(self.channel, callback) - exit_stack.push_async_callback( - conn.remove_listener, self.channel, callback - ) + await conn.add_listener(self.channel, listen_callback) + exit_stack.push_async_callback(unsubscribe) if not task_started_sent: task_status.started(send) task_started_sent = True |