summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-02 19:35:40 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-06 01:39:07 +0300
commit19d75e196aea88032e68535352bbdd9f528a214f (patch)
tree18b9b6eb6c59c33c5f5adc725f52d1c2c167a2b5 /src/apscheduler/schedulers
parentdbe0f8bdd58cef5bd9060ed4c6f9eb5651ce09ad (diff)
downloadapscheduler-19d75e196aea88032e68535352bbdd9f528a214f.tar.gz
More refactoring work
* Added mysql and sqlite to the data store testing matrix * Made customizing the SQLAlchemy table metadata easier * Refactored more classes to use attrs instead of dataclasses * Added the get_next_schedule_run_time() method to stores * Made schedulers use get_next_schedule_run_time() to limit their waiting time
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r--src/apscheduler/schedulers/async_.py107
-rw-r--r--src/apscheduler/schedulers/sync.py106
2 files changed, 120 insertions, 93 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 9b28f13..3f33921 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -9,7 +9,8 @@ from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Type, Union
from uuid import uuid4
import anyio
-from anyio import TASK_STATUS_IGNORED, Event, create_task_group, get_cancelled_exc_class
+from anyio import (
+ TASK_STATUS_IGNORED, Event, create_task_group, get_cancelled_exc_class, move_on_after)
from anyio.abc import TaskGroup
from ..abc import AsyncDataStore, DataStore, EventSource, Job, Schedule, Trigger
@@ -130,8 +131,8 @@ class AsyncScheduler(EventSource):
taskdef = self._get_taskdef(task)
schedule = Schedule(id=id, task_id=taskdef.id, trigger=trigger, args=args, kwargs=kwargs,
- coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags,
- next_fire_time=trigger.next())
+ coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags)
+ schedule.next_fire_time = trigger.next()
await self.data_store.add_schedule(schedule, conflict_policy)
self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', taskdef,
trigger, schedule.next_fire_time)
@@ -152,50 +153,64 @@ class AsyncScheduler(EventSource):
try:
while self._state is RunState.started:
- async with self.data_store.acquire_schedules(self.identity, 100) as schedules:
- now = datetime.now(timezone.utc)
- for schedule in schedules:
- # Look up the task definition
+ schedules = await self.data_store.acquire_schedules(self.identity, 100)
+ now = datetime.now(timezone.utc)
+ for schedule in schedules:
+ # Look up the task definition
+ try:
+ taskdef = self._get_taskdef(schedule.task_id)
+ except LookupError:
+ self.logger.error('Cannot locate task definition %r for schedule %r – '
+ 'removing schedule', schedule.task_id, schedule.id)
+ schedule.next_fire_time = None
+ continue
+
+ # Calculate a next fire time for the schedule, if possible
+ fire_times = [schedule.next_fire_time]
+ calculate_next = schedule.trigger.next
+ while True:
try:
- taskdef = self._get_taskdef(schedule.task_id)
- except LookupError:
- self.logger.error('Cannot locate task definition %r for schedule %r – '
- 'removing schedule', schedule.task_id, schedule.id)
- schedule.next_fire_time = None
- continue
-
- # Calculate a next fire time for the schedule, if possible
- fire_times = [schedule.next_fire_time]
- calculate_next = schedule.trigger.next
- while True:
- try:
- fire_time = calculate_next()
- except Exception:
- self.logger.exception(
- 'Error computing next fire time for schedule %r of task %r – '
- 'removing schedule', schedule.id, taskdef.id)
- break
-
- # Stop if the calculated fire time is in the future
- if fire_time is None or fire_time > now:
- schedule.next_fire_time = fire_time
- break
-
- # Only keep all the fire times if coalesce policy = "all"
- if schedule.coalesce is CoalescePolicy.all:
- fire_times.append(fire_time)
- elif schedule.coalesce is CoalescePolicy.latest:
- fire_times[0] = fire_time
-
- # Add one or more jobs to the job queue
- for fire_time in fire_times:
- schedule.last_fire_time = fire_time
- job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs,
- schedule.id, fire_time, schedule.next_deadline,
- schedule.tags)
- await self.data_store.add_job(job)
-
- await self._wakeup_event.wait()
+ fire_time = calculate_next()
+ except Exception:
+ self.logger.exception(
+ 'Error computing next fire time for schedule %r of task %r – '
+ 'removing schedule', schedule.id, taskdef.id)
+ break
+
+ # Stop if the calculated fire time is in the future
+ if fire_time is None or fire_time > now:
+ schedule.next_fire_time = fire_time
+ break
+
+ # Only keep all the fire times if coalesce policy = "all"
+ if schedule.coalesce is CoalescePolicy.all:
+ fire_times.append(fire_time)
+ elif schedule.coalesce is CoalescePolicy.latest:
+ fire_times[0] = fire_time
+
+ # Add one or more jobs to the job queue
+ for fire_time in fire_times:
+ schedule.last_fire_time = fire_time
+ job = Job(task_id=taskdef.id, func=taskdef.func, args=schedule.args,
+ kwargs=schedule.kwargs, schedule_id=schedule.id,
+ scheduled_fire_time=fire_time,
+ start_deadline=schedule.next_deadline, tags=schedule.tags)
+ await self.data_store.add_job(job)
+
+ # Update the schedules (and release the scheduler's claim on them)
+ await self.data_store.release_schedules(self.identity, schedules)
+
+ # If we received fewer schedules than the maximum amount, sleep until the next
+ # schedule is due or the scheduler is explicitly woken up
+ wait_time = None
+ if len(schedules) < 100:
+ next_fire_time = await self.data_store.get_next_schedule_run_time()
+ if next_fire_time:
+ wait_time = (datetime.now(timezone.utc) - next_fire_time).total_seconds()
+
+ with move_on_after(wait_time):
+ await self._wakeup_event.wait()
+
self._wakeup_event = anyio.Event()
except get_cancelled_exc_class():
pass
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 23e0564..feffa0b 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -125,8 +125,8 @@ class Scheduler(EventSource):
taskdef = self._get_taskdef(task)
schedule = Schedule(id=id, task_id=taskdef.id, trigger=trigger, args=args, kwargs=kwargs,
- coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags,
- next_fire_time=trigger.next())
+ coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags)
+ schedule.next_fire_time = trigger.next()
self.data_store.add_schedule(schedule, conflict_policy)
self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', taskdef,
trigger, schedule.next_fire_time)
@@ -146,52 +146,64 @@ class Scheduler(EventSource):
try:
while self._state is RunState.started:
- with self.data_store.acquire_schedules(self.identity, 100) as schedules:
- now = datetime.now(timezone.utc)
- for schedule in schedules:
- # Look up the task definition
+ schedules = self.data_store.acquire_schedules(self.identity, 100)
+ now = datetime.now(timezone.utc)
+ for schedule in schedules:
+ # Look up the task definition
+ try:
+ taskdef = self._get_taskdef(schedule.task_id)
+ except LookupError:
+ self.logger.error('Cannot locate task definition %r for schedule %r – '
+ 'putting schedule on hold', schedule.task_id,
+ schedule.id)
+ schedule.next_fire_time = None
+ continue
+
+ # Calculate a next fire time for the schedule, if possible
+ fire_times = [schedule.next_fire_time]
+ calculate_next = schedule.trigger.next
+ while True:
try:
- taskdef = self._get_taskdef(schedule.task_id)
- except LookupError:
- self.logger.error('Cannot locate task definition %r for schedule %r – '
- 'putting schedule on hold', schedule.task_id,
- schedule.id)
- schedule.next_fire_time = None
- continue
-
- # Calculate a next fire time for the schedule, if possible
- fire_times = [schedule.next_fire_time]
- calculate_next = schedule.trigger.next
- while True:
- try:
- fire_time = calculate_next()
- except Exception:
- self.logger.exception(
- 'Error computing next fire time for schedule %r of task %r – '
- 'removing schedule', schedule.id, taskdef.id)
- break
-
- # Stop if the calculated fire time is in the future
- if fire_time is None or fire_time > now:
- schedule.next_fire_time = fire_time
- break
-
- # Only keep all the fire times if coalesce policy = "all"
- if schedule.coalesce is CoalescePolicy.all:
- fire_times.append(fire_time)
- elif schedule.coalesce is CoalescePolicy.latest:
- fire_times[0] = fire_time
-
- # Add one or more jobs to the job queue
- for fire_time in fire_times:
- schedule.last_fire_time = fire_time
- job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs,
- schedule.id, fire_time, schedule.next_deadline,
- schedule.tags)
- self.data_store.add_job(job)
-
- self._wakeup_event.wait()
- self._wakeup_event = threading.Event()
+ fire_time = calculate_next()
+ except Exception:
+ self.logger.exception(
+ 'Error computing next fire time for schedule %r of task %r – '
+ 'removing schedule', schedule.id, taskdef.id)
+ break
+
+ # Stop if the calculated fire time is in the future
+ if fire_time is None or fire_time > now:
+ schedule.next_fire_time = fire_time
+ break
+
+ # Only keep all the fire times if coalesce policy = "all"
+ if schedule.coalesce is CoalescePolicy.all:
+ fire_times.append(fire_time)
+ elif schedule.coalesce is CoalescePolicy.latest:
+ fire_times[0] = fire_time
+
+ # Add one or more jobs to the job queue
+ for fire_time in fire_times:
+ schedule.last_fire_time = fire_time
+ job = Job(task_id=taskdef.id, func=taskdef.func, args=schedule.args,
+ kwargs=schedule.kwargs, schedule_id=schedule.id,
+ scheduled_fire_time=fire_time,
+ start_deadline=schedule.next_deadline, tags=schedule.tags)
+ self.data_store.add_job(job)
+
+ # Update the schedules (and release the scheduler's claim on them)
+ self.data_store.release_schedules(self.identity, schedules)
+
+ # If we received fewer schedules than the maximum amount, sleep until the next
+ # schedule is due or the scheduler is explicitly woken up
+ wait_time = None
+ if len(schedules) < 100:
+ next_fire_time = self.data_store.get_next_schedule_run_time()
+ if next_fire_time:
+ wait_time = (datetime.now(timezone.utc) - next_fire_time).total_seconds()
+
+ if self._wakeup_event.wait(wait_time):
+ self._wakeup_event = threading.Event()
except BaseException as exc:
self._state = RunState.stopped
self._events.publish(SchedulerStopped(exception=exc))