summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-01-25 13:48:32 +0200
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-02-15 02:20:18 +0200
commit7bb246cc4a907a9ae3bface1d18e58778b7c9029 (patch)
tree13e03ba01fe520aa16eb0e8eb8854d9b8593fbed /src/apscheduler/schedulers
parent23030abe95d790a63cfff6346a271804658f6f3c (diff)
downloadapscheduler-7bb246cc4a907a9ae3bface1d18e58778b7c9029.tar.gz
Fixed scheduler waking up unnecessarily
Added checks to make the scheduler only wake up if there is a new or updated schedule that would trigger sooner than the previously nearest known schedule
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r--src/apscheduler/schedulers/async_.py31
-rw-r--r--src/apscheduler/schedulers/sync.py29
2 files changed, 39 insertions, 21 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 6d3287f..274e872 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -6,7 +6,7 @@ import random
from contextlib import AsyncExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
-from typing import Any, Callable, Iterable, Mapping
+from typing import Any, Callable, Iterable, Mapping, cast
from uuid import UUID, uuid4
import anyio
@@ -41,6 +41,7 @@ class AsyncScheduler:
_state: RunState = attrs.field(init=False, default=RunState.stopped)
_wakeup_event: anyio.Event = attrs.field(init=False)
+ _wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
_worker: AsyncWorker | None = attrs.field(init=False, default=None)
_events: LocalAsyncEventBroker = attrs.field(init=False, factory=LocalAsyncEventBroker)
_exit_stack: AsyncExitStack = attrs.field(init=False)
@@ -98,8 +99,14 @@ class AsyncScheduler:
del self._wakeup_event
def _schedule_added_or_modified(self, event: Event) -> None:
- self.logger.debug('Detected a %s event – waking up the scheduler', type(event).__name__)
- self._wakeup_event.set()
+ event_ = cast('ScheduleAdded | ScheduleUpdated', event)
+ if (
+ not self._wakeup_deadline
+ or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline)
+ ):
+ self.logger.debug('Detected a %s event – waking up the scheduler',
+ type(event).__name__)
+ self._wakeup_event.set()
async def add_schedule(
self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None,
@@ -299,19 +306,21 @@ class AsyncScheduler:
# schedule is due or the scheduler is explicitly woken up
wait_time = None
if len(schedules) < 100:
- next_fire_time = await self.data_store.get_next_schedule_run_time()
- if next_fire_time:
- wait_time = (next_fire_time - datetime.now(timezone.utc)).total_seconds()
+ self._wakeup_deadline = await self.data_store.get_next_schedule_run_time()
+ if self._wakeup_deadline:
+ wait_time = (
+ self._wakeup_deadline - datetime.now(timezone.utc)
+ ).total_seconds()
self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)',
- wait_time, next_fire_time)
+ wait_time, self._wakeup_deadline)
else:
self.logger.debug('Waiting for any due schedules to appear')
+
+ with move_on_after(wait_time):
+ await self._wakeup_event.wait()
+ self._wakeup_event = anyio.Event()
else:
self.logger.debug('Processing more schedules on the next iteration')
-
- with move_on_after(wait_time):
- await self._wakeup_event.wait()
- self._wakeup_event = anyio.Event()
except get_cancelled_exc_class():
pass
except BaseException as exc:
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 7ead9f8..3e9f196 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -8,7 +8,7 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from contextlib import ExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
-from typing import Any, Callable, Iterable, Mapping
+from typing import Any, Callable, Iterable, Mapping, cast
from uuid import UUID, uuid4
import attrs
@@ -40,6 +40,7 @@ class Scheduler:
_state: RunState = attrs.field(init=False, default=RunState.stopped)
_wakeup_event: threading.Event = attrs.field(init=False)
+ _wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
_worker: Worker | None = attrs.field(init=False, default=None)
_events: LocalEventBroker = attrs.field(init=False, factory=LocalEventBroker)
_exit_stack: ExitStack = attrs.field(init=False)
@@ -108,8 +109,14 @@ class Scheduler:
del self._wakeup_event
def _schedule_added_or_modified(self, event: Event) -> None:
- self.logger.debug('Detected a %s event – waking up the scheduler', type(event).__name__)
- self._wakeup_event.set()
+ event_ = cast('ScheduleAdded | ScheduleUpdated', event)
+ if (
+ not self._wakeup_deadline
+ or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline)
+ ):
+ self.logger.debug('Detected a %s event – waking up the scheduler',
+ type(event).__name__)
+ self._wakeup_event.set()
def add_schedule(
self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None,
@@ -309,18 +316,20 @@ class Scheduler:
# schedule is due or the scheduler is explicitly woken up
wait_time = None
if len(schedules) < 100:
- next_fire_time = self.data_store.get_next_schedule_run_time()
- if next_fire_time:
- wait_time = (next_fire_time - datetime.now(timezone.utc)).total_seconds()
+ self._wakeup_deadline = self.data_store.get_next_schedule_run_time()
+ if self._wakeup_deadline:
+ wait_time = (
+ self._wakeup_deadline - datetime.now(timezone.utc)
+ ).total_seconds()
self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)',
- wait_time, next_fire_time)
+ wait_time, self._wakeup_deadline)
else:
self.logger.debug('Waiting for any due schedules to appear')
+
+ if self._wakeup_event.wait(wait_time):
+ self._wakeup_event = threading.Event()
else:
self.logger.debug('Processing more schedules on the next iteration')
-
- if self._wakeup_event.wait(wait_time):
- self._wakeup_event = threading.Event()
except BaseException as exc:
self._state = RunState.stopped
self.logger.exception('Scheduler crashed')