author     Nick Zolnierz <nicholas.zolnierz@mongodb.com>    2020-04-28 13:34:51 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-05-06 14:38:38 +0000
commit     cd42cb1a51f1e2a6c02759ad5fa1523b5b65faa9 (patch)
tree       08912dad354cb657c88b8a5d9d6f1452ff39925e
parent     d0e8908d86cc38699eb70d8e26c57abd5734e21c (diff)
download   mongo-cd42cb1a51f1e2a6c02759ad5fa1523b5b65faa9.tar.gz
SERVER-46255 Use killOperations to cleanup dangling remote requests
(cherry picked from commit 93476f545de27ee61fd69eeab23adbff7f57b932)
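
The substance of the change (see src/mongo/s/query/establish_cursors.cpp in the diff below) is twofold: every remote cursor-establishment request is tagged with a freshly generated operation key, and if establishment fails or is interrupted, a best-effort `_killOperations` is scheduled against every remote that may still hold an outstanding request. A condensed sketch of those two steps, reusing only calls visible in the diff (UUID, BSONObjBuilder, and the executor types are MongoDB-internal, so this is illustrative rather than a standalone program):

```cpp
// Condensed sketch of the new cleanup path in establishCursors(); not a standalone
// program. UUID, BSONObjBuilder, AsyncRequestsSender, and the executor types are
// MongoDB-internal; 'remotes', 'remotesToClean', 'opCtx', and 'executor' are the
// surrounding function's state.

// 1. Tag every outgoing request with a shared client operation key.
auto opKey = UUID::gen();
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& remote : remotes) {
    BSONObjBuilder requestWithOpKey(remote.second);  // copy of the original command
    opKey.appendToBuilder(&requestWithOpKey, "clientOperationKey");
    requests.emplace_back(remote.first, requestWithOpKey.obj());
}

// 2. On error or interruption, make a good-faith, fire-and-forget attempt to kill
//    whatever may still be running on the remotes; returned errors are ignored.
for (auto&& host : remotesToClean) {
    executor::RemoteCommandRequest request(
        host,
        "admin",
        BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(opKey)),
        opCtx,
        executor::RemoteCommandRequestBase::kNoTimeout,
        boost::none,
        executor::RemoteCommandRequestBase::FireAndForgetMode::kOn);
    uassertStatusOK(executor->scheduleRemoteCommand(request, [](auto const&) {}));
}
```

Because the kill is fire-and-forget, the responses are only inspected for debug logging; cleanup is a good-faith attempt, not a guarantee.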
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 5
-rw-r--r-- | jstests/aggregation/sharded_agg_cleanup_on_error.js | 2
-rw-r--r-- | jstests/noPassthrough/max_time_ms_repl_targeting.js | 3
-rw-r--r-- | jstests/sharding/lookup.js | 4
-rw-r--r-- | jstests/sharding/lookup_mongod_unaware.js | 7
-rw-r--r-- | jstests/sharding/lookup_on_shard.js | 5
-rw-r--r-- | jstests/sharding/lookup_stale_mongos.js | 5
-rw-r--r-- | src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp | 3
-rw-r--r-- | src/mongo/db/pipeline/document_source_union_with.cpp | 4
-rw-r--r-- | src/mongo/db/pipeline/sharded_union_test.cpp | 39
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 17
-rw-r--r-- | src/mongo/s/query/establish_cursors.cpp | 156
-rw-r--r-- | src/mongo/s/query/establish_cursors.h | 9
-rw-r--r-- | src/mongo/s/query/establish_cursors_test.cpp | 167
-rw-r--r-- | src/mongo/s/sharding_router_test_fixture.h | 11
15 files changed, 374 insertions, 63 deletions
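
The new unit tests coordinate with the AsyncRequestsSender through the `hangBeforeSchedulingRemoteCommand` failpoint added in async_requests_sender.cpp, which pauses scheduling of requests aimed at a specific host so the test can interrupt the operation while a request is still in flight. A sketch of the pattern used by the establish_cursors_test.cpp cases below (not a complete test; the failpoint registry, `kTestShardHosts`, and the background test thread come from the test fixture):

```cpp
// Sketch of the failpoint choreography used by the new tests; fixture state
// (kTestShardHosts, launchAsync, operationContext()) is assumed.
auto fp = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand");
invariant(fp);

// Hang only the request that targets the chosen host.
fp->setMode(FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[1].toString()));

// ... launch establishCursors() on a background thread, then wait until the ARS
// has actually reached the failpoint before killing the operation ...
fp->waitForTimesEntered(5ULL);

// Let the ARS continue; the interrupted operation should now schedule
// _killOperations against any remote that may still hold a dangling request.
fp->setMode(FailPoint::off);
```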
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 31d15d5b8ec..8aaa63fee72 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -31,6 +31,11 @@ selector: - jstests/sharding/retryable_write_error_labels.js # Enable when SERVER-43310 is backported - jstests/sharding/cluster_create_indexes_always_routes_through_primary.js + # These tests use the flag-guarded sharded $lookup parameter, which results in cursors being + # established from a shard instead of mongos. As of SERVER-46255, these requests will include the + # 'clientOperationKey' which is not recognizable on 'last-stable' shards. + - jstests/sharding/lookup_on_shard.js + - jstests/sharding/lookup_stale_mongos.js executor: config: diff --git a/jstests/aggregation/sharded_agg_cleanup_on_error.js b/jstests/aggregation/sharded_agg_cleanup_on_error.js index 1a57fffe018..73b5757379b 100644 --- a/jstests/aggregation/sharded_agg_cleanup_on_error.js +++ b/jstests/aggregation/sharded_agg_cleanup_on_error.js @@ -59,7 +59,7 @@ try { assert.commandWorked(shard1DB.adminCommand( {configureFailPoint: kFailPointName, mode: "alwaysOn", data: kFailpointOptions})); - // Issue an aggregregation that will fail during a getMore on shard 0, and make sure that + // Issue an aggregation that will fail during a getMore on shard 0, and make sure that // this correctly kills the hanging cursor on shard 1. Use $_internalSplitPipeline to ensure // that this pipeline merges on mongos. assertFailsAndCleansUpCursors({ diff --git a/jstests/noPassthrough/max_time_ms_repl_targeting.js b/jstests/noPassthrough/max_time_ms_repl_targeting.js index de90a7a0d24..984157264a2 100644 --- a/jstests/noPassthrough/max_time_ms_repl_targeting.js +++ b/jstests/noPassthrough/max_time_ms_repl_targeting.js @@ -25,7 +25,8 @@ const tryFiveTimes = function(name, f) { continue; } - jsTestLog(`Failed 5 times in test ${name}. There is probably a bug here.`); + jsTestLog(`Failed 5 times in test ${ + name}. There is probably a bug here. Latest assertion: ${tojson(e)}`); throw e; } } diff --git a/jstests/sharding/lookup.js b/jstests/sharding/lookup.js index 82a8c63624b..b95dfbf3991 100644 --- a/jstests/sharding/lookup.js +++ b/jstests/sharding/lookup.js @@ -1,5 +1,9 @@ // Basic $lookup regression tests. +// This test uses the flag-guarded sharded $lookup parameter, which results in cursors being +// established from a shard instead of mongos. As of SERVER-46255, these requests will include the +// 'clientOperationKey' which is not recognizable on 'last-stable' shards. +// @tags: [requires_fcv_44] (function() { "use strict"; diff --git a/jstests/sharding/lookup_mongod_unaware.js b/jstests/sharding/lookup_mongod_unaware.js index 03f12ba69a7..5d50978e032 100644 --- a/jstests/sharding/lookup_mongod_unaware.js +++ b/jstests/sharding/lookup_mongod_unaware.js @@ -3,8 +3,11 @@ // when it's not, and likewise when it thinks the collection is unsharded but is actually sharded. // // We restart a mongod to cause it to forget that a collection was sharded. When restarted, we -// expect it to still have all the previous data. -// @tags: [requires_persistence] +// expect it to still have all the previous data. 
Also, this test uses the flag-guarded sharded +// $lookup parameter, which results in cursors being established from a shard instead of mongos. As +// of SERVER-46255, these requests will include the 'clientOperationKey' which is not recognizable +// on 'last-stable' shards. +// @tags: [requires_persistence, requires_fcv_44] (function() { "use strict"; diff --git a/jstests/sharding/lookup_on_shard.js b/jstests/sharding/lookup_on_shard.js index 2dc96378fab..a5409efc0ed 100644 --- a/jstests/sharding/lookup_on_shard.js +++ b/jstests/sharding/lookup_on_shard.js @@ -1,4 +1,9 @@ // Test that a pipeline with a $lookup stage on a sharded foreign collection may be run on a mongod. + +// This test uses the flag-guarded sharded $lookup parameter, which results in cursors being +// established from a shard instead of mongos. As of SERVER-46255, these requests will include the +// 'clientOperationKey' which is not recognizable on 'last-stable' shards. +// @tags: [requires_fcv_44] (function() { load("jstests/noPassthrough/libs/server_parameter_helpers.js"); // For setParameterOnAllHosts. diff --git a/jstests/sharding/lookup_stale_mongos.js b/jstests/sharding/lookup_stale_mongos.js index 2c74af07b28..d3d56c0b160 100644 --- a/jstests/sharding/lookup_stale_mongos.js +++ b/jstests/sharding/lookup_stale_mongos.js @@ -2,6 +2,11 @@ // local and/or foreign collections. This includes when mongos thinks the collection is sharded // when it's not, and likewise when mongos thinks the collection is unsharded but is actually // sharded. + +// This test uses the flag-guarded sharded $lookup parameter, which results in cursors being +// established from a shard instead of mongos. As of SERVER-46255, these requests will include the +// 'clientOperationKey' which is not recognizable on 'last-stable' shards. +// @tags: [requires_fcv_44] (function() { "use strict"; diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp index b141c3b72a9..2ea208e8e7c 100644 --- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp +++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp @@ -209,7 +209,8 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) { // Mock out one error response, then expect a refresh of the sharding catalog for that // namespace, then mock out a successful response. onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); // Mock the expected config server queries. diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index 1a8454464a9..fe3dc625a66 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -185,7 +185,7 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() { if (_executionState == ExecutionProgress::kStartingSubPipeline) { auto serializedPipe = _pipeline->serializeToBson(); LOGV2_DEBUG(23869, - 3, + 1, "$unionWith attaching cursor to pipeline {pipeline}", "pipeline"_attr = serializedPipe); try { @@ -198,7 +198,7 @@ DocumentSource::GetNextResult DocumentSourceUnionWith::doGetNext() { ExpressionContext::ResolvedNamespace{e->getNamespace(), e->getPipeline()}, serializedPipe); LOGV2_DEBUG(4556300, - 0, + 3, "$unionWith found view definition. 
ns: {ns}, pipeline: {pipeline}. New " "$unionWith sub-pipeline: {new_pipe}", "ns"_attr = e->getNamespace(), diff --git a/src/mongo/db/pipeline/sharded_union_test.cpp b/src/mongo/db/pipeline/sharded_union_test.cpp index 9295efb5e0a..8756cb0c119 100644 --- a/src/mongo/db/pipeline/sharded_union_test.cpp +++ b/src/mongo/db/pipeline/sharded_union_test.cpp @@ -100,10 +100,17 @@ TEST_F(ShardedUnionTest, ForwardsMaxTimeMSToRemotes) { expCtx()->opCtx->setDeadlineAfterNowBy(Milliseconds(15), ErrorCodes::MaxTimeMSExpired); auto future = launchAsync([&] { + // Expect one result from each host. auto next = unionWith.getNext(); ASSERT_TRUE(next.isAdvanced()); auto result = next.releaseDocument(); ASSERT_DOCUMENT_EQ(result, expectedResult); + + next = unionWith.getNext(); + ASSERT_TRUE(next.isAdvanced()); + result = next.releaseDocument(); + ASSERT_DOCUMENT_EQ(result, expectedResult); + ASSERT(unionWith.getNext().isEOF()); ASSERT(unionWith.getNext().isEOF()); ASSERT(unionWith.getNext().isEOF()); @@ -119,6 +126,8 @@ TEST_F(ShardedUnionTest, ForwardsMaxTimeMSToRemotes) { onCommand(assertHasExpectedMaxTimeMSAndReturnResult); onCommand(assertHasExpectedMaxTimeMSAndReturnResult); + + future.default_timed_get(); } TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) { @@ -148,7 +157,8 @@ TEST_F(ShardedUnionTest, RetriesSubPipelineOnStaleConfigError) { // Mock out one error response, then expect a refresh of the sharding catalog for that // namespace, then mock out a successful response. onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); // Mock the expected config server queries. @@ -221,7 +231,8 @@ TEST_F(ShardedUnionTest, CorrectlySplitsSubPipelineIfRefreshedDistributionRequir // sharding catalog for that namespace. onCommand([&](const executor::RemoteCommandRequest& request) { ASSERT_EQ(request.target, HostAndPort(shards[1].getHost())); - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); // Mock the expected config server queries. Update the distribution as if a chunk [0, 10] was @@ -305,13 +316,14 @@ TEST_F(ShardedUnionTest, AvoidsSplittingSubPipelineIfRefreshedDistributionDoesNo // Mock out an error response from both shards, then expect a refresh of the sharding catalog // for that namespace, then mock out a successful response. onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}; + return createErrorCursorResponse( + Status{ErrorCodes::StaleShardVersion, "Mock error: shard version mismatch"}); }); - // Mock the expected config server queries. Update the distribution so that all chunks are on // the same shard. 
const OID epoch = OID::gen(); @@ -373,14 +385,15 @@ TEST_F(ShardedUnionTest, IncorporatesViewDefinitionAndRetriesWhenViewErrorReceiv // Mock out one error response, then expect a refresh of the sharding catalog for that // namespace, then mock out a successful response. onCommand([&](const executor::RemoteCommandRequest& request) { - return Status{ResolvedView{expectedBackingNs, - {fromjson("{$group: {_id: '$groupKey'}}"), - // Prevent the $match from being pushed into the shards where it - // would not execute in this mocked environment. - fromjson("{$_internalInhibitOptimization: {}}"), - fromjson("{$match: {_id: 'unionResult'}}")}, - BSONObj()}, - "It was a view!"_sd}; + return createErrorCursorResponse( + Status{ResolvedView{expectedBackingNs, + {fromjson("{$group: {_id: '$groupKey'}}"), + // Prevent the $match from being pushed into the shards where it + // would not execute in this mocked environment. + fromjson("{$_internalInhibitOptimization: {}}"), + fromjson("{$match: {_id: 'unionResult'}}")}, + BSONObj()}, + "It was a view!"_sd}); }); // That error should be incorporated, then we should target both shards. The results should be diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 65e1b80592e..4ee7bf893c7 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -57,6 +57,8 @@ namespace { // Maximum number of retries for network and replication notMaster errors (per host). const int kMaxNumFailedHostRetryAttempts = 3; +MONGO_FAIL_POINT_DEFINE(hangBeforeSchedulingRemoteCommand); + } // namespace AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, @@ -182,6 +184,21 @@ SemiFuture<std::vector<HostAndPort>> AsyncRequestsSender::RemoteData::resolveSha auto AsyncRequestsSender::RemoteData::scheduleRemoteCommand(std::vector<HostAndPort>&& hostAndPorts) -> SemiFuture<RemoteCommandOnAnyCallbackArgs> { + hangBeforeSchedulingRemoteCommand.executeIf( + [&](const BSONObj& data) { + while (MONGO_unlikely(hangBeforeSchedulingRemoteCommand.shouldFail())) { + LOGV2(4625505, + "Hanging in ARS due to " + "'hangBeforeSchedulingRemoteCommand' failpoint"); + sleepmillis(100); + } + }, + [&](const BSONObj& data) { + return MONGO_unlikely(std::count(hostAndPorts.begin(), + hostAndPorts.end(), + HostAndPort(data.getStringField("hostAndPort")))); + }); + auto hedgeOptions = extractHedgeOptions(_cmdObj, _ars->_readPreference); executor::RemoteCommandRequestOnAny request(std::move(hostAndPorts), _ars->_db, diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index d72f0493a55..8217de1e2ec 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -33,21 +33,65 @@ #include "mongo/s/query/establish_cursors.h" +#include "mongo/client/connpool.h" #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/cursor_id.h" #include "mongo/db/query/cursor_response.h" -#include "mongo/db/query/getmore_request.h" #include "mongo/db/query/killcursors_request.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" +#include "mongo/logv2/log.h" #include "mongo/s/grid.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" #include "mongo/util/assert_util.h" -#include "mongo/util/scopeguard.h" namespace mongo { +namespace { + +void killOpOnShards(std::shared_ptr<executor::TaskExecutor> executor, + const 
NamespaceString& nss, + std::vector<HostAndPort> remotes, + const ReadPreferenceSetting& readPref, + UUID opKey) noexcept { + try { + ThreadClient tc("establishCursors cleanup", getGlobalServiceContext()); + auto opCtx = tc->makeOperationContext(); + + for (auto&& host : remotes) { + executor::RemoteCommandRequest request( + host, + "admin", + BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(opKey)), + opCtx.get(), + executor::RemoteCommandRequestBase::kNoTimeout, + boost::none, + executor::RemoteCommandRequestBase::FireAndForgetMode::kOn); + + // We do not process the response to the killOperations request (we make a good-faith + // attempt at cleaning up the cursors, but ignore any returned errors). + uassertStatusOK(executor->scheduleRemoteCommand(request, [&](auto const& args) { + if (!args.response.isOK()) { + LOGV2_DEBUG(4625504, + 2, + "killOperations for {remote} failed with {status}", + "remote"_attr = host.toString(), + "error"_attr = args.response); + return; + } + })); + } + } catch (const AssertionException& ex) { + LOGV2_DEBUG(4625503, + 2, + "Failed to cleanup remote operations: {error}", + "error"_attr = ex.toStatus()); + } +} + +} // namespace + std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor, const NamespaceString& nss, @@ -57,25 +101,49 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, Shard::RetryPolicy retryPolicy) { // Construct the requests std::vector<AsyncRequestsSender::Request> requests; + + // Generate an OperationKey to attach to each remote request. This will allow us to kill any + // outstanding requests in case we're interrupted or one of the remotes returns an error. Note + // that although the opCtx may have an OperationKey set on it already, do not inherit it here + // because we may target ourselves which implies the same node receiving multiple operations + // with the same opKey. + // TODO SERVER-47261 management of the opKey should move to the ARS. + auto opKey = UUID::gen(); for (const auto& remote : remotes) { - requests.emplace_back(remote.first, remote.second); + BSONObjBuilder requestWithOpKey(remote.second); + opKey.appendToBuilder(&requestWithOpKey, "clientOperationKey"); + requests.emplace_back(remote.first, requestWithOpKey.obj()); } + LOGV2_DEBUG(4625502, + 3, + "Establishing cursors on {opId} for {nRemotes} remotes with operation key {opKey}", + "opId"_attr = opCtx->getOpID(), + "nRemotes"_attr = remotes.size(), + "opKey"_attr = opKey); + // Send the requests MultiStatementTransactionRequestsSender ars( opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy); std::vector<RemoteCursor> remoteCursors; + + // Keep track of any remotes which may have an open cursor. + std::vector<HostAndPort> remotesToClean; + try { // Get the responses while (!ars.done()) { auto response = ars.next(); + try { + if (response.shardHostAndPort) + remotesToClean.push_back(*response.shardHostAndPort); + // Note the shardHostAndPort may not be populated if there was an error, so be sure // to do this after parsing the cursor response to ensure the response was ok. // Additionally, be careful not to push into 'remoteCursors' until we are sure we - // have a valid cursor, since the error handling path will attempt to clean up - // anything in 'remoteCursors' + // have a valid cursor. 
auto cursors = CursorResponse::parseFromBSONMany( uassertStatusOK(std::move(response.swResponse)).data); @@ -86,6 +154,11 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, remoteCursor.setShardId(std::move(response.shardId)); remoteCursor.setHostAndPort(*response.shardHostAndPort); remoteCursors.push_back(std::move(remoteCursor)); + } else { + // Remote responded with a failure, do not attempt to clean up. + remotesToClean.erase(std::remove(remotesToClean.begin(), + remotesToClean.end(), + *response.shardHostAndPort)); } } @@ -93,12 +166,12 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, for (auto& cursor : cursors) { uassertStatusOK(cursor.getStatus()); } - } catch (const AssertionException& ex) { // Retriable errors are swallowed if 'allowPartialResults' is true. Targeting shard // replica sets can also throw FailedToSatisfyReadPreference, so we swallow it too. bool isEligibleException = (isMongosRetriableError(ex.code()) || ex.code() == ErrorCodes::FailedToSatisfyReadPreference); + // Fail if the exception is something other than a retriable or read preference // error, or if the 'allowPartialResults' query parameter was not enabled. if (!allowPartialResults || !isEligibleException) { @@ -111,10 +184,9 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, } } return remoteCursors; - } catch (const DBException&) { + } catch (const DBException& ex) { // If one of the remotes had an error, we make a best effort to finish retrieving responses - // for other requests that were already sent, so that we can send killCursors to any cursors - // that we know were established. + // for other requests that were already sent. try { // Do not schedule any new requests. ars.stopRetrying(); @@ -123,23 +195,52 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, while (!ars.done()) { auto response = ars.next(); - // Check if the response contains an established cursor, and if so, store it. - StatusWith<CursorResponse> swCursorResponse( - response.swResponse.isOK() - ? CursorResponse::parseFromBSON(response.swResponse.getValue().data) - : response.swResponse.getStatus()); - - if (swCursorResponse.isOK()) { - RemoteCursor cursor; - cursor.setShardId(std::move(response.shardId)); - cursor.setHostAndPort(*response.shardHostAndPort); - cursor.setCursorResponse(std::move(swCursorResponse.getValue())); - remoteCursors.push_back(std::move(cursor)); + if (response.shardHostAndPort) + remotesToClean.push_back(*response.shardHostAndPort); + + if (response.swResponse.isOK()) { + // Check if the response contains an established cursor, and if so, store it. + StatusWith<CursorResponse> swCursorResponse = + CursorResponse::parseFromBSON(response.swResponse.getValue().data); + + if (swCursorResponse.isOK()) { + RemoteCursor cursor; + cursor.setShardId(std::move(response.shardId)); + cursor.setHostAndPort(*response.shardHostAndPort); + cursor.setCursorResponse(std::move(swCursorResponse.getValue())); + remoteCursors.push_back(std::move(cursor)); + } else { + // Remote responded with a failure, do not attempt to clean up. + remotesToClean.erase(std::remove(remotesToClean.begin(), + remotesToClean.end(), + *response.shardHostAndPort)); + } } } - // Schedule killCursors against all cursors that were established. 
- killRemoteCursors(opCtx, executor.get(), std::move(remoteCursors), nss); + LOGV2(4625501, + "ARS failed with {status}, attempting to clean up {nRemotes} remote operations", + "status"_attr = ex.toStatus(), + "nRemotes"_attr = remotesToClean.size()); + + // Check whether we have any remote operations to kill. + if (remotesToClean.size() > 0) { + // Schedule killOperations against all cursors that were established. Make sure to + // capture arguments by value since the cleanup work may get scheduled after + // returning from this function. + MONGO_COMPILER_VARIABLE_UNUSED auto cbHandle = uassertStatusOK( + executor->scheduleWork([executor, + nss, + readPref, + remotesToClean{std::move(remotesToClean)}, + opKey{std::move(opKey)}]( + const executor::TaskExecutor::CallbackArgs& args) { + uassertStatusOKWithContext(args.status, + "Failed to schedule remote cursor cleanup"); + killOpOnShards( + executor, nss, std::move(remotesToClean), readPref, std::move(opKey)); + })); + } } catch (const DBException&) { // Ignore the new error and rethrow the original one. } @@ -148,15 +249,6 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, } } -void killRemoteCursors(OperationContext* opCtx, - executor::TaskExecutor* executor, - std::vector<RemoteCursor>&& remoteCursors, - const NamespaceString& nss) { - for (auto&& remoteCursor : remoteCursors) { - killRemoteCursor(opCtx, executor, std::move(remoteCursor), nss); - } -} - void killRemoteCursor(OperationContext* opCtx, executor::TaskExecutor* executor, RemoteCursor&& cursor, diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h index 95f6e7ae9d0..729a42edd19 100644 --- a/src/mongo/s/query/establish_cursors.h +++ b/src/mongo/s/query/establish_cursors.h @@ -73,16 +73,11 @@ std::vector<RemoteCursor> establishCursors( Shard::RetryPolicy retryPolicy = Shard::RetryPolicy::kIdempotent); /** - * Schedules a remote killCursor command for each of the cursors in 'remoteCursors'. + * Schedules a remote killCursor command for 'cursor'. * * Note that this method is optimistic and does not check the return status for the killCursors - * commands. + * command. */ -void killRemoteCursors(OperationContext* opCtx, - executor::TaskExecutor* executor, - std::vector<RemoteCursor>&& remoteCursors, - const NamespaceString& nss); - void killRemoteCursor(OperationContext* opCtx, executor::TaskExecutor* executor, RemoteCursor&& cursor, diff --git a/src/mongo/s/query/establish_cursors_test.cpp b/src/mongo/s/query/establish_cursors_test.cpp index 389417a76ca..cedd53effa9 100644 --- a/src/mongo/s/query/establish_cursors_test.cpp +++ b/src/mongo/s/query/establish_cursors_test.cpp @@ -85,6 +85,19 @@ public: setupShards(shards); } + /** + * Mock a response for a killOperations command. + */ + void expectKillOperations(size_t expected) { + for (size_t i = 0; i < expected; i++) { + onCommand([this](const RemoteCommandRequest& request) { + ASSERT_EQ("admin", request.dbname) << request; + ASSERT_TRUE(request.cmdObj.hasField("_killOperations")) << request; + return BSON("ok" << 1); + }); + } + } + protected: const NamespaceString _nss; }; @@ -145,11 +158,78 @@ TEST_F(EstablishCursorsTest, SingleRemoteRespondsWithNonretriableError) { // Remote responds with non-retriable error. 
onCommand([this](const RemoteCommandRequest& request) { ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData()); - return Status(ErrorCodes::FailedToParse, "failed to parse"); + return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse")); }); future.default_timed_get(); } +TEST_F(EstablishCursorsTest, SingleRemoteInterruptedBeforeCommandSent) { + BSONObj cmdObj = fromjson("{find: 'testcoll'}"); + std::vector<std::pair<ShardId, BSONObj>> remotes{ + {kTestShardIds[0], cmdObj}, + }; + + auto future = launchAsync([&] { + ASSERT_THROWS(establishCursors(operationContext(), + executor(), + _nss, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + remotes, + false), // allowPartialResults + ExceptionFor<ErrorCodes::CursorKilled>); + }); + + // Now interrupt the opCtx which the cursor is running under. + { + stdx::lock_guard<Client> lk(*operationContext()->getClient()); + operationContext()->getServiceContext()->killOperation( + lk, operationContext(), ErrorCodes::CursorKilled); + } + + future.default_timed_get(); +} + +TEST_F(EstablishCursorsTest, SingleRemoteInterruptedWhileCommandInFlight) { + BSONObj cmdObj = fromjson("{find: 'testcoll'}"); + std::vector<std::pair<ShardId, BSONObj>> remotes{ + {kTestShardIds[0], cmdObj}, + }; + + // Hang before sending the command but after resolving the host to send it to. + auto fp = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand"); + invariant(fp); + fp->setMode(FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[0].toString())); + + auto future = launchAsync([&] { + ASSERT_THROWS(establishCursors(operationContext(), + executor(), + _nss, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + remotes, + false), // allowPartialResults + ExceptionFor<ErrorCodes::CursorKilled>); + }); + + // Verify that the failpoint is hit. + fp->waitForTimesEntered(2ULL); + + // Now interrupt the opCtx which the cursor is running under. + { + stdx::lock_guard<Client> lk(*operationContext()->getClient()); + operationContext()->getServiceContext()->killOperation( + lk, operationContext(), ErrorCodes::CursorKilled); + } + + // Disable the failpoint to enable the ARS to continue. Once interrupted, it will then trigger a + // killOperations for the two remotes. + fp->setMode(FailPoint::off); + + // Expect a killOperation for the outstanding remote request. + expectKillOperations(1); + + future.default_timed_get(); +} + TEST_F(EstablishCursorsTest, SingleRemoteRespondsWithNonretriableErrorAllowPartialResults) { BSONObj cmdObj = fromjson("{find: 'testcoll'}"); std::vector<std::pair<ShardId, BSONObj>> remotes{{kTestShardIds[0], cmdObj}}; @@ -168,7 +248,7 @@ TEST_F(EstablishCursorsTest, SingleRemoteRespondsWithNonretriableErrorAllowParti // Remote responds with non-retriable error. onCommand([this](const RemoteCommandRequest& request) { ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData()); - return Status(ErrorCodes::FailedToParse, "failed to parse"); + return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse")); }); future.default_timed_get(); } @@ -259,6 +339,10 @@ TEST_F(EstablishCursorsTest, SingleRemoteMaxesOutRetriableErrors) { return Status(ErrorCodes::HostUnreachable, "host unreachable"); }); } + + // Expect a killOperations for the remote which was not reachable. 
+ expectKillOperations(1); + future.default_timed_get(); } @@ -291,6 +375,7 @@ TEST_F(EstablishCursorsTest, SingleRemoteMaxesOutRetriableErrorsAllowPartialResu return Status(ErrorCodes::HostUnreachable, "host unreachable"); }); } + future.default_timed_get(); } @@ -350,7 +435,7 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteRespondsWithNonretriableErr // Second remote responds with a non-retriable error. onCommand([this](const RemoteCommandRequest& request) { ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData()); - return Status(ErrorCodes::FailedToParse, "failed to parse"); + return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse")); }); // Third remote responds with success (must give some response to mock network for each remote). @@ -362,6 +447,9 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteRespondsWithNonretriableErr return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); }); + // Expect two killOperation commands, one for each remote which responded with a cursor. + expectKillOperations(2); + future.default_timed_get(); } @@ -394,7 +482,7 @@ TEST_F(EstablishCursorsTest, // Second remote responds with a non-retriable error. onCommand([this](const RemoteCommandRequest& request) { ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData()); - return Status(ErrorCodes::FailedToParse, "failed to parse"); + return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse")); }); // Third remote responds with success (must give some response to mock network for each remote). @@ -406,6 +494,9 @@ TEST_F(EstablishCursorsTest, return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); }); + // Expect two killOperation commands, one for each remote which responded with a cursor. + expectKillOperations(2); + future.default_timed_get(); } @@ -559,6 +650,9 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteMaxesOutRetriableErrors) { }); } + // Expect two killOperation commands, one for each remote which responded with a cursor. + expectKillOperations(2); + future.default_timed_get(); } @@ -622,6 +716,71 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteMaxesOutRetriableErrorsAllo future.default_timed_get(); } +TEST_F(EstablishCursorsTest, InterruptedWithDanglingRemoteRequest) { + BSONObj cmdObj = fromjson("{find: 'testcoll'}"); + std::vector<std::pair<ShardId, BSONObj>> remotes{ + {kTestShardIds[0], cmdObj}, + {kTestShardIds[1], cmdObj}, + }; + + // Hang before sending the command to shard 1. + auto fp = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand"); + invariant(fp); + fp->setMode(FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[1].toString())); + + auto future = launchAsync([&] { + ASSERT_THROWS(establishCursors(operationContext(), + executor(), + _nss, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + remotes, + false), // allowPartialResults + ExceptionFor<ErrorCodes::CursorKilled>); + }); + + // Verify that the failpoint is hit. + fp->waitForTimesEntered(5ULL); + + // Mark the OperationContext as killed. + { + stdx::lock_guard<Client> lk(*operationContext()->getClient()); + operationContext()->getServiceContext()->killOperation( + lk, operationContext(), ErrorCodes::CursorKilled); + } + + // First remote responds. 
+ onCommand([&](const RemoteCommandRequest& request) { + ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData()); + + CursorResponse cursorResponse(_nss, CursorId(123), {}); + return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + // Disable the failpoint to enable the ARS to continue. Once interrupted, it will then trigger a + // killOperations for the two remotes. + fp->setMode(FailPoint::off); + + // The second remote operation may be in flight before the killOperations cleanup, so relax the + // assertions on the mocked responses. + auto killsReceived = 0; + while (killsReceived < 2) { + onCommand([&](const RemoteCommandRequest& request) { + if (request.dbname == "admin" && request.cmdObj.hasField("_killOperations")) { + killsReceived++; + return BSON("ok" << 1); + } + + // Its not a killOperations, so expect a normal remote command. + ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData()); + + CursorResponse cursorResponse(_nss, CursorId(123), {}); + return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); + }); + } + + future.default_timed_get(); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/sharding_router_test_fixture.h b/src/mongo/s/sharding_router_test_fixture.h index 6f3ca156e12..84a9b7c5dc3 100644 --- a/src/mongo/s/sharding_router_test_fixture.h +++ b/src/mongo/s/sharding_router_test_fixture.h @@ -171,6 +171,17 @@ public: const Timestamp& expectedTS, long long expectedTerm) const; + /** + * Mocks an error cursor response from a remote with the given 'status'. + */ + BSONObj createErrorCursorResponse(Status status) { + invariant(!status.isOK()); + BSONObjBuilder result; + status.serializeErrorToBSON(&result); + result.appendBool("ok", false); + return result.obj(); + } + private: ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; |
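
Finally, tests that previously returned a bare `Status` from `onCommand()` now go through `createErrorCursorResponse()`, the helper added to the sharding router test fixture above, so the mocked failure arrives as a command-level error reply (`ok: false` plus the serialized status) rather than as a transport error. A usage sketch under that assumption:

```cpp
// Sketch: mocking a remote failure as a proper error reply, as the updated tests do.
// 'onCommand', '_nss', and 'createErrorCursorResponse' come from the sharding test fixture.
onCommand([this](const executor::RemoteCommandRequest& request) {
    ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
    // Serializes the Status into an {ok: false, ...} reply document.
    return createErrorCursorResponse(Status(ErrorCodes::FailedToParse, "failed to parse"));
});
```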