summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@mongodb.com>2019-11-07 22:17:43 +0000
committerevergreen <evergreen@mongodb.com>2019-11-07 22:17:43 +0000
commita484dd081e3d39e4f9034075f19f75f825c7e5ea (patch)
treebbe32536348be080fe5bea4db1565e3a0f914e5e /src/mongo
parentaefce507386045fe6a77eebcbec567000a6a98e5 (diff)
downloadmongo-a484dd081e3d39e4f9034075f19f75f825c7e5ea.tar.gz
SERVER-43275 Implement retry-on-network-error logic in cloners, except for query.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/all_database_cloner.cpp25
-rw-r--r--src/mongo/db/repl/all_database_cloner.h17
-rw-r--r--src/mongo/db/repl/all_database_cloner_test.cpp283
-rw-r--r--src/mongo/db/repl/base_cloner.cpp127
-rw-r--r--src/mongo/db/repl/base_cloner.h35
-rw-r--r--src/mongo/db/repl/cloner_test_fixture.cpp5
-rw-r--r--src/mongo/db/repl/initial_sync_shared_data.h2
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp3
-rw-r--r--src/mongo/db/repl/repl_server_parameters.idl12
-rw-r--r--src/mongo/db/repl/replication_auth.cpp9
-rw-r--r--src/mongo/db/repl/replication_auth.h4
-rw-r--r--src/mongo/db/repl/replication_info.cpp2
-rw-r--r--src/mongo/db/repl/rollback_source_impl.cpp5
-rw-r--r--src/mongo/dbtests/mock/mock_dbclient_connection.cpp3
14 files changed, 485 insertions, 47 deletions
diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp
index 6c71eb4ec3c..04cf7c879d0 100644
--- a/src/mongo/db/repl/all_database_cloner.cpp
+++ b/src/mongo/db/repl/all_database_cloner.cpp
@@ -51,10 +51,19 @@ AllDatabaseCloner::AllDatabaseCloner(InitialSyncSharedData* sharedData,
storageInterface,
dbPool,
clockSource),
+ _connectStage("connect", this, &AllDatabaseCloner::connectStage),
_listDatabasesStage("listDatabases", this, &AllDatabaseCloner::listDatabasesStage) {}
BaseCloner::ClonerStages AllDatabaseCloner::getStages() {
- return {&_listDatabasesStage};
+ return {&_connectStage, &_listDatabasesStage};
+}
+
+BaseCloner::AfterStageBehavior AllDatabaseCloner::connectStage() {
+ auto* client = getClient();
+ uassertStatusOK(client->connect(getSource(), StringData()));
+ uassertStatusOK(replAuthenticate(client).withContext(
+ str::stream() << "Failed to authenticate to " << getSource()));
+ return kContinueNormally;
}
BaseCloner::AfterStageBehavior AllDatabaseCloner::listDatabasesStage() {
@@ -82,20 +91,6 @@ BaseCloner::AfterStageBehavior AllDatabaseCloner::listDatabasesStage() {
return kContinueNormally;
}
-void AllDatabaseCloner::preStage() {
- // TODO(SERVER-43275): Implement retry logic here. Alternately, do the initial connection
- // in the BaseCloner retry logic and remove this method, but remember not to count the initial
- // connection as a _re_try.
- auto* client = getClient();
- Status clientConnectionStatus = client->connect(getSource(), StringData());
- if (clientConnectionStatus.isOK() && !replAuthenticate(client)) {
- clientConnectionStatus =
- Status{ErrorCodes::AuthenticationFailed,
- str::stream() << "Failed to authenticate to " << getSource()};
- }
- uassertStatusOK(clientConnectionStatus);
-}
-
void AllDatabaseCloner::postStage() {
{
stdx::lock_guard<Latch> lk(_mutex);
diff --git a/src/mongo/db/repl/all_database_cloner.h b/src/mongo/db/repl/all_database_cloner.h
index 434dbd5be7a..02a5ebda648 100644
--- a/src/mongo/db/repl/all_database_cloner.h
+++ b/src/mongo/db/repl/all_database_cloner.h
@@ -67,15 +67,25 @@ protected:
private:
friend class AllDatabaseClonerTest;
+ class ConnectStage : public ClonerStage<AllDatabaseCloner> {
+ public:
+ ConnectStage(std::string name, AllDatabaseCloner* cloner, ClonerRunFn stageFunc)
+ : ClonerStage<AllDatabaseCloner>(name, cloner, stageFunc){};
+ virtual bool checkRollBackIdOnRetry() {
+ return false;
+ }
+ };
+
+ /**
+ * Stage function that makes a connection to the sync source.
+ */
+ AfterStageBehavior connectStage();
/**
* Stage function that retrieves database information from the sync source.
*/
AfterStageBehavior listDatabasesStage();
- // The pre-stage for this class connects to the sync source.
- void preStage() final;
-
/**
*
* The postStage creates and runs the individual DatabaseCloners on each database found on
@@ -96,6 +106,7 @@ private:
// (X) Access only allowed from the main flow of control called from run() or constructor.
// (MX) Write access with mutex from main flow of control, read access with mutex from other
// threads, read access allowed from main flow without mutex.
+ ConnectStage _connectStage; // (R)
ClonerStage<AllDatabaseCloner> _listDatabasesStage; // (R)
std::vector<std::string> _databases; // (X)
std::unique_ptr<DatabaseCloner> _currentDatabaseCloner; // (MX)
diff --git a/src/mongo/db/repl/all_database_cloner_test.cpp b/src/mongo/db/repl/all_database_cloner_test.cpp
index 3524c09b828..96992cb97ec 100644
--- a/src/mongo/db/repl/all_database_cloner_test.cpp
+++ b/src/mongo/db/repl/all_database_cloner_test.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
@@ -51,14 +52,292 @@ protected:
_source,
_mockClient.get(),
&_storageInterface,
- _dbWorkThreadPool.get());
+ _dbWorkThreadPool.get(),
+ &_clock);
}
std::vector<std::string> getDatabasesFromCloner(AllDatabaseCloner* cloner) {
return cloner->_databases;
}
+
+ ClockSourceMock _clock;
};
+TEST_F(AllDatabaseClonerTest, RetriesConnect) {
+ // Bring the server down.
+ _mockServer->shutdown();
+
+ auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
+ auto beforeRBIDFailPoint =
+ globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage");
+ auto timesEnteredRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}"));
+ auto timesEnteredRBID = beforeRBIDFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}"));
+
+ auto cloner = makeAllDatabaseCloner();
+ cloner->setStopAfterStage_forTest("connect");
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredRetry + 1);
+
+ // At this point we should have failed, but not recorded the failure yet.
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(0, _sharedData->getTotalRetries(WithLock::withoutLock()));
+
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+ beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
+ // Now the failure should be recorded.
+ ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock()));
+
+ _clock.advance(Minutes(60));
+
+ timesEnteredRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}"));
+ beforeRBIDFailPoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredRetry + 1);
+
+ // Only first failure is recorded.
+ ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock()));
+
+ timesEnteredRBID = beforeRBIDFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}"));
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+ beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
+
+ // Second failure is recorded.
+ ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(2, _sharedData->getTotalRetries(WithLock::withoutLock()));
+
+ // Bring the server up.
+ unittest::log() << "Bringing mock server back up.";
+ _mockServer->reboot();
+
+ // Allow the cloner to finish.
+ beforeRBIDFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // Total retries and outage time should be available.
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(2, _sharedData->getTotalRetries(WithLock::withoutLock()));
+ ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock));
+}
+
+TEST_F(AllDatabaseClonerTest, RetriesConnectButFails) {
+ // Bring the server down.
+ _mockServer->shutdown();
+
+ auto beforeRBIDFailPoint =
+ globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage");
+ auto timesEnteredRBID = beforeRBIDFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'connect'}"));
+ auto cloner = makeAllDatabaseCloner();
+ cloner->setStopAfterStage_forTest("connect");
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_NOT_OK(cloner->run());
+ });
+
+ beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
+
+ // Advance the clock enough to fail the whole attempt.
+ _clock.advance(Days(1) + Seconds(1));
+
+ // Allow the cloner to finish.
+ beforeRBIDFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // Advance the clock and make sure this time isn't recorded.
+ _clock.advance(Minutes(1));
+
+ // Total retries and outage time should be available.
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock()));
+ ASSERT_EQ(Days(1) + Seconds(1),
+ _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock));
+}
+
+// Note that the code for retrying listDatabases is the same for all stages except connect, so
+// the unit tests which cover the AllDatabaseCloner listDatabases stage cover retries for all the
+// subsequent stages for all the cloners.
+TEST_F(AllDatabaseClonerTest, RetriesListDatabases) {
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}"));
+
+ // Stop at the listDatabases stage.
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+
+ auto cloner = makeAllDatabaseCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+
+ // Wait until we get to the listDatabases stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // Bring the server down.
+ _mockServer->shutdown();
+
+ auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
+ auto beforeRBIDFailPoint =
+ globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage");
+ auto timesEnteredRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+ auto timesEnteredRBID = beforeRBIDFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredRetry + 1);
+
+ // At this point we should have failed, but not recorded the failure yet.
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(0, _sharedData->getTotalRetries(WithLock::withoutLock()));
+
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+ beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
+ // Now the failure should be recorded.
+ ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock()));
+
+ _clock.advance(Minutes(60));
+
+ timesEnteredRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+ beforeRBIDFailPoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredRetry + 1);
+
+ // Only first failure is recorded.
+ ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock()));
+
+ timesEnteredRBID = beforeRBIDFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+ beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
+
+ // Second failure is recorded.
+ ASSERT_EQ(1, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(2, _sharedData->getTotalRetries(WithLock::withoutLock()));
+
+ // Bring the server up.
+ unittest::log() << "Bringing mock server back up.";
+ _mockServer->reboot();
+
+ // Allow the cloner to finish.
+ beforeRBIDFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // Total retries and outage time should be available.
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(2, _sharedData->getTotalRetries(WithLock::withoutLock()));
+ ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock));
+}
+
+TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButRollBackIdChanges) {
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}"));
+
+ // Stop at the listDatabases stage.
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+
+ auto cloner = makeAllDatabaseCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_NOT_OK(cloner->run());
+ });
+
+ // Wait until we get to the listDatabases stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // Bring the server down.
+ _mockServer->shutdown();
+
+ auto beforeRBIDFailPoint =
+ globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage");
+ auto timesEnteredRBID = beforeRBIDFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+ beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
+ _clock.advance(Minutes(60));
+
+ // The rollback ID has changed when we reconnect.
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:2}"));
+
+ // Bring the server up.
+ unittest::log() << "Bringing mock server back up.";
+ _mockServer->reboot();
+
+ // Allow the cloner to finish.
+ beforeRBIDFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // Total retries and outage time should be available.
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock()));
+ ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock));
+}
+
+TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButTimesOut) {
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}"));
+
+ // Stop at the listDatabases stage.
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+
+ auto cloner = makeAllDatabaseCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_NOT_OK(cloner->run());
+ });
+
+ // Wait until we get to the listDatabases stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // Bring the server down.
+ _mockServer->shutdown();
+
+ auto beforeRBIDFailPoint =
+ globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage");
+ auto timesEnteredRBID = beforeRBIDFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+ beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
+ // Advance the clock enough for the timeout interval to be exceeded.
+ _clock.advance(Days(1) + Seconds(1));
+
+ // Allow the cloner to finish.
+ beforeRBIDFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // Total retries and outage time should be available.
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock()));
+ ASSERT_EQ(Days(1) + Seconds(1),
+ _sharedData->getTotalTimeUnreachable(WithLock::withoutLock(), &_clock));
+}
+
TEST_F(AllDatabaseClonerTest, FailsOnListDatabases) {
Status expectedResult{ErrorCodes::BadValue, "foo"};
_mockServer->setCommandReply("listDatabases", expectedResult);
@@ -145,7 +424,7 @@ TEST_F(AllDatabaseClonerTest, DatabaseStats) {
0,
fromjson("{cloner: 'DatabaseCloner', stage: 'listCollections', database: 'admin'}"));
- // Run the cloner in a separate thread to
+ // Run the cloner in a separate thread.
stdx::thread clonerThread([&] {
Client::initThread("ClonerRunner");
ASSERT_OK(cloner->run());
diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp
index 9aca6c4350a..33f4905990b 100644
--- a/src/mongo/db/repl/base_cloner.cpp
+++ b/src/mongo/db/repl/base_cloner.cpp
@@ -32,11 +32,15 @@
#include "mongo/platform/basic.h"
#include "mongo/db/repl/base_cloner.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(hangBeforeClonerStage);
+MONGO_FAIL_POINT_DEFINE(hangBeforeRetryingClonerStage);
+MONGO_FAIL_POINT_DEFINE(hangBeforeCheckingRollBackIdClonerStage);
MONGO_FAIL_POINT_DEFINE(hangAfterClonerStage);
} // namespace
using executor::TaskExecutor;
@@ -129,8 +133,6 @@ void BaseCloner::pauseForFuzzer(BaseClonerStage* stage) {
}
BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) {
- // TODO(SERVER-43275): Implement retry logic here. Alternately, do the initial connection
- // in the retry logic, but make sure not to count the initial attempt as a "re-" try.
LOG(1) << "Cloner " << getClonerName() << " running stage " << stage->getName();
pauseForFuzzer(stage);
auto isThisStageFailPoint = [this, stage](const BSONObj& data) {
@@ -145,13 +147,7 @@ BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) {
}
},
isThisStageFailPoint);
- if (_client->isFailed()) {
- Status failed(ErrorCodes::HostUnreachable, "Client is disconnected");
- log() << "Failed because host " << getSource() << " is unreachable.";
- setInitialSyncFailedStatus(failed);
- uassertStatusOK(failed);
- }
- auto afterStageBehavior = stage->run();
+ auto afterStageBehavior = runStageWithRetries(stage);
hangAfterClonerStage.executeIf(
[&](const BSONObj& data) {
log() << "Cloner " << getClonerName() << " hanging after running stage "
@@ -165,6 +161,119 @@ BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) {
return afterStageBehavior;
}
+void BaseCloner::clearRetryingState() {
+ if (_retrying) {
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ _sharedData->decrementRetryingOperations(lk, getClock());
+ _retrying = false;
+ }
+}
+
+Status BaseCloner::checkRollBackIdIsUnchanged() {
+ BSONObj info;
+ try {
+ getClient()->simpleCommand("admin", &info, "replSetGetRBID");
+ } catch (DBException& e) {
+ if (ErrorCodes::isNetworkError(e)) {
+ auto status = e.toStatus().withContext(
+ ": failed while attempting to retrieve rollBackId after re-connect");
+ LOG(1) << status;
+ return status;
+ }
+ throw;
+ }
+ uassert(
+ 31298, "Sync source returned invalid result from replSetGetRBID", info["rbid"].isNumber());
+ auto rollBackId = info["rbid"].numberInt();
+ uassert(ErrorCodes::UnrecoverableRollbackError,
+ str::stream() << "Rollback occurred on our sync source " << getSource()
+ << " during initial sync",
+ rollBackId == _sharedData->getRollBackId());
+ return Status::OK();
+}
+
+BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* stage) {
+ ON_BLOCK_EXIT([this] { clearRetryingState(); });
+ Status lastError = Status::OK();
+ auto isThisStageFailPoint = [this, stage](const BSONObj& data) {
+ return data["stage"].str() == stage->getName() && isMyFailPoint(data);
+ };
+ while (true) {
+ try {
+ // mustExit is set when the clone has been canceled externally.
+ if (mustExit())
+ return kSkipRemainingStages;
+ if (!lastError.isOK()) {
+ // If lastError is set, this is a retry.
+ hangBeforeRetryingClonerStage.executeIf(
+ [&](const BSONObj& data) {
+ log() << "Cloner " << getClonerName() << " hanging before retrying stage "
+ << stage->getName();
+ while (!mustExit() &&
+ hangBeforeRetryingClonerStage.shouldFail(isThisStageFailPoint)) {
+ sleepmillis(100);
+ }
+ },
+ isThisStageFailPoint);
+ log() << "Initial Sync retrying " << getClonerName() << " stage "
+ << stage->getName() << " due to " << lastError;
+ auto retryPeriod = Seconds(initialSyncTransientErrorRetryPeriodSeconds.load());
+ Milliseconds outageDuration;
+ {
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ if (!_retrying) {
+ _retrying = true;
+ // This is the first retry for this stage in this run, so we start the
+ // clock on the outage by incrementing the retrying operations counter.
+ _sharedData->incrementRetryingOperations(lk, getClock());
+ }
+ outageDuration = _sharedData->getCurrentOutageDuration(lk, getClock());
+ if (outageDuration <= retryPeriod) {
+ _sharedData->incrementTotalRetries(lk);
+ }
+ }
+ if (outageDuration > retryPeriod) {
+ auto status = lastError.withContext(
+ str::stream() << ": Exceeded initialSyncTransientErrorRetryPeriodSeconds "
+ << retryPeriod);
+ setInitialSyncFailedStatus(status);
+ uassertStatusOK(status);
+ }
+ hangBeforeCheckingRollBackIdClonerStage.executeIf(
+ [&](const BSONObj& data) {
+ log() << "Cloner " << getClonerName()
+ << " hanging before checking rollBackId for stage "
+ << stage->getName();
+ while (!mustExit() &&
+ hangBeforeCheckingRollBackIdClonerStage.shouldFail(
+ isThisStageFailPoint)) {
+ sleepmillis(100);
+ }
+ },
+ isThisStageFailPoint);
+ if (stage->checkRollBackIdOnRetry()) {
+ // If checkRollBackIdIsUnchanged fails without throwing, it means a network
+ // error occurred and it's safe to continue (which will cause another retry).
+ if (!checkRollBackIdIsUnchanged().isOK())
+ continue;
+ // After successfully checking the rollback ID, the client should always be OK.
+ invariant(!getClient()->isFailed());
+ }
+ }
+ return stage->run();
+ } catch (DBException& e) {
+ lastError = e.toStatus();
+ if (!stage->isTransientError(lastError)) {
+ log() << "Non-retryable error occurred during cloner " << getClonerName()
+ << " stage " + stage->getName() << ": " << lastError;
+ throw;
+ }
+ LOG(1) << "Transient error occurred during cloner " << getClonerName()
+ << " stage " + stage->getName() << ": " << lastError;
+ }
+ }
+}
+
Future<void> BaseCloner::runOnExecutor(TaskExecutor* executor) {
{
stdx::lock_guard<Latch> lk(_mutex);
diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h
index ce39786d8f5..914045cdca6 100644
--- a/src/mongo/db/repl/base_cloner.h
+++ b/src/mongo/db/repl/base_cloner.h
@@ -101,9 +101,20 @@ protected:
virtual AfterStageBehavior run() = 0;
- // Returns true if the Status represents an error which should be retried.
- virtual bool isTransientError(Status) {
- return false;
+ /**
+ * Returns true if the Status represents an error which should be retried.
+ */
+ virtual bool isTransientError(const Status& status) {
+ return ErrorCodes::isNetworkError(status);
+ }
+
+ /**
+ * Returns true if the rollback ID should be checked before retrying.
+ * This is provided because the "connect" stage must complete successfully
+ * before checking rollback ID.
+ */
+ virtual bool checkRollBackIdOnRetry() {
+ return true;
}
std::string getName() const {
@@ -194,6 +205,13 @@ protected:
*/
bool mustExit();
+ /**
+ * A stage may, but is not required, to call this when we should clear the retrying state
+ * because the operation has at least partially succeeded. If the stage does not call this,
+ * the retrying state is cleared upon successful completion of the entire stage.
+ */
+ void clearRetryingState();
+
private:
virtual ClonerStages getStages() = 0;
@@ -206,6 +224,14 @@ private:
AfterStageBehavior runStage(BaseClonerStage* stage);
+ AfterStageBehavior runStageWithRetries(BaseClonerStage* stage);
+
+ /**
+ * Make sure the rollback ID has not changed. Throws an exception if it has. Returns
+ * a not-OK status if a network error occurs.
+ */
+ Status checkRollBackIdIsUnchanged();
+
/**
* Supports pausing at certain stages for the initial sync fuzzer test framework.
*/
@@ -248,6 +274,9 @@ private:
// _stopAfterStage is used for unit testing and causes the cloner to exit after a given
// stage.
std::string _stopAfterStage; // (X)
+
+ // Are we currently retrying?
+ bool _retrying = false; // (X)
};
} // namespace repl
diff --git a/src/mongo/db/repl/cloner_test_fixture.cpp b/src/mongo/db/repl/cloner_test_fixture.cpp
index db8df22ff3d..ff8a2bebf4d 100644
--- a/src/mongo/db/repl/cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/cloner_test_fixture.cpp
@@ -66,8 +66,9 @@ void ClonerTestFixture::setUp() {
_dbWorkThreadPool->startup();
_source = HostAndPort{"local:1234"};
_mockServer = std::make_unique<MockRemoteDBServer>(_source.toString());
- _mockClient =
- std::unique_ptr<DBClientConnection>(new MockDBClientConnection(_mockServer.get()));
+ const bool autoReconnect = true;
+ _mockClient = std::unique_ptr<DBClientConnection>(
+ new MockDBClientConnection(_mockServer.get(), autoReconnect));
_sharedData = std::make_unique<InitialSyncSharedData>(
ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44, kInitialRollbackId);
}
diff --git a/src/mongo/db/repl/initial_sync_shared_data.h b/src/mongo/db/repl/initial_sync_shared_data.h
index 569e0c355fc..fb808d04a1c 100644
--- a/src/mongo/db/repl/initial_sync_shared_data.h
+++ b/src/mongo/db/repl/initial_sync_shared_data.h
@@ -76,7 +76,7 @@ public:
return _retryingOperationsCount;
}
- int totalRetries(WithLock lk) {
+ int getTotalRetries(WithLock lk) {
return _totalRetries;
}
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index aa5955ff501..f5629050b97 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -188,7 +188,8 @@ InitialSyncer::InitialSyncer(
_storage(storage),
_replicationProcess(replicationProcess),
_onCompletion(onCompletion),
- _createClientFn([] { return std::make_unique<DBClientConnection>(); }) {
+ _createClientFn(
+ [] { return std::make_unique<DBClientConnection>(true /* autoReconnect */); }) {
uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec);
uassert(ErrorCodes::BadValue, "invalid storage interface", _storage);
uassert(ErrorCodes::BadValue, "invalid replication process", _replicationProcess);
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 042c571d2d4..0525cea7cc8 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -261,3 +261,15 @@ server_parameters:
lte:
expr: 100 * 1024 * 1024
+ # New parameters since this file was created, not taken from elsewhere.
+ initialSyncTransientErrorRetryPeriodSeconds:
+ description: >-
+ The amount of time to continue retrying transient errors during initial sync before
+ declaring the attempt failed.
+ set_at: [ startup, runtime ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: initialSyncTransientErrorRetryPeriodSeconds
+ default:
+ expr: 24 * 60 * 60
+ validator:
+ gte: 0
diff --git a/src/mongo/db/repl/replication_auth.cpp b/src/mongo/db/repl/replication_auth.cpp
index 515dbf665f8..fc53b17183e 100644
--- a/src/mongo/db/repl/replication_auth.cpp
+++ b/src/mongo/db/repl/replication_auth.cpp
@@ -49,12 +49,13 @@ AuthorizationManager* getGlobalAuthorizationManager() {
} // namespace
-bool replAuthenticate(DBClientBase* conn) {
+Status replAuthenticate(DBClientBase* conn) {
if (auth::isInternalAuthSet())
- return conn->authenticateInternalUser().isOK();
+ return conn->authenticateInternalUser();
if (getGlobalAuthorizationManager()->isAuthEnabled())
- return false;
- return true;
+ return {ErrorCodes::AuthenticationFailed,
+ "Authentication is enabled but no internal authentication data is available."};
+ return Status::OK();
}
} // namespace repl
diff --git a/src/mongo/db/repl/replication_auth.h b/src/mongo/db/repl/replication_auth.h
index 6866f6f51a1..e3771371eb7 100644
--- a/src/mongo/db/repl/replication_auth.h
+++ b/src/mongo/db/repl/replication_auth.h
@@ -40,9 +40,9 @@ namespace repl {
/**
* Authenticates conn using the server's cluster-membership credentials.
*
- * Returns true on successful authentication.
+ * Returns Status::OK() on successful authentication.
*/
-bool replAuthenticate(DBClientBase* conn);
+Status replAuthenticate(DBClientBase* conn);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index 4c784d7648e..c21394e0145 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -132,7 +132,7 @@ void appendReplicationInfo(OperationContext* opCtx, BSONObjBuilder& result, int
ScopedDbConnection conn(s["host"].valuestr());
DBClientConnection* cliConn = dynamic_cast<DBClientConnection*>(&conn.conn());
- if (cliConn && replAuthenticate(cliConn)) {
+ if (cliConn && replAuthenticate(cliConn).isOK()) {
BSONObj first = conn->findOne((string) "local.oplog.$" + sourcename,
Query().sort(BSON("$natural" << 1)));
BSONObj last = conn->findOne((string) "local.oplog.$" + sourcename,
diff --git a/src/mongo/db/repl/rollback_source_impl.cpp b/src/mongo/db/repl/rollback_source_impl.cpp
index 329a75d8064..8d06503b597 100644
--- a/src/mongo/db/repl/rollback_source_impl.cpp
+++ b/src/mongo/db/repl/rollback_source_impl.cpp
@@ -87,9 +87,8 @@ void RollbackSourceImpl::copyCollectionFromRemote(OperationContext* opCtx,
const NamespaceString& nss) const {
std::string errmsg;
auto tmpConn = std::make_unique<DBClientConnection>();
- uassert(15908,
- errmsg,
- tmpConn->connect(_source, StringData(), errmsg) && replAuthenticate(tmpConn.get()));
+ uassert(15908, errmsg, tmpConn->connect(_source, StringData(), errmsg));
+ uassertStatusOK(replAuthenticate(tmpConn.get()));
// cloner owns _conn in unique_ptr
Cloner cloner;
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
index 6ad13d3ab3d..7a33e493364 100644
--- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
+++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
@@ -187,8 +187,9 @@ double MockDBClientConnection::getSoTimeout() const {
}
void MockDBClientConnection::checkConnection() {
- if (_isFailed && _autoReconnect) {
+ if (_isFailed && _autoReconnect && _remoteServer->isRunning()) {
_remoteServerInstanceID = _remoteServer->getInstanceID();
+ _isFailed = false;
}
}
} // namespace mongo