diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 161 |
1 files changed, 6 insertions, 155 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index da1279ab92d..b20be4cdd68 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -27,8 +27,6 @@ * it in the license file. */ -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery - #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" @@ -40,12 +38,10 @@ #include "mongo/db/matcher/expression_internal_expr_comparison.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_project.h" -#include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" -#include "mongo/logv2/log.h" namespace mongo { @@ -81,16 +77,6 @@ auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, Bucket } /** - * Determine if an arbitrary field should be included in the materialized measurements. - */ -auto determineIncludeField(StringData fieldName, - BucketUnpacker::Behavior unpackerBehavior, - const BucketSpec& bucketSpec) { - return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == - (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end()); -} - -/** * A projection can be internalized if every field corresponds to a boolean value. Note that this * correctly rejects dotted fieldnames, which are mapped to objects internally. */ @@ -213,35 +199,6 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon } } // namespace -// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the -// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in -// the table, this helper returns the measurement count corresponding to the table record. -// Otherwise, the 'targetTimestampObjSize' is used to probe the table for the smallest {b_i, S_i} -// pair such that 'targetTimestampObjSize' < S_i. Once the interval is found, the upper bound of the -// pair for the interval is computed and then linear interpolation is used to compute the -// measurement count corresponding to the 'targetTimestampObjSize' provided. -int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) { - auto currentInterval = - std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable), - std::end(BucketUnpacker::kTimestampObjSizeTable), - [&](const auto& entry) { return targetTimestampObjSize <= entry.second; }); - - if (currentInterval->second == targetTimestampObjSize) { - return currentInterval->first; - } - // This points to the first interval larger than the target 'targetTimestampObjSize', the actual - // interval that will cover the object size is the interval before the current one. - tassert(5422104, - "currentInterval should not point to the first table entry", - currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin()); - --currentInterval; - - auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin()); - - return currentInterval->first + - ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey)); -} - void BucketUnpacker::reset(BSONObj&& bucket) { _fieldIters.clear(); _timeFieldIter = boost::none; @@ -296,13 +253,11 @@ void BucketUnpacker::reset(BSONObj&& bucket) { // Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or // _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'. - if (determineIncludeField(colName, _unpackerBehavior, _spec)) { + auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end(); + if ((_unpackerBehavior == Behavior::kInclude) == found) { _fieldIters.push_back({colName.toString(), BSONObjIterator{elem.Obj()}}); } } - - // Save the measurement count for the owned bucket. - _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize()); } void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) { @@ -313,7 +268,7 @@ void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior } Document BucketUnpacker::getNext() { - tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext()); + invariant(hasNext()); auto measurement = MutableDocument{}; auto&& timeElem = _timeFieldIter->next(); @@ -336,36 +291,6 @@ Document BucketUnpacker::getNext() { return measurement.freeze(); } -Document BucketUnpacker::extractSingleMeasurement(int j) { - tassert(5422101, - "'extractSingleMeasurment' expects j to be greater than or equal to zero and less than " - "or equal to the number of measurements in a bucket", - j >= 0 && j < _numberOfMeasurements); - - auto measurement = MutableDocument{}; - - auto rowKey = std::to_string(j); - auto targetIdx = StringData{rowKey}; - auto&& dataRegion = _bucket.getField(kBucketDataFieldName).Obj(); - - if (_includeMetaField && !_metaValue.isNull()) { - measurement.addField(*_spec.metaField, Value{_metaValue}); - } - - for (auto&& dataElem : dataRegion) { - auto colName = dataElem.fieldNameStringData(); - if (!determineIncludeField(colName, _unpackerBehavior, _spec)) { - continue; - } - auto value = dataElem[targetIdx]; - if (value) { - measurement.addField(dataElem.fieldNameStringData(), Value{value}); - } - } - - return measurement.freeze(); -} - DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket( const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker) : DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {} @@ -437,8 +362,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField}); } -void DocumentSourceInternalUnpackBucket::serializeToArray( - std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { +Value DocumentSourceInternalUnpackBucket::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { MutableDocument out; auto behavior = _bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? kInclude : kExclude; @@ -452,84 +377,10 @@ void DocumentSourceInternalUnpackBucket::serializeToArray( if (spec.metaField) { out.addField(kMetaFieldName, Value{*spec.metaField}); } - - if (!explain) { - array.push_back(Value(DOC(getSourceName() << out.freeze()))); - if (_sampleSize) { - auto sampleSrc = DocumentSourceSample::create(pExpCtx, *_sampleSize); - sampleSrc->serializeToArray(array); - } - } else { - if (_sampleSize) { - out.addField("sample", Value{static_cast<long long>(*_sampleSize)}); - out.addField("bucketMaxCount", Value{_bucketMaxCount}); - } - array.push_back(Value(DOC(getSourceName() << out.freeze()))); - } -} - -DocumentSource::GetNextResult -DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() { - const auto kMaxAttempts = 100; - for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) { - auto randResult = pSource->getNext(); - switch (randResult.getStatus()) { - case GetNextResult::ReturnStatus::kAdvanced: { - auto bucket = randResult.getDocument().toBson(); - _bucketUnpacker.reset(std::move(bucket)); - - auto& prng = pExpCtx->opCtx->getClient()->getPrng(); - auto j = prng.nextInt64(_bucketMaxCount); - - if (j < _bucketUnpacker.numberOfMeasurements()) { - auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j); - - auto bucketId = _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName]; - auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j}; - - if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) { - _nSampledSoFar++; - return sampledDocument; - } else { - LOGV2_DEBUG( - 5422102, - 1, - "$_internalUnpackBucket optimized for sample saw duplicate measurement", - "measurementIndex"_attr = j, - "bucketId"_attr = bucketId); - } - } - break; - } - case GetNextResult::ReturnStatus::kPauseExecution: { - // This state should never be reached since the input stage is a random cursor. - MONGO_UNREACHABLE; - } - case GetNextResult::ReturnStatus::kEOF: { - return randResult; - } - } - } - uasserted(5422103, - str::stream() - << "$_internalUnpackBucket stage could not find a non-duplicate document after " - << kMaxAttempts - << " attempts while using a random cursor. This is likely a " - "sporadic failure, please try again"); + return Value(DOC(getSourceName() << out.freeze())); } DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() { - // If the '_sampleSize' member is present, then the stage will produce randomly sampled - // documents from buckets. - if (_sampleSize) { - if (_nSampledSoFar >= _sampleSize) { - return GetNextResult::makeEOF(); - } - return sampleUniqueMeasurementFromBuckets(); - } - - // Otherwise, fallback to unpacking every measurement in all buckets until the child stage is - // exhausted. if (_bucketUnpacker.hasNext()) { return _bucketUnpacker.getNext(); } |