diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-01-25 13:48:32 +0200 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-02-15 02:20:18 +0200 |
commit | 7bb246cc4a907a9ae3bface1d18e58778b7c9029 (patch) | |
tree | 13e03ba01fe520aa16eb0e8eb8854d9b8593fbed /src/apscheduler/schedulers | |
parent | 23030abe95d790a63cfff6346a271804658f6f3c (diff) | |
download | apscheduler-7bb246cc4a907a9ae3bface1d18e58778b7c9029.tar.gz |
Fixed scheduler waking up unnecessarily
Added checks to make the scheduler only wake up if there is a new or updated schedule that would trigger sooner than the previously nearest known schedule
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 31 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 29 |
2 files changed, 39 insertions, 21 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 6d3287f..274e872 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -6,7 +6,7 @@ import random from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Mapping +from typing import Any, Callable, Iterable, Mapping, cast from uuid import UUID, uuid4 import anyio @@ -41,6 +41,7 @@ class AsyncScheduler: _state: RunState = attrs.field(init=False, default=RunState.stopped) _wakeup_event: anyio.Event = attrs.field(init=False) + _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) _worker: AsyncWorker | None = attrs.field(init=False, default=None) _events: LocalAsyncEventBroker = attrs.field(init=False, factory=LocalAsyncEventBroker) _exit_stack: AsyncExitStack = attrs.field(init=False) @@ -98,8 +99,14 @@ class AsyncScheduler: del self._wakeup_event def _schedule_added_or_modified(self, event: Event) -> None: - self.logger.debug('Detected a %s event – waking up the scheduler', type(event).__name__) - self._wakeup_event.set() + event_ = cast('ScheduleAdded | ScheduleUpdated', event) + if ( + not self._wakeup_deadline + or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline) + ): + self.logger.debug('Detected a %s event – waking up the scheduler', + type(event).__name__) + self._wakeup_event.set() async def add_schedule( self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None, @@ -299,19 +306,21 @@ class AsyncScheduler: # schedule is due or the scheduler is explicitly woken up wait_time = None if len(schedules) < 100: - next_fire_time = await self.data_store.get_next_schedule_run_time() - if next_fire_time: - wait_time = (next_fire_time - datetime.now(timezone.utc)).total_seconds() + self._wakeup_deadline = await self.data_store.get_next_schedule_run_time() + if self._wakeup_deadline: + wait_time = ( + self._wakeup_deadline - datetime.now(timezone.utc) + ).total_seconds() self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)', - wait_time, next_fire_time) + wait_time, self._wakeup_deadline) else: self.logger.debug('Waiting for any due schedules to appear') + + with move_on_after(wait_time): + await self._wakeup_event.wait() + self._wakeup_event = anyio.Event() else: self.logger.debug('Processing more schedules on the next iteration') - - with move_on_after(wait_time): - await self._wakeup_event.wait() - self._wakeup_event = anyio.Event() except get_cancelled_exc_class(): pass except BaseException as exc: diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 7ead9f8..3e9f196 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -8,7 +8,7 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from contextlib import ExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Mapping +from typing import Any, Callable, Iterable, Mapping, cast from uuid import UUID, uuid4 import attrs @@ -40,6 +40,7 @@ class Scheduler: _state: RunState = attrs.field(init=False, default=RunState.stopped) _wakeup_event: threading.Event = attrs.field(init=False) + _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) _worker: Worker | None = attrs.field(init=False, default=None) _events: LocalEventBroker = attrs.field(init=False, factory=LocalEventBroker) _exit_stack: ExitStack = attrs.field(init=False) @@ -108,8 +109,14 @@ class Scheduler: del self._wakeup_event def _schedule_added_or_modified(self, event: Event) -> None: - self.logger.debug('Detected a %s event – waking up the scheduler', type(event).__name__) - self._wakeup_event.set() + event_ = cast('ScheduleAdded | ScheduleUpdated', event) + if ( + not self._wakeup_deadline + or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline) + ): + self.logger.debug('Detected a %s event – waking up the scheduler', + type(event).__name__) + self._wakeup_event.set() def add_schedule( self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None, @@ -309,18 +316,20 @@ class Scheduler: # schedule is due or the scheduler is explicitly woken up wait_time = None if len(schedules) < 100: - next_fire_time = self.data_store.get_next_schedule_run_time() - if next_fire_time: - wait_time = (next_fire_time - datetime.now(timezone.utc)).total_seconds() + self._wakeup_deadline = self.data_store.get_next_schedule_run_time() + if self._wakeup_deadline: + wait_time = ( + self._wakeup_deadline - datetime.now(timezone.utc) + ).total_seconds() self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)', - wait_time, next_fire_time) + wait_time, self._wakeup_deadline) else: self.logger.debug('Waiting for any due schedules to appear') + + if self._wakeup_event.wait(wait_time): + self._wakeup_event = threading.Event() else: self.logger.debug('Processing more schedules on the next iteration') - - if self._wakeup_event.wait(wait_time): - self._wakeup_event = threading.Event() except BaseException as exc: self._state = RunState.stopped self.logger.exception('Scheduler crashed') |