diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2021-01-15 07:15:41 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-15 19:53:42 +0000 |
commit | 601e94d2edd7a07ca5dd3a66bf3bebaaf7597b4a (patch) | |
tree | e5db167e67e817702d2d83097f7df434a6b3f566 | |
parent | c7d912cbf5d1c2e0cd39f1fe0619435f17034f17 (diff) | |
download | mongo-601e94d2edd7a07ca5dd3a66bf3bebaaf7597b4a.tar.gz |
SERVER-53461 totalDataSizeGroupedBytesEstimate stats for $group should report total bytes processed
-rw-r--r-- | jstests/aggregation/explain_per_stage_exec_stats.js | 57 | ||||
-rw-r--r-- | src/mongo/db/exec/plan_stats.h | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.h | 3 |
6 files changed, 72 insertions, 45 deletions
diff --git a/jstests/aggregation/explain_per_stage_exec_stats.js b/jstests/aggregation/explain_per_stage_exec_stats.js index eace27b88d7..f7f65121a2d 100644 --- a/jstests/aggregation/explain_per_stage_exec_stats.js +++ b/jstests/aggregation/explain_per_stage_exec_stats.js @@ -51,27 +51,33 @@ const facet = [{$facet: {a: pipelineShardedStages, b: pipelineNoShardedStages}}] // Verify behavior of $changeStream, which generates several internal stages. const changeStream = [{$changeStream: {}}]; -function assertExecutionStats(stage) { - assert(stage.hasOwnProperty("nReturned")); - assert(stage.hasOwnProperty("executionTimeMillisEstimate")); - +// Checks if a particular stage has expected statistics. +function assertStageExecutionStatsPresent(stage) { if (stage.hasOwnProperty("$sort")) { assert(stage.hasOwnProperty("totalDataSizeSortedBytesEstimate"), stage); assert(stage.hasOwnProperty("usedDisk"), stage); } else if (stage.hasOwnProperty("$group")) { - assert(stage.hasOwnProperty("totalDataSizeGroupedBytesEstimate"), stage); + assert(stage.hasOwnProperty("totalOutputDataSizeBytes"), stage); assert(stage.hasOwnProperty("usedDisk"), stage); } } -function assertStatsInOutput(explain) { +function assertExecutionStats(stage, assertExecutionStatsCallback) { + assert(stage.hasOwnProperty("nReturned")); + assert(stage.hasOwnProperty("executionTimeMillisEstimate")); + + assert.neq(assertExecutionStatsCallback, null); + assertExecutionStatsCallback(stage); +} + +function assertStatsInOutput(explain, assertExecutionStatsCallback) { // Depending on how the pipeline is split, the explain output from each shard can contain either // of these. 
assert(explain.hasOwnProperty("stages") || explain.hasOwnProperty("queryPlanner")); if (explain.hasOwnProperty("stages")) { const stages = explain["stages"]; for (const stage of stages) { - assertExecutionStats(stage); + assertExecutionStats(stage, assertExecutionStatsCallback); } } else { // If we don't have a list of stages, "executionStats" should still contain "nReturned" @@ -88,29 +94,33 @@ function assertStatsInOutput(explain) { } } -function checkResults(result) { +function checkResults(result, assertExecutionStatsCallback) { // Loop over shards in sharded case. // Note that we do not expect execution statistics for the 'splitPipeline' or 'mergingPart' // of explain output in the sharded case, so we only check the 'shards' part of explain. if (result.hasOwnProperty("shards")) { const shards = result["shards"]; for (let shardName in shards) { - assertStatsInOutput(shards[shardName]); + assertStatsInOutput(shards[shardName], assertExecutionStatsCallback); } } else { - assertStatsInOutput(result); + assertStatsInOutput(result, assertExecutionStatsCallback); } } for (let pipeline of [pipelineShardedStages, pipelineNoShardedStages, facet]) { - checkResults(coll.explain("executionStats").aggregate(pipeline)); - checkResults(coll.explain("allPlansExecution").aggregate(pipeline)); + checkResults(coll.explain("executionStats").aggregate(pipeline), + assertStageExecutionStatsPresent); + checkResults(coll.explain("allPlansExecution").aggregate(pipeline), + assertStageExecutionStatsPresent); } // Only test $changeStream if we are on a replica set or on a sharded cluster. 
if (FixtureHelpers.isReplSet(db) || FixtureHelpers.isSharded(coll)) { - checkResults(coll.explain("executionStats").aggregate(changeStream)); - checkResults(coll.explain("allPlansExecution").aggregate(changeStream)); + checkResults(coll.explain("executionStats").aggregate(changeStream), + assertStageExecutionStatsPresent); + checkResults(coll.explain("allPlansExecution").aggregate(changeStream), + assertStageExecutionStatsPresent); } // Returns the number of documents @@ -128,4 +138,23 @@ function numberOfDocsReturnedByMatchStage(explain) { const matchPipeline = [{$_internalInhibitOptimization: {}}, {$match: {a: {$gte: 500}}}]; assert.eq(numberOfDocsReturnedByMatchStage(coll.explain("executionStats").aggregate(matchPipeline)), 500); + +// Checks $group totalOutputDataSizeBytes execution statistic. +(function testGroupStatTotalDataSizeBytes() { + const pipeline = [{$group: {_id: null, count: {$sum: 1}}}]; + const result = coll.explain("executionStats").aggregate(pipeline); + + let assertOutputBytesSize = function(stage) { + if (stage.hasOwnProperty("$group")) { + assert(stage.hasOwnProperty("totalOutputDataSizeBytes"), stage); + + // A heuristic size in bytes processed by $group to generate the output '{ "_id" : + // null, "count" : 1000 }'. The size is the approximate value of internal document size + // used by $group. + const approximateOutputDocSizeBytes = 500; + assert(stage.totalOutputDataSizeBytes <= approximateOutputDocSizeBytes); + } + }; + checkResults(result, assertOutputBytesSize); +})(); }()); diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h index b411b36647f..19335e3d16a 100644 --- a/src/mongo/db/exec/plan_stats.h +++ b/src/mongo/db/exec/plan_stats.h @@ -851,8 +851,8 @@ struct GroupStats : public SpecificStats { return sizeof(*this); } - // The amount of data in bytes that were the input to the group stage. 
- size_t memoryUsageBytes = 0; + // Tracks an estimate of the total size of all documents output by the group stage in bytes. + size_t totalOutputDataSizeBytes = 0; // Flag to specify if data was spilled to disk while grouping the data. bool usedDisk = false; diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 10741308735..f056395b8a3 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -134,16 +134,16 @@ const char* DocumentSourceGroup::getSourceName() const { bool DocumentSourceGroup::shouldSpillWithAttemptToSaveMemory(std::function<int()> saveMemory) { if (!_memoryTracker.allowDiskUse && - (_stats.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes)) { - _stats.memoryUsageBytes -= saveMemory(); + (_memoryTracker.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes)) { + _memoryTracker.memoryUsageBytes -= saveMemory(); } - if (_stats.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes) { + if (_memoryTracker.memoryUsageBytes > _memoryTracker.maxMemoryUsageBytes) { uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, "Exceeded memory limit for $group, but didn't allow external sort." 
" Pass allowDiskUse:true to opt in.", _memoryTracker.allowDiskUse); - _stats.memoryUsageBytes = 0; + _memoryTracker.memoryUsageBytes = 0; return true; } return false; @@ -318,8 +318,8 @@ Value DocumentSourceGroup::serialize(boost::optional<ExplainOptions::Verbosity> } out["maxAccumulatorMemoryUsageBytes"] = Value(md.freezeToValue()); - out["totalDataSizeGroupedBytesEstimate"] = - Value(static_cast<long long>(_stats.memoryUsageBytes)); + out["totalOutputDataSizeBytes"] = + Value(static_cast<long long>(_stats.totalOutputDataSizeBytes)); out["usedDisk"] = Value(_stats.usedDisk); } @@ -563,7 +563,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { vector<uint64_t> oldAccumMemUsage(numAccumulators, 0); if (inserted) { - _stats.memoryUsageBytes += id.getApproximateSize(); + _memoryTracker.memoryUsageBytes += id.getApproximateSize(); // Initialize and add the accumulators Value expandedId = expandId(id); @@ -580,7 +580,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { } else { for (size_t i = 0; i < group.size(); i++) { // subtract old mem usage. New usage added back after processing. 
- _stats.memoryUsageBytes -= group[i]->memUsageForSorter(); + _memoryTracker.memoryUsageBytes -= group[i]->memUsageForSorter(); oldAccumMemUsage[i] = group[i]->memUsageForSorter(); } } @@ -593,7 +593,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { _accumulatedFields[i].expr.argument->evaluate(rootDocument, &pExpCtx->variables), _doingMerge); - _stats.memoryUsageBytes += group[i]->memUsageForSorter(); + _memoryTracker.memoryUsageBytes += group[i]->memUsageForSorter(); _memoryTracker.accumStatementMemoryBytes[i].currentMemoryBytes += group[i]->memUsageForSorter() - oldAccumMemUsage[i]; } @@ -766,6 +766,7 @@ Document DocumentSourceGroup::makeDocument(const Value& id, } } + _stats.totalOutputDataSizeBytes += out.getApproximateSize(); return out.freeze(); } diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 2ec9afd4138..67fcd9f6e9e 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -159,13 +159,10 @@ public: /** * Returns true if this $group stage used disk during execution and false otherwise. */ - bool usedDisk() { + bool usedDisk() final { return _stats.usedDisk; } - /** - * Returns $group stage specific stats. - */ const SpecificStats* getSpecificStats() const final { return &_stats; } @@ -202,6 +199,9 @@ private: const bool allowDiskUse; const size_t maxMemoryUsageBytes; + + // Tracks current memory used. This variable will be reset if data is spilled to disk. + size_t memoryUsageBytes = 0; // Tracks memory consumption per accumulation statement. 
std::vector<AccumStatementMemoryTracker> accumStatementMemoryBytes; }; diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index b3c3417353b..134896a4938 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -151,8 +151,8 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( limit = getLimit(); - // Since $sort is not guaranteed to be stable, we can blindly remove the first $sort only - // when there's no limit on the current sort. + // Since $sort is not guaranteed to be stable, we can blindly remove the first $sort only when + // there's no limit on the current sort. auto nextSort = dynamic_cast<DocumentSourceSort*>((*nextStage).get()); if (!limit && nextSort) { container->erase(itr); @@ -224,9 +224,9 @@ void DocumentSourceSort::loadDocument(Document&& doc) { Value sortKey; Document docForSorter; - // We always need to extract the sort key if we've reached this point. If the query system - // had already computed the sort key we'd have split the pipeline there, would be merging - // presorted documents, and wouldn't use this method. + // We always need to extract the sort key if we've reached this point. If the query system had + // already computed the sort key we'd have split the pipeline there, would be merging presorted + // documents, and wouldn't use this method. std::tie(sortKey, docForSorter) = extractSortKey(std::move(doc)); _sortExecutor->add(sortKey, docForSorter); } @@ -247,8 +247,8 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co Value sortKey = _sortKeyGen->computeSortKeyFromDocument(doc); if (pExpCtx->needsMerge) { - // If this sort stage is part of a merged pipeline, make sure that each Document's sort - // key gets saved with its metadata. + // If this sort stage is part of a merged pipeline, make sure that each Document's sort key + // gets saved with its metadata. 
MutableDocument toBeSorted(std::move(doc)); toBeSorted.metadata().setSortKey(sortKey, _sortKeyGen->isSingleElementKey()); @@ -272,12 +272,12 @@ boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distri bool DocumentSourceSort::canRunInParallelBeforeWriteStage( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const { - // This is an interesting special case. If there are no further stages which require merging - // the streams into one, a $sort should not require it. This is only the case because the - // sort order doesn't matter for a pipeline ending with a write stage. We may encounter it - // here as an intermediate stage before a final $group with a $sort, which would make sense. - // Should we extend our analysis to detect if an exchange is appropriate in a general - // pipeline, a $sort would generally require merging the streams before producing output. + // This is an interesting special case. If there are no further stages which require merging the + // streams into one, a $sort should not require it. This is only the case because the sort order + // doesn't matter for a pipeline ending with a write stage. We may encounter it here as an + // intermediate stage before a final $group with a $sort, which would make sense. Should we + // extend our analysis to detect if an exchange is appropriate in a general pipeline, a $sort + // would generally require merging the streams before producing output. return false; } diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 2fdf7be2759..a47c50fba6a 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -134,9 +134,6 @@ public: return _sortExecutor->hasLimit(); } - /** - * Returns specific stats for $sort stage. - */ const SpecificStats* getSpecificStats() const final { return &_sortExecutor->stats(); } |