author     Alex Taskov <alex.taskov@mongodb.com>             2020-02-19 18:45:49 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-02-20 14:19:15 +0000
commit     742fa16dc379a281ad3c10e8c196d5ab2de9f70e (patch)
tree       9e4c4700e921ca2cab58e9909ec764667f6e6ce9
parent     32f47846d78a4fdae9564b7ebb442d53e737d845 (diff)
SERVER-45744 Migration coordinator recovery task should bump the txnNumber on the session used to send _recvChunkStart
 jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js | 195
 jstests/sharding/migration_ignore_interrupts_3.js                        |   7
 jstests/sharding/migration_ignore_interrupts_4.js                        |   7
 jstests/sharding/move_jumbo_chunk.js                                     |   8
 src/mongo/db/s/migration_chunk_cloner_source.h                           |   7
 src/mongo/db/s/migration_chunk_cloner_source_legacy.h                    |  10
 src/mongo/db/s/migration_coordinator.cpp                                 |  17
 src/mongo/db/s/migration_coordinator.h                                   |  11
 src/mongo/db/s/migration_coordinator_document.idl                        |  19
 src/mongo/db/s/migration_destination_manager.cpp                         |   3
 src/mongo/db/s/migration_session_id.h                                    |   9
 src/mongo/db/s/migration_source_manager.cpp                              |   4
 src/mongo/db/s/migration_util.cpp                                        |  46
 src/mongo/db/s/migration_util.h                                          |   9
 14 files changed, 328 insertions(+), 24 deletions(-)
 create mode 100644 jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js
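Editorial note: the mechanism this change leans on is ordinary retryable-write bookkeeping. Once any retryable write has executed at txnNumber 1 on a session, the recipient rejects a later command that reuses the same lsid at txnNumber 0 with TransactionTooOld. The sketch below is not part of the commit; it is a minimal shell illustration that assumes a connection to a replica set or mongos (retryable writes are unavailable on a standalone), and the database name "test", collection "counters", and the _id values are arbitrary placeholders.

// Hypothetical shell sketch (not from this commit): a session id fabricated the way jstests do it.
const lsid = {id: UUID()};

// "Bump" the session by running a retryable write at txnNumber 1, which is what the donor now
// does against the recipient once its commit or abort decision is durable.
assert.commandWorked(db.getSiblingDB("test").runCommand({
    update: "counters",
    updates: [{q: {_id: "migrationCoordinatorStats"}, u: {$inc: {count: 1}}, upsert: true}],
    lsid: lsid,
    txnNumber: NumberLong(1)
}));

// Any later retryable write on the same session at the old txnNumber 0 is now rejected with
// TransactionTooOld -- the same check that stops a delayed _recvChunkStart from writing a fresh
// range deletion task on the recipient.
assert.commandFailedWithCode(db.getSiblingDB("test").runCommand({
    update: "counters",
    updates: [{q: {_id: "stale"}, u: {$set: {x: 1}}, upsert: true}],
    lsid: lsid,
    txnNumber: NumberLong(0)
}), ErrorCodes.TransactionTooOld);

In the commit itself, advanceTransactionOnRecipient() on the donor achieves the same effect by sending an $inc upsert to the recipient with the migration's lsid and txnNumber 1 attached as passthrough fields.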
diff --git a/jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js b/jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js
new file mode 100644
index 00000000000..f4e1520ddbb
--- /dev/null
+++ b/jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js
@@ -0,0 +1,195 @@
+/*
+ * Tests that the deletion task is not written if the donor recovers after the decision is
+ * recorded.
+ *
+ * @tags: [requires_fcv_44]
+ */
+
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/libs/parallel_shell_helpers.js');
+
+const dbName = "test";
+
+// Create 2 shards with 3 replicas each.
+let st = new ShardingTest({shards: {rs0: {nodes: 3}, rs1: {nodes: 3}}});
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName}));
+
+function getNewNs(dbName) {
+    if (typeof getNewNs.counter == 'undefined') {
+        getNewNs.counter = 0;
+    }
+    getNewNs.counter++;
+    const collName = "ns" + getNewNs.counter;
+    return [collName, dbName + "." + collName];
+}
+
+function moveChunkParallel(ns, toShard) {
+    return startParallelShell(funWithArgs(function(ns, toShard) {
+                                  db.adminCommand({moveChunk: ns, find: {x: 50}, to: toShard});
+                              }, ns, toShard), st.s.port);
+}
+
+function sendRecvChunkStart(conn, ns, id, sessionId, lsid, from, to) {
+    let cmd = {
+        _recvChunkStart: ns,
+        uuid: id,
+        lsid: lsid,
+        txnNumber: NumberLong(0),
+        sessionId: sessionId,
+        from: from.host,
+        fromShardName: from.shardName,
+        toShardName: to.shardName,
+        min: {x: 50.0},
+        max: {x: MaxKey},
+        shardKeyPattern: {x: 1.0}
+    };
+
+    return conn.getDB("admin").runCommand(cmd);
+}
+
+function sendRecvChunkStatus(conn, ns, sessionId) {
+    let cmd = {
+        _recvChunkStatus: ns,
+        waitForSteadyOrDone: true,
+        sessionId: sessionId,
+    };
+
+    return conn.getDB("admin").runCommand(cmd);
+}
+
+(() => {
+    jsTestLog(
+        "Test that recovering a migration coordination ensures a delayed _recvChunkStart does not \
+        cause the recipient to re-insert a range deletion task");
+
+    const [collName, ns] = getNewNs(dbName);
+
+    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
+    assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 50}}));
+
+    // Insert documents into both chunks on shard0.
+    let testColl = st.s.getDB(dbName).getCollection(collName);
+    for (let i = 0; i < 100; i++) {
+        testColl.insert({x: i});
+    }
+
+    let donorPrimary = st.rs0.getPrimary();
+    let failpoint = configureFailPoint(donorPrimary, 'moveChunkHangAtStep3');
+
+    // Move chunk [50, inf) to shard1.
+    const awaitShell = moveChunkParallel(ns, st.shard1.shardName);
+
+    failpoint.wait();
+
+    // Get the migration doc from the donor.
+    let migrationDocColl = donorPrimary.getDB("config").migrationCoordinators;
+    let migrationDoc = migrationDocColl.findOne();
+    jsTestLog("migration doc: " + tojson(migrationDoc));
+
+    // Step down current primary.
+    assert.commandWorked(donorPrimary.adminCommand({replSetStepDown: 60, force: 1}));
+
+    failpoint.off();
+    awaitShell();
+
+    // Recovery should have occurred. Get new primary and verify migration doc is deleted.
+    donorPrimary = st.rs0.getPrimary();
+    migrationDocColl = donorPrimary.getDB("config").migrationCoordinators;
+
+    assert.soon(() => {
+        return migrationDocColl.find().itcount() == 0;
+    });
+
+    // Simulate that the recipient received a delayed _recvChunkStart message by sending one
+    // directly to the recipient, and ensure that the _recvChunkStart fails with TransactionTooOld
+    // without inserting a new range deletion task. Since the business logic of _recvChunkStart
+    // is executed asynchronously, use _recvChunkStatus to check the result of the
+    // _recvChunkStart.
+    let recipientPrimary = st.rs1.getPrimary();
+    assert.commandWorked(sendRecvChunkStart(recipientPrimary,
+                                            ns,
+                                            migrationDoc._id,
+                                            migrationDoc.migrationSessionId,
+                                            migrationDoc.lsid,
+                                            st.shard0,
+                                            st.shard1));
+
+    assert.soon(() => {
+        let result = sendRecvChunkStatus(recipientPrimary, ns, migrationDoc.migrationSessionId);
+        jsTestLog("recvChunkStatus: " + tojson(result));
+
+        return result.state === "fail" &&
+            result.errmsg.startsWith("migrate failed: TransactionTooOld:");
+    });
+
+    // Verify deletion task doesn't exist on recipient.
+    assert.eq(recipientPrimary.getDB("config").rangeDeletions.find().itcount(), 0);
+})();
+
+(() => {
+    jsTestLog(
+        "Test that completing a migration coordination at the end of moveChunk ensures a delayed \
+        _recvChunkStart does not cause the recipient to re-insert a range deletion task");
+
+    const [collName, ns] = getNewNs(dbName);
+
+    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
+    assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 50}}));
+
+    // Insert documents into both chunks on shard0.
+    let testColl = st.s.getDB(dbName).getCollection(collName);
+    for (let i = 0; i < 100; i++) {
+        testColl.insert({x: i});
+    }
+
+    let donorPrimary = st.rs0.getPrimary();
+    let failpoint = configureFailPoint(donorPrimary, 'moveChunkHangAtStep3');
+
+    // Move chunk [50, inf) to shard1.
+    const awaitShell = moveChunkParallel(ns, st.shard1.shardName);
+
+    failpoint.wait();
+
+    // Get the migration doc from the donor.
+    let migrationDocColl = donorPrimary.getDB("config").migrationCoordinators;
+    let migrationDoc = migrationDocColl.findOne();
+    jsTestLog("migration doc: " + tojson(migrationDoc));
+
+    failpoint.off();
+    awaitShell();
+
+    // Simulate that the recipient received a delayed _recvChunkStart message by sending one
+    // directly to the recipient, and ensure that the _recvChunkStart fails with TransactionTooOld
+    // without inserting a new range deletion task. Since the business logic of _recvChunkStart
+    // is executed asynchronously, use _recvChunkStatus to check the result of the
+    // _recvChunkStart.
+    let recipientPrimary = st.rs1.getPrimary();
+    assert.commandWorked(sendRecvChunkStart(recipientPrimary,
+                                            ns,
+                                            migrationDoc._id,
+                                            migrationDoc.migrationSessionId,
+                                            migrationDoc.lsid,
+                                            st.shard0,
+                                            st.shard1));
+
+    assert.soon(() => {
+        let result = sendRecvChunkStatus(recipientPrimary, ns, migrationDoc.migrationSessionId);
+        jsTestLog("recvChunkStatus: " + tojson(result));
+
+        return result.state === "fail" &&
+            result.errmsg.startsWith("migrate failed: TransactionTooOld:");
+    });
+
+    // Verify deletion task doesn't exist on recipient.
+    assert.eq(recipientPrimary.getDB("config").rangeDeletions.find().itcount(), 0);
+})();
+
+st.stop();
+})();
diff --git a/jstests/sharding/migration_ignore_interrupts_3.js b/jstests/sharding/migration_ignore_interrupts_3.js
index 0b5b6a99b1c..98cd13bf69e 100644
--- a/jstests/sharding/migration_ignore_interrupts_3.js
+++ b/jstests/sharding/migration_ignore_interrupts_3.js
@@ -4,6 +4,9 @@
 //
 // Note: don't use coll1 in this test after a coll1 migration is interrupted -- the distlock isn't
 // released promptly when interrupted.
+// TODO(SERVER-46230): The requires_fcv_44 tag can be removed when the disableResumableRangeDeleter
+// option is no longer needed.
+// @tags: [requires_fcv_44]
 
 load('./jstests/libs/chunk_manipulation_util.js');
 
@@ -12,7 +15,9 @@ load('./jstests/libs/chunk_manipulation_util.js');
 
 var staticMongod = MongoRunner.runMongod({});  // For startParallelOps.
 
-var st = new ShardingTest({shards: 3});
+// TODO(SERVER-46230): Update test to run with resumable range deleter enabled.
+var st = new ShardingTest(
+    {shards: 3, shardOptions: {setParameter: {"disableResumableRangeDeleter": true}}});
 
 var mongos = st.s0, admin = mongos.getDB('admin'), dbName = "testDB", ns1 = dbName + ".foo",
     ns2 = dbName + ".bar", coll1 = mongos.getCollection(ns1), coll2 = mongos.getCollection(ns2),
diff --git a/jstests/sharding/migration_ignore_interrupts_4.js b/jstests/sharding/migration_ignore_interrupts_4.js
index 7554b2ec3ae..2fdb223daca 100644
--- a/jstests/sharding/migration_ignore_interrupts_4.js
+++ b/jstests/sharding/migration_ignore_interrupts_4.js
@@ -4,6 +4,9 @@
 //
 // Note: don't use coll1 in this test after a coll1 migration is interrupted -- the distlock isn't
 // released promptly when interrupted.
+// TODO(SERVER-46230): The requires_fcv_44 tag can be removed when the disableResumableRangeDeleter
+// option is no longer needed.
+// @tags: [requires_fcv_44]
 
 load('./jstests/libs/chunk_manipulation_util.js');
 
@@ -12,7 +15,9 @@ load('./jstests/libs/chunk_manipulation_util.js');
 
 var staticMongod = MongoRunner.runMongod({});  // For startParallelOps.
 
-var st = new ShardingTest({shards: 3});
+// TODO(SERVER-46230): Update test to run with resumable range deleter enabled.
+var st = new ShardingTest(
+    {shards: 3, shardOptions: {setParameter: {"disableResumableRangeDeleter": true}}});
 
 var mongos = st.s0, admin = mongos.getDB('admin'), dbName = "testDB", ns1 = dbName + ".foo",
     ns2 = dbName + ".bar", coll1 = mongos.getCollection(ns1), coll2 = mongos.getCollection(ns2),
diff --git a/jstests/sharding/move_jumbo_chunk.js b/jstests/sharding/move_jumbo_chunk.js
index e9b25af7971..22e413d730f 100644
--- a/jstests/sharding/move_jumbo_chunk.js
+++ b/jstests/sharding/move_jumbo_chunk.js
@@ -9,7 +9,13 @@
 (function() {
 'use strict';
 
-let st = new ShardingTest({shards: 2, mongos: 1, other: {chunkSize: 1}});
+// TODO(SERVER-46230): Update test to run with resumable range deleter enabled.
+let st = new ShardingTest({
+    shards: 2,
+    mongos: 1,
+    other: {chunkSize: 1},
+    shardOptions: {setParameter: {"disableResumableRangeDeleter": true}}
+});
 
 let kDbName = "test";
 assert.commandWorked(st.s.adminCommand({enablesharding: kDbName}));
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index cb4ebb53a29..00e17e8fe3a 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -35,6 +35,7 @@
 namespace mongo {
 
 class BSONObj;
+class MigrationSessionId;
 class OperationContext;
 class Status;
 class Timestamp;
@@ -161,6 +162,12 @@ public:
                                          const repl::OpTime& opTime,
                                          const repl::OpTime& preImageOpTime) = 0;
 
+    /**
+     * Returns the migration session id associated with this cloner, so stale sessions can be
+     * disambiguated.
+     */
+    virtual const MigrationSessionId& getSessionId() const = 0;
+
 protected:
     MigrationChunkClonerSource();
 };
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index d5f56e196ed..8e34e2033f3 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -119,16 +119,12 @@ public:
                       const repl::OpTime& opTime,
                       const repl::OpTime& preImageOpTime) override;
 
-    // Legacy cloner specific functionality
-
-    /**
-     * Returns the migration session id associated with this cloner, so stale sessions can be
-     * disambiguated.
-     */
-    const MigrationSessionId& getSessionId() const {
+    const MigrationSessionId& getSessionId() const override {
         return _sessionId;
     }
 
+    // Legacy cloner specific functionality
+
     /**
      * Returns the rollback ID recorded at the beginning of session migration. If the underlying
      * SessionCatalogMigrationSource does not exist, that means this node is running as a standalone
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index 597ca085a45..aaacbc145c7 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -52,6 +52,8 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterAbortDecision);
 namespace migrationutil {
 
 MigrationCoordinator::MigrationCoordinator(UUID migrationId,
+                                           MigrationSessionId sessionId,
+                                           LogicalSessionId lsid,
                                            ShardId donorShard,
                                            ShardId recipientShard,
                                            NamespaceString collectionNamespace,
@@ -60,6 +62,8 @@ MigrationCoordinator::MigrationCoordinator(UUID migrationId,
                                            ChunkVersion preMigrationChunkVersion,
                                            bool waitForDelete)
     : _migrationInfo(migrationId,
+                     std::move(sessionId),
+                     std::move(lsid),
                      std::move(collectionNamespace),
                      collectionUuid,
                      std::move(donorShard),
@@ -108,6 +112,7 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat
           << " to self and to recipient";
 
     boost::optional<SemiFuture<void>> cleanupCompleteFuture = boost::none;
+
     switch (*_decision) {
         case Decision::kAborted:
             _abortMigrationOnDonorAndRecipient(opCtx);
@@ -118,7 +123,9 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat
             hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet();
             break;
     }
+
     forgetMigration(opCtx);
+
     return cleanupCompleteFuture;
 }
 
@@ -129,6 +136,11 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
     LOG(0) << _logPrefix() << "Making commit decision durable";
     migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId());
 
+    LOG(0) << _logPrefix() << "Bumping transaction for " << _migrationInfo.getRecipientShardId()
+           << " lsid: " << _migrationInfo.getLsid().toBSON() << " txn: " << TxnNumber{1};
+    migrationutil::advanceTransactionOnRecipient(
+        opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1});
+
     hangBeforeSendingCommitDecision.pauseWhileSet();
 
     LOG(0) << _logPrefix() << "Deleting range deletion task on recipient";
@@ -156,6 +168,11 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
     LOG(0) << _logPrefix() << "Making abort decision durable";
     migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId());
 
+    LOG(0) << _logPrefix() << "Bumping transaction for " << _migrationInfo.getRecipientShardId()
+           << " lsid: " << _migrationInfo.getLsid().toBSON() << " txn: " << TxnNumber{1};
+    migrationutil::advanceTransactionOnRecipient(
+        opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1});
+
     hangBeforeSendingAbortDecision.pauseWhileSet();
 
     LOG(0) << _logPrefix() << "Deleting range deletion task on donor";
diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h
index be7e9074bf0..9b2ccf7e672 100644
--- a/src/mongo/db/s/migration_coordinator.h
+++ b/src/mongo/db/s/migration_coordinator.h
@@ -29,6 +29,7 @@
 
 #pragma once
 
+#include "mongo/db/logical_session_id.h"
 #include "mongo/db/s/collection_sharding_runtime.h"
 #include "mongo/db/s/migration_coordinator_document_gen.h"
 #include "mongo/s/catalog/type_chunk.h"
@@ -45,6 +46,8 @@ public:
     enum class Decision { kAborted, kCommitted };
 
     MigrationCoordinator(UUID migrationId,
+                         MigrationSessionId sessionId,
+                         LogicalSessionId lsid,
                          ShardId donorShard,
                          ShardId recipientShard,
                          NamespaceString collectionNamespace,
@@ -63,10 +66,10 @@ public:
      * Initializes persistent state required to ensure that orphaned ranges are properly handled,
      * even after failover, by doing the following:
      *
-     * 1) Inserts a document into the local config.migrationCoordinators with the recipientId and
-     * waits for majority writeConcern. 2) Inserts a document into the local config.rangeDeletions
-     * with the collectionUUID, range to delete, and "pending: true" and waits for majority
-     * writeConcern.
+     * 1) Inserts a document into the local config.migrationCoordinators with the lsid and
+     * recipientId and waits for majority writeConcern. 2) Inserts a document into the local
+     * config.rangeDeletions with the collectionUUID, range to delete, and "pending: true" and waits
+     * for majority writeConcern.
      */
     void startMigration(OperationContext* opCtx);
 
diff --git a/src/mongo/db/s/migration_coordinator_document.idl b/src/mongo/db/s/migration_coordinator_document.idl
index c1001c0c748..5308a4545ae 100644
--- a/src/mongo/db/s/migration_coordinator_document.idl
+++ b/src/mongo/db/s/migration_coordinator_document.idl
@@ -31,12 +31,15 @@
 
 global:
     cpp_namespace: "mongo"
+    cpp_includes:
+        - "mongo/db/s/migration_session_id.h"
 
 imports:
     - "mongo/idl/basic_types.idl"
     - "mongo/s/sharding_types.idl"
     - "mongo/s/chunk_range.idl"
     - "mongo/s/chunk_version.idl"
+    - "mongo/db/logical_session_id.idl"
 
 enums:
     Decision:
@@ -46,6 +49,16 @@ enums:
             kCommitted: "committed"
             kAborted: "aborted"
 
+types:
+    MigrationSessionId:
+        bson_serialization_type: string
+        description: "The migration session id is the legacy unique identifier for a particular
+                      moveChunk command and is exchanged as part of all communication between the
+                      donor and recipient shards."
+        cpp_type: "mongo::MigrationSessionId"
+        deserializer: "mongo::MigrationSessionId::fromString"
+        serializer: "mongo::MigrationSessionId::toString"
+
 structs:
     migrationCoordinatorDocument:
         description: "Represents an in-progress migration on the migration donor."
@@ -56,6 +69,12 @@ structs:
                 type: uuid
                 description: "A unique identifier for the migration."
                 cpp_name: id
+            migrationSessionId:
+                type: MigrationSessionId
+                description: "A legacy unique identifier for the migration session."
+            lsid:
+                type: LogicalSessionId
+                description: "The sessionId to use to communicate with the recipient"
             nss:
                 type: namespacestring
                 description: "The namespace of the collection that the chunk belongs to."
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 8124f36a16f..48edd0d463c 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -218,6 +218,7 @@ MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep6);
 
 MONGO_FAIL_POINT_DEFINE(failMigrationOnRecipient);
 MONGO_FAIL_POINT_DEFINE(failMigrationReceivedOutOfRangeOperation);
+MONGO_FAIL_POINT_DEFINE(hangOnRecipientFailure);
 
 }  // namespace
 
@@ -798,6 +799,8 @@ void MigrationDestinationManager::_migrateThread() {
             _migrateDriver(opCtx);
         }
     } catch (...) {
+        log() << "In catch handler";
+        hangOnRecipientFailure.pauseWhileSet();
         _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
     }
 
diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h
index 923427822de..e3ad9deebdc 100644
--- a/src/mongo/db/s/migration_session_id.h
+++ b/src/mongo/db/s/migration_session_id.h
@@ -44,10 +44,12 @@ class StatusWith;
 /**
  * Encapsulates the logic for generating, parsing and comparing migration sessions. The migration
  * session id is a unique identifier for a particular moveChunk command and is exchanged as part of
- * all communication between the source and donor shards.
+ * all communication between the donor and recipient shards.
  */
 class MigrationSessionId {
 public:
+    MigrationSessionId() = default;
+
     /**
      * Constructs a new migration session identifier with the following format:
      *     DonorId_RecipientId_UniqueIdentifier
@@ -78,6 +80,11 @@ public:
 
     std::string toString() const;
 
+    static MigrationSessionId fromString(StringData sessionId) {
+        MigrationSessionId id(sessionId.toString());
+        return id;
+    }
+
 private:
     explicit MigrationSessionId(std::string sessionId);
 
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 1e7319a92e9..505622e074d 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -300,6 +300,8 @@ Status MigrationSourceManager::startClone() {
     if (_enableResumableRangeDeleter) {
         _coordinator = std::make_unique<migrationutil::MigrationCoordinator>(
             migrationId,
+            _cloneDriver->getSessionId(),
+            _lsid,
             _args.getFromShardId(),
             _args.getToShardId(),
             getNss(),
@@ -765,8 +767,6 @@ void MigrationSourceManager::_cleanup() {
             auto newOpCtx = newOpCtxPtr.get();
             _cleanupCompleteFuture = _coordinator->completeMigration(newOpCtx);
         }
-
-        LogicalSessionCache::get(_opCtx)->endSessions({_lsid});
     }
 
     _state = kDone;
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 770dbdd0ea9..c7eb2183d85 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -83,17 +83,21 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
                                                 WriteConcernOptions::kNoTimeout);
 
 template <typename Cmd>
-void sendToRecipient(OperationContext* opCtx, const ShardId& recipientId, const Cmd& cmd) {
+void sendToRecipient(OperationContext* opCtx,
+                     const ShardId& recipientId,
+                     const Cmd& cmd,
+                     const BSONObj& passthroughFields = {}) {
     auto recipientShard =
         uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, recipientId));
 
-    LOGV2_DEBUG(22023, 1, "Sending request {cmd} to recipient.", "cmd"_attr = cmd.toBSON({}));
+    auto cmdBSON = cmd.toBSON(passthroughFields);
+    LOGV2_DEBUG(22023, 1, "Sending request {cmd} to recipient.", "cmd"_attr = cmdBSON);
 
     auto response = recipientShard->runCommandWithFixedRetryAttempts(
         opCtx,
        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-        "config",
-        cmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)),
+        cmd.getDbName().toString(),
+        cmdBSON,
         Shard::RetryPolicy::kIdempotent);
 
     uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(response));
@@ -416,7 +420,10 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
                                         false /*multi*/);
     deleteOp.setDeletes({query});
 
-    sendToRecipient(opCtx, recipientId, deleteOp);
+    sendToRecipient(opCtx,
+                    recipientId,
+                    deleteOp,
+                    BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
 }
 
 void deleteRangeDeletionTaskLocally(OperationContext* opCtx,
@@ -444,7 +451,30 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
     updateEntry.setUpsert(false);
     updateOp.setUpdates({updateEntry});
 
-    sendToRecipient(opCtx, recipientId, updateOp);
+    sendToRecipient(opCtx,
+                    recipientId,
+                    updateOp,
+                    BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+}
+
+void advanceTransactionOnRecipient(OperationContext* opCtx,
+                                   const ShardId& recipientId,
+                                   const LogicalSessionId& lsid,
+                                   TxnNumber txnNumber) {
+    write_ops::Update updateOp(NamespaceString::kServerConfigurationNamespace);
+    auto queryFilter = BSON("_id"
+                            << "migrationCoordinatorStats");
+    auto updateModification = write_ops::UpdateModification(BSON("$inc" << BSON("count" << 1)));
+
+    write_ops::UpdateOpEntry updateEntry(queryFilter, updateModification);
+    updateEntry.setMulti(false);
+    updateEntry.setUpsert(true);
+    updateOp.setUpdates({updateEntry});
+
+    auto passthroughFields =
+        BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority << "lsid"
+                                                     << lsid.toBSON() << "txnNumber" << txnNumber);
+    sendToRecipient(opCtx, recipientId, updateOp, passthroughFields);
 }
 
 void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& migrationId) {
@@ -610,7 +640,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
             // Wait for the latest OpTime to be majority committed to ensure any decision that is
             // read is on the true branch of history.
             // Note (Esha): I don't think this is strictly required for correctness, but it is
-            // is difficult to reason about, and being pessimistic by waiting for the decision to be
+            // difficult to reason about, and being pessimistic by waiting for the decision to be
            // majority committed does not cost much, since stepup should be rare. It *is* required
             // that this node ensure a decision that it itself recovers is majority committed. For
             // example, it is possible that this node is a stale primary, and the true primary has
@@ -647,6 +677,8 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
 
             // Create a MigrationCoordinator to complete the coordination.
             MigrationCoordinator coordinator(doc.getId(),
+                                             doc.getMigrationSessionId(),
+                                             doc.getLsid(),
                                              doc.getDonorShardId(),
                                              doc.getRecipientShardId(),
                                              doc.getNss(),
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index 1b47486159f..951b55e305e 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -161,6 +161,15 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
                                         const UUID& migrationId);
 
 /**
+ * Advances the optime for the current transaction by performing a write operation as a retryable
+ * write. This is to prevent a write of the deletion task once the decision has been recorded.
+ */
+void advanceTransactionOnRecipient(OperationContext* opCtx,
+                                   const ShardId& recipientId,
+                                   const LogicalSessionId& lsid,
+                                   TxnNumber txnNumber);
+
+/**
 * Removes the 'pending' flag from the range deletion task document with the specified id from
 * config.rangeDeletions and waits for majority write concern. This marks the range as ready for
 * deletion.