diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-07-26 15:14:15 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-07-27 13:02:08 +0300 |
commit | ad4fd007ab4e72e6eda7fee263b8976c19aa993b (patch) | |
tree | 26fce3398c61af3d686fe0c277fabd7be6d53e12 /src/apscheduler/schedulers | |
parent | aca1ee95abf41272c04cbc60535f56a1f1587e7f (diff) | |
download | apscheduler-ad4fd007ab4e72e6eda7fee263b8976c19aa993b.tar.gz |
Improved the scheduler lifecycle management
Both sync and async schedulers now have consistently working stop() and wait_until_stopped() methods.
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 68 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 101 |
2 files changed, 121 insertions, 48 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 889596d..90d041c 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -11,13 +11,8 @@ from uuid import UUID, uuid4 import anyio import attrs -from anyio import ( - TASK_STATUS_IGNORED, - create_task_group, - get_cancelled_exc_class, - move_on_after, -) -from anyio.abc import TaskGroup +from anyio import TASK_STATUS_IGNORED, create_task_group, move_on_after +from anyio.abc import TaskGroup, TaskStatus from ..abc import AsyncDataStore, AsyncEventBroker, Job, Schedule, Subscription, Trigger from ..context import current_scheduler @@ -57,9 +52,9 @@ class AsyncScheduler: logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__)) _state: RunState = attrs.field(init=False, default=RunState.stopped) + _task_group: TaskGroup | None = attrs.field(init=False, default=None) _wakeup_event: anyio.Event = attrs.field(init=False) _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) - _task_group: TaskGroup | None = attrs.field(init=False, default=None) _schedule_added_subscription: Subscription = attrs.field(init=False) def __attrs_post_init__(self) -> None: @@ -69,12 +64,11 @@ class AsyncScheduler: async def __aenter__(self): self._task_group = create_task_group() await self._task_group.__aenter__() - await self._task_group.start(self.run_until_stopped) + await self._task_group.start(self._run) return self async def __aexit__(self, exc_type, exc_val, exc_tb): - self._state = RunState.stopping - self._wakeup_event.set() + await self.stop() await self._task_group.__aexit__(exc_type, exc_val, exc_tb) self._task_group = None @@ -244,7 +238,37 @@ class AsyncScheduler: else: raise RuntimeError(f"Unknown job outcome: {result.outcome}") - async def run_until_stopped(self, *, task_status=TASK_STATUS_IGNORED) -> None: + async def stop(self) -> None: + """ + Signal the scheduler that it should stop processing schedules. + + This method does not wait for the scheduler to actually stop. + For that, see :meth:`wait_until_stopped`. + + """ + if self._state is RunState.started: + self._state = RunState.stopping + self._wakeup_event.set() + + async def wait_until_stopped(self) -> None: + """ + Wait until the scheduler is in the "stopped" or "stopping" state. + + If the scheduler is already stopped or in the process of stopping, this method + returns immediately. Otherwise, it waits until the scheduler posts the + ``SchedulerStopped`` event. + + """ + if self._state in (RunState.stopped, RunState.stopping): + return + + event = anyio.Event() + with self.event_broker.subscribe( + lambda ev: event.set(), {SchedulerStopped}, one_shot=True + ): + await event.wait() + + async def _run(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: if self._state is not RunState.stopped: raise RuntimeError( f'Cannot start the scheduler when it is in the "{self._state}" ' @@ -268,8 +292,10 @@ class AsyncScheduler: ) # Wake up the scheduler if the data store emits a significant schedule event - self._schedule_added_subscription = self.event_broker.subscribe( - self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} + exit_stack.enter_context( + self.event_broker.subscribe( + self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} + ) ) # Start the built-in worker, if configured to do so @@ -396,16 +422,20 @@ class AsyncScheduler: self.logger.debug( "Processing more schedules on the next iteration" ) - except get_cancelled_exc_class(): - pass except BaseException as exc: - self.logger.exception("Scheduler crashed") exception = exc raise - else: - self.logger.info("Scheduler stopped") finally: self._state = RunState.stopped + if isinstance(exception, Exception): + self.logger.exception("Scheduler crashed") + elif exception: + self.logger.info( + f"Scheduler stopped due to {exception.__class__.__name__}" + ) + else: + self.logger.info("Scheduler stopped") + with move_on_after(3, shield=True): await self.event_broker.publish_local( SchedulerStopped(exception=exception) diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 5ea0c8a..af689cc 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -74,14 +74,16 @@ class Scheduler: exc_tb: TracebackType, ) -> None: self.stop() + self._join_thread() - def _ensure_services_ready(self) -> None: + def _ensure_services_ready(self, exit_stack: ExitStack | None = None) -> None: """Ensure that the data store and event broker have been initialized.""" + stack = exit_stack or self._exit_stack with self._lock: if not self._services_initialized: self._services_initialized = True self.event_broker.start() - self._exit_stack.push( + stack.push( lambda *exc_info: self.event_broker.stop( force=exc_info[0] is not None ) @@ -89,12 +91,13 @@ class Scheduler: # Initialize the data store self.data_store.start(self.event_broker) - self._exit_stack.push( + stack.push( lambda *exc_info: self.data_store.stop( force=exc_info[0] is not None ) ) - atexit.register(self.stop) + if not exit_stack: + atexit.register(self._exit_stack.close) def _schedule_added_or_modified(self, event: Event) -> None: event_ = cast("ScheduleAdded | ScheduleUpdated", event) @@ -106,6 +109,11 @@ class Scheduler: ) self._wakeup_event.set() + def _join_thread(self) -> None: + if self._thread: + self._thread.join() + self._thread = None + def add_schedule( self, func_or_task_id: str | Callable, @@ -267,6 +275,15 @@ class Scheduler: raise RuntimeError(f"Unknown job outcome: {result.outcome}") def start_in_background(self) -> None: + with self._lock: + if self._state is not RunState.stopped: + raise RuntimeError( + f'Cannot start the scheduler when it is in the "{self._state}" ' + f"state" + ) + + self._state = RunState.starting + start_future: Future[None] = Future() self._thread = threading.Thread( target=self._run, args=[start_future], daemon=True @@ -278,39 +295,63 @@ class Scheduler: self._thread = None raise + atexit.register(self._join_thread) atexit.register(self.stop) def stop(self) -> None: - atexit.unregister(self.stop) + """ + Signal the scheduler that it should stop processing schedules. + + This method does not wait for the scheduler to actually stop. + For that, see :meth:`wait_until_stopped`. + + """ with self._lock: if self._state is RunState.started: self._state = RunState.stopping self._wakeup_event.set() - if self._thread and threading.current_thread() != self._thread: - self._thread.join() - self._thread = None + def wait_until_stopped(self) -> None: + """ + Wait until the scheduler is in the "stopped" or "stopping" state. + + If the scheduler is already stopped or in the process of stopping, this method + returns immediately. Otherwise, it waits until the scheduler posts the + ``SchedulerStopped`` event. + + """ + with self._lock: + if self._state in (RunState.stopped, RunState.stopping): + return + + event = threading.Event() + sub = self.event_broker.subscribe( + lambda ev: event.set(), {SchedulerStopped}, one_shot=True + ) - self._exit_stack.close() + with sub: + event.wait() def run_until_stopped(self) -> None: + with self._lock: + if self._state is not RunState.stopped: + raise RuntimeError( + f'Cannot start the scheduler when it is in the "{self._state}" ' + f"state" + ) + + self._state = RunState.starting + self._run(None) def _run(self, start_future: Future[None] | None) -> None: - with ExitStack() as exit_stack: + assert self._state is RunState.starting + with self._exit_stack.pop_all() as exit_stack: try: - with self._lock: - if self._state is not RunState.stopped: - raise RuntimeError( - f'Cannot start the scheduler when it is in the "{self._state}" ' - f"state" - ) - - self._state = RunState.starting + self._ensure_services_ready(exit_stack) - self._ensure_services_ready() - - # Wake up the scheduler if the data store emits a significant schedule event + # Wake up the scheduler if the data store emits a significant schedule + # event exit_stack.enter_context( self.data_store.events.subscribe( self._schedule_added_or_modified, @@ -340,6 +381,7 @@ class Scheduler: if start_future: start_future.set_result(None) + exception: BaseException | None = None try: while self._state is RunState.started: schedules = self.data_store.acquire_schedules(self.identity, 100) @@ -451,16 +493,17 @@ class Scheduler: "Processing more schedules on the next iteration" ) except BaseException as exc: + exception = exc + raise + finally: self._state = RunState.stopped - if isinstance(exc, Exception): + if isinstance(exception, Exception): self.logger.exception("Scheduler crashed") - else: + elif exception: self.logger.info( - f"Scheduler stopped due to {exc.__class__.__name__}" + f"Scheduler stopped due to {exception.__class__.__name__}" ) + else: + self.logger.info("Scheduler stopped") - self.event_broker.publish_local(SchedulerStopped(exception=exc)) - else: - self._state = RunState.stopped - self.logger.info("Scheduler stopped") - self.event_broker.publish_local(SchedulerStopped()) + self.event_broker.publish_local(SchedulerStopped(exception=exception)) |