author     Nick Zolnierz <nicholas.zolnierz@mongodb.com>      2020-04-28 13:34:51 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2020-05-06 14:38:38 +0000
commit     cd42cb1a51f1e2a6c02759ad5fa1523b5b65faa9
tree       08912dad354cb657c88b8a5d9d6f1452ff39925e
parent     d0e8908d86cc38699eb70d8e26c57abd5734e21c
download   mongo-cd42cb1a51f1e2a6c02759ad5fa1523b5b65faa9.tar.gz

SERVER-46255 Use killOperations to cleanup dangling remote requests

(cherry picked from commit 93476f545de27ee61fd69eeab23adbff7f57b932)
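
In outline, establishCursors() now generates one OperationKey (a UUID) per call, stamps it onto every remote cursor-establishing request as 'clientOperationKey', and, if the call fails or is interrupted, schedules a fire-and-forget _killOperations against every remote that may still hold an open cursor. A minimal sketch of the request-side stamping, using the same APIs as the diff below (the helper name attachOperationKey is illustrative, not part of the commit):

    #include <utility>
    #include <vector>

    #include "mongo/bson/bsonobjbuilder.h"
    #include "mongo/s/async_requests_sender.h"
    #include "mongo/util/uuid.h"

    namespace mongo {

    // Hypothetical helper; mirrors the loop added to establishCursors() in this commit.
    std::vector<AsyncRequestsSender::Request> attachOperationKey(
        const std::vector<std::pair<ShardId, BSONObj>>& remotes, const UUID& opKey) {
        std::vector<AsyncRequestsSender::Request> requests;
        requests.reserve(remotes.size());
        for (const auto& remote : remotes) {
            // Stamp the same per-call key onto every remote request so that any of
            // them can later be killed via _killOperations if we are interrupted.
            BSONObjBuilder requestWithOpKey(remote.second);
            opKey.appendToBuilder(&requestWithOpKey, "clientOperationKey");
            requests.emplace_back(remote.first, requestWithOpKey.obj());
        }
        return requests;
    }

    }  // namespace mongo

The key is generated fresh per call rather than inherited from the OperationContext so that a node targeting itself never sees two concurrent operations with the same key (see the comment added in establish_cursors.cpp below).
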
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 5
-rw-r--r--  jstests/aggregation/sharded_agg_cleanup_on_error.js | 2
-rw-r--r--  jstests/noPassthrough/max_time_ms_repl_targeting.js | 3
-rw-r--r--  jstests/sharding/lookup.js | 4
-rw-r--r--  jstests/sharding/lookup_mongod_unaware.js | 7
-rw-r--r--  jstests/sharding/lookup_on_shard.js | 5
-rw-r--r--  jstests/sharding/lookup_stale_mongos.js | 5
-rw-r--r--  src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp | 3
-rw-r--r--  src/mongo/db/pipeline/document_source_union_with.cpp | 4
-rw-r--r--  src/mongo/db/pipeline/sharded_union_test.cpp | 39
-rw-r--r--  src/mongo/s/async_requests_sender.cpp | 17
-rw-r--r--  src/mongo/s/query/establish_cursors.cpp | 156
-rw-r--r--  src/mongo/s/query/establish_cursors.h | 9
-rw-r--r--  src/mongo/s/query/establish_cursors_test.cpp | 167
-rw-r--r--  src/mongo/s/sharding_router_test_fixture.h | 11

15 files changed, 374 insertions, 63 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 31d15d5b8ec..8aaa63fee72 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -31,6 +31,11 @@ selector:
- jstests/sharding/retryable_write_error_labels.js
# Enable when SERVER-43310 is backported
- jstests/sharding/cluster_create_indexes_always_routes_through_primary.js
+ # These tests use the flag-guarded sharded $lookup parameter, which results in cursors being
+ # established from a shard instead of mongos. As of SERVER-46255, these requests will include the
+ # 'clientOperationKey' which is not recognizable on 'last-stable' shards.
+ - jstests/sharding/lookup_on_shard.js
+ - jstests/sharding/lookup_stale_mongos.js
executor:
config:
diff --git a/jstests/aggregation/sharded_agg_cleanup_on_error.js b/jstests/aggregation/sharded_agg_cleanup_on_error.js
index 1a57fffe018..73b5757379b 100644
--- a/jstests/aggregation/sharded_agg_cleanup_on_error.js
+++ b/jstests/aggregation/sharded_agg_cleanup_on_error.js
@@ -59,7 +59,7 @@ try {
assert.commandWorked(shard1DB.adminCommand(
{configureFailPoint: kFailPointName, mode: "alwaysOn", data: kFailpointOptions}));
- // Issue an aggregregation that will fail during a getMore on shard 0, and make sure that
+ // Issue an aggregation that will fail during a getMore on shard 0, and make sure that
// this correctly kills the hanging cursor on shard 1. Use $_internalSplitPipeline to ensure
// that this pipeline merges on mongos.
assertFailsAndCleansUpCursors({
diff --git a/jstests/noPassthrough/max_time_ms_repl_targeting.js b/jstests/noPassthrough/max_time_ms_repl_targeting.js
index de90a7a0d24..984157264a2 100644
--- a/jstests/noPassthrough/max_time_ms_repl_targeting.js
+++ b/jstests/noPassthrough/max_time_ms_repl_targeting.js
@@ -25,7 +25,8 @@ const tryFiveTimes = function(name, f) {
continue;
}
- jsTestLog(`Failed 5 times in test ${name}. There is probably a bug here.`);
+ jsTestLog(`Failed 5 times in test ${
+ name}. There is probably a bug here. Latest assertion: ${tojson(e)}`);
throw e;
}
}
diff --git a/jstests/sharding/lookup.js b/jstests/sharding/lookup.js
index 82a8c63624b..b95dfbf3991 100644
--- a/jstests/sharding/lookup.js
+++ b/jstests/sharding/lookup.js
@@ -1,5 +1,9 @@
// Basic $lookup regression tests.
+// This test uses the flag-guarded sharded $lookup parameter, which results in cursors being
+// established from a shard instead of mongos. As of SERVER-46255, these requests will include the
+// 'clientOperationKey' which is not recognizable on 'last-stable' shards.
+// @tags: [requires_fcv_44]
(function() {
"use strict";
diff --git a/jstests/sharding/lookup_mongod_unaware.js b/jstests/sharding/lookup_mongod_unaware.js
index 03f12ba69a7..5d50978e032 100644
--- a/jstests/sharding/lookup_mongod_unaware.js
+++ b/jstests/sharding/lookup_mongod_unaware.js
@@ -3,8 +3,11 @@
// when it's not, and likewise when it thinks the collection is unsharded but is actually sharded.
//
// We restart a mongod to cause it to forget that a collection was sharded. When restarted, we
-// expect it to still have all the previous data.
-// @tags: [requires_persistence]
+// expect it to still have all the previous data. Also, this test uses the flag-guarded sharded
+// $lookup parameter, which results in cursors being established from a shard instead of mongos. As
+// of SERVER-46255, these requests will include the 'clientOperationKey' which is not recognizable
+// on 'last-stable' shards.
+// @tags: [requires_persistence, requires_fcv_44]
(function() {
"use strict";
diff --git a/jstests/sharding/lookup_on_shard.js b/jstests/sharding/lookup_on_shard.js
index 2dc96378fab..a5409efc0ed 100644
--- a/jstests/sharding/lookup_on_shard.js
+++ b/jstests/sharding/lookup_on_shard.js
@@ -1,4 +1,9 @@
// Test that a pipeline with a $lookup stage on a sharded foreign collection may be run on a mongod.
+
+// This test uses the flag-guarded sharded $lookup parameter, which results in cursors being
+// established from a shard instead of mongos. As of SERVER-46255, these requests will include the
+// 'clientOperationKey' which is not recognizable on 'last-stable' shards.
+// @tags: [requires_fcv_44]
(function() {
load("jstests/noPassthrough/libs/server_parameter_helpers.js"); // For setParameterOnAllHosts.
diff --git a/jstests/sharding/lookup_stale_mongos.js b/jstests/sharding/lookup_stale_mongos.js
index 2c74af07b28..d3d56c0b160 100644
--- a/jstests/sharding/lookup_stale_mongos.js
+++ b/jstests/sharding/lookup_stale_mongos.js
@@ -2,6 +2,11 @@
// local and/or foreign collections. This includes when mongos thinks the collection is sharded
// when it's not, and likewise when mongos thinks the collection is unsharded but is actually
// sharded.
+
+// This test uses the flag-guarded sharded $lookup parameter, which results in cursors being
+// established from a shard instead of mongos. As of SERVER-46255, these requests will include the
+// 'clientOperationKey' which is not recognizable on 'last-stable' shards.
+// @tags: [requires_fcv_44]
(function() {
"use strict";
diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
index b141c3b72a9..2ea208e8e7c 100644
--- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
+++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
@@ -209,7 +209,8 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) {
// Mock out one error response, then expect a refresh of the sharding catalog for that
// namespace, then mock out a successful response.
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
// Mock the expected config server queries.
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 1a8454464a9..fe3dc625a66 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -185,7 +185,7 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() {
if (_executionState == ExecutionProgress::kStartingSubPipeline) {
auto serializedPipe = _pipeline->serializeToBson();
LOGV2_DEBUG(23869,
- 3,
+ 1,
"$unionWith attaching cursor to pipeline {pipeline}",
"pipeline"_attr = serializedPipe);
try {
@@ -198,7 +198,7 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() {
ExpressionContext::ResolvedNamespace{e->getNamespace(), e->getPipeline()},
serializedPipe);
LOGV2_DEBUG(4556300,
- 0,
+ 3,
"$unionWith found view definition. ns: {ns}, pipeline: {pipeline}. New "
"$unionWith sub-pipeline: {new_pipe}",
"ns"_attr = e->getNamespace(),
diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp
index 9295efb5e0a..8756cb0c119 100644
--- a/src/mongo/db/pipeline/sharded_union_test.cpp
+++ b/src/mongo/db/pipeline/sharded_union_test.cpp
@@ -100,10 +100,17 @@ TEST_F(ShardedUnionTest, ForwardsMaxTimeMSToRemotes) {
expCtx()->opCtx->setDeadlineAfterNowBy(Milliseconds(15), ErrorCodes::MaxTimeMSExpired);
auto future = launchAsync([&] {
+ // Expect one result from each host.
auto next = unionWith.getNext();
ASSERT_TRUE(next.isAdvanced());
auto result = next.releaseDocument();
ASSERT_DOCUMENT_EQ(result, expectedResult);
+
+ next = unionWith.getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ result = next.releaseDocument();
+ ASSERT_DOCUMENT_EQ(result, expectedResult);
+
ASSERT(unionWith.getNext().isEOF());
ASSERT(unionWith.getNext().isEOF());
ASSERT(unionWith.getNext().isEOF());
@@ -119,6 +126,8 @@ TEST_F(ShardedUnionTest, ForwardsMaxTimeMSToRemotes) {
onCommand(assertHasExpectedMaxTimeMSAndReturnResult);
onCommand(assertHasExpectedMaxTimeMSAndReturnResult);
+
+ future.default_timed_get();
}
TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) {
@@ -148,7 +157,8 @@ TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) {
// Mock out one error response, then expect a refresh of the sharding catalog for that
// namespace, then mock out a successful response.
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
// Mock the expected config server queries.
@@ -221,7 +231,8 @@ TEST_F(ShardedUnionTest, CorrectlySplitsSubPipelineIfRefreshedDistributionRequir
// sharding catalog for that namespace.
onCommand([&](const executor::RemoteCommandRequest& request) {
ASSERT_EQ(request.target, HostAndPort(shards[1].getHost()));
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
// Mock the expected config server queries. Update the distribution as if a chunk [0, 10] was
@@ -305,13 +316,14 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo
// Mock out an error response from both shards, then expect a refresh of the sharding catalog
// for that namespace, then mock out a successful response.
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"};
+ return createErrorCursorResponse(
+ Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"});
});
-
// Mock the expected config server queries. Update the distribution so that all chunks are on
// the same shard.
const OID epoch = OID::gen();
@@ -373,14 +385,15 @@ TEST_F(ShardedUnionTest, IncorporatesViewDefinitionAndRetriesWhenViewErrorReceiv
// Mock out one error response, then expect a refresh of the sharding catalog for that
// namespace, then mock out a successful response.
onCommand([&](const executor::RemoteCommandRequest& request) {
- return Status{ResolvedView{expectedBackingNs,
- {fromjson("{$group: {_id: '$groupKey'}}"),
- // Prevent the $match from being pushed into the shards where it
- // would not execute in this mocked environment.
- fromjson("{$_internalInhibitOptimization: {}}"),
- fromjson("{$match: {_id: 'unionResult'}}")},
- BSONObj()},
- "It was a view!"_sd};
+ return createErrorCursorResponse(
+ Status{ResolvedView{expectedBackingNs,
+ {fromjson("{$group: {_id: '$groupKey'}}"),
+ // Prevent the $match from being pushed into the shards where it
+ // would not execute in this mocked environment.
+ fromjson("{$_internalInhibitOptimization: {}}"),
+ fromjson("{$match: {_id: 'unionResult'}}")},
+ BSONObj()},
+ "It was a view!"_sd});
});
// That error should be incorporated, then we should target both shards. The results should be
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 65e1b80592e..4ee7bf893c7 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -57,6 +57,8 @@ namespace {
// Maximum number of retries for network and replication notMaster errors (per host).
const int kMaxNumFailedHostRetryAttempts = 3;
+MONGO_FAIL_POINT_DEFINE(hangBeforeSchedulingRemoteCommand);
+
} // namespace
AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
@@ -182,6 +184,21 @@ SemiFuture<std::vector<HostAndPort>> AsyncRequestsSender::RemoteData::resolveSha
auto AsyncRequestsSender::RemoteData::scheduleRemoteCommand(std::vector<HostAndPort>&& hostAndPorts)
-> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
+ hangBeforeSchedulingRemoteCommand.executeIf(
+ [&](const BSONObj& data) {
+ while (MONGO_unlikely(hangBeforeSchedulingRemoteCommand.shouldFail())) {
+ LOGV2(4625505,
+ "Hanging in ARS due to "
+ "'hangBeforeSchedulingRemoteCommand' failpoint");
+ sleepmillis(100);
+ }
+ },
+ [&](const BSONObj& data) {
+ return MONGO_unlikely(std::count(hostAndPorts.begin(),
+ hostAndPorts.end(),
+ HostAndPort(data.getStringField("hostAndPort"))));
+ });
+
auto hedgeOptions = extractHedgeOptions(_cmdObj, _ars->_readPreference);
executor::RemoteCommandRequestOnAny request(std::move(hostAndPorts),
_ars->_db,
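
The hangBeforeSchedulingRemoteCommand failpoint added above is data-driven: executeIf() only blocks when the resolved target matches the 'hostAndPort' field of the failpoint data, so a test can freeze the request headed to one specific shard while requests to other hosts proceed. The new tests later in this commit drive it roughly like this (sketch; kTestShardHosts is the fixture's host list):

    // Block only the request destined for shard 1; requests to other hosts are unaffected.
    auto fp = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand");
    invariant(fp);
    fp->setMode(FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[1].toString()));

    // ... interrupt the operation and mock any in-flight responses here ...

    // Let the ARS continue; it should now observe the interruption and schedule cleanup.
    fp->setMode(FailPoint::off);
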
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index d72f0493a55..8217de1e2ec 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -33,21 +33,65 @@
#include "mongo/s/query/establish_cursors.h"
+#include "mongo/client/connpool.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/cursor_id.h"
#include "mongo/db/query/cursor_response.h"
-#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/killcursors_request.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/remote_command_response.h"
+#include "mongo/logv2/log.h"
#include "mongo/s/grid.h"
#include "mongo/s/multi_statement_transaction_requests_sender.h"
#include "mongo/util/assert_util.h"
-#include "mongo/util/scopeguard.h"
namespace mongo {
+namespace {
+
+void killOpOnShards(std::shared_ptr<executor::TaskExecutor> executor,
+ const NamespaceString& nss,
+ std::vector<HostAndPort> remotes,
+ const ReadPreferenceSetting& readPref,
+ UUID opKey) noexcept {
+ try {
+ ThreadClient tc("establishCursors cleanup", getGlobalServiceContext());
+ auto opCtx = tc->makeOperationContext();
+
+ for (auto&& host : remotes) {
+ executor::RemoteCommandRequest request(
+ host,
+ "admin",
+ BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(opKey)),
+ opCtx.get(),
+ executor::RemoteCommandRequestBase::kNoTimeout,
+ boost::none,
+ executor::RemoteCommandRequestBase::FireAndForgetMode::kOn);
+
+ // We do not process the response to the killOperations request (we make a good-faith
+ // attempt at cleaning up the cursors, but ignore any returned errors).
+ uassertStatusOK(executor->scheduleRemoteCommand(request, [&](auto const& args) {
+ if (!args.response.isOK()) {
+ LOGV2_DEBUG(4625504,
+ 2,
+ "killOperations for {remote} failed with {status}",
+ "remote"_attr = host.toString(),
+ "error"_attr = args.response);
+ return;
+ }
+ }));
+ }
+ } catch (const AssertionException& ex) {
+ LOGV2_DEBUG(4625503,
+ 2,
+ "Failed to cleanup remote operations: {error}",
+ "error"_attr = ex.toStatus());
+ }
+}
+
+} // namespace
+
std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
const NamespaceString& nss,
@@ -57,25 +101,49 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
Shard::RetryPolicy retryPolicy) {
// Construct the requests
std::vector<AsyncRequestsSender::Request> requests;
+
+ // Generate an OperationKey to attach to each remote request. This will allow us to kill any
+ // outstanding requests in case we're interrupted or one of the remotes returns an error. Note
+ // that although the opCtx may have an OperationKey set on it already, do not inherit it here
+ // because we may target ourselves which implies the same node receiving multiple operations
+ // with the same opKey.
+ // TODO SERVER-47261 management of the opKey should move to the ARS.
+ auto opKey = UUID::gen();
for (const auto& remote : remotes) {
- requests.emplace_back(remote.first, remote.second);
+ BSONObjBuilder requestWithOpKey(remote.second);
+ opKey.appendToBuilder(&requestWithOpKey, "clientOperationKey");
+ requests.emplace_back(remote.first, requestWithOpKey.obj());
}
+ LOGV2_DEBUG(4625502,
+ 3,
+ "Establishing cursors on {opId} for {nRemotes} remotes with operation key {opKey}",
+ "opId"_attr = opCtx->getOpID(),
+ "nRemotes"_attr = remotes.size(),
+ "opKey"_attr = opKey);
+
// Send the requests
MultiStatementTransactionRequestsSender ars(
opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy);
std::vector<RemoteCursor> remoteCursors;
+
+ // Keep track of any remotes which may have an open cursor.
+ std::vector<HostAndPort> remotesToClean;
+
try {
// Get the responses
while (!ars.done()) {
auto response = ars.next();
+
try {
+ if (response.shardHostAndPort)
+ remotesToClean.push_back(*response.shardHostAndPort);
+
// Note the shardHostAndPort may not be populated if there was an error, so be sure
// to do this after parsing the cursor response to ensure the response was ok.
// Additionally, be careful not to push into 'remoteCursors' until we are sure we
- // have a valid cursor, since the error handling path will attempt to clean up
- // anything in 'remoteCursors'
+ // have a valid cursor.
auto cursors = CursorResponse::parseFromBSONMany(
uassertStatusOK(std::move(response.swResponse)).data);
@@ -86,6 +154,11 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
remoteCursor.setShardId(std::move(response.shardId));
remoteCursor.setHostAndPort(*response.shardHostAndPort);
remoteCursors.push_back(std::move(remoteCursor));
+ } else {
+ // Remote responded with a failure, do not attempt to clean up.
+ remotesToClean.erase(std::remove(remotesToClean.begin(),
+ remotesToClean.end(),
+ *response.shardHostAndPort));
}
}
@@ -93,12 +166,12 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
for (auto& cursor : cursors) {
uassertStatusOK(cursor.getStatus());
}
-
} catch (const AssertionException& ex) {
// Retriable errors are swallowed if 'allowPartialResults' is true. Targeting shard
// replica sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
bool isEligibleException = (isMongosRetriableError(ex.code()) ||
ex.code() == ErrorCodes::FailedToSatisfyReadPreference);
+
// Fail if the exception is something other than a retriable or read preference
// error, or if the 'allowPartialResults' query parameter was not enabled.
if (!allowPartialResults || !isEligibleException) {
@@ -111,10 +184,9 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
}
}
return remoteCursors;
- } catch (const DBException&) {
+ } catch (const DBException& ex) {
// If one of the remotes had an error, we make a best effort to finish retrieving responses
- // for other requests that were already sent, so that we can send killCursors to any cursors
- // that we know were established.
+ // for other requests that were already sent.
try {
// Do not schedule any new requests.
ars.stopRetrying();
@@ -123,23 +195,52 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
while (!ars.done()) {
auto response = ars.next();
- // Check if the response contains an established cursor, and if so, store it.
- StatusWith<CursorResponse> swCursorResponse(
- response.swResponse.isOK()
- ? CursorResponse::parseFromBSON(response.swResponse.getValue().data)
- : response.swResponse.getStatus());
-
- if (swCursorResponse.isOK()) {
- RemoteCursor cursor;
- cursor.setShardId(std::move(response.shardId));
- cursor.setHostAndPort(*response.shardHostAndPort);
- cursor.setCursorResponse(std::move(swCursorResponse.getValue()));
- remoteCursors.push_back(std::move(cursor));
+ if (response.shardHostAndPort)
+ remotesToClean.push_back(*response.shardHostAndPort);
+
+ if (response.swResponse.isOK()) {
+ // Check if the response contains an established cursor, and if so, store it.
+ StatusWith<CursorResponse> swCursorResponse =
+ CursorResponse::parseFromBSON(response.swResponse.getValue().data);
+
+ if (swCursorResponse.isOK()) {
+ RemoteCursor cursor;
+ cursor.setShardId(std::move(response.shardId));
+ cursor.setHostAndPort(*response.shardHostAndPort);
+ cursor.setCursorResponse(std::move(swCursorResponse.getValue()));
+ remoteCursors.push_back(std::move(cursor));
+ } else {
+ // Remote responded with a failure, do not attempt to clean up.
+ remotesToClean.erase(std::remove(remotesToClean.begin(),
+ remotesToClean.end(),
+ *response.shardHostAndPort));
+ }
}
}
- // Schedule killCursors against all cursors that were established.
- killRemoteCursors(opCtx, executor.get(), std::move(remoteCursors), nss);
+ LOGV2(4625501,
+ "ARS failed with {status}, attempting to clean up {nRemotes} remote operations",
+ "status"_attr = ex.toStatus(),
+ "nRemotes"_attr = remotesToClean.size());
+
+ // Check whether we have any remote operations to kill.
+ if (remotesToClean.size() > 0) {
+ // Schedule killOperations against all cursors that were established. Make sure to
+ // capture arguments by value since the cleanup work may get scheduled after
+ // returning from this function.
+ MONGO_COMPILER_VARIABLE_UNUSED auto cbHandle = uassertStatusOK(
+ executor->scheduleWork([executor,
+ nss,
+ readPref,
+ remotesToClean{std::move(remotesToClean)},
+ opKey{std::move(opKey)}](
+ const executor::TaskExecutor::CallbackArgs& args) {
+ uassertStatusOKWithContext(args.status,
+ "Failed to schedule remote cursor cleanup");
+ killOpOnShards(
+ executor, nss, std::move(remotesToClean), readPref, std::move(opKey));
+ }));
+ }
} catch (const DBException&) {
// Ignore the new error and rethrow the original one.
}
@@ -148,15 +249,6 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
}
}
-void killRemoteCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- std::vector<RemoteCursor>&& remoteCursors,
- const NamespaceString& nss) {
- for (auto&& remoteCursor : remoteCursors) {
- killRemoteCursor(opCtx, executor, std::move(remoteCursor), nss);
- }
-}
-
void killRemoteCursor(OperationContext* opCtx,
executor::TaskExecutor* executor,
RemoteCursor&& cursor,
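
A note on the cleanup path above: the kill is deliberately decoupled from the interrupted operation. The remotes, read preference, and opKey are captured by value into a task scheduled on the TaskExecutor, and killOpOnShards() runs under its own ThreadClient and OperationContext, so the best-effort cleanup can still be sent after establishCursors() has already rethrown. Each targeted remote receives a fire-and-forget command on its admin database shaped like the one built in killOpOnShards() (sketch; opKey stands for the UUID generated earlier):

    // Best-effort, fire-and-forget kill of whatever the remote is still running under
    // this operation key. Errors in the reply are only logged, never surfaced.
    BSONObj killCmd = BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(opKey));
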
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index 95f6e7ae9d0..729a42edd19 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -73,16 +73,11 @@ std::vector<RemoteCursor> establishCursors(
Shard::RetryPolicy retryPolicy = Shard::RetryPolicy::kIdempotent);
/**
- * Schedules a remote killCursor command for each of the cursors in 'remoteCursors'.
+ * Schedules a remote killCursor command for 'cursor'.
*
* Note that this method is optimistic and does not check the return status for the killCursors
- * commands.
+ * command.
*/
-void killRemoteCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- std::vector<RemoteCursor>&& remoteCursors,
- const NamespaceString& nss);
-
void killRemoteCursor(OperationContext* opCtx,
executor::TaskExecutor* executor,
RemoteCursor&& cursor,
diff --git a/src/mongo/s/query/establish_cursors_test.cpp b/src/mongo/s/query/establish_cursors_test.cpp
index 389417a76ca..cedd53effa9 100644
--- a/src/mongo/s/query/establish_cursors_test.cpp
+++ b/src/mongo/s/query/establish_cursors_test.cpp
@@ -85,6 +85,19 @@ public:
setupShards(shards);
}
+ /**
+ * Mock a response for a killOperations command.
+ */
+ void expectKillOperations(size_t expected) {
+ for (size_t i = 0; i < expected; i++) {
+ onCommand([this](const RemoteCommandRequest& request) {
+ ASSERT_EQ("admin", request.dbname) << request;
+ ASSERT_TRUE(request.cmdObj.hasField("_killOperations")) << request;
+ return BSON("ok" << 1);
+ });
+ }
+ }
+
protected:
const NamespaceString _nss;
};
@@ -145,11 +158,78 @@ TEST_F(EstablishCursorsTest, SingleRemoteRespondsWithNonretriableError) {
// Remote responds with non-retriable error.
onCommand([this](const RemoteCommandRequest& request) {
ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
- return Status(ErrorCodes::FailedToParse, "failed to parse");
+ return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse"));
});
future.default_timed_get();
}
+TEST_F(EstablishCursorsTest, SingleRemoteInterruptedBeforeCommandSent) {
+ BSONObj cmdObj = fromjson("{find: 'testcoll'}");
+ std::vector<std::pair<ShardId, BSONObj>> remotes{
+ {kTestShardIds[0], cmdObj},
+ };
+
+ auto future = launchAsync([&] {
+ ASSERT_THROWS(establishCursors(operationContext(),
+ executor(),
+ _nss,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ remotes,
+ false), // allowPartialResults
+ ExceptionFor<ErrorCodes::CursorKilled>);
+ });
+
+ // Now interrupt the opCtx which the cursor is running under.
+ {
+ stdx::lock_guard<Client> lk(*operationContext()->getClient());
+ operationContext()->getServiceContext()->killOperation(
+ lk, operationContext(), ErrorCodes::CursorKilled);
+ }
+
+ future.default_timed_get();
+}
+
+TEST_F(EstablishCursorsTest, SingleRemoteInterruptedWhileCommandInFlight) {
+ BSONObj cmdObj = fromjson("{find: 'testcoll'}");
+ std::vector<std::pair<ShardId, BSONObj>> remotes{
+ {kTestShardIds[0], cmdObj},
+ };
+
+ // Hang before sending the command but after resolving the host to send it to.
+ auto fp = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand");
+ invariant(fp);
+ fp->setMode(FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[0].toString()));
+
+ auto future = launchAsync([&] {
+ ASSERT_THROWS(establishCursors(operationContext(),
+ executor(),
+ _nss,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ remotes,
+ false), // allowPartialResults
+ ExceptionFor<ErrorCodes::CursorKilled>);
+ });
+
+ // Verify that the failpoint is hit.
+ fp->waitForTimesEntered(2ULL);
+
+ // Now interrupt the opCtx which the cursor is running under.
+ {
+ stdx::lock_guard<Client> lk(*operationContext()->getClient());
+ operationContext()->getServiceContext()->killOperation(
+ lk, operationContext(), ErrorCodes::CursorKilled);
+ }
+
+ // Disable the failpoint to allow the ARS to continue. Once interrupted, it will then trigger a
+ // killOperations for the outstanding remote.
+ fp->setMode(FailPoint::off);
+
+ // Expect a killOperation for the outstanding remote request.
+ expectKillOperations(1);
+
+ future.default_timed_get();
+}
+
TEST_F(EstablishCursorsTest, SingleRemoteRespondsWithNonretriableErrorAllowPartialResults) {
BSONObj cmdObj = fromjson("{find: 'testcoll'}");
std::vector<std::pair<ShardId, BSONObj>> remotes{{kTestShardIds[0], cmdObj}};
@@ -168,7 +248,7 @@ TEST_F(EstablishCursorsTest, SingleRemoteRespondsWithNonretriableErrorAllowParti
// Remote responds with non-retriable error.
onCommand([this](const RemoteCommandRequest& request) {
ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
- return Status(ErrorCodes::FailedToParse, "failed to parse");
+ return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse"));
});
future.default_timed_get();
}
@@ -259,6 +339,10 @@ TEST_F(EstablishCursorsTest, SingleRemoteMaxesOutRetriableErrors) {
return Status(ErrorCodes::HostUnreachable, "host unreachable");
});
}
+
+ // Expect a killOperations for the remote which was not reachable.
+ expectKillOperations(1);
+
future.default_timed_get();
}
@@ -291,6 +375,7 @@ TEST_F(EstablishCursorsTest, SingleRemoteMaxesOutRetriableErrorsAllowPartialResu
return Status(ErrorCodes::HostUnreachable, "host unreachable");
});
}
+
future.default_timed_get();
}
@@ -350,7 +435,7 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteRespondsWithNonretriableErr
// Second remote responds with a non-retriable error.
onCommand([this](const RemoteCommandRequest& request) {
ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
- return Status(ErrorCodes::FailedToParse, "failed to parse");
+ return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse"));
});
// Third remote responds with success (must give some response to mock network for each remote).
@@ -362,6 +447,9 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteRespondsWithNonretriableErr
return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
});
+ // Expect two killOperation commands, one for each remote which responded with a cursor.
+ expectKillOperations(2);
+
future.default_timed_get();
}
@@ -394,7 +482,7 @@ TEST_F(EstablishCursorsTest,
// Second remote responds with a non-retriable error.
onCommand([this](const RemoteCommandRequest& request) {
ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
- return Status(ErrorCodes::FailedToParse, "failed to parse");
+ return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse"));
});
// Third remote responds with success (must give some response to mock network for each remote).
@@ -406,6 +494,9 @@ TEST_F(EstablishCursorsTest,
return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
});
+ // Expect two killOperation commands, one for each remote which responded with a cursor.
+ expectKillOperations(2);
+
future.default_timed_get();
}
@@ -559,6 +650,9 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteMaxesOutRetriableErrors) {
});
}
+ // Expect two killOperation commands, one for each remote which responded with a cursor.
+ expectKillOperations(2);
+
future.default_timed_get();
}
@@ -622,6 +716,71 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteMaxesOutRetriableErrorsAllo
future.default_timed_get();
}
+TEST_F(EstablishCursorsTest, InterruptedWithDanglingRemoteRequest) {
+ BSONObj cmdObj = fromjson("{find: 'testcoll'}");
+ std::vector<std::pair<ShardId, BSONObj>> remotes{
+ {kTestShardIds[0], cmdObj},
+ {kTestShardIds[1], cmdObj},
+ };
+
+ // Hang before sending the command to shard 1.
+ auto fp = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand");
+ invariant(fp);
+ fp->setMode(FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[1].toString()));
+
+ auto future = launchAsync([&] {
+ ASSERT_THROWS(establishCursors(operationContext(),
+ executor(),
+ _nss,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ remotes,
+ false), // allowPartialResults
+ ExceptionFor<ErrorCodes::CursorKilled>);
+ });
+
+ // Verify that the failpoint is hit.
+ fp->waitForTimesEntered(5ULL);
+
+ // Mark the OperationContext as killed.
+ {
+ stdx::lock_guard<Client> lk(*operationContext()->getClient());
+ operationContext()->getServiceContext()->killOperation(
+ lk, operationContext(), ErrorCodes::CursorKilled);
+ }
+
+ // First remote responds.
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
+
+ CursorResponse cursorResponse(_nss, CursorId(123), {});
+ return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ // Disable the failpoint to enable the ARS to continue. Once interrupted, it will then trigger a
+ // killOperations for the two remotes.
+ fp->setMode(FailPoint::off);
+
+ // The second remote operation may be in flight before the killOperations cleanup, so relax the
+ // assertions on the mocked responses.
+ auto killsReceived = 0;
+ while (killsReceived < 2) {
+ onCommand([&](const RemoteCommandRequest& request) {
+ if (request.dbname == "admin" && request.cmdObj.hasField("_killOperations")) {
+ killsReceived++;
+ return BSON("ok" << 1);
+ }
+
+ // It's not a killOperations, so expect a normal remote command.
+ ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
+
+ CursorResponse cursorResponse(_nss, CursorId(123), {});
+ return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+ }
+
+ future.default_timed_get();
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/sharding_router_test_fixture.h b/src/mongo/s/sharding_router_test_fixture.h
index 6f3ca156e12..84a9b7c5dc3 100644
--- a/src/mongo/s/sharding_router_test_fixture.h
+++ b/src/mongo/s/sharding_router_test_fixture.h
@@ -171,6 +171,17 @@ public:
const Timestamp& expectedTS,
long long expectedTerm) const;
+ /**
+ * Mocks an error cursor response from a remote with the given 'status'.
+ */
+ BSONObj createErrorCursorResponse(Status status) {
+ invariant(!status.isOK());
+ BSONObjBuilder result;
+ status.serializeErrorToBSON(&result);
+ result.appendBool("ok", false);
+ return result.obj();
+ }
+
private:
ServiceContext::UniqueClient _client;
ServiceContext::UniqueOperationContext _opCtx;
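
For completeness, the new createErrorCursorResponse() fixture helper exists because the updated tests need a remote failure that looks like a well-formed command reply (ok: false plus the serialized Status) rather than a transport-level error; a remote that answers this way is treated as having declined the cursor and is therefore excluded from the _killOperations cleanup. Typical usage, as in establish_cursors_test.cpp above:

    // Mock one remote failing to establish its cursor with a non-retriable error.
    onCommand([this](const executor::RemoteCommandRequest& request) {
        ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
        return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse"));
    });
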