summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/idempotency_test_fixture.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/idempotency_test_fixture.cpp')
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp146
1 files changed, 132 insertions, 14 deletions
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 984c9f84577..7b48bbb7e3c 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -75,7 +76,8 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::optional<Date_t> wallClockTime = boost::none,
boost::optional<StmtId> stmtId = boost::none,
boost::optional<UUID> uuid = boost::none,
- boost::optional<OpTime> prevOpTime = boost::none) {
+ boost::optional<OpTime> prevOpTime = boost::none,
+ const boost::optional<bool> prepare = boost::none) {
return repl::OplogEntry(opTime, // optime
boost::none, // hash
opType, // opType
@@ -89,10 +91,10 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::none, // upsert
wallClockTime, // wall clock time
stmtId, // statement id
- prevOpTime, // optime of previous write within same transaction
- boost::none, // pre-image optime
- boost::none, // post-image optime
- boost::none); // prepare
+ prevOpTime, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none, // post-image optime
+ prepare); // prepare
}
} // namespace
@@ -365,8 +367,30 @@ Status IdempotencyTest::resetState() {
void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, SequenceType sequenceType) {
ASSERT_OK(resetState());
- ASSERT_OK(runOpsInitialSync(ops));
- auto state1 = validate();
+
+ // Write oplog entries to oplog collection.
+ for (auto&& entry : ops) {
+ ASSERT_OK(getStorageInterface()->insertDocument(
+ _opCtx.get(),
+ NamespaceString::kRsOplogNamespace,
+ {entry.toBSON(), entry.getOpTime().getTimestamp()},
+ entry.getOpTime().getTerm()));
+ }
+
+ SyncTail syncTail(nullptr, // observer
+ nullptr, // consistency markers
+ nullptr, // storage interface
+ SyncTail::MultiSyncApplyFunc(),
+ nullptr, // writer pool
+ SyncTailTest::makeInitialSyncOptions());
+ std::vector<MultiApplier::OperationPtrs> writerVectors(1);
+ std::vector<MultiApplier::Operations> derivedOps;
+ // Derive ops for transactions if necessary.
+ syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps);
+
+ const auto& opPtrs = writerVectors[0];
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
+ auto state1 = validateAllCollections();
auto iterations = sequenceType == SequenceType::kEntireSequence ? 1 : ops.size();
for (std::size_t i = 0; i < iterations; i++) {
@@ -374,28 +398,28 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence
std::vector<OplogEntry> fullSequence;
if (sequenceType == SequenceType::kEntireSequence) {
- ASSERT_OK(runOpsInitialSync(ops));
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
fullSequence.insert(fullSequence.end(), ops.begin(), ops.end());
} else if (sequenceType == SequenceType::kAnyPrefix ||
sequenceType == SequenceType::kAnyPrefixOrSuffix) {
std::vector<OplogEntry> prefix(ops.begin(), ops.begin() + i + 1);
- ASSERT_OK(runOpsInitialSync(prefix));
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
fullSequence.insert(fullSequence.end(), prefix.begin(), prefix.end());
}
- ASSERT_OK(runOpsInitialSync(ops));
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
fullSequence.insert(fullSequence.end(), ops.begin(), ops.end());
if (sequenceType == SequenceType::kAnySuffix ||
sequenceType == SequenceType::kAnyPrefixOrSuffix) {
std::vector<OplogEntry> suffix(ops.begin() + i, ops.end());
- ASSERT_OK(runOpsInitialSync(suffix));
+ ASSERT_OK(runOpPtrsInitialSync(opPtrs));
fullSequence.insert(fullSequence.end(), suffix.begin(), suffix.end());
}
- auto state2 = validate();
+ auto state2 = validateAllCollections();
if (state1 != state2) {
- FAIL(getStateString(state1, state2, fullSequence));
+ FAIL(getStateVectorString(state1, state2, fullSequence));
}
}
}
@@ -434,6 +458,59 @@ OplogEntry IdempotencyTest::dropIndex(const std::string& indexName, const UUID&
return makeCommandOplogEntry(nextOpTime(), nss, cmd, uuid);
}
+OplogEntry IdempotencyTest::prepare(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ const BSONArray& ops) {
+ OperationSessionInfo info;
+ info.setSessionId(lsid);
+ info.setTxnNumber(txnNum);
+ return makeOplogEntry(nextOpTime(),
+ OpTypeEnum::kCommand,
+ nss.getCommandNS(),
+ BSON("applyOps" << ops),
+ boost::none /* o2 */,
+ info /* sessionInfo */,
+ Date_t::min() /* wallClockTime -- required but not checked */,
+ stmtId,
+ boost::none /* uuid */,
+ OpTime(),
+ true);
+}
+
+OplogEntry IdempotencyTest::commitUnprepared(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ const BSONArray& ops) {
+ OperationSessionInfo info;
+ info.setSessionId(lsid);
+ info.setTxnNumber(txnNum);
+ return makeCommandOplogEntryWithSessionInfoAndStmtId(
+ nextOpTime(), nss, BSON("applyOps" << ops), lsid, txnNum, stmtId, OpTime());
+}
+
+OplogEntry IdempotencyTest::commitPrepared(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ OpTime prepareOpTime) {
+ return makeCommandOplogEntryWithSessionInfoAndStmtId(
+ nextOpTime(),
+ nss,
+ BSON("commitTransaction" << 1 << "commitTimestamp" << prepareOpTime.getTimestamp()),
+ lsid,
+ txnNum,
+ stmtId,
+ prepareOpTime);
+}
+
+OplogEntry IdempotencyTest::abortPrepared(LogicalSessionId lsid,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ OpTime prepareOpTime) {
+ return makeCommandOplogEntryWithSessionInfoAndStmtId(
+ nextOpTime(), nss, BSON("abortTransaction" << 1), lsid, txnNum, stmtId, prepareOpTime);
+}
+
std::string IdempotencyTest::computeDataHash(Collection* collection) {
auto desc = collection->getIndexCatalog()->findIdIndex(_opCtx.get());
ASSERT_TRUE(desc);
@@ -462,7 +539,22 @@ std::string IdempotencyTest::computeDataHash(Collection* collection) {
return digestToString(d);
}
-CollectionState IdempotencyTest::validate() {
+std::vector<CollectionState> IdempotencyTest::validateAllCollections() {
+ std::vector<CollectionState> collStates;
+ auto& uuidCatalog = UUIDCatalog::get(_opCtx.get());
+ auto dbs = uuidCatalog.getAllDbNames();
+ for (auto& db : dbs) {
+ // Skip local database.
+ if (db != "local") {
+ for (const auto& nss : uuidCatalog.getAllCollectionNamesFromDb(db)) {
+ collStates.push_back(validate(nss));
+ }
+ }
+ }
+ return collStates;
+}
+
+CollectionState IdempotencyTest::validate(const NamespaceString& nss) {
auto collUUID = [&]() -> OptionalCollectionUUID {
AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss);
if (auto collection = autoColl.getCollection()) {
@@ -520,8 +612,34 @@ std::string IdempotencyTest::getStateString(const CollectionState& state1,
return sb.str();
}
+std::string IdempotencyTest::getStateVectorString(std::vector<CollectionState>& state1,
+ std::vector<CollectionState>& state2,
+ const std::vector<OplogEntry>& ops) {
+ StringBuilder sb;
+ sb << "The states: ";
+ for (const auto& s : state1) {
+ sb << s << " ";
+ }
+ sb << "do not match with the states: ";
+ for (const auto& s : state2) {
+ sb << s << " ";
+ }
+ sb << " found after applying the operations a second time, therefore breaking idempotency.";
+ return sb.str();
+}
+
template OplogEntry IdempotencyTest::update<int>(int _id, const BSONObj& obj);
template OplogEntry IdempotencyTest::update<const char*>(char const* _id, const BSONObj& obj);
+BSONObj makeInsertApplyOpsEntry(const NamespaceString& nss, const UUID& uuid, const BSONObj& doc) {
+ return BSON("op"
+ << "i"
+ << "ns"
+ << nss.toString()
+ << "ui"
+ << uuid
+ << "o"
+ << doc);
+}
} // namespace repl
} // namespace mongo