diff options
author | Matt Broadstone <mbroadst@mongodb.com> | 2022-08-30 23:22:25 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-31 00:22:47 +0000 |
commit | 8c64e2c4c9a91f863d8911bed6bfe13369242de6 (patch) | |
tree | a9183ac6fc602825af1d493c5d49b2eea3a3d678 /src/mongo/db/repl | |
parent | cdda7d10a0907b41c8aafdde08f3bee6d123db78 (diff) | |
download | mongo-8c64e2c4c9a91f863d8911bed6bfe13369242de6.tar.gz |
SERVER-68931 Drain oplog buffers before applying recipient config
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/oplog_batcher.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 100 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp | 306 |
6 files changed, 273 insertions, 193 deletions
diff --git a/src/mongo/db/repl/oplog_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp index 986e9648e8d..d7d57678edc 100644 --- a/src/mongo/db/repl/oplog_batcher.cpp +++ b/src/mongo/db/repl/oplog_batcher.cpp @@ -360,7 +360,9 @@ void OplogBatcher::_run(StorageInterface* storageInterface) { // Draining state guarantees the producer has already been fully stopped and no more // operations will be pushed in to the oplog buffer until the applier state changes. auto isDraining = - replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining; + replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining || + replCoord->getApplierState() == + ReplicationCoordinator::ApplierState::DrainingForShardSplit; // Check the oplog buffer after the applier state to ensure the producer is stopped. if (isDraining && _oplogBuffer->isEmpty()) { diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index c2faef400d9..c95d56603f5 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -568,8 +568,11 @@ public: * * When a node steps down during catchup mode, the states remain the same (producer: Running, * applier: Running). + * + * DrainingForShardSplit follows the same state diagram as Draining; it exists only to hint to the + * signalDrainComplete method that it should not follow the primary step-up logic. */ - enum class ApplierState { Running, Draining, Stopped }; + enum class ApplierState { Running, Draining, DrainingForShardSplit, Stopped }; /** * In normal cases: Running -> Draining -> Stopped -> Running.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 2a8eec4863b..74adb1afb02 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1059,7 +1059,6 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { _initialSyncer.swap(initialSyncerCopy); } - // joining the replication executor is blocking so it must be run outside of the mutex if (initialSyncerCopy) { LOGV2_DEBUG( @@ -1074,6 +1073,17 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { initialSyncerCopy->join(); initialSyncerCopy.reset(); } + + { + stdx::unique_lock<Latch> lk(_mutex); + if (_finishedDrainingPromise) { + _finishedDrainingPromise->setError( + {ErrorCodes::InterruptedAtShutdown, + "Cancelling wait for drain mode to complete due to shutdown"}); + _finishedDrainingPromise = boost::none; + } + } + _externalState->shutdown(opCtx); _replExecutor->shutdown(); _replExecutor->join(); @@ -1192,6 +1202,23 @@ ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, long long termWhenBufferIsEmpty) noexcept { + { + stdx::unique_lock<Latch> lk(_mutex); + if (_applierState == ReplicationCoordinator::ApplierState::DrainingForShardSplit) { + _applierState = ApplierState::Stopped; + auto memberState = _getMemberState_inlock(); + invariant(memberState.secondary() || memberState.startup()); + _externalState->onDrainComplete(opCtx); + + if (_finishedDrainingPromise) { + _finishedDrainingPromise->emplaceValue(); + _finishedDrainingPromise = boost::none; + } + + return; + } + } + // This logic is a little complicated in order to avoid acquiring the RSTL in mode X // unnecessarily. This is important because the applier may call signalDrainComplete() // whenever it wants, not only when the ReplicationCoordinator is expecting it. 
@@ -4844,6 +4871,16 @@ void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { _externalState->stopProducer(); } +Future<void> ReplicationCoordinatorImpl::_drainForShardSplit() { + stdx::lock_guard<Latch> lk(_mutex); + invariant(!_finishedDrainingPromise.has_value()); + auto [promise, future] = makePromiseFuture<void>(); + _finishedDrainingPromise = std::move(promise); + _applierState = ApplierState::DrainingForShardSplit; + _externalState->stopProducer(); + return std::move(future); +} + ReplicationCoordinatorImpl::PostMemberStateUpdateAction ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk, OperationContext* opCtx, diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 4a1ba95f347..223c3fc9b90 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1370,7 +1370,8 @@ private: * Method to write a configuration transmitted via heartbeat message to stable storage. */ void _heartbeatReconfigStore(const executor::TaskExecutor::CallbackArgs& cbd, - const ReplSetConfig& newConfig); + const ReplSetConfig& newConfig, + bool isSplitRecipientConfig = false); /** * Conclusion actions of a heartbeat-triggered reconfiguration. @@ -1567,6 +1568,12 @@ private: void _enterDrainMode_inlock(); /** + * Enter drain mode which does not result in a primary stepup. Returns a future which becomes + * ready when the oplog buffers have completed draining. + */ + Future<void> _drainForShardSplit(); + + /** * Waits for the config state to leave kConfigStartingUp, which indicates that start() has * finished. */ @@ -1846,6 +1853,9 @@ private: // Construct used to synchronize default write concern changes with config write concern // changes. WriteConcernTagChangesImpl _writeConcernTagChanges; + + // An optional promise created when entering drain mode for shard split. 
+ boost::optional<Promise<void>> _finishedDrainingPromise; // (M) }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 057d101a6ff..a8ef444f31d 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -659,7 +659,44 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk, _selfIndex < 0); _replExecutor ->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) { - _heartbeatReconfigStore(cbData, newConfig); + const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig); + if (!swConfig.isOK()) { + LOGV2_WARNING( + 6234600, + "Ignoring new configuration in heartbeat response because it is invalid", + "status"_attr = swConfig.getStatus()); + + stdx::lock_guard<Latch> lg(_mutex); + invariant(_rsConfigState == kConfigHBReconfiguring); + _setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized + : kConfigSteady); + return; + } + + if (MONGO_likely(!isSplitRecipientConfig)) { + _heartbeatReconfigStore(cbData, newConfig); + return; + } + + LOGV2(8423366, "Waiting for oplog buffer to drain before applying recipient config."); + _drainForShardSplit().getAsync( + [this, + resolvedConfig = swConfig.getValue(), + replExecutor = _replExecutor.get(), + isSplitRecipientConfig = isSplitRecipientConfig](Status status) { + if (!status.isOK()) { + stdx::lock_guard<Latch> lg(_mutex); + _setConfigState_inlock(!_rsConfig.isInitialized() ? 
kConfigUninitialized + : kConfigSteady); + return; + } + + replExecutor + ->scheduleWork([=](const executor::TaskExecutor::CallbackArgs& cbData) { + _heartbeatReconfigStore(cbData, resolvedConfig, isSplitRecipientConfig); + }) + .status_with_transitional_ignore(); + }); }) .status_with_transitional_ignore(); } @@ -676,14 +713,8 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve lk.unlock(); // If this node is listed in the members of incoming config, accept the config. - const auto foundSelfInMembers = std::any_of( - config.membersBegin(), - config.membersEnd(), - [externalState = _externalState.get()](const MemberConfig& config) { - return externalState->isSelf(config.getHostAndPort(), getGlobalServiceContext()); - }); - - if (foundSelfInMembers) { + auto swSelfIndex = findSelfInConfig(_externalState.get(), config, cc().getServiceContext()); + if (swSelfIndex.isOK()) { return {config, false}; } @@ -707,7 +738,6 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve false}; } - return {ReplSetConfig(*recipientConfig), true}; } @@ -715,7 +745,9 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve } void ReplicationCoordinatorImpl::_heartbeatReconfigStore( - const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) { + const executor::TaskExecutor::CallbackArgs& cbd, + const ReplSetConfig& newConfig, + bool isSplitRecipientConfig) { if (cbd.status.code() == ErrorCodes::CallbackCanceled) { LOGV2(21480, @@ -727,23 +759,10 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( return; } - const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig); - if (!swConfig.isOK()) { - LOGV2_WARNING(6234600, - "Ignoring new configuration in heartbeat response because it is invalid", - "status"_attr = swConfig.getStatus()); - - stdx::lock_guard<Latch> lg(_mutex); - invariant(_rsConfigState == kConfigHBReconfiguring); - 
_setConfigState_inlock(!_rsConfig.isInitialized() ? kConfigUninitialized : kConfigSteady); - return; - } - - const auto configToApply = swConfig.getValue(); if (isSplitRecipientConfig) { LOGV2(6309200, "Applying a recipient config for a shard split operation.", - "config"_attr = configToApply); + "config"_attr = newConfig); } const auto myIndex = [&]() -> StatusWith<int> { @@ -751,24 +770,24 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( // recover from transient DNS errors. { stdx::lock_guard<Latch> lk(_mutex); - if (_selfIndex >= 0 && sameConfigContents(_rsConfig, configToApply)) { + if (_selfIndex >= 0 && sameConfigContents(_rsConfig, newConfig)) { LOGV2_FOR_HEARTBEATS(6351200, 2, "New heartbeat config is only a version/term change, skipping " "validation checks", "oldConfig"_attr = _rsConfig, - "newConfig"_attr = configToApply); + "newConfig"_attr = newConfig); // If the configs are the same, so is our index. return _selfIndex; } } return validateConfigForHeartbeatReconfig( - _externalState.get(), configToApply, getGlobalServiceContext()); + _externalState.get(), newConfig, cc().getServiceContext()); }(); if (myIndex.getStatus() == ErrorCodes::NodeNotFound) { stdx::lock_guard<Latch> lk(_mutex); - // If this node absent in configToApply, and this node was not previously initialized, + // If this node absent in newConfig, and this node was not previously initialized, // return to kConfigUninitialized immediately, rather than storing the config and // transitioning into the RS_REMOVED state. See SERVER-15740. 
if (!_rsConfig.isInitialized()) { @@ -793,24 +812,23 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } else { LOGV2_FOR_HEARTBEATS(4615626, 2, - "Config with {configToApplyVersionAndTerm} validated for " + "Config with {newConfigVersionAndTerm} validated for " "reconfig; persisting to disk.", "Config validated for reconfig; persisting to disk", - "configToApplyVersionAndTerm"_attr = - configToApply.getConfigVersionAndTerm()); + "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm()); auto opCtx = cc().makeOperationContext(); auto status = [this, opCtx = opCtx.get(), - configToApply, + newConfig, isSplitRecipientConfig = isSplitRecipientConfig]() { if (isSplitRecipientConfig) { - return _externalState->replaceLocalConfigDocument(opCtx, configToApply.toBSON()); + return _externalState->replaceLocalConfigDocument(opCtx, newConfig.toBSON()); } // Don't write the no-op for config learned via heartbeats. return _externalState->storeLocalConfigDocument( - opCtx, configToApply.toBSON(), false /* writeOplog */); + opCtx, newConfig.toBSON(), false /* writeOplog */); }(); // Wait for durability of the new config document. 
@@ -847,7 +865,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 && - configToApply.getMemberAt(myIndex.getValue()).isArbiter(); + newConfig.getMemberAt(myIndex.getValue()).isArbiter(); if (isArbiter) { ReplicaSetAwareServiceRegistry::get(_service).onBecomeArbiter(); @@ -871,13 +889,13 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( LOGV2_FOR_HEARTBEATS( 4615627, 2, - "New configuration with {configToApplyVersionAndTerm} persisted " + "New configuration with {newConfigVersionAndTerm} persisted " "to local storage; installing new config in memory", "New configuration persisted to local storage; installing new config in memory", - "configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm()); + "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm()); } - _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig); + _heartbeatReconfigFinish(cbd, newConfig, myIndex, isSplitRecipientConfig); // Start data replication after the config has been installed. if (shouldStartDataReplication) { @@ -1046,6 +1064,10 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( // indicating that this is `forInitiate`. 
_topCoord->advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime(), false, true); _clearCommittedSnapshot_inlock(); + + invariant(_applierState == ApplierState::Stopped); + _applierState = ApplierState::Running; + _externalState->startProducerIfStopped(); } lk.unlock(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 62e2c412cc6..92a1cecbb69 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -269,29 +269,46 @@ TEST_F(ReplCoordHBV1Test, ASSERT_TRUE(getExternalState()->threadsStarted()); } -TEST_F(ReplCoordHBV1Test, RejectSplitConfigWhenNotInServerlessMode) { +TEST_F(ReplCoordHBV1Test, RejectRecipientConfigWhenNotInServerlessMode) { auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kDefault, logv2::LogSeverity::Debug(3)}; - // Start up with three nodes, and assume the role of "node2" as a secondary. Notably, the local - // node is NOT started in serverless mode. "node2" is configured as having no votes, no + auto rsConfig = + assertMakeRSConfig(BSON("_id" + << "mySet" + << "protocolVersion" << 1 << "version" << 2 << "members" + << BSON_ARRAY( + BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345") + << BSON("_id" << 4 << "host" + << "node4:12345" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recipientTagName" + << "tag2")) + << BSON("_id" << 5 << "host" + << "node5:12345" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recipientTagName" + << "tag2")) + << BSON("_id" << 6 << "host" + << "node6:12345" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recipientTagName" + << "tag2"))))); + + // Start up with three nodes, and assume the role of "node5" as a secondary. 
Notably, the local + // node is NOT started in serverless mode. "node5" is configured as having no votes, no // priority, so that we can pass validation for accepting a split config. - assertStartSuccess(BSON("_id" - << "mySet" - << "protocolVersion" << 1 << "version" << 2 << "members" - << BSON_ARRAY(BSON("_id" << 1 << "host" - << "node1:12345") - << BSON("_id" << 2 << "host" - << "node2:12345" - << "votes" << 0 << "priority" << 0) - << BSON("_id" << 3 << "host" - << "node3:12345"))), - HostAndPort("node2", 12345)); + assertStartSuccess(rsConfig.toBSON(), HostAndPort("node5", 12345)); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); getReplCoord()->updateTerm_forTest(1, nullptr); ASSERT_EQ(getReplCoord()->getTerm(), 1); // respond to initial heartbeat requests - for (int j = 0; j < 2; ++j) { + for (int j = 0; j < 5; ++j) { replyToReceivedHeartbeatV1(); } @@ -303,27 +320,8 @@ TEST_F(ReplCoordHBV1Test, RejectSplitConfigWhenNotInServerlessMode) { ASSERT_FALSE(getNet()->hasReadyRequests()); } - ReplSetConfig splitConfig = - assertMakeRSConfig(BSON("_id" - << "mySet" - << "version" << 3 << "term" << 1 << "protocolVersion" << 1 - << "members" - << BSON_ARRAY(BSON("_id" << 1 << "host" - << "node1:12345") - << BSON("_id" << 2 << "host" - << "node2:12345") - << BSON("_id" << 3 << "host" - << "node3:12345")) - << "recipientConfig" - << BSON("_id" - << "recipientSet" - << "version" << 1 << "term" << 1 << "members" - << BSON_ARRAY(BSON("_id" << 1 << "host" - << "node1:12345") - << BSON("_id" << 2 << "host" - << "node2:12345") - << BSON("_id" << 3 << "host" - << "node3:12345"))))); + auto splitConfig = + serverless::makeSplitConfig(rsConfig, "recipientSetName", "recipientTagName"); // Accept a heartbeat from `node1` which has a split config. 
The split config lists this node // ("node2") in the recipient member list, but a node started not in serverless mode should not @@ -333,43 +331,41 @@ TEST_F(ReplCoordHBV1Test, RejectSplitConfigWhenNotInServerlessMode) { { InNetworkGuard guard(getNet()); processResponseFromPrimary(splitConfig, 2, 1, HostAndPort{"node1", 12345}); - assertMemberState(MemberState::RS_SECONDARY); - OperationContextNoop opCtx; - auto storedConfig = ReplSetConfig::parse( - unittest::assertGet(getExternalState()->loadLocalConfigDocument(&opCtx))); - ASSERT_OK(storedConfig.validate()); - - // Verify that the recipient config was not accepted. A successfully applied splitConfig - // will install at version and term {1, 1}. - ASSERT_EQUALS(ConfigVersionAndTerm(3, 1), storedConfig.getConfigVersionAndTerm()); - ASSERT_EQUALS("mySet", storedConfig.getReplSetName()); + assertMemberState(MemberState::RS_REMOVED); } - - ASSERT_TRUE(getExternalState()->threadsStarted()); } TEST_F(ReplCoordHBV1Test, NodeRejectsSplitConfigWhenNotInitialized) { - ReplSetConfig rsConfig = + auto rsConfig = assertMakeRSConfig(BSON("_id" << "mySet" - << "version" << 3 << "term" << 1 << "protocolVersion" << 1 - << "members" - << BSON_ARRAY(BSON("_id" << 1 << "host" - << "h1:1") - << BSON("_id" << 2 << "host" - << "h2:1") - << BSON("_id" << 3 << "host" - << "h3:1")) - << "recipientConfig" - << BSON("_id" - << "recipientSet" - << "version" << 1 << "term" << 1 << "members" - << BSON_ARRAY(BSON("_id" << 1 << "host" - << "h4:1") - << BSON("_id" << 2 << "host" - << "h5:1") - << BSON("_id" << 3 << "host" - << "h6:1"))))); + << "protocolVersion" << 1 << "version" << 2 << "members" + << BSON_ARRAY( + BSON("_id" << 1 << "host" + << "h1:1") + << BSON("_id" << 2 << "host" + << "h2:1" + << "votes" << 0 << "priority" << 0) + << BSON("_id" << 3 << "host" + << "h3:1") + << BSON("_id" << 4 << "host" + << "h4:1" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recipientTagName" + << "tag2")) + << BSON("_id" << 5 << "host" + << 
"h5:1" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recipientTagName" + << "tag2")) + << BSON("_id" << 6 << "host" + << "h6:1" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recipientTagName" + << "tag2"))))); + + auto splitConfig = + serverless::makeSplitConfig(rsConfig, "recipientSetName", "recipientTagName"); ReplSettings settings; settings.setServerlessMode(); @@ -383,36 +379,45 @@ TEST_F(ReplCoordHBV1Test, NodeRejectsSplitConfigWhenNotInitialized) { ASSERT_FALSE(getNet()->hasReadyRequests()); exitNetwork(); - receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1)); enterNetwork(); - processResponseFromPrimary(rsConfig); + processResponseFromPrimary(splitConfig); assertMemberState(MemberState::RS_STARTUP); exitNetwork(); } TEST_F(ReplCoordHBV1Test, UninitializedDonorNodeAcceptsSplitConfigOnFirstHeartbeat) { - ReplSetConfig rsConfig = + auto rsConfig = assertMakeRSConfig(BSON("_id" << "mySet" - << "version" << 3 << "term" << 1 << "protocolVersion" << 1 - << "members" - << BSON_ARRAY(BSON("_id" << 1 << "host" - << "h1:1") - << BSON("_id" << 2 << "host" - << "h2:1") - << BSON("_id" << 3 << "host" - << "h3:1")) - << "recipientConfig" - << BSON("_id" - << "recipientSet" - << "version" << 1 << "term" << 1 << "members" - << BSON_ARRAY(BSON("_id" << 1 << "host" - << "h4:1") - << BSON("_id" << 2 << "host" - << "h5:1") - << BSON("_id" << 3 << "host" - << "h6:1"))))); + << "protocolVersion" << 1 << "version" << 2 << "members" + << BSON_ARRAY( + BSON("_id" << 1 << "host" + << "h1:1") + << BSON("_id" << 2 << "host" + << "h2:1" + << "votes" << 0 << "priority" << 0) + << BSON("_id" << 3 << "host" + << "h3:1") + << BSON("_id" << 4 << "host" + << "h4:1" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recipientTagName" + << "tag2")) + << BSON("_id" << 5 << "host" + << "h5:1" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recipientTagName" + << "tag2")) + << BSON("_id" << 6 
<< "host" + << "h6:1" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recipientTagName" + << "tag2"))))); + + auto splitConfig = + serverless::makeSplitConfig(rsConfig, "recipientSetName", "recipientTagName"); ReplSettings settings; settings.setServerlessMode(); @@ -424,11 +429,12 @@ TEST_F(ReplCoordHBV1Test, UninitializedDonorNodeAcceptsSplitConfigOnFirstHeartbe ASSERT_FALSE(getNet()->hasReadyRequests()); exitNetwork(); - receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1)); enterNetwork(); - processResponseFromPrimary(rsConfig); + processResponseFromPrimary(splitConfig); performSyncToFinishReconfigHeartbeat(); + assertMemberState(MemberState::RS_STARTUP2); OperationContextNoop opCtx; auto storedConfig = ReplSetConfig::parse( @@ -687,9 +693,7 @@ public: return noi; } - BSONObj constructResponse(const ReplSetConfig& config, - const int configVersion, - const int termVersion) { + BSONObj makeHeartbeatResponseWithConfig(const ReplSetConfig& config) { ReplSetHeartbeatResponse hbResp; hbResp.setSetName(_donorSetName); hbResp.setState(MemberState::RS_PRIMARY); @@ -699,20 +703,11 @@ public: OpTime opTime(Timestamp(1, 1), 0); hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t() + Seconds{1}}); hbResp.setDurableOpTimeAndWallTime({opTime, Date_t() + Seconds{1}}); + BSONObjBuilder responseBuilder; responseBuilder << "ok" << 1; hbResp.addToBSON(&responseBuilder); - - // Add the raw config object. - auto conf = ReplSetConfig::parse(makeConfigObj(configVersion, termVersion)); - auto splitConf = serverless::makeSplitConfig(conf, _recipientSetName, _recipientTag); - - // makeSplitConf increment the config version. We don't want that here as it makes the unit - // test case harder to follow. 
- BSONObjBuilder splitBuilder(splitConf.toBSON().removeField("version")); - splitBuilder.append("version", configVersion); - - responseBuilder << "config" << splitBuilder.obj(); + responseBuilder.append("config", config.toBSON()); return responseBuilder.obj(); } @@ -769,64 +764,73 @@ protected: const std::string _recipientSecondaryNode{"h4:1"}; }; -TEST_F(ReplCoordHBV1SplitConfigTest, DonorNodeDontApplyConfig) { +TEST_F(ReplCoordHBV1SplitConfigTest, DonorNodeDoesNotApplyRecipientConfig) { startUp(_donorSecondaryNode); + auto splitConfig = + serverless::makeSplitConfig(makeRSConfigWithVersionAndTerm(_configVersion, _configTerm), + _recipientSetName, + _recipientTag); - // Config with newer version and same term. - ReplSetConfig rsConfig = makeRSConfigWithVersionAndTerm(_configVersion + 1, _configTerm); + // Receive a heartbeat request that informs us about a newer config, prompting a heartbeat + // request to fetch the new config. + receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1)); - // Receive a heartbeat request that tells us about a newer config. - receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); - - getNet()->enterNetwork(); + InNetworkGuard guard(getNet()); auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm); - _configVersion += 1; - - // Construct the heartbeat response containing the newer config. - auto responseObj = constructResponse(rsConfig, _configVersion, _configTerm); - - // Schedule and deliver the heartbeat response. - getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj)); + // Construct the heartbeat response containing the split config, and schedule it. 
+ auto response = makeHeartbeatResponseWithConfig(splitConfig); + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(response)); getNet()->runReadyNetworkOperations(); - auto installedConfig = getReplCoord()->getConfig(); - ASSERT_EQ(installedConfig.getReplSetName(), _donorSetName); - - // The node has updated its config and term to the new values. - ASSERT_EQ(getReplCoord()->getConfigVersion(), _configVersion); - ASSERT_EQ(getReplCoord()->getConfigTerm(), _configTerm); - - validateNextRequest("", _donorSetName, _configVersion, _configTerm); + // Validate that the donor node has accepted the split config, but not applied the recipient + // config. + ASSERT_EQ(getReplCoord()->getConfig().getReplSetName(), _donorSetName); + ASSERT_EQ(getReplCoord()->getConfigVersion(), splitConfig.getConfigVersion()); + ASSERT_EQ(getReplCoord()->getConfigTerm(), splitConfig.getConfigTerm()); + validateNextRequest( + "", _donorSetName, splitConfig.getConfigVersion(), splitConfig.getConfigTerm()); } -TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeApplyConfig) { +TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeAppliesRecipientConfig) { startUp(_recipientSecondaryNode); + auto splitConfig = + serverless::makeSplitConfig(makeRSConfigWithVersionAndTerm(_configVersion, _configTerm), + _recipientSetName, + _recipientTag); - // Config with newer version and same term. - ReplSetConfig rsConfig = makeRSConfigWithVersionAndTerm((_configVersion + 1), _configTerm); - - // Receive a heartbeat request that tells us about a newer config. - receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + // Receive a heartbeat request that informs us about a newer config, prompting a heartbeat + // request to fetch the new config. 
+ receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1)); - getNet()->enterNetwork(); - auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm); + { + InNetworkGuard guard(getNet()); + auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm); - // Construct the heartbeat response containing the newer config. - auto responseObj = constructResponse(rsConfig, _configVersion + 1, _configTerm); + // Construct the heartbeat response containing the split config, and schedule it. + auto response = makeHeartbeatResponseWithConfig(splitConfig); + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(response)); + getNet()->runReadyNetworkOperations(); + } - // Schedule and deliver the heartbeat response. - getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj)); - getNet()->runReadyNetworkOperations(); + auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), _configTerm); + while (getReplCoord()->getConfigVersionAndTerm() < splitConfig.getConfigVersionAndTerm()) { + sleepFor(Milliseconds(10)); + } - // The recipient's lastCommittedOpTime and currentCommittedSnapshotOpTime are cleared on - // applying the recipient config. + // Validate that the recipient node has accepted the recipient config, and changed its set name + // to the recipientSetName. Also, confirm that the recipient's lastCommittedOpTime and + // currentCommittedSnapshotOpTime are cleared on applying the recipient config. + ASSERT_EQ(getReplCoord()->getConfig().getReplSetName(), _recipientSetName); ASSERT(getReplCoord()->getLastCommittedOpTime().isNull()); ASSERT(getReplCoord()->getCurrentCommittedSnapshotOpTime().isNull()); - // Applying the recipient config will increase the configVersion by 1. 
- validateNextRequest( - "", _recipientSetName, (_configVersion + 2), getReplCoord()->getConfigTerm()); + { + InNetworkGuard guard(getNet()); + validateNextRequest( + "", _recipientSetName, splitConfig.getConfigVersion(), splitConfig.getConfigTerm()); + } } TEST_F(ReplCoordHBV1SplitConfigTest, RejectMismatchedSetNameInHeartbeatResponse) { @@ -884,17 +888,19 @@ TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeNonZeroVotes) { << "tag2"))); startUp(_recipientSecondaryNode); - // Config with newer version and same term. - ReplSetConfig rsConfig = makeRSConfigWithVersionAndTerm((_configVersion + 1), _configTerm); + auto splitConfig = + serverless::makeSplitConfig(makeRSConfigWithVersionAndTerm(_configVersion, _configTerm), + _recipientSetName, + _recipientTag); - // Receive a heartbeat request that tells us about a newer config. - receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + // Receive a heartbeat request that tells us about a split config. + receiveHeartbeatFrom(splitConfig, 1, HostAndPort("h1", 1)); getNet()->enterNetwork(); auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm); - // Construct the heartbeat response containing the newer config. - auto responseObj = constructResponse(rsConfig, _configVersion + 1, _configTerm); + // Construct the heartbeat response containing the split config. + auto responseObj = makeHeartbeatResponseWithConfig(splitConfig); // Schedule and deliver the heartbeat response. getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj)); |