From 642d0ee75d7bd5b475cdbd86efcefec7d026dbb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Fri, 23 Sep 2022 00:29:05 +0300 Subject: Fixed MongoDB data store retrying synchronously --- src/apscheduler/datastores/mongodb.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 4d47878..0846e14 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -132,7 +132,7 @@ class MongoDBDataStore(BaseExternalDataStore): self._jobs_results.create_index("expires_at", session=session) async def add_task(self, task: Task) -> None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: previous = self._tasks.find_one_and_update( {"_id": task.id}, @@ -149,7 +149,7 @@ class MongoDBDataStore(BaseExternalDataStore): await self._event_broker.publish(TaskAdded(task_id=task.id)) async def remove_task(self, task_id: str) -> None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: if not self._tasks.find_one_and_delete({"_id": task_id}): raise TaskLookupError(task_id) @@ -157,7 +157,7 @@ class MongoDBDataStore(BaseExternalDataStore): await self._event_broker.publish(TaskRemoved(task_id=task_id)) async def get_task(self, task_id: str) -> Task: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: document = self._tasks.find_one( {"_id": task_id}, projection=self._task_attrs @@ -171,7 +171,7 @@ class MongoDBDataStore(BaseExternalDataStore): return task async def get_tasks(self) -> list[Task]: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: tasks: list[Task] = [] for document in self._tasks.find( @@ -184,7 +184,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def get_schedules(self, ids: set[str] | None = None) -> list[Schedule]: filters = {"_id": {"$in": list(ids)}} if ids is not None else {} - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: schedules: list[Schedule] = [] cursor = self._schedules.find(filters).sort("_id") @@ -209,14 +209,14 @@ class MongoDBDataStore(BaseExternalDataStore): document = schedule.marshal(self.serializer) document["_id"] = document.pop("id") try: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: self._schedules.insert_one(document) except DuplicateKeyError: if conflict_policy is ConflictPolicy.exception: raise ConflictingIdError(schedule.id) from None elif conflict_policy is ConflictPolicy.replace: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: self._schedules.replace_one( {"_id": schedule.id}, document, True @@ -234,7 +234,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def remove_schedules(self, ids: Iterable[str]) -> None: filters = {"_id": {"$in": list(ids)}} if ids is not None else {} - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: cursor = self._schedules.find( filters, projection=["_id"], session=session @@ -247,7 +247,7 @@ class MongoDBDataStore(BaseExternalDataStore): await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id)) async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: schedules: list[Schedule] = [] cursor = ( @@ -324,7 +324,7 @@ class MongoDBDataStore(BaseExternalDataStore): finished_schedule_ids.append(schedule.id) if requests: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: self._schedules.bulk_write( requests, ordered=False, session=session @@ -340,7 +340,7 @@ class MongoDBDataStore(BaseExternalDataStore): await self._event_broker.publish(ScheduleRemoved(schedule_id=schedule_id)) async def get_next_schedule_run_time(self) -> datetime | None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: document = self._schedules.find_one( {"next_run_time": {"$ne": None}}, @@ -356,7 +356,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def add_job(self, job: Job) -> None: document = job.marshal(self.serializer) document["_id"] = document.pop("id") - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: self._jobs.insert_one(document) @@ -370,7 +370,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def get_jobs(self, ids: Iterable[UUID] | None = None) -> list[Job]: filters = {"_id": {"$in": list(ids)}} if ids is not None else {} - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: jobs: list[Job] = [] cursor = self._jobs.find(filters).sort("_id") @@ -389,7 +389,7 @@ class MongoDBDataStore(BaseExternalDataStore): return jobs async def acquire_jobs(self, worker_id: str, limit: int | None = None) -> list[Job]: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: cursor = self._jobs.find( { @@ -466,7 +466,7 @@ class MongoDBDataStore(BaseExternalDataStore): async def release_job( self, worker_id: str, task_id: str, result: JobResult ) -> None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt, self.client.start_session() as session: # Record the job result if result.expires_at > result.finished_at: @@ -483,7 +483,7 @@ class MongoDBDataStore(BaseExternalDataStore): self._jobs.delete_one({"_id": result.job_id}, session=session) async def get_job_result(self, job_id: UUID) -> JobResult | None: - for attempt in self._retry(): + async for attempt in self._retry(): with attempt: document = self._jobs_results.find_one_and_delete({"_id": job_id}) -- cgit v1.2.1