diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2019-10-04 00:55:10 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-10-04 00:55:10 +0000 |
commit | d1c6b36bf04dd90ad963ad5dfbfa491de0c88789 (patch) | |
tree | f6e48e5c9febe17fb708f13de3859b2493283274 /src/mongo/db/repl | |
parent | aa2ccf6e1992b41ac1b286291e6217d91157f573 (diff) | |
download | mongo-d1c6b36bf04dd90ad963ad5dfbfa491de0c88789.tar.gz |
SERVER-42925 Refactor idempotency tests oplog application and fix oplog visiblity issues
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/idempotency_test.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test_fixture.h | 1 |
6 files changed, 51 insertions, 102 deletions
diff --git a/src/mongo/db/repl/idempotency_test.cpp b/src/mongo/db/repl/idempotency_test.cpp index 96143015561..34fe562c2c8 100644 --- a/src/mongo/db/repl/idempotency_test.cpp +++ b/src/mongo/db/repl/idempotency_test.cpp @@ -60,7 +60,7 @@ protected: std::string getStatesString(const std::vector<CollectionState>& state1, const std::vector<CollectionState>& state2, - const MultiApplier::OperationPtrs& opPtrs) override; + const std::vector<OplogEntry>& ops) override; Status resetState() override; @@ -113,28 +113,28 @@ std::vector<OplogEntry> RandomizedIdempotencyTest::createUpdateSequence( std::string RandomizedIdempotencyTest::getStatesString(const std::vector<CollectionState>& state1, const std::vector<CollectionState>& state2, - const MultiApplier::OperationPtrs& opPtrs) { - unittest::log() << IdempotencyTest::getStatesString(state1, state2, opPtrs); + const std::vector<OplogEntry>& ops) { + unittest::log() << IdempotencyTest::getStatesString(state1, state2, ops); StringBuilder sb; sb << "Ran update ops: "; sb << "[ "; bool firstIter = true; - for (auto op : opPtrs) { + for (const auto& op : ops) { if (!firstIter) { sb << ", "; } else { firstIter = false; } - sb << op->toString(); + sb << op.toString(); } sb << " ]\n"; ASSERT_OK(resetState()); sb << "Start: " << getDoc() << "\n"; - for (auto op : opPtrs) { - ASSERT_OK(runOpInitialSync(*op)); - sb << "Apply: " << op->getObject() << "\n ==> " << getDoc() << "\n"; + for (const auto& op : ops) { + ASSERT_OK(runOpInitialSync(op)); + sb << "Apply: " << op.getObject() << "\n ==> " << getDoc() << "\n"; } sb << "Found from the seed: " << this->seed; diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 185e4f0dd4c..b06bb4e5f1b 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -98,20 +98,6 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::none); // post-image optime } -/** - * Test only subclass of OplogApplierImpl for using fillWriterVectors in tests. - */ -class OplogApplierImplForTest : public OplogApplierImpl { -public: - OplogApplierImplForTest(const OplogApplier::Options& options); - using OplogApplierImpl::fillWriterVectors; -}; - -// Minimal constructor that takes options, the only member accessed in fillWriterVectors. -OplogApplierImplForTest::OplogApplierImplForTest(const OplogApplier::Options& options) - : OplogApplierImpl( - nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, options, nullptr) {} - } // namespace /** @@ -388,59 +374,34 @@ Status IdempotencyTest::resetState() { void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, SequenceType sequenceType) { ASSERT_OK(resetState()); - // Write oplog entries to oplog collection. - for (auto&& entry : ops) { - ASSERT_OK(getStorageInterface()->insertDocument( - _opCtx.get(), - NamespaceString::kRsOplogNamespace, - {entry.toBSON(), entry.getOpTime().getTimestamp()}, - entry.getOpTime().getTerm())); - } - OplogApplier::Options option(OplogApplication::Mode::kInitialSync); - OplogApplierImplForTest oplogApplier(option); - std::vector<MultiApplier::OperationPtrs> writerVectors(1); - std::vector<MultiApplier::Operations> derivedOps; - - // Keeps all operations in scope for the lifetime of this function. - std::vector<MultiApplier::Operations> singleOpVectors; - for (auto&& entry : ops) { - // Derive ops for transactions if necessary. - std::vector<OplogEntry> op; - op.push_back(entry); - singleOpVectors.emplace_back(op); - oplogApplier.fillWriterVectors( - _opCtx.get(), &singleOpVectors.back(), &writerVectors, &derivedOps); - } + ASSERT_OK(runOpsInitialSync(ops)); - const auto& opPtrs = writerVectors[0]; - ASSERT_OK(runOpPtrsInitialSync(opPtrs)); auto state1 = validateAllCollections(); auto iterations = sequenceType == SequenceType::kEntireSequence ? 1 : ops.size(); - for (std::size_t i = 0; i < iterations; i++) { // Since the end state after each iteration is expected to be the same as the start state, // we don't drop and re-create the collections. Dropping and re-creating the collections // won't work either because we don't have ways to wait until second-phase drop to // completely finish. - MultiApplier::OperationPtrs fullSequence; + std::vector<OplogEntry> fullSequence; if (sequenceType == SequenceType::kEntireSequence) { - ASSERT_OK(runOpPtrsInitialSync(opPtrs)); - fullSequence.insert(fullSequence.end(), opPtrs.begin(), opPtrs.end()); + ASSERT_OK(runOpsInitialSync(ops)); + fullSequence.insert(fullSequence.end(), ops.begin(), ops.end()); } else if (sequenceType == SequenceType::kAnyPrefix || sequenceType == SequenceType::kAnyPrefixOrSuffix) { - MultiApplier::OperationPtrs prefix(opPtrs.begin(), opPtrs.begin() + i + 1); - ASSERT_OK(runOpPtrsInitialSync(prefix)); + std::vector<OplogEntry> prefix(ops.begin(), ops.begin() + i + 1); + ASSERT_OK(runOpsInitialSync(prefix)); fullSequence.insert(fullSequence.end(), prefix.begin(), prefix.end()); } - ASSERT_OK(runOpPtrsInitialSync(opPtrs)); - fullSequence.insert(fullSequence.end(), opPtrs.begin(), opPtrs.end()); + ASSERT_OK(runOpsInitialSync(ops)); + fullSequence.insert(fullSequence.end(), ops.begin(), ops.end()); if (sequenceType == SequenceType::kAnySuffix || sequenceType == SequenceType::kAnyPrefixOrSuffix) { - MultiApplier::OperationPtrs suffix(opPtrs.begin() + i, opPtrs.end()); - ASSERT_OK(runOpPtrsInitialSync(suffix)); + std::vector<OplogEntry> suffix(ops.begin() + i, ops.end()); + ASSERT_OK(runOpsInitialSync(suffix)); fullSequence.insert(fullSequence.end(), suffix.begin(), suffix.end()); } @@ -664,7 +625,7 @@ CollectionState IdempotencyTest::validate(const NamespaceString& nss) { std::string IdempotencyTest::getStatesString(const std::vector<CollectionState>& state1, const std::vector<CollectionState>& state2, - const MultiApplier::OperationPtrs& opPtrs) { + const std::vector<OplogEntry>& ops) { StringBuilder sb; sb << "The states:\n"; for (const auto& s : state1) { @@ -676,8 +637,8 @@ std::string IdempotencyTest::getStatesString(const std::vector<CollectionState>& } sb << "found after applying the operations a second time, therefore breaking idempotency.\n"; sb << "Applied ops:\n"; - for (auto op : opPtrs) { - sb << op->toString() << "\n"; + for (const auto& op : ops) { + sb << op.toString() << "\n"; } return sb.str(); } diff --git a/src/mongo/db/repl/idempotency_test_fixture.h b/src/mongo/db/repl/idempotency_test_fixture.h index 9e6d14cb228..f3f990cb8da 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.h +++ b/src/mongo/db/repl/idempotency_test_fixture.h @@ -142,7 +142,7 @@ protected: std::string computeDataHash(Collection* collection); virtual std::string getStatesString(const std::vector<CollectionState>& state1, const std::vector<CollectionState>& state2, - const MultiApplier::OperationPtrs& opPtrs); + const std::vector<OplogEntry>& ops); /** * Validate data and indexes. Return the MD5 hash of the documents ordered by _id. */ diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index e403254ae15..f085076bacd 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -2117,10 +2117,6 @@ TEST_F(IdempotencyTest, CreateCollectionWithCollation) { CollectionUUID uuid = UUID::gen(); auto runOpsAndValidate = [this, uuid]() { - auto insertOp1 = insert(fromjson("{ _id: 'foo' }")); - auto insertOp2 = insert(fromjson("{ _id: 'Foo', x: 1 }")); - auto updateOp = update("foo", BSON("$set" << BSON("x" << 2))); - auto dropColl = makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); auto options = BSON("collation" << BSON("locale" << "en" @@ -2134,6 +2130,9 @@ TEST_F(IdempotencyTest, CreateCollectionWithCollation) { << "57.1") << "uuid" << uuid); auto createColl = makeCreateCollectionOplogEntry(nextOpTime(), nss, options); + auto insertOp1 = insert(fromjson("{ _id: 'foo' }")); + auto insertOp2 = insert(fromjson("{ _id: 'Foo', x: 1 }")); + auto updateOp = update("foo", BSON("$set" << BSON("x" << 2))); // We don't drop and re-create the collection since we don't have ways // to wait until second-phase drop to completely finish. diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp index a9d6821d6f0..dee065f19b6 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp @@ -227,51 +227,41 @@ Status OplogApplierImplTest::runOpInitialSync(const OplogEntry& op) { } Status OplogApplierImplTest::runOpsInitialSync(std::vector<OplogEntry> ops) { + NoopOplogApplierObserver observer; + auto storageInterface = getStorageInterface(); + auto writerPool = makeReplWriterPool(); OplogApplierImpl oplogApplier( nullptr, // executor nullptr, // oplogBuffer - nullptr, // observer - nullptr, // replCoord - getConsistencyMarkers(), - getStorageInterface(), - OplogApplierImpl::ApplyGroupFunc(), - repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), - nullptr); - // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD - // operations provided by idempotency tests. - for (auto& op : ops) { - MultiApplier::OperationPtrs opsPtrs; - opsPtrs.push_back(&op); - WorkerMultikeyPathInfo pathInfo; - auto status = applyOplogGroup(_opCtx.get(), &opsPtrs, &oplogApplier, &pathInfo); - if (!status.isOK()) { - return status; - } - } - return Status::OK(); -} - -Status OplogApplierImplTest::runOpPtrsInitialSync(MultiApplier::OperationPtrs ops) { - OplogApplierImpl oplogApplier( - nullptr, // executor - nullptr, // oplogBuffer - nullptr, // observer - nullptr, // replCoord + &observer, + ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), - getStorageInterface(), - OplogApplierImpl::ApplyGroupFunc(), + storageInterface, + applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), - nullptr); + writerPool.get()); + // Idempotency tests apply the same batch of oplog entries multiple times in a loop, which would + // result in out-of-order oplog inserts. So we truncate the oplog collection first before + // calling multiApply. + ASSERT_OK( + storageInterface->truncateCollection(_opCtx.get(), NamespaceString::kRsOplogNamespace)); // Apply each operation in a batch of one because 'ops' may contain a mix of commands and CRUD - // operations provided by idempotency tests. + // operations provided by idempotency tests. Applying operations in a batch of one is also + // necessary to work around oplog visibility issues. For example, idempotency tests may contain + // a prepare and a commit that we don't apply both in the same batch in production oplog + // application because the commit needs to read the prepare entry. So we apply each operation in + // its own batch and update oplog visibility after each batch to make sure all previously + // applied entries are visible to subsequent batches. for (auto& op : ops) { - MultiApplier::OperationPtrs opsPtrs; - opsPtrs.push_back(op); - WorkerMultikeyPathInfo pathInfo; - auto status = applyOplogGroup(_opCtx.get(), &opsPtrs, &oplogApplier, &pathInfo); + auto status = oplogApplier.multiApply(_opCtx.get(), {op}); if (!status.isOK()) { - return status; + return status.getStatus(); } + auto lastApplied = status.getValue(); + const bool orderedCommit = true; + // Update oplog visibility by notifying the storage engine of the new oplog entries. + storageInterface->oplogDiskLocRegister( + _opCtx.get(), lastApplied.getTimestamp(), orderedCommit); } return Status::OK(); } diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h index 71967716330..76e060faabc 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h @@ -141,7 +141,6 @@ protected: Status runOpsSteadyState(std::vector<OplogEntry> ops); Status runOpInitialSync(const OplogEntry& entry); Status runOpsInitialSync(std::vector<OplogEntry> ops); - Status runOpPtrsInitialSync(MultiApplier::OperationPtrs ops); UUID kUuid{UUID::gen()}; }; |