diff options
-rw-r--r-- | src/apscheduler/_structures.py | 27 | ||||
-rw-r--r-- | src/apscheduler/datastores/async_sqlalchemy.py | 11 | ||||
-rw-r--r-- | src/apscheduler/datastores/memory.py | 23 | ||||
-rw-r--r-- | src/apscheduler/datastores/mongodb.py | 10 | ||||
-rw-r--r-- | src/apscheduler/datastores/sqlalchemy.py | 17 | ||||
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 20 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 20 | ||||
-rw-r--r-- | src/apscheduler/workers/async_.py | 62 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 46 | ||||
-rw-r--r-- | tests/test_datastores.py | 73 | ||||
-rw-r--r-- | tests/test_schedulers.py | 63 |
11 files changed, 273 insertions, 99 deletions
diff --git a/src/apscheduler/_structures.py b/src/apscheduler/_structures.py index 6ceaaf6..ba3359b 100644 --- a/src/apscheduler/_structures.py +++ b/src/apscheduler/_structures.py @@ -158,6 +158,8 @@ class Job: (if the job was derived from a schedule) :param start_deadline: if the job is started in the worker after this time, it is considered to be misfired and will be aborted + :param result_expiration_time: minimum amount of time to keep the result available + for fetching in the data store :param tags: strings that can be used to categorize and filter the job :param created_at: the time at which the job was created :param started_at: the time at which the execution of the job was started @@ -181,6 +183,9 @@ class Job: eq=False, order=False, converter=as_timedelta, factory=timedelta ) start_deadline: datetime | None = attrs.field(eq=False, order=False, default=None) + result_expiration_time: timedelta = attrs.field( + eq=False, order=False, converter=as_timedelta, default=timedelta() + ) tags: frozenset[str] = attrs.field( eq=False, order=False, converter=frozenset, default=() ) @@ -277,9 +282,31 @@ class JobResult: finished_at: datetime = attrs.field( eq=False, order=False, factory=partial(datetime.now, timezone.utc) ) + expires_at: datetime = attrs.field(eq=False, order=False) exception: BaseException | None = attrs.field(eq=False, order=False, default=None) return_value: Any = attrs.field(eq=False, order=False, default=None) + @classmethod + def from_job( + cls, + job: Job, + outcome: JobOutcome, + *, + finished_at: datetime | None = None, + exception: BaseException | None = None, + return_value: Any = None, + ) -> JobResult: + real_finished_at = finished_at or datetime.now(timezone.utc) + expires_at = real_finished_at + job.result_expiration_time + return cls( + job_id=job.id, + outcome=outcome, + finished_at=real_finished_at, + expires_at=expires_at, + exception=exception, + return_value=return_value, + ) + def marshal(self, serializer: Serializer) -> dict[str, Any]: marshalled = attrs.asdict(self, value_serializer=serialize) if self.outcome is JobOutcome.error: diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py index 056d282..74742f8 100644 --- a/src/apscheduler/datastores/async_sqlalchemy.py +++ b/src/apscheduler/datastores/async_sqlalchemy.py @@ -557,12 +557,13 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseAsyncDataStore): async for attempt in self._retry(): with attempt: async with self.engine.begin() as conn: - # Insert the job result - marshalled = result.marshal(self.serializer) - insert = self.t_job_results.insert().values(**marshalled) - await conn.execute(insert) + # Record the job result + if result.expires_at > result.finished_at: + marshalled = result.marshal(self.serializer) + insert = self.t_job_results.insert().values(**marshalled) + await conn.execute(insert) - # Decrement the running jobs counter + # Decrement the number of running jobs for this task update = ( self.t_tasks.update() .values(running_jobs=self.t_tasks.c.running_jobs - 1) diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py index 2c96095..fb29ad9 100644 --- a/src/apscheduler/datastores/memory.py +++ b/src/apscheduler/datastores/memory.py @@ -13,7 +13,6 @@ from .._enums import ConflictPolicy from .._events import ( JobAcquired, JobAdded, - JobReleased, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, @@ -296,26 +295,20 @@ class MemoryDataStore(BaseDataStore): return jobs def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: - # Delete the job - job_state = self._jobs_by_id.pop(result.job_id) - self._jobs_by_task_id[task_id].remove(job_state) - index = self._find_job_index(job_state) - del self._jobs[index] + # Record the job result + if result.expires_at > result.finished_at: + self._job_results[result.job_id] = result # Decrement the number of running jobs for this task task_state = self._tasks.get(task_id) if task_state is not None: task_state.running_jobs -= 1 - # Record the result - self._job_results[result.job_id] = result - - # Publish the event - self._events.publish( - JobReleased( - job_id=result.job_id, worker_id=worker_id, outcome=result.outcome - ) - ) + # Delete the job + job_state = self._jobs_by_id.pop(result.job_id) + self._jobs_by_task_id[task_id].remove(job_state) + index = self._find_job_index(job_state) + del self._jobs[index] def get_job_result(self, job_id: UUID) -> JobResult | None: return self._job_results.pop(job_id, None) diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 1fb7d96..de209ba 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -158,6 +158,7 @@ class MongoDBDataStore(BaseDataStore): self._jobs.create_index("created_at", session=session) self._jobs.create_index("tags", session=session) self._jobs_results.create_index("finished_at", session=session) + self._jobs_results.create_index("expires_at", session=session) def add_task(self, task: Task) -> None: for attempt in self._retry(): @@ -495,10 +496,11 @@ class MongoDBDataStore(BaseDataStore): def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: for attempt in self._retry(): with attempt, self.client.start_session() as session: - # Insert the job result - document = result.marshal(self.serializer) - document["_id"] = document.pop("job_id") - self._jobs_results.insert_one(document, session=session) + # Record the job result + if result.expires_at > result.finished_at: + document = result.marshal(self.serializer) + document["_id"] = document.pop("job_id") + self._jobs_results.insert_one(document, session=session) # Decrement the running jobs counter self._tasks.find_one_and_update( diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 01e31bb..9c7c905 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -37,7 +37,6 @@ from .._events import ( JobAcquired, JobAdded, JobDeserializationFailed, - JobReleased, ScheduleAdded, ScheduleDeserializationFailed, ScheduleRemoved, @@ -178,6 +177,7 @@ class _BaseSQLAlchemyDataStore: Column("scheduled_fire_time", timestamp_type), Column("jitter", interval_type), Column("start_deadline", timestamp_type), + Column("result_expiration_time", interval_type), Column("tags", tags_type, nullable=False), Column("created_at", timestamp_type, nullable=False), Column("started_at", timestamp_type), @@ -190,6 +190,7 @@ class _BaseSQLAlchemyDataStore: Column("job_id", job_id_type, primary_key=True), Column("outcome", Enum(JobOutcome), nullable=False), Column("finished_at", timestamp_type, index=True), + Column("expires_at", timestamp_type, nullable=False, index=True), Column("exception", LargeBinary), Column("return_value", LargeBinary), ) @@ -672,9 +673,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore): for attempt in self._retry(): with attempt, self.engine.begin() as conn: # Insert the job result - marshalled = result.marshal(self.serializer) - insert = self.t_job_results.insert().values(**marshalled) - conn.execute(insert) + if result.expires_at > result.finished_at: + marshalled = result.marshal(self.serializer) + insert = self.t_job_results.insert().values(**marshalled) + conn.execute(insert) # Decrement the running jobs counter update = ( @@ -688,13 +690,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore): delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id) conn.execute(delete) - # Publish the event - self._events.publish( - JobReleased( - job_id=result.job_id, worker_id=worker_id, outcome=result.outcome - ) - ) - def get_job_result(self, job_id: UUID) -> JobResult | None: for attempt in self._retry(): with attempt, self.engine.begin() as conn: diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 6c60bdd..48afe88 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -202,6 +202,7 @@ class AsyncScheduler: args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None, + result_expiration_time: timedelta | float = 0, ) -> UUID: """ Add a job to the data store. @@ -209,7 +210,10 @@ class AsyncScheduler: :param func_or_task_id: :param args: positional arguments to call the target callable with :param kwargs: keyword arguments to call the target callable with - :param tags: + :param tags: strings that can be used to categorize and filter the job + :param result_expiration_time: the minimum time (as seconds, or timedelta) to + keep the result of the job available for fetching (the result won't be + saved at all if that time is 0) :return: the ID of the newly created job """ @@ -224,6 +228,7 @@ class AsyncScheduler: args=args or (), kwargs=kwargs or {}, tags=tags or frozenset(), + result_expiration_time=result_expiration_time, ) await self.data_store.add_job(job) return job.id @@ -235,7 +240,8 @@ class AsyncScheduler: :param job_id: the ID of the job :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to raise an exception if the result is not yet available - :raises JobLookupError: if the job does not exist in the data store + :raises JobLookupError: if ``wait=False`` and the job result does not exist in + the data store """ wait_event = anyio.Event() @@ -253,9 +259,7 @@ class AsyncScheduler: await wait_event.wait() - result = await self.data_store.get_job_result(job_id) - assert isinstance(result, JobResult) - return result + return await self.data_store.get_job_result(job_id) async def run_job( self, @@ -287,7 +291,11 @@ class AsyncScheduler: job_id: UUID | None = None with self.data_store.events.subscribe(listener, {JobReleased}): job_id = await self.add_job( - func_or_task_id, args=args, kwargs=kwargs, tags=tags + func_or_task_id, + args=args, + kwargs=kwargs, + tags=tags, + result_expiration_time=timedelta(minutes=15), ) await job_complete_event.wait() diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index aea7047..7ed6d3f 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -234,6 +234,7 @@ class Scheduler: args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None, + result_expiration_time: timedelta | float = 0, ) -> UUID: """ Add a job to the data store. @@ -243,6 +244,9 @@ class Scheduler: :param args: positional arguments to be passed to the task function :param kwargs: keyword arguments to be passed to the task function :param tags: strings that can be used to categorize and filter the job + :param result_expiration_time: the minimum time (as seconds, or timedelta) to + keep the result of the job available for fetching (the result won't be + saved at all if that time is 0) :return: the ID of the newly created job """ @@ -258,6 +262,7 @@ class Scheduler: args=args or (), kwargs=kwargs or {}, tags=tags or frozenset(), + result_expiration_time=result_expiration_time, ) self.data_store.add_job(job) return job.id @@ -269,7 +274,8 @@ class Scheduler: :param job_id: the ID of the job :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to raise an exception if the result is not yet available - :raises JobLookupError: if the job does not exist in the data store + :raises JobLookupError: if ``wait=False`` and the job result does not exist in + the data store """ self._ensure_services_ready() @@ -288,9 +294,7 @@ class Scheduler: wait_event.wait() - result = self.data_store.get_job_result(job_id) - assert isinstance(result, JobResult) - return result + return self.data_store.get_job_result(job_id) def run_job( self, @@ -322,7 +326,13 @@ class Scheduler: job_id: UUID | None = None with self.data_store.events.subscribe(listener, {JobReleased}): - job_id = self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags) + job_id = self.add_job( + func_or_task_id, + args=args, + kwargs=kwargs, + tags=tags, + result_expiration_time=timedelta(minutes=15), + ) job_complete_event.wait() result = self.get_job_result(job_id) diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index a44c89c..b04eea1 100644 --- a/src/apscheduler/workers/async_.py +++ b/src/apscheduler/workers/async_.py @@ -23,7 +23,7 @@ from anyio.abc import CancelScope, TaskGroup from .._context import current_job, current_worker from .._converters import as_async_datastore, as_async_eventbroker from .._enums import JobOutcome, RunState -from .._events import JobAdded, WorkerStarted, WorkerStopped +from .._events import JobAdded, JobReleased, WorkerStarted, WorkerStopped from .._structures import Job, JobInfo, JobResult from .._validators import positive_integer from ..abc import AsyncDataStore, AsyncEventBroker @@ -172,10 +172,17 @@ class AsyncWorker: # Check if the job started before the deadline start_time = datetime.now(timezone.utc) if job.start_deadline is not None and start_time > job.start_deadline: - result = JobResult( - job_id=job.id, outcome=JobOutcome.missed_start_deadline + result = JobResult.from_job( + job, + outcome=JobOutcome.missed_start_deadline, + finished_at=start_time, ) await self.data_store.release_job(self.identity, job.task_id, result) + await self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) + ) return token = current_job.set(JobInfo.from_job(job)) @@ -184,23 +191,60 @@ class AsyncWorker: if isawaitable(retval): retval = await retval except get_cancelled_exc_class(): + self.logger.info("Job %s was cancelled", job.id) with CancelScope(shield=True): - result = JobResult(job_id=job.id, outcome=JobOutcome.cancelled) + result = JobResult.from_job( + job, + outcome=JobOutcome.cancelled, + ) await self.data_store.release_job( self.identity, job.task_id, result ) + await self.event_broker.publish( + JobReleased( + job_id=job.id, + worker_id=self.identity, + outcome=result.outcome, + ) + ) except BaseException as exc: - result = JobResult( - job_id=job.id, outcome=JobOutcome.error, exception=exc + if isinstance(exc, Exception): + self.logger.exception("Job %s raised an exception", job.id) + else: + self.logger.error( + "Job %s was aborted due to %s", job.id, exc.__class__.__name__ + ) + + result = JobResult.from_job( + job, + JobOutcome.error, + exception=exc, + ) + await self.data_store.release_job( + self.identity, + job.task_id, + result, + ) + await self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) ) - await self.data_store.release_job(self.identity, job.task_id, result) if not isinstance(exc, Exception): raise else: - result = JobResult( - job_id=job.id, outcome=JobOutcome.success, return_value=retval + self.logger.info("Job %s completed successfully", job.id) + result = JobResult.from_job( + job, + JobOutcome.success, + return_value=retval, ) await self.data_store.release_job(self.identity, job.task_id, result) + await self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) + ) finally: current_job.reset(token) finally: diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index 1d8f85a..8c627c6 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -15,6 +15,7 @@ from uuid import UUID import attrs +from .. import JobReleased from .._context import current_job, current_worker from .._enums import JobOutcome, RunState from .._events import JobAdded, WorkerStarted, WorkerStopped @@ -200,8 +201,13 @@ class Worker: # Check if the job started before the deadline start_time = datetime.now(timezone.utc) if job.start_deadline is not None and start_time > job.start_deadline: - result = JobResult( - job_id=job.id, outcome=JobOutcome.missed_start_deadline + result = JobResult.from_job( + job, JobOutcome.missed_start_deadline, finished_at=start_time + ) + self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) ) self.data_store.release_job(self.identity, job.task_id, result) return @@ -210,17 +216,43 @@ class Worker: try: retval = func(*job.args, **job.kwargs) except BaseException as exc: - result = JobResult( - job_id=job.id, outcome=JobOutcome.error, exception=exc + if isinstance(exc, Exception): + self.logger.exception("Job %s raised an exception", job.id) + else: + self.logger.error( + "Job %s was aborted due to %s", job.id, exc.__class__.__name__ + ) + + result = JobResult.from_job( + job, + JobOutcome.error, + exception=exc, + ) + self.data_store.release_job( + self.identity, + job.task_id, + result, + ) + self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) ) - self.data_store.release_job(self.identity, job.task_id, result) if not isinstance(exc, Exception): raise else: - result = JobResult( - job_id=job.id, outcome=JobOutcome.success, return_value=retval + self.logger.info("Job %s completed successfully", job.id) + result = JobResult.from_job( + job, + JobOutcome.success, + return_value=retval, ) self.data_store.release_job(self.identity, job.task_id, result) + self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) + ) finally: current_job.reset(token) finally: diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 22722c7..40fad27 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -3,7 +3,7 @@ from __future__ import annotations import threading from collections.abc import Generator from contextlib import asynccontextmanager, contextmanager -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from tempfile import TemporaryDirectory from typing import Any, AsyncGenerator, cast @@ -433,7 +433,7 @@ class TestDataStores: def test_job_release_success(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -443,9 +443,9 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired[0], + JobOutcome.success, return_value="foo", ), ) @@ -460,7 +460,7 @@ class TestDataStores: def test_job_release_failure(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -470,9 +470,9 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, + JobResult.from_job( + acquired[0], + JobOutcome.error, exception=ValueError("foo"), ), ) @@ -488,7 +488,7 @@ class TestDataStores: def test_job_release_missed_deadline(self, datastore: DataStore): datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -498,7 +498,10 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + JobResult.from_job( + acquired[0], + JobOutcome.missed_start_deadline, + ), ) result = datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.missed_start_deadline @@ -511,7 +514,7 @@ class TestDataStores: def test_job_release_cancelled(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker1", 2) @@ -521,7 +524,10 @@ class TestDataStores: datastore.release_job( "worker1", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + JobResult.from_job( + acquired[0], + JobOutcome.cancelled, + ), ) result = datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.cancelled @@ -576,9 +582,9 @@ class TestDataStores: datastore.release_job( "worker1", acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired_jobs[0], + JobOutcome.success, return_value=None, ), ) @@ -890,7 +896,7 @@ class TestAsyncDataStores: async def test_job_release_success(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -900,9 +906,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired[0], + JobOutcome.success, return_value="foo", ), ) @@ -917,7 +923,7 @@ class TestAsyncDataStores: async def test_job_release_failure(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -927,9 +933,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, + JobResult.from_job( + acquired[0], + JobOutcome.error, exception=ValueError("foo"), ), ) @@ -945,7 +951,7 @@ class TestAsyncDataStores: async def test_job_release_missed_deadline(self, datastore: AsyncDataStore): await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -955,7 +961,10 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + JobResult.from_job( + acquired[0], + JobOutcome.missed_start_deadline, + ), ) result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.missed_start_deadline @@ -968,7 +977,7 @@ class TestAsyncDataStores: async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker1", 2) @@ -978,7 +987,7 @@ class TestAsyncDataStores: await datastore.release_job( "worker1", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + JobResult.from_job(acquired[0], JobOutcome.cancelled), ) result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.cancelled @@ -998,7 +1007,7 @@ class TestAsyncDataStores: """ await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) # First, one worker acquires the first available job @@ -1035,9 +1044,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker1", acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired_jobs[0], + JobOutcome.success, return_value=None, ), ) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 32f28dc..f57cbeb 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -17,6 +17,7 @@ from apscheduler import ( JobAdded, JobLookupError, JobOutcome, + JobReleased, Schedule, ScheduleAdded, ScheduleLookupError, @@ -181,16 +182,33 @@ class TestAsyncScheduler: async def test_get_job_result_success(self) -> None: async with AsyncScheduler() as scheduler: - job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2}) + job_id = await scheduler.add_job( + dummy_async_job, kwargs={"delay": 0.2}, result_expiration_time=5 + ) result = await scheduler.get_job_result(job_id) assert result.job_id == job_id assert result.outcome is JobOutcome.success assert result.return_value == "returnvalue" + async def test_get_job_result_success_empty(self) -> None: + event = anyio.Event() + async with AsyncScheduler() as scheduler: + scheduler.event_broker.subscribe( + lambda evt: event.set(), {JobReleased}, one_shot=True + ) + job_id = await scheduler.add_job(dummy_async_job) + with fail_after(3): + await event.wait() + + with pytest.raises(JobLookupError): + await scheduler.get_job_result(job_id, wait=False) + async def test_get_job_result_error(self) -> None: async with AsyncScheduler() as scheduler: job_id = await scheduler.add_job( - dummy_async_job, kwargs={"delay": 0.2, "fail": True} + dummy_async_job, + kwargs={"delay": 0.2, "fail": True}, + result_expiration_time=5, ) result = await scheduler.get_job_result(job_id) assert result.job_id == job_id @@ -198,6 +216,17 @@ class TestAsyncScheduler: assert isinstance(result.exception, RuntimeError) assert str(result.exception) == "failing as requested" + async def test_get_job_result_error_empty(self) -> None: + event = anyio.Event() + async with AsyncScheduler() as scheduler: + scheduler.event_broker.subscribe(lambda evt: event.set(), one_shot=True) + job_id = await scheduler.add_job(dummy_sync_job, kwargs={"fail": True}) + with fail_after(3): + await event.wait() + + with pytest.raises(JobLookupError): + await scheduler.get_job_result(job_id, wait=False) + async def test_get_job_result_nowait_not_yet_ready(self) -> None: async with AsyncScheduler() as scheduler: job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2}) @@ -239,6 +268,7 @@ class TestAsyncScheduler: jitter=timedelta(seconds=2.16), start_deadline=start_deadline, tags={"foo", "bar"}, + result_expiration_time=timedelta(seconds=10), ) await scheduler.data_store.add_job(job) result = await scheduler.get_job_result(job.id) @@ -371,17 +401,29 @@ class TestSyncScheduler: ) assert jobs[0].original_scheduled_time == orig_start_time - def test_get_job_result(self) -> None: + def test_get_job_result_success(self) -> None: with Scheduler() as scheduler: - job_id = scheduler.add_job(dummy_sync_job) + job_id = scheduler.add_job(dummy_sync_job, result_expiration_time=5) result = scheduler.get_job_result(job_id) assert result.outcome is JobOutcome.success assert result.return_value == "returnvalue" + def test_get_job_result_success_empty(self) -> None: + event = threading.Event() + with Scheduler() as scheduler: + with scheduler.event_broker.subscribe( + lambda evt: event.set(), {JobReleased}, one_shot=True + ): + job_id = scheduler.add_job(dummy_sync_job) + event.wait(3) + + with pytest.raises(JobLookupError): + scheduler.get_job_result(job_id, wait=False) + def test_get_job_result_error(self) -> None: with Scheduler() as scheduler: job_id = scheduler.add_job( - dummy_sync_job, kwargs={"delay": 0.2, "fail": True} + dummy_sync_job, kwargs={"fail": True}, result_expiration_time=5 ) result = scheduler.get_job_result(job_id) assert result.job_id == job_id @@ -389,6 +431,16 @@ class TestSyncScheduler: assert isinstance(result.exception, RuntimeError) assert str(result.exception) == "failing as requested" + def test_get_job_result_error_empty(self) -> None: + event = threading.Event() + with Scheduler() as scheduler, scheduler.event_broker.subscribe( + lambda evt: event.set(), one_shot=True + ): + job_id = scheduler.add_job(dummy_sync_job, kwargs={"fail": True}) + event.wait(3) + with pytest.raises(JobLookupError): + scheduler.get_job_result(job_id, wait=False) + def test_get_job_result_nowait_not_yet_ready(self) -> None: with Scheduler() as scheduler: job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2}) @@ -428,6 +480,7 @@ class TestSyncScheduler: jitter=timedelta(seconds=2.16), start_deadline=start_deadline, tags={"foo", "bar"}, + result_expiration_time=timedelta(seconds=10), ) scheduler.data_store.add_job(job) result = scheduler.get_job_result(job.id) |