summaryrefslogtreecommitdiff
path: root/buildscripts/cost_model
diff options
context:
space:
mode:
authorRuoxin Xu <ruoxin.xu@mongodb.com>2022-08-30 13:36:52 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-30 14:26:01 +0000
commitce2dbe9a4a3baaf2a346fd517ca82937100804e6 (patch)
treeb30d173b61b52f4648c8611645103dd84199a4d6 /buildscripts/cost_model
parentbc38590591e0c5509f73482c2431ca28e8f882e8 (diff)
downloadmongo-ce2dbe9a4a3baaf2a346fd517ca82937100804e6.tar.gz
SERVER-67223 Optimize writing to MongoDB data generator
Diffstat (limited to 'buildscripts/cost_model')
-rw-r--r--buildscripts/cost_model/abt_calibrator.py5
-rw-r--r--buildscripts/cost_model/data_generator.py68
-rw-r--r--buildscripts/cost_model/database_instance.py63
-rw-r--r--buildscripts/cost_model/parameters_extractor.py7
-rw-r--r--buildscripts/cost_model/start.py17
-rw-r--r--buildscripts/cost_model/workload_execution.py29
6 files changed, 103 insertions, 86 deletions
diff --git a/buildscripts/cost_model/abt_calibrator.py b/buildscripts/cost_model/abt_calibrator.py
index 8a32889a231..06e949c0d8b 100644
--- a/buildscripts/cost_model/abt_calibrator.py
+++ b/buildscripts/cost_model/abt_calibrator.py
@@ -37,14 +37,15 @@ from parameters_extractor import extract_parameters
__all__ = ['calibrate']
-def calibrate(config: AbtCalibratorConfig, database: DatabaseInstance, abt_types: Sequence[str]):
+async def calibrate(config: AbtCalibratorConfig, database: DatabaseInstance,
+ abt_types: Sequence[str]):
"""Main entry-point for ABT calibration."""
if not config.enabled:
return {}
result = {}
- stats = extract_parameters(config, database, abt_types)
+ stats = await extract_parameters(config, database, abt_types)
for abt, abt_stats in stats.items():
result[abt] = estimate(abt_stats, config.test_size, config.trace)
return result
diff --git a/buildscripts/cost_model/data_generator.py b/buildscripts/cost_model/data_generator.py
index 5f4601f5fb1..83dc7cb929e 100644
--- a/buildscripts/cost_model/data_generator.py
+++ b/buildscripts/cost_model/data_generator.py
@@ -33,11 +33,12 @@ from importlib.metadata import distribution
import time
import random
from typing import Sequence
+import asyncio
import pymongo
-from pymongo import InsertOne, IndexModel
-from pymongo.collection import Collection
+from pymongo import IndexModel
+from motor.motor_asyncio import AsyncIOMotorCollection
+from motor.motor_asyncio import AsyncIOMotorDatabase
from random_generator import RandomDistribution
-from common import timer_decorator
from config import DataGeneratorConfig, DataType
from database_instance import DatabaseInstance
from random_generator_config import distributions
@@ -81,7 +82,7 @@ class DataGenerator:
self.collection_infos = list(self._generate_collection_infos())
- def populate_collections(self) -> None:
+ async def populate_collections(self) -> None:
"""Create and populate collections for each combination of size and data type in the corresponding 'docCounts' and 'dataTypes' input arrays.
All collections have the same schema defined by one of the elements of 'collFields'.
@@ -90,14 +91,18 @@ class DataGenerator:
if not self.config.enabled:
return
- self.database.enable_cascades(False)
+ await self.database.enable_cascades(False)
t0 = time.time()
+ tasks = []
for coll_info in self.collection_infos:
- coll = self.database.database.get_collection(coll_info.name)
- coll.drop()
- self._populate_collection(coll, coll_info)
- create_single_field_indexes(coll, coll_info.fields)
- create_compound_indexes(coll, coll_info)
+ coll = self.database.database[coll_info.name]
+ await coll.drop()
+ tasks.append(asyncio.create_task(self._populate_collection(coll, coll_info)))
+ tasks.append(asyncio.create_task(create_single_field_indexes(coll, coll_info.fields)))
+ tasks.append(asyncio.create_task(create_compound_indexes(coll, coll_info)))
+
+ for task in tasks:
+ await task
t1 = time.time()
print(f'\npopulate Collections took {t1-t0} s.')
@@ -114,21 +119,27 @@ class DataGenerator:
yield CollectionInfo(name=name, fields=fields, documents_count=doc_count,
compound_indexes=coll_template.compound_indexes)
- @timer_decorator
- def _populate_collection(self, coll: Collection, coll_info: CollectionInfo) -> None:
+ async def _populate_collection(self, coll: AsyncIOMotorCollection,
+ coll_info: CollectionInfo) -> None:
print(f'\nGenerating ${coll_info.name} ...')
batch_size = self.config.batch_size
+ tasks = []
for _ in range(coll_info.documents_count // batch_size):
- populate_batch(coll, batch_size, coll_info.fields)
+ tasks.append(asyncio.create_task(populate_batch(coll, batch_size, coll_info.fields)))
if coll_info.documents_count % batch_size > 0:
- populate_batch(coll, coll_info.documents_count % batch_size, coll_info.fields)
+ tasks.append(
+ asyncio.create_task(
+ populate_batch(coll, coll_info.documents_count % batch_size, coll_info.fields)))
+
+ for task in tasks:
+ await task
-def populate_batch(coll: Collection, documents_count: int, fields: Sequence[FieldInfo]) -> None:
+async def populate_batch(coll: AsyncIOMotorCollection, documents_count: int,
+ fields: Sequence[FieldInfo]) -> None:
"""Generate collection data and write it to the collection."""
- requests = [InsertOne(doc) for doc in generate_collection_data(documents_count, fields)]
- coll.bulk_write(requests, ordered=False)
+ await coll.insert_many(generate_collection_data(documents_count, fields), ordered=False)
def generate_collection_data(documents_count: int, fields: Sequence[FieldInfo]):
@@ -141,30 +152,29 @@ def generate_collection_data(documents_count: int, fields: Sequence[FieldInfo]):
return documents
-def create_single_field_indexes(coll: Collection, fields: Sequence[FieldInfo]) -> None:
+async def create_single_field_indexes(coll: AsyncIOMotorCollection,
+ fields: Sequence[FieldInfo]) -> None:
"""Create single-fields indexes on the given collection."""
- t0 = time.time()
-
indexes = [IndexModel([(field.name, pymongo.ASCENDING)]) for field in fields if field.indexed]
if len(indexes) > 0:
- coll.create_indexes(indexes)
+ await coll.create_indexes(indexes)
- t1 = time.time()
- print(f'createSingleFieldIndexes took {t1 - t0} s.')
+ index_spec = [(field.name, pymongo.ASCENDING) for field in fields]
+ print(f'create_single_field_indexes done. {index_spec}')
-def create_compound_indexes(coll: Collection, coll_info: CollectionInfo) -> None:
- """Create a coumpound indexes on the given collection."""
- t0 = time.time()
+async def create_compound_indexes(coll: AsyncIOMotorCollection, coll_info: CollectionInfo) -> None:
+ """Create a coumpound indexes on the given collection."""
indexes_spec = []
+ index_specs = []
for compound_index in coll_info.compound_indexes:
index_spec = IndexModel([(field, pymongo.ASCENDING) for field in compound_index])
indexes_spec.append(index_spec)
+ index_specs.append([(field, pymongo.ASCENDING) for field in compound_index])
if len(indexes_spec) > 0:
- coll.create_indexes(indexes_spec)
+ await coll.create_indexes(indexes_spec)
- t1 = time.time()
- print(f'createCompoundIndex took {t1 - t0} s.')
+ print(f'createCompoundIndex done. {index_specs}')
diff --git a/buildscripts/cost_model/database_instance.py b/buildscripts/cost_model/database_instance.py
index 7fd271709d0..6d1323f8d8b 100644
--- a/buildscripts/cost_model/database_instance.py
+++ b/buildscripts/cost_model/database_instance.py
@@ -30,7 +30,7 @@
from __future__ import annotations
from typing import Sequence, Mapping, NewType, Any
import subprocess
-from pymongo import MongoClient, InsertOne
+from motor.motor_asyncio import AsyncIOMotorClient
from config import DatabaseConfig, RestoreMode
__all__ = ['DatabaseInstance', 'Pipeline']
@@ -44,13 +44,13 @@ class DatabaseInstance:
def __init__(self, config: DatabaseConfig) -> None:
"""Initialize wrapper."""
self.config = config
- self.client = MongoClient(config.connection_string)
- self.database = self.client.get_database(config.database_name)
+ self.client = AsyncIOMotorClient(config.connection_string)
+ self.database = self.client[config.database_name]
def __enter__(self):
if self.config.restore_from_dump == RestoreMode.ALWAYS or (
self.config.restore_from_dump == RestoreMode.ONLY_NEW
- and self.config.database_name not in self.client.database_names()):
+ and self.config.database_name not in self.client.list_database_names()):
self.restore()
return self
@@ -59,9 +59,9 @@ class DatabaseInstance:
self.enable_cascades(False)
self.dump()
- def drop(self):
+ async def drop(self):
"""Drop the database."""
- self.client.drop_database(self.config.database_name)
+ await self.client.drop_database(self.config.database_name)
def restore(self):
"""Restore the database from the 'self.dump_directory'."""
@@ -73,69 +73,68 @@ class DatabaseInstance:
subprocess.run(['mongodump', '--db', self.config.database_name], cwd=self.config.dump_path,
check=True)
- def enable_sbe(self, state: bool) -> None:
+ async def enable_sbe(self, state: bool) -> None:
"""Enable new query execution engine. Throw pymongo.errors.OperationFailure in case of failure."""
- self.client.admin.command({
+ await self.client.admin.command({
'setParameter': 1,
'internalQueryFrameworkControl': 'trySbeEngine' if state else 'forceClassicEngine'
})
- def enable_cascades(self, state: bool) -> None:
+ async def enable_cascades(self, state: bool) -> None:
"""Enable new query optimizer. Requires featureFlagCommonQueryFramework set to True."""
- self.client.admin.command(
+ await self.client.admin.command(
{'configureFailPoint': 'enableExplainInBonsai', 'mode': 'alwaysOn'})
- self.client.admin.command({
+ await self.client.admin.command({
'setParameter': 1,
'internalQueryFrameworkControl': 'tryBonsai' if state else 'trySbeEngine'
})
- def explain(self, collection_name: str, pipeline: Pipeline) -> dict[str, any]:
+ async def explain(self, collection_name: str, pipeline: Pipeline) -> dict[str, any]:
"""Return explain for the given pipeline."""
- return self.database.command(
+ return await self.database.command(
'explain', {'aggregate': collection_name, 'pipeline': pipeline, 'cursor': {}},
verbosity='executionStats')
- def hide_index(self, collection_name: str, index_name: str) -> None:
+ async def hide_index(self, collection_name: str, index_name: str) -> None:
"""Hide the given index from the query optimizer."""
- self.database.command(
+ await self.database.command(
{'collMod': collection_name, 'index': {'name': index_name, 'hidden': True}})
- def unhide_index(self, collection_name: str, index_name: str) -> None:
+ async def unhide_index(self, collection_name: str, index_name: str) -> None:
"""Make the given index visible for the query optimizer."""
- self.database.command(
+ await self.database.command(
{'collMod': collection_name, 'index': {'name': index_name, 'hidden': False}})
- def hide_all_indexes(self, collection_name: str) -> None:
+ async def hide_all_indexes(self, collection_name: str) -> None:
"""Hide all indexes of the given collection from the query optimizer."""
for index in self.database[collection_name].list_indexes():
if index['name'] != '_id_':
- self.hide_index(collection_name, index['name'])
+ await self.hide_index(collection_name, index['name'])
- def unhide_all_indexes(self, collection_name: str) -> None:
+ async def unhide_all_indexes(self, collection_name: str) -> None:
"""Make all indexes of the given collection visible fpr the query optimizer."""
for index in self.database[collection_name].list_indexes():
if index['name'] != '_id_':
- self.unhide_index(collection_name, index['name'])
+ await self.unhide_index(collection_name, index['name'])
- def drop_collection(self, collection_name: str) -> None:
+ async def drop_collection(self, collection_name: str) -> None:
"""Drop collection."""
- self.database[collection_name].drop()
+ await self.database[collection_name].drop()
- def insert_many(self, collection_name: str, docs: Sequence[Mapping[str, any]]) -> None:
+ async def insert_many(self, collection_name: str, docs: Sequence[Mapping[str, any]]) -> None:
"""Insert documents into the collection with the given name."""
- requests = [InsertOne(doc) for doc in docs]
- self.database[collection_name].bulk_write(requests, ordered=False)
+ await self.database[collection_name].insert_many(docs, ordered=False)
- def get_all_documents(self, collection_name: str):
+ async def get_all_documents(self, collection_name: str):
"""Get all documents from the collection with the given name."""
- return self.database[collection_name].find({})
+ return await self.database[collection_name].find({}).to_list(length=None)
- def get_stats(self, collection_name: str):
+ async def get_stats(self, collection_name: str):
"""Get collection statistics."""
- return self.database.command('collstats', collection_name)
+ return await self.database.command('collstats', collection_name)
- def get_average_document_size(self, collection_name: str) -> float:
+ async def get_average_document_size(self, collection_name: str) -> float:
"""Get average document size for the given collection."""
- stats = self.get_stats(collection_name)
+ stats = await self.get_stats(collection_name)
avg_size = stats.get('avgObjSize')
return avg_size if avg_size is not None else 0
diff --git a/buildscripts/cost_model/parameters_extractor.py b/buildscripts/cost_model/parameters_extractor.py
index 9bdbceabff7..e05874e962e 100644
--- a/buildscripts/cost_model/parameters_extractor.py
+++ b/buildscripts/cost_model/parameters_extractor.py
@@ -41,13 +41,14 @@ import physical_tree
__all__ = ['extract_parameters']
-def extract_parameters(config: AbtCalibratorConfig, database: DatabaseInstance,
- abt_types: Sequence[str]) -> Mapping[str, Sequence[ModelParameters]]:
+async def extract_parameters(config: AbtCalibratorConfig, database: DatabaseInstance,
+ abt_types: Sequence[str]) -> Mapping[str, Sequence[ModelParameters]]:
"""Read measurements from database and extract cost model parameters for the given ABT types."""
stats = defaultdict(list)
- for result in database.get_all_documents(config.input_collection_name):
+ docs = await database.get_all_documents(config.input_collection_name)
+ for result in docs:
explain = json.loads(result['explain'])
query_parameters = QueryParameters.from_json(result['query_parameters'])
res = parse_explain(explain, abt_types)
diff --git a/buildscripts/cost_model/start.py b/buildscripts/cost_model/start.py
index 4ed4de0897c..254e7bcce55 100644
--- a/buildscripts/cost_model/start.py
+++ b/buildscripts/cost_model/start.py
@@ -31,6 +31,7 @@ import dataclasses
import os
import csv
import json
+import asyncio
from typing import Mapping, Sequence
from cost_estimator import ExecutionStats, ModelParameters
from data_generator import DataGenerator
@@ -63,7 +64,7 @@ def save_to_csv(parameters: Mapping[str, Sequence[ModelParameters]], filepath: s
writer.writerow(fields)
-def main():
+async def main():
"""Entry point function."""
script_directory = os.path.abspath(os.path.dirname(__file__))
os.chdir(script_directory)
@@ -77,7 +78,7 @@ def main():
# 2. Data generation (optional), generates random data and populates collections with it.
generator = DataGenerator(database, config.data_generator)
- generator.populate_collections()
+ await generator.populate_collections()
# 3. Collecting data for calibration (optional).
# It runs the pipelines and stores explains to the database.
@@ -89,24 +90,26 @@ def main():
Query(pipeline=[{'$match': {f'choice{i}': val}}],
keys_length_in_bytes=keys_length))
- workload_execution.execute(database, config.workload_execution, generator.collection_infos,
- requests)
+ await workload_execution.execute(database, config.workload_execution,
+ generator.collection_infos, requests)
# Calibration phase (optional).
# Reads the explains stored on the previous step (this run and/or previous runs),
# aparses the explains, nd calibrates the cost model for the ABT nodes.
- models = abt_calibrator.calibrate(
+ models = await abt_calibrator.calibrate(
config.abt_calibrator, database,
['IndexScan', 'Seek', 'PhysicalScan', 'ValueScan', 'CoScan', 'Scan'])
for abt, model in models.items():
print(abt)
print(model)
- parameters = parameters_extractor.extract_parameters(config.abt_calibrator, database, [])
+ parameters = await parameters_extractor.extract_parameters(config.abt_calibrator, database,
+ [])
save_to_csv(parameters, 'parameters.csv')
print("DONE!")
if __name__ == '__main__':
- main()
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(main())
diff --git a/buildscripts/cost_model/workload_execution.py b/buildscripts/cost_model/workload_execution.py
index 956d9b03bc3..4fa18699210 100644
--- a/buildscripts/cost_model/workload_execution.py
+++ b/buildscripts/cost_model/workload_execution.py
@@ -64,15 +64,16 @@ class QueryParameters:
return QueryParameters(**json.loads(json_str))
-def execute(database: DatabaseInstance, config: WorkloadExecutionConfig,
- collection_infos: Sequence[CollectionInfo], queries: Sequence[Query]):
+async def execute(database: DatabaseInstance, config: WorkloadExecutionConfig,
+ collection_infos: Sequence[CollectionInfo], queries: Sequence[Query]):
"""Run the given queries and write the collected explain into collection."""
if not config.enabled:
return
collector = WorkloadExecution(database, config)
+ await collector.async_init()
print('>>> running queries')
- collector.collect(collection_infos, queries)
+ await collector.collect(collection_infos, queries)
class WorkloadExecution:
@@ -82,13 +83,15 @@ class WorkloadExecution:
self.database = database
self.config = config
- self.database.enable_sbe(True)
- self.database.enable_cascades(True)
+ async def async_init(self):
+ """Initialize the database settings."""
+ await self.database.enable_sbe(True)
+ await self.database.enable_cascades(True)
if self.config.write_mode == WriteMode.REPLACE:
- self.database.drop_collection(self.config.output_collection_name)
+ await self.database.drop_collection(self.config.output_collection_name)
- def collect(self, collection_infos: Sequence[CollectionInfo], queries: Sequence[Query]):
+ async def collect(self, collection_infos: Sequence[CollectionInfo], queries: Sequence[Query]):
"""Run the given piplelines on the given collection to generate and collect execution statistics."""
measurements = []
@@ -96,21 +99,21 @@ class WorkloadExecution:
print(f'\n>>>>> running queries on collection {coll_info.name}')
for query in queries:
print(f'>>>>>>> running query {query.pipeline}')
- self._run_query(coll_info, query, measurements)
+ await self._run_query(coll_info, query, measurements)
- self.database.insert_many(self.config.output_collection_name, measurements)
+ await self.database.insert_many(self.config.output_collection_name, measurements)
- def _run_query(self, coll_info: CollectionInfo, query: Query, result: Sequence):
+ async def _run_query(self, coll_info: CollectionInfo, query: Query, result: Sequence):
# warm up
for _ in range(self.config.warmup_runs):
- self.database.explain(coll_info.name, query.pipeline)
+ await self.database.explain(coll_info.name, query.pipeline)
run_id = ObjectId()
- avg_doc_size = self.database.get_average_document_size(coll_info.name)
+ avg_doc_size = await self.database.get_average_document_size(coll_info.name)
parameters = QueryParameters(keys_length_in_bytes=query.keys_length_in_bytes,
average_document_size_in_bytes=avg_doc_size)
for _ in range(self.config.runs):
- explain = self.database.explain(coll_info.name, query.pipeline)
+ explain = await self.database.explain(coll_info.name, query.pipeline)
if explain['ok'] == 1:
result.append({
'run_id': run_id, 'collection': coll_info.name,