diff options
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.cpp | 146 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.h | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 183 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.h | 3 |
7 files changed, 365 insertions, 40 deletions
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 984c9f84577..7b48bbb7e3c 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -40,6 +40,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -75,7 +76,8 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::optional<Date_t> wallClockTime = boost::none, boost::optional<StmtId> stmtId = boost::none, boost::optional<UUID> uuid = boost::none, - boost::optional<OpTime> prevOpTime = boost::none) { + boost::optional<OpTime> prevOpTime = boost::none, + const boost::optional<bool> prepare = boost::none) { return repl::OplogEntry(opTime, // optime boost::none, // hash opType, // opType @@ -89,10 +91,10 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::none, // upsert wallClockTime, // wall clock time stmtId, // statement id - prevOpTime, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none); // prepare + prevOpTime, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + prepare); // prepare } } // namespace @@ -365,8 +367,30 @@ Status IdempotencyTest::resetState() { void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, SequenceType sequenceType) { ASSERT_OK(resetState()); - ASSERT_OK(runOpsInitialSync(ops)); - auto state1 = validate(); + + // Write oplog entries to oplog collection. + for (auto&& entry : ops) { + ASSERT_OK(getStorageInterface()->insertDocument( + _opCtx.get(), + NamespaceString::kRsOplogNamespace, + {entry.toBSON(), entry.getOpTime().getTimestamp()}, + entry.getOpTime().getTerm())); + } + + SyncTail syncTail(nullptr, // observer + nullptr, // consistency markers + nullptr, // storage interface + SyncTail::MultiSyncApplyFunc(), + nullptr, // writer pool + SyncTailTest::makeInitialSyncOptions()); + std::vector<MultiApplier::OperationPtrs> writerVectors(1); + std::vector<MultiApplier::Operations> derivedOps; + // Derive ops for transactions if necessary. + syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps); + + const auto& opPtrs = writerVectors[0]; + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); + auto state1 = validateAllCollections(); auto iterations = sequenceType == SequenceType::kEntireSequence ? 1 : ops.size(); for (std::size_t i = 0; i < iterations; i++) { @@ -374,28 +398,28 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence std::vector<OplogEntry> fullSequence; if (sequenceType == SequenceType::kEntireSequence) { - ASSERT_OK(runOpsInitialSync(ops)); + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); fullSequence.insert(fullSequence.end(), ops.begin(), ops.end()); } else if (sequenceType == SequenceType::kAnyPrefix || sequenceType == SequenceType::kAnyPrefixOrSuffix) { std::vector<OplogEntry> prefix(ops.begin(), ops.begin() + i + 1); - ASSERT_OK(runOpsInitialSync(prefix)); + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); fullSequence.insert(fullSequence.end(), prefix.begin(), prefix.end()); } - ASSERT_OK(runOpsInitialSync(ops)); + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); fullSequence.insert(fullSequence.end(), ops.begin(), ops.end()); if (sequenceType == SequenceType::kAnySuffix || sequenceType == SequenceType::kAnyPrefixOrSuffix) { std::vector<OplogEntry> suffix(ops.begin() + i, ops.end()); - ASSERT_OK(runOpsInitialSync(suffix)); + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); fullSequence.insert(fullSequence.end(), suffix.begin(), suffix.end()); } - auto state2 = validate(); + auto state2 = validateAllCollections(); if (state1 != state2) { - FAIL(getStateString(state1, state2, fullSequence)); + FAIL(getStateVectorString(state1, state2, fullSequence)); } } } @@ -434,6 +458,59 @@ OplogEntry IdempotencyTest::dropIndex(const std::string& indexName, const UUID& return makeCommandOplogEntry(nextOpTime(), nss, cmd, uuid); } +OplogEntry IdempotencyTest::prepare(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + const BSONArray& ops) { + OperationSessionInfo info; + info.setSessionId(lsid); + info.setTxnNumber(txnNum); + return makeOplogEntry(nextOpTime(), + OpTypeEnum::kCommand, + nss.getCommandNS(), + BSON("applyOps" << ops), + boost::none /* o2 */, + info /* sessionInfo */, + Date_t::min() /* wallClockTime -- required but not checked */, + stmtId, + boost::none /* uuid */, + OpTime(), + true); +} + +OplogEntry IdempotencyTest::commitUnprepared(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + const BSONArray& ops) { + OperationSessionInfo info; + info.setSessionId(lsid); + info.setTxnNumber(txnNum); + return makeCommandOplogEntryWithSessionInfoAndStmtId( + nextOpTime(), nss, BSON("applyOps" << ops), lsid, txnNum, stmtId, OpTime()); +} + +OplogEntry IdempotencyTest::commitPrepared(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + OpTime prepareOpTime) { + return makeCommandOplogEntryWithSessionInfoAndStmtId( + nextOpTime(), + nss, + BSON("commitTransaction" << 1 << "commitTimestamp" << prepareOpTime.getTimestamp()), + lsid, + txnNum, + stmtId, + prepareOpTime); +} + +OplogEntry IdempotencyTest::abortPrepared(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + OpTime prepareOpTime) { + return makeCommandOplogEntryWithSessionInfoAndStmtId( + nextOpTime(), nss, BSON("abortTransaction" << 1), lsid, txnNum, stmtId, prepareOpTime); +} + std::string IdempotencyTest::computeDataHash(Collection* collection) { auto desc = collection->getIndexCatalog()->findIdIndex(_opCtx.get()); ASSERT_TRUE(desc); @@ -462,7 +539,22 @@ std::string IdempotencyTest::computeDataHash(Collection* collection) { return digestToString(d); } -CollectionState IdempotencyTest::validate() { +std::vector<CollectionState> IdempotencyTest::validateAllCollections() { + std::vector<CollectionState> collStates; + auto& uuidCatalog = UUIDCatalog::get(_opCtx.get()); + auto dbs = uuidCatalog.getAllDbNames(); + for (auto& db : dbs) { + // Skip local database. + if (db != "local") { + for (const auto& nss : uuidCatalog.getAllCollectionNamesFromDb(db)) { + collStates.push_back(validate(nss)); + } + } + } + return collStates; +} + +CollectionState IdempotencyTest::validate(const NamespaceString& nss) { auto collUUID = [&]() -> OptionalCollectionUUID { AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss); if (auto collection = autoColl.getCollection()) { @@ -520,8 +612,34 @@ std::string IdempotencyTest::getStateString(const CollectionState& state1, return sb.str(); } +std::string IdempotencyTest::getStateVectorString(std::vector<CollectionState>& state1, + std::vector<CollectionState>& state2, + const std::vector<OplogEntry>& ops) { + StringBuilder sb; + sb << "The states: "; + for (const auto& s : state1) { + sb << s << " "; + } + sb << "do not match with the states: "; + for (const auto& s : state2) { + sb << s << " "; + } + sb << " found after applying the operations a second time, therefore breaking idempotency."; + return sb.str(); +} + template OplogEntry IdempotencyTest::update<int>(int _id, const BSONObj& obj); template OplogEntry IdempotencyTest::update<const char*>(char const* _id, const BSONObj& obj); +BSONObj makeInsertApplyOpsEntry(const NamespaceString& nss, const UUID& uuid, const BSONObj& doc) { + return BSON("op" + << "i" + << "ns" + << nss.toString() + << "ui" + << uuid + << "o" + << doc); +} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/idempotency_test_fixture.h b/src/mongo/db/repl/idempotency_test_fixture.h index 6c386073015..325effc7e22 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.h +++ b/src/mongo/db/repl/idempotency_test_fixture.h @@ -97,6 +97,22 @@ protected: OplogEntry update(IdType _id, const BSONObj& obj); OplogEntry buildIndex(const BSONObj& indexSpec, const BSONObj& options, const UUID& uuid); OplogEntry dropIndex(const std::string& indexName, const UUID& uuid); + OplogEntry prepare(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + const BSONArray& ops); + OplogEntry commitUnprepared(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + const BSONArray& ops); + OplogEntry commitPrepared(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + OpTime prepareOpTime); + OplogEntry abortPrepared(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + OpTime prepareOpTime); virtual Status resetState(); /** @@ -120,10 +136,15 @@ protected: virtual std::string getStateString(const CollectionState& state1, const CollectionState& state2, const std::vector<OplogEntry>& ops); + + virtual std::string getStateVectorString(std::vector<CollectionState>& state1, + std::vector<CollectionState>& state2, + const std::vector<OplogEntry>& ops); /** * Validate data and indexes. Return the MD5 hash of the documents ordered by _id. */ - CollectionState validate(); + CollectionState validate(const NamespaceString& nss = NamespaceString("test.foo")); + std::vector<CollectionState> validateAllCollections(); NamespaceString nss{"test.foo"}; }; @@ -179,5 +200,7 @@ OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId( TxnNumber txnNum, StmtId stmtId, boost::optional<OpTime> prevOpTime = boost::none); + +BSONObj makeInsertApplyOpsEntry(const NamespaceString& nss, const UUID& uuid, const BSONObj& doc); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index d0bf1b00b1e..485aee900c8 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -1238,10 +1238,10 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, } } -void SyncTail::_fillWriterVectors(OperationContext* opCtx, - MultiApplier::Operations* ops, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps) { +void SyncTail::fillWriterVectors(OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps) { SessionUpdateTracker sessionUpdateTracker; _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker); @@ -1318,7 +1318,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O std::vector<MultiApplier::Operations> derivedOps; std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads); - _fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); + fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); // Wait for writes to finish before applying ops. _writerPool->waitForIdle(); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 84a6086f606..8b8d4a4a5e0 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -230,6 +230,11 @@ public: */ StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops); + void fillWriterVectors(OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + std::vector<MultiApplier::Operations>* derivedOps); + private: class OpQueueBatcher; @@ -241,11 +246,6 @@ private: std::vector<MultiApplier::Operations>* derivedOps, SessionUpdateTracker* sessionUpdateTracker); - void _fillWriterVectors(OperationContext* opCtx, - MultiApplier::Operations* ops, - std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps); - /** * Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes * non-const pointers to inner vectors into func. diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 09a2cbbc40c..cb3fc8011b3 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -2347,11 +2347,10 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) { nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp})); - DBDirectClient client(_opCtx.get()); - auto result = client.findOne( - NamespaceString::kSessionTransactionsTableNamespace.ns(), - {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())}); - ASSERT_TRUE(result.isEmpty()); + ASSERT_FALSE(docExists( + _opCtx.get(), + NamespaceString::kSessionTransactionsTableNamespace, + BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()))); } TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTable) { @@ -2577,11 +2576,10 @@ TEST_F(SyncTailTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) { nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog})); - DBDirectClient client(_opCtx.get()); - auto result = client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), - {BSON(SessionTxnRecord::kSessionIdFieldName - << preImageSessionInfo.getSessionId()->toBSON())}); - ASSERT_TRUE(result.isEmpty()); + ASSERT_FALSE(docExists(_opCtx.get(), + NamespaceString::kSessionTransactionsTableNamespace, + BSON(SessionTxnRecord::kSessionIdFieldName + << preImageSessionInfo.getSessionId()->toBSON()))); } TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) { @@ -2603,11 +2601,10 @@ TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) { nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog})); - DBDirectClient client(_opCtx.get()); - auto result = client.findOne( - NamespaceString::kSessionTransactionsTableNamespace.ns(), - {BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON())}); - ASSERT_TRUE(result.isEmpty()); + ASSERT_FALSE(docExists( + _opCtx.get(), + NamespaceString::kSessionTransactionsTableNamespace, + BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()))); } TEST_F(IdempotencyTest, EmptyCappedNamespaceNotFound) { @@ -2644,6 +2641,162 @@ TEST_F(IdempotencyTest, ConvertToCappedNamespaceNotFound) { ASSERT_FALSE(autoColl.getDb()); } +// Document used by transaction idempotency tests. +const BSONObj doc = fromjson("{_id: 1}"); + +TEST_F(IdempotencyTest, CommitUnpreparedTransaction) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto commitOp = commitUnprepared( + lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + *commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); +} + +TEST_F(IdempotencyTest, CommitUnpreparedTransactionDataPartiallyApplied) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + NamespaceString nss2("test.coll2"); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto commitOp = commitUnprepared(lsid, + txnNum, + StmtId(0), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc) + << makeInsertApplyOpsEntry(nss2, uuid2, doc))); + + // Manually insert one of the documents so that the data will partially reflect the transaction + // when the commitTransaction oplog entry is applied during initial sync. + ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), + nss, + {doc, commitOp.getOpTime().getTimestamp()}, + commitOp.getOpTime().getTerm())); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_FALSE(docExists(_opCtx.get(), nss2, doc)); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + *commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss2, doc)); +} + +TEST_F(IdempotencyTest, CommitPreparedTransaction) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto prepareOp = + prepare(lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + + auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({prepareOp, commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + *commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); +} + +TEST_F(IdempotencyTest, CommitPreparedTransactionDataPartiallyApplied) { + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + NamespaceString nss2("test.coll2"); + auto uuid2 = createCollectionWithUuid(_opCtx.get(), nss2); + + auto prepareOp = prepare(lsid, + txnNum, + StmtId(0), + BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc) + << makeInsertApplyOpsEntry(nss2, uuid2, doc))); + + auto commitOp = commitPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); + + // Manually insert one of the documents so that the data will partially reflect the transaction + // when the commitTransaction oplog entry is applied during initial sync. + ASSERT_OK(getStorageInterface()->insertDocument(_opCtx.get(), + nss, + {doc, commitOp.getOpTime().getTimestamp()}, + commitOp.getOpTime().getTerm())); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_FALSE(docExists(_opCtx.get(), nss2, doc)); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({prepareOp, commitOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + commitOp.getOpTime(), + *commitOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kCommitted); + ASSERT_TRUE(docExists(_opCtx.get(), nss, doc)); + ASSERT_TRUE(docExists(_opCtx.get(), nss2, doc)); +} + +TEST_F(IdempotencyTest, AbortPreparedTransaction) { + // TODO: SERVER-36492 Fix this test + return; + createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + auto uuid = createCollectionWithUuid(_opCtx.get(), nss); + auto lsid = makeLogicalSessionId(_opCtx.get()); + TxnNumber txnNum(0); + + auto prepareOp = + prepare(lsid, txnNum, StmtId(0), BSON_ARRAY(makeInsertApplyOpsEntry(nss, uuid, doc))); + auto abortOp = abortPrepared(lsid, txnNum, StmtId(1), prepareOp.getOpTime()); + + ASSERT_OK(ReplicationCoordinator::get(getGlobalServiceContext()) + ->setFollowerMode(MemberState::RS_RECOVERING)); + + testOpsAreIdempotent({prepareOp, abortOp}); + repl::checkTxnTable(_opCtx.get(), + lsid, + txnNum, + abortOp.getOpTime(), + *abortOp.getWallClockTime(), + boost::none, + DurableTxnStateEnum::kAborted); + ASSERT_FALSE(docExists(_opCtx.get(), nss, doc)); +} } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 9b8557a6a82..9ef501f89b9 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -250,6 +250,28 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) { return Status::OK(); } +Status SyncTailTest::runOpPtrsInitialSync(MultiApplier::OperationPtrs ops) { + auto options = makeInitialSyncOptions(); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + SyncTail::MultiSyncApplyFunc(), + nullptr, + options); + // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD + // operations provided by idempotency tests. + for (auto& op : ops) { + MultiApplier::OperationPtrs opsPtrs; + opsPtrs.push_back(op); + WorkerMultikeyPathInfo pathInfo; + auto status = multiSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo); + if (!status.isOK()) { + return status; + } + } + return Status::OK(); +} + void checkTxnTable(OperationContext* opCtx, const LogicalSessionId& lsid, const TxnNumber& txnNum, @@ -301,5 +323,11 @@ StatusWith<BSONObj> CollectionReader::next() { return obj; } +bool docExists(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) { + DBDirectClient client(opCtx); + auto result = client.findOne(nss.ns(), {doc}); + return !result.isEmpty(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h index e58d11b26dc..c1362aad0f2 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.h +++ b/src/mongo/db/repl/sync_tail_test_fixture.h @@ -146,6 +146,7 @@ protected: Status runOpsSteadyState(std::vector<OplogEntry> ops); Status runOpInitialSync(const OplogEntry& entry); Status runOpsInitialSync(std::vector<OplogEntry> ops); + Status runOpPtrsInitialSync(MultiApplier::OperationPtrs ops); UUID kUuid{UUID::gen()}; }; @@ -175,5 +176,7 @@ void checkTxnTable(OperationContext* opCtx, boost::optional<repl::OpTime> expectedStartOpTime, boost::optional<DurableTxnStateEnum> expectedState); +bool docExists(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc); + } // namespace repl } // namespace mongo |