diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-13 01:07:08 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-13 01:07:44 +0300 |
commit | 8b68b6c5d1c63faae1ba3769b6475b396328e3a3 (patch) | |
tree | 2a172cf3bed738b4ba969cf08c86f7a8e91150f8 /src/apscheduler/datastores | |
parent | 200c5713193c011e5b230c3c20afe31f261c8291 (diff) | |
download | apscheduler-8b68b6c5d1c63faae1ba3769b6475b396328e3a3.tar.gz |
Added scheduler methods for creating jobs directly w/o schedules
Diffstat (limited to 'src/apscheduler/datastores')
-rw-r--r-- | src/apscheduler/datastores/async_sqlalchemy.py | 6 | ||||
-rw-r--r-- | src/apscheduler/datastores/memory.py | 12 | ||||
-rw-r--r-- | src/apscheduler/datastores/mongodb.py | 13 | ||||
-rw-r--r-- | src/apscheduler/datastores/sqlalchemy.py | 14 |
4 files changed, 39 insertions, 6 deletions
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py index b15b154..92ba02b 100644 --- a/src/apscheduler/datastores/async_sqlalchemy.py +++ b/src/apscheduler/datastores/async_sqlalchemy.py @@ -19,7 +19,7 @@ from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule from ..enums import ConflictPolicy from ..eventbrokers.async_local import LocalAsyncEventBroker from ..events import ( - DataStoreEvent, JobAdded, JobDeserializationFailed, ScheduleAdded, + DataStoreEvent, JobAcquired, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError @@ -368,6 +368,10 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore): where(self.t_tasks.c.id == p_id) await conn.execute(update, params) + # Publish the appropriate events + for job in acquired_jobs: + await self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id)) + return acquired_jobs async def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py index 98306a2..96514ca 100644 --- a/src/apscheduler/datastores/memory.py +++ b/src/apscheduler/datastores/memory.py @@ -13,7 +13,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule from ..enums import ConflictPolicy from ..eventbrokers.local import LocalEventBroker from ..events import ( - JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) + JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, + TaskRemoved, TaskUpdated) from ..exceptions import ConflictingIdError, TaskLookupError from ..structures import JobResult, Task from ..util import reentrant @@ -261,6 +262,10 @@ class MemoryDataStore(DataStore): if len(jobs) == limit: break + # Publish the appropriate events + for job in jobs: + self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id)) + return jobs def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: @@ -278,5 +283,10 @@ class MemoryDataStore(DataStore): # Record the result self._job_results[result.job_id] = result + # Publish the event + self._events.publish( + JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome) + ) + def get_job_result(self, job_id: UUID) -> Optional[JobResult]: return self._job_results.pop(job_id, None) diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index e5b8283..5066c56 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -18,8 +18,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer from ..enums import ConflictPolicy from ..eventbrokers.local import LocalEventBroker from ..events import ( - DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, - TaskRemoved, TaskUpdated) + DataStoreEvent, JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved, + ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) from ..exceptions import ( ConflictingIdError, DeserializationError, SerializationError, TaskLookupError) from ..serializers.pickle import PickleSerializer @@ -336,6 +336,10 @@ class MongoDBDataStore(DataStore): session=session ) + # Publish the appropriate events + for job in acquired_jobs: + self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id)) + return acquired_jobs def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: @@ -355,6 +359,11 @@ class MongoDBDataStore(DataStore): # Delete the job self._jobs.delete_one({'_id': result.job_id}, session=session) + # Publish the event + self._events.publish( + JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome) + ) + def get_job_result(self, job_id: UUID) -> Optional[JobResult]: document = self._jobs_results.find_one_and_delete({'_id': job_id}) if document: diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 8dea821..31e60cc 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -20,8 +20,9 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome from ..eventbrokers.local import LocalEventBroker from ..events import ( - Event, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed, - ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) + Event, JobAcquired, JobAdded, JobDeserializationFailed, JobReleased, ScheduleAdded, + ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, + TaskUpdated) from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError from ..marshalling import callable_to_ref from ..serializers.pickle import PickleSerializer @@ -485,6 +486,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore): where(self.t_tasks.c.id == p_id) conn.execute(update, params) + # Publish the appropriate events + for job in acquired_jobs: + self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id)) + return acquired_jobs def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: @@ -504,6 +509,11 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore): delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id) conn.execute(delete) + # Publish the event + self._events.publish( + JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome) + ) + def get_job_result(self, job_id: UUID) -> Optional[JobResult]: with self.engine.begin() as conn: # Retrieve the result |