diff options
author | Matthew Russotto <matthew.russotto@mongodb.com> | 2019-11-07 22:17:43 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-07 22:17:43 +0000 |
commit | a484dd081e3d39e4f9034075f19f75f825c7e5ea (patch) | |
tree | bbe32536348be080fe5bea4db1565e3a0f914e5e /src/mongo | |
parent | aefce507386045fe6a77eebcbec567000a6a98e5 (diff) | |
download | mongo-a484dd081e3d39e4f9034075f19f75f825c7e5ea.tar.gz |
SERVER-43275 Implement retry-on-network-error logic in cloners, except for query.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/all_database_cloner.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/all_database_cloner.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/all_database_cloner_test.cpp | 283 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner.cpp | 127 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner.h | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/cloner_test_fixture.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync_shared_data.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_auth.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_auth.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_info.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_source_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/dbtests/mock/mock_dbclient_connection.cpp | 3 |
14 files changed, 485 insertions, 47 deletions
diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp index 6c71eb4ec3c..04cf7c879d0 100644 --- a/src/mongo/db/repl/all_database_cloner.cpp +++ b/src/mongo/db/repl/all_database_cloner.cpp @@ -51,10 +51,19 @@ AllDatabaseCloner::AllDatabaseCloner(InitialSyncSharedData* sharedData, storageInterface, dbPool, clockSource), + _connectStage("connect", this, &AllDatabaseCloner::connectStage), _listDatabasesStage("listDatabases", this, &AllDatabaseCloner::listDatabasesStage) {} BaseCloner::ClonerStages AllDatabaseCloner::getStages() { - return {&_listDatabasesStage}; + return {&_connectStage, &_listDatabasesStage}; +} + +BaseCloner::AfterStageBehavior AllDatabaseCloner::connectStage() { + auto* client = getClient(); + uassertStatusOK(client->connect(getSource(), StringData())); + uassertStatusOK(replAuthenticate(client).withContext( + str::stream() << "Failed to authenticate to " << getSource())); + return kContinueNormally; } BaseCloner::AfterStageBehavior AllDatabaseCloner::listDatabasesStage() { @@ -82,20 +91,6 @@ BaseCloner::AfterStageBehavior AllDatabaseCloner::listDatabasesStage() { return kContinueNormally; } -void AllDatabaseCloner::preStage() { - // TODO(SERVER-43275): Implement retry logic here. Alternately, do the initial connection - // in the BaseCloner retry logic and remove this method, but remember not to count the initial - // connection as a _re_try. 
- auto* client = getClient(); - Status clientConnectionStatus = client->connect(getSource(), StringData()); - if (clientConnectionStatus.isOK() && !replAuthenticate(client)) { - clientConnectionStatus = - Status{ErrorCodes::AuthenticationFailed, - str::stream() << "Failed to authenticate to " << getSource()}; - } - uassertStatusOK(clientConnectionStatus); -} - void AllDatabaseCloner::postStage() { { stdx::lock_guard<Latch> lk(_mutex); diff --git a/src/mongo/db/repl/all_database_cloner.h b/src/mongo/db/repl/all_database_cloner.h index 434dbd5be7a..02a5ebda648 100644 --- a/src/mongo/db/repl/all_database_cloner.h +++ b/src/mongo/db/repl/all_database_cloner.h @@ -67,15 +67,25 @@ protected: private: friend class AllDatabaseClonerTest; + class ConnectStage : public ClonerStage<AllDatabaseCloner> { + public: + ConnectStage(std::string name, AllDatabaseCloner* cloner, ClonerRunFn stageFunc) + : ClonerStage<AllDatabaseCloner>(name, cloner, stageFunc){}; + virtual bool checkRollBackIdOnRetry() { + return false; + } + }; + + /** + * Stage function that makes a connection to the sync source. + */ + AfterStageBehavior connectStage(); /** * Stage function that retrieves database information from the sync source. */ AfterStageBehavior listDatabasesStage(); - // The pre-stage for this class connects to the sync source. - void preStage() final; - /** * * The postStage creates and runs the individual DatabaseCloners on each database found on @@ -96,6 +106,7 @@ private: // (X) Access only allowed from the main flow of control called from run() or constructor. // (MX) Write access with mutex from main flow of control, read access with mutex from other // threads, read access allowed from main flow without mutex. 
+ ConnectStage _connectStage; // (R) ClonerStage<AllDatabaseCloner> _listDatabasesStage; // (R) std::vector<std::string> _databases; // (X) std::unique_ptr<DatabaseCloner> _currentDatabaseCloner; // (MX) diff --git a/src/mongo/db/repl/all_database_cloner_test.cpp b/src/mongo/db/repl/all_database_cloner_test.cpp index 3524c09b828..96992cb97ec 100644 --- a/src/mongo/db/repl/all_database_cloner_test.cpp +++ b/src/mongo/db/repl/all_database_cloner_test.cpp @@ -36,6 +36,7 @@ #include "mongo/db/service_context_test_fixture.h" #include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" #include "mongo/util/concurrency/thread_pool.h" namespace mongo { @@ -51,14 +52,292 @@ protected: _source, _mockClient.get(), &_storageInterface, - _dbWorkThreadPool.get()); + _dbWorkThreadPool.get(), + &_clock); } std::vector<std::string> getDatabasesFromCloner(AllDatabaseCloner* cloner) { return cloner->_databases; } + + ClockSourceMock _clock; }; +TEST_F(AllDatabaseClonerTest, RetriesConnect) { + // Bring the server down. + _mockServer->shutdown(); + + auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); + auto beforeRBIDFailPoint = + globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); + auto timesEnteredRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}")); + auto timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}")); + + auto cloner = makeAllDatabaseCloner(); + cloner->setStopAfterStage_forTest("connect"); + + // Run the cloner in a separate thread. 
+ stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + + beforeRetryFailPoint->waitForTimesEntered(timesEnteredRetry + 1); + + // At this point we should have failed, but not recorded the failure yet. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(0, _sharedData->getTotalRetries(WithLock::withoutLock())); + + beforeRetryFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + // Now the failure should be recorded. + ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + + _clock.advance(Minutes(60)); + + timesEnteredRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}")); + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + beforeRetryFailPoint->waitForTimesEntered(timesEnteredRetry + 1); + + // Only first failure is recorded. + ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + + timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}")); + beforeRetryFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + + // Second failure is recorded. + ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(2, _sharedData->getTotalRetries(WithLock::withoutLock())); + + // Bring the server up. + unittest::log() << "Bringing mock server back up."; + _mockServer->reboot(); + + // Allow the cloner to finish. + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Total retries and outage time should be available. 
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(2, _sharedData->getTotalRetries(WithLock::withoutLock())); + ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock)); +} + +TEST_F(AllDatabaseClonerTest, RetriesConnectButFails) { + // Bring the server down. + _mockServer->shutdown(); + + auto beforeRBIDFailPoint = + globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); + auto timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}")); + auto cloner = makeAllDatabaseCloner(); + cloner->setStopAfterStage_forTest("connect"); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_NOT_OK(cloner->run()); + }); + + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + + // Advance the clock enough to fail the whole attempt. + _clock.advance(Days(1) + Seconds(1)); + + // Allow the cloner to finish. + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Advance the clock and make sure this time isn't recorded. + _clock.advance(Minutes(1)); + + // Total retries and outage time should be available. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + ASSERT_EQ(Days(1) + Seconds(1), + _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock)); +} + +// Note that the code for retrying listDatabases is the same for all stages except connect, so +// the unit tests which cover the AllDatabaseCloner listDatabases stage cover retries for all the +// subsequent stages for all the cloners. 
+TEST_F(AllDatabaseClonerTest, RetriesListDatabases) { + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}")); + + // Stop at the listDatabases stage. + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + + auto cloner = makeAllDatabaseCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + + // Wait until we get to the listDatabases stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. + _mockServer->shutdown(); + + auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); + auto beforeRBIDFailPoint = + globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); + auto timesEnteredRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + auto timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRetryFailPoint->waitForTimesEntered(timesEnteredRetry + 1); + + // At this point we should have failed, but not recorded the failure yet. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(0, _sharedData->getTotalRetries(WithLock::withoutLock())); + + beforeRetryFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + // Now the failure should be recorded. 
+ ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + + _clock.advance(Minutes(60)); + + timesEnteredRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + beforeRetryFailPoint->waitForTimesEntered(timesEnteredRetry + 1); + + // Only first failure is recorded. + ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + + timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + beforeRetryFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + + // Second failure is recorded. + ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(2, _sharedData->getTotalRetries(WithLock::withoutLock())); + + // Bring the server up. + unittest::log() << "Bringing mock server back up."; + _mockServer->reboot(); + + // Allow the cloner to finish. + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Total retries and outage time should be available. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(2, _sharedData->getTotalRetries(WithLock::withoutLock())); + ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock)); +} + +TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButRollBackIdChanges) { + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}")); + + // Stop at the listDatabases stage. 
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + + auto cloner = makeAllDatabaseCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_NOT_OK(cloner->run()); + }); + + // Wait until we get to the listDatabases stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. + _mockServer->shutdown(); + + auto beforeRBIDFailPoint = + globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); + auto timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + _clock.advance(Minutes(60)); + + // The rollback ID has changed when we reconnect. + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:2}")); + + // Bring the server up. + unittest::log() << "Bringing mock server back up."; + _mockServer->reboot(); + + // Allow the cloner to finish. + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Total retries and outage time should be available. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock)); +} + +TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButTimesOut) { + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}")); + + // Stop at the listDatabases stage. 
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + + auto cloner = makeAllDatabaseCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_NOT_OK(cloner->run()); + }); + + // Wait until we get to the listDatabases stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. + _mockServer->shutdown(); + + auto beforeRBIDFailPoint = + globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); + auto timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + + beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + // Advance the clock enough for the timeout interval to be exceeded. + _clock.advance(Days(1) + Seconds(1)); + + // Allow the cloner to finish. + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Total retries and outage time should be available. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + ASSERT_EQ(Days(1) + Seconds(1), + _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock)); +} + TEST_F(AllDatabaseClonerTest, FailsOnListDatabases) { Status expectedResult{ErrorCodes::BadValue, "foo"}; _mockServer->setCommandReply("listDatabases", expectedResult); @@ -145,7 +424,7 @@ TEST_F(AllDatabaseClonerTest, DatabaseStats) { 0, fromjson("{cloner: 'DatabaseCloner', stage: 'listCollections', database: 'admin'}")); - // Run the cloner in a separate thread to + // Run the cloner in a separate thread. 
stdx::thread clonerThread([&] { Client::initThread("ClonerRunner"); ASSERT_OK(cloner->run()); diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp index 9aca6c4350a..33f4905990b 100644 --- a/src/mongo/db/repl/base_cloner.cpp +++ b/src/mongo/db/repl/base_cloner.cpp @@ -32,11 +32,15 @@ #include "mongo/platform/basic.h" #include "mongo/db/repl/base_cloner.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeClonerStage); +MONGO_FAIL_POINT_DEFINE(hangBeforeRetryingClonerStage); +MONGO_FAIL_POINT_DEFINE(hangBeforeCheckingRollBackIdClonerStage); MONGO_FAIL_POINT_DEFINE(hangAfterClonerStage); } // namespace using executor::TaskExecutor; @@ -129,8 +133,6 @@ void BaseCloner::pauseForFuzzer(BaseClonerStage* stage) { } BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) { - // TODO(SERVER-43275): Implement retry logic here. Alternately, do the initial connection - // in the retry logic, but make sure not to count the initial attempt as a "re-" try. 
LOG(1) << "Cloner " << getClonerName() << " running stage " << stage->getName(); pauseForFuzzer(stage); auto isThisStageFailPoint = [this, stage](const BSONObj& data) { @@ -145,13 +147,7 @@ BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) { } }, isThisStageFailPoint); - if (_client->isFailed()) { - Status failed(ErrorCodes::HostUnreachable, "Client is disconnected"); - log() << "Failed because host " << getSource() << " is unreachable."; - setInitialSyncFailedStatus(failed); - uassertStatusOK(failed); - } - auto afterStageBehavior = stage->run(); + auto afterStageBehavior = runStageWithRetries(stage); hangAfterClonerStage.executeIf( [&](const BSONObj& data) { log() << "Cloner " << getClonerName() << " hanging after running stage " @@ -165,6 +161,119 @@ BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) { return afterStageBehavior; } +void BaseCloner::clearRetryingState() { + if (_retrying) { + stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData); + _sharedData->decrementRetryingOperations(lk, getClock()); + _retrying = false; + } +} + +Status BaseCloner::checkRollBackIdIsUnchanged() { + BSONObj info; + try { + getClient()->simpleCommand("admin", &info, "replSetGetRBID"); + } catch (DBException& e) { + if (ErrorCodes::isNetworkError(e)) { + auto status = e.toStatus().withContext( + ": failed while attempting to retrieve rollBackId after re-connect"); + LOG(1) << status; + return status; + } + throw; + } + uassert( + 31298, "Sync source returned invalid result from replSetGetRBID", info["rbid"].isNumber()); + auto rollBackId = info["rbid"].numberInt(); + uassert(ErrorCodes::UnrecoverableRollbackError, + str::stream() << "Rollback occurred on our sync source " << getSource() + << " during initial sync", + rollBackId == _sharedData->getRollBackId()); + return Status::OK(); +} + +BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* stage) { + ON_BLOCK_EXIT([this] { clearRetryingState(); 
}); + Status lastError = Status::OK(); + auto isThisStageFailPoint = [this, stage](const BSONObj& data) { + return data["stage"].str() == stage->getName() && isMyFailPoint(data); + }; + while (true) { + try { + // mustExit is set when the clone has been canceled externally. + if (mustExit()) + return kSkipRemainingStages; + if (!lastError.isOK()) { + // If lastError is set, this is a retry. + hangBeforeRetryingClonerStage.executeIf( + [&](const BSONObj& data) { + log() << "Cloner " << getClonerName() << " hanging before retrying stage " + << stage->getName(); + while (!mustExit() && + hangBeforeRetryingClonerStage.shouldFail(isThisStageFailPoint)) { + sleepmillis(100); + } + }, + isThisStageFailPoint); + log() << "Initial Sync retrying " << getClonerName() << " stage " + << stage->getName() << " due to " << lastError; + auto retryPeriod = Seconds(initialSyncTransientErrorRetryPeriodSeconds.load()); + Milliseconds outageDuration; + { + stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData); + if (!_retrying) { + _retrying = true; + // This is the first retry for this stage in this run, so we start the + // clock on the outage by incrementing the retrying operations counter. 
+ _sharedData->incrementRetryingOperations(lk, getClock()); + } + outageDuration = _sharedData->getCurrentOutageDuration(lk, getClock()); + if (outageDuration <= retryPeriod) { + _sharedData->incrementTotalRetries(lk); + } + } + if (outageDuration > retryPeriod) { + auto status = lastError.withContext( + str::stream() << ": Exceeded initialSyncTransientErrorRetryPeriodSeconds " + << retryPeriod); + setInitialSyncFailedStatus(status); + uassertStatusOK(status); + } + hangBeforeCheckingRollBackIdClonerStage.executeIf( + [&](const BSONObj& data) { + log() << "Cloner " << getClonerName() + << " hanging before checking rollBackId for stage " + << stage->getName(); + while (!mustExit() && + hangBeforeCheckingRollBackIdClonerStage.shouldFail( + isThisStageFailPoint)) { + sleepmillis(100); + } + }, + isThisStageFailPoint); + if (stage->checkRollBackIdOnRetry()) { + // If checkRollBackIdIsUnchanged fails without throwing, it means a network + // error occurred and it's safe to continue (which will cause another retry). + if (!checkRollBackIdIsUnchanged().isOK()) + continue; + // After successfully checking the rollback ID, the client should always be OK. 
+ invariant(!getClient()->isFailed()); + } + } + return stage->run(); + } catch (DBException& e) { + lastError = e.toStatus(); + if (!stage->isTransientError(lastError)) { + log() << "Non-retryable error occurred during cloner " << getClonerName() + << " stage " + stage->getName() << ": " << lastError; + throw; + } + LOG(1) << "Transient error occurred during cloner " << getClonerName() + << " stage " + stage->getName() << ": " << lastError; + } + } +} + Future<void> BaseCloner::runOnExecutor(TaskExecutor* executor) { { stdx::lock_guard<Latch> lk(_mutex); diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h index ce39786d8f5..914045cdca6 100644 --- a/src/mongo/db/repl/base_cloner.h +++ b/src/mongo/db/repl/base_cloner.h @@ -101,9 +101,20 @@ protected: virtual AfterStageBehavior run() = 0; - // Returns true if the Status represents an error which should be retried. - virtual bool isTransientError(Status) { - return false; + /** + * Returns true if the Status represents an error which should be retried. + */ + virtual bool isTransientError(const Status& status) { + return ErrorCodes::isNetworkError(status); + } + + /** + * Returns true if the rollback ID should be checked before retrying. + * This is provided because the "connect" stage must complete successfully + * before checking rollback ID. + */ + virtual bool checkRollBackIdOnRetry() { + return true; } std::string getName() const { @@ -194,6 +205,13 @@ protected: */ bool mustExit(); + /** + * A stage may, but is not required to, call this when we should clear the retrying state + * because the operation has at least partially succeeded. If the stage does not call this, + * the retrying state is cleared upon successful completion of the entire stage. 
+ */ + void clearRetryingState(); + private: virtual ClonerStages getStages() = 0; @@ -206,6 +224,14 @@ private: AfterStageBehavior runStage(BaseClonerStage* stage); + AfterStageBehavior runStageWithRetries(BaseClonerStage* stage); + + /** + * Make sure the rollback ID has not changed. Throws an exception if it has. Returns + * a not-OK status if a network error occurs. + */ + Status checkRollBackIdIsUnchanged(); + /** * Supports pausing at certain stages for the initial sync fuzzer test framework. */ @@ -248,6 +274,9 @@ private: // _stopAfterStage is used for unit testing and causes the cloner to exit after a given // stage. std::string _stopAfterStage; // (X) + + // Are we currently retrying? + bool _retrying = false; // (X) }; } // namespace repl diff --git a/src/mongo/db/repl/cloner_test_fixture.cpp b/src/mongo/db/repl/cloner_test_fixture.cpp index db8df22ff3d..ff8a2bebf4d 100644 --- a/src/mongo/db/repl/cloner_test_fixture.cpp +++ b/src/mongo/db/repl/cloner_test_fixture.cpp @@ -66,8 +66,9 @@ void ClonerTestFixture::setUp() { _dbWorkThreadPool->startup(); _source = HostAndPort{"local:1234"}; _mockServer = std::make_unique<MockRemoteDBServer>(_source.toString()); - _mockClient = - std::unique_ptr<DBClientConnection>(new MockDBClientConnection(_mockServer.get())); + const bool autoReconnect = true; + _mockClient = std::unique_ptr<DBClientConnection>( + new MockDBClientConnection(_mockServer.get(), autoReconnect)); _sharedData = std::make_unique<InitialSyncSharedData>( ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44, kInitialRollbackId); } diff --git a/src/mongo/db/repl/initial_sync_shared_data.h b/src/mongo/db/repl/initial_sync_shared_data.h index 569e0c355fc..fb808d04a1c 100644 --- a/src/mongo/db/repl/initial_sync_shared_data.h +++ b/src/mongo/db/repl/initial_sync_shared_data.h @@ -76,7 +76,7 @@ public: return _retryingOperationsCount; } - int totalRetries(WithLock lk) { + int getTotalRetries(WithLock lk) { return _totalRetries; } diff --git 
a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index aa5955ff501..f5629050b97 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -188,7 +188,8 @@ InitialSyncer::InitialSyncer( _storage(storage), _replicationProcess(replicationProcess), _onCompletion(onCompletion), - _createClientFn([] { return std::make_unique<DBClientConnection>(); }) { + _createClientFn( + [] { return std::make_unique<DBClientConnection>(true /* autoReconnect */); }) { uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec); uassert(ErrorCodes::BadValue, "invalid storage interface", _storage); uassert(ErrorCodes::BadValue, "invalid replication process", _replicationProcess); diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 042c571d2d4..0525cea7cc8 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -261,3 +261,15 @@ server_parameters: lte: expr: 100 * 1024 * 1024 + # New parameters since this file was created, not taken from elsewhere. + initialSyncTransientErrorRetryPeriodSeconds: + description: >- + The amount of time to continue retrying transient errors during initial sync before + declaring the attempt failed. 
+ set_at: [ startup, runtime ] + cpp_vartype: AtomicWord<int> + cpp_varname: initialSyncTransientErrorRetryPeriodSeconds + default: + expr: 24 * 60 * 60 + validator: + gte: 0 diff --git a/src/mongo/db/repl/replication_auth.cpp b/src/mongo/db/repl/replication_auth.cpp index 515dbf665f8..fc53b17183e 100644 --- a/src/mongo/db/repl/replication_auth.cpp +++ b/src/mongo/db/repl/replication_auth.cpp @@ -49,12 +49,13 @@ AuthorizationManager* getGlobalAuthorizationManager() { } // namespace -bool replAuthenticate(DBClientBase* conn) { +Status replAuthenticate(DBClientBase* conn) { if (auth::isInternalAuthSet()) - return conn->authenticateInternalUser().isOK(); + return conn->authenticateInternalUser(); if (getGlobalAuthorizationManager()->isAuthEnabled()) - return false; - return true; + return {ErrorCodes::AuthenticationFailed, + "Authentication is enabled but no internal authentication data is available."}; + return Status::OK(); } } // namespace repl diff --git a/src/mongo/db/repl/replication_auth.h b/src/mongo/db/repl/replication_auth.h index 6866f6f51a1..e3771371eb7 100644 --- a/src/mongo/db/repl/replication_auth.h +++ b/src/mongo/db/repl/replication_auth.h @@ -40,9 +40,9 @@ namespace repl { /** * Authenticates conn using the server's cluster-membership credentials. * - * Returns true on successful authentication. + * Returns Status::OK() on successful authentication. 
*/ -bool replAuthenticate(DBClientBase* conn); +Status replAuthenticate(DBClientBase* conn); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index 4c784d7648e..c21394e0145 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -132,7 +132,7 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int ScopedDbConnection conn(s["host"].valuestr()); DBClientConnection* cliConn = dynamic_cast<DBClientConnection*>(&conn.conn()); - if (cliConn && replAuthenticate(cliConn)) { + if (cliConn && replAuthenticate(cliConn).isOK()) { BSONObj first = conn->findOne((string) "local.oplog.$" + sourcename, Query().sort(BSON("$natural" << 1))); BSONObj last = conn->findOne((string) "local.oplog.$" + sourcename, diff --git a/src/mongo/db/repl/rollback_source_impl.cpp b/src/mongo/db/repl/rollback_source_impl.cpp index 329a75d8064..8d06503b597 100644 --- a/src/mongo/db/repl/rollback_source_impl.cpp +++ b/src/mongo/db/repl/rollback_source_impl.cpp @@ -87,9 +87,8 @@ void RollbackSourceImpl::copyCollectionFromRemote(OperationContext* opCtx, const NamespaceString& nss) const { std::string errmsg; auto tmpConn = std::make_unique<DBClientConnection>(); - uassert(15908, - errmsg, - tmpConn->connect(_source, StringData(), errmsg) && replAuthenticate(tmpConn.get())); + uassert(15908, errmsg, tmpConn->connect(_source, StringData(), errmsg)); + uassertStatusOK(replAuthenticate(tmpConn.get())); // cloner owns _conn in unique_ptr Cloner cloner; diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp index 6ad13d3ab3d..7a33e493364 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp @@ -187,8 +187,9 @@ double MockDBClientConnection::getSoTimeout() const { } void MockDBClientConnection::checkConnection() { - if 
(_isFailed && _autoReconnect) { + if (_isFailed && _autoReconnect && _remoteServer->isRunning()) { _remoteServerInstanceID = _remoteServer->getInstanceID(); + _isFailed = false; } } } // namespace mongo |