diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-04-20 01:00:57 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-04-20 01:11:20 +0300 |
commit | b20f62d929eed84ad18020bb82dd43d8cb70da4d (patch) | |
tree | c42bf1877dd54755c55c649269e1254995bdf0c9 /src/apscheduler/schedulers | |
parent | 82992cd427a9ab2351d8e0719b82d826dff5a521 (diff) | |
download | apscheduler-b20f62d929eed84ad18020bb82dd43d8cb70da4d.tar.gz |
Switched to Black for code formatting
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 173 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 160 |
2 files changed, 237 insertions, 96 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 2f6c418..72c5994 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -11,7 +11,12 @@ from uuid import UUID, uuid4 import anyio import attrs -from anyio import TASK_STATUS_IGNORED, create_task_group, get_cancelled_exc_class, move_on_after +from anyio import ( + TASK_STATUS_IGNORED, + create_task_group, + get_cancelled_exc_class, + move_on_after, +) from ..abc import AsyncDataStore, EventSource, Job, Schedule, Trigger from ..context import current_scheduler @@ -20,7 +25,13 @@ from ..datastores.memory import MemoryDataStore from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState from ..eventbrokers.async_local import LocalAsyncEventBroker from ..events import ( - Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated) + Event, + JobReleased, + ScheduleAdded, + SchedulerStarted, + SchedulerStopped, + ScheduleUpdated, +) from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError from ..marshalling import callable_to_ref from ..structures import JobResult, Task @@ -34,7 +45,9 @@ _zero_timedelta = timedelta() class AsyncScheduler: """An asynchronous (AnyIO based) scheduler implementation.""" - data_store: AsyncDataStore = attrs.field(converter=as_async_datastore, factory=MemoryDataStore) + data_store: AsyncDataStore = attrs.field( + converter=as_async_datastore, factory=MemoryDataStore + ) identity: str = attrs.field(kw_only=True, default=None) start_worker: bool = attrs.field(kw_only=True, default=True) logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__)) @@ -43,12 +56,14 @@ class AsyncScheduler: _wakeup_event: anyio.Event = attrs.field(init=False) _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) _worker: AsyncWorker | None = attrs.field(init=False, default=None) - _events: LocalAsyncEventBroker = attrs.field(init=False, factory=LocalAsyncEventBroker) + _events: LocalAsyncEventBroker = attrs.field( + init=False, factory=LocalAsyncEventBroker + ) _exit_stack: AsyncExitStack = attrs.field(init=False) def __attrs_post_init__(self) -> None: if not self.identity: - self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}' + self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}" @property def events(self) -> EventSource: @@ -67,7 +82,9 @@ class AsyncScheduler: # Initialize the data store and start relaying events to the scheduler's event broker await self._exit_stack.enter_async_context(self.data_store) - self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish)) + self._exit_stack.enter_context( + self.data_store.events.subscribe(self._events.publish) + ) # Wake up the scheduler if the data store emits a significant schedule event self._exit_stack.enter_context( @@ -99,22 +116,28 @@ class AsyncScheduler: del self._wakeup_event def _schedule_added_or_modified(self, event: Event) -> None: - event_ = cast('ScheduleAdded | ScheduleUpdated', event) - if ( - not self._wakeup_deadline - or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline) + event_ = cast("ScheduleAdded | ScheduleUpdated", event) + if not self._wakeup_deadline or ( + event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline ): - self.logger.debug('Detected a %s event – waking up the scheduler', - type(event).__name__) + self.logger.debug( + "Detected a %s event – waking up the scheduler", type(event).__name__ + ) self._wakeup_event.set() async def add_schedule( - self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None, - args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + self, + func_or_task_id: str | Callable, + trigger: Trigger, + *, + id: str | None = None, + args: Iterable | None = None, + kwargs: Mapping[str, Any] | None = None, coalesce: CoalescePolicy = CoalescePolicy.latest, misfire_grace_time: float | timedelta | None = None, - max_jitter: float | timedelta | None = None, tags: Iterable[str] | None = None, - conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing + max_jitter: float | timedelta | None = None, + tags: Iterable[str] | None = None, + conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing, ) -> str: id = id or str(uuid4()) args = tuple(args or ()) @@ -129,13 +152,25 @@ class AsyncScheduler: else: task = await self.data_store.get_task(func_or_task_id) - schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs, - coalesce=coalesce, misfire_grace_time=misfire_grace_time, - max_jitter=max_jitter, tags=tags) + schedule = Schedule( + id=id, + task_id=task.id, + trigger=trigger, + args=args, + kwargs=kwargs, + coalesce=coalesce, + misfire_grace_time=misfire_grace_time, + max_jitter=max_jitter, + tags=tags, + ) schedule.next_fire_time = trigger.next() await self.data_store.add_schedule(schedule, conflict_policy) - self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task, - trigger, schedule.next_fire_time) + self.logger.info( + "Added new schedule (task=%r, trigger=%r); next run time at %s", + task, + trigger, + schedule.next_fire_time, + ) return schedule.id async def get_schedule(self, id: str) -> Schedule: @@ -146,8 +181,12 @@ class AsyncScheduler: await self.data_store.remove_schedules({schedule_id}) async def add_job( - self, func_or_task_id: str | Callable, *, args: Iterable | None = None, - kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None + self, + func_or_task_id: str | Callable, + *, + args: Iterable | None = None, + kwargs: Mapping[str, Any] | None = None, + tags: Iterable[str] | None = None, ) -> UUID: """ Add a job to the data store. @@ -165,7 +204,12 @@ class AsyncScheduler: else: task = await self.data_store.get_task(func_or_task_id) - job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset()) + job = Job( + task_id=task.id, + args=args or (), + kwargs=kwargs or {}, + tags=tags or frozenset(), + ) await self.data_store.add_job(job) return job.id @@ -199,8 +243,12 @@ class AsyncScheduler: return result async def run_job( - self, func_or_task_id: str | Callable, *, args: Iterable | None = None, - kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = () + self, + func_or_task_id: str | Callable, + *, + args: Iterable | None = None, + kwargs: Mapping[str, Any] | None = None, + tags: Iterable[str] | None = (), ) -> Any: """ Convenience method to add a job and then return its result (or raise its exception). @@ -216,7 +264,9 @@ class AsyncScheduler: job_id: UUID | None = None with self.data_store.events.subscribe(listener, {JobReleased}): - job_id = await self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags) + job_id = await self.add_job( + func_or_task_id, args=args, kwargs=kwargs, tags=tags + ) await job_complete_event.wait() result = await self.get_job_result(job_id) @@ -229,12 +279,14 @@ class AsyncScheduler: elif result.outcome is JobOutcome.cancelled: raise JobCancelled else: - raise RuntimeError(f'Unknown job outcome: {result.outcome}') + raise RuntimeError(f"Unknown job outcome: {result.outcome}") async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None: if self._state is not RunState.starting: - raise RuntimeError(f'This function cannot be called while the scheduler is in the ' - f'{self._state} state') + raise RuntimeError( + f"This function cannot be called while the scheduler is in the " + f"{self._state} state" + ) # Signal that the scheduler has started self._state = RunState.started @@ -255,8 +307,11 @@ class AsyncScheduler: fire_time = calculate_next() except Exception: self.logger.exception( - 'Error computing next fire time for schedule %r of task %r – ' - 'removing schedule', schedule.id, schedule.task_id) + "Error computing next fire time for schedule %r of task %r – " + "removing schedule", + schedule.id, + schedule.task_id, + ) break # Stop if the calculated fire time is in the future @@ -271,7 +326,11 @@ class AsyncScheduler: fire_times[0] = fire_time # Add one or more jobs to the job queue - max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0 + max_jitter = ( + schedule.max_jitter.total_seconds() + if schedule.max_jitter + else 0 + ) for i, fire_time in enumerate(fire_times): # Calculate a jitter if max_jitter > 0 jitter = _zero_timedelta @@ -284,19 +343,30 @@ class AsyncScheduler: if next_fire_time is not None: # Jitter must never be so high that it would cause a fire time to # equal or exceed the next fire time - jitter_s = min([ - max_jitter, - (next_fire_time - fire_time - - _microsecond_delta).total_seconds() - ]) + jitter_s = min( + [ + max_jitter, + ( + next_fire_time + - fire_time + - _microsecond_delta + ).total_seconds(), + ] + ) jitter = timedelta(seconds=random.uniform(0, jitter_s)) fire_time += jitter schedule.last_fire_time = fire_time - job = Job(task_id=schedule.task_id, args=schedule.args, - kwargs=schedule.kwargs, schedule_id=schedule.id, - scheduled_fire_time=fire_time, jitter=jitter, - start_deadline=schedule.next_deadline, tags=schedule.tags) + job = Job( + task_id=schedule.task_id, + args=schedule.args, + kwargs=schedule.kwargs, + schedule_id=schedule.id, + scheduled_fire_time=fire_time, + jitter=jitter, + start_deadline=schedule.next_deadline, + tags=schedule.tags, + ) await self.data_store.add_job(job) # Update the schedules (and release the scheduler's claim on them) @@ -306,29 +376,34 @@ class AsyncScheduler: # schedule is due or the scheduler is explicitly woken up wait_time = None if len(schedules) < 100: - self._wakeup_deadline = await self.data_store.get_next_schedule_run_time() + self._wakeup_deadline = ( + await self.data_store.get_next_schedule_run_time() + ) if self._wakeup_deadline: wait_time = ( self._wakeup_deadline - datetime.now(timezone.utc) ).total_seconds() - self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)', - wait_time, self._wakeup_deadline) + self.logger.debug( + "Sleeping %.3f seconds until the next fire time (%s)", + wait_time, + self._wakeup_deadline, + ) else: - self.logger.debug('Waiting for any due schedules to appear') + self.logger.debug("Waiting for any due schedules to appear") with move_on_after(wait_time): await self._wakeup_event.wait() self._wakeup_event = anyio.Event() else: - self.logger.debug('Processing more schedules on the next iteration') + self.logger.debug("Processing more schedules on the next iteration") except get_cancelled_exc_class(): pass except BaseException as exc: - self.logger.exception('Scheduler crashed') + self.logger.exception("Scheduler crashed") exception = exc raise else: - self.logger.info('Scheduler stopped') + self.logger.info("Scheduler stopped") finally: self._state = RunState.stopped with move_on_after(3, shield=True): diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 3e9f196..c78d93c 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -19,7 +19,13 @@ from ..datastores.memory import MemoryDataStore from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState from ..eventbrokers.local import LocalEventBroker from ..events import ( - Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated) + Event, + JobReleased, + ScheduleAdded, + SchedulerStarted, + SchedulerStopped, + ScheduleUpdated, +) from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError from ..marshalling import callable_to_ref from ..structures import Job, JobResult, Schedule, Task @@ -47,7 +53,7 @@ class Scheduler: def __attrs_post_init__(self) -> None: if not self.identity: - self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}' + self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}" @property def events(self) -> EventSource: @@ -70,7 +76,9 @@ class Scheduler: # Initialize the data store and start relaying events to the scheduler's event broker self._exit_stack.enter_context(self.data_store) - self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish)) + self._exit_stack.enter_context( + self.data_store.events.subscribe(self._events.publish) + ) # Wake up the scheduler if the data store emits a significant schedule event self._exit_stack.enter_context( @@ -92,7 +100,9 @@ class Scheduler: start_future: Future[Event] = Future() with self._events.subscribe(start_future.set_result, one_shot=True): executor = ThreadPoolExecutor(1) - self._exit_stack.push(lambda exc_type, *args: executor.shutdown(wait=exc_type is None)) + self._exit_stack.push( + lambda exc_type, *args: executor.shutdown(wait=exc_type is None) + ) run_future = executor.submit(self.run) wait([start_future, run_future], return_when=FIRST_COMPLETED) @@ -109,22 +119,28 @@ class Scheduler: del self._wakeup_event def _schedule_added_or_modified(self, event: Event) -> None: - event_ = cast('ScheduleAdded | ScheduleUpdated', event) - if ( - not self._wakeup_deadline - or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline) + event_ = cast("ScheduleAdded | ScheduleUpdated", event) + if not self._wakeup_deadline or ( + event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline ): - self.logger.debug('Detected a %s event – waking up the scheduler', - type(event).__name__) + self.logger.debug( + "Detected a %s event – waking up the scheduler", type(event).__name__ + ) self._wakeup_event.set() def add_schedule( - self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None, - args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + self, + func_or_task_id: str | Callable, + trigger: Trigger, + *, + id: str | None = None, + args: Iterable | None = None, + kwargs: Mapping[str, Any] | None = None, coalesce: CoalescePolicy = CoalescePolicy.latest, misfire_grace_time: float | timedelta | None = None, - max_jitter: float | timedelta | None = None, tags: Iterable[str] | None = None, - conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing + max_jitter: float | timedelta | None = None, + tags: Iterable[str] | None = None, + conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing, ) -> str: id = id or str(uuid4()) args = tuple(args or ()) @@ -139,13 +155,25 @@ class Scheduler: else: task = self.data_store.get_task(func_or_task_id) - schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs, - coalesce=coalesce, misfire_grace_time=misfire_grace_time, - max_jitter=max_jitter, tags=tags) + schedule = Schedule( + id=id, + task_id=task.id, + trigger=trigger, + args=args, + kwargs=kwargs, + coalesce=coalesce, + misfire_grace_time=misfire_grace_time, + max_jitter=max_jitter, + tags=tags, + ) schedule.next_fire_time = trigger.next() self.data_store.add_schedule(schedule, conflict_policy) - self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task, - trigger, schedule.next_fire_time) + self.logger.info( + "Added new schedule (task=%r, trigger=%r); next run time at %s", + task, + trigger, + schedule.next_fire_time, + ) return schedule.id def get_schedule(self, id: str) -> Schedule: @@ -156,8 +184,12 @@ class Scheduler: self.data_store.remove_schedules({schedule_id}) def add_job( - self, func_or_task_id: str | Callable, *, args: Iterable | None = None, - kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None + self, + func_or_task_id: str | Callable, + *, + args: Iterable | None = None, + kwargs: Mapping[str, Any] | None = None, + tags: Iterable[str] | None = None, ) -> UUID: """ Add a job to the data store. @@ -175,7 +207,12 @@ class Scheduler: else: task = self.data_store.get_task(func_or_task_id) - job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset()) + job = Job( + task_id=task.id, + args=args or (), + kwargs=kwargs or {}, + tags=tags or frozenset(), + ) self.data_store.add_job(job) return job.id @@ -209,8 +246,12 @@ class Scheduler: return result def run_job( - self, func_or_task_id: str | Callable, *, args: Iterable | None = None, - kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = () + self, + func_or_task_id: str | Callable, + *, + args: Iterable | None = None, + kwargs: Mapping[str, Any] | None = None, + tags: Iterable[str] | None = (), ) -> Any: """ Convenience method to add a job and then return its result (or raise its exception). @@ -239,12 +280,14 @@ class Scheduler: elif result.outcome is JobOutcome.cancelled: raise JobCancelled else: - raise RuntimeError(f'Unknown job outcome: {result.outcome}') + raise RuntimeError(f"Unknown job outcome: {result.outcome}") def run(self) -> None: if self._state is not RunState.starting: - raise RuntimeError(f'This function cannot be called while the scheduler is in the ' - f'{self._state} state') + raise RuntimeError( + f"This function cannot be called while the scheduler is in the " + f"{self._state} state" + ) # Signal that the scheduler has started self._state = RunState.started @@ -253,8 +296,10 @@ class Scheduler: try: while self._state is RunState.started: schedules = self.data_store.acquire_schedules(self.identity, 100) - self.logger.debug('Processing %d schedules retrieved from the data store', - len(schedules)) + self.logger.debug( + "Processing %d schedules retrieved from the data store", + len(schedules), + ) now = datetime.now(timezone.utc) for schedule in schedules: # Calculate a next fire time for the schedule, if possible @@ -265,8 +310,11 @@ class Scheduler: fire_time = calculate_next() except Exception: self.logger.exception( - 'Error computing next fire time for schedule %r of task %r – ' - 'removing schedule', schedule.id, schedule.task_id) + "Error computing next fire time for schedule %r of task %r – " + "removing schedule", + schedule.id, + schedule.task_id, + ) break # Stop if the calculated fire time is in the future @@ -281,7 +329,11 @@ class Scheduler: fire_times[0] = fire_time # Add one or more jobs to the job queue - max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0 + max_jitter = ( + schedule.max_jitter.total_seconds() + if schedule.max_jitter + else 0 + ) for i, fire_time in enumerate(fire_times): # Calculate a jitter if max_jitter > 0 jitter = _zero_timedelta @@ -294,19 +346,30 @@ class Scheduler: if next_fire_time is not None: # Jitter must never be so high that it would cause a fire time to # equal or exceed the next fire time - jitter_s = min([ - max_jitter, - (next_fire_time - fire_time - - _microsecond_delta).total_seconds() - ]) + jitter_s = min( + [ + max_jitter, + ( + next_fire_time + - fire_time + - _microsecond_delta + ).total_seconds(), + ] + ) jitter = timedelta(seconds=random.uniform(0, jitter_s)) fire_time += jitter schedule.last_fire_time = fire_time - job = Job(task_id=schedule.task_id, args=schedule.args, - kwargs=schedule.kwargs, schedule_id=schedule.id, - scheduled_fire_time=fire_time, jitter=jitter, - start_deadline=schedule.next_deadline, tags=schedule.tags) + job = Job( + task_id=schedule.task_id, + args=schedule.args, + kwargs=schedule.kwargs, + schedule_id=schedule.id, + scheduled_fire_time=fire_time, + jitter=jitter, + start_deadline=schedule.next_deadline, + tags=schedule.tags, + ) self.data_store.add_job(job) # Update the schedules (and release the scheduler's claim on them) @@ -321,23 +384,26 @@ class Scheduler: wait_time = ( self._wakeup_deadline - datetime.now(timezone.utc) ).total_seconds() - self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)', - wait_time, self._wakeup_deadline) + self.logger.debug( + "Sleeping %.3f seconds until the next fire time (%s)", + wait_time, + self._wakeup_deadline, + ) else: - self.logger.debug('Waiting for any due schedules to appear') + self.logger.debug("Waiting for any due schedules to appear") if self._wakeup_event.wait(wait_time): self._wakeup_event = threading.Event() else: - self.logger.debug('Processing more schedules on the next iteration') + self.logger.debug("Processing more schedules on the next iteration") except BaseException as exc: self._state = RunState.stopped - self.logger.exception('Scheduler crashed') + self.logger.exception("Scheduler crashed") self._events.publish(SchedulerStopped(exception=exc)) raise self._state = RunState.stopped - self.logger.info('Scheduler stopped') + self.logger.info("Scheduler stopped") self._events.publish(SchedulerStopped()) # def stop(self) -> None: |