diff options
-rw-r--r-- | buildscripts/cost_model/README.md | 21 | ||||
-rw-r--r-- | buildscripts/cost_model/abt_calibrator.py | 2 | ||||
-rw-r--r-- | buildscripts/cost_model/calibration_settings.py | 164 | ||||
-rw-r--r-- | buildscripts/cost_model/config.py | 4 | ||||
-rw-r--r-- | buildscripts/cost_model/cost_estimator.py | 13 | ||||
-rw-r--r-- | buildscripts/cost_model/data_generator.py | 2 | ||||
-rw-r--r-- | buildscripts/cost_model/start.py | 117 | ||||
-rw-r--r-- | buildscripts/cost_model/workload_execution.py | 4 |
8 files changed, 137 insertions, 190 deletions
diff --git a/buildscripts/cost_model/README.md b/buildscripts/cost_model/README.md index f838902f63a..a435b00023f 100644 --- a/buildscripts/cost_model/README.md +++ b/buildscripts/cost_model/README.md @@ -1,23 +1,4 @@ -# The smaller experiment - Calibration with queries that return smaller number of documents. (smaller "n_processed") - -This "smaller" experiment differs from the previous experiments that query on collections of a sequence of cardinalities. For calibrating each type of ABT node, we query only against one collection with around a big enough cardinality (e.g. 100k) and a small set of documents that will be queried for collecting calibration data. - -There are some differences to conduct this experiment from conducting previous experiments. - -## Data generation phase -we need to populate the collections twice - 1) first populate the small set of documents for queries and then 2) populate larger set of documents that will not be queried during most types of ABT nodes calibration. The larger set is for the collections being big enough for queries. - -There are more detailed instructions in the "data_generator" settings in "calibration_settings.py". - -## Calibration data collection phase -In this experiment, we will run different queries and these queries as mentioned above will mostly be querying the smaller set of documents in order to get a smaller value of "n_processed". - -More specifically, to collect data for calibration we only need to run "execute_small_queries()" in "start.py" and make sure to avoid running any other queries outside this function. - -## Calibration phase -This phase should be the same as calibration in other experiments. - -Please note that this type of experiment may not work with some certain ABT nodes. For example, for PhyscialScan Node, all the data collected may come with the same "n_processed" - the collection cardinality, which does not make any sense for calibration. +# Cost Model Calibrator ## Python virtual environment diff --git a/buildscripts/cost_model/abt_calibrator.py b/buildscripts/cost_model/abt_calibrator.py index 89d7c812b59..08d24032446 100644 --- a/buildscripts/cost_model/abt_calibrator.py +++ b/buildscripts/cost_model/abt_calibrator.py @@ -75,4 +75,4 @@ def calibrate_node(abt_df: pd.DataFrame, config: AbtCalibratorConfig, model = nnls.fit(X, y) return (model.coef_, model.predict) - return estimate(fit, X, y, config.test_size, config.trace) + return estimate(fit, X.to_numpy(), y.to_numpy(), config.test_size, config.trace) diff --git a/buildscripts/cost_model/calibration_settings.py b/buildscripts/cost_model/calibration_settings.py index ee814d8d337..a3849db2730 100644 --- a/buildscripts/cost_model/calibration_settings.py +++ b/buildscripts/cost_model/calibration_settings.py @@ -33,6 +33,9 @@ from random_generator import RangeGenerator, DataType, RandomDistribution, Array __all__ = ['main_config', 'distributions'] +# A string value to fill up collections and not used in queries. +HIDDEN_STRING_VALUE = '__hidden_string_value' + # Data distributions settings. distributions = {} @@ -103,69 +106,60 @@ distributions['array_small'] = ArrayRandomDistribution(lengths_distr, distributi # Database settings database = config.DatabaseConfig(connection_string='mongodb://localhost', - database_name='abt_calibration_small', dump_path='~/data/dump', + database_name='abt_calibration', dump_path='~/data/dump', restore_from_dump=config.RestoreMode.NEVER, dump_on_exit=False) -# Collection template seetings -# templates for small queries. -c_int_05_small = config.CollectionTemplate( - name="c_int_05", fields=[ - config.FieldTemplate(name="in1", data_type=config.DataType.INTEGER, - distribution=distributions["int_choice"], indexed=True), - config.FieldTemplate(name="mixed1", data_type=config.DataType.STRING, - distribution=distributions["string_mixed"], indexed=False), - config.FieldTemplate(name="uniform1", data_type=config.DataType.STRING, - distribution=distributions["string_uniform"], indexed=False), - config.FieldTemplate(name="in2", data_type=config.DataType.INTEGER, - distribution=distributions["int_choice"], indexed=True), - config.FieldTemplate(name="mixed2", data_type=config.DataType.STRING, - distribution=distributions["string_mixed"], indexed=False), - ], compound_indexes=[]) - -c_str_02_small = config.CollectionTemplate( - name="c_str_02", fields=[ - config.FieldTemplate(name="choice1", data_type=config.DataType.STRING, - distribution=distributions['string_choice_small'], indexed=True), - config.FieldTemplate(name="choice2", data_type=config.DataType.STRING, - distribution=distributions['string_choice_small'], indexed=False) - ], compound_indexes=[]) - -c_arr_01_small = config.CollectionTemplate( - name="c_arr_01", fields=[ - config.FieldTemplate(name="as", data_type=config.DataType.INTEGER, - distribution=distributions["array_small"], indexed=False), - config.FieldTemplate(name="in1", data_type=config.DataType.INTEGER, - distribution=distributions["int_choice"], indexed=True), - ], compound_indexes=[]) - -c_str_01 = config.CollectionTemplate( - name="c_str_01", fields=[ - config.FieldTemplate(name="choice1", data_type=config.DataType.STRING, - distribution=distributions['string_choice'], indexed=True) - ], compound_indexes=[]) - -c_str_02 = config.CollectionTemplate( - name="c_str_02", fields=[ - config.FieldTemplate(name="choice1", data_type=config.DataType.STRING, - distribution=distributions['string_choice'], indexed=True), - config.FieldTemplate(name="choice2", data_type=config.DataType.STRING, - distribution=distributions['string_choice'], indexed=False) - ], compound_indexes=[]) - -c_str_05 = config.CollectionTemplate( - name="c_str_05", fields=[ - config.FieldTemplate(name="choice1", data_type=config.DataType.STRING, - distribution=distributions["string_choice"], indexed=True), - config.FieldTemplate(name="mixed1", data_type=config.DataType.STRING, - distribution=distributions["string_mixed"], indexed=True), - config.FieldTemplate(name="uniform1", data_type=config.DataType.STRING, - distribution=distributions["string_uniform"], indexed=True), - config.FieldTemplate(name="choice2", data_type=config.DataType.STRING, - distribution=distributions["string_choice"], indexed=True), - config.FieldTemplate(name="mixed2", data_type=config.DataType.STRING, - distribution=distributions["string_mixed"], indexed=True), - ], compound_indexes=[["choice1", "mixed1"]]) +# Collection template settings +def create_index_scan_collection_template(name: str, cardinality: int) -> config.CollectionTemplate: + values = [ + 'iqtbr5b5is', 'vt5s3tf8o6', 'b0rgm58qsn', '9m59if353m', 'biw2l9ok17', 'b9ct0ue14d', + 'oxj0vxjsti', 'f3k8w9vb49', 'ec7v82k6nk', 'f49ufwaqx7' + ] + + start_weight = 10 + step_weight = 25 + finish_weight = start_weight + len(values) * step_weight + weights = list(range(start_weight, finish_weight, step_weight)) + fill_up_weight = cardinality - sum(weights) + if fill_up_weight > 0: + values.append(HIDDEN_STRING_VALUE) + weights.append(fill_up_weight) + + distr = RandomDistribution.choice(values, weights) + + return config.CollectionTemplate( + name=name, fields=[ + config.FieldTemplate(name="choice", data_type=config.DataType.STRING, + distribution=distr, indexed=True), + config.FieldTemplate(name="mixed1", data_type=config.DataType.STRING, + distribution=distributions["string_mixed"], indexed=False), + config.FieldTemplate(name="uniform1", data_type=config.DataType.STRING, + distribution=distributions["string_uniform"], indexed=False), + config.FieldTemplate(name="choice2", data_type=config.DataType.STRING, + distribution=distributions["string_choice"], indexed=False), + config.FieldTemplate(name="mixed2", data_type=config.DataType.STRING, + distribution=distributions["string_mixed"], indexed=False), + ], compound_indexes=[], cardinalities=[cardinality]) + + +def create_physical_scan_collection_template(name: str) -> config.CollectionTemplate: + return config.CollectionTemplate( + name=name, fields=[ + config.FieldTemplate(name="choice1", data_type=config.DataType.STRING, + distribution=distributions["string_choice"], indexed=False), + config.FieldTemplate(name="mixed1", data_type=config.DataType.STRING, + distribution=distributions["string_mixed"], indexed=False), + config.FieldTemplate(name="uniform1", data_type=config.DataType.STRING, + distribution=distributions["string_uniform"], indexed=False), + config.FieldTemplate(name="choice", data_type=config.DataType.STRING, + distribution=distributions["string_choice"], indexed=False), + config.FieldTemplate(name="mixed2", data_type=config.DataType.STRING, + distribution=distributions["string_mixed"], indexed=False), + ], compound_indexes=[], cardinalities=[1000, 5000, 10000, 50000]) + + +collection_caridinalities = list(range(10000, 50001, 5000)) c_int_05 = config.CollectionTemplate( name="c_int_05", fields=[ @@ -179,48 +173,45 @@ c_int_05 = config.CollectionTemplate( distribution=distributions["int_normal"], indexed=True), config.FieldTemplate(name="mixed2", data_type=config.DataType.STRING, distribution=distributions["string_mixed"], indexed=False), - ], compound_indexes=[]) + ], compound_indexes=[], cardinalities=collection_caridinalities) c_arr_01 = config.CollectionTemplate( name="c_arr_01", fields=[ config.FieldTemplate(name="as", data_type=config.DataType.INTEGER, distribution=distributions["array_small"], indexed=False) - ], compound_indexes=[]) + ], compound_indexes=[], cardinalities=collection_caridinalities) -# Data Generator settings -data_generator = config.DataGeneratorConfig( - enabled=True, collection_cardinalities=list(range(10000, 50001, - 2500)), collection_name_with_card=True, - batch_size=10000, collection_templates=[c_str_01, c_str_05, c_int_05, - c_arr_01], write_mode=config.WriteMode.REPLACE) -"""Data generation settings for Calibration with queries that return smaller number of documents. -# First round of data population generates a small set of documents that will be queried. - -data_generator = config.DataGeneratorConfig( - enabled=True, collection_cardinalities=[small_query_cardinality], collection_name_with_card=False, batch_size=10000, - collection_templates=[c_str_02_small, c_int_05_small, c_arr_01_small], write_mode=config.WriteMode.APPEND) +index_scan = create_index_scan_collection_template('index_scan', 1000000) -# Second round of data population which generates 100,000 documents to the collection in order to -# make sure the collection is big enough for queries. +physical_scan = create_physical_scan_collection_template('physical_scan') +# Data Generator settings data_generator = config.DataGeneratorConfig( - enabled=True, collection_cardinalities=[100000], collection_name_with_card=False, batch_size=10000, - collection_templates=[c_str_02, c_int_05, c_arr_01], write_mode=config.WriteMode.APPEND) - -# Please note that the 'WriteMode' should be 'APPEND' type and the collection name should remain -# unchanged. This can be controlled by setting "collection_name_with_card=False". -""" + enabled=True, batch_size=10000, + collection_templates=[index_scan, physical_scan, c_int_05, + c_arr_01], write_mode=config.WriteMode, collection_name_with_card=True) # Workload Execution settings workload_execution = config.WorkloadExecutionConfig( - enabled=True, output_collection_name='calibrationDataSmall', write_mode=config.WriteMode.APPEND, + enabled=True, output_collection_name='calibrationData', write_mode=config.WriteMode.REPLACE, warmup_runs=3, runs=10) + +def make_filter_by_note(note_value: any): + def impl(df): + return df[df.note == note_value] + + return impl + + abt_nodes = [ - config.AbtNodeCalibrationConfig(type='PhysicalScan'), - config.AbtNodeCalibrationConfig(type='IndexScan'), + config.AbtNodeCalibrationConfig(type='PhysicalScan', + filter_function=make_filter_by_note('PhysicalScan')), + config.AbtNodeCalibrationConfig(type='IndexScan', + filter_function=make_filter_by_note('IndexScan')), config.AbtNodeCalibrationConfig(type='Seek'), - config.AbtNodeCalibrationConfig(type='Filter'), + config.AbtNodeCalibrationConfig(type='Filter', + filter_function=make_filter_by_note('PhysicalScan')), config.AbtNodeCalibrationConfig(type='Evaluation'), config.AbtNodeCalibrationConfig(type='BinaryJoin'), config.AbtNodeCalibrationConfig(type='HashJoin'), @@ -230,6 +221,7 @@ abt_nodes = [ config.AbtNodeCalibrationConfig(type='GroupBy'), config.AbtNodeCalibrationConfig(type='Unwind'), ] + # Calibrator settings abt_calibrator = config.AbtCalibratorConfig( enabled=True, test_size=0.2, input_collection_name=workload_execution.output_collection_name, diff --git a/buildscripts/cost_model/config.py b/buildscripts/cost_model/config.py index 8080a4eaba5..5f35d5f365f 100644 --- a/buildscripts/cost_model/config.py +++ b/buildscripts/cost_model/config.py @@ -73,9 +73,8 @@ class DataGeneratorConfig: """Data Generator configuration.""" enabled: bool - collection_cardinalities: list[int] - collection_name_with_card: bool collection_templates: list[CollectionTemplate] + collection_name_with_card: bool write_mode: WriteMode batch_size: int @@ -87,6 +86,7 @@ class CollectionTemplate: name: str fields: Sequence[FieldTemplate] compound_indexes: Sequence[Sequence[str]] + cardinalities: Sequence[int] @dataclass diff --git a/buildscripts/cost_model/cost_estimator.py b/buildscripts/cost_model/cost_estimator.py index 427390b9937..f2601e58cc6 100644 --- a/buildscripts/cost_model/cost_estimator.py +++ b/buildscripts/cost_model/cost_estimator.py @@ -66,9 +66,14 @@ class LinearModel: # pylint: disable=invalid-name -def estimate(fit, X, y, test_size: float, trace: bool = False) -> LinearModel: +def estimate(fit, X: np.ndarray, y: np.ndarray, test_size: float, + trace: bool = False) -> LinearModel: """Estimate cost model parameters.""" + if len(X) == 0: + # no data to trainn return empty model + return LinearModel(coef=[], intercept=0, mse=0, r2=0, evs=0, corrcoef=[]) + # split data X_training, X_test, y_training, y_test = train_test_split(X, y, test_size=test_size) @@ -79,7 +84,7 @@ def estimate(fit, X, y, test_size: float, trace: bool = False) -> LinearModel: if len(X_test) == 0 or len(X_training) == 0: # no data to trainn return empty model - return LinearModel(coef=[], intercept=0, mse=0, rs=0, evs=0) + return LinearModel(coef=[], intercept=0, mse=0, r2=0, evs=0, corrcoef=[]) (coef, predict) = fit(X, y) y_predict = predict(X_test) @@ -87,7 +92,7 @@ def estimate(fit, X, y, test_size: float, trace: bool = False) -> LinearModel: mse = mean_squared_error(y_test, y_predict) r2 = r2_score(y_test, y_predict) evs = explained_variance_score(y_test, y_predict) - corrcoef = np.corrcoef(np.transpose(X), y) + corrcoef = np.corrcoef(np.transpose(X[:, 1:]), y) return LinearModel(coef=coef[1:], intercept=coef[0], mse=mse, r2=r2, evs=evs, - corrcoef=corrcoef[1, 2:]) + corrcoef=corrcoef[0, 1:]) diff --git a/buildscripts/cost_model/data_generator.py b/buildscripts/cost_model/data_generator.py index e10847a7546..fe237fba957 100644 --- a/buildscripts/cost_model/data_generator.py +++ b/buildscripts/cost_model/data_generator.py @@ -110,7 +110,7 @@ class DataGenerator: FieldInfo(name=ft.name, type=ft.data_type, distribution=ft.distribution, indexed=ft.indexed) for ft in coll_template.fields ] - for doc_count in self.config.collection_cardinalities: + for doc_count in coll_template.cardinalities: name = f'{coll_template.name}' if self.config.collection_name_with_card is True: name = f'{coll_template.name}_{doc_count}' diff --git a/buildscripts/cost_model/start.py b/buildscripts/cost_model/start.py index f95e010ad7f..6dbe4ef2cce 100644 --- a/buildscripts/cost_model/start.py +++ b/buildscripts/cost_model/start.py @@ -63,16 +63,41 @@ def save_to_csv(parameters: Mapping[str, Sequence[CostModelParameters]], filepat writer.writerow(fields) -async def execute_general(database: DatabaseInstance, collections: Sequence[CollectionInfo]): +async def execute_index_scan_queries(database: DatabaseInstance, + collections: Sequence[CollectionInfo]): + collection = [ci for ci in collections if ci.name.startswith('index_scan')][0] + fields = [f for f in collection.fields if f.name == 'choice'] + requests = [] - for val in distributions['string_choice'].get_values()[::3]: - keys_length = len(val) + 2 - for i in range(1, 5): + + for field in fields: + for val in field.distribution.get_values(): + if val.startswith('_'): + continue + keys_length = len(val) + 2 requests.append( - Query(pipeline=[{'$match': {f'choice{i}': val}}], keys_length_in_bytes=keys_length)) + Query(pipeline=[{'$match': {field.name: val}}], keys_length_in_bytes=keys_length, + note='IndexScan')) - await workload_execution.execute(database, main_config.workload_execution, - [ci for ci in collections if ci.name.startswith('c_str')], + await workload_execution.execute(database, main_config.workload_execution, [collection], + requests) + + +async def execute_physical_scan_queries(database: DatabaseInstance, + collections: Sequence[CollectionInfo]): + collections = [ci for ci in collections if ci.name.startswith('physical_scan')] + fields = [f for f in collections[0].fields if f.name == 'choice'] + requests = [] + for field in fields: + for val in field.distribution.get_values()[::3]: + if val.startswith('_'): + continue + keys_length = len(val) + 2 + requests.append( + Query(pipeline=[{'$match': {field.name: val}}], keys_length_in_bytes=keys_length, + note='PhysicalScan')) + + await workload_execution.execute(database, main_config.workload_execution, collections, requests) @@ -151,62 +176,6 @@ async def execute_unwind(database: DatabaseInstance, collections: Sequence[Colle requests) -async def execute_small_queries(database: DatabaseInstance, collections: Sequence[CollectionInfo]): - # strings - requests = [] - for val in distributions['string_choice_small'].get_values(): - keys_length = len(val) + 2 - for i in range(1, 3): - requests.append( - Query(pipeline=[{'$match': {f'choice{i}': val}}], keys_length_in_bytes=keys_length)) - - await workload_execution.execute(database, main_config.workload_execution, - [ci for ci in collections if ci.name.startswith('c_str_02')], - requests) - - # index intersection - colls = [ci for ci in collections if ci.name.startswith('c_int_05')] - requests = [] - - for val in distributions['int_choice'].get_values(): - for val2 in distributions['int_choice'].get_values(): - requests.append( - Query(pipeline=[{'$match': {'in1': val, 'in2': val2}}], keys_length_in_bytes=1)) - - requests.append( - Query(pipeline=[{'$match': {'in1': val, 'in2': {'$gt': 500}}}], keys_length_in_bytes=1)) - - requests.append( - Query(pipeline=[{'$match': {'in1': {'$lte': 500}, 'in2': val}}], - keys_length_in_bytes=1)) - - await execute_index_intersections_with_requests(database, colls, requests) - - # Evaluation - colls = [ci for ci in collections if ci.name.startswith('c_int_05')] - requests = [] - - for val in distributions['int_choice'].get_values(): - requests.append( - Query(pipeline=[{"$match": {'in1': val}}, {'$project': {'proj1': 1}}], - keys_length_in_bytes=1, number_of_fields=1)) - - await workload_execution.execute(database, main_config.workload_execution, colls, requests) - - # Unwind - colls = [ci for ci in collections if ci.name.startswith('c_arr_01')] - requests = [] - # average size of arrays in the collection - average_size_of_arrays = 10 - - for val in distributions['int_choice'].get_values(): - requests.append( - Query(pipeline=[{"$match": {'in1': val}}, {"$unwind": "$as"}], - number_of_fields=average_size_of_arrays)) - - await workload_execution.execute(database, main_config.workload_execution, colls, requests) - - async def main(): """Entry point function.""" script_directory = os.path.abspath(os.path.dirname(__file__)) @@ -222,18 +191,16 @@ async def main(): # 3. Collecting data for calibration (optional). # It runs the pipelines and stores explains to the database. - - # Run this execute function only to collect calibration data in the "smaller" experiment. - # await execute_small_queries(database, generator.collection_infos); - - await execute_general(database, generator.collection_infos) - main_config.workload_execution.write_mode = WriteMode.APPEND - - await execute_index_intersections(database, generator.collection_infos) - - await execute_evaluation(database, generator.collection_infos) - - await execute_unwind(database, generator.collection_infos) + execution_query_functions = [ + execute_index_scan_queries, + execute_physical_scan_queries, + execute_index_intersections, + execute_evaluation, + execute_unwind, + ] + for execute_query in execution_query_functions: + await execute_query(database, generator.collection_infos) + main_config.workload_execution.write_mode = WriteMode.APPEND # Calibration phase (optional). # Reads the explains stored on the previous step (this run and/or previous runs), diff --git a/buildscripts/cost_model/workload_execution.py b/buildscripts/cost_model/workload_execution.py index d27bbdea330..76850a0d3b5 100644 --- a/buildscripts/cost_model/workload_execution.py +++ b/buildscripts/cost_model/workload_execution.py @@ -46,6 +46,7 @@ class Query: pipeline: Pipeline keys_length_in_bytes: int = 0 number_of_fields: int = 0 + note: any = None @dataclass @@ -54,6 +55,7 @@ class QueryParameters: keys_length_in_bytes: int average_document_size_in_bytes: float + note: any number_of_fields: int = 0 def to_json(self) -> str: @@ -114,7 +116,7 @@ class WorkloadExecution: avg_doc_size = await self.database.get_average_document_size(coll_info.name) parameters = QueryParameters(keys_length_in_bytes=query.keys_length_in_bytes, number_of_fields=query.number_of_fields, - average_document_size_in_bytes=avg_doc_size) + average_document_size_in_bytes=avg_doc_size, note=query.note) for _ in range(self.config.runs): explain = await self.database.explain(coll_info.name, query.pipeline) if explain['ok'] == 1: |