diff options
author | Matt Broadstone <mbroadst@mongodb.com> | 2022-08-29 18:26:32 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-29 19:12:05 +0000 |
commit | 382f12ccb49dac4ac5435b35543521f96e003207 (patch) | |
tree | a167a0e93282905dea22eb8e3536ca11dd589050 /src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | |
parent | a8ff73d03f7b21764479524f28c5bbae857fdc6b (diff) | |
download | mongo-382f12ccb49dac4ac5435b35543521f96e003207.tar.gz |
SERVER-69197 Clear lastCommitted and currentCommitted on split
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 66 |
1 files changed, 35 insertions, 31 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 681baa5e13f..057d101a6ff 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -664,10 +664,10 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk, .status_with_transitional_ignore(); } -std::tuple<StatusWith<ReplSetConfig>, boost::optional<OpTime>> -ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) { +std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolveConfigToApply( + const ReplSetConfig& config) { if (!_settings.isServerless() || !config.isSplitConfig()) { - return {config, boost::none}; + return {config, false}; } stdx::unique_lock<Latch> lk(_mutex); @@ -684,12 +684,12 @@ ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) { }); if (foundSelfInMembers) { - return {config, boost::none}; + return {config, false}; } return {Status(ErrorCodes::NotYetInitialized, "Cannot apply a split config if the current config is uninitialized"), - boost::none}; + false}; } auto recipientConfig = config.getRecipientConfig(); @@ -697,25 +697,21 @@ ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) { if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) { if (selfMember.getNumVotes() > 0) { return {Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"), - boost::none}; + false}; } if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) { return {Status(ErrorCodes::InvalidReplicaSetConfig, "Cannot apply recipient config since current config and recipient " "config have the same set name."), - boost::none}; + false}; } - invariant(recipientConfig->getShardSplitBlockOpTime()); - auto shardSplitBlockOpTime = *recipientConfig->getShardSplitBlockOpTime(); - auto mutableConfig = recipientConfig->getMutable(); - mutableConfig.removeShardSplitBlockOpTime(); - return {ReplSetConfig(std::move(mutableConfig)), shardSplitBlockOpTime}; + return {ReplSetConfig(*recipientConfig), true}; } - return {config, boost::none}; + return {config, false}; } void ReplicationCoordinatorImpl::_heartbeatReconfigStore( @@ -731,7 +727,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( return; } - const auto [swConfig, shardSplitBlockOpTime] = _resolveConfigToApply(newConfig); + const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig); if (!swConfig.isOK()) { LOGV2_WARNING(6234600, "Ignoring new configuration in heartbeat response because it is invalid", @@ -744,7 +740,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } const auto configToApply = swConfig.getValue(); - if (shardSplitBlockOpTime) { + if (isSplitRecipientConfig) { LOGV2(6309200, "Applying a recipient config for a shard split operation.", "config"_attr = configToApply); @@ -804,15 +800,17 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( configToApply.getConfigVersionAndTerm()); auto opCtx = cc().makeOperationContext(); - // Don't write the no-op for config learned via heartbeats. - auto status = [&, isRecipientConfig = shardSplitBlockOpTime.has_value()]() { - if (isRecipientConfig) { - return _externalState->replaceLocalConfigDocument(opCtx.get(), - configToApply.toBSON()); - } else { - return _externalState->storeLocalConfigDocument( - opCtx.get(), configToApply.toBSON(), false /* writeOplog */); + auto status = [this, + opCtx = opCtx.get(), + configToApply, + isSplitRecipientConfig = isSplitRecipientConfig]() { + if (isSplitRecipientConfig) { + return _externalState->replaceLocalConfigDocument(opCtx, configToApply.toBSON()); } + + // Don't write the no-op for config learned via heartbeats. + return _externalState->storeLocalConfigDocument( + opCtx, configToApply.toBSON(), false /* writeOplog */); }(); // Wait for durability of the new config document. @@ -859,7 +857,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( shouldStartDataReplication = true; } - if (shardSplitBlockOpTime) { + if (isSplitRecipientConfig) { // Donor access blockers are removed from donor nodes via the shard split op observer. // Donor access blockers are removed from recipient nodes when the node applies the // recipient config. When the recipient primary steps up it will delete its state @@ -879,7 +877,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( "configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm()); } - _heartbeatReconfigFinish(cbd, configToApply, myIndex, shardSplitBlockOpTime); + _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig); // Start data replication after the config has been installed. if (shouldStartDataReplication) { @@ -910,7 +908,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, StatusWith<int> myIndex, - boost::optional<OpTime> shardSplitBlockOpTime) { + bool isSplitRecipientConfig) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } @@ -924,7 +922,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( ->scheduleWorkAt(_replExecutor->now() + Milliseconds{10}, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _heartbeatReconfigFinish( - cbData, newConfig, myIndex, shardSplitBlockOpTime); + cbData, newConfig, myIndex, isSplitRecipientConfig); }) .status_with_transitional_ignore(); return; @@ -949,7 +947,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( ->onEvent(electionFinishedEvent, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _heartbeatReconfigFinish( - cbData, newConfig, myIndex, shardSplitBlockOpTime); + cbData, newConfig, myIndex, isSplitRecipientConfig); }) .status_with_transitional_ignore(); return; @@ -1002,7 +1000,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( invariant(_rsConfigState == kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() || - _selfIndex < 0 || shardSplitBlockOpTime); + _selfIndex < 0 || isSplitRecipientConfig); if (!myIndex.isOK()) { switch (myIndex.getStatus().code()) { @@ -1040,8 +1038,14 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( const PostMemberStateUpdateAction action = _setCurrentRSConfig(lk, opCtx.get(), newConfig, myIndexValue); - if (shardSplitBlockOpTime) { - _topCoord->resetLastCommittedOpTime(*shardSplitBlockOpTime); + if (isSplitRecipientConfig) { + LOGV2(8423364, + "Clearing the commit point and current committed snapshot after applying split " + "recipient config."); + // Clear lastCommittedOpTime by passing in a default constructed OpTimeAndWallTime, and + // indicating that this is `forInitiate`. + _topCoord->advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime(), false, true); + _clearCommittedSnapshot_inlock(); } lk.unlock(); |