diff options
Diffstat (limited to 'src/blinker/base.py')
-rw-r--r-- | src/blinker/base.py | 111 |
1 files changed, 75 insertions, 36 deletions
diff --git a/src/blinker/base.py b/src/blinker/base.py index 8d41721..997fef5 100644 --- a/src/blinker/base.py +++ b/src/blinker/base.py @@ -7,12 +7,17 @@ each manages its own receivers and message emission. The :func:`signal` function provides singleton behavior for named signals. """ +import typing as t from collections import defaultdict from contextlib import contextmanager from warnings import warn from weakref import WeakValueDictionary +import typing_extensions as te + +from blinker._utilities import annotatable_weakref from blinker._utilities import hashable_identity +from blinker._utilities import IdentityType from blinker._utilities import is_coroutine_function from blinker._utilities import lazy_property from blinker._utilities import reference @@ -24,6 +29,14 @@ ANY = symbol("ANY") ANY.__doc__ = 'Token for "any sender".' ANY_ID = 0 +T_callable = t.TypeVar("T_callable", bound=t.Callable) + +T = t.TypeVar("T") +P = te.ParamSpec("P") + +AsyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]] +SyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]] + class Signal: """A notification emitter.""" @@ -33,7 +46,7 @@ class Signal: ANY = ANY @lazy_property - def receiver_connected(self): + def receiver_connected(self) -> "Signal": """Emitted after each :meth:`connect`. The signal sender is the signal instance, and the :meth:`connect` @@ -45,7 +58,7 @@ class Signal: return Signal(doc="Emitted after a receiver connects.") @lazy_property - def receiver_disconnected(self): + def receiver_disconnected(self) -> "Signal": """Emitted after :meth:`disconnect`. The sender is the signal instance, and the :meth:`disconnect` arguments @@ -68,7 +81,7 @@ class Signal: """ return Signal(doc="Emitted after a receiver disconnects.") - def __init__(self, doc=None): + def __init__(self, doc: t.Optional[str] = None) -> None: """ :param doc: optional. If provided, will be assigned to the signal's __doc__ attribute. @@ -82,13 +95,17 @@ class Signal: #: internal :class:`Signal` implementation, however the boolean value #: of the mapping is useful as an extremely efficient check to see if #: any receivers are connected to the signal. - self.receivers = {} + self.receivers: t.Dict[ + IdentityType, t.Union[t.Callable, annotatable_weakref] + ] = {} self.is_muted = False - self._by_receiver = defaultdict(set) - self._by_sender = defaultdict(set) - self._weak_senders = {} + self._by_receiver: t.Dict[IdentityType, t.Set[IdentityType]] = defaultdict(set) + self._by_sender: t.Dict[IdentityType, t.Set[IdentityType]] = defaultdict(set) + self._weak_senders: t.Dict[IdentityType, annotatable_weakref] = {} - def connect(self, receiver, sender=ANY, weak=True): + def connect( + self, receiver: T_callable, sender: t.Any = ANY, weak: bool = True + ) -> T_callable: """Connect *receiver* to signal events sent by *sender*. :param receiver: A callable. Will be invoked by :meth:`send` with @@ -108,11 +125,13 @@ class Signal: """ receiver_id = hashable_identity(receiver) + receiver_ref: t.Union[annotatable_weakref, T_callable] if weak: receiver_ref = reference(receiver, self._cleanup_receiver) receiver_ref.receiver_id = receiver_id else: receiver_ref = receiver + sender_id: IdentityType if sender is ANY: sender_id = ANY_ID else: @@ -153,7 +172,9 @@ class Signal: raise e return receiver - def connect_via(self, sender, weak=False): + def connect_via( + self, sender: t.Any, weak: bool = False + ) -> t.Callable[[T_callable], T_callable]: """Connect the decorated function as a receiver for *sender*. :param sender: Any object or :obj:`ANY`. The decorated function @@ -175,14 +196,16 @@ class Signal: """ - def decorator(fn): + def decorator(fn: T_callable) -> T_callable: self.connect(fn, sender, weak) return fn return decorator @contextmanager - def connected_to(self, receiver, sender=ANY): + def connected_to( + self, receiver: t.Callable, sender: t.Any = ANY + ) -> t.Generator[None, None, None]: """Execute a block with the signal temporarily connected to *receiver*. :param receiver: a receiver callable @@ -212,7 +235,7 @@ class Signal: self.disconnect(receiver) @contextmanager - def muted(self): + def muted(self) -> t.Generator[None, None, None]: """Context manager for temporarily disabling signal. Useful for test purposes. """ @@ -224,7 +247,9 @@ class Signal: finally: self.is_muted = False - def temporarily_connected_to(self, receiver, sender=ANY): + def temporarily_connected_to( + self, receiver: t.Callable, sender: t.Any = ANY + ) -> t.ContextManager[None]: """An alias for :meth:`connected_to`. :param receiver: a receiver callable @@ -243,7 +268,12 @@ class Signal: ) return self.connected_to(receiver, sender) - def send(self, *sender, _async_wrapper=None, **kwargs): + def send( + self, + *sender: t.Any, + _async_wrapper: t.Optional[AsyncWrapperType] = None, + **kwargs: t.Any, + ) -> t.List[t.Tuple[t.Callable, t.Any]]: """Emit this signal on behalf of *sender*, passing on ``kwargs``. Returns a list of 2-tuples, pairing receivers with their return @@ -266,10 +296,15 @@ class Signal: if _async_wrapper is None: raise RuntimeError("Cannot send to a coroutine function") receiver = _async_wrapper(receiver) - results.append((receiver, receiver(sender, **kwargs))) + results.append((receiver, receiver(sender, **kwargs))) # type: ignore return results - async def send_async(self, *sender, _sync_wrapper=None, **kwargs): + async def send_async( + self, + *sender: t.Any, + _sync_wrapper: t.Optional[SyncWrapperType] = None, + **kwargs: t.Any, + ) -> t.List[t.Tuple[t.Callable, t.Any]]: """Emit this signal on behalf of *sender*, passing on ``kwargs``. Returns a list of 2-tuples, pairing receivers with their return @@ -291,8 +326,8 @@ class Signal: if not is_coroutine_function(receiver): if _sync_wrapper is None: raise RuntimeError("Cannot send to a non-coroutine function") - receiver = _sync_wrapper(receiver) - results.append((receiver, await receiver(sender, **kwargs))) + receiver = _sync_wrapper(receiver) # type: ignore + results.append((receiver, await receiver(sender, **kwargs))) # type: ignore return results def _extract_sender(self, sender): @@ -318,7 +353,7 @@ class Signal: sender = sender[0] return sender - def has_receivers_for(self, sender): + def has_receivers_for(self, sender: t.Any) -> bool: """True if there is probably a receiver for *sender*. Performs an optimistic check only. Does not guarantee that all @@ -334,7 +369,9 @@ class Signal: return False return hashable_identity(sender) in self._by_sender - def receivers_for(self, sender): + def receivers_for( + self, sender: t.Any + ) -> t.Generator[t.Union[t.Callable, annotatable_weakref], None, None]: """Iterate all live receivers listening for *sender*.""" # TODO: test receivers_for(ANY) if self.receivers: @@ -353,9 +390,10 @@ class Signal: self._disconnect(receiver_id, ANY_ID) continue receiver = strong + receiver = t.cast(t.Union[t.Callable, annotatable_weakref], receiver) yield receiver - def disconnect(self, receiver, sender=ANY): + def disconnect(self, receiver: t.Callable, sender: t.Any = ANY) -> None: """Disconnect *receiver* from this signal's events. :param receiver: a previously :meth:`connected<connect>` callable @@ -364,6 +402,7 @@ class Signal: to disconnect from all senders. Defaults to ``ANY``. """ + sender_id: IdentityType if sender is ANY: sender_id = ANY_ID else: @@ -377,7 +416,7 @@ class Signal: ): self.receiver_disconnected.send(self, receiver=receiver, sender=sender) - def _disconnect(self, receiver_id, sender_id): + def _disconnect(self, receiver_id: IdentityType, sender_id: IdentityType) -> None: if sender_id == ANY_ID: if self._by_receiver.pop(receiver_id, False): for bucket in self._by_sender.values(): @@ -387,19 +426,19 @@ class Signal: self._by_sender[sender_id].discard(receiver_id) self._by_receiver[receiver_id].discard(sender_id) - def _cleanup_receiver(self, receiver_ref): + def _cleanup_receiver(self, receiver_ref: annotatable_weakref) -> None: """Disconnect a receiver from all senders.""" - self._disconnect(receiver_ref.receiver_id, ANY_ID) + self._disconnect(t.cast(IdentityType, receiver_ref.receiver_id), ANY_ID) - def _cleanup_sender(self, sender_ref): + def _cleanup_sender(self, sender_ref: annotatable_weakref) -> None: """Disconnect all receivers from a sender.""" - sender_id = sender_ref.sender_id + sender_id = t.cast(IdentityType, sender_ref.sender_id) assert sender_id != ANY_ID self._weak_senders.pop(sender_id, None) for receiver_id in self._by_sender.pop(sender_id, ()): self._by_receiver[receiver_id].discard(sender_id) - def _cleanup_bookkeeping(self): + def _cleanup_bookkeeping(self) -> None: """Prune unused sender/receiver bookkeeping. Not threadsafe. Connecting & disconnecting leave behind a small amount of bookkeeping @@ -425,7 +464,7 @@ class Signal: if not bucket: mapping.pop(_id, None) - def _clear_state(self): + def _clear_state(self) -> None: """Throw away all signal state. Useful for unit tests.""" self._weak_senders.clear() self.receivers.clear() @@ -456,13 +495,13 @@ call signature. This global signal is planned to be removed in 1.6. class NamedSignal(Signal): """A named generic notification emitter.""" - def __init__(self, name, doc=None): + def __init__(self, name: str, doc: t.Optional[str] = None) -> None: Signal.__init__(self, doc) #: The name of this signal. self.name = name - def __repr__(self): + def __repr__(self) -> str: base = Signal.__repr__(self) return f"{base[:-1]}; {self.name!r}>" @@ -470,16 +509,16 @@ class NamedSignal(Signal): class Namespace(dict): """A mapping of signal names to signals.""" - def signal(self, name, doc=None): + def signal(self, name: str, doc: t.Optional[str] = None) -> NamedSignal: """Return the :class:`NamedSignal` *name*, creating it if required. Repeated calls to this function will return the same signal object. """ try: - return self[name] + return self[name] # type: ignore except KeyError: - return self.setdefault(name, NamedSignal(name, doc)) + return self.setdefault(name, NamedSignal(name, doc)) # type: ignore class WeakNamespace(WeakValueDictionary): @@ -493,16 +532,16 @@ class WeakNamespace(WeakValueDictionary): """ - def signal(self, name, doc=None): + def signal(self, name: str, doc: t.Optional[str] = None) -> NamedSignal: """Return the :class:`NamedSignal` *name*, creating it if required. Repeated calls to this function will return the same signal object. """ try: - return self[name] + return self[name] # type: ignore except KeyError: - return self.setdefault(name, NamedSignal(name, doc)) + return self.setdefault(name, NamedSignal(name, doc)) # type: ignore signal = Namespace().signal |