diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 14:50:22 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-12 14:50:22 +0300 |
commit | 48722053dfb43de077df18a139abb16b0a7f7e24 (patch) | |
tree | bd55e709e0d4c02619ef0ec54390a8f792da2f74 /src/apscheduler/schedulers/async_.py | |
parent | a58fca290e0831d377d496a69101e5e3dc4c604e (diff) | |
download | apscheduler-48722053dfb43de077df18a139abb16b0a7f7e24.tar.gz |
Improved the event subscription system
The subscribe() method now returns a subscription which has the unsubscribe() method in itself.
Diffstat (limited to 'src/apscheduler/schedulers/async_.py')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 26 |
1 files changed, 11 insertions, 15 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 790baf4..dbece0e 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -17,14 +17,13 @@ from ..datastores.async_adapter import AsyncDataStoreAdapter from ..datastores.memory import MemoryDataStore from ..enums import CoalescePolicy, ConflictPolicy, RunState from ..eventbrokers.async_local import LocalAsyncEventBroker -from ..events import ( - Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated, SubscriptionToken) +from ..events import ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated from ..marshalling import callable_to_ref from ..structures import Task from ..workers.async_ import AsyncWorker -class AsyncScheduler(EventSource): +class AsyncScheduler: """An asynchronous (AnyIO based) scheduler implementation.""" data_store: AsyncDataStore @@ -49,6 +48,10 @@ class AsyncScheduler(EventSource): self.data_store = data_store @property + def events(self) -> EventSource: + return self._events + + @property def worker(self) -> Optional[AsyncWorker]: return self._worker @@ -58,15 +61,15 @@ class AsyncScheduler(EventSource): await self._exit_stack.__aenter__() await self._exit_stack.enter_async_context(self._events) - # Initialize the data store + # Initialize the data store and start relaying events to the scheduler's event broker await self._exit_stack.enter_async_context(self.data_store) - relay_token = self._events.relay_events_from(self.data_store.events) - self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token) + relay_subscription = self.data_store.events.subscribe(self._events.publish) + self._exit_stack.callback(relay_subscription.unsubscribe) # Wake up the scheduler if the data store emits a significant schedule event - wakeup_token = self.data_store.events.subscribe( + wakeup_subscription = self.data_store.events.subscribe( lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}) - self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token) + self._exit_stack.callback(wakeup_subscription.unsubscribe) # Start the built-in worker, if configured to do so if self.start_worker: @@ -86,13 +89,6 @@ class AsyncScheduler(EventSource): del self._task_group del self._wakeup_event - def subscribe(self, callback: Callable[[Event], Any], - event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken: - return self._events.subscribe(callback, event_types) - - def unsubscribe(self, token: SubscriptionToken) -> None: - self._events.unsubscribe(token) - async def add_schedule( self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None, args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None, |