diff options
Diffstat (limited to 'src/mongo/db/pipeline')
31 files changed, 1027 insertions, 283 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index ecd9b51397c..4b8099b5734 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -341,6 +341,7 @@ env.CppUnitTest( 'accumulator_test.cpp', 'aggregation_request_test.cpp', 'dependencies_test.cpp', + 'dispatch_shard_pipeline_test.cpp', 'document_path_support_test.cpp', 'document_source_add_fields_test.cpp', 'document_source_bucket_auto_test.cpp', @@ -399,11 +400,10 @@ env.CppUnitTest( 'resume_token_test.cpp', 'semantic_analysis_test.cpp', 'sequential_document_cache_test.cpp', + 'sharded_union_test.cpp', 'tee_buffer_test.cpp', ], LIBDEPS=[ - "$BUILD_DIR/mongo/db/service_context_d", - "$BUILD_DIR/mongo/db/service_context_d_test_fixture", '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/exec/document_value/document_value', @@ -413,8 +413,11 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/oplog_entry', '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/db/service_context_d', + '$BUILD_DIR/mongo/db/service_context_d_test_fixture', '$BUILD_DIR/mongo/db/service_context_test_fixture', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/s/catalog_cache_test_fixture', '$BUILD_DIR/mongo/s/is_mongos', '$BUILD_DIR/mongo/s/query/router_exec_stage', '$BUILD_DIR/mongo/s/sharding_router_test_fixture', @@ -429,6 +432,7 @@ env.CppUnitTest( 'pipeline', 'process_interface/mongod_process_interfaces', 'process_interface/mongos_process_interface', + 'process_interface/shardsvr_process_interface', 'sharded_agg_helpers', ] ) diff --git a/src/mongo/db/pipeline/accumulator_js_test.cpp b/src/mongo/db/pipeline/accumulator_js_test.cpp index 8497e1ab9c8..f2c471b5536 100644 --- a/src/mongo/db/pipeline/accumulator_js_test.cpp +++ b/src/mongo/db/pipeline/accumulator_js_test.cpp @@ -46,8 +46,7 @@ namespace { class MapReduceFixture : public ServiceContextMongoDTest { protected: MapReduceFixture() : _expCtx((new ExpressionContextForTest())) { - _expCtx->mongoProcessInterface = - std::make_shared<StandaloneProcessInterface>(_expCtx->opCtx); + _expCtx->mongoProcessInterface = std::make_shared<StandaloneProcessInterface>(nullptr); } boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() { diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp new file mode 100644 index 00000000000..b141c3b72a9 --- /dev/null +++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp @@ -0,0 +1,248 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/s/query/sharded_agg_test_fixture.h" + +namespace mongo { +namespace { + +// Use this new name to register these tests under their own unit test suite. +using DispatchShardPipelineTest = ShardedAggTestFixture; + +TEST_F(DispatchShardPipelineTest, DoesNotSplitPipelineIfTargetingOneShard) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + auto shards = setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + + auto stages = std::vector{ + fromjson("{$match: {_id: {$gte: 0}}}"), + fromjson("{$sort: {score: -1}}"), + fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"), + }; + auto pipeline = Pipeline::create( + {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx()); + const Document serializedCommand = + AggregationRequest(expCtx()->ns, stages).serializeToCommandObj(); + const bool hasChangeStream = false; + + auto future = launchAsync([&] { + auto results = sharded_agg_helpers::dispatchShardPipeline( + serializedCommand, hasChangeStream, std::move(pipeline)); + ASSERT_EQ(results.remoteCursors.size(), 1UL); + ASSERT(!results.splitPipeline); + }); + + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.target, HostAndPort(shards[1].getHost())); + return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + future.default_timed_get(); +} + +TEST_F(DispatchShardPipelineTest, DoesSplitPipelineIfMatchSpansTwoShards) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + auto stages = std::vector{ + fromjson("{$match: {_id: {$gte: -10}}}"), + fromjson("{$sort: {score: -1}}"), + fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"), + }; + auto pipeline = Pipeline::create( + {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx()); + const Document serializedCommand = + AggregationRequest(expCtx()->ns, stages).serializeToCommandObj(); + const bool hasChangeStream = false; + + auto future = launchAsync([&] { + auto results = sharded_agg_helpers::dispatchShardPipeline( + serializedCommand, hasChangeStream, std::move(pipeline)); + ASSERT_EQ(results.remoteCursors.size(), 2UL); + ASSERT(bool(results.splitPipeline)); + }); + + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + future.default_timed_get(); +} + +TEST_F(DispatchShardPipelineTest, DispatchShardPipelineRetriesOnNetworkError) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + auto stages = std::vector{ + fromjson("{$match: {_id: {$gte: -10}}}"), + fromjson("{$sort: {score: -1}}"), + fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"), + }; + auto pipeline = Pipeline::create( + {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx()); + const Document serializedCommand = + AggregationRequest(expCtx()->ns, stages).serializeToCommandObj(); + const bool hasChangeStream = false; + auto future = launchAsync([&] { + // Shouldn't throw. + auto results = sharded_agg_helpers::dispatchShardPipeline( + serializedCommand, hasChangeStream, std::move(pipeline)); + ASSERT_EQ(results.remoteCursors.size(), 2UL); + ASSERT(bool(results.splitPipeline)); + }); + + // Mock out responses, one success, one error. + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + HostAndPort unreachableShard; + onCommand([&](const executor::RemoteCommandRequest& request) { + unreachableShard = request.target; + return Status{ErrorCodes::HostUnreachable, "Mock error: Couldn't find host"}; + }); + + // That error should be retried, but only the one on that shard. + onCommand([&](const executor::RemoteCommandRequest& request) { + // Test that it's retrying to the shard we failed earlier. + ASSERT_EQ(request.target, unreachableShard); + return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + future.default_timed_get(); +} + +// Test that this helper is not responsible for retrying StaleConfig errors. This should happen at a +// higher level. +TEST_F(DispatchShardPipelineTest, DispatchShardPipelineDoesNotRetryOnStaleConfigError) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + auto stages = std::vector{ + fromjson("{$match: {_id: {$gte: 0}}}"), + fromjson("{$sort: {score: -1}}"), + fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"), + }; + auto pipeline = Pipeline::create( + {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx()); + const Document serializedCommand = + AggregationRequest(expCtx()->ns, stages).serializeToCommandObj(); + const bool hasChangeStream = false; + auto future = launchAsync([&] { + ASSERT_THROWS_CODE(sharded_agg_helpers::dispatchShardPipeline( + serializedCommand, hasChangeStream, std::move(pipeline)), + AssertionException, + ErrorCodes::StaleShardVersion); + }); + + // Mock out an error response. + onCommand([&](const executor::RemoteCommandRequest& request) { + return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + }); + future.default_timed_get(); +} + +TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + auto stages = std::vector{ + fromjson("{$match: {_id: {$gte: 0}}}"), + fromjson("{$sort: {score: -1}}"), + fromjson("{$group: {_id: '$username', high_scores: {$push: '$score'}}}"), + }; + auto pipeline = Pipeline::create( + {parseStage(stages[0]), parseStage(stages[1]), parseStage(stages[2])}, expCtx()); + const Document serializedCommand = + AggregationRequest(expCtx()->ns, stages).serializeToCommandObj(); + const bool hasChangeStream = false; + auto future = launchAsync([&] { + // Shouldn't throw. + auto results = sharded_agg_helpers::shardVersionRetry( + operationContext(), + Grid::get(getServiceContext())->catalogCache(), + kTestAggregateNss, + "dispatch shard pipeline"_sd, + [&]() { + return sharded_agg_helpers::dispatchShardPipeline( + serializedCommand, hasChangeStream, pipeline->clone()); + }); + ASSERT_EQ(results.remoteCursors.size(), 1UL); + ASSERT(!bool(results.splitPipeline)); + }); + + // Mock out one error response, then expect a refresh of the sharding catalog for that + // namespace, then mock out a successful response. + onCommand([&](const executor::RemoteCommandRequest& request) { + return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + }); + + // Mock the expected config server queries. + const OID epoch = OID::gen(); + const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); + expectGetCollection(kTestAggregateNss, epoch, shardKeyPattern); + expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + ChunkVersion version(1, 0, epoch); + + ChunkType chunk1(kTestAggregateNss, + {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, + version, + {"0"}); + chunk1.setName(OID::gen()); + version.incMinor(); + + ChunkType chunk2(kTestAggregateNss, + {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, + version, + {"1"}); + chunk2.setName(OID::gen()); + version.incMinor(); + + return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; + }()); + + // That error should be retried, but only the one on that shard. + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(kTestAggregateNss, CursorId{0}, std::vector<BSONObj>{}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + future.default_timed_get(); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 1152e92f5f4..3aa5a322a72 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -293,10 +293,6 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson( auto pipeline = Pipeline::parse(rawFacet.second, expCtx, [](const Pipeline& pipeline) { auto sources = pipeline.getSources(); - uassert(ErrorCodes::BadValue, - "sub-pipeline in $facet stage cannot be empty", - !sources.empty()); - std::for_each(sources.begin(), sources.end(), [](auto& stage) { auto stageConstraints = stage->constraints(); uassert(40600, diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 6316452c22b..37aac48d500 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -112,17 +112,6 @@ TEST_F(DocumentSourceFacetTest, ShouldRejectNonArrayFacets) { AssertionException); } -TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyPipelines) { - auto ctx = getExpCtx(); - auto spec = BSON("$facet" << BSON("a" << BSONArray())); - ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), - AssertionException); - - spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 4)) << "b" << BSONArray())); - ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), - AssertionException); -} - TEST_F(DocumentSourceFacetTest, ShouldSucceedWhenNamespaceIsCollectionless) { auto ctx = getExpCtx(); auto spec = fromjson("{$facet: {a: [{$match: {}}]}}"); @@ -372,6 +361,34 @@ TEST_F(DocumentSourceFacetTest, MultipleFacetsShouldSeeTheSameDocuments) { ASSERT(facetStage->getNext().isEOF()); } +TEST_F(DocumentSourceFacetTest, ShouldAcceptEmptyPipelines) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSON("a" << BSONArray())); + + deque<DocumentSource::GetNextResult> inputs = { + Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}; + auto mock = DocumentSourceMock::createForTest(inputs); + + auto facetStage = DocumentSourceFacet::createFromBson(spec.firstElement(), ctx); + facetStage->setSource(mock.get()); + + auto output = facetStage->getNext(); + + // The output fields are in no guaranteed order. + vector<Value> expectedOutputs; + for (auto&& input : inputs) { + expectedOutputs.emplace_back(input.releaseDocument()); + } + ASSERT(output.isAdvanced()); + ASSERT_EQ(output.getDocument().computeSize(), 1ULL); + ASSERT_VALUE_EQ(output.getDocument()["a"], Value(expectedOutputs)); + + // Should be exhausted now. + ASSERT(facetStage->getNext().isEOF()); + ASSERT(facetStage->getNext().isEOF()); + ASSERT(facetStage->getNext().isEOF()); +} + TEST_F(DocumentSourceFacetTest, ShouldCorrectlyHandleSubPipelinesYieldingDifferentNumbersOfResults) { auto ctx = getExpCtx(); diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp index f5763d42987..e5f51c1883d 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp @@ -80,6 +80,7 @@ public: DocumentSourceMergeCursorsTest() { TimeZoneDatabase::set(getServiceContext(), std::make_unique<TimeZoneDatabase>()); _expCtx = new ExpressionContext(operationContext(), nullptr, kTestNss); + _expCtx->mongoProcessInterface = std::make_shared<StubMongoProcessInterface>(executor()); } void setUp() override { @@ -206,8 +207,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) { kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}))); armParams.setRemotes(std::move(cursors)); auto pipeline = Pipeline::create({}, expCtx); - auto mergeCursorsStage = - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx); + auto mergeCursorsStage = DocumentSourceMergeCursors::create(expCtx, std::move(armParams)); ASSERT_TRUE(mergeCursorsStage->getNext().isEOF()); } @@ -230,8 +230,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) { makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); armParams.setRemotes(std::move(cursors)); auto pipeline = Pipeline::create({}, expCtx); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + pipeline->addInitialSource(DocumentSourceMergeCursors::create(expCtx, std::move(armParams))); // Iterate the $mergeCursors stage asynchronously on a different thread, since it will block // waiting for network responses, which we will manually schedule below. @@ -279,8 +278,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfTheyAreNotOwned) { makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); armParams.setRemotes(std::move(cursors)); auto pipeline = Pipeline::create({}, expCtx); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + pipeline->addInitialSource(DocumentSourceMergeCursors::create(expCtx, std::move(armParams))); auto mergeCursors = checked_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()); @@ -301,8 +299,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) { makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); armParams.setRemotes(std::move(cursors)); auto pipeline = Pipeline::create({}, expCtx); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + pipeline->addInitialSource(DocumentSourceMergeCursors::create(expCtx, std::move(armParams))); // Iterate the pipeline asynchronously on a different thread, since it will block waiting for // network responses, which we will manually schedule below. @@ -347,8 +344,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) { cursors.emplace_back( makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); armParams.setRemotes(std::move(cursors)); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + pipeline->addInitialSource(DocumentSourceMergeCursors::create(expCtx, std::move(armParams))); // After optimization we should only have a $mergeCursors stage. pipeline->optimizePipeline(); diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index ad9b7acc9d0..43a6e22403d 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -157,27 +157,37 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceUnionWith::createFromBson( } DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() { - if (!_sourceExhausted) { + if (!_pipeline) { + // We must have already been disposed, so we're finished. + return GetNextResult::makeEOF(); + } + + if (_executionState == ExecutionProgress::kIteratingSource) { auto nextInput = pSource->getNext(); if (!nextInput.isEOF()) { return nextInput; } - _sourceExhausted = true; + _executionState = ExecutionProgress::kStartingSubPipeline; // All documents from the base collection have been returned, switch to iterating the sub- // pipeline by falling through below. } - if (!_cursorAttached) { - auto ctx = _pipeline->getContext(); - _pipeline = - pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(ctx, _pipeline.release()); - _cursorAttached = true; - LOGV2_DEBUG(23869, 3, "$unionWith attached cursor to pipeline"); + if (_executionState == ExecutionProgress::kStartingSubPipeline) { + LOGV2_DEBUG(23869, + 3, + "$unionWith attaching cursor to pipeline {pipeline}", + "pipeline"_attr = Value(_pipeline->serialize())); + auto expCtxCopy = _pipeline->getContext(); + _pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( + expCtxCopy, _pipeline.release()); + _executionState = ExecutionProgress::kIteratingSubPipeline; } - if (auto res = _pipeline->getNext()) + auto res = _pipeline->getNext(); + if (res) return std::move(*res); + _executionState = ExecutionProgress::kFinished; return GetNextResult::makeEOF(); } diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h index ade72532935..2dd26f4aa55 100644 --- a/src/mongo/db/pipeline/document_source_union_with.h +++ b/src/mongo/db/pipeline/document_source_union_with.h @@ -136,6 +136,22 @@ protected: void doDispose() final; private: + enum ExecutionProgress { + // We haven't yet iterated 'pSource' to completion. + kIteratingSource, + + // We finished iterating 'pSource', but haven't started on the sub pipeline and need to do + // some setup first. + kStartingSubPipeline, + + // We finished iterating 'pSource' and are now iterating '_pipeline', but haven't finished + // yet. + kIteratingSubPipeline, + + // There are no more results. + kFinished + }; + /** * Should not be called; use serializeToArray instead. */ @@ -144,9 +160,9 @@ private: } std::unique_ptr<Pipeline, PipelineDeleter> _pipeline; - bool _sourceExhausted = false; bool _cursorAttached = false; bool _usedDisk = false; + ExecutionProgress _executionState = ExecutionProgress::kIteratingSource; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp index 61412ad421c..29d38f916bf 100644 --- a/src/mongo/db/pipeline/document_source_union_with_test.cpp +++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp @@ -324,6 +324,23 @@ TEST_F(DocumentSourceUnionWithTest, PropagatePauses) { ASSERT_TRUE(unionWithTwo.getNext().isEOF()); } +TEST_F(DocumentSourceUnionWithTest, ReturnEOFAfterBeingDisposed) { + const auto mockInput = DocumentSourceMock::createForTest({Document(), Document()}); + const auto mockUnionInput = std::deque<DocumentSource::GetNextResult>{}; + const auto mockCtx = getExpCtx()->copyWith({}); + mockCtx->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockUnionInput); + auto unionWith = DocumentSourceUnionWith( + mockCtx, Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())); + unionWith.setSource(mockInput.get()); + + ASSERT_TRUE(unionWith.getNext().isAdvanced()); + + unionWith.dispose(); + ASSERT_TRUE(unionWith.getNext().isEOF()); + ASSERT_TRUE(unionWith.getNext().isEOF()); + ASSERT_TRUE(unionWith.getNext().isEOF()); +} + TEST_F(DocumentSourceUnionWithTest, DependencyAnalysisReportsFullDoc) { auto expCtx = getExpCtx(); const auto replaceRoot = diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h index 10d71ef185a..fcfcf7ee159 100644 --- a/src/mongo/db/pipeline/expression_context_for_test.h +++ b/src/mongo/db/pipeline/expression_context_for_test.h @@ -80,6 +80,21 @@ public: : ExpressionContext( opCtx, request, nullptr, std::make_shared<StubMongoProcessInterface>(), {}, {}) {} + ExpressionContextForTest(OperationContext* opCtx, NamespaceString nss) + : ExpressionContext(opCtx, + boost::none, // explain + false, // fromMongos, + false, // needsMerge, + false, // allowDiskUse, + false, // bypassDocumentValidation, + false, // isMapReduce + nss, + RuntimeConstants(Date_t::now(), Timestamp(1, 0)), + {}, // collator + std::make_shared<StubMongoProcessInterface>(), + {}, // resolvedNamespaces + {} // collUUID + ) {} /** * Sets the resolved definition for an involved namespace. */ diff --git a/src/mongo/db/pipeline/expression_javascript_test.cpp b/src/mongo/db/pipeline/expression_javascript_test.cpp index 10203cc523d..3df1f10d436 100644 --- a/src/mongo/db/pipeline/expression_javascript_test.cpp +++ b/src/mongo/db/pipeline/expression_javascript_test.cpp @@ -47,8 +47,7 @@ class MapReduceFixture : public ServiceContextMongoDTest { protected: MapReduceFixture() : _expCtx((new ExpressionContextForTest())), _vps(_expCtx->variablesParseState) { - _expCtx->mongoProcessInterface = - std::make_shared<StandaloneProcessInterface>(_expCtx->opCtx); + _expCtx->mongoProcessInterface = std::make_shared<StandaloneProcessInterface>(nullptr); } boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() { diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index eb3a4d97914..ce3e9e89fa4 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -148,6 +148,24 @@ Pipeline::~Pipeline() { invariant(_disposed); } +std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::clone() const { + const auto& serialized = serialize(); + std::vector<BSONObj> asBson; + asBson.reserve(serialized.size()); + for (auto&& stage : serialized) { + invariant(stage.getType() == BSONType::Object); + asBson.push_back(stage.getDocument().toBson()); + } + try { + return parse(asBson, getContext()); + } catch (DBException& ex) { + ex.addContext(str::stream() + << "Failed to copy pipeline. Could not parse serialized version: " + << Value(serialized).toString()); + throw; + } +} + std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::parse( const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx, diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 5eb096fc7e2..93bcf7a4bbe 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -146,6 +146,8 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}); + std::unique_ptr<Pipeline, PipelineDeleter> clone() const; + const boost::intrusive_ptr<ExpressionContext>& getContext() const { return pCtx; } diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript index 7f7697bcf6b..6f455ff74c8 100644 --- a/src/mongo/db/pipeline/process_interface/SConscript +++ b/src/mongo/db/pipeline/process_interface/SConscript @@ -76,6 +76,7 @@ env.Library( 'mongod_process_interface_factory.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/s/sharding_router_api', 'mongod_process_interfaces', 'shardsvr_process_interface', ], @@ -88,6 +89,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers', '$BUILD_DIR/mongo/s/query/async_results_merger', '$BUILD_DIR/mongo/s/query/cluster_query', 'common_process_interface', diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index 640a69bd3d7..565d64b1d4f 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -143,8 +143,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -CommonMongodProcessInterface::CommonMongodProcessInterface(OperationContext* opCtx) {} - std::unique_ptr<TransactionHistoryIteratorBase> CommonMongodProcessInterface::createTransactionHistoryIterator(repl::OpTime time) const { bool permitYield = true; diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h index 64fcde8a913..cc6b032f2a7 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h @@ -49,7 +49,7 @@ using write_ops::Update; */ class CommonMongodProcessInterface : public CommonProcessInterface { public: - CommonMongodProcessInterface(OperationContext* opCtx); + using CommonProcessInterface::CommonProcessInterface; virtual ~CommonMongodProcessInterface() = default; diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h index 2184e4b2ee6..ebe970987c8 100644 --- a/src/mongo/db/pipeline/process_interface/common_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h @@ -42,6 +42,7 @@ namespace mongo { */ class CommonProcessInterface : public MongoProcessInterface { public: + using MongoProcessInterface::MongoProcessInterface; virtual ~CommonProcessInterface() = default; /** diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index 3f0cfee2f37..25210681f9d 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -52,6 +52,7 @@ #include "mongo/db/resource_yielder.h" #include "mongo/db/storage/backup_cursor_hooks.h" #include "mongo/db/storage/backup_cursor_state.h" +#include "mongo/executor/task_executor.h" #include "mongo/s/chunk_version.h" namespace mongo { @@ -116,6 +117,9 @@ public: int64_t nModified{0}; }; + MongoProcessInterface(std::shared_ptr<executor::TaskExecutor> executor) + : taskExecutor(std::move(executor)) {} + virtual ~MongoProcessInterface(){}; /** @@ -420,6 +424,8 @@ public: boost::optional<std::set<FieldPath>> fieldPaths, boost::optional<ChunkVersion> targetCollectionVersion, const NamespaceString& outputNs) const = 0; + + std::shared_ptr<executor::TaskExecutor> taskExecutor; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp index 744222380f9..80b330149ca 100644 --- a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp +++ b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp @@ -35,18 +35,21 @@ #include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h" #include "mongo/db/pipeline/process_interface/standalone_process_interface.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/grid.h" namespace mongo { namespace { std::shared_ptr<MongoProcessInterface> MongoProcessInterfaceCreateImpl(OperationContext* opCtx) { if (ShardingState::get(opCtx)->enabled()) { - return std::make_shared<ShardServerProcessInterface>(opCtx); + return std::make_shared<ShardServerProcessInterface>( + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); } else if (getTestCommandsEnabled()) { if (auto executor = ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx)) - return std::make_shared<ReplicaSetNodeProcessInterface>(opCtx, executor); + return std::make_shared<ReplicaSetNodeProcessInterface>(std::move(executor)); } - return std::make_shared<StandaloneProcessInterface>(opCtx); + return std::make_shared<StandaloneProcessInterface>(nullptr); } auto mongoProcessInterfaceCreateRegistration = MONGO_WEAK_FUNCTION_REGISTRATION( diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 906e6c3af50..09fc0fc00d0 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + #include "mongo/platform/basic.h" #include "mongo/db/pipeline/process_interface/mongos_process_interface.h" @@ -50,6 +52,7 @@ #include "mongo/s/query/router_exec_stage.h" #include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" +#include "mongo/util/log.h" namespace mongo { @@ -100,14 +103,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - bool allowTargetingShards) { - // On mongos we can't have local cursors. - return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, ownedPipeline); -} - boost::optional<Document> MongosProcessInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, @@ -134,94 +129,79 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument( cmdBuilder.append("allowSpeculativeMajorityRead", true); } - auto shardResults = std::vector<RemoteCursor>(); - auto findCmd = cmdBuilder.obj(); - for (size_t numRetries = 0; numRetries <= kMaxNumStaleVersionRetries; ++numRetries) { - // Obtain the catalog cache. If we are retrying after a stale shard error, mark this - // operation as needing to block on the next catalog cache refresh. + try { + auto findCmd = cmdBuilder.obj(); auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); - catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(expCtx->opCtx, numRetries); - - // Verify that the collection exists, with the correct UUID. - auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx); - if (swRoutingInfo == ErrorCodes::NamespaceNotFound) { - return boost::none; - } - auto routingInfo = uassertStatusOK(std::move(swRoutingInfo)); - - // Finalize the 'find' command object based on the routing table information. - if (findCmdIsByUuid && routingInfo.cm()) { - // Find by UUID and shard versioning do not work together (SERVER-31946). In the - // sharded case we've already checked the UUID, so find by namespace is safe. In the - // unlikely case that the collection has been deleted and a new collection with the same - // name created through a different mongos, the shard version will be detected as stale, - // as shard versions contain an 'epoch' field unique to the collection. - findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement()); - findCmdIsByUuid = false; - } - - try { - // Build the versioned requests to be dispatched to the shards. Typically, only a single - // shard will be targeted here; however, in certain cases where only the _id is present, - // we may need to scatter-gather the query to all shards in order to find the document. - auto requests = getVersionedRequestsForTargetedShards( - expCtx->opCtx, nss, routingInfo, findCmd, filterObj, CollationSpec::kSimpleSpec); - - // Dispatch the requests. The 'establishCursors' method conveniently prepares the result - // into a vector of cursor responses for us. - shardResults = establishCursors( - expCtx->opCtx, - Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss, - ReadPreferenceSetting::get(expCtx->opCtx), - std::move(requests), - false); - } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { - // If it's an unsharded collection which has been deleted and re-created, we may get a - // NamespaceNotFound error when looking up by UUID. - return boost::none; - } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>&) { - // If the database version is stale, refresh its entry in the catalog cache. - catalogCache->onStaleDatabaseVersion(nss.db(), routingInfo.db().databaseVersion()); - continue; // Try again if allowed. - } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& e) { - // If the exception provides a shardId, add it to the set of shards requiring a refresh. - // If the cache currently considers the collection to be unsharded, this will trigger an - // epoch refresh. If no shard is provided, then the epoch is stale and we must refresh. - auto staleInfo = e.extraInfo<StaleConfigInfo>(); - if (auto staleShardId = (staleInfo ? staleInfo->getShardId() : boost::none)) { - catalogCache->onStaleShardVersion(std::move(routingInfo), *staleShardId); - } else { - catalogCache->onEpochChange(nss); - } - continue; // Try again if allowed. - } catch (const ExceptionFor<ErrorCodes::ShardInvalidatedForTargeting>&) { - continue; + auto shardResults = sharded_agg_helpers::shardVersionRetry( + expCtx->opCtx, + catalogCache, + expCtx->ns, + str::stream() << "Looking up document matching " << redact(filter.toBson()), + [&]() -> std::vector<RemoteCursor> { + // Verify that the collection exists, with the correct UUID. + auto routingInfo = uassertStatusOK(getCollectionRoutingInfo(foreignExpCtx)); + + // Finalize the 'find' command object based on the routing table information. + if (findCmdIsByUuid && routingInfo.cm()) { + // Find by UUID and shard versioning do not work together (SERVER-31946). In + // the sharded case we've already checked the UUID, so find by namespace is + // safe. In the unlikely case that the collection has been deleted and a new + // collection with the same name created through a different mongos, the shard + // version will be detected as stale, as shard versions contain an 'epoch' field + // unique to the collection. + findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement()); + findCmdIsByUuid = false; + } + + // Build the versioned requests to be dispatched to the shards. Typically, only a + // single shard will be targeted here; however, in certain cases where only the _id + // is present, we may need to scatter-gather the query to all shards in order to + // find the document. + auto requests = getVersionedRequestsForTargetedShards(expCtx->opCtx, + nss, + routingInfo, + findCmd, + filterObj, + CollationSpec::kSimpleSpec); + + // Dispatch the requests. The 'establishCursors' method conveniently prepares the + // result into a vector of cursor responses for us. + return establishCursors( + expCtx->opCtx, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + ReadPreferenceSetting::get(expCtx->opCtx), + std::move(requests), + false); + }); + + // Iterate all shard results and build a single composite batch. We also enforce the + // requirement that only a single document should have been returned from across the + // cluster. + std::vector<BSONObj> finalBatch; + for (auto&& shardResult : shardResults) { + auto& shardCursor = shardResult.getCursorResponse(); + finalBatch.insert( + finalBatch.end(), shardCursor.getBatch().begin(), shardCursor.getBatch().end()); + // We should have at most 1 result, and the cursor should be exhausted. + uassert(ErrorCodes::ChangeStreamFatalError, + str::stream() << "Shard cursor was unexpectedly open after lookup: " + << shardResult.getHostAndPort() + << ", id: " << shardCursor.getCursorId(), + shardCursor.getCursorId() == 0); + uassert(ErrorCodes::ChangeStreamFatalError, + str::stream() << "found more than one document matching " << filter.toString() + << " [" << finalBatch.begin()->toString() << ", " + << std::next(finalBatch.begin())->toString() << "]", + finalBatch.size() <= 1u); } - break; // Success! - } - // Iterate all shard results and build a single composite batch. We also enforce the requirement - // that only a single document should have been returned from across the cluster. - std::vector<BSONObj> finalBatch; - for (auto&& shardResult : shardResults) { - auto& shardCursor = shardResult.getCursorResponse(); - finalBatch.insert( - finalBatch.end(), shardCursor.getBatch().begin(), shardCursor.getBatch().end()); - // We should have at most 1 result, and the cursor should be exhausted. - uassert(ErrorCodes::ChangeStreamFatalError, - str::stream() << "Shard cursor was unexpectedly open after lookup: " - << shardResult.getHostAndPort() - << ", id: " << shardCursor.getCursorId(), - shardCursor.getCursorId() == 0); - uassert(ErrorCodes::ChangeStreamFatalError, - str::stream() << "found more than one document matching " << filter.toString() - << " [" << finalBatch.begin()->toString() << ", " - << std::next(finalBatch.begin())->toString() << "]", - finalBatch.size() <= 1u); + return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{}); + } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { + // If it's an unsharded collection which has been deleted and re-created, we may get a + // NamespaceNotFound error when looking up by UUID. + return boost::none; } - - return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{}); } BSONObj MongosProcessInterface::_reportCurrentOpForClient( @@ -256,10 +236,11 @@ void MongosProcessInterface::_reportCurrentOpsForIdleSessions(OperationContext* const bool authEnabled = AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled(); - // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to - // create a pattern that will match against all authenticated usernames for the current client. - // If the user is listing ops for all users, we create an empty pattern; constructing an - // instance of SessionKiller::Matcher with this empty pattern will return all sessions. + // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers + // to create a pattern that will match against all authenticated usernames for the current + // client. If the user is listing ops for all users, we create an empty pattern; + // constructing an instance of SessionKiller::Matcher with this empty pattern will return + // all sessions. auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers ? makeSessionFilterForAuthenticatedUsers(opCtx) : KillAllSessionsByPatternSet{{}}); @@ -334,24 +315,24 @@ MongosProcessInterface::ensureFieldsUniqueOrResolveDocumentKey( "Cannot find index to verify that join fields will be unique", fieldsHaveSupportingUniqueIndex(expCtx, outputNs, *fieldPaths)); - // If the user supplies the 'fields' array, we don't need to attach a ChunkVersion for the - // shards since we are not at risk of 'guessing' the wrong shard key. + // If the user supplies the 'fields' array, we don't need to attach a ChunkVersion for + // the shards since we are not at risk of 'guessing' the wrong shard key. return {*fieldPaths, boost::none}; } // In case there are multiple shards which will perform this stage in parallel, we need to - // figure out and attach the collection's shard version to ensure each shard is talking about - // the same version of the collection. This mongos will coordinate that. We force a catalog - // refresh to do so because there is no shard versioning protocol on this namespace and so we - // otherwise could not be sure this node is (or will become) at all recent. We will also - // figure out and attach the 'joinFields' to send to the shards. - - // There are edge cases when the collection could be dropped or re-created during or near the - // time of the operation (for example, during aggregation). This is okay - we are mostly + // figure out and attach the collection's shard version to ensure each shard is talking + // about the same version of the collection. This mongos will coordinate that. We force a + // catalog refresh to do so because there is no shard versioning protocol on this namespace + // and so we otherwise could not be sure this node is (or will become) at all recent. We + // will also figure out and attach the 'joinFields' to send to the shards. + + // There are edge cases when the collection could be dropped or re-created during or near + // the time of the operation (for example, during aggregation). This is okay - we are mostly // paranoid that this mongos is very stale and want to prevent returning an error if the - // collection was dropped a long time ago. Because of this, we are okay with piggy-backing off - // another thread's request to refresh the cache, simply waiting for that request to return - // instead of forcing another refresh. + // collection was dropped a long time ago. Because of this, we are okay with piggy-backing + // off another thread's request to refresh the cache, simply waiting for that request to + // return instead of forcing another refresh. targetCollectionVersion = refreshAndGetCollectionVersion(expCtx, outputNs); auto docKeyPaths = collectDocumentKeyFieldsActingAsRouter(expCtx->opCtx, outputNs); @@ -360,4 +341,11 @@ MongosProcessInterface::ensureFieldsUniqueOrResolveDocumentKey( targetCollectionVersion}; } +std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* ownedPipeline, + bool allowTargetingShards) { + return sharded_agg_helpers::attachCursorToPipeline(expCtx, ownedPipeline, allowTargetingShards); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 252ee20d0a6..16bfc0dfe80 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -41,7 +41,7 @@ namespace mongo { */ class MongosProcessInterface : public CommonProcessInterface { public: - MongosProcessInterface() = default; + using CommonProcessInterface::CommonProcessInterface; virtual ~MongosProcessInterface() = default; @@ -148,11 +148,6 @@ public: MONGO_UNREACHABLE; } - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline, - bool allowTargetingShards = true) final; - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { // It is not meaningful to perform a "local read" on mongos. @@ -223,6 +218,17 @@ public: boost::optional<ChunkVersion> targetCollectionVersion, const NamespaceString& outputNs) const override; + /** + * If 'allowTargetingShards' is true, splits the pipeline and dispatch half to the shards, + * leaving the merging half executing in this process after attaching a $mergeCursors. Will + * retry on network errors and also on StaleConfig errors to avoid restarting the entire + * operation. + */ + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline, + bool allowTargetingShards) final; + protected: BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp index 63d69d93eab..e2d64f29eb5 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp @@ -56,7 +56,7 @@ public: } auto makeProcessInterface() { - return std::make_unique<MongosProcessInterfaceForTest>(); + return std::make_unique<MongosProcessInterfaceForTest>(nullptr); } }; diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index d6fa54b353a..17a2c69a207 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -48,21 +48,21 @@ namespace mongo { namespace { const auto replicaSetNodeExecutor = - ServiceContext::declareDecoration<std::unique_ptr<executor::TaskExecutor>>(); + ServiceContext::declareDecoration<std::shared_ptr<executor::TaskExecutor>>(); } // namespace -executor::TaskExecutor* ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor( +std::shared_ptr<executor::TaskExecutor> ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor( ServiceContext* service) { - return replicaSetNodeExecutor(service).get(); + return replicaSetNodeExecutor(service); } -executor::TaskExecutor* ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor( +std::shared_ptr<executor::TaskExecutor> ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor( OperationContext* opCtx) { return getReplicaSetNodeExecutor(opCtx->getServiceContext()); } void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor( - ServiceContext* service, std::unique_ptr<executor::TaskExecutor> executor) { + ServiceContext* service, std::shared_ptr<executor::TaskExecutor> executor) { replicaSetNodeExecutor(service) = std::move(executor); } @@ -196,7 +196,7 @@ StatusWith<BSONObj> ReplicaSetNodeProcessInterface::_executeCommandOnPrimary( auto [promise, future] = makePromiseFuture<executor::TaskExecutor::RemoteCommandCallbackArgs>(); auto promisePtr = std::make_shared<Promise<executor::TaskExecutor::RemoteCommandCallbackArgs>>( std::move(promise)); - auto scheduleResult = _executor->scheduleRemoteCommand( + auto scheduleResult = taskExecutor->scheduleRemoteCommand( std::move(request), [promisePtr](const auto& args) { promisePtr->emplaceValue(args); }); if (!scheduleResult.isOK()) { // Since the command failed to be scheduled, the callback above did not and will not run. diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index 1d7c7b8539d..22af49f29a3 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -44,15 +44,17 @@ class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterfa public: using NonShardServerProcessInterface::NonShardServerProcessInterface; - static executor::TaskExecutor* getReplicaSetNodeExecutor(ServiceContext* service); + static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor( + ServiceContext* service); - static executor::TaskExecutor* getReplicaSetNodeExecutor(OperationContext* opCtx); + static std::shared_ptr<executor::TaskExecutor> getReplicaSetNodeExecutor( + OperationContext* opCtx); static void setReplicaSetNodeExecutor(ServiceContext* service, - std::unique_ptr<executor::TaskExecutor> executor); + std::shared_ptr<executor::TaskExecutor> executor); - ReplicaSetNodeProcessInterface(OperationContext* opCtx, executor::TaskExecutor* executor) - : NonShardServerProcessInterface(opCtx), _executor(executor) {} + ReplicaSetNodeProcessInterface(std::shared_ptr<executor::TaskExecutor> executor) + : NonShardServerProcessInterface(std::move(executor)) {} virtual ~ReplicaSetNodeProcessInterface() = default; @@ -112,8 +114,6 @@ private: * immediately stale upon return. */ bool _canWriteLocally(OperationContext* opCtx, const NamespaceString& ns) const; - - executor::TaskExecutor* _executor; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index d93a18bfb5f..80c72ffb326 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -47,7 +47,6 @@ #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" -#include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/write_ops/cluster_write.h" namespace mongo { @@ -145,23 +144,6 @@ StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::upd return {{response.getN(), response.getNModified()}}; } -unique_ptr<Pipeline, PipelineDeleter> ShardServerProcessInterface::attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - bool allowTargetingShards) { - std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, - PipelineDeleter(expCtx->opCtx)); - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); - - if (!allowTargetingShards || expCtx->ns.db() == "local") { - // If the db is local, this may be a change stream examining the oplog. We know the oplog - // (and any other local collections) will not be sharded. - return attachCursorSourceToPipelineForLocalRead(expCtx, pipeline.release()); - } - return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, pipeline.release()); -} - std::unique_ptr<ShardFilterer> ShardServerProcessInterface::getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const { auto collectionFilter = @@ -315,4 +297,12 @@ void ShardServerProcessInterface::dropCollection(OperationContext* opCtx, << "write concern failed while running command " << cmdObj); } +std::unique_ptr<Pipeline, PipelineDeleter> +ShardServerProcessInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* ownedPipeline, + bool allowTargetingShards) { + return sharded_agg_helpers::attachCursorToPipeline(expCtx, ownedPipeline, allowTargetingShards); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index 4c411cad35b..592d8043017 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -87,11 +87,6 @@ public: bool multi, boost::optional<OID> targetEpoch) final; - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline, - bool allowTargetingShards = true) final; - std::unique_ptr<ShardFilterer> getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const override final; @@ -110,6 +105,17 @@ public: const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) final; void dropCollection(OperationContext* opCtx, const NamespaceString& collection) final; + + /** + * If 'allowTargetingShards' is true, splits the pipeline and dispatch half to the shards, + * leaving the merging half executing in this process after attaching a $mergeCursors. Will + * retry on network errors and also on StaleConfig errors to avoid restarting the entire + * operation. + */ + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline, + bool allowTargetingShards) final; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp b/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp index 12fe460d674..82c36b40ca7 100644 --- a/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp +++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp @@ -59,7 +59,7 @@ public: class ProcessInterfaceStandaloneTest : public AggregationContextFixture { public: auto makeProcessInterface() { - return std::make_unique<MongoProcessInterfaceForTest>(getExpCtx()->opCtx); + return std::make_unique<MongoProcessInterfaceForTest>(nullptr); } }; diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 2d2d798ab92..f6a19dd9851 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -44,6 +44,8 @@ namespace mongo { */ class StubMongoProcessInterface : public MongoProcessInterface { public: + StubMongoProcessInterface() : MongoProcessInterface(nullptr) {} + using MongoProcessInterface::MongoProcessInterface; virtual ~StubMongoProcessInterface() = default; std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator( diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index b68ad0edc34..aace58f534b 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -48,7 +48,6 @@ #include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/semantic_analysis.h" -#include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/cluster_commands_helpers.h" @@ -110,7 +109,7 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi aggReq.setBatchSize(0); auto configCursor = establishCursors(expCtx->opCtx, - Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + expCtx->mongoProcessInterface->taskExecutor, aggReq.getNamespaceString(), ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, {{configShard->getId(), aggReq.serializeToCommandObj().toBson()}}, @@ -165,6 +164,7 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, std::vector<RemoteCursor> establishShardCursors( OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, const NamespaceString& nss, bool hasChangeStream, boost::optional<CachedCollectionRoutingInfo>& routingInfo, @@ -216,7 +216,7 @@ std::vector<RemoteCursor> establishShardCursors( } return establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + std::move(executor), nss, readPref, requests, @@ -581,6 +581,72 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) { } } +/** + * For a sharded collection, establishes remote cursors on each shard that may have results, and + * creates a DocumentSourceMergeCursors stage to merge the remote cursors. Returns a pipeline + * beginning with that DocumentSourceMergeCursors stage. Note that one of the 'remote' cursors might + * be this node itself. + */ +std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, + PipelineDeleter(expCtx->opCtx)); + + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + + // Generate the command object for the targeted shards. + std::vector<BSONObj> rawStages = [&pipeline]() { + auto serialization = pipeline->serialize(); + std::vector<BSONObj> stages; + stages.reserve(serialization.size()); + + for (const auto& stageObj : serialization) { + invariant(stageObj.getType() == BSONType::Object); + stages.push_back(stageObj.getDocument().toBson()); + } + + return stages; + }(); + + AggregationRequest aggRequest(expCtx->ns, rawStages); + + // The default value for 'allowDiskUse' in the AggregationRequest may not match what was set + // on the originating command, so copy it from the ExpressionContext. + aggRequest.setAllowDiskUse(expCtx->allowDiskUse); + + LiteParsedPipeline liteParsedPipeline(aggRequest); + auto hasChangeStream = liteParsedPipeline.hasChangeStream(); + auto shardDispatchResults = dispatchShardPipeline( + aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline)); + + std::vector<ShardId> targetedShards; + targetedShards.reserve(shardDispatchResults.remoteCursors.size()); + for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { + targetedShards.emplace_back(remoteCursor->getShardId().toString()); + } + + std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; + boost::optional<BSONObj> shardCursorsSortSpec = boost::none; + if (shardDispatchResults.splitPipeline) { + mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline); + shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec; + } else { + // We have not split the pipeline, and will execute entirely on the remote shards. Set up an + // empty local pipeline which we will attach the merge cursors stage to. + mergePipeline = Pipeline::parse(std::vector<BSONObj>(), expCtx); + } + + addMergeCursorsSource(mergePipeline.get(), + shardDispatchResults.commandForTargetedShards, + std::move(shardDispatchResults.remoteCursors), + targetedShards, + shardCursorsSortSpec, + hasChangeStream); + + return mergePipeline; +} + } // namespace boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationContext* opCtx, @@ -840,6 +906,7 @@ DispatchShardPipelineResults dispatchShardPipeline( } } else { cursors = establishShardCursors(opCtx, + expCtx->mongoProcessInterface->taskExecutor, expCtx->ns, hasChangeStream, executionNsRoutingInfo, @@ -886,7 +953,6 @@ void addMergeCursorsSource(Pipeline* mergePipeline, std::vector<OwnedRemoteCursor> ownedCursors, const std::vector<ShardId>& targetedShards, boost::optional<BSONObj> shardCursorsSortSpec, - std::shared_ptr<executor::TaskExecutor> executor, bool hasChangeStream) { auto* opCtx = mergePipeline->getContext()->opCtx; AsyncResultsMergerParams armParams; @@ -924,82 +990,17 @@ void addMergeCursorsSource(Pipeline* mergePipeline, // For change streams, we need to set up a custom stage to establish cursors on new shards when // they are added, to ensure we don't miss results from the new shards. - auto mergeCursorsStage = DocumentSourceMergeCursors::create( - std::move(executor), std::move(armParams), mergePipeline->getContext()); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(mergePipeline->getContext(), std::move(armParams)); if (hasChangeStream) { mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create( - mergePipeline->getContext(), - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - mergeCursorsStage, - targetedShards, - cmdSentToShards)); + mergePipeline->getContext(), mergeCursorsStage, targetedShards, cmdSentToShards)); } mergePipeline->addInitialSource(std::move(mergeCursorsStage)); } -std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { - std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, - PipelineDeleter(expCtx->opCtx)); - - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); - - // Generate the command object for the targeted shards. - std::vector<BSONObj> rawStages = [&pipeline]() { - auto serialization = pipeline->serialize(); - std::vector<BSONObj> stages; - stages.reserve(serialization.size()); - - for (const auto& stageObj : serialization) { - invariant(stageObj.getType() == BSONType::Object); - stages.push_back(stageObj.getDocument().toBson()); - } - - return stages; - }(); - - AggregationRequest aggRequest(expCtx->ns, rawStages); - - // The default value for 'allowDiskUse' in the AggregationRequest may not match what was set - // on the originating command, so copy it from the ExpressionContext. - aggRequest.setAllowDiskUse(expCtx->allowDiskUse); - - LiteParsedPipeline liteParsedPipeline(aggRequest); - auto hasChangeStream = liteParsedPipeline.hasChangeStream(); - auto shardDispatchResults = dispatchShardPipeline( - aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline)); - - std::vector<ShardId> targetedShards; - targetedShards.reserve(shardDispatchResults.remoteCursors.size()); - for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { - targetedShards.emplace_back(remoteCursor->getShardId().toString()); - } - - std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; - boost::optional<BSONObj> shardCursorsSortSpec = boost::none; - if (shardDispatchResults.splitPipeline) { - mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline); - shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec; - } else { - // We have not split the pipeline, and will execute entirely on the remote shards. Set up an - // empty local pipeline which we will attach the merge cursors stage to. - mergePipeline = Pipeline::parse(std::vector<BSONObj>(), expCtx); - } - - addMergeCursorsSource(mergePipeline.get(), - shardDispatchResults.commandForTargetedShards, - std::move(shardDispatchResults.remoteCursors), - targetedShards, - shardCursorsSortSpec, - Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), - hasChangeStream); - - return mergePipeline; -} - StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, const NamespaceString& execNss) { // First, verify that there are shards present in the cluster. If not, then we return the @@ -1035,4 +1036,34 @@ bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream) { return nss.isCollectionlessAggregateNS() || hasChangeStream; } +std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* ownedPipeline, + bool allowTargetingShards) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, + PipelineDeleter(expCtx->opCtx)); + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + + auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); + return shardVersionRetry( + expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() { + auto pipelineToTarget = pipeline->clone(); + if (!allowTargetingShards || expCtx->ns.db() == "local") { + // If the db is local, this may be a change stream examining the oplog. We know the + // oplog (and any other local collections) will not be sharded. + return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( + expCtx, pipeline.release()); + } + return targetShardsAndAddMergeCursors(expCtx, pipelineToTarget.release()); + }); +} + +void logFailedRetryAttempt(StringData taskDescription, const DBException& exception) { + LOGV2_DEBUG(4553800, + 3, + "Retrying {task_description}. Got error: {exception}", + "task_description"_attr = taskDescription, + "exception"_attr = exception); +} } // namespace mongo::sharded_agg_helpers diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index d3b3e116e6f..c8bea34d92c 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -149,20 +149,9 @@ void addMergeCursorsSource(Pipeline* mergePipeline, std::vector<OwnedRemoteCursor> ownedCursors, const std::vector<ShardId>& targetedShards, boost::optional<BSONObj> shardCursorsSortSpec, - std::shared_ptr<executor::TaskExecutor> executor, bool hasChangeStream); /** - * For a sharded collection, establishes remote cursors on each shard that may have results, and - * creates a DocumentSourceMergeCursors stage to merge the remove cursors. Returns a pipeline - * beginning with that DocumentSourceMergeCursors stage. Note that one of the 'remote' cursors might - * be this node itself. - */ -std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline); - - -/** * Returns the proper routing table to use for targeting shards: either a historical routing table * based on the global read timestamp if there is an active transaction with snapshot level read * concern or the latest routing table otherwise. @@ -183,5 +172,85 @@ bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream); */ Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx); +/** + * Uses sharded_agg_helpers to split the pipeline and dispatch half to the shards, leaving the + * merging half executing in this process after attaching a $mergeCursors. Will retry on network + * errors and also on StaleConfig errors to avoid restarting the entire operation. + */ +std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* ownedPipeline, + bool allowTargetingShards); + +/** + * Adds a log message with the given message. Simple helper to avoid defining the log component in a + * header file. + */ +void logFailedRetryAttempt(StringData taskDescription, const DBException&); + +/** + * A retry loop which handles errors in ErrorCategory::StaleShardVersionError. When such an error is + * encountered, the CatalogCache is marked for refresh and 'callback' is retried. When retried, + * 'callback' will trigger a refresh of the CatalogCache and block until it's done when it next + * consults the CatalogCache. + */ +template <typename F> +auto shardVersionRetry(OperationContext* opCtx, + CatalogCache* catalogCache, + NamespaceString nss, + StringData taskDescription, + F&& callbackFn) { + size_t numAttempts = 0; + auto logAndTestMaxRetries = [&numAttempts, taskDescription](auto& exception) { + if (++numAttempts <= kMaxNumStaleVersionRetries) { + logFailedRetryAttempt(taskDescription, exception); + return true; + } + exception.addContext(str::stream() + << "Exceeded maximum number of " << kMaxNumStaleVersionRetries + << " retries attempting " << taskDescription); + return false; + }; + while (true) { + catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, numAttempts); + try { + return callbackFn(); + } catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) { + invariant(ex->getDb() == nss.db(), + str::stream() << "StaleDbVersion error on unexpected database. Expected " + << nss.db() << ", received " << ex->getDb()); + // If the database version is stale, refresh its entry in the catalog cache. + catalogCache->onStaleDatabaseVersion(ex->getDb(), ex->getVersionReceived()); + if (!logAndTestMaxRetries(ex)) { + throw; + } + } catch (ExceptionForCat<ErrorCategory::StaleShardVersionError>& e) { + // If the exception provides a shardId, add it to the set of shards requiring a refresh. + // If the cache currently considers the collection to be unsharded, this will trigger an + // epoch refresh. If no shard is provided, then the epoch is stale and we must refresh. + if (auto staleInfo = e.extraInfo<StaleConfigInfo>()) { + invariant(staleInfo->getNss() == nss, + str::stream() << "StaleConfig error on unexpected namespace. Expected " + << nss << ", received " << staleInfo->getNss()); + catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( + opCtx, + nss, + staleInfo->getVersionWanted(), + staleInfo->getVersionReceived(), + staleInfo->getShardId()); + } else { + catalogCache->onEpochChange(nss); + } + if (!logAndTestMaxRetries(e)) { + throw; + } + } catch (ExceptionFor<ErrorCodes::ShardInvalidatedForTargeting>& e) { + if (!logAndTestMaxRetries(e)) { + throw; + } + continue; + } + } +} } // namespace sharded_agg_helpers } // namespace mongo diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp new file mode 100644 index 00000000000..b1590424fb7 --- /dev/null +++ b/src/mongo/db/pipeline/sharded_union_test.cpp @@ -0,0 +1,305 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/document_value/document_value_test_util.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_queue.h" +#include "mongo/db/pipeline/document_source_union_with.h" +#include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h" +#include "mongo/s/query/sharded_agg_test_fixture.h" +#include "mongo/s/stale_exception.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +// Use this new name to register these tests under their own unit test suite. +using ShardedUnionTest = ShardedAggTestFixture; + +TEST_F(ShardedUnionTest, RetriesSubPipelineOnNetworkError) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + + auto pipeline = Pipeline::create( + {DocumentSourceMatch::create(fromjson("{_id: 'unionResult'}"), expCtx())}, expCtx()); + auto unionWith = DocumentSourceUnionWith(expCtx(), std::move(pipeline)); + expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor()); + auto queue = DocumentSourceQueue::create(expCtx()); + unionWith.setSource(queue.get()); + + auto expectedResult = Document{{"_id"_sd, "unionResult"_sd}}; + + auto future = launchAsync([&] { + auto next = unionWith.getNext(); + ASSERT_TRUE(next.isAdvanced()); + auto result = next.releaseDocument(); + ASSERT_DOCUMENT_EQ(result, expectedResult); + ASSERT(unionWith.getNext().isEOF()); + ASSERT(unionWith.getNext().isEOF()); + ASSERT(unionWith.getNext().isEOF()); + }); + + onCommand([&](const executor::RemoteCommandRequest& request) { + return Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"}; + }); + + // Now schedule the response with the expected result. + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(kTestAggregateNss, CursorId{0}, {expectedResult.toBson()}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + future.default_timed_get(); +} + +TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + + auto pipeline = Pipeline::create( + {DocumentSourceMatch::create(fromjson("{_id: 'unionResult'}"), expCtx())}, expCtx()); + auto unionWith = DocumentSourceUnionWith(expCtx(), std::move(pipeline)); + expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor()); + auto queue = DocumentSourceQueue::create(expCtx()); + unionWith.setSource(queue.get()); + + auto expectedResult = Document{{"_id"_sd, "unionResult"_sd}}; + + auto future = launchAsync([&] { + auto next = unionWith.getNext(); + ASSERT_TRUE(next.isAdvanced()); + auto result = next.releaseDocument(); + ASSERT_DOCUMENT_EQ(result, expectedResult); + ASSERT(unionWith.getNext().isEOF()); + ASSERT(unionWith.getNext().isEOF()); + ASSERT(unionWith.getNext().isEOF()); + }); + + // Mock out one error response, then expect a refresh of the sharding catalog for that + // namespace, then mock out a successful response. + onCommand([&](const executor::RemoteCommandRequest& request) { + return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + }); + + // Mock the expected config server queries. + const OID epoch = OID::gen(); + const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); + expectGetCollection(kTestAggregateNss, epoch, shardKeyPattern); + expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + ChunkVersion version(1, 0, epoch); + + ChunkType chunk1(kTestAggregateNss, + {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, + version, + {"0"}); + chunk1.setName(OID::gen()); + version.incMinor(); + + ChunkType chunk2(kTestAggregateNss, + {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, + version, + {"1"}); + chunk2.setName(OID::gen()); + version.incMinor(); + + return std::vector<BSONObj>{chunk1.toConfigBSON(), chunk2.toConfigBSON()}; + }()); + + // That error should be retried, but only the one on that shard. + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(kTestAggregateNss, CursorId{0}, {expectedResult.toBson()}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + future.default_timed_get(); +} + +TEST_F(ShardedUnionTest, CorrectlySplitsSubPipelineIfRefreshedDistributionRequiresIt) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + auto shards = setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + + auto&& parser = AccumulationStatement::getParser("$sum"); + auto accumulatorArg = BSON("" << 1); + auto sumStatement = + parser(expCtx(), accumulatorArg.firstElement(), expCtx()->variablesParseState); + AccumulationStatement countStatement{"count", sumStatement}; + auto pipeline = Pipeline::create( + {DocumentSourceMatch::create(fromjson("{_id: {$gte: 0}}"), expCtx()), + DocumentSourceGroup::create( + expCtx(), ExpressionConstant::create(expCtx(), Value(BSONNULL)), {countStatement})}, + expCtx()); + auto unionWith = DocumentSourceUnionWith(expCtx(), std::move(pipeline)); + expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor()); + auto queue = DocumentSourceQueue::create(expCtx()); + unionWith.setSource(queue.get()); + + auto expectedResult = Document{{"_id"_sd, BSONNULL}, {"count"_sd, 1}}; + + auto future = launchAsync([&] { + auto next = unionWith.getNext(); + ASSERT_TRUE(next.isAdvanced()); + auto result = next.releaseDocument(); + ASSERT_DOCUMENT_EQ(result, expectedResult); + ASSERT(unionWith.getNext().isEOF()); + ASSERT(unionWith.getNext().isEOF()); + ASSERT(unionWith.getNext().isEOF()); + }); + + // With the $match at the front of the sub-pipeline, we should be able to target the request to + // just shard 1. Mock out an error response from that shard, then expect a refresh of the + // sharding catalog for that namespace. + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.target, HostAndPort(shards[1].getHost())); + return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + }); + + // Mock the expected config server queries. Update the distribution as if a chunk [0, 10] was + // created and moved to the first shard. + const OID epoch = OID::gen(); + const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); + expectGetCollection(kTestAggregateNss, epoch, shardKeyPattern); + expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + ChunkVersion version(1, 0, epoch); + + ChunkType chunk1(kTestAggregateNss, + {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, + version, + {shards[0].getName()}); + chunk1.setName(OID::gen()); + version.incMinor(); + + ChunkType chunk2(kTestAggregateNss, + {BSON("_id" << 0), BSON("_id" << 10)}, + version, + {shards[1].getName()}); + chunk2.setName(OID::gen()); + version.incMinor(); + + ChunkType chunk3(kTestAggregateNss, + {BSON("_id" << 10), shardKeyPattern.getKeyPattern().globalMax()}, + version, + {shards[0].getName()}); + chunk3.setName(OID::gen()); + + return std::vector<BSONObj>{ + chunk1.toConfigBSON(), chunk2.toConfigBSON(), chunk3.toConfigBSON()}; + }()); + + // That error should be retried, this time two shards. + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse( + kTestAggregateNss, CursorId{0}, {BSON("_id" << BSONNULL << "count" << 1)}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse( + kTestAggregateNss, CursorId{0}, {BSON("_id" << BSONNULL << "count" << 0)}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + future.default_timed_get(); +} + +TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNotRequire) { + // Sharded by {_id: 1}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1". + auto shards = setupNShards(2); + loadRoutingTableWithTwoChunksAndTwoShards(kTestAggregateNss); + + auto&& parser = AccumulationStatement::getParser("$sum"); + auto accumulatorArg = BSON("" << 1); + auto sumExpression = + parser(expCtx(), accumulatorArg.firstElement(), expCtx()->variablesParseState); + AccumulationStatement countStatement{"count", sumExpression}; + auto pipeline = Pipeline::create( + {DocumentSourceGroup::create( + expCtx(), ExpressionConstant::create(expCtx(), Value(BSONNULL)), {countStatement})}, + expCtx()); + auto unionWith = DocumentSourceUnionWith(expCtx(), std::move(pipeline)); + expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor()); + auto queue = DocumentSourceQueue::create(expCtx()); + unionWith.setSource(queue.get()); + + auto expectedResult = Document{{"_id"_sd, BSONNULL}, {"count"_sd, 1}}; + + auto future = launchAsync([&] { + auto next = unionWith.getNext(); + ASSERT_TRUE(next.isAdvanced()); + auto result = next.releaseDocument(); + ASSERT_DOCUMENT_EQ(result, expectedResult); + ASSERT(unionWith.getNext().isEOF()); + ASSERT(unionWith.getNext().isEOF()); + ASSERT(unionWith.getNext().isEOF()); + }); + + // Mock out an error response from both shards, then expect a refresh of the sharding catalog + // for that namespace, then mock out a successful response. + onCommand([&](const executor::RemoteCommandRequest& request) { + return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + }); + onCommand([&](const executor::RemoteCommandRequest& request) { + return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + }); + + + // Mock the expected config server queries. Update the distribution so that all chunks are on + // the same shard. + const OID epoch = OID::gen(); + const ShardKeyPattern shardKeyPattern(BSON("_id" << 1)); + expectGetCollection(kTestAggregateNss, epoch, shardKeyPattern); + expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + ChunkVersion version(1, 0, epoch); + + ChunkType chunk1(kTestAggregateNss, + {shardKeyPattern.getKeyPattern().globalMin(), + shardKeyPattern.getKeyPattern().globalMax()}, + version, + {shards[0].getName()}); + chunk1.setName(OID::gen()); + + return std::vector<BSONObj>{chunk1.toConfigBSON()}; + }()); + + // That error should be retried, this time targetting only one shard. + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.target, HostAndPort(shards[0].getHost())); + return CursorResponse(kTestAggregateNss, CursorId{0}, {expectedResult.toBson()}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + future.default_timed_get(); +} + +} // namespace +} // namespace mongo |