summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2020-11-02 21:16:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-11-20 16:55:53 +0000
commita16b6119d7361395040d40dafdee2c3448081c8c (patch)
tree20e45de44164767676eb19d2ea3ddb2956af2276
parent74fe293b16ae2d84983c32cf7ce6cbe70fa55f7a (diff)
downloadmongo-a16b6119d7361395040d40dafdee2c3448081c8c.tar.gz
SERVER-49904 Handle retryable write oplog entries in the resharding oplog applier
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.cpp194
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.h16
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp499
-rw-r--r--src/mongo/db/s/resharding_txn_cloner.cpp2
4 files changed, 695 insertions, 16 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 7b93d3c643f..3eae9fd93e0 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -35,10 +35,15 @@
#include <fmt/format.h>
+#include "mongo/base/simple_string_data_comparator.h"
#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/oplog_applier_utils.h"
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator_interface.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/logv2/log.h"
#include "mongo/logv2/redaction.h"
#include "mongo/stdx/mutex.h"
@@ -50,6 +55,115 @@ namespace mongo {
using namespace fmt::literals;
+namespace {
+
+// Used for marking intermediate oplog entries created by the resharding applier that will require
+// special handling in the repl writer thread. These intermediate oplog entries serve as a message
+// container and will never be written to an actual collection. Tagged entries are recognised by an exact woCompare() of their object against this value.
+const BSONObj kReshardingOplogTag(BSON("$resharding" << 1));
+
+/**
+ * Writes the oplog entries and updates to config.transactions for enabling retrying the write
+ * described in the oplog entry.
+ *
+ * Sets the session id and txnNumber of 'oplog' on 'opCtx', then logs a no-op oplog entry that
+ * carries the original entry in its 'o2' field and records the statement id of 'oplog' against
+ * the session through TransactionParticipant::onRetryableWriteCloningCompleted().
+ *
+ * Returns Status::OK() without writing anything when the statement is already recorded as
+ * executed, or when beginOrContinue()/checkStatementExecuted() throws TransactionTooOld or
+ * IncompleteTransactionHistory. Every other failure leaves this function as an exception (a
+ * rethrown DBException or a failed uassert); no code path returns a non-OK Status.
+ *
+ * NOTE(review): the session id, txnNumber and statement id optionals of 'oplog' are dereferenced
+ * without a check; presumably callers only pass entries that carry all three - confirm.
+ */
+Status insertOplogAndUpdateConfigForRetryable(OperationContext* opCtx,
+ const repl::OplogEntry& oplog) {
+ auto txnNumber = *oplog.getTxnNumber();
+
+ opCtx->setLogicalSessionId(*oplog.getSessionId());
+ opCtx->setTxnNumber(txnNumber);
+
+ MongoDOperationContextSession ocs(opCtx);  // Scoped object that lives until this function returns; presumably checks out the session for 'opCtx' - confirm.
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ uassert(4990400, "Failed to get transaction Participant", txnParticipant);
+ const auto stmtId = *oplog.getStatementId();
+
+ try {
+ txnParticipant.beginOrContinue(opCtx, txnNumber, boost::none, boost::none);
+
+ if (txnParticipant.checkStatementExecuted(opCtx, stmtId)) {
+ // Skip the incoming statement because it has already been logged locally.
+ return Status::OK();
+ }
+ } catch (const DBException& ex) {
+ if (ex.code() == ErrorCodes::TransactionTooOld) {
+ return Status::OK();  // Treated as success: nothing is recorded for this entry.
+ } else if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
+ // If the transaction chain is incomplete because oplog was truncated, just ignore the
+ // incoming oplog and don't attempt to 'patch up' the missing pieces.
+ return Status::OK();
+ }
+
+ throw;  // Any other error code propagates to the caller as an exception.
+ }
+
+ // TODO: handle pre/post image
+
+ auto rawOplogBSON = oplog.toBSON();
+ auto noOpOplog = uassertStatusOK(repl::MutableOplogEntry::parse(rawOplogBSON));  // Start from a mutable copy of the incoming entry, then overwrite the fields below.
+ noOpOplog.setObject2(rawOplogBSON);  // The complete incoming entry is preserved in 'o2'.
+ noOpOplog.setNss({});
+ noOpOplog.setObject(BSON("$reshardingOplogApply" << 1));
+ // TODO: link pre/post image
+ noOpOplog.setPrevWriteOpTimeInTransaction(txnParticipant.getLastWriteOpTime());  // Link to the last write opTime reported by the transaction participant.
+ noOpOplog.setOpType(repl::OpTypeEnum::kNoop);
+ // Reset OpTime so logOp() can assign a new one.
+ noOpOplog.setOpTime(OplogSlot());
+ noOpOplog.setWallClockTime(Date_t::now());
+
+ writeConflictRetry(
+ opCtx,
+ "ReshardingUpdateConfigTransaction",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ // Need to take global lock here so repl::logOp will not unlock it and trigger the
+ // invariant that disallows unlocking global lock while inside a WUOW. Take the
+ // transaction table db lock to ensure the same lock ordering with normal replicated
+ // updates to the table.
+ Lock::DBLock lk(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
+ WriteUnitOfWork wunit(opCtx);
+
+ const auto& oplogOpTime = repl::logOp(opCtx, &noOpOplog);
+
+ uassert(4990402,
+ str::stream() << "Failed to create new oplog entry for oplog with opTime: "
+ << noOpOplog.getOpTime().toString() << ": "
+ << redact(noOpOplog.toBSON()),
+ !oplogOpTime.isNull());  // A null opTime from logOp() is treated as failure to write the entry.
+
+ SessionTxnRecord sessionTxnRecord;
+ sessionTxnRecord.setSessionId(*oplog.getSessionId());
+ sessionTxnRecord.setTxnNum(txnNumber);
+ sessionTxnRecord.setLastWriteOpTime(oplogOpTime);
+ sessionTxnRecord.setLastWriteDate(noOpOplog.getWallClockTime());
+ txnParticipant.onRetryableWriteCloningCompleted(opCtx, {stmtId}, sessionTxnRecord);
+
+ wunit.commit();  // The logOp() call and the session record update share this one WriteUnitOfWork.
+ });
+
+ return Status::OK();
+}
+
+/**
+ * Tells whether 'oplogOrGroupedInserts' is one of the tagged no-op entries that carry retryable
+ * write information: a single (non-grouped) entry of type kNoop whose object compares equal to
+ * kReshardingOplogTag.
+ */
+bool isRetryableNoOp(const repl::OplogEntryOrGroupedInserts& oplogOrGroupedInserts) {
+    // A group of inserts is never a tagged entry.
+    if (oplogOrGroupedInserts.isGroupedInserts()) {
+        return false;
+    }
+
+    const auto& entry = oplogOrGroupedInserts.getOp();
+    const bool isNoop = entry.getOpType() == repl::OpTypeEnum::kNoop;
+
+    // The object is only inspected for no-ops, thanks to short-circuit evaluation.
+    return isNoop && entry.getObject().woCompare(kReshardingOplogTag) == 0;
+}
+
+} // anonymous namespace
+
ReshardingOplogApplier::ReshardingOplogApplier(
ServiceContext* service,
ReshardingSourceId sourceId,
@@ -218,8 +332,7 @@ void ReshardingOplogApplier::_scheduleNextBatch() {
Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx) {
// TODO: handle config.transaction updates with derivedOps
- std::vector<std::vector<repl::OplogEntry>> derivedOps;
- auto writerVectors = _fillWriterVectors(opCtx, &_currentBatchToApply, &derivedOps);
+ auto writerVectors = _fillWriterVectors(opCtx, &_currentBatchToApply, &_currentDerivedOps);
_currentWriterVectors.swap(writerVectors);
auto pf = makePromiseFuture<void>();
@@ -249,14 +362,51 @@ Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx) {
return std::move(pf.future);
}
+/**
+ * Builds the intermediate entry derived from 'oplog': a kNoop entry whose object is
+ * kReshardingOplogTag and whose 'o2' is the BSON form of the original entry. The uuid is left
+ * unset; every other constructor argument is copied from 'oplog'.
+ */
+repl::OplogEntry convertToNoOpWithReshardingTag(const repl::OplogEntry& oplog) {
+    // Set the o2 field with the original oplog.
+    const BSONObj originalEntryBson = oplog.toBSON();
+    const auto derivedOpType = repl::OpTypeEnum::kNoop;
+
+    return repl::OplogEntry(oplog.getOpTime(),
+                            oplog.getHash(),
+                            derivedOpType,
+                            oplog.getNss(),
+                            boost::none /* uuid */,
+                            oplog.getFromMigrate(),
+                            oplog.getVersion(),
+                            kReshardingOplogTag /* o */,
+                            originalEntryBson /* o2 */,
+                            oplog.getOperationSessionInfo(),
+                            oplog.getUpsert(),
+                            oplog.getWallClockTime(),
+                            oplog.getStatementId(),
+                            oplog.getPrevWriteOpTimeInTransaction(),
+                            oplog.getPreImageOpTime(),
+                            oplog.getPostImageOpTime(),
+                            oplog.getDestinedRecipient(),
+                            oplog.get_id());
+}
+
+/**
+ * Appends a pointer to every entry of 'derivedOps' to one of the 'writerVectors'. The target
+ * vector is chosen by hashing the entry's session id modulo the number of writer vectors, so
+ * entries that share a session id always go to the same vector.
+ *
+ * Each entry must carry kReshardingOplogTag as its object (invariant) and must have a session id
+ * (uassert 4990403). The stored pointers refer to elements of 'derivedOps'.
+ */
+void addDerivedOpsToWriterVector(std::vector<std::vector<const repl::OplogEntry*>>* writerVectors,
+                                 const std::vector<repl::OplogEntry>& derivedOps) {
+    LogicalSessionIdHash hashSessionId;
+    const auto writerCount = writerVectors->size();
+
+    for (const auto& derivedOp : derivedOps) {
+        invariant(derivedOp.getObject().woCompare(kReshardingOplogTag) == 0);
+
+        const auto& lsid = derivedOp.getSessionId();
+        uassert(4990403,
+                "expected resharding derived oplog to have session id: {}"_format(
+                    derivedOp.toBSON().toString()),
+                lsid);
+
+        auto& targetWriter = (*writerVectors)[hashSessionId(*lsid) % writerCount];
+        targetWriter.push_back(&derivedOp);
+    }
+}
+
std::vector<std::vector<const repl::OplogEntry*>> ReshardingOplogApplier::_fillWriterVectors(
- OperationContext* opCtx,
- OplogBatch* batch,
- std::vector<std::vector<repl::OplogEntry>>* derivedOps) {
+ OperationContext* opCtx, OplogBatch* batch, OplogBatch* derivedOps) {
std::vector<std::vector<const repl::OplogEntry*>> writerVectors(
_writerPool->getStats().numThreads);
repl::CachedCollectionProperties collPropertiesCache;
+ LogicalSessionIdMap<RetryableOpsList> sessionTracker;
+
for (auto&& op : *batch) {
uassert(5012000,
"Resharding oplog application does not support prepared transactions.",
@@ -268,12 +418,37 @@ std::vector<std::vector<const repl::OplogEntry*>> ReshardingOplogApplier::_fillW
if (op.getOpType() == repl::OpTypeEnum::kNoop)
continue;
- // TODO: handle prePostImageOps.
-
repl::OplogApplierUtils::addToWriterVector(
opCtx, &op, &writerVectors, &collPropertiesCache);
+
+ if (auto sessionId = op.getSessionId()) {
+ auto& retryableOpList = sessionTracker[*sessionId];
+ auto txnNumber = *op.getTxnNumber();
+
+ if (retryableOpList.txnNum == txnNumber) {
+ retryableOpList.ops.push_back(&op);
+ } else if (retryableOpList.txnNum < txnNumber) {
+ retryableOpList.ops.clear();
+ retryableOpList.ops.push_back(&op);
+ retryableOpList.txnNum = txnNumber;
+ } else {
+ uasserted(4990401,
+ str::stream() << "retryable oplog applier for " << _sourceId.toBSON()
+ << " encountered out of order txnNum, saw " << op.toBSON()
+ << " after " << retryableOpList.ops.front()->toBSON());
+ }
+ }
}
+ for (const auto& sessionsToUpdate : sessionTracker) {
+ for (const auto& op : sessionsToUpdate.second.ops) {
+ auto noOpWithPrePost = convertToNoOpWithReshardingTag(*op);
+ derivedOps->push_back(std::move(noOpWithPrePost));
+ }
+ }
+
+ addDerivedOpsToWriterVector(&writerVectors, _currentDerivedOps);
+
return writerVectors;
}
@@ -308,6 +483,10 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts(
// We don't care about applied stats in resharding.
auto incrementOpsAppliedStats = [] {};
+ if (isRetryableNoOp(entryOrGroupedInserts)) {
+ return insertOplogAndUpdateConfigForRetryable(opCtx, entryOrGroupedInserts.getOp());
+ }
+
// We always use oplog application mode 'kInitialSync', because we're applying oplog entries to
// a cloned database the way initial sync does.
return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(opCtx,
@@ -428,6 +607,7 @@ Timestamp ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationCont
<< oplogId.toBSON())));
_currentBatchToApply.clear();
+ _currentDerivedOps.clear();
return lastAppliedTs;
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h
index cbb8fb41d8f..a95bb3e7029 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h
@@ -85,6 +85,12 @@ private:
enum class Stage { kStarted, kErrorOccurred, kReachedCloningTS, kFinished };
+ /**
+  * Oplog entries collected for a single session, together with the txnNumber they belong to.
+  */
+ struct RetryableOpsList {
+     // txnNumber shared by every entry in 'ops'; kUninitializedTxnNumber until one is recorded.
+     TxnNumber txnNum = kUninitializedTxnNumber;
+
+     // Non-owning pointers to the collected oplog entries.
+     std::vector<repl::OplogEntry*> ops;
+ };
+
/**
* Schedule to collect and apply the next batch of oplog entries.
*/
@@ -99,10 +105,9 @@ private:
/**
* Partition the currently buffered oplog entries so they can be applied in parallel.
*/
- std::vector<std::vector<const repl::OplogEntry*>> _fillWriterVectors(
- OperationContext* opCtx,
- OplogBatch* batch,
- std::vector<std::vector<repl::OplogEntry>>* derivedOps);
+ std::vector<std::vector<const repl::OplogEntry*>> _fillWriterVectors(OperationContext* opCtx,
+ OplogBatch* batch,
+ OplogBatch* derivedOps);
/**
* Apply a slice of oplog entries from the current batch for a worker thread.
@@ -190,6 +195,9 @@ private:
// (R) Buffer for the current batch of oplog entries to apply.
OplogBatch _currentBatchToApply;
+ // (R) Buffer for internally generated oplog entries that needs to be processed for this batch.
+ OplogBatch _currentDerivedOps;
+
// (R) A temporary scratch pad that contains pointers to oplog entries in _currentBatchToApply
// that is used by the writer vector when applying oplog in parallel.
std::vector<std::vector<const repl::OplogEntry*>> _currentWriterVectors;
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index e2a038450c0..d6dd459d982 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -34,13 +34,17 @@
#include <fmt/format.h>
#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/s/resharding/resharding_donor_oplog_iterator_interface.h"
#include "mongo/db/s/resharding/resharding_oplog_applier.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/sharding_mongod_test_fixture.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/unittest.h"
@@ -113,6 +117,15 @@ public:
repl::OpTypeEnum opType,
const BSONObj& obj1,
const boost::optional<BSONObj> obj2) {
+ return makeOplog(opTime, opType, obj1, obj2, {}, boost::none);
+ }
+
+ repl::OplogEntry makeOplog(const repl::OpTime& opTime,
+ repl::OpTypeEnum opType,
+ const BSONObj& obj1,
+ const boost::optional<BSONObj> obj2,
+ const OperationSessionInfo& sessionInfo,
+ const boost::optional<StmtId>& statementId) {
ReshardingDonorOplogId id(opTime.getTimestamp(), opTime.getTimestamp());
return repl::OplogEntry(opTime,
boost::none /* hash */,
@@ -123,10 +136,10 @@ public:
0 /* version */,
obj1,
obj2,
- {} /* sessionInfo */,
+ sessionInfo,
boost::none /* upsert */,
{} /* date */,
- boost::none /* statementId */,
+ statementId,
boost::none /* prevWrite */,
boost::none /* preImage */,
boost::none /* postImage */,
@@ -158,7 +171,7 @@ public:
return _sourceId;
}
-private:
+protected:
static constexpr int kWriterPoolSize = 4;
const NamespaceString kOplogNs{"config.localReshardingOplogBuffer.xxx.yyy"};
const NamespaceString kCrudNs{"foo.bar"};
@@ -810,7 +823,6 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) {
getExecutor(),
writerPool());
-
auto future = applier.applyUntilCloneFinishedTs();
future.get();
@@ -829,5 +841,484 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) {
ASSERT_EQ(Timestamp(6, 3), progressDoc->getProgress().getTs());
}
+class ReshardingOplogApplierRetryableTest : public ReshardingOplogApplierTest {  // Fixture for retryable-write tests: adds helpers to seed and to query per-session transaction state.
+public:
+ void setUp() override {
+ ReshardingOplogApplierTest::setUp();
+
+ repl::StorageInterface::set(operationContext()->getServiceContext(),
+ std::make_unique<repl::StorageInterfaceImpl>());  // Install a StorageInterfaceImpl on the service context.
+ MongoDSessionCatalog::onStepUp(operationContext());
+ }
+
+ static repl::OpTime insertRetryableOplog(OperationContext* opCtx,  // Logs a kNoop entry with object {TestValue: 0} through repl::logOp() and returns the opTime it reports.
+ const NamespaceString& nss,
+ UUID uuid,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ StmtId stmtId,
+ repl::OpTime prevOpTime) {
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(BSON("TestValue" << 0));
+ oplogEntry.setWallClockTime(Date_t::now());
+ if (stmtId != kUninitializedStmtId) {  // Session fields are attached only when a real statement id is supplied.
+ oplogEntry.setSessionId(lsid);
+ oplogEntry.setTxnNumber(txnNumber);
+ oplogEntry.setStatementId(stmtId);
+ oplogEntry.setPrevWriteOpTimeInTransaction(prevOpTime);
+ }
+ return repl::logOp(opCtx, &oplogEntry);
+ }
+
+ void writeTxnRecord(const LogicalSessionId& lsid,  // Seeds existing session state: logs one oplog entry for (lsid, txnNum, stmtId) and records it via onWriteOpCompletedOnPrimary().
+ const TxnNumber& txnNum,
+ StmtId stmtId,
+ repl::OpTime prevOpTime,
+ boost::optional<DurableTxnStateEnum> txnState) {
+ auto newClient = operationContext()->getServiceContext()->makeClient("testWriteTxnRecord");
+ AlternativeClientRegion acr(newClient);  // The work below runs under a separate client and its own operation context.
+ auto scopedOpCtx = cc().makeOperationContext();
+ auto opCtx = scopedOpCtx.get();
+
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNum);
+ OperationContextSession scopedSession(opCtx);
+
+ const auto session = OperationContextSession::get(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.refreshFromStorageIfNeeded(opCtx);
+ txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none);
+
+ AutoGetCollection autoColl(opCtx, kCrudNs, MODE_IX);
+ WriteUnitOfWork wuow(opCtx);  // The oplog write and the session record update are committed together.
+ const auto opTime = insertRetryableOplog(
+ opCtx, kCrudNs, kCrudUUID, session->getSessionId(), txnNum, stmtId, prevOpTime);
+
+ SessionTxnRecord sessionTxnRecord;
+ sessionTxnRecord.setSessionId(session->getSessionId());
+ sessionTxnRecord.setTxnNum(txnNum);
+ sessionTxnRecord.setLastWriteOpTime(opTime);
+ sessionTxnRecord.setLastWriteDate(Date_t::now());
+ sessionTxnRecord.setState(txnState);
+ txnParticipant.onWriteOpCompletedOnPrimary(opCtx, {stmtId}, sessionTxnRecord);
+ wuow.commit();
+ }
+
+ bool isWriteAlreadyExecuted(const OperationSessionInfo& session, StmtId stmtId) {  // True when checkStatementExecuted() yields a value for 'stmtId'; exceptions from beginOrContinue() propagate to the test.
+ auto newClient =
+ operationContext()->getServiceContext()->makeClient("testCheckStmtExecuted");
+ AlternativeClientRegion acr(newClient);
+ auto scopedOpCtx = cc().makeOperationContext();
+ auto opCtx = scopedOpCtx.get();
+
+ opCtx->setLogicalSessionId(*session.getSessionId());  // 'session' must have a session id set; it is dereferenced unchecked.
+ OperationContextSession scopedSession(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.refreshFromStorageIfNeeded(opCtx);
+ txnParticipant.beginOrContinue(opCtx, *session.getTxnNumber(), boost::none, boost::none);  // 'session' must also have a txnNumber set.
+
+ return txnParticipant.checkStatementExecuted(opCtx, stmtId).is_initialized();
+ }
+};
+
+TEST_F(ReshardingOplogApplierRetryableTest, CrudWithEmptyConfigTransactions) {  // Retryable insert/update/delete ops from three sessions; the test seeds no session record beforehand.
+ std::queue<repl::OplogEntry> crudOps;
+
+ OperationSessionInfo session1;
+ session1.setSessionId(makeLogicalSessionIdForTest());
+ session1.setTxnNumber(1);
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ session1,
+ 1));
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 2),
+ boost::none,
+ session1,
+ 2));
+
+ OperationSessionInfo session2;
+ session2.setSessionId(makeLogicalSessionIdForTest());
+ session2.setTxnNumber(1);
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(7, 3), 1),
+ repl::OpTypeEnum::kUpdate,
+ BSON("$set" << BSON("x" << 1)),
+ BSON("_id" << 2),
+ session2,
+ 1));
+
+ OperationSessionInfo session3;
+ session3.setSessionId(makeLogicalSessionIdForTest());
+ session3.setTxnNumber(1);
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(8, 3), 1),
+ repl::OpTypeEnum::kDelete,
+ BSON("_id" << 1),
+ boost::none,
+ session3,
+ 1));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(6, 3),
+ std::move(iterator),
+ 2 /* batchSize */,
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ DBDirectClient client(operationContext());
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSONObj(), doc);  // _id 1 was inserted by session1 and later deleted by session3.
+
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 2 << "x" << 1), doc);  // _id 2 was inserted by session1 and updated by session2.
+
+ auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId());
+ ASSERT_TRUE(progressDoc);
+ ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getClusterTime());  // Progress ends at the timestamp of the last op pushed above.
+ ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getTs());
+
+ ASSERT_TRUE(isWriteAlreadyExecuted(session1, 1));  // Every (session, stmtId) pair present in the input is now recorded.
+ ASSERT_TRUE(isWriteAlreadyExecuted(session1, 2));
+ ASSERT_TRUE(isWriteAlreadyExecuted(session2, 1));
+ ASSERT_TRUE(isWriteAlreadyExecuted(session3, 1));
+
+ ASSERT_FALSE(isWriteAlreadyExecuted(session2, 2));  // Statement ids that never appeared in the input are not recorded.
+ ASSERT_FALSE(isWriteAlreadyExecuted(session3, 2));
+}
+
+TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) {  // The same lsid appears first with txnNumber 1 and later with txnNumber 2.
+ std::queue<repl::OplogEntry> crudOps;
+
+ OperationSessionInfo session1;
+ session1.setSessionId(makeLogicalSessionIdForTest());
+ session1.setTxnNumber(1);
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ session1,
+ 1));
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 2),
+ boost::none,
+ session1,
+ 2));
+
+ OperationSessionInfo session2;
+ session2.setSessionId(makeLogicalSessionIdForTest());
+ session2.setTxnNumber(1);
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(7, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 3),
+ boost::none,
+ session2,
+ 1));
+
+ session1.setTxnNumber(2);  // Same lsid, newer txnNumber: from here on 'session1' describes txn 2, including in the assertions below.
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(8, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 4),
+ boost::none,
+ session1,
+ 21));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(6, 3),
+ std::move(iterator),
+ 2 /* batchSize */,
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ DBDirectClient client(operationContext());  // All four inserts must be visible regardless of which txnNumber they carried.
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc);
+
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 2), doc);
+
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 3));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 3), doc);
+
+ doc = client.findOne(appliedToNs().ns(), BSON("_id" << 4));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 4), doc);
+
+ ASSERT_TRUE(isWriteAlreadyExecuted(session1, 21));  // Checked against txnNumber 2, the value 'session1' holds at this point.
+ ASSERT_TRUE(isWriteAlreadyExecuted(session2, 1));
+}
+
+TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxn) {  // The session already has a record for txnNumber 2; the incoming op carries txnNumber 5.
+ auto lsid = makeLogicalSessionIdForTest();
+
+ writeTxnRecord(lsid, 2, 1, {}, boost::none);  // Seed: txnNumber 2, stmtId 1, default prevOpTime, no txn state.
+
+ std::queue<repl::OplogEntry> crudOps;
+
+ OperationSessionInfo session;
+ session.setSessionId(lsid);
+ session.setTxnNumber(5);
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ session,
+ 21));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(6, 3),
+ std::move(iterator),
+ 2 /* batchSize */,
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ DBDirectClient client(operationContext());
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc);
+
+ ASSERT_TRUE(isWriteAlreadyExecuted(session, 21));  // The newer txnNumber 5 / stmtId 21 is now recorded for the session.
+}
+
+TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithHigherExistingTxnNum) {  // The existing record (txnNumber 20) is newer than the incoming op (txnNumber 15).
+ auto lsid = makeLogicalSessionIdForTest();
+ const TxnNumber existingTxnNum = 20;
+ const StmtId existingStmtId = 1;
+ writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none);
+
+ OperationSessionInfo session;
+ const TxnNumber incomingTxnNum = 15;
+ const StmtId incomingStmtId = 21;
+ session.setSessionId(lsid);
+ session.setTxnNumber(incomingTxnNum);
+
+ std::queue<repl::OplogEntry> crudOps;
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ session,
+ incomingStmtId));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(6, 3),
+ std::move(iterator),
+ 2 /* batchSize */,
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ // Op should always be applied, even if session info was not compatible.
+ DBDirectClient client(operationContext());
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc);
+
+ ASSERT_THROWS_CODE(isWriteAlreadyExecuted(session, incomingStmtId),  // Querying with the older txnNumber 15 is rejected with TransactionTooOld.
+ DBException,
+ ErrorCodes::TransactionTooOld);
+
+ // Check that original txn info is intact.
+ OperationSessionInfo origSession;
+ origSession.setSessionId(lsid);
+ origSession.setTxnNumber(existingTxnNum);
+
+ ASSERT_TRUE(isWriteAlreadyExecuted(origSession, existingStmtId));
+}
+
+TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxnNum) {  // The existing record (txnNumber 20) is older than the incoming op (txnNumber 25).
+ auto lsid = makeLogicalSessionIdForTest();
+ const TxnNumber existingTxnNum = 20;
+ const StmtId existingStmtId = 1;
+ writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none);
+
+ OperationSessionInfo session;
+ const TxnNumber incomingTxnNum = 25;
+ const StmtId incomingStmtId = 21;
+ session.setSessionId(lsid);
+ session.setTxnNumber(incomingTxnNum);
+
+ std::queue<repl::OplogEntry> crudOps;
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ session,
+ incomingStmtId));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(6, 3),
+ std::move(iterator),
+ 2 /* batchSize */,
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ // The op is applied, and the incoming statement is also recorded for the newer txnNumber.
+ DBDirectClient client(operationContext());
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc);
+
+ ASSERT_TRUE(isWriteAlreadyExecuted(session, incomingStmtId));
+}
+
+TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithEqualExistingTxnNum) {  // The incoming op has the same txnNumber as the existing record but a different stmtId.
+ auto lsid = makeLogicalSessionIdForTest();
+ const TxnNumber existingTxnNum = 20;
+ const StmtId existingStmtId = 1;
+ writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none);
+
+ OperationSessionInfo session;
+ const TxnNumber incomingTxnNum = existingTxnNum;
+ const StmtId incomingStmtId = 21;
+ session.setSessionId(lsid);
+ session.setTxnNumber(incomingTxnNum);
+
+ std::queue<repl::OplogEntry> crudOps;
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ session,
+ incomingStmtId));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(6, 3),
+ std::move(iterator),
+ 2 /* batchSize */,
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ DBDirectClient client(operationContext());
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc);
+
+ ASSERT_TRUE(isWriteAlreadyExecuted(session, incomingStmtId));  // Both the new and the previously recorded statement are known for txnNumber 20.
+ ASSERT_TRUE(isWriteAlreadyExecuted(session, existingStmtId));
+}
+
+TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithStmtIdAlreadyExecuted) {  // The incoming op repeats a (txnNumber, stmtId) pair that the session already has recorded.
+ auto lsid = makeLogicalSessionIdForTest();
+ const TxnNumber existingTxnNum = 20;
+ const StmtId existingStmtId = 1;
+ writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none);
+
+ OperationSessionInfo session;
+ const TxnNumber incomingTxnNum = existingTxnNum;
+ const StmtId incomingStmtId = existingStmtId;
+ session.setSessionId(lsid);
+ session.setTxnNumber(incomingTxnNum);
+
+ std::queue<repl::OplogEntry> crudOps;
+
+ crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none,
+ session,
+ incomingStmtId));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps));
+ ReshardingOplogApplier applier(getServiceContext(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ Timestamp(6, 3),
+ std::move(iterator),
+ 2 /* batchSize */,
+ getExecutor(),
+ writerPool());
+
+ auto future = applier.applyUntilCloneFinishedTs();
+ future.get();
+
+ future = applier.applyUntilDone();
+ future.get();
+
+ DBDirectClient client(operationContext());
+ auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc);  // The insert itself is still applied to the collection.
+
+ ASSERT_TRUE(isWriteAlreadyExecuted(session, incomingStmtId));
+}
+
} // unnamed namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding_txn_cloner.cpp
index 8fb88ac3437..893d6d7d252 100644
--- a/src/mongo/db/s/resharding_txn_cloner.cpp
+++ b/src/mongo/db/s/resharding_txn_cloner.cpp
@@ -179,8 +179,8 @@ void configTxnsMergerForResharding(OperationContext* opCtx, BSONObj donorBsonTra
auto ocs = std::make_unique<MongoDOperationContextSession>(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- // Which error code should this be? what message?
uassert(4989900, "Failed to get transaction Participant", txnParticipant);
+
try {
txnParticipant.beginOrContinue(
opCtx, donorTransaction.getTxnNum(), boost::none, boost::none);