diff options
22 files changed, 150 insertions, 174 deletions
diff --git a/buildscripts/resmokeconfig/suites/serverless.yml b/buildscripts/resmokeconfig/suites/serverless.yml index eeef77194a2..6b8c43e586a 100644 --- a/buildscripts/resmokeconfig/suites/serverless.yml +++ b/buildscripts/resmokeconfig/suites/serverless.yml @@ -4,6 +4,9 @@ selector: roots: - jstests/serverless/*.js - jstests/serverless/change_streams/**/*.js + exclude_files: + # TODO(SERVER-69227): Unexclude this test when we resolve the performance issues with split. + - jstests/serverless/shard_split_performance_test.js executor: config: diff --git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js index eeb7f5630f1..69aeab56616 100644 --- a/jstests/serverless/libs/basic_serverless_test.js +++ b/jstests/serverless/libs/basic_serverless_test.js @@ -575,6 +575,20 @@ class BasicServerlessTest { assert(recipientRstArgs.nodeHosts.length >= 3); return createRst(recipientRstArgs, true); } + + /** + * @returns An array of recipient nodes. + */ + getRecipientNodes() { + return this.recipientNodes; + } + + /** + * @returns An array of donor nodes. + */ + getDonorNodes() { + return this.donor.nodes.filter(node => !this.recipientNodes.includes(node)); + } } BasicServerlessTest.kConfigSplitDonorsNS = "config.shardSplitDonors"; diff --git a/jstests/serverless/shard_split_basic_test.js b/jstests/serverless/shard_split_basic_test.js index f79fbaad229..2472e945d24 100644 --- a/jstests/serverless/shard_split_basic_test.js +++ b/jstests/serverless/shard_split_basic_test.js @@ -31,10 +31,6 @@ assert.eq(status.shardSplits.totalAborted, 0); assert.gt(status.shardSplits.totalCommittedDurationMillis, 0); assert.gt(status.shardSplits.totalCommittedDurationWithoutCatchupMillis, 0); -const recipientPrimary = test.getRecipient().getPrimary(); -const recipientConfig = recipientPrimary.adminCommand({replSetGetConfig: 1}).config; -assert(!recipientConfig.settings.shardSplitBlockOpTime); - test.cleanupSuccesfulCommitted(operation.migrationId, tenantIds); test.stop(); })(); diff --git a/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_aborted.js b/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_aborted.js index aca63f2daa2..effb881bb9a 100644 --- a/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_aborted.js +++ b/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_aborted.js @@ -99,8 +99,8 @@ function testUnblockBlockedReadsAfterMigrationAborted(testCase, dbName, collName const db = donorPrimary.getDB(dbName); runCommandForConcurrentReadTest(db, command, null, testCase.isTransaction); if (testCase.isSupportedOnSecondaries) { - const primaryPort = String(donorPrimary).split(":")[1]; - const secondaries = donorRst.nodes.filter(node => node.port != primaryPort); + const secondaries = + test.getDonorNodes().filter(node => node.adminCommand({hello: 1}).secondary); secondaries.forEach(node => { const db = node.getDB(dbName); runCommandForConcurrentReadTest(db, command, null, testCase.isTransaction); diff --git a/jstests/serverless/shard_split_rejects_multiple_ops.js b/jstests/serverless/shard_split_rejects_multiple_ops.js index e35a6d06cb9..010a280fc32 100644 --- a/jstests/serverless/shard_split_rejects_multiple_ops.js +++ b/jstests/serverless/shard_split_rejects_multiple_ops.js @@ -22,8 +22,8 @@ function commitShardSplitConcurrently() { const donorPrimary = test.donor.getPrimary(); - let fp = configureFailPoint(donorPrimary.getDB("admin"), "pauseShardSplitAfterBlocking"); - let fpAfterDecision = + const fp = configureFailPoint(donorPrimary.getDB("admin"), "pauseShardSplitAfterBlocking"); + const fpAfterDecision = configureFailPoint(donorPrimary.getDB("admin"), "pauseShardSplitAfterDecision"); const operation = test.createSplitOperation(tenantIds); @@ -45,7 +45,7 @@ function commitShardSplitConcurrently() { // blocks before processing any `forgetShardSplit` command. fpAfterDecision.wait(); - let forgetThread = operation.forgetAsync(); + const forgetThread = operation.forgetAsync(); // fails because the commitShardSplit hasn't be garbage collected yet. assert.commandFailedWithCode(donorPrimary.adminCommand({ @@ -58,7 +58,7 @@ function commitShardSplitConcurrently() { 117); // ConflictingOperationInProgress fpAfterDecision.off(); assert.commandWorked(splitThread.returnData()); - forgetThread.join(); + assert.commandWorked(forgetThread.returnData()); test.cleanupSuccesfulCommitted(operation.migrationId, tenantIds); diff --git a/jstests/serverless/shard_split_wait_for_block_timestamp.js b/jstests/serverless/shard_split_wait_for_block_timestamp.js index c5f15900ca3..c114cd55707 100644 --- a/jstests/serverless/shard_split_wait_for_block_timestamp.js +++ b/jstests/serverless/shard_split_wait_for_block_timestamp.js @@ -42,10 +42,8 @@ assert.commandWorked(bulk.execute()); jsTestLog("Running commitShardSplit command"); const firstOperation = test.createSplitOperation(tenantIds); -assert.isnull(findSplitOperation(donorPrimary, firstOperation.migrationId)); -const res = firstOperation.commit({retryOnRetryableErrors: false}); -assert.commandFailed(res); -assert.eq(res.code, ErrorCodes.TenantMigrationAborted); +assert.commandFailedWithCode(firstOperation.commit({retryOnRetryableErrors: false}), + ErrorCodes.TenantMigrationAborted); firstOperation.forget(); test.cleanupSuccesfulAborted(firstOperation.migrationId, tenantIds); diff --git a/jstests/serverless/shard_split_write_during_split_stepdown.js b/jstests/serverless/shard_split_write_during_split_stepdown.js index 1670b5b57b3..8c943118e08 100644 --- a/jstests/serverless/shard_split_write_during_split_stepdown.js +++ b/jstests/serverless/shard_split_write_during_split_stepdown.js @@ -52,14 +52,11 @@ assert.commandWorked(donorPrimary.adminCommand({replSetStepDown: 360, force: tru blockingFP.off(); -splitThread.join(); -const result = splitThread.returnData(); -assert.eq(result.ok, 0); -assert.eq(result.code, ErrorCodes.InterruptedDueToReplStateChange); +assert.commandFailedWithCode(splitThread.returnData(), ErrorCodes.InterruptedDueToReplStateChange); -writeThread.join(); const writeResults = writeThread.returnData(); writeResults.forEach(res => { + jsTestLog(`result: ${res}`); assert.eq(res, ErrorCodes.TenantMigrationCommitted); }); diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp index 4565587f3c3..46f96bbbb59 100644 --- a/src/mongo/db/repl/repl_set_config.cpp +++ b/src/mongo/db/repl/repl_set_config.cpp @@ -791,14 +791,6 @@ bool ReplSetConfig::areWriteConcernModesTheSame(ReplSetConfig* otherConfig) cons return true; } -boost::optional<OpTime> ReplSetConfig::getShardSplitBlockOpTime() const { - return getSettings()->getShardSplitBlockOpTime(); -} - -void MutableReplSetConfig::removeShardSplitBlockOpTime() { - getSettings()->setShardSplitBlockOpTime(boost::none); -} - MemberConfig* MutableReplSetConfig::_findMemberByID(MemberId id) { for (auto it = getMembers().begin(); it != getMembers().end(); ++it) { if (it->getId() == id) { diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h index c8d3943e08d..b6e34afdc19 100644 --- a/src/mongo/db/repl/repl_set_config.h +++ b/src/mongo/db/repl/repl_set_config.h @@ -139,11 +139,6 @@ public: */ void setSecondaryDelaySecsFieldDefault(MemberId memberId); - /** - * Removes the opTime field stored for an in-progress shard split operation. - */ - void removeShardSplitBlockOpTime(); - protected: MutableReplSetConfig() = default; @@ -565,12 +560,6 @@ public: */ bool areWriteConcernModesTheSame(ReplSetConfig* otherConfig) const; - /** - * Returns the opTime when an in-progress split operation started blocking requests, if one is - * currently running. - */ - boost::optional<OpTime> getShardSplitBlockOpTime() const; - private: /** * Sets replica set ID to 'defaultReplicaSetId' if 'cfg' does not contain an ID. diff --git a/src/mongo/db/repl/repl_set_config.idl b/src/mongo/db/repl/repl_set_config.idl index 9df1a636444..47b94a347a0 100644 --- a/src/mongo/db/repl/repl_set_config.idl +++ b/src/mongo/db/repl/repl_set_config.idl @@ -129,9 +129,6 @@ structs: type: objectid optional: true validator: { callback: "validateReplicaSetIdNotNull"} - shardSplitBlockOpTime: - type: optime - optional: true ReplSetConfigBase: description: "The complete configuration for the replica set" diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index ddc80433aa7..4a1ba95f347 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1360,10 +1360,11 @@ private: void _scheduleHeartbeatReconfig(WithLock lk, const ReplSetConfig& newConfig); /** - * Determines if the provided config is a split config, and validates it for installation. + * Accepts a ReplSetConfig and resolves it either to itself, or the embedded shard split + * recipient config if it's present and self is a shard split recipient. Returns a tuple of the + * resolved config and a boolean indicating whether a recipient config was found. */ - std::tuple<StatusWith<ReplSetConfig>, boost::optional<OpTime>> _resolveConfigToApply( - const ReplSetConfig& config); + std::tuple<StatusWith<ReplSetConfig>, bool> _resolveConfigToApply(const ReplSetConfig& config); /** * Method to write a configuration transmitted via heartbeat message to stable storage. @@ -1377,7 +1378,7 @@ private: void _heartbeatReconfigFinish(const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, StatusWith<int> myIndex, - boost::optional<OpTime> shardSplitBlockOpTime); + bool isSplitRecipientConfig); /** * Calculates the time (in millis) left in quiesce mode and converts the value to int64. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 681baa5e13f..057d101a6ff 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -664,10 +664,10 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk, .status_with_transitional_ignore(); } -std::tuple<StatusWith<ReplSetConfig>, boost::optional<OpTime>> -ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) { +std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolveConfigToApply( + const ReplSetConfig& config) { if (!_settings.isServerless() || !config.isSplitConfig()) { - return {config, boost::none}; + return {config, false}; } stdx::unique_lock<Latch> lk(_mutex); @@ -684,12 +684,12 @@ ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) { }); if (foundSelfInMembers) { - return {config, boost::none}; + return {config, false}; } return {Status(ErrorCodes::NotYetInitialized, "Cannot apply a split config if the current config is uninitialized"), - boost::none}; + false}; } auto recipientConfig = config.getRecipientConfig(); @@ -697,25 +697,21 @@ ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) { if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) { if (selfMember.getNumVotes() > 0) { return {Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"), - boost::none}; + false}; } if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) { return {Status(ErrorCodes::InvalidReplicaSetConfig, "Cannot apply recipient config since current config and recipient " "config have the same set name."), - boost::none}; + false}; } - invariant(recipientConfig->getShardSplitBlockOpTime()); - auto shardSplitBlockOpTime = *recipientConfig->getShardSplitBlockOpTime(); - auto mutableConfig = recipientConfig->getMutable(); - mutableConfig.removeShardSplitBlockOpTime(); - return {ReplSetConfig(std::move(mutableConfig)), shardSplitBlockOpTime}; + return {ReplSetConfig(*recipientConfig), true}; } - return {config, boost::none}; + return {config, false}; } void ReplicationCoordinatorImpl::_heartbeatReconfigStore( @@ -731,7 +727,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( return; } - const auto [swConfig, shardSplitBlockOpTime] = _resolveConfigToApply(newConfig); + const auto [swConfig, isSplitRecipientConfig] = _resolveConfigToApply(newConfig); if (!swConfig.isOK()) { LOGV2_WARNING(6234600, "Ignoring new configuration in heartbeat response because it is invalid", @@ -744,7 +740,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } const auto configToApply = swConfig.getValue(); - if (shardSplitBlockOpTime) { + if (isSplitRecipientConfig) { LOGV2(6309200, "Applying a recipient config for a shard split operation.", "config"_attr = configToApply); @@ -804,15 +800,17 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( configToApply.getConfigVersionAndTerm()); auto opCtx = cc().makeOperationContext(); - // Don't write the no-op for config learned via heartbeats. - auto status = [&, isRecipientConfig = shardSplitBlockOpTime.has_value()]() { - if (isRecipientConfig) { - return _externalState->replaceLocalConfigDocument(opCtx.get(), - configToApply.toBSON()); - } else { - return _externalState->storeLocalConfigDocument( - opCtx.get(), configToApply.toBSON(), false /* writeOplog */); + auto status = [this, + opCtx = opCtx.get(), + configToApply, + isSplitRecipientConfig = isSplitRecipientConfig]() { + if (isSplitRecipientConfig) { + return _externalState->replaceLocalConfigDocument(opCtx, configToApply.toBSON()); } + + // Don't write the no-op for config learned via heartbeats. + return _externalState->storeLocalConfigDocument( + opCtx, configToApply.toBSON(), false /* writeOplog */); }(); // Wait for durability of the new config document. @@ -859,7 +857,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( shouldStartDataReplication = true; } - if (shardSplitBlockOpTime) { + if (isSplitRecipientConfig) { // Donor access blockers are removed from donor nodes via the shard split op observer. // Donor access blockers are removed from recipient nodes when the node applies the // recipient config. When the recipient primary steps up it will delete its state @@ -879,7 +877,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( "configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm()); } - _heartbeatReconfigFinish(cbd, configToApply, myIndex, shardSplitBlockOpTime); + _heartbeatReconfigFinish(cbd, configToApply, myIndex, isSplitRecipientConfig); // Start data replication after the config has been installed. if (shouldStartDataReplication) { @@ -910,7 +908,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, StatusWith<int> myIndex, - boost::optional<OpTime> shardSplitBlockOpTime) { + bool isSplitRecipientConfig) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } @@ -924,7 +922,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( ->scheduleWorkAt(_replExecutor->now() + Milliseconds{10}, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _heartbeatReconfigFinish( - cbData, newConfig, myIndex, shardSplitBlockOpTime); + cbData, newConfig, myIndex, isSplitRecipientConfig); }) .status_with_transitional_ignore(); return; @@ -949,7 +947,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( ->onEvent(electionFinishedEvent, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _heartbeatReconfigFinish( - cbData, newConfig, myIndex, shardSplitBlockOpTime); + cbData, newConfig, myIndex, isSplitRecipientConfig); }) .status_with_transitional_ignore(); return; @@ -1002,7 +1000,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( invariant(_rsConfigState == kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() || - _selfIndex < 0 || shardSplitBlockOpTime); + _selfIndex < 0 || isSplitRecipientConfig); if (!myIndex.isOK()) { switch (myIndex.getStatus().code()) { @@ -1040,8 +1038,14 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( const PostMemberStateUpdateAction action = _setCurrentRSConfig(lk, opCtx.get(), newConfig, myIndexValue); - if (shardSplitBlockOpTime) { - _topCoord->resetLastCommittedOpTime(*shardSplitBlockOpTime); + if (isSplitRecipientConfig) { + LOGV2(8423364, + "Clearing the commit point and current committed snapshot after applying split " + "recipient config."); + // Clear lastCommittedOpTime by passing in a default constructed OpTimeAndWallTime, and + // indicating that this is `forInitiate`. + _topCoord->advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTime(), false, true); + _clearCommittedSnapshot_inlock(); } lk.unlock(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 23e1d594488..62e2c412cc6 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -705,8 +705,7 @@ public: // Add the raw config object. auto conf = ReplSetConfig::parse(makeConfigObj(configVersion, termVersion)); - auto splitConf = serverless::makeSplitConfig( - conf, _recipientSetName, _recipientTag, repl::OpTime(Timestamp(12345, 1), termVersion)); + auto splitConf = serverless::makeSplitConfig(conf, _recipientSetName, _recipientTag); // makeSplitConf increment the config version. We don't want that here as it makes the unit // test case harder to follow. @@ -820,10 +819,10 @@ TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeApplyConfig) { getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj)); getNet()->runReadyNetworkOperations(); - // The recipient's lastCommittedOpTime is reset to the blockOpTime on applying the recipient - // config. - ASSERT_EQ(getReplCoord()->getLastCommittedOpTime(), - repl::OpTime(Timestamp(12345, 1), getReplCoord()->getConfigTerm())); + // The recipient's lastCommittedOpTime and currentCommittedSnapshotOpTime are cleared on + // applying the recipient config. + ASSERT(getReplCoord()->getLastCommittedOpTime().isNull()); + ASSERT(getReplCoord()->getCurrentCommittedSnapshotOpTime().isNull()); // Applying the recipient config will increase the configVersion by 1. validateNextRequest( diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp index 3c61693428e..003e70c0ff7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp @@ -1391,10 +1391,8 @@ TEST_F(ReplCoordReconfigTest, MustSendHeartbeatToSplitConfigRecipients) { BSONObjBuilder result; const auto opCtx = makeOperationContext(); - auto newConfig = mongo::serverless::makeSplitConfig(ReplSetConfig::parse(oldConfigObj), - "recipientSet", - recipientTagName, - repl::OpTime(Timestamp(100, 0), 1)); + auto newConfig = mongo::serverless::makeSplitConfig( + ReplSetConfig::parse(oldConfigObj), "recipientSet", recipientTagName); Status status(ErrorCodes::InternalError, "Not Set"); stdx::thread reconfigThread; diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 691c7411e9e..88b4b07d20d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -171,7 +171,9 @@ void ReplicationCoordinatorMock::setAwaitReplicationReturnValueFunction( SharedSemiFuture<void> ReplicationCoordinatorMock::awaitReplicationAsyncNoWTimeout( const OpTime& opTime, const WriteConcernOptions& writeConcern) { - MONGO_UNREACHABLE; + auto opCtx = cc().makeOperationContext(); + auto result = _awaitReplicationReturnValueFunction(opCtx.get(), opTime); + return Future<ReplicationCoordinator::StatusAndDuration>::makeReady(result).ignoreValue(); } void ReplicationCoordinatorMock::stepDown(OperationContext* opCtx, diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index b112eab9af0..030ad0a2792 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -2965,11 +2965,6 @@ bool TopologyCoordinator::advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTim return true; } -void TopologyCoordinator::resetLastCommittedOpTime(const OpTime& lastCommittedOpTime) { - LOGV2(8423364, "Resetting commit point", "lastCommittedOpTime"_attr = lastCommittedOpTime); - _lastCommittedOpTimeAndWallTime = OpTimeAndWallTime(lastCommittedOpTime, Date_t::now()); -} - OpTime TopologyCoordinator::getLastCommittedOpTime() const { return _lastCommittedOpTimeAndWallTime.opTime; } diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 6c2231d3904..3285a5b4825 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -322,11 +322,6 @@ public: bool forInitiate = false); /** - * Resets the commit point to the provided opTime, with a wall time of now. - */ - void resetLastCommittedOpTime(const OpTime& lastCommittedOpTime); - - /** * Returns the OpTime of the latest majority-committed op known to this server. */ OpTime getLastCommittedOpTime() const; diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index 67aa77142d5..5d59e965d3e 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -378,12 +378,6 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( // only cancel operations on stepdown from here out _cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor); - LOGV2(6236700, - "Shard split decision reached", - "id"_attr = _migrationId, - "state"_attr = ShardSplitDonorState_serializer(_stateDoc.getState()), - "status"_attr = status); - { stdx::lock_guard<Latch> lg(_mutex); if (!_stateDoc.getExpireAt()) { @@ -403,6 +397,11 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( status, executor, primaryToken, abortToken); } + LOGV2(6236700, + "Shard split decision reached", + "id"_attr = _migrationId, + "state"_attr = ShardSplitDonorState_serializer(_stateDoc.getState())); + return ExecutorFuture(**executor, DurableState{_stateDoc.getState(), _abortReason}); }) .unsafeToInlineFuture(); @@ -417,15 +416,12 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( return ExecutorFuture(**executor) .then([&] { return _decisionPromise.getFuture().semi().ignoreValue(); }) - .onCompletion([this, executor, primaryToken](Status status) { - // Always remove the split config, whether the operation was aborted or - // committed. + .then([this, executor, primaryToken]() { + // Always remove the split config after the split decision auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); pauseShardSplitBeforeSplitConfigRemoval.pauseWhileSetAndNotCanceled(opCtx.get(), primaryToken); - return _removeSplitConfigFromDonor(executor, primaryToken).then([status]() { - return status; - }); + return _removeSplitConfigFromDonor(executor, primaryToken); }) .then([this, executor, primaryToken] { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); @@ -461,21 +457,16 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( return ExecutorFuture(**executor) .then([&] { return _garbageCollectablePromise.getFuture().semi().ignoreValue(); }) - .then([this, executor, primaryToken, anchor = shared_from_this()] { + .then([this, executor, primaryToken] { return _waitForGarbageCollectionTimeoutThenDeleteStateDoc(executor, primaryToken); }) - .onCompletion( - [this, executor, primaryToken, anchor = shared_from_this()](Status status) { - stdx::lock_guard<Latch> lg(_mutex); - - LOGV2(8423356, - "Shard split completed.", - "id"_attr = _stateDoc.getId(), - "status"_attr = status, - "abortReason"_attr = _abortReason); - - return status; - }) + .then([this, executor, primaryToken, anchor = shared_from_this()] { + stdx::lock_guard<Latch> lg(_mutex); + LOGV2(8423356, + "Shard split completed.", + "id"_attr = _stateDoc.getId(), + "abortReason"_attr = _abortReason); + }) .unsafeToInlineFuture(); }); @@ -685,8 +676,13 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipien auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext()); + // It's possible that there has been an election since the blockOpTime was recorded, so we use + // the blockOpTime's timestamp and the current configTerm when waiting for recipient nodes to + // reach the blockTimestamp. This is okay because these timestamps are cluster times, and so are + // guaranteed to increase even across terms. invariant(_stateDoc.getBlockOpTime()); - auto blockOpTime = *_stateDoc.getBlockOpTime(); + auto blockOpTime = + repl::OpTime(_stateDoc.getBlockOpTime()->getTimestamp(), replCoord->getConfigTerm()); invariant(_stateDoc.getRecipientTagName()); auto recipientTagName = *_stateDoc.getRecipientTagName(); @@ -723,14 +719,12 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfi auto recipientSetName = _stateDoc.getRecipientSetName()->toString(); invariant(_stateDoc.getRecipientTagName()); auto recipientTagName = _stateDoc.getRecipientTagName()->toString(); - invariant(_stateDoc.getBlockOpTime()); - auto blockOpTime = *_stateDoc.getBlockOpTime(); auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext()); invariant(replCoord); return serverless::makeSplitConfig( - replCoord->getConfig(), recipientSetName, recipientTagName, blockOpTime); + replCoord->getConfig(), recipientSetName, recipientTagName); }(); LOGV2(6309100, @@ -759,7 +753,8 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfi ExecutorFuture<void> remoteAdminCommand(TaskExecutorPtr executor, const CancellationToken& token, const HostAndPort remoteNode, - const BSONObj& command) { + const BSONObj& command, + std::function<bool(Status)> untilCondition = nullptr) { return AsyncTry([executor, token, remoteNode, command] { executor::RemoteCommandRequest request(remoteNode, "admin", command, nullptr); auto hasWriteConcern = command.hasField(WriteConcernOptions::kWriteConcernField); @@ -774,7 +769,11 @@ ExecutorFuture<void> remoteAdminCommand(TaskExecutorPtr executor, return status; }); }) - .until([](Status status) { + .until([untilCondition](Status status) { + if (untilCondition) { + return untilCondition(status); + } + return status.isOK() || (!ErrorCodes::isRetriableError(status) && !ErrorCodes::isNetworkTimeoutError(status)); @@ -786,8 +785,11 @@ ExecutorFuture<void> remoteAdminCommand(TaskExecutorPtr executor, ExecutorFuture<void> sendStepUpToRecipient(TaskExecutorPtr executor, const CancellationToken& token, const HostAndPort recipientPrimary) { - return remoteAdminCommand( - executor, token, recipientPrimary, BSON("replSetStepUp" << 1 << "skipDryRun" << true)); + return remoteAdminCommand(executor, + token, + recipientPrimary, + BSON("replSetStepUp" << 1 << "skipDryRun" << true), + [](Status status) { return status.isOK(); }); } ExecutorFuture<void> waitForMajorityWriteOnRecipient(TaskExecutorPtr executor, diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index 474d6d68fa0..2ed287d01ce 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -524,27 +524,28 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) ASSERT_TRUE(serviceInstance->isGarbageCollectable()); } -TEST_F(ShardSplitDonorServiceTest, ReplSetStepUpFails) { - auto opCtx = makeOperationContext(); - test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); - test::shard_split::reconfigToAddRecipientNodes( - getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); - mockCommandReplies(&_recipientSet); - _skipAcceptanceFP.reset(); - - auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( - opCtx.get(), _service, defaultStateDocument().toBSON()); - ASSERT(serviceInstance.get()); - ASSERT_EQ(_uuid, serviceInstance->getId()); - - waitForMonitorAndProcessHello(); - waitForReplSetStepUp(Status(ErrorCodes::OperationFailed, "")); - // No need to wait for recipient majority no-op write, since the stepup failed. - - auto result = serviceInstance->decisionFuture().get(); - ASSERT(result.abortReason); - ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted); -} +// TODO(SERVER-69227): Re-enable this test when we are no longer retrying replSetStepUp. +// TEST_F(ShardSplitDonorServiceTest, ReplSetStepUpFails) { +// auto opCtx = makeOperationContext(); +// test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); +// test::shard_split::reconfigToAddRecipientNodes( +// getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); +// mockCommandReplies(&_recipientSet); +// _skipAcceptanceFP.reset(); + +// auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( +// opCtx.get(), _service, defaultStateDocument().toBSON()); +// ASSERT(serviceInstance.get()); +// ASSERT_EQ(_uuid, serviceInstance->getId()); + +// waitForMonitorAndProcessHello(); +// waitForReplSetStepUp(Status(ErrorCodes::OperationFailed, "")); +// // No need to wait for recipient majority no-op write, since the stepup failed. + +// auto result = serviceInstance->decisionFuture().get(); +// ASSERT(result.abortReason); +// ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted); +// } TEST_F(ShardSplitDonorServiceTest, ReplSetStepUpRetryable) { auto opCtx = makeOperationContext(); diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp index 35822ba118c..465e32826ce 100644 --- a/src/mongo/db/serverless/shard_split_utils.cpp +++ b/src/mongo/db/serverless/shard_split_utils.cpp @@ -81,8 +81,7 @@ ConnectionString makeRecipientConnectionString(const repl::ReplSetConfig& config repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config, const std::string& recipientSetName, - const std::string& recipientTagName, - const repl::OpTime& blockOpTime) { + const std::string& recipientTagName) { dassert(!recipientSetName.empty() && recipientSetName != config.getReplSetName()); uassert(6201800, "We can not make a split config of an existing split config.", @@ -132,11 +131,10 @@ repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config, configNoMembersBson.getField("settings").isABSONObj()) { BSONObj settings = configNoMembersBson.getField("settings").Obj(); return settings.removeField("replicaSetId") - .addFields( - BSON("replicaSetId" << OID::gen() << "shardSplitBlockOpTime" << blockOpTime)); + .addFields(BSON("replicaSetId" << OID::gen())); } - return BSON("shardSplitBlockOpTime" << blockOpTime); + return BSON("replicaSetId" << OID::gen()); }()); BSONObjBuilder splitConfigBob(configNoMembersBson); diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h index a8387072ff1..db24008a055 100644 --- a/src/mongo/db/serverless/shard_split_utils.h +++ b/src/mongo/db/serverless/shard_split_utils.h @@ -60,8 +60,7 @@ ConnectionString makeRecipientConnectionString(const repl::ReplSetConfig& config */ repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config, const std::string& recipientSetName, - const std::string& recipientTagName, - const repl::OpTime& blockOpTime); + const std::string& recipientTagName); /** * Inserts the shard split state document 'stateDoc' into diff --git a/src/mongo/db/serverless/shard_split_utils_test.cpp b/src/mongo/db/serverless/shard_split_utils_test.cpp index 206807f5967..0c85517c262 100644 --- a/src/mongo/db/serverless/shard_split_utils_test.cpp +++ b/src/mongo/db/serverless/shard_split_utils_test.cpp @@ -55,8 +55,8 @@ TEST(MakeSplitConfig, recipientConfigHasNewReplicaSetId) { << donorReplSetId))); const std::string recipientConfigSetName{"newSet"}; - const ReplSetConfig splitConfigResult = serverless::makeSplitConfig( - configA, recipientConfigSetName, recipientTagName, repl::OpTime(Timestamp(100, 0), 1)); + const ReplSetConfig splitConfigResult = + serverless::makeSplitConfig(configA, recipientConfigSetName, recipientTagName); ASSERT_EQ(splitConfigResult.getReplicaSetId(), donorReplSetId); ASSERT_NE(splitConfigResult.getReplicaSetId(), @@ -90,8 +90,8 @@ TEST(MakeSplitConfig, toBSONRoundTripAbility) { ASSERT_TRUE(configA == configB); const std::string recipientConfigSetName{"newSet"}; - const ReplSetConfig splitConfigResult = serverless::makeSplitConfig( - configA, recipientConfigSetName, recipientTagName, repl::OpTime(Timestamp(100, 0), 1)); + const ReplSetConfig splitConfigResult = + serverless::makeSplitConfig(configA, recipientConfigSetName, recipientTagName); // here we will test that the result from the method `makeSplitConfig` matches the hardcoded // resultSplitConfigBSON. We will also check that the recipient from the splitConfig matches @@ -155,8 +155,8 @@ TEST(MakeSplitConfig, ValidateSplitConfigIntegrityTest) { << BSON("electionTimeoutMillis" << 1000 << "replicaSetId" << OID::gen()))); - const ReplSetConfig splitConfig = serverless::makeSplitConfig( - config, recipientConfigSetName, recipientTagName, repl::OpTime(Timestamp(100, 0), 1)); + const ReplSetConfig splitConfig = + serverless::makeSplitConfig(config, recipientConfigSetName, recipientTagName); ASSERT_OK(splitConfig.validate()); ASSERT_EQ(splitConfig.getReplSetName(), donorConfigSetName); ASSERT_TRUE(splitConfig.toBSON().hasField("members")); @@ -177,12 +177,10 @@ TEST(MakeSplitConfig, ValidateSplitConfigIntegrityTest) { ASSERT_TRUE(recipientConfigPtr->getRecipientConfig() == nullptr); ASSERT_EQ(recipientConfigPtr->getReplSetName(), recipientConfigSetName); - ASSERT_THROWS_CODE(serverless::makeSplitConfig(splitConfig, - recipientConfigSetName, - recipientTagName, - repl::OpTime(Timestamp(100, 0), 1)), - AssertionException, - 6201800 /*calling on a splitconfig*/); + ASSERT_THROWS_CODE( + serverless::makeSplitConfig(splitConfig, recipientConfigSetName, recipientTagName), + AssertionException, + 6201800 /*calling on a splitconfig*/); } TEST(MakeSplitConfig, SplitConfigAssertionsTest) { @@ -197,8 +195,7 @@ TEST(MakeSplitConfig, SplitConfigAssertionsTest) { ASSERT_THROWS_CODE(serverless::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON), recipientConfigSetName, - recipientTagName, - repl::OpTime(Timestamp(100, 0), 1)), + recipientTagName), AssertionException, 6201801 /*no recipient members created*/); @@ -213,8 +210,7 @@ TEST(MakeSplitConfig, SplitConfigAssertionsTest) { ASSERT_THROWS_CODE(serverless::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON), recipientConfigSetName, - recipientTagName, - repl::OpTime(Timestamp(100, 0), 1)), + recipientTagName), AssertionException, 6201802 /*no donor members created*/); } |