diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 100 |
1 files changed, 61 insertions, 39 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 057d101a6ff..a8ef444f31d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -659,7 +659,44 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk, _selfIndex < 0); _replExecutor ->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) { - _heartbeatReconfigStore(cbData, newConfig); + const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig); + if (!swConfig.isOK()) { + LOGV2_WARNING( + 6234600, + "Ignoring new configuration in heartbeat response because it is invalid", + "status"_attr = swConfig.getStatus()); + + stdx::lock_guard<Latch> lg(_mutex); + invariant(_rsConfigState == kConfigHBReconfiguring); + _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized + : kConfigSteady); + return; + } + + if (MONGO_likely(!isSplitRecipientConfig)) { + _heartbeatReconfigStore(cbData, newConfig); + return; + } + + LOGV2(8423366, "Waiting for oplog buffer to drain before applying recipient config."); + _drainForShardSplit().getAsync( + [this, + resolvedConfig = swConfig.getValue(), + replExecutor = _replExecutor.get(), + isSplitRecipientConfig = isSplitRecipientConfig](Status status) { + if (!status.isOK()) { + stdx::lock_guard<Latch> lg(_mutex); + _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized + : kConfigSteady); + return; + } + + replExecutor + ->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) { + _heartbeatReconfigStore(cbData, resolvedConfig, isSplitRecipientConfig); + }) + .status_with_transitional_ignore(); + }); }) .status_with_transitional_ignore(); } @@ -676,14 +713,8 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve lk.unlock(); // If this node is listed in the members of incoming config, accept the config. - const auto foundSelfInMembers = std::any_of( - config.membersBegin(), - config.membersEnd(), - [externalState = _externalState.get()](const MemberConfig& config) { - return externalState->isSelf(config.getHostAndPort(), getGlobalServiceContext()); - }); - - if (foundSelfInMembers) { + auto swSelfIndex = findSelfInConfig(_externalState.get(), config, cc().getServiceContext()); + if (swSelfIndex.isOK()) { return {config, false}; } @@ -707,7 +738,6 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve false}; } - return {ReplSetConfig(*recipientConfig), true}; } @@ -715,7 +745,9 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve } void ReplicationCoordinatorImpl::_heartbeatReconfigStore( - const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) { + const executor::TaskExecutor::CallbackArgs& cbd, + const ReplSetConfig& newConfig, + bool isSplitRecipientConfig) { if (cbd.status.code() == ErrorCodes::CallbackCanceled) { LOGV2(21480, @@ -727,23 +759,10 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( return; } - const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig); - if (!swConfig.isOK()) { - LOGV2_WARNING(6234600, - "Ignoring new configuration in heartbeat response because it is invalid", - "status"_attr = swConfig.getStatus()); - - stdx::lock_guard<Latch> lg(_mutex); - invariant(_rsConfigState == kConfigHBReconfiguring); - _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized : kConfigSteady); - return; - } - - const auto configToApply = swConfig.getValue(); if (isSplitRecipientConfig) { LOGV2(6309200, "Applying a recipient config for a shard split operation.", - "config"_attr = configToApply); + "config"_attr = newConfig); } const auto myIndex = [&]() -> StatusWith<int> { @@ -751,24 +770,24 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( // recover from transient DNS errors. { stdx::lock_guard<Latch> lk(_mutex); - if (_selfIndex >= 0 && sameConfigContents(_rsConfig, configToApply)) { + if (_selfIndex >= 0 && sameConfigContents(_rsConfig, newConfig)) { LOGV2_FOR_HEARTBEATS(6351200, 2, "New heartbeat config is only a version/term change, skipping " "validation checks", "oldConfig"_attr = _rsConfig, - "newConfig"_attr = configToApply); + "newConfig"_attr = newConfig); // If the configs are the same, so is our index. return _selfIndex; } } return validateConfigForHeartbeatReconfig( - _externalState.get(), configToApply, getGlobalServiceContext()); + _externalState.get(), newConfig, cc().getServiceContext()); }(); if (myIndex.getStatus() == ErrorCodes::NodeNotFound) { stdx::lock_guard<Latch> lk(_mutex); - // If this node absent in configToApply, and this node was not previously initialized, + // If this node absent in newConfig, and this node was not previously initialized, // return to kConfigUninitialized immediately, rather than storing the config and // transitioning into the RS_REMOVED state. See SERVER-15740. if (!_rsConfig.isInitialized()) { @@ -793,24 +812,23 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } else { LOGV2_FOR_HEARTBEATS(4615626, 2, - "Config with {configToApplyVersionAndTerm} validated for " + "Config with {newConfigVersionAndTerm} validated for " "reconfig; persisting to disk.", "Config validated for reconfig; persisting to disk", - "configToApplyVersionAndTerm"_attr = - configToApply.getConfigVersionAndTerm()); + "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm()); auto opCtx = cc().makeOperationContext(); auto status = [this, opCtx = opCtx.get(), - configToApply, + newConfig, isSplitRecipientConfig = isSplitRecipientConfig]() { if (isSplitRecipientConfig) { - return _externalState->replaceLocalConfigDocument(opCtx, configToApply.toBSON()); + return _externalState->replaceLocalConfigDocument(opCtx, newConfig.toBSON()); } // Don't write the no-op for config learned via heartbeats. return _externalState->storeLocalConfigDocument( - opCtx, configToApply.toBSON(), false /* writeOplog */); + opCtx, newConfig.toBSON(), false /* writeOplog */); }(); // Wait for durability of the new config document. @@ -847,7 +865,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 && - configToApply.getMemberAt(myIndex.getValue()).isArbiter(); + newConfig.getMemberAt(myIndex.getValue()).isArbiter(); if (isArbiter) { ReplicaSetAwareServiceRegistry::get(_service).onBecomeArbiter(); @@ -871,13 +889,13 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( LOGV2_FOR_HEARTBEATS( 4615627, 2, - "New configuration with {configToApplyVersionAndTerm} persisted " + "New configuration with {newConfigVersionAndTerm} persisted " "to local storage; installing new config in memory", "New configuration persisted to local storage; installing new config in memory", - "configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm()); + "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm()); } - _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig); + _heartbeatReconfigFinish(cbd, newConfig, myIndex, isSplitRecipientConfig); // Start data replication after the config has been installed. if (shouldStartDataReplication) { @@ -1046,6 +1064,10 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( // indicating that this is `forInitiate`. _topCoord->advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime(), false, true); _clearCommittedSnapshot_inlock(); + + invariant(_applierState == ApplierState::Stopped); + _applierState = ApplierState::Running; + _externalState->startProducerIfStopped(); } lk.unlock(); |