diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-21 00:21:25 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-21 00:21:25 +0300 |
commit | 8326ac378e5b5f8e5cb2c45f20e0e1bdfa5075c0 (patch) | |
tree | ac96ee8a1cbc792cf56cf3534f5e3c65f0b5a9e7 /src/apscheduler/schedulers | |
parent | 8b68b6c5d1c63faae1ba3769b6475b396328e3a3 (diff) | |
download | apscheduler-8326ac378e5b5f8e5cb2c45f20e0e1bdfa5075c0.tar.gz |
Implemented schedule-level jitter
Structures now keep enums, timedeltas and frozensets as-is. The MongoDB store was modified to use a custom type registry to handle this.
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 40 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 39 |
2 files changed, 69 insertions, 10 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 6900460..a08c090 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -2,6 +2,7 @@ from __future__ import annotations import os import platform +import random from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger @@ -24,6 +25,9 @@ from ..marshalling import callable_to_ref from ..structures import JobResult, Task from ..workers.async_ import AsyncWorker +_microsecond_delta = timedelta(microseconds=1) +_zero_timedelta = timedelta() + class AsyncScheduler: """An asynchronous (AnyIO based) scheduler implementation.""" @@ -96,7 +100,8 @@ class AsyncScheduler: self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None, args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None, coalesce: CoalescePolicy = CoalescePolicy.latest, - misfire_grace_time: float | timedelta | None = None, tags: Optional[Iterable[str]] = None, + misfire_grace_time: float | timedelta | None = None, + max_jitter: float | timedelta | None = None, tags: Optional[Iterable[str]] = None, conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing ) -> str: id = id or str(uuid4()) @@ -113,13 +118,18 @@ class AsyncScheduler: task = await self.data_store.get_task(func_or_task_id) schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs, - coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags) + coalesce=coalesce, misfire_grace_time=misfire_grace_time, + max_jitter=max_jitter, tags=tags) schedule.next_fire_time = trigger.next() await self.data_store.add_schedule(schedule, conflict_policy) self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task, trigger, schedule.next_fire_time) return schedule.id + async def get_schedule(self, id: str) -> Schedule: + schedules = await self.data_store.get_schedules({id}) + return schedules[0] + async def remove_schedule(self, schedule_id: str) -> None: await self.data_store.remove_schedules({schedule_id}) @@ -143,7 +153,7 @@ class AsyncScheduler: else: task = await self.data_store.get_task(func_or_task_id) - job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags) + job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset()) await self.data_store.add_job(job) return job.id @@ -248,11 +258,31 @@ class AsyncScheduler: fire_times[0] = fire_time # Add one or more jobs to the job queue - for fire_time in fire_times: + max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0 + for i, fire_time in enumerate(fire_times): + # Calculate a jitter if max_jitter > 0 + jitter = _zero_timedelta + if max_jitter: + if i + 1 < len(fire_times): + next_fire_time = fire_times[i + 1] + else: + next_fire_time = schedule.next_fire_time + + if next_fire_time is not None: + # Jitter must never be so high that it would cause a fire time to + # equal or exceed the next fire time + jitter_s = min([ + max_jitter, + (next_fire_time - fire_time + - _microsecond_delta).total_seconds() + ]) + jitter = timedelta(seconds=random.uniform(0, jitter_s)) + fire_time += jitter + schedule.last_fire_time = fire_time job = Job(task_id=schedule.task_id, args=schedule.args, kwargs=schedule.kwargs, schedule_id=schedule.id, - scheduled_fire_time=fire_time, + scheduled_fire_time=fire_time, jitter=jitter, start_deadline=schedule.next_deadline, tags=schedule.tags) await self.data_store.add_job(job) diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 1525bea..905efac 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -2,6 +2,7 @@ from __future__ import annotations import os import platform +import random import threading from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from contextlib import ExitStack @@ -21,6 +22,9 @@ from ..marshalling import callable_to_ref from ..structures import Job, JobResult, Schedule, Task from ..workers.sync import Worker +_microsecond_delta = timedelta(microseconds=1) +_zero_timedelta = timedelta() + class Scheduler: """A synchronous scheduler implementation.""" @@ -96,7 +100,7 @@ class Scheduler: args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None, coalesce: CoalescePolicy = CoalescePolicy.latest, misfire_grace_time: float | timedelta | None = None, - tags: Optional[Iterable[str]] = None, + max_jitter: float | timedelta | None = None, tags: Optional[Iterable[str]] = None, conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing ) -> str: id = id or str(uuid4()) @@ -113,13 +117,18 @@ class Scheduler: task = self.data_store.get_task(func_or_task_id) schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs, - coalesce=coalesce, misfire_grace_time=misfire_grace_time, tags=tags) + coalesce=coalesce, misfire_grace_time=misfire_grace_time, + max_jitter=max_jitter, tags=tags) schedule.next_fire_time = trigger.next() self.data_store.add_schedule(schedule, conflict_policy) self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task, trigger, schedule.next_fire_time) return schedule.id + def get_schedule(self, id: str) -> Schedule: + schedules = self.data_store.get_schedules({id}) + return schedules[0] + def remove_schedule(self, schedule_id: str) -> None: self.data_store.remove_schedules({schedule_id}) @@ -143,7 +152,7 @@ class Scheduler: else: task = self.data_store.get_task(func_or_task_id) - job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags) + job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset()) self.data_store.add_job(job) return job.id @@ -247,11 +256,31 @@ class Scheduler: fire_times[0] = fire_time # Add one or more jobs to the job queue - for fire_time in fire_times: + max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0 + for i, fire_time in enumerate(fire_times): + # Calculate a jitter if max_jitter > 0 + jitter = _zero_timedelta + if max_jitter: + if i + 1 < len(fire_times): + next_fire_time = fire_times[i + 1] + else: + next_fire_time = schedule.next_fire_time + + if next_fire_time is not None: + # Jitter must never be so high that it would cause a fire time to + # equal or exceed the next fire time + jitter_s = min([ + max_jitter, + (next_fire_time - fire_time + - _microsecond_delta).total_seconds() + ]) + jitter = timedelta(seconds=random.uniform(0, jitter_s)) + fire_time += jitter + schedule.last_fire_time = fire_time job = Job(task_id=schedule.task_id, args=schedule.args, kwargs=schedule.kwargs, schedule_id=schedule.id, - scheduled_fire_time=fire_time, + scheduled_fire_time=fire_time, jitter=jitter, start_deadline=schedule.next_deadline, tags=schedule.tags) self.data_store.add_job(job) |