author     Alex Taskov <alex.taskov@mongodb.com>             2020-02-19 18:45:49 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-02-20 14:19:15 +0000
commit     742fa16dc379a281ad3c10e8c196d5ab2de9f70e (patch)
tree       9e4c4700e921ca2cab58e9909ec764667f6e6ce9
parent     32f47846d78a4fdae9564b7ebb442d53e737d845 (diff)
SERVER-45744 Migration coordinator recovery task should bump the txnNumber on the session used to send _recvChunkStart
create mode 100644 jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js
-rw-r--r--  jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js | 195
-rw-r--r--  jstests/sharding/migration_ignore_interrupts_3.js | 7
-rw-r--r--  jstests/sharding/migration_ignore_interrupts_4.js | 7
-rw-r--r--  jstests/sharding/move_jumbo_chunk.js | 8
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source.h | 7
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 10
-rw-r--r--  src/mongo/db/s/migration_coordinator.cpp | 17
-rw-r--r--  src/mongo/db/s/migration_coordinator.h | 11
-rw-r--r--  src/mongo/db/s/migration_coordinator_document.idl | 19
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp | 3
-rw-r--r--  src/mongo/db/s/migration_session_id.h | 9
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp | 4
-rw-r--r--  src/mongo/db/s/migration_util.cpp | 46
-rw-r--r--  src/mongo/db/s/migration_util.h | 9
14 files changed, 328 insertions(+), 24 deletions(-)
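
The fix leans on the retryable-write protocol itself: once any write has run on a session at txnNumber 1, the recipient rejects commands arriving on that same session with the older txnNumber 0, failing them with TransactionTooOld before they do any work. A minimal shell sketch of that behaviour (illustrative only, not the server's internal code; it assumes a connection to a replica set or sharded cluster and uses a throwaway collection name):

    // Any retryable write at txnNumber 1 "bumps" the session on the server.
    const lsid = {id: UUID()};
    assert.commandWorked(db.getSiblingDB("test").runCommand({
        insert: "txnBumpDemo",
        documents: [{_id: 1}],
        lsid: lsid,
        txnNumber: NumberLong(1)
    }));

    // A delayed command reusing the same session at the stale txnNumber 0 is
    // rejected with TransactionTooOld instead of performing its write.
    const res = db.getSiblingDB("test").runCommand({
        insert: "txnBumpDemo",
        documents: [{_id: 2}],
        lsid: lsid,
        txnNumber: NumberLong(0)
    });
    assert.commandFailedWithCode(res, ErrorCodes.TransactionTooOld);

This is what the delayed _recvChunkStart runs into in the test below: the recovered (or completed) migration coordinator has already bumped the recipient side of the session to txnNumber 1, so the stale start request fails before it can insert a fresh range deletion task.
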
diff --git a/jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js b/jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js
new file mode 100644
index 00000000000..f4e1520ddbb
--- /dev/null
+++ b/jstests/sharding/bump_transaction_prevents_extra_deletion_task_write.js
@@ -0,0 +1,195 @@
+/*
+ * Tests that the deletion task is not written if the donor recovers after the decision is
+ * recorded.
+ *
+ * @tags: [requires_fcv_44]
+ */
+
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/libs/parallel_shell_helpers.js');
+
+const dbName = "test";
+
+// Create 2 shards with 3 replicas each.
+let st = new ShardingTest({shards: {rs0: {nodes: 3}, rs1: {nodes: 3}}});
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName}));
+
+function getNewNs(dbName) {
+ if (typeof getNewNs.counter == 'undefined') {
+ getNewNs.counter = 0;
+ }
+ getNewNs.counter++;
+ const collName = "ns" + getNewNs.counter;
+ return [collName, dbName + "." + collName];
+}
+
+function moveChunkParallel(ns, toShard) {
+ return startParallelShell(funWithArgs(function(ns, toShard) {
+ db.adminCommand({moveChunk: ns, find: {x: 50}, to: toShard});
+ }, ns, toShard), st.s.port);
+}
+
+function sendRecvChunkStart(conn, ns, id, sessionId, lsid, from, to) {
+ let cmd = {
+ _recvChunkStart: ns,
+ uuid: id,
+ lsid: lsid,
+ txnNumber: NumberLong(0),
+ sessionId: sessionId,
+ from: from.host,
+ fromShardName: from.shardName,
+ toShardName: to.shardName,
+ min: {x: 50.0},
+ max: {x: MaxKey},
+ shardKeyPattern: {x: 1.0}
+ };
+
+ return conn.getDB("admin").runCommand(cmd);
+}
+
+function sendRecvChunkStatus(conn, ns, sessionId) {
+ let cmd = {
+ _recvChunkStatus: ns,
+ waitForSteadyOrDone: true,
+ sessionId: sessionId,
+ };
+
+ return conn.getDB("admin").runCommand(cmd);
+}
+
+(() => {
+ jsTestLog(
+ "Test that recovering a migration coordination ensures a delayed _recvChunkStart does not \
+ cause the recipient to re-insert a range deletion task");
+
+ const [collName, ns] = getNewNs(dbName);
+
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
+ assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 50}}));
+
+ // Insert documents into both chunks on shard0.
+ let testColl = st.s.getDB(dbName).getCollection(collName);
+ for (let i = 0; i < 100; i++) {
+ testColl.insert({x: i});
+ }
+
+ let donorPrimary = st.rs0.getPrimary();
+ let failpoint = configureFailPoint(donorPrimary, 'moveChunkHangAtStep3');
+
+ // Move chunk [50, inf) to shard1.
+ const awaitShell = moveChunkParallel(ns, st.shard1.shardName);
+
+ failpoint.wait();
+
+ // Get the migration doc from the donor.
+ let migrationDocColl = donorPrimary.getDB("config").migrationCoordinators;
+ let migrationDoc = migrationDocColl.findOne();
+ jsTestLog("migration doc: " + tojson(migrationDoc));
+
+ // Step down current primary.
+ assert.commandWorked(donorPrimary.adminCommand({replSetStepDown: 60, force: 1}));
+
+ failpoint.off();
+ awaitShell();
+
+ // Recovery should have occurred. Get new primary and verify migration doc is deleted.
+ donorPrimary = st.rs0.getPrimary();
+ migrationDocColl = donorPrimary.getDB("config").migrationCoordinators;
+
+ assert.soon(() => {
+ return migrationDocColl.find().itcount() == 0;
+ });
+
+ // Simulate that the recipient received a delayed _recvChunkStart message by sending one
+ // directly to the recipient, and ensure that the _recvChunkStart fails with TransactionTooOld
+    // without inserting a new range deletion task. Since the business logic of
+    // _recvChunkStart is executed asynchronously, use _recvChunkStatus to check the
+    // result of the _recvChunkStart.
+ let recipientPrimary = st.rs1.getPrimary();
+ assert.commandWorked(sendRecvChunkStart(recipientPrimary,
+ ns,
+ migrationDoc._id,
+ migrationDoc.migrationSessionId,
+ migrationDoc.lsid,
+ st.shard0,
+ st.shard1));
+
+ assert.soon(() => {
+ let result = sendRecvChunkStatus(recipientPrimary, ns, migrationDoc.migrationSessionId);
+ jsTestLog("recvChunkStatus: " + tojson(result));
+
+ return result.state === "fail" &&
+ result.errmsg.startsWith("migrate failed: TransactionTooOld:");
+ });
+
+ // Verify deletion task doesn't exist on recipient.
+ assert.eq(recipientPrimary.getDB("config").rangeDeletions.find().itcount(), 0);
+})();
+
+(() => {
+ jsTestLog(
+ "Test that completing a migration coordination at the end of moveChunk ensures a delayed \
+        _recvChunkStart does not cause the recipient to re-insert a range deletion task");
+
+ const [collName, ns] = getNewNs(dbName);
+
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
+ assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 50}}));
+
+ // Insert documents into both chunks on shard0.
+ let testColl = st.s.getDB(dbName).getCollection(collName);
+ for (let i = 0; i < 100; i++) {
+ testColl.insert({x: i});
+ }
+
+ let donorPrimary = st.rs0.getPrimary();
+ let failpoint = configureFailPoint(donorPrimary, 'moveChunkHangAtStep3');
+
+ // Move chunk [50, inf) to shard1.
+ const awaitShell = moveChunkParallel(ns, st.shard1.shardName);
+
+ failpoint.wait();
+
+ // Get the migration doc from the donor.
+ let migrationDocColl = donorPrimary.getDB("config").migrationCoordinators;
+ let migrationDoc = migrationDocColl.findOne();
+ jsTestLog("migration doc: " + tojson(migrationDoc));
+
+ failpoint.off();
+ awaitShell();
+
+ // Simulate that the recipient received a delayed _recvChunkStart message by sending one
+ // directly to the recipient, and ensure that the _recvChunkStart fails with TransactionTooOld
+    // without inserting a new range deletion task. Since the business logic of
+    // _recvChunkStart is executed asynchronously, use _recvChunkStatus to check the
+    // result of the _recvChunkStart.
+ let recipientPrimary = st.rs1.getPrimary();
+ assert.commandWorked(sendRecvChunkStart(recipientPrimary,
+ ns,
+ migrationDoc._id,
+ migrationDoc.migrationSessionId,
+ migrationDoc.lsid,
+ st.shard0,
+ st.shard1));
+
+ assert.soon(() => {
+ let result = sendRecvChunkStatus(recipientPrimary, ns, migrationDoc.migrationSessionId);
+ jsTestLog("recvChunkStatus: " + tojson(result));
+
+ return result.state === "fail" &&
+ result.errmsg.startsWith("migrate failed: TransactionTooOld:");
+ });
+
+ // Verify deletion task doesn't exist on recipient.
+ assert.eq(recipientPrimary.getDB("config").rangeDeletions.find().itcount(), 0);
+})();
+
+st.stop();
+})();
diff --git a/jstests/sharding/migration_ignore_interrupts_3.js b/jstests/sharding/migration_ignore_interrupts_3.js
index 0b5b6a99b1c..98cd13bf69e 100644
--- a/jstests/sharding/migration_ignore_interrupts_3.js
+++ b/jstests/sharding/migration_ignore_interrupts_3.js
@@ -4,6 +4,9 @@
//
// Note: don't use coll1 in this test after a coll1 migration is interrupted -- the distlock isn't
// released promptly when interrupted.
+// TODO(SERVER-46230): The requires_fcv_44 tag can be removed when the disableResumableRangeDeleter
+// option is no longer needed.
+// @tags: [requires_fcv_44]
load('./jstests/libs/chunk_manipulation_util.js');
@@ -12,7 +15,9 @@ load('./jstests/libs/chunk_manipulation_util.js');
var staticMongod = MongoRunner.runMongod({}); // For startParallelOps.
-var st = new ShardingTest({shards: 3});
+// TODO(SERVER-46230): Update test to run with resumable range deleter enabled.
+var st = new ShardingTest(
+ {shards: 3, shardOptions: {setParameter: {"disableResumableRangeDeleter": true}}});
var mongos = st.s0, admin = mongos.getDB('admin'), dbName = "testDB", ns1 = dbName + ".foo",
ns2 = dbName + ".bar", coll1 = mongos.getCollection(ns1), coll2 = mongos.getCollection(ns2),
diff --git a/jstests/sharding/migration_ignore_interrupts_4.js b/jstests/sharding/migration_ignore_interrupts_4.js
index 7554b2ec3ae..2fdb223daca 100644
--- a/jstests/sharding/migration_ignore_interrupts_4.js
+++ b/jstests/sharding/migration_ignore_interrupts_4.js
@@ -4,6 +4,9 @@
//
// Note: don't use coll1 in this test after a coll1 migration is interrupted -- the distlock isn't
// released promptly when interrupted.
+// TODO(SERVER-46230): The requires_fcv_44 tag can be removed when the disableResumableRangeDeleter
+// option is no longer needed.
+// @tags: [requires_fcv_44]
load('./jstests/libs/chunk_manipulation_util.js');
@@ -12,7 +15,9 @@ load('./jstests/libs/chunk_manipulation_util.js');
var staticMongod = MongoRunner.runMongod({}); // For startParallelOps.
-var st = new ShardingTest({shards: 3});
+// TODO(SERVER-46230): Update test to run with resumable range deleter enabled.
+var st = new ShardingTest(
+ {shards: 3, shardOptions: {setParameter: {"disableResumableRangeDeleter": true}}});
var mongos = st.s0, admin = mongos.getDB('admin'), dbName = "testDB", ns1 = dbName + ".foo",
ns2 = dbName + ".bar", coll1 = mongos.getCollection(ns1), coll2 = mongos.getCollection(ns2),
diff --git a/jstests/sharding/move_jumbo_chunk.js b/jstests/sharding/move_jumbo_chunk.js
index e9b25af7971..22e413d730f 100644
--- a/jstests/sharding/move_jumbo_chunk.js
+++ b/jstests/sharding/move_jumbo_chunk.js
@@ -9,7 +9,13 @@
(function() {
'use strict';
-let st = new ShardingTest({shards: 2, mongos: 1, other: {chunkSize: 1}});
+// TODO(SERVER-46230): Update test to run with resumable range deleter enabled.
+let st = new ShardingTest({
+ shards: 2,
+ mongos: 1,
+ other: {chunkSize: 1},
+ shardOptions: {setParameter: {"disableResumableRangeDeleter": true}}
+});
let kDbName = "test";
assert.commandWorked(st.s.adminCommand({enablesharding: kDbName}));
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index cb4ebb53a29..00e17e8fe3a 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -35,6 +35,7 @@
namespace mongo {
class BSONObj;
+class MigrationSessionId;
class OperationContext;
class Status;
class Timestamp;
@@ -161,6 +162,12 @@ public:
const repl::OpTime& opTime,
const repl::OpTime& preImageOpTime) = 0;
+ /**
+ * Returns the migration session id associated with this cloner, so stale sessions can be
+ * disambiguated.
+ */
+ virtual const MigrationSessionId& getSessionId() const = 0;
+
protected:
MigrationChunkClonerSource();
};
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index d5f56e196ed..8e34e2033f3 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -119,16 +119,12 @@ public:
const repl::OpTime& opTime,
const repl::OpTime& preImageOpTime) override;
- // Legacy cloner specific functionality
-
- /**
- * Returns the migration session id associated with this cloner, so stale sessions can be
- * disambiguated.
- */
- const MigrationSessionId& getSessionId() const {
+ const MigrationSessionId& getSessionId() const override {
return _sessionId;
}
+ // Legacy cloner specific functionality
+
/**
* Returns the rollback ID recorded at the beginning of session migration. If the underlying
* SessionCatalogMigrationSource does not exist, that means this node is running as a standalone
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index 597ca085a45..aaacbc145c7 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -52,6 +52,8 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterAbortDecision);
namespace migrationutil {
MigrationCoordinator::MigrationCoordinator(UUID migrationId,
+ MigrationSessionId sessionId,
+ LogicalSessionId lsid,
ShardId donorShard,
ShardId recipientShard,
NamespaceString collectionNamespace,
@@ -60,6 +62,8 @@ MigrationCoordinator::MigrationCoordinator(UUID migrationId,
ChunkVersion preMigrationChunkVersion,
bool waitForDelete)
: _migrationInfo(migrationId,
+ std::move(sessionId),
+ std::move(lsid),
std::move(collectionNamespace),
collectionUuid,
std::move(donorShard),
@@ -108,6 +112,7 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat
<< " to self and to recipient";
boost::optional<SemiFuture<void>> cleanupCompleteFuture = boost::none;
+
switch (*_decision) {
case Decision::kAborted:
_abortMigrationOnDonorAndRecipient(opCtx);
@@ -118,7 +123,9 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat
hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet();
break;
}
+
forgetMigration(opCtx);
+
return cleanupCompleteFuture;
}
@@ -129,6 +136,11 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
LOG(0) << _logPrefix() << "Making commit decision durable";
migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId());
+ LOG(0) << _logPrefix() << "Bumping transaction for " << _migrationInfo.getRecipientShardId()
+ << " lsid: " << _migrationInfo.getLsid().toBSON() << " txn: " << TxnNumber{1};
+ migrationutil::advanceTransactionOnRecipient(
+ opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1});
+
hangBeforeSendingCommitDecision.pauseWhileSet();
LOG(0) << _logPrefix() << "Deleting range deletion task on recipient";
@@ -156,6 +168,11 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
LOG(0) << _logPrefix() << "Making abort decision durable";
migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId());
+ LOG(0) << _logPrefix() << "Bumping transaction for " << _migrationInfo.getRecipientShardId()
+ << " lsid: " << _migrationInfo.getLsid().toBSON() << " txn: " << TxnNumber{1};
+ migrationutil::advanceTransactionOnRecipient(
+ opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), TxnNumber{1});
+
hangBeforeSendingAbortDecision.pauseWhileSet();
LOG(0) << _logPrefix() << "Deleting range deletion task on donor";
diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h
index be7e9074bf0..9b2ccf7e672 100644
--- a/src/mongo/db/s/migration_coordinator.h
+++ b/src/mongo/db/s/migration_coordinator.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/logical_session_id.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_coordinator_document_gen.h"
#include "mongo/s/catalog/type_chunk.h"
@@ -45,6 +46,8 @@ public:
enum class Decision { kAborted, kCommitted };
MigrationCoordinator(UUID migrationId,
+ MigrationSessionId sessionId,
+ LogicalSessionId lsid,
ShardId donorShard,
ShardId recipientShard,
NamespaceString collectionNamespace,
@@ -63,10 +66,10 @@ public:
* Initializes persistent state required to ensure that orphaned ranges are properly handled,
* even after failover, by doing the following:
*
- * 1) Inserts a document into the local config.migrationCoordinators with the recipientId and
- * waits for majority writeConcern. 2) Inserts a document into the local config.rangeDeletions
- * with the collectionUUID, range to delete, and "pending: true" and waits for majority
- * writeConcern.
+ * 1) Inserts a document into the local config.migrationCoordinators with the lsid and
+ * recipientId and waits for majority writeConcern. 2) Inserts a document into the local
+ * config.rangeDeletions with the collectionUUID, range to delete, and "pending: true" and waits
+ * for majority writeConcern.
*/
void startMigration(OperationContext* opCtx);
diff --git a/src/mongo/db/s/migration_coordinator_document.idl b/src/mongo/db/s/migration_coordinator_document.idl
index c1001c0c748..5308a4545ae 100644
--- a/src/mongo/db/s/migration_coordinator_document.idl
+++ b/src/mongo/db/s/migration_coordinator_document.idl
@@ -31,12 +31,15 @@
global:
cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/db/s/migration_session_id.h"
imports:
- "mongo/idl/basic_types.idl"
- "mongo/s/sharding_types.idl"
- "mongo/s/chunk_range.idl"
- "mongo/s/chunk_version.idl"
+ - "mongo/db/logical_session_id.idl"
enums:
Decision:
@@ -46,6 +49,16 @@ enums:
kCommitted: "committed"
kAborted: "aborted"
+types:
+ MigrationSessionId:
+ bson_serialization_type: string
+ description: "The migration session id is the legacy unique identifier for a particular
+ moveChunk command and is exchanged as part of all communication between the
+ donor and recipient shards."
+ cpp_type: "mongo::MigrationSessionId"
+ deserializer: "mongo::MigrationSessionId::fromString"
+ serializer: "mongo::MigrationSessionId::toString"
+
structs:
migrationCoordinatorDocument:
description: "Represents an in-progress migration on the migration donor."
@@ -56,6 +69,12 @@ structs:
type: uuid
description: "A unique identifier for the migration."
cpp_name: id
+ migrationSessionId:
+ type: MigrationSessionId
+ description: "A legacy unique identifier for the migration session."
+ lsid:
+ type: LogicalSessionId
+ description: "The sessionId to use to communicate with the recipient"
nss:
type: namespacestring
description: "The namespace of the collection that the chunk belongs to."
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 8124f36a16f..48edd0d463c 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -218,6 +218,7 @@ MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep6);
MONGO_FAIL_POINT_DEFINE(failMigrationOnRecipient);
MONGO_FAIL_POINT_DEFINE(failMigrationReceivedOutOfRangeOperation);
+MONGO_FAIL_POINT_DEFINE(hangOnRecipientFailure);
} // namespace
@@ -798,6 +799,8 @@ void MigrationDestinationManager::_migrateThread() {
_migrateDriver(opCtx);
}
} catch (...) {
+ log() << "In catch handler";
+ hangOnRecipientFailure.pauseWhileSet();
_setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
}
diff --git a/src/mongo/db/s/migration_session_id.h b/src/mongo/db/s/migration_session_id.h
index 923427822de..e3ad9deebdc 100644
--- a/src/mongo/db/s/migration_session_id.h
+++ b/src/mongo/db/s/migration_session_id.h
@@ -44,10 +44,12 @@ class StatusWith;
/**
* Encapsulates the logic for generating, parsing and comparing migration sessions. The migration
* session id is a unique identifier for a particular moveChunk command and is exchanged as part of
- * all communication between the source and donor shards.
+ * all communication between the donor and recipient shards.
*/
class MigrationSessionId {
public:
+ MigrationSessionId() = default;
+
/**
* Constructs a new migration session identifier with the following format:
* DonorId_RecipientId_UniqueIdentifier
@@ -78,6 +80,11 @@ public:
std::string toString() const;
+ static MigrationSessionId fromString(StringData sessionId) {
+ MigrationSessionId id(sessionId.toString());
+ return id;
+ }
+
private:
explicit MigrationSessionId(std::string sessionId);
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 1e7319a92e9..505622e074d 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -300,6 +300,8 @@ Status MigrationSourceManager::startClone() {
if (_enableResumableRangeDeleter) {
_coordinator = std::make_unique<migrationutil::MigrationCoordinator>(
migrationId,
+ _cloneDriver->getSessionId(),
+ _lsid,
_args.getFromShardId(),
_args.getToShardId(),
getNss(),
@@ -765,8 +767,6 @@ void MigrationSourceManager::_cleanup() {
auto newOpCtx = newOpCtxPtr.get();
_cleanupCompleteFuture = _coordinator->completeMigration(newOpCtx);
}
-
- LogicalSessionCache::get(_opCtx)->endSessions({_lsid});
}
_state = kDone;
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 770dbdd0ea9..c7eb2183d85 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -83,17 +83,21 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::kNoTimeout);
template <typename Cmd>
-void sendToRecipient(OperationContext* opCtx, const ShardId& recipientId, const Cmd& cmd) {
+void sendToRecipient(OperationContext* opCtx,
+ const ShardId& recipientId,
+ const Cmd& cmd,
+ const BSONObj& passthroughFields = {}) {
auto recipientShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, recipientId));
- LOGV2_DEBUG(22023, 1, "Sending request {cmd} to recipient.", "cmd"_attr = cmd.toBSON({}));
+ auto cmdBSON = cmd.toBSON(passthroughFields);
+ LOGV2_DEBUG(22023, 1, "Sending request {cmd} to recipient.", "cmd"_attr = cmdBSON);
auto response = recipientShard->runCommandWithFixedRetryAttempts(
opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "config",
- cmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)),
+ cmd.getDbName().toString(),
+ cmdBSON,
Shard::RetryPolicy::kIdempotent);
uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(response));
@@ -416,7 +420,10 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
false /*multi*/);
deleteOp.setDeletes({query});
- sendToRecipient(opCtx, recipientId, deleteOp);
+ sendToRecipient(opCtx,
+ recipientId,
+ deleteOp,
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
}
void deleteRangeDeletionTaskLocally(OperationContext* opCtx,
@@ -444,7 +451,30 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
updateEntry.setUpsert(false);
updateOp.setUpdates({updateEntry});
- sendToRecipient(opCtx, recipientId, updateOp);
+ sendToRecipient(opCtx,
+ recipientId,
+ updateOp,
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+}
+
+void advanceTransactionOnRecipient(OperationContext* opCtx,
+ const ShardId& recipientId,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) {
+ write_ops::Update updateOp(NamespaceString::kServerConfigurationNamespace);
+ auto queryFilter = BSON("_id"
+ << "migrationCoordinatorStats");
+ auto updateModification = write_ops::UpdateModification(BSON("$inc" << BSON("count" << 1)));
+
+ write_ops::UpdateOpEntry updateEntry(queryFilter, updateModification);
+ updateEntry.setMulti(false);
+ updateEntry.setUpsert(true);
+ updateOp.setUpdates({updateEntry});
+
+ auto passthroughFields =
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority << "lsid"
+ << lsid.toBSON() << "txnNumber" << txnNumber);
+ sendToRecipient(opCtx, recipientId, updateOp, passthroughFields);
}
void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& migrationId) {
@@ -610,7 +640,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
// Wait for the latest OpTime to be majority committed to ensure any decision that is
// read is on the true branch of history.
// Note (Esha): I don't think this is strictly required for correctness, but it is
- // is difficult to reason about, and being pessimistic by waiting for the decision to be
+ // difficult to reason about, and being pessimistic by waiting for the decision to be
// majority committed does not cost much, since stepup should be rare. It *is* required
// that this node ensure a decision that it itself recovers is majority committed. For
// example, it is possible that this node is a stale primary, and the true primary has
@@ -647,6 +677,8 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
// Create a MigrationCoordinator to complete the coordination.
MigrationCoordinator coordinator(doc.getId(),
+ doc.getMigrationSessionId(),
+ doc.getLsid(),
doc.getDonorShardId(),
doc.getRecipientShardId(),
doc.getNss(),
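
Concretely, the bump performed by advanceTransactionOnRecipient() is just a retryable update sent to the recipient. A shell-level approximation of the command it builds (illustrative: recipientPrimary stands for a connection to the recipient's primary, the lsid placeholder stands in for the coordinator document's lsid, the write concern is shown in its usual shell form, and the target collection assumes NamespaceString::kServerConfigurationNamespace resolves to admin.system.version):

    const lsid = {id: UUID()};  // placeholder; the real call reuses the migration's lsid
    assert.commandWorked(recipientPrimary.getDB("admin").runCommand({
        update: "system.version",
        updates: [{
            q: {_id: "migrationCoordinatorStats"},
            u: {$inc: {count: 1}},
            multi: false,
            upsert: true
        }],
        writeConcern: {w: "majority"},
        lsid: lsid,
        txnNumber: NumberLong(1)
    }));

Because the write is retryable, replaying it after a failover is harmless, and any later command that reuses the same session at txnNumber 0, such as a delayed _recvChunkStart, is rejected with TransactionTooOld.
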
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index 1b47486159f..951b55e305e 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -161,6 +161,15 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
const UUID& migrationId);
/**
+ * Advances the txnNumber on the given session by performing a write as a retryable write against
+ * the recipient shard. This prevents a delayed _recvChunkStart sent on the same session from
+ * writing a new range deletion task after the migration decision has been recorded.
+ */
+void advanceTransactionOnRecipient(OperationContext* opCtx,
+ const ShardId& recipientId,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber);
+
+/**
* Removes the 'pending' flag from the range deletion task document with the specified id from
* config.rangeDeletions and waits for majority write concern. This marks the range as ready for
* deletion.