author     Matthew Russotto <matthew.russotto@mongodb.com>    2019-11-07 22:17:43 +0000
committer  evergreen <evergreen@mongodb.com>                  2019-11-07 22:17:43 +0000
commit     a484dd081e3d39e4f9034075f19f75f825c7e5ea (patch)
tree       bbe32536348be080fe5bea4db1565e3a0f914e5e /src/mongo/db/repl/base_cloner.cpp
parent     aefce507386045fe6a77eebcbec567000a6a98e5 (diff)
download   mongo-a484dd081e3d39e4f9034075f19f75f825c7e5ea.tar.gz
SERVER-43275 Implement retry-on-network-error logic in cloners, except for query.
Diffstat (limited to 'src/mongo/db/repl/base_cloner.cpp')
-rw-r--r--   src/mongo/db/repl/base_cloner.cpp   127
1 file changed, 118 insertions, 9 deletions
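
For readers skimming the change, the heart of the new `runStageWithRetries()` loop can be boiled down to the standalone sketch below. This is a minimal illustration of the retry-on-transient-error pattern the commit adds, not the actual MongoDB code: `Stage`, `TransientError`, `checkSyncSourceRollBackId()`, and the simplified `runStageWithRetries()` signature are hypothetical stand-ins for `BaseClonerStage`, the cloner's transient-error classification, the `replSetGetRBID` check, and the shared outage tracking in `InitialSyncSharedData`.

```cpp
#include <chrono>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>

using Clock = std::chrono::steady_clock;

// Thrown by a stage when it hits an error considered retryable (e.g. a network error).
struct TransientError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Illustrative stand-in for a cloner stage; not the real BaseClonerStage interface.
struct Stage {
    std::string name;
    bool checkRollBackIdOnRetry;
    std::function<void()> run;  // throws TransientError on a retryable failure
};

// Hypothetical helper: returns false if the sync source rolled back while we were
// disconnected, in which case the partially cloned data can no longer be trusted.
bool checkSyncSourceRollBackId() {
    return true;
}

void runStageWithRetries(const Stage& stage, std::chrono::seconds retryPeriod) {
    bool retrying = false;
    Clock::time_point outageStart{};
    while (true) {
        try {
            if (retrying) {
                // Give up once the outage has lasted longer than the retry budget
                // (initialSyncTransientErrorRetryPeriodSeconds in the real commit).
                if (Clock::now() - outageStart > retryPeriod)
                    throw std::runtime_error("exceeded transient error retry period");
                // On every retry, verify the sync source has not rolled back meanwhile.
                if (stage.checkRollBackIdOnRetry && !checkSyncSourceRollBackId())
                    throw std::runtime_error("sync source rolled back during initial sync");
            }
            stage.run();
            return;  // stage succeeded
        } catch (const TransientError& e) {
            std::cerr << "retrying stage " << stage.name << " due to: " << e.what() << '\n';
            if (!retrying) {
                retrying = true;
                outageStart = Clock::now();  // start the outage clock at the first failure
            }
        }
    }
}

int main() {
    int attempts = 0;
    Stage listDatabases{"listDatabases", true, [&] {
                            // Fail twice with a simulated network error, then succeed.
                            if (++attempts < 3)
                                throw TransientError("HostUnreachable: simulated network error");
                        }};
    runStageWithRetries(listDatabases, std::chrono::seconds(30));
    std::cout << "stage completed after " << attempts << " attempt(s)\n";
}
```

The design choice mirrored here is that the outage clock starts at the first transient failure and is compared against a fixed budget, so a sync source that stays unreachable longer than `initialSyncTransientErrorRetryPeriodSeconds` fails initial sync rather than retrying forever, while each retry re-checks that the source has not rolled back in the meantime.
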
diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp
index 9aca6c4350a..33f4905990b 100644
--- a/src/mongo/db/repl/base_cloner.cpp
+++ b/src/mongo/db/repl/base_cloner.cpp
@@ -32,11 +32,15 @@
 #include "mongo/platform/basic.h"
 
 #include "mongo/db/repl/base_cloner.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
 #include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
 
 namespace mongo {
 namespace {
 MONGO_FAIL_POINT_DEFINE(hangBeforeClonerStage);
+MONGO_FAIL_POINT_DEFINE(hangBeforeRetryingClonerStage);
+MONGO_FAIL_POINT_DEFINE(hangBeforeCheckingRollBackIdClonerStage);
 MONGO_FAIL_POINT_DEFINE(hangAfterClonerStage);
 }  // namespace
 
 using executor::TaskExecutor;
@@ -129,8 +133,6 @@ void BaseCloner::pauseForFuzzer(BaseClonerStage* stage) {
 }
 
 BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) {
-    // TODO(SERVER-43275): Implement retry logic here. Alternately, do the initial connection
-    // in the retry logic, but make sure not to count the initial attempt as a "re-" try.
     LOG(1) << "Cloner " << getClonerName() << " running stage " << stage->getName();
     pauseForFuzzer(stage);
     auto isThisStageFailPoint = [this, stage](const BSONObj& data) {
@@ -145,13 +147,7 @@ BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) {
             }
         },
         isThisStageFailPoint);
-    if (_client->isFailed()) {
-        Status failed(ErrorCodes::HostUnreachable, "Client is disconnected");
-        log() << "Failed because host " << getSource() << " is unreachable.";
-        setInitialSyncFailedStatus(failed);
-        uassertStatusOK(failed);
-    }
-    auto afterStageBehavior = stage->run();
+    auto afterStageBehavior = runStageWithRetries(stage);
     hangAfterClonerStage.executeIf(
         [&](const BSONObj& data) {
             log() << "Cloner " << getClonerName() << " hanging after running stage "
@@ -165,6 +161,119 @@ BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) {
     return afterStageBehavior;
 }
 
+void BaseCloner::clearRetryingState() {
+    if (_retrying) {
+        stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+        _sharedData->decrementRetryingOperations(lk, getClock());
+        _retrying = false;
+    }
+}
+
+Status BaseCloner::checkRollBackIdIsUnchanged() {
+    BSONObj info;
+    try {
+        getClient()->simpleCommand("admin", &info, "replSetGetRBID");
+    } catch (DBException& e) {
+        if (ErrorCodes::isNetworkError(e)) {
+            auto status = e.toStatus().withContext(
+                ": failed while attempting to retrieve rollBackId after re-connect");
+            LOG(1) << status;
+            return status;
+        }
+        throw;
+    }
+    uassert(
+        31298, "Sync source returned invalid result from replSetGetRBID", info["rbid"].isNumber());
+    auto rollBackId = info["rbid"].numberInt();
+    uassert(ErrorCodes::UnrecoverableRollbackError,
+            str::stream() << "Rollback occurred on our sync source " << getSource()
+                          << " during initial sync",
+            rollBackId == _sharedData->getRollBackId());
+    return Status::OK();
+}
+
+BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* stage) {
+    ON_BLOCK_EXIT([this] { clearRetryingState(); });
+    Status lastError = Status::OK();
+    auto isThisStageFailPoint = [this, stage](const BSONObj& data) {
+        return data["stage"].str() == stage->getName() && isMyFailPoint(data);
+    };
+    while (true) {
+        try {
+            // mustExit is set when the clone has been canceled externally.
+            if (mustExit())
+                return kSkipRemainingStages;
+            if (!lastError.isOK()) {
+                // If lastError is set, this is a retry.
+                hangBeforeRetryingClonerStage.executeIf(
+                    [&](const BSONObj& data) {
+                        log() << "Cloner " << getClonerName() << " hanging before retrying stage "
+                              << stage->getName();
+                        while (!mustExit() &&
+                               hangBeforeRetryingClonerStage.shouldFail(isThisStageFailPoint)) {
+                            sleepmillis(100);
+                        }
+                    },
+                    isThisStageFailPoint);
+                log() << "Initial Sync retrying " << getClonerName() << " stage "
+                      << stage->getName() << " due to " << lastError;
+                auto retryPeriod = Seconds(initialSyncTransientErrorRetryPeriodSeconds.load());
+                Milliseconds outageDuration;
+                {
+                    stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+                    if (!_retrying) {
+                        _retrying = true;
+                        // This is the first retry for this stage in this run, so we start the
+                        // clock on the outage by incrementing the retrying operations counter.
+                        _sharedData->incrementRetryingOperations(lk, getClock());
+                    }
+                    outageDuration = _sharedData->getCurrentOutageDuration(lk, getClock());
+                    if (outageDuration <= retryPeriod) {
+                        _sharedData->incrementTotalRetries(lk);
+                    }
+                }
+                if (outageDuration > retryPeriod) {
+                    auto status = lastError.withContext(
+                        str::stream() << ": Exceeded initialSyncTransientErrorRetryPeriodSeconds "
+                                      << retryPeriod);
+                    setInitialSyncFailedStatus(status);
+                    uassertStatusOK(status);
+                }
+                hangBeforeCheckingRollBackIdClonerStage.executeIf(
+                    [&](const BSONObj& data) {
+                        log() << "Cloner " << getClonerName()
+                              << " hanging before checking rollBackId for stage "
+                              << stage->getName();
+                        while (!mustExit() &&
+                               hangBeforeCheckingRollBackIdClonerStage.shouldFail(
+                                   isThisStageFailPoint)) {
+                            sleepmillis(100);
+                        }
+                    },
+                    isThisStageFailPoint);
+                if (stage->checkRollBackIdOnRetry()) {
+                    // If checkRollBackIdIsUnchanged fails without throwing, it means a network
+                    // error occurred and it's safe to continue (which will cause another retry).
+                    if (!checkRollBackIdIsUnchanged().isOK())
+                        continue;
+                    // After successfully checking the rollback ID, the client should always be OK.
+                    invariant(!getClient()->isFailed());
+                }
+            }
+            return stage->run();
+        } catch (DBException& e) {
+            lastError = e.toStatus();
+            if (!stage->isTransientError(lastError)) {
+                log() << "Non-retryable error occured during cloner " << getClonerName()
+                      << " stage " + stage->getName() << ": " << lastError;
+                throw;
+            }
+            LOG(1) << "Transient error occured during cloner " << getClonerName()
+                   << " stage " + stage->getName() << ": " << lastError;
        }
+    }
+}
+
 Future<void> BaseCloner::runOnExecutor(TaskExecutor* executor) {
     {
         stdx::lock_guard<Latch> lk(_mutex);