Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/SConscript | 1
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 161
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.h | 121
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp | 281
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 178
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 15
6 files changed, 60 insertions(+), 697 deletions(-)
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 226da5788a6..6732fc7e951 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1283,7 +1283,6 @@ env.Library( 'stats/serveronly_stats', 'storage/remove_saver', 'storage/storage_options', - 'timeseries/timeseries_idl', 'update/update_driver', ], LIBDEPS_PRIVATE=[ diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index da1279ab92d..b20be4cdd68 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -27,8 +27,6 @@ * it in the license file. */ -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery - #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" @@ -40,12 +38,10 @@ #include "mongo/db/matcher/expression_internal_expr_comparison.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_project.h" -#include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" -#include "mongo/logv2/log.h" namespace mongo { @@ -81,16 +77,6 @@ auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, Bucket } /** - * Determine if an arbitrary field should be included in the materialized measurements. - */ -auto determineIncludeField(StringData fieldName, - BucketUnpacker::Behavior unpackerBehavior, - const BucketSpec& bucketSpec) { - return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == - (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end()); -} - -/** * A projection can be internalized if every field corresponds to a boolean value. Note that this * correctly rejects dotted fieldnames, which are mapped to objects internally. */ @@ -213,35 +199,6 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon } } // namespace -// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the -// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in -// the table, this helper returns the measurement count corresponding to the table record. -// Otherwise, the 'targetTimestampObjSize' is used to probe the table for the smallest {b_i, S_i} -// pair such that 'targetTimestampObjSize' < S_i. Once the interval is found, the upper bound of the -// pair for the interval is computed and then linear interpolation is used to compute the -// measurement count corresponding to the 'targetTimestampObjSize' provided. -int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) { - auto currentInterval = - std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable), - std::end(BucketUnpacker::kTimestampObjSizeTable), - [&](const auto& entry) { return targetTimestampObjSize <= entry.second; }); - - if (currentInterval->second == targetTimestampObjSize) { - return currentInterval->first; - } - // This points to the first interval larger than the target 'targetTimestampObjSize', the actual - // interval that will cover the object size is the interval before the current one. 
- tassert(5422104, - "currentInterval should not point to the first table entry", - currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin()); - --currentInterval; - - auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin()); - - return currentInterval->first + - ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey)); -} - void BucketUnpacker::reset(BSONObj&& bucket) { _fieldIters.clear(); _timeFieldIter = boost::none; @@ -296,13 +253,11 @@ void BucketUnpacker::reset(BSONObj&& bucket) { // Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or // _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'. - if (determineIncludeField(colName, _unpackerBehavior, _spec)) { + auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end(); + if ((_unpackerBehavior == Behavior::kInclude) == found) { _fieldIters.push_back({colName.toString(), BSONObjIterator{elem.Obj()}}); } } - - // Save the measurement count for the owned bucket. - _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize()); } void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) { @@ -313,7 +268,7 @@ void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior } Document BucketUnpacker::getNext() { - tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext()); + invariant(hasNext()); auto measurement = MutableDocument{}; auto&& timeElem = _timeFieldIter->next(); @@ -336,36 +291,6 @@ Document BucketUnpacker::getNext() { return measurement.freeze(); } -Document BucketUnpacker::extractSingleMeasurement(int j) { - tassert(5422101, - "'extractSingleMeasurment' expects j to be greater than or equal to zero and less than " - "or equal to the number of measurements in a bucket", - j >= 0 && j < _numberOfMeasurements); - - auto measurement = MutableDocument{}; - - auto rowKey = std::to_string(j); - auto targetIdx = StringData{rowKey}; - auto&& dataRegion = _bucket.getField(kBucketDataFieldName).Obj(); - - if (_includeMetaField && !_metaValue.isNull()) { - measurement.addField(*_spec.metaField, Value{_metaValue}); - } - - for (auto&& dataElem : dataRegion) { - auto colName = dataElem.fieldNameStringData(); - if (!determineIncludeField(colName, _unpackerBehavior, _spec)) { - continue; - } - auto value = dataElem[targetIdx]; - if (value) { - measurement.addField(dataElem.fieldNameStringData(), Value{value}); - } - } - - return measurement.freeze(); -} - DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket( const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker) : DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {} @@ -437,8 +362,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField}); } -void DocumentSourceInternalUnpackBucket::serializeToArray( - std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { +Value DocumentSourceInternalUnpackBucket::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { MutableDocument out; auto behavior = _bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? 
kInclude : kExclude; @@ -452,84 +377,10 @@ void DocumentSourceInternalUnpackBucket::serializeToArray( if (spec.metaField) { out.addField(kMetaFieldName, Value{*spec.metaField}); } - - if (!explain) { - array.push_back(Value(DOC(getSourceName() << out.freeze()))); - if (_sampleSize) { - auto sampleSrc = DocumentSourceSample::create(pExpCtx, *_sampleSize); - sampleSrc->serializeToArray(array); - } - } else { - if (_sampleSize) { - out.addField("sample", Value{static_cast<long long>(*_sampleSize)}); - out.addField("bucketMaxCount", Value{_bucketMaxCount}); - } - array.push_back(Value(DOC(getSourceName() << out.freeze()))); - } -} - -DocumentSource::GetNextResult -DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() { - const auto kMaxAttempts = 100; - for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) { - auto randResult = pSource->getNext(); - switch (randResult.getStatus()) { - case GetNextResult::ReturnStatus::kAdvanced: { - auto bucket = randResult.getDocument().toBson(); - _bucketUnpacker.reset(std::move(bucket)); - - auto& prng = pExpCtx->opCtx->getClient()->getPrng(); - auto j = prng.nextInt64(_bucketMaxCount); - - if (j < _bucketUnpacker.numberOfMeasurements()) { - auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j); - - auto bucketId = _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName]; - auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j}; - - if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) { - _nSampledSoFar++; - return sampledDocument; - } else { - LOGV2_DEBUG( - 5422102, - 1, - "$_internalUnpackBucket optimized for sample saw duplicate measurement", - "measurementIndex"_attr = j, - "bucketId"_attr = bucketId); - } - } - break; - } - case GetNextResult::ReturnStatus::kPauseExecution: { - // This state should never be reached since the input stage is a random cursor. - MONGO_UNREACHABLE; - } - case GetNextResult::ReturnStatus::kEOF: { - return randResult; - } - } - } - uasserted(5422103, - str::stream() - << "$_internalUnpackBucket stage could not find a non-duplicate document after " - << kMaxAttempts - << " attempts while using a random cursor. This is likely a " - "sporadic failure, please try again"); + return Value(DOC(getSourceName() << out.freeze())); } DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() { - // If the '_sampleSize' member is present, then the stage will produce randomly sampled - // documents from buckets. - if (_sampleSize) { - if (_nSampledSoFar >= _sampleSize) { - return GetNextResult::makeEOF(); - } - return sampleUniqueMeasurementFromBuckets(); - } - - // Otherwise, fallback to unpacking every measurement in all buckets until the child stage is - // exhausted. if (_bucketUnpacker.hasNext()) { return _bucketUnpacker.getNext(); } diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h index c9880317964..18316a1a3e9 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h @@ -52,7 +52,6 @@ struct BucketSpec { std::set<std::string> fieldSet; }; - /** * BucketUnpacker will unpack bucket fields for metadata and the provided fields. 
*/ @@ -63,29 +62,6 @@ public: static constexpr StringData kBucketDataFieldName = "data"_sd; static constexpr StringData kBucketMetaFieldName = "meta"_sd; - // A table that is useful for interpolations between the number of measurements in a bucket and - // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i, - // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the - // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are - // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99], - // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to - // exceed the server BSON object limit of 16 MB. - static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{ - {{0, BSONObj::kMinBSONLength}, - {10, 115}, - {100, 1195}, - {1000, 12895}, - {10000, 138895}, - {100000, 1488895}, - {1000000, 15888895}, - {10000000, 168888895}}}; - - /** - * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series - * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time. - */ - static int computeMeasurementCount(int targetTimestampObjSize); - // When BucketUnpacker is created with kInclude it must produce measurements that contain the // set of fields. Otherwise, if the kExclude option is used, the measurements will include the // set difference between all fields in the bucket and the provided fields. @@ -100,18 +76,7 @@ public: _includeTimeField(includeTimeField), _includeMetaField(includeMetaField) {} - /** - * This method will continue to materialize Documents until the bucket is exhausted. A - * precondition of this method is that 'hasNext()' must be true. - */ Document getNext(); - - /** - * This method will extract the j-th measurement from the bucket. A precondition of this method - * is that j >= 0 && j <= the number of measurements within the underlying bucket. - */ - Document extractSingleMeasurement(int j); - bool hasNext() const { return _timeFieldIter && _timeFieldIter->more(); } @@ -141,10 +106,6 @@ public: return _includeTimeField; } - int32_t numberOfMeasurements() const { - return _numberOfMeasurements; - } - void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior); private: @@ -171,9 +132,6 @@ private: // Iterators used to unpack the columns of the above bucket that are populated during the reset // phase according to the provided 'Behavior' and 'BucketSpec'. std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters; - - // The number of measurements in the bucket. - int32_t _numberOfMeasurements = 0; }; class DocumentSourceInternalUnpackBucket : public DocumentSource { @@ -196,17 +154,6 @@ public: return kStageName.rawData(); } - void serializeToArray( - std::vector<Value>& array, - boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - - /** - * Use 'serializeToArray' above. 
- */ - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { - MONGO_UNREACHABLE; - } - bool includeMetaField() const { return _bucketUnpacker.includeMetaField(); } @@ -227,13 +174,7 @@ public: ChangeStreamRequirement::kBlacklist}; } - DepsTracker::State getDependencies(DepsTracker* deps) const final { - if (_sampleSize) { - deps->needRandomGenerator = true; - } - deps->needWholeDocument = true; - return DepsTracker::State::EXHAUSTIVE_ALL; - } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; @@ -294,69 +235,9 @@ public: std::unique_ptr<MatchExpression> createPredicatesOnBucketLevelField( const MatchExpression* matchExpr) const; - /** - * Sets the sample size to 'n' and the maximum number of measurements in a bucket to be - * 'bucketMaxCount'. Calling this method implicitly changes the behavior from having the stage - * unpack every bucket in a collection to sampling buckets to generate a uniform random sample - * of size 'n'. - */ - void setSampleParameters(long long n, int bucketMaxCount) { - _sampleSize = n; - _bucketMaxCount = bucketMaxCount; - } - - boost::optional<long long> sampleSize() const { - return _sampleSize; - } - private: - /** - * Carries the bucket _id and index for the measurement that was sampled by - * 'sampleRandomBucketOptimized'. - */ - struct SampledMeasurementKey { - SampledMeasurementKey(OID bucketId, int64_t measurementIndex) - : bucketId(bucketId), measurementIndex(measurementIndex) {} - - bool operator==(const SampledMeasurementKey& key) const { - return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex; - } - - OID bucketId; - int32_t measurementIndex; - }; - - /** - * Computes a hash of 'SampledMeasurementKey' so measurements that have already been seen can - * be kept track of for de-duplication after sampling. - */ - struct SampledMeasurementKeyHasher { - size_t operator()(const SampledMeasurementKey& s) const { - return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^ - absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^ - absl::Hash<int32_t>{}(s.measurementIndex); - } - }; - - // Tracks which measurements have been seen so far. This is only used when sampling is enabled - // for the purpose of de-duplicating measurements. - using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>; - GetNextResult doGetNext() final; - /** - * Keeps trying to sample a unique measurement by using the optimized ARHASH algorithm up to a - * hardcoded maximum number of attempts. If a unique measurement isn't found before the maximum - * number of tries is exhausted this method will throw. 
- */ - GetNextResult sampleUniqueMeasurementFromBuckets(); - BucketUnpacker _bucketUnpacker; - - long long _nSampledSoFar = 0; - int _bucketMaxCount = 0; - boost::optional<long long> _sampleSize; - - SeenSet _seenSet; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp index 29287bce3b8..ae409de7e69 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2021-present MongoDB, Inc. + * Copyright (C) 2020-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -784,284 +784,5 @@ TEST_F(InternalUnpackBucketExecTest, ParserRejectsBothIncludeAndExcludeParameter AssertionException, 5408000); } - -TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurement) { - auto expCtx = getExpCtx(); - - std::set<std::string> fields{ - "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"}; - auto spec = BucketSpec{ - kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; - auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true}; - - auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue(); - auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue(); - auto d3 = dateFromISOString("2020-02-17T02:00:00.000Z").getValue(); - auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data" - << BSON("_id" << BSON("0" << 1 << "1" << 2 << "2" << 3) << "time" - << BSON("0" << d1 << "1" << d2 << "2" << d3) << "a" - << BSON("0" << 1 << "1" << 2 << "2" << 3) << "b" - << BSON("1" << 1 << "2" << 2))); - - unpacker.reset(std::move(bucket)); - - auto next = unpacker.extractSingleMeasurement(0); - auto expected = Document{ - {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}}; - ASSERT_DOCUMENT_EQ(next, expected); - - next = unpacker.extractSingleMeasurement(2); - expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, - {"_id", 3}, - {"time", d3}, - {"a", 3}, - {"b", 2}}; - ASSERT_DOCUMENT_EQ(next, expected); - - next = unpacker.extractSingleMeasurement(1); - expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, - {"_id", 2}, - {"time", d2}, - {"a", 2}, - {"b", 1}}; - ASSERT_DOCUMENT_EQ(next, expected); - - // Can we extract the middle element again? 
- next = unpacker.extractSingleMeasurement(1); - ASSERT_DOCUMENT_EQ(next, expected); -} - -TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurementSparse) { - auto expCtx = getExpCtx(); - - std::set<std::string> fields{ - "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"}; - auto spec = BucketSpec{ - kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; - auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true}; - - auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue(); - auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue(); - auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data" - << BSON("_id" << BSON("0" << 1 << "1" << 2) << "time" - << BSON("0" << d1 << "1" << d2) << "a" << BSON("0" << 1) - << "b" << BSON("1" << 1))); - - unpacker.reset(std::move(bucket)); - auto next = unpacker.extractSingleMeasurement(1); - auto expected = Document{ - {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 2}, {"time", d2}, {"b", 1}}; - ASSERT_DOCUMENT_EQ(next, expected); - - // Can we extract the same element again? - next = unpacker.extractSingleMeasurement(1); - ASSERT_DOCUMENT_EQ(next, expected); - - next = unpacker.extractSingleMeasurement(0); - expected = Document{ - {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}}; - ASSERT_DOCUMENT_EQ(next, expected); - - // Can we extract the same element twice in a row? - next = unpacker.extractSingleMeasurement(0); - ASSERT_DOCUMENT_EQ(next, expected); - - next = unpacker.extractSingleMeasurement(0); - ASSERT_DOCUMENT_EQ(next, expected); -} - -class InternalUnpackBucketRandomSampleTest : public AggregationContextFixture { -protected: - BSONObj makeIncludeAllSpec() { - return BSON("$_internalUnpackBucket" - << BSON("include" << BSON_ARRAY("_id" - << "time" << kUserDefinedMetaName << "a" - << "b") - << DocumentSourceInternalUnpackBucket::kTimeFieldName - << kUserDefinedTimeName - << DocumentSourceInternalUnpackBucket::kMetaFieldName - << kUserDefinedMetaName)); - } - - boost::intrusive_ptr<DocumentSource> makeUnpackStage(const BSONObj& spec, - long long nSample, - int bucketMaxCount) { - auto ds = - DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()); - auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get()); - unpack->setSampleParameters(nSample, bucketMaxCount); - return unpack; - } - - boost::intrusive_ptr<DocumentSource> makeInternalUnpackBucketSample(int nSample, - int nBuckets, - int nMeasurements) { - auto spec = makeIncludeAllSpec(); - generateBuckets(nBuckets, nMeasurements); - auto ds = - DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()); - auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get()); - unpack->setSampleParameters(nSample, 1000); - return unpack; - } - - boost::intrusive_ptr<DocumentSource> prepareMock() { - auto mock = DocumentSourceMock::createForTest(getExpCtx()); - for (auto&& b : _buckets) { - mock->push_back(DocumentSource::GetNextResult{std::move(b)}); - } - return mock; - } - - Document makeBucketPart(int nMeasurements, std::function<Value(int)> gen) { - auto doc = MutableDocument{}; - for (auto i = 0; i < nMeasurements; ++i) { - doc.addField(std::to_string(i), gen(i)); - } - return doc.freeze(); - } - - void generateBuckets(int nBuckets, int nMeasurements) { - auto& prng = getExpCtx()->opCtx->getClient()->getPrng(); - 
std::vector<Document> buckets; - for (auto m = 0; m < nBuckets; m++) { - auto idDoc = makeBucketPart(nMeasurements, [](int i) { return Value{OID::gen()}; }); - auto timeDoc = makeBucketPart(nMeasurements, [](int i) { return Value{Date_t{}}; }); - auto aCol = makeBucketPart(nMeasurements, - [&](int i) { return Value{prng.nextCanonicalDouble()}; }); - buckets.push_back({Document{ - {"_id", Value{OID::gen()}}, - {"meta", Document{{"m1", m}, {"m2", m + 1}}}, - {"data", - Document{{"_id", idDoc}, {"time", std::move(timeDoc)}, {"a", std::move(aCol)}}}}}); - } - - _buckets = std::move(buckets); - } - -private: - std::vector<Document> _buckets; -}; - -TEST_F(InternalUnpackBucketRandomSampleTest, SampleHasExpectedStatProperties) { - auto unpack = makeInternalUnpackBucketSample(100, 1000, 1000); - auto mock = prepareMock(); - unpack->setSource(mock.get()); - - auto next = unpack->getNext(); - ASSERT_TRUE(next.isAdvanced()); - - auto avg = 0.0; - auto nSampled = 0; - while (next.isAdvanced()) { - avg += next.getDocument()["a"].getDouble(); - next = unpack->getNext(); - nSampled++; - } - avg /= nSampled; - ASSERT_EQ(nSampled, 100); - - // The average for the uniform distribution on [0, 1) is ~0.5, and the stdev is sqrt(1/12). - // We will check if the avg is between +/- 2*sqrt(1/12). - auto stddev = std::sqrt(1.0 / 12.0); - ASSERT_GT(avg, 0.5 - 2 * stddev); - ASSERT_LT(avg, 0.5 + 2 * stddev); -} - -TEST_F(InternalUnpackBucketRandomSampleTest, SampleIgnoresDuplicates) { - auto spec = BSON("$_internalUnpackBucket" - << BSON("include" << BSON_ARRAY("_id" - << "time" << kUserDefinedMetaName << "a" - << "b") - << DocumentSourceInternalUnpackBucket::kTimeFieldName - << kUserDefinedTimeName - << DocumentSourceInternalUnpackBucket::kMetaFieldName - << kUserDefinedMetaName)); - - // Make an unpack bucket stage initialized with a sample size of 2 and bucketMaxCount of 1. - auto unpack = makeUnpackStage(spec, 2, 1); - - // Fill mock with duplicate buckets to simulate random sampling the same buckets over and over - // again until the 'kMaxAttempts' are reached in 'doGetNext'. - auto mock = DocumentSourceMock::createForTest(getExpCtx()); - for (auto i = 0; i < 101; ++i) { - mock->push_back(Document{{"_id", Value{OID::createFromString("000000000000000000000001")}}, - {"meta", Document{{"m1", 1}, {"m2", 2}}}, - {"data", - Document{{"_id", Document{{"0", 1}}}, - {"time", Document{{"0", Date_t::now()}}}, - {"a", Document{{"0", 1}}}}}}); - } - unpack->setSource(mock.get()); - - // The sample size is 2 and there's only one unique measurement in the mock. The second - // 'getNext' call should spin until the it reaches 'kMaxAttempts' of tries and then throw. - ASSERT_TRUE(unpack->getNext().isAdvanced()); - ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5422103); -} - -namespace { -/** - * Manually computes the timestamp object size for n timestamps. - */ -auto expectedTimestampObjSize(int32_t rowKeyOffset, int32_t n) { - BSONObjBuilder bob; - for (auto i = 0; i < n; ++i) { - bob.appendDate(std::to_string(i + rowKeyOffset), Date_t::now()); - } - return bob.done().objsize(); -} -} // namespace - -TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountLowerBoundsAreCorrect) { - // The last table entry is a sentinel for an upper bound on the interval that covers measurement - // counts up to 16 MB. - const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1; - - // Test the case when the target size hits a table entry which represents the lower bound of an - // interval. 
- for (size_t index = 0; index < maxTableEntry; ++index) { - auto interval = BucketUnpacker::kTimestampObjSizeTable[index]; - ASSERT_EQ(interval.first, BucketUnpacker::computeMeasurementCount(interval.second)); - } -} - -TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountUpperBoundsAreCorrect) { - const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1; - - // The lower bound sizes of each interval in the kTimestampObjSizeTable are hardcoded. Use this - // fact and walk the table backwards to check the correctness of the S_i'th interval's upper - // bound by using the lower bound size for the S_i+1 interval and subtracting the BSONObj size - // containing one timestamp with the appropriate rowKey. - std::pair<int, int> currentInterval; - auto currentIntervalSize = 0; - auto currentIntervalCount = 0; - auto size = 0; - for (size_t index = maxTableEntry; index > 0; --index) { - currentInterval = BucketUnpacker::kTimestampObjSizeTable[index]; - currentIntervalSize = currentInterval.second; - currentIntervalCount = currentInterval.first; - auto rowKey = currentIntervalCount - 1; - size = expectedTimestampObjSize(rowKey, 1); - // We need to add back the kMinBSONLength since it's subtracted out. - ASSERT_EQ(currentIntervalCount - 1, - BucketUnpacker::computeMeasurementCount(currentIntervalSize - size + - BSONObj::kMinBSONLength)); - } -} - -TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountAllPointsInSmallerIntervals) { - // Test all values for some of the smaller intervals up to 100 measurements. - for (auto bucketCount = 0; bucketCount < 25; ++bucketCount) { - auto size = expectedTimestampObjSize(0, bucketCount); - ASSERT_EQ(bucketCount, BucketUnpacker::computeMeasurementCount(size)); - } -} - -TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountInLargerIntervals) { - ASSERT_EQ(2222, BucketUnpacker::computeMeasurementCount(30003)); - ASSERT_EQ(11111, BucketUnpacker::computeMeasurementCount(155560)); - ASSERT_EQ(449998, BucketUnpacker::computeMeasurementCount(7088863)); -} } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index ef095170045..84d4905070c 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -59,7 +59,6 @@ #include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/document_source_geo_near_cursor.h" #include "mongo/db/pipeline/document_source_group.h" -#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_sample_from_random_cursor.h" @@ -78,7 +77,6 @@ #include "mongo/db/stats/top.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" -#include "mongo/db/timeseries/timeseries_gen.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/util/time_support.h" @@ -93,15 +91,20 @@ using write_ops::Insert; namespace { /** - * Returns a 'PlanExecutor' which uses a random cursor to sample documents if successful as - * determined by the boolean. Returns {} if the storage engine doesn't support random cursors, or if - * 'sampleSize' is a large enough percentage of the collection. + * Returns a PlanExecutor which uses a random cursor to sample documents if successful. 
Returns {} + * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough + * percentage of the collection. + * + * If needed, adds DocumentSourceSampleFromRandomCursor to the front of the pipeline, replacing the + * $sample stage. This is needed if we select an optimized plan for $sample taking advantage of + * storage engine support for random cursors. */ -StatusWith<std::pair<unique_ptr<PlanExecutor, PlanExecutor::Deleter>, bool>> -createRandomCursorExecutor(const CollectionPtr& coll, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - long long sampleSize, - long long numRecords) { +StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorExecutor( + const CollectionPtr& coll, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + long long sampleSize, + long long numRecords, + Pipeline* pipeline) { OperationContext* opCtx = expCtx->opCtx; // Verify that we are already under a collection lock. We avoid taking locks ourselves in this @@ -110,14 +113,14 @@ createRandomCursorExecutor(const CollectionPtr& coll, static const double kMaxSampleRatioForRandCursor = 0.05; if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) { - return std::pair{nullptr, false}; + return {nullptr}; } // Attempt to get a random cursor from the RecordStore. auto rsRandCursor = coll->getRecordStore()->getRandomCursor(opCtx); if (!rsRandCursor) { // The storage engine has no random cursor support. - return std::pair{nullptr, false}; + return {nullptr}; } // Build a MultiIteratorStage and pass it the random-sampling RecordCursor. @@ -166,21 +169,25 @@ createRandomCursorExecutor(const CollectionPtr& coll, trialStage = static_cast<TrialStage*>(root.get()); } - auto execStatus = plan_executor_factory::make(expCtx, - std::move(ws), - std::move(root), - &coll, - PlanYieldPolicy::YieldPolicy::YIELD_AUTO, - QueryPlannerParams::RETURN_OWNED_DATA); - if (!execStatus.isOK()) { - return execStatus.getStatus(); - } + auto exec = plan_executor_factory::make(expCtx, + std::move(ws), + std::move(root), + &coll, + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + QueryPlannerParams::RETURN_OWNED_DATA); // For sharded collections, the root of the plan tree is a TrialStage that may have chosen // either a random-sampling cursor trial plan or a COLLSCAN backup plan. We can only optimize // the $sample aggregation stage if the trial plan was chosen. - return std::pair{std::move(execStatus.getValue()), - !trialStage || !trialStage->pickedBackupPlan()}; + if (!trialStage || !trialStage->pickedBackupPlan()) { + // Replace $sample stage with $sampleFromRandomCursor stage. + pipeline->popFront(); + std::string idString = coll->ns().isOplog() ? "ts" : "_id"; + pipeline->addInitialSource( + DocumentSourceSampleFromRandomCursor::create(expCtx, sampleSize, idString, numRecords)); + } + + return exec; } StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor( @@ -317,106 +324,9 @@ StringData extractGeoNearFieldFromIndexes(OperationContext* opCtx, } MONGO_UNREACHABLE; } - -/** - * This attempts to either extract a $sample stage at the front of the pipeline or a - * $_internalUnpackBucket stage at the front of the pipeline immediately followed by a $sample - * stage. In the former case a 'nullptr' is returned for the second element of the pair <$sample, - * $_internalUnpackBucket>, and if the latter case is encountered both elements of the pair will be - * a populated. 
If the pipeline doesn't contain a $_internalUnpackBucket at the front of the - * pipeline immediately followed by a $sample stage, then the first element in the pair will be a - * 'nullptr'. - */ -std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSampleUnpackBucket( - const Pipeline::SourceContainer& sources) { - DocumentSourceSample* sampleStage = nullptr; - DocumentSourceInternalUnpackBucket* unpackStage = nullptr; - - auto sourcesIt = sources.begin(); - if (sourcesIt != sources.end()) { - sampleStage = dynamic_cast<DocumentSourceSample*>(sourcesIt->get()); - if (sampleStage) { - return std::pair{sampleStage, unpackStage}; - } - - unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get()); - ++sourcesIt; - - if (sourcesIt != sources.end()) { - sampleStage = dynamic_cast<DocumentSourceSample*>(sourcesIt->get()); - return std::pair{sampleStage, unpackStage}; - } - } - - return std::pair{sampleStage, unpackStage}; -} } // namespace std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> -PipelineD::buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage, - DocumentSourceInternalUnpackBucket* unpackBucketStage, - const CollectionPtr& collection, - Pipeline* pipeline) { - tassert(5422105, "sampleStage cannot be a nullptr", sampleStage); - - auto expCtx = pipeline->getContext(); - - Pipeline::SourceContainer& sources = pipeline->_sources; - - const long long sampleSize = sampleStage->getSampleSize(); - const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx); - auto&& [exec, isStorageOptimizedSample] = - uassertStatusOK(createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords)); - - AttachExecutorCallback attachExecutorCallback; - if (exec) { - if (isStorageOptimizedSample) { - if (!unpackBucketStage) { - // Replace $sample stage with $sampleFromRandomCursor stage. - pipeline->popFront(); - std::string idString = collection->ns().isOplog() ? "ts" : "_id"; - pipeline->addInitialSource(DocumentSourceSampleFromRandomCursor::create( - expCtx, sampleSize, idString, numRecords)); - } else { - // If there are non-nullptrs for 'sampleStage' and 'unpackBucketStage', then - // 'unpackBucketStage' is at the front of the pipeline immediately followed by a - // 'sampleStage'. Coalesce a $_internalUnpackBucket followed by a $sample. - unpackBucketStage->setSampleParameters(sampleSize, gTimeseriesBucketMaxCount); - sources.erase(std::next(sources.begin())); - - // Fix the source for the next stage by pointing it to the $_internalUnpackBucket - // stage. - auto sourcesIt = sources.begin(); - if (std::next(sourcesIt) != sources.end()) { - ++sourcesIt; - (*sourcesIt)->setSource(unpackBucketStage); - } - } - } - - // The order in which we evaluate these arguments is significant. We'd like to be - // sure that the DocumentSourceCursor is created _last_, because if we run into a - // case where a DocumentSourceCursor has been created (yet hasn't been put into a - // Pipeline) and an exception is thrown, an invariant will trigger in the - // DocumentSourceCursor. This is a design flaw in DocumentSourceCursor. - auto deps = pipeline->getDependencies(DepsTracker::kAllMetadata); - const auto cursorType = deps.hasNoRequirements() - ? 
DocumentSourceCursor::CursorType::kEmptyDocuments - : DocumentSourceCursor::CursorType::kRegular; - attachExecutorCallback = - [cursorType](const CollectionPtr& collection, - std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, - Pipeline* pipeline) { - auto cursor = DocumentSourceCursor::create( - collection, std::move(exec), pipeline->getContext(), cursorType); - pipeline->addInitialSource(std::move(cursor)); - }; - return std::pair(std::move(attachExecutorCallback), std::move(exec)); - } - return std::pair(std::move(attachExecutorCallback), nullptr); -} - -std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection, const NamespaceString& nss, const AggregateCommand* aggRequest, @@ -431,15 +341,31 @@ PipelineD::buildInnerQueryExecutor(const CollectionPtr& collection, } if (!sources.empty()) { - // Try to inspect if the DocumentSourceSample or a DocumentSourceInternalUnpackBucket stage - // can be optimized for sampling backed by a storage engine supplied random cursor. - auto&& [sampleStage, unpackBucketStage] = extractSampleUnpackBucket(sources); - + auto sampleStage = dynamic_cast<DocumentSourceSample*>(sources.front().get()); // Optimize an initial $sample stage if possible. if (collection && sampleStage) { - auto [attachExecutorCallback, exec] = - buildInnerQueryExecutorSample(sampleStage, unpackBucketStage, collection, pipeline); + const long long sampleSize = sampleStage->getSampleSize(); + const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx); + auto exec = uassertStatusOK( + createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords, pipeline)); if (exec) { + // The order in which we evaluate these arguments is significant. We'd like to be + // sure that the DocumentSourceCursor is created _last_, because if we run into a + // case where a DocumentSourceCursor has been created (yet hasn't been put into a + // Pipeline) and an exception is thrown, an invariant will trigger in the + // DocumentSourceCursor. This is a design flaw in DocumentSourceCursor. + auto deps = pipeline->getDependencies(DepsTracker::kAllMetadata); + const auto cursorType = deps.hasNoRequirements() + ? 
DocumentSourceCursor::CursorType::kEmptyDocuments + : DocumentSourceCursor::CursorType::kRegular; + auto attachExecutorCallback = + [cursorType](const CollectionPtr& collection, + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, + Pipeline* pipeline) { + auto cursor = DocumentSourceCursor::create( + collection, std::move(exec), pipeline->getContext(), cursorType); + pipeline->addInitialSource(std::move(cursor)); + }; return std::make_pair(std::move(attachExecutorCallback), std::move(exec)); } } diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index ba7969550c5..37c70d88f70 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -38,8 +38,6 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/document_source_group.h" -#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" -#include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/plan_executor.h" @@ -171,19 +169,6 @@ private: Pipeline* pipeline); /** - * Build a PlanExecutor and prepare a callback to create a special DocumentSourceSample or a - * DocumentSourceInternalUnpackBucket stage that has been rewritten to sample buckets using a - * storage engine supplied random cursor if the heuristics used for the optimization allows. If - * the optimized $sample plan cannot or should not be produced, returns a null PlanExecutor - * pointer. - */ - static std::pair<AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> - buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage, - DocumentSourceInternalUnpackBucket* unpackBucketStage, - const CollectionPtr& collection, - Pipeline* pipeline); - - /** * Creates a PlanExecutor to be used in the initial cursor source. This function will try to * push down the $sort, $project, $match and $limit stages into the PlanStage layer whenever * possible. In this case, these stages will be incorporated into the PlanExecutor. |
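For reference, below is a standalone sketch of the bucket measurement-count interpolation that this revert removes. It is reconstructed from the deleted BucketUnpacker::computeMeasurementCount() and kTimestampObjSizeTable in the diff above, with kMinBSONLength restated as a local constant so the snippet compiles on its own; it is an illustration, not the server code.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstdint>
#include <iostream>
#include <utility>

constexpr int32_t kMinBSONLength = 5;  // smallest BSONObj: 4-byte length + EOO byte

// (measurement count, timestamp-column BSONObj size) at the lower bound of each
// row-key digit interval: [0, 9], [10, 99], [100, 999], and so on.
constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{
    {{0, kMinBSONLength},
     {10, 115},
     {100, 1195},
     {1000, 12895},
     {10000, 138895},
     {100000, 1488895},
     {1000000, 15888895},
     {10000000, 168888895}}};

// Maps the byte size of a bucket's data.time column back to the number of
// measurements in O(1): an exact table hit returns directly, anything else is
// linearly interpolated, since each extra measurement in an n-digit row-key
// interval costs a fixed 10 + n bytes (type byte + key + NUL + 8-byte Date).
int computeMeasurementCount(int targetTimestampObjSize) {
    auto it = std::find_if(
        kTimestampObjSizeTable.begin(), kTimestampObjSizeTable.end(), [&](const auto& entry) {
            return targetTimestampObjSize <= entry.second;
        });
    if (it->second == targetTimestampObjSize) {
        return it->first;
    }
    assert(it != kTimestampObjSizeTable.begin());
    --it;  // step back to the interval that actually contains the target size
    auto nDigitsInRowKey = 1 + (it - kTimestampObjSizeTable.begin());
    return it->first + (targetTimestampObjSize - it->second) / (10 + nDigitsInRowKey);
}

int main() {
    // Inputs/outputs taken from the removed ComputeMeasurementCountInLargerIntervals test.
    std::cout << computeMeasurementCount(30003) << '\n';    // 2222
    std::cout << computeMeasurementCount(155560) << '\n';   // 11111
    std::cout << computeMeasurementCount(7088863) << '\n';  // 449998
}
```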
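Similarly, a minimal sketch of the reverted accept/reject sampling loop from sampleUniqueMeasurementFromBuckets(), with de-duplication on (bucketId, measurementIndex). The Bucket and Measurement types, the std::set, and the in-memory vector of buckets are illustrative stand-ins for the real BucketUnpacker, SampledMeasurementKey hash set, and random-cursor plumbing.

```cpp
#include <cstdint>
#include <iostream>
#include <random>
#include <set>
#include <stdexcept>
#include <utility>
#include <vector>

// Simplified stand-ins for the server types; a real bucket's _id is an OID and
// its measurement count is derived from the timestamp column size.
struct Bucket {
    int64_t id;
    int32_t numMeasurements;
};

struct Measurement {
    int64_t bucketId;
    int32_t index;
};

// Accept/reject sampling over buckets: pick a bucket at random (standing in for
// the storage engine's random cursor), pick a slot j in [0, bucketMaxCount), and
// accept only if the bucket actually holds a j-th measurement and the
// (bucketId, j) pair has not been returned before, giving up after 100 attempts
// as the reverted code did.
Measurement sampleUniqueMeasurement(const std::vector<Bucket>& buckets,  // assumed non-empty
                                    int32_t bucketMaxCount,
                                    std::set<std::pair<int64_t, int32_t>>& seen,
                                    std::mt19937_64& prng) {
    constexpr int kMaxAttempts = 100;
    std::uniform_int_distribution<size_t> pickBucket(0, buckets.size() - 1);
    std::uniform_int_distribution<int32_t> pickSlot(0, bucketMaxCount - 1);

    for (int attempt = 0; attempt < kMaxAttempts; ++attempt) {
        const Bucket& b = buckets[pickBucket(prng)];
        int32_t j = pickSlot(prng);
        if (j >= b.numMeasurements) {
            continue;  // rejected: this (non-full) bucket has no j-th measurement
        }
        if (seen.insert({b.id, j}).second) {
            return {b.id, j};  // accepted: first time this measurement is sampled
        }
        // duplicate (bucketId, j): keep trying
    }
    throw std::runtime_error("no non-duplicate measurement found after 100 attempts");
}

int main() {
    std::mt19937_64 prng{42};
    std::vector<Bucket> buckets{{1, 3}, {2, 1000}, {3, 250}};
    std::set<std::pair<int64_t, int32_t>> seen;
    for (int i = 0; i < 5; ++i) {
        auto m = sampleUniqueMeasurement(buckets, /*bucketMaxCount=*/1000, seen, prng);
        std::cout << m.bucketId << ':' << m.index << '\n';
    }
}
```

Drawing j from [0, bucketMaxCount) and rejecting when j falls past a bucket's actual size is what keeps accepted measurements uniform even though buckets hold different numbers of measurements.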