summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers/async_.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-11 21:14:14 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-11 21:14:14 +0300
commit56afe91d5dc338db3440b2e9ecdea3e522dba30f (patch)
tree311380b0d953f09919d7e8c4c0a340507e5d0dc5 /src/apscheduler/schedulers/async_.py
parent7248a78e7e787b728b083aaa8199eeba3a3f3023 (diff)
downloadapscheduler-56afe91d5dc338db3440b2e9ecdea3e522dba30f.tar.gz
Implemented a pluggable event broker system
Diffstat (limited to 'src/apscheduler/schedulers/async_.py')
-rw-r--r--src/apscheduler/schedulers/async_.py20
1 files changed, 10 insertions, 10 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index b60aed6..790baf4 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -16,9 +16,9 @@ from ..abc import AsyncDataStore, DataStore, EventSource, Job, Schedule, Trigger
from ..datastores.async_adapter import AsyncDataStoreAdapter
from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, RunState
+from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- AsyncEventHub, Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated,
- SubscriptionToken)
+ Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated, SubscriptionToken)
from ..marshalling import callable_to_ref
from ..structures import Task
from ..workers.async_ import AsyncWorker
@@ -40,7 +40,7 @@ class AsyncScheduler(EventSource):
self.logger = logger or getLogger(__name__)
self.start_worker = start_worker
self._exit_stack = AsyncExitStack()
- self._events = AsyncEventHub()
+ self._events = LocalAsyncEventBroker()
data_store = data_store or MemoryDataStore()
if isinstance(data_store, DataStore):
@@ -60,13 +60,13 @@ class AsyncScheduler(EventSource):
# Initialize the data store
await self._exit_stack.enter_async_context(self.data_store)
- relay_token = self._events.relay_events_from(self.data_store)
- self._exit_stack.callback(self.data_store.unsubscribe, relay_token)
+ relay_token = self._events.relay_events_from(self.data_store.events)
+ self._exit_stack.callback(self.data_store.events.unsubscribe, relay_token)
# Wake up the scheduler if the data store emits a significant schedule event
- wakeup_token = self.data_store.subscribe(
+ wakeup_token = self.data_store.events.subscribe(
lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated})
- self._exit_stack.callback(self.data_store.unsubscribe, wakeup_token)
+ self._exit_stack.callback(self.data_store.events.unsubscribe, wakeup_token)
# Start the built-in worker, if configured to do so
if self.start_worker:
@@ -132,7 +132,7 @@ class AsyncScheduler(EventSource):
# Signal that the scheduler has started
self._state = RunState.started
task_status.started()
- self._events.publish(SchedulerStarted())
+ await self._events.publish(SchedulerStarted())
try:
while self._state is RunState.started:
@@ -190,11 +190,11 @@ class AsyncScheduler(EventSource):
pass
except BaseException as exc:
self._state = RunState.stopped
- self._events.publish(SchedulerStopped(exception=exc))
+ await self._events.publish(SchedulerStopped(exception=exc))
raise
self._state = RunState.stopped
- self._events.publish(SchedulerStopped())
+ await self._events.publish(SchedulerStopped())
# async def stop(self, force: bool = False) -> None:
# self._running = False