diff options
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 61 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 85 |
2 files changed, 128 insertions, 18 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 90d041c..bc90c05 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -82,6 +82,11 @@ class AsyncScheduler: ) self._wakeup_event.set() + @property + def state(self) -> RunState: + """The current running state of the scheduler.""" + return self._state + async def add_schedule( self, func_or_task_id: str | Callable, @@ -96,6 +101,30 @@ class AsyncScheduler: tags: Iterable[str] | None = None, conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing, ) -> str: + """ + Schedule a task to be run one or more times in the future. + + :param func_or_task_id: either a callable or an ID of an existing task + definition + :param trigger: determines the times when the task should be run + :param id: an explicit identifier for the schedule (if omitted, a random, UUID + based ID will be assigned) + :param args: positional arguments to be passed to the task function + :param kwargs: keyword arguments to be passed to the task function + :param coalesce: if ``True``, only one job will be created when the schedule is + due even if multiple run times have passed since the last processing of the + schedule + :param misfire_grace_time: maximum number of seconds the scheduled job's actual + run time is allowed to be late, compared to the scheduled run time + :param max_jitter: maximum number of seconds to randomly add to the scheduled + time for each job created from this schedule + :param tags: strings that can be used to categorize and filter the schedule and + its derivative jobs + :param conflict_policy: determines what to do if a schedule with the same ID + already exists in the data store + :return: the ID of the newly added schedule + + """ id = id or str(uuid4()) args = tuple(args or ()) kwargs = dict(kwargs or {}) @@ -131,11 +160,24 @@ class AsyncScheduler: return schedule.id async def get_schedule(self, id: str) -> Schedule: + """ + Retrieve a schedule from the data store. + + :param id: the unique identifier of the schedule + :raises ScheduleLookupError: if the schedule could not be found + + """ schedules = await self.data_store.get_schedules({id}) return schedules[0] - async def remove_schedule(self, schedule_id: str) -> None: - await self.data_store.remove_schedules({schedule_id}) + async def remove_schedule(self, id: str) -> None: + """ + Remove the given schedule from the data store. + + :param id: the unique identifier of the schedule + + """ + await self.data_store.remove_schedules({id}) async def add_job( self, @@ -175,8 +217,8 @@ class AsyncScheduler: Retrieve the result of a job. :param job_id: the ID of the job - :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to - raise an exception if the result is not yet available + :param wait: if ``True``, wait until the job has ended (one way or another), + ``False`` to raise an exception if the result is not yet available :raises JobLookupError: if the job does not exist in the data store """ @@ -208,9 +250,16 @@ class AsyncScheduler: tags: Iterable[str] | None = (), ) -> Any: """ - Convenience method to add a job and then return its result (or raise its exception). + Convenience method to add a job and then return its result. + + If the job raised an exception, that exception will be reraised here. - :returns: the return value of the target function + :param func_or_task_id: either a callable or an ID of an existing task + definition + :param args: positional arguments to be passed to the task function + :param kwargs: keyword arguments to be passed to the task function + :param tags: strings that can be used to categorize and filter the job + :returns: the return value of the task function """ job_complete_event = anyio.Event() diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index af689cc..3f59b3f 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -59,10 +59,6 @@ class Scheduler: if not self.identity: self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}" - @property - def state(self) -> RunState: - return self._state - def __enter__(self) -> Scheduler: self.start_in_background() return self @@ -114,6 +110,11 @@ class Scheduler: self._thread.join() self._thread = None + @property + def state(self) -> RunState: + """The current running state of the scheduler.""" + return self._state + def add_schedule( self, func_or_task_id: str | Callable, @@ -128,6 +129,30 @@ class Scheduler: tags: Iterable[str] | None = None, conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing, ) -> str: + """ + Schedule a task to be run one or more times in the future. + + :param func_or_task_id: either a callable or an ID of an existing task + definition + :param trigger: determines the times when the task should be run + :param id: an explicit identifier for the schedule (if omitted, a random, UUID + based ID will be assigned) + :param args: positional arguments to be passed to the task function + :param kwargs: keyword arguments to be passed to the task function + :param coalesce: if ``True``, only one job will be created when the schedule is + due even if multiple run times have passed since the last processing of the + schedule + :param misfire_grace_time: maximum number of seconds the scheduled job's actual + run time is allowed to be late, compared to the scheduled run time + :param max_jitter: maximum number of seconds to randomly add to the scheduled + time for each job created from this schedule + :param tags: strings that can be used to categorize and filter the schedule and + its derivative jobs + :param conflict_policy: determines what to do if a schedule with the same ID + already exists in the data store + :return: the ID of the newly added schedule + + """ self._ensure_services_ready() id = id or str(uuid4()) args = tuple(args or ()) @@ -164,13 +189,26 @@ class Scheduler: return schedule.id def get_schedule(self, id: str) -> Schedule: + """ + Retrieve a schedule from the data store. + + :param id: the unique identifier of the schedule + :raises ScheduleLookupError: if the schedule could not be found + + """ self._ensure_services_ready() schedules = self.data_store.get_schedules({id}) return schedules[0] - def remove_schedule(self, schedule_id: str) -> None: + def remove_schedule(self, id: str) -> None: + """ + Remove the given schedule from the data store. + + :param id: the unique identifier of the schedule + + """ self._ensure_services_ready() - self.data_store.remove_schedules({schedule_id}) + self.data_store.remove_schedules({id}) def add_job( self, @@ -183,10 +221,11 @@ class Scheduler: """ Add a job to the data store. - :param func_or_task_id: - :param args: positional arguments to call the target callable with - :param kwargs: keyword arguments to call the target callable with - :param tags: + :param func_or_task_id: either a callable or an ID of an existing task + definition + :param args: positional arguments to be passed to the task function + :param kwargs: keyword arguments to be passed to the task function + :param tags: strings that can be used to categorize and filter the job :return: the ID of the newly created job """ @@ -245,9 +284,16 @@ class Scheduler: tags: Iterable[str] | None = (), ) -> Any: """ - Convenience method to add a job and then return its result (or raise its exception). + Convenience method to add a job and then return its result. - :returns: the return value of the target function + If the job raised an exception, that exception will be reraised here. + + :param func_or_task_id: either a callable or an ID of an existing task + definition + :param args: positional arguments to be passed to the task function + :param kwargs: keyword arguments to be passed to the task function + :param tags: strings that can be used to categorize and filter the job + :returns: the return value of the task function """ self._ensure_services_ready() @@ -275,6 +321,15 @@ class Scheduler: raise RuntimeError(f"Unknown job outcome: {result.outcome}") def start_in_background(self) -> None: + """ + Launch the scheduler in a new thread. + + This method registers :mod:`atexit` hooks to shut down the scheduler and wait + for the thread to finish. + + :raises RuntimeError: if the scheduler is not in the ``stopped`` state + + """ with self._lock: if self._state is not RunState.stopped: raise RuntimeError( @@ -333,6 +388,12 @@ class Scheduler: event.wait() def run_until_stopped(self) -> None: + """ + Run the scheduler (and its internal worker) until it is explicitly stopped. + + This method will only return if :meth:`stop` is called. + + """ with self._lock: if self._state is not RunState.stopped: raise RuntimeError( |