summaryrefslogtreecommitdiff
path: root/src/blinker/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/blinker/base.py')
-rw-r--r--src/blinker/base.py111
1 files changed, 75 insertions, 36 deletions
diff --git a/src/blinker/base.py b/src/blinker/base.py
index 8d41721..997fef5 100644
--- a/src/blinker/base.py
+++ b/src/blinker/base.py
@@ -7,12 +7,17 @@ each manages its own receivers and message emission.
The :func:`signal` function provides singleton behavior for named signals.
"""
+import typing as t
from collections import defaultdict
from contextlib import contextmanager
from warnings import warn
from weakref import WeakValueDictionary
+import typing_extensions as te
+
+from blinker._utilities import annotatable_weakref
from blinker._utilities import hashable_identity
+from blinker._utilities import IdentityType
from blinker._utilities import is_coroutine_function
from blinker._utilities import lazy_property
from blinker._utilities import reference
@@ -24,6 +29,14 @@ ANY = symbol("ANY")
ANY.__doc__ = 'Token for "any sender".'
ANY_ID = 0
+T_callable = t.TypeVar("T_callable", bound=t.Callable)
+
+T = t.TypeVar("T")
+P = te.ParamSpec("P")
+
+AsyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]]
+SyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]]
+
class Signal:
"""A notification emitter."""
@@ -33,7 +46,7 @@ class Signal:
ANY = ANY
@lazy_property
- def receiver_connected(self):
+ def receiver_connected(self) -> "Signal":
"""Emitted after each :meth:`connect`.
The signal sender is the signal instance, and the :meth:`connect`
@@ -45,7 +58,7 @@ class Signal:
return Signal(doc="Emitted after a receiver connects.")
@lazy_property
- def receiver_disconnected(self):
+ def receiver_disconnected(self) -> "Signal":
"""Emitted after :meth:`disconnect`.
The sender is the signal instance, and the :meth:`disconnect` arguments
@@ -68,7 +81,7 @@ class Signal:
"""
return Signal(doc="Emitted after a receiver disconnects.")
- def __init__(self, doc=None):
+ def __init__(self, doc: t.Optional[str] = None) -> None:
"""
:param doc: optional. If provided, will be assigned to the signal's
__doc__ attribute.
@@ -82,13 +95,17 @@ class Signal:
#: internal :class:`Signal` implementation, however the boolean value
#: of the mapping is useful as an extremely efficient check to see if
#: any receivers are connected to the signal.
- self.receivers = {}
+ self.receivers: t.Dict[
+ IdentityType, t.Union[t.Callable, annotatable_weakref]
+ ] = {}
self.is_muted = False
- self._by_receiver = defaultdict(set)
- self._by_sender = defaultdict(set)
- self._weak_senders = {}
+ self._by_receiver: t.Dict[IdentityType, t.Set[IdentityType]] = defaultdict(set)
+ self._by_sender: t.Dict[IdentityType, t.Set[IdentityType]] = defaultdict(set)
+ self._weak_senders: t.Dict[IdentityType, annotatable_weakref] = {}
- def connect(self, receiver, sender=ANY, weak=True):
+ def connect(
+ self, receiver: T_callable, sender: t.Any = ANY, weak: bool = True
+ ) -> T_callable:
"""Connect *receiver* to signal events sent by *sender*.
:param receiver: A callable. Will be invoked by :meth:`send` with
@@ -108,11 +125,13 @@ class Signal:
"""
receiver_id = hashable_identity(receiver)
+ receiver_ref: t.Union[annotatable_weakref, T_callable]
if weak:
receiver_ref = reference(receiver, self._cleanup_receiver)
receiver_ref.receiver_id = receiver_id
else:
receiver_ref = receiver
+ sender_id: IdentityType
if sender is ANY:
sender_id = ANY_ID
else:
@@ -153,7 +172,9 @@ class Signal:
raise e
return receiver
- def connect_via(self, sender, weak=False):
+ def connect_via(
+ self, sender: t.Any, weak: bool = False
+ ) -> t.Callable[[T_callable], T_callable]:
"""Connect the decorated function as a receiver for *sender*.
:param sender: Any object or :obj:`ANY`. The decorated function
@@ -175,14 +196,16 @@ class Signal:
"""
- def decorator(fn):
+ def decorator(fn: T_callable) -> T_callable:
self.connect(fn, sender, weak)
return fn
return decorator
@contextmanager
- def connected_to(self, receiver, sender=ANY):
+ def connected_to(
+ self, receiver: t.Callable, sender: t.Any = ANY
+ ) -> t.Generator[None, None, None]:
"""Execute a block with the signal temporarily connected to *receiver*.
:param receiver: a receiver callable
@@ -212,7 +235,7 @@ class Signal:
self.disconnect(receiver)
@contextmanager
- def muted(self):
+ def muted(self) -> t.Generator[None, None, None]:
"""Context manager for temporarily disabling signal.
Useful for test purposes.
"""
@@ -224,7 +247,9 @@ class Signal:
finally:
self.is_muted = False
- def temporarily_connected_to(self, receiver, sender=ANY):
+ def temporarily_connected_to(
+ self, receiver: t.Callable, sender: t.Any = ANY
+ ) -> t.ContextManager[None]:
"""An alias for :meth:`connected_to`.
:param receiver: a receiver callable
@@ -243,7 +268,12 @@ class Signal:
)
return self.connected_to(receiver, sender)
- def send(self, *sender, _async_wrapper=None, **kwargs):
+ def send(
+ self,
+ *sender: t.Any,
+ _async_wrapper: t.Optional[AsyncWrapperType] = None,
+ **kwargs: t.Any,
+ ) -> t.List[t.Tuple[t.Callable, t.Any]]:
"""Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
@@ -266,10 +296,15 @@ class Signal:
if _async_wrapper is None:
raise RuntimeError("Cannot send to a coroutine function")
receiver = _async_wrapper(receiver)
- results.append((receiver, receiver(sender, **kwargs)))
+ results.append((receiver, receiver(sender, **kwargs))) # type: ignore
return results
- async def send_async(self, *sender, _sync_wrapper=None, **kwargs):
+ async def send_async(
+ self,
+ *sender: t.Any,
+ _sync_wrapper: t.Optional[SyncWrapperType] = None,
+ **kwargs: t.Any,
+ ) -> t.List[t.Tuple[t.Callable, t.Any]]:
"""Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
@@ -291,8 +326,8 @@ class Signal:
if not is_coroutine_function(receiver):
if _sync_wrapper is None:
raise RuntimeError("Cannot send to a non-coroutine function")
- receiver = _sync_wrapper(receiver)
- results.append((receiver, await receiver(sender, **kwargs)))
+ receiver = _sync_wrapper(receiver) # type: ignore
+ results.append((receiver, await receiver(sender, **kwargs))) # type: ignore
return results
def _extract_sender(self, sender):
@@ -318,7 +353,7 @@ class Signal:
sender = sender[0]
return sender
- def has_receivers_for(self, sender):
+ def has_receivers_for(self, sender: t.Any) -> bool:
"""True if there is probably a receiver for *sender*.
Performs an optimistic check only. Does not guarantee that all
@@ -334,7 +369,9 @@ class Signal:
return False
return hashable_identity(sender) in self._by_sender
- def receivers_for(self, sender):
+ def receivers_for(
+ self, sender: t.Any
+ ) -> t.Generator[t.Union[t.Callable, annotatable_weakref], None, None]:
"""Iterate all live receivers listening for *sender*."""
# TODO: test receivers_for(ANY)
if self.receivers:
@@ -353,9 +390,10 @@ class Signal:
self._disconnect(receiver_id, ANY_ID)
continue
receiver = strong
+ receiver = t.cast(t.Union[t.Callable, annotatable_weakref], receiver)
yield receiver
- def disconnect(self, receiver, sender=ANY):
+ def disconnect(self, receiver: t.Callable, sender: t.Any = ANY) -> None:
"""Disconnect *receiver* from this signal's events.
:param receiver: a previously :meth:`connected<connect>` callable
@@ -364,6 +402,7 @@ class Signal:
to disconnect from all senders. Defaults to ``ANY``.
"""
+ sender_id: IdentityType
if sender is ANY:
sender_id = ANY_ID
else:
@@ -377,7 +416,7 @@ class Signal:
):
self.receiver_disconnected.send(self, receiver=receiver, sender=sender)
- def _disconnect(self, receiver_id, sender_id):
+ def _disconnect(self, receiver_id: IdentityType, sender_id: IdentityType) -> None:
if sender_id == ANY_ID:
if self._by_receiver.pop(receiver_id, False):
for bucket in self._by_sender.values():
@@ -387,19 +426,19 @@ class Signal:
self._by_sender[sender_id].discard(receiver_id)
self._by_receiver[receiver_id].discard(sender_id)
- def _cleanup_receiver(self, receiver_ref):
+ def _cleanup_receiver(self, receiver_ref: annotatable_weakref) -> None:
"""Disconnect a receiver from all senders."""
- self._disconnect(receiver_ref.receiver_id, ANY_ID)
+ self._disconnect(t.cast(IdentityType, receiver_ref.receiver_id), ANY_ID)
- def _cleanup_sender(self, sender_ref):
+ def _cleanup_sender(self, sender_ref: annotatable_weakref) -> None:
"""Disconnect all receivers from a sender."""
- sender_id = sender_ref.sender_id
+ sender_id = t.cast(IdentityType, sender_ref.sender_id)
assert sender_id != ANY_ID
self._weak_senders.pop(sender_id, None)
for receiver_id in self._by_sender.pop(sender_id, ()):
self._by_receiver[receiver_id].discard(sender_id)
- def _cleanup_bookkeeping(self):
+ def _cleanup_bookkeeping(self) -> None:
"""Prune unused sender/receiver bookkeeping. Not threadsafe.
Connecting & disconnecting leave behind a small amount of bookkeeping
@@ -425,7 +464,7 @@ class Signal:
if not bucket:
mapping.pop(_id, None)
- def _clear_state(self):
+ def _clear_state(self) -> None:
"""Throw away all signal state. Useful for unit tests."""
self._weak_senders.clear()
self.receivers.clear()
@@ -456,13 +495,13 @@ call signature. This global signal is planned to be removed in 1.6.
class NamedSignal(Signal):
"""A named generic notification emitter."""
- def __init__(self, name, doc=None):
+ def __init__(self, name: str, doc: t.Optional[str] = None) -> None:
Signal.__init__(self, doc)
#: The name of this signal.
self.name = name
- def __repr__(self):
+ def __repr__(self) -> str:
base = Signal.__repr__(self)
return f"{base[:-1]}; {self.name!r}>"
@@ -470,16 +509,16 @@ class NamedSignal(Signal):
class Namespace(dict):
"""A mapping of signal names to signals."""
- def signal(self, name, doc=None):
+ def signal(self, name: str, doc: t.Optional[str] = None) -> NamedSignal:
"""Return the :class:`NamedSignal` *name*, creating it if required.
Repeated calls to this function will return the same signal object.
"""
try:
- return self[name]
+ return self[name] # type: ignore
except KeyError:
- return self.setdefault(name, NamedSignal(name, doc))
+ return self.setdefault(name, NamedSignal(name, doc)) # type: ignore
class WeakNamespace(WeakValueDictionary):
@@ -493,16 +532,16 @@ class WeakNamespace(WeakValueDictionary):
"""
- def signal(self, name, doc=None):
+ def signal(self, name: str, doc: t.Optional[str] = None) -> NamedSignal:
"""Return the :class:`NamedSignal` *name*, creating it if required.
Repeated calls to this function will return the same signal object.
"""
try:
- return self[name]
+ return self[name] # type: ignore
except KeyError:
- return self.setdefault(name, NamedSignal(name, doc))
+ return self.setdefault(name, NamedSignal(name, doc)) # type: ignore
signal = Namespace().signal