summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2020-04-28 13:34:51 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-28 18:40:48 +0000
commit93476f545de27ee61fd69eeab23adbff7f57b932 (patch)
tree3be7cb29759d7d6b32590d3199675a79e73324d8 /src/mongo/db/pipeline
parent49159e1cf859d21c767f6b582dd6e6b2d675808d (diff)
downloadmongo-93476f545de27ee61fd69eeab23adbff7f57b932.tar.gz
SERVER-46255 Use killOperations to cleanup dangling remote requests
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.cpp4
-rw-r--r--src/mongo/db/pipeline/sharded_union_test.cpp39
3 files changed, 30 insertions, 16 deletions
diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
index b141c3b72a9..2ea208e8e7c 100644
--- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
+++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
@@ -209,7 +209,8 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) {
// Mock out one error response, then expect a refresh of the sharding catalog for that
// namespace, then mock out a successful response.
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
// Mock the expected config server queries.
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 50f928e265f..a85781f3ca1 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -183,7 +183,7 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() {
if (_executionState == ExecutionProgress::kStartingSubPipeline) {
auto serializedPipe = _pipeline->serializeToBson();
LOGV2_DEBUG(23869,
- 3,
+ 1,
"$unionWith attaching cursor to pipeline {pipeline}",
"pipeline"_attr = serializedPipe);
try {
@@ -196,7 +196,7 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() {
ExpressionContext::ResolvedNamespace{e->getNamespace(), e->getPipeline()},
serializedPipe);
LOGV2_DEBUG(4556300,
- 0,
+ 3,
"$unionWith found view definition. ns: {ns}, pipeline: {pipeline}. New "
"$unionWith sub-pipeline: {new_pipe}",
"ns"_attr = e->getNamespace(),
diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp
index 9295efb5e0a..8756cb0c119 100644
--- a/src/mongo/db/pipeline/sharded_union_test.cpp
+++ b/src/mongo/db/pipeline/sharded_union_test.cpp
@@ -100,10 +100,17 @@ TEST_F(ShardedUnionTest, ForwardsMaxTimeMSToRemotes) {
expCtx()->opCtx->setDeadlineAfterNowBy(Milliseconds(15), ErrorCodes::MaxTimeMSExpired);
auto future = launchAsync([&] {
+ // Expect one result from each host.
auto next = unionWith.getNext();
ASSERT_TRUE(next.isAdvanced());
auto result = next.releaseDocument();
ASSERT_DOCUMENT_EQ(result, expectedResult);
+
+ next = unionWith.getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ result = next.releaseDocument();
+ ASSERT_DOCUMENT_EQ(result, expectedResult);
+
ASSERT(unionWith.getNext().isEOF());
ASSERT(unionWith.getNext().isEOF());
ASSERT(unionWith.getNext().isEOF());
@@ -119,6 +126,8 @@ TEST_F(ShardedUnionTest, ForwardsMaxTimeMSToRemotes) {
onCommand(assertHasExpectedMaxTimeMSAndReturnResult);
onCommand(assertHasExpectedMaxTimeMSAndReturnResult);
+
+ future.default_timed_get();
}
TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) {
@@ -148,7 +157,8 @@ TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) {
// Mock out one error response, then expect a refresh of the sharding catalog for that
// namespace, then mock out a successful response.
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
// Mock the expected config server queries.
@@ -221,7 +231,8 @@ TEST_F(ShardedUnionTest, CorrectlySplitsSubPipelineIfRefreshedDistributionRequir
// sharding catalog for that namespace.
onCommand([&](const executor::RemoteCommandRequest& request) {
ASSERT_EQ(request.target, HostAndPort(shards[1].getHost()));
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
// Mock the expected config server queries. Update the distribution as if a chunk [0, 10] was
@@ -305,13 +316,14 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo
// Mock out an error response from both shards, then expect a refresh of the sharding catalog
// for that namespace, then mock out a successful response.
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
-
// Mock the expected config server queries. Update the distribution so that all chunks are on
// the same shard.
const OID epoch = OID::gen();
@@ -373,14 +385,15 @@ TEST_F(ShardedUnionTest, IncorporatesViewDefinitionAndRetriesWhenViewErrorReceiv
// Mock out one error response, then expect a refresh of the sharding catalog for that
// namespace, then mock out a successful response.
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ResolvedView{expectedBackingNs,
- {fromjson("{$group: {_id: '$groupKey'}}"),
- // Prevent the $match from being pushed into the shards where it
- // would not execute in this mocked environment.
- fromjson("{$_internalInhibitOptimization: {}}"),
- fromjson("{$match: {_id: 'unionResult'}}")},
- BSONObj()},
- "It was a view!"_sd};
+ return createErrorCursorResponse(
+ Status{ResolvedView{expectedBackingNs,
+ {fromjson("{$group: {_id: '$groupKey'}}"),
+ // Prevent the $match from being pushed into the shards where it
+ // would not execute in this mocked environment.
+ fromjson("{$_internalInhibitOptimization: {}}"),
+ fromjson("{$match: {_id: 'unionResult'}}")},
+ BSONObj()},
+ "It was a view!"_sd});
});
// That error should be incorporated, then we should target both shards. The results should be