summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers/async_.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 14:50:22 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 14:50:22 +0300
commit48722053dfb43de077df18a139abb16b0a7f7e24 (patch)
treebd55e709e0d4c02619ef0ec54390a8f792da2f74 /src/apscheduler/schedulers/async_.py
parenta58fca290e0831d377d496a69101e5e3dc4c604e (diff)
downloadapscheduler-48722053dfb43de077df18a139abb16b0a7f7e24.tar.gz
Improved the event subscription system
The subscribe() method now returns a subscription which has the unsubscribe() method in itself.
Diffstat (limited to 'src/apscheduler/schedulers/async_.py')
-rw-r--r--src/apscheduler/schedulers/async_.py26
1 files changed, 11 insertions, 15 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 790baf4..dbece0e 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -17,14 +17,13 @@ from ..datastores.async_adapter import AsyncDataStoreAdapter
from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
-from ..events import (
- Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated, SubscriptionToken)
+from ..events import ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated
from ..marshalling import callable_to_ref
from ..structures import Task
from ..workers.async_ import AsyncWorker
-class AsyncScheduler(EventSource):
+class AsyncScheduler:
"""An asynchronous (AnyIO based) scheduler implementation."""
data_store: AsyncDataStore
@@ -49,6 +48,10 @@ class AsyncScheduler(EventSource):
self.data_store = data_store
@property
+ def events(self) -> EventSource:
+ return self._events
+
+ @property
def worker(self) -> Optional[AsyncWorker]:
return self._worker
@@ -58,15 +61,15 @@ class AsyncScheduler(EventSource):
await self._exit_stack.__aenter__()
await self._exit_stack.enter_async_context(self._events)
- # Initialize the data store
+ # Initialize the data store and start relaying events to the scheduler's event broker
await self._exit_stack.enter_async_context(self.data_store)
- relay_token = self._events.relay_events_from(self.data_store.events)
- self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token)
+ relay_subscription = self.data_store.events.subscribe(self._events.publish)
+ self._exit_stack.callback(relay_subscription.unsubscribe)
# Wake up the scheduler if the data store emits a significant schedule event
- wakeup_token = self.data_store.events.subscribe(
+ wakeup_subscription = self.data_store.events.subscribe(
lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated})
- self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token)
+ self._exit_stack.callback(wakeup_subscription.unsubscribe)
# Start the built-in worker, if configured to do so
if self.start_worker:
@@ -86,13 +89,6 @@ class AsyncScheduler(EventSource):
del self._task_group
del self._wakeup_event
- def subscribe(self, callback: Callable[[Event], Any],
- event_types: Optional[Iterable[type[Event]]] = None) -> SubscriptionToken:
- return self._events.subscribe(callback, event_types)
-
- def unsubscribe(self, token: SubscriptionToken) -> None:
- self._events.unsubscribe(token)
-
async def add_schedule(
self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None,
args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None,