diff options
author | Jason Barnett <jason.w.barnett@gmail.com> | 2023-01-02 14:09:15 -0500 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2023-01-05 00:34:46 +0600 |
commit | d3ca0d562e7c3ee9f8b0cfffb3bb798b98c28a84 (patch) | |
tree | c0af9ba8e6a5dc16ae05a979c68382579af1f26b /kombu | |
parent | 54cd277bc34780a5fdb2f4e1883ffeff1dce1ce4 (diff) | |
download | kombu-d3ca0d562e7c3ee9f8b0cfffb3bb798b98c28a84.tar.gz |
azure service bus: add type annotations and use cached property
Diffstat (limited to 'kombu')
-rw-r--r-- | kombu/transport/azureservicebus.py | 53 |
1 files changed, 23 insertions, 30 deletions
diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py index a2fbd662..e7e2c0cc 100644 --- a/kombu/transport/azureservicebus.py +++ b/kombu/transport/azureservicebus.py @@ -57,7 +57,7 @@ from __future__ import annotations import string from queue import Empty -from typing import Any +from typing import Any, Dict, Set import azure.core.exceptions import azure.servicebus.exceptions @@ -87,8 +87,8 @@ class SendReceive: def __init__(self, receiver: ServiceBusReceiver | None = None, sender: ServiceBusSender | None = None): - self.receiver = receiver # type: ServiceBusReceiver - self.sender = sender # type: ServiceBusSender + self.receiver: ServiceBusReceiver = receiver + self.sender: ServiceBusSender = sender def close(self) -> None: if self.receiver: @@ -102,21 +102,19 @@ class SendReceive: class Channel(virtual.Channel): """Azure Service Bus channel.""" - default_wait_time_seconds = 5 # in seconds - default_peek_lock_seconds = 60 # in seconds (default 60, max 300) + default_wait_time_seconds: int = 5 # in seconds + default_peek_lock_seconds: int = 60 # in seconds (default 60, max 300) # in seconds (is the default from service bus repo) - default_uamqp_keep_alive_interval = 30 + default_uamqp_keep_alive_interval: int = 30 # number of retries (is the default from service bus repo) - default_retry_total = 3 + default_retry_total: int = 3 # exponential backoff factor (is the default from service bus repo) - default_retry_backoff_factor = 0.8 + default_retry_backoff_factor: float = 0.8 # Max time to backoff (is the default from service bus repo) - default_retry_backoff_max = 120 - domain_format = 'kombu%(vhost)s' - _queue_service = None # type: ServiceBusClient - _queue_mgmt_service = None # type: ServiceBusAdministrationClient - _queue_cache = {} # type: Dict[str, SendReceive] - _noack_queues = set() # type: Set[str] + default_retry_backoff_max: int = 120 + domain_format: str = 'kombu%(vhost)s' + _queue_cache: Dict[str, SendReceive] = {} + _noack_queues: Set[str] = set() def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -229,7 +227,7 @@ class Channel(virtual.Channel): """Delete queue by name.""" queue = self.entity_name(self.queue_name_prefix + queue) - self._queue_mgmt_service.delete_queue(queue) + self.queue_mgmt_service.delete_queue(queue) send_receive_obj = self._queue_cache.pop(queue, None) if send_receive_obj: send_receive_obj.close() @@ -300,7 +298,7 @@ class Channel(virtual.Channel): return props.total_message_count - def _purge(self, queue): + def _purge(self, queue) -> int: """Delete all current messages in a queue.""" # Azure doesn't provide a purge api yet n = 0 @@ -339,24 +337,19 @@ class Channel(virtual.Channel): if self.connection is not None: self.connection.close_channel(self) - @property + @cached_property def queue_service(self) -> ServiceBusClient: - if self._queue_service is None: - self._queue_service = ServiceBusClient.from_connection_string( - self._connection_string, - retry_total=self.retry_total, - retry_backoff_factor=self.retry_backoff_factor, - retry_backoff_max=self.retry_backoff_max - ) - return self._queue_service + return ServiceBusClient.from_connection_string( + self._connection_string, + retry_total=self.retry_total, + retry_backoff_factor=self.retry_backoff_factor, + retry_backoff_max=self.retry_backoff_max + ) - @property + @cached_property def queue_mgmt_service(self) -> ServiceBusAdministrationClient: - if self._queue_mgmt_service is None: - self._queue_mgmt_service = \ - ServiceBusAdministrationClient.from_connection_string( + return ServiceBusAdministrationClient.from_connection_string( self._connection_string) - return self._queue_mgmt_service @property def conninfo(self): |