diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-01-25 13:48:32 +0200 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-02-15 02:20:18 +0200 |
commit | 7bb246cc4a907a9ae3bface1d18e58778b7c9029 (patch) | |
tree | 13e03ba01fe520aa16eb0e8eb8854d9b8593fbed /src/apscheduler/schedulers/async_.py | |
parent | 23030abe95d790a63cfff6346a271804658f6f3c (diff) | |
download | apscheduler-7bb246cc4a907a9ae3bface1d18e58778b7c9029.tar.gz |
Fixed scheduler waking up unnecessarily
Added checks to make the scheduler only wake up if there is a new or updated schedule that would trigger sooner than the previously nearest known schedule
Diffstat (limited to 'src/apscheduler/schedulers/async_.py')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 31 |
1 files changed, 20 insertions, 11 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 6d3287f..274e872 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -6,7 +6,7 @@ import random from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Mapping +from typing import Any, Callable, Iterable, Mapping, cast from uuid import UUID, uuid4 import anyio @@ -41,6 +41,7 @@ class AsyncScheduler: _state: RunState = attrs.field(init=False, default=RunState.stopped) _wakeup_event: anyio.Event = attrs.field(init=False) + _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) _worker: AsyncWorker | None = attrs.field(init=False, default=None) _events: LocalAsyncEventBroker = attrs.field(init=False, factory=LocalAsyncEventBroker) _exit_stack: AsyncExitStack = attrs.field(init=False) @@ -98,8 +99,14 @@ class AsyncScheduler: del self._wakeup_event def _schedule_added_or_modified(self, event: Event) -> None: - self.logger.debug('Detected a %s event – waking up the scheduler', type(event).__name__) - self._wakeup_event.set() + event_ = cast('ScheduleAdded | ScheduleUpdated', event) + if ( + not self._wakeup_deadline + or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline) + ): + self.logger.debug('Detected a %s event – waking up the scheduler', + type(event).__name__) + self._wakeup_event.set() async def add_schedule( self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None, @@ -299,19 +306,21 @@ class AsyncScheduler: # schedule is due or the scheduler is explicitly woken up wait_time = None if len(schedules) < 100: - next_fire_time = await self.data_store.get_next_schedule_run_time() - if next_fire_time: - wait_time = (next_fire_time - datetime.now(timezone.utc)).total_seconds() + self._wakeup_deadline = await self.data_store.get_next_schedule_run_time() + if self._wakeup_deadline: + wait_time = ( + self._wakeup_deadline - datetime.now(timezone.utc) + ).total_seconds() self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)', - wait_time, next_fire_time) + wait_time, self._wakeup_deadline) else: self.logger.debug('Waiting for any due schedules to appear') + + with move_on_after(wait_time): + await self._wakeup_event.wait() + self._wakeup_event = anyio.Event() else: self.logger.debug('Processing more schedules on the next iteration') - - with move_on_after(wait_time): - await self._wakeup_event.wait() - self._wakeup_event = anyio.Event() except get_cancelled_exc_class(): pass except BaseException as exc: |