diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 10 | ||||
-rw-r--r-- | src/mongo/db/s/migration_coordinator.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/s/migration_coordinator.h | 11 | ||||
-rw-r--r-- | src/mongo/db/s/migration_coordinator_document.idl | 19 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/migration_session_id.h | 9 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.h | 9 |
10 files changed, 114 insertions, 21 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index cb4ebb53a29..00e17e8fe3a 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -35,6 +35,7 @@ namespace mongo { class BSONObj; +class MigrationSessionId; class OperationContext; class Status; class Timestamp; @@ -161,6 +162,12 @@ public: const repl::OpTime& opTime, const repl::OpTime& preImageOpTime) = 0; + /** + * Returns the migration session id associated with this cloner, so stale sessions can be + * disambiguated. + */ + virtual const MigrationSessionId& getSessionId() const = 0; + protected: MigrationChunkClonerSource(); }; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index d5f56e196ed..8e34e2033f3 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -119,16 +119,12 @@ public: const repl::OpTime& opTime, const repl::OpTime& preImageOpTime) override; - // Legacy cloner specific functionality - - /** - * Returns the migration session id associated with this cloner, so stale sessions can be - * disambiguated. - */ - const MigrationSessionId& getSessionId() const { + const MigrationSessionId& getSessionId() const override { return _sessionId; } + // Legacy cloner specific functionality + /** * Returns the rollback ID recorded at the beginning of session migration. If the underlying * SessionCatalogMigrationSource does not exist, that means this node is running as a standalone diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index 597ca085a45..aaacbc145c7 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -52,6 +52,8 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterAbortDecision); namespace migrationutil { MigrationCoordinator::MigrationCoordinator(UUID migrationId, + MigrationSessionId sessionId, + LogicalSessionId lsid, ShardId donorShard, ShardId recipientShard, NamespaceString collectionNamespace, @@ -60,6 +62,8 @@ MigrationCoordinator::MigrationCoordinator(UUID migrationId, ChunkVersion preMigrationChunkVersion, bool waitForDelete) : _migrationInfo(migrationId, + std::move(sessionId), + std::move(lsid), std::move(collectionNamespace), collectionUuid, std::move(donorShard), @@ -108,6 +112,7 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat << " to self and to recipient"; boost::optional<SemiFuture<void>> cleanupCompleteFuture = boost::none; + switch (*_decision) { case Decision::kAborted: _abortMigrationOnDonorAndRecipient(opCtx); @@ -118,7 +123,9 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet(); break; } + forgetMigration(opCtx); + return cleanupCompleteFuture; } @@ -129,6 +136,11 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient( LOG(0) << _logPrefix() << "Making commit decision durable"; migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId()); + LOG(0) << _logPrefix() << "Bumping transaction for " << _migrationInfo.getRecipientShardId() + << " lsid: " << _migrationInfo.getLsid().toBSON() << " txn: " << TxnNumber{1}; + migrationutil::advanceTransactionOnRecipient( + opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1}); + hangBeforeSendingCommitDecision.pauseWhileSet(); LOG(0) << _logPrefix() << "Deleting range deletion task on recipient"; @@ -156,6 +168,11 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* LOG(0) << _logPrefix() << "Making abort decision durable"; migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId()); + LOG(0) << _logPrefix() << "Bumping transaction for " << _migrationInfo.getRecipientShardId() + << " lsid: " << _migrationInfo.getLsid().toBSON() << " txn: " << TxnNumber{1}; + migrationutil::advanceTransactionOnRecipient( + opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1}); + hangBeforeSendingAbortDecision.pauseWhileSet(); LOG(0) << _logPrefix() << "Deleting range deletion task on donor"; diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h index be7e9074bf0..9b2ccf7e672 100644 --- a/src/mongo/db/s/migration_coordinator.h +++ b/src/mongo/db/s/migration_coordinator.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/logical_session_id.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_coordinator_document_gen.h" #include "mongo/s/catalog/type_chunk.h" @@ -45,6 +46,8 @@ public: enum class Decision { kAborted, kCommitted }; MigrationCoordinator(UUID migrationId, + MigrationSessionId sessionId, + LogicalSessionId lsid, ShardId donorShard, ShardId recipientShard, NamespaceString collectionNamespace, @@ -63,10 +66,10 @@ public: * Initializes persistent state required to ensure that orphaned ranges are properly handled, * even after failover, by doing the following: * - * 1) Inserts a document into the local config.migrationCoordinators with the recipientId and - * waits for majority writeConcern. 2) Inserts a document into the local config.rangeDeletions - * with the collectionUUID, range to delete, and "pending: true" and waits for majority - * writeConcern. + * 1) Inserts a document into the local config.migrationCoordinators with the lsid and + * recipientId and waits for majority writeConcern. 2) Inserts a document into the local + * config.rangeDeletions with the collectionUUID, range to delete, and "pending: true" and waits + * for majority writeConcern. */ void startMigration(OperationContext* opCtx); diff --git a/src/mongo/db/s/migration_coordinator_document.idl b/src/mongo/db/s/migration_coordinator_document.idl index c1001c0c748..5308a4545ae 100644 --- a/src/mongo/db/s/migration_coordinator_document.idl +++ b/src/mongo/db/s/migration_coordinator_document.idl @@ -31,12 +31,15 @@ global: cpp_namespace: "mongo" + cpp_includes: + - "mongo/db/s/migration_session_id.h" imports: - "mongo/idl/basic_types.idl" - "mongo/s/sharding_types.idl" - "mongo/s/chunk_range.idl" - "mongo/s/chunk_version.idl" + - "mongo/db/logical_session_id.idl" enums: Decision: @@ -46,6 +49,16 @@ enums: kCommitted: "committed" kAborted: "aborted" +types: + MigrationSessionId: + bson_serialization_type: string + description: "The migration session id is the legacy unique identifier for a particular + moveChunk command and is exchanged as part of all communication between the + donor and recipient shards." + cpp_type: "mongo::MigrationSessionId" + deserializer: "mongo::MigrationSessionId::fromString" + serializer: "mongo::MigrationSessionId::toString" + structs: migrationCoordinatorDocument: description: "Represents an in-progress migration on the migration donor." @@ -56,6 +69,12 @@ structs: type: uuid description: "A unique identifier for the migration." cpp_name: id + migrationSessionId: + type: MigrationSessionId + description: "A legacy unique identifier for the migration session." + lsid: + type: LogicalSessionId + description: "The sessionId to use to communicate with the recipient" nss: type: namespacestring description: "The namespace of the collection that the chunk belongs to." diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 8124f36a16f..48edd0d463c 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -218,6 +218,7 @@ MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep6); MONGO_FAIL_POINT_DEFINE(failMigrationOnRecipient); MONGO_FAIL_POINT_DEFINE(failMigrationReceivedOutOfRangeOperation); +MONGO_FAIL_POINT_DEFINE(hangOnRecipientFailure); } // namespace @@ -798,6 +799,8 @@ void MigrationDestinationManager::_migrateThread() { _migrateDriver(opCtx); } } catch (...) { + log() << "In catch handler"; + hangOnRecipientFailure.pauseWhileSet(); _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus())); } diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h index 923427822de..e3ad9deebdc 100644 --- a/src/mongo/db/s/migration_session_id.h +++ b/src/mongo/db/s/migration_session_id.h @@ -44,10 +44,12 @@ class StatusWith; /** * Encapsulates the logic for generating, parsing and comparing migration sessions. The migration * session id is a unique identifier for a particular moveChunk command and is exchanged as part of - * all communication between the source and donor shards. + * all communication between the donor and recipient shards. */ class MigrationSessionId { public: + MigrationSessionId() = default; + /** * Constructs a new migration session identifier with the following format: * DonorId_RecipientId_UniqueIdentifier @@ -78,6 +80,11 @@ public: std::string toString() const; + static MigrationSessionId fromString(StringData sessionId) { + MigrationSessionId id(sessionId.toString()); + return id; + } + private: explicit MigrationSessionId(std::string sessionId); diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 1e7319a92e9..505622e074d 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -300,6 +300,8 @@ Status MigrationSourceManager::startClone() { if (_enableResumableRangeDeleter) { _coordinator = std::make_unique<migrationutil::MigrationCoordinator>( migrationId, + _cloneDriver->getSessionId(), + _lsid, _args.getFromShardId(), _args.getToShardId(), getNss(), @@ -765,8 +767,6 @@ void MigrationSourceManager::_cleanup() { auto newOpCtx = newOpCtxPtr.get(); _cleanupCompleteFuture = _coordinator->completeMigration(newOpCtx); } - - LogicalSessionCache::get(_opCtx)->endSessions({_lsid}); } _state = kDone; diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 770dbdd0ea9..c7eb2183d85 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -83,17 +83,21 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::kNoTimeout); template <typename Cmd> -void sendToRecipient(OperationContext* opCtx, const ShardId& recipientId, const Cmd& cmd) { +void sendToRecipient(OperationContext* opCtx, + const ShardId& recipientId, + const Cmd& cmd, + const BSONObj& passthroughFields = {}) { auto recipientShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, recipientId)); - LOGV2_DEBUG(22023, 1, "Sending request {cmd} to recipient.", "cmd"_attr = cmd.toBSON({})); + auto cmdBSON = cmd.toBSON(passthroughFields); + LOGV2_DEBUG(22023, 1, "Sending request {cmd} to recipient.", "cmd"_attr = cmdBSON); auto response = recipientShard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "config", - cmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)), + cmd.getDbName().toString(), + cmdBSON, Shard::RetryPolicy::kIdempotent); uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(response)); @@ -416,7 +420,10 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx, false /*multi*/); deleteOp.setDeletes({query}); - sendToRecipient(opCtx, recipientId, deleteOp); + sendToRecipient(opCtx, + recipientId, + deleteOp, + BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); } void deleteRangeDeletionTaskLocally(OperationContext* opCtx, @@ -444,7 +451,30 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx, updateEntry.setUpsert(false); updateOp.setUpdates({updateEntry}); - sendToRecipient(opCtx, recipientId, updateOp); + sendToRecipient(opCtx, + recipientId, + updateOp, + BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); +} + +void advanceTransactionOnRecipient(OperationContext* opCtx, + const ShardId& recipientId, + const LogicalSessionId& lsid, + TxnNumber txnNumber) { + write_ops::Update updateOp(NamespaceString::kServerConfigurationNamespace); + auto queryFilter = BSON("_id" + << "migrationCoordinatorStats"); + auto updateModification = write_ops::UpdateModification(BSON("$inc" << BSON("count" << 1))); + + write_ops::UpdateOpEntry updateEntry(queryFilter, updateModification); + updateEntry.setMulti(false); + updateEntry.setUpsert(true); + updateOp.setUpdates({updateEntry}); + + auto passthroughFields = + BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority << "lsid" + << lsid.toBSON() << "txnNumber" << txnNumber); + sendToRecipient(opCtx, recipientId, updateOp, passthroughFields); } void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& migrationId) { @@ -610,7 +640,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) { // Wait for the latest OpTime to be majority committed to ensure any decision that is // read is on the true branch of history. // Note (Esha): I don't think this is strictly required for correctness, but it is - // is difficult to reason about, and being pessimistic by waiting for the decision to be + // difficult to reason about, and being pessimistic by waiting for the decision to be // majority committed does not cost much, since stepup should be rare. It *is* required // that this node ensure a decision that it itself recovers is majority committed. For // example, it is possible that this node is a stale primary, and the true primary has @@ -647,6 +677,8 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) { // Create a MigrationCoordinator to complete the coordination. MigrationCoordinator coordinator(doc.getId(), + doc.getMigrationSessionId(), + doc.getLsid(), doc.getDonorShardId(), doc.getRecipientShardId(), doc.getNss(), diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h index 1b47486159f..951b55e305e 100644 --- a/src/mongo/db/s/migration_util.h +++ b/src/mongo/db/s/migration_util.h @@ -161,6 +161,15 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx, const UUID& migrationId); /** + * Advances the optime for the current transaction by performing a write operation as a retryable + * write. This is to prevent a write of the deletion task once the decision has been recorded. + */ +void advanceTransactionOnRecipient(OperationContext* opCtx, + const ShardId& recipientId, + const LogicalSessionId& lsid, + TxnNumber txnNumber); + +/** * Removes the 'pending' flag from the range deletion task document with the specified id from * config.rangeDeletions and waits for majority write concern. This marks the range as ready for * deletion. |