diff options
-rw-r--r-- | jstests/serverless/libs/basic_serverless_test.js | 2 | ||||
-rw-r--r-- | jstests/serverless/shard_split_apply_splitconfig.js | 74 | ||||
-rw-r--r-- | jstests/serverless/shard_split_enabled.js | 2 | ||||
-rw-r--r-- | jstests/serverless/shard_split_tenant_access_blocking.js | 2 | ||||
-rw-r--r-- | jstests/serverless/shard_split_wait_for_block_timestamp.js | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 104 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp | 232 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/serverless/SConscript | 15 | ||||
-rw-r--r-- | src/mongo/shell/replsettest.js | 11 |
14 files changed, 452 insertions, 26 deletions
diff --git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js index ce1841327f5..acf0e6e6877 100644 --- a/jstests/serverless/libs/basic_serverless_test.js +++ b/jstests/serverless/libs/basic_serverless_test.js @@ -1,6 +1,6 @@ class BasicServerlessTest { constructor({recipientTagName, recipientSetName, nodeOptions}) { - this.donor = new ReplSetTest({name: "donor", nodes: 3, nodeOptions}); + this.donor = new ReplSetTest({name: "donor", nodes: 3, serverless: true, nodeOptions}); this.donor.startSet(); this.donor.initiate(); diff --git a/jstests/serverless/shard_split_apply_splitconfig.js b/jstests/serverless/shard_split_apply_splitconfig.js new file mode 100644 index 00000000000..2c623169f36 --- /dev/null +++ b/jstests/serverless/shard_split_apply_splitconfig.js @@ -0,0 +1,74 @@ +load("jstests/libs/fail_point_util.js"); // for "configureFailPoint" +load('jstests/libs/parallel_shell_helpers.js'); // for "startParallelShell" +load("jstests/serverless/libs/basic_serverless_test.js"); + +function populateRecipientMembers(splitConfig) { + return splitConfig.members.filter(m => m._id >= 3).map((member, idx) => { + member._id = idx; + member.votes = 1; + member.priority = 1; + return member; + }); +} + +function runReconfigToSplitConfig() { + "use strict"; + + const kRecipientSetName = "receiveSet"; + + jsTestLog("Starting serverless"); + const test = + new BasicServerlessTest({recipientTagName: "recipientNode", recipientSetName: "recipient"}); + + jsTestLog("Adding recipient nodes"); + test.addRecipientNodes(); + + test.donor.awaitSecondaryNodes(); + + jsTestLog("Reconfigure the donor to apply a `splitConfig`"); + const config = test.donor.getReplSetConfigFromNode(); + const splitConfig = Object.extend({}, config, /* deepCopy */ true); + splitConfig._id = kRecipientSetName; + splitConfig.version++; + splitConfig.members = populateRecipientMembers(splitConfig); + + // TODO: possible future validation in replSetReconfig command? + delete splitConfig.settings.replicaSetId; + + const configWithSplitConfig = Object.extend({}, config, /* deepCopy */ true); + configWithSplitConfig.version++; + configWithSplitConfig.recipientConfig = splitConfig; + configWithSplitConfig.members = configWithSplitConfig.members.filter(m => m._id < 3); + + jsTestLog("Applying the split config, and waiting for it to propagate to recipient"); + const admin = test.donor.getPrimary().getDB("admin"); + assert.commandWorked(admin.runCommand({replSetReconfig: configWithSplitConfig})); + assert.soon(() => { + const recipientNode = test.recipientNodes[0]; + const status = + assert.commandWorked(recipientNode.getDB('admin').runCommand({replSetGetStatus: 1})); + return status.set === kRecipientSetName; + }, "waiting for split config to take", 30000, 2000); + + jsTestLog("Waiting for recipient to elect a primary"); + assert.soon(() => { + const recipientNode = test.recipientNodes[0]; + const status = + assert.commandWorked(recipientNode.getDB('admin').runCommand({replSetGetStatus: 1})); + return status.members.some(member => member.stateStr === 'PRIMARY'); + }, "waiting for recipient to elect primary", 30000, 2000); + + jsTestLog("Confirming we can write to recipient"); + + const recipientPrimary = test.recipientNodes.filter(node => { + const n = node.getDB('admin')._helloOrLegacyHello(); + return n.isWritablePrimary || n.ismaster; + })[0]; + + assert(recipientPrimary); + assert.commandWorked(recipientPrimary.getDB('foo').bar.insert({fake: 'document'})); + + test.stop(); +} + +runReconfigToSplitConfig(); diff --git a/jstests/serverless/shard_split_enabled.js b/jstests/serverless/shard_split_enabled.js index 09778e54672..cb2075421b8 100644 --- a/jstests/serverless/shard_split_enabled.js +++ b/jstests/serverless/shard_split_enabled.js @@ -39,7 +39,7 @@ function makeShardSplitTest() { const donorPrimary = test.donor.getPrimary(); const adminDB = donorPrimary.getDB("admin"); - // TODO(SERVER-62346): remove this when we actually split recipients + // TODO(SERVER-64168): remove this when split is ready configureFailPoint(adminDB, "skipShardSplitWaitForSplitAcceptance"); assert(TenantMigrationUtil.isShardSplitEnabled(adminDB)); diff --git a/jstests/serverless/shard_split_tenant_access_blocking.js b/jstests/serverless/shard_split_tenant_access_blocking.js index d41b62e1f4a..7d961bda6ef 100644 --- a/jstests/serverless/shard_split_tenant_access_blocking.js +++ b/jstests/serverless/shard_split_tenant_access_blocking.js @@ -40,7 +40,7 @@ tenantIds.forEach(id => { const adminDb = donorPrimary.getDB("admin"); const blockingFailPoint = configureFailPoint(adminDb, "pauseShardSplitAfterBlocking"); -// TODO(SERVER-62346): remove this when we actually split recipients +// TODO(SERVER-64168): remove this when split is ready configureFailPoint(adminDb, "skipShardSplitWaitForSplitAcceptance"); jsTestLog("Running commitShardSplit command"); diff --git a/jstests/serverless/shard_split_wait_for_block_timestamp.js b/jstests/serverless/shard_split_wait_for_block_timestamp.js index 64b2408de4c..4cb293650b0 100644 --- a/jstests/serverless/shard_split_wait_for_block_timestamp.js +++ b/jstests/serverless/shard_split_wait_for_block_timestamp.js @@ -41,7 +41,7 @@ for (let i = 0; i < 2000; i++) { } assert.commandWorked(bulk.execute()); -// TODO(SERVER-62346): remove this when we actually split recipients +// TODO(SERVER-64168): remove this when split is ready configureFailPoint(adminDb, "skipShardSplitWaitForSplitAcceptance"); jsTestLog("Running commitShardSplit command"); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 950b18feb56..a89bf012887 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1764,6 +1764,7 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/read_write_concern_defaults_mock', + '$BUILD_DIR/mongo/db/serverless/shard_split_utils', 'isself', 'repl_coordinator_impl', 'repl_coordinator_test_fixture', diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 26565673d86..86aaa76c00f 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -134,7 +134,7 @@ Status ReplicationCoordinatorExternalStateMock::storeLocalConfigDocument(Operati Status ReplicationCoordinatorExternalStateMock::replaceLocalConfigDocument(OperationContext* opCtx, const BSONObj& config) { - MONGO_UNREACHABLE; + return storeLocalConfigDocument(opCtx, config, false); } void ReplicationCoordinatorExternalStateMock::setLocalConfigDocument( diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 0d63e49c91c..643bba407b4 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -4067,9 +4067,8 @@ Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* opCt BSONObjBuilder* resultObj) { LOGV2(21356, "replSetInitiate admin command received from client"); - const auto replEnabled = _settings.usingReplSets(); stdx::unique_lock<Latch> lk(_mutex); - if (!replEnabled) { + if (!isReplEnabled()) { return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet"); } while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index ebce95d9a4f..e213e4d18d2 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1267,6 +1267,16 @@ private: void _scheduleHeartbeatReconfig(WithLock lk, const ReplSetConfig& newConfig); /** + * Check if the node should use the recipientConfig contained within newConfig. + */ + bool _shouldUseRecipientConfig(WithLock lk, const ReplSetConfig& newConfig); + + /** + * Check if the recipient config provided can be applied to the current node. + */ + Status _isRecipientConfigValid(WithLock lk, const ReplSetConfig& newConfig); + + /** * Method to write a configuration transmitted via heartbeat message to stable storage. */ void _heartbeatReconfigStore(const executor::TaskExecutor::CallbackArgs& cbd, @@ -1277,7 +1287,8 @@ private: */ void _heartbeatReconfigFinish(const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, - StatusWith<int> myIndex); + StatusWith<int> myIndex, + bool isSplitRecipientConfig); /** * Calculates the time (in millis) left in quiesce mode and converts the value to int64. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 9a4083e525c..6d356487893 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -42,6 +42,7 @@ #include "mongo/base/status.h" #include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/global_settings.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/db/kill_sessions_local.h" #include "mongo/db/operation_context.h" @@ -640,6 +641,35 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk, .status_with_transitional_ignore(); } +bool ReplicationCoordinatorImpl::_shouldUseRecipientConfig(WithLock lk, + const ReplSetConfig& newConfig) { + const auto& member = _rsConfig.getMemberAt(_selfIndex); + + return newConfig.getRecipientConfig()->findMemberByHostAndPort(member.getHostAndPort()); +} + +Status ReplicationCoordinatorImpl::_isRecipientConfigValid(WithLock lk, + const ReplSetConfig& newConfig) { + const auto& member = _rsConfig.getMemberAt(_selfIndex); + + if (member.getNumVotes() != 0 || member.getPriority() != 0) { + return Status(ErrorCodes::BadValue, + "Cannot apply split config to a node with non-zero vote or priority"); + } + + if (!_rsConfig.isInitialized()) { + return Status(ErrorCodes::NotYetInitialized, + "Cannot apply a split config if the current config is uninitialized"); + } + + if (_rsConfig.getReplSetName() == newConfig.getRecipientConfig()->getReplSetName()) { + return Status(ErrorCodes::InvalidReplicaSetConfig, + "The current config and recipient config cannot have the same set name."); + } + + return Status::OK(); +} + void ReplicationCoordinatorImpl::_heartbeatReconfigStore( const executor::TaskExecutor::CallbackArgs& cbd, const ReplSetConfig& newConfig) { @@ -653,12 +683,46 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( return; } + const auto [isSplitRecipientConfig, + statusWithConf] = [&]() -> std::tuple<bool, StatusWith<ReplSetConfig>> { + if (!newConfig.isSplitConfig()) { + return std::make_tuple(false, newConfig); + } + + stdx::lock_guard<Latch> lg(_mutex); + if (!_shouldUseRecipientConfig(lg, newConfig)) { + return std::make_tuple(false, newConfig); + } + + auto status = _isRecipientConfigValid(lg, newConfig); + if (!status.isOK()) { + return std::make_tuple(false, status); + } + + auto mutableConfig = newConfig.getRecipientConfig()->getMutable(); + mutableConfig.setConfigVersion(1); + mutableConfig.setConfigTerm(1); + auto config = ReplSetConfig(std::move(mutableConfig)); + + return std::make_tuple(true, config); + }(); + + if (!statusWithConf.isOK()) { + LOGV2_WARNING(6234600, + "Not persisting new configuration in heartbeat response to disk because " + "it is invalid", + "conf"_attr = statusWithConf.getStatus()); + return; + } + + const auto configToApply = statusWithConf.getValue(); + const StatusWith<int> myIndex = validateConfigForHeartbeatReconfig( - _externalState.get(), newConfig, getGlobalServiceContext()); + _externalState.get(), configToApply, getGlobalServiceContext()); if (myIndex.getStatus() == ErrorCodes::NodeNotFound) { stdx::lock_guard<Latch> lk(_mutex); - // If this node absent in newConfig, and this node was not previously initialized, + // If this node absent in configToApply, and this node was not previously initialized, // return to kConfigUninitialized immediately, rather than storing the config and // transitioning into the RS_REMOVED state. See SERVER-15740. if (!_rsConfig.isInitialized()) { @@ -683,15 +747,24 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } else { LOGV2_FOR_HEARTBEATS(4615626, 2, - "Config with {newConfigVersionAndTerm} validated for " + "Config with {configToApplyVersionAndTerm} validated for " "reconfig; persisting to disk.", "Config validated for reconfig; persisting to disk", - "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm()); + "configToApplyVersionAndTerm"_attr = + configToApply.getConfigVersionAndTerm()); auto opCtx = cc().makeOperationContext(); // Don't write the no-op for config learned via heartbeats. - auto status = _externalState->storeLocalConfigDocument( - opCtx.get(), newConfig.toBSON(), false /* writeOplog */); + auto status = + [&, isSplitRecipientConfig = isSplitRecipientConfig, config = configToApply]() { + if (isSplitRecipientConfig) { + return _externalState->replaceLocalConfigDocument(opCtx.get(), config.toBSON()); + } else { + return _externalState->storeLocalConfigDocument( + opCtx.get(), config.toBSON(), false /* writeOplog */); + } + }(); + // Wait for durability of the new config document. try { JournalFlusher::get(opCtx.get())->waitForJournalFlush(); @@ -726,7 +799,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } bool isArbiter = myIndex.isOK() && myIndex.getValue() != -1 && - newConfig.getMemberAt(myIndex.getValue()).isArbiter(); + configToApply.getMemberAt(myIndex.getValue()).isArbiter(); if (isArbiter) { ReplicaSetAwareServiceRegistry::get(_service).onBecomeArbiter(); @@ -739,13 +812,13 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( LOGV2_FOR_HEARTBEATS( 4615627, 2, - "New configuration with {newConfigVersionAndTerm} persisted " + "New configuration with {configToApplyVersionAndTerm} persisted " "to local storage; installing new config in memory", "New configuration persisted to local storage; installing new config in memory", - "newConfigVersionAndTerm"_attr = newConfig.getConfigVersionAndTerm()); + "configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm()); } - _heartbeatReconfigFinish(cbd, newConfig, myIndex); + _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig); // Start data replication after the config has been installed. if (shouldStartDataReplication) { @@ -773,7 +846,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, - StatusWith<int> myIndex) { + StatusWith<int> myIndex, + const bool isSplitRecipientConfig) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } @@ -786,7 +860,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( _replExecutor ->scheduleWorkAt(_replExecutor->now() + Milliseconds{10}, [=](const executor::TaskExecutor::CallbackArgs& cbData) { - _heartbeatReconfigFinish(cbData, newConfig, myIndex); + _heartbeatReconfigFinish( + cbData, newConfig, myIndex, isSplitRecipientConfig); }) .status_with_transitional_ignore(); return; @@ -810,7 +885,8 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( _replExecutor ->onEvent(electionFinishedEvent, [=](const executor::TaskExecutor::CallbackArgs& cbData) { - _heartbeatReconfigFinish(cbData, newConfig, myIndex); + _heartbeatReconfigFinish( + cbData, newConfig, myIndex, isSplitRecipientConfig); }) .status_with_transitional_ignore(); return; @@ -863,7 +939,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( invariant(_rsConfigState == kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() || - _selfIndex < 0); + _selfIndex < 0 || isSplitRecipientConfig); if (!myIndex.isOK()) { switch (myIndex.getStatus().code()) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index bfb922c0245..36ce4bdaf08 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -41,6 +41,7 @@ #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/task_runner_test_fixture.h" #include "mongo/db/repl/topology_coordinator.h" +#include "mongo/db/serverless/shard_split_utils.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/repl_set_metadata.h" @@ -441,6 +442,237 @@ TEST_F( exitNetwork(); } } +class ReplCoordHBV1SplitConfigTest : public ReplCoordHBV1Test { +public: + void startUp(const std::string& hostAndPort) { + BSONObj configBson = + BSON("_id" << _donorSetName << "version" << _configVersion << "term" << _configTerm + << "members" << _members << "protocolVersion" << 1); + ReplSetConfig rsConfig = assertMakeRSConfig(configBson); + assertStartSuccess(configBson, HostAndPort(hostAndPort)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + // Black hole initial heartbeat requests. + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + + // Ignore the initial heartbeat requests sent to each of the 5 other nodes of this replica + // set. + net->blackHole(net->getNextReadyRequest()); + net->blackHole(net->getNextReadyRequest()); + net->blackHole(net->getNextReadyRequest()); + net->blackHole(net->getNextReadyRequest()); + net->blackHole(net->getNextReadyRequest()); + + net->exitNetwork(); + } + + NetworkInterfaceMock::NetworkOperationIterator validateNextRequest(const std::string& target, + const std::string& setName, + const int configVersion, + const int termVersion) { + ASSERT(getNet()->hasReadyRequests()); + + ReplSetHeartbeatArgsV1 hbArgs; + auto noi = getNet()->getNextReadyRequest(); + const RemoteCommandRequest& hbrequest = noi->getRequest(); + + if (!target.empty()) { + // We might not know the exact target as ordering might change. In that case, simply + // validate the content of the requests and ignore to which node it's sent. + ASSERT_EQUALS(HostAndPort(target, 1), hbrequest.target); + } + ASSERT_OK(hbArgs.initialize(hbrequest.cmdObj)); + ASSERT_EQUALS(setName, hbArgs.getSetName()); + ASSERT_EQUALS(configVersion, hbArgs.getConfigVersion()); + ASSERT_EQUALS(termVersion, hbArgs.getConfigTerm()); + + return noi; + } + + BSONObj constructResponse(const ReplSetConfig& config, + const int configVersion, + const int termVersion) { + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName(_donorSetName); + hbResp.setState(MemberState::RS_PRIMARY); + hbResp.setConfigVersion(config.getConfigVersion()); + hbResp.setConfigTerm(config.getConfigTerm()); + // The smallest valid optime in PV1. + OpTime opTime(Timestamp(), 0); + hbResp.setAppliedOpTimeAndWallTime({opTime, Date_t()}); + hbResp.setDurableOpTimeAndWallTime({opTime, Date_t()}); + BSONObjBuilder responseBuilder; + responseBuilder << "ok" << 1; + hbResp.addToBSON(&responseBuilder); + + // Add the raw config object. + auto conf = ReplSetConfig::parse(makeConfigObj(configVersion, termVersion)); + auto splitConf = serverless::makeSplitConfig(conf, _recipientSetName, _recipientTag); + + // makeSplitConf increment the config version. We don't want that here as it makes the unit + // test case harder to follow. + BSONObjBuilder splitBuilder(splitConf.toBSON().removeField("version")); + splitBuilder.append("version", configVersion); + + responseBuilder << "config" << splitBuilder.obj(); + return responseBuilder.obj(); + } + + BSONObj makeConfigObj(long long version, boost::optional<long long> term) { + BSONObjBuilder bob; + bob.appendElements(BSON("_id" + << "mySet" + << "version" << version << "members" << _members + << "protocolVersion" << 1)); + if (term) { + bob.append("term", *term); + } + return bob.obj(); + } + + ReplSetConfig makeRSConfigWithVersionAndTerm(long long version, long long term) { + return assertMakeRSConfig(makeConfigObj(version, term)); + } + + unittest::MinimumLoggedSeverityGuard severityGuard{logv2::LogComponent::kDefault, + logv2::LogSeverity::Debug(3)}; + + int _configVersion = 2; + int _configTerm = 2; + + BSONArray _members = BSON_ARRAY(BSON("_id" << 1 << "host" + << "h1:1") + << BSON("_id" << 2 << "host" + << "h2:1") + << BSON("_id" << 3 << "host" + << "h3:1") + << BSON("_id" << 4 << "host" + << "h4:1" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recip" + << "tag2")) + << BSON("_id" << 5 << "host" + << "h5:1" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recip" + << "tag2")) + << BSON("_id" << 6 << "host" + << "h6:1" + << "votes" << 0 << "priority" << 0 << "tags" + << BSON("recip" + << "tag2"))); + +protected: + const std::string _donorSetName{"mySet"}; + const std::string _recipientSetName{"newSet"}; + const std::string _recipientTag{"recip"}; + const std::string _donorSecondaryNode{"h2:1"}; + const std::string _recipientSecondaryNode{"h4:1"}; +}; + +TEST_F(ReplCoordHBV1SplitConfigTest, DonorNodeDontApplyConfig) { + startUp(_donorSecondaryNode); + + // Config with newer version and same term. + ReplSetConfig rsConfig = makeRSConfigWithVersionAndTerm(_configVersion + 1, _configTerm); + + // Receive a heartbeat request that tells us about a newer config. + receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + + getNet()->enterNetwork(); + auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm); + + _configVersion += 1; + + // Construct the heartbeat response containing the newer config. + auto responseObj = constructResponse(rsConfig, _configVersion, _configTerm); + + // Schedule and deliver the heartbeat response. + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj)); + getNet()->runReadyNetworkOperations(); + + auto installedConfig = getReplCoord()->getConfig(); + ASSERT_EQ(installedConfig.getReplSetName(), _donorSetName); + + // The node has updated its config and term to the new values. + ASSERT_EQ(getReplCoord()->getConfigVersion(), _configVersion); + ASSERT_EQ(getReplCoord()->getConfigTerm(), _configTerm); + + validateNextRequest("", _donorSetName, _configVersion, _configTerm); +} + +TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeApplyConfig) { + startUp(_recipientSecondaryNode); + + // Config with newer version and same term. + ReplSetConfig rsConfig = makeRSConfigWithVersionAndTerm((_configVersion + 1), _configTerm); + + // Receive a heartbeat request that tells us about a newer config. + receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + + getNet()->enterNetwork(); + auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm); + + // Construct the heartbeat response containing the newer config. + auto responseObj = constructResponse(rsConfig, _configVersion + 1, _configTerm); + + // Schedule and deliver the heartbeat response. + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj)); + getNet()->runReadyNetworkOperations(); + + // The recipient's config and term versions are set to 1. + ASSERT_EQ(getReplCoord()->getConfigVersion(), 1); + ASSERT_EQ(getReplCoord()->getConfigTerm(), 1); + + validateNextRequest("", _recipientSetName, 1, 1); +} + +TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeNonZeroVotes) { + _members = BSON_ARRAY(BSON("_id" << 1 << "host" + << "h1:1") + << BSON("_id" << 2 << "host" + << "h2:1") + << BSON("_id" << 3 << "host" + << "h3:1") + << BSON("_id" << 4 << "host" + << "h4:1" + << "votes" << 1 << "priority" << 0 << "tags" + << BSON("recip" + << "tag2")) + << BSON("_id" << 5 << "host" + << "h5:1" + << "votes" << 1 << "priority" << 0 << "tags" + << BSON("recip" + << "tag2")) + << BSON("_id" << 6 << "host" + << "h6:1" + << "votes" << 1 << "priority" << 0 << "tags" + << BSON("recip" + << "tag2"))); + startUp(_recipientSecondaryNode); + + // Config with newer version and same term. + ReplSetConfig rsConfig = makeRSConfigWithVersionAndTerm((_configVersion + 1), _configTerm); + + // Receive a heartbeat request that tells us about a newer config. + receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + + getNet()->enterNetwork(); + auto noi = validateNextRequest("h1", _donorSetName, _configVersion, _configTerm); + + // Construct the heartbeat response containing the newer config. + auto responseObj = constructResponse(rsConfig, _configVersion + 1, _configTerm); + + // Schedule and deliver the heartbeat response. + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj)); + getNet()->runReadyNetworkOperations(); + + // The node rejected the config as it's a voting node and its version has not changed. + ASSERT_EQ(getReplCoord()->getConfigVersion(), _configVersion); + ASSERT_EQ(getReplCoord()->getConfigTerm(), _configTerm); + ASSERT_EQ(getReplCoord()->getSettings().ourSetName(), _donorSetName); +} class ReplCoordHBV1ReconfigTest : public ReplCoordHBV1Test { public: diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 8c9430b9ed8..4ae43ea1d98 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -1388,9 +1388,22 @@ StatusWith<bool> TopologyCoordinator::setLastOptime(const UpdatePositionArgs::Up "appliedOpTime"_attr = args.appliedOpTime, "durableOpTime"_attr = args.durableOpTime); + auto* memberData = _findMemberDataByMemberId(memberId.getData()); + + // If we are applying a splitConfig for a shard split, we may still be receiving updates for + // nodes that have been removed from the donor set. + if (_rsConfig.isSplitConfig() && !memberData && + memberId.getData() >= _rsConfig.getNumMembers()) { + LOGV2(6234605, + "Skipping update from node", + "data"_attr = memberId.getData(), + "conf"_attr = _rsConfig); + // Do not advance optime + return false; + } + // While we can accept replSetUpdatePosition commands across config versions, we still do not // allow receiving them from a node that is not in our config. - auto* memberData = _findMemberDataByMemberId(memberId.getData()); if (!memberData) { invariant(!_rsConfig.findMemberByID(memberId.getData())); diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript index 82b230ac6ab..ad92d014d37 100644 --- a/src/mongo/db/serverless/SConscript +++ b/src/mongo/db/serverless/SConscript @@ -46,11 +46,22 @@ env.Library( ) env.Library( + target='shard_split_utils', + source=[ + 'shard_split_utils.cpp' + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/dbhelpers', + '$BUILD_DIR/mongo/db/repl/replica_set_messages', + 'shard_split_state_machine', + ] +) + +env.Library( target='shard_split_donor_service', source=[ 'shard_split_donor_service.cpp', 'shard_split_donor_op_observer.cpp', - 'shard_split_utils.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/db/repl/primary_only_service', @@ -64,6 +75,7 @@ env.Library( '$BUILD_DIR/mongo/db/namespace_string', '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', + 'shard_split_utils' ] ) @@ -83,5 +95,6 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/dbtests/mocklib', 'shard_split_donor_service', + 'shard_split_utils', ] ) diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index b101d022f1e..120d1b60890 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -2750,10 +2750,15 @@ var ReplSetTest = function(opts) { oplogSize: this.oplogSize, keyFile: this.keyFile, port: _useBridge ? _unbridgedPorts[n] : this.ports[n], - replSet: this.useSeedList ? this.getURL() : this.name, dbpath: "$set-$node" }; + if (this.serverless == null) { + defaults.replSet = this.useSeedList ? this.getURL() : this.name; + } else { + defaults.serverless = true; + } + if (options && options.binVersion && jsTest.options().useRandomBinVersionsWithinReplicaSet) { throw new Error( @@ -3211,6 +3216,7 @@ var ReplSetTest = function(opts) { self.name = opts.name || jsTest.name(); print('Starting new replica set ' + self.name); + self.serverless = opts.serverless; self.useHostName = opts.useHostName == undefined ? true : opts.useHostName; self.host = self.useHostName ? (opts.host || getHostName()) : 'localhost'; self.oplogSize = opts.oplogSize || 40; @@ -3343,10 +3349,11 @@ var ReplSetTest = function(opts) { * Constructor, which instantiates the ReplSetTest object from existing nodes. */ function _constructFromExistingNodes( - {name, nodeHosts, nodeOptions, keyFile, host, waitForKeys}) { + {name, serverless, nodeHosts, nodeOptions, keyFile, host, waitForKeys}) { print('Recreating replica set from existing nodes ' + tojson(nodeHosts)); self.name = name; + self.serverless = serverless; self.ports = nodeHosts.map(node => node.split(':')[1]); self.nodes = nodeHosts.map((node) => { const conn = Mongo(node); |