From b421037421bde1d139e3844f6067ee3f1aeb6852 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 23 Sep 2021 01:33:04 +0300 Subject: Improved scheduler logging and fixed wait time calculation --- src/apscheduler/schedulers/async_.py | 19 ++++++++++++++----- src/apscheduler/schedulers/sync.py | 16 ++++++++++++++-- 2 files changed, 28 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 751eb2c..b8f2e0b 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -20,7 +20,7 @@ from ..datastores.memory import MemoryDataStore from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState from ..eventbrokers.async_local import LocalAsyncEventBroker from ..events import ( - JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated) + Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated) from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError from ..marshalling import callable_to_ref from ..structures import JobResult, Task @@ -71,7 +71,7 @@ class AsyncScheduler: # Wake up the scheduler if the data store emits a significant schedule event self._exit_stack.enter_context( self.data_store.events.subscribe( - lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated} + self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} ) ) @@ -97,6 +97,10 @@ class AsyncScheduler: self._state = RunState.stopped del self._wakeup_event + def _schedule_added_or_modified(self, event: Event) -> None: + self.logger.debug('Detected a %s event – waking up the scheduler', type(event).__name__) + self._wakeup_event.set() + async def add_schedule( self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None, args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None, @@ -296,12 +300,17 @@ class AsyncScheduler: if len(schedules) < 100: next_fire_time = await self.data_store.get_next_schedule_run_time() if next_fire_time: - wait_time = (datetime.now(timezone.utc) - next_fire_time).total_seconds() + wait_time = (next_fire_time - datetime.now(timezone.utc)).total_seconds() + self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)', + wait_time, next_fire_time) + else: + self.logger.debug('Waiting for any due schedules to appear') + else: + self.logger.debug('Processing more schedules on the next iteration') with move_on_after(wait_time): await self._wakeup_event.wait() - - self._wakeup_event = anyio.Event() + self._wakeup_event = anyio.Event() except get_cancelled_exc_class(): pass except BaseException as exc: diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index c49359c..748c8aa 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -74,7 +74,7 @@ class Scheduler: # Wake up the scheduler if the data store emits a significant schedule event self._exit_stack.enter_context( self.data_store.events.subscribe( - lambda event: self._wakeup_event.set(), {ScheduleAdded, ScheduleUpdated} + self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} ) ) @@ -107,6 +107,10 @@ class Scheduler: self._state = RunState.stopped del self._wakeup_event + def _schedule_added_or_modified(self, event: Event) -> None: + self.logger.debug('Detected a %s event – waking up the scheduler', type(event).__name__) + self._wakeup_event.set() + def add_schedule( self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None, args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None, @@ -242,6 +246,8 @@ class Scheduler: try: while self._state is RunState.started: schedules = self.data_store.acquire_schedules(self.identity, 100) + self.logger.debug('Processing %d schedules retrieved from the data store', + len(schedules)) now = datetime.now(timezone.utc) for schedule in schedules: # Calculate a next fire time for the schedule, if possible @@ -305,7 +311,13 @@ class Scheduler: if len(schedules) < 100: next_fire_time = self.data_store.get_next_schedule_run_time() if next_fire_time: - wait_time = (datetime.now(timezone.utc) - next_fire_time).total_seconds() + wait_time = (next_fire_time - datetime.now(timezone.utc)).total_seconds() + self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)', + wait_time, next_fire_time) + else: + self.logger.debug('Waiting for any due schedules to appear') + else: + self.logger.debug('Processing more schedules on the next iteration') if self._wakeup_event.wait(wait_time): self._wakeup_event = threading.Event() -- cgit v1.2.1