summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-04-20 01:00:57 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-04-20 01:11:20 +0300
commitb20f62d929eed84ad18020bb82dd43d8cb70da4d (patch)
treec42bf1877dd54755c55c649269e1254995bdf0c9 /src/apscheduler/schedulers
parent82992cd427a9ab2351d8e0719b82d826dff5a521 (diff)
downloadapscheduler-b20f62d929eed84ad18020bb82dd43d8cb70da4d.tar.gz
Switched to Black for code formatting
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r--src/apscheduler/schedulers/async_.py173
-rw-r--r--src/apscheduler/schedulers/sync.py160
2 files changed, 237 insertions, 96 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 2f6c418..72c5994 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -11,7 +11,12 @@ from uuid import UUID, uuid4
import anyio
import attrs
-from anyio import TASK_STATUS_IGNORED, create_task_group, get_cancelled_exc_class, move_on_after
+from anyio import (
+ TASK_STATUS_IGNORED,
+ create_task_group,
+ get_cancelled_exc_class,
+ move_on_after,
+)
from ..abc import AsyncDataStore, EventSource, Job, Schedule, Trigger
from ..context import current_scheduler
@@ -20,7 +25,13 @@ from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated)
+ Event,
+ JobReleased,
+ ScheduleAdded,
+ SchedulerStarted,
+ SchedulerStopped,
+ ScheduleUpdated,
+)
from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError
from ..marshalling import callable_to_ref
from ..structures import JobResult, Task
@@ -34,7 +45,9 @@ _zero_timedelta = timedelta()
class AsyncScheduler:
"""An asynchronous (AnyIO based) scheduler implementation."""
- data_store: AsyncDataStore = attrs.field(converter=as_async_datastore, factory=MemoryDataStore)
+ data_store: AsyncDataStore = attrs.field(
+ converter=as_async_datastore, factory=MemoryDataStore
+ )
identity: str = attrs.field(kw_only=True, default=None)
start_worker: bool = attrs.field(kw_only=True, default=True)
logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
@@ -43,12 +56,14 @@ class AsyncScheduler:
_wakeup_event: anyio.Event = attrs.field(init=False)
_wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
_worker: AsyncWorker | None = attrs.field(init=False, default=None)
- _events: LocalAsyncEventBroker = attrs.field(init=False, factory=LocalAsyncEventBroker)
+ _events: LocalAsyncEventBroker = attrs.field(
+ init=False, factory=LocalAsyncEventBroker
+ )
_exit_stack: AsyncExitStack = attrs.field(init=False)
def __attrs_post_init__(self) -> None:
if not self.identity:
- self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}'
+ self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
@property
def events(self) -> EventSource:
@@ -67,7 +82,9 @@ class AsyncScheduler:
# Initialize the data store and start relaying events to the scheduler's event broker
await self._exit_stack.enter_async_context(self.data_store)
- self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(self._events.publish)
+ )
# Wake up the scheduler if the data store emits a significant schedule event
self._exit_stack.enter_context(
@@ -99,22 +116,28 @@ class AsyncScheduler:
del self._wakeup_event
def _schedule_added_or_modified(self, event: Event) -> None:
- event_ = cast('ScheduleAdded | ScheduleUpdated', event)
- if (
- not self._wakeup_deadline
- or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline)
+ event_ = cast("ScheduleAdded | ScheduleUpdated", event)
+ if not self._wakeup_deadline or (
+ event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline
):
- self.logger.debug('Detected a %s event – waking up the scheduler',
- type(event).__name__)
+ self.logger.debug(
+ "Detected a %s event – waking up the scheduler", type(event).__name__
+ )
self._wakeup_event.set()
async def add_schedule(
- self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None,
- args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None,
+ self,
+ func_or_task_id: str | Callable,
+ trigger: Trigger,
+ *,
+ id: str | None = None,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None = None,
- max_jitter: float | timedelta | None = None, tags: Iterable[str] | None = None,
- conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing
+ max_jitter: float | timedelta | None = None,
+ tags: Iterable[str] | None = None,
+ conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing,
) -> str:
id = id or str(uuid4())
args = tuple(args or ())
@@ -129,13 +152,25 @@ class AsyncScheduler:
else:
task = await self.data_store.get_task(func_or_task_id)
- schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs,
- coalesce=coalesce, misfire_grace_time=misfire_grace_time,
- max_jitter=max_jitter, tags=tags)
+ schedule = Schedule(
+ id=id,
+ task_id=task.id,
+ trigger=trigger,
+ args=args,
+ kwargs=kwargs,
+ coalesce=coalesce,
+ misfire_grace_time=misfire_grace_time,
+ max_jitter=max_jitter,
+ tags=tags,
+ )
schedule.next_fire_time = trigger.next()
await self.data_store.add_schedule(schedule, conflict_policy)
- self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task,
- trigger, schedule.next_fire_time)
+ self.logger.info(
+ "Added new schedule (task=%r, trigger=%r); next run time at %s",
+ task,
+ trigger,
+ schedule.next_fire_time,
+ )
return schedule.id
async def get_schedule(self, id: str) -> Schedule:
@@ -146,8 +181,12 @@ class AsyncScheduler:
await self.data_store.remove_schedules({schedule_id})
async def add_job(
- self, func_or_task_id: str | Callable, *, args: Iterable | None = None,
- kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None
+ self,
+ func_or_task_id: str | Callable,
+ *,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
+ tags: Iterable[str] | None = None,
) -> UUID:
"""
Add a job to the data store.
@@ -165,7 +204,12 @@ class AsyncScheduler:
else:
task = await self.data_store.get_task(func_or_task_id)
- job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset())
+ job = Job(
+ task_id=task.id,
+ args=args or (),
+ kwargs=kwargs or {},
+ tags=tags or frozenset(),
+ )
await self.data_store.add_job(job)
return job.id
@@ -199,8 +243,12 @@ class AsyncScheduler:
return result
async def run_job(
- self, func_or_task_id: str | Callable, *, args: Iterable | None = None,
- kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = ()
+ self,
+ func_or_task_id: str | Callable,
+ *,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
+ tags: Iterable[str] | None = (),
) -> Any:
"""
Convenience method to add a job and then return its result (or raise its exception).
@@ -216,7 +264,9 @@ class AsyncScheduler:
job_id: UUID | None = None
with self.data_store.events.subscribe(listener, {JobReleased}):
- job_id = await self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags)
+ job_id = await self.add_job(
+ func_or_task_id, args=args, kwargs=kwargs, tags=tags
+ )
await job_complete_event.wait()
result = await self.get_job_result(job_id)
@@ -229,12 +279,14 @@ class AsyncScheduler:
elif result.outcome is JobOutcome.cancelled:
raise JobCancelled
else:
- raise RuntimeError(f'Unknown job outcome: {result.outcome}')
+ raise RuntimeError(f"Unknown job outcome: {result.outcome}")
async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None:
if self._state is not RunState.starting:
- raise RuntimeError(f'This function cannot be called while the scheduler is in the '
- f'{self._state} state')
+ raise RuntimeError(
+ f"This function cannot be called while the scheduler is in the "
+ f"{self._state} state"
+ )
# Signal that the scheduler has started
self._state = RunState.started
@@ -255,8 +307,11 @@ class AsyncScheduler:
fire_time = calculate_next()
except Exception:
self.logger.exception(
- 'Error computing next fire time for schedule %r of task %r – '
- 'removing schedule', schedule.id, schedule.task_id)
+ "Error computing next fire time for schedule %r of task %r – "
+ "removing schedule",
+ schedule.id,
+ schedule.task_id,
+ )
break
# Stop if the calculated fire time is in the future
@@ -271,7 +326,11 @@ class AsyncScheduler:
fire_times[0] = fire_time
# Add one or more jobs to the job queue
- max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0
+ max_jitter = (
+ schedule.max_jitter.total_seconds()
+ if schedule.max_jitter
+ else 0
+ )
for i, fire_time in enumerate(fire_times):
# Calculate a jitter if max_jitter > 0
jitter = _zero_timedelta
@@ -284,19 +343,30 @@ class AsyncScheduler:
if next_fire_time is not None:
# Jitter must never be so high that it would cause a fire time to
# equal or exceed the next fire time
- jitter_s = min([
- max_jitter,
- (next_fire_time - fire_time
- - _microsecond_delta).total_seconds()
- ])
+ jitter_s = min(
+ [
+ max_jitter,
+ (
+ next_fire_time
+ - fire_time
+ - _microsecond_delta
+ ).total_seconds(),
+ ]
+ )
jitter = timedelta(seconds=random.uniform(0, jitter_s))
fire_time += jitter
schedule.last_fire_time = fire_time
- job = Job(task_id=schedule.task_id, args=schedule.args,
- kwargs=schedule.kwargs, schedule_id=schedule.id,
- scheduled_fire_time=fire_time, jitter=jitter,
- start_deadline=schedule.next_deadline, tags=schedule.tags)
+ job = Job(
+ task_id=schedule.task_id,
+ args=schedule.args,
+ kwargs=schedule.kwargs,
+ schedule_id=schedule.id,
+ scheduled_fire_time=fire_time,
+ jitter=jitter,
+ start_deadline=schedule.next_deadline,
+ tags=schedule.tags,
+ )
await self.data_store.add_job(job)
# Update the schedules (and release the scheduler's claim on them)
@@ -306,29 +376,34 @@ class AsyncScheduler:
# schedule is due or the scheduler is explicitly woken up
wait_time = None
if len(schedules) < 100:
- self._wakeup_deadline = await self.data_store.get_next_schedule_run_time()
+ self._wakeup_deadline = (
+ await self.data_store.get_next_schedule_run_time()
+ )
if self._wakeup_deadline:
wait_time = (
self._wakeup_deadline - datetime.now(timezone.utc)
).total_seconds()
- self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)',
- wait_time, self._wakeup_deadline)
+ self.logger.debug(
+ "Sleeping %.3f seconds until the next fire time (%s)",
+ wait_time,
+ self._wakeup_deadline,
+ )
else:
- self.logger.debug('Waiting for any due schedules to appear')
+ self.logger.debug("Waiting for any due schedules to appear")
with move_on_after(wait_time):
await self._wakeup_event.wait()
self._wakeup_event = anyio.Event()
else:
- self.logger.debug('Processing more schedules on the next iteration')
+ self.logger.debug("Processing more schedules on the next iteration")
except get_cancelled_exc_class():
pass
except BaseException as exc:
- self.logger.exception('Scheduler crashed')
+ self.logger.exception("Scheduler crashed")
exception = exc
raise
else:
- self.logger.info('Scheduler stopped')
+ self.logger.info("Scheduler stopped")
finally:
self._state = RunState.stopped
with move_on_after(3, shield=True):
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 3e9f196..c78d93c 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -19,7 +19,13 @@ from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated)
+ Event,
+ JobReleased,
+ ScheduleAdded,
+ SchedulerStarted,
+ SchedulerStopped,
+ ScheduleUpdated,
+)
from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError
from ..marshalling import callable_to_ref
from ..structures import Job, JobResult, Schedule, Task
@@ -47,7 +53,7 @@ class Scheduler:
def __attrs_post_init__(self) -> None:
if not self.identity:
- self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}'
+ self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
@property
def events(self) -> EventSource:
@@ -70,7 +76,9 @@ class Scheduler:
# Initialize the data store and start relaying events to the scheduler's event broker
self._exit_stack.enter_context(self.data_store)
- self._exit_stack.enter_context(self.data_store.events.subscribe(self._events.publish))
+ self._exit_stack.enter_context(
+ self.data_store.events.subscribe(self._events.publish)
+ )
# Wake up the scheduler if the data store emits a significant schedule event
self._exit_stack.enter_context(
@@ -92,7 +100,9 @@ class Scheduler:
start_future: Future[Event] = Future()
with self._events.subscribe(start_future.set_result, one_shot=True):
executor = ThreadPoolExecutor(1)
- self._exit_stack.push(lambda exc_type, *args: executor.shutdown(wait=exc_type is None))
+ self._exit_stack.push(
+ lambda exc_type, *args: executor.shutdown(wait=exc_type is None)
+ )
run_future = executor.submit(self.run)
wait([start_future, run_future], return_when=FIRST_COMPLETED)
@@ -109,22 +119,28 @@ class Scheduler:
del self._wakeup_event
def _schedule_added_or_modified(self, event: Event) -> None:
- event_ = cast('ScheduleAdded | ScheduleUpdated', event)
- if (
- not self._wakeup_deadline
- or (event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline)
+ event_ = cast("ScheduleAdded | ScheduleUpdated", event)
+ if not self._wakeup_deadline or (
+ event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline
):
- self.logger.debug('Detected a %s event – waking up the scheduler',
- type(event).__name__)
+ self.logger.debug(
+ "Detected a %s event – waking up the scheduler", type(event).__name__
+ )
self._wakeup_event.set()
def add_schedule(
- self, func_or_task_id: str | Callable, trigger: Trigger, *, id: str | None = None,
- args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None,
+ self,
+ func_or_task_id: str | Callable,
+ trigger: Trigger,
+ *,
+ id: str | None = None,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None = None,
- max_jitter: float | timedelta | None = None, tags: Iterable[str] | None = None,
- conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing
+ max_jitter: float | timedelta | None = None,
+ tags: Iterable[str] | None = None,
+ conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing,
) -> str:
id = id or str(uuid4())
args = tuple(args or ())
@@ -139,13 +155,25 @@ class Scheduler:
else:
task = self.data_store.get_task(func_or_task_id)
- schedule = Schedule(id=id, task_id=task.id, trigger=trigger, args=args, kwargs=kwargs,
- coalesce=coalesce, misfire_grace_time=misfire_grace_time,
- max_jitter=max_jitter, tags=tags)
+ schedule = Schedule(
+ id=id,
+ task_id=task.id,
+ trigger=trigger,
+ args=args,
+ kwargs=kwargs,
+ coalesce=coalesce,
+ misfire_grace_time=misfire_grace_time,
+ max_jitter=max_jitter,
+ tags=tags,
+ )
schedule.next_fire_time = trigger.next()
self.data_store.add_schedule(schedule, conflict_policy)
- self.logger.info('Added new schedule (task=%r, trigger=%r); next run time at %s', task,
- trigger, schedule.next_fire_time)
+ self.logger.info(
+ "Added new schedule (task=%r, trigger=%r); next run time at %s",
+ task,
+ trigger,
+ schedule.next_fire_time,
+ )
return schedule.id
def get_schedule(self, id: str) -> Schedule:
@@ -156,8 +184,12 @@ class Scheduler:
self.data_store.remove_schedules({schedule_id})
def add_job(
- self, func_or_task_id: str | Callable, *, args: Iterable | None = None,
- kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None
+ self,
+ func_or_task_id: str | Callable,
+ *,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
+ tags: Iterable[str] | None = None,
) -> UUID:
"""
Add a job to the data store.
@@ -175,7 +207,12 @@ class Scheduler:
else:
task = self.data_store.get_task(func_or_task_id)
- job = Job(task_id=task.id, args=args or (), kwargs=kwargs or {}, tags=tags or frozenset())
+ job = Job(
+ task_id=task.id,
+ args=args or (),
+ kwargs=kwargs or {},
+ tags=tags or frozenset(),
+ )
self.data_store.add_job(job)
return job.id
@@ -209,8 +246,12 @@ class Scheduler:
return result
def run_job(
- self, func_or_task_id: str | Callable, *, args: Iterable | None = None,
- kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = ()
+ self,
+ func_or_task_id: str | Callable,
+ *,
+ args: Iterable | None = None,
+ kwargs: Mapping[str, Any] | None = None,
+ tags: Iterable[str] | None = (),
) -> Any:
"""
Convenience method to add a job and then return its result (or raise its exception).
@@ -239,12 +280,14 @@ class Scheduler:
elif result.outcome is JobOutcome.cancelled:
raise JobCancelled
else:
- raise RuntimeError(f'Unknown job outcome: {result.outcome}')
+ raise RuntimeError(f"Unknown job outcome: {result.outcome}")
def run(self) -> None:
if self._state is not RunState.starting:
- raise RuntimeError(f'This function cannot be called while the scheduler is in the '
- f'{self._state} state')
+ raise RuntimeError(
+ f"This function cannot be called while the scheduler is in the "
+ f"{self._state} state"
+ )
# Signal that the scheduler has started
self._state = RunState.started
@@ -253,8 +296,10 @@ class Scheduler:
try:
while self._state is RunState.started:
schedules = self.data_store.acquire_schedules(self.identity, 100)
- self.logger.debug('Processing %d schedules retrieved from the data store',
- len(schedules))
+ self.logger.debug(
+ "Processing %d schedules retrieved from the data store",
+ len(schedules),
+ )
now = datetime.now(timezone.utc)
for schedule in schedules:
# Calculate a next fire time for the schedule, if possible
@@ -265,8 +310,11 @@ class Scheduler:
fire_time = calculate_next()
except Exception:
self.logger.exception(
- 'Error computing next fire time for schedule %r of task %r – '
- 'removing schedule', schedule.id, schedule.task_id)
+ "Error computing next fire time for schedule %r of task %r – "
+ "removing schedule",
+ schedule.id,
+ schedule.task_id,
+ )
break
# Stop if the calculated fire time is in the future
@@ -281,7 +329,11 @@ class Scheduler:
fire_times[0] = fire_time
# Add one or more jobs to the job queue
- max_jitter = schedule.max_jitter.total_seconds() if schedule.max_jitter else 0
+ max_jitter = (
+ schedule.max_jitter.total_seconds()
+ if schedule.max_jitter
+ else 0
+ )
for i, fire_time in enumerate(fire_times):
# Calculate a jitter if max_jitter > 0
jitter = _zero_timedelta
@@ -294,19 +346,30 @@ class Scheduler:
if next_fire_time is not None:
# Jitter must never be so high that it would cause a fire time to
# equal or exceed the next fire time
- jitter_s = min([
- max_jitter,
- (next_fire_time - fire_time
- - _microsecond_delta).total_seconds()
- ])
+ jitter_s = min(
+ [
+ max_jitter,
+ (
+ next_fire_time
+ - fire_time
+ - _microsecond_delta
+ ).total_seconds(),
+ ]
+ )
jitter = timedelta(seconds=random.uniform(0, jitter_s))
fire_time += jitter
schedule.last_fire_time = fire_time
- job = Job(task_id=schedule.task_id, args=schedule.args,
- kwargs=schedule.kwargs, schedule_id=schedule.id,
- scheduled_fire_time=fire_time, jitter=jitter,
- start_deadline=schedule.next_deadline, tags=schedule.tags)
+ job = Job(
+ task_id=schedule.task_id,
+ args=schedule.args,
+ kwargs=schedule.kwargs,
+ schedule_id=schedule.id,
+ scheduled_fire_time=fire_time,
+ jitter=jitter,
+ start_deadline=schedule.next_deadline,
+ tags=schedule.tags,
+ )
self.data_store.add_job(job)
# Update the schedules (and release the scheduler's claim on them)
@@ -321,23 +384,26 @@ class Scheduler:
wait_time = (
self._wakeup_deadline - datetime.now(timezone.utc)
).total_seconds()
- self.logger.debug('Sleeping %.3f seconds until the next fire time (%s)',
- wait_time, self._wakeup_deadline)
+ self.logger.debug(
+ "Sleeping %.3f seconds until the next fire time (%s)",
+ wait_time,
+ self._wakeup_deadline,
+ )
else:
- self.logger.debug('Waiting for any due schedules to appear')
+ self.logger.debug("Waiting for any due schedules to appear")
if self._wakeup_event.wait(wait_time):
self._wakeup_event = threading.Event()
else:
- self.logger.debug('Processing more schedules on the next iteration')
+ self.logger.debug("Processing more schedules on the next iteration")
except BaseException as exc:
self._state = RunState.stopped
- self.logger.exception('Scheduler crashed')
+ self.logger.exception("Scheduler crashed")
self._events.publish(SchedulerStopped(exception=exc))
raise
self._state = RunState.stopped
- self.logger.info('Scheduler stopped')
+ self.logger.info("Scheduler stopped")
self._events.publish(SchedulerStopped())
# def stop(self) -> None: