diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 161 |
1 files changed, 155 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index bb156781132..d17840698d9 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" @@ -38,10 +40,12 @@ #include "mongo/db/matcher/expression_internal_expr_comparison.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/logv2/log.h" namespace mongo { @@ -77,6 +81,16 @@ auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, Bucket } /** + * Determines whether the field 'fieldName' should be included in the materialized measurements. + * Returns true when 'unpackerBehavior' is 'kInclude' and 'fieldName' is in 'bucketSpec.fieldSet', + * or when 'unpackerBehavior' is not 'kInclude' and 'fieldName' is absent from that set. + */ +auto determineIncludeField(StringData fieldName, + BucketUnpacker::Behavior unpackerBehavior, + const BucketSpec& bucketSpec) { + return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == + (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end()); +} + +/** * A projection can be internalized if every field corresponds to a boolean value. Note that this * correctly rejects dotted fieldnames, which are mapped to objects internally. 
*/ @@ -199,6 +213,35 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon } } // namespace +// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the +// 'BucketUnpacker::kTimestampObjSizeTable' table. The table is probed for the first {b_i, S_i} +// entry with 'targetTimestampObjSize' <= S_i. If the size hits a record exactly, this helper +// returns the measurement count b_i of that record. Otherwise the entry preceding the probed one +// covers the size, and the count is interpolated linearly from it as +// b + ('targetTimestampObjSize' - S) / (10 + nDigitsInRowKey), where 'nDigitsInRowKey' is that +// entry's index plus one. A size larger than every S_i is interpolated from the last table entry. +int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) { + auto currentInterval = + std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable), + std::end(BucketUnpacker::kTimestampObjSizeTable), + [&](const auto& entry) { return targetTimestampObjSize <= entry.second; }); + + // 'std::find_if' returns the end iterator when the size exceeds every table entry; it must not + // be dereferenced, so that case falls through to the interpolation below. + if (currentInterval != std::end(BucketUnpacker::kTimestampObjSizeTable) && + currentInterval->second == targetTimestampObjSize) { + return currentInterval->first; + } + // This points to the first interval larger than the target 'targetTimestampObjSize' (or one past + // the last interval), the actual interval that will cover the object size is the one before it. 
+ tassert(5422104, + "currentInterval should not point to the first table entry", + currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin()); + --currentInterval; + + // NOTE(review): '10 + nDigitsInRowKey' is presumably the BSON size of one timestamp element + // (type byte + row key + NUL + 8-byte date) for row keys of that many digits - confirm. + auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin()); + + return currentInterval->first + + ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey)); +} + void BucketUnpacker::reset(BSONObj&& bucket) { _fieldIters.clear(); _timeFieldIter = boost::none; @@ -253,11 +296,13 @@ void BucketUnpacker::reset(BSONObj&& bucket) { // Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or // _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'. - auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end(); - if ((_unpackerBehavior == Behavior::kInclude) == found) { + if (determineIncludeField(colName, _unpackerBehavior, _spec)) { _fieldIters.push_back({colName.toString(), BSONObjIterator{elem.Obj()}}); } } + + // Save the measurement count for the owned bucket. 
+ _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize()); } void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) { @@ -268,7 +313,7 @@ void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior } Document BucketUnpacker::getNext() { - invariant(hasNext()); + tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext()); auto measurement = MutableDocument{}; auto&& timeElem = _timeFieldIter->next(); @@ -292,6 +337,36 @@ Document BucketUnpacker::getNext() { return measurement.freeze(); } +/** + * Materializes only the measurement at index 'j' of the current bucket; the iterators consumed by + * 'getNext()' are not touched. 'j' must satisfy 0 <= j < '_numberOfMeasurements', otherwise the + * 'tassert' below fails. The meta field is added first when it is included and non-null; then, + * for every column of the bucket's data region that passes 'determineIncludeField()', the value + * stored under row key 'j' is added if that column has one. + */ +Document BucketUnpacker::extractSingleMeasurement(int j) { + tassert(5422101, + "'extractSingleMeasurement' expects j to be greater than or equal to zero and less than " + "the number of measurements in a bucket", + j >= 0 && j < _numberOfMeasurements); + + auto measurement = MutableDocument{}; + + auto rowKey = std::to_string(j); + auto targetIdx = StringData{rowKey}; + auto&& dataRegion = _bucket.getField(kBucketDataFieldName).Obj(); + + if (_includeMetaField && !_metaValue.isNull()) { + measurement.addField(*_spec.metaField, Value{_metaValue}); + } + + for (auto&& dataElem : dataRegion) { + auto colName = dataElem.fieldNameStringData(); + if (!determineIncludeField(colName, _unpackerBehavior, _spec)) { + continue; + } + auto value = dataElem[targetIdx]; + if (value) { + measurement.addField(colName, Value{value}); + } + } + + return measurement.freeze(); +} + DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket( const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker) : DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {} @@ -363,8 +438,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField}); } -Value DocumentSourceInternalUnpackBucket::serialize( - 
boost::optional<ExplainOptions::Verbosity> explain) const { +/** + * Appends the serialized form of this stage to 'array'. Without 'explain', the stage document is + * pushed and, when '_sampleSize' is set, a 'DocumentSourceSample' stage created from it is + * serialized into 'array' right after. With 'explain', the sample size and '_bucketMaxCount' are + * instead reported as the 'sample' and 'bucketMaxCount' fields of the single stage document. + */ +void DocumentSourceInternalUnpackBucket::serializeToArray( + std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { MutableDocument out; auto behavior = _bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? kInclude : kExclude; @@ -378,10 +453,84 @@ Value DocumentSourceInternalUnpackBucket::serialize( if (spec.metaField) { out.addField(kMetaFieldName, Value{*spec.metaField}); } - return Value(DOC(getSourceName() << out.freeze())); + + if (!explain) { + array.push_back(Value(DOC(getSourceName() << out.freeze()))); + if (_sampleSize) { + auto sampleSrc = DocumentSourceSample::create(pExpCtx, *_sampleSize); + sampleSrc->serializeToArray(array); + } + } else { + if (_sampleSize) { + out.addField("sample", Value{static_cast<long long>(*_sampleSize)}); + out.addField("bucketMaxCount", Value{_bucketMaxCount}); + } + array.push_back(Value(DOC(getSourceName() << out.freeze()))); + } +} + +/** + * Tries up to 'kMaxAttempts' times to produce one measurement that is not yet in '_seenSet'. + * Each attempt pulls a bucket from 'pSource', draws an index 'j' from the client's PRNG bounded + * by '_bucketMaxCount', and rejects the attempt when 'j' is not below the bucket's measurement + * count or when the (bucket id, 'j') key was already inserted into '_seenSet'. Returns the + * sampled measurement, or the EOF result of 'pSource'; raises error 5422103 through 'uasserted' + * once every attempt has been used up. + * NOTE(review): drawing against '_bucketMaxCount' instead of the bucket's own count presumably + * keeps every measurement equally likely regardless of how full its bucket is - confirm. + */ +DocumentSource::GetNextResult +DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() { + const auto kMaxAttempts = 100; + for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) { + auto randResult = pSource->getNext(); + switch (randResult.getStatus()) { + case GetNextResult::ReturnStatus::kAdvanced: { + auto bucket = randResult.getDocument().toBson(); + _bucketUnpacker.reset(std::move(bucket)); + + auto& prng = pExpCtx->opCtx->getClient()->getPrng(); + auto j = prng.nextInt64(_bucketMaxCount); + + if (j < _bucketUnpacker.numberOfMeasurements()) { + auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j); + + auto bucketId = _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName]; + auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j}; + + if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) { + _nSampledSoFar++; + return sampledDocument; + } else { + LOGV2_DEBUG( + 5422102, + 1, + "$_internalUnpackBucket optimized 
for sample saw duplicate measurement", + "measurementIndex"_attr = j, + "bucketId"_attr = bucketId); + } + } + break; + } + case GetNextResult::ReturnStatus::kPauseExecution: { + // This state should never be reached since the input stage is a random cursor. + MONGO_UNREACHABLE; + } + case GetNextResult::ReturnStatus::kEOF: { + return randResult; + } + } + } + uasserted(5422103, + str::stream() + << "$_internalUnpackBucket stage could not find a non-duplicate document after " + << kMaxAttempts + << " attempts while using a random cursor. This is likely a " + "sporadic failure, please try again"); } DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() { + // If the '_sampleSize' member is present, then the stage will produce randomly sampled + // documents from buckets. + if (_sampleSize) { + if (_nSampledSoFar >= _sampleSize) { + return GetNextResult::makeEOF(); + } + return sampleUniqueMeasurementFromBuckets(); + } + + // Otherwise, fallback to unpacking every measurement in all buckets until the child stage is + // exhausted. if (_bucketUnpacker.hasNext()) { return _bucketUnpacker.getNext(); } |