diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-08-13 23:06:08 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-08-13 23:06:08 +0300 |
commit | a9141bb5663e0a22cc7c4da7d34834c925dbadea (patch) | |
tree | 533ee4126b22d0b30b6c74106d7b4d3b89328757 /src/apscheduler | |
parent | ffbbbbe0ee147bba08e0c95b3697136590997fb1 (diff) | |
download | apscheduler-a9141bb5663e0a22cc7c4da7d34834c925dbadea.tar.gz |
Added job expiration times
Scheduled jobs no longer retain their results. All job outcomes are now logged by the workers.
Workers, rather than data stores, are now responsible for emitting the JobReleased event.
Diffstat (limited to 'src/apscheduler')
-rw-r--r-- | src/apscheduler/_structures.py | 27 | ||||
-rw-r--r-- | src/apscheduler/datastores/async_sqlalchemy.py | 11 | ||||
-rw-r--r-- | src/apscheduler/datastores/memory.py | 23 | ||||
-rw-r--r-- | src/apscheduler/datastores/mongodb.py | 10 | ||||
-rw-r--r-- | src/apscheduler/datastores/sqlalchemy.py | 17 | ||||
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 20 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 20 | ||||
-rw-r--r-- | src/apscheduler/workers/async_.py | 62 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 46 |
9 files changed, 174 insertions, 62 deletions
diff --git a/src/apscheduler/_structures.py b/src/apscheduler/_structures.py index 6ceaaf6..ba3359b 100644 --- a/src/apscheduler/_structures.py +++ b/src/apscheduler/_structures.py @@ -158,6 +158,8 @@ class Job: (if the job was derived from a schedule) :param start_deadline: if the job is started in the worker after this time, it is considered to be misfired and will be aborted + :param result_expiration_time: minimum amount of time to keep the result available + for fetching in the data store :param tags: strings that can be used to categorize and filter the job :param created_at: the time at which the job was created :param started_at: the time at which the execution of the job was started @@ -181,6 +183,9 @@ class Job: eq=False, order=False, converter=as_timedelta, factory=timedelta ) start_deadline: datetime | None = attrs.field(eq=False, order=False, default=None) + result_expiration_time: timedelta = attrs.field( + eq=False, order=False, converter=as_timedelta, default=timedelta() + ) tags: frozenset[str] = attrs.field( eq=False, order=False, converter=frozenset, default=() ) @@ -277,9 +282,31 @@ class JobResult: finished_at: datetime = attrs.field( eq=False, order=False, factory=partial(datetime.now, timezone.utc) ) + expires_at: datetime = attrs.field(eq=False, order=False) exception: BaseException | None = attrs.field(eq=False, order=False, default=None) return_value: Any = attrs.field(eq=False, order=False, default=None) + @classmethod + def from_job( + cls, + job: Job, + outcome: JobOutcome, + *, + finished_at: datetime | None = None, + exception: BaseException | None = None, + return_value: Any = None, + ) -> JobResult: + real_finished_at = finished_at or datetime.now(timezone.utc) + expires_at = real_finished_at + job.result_expiration_time + return cls( + job_id=job.id, + outcome=outcome, + finished_at=real_finished_at, + expires_at=expires_at, + exception=exception, + return_value=return_value, + ) + def marshal(self, serializer: Serializer) -> dict[str, Any]: marshalled = attrs.asdict(self, value_serializer=serialize) if self.outcome is JobOutcome.error: diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py index 056d282..74742f8 100644 --- a/src/apscheduler/datastores/async_sqlalchemy.py +++ b/src/apscheduler/datastores/async_sqlalchemy.py @@ -557,12 +557,13 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseAsyncDataStore): async for attempt in self._retry(): with attempt: async with self.engine.begin() as conn: - # Insert the job result - marshalled = result.marshal(self.serializer) - insert = self.t_job_results.insert().values(**marshalled) - await conn.execute(insert) + # Record the job result + if result.expires_at > result.finished_at: + marshalled = result.marshal(self.serializer) + insert = self.t_job_results.insert().values(**marshalled) + await conn.execute(insert) - # Decrement the running jobs counter + # Decrement the number of running jobs for this task update = ( self.t_tasks.update() .values(running_jobs=self.t_tasks.c.running_jobs - 1) diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py index 2c96095..fb29ad9 100644 --- a/src/apscheduler/datastores/memory.py +++ b/src/apscheduler/datastores/memory.py @@ -13,7 +13,6 @@ from .._enums import ConflictPolicy from .._events import ( JobAcquired, JobAdded, - JobReleased, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, @@ -296,26 +295,20 @@ class MemoryDataStore(BaseDataStore): return jobs def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: - # Delete the job - job_state = self._jobs_by_id.pop(result.job_id) - self._jobs_by_task_id[task_id].remove(job_state) - index = self._find_job_index(job_state) - del self._jobs[index] + # Record the job result + if result.expires_at > result.finished_at: + self._job_results[result.job_id] = result # Decrement the number of running jobs for this task task_state = self._tasks.get(task_id) if task_state is not None: task_state.running_jobs -= 1 - # Record the result - self._job_results[result.job_id] = result - - # Publish the event - self._events.publish( - JobReleased( - job_id=result.job_id, worker_id=worker_id, outcome=result.outcome - ) - ) + # Delete the job + job_state = self._jobs_by_id.pop(result.job_id) + self._jobs_by_task_id[task_id].remove(job_state) + index = self._find_job_index(job_state) + del self._jobs[index] def get_job_result(self, job_id: UUID) -> JobResult | None: return self._job_results.pop(job_id, None) diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 1fb7d96..de209ba 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -158,6 +158,7 @@ class MongoDBDataStore(BaseDataStore): self._jobs.create_index("created_at", session=session) self._jobs.create_index("tags", session=session) self._jobs_results.create_index("finished_at", session=session) + self._jobs_results.create_index("expires_at", session=session) def add_task(self, task: Task) -> None: for attempt in self._retry(): @@ -495,10 +496,11 @@ class MongoDBDataStore(BaseDataStore): def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: for attempt in self._retry(): with attempt, self.client.start_session() as session: - # Insert the job result - document = result.marshal(self.serializer) - document["_id"] = document.pop("job_id") - self._jobs_results.insert_one(document, session=session) + # Record the job result + if result.expires_at > result.finished_at: + document = result.marshal(self.serializer) + document["_id"] = document.pop("job_id") + self._jobs_results.insert_one(document, session=session) # Decrement the running jobs counter self._tasks.find_one_and_update( diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 01e31bb..9c7c905 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -37,7 +37,6 @@ from .._events import ( JobAcquired, JobAdded, JobDeserializationFailed, - JobReleased, ScheduleAdded, ScheduleDeserializationFailed, ScheduleRemoved, @@ -178,6 +177,7 @@ class _BaseSQLAlchemyDataStore: Column("scheduled_fire_time", timestamp_type), Column("jitter", interval_type), Column("start_deadline", timestamp_type), + Column("result_expiration_time", interval_type), Column("tags", tags_type, nullable=False), Column("created_at", timestamp_type, nullable=False), Column("started_at", timestamp_type), @@ -190,6 +190,7 @@ class _BaseSQLAlchemyDataStore: Column("job_id", job_id_type, primary_key=True), Column("outcome", Enum(JobOutcome), nullable=False), Column("finished_at", timestamp_type, index=True), + Column("expires_at", timestamp_type, nullable=False, index=True), Column("exception", LargeBinary), Column("return_value", LargeBinary), ) @@ -672,9 +673,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore): for attempt in self._retry(): with attempt, self.engine.begin() as conn: # Insert the job result - marshalled = result.marshal(self.serializer) - insert = self.t_job_results.insert().values(**marshalled) - conn.execute(insert) + if result.expires_at > result.finished_at: + marshalled = result.marshal(self.serializer) + insert = self.t_job_results.insert().values(**marshalled) + conn.execute(insert) # Decrement the running jobs counter update = ( @@ -688,13 +690,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore): delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id) conn.execute(delete) - # Publish the event - self._events.publish( - JobReleased( - job_id=result.job_id, worker_id=worker_id, outcome=result.outcome - ) - ) - def get_job_result(self, job_id: UUID) -> JobResult | None: for attempt in self._retry(): with attempt, self.engine.begin() as conn: diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 6c60bdd..48afe88 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -202,6 +202,7 @@ class AsyncScheduler: args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None, + result_expiration_time: timedelta | float = 0, ) -> UUID: """ Add a job to the data store. @@ -209,7 +210,10 @@ class AsyncScheduler: :param func_or_task_id: :param args: positional arguments to call the target callable with :param kwargs: keyword arguments to call the target callable with - :param tags: + :param tags: strings that can be used to categorize and filter the job + :param result_expiration_time: the minimum time (as seconds, or timedelta) to + keep the result of the job available for fetching (the result won't be + saved at all if that time is 0) :return: the ID of the newly created job """ @@ -224,6 +228,7 @@ class AsyncScheduler: args=args or (), kwargs=kwargs or {}, tags=tags or frozenset(), + result_expiration_time=result_expiration_time, ) await self.data_store.add_job(job) return job.id @@ -235,7 +240,8 @@ class AsyncScheduler: :param job_id: the ID of the job :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to raise an exception if the result is not yet available - :raises JobLookupError: if the job does not exist in the data store + :raises JobLookupError: if ``wait=False`` and the job result does not exist in + the data store """ wait_event = anyio.Event() @@ -253,9 +259,7 @@ class AsyncScheduler: await wait_event.wait() - result = await self.data_store.get_job_result(job_id) - assert isinstance(result, JobResult) - return result + return await self.data_store.get_job_result(job_id) async def run_job( self, @@ -287,7 +291,11 @@ class AsyncScheduler: job_id: UUID | None = None with self.data_store.events.subscribe(listener, {JobReleased}): job_id = await self.add_job( - func_or_task_id, args=args, kwargs=kwargs, tags=tags + func_or_task_id, + args=args, + kwargs=kwargs, + tags=tags, + result_expiration_time=timedelta(minutes=15), ) await job_complete_event.wait() diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index aea7047..7ed6d3f 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -234,6 +234,7 @@ class Scheduler: args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, tags: Iterable[str] | None = None, + result_expiration_time: timedelta | float = 0, ) -> UUID: """ Add a job to the data store. @@ -243,6 +244,9 @@ class Scheduler: :param args: positional arguments to be passed to the task function :param kwargs: keyword arguments to be passed to the task function :param tags: strings that can be used to categorize and filter the job + :param result_expiration_time: the minimum time (as seconds, or timedelta) to + keep the result of the job available for fetching (the result won't be + saved at all if that time is 0) :return: the ID of the newly created job """ @@ -258,6 +262,7 @@ class Scheduler: args=args or (), kwargs=kwargs or {}, tags=tags or frozenset(), + result_expiration_time=result_expiration_time, ) self.data_store.add_job(job) return job.id @@ -269,7 +274,8 @@ class Scheduler: :param job_id: the ID of the job :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to raise an exception if the result is not yet available - :raises JobLookupError: if the job does not exist in the data store + :raises JobLookupError: if ``wait=False`` and the job result does not exist in + the data store """ self._ensure_services_ready() @@ -288,9 +294,7 @@ class Scheduler: wait_event.wait() - result = self.data_store.get_job_result(job_id) - assert isinstance(result, JobResult) - return result + return self.data_store.get_job_result(job_id) def run_job( self, @@ -322,7 +326,13 @@ class Scheduler: job_id: UUID | None = None with self.data_store.events.subscribe(listener, {JobReleased}): - job_id = self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags) + job_id = self.add_job( + func_or_task_id, + args=args, + kwargs=kwargs, + tags=tags, + result_expiration_time=timedelta(minutes=15), + ) job_complete_event.wait() result = self.get_job_result(job_id) diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index a44c89c..b04eea1 100644 --- a/src/apscheduler/workers/async_.py +++ b/src/apscheduler/workers/async_.py @@ -23,7 +23,7 @@ from anyio.abc import CancelScope, TaskGroup from .._context import current_job, current_worker from .._converters import as_async_datastore, as_async_eventbroker from .._enums import JobOutcome, RunState -from .._events import JobAdded, WorkerStarted, WorkerStopped +from .._events import JobAdded, JobReleased, WorkerStarted, WorkerStopped from .._structures import Job, JobInfo, JobResult from .._validators import positive_integer from ..abc import AsyncDataStore, AsyncEventBroker @@ -172,10 +172,17 @@ class AsyncWorker: # Check if the job started before the deadline start_time = datetime.now(timezone.utc) if job.start_deadline is not None and start_time > job.start_deadline: - result = JobResult( - job_id=job.id, outcome=JobOutcome.missed_start_deadline + result = JobResult.from_job( + job, + outcome=JobOutcome.missed_start_deadline, + finished_at=start_time, ) await self.data_store.release_job(self.identity, job.task_id, result) + await self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) + ) return token = current_job.set(JobInfo.from_job(job)) @@ -184,23 +191,60 @@ class AsyncWorker: if isawaitable(retval): retval = await retval except get_cancelled_exc_class(): + self.logger.info("Job %s was cancelled", job.id) with CancelScope(shield=True): - result = JobResult(job_id=job.id, outcome=JobOutcome.cancelled) + result = JobResult.from_job( + job, + outcome=JobOutcome.cancelled, + ) await self.data_store.release_job( self.identity, job.task_id, result ) + await self.event_broker.publish( + JobReleased( + job_id=job.id, + worker_id=self.identity, + outcome=result.outcome, + ) + ) except BaseException as exc: - result = JobResult( - job_id=job.id, outcome=JobOutcome.error, exception=exc + if isinstance(exc, Exception): + self.logger.exception("Job %s raised an exception", job.id) + else: + self.logger.error( + "Job %s was aborted due to %s", job.id, exc.__class__.__name__ + ) + + result = JobResult.from_job( + job, + JobOutcome.error, + exception=exc, + ) + await self.data_store.release_job( + self.identity, + job.task_id, + result, + ) + await self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) ) - await self.data_store.release_job(self.identity, job.task_id, result) if not isinstance(exc, Exception): raise else: - result = JobResult( - job_id=job.id, outcome=JobOutcome.success, return_value=retval + self.logger.info("Job %s completed successfully", job.id) + result = JobResult.from_job( + job, + JobOutcome.success, + return_value=retval, ) await self.data_store.release_job(self.identity, job.task_id, result) + await self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) + ) finally: current_job.reset(token) finally: diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index 1d8f85a..8c627c6 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -15,6 +15,7 @@ from uuid import UUID import attrs +from .. import JobReleased from .._context import current_job, current_worker from .._enums import JobOutcome, RunState from .._events import JobAdded, WorkerStarted, WorkerStopped @@ -200,8 +201,13 @@ class Worker: # Check if the job started before the deadline start_time = datetime.now(timezone.utc) if job.start_deadline is not None and start_time > job.start_deadline: - result = JobResult( - job_id=job.id, outcome=JobOutcome.missed_start_deadline + result = JobResult.from_job( + job, JobOutcome.missed_start_deadline, finished_at=start_time + ) + self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) ) self.data_store.release_job(self.identity, job.task_id, result) return @@ -210,17 +216,43 @@ class Worker: try: retval = func(*job.args, **job.kwargs) except BaseException as exc: - result = JobResult( - job_id=job.id, outcome=JobOutcome.error, exception=exc + if isinstance(exc, Exception): + self.logger.exception("Job %s raised an exception", job.id) + else: + self.logger.error( + "Job %s was aborted due to %s", job.id, exc.__class__.__name__ + ) + + result = JobResult.from_job( + job, + JobOutcome.error, + exception=exc, + ) + self.data_store.release_job( + self.identity, + job.task_id, + result, + ) + self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) ) - self.data_store.release_job(self.identity, job.task_id, result) if not isinstance(exc, Exception): raise else: - result = JobResult( - job_id=job.id, outcome=JobOutcome.success, return_value=retval + self.logger.info("Job %s completed successfully", job.id) + result = JobResult.from_job( + job, + JobOutcome.success, + return_value=retval, ) self.data_store.release_job(self.identity, job.task_id, result) + self.event_broker.publish( + JobReleased( + job_id=job.id, worker_id=self.identity, outcome=result.outcome + ) + ) finally: current_job.reset(token) finally: |