author     Matthew Russotto <matthew.russotto@mongodb.com>  2019-11-07 22:17:43 +0000
committer  evergreen <evergreen@mongodb.com>  2019-11-07 22:17:43 +0000
commit     a484dd081e3d39e4f9034075f19f75f825c7e5ea (patch)
tree       bbe32536348be080fe5bea4db1565e3a0f914e5e /src/mongo/db/repl/base_cloner.cpp
parent     aefce507386045fe6a77eebcbec567000a6a98e5 (diff)
download   mongo-a484dd081e3d39e4f9034075f19f75f825c7e5ea.tar.gz
SERVER-43275 Implement retry-on-network-error logic in cloners, except for query.
Diffstat (limited to 'src/mongo/db/repl/base_cloner.cpp')
-rw-r--r--  src/mongo/db/repl/base_cloner.cpp | 127
1 file changed, 118 insertions(+), 9 deletions(-)
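The core of the patch is a bounded retry loop. As a rough standalone sketch
(not the patch's code: runWithRetries, isTransient, and retryPeriod are
illustrative names), the control flow is:

    #include <chrono>
    #include <functional>
    #include <thread>

    // Retry 'op' on transient failures until a cumulative outage window
    // expires. Non-transient errors are rethrown immediately, mirroring the
    // patch's throw-through path for non-retryable errors.
    bool runWithRetries(const std::function<void()>& op,
                        const std::function<bool(const std::exception&)>& isTransient,
                        std::chrono::seconds retryPeriod) {
        const auto outageStart = std::chrono::steady_clock::now();
        while (true) {
            try {
                op();
                return true;  // The stage ran to completion.
            } catch (const std::exception& e) {
                if (!isTransient(e))
                    throw;  // Non-retryable: fail the whole attempt.
                if (std::chrono::steady_clock::now() - outageStart > retryPeriod)
                    return false;  // Outage outlasted the retry period.
                // Brief pause before the next attempt; the patch itself
                // relies on connection timeouts rather than sleeping.
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        }
    }

The real implementation differs in one important way: the outage clock lives
in InitialSyncSharedData and is shared by all cloners, so the retry window is
measured against a single shared outage rather than a per-call timer, and each
retry is preceded by a rollback id check against the sync source.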
diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp
index 9aca6c4350a..33f4905990b 100644
--- a/src/mongo/db/repl/base_cloner.cpp
+++ b/src/mongo/db/repl/base_cloner.cpp
@@ -32,11 +32,15 @@
#include "mongo/platform/basic.h"
#include "mongo/db/repl/base_cloner.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
MONGO_FAIL_POINT_DEFINE(hangBeforeClonerStage);
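+// Failpoints for testing the new retry path: pause a cloner just before it
+// retries a stage, and just before the rollback id check that guards a retry.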
+MONGO_FAIL_POINT_DEFINE(hangBeforeRetryingClonerStage);
+MONGO_FAIL_POINT_DEFINE(hangBeforeCheckingRollBackIdClonerStage);
MONGO_FAIL_POINT_DEFINE(hangAfterClonerStage);
} // namespace
using executor::TaskExecutor;
@@ -129,8 +133,6 @@ void BaseCloner::pauseForFuzzer(BaseClonerStage* stage) {
}
BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) {
- // TODO(SERVER-43275): Implement retry logic here. Alternately, do the initial connection
- // in the retry logic, but make sure not to count the initial attempt as a "re-" try.
LOG(1) << "Cloner " << getClonerName() << " running stage " << stage->getName();
pauseForFuzzer(stage);
auto isThisStageFailPoint = [this, stage](const BSONObj& data) {
@@ -145,13 +147,7 @@ BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) {
}
},
isThisStageFailPoint);
- if (_client->isFailed()) {
- Status failed(ErrorCodes::HostUnreachable, "Client is disconnected");
- log() << "Failed because host " << getSource() << " is unreachable.";
- setInitialSyncFailedStatus(failed);
- uassertStatusOK(failed);
- }
- auto afterStageBehavior = stage->run();
+ auto afterStageBehavior = runStageWithRetries(stage);
hangAfterClonerStage.executeIf(
[&](const BSONObj& data) {
log() << "Cloner " << getClonerName() << " hanging after running stage "
@@ -165,6 +161,119 @@ BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) {
return afterStageBehavior;
}
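+
+// Called when a stage finishes, via ON_BLOCK_EXIT in runStageWithRetries: if
+// this cloner was counted as retrying, decrement the shared
+// retrying-operations counter and clear the local flag.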
+void BaseCloner::clearRetryingState() {
+ if (_retrying) {
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ _sharedData->decrementRetryingOperations(lk, getClock());
+ _retrying = false;
+ }
+}
+
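+// Asks the sync source for its current rollback id (replSetGetRBID) and
+// compares it to the one recorded in InitialSyncSharedData. Network errors
+// are returned as a non-OK Status so the caller can retry; a changed rollback
+// id fails with UnrecoverableRollbackError; anything else is rethrown.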
+Status BaseCloner::checkRollBackIdIsUnchanged() {
+ BSONObj info;
+ try {
+ getClient()->simpleCommand("admin", &info, "replSetGetRBID");
+ } catch (DBException& e) {
+ if (ErrorCodes::isNetworkError(e)) {
+ auto status = e.toStatus().withContext(
+ ": failed while attempting to retrieve rollBackId after re-connect");
+ LOG(1) << status;
+ return status;
+ }
+ throw;
+ }
+ uassert(
+ 31298, "Sync source returned invalid result from replSetGetRBID", info["rbid"].isNumber());
+ auto rollBackId = info["rbid"].numberInt();
+ uassert(ErrorCodes::UnrecoverableRollbackError,
+ str::stream() << "Rollback occurred on our sync source " << getSource()
+ << " during initial sync",
+ rollBackId == _sharedData->getRollBackId());
+ return Status::OK();
+}
+
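+// Runs 'stage', retrying on transient errors until the cumulative outage
+// (tracked in InitialSyncSharedData) exceeds
+// initialSyncTransientErrorRetryPeriodSeconds. Stages that request it get a
+// rollback id check before each retry; non-transient errors are rethrown.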
+BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* stage) {
+ ON_BLOCK_EXIT([this] { clearRetryingState(); });
+ Status lastError = Status::OK();
+ auto isThisStageFailPoint = [this, stage](const BSONObj& data) {
+ return data["stage"].str() == stage->getName() && isMyFailPoint(data);
+ };
+ while (true) {
+ try {
+ // mustExit is set when the clone has been canceled externally.
+ if (mustExit())
+ return kSkipRemainingStages;
+ if (!lastError.isOK()) {
+ // If lastError is set, this is a retry.
+ hangBeforeRetryingClonerStage.executeIf(
+ [&](const BSONObj& data) {
+ log() << "Cloner " << getClonerName() << " hanging before retrying stage "
+ << stage->getName();
+ while (!mustExit() &&
+ hangBeforeRetryingClonerStage.shouldFail(isThisStageFailPoint)) {
+ sleepmillis(100);
+ }
+ },
+ isThisStageFailPoint);
+ log() << "Initial Sync retrying " << getClonerName() << " stage "
+ << stage->getName() << " due to " << lastError;
+ auto retryPeriod = Seconds(initialSyncTransientErrorRetryPeriodSeconds.load());
+ Milliseconds outageDuration;
+ {
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ if (!_retrying) {
+ _retrying = true;
+ // This is the first retry for this stage in this run, so we start the
+ // clock on the outage by incrementing the retrying operations counter.
+ _sharedData->incrementRetryingOperations(lk, getClock());
+ }
+ outageDuration = _sharedData->getCurrentOutageDuration(lk, getClock());
+ if (outageDuration <= retryPeriod) {
+ _sharedData->incrementTotalRetries(lk);
+ }
+ }
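+                // The outage has outlasted the retry period: fail the whole
+                // initial sync attempt instead of retrying again.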
+ if (outageDuration > retryPeriod) {
+ auto status = lastError.withContext(
+ str::stream() << ": Exceeded initialSyncTransientErrorRetryPeriodSeconds "
+ << retryPeriod);
+ setInitialSyncFailedStatus(status);
+ uassertStatusOK(status);
+ }
+ hangBeforeCheckingRollBackIdClonerStage.executeIf(
+ [&](const BSONObj& data) {
+ log() << "Cloner " << getClonerName()
+ << " hanging before checking rollBackId for stage "
+ << stage->getName();
+ while (!mustExit() &&
+ hangBeforeCheckingRollBackIdClonerStage.shouldFail(
+ isThisStageFailPoint)) {
+ sleepmillis(100);
+ }
+ },
+ isThisStageFailPoint);
+ if (stage->checkRollBackIdOnRetry()) {
+ // If checkRollBackIdIsUnchanged fails without throwing, it means a network
+ // error occurred and it's safe to continue (which will cause another retry).
+ if (!checkRollBackIdIsUnchanged().isOK())
+ continue;
+ // After successfully checking the rollback ID, the client should always be OK.
+ invariant(!getClient()->isFailed());
+ }
+ }
+ return stage->run();
+ } catch (DBException& e) {
+ lastError = e.toStatus();
+ if (!stage->isTransientError(lastError)) {
+            log() << "Non-retryable error occurred during cloner " << getClonerName()
+ << " stage " + stage->getName() << ": " << lastError;
+ throw;
+ }
+            LOG(1) << "Transient error occurred during cloner " << getClonerName()
+ << " stage " + stage->getName() << ": " << lastError;
+ }
+ }
+}
+
Future<void> BaseCloner::runOnExecutor(TaskExecutor* executor) {
{
stdx::lock_guard<Latch> lk(_mutex);