diff options
author | Jordi Serra Torrens <jordi.serra-torrens@mongodb.com> | 2022-09-15 13:23:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-15 14:32:14 +0000 |
commit | 36420773c0fa0b76c6d756cec0cc6349e321e8c2 (patch) | |
tree | 2d9fad103e8412c4c6cf793669ffa57909cf4eff | |
parent | 9b5cbbbc2d3cae926689e4e632daa73062bb1432 (diff) | |
download | mongo-36420773c0fa0b76c6d756cec0cc6349e321e8c2.tar.gz |
SERVER-69069 Added 'doOptimizeAt' method for $_internalAllCollectionStats
4 files changed, 199 insertions, 3 deletions
diff --git a/jstests/sharding/sharded_data_distribution.js b/jstests/sharding/sharded_data_distribution.js index a4d24d0c702..30a06a634f1 100644 --- a/jstests/sharding/sharded_data_distribution.js +++ b/jstests/sharding/sharded_data_distribution.js @@ -117,6 +117,57 @@ const response = assert.commandFailedWithCode( assert.neq(-1, response.errmsg.indexOf("$shardedDataDistribution"), response.errmsg); assert.neq(-1, response.errmsg.indexOf("admin database"), response.errmsg); +// Test $shardedDataDistribution followed by a $match stage on the 'ns'. +assert.eq(1, adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: ns1}}]).itcount()); +assert.eq(2, + adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: {$in: [ns1, ns2]}}}]) + .itcount()); +assert.eq(0, + adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: 'test.IDoNotExist'}}]) + .itcount()); + +// Test $shardedDataDistribution followed by a $match stage on the 'ns' and something else. +assert.eq( + 1, + adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: ns1, shards: {$size: 2}}}]) + .itcount()); +assert.eq( + 0, + adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: ns1, shards: {$size: 50}}}]) + .itcount()); + +// Test $shardedDataDistribution followed by a $match stage on the 'ns' and other match stages. +assert.eq( + 1, + adminDb + .aggregate( + [{$shardedDataDistribution: {}}, {$match: {ns: ns1}}, {$match: {shards: {$size: 2}}}]) + .itcount()); +assert.eq( + 0, + adminDb + .aggregate( + [{$shardedDataDistribution: {}}, {$match: {ns: ns1}}, {$match: {shards: {$size: 50}}}]) + .itcount()); +assert.eq(1, + adminDb + .aggregate([ + {$shardedDataDistribution: {}}, + {$match: {ns: /^test/}}, + {$match: {shards: {$size: 2}}}, + {$match: {ns: /foo$/}}, + ]) + .itcount()); + +// Test $shardedDataDistribution followed by a $match stage unrelated to 'ns'. +assert.eq( + 0, + adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {shards: {$size: 50}}}]).itcount()); + +assert.neq( + 0, + adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {shards: {$size: 2}}}]).itcount()); + st.stop(); // Test that verifies the behavior in unsharded deployments diff --git a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp index 3e22cc826e9..f37206dbafe 100644 --- a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp +++ b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp @@ -29,7 +29,6 @@ #include "mongo/db/pipeline/document_source_internal_all_collection_stats.h" - namespace mongo { using boost::intrusive_ptr; @@ -55,6 +54,13 @@ DocumentSource::GetNextResult DocumentSourceInternalAllCollectionStats::doGetNex NamespaceString nss(obj["ns"].String()); _catalogDocs->pop_front(); + + // Avoid computing stats for collections that do not match the absorbed filter on the 'ns' + // field. + if (_absorbedMatch && !_absorbedMatch->getMatchExpression()->matchesBSON(std::move(obj))) { + continue; + } + try { return {Document{DocumentSourceCollStats::makeStatsForNs( pExpCtx, nss, _internalAllCollectionStatsSpec.getStats().get())}}; @@ -67,6 +73,66 @@ DocumentSource::GetNextResult DocumentSourceInternalAllCollectionStats::doGetNex return GetNextResult::makeEOF(); } +Pipeline::SourceContainer::iterator DocumentSourceInternalAllCollectionStats::doOptimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + if (std::next(itr) == container->end()) { + return container->end(); + } + + // Attempt to internalize any predicates of a $match upon the "ns" field. + auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get()); + + if (!nextMatch) { + return std::next(itr); + } + + auto splitMatch = std::move(*nextMatch).splitSourceBy({"ns"}, {}); + invariant(splitMatch.first || splitMatch.second); + + // Remove the original $match. + container->erase(std::next(itr)); + + // Absorb the part of $match that is dependant on 'ns' + if (splitMatch.second) { + if (!_absorbedMatch) { + _absorbedMatch = std::move(splitMatch.second); + } else { + // We have already absorbed a $match. We need to join it with splitMatch.second. + _absorbedMatch->joinMatchWith(std::move(splitMatch.second)); + } + } + + // splitMatch.first is independent of 'ns'. Put it back on the pipeline. + if (splitMatch.first) { + container->insert(std::next(itr), std::move(splitMatch.first)); + return std::next(itr); + } else { + // There may be further optimization between this stage and the new neighbor, so we return + // an iterator pointing to ourself. + return itr; + } +} + +void DocumentSourceInternalAllCollectionStats::serializeToArray( + std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { + if (explain) { + BSONObjBuilder bob; + _internalAllCollectionStatsSpec.serialize(&bob); + if (_absorbedMatch) { + bob.append("match", _absorbedMatch->getQuery()); + } + auto doc = Document{{getSourceName(), bob.obj()}}; + array.push_back(Value(doc)); + } else { + array.push_back(serialize(explain)); + if (_absorbedMatch) { + _absorbedMatch->serializeToArray(array); + } + } +} + intrusive_ptr<DocumentSource> DocumentSourceInternalAllCollectionStats::createFromBsonInternal( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { uassert(6789103, diff --git a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h index f3c89bbf93a..491b6354c3f 100644 --- a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h +++ b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_coll_stats.h" #include "mongo/db/pipeline/document_source_internal_all_collection_stats_gen.h" +#include "mongo/db/pipeline/document_source_match.h" namespace mongo { @@ -106,6 +107,13 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBsonInternal( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + + void serializeToArray( + std::vector<Value>& array, + boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + private: GetNextResult doGetNext() final; @@ -113,5 +121,9 @@ private: // options. DocumentSourceInternalAllCollectionStatsSpec _internalAllCollectionStatsSpec; boost::optional<std::deque<BSONObj>> _catalogDocs; + + // A $match stage can be absorbed in order to avoid unnecessarily computing the stats for + // collections that do not match that predicate. + boost::intrusive_ptr<DocumentSourceMatch> _absorbedMatch; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 8e357b6caa7..62c89ef192f 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -75,6 +75,7 @@ using std::string; using std::vector; const NamespaceString kTestNss = NamespaceString("a.collection"); +const NamespaceString kAdminCollectionlessNss = NamespaceString("admin.$cmd.aggregate"); constexpr size_t getChangeStreamStageSize() { return 6; @@ -114,7 +115,8 @@ class StubExplainInterface : public StubMongoProcessInterface { }; void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson, std::string outputPipeJson, - std::string serializedPipeJson) { + std::string serializedPipeJson, + NamespaceString aggNss = kTestNss) { QueryTestServiceContext testServiceContext; auto opCtx = testServiceContext.makeOperationContext(); @@ -128,7 +130,7 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson, ASSERT_EQUALS(stageElem.type(), BSONType::Object); rawPipeline.push_back(stageElem.embeddedObject()); } - AggregateCommandRequest request(kTestNss, rawPipeline); + AggregateCommandRequest request(aggNss, rawPipeline); intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(opCtx.get(), request); ctx->mongoProcessInterface = std::make_shared<StubExplainInterface>(); @@ -3116,6 +3118,71 @@ TEST(PipelineOptimizationTest, MatchGetsPushedIntoBothChildrenOfUnion) { "]"); } +TEST(PipelineOptimizationTest, internalAllCollectionStatsAbsorbsMatchOnNs) { + std::string inputPipe = + "[" + " {$_internalAllCollectionStats: {}}," + " {$match: {ns: 'test.foo', a: 10}}" + "]"; + std::string outputPipe = + "[" + " {$_internalAllCollectionStats: {match: {ns: {$eq: 'test.foo'}}}}," + " {$match: {a: {$eq: 10}}}" + "]"; + std::string serializedPipe = + "[" + " {$_internalAllCollectionStats: {}}," + " {$match: {ns: {$eq: 'test.foo'}}}," + " {$match: {a: {$eq: 10}}}" + "]"; + assertPipelineOptimizesAndSerializesTo( + inputPipe, outputPipe, serializedPipe, kAdminCollectionlessNss); +} + +TEST(PipelineOptimizationTest, internalAllCollectionStatsAbsorbsSeveralMatchesOnNs) { + std::string inputPipe = + "[" + " {$_internalAllCollectionStats: {}}," + " {$match: {ns: {$gt: 0}}}," + " {$match: {a: 10}}," + " {$match: {ns: {$ne: 5}}}" + "]"; + std::string outputPipe = + "[" + " {$_internalAllCollectionStats: {match: {$and: [{ns: {$gt: 0}}, {ns: {$not: {$eq: " + "5}}}]}}}," + " {$match: {a: {$eq: 10}}}" + "]"; + std::string serializedPipe = + "[" + " {$_internalAllCollectionStats: {}}," + " {$match: {$and: [{ns: {$gt: 0}}, {ns: {$not: {$eq: 5}}}]}}," + " {$match: {a: {$eq: 10}}}" + "]"; + assertPipelineOptimizesAndSerializesTo( + inputPipe, outputPipe, serializedPipe, kAdminCollectionlessNss); +} + +TEST(PipelineOptimizationTest, internalAllCollectionStatsDoesNotAbsorbMatchNotOnNs) { + std::string inputPipe = + "[" + " {$_internalAllCollectionStats: {}}," + " {$match: {a: 10}}" + "]"; + std::string outputPipe = + "[" + " {$_internalAllCollectionStats: {}}," + " {$match: {a: {$eq: 10}}}" + "]"; + std::string serializedPipe = + "[" + " {$_internalAllCollectionStats: {}}," + " {$match: {a: 10}}" + "]"; + assertPipelineOptimizesAndSerializesTo( + inputPipe, outputPipe, serializedPipe, kAdminCollectionlessNss); +} + TEST(PipelineOptimizationTest, ProjectGetsPushedIntoBothChildrenOfUnion) { assertPipelineOptimizesTo( "[" |