diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2020-04-28 13:34:51 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-28 18:40:48 +0000 |
commit | 93476f545de27ee61fd69eeab23adbff7f57b932 (patch) | |
tree | 3be7cb29759d7d6b32590d3199675a79e73324d8 /src/mongo/db/pipeline | |
parent | 49159e1cf859d21c767f6b582dd6e6b2d675808d (diff) | |
download | mongo-93476f545de27ee61fd69eeab23adbff7f57b932.tar.gz |
SERVER-46255 Use killOperations to cleanup dangling remote requests
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_union_with.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_union_test.cpp | 39 |
3 files changed, 30 insertions, 16 deletions
diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp index b141c3b72a9..2ea208e8e7c 100644 --- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp +++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp @@ -209,7 +209,8 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) { // Mock out one error response, then expect a refresh of the sharding catalog for that // namespace, then mock out a successful response. onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); // Mock the expected config server queries. diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index 50f928e265f..a85781f3ca1 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -183,7 +183,7 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() { if (_executionState == ExecutionProgress::kStartingSubPipeline) { auto serializedPipe = _pipeline->serializeToBson(); LOGV2_DEBUG(23869, - 3, + 1, "$unionWith attaching cursor to pipeline {pipeline}", "pipeline"_attr = serializedPipe); try { @@ -196,7 +196,7 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() { ExpressionContext::ResolvedNamespace{e->getNamespace(), e->getPipeline()}, serializedPipe); LOGV2_DEBUG(4556300, - 0, + 3, "$unionWith found view definition. ns: {ns}, pipeline: {pipeline}. New " "$unionWith sub-pipeline: {new_pipe}", "ns"_attr = e->getNamespace(), diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp index 9295efb5e0a..8756cb0c119 100644 --- a/src/mongo/db/pipeline/sharded_union_test.cpp +++ b/src/mongo/db/pipeline/sharded_union_test.cpp @@ -100,10 +100,17 @@ TEST_F(ShardedUnionTest, ForwardsMaxTimeMSToRemotes) { expCtx()->opCtx->setDeadlineAfterNowBy(Milliseconds(15), ErrorCodes::MaxTimeMSExpired); auto future = launchAsync([&] { + // Expect one result from each host. auto next = unionWith.getNext(); ASSERT_TRUE(next.isAdvanced()); auto result = next.releaseDocument(); ASSERT_DOCUMENT_EQ(result, expectedResult); + + next = unionWith.getNext(); + ASSERT_TRUE(next.isAdvanced()); + result = next.releaseDocument(); + ASSERT_DOCUMENT_EQ(result, expectedResult); + ASSERT(unionWith.getNext().isEOF()); ASSERT(unionWith.getNext().isEOF()); ASSERT(unionWith.getNext().isEOF()); @@ -119,6 +126,8 @@ TEST_F(ShardedUnionTest, ForwardsMaxTimeMSToRemotes) { onCommand(assertHasExpectedMaxTimeMSAndReturnResult); onCommand(assertHasExpectedMaxTimeMSAndReturnResult); + + future.default_timed_get(); } TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) { @@ -148,7 +157,8 @@ TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) { // Mock out one error response, then expect a refresh of the sharding catalog for that // namespace, then mock out a successful response. onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); // Mock the expected config server queries. @@ -221,7 +231,8 @@ TEST_F(ShardedUnionTest, CorrectlySplitsSubPipelineIfRefreshedDistributionRequir // sharding catalog for that namespace. onCommand([&](const executor::RemoteCommandRequest& request) { ASSERT_EQ(request.target, HostAndPort(shards[1].getHost())); - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); // Mock the expected config server queries. Update the distribution as if a chunk [0, 10] was @@ -305,13 +316,14 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo // Mock out an error response from both shards, then expect a refresh of the sharding catalog // for that namespace, then mock out a successful response. onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); - // Mock the expected config server queries. Update the distribution so that all chunks are on // the same shard. const OID epoch = OID::gen(); @@ -373,14 +385,15 @@ TEST_F(ShardedUnionTest, IncorporatesViewDefinitionAndRetriesWhenViewErrorReceiv // Mock out one error response, then expect a refresh of the sharding catalog for that // namespace, then mock out a successful response. onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ResolvedView{expectedBackingNs, - {fromjson("{$group: {_id: '$groupKey'}}"), - // Prevent the $match from being pushed into the shards where it - // would not execute in this mocked environment. - fromjson("{$_internalInhibitOptimization: {}}"), - fromjson("{$match: {_id: 'unionResult'}}")}, - BSONObj()}, - "It was a view!"_sd}; + return createErrorCursorResponse( + Status{ResolvedView{expectedBackingNs, + {fromjson("{$group: {_id: '$groupKey'}}"), + // Prevent the $match from being pushed into the shards where it + // would not execute in this mocked environment. + fromjson("{$_internalInhibitOptimization: {}}"), + fromjson("{$match: {_id: 'unionResult'}}")}, + BSONObj()}, + "It was a view!"_sd}); }); // That error should be incorporated, then we should target both shards. The results should be |