diff options
Diffstat (limited to 'src/apscheduler/schedulers/async_.py')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 338 |
1 files changed, 170 insertions, 168 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 72c5994..889596d 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -17,10 +17,11 @@ from anyio import ( get_cancelled_exc_class, move_on_after, ) +from anyio.abc import TaskGroup -from ..abc import AsyncDataStore, EventSource, Job, Schedule, Trigger +from ..abc import AsyncDataStore, AsyncEventBroker, Job, Schedule, Subscription, Trigger from ..context import current_scheduler -from ..converters import as_async_datastore +from ..converters import as_async_datastore, as_async_eventbroker from ..datastores.memory import MemoryDataStore from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState from ..eventbrokers.async_local import LocalAsyncEventBroker @@ -48,6 +49,9 @@ class AsyncScheduler: data_store: AsyncDataStore = attrs.field( converter=as_async_datastore, factory=MemoryDataStore ) + event_broker: AsyncEventBroker = attrs.field( + converter=as_async_eventbroker, factory=LocalAsyncEventBroker + ) identity: str = attrs.field(kw_only=True, default=None) start_worker: bool = attrs.field(kw_only=True, default=True) logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__)) @@ -55,65 +59,24 @@ class AsyncScheduler: _state: RunState = attrs.field(init=False, default=RunState.stopped) _wakeup_event: anyio.Event = attrs.field(init=False) _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) - _worker: AsyncWorker | None = attrs.field(init=False, default=None) - _events: LocalAsyncEventBroker = attrs.field( - init=False, factory=LocalAsyncEventBroker - ) - _exit_stack: AsyncExitStack = attrs.field(init=False) + _task_group: TaskGroup | None = attrs.field(init=False, default=None) + _schedule_added_subscription: Subscription = attrs.field(init=False) def __attrs_post_init__(self) -> None: if not self.identity: self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}" - @property - def events(self) -> EventSource: - return self._events - - @property - def worker(self) -> AsyncWorker | None: - return self._worker - async def __aenter__(self): - self._state = RunState.starting - self._wakeup_event = anyio.Event() - self._exit_stack = AsyncExitStack() - await self._exit_stack.__aenter__() - await self._exit_stack.enter_async_context(self._events) - - # Initialize the data store and start relaying events to the scheduler's event broker - await self._exit_stack.enter_async_context(self.data_store) - self._exit_stack.enter_context( - self.data_store.events.subscribe(self._events.publish) - ) - - # Wake up the scheduler if the data store emits a significant schedule event - self._exit_stack.enter_context( - self.data_store.events.subscribe( - self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} - ) - ) - - # Start the built-in worker, if configured to do so - if self.start_worker: - token = current_scheduler.set(self) - try: - self._worker = AsyncWorker(self.data_store) - await self._exit_stack.enter_async_context(self._worker) - finally: - current_scheduler.reset(token) - - # Start the worker and return when it has signalled readiness or raised an exception - task_group = create_task_group() - await self._exit_stack.enter_async_context(task_group) - await task_group.start(self.run) + self._task_group = create_task_group() + await self._task_group.__aenter__() + await self._task_group.start(self.run_until_stopped) return self async def __aexit__(self, exc_type, exc_val, exc_tb): self._state = RunState.stopping self._wakeup_event.set() - await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) - self._state = RunState.stopped - del self._wakeup_event + await self._task_group.__aexit__(exc_type, exc_val, exc_tb) + self._task_group = None def _schedule_added_or_modified(self, event: Event) -> None: event_ = cast("ScheduleAdded | ScheduleUpdated", event) @@ -281,130 +244,169 @@ class AsyncScheduler: else: raise RuntimeError(f"Unknown job outcome: {result.outcome}") - async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None: - if self._state is not RunState.starting: + async def run_until_stopped(self, *, task_status=TASK_STATUS_IGNORED) -> None: + if self._state is not RunState.stopped: raise RuntimeError( - f"This function cannot be called while the scheduler is in the " - f"{self._state} state" + f'Cannot start the scheduler when it is in the "{self._state}" ' + f"state" ) - # Signal that the scheduler has started - self._state = RunState.started - task_status.started() - await self._events.publish(SchedulerStarted()) - - exception: BaseException | None = None - try: - while self._state is RunState.started: - schedules = await self.data_store.acquire_schedules(self.identity, 100) - now = datetime.now(timezone.utc) - for schedule in schedules: - # Calculate a next fire time for the schedule, if possible - fire_times = [schedule.next_fire_time] - calculate_next = schedule.trigger.next - while True: - try: - fire_time = calculate_next() - except Exception: - self.logger.exception( - "Error computing next fire time for schedule %r of task %r – " - "removing schedule", - schedule.id, - schedule.task_id, - ) - break - - # Stop if the calculated fire time is in the future - if fire_time is None or fire_time > now: - schedule.next_fire_time = fire_time - break - - # Only keep all the fire times if coalesce policy = "all" - if schedule.coalesce is CoalescePolicy.all: - fire_times.append(fire_time) - elif schedule.coalesce is CoalescePolicy.latest: - fire_times[0] = fire_time - - # Add one or more jobs to the job queue - max_jitter = ( - schedule.max_jitter.total_seconds() - if schedule.max_jitter - else 0 + self._state = RunState.starting + async with AsyncExitStack() as exit_stack: + self._wakeup_event = anyio.Event() + + # Initialize the event broker + await self.event_broker.start() + exit_stack.push_async_exit( + lambda *exc_info: self.event_broker.stop(force=exc_info[0] is not None) + ) + + # Initialize the data store + await self.data_store.start(self.event_broker) + exit_stack.push_async_exit( + lambda *exc_info: self.data_store.stop(force=exc_info[0] is not None) + ) + + # Wake up the scheduler if the data store emits a significant schedule event + self._schedule_added_subscription = self.event_broker.subscribe( + self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} + ) + + # Start the built-in worker, if configured to do so + if self.start_worker: + token = current_scheduler.set(self) + exit_stack.callback(current_scheduler.reset, token) + worker = AsyncWorker( + self.data_store, self.event_broker, is_internal=True + ) + await exit_stack.enter_async_context(worker) + + # Signal that the scheduler has started + self._state = RunState.started + task_status.started() + await self.event_broker.publish_local(SchedulerStarted()) + + exception: BaseException | None = None + try: + while self._state is RunState.started: + schedules = await self.data_store.acquire_schedules( + self.identity, 100 ) - for i, fire_time in enumerate(fire_times): - # Calculate a jitter if max_jitter > 0 - jitter = _zero_timedelta - if max_jitter: - if i + 1 < len(fire_times): - next_fire_time = fire_times[i + 1] - else: - next_fire_time = schedule.next_fire_time - - if next_fire_time is not None: - # Jitter must never be so high that it would cause a fire time to - # equal or exceed the next fire time - jitter_s = min( - [ - max_jitter, - ( - next_fire_time - - fire_time - - _microsecond_delta - ).total_seconds(), - ] + now = datetime.now(timezone.utc) + for schedule in schedules: + # Calculate a next fire time for the schedule, if possible + fire_times = [schedule.next_fire_time] + calculate_next = schedule.trigger.next + while True: + try: + fire_time = calculate_next() + except Exception: + self.logger.exception( + "Error computing next fire time for schedule %r of " + "task %r – removing schedule", + schedule.id, + schedule.task_id, ) - jitter = timedelta(seconds=random.uniform(0, jitter_s)) - fire_time += jitter - - schedule.last_fire_time = fire_time - job = Job( - task_id=schedule.task_id, - args=schedule.args, - kwargs=schedule.kwargs, - schedule_id=schedule.id, - scheduled_fire_time=fire_time, - jitter=jitter, - start_deadline=schedule.next_deadline, - tags=schedule.tags, + break + + # Stop if the calculated fire time is in the future + if fire_time is None or fire_time > now: + schedule.next_fire_time = fire_time + break + + # Only keep all the fire times if coalesce policy = "all" + if schedule.coalesce is CoalescePolicy.all: + fire_times.append(fire_time) + elif schedule.coalesce is CoalescePolicy.latest: + fire_times[0] = fire_time + + # Add one or more jobs to the job queue + max_jitter = ( + schedule.max_jitter.total_seconds() + if schedule.max_jitter + else 0 ) - await self.data_store.add_job(job) - - # Update the schedules (and release the scheduler's claim on them) - await self.data_store.release_schedules(self.identity, schedules) + for i, fire_time in enumerate(fire_times): + # Calculate a jitter if max_jitter > 0 + jitter = _zero_timedelta + if max_jitter: + if i + 1 < len(fire_times): + next_fire_time = fire_times[i + 1] + else: + next_fire_time = schedule.next_fire_time + + if next_fire_time is not None: + # Jitter must never be so high that it would cause a + # fire time to equal or exceed the next fire time + jitter_s = min( + [ + max_jitter, + ( + next_fire_time + - fire_time + - _microsecond_delta + ).total_seconds(), + ] + ) + jitter = timedelta( + seconds=random.uniform(0, jitter_s) + ) + fire_time += jitter + + schedule.last_fire_time = fire_time + job = Job( + task_id=schedule.task_id, + args=schedule.args, + kwargs=schedule.kwargs, + schedule_id=schedule.id, + scheduled_fire_time=fire_time, + jitter=jitter, + start_deadline=schedule.next_deadline, + tags=schedule.tags, + ) + await self.data_store.add_job(job) + + # Update the schedules (and release the scheduler's claim on them) + await self.data_store.release_schedules(self.identity, schedules) + + # If we received fewer schedules than the maximum amount, sleep + # until the next schedule is due or the scheduler is explicitly + # woken up + wait_time = None + if len(schedules) < 100: + self._wakeup_deadline = ( + await self.data_store.get_next_schedule_run_time() + ) + if self._wakeup_deadline: + wait_time = ( + self._wakeup_deadline - datetime.now(timezone.utc) + ).total_seconds() + self.logger.debug( + "Sleeping %.3f seconds until the next fire time (%s)", + wait_time, + self._wakeup_deadline, + ) + else: + self.logger.debug("Waiting for any due schedules to appear") - # If we received fewer schedules than the maximum amount, sleep until the next - # schedule is due or the scheduler is explicitly woken up - wait_time = None - if len(schedules) < 100: - self._wakeup_deadline = ( - await self.data_store.get_next_schedule_run_time() - ) - if self._wakeup_deadline: - wait_time = ( - self._wakeup_deadline - datetime.now(timezone.utc) - ).total_seconds() + with move_on_after(wait_time): + await self._wakeup_event.wait() + self._wakeup_event = anyio.Event() + else: self.logger.debug( - "Sleeping %.3f seconds until the next fire time (%s)", - wait_time, - self._wakeup_deadline, + "Processing more schedules on the next iteration" ) - else: - self.logger.debug("Waiting for any due schedules to appear") - - with move_on_after(wait_time): - await self._wakeup_event.wait() - self._wakeup_event = anyio.Event() - else: - self.logger.debug("Processing more schedules on the next iteration") - except get_cancelled_exc_class(): - pass - except BaseException as exc: - self.logger.exception("Scheduler crashed") - exception = exc - raise - else: - self.logger.info("Scheduler stopped") - finally: - self._state = RunState.stopped - with move_on_after(3, shield=True): - await self._events.publish(SchedulerStopped(exception=exception)) + except get_cancelled_exc_class(): + pass + except BaseException as exc: + self.logger.exception("Scheduler crashed") + exception = exc + raise + else: + self.logger.info("Scheduler stopped") + finally: + self._state = RunState.stopped + with move_on_after(3, shield=True): + await self.event_broker.publish_local( + SchedulerStopped(exception=exception) + ) |