summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Ignatyev <alexander.ignatyev@mongodb.com>2022-09-01 15:05:22 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-01 15:50:46 +0000
commit3b8505623b8eba4880d28fb69e5bf1385af58a2b (patch)
treef7cc674a83f6f3bfd700c6dd96b0449a27a976e9
parent7da3e551561ee61122d4507660c41db39067e4e1 (diff)
downloadmongo-3b8505623b8eba4880d28fb69e5bf1385af58a2b.tar.gz
SERVER-68984 Implement experiments in Cost Model Framework
-rw-r--r--buildscripts/cost_model/experiment.py204
-rw-r--r--buildscripts/cost_model/parameters_extractor.py11
2 files changed, 213 insertions, 2 deletions
diff --git a/buildscripts/cost_model/experiment.py b/buildscripts/cost_model/experiment.py
new file mode 100644
index 00000000000..7db7bf053fc
--- /dev/null
+++ b/buildscripts/cost_model/experiment.py
@@ -0,0 +1,204 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+"""Experimenation utility functions.
+
+How to use the utility functions.
+
+First of all, we need to run Jupyter Notebook:
+sh> python3 -m notebook
+
+Example notebook.
+
+#Imports
+import math
+import seaborn as sns
+import statsmodels.api as sm
+
+import sys
+sys.path.append('/home/ubuntu/mongo/buildscripts/cost_model')
+import experiment as exp
+from config import DatabaseConfig
+from database_instance import DatabaseInstance
+
+# Load data
+database_config = DatabaseConfig(connection_string='mongodb://localhost',
+ database_name='abt_calibration', dump_path='',
+ restore_from_dump=False, dump_on_exit=False)
+
+database = DatabaseInstance(database_config)
+df = await exp.load_calibration_data(database, 'calibrationData')
+
+# Descriptive functions
+df.describe()
+df.head()
+
+# Clean up loaded data
+noout_df = exp.remove_outliers(df, 0.0, 0.90)
+noout_df.describe()
+
+# Extract ABT nodes
+abt_df = exp.extract_abt_nodes(noout_df)
+abt_df.head()
+
+# Get IndexScan nodes only.
+ixscan_df = abt_df[abt_df['abt_type'] == 'IndexScan']
+ixscan_df.describe()
+
+# Add a new column if required.
+ixscan_df = ixscan_df[ixscan_df['n_processed'] > 0].copy()
+ixscan_df['log_n_processed'] = ixscan_df['n_processed'].apply(math.log)
+ixscan_df.describe()
+
+# Check the correlation.
+ixscan_df.corr()
+
+# Print a scatter plot to see a dependency between e.g. execution_time and n_processed.
+sns.scatterplot(x=ixscan_df['n_processed'], y=ixscan_df['execution_time'])
+
+# Calibrate (train) the cost model for IndexScan
+y = ixscan_df['execution_time']
+X = ixscan_df[['n_processed', 'keys_length_in_bytes', 'average_document_size_in_bytes', 'log_n_processed']]
+X = sm.add_constant(X)
+ixscan_lm = sm.OLS(y, X).fit()
+ixscan_lm.summary()
+
+# Draw the predictions.
+y_pred = ixscan_lm.predict(X)
+sns.scatterplot(x=ixscan_df['n_processed'], y=ixscan_df['execution_time'])
+sns.lineplot(x=ixscan_df['n_processed'],y=y_pred, color='red')
+"""
+
+import json
+import dataclasses
+import pandas as pd
+from database_instance import DatabaseInstance
+import execution_tree as sbe
+import physical_tree as abt
+from parameters_extractor import extract_execution_stats
+
+
+async def load_calibration_data(database: DatabaseInstance, collection_name: str) -> pd.DataFrame:
+    """Load calibration documents from the given collection and parse them.
+
+    Each document's 'explain' field is a JSON string of explain output; it is
+    parsed into an SBE execution tree ('sbe' column) and an ABT physical tree
+    ('abt' column). A 'total_execution_time' column is derived from the SBE
+    tree. Returns the resulting pandas DataFrame.
+    """
+
+    data = await database.get_all_documents(collection_name)
+    df = pd.DataFrame(data)
+    # NOTE(review): json.loads(e) runs twice per row (once for SBE, once for
+    # ABT); parsing each explain string a single time would halve the JSON work.
+    df['sbe'] = df.explain.apply(lambda e: sbe.build_execution_tree(
+        json.loads(e)['executionStats']))
+    df['abt'] = df.explain.apply(lambda e: abt.build(
+        json.loads(e)['queryPlanner']['winningPlan']['optimizerPlan']))
+    df['total_execution_time'] = df.sbe.apply(lambda t: t.total_execution_time)
+    return df
+
+
+def remove_outliers(df: pd.DataFrame, lower_percentile: float = 0.1,
+                    upper_percentile: float = 0.9) -> pd.DataFrame:
+    """Remove the outliers from the parsed calibration DataFrame.
+
+    Rows are grouped by 'run_id'; within each group a row is kept only when its
+    'total_execution_time' falls inside the group's [lower_percentile,
+    upper_percentile] quantile range (bounds inclusive).
+    """
+
+    def is_not_outlier(df_seq):
+        # Called once per 'run_id' group via groupby().transform, so the
+        # quantile bounds are computed per group, not over the whole frame.
+        low = df_seq.quantile(lower_percentile)
+        high = df_seq.quantile(upper_percentile)
+        return (df_seq >= low) & (df_seq <= high)
+
+    return df[df.groupby('run_id').total_execution_time.transform(is_not_outlier).eq(1)]
+
+
+def extract_sbe_stages(df: pd.DataFrame) -> pd.DataFrame:
+ """Extract SBE stages from calibration DataFrame."""
+
+ def flatten_sbe_stages(explain):
+ def traverse(node, stages):
+ execution_time = node['executionTimeMicros']
+ children_fields = ['innerStage', 'outerStage', 'inputStage']
+ for field in children_fields:
+ if field in node and node[field]:
+ child = node[field]
+ execution_time -= child['executionTimeMicros']
+ traverse(child, stages)
+ del node[field]
+ node['executionTime'] = execution_time
+ stages.append(node)
+
+ sbe_tree = json.loads(explain)['executionStats']['executionStages']
+ result = []
+ traverse(sbe_tree, result)
+ return result
+
+ return pd.DataFrame(list(df.explain.apply(flatten_sbe_stages).explode()))
+
+
+def get_sbe_stage(stages_df: pd.DataFrame, stage_name: str) -> pd.DataFrame:
+    """Filter the SBE stages DataFrame by the given SBE stage name.
+
+    Returns a copy, so callers may add columns without a SettingWithCopyWarning.
+    """
+    return stages_df[stages_df.stage == stage_name].copy()
+
+
+def extract_abt_nodes(df: pd.DataFrame) -> pd.DataFrame:
+    """Extract ABT Nodes and execution statistics from calibration DataFrame.
+
+    For each row, pairs the parsed SBE and ABT trees via extract_execution_stats
+    and emits one record per ABT node type, merging in that row's parsed
+    'query_parameters' (a JSON string) and its 'run_id'.
+    """
+
+    def extract(df_seq):
+        # Empty abt_types list means "all ABT types found in the plan".
+        es_dict = extract_execution_stats(df_seq['sbe'], df_seq['abt'], [])
+
+        rows = []
+        for abt_type, es in es_dict.items():
+            row = {
+                'abt_type': abt_type, **dataclasses.asdict(es),
+                **json.loads(df_seq['query_parameters']), 'run_id': df_seq['run_id']
+            }
+            rows.append(row)
+        return rows
+
+    # apply(..., axis=1) yields a Series of lists; explode gives one dict per row.
+    return pd.DataFrame(list(df.apply(extract, axis=1).explode()))
+
+
+if __name__ == '__main__':
+ import asyncio
+ from config import DatabaseConfig
+
+ async def test():
+ """Smoke tests."""
+ database_config = DatabaseConfig(connection_string='mongodb://localhost',
+ database_name='abt_calibration', dump_path='',
+ restore_from_dump=False, dump_on_exit=False)
+ database = DatabaseInstance(database_config)
+
+ raw_df = await load_calibration_data(database, 'calibrationData')
+ print(raw_df.head())
+
+ cleaned_df = remove_outliers(raw_df, 0.0, 0.9)
+ print(cleaned_df.head())
+
+ sbe_stages_df = extract_sbe_stages(cleaned_df)
+ print(sbe_stages_df.head())
+
+ seek_df = get_sbe_stage(sbe_stages_df, 'seek')
+ print(seek_df.head())
+
+ abt_nodes_df = extract_abt_nodes(cleaned_df)
+ print(abt_nodes_df.head())
+
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(test())
diff --git a/buildscripts/cost_model/parameters_extractor.py b/buildscripts/cost_model/parameters_extractor.py
index e05874e962e..9f02fe77189 100644
--- a/buildscripts/cost_model/parameters_extractor.py
+++ b/buildscripts/cost_model/parameters_extractor.py
@@ -38,7 +38,7 @@ from cost_estimator import ModelParameters, ExecutionStats
import execution_tree
import physical_tree
-__all__ = ['extract_parameters']
+__all__ = ['extract_parameters', 'extract_execution_stats']
async def extract_parameters(config: AbtCalibratorConfig, database: DatabaseInstance,
@@ -112,7 +112,6 @@ def get_excution_stats(root: execution_tree.Node, node_id: int) -> ExecutionStat
def parse_explain(explain: Mapping[str, any], abt_types: Sequence[str]):
"""Extract ExecutionStats from the given explain for the given ABT types."""
- result: Mapping[str, ExecutionStats] = {}
try:
et = execution_tree.build_execution_tree(explain['executionStats'])
pt = physical_tree.build(explain['queryPlanner']['winningPlan']['optimizerPlan'])
@@ -121,9 +120,17 @@ def parse_explain(explain: Mapping[str, any], abt_types: Sequence[str]):
print(explain)
raise exception
+ return extract_execution_stats(et, pt, abt_types)
+
+
+def extract_execution_stats(et: execution_tree.Node, pt: physical_tree.Node,
+ abt_types: Sequence[str]) -> Mapping[str, ExecutionStats]:
+ """Extract ExecutionStats from the given SBE and ABT trees for the given ABT types."""
+
if len(abt_types) == 0:
abt_types = get_abt_types(pt)
+ result: Mapping[str, ExecutionStats] = {}
for abt_type in abt_types:
abt_node = find_abt_node_by_type(pt, abt_type)
if abt_node is not None: