author      Alex Grönholm <alex.gronholm@nextday.fi>  2022-07-26 15:14:15 +0300
committer   Alex Grönholm <alex.gronholm@nextday.fi>  2022-07-27 13:02:08 +0300
commit      ad4fd007ab4e72e6eda7fee263b8976c19aa993b (patch)
tree        26fce3398c61af3d686fe0c277fabd7be6d53e12 /src
parent      aca1ee95abf41272c04cbc60535f56a1f1587e7f (diff)
download    apscheduler-ad4fd007ab4e72e6eda7fee263b8976c19aa993b.tar.gz
Improved the scheduler lifecycle management
Both sync and async schedulers now have consistently working stop() and wait_until_stopped() methods.
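For context, a hedged usage sketch of the reworked sync lifecycle; the bare Scheduler() construction and the surrounding wiring are illustrative assumptions, and only stop() and wait_until_stopped() come from this commit:

    from apscheduler.schedulers.sync import Scheduler

    # Illustrative only: run the scheduler in a background thread, then block
    # until something else (another thread, a signal handler, ...) calls stop().
    scheduler = Scheduler()
    scheduler.start_in_background()  # spawns a daemon thread running _run()
    # ... add schedules and arrange for scheduler.stop() to be called later ...
    scheduler.wait_until_stopped()   # returns once SchedulerStopped is published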
Diffstat (limited to 'src')
-rw-r--r--  src/apscheduler/schedulers/async_.py   68
-rw-r--r--  src/apscheduler/schedulers/sync.py    101
2 files changed, 121 insertions(+), 48 deletions(-)
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 889596d..90d041c 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -11,13 +11,8 @@ from uuid import UUID, uuid4
import anyio
import attrs
-from anyio import (
- TASK_STATUS_IGNORED,
- create_task_group,
- get_cancelled_exc_class,
- move_on_after,
-)
-from anyio.abc import TaskGroup
+from anyio import TASK_STATUS_IGNORED, create_task_group, move_on_after
+from anyio.abc import TaskGroup, TaskStatus
from ..abc import AsyncDataStore, AsyncEventBroker, Job, Schedule, Subscription, Trigger
from ..context import current_scheduler
@@ -57,9 +52,9 @@ class AsyncScheduler:
logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
_state: RunState = attrs.field(init=False, default=RunState.stopped)
+ _task_group: TaskGroup | None = attrs.field(init=False, default=None)
_wakeup_event: anyio.Event = attrs.field(init=False)
_wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
- _task_group: TaskGroup | None = attrs.field(init=False, default=None)
_schedule_added_subscription: Subscription = attrs.field(init=False)
def __attrs_post_init__(self) -> None:
@@ -69,12 +64,11 @@ class AsyncScheduler:
async def __aenter__(self):
self._task_group = create_task_group()
await self._task_group.__aenter__()
- await self._task_group.start(self.run_until_stopped)
+ await self._task_group.start(self._run)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
- self._state = RunState.stopping
- self._wakeup_event.set()
+ await self.stop()
await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
self._task_group = None
@@ -244,7 +238,37 @@ class AsyncScheduler:
else:
raise RuntimeError(f"Unknown job outcome: {result.outcome}")
- async def run_until_stopped(self, *, task_status=TASK_STATUS_IGNORED) -> None:
+ async def stop(self) -> None:
+ """
+ Signal the scheduler that it should stop processing schedules.
+
+ This method does not wait for the scheduler to actually stop.
+ For that, see :meth:`wait_until_stopped`.
+
+ """
+ if self._state is RunState.started:
+ self._state = RunState.stopping
+ self._wakeup_event.set()
+
+ async def wait_until_stopped(self) -> None:
+ """
+ Wait until the scheduler is in the "stopped" or "stopping" state.
+
+ If the scheduler is already stopped or in the process of stopping, this method
+ returns immediately. Otherwise, it waits until the scheduler posts the
+ ``SchedulerStopped`` event.
+
+ """
+ if self._state in (RunState.stopped, RunState.stopping):
+ return
+
+ event = anyio.Event()
+ with self.event_broker.subscribe(
+ lambda ev: event.set(), {SchedulerStopped}, one_shot=True
+ ):
+ await event.wait()
+
+ async def _run(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
if self._state is not RunState.stopped:
raise RuntimeError(
f'Cannot start the scheduler when it is in the "{self._state}" '
@@ -268,8 +292,10 @@ class AsyncScheduler:
)
# Wake up the scheduler if the data store emits a significant schedule event
- self._schedule_added_subscription = self.event_broker.subscribe(
- self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
+ exit_stack.enter_context(
+ self.event_broker.subscribe(
+ self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
+ )
)
# Start the built-in worker, if configured to do so
@@ -396,16 +422,20 @@ class AsyncScheduler:
self.logger.debug(
"Processing more schedules on the next iteration"
)
- except get_cancelled_exc_class():
- pass
except BaseException as exc:
- self.logger.exception("Scheduler crashed")
exception = exc
raise
- else:
- self.logger.info("Scheduler stopped")
finally:
self._state = RunState.stopped
+ if isinstance(exception, Exception):
+ self.logger.exception("Scheduler crashed")
+ elif exception:
+ self.logger.info(
+ f"Scheduler stopped due to {exception.__class__.__name__}"
+ )
+ else:
+ self.logger.info("Scheduler stopped")
+
with move_on_after(3, shield=True):
await self.event_broker.publish_local(
SchedulerStopped(exception=exception)
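To illustrate the split introduced above between the non-blocking stop() and the blocking wait_until_stopped() on the async scheduler, a hedged sketch; the default-constructed AsyncScheduler(), the extra task group and the ten-second delay are illustrative, not part of this change:

    import anyio

    from apscheduler.schedulers.async_ import AsyncScheduler

    async def main() -> None:
        # Entering the context manager starts _run() in the scheduler's task group.
        async with AsyncScheduler() as scheduler:
            async with anyio.create_task_group() as tg:
                async def stop_later() -> None:
                    await anyio.sleep(10)
                    await scheduler.stop()  # flips the run state, sets the wakeup event

                tg.start_soon(stop_later)
                await scheduler.wait_until_stopped()  # blocks until SchedulerStopped

    anyio.run(main)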
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 5ea0c8a..af689cc 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -74,14 +74,16 @@ class Scheduler:
exc_tb: TracebackType,
) -> None:
self.stop()
+ self._join_thread()
- def _ensure_services_ready(self) -> None:
+ def _ensure_services_ready(self, exit_stack: ExitStack | None = None) -> None:
"""Ensure that the data store and event broker have been initialized."""
+ stack = exit_stack or self._exit_stack
with self._lock:
if not self._services_initialized:
self._services_initialized = True
self.event_broker.start()
- self._exit_stack.push(
+ stack.push(
lambda *exc_info: self.event_broker.stop(
force=exc_info[0] is not None
)
@@ -89,12 +91,13 @@ class Scheduler:
# Initialize the data store
self.data_store.start(self.event_broker)
- self._exit_stack.push(
+ stack.push(
lambda *exc_info: self.data_store.stop(
force=exc_info[0] is not None
)
)
- atexit.register(self.stop)
+ if not exit_stack:
+ atexit.register(self._exit_stack.close)
def _schedule_added_or_modified(self, event: Event) -> None:
event_ = cast("ScheduleAdded | ScheduleUpdated", event)
@@ -106,6 +109,11 @@ class Scheduler:
)
self._wakeup_event.set()
+ def _join_thread(self) -> None:
+ if self._thread:
+ self._thread.join()
+ self._thread = None
+
def add_schedule(
self,
func_or_task_id: str | Callable,
@@ -267,6 +275,15 @@ class Scheduler:
raise RuntimeError(f"Unknown job outcome: {result.outcome}")
def start_in_background(self) -> None:
+ with self._lock:
+ if self._state is not RunState.stopped:
+ raise RuntimeError(
+ f'Cannot start the scheduler when it is in the "{self._state}" '
+ f"state"
+ )
+
+ self._state = RunState.starting
+
start_future: Future[None] = Future()
self._thread = threading.Thread(
target=self._run, args=[start_future], daemon=True
@@ -278,39 +295,63 @@ class Scheduler:
self._thread = None
raise
+ atexit.register(self._join_thread)
atexit.register(self.stop)
def stop(self) -> None:
- atexit.unregister(self.stop)
+ """
+ Signal the scheduler that it should stop processing schedules.
+
+ This method does not wait for the scheduler to actually stop.
+ For that, see :meth:`wait_until_stopped`.
+
+ """
with self._lock:
if self._state is RunState.started:
self._state = RunState.stopping
self._wakeup_event.set()
- if self._thread and threading.current_thread() != self._thread:
- self._thread.join()
- self._thread = None
+ def wait_until_stopped(self) -> None:
+ """
+ Wait until the scheduler is in the "stopped" or "stopping" state.
+
+ If the scheduler is already stopped or in the process of stopping, this method
+ returns immediately. Otherwise, it waits until the scheduler posts the
+ ``SchedulerStopped`` event.
+
+ """
+ with self._lock:
+ if self._state in (RunState.stopped, RunState.stopping):
+ return
+
+ event = threading.Event()
+ sub = self.event_broker.subscribe(
+ lambda ev: event.set(), {SchedulerStopped}, one_shot=True
+ )
- self._exit_stack.close()
+ with sub:
+ event.wait()
def run_until_stopped(self) -> None:
+ with self._lock:
+ if self._state is not RunState.stopped:
+ raise RuntimeError(
+ f'Cannot start the scheduler when it is in the "{self._state}" '
+ f"state"
+ )
+
+ self._state = RunState.starting
+
self._run(None)
def _run(self, start_future: Future[None] | None) -> None:
- with ExitStack() as exit_stack:
+ assert self._state is RunState.starting
+ with self._exit_stack.pop_all() as exit_stack:
try:
- with self._lock:
- if self._state is not RunState.stopped:
- raise RuntimeError(
- f'Cannot start the scheduler when it is in the "{self._state}" '
- f"state"
- )
-
- self._state = RunState.starting
+ self._ensure_services_ready(exit_stack)
- self._ensure_services_ready()
-
- # Wake up the scheduler if the data store emits a significant schedule event
+ # Wake up the scheduler if the data store emits a significant schedule
+ # event
exit_stack.enter_context(
self.data_store.events.subscribe(
self._schedule_added_or_modified,
@@ -340,6 +381,7 @@ class Scheduler:
if start_future:
start_future.set_result(None)
+ exception: BaseException | None = None
try:
while self._state is RunState.started:
schedules = self.data_store.acquire_schedules(self.identity, 100)
@@ -451,16 +493,17 @@ class Scheduler:
"Processing more schedules on the next iteration"
)
except BaseException as exc:
+ exception = exc
+ raise
+ finally:
self._state = RunState.stopped
- if isinstance(exc, Exception):
+ if isinstance(exception, Exception):
self.logger.exception("Scheduler crashed")
- else:
+ elif exception:
self.logger.info(
- f"Scheduler stopped due to {exc.__class__.__name__}"
+ f"Scheduler stopped due to {exception.__class__.__name__}"
)
+ else:
+ self.logger.info("Scheduler stopped")
- self.event_broker.publish_local(SchedulerStopped(exception=exc))
- else:
- self._state = RunState.stopped
- self.logger.info("Scheduler stopped")
- self.event_broker.publish_local(SchedulerStopped())
+ self.event_broker.publish_local(SchedulerStopped(exception=exception))
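On the sync side, the same split means run_until_stopped() can own the calling thread while stop() is invoked from anywhere else; a hedged sketch in which the default-constructed Scheduler() and the timer thread are purely illustrative:

    import threading

    from apscheduler.schedulers.sync import Scheduler

    scheduler = Scheduler()
    # stop() only signals the run loop (under the scheduler's lock), so it is
    # safe to call from another thread; here a timer requests shutdown later.
    threading.Timer(10, scheduler.stop).start()
    scheduler.run_until_stopped()  # blocks until the loop exits and the state is stopped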