summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-08-13 23:06:08 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-08-13 23:06:08 +0300
commita9141bb5663e0a22cc7c4da7d34834c925dbadea (patch)
tree533ee4126b22d0b30b6c74106d7b4d3b89328757 /src/apscheduler/schedulers
parentffbbbbe0ee147bba08e0c95b3697136590997fb1 (diff)
downloadapscheduler-a9141bb5663e0a22cc7c4da7d34834c925dbadea.tar.gz
Added job expiration times
Scheduled jobs no longer retain their results. All job outcomes are now logged by the workers. Workers, rather than data stores, are now responsible for emitting the JobReleased event.
Diffstat (limited to 'src/apscheduler/schedulers')
-rw-r--r--src/apscheduler/schedulers/async_.py20
-rw-r--r--src/apscheduler/schedulers/sync.py20
2 files changed, 29 insertions, 11 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 6c60bdd..48afe88 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -202,6 +202,7 @@ class AsyncScheduler:
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
tags: Iterable[str] | None = None,
+ result_expiration_time: timedelta | float = 0,
) -> UUID:
"""
Add a job to the data store.
@@ -209,7 +210,10 @@ class AsyncScheduler:
:param func_or_task_id:
:param args: positional arguments to call the target callable with
:param kwargs: keyword arguments to call the target callable with
- :param tags:
+ :param tags: strings that can be used to categorize and filter the job
+ :param result_expiration_time: the minimum time (as seconds, or timedelta) to
+ keep the result of the job available for fetching (the result won't be
+ saved at all if that time is 0)
:return: the ID of the newly created job
"""
@@ -224,6 +228,7 @@ class AsyncScheduler:
args=args or (),
kwargs=kwargs or {},
tags=tags or frozenset(),
+ result_expiration_time=result_expiration_time,
)
await self.data_store.add_job(job)
return job.id
@@ -235,7 +240,8 @@ class AsyncScheduler:
:param job_id: the ID of the job
:param wait: if ``True``, wait until the job has ended (one way or another),
``False`` to raise an exception if the result is not yet available
- :raises JobLookupError: if the job does not exist in the data store
+ :raises JobLookupError: if ``wait=False`` and the job result does not exist in
+ the data store
"""
wait_event = anyio.Event()
@@ -253,9 +259,7 @@ class AsyncScheduler:
await wait_event.wait()
- result = await self.data_store.get_job_result(job_id)
- assert isinstance(result, JobResult)
- return result
+ return await self.data_store.get_job_result(job_id)
async def run_job(
self,
@@ -287,7 +291,11 @@ class AsyncScheduler:
job_id: UUID | None = None
with self.data_store.events.subscribe(listener, {JobReleased}):
job_id = await self.add_job(
- func_or_task_id, args=args, kwargs=kwargs, tags=tags
+ func_or_task_id,
+ args=args,
+ kwargs=kwargs,
+ tags=tags,
+ result_expiration_time=timedelta(minutes=15),
)
await job_complete_event.wait()
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index aea7047..7ed6d3f 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -234,6 +234,7 @@ class Scheduler:
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
tags: Iterable[str] | None = None,
+ result_expiration_time: timedelta | float = 0,
) -> UUID:
"""
Add a job to the data store.
@@ -243,6 +244,9 @@ class Scheduler:
:param args: positional arguments to be passed to the task function
:param kwargs: keyword arguments to be passed to the task function
:param tags: strings that can be used to categorize and filter the job
+ :param result_expiration_time: the minimum time (as seconds, or timedelta) to
+ keep the result of the job available for fetching (the result won't be
+ saved at all if that time is 0)
:return: the ID of the newly created job
"""
@@ -258,6 +262,7 @@ class Scheduler:
args=args or (),
kwargs=kwargs or {},
tags=tags or frozenset(),
+ result_expiration_time=result_expiration_time,
)
self.data_store.add_job(job)
return job.id
@@ -269,7 +274,8 @@ class Scheduler:
:param job_id: the ID of the job
:param wait: if ``True``, wait until the job has ended (one way or another),
``False`` to raise an exception if the result is not yet available
- :raises JobLookupError: if the job does not exist in the data store
+ :raises JobLookupError: if ``wait=False`` and the job result does not exist in
+ the data store
"""
self._ensure_services_ready()
@@ -288,9 +294,7 @@ class Scheduler:
wait_event.wait()
- result = self.data_store.get_job_result(job_id)
- assert isinstance(result, JobResult)
- return result
+ return self.data_store.get_job_result(job_id)
def run_job(
self,
@@ -322,7 +326,13 @@ class Scheduler:
job_id: UUID | None = None
with self.data_store.events.subscribe(listener, {JobReleased}):
- job_id = self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags)
+ job_id = self.add_job(
+ func_or_task_id,
+ args=args,
+ kwargs=kwargs,
+ tags=tags,
+ result_expiration_time=timedelta(minutes=15),
+ )
job_complete_event.wait()
result = self.get_job_result(job_id)