summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2021-01-15 07:15:41 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-15 19:53:42 +0000
commit601e94d2edd7a07ca5dd3a66bf3bebaaf7597b4a (patch)
treee5db167e67e817702d2d83097f7df434a6b3f566
parentc7d912cbf5d1c2e0cd39f1fe0619435f17034f17 (diff)
downloadmongo-601e94d2edd7a07ca5dd3a66bf3bebaaf7597b4a.tar.gz
SERVER-53461 totalDataSizeGroupedBytesEstimate stats for $group should report total bytes processed
-rw-r--r--jstests/aggregation/explain_per_stage_exec_stats.js57
-rw-r--r--src/mongo/db/exec/plan_stats.h4
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp19
-rw-r--r--src/mongo/db/pipeline/document_source_group.h8
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp26
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h3
6 files changed, 72 insertions, 45 deletions
diff --git a/jstests/aggregation/explain_per_stage_exec_stats.js b/jstests/aggregation/explain_per_stage_exec_stats.js
index eace27b88d7..f7f65121a2d 100644
--- a/jstests/aggregation/explain_per_stage_exec_stats.js
+++ b/jstests/aggregation/explain_per_stage_exec_stats.js
@@ -51,27 +51,33 @@ const facet = [{$facet: {a: pipelineShardedStages, b: pipelineNoShardedStages}}]
// Verify behavior of $changeStream, which generates several internal stages.
const changeStream = [{$changeStream: {}}];
-function assertExecutionStats(stage) {
- assert(stage.hasOwnProperty("nReturned"));
- assert(stage.hasOwnProperty("executionTimeMillisEstimate"));
-
+// Checks if a particular stage has expected statistics.
+function assertStageExecutionStatsPresent(stage) {
if (stage.hasOwnProperty("$sort")) {
assert(stage.hasOwnProperty("totalDataSizeSortedBytesEstimate"), stage);
assert(stage.hasOwnProperty("usedDisk"), stage);
} else if (stage.hasOwnProperty("$group")) {
- assert(stage.hasOwnProperty("totalDataSizeGroupedBytesEstimate"), stage);
+ assert(stage.hasOwnProperty("totalOutputDataSizeBytes"), stage);
assert(stage.hasOwnProperty("usedDisk"), stage);
}
}
-function assertStatsInOutput(explain) {
+function assertExecutionStats(stage, assertExecutionStatsCallback) {
+ assert(stage.hasOwnProperty("nReturned"));
+ assert(stage.hasOwnProperty("executionTimeMillisEstimate"));
+
+ assert.neq(assertExecutionStatsCallback, null);
+ assertExecutionStatsCallback(stage);
+}
+
+function assertStatsInOutput(explain, assertExecutionStatsCallback) {
// Depending on how the pipeline is split, the explain output from each shard can contain either
// of these.
assert(explain.hasOwnProperty("stages") || explain.hasOwnProperty("queryPlanner"));
if (explain.hasOwnProperty("stages")) {
const stages = explain["stages"];
for (const stage of stages) {
- assertExecutionStats(stage);
+ assertExecutionStats(stage, assertExecutionStatsCallback);
}
} else {
// If we don't have a list of stages, "executionStats" should still contain "nReturned"
@@ -88,29 +94,33 @@ function assertStatsInOutput(explain) {
}
}
-function checkResults(result) {
+function checkResults(result, assertExecutionStatsCallback) {
// Loop over shards in sharded case.
// Note that we do not expect execution statistics for the 'splitPipeline' or 'mergingPart'
// of explain output in the sharded case, so we only check the 'shards' part of explain.
if (result.hasOwnProperty("shards")) {
const shards = result["shards"];
for (let shardName in shards) {
- assertStatsInOutput(shards[shardName]);
+ assertStatsInOutput(shards[shardName], assertExecutionStatsCallback);
}
} else {
- assertStatsInOutput(result);
+ assertStatsInOutput(result, assertExecutionStatsCallback);
}
}
for (let pipeline of [pipelineShardedStages, pipelineNoShardedStages, facet]) {
- checkResults(coll.explain("executionStats").aggregate(pipeline));
- checkResults(coll.explain("allPlansExecution").aggregate(pipeline));
+ checkResults(coll.explain("executionStats").aggregate(pipeline),
+ assertStageExecutionStatsPresent);
+ checkResults(coll.explain("allPlansExecution").aggregate(pipeline),
+ assertStageExecutionStatsPresent);
}
// Only test $changeStream if we are on a replica set or on a sharded cluster.
if (FixtureHelpers.isReplSet(db) || FixtureHelpers.isSharded(coll)) {
- checkResults(coll.explain("executionStats").aggregate(changeStream));
- checkResults(coll.explain("allPlansExecution").aggregate(changeStream));
+ checkResults(coll.explain("executionStats").aggregate(changeStream),
+ assertStageExecutionStatsPresent);
+ checkResults(coll.explain("allPlansExecution").aggregate(changeStream),
+ assertStageExecutionStatsPresent);
}
// Returns the number of documents
@@ -128,4 +138,23 @@ function numberOfDocsReturnedByMatchStage(explain) {
const matchPipeline = [{$_internalInhibitOptimization: {}}, {$match: {a: {$gte: 500}}}];
assert.eq(numberOfDocsReturnedByMatchStage(coll.explain("executionStats").aggregate(matchPipeline)),
500);
+
+// Checks $group totalOutputDataSizeBytes execution statistic.
+(function testGroupStatTotalDataSizeBytes() {
+ const pipeline = [{$group: {_id: null, count: {$sum: 1}}}];
+ const result = coll.explain("executionStats").aggregate(pipeline);
+
+ let assertOutputBytesSize = function(stage) {
+ if (stage.hasOwnProperty("$group")) {
+ assert(stage.hasOwnProperty("totalOutputDataSizeBytes"), stage);
+
+ // A heurisitic size in bytes processed by $group to generate the output '{ "_id" :
+ // null, "count" : 1000 }'. The size is the approximate value of internal document size
+ // used by $group.
+ const approximateOutputDocSizeBytes = 500;
+ assert(stage.totalOutputDataSizeBytes <= approximateOutputDocSizeBytes);
+ }
+ };
+ checkResults(result, assertOutputBytesSize);
+})();
}());
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index b411b36647f..19335e3d16a 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -851,8 +851,8 @@ struct GroupStats : public SpecificStats {
return sizeof(*this);
}
- // The amount of data in bytes that were the input to the group stage.
- size_t memoryUsageBytes = 0;
+ // Tracks an estimate of the total size of all documents output by the group stage in bytes.
+ size_t totalOutputDataSizeBytes = 0;
// Flag to specify if data was spilled to disk while grouping the data.
bool usedDisk = false;
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 10741308735..f056395b8a3 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -134,16 +134,16 @@ const char* DocumentSourceGroup::getSourceName() const {
bool DocumentSourceGroup::shouldSpillWithAttemptToSaveMemory(std::function<int()> saveMemory) {
if (!_memoryTracker.allowDiskUse &&
- (_stats.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes)) {
- _stats.memoryUsageBytes -= saveMemory();
+ (_memoryTracker.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes)) {
+ _memoryTracker.memoryUsageBytes -= saveMemory();
}
- if (_stats.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes) {
+ if (_memoryTracker.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes) {
uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
"Exceeded memory limit for $group, but didn't allow external sort."
" Pass allowDiskUse:true to opt in.",
_memoryTracker.allowDiskUse);
- _stats.memoryUsageBytes = 0;
+ _memoryTracker.memoryUsageBytes = 0;
return true;
}
return false;
@@ -318,8 +318,8 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity>
}
out["maxAccumulatorMemoryUsageBytes"] = Value(md.freezeToValue());
- out["totalDataSizeGroupedBytesEstimate"] =
- Value(static_cast<long long>(_stats.memoryUsageBytes));
+ out["totalOutputDataSizeBytes"] =
+ Value(static_cast<long long>(_stats.totalOutputDataSizeBytes));
out["usedDisk"] = Value(_stats.usedDisk);
}
@@ -563,7 +563,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
vector<uint64_t> oldAccumMemUsage(numAccumulators, 0);
if (inserted) {
- _stats.memoryUsageBytes += id.getApproximateSize();
+ _memoryTracker.memoryUsageBytes += id.getApproximateSize();
// Initialize and add the accumulators
Value expandedId = expandId(id);
@@ -580,7 +580,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
} else {
for (size_t i = 0; i < group.size(); i++) {
// subtract old mem usage. New usage added back after processing.
- _stats.memoryUsageBytes -= group[i]->memUsageForSorter();
+ _memoryTracker.memoryUsageBytes -= group[i]->memUsageForSorter();
oldAccumMemUsage[i] = group[i]->memUsageForSorter();
}
}
@@ -593,7 +593,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
_accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables),
_doingMerge);
- _stats.memoryUsageBytes += group[i]->memUsageForSorter();
+ _memoryTracker.memoryUsageBytes += group[i]->memUsageForSorter();
_memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes +=
group[i]->memUsageForSorter() - oldAccumMemUsage[i];
}
@@ -766,6 +766,7 @@ Document DocumentSourceGroup::makeDocument(const Value& id,
}
}
+ _stats.totalOutputDataSizeBytes += out.getApproximateSize();
return out.freeze();
}
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 2ec9afd4138..67fcd9f6e9e 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -159,13 +159,10 @@ public:
/**
* Returns true if this $group stage used disk during execution and false otherwise.
*/
- bool usedDisk() {
+ bool usedDisk() final {
return _stats.usedDisk;
}
- /**
- * Returns $group stage specific stats.
- */
const SpecificStats* getSpecificStats() const final {
return &_stats;
}
@@ -202,6 +199,9 @@ private:
const bool allowDiskUse;
const size_t maxMemoryUsageBytes;
+
+ // Tracks current memory used. This variable will be reset if data is spilled to disk.
+ size_t memoryUsageBytes = 0;
// Tracks memory consumption per accumulation statement.
std::vector<AccumStatementMemoryTracker> accumStatementMemoryBytes;
};
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index b3c3417353b..134896a4938 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -151,8 +151,8 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt(
limit = getLimit();
- // Since $sort is not guaranteed to be stable, we can blindly remove the first $sort only
- // when there's no limit on the current sort.
+ // Since $sort is not guaranteed to be stable, we can blindly remove the first $sort only when
+ // there's no limit on the current sort.
auto nextSort = dynamic_cast<DocumentSourceSort*>((*nextStage).get());
if (!limit && nextSort) {
container->erase(itr);
@@ -224,9 +224,9 @@ void DocumentSourceSort::loadDocument(Document&& doc) {
Value sortKey;
Document docForSorter;
- // We always need to extract the sort key if we've reached this point. If the query system
- // had already computed the sort key we'd have split the pipeline there, would be merging
- // presorted documents, and wouldn't use this method.
+ // We always need to extract the sort key if we've reached this point. If the query system had
+ // already computed the sort key we'd have split the pipeline there, would be merging presorted
+ // documents, and wouldn't use this method.
std::tie(sortKey, docForSorter) = extractSortKey(std::move(doc));
_sortExecutor->add(sortKey, docForSorter);
}
@@ -247,8 +247,8 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co
Value sortKey = _sortKeyGen->computeSortKeyFromDocument(doc);
if (pExpCtx->needsMerge) {
- // If this sort stage is part of a merged pipeline, make sure that each Document's sort
- // key gets saved with its metadata.
+ // If this sort stage is part of a merged pipeline, make sure that each Document's sort key
+ // gets saved with its metadata.
MutableDocument toBeSorted(std::move(doc));
toBeSorted.metadata().setSortKey(sortKey, _sortKeyGen->isSingleElementKey());
@@ -272,12 +272,12 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distri
bool DocumentSourceSort::canRunInParallelBeforeWriteStage(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
- // This is an interesting special case. If there are no further stages which require merging
- // the streams into one, a $sort should not require it. This is only the case because the
- // sort order doesn't matter for a pipeline ending with a write stage. We may encounter it
- // here as an intermediate stage before a final $group with a $sort, which would make sense.
- // Should we extend our analysis to detect if an exchange is appropriate in a general
- // pipeline, a $sort would generally require merging the streams before producing output.
+ // This is an interesting special case. If there are no further stages which require merging the
+ // streams into one, a $sort should not require it. This is only the case because the sort order
+ // doesn't matter for a pipeline ending with a write stage. We may encounter it here as an
+ // intermediate stage before a final $group with a $sort, which would make sense. Should we
+ // extend our analysis to detect if an exchange is appropriate in a general pipeline, a $sort
+ // would generally require merging the streams before producing output.
return false;
}
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 2fdf7be2759..a47c50fba6a 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -134,9 +134,6 @@ public:
return _sortExecutor->hasLimit();
}
- /**
- * Returns specific stats for $sort stage.
- */
const SpecificStats* getSpecificStats() const final {
return &_sortExecutor->stats();
}