author    | Brett Nawrocki <brett.nawrocki@mongodb.com> | 2023-03-27 19:40:57 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-09 17:33:07 +0000
commit    | 627e0bf27cdfde42e7f4a4856eff5de474917abf (patch)
tree      | 06fd661626c0296d079f197e842b99380adb4bb3
parent    | 0239589385ef353ca1ee2a10036cd8c5c52d8599 (diff)
download  | mongo-627e0bf27cdfde42e7f4a4856eff5de474917abf.tar.gz
SERVER-73091 Integrate MovePrimaryDonor with MovePrimaryCoordinator
9 files changed, 365 insertions, 65 deletions
diff --git a/jstests/concurrency/fsm_workloads/move_primary_with_crud.js b/jstests/concurrency/fsm_workloads/move_primary_with_crud.js
index cabe78836f4..a9f50d57deb 100644
--- a/jstests/concurrency/fsm_workloads/move_primary_with_crud.js
+++ b/jstests/concurrency/fsm_workloads/move_primary_with_crud.js
@@ -167,7 +167,9 @@ const $config = (function() {
                 // Due to a stepdown of the donor during the cloning phase, the movePrimary
                 // operation failed. It is not automatically recovered, but any orphaned data on
                 // the recipient has been deleted.
-                7120202
+                7120202,
+                // Same as the above, but due to a stepdown of the recipient.
+                ErrorCodes.MovePrimaryAborted
             ]);
         },
         checkDatabaseMetadataConsistency: function(db, collName, connCache) {
diff --git a/jstests/concurrency/fsm_workloads/random_DDL_operations.js b/jstests/concurrency/fsm_workloads/random_DDL_operations.js
index f2b103905ef..6749900610b 100644
--- a/jstests/concurrency/fsm_workloads/random_DDL_operations.js
+++ b/jstests/concurrency/fsm_workloads/random_DDL_operations.js
@@ -77,7 +77,10 @@ var $config = (function() {
             db.adminCommand({movePrimary: db.getName(), to: shardId}), [
                 ErrorCodes.ConflictingOperationInProgress,
                 // The cloning phase has failed (e.g. as a result of a stepdown). When a failure
-                // occurs at this phase, the movePrimary operation does not recover.
+                // occurs at this phase, the movePrimary operation does not recover. Either of
+                // the following error codes could be seen depending on if the failover was on
+                // the donor or recipient node.
+                ErrorCodes.MovePrimaryAborted,
                 7120202
             ]);
         },
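Both workload changes above encode the same contract: after a failover during the cloning phase, movePrimary surfaces 7120202 for a donor stepdown and ErrorCodes.MovePrimaryAborted for a recipient stepdown, and neither case is retried automatically. A minimal sketch of how a caller can tolerate both, assuming a jstests shell environment (assert.commandWorkedOrFailedWithCode comes from the shell's assert library; the helper name here is hypothetical):

```javascript
// Hypothetical helper: issue movePrimary and tolerate a failover-induced abort.
function tryMovePrimary(db, toShard) {
    const res = db.adminCommand({movePrimary: db.getName(), to: toShard});
    assert.commandWorkedOrFailedWithCode(res, [
        7120202,                        // donor stepdown during cloning
        ErrorCodes.MovePrimaryAborted,  // recipient stepdown during cloning
    ]);
    return res.ok === 1;  // false: the caller may retry once the set has a primary
}
```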
diff --git a/jstests/sharding/move_primary_basic.js b/jstests/sharding/move_primary_basic.js
index 5dc2216b4dd..faff3a48703 100644
--- a/jstests/sharding/move_primary_basic.js
+++ b/jstests/sharding/move_primary_basic.js
@@ -3,6 +3,10 @@
 
 load('jstests/libs/feature_flag_util.js');
 
+function collectionExists(shard, dbName, collName) {
+    return Array.contains(shard.getDB(dbName).getCollectionNames(), collName);
+}
+
 var st = new ShardingTest({mongos: 1, shards: 2});
 
 var mongos = st.s0;
@@ -120,19 +124,36 @@ if (FeatureFlagUtil.isPresentAndEnabled(config.admin, 'ResilientMovePrimary')) {
     assert.commandFailedWithCode(mongos.adminCommand({movePrimary: dbName, to: shard1.shardName}),
                                  ErrorCodes.NamespaceExists);
 
-    // The documents are still on both the shards.
-    assert.eq(2, shard0.getCollection(coll1NS).find().itcount());
-    assert.eq(1, shard1.getCollection(coll1NS).find().itcount());
+    const expectDropOnFailure =
+        FeatureFlagUtil.isPresentAndEnabled(config.admin, 'OnlineMovePrimaryLifecycle');
 
-    // Remove the orphaned document on shard1 leaving an empty collection.
-    assert.commandWorked(shard1.getCollection(coll1NS).remove({name: 'Emma'}));
-    assert.eq(0, shard1.getCollection(coll1NS).find().itcount());
+    if (expectDropOnFailure) {
+        // The orphaned collection on shard1 should have been dropped due to the previous failure.
+        assert.eq(2, shard0.getCollection(coll1NS).find().itcount());
+        assert(!collectionExists(shard1, dbName, coll1Name));
+
+        // Create another empty collection.
+        shard1.getDB(dbName).createCollection(coll1Name);
+    } else {
+        // The documents are on both the shards.
+        assert.eq(2, shard0.getCollection(coll1NS).find().itcount());
+        assert.eq(1, shard1.getCollection(coll1NS).find().itcount());
+
+        // Remove the orphaned document on shard1 leaving an empty collection.
+        assert.commandWorked(shard1.getCollection(coll1NS).remove({name: 'Emma'}));
+        assert.eq(0, shard1.getCollection(coll1NS).find().itcount());
+    }
 
     assert.commandFailedWithCode(mongos.adminCommand({movePrimary: dbName, to: shard1.shardName}),
                                  ErrorCodes.NamespaceExists);
 
-    // Drop the orphaned collection on shard1.
-    shard1.getCollection(coll1NS).drop();
+    if (expectDropOnFailure) {
+        // The orphaned collection on shard1 should have been dropped due to the previous failure.
+        assert(!collectionExists(shard1, dbName, coll1Name));
+    } else {
+        // Drop the orphaned collection on shard1.
+        shard1.getCollection(coll1NS).drop();
+    }
 }
 
 jsTest.log('Test that metadata has changed');
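move_primary_basic.js now branches on whether the online lifecycle is active: with featureFlagOnlineMovePrimaryLifecycle enabled, a failed movePrimary drops the orphaned collection on the recipient instead of leaving it behind for the test to clean up. The same gate can be reused elsewhere; a sketch, assuming st, dbName, and coll1Name are set up as in the test above and that the flag is checked against a config server connection:

```javascript
load('jstests/libs/feature_flag_util.js');

// Sketch: assert the post-failure state expected for each lifecycle mode.
const onlineLifecycle =
    FeatureFlagUtil.isPresentAndEnabled(st.configRS.getPrimary(), 'OnlineMovePrimaryLifecycle');
if (onlineLifecycle) {
    // Online lifecycle: the failed attempt cleans up after itself.
    assert(!collectionExists(st.shard1, dbName, coll1Name));
} else {
    // Legacy lifecycle: the orphaned collection must be dropped manually.
    st.shard1.getCollection(dbName + '.' + coll1Name).drop();
}
```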
diff --git a/jstests/sharding/move_primary_failover_before_persist_block_timestamp.js b/jstests/sharding/move_primary_failover_before_persist_block_timestamp.js
new file mode 100644
index 00000000000..b302f5179a5
--- /dev/null
+++ b/jstests/sharding/move_primary_failover_before_persist_block_timestamp.js
@@ -0,0 +1,44 @@
+/**
+ * Test that movePrimary sets valid block timestamp if a failover occurs before persisting it.
+ *
+ * @tags: [
+ *   requires_fcv_70,
+ *   featureFlagOnlineMovePrimaryLifecycle
+ * ]
+ */
+(function() {
+'use strict';
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+const st = new ShardingTest({mongos: 1, shards: 2, rs: {nodes: 3}});
+
+const mongos = st.s0;
+const shard0 = st.shard0;
+const oldDonorPrimary = st.rs0.getPrimary();
+const shard1 = st.shard1;
+const config = st.config;
+
+const dbName = 'test_db';
+const collName = 'test_coll';
+const collNS = dbName + '.' + collName;
+
+assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: shard0.shardName}));
+assert.commandWorked(mongos.getCollection(collNS).insert({value: 1}));
+assert.commandWorked(mongos.getCollection(collNS).insert({value: 2}));
+
+const fp = configureFailPoint(oldDonorPrimary, "pauseBeforeMovePrimaryDonorPersistsBlockTimestamp");
+
+const joinMovePrimary = startParallelShell(
+    funWithArgs(function(dbName, toShard) {
+        assert.commandWorked(db.adminCommand({movePrimary: dbName, to: toShard}));
+    }, dbName, shard1.shardName), mongos.port);
+
+fp.wait();
+st.rs0.getPrimary().adminCommand({replSetStepDown: ReplSetTest.kForeverSecs, force: 1});
+fp.off();
+st.rs0.awaitNodesAgreeOnPrimary();
+joinMovePrimary();
+
+st.stop();
+})();
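The new test follows a standard jstests recipe: park the operation on a failpoint from a parallel shell, force a stepdown, and verify the operation completes after recovery. Reduced to its skeleton (rst, runOp, shellPort, and the failpoint name are placeholders for whatever fixture and operation a test uses):

```javascript
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallel_shell_helpers.js");

const fp = configureFailPoint(rst.getPrimary(), "someServerFailPoint");  // placeholder name
const join = startParallelShell(runOp, shellPort);

fp.wait();  // the operation is now parked at the failpoint
// Force a failover while the operation is paused; the connection may drop.
rst.getPrimary().adminCommand({replSetStepDown: ReplSetTest.kForeverSecs, force: true});
fp.off();
rst.awaitNodesAgreeOnPrimary();
join();  // the operation must recover and succeed under the new primary
```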
diff --git a/src/mongo/db/s/move_primary/move_primary_donor_service.cpp b/src/mongo/db/s/move_primary/move_primary_donor_service.cpp
index 73e008beb8b..ae82cda7b07 100644
--- a/src/mongo/db/s/move_primary/move_primary_donor_service.cpp
+++ b/src/mongo/db/s/move_primary/move_primary_donor_service.cpp
@@ -32,6 +32,7 @@
 #include "mongo/db/persistent_task_store.h"
 #include "mongo/db/s/move_primary/move_primary_recipient_cmds_gen.h"
 #include "mongo/db/s/move_primary/move_primary_server_parameters_gen.h"
+#include "mongo/db/s/sharding_state.h"
 #include "mongo/s/grid.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kMovePrimary
@@ -45,6 +46,7 @@
 MONGO_FAIL_POINT_DEFINE(pauseDuringMovePrimaryDonorStateEnumTransition);
 MONGO_FAIL_POINT_DEFINE(pauseDuringMovePrimaryDonorStateEnumTransitionAlternate);
 MONGO_FAIL_POINT_DEFINE(pauseBeforeBeginningMovePrimaryDonorWorkflow);
+MONGO_FAIL_POINT_DEFINE(pauseBeforeMovePrimaryDonorPersistsBlockTimestamp);
 MONGO_FAIL_POINT_DEFINE(pauseBeforeBeginningMovePrimaryDonorCleanup);
 
 enum StateTransitionProgress {
@@ -162,6 +164,16 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> MovePrimaryDonorService::con
         _makeDependencies(initialDoc));
 }
 
+std::vector<std::shared_ptr<MovePrimaryDonor>> MovePrimaryDonorService::getAllDonorInstances(
+    OperationContext* opCtx) {
+    std::vector<std::shared_ptr<MovePrimaryDonor>> result;
+    auto instances = getAllInstances(opCtx);
+    for (const auto& instance : instances) {
+        result.push_back(checked_pointer_cast<MovePrimaryDonor>(instance));
+    }
+    return result;
+}
+
 MovePrimaryDonorDependencies MovePrimaryDonorService::_makeDependencies(
     const MovePrimaryDonorDocument& initialDoc) {
     return {std::make_unique<MovePrimaryDonorExternalStateImpl>(initialDoc.getMetadata())};
@@ -312,6 +324,55 @@ StatusWith<Shard::CommandResponse> MovePrimaryDonorExternalStateImpl::runCommand
     return shard->runCommand(opCtx, readPref, dbName, cmdObj, retryPolicy);
 }
 
+std::shared_ptr<MovePrimaryDonor> MovePrimaryDonor::get(OperationContext* opCtx,
+                                                        const DatabaseName& dbName,
+                                                        const ShardId& toShard) {
+    auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
+    auto service = checked_cast<MovePrimaryDonorService*>(
+        registry->lookupServiceByName(MovePrimaryDonorService::kServiceName));
+    auto instances = service->getAllDonorInstances(opCtx);
+    for (const auto& instance : instances) {
+        if (_matchesArguments(instance, dbName, toShard)) {
+            return instance;
+        }
+    }
+    return nullptr;
+}
+
+std::shared_ptr<MovePrimaryDonor> MovePrimaryDonor::create(OperationContext* opCtx,
+                                                           const DatabaseName& dbName,
+                                                           const ShardId& toShard) {
+    auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
+    auto service = registry->lookupServiceByName(MovePrimaryDonorService::kServiceName);
+    MovePrimaryCommonMetadata metadata;
+    metadata.setMigrationId(UUID::gen());
+    metadata.setDatabaseName(NamespaceString{dbName.toString()});
+    metadata.setFromShardName(ShardingState::get(opCtx)->shardId());
+    metadata.setToShardName(toShard.toString());
+    MovePrimaryDonorDocument document;
+    document.setId(metadata.getMigrationId());
+    document.setMetadata(std::move(metadata));
+    auto donor = MovePrimaryDonor::getOrCreate(opCtx, service, document.toBSON());
+    uassert(7309100,
+            "Unable to create MovePrimaryDonor using the following initial state: {}"_format(
+                redact(document.toBSON()).toString()),
+            donor);
+    return donor;
+}
+
+bool MovePrimaryDonor::_matchesArguments(const std::shared_ptr<MovePrimaryDonor>& instance,
+                                         const DatabaseName& dbName,
+                                         const ShardId& toShard) {
+    const auto& metadata = instance->getMetadata();
+    if (dbName != metadata.getDatabaseName().db()) {
+        return false;
+    }
+    if (toShard.toString() != metadata.getToShardName()) {
+        return false;
+    }
+    return true;
+}
+
 MovePrimaryDonor::MovePrimaryDonor(ServiceContext* serviceContext,
                                    MovePrimaryDonorService* donorService,
                                    MovePrimaryDonorDocument initialState,
@@ -442,9 +503,10 @@ ExecutorFuture<void> MovePrimaryDonor::_runDonorWorkflow() {
         .then([this] { return _waitForForgetThenDoCleanup(); })
         .thenRunOn(_cleanupExecutor)
         .onCompletion([this, self = shared_from_this()](Status okOrStepdownError) {
-            invariant(okOrStepdownError.isOK() || _cancelState->isSteppingDown());
-            const auto& finalResult =
-                _cancelState->isSteppingDown() ? okOrStepdownError : _getOperationStatus();
+            bool steppingDown =
+                _cancelState->isSteppingDown() || (**_taskExecutor)->isShuttingDown();
+            invariant(okOrStepdownError.isOK() || steppingDown);
+            const auto& finalResult = steppingDown ? okOrStepdownError : _getOperationStatus();
             _ensureProgressPromisesAreFulfilled(finalResult);
             return finalResult;
         });
@@ -553,6 +615,7 @@ ExecutorFuture<void> MovePrimaryDonor::_doWaitingToBlockWrites() {
     return _waitUntilReadyToBlockWrites()
         .then([this] { return _waitUntilCurrentlyBlockingWrites(); })
         .then([this](Timestamp blockingWritesTimestamp) {
+            pauseBeforeMovePrimaryDonorPersistsBlockTimestamp.pauseWhileSet();
             return _persistBlockingWritesTimestamp(blockingWritesTimestamp);
         });
 }
diff --git a/src/mongo/db/s/move_primary/move_primary_donor_service.h b/src/mongo/db/s/move_primary/move_primary_donor_service.h
index 922f9904c3f..3236334e7b1 100644
--- a/src/mongo/db/s/move_primary/move_primary_donor_service.h
+++ b/src/mongo/db/s/move_primary/move_primary_donor_service.h
@@ -38,6 +38,7 @@
 
 namespace mongo {
 
+class MovePrimaryDonor;
 struct MovePrimaryDonorDependencies;
 
 class MovePrimaryDonorService : public repl::PrimaryOnlyService {
@@ -59,6 +60,8 @@ public:
     std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override;
 
+    std::vector<std::shared_ptr<MovePrimaryDonor>> getAllDonorInstances(OperationContext* opCtx);
+
 protected:
     virtual MovePrimaryDonorDependencies _makeDependencies(
         const MovePrimaryDonorDocument& initialDoc);
 
@@ -219,6 +222,13 @@ struct MovePrimaryDonorDependencies {
 
 class MovePrimaryDonor : public repl::PrimaryOnlyService::TypedInstance<MovePrimaryDonor> {
 public:
+    static std::shared_ptr<MovePrimaryDonor> get(OperationContext* opCtx,
+                                                 const DatabaseName& dbName,
+                                                 const ShardId& toShard);
+    static std::shared_ptr<MovePrimaryDonor> create(OperationContext* opCtx,
+                                                    const DatabaseName& dbName,
+                                                    const ShardId& toShard);
+
     MovePrimaryDonor(ServiceContext* serviceContext,
                      MovePrimaryDonorService* donorService,
                      MovePrimaryDonorDocument initialState,
@@ -249,6 +259,10 @@ public:
     SharedSemiFuture<void> getCompletionFuture() const;
 
 private:
+    static bool _matchesArguments(const std::shared_ptr<MovePrimaryDonor>& instance,
+                                  const DatabaseName& dbName,
+                                  const ShardId& toShard);
+
     MovePrimaryDonorStateEnum _getCurrentState() const;
     MovePrimaryDonorMutableFields _getMutableFields() const;
     bool _isAborted(WithLock) const;
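The donor-side failpoint added above is what the new jstest attaches to. For ad-hoc debugging it can also be toggled with raw configureFailPoint admin commands against the donor's primary; a sketch (conn is a placeholder connection):

```javascript
// Pause the donor just before it persists the blocking-writes timestamp.
assert.commandWorked(conn.adminCommand({
    configureFailPoint: "pauseBeforeMovePrimaryDonorPersistsBlockTimestamp",
    mode: "alwaysOn",
}));

// ... induce a failover, inspect donor state, etc. ...

assert.commandWorked(conn.adminCommand({
    configureFailPoint: "pauseBeforeMovePrimaryDonorPersistsBlockTimestamp",
    mode: "off",
}));
```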
diff --git a/src/mongo/db/s/move_primary/move_primary_recipient_service.cpp b/src/mongo/db/s/move_primary/move_primary_recipient_service.cpp
index bc92722024a..5a10a1bdc4c 100644
--- a/src/mongo/db/s/move_primary/move_primary_recipient_service.cpp
+++ b/src/mongo/db/s/move_primary/move_primary_recipient_service.cpp
@@ -189,7 +189,8 @@ SemiFuture<void> MovePrimaryRecipientService::MovePrimaryRecipient::run(
 
     // We would like to abort in all cases where there is a failover and we have not yet reached
     // kPrepared state to maintain correctness of movePrimary operation across upgrades/downgrades
-    // in binary versions with feature parity in online movePrimary implementation.
+    // in binary versions with feature parity in online movePrimary implementation. The offline
+    // cloner is not resumable after failovers.
     auto shouldAbort = [&] {
         if (!_useOnlineCloner()) {
             stdx::lock_guard<Latch> lg(_mutex);
@@ -317,6 +318,7 @@ MovePrimaryRecipientService::MovePrimaryRecipient::_transitionToCloningStateAndC
                     MovePrimaryRecipientDocument::kStateFieldName,
                     MovePrimaryRecipientState_serializer(MovePrimaryRecipientStateEnum::kCloning));
                 _transitionStateMachine(MovePrimaryRecipientStateEnum::kCloning);
+                // TODO SERVER-75872: Refactor this logic after integrating online cloner.
                 _cloneDataFromDonor(opCtx.get());
             })
             .onTransientError([](const Status& status) {
@@ -554,7 +556,8 @@ MovePrimaryRecipientService::MovePrimaryRecipient::_transitionToDoneStateAndFini
                 movePrimaryRecipientPauseBeforeDeletingStateDoc.pauseWhileSetAndNotCanceled(
                     opCtx.get(), _ctHolder->getStepdownToken());
                 _removeRecipientDocument(opCtx.get());
-            });
+            })
+            .then([this, executor] { return _waitForMajority(executor); });
         })
         .onTransientError([](const Status& status) {})
         .onUnrecoverableError([](const Status& status) {
@@ -805,6 +808,9 @@ MovePrimaryRecipientService::MovePrimaryRecipient::_transitionToInitializingStat
 
 ExecutorFuture<void> MovePrimaryRecipientService::MovePrimaryRecipient::_initializeForCloningState(
     const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+    if (_checkInvalidStateTransition(MovePrimaryRecipientStateEnum::kCloning)) {
+        return ExecutorFuture(**executor);
+    }
     return _retryingCancelableOpCtxFactory
         ->withAutomaticRetry([this, executor](const auto& factory) {
             auto opCtx = factory.makeOperationContext(Client::getCurrent());
diff --git a/src/mongo/db/s/move_primary_coordinator.cpp b/src/mongo/db/s/move_primary_coordinator.cpp
index 1acdefda3b3..929e34809da 100644
--- a/src/mongo/db/s/move_primary_coordinator.cpp
+++ b/src/mongo/db/s/move_primary_coordinator.cpp
@@ -44,12 +44,22 @@
 #include "mongo/db/vector_clock_mutable.h"
 #include "mongo/db/write_block_bypass.h"
 #include "mongo/s/grid.h"
+#include "mongo/s/move_primary/move_primary_feature_flag_gen.h"
 #include "mongo/s/request_types/move_primary_gen.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
 
 namespace mongo {
 
+namespace {
+
+bool useOnlineCloner() {
+    return move_primary::gFeatureFlagOnlineMovePrimaryLifecycle.isEnabled(
+        serverGlobalParams.featureCompatibility);
+}
+
+}  // namespace
+
 MONGO_FAIL_POINT_DEFINE(hangBeforeCloningData);
 
 MovePrimaryCoordinator::MovePrimaryCoordinator(ShardingDDLCoordinatorService* service,
@@ -139,6 +149,10 @@ ExecutorFuture<void> MovePrimaryCoordinator::_runImpl(
                     "Requested primary shard {} is draining"_format(toShardId.toString()),
                     !toShardEntry.getDraining());
 
+            if (useOnlineCloner() && !_firstExecution) {
+                recoverOnlineCloner(opCtx);
+            }
+
             return runMovePrimaryWorkflow(executor, token);
         });
 }
@@ -149,7 +163,7 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
     return ExecutorFuture<void>(**executor)
         .then(_buildPhaseHandler(
             Phase::kClone,
-            [this, anchor = shared_from_this()] {
+            [this, token, anchor = shared_from_this()] {
                 const auto opCtxHolder = cc().makeOperationContext();
                 auto* opCtx = opCtxHolder.get();
                 getForwardableOpMetadata().setOn(opCtx);
@@ -188,26 +202,30 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
                     hangBeforeCloningData.pauseWhileSet(opCtx);
                 }
 
-                const auto& collectionsToClone = getUnshardedCollections(opCtx);
-                assertNoOrphanedDataOnRecipient(opCtx, collectionsToClone);
-
-                _doc.setCollectionsToClone(collectionsToClone);
-                _updateStateDocument(opCtx, StateDoc(_doc));
-
-                const auto& clonedCollections = cloneDataToRecipient(opCtx);
-                assertClonedData(clonedCollections);
+                if (useOnlineCloner()) {
+                    if (!_onlineCloner) {
+                        createOnlineCloner(opCtx);
+                    }
+                    cloneDataUntilReadyForCatchup(opCtx, token);
+                } else {
+                    cloneDataLegacy(opCtx);
+                }
 
                 // TODO (SERVER-71566): Temporary solution to cover the case of stepping down before
                 // actually entering the `kCatchup` phase.
                 blockWrites(opCtx);
             }))
         .then(_buildPhaseHandler(Phase::kCatchup,
-                                 [this, anchor = shared_from_this()] {
+                                 [this, token, anchor = shared_from_this()] {
                                      const auto opCtxHolder = cc().makeOperationContext();
                                      auto* opCtx = opCtxHolder.get();
                                      getForwardableOpMetadata().setOn(opCtx);
 
                                      blockWrites(opCtx);
+                                     if (useOnlineCloner()) {
+                                         informOnlineClonerOfBlockingWrites(opCtx);
+                                         waitUntilOnlineClonerPrepared(token);
+                                     }
                                  }))
         .then(_buildPhaseHandler(Phase::kEnterCriticalSection,
                                  [this, executor, anchor = shared_from_this()] {
@@ -226,7 +244,9 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
                                      }
 
                                      blockReads(opCtx);
-                                     enterCriticalSectionOnRecipient(opCtx);
+                                     if (!useOnlineCloner()) {
+                                         enterCriticalSectionOnRecipient(opCtx);
+                                     }
                                  }))
         .then(_buildPhaseHandler(
             Phase::kCommit,
@@ -261,7 +281,7 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
                 dropStaleDataOnDonor(opCtx);
             }))
         .then(_buildPhaseHandler(Phase::kExitCriticalSection,
-                                 [this, executor, anchor = shared_from_this()] {
+                                 [this, executor, token, anchor = shared_from_this()] {
                                      const auto opCtxHolder = cc().makeOperationContext();
                                      auto* opCtx = opCtxHolder.get();
                                      getForwardableOpMetadata().setOn(opCtx);
@@ -277,7 +297,11 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
                                      }
 
                                      unblockReadsAndWrites(opCtx);
-                                     exitCriticalSectionOnRecipient(opCtx);
+                                     if (useOnlineCloner()) {
+                                         cleanupOnlineCloner(opCtx, token);
+                                     } else {
+                                         exitCriticalSectionOnRecipient(opCtx);
+                                     }
 
                                      LOGV2(7120206,
                                            "Completed movePrimary operation",
@@ -292,7 +316,8 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
             getForwardableOpMetadata().setOn(opCtx);
 
             const auto& failedPhase = _doc.getPhase();
-            if (failedPhase == Phase::kClone || status == ErrorCodes::ShardNotFound) {
+            if (_onlineCloner || failedPhase == Phase::kClone ||
+                status == ErrorCodes::ShardNotFound) {
                 LOGV2_DEBUG(7392900,
                             1,
                             "Triggering movePrimary cleanup",
@@ -306,12 +331,70 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
         });
 }
 
+bool MovePrimaryCoordinator::onlineClonerPossiblyNeverCreated() const {
+    // Either the first run of this service, or failed over before online cloner persisted its
+    // state document.
+    auto phase = _doc.getPhase();
+    return phase <= Phase::kClone;
+}
+
+bool MovePrimaryCoordinator::onlineClonerPossiblyCleanedUp() const {
+    // Could have failed over between the online cloner deleting its state document and the
+    // coordinator deleting its state document.
+    auto phase = _doc.getPhase();
+    return phase == Phase::kExitCriticalSection || getAbortReason();
+}
+
+bool MovePrimaryCoordinator::onlineClonerAllowedToBeMissing() const {
+    return onlineClonerPossiblyNeverCreated() || onlineClonerPossiblyCleanedUp();
+}
+
+void MovePrimaryCoordinator::recoverOnlineCloner(OperationContext* opCtx) {
+    _onlineCloner = MovePrimaryDonor::get(opCtx, _dbName, _doc.getToShardId());
+    if (_onlineCloner) {
+        return;
+    }
+    invariant(onlineClonerAllowedToBeMissing());
+}
+
+void MovePrimaryCoordinator::createOnlineCloner(OperationContext* opCtx) {
+    invariant(onlineClonerPossiblyNeverCreated());
+    _onlineCloner = MovePrimaryDonor::create(opCtx, _dbName, _doc.getToShardId());
+}
+
+void MovePrimaryCoordinator::cloneDataUntilReadyForCatchup(OperationContext* opCtx,
+                                                           const CancellationToken& token) {
+    future_util::withCancellation(_onlineCloner->getReadyToBlockWritesFuture(), token).get();
+}
+
+void MovePrimaryCoordinator::cloneDataLegacy(OperationContext* opCtx) {
+    const auto& collectionsToClone = getUnshardedCollections(opCtx);
+    assertNoOrphanedDataOnRecipient(opCtx, collectionsToClone);
+
+    _doc.setCollectionsToClone(collectionsToClone);
+    _updateStateDocument(opCtx, StateDoc(_doc));
+
+    const auto& clonedCollections = cloneDataToRecipient(opCtx);
+    assertClonedData(clonedCollections);
+}
+
+void MovePrimaryCoordinator::informOnlineClonerOfBlockingWrites(OperationContext* opCtx) {
+    auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
+    replClient.setLastOpToSystemLastOpTime(opCtx);
+    const auto latestOpTime = replClient.getLastOp();
+    _onlineCloner->onBeganBlockingWrites(latestOpTime.getTimestamp());
+}
+
+void MovePrimaryCoordinator::waitUntilOnlineClonerPrepared(const CancellationToken& token) {
+    future_util::withCancellation(_onlineCloner->getDecisionFuture(), token).get();
+}
+
 ExecutorFuture<void> MovePrimaryCoordinator::_cleanupOnAbort(
     std::shared_ptr<executor::ScopedTaskExecutor> executor,
     const CancellationToken& token,
     const Status& status) noexcept {
     return ExecutorFuture<void>(**executor)
-        .then([this, executor, status, anchor = shared_from_this()] {
+        .then([this, executor, token, status, anchor = shared_from_this()] {
             const auto opCtxHolder = cc().makeOperationContext();
             auto* opCtx = opCtxHolder.get();
             getForwardableOpMetadata().setOn(opCtx);
@@ -320,48 +403,77 @@ ExecutorFuture<void> MovePrimaryCoordinator::_cleanupOnAbort(
             _performNoopRetryableWriteOnAllShardsAndConfigsvr(
                 opCtx, getCurrentSession(), **executor);
 
-            const auto& failedPhase = _doc.getPhase();
-            const auto& toShardId = _doc.getToShardId();
-
-            if (failedPhase <= Phase::kCommit) {
-                // A non-retryable error occurred before the new primary shard was actually
-                // committed, so any cloned data on the recipient must be dropped.
-
-                try {
-                    // Even if the error is `ShardNotFound`, the recipient may still be in draining
-                    // mode, so try to drop any orphaned data anyway.
-                    dropOrphanedDataOnRecipient(opCtx, executor);
-                } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
-                    LOGV2_INFO(7392901,
-                               "Failed to remove orphaned data on recipient as it has been removed",
-                               logAttrs(_dbName),
-                               "to"_attr = toShardId);
-                }
-            }
-
-            unblockReadsAndWrites(opCtx);
-            try {
-                // Even if the error is `ShardNotFound`, the recipient may still be in draining
-                // mode, so try to exit the critical section anyway.
-                exitCriticalSectionOnRecipient(opCtx);
-            } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
-                LOGV2_INFO(7392902,
-                           "Failed to exit critical section on recipient as it has been removed",
-                           logAttrs(_dbName),
-                           "to"_attr = toShardId);
+            if (useOnlineCloner()) {
+                cleanupOnAbortWithOnlineCloner(opCtx, token, status);
+            } else {
+                cleanupOnAbortWithoutOnlineCloner(opCtx, executor);
             }
 
             LOGV2_ERROR(7392903,
                         "Failed movePrimary operation",
                         logAttrs(_dbName),
-                        "to"_attr = toShardId,
-                        "phase"_attr = serializePhase(failedPhase),
+                        "to"_attr = _doc.getToShardId(),
+                        "phase"_attr = serializePhase(_doc.getPhase()),
                         "error"_attr = redact(status));
 
             logChange(opCtx, "error", status);
         });
 }
 
+void MovePrimaryCoordinator::cleanupOnAbortWithoutOnlineCloner(
+    OperationContext* opCtx, std::shared_ptr<executor::ScopedTaskExecutor> executor) {
+    const auto& failedPhase = _doc.getPhase();
+    const auto& toShardId = _doc.getToShardId();
+
+    if (failedPhase <= Phase::kCommit) {
+        // A non-retryable error occurred before the new primary shard was actually
+        // committed, so any cloned data on the recipient must be dropped.
+
+        try {
+            // Even if the error is `ShardNotFound`, the recipient may still be in draining
+            // mode, so try to drop any orphaned data anyway.
+            dropOrphanedDataOnRecipient(opCtx, executor);
+        } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
+            LOGV2_INFO(7392901,
+                       "Failed to remove orphaned data on recipient as it has been removed",
+                       logAttrs(_dbName),
+                       "to"_attr = toShardId);
+        }
+    }
+
+    unblockReadsAndWrites(opCtx);
+    try {
+        // Even if the error is `ShardNotFound`, the recipient may still be in draining
+        // mode, so try to exit the critical section anyway.
+        exitCriticalSectionOnRecipient(opCtx);
+    } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
+        LOGV2_INFO(7392902,
+                   "Failed to exit critical section on recipient as it has been removed",
+                   logAttrs(_dbName),
+                   "to"_attr = toShardId);
+    }
+}
+
+void MovePrimaryCoordinator::cleanupOnlineCloner(OperationContext* opCtx,
+                                                 const CancellationToken& token) {
+    if (!_onlineCloner) {
+        return;
+    }
+    _onlineCloner->onReadyToForget();
+    future_util::withCancellation(_onlineCloner->getCompletionFuture(), token).wait();
+}
+
+void MovePrimaryCoordinator::cleanupOnAbortWithOnlineCloner(OperationContext* opCtx,
+                                                            const CancellationToken& token,
+                                                            const Status& status) {
+    unblockReadsAndWrites(opCtx);
+    if (!_onlineCloner) {
+        return;
+    }
+    _onlineCloner->abort(status);
+    cleanupOnlineCloner(opCtx, token);
+}
+
 void MovePrimaryCoordinator::logChange(OperationContext* opCtx,
                                        const std::string& what,
                                        const Status& status) const {
@@ -376,7 +488,7 @@ void MovePrimaryCoordinator::logChange(OperationContext* opCtx,
 }
 
 std::vector<NamespaceString> MovePrimaryCoordinator::getUnshardedCollections(
-    OperationContext* opCtx) {
+    OperationContext* opCtx) const {
     const auto allCollections = [&] {
         DBDirectClient dbClient(opCtx);
         const auto collInfos =
@@ -579,8 +691,14 @@ void MovePrimaryCoordinator::dropStaleDataOnDonor(OperationContext* opCtx) const
     WriteBlockBypass::get(opCtx).set(true);
 
     DBDirectClient dbClient(opCtx);
-    invariant(_doc.getCollectionsToClone());
-    for (const auto& nss : *_doc.getCollectionsToClone()) {
+    auto unshardedCollections = [this, opCtx] {
+        if (useOnlineCloner()) {
+            return getUnshardedCollections(opCtx);
+        }
+        invariant(_doc.getCollectionsToClone());
+        return *_doc.getCollectionsToClone();
+    }();
+    for (const auto& nss : unshardedCollections) {
         const auto dropStatus = [&] {
             BSONObj dropResult;
             dbClient.runCommand(_dbName, BSON("drop" << nss.coll()), dropResult);
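useOnlineCloner() is FCV-gated, so the coordinator's behavior can flip at runtime via setFeatureCompatibilityVersion rather than only at build time. From the shell, the flag's effective state on the donor shard can be inspected with getParameter; a sketch assuming an st ShardingTest fixture:

```javascript
// Sketch: read the flag that useOnlineCloner() consults on the donor's primary.
const res = assert.commandWorked(st.rs0.getPrimary().adminCommand(
    {getParameter: 1, featureFlagOnlineMovePrimaryLifecycle: 1}));
// For FCV-gated flags this reports both the enabled value and the gating version.
jsTest.log(tojson(res.featureFlagOnlineMovePrimaryLifecycle));
```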
diff --git a/src/mongo/db/s/move_primary_coordinator.h b/src/mongo/db/s/move_primary_coordinator.h
index 5136f73a0ee..22a03c2a460 100644
--- a/src/mongo/db/s/move_primary_coordinator.h
+++ b/src/mongo/db/s/move_primary_coordinator.h
@@ -29,6 +29,7 @@
 
 #pragma once
 
+#include "mongo/db/s/move_primary/move_primary_donor_service.h"
 #include "mongo/db/s/move_primary_coordinator_document_gen.h"
 #include "mongo/db/s/sharding_ddl_coordinator.h"
 #include "mongo/s/client/shard.h"
@@ -62,6 +63,25 @@ private:
         std::shared_ptr<executor::ScopedTaskExecutor> executor,
         const CancellationToken& token) noexcept;
 
+    bool onlineClonerPossiblyNeverCreated() const;
+    bool onlineClonerPossiblyCleanedUp() const;
+    bool onlineClonerAllowedToBeMissing() const;
+    void recoverOnlineCloner(OperationContext* opCtx);
+    void createOnlineCloner(OperationContext* opCtx);
+
+    /**
+     * Clone data to the recipient without using the online cloning machinery.
+     */
+    void cloneDataLegacy(OperationContext* opCtx);
+
+    /**
+     * Clone data to the recipient using the online cloning machinery.
+     */
+    void cloneDataUntilReadyForCatchup(OperationContext* opCtx, const CancellationToken& token);
+
+    void informOnlineClonerOfBlockingWrites(OperationContext* opCtx);
+    void waitUntilOnlineClonerPrepared(const CancellationToken& token);
+
     /**
      * Logs in the `config.changelog` collection a specific event for `movePrimary` operations.
      */
@@ -73,7 +93,7 @@ private:
     /**
     * Returns the list of unsharded collections for the given database. These are the collections
     * the recipient is expected to clone.
     */
-    std::vector<NamespaceString> getUnshardedCollections(OperationContext* opCtx);
+    std::vector<NamespaceString> getUnshardedCollections(OperationContext* opCtx) const;
 
     /**
      * Ensures that there are no orphaned collections in the recipient's catalog data, asserting
@@ -82,6 +102,7 @@ private:
     void assertNoOrphanedDataOnRecipient(
         OperationContext* opCtx, const std::vector<NamespaceString>& collectionsToClone) const;
 
+
     /**
      * Requests to the recipient to clone all the collections of the given database currently owned
      * by this shard. Once the cloning is complete, the recipient returns the list of the actually
@@ -177,8 +198,16 @@ private:
      */
     void exitCriticalSectionOnRecipient(OperationContext* opCtx) const;
 
+    void cleanupOnlineCloner(OperationContext* opCtx, const CancellationToken& token);
+    void cleanupOnAbortWithoutOnlineCloner(OperationContext* opCtx,
+                                           std::shared_ptr<executor::ScopedTaskExecutor> executor);
+    void cleanupOnAbortWithOnlineCloner(OperationContext* opCtx,
+                                        const CancellationToken& token,
+                                        const Status& status);
+
     const DatabaseName _dbName;
     const BSONObj _csReason;
+    std::shared_ptr<MovePrimaryDonor> _onlineCloner;
 };
 
 }  // namespace mongo