diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-02 19:35:40 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-06 01:39:07 +0300 |
commit | 19d75e196aea88032e68535352bbdd9f528a214f (patch) | |
tree | 18b9b6eb6c59c33c5f5adc725f52d1c2c167a2b5 /src/apscheduler/schedulers | |
parent | dbe0f8bdd58cef5bd9060ed4c6f9eb5651ce09ad (diff) | |
download | apscheduler-19d75e196aea88032e68535352bbdd9f528a214f.tar.gz |
More refactoring work
* Added mysql and sqlite to the data store testing matrix
* Made customizing the SQLAlchemy table metadata easier
* Refactored more classes to use attrs instead of dataclasses
* Added the get_next_schedule_run_time() method to stores
* Made schedulers use get_next_schedule_run_time() to limit their waiting time
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 107 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 106 |
2 files changed, 120 insertions, 93 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 9b28f13..3f33921 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -9,7 +9,8 @@ from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Type, Union from uuid import uuid4 import anyio -from anyio import TASK_STATUS_IGNORED, Event, create_task_group, get_cancelled_exc_class +from anyio import ( + TASK_STATUS_IGNORED, Event, create_task_group, get_cancelled_exc_class, move_on_after) from anyio.abc import TaskGroup from ..abc import AsyncDataStore, DataStore, EventSource, Job, Schedule, Trigger @@ -130,8 +131,8 @@ class AsyncScheduler(EventSource): taskdef = self._get_taskdef(task) schedule = Schedule(id=id, task_id=taskdef.id, trigger=trigger, args=args, kwargs=kwargs, - coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags, - next_fire_time=trigger.next()) + coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags) + schedule.next_fire_time = trigger.next() await self.data_store.add_schedule(schedule, conflict_policy) self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', taskdef, trigger, schedule.next_fire_time) @@ -152,50 +153,64 @@ class AsyncScheduler(EventSource): try: while self._state is RunState.started: - async with self.data_store.acquire_schedules(self.identity, 100) as schedules: - now = datetime.now(timezone.utc) - for schedule in schedules: - # Look up the task definition + schedules = await self.data_store.acquire_schedules(self.identity, 100) + now = datetime.now(timezone.utc) + for schedule in schedules: + # Look up the task definition + try: + taskdef = self._get_taskdef(schedule.task_id) + except LookupError: + self.logger.error('Cannot locate task definition %r for schedule %r – ' + 'removing schedule', schedule.task_id, schedule.id) + schedule.next_fire_time = None + continue + + # Calculate a next fire time for the schedule, if possible + fire_times = [schedule.next_fire_time] + calculate_next = schedule.trigger.next + while True: try: - taskdef = self._get_taskdef(schedule.task_id) - except LookupError: - self.logger.error('Cannot locate task definition %r for schedule %r – ' - 'removing schedule', schedule.task_id, schedule.id) - schedule.next_fire_time = None - continue - - # Calculate a next fire time for the schedule, if possible - fire_times = [schedule.next_fire_time] - calculate_next = schedule.trigger.next - while True: - try: - fire_time = calculate_next() - except Exception: - self.logger.exception( - 'Error computing next fire time for schedule %r of task %r – ' - 'removing schedule', schedule.id, taskdef.id) - break - - # Stop if the calculated fire time is in the future - if fire_time is None or fire_time > now: - schedule.next_fire_time = fire_time - break - - # Only keep all the fire times if coalesce policy = "all" - if schedule.coalesce is CoalescePolicy.all: - fire_times.append(fire_time) - elif schedule.coalesce is CoalescePolicy.latest: - fire_times[0] = fire_time - - # Add one or more jobs to the job queue - for fire_time in fire_times: - schedule.last_fire_time = fire_time - job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs, - schedule.id, fire_time, schedule.next_deadline, - schedule.tags) - await self.data_store.add_job(job) - - await self._wakeup_event.wait() + fire_time = calculate_next() + except Exception: + self.logger.exception( + 'Error computing next fire time for schedule %r of task %r – ' + 'removing schedule', schedule.id, taskdef.id) + break + + # Stop if the calculated fire time is in the future + if fire_time is None or fire_time > now: + schedule.next_fire_time = fire_time + break + + # Only keep all the fire times if coalesce policy = "all" + if schedule.coalesce is CoalescePolicy.all: + fire_times.append(fire_time) + elif schedule.coalesce is CoalescePolicy.latest: + fire_times[0] = fire_time + + # Add one or more jobs to the job queue + for fire_time in fire_times: + schedule.last_fire_time = fire_time + job = Job(task_id=taskdef.id, func=taskdef.func, args=schedule.args, + kwargs=schedule.kwargs, schedule_id=schedule.id, + scheduled_fire_time=fire_time, + start_deadline=schedule.next_deadline, tags=schedule.tags) + await self.data_store.add_job(job) + + # Update the schedules (and release the scheduler's claim on them) + await self.data_store.release_schedules(self.identity, schedules) + + # If we received fewer schedules than the maximum amount, sleep until the next + # schedule is due or the scheduler is explicitly woken up + wait_time = None + if len(schedules) < 100: + next_fire_time = await self.data_store.get_next_schedule_run_time() + if next_fire_time: + wait_time = (datetime.now(timezone.utc) - next_fire_time).total_seconds() + + with move_on_after(wait_time): + await self._wakeup_event.wait() + self._wakeup_event = anyio.Event() except get_cancelled_exc_class(): pass diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 23e0564..feffa0b 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -125,8 +125,8 @@ class Scheduler(EventSource): taskdef = self._get_taskdef(task) schedule = Schedule(id=id, task_id=taskdef.id, trigger=trigger, args=args, kwargs=kwargs, - coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags, - next_fire_time=trigger.next()) + coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags) + schedule.next_fire_time = trigger.next() self.data_store.add_schedule(schedule, conflict_policy) self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', taskdef, trigger, schedule.next_fire_time) @@ -146,52 +146,64 @@ class Scheduler(EventSource): try: while self._state is RunState.started: - with self.data_store.acquire_schedules(self.identity, 100) as schedules: - now = datetime.now(timezone.utc) - for schedule in schedules: - # Look up the task definition + schedules = self.data_store.acquire_schedules(self.identity, 100) + now = datetime.now(timezone.utc) + for schedule in schedules: + # Look up the task definition + try: + taskdef = self._get_taskdef(schedule.task_id) + except LookupError: + self.logger.error('Cannot locate task definition %r for schedule %r – ' + 'putting schedule on hold', schedule.task_id, + schedule.id) + schedule.next_fire_time = None + continue + + # Calculate a next fire time for the schedule, if possible + fire_times = [schedule.next_fire_time] + calculate_next = schedule.trigger.next + while True: try: - taskdef = self._get_taskdef(schedule.task_id) - except LookupError: - self.logger.error('Cannot locate task definition %r for schedule %r – ' - 'putting schedule on hold', schedule.task_id, - schedule.id) - schedule.next_fire_time = None - continue - - # Calculate a next fire time for the schedule, if possible - fire_times = [schedule.next_fire_time] - calculate_next = schedule.trigger.next - while True: - try: - fire_time = calculate_next() - except Exception: - self.logger.exception( - 'Error computing next fire time for schedule %r of task %r – ' - 'removing schedule', schedule.id, taskdef.id) - break - - # Stop if the calculated fire time is in the future - if fire_time is None or fire_time > now: - schedule.next_fire_time = fire_time - break - - # Only keep all the fire times if coalesce policy = "all" - if schedule.coalesce is CoalescePolicy.all: - fire_times.append(fire_time) - elif schedule.coalesce is CoalescePolicy.latest: - fire_times[0] = fire_time - - # Add one or more jobs to the job queue - for fire_time in fire_times: - schedule.last_fire_time = fire_time - job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs, - schedule.id, fire_time, schedule.next_deadline, - schedule.tags) - self.data_store.add_job(job) - - self._wakeup_event.wait() - self._wakeup_event = threading.Event() + fire_time = calculate_next() + except Exception: + self.logger.exception( + 'Error computing next fire time for schedule %r of task %r – ' + 'removing schedule', schedule.id, taskdef.id) + break + + # Stop if the calculated fire time is in the future + if fire_time is None or fire_time > now: + schedule.next_fire_time = fire_time + break + + # Only keep all the fire times if coalesce policy = "all" + if schedule.coalesce is CoalescePolicy.all: + fire_times.append(fire_time) + elif schedule.coalesce is CoalescePolicy.latest: + fire_times[0] = fire_time + + # Add one or more jobs to the job queue + for fire_time in fire_times: + schedule.last_fire_time = fire_time + job = Job(task_id=taskdef.id, func=taskdef.func, args=schedule.args, + kwargs=schedule.kwargs, schedule_id=schedule.id, + scheduled_fire_time=fire_time, + start_deadline=schedule.next_deadline, tags=schedule.tags) + self.data_store.add_job(job) + + # Update the schedules (and release the scheduler's claim on them) + self.data_store.release_schedules(self.identity, schedules) + + # If we received fewer schedules than the maximum amount, sleep until the next + # schedule is due or the scheduler is explicitly woken up + wait_time = None + if len(schedules) < 100: + next_fire_time = self.data_store.get_next_schedule_run_time() + if next_fire_time: + wait_time = (datetime.now(timezone.utc) - next_fire_time).total_seconds() + + if self._wakeup_event.wait(wait_time): + self._wakeup_event = threading.Event() except BaseException as exc: self._state = RunState.stopped self._events.publish(SchedulerStopped(exception=exc)) |