| author    | Kaloian Manassiev <kaloian.manassiev@mongodb.com>     | 2015-10-12 12:00:46 -0400 |
|-----------|-------------------------------------------------------|---------------------------|
| committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com>     | 2015-10-13 17:25:13 -0400 |
| commit    | 66c0dda7e9f97d03383139f7e62e4534934b5ecc (patch)      |                           |
| tree      | 0b3fff121c9a0a4076c103edf2b99a239e286382              |                           |
| parent    | 6abdb820a8259591cc43ad4ed52dbf7c3509d04b (diff)       |                           |
| download  | mongo-66c0dda7e9f97d03383139f7e62e4534934b5ecc.tar.gz |                           |
SERVER-20646 Cluster find command per-host retry logic
This change makes the cluster 'find' command retry on a per-host basis
instead of retrying the entire operation.
Reverts commit c433c8157f988a377c1cf9646078450ecd68c297.
Reverts commit 5ab3290f8796f2143acd5011ab0baae70ed5cece.
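
In outline, the new behavior is: when the initial cursor-establishment request to one host of a shard fails with a network error or a replication "not master" error, the AsyncResultsMerger notifies the replica set monitor and reissues the request (possibly to a different host of the same shard) up to three times, instead of failing the whole find. The sketch below is a minimal, self-contained model of just that decision logic, not MongoDB source: the `ErrorCode` enum, `Remote` struct, and helper functions are stand-ins for `mongo::ErrorCodes` and the `RemoteCursorData` bookkeeping touched by the patch further down.

```cpp
#include <iostream>

// Hedged sketch of the per-host retry policy this commit introduces. The real
// logic lives in AsyncResultsMerger::handleBatchResponse (see diff below).

enum class ErrorCode {
    OK,
    HostUnreachable,       // network error
    NotMaster,             // replication error
    NotMasterNoSlaveOk,    // replication error
    NotMasterOrSecondary,
    AuthenticationFailed   // not retriable
};

constexpr int kMaxNumFailedHostRetryAttempts = 3;

bool isNetworkError(ErrorCode e) {
    return e == ErrorCode::HostUnreachable;
}

bool isNotMasterError(ErrorCode e) {
    return e == ErrorCode::NotMaster || e == ErrorCode::NotMasterNoSlaveOk;
}

// Mirrors isPerShardRetriableError() from the patch: only network errors and
// "not master" replication errors are worth resending to another host.
bool isPerShardRetriableError(ErrorCode e) {
    return isNetworkError(e) || isNotMasterError(e) ||
        e == ErrorCode::NotMasterOrSecondary;
}

// Per-remote bookkeeping, analogous to the retryCount member the patch adds.
struct Remote {
    bool cursorEstablished = false;
    int retryCount = 0;
};

// A failed response is retried against the shard (possibly targeting a
// different host) only while no cursor exists and the budget is not spent.
bool shouldRetryHost(const Remote& r, ErrorCode err) {
    return !r.cursorEstablished && r.retryCount < kMaxNumFailedHostRetryAttempts &&
        isPerShardRetriableError(err);
}

int main() {
    Remote r;
    // One initial attempt plus three retries; the fourth failure is fatal.
    for (int attempt = 1; attempt <= 4; ++attempt) {
        if (shouldRetryHost(r, ErrorCode::NotMasterNoSlaveOk)) {
            ++r.retryCount;
            std::cout << "attempt " << attempt << " failed: retrying\n";
        } else {
            std::cout << "attempt " << attempt << " failed: giving up\n";
        }
    }
    return 0;
}
```

This budget matches the new test cases below: four consecutive NotMasterNoSlaveOk responses exhaust the retries and surface the error, while a success on the third attempt returns results normally.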
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding.yml                                            |   5 |
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_auth.yml                                       |   5 |
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_auth_audit.yml                                 |   5 |
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_ese.yml                                        |   5 |
-rw-r--r-- | jstests/sharding/recovering_slaveok.js (renamed from jstests/sharding/stale_clustered.js) |  10 |
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp                                                     |  22 |
-rw-r--r-- | src/mongo/s/client/shard_registry.h                                                       |   8 |
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp                                                |  53 |
-rw-r--r-- | src/mongo/s/query/async_results_merger.h                                                  |   5 |
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp                                           | 117 |
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp                                                        |  10 |
11 files changed, 189 insertions, 56 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding.yml b/buildscripts/resmokeconfig/suites/sharding.yml
index 8a836967c95..cb084fdbf8d 100644
--- a/buildscripts/resmokeconfig/suites/sharding.yml
+++ b/buildscripts/resmokeconfig/suites/sharding.yml
@@ -4,9 +4,8 @@ selector:
     - jstests/sharding/*.js
     - jstests/sharding/replset_config/*.js
     exclude_files:
-    - jstests/sharding/csrs_upgrade.js # SERVER-20694
-    - jstests/sharding/csrs_upgrade_during_migrate.js # SERVER-20580
-    - jstests/sharding/stale_clustered.js # SERVER-20646
+    - jstests/sharding/csrs_upgrade.js                # SERVER-20694
+    - jstests/sharding/csrs_upgrade_during_migrate.js # flaky - SERVER-20580
 
 executor:
   js_test:
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth.yml b/buildscripts/resmokeconfig/suites/sharding_auth.yml
index 6ab81bd2c1c..37fa3fc58cd 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth.yml
@@ -10,9 +10,8 @@ selector:
     - jstests/sharding/replset_config/*.js
     - jstests/sharding/sync_cluster_config/*.js
     exclude_files:
-    - jstests/sharding/csrs_upgrade.js # SERVER-20694
-    - jstests/sharding/csrs_upgrade_during_migrate.js # SERVER-20580
-    - jstests/sharding/stale_clustered.js # SERVER-20646
+    - jstests/sharding/csrs_upgrade.js                # SERVER-20694
+    - jstests/sharding/csrs_upgrade_during_migrate.js # flaky - SERVER-20580
     # Skip any tests that run with auth explicitly.
     - jstests/sharding/*[aA]uth*.js
     - jstests/sharding/replset_config/*[aA]uth*.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
index 0afd163d8db..7dd52783512 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
@@ -10,9 +10,8 @@ selector:
     - jstests/sharding/replset_config/*.js
     - jstests/sharding/sync_cluster_config/*.js
     exclude_files:
-    - jstests/sharding/csrs_upgrade.js # SERVER-20694
-    - jstests/sharding/csrs_upgrade_during_migrate.js # SERVER-20580
-    - jstests/sharding/stale_clustered.js # SERVER-20646
+    - jstests/sharding/csrs_upgrade.js                # SERVER-20694
+    - jstests/sharding/csrs_upgrade_during_migrate.js # flaky - SERVER-20580
     # Skip any tests that run with auth explicitly.
     - jstests/sharding/*[aA]uth*.js
     - jstests/sharding/replset_config/*[aA]uth*.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_ese.yml b/buildscripts/resmokeconfig/suites/sharding_ese.yml
index dd092097dbd..2c6887ba8bf 100644
--- a/buildscripts/resmokeconfig/suites/sharding_ese.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_ese.yml
@@ -8,9 +8,8 @@ selector:
     - jstests/sharding/*.js
     - jstests/sharding/replset_config/*.js
     exclude_files:
-    - jstests/sharding/csrs_upgrade.js # SERVER-20694
-    - jstests/sharding/csrs_upgrade_during_migrate.js # SERVER-20580
-    - jstests/sharding/stale_clustered.js # SERVER-20646
+    - jstests/sharding/csrs_upgrade.js                # SERVER-20694
+    - jstests/sharding/csrs_upgrade_during_migrate.js # flaky - SERVER-20580
 
 executor:
   js_test:
diff --git a/jstests/sharding/stale_clustered.js b/jstests/sharding/recovering_slaveok.js
index 5e0bacb7333..b446c80918c 100644
--- a/jstests/sharding/stale_clustered.js
+++ b/jstests/sharding/recovering_slaveok.js
@@ -5,10 +5,9 @@
 
 'use strict';
 
-var shardTest = new ShardingTest({ name: "clusteredstale",
+var shardTest = new ShardingTest({ name: "recovering_slaveok",
                                    shards: 2,
                                    mongos: 2,
-                                   verbose: 0,
                                    other: { rs: true } });
 
 var mongos = shardTest.s0;
@@ -63,15 +62,16 @@ assert.soon(function() { return coll.find().itcount() == collSOk.find().itcount(
 
 assert.eq(shardAColl.find().itcount(), 1);
 assert.eq(shardAColl.findOne()._id, -1);
 
-print("5: overflow oplog");
+print("5: make one of the secondaries RECOVERING");
 
 var secs = rsA.getSecondaries();
 var goodSec = secs[0];
 var badSec = secs[1];
 
-rsA.overflow(badSec);
+assert.commandWorked(badSec.adminCommand("replSetMaintenance"));
+rsA.waitForState(badSec, ReplSetTest.State.RECOVERING);
 
-print("6: stop non-overflowed secondary");
+print("6: stop non-RECOVERING secondary");
 
 rsA.stop(goodSec);
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 99df3d845b2..efd22ba7c5c 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -79,16 +79,6 @@ const BSONObj kReplSecondaryOkMetadata{
     BSON(rpc::kSecondaryOkFieldName << 1 << rpc::kReplSetMetadataFieldName << 1)};
 
 const BSONObj kSecondaryOkMetadata{BSON(rpc::kSecondaryOkFieldName << 1)};
 
-void updateReplSetMonitor(const std::shared_ptr<RemoteCommandTargeter>& targeter,
-                          const HostAndPort& remoteHost,
-                          const Status& remoteCommandStatus) {
-    if (ErrorCodes::NotMaster == remoteCommandStatus) {
-        targeter->markHostNotMaster(remoteHost);
-    } else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) {
-        targeter->markHostUnreachable(remoteHost);
-    }
-}
-
 BSONObj appendMaxTimeToCmdObj(long long maxTimeMicros, const BSONObj& cmdObj) {
     Seconds maxTime = kConfigCommandTimeout;
@@ -685,4 +675,16 @@ StatusWith<ShardRegistry::CommandResponse> ShardRegistry::_runCommandWithMetadat
     return cmdResponse;
 }
 
+void ShardRegistry::updateReplSetMonitor(const std::shared_ptr<RemoteCommandTargeter>& targeter,
+                                         const HostAndPort& remoteHost,
+                                         const Status& remoteCommandStatus) {
+    if (ErrorCodes::isNotMasterError(remoteCommandStatus.code())) {
+        targeter->markHostNotMaster(remoteHost);
+    } else if (ErrorCodes::isNetworkError(remoteCommandStatus.code())) {
+        targeter->markHostUnreachable(remoteHost);
+    } else if (remoteCommandStatus == ErrorCodes::NotMasterOrSecondary) {
+        targeter->markHostUnreachable(remoteHost);
+    }
+}
+
 }  // namespace mongo
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 27efbb16488..fa54265ade5 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -266,6 +266,14 @@ public:
                                                  const std::string& dbname,
                                                  const BSONObj& cmdObj);
 
+    /**
+     * Notifies the specified RemoteCommandTargeter of a particular mode of failure for the
+     * specified host.
+     */
+    static void updateReplSetMonitor(const std::shared_ptr<RemoteCommandTargeter>& targeter,
+                                     const HostAndPort& remoteHost,
+                                     const Status& remoteCommandStatus);
+
 private:
     typedef std::map<ShardId, std::shared_ptr<Shard>> ShardMap;
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index aec7c898ada..dc412615d33 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -42,9 +42,25 @@
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
 #include "mongo/util/scopeguard.h"
 
 namespace mongo {
+namespace {
+
+// Maximum number of retries for network and replication notMaster errors (per host).
+const int kMaxNumFailedHostRetryAttempts = 3;
+
+/**
+ * Returns whether a particular error code returned from the initial cursor establishment should
+ * be retried.
+ */
+bool isPerShardRetriableError(ErrorCodes::Error err) {
+    return (ErrorCodes::isNetworkError(err) || ErrorCodes::isNotMasterError(err) ||
+            err == ErrorCodes::NotMasterOrSecondary);
+}
+
+}  // namespace
 
 AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor,
                                        ClusterClientCursorParams params)
@@ -407,24 +423,37 @@ void AsyncResultsMerger::handleBatchResponse(
                                 : cbData.response.getStatus());
 
     if (!cursorResponseStatus.isOK()) {
-        remote.status = cursorResponseStatus.getStatus();
-
-        // Errors other than HostUnreachable have no special handling.
-        if (remote.status != ErrorCodes::HostUnreachable) {
-            return;
-        }
-
         // Notify the shard registry of the failure.
         if (remote.shardId) {
             auto shard = grid.shardRegistry()->getShard(_params.txn, *remote.shardId);
             if (!shard) {
-                remote.status = Status(ErrorCodes::HostUnreachable,
+                remote.status = Status(cursorResponseStatus.getStatus().code(),
                                        str::stream() << "Could not find shard " << *remote.shardId
                                                      << " containing host "
                                                      << remote.getTargetHost().toString());
             } else {
-                shard->getTargeter()->markHostUnreachable(remote.getTargetHost());
+                ShardRegistry::updateReplSetMonitor(
+                    shard->getTargeter(), remote.getTargetHost(), cursorResponseStatus.getStatus());
+            }
+        }
+
+        // If the error is retriable, schedule another request.
+        if (!remote.cursorId && remote.retryCount < kMaxNumFailedHostRetryAttempts &&
+            isPerShardRetriableError(cursorResponseStatus.getStatus().code())) {
+            ++remote.retryCount;
+
+            // Since we potentially updated the targeter that the last host it chose might be
+            // faulty, the call below may end up getting a different host.
+            remote.status = askForNextBatch_inlock(remoteIndex);
+            if (remote.status.isOK()) {
+                return;
             }
+
+            // If we end up here, it means we failed to schedule the retry request, which is a more
+            // severe error that should not be retried. Just pass through to the error handling
+            // logic below.
+        } else {
+            remote.status = cursorResponseStatus.getStatus();
         }
 
         // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
@@ -441,6 +470,7 @@
         return;
     }
 
+    // Cursor id successfully established.
     auto cursorResponse = cursorResponseStatus.getValue();
     remote.cursorId = cursorResponse.cursorId;
     remote.initialCmdObj = boost::none;
@@ -478,9 +508,8 @@
     // We do not ask for the next batch if the cursor is tailable, as batches received from remote
     // tailable cursors should be passed through to the client without asking for more batches.
     if (!_params.isTailable && !remote.hasNext() && !remote.exhausted()) {
-        auto nextBatchStatus = askForNextBatch_inlock(remoteIndex);
-        if (!nextBatchStatus.isOK()) {
-            remote.status = nextBatchStatus;
+        remote.status = askForNextBatch_inlock(remoteIndex);
+        if (!remote.status.isOK()) {
             return;
         }
     }
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index b51f57f0559..38671e3a750 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -216,6 +216,11 @@ private:
         executor::TaskExecutor::CallbackHandle cbHandle;
         Status status = Status::OK();
 
+        // Counts how many times we retried the initial cursor establishment command. It is used to
+        // make a decision based on the error type and the retry count about whether we are allowed
+        // to retry sending the request to another host from this shard.
+        int retryCount = 0;
+
         // Count of fetched docs during ARM processing of the current batch. Used to reduce the
         // batchSize in getMore when mongod returned less docs than the requested batchSize.
         long long fetchedCount = 0;
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 291be1670f5..60f8bffa5d7 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -55,10 +55,10 @@ using executor::RemoteCommandRequest;
 using executor::RemoteCommandResponse;
 
 const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
-const std::vector<std::string> kTestShardIds = {"FakeShardId1", "FakeShardId2", "FakeShardId3"};
-const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShardHost1", 12345),
-                                                  HostAndPort("FakeShardHost2", 12345),
-                                                  HostAndPort("FakeShardHost3", 12345)};
+const std::vector<std::string> kTestShardIds = {"FakeShard1", "FakeShard2", "FakeShard3"};
+const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345),
+                                                  HostAndPort("FakeShard2Host", 12345),
+                                                  HostAndPort("FakeShard3Host", 12345)};
 
 class AsyncResultsMergerTest : public ShardingTestFixture {
 public:
@@ -83,7 +83,7 @@ public:
 
             std::unique_ptr<RemoteCommandTargeterMock> targeter(
                 stdx::make_unique<RemoteCommandTargeterMock>());
-            targeter->setConnectionStringReturnValue(ConnectionString(kTestConfigShardHost));
+            targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i]));
             targeter->setFindHostReturnValue(kTestShardHosts[i]);
 
             targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]),
@@ -582,7 +582,7 @@ TEST_F(AsyncResultsMergerTest, ExistingCursors) {
 
 TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
     BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
-    makeCursorFromFindCmd(findCmd, {kTestShardHosts[0].toString(), kTestShardHosts[1].toString()});
+    makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[0]});
 
     ASSERT_FALSE(arm->ready());
     auto readyEvent = unittest::assertGet(arm->nextEvent());
@@ -1117,8 +1117,8 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
     auto readyEvent = unittest::assertGet(arm->nextEvent());
     ASSERT_FALSE(arm->ready());
 
-    // The network layer reports that the first host is unreachable.
-    scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+    // An unretriable error occurs with the first host.
+    scheduleErrorResponse({ErrorCodes::AuthenticationFailed, "authentication failed"});
     ASSERT_FALSE(arm->ready());
 
     // Instead of propagating the error, we should be willing to return results from the two
@@ -1142,7 +1142,7 @@
 
     // Now the second host becomes unreachable. We should still be willing to return results from
     // the third shard.
-    scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+    scheduleErrorResponse({ErrorCodes::AuthenticationFailed, "authentication failed"});
     ASSERT_FALSE(arm->ready());
 
     responses.clear();
@@ -1192,13 +1192,106 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
     readyEvent = unittest::assertGet(arm->nextEvent());
     ASSERT_FALSE(arm->ready());
 
-    // The lone host involved in this query becomes unreachable. This should simply cause us to
-    // return EOF.
-    scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+    // The lone host involved in this query returns an error. This should simply cause us to return
+    // EOF.
+    scheduleErrorResponse({ErrorCodes::AuthenticationFailed, "authentication failed"});
     ASSERT_TRUE(arm->ready());
     ASSERT(!unittest::assertGet(arm->nextReady()));
 }
 
+TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) {
+    BSONObj findCmd = fromjson("{find: 'testcoll'}");
+    makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+
+    ASSERT_FALSE(arm->ready());
+    auto readyEvent = unittest::assertGet(arm->nextEvent());
+    ASSERT_FALSE(arm->ready());
+
+    // First and second attempts return an error.
+    scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+    ASSERT_FALSE(arm->ready());
+
+    scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+    ASSERT_FALSE(arm->ready());
+
+    // Third attempt succeeds.
+    std::vector<CursorResponse> responses;
+    std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
+    responses.emplace_back(_nss, CursorId(0), batch);
+    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
+    executor->waitForEvent(readyEvent);
+
+    ASSERT_TRUE(arm->ready());
+    ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+
+    ASSERT_TRUE(arm->remotesExhausted());
+    ASSERT_TRUE(arm->ready());
+}
+
+TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) {
+    BSONObj findCmd = fromjson("{find: 'testcoll'}");
+    makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+
+    ASSERT_FALSE(arm->ready());
+    auto readyEvent = unittest::assertGet(arm->nextEvent());
+    ASSERT_FALSE(arm->ready());
+
+    // All attempts return an error (one attempt plus three retries)
+    scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+    ASSERT_FALSE(arm->ready());
+
+    scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+    ASSERT_FALSE(arm->ready());
+
+    scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+    ASSERT_FALSE(arm->ready());
+
+    scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
+    ASSERT_TRUE(arm->ready());
+
+    auto status = arm->nextReady();
+    ASSERT_EQ(status.getStatus().code(), ErrorCodes::NotMasterNoSlaveOk);
+
+    // Protocol is to kill the 'arm' on error before destruction.
+    auto killEvent = arm->kill();
+    executor->waitForEvent(killEvent);
+}
+
+TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
+    BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
+    makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]});
+
+    ASSERT_FALSE(arm->ready());
+    auto readyEvent = unittest::assertGet(arm->nextEvent());
+    ASSERT_FALSE(arm->ready());
+
+    // First host returns single result
+    std::vector<CursorResponse> responses;
+    std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
+    responses.emplace_back(_nss, CursorId(0), batch);
+    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
+
+    // From the second host all attempts return an error (one attempt plus three retries)
+    scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+    ASSERT_FALSE(arm->ready());
+
+    scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+    ASSERT_FALSE(arm->ready());
+
+    scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+    ASSERT_FALSE(arm->ready());
+
+    scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
+    ASSERT_TRUE(arm->ready());
+
+    ASSERT_TRUE(arm->ready());
+    ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+
+    ASSERT_TRUE(arm->remotesExhausted());
+    ASSERT_TRUE(arm->ready());
+}
+
+
 }  // namespace
 
 }  // namespace mongo
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index e1f36f7a9a7..9d96393db5a 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -369,10 +369,10 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn,
         }
 
         auto status = std::move(cursorId.getStatus());
-        if (status != ErrorCodes::SendStaleConfig && status != ErrorCodes::RecvStaleConfig &&
-            status != ErrorCodes::HostUnreachable) {
-            // Errors other than receiving a stale config message from mongoD or an unreachable host
-            // are fatal to the operation.
+        if (status != ErrorCodes::SendStaleConfig && status != ErrorCodes::RecvStaleConfig) {
+            // Errors other than receiving a stale config message from MongoD are fatal to the
+            // operation. Network errors and replication retries happen at the level of the
+            // AsyncResultsMerger.
             return status;
         }
 
@@ -388,7 +388,7 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn,
 
     return {ErrorCodes::StaleShardVersion,
             str::stream() << "Retried " << kMaxStaleConfigRetries
-                          << " times without establishing shard version on a reachable host."};
+                          << " times without successfully establishing shard version."};
 }
 
 StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn,