diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-11 21:14:14 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-11 21:14:14 +0300 |
commit | 56afe91d5dc338db3440b2e9ecdea3e522dba30f (patch) | |
tree | 311380b0d953f09919d7e8c4c0a340507e5d0dc5 /src/apscheduler/schedulers/async_.py | |
parent | 7248a78e7e787b728b083aaa8199eeba3a3f3023 (diff) | |
download | apscheduler-56afe91d5dc338db3440b2e9ecdea3e522dba30f.tar.gz |
Implemented a pluggable event broker system
Diffstat (limited to 'src/apscheduler/schedulers/async_.py')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index b60aed6..790baf4 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -16,9 +16,9 @@ from ..abc import AsyncDataStore, DataStore, EventSource, Job, Schedule, Trigger from ..datastores.async_adapter import AsyncDataStoreAdapter from ..datastores.memory import MemoryDataStore from ..enums import CoalescePolicy, ConflictPolicy, RunState +from ..eventbrokers.async_local import LocalAsyncEventBroker from ..events import ( - AsyncEventHub, Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated, - SubscriptionToken) + Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated, SubscriptionToken) from ..marshalling import callable_to_ref from ..structures import Task from ..workers.async_ import AsyncWorker @@ -40,7 +40,7 @@ class AsyncScheduler(EventSource): self.logger = logger or getLogger(__name__) self.start_worker = start_worker self._exit_stack = AsyncExitStack() - self._events = AsyncEventHub() + self._events = LocalAsyncEventBroker() data_store = data_store or MemoryDataStore() if isinstance(data_store, DataStore): @@ -60,13 +60,13 @@ class AsyncScheduler(EventSource): # Initialize the data store await self._exit_stack.enter_async_context(self.data_store) - relay_token = self._events.relay_events_from(self.data_store) - self._exit_stack.callback(self.data_store.unsubscribe, relay_token) + relay_token = self._events.relay_events_from(self.data_store.events) + self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token) # Wake up the scheduler if the data store emits a significant schedule event - wakeup_token = self.data_store.subscribe( + wakeup_token = self.data_store.events.subscribe( lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated}) - self._exit_stack.callback(self.data_store.unsubscribe, wakeup_token) + self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token) # Start the built-in worker, if configured to do so if self.start_worker: @@ -132,7 +132,7 @@ class AsyncScheduler(EventSource): # Signal that the scheduler has started self._state = RunState.started task_status.started() - self._events.publish(SchedulerStarted()) + await self._events.publish(SchedulerStarted()) try: while self._state is RunState.started: @@ -190,11 +190,11 @@ class AsyncScheduler(EventSource): pass except BaseException as exc: self._state = RunState.stopped - self._events.publish(SchedulerStopped(exception=exc)) + await self._events.publish(SchedulerStopped(exception=exc)) raise self._state = RunState.stopped - self._events.publish(SchedulerStopped()) + await self._events.publish(SchedulerStopped()) # async def stop(self, force: bool = False) -> None: # self._running = False |