summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/base_cloner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/base_cloner.cpp')
-rw-r--r--src/mongo/db/repl/base_cloner.cpp59
1 files changed, 55 insertions, 4 deletions
diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp
index 1e1fdc45a19..02f8b1af48a 100644
--- a/src/mongo/db/repl/base_cloner.cpp
+++ b/src/mongo/db/repl/base_cloner.cpp
@@ -32,6 +32,8 @@
#include "mongo/platform/basic.h"
#include "mongo/db/repl/base_cloner.h"
+#include "mongo/db/repl/replication_consistency_markers_gen.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/logv2/log.h"
#include "mongo/util/scopeguard.h"
@@ -183,6 +185,54 @@ void BaseCloner::clearRetryingState() {
_retryableOp = boost::none;
}
+Status BaseCloner::checkSyncSourceIsStillValid() {
+ WireVersion wireVersion;
+ {
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ auto wireVersionOpt = _sharedData->getSyncSourceWireVersion(lk);
+ // The wire version should always have been set by the time this is called.
+ invariant(wireVersionOpt);
+ wireVersion = *wireVersionOpt;
+ }
+ if (wireVersion >= WireVersion::RESUMABLE_INITIAL_SYNC) {
+ auto status = checkInitialSyncIdIsUnchanged();
+ if (!status.isOK())
+ return status;
+ }
+ return checkRollBackIdIsUnchanged();
+}
+
+Status BaseCloner::checkInitialSyncIdIsUnchanged() {
+ uassert(ErrorCodes::InitialSyncFailure,
+ "Sync source was downgraded and no longer supports resumable initial sync",
+ getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC);
+ BSONObj initialSyncId;
+ try {
+ initialSyncId = getClient()->findOne(
+ ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query());
+ } catch (DBException& e) {
+ if (ErrorCodes::isRetriableError(e)) {
+ auto status = e.toStatus().withContext(
+ ": failed while attempting to retrieve initial sync ID after re-connect");
+ LOGV2_DEBUG(
+ 4608505, 1, "Retrieving Initial Sync ID retriable error", "error"_attr = status);
+ return status;
+ }
+ throw;
+ }
+ uassert(ErrorCodes::InitialSyncFailure,
+ "Cannot retrieve sync source initial sync ID",
+ !initialSyncId.isEmpty());
+ InitialSyncIdDocument initialSyncIdDoc =
+ InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId);
+
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ uassert(ErrorCodes::InitialSyncFailure,
+ "Sync source has been resynced since we started syncing from it",
+ _sharedData->getInitialSyncSourceId(lk) == initialSyncIdDoc.get_id());
+ return Status::OK();
+}
+
Status BaseCloner::checkRollBackIdIsUnchanged() {
BSONObj info;
try {
@@ -267,12 +317,13 @@ BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage*
}
},
isThisStageFailPoint);
- if (stage->checkRollBackIdOnRetry()) {
- // If checkRollBackIdIsUnchanged fails without throwing, it means a network
+ if (stage->checkSyncSourceValidityOnRetry()) {
+ // If checkSyncSourceIsStillValid fails without throwing, it means a network
// error occurred and it's safe to continue (which will cause another retry).
- if (!checkRollBackIdIsUnchanged().isOK())
+ if (!checkSyncSourceIsStillValid().isOK())
continue;
- // After successfully checking the rollback ID, the client should always be OK.
+ // After successfully checking the sync source validity, the client should
+ // always be OK.
invariant(!getClient()->isFailed());
}
}