summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-08-30 00:53:34 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-06 01:39:07 +0300
commitff9fbf1bd54501f000a4d2043bdd673ba6bb5aa5 (patch)
treef54b415d73f4e3ca19be77194b48f916928d99cc /src/apscheduler/schedulers
parent9b64cbec8e7e513e0452b9be76bc14cc08ec28a6 (diff)
downloadapscheduler-ff9fbf1bd54501f000a4d2043bdd673ba6bb5aa5.tar.gz
Refactored acquire_schedules() to be a context manager
This allows some nifty tricks like holding a lock on the schedules while they're being evaluated.
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r--src/apscheduler/schedulers/async_.py84
-rw-r--r--src/apscheduler/schedulers/sync.py86
2 files changed, 83 insertions, 87 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 6c7ef8c..9b28f13 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -152,50 +152,48 @@ class AsyncScheduler(EventSource):
try:
while self._state is RunState.started:
- schedules = await self.data_store.acquire_schedules(self.identity, 100)
- now = datetime.now(timezone.utc)
- for schedule in schedules:
- # Look up the task definition
- try:
- taskdef = self._get_taskdef(schedule.task_id)
- except LookupError:
- self.logger.error('Cannot locate task definition %r for schedule %r – '
- 'removing schedule', schedule.task_id, schedule.id)
- schedule.next_fire_time = None
- continue
-
- # Calculate a next fire time for the schedule, if possible
- fire_times = [schedule.next_fire_time]
- calculate_next = schedule.trigger.next
- while True:
+ async with self.data_store.acquire_schedules(self.identity, 100) as schedules:
+ now = datetime.now(timezone.utc)
+ for schedule in schedules:
+ # Look up the task definition
try:
- fire_time = calculate_next()
- except Exception:
- self.logger.exception(
- 'Error computing next fire time for schedule %r of task %r – '
- 'removing schedule', schedule.id, taskdef.id)
- break
-
- # Stop if the calculated fire time is in the future
- if fire_time is None or fire_time > now:
- schedule.next_fire_time = fire_time
- break
-
- # Only keep all the fire times if coalesce policy = "all"
- if schedule.coalesce is CoalescePolicy.all:
- fire_times.append(fire_time)
- elif schedule.coalesce is CoalescePolicy.latest:
- fire_times[0] = fire_time
-
- # Add one or more jobs to the job queue
- for fire_time in fire_times:
- schedule.last_fire_time = fire_time
- job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs,
- schedule.id, fire_time, schedule.next_deadline,
- schedule.tags)
- await self.data_store.add_job(job)
-
- await self.data_store.release_schedules(self.identity, schedules)
+ taskdef = self._get_taskdef(schedule.task_id)
+ except LookupError:
+ self.logger.error('Cannot locate task definition %r for schedule %r – '
+ 'removing schedule', schedule.task_id, schedule.id)
+ schedule.next_fire_time = None
+ continue
+
+ # Calculate a next fire time for the schedule, if possible
+ fire_times = [schedule.next_fire_time]
+ calculate_next = schedule.trigger.next
+ while True:
+ try:
+ fire_time = calculate_next()
+ except Exception:
+ self.logger.exception(
+ 'Error computing next fire time for schedule %r of task %r – '
+ 'removing schedule', schedule.id, taskdef.id)
+ break
+
+ # Stop if the calculated fire time is in the future
+ if fire_time is None or fire_time > now:
+ schedule.next_fire_time = fire_time
+ break
+
+ # Only keep all the fire times if coalesce policy = "all"
+ if schedule.coalesce is CoalescePolicy.all:
+ fire_times.append(fire_time)
+ elif schedule.coalesce is CoalescePolicy.latest:
+ fire_times[0] = fire_time
+
+ # Add one or more jobs to the job queue
+ for fire_time in fire_times:
+ schedule.last_fire_time = fire_time
+ job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs,
+ schedule.id, fire_time, schedule.next_deadline,
+ schedule.tags)
+ await self.data_store.add_job(job)
await self._wakeup_event.wait()
self._wakeup_event = anyio.Event()
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 6f330c9..23e0564 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -146,51 +146,49 @@ class Scheduler(EventSource):
try:
while self._state is RunState.started:
- schedules = self.data_store.acquire_schedules(self.identity, 100)
- now = datetime.now(timezone.utc)
- for schedule in schedules:
- # Look up the task definition
- try:
- taskdef = self._get_taskdef(schedule.task_id)
- except LookupError:
- self.logger.error('Cannot locate task definition %r for schedule %r – '
- 'putting schedule on hold', schedule.task_id,
- schedule.id)
- schedule.next_fire_time = None
- continue
-
- # Calculate a next fire time for the schedule, if possible
- fire_times = [schedule.next_fire_time]
- calculate_next = schedule.trigger.next
- while True:
+ with self.data_store.acquire_schedules(self.identity, 100) as schedules:
+ now = datetime.now(timezone.utc)
+ for schedule in schedules:
+ # Look up the task definition
try:
- fire_time = calculate_next()
- except Exception:
- self.logger.exception(
- 'Error computing next fire time for schedule %r of task %r – '
- 'removing schedule', schedule.id, taskdef.id)
- break
-
- # Stop if the calculated fire time is in the future
- if fire_time is None or fire_time > now:
- schedule.next_fire_time = fire_time
- break
-
- # Only keep all the fire times if coalesce policy = "all"
- if schedule.coalesce is CoalescePolicy.all:
- fire_times.append(fire_time)
- elif schedule.coalesce is CoalescePolicy.latest:
- fire_times[0] = fire_time
-
- # Add one or more jobs to the job queue
- for fire_time in fire_times:
- schedule.last_fire_time = fire_time
- job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs,
- schedule.id, fire_time, schedule.next_deadline,
- schedule.tags)
- self.data_store.add_job(job)
-
- self.data_store.release_schedules(self.identity, schedules)
+ taskdef = self._get_taskdef(schedule.task_id)
+ except LookupError:
+ self.logger.error('Cannot locate task definition %r for schedule %r – '
+ 'putting schedule on hold', schedule.task_id,
+ schedule.id)
+ schedule.next_fire_time = None
+ continue
+
+ # Calculate a next fire time for the schedule, if possible
+ fire_times = [schedule.next_fire_time]
+ calculate_next = schedule.trigger.next
+ while True:
+ try:
+ fire_time = calculate_next()
+ except Exception:
+ self.logger.exception(
+ 'Error computing next fire time for schedule %r of task %r – '
+ 'removing schedule', schedule.id, taskdef.id)
+ break
+
+ # Stop if the calculated fire time is in the future
+ if fire_time is None or fire_time > now:
+ schedule.next_fire_time = fire_time
+ break
+
+ # Only keep all the fire times if coalesce policy = "all"
+ if schedule.coalesce is CoalescePolicy.all:
+ fire_times.append(fire_time)
+ elif schedule.coalesce is CoalescePolicy.latest:
+ fire_times[0] = fire_time
+
+ # Add one or more jobs to the job queue
+ for fire_time in fire_times:
+ schedule.last_fire_time = fire_time
+ job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs,
+ schedule.id, fire_time, schedule.next_deadline,
+ schedule.tags)
+ self.data_store.add_job(job)
self._wakeup_event.wait()
self._wakeup_event = threading.Event()