Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp  100
1 file changed, 61 insertions(+), 39 deletions(-)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 057d101a6ff..a8ef444f31d 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -659,7 +659,44 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
_selfIndex < 0);
_replExecutor
->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) {
- _heartbeatReconfigStore(cbData, newConfig);
+ const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig);
+ if (!swConfig.isOK()) {
+ LOGV2_WARNING(
+ 6234600,
+ "Ignoring new configuration in heartbeat response because it is invalid",
+ "status"_attr = swConfig.getStatus());
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ invariant(_rsConfigState == kConfigHBReconfiguring);
+ _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized
+ : kConfigSteady);
+ return;
+ }
+
+ if (MONGO_likely(!isSplitRecipientConfig)) {
+ _heartbeatReconfigStore(cbData, newConfig);
+ return;
+ }
+
+ LOGV2(8423366, "Waiting for oplog buffer to drain before applying recipient config.");
+ _drainForShardSplit().getAsync(
+ [this,
+ resolvedConfig = swConfig.getValue(),
+ replExecutor = _replExecutor.get(),
+ isSplitRecipientConfig = isSplitRecipientConfig](Status status) {
+ if (!status.isOK()) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized
+ : kConfigSteady);
+ return;
+ }
+
+ replExecutor
+ ->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) {
+ _heartbeatReconfigStore(cbData, resolvedConfig, isSplitRecipientConfig);
+ })
+ .status_with_transitional_ignore();
+ });
})
.status_with_transitional_ignore();
}
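
Read as a whole, this hunk moves config resolution out of _heartbeatReconfigStore and onto the scheduled callback itself: resolve first, drop an invalid config and reset the reconfig state, persist immediately for an ordinary config, and for a shard-split recipient config drain the oplog buffer before scheduling the store. Below is a condensed, standalone sketch of that control flow; every type and helper name in it is a stand-in, not the real MongoDB interface.

    #include <functional>
    #include <iostream>
    #include <string>
    #include <utility>

    struct Status { bool ok; std::string reason; };
    struct ReplSetConfig { std::string bson; };

    // Shape of _resolveConfigToApply's result: a config (or error) plus a
    // flag saying whether it is a shard-split recipient config.
    struct Resolved { Status status; ReplSetConfig config; bool isSplitRecipient; };

    Resolved resolveConfigToApply(const ReplSetConfig& incoming) {
        return {{true, ""}, incoming, /*isSplitRecipient=*/true};  // stubbed decision
    }

    void heartbeatReconfigStore(const ReplSetConfig&, bool isSplitRecipient) {
        std::cout << "persisting config (splitRecipient=" << isSplitRecipient << ")\n";
    }

    // Stand-in for _drainForShardSplit().getAsync(...): invokes the
    // continuation once the oplog buffer has (notionally) drained.
    void drainThenRun(std::function<void(Status)> continuation) { continuation({true, ""}); }

    void scheduleHeartbeatReconfig(const ReplSetConfig& incoming) {
        auto [status, config, isSplitRecipient] = resolveConfigToApply(incoming);
        if (!status.ok) {
            // Mirrors the early return: log, reset the config state, give up.
            std::cout << "ignoring invalid heartbeat config: " << status.reason << "\n";
            return;
        }
        if (!isSplitRecipient) {
            heartbeatReconfigStore(config, false);  // common path: persist immediately
            return;
        }
        // Split-recipient path: drain the oplog buffer, then persist.
        drainThenRun([config = std::move(config)](Status s) {
            if (s.ok)
                heartbeatReconfigStore(config, true);
            // On drain failure the real code resets _rsConfigState instead.
        });
    }

    int main() { scheduleHeartbeatReconfig(ReplSetConfig{"{}"}); }
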
@@ -676,14 +713,8 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve
lk.unlock();
// If this node is listed in the members of the incoming config, accept the config.
- const auto foundSelfInMembers = std::any_of(
- config.membersBegin(),
- config.membersEnd(),
- [externalState = _externalState.get()](const MemberConfig& config) {
- return externalState->isSelf(config.getHostAndPort(), getGlobalServiceContext());
- });
-
- if (foundSelfInMembers) {
+ auto swSelfIndex = findSelfInConfig(_externalState.get(), config, cc().getServiceContext());
+ if (swSelfIndex.isOK()) {
return {config, false};
}
@@ -707,7 +738,6 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve
false};
}
-
return {ReplSetConfig(*recipientConfig), true};
}
@@ -715,7 +745,9 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve
}
void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
- const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) {
+ const executor::TaskExecutor::CallbackArgs& cbd,
+ const ReplSetConfig& newConfig,
+ bool isSplitRecipientConfig) {
if (cbd.status.code() == ErrorCodes::CallbackCanceled) {
LOGV2(21480,
@@ -727,23 +759,10 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
return;
}
- const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig);
- if (!swConfig.isOK()) {
- LOGV2_WARNING(6234600,
- "Ignoring new configuration in heartbeat response because it is invalid",
- "status"_attr = swConfig.getStatus());
-
- stdx::lock_guard<Latch> lg(_mutex);
- invariant(_rsConfigState == kConfigHBReconfiguring);
- _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized : kConfigSteady);
- return;
- }
-
- const auto configToApply = swConfig.getValue();
if (isSplitRecipientConfig) {
LOGV2(6309200,
"Applying a recipient config for a shard split operation.",
- "config"_attr = configToApply);
+ "config"_attr = newConfig);
}
const auto myIndex = [&]() -> StatusWith<int> {
@@ -751,24 +770,24 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
// recover from transient DNS errors.
{
stdx::lock_guard<Latch> lk(_mutex);
- if (_selfIndex >= 0 && sameConfigContents(_rsConfig, configToApply)) {
+ if (_selfIndex >= 0 && sameConfigContents(_rsConfig, newConfig)) {
LOGV2_FOR_HEARTBEATS(6351200,
2,
"New heartbeat config is only a version/term change, skipping "
"validation checks",
"oldConfig"_attr = _rsConfig,
- "newConfig"_attr = configToApply);
+ "newConfig"_attr = newConfig);
// If the configs are the same, so is our index.
return _selfIndex;
}
}
return validateConfigForHeartbeatReconfig(
- _externalState.get(), configToApply, getGlobalServiceContext());
+ _externalState.get(), newConfig, cc().getServiceContext());
}();
if (myIndex.getStatus() == ErrorCodes::NodeNotFound) {
stdx::lock_guard<Latch> lk(_mutex);
- // If this node absent in configToApply, and this node was not previously initialized,
+ // If this node is absent in newConfig, and this node was not previously initialized,
// return to kConfigUninitialized immediately, rather than storing the config and
// transitioning into the RS_REMOVED state. See SERVER-15740.
if (!_rsConfig.isInitialized()) {
@@ -793,24 +812,23 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
} else {
LOGV2_FOR_HEARTBEATS(4615626,
2,
- "Config with {configToApplyVersionAndTerm} validated for "
+ "Config with {newConfigVersionAndTerm} validated for "
"reconfig; persisting to disk.",
"Config validated for reconfig; persisting to disk",
- "configToApplyVersionAndTerm"_attr =
- configToApply.getConfigVersionAndTerm());
+ "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm());
auto opCtx = cc().makeOperationContext();
auto status = [this,
opCtx = opCtx.get(),
- configToApply,
+ newConfig,
isSplitRecipientConfig = isSplitRecipientConfig]() {
if (isSplitRecipientConfig) {
- return _externalState->replaceLocalConfigDocument(opCtx, configToApply.toBSON());
+ return _externalState->replaceLocalConfigDocument(opCtx, newConfig.toBSON());
}
// Don't write the no-op for config learned via heartbeats.
return _externalState->storeLocalConfigDocument(
- opCtx, configToApply.toBSON(), false /* writeOplog */);
+ opCtx, newConfig.toBSON(), false /* writeOplog */);
}();
// Wait for durability of the new config document.
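
The two persistence paths above differ deliberately: a split-recipient config replaces the donor set's local config document wholesale, while a config learned via heartbeats is stored without the oplog no-op that a driver-initiated reconfig would write. A self-contained sketch of that branch, with a hypothetical storage type standing in for the real external state:

    #include <iostream>
    #include <string>

    struct Status { bool ok; };

    // Hypothetical stand-in for the node's local config storage.
    struct LocalStorage {
        Status replaceLocalConfigDocument(const std::string& doc) {
            std::cout << "replacing config document wholesale: " << doc << "\n";
            return {true};
        }
        Status storeLocalConfigDocument(const std::string& doc, bool writeOplog) {
            std::cout << "storing config (writeOplog=" << writeOplog << "): " << doc << "\n";
            return {true};
        }
    };

    Status persistHeartbeatConfig(LocalStorage& storage,
                                  const std::string& configBson,
                                  bool isSplitRecipientConfig) {
        if (isSplitRecipientConfig) {
            // Recipient configs supersede the donor's document entirely.
            return storage.replaceLocalConfigDocument(configBson);
        }
        // Don't write the no-op for a config learned via heartbeats.
        return storage.storeLocalConfigDocument(configBson, /*writeOplog=*/false);
    }

    int main() {
        LocalStorage storage;
        persistHeartbeatConfig(storage, "{_id: 'rs0', version: 2}", false);
    }
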
@@ -847,7 +865,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
}
bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 &&
- configToApply.getMemberAt(myIndex.getValue()).isArbiter();
+ newConfig.getMemberAt(myIndex.getValue()).isArbiter();
if (isArbiter) {
ReplicaSetAwareServiceRegistry::get(_service).onBecomeArbiter();
@@ -871,13 +889,13 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
LOGV2_FOR_HEARTBEATS(
4615627,
2,
- "New configuration with {configToApplyVersionAndTerm} persisted "
+ "New configuration with {newConfigVersionAndTerm} persisted "
"to local storage; installing new config in memory",
"New configuration persisted to local storage; installing new config in memory",
- "configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm());
+ "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm());
}
- _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig);
+ _heartbeatReconfigFinish(cbd, newConfig, myIndex, isSplitRecipientConfig);
// Start data replication after the config has been installed.
if (shouldStartDataReplication) {
@@ -1046,6 +1064,10 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
// indicating that this is `forInitiate`.
_topCoord->advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime(), false, true);
_clearCommittedSnapshot_inlock();
+
+ invariant(_applierState == ApplierState::Stopped);
+ _applierState = ApplierState::Running;
+ _externalState->startProducerIfStopped();
}
lk.unlock();
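
The last hunk closes the loop for the split-recipient path: the drain that preceded the reconfig left the oplog applier stopped, so installing the recipient config flips it back to Running and restarts the producer. A toy illustration of that state transition (the enum and external-state type here are stand-ins, not the real members):

    #include <cassert>
    #include <iostream>

    enum class ApplierState { Running, Draining, Stopped };

    struct ExternalState {
        bool producerStopped = true;
        void startProducerIfStopped() {
            if (producerStopped) {
                producerStopped = false;
                std::cout << "oplog producer restarted\n";
            }
        }
    };

    int main() {
        // The pre-split drain left the applier Stopped.
        ApplierState applierState = ApplierState::Stopped;
        ExternalState externalState;

        // Mirrors the hunk: the applier must be Stopped here; resume it so
        // replication starts under the newly installed recipient config.
        assert(applierState == ApplierState::Stopped);
        applierState = ApplierState::Running;
        externalState.startProducerIfStopped();
    }
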