diff options
Diffstat (limited to 'src/mongo/db/repl/idempotency_test_fixture.cpp')
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.cpp | 146 |
1 files changed, 132 insertions, 14 deletions
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 984c9f84577..7b48bbb7e3c 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -40,6 +40,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -75,7 +76,8 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::optional<Date_t> wallClockTime = boost::none, boost::optional<StmtId> stmtId = boost::none, boost::optional<UUID> uuid = boost::none, - boost::optional<OpTime> prevOpTime = boost::none) { + boost::optional<OpTime> prevOpTime = boost::none, + const boost::optional<bool> prepare = boost::none) { return repl::OplogEntry(opTime, // optime boost::none, // hash opType, // opType @@ -89,10 +91,10 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::none, // upsert wallClockTime, // wall clock time stmtId, // statement id - prevOpTime, // optime of previous write within same transaction - boost::none, // pre-image optime - boost::none, // post-image optime - boost::none); // prepare + prevOpTime, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + prepare); // prepare } } // namespace @@ -365,8 +367,30 @@ Status IdempotencyTest::resetState() { void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, SequenceType sequenceType) { ASSERT_OK(resetState()); - ASSERT_OK(runOpsInitialSync(ops)); - auto state1 = validate(); + + // Write oplog entries to oplog collection. + for (auto&& entry : ops) { + ASSERT_OK(getStorageInterface()->insertDocument( + _opCtx.get(), + NamespaceString::kRsOplogNamespace, + {entry.toBSON(), entry.getOpTime().getTimestamp()}, + entry.getOpTime().getTerm())); + } + + SyncTail syncTail(nullptr, // observer + nullptr, // consistency markers + nullptr, // storage interface + SyncTail::MultiSyncApplyFunc(), + nullptr, // writer pool + SyncTailTest::makeInitialSyncOptions()); + std::vector<MultiApplier::OperationPtrs> writerVectors(1); + std::vector<MultiApplier::Operations> derivedOps; + // Derive ops for transactions if necessary. + syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps); + + const auto& opPtrs = writerVectors[0]; + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); + auto state1 = validateAllCollections(); auto iterations = sequenceType == SequenceType::kEntireSequence ? 1 : ops.size(); for (std::size_t i = 0; i < iterations; i++) { @@ -374,28 +398,28 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence std::vector<OplogEntry> fullSequence; if (sequenceType == SequenceType::kEntireSequence) { - ASSERT_OK(runOpsInitialSync(ops)); + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); fullSequence.insert(fullSequence.end(), ops.begin(), ops.end()); } else if (sequenceType == SequenceType::kAnyPrefix || sequenceType == SequenceType::kAnyPrefixOrSuffix) { std::vector<OplogEntry> prefix(ops.begin(), ops.begin() + i + 1); - ASSERT_OK(runOpsInitialSync(prefix)); + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); fullSequence.insert(fullSequence.end(), prefix.begin(), prefix.end()); } - ASSERT_OK(runOpsInitialSync(ops)); + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); fullSequence.insert(fullSequence.end(), ops.begin(), ops.end()); if (sequenceType == SequenceType::kAnySuffix || sequenceType == SequenceType::kAnyPrefixOrSuffix) { std::vector<OplogEntry> suffix(ops.begin() + i, ops.end()); - ASSERT_OK(runOpsInitialSync(suffix)); + ASSERT_OK(runOpPtrsInitialSync(opPtrs)); fullSequence.insert(fullSequence.end(), suffix.begin(), suffix.end()); } - auto state2 = validate(); + auto state2 = validateAllCollections(); if (state1 != state2) { - FAIL(getStateString(state1, state2, fullSequence)); + FAIL(getStateVectorString(state1, state2, fullSequence)); } } } @@ -434,6 +458,59 @@ OplogEntry IdempotencyTest::dropIndex(const std::string& indexName, const UUID& return makeCommandOplogEntry(nextOpTime(), nss, cmd, uuid); } +OplogEntry IdempotencyTest::prepare(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + const BSONArray& ops) { + OperationSessionInfo info; + info.setSessionId(lsid); + info.setTxnNumber(txnNum); + return makeOplogEntry(nextOpTime(), + OpTypeEnum::kCommand, + nss.getCommandNS(), + BSON("applyOps" << ops), + boost::none /* o2 */, + info /* sessionInfo */, + Date_t::min() /* wallClockTime -- required but not checked */, + stmtId, + boost::none /* uuid */, + OpTime(), + true); +} + +OplogEntry IdempotencyTest::commitUnprepared(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + const BSONArray& ops) { + OperationSessionInfo info; + info.setSessionId(lsid); + info.setTxnNumber(txnNum); + return makeCommandOplogEntryWithSessionInfoAndStmtId( + nextOpTime(), nss, BSON("applyOps" << ops), lsid, txnNum, stmtId, OpTime()); +} + +OplogEntry IdempotencyTest::commitPrepared(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + OpTime prepareOpTime) { + return makeCommandOplogEntryWithSessionInfoAndStmtId( + nextOpTime(), + nss, + BSON("commitTransaction" << 1 << "commitTimestamp" << prepareOpTime.getTimestamp()), + lsid, + txnNum, + stmtId, + prepareOpTime); +} + +OplogEntry IdempotencyTest::abortPrepared(LogicalSessionId lsid, + TxnNumber txnNum, + StmtId stmtId, + OpTime prepareOpTime) { + return makeCommandOplogEntryWithSessionInfoAndStmtId( + nextOpTime(), nss, BSON("abortTransaction" << 1), lsid, txnNum, stmtId, prepareOpTime); +} + std::string IdempotencyTest::computeDataHash(Collection* collection) { auto desc = collection->getIndexCatalog()->findIdIndex(_opCtx.get()); ASSERT_TRUE(desc); @@ -462,7 +539,22 @@ std::string IdempotencyTest::computeDataHash(Collection* collection) { return digestToString(d); } -CollectionState IdempotencyTest::validate() { +std::vector<CollectionState> IdempotencyTest::validateAllCollections() { + std::vector<CollectionState> collStates; + auto& uuidCatalog = UUIDCatalog::get(_opCtx.get()); + auto dbs = uuidCatalog.getAllDbNames(); + for (auto& db : dbs) { + // Skip local database. + if (db != "local") { + for (const auto& nss : uuidCatalog.getAllCollectionNamesFromDb(db)) { + collStates.push_back(validate(nss)); + } + } + } + return collStates; +} + +CollectionState IdempotencyTest::validate(const NamespaceString& nss) { auto collUUID = [&]() -> OptionalCollectionUUID { AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss); if (auto collection = autoColl.getCollection()) { @@ -520,8 +612,34 @@ std::string IdempotencyTest::getStateString(const CollectionState& state1, return sb.str(); } +std::string IdempotencyTest::getStateVectorString(std::vector<CollectionState>& state1, + std::vector<CollectionState>& state2, + const std::vector<OplogEntry>& ops) { + StringBuilder sb; + sb << "The states: "; + for (const auto& s : state1) { + sb << s << " "; + } + sb << "do not match with the states: "; + for (const auto& s : state2) { + sb << s << " "; + } + sb << " found after applying the operations a second time, therefore breaking idempotency."; + return sb.str(); +} + template OplogEntry IdempotencyTest::update<int>(int _id, const BSONObj& obj); template OplogEntry IdempotencyTest::update<const char*>(char const* _id, const BSONObj& obj); +BSONObj makeInsertApplyOpsEntry(const NamespaceString& nss, const UUID& uuid, const BSONObj& doc) { + return BSON("op" + << "i" + << "ns" + << nss.toString() + << "ui" + << uuid + << "o" + << doc); +} } // namespace repl } // namespace mongo |