summaryrefslogtreecommitdiff
path: root/src/blinker/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/blinker/base.py')
-rw-r--r--src/blinker/base.py80
1 files changed, 42 insertions, 38 deletions
diff --git a/src/blinker/base.py b/src/blinker/base.py
index 997fef5..80e24e2 100644
--- a/src/blinker/base.py
+++ b/src/blinker/base.py
@@ -7,14 +7,14 @@ each manages its own receivers and message emission.
The :func:`signal` function provides singleton behavior for named signals.
"""
+from __future__ import annotations
+
import typing as t
from collections import defaultdict
from contextlib import contextmanager
from warnings import warn
from weakref import WeakValueDictionary
-import typing_extensions as te
-
from blinker._utilities import annotatable_weakref
from blinker._utilities import hashable_identity
from blinker._utilities import IdentityType
@@ -24,18 +24,20 @@ from blinker._utilities import reference
from blinker._utilities import symbol
from blinker._utilities import WeakTypes
+if t.TYPE_CHECKING:
+ import typing_extensions as te
-ANY = symbol("ANY")
-ANY.__doc__ = 'Token for "any sender".'
-ANY_ID = 0
+ T_callable = t.TypeVar("T_callable", bound=t.Callable[..., t.Any])
-T_callable = t.TypeVar("T_callable", bound=t.Callable)
+ T = t.TypeVar("T")
+ P = te.ParamSpec("P")
-T = t.TypeVar("T")
-P = te.ParamSpec("P")
+ AsyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]]
+ SyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]]
-AsyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]]
-SyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]]
+ANY = symbol("ANY")
+ANY.__doc__ = 'Token for "any sender".'
+ANY_ID = 0
class Signal:
@@ -46,7 +48,7 @@ class Signal:
ANY = ANY
@lazy_property
- def receiver_connected(self) -> "Signal":
+ def receiver_connected(self) -> Signal:
"""Emitted after each :meth:`connect`.
The signal sender is the signal instance, and the :meth:`connect`
@@ -58,7 +60,7 @@ class Signal:
return Signal(doc="Emitted after a receiver connects.")
@lazy_property
- def receiver_disconnected(self) -> "Signal":
+ def receiver_disconnected(self) -> Signal:
"""Emitted after :meth:`disconnect`.
The sender is the signal instance, and the :meth:`disconnect` arguments
@@ -81,7 +83,7 @@ class Signal:
"""
return Signal(doc="Emitted after a receiver disconnects.")
- def __init__(self, doc: t.Optional[str] = None) -> None:
+ def __init__(self, doc: str | None = None) -> None:
"""
:param doc: optional. If provided, will be assigned to the signal's
__doc__ attribute.
@@ -95,13 +97,11 @@ class Signal:
#: internal :class:`Signal` implementation, however the boolean value
#: of the mapping is useful as an extremely efficient check to see if
#: any receivers are connected to the signal.
- self.receivers: t.Dict[
- IdentityType, t.Union[t.Callable, annotatable_weakref]
- ] = {}
+ self.receivers: dict[IdentityType, t.Callable | annotatable_weakref] = {}
self.is_muted = False
- self._by_receiver: t.Dict[IdentityType, t.Set[IdentityType]] = defaultdict(set)
- self._by_sender: t.Dict[IdentityType, t.Set[IdentityType]] = defaultdict(set)
- self._weak_senders: t.Dict[IdentityType, annotatable_weakref] = {}
+ self._by_receiver: dict[IdentityType, set[IdentityType]] = defaultdict(set)
+ self._by_sender: dict[IdentityType, set[IdentityType]] = defaultdict(set)
+ self._weak_senders: dict[IdentityType, annotatable_weakref] = {}
def connect(
self, receiver: T_callable, sender: t.Any = ANY, weak: bool = True
@@ -125,7 +125,8 @@ class Signal:
"""
receiver_id = hashable_identity(receiver)
- receiver_ref: t.Union[annotatable_weakref, T_callable]
+ receiver_ref: T_callable | annotatable_weakref
+
if weak:
receiver_ref = reference(receiver, self._cleanup_receiver)
receiver_ref.receiver_id = receiver_id
@@ -271,9 +272,9 @@ class Signal:
def send(
self,
*sender: t.Any,
- _async_wrapper: t.Optional[AsyncWrapperType] = None,
+ _async_wrapper: AsyncWrapperType | None = None,
**kwargs: t.Any,
- ) -> t.List[t.Tuple[t.Callable, t.Any]]:
+ ) -> list[tuple[t.Callable, t.Any]]:
"""Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
@@ -296,15 +297,16 @@ class Signal:
if _async_wrapper is None:
raise RuntimeError("Cannot send to a coroutine function")
receiver = _async_wrapper(receiver)
- results.append((receiver, receiver(sender, **kwargs))) # type: ignore
+ result = receiver(sender, **kwargs) # type: ignore[call-arg]
+ results.append((receiver, result))
return results
async def send_async(
self,
*sender: t.Any,
- _sync_wrapper: t.Optional[SyncWrapperType] = None,
+ _sync_wrapper: SyncWrapperType | None = None,
**kwargs: t.Any,
- ) -> t.List[t.Tuple[t.Callable, t.Any]]:
+ ) -> list[tuple[t.Callable, t.Any]]:
"""Emit this signal on behalf of *sender*, passing on ``kwargs``.
Returns a list of 2-tuples, pairing receivers with their return
@@ -326,11 +328,12 @@ class Signal:
if not is_coroutine_function(receiver):
if _sync_wrapper is None:
raise RuntimeError("Cannot send to a non-coroutine function")
- receiver = _sync_wrapper(receiver) # type: ignore
- results.append((receiver, await receiver(sender, **kwargs))) # type: ignore
+ receiver = _sync_wrapper(receiver) # type: ignore[arg-type]
+ result = await receiver(sender, **kwargs) # type: ignore[call-arg, misc]
+ results.append((receiver, result))
return results
- def _extract_sender(self, sender):
+ def _extract_sender(self, sender: t.Any) -> t.Any:
if not self.receivers:
# Ensure correct signature even on no-op sends, disable with -O
# for lowest possible cost.
@@ -371,7 +374,7 @@ class Signal:
def receivers_for(
self, sender: t.Any
- ) -> t.Generator[t.Union[t.Callable, annotatable_weakref], None, None]:
+ ) -> t.Generator[t.Callable | annotatable_weakref, None, None]:
"""Iterate all live receivers listening for *sender*."""
# TODO: test receivers_for(ANY)
if self.receivers:
@@ -390,8 +393,7 @@ class Signal:
self._disconnect(receiver_id, ANY_ID)
continue
receiver = strong
- receiver = t.cast(t.Union[t.Callable, annotatable_weakref], receiver)
- yield receiver
+ yield receiver # type: ignore[misc]
def disconnect(self, receiver: t.Callable, sender: t.Any = ANY) -> None:
"""Disconnect *receiver* from this signal's events.
@@ -495,7 +497,7 @@ call signature. This global signal is planned to be removed in 1.6.
class NamedSignal(Signal):
"""A named generic notification emitter."""
- def __init__(self, name: str, doc: t.Optional[str] = None) -> None:
+ def __init__(self, name: str, doc: str | None = None) -> None:
Signal.__init__(self, doc)
#: The name of this signal.
@@ -509,16 +511,17 @@ class NamedSignal(Signal):
class Namespace(dict):
"""A mapping of signal names to signals."""
- def signal(self, name: str, doc: t.Optional[str] = None) -> NamedSignal:
+ def signal(self, name: str, doc: str | None = None) -> NamedSignal:
"""Return the :class:`NamedSignal` *name*, creating it if required.
Repeated calls to this function will return the same signal object.
"""
try:
- return self[name] # type: ignore
+ return self[name] # type: ignore[no-any-return]
except KeyError:
- return self.setdefault(name, NamedSignal(name, doc)) # type: ignore
+ result = self.setdefault(name, NamedSignal(name, doc))
+ return result # type: ignore[no-any-return]
class WeakNamespace(WeakValueDictionary):
@@ -532,16 +535,17 @@ class WeakNamespace(WeakValueDictionary):
"""
- def signal(self, name: str, doc: t.Optional[str] = None) -> NamedSignal:
+ def signal(self, name: str, doc: str | None = None) -> NamedSignal:
"""Return the :class:`NamedSignal` *name*, creating it if required.
Repeated calls to this function will return the same signal object.
"""
try:
- return self[name] # type: ignore
+ return self[name] # type: ignore[no-any-return]
except KeyError:
- return self.setdefault(name, NamedSignal(name, doc)) # type: ignore
+ result = self.setdefault(name, NamedSignal(name, doc))
+ return result # type: ignore[no-any-return]
signal = Namespace().signal