diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-12 22:09:05 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-21 02:40:02 +0300 |
commit | c5727432736b55b7d76753307f14efdb962c2edf (patch) | |
tree | 005bd129694b56bd601d65c4cdf43828cfcd4381 /src/apscheduler/datastores/memory.py | |
parent | 26c4db062145fcb4f623ecfda96c42ce2414e8e1 (diff) | |
download | apscheduler-c5727432736b55b7d76753307f14efdb962c2edf.tar.gz |
Major refactoring
- Made SyncScheduler a synchronous wrapper for AsyncScheduler
- Removed workers as a user interface
- Removed synchronous interfaces for data stores and event brokers and refactored existing implementations to use the async interface
- Added the current_async_scheduler contextvar
- Added job executors
Diffstat (limited to 'src/apscheduler/datastores/memory.py')
-rw-r--r-- | src/apscheduler/datastores/memory.py | 62 |
1 files changed, 33 insertions, 29 deletions
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py index a9ff3cb..fd7e90e 100644 --- a/src/apscheduler/datastores/memory.py +++ b/src/apscheduler/datastores/memory.py @@ -85,13 +85,9 @@ class MemoryDataStore(BaseDataStore): """ Stores scheduler data in memory, without serializing it. - Can be shared between multiple schedulers and workers within the same event loop. - - :param lock_expiration_delay: maximum amount of time (in seconds) that a scheduler - or worker can keep a lock on a schedule or task + Can be shared between multiple schedulers within the same event loop. """ - lock_expiration_delay: float = 30 _tasks: dict[str, TaskState] = attrs.Factory(dict) _schedules: list[ScheduleState] = attrs.Factory(list) _schedules_by_id: dict[str, ScheduleState] = attrs.Factory(dict) @@ -115,41 +111,43 @@ class MemoryDataStore(BaseDataStore): right_index = bisect_left(self._jobs, state) return self._jobs.index(state, left_index, right_index + 1) - def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]: + async def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]: return [ state.schedule for state in self._schedules if ids is None or state.schedule.id in ids ] - def add_task(self, task: Task) -> None: + async def add_task(self, task: Task) -> None: task_exists = task.id in self._tasks self._tasks[task.id] = TaskState(task) if task_exists: - self._events.publish(TaskUpdated(task_id=task.id)) + await self._event_broker.publish(TaskUpdated(task_id=task.id)) else: - self._events.publish(TaskAdded(task_id=task.id)) + await self._event_broker.publish(TaskAdded(task_id=task.id)) - def remove_task(self, task_id: str) -> None: + async def remove_task(self, task_id: str) -> None: try: del self._tasks[task_id] except KeyError: raise TaskLookupError(task_id) from None - self._events.publish(TaskRemoved(task_id=task_id)) + await self._event_broker.publish(TaskRemoved(task_id=task_id)) - def get_task(self, task_id: str) -> Task: + async def get_task(self, task_id: str) -> Task: try: return self._tasks[task_id].task except KeyError: raise TaskLookupError(task_id) from None - def get_tasks(self) -> list[Task]: + async def get_tasks(self) -> list[Task]: return sorted( (state.task for state in self._tasks.values()), key=lambda task: task.id ) - def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None: + async def add_schedule( + self, schedule: Schedule, conflict_policy: ConflictPolicy + ) -> None: old_state = self._schedules_by_id.get(schedule.id) if old_state is not None: if conflict_policy is ConflictPolicy.do_nothing: @@ -175,17 +173,17 @@ class MemoryDataStore(BaseDataStore): schedule_id=schedule.id, next_fire_time=schedule.next_fire_time ) - self._events.publish(event) + await self._event_broker.publish(event) - def remove_schedules(self, ids: Iterable[str]) -> None: + async def remove_schedules(self, ids: Iterable[str]) -> None: for schedule_id in ids: state = self._schedules_by_id.pop(schedule_id, None) if state: self._schedules.remove(state) event = ScheduleRemoved(schedule_id=state.schedule.id) - self._events.publish(event) + await self._event_broker.publish(event) - def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: + async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: now = datetime.now(timezone.utc) schedules: list[Schedule] = [] for state in self._schedules: @@ -206,7 +204,9 @@ class MemoryDataStore(BaseDataStore): return schedules - def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None: + async def release_schedules( + self, scheduler_id: str, schedules: list[Schedule] + ) -> None: # Send update events for schedules that have a next time finished_schedule_ids: list[str] = [] for s in schedules: @@ -224,17 +224,17 @@ class MemoryDataStore(BaseDataStore): event = ScheduleUpdated( schedule_id=s.id, next_fire_time=s.next_fire_time ) - self._events.publish(event) + await self._event_broker.publish(event) else: finished_schedule_ids.append(s.id) # Remove schedules that didn't get a new next fire time - self.remove_schedules(finished_schedule_ids) + await self.remove_schedules(finished_schedule_ids) - def get_next_schedule_run_time(self) -> datetime | None: + async def get_next_schedule_run_time(self) -> datetime | None: return self._schedules[0].next_fire_time if self._schedules else None - def add_job(self, job: Job) -> None: + async def add_job(self, job: Job) -> None: state = JobState(job) self._jobs.append(state) self._jobs_by_id[job.id] = state @@ -246,15 +246,15 @@ class MemoryDataStore(BaseDataStore): schedule_id=job.schedule_id, tags=job.tags, ) - self._events.publish(event) + await self._event_broker.publish(event) - def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]: + async def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]: if ids is not None: ids = frozenset(ids) return [state.job for state in self._jobs if ids is None or state.job.id in ids] - def acquire_jobs(self, worker_id: str, limit: int | None = None) -> list[Job]: + async def acquire_jobs(self, worker_id: str, limit: int | None = None) -> list[Job]: now = datetime.now(timezone.utc) jobs: list[Job] = [] for _index, job_state in enumerate(self._jobs): @@ -290,11 +290,15 @@ class MemoryDataStore(BaseDataStore): # Publish the appropriate events for job in jobs: - self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id)) + await self._event_broker.publish( + JobAcquired(job_id=job.id, worker_id=worker_id) + ) return jobs - def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: + async def release_job( + self, worker_id: str, task_id: str, result: JobResult + ) -> None: # Record the job result if result.expires_at > result.finished_at: self._job_results[result.job_id] = result @@ -310,5 +314,5 @@ class MemoryDataStore(BaseDataStore): index = self._find_job_index(job_state) del self._jobs[index] - def get_job_result(self, job_id: UUID) -> JobResult | None: + async def get_job_result(self, job_id: UUID) -> JobResult | None: return self._job_results.pop(job_id, None) |