summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 16:11:13 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-13 01:07:44 +0300
commit9568bf2f1297c87ec1b93306b79de925fb2da08e (patch)
tree2969e7951f8883c41f1359ebbc98bcf85a3fdad6 /src
parent0a6b0f683edee8bf22d85dc655ad61a8285fd312 (diff)
downloadapscheduler-9568bf2f1297c87ec1b93306b79de925fb2da08e.tar.gz
Implemented one-shot event subscriptions
Such subscriptions are delivered the first matching event and then unsubscribed automatically.
Diffstat (limited to 'src')
-rw-r--r--src/apscheduler/abc.py5
-rw-r--r--src/apscheduler/eventbrokers/async_local.py26
-rw-r--r--src/apscheduler/eventbrokers/base.py14
-rw-r--r--src/apscheduler/eventbrokers/local.py13
-rw-r--r--src/apscheduler/schedulers/sync.py2
-rw-r--r--src/apscheduler/workers/sync.py2
6 files changed, 40 insertions, 22 deletions
diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py
index 3d85eed..58d74cd 100644
--- a/src/apscheduler/abc.py
+++ b/src/apscheduler/abc.py
@@ -92,13 +92,16 @@ class EventSource(metaclass=ABCMeta):
@abstractmethod
def subscribe(
self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None
+ event_types: Optional[Iterable[type[Event]]] = None,
+ *,
+ one_shot: bool = False
) -> Subscription:
"""
Subscribe to events from this event source.
:param callback: callable to be called with the event object when an event is published
:param event_types: an iterable of concrete Event classes to subscribe to
+ :param one_shot: if ``True``, automatically unsubscribe after the first matching event
"""
diff --git a/src/apscheduler/eventbrokers/async_local.py b/src/apscheduler/eventbrokers/async_local.py
index 590f0cb..79030f3 100644
--- a/src/apscheduler/eventbrokers/async_local.py
+++ b/src/apscheduler/eventbrokers/async_local.py
@@ -36,15 +36,21 @@ class LocalAsyncEventBroker(AsyncEventBroker, BaseEventBroker):
await self.publish_local(event)
async def publish_local(self, event: Event) -> None:
- async def deliver_event(func: Callable[[Event], Any]) -> None:
- try:
- retval = func(event)
- if iscoroutine(retval):
- await retval
- except BaseException:
- self._logger.exception('Error delivering %s event', event.__class__.__name__)
-
event_type = type(event)
- for subscription in self._subscriptions.values():
+ one_shot_tokens: list[object] = []
+ for token, subscription in self._subscriptions.items():
if subscription.event_types is None or event_type in subscription.event_types:
- self._task_group.start_soon(deliver_event, subscription.callback)
+ self._task_group.start_soon(self._deliver_event, subscription.callback, event)
+ if subscription.one_shot:
+ one_shot_tokens.append(subscription.token)
+
+ for token in one_shot_tokens:
+ super().unsubscribe(token)
+
+ async def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None:
+ try:
+ retval = func(event)
+ if iscoroutine(retval):
+ await retval
+ except BaseException:
+ self._logger.exception('Error delivering %s event', event.__class__.__name__)
diff --git a/src/apscheduler/eventbrokers/base.py b/src/apscheduler/eventbrokers/base.py
index 23bae8a..9947f68 100644
--- a/src/apscheduler/eventbrokers/base.py
+++ b/src/apscheduler/eventbrokers/base.py
@@ -16,31 +16,33 @@ from ..exceptions import DeserializationError
class LocalSubscription(Subscription):
callback: Callable[[Event], Any]
event_types: Optional[set[type[Event]]]
+ one_shot: bool
+ token: object
_source: BaseEventBroker
- _token: object
def unsubscribe(self) -> None:
- self._source.unsubscribe(self._token)
+ self._source.unsubscribe(self.token)
@attr.define(eq=False)
class BaseEventBroker(EventBroker):
_logger: Logger = attr.field(init=False)
- _subscriptions: dict[object, Subscription] = attr.field(init=False, factory=dict)
+ _subscriptions: dict[object, LocalSubscription] = attr.field(init=False, factory=dict)
def __attrs_post_init__(self) -> None:
self._logger = getLogger(self.__class__.__module__)
def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> Subscription:
+ event_types: Optional[Iterable[type[Event]]] = None, *,
+ one_shot: bool = False) -> Subscription:
types = set(event_types) if event_types else None
token = object()
- subscription = LocalSubscription(callback, types, self, token)
+ subscription = LocalSubscription(callback, types, one_shot, token, self)
self._subscriptions[token] = subscription
return subscription
def unsubscribe(self, token: object) -> None:
- self._subscriptions.pop(token)
+ self._subscriptions.pop(token, None)
class DistributedEventBrokerMixin:
diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py
index 24de3eb..acf0c9a 100644
--- a/src/apscheduler/eventbrokers/local.py
+++ b/src/apscheduler/eventbrokers/local.py
@@ -31,13 +31,14 @@ class LocalEventBroker(BaseEventBroker):
del self._executor
def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> Subscription:
+ event_types: Optional[Iterable[type[Event]]] = None, *,
+ one_shot: bool = False) -> Subscription:
if iscoroutinefunction(callback):
raise ValueError('Coroutine functions are not supported as callbacks on a synchronous '
'event source')
with self._subscriptions_lock:
- return super().subscribe(callback, event_types)
+ return super().subscribe(callback, event_types, one_shot=one_shot)
def unsubscribe(self, token: object) -> None:
with self._subscriptions_lock:
@@ -49,9 +50,15 @@ class LocalEventBroker(BaseEventBroker):
def publish_local(self, event: Event) -> None:
event_type = type(event)
with self._subscriptions_lock:
- for subscription in self._subscriptions.values():
+ one_shot_tokens: list[object] = []
+ for token, subscription in self._subscriptions.items():
if subscription.event_types is None or event_type in subscription.event_types:
self._executor.submit(self._deliver_event, subscription.callback, event)
+ if subscription.one_shot:
+ one_shot_tokens.append(subscription.token)
+
+ for token in one_shot_tokens:
+ super().unsubscribe(token)
def _deliver_event(self, func: Callable[[Event], Any], event: Event) -> None:
try:
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index dd3f37e..221b284 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -73,7 +73,7 @@ class Scheduler:
# Start the scheduler and return when it has signalled readiness or raised an exception
start_future: Future[Event] = Future()
- with self._events.subscribe(start_future.set_result):
+ with self._events.subscribe(start_future.set_result, one_shot=True):
run_future = self._executor.submit(self.run)
wait([start_future, run_future], return_when=FIRST_COMPLETED)
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 824cce8..be805ee 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -64,7 +64,7 @@ class Worker:
# Start the worker and return when it has signalled readiness or raised an exception
start_future: Future[None] = Future()
- with self._events.subscribe(start_future.set_result):
+ with self._events.subscribe(start_future.set_result, one_shot=True):
self._executor = ThreadPoolExecutor(1)
run_future = self._executor.submit(self.run)
wait([start_future, run_future], return_when=FIRST_COMPLETED)