author     Charlie Swanson <charlie.swanson@mongodb.com>  2020-01-14 12:37:09 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-02-26 00:41:40 +0000
commit     27363455aaba930778a5feff02b0b320d4b850af (patch)
tree       7436cbb7be03dc9c98d68349c44977b4a71f066f /src/mongo/db/pipeline
parent     183e8c5c04d77ea840468ba8723970fa67258376 (diff)
download   mongo-27363455aaba930778a5feff02b0b320d4b850af.tar.gz
SERVER-45538 Add shard version retry logic for $unionWith sub-pipeline
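
The fix wraps shard targeting in the new sharded_agg_helpers::shardVersionRetry helper, so that a stale shard version raised while dispatching a $unionWith sub-pipeline triggers a catalog cache refresh and a re-dispatch instead of failing the whole aggregation. A minimal sketch of the calling pattern, mirroring the usage exercised in the new dispatch_shard_pipeline_test.cpp below ('serializedCommand', 'hasChangeStream', 'pipeline', and 'nss' are assumed to be in scope):

    auto results = sharded_agg_helpers::shardVersionRetry(
        opCtx,
        Grid::get(opCtx)->catalogCache(),
        nss,
        "dispatch shard pipeline"_sd,
        [&] {
            // dispatchShardPipeline() consumes the pipeline it is given, so each
            // retry attempt must receive its own copy via the new Pipeline::clone().
            return sharded_agg_helpers::dispatchShardPipeline(
                serializedCommand, hasChangeStream, pipeline->clone());
        });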
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--  src/mongo/db/pipeline/SConscript  8
-rw-r--r--  src/mongo/db/pipeline/accumulator_js_test.cpp  3
-rw-r--r--  src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp  248
-rw-r--r--  src/mongo/db/pipeline/document_source_facet.cpp  4
-rw-r--r--  src/mongo/db/pipeline/document_source_facet_test.cpp  39
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors_test.cpp  16
-rw-r--r--  src/mongo/db/pipeline/document_source_union_with.cpp  28
-rw-r--r--  src/mongo/db/pipeline/document_source_union_with.h  18
-rw-r--r--  src/mongo/db/pipeline/document_source_union_with_test.cpp  17
-rw-r--r--  src/mongo/db/pipeline/expression_context_for_test.h  15
-rw-r--r--  src/mongo/db/pipeline/expression_javascript_test.cpp  3
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp  18
-rw-r--r--  src/mongo/db/pipeline/pipeline.h  2
-rw-r--r--  src/mongo/db/pipeline/process_interface/SConscript  2
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp  2
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h  2
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_process_interface.h  1
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongo_process_interface.h  6
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp  9
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp  206
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.h  18
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp  2
-rw-r--r--  src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp  12
-rw-r--r--  src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h  14
-rw-r--r--  src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp  26
-rw-r--r--  src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h  16
-rw-r--r--  src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp  2
-rw-r--r--  src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h  2
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp  175
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.h  91
-rw-r--r--  src/mongo/db/pipeline/sharded_union_test.cpp  305
31 files changed, 1027 insertions, 283 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index ecd9b51397c..4b8099b5734 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -341,6 +341,7 @@ env.CppUnitTest(
'accumulator_test.cpp',
'aggregation_request_test.cpp',
'dependencies_test.cpp',
+ 'dispatch_shard_pipeline_test.cpp',
'document_path_support_test.cpp',
'document_source_add_fields_test.cpp',
'document_source_bucket_auto_test.cpp',
@@ -399,11 +400,10 @@ env.CppUnitTest(
'resume_token_test.cpp',
'semantic_analysis_test.cpp',
'sequential_document_cache_test.cpp',
+ 'sharded_union_test.cpp',
'tee_buffer_test.cpp',
],
LIBDEPS=[
- "$BUILD_DIR/mongo/db/service_context_d",
- "$BUILD_DIR/mongo/db/service_context_d_test_fixture",
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/exec/document_value/document_value',
@@ -413,8 +413,11 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/repl/oplog_entry',
'$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/db/service_context_d',
+ '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/s/catalog_cache_test_fixture',
'$BUILD_DIR/mongo/s/is_mongos',
'$BUILD_DIR/mongo/s/query/router_exec_stage',
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
@@ -429,6 +432,7 @@ env.CppUnitTest(
'pipeline',
'process_interface/mongod_process_interfaces',
'process_interface/mongos_process_interface',
+ 'process_interface/shardsvr_process_interface',
'sharded_agg_helpers',
]
)
diff --git a/src/mongo/db/pipeline/accumulator_js_test.cpp b/src/mongo/db/pipeline/accumulator_js_test.cpp
index 8497e1ab9c8..f2c471b5536 100644
--- a/src/mongo/db/pipeline/accumulator_js_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_js_test.cpp
@@ -46,8 +46,7 @@ namespace {
class MapReduceFixture : public ServiceContextMongoDTest {
protected:
MapReduceFixture() : _expCtx((new ExpressionContextForTest())) {
- _expCtx->mongoProcessInterface =
- std::make_shared<StandaloneProcessInterface>(_expCtx->opCtx);
+ _expCtx->mongoProcessInterface = std::make_shared<StandaloneProcessInterface>(nullptr);
}
boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() {
diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
new file mode 100644
index 00000000000..b141c3b72a9
--- /dev/null
+++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
@@ -0,0 +1,248 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/s/query/sharded_agg_test_fixture.h"
+
+namespace mongo {
+namespace {
+
+// Use this new name to register these tests under their own unit test suite.
+using DispatchShardPipelineTest = ShardedAggTestFixture;
+
+TEST_F(DispatchShardPipelineTest, DoesNotSplitPipelineIfTargetingOneShard) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ auto shards = setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+
+ auto stages = std::vector{
+ fromjson("{$match: {_id: {$gte: 0}}}"),
+ fromjson("{$sort: {score: -1}}"),
+ fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"),
+ };
+ auto pipeline = Pipeline::create(
+ {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
+ const Document serializedCommand =
+ AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ const bool hasChangeStream = false;
+
+ auto future = launchAsync([&] {
+ auto results = sharded_agg_helpers::dispatchShardPipeline(
+ serializedCommand, hasChangeStream, std::move(pipeline));
+ ASSERT_EQ(results.remoteCursors.size(), 1UL);
+ ASSERT(!results.splitPipeline);
+ });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(request.target, HostAndPort(shards[1].getHost()));
+ return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ future.default_timed_get();
+}
+
+TEST_F(DispatchShardPipelineTest, DoesSplitPipelineIfMatchSpansTwoShards) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+ auto stages = std::vector{
+ fromjson("{$match: {_id: {$gte: -10}}}"),
+ fromjson("{$sort: {score: -1}}"),
+ fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"),
+ };
+ auto pipeline = Pipeline::create(
+ {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
+ const Document serializedCommand =
+ AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ const bool hasChangeStream = false;
+
+ auto future = launchAsync([&] {
+ auto results = sharded_agg_helpers::dispatchShardPipeline(
+ serializedCommand, hasChangeStream, std::move(pipeline));
+ ASSERT_EQ(results.remoteCursors.size(), 2UL);
+ ASSERT(bool(results.splitPipeline));
+ });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ future.default_timed_get();
+}
+
+TEST_F(DispatchShardPipelineTest, DispatchShardPipelineRetriesOnNetworkError) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+ auto stages = std::vector{
+ fromjson("{$match: {_id: {$gte: -10}}}"),
+ fromjson("{$sort: {score: -1}}"),
+ fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"),
+ };
+ auto pipeline = Pipeline::create(
+ {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
+ const Document serializedCommand =
+ AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ const bool hasChangeStream = false;
+ auto future = launchAsync([&] {
+ // Shouldn't throw.
+ auto results = sharded_agg_helpers::dispatchShardPipeline(
+ serializedCommand, hasChangeStream, std::move(pipeline));
+ ASSERT_EQ(results.remoteCursors.size(), 2UL);
+ ASSERT(bool(results.splitPipeline));
+ });
+
+ // Mock out responses, one success, one error.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+ HostAndPort unreachableShard;
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ unreachableShard = request.target;
+ return Status{ErrorCodes::HostUnreachable, "Mock error: Couldn't find host"};
+ });
+
+ // That error should be retried, but only the one on that shard.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ // Test that it's retrying to the shard we failed earlier.
+ ASSERT_EQ(request.target, unreachableShard);
+ return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+ future.default_timed_get();
+}
+
+// Test that this helper is not responsible for retrying StaleConfig errors. This should happen at a
+// higher level.
+TEST_F(DispatchShardPipelineTest, DispatchShardPipelineDoesNotRetryOnStaleConfigError) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+ auto stages = std::vector{
+ fromjson("{$match: {_id: {$gte: 0}}}"),
+ fromjson("{$sort: {score: -1}}"),
+ fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"),
+ };
+ auto pipeline = Pipeline::create(
+ {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
+ const Document serializedCommand =
+ AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ const bool hasChangeStream = false;
+ auto future = launchAsync([&] {
+ ASSERT_THROWS_CODE(sharded_agg_helpers::dispatchShardPipeline(
+ serializedCommand, hasChangeStream, std::move(pipeline)),
+ AssertionException,
+ ErrorCodes::StaleShardVersion);
+ });
+
+ // Mock out an error response.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ });
+ future.default_timed_get();
+}
+
+TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+ auto stages = std::vector{
+ fromjson("{$match: {_id: {$gte: 0}}}"),
+ fromjson("{$sort: {score: -1}}"),
+ fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"),
+ };
+ auto pipeline = Pipeline::create(
+ {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx());
+ const Document serializedCommand =
+ AggregationRequest(expCtx()->ns, stages).serializeToCommandObj();
+ const bool hasChangeStream = false;
+ auto future = launchAsync([&] {
+ // Shouldn't throw.
+ auto results = sharded_agg_helpers::shardVersionRetry(
+ operationContext(),
+ Grid::get(getServiceContext())->catalogCache(),
+ kTestAggregateNss,
+ "dispatch shard pipeline"_sd,
+ [&]() {
+ return sharded_agg_helpers::dispatchShardPipeline(
+ serializedCommand, hasChangeStream, pipeline->clone());
+ });
+ ASSERT_EQ(results.remoteCursors.size(), 1UL);
+ ASSERT(!bool(results.splitPipeline));
+ });
+
+ // Mock out one error response, then expect a refresh of the sharding catalog for that
+ // namespace, then mock out a successful response.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ });
+
+ // Mock the expected config server queries.
+ const OID epoch = OID::gen();
+ const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
+ expectGetCollection(kTestAggregateNss, epoch, shardKeyPattern);
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
+ ChunkVersion version(1, 0, epoch);
+
+ ChunkType chunk1(kTestAggregateNss,
+ {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
+ version,
+ {"0"});
+ chunk1.setName(OID::gen());
+ version.incMinor();
+
+ ChunkType chunk2(kTestAggregateNss,
+ {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()},
+ version,
+ {"1"});
+ chunk2.setName(OID::gen());
+ version.incMinor();
+
+ return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
+ }());
+
+ // After the catalog cache refresh, the dispatch should be retried and now succeed.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ future.default_timed_get();
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 1152e92f5f4..3aa5a322a72 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -293,10 +293,6 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
auto pipeline = Pipeline::parse(rawFacet.second, expCtx, [](const Pipeline& pipeline) {
auto sources = pipeline.getSources();
- uassert(ErrorCodes::BadValue,
- "sub-pipeline in $facet stage cannot be empty",
- !sources.empty());
-
std::for_each(sources.begin(), sources.end(), [](auto& stage) {
auto stageConstraints = stage->constraints();
uassert(40600,
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 6316452c22b..37aac48d500 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -112,17 +112,6 @@ TEST_F(DocumentSourceFacetTest, ShouldRejectNonArrayFacets) {
AssertionException);
}
-TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyPipelines) {
- auto ctx = getExpCtx();
- auto spec = BSON("$facet" << BSON("a" << BSONArray()));
- ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx),
- AssertionException);
-
- spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 4)) << "b" << BSONArray()));
- ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx),
- AssertionException);
-}
-
TEST_F(DocumentSourceFacetTest, ShouldSucceedWhenNamespaceIsCollectionless) {
auto ctx = getExpCtx();
auto spec = fromjson("{$facet: {a: [{$match: {}}]}}");
@@ -372,6 +361,34 @@ TEST_F(DocumentSourceFacetTest, MultipleFacetsShouldSeeTheSameDocuments) {
ASSERT(facetStage->getNext().isEOF());
}
+TEST_F(DocumentSourceFacetTest, ShouldAcceptEmptyPipelines) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet" << BSON("a" << BSONArray()));
+
+ deque<DocumentSource::GetNextResult> inputs = {
+ Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}};
+ auto mock = DocumentSourceMock::createForTest(inputs);
+
+ auto facetStage = DocumentSourceFacet::createFromBson(spec.firstElement(), ctx);
+ facetStage->setSource(mock.get());
+
+ auto output = facetStage->getNext();
+
+ // The output fields are in no guaranteed order.
+ vector<Value> expectedOutputs;
+ for (auto&& input : inputs) {
+ expectedOutputs.emplace_back(input.releaseDocument());
+ }
+ ASSERT(output.isAdvanced());
+ ASSERT_EQ(output.getDocument().computeSize(), 1ULL);
+ ASSERT_VALUE_EQ(output.getDocument()["a"], Value(expectedOutputs));
+
+ // Should be exhausted now.
+ ASSERT(facetStage->getNext().isEOF());
+ ASSERT(facetStage->getNext().isEOF());
+ ASSERT(facetStage->getNext().isEOF());
+}
+
TEST_F(DocumentSourceFacetTest,
ShouldCorrectlyHandleSubPipelinesYieldingDifferentNumbersOfResults) {
auto ctx = getExpCtx();
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
index f5763d42987..e5f51c1883d 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
@@ -80,6 +80,7 @@ public:
DocumentSourceMergeCursorsTest() {
TimeZoneDatabase::set(getServiceContext(), std::make_unique<TimeZoneDatabase>());
_expCtx = new ExpressionContext(operationContext(), nullptr, kTestNss);
+ _expCtx->mongoProcessInterface = std::make_shared<StubMongoProcessInterface>(executor());
}
void setUp() override {
@@ -206,8 +207,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) {
kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {})));
armParams.setRemotes(std::move(cursors));
auto pipeline = Pipeline::create({}, expCtx);
- auto mergeCursorsStage =
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx);
+ auto mergeCursorsStage = DocumentSourceMergeCursors::create(expCtx, std::move(armParams));
ASSERT_TRUE(mergeCursorsStage->getNext().isEOF());
}
@@ -230,8 +230,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
armParams.setRemotes(std::move(cursors));
auto pipeline = Pipeline::create({}, expCtx);
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ pipeline->addInitialSource(DocumentSourceMergeCursors::create(expCtx, std::move(armParams)));
// Iterate the $mergeCursors stage asynchronously on a different thread, since it will block
// waiting for network responses, which we will manually schedule below.
@@ -279,8 +278,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfTheyAreNotOwned) {
makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
armParams.setRemotes(std::move(cursors));
auto pipeline = Pipeline::create({}, expCtx);
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ pipeline->addInitialSource(DocumentSourceMergeCursors::create(expCtx, std::move(armParams)));
auto mergeCursors =
checked_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get());
@@ -301,8 +299,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) {
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
armParams.setRemotes(std::move(cursors));
auto pipeline = Pipeline::create({}, expCtx);
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ pipeline->addInitialSource(DocumentSourceMergeCursors::create(expCtx, std::move(armParams)));
// Iterate the pipeline asynchronously on a different thread, since it will block waiting for
// network responses, which we will manually schedule below.
@@ -347,8 +344,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) {
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
armParams.setRemotes(std::move(cursors));
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ pipeline->addInitialSource(DocumentSourceMergeCursors::create(expCtx, std::move(armParams)));
// After optimization we should only have a $mergeCursors stage.
pipeline->optimizePipeline();
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index ad9b7acc9d0..43a6e22403d 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -157,27 +157,37 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceUnionWith::createFromBson(
}
DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() {
- if (!_sourceExhausted) {
+ if (!_pipeline) {
+ // We must have already been disposed, so we're finished.
+ return GetNextResult::makeEOF();
+ }
+
+ if (_executionState == ExecutionProgress::kIteratingSource) {
auto nextInput = pSource->getNext();
if (!nextInput.isEOF()) {
return nextInput;
}
- _sourceExhausted = true;
+ _executionState = ExecutionProgress::kStartingSubPipeline;
// All documents from the base collection have been returned, switch to iterating the sub-
// pipeline by falling through below.
}
- if (!_cursorAttached) {
- auto ctx = _pipeline->getContext();
- _pipeline =
- pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(ctx, _pipeline.release());
- _cursorAttached = true;
- LOGV2_DEBUG(23869, 3, "$unionWith attached cursor to pipeline");
+ if (_executionState == ExecutionProgress::kStartingSubPipeline) {
+ LOGV2_DEBUG(23869,
+ 3,
+ "$unionWith attaching cursor to pipeline {pipeline}",
+ "pipeline"_attr = Value(_pipeline->serialize()));
+ auto expCtxCopy = _pipeline->getContext();
+ _pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
+ expCtxCopy, _pipeline.release());
+ _executionState = ExecutionProgress::kIteratingSubPipeline;
}
- if (auto res = _pipeline->getNext())
+ auto res = _pipeline->getNext();
+ if (res)
return std::move(*res);
+ _executionState = ExecutionProgress::kFinished;
return GetNextResult::makeEOF();
}
diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h
index ade72532935..2dd26f4aa55 100644
--- a/src/mongo/db/pipeline/document_source_union_with.h
+++ b/src/mongo/db/pipeline/document_source_union_with.h
@@ -136,6 +136,22 @@ protected:
void doDispose() final;
private:
+ enum ExecutionProgress {
+ // We haven't yet iterated 'pSource' to completion.
+ kIteratingSource,
+
+ // We finished iterating 'pSource', but haven't started on the sub pipeline and need to do
+ // some setup first.
+ kStartingSubPipeline,
+
+ // We finished iterating 'pSource' and are now iterating '_pipeline', but haven't finished
+ // yet.
+ kIteratingSubPipeline,
+
+ // There are no more results.
+ kFinished
+ };
+
/**
* Should not be called; use serializeToArray instead.
*/
@@ -144,9 +160,9 @@ private:
}
std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
- bool _sourceExhausted = false;
bool _cursorAttached = false;
bool _usedDisk = false;
+ ExecutionProgress _executionState = ExecutionProgress::kIteratingSource;
};
} // namespace mongo
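
The two ad hoc booleans are folded into one explicit state machine, which also lets doGetNext() keep returning EOF after dispose() has released '_pipeline'. A comment-only sketch of the one-way progression implemented in doGetNext() above:

    // kIteratingSource       -> drain 'pSource' until it reports EOF
    // kStartingSubPipeline   -> attach a cursor source to '_pipeline' (may target shards)
    // kIteratingSubPipeline  -> drain '_pipeline' until it is exhausted
    // kFinished              -> every subsequent getNext() returns EOF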
diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp
index 61412ad421c..29d38f916bf 100644
--- a/src/mongo/db/pipeline/document_source_union_with_test.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp
@@ -324,6 +324,23 @@ TEST_F(DocumentSourceUnionWithTest, PropagatePauses) {
ASSERT_TRUE(unionWithTwo.getNext().isEOF());
}
+TEST_F(DocumentSourceUnionWithTest, ReturnEOFAfterBeingDisposed) {
+ const auto mockInput = DocumentSourceMock::createForTest({Document(), Document()});
+ const auto mockUnionInput = std::deque<DocumentSource::GetNextResult>{};
+ const auto mockCtx = getExpCtx()->copyWith({});
+ mockCtx->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockUnionInput);
+ auto unionWith = DocumentSourceUnionWith(
+ mockCtx, Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()));
+ unionWith.setSource(mockInput.get());
+
+ ASSERT_TRUE(unionWith.getNext().isAdvanced());
+
+ unionWith.dispose();
+ ASSERT_TRUE(unionWith.getNext().isEOF());
+ ASSERT_TRUE(unionWith.getNext().isEOF());
+ ASSERT_TRUE(unionWith.getNext().isEOF());
+}
+
TEST_F(DocumentSourceUnionWithTest, DependencyAnalysisReportsFullDoc) {
auto expCtx = getExpCtx();
const auto replaceRoot =
diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h
index 10d71ef185a..fcfcf7ee159 100644
--- a/src/mongo/db/pipeline/expression_context_for_test.h
+++ b/src/mongo/db/pipeline/expression_context_for_test.h
@@ -80,6 +80,21 @@ public:
: ExpressionContext(
opCtx, request, nullptr, std::make_shared<StubMongoProcessInterface>(), {}, {}) {}
+ ExpressionContextForTest(OperationContext* opCtx, NamespaceString nss)
+ : ExpressionContext(opCtx,
+ boost::none, // explain
+ false, // fromMongos,
+ false, // needsMerge,
+ false, // allowDiskUse,
+ false, // bypassDocumentValidation,
+ false, // isMapReduce
+ nss,
+ RuntimeConstants(Date_t::now(), Timestamp(1, 0)),
+ {}, // collator
+ std::make_shared<StubMongoProcessInterface>(),
+ {}, // resolvedNamespaces
+ {} // collUUID
+ ) {}
/**
* Sets the resolved definition for an involved namespace.
*/
diff --git a/src/mongo/db/pipeline/expression_javascript_test.cpp b/src/mongo/db/pipeline/expression_javascript_test.cpp
index 10203cc523d..3df1f10d436 100644
--- a/src/mongo/db/pipeline/expression_javascript_test.cpp
+++ b/src/mongo/db/pipeline/expression_javascript_test.cpp
@@ -47,8 +47,7 @@ class MapReduceFixture : public ServiceContextMongoDTest {
protected:
MapReduceFixture()
: _expCtx((new ExpressionContextForTest())), _vps(_expCtx->variablesParseState) {
- _expCtx->mongoProcessInterface =
- std::make_shared<StandaloneProcessInterface>(_expCtx->opCtx);
+ _expCtx->mongoProcessInterface = std::make_shared<StandaloneProcessInterface>(nullptr);
}
boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() {
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index eb3a4d97914..ce3e9e89fa4 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -148,6 +148,24 @@ Pipeline::~Pipeline() {
invariant(_disposed);
}
+std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::clone() const {
+ const auto& serialized = serialize();
+ std::vector<BSONObj> asBson;
+ asBson.reserve(serialized.size());
+ for (auto&& stage : serialized) {
+ invariant(stage.getType() == BSONType::Object);
+ asBson.push_back(stage.getDocument().toBson());
+ }
+ try {
+ return parse(asBson, getContext());
+ } catch (DBException& ex) {
+ ex.addContext(str::stream()
+ << "Failed to copy pipeline. Could not parse serialized version: "
+ << Value(serialized).toString());
+ throw;
+ }
+}
+
std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::parse(
const std::vector<BSONObj>& rawPipeline,
const intrusive_ptr<ExpressionContext>& expCtx,
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 5eb096fc7e2..93bcf7a4bbe 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -146,6 +146,8 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{});
+ std::unique_ptr<Pipeline, PipelineDeleter> clone() const;
+
const boost::intrusive_ptr<ExpressionContext>& getContext() const {
return pCtx;
}
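
Pipeline::clone() is a serialize-and-reparse round trip on the same ExpressionContext, which is why a parse failure of the serialized form is rethrown with added context rather than swallowed. A minimal usage sketch ('rawStages' and 'expCtx' are hypothetical):

    auto pipeline = Pipeline::parse(rawStages, expCtx);
    // clone() serializes the stages to BSON and re-parses them, yielding an
    // independently owned copy that a consumer such as dispatchShardPipeline()
    // can take over while the caller keeps the original for further retries.
    auto copy = pipeline->clone();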
diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript
index 7f7697bcf6b..6f455ff74c8 100644
--- a/src/mongo/db/pipeline/process_interface/SConscript
+++ b/src/mongo/db/pipeline/process_interface/SConscript
@@ -76,6 +76,7 @@ env.Library(
'mongod_process_interface_factory.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/s/sharding_router_api',
'mongod_process_interfaces',
'shardsvr_process_interface',
],
@@ -88,6 +89,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/pipeline/pipeline',
+ '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers',
'$BUILD_DIR/mongo/s/query/async_results_merger',
'$BUILD_DIR/mongo/s/query/cluster_query',
'common_process_interface',
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index 640a69bd3d7..565d64b1d4f 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -143,8 +143,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-CommonMongodProcessInterface::CommonMongodProcessInterface(OperationContext* opCtx) {}
-
std::unique_ptr<TransactionHistoryIteratorBase>
CommonMongodProcessInterface::createTransactionHistoryIterator(repl::OpTime time) const {
bool permitYield = true;
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
index 64fcde8a913..cc6b032f2a7 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
@@ -49,7 +49,7 @@ using write_ops::Update;
*/
class CommonMongodProcessInterface : public CommonProcessInterface {
public:
- CommonMongodProcessInterface(OperationContext* opCtx);
+ using CommonProcessInterface::CommonProcessInterface;
virtual ~CommonMongodProcessInterface() = default;
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h
index 2184e4b2ee6..ebe970987c8 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h
@@ -42,6 +42,7 @@ namespace mongo {
*/
class CommonProcessInterface : public MongoProcessInterface {
public:
+ using MongoProcessInterface::MongoProcessInterface;
virtual ~CommonProcessInterface() = default;
/**
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 3f0cfee2f37..25210681f9d 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -52,6 +52,7 @@
#include "mongo/db/resource_yielder.h"
#include "mongo/db/storage/backup_cursor_hooks.h"
#include "mongo/db/storage/backup_cursor_state.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/s/chunk_version.h"
namespace mongo {
@@ -116,6 +117,9 @@ public:
int64_t nModified{0};
};
+ MongoProcessInterface(std::shared_ptr<executor::TaskExecutor> executor)
+ : taskExecutor(std::move(executor)) {}
+
virtual ~MongoProcessInterface(){};
/**
@@ -420,6 +424,8 @@ public:
boost::optional<std::set<FieldPath>> fieldPaths,
boost::optional<ChunkVersion> targetCollectionVersion,
const NamespaceString& outputNs) const = 0;
+
+ std::shared_ptr<executor::TaskExecutor> taskExecutor;
};
} // namespace mongo
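
With the executor now stored on the MongoProcessInterface base class, each concrete interface receives its TaskExecutor at construction time, and nullptr is passed where no executor is needed. A sketch of the construction pattern, condensed from the factory change below:

    if (ShardingState::get(opCtx)->enabled()) {
        return std::make_shared<ShardServerProcessInterface>(
            Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
    }
    return std::make_shared<StandaloneProcessInterface>(nullptr);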
diff --git a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
index 744222380f9..80b330149ca 100644
--- a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
@@ -35,18 +35,21 @@
#include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h"
#include "mongo/db/pipeline/process_interface/standalone_process_interface.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/grid.h"
namespace mongo {
namespace {
std::shared_ptr<MongoProcessInterface> MongoProcessInterfaceCreateImpl(OperationContext* opCtx) {
if (ShardingState::get(opCtx)->enabled()) {
- return std::make_shared<ShardServerProcessInterface>(opCtx);
+ return std::make_shared<ShardServerProcessInterface>(
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
} else if (getTestCommandsEnabled()) {
if (auto executor = ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx))
- return std::make_shared<ReplicaSetNodeProcessInterface>(opCtx, executor);
+ return std::make_shared<ReplicaSetNodeProcessInterface>(std::move(executor));
}
- return std::make_shared<StandaloneProcessInterface>(opCtx);
+ return std::make_shared<StandaloneProcessInterface>(nullptr);
}
auto mongoProcessInterfaceCreateRegistration = MONGO_WEAK_FUNCTION_REGISTRATION(
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 906e6c3af50..09fc0fc00d0 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/process_interface/mongos_process_interface.h"
@@ -50,6 +52,7 @@
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/fail_point.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -100,14 +103,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- bool allowTargetingShards) {
- // On mongos we can't have local cursors.
- return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, ownedPipeline);
-}
-
boost::optional<Document> MongosProcessInterface::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
@@ -134,94 +129,79 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument(
cmdBuilder.append("allowSpeculativeMajorityRead", true);
}
- auto shardResults = std::vector<RemoteCursor>();
- auto findCmd = cmdBuilder.obj();
- for (size_t numRetries = 0; numRetries <= kMaxNumStaleVersionRetries; ++numRetries) {
- // Obtain the catalog cache. If we are retrying after a stale shard error, mark this
- // operation as needing to block on the next catalog cache refresh.
+ try {
+ auto findCmd = cmdBuilder.obj();
auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
- catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(expCtx->opCtx, numRetries);
-
- // Verify that the collection exists, with the correct UUID.
- auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx);
- if (swRoutingInfo == ErrorCodes::NamespaceNotFound) {
- return boost::none;
- }
- auto routingInfo = uassertStatusOK(std::move(swRoutingInfo));
-
- // Finalize the 'find' command object based on the routing table information.
- if (findCmdIsByUuid && routingInfo.cm()) {
- // Find by UUID and shard versioning do not work together (SERVER-31946). In the
- // sharded case we've already checked the UUID, so find by namespace is safe. In the
- // unlikely case that the collection has been deleted and a new collection with the same
- // name created through a different mongos, the shard version will be detected as stale,
- // as shard versions contain an 'epoch' field unique to the collection.
- findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement());
- findCmdIsByUuid = false;
- }
-
- try {
- // Build the versioned requests to be dispatched to the shards. Typically, only a single
- // shard will be targeted here; however, in certain cases where only the _id is present,
- // we may need to scatter-gather the query to all shards in order to find the document.
- auto requests = getVersionedRequestsForTargetedShards(
- expCtx->opCtx, nss, routingInfo, findCmd, filterObj, CollationSpec::kSimpleSpec);
-
- // Dispatch the requests. The 'establishCursors' method conveniently prepares the result
- // into a vector of cursor responses for us.
- shardResults = establishCursors(
- expCtx->opCtx,
- Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- ReadPreferenceSetting::get(expCtx->opCtx),
- std::move(requests),
- false);
- } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
- // If it's an unsharded collection which has been deleted and re-created, we may get a
- // NamespaceNotFound error when looking up by UUID.
- return boost::none;
- } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>&) {
- // If the database version is stale, refresh its entry in the catalog cache.
- catalogCache->onStaleDatabaseVersion(nss.db(), routingInfo.db().databaseVersion());
- continue; // Try again if allowed.
- } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& e) {
- // If the exception provides a shardId, add it to the set of shards requiring a refresh.
- // If the cache currently considers the collection to be unsharded, this will trigger an
- // epoch refresh. If no shard is provided, then the epoch is stale and we must refresh.
- auto staleInfo = e.extraInfo<StaleConfigInfo>();
- if (auto staleShardId = (staleInfo ? staleInfo->getShardId() : boost::none)) {
- catalogCache->onStaleShardVersion(std::move(routingInfo), *staleShardId);
- } else {
- catalogCache->onEpochChange(nss);
- }
- continue; // Try again if allowed.
- } catch (const ExceptionFor<ErrorCodes::ShardInvalidatedForTargeting>&) {
- continue;
+ auto shardResults = sharded_agg_helpers::shardVersionRetry(
+ expCtx->opCtx,
+ catalogCache,
+ expCtx->ns,
+ str::stream() << "Looking up document matching " << redact(filter.toBson()),
+ [&]() -> std::vector<RemoteCursor> {
+ // Verify that the collection exists, with the correct UUID.
+ auto routingInfo = uassertStatusOK(getCollectionRoutingInfo(foreignExpCtx));
+
+ // Finalize the 'find' command object based on the routing table information.
+ if (findCmdIsByUuid && routingInfo.cm()) {
+ // Find by UUID and shard versioning do not work together (SERVER-31946). In
+ // the sharded case we've already checked the UUID, so find by namespace is
+ // safe. In the unlikely case that the collection has been deleted and a new
+ // collection with the same name created through a different mongos, the shard
+ // version will be detected as stale, as shard versions contain an 'epoch' field
+ // unique to the collection.
+ findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement());
+ findCmdIsByUuid = false;
+ }
+
+ // Build the versioned requests to be dispatched to the shards. Typically, only a
+ // single shard will be targeted here; however, in certain cases where only the _id
+ // is present, we may need to scatter-gather the query to all shards in order to
+ // find the document.
+ auto requests = getVersionedRequestsForTargetedShards(expCtx->opCtx,
+ nss,
+ routingInfo,
+ findCmd,
+ filterObj,
+ CollationSpec::kSimpleSpec);
+
+ // Dispatch the requests. The 'establishCursors' method conveniently prepares the
+ // result into a vector of cursor responses for us.
+ return establishCursors(
+ expCtx->opCtx,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ ReadPreferenceSetting::get(expCtx->opCtx),
+ std::move(requests),
+ false);
+ });
+
+ // Iterate all shard results and build a single composite batch. We also enforce the
+ // requirement that only a single document should have been returned from across the
+ // cluster.
+ std::vector<BSONObj> finalBatch;
+ for (auto&& shardResult : shardResults) {
+ auto& shardCursor = shardResult.getCursorResponse();
+ finalBatch.insert(
+ finalBatch.end(), shardCursor.getBatch().begin(), shardCursor.getBatch().end());
+ // We should have at most 1 result, and the cursor should be exhausted.
+ uassert(ErrorCodes::ChangeStreamFatalError,
+ str::stream() << "Shard cursor was unexpectedly open after lookup: "
+ << shardResult.getHostAndPort()
+ << ", id: " << shardCursor.getCursorId(),
+ shardCursor.getCursorId() == 0);
+ uassert(ErrorCodes::ChangeStreamFatalError,
+ str::stream() << "found more than one document matching " << filter.toString()
+ << " [" << finalBatch.begin()->toString() << ", "
+ << std::next(finalBatch.begin())->toString() << "]",
+ finalBatch.size() <= 1u);
}
- break; // Success!
- }
- // Iterate all shard results and build a single composite batch. We also enforce the requirement
- // that only a single document should have been returned from across the cluster.
- std::vector<BSONObj> finalBatch;
- for (auto&& shardResult : shardResults) {
- auto& shardCursor = shardResult.getCursorResponse();
- finalBatch.insert(
- finalBatch.end(), shardCursor.getBatch().begin(), shardCursor.getBatch().end());
- // We should have at most 1 result, and the cursor should be exhausted.
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "Shard cursor was unexpectedly open after lookup: "
- << shardResult.getHostAndPort()
- << ", id: " << shardCursor.getCursorId(),
- shardCursor.getCursorId() == 0);
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "found more than one document matching " << filter.toString()
- << " [" << finalBatch.begin()->toString() << ", "
- << std::next(finalBatch.begin())->toString() << "]",
- finalBatch.size() <= 1u);
+ return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{});
+ } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
+ // If it's an unsharded collection which has been deleted and re-created, we may get a
+ // NamespaceNotFound error when looking up by UUID.
+ return boost::none;
}
-
- return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{});
}
BSONObj MongosProcessInterface::_reportCurrentOpForClient(
@@ -256,10 +236,11 @@ void MongosProcessInterface::_reportCurrentOpsForIdleSessions(OperationContext*
const bool authEnabled =
AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled();
- // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to
- // create a pattern that will match against all authenticated usernames for the current client.
- // If the user is listing ops for all users, we create an empty pattern; constructing an
- // instance of SessionKiller::Matcher with this empty pattern will return all sessions.
+ // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers
+ // to create a pattern that will match against all authenticated usernames for the current
+ // client. If the user is listing ops for all users, we create an empty pattern;
+ // constructing an instance of SessionKiller::Matcher with this empty pattern will return
+ // all sessions.
auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers
? makeSessionFilterForAuthenticatedUsers(opCtx)
: KillAllSessionsByPatternSet{{}});
@@ -334,24 +315,24 @@ MongosProcessInterface::ensureFieldsUniqueOrResolveDocumentKey(
"Cannot find index to verify that join fields will be unique",
fieldsHaveSupportingUniqueIndex(expCtx, outputNs, *fieldPaths));
- // If the user supplies the 'fields' array, we don't need to attach a ChunkVersion for the
- // shards since we are not at risk of 'guessing' the wrong shard key.
+ // If the user supplies the 'fields' array, we don't need to attach a ChunkVersion for
+ // the shards since we are not at risk of 'guessing' the wrong shard key.
return {*fieldPaths, boost::none};
}
// In case there are multiple shards which will perform this stage in parallel, we need to
- // figure out and attach the collection's shard version to ensure each shard is talking about
- // the same version of the collection. This mongos will coordinate that. We force a catalog
- // refresh to do so because there is no shard versioning protocol on this namespace and so we
- // otherwise could not be sure this node is (or will become) at all recent. We will also
- // figure out and attach the 'joinFields' to send to the shards.
-
- // There are edge cases when the collection could be dropped or re-created during or near the
- // time of the operation (for example, during aggregation). This is okay - we are mostly
+ // figure out and attach the collection's shard version to ensure each shard is talking
+ // about the same version of the collection. This mongos will coordinate that. We force a
+ // catalog refresh to do so because there is no shard versioning protocol on this namespace
+ // and so we otherwise could not be sure this node is (or will become) at all recent. We
+ // will also figure out and attach the 'joinFields' to send to the shards.
+
+ // There are edge cases when the collection could be dropped or re-created during or near
+ // the time of the operation (for example, during aggregation). This is okay - we are mostly
// paranoid that this mongos is very stale and want to prevent returning an error if the
- // collection was dropped a long time ago. Because of this, we are okay with piggy-backing off
- // another thread's request to refresh the cache, simply waiting for that request to return
- // instead of forcing another refresh.
+ // collection was dropped a long time ago. Because of this, we are okay with piggy-backing
+ // off another thread's request to refresh the cache, simply waiting for that request to
+ // return instead of forcing another refresh.
targetCollectionVersion = refreshAndGetCollectionVersion(expCtx, outputNs);
auto docKeyPaths = collectDocumentKeyFieldsActingAsRouter(expCtx->opCtx, outputNs);
@@ -360,4 +341,11 @@ MongosProcessInterface::ensureFieldsUniqueOrResolveDocumentKey(
targetCollectionVersion};
}
+std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* ownedPipeline,
+ bool allowTargetingShards) {
+ return sharded_agg_helpers::attachCursorToPipeline(expCtx, ownedPipeline, allowTargetingShards);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 252ee20d0a6..16bfc0dfe80 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -41,7 +41,7 @@ namespace mongo {
*/
class MongosProcessInterface : public CommonProcessInterface {
public:
- MongosProcessInterface() = default;
+ using CommonProcessInterface::CommonProcessInterface;
virtual ~MongosProcessInterface() = default;
@@ -148,11 +148,6 @@ public:
MONGO_UNREACHABLE;
}
- std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline,
- bool allowTargetingShards = true) final;
-
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
// It is not meaningful to perform a "local read" on mongos.
@@ -223,6 +218,17 @@ public:
boost::optional<ChunkVersion> targetCollectionVersion,
const NamespaceString& outputNs) const override;
+ /**
+ * If 'allowTargetingShards' is true, splits the pipeline and dispatches half to the shards,
+ * leaving the merging half executing in this process after attaching a $mergeCursors. Will
+ * retry on network errors and also on StaleConfig errors to avoid restarting the entire
+ * operation.
+ */
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline,
+ bool allowTargetingShards) final;
+
protected:
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp
index 63d69d93eab..e2d64f29eb5 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp
@@ -56,7 +56,7 @@ public:
}
auto makeProcessInterface() {
- return std::make_unique<MongosProcessInterfaceForTest>();
+ return std::make_unique<MongosProcessInterfaceForTest>(nullptr);
}
};
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
index d6fa54b353a..17a2c69a207 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -48,21 +48,21 @@ namespace mongo {
namespace {
const auto replicaSetNodeExecutor =
- ServiceContext::declareDecoration<std::unique_ptr<executor::TaskExecutor>>();
+ ServiceContext::declareDecoration<std::shared_ptr<executor::TaskExecutor>>();
} // namespace
-executor::TaskExecutor* ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(
+std::shared_ptr<executor::TaskExecutor> ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(
ServiceContext* service) {
- return replicaSetNodeExecutor(service).get();
+ return replicaSetNodeExecutor(service);
}
-executor::TaskExecutor* ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(
+std::shared_ptr<executor::TaskExecutor> ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(
OperationContext* opCtx) {
return getReplicaSetNodeExecutor(opCtx->getServiceContext());
}
void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor(
- ServiceContext* service, std::unique_ptr<executor::TaskExecutor> executor) {
+ ServiceContext* service, std::shared_ptr<executor::TaskExecutor> executor) {
replicaSetNodeExecutor(service) = std::move(executor);
}
@@ -196,7 +196,7 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary(
auto [promise, future] = makePromiseFuture<executor::TaskExecutor::RemoteCommandCallbackArgs>();
auto promisePtr = std::make_shared<Promise<executor::TaskExecutor::RemoteCommandCallbackArgs>>(
std::move(promise));
- auto scheduleResult = _executor->scheduleRemoteCommand(
+ auto scheduleResult = taskExecutor->scheduleRemoteCommand(
std::move(request), [promisePtr](const auto& args) { promisePtr->emplaceValue(args); });
if (!scheduleResult.isOK()) {
// Since the command failed to be scheduled, the callback above did not and will not run.
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
index 1d7c7b8539d..22af49f29a3 100644
--- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -44,15 +44,17 @@ class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterfa
public:
using NonShardServerProcessInterface::NonShardServerProcessInterface;
- static executor::TaskExecutor* getReplicaSetNodeExecutor(ServiceContext* service);
+ static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor(
+ ServiceContext* service);
- static executor::TaskExecutor* getReplicaSetNodeExecutor(OperationContext* opCtx);
+ static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor(
+ OperationContext* opCtx);
static void setReplicaSetNodeExecutor(ServiceContext* service,
- std::unique_ptr<executor::TaskExecutor> executor);
+ std::shared_ptr<executor::TaskExecutor> executor);
- ReplicaSetNodeProcessInterface(OperationContext* opCtx, executor::TaskExecutor* executor)
- : NonShardServerProcessInterface(opCtx), _executor(executor) {}
+ ReplicaSetNodeProcessInterface(std::shared_ptr<executor::TaskExecutor> executor)
+ : NonShardServerProcessInterface(std::move(executor)) {}
virtual ~ReplicaSetNodeProcessInterface() = default;
@@ -112,8 +114,6 @@ private:
* immediately stale upon return.
*/
bool _canWriteLocally(OperationContext* opCtx, const NamespaceString& ns) const;
-
- executor::TaskExecutor* _executor;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index d93a18bfb5f..80c72ffb326 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -47,7 +47,6 @@
#include "mongo/s/catalog_cache.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
-#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/write_ops/cluster_write.h"
namespace mongo {
@@ -145,23 +144,6 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd
return {{response.getN(), response.getNModified()}};
}
-unique_ptr<Pipeline, PipelineDeleter> ShardServerProcessInterface::attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- bool allowTargetingShards) {
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
- PipelineDeleter(expCtx->opCtx));
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
-
- if (!allowTargetingShards || expCtx->ns.db() == "local") {
- // If the db is local, this may be a change stream examining the oplog. We know the oplog
- // (and any other local collections) will not be sharded.
- return attachCursorSourceToPipelineForLocalRead(expCtx, pipeline.release());
- }
- return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, pipeline.release());
-}
-
std::unique_ptr<ShardFilterer> ShardServerProcessInterface::getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
auto collectionFilter =
@@ -315,4 +297,12 @@ void ShardServerProcessInterface::dropCollection(OperationContext* opCtx,
<< "write concern failed while running command " << cmdObj);
}
+std::unique_ptr<Pipeline, PipelineDeleter>
+ShardServerProcessInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* ownedPipeline,
+ bool allowTargetingShards) {
+ return sharded_agg_helpers::attachCursorToPipeline(expCtx, ownedPipeline, allowTargetingShards);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index 4c411cad35b..592d8043017 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -87,11 +87,6 @@ public:
bool multi,
boost::optional<OID> targetEpoch) final;
- std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline,
- bool allowTargetingShards = true) final;
-
std::unique_ptr<ShardFilterer> getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const override final;
@@ -110,6 +105,17 @@ public:
const NamespaceString& ns,
const std::vector<BSONObj>& indexSpecs) final;
void dropCollection(OperationContext* opCtx, const NamespaceString& collection) final;
+
+ /**
+     * If 'allowTargetingShards' is true, splits the pipeline and dispatches half to the shards,
+ * leaving the merging half executing in this process after attaching a $mergeCursors. Will
+ * retry on network errors and also on StaleConfig errors to avoid restarting the entire
+ * operation.
+ */
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline,
+ bool allowTargetingShards) final;
};
} // namespace mongo
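
This override is what a stage such as $unionWith reaches when it asks the process interface to
make a sub-pipeline executable on a shard server; the retries described in the comment happen
entirely inside the call. A hedged sketch of such a call site, assuming 'expCtx' and a parsed
'pipeline' (a std::unique_ptr<Pipeline, PipelineDeleter>) are in scope:

    // Ownership of the raw Pipeline* passes to the callee, which returns an
    // executable pipeline headed by a local cursor or a $mergeCursors stage.
    auto executable = expCtx->mongoProcessInterface->attachCursorSourceToPipeline(
        expCtx, pipeline.release(), true /* allowTargetingShards */);
    while (auto next = executable->getNext()) {
        // Consume merged results; StaleConfig and network retries have already
        // been absorbed inside attachCursorSourceToPipeline.
    }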
diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp b/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp
index 12fe460d674..82c36b40ca7 100644
--- a/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp
+++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp
@@ -59,7 +59,7 @@ public:
class ProcessInterfaceStandaloneTest : public AggregationContextFixture {
public:
auto makeProcessInterface() {
- return std::make_unique<MongoProcessInterfaceForTest>(getExpCtx()->opCtx);
+ return std::make_unique<MongoProcessInterfaceForTest>(nullptr);
}
};
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 2d2d798ab92..f6a19dd9851 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -44,6 +44,8 @@ namespace mongo {
*/
class StubMongoProcessInterface : public MongoProcessInterface {
public:
+ StubMongoProcessInterface() : MongoProcessInterface(nullptr) {}
+ using MongoProcessInterface::MongoProcessInterface;
virtual ~StubMongoProcessInterface() = default;
std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
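
With the base class now owning the executor, the stub keeps a default constructor passing a null
executor while also inheriting the executor-taking constructor, so unit tests can opt in to a mock
executor when they need one. A sketch of both options; 'mockExecutor' is assumed to be a
std::shared_ptr<executor::TaskExecutor> supplied by a test fixture:

    StubMongoProcessInterface stub;                   // taskExecutor == nullptr
    StubMongoProcessInterface netStub(mockExecutor);  // via the inherited constructor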
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index b68ad0edc34..aace58f534b 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -48,7 +48,6 @@
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/semantic_analysis.h"
-#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/cluster_commands_helpers.h"
@@ -110,7 +109,7 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi
aggReq.setBatchSize(0);
auto configCursor =
establishCursors(expCtx->opCtx,
- Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ expCtx->mongoProcessInterface->taskExecutor,
aggReq.getNamespaceString(),
ReadPreferenceSetting{ReadPreference::PrimaryPreferred},
{{configShard->getId(), aggReq.serializeToCommandObj().toBson()}},
@@ -165,6 +164,7 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
std::vector<RemoteCursor> establishShardCursors(
OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
const NamespaceString& nss,
bool hasChangeStream,
boost::optional<CachedCollectionRoutingInfo>& routingInfo,
@@ -216,7 +216,7 @@ std::vector<RemoteCursor> establishShardCursors(
}
return establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ std::move(executor),
nss,
readPref,
requests,
@@ -581,6 +581,72 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) {
}
}
+/**
+ * For a sharded collection, establishes remote cursors on each shard that may have results, and
+ * creates a DocumentSourceMergeCursors stage to merge the remote cursors. Returns a pipeline
+ * beginning with that DocumentSourceMergeCursors stage. Note that one of the 'remote' cursors might
+ * be this node itself.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
+ PipelineDeleter(expCtx->opCtx));
+
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+ // Generate the command object for the targeted shards.
+ std::vector<BSONObj> rawStages = [&pipeline]() {
+ auto serialization = pipeline->serialize();
+ std::vector<BSONObj> stages;
+ stages.reserve(serialization.size());
+
+ for (const auto& stageObj : serialization) {
+ invariant(stageObj.getType() == BSONType::Object);
+ stages.push_back(stageObj.getDocument().toBson());
+ }
+
+ return stages;
+ }();
+
+ AggregationRequest aggRequest(expCtx->ns, rawStages);
+
+ // The default value for 'allowDiskUse' in the AggregationRequest may not match what was set
+ // on the originating command, so copy it from the ExpressionContext.
+ aggRequest.setAllowDiskUse(expCtx->allowDiskUse);
+
+ LiteParsedPipeline liteParsedPipeline(aggRequest);
+ auto hasChangeStream = liteParsedPipeline.hasChangeStream();
+ auto shardDispatchResults = dispatchShardPipeline(
+ aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline));
+
+ std::vector<ShardId> targetedShards;
+ targetedShards.reserve(shardDispatchResults.remoteCursors.size());
+ for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
+ targetedShards.emplace_back(remoteCursor->getShardId().toString());
+ }
+
+ std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
+ boost::optional<BSONObj> shardCursorsSortSpec = boost::none;
+ if (shardDispatchResults.splitPipeline) {
+ mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
+ shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
+ } else {
+ // We have not split the pipeline, and will execute entirely on the remote shards. Set up an
+ // empty local pipeline which we will attach the merge cursors stage to.
+ mergePipeline = Pipeline::parse(std::vector<BSONObj>(), expCtx);
+ }
+
+ addMergeCursorsSource(mergePipeline.get(),
+ shardDispatchResults.commandForTargetedShards,
+ std::move(shardDispatchResults.remoteCursors),
+ targetedShards,
+ shardCursorsSortSpec,
+ hasChangeStream);
+
+ return mergePipeline;
+}
+
} // namespace
boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationContext* opCtx,
@@ -840,6 +906,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
}
} else {
cursors = establishShardCursors(opCtx,
+ expCtx->mongoProcessInterface->taskExecutor,
expCtx->ns,
hasChangeStream,
executionNsRoutingInfo,
@@ -886,7 +953,6 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
std::vector<OwnedRemoteCursor> ownedCursors,
const std::vector<ShardId>& targetedShards,
boost::optional<BSONObj> shardCursorsSortSpec,
- std::shared_ptr<executor::TaskExecutor> executor,
bool hasChangeStream) {
auto* opCtx = mergePipeline->getContext()->opCtx;
AsyncResultsMergerParams armParams;
@@ -924,82 +990,17 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
// For change streams, we need to set up a custom stage to establish cursors on new shards when
// they are added, to ensure we don't miss results from the new shards.
- auto mergeCursorsStage = DocumentSourceMergeCursors::create(
- std::move(executor), std::move(armParams), mergePipeline->getContext());
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(mergePipeline->getContext(), std::move(armParams));
if (hasChangeStream) {
mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create(
- mergePipeline->getContext(),
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- mergeCursorsStage,
- targetedShards,
- cmdSentToShards));
+ mergePipeline->getContext(), mergeCursorsStage, targetedShards, cmdSentToShards));
}
mergePipeline->addInitialSource(std::move(mergeCursorsStage));
}
-std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
- PipelineDeleter(expCtx->opCtx));
-
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
-
- // Generate the command object for the targeted shards.
- std::vector<BSONObj> rawStages = [&pipeline]() {
- auto serialization = pipeline->serialize();
- std::vector<BSONObj> stages;
- stages.reserve(serialization.size());
-
- for (const auto& stageObj : serialization) {
- invariant(stageObj.getType() == BSONType::Object);
- stages.push_back(stageObj.getDocument().toBson());
- }
-
- return stages;
- }();
-
- AggregationRequest aggRequest(expCtx->ns, rawStages);
-
- // The default value for 'allowDiskUse' in the AggregationRequest may not match what was set
- // on the originating command, so copy it from the ExpressionContext.
- aggRequest.setAllowDiskUse(expCtx->allowDiskUse);
-
- LiteParsedPipeline liteParsedPipeline(aggRequest);
- auto hasChangeStream = liteParsedPipeline.hasChangeStream();
- auto shardDispatchResults = dispatchShardPipeline(
- aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline));
-
- std::vector<ShardId> targetedShards;
- targetedShards.reserve(shardDispatchResults.remoteCursors.size());
- for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
- targetedShards.emplace_back(remoteCursor->getShardId().toString());
- }
-
- std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
- boost::optional<BSONObj> shardCursorsSortSpec = boost::none;
- if (shardDispatchResults.splitPipeline) {
- mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
- shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
- } else {
- // We have not split the pipeline, and will execute entirely on the remote shards. Set up an
- // empty local pipeline which we will attach the merge cursors stage to.
- mergePipeline = Pipeline::parse(std::vector<BSONObj>(), expCtx);
- }
-
- addMergeCursorsSource(mergePipeline.get(),
- shardDispatchResults.commandForTargetedShards,
- std::move(shardDispatchResults.remoteCursors),
- targetedShards,
- shardCursorsSortSpec,
- Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
- hasChangeStream);
-
- return mergePipeline;
-}
-
StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
const NamespaceString& execNss) {
// First, verify that there are shards present in the cluster. If not, then we return the
@@ -1035,4 +1036,34 @@ bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream) {
return nss.isCollectionlessAggregateNS() || hasChangeStream;
}
+std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* ownedPipeline,
+ bool allowTargetingShards) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
+ PipelineDeleter(expCtx->opCtx));
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+ auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
+ return shardVersionRetry(
+ expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() {
+ auto pipelineToTarget = pipeline->clone();
+ if (!allowTargetingShards || expCtx->ns.db() == "local") {
+ // If the db is local, this may be a change stream examining the oplog. We know the
+ // oplog (and any other local collections) will not be sharded.
+ return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
+ expCtx, pipeline.release());
+ }
+ return targetShardsAndAddMergeCursors(expCtx, pipelineToTarget.release());
+ });
+}
+
+void logFailedRetryAttempt(StringData taskDescription, const DBException& exception) {
+ LOGV2_DEBUG(4553800,
+ 3,
+ "Retrying {task_description}. Got error: {exception}",
+ "task_description"_attr = taskDescription,
+ "exception"_attr = exception);
+}
} // namespace mongo::sharded_agg_helpers
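
Note the shape of the retry callback in attachCursorToPipeline: each attempt targets a fresh clone,
since a failed dispatch may already have consumed the pipeline it was handed, and only the
non-retryable local-read branch releases the original. Distilled to its essentials (a sketch, with
the local-read branch and error handling elided):

    // Each attempt releases only its own clone into the targeting machinery,
    // so the original pipeline survives for the next attempt.
    return shardVersionRetry(
        expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline"_sd, [&]() {
            auto attempt = pipeline->clone();
            return targetShardsAndAddMergeCursors(expCtx, attempt.release());
        });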
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index d3b3e116e6f..c8bea34d92c 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -149,20 +149,9 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
std::vector<OwnedRemoteCursor> ownedCursors,
const std::vector<ShardId>& targetedShards,
boost::optional<BSONObj> shardCursorsSortSpec,
- std::shared_ptr<executor::TaskExecutor> executor,
bool hasChangeStream);
/**
- * For a sharded collection, establishes remote cursors on each shard that may have results, and
- * creates a DocumentSourceMergeCursors stage to merge the remove cursors. Returns a pipeline
- * beginning with that DocumentSourceMergeCursors stage. Note that one of the 'remote' cursors might
- * be this node itself.
- */
-std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline);
-
-
-/**
* Returns the proper routing table to use for targeting shards: either a historical routing table
* based on the global read timestamp if there is an active transaction with snapshot level read
* concern or the latest routing table otherwise.
@@ -183,5 +172,85 @@ bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream);
*/
Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx);
+/**
+ * Uses sharded_agg_helpers to split the pipeline and dispatch half to the shards, leaving the
+ * merging half executing in this process after attaching a $mergeCursors. Will retry on network
+ * errors and also on StaleConfig errors to avoid restarting the entire operation.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* ownedPipeline,
+ bool allowTargetingShards);
+
+/**
+ * Logs the error that caused a retry of 'taskDescription'. Simple helper to avoid defining the
+ * log component in a header file.
+ */
+void logFailedRetryAttempt(StringData taskDescription, const DBException&);
+
+/**
+ * A retry loop which handles StaleDbVersion errors, errors in
+ * ErrorCategory::StaleShardVersionError, and ShardInvalidatedForTargeting errors. When such an
+ * error is encountered, the CatalogCache is marked for refresh and 'callbackFn' is retried. When
+ * retried, 'callbackFn' will trigger a refresh of the CatalogCache and block until it's done when
+ * it next consults the CatalogCache.
+ */
+template <typename F>
+auto shardVersionRetry(OperationContext* opCtx,
+ CatalogCache* catalogCache,
+ NamespaceString nss,
+ StringData taskDescription,
+ F&& callbackFn) {
+ size_t numAttempts = 0;
+ auto logAndTestMaxRetries = [&numAttempts, taskDescription](auto& exception) {
+ if (++numAttempts <= kMaxNumStaleVersionRetries) {
+ logFailedRetryAttempt(taskDescription, exception);
+ return true;
+ }
+ exception.addContext(str::stream()
+ << "Exceeded maximum number of " << kMaxNumStaleVersionRetries
+ << " retries attempting " << taskDescription);
+ return false;
+ };
+ while (true) {
+ catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, numAttempts);
+ try {
+ return callbackFn();
+ } catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) {
+ invariant(ex->getDb() == nss.db(),
+ str::stream() << "StaleDbVersion error on unexpected database. Expected "
+ << nss.db() << ", received " << ex->getDb());
+ // If the database version is stale, refresh its entry in the catalog cache.
+ catalogCache->onStaleDatabaseVersion(ex->getDb(), ex->getVersionReceived());
+ if (!logAndTestMaxRetries(ex)) {
+ throw;
+ }
+ } catch (ExceptionForCat<ErrorCategory::StaleShardVersionError>& e) {
+ // If the exception provides a shardId, add it to the set of shards requiring a refresh.
+ // If the cache currently considers the collection to be unsharded, this will trigger an
+ // epoch refresh. If no shard is provided, then the epoch is stale and we must refresh.
+ if (auto staleInfo = e.extraInfo<StaleConfigInfo>()) {
+ invariant(staleInfo->getNss() == nss,
+ str::stream() << "StaleConfig error on unexpected namespace. Expected "
+ << nss << ", received " << staleInfo->getNss());
+ catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ opCtx,
+ nss,
+ staleInfo->getVersionWanted(),
+ staleInfo->getVersionReceived(),
+ staleInfo->getShardId());
+ } else {
+ catalogCache->onEpochChange(nss);
+ }
+ if (!logAndTestMaxRetries(e)) {
+ throw;
+ }
+ } catch (ExceptionFor<ErrorCodes::ShardInvalidatedForTargeting>& e) {
+ if (!logAndTestMaxRetries(e)) {
+ throw;
+ }
+ continue;
+ }
+ }
+}
} // namespace sharded_agg_helpers
} // namespace mongo
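
Because shardVersionRetry is defined header-only, any linked caller holding a CatalogCache can
wrap routing-sensitive work in it. A minimal usage sketch, assuming 'opCtx' and 'nss' are in
scope; the callback can run up to kMaxNumStaleVersionRetries + 1 times, so it must re-acquire
anything it consumes on every invocation:

    auto* catalogCache = Grid::get(opCtx)->catalogCache();
    auto routingInfo = sharded_agg_helpers::shardVersionRetry(
        opCtx, catalogCache, nss, "example routing lookup"_sd, [&]() {
            // Consulting the cache here blocks behind any refresh scheduled by a
            // previous stale-version response.
            return uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
        });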
diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp
new file mode 100644
index 00000000000..b1590424fb7
--- /dev/null
+++ b/src/mongo/db/pipeline/sharded_union_test.cpp
@@ -0,0 +1,305 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_queue.h"
+#include "mongo/db/pipeline/document_source_union_with.h"
+#include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h"
+#include "mongo/s/query/sharded_agg_test_fixture.h"
+#include "mongo/s/stale_exception.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+// Use this new name to register these tests under their own unit test suite.
+using ShardedUnionTest = ShardedAggTestFixture;
+
+TEST_F(ShardedUnionTest, RetriesSubPipelineOnNetworkError) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+
+ auto pipeline = Pipeline::create(
+ {DocumentSourceMatch::create(fromjson("{_id: 'unionResult'}"), expCtx())}, expCtx());
+ auto unionWith = DocumentSourceUnionWith(expCtx(), std::move(pipeline));
+ expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor());
+ auto queue = DocumentSourceQueue::create(expCtx());
+ unionWith.setSource(queue.get());
+
+ auto expectedResult = Document{{"_id"_sd, "unionResult"_sd}};
+
+ auto future = launchAsync([&] {
+ auto next = unionWith.getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ auto result = next.releaseDocument();
+ ASSERT_DOCUMENT_EQ(result, expectedResult);
+ ASSERT(unionWith.getNext().isEOF());
+ ASSERT(unionWith.getNext().isEOF());
+ ASSERT(unionWith.getNext().isEOF());
+ });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"};
+ });
+
+ // Now schedule the response with the expected result.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, {expectedResult.toBson()})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ future.default_timed_get();
+}
+
+TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+
+ auto pipeline = Pipeline::create(
+ {DocumentSourceMatch::create(fromjson("{_id: 'unionResult'}"), expCtx())}, expCtx());
+ auto unionWith = DocumentSourceUnionWith(expCtx(), std::move(pipeline));
+ expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor());
+ auto queue = DocumentSourceQueue::create(expCtx());
+ unionWith.setSource(queue.get());
+
+ auto expectedResult = Document{{"_id"_sd, "unionResult"_sd}};
+
+ auto future = launchAsync([&] {
+ auto next = unionWith.getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ auto result = next.releaseDocument();
+ ASSERT_DOCUMENT_EQ(result, expectedResult);
+ ASSERT(unionWith.getNext().isEOF());
+ ASSERT(unionWith.getNext().isEOF());
+ ASSERT(unionWith.getNext().isEOF());
+ });
+
+ // Mock out one error response, then expect a refresh of the sharding catalog for that
+ // namespace, then mock out a successful response.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ });
+
+ // Mock the expected config server queries.
+ const OID epoch = OID::gen();
+ const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
+ expectGetCollection(kTestAggregateNss, epoch, shardKeyPattern);
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
+ ChunkVersion version(1, 0, epoch);
+
+ ChunkType chunk1(kTestAggregateNss,
+ {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
+ version,
+ {"0"});
+ chunk1.setName(OID::gen());
+ version.incMinor();
+
+ ChunkType chunk2(kTestAggregateNss,
+ {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()},
+ version,
+ {"1"});
+ chunk2.setName(OID::gen());
+ version.incMinor();
+
+ return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
+ }());
+
+    // That error should be retried, but only the request to that shard.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, {expectedResult.toBson()})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ future.default_timed_get();
+}
+
+TEST_F(ShardedUnionTest, CorrectlySplitsSubPipelineIfRefreshedDistributionRequiresIt) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ auto shards = setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+
+ auto&& parser = AccumulationStatement::getParser("$sum");
+ auto accumulatorArg = BSON("" << 1);
+ auto sumStatement =
+ parser(expCtx(), accumulatorArg.firstElement(), expCtx()->variablesParseState);
+ AccumulationStatement countStatement{"count", sumStatement};
+ auto pipeline = Pipeline::create(
+ {DocumentSourceMatch::create(fromjson("{_id: {$gte: 0}}"), expCtx()),
+ DocumentSourceGroup::create(
+ expCtx(), ExpressionConstant::create(expCtx(), Value(BSONNULL)), {countStatement})},
+ expCtx());
+ auto unionWith = DocumentSourceUnionWith(expCtx(), std::move(pipeline));
+ expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor());
+ auto queue = DocumentSourceQueue::create(expCtx());
+ unionWith.setSource(queue.get());
+
+ auto expectedResult = Document{{"_id"_sd, BSONNULL}, {"count"_sd, 1}};
+
+ auto future = launchAsync([&] {
+ auto next = unionWith.getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ auto result = next.releaseDocument();
+ ASSERT_DOCUMENT_EQ(result, expectedResult);
+ ASSERT(unionWith.getNext().isEOF());
+ ASSERT(unionWith.getNext().isEOF());
+ ASSERT(unionWith.getNext().isEOF());
+ });
+
+ // With the $match at the front of the sub-pipeline, we should be able to target the request to
+ // just shard 1. Mock out an error response from that shard, then expect a refresh of the
+ // sharding catalog for that namespace.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(request.target, HostAndPort(shards[1].getHost()));
+ return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ });
+
+    // Mock the expected config server queries. Update the distribution as if the chunk
+    // [0, MaxKey) was split at 10 and the new chunk [10, MaxKey) was moved to the first shard.
+ const OID epoch = OID::gen();
+ const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
+ expectGetCollection(kTestAggregateNss, epoch, shardKeyPattern);
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
+ ChunkVersion version(1, 0, epoch);
+
+ ChunkType chunk1(kTestAggregateNss,
+ {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
+ version,
+ {shards[0].getName()});
+ chunk1.setName(OID::gen());
+ version.incMinor();
+
+ ChunkType chunk2(kTestAggregateNss,
+ {BSON("_id" << 0), BSON("_id" << 10)},
+ version,
+ {shards[1].getName()});
+ chunk2.setName(OID::gen());
+ version.incMinor();
+
+ ChunkType chunk3(kTestAggregateNss,
+ {BSON("_id" << 10), shardKeyPattern.getKeyPattern().globalMax()},
+ version,
+ {shards[0].getName()});
+ chunk3.setName(OID::gen());
+
+ return std::vector<BSONObj>{
+ chunk1.toConfigBSON(), chunk2.toConfigBSON(), chunk3.toConfigBSON()};
+ }());
+
+    // That error should be retried, this time targeting two shards.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(
+ kTestAggregateNss, CursorId{0}, {BSON("_id" << BSONNULL << "count" << 1)})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(
+ kTestAggregateNss, CursorId{0}, {BSON("_id" << BSONNULL << "count" << 0)})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ future.default_timed_get();
+}
+
+TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNotRequire) {
+ // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ auto shards = setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss);
+
+ auto&& parser = AccumulationStatement::getParser("$sum");
+ auto accumulatorArg = BSON("" << 1);
+ auto sumExpression =
+ parser(expCtx(), accumulatorArg.firstElement(), expCtx()->variablesParseState);
+ AccumulationStatement countStatement{"count", sumExpression};
+ auto pipeline = Pipeline::create(
+ {DocumentSourceGroup::create(
+ expCtx(), ExpressionConstant::create(expCtx(), Value(BSONNULL)), {countStatement})},
+ expCtx());
+ auto unionWith = DocumentSourceUnionWith(expCtx(), std::move(pipeline));
+ expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor());
+ auto queue = DocumentSourceQueue::create(expCtx());
+ unionWith.setSource(queue.get());
+
+ auto expectedResult = Document{{"_id"_sd, BSONNULL}, {"count"_sd, 1}};
+
+ auto future = launchAsync([&] {
+ auto next = unionWith.getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ auto result = next.releaseDocument();
+ ASSERT_DOCUMENT_EQ(result, expectedResult);
+ ASSERT(unionWith.getNext().isEOF());
+ ASSERT(unionWith.getNext().isEOF());
+ ASSERT(unionWith.getNext().isEOF());
+ });
+
+ // Mock out an error response from both shards, then expect a refresh of the sharding catalog
+ // for that namespace, then mock out a successful response.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ });
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ });
+
+
+ // Mock the expected config server queries. Update the distribution so that all chunks are on
+ // the same shard.
+ const OID epoch = OID::gen();
+ const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
+ expectGetCollection(kTestAggregateNss, epoch, shardKeyPattern);
+ expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
+ ChunkVersion version(1, 0, epoch);
+
+ ChunkType chunk1(kTestAggregateNss,
+ {shardKeyPattern.getKeyPattern().globalMin(),
+ shardKeyPattern.getKeyPattern().globalMax()},
+ version,
+ {shards[0].getName()});
+ chunk1.setName(OID::gen());
+
+ return std::vector<BSONObj>{chunk1.toConfigBSON()};
+ }());
+
+    // That error should be retried, this time targeting only one shard.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(request.target, HostAndPort(shards[0].getHost()));
+ return CursorResponse(kTestAggregateNss, CursorId{0}, {expectedResult.toBson()})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ future.default_timed_get();
+}
+
+} // namespace
+} // namespace mongo