author     Eric Cox <eric.cox@mongodb.com>  2021-03-24 01:36:35 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-04-09 02:30:14 +0000
commit     26083aaf57fded55e2ca4f82a536b1b5b3a1e6f7 (patch)
tree       d6e0390c5db4908ef535e399f9040159fe04c5d5
parent     4e3cf572c0ba57b46d9722b11384410cb076b379 (diff)
download   mongo-26083aaf57fded55e2ca4f82a536b1b5b3a1e6f7.tar.gz
SERVER-55215 Handle small measurement counts in buckets for ARHASH
Co-authored-by: David Storch <david.storch@mongodb.com>
-rw-r--r--  jstests/noPassthrough/timeseries_sample.js | 185
-rw-r--r--  src/mongo/db/SConscript | 2
-rw-r--r--  src/mongo/db/exec/SConscript | 11
-rw-r--r--  src/mongo/db/exec/bucket_unpacker.cpp | 240
-rw-r--r--  src/mongo/db/exec/bucket_unpacker.h | 214
-rw-r--r--  src/mongo/db/exec/bucket_unpacker_test.cpp | 596
-rw-r--r--  src/mongo/db/exec/plan_stats.h | 25
-rw-r--r--  src/mongo/db/exec/sample_from_timeseries_bucket.cpp | 121
-rw-r--r--  src/mongo/db/exec/sample_from_timeseries_bucket.h | 129
-rw-r--r--  src/mongo/db/exec/unpack_timeseries_bucket.cpp | 99
-rw-r--r--  src/mongo/db/exec/unpack_timeseries_bucket.h | 71
-rw-r--r--  src/mongo/db/pipeline/SConscript | 1
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 333
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.h | 197
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp | 275
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp | 109
-rw-r--r--  src/mongo/db/query/classic_stage_builder.cpp | 2
-rw-r--r--  src/mongo/db/query/plan_explainer_impl.cpp | 16
-rw-r--r--  src/mongo/db/query/stage_types.cpp | 2
-rw-r--r--  src/mongo/db/query/stage_types.h | 3
20 files changed, 1750 insertions, 881 deletions
diff --git a/jstests/noPassthrough/timeseries_sample.js b/jstests/noPassthrough/timeseries_sample.js
index d38af3559d3..7ce47a48bb0 100644
--- a/jstests/noPassthrough/timeseries_sample.js
+++ b/jstests/noPassthrough/timeseries_sample.js
@@ -12,7 +12,7 @@
load("jstests/core/timeseries/libs/timeseries.js");
load("jstests/libs/analyze_plan.js");
-const conn = MongoRunner.runMongod({setParameter: {timeseriesBucketMaxCount: 2}});
+const conn = MongoRunner.runMongod({setParameter: {timeseriesBucketMaxCount: 100}});
// Although this test is tagged with 'requires_wiredtiger', this is not sufficient for ensuring
// that the parallel suite runs this test only on WT configurations.
@@ -35,30 +35,6 @@ if (!TimeseriesTest.timeseriesCollectionsEnabled(testDB.getMongo())) {
// In order to trigger the optimized sample path we need at least 100 buckets in the bucket
// collection.
const nBuckets = 101;
-let bucketMaxCount = 2;
-let numDocs = nBuckets * bucketMaxCount;
-
-const coll = testDB.getCollection('timeseries_sample');
-const bucketsColl = testDB.getCollection("system.buckets." + coll.getName());
-
-coll.drop();
-
-const timeFieldName = "time";
-const metaFieldName = "m";
-assert.commandWorked(testDB.createCollection(
- coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
-
-assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
-
-for (let i = 0; i < numDocs; i++) {
- let id = ObjectId();
- assert.commandWorked(
- coll.insert({_id: id, [timeFieldName]: ISODate(), [metaFieldName]: i % nBuckets, x: i}),
- "failed to insert doc: " + id);
-}
-
-let buckets = bucketsColl.find().toArray();
-assert.eq(nBuckets, buckets.length, buckets);
let assertUniqueDocuments = function(docs) {
let seen = new Set();
@@ -68,55 +44,116 @@ let assertUniqueDocuments = function(docs) {
});
};
-// Check the time-series view to make sure we have the correct number of docs and that there are no
-// duplicates after sampling.
-const viewDocs = coll.find({}, {x: 1}).toArray();
-assert.eq(numDocs, viewDocs.length, viewDocs);
-
-let sampleSize = 5;
-let result = coll.aggregate([{$sample: {size: sampleSize}}]).toArray();
-assert.eq(sampleSize, result.length, result);
-assertUniqueDocuments(result);
-
-// Check that we have absorbed $sample into $_internalUnpackBucket.
-const optimizedSamplePlan = coll.explain().aggregate([{$sample: {size: sampleSize}}]);
-let bucketStage = getAggPlanStage(optimizedSamplePlan, "$_internalUnpackBucket");
-assert.eq(bucketStage["$_internalUnpackBucket"]["sample"], sampleSize);
-assert(!aggPlanHasStage(optimizedSamplePlan, "$sample"));
-
-// Run an agg pipeline with optimization disabled.
-result = coll.aggregate([{$_internalInhibitOptimization: {}}, {$sample: {size: 1}}]).toArray();
-assert.eq(1, result.length, result);
-
-// Check that $sample hasn't been absorbed by $_internalUnpackBucket.
-sampleSize = 100;
-const unoptimizedSamplePlan = coll.explain().aggregate([{$sample: {size: sampleSize}}]);
-bucketStage = getAggPlanStage(unoptimizedSamplePlan, "$_internalUnpackBucket");
-assert.eq(bucketStage["$_internalUnpackBucket"]["sample"], undefined);
-assert(aggPlanHasStage(unoptimizedSamplePlan, "$sample"));
-
-const unoptimizedResult = coll.aggregate([{$sample: {size: sampleSize}}]).toArray();
-assertUniqueDocuments(unoptimizedResult);
-
-// Check that a sampleSize greater than the number of measurements doesn't cause an infinte loop.
-result = coll.aggregate([{$sample: {size: numDocs + 1}}]).toArray();
-assert.eq(numDocs, result.length, result);
-
-// Check that $lookup against a time-series collection doesn't cache inner pipeline results if it
-// contains a $sample stage.
-result =
- coll.aggregate({$lookup: {from: coll.getName(), as: "docs", pipeline: [{$sample: {size: 1}}]}})
- .toArray();
-
-// Each subquery should be an independent sample by checking that we didn't sample the same document
-// repeatedly. It's sufficient for now to make sure that the seen set contains at least two distinct
-// samples.
-let seen = new Set();
-result.forEach(r => {
- assert.eq(r.docs.length, 1);
- seen.add(r.docs[0]._id);
-});
-assert.gte(seen.size, 2);
+let assertPlanForSample = (explainRes, backupPlanSelected) => {
+ if (backupPlanSelected) {
+ assert(aggPlanHasStage(explainRes, "UNPACK_BUCKET"), explainRes);
+ assert(!aggPlanHasStage(explainRes, "$_internalUnpackBucket"));
+ assert(aggPlanHasStage(explainRes, "$sample"));
+
+ // Verify that execution stats are reported correctly for the UNPACK_BUCKET stage in
+ // explain.
+ const unpackBucketStage = getAggPlanStage(explainRes, "UNPACK_BUCKET");
+ assert.neq(unpackBucketStage, null, explainRes);
+ assert(unpackBucketStage.hasOwnProperty("nBucketsUnpacked"));
+ // In the top-k plan, all of the buckets need to be unpacked.
+ assert.eq(unpackBucketStage.nBucketsUnpacked, nBuckets, unpackBucketStage);
+ } else {
+ // When the trial plan succeeds, any data produced during the trial period will be queued
+ // and returned via the QUEUED_DATA stage. If the trial plan being assessed reached EOF,
+ // then we expect only a QUEUED_DATA stage to appear in explain because all of the necessary
+ // data has already been produced. If the plan is not EOF, then we expect OR
+ // (QUEUED_DATA, <trial plan>). Either way, the presence of the QUEUED_DATA stage indicates
+ // that the trial plan was selected over the backup plan.
+ assert(aggPlanHasStage(explainRes, "QUEUED_DATA"), explainRes);
+ assert(!aggPlanHasStage(explainRes, "$_internalUnpackBucket"));
+ assert(!aggPlanHasStage(explainRes, "$sample"));
+ }
+};
+
+let runSampleTests = (measurementsPerBucket, backupPlanSelected) => {
+ const coll = testDB.getCollection("timeseries_sample");
+ coll.drop();
+
+ const bucketsColl = testDB.getCollection("system.buckets." + coll.getName());
+
+ const timeFieldName = "time";
+ const metaFieldName = "m";
+ assert.commandWorked(testDB.createCollection(
+ coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
+ assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
+
+ let numDocs = nBuckets * measurementsPerBucket;
+ const bulk = coll.initializeUnorderedBulkOp();
+ for (let i = 0; i < numDocs; i++) {
+ bulk.insert(
+ {_id: ObjectId(), [timeFieldName]: ISODate(), [metaFieldName]: i % nBuckets, x: i});
+ }
+ assert.commandWorked(bulk.execute());
+
+ let buckets = bucketsColl.find().toArray();
+ assert.eq(nBuckets, buckets.length, buckets);
+
+ // Check the time-series view to make sure we have the correct number of docs and that there are
+ // no duplicates after sampling.
+ const viewDocs = coll.find({}, {x: 1}).toArray();
+ assert.eq(numDocs, viewDocs.length, viewDocs);
+
+ let sampleSize = 5;
+ let result = coll.aggregate([{$sample: {size: sampleSize}}]).toArray();
+ assert.eq(sampleSize, result.length, result);
+ assertUniqueDocuments(result);
+
+ // Check that we have executed the correct branch of the TrialStage.
+ const optimizedSamplePlan =
+ coll.explain("executionStats").aggregate([{$sample: {size: sampleSize}}]);
+ assertPlanForSample(optimizedSamplePlan, backupPlanSelected);
+
+ // Run an agg pipeline with optimization disabled.
+ result = coll.aggregate([{$_internalInhibitOptimization: {}}, {$sample: {size: 1}}]).toArray();
+ assert.eq(1, result.length, result);
+
+ // Check that $sample hasn't been absorbed by $_internalUnpackBucket when the
+ // sample size is sufficiently large.
+ sampleSize = 100;
+ const unoptimizedSamplePlan = coll.explain().aggregate([{$sample: {size: sampleSize}}]);
+ let bucketStage = getAggPlanStage(unoptimizedSamplePlan, "$_internalUnpackBucket");
+ assert.eq(bucketStage["$_internalUnpackBucket"]["sample"], undefined);
+ assert(aggPlanHasStage(unoptimizedSamplePlan, "$sample"));
+
+ const unoptimizedResult = coll.aggregate([{$sample: {size: sampleSize}}]).toArray();
+ assert.eq(100, unoptimizedResult.length, unoptimizedResult);
+ assertUniqueDocuments(unoptimizedResult);
+
+    // Check that a sampleSize greater than the number of measurements doesn't cause an infinite
+    // loop.
+ result = coll.aggregate([{$sample: {size: numDocs + 1}}]).toArray();
+ assert.eq(numDocs, result.length, result);
+
+ // Check that $lookup against a time-series collection doesn't cache inner pipeline results if
+ // it contains a $sample stage.
+ result =
+ coll.aggregate(
+ {$lookup: {from: coll.getName(), as: "docs", pipeline: [{$sample: {size: 1}}]}})
+ .toArray();
+
+    // Verify that each subquery is an independent sample by checking that we didn't sample the
+    // same document repeatedly. It's sufficient for now to make sure that the seen set contains
+    // at least two distinct samples.
+ let seen = new Set();
+ result.forEach(r => {
+ assert.eq(r.docs.length, 1);
+ seen.add(r.docs[0]._id);
+ });
+ assert.gte(seen.size, 2);
+};
+
+// Test the case where the buckets are only 1% full. Due to the mostly empty buckets, we expect to
+// fall back to the non-optimized top-k algorithm for sampling from a time-series collection.
+runSampleTests(1, true);
+
+// Test the case where the buckets are 95% full. Here we expect the optimized
+// SAMPLE_FROM_TIMESERIES_BUCKET plan to be used.
+runSampleTests(95, false);
MongoRunner.stopMongod(conn);
})();
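A minimal standalone C++ sketch (not part of this patch) of why bucket fullness drives the plan choice exercised above: the optimized sample plan draws a random bucket and a random index j in [0, timeseriesBucketMaxCount), and a draw only produces a document when j lands on an existing measurement (see SampleFromTimeseriesBucket later in this patch). With mostly empty buckets the expected number of draws per sampled measurement grows large, which is why the trial falls back to the top-k plan.

    #include <iostream>

    int main() {
        const int bucketMaxCount = 100;  // mirrors the timeseriesBucketMaxCount parameter set above
        for (int measurementsPerBucket : {1, 95}) {
            const double acceptProbability =
                static_cast<double>(measurementsPerBucket) / bucketMaxCount;
            // The number of draws per accepted sample is geometrically distributed.
            std::cout << measurementsPerBucket << " measurements per bucket: ~"
                      << 1.0 / acceptProbability << " draws per sampled measurement\n";
        }
        return 0;
    }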
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index b6029533ebf..2edde6d77ac 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1187,6 +1187,7 @@ env.Library(
'exec/requires_collection_stage.cpp',
'exec/requires_index_stage.cpp',
'exec/return_key.cpp',
+ 'exec/sample_from_timeseries_bucket.cpp',
'exec/shard_filter.cpp',
'exec/shard_filterer_impl.cpp',
'exec/skip.cpp',
@@ -1198,6 +1199,7 @@ env.Library(
'exec/trial_period_utils.cpp',
'exec/trial_stage.cpp',
'exec/update_stage.cpp',
+ 'exec/unpack_timeseries_bucket.cpp',
'exec/upsert_stage.cpp',
'exec/working_set_common.cpp',
'exec/write_stage_common.cpp',
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
index 981512045ea..0a238aae69c 100644
--- a/src/mongo/db/exec/SConscript
+++ b/src/mongo/db/exec/SConscript
@@ -51,6 +51,16 @@ env.Library(
],
)
+env.Library(
+ target = "bucket_unpacker",
+ source = [
+ "bucket_unpacker.cpp",
+ ],
+ LIBDEPS = [
+ "document_value/document_value",
+ ],
+)
+
sortExecutorEnv = env.Clone()
sortExecutorEnv.InjectThirdParty(libraries=['snappy'])
sortExecutorEnv.Library(
@@ -133,6 +143,7 @@ env.CppUnitTest(
"queued_data_stage_test.cpp",
"sort_test.cpp",
"working_set_test.cpp",
+ "bucket_unpacker_test.cpp",
],
LIBDEPS=[
"$BUILD_DIR/mongo/base",
diff --git a/src/mongo/db/exec/bucket_unpacker.cpp b/src/mongo/db/exec/bucket_unpacker.cpp
new file mode 100644
index 00000000000..37d4ec4a65c
--- /dev/null
+++ b/src/mongo/db/exec/bucket_unpacker.cpp
@@ -0,0 +1,240 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/bucket_unpacker.h"
+#include "mongo/db/timeseries/timeseries_field_names.h"
+
+namespace mongo {
+
+/**
+ * Erase computed meta projection fields if they are present in the exclusion field set.
+ */
+void eraseExcludedComputedMetaProjFields(BucketUnpacker::Behavior unpackerBehavior,
+ BucketSpec* bucketSpec) {
+ if (unpackerBehavior == BucketUnpacker::Behavior::kExclude &&
+ bucketSpec->computedMetaProjFields.size() > 0) {
+ for (auto it = bucketSpec->computedMetaProjFields.begin();
+ it != bucketSpec->computedMetaProjFields.end();) {
+ if (bucketSpec->fieldSet.find(*it) != bucketSpec->fieldSet.end()) {
+ it = bucketSpec->computedMetaProjFields.erase(it);
+ } else {
+ it++;
+ }
+ }
+ }
+}
+
+BucketUnpacker::BucketUnpacker(BucketSpec spec, Behavior unpackerBehavior) {
+ setBucketSpecAndBehavior(std::move(spec), unpackerBehavior);
+}
+
+// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the
+// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in
+// the table, this helper returns the measurement count corresponding to the table record.
+// Otherwise, the table is probed for the first entry {b_i, S_i} such that
+// 'targetTimestampObjSize' < S_i. The entry before it, which is the lower bound of the interval
+// containing 'targetTimestampObjSize', is then used as the base for linear interpolation to
+// compute the measurement count corresponding to the 'targetTimestampObjSize' provided.
+int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) {
+ auto currentInterval =
+ std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable),
+ std::end(BucketUnpacker::kTimestampObjSizeTable),
+ [&](const auto& entry) { return targetTimestampObjSize <= entry.second; });
+
+ if (currentInterval->second == targetTimestampObjSize) {
+ return currentInterval->first;
+ }
+    // 'currentInterval' points to the first table entry larger than 'targetTimestampObjSize'; the
+    // interval that actually covers the object size is the one before it.
+ tassert(5422104,
+ "currentInterval should not point to the first table entry",
+ currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin());
+ --currentInterval;
+
+ auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin());
+
+ return currentInterval->first +
+ ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey));
+}
+
+void BucketUnpacker::reset(BSONObj&& bucket) {
+ _fieldIters.clear();
+ _timeFieldIter = boost::none;
+
+ _bucket = std::move(bucket);
+ uassert(5346510, "An empty bucket cannot be unpacked", !_bucket.isEmpty());
+
+ auto&& dataRegion = _bucket.getField(timeseries::kBucketDataFieldName).Obj();
+ if (dataRegion.isEmpty()) {
+ // If the data field of a bucket is present but it holds an empty object, there's nothing to
+ // unpack.
+ return;
+ }
+
+ auto&& timeFieldElem = dataRegion.getField(_spec.timeField);
+ uassert(5346700,
+ "The $_internalUnpackBucket stage requires the data region to have a timeField object",
+ timeFieldElem);
+
+ _timeFieldIter = BSONObjIterator{timeFieldElem.Obj()};
+
+ _metaValue = _bucket[timeseries::kBucketMetaFieldName];
+ if (_spec.metaField) {
+ // The spec indicates that there might be a metadata region. Missing metadata in
+ // measurements is expressed with missing metadata in a bucket. But we disallow undefined
+ // since the undefined BSON type is deprecated.
+        uassert(5369600,
+                "The $_internalUnpackBucket stage allows metadata to be absent; otherwise, it "
+                "must not be the deprecated undefined bson type",
+                !_metaValue || _metaValue.type() != BSONType::Undefined);
+ } else {
+ // If the spec indicates that the time series collection has no metadata field, then we
+ // should not find a metadata region in the underlying bucket documents.
+ uassert(5369601,
+ "The $_internalUnpackBucket stage expects buckets to have missing metadata regions "
+ "if the metaField parameter is not provided",
+ !_metaValue);
+ }
+
+ // Walk the data region of the bucket, and decide if an iterator should be set up based on the
+ // include or exclude case.
+ for (auto&& elem : dataRegion) {
+ auto& colName = elem.fieldNameStringData();
+ if (colName == _spec.timeField) {
+ // Skip adding a FieldIterator for the timeField since the timestamp value from
+ // _timeFieldIter can be placed accordingly in the materialized measurement.
+ continue;
+ }
+
+ // Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or
+ // _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'.
+ if (determineIncludeField(colName, _unpackerBehavior, _spec)) {
+ _fieldIters.emplace_back(colName.toString(), BSONObjIterator{elem.Obj()});
+ }
+ }
+
+ // Update computed meta projections with values from this bucket.
+ if (!_spec.computedMetaProjFields.empty()) {
+ for (auto&& name : _spec.computedMetaProjFields) {
+ _computedMetaProjections[name] = _bucket[name];
+ }
+ }
+
+ // Save the measurement count for the bucket.
+ _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize());
+}
+
+void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) {
+ _includeMetaField = eraseMetaFromFieldSetAndDetermineIncludeMeta(behavior, &bucketSpec);
+ _includeTimeField = determineIncludeTimeField(behavior, &bucketSpec);
+ _unpackerBehavior = behavior;
+ eraseExcludedComputedMetaProjFields(behavior, &bucketSpec);
+ _spec = std::move(bucketSpec);
+}
+
+const std::set<StringData> BucketUnpacker::reservedBucketFieldNames = {
+ timeseries::kBucketIdFieldName,
+ timeseries::kBucketDataFieldName,
+ timeseries::kBucketMetaFieldName,
+ timeseries::kBucketControlFieldName};
+
+void BucketUnpacker::addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames) {
+ for (auto&& field : computedFieldNames) {
+ _spec.computedMetaProjFields.emplace_back(field.toString());
+ }
+}
+
+Document BucketUnpacker::getNext() {
+ tassert(5521503, "'getNext()' requires the bucket to be owned", _bucket.isOwned());
+ tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext());
+
+ auto measurement = MutableDocument{};
+ auto&& timeElem = _timeFieldIter->next();
+ if (_includeTimeField) {
+ measurement.addField(_spec.timeField, Value{timeElem});
+ }
+
+ // Includes metaField when we're instructed to do so and metaField value exists.
+ if (_includeMetaField && _metaValue) {
+ measurement.addField(*_spec.metaField, Value{_metaValue});
+ }
+
+ auto& currentIdx = timeElem.fieldNameStringData();
+ for (auto&& [colName, colIter] : _fieldIters) {
+ if (auto&& elem = *colIter; colIter.more() && elem.fieldNameStringData() == currentIdx) {
+ measurement.addField(colName, Value{elem});
+ colIter.advance(elem);
+ }
+ }
+
+ // Add computed meta projections.
+ for (auto&& name : _spec.computedMetaProjFields) {
+ measurement.addField(name, Value{_computedMetaProjections[name]});
+ }
+
+ return measurement.freeze();
+}
+
+Document BucketUnpacker::extractSingleMeasurement(int j) {
+    tassert(5422101,
+            "'extractSingleMeasurement' expects j to be greater than or equal to zero and less "
+            "than the number of measurements in a bucket",
+            j >= 0 && j < _numberOfMeasurements);
+
+ auto measurement = MutableDocument{};
+
+ auto rowKey = std::to_string(j);
+ auto targetIdx = StringData{rowKey};
+ auto&& dataRegion = _bucket.getField(timeseries::kBucketDataFieldName).Obj();
+
+ if (_includeMetaField && !_metaValue.isNull()) {
+ measurement.addField(*_spec.metaField, Value{_metaValue});
+ }
+
+ for (auto&& dataElem : dataRegion) {
+ auto colName = dataElem.fieldNameStringData();
+ if (!determineIncludeField(colName, _unpackerBehavior, _spec)) {
+ continue;
+ }
+ auto value = dataElem[targetIdx];
+ if (value) {
+ measurement.addField(dataElem.fieldNameStringData(), Value{value});
+ }
+ }
+
+ // Add computed meta projections.
+ for (auto&& name : _spec.computedMetaProjFields) {
+ measurement.addField(name, Value{_computedMetaProjections[name]});
+ }
+
+ return measurement.freeze();
+}
+} // namespace mongo
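The interpolation in computeMeasurementCount() above can be checked by hand: a row key with d digits produces a timestamp BSONElement of 1 (type byte) + d (field name) + 1 (null terminator) + 8 (Date) = 10 + d bytes, which is the '10 + nDigitsInRowKey' divisor. For example, a timestamp column of 30003 bytes lies in the 4-digit row-key interval whose lower bound is {1000, 12895}, giving 1000 + (30003 - 12895) / 14 = 2222 measurements, the value asserted in the unit test below. A minimal standalone sketch (not part of this patch) of the same arithmetic, with BSONObj::kMinBSONLength written out as 5:

    #include <array>
    #include <cassert>
    #include <cstddef>
    #include <utility>

    int approximateMeasurementCount(int timestampObjSize) {
        // {measurement count, timestamp BSONObj size} at the lower bound of each row-key interval.
        static constexpr std::array<std::pair<int, int>, 8> table{{{0, 5},
                                                                   {10, 115},
                                                                   {100, 1195},
                                                                   {1000, 12895},
                                                                   {10000, 138895},
                                                                   {100000, 1488895},
                                                                   {1000000, 15888895},
                                                                   {10000000, 168888895}}};
        for (std::size_t i = table.size() - 1; i > 0; --i) {
            const auto& [count, size] = table[i - 1];
            if (timestampObjSize >= size) {
                // Row keys in this interval have i digits, so each timestamp element occupies
                // 1 (type) + i (field name) + 1 (null) + 8 (Date) = 10 + i bytes.
                return count + (timestampObjSize - size) / static_cast<int>(10 + i);
            }
        }
        return 0;
    }

    int main() {
        assert(approximateMeasurementCount(1195) == 100);    // exact table entry
        assert(approximateMeasurementCount(30003) == 2222);  // matches the unit test below
        return 0;
    }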
diff --git a/src/mongo/db/exec/bucket_unpacker.h b/src/mongo/db/exec/bucket_unpacker.h
new file mode 100644
index 00000000000..9c505bc1682
--- /dev/null
+++ b/src/mongo/db/exec/bucket_unpacker.h
@@ -0,0 +1,214 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <set>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/exec/document_value/document.h"
+
+namespace mongo {
+/**
+ * Carries parameters for unpacking a bucket.
+ */
+struct BucketSpec {
+ // The user-supplied timestamp field name specified during time-series collection creation.
+ std::string timeField;
+
+ // An optional user-supplied metadata field name specified during time-series collection
+ // creation. This field name is used during materialization of metadata fields of a measurement
+ // after unpacking.
+ boost::optional<std::string> metaField;
+
+ // The set of field names in the data region that should be included or excluded.
+ std::set<std::string> fieldSet;
+
+ // Vector of computed meta field projection names. Added at the end of materialized
+ // measurements.
+ std::vector<std::string> computedMetaProjFields;
+};
+
+/**
+ * BucketUnpacker will unpack bucket fields for metadata and the provided fields.
+ */
+class BucketUnpacker {
+public:
+ // A table that is useful for interpolations between the number of measurements in a bucket and
+ // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i,
+ // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the
+ // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are
+ // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99],
+ // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to
+ // exceed the server BSON object limit of 16 MB.
+ static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{
+ {{0, BSONObj::kMinBSONLength},
+ {10, 115},
+ {100, 1195},
+ {1000, 12895},
+ {10000, 138895},
+ {100000, 1488895},
+ {1000000, 15888895},
+ {10000000, 168888895}}};
+
+ /**
+ * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series
+ * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time.
+ */
+ static int computeMeasurementCount(int targetTimestampObjSize);
+
+ // Set of field names reserved for time-series buckets.
+ static const std::set<StringData> reservedBucketFieldNames;
+
+ // When BucketUnpacker is created with kInclude it must produce measurements that contain the
+ // set of fields. Otherwise, if the kExclude option is used, the measurements will include the
+ // set difference between all fields in the bucket and the provided fields.
+ enum class Behavior { kInclude, kExclude };
+
+ BucketUnpacker(BucketSpec spec, Behavior unpackerBehavior);
+
+ /**
+ * This method will continue to materialize Documents until the bucket is exhausted. A
+ * precondition of this method is that 'hasNext()' must be true.
+ */
+ Document getNext();
+
+ /**
+ * This method will extract the j-th measurement from the bucket. A precondition of this method
+     * is that j >= 0 && j < the number of measurements within the underlying bucket.
+ */
+ Document extractSingleMeasurement(int j);
+
+ bool hasNext() const {
+ return _timeFieldIter && _timeFieldIter->more();
+ }
+
+ /**
+ * This resets the unpacker to prepare to unpack a new bucket described by the given document.
+ */
+ void reset(BSONObj&& bucket);
+
+ Behavior behavior() const {
+ return _unpackerBehavior;
+ }
+
+ const BucketSpec& bucketSpec() const {
+ return _spec;
+ }
+
+ const BSONObj& bucket() const {
+ return _bucket;
+ }
+
+ bool includeMetaField() const {
+ return _includeMetaField;
+ }
+
+ bool includeTimeField() const {
+ return _includeTimeField;
+ }
+
+ int32_t numberOfMeasurements() const {
+ return _numberOfMeasurements;
+ }
+
+ void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior);
+
+ // Add computed meta projection names to the bucket specification.
+ void addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames);
+
+private:
+ BucketSpec _spec;
+ Behavior _unpackerBehavior;
+
+ // Iterates the timestamp section of the bucket to drive the unpacking iteration.
+ boost::optional<BSONObjIterator> _timeFieldIter;
+
+ // A flag used to mark that the timestamp value should be materialized in measurements.
+ bool _includeTimeField;
+
+ // A flag used to mark that a bucket's metadata value should be materialized in measurements.
+ bool _includeMetaField;
+
+ // The bucket being unpacked.
+ BSONObj _bucket;
+
+ // Since the metadata value is the same across all materialized measurements we can cache the
+ // metadata BSONElement in the reset phase and use it to materialize the metadata in each
+ // measurement.
+ BSONElement _metaValue;
+
+ // Iterators used to unpack the columns of the above bucket that are populated during the reset
+ // phase according to the provided 'Behavior' and 'BucketSpec'.
+ std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters;
+
+ // Map <name, BSONElement> for the computed meta field projections. Updated for
+ // every bucket upon reset().
+ stdx::unordered_map<std::string, BSONElement> _computedMetaProjections;
+
+ // The number of measurements in the bucket.
+ int32_t _numberOfMeasurements = 0;
+};
+
+/**
+ * Removes metaField from the field set and returns a boolean indicating whether metaField should be
+ * included in the materialized measurements. Always returns false if metaField does not exist.
+ */
+inline bool eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior unpackerBehavior,
+ BucketSpec* bucketSpec) {
+ if (!bucketSpec->metaField) {
+ return false;
+ } else if (auto itr = bucketSpec->fieldSet.find(*bucketSpec->metaField);
+ itr != bucketSpec->fieldSet.end()) {
+ bucketSpec->fieldSet.erase(itr);
+ return unpackerBehavior == BucketUnpacker::Behavior::kInclude;
+ } else {
+ return unpackerBehavior == BucketUnpacker::Behavior::kExclude;
+ }
+}
+
+/**
+ * Determines if timestamp values should be included in the materialized measurements.
+ */
+inline bool determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior,
+ BucketSpec* bucketSpec) {
+ return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
+ (bucketSpec->fieldSet.find(bucketSpec->timeField) != bucketSpec->fieldSet.end());
+}
+
+/**
+ * Determines if an arbitrary field should be included in the materialized measurements.
+ */
+inline bool determineIncludeField(StringData fieldName,
+ BucketUnpacker::Behavior unpackerBehavior,
+ const BucketSpec& bucketSpec) {
+ return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
+ (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end());
+}
+} // namespace mongo
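A minimal usage sketch (not part of this patch) of the API declared above, assuming it links against the new 'bucket_unpacker' library target. With Behavior::kExclude and an empty field set, the time field, the metadata, and every stored data column are materialized, mirroring the unit tests below:

    #include "mongo/bson/json.h"
    #include "mongo/db/exec/bucket_unpacker.h"

    namespace mongo {
    void unpackOneBucket() {
        BucketSpec spec;
        spec.timeField = "time";
        spec.metaField = std::string{"myMeta"};

        BucketUnpacker unpacker{std::move(spec), BucketUnpacker::Behavior::kExclude};
        unpacker.reset(fromjson(
            "{meta: {sensor: 1}, data: {_id: {'0': 1, '1': 2}, time: {'0': 1, '1': 2}, a: {'0': 10}}}"));

        // Materializes {time: 1, myMeta: {sensor: 1}, _id: 1, a: 10} followed by
        // {time: 2, myMeta: {sensor: 1}, _id: 2}.
        while (unpacker.hasNext()) {
            Document measurement = unpacker.getNext();
            (void)measurement;
        }
    }
    }  // namespace mongo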
diff --git a/src/mongo/db/exec/bucket_unpacker_test.cpp b/src/mongo/db/exec/bucket_unpacker_test.cpp
new file mode 100644
index 00000000000..311f626854e
--- /dev/null
+++ b/src/mongo/db/exec/bucket_unpacker_test.cpp
@@ -0,0 +1,596 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/json.h"
+#include "mongo/db/exec/bucket_unpacker.h"
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+constexpr auto kUserDefinedTimeName = "time"_sd;
+constexpr auto kUserDefinedMetaName = "myMeta"_sd;
+
+/**
+ * A fixture to test the BucketUnpacker
+ */
+class BucketUnpackerTest : public mongo::unittest::Test {
+public:
+ /**
+ * Makes a fresh BucketUnpacker, resets it to unpack the given 'bucket', and then returns it
+ * before actually doing any unpacking.
+ */
+ BucketUnpacker makeBucketUnpacker(std::set<std::string> fields,
+ BucketUnpacker::Behavior behavior,
+ BSONObj bucket,
+ boost::optional<std::string> metaFieldName = boost::none) {
+ auto spec = BucketSpec{kUserDefinedTimeName.toString(), metaFieldName, std::move(fields)};
+
+ BucketUnpacker unpacker{std::move(spec), behavior};
+ unpacker.reset(std::move(bucket));
+ return unpacker;
+ }
+
+ /**
+ * Constructs a 'BucketUnpacker' based on the provided parameters and then resets it to unpack
+ * the given 'bucket'. Asserts that 'reset()' throws the given 'errorCode'.
+ */
+ void assertUnpackerThrowsCode(std::set<std::string> fields,
+ BucketUnpacker::Behavior behavior,
+ BSONObj bucket,
+ boost::optional<std::string> metaFieldName,
+ int errorCode) {
+ auto spec = BucketSpec{kUserDefinedTimeName.toString(), metaFieldName, std::move(fields)};
+ BucketUnpacker unpacker{std::move(spec), behavior};
+ ASSERT_THROWS_CODE(unpacker.reset(std::move(bucket)), AssertionException, errorCode);
+ }
+
+ void assertGetNext(BucketUnpacker& unpacker, const Document& expected) {
+ ASSERT_DOCUMENT_EQ(unpacker.getNext(), expected);
+ }
+};
+
+TEST_F(BucketUnpackerTest, UnpackBasicIncludeAllMeasurementFields) {
+ std::set<std::string> fields{
+ "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kInclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker,
+ Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker,
+ Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a :2, b: 1}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, ExcludeASingleField) {
+ std::set<std::string> fields{"b"};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker,
+ Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker,
+ Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a: 2}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, EmptyIncludeGetsEmptyMeasurements) {
+ std::set<std::string> fields{};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kInclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+
+ // We should produce empty documents, one per measurement in the bucket.
+ for (auto idx = 0; idx < 2; ++idx) {
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document(fromjson("{}")));
+ }
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, EmptyExcludeMaterializesAllFields) {
+ std::set<std::string> fields{};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker,
+ Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker,
+ Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a :2, b: 1}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, SparseColumnsWhereOneColumnIsExhaustedBeforeTheOther) {
+ std::set<std::string> fields{};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "a:{'0':1}, b:{'1':1}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker,
+ Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")});
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker,
+ Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, b: 1}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, UnpackBasicIncludeWithDollarPrefix) {
+ std::set<std::string> fields{
+ "_id", "$a", "b", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString()};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "$a:{'0':1, '1':2}, b:{'1':1}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kInclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker,
+ Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, $a: 1}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(
+ unpacker,
+ Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, $a: 2, b: 1}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, BucketsWithMetadataOnly) {
+ std::set<std::string> fields{};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}")});
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, UnorderedRowKeysDoesntAffectMaterialization) {
+ std::set<std::string> fields{};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, "
+ "'2': 3}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 3, myMeta: {m1: 999, m2: 9999}, _id: 3}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, MissingMetaFieldDoesntMaterializeMetadata) {
+ std::set<std::string> fields{};
+
+ auto bucket = fromjson("{data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 1, _id: 1}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 2, _id: 2}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 3, _id: 3}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, ExcludedMetaFieldDoesntMaterializeMetadataWhenBucketHasMeta) {
+ std::set<std::string> fields{kUserDefinedMetaName.toString()};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, "
+ "'2': 3}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 1, _id: 1}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 2, _id: 2}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 3, _id: 3}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUndefinedMeta) {
+ std::set<std::string> fields{};
+
+ auto bucket = fromjson(
+ "{meta: undefined, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}");
+
+ assertUnpackerThrowsCode(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString(),
+ 5369600);
+}
+
+TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUnexpectedMeta) {
+ std::set<std::string> fields{};
+
+ auto bucket = fromjson(
+ "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, "
+ "'2': 3}}}");
+
+ assertUnpackerThrowsCode(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ boost::none /* no metaField provided */,
+ 5369601);
+}
+
+TEST_F(BucketUnpackerTest, NullMetaInBucketMaterializesAsNull) {
+ std::set<std::string> fields{};
+
+ auto bucket =
+ fromjson("{meta: null, data: {_id: {'1':4, '0':5, '2':6}, time: {'1':4, '0': 5, '2': 6}}}");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 4, myMeta: null, _id: 4}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 5, myMeta: null, _id: 5}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 6, myMeta: null, _id: 6}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, GetNextHandlesMissingMetaInBucket) {
+ std::set<std::string> fields{};
+
+ auto bucket = fromjson(R"(
+{
+ data: {
+ _id: {'1':4, '0':5, '2':6},
+ time: {'1':4, '0': 5, '2': 6}
+ }
+})");
+
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ std::move(bucket),
+ kUserDefinedMetaName.toString());
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 4, _id: 4}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 5, _id: 5}")});
+
+ ASSERT_TRUE(unpacker.hasNext());
+ assertGetNext(unpacker, Document{fromjson("{time: 6, _id: 6}")});
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, EmptyDataRegionInBucketIsTolerated) {
+ std::set<std::string> fields{};
+
+ auto bucket =
+ Document{{"_id", 1}, {"meta", Document{{"m1", 999}, {"m2", 9999}}}, {"data", Document{}}};
+ auto unpacker = makeBucketUnpacker(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ bucket.toBson(),
+ kUserDefinedMetaName.toString());
+ ASSERT_FALSE(unpacker.hasNext());
+}
+
+TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnEmptyBucket) {
+ std::set<std::string> fields{};
+
+ auto bucket = Document{};
+ assertUnpackerThrowsCode(std::move(fields),
+ BucketUnpacker::Behavior::kExclude,
+ bucket.toBson(),
+ kUserDefinedMetaName.toString(),
+ 5346510);
+}
+
+TEST_F(BucketUnpackerTest, EraseMetaFromFieldSetAndDetermineIncludeMeta) {
+ // Tests a missing 'metaField' in the spec.
+ std::set<std::string> empFields{};
+ auto spec = BucketSpec{kUserDefinedTimeName.toString(), boost::none, std::move(empFields)};
+ ASSERT_FALSE(
+ eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kInclude, &spec));
+
+ // Tests a spec with 'metaField' in include list.
+ std::set<std::string> fields{kUserDefinedMetaName.toString()};
+ auto specWithMetaInclude = BucketSpec{
+ kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
+ ASSERT_TRUE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kInclude,
+ &specWithMetaInclude));
+ ASSERT_EQ(specWithMetaInclude.fieldSet.count(kUserDefinedMetaName.toString()), 0);
+
+ std::set<std::string> fieldsNoMeta{"foo"};
+ auto specWithFooInclude = BucketSpec{
+ kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fieldsNoMeta)};
+ ASSERT_TRUE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kExclude,
+ &specWithFooInclude));
+ ASSERT_FALSE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kInclude,
+ &specWithFooInclude));
+
+ // Tests a spec with 'metaField' not in exclude list.
+ std::set<std::string> excludeFields{};
+ auto specWithMetaExclude = BucketSpec{
+ kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(excludeFields)};
+ ASSERT_TRUE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kExclude,
+ &specWithMetaExclude));
+ ASSERT_FALSE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kInclude,
+ &specWithMetaExclude));
+}
+
+TEST_F(BucketUnpackerTest, DetermineIncludeTimeField) {
+ std::set<std::string> fields{kUserDefinedTimeName.toString()};
+ auto spec = BucketSpec{
+ kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
+ ASSERT_TRUE(determineIncludeTimeField(BucketUnpacker::Behavior::kInclude, &spec));
+ ASSERT_FALSE(determineIncludeTimeField(BucketUnpacker::Behavior::kExclude, &spec));
+}
+
+TEST_F(BucketUnpackerTest, DetermineIncludeField) {
+ std::string includedMeasurementField = "measurementField1";
+ std::string excludedMeasurementField = "measurementField2";
+ std::set<std::string> fields{kUserDefinedTimeName.toString(), includedMeasurementField};
+ auto spec = BucketSpec{
+ kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
+
+ ASSERT_TRUE(determineIncludeField(
+ kUserDefinedTimeName.toString(), BucketUnpacker::Behavior::kInclude, spec));
+ ASSERT_FALSE(determineIncludeField(
+ kUserDefinedTimeName.toString(), BucketUnpacker::Behavior::kExclude, spec));
+
+ ASSERT_TRUE(
+ determineIncludeField(includedMeasurementField, BucketUnpacker::Behavior::kInclude, spec));
+ ASSERT_FALSE(
+ determineIncludeField(includedMeasurementField, BucketUnpacker::Behavior::kExclude, spec));
+
+ ASSERT_FALSE(
+ determineIncludeField(excludedMeasurementField, BucketUnpacker::Behavior::kInclude, spec));
+ ASSERT_TRUE(
+ determineIncludeField(excludedMeasurementField, BucketUnpacker::Behavior::kExclude, spec));
+}
+
+/**
+ * Manually computes the timestamp object size for n timestamps.
+ */
+auto expectedTimestampObjSize(int32_t rowKeyOffset, int32_t n) {
+ BSONObjBuilder bob;
+ for (auto i = 0; i < n; ++i) {
+ bob.appendDate(std::to_string(i + rowKeyOffset), Date_t::now());
+ }
+ return bob.done().objsize();
+}
+
+TEST_F(BucketUnpackerTest, ExtractSingleMeasurement) {
+ std::set<std::string> fields{
+ "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
+ auto spec = BucketSpec{
+ kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
+ auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude};
+
+ auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
+ auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
+ auto d3 = dateFromISOString("2020-02-17T02:00:00.000Z").getValue();
+ auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
+ << BSON("_id" << BSON("0" << 1 << "1" << 2 << "2" << 3) << "time"
+ << BSON("0" << d1 << "1" << d2 << "2" << d3) << "a"
+ << BSON("0" << 1 << "1" << 2 << "2" << 3) << "b"
+ << BSON("1" << 1 << "2" << 2)));
+
+ unpacker.reset(std::move(bucket));
+
+ auto next = unpacker.extractSingleMeasurement(0);
+ auto expected = Document{
+ {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ next = unpacker.extractSingleMeasurement(2);
+ expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
+ {"_id", 3},
+ {"time", d3},
+ {"a", 3},
+ {"b", 2}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ next = unpacker.extractSingleMeasurement(1);
+ expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
+ {"_id", 2},
+ {"time", d2},
+ {"a", 2},
+ {"b", 1}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ // Can we extract the middle element again?
+ next = unpacker.extractSingleMeasurement(1);
+ ASSERT_DOCUMENT_EQ(next, expected);
+}
+
+TEST_F(BucketUnpackerTest, ExtractSingleMeasurementSparse) {
+ std::set<std::string> fields{
+ "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
+ auto spec = BucketSpec{
+ kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
+ auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude};
+
+ auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
+ auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
+ auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
+ << BSON("_id" << BSON("0" << 1 << "1" << 2) << "time"
+ << BSON("0" << d1 << "1" << d2) << "a" << BSON("0" << 1)
+ << "b" << BSON("1" << 1)));
+
+ unpacker.reset(std::move(bucket));
+ auto next = unpacker.extractSingleMeasurement(1);
+ auto expected = Document{
+ {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 2}, {"time", d2}, {"b", 1}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ // Can we extract the same element again?
+ next = unpacker.extractSingleMeasurement(1);
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ next = unpacker.extractSingleMeasurement(0);
+ expected = Document{
+ {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ // Can we extract the same element twice in a row?
+ next = unpacker.extractSingleMeasurement(0);
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ next = unpacker.extractSingleMeasurement(0);
+ ASSERT_DOCUMENT_EQ(next, expected);
+}
+
+TEST_F(BucketUnpackerTest, ComputeMeasurementCountLowerBoundsAreCorrect) {
+ // The last table entry is a sentinel for an upper bound on the interval that covers measurement
+ // counts up to 16 MB.
+ const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
+
+ // Test the case when the target size hits a table entry which represents the lower bound of an
+ // interval.
+ for (size_t index = 0; index < maxTableEntry; ++index) {
+ auto interval = BucketUnpacker::kTimestampObjSizeTable[index];
+ ASSERT_EQ(interval.first, BucketUnpacker::computeMeasurementCount(interval.second));
+ }
+}
+
+TEST_F(BucketUnpackerTest, ComputeMeasurementCountUpperBoundsAreCorrect) {
+ const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
+
+ // The lower bound sizes of each interval in the kTimestampObjSizeTable are hardcoded. Use this
+ // fact and walk the table backwards to check the correctness of the S_i'th interval's upper
+ // bound by using the lower bound size for the S_i+1 interval and subtracting the BSONObj size
+ // containing one timestamp with the appropriate rowKey.
+ std::pair<int, int> currentInterval;
+ auto currentIntervalSize = 0;
+ auto currentIntervalCount = 0;
+ auto size = 0;
+ for (size_t index = maxTableEntry; index > 0; --index) {
+ currentInterval = BucketUnpacker::kTimestampObjSizeTable[index];
+ currentIntervalSize = currentInterval.second;
+ currentIntervalCount = currentInterval.first;
+ auto rowKey = currentIntervalCount - 1;
+ size = expectedTimestampObjSize(rowKey, 1);
+ // We need to add back the kMinBSONLength since it's subtracted out.
+ ASSERT_EQ(currentIntervalCount - 1,
+ BucketUnpacker::computeMeasurementCount(currentIntervalSize - size +
+ BSONObj::kMinBSONLength));
+ }
+}
+
+TEST_F(BucketUnpackerTest, ComputeMeasurementCountAllPointsInSmallerIntervals) {
+ // Test all values for some of the smaller intervals up to 100 measurements.
+ for (auto bucketCount = 0; bucketCount < 25; ++bucketCount) {
+ auto size = expectedTimestampObjSize(0, bucketCount);
+ ASSERT_EQ(bucketCount, BucketUnpacker::computeMeasurementCount(size));
+ }
+}
+
+TEST_F(BucketUnpackerTest, ComputeMeasurementCountInLargerIntervals) {
+ ASSERT_EQ(2222, BucketUnpacker::computeMeasurementCount(30003));
+ ASSERT_EQ(11111, BucketUnpacker::computeMeasurementCount(155560));
+ ASSERT_EQ(449998, BucketUnpacker::computeMeasurementCount(7088863));
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index b80b22710e6..bf21a599597 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -878,4 +878,29 @@ struct UnionWithStats final : public SpecificStats {
PlanSummaryStats planSummaryStats;
};
+struct UnpackTimeseriesBucketStats final : public SpecificStats {
+ std::unique_ptr<SpecificStats> clone() const final {
+ return std::make_unique<UnpackTimeseriesBucketStats>(*this);
+ }
+
+ uint64_t estimateObjectSizeInBytes() const {
+ return sizeof(*this);
+ }
+
+ size_t nBucketsUnpacked = 0u;
+};
+
+struct SampleFromTimeseriesBucketStats final : public SpecificStats {
+ std::unique_ptr<SpecificStats> clone() const final {
+ return std::make_unique<SampleFromTimeseriesBucketStats>(*this);
+ }
+
+ uint64_t estimateObjectSizeInBytes() const {
+ return sizeof(*this);
+ }
+
+ size_t nBucketsDiscarded = 0u;
+ size_t dupsTested = 0u;
+ size_t dupsDropped = 0u;
+};
} // namespace mongo
diff --git a/src/mongo/db/exec/sample_from_timeseries_bucket.cpp b/src/mongo/db/exec/sample_from_timeseries_bucket.cpp
new file mode 100644
index 00000000000..2d0fe7e72aa
--- /dev/null
+++ b/src/mongo/db/exec/sample_from_timeseries_bucket.cpp
@@ -0,0 +1,121 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/exec/sample_from_timeseries_bucket.h"
+#include "mongo/db/timeseries/timeseries_field_names.h"
+
+namespace mongo {
+const char* SampleFromTimeseriesBucket::kStageType = "SAMPLE_FROM_TIMESERIES_BUCKET";
+
+SampleFromTimeseriesBucket::SampleFromTimeseriesBucket(ExpressionContext* expCtx,
+ WorkingSet* ws,
+ std::unique_ptr<PlanStage> child,
+ BucketUnpacker bucketUnpacker,
+ int maxConsecutiveAttempts,
+ long long sampleSize,
+ int bucketMaxCount)
+ : PlanStage{kStageType, expCtx},
+ _ws{*ws},
+ _bucketUnpacker{std::move(bucketUnpacker)},
+ _maxConsecutiveAttempts{maxConsecutiveAttempts},
+ _sampleSize{sampleSize},
+ _bucketMaxCount{bucketMaxCount} {
+    tassert(5521500, "sampleSize must be gte 0", sampleSize >= 0);
+ tassert(5521501, "bucketMaxCount must be gt 0", bucketMaxCount > 0);
+
+ _children.emplace_back(std::move(child));
+}
+
+void SampleFromTimeseriesBucket::materializeMeasurement(int64_t measurementIdx,
+ WorkingSetMember* member) {
+ auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(measurementIdx);
+
+ member->keyData.clear();
+ member->recordId = {};
+ member->doc = {{}, std::move(sampledDocument)};
+ member->transitionToOwnedObj();
+}
+
+std::unique_ptr<PlanStageStats> SampleFromTimeseriesBucket::getStats() {
+ _commonStats.isEOF = isEOF();
+ auto ret = std::make_unique<PlanStageStats>(_commonStats, stageType());
+ ret->specific = std::make_unique<SampleFromTimeseriesBucketStats>(_specificStats);
+ ret->children.emplace_back(child()->getStats());
+ return ret;
+}
+
+PlanStage::StageState SampleFromTimeseriesBucket::doWork(WorkingSetID* out) {
+ if (isEOF()) {
+ return PlanStage::IS_EOF;
+ }
+
+ auto id = WorkingSet::INVALID_ID;
+ auto status = child()->work(&id);
+
+ if (PlanStage::ADVANCED == status) {
+ auto member = _ws.get(id);
+
+ auto bucket = member->doc.value().toBson();
+ _bucketUnpacker.reset(std::move(bucket));
+
+ auto& prng = expCtx()->opCtx->getClient()->getPrng();
+ auto j = prng.nextInt64(_bucketMaxCount);
+
+ if (j < _bucketUnpacker.numberOfMeasurements()) {
+ auto bucketId = _bucketUnpacker.bucket()[timeseries::kBucketIdFieldName];
+ auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j};
+
+ ++_specificStats.dupsTested;
+ if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) {
+ materializeMeasurement(j, member);
+ ++_nSampledSoFar;
+ _worksSinceLastAdvanced = 0;
+ *out = id;
+ } else {
+ ++_specificStats.dupsDropped;
+ ++_worksSinceLastAdvanced;
+ _ws.free(id);
+ return PlanStage::NEED_TIME;
+ }
+ } else {
+ ++_specificStats.nBucketsDiscarded;
+ ++_worksSinceLastAdvanced;
+ _ws.free(id);
+ return PlanStage::NEED_TIME;
+ }
+ uassert(5521504,
+ str::stream() << kStageType << " could not find a non-duplicate measurement after "
+ << _worksSinceLastAdvanced << " attempts",
+ _worksSinceLastAdvanced < _maxConsecutiveAttempts);
+ } else if (PlanStage::NEED_YIELD == status) {
+ *out = id;
+ }
+ return status;
+}
+} // namespace mongo
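Stripped of the PlanStage machinery, the acceptance test in doWork() above amounts to drawing an index uniformly from [0, bucketMaxCount) and keeping the measurement only if that index lands inside the bucket and has not been returned before. A minimal sketch under those assumptions follows; the 'Bucket' type is a hypothetical stand-in for an unpacked bucket document, and a std::mt19937_64 replaces the client's PRNG.

#include <cstdint>
#include <optional>
#include <random>
#include <set>
#include <utility>

// Hypothetical stand-in for an unpacked bucket document.
struct Bucket {
    int64_t id;                    // Stand-in for the bucket's OID.
    int64_t numberOfMeasurements;  // How many measurements the bucket actually holds.
};

// One ARHASH iteration: returns the accepted (bucketId, measurementIndex) pair, or
// std::nullopt on a "miss" (index past the end of the bucket, or an already-seen duplicate).
std::optional<std::pair<int64_t, int64_t>> arhashIteration(
    const Bucket& bucket,
    int64_t bucketMaxCount,
    std::mt19937_64& prng,
    std::set<std::pair<int64_t, int64_t>>& seen) {
    std::uniform_int_distribution<int64_t> dist(0, bucketMaxCount - 1);
    const int64_t j = dist(prng);
    if (j >= bucket.numberOfMeasurements)
        return std::nullopt;  // Miss: the bucket is not full enough to cover index j.
    if (!seen.insert({bucket.id, j}).second)
        return std::nullopt;  // Miss: this exact measurement was already sampled.
    return std::make_pair(bucket.id, j);
}

Because the index is always drawn from the full [0, bucketMaxCount) range, a bucket's chance of yielding a measurement is proportional to how full it is, which keeps the sample uniform over measurements rather than over buckets. The maxConsecutiveAttempts bound exists precisely because mostly-empty buckets make such misses frequent.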
diff --git a/src/mongo/db/exec/sample_from_timeseries_bucket.h b/src/mongo/db/exec/sample_from_timeseries_bucket.h
new file mode 100644
index 00000000000..2aa26aabe7a
--- /dev/null
+++ b/src/mongo/db/exec/sample_from_timeseries_bucket.h
@@ -0,0 +1,129 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/bucket_unpacker.h"
+#include "mongo/db/exec/plan_stage.h"
+
+namespace mongo {
+/**
+ * This stage implements a variation on the ARHASH algorithm (see
+ * https://dl.acm.org/doi/10.1145/93605.98746), by running one iteration of the ARHASH algorithm to
+ * materialize a random measurement from a randomly sampled bucket once per doWork() call. The plan
+ * is constructed such that the input documents to this stage are coming from a storage-provided
+ * random cursor.
+ */
+class SampleFromTimeseriesBucket final : public PlanStage {
+public:
+ static const char* kStageType;
+
+ /**
+ * Constructs a 'SampleFromTimeseriesBucket' stage which uses 'bucketUnpacker' to materialize
+     * the sampled measurement from the buckets returned by the child stage.
+ * - 'sampleSize' is the user-requested number of documents to sample.
+ * - 'maxConsecutiveAttempts' configures the maximum number of consecutive "misses" when
+     * performing the ARHASH algorithm. A miss happens either when we sample a duplicate or when the
+ * index 'j' selected by the PRNG exceeds the number of measurements in the bucket. If we miss
+ * enough times in a row, we throw an exception that terminates the execution of the query.
+ * - 'bucketMaxCount' is the maximum number of measurements allowed in a bucket, which can be
+ * configured via a server parameter.
+ */
+ SampleFromTimeseriesBucket(ExpressionContext* expCtx,
+ WorkingSet* ws,
+ std::unique_ptr<PlanStage> child,
+ BucketUnpacker bucketUnpacker,
+ int maxConsecutiveAttempts,
+ long long sampleSize,
+ int bucketMaxCount);
+
+ StageType stageType() const final {
+ return STAGE_SAMPLE_FROM_TIMESERIES_BUCKET;
+ }
+
+ bool isEOF() final {
+ return _nSampledSoFar >= _sampleSize;
+ }
+
+ std::unique_ptr<PlanStageStats> getStats() final;
+
+ const SpecificStats* getSpecificStats() const final {
+ return &_specificStats;
+ }
+
+ PlanStage::StageState doWork(WorkingSetID* id);
+
+private:
+ /**
+ * Carries the bucket _id and index for the measurement that was sampled.
+ */
+ struct SampledMeasurementKey {
+ SampledMeasurementKey(OID bucketId, int64_t measurementIndex)
+ : bucketId(bucketId), measurementIndex(measurementIndex) {}
+
+ bool operator==(const SampledMeasurementKey& key) const {
+ return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex;
+ }
+
+ OID bucketId;
+ int32_t measurementIndex;
+ };
+
+ /**
+     * Computes a hash of 'SampledMeasurementKey' so that measurements which have already been
+     * seen can be tracked for de-duplication after sampling.
+ */
+ struct SampledMeasurementKeyHasher {
+ size_t operator()(const SampledMeasurementKey& s) const {
+ return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^
+ absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^
+ absl::Hash<int32_t>{}(s.measurementIndex);
+ }
+ };
+
+ // Tracks which measurements have been seen so far.
+ using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>;
+
+ void materializeMeasurement(int64_t measurementIdx, WorkingSetMember* out);
+
+ WorkingSet& _ws;
+ BucketUnpacker _bucketUnpacker;
+ SampleFromTimeseriesBucketStats _specificStats;
+
+ const int _maxConsecutiveAttempts;
+ const long long _sampleSize;
+ const int _bucketMaxCount;
+
+ int _worksSinceLastAdvanced = 0;
+ long long _nSampledSoFar = 0;
+
+ // Used to de-duplicate randomly sampled measurements.
+ SeenSet _seenSet;
+};
+} // namespace mongo
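The constructor parameters documented in the header above are wired up later in this patch from pipeline_d.cpp; as a hedged illustration of the intended call shape (the surrounding names are taken from that call site, not invented here), the stage is built roughly as:

// Sketch mirroring the call site added in pipeline_d.cpp further down in this diff; 'expCtx',
// 'ws', 'root', 'bucketUnpacker', 'kMaxPresampleSize', 'sampleSize' and
// 'gTimeseriesBucketMaxCount' are assumed to exist as in that file.
auto arhashPlan = std::make_unique<SampleFromTimeseriesBucket>(
    expCtx.get(),
    ws.get(),
    std::move(root),             // Child stage producing buckets from a random cursor.
    *bucketUnpacker,             // Materializes individual measurements from a bucket.
    kMaxPresampleSize + 5,       // Head-room over the TrialStage's trial period.
    sampleSize,                  // The user-requested $sample size.
    gTimeseriesBucketMaxCount);  // Upper bound on measurements per bucket.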
diff --git a/src/mongo/db/exec/unpack_timeseries_bucket.cpp b/src/mongo/db/exec/unpack_timeseries_bucket.cpp
new file mode 100644
index 00000000000..bbd4b61cf12
--- /dev/null
+++ b/src/mongo/db/exec/unpack_timeseries_bucket.cpp
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/exec/unpack_timeseries_bucket.h"
+
+namespace mongo {
+namespace {
+
+void transitionToOwnedObj(Document&& doc, WorkingSetMember* member) {
+ member->keyData.clear();
+ member->recordId = {};
+ member->doc = {{}, std::move(doc)};
+ member->transitionToOwnedObj();
+}
+} // namespace
+
+const char* UnpackTimeseriesBucket::kStageType = "UNPACK_BUCKET";
+
+UnpackTimeseriesBucket::UnpackTimeseriesBucket(ExpressionContext* expCtx,
+ WorkingSet* ws,
+ std::unique_ptr<PlanStage> child,
+ BucketUnpacker bucketUnpacker)
+ : PlanStage{kStageType, expCtx}, _ws{*ws}, _bucketUnpacker{std::move(bucketUnpacker)} {
+ _children.emplace_back(std::move(child));
+}
+
+std::unique_ptr<PlanStageStats> UnpackTimeseriesBucket::getStats() {
+ _commonStats.isEOF = isEOF();
+ auto ret = std::make_unique<PlanStageStats>(_commonStats, stageType());
+ ret->specific = std::make_unique<UnpackTimeseriesBucketStats>(_specificStats);
+ ret->children.emplace_back(child()->getStats());
+ return ret;
+}
+
+PlanStage::StageState UnpackTimeseriesBucket::doWork(WorkingSetID* out) {
+ if (isEOF()) {
+ return PlanStage::IS_EOF;
+ }
+
+ if (!_bucketUnpacker.hasNext()) {
+ auto id = WorkingSet::INVALID_ID;
+ auto status = child()->work(&id);
+
+ if (PlanStage::ADVANCED == status) {
+ auto member = _ws.get(id);
+
+ // Make an owned copy of the bucket document if necessary. The bucket will be unwound
+ // across multiple calls to 'doWork()', so we need to hold our own copy in the query
+ // execution layer in case the storage engine reclaims the memory for the bucket between
+ // calls to 'doWork()'.
+ auto ownedBucket = member->doc.value().toBson().getOwned();
+ _bucketUnpacker.reset(std::move(ownedBucket));
+
+ auto measurement = _bucketUnpacker.getNext();
+ transitionToOwnedObj(std::move(measurement), member);
+ ++_specificStats.nBucketsUnpacked;
+
+ *out = id;
+ } else if (PlanStage::NEED_YIELD == status) {
+ *out = id;
+ }
+ return status;
+ }
+
+ auto measurement = _bucketUnpacker.getNext();
+ *out = _ws.allocate();
+ auto member = _ws.get(*out);
+
+ transitionToOwnedObj(std::move(measurement), member);
+
+ return PlanStage::ADVANCED;
+}
+} // namespace mongo
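The doWork() above follows a common fan-out pattern: one bucket pulled from the child is expanded into many measurements across successive calls, which is why the stage caches an owned copy of the bucket between calls. A minimal sketch of the same control flow over plain containers (all names here are hypothetical; nothing is taken from the server codebase):

#include <cassert>
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

using Measurement = int;                  // Hypothetical stand-in for a materialized measurement.
using Bucket = std::vector<Measurement>;  // Hypothetical stand-in for a bucket document.

class BucketStreamer {
public:
    explicit BucketStreamer(std::vector<Bucket> buckets) : _buckets(std::move(buckets)) {}

    // Analogous to doWork(): one measurement per call, pulling (and caching an owned copy of)
    // the next bucket only when the current one is exhausted.
    std::optional<Measurement> next() {
        while (_pos >= _current.size()) {
            if (_nextBucket == _buckets.size())
                return std::nullopt;             // Analogous to returning IS_EOF.
            _current = _buckets[_nextBucket++];  // The owned copy kept across calls.
            _pos = 0;
        }
        return _current[_pos++];
    }

private:
    std::vector<Bucket> _buckets;
    Bucket _current;  // The cached bucket currently being unpacked.
    std::size_t _pos = 0;
    std::size_t _nextBucket = 0;
};

int main() {
    BucketStreamer streamer({{1, 2, 3}, {}, {4}});
    std::vector<Measurement> out;
    while (auto m = streamer.next())
        out.push_back(*m);
    assert((out == std::vector<Measurement>{1, 2, 3, 4}));
    return 0;
}

In the real stage the cached state lives inside the BucketUnpacker, and the getOwned() call ensures the BSON survives even if the storage engine releases the original bucket memory between calls.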
diff --git a/src/mongo/db/exec/unpack_timeseries_bucket.h b/src/mongo/db/exec/unpack_timeseries_bucket.h
new file mode 100644
index 00000000000..84a1392369e
--- /dev/null
+++ b/src/mongo/db/exec/unpack_timeseries_bucket.h
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/bucket_unpacker.h"
+#include "mongo/db/exec/plan_stage.h"
+
+namespace mongo {
+/**
+ * This PlanStage is the analog of DocumentSourceInternalUnpackBucket, but in the PlanStage layer.
+ * It fetches a bucket from its child as an owned BSONObj and uses the BucketUnpacker to
+ * materialize time-series measurements until the time-series bucket collection is exhausted.
+ */
+class UnpackTimeseriesBucket final : public PlanStage {
+public:
+ static const char* kStageType;
+
+ UnpackTimeseriesBucket(ExpressionContext* expCtx,
+ WorkingSet* ws,
+ std::unique_ptr<PlanStage> child,
+ BucketUnpacker bucketUnpacker);
+
+ StageType stageType() const final {
+ return STAGE_UNPACK_TIMESERIES_BUCKET;
+ }
+
+ bool isEOF() final {
+ return !_bucketUnpacker.hasNext() && child()->isEOF();
+ }
+
+ std::unique_ptr<PlanStageStats> getStats() final;
+
+ const SpecificStats* getSpecificStats() const final {
+ return &_specificStats;
+ }
+
+ PlanStage::StageState doWork(WorkingSetID* id);
+
+private:
+ WorkingSet& _ws;
+ BucketUnpacker _bucketUnpacker;
+ UnpackTimeseriesBucketStats _specificStats;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index c9c8ffae144..bbfe3182218 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -276,6 +276,7 @@ pipelineEnv.Library(
'$BUILD_DIR/mongo/db/bson/dotted_path_support',
'$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
+ '$BUILD_DIR/mongo/db/exec/bucket_unpacker',
'$BUILD_DIR/mongo/db/exec/document_value/document_value',
'$BUILD_DIR/mongo/db/exec/projection_executor',
'$BUILD_DIR/mongo/db/exec/scoped_timer',
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 8e2e81d2d14..961ca3d610b 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
-#include "mongo/bson/bsonobj.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/matcher/expression.h"
#include "mongo/db/matcher/expression_algo.h"
@@ -59,59 +58,6 @@ REGISTER_DOCUMENT_SOURCE(_internalUnpackBucket,
namespace {
/**
- * Removes metaField from the field set and returns a boolean indicating whether metaField should be
- * included in the materialized measurements. Always returns false if metaField does not exist.
- */
-auto eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior unpackerBehavior,
- BucketSpec* bucketSpec) {
- if (!bucketSpec->metaField) {
- return false;
- } else if (auto itr = bucketSpec->fieldSet.find(*bucketSpec->metaField);
- itr != bucketSpec->fieldSet.end()) {
- bucketSpec->fieldSet.erase(itr);
- return unpackerBehavior == BucketUnpacker::Behavior::kInclude;
- } else {
- return unpackerBehavior == BucketUnpacker::Behavior::kExclude;
- }
-}
-
-/**
- * Determine if timestamp values should be included in the materialized measurements.
- */
-auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, BucketSpec* bucketSpec) {
- return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
- (bucketSpec->fieldSet.find(bucketSpec->timeField) != bucketSpec->fieldSet.end());
-}
-
-/**
- * Determine if an arbitrary field should be included in the materialized measurements.
- */
-auto determineIncludeField(StringData fieldName,
- BucketUnpacker::Behavior unpackerBehavior,
- const BucketSpec& bucketSpec) {
- return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
- (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end());
-}
-
-/**
- * Erase computed meta projection fields if they are present in the exclusion field set.
- */
-void eraseExcludedComputedMetaProjFields(BucketUnpacker::Behavior unpackerBehavior,
- BucketSpec* bucketSpec) {
- if (unpackerBehavior == BucketUnpacker::Behavior::kExclude &&
- bucketSpec->computedMetaProjFields.size() > 0) {
- for (auto it = bucketSpec->computedMetaProjFields.begin();
- it != bucketSpec->computedMetaProjFields.end();) {
- if (bucketSpec->fieldSet.find(*it) != bucketSpec->fieldSet.end()) {
- it = bucketSpec->computedMetaProjFields.erase(it);
- } else {
- it++;
- }
- }
- }
-}
-
-/**
* A projection can be internalized if every field corresponds to a boolean value. Note that this
* correctly rejects dotted fieldnames, which are mapped to objects internally.
*/
@@ -234,190 +180,6 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon
}
} // namespace
-// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the
-// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in
-// the table, this helper returns the measurement count corresponding to the table record.
-// Otherwise, the 'targetTimestampObjSize' is used to probe the table for the smallest {b_i, S_i}
-// pair such that 'targetTimestampObjSize' < S_i. Once the interval is found, the upper bound of the
-// pair for the interval is computed and then linear interpolation is used to compute the
-// measurement count corresponding to the 'targetTimestampObjSize' provided.
-int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) {
- auto currentInterval =
- std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable),
- std::end(BucketUnpacker::kTimestampObjSizeTable),
- [&](const auto& entry) { return targetTimestampObjSize <= entry.second; });
-
- if (currentInterval->second == targetTimestampObjSize) {
- return currentInterval->first;
- }
- // This points to the first interval larger than the target 'targetTimestampObjSize', the actual
- // interval that will cover the object size is the interval before the current one.
- tassert(5422104,
- "currentInterval should not point to the first table entry",
- currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin());
- --currentInterval;
-
- auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin());
-
- return currentInterval->first +
- ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey));
-}
-
-void BucketUnpacker::reset(BSONObj&& bucket) {
- _fieldIters.clear();
- _timeFieldIter = boost::none;
-
- _bucket = std::move(bucket);
- uassert(5346510, "An empty bucket cannot be unpacked", !_bucket.isEmpty());
- tassert(5346701,
- "The $_internalUnpackBucket stage requires the bucket to be owned",
- _bucket.isOwned());
-
- auto&& dataRegion = _bucket.getField(timeseries::kBucketDataFieldName).Obj();
- if (dataRegion.isEmpty()) {
- // If the data field of a bucket is present but it holds an empty object, there's nothing to
- // unpack.
- return;
- }
-
- auto&& timeFieldElem = dataRegion.getField(_spec.timeField);
- uassert(5346700,
- "The $_internalUnpackBucket stage requires the data region to have a timeField object",
- timeFieldElem);
-
- _timeFieldIter = BSONObjIterator{timeFieldElem.Obj()};
-
- _metaValue = _bucket[timeseries::kBucketMetaFieldName];
- if (_spec.metaField) {
- // The spec indicates that there might be a metadata region. Missing metadata in
- // measurements is expressed with missing metadata in a bucket. But we disallow undefined
- // since the undefined BSON type is deprecated.
- uassert(5369600,
- "The $_internalUnpackBucket stage allows metadata to be absent or otherwise, it "
- "must not be the deprecated undefined bson type",
- !_metaValue || _metaValue.type() != BSONType::Undefined);
- } else {
- // If the spec indicates that the time series collection has no metadata field, then we
- // should not find a metadata region in the underlying bucket documents.
- uassert(5369601,
- "The $_internalUnpackBucket stage expects buckets to have missing metadata regions "
- "if the metaField parameter is not provided",
- !_metaValue);
- }
-
- // Walk the data region of the bucket, and decide if an iterator should be set up based on the
- // include or exclude case.
- for (auto&& elem : dataRegion) {
- auto& colName = elem.fieldNameStringData();
- if (colName == _spec.timeField) {
- // Skip adding a FieldIterator for the timeField since the timestamp value from
- // _timeFieldIter can be placed accordingly in the materialized measurement.
- continue;
- }
-
- // Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or
- // _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'.
- if (determineIncludeField(colName, _unpackerBehavior, _spec)) {
- _fieldIters.emplace_back(colName.toString(), BSONObjIterator{elem.Obj()});
- }
- }
-
- // Update computed meta projections with values from this bucket.
- if (!_spec.computedMetaProjFields.empty()) {
- for (auto&& name : _spec.computedMetaProjFields) {
- _computedMetaProjections[name] = _bucket[name];
- }
- }
-
- // Save the measurement count for the owned bucket.
- _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize());
-}
-
-void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) {
- _includeMetaField = eraseMetaFromFieldSetAndDetermineIncludeMeta(behavior, &bucketSpec);
- _includeTimeField = determineIncludeTimeField(behavior, &bucketSpec);
- _unpackerBehavior = behavior;
- eraseExcludedComputedMetaProjFields(behavior, &bucketSpec);
- _spec = std::move(bucketSpec);
-}
-
-Document BucketUnpacker::getNext() {
- tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext());
-
- auto measurement = MutableDocument{};
- auto&& timeElem = _timeFieldIter->next();
- if (_includeTimeField) {
- measurement.addField(_spec.timeField, Value{timeElem});
- }
-
- // Includes metaField when we're instructed to do so and metaField value exists.
- if (_includeMetaField && _metaValue) {
- measurement.addField(*_spec.metaField, Value{_metaValue});
- }
-
- auto& currentIdx = timeElem.fieldNameStringData();
- for (auto&& [colName, colIter] : _fieldIters) {
- if (auto&& elem = *colIter; colIter.more() && elem.fieldNameStringData() == currentIdx) {
- measurement.addField(colName, Value{elem});
- colIter.advance(elem);
- }
- }
-
- // Add computed meta projections.
- for (auto&& name : _spec.computedMetaProjFields) {
- measurement.addField(name, Value{_computedMetaProjections[name]});
- }
-
- return measurement.freeze();
-}
-
-Document BucketUnpacker::extractSingleMeasurement(int j) {
- tassert(5422101,
- "'extractSingleMeasurment' expects j to be greater than or equal to zero and less than "
- "or equal to the number of measurements in a bucket",
- j >= 0 && j < _numberOfMeasurements);
-
- auto measurement = MutableDocument{};
-
- auto rowKey = std::to_string(j);
- auto targetIdx = StringData{rowKey};
- auto&& dataRegion = _bucket.getField(timeseries::kBucketDataFieldName).Obj();
-
- if (_includeMetaField && !_metaValue.isNull()) {
- measurement.addField(*_spec.metaField, Value{_metaValue});
- }
-
- for (auto&& dataElem : dataRegion) {
- auto colName = dataElem.fieldNameStringData();
- if (!determineIncludeField(colName, _unpackerBehavior, _spec)) {
- continue;
- }
- auto value = dataElem[targetIdx];
- if (value) {
- measurement.addField(dataElem.fieldNameStringData(), Value{value});
- }
- }
-
- // Add computed meta projections.
- for (auto&& name : _spec.computedMetaProjFields) {
- measurement.addField(name, Value{_computedMetaProjections[name]});
- }
-
- return measurement.freeze();
-}
-
-const std::set<StringData> BucketUnpacker::reservedBucketFieldNames = {
- timeseries::kBucketIdFieldName,
- timeseries::kBucketDataFieldName,
- timeseries::kBucketMetaFieldName,
- timeseries::kBucketControlFieldName};
-
-void BucketUnpacker::addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames) {
- for (auto&& field : computedFieldNames) {
- _spec.computedMetaProjFields.emplace_back(field.toString());
- }
-}
-
DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket(
const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker)
: DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {}
@@ -482,15 +244,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF
"The $_internalUnpackBucket stage requires a timeField parameter",
specElem[timeseries::kTimeFieldName].ok());
- auto includeTimeField = determineIncludeTimeField(unpackerBehavior, &bucketSpec);
-
- auto includeMetaField =
- eraseMetaFromFieldSetAndDetermineIncludeMeta(unpackerBehavior, &bucketSpec);
-
return make_intrusive<DocumentSourceInternalUnpackBucket>(
- expCtx,
- BucketUnpacker{
- std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField});
+ expCtx, BucketUnpacker{std::move(bucketSpec), unpackerBehavior});
}
void DocumentSourceInternalUnpackBucket::serializeToArray(
@@ -524,65 +279,8 @@ void DocumentSourceInternalUnpackBucket::serializeToArray(
}
}
-DocumentSource::GetNextResult
-DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() {
- const auto kMaxAttempts = 100;
- for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) {
- auto randResult = pSource->getNext();
- switch (randResult.getStatus()) {
- case GetNextResult::ReturnStatus::kAdvanced: {
- auto bucket = randResult.getDocument().toBson();
- _bucketUnpacker.reset(std::move(bucket));
-
- auto& prng = pExpCtx->opCtx->getClient()->getPrng();
- auto j = prng.nextInt64(_bucketMaxCount);
-
- if (j < _bucketUnpacker.numberOfMeasurements()) {
- auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j);
-
- auto bucketId = _bucketUnpacker.bucket()[timeseries::kBucketIdFieldName];
- auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j};
-
- if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) {
- _nSampledSoFar++;
- return sampledDocument;
- } else {
- LOGV2_DEBUG(
- 5422102,
- 1,
- "$_internalUnpackBucket optimized for sample saw duplicate measurement",
- "measurementIndex"_attr = j,
- "bucketId"_attr = bucketId);
- }
- }
- break;
- }
- case GetNextResult::ReturnStatus::kPauseExecution: {
- // This state should never be reached since the input stage is a random cursor.
- MONGO_UNREACHABLE;
- }
- case GetNextResult::ReturnStatus::kEOF: {
- return randResult;
- }
- }
- }
- uasserted(5422103,
- str::stream()
- << "$_internalUnpackBucket stage could not find a non-duplicate document after "
- << kMaxAttempts
- << " attempts while using a random cursor. This is likely a "
- "sporadic failure, please try again");
-}
-
DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() {
- // If the '_sampleSize' member is present, then the stage will produce randomly sampled
- // documents from buckets.
- if (_sampleSize) {
- if (_nSampledSoFar >= _sampleSize) {
- return GetNextResult::makeEOF();
- }
- return sampleUniqueMeasurementFromBuckets();
- }
+ tassert(5521502, "calling doGetNext() when '_sampleSize' is set is disallowed", !_sampleSize);
// Otherwise, fallback to unpacking every measurement in all buckets until the child stage is
// exhausted.
@@ -653,8 +351,7 @@ bool DocumentSourceInternalUnpackBucket::pushDownComputedMetaProjection(
void DocumentSourceInternalUnpackBucket::internalizeProject(const BSONObj& project,
bool isInclusion) {
// 'fields' are the top-level fields to be included/excluded by the unpacker. We handle the
- // special case of _id, which may be excluded in an inclusion $project (or vice versa),
- // here.
+ // special case of _id, which may be excluded in an inclusion $project (or vice versa), here.
auto fields = project.getFieldNames<std::set<std::string>>();
if (auto elt = project.getField("_id"); (elt.isBoolean() && elt.Bool() != isInclusion) ||
(elt.isNumber() && (elt.Int() == 1) != isInclusion)) {
@@ -672,8 +369,7 @@ void DocumentSourceInternalUnpackBucket::internalizeProject(const BSONObj& proje
std::pair<BSONObj, bool> DocumentSourceInternalUnpackBucket::extractOrBuildProjectToInternalize(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) const {
if (std::next(itr) == container->end() || !_bucketUnpacker.bucketSpec().fieldSet.empty()) {
- // There is no project to internalize or there are already fields being
- // included/excluded.
+ // There is no project to internalize or there are already fields being included/excluded.
return {BSONObj{}, false};
}
@@ -684,9 +380,9 @@ std::pair<BSONObj, bool> DocumentSourceInternalUnpackBucket::extractOrBuildProje
return {existingProj, isInclusion};
}
- // Attempt to get an inclusion $project representing the root-level dependencies of the
- // pipeline after the $_internalUnpackBucket. If this $project is not empty, then the
- // dependency set was finite.
+ // Attempt to get an inclusion $project representing the root-level dependencies of the pipeline
+ // after the $_internalUnpackBucket. If this $project is not empty, then the dependency set was
+ // finite.
Pipeline::SourceContainer restOfPipeline(std::next(itr), container->end());
auto deps = Pipeline::getDependenciesForContainer(pExpCtx, restOfPipeline, boost::none);
if (auto dependencyProj =
@@ -709,18 +405,17 @@ std::unique_ptr<MatchExpression> createComparisonPredicate(
auto path = matchExpr->path();
auto rhs = matchExpr->getData();
- // The control field's min and max are chosen using a field-order insensitive comparator,
- // while MatchExpressions use a comparator that treats field-order as significant. Because
- // of this we will not perform this optimization on queries with operands of compound types.
+ // The control field's min and max are chosen using a field-order insensitive comparator, while
+ // MatchExpressions use a comparator that treats field-order as significant. Because of this we
+ // will not perform this optimization on queries with operands of compound types.
if (rhs.type() == BSONType::Object || rhs.type() == BSONType::Array) {
return nullptr;
}
- // MatchExpressions have special comparison semantics regarding null, in that {$eq: null}
- // will match all documents where the field is either null or missing. Because this is
- // different from both the comparison semantics that InternalExprComparison expressions and
- // the control's min and max fields use, we will not perform this optimization on queries
- // with null operands.
+ // MatchExpressions have special comparison semantics regarding null, in that {$eq: null} will
+ // match all documents where the field is either null or missing. Because this is different from
+ // both the comparison semantics that InternalExprComparison expressions and the control's min
+ // and max fields use, we will not perform this optimization on queries with null operands.
if (rhs.type() == BSONType::jstNULL) {
return nullptr;
}
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index c5133643874..051cc2161ca 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -31,160 +31,11 @@
#include <set>
+#include "mongo/db/exec/bucket_unpacker.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_match.h"
namespace mongo {
-
-/**
- * Carries parameters for unpacking a bucket.
- */
-struct BucketSpec {
- // The user-supplied timestamp field name specified during time-series collection creation.
- std::string timeField;
-
- // An optional user-supplied metadata field name specified during time-series collection
- // creation. This field name is used during materialization of metadata fields of a measurement
- // after unpacking.
- boost::optional<std::string> metaField;
-
- // The set of field names in the data region that should be included or excluded.
- std::set<std::string> fieldSet;
-
- // Vector of computed meta field projection names. Added at the end of materialized
- // measurements.
- std::vector<std::string> computedMetaProjFields;
-};
-
-
-/**
- * BucketUnpacker will unpack bucket fields for metadata and the provided fields.
- */
-class BucketUnpacker {
-public:
- // A table that is useful for interpolations between the number of measurements in a bucket and
- // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i,
- // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the
- // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are
- // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99],
- // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to
- // exceed the server BSON object limit of 16 MB.
- static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{
- {{0, BSONObj::kMinBSONLength},
- {10, 115},
- {100, 1195},
- {1000, 12895},
- {10000, 138895},
- {100000, 1488895},
- {1000000, 15888895},
- {10000000, 168888895}}};
-
- /**
- * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series
- * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time.
- */
- static int computeMeasurementCount(int targetTimestampObjSize);
-
- // Set of field names reserved for time-series buckets.
- static const std::set<StringData> reservedBucketFieldNames;
-
- // When BucketUnpacker is created with kInclude it must produce measurements that contain the
- // set of fields. Otherwise, if the kExclude option is used, the measurements will include the
- // set difference between all fields in the bucket and the provided fields.
- enum class Behavior { kInclude, kExclude };
-
- BucketUnpacker(BucketSpec spec,
- Behavior unpackerBehavior,
- bool includeTimeField,
- bool includeMetaField)
- : _spec(std::move(spec)),
- _unpackerBehavior(unpackerBehavior),
- _includeTimeField(includeTimeField),
- _includeMetaField(includeMetaField) {}
-
- /**
- * This method will continue to materialize Documents until the bucket is exhausted. A
- * precondition of this method is that 'hasNext()' must be true.
- */
- Document getNext();
-
- /**
- * This method will extract the j-th measurement from the bucket. A precondition of this method
- * is that j >= 0 && j <= the number of measurements within the underlying bucket.
- */
- Document extractSingleMeasurement(int j);
-
- bool hasNext() const {
- return _timeFieldIter && _timeFieldIter->more();
- }
-
- /**
- * This resets the unpacker to prepare to unpack a new bucket described by the given document.
- */
- void reset(BSONObj&& bucket);
-
- Behavior behavior() const {
- return _unpackerBehavior;
- }
-
- const BucketSpec& bucketSpec() const {
- return _spec;
- }
-
- const BSONObj& bucket() const {
- return _bucket;
- }
-
- bool includeMetaField() const {
- return _includeMetaField;
- }
-
- bool includeTimeField() const {
- return _includeTimeField;
- }
-
- int32_t numberOfMeasurements() const {
- return _numberOfMeasurements;
- }
-
- void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior);
-
- // Add computed meta projection names to the bucket specification.
- void addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames);
-
-private:
- BucketSpec _spec;
- Behavior _unpackerBehavior;
-
- // Iterates the timestamp section of the bucket to drive the unpacking iteration.
- boost::optional<BSONObjIterator> _timeFieldIter;
-
- // A flag used to mark that the timestamp value should be materialized in measurements.
- bool _includeTimeField;
-
- // A flag used to mark that a bucket's metadata value should be materialized in measurements.
- bool _includeMetaField;
-
- // The bucket being unpacked.
- BSONObj _bucket;
-
- // Since the metadata value is the same across all materialized measurements we can cache the
- // metadata BSONElement in the reset phase and use it to materialize the metadata in each
- // measurement.
- BSONElement _metaValue;
-
- // Iterators used to unpack the columns of the above bucket that are populated during the reset
- // phase according to the provided 'Behavior' and 'BucketSpec'.
- std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters;
-
- // Map <name, BSONElement> for the computed meta field projections. Updated for
- // every bucket upon reset().
- stdx::unordered_map<std::string, BSONElement> _computedMetaProjections;
-
- // The number of measurements in the bucket.
- int32_t _numberOfMeasurements = 0;
-};
-
class DocumentSourceInternalUnpackBucket : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalUnpackBucket"_sd;
@@ -244,6 +95,10 @@ public:
return boost::none;
};
+ BucketUnpacker bucketUnpacker() const {
+ return _bucketUnpacker;
+ }
+
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
@@ -323,53 +178,11 @@ public:
Pipeline::SourceContainer* container);
private:
- /**
- * Carries the bucket _id and index for the measurement that was sampled by
- * 'sampleRandomBucketOptimized'.
- */
- struct SampledMeasurementKey {
- SampledMeasurementKey(OID bucketId, int64_t measurementIndex)
- : bucketId(bucketId), measurementIndex(measurementIndex) {}
-
- bool operator==(const SampledMeasurementKey& key) const {
- return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex;
- }
-
- OID bucketId;
- int32_t measurementIndex;
- };
-
- /**
- * Computes a hash of 'SampledMeasurementKey' so measurements that have already been seen can
- * be kept track of for de-duplication after sampling.
- */
- struct SampledMeasurementKeyHasher {
- size_t operator()(const SampledMeasurementKey& s) const {
- return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^
- absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^
- absl::Hash<int32_t>{}(s.measurementIndex);
- }
- };
-
- // Tracks which measurements have been seen so far. This is only used when sampling is enabled
- // for the purpose of de-duplicating measurements.
- using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>;
-
GetNextResult doGetNext() final;
- /**
- * Keeps trying to sample a unique measurement by using the optimized ARHASH algorithm up to a
- * hardcoded maximum number of attempts. If a unique measurement isn't found before the maximum
- * number of tries is exhausted this method will throw.
- */
- GetNextResult sampleUniqueMeasurementFromBuckets();
-
BucketUnpacker _bucketUnpacker;
- long long _nSampledSoFar = 0;
int _bucketMaxCount = 0;
boost::optional<long long> _sampleSize;
-
- SeenSet _seenSet;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
index 92e43593e32..9c0ca61a175 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
@@ -812,280 +812,5 @@ TEST_F(InternalUnpackBucketExecTest, ParserRejectsBothIncludeAndExcludeParameter
AssertionException,
5408000);
}
-
-TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurement) {
- auto expCtx = getExpCtx();
-
- std::set<std::string> fields{
- "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
- auto spec = BucketSpec{
- kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
- auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true};
-
- auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
- auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
- auto d3 = dateFromISOString("2020-02-17T02:00:00.000Z").getValue();
- auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
- << BSON("_id" << BSON("0" << 1 << "1" << 2 << "2" << 3) << "time"
- << BSON("0" << d1 << "1" << d2 << "2" << d3) << "a"
- << BSON("0" << 1 << "1" << 2 << "2" << 3) << "b"
- << BSON("1" << 1 << "2" << 2)));
-
- unpacker.reset(std::move(bucket));
-
- auto next = unpacker.extractSingleMeasurement(0);
- auto expected = Document{
- {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- next = unpacker.extractSingleMeasurement(2);
- expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
- {"_id", 3},
- {"time", d3},
- {"a", 3},
- {"b", 2}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- next = unpacker.extractSingleMeasurement(1);
- expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
- {"_id", 2},
- {"time", d2},
- {"a", 2},
- {"b", 1}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- // Can we extract the middle element again?
- next = unpacker.extractSingleMeasurement(1);
- ASSERT_DOCUMENT_EQ(next, expected);
-}
-
-TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurementSparse) {
- auto expCtx = getExpCtx();
-
- std::set<std::string> fields{
- "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
- auto spec = BucketSpec{
- kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
- auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true};
-
- auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
- auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
- auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
- << BSON("_id" << BSON("0" << 1 << "1" << 2) << "time"
- << BSON("0" << d1 << "1" << d2) << "a" << BSON("0" << 1)
- << "b" << BSON("1" << 1)));
-
- unpacker.reset(std::move(bucket));
- auto next = unpacker.extractSingleMeasurement(1);
- auto expected = Document{
- {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 2}, {"time", d2}, {"b", 1}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- // Can we extract the same element again?
- next = unpacker.extractSingleMeasurement(1);
- ASSERT_DOCUMENT_EQ(next, expected);
-
- next = unpacker.extractSingleMeasurement(0);
- expected = Document{
- {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- // Can we extract the same element twice in a row?
- next = unpacker.extractSingleMeasurement(0);
- ASSERT_DOCUMENT_EQ(next, expected);
-
- next = unpacker.extractSingleMeasurement(0);
- ASSERT_DOCUMENT_EQ(next, expected);
-}
-
-class InternalUnpackBucketRandomSampleTest : public AggregationContextFixture {
-protected:
- BSONObj makeIncludeAllSpec() {
- return BSON("$_internalUnpackBucket"
- << BSON("include" << BSON_ARRAY("_id"
- << "time" << kUserDefinedMetaName << "a"
- << "b")
- << timeseries::kTimeFieldName << kUserDefinedTimeName
- << timeseries::kMetaFieldName << kUserDefinedMetaName));
- }
-
- boost::intrusive_ptr<DocumentSource> makeUnpackStage(const BSONObj& spec,
- long long nSample,
- int bucketMaxCount) {
- auto ds =
- DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx());
- auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get());
- unpack->setSampleParameters(nSample, bucketMaxCount);
- return unpack;
- }
-
- boost::intrusive_ptr<DocumentSource> makeInternalUnpackBucketSample(int nSample,
- int nBuckets,
- int nMeasurements) {
- auto spec = makeIncludeAllSpec();
- generateBuckets(nBuckets, nMeasurements);
- auto ds =
- DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx());
- auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get());
- unpack->setSampleParameters(nSample, 1000);
- return unpack;
- }
-
- boost::intrusive_ptr<DocumentSource> prepareMock() {
- auto mock = DocumentSourceMock::createForTest(getExpCtx());
- for (auto&& b : _buckets) {
- mock->push_back(DocumentSource::GetNextResult{std::move(b)});
- }
- return mock;
- }
-
- Document makeBucketPart(int nMeasurements, std::function<Value(int)> gen) {
- auto doc = MutableDocument{};
- for (auto i = 0; i < nMeasurements; ++i) {
- doc.addField(std::to_string(i), gen(i));
- }
- return doc.freeze();
- }
-
- void generateBuckets(int nBuckets, int nMeasurements) {
- auto& prng = getExpCtx()->opCtx->getClient()->getPrng();
- std::vector<Document> buckets;
- for (auto m = 0; m < nBuckets; m++) {
- auto idDoc = makeBucketPart(nMeasurements, [](int i) { return Value{OID::gen()}; });
- auto timeDoc = makeBucketPart(nMeasurements, [](int i) { return Value{Date_t{}}; });
- auto aCol = makeBucketPart(nMeasurements,
- [&](int i) { return Value{prng.nextCanonicalDouble()}; });
- buckets.push_back({Document{
- {"_id", Value{OID::gen()}},
- {"meta", Document{{"m1", m}, {"m2", m + 1}}},
- {"data",
- Document{{"_id", idDoc}, {"time", std::move(timeDoc)}, {"a", std::move(aCol)}}}}});
- }
-
- _buckets = std::move(buckets);
- }
-
-private:
- std::vector<Document> _buckets;
-};
-
-TEST_F(InternalUnpackBucketRandomSampleTest, SampleHasExpectedStatProperties) {
- auto unpack = makeInternalUnpackBucketSample(100, 1000, 1000);
- auto mock = prepareMock();
- unpack->setSource(mock.get());
-
- auto next = unpack->getNext();
- ASSERT_TRUE(next.isAdvanced());
-
- auto avg = 0.0;
- auto nSampled = 0;
- while (next.isAdvanced()) {
- avg += next.getDocument()["a"].getDouble();
- next = unpack->getNext();
- nSampled++;
- }
- avg /= nSampled;
- ASSERT_EQ(nSampled, 100);
-
- // The average for the uniform distribution on [0, 1) is ~0.5, and the stdev is sqrt(1/12).
- // We will check if the avg is between +/- 2*sqrt(1/12).
- auto stddev = std::sqrt(1.0 / 12.0);
- ASSERT_GT(avg, 0.5 - 2 * stddev);
- ASSERT_LT(avg, 0.5 + 2 * stddev);
-}
-
-TEST_F(InternalUnpackBucketRandomSampleTest, SampleIgnoresDuplicates) {
- auto spec = BSON("$_internalUnpackBucket"
- << BSON("include" << BSON_ARRAY("_id"
- << "time" << kUserDefinedMetaName << "a"
- << "b")
- << timeseries::kTimeFieldName << kUserDefinedTimeName
- << timeseries::kMetaFieldName << kUserDefinedMetaName));
-
- // Make an unpack bucket stage initialized with a sample size of 2 and bucketMaxCount of 1.
- auto unpack = makeUnpackStage(spec, 2, 1);
-
- // Fill mock with duplicate buckets to simulate random sampling the same buckets over and over
- // again until the 'kMaxAttempts' are reached in 'doGetNext'.
- auto mock = DocumentSourceMock::createForTest(getExpCtx());
- for (auto i = 0; i < 101; ++i) {
- mock->push_back(Document{{"_id", Value{OID::createFromString("000000000000000000000001")}},
- {"meta", Document{{"m1", 1}, {"m2", 2}}},
- {"data",
- Document{{"_id", Document{{"0", 1}}},
- {"time", Document{{"0", Date_t::now()}}},
- {"a", Document{{"0", 1}}}}}});
- }
- unpack->setSource(mock.get());
-
- // The sample size is 2 and there's only one unique measurement in the mock. The second
- // 'getNext' call should spin until the it reaches 'kMaxAttempts' of tries and then throw.
- ASSERT_TRUE(unpack->getNext().isAdvanced());
- ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5422103);
-}
-
-namespace {
-/**
- * Manually computes the timestamp object size for n timestamps.
- */
-auto expectedTimestampObjSize(int32_t rowKeyOffset, int32_t n) {
- BSONObjBuilder bob;
- for (auto i = 0; i < n; ++i) {
- bob.appendDate(std::to_string(i + rowKeyOffset), Date_t::now());
- }
- return bob.done().objsize();
-}
-} // namespace
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountLowerBoundsAreCorrect) {
- // The last table entry is a sentinel for an upper bound on the interval that covers measurement
- // counts up to 16 MB.
- const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
-
- // Test the case when the target size hits a table entry which represents the lower bound of an
- // interval.
- for (size_t index = 0; index < maxTableEntry; ++index) {
- auto interval = BucketUnpacker::kTimestampObjSizeTable[index];
- ASSERT_EQ(interval.first, BucketUnpacker::computeMeasurementCount(interval.second));
- }
-}
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountUpperBoundsAreCorrect) {
- const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
-
- // The lower bound sizes of each interval in the kTimestampObjSizeTable are hardcoded. Use this
- // fact and walk the table backwards to check the correctness of the S_i'th interval's upper
- // bound by using the lower bound size for the S_i+1 interval and subtracting the BSONObj size
- // containing one timestamp with the appropriate rowKey.
- std::pair<int, int> currentInterval;
- auto currentIntervalSize = 0;
- auto currentIntervalCount = 0;
- auto size = 0;
- for (size_t index = maxTableEntry; index > 0; --index) {
- currentInterval = BucketUnpacker::kTimestampObjSizeTable[index];
- currentIntervalSize = currentInterval.second;
- currentIntervalCount = currentInterval.first;
- auto rowKey = currentIntervalCount - 1;
- size = expectedTimestampObjSize(rowKey, 1);
- // We need to add back the kMinBSONLength since it's subtracted out.
- ASSERT_EQ(currentIntervalCount - 1,
- BucketUnpacker::computeMeasurementCount(currentIntervalSize - size +
- BSONObj::kMinBSONLength));
- }
-}
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountAllPointsInSmallerIntervals) {
- // Test all values for some of the smaller intervals up to 100 measurements.
- for (auto bucketCount = 0; bucketCount < 25; ++bucketCount) {
- auto size = expectedTimestampObjSize(0, bucketCount);
- ASSERT_EQ(bucketCount, BucketUnpacker::computeMeasurementCount(size));
- }
-}
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountInLargerIntervals) {
- ASSERT_EQ(2222, BucketUnpacker::computeMeasurementCount(30003));
- ASSERT_EQ(11111, BucketUnpacker::computeMeasurementCount(155560));
- ASSERT_EQ(449998, BucketUnpacker::computeMeasurementCount(7088863));
-}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 1750ce38027..12131e95780 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -45,8 +45,10 @@
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/multi_iterator.h"
#include "mongo/db/exec/queued_data_stage.h"
+#include "mongo/db/exec/sample_from_timeseries_bucket.h"
#include "mongo/db/exec/shard_filter.h"
#include "mongo/db/exec/trial_stage.h"
+#include "mongo/db/exec/unpack_timeseries_bucket.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/matcher/extensions_callback_real.h"
@@ -101,7 +103,8 @@ StatusWith<std::pair<unique_ptr<PlanExecutor, PlanExecutor::Deleter>, bool>>
createRandomCursorExecutor(const CollectionPtr& coll,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
long long sampleSize,
- long long numRecords) {
+ long long numRecords,
+ boost::optional<BucketUnpacker> bucketUnpacker) {
OperationContext* opCtx = expCtx->opCtx;
// Verify that we are already under a collection lock. We avoid taking locks ourselves in this
@@ -139,14 +142,14 @@ createRandomCursorExecutor(const CollectionPtr& coll,
// cursor may have been mistaken. For sharded collections, build a TRIAL plan that will switch
// to a collection scan if the ratio of orphaned to owned documents encountered over the first
// 100 works() is such that we would have chosen not to optimize.
- if (collectionFilter.isSharded()) {
+ static const size_t kMaxPresampleSize = 100;
+ if (collectionFilter.isSharded() && !expCtx->ns.isTimeseriesBucketsCollection()) {
// The ratio of owned to orphaned documents must be at least equal to the ratio between the
// requested sampleSize and the maximum permitted sampleSize for the original constraints to
// be satisfied. For instance, if there are 200 documents and the sampleSize is 5, then at
// least (5 / (200*0.05)) = (5/10) = 50% of those documents must be owned. If less than 5%
// of the documents in the collection are owned, we default to the backup plan.
- static const size_t kMaxPresampleSize = 100;
- const auto minWorkAdvancedRatio = std::max(
+ const auto minAdvancedToWorkRatio = std::max(
sampleSize / (numRecords * kMaxSampleRatioForRandCursor), kMaxSampleRatioForRandCursor);
// The trial plan is SHARDING_FILTER-MULTI_ITERATOR.
auto randomCursorPlan = std::make_unique<ShardFilterStage>(
@@ -162,7 +165,65 @@ createRandomCursorExecutor(const CollectionPtr& coll,
std::move(randomCursorPlan),
std::move(collScanPlan),
kMaxPresampleSize,
- minWorkAdvancedRatio);
+ minAdvancedToWorkRatio);
+ trialStage = static_cast<TrialStage*>(root.get());
+ } else if (expCtx->ns.isTimeseriesBucketsCollection()) {
+ // Use a 'TrialStage' to run a trial between 'SampleFromTimeseriesBucket' and
+ // 'UnpackTimeseriesBucket' with $sample left in the pipeline in-place. If the buckets are
+ // not sufficiently full, or the 'SampleFromTimeseriesBucket' plan draws too many
+ // duplicates, then we will fall back to the 'TrialStage' backup plan. This backup plan uses
+ // the top-k sort sampling approach.
+ //
+ // Suppose the 'gTimeseriesBucketMaxCount' is 1000, but each bucket only contains 500
+ // documents on average. The observed trial advanced/work ratio approximates the average
+ // bucket fullness, noted here as "abf". In this example, abf = 500 / 1000 = 0.5.
+ // Experiments have shown that the optimized 'SampleFromTimeseriesBucket' algorithm performs
+        // better than the backup plan when
+ //
+ // sampleSize < 0.02 * abf * numRecords * gTimeseriesBucketMaxCount
+ //
+ // This inequality can be rewritten as
+ //
+ // abf > sampleSize / (0.02 * numRecords * gTimeseriesBucketMaxCount)
+ //
+ // Therefore, if the advanced/work ratio exceeds this threshold, we will use the
+ // 'SampleFromTimeseriesBucket' plan. Note that as the sample size requested by the user
+ // becomes larger with respect to the number of buckets, we require a higher advanced/work
+ // ratio in order to justify using 'SampleFromTimeseriesBucket'.
+ //
+ // Additionally, we require the 'TrialStage' to approximate the abf as at least 0.25. When
+ // buckets are mostly empty, the 'SampleFromTimeseriesBucket' will be inefficient due to a
+ // lot of sampling "misses".
+ static const auto kCoefficient = 0.02;
+ static const auto kMinBucketFullness = 0.25;
+ const auto minAdvancedToWorkRatio = std::max(
+ std::min(sampleSize / (kCoefficient * numRecords * gTimeseriesBucketMaxCount), 1.0),
+ kMinBucketFullness);
+
+ auto arhashPlan = std::make_unique<SampleFromTimeseriesBucket>(
+ expCtx.get(),
+ ws.get(),
+ std::move(root),
+ *bucketUnpacker,
+ // By using a quantity slightly higher than 'kMaxPresampleSize', we ensure that the
+ // 'SampleFromTimeseriesBucket' stage won't fail due to too many consecutive sampling
+ // attempts during the 'TrialStage's trial period.
+ kMaxPresampleSize + 5,
+ sampleSize,
+ gTimeseriesBucketMaxCount);
+
+ std::unique_ptr<PlanStage> collScanPlan = std::make_unique<CollectionScan>(
+ expCtx.get(), coll, CollectionScanParams{}, ws.get(), nullptr);
+
+ auto topkSortPlan = std::make_unique<UnpackTimeseriesBucket>(
+ expCtx.get(), ws.get(), std::move(collScanPlan), *bucketUnpacker);
+
+ root = std::make_unique<TrialStage>(expCtx.get(),
+ ws.get(),
+ std::move(arhashPlan),
+ std::move(topkSortPlan),
+ kMaxPresampleSize,
+ minAdvancedToWorkRatio);
trialStage = static_cast<TrialStage*>(root.get());
}
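To make the clamped threshold in the comment above concrete, here is a small sketch with made-up inputs; 10,000 buckets and a bucket max count of 1000 are assumptions for illustration only, and 'bucketMaxCount' stands in for gTimeseriesBucketMaxCount.

#include <algorithm>
#include <cassert>
#include <cmath>

// Sketch of the clamped advanced/work threshold computed in createRandomCursorExecutor().
double minAdvancedToWorkRatio(double sampleSize, double numRecords, double bucketMaxCount) {
    const double kCoefficient = 0.02;
    const double kMinBucketFullness = 0.25;
    return std::max(std::min(sampleSize / (kCoefficient * numRecords * bucketMaxCount), 1.0),
                    kMinBucketFullness);
}

int main() {
    // 100 / (0.02 * 10,000 * 1,000) = 0.0005, so the 0.25 fullness floor applies: even a tiny
    // sample only keeps the ARHASH plan if buckets are at least a quarter full on average.
    assert(minAdvancedToWorkRatio(100, 10000, 1000) == 0.25);
    // 100,000 / (0.02 * 10,000 * 1,000) = 0.5: a large sample demands roughly half-full buckets.
    assert(std::fabs(minAdvancedToWorkRatio(100000, 10000, 1000) - 0.5) < 1e-9);
    return 0;
}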
@@ -365,32 +426,38 @@ PipelineD::buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage,
const long long sampleSize = sampleStage->getSampleSize();
const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx);
- auto&& [exec, isStorageOptimizedSample] =
- uassertStatusOK(createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords));
+
+ boost::optional<BucketUnpacker> bucketUnpacker;
+ if (unpackBucketStage) {
+ bucketUnpacker = unpackBucketStage->bucketUnpacker();
+ }
+ auto&& [exec, isStorageOptimizedSample] = uassertStatusOK(createRandomCursorExecutor(
+ collection, expCtx, sampleSize, numRecords, std::move(bucketUnpacker)));
AttachExecutorCallback attachExecutorCallback;
if (exec) {
- if (isStorageOptimizedSample) {
- if (!unpackBucketStage) {
+ if (!unpackBucketStage) {
+ if (isStorageOptimizedSample) {
// Replace $sample stage with $sampleFromRandomCursor stage.
pipeline->popFront();
std::string idString = collection->ns().isOplog() ? "ts" : "_id";
pipeline->addInitialSource(DocumentSourceSampleFromRandomCursor::create(
expCtx, sampleSize, idString, numRecords));
- } else {
+ }
+ } else {
+ if (isStorageOptimizedSample) {
// If there are non-nullptrs for 'sampleStage' and 'unpackBucketStage', then
// 'unpackBucketStage' is at the front of the pipeline immediately followed by a
- // 'sampleStage'. Coalesce a $_internalUnpackBucket followed by a $sample.
- unpackBucketStage->setSampleParameters(sampleSize, gTimeseriesBucketMaxCount);
- sources.erase(std::next(sources.begin()));
-
- // Fix the source for the next stage by pointing it to the $_internalUnpackBucket
- // stage.
- auto sourcesIt = sources.begin();
- if (std::next(sourcesIt) != sources.end()) {
- ++sourcesIt;
- (*sourcesIt)->setSource(unpackBucketStage);
- }
+ // 'sampleStage'. Because ARHASH sampling can fail when buckets contain too few
+ // measurements, sampling and bucket unpacking are pushed down to the PlanStage layer
+ // behind a TrialStage. The trial chose the optimized plan, so we erase both the
+ // $_internalUnpackBucket and $sample stages from the pipeline.
+ sources.erase(sources.begin());
+ sources.erase(sources.begin());
+ } else {
+ // The TrialStage chose the backup plan, so we erase only the
+ // $_internalUnpackBucket stage and leave $sample where it is.
+ sources.erase(sources.begin());
}
}
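
The two erase paths above may be easier to follow in isolation. The following sketch (an editorial illustration that uses plain strings in place of the real DocumentSource pipeline, so the container and stage names are assumptions) models what remains of the aggregation pipeline in each case.

    #include <cstdio>
    #include <list>
    #include <string>

    // Models the pipeline rewrite performed above for a time-series $sample.
    // On entry the pipeline is [$_internalUnpackBucket, $sample, ...rest].
    std::list<std::string> rewriteSources(std::list<std::string> sources,
                                          bool isStorageOptimizedSample) {
        if (isStorageOptimizedSample) {
            // Both unpacking and sampling were pushed down to the PlanStage layer
            // ('SampleFromTimeseriesBucket'), so drop the first two stages.
            sources.erase(sources.begin());
            sources.erase(sources.begin());
        } else {
            // The trial fell back to 'UnpackTimeseriesBucket'; only unpacking was
            // pushed down, so $sample stays and performs top-k sort sampling.
            sources.erase(sources.begin());
        }
        return sources;
    }

    int main() {
        std::list<std::string> pipeline{"$_internalUnpackBucket", "$sample", "$project"};
        for (auto&& s : rewriteSources(pipeline, /*isStorageOptimizedSample=*/true))
            std::printf("%s\n", s.c_str());  // prints only "$project"
        return 0;
    }
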
diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp
index bc9a9ab756b..ea83f27bd56 100644
--- a/src/mongo/db/query/classic_stage_builder.cpp
+++ b/src/mongo/db/query/classic_stage_builder.cpp
@@ -416,9 +416,11 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r
case STAGE_MULTI_PLAN:
case STAGE_QUEUED_DATA:
case STAGE_RECORD_STORE_FAST_COUNT:
+ case STAGE_SAMPLE_FROM_TIMESERIES_BUCKET:
case STAGE_SUBPLAN:
case STAGE_TRIAL:
case STAGE_UNKNOWN:
+ case STAGE_UNPACK_TIMESERIES_BUCKET:
case STAGE_UPDATE: {
LOGV2_WARNING(4615604, "Can't build exec tree for node", "node"_attr = *root);
}
diff --git a/src/mongo/db/query/plan_explainer_impl.cpp b/src/mongo/db/query/plan_explainer_impl.cpp
index f75c88d140b..17023980407 100644
--- a/src/mongo/db/query/plan_explainer_impl.cpp
+++ b/src/mongo/db/query/plan_explainer_impl.cpp
@@ -427,6 +427,15 @@ void statsToBSON(const PlanStageStats& stats,
bob->appendNumber("nCounted", spec->nCounted);
bob->appendNumber("nSkipped", spec->nSkipped);
}
+ } else if (STAGE_SAMPLE_FROM_TIMESERIES_BUCKET == stats.stageType) {
+ SampleFromTimeseriesBucketStats* spec =
+ static_cast<SampleFromTimeseriesBucketStats*>(stats.specific.get());
+
+ if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
+ bob->appendNumber("nBucketsDiscarded", static_cast<long long>(spec->nBucketsDiscarded));
+ bob->appendNumber("dupsTested", static_cast<long long>(spec->dupsTested));
+ bob->appendNumber("dupsDropped", static_cast<long long>(spec->dupsDropped));
+ }
} else if (STAGE_SHARDING_FILTER == stats.stageType) {
ShardingFilterStats* spec = static_cast<ShardingFilterStats*>(stats.specific.get());
@@ -477,6 +486,13 @@ void statsToBSON(const PlanStageStats& stats,
if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
bob->appendNumber("docsExamined", static_cast<long long>(spec->fetches));
}
+ } else if (STAGE_UNPACK_TIMESERIES_BUCKET == stats.stageType) {
+ UnpackTimeseriesBucketStats* spec =
+ static_cast<UnpackTimeseriesBucketStats*>(stats.specific.get());
+
+ if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
+ bob->appendNumber("nBucketsUnpacked", static_cast<long long>(spec->nBucketsUnpacked));
+ }
} else if (STAGE_UPDATE == stats.stageType) {
UpdateStats* spec = static_cast<UpdateStats*>(stats.specific.get());
diff --git a/src/mongo/db/query/stage_types.cpp b/src/mongo/db/query/stage_types.cpp
index 6d3b0f1fd2b..9c54f668495 100644
--- a/src/mongo/db/query/stage_types.cpp
+++ b/src/mongo/db/query/stage_types.cpp
@@ -61,6 +61,7 @@ StringData stageTypeToString(StageType stageType) {
{STAGE_QUEUED_DATA, "QUEUED_DATA"_sd},
{STAGE_RECORD_STORE_FAST_COUNT, "RECORD_STORE_FAST_COUNT"_sd},
{STAGE_RETURN_KEY, "RETURN_KEY"_sd},
+ {STAGE_SAMPLE_FROM_TIMESERIES_BUCKET, "SAMPLE_FROM_TIMESERIES_BUCKET"_sd},
{STAGE_SHARDING_FILTER, "SHARDING_FILTER"_sd},
{STAGE_SKIP, "SKIP"_sd},
{STAGE_SORT_DEFAULT, "SORT"_sd},
@@ -72,6 +73,7 @@ StringData stageTypeToString(StageType stageType) {
{STAGE_TEXT_MATCH, "TEXT_MATCH"_sd},
{STAGE_TRIAL, "TRIAL"_sd},
{STAGE_UNKNOWN, "UNKNOWN"_sd},
+ {STAGE_UNPACK_TIMESERIES_BUCKET, "UNPACK_TIMESERIES_BUCKET"_sd},
{STAGE_UPDATE, "UPDATE"_sd},
};
if (auto it = kStageTypesMap.find(stageType); it != kStageTypesMap.end()) {
diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h
index a0dc411028b..fd3ce6fa261 100644
--- a/src/mongo/db/query/stage_types.h
+++ b/src/mongo/db/query/stage_types.h
@@ -102,6 +102,7 @@ enum StageType {
STAGE_QUEUED_DATA,
STAGE_RECORD_STORE_FAST_COUNT,
STAGE_RETURN_KEY,
+ STAGE_SAMPLE_FROM_TIMESERIES_BUCKET,
STAGE_SHARDING_FILTER,
STAGE_SKIP,
@@ -121,6 +122,8 @@ enum StageType {
STAGE_UNKNOWN,
+ STAGE_UNPACK_TIMESERIES_BUCKET,
+
STAGE_UPDATE,
};