diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-23 00:29:05 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-24 00:05:30 +0300 |
commit | 642d0ee75d7bd5b475cdbd86efcefec7d026dbb3 (patch) | |
tree | 246936c5aa962734d4a7bcd18437389c883256dd /src/apscheduler | |
parent | 631c78e7161cfe8457bf1121eb355e3a3d19c35d (diff) | |
download | apscheduler-642d0ee75d7bd5b475cdbd86efcefec7d026dbb3.tar.gz |
Fixed MongoDB data store retrying synchronously
Diffstat (limited to 'src/apscheduler')
-rw-r--r-- | src/apscheduler/datastores/mongodb.py | 32 |
1 files changed, 16 insertions, 16 deletions
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 4d47878..0846e14 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -132,7 +132,7 @@ class MongoDBDataStore(BaseExternalDataStore): self._jobs_results.create_index("expires_at", session=session) async def add_task(self, task: Task) -> None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: previous = self._tasks.find_one_and_update( {"_id": task.id}, @@ -149,7 +149,7 @@ class MongoDBDataStore(BaseExternalDataStore): await self._event_broker.publish(TaskAdded(task_id=task.id)) async def remove_task(self, task_id: str) -> None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: if not self._tasks.find_one_and_delete({"_id": task_id}): raise TaskLookupError(task_id) @@ -157,7 +157,7 @@ class MongoDBDataStore(BaseExternalDataStore): await self._event_broker.publish(TaskRemoved(task_id=task_id)) async def get_task(self, task_id: str) -> Task: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: document = self._tasks.find_one( {"_id": task_id}, projection=self._task_attrs @@ -171,7 +171,7 @@ class MongoDBDataStore(BaseExternalDataStore): return task async def get_tasks(self) -> list[Task]: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: tasks: list[Task] = [] for document in self._tasks.find( @@ -184,7 +184,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]: filters = {"_id": {"$in": list(ids)}} if ids is not None else {} - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: schedules: list[Schedule] = [] cursor = self._schedules.find(filters).sort("_id") @@ -209,14 +209,14 @@ class MongoDBDataStore(BaseExternalDataStore): document = schedule.marshal(self.serializer) document["_id"] = document.pop("id") try: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: self._schedules.insert_one(document) except DuplicateKeyError: if conflict_policy is ConflictPolicy.exception: raise ConflictingIdError(schedule.id) from None elif conflict_policy is ConflictPolicy.replace: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: self._schedules.replace_one( {"_id": schedule.id}, document, True @@ -234,7 +234,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def remove_schedules(self, ids: Iterable[str]) -> None: filters = {"_id": {"$in": list(ids)}} if ids is not None else {} - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: cursor = self._schedules.find( filters, projection=["_id"], session=session @@ -247,7 +247,7 @@ class MongoDBDataStore(BaseExternalDataStore): await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id)) async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: schedules: list[Schedule] = [] cursor = ( @@ -324,7 +324,7 @@ class MongoDBDataStore(BaseExternalDataStore): finished_schedule_ids.append(schedule.id) if requests: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: self._schedules.bulk_write( requests, ordered=False, session=session @@ -340,7 +340,7 @@ class MongoDBDataStore(BaseExternalDataStore): await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id)) async def get_next_schedule_run_time(self) -> datetime | None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: document = self._schedules.find_one( {"next_run_time": {"$ne": None}}, @@ -356,7 +356,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def add_job(self, job: Job) -> None: document = job.marshal(self.serializer) document["_id"] = document.pop("id") - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: self._jobs.insert_one(document) @@ -370,7 +370,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]: filters = {"_id": {"$in": list(ids)}} if ids is not None else {} - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: jobs: list[Job] = [] cursor = self._jobs.find(filters).sort("_id") @@ -389,7 +389,7 @@ class MongoDBDataStore(BaseExternalDataStore): return jobs async def acquire_jobs(self, worker_id: str, limit: int | None = None) -> list[Job]: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: cursor = self._jobs.find( { @@ -466,7 +466,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def release_job( self, worker_id: str, task_id: str, result: JobResult ) -> None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: # Record the job result if result.expires_at > result.finished_at: @@ -483,7 +483,7 @@ class MongoDBDataStore(BaseExternalDataStore): self._jobs.delete_one({"_id": result.job_id}, session=session) async def get_job_result(self, job_id: UUID) -> JobResult | None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: document = self._jobs_results.find_one_and_delete({"_id": job_id}) |