summary refs log tree commit diff
path: root/src/apscheduler/datastores
diff options
context:
space:
mode:
author    Alex Grönholm <alex.gronholm@nextday.fi>  2021-09-13 01:07:08 +0300
committer Alex Grönholm <alex.gronholm@nextday.fi>  2021-09-13 01:07:44 +0300
commit    8b68b6c5d1c63faae1ba3769b6475b396328e3a3 (patch)
tree      2a172cf3bed738b4ba969cf08c86f7a8e91150f8 /src/apscheduler/datastores
parent    200c5713193c011e5b230c3c20afe31f261c8291 (diff)
download  apscheduler-8b68b6c5d1c63faae1ba3769b6475b396328e3a3.tar.gz
Added scheduler methods for creating jobs directly w/o schedules
Diffstat (limited to 'src/apscheduler/datastores')
-rw-r--r--src/apscheduler/datastores/async_sqlalchemy.py6
-rw-r--r--src/apscheduler/datastores/memory.py12
-rw-r--r--src/apscheduler/datastores/mongodb.py13
-rw-r--r--src/apscheduler/datastores/sqlalchemy.py14
4 files changed, 39 insertions, 6 deletions
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index b15b154..92ba02b 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -19,7 +19,7 @@ from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule
from ..enums import ConflictPolicy
from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- DataStoreEvent, JobAdded, JobDeserializationFailed, ScheduleAdded,
+ DataStoreEvent, JobAcquired, JobAdded, JobDeserializationFailed, ScheduleAdded,
ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved,
TaskUpdated)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
@@ -368,6 +368,10 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
where(self.t_tasks.c.id == p_id)
await conn.execute(update, params)
+ # Publish the appropriate events
+ for job in acquired_jobs:
+ await self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id))
+
return acquired_jobs
async def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py
index 98306a2..96514ca 100644
--- a/src/apscheduler/datastores/memory.py
+++ b/src/apscheduler/datastores/memory.py
@@ -13,7 +13,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule
from ..enums import ConflictPolicy
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated)
+ JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded,
+ TaskRemoved, TaskUpdated)
from ..exceptions import ConflictingIdError, TaskLookupError
from ..structures import JobResult, Task
from ..util import reentrant
@@ -261,6 +262,10 @@ class MemoryDataStore(DataStore):
if len(jobs) == limit:
break
+ # Publish the appropriate events
+ for job in jobs:
+ self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id))
+
return jobs
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
@@ -278,5 +283,10 @@ class MemoryDataStore(DataStore):
# Record the result
self._job_results[result.job_id] = result
+ # Publish the event
+ self._events.publish(
+ JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome)
+ )
+
def get_job_result(self, job_id: UUID) -> Optional[JobResult]:
return self._job_results.pop(job_id, None)
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index e5b8283..5066c56 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -18,8 +18,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
from ..enums import ConflictPolicy
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded,
- TaskRemoved, TaskUpdated)
+ DataStoreEvent, JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved,
+ ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated)
from ..exceptions import (
ConflictingIdError, DeserializationError, SerializationError, TaskLookupError)
from ..serializers.pickle import PickleSerializer
@@ -336,6 +336,10 @@ class MongoDBDataStore(DataStore):
session=session
)
+ # Publish the appropriate events
+ for job in acquired_jobs:
+ self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id))
+
return acquired_jobs
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
@@ -355,6 +359,11 @@ class MongoDBDataStore(DataStore):
# Delete the job
self._jobs.delete_one({'_id': result.job_id}, session=session)
+ # Publish the event
+ self._events.publish(
+ JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome)
+ )
+
def get_job_result(self, job_id: UUID) -> Optional[JobResult]:
document = self._jobs_results.find_one_and_delete({'_id': job_id})
if document:
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 8dea821..31e60cc 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -20,8 +20,9 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- Event, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed,
- ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated)
+ Event, JobAcquired, JobAdded, JobDeserializationFailed, JobReleased, ScheduleAdded,
+ ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved,
+ TaskUpdated)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..serializers.pickle import PickleSerializer
@@ -485,6 +486,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
where(self.t_tasks.c.id == p_id)
conn.execute(update, params)
+ # Publish the appropriate events
+ for job in acquired_jobs:
+ self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id))
+
return acquired_jobs
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
@@ -504,6 +509,11 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id)
conn.execute(delete)
+ # Publish the event
+ self._events.publish(
+ JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome)
+ )
+
def get_job_result(self, job_id: UUID) -> Optional[JobResult]:
with self.engine.begin() as conn:
# Retrieve the result