diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 16:11:13 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-13 01:07:44 +0300 |
commit | 9568bf2f1297c87ec1b93306b79de925fb2da08e (patch) | |
tree | 2969e7951f8883c41f1359ebbc98bcf85a3fdad6 /src | |
parent | 0a6b0f683edee8bf22d85dc655ad61a8285fd312 (diff) | |
download | apscheduler-9568bf2f1297c87ec1b93306b79de925fb2da08e.tar.gz |
Implemented one-shot event subscriptions
Such subscriptions are delivered the first matching event and then unsubscribed automatically.
Diffstat (limited to 'src')
-rw-r--r-- | src/apscheduler/abc.py | 5 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/async_local.py | 26 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/base.py | 14 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/local.py | 13 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 2 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 2 |
6 files changed, 40 insertions, 22 deletions
diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py index 3d85eed..58d74cd 100644 --- a/src/apscheduler/abc.py +++ b/src/apscheduler/abc.py @@ -92,13 +92,16 @@ class EventSource(metaclass=ABCMeta): @abstractmethod def subscribe( self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None + event_types: Optional[Iterable[type[Event]]] = None, + *, + one_shot: bool = False ) -> Subscription: """ Subscribe to events from this event source. :param callback: callable to be called with the event object when an event is published :param event_types: an iterable of concrete Event classes to subscribe to + :param one_shot: if ``True``, automatically unsubscribe after the first matching event """ diff --git a/src/apscheduler/eventbrokers/async_local.py b/src/apscheduler/eventbrokers/async_local.py index 590f0cb..79030f3 100644 --- a/src/apscheduler/eventbrokers/async_local.py +++ b/src/apscheduler/eventbrokers/async_local.py @@ -36,15 +36,21 @@ class LocalAsyncEventBroker(AsyncEventBroker, BaseEventBroker): await self.publish_local(event) async def publish_local(self, event: Event) -> None: - async def deliver_event(func: Callable[[Event], Any]) -> None: - try: - retval = func(event) - if iscoroutine(retval): - await retval - except BaseException: - self._logger.exception('Error delivering %s event', event.__class__.__name__) - event_type = type(event) - for subscription in self._subscriptions.values(): + one_shot_tokens: list[object] = [] + for token, subscription in self._subscriptions.items(): if subscription.event_types is None or event_type in subscription.event_types: - self._task_group.start_soon(deliver_event, subscription.callback) + self._task_group.start_soon(self._deliver_event, subscription.callback, event) + if subscription.one_shot: + one_shot_tokens.append(subscription.token) + + for token in one_shot_tokens: + super().unsubscribe(token) + + async def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None: + try: + retval = func(event) + if iscoroutine(retval): + await retval + except BaseException: + self._logger.exception('Error delivering %s event', event.__class__.__name__) diff --git a/src/apscheduler/eventbrokers/base.py b/src/apscheduler/eventbrokers/base.py index 23bae8a..9947f68 100644 --- a/src/apscheduler/eventbrokers/base.py +++ b/src/apscheduler/eventbrokers/base.py @@ -16,31 +16,33 @@ from ..exceptions import DeserializationError class LocalSubscription(Subscription): callback: Callable[[Event], Any] event_types: Optional[set[type[Event]]] + one_shot: bool + token: object _source: BaseEventBroker - _token: object def unsubscribe(self) -> None: - self._source.unsubscribe(self._token) + self._source.unsubscribe(self.token) @attr.define(eq=False) class BaseEventBroker(EventBroker): _logger: Logger = attr.field(init=False) - _subscriptions: dict[object, Subscription] = attr.field(init=False, factory=dict) + _subscriptions: dict[object, LocalSubscription] = attr.field(init=False, factory=dict) def __attrs_post_init__(self) -> None: self._logger = getLogger(self.__class__.__module__) def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> Subscription: + event_types: Optional[Iterable[type[Event]]] = None, *, + one_shot: bool = False) -> Subscription: types = set(event_types) if event_types else None token = object() - subscription = LocalSubscription(callback, types, self, token) + subscription = LocalSubscription(callback, types, one_shot, token, self) self._subscriptions[token] = subscription return subscription def unsubscribe(self, token: object) -> None: - self._subscriptions.pop(token) + self._subscriptions.pop(token, None) class DistributedEventBrokerMixin: diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py index 24de3eb..acf0c9a 100644 --- a/src/apscheduler/eventbrokers/local.py +++ b/src/apscheduler/eventbrokers/local.py @@ -31,13 +31,14 @@ class LocalEventBroker(BaseEventBroker): del self._executor def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> Subscription: + event_types: Optional[Iterable[type[Event]]] = None, *, + one_shot: bool = False) -> Subscription: if iscoroutinefunction(callback): raise ValueError('Coroutine functions are not supported as callbacks on a synchronous ' 'event source') with self._subscriptions_lock: - return super().subscribe(callback, event_types) + return super().subscribe(callback, event_types, one_shot=one_shot) def unsubscribe(self, token: object) -> None: with self._subscriptions_lock: @@ -49,9 +50,15 @@ class LocalEventBroker(BaseEventBroker): def publish_local(self, event: Event) -> None: event_type = type(event) with self._subscriptions_lock: - for subscription in self._subscriptions.values(): + one_shot_tokens: list[object] = [] + for token, subscription in self._subscriptions.items(): if subscription.event_types is None or event_type in subscription.event_types: self._executor.submit(self._deliver_event, subscription.callback, event) + if subscription.one_shot: + one_shot_tokens.append(subscription.token) + + for token in one_shot_tokens: + super().unsubscribe(token) def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None: try: diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index dd3f37e..221b284 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -73,7 +73,7 @@ class Scheduler: # Start the scheduler and return when it has signalled readiness or raised an exception start_future: Future[Event] = Future() - with self._events.subscribe(start_future.set_result): + with self._events.subscribe(start_future.set_result, one_shot=True): run_future = self._executor.submit(self.run) wait([start_future, run_future], return_when=FIRST_COMPLETED) diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index 824cce8..be805ee 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -64,7 +64,7 @@ class Worker: # Start the worker and return when it has signalled readiness or raised an exception start_future: Future[None] = Future() - with self._events.subscribe(start_future.set_result): + with self._events.subscribe(start_future.set_result, one_shot=True): self._executor = ThreadPoolExecutor(1) run_future = self._executor.submit(self.run) wait([start_future, run_future], return_when=FIRST_COMPLETED) |