summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordi Serra Torrens <jordi.serra-torrens@mongodb.com>2022-09-15 13:23:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 14:32:14 +0000
commit36420773c0fa0b76c6d756cec0cc6349e321e8c2 (patch)
tree2d9fad103e8412c4c6cf793669ffa57909cf4eff
parent9b5cbbbc2d3cae926689e4e632daa73062bb1432 (diff)
downloadmongo-36420773c0fa0b76c6d756cec0cc6349e321e8c2.tar.gz
SERVER-69069 Added 'doOptimizeAt' method for $_internalAllCollectionStats
-rw-r--r--jstests/sharding/sharded_data_distribution.js51
-rw-r--r--src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp68
-rw-r--r--src/mongo/db/pipeline/document_source_internal_all_collection_stats.h12
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp71
4 files changed, 199 insertions, 3 deletions
diff --git a/jstests/sharding/sharded_data_distribution.js b/jstests/sharding/sharded_data_distribution.js
index a4d24d0c702..30a06a634f1 100644
--- a/jstests/sharding/sharded_data_distribution.js
+++ b/jstests/sharding/sharded_data_distribution.js
@@ -117,6 +117,57 @@ const response = assert.commandFailedWithCode(
assert.neq(-1, response.errmsg.indexOf("$shardedDataDistribution"), response.errmsg);
assert.neq(-1, response.errmsg.indexOf("admin database"), response.errmsg);
+// Test $shardedDataDistribution followed by a $match stage on the 'ns'.
+assert.eq(1, adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: ns1}}]).itcount());
+assert.eq(2,
+ adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: {$in: [ns1, ns2]}}}])
+ .itcount());
+assert.eq(0,
+ adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: 'test.IDoNotExist'}}])
+ .itcount());
+
+// Test $shardedDataDistribution followed by a $match stage on the 'ns' and something else.
+assert.eq(
+ 1,
+ adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: ns1, shards: {$size: 2}}}])
+ .itcount());
+assert.eq(
+ 0,
+ adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {ns: ns1, shards: {$size: 50}}}])
+ .itcount());
+
+// Test $shardedDataDistribution followed by a $match stage on the 'ns' and other match stages.
+assert.eq(
+ 1,
+ adminDb
+ .aggregate(
+ [{$shardedDataDistribution: {}}, {$match: {ns: ns1}}, {$match: {shards: {$size: 2}}}])
+ .itcount());
+assert.eq(
+ 0,
+ adminDb
+ .aggregate(
+ [{$shardedDataDistribution: {}}, {$match: {ns: ns1}}, {$match: {shards: {$size: 50}}}])
+ .itcount());
+assert.eq(1,
+ adminDb
+ .aggregate([
+ {$shardedDataDistribution: {}},
+ {$match: {ns: /^test/}},
+ {$match: {shards: {$size: 2}}},
+ {$match: {ns: /foo$/}},
+ ])
+ .itcount());
+
+// Test $shardedDataDistribution followed by a $match stage unrelated to 'ns'.
+assert.eq(
+ 0,
+ adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {shards: {$size: 50}}}]).itcount());
+
+assert.neq(
+ 0,
+ adminDb.aggregate([{$shardedDataDistribution: {}}, {$match: {shards: {$size: 2}}}]).itcount());
+
st.stop();
// Test that verifies the behavior in unsharded deployments
diff --git a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp
index 3e22cc826e9..f37206dbafe 100644
--- a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.cpp
@@ -29,7 +29,6 @@
#include "mongo/db/pipeline/document_source_internal_all_collection_stats.h"
-
namespace mongo {
using boost::intrusive_ptr;
@@ -55,6 +54,13 @@ DocumentSource::GetNextResult DocumentSourceInternalAllCollectionStats::doGetNex
NamespaceString nss(obj["ns"].String());
_catalogDocs->pop_front();
+
+ // Avoid computing stats for collections that do not match the absorbed filter on the 'ns'
+ // field.
+ if (_absorbedMatch && !_absorbedMatch->getMatchExpression()->matchesBSON(std::move(obj))) {
+ continue;
+ }
+
try {
return {Document{DocumentSourceCollStats::makeStatsForNs(
pExpCtx, nss, _internalAllCollectionStatsSpec.getStats().get())}};
@@ -67,6 +73,66 @@ DocumentSource::GetNextResult DocumentSourceInternalAllCollectionStats::doGetNex
return GetNextResult::makeEOF();
}
+Pipeline::SourceContainer::iterator DocumentSourceInternalAllCollectionStats::doOptimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ if (std::next(itr) == container->end()) {
+ return container->end();
+ }
+
+ // Attempt to internalize any predicates of a $match upon the "ns" field.
+ auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
+
+ if (!nextMatch) {
+ return std::next(itr);
+ }
+
+ auto splitMatch = std::move(*nextMatch).splitSourceBy({"ns"}, {});
+ invariant(splitMatch.first || splitMatch.second);
+
+ // Remove the original $match.
+ container->erase(std::next(itr));
+
+ // Absorb the part of $match that is dependant on 'ns'
+ if (splitMatch.second) {
+ if (!_absorbedMatch) {
+ _absorbedMatch = std::move(splitMatch.second);
+ } else {
+ // We have already absorbed a $match. We need to join it with splitMatch.second.
+ _absorbedMatch->joinMatchWith(std::move(splitMatch.second));
+ }
+ }
+
+ // splitMatch.first is independent of 'ns'. Put it back on the pipeline.
+ if (splitMatch.first) {
+ container->insert(std::next(itr), std::move(splitMatch.first));
+ return std::next(itr);
+ } else {
+ // There may be further optimization between this stage and the new neighbor, so we return
+ // an iterator pointing to ourself.
+ return itr;
+ }
+}
+
+void DocumentSourceInternalAllCollectionStats::serializeToArray(
+ std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
+ if (explain) {
+ BSONObjBuilder bob;
+ _internalAllCollectionStatsSpec.serialize(&bob);
+ if (_absorbedMatch) {
+ bob.append("match", _absorbedMatch->getQuery());
+ }
+ auto doc = Document{{getSourceName(), bob.obj()}};
+ array.push_back(Value(doc));
+ } else {
+ array.push_back(serialize(explain));
+ if (_absorbedMatch) {
+ _absorbedMatch->serializeToArray(array);
+ }
+ }
+}
+
intrusive_ptr<DocumentSource> DocumentSourceInternalAllCollectionStats::createFromBsonInternal(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(6789103,
diff --git a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h
index f3c89bbf93a..491b6354c3f 100644
--- a/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h
+++ b/src/mongo/db/pipeline/document_source_internal_all_collection_stats.h
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_coll_stats.h"
#include "mongo/db/pipeline/document_source_internal_all_collection_stats_gen.h"
+#include "mongo/db/pipeline/document_source_match.h"
namespace mongo {
@@ -106,6 +107,13 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBsonInternal(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+
+ void serializeToArray(
+ std::vector<Value>& array,
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
private:
GetNextResult doGetNext() final;
@@ -113,5 +121,9 @@ private:
// options.
DocumentSourceInternalAllCollectionStatsSpec _internalAllCollectionStatsSpec;
boost::optional<std::deque<BSONObj>> _catalogDocs;
+
+ // A $match stage can be absorbed in order to avoid unnecessarily computing the stats for
+ // collections that do not match that predicate.
+ boost::intrusive_ptr<DocumentSourceMatch> _absorbedMatch;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 8e357b6caa7..62c89ef192f 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -75,6 +75,7 @@ using std::string;
using std::vector;
const NamespaceString kTestNss = NamespaceString("a.collection");
+const NamespaceString kAdminCollectionlessNss = NamespaceString("admin.$cmd.aggregate");
constexpr size_t getChangeStreamStageSize() {
return 6;
@@ -114,7 +115,8 @@ class StubExplainInterface : public StubMongoProcessInterface {
};
void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson,
std::string outputPipeJson,
- std::string serializedPipeJson) {
+ std::string serializedPipeJson,
+ NamespaceString aggNss = kTestNss) {
QueryTestServiceContext testServiceContext;
auto opCtx = testServiceContext.makeOperationContext();
@@ -128,7 +130,7 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson,
ASSERT_EQUALS(stageElem.type(), BSONType::Object);
rawPipeline.push_back(stageElem.embeddedObject());
}
- AggregateCommandRequest request(kTestNss, rawPipeline);
+ AggregateCommandRequest request(aggNss, rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx =
new ExpressionContextForTest(opCtx.get(), request);
ctx->mongoProcessInterface = std::make_shared<StubExplainInterface>();
@@ -3116,6 +3118,71 @@ TEST(PipelineOptimizationTest, MatchGetsPushedIntoBothChildrenOfUnion) {
"]");
}
+TEST(PipelineOptimizationTest, internalAllCollectionStatsAbsorbsMatchOnNs) {
+ std::string inputPipe =
+ "["
+ " {$_internalAllCollectionStats: {}},"
+ " {$match: {ns: 'test.foo', a: 10}}"
+ "]";
+ std::string outputPipe =
+ "["
+ " {$_internalAllCollectionStats: {match: {ns: {$eq: 'test.foo'}}}},"
+ " {$match: {a: {$eq: 10}}}"
+ "]";
+ std::string serializedPipe =
+ "["
+ " {$_internalAllCollectionStats: {}},"
+ " {$match: {ns: {$eq: 'test.foo'}}},"
+ " {$match: {a: {$eq: 10}}}"
+ "]";
+ assertPipelineOptimizesAndSerializesTo(
+ inputPipe, outputPipe, serializedPipe, kAdminCollectionlessNss);
+}
+
+TEST(PipelineOptimizationTest, internalAllCollectionStatsAbsorbsSeveralMatchesOnNs) {
+ std::string inputPipe =
+ "["
+ " {$_internalAllCollectionStats: {}},"
+ " {$match: {ns: {$gt: 0}}},"
+ " {$match: {a: 10}},"
+ " {$match: {ns: {$ne: 5}}}"
+ "]";
+ std::string outputPipe =
+ "["
+ " {$_internalAllCollectionStats: {match: {$and: [{ns: {$gt: 0}}, {ns: {$not: {$eq: "
+ "5}}}]}}},"
+ " {$match: {a: {$eq: 10}}}"
+ "]";
+ std::string serializedPipe =
+ "["
+ " {$_internalAllCollectionStats: {}},"
+ " {$match: {$and: [{ns: {$gt: 0}}, {ns: {$not: {$eq: 5}}}]}},"
+ " {$match: {a: {$eq: 10}}}"
+ "]";
+ assertPipelineOptimizesAndSerializesTo(
+ inputPipe, outputPipe, serializedPipe, kAdminCollectionlessNss);
+}
+
+TEST(PipelineOptimizationTest, internalAllCollectionStatsDoesNotAbsorbMatchNotOnNs) {
+ std::string inputPipe =
+ "["
+ " {$_internalAllCollectionStats: {}},"
+ " {$match: {a: 10}}"
+ "]";
+ std::string outputPipe =
+ "["
+ " {$_internalAllCollectionStats: {}},"
+ " {$match: {a: {$eq: 10}}}"
+ "]";
+ std::string serializedPipe =
+ "["
+ " {$_internalAllCollectionStats: {}},"
+ " {$match: {a: 10}}"
+ "]";
+ assertPipelineOptimizesAndSerializesTo(
+ inputPipe, outputPipe, serializedPipe, kAdminCollectionlessNss);
+}
+
TEST(PipelineOptimizationTest, ProjectGetsPushedIntoBothChildrenOfUnion) {
assertPipelineOptimizesTo(
"["