summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp146
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.h25
-rw-r--r--src/mongo/db/repl/sync_tail.cpp10
-rw-r--r--src/mongo/db/repl/sync_tail.h10
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp183
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp28
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.h3
7 files changed, 365 insertions, 40 deletions
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 984c9f84577..7b48bbb7e3c 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -75,7 +76,8 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::optional<Date_t> wallClockTime = boost::none,
boost::optional<StmtId> stmtId = boost::none,
boost::optional<UUID> uuid = boost::none,
- boost::optional<OpTime> prevOpTime = boost::none) {
+ boost::optional<OpTime> prevOpTime = boost::none,
+ const boost::optional<bool> prepare = boost::none) {
return repl::OplogEntry(opTime, // optime
boost::none, // hash
opType, // opType
@@ -89,10 +91,10 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::none, // upsert
wallClockTime, // wall clock time
stmtId, // statement id
- prevOpTime, // optime of previous write within same transaction
- boost::none, // pre-image optime
- boost::none, // post-image optime
- boost::none); // prepare
+ prevOpTime, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none, // post-image optime
+ prepare); // prepare
}
} // namespace
@@ -365,8 +367,30 @@ Status IdempotencyTest::resetState() {
void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, SequenceType sequenceType) {
ASSERT_OK(resetState());
- ASSERT_OK(runOpsInitialSync(ops));
- auto state1 = validate();
+
+ // Write oplog entries to oplog collection.
+ for (auto&& entry : ops) {
+ ASSERT_OK(getStorageInterface()->insertDocument(
+ _opCtx.get(),
+ NamespaceString::kRsOplogNamespace,
+ {entry.toBSON(), entry.getOpTime().getTimestamp()},
+ entry.getOpTime().getTerm()));
+ }
+
+ SyncTail syncTail(nullptr, // observer
+ nullptr, // consistency markers
+ nullptr, // storage interface
+ SyncTail::MultiSyncApplyFunc(),
+ nullptr, // writer pool
+ SyncTailTest::makeInitialSyncOptions());
+ std::vector<MultiApplier::OperationPtrs> writerVectors(1);
+ std::vector<MultiApplier::Operations> derivedOps;
+ // Derive ops for transactions if necessary.
+ syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps);
+
+ const auto& opPtrs = writerVectors[0];
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
+ auto state1 = validateAllCollections();
auto iterations = sequenceType == SequenceType::kEntireSequence ? 1 : ops.size();
for (std::size_t i = 0; i < iterations; i++) {
@@ -374,28 +398,28 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence
std::vector<OplogEntry> fullSequence;
if (sequenceType == SequenceType::kEntireSequence) {
- ASSERT_OK(runOpsInitialSync(ops));
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
fullSequence.insert(fullSequence.end(), ops.begin(), ops.end());
} else if (sequenceType == SequenceType::kAnyPrefix ||
sequenceType == SequenceType::kAnyPrefixOrSuffix) {
std::vector<OplogEntry> prefix(ops.begin(), ops.begin() + i + 1);
- ASSERT_OK(runOpsInitialSync(prefix));
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
fullSequence.insert(fullSequence.end(), prefix.begin(), prefix.end());
}
- ASSERT_OK(runOpsInitialSync(ops));
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
fullSequence.insert(fullSequence.end(), ops.begin(), ops.end());
if (sequenceType == SequenceType::kAnySuffix ||
sequenceType == SequenceType::kAnyPrefixOrSuffix) {
std::vector<OplogEntry> suffix(ops.begin() + i, ops.end());
- ASSERT_OK(runOpsInitialSync(suffix));
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
fullSequence.insert(fullSequence.end(), suffix.begin(), suffix.end());
}
- auto state2 = validate();
+ auto state2 = validateAllCollections();
if (state1 != state2) {
- FAIL(getStateString(state1, state2, fullSequence));
+ FAIL(getStateVectorString(state1, state2, fullSequence));
}
}
}
@@ -434,6 +458,59 @@ OplogEntry IdempotencyTest::dropIndex(const std::string& indexName, const UUID&
return makeCommandOplogEntry(nextOpTime(), nss, cmd, uuid);
}
+OplogEntry IdempotencyTest::prepare(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ const BSONArray& ops) {
+ OperationSessionInfo info;
+ info.setSessionId(lsid);
+ info.setTxnNumber(txnNum);
+ return makeOplogEntry(nextOpTime(),
+ OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ BSON("applyOps" << ops),
+ boost::none /* o2 */,
+ info /* sessionInfo */,
+ Date_t::min() /* wallClockTime -- required but not checked */,
+ stmtId,
+ boost::none /* uuid */,
+ OpTime(),
+ true);
+}
+
+OplogEntry IdempotencyTest::commitUnprepared(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ const BSONArray& ops) {
+ OperationSessionInfo info;
+ info.setSessionId(lsid);
+ info.setTxnNumber(txnNum);
+ return makeCommandOplogEntryWithSessionInfoAndStmtId(
+ nextOpTime(), nss, BSON("applyOps" << ops), lsid, txnNum, stmtId, OpTime());
+}
+
+OplogEntry IdempotencyTest::commitPrepared(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ OpTime prepareOpTime) {
+ return makeCommandOplogEntryWithSessionInfoAndStmtId(
+ nextOpTime(),
+ nss,
+ BSON("commitTransaction" << 1 << "commitTimestamp" << prepareOpTime.getTimestamp()),
+ lsid,
+ txnNum,
+ stmtId,
+ prepareOpTime);
+}
+
+OplogEntry IdempotencyTest::abortPrepared(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ OpTime prepareOpTime) {
+ return makeCommandOplogEntryWithSessionInfoAndStmtId(
+ nextOpTime(), nss, BSON("abortTransaction" << 1), lsid, txnNum, stmtId, prepareOpTime);
+}
+
std::string IdempotencyTest::computeDataHash(Collection* collection) {
auto desc = collection->getIndexCatalog()->findIdIndex(_opCtx.get());
ASSERT_TRUE(desc);
@@ -462,7 +539,22 @@ std::string IdempotencyTest::computeDataHash(Collection* collection) {
return digestToString(d);
}
-CollectionState IdempotencyTest::validate() {
+std::vector<CollectionState> IdempotencyTest::validateAllCollections() {
+ std::vector<CollectionState> collStates;
+ auto& uuidCatalog = UUIDCatalog::get(_opCtx.get());
+ auto dbs = uuidCatalog.getAllDbNames();
+ for (auto& db : dbs) {
+ // Skip local database.
+ if (db != "local") {
+ for (const auto& nss : uuidCatalog.getAllCollectionNamesFromDb(db)) {
+ collStates.push_back(validate(nss));
+ }
+ }
+ }
+ return collStates;
+}
+
+CollectionState IdempotencyTest::validate(const NamespaceString& nss) {
auto collUUID = [&]() -> OptionalCollectionUUID {
AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss);
if (auto collection = autoColl.getCollection()) {
@@ -520,8 +612,34 @@ std::string IdempotencyTest::getStateString(const CollectionState& state1,
return sb.str();
}
+std::string IdempotencyTest::getStateVectorString(std::vector<CollectionState>& state1,
+ std::vector<CollectionState>& state2,
+ const std::vector<OplogEntry>& ops) {
+ StringBuilder sb;
+ sb << "The states: ";
+ for (const auto& s : state1) {
+ sb << s << " ";
+ }
+ sb << "do not match with the states: ";
+ for (const auto& s : state2) {
+ sb << s << " ";
+ }
+ sb << " found after applying the operations a second time, therefore breaking idempotency.";
+ return sb.str();
+}
+
template OplogEntry IdempotencyTest::update<int>(int _id, const BSONObj& obj);
template OplogEntry IdempotencyTest::update<const char*>(char const* _id, const BSONObj& obj);
+BSONObj makeInsertApplyOpsEntry(const NamespaceString& nss, const UUID& uuid, const BSONObj& doc) {
+ return BSON("op"
+ << "i"
+ << "ns"
+ << nss.toString()
+ << "ui"
+ << uuid
+ << "o"
+ << doc);
+}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/idempotency_test_fixture.h b/src/mongo/db/repl/idempotency_test_fixture.h
index 6c386073015..325effc7e22 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.h
+++ b/src/mongo/db/repl/idempotency_test_fixture.h
@@ -97,6 +97,22 @@ protected:
OplogEntry update(IdType _id, const BSONObj& obj);
OplogEntry buildIndex(const BSONObj& indexSpec, const BSONObj& options, const UUID& uuid);
OplogEntry dropIndex(const std::string& indexName, const UUID& uuid);
+ OplogEntry prepare(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ const BSONArray& ops);
+ OplogEntry commitUnprepared(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ const BSONArray& ops);
+ OplogEntry commitPrepared(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ OpTime prepareOpTime);
+ OplogEntry abortPrepared(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ OpTime prepareOpTime);
virtual Status resetState();
/**
@@ -120,10 +136,15 @@ protected:
virtual std::string getStateString(const CollectionState& state1,
const CollectionState& state2,
const std::vector<OplogEntry>& ops);
+
+ virtual std::string getStateVectorString(std::vector<CollectionState>& state1,
+ std::vector<CollectionState>& state2,
+ const std::vector<OplogEntry>& ops);
/**
* Validate data and indexes. Return the MD5 hash of the documents ordered by _id.
*/
- CollectionState validate();
+ CollectionState validate(const NamespaceString& nss = NamespaceString("test.foo"));
+ std::vector<CollectionState> validateAllCollections();
NamespaceString nss{"test.foo"};
};
@@ -179,5 +200,7 @@ OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
TxnNumber txnNum,
StmtId stmtId,
boost::optional<OpTime> prevOpTime = boost::none);
+
+BSONObj makeInsertApplyOpsEntry(const NamespaceString& nss, const UUID& uuid, const BSONObj& doc);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index d0bf1b00b1e..485aee900c8 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1238,10 +1238,10 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
}
}
-void SyncTail::_fillWriterVectors(OperationContext* opCtx,
- MultiApplier::Operations* ops,
- std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps) {
+void SyncTail::fillWriterVectors(OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ std::vector<MultiApplier::Operations>* derivedOps) {
SessionUpdateTracker sessionUpdateTracker;
_fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
@@ -1318,7 +1318,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
std::vector<MultiApplier::Operations> derivedOps;
std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
- _fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
+ fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
// Wait for writes to finish before applying ops.
_writerPool->waitForIdle();
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 84a6086f606..8b8d4a4a5e0 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -230,6 +230,11 @@ public:
*/
StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops);
+ void fillWriterVectors(OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ std::vector<MultiApplier::Operations>* derivedOps);
+
private:
class OpQueueBatcher;
@@ -241,11 +246,6 @@ private:
std::vector<MultiApplier::Operations>* derivedOps,
SessionUpdateTracker* sessionUpdateTracker);
- void _fillWriterVectors(OperationContext* opCtx,
- MultiApplier::Operations* ops,
- std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps);
-
/**
* Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes
* non-const pointers to inner vectors into func.
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 09a2cbbc40c..cb3fc8011b3 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -2347,11 +2347,10 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) {
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}));
- DBDirectClient client(_opCtx.get());
- auto result = client.findOne(
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())});
- ASSERT_TRUE(result.isEmpty());
+ ASSERT_FALSE(docExists(
+ _opCtx.get(),
+ NamespaceString::kSessionTransactionsTableNamespace,
+ BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())));
}
TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTable) {
@@ -2577,11 +2576,10 @@ TEST_F(SyncTailTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) {
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}));
- DBDirectClient client(_opCtx.get());
- auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- {BSON(SessionTxnRecord::kSessionIdFieldName
- << preImageSessionInfo.getSessionId()->toBSON())});
- ASSERT_TRUE(result.isEmpty());
+ ASSERT_FALSE(docExists(_opCtx.get(),
+ NamespaceString::kSessionTransactionsTableNamespace,
+ BSON(SessionTxnRecord::kSessionIdFieldName
+ << preImageSessionInfo.getSessionId()->toBSON())));
}
TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) {
@@ -2603,11 +2601,10 @@ TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) {
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}));
- DBDirectClient client(_opCtx.get());
- auto result = client.findOne(
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())});
- ASSERT_TRUE(result.isEmpty());
+ ASSERT_FALSE(docExists(
+ _opCtx.get(),
+ NamespaceString::kSessionTransactionsTableNamespace,
+ BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())));
}
TEST_F(IdempotencyTest, EmptyCappedNamespaceNotFound) {
@@ -2644,6 +2641,162 @@ TEST_F(IdempotencyTest, ConvertToCappedNamespaceNotFound) {
ASSERT_FALSE(autoColl.getDb());
}
+// Document used by transaction idempotency tests.
+const BSONObj doc = fromjson("{_id: 1}");
+
+TEST_F(IdempotencyTest, CommitUnpreparedTransaction) {
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto lsid = makeLogicalSessionId(_opCtx.get());
+ TxnNumber txnNum(0);
+
+ auto commitOp = commitUnprepared(
+ lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc)));
+
+ ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
+ ->setFollowerMode(MemberState::RS_RECOVERING));
+
+ testOpsAreIdempotent({commitOp});
+ repl::checkTxnTable(_opCtx.get(),
+ lsid,
+ txnNum,
+ commitOp.getOpTime(),
+ *commitOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+ ASSERT_TRUE(docExists(_opCtx.get(), nss, doc));
+}
+
+TEST_F(IdempotencyTest, CommitUnpreparedTransactionDataPartiallyApplied) {
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto lsid = makeLogicalSessionId(_opCtx.get());
+ TxnNumber txnNum(0);
+
+ NamespaceString nss2("test.coll2");
+ auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2);
+
+ auto commitOp = commitUnprepared(lsid,
+ txnNum,
+ StmtId(0),
+ BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc)
+ << makeInsertApplyOpsEntry(nss2, uuid2, doc)));
+
+ // Manually insert one of the documents so that the data will partially reflect the transaction
+ // when the commitTransaction oplog entry is applied during initial sync.
+ ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(),
+ nss,
+ {doc, commitOp.getOpTime().getTimestamp()},
+ commitOp.getOpTime().getTerm()));
+ ASSERT_TRUE(docExists(_opCtx.get(), nss, doc));
+ ASSERT_FALSE(docExists(_opCtx.get(), nss2, doc));
+
+ ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
+ ->setFollowerMode(MemberState::RS_RECOVERING));
+
+ testOpsAreIdempotent({commitOp});
+ repl::checkTxnTable(_opCtx.get(),
+ lsid,
+ txnNum,
+ commitOp.getOpTime(),
+ *commitOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+ ASSERT_TRUE(docExists(_opCtx.get(), nss, doc));
+ ASSERT_TRUE(docExists(_opCtx.get(), nss2, doc));
+}
+
+TEST_F(IdempotencyTest, CommitPreparedTransaction) {
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto lsid = makeLogicalSessionId(_opCtx.get());
+ TxnNumber txnNum(0);
+
+ auto prepareOp =
+ prepare(lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc)));
+
+ auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime());
+
+ ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
+ ->setFollowerMode(MemberState::RS_RECOVERING));
+
+ testOpsAreIdempotent({prepareOp, commitOp});
+ repl::checkTxnTable(_opCtx.get(),
+ lsid,
+ txnNum,
+ commitOp.getOpTime(),
+ *commitOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+ ASSERT_TRUE(docExists(_opCtx.get(), nss, doc));
+}
+
+TEST_F(IdempotencyTest, CommitPreparedTransactionDataPartiallyApplied) {
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto lsid = makeLogicalSessionId(_opCtx.get());
+ TxnNumber txnNum(0);
+
+ NamespaceString nss2("test.coll2");
+ auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2);
+
+ auto prepareOp = prepare(lsid,
+ txnNum,
+ StmtId(0),
+ BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc)
+ << makeInsertApplyOpsEntry(nss2, uuid2, doc)));
+
+ auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime());
+
+ // Manually insert one of the documents so that the data will partially reflect the transaction
+ // when the commitTransaction oplog entry is applied during initial sync.
+ ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(),
+ nss,
+ {doc, commitOp.getOpTime().getTimestamp()},
+ commitOp.getOpTime().getTerm()));
+ ASSERT_TRUE(docExists(_opCtx.get(), nss, doc));
+ ASSERT_FALSE(docExists(_opCtx.get(), nss2, doc));
+
+ ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
+ ->setFollowerMode(MemberState::RS_RECOVERING));
+
+ testOpsAreIdempotent({prepareOp, commitOp});
+ repl::checkTxnTable(_opCtx.get(),
+ lsid,
+ txnNum,
+ commitOp.getOpTime(),
+ *commitOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kCommitted);
+ ASSERT_TRUE(docExists(_opCtx.get(), nss, doc));
+ ASSERT_TRUE(docExists(_opCtx.get(), nss2, doc));
+}
+
+TEST_F(IdempotencyTest, AbortPreparedTransaction) {
+ // TODO: SERVER-36492 Fix this test
+ return;
+ createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
+ auto lsid = makeLogicalSessionId(_opCtx.get());
+ TxnNumber txnNum(0);
+
+ auto prepareOp =
+ prepare(lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc)));
+ auto abortOp = abortPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime());
+
+ ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext())
+ ->setFollowerMode(MemberState::RS_RECOVERING));
+
+ testOpsAreIdempotent({prepareOp, abortOp});
+ repl::checkTxnTable(_opCtx.get(),
+ lsid,
+ txnNum,
+ abortOp.getOpTime(),
+ *abortOp.getWallClockTime(),
+ boost::none,
+ DurableTxnStateEnum::kAborted);
+ ASSERT_FALSE(docExists(_opCtx.get(), nss, doc));
+}
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 9b8557a6a82..9ef501f89b9 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -250,6 +250,28 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) {
return Status::OK();
}
+Status SyncTailTest::runOpPtrsInitialSync(MultiApplier::OperationPtrs ops) {
+ auto options = makeInitialSyncOptions();
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ SyncTail::MultiSyncApplyFunc(),
+ nullptr,
+ options);
+ // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD
+ // operations provided by idempotency tests.
+ for (auto& op : ops) {
+ MultiApplier::OperationPtrs opsPtrs;
+ opsPtrs.push_back(op);
+ WorkerMultikeyPathInfo pathInfo;
+ auto status = multiSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+ return Status::OK();
+}
+
void checkTxnTable(OperationContext* opCtx,
const LogicalSessionId& lsid,
const TxnNumber& txnNum,
@@ -301,5 +323,11 @@ StatusWith<BSONObj> CollectionReader::next() {
return obj;
}
+bool docExists(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) {
+ DBDirectClient client(opCtx);
+ auto result = client.findOne(nss.ns(), {doc});
+ return !result.isEmpty();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h
index e58d11b26dc..c1362aad0f2 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.h
+++ b/src/mongo/db/repl/sync_tail_test_fixture.h
@@ -146,6 +146,7 @@ protected:
Status runOpsSteadyState(std::vector<OplogEntry> ops);
Status runOpInitialSync(const OplogEntry& entry);
Status runOpsInitialSync(std::vector<OplogEntry> ops);
+ Status runOpPtrsInitialSync(MultiApplier::OperationPtrs ops);
UUID kUuid{UUID::gen()};
};
@@ -175,5 +176,7 @@ void checkTxnTable(OperationContext* opCtx,
boost::optional<repl::OpTime> expectedStartOpTime,
boost::optional<DurableTxnStateEnum> expectedState);
+bool docExists(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc);
+
} // namespace repl
} // namespace mongo