summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp161
1 files changed, 6 insertions, 155 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index da1279ab92d..b20be4cdd68 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -27,8 +27,6 @@
* it in the license file.
*/
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
-
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
@@ -40,12 +38,10 @@
#include "mongo/db/matcher/expression_internal_expr_comparison.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_project.h"
-#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
-#include "mongo/logv2/log.h"
namespace mongo {
@@ -81,16 +77,6 @@ auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, Bucket
}
/**
- * Determine if an arbitrary field should be included in the materialized measurements.
- */
-auto determineIncludeField(StringData fieldName,
- BucketUnpacker::Behavior unpackerBehavior,
- const BucketSpec& bucketSpec) {
- return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
- (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end());
-}
-
-/**
* A projection can be internalized if every field corresponds to a boolean value. Note that this
* correctly rejects dotted fieldnames, which are mapped to objects internally.
*/
@@ -213,35 +199,6 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon
}
} // namespace
-// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the
-// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in
-// the table, this helper returns the measurement count corresponding to the table record.
-// Otherwise, the 'targetTimestampObjSize' is used to probe the table for the smallest {b_i, S_i}
-// pair such that 'targetTimestampObjSize' < S_i. Once the interval is found, the upper bound of the
-// pair for the interval is computed and then linear interpolation is used to compute the
-// measurement count corresponding to the 'targetTimestampObjSize' provided.
-int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) {
- auto currentInterval =
- std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable),
- std::end(BucketUnpacker::kTimestampObjSizeTable),
- [&](const auto& entry) { return targetTimestampObjSize <= entry.second; });
-
- if (currentInterval->second == targetTimestampObjSize) {
- return currentInterval->first;
- }
- // This points to the first interval larger than the target 'targetTimestampObjSize', the actual
- // interval that will cover the object size is the interval before the current one.
- tassert(5422104,
- "currentInterval should not point to the first table entry",
- currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin());
- --currentInterval;
-
- auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin());
-
- return currentInterval->first +
- ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey));
-}
-
void BucketUnpacker::reset(BSONObj&& bucket) {
_fieldIters.clear();
_timeFieldIter = boost::none;
@@ -296,13 +253,11 @@ void BucketUnpacker::reset(BSONObj&& bucket) {
// Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or
// _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'.
- if (determineIncludeField(colName, _unpackerBehavior, _spec)) {
+ auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end();
+ if ((_unpackerBehavior == Behavior::kInclude) == found) {
_fieldIters.push_back({colName.toString(), BSONObjIterator{elem.Obj()}});
}
}
-
- // Save the measurement count for the owned bucket.
- _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize());
}
void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) {
@@ -313,7 +268,7 @@ void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior
}
Document BucketUnpacker::getNext() {
- tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext());
+ invariant(hasNext());
auto measurement = MutableDocument{};
auto&& timeElem = _timeFieldIter->next();
@@ -336,36 +291,6 @@ Document BucketUnpacker::getNext() {
return measurement.freeze();
}
-Document BucketUnpacker::extractSingleMeasurement(int j) {
- tassert(5422101,
- "'extractSingleMeasurment' expects j to be greater than or equal to zero and less than "
- "or equal to the number of measurements in a bucket",
- j >= 0 && j < _numberOfMeasurements);
-
- auto measurement = MutableDocument{};
-
- auto rowKey = std::to_string(j);
- auto targetIdx = StringData{rowKey};
- auto&& dataRegion = _bucket.getField(kBucketDataFieldName).Obj();
-
- if (_includeMetaField && !_metaValue.isNull()) {
- measurement.addField(*_spec.metaField, Value{_metaValue});
- }
-
- for (auto&& dataElem : dataRegion) {
- auto colName = dataElem.fieldNameStringData();
- if (!determineIncludeField(colName, _unpackerBehavior, _spec)) {
- continue;
- }
- auto value = dataElem[targetIdx];
- if (value) {
- measurement.addField(dataElem.fieldNameStringData(), Value{value});
- }
- }
-
- return measurement.freeze();
-}
-
DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket(
const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker)
: DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {}
@@ -437,8 +362,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF
std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField});
}
-void DocumentSourceInternalUnpackBucket::serializeToArray(
- std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
+Value DocumentSourceInternalUnpackBucket::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
MutableDocument out;
auto behavior =
_bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? kInclude : kExclude;
@@ -452,84 +377,10 @@ void DocumentSourceInternalUnpackBucket::serializeToArray(
if (spec.metaField) {
out.addField(kMetaFieldName, Value{*spec.metaField});
}
-
- if (!explain) {
- array.push_back(Value(DOC(getSourceName() << out.freeze())));
- if (_sampleSize) {
- auto sampleSrc = DocumentSourceSample::create(pExpCtx, *_sampleSize);
- sampleSrc->serializeToArray(array);
- }
- } else {
- if (_sampleSize) {
- out.addField("sample", Value{static_cast<long long>(*_sampleSize)});
- out.addField("bucketMaxCount", Value{_bucketMaxCount});
- }
- array.push_back(Value(DOC(getSourceName() << out.freeze())));
- }
-}
-
-DocumentSource::GetNextResult
-DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() {
- const auto kMaxAttempts = 100;
- for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) {
- auto randResult = pSource->getNext();
- switch (randResult.getStatus()) {
- case GetNextResult::ReturnStatus::kAdvanced: {
- auto bucket = randResult.getDocument().toBson();
- _bucketUnpacker.reset(std::move(bucket));
-
- auto& prng = pExpCtx->opCtx->getClient()->getPrng();
- auto j = prng.nextInt64(_bucketMaxCount);
-
- if (j < _bucketUnpacker.numberOfMeasurements()) {
- auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j);
-
- auto bucketId = _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName];
- auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j};
-
- if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) {
- _nSampledSoFar++;
- return sampledDocument;
- } else {
- LOGV2_DEBUG(
- 5422102,
- 1,
- "$_internalUnpackBucket optimized for sample saw duplicate measurement",
- "measurementIndex"_attr = j,
- "bucketId"_attr = bucketId);
- }
- }
- break;
- }
- case GetNextResult::ReturnStatus::kPauseExecution: {
- // This state should never be reached since the input stage is a random cursor.
- MONGO_UNREACHABLE;
- }
- case GetNextResult::ReturnStatus::kEOF: {
- return randResult;
- }
- }
- }
- uasserted(5422103,
- str::stream()
- << "$_internalUnpackBucket stage could not find a non-duplicate document after "
- << kMaxAttempts
- << " attempts while using a random cursor. This is likely a "
- "sporadic failure, please try again");
+ return Value(DOC(getSourceName() << out.freeze()));
}
DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() {
- // If the '_sampleSize' member is present, then the stage will produce randomly sampled
- // documents from buckets.
- if (_sampleSize) {
- if (_nSampledSoFar >= _sampleSize) {
- return GetNextResult::makeEOF();
- }
- return sampleUniqueMeasurementFromBuckets();
- }
-
- // Otherwise, fallback to unpacking every measurement in all buckets until the child stage is
- // exhausted.
if (_bucketUnpacker.hasNext()) {
return _bucketUnpacker.getNext();
}