author     Eric Cox <eric.cox@mongodb.com>                    2021-02-05 17:56:19 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2021-03-19 01:32:47 +0000
commit     612a3725d98381bf9c0777bcd6b2169cae33f4d1
tree       3074ea759aefcd027c24eb83853cec71d314b035
parent     8f965fa363779b60173fc9f4459a70d56355a2ea
download   mongo-612a3725d98381bf9c0777bcd6b2169cae33f4d1.tar.gz
SERVER-54221 Implement $sample pushdown into $_internalUnpackBucket
-rw-r--r--  jstests/noPassthrough/timeseries_sample.js | 122
-rw-r--r--  src/mongo/db/SConscript | 1
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 161
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.h | 121
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp | 281
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp | 178
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.h | 15
7 files changed, 819 insertions(+), 60 deletions(-)
diff --git a/jstests/noPassthrough/timeseries_sample.js b/jstests/noPassthrough/timeseries_sample.js
new file mode 100644
index 00000000000..d38af3559d3
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_sample.js
@@ -0,0 +1,122 @@
+/**
+ * Tests inserting sample data into the time-series buckets collection and exercises the
+ * optimized $sample implementation for $_internalUnpackBucket.
+ * @tags: [
+ * sbe_incompatible,
+ * requires_wiredtiger,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/core/timeseries/libs/timeseries.js");
+load("jstests/libs/analyze_plan.js");
+
+const conn = MongoRunner.runMongod({setParameter: {timeseriesBucketMaxCount: 2}});
+
+// Although this test is tagged with 'requires_wiredtiger', this is not sufficient for ensuring
+// that the parallel suite runs this test only on WT configurations.
+if (jsTest.options().storageEngine && jsTest.options().storageEngine !== "wiredTiger") {
+ jsTest.log("Skipping test on non-WT storage engine: " + jsTest.options().storageEngine);
+ MongoRunner.stopMongod(conn);
+ return;
+}
+
+const dbName = jsTestName();
+const testDB = conn.getDB(dbName);
+assert.commandWorked(testDB.dropDatabase());
+
+if (!TimeseriesTest.timeseriesCollectionsEnabled(testDB.getMongo())) {
+ jsTestLog("Skipping test because the time-series collection feature flag is disabled");
+ MongoRunner.stopMongod(conn);
+ return;
+}
+
+// In order to trigger the optimized sample path, the buckets collection needs to contain more
+// than 100 buckets.
+const nBuckets = 101;
+let bucketMaxCount = 2;
+let numDocs = nBuckets * bucketMaxCount;
+
+const coll = testDB.getCollection('timeseries_sample');
+const bucketsColl = testDB.getCollection("system.buckets." + coll.getName());
+
+coll.drop();
+
+const timeFieldName = "time";
+const metaFieldName = "m";
+assert.commandWorked(testDB.createCollection(
+ coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
+
+assert.contains(bucketsColl.getName(), testDB.getCollectionNames());
+
+for (let i = 0; i < numDocs; i++) {
+ let id = ObjectId();
+ assert.commandWorked(
+ coll.insert({_id: id, [timeFieldName]: ISODate(), [metaFieldName]: i % nBuckets, x: i}),
+ "failed to insert doc: " + id);
+}
+
+let buckets = bucketsColl.find().toArray();
+assert.eq(nBuckets, buckets.length, buckets);
+
+let assertUniqueDocuments = function(docs) {
+ let seen = new Set();
+ docs.forEach(doc => {
+ assert.eq(seen.has(doc._id), false);
+ seen.add(doc._id);
+ });
+};
+
+// Check the time-series view to make sure we have the correct number of docs and that there are no
+// duplicates after sampling.
+const viewDocs = coll.find({}, {x: 1}).toArray();
+assert.eq(numDocs, viewDocs.length, viewDocs);
+
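+// Note (illustrative cross-reference): a sample size of 5 stays within 5% of the 101 buckets, so
+// the random-cursor-based $sample optimization can kick in (see 'kMaxSampleRatioForRandCursor' in
+// pipeline_d.cpp).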
+let sampleSize = 5;
+let result = coll.aggregate([{$sample: {size: sampleSize}}]).toArray();
+assert.eq(sampleSize, result.length, result);
+assertUniqueDocuments(result);
+
+// Check that we have absorbed $sample into $_internalUnpackBucket.
+const optimizedSamplePlan = coll.explain().aggregate([{$sample: {size: sampleSize}}]);
+let bucketStage = getAggPlanStage(optimizedSamplePlan, "$_internalUnpackBucket");
+assert.eq(bucketStage["$_internalUnpackBucket"]["sample"], sampleSize);
+assert(!aggPlanHasStage(optimizedSamplePlan, "$sample"));
+
+// Run an agg pipeline with optimization disabled.
+result = coll.aggregate([{$_internalInhibitOptimization: {}}, {$sample: {size: 1}}]).toArray();
+assert.eq(1, result.length, result);
+
+// Check that $sample hasn't been absorbed by $_internalUnpackBucket when the sample size is too
+// large a fraction of the bucket count for the random-cursor optimization to apply.
+sampleSize = 100;
+const unoptimizedSamplePlan = coll.explain().aggregate([{$sample: {size: sampleSize}}]);
+bucketStage = getAggPlanStage(unoptimizedSamplePlan, "$_internalUnpackBucket");
+assert.eq(bucketStage["$_internalUnpackBucket"]["sample"], undefined);
+assert(aggPlanHasStage(unoptimizedSamplePlan, "$sample"));
+
+const unoptimizedResult = coll.aggregate([{$sample: {size: sampleSize}}]).toArray();
+assertUniqueDocuments(unoptimizedResult);
+
+// Check that a sampleSize greater than the number of measurements doesn't cause an infinite loop.
+result = coll.aggregate([{$sample: {size: numDocs + 1}}]).toArray();
+assert.eq(numDocs, result.length, result);
+
+// Check that $lookup against a time-series collection doesn't cache inner pipeline results if it
+// contains a $sample stage.
+result =
+ coll.aggregate({$lookup: {from: coll.getName(), as: "docs", pipeline: [{$sample: {size: 1}}]}})
+ .toArray();
+
+// Each subquery should be an independent sample. Check that we didn't simply sample the same
+// document repeatedly; it's sufficient for now to make sure that the seen set contains at least
+// two distinct samples.
+let seen = new Set();
+result.forEach(r => {
+ assert.eq(r.docs.length, 1);
+ seen.add(r.docs[0]._id);
+});
+assert.gte(seen.size, 2);
+
+MongoRunner.stopMongod(conn);
+})();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 865b287cda0..70ef8c1d168 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1284,6 +1284,7 @@ env.Library(
'stats/serveronly_stats',
'storage/remove_saver',
'storage/storage_options',
+ 'timeseries/timeseries_idl',
'update/update_driver',
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index bb156781132..d17840698d9 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
@@ -38,10 +40,12 @@
#include "mongo/db/matcher/expression_internal_expr_comparison.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/logv2/log.h"
namespace mongo {
@@ -77,6 +81,16 @@ auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, Bucket
}
/**
+ * Determine if an arbitrary field should be included in the materialized measurements.
+ */
+auto determineIncludeField(StringData fieldName,
+ BucketUnpacker::Behavior unpackerBehavior,
+ const BucketSpec& bucketSpec) {
+ return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
+ (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end());
+}
+
+/**
* A projection can be internalized if every field corresponds to a boolean value. Note that this
* correctly rejects dotted fieldnames, which are mapped to objects internally.
*/
@@ -199,6 +213,35 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon
}
} // namespace
+// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the
+// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in
+// the table, this helper returns the measurement count corresponding to that table record.
+// Otherwise, the table is probed for the smallest {b_i, S_i} pair such that
+// 'targetTimestampObjSize' < S_i. The interval that actually covers the target is the one
+// immediately before that pair, and linear interpolation from its lower bound yields the
+// measurement count corresponding to the 'targetTimestampObjSize' provided.
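+//
+// Worked example (this mirrors the 'ComputeMeasurementCountInLargerIntervals' unit test): a
+// timestamp column of 30003 bytes falls in the interval whose lower bound is {1000, 12895}. Row
+// keys in that interval have four digits, so each timestamp element occupies 10 + 4 = 14 bytes,
+// giving 1000 + (30003 - 12895) / 14 = 2222 measurements.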
+int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) {
+ auto currentInterval =
+ std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable),
+ std::end(BucketUnpacker::kTimestampObjSizeTable),
+ [&](const auto& entry) { return targetTimestampObjSize <= entry.second; });
+
+ if (currentInterval->second == targetTimestampObjSize) {
+ return currentInterval->first;
+ }
+ // 'currentInterval' points to the first table entry whose size exceeds the target
+ // 'targetTimestampObjSize'; the interval that actually covers the object size is the one before
+ // the current one.
+ tassert(5422104,
+ "currentInterval should not point to the first table entry",
+ currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin());
+ --currentInterval;
+
+ auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin());
+
+ return currentInterval->first +
+ ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey));
+}
+
void BucketUnpacker::reset(BSONObj&& bucket) {
_fieldIters.clear();
_timeFieldIter = boost::none;
@@ -253,11 +296,13 @@ void BucketUnpacker::reset(BSONObj&& bucket) {
// Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or
// _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'.
- auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end();
- if ((_unpackerBehavior == Behavior::kInclude) == found) {
+ if (determineIncludeField(colName, _unpackerBehavior, _spec)) {
_fieldIters.push_back({colName.toString(), BSONObjIterator{elem.Obj()}});
}
}
+
+ // Save the measurement count for the owned bucket.
+ _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize());
}
void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) {
@@ -268,7 +313,7 @@ void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior
}
Document BucketUnpacker::getNext() {
- invariant(hasNext());
+ tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext());
auto measurement = MutableDocument{};
auto&& timeElem = _timeFieldIter->next();
@@ -292,6 +337,36 @@ Document BucketUnpacker::getNext() {
return measurement.freeze();
}
+Document BucketUnpacker::extractSingleMeasurement(int j) {
+ tassert(5422101,
+ "'extractSingleMeasurment' expects j to be greater than or equal to zero and less than "
+ "or equal to the number of measurements in a bucket",
+ j >= 0 && j < _numberOfMeasurements);
+
+ auto measurement = MutableDocument{};
+
+ auto rowKey = std::to_string(j);
+ auto targetIdx = StringData{rowKey};
+ auto&& dataRegion = _bucket.getField(kBucketDataFieldName).Obj();
+
+ if (_includeMetaField && !_metaValue.isNull()) {
+ measurement.addField(*_spec.metaField, Value{_metaValue});
+ }
+
+ for (auto&& dataElem : dataRegion) {
+ auto colName = dataElem.fieldNameStringData();
+ if (!determineIncludeField(colName, _unpackerBehavior, _spec)) {
+ continue;
+ }
+ auto value = dataElem[targetIdx];
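+ // Sparse columns may not contain an entry for every row key; skip the field when there is
+ // no value at the target index.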
+ if (value) {
+ measurement.addField(dataElem.fieldNameStringData(), Value{value});
+ }
+ }
+
+ return measurement.freeze();
+}
+
DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket(
const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker)
: DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {}
@@ -363,8 +438,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF
std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField});
}
-Value DocumentSourceInternalUnpackBucket::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
+void DocumentSourceInternalUnpackBucket::serializeToArray(
+ std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
MutableDocument out;
auto behavior =
_bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? kInclude : kExclude;
@@ -378,10 +453,84 @@ Value DocumentSourceInternalUnpackBucket::serialize(
if (spec.metaField) {
out.addField(kMetaFieldName, Value{*spec.metaField});
}
- return Value(DOC(getSourceName() << out.freeze()));
+
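+ // When not explaining, the absorbed $sample (if any) is serialized back out as a separate
+ // $sample stage; for explain output, the sample parameters are reported as part of this stage
+ // instead.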
+ if (!explain) {
+ array.push_back(Value(DOC(getSourceName() << out.freeze())));
+ if (_sampleSize) {
+ auto sampleSrc = DocumentSourceSample::create(pExpCtx, *_sampleSize);
+ sampleSrc->serializeToArray(array);
+ }
+ } else {
+ if (_sampleSize) {
+ out.addField("sample", Value{static_cast<long long>(*_sampleSize)});
+ out.addField("bucketMaxCount", Value{_bucketMaxCount});
+ }
+ array.push_back(Value(DOC(getSourceName() << out.freeze())));
+ }
+}
+
+DocumentSource::GetNextResult
+DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() {
+ const auto kMaxAttempts = 100;
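+ // Each attempt below draws a random bucket from the random-cursor child stage and an index
+ // 'j' chosen uniformly from [0, '_bucketMaxCount'). The draw is only kept when 'j' addresses a
+ // measurement that actually exists in that bucket, so fuller buckets are proportionally more
+ // likely to contribute, which keeps the sample uniform across all measurements.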
+ for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) {
+ auto randResult = pSource->getNext();
+ switch (randResult.getStatus()) {
+ case GetNextResult::ReturnStatus::kAdvanced: {
+ auto bucket = randResult.getDocument().toBson();
+ _bucketUnpacker.reset(std::move(bucket));
+
+ auto& prng = pExpCtx->opCtx->getClient()->getPrng();
+ auto j = prng.nextInt64(_bucketMaxCount);
+
+ if (j < _bucketUnpacker.numberOfMeasurements()) {
+ auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j);
+
+ auto bucketId = _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName];
+ auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j};
+
+ if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) {
+ _nSampledSoFar++;
+ return sampledDocument;
+ } else {
+ LOGV2_DEBUG(
+ 5422102,
+ 1,
+ "$_internalUnpackBucket optimized for sample saw duplicate measurement",
+ "measurementIndex"_attr = j,
+ "bucketId"_attr = bucketId);
+ }
+ }
+ break;
+ }
+ case GetNextResult::ReturnStatus::kPauseExecution: {
+ // This state should never be reached since the input stage is a random cursor.
+ MONGO_UNREACHABLE;
+ }
+ case GetNextResult::ReturnStatus::kEOF: {
+ return randResult;
+ }
+ }
+ }
+ uasserted(5422103,
+ str::stream()
+ << "$_internalUnpackBucket stage could not find a non-duplicate document after "
+ << kMaxAttempts
+ << " attempts while using a random cursor. This is likely a "
+ "sporadic failure, please try again");
}
DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() {
+ // If the '_sampleSize' member is present, then the stage will produce randomly sampled
+ // documents from buckets.
+ if (_sampleSize) {
+ if (_nSampledSoFar >= _sampleSize) {
+ return GetNextResult::makeEOF();
+ }
+ return sampleUniqueMeasurementFromBuckets();
+ }
+
+ // Otherwise, fall back to unpacking every measurement in all buckets until the child stage is
+ // exhausted.
if (_bucketUnpacker.hasNext()) {
return _bucketUnpacker.getNext();
}
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index 18316a1a3e9..c9880317964 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -52,6 +52,7 @@ struct BucketSpec {
std::set<std::string> fieldSet;
};
+
/**
* BucketUnpacker will unpack bucket fields for metadata and the provided fields.
*/
@@ -62,6 +63,29 @@ public:
static constexpr StringData kBucketDataFieldName = "data"_sd;
static constexpr StringData kBucketMetaFieldName = "meta"_sd;
+ // A table that is useful for interpolations between the number of measurements in a bucket and
+ // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i,
+ // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the
+ // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are
+ // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99],
+ // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to
+ // exceed the server BSON object limit of 16 MB.
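+ //
+ // As an illustration of how the entries are derived: a BSON Date element occupies
+ // 10 + <number of digits in its row key> bytes, so the timestamp column for row keys 0-9 is
+ // BSONObj::kMinBSONLength + 10 * 11 = 115 bytes, the column for row keys 0-99 is
+ // 115 + 90 * 12 = 1195 bytes, and so on.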
+ static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{
+ {{0, BSONObj::kMinBSONLength},
+ {10, 115},
+ {100, 1195},
+ {1000, 12895},
+ {10000, 138895},
+ {100000, 1488895},
+ {1000000, 15888895},
+ {10000000, 168888895}}};
+
+ /**
+ * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series
+ * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time.
+ */
+ static int computeMeasurementCount(int targetTimestampObjSize);
+
// When BucketUnpacker is created with kInclude it must produce measurements that contain the
// set of fields. Otherwise, if the kExclude option is used, the measurements will include the
// set difference between all fields in the bucket and the provided fields.
@@ -76,7 +100,18 @@ public:
_includeTimeField(includeTimeField),
_includeMetaField(includeMetaField) {}
+ /**
+ * Materializes and returns the next measurement in the bucket; repeated calls walk the bucket
+ * until it is exhausted. A precondition of this method is that 'hasNext()' must be true.
+ */
Document getNext();
+
+ /**
+ * This method will extract the j-th measurement from the bucket. A precondition of this method
+ * is that j >= 0 && j < the number of measurements within the underlying bucket.
+ */
+ Document extractSingleMeasurement(int j);
+
bool hasNext() const {
return _timeFieldIter && _timeFieldIter->more();
}
@@ -106,6 +141,10 @@ public:
return _includeTimeField;
}
+ int32_t numberOfMeasurements() const {
+ return _numberOfMeasurements;
+ }
+
void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior);
private:
@@ -132,6 +171,9 @@ private:
// Iterators used to unpack the columns of the above bucket that are populated during the reset
// phase according to the provided 'Behavior' and 'BucketSpec'.
std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters;
+
+ // The number of measurements in the bucket.
+ int32_t _numberOfMeasurements = 0;
};
class DocumentSourceInternalUnpackBucket : public DocumentSource {
@@ -154,6 +196,17 @@ public:
return kStageName.rawData();
}
+ void serializeToArray(
+ std::vector<Value>& array,
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ /**
+ * Use 'serializeToArray' above.
+ */
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ MONGO_UNREACHABLE;
+ }
+
bool includeMetaField() const {
return _bucketUnpacker.includeMetaField();
}
@@ -174,7 +227,13 @@ public:
ChangeStreamRequirement::kBlacklist};
}
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+ DepsTracker::State getDependencies(DepsTracker* deps) const final {
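+ // The optimized sample path consumes random numbers from the operation context's client,
+ // so advertise the need for a random generator once a sample size has been set.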
+ if (_sampleSize) {
+ deps->needRandomGenerator = true;
+ }
+ deps->needWholeDocument = true;
+ return DepsTracker::State::EXHAUSTIVE_ALL;
+ }
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
return boost::none;
@@ -235,9 +294,69 @@ public:
std::unique_ptr<MatchExpression> createPredicatesOnBucketLevelField(
const MatchExpression* matchExpr) const;
+ /**
+ * Sets the sample size to 'n' and the maximum number of measurements in a bucket to be
+ * 'bucketMaxCount'. Calling this method implicitly changes the behavior from having the stage
+ * unpack every bucket in a collection to sampling buckets to generate a uniform random sample
+ * of size 'n'.
+ */
+ void setSampleParameters(long long n, int bucketMaxCount) {
+ _sampleSize = n;
+ _bucketMaxCount = bucketMaxCount;
+ }
+
+ boost::optional<long long> sampleSize() const {
+ return _sampleSize;
+ }
+
private:
+ /**
+ * Carries the bucket _id and index for the measurement that was sampled by
+ * 'sampleUniqueMeasurementFromBuckets'.
+ */
+ struct SampledMeasurementKey {
+ SampledMeasurementKey(OID bucketId, int64_t measurementIndex)
+ : bucketId(bucketId), measurementIndex(measurementIndex) {}
+
+ bool operator==(const SampledMeasurementKey& key) const {
+ return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex;
+ }
+
+ OID bucketId;
+ int32_t measurementIndex;
+ };
+
+ /**
+ * Computes a hash of 'SampledMeasurementKey' so that measurements which have already been seen
+ * can be tracked for de-duplication during sampling.
+ */
+ struct SampledMeasurementKeyHasher {
+ size_t operator()(const SampledMeasurementKey& s) const {
+ return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^
+ absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^
+ absl::Hash<int32_t>{}(s.measurementIndex);
+ }
+ };
+
+ // Tracks which measurements have been seen so far. This is only used when sampling is enabled
+ // for the purpose of de-duplicating measurements.
+ using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>;
+
GetNextResult doGetNext() final;
+ /**
+ * Keeps trying to sample a unique measurement using the optimized ARHASH algorithm, up to a
+ * hardcoded maximum number of attempts. If a unique measurement isn't found before the maximum
+ * number of tries is exhausted, this method throws.
+ */
+ GetNextResult sampleUniqueMeasurementFromBuckets();
+
BucketUnpacker _bucketUnpacker;
+
+ long long _nSampledSoFar = 0;
+ int _bucketMaxCount = 0;
+ boost::optional<long long> _sampleSize;
+
+ SeenSet _seenSet;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
index fcc63c7aaaa..5d8d89b334e 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2020-present MongoDB, Inc.
+ * Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -830,5 +830,284 @@ TEST_F(InternalUnpackBucketExecTest, ParserRejectsBothIncludeAndExcludeParameter
AssertionException,
5408000);
}
+
+TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurement) {
+ auto expCtx = getExpCtx();
+
+ std::set<std::string> fields{
+ "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
+ auto spec = BucketSpec{
+ kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
+ auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true};
+
+ auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
+ auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
+ auto d3 = dateFromISOString("2020-02-17T02:00:00.000Z").getValue();
+ auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
+ << BSON("_id" << BSON("0" << 1 << "1" << 2 << "2" << 3) << "time"
+ << BSON("0" << d1 << "1" << d2 << "2" << d3) << "a"
+ << BSON("0" << 1 << "1" << 2 << "2" << 3) << "b"
+ << BSON("1" << 1 << "2" << 2)));
+
+ unpacker.reset(std::move(bucket));
+
+ auto next = unpacker.extractSingleMeasurement(0);
+ auto expected = Document{
+ {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ next = unpacker.extractSingleMeasurement(2);
+ expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
+ {"_id", 3},
+ {"time", d3},
+ {"a", 3},
+ {"b", 2}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ next = unpacker.extractSingleMeasurement(1);
+ expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
+ {"_id", 2},
+ {"time", d2},
+ {"a", 2},
+ {"b", 1}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ // Can we extract the middle element again?
+ next = unpacker.extractSingleMeasurement(1);
+ ASSERT_DOCUMENT_EQ(next, expected);
+}
+
+TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurementSparse) {
+ auto expCtx = getExpCtx();
+
+ std::set<std::string> fields{
+ "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
+ auto spec = BucketSpec{
+ kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
+ auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true};
+
+ auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
+ auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
+ auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
+ << BSON("_id" << BSON("0" << 1 << "1" << 2) << "time"
+ << BSON("0" << d1 << "1" << d2) << "a" << BSON("0" << 1)
+ << "b" << BSON("1" << 1)));
+
+ unpacker.reset(std::move(bucket));
+ auto next = unpacker.extractSingleMeasurement(1);
+ auto expected = Document{
+ {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 2}, {"time", d2}, {"b", 1}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ // Can we extract the same element again?
+ next = unpacker.extractSingleMeasurement(1);
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ next = unpacker.extractSingleMeasurement(0);
+ expected = Document{
+ {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ // Can we extract the same element twice in a row?
+ next = unpacker.extractSingleMeasurement(0);
+ ASSERT_DOCUMENT_EQ(next, expected);
+
+ next = unpacker.extractSingleMeasurement(0);
+ ASSERT_DOCUMENT_EQ(next, expected);
+}
+
+class InternalUnpackBucketRandomSampleTest : public AggregationContextFixture {
+protected:
+ BSONObj makeIncludeAllSpec() {
+ return BSON("$_internalUnpackBucket"
+ << BSON("include" << BSON_ARRAY("_id"
+ << "time" << kUserDefinedMetaName << "a"
+ << "b")
+ << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName
+ << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+ }
+
+ boost::intrusive_ptr<DocumentSource> makeUnpackStage(const BSONObj& spec,
+ long long nSample,
+ int bucketMaxCount) {
+ auto ds =
+ DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx());
+ auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get());
+ unpack->setSampleParameters(nSample, bucketMaxCount);
+ return unpack;
+ }
+
+ boost::intrusive_ptr<DocumentSource> makeInternalUnpackBucketSample(int nSample,
+ int nBuckets,
+ int nMeasurements) {
+ auto spec = makeIncludeAllSpec();
+ generateBuckets(nBuckets, nMeasurements);
+ auto ds =
+ DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx());
+ auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get());
+ unpack->setSampleParameters(nSample, 1000);
+ return unpack;
+ }
+
+ boost::intrusive_ptr<DocumentSource> prepareMock() {
+ auto mock = DocumentSourceMock::createForTest(getExpCtx());
+ for (auto&& b : _buckets) {
+ mock->push_back(DocumentSource::GetNextResult{std::move(b)});
+ }
+ return mock;
+ }
+
+ Document makeBucketPart(int nMeasurements, std::function<Value(int)> gen) {
+ auto doc = MutableDocument{};
+ for (auto i = 0; i < nMeasurements; ++i) {
+ doc.addField(std::to_string(i), gen(i));
+ }
+ return doc.freeze();
+ }
+
+ void generateBuckets(int nBuckets, int nMeasurements) {
+ auto& prng = getExpCtx()->opCtx->getClient()->getPrng();
+ std::vector<Document> buckets;
+ for (auto m = 0; m < nBuckets; m++) {
+ auto idDoc = makeBucketPart(nMeasurements, [](int i) { return Value{OID::gen()}; });
+ auto timeDoc = makeBucketPart(nMeasurements, [](int i) { return Value{Date_t{}}; });
+ auto aCol = makeBucketPart(nMeasurements,
+ [&](int i) { return Value{prng.nextCanonicalDouble()}; });
+ buckets.push_back({Document{
+ {"_id", Value{OID::gen()}},
+ {"meta", Document{{"m1", m}, {"m2", m + 1}}},
+ {"data",
+ Document{{"_id", idDoc}, {"time", std::move(timeDoc)}, {"a", std::move(aCol)}}}}});
+ }
+
+ _buckets = std::move(buckets);
+ }
+
+private:
+ std::vector<Document> _buckets;
+};
+
+TEST_F(InternalUnpackBucketRandomSampleTest, SampleHasExpectedStatProperties) {
+ auto unpack = makeInternalUnpackBucketSample(100, 1000, 1000);
+ auto mock = prepareMock();
+ unpack->setSource(mock.get());
+
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+
+ auto avg = 0.0;
+ auto nSampled = 0;
+ while (next.isAdvanced()) {
+ avg += next.getDocument()["a"].getDouble();
+ next = unpack->getNext();
+ nSampled++;
+ }
+ avg /= nSampled;
+ ASSERT_EQ(nSampled, 100);
+
+ // The average for the uniform distribution on [0, 1) is 0.5, and the stddev is sqrt(1/12).
+ // We will check that the sample average lies within 0.5 +/- 2 * sqrt(1/12).
+ auto stddev = std::sqrt(1.0 / 12.0);
+ ASSERT_GT(avg, 0.5 - 2 * stddev);
+ ASSERT_LT(avg, 0.5 + 2 * stddev);
+}
+
+TEST_F(InternalUnpackBucketRandomSampleTest, SampleIgnoresDuplicates) {
+ auto spec = BSON("$_internalUnpackBucket"
+ << BSON("include" << BSON_ARRAY("_id"
+ << "time" << kUserDefinedMetaName << "a"
+ << "b")
+ << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName
+ << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+
+ // Make an unpack bucket stage initialized with a sample size of 2 and bucketMaxCount of 1.
+ auto unpack = makeUnpackStage(spec, 2, 1);
+
+ // Fill the mock with duplicate buckets to simulate randomly sampling the same bucket over and
+ // over again until 'kMaxAttempts' is reached in 'doGetNext'.
+ auto mock = DocumentSourceMock::createForTest(getExpCtx());
+ for (auto i = 0; i < 101; ++i) {
+ mock->push_back(Document{{"_id", Value{OID::createFromString("000000000000000000000001")}},
+ {"meta", Document{{"m1", 1}, {"m2", 2}}},
+ {"data",
+ Document{{"_id", Document{{"0", 1}}},
+ {"time", Document{{"0", Date_t::now()}}},
+ {"a", Document{{"0", 1}}}}}});
+ }
+ unpack->setSource(mock.get());
+
+ // The sample size is 2 and there's only one unique measurement in the mock. The second
+ // 'getNext' call should spin until it reaches 'kMaxAttempts' tries and then throw.
+ ASSERT_TRUE(unpack->getNext().isAdvanced());
+ ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5422103);
+}
+
+namespace {
+/**
+ * Manually computes the timestamp object size for n timestamps.
+ */
+auto expectedTimestampObjSize(int32_t rowKeyOffset, int32_t n) {
+ BSONObjBuilder bob;
+ for (auto i = 0; i < n; ++i) {
+ bob.appendDate(std::to_string(i + rowKeyOffset), Date_t::now());
+ }
+ return bob.done().objsize();
+}
+} // namespace
+
+TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountLowerBoundsAreCorrect) {
+ // The last table entry is a sentinel for an upper bound on the interval that covers bucket
+ // sizes up to the 16 MB BSON limit.
+ const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
+
+ // Test the case when the target size hits a table entry which represents the lower bound of an
+ // interval.
+ for (size_t index = 0; index < maxTableEntry; ++index) {
+ auto interval = BucketUnpacker::kTimestampObjSizeTable[index];
+ ASSERT_EQ(interval.first, BucketUnpacker::computeMeasurementCount(interval.second));
+ }
+}
+
+TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountUpperBoundsAreCorrect) {
+ const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
+
+ // The lower bound sizes of each interval in the kTimestampObjSizeTable are hardcoded. Use this
+ // fact and walk the table backwards to check the correctness of the i-th interval's upper
+ // bound, which is the lower bound size of the (i+1)-th interval minus the size of a BSONObj
+ // containing one timestamp with the appropriate rowKey.
+ std::pair<int, int> currentInterval;
+ auto currentIntervalSize = 0;
+ auto currentIntervalCount = 0;
+ auto size = 0;
+ for (size_t index = maxTableEntry; index > 0; --index) {
+ currentInterval = BucketUnpacker::kTimestampObjSizeTable[index];
+ currentIntervalSize = currentInterval.second;
+ currentIntervalCount = currentInterval.first;
+ auto rowKey = currentIntervalCount - 1;
+ size = expectedTimestampObjSize(rowKey, 1);
+ // We need to add back the kMinBSONLength since it's subtracted out.
+ ASSERT_EQ(currentIntervalCount - 1,
+ BucketUnpacker::computeMeasurementCount(currentIntervalSize - size +
+ BSONObj::kMinBSONLength));
+ }
+}
+
+TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountAllPointsInSmallerIntervals) {
+ // Test every measurement count in the smaller intervals, up to 25 measurements.
+ for (auto bucketCount = 0; bucketCount < 25; ++bucketCount) {
+ auto size = expectedTimestampObjSize(0, bucketCount);
+ ASSERT_EQ(bucketCount, BucketUnpacker::computeMeasurementCount(size));
+ }
+}
+
+TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountInLargerIntervals) {
+ ASSERT_EQ(2222, BucketUnpacker::computeMeasurementCount(30003));
+ ASSERT_EQ(11111, BucketUnpacker::computeMeasurementCount(155560));
+ ASSERT_EQ(449998, BucketUnpacker::computeMeasurementCount(7088863));
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 84d4905070c..3c964246093 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -59,6 +59,7 @@
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_geo_near_cursor.h"
#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
@@ -77,6 +78,7 @@
#include "mongo/db/stats/top.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h"
+#include "mongo/db/timeseries/timeseries_gen.h"
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/util/time_support.h"
@@ -91,20 +93,15 @@ using write_ops::Insert;
namespace {
/**
- * Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {}
- * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough
- * percentage of the collection.
- *
- * If needed, adds DocumentSourceSampleFromRandomCursor to the front of the pipeline, replacing the
- * $sample stage. This is needed if we select an optimized plan for $sample taking advantage of
- * storage engine support for random cursors.
+ * Returns a pair of a 'PlanExecutor', which uses a random cursor to sample documents, and a
+ * boolean indicating whether the random-sampling plan (as opposed to the backup collection-scan
+ * plan) was selected. Returns a null executor if the storage engine doesn't support random
+ * cursors, or if 'sampleSize' is a large enough percentage of the collection.
*/
-StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor(
- const CollectionPtr& coll,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- long long sampleSize,
- long long numRecords,
- Pipeline* pipeline) {
+StatusWith<std::pair<unique_ptr<PlanExecutor, PlanExecutor::Deleter>, bool>>
+createRandomCursorExecutor(const CollectionPtr& coll,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ long long sampleSize,
+ long long numRecords) {
OperationContext* opCtx = expCtx->opCtx;
// Verify that we are already under a collection lock. We avoid taking locks ourselves in this
@@ -113,14 +110,14 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
static const double kMaxSampleRatioForRandCursor = 0.05;
if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) {
- return {nullptr};
+ return std::pair{nullptr, false};
}
// Attempt to get a random cursor from the RecordStore.
auto rsRandCursor = coll->getRecordStore()->getRandomCursor(opCtx);
if (!rsRandCursor) {
// The storage engine has no random cursor support.
- return {nullptr};
+ return std::pair{nullptr, false};
}
// Build a MultiIteratorStage and pass it the random-sampling RecordCursor.
@@ -169,25 +166,21 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
trialStage = static_cast<TrialStage*>(root.get());
}
- auto exec = plan_executor_factory::make(expCtx,
- std::move(ws),
- std::move(root),
- &coll,
- PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
- QueryPlannerParams::RETURN_OWNED_DATA);
+ auto execStatus = plan_executor_factory::make(expCtx,
+ std::move(ws),
+ std::move(root),
+ &coll,
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ QueryPlannerParams::RETURN_OWNED_DATA);
+ if (!execStatus.isOK()) {
+ return execStatus.getStatus();
+ }
// For sharded collections, the root of the plan tree is a TrialStage that may have chosen
// either a random-sampling cursor trial plan or a COLLSCAN backup plan. We can only optimize
// the $sample aggregation stage if the trial plan was chosen.
- if (!trialStage || !trialStage->pickedBackupPlan()) {
- // Replace $sample stage with $sampleFromRandomCursor stage.
- pipeline->popFront();
- std::string idString = coll->ns().isOplog() ? "ts" : "_id";
- pipeline->addInitialSource(
- DocumentSourceSampleFromRandomCursor::create(expCtx, sampleSize, idString, numRecords));
- }
-
- return exec;
+ return std::pair{std::move(execStatus.getValue()),
+ !trialStage || !trialStage->pickedBackupPlan()};
}
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
@@ -324,9 +317,106 @@ StringData extractGeoNearFieldFromIndexes(OperationContext* opCtx,
}
MONGO_UNREACHABLE;
}
+
+/**
+ * Attempts to extract either a $sample stage at the front of the pipeline, or a
+ * $_internalUnpackBucket stage at the front of the pipeline immediately followed by a $sample
+ * stage. In the former case the second element of the returned pair <$sample,
+ * $_internalUnpackBucket> is a 'nullptr'; in the latter case both elements of the pair are
+ * populated. If the pipeline matches neither pattern, the first element of the pair is a
+ * 'nullptr'.
+ */
+std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSampleUnpackBucket(
+ const Pipeline::SourceContainer& sources) {
+ DocumentSourceSample* sampleStage = nullptr;
+ DocumentSourceInternalUnpackBucket* unpackStage = nullptr;
+
+ auto sourcesIt = sources.begin();
+ if (sourcesIt != sources.end()) {
+ sampleStage = dynamic_cast<DocumentSourceSample*>(sourcesIt->get());
+ if (sampleStage) {
+ return std::pair{sampleStage, unpackStage};
+ }
+
+ unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get());
+ ++sourcesIt;
+
+ if (unpackStage && sourcesIt != sources.end()) {
+ sampleStage = dynamic_cast<DocumentSourceSample*>(sourcesIt->get());
+ return std::pair{sampleStage, unpackStage};
+ }
+ }
+
+ return std::pair{sampleStage, unpackStage};
+}
} // namespace
std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
+PipelineD::buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage,
+ DocumentSourceInternalUnpackBucket* unpackBucketStage,
+ const CollectionPtr& collection,
+ Pipeline* pipeline) {
+ tassert(5422105, "sampleStage cannot be a nullptr", sampleStage);
+
+ auto expCtx = pipeline->getContext();
+
+ Pipeline::SourceContainer& sources = pipeline->_sources;
+
+ const long long sampleSize = sampleStage->getSampleSize();
+ const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx);
+ auto&& [exec, isStorageOptimizedSample] =
+ uassertStatusOK(createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords));
+
+ AttachExecutorCallback attachExecutorCallback;
+ if (exec) {
+ if (isStorageOptimizedSample) {
+ if (!unpackBucketStage) {
+ // Replace $sample stage with $sampleFromRandomCursor stage.
+ pipeline->popFront();
+ std::string idString = collection->ns().isOplog() ? "ts" : "_id";
+ pipeline->addInitialSource(DocumentSourceSampleFromRandomCursor::create(
+ expCtx, sampleSize, idString, numRecords));
+ } else {
+ // If both 'sampleStage' and 'unpackBucketStage' are non-null, then 'unpackBucketStage' is
+ // at the front of the pipeline immediately followed by 'sampleStage'. Absorb the $sample
+ // into the $_internalUnpackBucket stage and remove it from the pipeline.
+ unpackBucketStage->setSampleParameters(sampleSize, gTimeseriesBucketMaxCount);
+ sources.erase(std::next(sources.begin()));
+
+ // Fix the source for the next stage by pointing it to the $_internalUnpackBucket
+ // stage.
+ auto sourcesIt = sources.begin();
+ if (std::next(sourcesIt) != sources.end()) {
+ ++sourcesIt;
+ (*sourcesIt)->setSource(unpackBucketStage);
+ }
+ }
+ }
+
+ // The order in which we evaluate these arguments is significant. We'd like to be
+ // sure that the DocumentSourceCursor is created _last_, because if we run into a
+ // case where a DocumentSourceCursor has been created (yet hasn't been put into a
+ // Pipeline) and an exception is thrown, an invariant will trigger in the
+ // DocumentSourceCursor. This is a design flaw in DocumentSourceCursor.
+ auto deps = pipeline->getDependencies(DepsTracker::kAllMetadata);
+ const auto cursorType = deps.hasNoRequirements()
+ ? DocumentSourceCursor::CursorType::kEmptyDocuments
+ : DocumentSourceCursor::CursorType::kRegular;
+ attachExecutorCallback =
+ [cursorType](const CollectionPtr& collection,
+ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
+ Pipeline* pipeline) {
+ auto cursor = DocumentSourceCursor::create(
+ collection, std::move(exec), pipeline->getContext(), cursorType);
+ pipeline->addInitialSource(std::move(cursor));
+ };
+ return std::pair(std::move(attachExecutorCallback), std::move(exec));
+ }
+ return std::pair(std::move(attachExecutorCallback), nullptr);
+}
+
+std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection,
const NamespaceString& nss,
const AggregateCommand* aggRequest,
@@ -341,31 +431,15 @@ PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection,
}
if (!sources.empty()) {
- auto sampleStage = dynamic_cast<DocumentSourceSample*>(sources.front().get());
+ // Check whether a DocumentSourceSample stage, possibly preceded by a
+ // DocumentSourceInternalUnpackBucket stage, can be optimized into sampling backed by a
+ // storage-engine-supplied random cursor.
+ auto&& [sampleStage, unpackBucketStage] = extractSampleUnpackBucket(sources);
+
// Optimize an initial $sample stage if possible.
if (collection && sampleStage) {
- const long long sampleSize = sampleStage->getSampleSize();
- const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx);
- auto exec = uassertStatusOK(
- createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords, pipeline));
+ auto [attachExecutorCallback, exec] =
+ buildInnerQueryExecutorSample(sampleStage, unpackBucketStage, collection, pipeline);
if (exec) {
- // The order in which we evaluate these arguments is significant. We'd like to be
- // sure that the DocumentSourceCursor is created _last_, because if we run into a
- // case where a DocumentSourceCursor has been created (yet hasn't been put into a
- // Pipeline) and an exception is thrown, an invariant will trigger in the
- // DocumentSourceCursor. This is a design flaw in DocumentSourceCursor.
- auto deps = pipeline->getDependencies(DepsTracker::kAllMetadata);
- const auto cursorType = deps.hasNoRequirements()
- ? DocumentSourceCursor::CursorType::kEmptyDocuments
- : DocumentSourceCursor::CursorType::kRegular;
- auto attachExecutorCallback =
- [cursorType](const CollectionPtr& collection,
- std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
- Pipeline* pipeline) {
- auto cursor = DocumentSourceCursor::create(
- collection, std::move(exec), pipeline->getContext(), cursorType);
- pipeline->addInitialSource(std::move(cursor));
- };
return std::make_pair(std::move(attachExecutorCallback), std::move(exec));
}
}
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 37c70d88f70..ba7969550c5 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -38,6 +38,8 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
+#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/plan_executor.h"
@@ -169,6 +171,19 @@ private:
Pipeline* pipeline);
/**
+ * Builds a PlanExecutor and prepares a callback that either creates a special random-cursor
+ * $sample stage or rewrites the DocumentSourceInternalUnpackBucket stage to sample buckets
+ * using a storage-engine-supplied random cursor, if the heuristics used for the optimization
+ * allow it. If the optimized $sample plan cannot or should not be produced, returns a null
+ * PlanExecutor pointer.
+ */
+ static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
+ buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage,
+ DocumentSourceInternalUnpackBucket* unpackBucketStage,
+ const CollectionPtr& collection,
+ Pipeline* pipeline);
+
+ /**
* Creates a PlanExecutor to be used in the initial cursor source. This function will try to
* push down the $sort, $project, $match and $limit stages into the PlanStage layer whenever
* possible. In this case, these stages will be incorporated into the PlanExecutor.