summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers/async_.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/schedulers/async_.py')
-rw-r--r--src/apscheduler/schedulers/async_.py68
1 files changed, 49 insertions, 19 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 889596d..90d041c 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -11,13 +11,8 @@ from uuid import UUID, uuid4
import anyio
import attrs
-from anyio import (
- TASK_STATUS_IGNORED,
- create_task_group,
- get_cancelled_exc_class,
- move_on_after,
-)
-from anyio.abc import TaskGroup
+from anyio import TASK_STATUS_IGNORED, create_task_group, move_on_after
+from anyio.abc import TaskGroup, TaskStatus
from ..abc import AsyncDataStore, AsyncEventBroker, Job, Schedule, Subscription, Trigger
from ..context import current_scheduler
@@ -57,9 +52,9 @@ class AsyncScheduler:
logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
_state: RunState = attrs.field(init=False, default=RunState.stopped)
+ _task_group: TaskGroup | None = attrs.field(init=False, default=None)
_wakeup_event: anyio.Event = attrs.field(init=False)
_wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
- _task_group: TaskGroup | None = attrs.field(init=False, default=None)
_schedule_added_subscription: Subscription = attrs.field(init=False)
def __attrs_post_init__(self) -> None:
@@ -69,12 +64,11 @@ class AsyncScheduler:
async def __aenter__(self):
self._task_group = create_task_group()
await self._task_group.__aenter__()
- await self._task_group.start(self.run_until_stopped)
+ await self._task_group.start(self._run)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
- self._state = RunState.stopping
- self._wakeup_event.set()
+ await self.stop()
await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
self._task_group = None
@@ -244,7 +238,37 @@ class AsyncScheduler:
else:
raise RuntimeError(f"Unknown job outcome: {result.outcome}")
- async def run_until_stopped(self, *, task_status=TASK_STATUS_IGNORED) -> None:
+ async def stop(self) -> None:
+ """
+ Signal the scheduler that it should stop processing schedules.
+
+ This method does not wait for the scheduler to actually stop.
+ For that, see :meth:`wait_until_stopped`.
+
+ """
+ if self._state is RunState.started:
+ self._state = RunState.stopping
+ self._wakeup_event.set()
+
+ async def wait_until_stopped(self) -> None:
+ """
+ Wait until the scheduler is in the "stopped" or "stopping" state.
+
+ If the scheduler is already stopped or in the process of stopping, this method
+ returns immediately. Otherwise, it waits until the scheduler posts the
+ ``SchedulerStopped`` event.
+
+ """
+ if self._state in (RunState.stopped, RunState.stopping):
+ return
+
+ event = anyio.Event()
+ with self.event_broker.subscribe(
+ lambda ev: event.set(), {SchedulerStopped}, one_shot=True
+ ):
+ await event.wait()
+
+ async def _run(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
if self._state is not RunState.stopped:
raise RuntimeError(
f'Cannot start the scheduler when it is in the "{self._state}" '
@@ -268,8 +292,10 @@ class AsyncScheduler:
)
# Wake up the scheduler if the data store emits a significant schedule event
- self._schedule_added_subscription = self.event_broker.subscribe(
- self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
+ exit_stack.enter_context(
+ self.event_broker.subscribe(
+ self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
+ )
)
# Start the built-in worker, if configured to do so
@@ -396,16 +422,20 @@ class AsyncScheduler:
self.logger.debug(
"Processing more schedules on the next iteration"
)
- except get_cancelled_exc_class():
- pass
except BaseException as exc:
- self.logger.exception("Scheduler crashed")
exception = exc
raise
- else:
- self.logger.info("Scheduler stopped")
finally:
self._state = RunState.stopped
+ if isinstance(exception, Exception):
+ self.logger.exception("Scheduler crashed")
+ elif exception:
+ self.logger.info(
+ f"Scheduler stopped due to {exception.__class__.__name__}"
+ )
+ else:
+ self.logger.info("Scheduler stopped")
+
with move_on_after(3, shield=True):
await self.event_broker.publish_local(
SchedulerStopped(exception=exception)