summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp')
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp  161
1 files changed, 155 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index bb156781132..d17840698d9 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
@@ -38,10 +40,12 @@
#include "mongo/db/matcher/expression_internal_expr_comparison.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/logv2/log.h"
namespace mongo {
@@ -77,6 +81,16 @@ auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, Bucket
}
/**
+ * Determine if an arbitrary field should be included in the materialized measurements.
+ */
+auto determineIncludeField(StringData fieldName,
+ BucketUnpacker::Behavior unpackerBehavior,
+ const BucketSpec& bucketSpec) {
+ return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
+ (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end());
+}
+
+/**
* A projection can be internalized if every field corresponds to a boolean value. Note that this
* correctly rejects dotted fieldnames, which are mapped to objects internally.
*/
@@ -199,6 +213,35 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon
}
} // namespace
+// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the
+// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in
+// the table, this helper returns the measurement count corresponding to the table record.
+// Otherwise, the 'targetTimestampObjSize' is used to probe the table for the smallest {b_i, S_i}
+// pair such that 'targetTimestampObjSize' < S_i. Once the interval is found, the upper bound of the
+// pair for the interval is computed and then linear interpolation is used to compute the
+// measurement count corresponding to the 'targetTimestampObjSize' provided.
+int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) {
+ auto currentInterval =
+ std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable),
+ std::end(BucketUnpacker::kTimestampObjSizeTable),
+ [&](const auto& entry) { return targetTimestampObjSize <= entry.second; });
+
+ if (currentInterval->second == targetTimestampObjSize) {
+ return currentInterval->first;
+ }
+ // This points to the first interval larger than the target 'targetTimestampObjSize'; the
+ // actual interval that covers the object size is the one immediately before it.
+ tassert(5422104,
+ "currentInterval should not point to the first table entry",
+ currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin());
+ --currentInterval;
+
+ auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin());
+
+ return currentInterval->first +
+ ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey));
+}
+
void BucketUnpacker::reset(BSONObj&& bucket) {
_fieldIters.clear();
_timeFieldIter = boost::none;
@@ -253,11 +296,13 @@ void BucketUnpacker::reset(BSONObj&& bucket) {
// Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or
// _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'.
- auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end();
- if ((_unpackerBehavior == Behavior::kInclude) == found) {
+ if (determineIncludeField(colName, _unpackerBehavior, _spec)) {
_fieldIters.push_back({colName.toString(), BSONObjIterator{elem.Obj()}});
}
}
+
+ // Save the measurement count for the owned bucket.
+ _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize());
}
void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) {
@@ -268,7 +313,7 @@ void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior
}
Document BucketUnpacker::getNext() {
- invariant(hasNext());
+ tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext());
auto measurement = MutableDocument{};
auto&& timeElem = _timeFieldIter->next();
@@ -292,6 +337,36 @@ Document BucketUnpacker::getNext() {
return measurement.freeze();
}
+Document BucketUnpacker::extractSingleMeasurement(int j) {
+ tassert(5422101,
+ "'extractSingleMeasurement' expects j to be greater than or equal to zero and less "
+ "than the number of measurements in a bucket",
+ j >= 0 && j < _numberOfMeasurements);
+
+ auto measurement = MutableDocument{};
+
+ auto rowKey = std::to_string(j);
+ auto targetIdx = StringData{rowKey};
+ auto&& dataRegion = _bucket.getField(kBucketDataFieldName).Obj();
+
+ if (_includeMetaField && !_metaValue.isNull()) {
+ measurement.addField(*_spec.metaField, Value{_metaValue});
+ }
+
+ for (auto&& dataElem : dataRegion) {
+ auto colName = dataElem.fieldNameStringData();
+ if (!determineIncludeField(colName, _unpackerBehavior, _spec)) {
+ continue;
+ }
+ auto value = dataElem[targetIdx];
+ if (value) {
+ measurement.addField(dataElem.fieldNameStringData(), Value{value});
+ }
+ }
+
+ return measurement.freeze();
+}
+
DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket(
const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker)
: DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {}
@@ -363,8 +438,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF
std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField});
}
-Value DocumentSourceInternalUnpackBucket::serialize(
- boost::optional<ExplainOptions::Verbosity> explain) const {
+void DocumentSourceInternalUnpackBucket::serializeToArray(
+ std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
MutableDocument out;
auto behavior =
_bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? kInclude : kExclude;
@@ -378,10 +453,84 @@ Value DocumentSourceInternalUnpackBucket::serialize(
if (spec.metaField) {
out.addField(kMetaFieldName, Value{*spec.metaField});
}
- return Value(DOC(getSourceName() << out.freeze()));
+
+ if (!explain) {
+ array.push_back(Value(DOC(getSourceName() << out.freeze())));
+ if (_sampleSize) {
+ auto sampleSrc = DocumentSourceSample::create(pExpCtx, *_sampleSize);
+ sampleSrc->serializeToArray(array);
+ }
+ } else {
+ if (_sampleSize) {
+ out.addField("sample", Value{static_cast<long long>(*_sampleSize)});
+ out.addField("bucketMaxCount", Value{_bucketMaxCount});
+ }
+ array.push_back(Value(DOC(getSourceName() << out.freeze())));
+ }
+}
+
+DocumentSource::GetNextResult
+DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() {
+ const auto kMaxAttempts = 100;
+ for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) {
+ auto randResult = pSource->getNext();
+ switch (randResult.getStatus()) {
+ case GetNextResult::ReturnStatus::kAdvanced: {
+ auto bucket = randResult.getDocument().toBson();
+ _bucketUnpacker.reset(std::move(bucket));
+
+ auto& prng = pExpCtx->opCtx->getClient()->getPrng();
+ auto j = prng.nextInt64(_bucketMaxCount);
+
+ if (j < _bucketUnpacker.numberOfMeasurements()) {
+ auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j);
+
+ auto bucketId = _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName];
+ auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j};
+
+ if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) {
+ _nSampledSoFar++;
+ return sampledDocument;
+ } else {
+ LOGV2_DEBUG(
+ 5422102,
+ 1,
+ "$_internalUnpackBucket optimized for sample saw duplicate measurement",
+ "measurementIndex"_attr = j,
+ "bucketId"_attr = bucketId);
+ }
+ }
+ break;
+ }
+ case GetNextResult::ReturnStatus::kPauseExecution: {
+ // This state should never be reached since the input stage is a random cursor.
+ MONGO_UNREACHABLE;
+ }
+ case GetNextResult::ReturnStatus::kEOF: {
+ return randResult;
+ }
+ }
+ }
+ uasserted(5422103,
+ str::stream()
+ << "$_internalUnpackBucket stage could not find a non-duplicate document after "
+ << kMaxAttempts
+ << " attempts while using a random cursor. This is likely a "
+ "sporadic failure; please try again");
}
DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() {
+ // If the '_sampleSize' member is present, then the stage will produce randomly sampled
+ // documents from buckets.
+ if (_sampleSize) {
+ if (_nSampledSoFar >= _sampleSize) {
+ return GetNextResult::makeEOF();
+ }
+ return sampleUniqueMeasurementFromBuckets();
+ }
+
+ // Otherwise, fallback to unpacking every measurement in all buckets until the child stage is
+ // exhausted.
if (_bucketUnpacker.hasNext()) {
return _bucketUnpacker.getNext();
}