summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-21 00:21:25 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-21 00:21:25 +0300
commit8326ac378e5b5f8e5cb2c45f20e0e1bdfa5075c0 (patch)
treeac96ee8a1cbc792cf56cf3534f5e3c65f0b5a9e7 /src/apscheduler/schedulers
parent8b68b6c5d1c63faae1ba3769b6475b396328e3a3 (diff)
downloadapscheduler-8326ac378e5b5f8e5cb2c45f20e0e1bdfa5075c0.tar.gz
Implemented schedule-level jitter
Structures now keep enums, timedeltas and frozensets as-is. The MongoDB store was modified to use a custom type registry to handle this.
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r--src/apscheduler/schedulers/async_.py40
-rw-r--r--src/apscheduler/schedulers/sync.py39
2 files changed, 69 insertions, 10 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 6900460..a08c090 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -2,6 +2,7 @@ from __future__ import annotations
import os
import platform
+import random
from contextlib import AsyncExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
@@ -24,6 +25,9 @@ from ..marshalling import callable_to_ref
from ..structures import JobResult, Task
from ..workers.async_ import AsyncWorker
+_microsecond_delta = timedelta(microseconds=1)
+_zero_timedelta = timedelta()
+
class AsyncScheduler:
"""An asynchronous (AnyIO based) scheduler implementation."""
@@ -96,7 +100,8 @@ class AsyncScheduler:
self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None,
args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None,
coalesce: CoalescePolicy = CoalescePolicy.latest,
- misfire_grace_time: float | timedelta | None = None, tags: Optional[Iterable[str]] = None,
+ misfire_grace_time: float | timedelta | None = None,
+ max_jitter: float | timedelta | None = None, tags: Optional[Iterable[str]] = None,
conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing
) -> str:
id = id or str(uuid4())
@@ -113,13 +118,18 @@ class AsyncScheduler:
task = await self.data_store.get_task(func_or_task_id)
schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs,
- coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags)
+ coalesce=coalesce, misfire_grace_time=misfire_grace_time,
+ max_jitter=max_jitter, tags=tags)
schedule.next_fire_time = trigger.next()
await self.data_store.add_schedule(schedule, conflict_policy)
self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task,
trigger, schedule.next_fire_time)
return schedule.id
+ async def get_schedule(self, id: str) -> Schedule:
+ schedules = await self.data_store.get_schedules({id})
+ return schedules[0]
+
async def remove_schedule(self, schedule_id: str) -> None:
await self.data_store.remove_schedules({schedule_id})
@@ -143,7 +153,7 @@ class AsyncScheduler:
else:
task = await self.data_store.get_task(func_or_task_id)
- job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags)
+ job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset())
await self.data_store.add_job(job)
return job.id
@@ -248,11 +258,31 @@ class AsyncScheduler:
fire_times[0] = fire_time
# Add one or more jobs to the job queue
- for fire_time in fire_times:
+ max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0
+ for i, fire_time in enumerate(fire_times):
+ # Calculate a jitter if max_jitter > 0
+ jitter = _zero_timedelta
+ if max_jitter:
+ if i + 1 < len(fire_times):
+ next_fire_time = fire_times[i + 1]
+ else:
+ next_fire_time = schedule.next_fire_time
+
+ if next_fire_time is not None:
+ # Jitter must never be so high that it would cause a fire time to
+ # equal or exceed the next fire time
+ jitter_s = min([
+ max_jitter,
+ (next_fire_time - fire_time
+ - _microsecond_delta).total_seconds()
+ ])
+ jitter = timedelta(seconds=random.uniform(0, jitter_s))
+ fire_time += jitter
+
schedule.last_fire_time = fire_time
job = Job(task_id=schedule.task_id, args=schedule.args,
kwargs=schedule.kwargs, schedule_id=schedule.id,
- scheduled_fire_time=fire_time,
+ scheduled_fire_time=fire_time, jitter=jitter,
start_deadline=schedule.next_deadline, tags=schedule.tags)
await self.data_store.add_job(job)
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 1525bea..905efac 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -2,6 +2,7 @@ from __future__ import annotations
import os
import platform
+import random
import threading
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from contextlib import ExitStack
@@ -21,6 +22,9 @@ from ..marshalling import callable_to_ref
from ..structures import Job, JobResult, Schedule, Task
from ..workers.sync import Worker
+_microsecond_delta = timedelta(microseconds=1)
+_zero_timedelta = timedelta()
+
class Scheduler:
"""A synchronous scheduler implementation."""
@@ -96,7 +100,7 @@ class Scheduler:
args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None = None,
- tags: Optional[Iterable[str]] = None,
+ max_jitter: float | timedelta | None = None, tags: Optional[Iterable[str]] = None,
conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing
) -> str:
id = id or str(uuid4())
@@ -113,13 +117,18 @@ class Scheduler:
task = self.data_store.get_task(func_or_task_id)
schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs,
- coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags)
+ coalesce=coalesce, misfire_grace_time=misfire_grace_time,
+ max_jitter=max_jitter, tags=tags)
schedule.next_fire_time = trigger.next()
self.data_store.add_schedule(schedule, conflict_policy)
self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task,
trigger, schedule.next_fire_time)
return schedule.id
+ def get_schedule(self, id: str) -> Schedule:
+ schedules = self.data_store.get_schedules({id})
+ return schedules[0]
+
def remove_schedule(self, schedule_id: str) -> None:
self.data_store.remove_schedules({schedule_id})
@@ -143,7 +152,7 @@ class Scheduler:
else:
task = self.data_store.get_task(func_or_task_id)
- job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags)
+ job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset())
self.data_store.add_job(job)
return job.id
@@ -247,11 +256,31 @@ class Scheduler:
fire_times[0] = fire_time
# Add one or more jobs to the job queue
- for fire_time in fire_times:
+ max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0
+ for i, fire_time in enumerate(fire_times):
+ # Calculate a jitter if max_jitter > 0
+ jitter = _zero_timedelta
+ if max_jitter:
+ if i + 1 < len(fire_times):
+ next_fire_time = fire_times[i + 1]
+ else:
+ next_fire_time = schedule.next_fire_time
+
+ if next_fire_time is not None:
+ # Jitter must never be so high that it would cause a fire time to
+ # equal or exceed the next fire time
+ jitter_s = min([
+ max_jitter,
+ (next_fire_time - fire_time
+ - _microsecond_delta).total_seconds()
+ ])
+ jitter = timedelta(seconds=random.uniform(0, jitter_s))
+ fire_time += jitter
+
schedule.last_fire_time = fire_time
job = Job(task_id=schedule.task_id, args=schedule.args,
kwargs=schedule.kwargs, schedule_id=schedule.id,
- scheduled_fire_time=fire_time,
+ scheduled_fire_time=fire_time, jitter=jitter,
start_deadline=schedule.next_deadline, tags=schedule.tags)
self.data_store.add_job(job)