summary | refs | log | tree | commit | diff
path: root/src/mongo/db/s/resharding
diff options
context:
space:
mode:
author: Cheahuychou Mao <mao.cheahuychou@gmail.com> 2022-03-06 20:53:41 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-03-06 22:39:27 +0000
commit: 94b782312489cfeb017ab0f1d9b5ff8c0aa9e058 (patch)
tree: ab0f877c5b2d578751011f3582ac4bdbf5771233 /src/mongo/db/s/resharding
parent: 75f6b70040f9ccc46888a91b3937079f83016831 (diff)
download: mongo-94b782312489cfeb017ab0f1d9b5ff8c0aa9e058.tar.gz
SERVER-63441 Handle retryable internal transactions with multiple oplog entries when migrating sessions during resharding
Diffstat (limited to 'src/mongo/db/s/resharding')
-rw-r--r-- src/mongo/db/s/resharding/resharding_data_copy_util.cpp | 65
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_applier.cpp | 13
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_applier.h | 3
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp | 71
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h | 10
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp | 414
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_session_application.cpp | 24
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp | 63
-rw-r--r-- src/mongo/db/s/resharding/resharding_txn_cloner.cpp | 19
9 files changed, 534 insertions, 148 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
index e000d42f5ae..8be9fbb4669 100644
--- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
@@ -307,13 +307,16 @@ void updateSessionRecord(OperationContext* opCtx,
auto txnParticipant = TransactionParticipant::get(opCtx);
invariant(txnParticipant, "Must be called with session checked out");
+ const auto sessionId = *opCtx->getLogicalSessionId();
+ const auto txnNumber = *opCtx->getTxnNumber();
+
repl::MutableOplogEntry oplogEntry;
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
oplogEntry.setObject(SessionCatalogMigration::kSessionOplogTag);
oplogEntry.setObject2(std::move(o2Field));
oplogEntry.setNss({});
- oplogEntry.setSessionId(opCtx->getLogicalSessionId());
- oplogEntry.setTxnNumber(opCtx->getTxnNumber());
+ oplogEntry.setSessionId(sessionId);
+ oplogEntry.setTxnNumber(txnNumber);
oplogEntry.setStatementIds(stmtIds);
oplogEntry.setPreImageOpTime(std::move(preImageOpTime));
oplogEntry.setPostImageOpTime(std::move(postImageOpTime));
@@ -321,37 +324,33 @@ void updateSessionRecord(OperationContext* opCtx,
oplogEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
oplogEntry.setFromMigrate(true);
- writeConflictRetry(opCtx,
- "resharding::data_copy::updateSessionRecord",
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- [&] {
- AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
-
- WriteUnitOfWork wuow(opCtx);
- repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry);
-
- uassert(4989901,
- str::stream() << "Failed to create new oplog entry: "
- << redact(oplogEntry.toBSON()),
- !opTime.isNull());
-
- // Use the same wallTime as the oplog entry since SessionUpdateTracker
- // looks at the oplog entry wallTime when replicating.
- SessionTxnRecord sessionTxnRecord(*oplogEntry.getSessionId(),
- *oplogEntry.getTxnNumber(),
- std::move(opTime),
- oplogEntry.getWallClockTime());
-
- if (isInternalSessionForRetryableWrite(*oplogEntry.getSessionId())) {
- sessionTxnRecord.setParentSessionId(
- *getParentSessionId(*oplogEntry.getSessionId()));
- }
-
- txnParticipant.onRetryableWriteCloningCompleted(
- opCtx, stmtIds, sessionTxnRecord);
-
- wuow.commit();
- });
+ writeConflictRetry(
+ opCtx,
+ "resharding::data_copy::updateSessionRecord",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
+
+ WriteUnitOfWork wuow(opCtx);
+ repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry);
+
+ uassert(4989901,
+ str::stream() << "Failed to create new oplog entry: "
+ << redact(oplogEntry.toBSON()),
+ !opTime.isNull());
+
+ // Use the same wallTime as the oplog entry since SessionUpdateTracker
+ // looks at the oplog entry wallTime when replicating.
+ SessionTxnRecord sessionTxnRecord(
+ sessionId, txnNumber, std::move(opTime), oplogEntry.getWallClockTime());
+ if (isInternalSessionForRetryableWrite(sessionId)) {
+ sessionTxnRecord.setParentSessionId(*getParentSessionId(sessionId));
+ }
+
+ txnParticipant.onRetryableWriteCloningCompleted(opCtx, stmtIds, sessionTxnRecord);
+
+ wuow.commit();
+ });
}
} // namespace mongo::resharding::data_copy
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index b65d5376689..452cef3ea9a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -78,8 +78,8 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch(
CancellationToken cancelToken,
CancelableOperationContextFactory factory) {
Timer latencyTimer;
- auto crudWriterVectors =
- _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps);
+ auto crudWriterVectors = _batchPreparer.makeCrudOpWriterVectors(
+ _currentBatchToApply, _currentDerivedOpsForCrudWriters);
CancellationSource errorSource(cancelToken);
@@ -99,7 +99,8 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch(
}
}
- auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply);
+ auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors(
+ _currentBatchToApply, _currentDerivedOpsForSessionWriters);
batchApplierFutures.reserve(crudWriterVectors.size() + sessionWriterVectors.size());
for (auto&& writer : sessionWriterVectors) {
@@ -152,7 +153,8 @@ SemiFuture<void> ReshardingOplogApplier::run(
"reshardingApplyOplogBatchTwice failpoint enabled, applying batch "
"a second time",
"batchSize"_attr = _currentBatchToApply.size());
- _currentDerivedOps.clear();
+ _currentDerivedOpsForCrudWriters.clear();
+ _currentDerivedOpsForSessionWriters.clear();
return _applyBatch(executor, cancelToken, factory);
}
return SemiFuture<void>();
@@ -240,7 +242,8 @@ void ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationContext*
}
_currentBatchToApply.clear();
- _currentDerivedOps.clear();
+ _currentDerivedOpsForCrudWriters.clear();
+ _currentDerivedOpsForSessionWriters.clear();
}
NamespaceString ReshardingOplogApplier::ensureStashCollectionExists(
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h
index 0c3a8bc8306..f7835c2ee64 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h
@@ -146,7 +146,8 @@ private:
OplogBatch _currentBatchToApply;
// Buffer for internally generated oplog entries that need to be processed for this batch.
- std::list<repl::OplogEntry> _currentDerivedOps;
+ std::list<repl::OplogEntry> _currentDerivedOpsForCrudWriters;
+ std::list<repl::OplogEntry> _currentDerivedOpsForSessionWriters;
// The source of the oplog entries to be applied.
std::unique_ptr<ReshardingDonorOplogIteratorInterface> _oplogIter;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
index a6f73b01cec..bde8901926a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
@@ -50,8 +50,7 @@ namespace {
* Return true if we need to update config.transactions collection for this oplog entry.
*/
bool shouldUpdateTxnTable(const repl::OplogEntry& op) {
- if (op.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction ||
- op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) {
+ if (op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) {
return true;
}
@@ -64,8 +63,21 @@ bool shouldUpdateTxnTable(const repl::OplogEntry& op) {
}
if (op.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) {
- auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject());
- return !applyOpsInfo.getPrepare() && !applyOpsInfo.getPartialTxn();
+ // This applyOps oplog entry is guaranteed to correspond to a committed transaction since
+ // the resharding aggregation pipeline does not output applyOps oplog entries for aborted
+ // transactions (i.e. it only outputs the abortTransaction oplog entry).
+
+ if (isInternalSessionForRetryableWrite(*op.getSessionId())) {
+ // For a retryable internal transaction, we need to update the config.transactions
+ // collection upon writing the noop oplog entries for retryable operations contained
+ // within each applyOps oplog entry.
+ return true;
+ }
+
+ // The resharding aggregation pipeline also does not output the commitTransaction oplog
+ // entry so for a non-retryable transaction, we need to do the update to the
+ // config.transactions collection upon seeing the final applyOps oplog entry.
+ return !op.isPartialTransaction();
}
return false;
@@ -116,6 +128,8 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors(
}
auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject());
+ // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with
+ // WouldChangeOwningShard sentinel noop entry.
uassert(
ErrorCodes::OplogOperationUnsupported,
str::stream() << "Commands within applyOps are not supported during resharding: "
@@ -148,7 +162,7 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors(
}
WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors(
- const OplogBatchToPrepare& batch) const {
+ const OplogBatchToPrepare& batch, std::list<OplogEntry>& derivedOps) const {
auto writerVectors = _makeEmptyWriterVectors();
struct SessionOpsList {
@@ -188,7 +202,52 @@ WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors(
} else if (op.isCommand()) {
throwIfUnsupportedCommandOp(op);
- if (shouldUpdateTxnTable(op)) {
+ if (!shouldUpdateTxnTable(op)) {
+ continue;
+ }
+
+ auto sessionId = *op.getSessionId();
+
+ if (isInternalSessionForRetryableWrite(sessionId) &&
+ op.getCommandType() == OplogEntry::CommandType::kApplyOps) {
+ // Derive retryable write CRUD oplog entries from this retryable internal
+ // transaction applyOps oplog entry.
+
+ auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject());
+ // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with
+ // WouldChangeOwningShard sentinel noop entry.
+ uassert(ErrorCodes::OplogOperationUnsupported,
+ str::stream()
+ << "Commands within applyOps are not supported during resharding: "
+ << redact(op.toBSONForLogging()),
+ applyOpsInfo.areOpsCrudOnly());
+
+ auto unrolledOp =
+ uassertStatusOK(repl::MutableOplogEntry::parse(op.getEntry().toBSON()));
+ unrolledOp.setSessionId(*getParentSessionId(sessionId));
+ unrolledOp.setTxnNumber(*sessionId.getTxnNumber());
+
+ for (const auto& innerOp : applyOpsInfo.getOperations()) {
+ auto replOp = repl::ReplOperation::parse(
+ {"ReshardingOplogBatchPreparer::makeSessionOpWriterVectors innerOp"},
+ innerOp);
+ if (replOp.getStatementIds().empty()) {
+ // Skip this operation since it is not retryable.
+ continue;
+ }
+ unrolledOp.setDurableReplOperation(replOp);
+
+ // There isn't a direct way to convert from a MutableOplogEntry to a
+ // DurableOplogEntry or OplogEntry. We serialize the unrolledOp to have it get
+ // re-parsed into an OplogEntry.
+ auto& derivedOp = derivedOps.emplace_back(unrolledOp.toBSON());
+ invariant(derivedOp.isCrudOpType());
+
+ // `&derivedOp` is guaranteed to remain stable while we append more derived
+ // oplog entries because `derivedOps` is a std::list.
+ updateSessionTracker(&derivedOp);
+ }
+ } else {
updateSessionTracker(&op);
}
} else {
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
index f4f035ccdbd..27e7934b326 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
@@ -82,11 +82,13 @@ public:
* to the config.transactions record for a higher txnNumber will cause any updates in `batch`
* for lower txnNumbers to be elided.
*
- * The returned writer vectors refer to memory owned by `batch`. The caller must take care to
- * ensure `batch` outlives the writer vectors all being applied and must take care not to modify
- * `batch` until after the writer vectors have all been applied.
+ * The returned writer vectors refer to memory owned by `batch` and `derivedOps`. The caller
+ * must take care to ensure both `batch` and `derivedOps` outlive the writer vectors all being
+ * applied and must take care not to modify `batch` or `derivedOps` until after the writer
+ * vectors have all been applied.
*/
- WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch) const;
+ WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch,
+ std::list<OplogEntry>& derivedOps) const;
static void throwIfUnsupportedCommandOp(const OplogEntry& op);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
index 07ae756eeae..c98104fcf3a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
@@ -31,6 +31,7 @@
#include <boost/optional/optional_io.hpp>
+#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
@@ -66,25 +67,42 @@ protected:
return {op.toBSON()};
}
- repl::OplogEntry makeApplyOps(BSONObj document,
- bool isPrepare,
- bool isPartial,
- boost::optional<LogicalSessionId> lsid,
- boost::optional<TxnNumber> txnNumber) {
-
- std::vector<mongo::BSONObj> operations;
- auto insertOp = repl::MutableOplogEntry::makeInsertOperation(
- NamespaceString("foo.bar"), UUID::gen(), document, document);
-
+ /**
+ * Returns an applyOps oplog entry containing insert operations for the given documents. If the
+ * session is an internal session for retryable writes, uses the "_id" of each document as its
+ * statement id.
+ */
+ repl::OplogEntry makeApplyOpsForInsert(const std::vector<BSONObj> documents,
+ boost::optional<LogicalSessionId> lsid = boost::none,
+ boost::optional<TxnNumber> txnNumber = boost::none,
+ boost::optional<bool> isPrepare = boost::none,
+ boost::optional<bool> isPartial = boost::none) {
BSONObjBuilder applyOpsBuilder;
- applyOpsBuilder.append("applyOps", BSON_ARRAY(insertOp.toBSON()));
+
+ BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps");
+ for (const auto& document : documents) {
+ auto insertOp = repl::DurableReplOperation(repl::OpTypeEnum::kInsert, {}, document);
+ if (lsid && isInternalSessionForRetryableWrite(*lsid)) {
+ if (!document.hasField("_id")) {
+ continue;
+ }
+ auto id = document.getIntField("_id");
+ insertOp.setStatementIds({{id}});
+ }
+ opsArrayBuilder.append(insertOp.toBSON());
+ }
+ opsArrayBuilder.done();
if (isPrepare) {
- applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, true);
+ invariant(lsid);
+ invariant(txnNumber);
+ applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, *isPrepare);
}
if (isPartial) {
- applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, true);
+ invariant(lsid);
+ invariant(txnNumber);
+ applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, *isPartial);
}
repl::MutableOplogEntry op;
@@ -194,24 +212,13 @@ TEST_F(ReshardingOplogBatchPreparerTest, CreatesDerivedCrudOpsForApplyOps) {
// We use the "fromApplyOps" field in the document to distinguish between the regular oplog
// entries from the derived ones later on.
int numOps = 20;
+ std::vector<BSONObj> docsForApplyOps;
for (int i = 0; i < numOps; ++i) {
batch.emplace_back(makeUpdateOp(BSON("_id" << i << "fromApplyOps" << false)));
+ docsForApplyOps.push_back(BSON("_id" << i << "fromApplyOps" << true));
}
- BSONObjBuilder applyOpsBuilder;
- {
- BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps");
- for (int i = 0; i < numOps; ++i) {
- // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to
- // deal with setting the 'o2' field.
- opsArrayBuilder.append(
- repl::DurableReplOperation(
- repl::OpTypeEnum::kInsert, {}, BSON("_id" << i << "fromApplyOps" << true))
- .toBSON());
- }
- }
-
- batch.emplace_back(makeCommandOp(applyOpsBuilder.done()));
+ batch.emplace_back(makeApplyOpsForInsert(docsForApplyOps));
std::list<repl::OplogEntry> derivedOps;
auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps);
@@ -254,12 +261,8 @@ TEST_F(ReshardingOplogBatchPreparerTest, InterleavesDerivedCrudOpsForApplyOps) {
} else {
// We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to
// deal with setting the 'o2' field.
- batch.emplace_back(makeCommandOp(BSON(
- "applyOps" << BSON_ARRAY(repl::DurableReplOperation(
- repl::OpTypeEnum::kInsert,
- {},
- BSON("_id" << 0 << "n" << i << "fromApplyOps" << true))
- .toBSON()))));
+ batch.emplace_back(
+ makeApplyOpsForInsert({BSON("_id" << 0 << "n" << i << "fromApplyOps" << true)}));
}
}
@@ -290,8 +293,10 @@ TEST_F(ReshardingOplogBatchPreparerTest, AssignsSessionOpsToWriterVectorsByLsid)
batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{1}));
}
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
+ ASSERT_EQ(derivedOps.size(), 0U);
auto writer = getNonEmptyWriterVector(writerVectors);
ASSERT_EQ(writer.size(), numOps);
@@ -311,8 +316,10 @@ TEST_F(ReshardingOplogBatchPreparerTest, DiscardsLowerTxnNumberSessionOps) {
batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{i}));
}
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
+ ASSERT_EQ(derivedOps.size(), 0U);
auto writer = getNonEmptyWriterVector(writerVectors);
ASSERT_EQ(writer.size(), 1U);
@@ -330,7 +337,8 @@ TEST_F(ReshardingOplogBatchPreparerTest, DistributesSessionOpsToWriterVectorsFai
makeUpdateOp(BSON("_id" << i), makeLogicalSessionIdForTest(), TxnNumber{1}));
}
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
// Use `numOps / 5` as a generous definition for "fair". There's no guarantee for how the lsid
@@ -357,107 +365,319 @@ TEST_F(ReshardingOplogBatchPreparerTest, ThrowsForUnsupportedCommandOps) {
batch.emplace_back(makeCommandOp(BSON("commitIndexBuild" << 1)));
std::list<repl::OplogEntry> derivedOps;
- ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch),
+ ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch, derivedOps),
DBException,
ErrorCodes::OplogOperationUnsupported);
}
}
TEST_F(ReshardingOplogBatchPreparerTest, DiscardsNoops) {
- OplogBatch batch;
-
- int numOps = 5;
- for (int i = 0; i < numOps; ++i) {
- repl::MutableOplogEntry op;
- op.setOpType(repl::OpTypeEnum::kNoop);
- op.setObject({});
- op.setNss({});
- op.setOpTime({{}, {}});
- op.setWallClockTime({});
- batch.emplace_back(op.toBSON());
- }
+ auto runTest = [&](const boost::optional<LogicalSessionId>& lsid,
+ const boost::optional<TxnNumber>& txnNumber) {
+ OplogBatch batch;
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOps.size(), 0U);
- ASSERT_EQ(writerVectors[0].size(), 0U);
- ASSERT_EQ(writerVectors[1].size(), 0U);
+ int numOps = 5;
+ for (int i = 0; i < numOps; ++i) {
+ repl::MutableOplogEntry op;
+ op.setSessionId(lsid);
+ op.setTxnNumber(txnNumber);
+ op.setOpType(repl::OpTypeEnum::kNoop);
+ op.setObject({});
+ op.setNss({});
+ op.setOpTime({{}, {}});
+ op.setWallClockTime({});
+ batch.emplace_back(op.toBSON());
+ }
- writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(writerVectors[0].size(), 0U);
- ASSERT_EQ(writerVectors[1].size(), 0U);
+ std::list<repl::OplogEntry> derivedOpsForCrudWriters;
+ auto writerVectors =
+ _batchPreparer.makeCrudOpWriterVectors(batch, derivedOpsForCrudWriters);
+ ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
+ ASSERT_EQ(derivedOpsForCrudWriters.size(), 0U);
+ ASSERT_EQ(writerVectors[0].size(), 0U);
+ ASSERT_EQ(writerVectors[1].size(), 0U);
+
+ std::list<repl::OplogEntry> derivedOpsForSessionWriters;
+ writerVectors =
+ _batchPreparer.makeSessionOpWriterVectors(batch, derivedOpsForSessionWriters);
+ ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
+ ASSERT_EQ(derivedOpsForSessionWriters.size(), 0U);
+ ASSERT_EQ(writerVectors[0].size(), 0U);
+ ASSERT_EQ(writerVectors[1].size(), 0U);
+ };
+
+ runTest(boost::none, boost::none);
+
+ TxnNumber txnNumber{1};
+ runTest(makeLogicalSessionIdForTest(), txnNumber);
+ runTest(makeLogicalSessionIdWithTxnUUIDForTest(), txnNumber);
+ runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest(), txnNumber);
}
TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForApplyOpsWithoutTxnNumber) {
OplogBatch batch;
- batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, false, boost::none, boost::none));
+ batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0)}));
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ ASSERT_EQ(derivedOps.size(), 0U);
for (const auto& writer : writerVectors) {
ASSERT_TRUE(writer.empty());
}
}
-TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTxn) {
- OplogBatch batch;
- auto lsid = makeLogicalSessionIdForTest();
- batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, false, lsid, 2));
+TEST_F(ReshardingOplogBatchPreparerTest,
+ SessionWriteVectorsDeriveCrudOpsForApplyOpsForRetryableInternalTransaction) {
+ const auto lsid = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
+ const TxnNumber txnNumber{1};
+
+ OplogBatch batch;
+ // 'makeApplyOpsForInsert' uses the "_id" of each document as the "stmtId" for its insert
+ // operation. The insert operation without a stmtId should not have a derived operation.
+ batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSONObj(), BSON("_id" << 1)},
+ lsid,
+ txnNumber,
+ false /* isPrepare */,
+ false /* isPartial */));
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
ASSERT_FALSE(writerVectors.empty());
auto writer = getNonEmptyWriterVector(writerVectors);
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_EQ(writer[0]->getSessionId(), lsid);
- ASSERT_EQ(*writer[0]->getTxnNumber(), 2);
+
+ ASSERT_EQ(writer.size(), 2U);
+ ASSERT_EQ(derivedOps.size(), 2U);
+ for (size_t i = 0; i < writer.size(); ++i) {
+ ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
+ ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
+ ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
+ }
}
-TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForCommittedTxn) {
- OplogBatch batch;
- auto lsid = makeLogicalSessionIdForTest();
+TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTxn) {
+ auto runTest = [&](const LogicalSessionId& lsid) {
+ const TxnNumber txnNumber{1};
- batch.emplace_back(makeApplyOps(BSON("_id" << 3), true, false, lsid, 2));
- batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, 2));
+ OplogBatch batch;
+ batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)},
+ lsid,
+ txnNumber,
+ false /* isPrepare */,
+ false /* isPartial */));
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
- ASSERT_FALSE(writerVectors.empty());
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ ASSERT_FALSE(writerVectors.empty());
+
+ auto writer = getNonEmptyWriterVector(writerVectors);
+
+ if (isInternalSessionForRetryableWrite(lsid)) {
+ ASSERT_EQ(writer.size(), 2U);
+ ASSERT_EQ(derivedOps.size(), 2U);
+ for (size_t i = 0; i < writer.size(); ++i) {
+ ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
+ ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
+ ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
+ }
+ } else {
+ ASSERT_EQ(writer.size(), 1U);
+ ASSERT_EQ(derivedOps.size(), 0U);
+ ASSERT_EQ(writer[0]->getSessionId(), lsid);
+ ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
+ ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+ }
+ };
- auto writer = getNonEmptyWriterVector(writerVectors);
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_EQ(writer[0]->getSessionId(), lsid);
- ASSERT_EQ(*writer[0]->getTxnNumber(), 2);
+ runTest(makeLogicalSessionIdForTest());
+ runTest(makeLogicalSessionIdWithTxnUUIDForTest());
+ runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
}
-TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTxn) {
- OplogBatch batch;
- auto lsid = makeLogicalSessionIdForTest();
+TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForLargeUnpreparedTxn) {
+ auto runTest = [&](const LogicalSessionId& lsid) {
+ const TxnNumber txnNumber{1};
- batch.emplace_back(makeCommandOp(BSON("abortTransaction" << 1), lsid, 2));
+ OplogBatch batch;
+ batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)},
+ lsid,
+ txnNumber,
+ false /* isPrepare */,
+ true /* isPartial */));
+ batch.emplace_back(makeApplyOpsForInsert(
+ {BSON("_id" << 2)}, lsid, txnNumber, false /* isPrepare */, false /* isPartial */));
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
- ASSERT_FALSE(writerVectors.empty());
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ ASSERT_FALSE(writerVectors.empty());
+
+ auto writer = getNonEmptyWriterVector(writerVectors);
+
+ if (isInternalSessionForRetryableWrite(lsid)) {
+ ASSERT_EQ(writer.size(), 3U);
+ ASSERT_EQ(derivedOps.size(), 3U);
+ for (size_t i = 0; i < writer.size(); ++i) {
+ ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
+ ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
+ ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
+ }
+ } else {
+ ASSERT_EQ(writer.size(), 1U);
+ ASSERT_EQ(derivedOps.size(), 0U);
+ ASSERT_EQ(writer[0]->getSessionId(), lsid);
+ ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
+ ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+ }
+ };
- auto writer = getNonEmptyWriterVector(writerVectors);
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_EQ(writer[0]->getSessionId(), lsid);
- ASSERT_EQ(*writer[0]->getTxnNumber(), 2);
+ runTest(makeLogicalSessionIdForTest());
+ runTest(makeLogicalSessionIdWithTxnUUIDForTest());
+ runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+}
+
+TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallCommittedPreparedTxn) {
+ auto runTest = [&](const LogicalSessionId& lsid) {
+ const TxnNumber txnNumber{1};
+
+ OplogBatch batch;
+ batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)},
+ lsid,
+ txnNumber,
+ true /* isPrepare */,
+ false /* isPartial */));
+ batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, txnNumber));
+
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ ASSERT_FALSE(writerVectors.empty());
+
+ auto writer = getNonEmptyWriterVector(writerVectors);
+
+ if (isInternalSessionForRetryableWrite(lsid)) {
+ ASSERT_EQ(writer.size(), 2U);
+ ASSERT_EQ(derivedOps.size(), 2U);
+ for (size_t i = 0; i < writer.size(); ++i) {
+ ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
+ ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
+ ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
+ }
+ } else {
+ ASSERT_EQ(writer.size(), 1U);
+ ASSERT_EQ(derivedOps.size(), 0U);
+ ASSERT_EQ(writer[0]->getSessionId(), lsid);
+ ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
+ ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+ }
+ };
+
+ runTest(makeLogicalSessionIdForTest());
+ runTest(makeLogicalSessionIdWithTxnUUIDForTest());
+ runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+}
+
+TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForLargeCommittedPreparedTxn) {
+ auto runTest = [&](const LogicalSessionId& lsid) {
+ const TxnNumber txnNumber{1};
+
+ OplogBatch batch;
+ batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)},
+ lsid,
+ txnNumber,
+ false /* isPrepare */,
+ true /* isPartial */));
+ batch.emplace_back(makeApplyOpsForInsert(
+ {BSON("_id" << 2)}, lsid, txnNumber, true /* isPrepare */, false /* isPartial */));
+ batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, txnNumber));
+
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ ASSERT_FALSE(writerVectors.empty());
+
+ auto writer = getNonEmptyWriterVector(writerVectors);
+
+ if (isInternalSessionForRetryableWrite(lsid)) {
+ ASSERT_EQ(writer.size(), 3U);
+ ASSERT_EQ(derivedOps.size(), 3U);
+ for (size_t i = 0; i < writer.size(); ++i) {
+ ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
+ ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
+ ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
+ }
+ } else {
+ ASSERT_EQ(writer.size(), 1U);
+ ASSERT_EQ(derivedOps.size(), 0U);
+ ASSERT_EQ(writer[0]->getSessionId(), lsid);
+ ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
+ ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+ }
+ };
+
+ runTest(makeLogicalSessionIdForTest());
+ runTest(makeLogicalSessionIdWithTxnUUIDForTest());
+ runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+}
+
+TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTxn) {
+ auto runTest = [&](const LogicalSessionId& lsid) {
+ const TxnNumber txnNumber{1};
+
+ OplogBatch batch;
+ batch.emplace_back(makeCommandOp(BSON("abortTransaction" << 1), lsid, txnNumber));
+
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ ASSERT_FALSE(writerVectors.empty());
+
+ auto writer = getNonEmptyWriterVector(writerVectors);
+ ASSERT_EQ(writer.size(), 1U);
+ ASSERT_EQ(derivedOps.size(), 0U);
+ ASSERT_EQ(writer[0]->getSessionId(), lsid);
+ ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
+ ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction);
+ };
+
+ runTest(makeLogicalSessionIdForTest());
+ runTest(makeLogicalSessionIdWithTxnUUIDForTest());
+ runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
}
TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForPartialUnpreparedTxn) {
- OplogBatch batch;
- auto lsid = makeLogicalSessionIdForTest();
+ auto runTest = [&](const LogicalSessionId& lsid) {
+ const TxnNumber txnNumber{1};
- batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, true, lsid, 2));
+ OplogBatch batch;
+ batch.emplace_back(makeApplyOpsForInsert(
+ {BSON("_id" << 0)}, lsid, txnNumber, false /* isPrepare */, true /* isPartial */));
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ if (isInternalSessionForRetryableWrite(lsid)) {
+ ASSERT_FALSE(writerVectors.empty());
+ auto writer = getNonEmptyWriterVector(writerVectors);
+ ASSERT_EQ(writer.size(), 1U);
+ ASSERT_EQ(derivedOps.size(), 1U);
+ ASSERT_EQ(writer[0]->getSessionId(), *getParentSessionId(lsid));
+ ASSERT_EQ(*writer[0]->getTxnNumber(), *lsid.getTxnNumber());
+ ASSERT(writer[0]->getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_BSONOBJ_EQ(writer[0]->getObject(), (BSON("_id" << 0)));
+ } else {
+ ASSERT_EQ(derivedOps.size(), 0U);
+ for (const auto& writer : writerVectors) {
+ ASSERT_TRUE(writer.empty());
+ }
+ }
+ };
- for (const auto& writer : writerVectors) {
- ASSERT_TRUE(writer.empty());
- }
+ runTest(makeLogicalSessionIdForTest());
+ runTest(makeLogicalSessionIdWithTxnUUIDForTest());
+ runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
}
} // namespace
diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp
index 5d6b635e9e4..123a740f119 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp
@@ -78,8 +78,29 @@ repl::OpTime ReshardingOplogSessionApplication::_logPrePostImage(
}
boost::optional<SharedSemiFuture<void>> ReshardingOplogSessionApplication::tryApplyOperation(
- OperationContext* opCtx, const repl::OplogEntry& op) const {
+ OperationContext* opCtx, const mongo::repl::OplogEntry& op) const {
+ invariant(op.getSessionId());
+ invariant(op.getTxnNumber());
+
auto lsid = *op.getSessionId();
+ if (isInternalSessionForNonRetryableWrite(lsid)) {
+ // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for
+ // non-retryable writes.
+ return boost::none;
+ }
+ if (isInternalSessionForRetryableWrite(lsid)) {
+ // The oplog preparer should have turned each applyOps oplog entry for a retryable internal
+ // transaction into retryable write CRUD oplog entries.
+ invariant(op.getCommandType() != repl::OplogEntry::CommandType::kApplyOps);
+
+ if (op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) {
+ // Skip this oplog entry since there is no retryable write history to apply and writing
+ // a sentinel noop oplog entry would make retryable write statements that successfully
+ // executed outside of this internal transaction not retryable.
+ return boost::none;
+ }
+ }
+
auto txnNumber = *op.getTxnNumber();
bool isRetryableWrite = op.isCrudOpType();
@@ -88,6 +109,7 @@ boost::optional<SharedSemiFuture<void>> ReshardingOplogSessionApplication::tryAp
auto stmtIds =
isRetryableWrite ? op.getStatementIds() : std::vector<StmtId>{kIncompleteHistoryStmtId};
+ invariant(!stmtIds.empty());
boost::optional<repl::OpTime> preImageOpTime;
if (auto preImageOp = op.getPreImageOp()) {
diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp
index 6c57d2271a5..1116a41ec2a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp
@@ -68,6 +68,8 @@ public:
MongoDSessionCatalog::onStepUp(opCtx.get());
}
+
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
}
repl::OpTime insertSessionRecord(OperationContext* opCtx,
@@ -947,5 +949,66 @@ TEST_F(ReshardingOplogSessionApplicationTest, IncomingTxnHasHigherTxnNumberThanP
ASSERT_OK(hitPreparedTxn->getNoThrow());
}
+TEST_F(ReshardingOplogSessionApplicationTest, IgnoreIncomingAbortedRetryableInternalTransaction) {
+ auto lsid = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();  // session id of the incoming op
+
+ TxnNumber incomingTxnNumber = 100;
+
+ auto opTime = [&] {  // baseline opTime: a session record inserted under a separately created lsid
+ auto opCtx = makeOperationContext();
+ return insertSessionRecord(opCtx.get(), makeLogicalSessionIdForTest(), 100, {3});
+ }();
+
+ // 'makeFinishTxnOp' returns an abortTransaction oplog entry; applying it should be a no-op.
+ auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber);
+
+ {
+ auto opCtx = makeOperationContext();
+ ReshardingOplogSessionApplication applier;
+ auto hitPreparedTxn = applier.tryApplyOperation(opCtx.get(), oplogEntry);
+ ASSERT_FALSE(bool(hitPreparedTxn));  // expect boost::none, i.e. no future to wait on
+ }
+
+ {
+ auto opCtx = makeOperationContext();
+ auto foundOps = findOplogEntriesNewerThan(opCtx.get(), opTime.getTimestamp());
+ ASSERT_EQ(foundOps.size(), 0U);  // no oplog entry newer than the baseline was written
+
+ auto sessionTxnRecord = findSessionRecord(opCtx.get(), lsid);
+ ASSERT_FALSE(bool(sessionTxnRecord));  // and no session record exists for 'lsid'
+ }
+}
+
+TEST_F(ReshardingOplogSessionApplicationTest, IgnoreIncomingNonRetryableInternalTransaction) {
+ // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for
+ // non-retryable writes.
+ auto lsid = makeLogicalSessionIdWithTxnUUIDForTest();  // session id of the incoming op
+
+ TxnNumber incomingTxnNumber = 100;
+
+ auto opTime = [&] {  // baseline opTime: a session record inserted under a separately created lsid
+ auto opCtx = makeOperationContext();
+ return insertSessionRecord(opCtx.get(), makeLogicalSessionIdForTest(), 100, {3});
+ }();
+
+ auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber);  // op under test, built for 'lsid'
+
+ {
+ auto opCtx = makeOperationContext();
+ ReshardingOplogSessionApplication applier;
+ auto hitPreparedTxn = applier.tryApplyOperation(opCtx.get(), oplogEntry);
+ ASSERT_FALSE(bool(hitPreparedTxn));  // expect boost::none, i.e. no future to wait on
+ }
+
+ {
+ auto opCtx = makeOperationContext();
+ auto foundOps = findOplogEntriesNewerThan(opCtx.get(), opTime.getTimestamp());
+ ASSERT_EQ(foundOps.size(), 0U);  // no oplog entry newer than the baseline was written
+
+ auto sessionTxnRecord = findSessionRecord(opCtx.get(), lsid);
+ ASSERT_FALSE(bool(sessionTxnRecord));  // and no session record exists for 'lsid'
+ }
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
index cfb088a7655..a6d9836a334 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
@@ -167,8 +167,25 @@ boost::optional<SessionTxnRecord> ReshardingTxnCloner::_getNextRecord(OperationC
boost::optional<SharedSemiFuture<void>> ReshardingTxnCloner::doOneRecord(
OperationContext* opCtx, const SessionTxnRecord& donorRecord) {
+ auto sessionId = donorRecord.getSessionId();
+ auto txnNumber = donorRecord.getTxnNum();
+
+ if (isInternalSessionForNonRetryableWrite(sessionId)) {
+ // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for
+ // non-retryable writes.
+ return boost::none;
+ }
+
+ if (isInternalSessionForRetryableWrite(sessionId)) {
+ // Turn this into write history for the retryable write that this internal transaction
+ // corresponds to in order to avoid making retryable internal transactions have a sentinel
+ // noop oplog entry at all.
+ txnNumber = *sessionId.getTxnNumber();
+ sessionId = *getParentSessionId(sessionId);
+ }
+
return resharding::data_copy::withSessionCheckedOut(
- opCtx, donorRecord.getSessionId(), donorRecord.getTxnNum(), boost::none /* stmtId */, [&] {
+ opCtx, sessionId, txnNumber, boost::none /* stmtId */, [&] {
resharding::data_copy::updateSessionRecord(opCtx,
TransactionParticipant::kDeadEndSentinel,
{kIncompleteHistoryStmtId},