author     Brett Nawrocki <brett.nawrocki@mongodb.com>        2023-03-27 19:40:57 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2023-04-09 17:33:07 +0000
commit     627e0bf27cdfde42e7f4a4856eff5de474917abf (patch)
tree       06fd661626c0296d079f197e842b99380adb4bb3
parent     0239589385ef353ca1ee2a10036cd8c5c52d8599 (diff)
SERVER-73091 Integrate MovePrimaryDonor with MovePrimaryCoordinator
-rw-r--r--  jstests/concurrency/fsm_workloads/move_primary_with_crud.js               |   4
-rw-r--r--  jstests/concurrency/fsm_workloads/random_DDL_operations.js                |   5
-rw-r--r--  jstests/sharding/move_primary_basic.js                                     |  37
-rw-r--r--  jstests/sharding/move_primary_failover_before_persist_block_timestamp.js  |  44
-rw-r--r--  src/mongo/db/s/move_primary/move_primary_donor_service.cpp                |  69
-rw-r--r--  src/mongo/db/s/move_primary/move_primary_donor_service.h                  |  14
-rw-r--r--  src/mongo/db/s/move_primary/move_primary_recipient_service.cpp            |  10
-rw-r--r--  src/mongo/db/s/move_primary_coordinator.cpp                                | 216
-rw-r--r--  src/mongo/db/s/move_primary_coordinator.h                                  |  31
9 files changed, 365 insertions, 65 deletions
diff --git a/jstests/concurrency/fsm_workloads/move_primary_with_crud.js b/jstests/concurrency/fsm_workloads/move_primary_with_crud.js
index cabe78836f4..a9f50d57deb 100644
--- a/jstests/concurrency/fsm_workloads/move_primary_with_crud.js
+++ b/jstests/concurrency/fsm_workloads/move_primary_with_crud.js
@@ -167,7 +167,9 @@ const $config = (function() {
// Due to a stepdown of the donor during the cloning phase, the movePrimary
// operation failed. It is not automatically recovered, but any orphaned data on
// the recipient has been deleted.
- 7120202
+ 7120202,
+ // Same as the above, but due to a stepdown of the recipient.
+ ErrorCodes.MovePrimaryAborted
]);
},
checkDatabaseMetadataConsistency: function(db, collName, connCache) {
diff --git a/jstests/concurrency/fsm_workloads/random_DDL_operations.js b/jstests/concurrency/fsm_workloads/random_DDL_operations.js
index f2b103905ef..6749900610b 100644
--- a/jstests/concurrency/fsm_workloads/random_DDL_operations.js
+++ b/jstests/concurrency/fsm_workloads/random_DDL_operations.js
@@ -77,7 +77,10 @@ var $config = (function() {
db.adminCommand({movePrimary: db.getName(), to: shardId}), [
ErrorCodes.ConflictingOperationInProgress,
// The cloning phase has failed (e.g. as a result of a stepdown). When a failure
- // occurs at this phase, the movePrimary operation does not recover.
+ // occurs at this phase, the movePrimary operation does not recover. Either of
+ // the following error codes may be seen, depending on whether the failover
+ // occurred on the donor or the recipient node.
+ ErrorCodes.MovePrimaryAborted,
7120202
]);
},
diff --git a/jstests/sharding/move_primary_basic.js b/jstests/sharding/move_primary_basic.js
index 5dc2216b4dd..faff3a48703 100644
--- a/jstests/sharding/move_primary_basic.js
+++ b/jstests/sharding/move_primary_basic.js
@@ -3,6 +3,10 @@
load('jstests/libs/feature_flag_util.js');
+function collectionExists(shard, dbName, collName) {
+ return Array.contains(shard.getDB(dbName).getCollectionNames(), collName);
+}
+
var st = new ShardingTest({mongos: 1, shards: 2});
var mongos = st.s0;
@@ -120,19 +124,36 @@ if (FeatureFlagUtil.isPresentAndEnabled(config.admin, 'ResilientMovePrimary')) {
assert.commandFailedWithCode(mongos.adminCommand({movePrimary: dbName, to: shard1.shardName}),
ErrorCodes.NamespaceExists);
- // The documents are still on both the shards.
- assert.eq(2, shard0.getCollection(coll1NS).find().itcount());
- assert.eq(1, shard1.getCollection(coll1NS).find().itcount());
+ const expectDropOnFailure =
+ FeatureFlagUtil.isPresentAndEnabled(config.admin, 'OnlineMovePrimaryLifecycle');
- // Remove the orphaned document on shard1 leaving an empty collection.
- assert.commandWorked(shard1.getCollection(coll1NS).remove({name: 'Emma'}));
- assert.eq(0, shard1.getCollection(coll1NS).find().itcount());
+ if (expectDropOnFailure) {
+ // The orphaned collection on shard1 should have been dropped due to the previous failure.
+ assert.eq(2, shard0.getCollection(coll1NS).find().itcount());
+ assert(!collectionExists(shard1, dbName, coll1Name));
+
+ // Create another empty collection.
+ shard1.getDB(dbName).createCollection(coll1Name);
+ } else {
+ // The documents are on both the shards.
+ assert.eq(2, shard0.getCollection(coll1NS).find().itcount());
+ assert.eq(1, shard1.getCollection(coll1NS).find().itcount());
+
+ // Remove the orphaned document on shard1 leaving an empty collection.
+ assert.commandWorked(shard1.getCollection(coll1NS).remove({name: 'Emma'}));
+ assert.eq(0, shard1.getCollection(coll1NS).find().itcount());
+ }
assert.commandFailedWithCode(mongos.adminCommand({movePrimary: dbName, to: shard1.shardName}),
ErrorCodes.NamespaceExists);
- // Drop the orphaned collection on shard1.
- shard1.getCollection(coll1NS).drop();
+ if (expectDropOnFailure) {
+ // The orphaned collection on shard1 should have been dropped due to the previous failure.
+ assert(!collectionExists(shard1, dbName, coll1Name));
+ } else {
+ // Drop the orphaned collection on shard1.
+ shard1.getCollection(coll1NS).drop();
+ }
}
jsTest.log('Test that metadata has changed');
diff --git a/jstests/sharding/move_primary_failover_before_persist_block_timestamp.js b/jstests/sharding/move_primary_failover_before_persist_block_timestamp.js
new file mode 100644
index 00000000000..b302f5179a5
--- /dev/null
+++ b/jstests/sharding/move_primary_failover_before_persist_block_timestamp.js
@@ -0,0 +1,44 @@
+/**
+ * Tests that movePrimary sets a valid block timestamp if a failover occurs before it is persisted.
+ *
+ * @tags: [
+ * requires_fcv_70,
+ * featureFlagOnlineMovePrimaryLifecycle
+ * ]
+ */
+(function() {
+'use strict';
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+const st = new ShardingTest({mongos: 1, shards: 2, rs: {nodes: 3}});
+
+const mongos = st.s0;
+const shard0 = st.shard0;
+const oldDonorPrimary = st.rs0.getPrimary();
+const shard1 = st.shard1;
+const config = st.config;
+
+const dbName = 'test_db';
+const collName = 'test_coll';
+const collNS = dbName + '.' + collName;
+
+assert.commandWorked(mongos.adminCommand({enableSharding: dbName, primaryShard: shard0.shardName}));
+assert.commandWorked(mongos.getCollection(collNS).insert({value: 1}));
+assert.commandWorked(mongos.getCollection(collNS).insert({value: 2}));
+
+const fp = configureFailPoint(oldDonorPrimary, "pauseBeforeMovePrimaryDonorPersistsBlockTimestamp");
+
+const joinMovePrimary = startParallelShell(
+ funWithArgs(function(dbName, toShard) {
+ assert.commandWorked(db.adminCommand({movePrimary: dbName, to: toShard}));
+ }, dbName, shard1.shardName), mongos.port);
+
+fp.wait();
+st.rs0.getPrimary().adminCommand({replSetStepDown: ReplSetTest.kForeverSecs, force: 1});
+fp.off();
+st.rs0.awaitNodesAgreeOnPrimary();
+joinMovePrimary();
+
+st.stop();
+})();
diff --git a/src/mongo/db/s/move_primary/move_primary_donor_service.cpp b/src/mongo/db/s/move_primary/move_primary_donor_service.cpp
index 73e008beb8b..ae82cda7b07 100644
--- a/src/mongo/db/s/move_primary/move_primary_donor_service.cpp
+++ b/src/mongo/db/s/move_primary/move_primary_donor_service.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/s/move_primary/move_primary_recipient_cmds_gen.h"
#include "mongo/db/s/move_primary/move_primary_server_parameters_gen.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/s/grid.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kMovePrimary
@@ -45,6 +46,7 @@ MONGO_FAIL_POINT_DEFINE(pauseDuringMovePrimaryDonorStateEnumTransition);
MONGO_FAIL_POINT_DEFINE(pauseDuringMovePrimaryDonorStateEnumTransitionAlternate);
MONGO_FAIL_POINT_DEFINE(pauseBeforeBeginningMovePrimaryDonorWorkflow);
+MONGO_FAIL_POINT_DEFINE(pauseBeforeMovePrimaryDonorPersistsBlockTimestamp);
MONGO_FAIL_POINT_DEFINE(pauseBeforeBeginningMovePrimaryDonorCleanup);
enum StateTransitionProgress {
@@ -162,6 +164,16 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> MovePrimaryDonorService::con
_makeDependencies(initialDoc));
}
+std::vector<std::shared_ptr<MovePrimaryDonor>> MovePrimaryDonorService::getAllDonorInstances(
+ OperationContext* opCtx) {
+ std::vector<std::shared_ptr<MovePrimaryDonor>> result;
+ auto instances = getAllInstances(opCtx);
+ for (const auto& instance : instances) {
+ result.push_back(checked_pointer_cast<MovePrimaryDonor>(instance));
+ }
+ return result;
+}
+
MovePrimaryDonorDependencies MovePrimaryDonorService::_makeDependencies(
const MovePrimaryDonorDocument& initialDoc) {
return {std::make_unique<MovePrimaryDonorExternalStateImpl>(initialDoc.getMetadata())};
@@ -312,6 +324,55 @@ StatusWith<Shard::CommandResponse> MovePrimaryDonorExternalStateImpl::runCommand
return shard->runCommand(opCtx, readPref, dbName, cmdObj, retryPolicy);
}
+std::shared_ptr<MovePrimaryDonor> MovePrimaryDonor::get(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const ShardId& toShard) {
+ auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
+ auto service = checked_cast<MovePrimaryDonorService*>(
+ registry->lookupServiceByName(MovePrimaryDonorService::kServiceName));
+ auto instances = service->getAllDonorInstances(opCtx);
+ for (const auto& instance : instances) {
+ if (_matchesArguments(instance, dbName, toShard)) {
+ return instance;
+ }
+ }
+ return nullptr;
+}
+
+std::shared_ptr<MovePrimaryDonor> MovePrimaryDonor::create(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const ShardId& toShard) {
+ auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
+ auto service = registry->lookupServiceByName(MovePrimaryDonorService::kServiceName);
+ MovePrimaryCommonMetadata metadata;
+ metadata.setMigrationId(UUID::gen());
+ metadata.setDatabaseName(NamespaceString{dbName.toString()});
+ metadata.setFromShardName(ShardingState::get(opCtx)->shardId());
+ metadata.setToShardName(toShard.toString());
+ MovePrimaryDonorDocument document;
+ document.setId(metadata.getMigrationId());
+ document.setMetadata(std::move(metadata));
+ auto donor = MovePrimaryDonor::getOrCreate(opCtx, service, document.toBSON());
+ uassert(7309100,
+ "Unable to create MovePrimaryDonor using the following initial state: {}"_format(
+ redact(document.toBSON()).toString()),
+ donor);
+ return donor;
+}
+
+bool MovePrimaryDonor::_matchesArguments(const std::shared_ptr<MovePrimaryDonor>& instance,
+ const DatabaseName& dbName,
+ const ShardId& toShard) {
+ const auto& metadata = instance->getMetadata();
+ if (dbName != metadata.getDatabaseName().db()) {
+ return false;
+ }
+ if (toShard.toString() != metadata.getToShardName()) {
+ return false;
+ }
+ return true;
+}
+
MovePrimaryDonor::MovePrimaryDonor(ServiceContext* serviceContext,
MovePrimaryDonorService* donorService,
MovePrimaryDonorDocument initialState,
@@ -442,9 +503,10 @@ ExecutorFuture<void> MovePrimaryDonor::_runDonorWorkflow() {
.then([this] { return _waitForForgetThenDoCleanup(); })
.thenRunOn(_cleanupExecutor)
.onCompletion([this, self = shared_from_this()](Status okOrStepdownError) {
- invariant(okOrStepdownError.isOK() || _cancelState->isSteppingDown());
- const auto& finalResult =
- _cancelState->isSteppingDown() ? okOrStepdownError : _getOperationStatus();
+ bool steppingDown =
+ _cancelState->isSteppingDown() || (**_taskExecutor)->isShuttingDown();
+ invariant(okOrStepdownError.isOK() || steppingDown);
+ const auto& finalResult = steppingDown ? okOrStepdownError : _getOperationStatus();
_ensureProgressPromisesAreFulfilled(finalResult);
return finalResult;
});
@@ -553,6 +615,7 @@ ExecutorFuture<void> MovePrimaryDonor::_doWaitingToBlockWrites() {
return _waitUntilReadyToBlockWrites()
.then([this] { return _waitUntilCurrentlyBlockingWrites(); })
.then([this](Timestamp blockingWritesTimestamp) {
+ pauseBeforeMovePrimaryDonorPersistsBlockTimestamp.pauseWhileSet();
return _persistBlockingWritesTimestamp(blockingWritesTimestamp);
});
}
diff --git a/src/mongo/db/s/move_primary/move_primary_donor_service.h b/src/mongo/db/s/move_primary/move_primary_donor_service.h
index 922f9904c3f..3236334e7b1 100644
--- a/src/mongo/db/s/move_primary/move_primary_donor_service.h
+++ b/src/mongo/db/s/move_primary/move_primary_donor_service.h
@@ -38,6 +38,7 @@
namespace mongo {
+class MovePrimaryDonor;
struct MovePrimaryDonorDependencies;
class MovePrimaryDonorService : public repl::PrimaryOnlyService {
@@ -59,6 +60,8 @@ public:
std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override;
+ std::vector<std::shared_ptr<MovePrimaryDonor>> getAllDonorInstances(OperationContext* opCtx);
+
protected:
virtual MovePrimaryDonorDependencies _makeDependencies(
const MovePrimaryDonorDocument& initialDoc);
@@ -219,6 +222,13 @@ struct MovePrimaryDonorDependencies {
class MovePrimaryDonor : public repl::PrimaryOnlyService::TypedInstance<MovePrimaryDonor> {
public:
+ static std::shared_ptr<MovePrimaryDonor> get(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const ShardId& toShard);
+ static std::shared_ptr<MovePrimaryDonor> create(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const ShardId& toShard);
+
MovePrimaryDonor(ServiceContext* serviceContext,
MovePrimaryDonorService* donorService,
MovePrimaryDonorDocument initialState,
@@ -249,6 +259,10 @@ public:
SharedSemiFuture<void> getCompletionFuture() const;
private:
+ static bool _matchesArguments(const std::shared_ptr<MovePrimaryDonor>& instance,
+ const DatabaseName& dbName,
+ const ShardId& toShard);
+
MovePrimaryDonorStateEnum _getCurrentState() const;
MovePrimaryDonorMutableFields _getMutableFields() const;
bool _isAborted(WithLock) const;
diff --git a/src/mongo/db/s/move_primary/move_primary_recipient_service.cpp b/src/mongo/db/s/move_primary/move_primary_recipient_service.cpp
index bc92722024a..5a10a1bdc4c 100644
--- a/src/mongo/db/s/move_primary/move_primary_recipient_service.cpp
+++ b/src/mongo/db/s/move_primary/move_primary_recipient_service.cpp
@@ -189,7 +189,8 @@ SemiFuture<void> MovePrimaryRecipientService::MovePrimaryRecipient::run(
// We would like to abort in all cases where there is a failover and we have not yet reached
// kPrepared state to maintain correctness of movePrimary operation across upgrades/downgrades
- // in binary versions with feature parity in online movePrimary implementation.
+ // in binary versions with feature parity in online movePrimary implementation. The offline
+ // cloner is not resumable after failovers.
auto shouldAbort = [&] {
if (!_useOnlineCloner()) {
stdx::lock_guard<Latch> lg(_mutex);
@@ -317,6 +318,7 @@ MovePrimaryRecipientService::MovePrimaryRecipient::_transitionToCloningStateAndC
MovePrimaryRecipientDocument::kStateFieldName,
MovePrimaryRecipientState_serializer(MovePrimaryRecipientStateEnum::kCloning));
_transitionStateMachine(MovePrimaryRecipientStateEnum::kCloning);
+ // TODO SERVER-75872: Refactor this logic after integrating online cloner.
_cloneDataFromDonor(opCtx.get());
})
.onTransientError([](const Status& status) {
@@ -554,7 +556,8 @@ MovePrimaryRecipientService::MovePrimaryRecipient::_transitionToDoneStateAndFini
movePrimaryRecipientPauseBeforeDeletingStateDoc.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getStepdownToken());
_removeRecipientDocument(opCtx.get());
- });
+ })
+ .then([this, executor] { return _waitForMajority(executor); });
})
.onTransientError([](const Status& status) {})
.onUnrecoverableError([](const Status& status) {
@@ -805,6 +808,9 @@ MovePrimaryRecipientService::MovePrimaryRecipient::_transitionToInitializingStat
ExecutorFuture<void> MovePrimaryRecipientService::MovePrimaryRecipient::_initializeForCloningState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_checkInvalidStateTransition(MovePrimaryRecipientStateEnum::kCloning)) {
+ return ExecutorFuture(**executor);
+ }
return _retryingCancelableOpCtxFactory
->withAutomaticRetry([this, executor](const auto& factory) {
auto opCtx = factory.makeOperationContext(Client::getCurrent());
diff --git a/src/mongo/db/s/move_primary_coordinator.cpp b/src/mongo/db/s/move_primary_coordinator.cpp
index 1acdefda3b3..929e34809da 100644
--- a/src/mongo/db/s/move_primary_coordinator.cpp
+++ b/src/mongo/db/s/move_primary_coordinator.cpp
@@ -44,12 +44,22 @@
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/db/write_block_bypass.h"
#include "mongo/s/grid.h"
+#include "mongo/s/move_primary/move_primary_feature_flag_gen.h"
#include "mongo/s/request_types/move_primary_gen.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
namespace mongo {
+namespace {
+
+bool useOnlineCloner() {
+ return move_primary::gFeatureFlagOnlineMovePrimaryLifecycle.isEnabled(
+ serverGlobalParams.featureCompatibility);
+}
+
+} // namespace
+
MONGO_FAIL_POINT_DEFINE(hangBeforeCloningData);
MovePrimaryCoordinator::MovePrimaryCoordinator(ShardingDDLCoordinatorService* service,
@@ -139,6 +149,10 @@ ExecutorFuture<void> MovePrimaryCoordinator::_runImpl(
"Requested primary shard {} is draining"_format(toShardId.toString()),
!toShardEntry.getDraining());
+ if (useOnlineCloner() && !_firstExecution) {
+ recoverOnlineCloner(opCtx);
+ }
+
return runMovePrimaryWorkflow(executor, token);
});
}
@@ -149,7 +163,7 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
return ExecutorFuture<void>(**executor)
.then(_buildPhaseHandler(
Phase::kClone,
- [this, anchor = shared_from_this()] {
+ [this, token, anchor = shared_from_this()] {
const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
@@ -188,26 +202,30 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
hangBeforeCloningData.pauseWhileSet(opCtx);
}
- const auto& collectionsToClone = getUnshardedCollections(opCtx);
- assertNoOrphanedDataOnRecipient(opCtx, collectionsToClone);
-
- _doc.setCollectionsToClone(collectionsToClone);
- _updateStateDocument(opCtx, StateDoc(_doc));
-
- const auto& clonedCollections = cloneDataToRecipient(opCtx);
- assertClonedData(clonedCollections);
+ if (useOnlineCloner()) {
+ if (!_onlineCloner) {
+ createOnlineCloner(opCtx);
+ }
+ cloneDataUntilReadyForCatchup(opCtx, token);
+ } else {
+ cloneDataLegacy(opCtx);
+ }
// TODO (SERVER-71566): Temporary solution to cover the case of stepping down before
// actually entering the `kCatchup` phase.
blockWrites(opCtx);
}))
.then(_buildPhaseHandler(Phase::kCatchup,
- [this, anchor = shared_from_this()] {
+ [this, token, anchor = shared_from_this()] {
const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
blockWrites(opCtx);
+ if (useOnlineCloner()) {
+ informOnlineClonerOfBlockingWrites(opCtx);
+ waitUntilOnlineClonerPrepared(token);
+ }
}))
.then(_buildPhaseHandler(Phase::kEnterCriticalSection,
[this, executor, anchor = shared_from_this()] {
@@ -226,7 +244,9 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
}
blockReads(opCtx);
- enterCriticalSectionOnRecipient(opCtx);
+ if (!useOnlineCloner()) {
+ enterCriticalSectionOnRecipient(opCtx);
+ }
}))
.then(_buildPhaseHandler(
Phase::kCommit,
@@ -261,7 +281,7 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
dropStaleDataOnDonor(opCtx);
}))
.then(_buildPhaseHandler(Phase::kExitCriticalSection,
- [this, executor, anchor = shared_from_this()] {
+ [this, executor, token, anchor = shared_from_this()] {
const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
@@ -277,7 +297,11 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
}
unblockReadsAndWrites(opCtx);
- exitCriticalSectionOnRecipient(opCtx);
+ if (useOnlineCloner()) {
+ cleanupOnlineCloner(opCtx, token);
+ } else {
+ exitCriticalSectionOnRecipient(opCtx);
+ }
LOGV2(7120206,
"Completed movePrimary operation",
@@ -292,7 +316,8 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
getForwardableOpMetadata().setOn(opCtx);
const auto& failedPhase = _doc.getPhase();
- if (failedPhase == Phase::kClone || status == ErrorCodes::ShardNotFound) {
+ if (_onlineCloner || failedPhase == Phase::kClone ||
+ status == ErrorCodes::ShardNotFound) {
LOGV2_DEBUG(7392900,
1,
"Triggering movePrimary cleanup",
@@ -306,12 +331,70 @@ ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
});
}
+bool MovePrimaryCoordinator::onlineClonerPossiblyNeverCreated() const {
+ // Either this is the first run of this service, or a failover occurred before the online
+ // cloner persisted its state document.
+ auto phase = _doc.getPhase();
+ return phase <= Phase::kClone;
+}
+
+bool MovePrimaryCoordinator::onlineClonerPossiblyCleanedUp() const {
+ // A failover could have occurred between the online cloner deleting its state document and
+ // the coordinator deleting its state document.
+ auto phase = _doc.getPhase();
+ return phase == Phase::kExitCriticalSection || getAbortReason();
+}
+
+bool MovePrimaryCoordinator::onlineClonerAllowedToBeMissing() const {
+ return onlineClonerPossiblyNeverCreated() || onlineClonerPossiblyCleanedUp();
+}
+
+void MovePrimaryCoordinator::recoverOnlineCloner(OperationContext* opCtx) {
+ _onlineCloner = MovePrimaryDonor::get(opCtx, _dbName, _doc.getToShardId());
+ if (_onlineCloner) {
+ return;
+ }
+ invariant(onlineClonerAllowedToBeMissing());
+}
+
+void MovePrimaryCoordinator::createOnlineCloner(OperationContext* opCtx) {
+ invariant(onlineClonerPossiblyNeverCreated());
+ _onlineCloner = MovePrimaryDonor::create(opCtx, _dbName, _doc.getToShardId());
+}
+
+void MovePrimaryCoordinator::cloneDataUntilReadyForCatchup(OperationContext* opCtx,
+ const CancellationToken& token) {
+ future_util::withCancellation(_onlineCloner->getReadyToBlockWritesFuture(), token).get();
+}
+
+void MovePrimaryCoordinator::cloneDataLegacy(OperationContext* opCtx) {
+ const auto& collectionsToClone = getUnshardedCollections(opCtx);
+ assertNoOrphanedDataOnRecipient(opCtx, collectionsToClone);
+
+ _doc.setCollectionsToClone(collectionsToClone);
+ _updateStateDocument(opCtx, StateDoc(_doc));
+
+ const auto& clonedCollections = cloneDataToRecipient(opCtx);
+ assertClonedData(clonedCollections);
+}
+
+void MovePrimaryCoordinator::informOnlineClonerOfBlockingWrites(OperationContext* opCtx) {
+ auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
+ replClient.setLastOpToSystemLastOpTime(opCtx);
+ const auto latestOpTime = replClient.getLastOp();
+ _onlineCloner->onBeganBlockingWrites(latestOpTime.getTimestamp());
+}
+
+void MovePrimaryCoordinator::waitUntilOnlineClonerPrepared(const CancellationToken& token) {
+ future_util::withCancellation(_onlineCloner->getDecisionFuture(), token).get();
+}
+
ExecutorFuture<void> MovePrimaryCoordinator::_cleanupOnAbort(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token,
const Status& status) noexcept {
return ExecutorFuture<void>(**executor)
- .then([this, executor, status, anchor = shared_from_this()] {
+ .then([this, executor, token, status, anchor = shared_from_this()] {
const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
@@ -320,48 +403,77 @@ ExecutorFuture<void> MovePrimaryCoordinator::_cleanupOnAbort(
_performNoopRetryableWriteOnAllShardsAndConfigsvr(
opCtx, getCurrentSession(), **executor);
- const auto& failedPhase = _doc.getPhase();
- const auto& toShardId = _doc.getToShardId();
-
- if (failedPhase <= Phase::kCommit) {
- // A non-retryable error occurred before the new primary shard was actually
- // committed, so any cloned data on the recipient must be dropped.
-
- try {
- // Even if the error is `ShardNotFound`, the recipient may still be in draining
- // mode, so try to drop any orphaned data anyway.
- dropOrphanedDataOnRecipient(opCtx, executor);
- } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
- LOGV2_INFO(7392901,
- "Failed to remove orphaned data on recipient as it has been removed",
- logAttrs(_dbName),
- "to"_attr = toShardId);
- }
- }
-
- unblockReadsAndWrites(opCtx);
- try {
- // Even if the error is `ShardNotFound`, the recipient may still be in draining
- // mode, so try to exit the critical section anyway.
- exitCriticalSectionOnRecipient(opCtx);
- } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
- LOGV2_INFO(7392902,
- "Failed to exit critical section on recipient as it has been removed",
- logAttrs(_dbName),
- "to"_attr = toShardId);
+ if (useOnlineCloner()) {
+ cleanupOnAbortWithOnlineCloner(opCtx, token, status);
+ } else {
+ cleanupOnAbortWithoutOnlineCloner(opCtx, executor);
}
LOGV2_ERROR(7392903,
"Failed movePrimary operation",
logAttrs(_dbName),
- "to"_attr = toShardId,
- "phase"_attr = serializePhase(failedPhase),
+ "to"_attr = _doc.getToShardId(),
+ "phase"_attr = serializePhase(_doc.getPhase()),
"error"_attr = redact(status));
logChange(opCtx, "error", status);
});
}
+void MovePrimaryCoordinator::cleanupOnAbortWithoutOnlineCloner(
+ OperationContext* opCtx, std::shared_ptr<executor::ScopedTaskExecutor> executor) {
+ const auto& failedPhase = _doc.getPhase();
+ const auto& toShardId = _doc.getToShardId();
+
+ if (failedPhase <= Phase::kCommit) {
+ // A non-retryable error occurred before the new primary shard was actually
+ // committed, so any cloned data on the recipient must be dropped.
+
+ try {
+ // Even if the error is `ShardNotFound`, the recipient may still be in draining
+ // mode, so try to drop any orphaned data anyway.
+ dropOrphanedDataOnRecipient(opCtx, executor);
+ } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
+ LOGV2_INFO(7392901,
+ "Failed to remove orphaned data on recipient as it has been removed",
+ logAttrs(_dbName),
+ "to"_attr = toShardId);
+ }
+ }
+
+ unblockReadsAndWrites(opCtx);
+ try {
+ // Even if the error is `ShardNotFound`, the recipient may still be in draining
+ // mode, so try to exit the critical section anyway.
+ exitCriticalSectionOnRecipient(opCtx);
+ } catch (const ExceptionFor<ErrorCodes::ShardNotFound>&) {
+ LOGV2_INFO(7392902,
+ "Failed to exit critical section on recipient as it has been removed",
+ logAttrs(_dbName),
+ "to"_attr = toShardId);
+ }
+}
+
+void MovePrimaryCoordinator::cleanupOnlineCloner(OperationContext* opCtx,
+ const CancellationToken& token) {
+ if (!_onlineCloner) {
+ return;
+ }
+ _onlineCloner->onReadyToForget();
+ future_util::withCancellation(_onlineCloner->getCompletionFuture(), token).wait();
+}
+
+void MovePrimaryCoordinator::cleanupOnAbortWithOnlineCloner(OperationContext* opCtx,
+ const CancellationToken& token,
+ const Status& status) {
+ unblockReadsAndWrites(opCtx);
+ if (!_onlineCloner) {
+ return;
+ }
+ _onlineCloner->abort(status);
+ cleanupOnlineCloner(opCtx, token);
+}
+
void MovePrimaryCoordinator::logChange(OperationContext* opCtx,
const std::string& what,
const Status& status) const {
@@ -376,7 +488,7 @@ void MovePrimaryCoordinator::logChange(OperationContext* opCtx,
}
std::vector<NamespaceString> MovePrimaryCoordinator::getUnshardedCollections(
- OperationContext* opCtx) {
+ OperationContext* opCtx) const {
const auto allCollections = [&] {
DBDirectClient dbClient(opCtx);
const auto collInfos =
@@ -579,8 +691,14 @@ void MovePrimaryCoordinator::dropStaleDataOnDonor(OperationContext* opCtx) const
WriteBlockBypass::get(opCtx).set(true);
DBDirectClient dbClient(opCtx);
- invariant(_doc.getCollectionsToClone());
- for (const auto& nss : *_doc.getCollectionsToClone()) {
+ auto unshardedCollections = [this, opCtx] {
+ if (useOnlineCloner()) {
+ return getUnshardedCollections(opCtx);
+ }
+ invariant(_doc.getCollectionsToClone());
+ return *_doc.getCollectionsToClone();
+ }();
+ for (const auto& nss : unshardedCollections) {
const auto dropStatus = [&] {
BSONObj dropResult;
dbClient.runCommand(_dbName, BSON("drop" << nss.coll()), dropResult);
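For readability, here is a condensed, illustrative-only sketch (not part of the patch) of the donor-facing call sequence that the coordinator changes above spread across the kClone, kCatchup, and kExitCriticalSection phases. It assumes the MovePrimaryDonor interface exactly as used in this commit; the free function name runOnlineCloneSequence is hypothetical, and in the real coordinator each step runs inside its own phase handler with its own OperationContext.

void runOnlineCloneSequence(OperationContext* opCtx,
                            const DatabaseName& dbName,
                            const ShardId& toShard,
                            const CancellationToken& token) {
    // kClone: recover an existing donor instance after a failover, otherwise create
    // one, then wait until it has cloned enough data for writes to be blocked.
    auto donor = MovePrimaryDonor::get(opCtx, dbName, toShard);
    if (!donor) {
        donor = MovePrimaryDonor::create(opCtx, dbName, toShard);
    }
    future_util::withCancellation(donor->getReadyToBlockWritesFuture(), token).get();

    // kCatchup: after blockWrites(), hand the donor the timestamp at which writes
    // were blocked and wait for its prepared/aborted decision.
    auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
    replClient.setLastOpToSystemLastOpTime(opCtx);
    donor->onBeganBlockingWrites(replClient.getLastOp().getTimestamp());
    future_util::withCancellation(donor->getDecisionFuture(), token).get();

    // kExitCriticalSection (or _cleanupOnAbort, after donor->abort(status)): allow
    // the donor to clean up and wait for it to finish.
    donor->onReadyToForget();
    future_util::withCancellation(donor->getCompletionFuture(), token).wait();
}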
diff --git a/src/mongo/db/s/move_primary_coordinator.h b/src/mongo/db/s/move_primary_coordinator.h
index 5136f73a0ee..22a03c2a460 100644
--- a/src/mongo/db/s/move_primary_coordinator.h
+++ b/src/mongo/db/s/move_primary_coordinator.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/s/move_primary/move_primary_donor_service.h"
#include "mongo/db/s/move_primary_coordinator_document_gen.h"
#include "mongo/db/s/sharding_ddl_coordinator.h"
#include "mongo/s/client/shard.h"
@@ -62,6 +63,25 @@ private:
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept;
+ bool onlineClonerPossiblyNeverCreated() const;
+ bool onlineClonerPossiblyCleanedUp() const;
+ bool onlineClonerAllowedToBeMissing() const;
+ void recoverOnlineCloner(OperationContext* opCtx);
+ void createOnlineCloner(OperationContext* opCtx);
+
+ /**
+ * Clone data to the recipient without using the online cloning machinery.
+ */
+ void cloneDataLegacy(OperationContext* opCtx);
+
+ /**
+ * Clone data to the recipient using the online cloning machinery.
+ */
+ void cloneDataUntilReadyForCatchup(OperationContext* opCtx, const CancellationToken& token);
+
+ void informOnlineClonerOfBlockingWrites(OperationContext* opCtx);
+ void waitUntilOnlineClonerPrepared(const CancellationToken& token);
+
/**
* Logs in the `config.changelog` collection a specific event for `movePrimary` operations.
*/
@@ -73,7 +93,7 @@ private:
* Returns the list of unsharded collections for the given database. These are the collections
* the recipient is expected to clone.
*/
- std::vector<NamespaceString> getUnshardedCollections(OperationContext* opCtx);
+ std::vector<NamespaceString> getUnshardedCollections(OperationContext* opCtx) const;
/**
* Ensures that there are no orphaned collections in the recipient's catalog data, asserting
@@ -82,6 +102,7 @@ private:
void assertNoOrphanedDataOnRecipient(
OperationContext* opCtx, const std::vector<NamespaceString>& collectionsToClone) const;
+
/**
* Requests to the recipient to clone all the collections of the given database currently owned
* by this shard. Once the cloning is complete, the recipient returns the list of the actually
@@ -177,8 +198,16 @@ private:
*/
void exitCriticalSectionOnRecipient(OperationContext* opCtx) const;
+ void cleanupOnlineCloner(OperationContext* opCtx, const CancellationToken& token);
+ void cleanupOnAbortWithoutOnlineCloner(OperationContext* opCtx,
+ std::shared_ptr<executor::ScopedTaskExecutor> executor);
+ void cleanupOnAbortWithOnlineCloner(OperationContext* opCtx,
+ const CancellationToken& token,
+ const Status& status);
+
const DatabaseName _dbName;
const BSONObj _csReason;
+ std::shared_ptr<MovePrimaryDonor> _onlineCloner;
};
} // namespace mongo