summary | refs | log | tree | commit | diff
path: root/src/mongo/db/repl/all_database_cloner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/all_database_cloner.cpp')
-rw-r--r-- src/mongo/db/repl/all_database_cloner.cpp 58
1 files changed, 54 insertions, 4 deletions
diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp
index c1ed1686fc9..53fbbe297b3 100644
--- a/src/mongo/db/repl/all_database_cloner.cpp
+++ b/src/mongo/db/repl/all_database_cloner.cpp
@@ -35,6 +35,8 @@
#include "mongo/base/string_data.h"
#include "mongo/db/repl/all_database_cloner.h"
+#include "mongo/db/repl/replication_consistency_markers_gen.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
namespace mongo {
@@ -47,10 +49,11 @@ AllDatabaseCloner::AllDatabaseCloner(InitialSyncSharedData* sharedData,
ThreadPool* dbPool)
: BaseCloner("AllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool),
_connectStage("connect", this, &AllDatabaseCloner::connectStage),
+ _getInitialSyncIdStage("getInitialSyncId", this, &AllDatabaseCloner::getInitialSyncIdStage),
_listDatabasesStage("listDatabases", this, &AllDatabaseCloner::listDatabasesStage) {}
BaseCloner::ClonerStages AllDatabaseCloner::getStages() {
- return {&_connectStage, &_listDatabasesStage};
+ return {&_connectStage, &_getInitialSyncIdStage, &_listDatabasesStage};  // getInitialSyncId is listed between connect and listDatabases; presumably stages run in list order -- confirm in BaseCloner.
}
Status AllDatabaseCloner::ensurePrimaryOrSecondary(
@@ -65,9 +68,12 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary(
// There is a window during startup where a node has an invalid configuration and will have
// an isMaster response the same as a removed node. So we must check to see if the node is
// removed by checking local configuration.
- auto otherNodes =
- ReplicationCoordinator::get(getGlobalServiceContext())->getOtherNodesInReplSet();
- if (std::find(otherNodes.begin(), otherNodes.end(), getSource()) == otherNodes.end()) {
+ auto memberData = ReplicationCoordinator::get(getGlobalServiceContext())->getMemberData();
+ auto syncSourceIter = std::find_if(
+ memberData.begin(), memberData.end(), [source = getSource()](const MemberData& member) {
+ return member.getHostAndPort() == source;
+ });
+ if (syncSourceIter == memberData.end()) {
Status status(ErrorCodes::NotMasterOrSecondary,
str::stream() << "Sync source " << getSource()
<< " has been removed from the replication configuration.");
@@ -76,6 +82,20 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary(
getSharedData()->setInitialSyncStatusIfOK(lk, status);
return status;
}
+
+ // We also check if the sync source has gone into initial sync itself. If so, we'll never be
+ // able to sync from it and we should abort the attempt. Because there is a window during
+ // startup where a node will report being in STARTUP2 even if it is not in initial sync,
+ // we also check to see if it has a sync source. A node in STARTUP2 will not have a sync
+ // source unless it is in initial sync.
+ if (syncSourceIter->getState().startup2() && !syncSourceIter->getSyncSource().empty()) {
+ Status status(ErrorCodes::NotMasterOrSecondary,
+ str::stream() << "Sync source " << getSource() << " has been resynced.");
+ stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
+ // Setting the status in the shared data will cancel the initial sync.
+ getSharedData()->setInitialSyncStatusIfOK(lk, status);
+ return status;
+ }
return Status(ErrorCodes::NotMasterOrSecondary,
str::stream() << "Cannot connect because sync source " << getSource()
<< " is neither primary nor secondary.");
@@ -99,6 +119,36 @@ BaseCloner::AfterStageBehavior AllDatabaseCloner::connectStage() {
return kContinueNormally;
}
+BaseCloner::AfterStageBehavior AllDatabaseCloner::getInitialSyncIdStage() {  // Stage: records the sync source's max wire version and, when supported, its initial sync ID in the shared data.
+ auto wireVersion = static_cast<WireVersion>(getClient()->getMaxWireVersion());
+ {
+ stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());  // Lock is scoped to this block only.
+ getSharedData()->setSyncSourceWireVersion(lk, wireVersion);
+ }
+
+ // Sync sources whose wire version predates resumable initial sync have no initial sync ID
+ // to fetch, so skip the lookup entirely.
+ if (wireVersion < WireVersion::RESUMABLE_INITIAL_SYNC)
+ return kContinueNormally;
+ auto initialSyncId = getClient()->findOne(
+ ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query());  // Default-constructed Query; presumably matches any document -- confirm.
+ // TODO(SERVER-47150): Remove this "if" and allow the uassert to fail. Until then, an empty
+ // result is tolerated by substituting a freshly generated UUID as the sync source ID.
+ if (initialSyncId.isEmpty()) {
+ stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
+ getSharedData()->setInitialSyncSourceId(lk, UUID::gen());
+ return kContinueNormally;
+ }
+ uassert(ErrorCodes::InitialSyncFailure,  // Cannot fire while the early return above is present; takes effect once the TODO is resolved.
+ "Cannot retrieve sync source initial sync ID",
+ !initialSyncId.isEmpty());
+ InitialSyncIdDocument initialSyncIdDoc =
+ InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId);  // Parse the fetched document to extract its _id.
+ {
+ stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
+ getSharedData()->setInitialSyncSourceId(lk, initialSyncIdDoc.get_id());
+ }
+ return kContinueNormally;
+}
+
BaseCloner::AfterStageBehavior AllDatabaseCloner::listDatabasesStage() {
BSONObj res;
auto databasesArray = getClient()->getDatabaseInfos(BSONObj(), true /* nameOnly */);