summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2022-03-06 01:06:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-06 01:33:17 +0000
commitc176492ec7c149a081585ad52416cdc630edab81 (patch)
tree9ac61bafb7431d442ea614f93332fa2fd8229fe2 /src/mongo/db/s/resharding
parent365c2667c56b7cddb7ef0c69e9440794e3d84c09 (diff)
downloadmongo-c176492ec7c149a081585ad52416cdc630edab81.tar.gz
Revert "SERVER-63441 Handle retryable internal transactions with multiple oplog entries when migrating sessions during resharding"
This reverts commit 97d47a47ed9629ae4206a7b8e20be61aef8d17ec.
Diffstat (limited to 'src/mongo/db/s/resharding')
-rw-r--r--src/mongo/db/s/resharding/resharding_data_copy_util.cpp65
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.cpp13
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.h3
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp71
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h10
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp414
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_session_application.cpp24
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp63
-rw-r--r--src/mongo/db/s/resharding/resharding_txn_cloner.cpp19
9 files changed, 148 insertions, 534 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
index 8be9fbb4669..e000d42f5ae 100644
--- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
@@ -307,16 +307,13 @@ void updateSessionRecord(OperationContext* opCtx,
auto txnParticipant = TransactionParticipant::get(opCtx);
invariant(txnParticipant, "Must be called with session checked out");
- const auto sessionId = *opCtx->getLogicalSessionId();
- const auto txnNumber = *opCtx->getTxnNumber();
-
repl::MutableOplogEntry oplogEntry;
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
oplogEntry.setObject(SessionCatalogMigration::kSessionOplogTag);
oplogEntry.setObject2(std::move(o2Field));
oplogEntry.setNss({});
- oplogEntry.setSessionId(sessionId);
- oplogEntry.setTxnNumber(txnNumber);
+ oplogEntry.setSessionId(opCtx->getLogicalSessionId());
+ oplogEntry.setTxnNumber(opCtx->getTxnNumber());
oplogEntry.setStatementIds(stmtIds);
oplogEntry.setPreImageOpTime(std::move(preImageOpTime));
oplogEntry.setPostImageOpTime(std::move(postImageOpTime));
@@ -324,33 +321,37 @@ void updateSessionRecord(OperationContext* opCtx,
oplogEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
oplogEntry.setFromMigrate(true);
- writeConflictRetry(
- opCtx,
- "resharding::data_copy::updateSessionRecord",
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- [&] {
- AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
-
- WriteUnitOfWork wuow(opCtx);
- repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry);
-
- uassert(4989901,
- str::stream() << "Failed to create new oplog entry: "
- << redact(oplogEntry.toBSON()),
- !opTime.isNull());
-
- // Use the same wallTime as the oplog entry since SessionUpdateTracker
- // looks at the oplog entry wallTime when replicating.
- SessionTxnRecord sessionTxnRecord(
- sessionId, txnNumber, std::move(opTime), oplogEntry.getWallClockTime());
- if (isInternalSessionForRetryableWrite(sessionId)) {
- sessionTxnRecord.setParentSessionId(*getParentSessionId(sessionId));
- }
-
- txnParticipant.onRetryableWriteCloningCompleted(opCtx, stmtIds, sessionTxnRecord);
-
- wuow.commit();
- });
+ writeConflictRetry(opCtx,
+ "resharding::data_copy::updateSessionRecord",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
+
+ WriteUnitOfWork wuow(opCtx);
+ repl::OpTime opTime = repl::logOp(opCtx, &oplogEntry);
+
+ uassert(4989901,
+ str::stream() << "Failed to create new oplog entry: "
+ << redact(oplogEntry.toBSON()),
+ !opTime.isNull());
+
+ // Use the same wallTime as the oplog entry since SessionUpdateTracker
+ // looks at the oplog entry wallTime when replicating.
+ SessionTxnRecord sessionTxnRecord(*oplogEntry.getSessionId(),
+ *oplogEntry.getTxnNumber(),
+ std::move(opTime),
+ oplogEntry.getWallClockTime());
+
+ if (isInternalSessionForRetryableWrite(*oplogEntry.getSessionId())) {
+ sessionTxnRecord.setParentSessionId(
+ *getParentSessionId(*oplogEntry.getSessionId()));
+ }
+
+ txnParticipant.onRetryableWriteCloningCompleted(
+ opCtx, stmtIds, sessionTxnRecord);
+
+ wuow.commit();
+ });
}
} // namespace mongo::resharding::data_copy
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 452cef3ea9a..b65d5376689 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -78,8 +78,8 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch(
CancellationToken cancelToken,
CancelableOperationContextFactory factory) {
Timer latencyTimer;
- auto crudWriterVectors = _batchPreparer.makeCrudOpWriterVectors(
- _currentBatchToApply, _currentDerivedOpsForCrudWriters);
+ auto crudWriterVectors =
+ _batchPreparer.makeCrudOpWriterVectors(_currentBatchToApply, _currentDerivedOps);
CancellationSource errorSource(cancelToken);
@@ -99,8 +99,7 @@ SemiFuture<void> ReshardingOplogApplier::_applyBatch(
}
}
- auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors(
- _currentBatchToApply, _currentDerivedOpsForSessionWriters);
+ auto sessionWriterVectors = _batchPreparer.makeSessionOpWriterVectors(_currentBatchToApply);
batchApplierFutures.reserve(crudWriterVectors.size() + sessionWriterVectors.size());
for (auto&& writer : sessionWriterVectors) {
@@ -153,8 +152,7 @@ SemiFuture<void> ReshardingOplogApplier::run(
"reshardingApplyOplogBatchTwice failpoint enabled, applying batch "
"a second time",
"batchSize"_attr = _currentBatchToApply.size());
- _currentDerivedOpsForCrudWriters.clear();
- _currentDerivedOpsForSessionWriters.clear();
+ _currentDerivedOps.clear();
return _applyBatch(executor, cancelToken, factory);
}
return SemiFuture<void>();
@@ -242,8 +240,7 @@ void ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationContext*
}
_currentBatchToApply.clear();
- _currentDerivedOpsForCrudWriters.clear();
- _currentDerivedOpsForSessionWriters.clear();
+ _currentDerivedOps.clear();
}
NamespaceString ReshardingOplogApplier::ensureStashCollectionExists(
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h
index f7835c2ee64..0c3a8bc8306 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h
@@ -146,8 +146,7 @@ private:
OplogBatch _currentBatchToApply;
// Buffer for internally generated oplog entries that needs to be processed for this batch.
- std::list<repl::OplogEntry> _currentDerivedOpsForCrudWriters;
- std::list<repl::OplogEntry> _currentDerivedOpsForSessionWriters;
+ std::list<repl::OplogEntry> _currentDerivedOps;
// The source of the oplog entries to be applied.
std::unique_ptr<ReshardingDonorOplogIteratorInterface> _oplogIter;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
index bde8901926a..a6f73b01cec 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.cpp
@@ -50,7 +50,8 @@ namespace {
* Return true if we need to update config.transactions collection for this oplog entry.
*/
bool shouldUpdateTxnTable(const repl::OplogEntry& op) {
- if (op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) {
+ if (op.getCommandType() == repl::OplogEntry::CommandType::kCommitTransaction ||
+ op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) {
return true;
}
@@ -63,21 +64,8 @@ bool shouldUpdateTxnTable(const repl::OplogEntry& op) {
}
if (op.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) {
- // This applyOps oplog entry is guaranteed to correspond to a committed transaction since
- // the resharding aggregation pipeline does not output applyOps oplog entries for aborted
- // transactions (i.e. it only outputs the abortTransaction oplog entry).
-
- if (isInternalSessionForRetryableWrite(*op.getSessionId())) {
- // For a retryable internal transaction, we need to update the config.transactions
- // collection upon writing the noop oplog entries for retryable operations contained
- // within each applyOps oplog entry.
- return true;
- }
-
- // The resharding aggregation pipeline also does not output the commitTransaction oplog
- // entry so for a non-retryable transaction, we need to the update to the
- // config.transactions collection upon seeing the final applyOps oplog entry.
- return !op.isPartialTransaction();
+ auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject());
+ return !applyOpsInfo.getPrepare() && !applyOpsInfo.getPartialTxn();
}
return false;
@@ -128,8 +116,6 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors(
}
auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject());
- // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with
- // WouldChangeOwningShard sentinel noop entry.
uassert(
ErrorCodes::OplogOperationUnsupported,
str::stream() << "Commands within applyOps are not supported during resharding: "
@@ -162,7 +148,7 @@ WriterVectors ReshardingOplogBatchPreparer::makeCrudOpWriterVectors(
}
WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors(
- const OplogBatchToPrepare& batch, std::list<OplogEntry>& derivedOps) const {
+ const OplogBatchToPrepare& batch) const {
auto writerVectors = _makeEmptyWriterVectors();
struct SessionOpsList {
@@ -202,52 +188,7 @@ WriterVectors ReshardingOplogBatchPreparer::makeSessionOpWriterVectors(
} else if (op.isCommand()) {
throwIfUnsupportedCommandOp(op);
- if (!shouldUpdateTxnTable(op)) {
- continue;
- }
-
- auto sessionId = *op.getSessionId();
-
- if (isInternalSessionForRetryableWrite(sessionId) &&
- op.getCommandType() == OplogEntry::CommandType::kApplyOps) {
- // Derive retryable write CRUD oplog entries from this retryable internal
- // transaction applyOps oplog entry.
-
- auto applyOpsInfo = repl::ApplyOpsCommandInfo::parse(op.getObject());
- // TODO (SERVER-63880): Make resharding handle applyOps oplog entries with
- // WouldChangeOwningShard sentinel noop entry.
- uassert(ErrorCodes::OplogOperationUnsupported,
- str::stream()
- << "Commands within applyOps are not supported during resharding: "
- << redact(op.toBSONForLogging()),
- applyOpsInfo.areOpsCrudOnly());
-
- auto unrolledOp =
- uassertStatusOK(repl::MutableOplogEntry::parse(op.getEntry().toBSON()));
- unrolledOp.setSessionId(*getParentSessionId(sessionId));
- unrolledOp.setTxnNumber(*sessionId.getTxnNumber());
-
- for (const auto& innerOp : applyOpsInfo.getOperations()) {
- auto replOp = repl::ReplOperation::parse(
- {"ReshardingOplogBatchPreparer::makeSessionOpWriterVectors innerOp"},
- innerOp);
- if (replOp.getStatementIds().empty()) {
- // Skip this operation since it is not retryable.
- continue;
- }
- unrolledOp.setDurableReplOperation(replOp);
-
- // There isn't a direct way to convert from a MutableOplogEntry to a
- // DurableOplogEntry or OplogEntry. We serialize the unrolledOp to have it get
- // re-parsed into an OplogEntry.
- auto& derivedOp = derivedOps.emplace_back(unrolledOp.toBSON());
- invariant(derivedOp.isCrudOpType());
-
- // `&derivedOp` is guaranteed to remain stable while we append more derived
- // oplog entries because `derivedOps` is a std::list.
- updateSessionTracker(&derivedOp);
- }
- } else {
+ if (shouldUpdateTxnTable(op)) {
updateSessionTracker(&op);
}
} else {
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
index 27e7934b326..f4f035ccdbd 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer.h
@@ -82,13 +82,11 @@ public:
* to the config.transactions record for a higher txnNumber will cause any updates in `batch`
* for lower txnNumbers to be elided.
*
- * The returned writer vectors refer to memory owned by `batch` and `derivedOps`. The caller
- * must take care to ensure both `batch` and `derivedOps` outlive the writer vectors all being
- * applied and must take care not to modify `batch` or `derivedOps` until after the writer
- * vectors have all been applied.
+ * The returned writer vectors refer to memory owned by `batch`. The caller must take care to
+ * ensure `batch` outlives the writer vectors all being applied and must take care not to modify
+ * `batch` until after the writer vectors have all been applied.
*/
- WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch,
- std::list<OplogEntry>& derivedOps) const;
+ WriterVectors makeSessionOpWriterVectors(const OplogBatchToPrepare& batch) const;
static void throwIfUnsupportedCommandOp(const OplogEntry& op);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
index c98104fcf3a..07ae756eeae 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_preparer_test.cpp
@@ -31,7 +31,6 @@
#include <boost/optional/optional_io.hpp>
-#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/s/resharding/resharding_oplog_batch_preparer.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
@@ -67,42 +66,25 @@ protected:
return {op.toBSON()};
}
- /**
- * Returns an applyOps oplog entry containing insert operations for the given documents. If the
- * session is an internal session for retryable writes, uses the "_id" of each document as its
- * statement id.
- */
- repl::OplogEntry makeApplyOpsForInsert(const std::vector<BSONObj> documents,
- boost::optional<LogicalSessionId> lsid = boost::none,
- boost::optional<TxnNumber> txnNumber = boost::none,
- boost::optional<bool> isPrepare = boost::none,
- boost::optional<bool> isPartial = boost::none) {
- BSONObjBuilder applyOpsBuilder;
+ repl::OplogEntry makeApplyOps(BSONObj document,
+ bool isPrepare,
+ bool isPartial,
+ boost::optional<LogicalSessionId> lsid,
+ boost::optional<TxnNumber> txnNumber) {
- BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps");
- for (const auto& document : documents) {
- auto insertOp = repl::DurableReplOperation(repl::OpTypeEnum::kInsert, {}, document);
- if (lsid && isInternalSessionForRetryableWrite(*lsid)) {
- if (!document.hasField("_id")) {
- continue;
- }
- auto id = document.getIntField("_id");
- insertOp.setStatementIds({{id}});
- }
- opsArrayBuilder.append(insertOp.toBSON());
- }
- opsArrayBuilder.done();
+ std::vector<mongo::BSONObj> operations;
+ auto insertOp = repl::MutableOplogEntry::makeInsertOperation(
+ NamespaceString("foo.bar"), UUID::gen(), document, document);
+
+ BSONObjBuilder applyOpsBuilder;
+ applyOpsBuilder.append("applyOps", BSON_ARRAY(insertOp.toBSON()));
if (isPrepare) {
- invariant(lsid);
- invariant(txnNumber);
- applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, *isPrepare);
+ applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPrepareFieldName, true);
}
if (isPartial) {
- invariant(lsid);
- invariant(txnNumber);
- applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, *isPartial);
+ applyOpsBuilder.append(repl::ApplyOpsCommandInfoBase::kPartialTxnFieldName, true);
}
repl::MutableOplogEntry op;
@@ -212,13 +194,24 @@ TEST_F(ReshardingOplogBatchPreparerTest, CreatesDerivedCrudOpsForApplyOps) {
// We use the "fromApplyOps" field in the document to distinguish between the regular oplog
// entries from the derived ones later on.
int numOps = 20;
- std::vector<BSONObj> docsForApplyOps;
for (int i = 0; i < numOps; ++i) {
batch.emplace_back(makeUpdateOp(BSON("_id" << i << "fromApplyOps" << false)));
- docsForApplyOps.push_back(BSON("_id" << i << "fromApplyOps" << true));
}
- batch.emplace_back(makeApplyOpsForInsert(docsForApplyOps));
+ BSONObjBuilder applyOpsBuilder;
+ {
+ BSONArrayBuilder opsArrayBuilder = applyOpsBuilder.subarrayStart("applyOps");
+ for (int i = 0; i < numOps; ++i) {
+ // We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to
+ // deal with setting the 'o2' field.
+ opsArrayBuilder.append(
+ repl::DurableReplOperation(
+ repl::OpTypeEnum::kInsert, {}, BSON("_id" << i << "fromApplyOps" << true))
+ .toBSON());
+ }
+ }
+
+ batch.emplace_back(makeCommandOp(applyOpsBuilder.done()));
std::list<repl::OplogEntry> derivedOps;
auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps);
@@ -261,8 +254,12 @@ TEST_F(ReshardingOplogBatchPreparerTest, InterleavesDerivedCrudOpsForApplyOps) {
} else {
// We use OpTypeEnum::kInsert rather than OpTypeEnum::kUpdate here to avoid needing to
// deal with setting the 'o2' field.
- batch.emplace_back(
- makeApplyOpsForInsert({BSON("_id" << 0 << "n" << i << "fromApplyOps" << true)}));
+ batch.emplace_back(makeCommandOp(BSON(
+ "applyOps" << BSON_ARRAY(repl::DurableReplOperation(
+ repl::OpTypeEnum::kInsert,
+ {},
+ BSON("_id" << 0 << "n" << i << "fromApplyOps" << true))
+ .toBSON()))));
}
}
@@ -293,10 +290,8 @@ TEST_F(ReshardingOplogBatchPreparerTest, AssignsSessionOpsToWriterVectorsByLsid)
batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{1}));
}
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOps.size(), 0U);
auto writer = getNonEmptyWriterVector(writerVectors);
ASSERT_EQ(writer.size(), numOps);
@@ -316,10 +311,8 @@ TEST_F(ReshardingOplogBatchPreparerTest, DiscardsLowerTxnNumberSessionOps) {
batch.emplace_back(makeUpdateOp(BSON("_id" << i), lsid, TxnNumber{i}));
}
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOps.size(), 0U);
auto writer = getNonEmptyWriterVector(writerVectors);
ASSERT_EQ(writer.size(), 1U);
@@ -337,8 +330,7 @@ TEST_F(ReshardingOplogBatchPreparerTest, DistributesSessionOpsToWriterVectorsFai
makeUpdateOp(BSON("_id" << i), makeLogicalSessionIdForTest(), TxnNumber{1}));
}
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
// Use `numOps / 5` as a generous definition for "fair". There's no guarantee for how the lsid
@@ -365,319 +357,107 @@ TEST_F(ReshardingOplogBatchPreparerTest, ThrowsForUnsupportedCommandOps) {
batch.emplace_back(makeCommandOp(BSON("commitIndexBuild" << 1)));
std::list<repl::OplogEntry> derivedOps;
- ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch, derivedOps),
+ ASSERT_THROWS_CODE(_batchPreparer.makeSessionOpWriterVectors(batch),
DBException,
ErrorCodes::OplogOperationUnsupported);
}
}
TEST_F(ReshardingOplogBatchPreparerTest, DiscardsNoops) {
- auto runTest = [&](const boost::optional<LogicalSessionId>& lsid,
- const boost::optional<TxnNumber>& txnNumber) {
- OplogBatch batch;
+ OplogBatch batch;
- int numOps = 5;
- for (int i = 0; i < numOps; ++i) {
- repl::MutableOplogEntry op;
- op.setSessionId(lsid);
- op.setTxnNumber(txnNumber);
- op.setOpType(repl::OpTypeEnum::kNoop);
- op.setObject({});
- op.setNss({});
- op.setOpTime({{}, {}});
- op.setWallClockTime({});
- batch.emplace_back(op.toBSON());
- }
+ int numOps = 5;
+ for (int i = 0; i < numOps; ++i) {
+ repl::MutableOplogEntry op;
+ op.setOpType(repl::OpTypeEnum::kNoop);
+ op.setObject({});
+ op.setNss({});
+ op.setOpTime({{}, {}});
+ op.setWallClockTime({});
+ batch.emplace_back(op.toBSON());
+ }
+
+ std::list<repl::OplogEntry> derivedOps;
+ auto writerVectors = _batchPreparer.makeCrudOpWriterVectors(batch, derivedOps);
+ ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
+ ASSERT_EQ(derivedOps.size(), 0U);
+ ASSERT_EQ(writerVectors[0].size(), 0U);
+ ASSERT_EQ(writerVectors[1].size(), 0U);
- std::list<repl::OplogEntry> derivedOpsForCrudWriters;
- auto writerVectors =
- _batchPreparer.makeCrudOpWriterVectors(batch, derivedOpsForCrudWriters);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOpsForCrudWriters.size(), 0U);
- ASSERT_EQ(writerVectors[0].size(), 0U);
- ASSERT_EQ(writerVectors[1].size(), 0U);
-
- std::list<repl::OplogEntry> derivedOpsForSessionWriters;
- writerVectors =
- _batchPreparer.makeSessionOpWriterVectors(batch, derivedOpsForSessionWriters);
- ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
- ASSERT_EQ(derivedOpsForSessionWriters.size(), 0U);
- ASSERT_EQ(writerVectors[0].size(), 0U);
- ASSERT_EQ(writerVectors[1].size(), 0U);
- };
-
- runTest(boost::none, boost::none);
-
- TxnNumber txnNumber{1};
- runTest(makeLogicalSessionIdForTest(), txnNumber);
- runTest(makeLogicalSessionIdWithTxnUUIDForTest(), txnNumber);
- runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest(), txnNumber);
+ writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
+ ASSERT_EQ(writerVectors.size(), kNumWriterVectors);
+ ASSERT_EQ(writerVectors[0].size(), 0U);
+ ASSERT_EQ(writerVectors[1].size(), 0U);
}
TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForApplyOpsWithoutTxnNumber) {
OplogBatch batch;
- batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0)}));
+ batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, false, boost::none, boost::none));
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
- ASSERT_EQ(derivedOps.size(), 0U);
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
for (const auto& writer : writerVectors) {
ASSERT_TRUE(writer.empty());
}
}
-
-TEST_F(ReshardingOplogBatchPreparerTest,
- SessionWriteVectorsDeriveCrudOpsForApplyOpsForRetryableInternalTransaction) {
- const auto lsid = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
- const TxnNumber txnNumber{1};
-
+TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTxn) {
OplogBatch batch;
- // 'makeApplyOpsForInsert' uses the "_id" of each document as the "stmtId" for its insert
- // operation. The insert operation without a stmtId should not have a derived operation.
- batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSONObj(), BSON("_id" << 1)},
- lsid,
- txnNumber,
- false /* isPrepare */,
- false /* isPartial */));
+ auto lsid = makeLogicalSessionIdForTest();
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
+ batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, false, lsid, 2));
+
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
ASSERT_FALSE(writerVectors.empty());
auto writer = getNonEmptyWriterVector(writerVectors);
-
- ASSERT_EQ(writer.size(), 2U);
- ASSERT_EQ(derivedOps.size(), 2U);
- for (size_t i = 0; i < writer.size(); ++i) {
- ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
- ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
- ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
- ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
- }
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallUnpreparedTxn) {
- auto runTest = [&](const LogicalSessionId& lsid) {
- const TxnNumber txnNumber{1};
-
- OplogBatch batch;
- batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)},
- lsid,
- txnNumber,
- false /* isPrepare */,
- false /* isPartial */));
-
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
- ASSERT_FALSE(writerVectors.empty());
-
- auto writer = getNonEmptyWriterVector(writerVectors);
-
- if (isInternalSessionForRetryableWrite(lsid)) {
- ASSERT_EQ(writer.size(), 2U);
- ASSERT_EQ(derivedOps.size(), 2U);
- for (size_t i = 0; i < writer.size(); ++i) {
- ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
- ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
- ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
- ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
- }
- } else {
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_EQ(derivedOps.size(), 0U);
- ASSERT_EQ(writer[0]->getSessionId(), lsid);
- ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
- ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
- }
- };
-
- runTest(makeLogicalSessionIdForTest());
- runTest(makeLogicalSessionIdWithTxnUUIDForTest());
- runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForLargeUnpreparedTxn) {
- auto runTest = [&](const LogicalSessionId& lsid) {
- const TxnNumber txnNumber{1};
-
- OplogBatch batch;
- batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)},
- lsid,
- txnNumber,
- false /* isPrepare */,
- true /* isPartial */));
- batch.emplace_back(makeApplyOpsForInsert(
- {BSON("_id" << 2)}, lsid, txnNumber, false /* isPrepare */, false /* isPartial */));
-
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
- ASSERT_FALSE(writerVectors.empty());
-
- auto writer = getNonEmptyWriterVector(writerVectors);
-
- if (isInternalSessionForRetryableWrite(lsid)) {
- ASSERT_EQ(writer.size(), 3U);
- ASSERT_EQ(derivedOps.size(), 3U);
- for (size_t i = 0; i < writer.size(); ++i) {
- ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
- ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
- ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
- ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
- }
- } else {
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_EQ(derivedOps.size(), 0U);
- ASSERT_EQ(writer[0]->getSessionId(), lsid);
- ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
- ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
- }
- };
-
- runTest(makeLogicalSessionIdForTest());
- runTest(makeLogicalSessionIdWithTxnUUIDForTest());
- runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
-}
-
-TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForSmallCommittedPreparedTxn) {
- auto runTest = [&](const LogicalSessionId& lsid) {
- const TxnNumber txnNumber{1};
-
- OplogBatch batch;
- batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)},
- lsid,
- txnNumber,
- true /* isPrepare */,
- false /* isPartial */));
- batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, txnNumber));
-
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
- ASSERT_FALSE(writerVectors.empty());
-
- auto writer = getNonEmptyWriterVector(writerVectors);
-
- if (isInternalSessionForRetryableWrite(lsid)) {
- ASSERT_EQ(writer.size(), 2U);
- ASSERT_EQ(derivedOps.size(), 2U);
- for (size_t i = 0; i < writer.size(); ++i) {
- ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
- ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
- ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
- ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
- }
- } else {
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_EQ(derivedOps.size(), 0U);
- ASSERT_EQ(writer[0]->getSessionId(), lsid);
- ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
- ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
- }
- };
-
- runTest(makeLogicalSessionIdForTest());
- runTest(makeLogicalSessionIdWithTxnUUIDForTest());
- runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+ ASSERT_EQ(writer.size(), 1U);
+ ASSERT_EQ(writer[0]->getSessionId(), lsid);
+ ASSERT_EQ(*writer[0]->getTxnNumber(), 2);
}
-TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForLargeCommittedPreparedTxn) {
- auto runTest = [&](const LogicalSessionId& lsid) {
- const TxnNumber txnNumber{1};
+TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForCommittedTxn) {
+ OplogBatch batch;
+ auto lsid = makeLogicalSessionIdForTest();
- OplogBatch batch;
- batch.emplace_back(makeApplyOpsForInsert({BSON("_id" << 0), BSON("_id" << 1)},
- lsid,
- txnNumber,
- false /* isPrepare */,
- true /* isPartial */));
- batch.emplace_back(makeApplyOpsForInsert(
- {BSON("_id" << 2)}, lsid, txnNumber, true /* isPrepare */, false /* isPartial */));
- batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, txnNumber));
+ batch.emplace_back(makeApplyOps(BSON("_id" << 3), true, false, lsid, 2));
+ batch.emplace_back(makeCommandOp(BSON("commitTransaction" << 1), lsid, 2));
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
- ASSERT_FALSE(writerVectors.empty());
-
- auto writer = getNonEmptyWriterVector(writerVectors);
-
- if (isInternalSessionForRetryableWrite(lsid)) {
- ASSERT_EQ(writer.size(), 3U);
- ASSERT_EQ(derivedOps.size(), 3U);
- for (size_t i = 0; i < writer.size(); ++i) {
- ASSERT_EQ(writer[i]->getSessionId(), *getParentSessionId(lsid));
- ASSERT_EQ(*writer[i]->getTxnNumber(), *lsid.getTxnNumber());
- ASSERT(writer[i]->getOpType() == repl::OpTypeEnum::kInsert);
- ASSERT_BSONOBJ_EQ(writer[i]->getObject(), (BSON("_id" << static_cast<int>(i))));
- }
- } else {
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_EQ(derivedOps.size(), 0U);
- ASSERT_EQ(writer[0]->getSessionId(), lsid);
- ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
- ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
- }
- };
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
+ ASSERT_FALSE(writerVectors.empty());
- runTest(makeLogicalSessionIdForTest());
- runTest(makeLogicalSessionIdWithTxnUUIDForTest());
- runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+ auto writer = getNonEmptyWriterVector(writerVectors);
+ ASSERT_EQ(writer.size(), 1U);
+ ASSERT_EQ(writer[0]->getSessionId(), lsid);
+ ASSERT_EQ(*writer[0]->getTxnNumber(), 2);
}
TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForAbortedPreparedTxn) {
- auto runTest = [&](const LogicalSessionId& lsid) {
- const TxnNumber txnNumber{1};
+ OplogBatch batch;
+ auto lsid = makeLogicalSessionIdForTest();
- OplogBatch batch;
- batch.emplace_back(makeCommandOp(BSON("abortTransaction" << 1), lsid, txnNumber));
+ batch.emplace_back(makeCommandOp(BSON("abortTransaction" << 1), lsid, 2));
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
- ASSERT_FALSE(writerVectors.empty());
-
- auto writer = getNonEmptyWriterVector(writerVectors);
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_EQ(derivedOps.size(), 0U);
- ASSERT_EQ(writer[0]->getSessionId(), lsid);
- ASSERT_EQ(*writer[0]->getTxnNumber(), txnNumber);
- ASSERT(writer[0]->getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction);
- };
-
- runTest(makeLogicalSessionIdForTest());
- runTest(makeLogicalSessionIdWithTxnUUIDForTest());
- runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
+ ASSERT_FALSE(writerVectors.empty());
+
+ auto writer = getNonEmptyWriterVector(writerVectors);
+ ASSERT_EQ(writer.size(), 1U);
+ ASSERT_EQ(writer[0]->getSessionId(), lsid);
+ ASSERT_EQ(*writer[0]->getTxnNumber(), 2);
}
TEST_F(ReshardingOplogBatchPreparerTest, SessionWriteVectorsForPartialUnpreparedTxn) {
- auto runTest = [&](const LogicalSessionId& lsid) {
- const TxnNumber txnNumber{1};
+ OplogBatch batch;
+ auto lsid = makeLogicalSessionIdForTest();
- OplogBatch batch;
- batch.emplace_back(makeApplyOpsForInsert(
- {BSON("_id" << 0)}, lsid, txnNumber, false /* isPrepare */, true /* isPartial */));
+ batch.emplace_back(makeApplyOps(BSON("_id" << 3), false, true, lsid, 2));
- std::list<repl::OplogEntry> derivedOps;
- auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch, derivedOps);
- if (isInternalSessionForRetryableWrite(lsid)) {
- ASSERT_FALSE(writerVectors.empty());
- auto writer = getNonEmptyWriterVector(writerVectors);
- ASSERT_EQ(writer.size(), 1U);
- ASSERT_EQ(derivedOps.size(), 1U);
- ASSERT_EQ(writer[0]->getSessionId(), *getParentSessionId(lsid));
- ASSERT_EQ(*writer[0]->getTxnNumber(), *lsid.getTxnNumber());
- ASSERT(writer[0]->getOpType() == repl::OpTypeEnum::kInsert);
- ASSERT_BSONOBJ_EQ(writer[0]->getObject(), (BSON("_id" << 0)));
- } else {
- ASSERT_EQ(derivedOps.size(), 0U);
- for (const auto& writer : writerVectors) {
- ASSERT_TRUE(writer.empty());
- }
- }
- };
+ auto writerVectors = _batchPreparer.makeSessionOpWriterVectors(batch);
- runTest(makeLogicalSessionIdForTest());
- runTest(makeLogicalSessionIdWithTxnUUIDForTest());
- runTest(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+ for (const auto& writer : writerVectors) {
+ ASSERT_TRUE(writer.empty());
+ }
}
} // namespace
diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp
index 123a740f119..5d6b635e9e4 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_session_application.cpp
@@ -78,29 +78,8 @@ repl::OpTime ReshardingOplogSessionApplication::_logPrePostImage(
}
boost::optional<SharedSemiFuture<void>> ReshardingOplogSessionApplication::tryApplyOperation(
- OperationContext* opCtx, const mongo::repl::OplogEntry& op) const {
- invariant(op.getSessionId());
- invariant(op.getTxnNumber());
-
+ OperationContext* opCtx, const repl::OplogEntry& op) const {
auto lsid = *op.getSessionId();
- if (isInternalSessionForNonRetryableWrite(lsid)) {
- // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for
- // non-retryable writes.
- return boost::none;
- }
- if (isInternalSessionForRetryableWrite(lsid)) {
- // The oplog preparer should have turned each applyOps oplog entry for a retryable internal
- // transaction into retryable write CRUD oplog entries.
- invariant(op.getCommandType() != repl::OplogEntry::CommandType::kApplyOps);
-
- if (op.getCommandType() == repl::OplogEntry::CommandType::kAbortTransaction) {
- // Skip this oplog entry since there is no retryable write history to apply and writing
- // a sentinel noop oplog entry would make retryable write statements that successfully
- // executed outside of this internal transaction not retryable.
- return boost::none;
- }
- }
-
auto txnNumber = *op.getTxnNumber();
bool isRetryableWrite = op.isCrudOpType();
@@ -109,7 +88,6 @@ boost::optional<SharedSemiFuture<void>> ReshardingOplogSessionApplication::tryAp
auto stmtIds =
isRetryableWrite ? op.getStatementIds() : std::vector<StmtId>{kIncompleteHistoryStmtId};
- invariant(!stmtIds.empty());
boost::optional<repl::OpTime> preImageOpTime;
if (auto preImageOp = op.getPreImageOp()) {
diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp
index 1116a41ec2a..6c57d2271a5 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp
@@ -68,8 +68,6 @@ public:
MongoDSessionCatalog::onStepUp(opCtx.get());
}
-
- serverGlobalParams.clusterRole = ClusterRole::ShardServer;
}
repl::OpTime insertSessionRecord(OperationContext* opCtx,
@@ -949,66 +947,5 @@ TEST_F(ReshardingOplogSessionApplicationTest, IncomingTxnHasHigherTxnNumberThanP
ASSERT_OK(hitPreparedTxn->getNoThrow());
}
-TEST_F(ReshardingOplogSessionApplicationTest, IgnoreIncomingAbortedRetryableInternalTransaction) {
- auto lsid = makeLogicalSessionIdWithTxnNumberAndUUIDForTest();
-
- TxnNumber incomingTxnNumber = 100;
-
- auto opTime = [&] {
- auto opCtx = makeOperationContext();
- return insertSessionRecord(opCtx.get(), makeLogicalSessionIdForTest(), 100, {3});
- }();
-
- // 'makeFinishTxnOp' returns an abortTransaction oplog entry.
- auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber);
-
- {
- auto opCtx = makeOperationContext();
- ReshardingOplogSessionApplication applier;
- auto hitPreparedTxn = applier.tryApplyOperation(opCtx.get(), oplogEntry);
- ASSERT_FALSE(bool(hitPreparedTxn));
- }
-
- {
- auto opCtx = makeOperationContext();
- auto foundOps = findOplogEntriesNewerThan(opCtx.get(), opTime.getTimestamp());
- ASSERT_EQ(foundOps.size(), 0U);
-
- auto sessionTxnRecord = findSessionRecord(opCtx.get(), lsid);
- ASSERT_FALSE(bool(sessionTxnRecord));
- }
-}
-
-TEST_F(ReshardingOplogSessionApplicationTest, IgnoreIncomingNonRetryableInternalTransaction) {
- // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for
- // non-retryable writes.
- auto lsid = makeLogicalSessionIdWithTxnUUIDForTest();
-
- TxnNumber incomingTxnNumber = 100;
-
- auto opTime = [&] {
- auto opCtx = makeOperationContext();
- return insertSessionRecord(opCtx.get(), makeLogicalSessionIdForTest(), 100, {3});
- }();
-
- auto oplogEntry = makeFinishTxnOp(lsid, incomingTxnNumber);
-
- {
- auto opCtx = makeOperationContext();
- ReshardingOplogSessionApplication applier;
- auto hitPreparedTxn = applier.tryApplyOperation(opCtx.get(), oplogEntry);
- ASSERT_FALSE(bool(hitPreparedTxn));
- }
-
- {
- auto opCtx = makeOperationContext();
- auto foundOps = findOplogEntriesNewerThan(opCtx.get(), opTime.getTimestamp());
- ASSERT_EQ(foundOps.size(), 0U);
-
- auto sessionTxnRecord = findSessionRecord(opCtx.get(), lsid);
- ASSERT_FALSE(bool(sessionTxnRecord));
- }
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
index a6d9836a334..cfb088a7655 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
@@ -167,25 +167,8 @@ boost::optional<SessionTxnRecord> ReshardingTxnCloner::_getNextRecord(OperationC
boost::optional<SharedSemiFuture<void>> ReshardingTxnCloner::doOneRecord(
OperationContext* opCtx, const SessionTxnRecord& donorRecord) {
- auto sessionId = donorRecord.getSessionId();
- auto txnNumber = donorRecord.getTxnNum();
-
- if (isInternalSessionForNonRetryableWrite(sessionId)) {
- // TODO (SERVER-63877): Determine if resharding should migrate internal sessions for
- // non-retryable writes.
- return boost::none;
- }
-
- if (isInternalSessionForRetryableWrite(sessionId)) {
- // Turn this into write history for the retryable write that this internal transaction
- // corresponds to in order to avoid making retryable internal transactions have a sentinel
- // noop oplog entry at all.
- txnNumber = *sessionId.getTxnNumber();
- sessionId = *getParentSessionId(sessionId);
- }
-
return resharding::data_copy::withSessionCheckedOut(
- opCtx, sessionId, txnNumber, boost::none /* stmtId */, [&] {
+ opCtx, donorRecord.getSessionId(), donorRecord.getTxnNum(), boost::none /* stmtId */, [&] {
resharding::data_copy::updateSessionRecord(opCtx,
TransactionParticipant::kDeadEndSentinel,
{kIncompleteHistoryStmtId},