Diffstat (limited to 'src/apscheduler/datastores/memory.py'):
 src/apscheduler/datastores/memory.py | 62 ++++++++++++++++++----------------
 1 file changed, 33 insertions(+), 29 deletions(-)
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py
index a9ff3cb..fd7e90e 100644
--- a/src/apscheduler/datastores/memory.py
+++ b/src/apscheduler/datastores/memory.py
@@ -85,13 +85,9 @@ class MemoryDataStore(BaseDataStore):
"""
Stores scheduler data in memory, without serializing it.
- Can be shared between multiple schedulers and workers within the same event loop.
-
- :param lock_expiration_delay: maximum amount of time (in seconds) that a scheduler
- or worker can keep a lock on a schedule or task
+ Can be shared between multiple schedulers within the same event loop.
"""
- lock_expiration_delay: float = 30
_tasks: dict[str, TaskState] = attrs.Factory(dict)
_schedules: list[ScheduleState] = attrs.Factory(list)
_schedules_by_id: dict[str, ScheduleState] = attrs.Factory(dict)
@@ -115,41 +111,43 @@ class MemoryDataStore(BaseDataStore):
right_index = bisect_left(self._jobs, state)
return self._jobs.index(state, left_index, right_index + 1)
- def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
+ async def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]:
return [
state.schedule
for state in self._schedules
if ids is None or state.schedule.id in ids
]
- def add_task(self, task: Task) -> None:
+ async def add_task(self, task: Task) -> None:
task_exists = task.id in self._tasks
self._tasks[task.id] = TaskState(task)
if task_exists:
- self._events.publish(TaskUpdated(task_id=task.id))
+ await self._event_broker.publish(TaskUpdated(task_id=task.id))
else:
- self._events.publish(TaskAdded(task_id=task.id))
+ await self._event_broker.publish(TaskAdded(task_id=task.id))
- def remove_task(self, task_id: str) -> None:
+ async def remove_task(self, task_id: str) -> None:
try:
del self._tasks[task_id]
except KeyError:
raise TaskLookupError(task_id) from None
- self._events.publish(TaskRemoved(task_id=task_id))
+ await self._event_broker.publish(TaskRemoved(task_id=task_id))
- def get_task(self, task_id: str) -> Task:
+ async def get_task(self, task_id: str) -> Task:
try:
return self._tasks[task_id].task
except KeyError:
raise TaskLookupError(task_id) from None
- def get_tasks(self) -> list[Task]:
+ async def get_tasks(self) -> list[Task]:
return sorted(
(state.task for state in self._tasks.values()), key=lambda task: task.id
)
- def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
+ async def add_schedule(
+ self, schedule: Schedule, conflict_policy: ConflictPolicy
+ ) -> None:
old_state = self._schedules_by_id.get(schedule.id)
if old_state is not None:
if conflict_policy is ConflictPolicy.do_nothing:
@@ -175,17 +173,17 @@ class MemoryDataStore(BaseDataStore):
schedule_id=schedule.id, next_fire_time=schedule.next_fire_time
)
- self._events.publish(event)
+ await self._event_broker.publish(event)
- def remove_schedules(self, ids: Iterable[str]) -> None:
+ async def remove_schedules(self, ids: Iterable[str]) -> None:
for schedule_id in ids:
state = self._schedules_by_id.pop(schedule_id, None)
if state:
self._schedules.remove(state)
event = ScheduleRemoved(schedule_id=state.schedule.id)
- self._events.publish(event)
+ await self._event_broker.publish(event)
- def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
+ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
now = datetime.now(timezone.utc)
schedules: list[Schedule] = []
for state in self._schedules:
@@ -206,7 +204,9 @@ class MemoryDataStore(BaseDataStore):
return schedules
- def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None:
+ async def release_schedules(
+ self, scheduler_id: str, schedules: list[Schedule]
+ ) -> None:
# Send update events for schedules that have a next time
finished_schedule_ids: list[str] = []
for s in schedules:
@@ -224,17 +224,17 @@ class MemoryDataStore(BaseDataStore):
event = ScheduleUpdated(
schedule_id=s.id, next_fire_time=s.next_fire_time
)
- self._events.publish(event)
+ await self._event_broker.publish(event)
else:
finished_schedule_ids.append(s.id)
# Remove schedules that didn't get a new next fire time
- self.remove_schedules(finished_schedule_ids)
+ await self.remove_schedules(finished_schedule_ids)
- def get_next_schedule_run_time(self) -> datetime | None:
+ async def get_next_schedule_run_time(self) -> datetime | None:
return self._schedules[0].next_fire_time if self._schedules else None
- def add_job(self, job: Job) -> None:
+ async def add_job(self, job: Job) -> None:
state = JobState(job)
self._jobs.append(state)
self._jobs_by_id[job.id] = state
@@ -246,15 +246,15 @@ class MemoryDataStore(BaseDataStore):
schedule_id=job.schedule_id,
tags=job.tags,
)
- self._events.publish(event)
+ await self._event_broker.publish(event)
- def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]:
+ async def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]:
if ids is not None:
ids = frozenset(ids)
return [state.job for state in self._jobs if ids is None or state.job.id in ids]
- def acquire_jobs(self, worker_id: str, limit: int | None = None) -> list[Job]:
+ async def acquire_jobs(self, worker_id: str, limit: int | None = None) -> list[Job]:
now = datetime.now(timezone.utc)
jobs: list[Job] = []
for _index, job_state in enumerate(self._jobs):
@@ -290,11 +290,15 @@ class MemoryDataStore(BaseDataStore):
# Publish the appropriate events
for job in jobs:
- self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id))
+ await self._event_broker.publish(
+ JobAcquired(job_id=job.id, worker_id=worker_id)
+ )
return jobs
- def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
+ async def release_job(
+ self, worker_id: str, task_id: str, result: JobResult
+ ) -> None:
# Record the job result
if result.expires_at > result.finished_at:
self._job_results[result.job_id] = result
@@ -310,5 +314,5 @@ class MemoryDataStore(BaseDataStore):
index = self._find_job_index(job_state)
del self._jobs[index]
- def get_job_result(self, job_id: UUID) -> JobResult | None:
+ async def get_job_result(self, job_id: UUID) -> JobResult | None:
return self._job_results.pop(job_id, None)
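
A minimal sketch of the calling convention after this change: every data store method shown above is now a coroutine and must be awaited. This assumes MemoryDataStore can be constructed with no arguments, as the diff suggests (all remaining attributes have attrs factories); the event-broker wiring that backs self._event_broker is normally done by the scheduler and is not exercised here, so only read-only methods are called.

    # Hedged usage sketch, not part of this commit. Assumes a bare
    # MemoryDataStore() is constructible and that only methods which do not
    # publish events are safe to call without an event broker attached.
    import asyncio

    from apscheduler.datastores.memory import MemoryDataStore


    async def main() -> None:
        store = MemoryDataStore()

        # These were plain method calls before this commit; now they are awaited.
        tasks = await store.get_tasks()
        schedules = await store.get_schedules()
        next_run = await store.get_next_schedule_run_time()

        print(tasks)      # [] on a fresh store
        print(schedules)  # []
        print(next_run)   # None


    asyncio.run(main())

Callers that previously used the synchronous API (for example worker code calling store.acquire_jobs(...)) would likewise need to await those calls after this change.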