summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Matt Broadstone <mbroadst@mongodb.com> 2022-05-03 21:09:53 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-05-04 00:04:15 +0000
commit: c9260ac7135fc183f2a785124d033debc63e2734 (patch)
tree: 54e89230d28035db1c29557300de6710b1938952 /src
parent: c1e3a92b0e63d9419d7e3d7a76b95b50976b15d3 (diff)
download: mongo-c9260ac7135fc183f2a785124d033debc63e2734.tar.gz
SERVER-66083 Require rsConfig initialization to accept split config
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h 11
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp 131
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp 106
3 files changed, 172 insertions, 76 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index fafe33221b9..f10b0315cf6 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1339,14 +1339,9 @@ private:
void _scheduleHeartbeatReconfig(WithLock lk, const ReplSetConfig& newConfig);
/**
- * Check if the node should use the recipientConfig contained within newConfig.
+ * Determines if the provided config is a split config, and validates it for installation.
*/
- bool _shouldUseRecipientConfig(WithLock lk, const ReplSetConfig& newConfig);
-
- /**
- * Check if the recipient config provided can be applied to the current node.
- */
- Status _isRecipientConfigValid(WithLock lk, const ReplSetConfig& newConfig);
+ std::tuple<StatusWith<ReplSetConfig>, bool> _resolveConfigToApply(const ReplSetConfig& config);
/**
* Method to write a configuration transmitted via heartbeat message to stable storage.
@@ -1360,7 +1355,7 @@ private:
void _heartbeatReconfigFinish(const executor::TaskExecutor::CallbackArgs& cbData,
const ReplSetConfig& newConfig,
StatusWith<int> myIndex,
- bool isSplitRecipientConfig);
+ bool isRecipientConfig);
/**
* Calculates the time (in millis) left in quiesce mode and converts the value to int64.
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 16e04bc8392..967b5352da8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -652,33 +652,56 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
.status_with_transitional_ignore();
}
-bool ReplicationCoordinatorImpl::_shouldUseRecipientConfig(WithLock lk,
- const ReplSetConfig& newConfig) {
- const auto& member = _rsConfig.getMemberAt(_selfIndex);
+std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolveConfigToApply(
+ const ReplSetConfig& config) {
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (config.isSplitConfig()) {
+ if (!_rsConfig.isInitialized()) {
+ // Unlock the lock because isSelf performs network I/O.
+ lk.unlock();
- return newConfig.getRecipientConfig()->findMemberByHostAndPort(member.getHostAndPort());
-}
+ // If this node is listed in the members of incoming config, accept the config.
+ const auto foundSelfInMembers =
+ std::any_of(config.membersBegin(),
+ config.membersEnd(),
+ [externalState = _externalState.get()](const MemberConfig& config) {
+ return externalState->isSelf(config.getHostAndPort(),
+ getGlobalServiceContext());
+ });
+
+ if (foundSelfInMembers) {
+ return {config, false};
+ }
-Status ReplicationCoordinatorImpl::_isRecipientConfigValid(WithLock lk,
- const ReplSetConfig& newConfig) {
- const auto& member = _rsConfig.getMemberAt(_selfIndex);
+ return {Status(ErrorCodes::NotYetInitialized,
+ "Cannot apply a split config if the current config is uninitialized"),
+ false};
+ }
- if (member.getNumVotes() != 0 || member.getPriority() != 0) {
- return Status(ErrorCodes::BadValue,
- "Cannot apply split config to a node with non-zero vote or priority");
- }
+ auto recipientConfig = config.getRecipientConfig();
+ const auto& selfMember = _rsConfig.getMemberAt(_selfIndex);
+ if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) {
+ if (selfMember.getNumVotes() > 0) {
+ return {
+ Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"),
+ false};
+ }
- if (!_rsConfig.isInitialized()) {
- return Status(ErrorCodes::NotYetInitialized,
- "Cannot apply a split config if the current config is uninitialized");
- }
+ if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) {
+ return {Status(ErrorCodes::InvalidReplicaSetConfig,
+ "Cannot apply recipient config since current config and recipient "
+ "config have the same set name."),
+ false};
+ }
- if (_rsConfig.getReplSetName() == newConfig.getRecipientConfig()->getReplSetName()) {
- return Status(ErrorCodes::InvalidReplicaSetConfig,
- "The current config and recipient config cannot have the same set name.");
+ auto mutableConfig = recipientConfig->getMutable();
+ mutableConfig.setConfigVersion(1);
+ mutableConfig.setConfigTerm(1);
+ return {ReplSetConfig(std::move(mutableConfig)), true};
+ }
}
- return Status::OK();
+ return {config, false};
}
void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
@@ -694,42 +717,22 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
return;
}
- const auto [isSplitRecipientConfig,
- statusWithConf] = [&]() -> std::tuple<bool, StatusWith<ReplSetConfig>> {
- if (!newConfig.isSplitConfig()) {
- return std::make_tuple(false, newConfig);
- }
+ const auto [swConfig, isRecipientConfig] = _resolveConfigToApply(newConfig);
+ if (!swConfig.isOK()) {
+ LOGV2_WARNING(6234600,
+ "Ignoring new configuration in heartbeat response because it is invalid",
+ "status"_attr = swConfig.getStatus());
stdx::lock_guard<Latch> lg(_mutex);
- if (!_shouldUseRecipientConfig(lg, newConfig)) {
- return std::make_tuple(false, newConfig);
- }
-
- auto status = _isRecipientConfigValid(lg, newConfig);
- if (!status.isOK()) {
- return std::make_tuple(false, status);
- }
-
- auto mutableConfig = newConfig.getRecipientConfig()->getMutable();
- mutableConfig.setConfigVersion(1);
- mutableConfig.setConfigTerm(1);
- auto config = ReplSetConfig(std::move(mutableConfig));
-
- return std::make_tuple(true, config);
- }();
-
- if (!statusWithConf.isOK()) {
- LOGV2_WARNING(6234600,
- "Not persisting new configuration in heartbeat response to disk because "
- "it is invalid",
- "conf"_attr = statusWithConf.getStatus());
+ invariant(_rsConfigState == kConfigHBReconfiguring);
+ _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized : kConfigSteady);
return;
}
- const auto configToApply = statusWithConf.getValue();
- if (isSplitRecipientConfig) {
+ const auto configToApply = swConfig.getValue();
+ if (isRecipientConfig) {
LOGV2(6309200,
- "Applying a recipient split config for a shard split operation.",
+ "Applying a recipient config for a shard split operation.",
"config"_attr = configToApply);
}
@@ -788,15 +791,15 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
auto opCtx = cc().makeOperationContext();
// Don't write the no-op for config learned via heartbeats.
- auto status =
- [&, isSplitRecipientConfig = isSplitRecipientConfig, config = configToApply]() {
- if (isSplitRecipientConfig) {
- return _externalState->replaceLocalConfigDocument(opCtx.get(), config.toBSON());
- } else {
- return _externalState->storeLocalConfigDocument(
- opCtx.get(), config.toBSON(), false /* writeOplog */);
- }
- }();
+ auto status = [&, isRecipientConfig = isRecipientConfig]() {
+ if (isRecipientConfig) {
+ return _externalState->replaceLocalConfigDocument(opCtx.get(),
+ configToApply.toBSON());
+ } else {
+ return _externalState->storeLocalConfigDocument(
+ opCtx.get(), configToApply.toBSON(), false /* writeOplog */);
+ }
+ }();
// Wait for durability of the new config document.
try {
@@ -851,7 +854,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
"configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm());
}
- _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig);
+ _heartbeatReconfigFinish(cbd, configToApply, myIndex, isRecipientConfig);
// Start data replication after the config has been installed.
if (shouldStartDataReplication) {
@@ -882,7 +885,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
const executor::TaskExecutor::CallbackArgs& cbData,
const ReplSetConfig& newConfig,
StatusWith<int> myIndex,
- const bool isSplitRecipientConfig) {
+ const bool isRecipientConfig) {
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
@@ -896,7 +899,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
->scheduleWorkAt(_replExecutor->now() + Milliseconds{10},
[=](const executor::TaskExecutor::CallbackArgs& cbData) {
_heartbeatReconfigFinish(
- cbData, newConfig, myIndex, isSplitRecipientConfig);
+ cbData, newConfig, myIndex, isRecipientConfig);
})
.status_with_transitional_ignore();
return;
@@ -921,7 +924,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
->onEvent(electionFinishedEvent,
[=](const executor::TaskExecutor::CallbackArgs& cbData) {
_heartbeatReconfigFinish(
- cbData, newConfig, myIndex, isSplitRecipientConfig);
+ cbData, newConfig, myIndex, isRecipientConfig);
})
.status_with_transitional_ignore();
return;
@@ -974,7 +977,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
invariant(_rsConfigState == kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() ||
- _selfIndex < 0 || isSplitRecipientConfig);
+ _selfIndex < 0 || isRecipientConfig);
if (!myIndex.isOK()) {
switch (myIndex.getStatus().code()) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 14f8d5ebdc5..3e962036710 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -92,7 +92,9 @@ protected:
NetworkInterfaceMock::NetworkOperationIterator performSyncToFinishReconfigHeartbeat();
- void processResponseFromPrimary(const ReplSetConfig& config);
+ void processResponseFromPrimary(const ReplSetConfig& config,
+ long long version = -2,
+ long long term = OpTime::kInitialTerm);
};
void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::string msg) {
@@ -134,7 +136,9 @@ ReplCoordHBV1Test::performSyncToFinishReconfigHeartbeat() {
return getNet()->getNextReadyRequest();
}
-void ReplCoordHBV1Test::processResponseFromPrimary(const ReplSetConfig& config) {
+void ReplCoordHBV1Test::processResponseFromPrimary(const ReplSetConfig& config,
+ long long version,
+ long long term) {
NetworkInterfaceMock* net = getNet();
const Date_t startDate = getNet()->now();
@@ -144,8 +148,8 @@ void ReplCoordHBV1Test::processResponseFromPrimary(const ReplSetConfig& config)
ReplSetHeartbeatArgsV1 hbArgs;
ASSERT_OK(hbArgs.initialize(request.cmdObj));
ASSERT_EQUALS("mySet", hbArgs.getSetName());
- ASSERT_EQUALS(-2, hbArgs.getConfigVersion());
- ASSERT_EQUALS(OpTime::kInitialTerm, hbArgs.getTerm());
+ ASSERT_EQUALS(version, hbArgs.getConfigVersion());
+ ASSERT_EQUALS(term, hbArgs.getTerm());
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName("mySet");
hbResp.setState(MemberState::RS_PRIMARY);
@@ -251,6 +255,100 @@ TEST_F(ReplCoordHBV1Test,
ASSERT_TRUE(getExternalState()->threadsStarted());
}
+TEST_F(ReplCoordHBV1Test, NodeRejectsSplitConfigWhenNotInitialized) {
+ ReplSetConfig rsConfig =
+ assertMakeRSConfig(BSON("_id"
+ << "mySet"
+ << "version" << 3 << "term" << 1 << "protocolVersion" << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "h1:1")
+ << BSON("_id" << 2 << "host"
+ << "h2:1")
+ << BSON("_id" << 3 << "host"
+ << "h3:1"))
+ << "recipientConfig"
+ << BSON("_id"
+ << "recipientSet"
+ << "version" << 1 << "term" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "h4:1")
+ << BSON("_id" << 2 << "host"
+ << "h5:1")
+ << BSON("_id" << 3 << "host"
+ << "h6:1")))));
+
+ ReplSettings settings;
+ settings.setServerlessMode();
+ init(settings);
+
+ // Start by adding self as one of the recipient nodes
+ start(HostAndPort("h5", 1));
+
+ enterNetwork();
+ assertMemberState(MemberState::RS_STARTUP);
+ ASSERT_FALSE(getNet()->hasReadyRequests());
+ exitNetwork();
+
+ receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1));
+
+ enterNetwork();
+ processResponseFromPrimary(rsConfig);
+ assertMemberState(MemberState::RS_STARTUP);
+ exitNetwork();
+}
+
+TEST_F(ReplCoordHBV1Test, UninitializedDonorNodeAcceptsSplitConfigOnFirstHeartbeat) {
+ ReplSetConfig rsConfig =
+ assertMakeRSConfig(BSON("_id"
+ << "mySet"
+ << "version" << 3 << "term" << 1 << "protocolVersion" << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "h1:1")
+ << BSON("_id" << 2 << "host"
+ << "h2:1")
+ << BSON("_id" << 3 << "host"
+ << "h3:1"))
+ << "recipientConfig"
+ << BSON("_id"
+ << "recipientSet"
+ << "version" << 1 << "term" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "h4:1")
+ << BSON("_id" << 2 << "host"
+ << "h5:1")
+ << BSON("_id" << 3 << "host"
+ << "h6:1")))));
+
+ ReplSettings settings;
+ settings.setServerlessMode();
+ init(settings);
+ start(HostAndPort("h3", 1));
+
+ enterNetwork();
+ assertMemberState(MemberState::RS_STARTUP);
+ ASSERT_FALSE(getNet()->hasReadyRequests());
+ exitNetwork();
+
+ receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1));
+
+ enterNetwork();
+ processResponseFromPrimary(rsConfig);
+ performSyncToFinishReconfigHeartbeat();
+ assertMemberState(MemberState::RS_STARTUP2);
+ OperationContextNoop opCtx;
+ auto storedConfig = ReplSetConfig::parse(
+ unittest::assertGet(getExternalState()->loadLocalConfigDocument(&opCtx)));
+ ASSERT_OK(storedConfig.validate());
+ ASSERT_EQUALS(3, storedConfig.getConfigVersion());
+ ASSERT_EQUALS(3, storedConfig.getNumMembers());
+ ASSERT_EQUALS("mySet", storedConfig.getReplSetName());
+ exitNetwork();
+
+ ASSERT_TRUE(getExternalState()->threadsStarted());
+}
+
TEST_F(ReplCoordHBV1Test, RestartingHeartbeatsShouldOnlyCancelScheduledHeartbeats) {
auto replAllSeverityGuard = unittest::MinimumLoggedSeverityGuard{
logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(3)};