From 616564d1a5ea7217aa8b1e7f4c6d28800f0b65c9 Mon Sep 17 00:00:00 2001 From: Alex Taskov Date: Fri, 21 Feb 2020 16:26:45 -0500 Subject: SERVER-45952 moveChunk command should re-use lsid and use txnNumber that increments by two for each migration --- src/mongo/db/s/migration_chunk_cloner_source.h | 3 + .../db/s/migration_chunk_cloner_source_legacy.cpp | 113 +++++++++++++++------ .../db/s/migration_chunk_cloner_source_legacy.h | 3 + src/mongo/db/s/migration_coordinator.cpp | 56 ++++++++-- src/mongo/db/s/migration_coordinator.h | 9 +- src/mongo/db/s/migration_coordinator_document.idl | 3 + src/mongo/db/s/migration_source_manager.cpp | 22 ++-- src/mongo/db/s/migration_source_manager.h | 1 - src/mongo/db/s/migration_util.cpp | 19 +--- 9 files changed, 161 insertions(+), 68 deletions(-) (limited to 'src/mongo/db/s') diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index 00e17e8fe3a..8d8fd7fa403 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -79,6 +79,9 @@ public: const LogicalSessionId& lsid, TxnNumber txnNumber) = 0; + // TODO (SERVER-44787): Remove this function after 4.4 is released. + virtual Status startClone(OperationContext* opCtx) = 0; + /** * Blocking method, which uses some custom selected logic for deciding whether it is appropriate * for the donor shard to enter critical section. 
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 0b6d87cd791..a3941cfde46 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -44,7 +44,6 @@ #include "mongo/db/repl/replication_process.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_source_manager.h" -#include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_statistics.h" #include "mongo/db/s/start_chunk_clone_request.h" #include "mongo/db/service_context.h" @@ -283,37 +282,91 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx, // Tell the recipient shard to start cloning BSONObjBuilder cmdBuilder; - auto fcvVersion = serverGlobalParams.featureCompatibility.getVersion(); - if (fcvVersion == ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44 && - !disableResumableRangeDeleter.load()) { - StartChunkCloneRequest::appendAsCommand(&cmdBuilder, - _args.getNss(), - migrationId, - lsid, - txnNumber, - _sessionId, - _donorConnStr, - _args.getFromShardId(), - _args.getToShardId(), - _args.getMinKey(), - _args.getMaxKey(), - _shardKeyPattern.toBSON(), - _args.getSecondaryThrottle()); - } else { - // TODO (SERVER-44787): Remove this overload after 4.4 is released AND - // disableResumableRangeDeleter has been removed from server parameters. 
- StartChunkCloneRequest::appendAsCommand(&cmdBuilder, - _args.getNss(), - _sessionId, - _donorConnStr, - _args.getFromShardId(), - _args.getToShardId(), - _args.getMinKey(), - _args.getMaxKey(), - _shardKeyPattern.toBSON(), - _args.getSecondaryThrottle()); + StartChunkCloneRequest::appendAsCommand(&cmdBuilder, + _args.getNss(), + migrationId, + lsid, + txnNumber, + _sessionId, + _donorConnStr, + _args.getFromShardId(), + _args.getToShardId(), + _args.getMinKey(), + _args.getMaxKey(), + _shardKeyPattern.toBSON(), + _args.getSecondaryThrottle()); + + auto startChunkCloneResponseStatus = _callRecipient(cmdBuilder.obj()); + if (!startChunkCloneResponseStatus.isOK()) { + return startChunkCloneResponseStatus.getStatus(); + } + + // TODO (Kal): Setting the state to kCloning below means that if cancelClone was called we will + // send a cancellation command to the recipient. The reason to limit the cases when we send + // cancellation is for backwards compatibility with 3.2 nodes, which cannot differentiate + // between cancellations for different migration sessions. It is thus possible that a second + // migration from different donor, but the same recipient would certainly abort an already + // running migration. + stdx::lock_guard sl(_mutex); + _state = kCloning; + + return Status::OK(); +} + +// TODO (SERVER-44787): Remove this overload after 4.4 is released AND +// disableResumableRangeDeleter has been removed from server parameters. 
+Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { + invariant(_state == kNew); + invariant(!opCtx->lockState()->isLocked()); + + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { + _sessionCatalogSource = std::make_unique<SessionCatalogMigrationSource>( + opCtx, + _args.getNss(), + ChunkRange(_args.getMinKey(), _args.getMaxKey()), + _shardKeyPattern.getKeyPattern()); + + // Prime up the session migration source if there are oplog entries to migrate. + _sessionCatalogSource->fetchNextOplog(opCtx); } + { + // Ignore prepare conflicts when we load ids of currently available documents. This is + // acceptable because we will track changes made by prepared transactions at transaction + // commit time. + auto originalPrepareConflictBehavior = opCtx->recoveryUnit()->getPrepareConflictBehavior(); + + ON_BLOCK_EXIT([&] { + opCtx->recoveryUnit()->setPrepareConflictBehavior(originalPrepareConflictBehavior); + }); + + opCtx->recoveryUnit()->setPrepareConflictBehavior( + PrepareConflictBehavior::kIgnoreConflicts); + + auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx); + if (storeCurrentLocsStatus == ErrorCodes::ChunkTooBig && _forceJumbo) { + stdx::lock_guard sl(_mutex); + _jumboChunkCloneState.emplace(); + } else if (!storeCurrentLocsStatus.isOK()) { + return storeCurrentLocsStatus; + } + } + + // Tell the recipient shard to start cloning + BSONObjBuilder cmdBuilder; + + StartChunkCloneRequest::appendAsCommand(&cmdBuilder, + _args.getNss(), + _sessionId, + _donorConnStr, + _args.getFromShardId(), + _args.getToShardId(), + _args.getMinKey(), + _args.getMaxKey(), + _shardKeyPattern.toBSON(), + _args.getSecondaryThrottle()); + auto startChunkCloneResponseStatus = _callRecipient(cmdBuilder.obj()); if (!startChunkCloneResponseStatus.isOK()) { return startChunkCloneResponseStatus.getStatus(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h 
b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 8e34e2033f3..7990615e38f 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -95,6 +95,9 @@ public: const LogicalSessionId& lsid, TxnNumber txnNumber) override; + // TODO (SERVER-44787): Remove this function after 4.4 is released. + Status startClone(OperationContext* opCtx) override; + Status awaitUntilCriticalSectionIsAppropriate(OperationContext* opCtx, Milliseconds maxTimeToWait) override; diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index 1e38cc4060c..58eae0dfe11 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -33,9 +33,11 @@ #include "mongo/db/s/migration_coordinator.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/range_deletion_task_gen.h" #include "mongo/logv2/log.h" +#include "mongo/platform/atomic_word.h" #include "mongo/util/fail_point.h" namespace mongo { @@ -49,11 +51,23 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeSendingAbortDecision); MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterCommitDecision); MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterAbortDecision); +namespace { + +LogicalSessionId getSystemLogicalSessionId() { + static auto lsid = makeSystemLogicalSessionId(); + return lsid; +} + +TxnNumber getNextTxnNumber() { + static AtomicWord<TxnNumber> nextTxnNumber{0}; + return nextTxnNumber.fetchAndAdd(2); +} + +} // namespace + namespace migrationutil { -MigrationCoordinator::MigrationCoordinator(UUID migrationId, - MigrationSessionId sessionId, - LogicalSessionId lsid, +MigrationCoordinator::MigrationCoordinator(MigrationSessionId sessionId, ShardId donorShard, ShardId recipientShard, NamespaceString collectionNamespace, @@ -61,9 +75,10 @@ MigrationCoordinator::MigrationCoordinator(UUID migrationId, ChunkRange range, ChunkVersion 
preMigrationChunkVersion, bool waitForDelete) - : _migrationInfo(migrationId, + : _migrationInfo(UUID::gen(), std::move(sessionId), - std::move(lsid), + getSystemLogicalSessionId(), + getNextTxnNumber(), std::move(collectionNamespace), collectionUuid, std::move(donorShard), @@ -72,8 +87,23 @@ MigrationCoordinator::MigrationCoordinator(UUID migrationId, std::move(preMigrationChunkVersion)), _waitForDelete(waitForDelete) {} +MigrationCoordinator::MigrationCoordinator(const MigrationCoordinatorDocument& doc) + : _migrationInfo(doc) {} + MigrationCoordinator::~MigrationCoordinator() = default; +const UUID& MigrationCoordinator::getMigrationId() const { + return _migrationInfo.getId(); +} + +const LogicalSessionId& MigrationCoordinator::getLsid() const { + return _migrationInfo.getLsid(); +} + +TxnNumber MigrationCoordinator::getTxnNumber() const { + return _migrationInfo.getTxnNumber(); +} + void MigrationCoordinator::startMigration(OperationContext* opCtx) { LOGV2( 23889, "{logPrefix}Persisting migration coordinator doc", "logPrefix"_attr = _logPrefix()); @@ -154,9 +184,11 @@ SemiFuture MigrationCoordinator::_commitMigrationOnDonorAndRecipient( "logPrefix"_attr = _logPrefix(), "migrationInfo_getRecipientShardId"_attr = _migrationInfo.getRecipientShardId(), "migrationInfo_getLsid"_attr = _migrationInfo.getLsid().toBSON(), - "TxnNumber_1"_attr = TxnNumber{1}); - migrationutil::advanceTransactionOnRecipient( - opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1}); + "TxnNumber"_attr = _migrationInfo.getTxnNumber()); + migrationutil::advanceTransactionOnRecipient(opCtx, + _migrationInfo.getRecipientShardId(), + _migrationInfo.getLsid(), + _migrationInfo.getTxnNumber()); hangBeforeSendingCommitDecision.pauseWhileSet(); @@ -197,9 +229,11 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* "logPrefix"_attr = _logPrefix(), "migrationInfo_getRecipientShardId"_attr = _migrationInfo.getRecipientShardId(), 
"migrationInfo_getLsid"_attr = _migrationInfo.getLsid().toBSON(), - "TxnNumber_1"_attr = TxnNumber{1}); - migrationutil::advanceTransactionOnRecipient( - opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1}); + "TxnNumber"_attr = _migrationInfo.getTxnNumber()); + migrationutil::advanceTransactionOnRecipient(opCtx, + _migrationInfo.getRecipientShardId(), + _migrationInfo.getLsid(), + _migrationInfo.getTxnNumber()); hangBeforeSendingAbortDecision.pauseWhileSet(); diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h index 9b2ccf7e672..94ba7461ad2 100644 --- a/src/mongo/db/s/migration_coordinator.h +++ b/src/mongo/db/s/migration_coordinator.h @@ -45,9 +45,7 @@ class MigrationCoordinator { public: enum class Decision { kAborted, kCommitted }; - MigrationCoordinator(UUID migrationId, - MigrationSessionId sessionId, - LogicalSessionId lsid, + MigrationCoordinator(MigrationSessionId sessionId, ShardId donorShard, ShardId recipientShard, NamespaceString collectionNamespace, @@ -55,6 +53,7 @@ public: ChunkRange range, ChunkVersion preMigrationChunkVersion, bool waitForDelete); + MigrationCoordinator(const MigrationCoordinatorDocument& doc); MigrationCoordinator(const MigrationCoordinator&) = delete; MigrationCoordinator& operator=(const MigrationCoordinator&) = delete; MigrationCoordinator(MigrationCoordinator&&) = delete; @@ -62,6 +61,10 @@ public: ~MigrationCoordinator(); + const UUID& getMigrationId() const; + const LogicalSessionId& getLsid() const; + TxnNumber getTxnNumber() const; + /** * Initializes persistent state required to ensure that orphaned ranges are properly handled, * even after failover, by doing the following: diff --git a/src/mongo/db/s/migration_coordinator_document.idl b/src/mongo/db/s/migration_coordinator_document.idl index 5308a4545ae..028cb194582 100644 --- a/src/mongo/db/s/migration_coordinator_document.idl +++ b/src/mongo/db/s/migration_coordinator_document.idl @@ -75,6 +75,9 
@@ structs: lsid: type: LogicalSessionId description: "The sessionId to use to communicate with the recipient" + txnNumber: + type: TxnNumber + description: "The last txnNumber used to communicate with the recipient" nss: type: namespacestring description: "The namespace of the collection that the chunk belongs to." diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 77382fb15b6..049609000eb 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -262,9 +262,6 @@ Status MigrationSourceManager::startClone() { auto replCoord = repl::ReplicationCoordinator::get(_opCtx); auto replEnabled = replCoord->isReplEnabled(); - UUID migrationId = UUID::gen(); - _lsid = makeLogicalSessionId(_opCtx); - { const auto metadata = _getCurrentMetadataAndCheckEpoch(); @@ -298,9 +295,7 @@ Status MigrationSourceManager::startClone() { if (_enableResumableRangeDeleter) { _coordinator = std::make_unique<migrationutil::MigrationCoordinator>( - migrationId, _cloneDriver->getSessionId(), - _lsid, _args.getFromShardId(), _args.getToShardId(), getNss(), @@ -324,11 +319,19 @@ Status MigrationSourceManager::startClone() { if (_enableResumableRangeDeleter) { _coordinator->startMigration(_opCtx); - } - Status startCloneStatus = _cloneDriver->startClone(_opCtx, migrationId, _lsid, TxnNumber{0}); - if (!startCloneStatus.isOK()) { - return startCloneStatus; + Status startCloneStatus = _cloneDriver->startClone(_opCtx, + _coordinator->getMigrationId(), + _coordinator->getLsid(), + _coordinator->getTxnNumber()); + if (!startCloneStatus.isOK()) { + return startCloneStatus; + } + } else { + Status startCloneStatus = _cloneDriver->startClone(_opCtx); + if (!startCloneStatus.isOK()) { + return startCloneStatus; + } } scopedGuard.dismiss(); @@ -765,6 +768,7 @@ void MigrationSourceManager::_cleanup() { auto newOpCtxPtr = cc().makeOperationContext(); auto newOpCtx = newOpCtxPtr.get(); _cleanupCompleteFuture = 
_coordinator->completeMigration(newOpCtx); + _coordinator.reset(); } } diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 09e58eea15e..f335d2d19c9 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -270,7 +270,6 @@ private: BSONObj _recipientCloneCounts; boost::optional<CollectionCriticalSection> _critSec; - LogicalSessionId _lsid; // Optional future that is populated if the migration succeeds and range deletion is scheduled // on this node. The future is set when the range deletion completes. Used if the moveChunk was diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 75b0c8a24c2..593e660ff26 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -611,7 +611,7 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx, void advanceTransactionOnRecipient(OperationContext* opCtx, const ShardId& recipientId, const LogicalSessionId& lsid, - TxnNumber txnNumber) { + TxnNumber currentTxnNumber) { write_ops::Update updateOp(NamespaceString::kServerConfigurationNamespace); auto queryFilter = BSON("_id" << "migrationCoordinatorStats"); @@ -622,9 +622,9 @@ void advanceTransactionOnRecipient(OperationContext* opCtx, updateEntry.setUpsert(true); updateOp.setUpdates({updateEntry}); - auto passthroughFields = - BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority << "lsid" - << lsid.toBSON() << "txnNumber" << txnNumber); + auto passthroughFields = BSON(WriteConcernOptions::kWriteConcernField + << WriteConcernOptions::Majority << "lsid" << lsid.toBSON() + << "txnNumber" << currentTxnNumber + 1); sendToRecipient(opCtx, recipientId, updateOp, passthroughFields); } @@ -770,16 +770,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) { LOGV2(22039, "Recovering migration {doc}", "doc"_attr = doc.toBSON()); // Create a MigrationCoordinator to complete the coordination. 
- MigrationCoordinator coordinator(doc.getId(), - doc.getMigrationSessionId(), - doc.getLsid(), - doc.getDonorShardId(), - doc.getRecipientShardId(), - doc.getNss(), - doc.getCollectionUuid(), - doc.getRange(), - doc.getPreMigrationChunkVersion(), - false /* waitForDelete */); + MigrationCoordinator coordinator(doc); if (doc.getDecision()) { // The decision is already known. -- cgit v1.2.1