summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorMatt Broadstone <mbroadst@mongodb.com>2022-08-30 23:22:25 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-31 00:22:47 +0000
commit8c64e2c4c9a91f863d8911bed6bfe13369242de6 (patch)
treea9183ac6fc602825af1d493c5d49b2eea3a3d678 /src/mongo/db/repl
parentcdda7d10a0907b41c8aafdde08f3bee6d123db78 (diff)
downloadmongo-8c64e2c4c9a91f863d8911bed6bfe13369242de6.tar.gz
SERVER-68931 Drain oplog buffers before applying recipient config
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/oplog_batcher.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp39
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h12
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp100
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp306
6 files changed, 273 insertions, 193 deletions
diff --git a/src/mongo/db/repl/oplog_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp
index 986e9648e8d..d7d57678edc 100644
--- a/src/mongo/db/repl/oplog_batcher.cpp
+++ b/src/mongo/db/repl/oplog_batcher.cpp
@@ -360,7 +360,9 @@ void OplogBatcher::_run(StorageInterface* storageInterface) {
// Draining state guarantees the producer has already been fully stopped and no more
// operations will be pushed in to the oplog buffer until the applier state changes.
auto isDraining =
- replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining;
+ replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining ||
+ replCoord->getApplierState() ==
+ ReplicationCoordinator::ApplierState::DrainingForShardSplit;
// Check the oplog buffer after the applier state to ensure the producer is stopped.
if (isDraining && _oplogBuffer->isEmpty()) {
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index c2faef400d9..c95d56603f5 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -568,8 +568,11 @@ public:
*
* When a node steps down during catchup mode, the states remain the same (producer: Running,
* applier: Running).
+ *
+ * DrainingForShardSplit follows the same state diagram as Draining, it only exists to hint the
+ * signalDrainModeComplete method that it should not follow the primary step-up logic.
*/
- enum class ApplierState { Running, Draining, Stopped };
+ enum class ApplierState { Running, Draining, DrainingForShardSplit, Stopped };
/**
* In normal cases: Running -> Draining -> Stopped -> Running.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 2a8eec4863b..74adb1afb02 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1059,7 +1059,6 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
_initialSyncer.swap(initialSyncerCopy);
}
-
// joining the replication executor is blocking so it must be run outside of the mutex
if (initialSyncerCopy) {
LOGV2_DEBUG(
@@ -1074,6 +1073,17 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
initialSyncerCopy->join();
initialSyncerCopy.reset();
}
+
+ {
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (_finishedDrainingPromise) {
+ _finishedDrainingPromise->setError(
+ {ErrorCodes::InterruptedAtShutdown,
+ "Cancelling wait for drain mode to complete due to shutdown"});
+ _finishedDrainingPromise = boost::none;
+ }
+ }
+
_externalState->shutdown(opCtx);
_replExecutor->shutdown();
_replExecutor->join();
@@ -1192,6 +1202,23 @@ ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState
void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
long long termWhenBufferIsEmpty) noexcept {
+ {
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (_applierState == ReplicationCoordinator::ApplierState::DrainingForShardSplit) {
+ _applierState = ApplierState::Stopped;
+ auto memberState = _getMemberState_inlock();
+ invariant(memberState.secondary() || memberState.startup());
+ _externalState->onDrainComplete(opCtx);
+
+ if (_finishedDrainingPromise) {
+ _finishedDrainingPromise->emplaceValue();
+ _finishedDrainingPromise = boost::none;
+ }
+
+ return;
+ }
+ }
+
// This logic is a little complicated in order to avoid acquiring the RSTL in mode X
// unnecessarily. This is important because the applier may call signalDrainComplete()
// whenever it wants, not only when the ReplicationCoordinator is expecting it.
@@ -4844,6 +4871,16 @@ void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
_externalState->stopProducer();
}
+Future<void> ReplicationCoordinatorImpl::_drainForShardSplit() {
+ stdx::lock_guard<Latch> lk(_mutex);
+ invariant(!_finishedDrainingPromise.has_value());
+ auto [promise, future] = makePromiseFuture<void>();
+ _finishedDrainingPromise = std::move(promise);
+ _applierState = ApplierState::DrainingForShardSplit;
+ _externalState->stopProducer();
+ return std::move(future);
+}
+
ReplicationCoordinatorImpl::PostMemberStateUpdateAction
ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
OperationContext* opCtx,
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 4a1ba95f347..223c3fc9b90 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1370,7 +1370,8 @@ private:
* Method to write a configuration transmitted via heartbeat message to stable storage.
*/
void _heartbeatReconfigStore(const executor::TaskExecutor::CallbackArgs& cbd,
- const ReplSetConfig& newConfig);
+ const ReplSetConfig& newConfig,
+ bool isSplitRecipientConfig = false);
/**
* Conclusion actions of a heartbeat-triggered reconfiguration.
@@ -1567,6 +1568,12 @@ private:
void _enterDrainMode_inlock();
/**
+ * Enter drain mode which does not result in a primary stepup. Returns a future which becomes
+ * ready when the oplog buffers have completed draining.
+ */
+ Future<void> _drainForShardSplit();
+
+ /**
* Waits for the config state to leave kConfigStartingUp, which indicates that start() has
* finished.
*/
@@ -1846,6 +1853,9 @@ private:
// Construct used to synchronize default write concern changes with config write concern
// changes.
WriteConcernTagChangesImpl _writeConcernTagChanges;
+
+ // An optional promise created when entering drain mode for shard split.
+ boost::optional<Promise<void>> _finishedDrainingPromise; // (M)
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 057d101a6ff..a8ef444f31d 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -659,7 +659,44 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
_selfIndex < 0);
_replExecutor
->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) {
- _heartbeatReconfigStore(cbData, newConfig);
+ const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig);
+ if (!swConfig.isOK()) {
+ LOGV2_WARNING(
+ 6234600,
+ "Ignoring new configuration in heartbeat response because it is invalid",
+ "status"_attr = swConfig.getStatus());
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ invariant(_rsConfigState == kConfigHBReconfiguring);
+ _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized
+ : kConfigSteady);
+ return;
+ }
+
+ if (MONGO_likely(!isSplitRecipientConfig)) {
+ _heartbeatReconfigStore(cbData, newConfig);
+ return;
+ }
+
+ LOGV2(8423366, "Waiting for oplog buffer to drain before applying recipient config.");
+ _drainForShardSplit().getAsync(
+ [this,
+ resolvedConfig = swConfig.getValue(),
+ replExecutor = _replExecutor.get(),
+ isSplitRecipientConfig = isSplitRecipientConfig](Status status) {
+ if (!status.isOK()) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized
+ : kConfigSteady);
+ return;
+ }
+
+ replExecutor
+ ->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) {
+ _heartbeatReconfigStore(cbData, resolvedConfig, isSplitRecipientConfig);
+ })
+ .status_with_transitional_ignore();
+ });
})
.status_with_transitional_ignore();
}
@@ -676,14 +713,8 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve
lk.unlock();
// If this node is listed in the members of incoming config, accept the config.
- const auto foundSelfInMembers = std::any_of(
- config.membersBegin(),
- config.membersEnd(),
- [externalState = _externalState.get()](const MemberConfig& config) {
- return externalState->isSelf(config.getHostAndPort(), getGlobalServiceContext());
- });
-
- if (foundSelfInMembers) {
+ auto swSelfIndex = findSelfInConfig(_externalState.get(), config, cc().getServiceContext());
+ if (swSelfIndex.isOK()) {
return {config, false};
}
@@ -707,7 +738,6 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve
false};
}
-
return {ReplSetConfig(*recipientConfig), true};
}
@@ -715,7 +745,9 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve
}
void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
- const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) {
+ const executor::TaskExecutor::CallbackArgs& cbd,
+ const ReplSetConfig& newConfig,
+ bool isSplitRecipientConfig) {
if (cbd.status.code() == ErrorCodes::CallbackCanceled) {
LOGV2(21480,
@@ -727,23 +759,10 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
return;
}
- const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig);
- if (!swConfig.isOK()) {
- LOGV2_WARNING(6234600,
- "Ignoring new configuration in heartbeat response because it is invalid",
- "status"_attr = swConfig.getStatus());
-
- stdx::lock_guard<Latch> lg(_mutex);
- invariant(_rsConfigState == kConfigHBReconfiguring);
- _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized : kConfigSteady);
- return;
- }
-
- const auto configToApply = swConfig.getValue();
if (isSplitRecipientConfig) {
LOGV2(6309200,
"Applying a recipient config for a shard split operation.",
- "config"_attr = configToApply);
+ "config"_attr = newConfig);
}
const auto myIndex = [&]() -> StatusWith<int> {
@@ -751,24 +770,24 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
// recover from transient DNS errors.
{
stdx::lock_guard<Latch> lk(_mutex);
- if (_selfIndex >= 0 && sameConfigContents(_rsConfig, configToApply)) {
+ if (_selfIndex >= 0 && sameConfigContents(_rsConfig, newConfig)) {
LOGV2_FOR_HEARTBEATS(6351200,
2,
"New heartbeat config is only a version/term change, skipping "
"validation checks",
"oldConfig"_attr = _rsConfig,
- "newConfig"_attr = configToApply);
+ "newConfig"_attr = newConfig);
// If the configs are the same, so is our index.
return _selfIndex;
}
}
return validateConfigForHeartbeatReconfig(
- _externalState.get(), configToApply, getGlobalServiceContext());
+ _externalState.get(), newConfig, cc().getServiceContext());
}();
if (myIndex.getStatus() == ErrorCodes::NodeNotFound) {
stdx::lock_guard<Latch> lk(_mutex);
- // If this node absent in configToApply, and this node was not previously initialized,
+ // If this node absent in newConfig, and this node was not previously initialized,
// return to kConfigUninitialized immediately, rather than storing the config and
// transitioning into the RS_REMOVED state. See SERVER-15740.
if (!_rsConfig.isInitialized()) {
@@ -793,24 +812,23 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
} else {
LOGV2_FOR_HEARTBEATS(4615626,
2,
- "Config with {configToApplyVersionAndTerm} validated for "
+ "Config with {newConfigVersionAndTerm} validated for "
"reconfig; persisting to disk.",
"Config validated for reconfig; persisting to disk",
- "configToApplyVersionAndTerm"_attr =
- configToApply.getConfigVersionAndTerm());
+ "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm());
auto opCtx = cc().makeOperationContext();
auto status = [this,
opCtx = opCtx.get(),
- configToApply,
+ newConfig,
isSplitRecipientConfig = isSplitRecipientConfig]() {
if (isSplitRecipientConfig) {
- return _externalState->replaceLocalConfigDocument(opCtx, configToApply.toBSON());
+ return _externalState->replaceLocalConfigDocument(opCtx, newConfig.toBSON());
}
// Don't write the no-op for config learned via heartbeats.
return _externalState->storeLocalConfigDocument(
- opCtx, configToApply.toBSON(), false /* writeOplog */);
+ opCtx, newConfig.toBSON(), false /* writeOplog */);
}();
// Wait for durability of the new config document.
@@ -847,7 +865,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
}
bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 &&
- configToApply.getMemberAt(myIndex.getValue()).isArbiter();
+ newConfig.getMemberAt(myIndex.getValue()).isArbiter();
if (isArbiter) {
ReplicaSetAwareServiceRegistry::get(_service).onBecomeArbiter();
@@ -871,13 +889,13 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
LOGV2_FOR_HEARTBEATS(
4615627,
2,
- "New configuration with {configToApplyVersionAndTerm} persisted "
+ "New configuration with {newConfigVersionAndTerm} persisted "
"to local storage; installing new config in memory",
"New configuration persisted to local storage; installing new config in memory",
- "configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm());
+ "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm());
}
- _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig);
+ _heartbeatReconfigFinish(cbd, newConfig, myIndex, isSplitRecipientConfig);
// Start data replication after the config has been installed.
if (shouldStartDataReplication) {
@@ -1046,6 +1064,10 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
// indicating that this is `forInitiate`.
_topCoord->advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime(), false, true);
_clearCommittedSnapshot_inlock();
+
+ invariant(_applierState == ApplierState::Stopped);
+ _applierState = ApplierState::Running;
+ _externalState->startProducerIfStopped();
}
lk.unlock();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 62e2c412cc6..92a1cecbb69 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -269,29 +269,46 @@ TEST_F(ReplCoordHBV1Test,
ASSERT_TRUE(getExternalState()->threadsStarted());
}
-TEST_F(ReplCoordHBV1Test, RejectSplitConfigWhenNotInServerlessMode) {
+TEST_F(ReplCoordHBV1Test, RejectRecipientConfigWhenNotInServerlessMode) {
auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kDefault,
logv2::LogSeverity::Debug(3)};
- // Start up with three nodes, and assume the role of "node2" as a secondary. Notably, the local
- // node is NOT started in serverless mode. "node2" is configured as having no votes, no
+ auto rsConfig =
+ assertMakeRSConfig(BSON("_id"
+ << "mySet"
+ << "protocolVersion" << 1 << "version" << 2 << "members"
+ << BSON_ARRAY(
+ BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345")
+ << BSON("_id" << 3 << "host"
+ << "node3:12345")
+ << BSON("_id" << 4 << "host"
+ << "node4:12345"
+ << "votes" << 0 << "priority" << 0 << "tags"
+ << BSON("recipientTagName"
+ << "tag2"))
+ << BSON("_id" << 5 << "host"
+ << "node5:12345"
+ << "votes" << 0 << "priority" << 0 << "tags"
+ << BSON("recipientTagName"
+ << "tag2"))
+ << BSON("_id" << 6 << "host"
+ << "node6:12345"
+ << "votes" << 0 << "priority" << 0 << "tags"
+ << BSON("recipientTagName"
+ << "tag2")))));
+
+ // Start up with three nodes, and assume the role of "node5" as a secondary. Notably, the local
+ // node is NOT started in serverless mode. "node5" is configured as having no votes, no
// priority, so that we can pass validation for accepting a split config.
- assertStartSuccess(BSON("_id"
- << "mySet"
- << "protocolVersion" << 1 << "version" << 2 << "members"
- << BSON_ARRAY(BSON("_id" << 1 << "host"
- << "node1:12345")
- << BSON("_id" << 2 << "host"
- << "node2:12345"
- << "votes" << 0 << "priority" << 0)
- << BSON("_id" << 3 << "host"
- << "node3:12345"))),
- HostAndPort("node2", 12345));
+ assertStartSuccess(rsConfig.toBSON(), HostAndPort("node5", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->updateTerm_forTest(1, nullptr);
ASSERT_EQ(getReplCoord()->getTerm(), 1);
// respond to initial heartbeat requests
- for (int j = 0; j < 2; ++j) {
+ for (int j = 0; j < 5; ++j) {
replyToReceivedHeartbeatV1();
}
@@ -303,27 +320,8 @@ TEST_F(ReplCoordHBV1Test, RejectSplitConfigWhenNotInServerlessMode) {
ASSERT_FALSE(getNet()->hasReadyRequests());
}
- ReplSetConfig splitConfig =
- assertMakeRSConfig(BSON("_id"
- << "mySet"
- << "version" << 3 << "term" << 1 << "protocolVersion" << 1
- << "members"
- << BSON_ARRAY(BSON("_id" << 1 << "host"
- << "node1:12345")
- << BSON("_id" << 2 << "host"
- << "node2:12345")
- << BSON("_id" << 3 << "host"
- << "node3:12345"))
- << "recipientConfig"
- << BSON("_id"
- << "recipientSet"
- << "version" << 1 << "term" << 1 << "members"
- << BSON_ARRAY(BSON("_id" << 1 << "host"
- << "node1:12345")
- << BSON("_id" << 2 << "host"
- << "node2:12345")
- << BSON("_id" << 3 << "host"
- << "node3:12345")))));
+ auto splitConfig =
+ serverless::makeSplitConfig(rsConfig, "recipientSetName", "recipientTagName");
// Accept a heartbeat from `node1` which has a split config. The split config lists this node
// ("node2") in the recipient member list, but a node started not in serverless mode should not
@@ -333,43 +331,41 @@ TEST_F(ReplCoordHBV1Test, RejectSplitConfigWhenNotInServerlessMode) {
{
InNetworkGuard guard(getNet());
processResponseFromPrimary(splitConfig, 2, 1, HostAndPort{"node1", 12345});
- assertMemberState(MemberState::RS_SECONDARY);
- OperationContextNoop opCtx;
- auto storedConfig = ReplSetConfig::parse(
- unittest::assertGet(getExternalState()->loadLocalConfigDocument(&opCtx)));
- ASSERT_OK(storedConfig.validate());
-
- // Verify that the recipient config was not accepted. A successfully applied splitConfig
- // will install at version and term {1, 1}.
- ASSERT_EQUALS(ConfigVersionAndTerm(3, 1), storedConfig.getConfigVersionAndTerm());
- ASSERT_EQUALS("mySet", storedConfig.getReplSetName());
+ assertMemberState(MemberState::RS_REMOVED);
}
-
- ASSERT_TRUE(getExternalState()->threadsStarted());
}
TEST_F(ReplCoordHBV1Test, NodeRejectsSplitConfigWhenNotInitialized) {
- ReplSetConfig rsConfig =
+ auto rsConfig =
assertMakeRSConfig(BSON("_id"
<< "mySet"
- << "version" << 3 << "term" << 1 << "protocolVersion" << 1
- << "members"
- << BSON_ARRAY(BSON("_id" << 1 << "host"
- << "h1:1")
- << BSON("_id" << 2 << "host"
- << "h2:1")
- << BSON("_id" << 3 << "host"
- << "h3:1"))
- << "recipientConfig"
- << BSON("_id"
- << "recipientSet"
- << "version" << 1 << "term" << 1 << "members"
- << BSON_ARRAY(BSON("_id" << 1 << "host"
- << "h4:1")
- << BSON("_id" << 2 << "host"
- << "h5:1")
- << BSON("_id" << 3 << "host"
- << "h6:1")))));
+ << "protocolVersion" << 1 << "version" << 2 << "members"
+ << BSON_ARRAY(
+ BSON("_id" << 1 << "host"
+ << "h1:1")
+ << BSON("_id" << 2 << "host"
+ << "h2:1"
+ << "votes" << 0 << "priority" << 0)
+ << BSON("_id" << 3 << "host"
+ << "h3:1")
+ << BSON("_id" << 4 << "host"
+ << "h4:1"
+ << "votes" << 0 << "priority" << 0 << "tags"
+ << BSON("recipientTagName"
+ << "tag2"))
+ << BSON("_id" << 5 << "host"
+ << "h5:1"
+ << "votes" << 0 << "priority" << 0 << "tags"
+ << BSON("recipientTagName"
+ << "tag2"))
+ << BSON("_id" << 6 << "host"
+ << "h6:1"
+ << "votes" << 0 << "priority" << 0 << "tags"
+ << BSON("recipientTagName"
+ << "tag2")))));
+
+ auto splitConfig =
+ serverless::makeSplitConfig(rsConfig, "recipientSetName", "recipientTagName");
ReplSettings settings;
settings.setServerlessMode();
@@ -383,36 +379,45 @@ TEST_F(ReplCoordHBV1Test, NodeRejectsSplitConfigWhenNotInitialized) {
ASSERT_FALSE(getNet()->hasReadyRequests());
exitNetwork();
- receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1));
+ receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1));
enterNetwork();
- processResponseFromPrimary(rsConfig);
+ processResponseFromPrimary(splitConfig);
assertMemberState(MemberState::RS_STARTUP);
exitNetwork();
}
TEST_F(ReplCoordHBV1Test, UninitializedDonorNodeAcceptsSplitConfigOnFirstHeartbeat) {
- ReplSetConfig rsConfig =
+ auto rsConfig =
assertMakeRSConfig(BSON("_id"
<< "mySet"
- << "version" << 3 << "term" << 1 << "protocolVersion" << 1
- << "members"
- << BSON_ARRAY(BSON("_id" << 1 << "host"
- << "h1:1")
- << BSON("_id" << 2 << "host"
- << "h2:1")
- << BSON("_id" << 3 << "host"
- << "h3:1"))
- << "recipientConfig"
- << BSON("_id"
- << "recipientSet"
- << "version" << 1 << "term" << 1 << "members"
- << BSON_ARRAY(BSON("_id" << 1 << "host"
- << "h4:1")
- << BSON("_id" << 2 << "host"
- << "h5:1")
- << BSON("_id" << 3 << "host"
- << "h6:1")))));
+ << "protocolVersion" << 1 << "version" << 2 << "members"
+ << BSON_ARRAY(
+ BSON("_id" << 1 << "host"
+ << "h1:1")
+ << BSON("_id" << 2 << "host"
+ << "h2:1"
+ << "votes" << 0 << "priority" << 0)
+ << BSON("_id" << 3 << "host"
+ << "h3:1")
+ << BSON("_id" << 4 << "host"
+ << "h4:1"
+ << "votes" << 0 << "priority" << 0 << "tags"
+ << BSON("recipientTagName"
+ << "tag2"))
+ << BSON("_id" << 5 << "host"
+ << "h5:1"
+ << "votes" << 0 << "priority" << 0 << "tags"
+ << BSON("recipientTagName"
+ << "tag2"))
+ << BSON("_id" << 6 << "host"
+ << "h6:1"
+ << "votes" << 0 << "priority" << 0 << "tags"
+ << BSON("recipientTagName"
+ << "tag2")))));
+
+ auto splitConfig =
+ serverless::makeSplitConfig(rsConfig, "recipientSetName", "recipientTagName");
ReplSettings settings;
settings.setServerlessMode();
@@ -424,11 +429,12 @@ TEST_F(ReplCoordHBV1Test, UninitializedDonorNodeAcceptsSplitConfigOnFirstHeartbe
ASSERT_FALSE(getNet()->hasReadyRequests());
exitNetwork();
- receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1));
+ receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1));
enterNetwork();
- processResponseFromPrimary(rsConfig);
+ processResponseFromPrimary(splitConfig);
performSyncToFinishReconfigHeartbeat();
+
assertMemberState(MemberState::RS_STARTUP2);
OperationContextNoop opCtx;
auto storedConfig = ReplSetConfig::parse(
@@ -687,9 +693,7 @@ public:
return noi;
}
- BSONObj constructResponse(const ReplSetConfig& config,
- const int configVersion,
- const int termVersion) {
+ BSONObj makeHeartbeatResponseWithConfig(const ReplSetConfig& config) {
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName(_donorSetName);
hbResp.setState(MemberState::RS_PRIMARY);
@@ -699,20 +703,11 @@ public:
OpTime opTime(Timestamp(1, 1), 0);
hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t() + Seconds{1}});
hbResp.setDurableOpTimeAndWallTime({opTime, Date_t() + Seconds{1}});
+
BSONObjBuilder responseBuilder;
responseBuilder << "ok" << 1;
hbResp.addToBSON(&responseBuilder);
-
- // Add the raw config object.
- auto conf = ReplSetConfig::parse(makeConfigObj(configVersion, termVersion));
- auto splitConf = serverless::makeSplitConfig(conf, _recipientSetName, _recipientTag);
-
- // makeSplitConf increment the config version. We don't want that here as it makes the unit
- // test case harder to follow.
- BSONObjBuilder splitBuilder(splitConf.toBSON().removeField("version"));
- splitBuilder.append("version", configVersion);
-
- responseBuilder << "config" << splitBuilder.obj();
+ responseBuilder.append("config", config.toBSON());
return responseBuilder.obj();
}
@@ -769,64 +764,73 @@ protected:
const std::string _recipientSecondaryNode{"h4:1"};
};
-TEST_F(ReplCoordHBV1SplitConfigTest, DonorNodeDontApplyConfig) {
+TEST_F(ReplCoordHBV1SplitConfigTest, DonorNodeDoesNotApplyRecipientConfig) {
startUp(_donorSecondaryNode);
+ auto splitConfig =
+ serverless::makeSplitConfig(makeRSConfigWithVersionAndTerm(_configVersion, _configTerm),
+ _recipientSetName,
+ _recipientTag);
- // Config with newer version and same term.
- ReplSetConfig rsConfig = makeRSConfigWithVersionAndTerm(_configVersion + 1, _configTerm);
+ // Receive a heartbeat request that informs us about a newer config, prompting a heartbeat
+ // request to fetch the new config.
+ receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1));
- // Receive a heartbeat request that tells us about a newer config.
- receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1));
-
- getNet()->enterNetwork();
+ InNetworkGuard guard(getNet());
auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm);
- _configVersion += 1;
-
- // Construct the heartbeat response containing the newer config.
- auto responseObj = constructResponse(rsConfig, _configVersion, _configTerm);
-
- // Schedule and deliver the heartbeat response.
- getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj));
+ // Construct the heartbeat response containing the split config, and schedule it.
+ auto response = makeHeartbeatResponseWithConfig(splitConfig);
+ getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(response));
getNet()->runReadyNetworkOperations();
- auto installedConfig = getReplCoord()->getConfig();
- ASSERT_EQ(installedConfig.getReplSetName(), _donorSetName);
-
- // The node has updated its config and term to the new values.
- ASSERT_EQ(getReplCoord()->getConfigVersion(), _configVersion);
- ASSERT_EQ(getReplCoord()->getConfigTerm(), _configTerm);
-
- validateNextRequest("", _donorSetName, _configVersion, _configTerm);
+ // Validate that the donor node has accepted the split config, but not applied the recipient
+ // config.
+ ASSERT_EQ(getReplCoord()->getConfig().getReplSetName(), _donorSetName);
+ ASSERT_EQ(getReplCoord()->getConfigVersion(), splitConfig.getConfigVersion());
+ ASSERT_EQ(getReplCoord()->getConfigTerm(), splitConfig.getConfigTerm());
+ validateNextRequest(
+ "", _donorSetName, splitConfig.getConfigVersion(), splitConfig.getConfigTerm());
}
-TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeApplyConfig) {
+TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeAppliesRecipientConfig) {
startUp(_recipientSecondaryNode);
+ auto splitConfig =
+ serverless::makeSplitConfig(makeRSConfigWithVersionAndTerm(_configVersion, _configTerm),
+ _recipientSetName,
+ _recipientTag);
- // Config with newer version and same term.
- ReplSetConfig rsConfig = makeRSConfigWithVersionAndTerm((_configVersion + 1), _configTerm);
-
- // Receive a heartbeat request that tells us about a newer config.
- receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1));
+ // Receive a heartbeat request that informs us about a newer config, prompting a heartbeat
+ // request to fetch the new config.
+ receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1));
- getNet()->enterNetwork();
- auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm);
+ {
+ InNetworkGuard guard(getNet());
+ auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm);
- // Construct the heartbeat response containing the newer config.
- auto responseObj = constructResponse(rsConfig, _configVersion + 1, _configTerm);
+ // Construct the heartbeat response containing the split config, and schedule it.
+ auto response = makeHeartbeatResponseWithConfig(splitConfig);
+ getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(response));
+ getNet()->runReadyNetworkOperations();
+ }
- // Schedule and deliver the heartbeat response.
- getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj));
- getNet()->runReadyNetworkOperations();
+ auto opCtx = makeOperationContext();
+ getReplCoord()->signalDrainComplete(opCtx.get(), _configTerm);
+ while (getReplCoord()->getConfigVersionAndTerm() < splitConfig.getConfigVersionAndTerm()) {
+ sleepFor(Milliseconds(10));
+ }
- // The recipient's lastCommittedOpTime and currentCommittedSnapshotOpTime are cleared on
- // applying the recipient config.
+ // Validate that the recipient node has accepted the recipient config, and changed its set name
+ // to the recipientSetName. Also, confirm that the recipient's lastCommittedOpTime and
+ // currentCommittedSnapshotOpTime are cleared on applying the recipient config.
+ ASSERT_EQ(getReplCoord()->getConfig().getReplSetName(), _recipientSetName);
ASSERT(getReplCoord()->getLastCommittedOpTime().isNull());
ASSERT(getReplCoord()->getCurrentCommittedSnapshotOpTime().isNull());
- // Applying the recipient config will increase the configVersion by 1.
- validateNextRequest(
- "", _recipientSetName, (_configVersion + 2), getReplCoord()->getConfigTerm());
+ {
+ InNetworkGuard guard(getNet());
+ validateNextRequest(
+ "", _recipientSetName, splitConfig.getConfigVersion(), splitConfig.getConfigTerm());
+ }
}
TEST_F(ReplCoordHBV1SplitConfigTest, RejectMismatchedSetNameInHeartbeatResponse) {
@@ -884,17 +888,19 @@ TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeNonZeroVotes) {
<< "tag2")));
startUp(_recipientSecondaryNode);
- // Config with newer version and same term.
- ReplSetConfig rsConfig = makeRSConfigWithVersionAndTerm((_configVersion + 1), _configTerm);
+ auto splitConfig =
+ serverless::makeSplitConfig(makeRSConfigWithVersionAndTerm(_configVersion, _configTerm),
+ _recipientSetName,
+ _recipientTag);
- // Receive a heartbeat request that tells us about a newer config.
- receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1));
+ // Receive a heartbeat request that tells us about a split config.
+ receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1));
getNet()->enterNetwork();
auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm);
- // Construct the heartbeat response containing the newer config.
- auto responseObj = constructResponse(rsConfig, _configVersion + 1, _configTerm);
+ // Construct the heartbeat response containing the split config.
+ auto responseObj = makeHeartbeatResponseWithConfig(splitConfig);
// Schedule and deliver the heartbeat response.
getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj));