From 7c7faaa69f9b84945f9eb02ce09af65c3b9d6a15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 24 Sep 2022 00:05:17 +0300 Subject: Fixed MongoDB data store running blocking operations on the event loop thread --- src/apscheduler/datastores/mongodb.py | 59 +++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 0846e14..1846353 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -9,6 +9,7 @@ from uuid import UUID import attrs import pymongo +from anyio import to_thread from attrs.validators import instance_of from bson import CodecOptions, UuidRepresentation from bson.codec_options import TypeEncoder, TypeRegistry @@ -105,11 +106,26 @@ class MongoDBDataStore(BaseExternalDataStore): client = MongoClient(uri) return cls(client, **options) + def _initialize(self) -> None: + with self.client.start_session() as session: + if self.start_from_scratch: + self._tasks.delete_many({}, session=session) + self._schedules.delete_many({}, session=session) + self._jobs.delete_many({}, session=session) + self._jobs_results.delete_many({}, session=session) + + self._schedules.create_index("next_fire_time", session=session) + self._jobs.create_index("task_id", session=session) + self._jobs.create_index("created_at", session=session) + self._jobs.create_index("tags", session=session) + self._jobs_results.create_index("finished_at", session=session) + self._jobs_results.create_index("expires_at", session=session) + async def start( self, exit_stack: AsyncExitStack, event_broker: EventBroker ) -> None: await super().start(exit_stack, event_broker) - server_info = self.client.server_info() + server_info = await to_thread.run_sync(self.client.server_info) if server_info["versionArray"] < [4, 0]: raise RuntimeError( f"MongoDB server must be at least v4.0; current version = " @@ -117,30 +133,21 @@ class MongoDBDataStore(BaseExternalDataStore): ) async for attempt in self._retry(): - with attempt, self.client.start_session() as session: - if self.start_from_scratch: - self._tasks.delete_many({}, session=session) - self._schedules.delete_many({}, session=session) - self._jobs.delete_many({}, session=session) - self._jobs_results.delete_many({}, session=session) - - self._schedules.create_index("next_fire_time", session=session) - self._jobs.create_index("task_id", session=session) - self._jobs.create_index("created_at", session=session) - self._jobs.create_index("tags", session=session) - self._jobs_results.create_index("finished_at", session=session) - self._jobs_results.create_index("expires_at", session=session) + with attempt: + await to_thread.run_sync(self._initialize) async def add_task(self, task: Task) -> None: async for attempt in self._retry(): with attempt: - previous = self._tasks.find_one_and_update( - {"_id": task.id}, - { - "$set": task.marshal(self.serializer), - "$setOnInsert": {"running_jobs": 0}, - }, - upsert=True, + previous = await to_thread.run_sync( + lambda: self._tasks.find_one_and_update( + {"_id": task.id}, + { + "$set": task.marshal(self.serializer), + "$setOnInsert": {"running_jobs": 0}, + }, + upsert=True, + ) ) if previous: @@ -151,7 +158,9 @@ class MongoDBDataStore(BaseExternalDataStore): async def remove_task(self, task_id: str) -> None: async for attempt in self._retry(): with attempt: - if not self._tasks.find_one_and_delete({"_id": task_id}): + if not await to_thread.run_sync( + self._tasks.find_one_and_delete, {"_id": task_id} + ): raise TaskLookupError(task_id) await self._event_broker.publish(TaskRemoved(task_id=task_id)) @@ -159,8 +168,10 @@ class MongoDBDataStore(BaseExternalDataStore): async def get_task(self, task_id: str) -> Task: async for attempt in self._retry(): with attempt: - document = self._tasks.find_one( - {"_id": task_id}, projection=self._task_attrs + document = await to_thread.run_sync( + lambda: self._tasks.find_one( + {"_id": task_id}, projection=self._task_attrs + ) ) if not document: -- cgit v1.2.1