diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-08-30 00:53:34 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-06 01:39:07 +0300 |
commit | ff9fbf1bd54501f000a4d2043bdd673ba6bb5aa5 (patch) | |
tree | f54b415d73f4e3ca19be77194b48f916928d99cc /src/apscheduler/schedulers | |
parent | 9b64cbec8e7e513e0452b9be76bc14cc08ec28a6 (diff) | |
download | apscheduler-ff9fbf1bd54501f000a4d2043bdd673ba6bb5aa5.tar.gz |
Refactored acquire_schedules() to be a context manager
This allows some nifty tricks like holding a lock on the schedules while they're being evaluated.
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 84 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 86 |
2 files changed, 83 insertions, 87 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 6c7ef8c..9b28f13 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -152,50 +152,48 @@ class AsyncScheduler(EventSource): try: while self._state is RunState.started: - schedules = await self.data_store.acquire_schedules(self.identity, 100) - now = datetime.now(timezone.utc) - for schedule in schedules: - # Look up the task definition - try: - taskdef = self._get_taskdef(schedule.task_id) - except LookupError: - self.logger.error('Cannot locate task definition %r for schedule %r – ' - 'removing schedule', schedule.task_id, schedule.id) - schedule.next_fire_time = None - continue - - # Calculate a next fire time for the schedule, if possible - fire_times = [schedule.next_fire_time] - calculate_next = schedule.trigger.next - while True: + async with self.data_store.acquire_schedules(self.identity, 100) as schedules: + now = datetime.now(timezone.utc) + for schedule in schedules: + # Look up the task definition try: - fire_time = calculate_next() - except Exception: - self.logger.exception( - 'Error computing next fire time for schedule %r of task %r – ' - 'removing schedule', schedule.id, taskdef.id) - break - - # Stop if the calculated fire time is in the future - if fire_time is None or fire_time > now: - schedule.next_fire_time = fire_time - break - - # Only keep all the fire times if coalesce policy = "all" - if schedule.coalesce is CoalescePolicy.all: - fire_times.append(fire_time) - elif schedule.coalesce is CoalescePolicy.latest: - fire_times[0] = fire_time - - # Add one or more jobs to the job queue - for fire_time in fire_times: - schedule.last_fire_time = fire_time - job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs, - schedule.id, fire_time, schedule.next_deadline, - schedule.tags) - await self.data_store.add_job(job) - - await self.data_store.release_schedules(self.identity, schedules) + taskdef = self._get_taskdef(schedule.task_id) + except LookupError: + self.logger.error('Cannot locate task definition %r for schedule %r – ' + 'removing schedule', schedule.task_id, schedule.id) + schedule.next_fire_time = None + continue + + # Calculate a next fire time for the schedule, if possible + fire_times = [schedule.next_fire_time] + calculate_next = schedule.trigger.next + while True: + try: + fire_time = calculate_next() + except Exception: + self.logger.exception( + 'Error computing next fire time for schedule %r of task %r – ' + 'removing schedule', schedule.id, taskdef.id) + break + + # Stop if the calculated fire time is in the future + if fire_time is None or fire_time > now: + schedule.next_fire_time = fire_time + break + + # Only keep all the fire times if coalesce policy = "all" + if schedule.coalesce is CoalescePolicy.all: + fire_times.append(fire_time) + elif schedule.coalesce is CoalescePolicy.latest: + fire_times[0] = fire_time + + # Add one or more jobs to the job queue + for fire_time in fire_times: + schedule.last_fire_time = fire_time + job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs, + schedule.id, fire_time, schedule.next_deadline, + schedule.tags) + await self.data_store.add_job(job) await self._wakeup_event.wait() self._wakeup_event = anyio.Event() diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 6f330c9..23e0564 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -146,51 +146,49 @@ class Scheduler(EventSource): try: while self._state is RunState.started: - schedules = self.data_store.acquire_schedules(self.identity, 100) - now = datetime.now(timezone.utc) - for schedule in schedules: - # Look up the task definition - try: - taskdef = self._get_taskdef(schedule.task_id) - except LookupError: - self.logger.error('Cannot locate task definition %r for schedule %r – ' - 'putting schedule on hold', schedule.task_id, - schedule.id) - schedule.next_fire_time = None - continue - - # Calculate a next fire time for the schedule, if possible - fire_times = [schedule.next_fire_time] - calculate_next = schedule.trigger.next - while True: + with self.data_store.acquire_schedules(self.identity, 100) as schedules: + now = datetime.now(timezone.utc) + for schedule in schedules: + # Look up the task definition try: - fire_time = calculate_next() - except Exception: - self.logger.exception( - 'Error computing next fire time for schedule %r of task %r – ' - 'removing schedule', schedule.id, taskdef.id) - break - - # Stop if the calculated fire time is in the future - if fire_time is None or fire_time > now: - schedule.next_fire_time = fire_time - break - - # Only keep all the fire times if coalesce policy = "all" - if schedule.coalesce is CoalescePolicy.all: - fire_times.append(fire_time) - elif schedule.coalesce is CoalescePolicy.latest: - fire_times[0] = fire_time - - # Add one or more jobs to the job queue - for fire_time in fire_times: - schedule.last_fire_time = fire_time - job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs, - schedule.id, fire_time, schedule.next_deadline, - schedule.tags) - self.data_store.add_job(job) - - self.data_store.release_schedules(self.identity, schedules) + taskdef = self._get_taskdef(schedule.task_id) + except LookupError: + self.logger.error('Cannot locate task definition %r for schedule %r – ' + 'putting schedule on hold', schedule.task_id, + schedule.id) + schedule.next_fire_time = None + continue + + # Calculate a next fire time for the schedule, if possible + fire_times = [schedule.next_fire_time] + calculate_next = schedule.trigger.next + while True: + try: + fire_time = calculate_next() + except Exception: + self.logger.exception( + 'Error computing next fire time for schedule %r of task %r – ' + 'removing schedule', schedule.id, taskdef.id) + break + + # Stop if the calculated fire time is in the future + if fire_time is None or fire_time > now: + schedule.next_fire_time = fire_time + break + + # Only keep all the fire times if coalesce policy = "all" + if schedule.coalesce is CoalescePolicy.all: + fire_times.append(fire_time) + elif schedule.coalesce is CoalescePolicy.latest: + fire_times[0] = fire_time + + # Add one or more jobs to the job queue + for fire_time in fire_times: + schedule.last_fire_time = fire_time + job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs, + schedule.id, fire_time, schedule.next_deadline, + schedule.tags) + self.data_store.add_job(job) self._wakeup_event.wait() self._wakeup_event = threading.Event() |