Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/SConscript                                                                        1
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp                             161
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.h                               121
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp 281
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                                                          178
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.h                                                             15
6 files changed, 60 insertions(+), 697 deletions(-)
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 226da5788a6..6732fc7e951 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1283,7 +1283,6 @@ env.Library(
'stats/serveronly_stats',
'storage/remove_saver',
'storage/storage_options',
- 'timeseries/timeseries_idl',
'update/update_driver',
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index da1279ab92d..b20be4cdd68 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -27,8 +27,6 @@
* it in the license file.
*/
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
-
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
@@ -40,12 +38,10 @@
#include "mongo/db/matcher/expression_internal_expr_comparison.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_project.h"
-#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
-#include "mongo/logv2/log.h"
namespace mongo {
@@ -81,16 +77,6 @@ auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, Bucket
}
/**
- * Determine if an arbitrary field should be included in the materialized measurements.
- */
-auto determineIncludeField(StringData fieldName,
- BucketUnpacker::Behavior unpackerBehavior,
- const BucketSpec& bucketSpec) {
- return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
- (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end());
-}
-
-/**
* A projection can be internalized if every field corresponds to a boolean value. Note that this
* correctly rejects dotted fieldnames, which are mapped to objects internally.
*/
@@ -213,35 +199,6 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon
}
} // namespace
-// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the
-// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in
-// the table, this helper returns the measurement count corresponding to the table record.
-// Otherwise, the 'targetTimestampObjSize' is used to probe the table for the smallest {b_i, S_i}
-// pair such that 'targetTimestampObjSize' < S_i. Once the interval is found, the upper bound of the
-// pair for the interval is computed and then linear interpolation is used to compute the
-// measurement count corresponding to the 'targetTimestampObjSize' provided.
-int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) {
- auto currentInterval =
- std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable),
- std::end(BucketUnpacker::kTimestampObjSizeTable),
- [&](const auto& entry) { return targetTimestampObjSize <= entry.second; });
-
- if (currentInterval->second == targetTimestampObjSize) {
- return currentInterval->first;
- }
- // This points to the first interval larger than the target 'targetTimestampObjSize'; the
- // actual interval that covers the object size is the interval before the current one.
- tassert(5422104,
- "currentInterval should not point to the first table entry",
- currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin());
- --currentInterval;
-
- auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin());
-
- return currentInterval->first +
- ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey));
-}
-
void BucketUnpacker::reset(BSONObj&& bucket) {
_fieldIters.clear();
_timeFieldIter = boost::none;
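For orientation while reading the removal above, the following is a minimal standalone sketch of the table-based interpolation that computeMeasurementCount() performed, using the {measurement count, timestamp-column byte size} pairs quoted in the header diff further below. It is an illustration of the removed technique only, not the MongoDB implementation.

// Illustrative sketch (not the MongoDB implementation) of the removed
// computeMeasurementCount(): find the table interval containing the target
// timestamp-column size, then interpolate linearly within it. Each additional
// measurement in an interval whose row keys have d digits adds one BSON Date
// element of 1 (type) + d + 1 (field name + NUL) + 8 (payload) = 10 + d bytes.
#include <algorithm>
#include <array>
#include <cassert>
#include <cstdint>
#include <utility>

namespace sketch {
constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{
    {{0, 5},  // 5 == BSONObj::kMinBSONLength
     {10, 115},
     {100, 1195},
     {1000, 12895},
     {10000, 138895},
     {100000, 1488895},
     {1000000, 15888895},
     {10000000, 168888895}}};

int computeMeasurementCount(int targetTimestampObjSize) {
    auto it = std::find_if(
        kTimestampObjSizeTable.begin(), kTimestampObjSizeTable.end(), [&](const auto& entry) {
            return targetTimestampObjSize <= entry.second;
        });
    assert(it != kTimestampObjSizeTable.end());  // table is bounded by ~16 MB
    if (it->second == targetTimestampObjSize)
        return it->first;                        // exact hit on an interval's lower bound
    assert(it != kTimestampObjSizeTable.begin());
    --it;                                        // step back to the covering interval
    auto nDigitsInRowKey = 1 + (it - kTimestampObjSizeTable.begin());
    return it->first + (targetTimestampObjSize - it->second) / (10 + nDigitsInRowKey);
}
}  // namespace sketch

// Example: sketch::computeMeasurementCount(30003)
//   -> interval {1000, 12895}, 4-digit row keys
//   -> 1000 + (30003 - 12895) / 14 == 2222, matching the removed unit test further below.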
@@ -296,13 +253,11 @@ void BucketUnpacker::reset(BSONObj&& bucket) {
// Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or
// _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'.
- if (determineIncludeField(colName, _unpackerBehavior, _spec)) {
+ auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end();
+ if ((_unpackerBehavior == Behavior::kInclude) == found) {
_fieldIters.push_back({colName.toString(), BSONObjIterator{elem.Obj()}});
}
}
-
- // Save the measurement count for the owned bucket.
- _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize());
}
void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) {
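As a side note on the check inlined above, the include/exclude decision reduces to a single boolean equality. A minimal sketch, assuming a plain std::set for the field set (illustration only):

// Sketch of the field-selection rule used above: a column is kept exactly when
// "behavior is kInclude" and "field is in the set" agree, i.e. both true
// (include a listed field) or both false (keep a field the exclude list omits).
#include <set>
#include <string>

enum class Behavior { kInclude, kExclude };

bool shouldKeepField(Behavior behavior,
                     const std::set<std::string>& fieldSet,
                     const std::string& fieldName) {
    const bool found = fieldSet.count(fieldName) > 0;
    return (behavior == Behavior::kInclude) == found;
}

// shouldKeepField(Behavior::kInclude, {"a"}, "a") == true    listed + include -> keep
// shouldKeepField(Behavior::kInclude, {"a"}, "b") == false   unlisted + include -> drop
// shouldKeepField(Behavior::kExclude, {"a"}, "a") == false   listed + exclude -> drop
// shouldKeepField(Behavior::kExclude, {"a"}, "b") == true    unlisted + exclude -> keep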
@@ -313,7 +268,7 @@ void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior
}
Document BucketUnpacker::getNext() {
- tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext());
+ invariant(hasNext());
auto measurement = MutableDocument{};
auto&& timeElem = _timeFieldIter->next();
@@ -336,36 +291,6 @@ Document BucketUnpacker::getNext() {
return measurement.freeze();
}
-Document BucketUnpacker::extractSingleMeasurement(int j) {
- tassert(5422101,
- "'extractSingleMeasurment' expects j to be greater than or equal to zero and less than "
- "or equal to the number of measurements in a bucket",
- j >= 0 && j < _numberOfMeasurements);
-
- auto measurement = MutableDocument{};
-
- auto rowKey = std::to_string(j);
- auto targetIdx = StringData{rowKey};
- auto&& dataRegion = _bucket.getField(kBucketDataFieldName).Obj();
-
- if (_includeMetaField && !_metaValue.isNull()) {
- measurement.addField(*_spec.metaField, Value{_metaValue});
- }
-
- for (auto&& dataElem : dataRegion) {
- auto colName = dataElem.fieldNameStringData();
- if (!determineIncludeField(colName, _unpackerBehavior, _spec)) {
- continue;
- }
- auto value = dataElem[targetIdx];
- if (value) {
- measurement.addField(dataElem.fieldNameStringData(), Value{value});
- }
- }
-
- return measurement.freeze();
-}
-
DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket(
const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker)
: DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {}
@@ -437,8 +362,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF
std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField});
}
-void DocumentSourceInternalUnpackBucket::serializeToArray(
- std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
+Value DocumentSourceInternalUnpackBucket::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
MutableDocument out;
auto behavior =
_bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? kInclude : kExclude;
@@ -452,84 +377,10 @@ void DocumentSourceInternalUnpackBucket::serializeToArray(
if (spec.metaField) {
out.addField(kMetaFieldName, Value{*spec.metaField});
}
-
- if (!explain) {
- array.push_back(Value(DOC(getSourceName() << out.freeze())));
- if (_sampleSize) {
- auto sampleSrc = DocumentSourceSample::create(pExpCtx, *_sampleSize);
- sampleSrc->serializeToArray(array);
- }
- } else {
- if (_sampleSize) {
- out.addField("sample", Value{static_cast<long long>(*_sampleSize)});
- out.addField("bucketMaxCount", Value{_bucketMaxCount});
- }
- array.push_back(Value(DOC(getSourceName() << out.freeze())));
- }
-}
-
-DocumentSource::GetNextResult
-DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() {
- const auto kMaxAttempts = 100;
- for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) {
- auto randResult = pSource->getNext();
- switch (randResult.getStatus()) {
- case GetNextResult::ReturnStatus::kAdvanced: {
- auto bucket = randResult.getDocument().toBson();
- _bucketUnpacker.reset(std::move(bucket));
-
- auto& prng = pExpCtx->opCtx->getClient()->getPrng();
- auto j = prng.nextInt64(_bucketMaxCount);
-
- if (j < _bucketUnpacker.numberOfMeasurements()) {
- auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j);
-
- auto bucketId = _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName];
- auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j};
-
- if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) {
- _nSampledSoFar++;
- return sampledDocument;
- } else {
- LOGV2_DEBUG(
- 5422102,
- 1,
- "$_internalUnpackBucket optimized for sample saw duplicate measurement",
- "measurementIndex"_attr = j,
- "bucketId"_attr = bucketId);
- }
- }
- break;
- }
- case GetNextResult::ReturnStatus::kPauseExecution: {
- // This state should never be reached since the input stage is a random cursor.
- MONGO_UNREACHABLE;
- }
- case GetNextResult::ReturnStatus::kEOF: {
- return randResult;
- }
- }
- }
- uasserted(5422103,
- str::stream()
- << "$_internalUnpackBucket stage could not find a non-duplicate document after "
- << kMaxAttempts
- << " attempts while using a random cursor. This is likely a "
- "sporadic failure, please try again");
+ return Value(DOC(getSourceName() << out.freeze()));
}
DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() {
- // If the '_sampleSize' member is present, then the stage will produce randomly sampled
- // documents from buckets.
- if (_sampleSize) {
- if (_nSampledSoFar >= _sampleSize) {
- return GetNextResult::makeEOF();
- }
- return sampleUniqueMeasurementFromBuckets();
- }
-
- // Otherwise, fall back to unpacking every measurement in all buckets until the child stage is
- // exhausted.
if (_bucketUnpacker.hasNext()) {
return _bucketUnpacker.getNext();
}
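The doGetNext() change above removes the sampling fast path entirely. For readers tracing what was deleted, the loop in sampleUniqueMeasurementFromBuckets() amounted to the following simplified, self-contained sketch (not the MongoDB code; std::set stands in for the SampledMeasurementKey hash set removed from the header):

// Simplified sketch of the de-duplicating rejection loop removed above: pull a
// random bucket, pick a random slot j in [0, bucketMaxCount), and accept the
// measurement only if the bucket really has a j-th entry and the (bucketId, j)
// pair has not been returned before. Assumes 'buckets' is non-empty.
#include <cstddef>
#include <cstdint>
#include <random>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

struct Bucket {
    std::string id;                  // stands in for the bucket _id OID
    int numberOfMeasurements = 0;
};

// Returns the (bucketId, index) of one accepted sample.
std::pair<std::string, int64_t> sampleOne(const std::vector<Bucket>& buckets,
                                          int bucketMaxCount,
                                          std::set<std::pair<std::string, int64_t>>& seen,
                                          std::mt19937_64& prng) {
    constexpr int kMaxAttempts = 100;
    std::uniform_int_distribution<std::size_t> pickBucket(0, buckets.size() - 1);
    std::uniform_int_distribution<int64_t> pickSlot(0, bucketMaxCount - 1);
    for (int attempt = 0; attempt < kMaxAttempts; ++attempt) {
        const Bucket& b = buckets[pickBucket(prng)];
        const int64_t j = pickSlot(prng);
        // Acceptance step: a bucket holding k of the possible bucketMaxCount
        // measurements is accepted with probability k / bucketMaxCount, which is
        // what keeps the overall sample uniform across measurements.
        if (j < b.numberOfMeasurements && seen.emplace(b.id, j).second)
            return {b.id, j};
    }
    throw std::runtime_error(
        "could not find a non-duplicate measurement after 100 attempts");
}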
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index c9880317964..18316a1a3e9 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -52,7 +52,6 @@ struct BucketSpec {
std::set<std::string> fieldSet;
};
-
/**
* BucketUnpacker will unpack bucket fields for metadata and the provided fields.
*/
@@ -63,29 +62,6 @@ public:
static constexpr StringData kBucketDataFieldName = "data"_sd;
static constexpr StringData kBucketMetaFieldName = "meta"_sd;
- // A table that is useful for interpolations between the number of measurements in a bucket and
- // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i,
- // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the
- // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are
- // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99],
- // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to
- // exceed the server BSON object limit of 16 MB.
- static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{
- {{0, BSONObj::kMinBSONLength},
- {10, 115},
- {100, 1195},
- {1000, 12895},
- {10000, 138895},
- {100000, 1488895},
- {1000000, 15888895},
- {10000000, 168888895}}};
-
- /**
- * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series
- * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time.
- */
- static int computeMeasurementCount(int targetTimestampObjSize);
-
// When BucketUnpacker is created with kInclude it must produce measurements that contain the
// set of fields. Otherwise, if the kExclude option is used, the measurements will include the
// set difference between all fields in the bucket and the provided fields.
@@ -100,18 +76,7 @@ public:
_includeTimeField(includeTimeField),
_includeMetaField(includeMetaField) {}
- /**
- * This method will continue to materialize Documents until the bucket is exhausted. A
- * precondition of this method is that 'hasNext()' must be true.
- */
Document getNext();
-
- /**
- * This method will extract the j-th measurement from the bucket. A precondition of this method
- * is that j >= 0 && j < the number of measurements within the underlying bucket.
- */
- Document extractSingleMeasurement(int j);
-
bool hasNext() const {
return _timeFieldIter && _timeFieldIter->more();
}
@@ -141,10 +106,6 @@ public:
return _includeTimeField;
}
- int32_t numberOfMeasurements() const {
- return _numberOfMeasurements;
- }
-
void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior);
private:
@@ -171,9 +132,6 @@ private:
// Iterators used to unpack the columns of the above bucket that are populated during the reset
// phase according to the provided 'Behavior' and 'BucketSpec'.
std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters;
-
- // The number of measurements in the bucket.
- int32_t _numberOfMeasurements = 0;
};
class DocumentSourceInternalUnpackBucket : public DocumentSource {
@@ -196,17 +154,6 @@ public:
return kStageName.rawData();
}
- void serializeToArray(
- std::vector<Value>& array,
- boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
-
- /**
- * Use 'serializeToArray' above.
- */
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
- MONGO_UNREACHABLE;
- }
-
bool includeMetaField() const {
return _bucketUnpacker.includeMetaField();
}
@@ -227,13 +174,7 @@ public:
ChangeStreamRequirement::kBlacklist};
}
- DepsTracker::State getDependencies(DepsTracker* deps) const final {
- if (_sampleSize) {
- deps->needRandomGenerator = true;
- }
- deps->needWholeDocument = true;
- return DepsTracker::State::EXHAUSTIVE_ALL;
- }
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
return boost::none;
@@ -294,69 +235,9 @@ public:
std::unique_ptr<MatchExpression> createPredicatesOnBucketLevelField(
const MatchExpression* matchExpr) const;
- /**
- * Sets the sample size to 'n' and the maximum number of measurements in a bucket to be
- * 'bucketMaxCount'. Calling this method implicitly changes the behavior from having the stage
- * unpack every bucket in a collection to sampling buckets to generate a uniform random sample
- * of size 'n'.
- */
- void setSampleParameters(long long n, int bucketMaxCount) {
- _sampleSize = n;
- _bucketMaxCount = bucketMaxCount;
- }
-
- boost::optional<long long> sampleSize() const {
- return _sampleSize;
- }
-
private:
- /**
- * Carries the bucket _id and index for the measurement that was sampled by
- * 'sampleUniqueMeasurementFromBuckets'.
- */
- struct SampledMeasurementKey {
- SampledMeasurementKey(OID bucketId, int64_t measurementIndex)
- : bucketId(bucketId), measurementIndex(measurementIndex) {}
-
- bool operator==(const SampledMeasurementKey& key) const {
- return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex;
- }
-
- OID bucketId;
- int32_t measurementIndex;
- };
-
- /**
- * Computes a hash of 'SampledMeasurementKey' so measurements that have already been seen can
- * be kept track of for de-duplication after sampling.
- */
- struct SampledMeasurementKeyHasher {
- size_t operator()(const SampledMeasurementKey& s) const {
- return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^
- absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^
- absl::Hash<int32_t>{}(s.measurementIndex);
- }
- };
-
- // Tracks which measurements have been seen so far. This is only used when sampling is enabled
- // for the purpose of de-duplicating measurements.
- using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>;
-
GetNextResult doGetNext() final;
- /**
- * Keeps trying to sample a unique measurement by using the optimized ARHASH algorithm up to a
- * hardcoded maximum number of attempts. If a unique measurement isn't found before the maximum
- * number of tries is exhausted this method will throw.
- */
- GetNextResult sampleUniqueMeasurementFromBuckets();
-
BucketUnpacker _bucketUnpacker;
-
- long long _nSampledSoFar = 0;
- int _bucketMaxCount = 0;
- boost::optional<long long> _sampleSize;
-
- SeenSet _seenSet;
};
} // namespace mongo
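For context on the table removed above, its byte sizes follow directly from the BSON layout of the timestamp column, which is also what the (likewise removed) expectedTimestampObjSize() test helper rebuilds with a BSONObjBuilder. A minimal arithmetic sketch, illustration only:

// Sketch: recompute the kTimestampObjSizeTable lower bounds. A data-column
// BSONObj holding n Date elements keyed "0", "1", ... costs 5 bytes of fixed
// BSONObj overhead (4-byte length prefix + trailing EOO byte, i.e.
// BSONObj::kMinBSONLength) plus, per element, 1 type byte, the row-key string
// with its NUL terminator, and an 8-byte Date payload.
#include <string>

int timestampObjSize(int n) {
    int size = 5;
    for (int i = 0; i < n; ++i)
        size += 1 + static_cast<int>(std::to_string(i).size()) + 1 + 8;
    return size;
}

// timestampObjSize(10)   == 5 + 10 * 11    == 115
// timestampObjSize(100)  == 115 + 90 * 12  == 1195
// timestampObjSize(1000) == 1195 + 900 * 13 == 12895
// ... matching the table's lower bounds for 10, 100 and 1000 measurements.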
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
index 29287bce3b8..ae409de7e69 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2021-present MongoDB, Inc.
+ * Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -784,284 +784,5 @@ TEST_F(InternalUnpackBucketExecTest, ParserRejectsBothIncludeAndExcludeParameter
AssertionException,
5408000);
}
-
-TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurement) {
- auto expCtx = getExpCtx();
-
- std::set<std::string> fields{
- "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
- auto spec = BucketSpec{
- kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
- auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true};
-
- auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
- auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
- auto d3 = dateFromISOString("2020-02-17T02:00:00.000Z").getValue();
- auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
- << BSON("_id" << BSON("0" << 1 << "1" << 2 << "2" << 3) << "time"
- << BSON("0" << d1 << "1" << d2 << "2" << d3) << "a"
- << BSON("0" << 1 << "1" << 2 << "2" << 3) << "b"
- << BSON("1" << 1 << "2" << 2)));
-
- unpacker.reset(std::move(bucket));
-
- auto next = unpacker.extractSingleMeasurement(0);
- auto expected = Document{
- {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- next = unpacker.extractSingleMeasurement(2);
- expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
- {"_id", 3},
- {"time", d3},
- {"a", 3},
- {"b", 2}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- next = unpacker.extractSingleMeasurement(1);
- expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
- {"_id", 2},
- {"time", d2},
- {"a", 2},
- {"b", 1}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- // Can we extract the middle element again?
- next = unpacker.extractSingleMeasurement(1);
- ASSERT_DOCUMENT_EQ(next, expected);
-}
-
-TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurementSparse) {
- auto expCtx = getExpCtx();
-
- std::set<std::string> fields{
- "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
- auto spec = BucketSpec{
- kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
- auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true};
-
- auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
- auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
- auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
- << BSON("_id" << BSON("0" << 1 << "1" << 2) << "time"
- << BSON("0" << d1 << "1" << d2) << "a" << BSON("0" << 1)
- << "b" << BSON("1" << 1)));
-
- unpacker.reset(std::move(bucket));
- auto next = unpacker.extractSingleMeasurement(1);
- auto expected = Document{
- {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 2}, {"time", d2}, {"b", 1}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- // Can we extract the same element again?
- next = unpacker.extractSingleMeasurement(1);
- ASSERT_DOCUMENT_EQ(next, expected);
-
- next = unpacker.extractSingleMeasurement(0);
- expected = Document{
- {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
- ASSERT_DOCUMENT_EQ(next, expected);
-
- // Can we extract the same element twice in a row?
- next = unpacker.extractSingleMeasurement(0);
- ASSERT_DOCUMENT_EQ(next, expected);
-
- next = unpacker.extractSingleMeasurement(0);
- ASSERT_DOCUMENT_EQ(next, expected);
-}
-
-class InternalUnpackBucketRandomSampleTest : public AggregationContextFixture {
-protected:
- BSONObj makeIncludeAllSpec() {
- return BSON("$_internalUnpackBucket"
- << BSON("include" << BSON_ARRAY("_id"
- << "time" << kUserDefinedMetaName << "a"
- << "b")
- << DocumentSourceInternalUnpackBucket::kTimeFieldName
- << kUserDefinedTimeName
- << DocumentSourceInternalUnpackBucket::kMetaFieldName
- << kUserDefinedMetaName));
- }
-
- boost::intrusive_ptr<DocumentSource> makeUnpackStage(const BSONObj& spec,
- long long nSample,
- int bucketMaxCount) {
- auto ds =
- DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx());
- auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get());
- unpack->setSampleParameters(nSample, bucketMaxCount);
- return unpack;
- }
-
- boost::intrusive_ptr<DocumentSource> makeInternalUnpackBucketSample(int nSample,
- int nBuckets,
- int nMeasurements) {
- auto spec = makeIncludeAllSpec();
- generateBuckets(nBuckets, nMeasurements);
- auto ds =
- DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx());
- auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get());
- unpack->setSampleParameters(nSample, 1000);
- return unpack;
- }
-
- boost::intrusive_ptr<DocumentSource> prepareMock() {
- auto mock = DocumentSourceMock::createForTest(getExpCtx());
- for (auto&& b : _buckets) {
- mock->push_back(DocumentSource::GetNextResult{std::move(b)});
- }
- return mock;
- }
-
- Document makeBucketPart(int nMeasurements, std::function<Value(int)> gen) {
- auto doc = MutableDocument{};
- for (auto i = 0; i < nMeasurements; ++i) {
- doc.addField(std::to_string(i), gen(i));
- }
- return doc.freeze();
- }
-
- void generateBuckets(int nBuckets, int nMeasurements) {
- auto& prng = getExpCtx()->opCtx->getClient()->getPrng();
- std::vector<Document> buckets;
- for (auto m = 0; m < nBuckets; m++) {
- auto idDoc = makeBucketPart(nMeasurements, [](int i) { return Value{OID::gen()}; });
- auto timeDoc = makeBucketPart(nMeasurements, [](int i) { return Value{Date_t{}}; });
- auto aCol = makeBucketPart(nMeasurements,
- [&](int i) { return Value{prng.nextCanonicalDouble()}; });
- buckets.push_back({Document{
- {"_id", Value{OID::gen()}},
- {"meta", Document{{"m1", m}, {"m2", m + 1}}},
- {"data",
- Document{{"_id", idDoc}, {"time", std::move(timeDoc)}, {"a", std::move(aCol)}}}}});
- }
-
- _buckets = std::move(buckets);
- }
-
-private:
- std::vector<Document> _buckets;
-};
-
-TEST_F(InternalUnpackBucketRandomSampleTest, SampleHasExpectedStatProperties) {
- auto unpack = makeInternalUnpackBucketSample(100, 1000, 1000);
- auto mock = prepareMock();
- unpack->setSource(mock.get());
-
- auto next = unpack->getNext();
- ASSERT_TRUE(next.isAdvanced());
-
- auto avg = 0.0;
- auto nSampled = 0;
- while (next.isAdvanced()) {
- avg += next.getDocument()["a"].getDouble();
- next = unpack->getNext();
- nSampled++;
- }
- avg /= nSampled;
- ASSERT_EQ(nSampled, 100);
-
- // The average for the uniform distribution on [0, 1) is ~0.5, and the stdev is sqrt(1/12).
- // We will check that the avg is within 0.5 +/- 2*sqrt(1/12).
- auto stddev = std::sqrt(1.0 / 12.0);
- ASSERT_GT(avg, 0.5 - 2 * stddev);
- ASSERT_LT(avg, 0.5 + 2 * stddev);
-}
-
-TEST_F(InternalUnpackBucketRandomSampleTest, SampleIgnoresDuplicates) {
- auto spec = BSON("$_internalUnpackBucket"
- << BSON("include" << BSON_ARRAY("_id"
- << "time" << kUserDefinedMetaName << "a"
- << "b")
- << DocumentSourceInternalUnpackBucket::kTimeFieldName
- << kUserDefinedTimeName
- << DocumentSourceInternalUnpackBucket::kMetaFieldName
- << kUserDefinedMetaName));
-
- // Make an unpack bucket stage initialized with a sample size of 2 and bucketMaxCount of 1.
- auto unpack = makeUnpackStage(spec, 2, 1);
-
- // Fill mock with duplicate buckets to simulate random sampling the same buckets over and over
- // again until the 'kMaxAttempts' are reached in 'doGetNext'.
- auto mock = DocumentSourceMock::createForTest(getExpCtx());
- for (auto i = 0; i < 101; ++i) {
- mock->push_back(Document{{"_id", Value{OID::createFromString("000000000000000000000001")}},
- {"meta", Document{{"m1", 1}, {"m2", 2}}},
- {"data",
- Document{{"_id", Document{{"0", 1}}},
- {"time", Document{{"0", Date_t::now()}}},
- {"a", Document{{"0", 1}}}}}});
- }
- unpack->setSource(mock.get());
-
- // The sample size is 2 and there's only one unique measurement in the mock. The second
- // 'getNext' call should spin until it reaches 'kMaxAttempts' tries and then throw.
- ASSERT_TRUE(unpack->getNext().isAdvanced());
- ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5422103);
-}
-
-namespace {
-/**
- * Manually computes the timestamp object size for n timestamps.
- */
-auto expectedTimestampObjSize(int32_t rowKeyOffset, int32_t n) {
- BSONObjBuilder bob;
- for (auto i = 0; i < n; ++i) {
- bob.appendDate(std::to_string(i + rowKeyOffset), Date_t::now());
- }
- return bob.done().objsize();
-}
-} // namespace
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountLowerBoundsAreCorrect) {
- // The last table entry is a sentinel for an upper bound on the interval that covers measurement
- // counts up to 16 MB.
- const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
-
- // Test the case when the target size hits a table entry which represents the lower bound of an
- // interval.
- for (size_t index = 0; index < maxTableEntry; ++index) {
- auto interval = BucketUnpacker::kTimestampObjSizeTable[index];
- ASSERT_EQ(interval.first, BucketUnpacker::computeMeasurementCount(interval.second));
- }
-}
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountUpperBoundsAreCorrect) {
- const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
-
- // The lower bound sizes of each interval in the kTimestampObjSizeTable are hardcoded. Use this
- // fact and walk the table backwards to check the correctness of each interval's upper bound:
- // take the lower bound size of the next interval and subtract the size of a BSONObj containing
- // one timestamp with the appropriate rowKey.
- std::pair<int, int> currentInterval;
- auto currentIntervalSize = 0;
- auto currentIntervalCount = 0;
- auto size = 0;
- for (size_t index = maxTableEntry; index > 0; --index) {
- currentInterval = BucketUnpacker::kTimestampObjSizeTable[index];
- currentIntervalSize = currentInterval.second;
- currentIntervalCount = currentInterval.first;
- auto rowKey = currentIntervalCount - 1;
- size = expectedTimestampObjSize(rowKey, 1);
- // We need to add back the kMinBSONLength since it's subtracted out.
- ASSERT_EQ(currentIntervalCount - 1,
- BucketUnpacker::computeMeasurementCount(currentIntervalSize - size +
- BSONObj::kMinBSONLength));
- }
-}
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountAllPointsInSmallerIntervals) {
- // Test all values for some of the smaller intervals up to 100 measurements.
- for (auto bucketCount = 0; bucketCount < 25; ++bucketCount) {
- auto size = expectedTimestampObjSize(0, bucketCount);
- ASSERT_EQ(bucketCount, BucketUnpacker::computeMeasurementCount(size));
- }
-}
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountInLargerIntervals) {
- ASSERT_EQ(2222, BucketUnpacker::computeMeasurementCount(30003));
- ASSERT_EQ(11111, BucketUnpacker::computeMeasurementCount(155560));
- ASSERT_EQ(449998, BucketUnpacker::computeMeasurementCount(7088863));
-}
} // namespace
} // namespace mongo
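The three expectations in the removed ComputeMeasurementCountInLargerIntervals test can be checked by hand against the table and the "10 + digits" bytes-per-measurement rule: 30003 falls in the {1000, 12895} interval with 4-digit row keys, giving 1000 + (30003 - 12895) / 14 = 2222; 155560 falls in {10000, 138895} with 5-digit keys, giving 10000 + (155560 - 138895) / 15 = 11111; and 7088863 falls in {100000, 1488895} with 6-digit keys, giving 100000 + (7088863 - 1488895) / 16 = 449998.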
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index ef095170045..84d4905070c 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -59,7 +59,6 @@
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_geo_near_cursor.h"
#include "mongo/db/pipeline/document_source_group.h"
-#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_sample_from_random_cursor.h"
@@ -78,7 +77,6 @@
#include "mongo/db/stats/top.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h"
-#include "mongo/db/timeseries/timeseries_gen.h"
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/util/time_support.h"
@@ -93,15 +91,20 @@ using write_ops::Insert;
namespace {
/**
- * Returns a 'PlanExecutor' which uses a random cursor to sample documents if successful as
- * determined by the boolean. Returns {} if the storage engine doesn't support random cursors, or if
- * 'sampleSize' is a large enough percentage of the collection.
+ * Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {}
+ * if the storage engine doesn't support random cursors, or if 'sampleSize' is too large a
+ * percentage of the collection.
+ *
+ * If needed, adds DocumentSourceSampleFromRandomCursor to the front of the pipeline, replacing the
+ * $sample stage. This is needed if we select an optimized plan for $sample taking advantage of
+ * storage engine support for random cursors.
*/
-StatusWith<std::pair<unique_ptr<PlanExecutor, PlanExecutor::Deleter>, bool>>
-createRandomCursorExecutor(const CollectionPtr& coll,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- long long sampleSize,
- long long numRecords) {
+StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor(
+ const CollectionPtr& coll,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ long long sampleSize,
+ long long numRecords,
+ Pipeline* pipeline) {
OperationContext* opCtx = expCtx->opCtx;
// Verify that we are already under a collection lock. We avoid taking locks ourselves in this
@@ -110,14 +113,14 @@ createRandomCursorExecutor(const CollectionPtr& coll,
static const double kMaxSampleRatioForRandCursor = 0.05;
if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) {
- return std::pair{nullptr, false};
+ return {nullptr};
}
// Attempt to get a random cursor from the RecordStore.
auto rsRandCursor = coll->getRecordStore()->getRandomCursor(opCtx);
if (!rsRandCursor) {
// The storage engine has no random cursor support.
- return std::pair{nullptr, false};
+ return {nullptr};
}
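The changed return type aside, the eligibility heuristic itself is unchanged. A minimal sketch with concrete numbers (the standalone function name is illustrative, not from the tree):

// Sketch of the guard above. With kMaxSampleRatioForRandCursor at 0.05, a
// collection of 10'000 records takes the random-cursor path only for sample
// sizes of 500 or fewer, and collections of 100 records or fewer never do;
// otherwise createRandomCursorExecutor() returns a null executor and the
// caller falls back to the ordinary, non-optimized $sample plan.
bool canUseRandomCursor(long long sampleSize, long long numRecords) {
    static const double kMaxSampleRatioForRandCursor = 0.05;
    return numRecords > 100 && sampleSize <= numRecords * kMaxSampleRatioForRandCursor;
}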
// Build a MultiIteratorStage and pass it the random-sampling RecordCursor.
@@ -166,21 +169,25 @@ createRandomCursorExecutor(const CollectionPtr& coll,
trialStage = static_cast<TrialStage*>(root.get());
}
- auto execStatus = plan_executor_factory::make(expCtx,
- std::move(ws),
- std::move(root),
- &coll,
- PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
- QueryPlannerParams::RETURN_OWNED_DATA);
- if (!execStatus.isOK()) {
- return execStatus.getStatus();
- }
+ auto exec = plan_executor_factory::make(expCtx,
+ std::move(ws),
+ std::move(root),
+ &coll,
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ QueryPlannerParams::RETURN_OWNED_DATA);
// For sharded collections, the root of the plan tree is a TrialStage that may have chosen
// either a random-sampling cursor trial plan or a COLLSCAN backup plan. We can only optimize
// the $sample aggregation stage if the trial plan was chosen.
- return std::pair{std::move(execStatus.getValue()),
- !trialStage || !trialStage->pickedBackupPlan()};
+ if (!trialStage || !trialStage->pickedBackupPlan()) {
+ // Replace $sample stage with $sampleFromRandomCursor stage.
+ pipeline->popFront();
+ std::string idString = coll->ns().isOplog() ? "ts" : "_id";
+ pipeline->addInitialSource(
+ DocumentSourceSampleFromRandomCursor::create(expCtx, sampleSize, idString, numRecords));
+ }
+
+ return exec;
}
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
@@ -317,106 +324,9 @@ StringData extractGeoNearFieldFromIndexes(OperationContext* opCtx,
}
MONGO_UNREACHABLE;
}
-
-/**
- * This attempts to either extract a $sample stage at the front of the pipeline or a
- * $_internalUnpackBucket stage at the front of the pipeline immediately followed by a $sample
- * stage. In the former case a 'nullptr' is returned for the second element of the pair <$sample,
- * $_internalUnpackBucket>, and if the latter case is encountered both elements of the pair will
- * be populated. If the pipeline doesn't contain a $_internalUnpackBucket at the front of the
- * pipeline immediately followed by a $sample stage, then the first element in the pair will be a
- * 'nullptr'.
- */
-std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSampleUnpackBucket(
- const Pipeline::SourceContainer& sources) {
- DocumentSourceSample* sampleStage = nullptr;
- DocumentSourceInternalUnpackBucket* unpackStage = nullptr;
-
- auto sourcesIt = sources.begin();
- if (sourcesIt != sources.end()) {
- sampleStage = dynamic_cast<DocumentSourceSample*>(sourcesIt->get());
- if (sampleStage) {
- return std::pair{sampleStage, unpackStage};
- }
-
- unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get());
- ++sourcesIt;
-
- if (sourcesIt != sources.end()) {
- sampleStage = dynamic_cast<DocumentSourceSample*>(sourcesIt->get());
- return std::pair{sampleStage, unpackStage};
- }
- }
-
- return std::pair{sampleStage, unpackStage};
-}
} // namespace
std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
-PipelineD::buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage,
- DocumentSourceInternalUnpackBucket* unpackBucketStage,
- const CollectionPtr& collection,
- Pipeline* pipeline) {
- tassert(5422105, "sampleStage cannot be a nullptr", sampleStage);
-
- auto expCtx = pipeline->getContext();
-
- Pipeline::SourceContainer& sources = pipeline->_sources;
-
- const long long sampleSize = sampleStage->getSampleSize();
- const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx);
- auto&& [exec, isStorageOptimizedSample] =
- uassertStatusOK(createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords));
-
- AttachExecutorCallback attachExecutorCallback;
- if (exec) {
- if (isStorageOptimizedSample) {
- if (!unpackBucketStage) {
- // Replace $sample stage with $sampleFromRandomCursor stage.
- pipeline->popFront();
- std::string idString = collection->ns().isOplog() ? "ts" : "_id";
- pipeline->addInitialSource(DocumentSourceSampleFromRandomCursor::create(
- expCtx, sampleSize, idString, numRecords));
- } else {
- // If there are non-nullptrs for 'sampleStage' and 'unpackBucketStage', then
- // 'unpackBucketStage' is at the front of the pipeline immediately followed by a
- // 'sampleStage'. Coalesce a $_internalUnpackBucket followed by a $sample.
- unpackBucketStage->setSampleParameters(sampleSize, gTimeseriesBucketMaxCount);
- sources.erase(std::next(sources.begin()));
-
- // Fix the source for the next stage by pointing it to the $_internalUnpackBucket
- // stage.
- auto sourcesIt = sources.begin();
- if (std::next(sourcesIt) != sources.end()) {
- ++sourcesIt;
- (*sourcesIt)->setSource(unpackBucketStage);
- }
- }
- }
-
- // The order in which we evaluate these arguments is significant. We'd like to be
- // sure that the DocumentSourceCursor is created _last_, because if we run into a
- // case where a DocumentSourceCursor has been created (yet hasn't been put into a
- // Pipeline) and an exception is thrown, an invariant will trigger in the
- // DocumentSourceCursor. This is a design flaw in DocumentSourceCursor.
- auto deps = pipeline->getDependencies(DepsTracker::kAllMetadata);
- const auto cursorType = deps.hasNoRequirements()
- ? DocumentSourceCursor::CursorType::kEmptyDocuments
- : DocumentSourceCursor::CursorType::kRegular;
- attachExecutorCallback =
- [cursorType](const CollectionPtr& collection,
- std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
- Pipeline* pipeline) {
- auto cursor = DocumentSourceCursor::create(
- collection, std::move(exec), pipeline->getContext(), cursorType);
- pipeline->addInitialSource(std::move(cursor));
- };
- return std::pair(std::move(attachExecutorCallback), std::move(exec));
- }
- return std::pair(std::move(attachExecutorCallback), nullptr);
-}
-
-std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection,
const NamespaceString& nss,
const AggregateCommand* aggRequest,
@@ -431,15 +341,31 @@ PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection,
}
if (!sources.empty()) {
- // Try to inspect if the DocumentSourceSample or a DocumentSourceInternalUnpackBucket stage
- // can be optimized for sampling backed by a storage engine supplied random cursor.
- auto&& [sampleStage, unpackBucketStage] = extractSampleUnpackBucket(sources);
-
+ auto sampleStage = dynamic_cast<DocumentSourceSample*>(sources.front().get());
// Optimize an initial $sample stage if possible.
if (collection && sampleStage) {
- auto [attachExecutorCallback, exec] =
- buildInnerQueryExecutorSample(sampleStage, unpackBucketStage, collection, pipeline);
+ const long long sampleSize = sampleStage->getSampleSize();
+ const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx);
+ auto exec = uassertStatusOK(
+ createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords, pipeline));
if (exec) {
+ // The order in which we evaluate these arguments is significant. We'd like to be
+ // sure that the DocumentSourceCursor is created _last_, because if we run into a
+ // case where a DocumentSourceCursor has been created (yet hasn't been put into a
+ // Pipeline) and an exception is thrown, an invariant will trigger in the
+ // DocumentSourceCursor. This is a design flaw in DocumentSourceCursor.
+ auto deps = pipeline->getDependencies(DepsTracker::kAllMetadata);
+ const auto cursorType = deps.hasNoRequirements()
+ ? DocumentSourceCursor::CursorType::kEmptyDocuments
+ : DocumentSourceCursor::CursorType::kRegular;
+ auto attachExecutorCallback =
+ [cursorType](const CollectionPtr& collection,
+ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
+ Pipeline* pipeline) {
+ auto cursor = DocumentSourceCursor::create(
+ collection, std::move(exec), pipeline->getContext(), cursorType);
+ pipeline->addInitialSource(std::move(cursor));
+ };
return std::make_pair(std::move(attachExecutorCallback), std::move(exec));
}
}
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index ba7969550c5..37c70d88f70 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -38,8 +38,6 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/document_source_group.h"
-#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
-#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/plan_executor.h"
@@ -171,19 +169,6 @@ private:
Pipeline* pipeline);
/**
- * Build a PlanExecutor and prepare a callback to create a special DocumentSourceSample or a
- * DocumentSourceInternalUnpackBucket stage that has been rewritten to sample buckets using a
- * storage engine supplied random cursor, if the heuristics used for the optimization allow. If
- * the optimized $sample plan cannot or should not be produced, returns a null PlanExecutor
- * pointer.
- */
- static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
- buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage,
- DocumentSourceInternalUnpackBucket* unpackBucketStage,
- const CollectionPtr& collection,
- Pipeline* pipeline);
-
- /**
* Creates a PlanExecutor to be used in the initial cursor source. This function will try to
* push down the $sort, $project, $match and $limit stages into the PlanStage layer whenever
* possible. In this case, these stages will be incorporated into the PlanExecutor.