summaryrefslogtreecommitdiff
path: root/kombu
diff options
context:
space:
mode:
authorMarcelo Trylesinski <marcelotryle@gmail.com>2022-05-15 09:17:49 +0200
committerGitHub <noreply@github.com>2022-05-15 13:17:49 +0600
commit0a2f54eac2d57925a448cbb307a74b92f9f370b2 (patch)
tree2acecccf70dbfa5fdd4332be9a86b21be83649ff /kombu
parent2f62ea49db8316c7c777ae329f8865563cf47f6a (diff)
downloadkombu-0a2f54eac2d57925a448cbb307a74b92f9f370b2.tar.gz
Annotate `abstract.py` (#1522)
* Annotate `abstract.py` * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * apply pre-commit * Use quotes * Add typing_extensions as requirement * Add quotes * Add quotes Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Diffstat (limited to 'kombu')
-rw-r--r--kombu/abstract.py58
-rw-r--r--kombu/connection.py16
2 files changed, 52 insertions, 22 deletions
diff --git a/kombu/abstract.py b/kombu/abstract.py
index 3e3483dc..9bafba99 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -3,19 +3,32 @@
from __future__ import annotations
from copy import copy
+from typing import TYPE_CHECKING, Any, Callable, TypeVar
from .connection import maybe_channel
from .exceptions import NotBoundError
from .utils.functional import ChannelPromise
+if TYPE_CHECKING:
+ from kombu.transport.virtual import Channel
+
+
__all__ = ('Object', 'MaybeChannelBound')
+_T = TypeVar("_T")
+_ObjectType = TypeVar("_ObjectType", bound="Object")
+_MaybeChannelBoundType = TypeVar(
+ "_MaybeChannelBoundType", bound="MaybeChannelBound"
+)
+
-def unpickle_dict(cls, kwargs):
+def unpickle_dict(
+ cls: type[_ObjectType], kwargs: dict[str, Any]
+) -> _ObjectType:
return cls(**kwargs)
-def _any(v):
+def _any(v: _T) -> _T:
return v
@@ -25,9 +38,9 @@ class Object:
Supports automatic kwargs->attributes handling, and cloning.
"""
- attrs = ()
+ attrs: tuple[tuple[str, Any], ...] = ()
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
for name, type_ in self.attrs:
value = kwargs.get(name)
if value is not None:
@@ -38,8 +51,8 @@ class Object:
except AttributeError:
setattr(self, name, None)
- def as_dict(self, recurse=False):
- def f(obj, type):
+ def as_dict(self, recurse: bool = False) -> dict[str, Any]:
+ def f(obj: Any, type: Callable[[Any], Any]) -> Any:
if recurse and isinstance(obj, Object):
return obj.as_dict(recurse=True)
return type(obj) if type and obj is not None else obj
@@ -47,31 +60,40 @@ class Object:
attr: f(getattr(self, attr), type) for attr, type in self.attrs
}
- def __reduce__(self):
+ def __reduce__(self: _ObjectType) -> tuple[
+ Callable[[type[_ObjectType], dict[str, Any]], _ObjectType],
+ tuple[type[_ObjectType], dict[str, Any]]
+ ]:
return unpickle_dict, (self.__class__, self.as_dict())
- def __copy__(self):
+ def __copy__(self: _ObjectType) -> _ObjectType:
return self.__class__(**self.as_dict())
class MaybeChannelBound(Object):
"""Mixin for classes that can be bound to an AMQP channel."""
- _channel = None
+ _channel: Channel | None = None
_is_bound = False
#: Defines whether maybe_declare can skip declaring this entity twice.
can_cache_declaration = False
- def __call__(self, channel):
+ def __call__(
+ self: _MaybeChannelBoundType, channel: Channel
+ ) -> _MaybeChannelBoundType:
"""`self(channel) -> self.bind(channel)`."""
return self.bind(channel)
- def bind(self, channel):
+ def bind(
+ self: _MaybeChannelBoundType, channel: Channel
+ ) -> _MaybeChannelBoundType:
"""Create copy of the instance that is bound to a channel."""
return copy(self).maybe_bind(channel)
- def maybe_bind(self, channel):
+ def maybe_bind(
+ self: _MaybeChannelBoundType, channel: Channel
+ ) -> _MaybeChannelBoundType:
"""Bind instance to channel if not already bound."""
if not self.is_bound and channel:
self._channel = maybe_channel(channel)
@@ -79,7 +101,7 @@ class MaybeChannelBound(Object):
self._is_bound = True
return self
- def revive(self, channel):
+ def revive(self, channel: Channel) -> None:
"""Revive channel after the connection has been re-established.
Used by :meth:`~kombu.Connection.ensure`.
@@ -89,13 +111,13 @@ class MaybeChannelBound(Object):
self._channel = channel
self.when_bound()
- def when_bound(self):
+ def when_bound(self) -> None:
"""Callback called when the class is bound."""
- def __repr__(self):
+ def __repr__(self) -> str:
return self._repr_entity(type(self).__name__)
- def _repr_entity(self, item=''):
+ def _repr_entity(self, item: str = '') -> str:
item = item or type(self).__name__
if self.is_bound:
return '<{} bound to chan:{}>'.format(
@@ -103,12 +125,12 @@ class MaybeChannelBound(Object):
return f'<unbound {item}>'
@property
- def is_bound(self):
+ def is_bound(self) -> bool:
"""Flag set if the channel is bound."""
return self._is_bound and self._channel is not None
@property
- def channel(self):
+ def channel(self) -> Channel:
"""Current channel if the object is bound."""
channel = self._channel
if channel is None:
diff --git a/kombu/connection.py b/kombu/connection.py
index 8246db97..793c4d2c 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -4,10 +4,11 @@ from __future__ import annotations
import os
import socket
+import sys
from contextlib import contextmanager
from itertools import count, cycle
from operator import itemgetter
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
try:
from ssl import CERT_NONE
@@ -30,6 +31,13 @@ from .utils.objects import cached_property
from .utils.url import as_url, maybe_sanitize_url, parse_url, quote, urlparse
if TYPE_CHECKING:
+ from kombu.transport.virtual import Channel
+
+ if sys.version_info < (3, 10):
+ from typing_extensions import TypeGuard
+ else:
+ from typing import TypeGuard
+
from types import TracebackType
__all__ = ('Connection', 'ConnectionPool', 'ChannelPool')
@@ -891,7 +899,7 @@ class Connection:
return self._connection
@property
- def default_channel(self):
+ def default_channel(self) -> Channel:
"""Default channel.
Created upon access and closed when the connection is closed.
@@ -1054,7 +1062,7 @@ class ChannelPool(Resource):
return channel
-def maybe_channel(channel):
+def maybe_channel(channel: Channel | Connection) -> Channel:
"""Get channel from object.
Return the default channel if argument is a connection instance,
@@ -1065,5 +1073,5 @@ def maybe_channel(channel):
return channel
-def is_connection(obj):
+def is_connection(obj: Any) -> TypeGuard[Connection]:
return isinstance(obj, Connection)