diff options
Diffstat (limited to 'src/apscheduler/schedulers/async_.py')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 68 |
1 files changed, 49 insertions, 19 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 889596d..90d041c 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -11,13 +11,8 @@ from uuid import UUID, uuid4 import anyio import attrs -from anyio import ( - TASK_STATUS_IGNORED, - create_task_group, - get_cancelled_exc_class, - move_on_after, -) -from anyio.abc import TaskGroup +from anyio import TASK_STATUS_IGNORED, create_task_group, move_on_after +from anyio.abc import TaskGroup, TaskStatus from ..abc import AsyncDataStore, AsyncEventBroker, Job, Schedule, Subscription, Trigger from ..context import current_scheduler @@ -57,9 +52,9 @@ class AsyncScheduler: logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__)) _state: RunState = attrs.field(init=False, default=RunState.stopped) + _task_group: TaskGroup | None = attrs.field(init=False, default=None) _wakeup_event: anyio.Event = attrs.field(init=False) _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) - _task_group: TaskGroup | None = attrs.field(init=False, default=None) _schedule_added_subscription: Subscription = attrs.field(init=False) def __attrs_post_init__(self) -> None: @@ -69,12 +64,11 @@ class AsyncScheduler: async def __aenter__(self): self._task_group = create_task_group() await self._task_group.__aenter__() - await self._task_group.start(self.run_until_stopped) + await self._task_group.start(self._run) return self async def __aexit__(self, exc_type, exc_val, exc_tb): - self._state = RunState.stopping - self._wakeup_event.set() + await self.stop() await self._task_group.__aexit__(exc_type, exc_val, exc_tb) self._task_group = None @@ -244,7 +238,37 @@ class AsyncScheduler: else: raise RuntimeError(f"Unknown job outcome: {result.outcome}") - async def run_until_stopped(self, *, task_status=TASK_STATUS_IGNORED) -> None: + async def stop(self) -> None: + """ + Signal the scheduler that it should stop processing schedules. + + This method does not wait for the scheduler to actually stop. + For that, see :meth:`wait_until_stopped`. + + """ + if self._state is RunState.started: + self._state = RunState.stopping + self._wakeup_event.set() + + async def wait_until_stopped(self) -> None: + """ + Wait until the scheduler is in the "stopped" or "stopping" state. + + If the scheduler is already stopped or in the process of stopping, this method + returns immediately. Otherwise, it waits until the scheduler posts the + ``SchedulerStopped`` event. + + """ + if self._state in (RunState.stopped, RunState.stopping): + return + + event = anyio.Event() + with self.event_broker.subscribe( + lambda ev: event.set(), {SchedulerStopped}, one_shot=True + ): + await event.wait() + + async def _run(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: if self._state is not RunState.stopped: raise RuntimeError( f'Cannot start the scheduler when it is in the "{self._state}" ' @@ -268,8 +292,10 @@ class AsyncScheduler: ) # Wake up the scheduler if the data store emits a significant schedule event - self._schedule_added_subscription = self.event_broker.subscribe( - self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} + exit_stack.enter_context( + self.event_broker.subscribe( + self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} + ) ) # Start the built-in worker, if configured to do so @@ -396,16 +422,20 @@ class AsyncScheduler: self.logger.debug( "Processing more schedules on the next iteration" ) - except get_cancelled_exc_class(): - pass except BaseException as exc: - self.logger.exception("Scheduler crashed") exception = exc raise - else: - self.logger.info("Scheduler stopped") finally: self._state = RunState.stopped + if isinstance(exception, Exception): + self.logger.exception("Scheduler crashed") + elif exception: + self.logger.info( + f"Scheduler stopped due to {exception.__class__.__name__}" + ) + else: + self.logger.info("Scheduler stopped") + with move_on_after(3, shield=True): await self.event_broker.publish_local( SchedulerStopped(exception=exception) |