diff options
Diffstat (limited to 'src/mongo/db/repl/base_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/base_cloner.cpp | 59 |
1 files changed, 55 insertions, 4 deletions
diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp index 1e1fdc45a19..02f8b1af48a 100644 --- a/src/mongo/db/repl/base_cloner.cpp +++ b/src/mongo/db/repl/base_cloner.cpp @@ -32,6 +32,8 @@ #include "mongo/platform/basic.h" #include "mongo/db/repl/base_cloner.h" +#include "mongo/db/repl/replication_consistency_markers_gen.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/logv2/log.h" #include "mongo/util/scopeguard.h" @@ -183,6 +185,54 @@ void BaseCloner::clearRetryingState() { _retryableOp = boost::none; } +Status BaseCloner::checkSyncSourceIsStillValid() { + WireVersion wireVersion; + { + stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData); + auto wireVersionOpt = _sharedData->getSyncSourceWireVersion(lk); + // The wire version should always have been set by the time this is called. + invariant(wireVersionOpt); + wireVersion = *wireVersionOpt; + } + if (wireVersion >= WireVersion::RESUMABLE_INITIAL_SYNC) { + auto status = checkInitialSyncIdIsUnchanged(); + if (!status.isOK()) + return status; + } + return checkRollBackIdIsUnchanged(); +} + +Status BaseCloner::checkInitialSyncIdIsUnchanged() { + uassert(ErrorCodes::InitialSyncFailure, + "Sync source was downgraded and no longer supports resumable initial sync", + getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC); + BSONObj initialSyncId; + try { + initialSyncId = getClient()->findOne( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query()); + } catch (DBException& e) { + if (ErrorCodes::isRetriableError(e)) { + auto status = e.toStatus().withContext( + ": failed while attempting to retrieve initial sync ID after re-connect"); + LOGV2_DEBUG( + 4608505, 1, "Retrieving Initial Sync ID retriable error", "error"_attr = status); + return status; + } + throw; + } + uassert(ErrorCodes::InitialSyncFailure, + "Cannot retrieve sync source initial sync ID", + !initialSyncId.isEmpty()); + InitialSyncIdDocument initialSyncIdDoc = + InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId); + + stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData); + uassert(ErrorCodes::InitialSyncFailure, + "Sync source has been resynced since we started syncing from it", + _sharedData->getInitialSyncSourceId(lk) == initialSyncIdDoc.get_id()); + return Status::OK(); +} + Status BaseCloner::checkRollBackIdIsUnchanged() { BSONObj info; try { @@ -267,12 +317,13 @@ BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* } }, isThisStageFailPoint); - if (stage->checkRollBackIdOnRetry()) { - // If checkRollBackIdIsUnchanged fails without throwing, it means a network + if (stage->checkSyncSourceValidityOnRetry()) { + // If checkSyncSourceIsStillValid fails without throwing, it means a network // error occurred and it's safe to continue (which will cause another retry). - if (!checkRollBackIdIsUnchanged().isOK()) + if (!checkSyncSourceIsStillValid().isOK()) continue; - // After successfully checking the rollback ID, the client should always be OK. + // After successfully checking the sync source validity, the client should + // always be OK. invariant(!getClient()->isFailed()); } } |