diff options
author | Matt Broadstone <mbroadst@mongodb.com> | 2022-05-03 21:09:53 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-04 00:04:15 +0000 |
commit | c9260ac7135fc183f2a785124d033debc63e2734 (patch) | |
tree | 54e89230d28035db1c29557300de6710b1938952 /src | |
parent | c1e3a92b0e63d9419d7e3d7a76b95b50976b15d3 (diff) | |
download | mongo-c9260ac7135fc183f2a785124d033debc63e2734.tar.gz |
SERVER-66083 Require rsConfig initialization to accept split config
Diffstat (limited to 'src')
3 files changed, 172 insertions, 76 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index fafe33221b9..f10b0315cf6 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1339,14 +1339,9 @@ private: void _scheduleHeartbeatReconfig(WithLock lk, const ReplSetConfig& newConfig); /** - * Check if the node should use the recipientConfig contained within newConfig. + * Determines if the provided config is a split config, and validates it for installation. */ - bool _shouldUseRecipientConfig(WithLock lk, const ReplSetConfig& newConfig); - - /** - * Check if the recipient config provided can be applied to the current node. - */ - Status _isRecipientConfigValid(WithLock lk, const ReplSetConfig& newConfig); + std::tuple<StatusWith<ReplSetConfig>, bool> _resolveConfigToApply(const ReplSetConfig& config); /** * Method to write a configuration transmitted via heartbeat message to stable storage. @@ -1360,7 +1355,7 @@ private: void _heartbeatReconfigFinish(const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, StatusWith<int> myIndex, - bool isSplitRecipientConfig); + bool isRecipientConfig); /** * Calculates the time (in millis) left in quiesce mode and converts the value to int64. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 16e04bc8392..967b5352da8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -652,33 +652,56 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk, .status_with_transitional_ignore(); } -bool ReplicationCoordinatorImpl::_shouldUseRecipientConfig(WithLock lk, - const ReplSetConfig& newConfig) { - const auto& member = _rsConfig.getMemberAt(_selfIndex); +std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolveConfigToApply( + const ReplSetConfig& config) { + stdx::unique_lock<Latch> lk(_mutex); + if (config.isSplitConfig()) { + if (!_rsConfig.isInitialized()) { + // Unlock the lock because isSelf performs network I/O. + lk.unlock(); - return newConfig.getRecipientConfig()->findMemberByHostAndPort(member.getHostAndPort()); -} + // If this node is listed in the members of incoming config, accept the config. + const auto foundSelfInMembers = + std::any_of(config.membersBegin(), + config.membersEnd(), + [externalState = _externalState.get()](const MemberConfig& config) { + return externalState->isSelf(config.getHostAndPort(), + getGlobalServiceContext()); + }); + + if (foundSelfInMembers) { + return {config, false}; + } -Status ReplicationCoordinatorImpl::_isRecipientConfigValid(WithLock lk, - const ReplSetConfig& newConfig) { - const auto& member = _rsConfig.getMemberAt(_selfIndex); + return {Status(ErrorCodes::NotYetInitialized, + "Cannot apply a split config if the current config is uninitialized"), + false}; + } - if (member.getNumVotes() != 0 || member.getPriority() != 0) { - return Status(ErrorCodes::BadValue, - "Cannot apply split config to a node with non-zero vote or priority"); - } + auto recipientConfig = config.getRecipientConfig(); + const auto& selfMember = _rsConfig.getMemberAt(_selfIndex); + if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) { + if (selfMember.getNumVotes() > 0) { + return { + Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"), + false}; + } - if (!_rsConfig.isInitialized()) { - return Status(ErrorCodes::NotYetInitialized, - "Cannot apply a split config if the current config is uninitialized"); - } + if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) { + return {Status(ErrorCodes::InvalidReplicaSetConfig, + "Cannot apply recipient config since current config and recipient " + "config have the same set name."), + false}; + } - if (_rsConfig.getReplSetName() == newConfig.getRecipientConfig()->getReplSetName()) { - return Status(ErrorCodes::InvalidReplicaSetConfig, - "The current config and recipient config cannot have the same set name."); + auto mutableConfig = recipientConfig->getMutable(); + mutableConfig.setConfigVersion(1); + mutableConfig.setConfigTerm(1); + return {ReplSetConfig(std::move(mutableConfig)), true}; + } } - return Status::OK(); + return {config, false}; } void ReplicationCoordinatorImpl::_heartbeatReconfigStore( @@ -694,42 +717,22 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( return; } - const auto [isSplitRecipientConfig, - statusWithConf] = [&]() -> std::tuple<bool, StatusWith<ReplSetConfig>> { - if (!newConfig.isSplitConfig()) { - return std::make_tuple(false, newConfig); - } + const auto [swConfig, isRecipientConfig] = _resolveConfigToApply(newConfig); + if (!swConfig.isOK()) { + LOGV2_WARNING(6234600, + "Ignoring new configuration in heartbeat response because it is invalid", + "status"_attr = swConfig.getStatus()); stdx::lock_guard<Latch> lg(_mutex); - if (!_shouldUseRecipientConfig(lg, newConfig)) { - return std::make_tuple(false, newConfig); - } - - auto status = _isRecipientConfigValid(lg, newConfig); - if (!status.isOK()) { - return std::make_tuple(false, status); - } - - auto mutableConfig = newConfig.getRecipientConfig()->getMutable(); - mutableConfig.setConfigVersion(1); - mutableConfig.setConfigTerm(1); - auto config = ReplSetConfig(std::move(mutableConfig)); - - return std::make_tuple(true, config); - }(); - - if (!statusWithConf.isOK()) { - LOGV2_WARNING(6234600, - "Not persisting new configuration in heartbeat response to disk because " - "it is invalid", - "conf"_attr = statusWithConf.getStatus()); + invariant(_rsConfigState == kConfigHBReconfiguring); + _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized : kConfigSteady); return; } - const auto configToApply = statusWithConf.getValue(); - if (isSplitRecipientConfig) { + const auto configToApply = swConfig.getValue(); + if (isRecipientConfig) { LOGV2(6309200, - "Applying a recipient split config for a shard split operation.", + "Applying a recipient config for a shard split operation.", "config"_attr = configToApply); } @@ -788,15 +791,15 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( auto opCtx = cc().makeOperationContext(); // Don't write the no-op for config learned via heartbeats. - auto status = - [&, isSplitRecipientConfig = isSplitRecipientConfig, config = configToApply]() { - if (isSplitRecipientConfig) { - return _externalState->replaceLocalConfigDocument(opCtx.get(), config.toBSON()); - } else { - return _externalState->storeLocalConfigDocument( - opCtx.get(), config.toBSON(), false /* writeOplog */); - } - }(); + auto status = [&, isRecipientConfig = isRecipientConfig]() { + if (isRecipientConfig) { + return _externalState->replaceLocalConfigDocument(opCtx.get(), + configToApply.toBSON()); + } else { + return _externalState->storeLocalConfigDocument( + opCtx.get(), configToApply.toBSON(), false /* writeOplog */); + } + }(); // Wait for durability of the new config document. try { @@ -851,7 +854,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( "configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm()); } - _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig); + _heartbeatReconfigFinish(cbd, configToApply, myIndex, isRecipientConfig); // Start data replication after the config has been installed. if (shouldStartDataReplication) { @@ -882,7 +885,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, StatusWith<int> myIndex, - const bool isSplitRecipientConfig) { + const bool isRecipientConfig) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } @@ -896,7 +899,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( ->scheduleWorkAt(_replExecutor->now() + Milliseconds{10}, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _heartbeatReconfigFinish( - cbData, newConfig, myIndex, isSplitRecipientConfig); + cbData, newConfig, myIndex, isRecipientConfig); }) .status_with_transitional_ignore(); return; @@ -921,7 +924,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( ->onEvent(electionFinishedEvent, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _heartbeatReconfigFinish( - cbData, newConfig, myIndex, isSplitRecipientConfig); + cbData, newConfig, myIndex, isRecipientConfig); }) .status_with_transitional_ignore(); return; @@ -974,7 +977,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( invariant(_rsConfigState == kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() || - _selfIndex < 0 || isSplitRecipientConfig); + _selfIndex < 0 || isRecipientConfig); if (!myIndex.isOK()) { switch (myIndex.getStatus().code()) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 14f8d5ebdc5..3e962036710 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -92,7 +92,9 @@ protected: NetworkInterfaceMock::NetworkOperationIterator performSyncToFinishReconfigHeartbeat(); - void processResponseFromPrimary(const ReplSetConfig& config); + void processResponseFromPrimary(const ReplSetConfig& config, + long long version = -2, + long long term = OpTime::kInitialTerm); }; void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::string msg) { @@ -134,7 +136,9 @@ ReplCoordHBV1Test::performSyncToFinishReconfigHeartbeat() { return getNet()->getNextReadyRequest(); } -void ReplCoordHBV1Test::processResponseFromPrimary(const ReplSetConfig& config) { +void ReplCoordHBV1Test::processResponseFromPrimary(const ReplSetConfig& config, + long long version, + long long term) { NetworkInterfaceMock* net = getNet(); const Date_t startDate = getNet()->now(); @@ -144,8 +148,8 @@ void ReplCoordHBV1Test::processResponseFromPrimary(const ReplSetConfig& config) ReplSetHeartbeatArgsV1 hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); ASSERT_EQUALS("mySet", hbArgs.getSetName()); - ASSERT_EQUALS(-2, hbArgs.getConfigVersion()); - ASSERT_EQUALS(OpTime::kInitialTerm, hbArgs.getTerm()); + ASSERT_EQUALS(version, hbArgs.getConfigVersion()); + ASSERT_EQUALS(term, hbArgs.getTerm()); ReplSetHeartbeatResponse hbResp; hbResp.setSetName("mySet"); hbResp.setState(MemberState::RS_PRIMARY); @@ -251,6 +255,100 @@ TEST_F(ReplCoordHBV1Test, ASSERT_TRUE(getExternalState()->threadsStarted()); } +TEST_F(ReplCoordHBV1Test, NodeRejectsSplitConfigWhenNotInitialized) { + ReplSetConfig rsConfig = + assertMakeRSConfig(BSON("_id" + << "mySet" + << "version" << 3 << "term" << 1 << "protocolVersion" << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "h1:1") + << BSON("_id" << 2 << "host" + << "h2:1") + << BSON("_id" << 3 << "host" + << "h3:1")) + << "recipientConfig" + << BSON("_id" + << "recipientSet" + << "version" << 1 << "term" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "h4:1") + << BSON("_id" << 2 << "host" + << "h5:1") + << BSON("_id" << 3 << "host" + << "h6:1"))))); + + ReplSettings settings; + settings.setServerlessMode(); + init(settings); + + // Start by adding self as one of the recipient nodes + start(HostAndPort("h5", 1)); + + enterNetwork(); + assertMemberState(MemberState::RS_STARTUP); + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); + + receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + + enterNetwork(); + processResponseFromPrimary(rsConfig); + assertMemberState(MemberState::RS_STARTUP); + exitNetwork(); +} + +TEST_F(ReplCoordHBV1Test, UninitializedDonorNodeAcceptsSplitConfigOnFirstHeartbeat) { + ReplSetConfig rsConfig = + assertMakeRSConfig(BSON("_id" + << "mySet" + << "version" << 3 << "term" << 1 << "protocolVersion" << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "h1:1") + << BSON("_id" << 2 << "host" + << "h2:1") + << BSON("_id" << 3 << "host" + << "h3:1")) + << "recipientConfig" + << BSON("_id" + << "recipientSet" + << "version" << 1 << "term" << 1 << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "h4:1") + << BSON("_id" << 2 << "host" + << "h5:1") + << BSON("_id" << 3 << "host" + << "h6:1"))))); + + ReplSettings settings; + settings.setServerlessMode(); + init(settings); + start(HostAndPort("h3", 1)); + + enterNetwork(); + assertMemberState(MemberState::RS_STARTUP); + ASSERT_FALSE(getNet()->hasReadyRequests()); + exitNetwork(); + + receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + + enterNetwork(); + processResponseFromPrimary(rsConfig); + performSyncToFinishReconfigHeartbeat(); + assertMemberState(MemberState::RS_STARTUP2); + OperationContextNoop opCtx; + auto storedConfig = ReplSetConfig::parse( + unittest::assertGet(getExternalState()->loadLocalConfigDocument(&opCtx))); + ASSERT_OK(storedConfig.validate()); + ASSERT_EQUALS(3, storedConfig.getConfigVersion()); + ASSERT_EQUALS(3, storedConfig.getNumMembers()); + ASSERT_EQUALS("mySet", storedConfig.getReplSetName()); + exitNetwork(); + + ASSERT_TRUE(getExternalState()->threadsStarted()); +} + TEST_F(ReplCoordHBV1Test, RestartingHeartbeatsShouldOnlyCancelScheduledHeartbeats) { auto replAllSeverityGuard = unittest::MinimumLoggedSeverityGuard{ logv2::LogComponent::kReplication, logv2::LogSeverity::Debug(3)}; |