author    | Eric Cox <eric.cox@mongodb.com>                  | 2021-02-05 17:56:19 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-17 02:25:02 +0000
commit    | e0b81b6eedaf048b19ffb88e4848154acf7327c1
tree      | f5b71c8ec3ebc45af177a4a9a15f2bcde43a0c5a
parent    | 06ec07e0aab34e645fa116543afc88695d2d63dc
download  | mongo-e0b81b6eedaf048b19ffb88e4848154acf7327c1.tar.gz
SERVER-54221 Implement $sample pushdown into $_internalUnpackBucket
-rw-r--r-- | jstests/noPassthrough/timeseries_sample.js                                                    | 120
-rw-r--r-- | src/mongo/db/SConscript                                                                       |   1
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp                              | 161
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.h                                | 121
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp | 281
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp                                                          | 178
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h                                                            |  15
7 files changed, 817 insertions, 60 deletions
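Editor's note: two mechanisms in the diff below carry the pushdown. The first is an O(1) way to recover the number of measurements in a bucket from the byte size of its timestamp column (the new BucketUnpacker::computeMeasurementCount and its kTimestampObjSizeTable); the second is an accept/reject sampling loop over buckets drawn from a storage-engine random cursor. The following is a rough, self-contained sketch of the first mechanism only: the table values and the interpolation formula are copied from the diff, while the includes, main(), and assertions are illustrative scaffolding, not the committed code.

```cpp
// Sketch only: mirrors the interpolation documented for
// BucketUnpacker::computeMeasurementCount() in the diff below; not the committed code.
#include <algorithm>
#include <array>
#include <cassert>
#include <utility>

// (measurement count, timestamp-column BSONObj size) pairs; 5 == BSONObj::kMinBSONLength.
constexpr std::array<std::pair<int, int>, 8> kTimestampObjSizeTable{{{0, 5},
                                                                     {10, 115},
                                                                     {100, 1195},
                                                                     {1000, 12895},
                                                                     {10000, 138895},
                                                                     {100000, 1488895},
                                                                     {1000000, 15888895},
                                                                     {10000000, 168888895}}};

int computeMeasurementCount(int targetTimestampObjSize) {
    // First table entry whose timestamp-column size is >= the target size.
    auto it = std::find_if(kTimestampObjSizeTable.begin(), kTimestampObjSizeTable.end(),
                           [&](const auto& e) { return targetTimestampObjSize <= e.second; });
    if (it->second == targetTimestampObjSize)
        return it->first;  // Exact hit on an interval's lower bound.

    // Otherwise interpolate inside the previous interval. Each measurement there adds one
    // BSON date element: 10 fixed bytes plus a row-key string of (1 + interval index) digits.
    --it;  // The real code tasserts that 'it' is not the first table entry before doing this.
    auto nDigitsInRowKey = 1 + (it - kTimestampObjSizeTable.begin());
    return it->first + (targetTimestampObjSize - it->second) / (10 + nDigitsInRowKey);
}

int main() {
    assert(computeMeasurementCount(115) == 10);      // a table lower bound
    assert(computeMeasurementCount(30003) == 2222);  // value checked by the new unit test
    return 0;
}
```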
diff --git a/jstests/noPassthrough/timeseries_sample.js b/jstests/noPassthrough/timeseries_sample.js new file mode 100644 index 00000000000..2720bf43a1a --- /dev/null +++ b/jstests/noPassthrough/timeseries_sample.js @@ -0,0 +1,120 @@ +/** + * Tests inserting sample data into the time-series buckets collection. This test is for the + * exercising the optimized $sample implementation for $_internalUnpackBucket. + * @tags: [ + * sbe_incompatible, + * requires_wiredtiger, + * ] + */ +(function() { +"use strict"; + +load("jstests/core/timeseries/libs/timeseries.js"); +load("jstests/libs/analyze_plan.js"); + +const conn = MongoRunner.runMongod({setParameter: {timeseriesBucketMaxCount: 2}}); + +// Although this test is tagged with 'requires_wiredtiger', this is not sufficient for ensuring +// that the parallel suite runs this test only on WT configurations. +if (jsTest.options().storageEngine && jsTest.options().storageEngine !== "wiredTiger") { + jsTest.log("Skipping test on non-WT storage engine: " + jsTest.options().storageEngine); + MongoRunner.stopMongod(conn); + return; +} + +const testDB = conn.getDB('timeseries_sample_db'); + +if (!TimeseriesTest.timeseriesCollectionsEnabled(testDB.getMongo())) { + jsTestLog("Skipping test because the time-series collection feature flag is disabled"); + MongoRunner.stopMongod(conn); + return; +} + +// In order to trigger the optimized sample path we need at least 100 buckets in the bucket +// collection. +const nBuckets = 101; +let bucketMaxCount = 2; +let numDocs = nBuckets * bucketMaxCount; + +const coll = testDB.getCollection('timeseries_sample'); +const bucketsColl = testDB.getCollection("system.buckets." + coll.getName()); + +coll.drop(); + +const timeFieldName = "time"; +const metaFieldName = "m"; +assert.commandWorked(testDB.createCollection( + coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}})); + +assert.contains(bucketsColl.getName(), testDB.getCollectionNames()); + +for (let i = 0; i < numDocs; i++) { + let id = ObjectId(); + assert.commandWorked( + coll.insert({_id: id, [timeFieldName]: ISODate(), [metaFieldName]: i % nBuckets, x: i}), + "failed to insert doc: " + id); +} + +let buckets = bucketsColl.find().toArray(); +assert.eq(nBuckets, buckets.length, buckets); + +let assertUniqueDocuments = function(docs) { + let seen = new Set(); + docs.forEach(doc => { + assert.eq(seen.has(doc._id), false); + seen.add(doc._id); + }); +}; + +// Check the time-series view to make sure we have the correct number of docs and that there are no +// duplicates after sampling. +const viewDocs = coll.find({}, {x: 1}).toArray(); +assert.eq(numDocs, viewDocs.length, viewDocs); + +let sampleSize = 5; +let result = coll.aggregate([{$sample: {size: sampleSize}}]).toArray(); +assert.eq(sampleSize, result.length, result); +assertUniqueDocuments(result); + +// Check that we have absorbed $sample into $_internalUnpackBucket. +const optimizedSamplePlan = coll.explain().aggregate([{$sample: {size: sampleSize}}]); +let bucketStage = getAggPlanStage(optimizedSamplePlan, "$_internalUnpackBucket"); +assert.eq(bucketStage["$_internalUnpackBucket"]["sample"], sampleSize); +assert(!aggPlanHasStage(optimizedSamplePlan, "$sample")); + +// Run an agg pipeline with optimization disabled. +result = coll.aggregate([{$_internalInhibitOptimization: {}}, {$sample: {size: 1}}]).toArray(); +assert.eq(1, result.length, result); + +// Check that $sample hasn't been absorbed by $_internalUnpackBucket. 
+sampleSize = 100; +const unoptimizedSamplePlan = coll.explain().aggregate([{$sample: {size: sampleSize}}]); +bucketStage = getAggPlanStage(unoptimizedSamplePlan, "$_internalUnpackBucket"); +assert.eq(bucketStage["$_internalUnpackBucket"]["sample"], undefined); +assert(aggPlanHasStage(unoptimizedSamplePlan, "$sample")); + +const unoptimizedResult = coll.aggregate([{$sample: {size: sampleSize}}]).toArray(); +assertUniqueDocuments(unoptimizedResult); + +// Check that a sampleSize greater than the number of measurements doesn't cause an infinte loop. +result = coll.aggregate([{$sample: {size: numDocs + 1}}]).toArray(); +assert.eq(numDocs, result.length, result); + +// Check that $lookup against a time-series collection doesn't cache inner pipeline results if it +// contains a $sample stage. +result = + coll.aggregate({$lookup: {from: coll.getName(), as: "docs", pipeline: [{$sample: {size: 1}}]}}) + .toArray(); + +// Each subquery should be an independent sample by checking that we didn't sample the same document +// repeatedly. It's sufficient for now to make sure that the seen set contains at least two distinct +// samples. +let seen = new Set(); +result.forEach(r => { + assert.eq(r.docs.length, 1); + seen.add(r.docs[0]._id); +}); +assert.gte(seen.size, 2); + +MongoRunner.stopMongod(conn); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 6732fc7e951..226da5788a6 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1283,6 +1283,7 @@ env.Library( 'stats/serveronly_stats', 'storage/remove_saver', 'storage/storage_options', + 'timeseries/timeseries_idl', 'update/update_driver', ], LIBDEPS_PRIVATE=[ diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index b20be4cdd68..da1279ab92d 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" @@ -38,10 +40,12 @@ #include "mongo/db/matcher/expression_internal_expr_comparison.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/logv2/log.h" namespace mongo { @@ -77,6 +81,16 @@ auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, Bucket } /** + * Determine if an arbitrary field should be included in the materialized measurements. + */ +auto determineIncludeField(StringData fieldName, + BucketUnpacker::Behavior unpackerBehavior, + const BucketSpec& bucketSpec) { + return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == + (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end()); +} + +/** * A projection can be internalized if every field corresponds to a boolean value. Note that this * correctly rejects dotted fieldnames, which are mapped to objects internally. 
*/ @@ -199,6 +213,35 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon } } // namespace +// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the +// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in +// the table, this helper returns the measurement count corresponding to the table record. +// Otherwise, the 'targetTimestampObjSize' is used to probe the table for the smallest {b_i, S_i} +// pair such that 'targetTimestampObjSize' < S_i. Once the interval is found, the upper bound of the +// pair for the interval is computed and then linear interpolation is used to compute the +// measurement count corresponding to the 'targetTimestampObjSize' provided. +int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) { + auto currentInterval = + std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable), + std::end(BucketUnpacker::kTimestampObjSizeTable), + [&](const auto& entry) { return targetTimestampObjSize <= entry.second; }); + + if (currentInterval->second == targetTimestampObjSize) { + return currentInterval->first; + } + // This points to the first interval larger than the target 'targetTimestampObjSize', the actual + // interval that will cover the object size is the interval before the current one. + tassert(5422104, + "currentInterval should not point to the first table entry", + currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin()); + --currentInterval; + + auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin()); + + return currentInterval->first + + ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey)); +} + void BucketUnpacker::reset(BSONObj&& bucket) { _fieldIters.clear(); _timeFieldIter = boost::none; @@ -253,11 +296,13 @@ void BucketUnpacker::reset(BSONObj&& bucket) { // Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or // _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'. - auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end(); - if ((_unpackerBehavior == Behavior::kInclude) == found) { + if (determineIncludeField(colName, _unpackerBehavior, _spec)) { _fieldIters.push_back({colName.toString(), BSONObjIterator{elem.Obj()}}); } } + + // Save the measurement count for the owned bucket. 
+ _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize()); } void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) { @@ -268,7 +313,7 @@ void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior } Document BucketUnpacker::getNext() { - invariant(hasNext()); + tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext()); auto measurement = MutableDocument{}; auto&& timeElem = _timeFieldIter->next(); @@ -291,6 +336,36 @@ Document BucketUnpacker::getNext() { return measurement.freeze(); } +Document BucketUnpacker::extractSingleMeasurement(int j) { + tassert(5422101, + "'extractSingleMeasurment' expects j to be greater than or equal to zero and less than " + "or equal to the number of measurements in a bucket", + j >= 0 && j < _numberOfMeasurements); + + auto measurement = MutableDocument{}; + + auto rowKey = std::to_string(j); + auto targetIdx = StringData{rowKey}; + auto&& dataRegion = _bucket.getField(kBucketDataFieldName).Obj(); + + if (_includeMetaField && !_metaValue.isNull()) { + measurement.addField(*_spec.metaField, Value{_metaValue}); + } + + for (auto&& dataElem : dataRegion) { + auto colName = dataElem.fieldNameStringData(); + if (!determineIncludeField(colName, _unpackerBehavior, _spec)) { + continue; + } + auto value = dataElem[targetIdx]; + if (value) { + measurement.addField(dataElem.fieldNameStringData(), Value{value}); + } + } + + return measurement.freeze(); +} + DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket( const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker) : DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {} @@ -362,8 +437,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField}); } -Value DocumentSourceInternalUnpackBucket::serialize( - boost::optional<ExplainOptions::Verbosity> explain) const { +void DocumentSourceInternalUnpackBucket::serializeToArray( + std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { MutableDocument out; auto behavior = _bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? 
kInclude : kExclude; @@ -377,10 +452,84 @@ Value DocumentSourceInternalUnpackBucket::serialize( if (spec.metaField) { out.addField(kMetaFieldName, Value{*spec.metaField}); } - return Value(DOC(getSourceName() << out.freeze())); + + if (!explain) { + array.push_back(Value(DOC(getSourceName() << out.freeze()))); + if (_sampleSize) { + auto sampleSrc = DocumentSourceSample::create(pExpCtx, *_sampleSize); + sampleSrc->serializeToArray(array); + } + } else { + if (_sampleSize) { + out.addField("sample", Value{static_cast<long long>(*_sampleSize)}); + out.addField("bucketMaxCount", Value{_bucketMaxCount}); + } + array.push_back(Value(DOC(getSourceName() << out.freeze()))); + } +} + +DocumentSource::GetNextResult +DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() { + const auto kMaxAttempts = 100; + for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) { + auto randResult = pSource->getNext(); + switch (randResult.getStatus()) { + case GetNextResult::ReturnStatus::kAdvanced: { + auto bucket = randResult.getDocument().toBson(); + _bucketUnpacker.reset(std::move(bucket)); + + auto& prng = pExpCtx->opCtx->getClient()->getPrng(); + auto j = prng.nextInt64(_bucketMaxCount); + + if (j < _bucketUnpacker.numberOfMeasurements()) { + auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j); + + auto bucketId = _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName]; + auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j}; + + if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) { + _nSampledSoFar++; + return sampledDocument; + } else { + LOGV2_DEBUG( + 5422102, + 1, + "$_internalUnpackBucket optimized for sample saw duplicate measurement", + "measurementIndex"_attr = j, + "bucketId"_attr = bucketId); + } + } + break; + } + case GetNextResult::ReturnStatus::kPauseExecution: { + // This state should never be reached since the input stage is a random cursor. + MONGO_UNREACHABLE; + } + case GetNextResult::ReturnStatus::kEOF: { + return randResult; + } + } + } + uasserted(5422103, + str::stream() + << "$_internalUnpackBucket stage could not find a non-duplicate document after " + << kMaxAttempts + << " attempts while using a random cursor. This is likely a " + "sporadic failure, please try again"); } DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() { + // If the '_sampleSize' member is present, then the stage will produce randomly sampled + // documents from buckets. + if (_sampleSize) { + if (_nSampledSoFar >= _sampleSize) { + return GetNextResult::makeEOF(); + } + return sampleUniqueMeasurementFromBuckets(); + } + + // Otherwise, fallback to unpacking every measurement in all buckets until the child stage is + // exhausted. if (_bucketUnpacker.hasNext()) { return _bucketUnpacker.getNext(); } diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h index 18316a1a3e9..c9880317964 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h @@ -52,6 +52,7 @@ struct BucketSpec { std::set<std::string> fieldSet; }; + /** * BucketUnpacker will unpack bucket fields for metadata and the provided fields. 
*/ @@ -62,6 +63,29 @@ public: static constexpr StringData kBucketDataFieldName = "data"_sd; static constexpr StringData kBucketMetaFieldName = "meta"_sd; + // A table that is useful for interpolations between the number of measurements in a bucket and + // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i, + // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the + // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are + // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99], + // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to + // exceed the server BSON object limit of 16 MB. + static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{ + {{0, BSONObj::kMinBSONLength}, + {10, 115}, + {100, 1195}, + {1000, 12895}, + {10000, 138895}, + {100000, 1488895}, + {1000000, 15888895}, + {10000000, 168888895}}}; + + /** + * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series + * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time. + */ + static int computeMeasurementCount(int targetTimestampObjSize); + // When BucketUnpacker is created with kInclude it must produce measurements that contain the // set of fields. Otherwise, if the kExclude option is used, the measurements will include the // set difference between all fields in the bucket and the provided fields. @@ -76,7 +100,18 @@ public: _includeTimeField(includeTimeField), _includeMetaField(includeMetaField) {} + /** + * This method will continue to materialize Documents until the bucket is exhausted. A + * precondition of this method is that 'hasNext()' must be true. + */ Document getNext(); + + /** + * This method will extract the j-th measurement from the bucket. A precondition of this method + * is that j >= 0 && j <= the number of measurements within the underlying bucket. + */ + Document extractSingleMeasurement(int j); + bool hasNext() const { return _timeFieldIter && _timeFieldIter->more(); } @@ -106,6 +141,10 @@ public: return _includeTimeField; } + int32_t numberOfMeasurements() const { + return _numberOfMeasurements; + } + void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior); private: @@ -132,6 +171,9 @@ private: // Iterators used to unpack the columns of the above bucket that are populated during the reset // phase according to the provided 'Behavior' and 'BucketSpec'. std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters; + + // The number of measurements in the bucket. + int32_t _numberOfMeasurements = 0; }; class DocumentSourceInternalUnpackBucket : public DocumentSource { @@ -154,6 +196,17 @@ public: return kStageName.rawData(); } + void serializeToArray( + std::vector<Value>& array, + boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + + /** + * Use 'serializeToArray' above. 
+ */ + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + MONGO_UNREACHABLE; + } + bool includeMetaField() const { return _bucketUnpacker.includeMetaField(); } @@ -174,7 +227,13 @@ public: ChangeStreamRequirement::kBlacklist}; } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + DepsTracker::State getDependencies(DepsTracker* deps) const final { + if (_sampleSize) { + deps->needRandomGenerator = true; + } + deps->needWholeDocument = true; + return DepsTracker::State::EXHAUSTIVE_ALL; + } boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; @@ -235,9 +294,69 @@ public: std::unique_ptr<MatchExpression> createPredicatesOnBucketLevelField( const MatchExpression* matchExpr) const; + /** + * Sets the sample size to 'n' and the maximum number of measurements in a bucket to be + * 'bucketMaxCount'. Calling this method implicitly changes the behavior from having the stage + * unpack every bucket in a collection to sampling buckets to generate a uniform random sample + * of size 'n'. + */ + void setSampleParameters(long long n, int bucketMaxCount) { + _sampleSize = n; + _bucketMaxCount = bucketMaxCount; + } + + boost::optional<long long> sampleSize() const { + return _sampleSize; + } + private: + /** + * Carries the bucket _id and index for the measurement that was sampled by + * 'sampleRandomBucketOptimized'. + */ + struct SampledMeasurementKey { + SampledMeasurementKey(OID bucketId, int64_t measurementIndex) + : bucketId(bucketId), measurementIndex(measurementIndex) {} + + bool operator==(const SampledMeasurementKey& key) const { + return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex; + } + + OID bucketId; + int32_t measurementIndex; + }; + + /** + * Computes a hash of 'SampledMeasurementKey' so measurements that have already been seen can + * be kept track of for de-duplication after sampling. + */ + struct SampledMeasurementKeyHasher { + size_t operator()(const SampledMeasurementKey& s) const { + return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^ + absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^ + absl::Hash<int32_t>{}(s.measurementIndex); + } + }; + + // Tracks which measurements have been seen so far. This is only used when sampling is enabled + // for the purpose of de-duplicating measurements. + using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>; + GetNextResult doGetNext() final; + /** + * Keeps trying to sample a unique measurement by using the optimized ARHASH algorithm up to a + * hardcoded maximum number of attempts. If a unique measurement isn't found before the maximum + * number of tries is exhausted this method will throw. 
+ */ + GetNextResult sampleUniqueMeasurementFromBuckets(); + BucketUnpacker _bucketUnpacker; + + long long _nSampledSoFar = 0; + int _bucketMaxCount = 0; + boost::optional<long long> _sampleSize; + + SeenSet _seenSet; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp index ae409de7e69..29287bce3b8 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020-present MongoDB, Inc. + * Copyright (C) 2021-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -784,5 +784,284 @@ TEST_F(InternalUnpackBucketExecTest, ParserRejectsBothIncludeAndExcludeParameter AssertionException, 5408000); } + +TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurement) { + auto expCtx = getExpCtx(); + + std::set<std::string> fields{ + "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"}; + auto spec = BucketSpec{ + kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; + auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true}; + + auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue(); + auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue(); + auto d3 = dateFromISOString("2020-02-17T02:00:00.000Z").getValue(); + auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data" + << BSON("_id" << BSON("0" << 1 << "1" << 2 << "2" << 3) << "time" + << BSON("0" << d1 << "1" << d2 << "2" << d3) << "a" + << BSON("0" << 1 << "1" << 2 << "2" << 3) << "b" + << BSON("1" << 1 << "2" << 2))); + + unpacker.reset(std::move(bucket)); + + auto next = unpacker.extractSingleMeasurement(0); + auto expected = Document{ + {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}}; + ASSERT_DOCUMENT_EQ(next, expected); + + next = unpacker.extractSingleMeasurement(2); + expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, + {"_id", 3}, + {"time", d3}, + {"a", 3}, + {"b", 2}}; + ASSERT_DOCUMENT_EQ(next, expected); + + next = unpacker.extractSingleMeasurement(1); + expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, + {"_id", 2}, + {"time", d2}, + {"a", 2}, + {"b", 1}}; + ASSERT_DOCUMENT_EQ(next, expected); + + // Can we extract the middle element again? 
+ next = unpacker.extractSingleMeasurement(1); + ASSERT_DOCUMENT_EQ(next, expected); +} + +TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurementSparse) { + auto expCtx = getExpCtx(); + + std::set<std::string> fields{ + "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"}; + auto spec = BucketSpec{ + kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; + auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true}; + + auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue(); + auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue(); + auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data" + << BSON("_id" << BSON("0" << 1 << "1" << 2) << "time" + << BSON("0" << d1 << "1" << d2) << "a" << BSON("0" << 1) + << "b" << BSON("1" << 1))); + + unpacker.reset(std::move(bucket)); + auto next = unpacker.extractSingleMeasurement(1); + auto expected = Document{ + {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 2}, {"time", d2}, {"b", 1}}; + ASSERT_DOCUMENT_EQ(next, expected); + + // Can we extract the same element again? + next = unpacker.extractSingleMeasurement(1); + ASSERT_DOCUMENT_EQ(next, expected); + + next = unpacker.extractSingleMeasurement(0); + expected = Document{ + {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}}; + ASSERT_DOCUMENT_EQ(next, expected); + + // Can we extract the same element twice in a row? + next = unpacker.extractSingleMeasurement(0); + ASSERT_DOCUMENT_EQ(next, expected); + + next = unpacker.extractSingleMeasurement(0); + ASSERT_DOCUMENT_EQ(next, expected); +} + +class InternalUnpackBucketRandomSampleTest : public AggregationContextFixture { +protected: + BSONObj makeIncludeAllSpec() { + return BSON("$_internalUnpackBucket" + << BSON("include" << BSON_ARRAY("_id" + << "time" << kUserDefinedMetaName << "a" + << "b") + << DocumentSourceInternalUnpackBucket::kTimeFieldName + << kUserDefinedTimeName + << DocumentSourceInternalUnpackBucket::kMetaFieldName + << kUserDefinedMetaName)); + } + + boost::intrusive_ptr<DocumentSource> makeUnpackStage(const BSONObj& spec, + long long nSample, + int bucketMaxCount) { + auto ds = + DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()); + auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get()); + unpack->setSampleParameters(nSample, bucketMaxCount); + return unpack; + } + + boost::intrusive_ptr<DocumentSource> makeInternalUnpackBucketSample(int nSample, + int nBuckets, + int nMeasurements) { + auto spec = makeIncludeAllSpec(); + generateBuckets(nBuckets, nMeasurements); + auto ds = + DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()); + auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get()); + unpack->setSampleParameters(nSample, 1000); + return unpack; + } + + boost::intrusive_ptr<DocumentSource> prepareMock() { + auto mock = DocumentSourceMock::createForTest(getExpCtx()); + for (auto&& b : _buckets) { + mock->push_back(DocumentSource::GetNextResult{std::move(b)}); + } + return mock; + } + + Document makeBucketPart(int nMeasurements, std::function<Value(int)> gen) { + auto doc = MutableDocument{}; + for (auto i = 0; i < nMeasurements; ++i) { + doc.addField(std::to_string(i), gen(i)); + } + return doc.freeze(); + } + + void generateBuckets(int nBuckets, int nMeasurements) { + auto& prng = getExpCtx()->opCtx->getClient()->getPrng(); + 
std::vector<Document> buckets; + for (auto m = 0; m < nBuckets; m++) { + auto idDoc = makeBucketPart(nMeasurements, [](int i) { return Value{OID::gen()}; }); + auto timeDoc = makeBucketPart(nMeasurements, [](int i) { return Value{Date_t{}}; }); + auto aCol = makeBucketPart(nMeasurements, + [&](int i) { return Value{prng.nextCanonicalDouble()}; }); + buckets.push_back({Document{ + {"_id", Value{OID::gen()}}, + {"meta", Document{{"m1", m}, {"m2", m + 1}}}, + {"data", + Document{{"_id", idDoc}, {"time", std::move(timeDoc)}, {"a", std::move(aCol)}}}}}); + } + + _buckets = std::move(buckets); + } + +private: + std::vector<Document> _buckets; +}; + +TEST_F(InternalUnpackBucketRandomSampleTest, SampleHasExpectedStatProperties) { + auto unpack = makeInternalUnpackBucketSample(100, 1000, 1000); + auto mock = prepareMock(); + unpack->setSource(mock.get()); + + auto next = unpack->getNext(); + ASSERT_TRUE(next.isAdvanced()); + + auto avg = 0.0; + auto nSampled = 0; + while (next.isAdvanced()) { + avg += next.getDocument()["a"].getDouble(); + next = unpack->getNext(); + nSampled++; + } + avg /= nSampled; + ASSERT_EQ(nSampled, 100); + + // The average for the uniform distribution on [0, 1) is ~0.5, and the stdev is sqrt(1/12). + // We will check if the avg is between +/- 2*sqrt(1/12). + auto stddev = std::sqrt(1.0 / 12.0); + ASSERT_GT(avg, 0.5 - 2 * stddev); + ASSERT_LT(avg, 0.5 + 2 * stddev); +} + +TEST_F(InternalUnpackBucketRandomSampleTest, SampleIgnoresDuplicates) { + auto spec = BSON("$_internalUnpackBucket" + << BSON("include" << BSON_ARRAY("_id" + << "time" << kUserDefinedMetaName << "a" + << "b") + << DocumentSourceInternalUnpackBucket::kTimeFieldName + << kUserDefinedTimeName + << DocumentSourceInternalUnpackBucket::kMetaFieldName + << kUserDefinedMetaName)); + + // Make an unpack bucket stage initialized with a sample size of 2 and bucketMaxCount of 1. + auto unpack = makeUnpackStage(spec, 2, 1); + + // Fill mock with duplicate buckets to simulate random sampling the same buckets over and over + // again until the 'kMaxAttempts' are reached in 'doGetNext'. + auto mock = DocumentSourceMock::createForTest(getExpCtx()); + for (auto i = 0; i < 101; ++i) { + mock->push_back(Document{{"_id", Value{OID::createFromString("000000000000000000000001")}}, + {"meta", Document{{"m1", 1}, {"m2", 2}}}, + {"data", + Document{{"_id", Document{{"0", 1}}}, + {"time", Document{{"0", Date_t::now()}}}, + {"a", Document{{"0", 1}}}}}}); + } + unpack->setSource(mock.get()); + + // The sample size is 2 and there's only one unique measurement in the mock. The second + // 'getNext' call should spin until the it reaches 'kMaxAttempts' of tries and then throw. + ASSERT_TRUE(unpack->getNext().isAdvanced()); + ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5422103); +} + +namespace { +/** + * Manually computes the timestamp object size for n timestamps. + */ +auto expectedTimestampObjSize(int32_t rowKeyOffset, int32_t n) { + BSONObjBuilder bob; + for (auto i = 0; i < n; ++i) { + bob.appendDate(std::to_string(i + rowKeyOffset), Date_t::now()); + } + return bob.done().objsize(); +} +} // namespace + +TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountLowerBoundsAreCorrect) { + // The last table entry is a sentinel for an upper bound on the interval that covers measurement + // counts up to 16 MB. + const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1; + + // Test the case when the target size hits a table entry which represents the lower bound of an + // interval. 
+ for (size_t index = 0; index < maxTableEntry; ++index) { + auto interval = BucketUnpacker::kTimestampObjSizeTable[index]; + ASSERT_EQ(interval.first, BucketUnpacker::computeMeasurementCount(interval.second)); + } +} + +TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountUpperBoundsAreCorrect) { + const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1; + + // The lower bound sizes of each interval in the kTimestampObjSizeTable are hardcoded. Use this + // fact and walk the table backwards to check the correctness of the S_i'th interval's upper + // bound by using the lower bound size for the S_i+1 interval and subtracting the BSONObj size + // containing one timestamp with the appropriate rowKey. + std::pair<int, int> currentInterval; + auto currentIntervalSize = 0; + auto currentIntervalCount = 0; + auto size = 0; + for (size_t index = maxTableEntry; index > 0; --index) { + currentInterval = BucketUnpacker::kTimestampObjSizeTable[index]; + currentIntervalSize = currentInterval.second; + currentIntervalCount = currentInterval.first; + auto rowKey = currentIntervalCount - 1; + size = expectedTimestampObjSize(rowKey, 1); + // We need to add back the kMinBSONLength since it's subtracted out. + ASSERT_EQ(currentIntervalCount - 1, + BucketUnpacker::computeMeasurementCount(currentIntervalSize - size + + BSONObj::kMinBSONLength)); + } +} + +TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountAllPointsInSmallerIntervals) { + // Test all values for some of the smaller intervals up to 100 measurements. + for (auto bucketCount = 0; bucketCount < 25; ++bucketCount) { + auto size = expectedTimestampObjSize(0, bucketCount); + ASSERT_EQ(bucketCount, BucketUnpacker::computeMeasurementCount(size)); + } +} + +TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountInLargerIntervals) { + ASSERT_EQ(2222, BucketUnpacker::computeMeasurementCount(30003)); + ASSERT_EQ(11111, BucketUnpacker::computeMeasurementCount(155560)); + ASSERT_EQ(449998, BucketUnpacker::computeMeasurementCount(7088863)); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 84d4905070c..ef095170045 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -59,6 +59,7 @@ #include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/document_source_geo_near_cursor.h" #include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_sample_from_random_cursor.h" @@ -77,6 +78,7 @@ #include "mongo/db/stats/top.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" +#include "mongo/db/timeseries/timeseries_gen.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/util/time_support.h" @@ -91,20 +93,15 @@ using write_ops::Insert; namespace { /** - * Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {} - * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough - * percentage of the collection. - * - * If needed, adds DocumentSourceSampleFromRandomCursor to the front of the pipeline, replacing the - * $sample stage. 
This is needed if we select an optimized plan for $sample taking advantage of - * storage engine support for random cursors. + * Returns a 'PlanExecutor' which uses a random cursor to sample documents if successful as + * determined by the boolean. Returns {} if the storage engine doesn't support random cursors, or if + * 'sampleSize' is a large enough percentage of the collection. */ -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor( - const CollectionPtr& coll, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - long long sampleSize, - long long numRecords, - Pipeline* pipeline) { +StatusWith<std::pair<unique_ptr<PlanExecutor, PlanExecutor::Deleter>, bool>> +createRandomCursorExecutor(const CollectionPtr& coll, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + long long sampleSize, + long long numRecords) { OperationContext* opCtx = expCtx->opCtx; // Verify that we are already under a collection lock. We avoid taking locks ourselves in this @@ -113,14 +110,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx static const double kMaxSampleRatioForRandCursor = 0.05; if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) { - return {nullptr}; + return std::pair{nullptr, false}; } // Attempt to get a random cursor from the RecordStore. auto rsRandCursor = coll->getRecordStore()->getRandomCursor(opCtx); if (!rsRandCursor) { // The storage engine has no random cursor support. - return {nullptr}; + return std::pair{nullptr, false}; } // Build a MultiIteratorStage and pass it the random-sampling RecordCursor. @@ -169,25 +166,21 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx trialStage = static_cast<TrialStage*>(root.get()); } - auto exec = plan_executor_factory::make(expCtx, - std::move(ws), - std::move(root), - &coll, - PlanYieldPolicy::YieldPolicy::YIELD_AUTO, - QueryPlannerParams::RETURN_OWNED_DATA); + auto execStatus = plan_executor_factory::make(expCtx, + std::move(ws), + std::move(root), + &coll, + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + QueryPlannerParams::RETURN_OWNED_DATA); + if (!execStatus.isOK()) { + return execStatus.getStatus(); + } // For sharded collections, the root of the plan tree is a TrialStage that may have chosen // either a random-sampling cursor trial plan or a COLLSCAN backup plan. We can only optimize // the $sample aggregation stage if the trial plan was chosen. - if (!trialStage || !trialStage->pickedBackupPlan()) { - // Replace $sample stage with $sampleFromRandomCursor stage. - pipeline->popFront(); - std::string idString = coll->ns().isOplog() ? "ts" : "_id"; - pipeline->addInitialSource( - DocumentSourceSampleFromRandomCursor::create(expCtx, sampleSize, idString, numRecords)); - } - - return exec; + return std::pair{std::move(execStatus.getValue()), + !trialStage || !trialStage->pickedBackupPlan()}; } StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor( @@ -324,9 +317,106 @@ StringData extractGeoNearFieldFromIndexes(OperationContext* opCtx, } MONGO_UNREACHABLE; } + +/** + * This attempts to either extract a $sample stage at the front of the pipeline or a + * $_internalUnpackBucket stage at the front of the pipeline immediately followed by a $sample + * stage. In the former case a 'nullptr' is returned for the second element of the pair <$sample, + * $_internalUnpackBucket>, and if the latter case is encountered both elements of the pair will be + * a populated. 
If the pipeline doesn't contain a $_internalUnpackBucket at the front of the + * pipeline immediately followed by a $sample stage, then the first element in the pair will be a + * 'nullptr'. + */ +std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSampleUnpackBucket( + const Pipeline::SourceContainer& sources) { + DocumentSourceSample* sampleStage = nullptr; + DocumentSourceInternalUnpackBucket* unpackStage = nullptr; + + auto sourcesIt = sources.begin(); + if (sourcesIt != sources.end()) { + sampleStage = dynamic_cast<DocumentSourceSample*>(sourcesIt->get()); + if (sampleStage) { + return std::pair{sampleStage, unpackStage}; + } + + unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get()); + ++sourcesIt; + + if (sourcesIt != sources.end()) { + sampleStage = dynamic_cast<DocumentSourceSample*>(sourcesIt->get()); + return std::pair{sampleStage, unpackStage}; + } + } + + return std::pair{sampleStage, unpackStage}; +} } // namespace std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> +PipelineD::buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage, + DocumentSourceInternalUnpackBucket* unpackBucketStage, + const CollectionPtr& collection, + Pipeline* pipeline) { + tassert(5422105, "sampleStage cannot be a nullptr", sampleStage); + + auto expCtx = pipeline->getContext(); + + Pipeline::SourceContainer& sources = pipeline->_sources; + + const long long sampleSize = sampleStage->getSampleSize(); + const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx); + auto&& [exec, isStorageOptimizedSample] = + uassertStatusOK(createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords)); + + AttachExecutorCallback attachExecutorCallback; + if (exec) { + if (isStorageOptimizedSample) { + if (!unpackBucketStage) { + // Replace $sample stage with $sampleFromRandomCursor stage. + pipeline->popFront(); + std::string idString = collection->ns().isOplog() ? "ts" : "_id"; + pipeline->addInitialSource(DocumentSourceSampleFromRandomCursor::create( + expCtx, sampleSize, idString, numRecords)); + } else { + // If there are non-nullptrs for 'sampleStage' and 'unpackBucketStage', then + // 'unpackBucketStage' is at the front of the pipeline immediately followed by a + // 'sampleStage'. Coalesce a $_internalUnpackBucket followed by a $sample. + unpackBucketStage->setSampleParameters(sampleSize, gTimeseriesBucketMaxCount); + sources.erase(std::next(sources.begin())); + + // Fix the source for the next stage by pointing it to the $_internalUnpackBucket + // stage. + auto sourcesIt = sources.begin(); + if (std::next(sourcesIt) != sources.end()) { + ++sourcesIt; + (*sourcesIt)->setSource(unpackBucketStage); + } + } + } + + // The order in which we evaluate these arguments is significant. We'd like to be + // sure that the DocumentSourceCursor is created _last_, because if we run into a + // case where a DocumentSourceCursor has been created (yet hasn't been put into a + // Pipeline) and an exception is thrown, an invariant will trigger in the + // DocumentSourceCursor. This is a design flaw in DocumentSourceCursor. + auto deps = pipeline->getDependencies(DepsTracker::kAllMetadata); + const auto cursorType = deps.hasNoRequirements() + ? 
DocumentSourceCursor::CursorType::kEmptyDocuments + : DocumentSourceCursor::CursorType::kRegular; + attachExecutorCallback = + [cursorType](const CollectionPtr& collection, + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, + Pipeline* pipeline) { + auto cursor = DocumentSourceCursor::create( + collection, std::move(exec), pipeline->getContext(), cursorType); + pipeline->addInitialSource(std::move(cursor)); + }; + return std::pair(std::move(attachExecutorCallback), std::move(exec)); + } + return std::pair(std::move(attachExecutorCallback), nullptr); +} + +std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection, const NamespaceString& nss, const AggregateCommand* aggRequest, @@ -341,31 +431,15 @@ PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection, } if (!sources.empty()) { - auto sampleStage = dynamic_cast<DocumentSourceSample*>(sources.front().get()); + // Try to inspect if the DocumentSourceSample or a DocumentSourceInternalUnpackBucket stage + // can be optimized for sampling backed by a storage engine supplied random cursor. + auto&& [sampleStage, unpackBucketStage] = extractSampleUnpackBucket(sources); + // Optimize an initial $sample stage if possible. if (collection && sampleStage) { - const long long sampleSize = sampleStage->getSampleSize(); - const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx); - auto exec = uassertStatusOK( - createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords, pipeline)); + auto [attachExecutorCallback, exec] = + buildInnerQueryExecutorSample(sampleStage, unpackBucketStage, collection, pipeline); if (exec) { - // The order in which we evaluate these arguments is significant. We'd like to be - // sure that the DocumentSourceCursor is created _last_, because if we run into a - // case where a DocumentSourceCursor has been created (yet hasn't been put into a - // Pipeline) and an exception is thrown, an invariant will trigger in the - // DocumentSourceCursor. This is a design flaw in DocumentSourceCursor. - auto deps = pipeline->getDependencies(DepsTracker::kAllMetadata); - const auto cursorType = deps.hasNoRequirements() - ? 
DocumentSourceCursor::CursorType::kEmptyDocuments - : DocumentSourceCursor::CursorType::kRegular; - auto attachExecutorCallback = - [cursorType](const CollectionPtr& collection, - std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, - Pipeline* pipeline) { - auto cursor = DocumentSourceCursor::create( - collection, std::move(exec), pipeline->getContext(), cursorType); - pipeline->addInitialSource(std::move(cursor)); - }; return std::make_pair(std::move(attachExecutorCallback), std::move(exec)); } } diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 37c70d88f70..ba7969550c5 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -38,6 +38,8 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" +#include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/plan_executor.h" @@ -169,6 +171,19 @@ private: Pipeline* pipeline); /** + * Build a PlanExecutor and prepare a callback to create a special DocumentSourceSample or a + * DocumentSourceInternalUnpackBucket stage that has been rewritten to sample buckets using a + * storage engine supplied random cursor if the heuristics used for the optimization allows. If + * the optimized $sample plan cannot or should not be produced, returns a null PlanExecutor + * pointer. + */ + static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> + buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage, + DocumentSourceInternalUnpackBucket* unpackBucketStage, + const CollectionPtr& collection, + Pipeline* pipeline); + + /** * Creates a PlanExecutor to be used in the initial cursor source. This function will try to * push down the $sort, $project, $match and $limit stages into the PlanStage layer whenever * possible. In this case, these stages will be incorporated into the PlanExecutor. |
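Editor's note on the second mechanism: the rewritten $_internalUnpackBucket stage repeatedly pulls a random bucket from its random-cursor child, draws an index j uniformly over [0, bucketMaxCount), and accepts the j-th measurement only if the bucket actually holds that many rows and the (bucket _id, j) pair has not been produced before; after 100 failed attempts in a row it throws (error code 5422103 above). Below is a compressed, self-contained model of that loop under stated assumptions — Bucket, Measurement, and BucketSampler are stand-ins rather than server classes, and the real stage keys its seen-set on the bucket's OID plus measurement index.

```cpp
// Illustrative model of the sampling path added to DocumentSourceInternalUnpackBucket;
// not server code.
#include <cstddef>
#include <optional>
#include <random>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

struct Measurement { std::string id; double value; };
struct Bucket { std::string bucketId; std::vector<Measurement> rows; };  // rows.size() <= bucketMaxCount

class BucketSampler {
public:
    BucketSampler(std::vector<Bucket> buckets, int bucketMaxCount, long long sampleSize)
        : _buckets(std::move(buckets)), _bucketMaxCount(bucketMaxCount), _sampleSize(sampleSize) {}

    // Analogue of doGetNext(): returns nullopt (EOF) once 'sampleSize' unique rows were produced.
    std::optional<Measurement> getNext() {
        if (_nSampledSoFar >= _sampleSize) return std::nullopt;
        constexpr int kMaxAttempts = 100;
        for (int attempt = 0; attempt < kMaxAttempts; ++attempt) {
            // Stand-in for the storage engine's random cursor: a uniformly chosen bucket.
            const Bucket& b =
                _buckets[std::uniform_int_distribution<std::size_t>(0, _buckets.size() - 1)(_prng)];
            // Accept/reject: draw j over the *maximum* bucket size so every measurement,
            // regardless of how full its bucket is, has the same probability of selection.
            int j = std::uniform_int_distribution<int>(0, _bucketMaxCount - 1)(_prng);
            if (j >= static_cast<int>(b.rows.size()))
                continue;  // rejected: this bucket has fewer than j+1 measurements
            if (!_seen.insert({b.bucketId, j}).second)
                continue;  // (bucketId, measurement index) already sampled; try again
            ++_nSampledSoFar;
            return b.rows[j];
        }
        // Mirrors uassert 5422103: too many duplicate draws in a row.
        throw std::runtime_error("could not find a non-duplicate measurement");
    }

private:
    std::vector<Bucket> _buckets;  // assumed non-empty
    int _bucketMaxCount;
    long long _sampleSize;
    long long _nSampledSoFar = 0;
    std::mt19937 _prng{42};
    std::set<std::pair<std::string, int>> _seen;
};
```

The pipeline_d.cpp changes only decide when this path is taken: $sample must directly follow $_internalUnpackBucket at the front of the pipeline, createRandomCursorExecutor must actually produce a random cursor (the sample is at most ~5% of the bucket count and there are more than 100 buckets), and the TrialStage must not have fallen back to a collection scan; otherwise the stages run unmodified.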