author    Matt Broadstone <mbroadst@mongodb.com>    2022-08-29 18:26:32 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-08-29 19:12:05 +0000
commit    382f12ccb49dac4ac5435b35543521f96e003207 (patch)
tree      a167a0e93282905dea22eb8e3536ca11dd589050 /src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
parent    a8ff73d03f7b21764479524f28c5bbae857fdc6b (diff)
download  mongo-382f12ccb49dac4ac5435b35543521f96e003207.tar.gz
SERVER-69197 Clear lastCommitted and currentCommitted on split
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 66
1 file changed, 35 insertions, 31 deletions
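
At a glance, this commit replaces the optional shardSplitBlockOpTime that was previously threaded through the heartbeat-reconfig path with a plain bool (isSplitRecipientConfig): _resolveConfigToApply now returns std::tuple<StatusWith<ReplSetConfig>, bool>, and when a node applies a split recipient config it clears the commit point and the current committed snapshot instead of resetting lastCommittedOpTime to the split block optime. The standalone sketch below is not part of the commit; Config, resolveConfigToApply, and the set names are simplified stand-ins used only to illustrate the "config plus flag" return shape that the new code adopts:

// Minimal sketch (C++17) of the "resolved config + recipient flag" pattern.
// These types are illustrative stand-ins, not the real MongoDB classes.
#include <iostream>
#include <string>
#include <tuple>

struct Config {
    std::string setName;
    bool isSplit = false;
};

// Returns the config to apply plus a flag telling the caller whether it is a
// split recipient config (so the caller knows to clear the commit point later).
std::tuple<Config, bool> resolveConfigToApply(const Config& incoming) {
    if (!incoming.isSplit) {
        return {incoming, false};
    }
    // The real code extracts and validates config.getRecipientConfig() here;
    // this sketch just marks the result as a recipient config.
    Config recipient{incoming.setName + "-recipient", false};
    return {recipient, true};
}

int main() {
    const auto [config, isSplitRecipientConfig] = resolveConfigToApply({"rs0", true});
    std::cout << config.setName << " isSplitRecipientConfig=" << std::boolalpha
              << isSplitRecipientConfig << "\n";
    return 0;
}
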
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 681baa5e13f..057d101a6ff 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -664,10 +664,10 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
.status_with_transitional_ignore();
}
-std::tuple<StatusWith<ReplSetConfig>, boost::optional<OpTime>>
-ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) {
+std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolveConfigToApply(
+ const ReplSetConfig& config) {
if (!_settings.isServerless() || !config.isSplitConfig()) {
- return {config, boost::none};
+ return {config, false};
}
stdx::unique_lock<Latch> lk(_mutex);
@@ -684,12 +684,12 @@ ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) {
});
if (foundSelfInMembers) {
- return {config, boost::none};
+ return {config, false};
}
return {Status(ErrorCodes::NotYetInitialized,
"Cannot apply a split config if the current config is uninitialized"),
- boost::none};
+ false};
}
auto recipientConfig = config.getRecipientConfig();
@@ -697,25 +697,21 @@ ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) {
if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) {
if (selfMember.getNumVotes() > 0) {
return {Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"),
- boost::none};
+ false};
}
if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) {
return {Status(ErrorCodes::InvalidReplicaSetConfig,
"Cannot apply recipient config since current config and recipient "
"config have the same set name."),
- boost::none};
+ false};
}
- invariant(recipientConfig->getShardSplitBlockOpTime());
- auto shardSplitBlockOpTime = *recipientConfig->getShardSplitBlockOpTime();
- auto mutableConfig = recipientConfig->getMutable();
- mutableConfig.removeShardSplitBlockOpTime();
- return {ReplSetConfig(std::move(mutableConfig)), shardSplitBlockOpTime};
+ return {ReplSetConfig(*recipientConfig), true};
}
- return {config, boost::none};
+ return {config, false};
}
void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
@@ -731,7 +727,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
return;
}
- const auto [swConfig, shardSplitBlockOpTime] = _resolveConfigToApply(newConfig);
+ const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig);
if (!swConfig.isOK()) {
LOGV2_WARNING(6234600,
"Ignoring new configuration in heartbeat response because it is invalid",
@@ -744,7 +740,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
}
const auto configToApply = swConfig.getValue();
- if (shardSplitBlockOpTime) {
+ if (isSplitRecipientConfig) {
LOGV2(6309200,
"Applying a recipient config for a shard split operation.",
"config"_attr = configToApply);
@@ -804,15 +800,17 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
configToApply.getConfigVersionAndTerm());
auto opCtx = cc().makeOperationContext();
- // Don't write the no-op for config learned via heartbeats.
- auto status = [&, isRecipientConfig = shardSplitBlockOpTime.has_value()]() {
- if (isRecipientConfig) {
- return _externalState->replaceLocalConfigDocument(opCtx.get(),
- configToApply.toBSON());
- } else {
- return _externalState->storeLocalConfigDocument(
- opCtx.get(), configToApply.toBSON(), false /* writeOplog */);
+ auto status = [this,
+ opCtx = opCtx.get(),
+ configToApply,
+ isSplitRecipientConfig = isSplitRecipientConfig]() {
+ if (isSplitRecipientConfig) {
+ return _externalState->replaceLocalConfigDocument(opCtx, configToApply.toBSON());
}
+
+ // Don't write the no-op for config learned via heartbeats.
+ return _externalState->storeLocalConfigDocument(
+ opCtx, configToApply.toBSON(), false /* writeOplog */);
}();
// Wait for durability of the new config document.
@@ -859,7 +857,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
shouldStartDataReplication = true;
}
- if (shardSplitBlockOpTime) {
+ if (isSplitRecipientConfig) {
// Donor access blockers are removed from donor nodes via the shard split op observer.
// Donor access blockers are removed from recipient nodes when the node applies the
// recipient config. When the recipient primary steps up it will delete its state
@@ -879,7 +877,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
"configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm());
}
- _heartbeatReconfigFinish(cbd, configToApply, myIndex, shardSplitBlockOpTime);
+ _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig);
// Start data replication after the config has been installed.
if (shouldStartDataReplication) {
@@ -910,7 +908,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
const executor::TaskExecutor::CallbackArgs& cbData,
const ReplSetConfig& newConfig,
StatusWith<int> myIndex,
- boost::optional<OpTime> shardSplitBlockOpTime) {
+ bool isSplitRecipientConfig) {
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
@@ -924,7 +922,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
->scheduleWorkAt(_replExecutor->now() + Milliseconds{10},
[=](const executor::TaskExecutor::CallbackArgs& cbData) {
_heartbeatReconfigFinish(
- cbData, newConfig, myIndex, shardSplitBlockOpTime);
+ cbData, newConfig, myIndex, isSplitRecipientConfig);
})
.status_with_transitional_ignore();
return;
@@ -949,7 +947,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
->onEvent(electionFinishedEvent,
[=](const executor::TaskExecutor::CallbackArgs& cbData) {
_heartbeatReconfigFinish(
- cbData, newConfig, myIndex, shardSplitBlockOpTime);
+ cbData, newConfig, myIndex, isSplitRecipientConfig);
})
.status_with_transitional_ignore();
return;
@@ -1002,7 +1000,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
invariant(_rsConfigState == kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() ||
- _selfIndex < 0 || shardSplitBlockOpTime);
+ _selfIndex < 0 || isSplitRecipientConfig);
if (!myIndex.isOK()) {
switch (myIndex.getStatus().code()) {
@@ -1040,8 +1038,14 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
const PostMemberStateUpdateAction action =
_setCurrentRSConfig(lk, opCtx.get(), newConfig, myIndexValue);
- if (shardSplitBlockOpTime) {
- _topCoord->resetLastCommittedOpTime(*shardSplitBlockOpTime);
+ if (isSplitRecipientConfig) {
+ LOGV2(8423364,
+ "Clearing the commit point and current committed snapshot after applying split "
+ "recipient config.");
+ // Clear lastCommittedOpTime by passing in a default constructed OpTimeAndWallTime, and
+ // indicating that this is `forInitiate`.
+ _topCoord->advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime(), false, true);
+ _clearCommittedSnapshot_inlock();
}
lk.unlock();
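
For context on the final hunk: the commit point normally only moves forward, so resetting it to an empty value needs an explicit override, which is what the argument described as forInitiate in the new comment signals. The toy class below sketches that general pattern under that assumption; it is not MongoDB's TopologyCoordinator, and the method name, two-argument signature, and integer optimes are illustrative stand-ins only.

// Toy illustration of a "monotonic unless explicitly reset" setter, loosely
// mirroring the advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime(), ...)
// call in the diff. Not the real TopologyCoordinator API.
#include <cstdint>
#include <iostream>

struct OpTimeAndWallTime {
    std::int64_t opTime = 0;    // 0 stands in for a default-constructed (null) OpTime
    std::int64_t wallTime = 0;
};

class ToyTopologyCoordinator {
public:
    void advanceLastCommitted(OpTimeAndWallTime next, bool forInitiate) {
        // Normally the commit point only moves forward...
        if (!forInitiate && next.opTime <= _lastCommitted.opTime) {
            return;
        }
        // ...but forInitiate allows resetting it to a default-constructed value,
        // as done after applying a split recipient config.
        _lastCommitted = next;
    }

    OpTimeAndWallTime lastCommitted() const { return _lastCommitted; }

private:
    OpTimeAndWallTime _lastCommitted{};
};

int main() {
    ToyTopologyCoordinator topCoord;
    topCoord.advanceLastCommitted({42, 1000}, /*forInitiate=*/false);
    // Clear the commit point by passing a default-constructed value with
    // forInitiate=true, analogous to the recipient-config branch in the diff.
    topCoord.advanceLastCommitted(OpTimeAndWallTime(), /*forInitiate=*/true);
    std::cout << "lastCommitted opTime=" << topCoord.lastCommitted().opTime << "\n";
    return 0;
}
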