summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2019-10-04 00:55:10 +0000
committerevergreen <evergreen@mongodb.com>2019-10-04 00:55:10 +0000
commitd1c6b36bf04dd90ad963ad5dfbfa491de0c88789 (patch)
treef6e48e5c9febe17fb708f13de3859b2493283274
parentaa2ccf6e1992b41ac1b286291e6217d91157f573 (diff)
downloadmongo-d1c6b36bf04dd90ad963ad5dfbfa491de0c88789.tar.gz
SERVER-42925 Refactor idempotency tests oplog application and fix oplog visiblity issues
-rw-r--r--src/mongo/db/repl/idempotency_test.cpp16
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp65
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.h2
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp7
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp62
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test_fixture.h1
6 files changed, 51 insertions, 102 deletions
diff --git a/src/mongo/db/repl/idempotency_test.cpp b/src/mongo/db/repl/idempotency_test.cpp
index 96143015561..34fe562c2c8 100644
--- a/src/mongo/db/repl/idempotency_test.cpp
+++ b/src/mongo/db/repl/idempotency_test.cpp
@@ -60,7 +60,7 @@ protected:
std::string getStatesString(const std::vector<CollectionState>& state1,
const std::vector<CollectionState>& state2,
- const MultiApplier::OperationPtrs& opPtrs) override;
+ const std::vector<OplogEntry>& ops) override;
Status resetState() override;
@@ -113,28 +113,28 @@ std::vector<OplogEntry> RandomizedIdempotencyTest::createUpdateSequence(
std::string RandomizedIdempotencyTest::getStatesString(const std::vector<CollectionState>& state1,
const std::vector<CollectionState>& state2,
- const MultiApplier::OperationPtrs& opPtrs) {
- unittest::log() << IdempotencyTest::getStatesString(state1, state2, opPtrs);
+ const std::vector<OplogEntry>& ops) {
+ unittest::log() << IdempotencyTest::getStatesString(state1, state2, ops);
StringBuilder sb;
sb << "Ran update ops: ";
sb << "[ ";
bool firstIter = true;
- for (auto op : opPtrs) {
+ for (const auto& op : ops) {
if (!firstIter) {
sb << ", ";
} else {
firstIter = false;
}
- sb << op->toString();
+ sb << op.toString();
}
sb << " ]\n";
ASSERT_OK(resetState());
sb << "Start: " << getDoc() << "\n";
- for (auto op : opPtrs) {
- ASSERT_OK(runOpInitialSync(*op));
- sb << "Apply: " << op->getObject() << "\n ==> " << getDoc() << "\n";
+ for (const auto& op : ops) {
+ ASSERT_OK(runOpInitialSync(op));
+ sb << "Apply: " << op.getObject() << "\n ==> " << getDoc() << "\n";
}
sb << "Found from the seed: " << this->seed;
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 185e4f0dd4c..b06bb4e5f1b 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -98,20 +98,6 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::none); // post-image optime
}
-/**
- * Test only subclass of OplogApplierImpl for using fillWriterVectors in tests.
- */
-class OplogApplierImplForTest : public OplogApplierImpl {
-public:
- OplogApplierImplForTest(const OplogApplier::Options& options);
- using OplogApplierImpl::fillWriterVectors;
-};
-
-// Minimal constructor that takes options, the only member accessed in fillWriterVectors.
-OplogApplierImplForTest::OplogApplierImplForTest(const OplogApplier::Options& options)
- : OplogApplierImpl(
- nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, options, nullptr) {}
-
} // namespace
/**
@@ -388,59 +374,34 @@ Status IdempotencyTest::resetState() {
void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, SequenceType sequenceType) {
ASSERT_OK(resetState());
- // Write oplog entries to oplog collection.
- for (auto&& entry : ops) {
- ASSERT_OK(getStorageInterface()->insertDocument(
- _opCtx.get(),
- NamespaceString::kRsOplogNamespace,
- {entry.toBSON(), entry.getOpTime().getTimestamp()},
- entry.getOpTime().getTerm()));
- }
- OplogApplier::Options option(OplogApplication::Mode::kInitialSync);
- OplogApplierImplForTest oplogApplier(option);
- std::vector<MultiApplier::OperationPtrs> writerVectors(1);
- std::vector<MultiApplier::Operations> derivedOps;
-
- // Keeps all operations in scope for the lifetime of this function.
- std::vector<MultiApplier::Operations> singleOpVectors;
- for (auto&& entry : ops) {
- // Derive ops for transactions if necessary.
- std::vector<OplogEntry> op;
- op.push_back(entry);
- singleOpVectors.emplace_back(op);
- oplogApplier.fillWriterVectors(
- _opCtx.get(), &singleOpVectors.back(), &writerVectors, &derivedOps);
- }
+ ASSERT_OK(runOpsInitialSync(ops));
- const auto& opPtrs = writerVectors[0];
- ASSERT_OK(runOpPtrsInitialSync(opPtrs));
auto state1 = validateAllCollections();
auto iterations = sequenceType == SequenceType::kEntireSequence ? 1 : ops.size();
-
for (std::size_t i = 0; i < iterations; i++) {
// Since the end state after each iteration is expected to be the same as the start state,
// we don't drop and re-create the collections. Dropping and re-creating the collections
// won't work either because we don't have ways to wait until second-phase drop to
// completely finish.
- MultiApplier::OperationPtrs fullSequence;
+ std::vector<OplogEntry> fullSequence;
if (sequenceType == SequenceType::kEntireSequence) {
- ASSERT_OK(runOpPtrsInitialSync(opPtrs));
- fullSequence.insert(fullSequence.end(), opPtrs.begin(), opPtrs.end());
+ ASSERT_OK(runOpsInitialSync(ops));
+ fullSequence.insert(fullSequence.end(), ops.begin(), ops.end());
} else if (sequenceType == SequenceType::kAnyPrefix ||
sequenceType == SequenceType::kAnyPrefixOrSuffix) {
- MultiApplier::OperationPtrs prefix(opPtrs.begin(), opPtrs.begin() + i + 1);
- ASSERT_OK(runOpPtrsInitialSync(prefix));
+ std::vector<OplogEntry> prefix(ops.begin(), ops.begin() + i + 1);
+ ASSERT_OK(runOpsInitialSync(prefix));
fullSequence.insert(fullSequence.end(), prefix.begin(), prefix.end());
}
- ASSERT_OK(runOpPtrsInitialSync(opPtrs));
- fullSequence.insert(fullSequence.end(), opPtrs.begin(), opPtrs.end());
+ ASSERT_OK(runOpsInitialSync(ops));
+ fullSequence.insert(fullSequence.end(), ops.begin(), ops.end());
if (sequenceType == SequenceType::kAnySuffix ||
sequenceType == SequenceType::kAnyPrefixOrSuffix) {
- MultiApplier::OperationPtrs suffix(opPtrs.begin() + i, opPtrs.end());
- ASSERT_OK(runOpPtrsInitialSync(suffix));
+ std::vector<OplogEntry> suffix(ops.begin() + i, ops.end());
+ ASSERT_OK(runOpsInitialSync(suffix));
fullSequence.insert(fullSequence.end(), suffix.begin(), suffix.end());
}
@@ -664,7 +625,7 @@ CollectionState IdempotencyTest::validate(const NamespaceString& nss) {
std::string IdempotencyTest::getStatesString(const std::vector<CollectionState>& state1,
const std::vector<CollectionState>& state2,
- const MultiApplier::OperationPtrs& opPtrs) {
+ const std::vector<OplogEntry>& ops) {
StringBuilder sb;
sb << "The states:\n";
for (const auto& s : state1) {
@@ -676,8 +637,8 @@ std::string IdempotencyTest::getStatesString(const std::vector<CollectionState>&
}
sb << "found after applying the operations a second time, therefore breaking idempotency.\n";
sb << "Applied ops:\n";
- for (auto op : opPtrs) {
- sb << op->toString() << "\n";
+ for (const auto& op : ops) {
+ sb << op.toString() << "\n";
}
return sb.str();
}
diff --git a/src/mongo/db/repl/idempotency_test_fixture.h b/src/mongo/db/repl/idempotency_test_fixture.h
index 9e6d14cb228..f3f990cb8da 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.h
+++ b/src/mongo/db/repl/idempotency_test_fixture.h
@@ -142,7 +142,7 @@ protected:
std::string computeDataHash(Collection* collection);
virtual std::string getStatesString(const std::vector<CollectionState>& state1,
const std::vector<CollectionState>& state2,
- const MultiApplier::OperationPtrs& opPtrs);
+ const std::vector<OplogEntry>& ops);
/**
* Validate data and indexes. Return the MD5 hash of the documents ordered by _id.
*/
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index e403254ae15..f085076bacd 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -2117,10 +2117,6 @@ TEST_F(IdempotencyTest, CreateCollectionWithCollation) {
CollectionUUID uuid = UUID::gen();
auto runOpsAndValidate = [this, uuid]() {
- auto insertOp1 = insert(fromjson("{ _id: 'foo' }"));
- auto insertOp2 = insert(fromjson("{ _id: 'Foo', x: 1 }"));
- auto updateOp = update("foo", BSON("$set" << BSON("x" << 2)));
- auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll()));
auto options = BSON("collation"
<< BSON("locale"
<< "en"
@@ -2134,6 +2130,9 @@ TEST_F(IdempotencyTest, CreateCollectionWithCollation) {
<< "57.1")
<< "uuid" << uuid);
auto createColl = makeCreateCollectionOplogEntry(nextOpTime(), nss, options);
+ auto insertOp1 = insert(fromjson("{ _id: 'foo' }"));
+ auto insertOp2 = insert(fromjson("{ _id: 'Foo', x: 1 }"));
+ auto updateOp = update("foo", BSON("$set" << BSON("x" << 2)));
// We don't drop and re-create the collection since we don't have ways
// to wait until second-phase drop to completely finish.
diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
index a9d6821d6f0..dee065f19b6 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
@@ -227,51 +227,41 @@ Status OplogApplierImplTest::runOpInitialSync(const OplogEntry& op) {
}
Status OplogApplierImplTest::runOpsInitialSync(std::vector<OplogEntry> ops) {
+ NoopOplogApplierObserver observer;
+ auto storageInterface = getStorageInterface();
+ auto writerPool = makeReplWriterPool();
OplogApplierImpl oplogApplier(
nullptr, // executor
nullptr, // oplogBuffer
- nullptr, // observer
- nullptr, // replCoord
- getConsistencyMarkers(),
- getStorageInterface(),
- OplogApplierImpl::ApplyGroupFunc(),
- repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
- nullptr);
- // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD
- // operations provided by idempotency tests.
- for (auto& op : ops) {
- MultiApplier::OperationPtrs opsPtrs;
- opsPtrs.push_back(&op);
- WorkerMultikeyPathInfo pathInfo;
- auto status = applyOplogGroup(_opCtx.get(), &opsPtrs, &oplogApplier, &pathInfo);
- if (!status.isOK()) {
- return status;
- }
- }
- return Status::OK();
-}
-
-Status OplogApplierImplTest::runOpPtrsInitialSync(MultiApplier::OperationPtrs ops) {
- OplogApplierImpl oplogApplier(
- nullptr, // executor
- nullptr, // oplogBuffer
- nullptr, // observer
- nullptr, // replCoord
+ &observer,
+ ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
- getStorageInterface(),
- OplogApplierImpl::ApplyGroupFunc(),
+ storageInterface,
+ applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
- nullptr);
+ writerPool.get());
+ // Idempotency tests apply the same batch of oplog entries multiple times in a loop, which would
+ // result in out-of-order oplog inserts. So we truncate the oplog collection first before
+ // calling multiApply.
+ ASSERT_OK(
+ storageInterface->truncateCollection(_opCtx.get(), NamespaceString::kRsOplogNamespace));
// Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD
- // operations provided by idempotency tests.
+ // operations provided by idempotency tests. Applying operations in a batch of one is also
+ // necessary to work around oplog visibility issues. For example, idempotency tests may contain
+ // a prepare and a commit that we don't apply both in the same batch in production oplog
+ // application because the commit needs to read the prepare entry. So we apply each operation in
+ // its own batch and update oplog visibility after each batch to make sure all previously
+ // applied entries are visible to subsequent batches.
for (auto& op : ops) {
- MultiApplier::OperationPtrs opsPtrs;
- opsPtrs.push_back(op);
- WorkerMultikeyPathInfo pathInfo;
- auto status = applyOplogGroup(_opCtx.get(), &opsPtrs, &oplogApplier, &pathInfo);
+ auto status = oplogApplier.multiApply(_opCtx.get(), {op});
if (!status.isOK()) {
- return status;
+ return status.getStatus();
}
+ auto lastApplied = status.getValue();
+ const bool orderedCommit = true;
+ // Update oplog visibility by notifying the storage engine of the new oplog entries.
+ storageInterface->oplogDiskLocRegister(
+ _opCtx.get(), lastApplied.getTimestamp(), orderedCommit);
}
return Status::OK();
}
diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h
index 71967716330..76e060faabc 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h
+++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h
@@ -141,7 +141,6 @@ protected:
Status runOpsSteadyState(std::vector<OplogEntry> ops);
Status runOpInitialSync(const OplogEntry& entry);
Status runOpsInitialSync(std::vector<OplogEntry> ops);
- Status runOpPtrsInitialSync(MultiApplier::OperationPtrs ops);
UUID kUuid{UUID::gen()};
};