diff options
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_impl.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 200 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.h | 31 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test.cpp | 154 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test_fixture.h | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 1 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 110 |
9 files changed, 252 insertions, 287 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 357f5959aa2..33ea1ea0d16 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -143,7 +143,6 @@ std::unique_ptr<OplogApplier> DataReplicatorExternalStateImpl::makeOplogApplier( _replicationCoordinator, consistencyMarkers, storageInterface, - applyOplogGroup, options, writerPool); } diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 899bae5683d..97b9d37797f 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -235,6 +235,13 @@ void addDerivedOps(OperationContext* opCtx, } } +void stableSortByNamespace(MultiApplier::OperationPtrs* oplogEntryPointers) { + auto nssComparator = [](const OplogEntry* l, const OplogEntry* r) { + return l->getNss() < r->getNss(); + }; + std::stable_sort(oplogEntryPointers->begin(), oplogEntryPointers->end(), nssComparator); +} + } // namespace @@ -351,7 +358,6 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, ReplicationCoordinator* replCoord, ReplicationConsistencyMarkers* consistencyMarkers, StorageInterface* storageInterface, - ApplyGroupFunc func, const OplogApplier::Options& options, ThreadPool* writerPool) : OplogApplier(executor, oplogBuffer, observer, options), @@ -359,7 +365,6 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, _writerPool(writerPool), _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers), - _applyFunc(func), _beginApplyingOpTime(options.beginApplyingOpTime) {} void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) { @@ -636,9 +641,8 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, // so it is safe to exclude any writes from Flow Control. opCtx->setShouldParticipateInFlowControl(false); - status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] { - return _applyFunc(opCtx.get(), &writer, this, &multikeyVector); - }); + status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown( + [&] { return applyOplogGroup(opCtx.get(), &writer, &multikeyVector); }); }); } @@ -832,6 +836,90 @@ void OplogApplierImpl::fillWriterVectors( } } +Status OplogApplierImpl::applyOplogGroup(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + + UnreplicatedWritesBlock uwb(opCtx); + DisableDocumentValidation validationDisabler(opCtx); + // Since we swap the locker in stash / unstash transaction resources, + // ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been + // destroyed by unstash in its destructor. Thus we set the flag explicitly. + opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + + // Explicitly start future read transactions without a timestamp. + opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + + // When querying indexes, we return the record matching the key if it exists, or an adjacent + // document. This means that it is possible for us to hit a prepare conflict if we query for an + // incomplete key and an adjacent key is prepared. + // We ignore prepare conflicts on secondaries because they may encounter prepare conflicts that + // did not occur on the primary. + opCtx->recoveryUnit()->setPrepareConflictBehavior( + PrepareConflictBehavior::kIgnoreConflictsAllowWrites); + + stableSortByNamespace(ops); + + const auto oplogApplicationMode = getOptions().mode; + + InsertGroup insertGroup(ops, opCtx, oplogApplicationMode); + + { // Ensure that the MultikeyPathTracker stops tracking paths. + ON_BLOCK_EXIT([opCtx] { MultikeyPathTracker::get(opCtx).stopTrackingMultikeyPathInfo(); }); + MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo(); + + for (auto it = ops->cbegin(); it != ops->cend(); ++it) { + const OplogEntry& entry = **it; + + // If we are successful in grouping and applying inserts, advance the current iterator + // past the end of the inserted group of entries. + auto groupResult = insertGroup.groupAndApplyInserts(it); + if (groupResult.isOK()) { + it = groupResult.getValue(); + continue; + } + + // If we didn't create a group, try to apply the op individually. + try { + const Status status = applyOplogEntryBatch(opCtx, &entry, oplogApplicationMode); + + if (!status.isOK()) { + // Tried to apply an update operation but the document is missing, there must be + // a delete operation for the document later in the oplog. + if (status == ErrorCodes::UpdateOperationFailed && + oplogApplicationMode == OplogApplication::Mode::kInitialSync) { + continue; + } + + severe() << "Error applying operation (" << redact(entry.toBSON()) + << "): " << causedBy(redact(status)); + return status; + } + } catch (const DBException& e) { + // SERVER-24927 If we have a NamespaceNotFound exception, then this document will be + // dropped before initial sync or recovery ends anyways and we should ignore it. + if (e.code() == ErrorCodes::NamespaceNotFound && entry.isCrudOpType() && + getOptions().allowNamespaceNotFoundErrorsOnCrudOps) { + continue; + } + + severe() << "writer worker caught exception: " << redact(e) + << " on: " << redact(entry.toBSON()); + return e.toStatus(); + } + } + } + + invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo()); + invariant(workerMultikeyPathInfo->empty()); + auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo(); + if (!newPaths.empty()) { + workerMultikeyPathInfo->swap(newPaths); + } + + return Status::OK(); +} + Status applyOplogEntryBatch(OperationContext* opCtx, const OplogEntryBatch& batch, OplogApplication::Mode oplogApplicationMode) { @@ -924,108 +1012,6 @@ Status applyOplogEntryBatch(OperationContext* opCtx, MONGO_UNREACHABLE; } -void stableSortByNamespace(MultiApplier::OperationPtrs* oplogEntryPointers) { - if (oplogEntryPointers->size() < 1U) { - return; - } - auto nssComparator = [](const OplogEntry* l, const OplogEntry* r) { - return l->getNss() < r->getNss(); - }; - std::stable_sort(oplogEntryPointers->begin(), oplogEntryPointers->end(), nssComparator); -} - -/** - * This free function is used by the thread pool workers to write ops to the db. - * This consumes the passed in OperationPtrs and callers should not make any assumptions about the - * state of the container after calling. However, this function cannot modify the pointed-to - * operations because the OperationPtrs container contains const pointers. - */ -Status applyOplogGroup(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - OplogApplierImpl* oai, - WorkerMultikeyPathInfo* workerMultikeyPathInfo) { - invariant(oai); - - UnreplicatedWritesBlock uwb(opCtx); - DisableDocumentValidation validationDisabler(opCtx); - // Since we swap the locker in stash / unstash transaction resources, - // ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been - // destroyed by unstash in its destructor. Thus we set the flag explicitly. - opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); - - // Explicitly start future read transactions without a timestamp. - opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); - - // When querying indexes, we return the record matching the key if it exists, or an adjacent - // document. This means that it is possible for us to hit a prepare conflict if we query for an - // incomplete key and an adjacent key is prepared. - // We ignore prepare conflicts on secondaries because they may encounter prepare conflicts that - // did not occur on the primary. - opCtx->recoveryUnit()->setPrepareConflictBehavior( - PrepareConflictBehavior::kIgnoreConflictsAllowWrites); - - stableSortByNamespace(ops); - - const auto oplogApplicationMode = oai->getOptions().mode; - - InsertGroup insertGroup(ops, opCtx, oplogApplicationMode); - - { // Ensure that the MultikeyPathTracker stops tracking paths. - ON_BLOCK_EXIT([opCtx] { MultikeyPathTracker::get(opCtx).stopTrackingMultikeyPathInfo(); }); - MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo(); - - for (auto it = ops->cbegin(); it != ops->cend(); ++it) { - const OplogEntry& entry = **it; - - // If we are successful in grouping and applying inserts, advance the current iterator - // past the end of the inserted group of entries. - auto groupResult = insertGroup.groupAndApplyInserts(it); - if (groupResult.isOK()) { - it = groupResult.getValue(); - continue; - } - - // If we didn't create a group, try to apply the op individually. - try { - const Status status = applyOplogEntryBatch(opCtx, &entry, oplogApplicationMode); - - if (!status.isOK()) { - // Tried to apply an update operation but the document is missing, there must be - // a delete operation for the document later in the oplog. - if (status == ErrorCodes::UpdateOperationFailed && - oplogApplicationMode == OplogApplication::Mode::kInitialSync) { - continue; - } - - severe() << "Error applying operation (" << redact(entry.toBSON()) - << "): " << causedBy(redact(status)); - return status; - } - } catch (const DBException& e) { - // SERVER-24927 If we have a NamespaceNotFound exception, then this document will be - // dropped before initial sync or recovery ends anyways and we should ignore it. - if (e.code() == ErrorCodes::NamespaceNotFound && entry.isCrudOpType() && - oai->getOptions().allowNamespaceNotFoundErrorsOnCrudOps) { - continue; - } - - severe() << "writer worker caught exception: " << redact(e) - << " on: " << redact(entry.toBSON()); - return e.toStatus(); - } - } - } - - invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo()); - invariant(workerMultikeyPathInfo->empty()); - auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo(); - if (!newPaths.empty()) { - workerMultikeyPathInfo->swap(newPaths); - } - - return Status::OK(); -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h index 4d67f022d4d..02790b35103 100644 --- a/src/mongo/db/repl/oplog_applier_impl.h +++ b/src/mongo/db/repl/oplog_applier_impl.h @@ -58,10 +58,6 @@ class OplogApplierImpl : public OplogApplier { OplogApplierImpl& operator=(const OplogApplierImpl&) = delete; public: - using ApplyGroupFunc = std::function<Status(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - OplogApplierImpl* oai, - WorkerMultikeyPathInfo* workerMultikeyPathInfo)>; /** * Constructs this OplogApplier with specific options. * During steady state replication, _run() obtains batches of operations to apply @@ -75,7 +71,6 @@ public: ReplicationCoordinator* replCoord, ReplicationConsistencyMarkers* consistencyMarkers, StorageInterface* storageInterface, - ApplyGroupFunc func, const Options& options, ThreadPool* writerPool); @@ -120,9 +115,6 @@ private: ReplicationConsistencyMarkers* const _consistencyMarkers; - // Function to use during _multiApply - ApplyGroupFunc _applyFunc; - // Used to determine which operations should be applied during initial sync. If this is null, // we will apply all operations that were fetched. OpTime _beginApplyingOpTime = OpTime(); @@ -133,6 +125,19 @@ protected: MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, std::vector<MultiApplier::Operations>* derivedOps) noexcept; + + /** + * This function is used by the thread pool workers to write ops to the db. + * This consumes the passed in OperationPtrs and callers should not make any assumptions about + * the state of the container after calling. However, this function cannot modify the pointed-to + * operations because the OperationPtrs container contains const pointers. + * + * This function has been marked as virtual to allow certain unit tests to skip oplog + * application. + */ + virtual Status applyOplogGroup(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, + WorkerMultikeyPathInfo* workerMultikeyPathInfo); }; /** @@ -142,15 +147,5 @@ Status applyOplogEntryBatch(OperationContext* opCtx, const OplogEntryBatch& batch, OplogApplication::Mode oplogApplicationMode); -/** - * This free function is used by the thread pool workers to write ops to the db. - * This consumes the passed in OperationPtrs and callers should not make any assumptions about the - * state of the container after calling. However, this function cannot modify the pointed-to - * operations because the OperationPtrs container contains const pointers. - */ -Status applyOplogGroup(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - OplogApplierImpl* oai, - WorkerMultikeyPathInfo* workerMultikeyPathInfo); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index f085076bacd..cf171d1d0b8 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -99,27 +99,6 @@ OplogEntry makeOplogEntry(OpTypeEnum opType, NamespaceString nss, OptionalCollec } /** - * Testing-only OplogApplierImpl - */ - -class OplogApplierImplForTest : public OplogApplierImpl { -public: - OplogApplierImplForTest(); -}; - -// Minimal constructor that takes options, the only member accessed in fillWriterVectors. -OplogApplierImplForTest::OplogApplierImplForTest() - : OplogApplierImpl(nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), - nullptr) {} - -/** * Creates collection options suitable for oplog. */ CollectionOptions createOplogCollectionOptions() { @@ -308,17 +287,38 @@ TEST_F(OplogApplierImplTest, applyOplogEntryBatchCommand) { ASSERT_TRUE(applyCmdCalled); } +/** + * Test only subclass of OplogApplierImpl that does not apply oplog entries, but tracks ops. + */ +class TrackOpsAppliedApplier : public OplogApplierImpl { +public: + using OplogApplierImpl::OplogApplierImpl; + + Status applyOplogGroup(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; + MultiApplier::Operations operationsApplied; +}; + +Status TrackOpsAppliedApplier::applyOplogGroup(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, + WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + for (auto&& opPtr : *ops) { + operationsApplied.push_back(*opPtr); + } + return Status::OK(); +} + DEATH_TEST_F(OplogApplierImplTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") { auto writerPool = makeReplWriterPool(); NoopOplogApplierObserver observer; - OplogApplierImpl oplogApplier( + TrackOpsAppliedApplier oplogApplier( nullptr, // executor nullptr, // oplogBuffer &observer, ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - noopApplyOperationFn, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); oplogApplier.multiApply(_opCtx.get(), {}).getStatus().ignore(); @@ -331,37 +331,26 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { auto writerPool = makeReplWriterPool(); - MultiApplier::Operations operationsApplied; - auto applyOperationFn = [&operationsApplied](OperationContext* opCtx, - MultiApplier::OperationPtrs* operationsToApply, - OplogApplierImpl* oai, - WorkerMultikeyPathInfo*) -> Status { - for (auto&& opPtr : *operationsToApply) { - operationsApplied.push_back(*opPtr); - } - return Status::OK(); - }; createCollection(opCtx, nss, options); auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1)); ASSERT_FALSE(op.isForCappedCollection); NoopOplogApplierObserver observer; - OplogApplierImpl oplogApplier( + TrackOpsAppliedApplier oplogApplier( nullptr, // executor nullptr, // oplogBuffer &observer, replCoord, consistencyMarkers, storageInterface, - applyOperationFn, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); auto lastOpTime = unittest::assertGet(oplogApplier.multiApply(opCtx, {op})); ASSERT_EQUALS(op.getOpTime(), lastOpTime); - ASSERT_EQUALS(1U, operationsApplied.size()); - const auto& opApplied = operationsApplied.front(); + ASSERT_EQUALS(1U, oplogApplier.operationsApplied.size()); + const auto& opApplied = oplogApplier.operationsApplied.front(); ASSERT_EQUALS(op, opApplied); // "isForCappedCollection" is not parsed from raw oplog entry document. return opApplied.isForCappedCollection; @@ -397,16 +386,9 @@ TEST_F(OplogApplierImplTest, ApplyGroupUsesApplyOplogEntryBatchToApplyOperation) MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - OplogApplierImpl oplogApplier(nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - OplogApplier::Options(OplogApplication::Mode::kSecondary), - nullptr); - ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + TestApplyOplogGroupApplier oplogApplier( + nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); + ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo)); // Collection should be created after applyOplogEntryBatch() processes operation. ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } @@ -523,7 +505,6 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionSepar ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -586,7 +567,6 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionAllAt ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering), _writerPool.get()); @@ -644,7 +624,6 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionTwoBa ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -765,7 +744,6 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) { ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -877,7 +855,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -936,7 +913,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -986,7 +962,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), _writerPool.get()); // Apply a batch with the insert operations. This should result in the oplog entries @@ -1055,7 +1030,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering), _writerPool.get()); @@ -1109,7 +1083,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); @@ -1150,7 +1123,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -1200,7 +1172,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), _writerPool.get()); @@ -1239,7 +1210,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), _writerPool.get()); @@ -1294,7 +1264,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering), _writerPool.get()); @@ -1330,19 +1299,11 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, void testWorkerMultikeyPaths(OperationContext* opCtx, const OplogEntry& op, unsigned long numPaths) { - - OplogApplierImpl oplogApplier(nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - OplogApplier::Options(OplogApplication::Mode::kSecondary), - nullptr); + TestApplyOplogGroupApplier oplogApplier( + nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&op}; - ASSERT_OK(applyOplogGroup(opCtx, &ops, &oplogApplier, &pathInfo)); + ASSERT_OK(oplogApplier.applyOplogGroup(opCtx, &ops, &pathInfo)); ASSERT_EQ(pathInfo.size(), numPaths); } @@ -1396,18 +1357,11 @@ TEST_F(OplogApplierImplTest, ApplyGroupAddsMultipleWorkerMultikeyPathInfo) { auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7)); auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB); - OplogApplierImpl oplogApplier(nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - OplogApplier::Options(OplogApplication::Mode::kSecondary), - nullptr); + TestApplyOplogGroupApplier oplogApplier( + nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&opA, &opB}; - ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo)); ASSERT_EQ(pathInfo.size(), 2UL); } } @@ -1448,18 +1402,11 @@ TEST_F(OplogApplierImplTest, ApplyGroupFailsWhenCollectionCreationTriesToMakeUUI auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - OplogApplierImpl oplogApplier(nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - nullptr, - OplogApplier::Options(OplogApplication::Mode::kSecondary), - nullptr); + TestApplyOplogGroupApplier oplogApplier( + nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); MultiApplier::OperationPtrs ops = {&op}; ASSERT_EQUALS(ErrorCodes::InvalidOptions, - applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, nullptr)); + oplogApplier.applyOplogGroup(_opCtx.get(), &ops, nullptr)); } TEST_F(OplogApplierImplTest, ApplyGroupDisablesDocumentValidationWhileApplyingOperations) { @@ -1821,7 +1768,8 @@ TEST_F(OplogApplierImplTest, } TEST_F(OplogApplierImplTest, ApplyGroupIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) { - OplogApplierImplForTest oplogApplier; + TestApplyOplogGroupApplier oplogApplier( + nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kInitialSync)); NamespaceString nss("test.t"); { Lock::GlobalWrite globalLock(_opCtx.get()); @@ -1835,7 +1783,7 @@ TEST_F(OplogApplierImplTest, ApplyGroupIgnoresUpdateOperationIfDocumentIsMissing {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo)); // Since the document was missing when we cloned data from the sync source, the collection // referenced by the failed operation should not be automatically created. @@ -1844,7 +1792,8 @@ TEST_F(OplogApplierImplTest, ApplyGroupIgnoresUpdateOperationIfDocumentIsMissing TEST_F(OplogApplierImplTest, ApplyGroupSkipsDocumentOnNamespaceNotFoundDuringInitialSync) { BSONObj emptyDoc; - OplogApplierImplForTest oplogApplier; + TestApplyOplogGroupApplier oplogApplier( + nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kInitialSync)); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad"); auto doc1 = BSON("_id" << 1); @@ -1856,7 +1805,7 @@ TEST_F(OplogApplierImplTest, ApplyGroupSkipsDocumentOnNamespaceNotFoundDuringIni auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo)); CollectionReader collectionReader(_opCtx.get(), nss); ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); @@ -1866,7 +1815,8 @@ TEST_F(OplogApplierImplTest, ApplyGroupSkipsDocumentOnNamespaceNotFoundDuringIni TEST_F(OplogApplierImplTest, ApplyGroupSkipsIndexCreationOnNamespaceNotFoundDuringInitialSync) { BSONObj emptyDoc; - OplogApplierImplForTest oplogApplier; + TestApplyOplogGroupApplier oplogApplier( + nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kInitialSync)); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad"); auto doc1 = BSON("_id" << 1); @@ -1880,7 +1830,7 @@ TEST_F(OplogApplierImplTest, ApplyGroupSkipsIndexCreationOnNamespaceNotFoundDuri auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo)); + ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo)); CollectionReader collectionReader(_opCtx.get(), nss); ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); @@ -2478,7 +2428,6 @@ TEST_F(OplogApplierImplTxnTableTest, SimpleWriteWithTxn) { ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -2519,7 +2468,6 @@ TEST_F(OplogApplierImplTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -2574,7 +2522,6 @@ TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDelet ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -2616,7 +2563,6 @@ TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdat ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -2681,7 +2627,6 @@ TEST_F(OplogApplierImplTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnS ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -2751,7 +2696,6 @@ TEST_F(OplogApplierImplTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnS ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -2824,7 +2768,6 @@ TEST_F(OplogApplierImplTxnTableTest, MultiApplyUpdatesTheTransactionTable) { ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -2908,7 +2851,6 @@ TEST_F(OplogApplierImplTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnT ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -2941,7 +2883,6 @@ TEST_F(OplogApplierImplTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); @@ -2976,7 +2917,6 @@ TEST_F(OplogApplierImplTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTabl ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), getStorageInterface(), - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp index dee065f19b6..bf21ae4ad84 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp @@ -204,22 +204,16 @@ Status OplogApplierImplTest::runOpSteadyState(const OplogEntry& op) { } Status OplogApplierImplTest::runOpsSteadyState(std::vector<OplogEntry> ops) { - OplogApplierImpl oplogApplier( - nullptr, // executor - nullptr, // oplogBuffer - nullptr, // observer - nullptr, // replCoord + TestApplyOplogGroupApplier oplogApplier( getConsistencyMarkers(), getStorageInterface(), - OplogApplierImpl::ApplyGroupFunc(), - repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), - nullptr); + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary)); MultiApplier::OperationPtrs opsPtrs; for (auto& op : ops) { opsPtrs.push_back(&op); } WorkerMultikeyPathInfo pathInfo; - return applyOplogGroup(_opCtx.get(), &opsPtrs, &oplogApplier, &pathInfo); + return oplogApplier.applyOplogGroup(_opCtx.get(), &opsPtrs, &pathInfo); } Status OplogApplierImplTest::runOpInitialSync(const OplogEntry& op) { @@ -237,7 +231,6 @@ Status OplogApplierImplTest::runOpsInitialSync(std::vector<OplogEntry> ops) { ReplicationCoordinator::get(_opCtx.get()), getConsistencyMarkers(), storageInterface, - applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), writerPool.get()); // Idempotency tests apply the same batch of oplog entries multiple times in a loop, which would diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h index 76e060faabc..2d632351b62 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h @@ -45,6 +45,26 @@ class BSONObj; class OperationContext; namespace repl { + +/** + * Test only subclass of OplogApplierImpl that makes applyOplogGroup a public method. + */ +class TestApplyOplogGroupApplier : public OplogApplierImpl { +public: + TestApplyOplogGroupApplier(ReplicationConsistencyMarkers* consistencyMarkers, + StorageInterface* storageInterface, + const OplogApplier::Options& options) + : OplogApplierImpl(nullptr, + nullptr, + nullptr, + nullptr, + consistencyMarkers, + storageInterface, + options, + nullptr) {} + using OplogApplierImpl::applyOplogGroup; +}; + /** * OpObserver for OplogApplierImpl test fixture. */ @@ -116,14 +136,6 @@ protected: ServiceContext* serviceContext; OplogApplierImplOpObserver* _opObserver = nullptr; - // Implements the OplogApplierImpl::ApplyGroupFn interface and does nothing. - static Status noopApplyOperationFn(OperationContext*, - MultiApplier::OperationPtrs*, - OplogApplierImpl* oai, - WorkerMultikeyPathInfo*) { - return Status::OK(); - } - OpTime nextOpTime() { static long long lastSecond = 1; return OpTime(Timestamp(Seconds(lastSecond++), 0), 1LL); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 1c7603ca075..5be36ed3881 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -223,7 +223,6 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( replCoord, _replicationProcess->getConsistencyMarkers(), _storageInterface, - applyOplogGroup, OplogApplier::Options(OplogApplication::Mode::kSecondary), _writerPool.get()); diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index a38c75a3c6d..95f05e9d52c 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -363,7 +363,6 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, ReplicationCoordinator::get(opCtx), _consistencyMarkers, _storageInterface, - applyOplogGroup, OplogApplier::Options(OplogApplication::Mode::kRecovering), writerPool.get()); diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 7981f2abdf5..d0923437470 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -59,6 +59,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/oplog_applier_impl.h" +#include "mongo/db/repl/oplog_applier_impl_test_fixture.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/repl_client_info.h" @@ -1312,7 +1313,6 @@ public: _coordinatorMock, _consistencyMarkers, storageInterface, - repl::applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops))); @@ -1397,7 +1397,6 @@ public: _coordinatorMock, _consistencyMarkers, storageInterface, - repl::applyOplogGroup, repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync), writerPool.get()); auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)); @@ -2460,6 +2459,76 @@ public: } }; +/** + * Test specific OplogApplierImpl subclass that allows for custom applyOplogGroup to be run during + * multiApply. + */ +class SecondaryReadsDuringBatchApplicationAreAllowedApplier : public repl::OplogApplierImpl { +public: + SecondaryReadsDuringBatchApplicationAreAllowedApplier( + executor::TaskExecutor* executor, + repl::OplogBuffer* oplogBuffer, + Observer* observer, + repl::ReplicationCoordinator* replCoord, + repl::ReplicationConsistencyMarkers* consistencyMarkers, + repl::StorageInterface* storageInterface, + const OplogApplier::Options& options, + ThreadPool* writerPool, + OperationContext* opCtx, + Promise<bool>* promise, + stdx::future<bool>* taskFuture) + : repl::OplogApplierImpl(executor, + oplogBuffer, + observer, + replCoord, + consistencyMarkers, + storageInterface, + options, + writerPool), + _testOpCtx(opCtx), + _promise(promise), + _taskFuture(taskFuture) {} + + Status applyOplogGroup(OperationContext* opCtx, + repl::MultiApplier::OperationPtrs* operationsToApply, + WorkerMultikeyPathInfo* pathInfo) override; + +private: + // Pointer to the test's op context. This is distinct from the op context used in + // applyOplogGroup. + OperationContext* _testOpCtx; + Promise<bool>* _promise; + stdx::future<bool>* _taskFuture; +}; + + +// This apply operation function will block until the reader has tried acquiring a collection lock. +// This returns BadValue statuses instead of asserting so that the worker threads can cleanly exit +// and this test case fails without crashing the entire suite. +Status SecondaryReadsDuringBatchApplicationAreAllowedApplier::applyOplogGroup( + OperationContext* opCtx, + repl::MultiApplier::OperationPtrs* operationsToApply, + WorkerMultikeyPathInfo* pathInfo) { + if (!_testOpCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode, MODE_X)) { + return {ErrorCodes::BadValue, "Batch applied was not holding PBWM lock in MODE_X"}; + } + + // Insert the document. A reader without a PBWM lock should not see it yet. + auto status = OplogApplierImpl::applyOplogGroup(opCtx, operationsToApply, pathInfo); + if (!status.isOK()) { + return status; + } + + // Signals the reader to acquire a collection read lock. + _promise->emplaceValue(true); + + // Block while holding the PBWM lock until the reader is done. + if (!_taskFuture->get()) { + return {ErrorCodes::BadValue, "Client was holding PBWM lock in MODE_IS"}; + } + return Status::OK(); +} + class SecondaryReadsDuringBatchApplicationAreAllowed : public StorageTimestampTest { public: void run() { @@ -2498,55 +2567,28 @@ public: taskThread.join(); }); - // This apply operation function will block until the reader has tried acquiring a - // collection lock. This returns BadValue statuses instead of asserting so that the worker - // threads can cleanly exit and this test case fails without crashing the entire suite. - auto applyOperationFn = [&](OperationContext* opCtx, - std::vector<const repl::OplogEntry*>* operationsToApply, - repl::OplogApplierImpl* oa, - std::vector<MultikeyPathInfo>* pathInfo) -> Status { - if (!_opCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode, - MODE_X)) { - return {ErrorCodes::BadValue, "Batch applied was not holding PBWM lock in MODE_X"}; - } - - // Insert the document. A reader without a PBWM lock should not see it yet. - auto status = repl::applyOplogGroup(opCtx, operationsToApply, oa, pathInfo); - if (!status.isOK()) { - return status; - } - - // Signals the reader to acquire a collection read lock. - batchInProgress.promise.emplaceValue(true); - - // Block while holding the PBWM lock until the reader is done. - if (!taskFuture.get()) { - return {ErrorCodes::BadValue, "Client was holding PBWM lock in MODE_IS"}; - } - return Status::OK(); - }; - // Make a simple insert operation. BSONObj doc0 = BSON("_id" << 0 << "a" << 0); auto insertOp = repl::OplogEntry(BSON("ts" << futureTs << "t" << 1LL << "v" << 2 << "op" << "i" << "ns" << ns.ns() << "ui" << uuid << "wall" << Date_t() << "o" << doc0)); - DoNothingOplogApplierObserver observer; // Apply the operation. auto storageInterface = repl::StorageInterface::get(_opCtx); auto writerPool = repl::makeReplWriterPool(1); - repl::OplogApplierImpl oplogApplier( + SecondaryReadsDuringBatchApplicationAreAllowedApplier oplogApplier( nullptr, // task executor. not required for multiApply(). nullptr, // oplog buffer. not required for multiApply(). &observer, _coordinatorMock, _consistencyMarkers, storageInterface, - applyOperationFn, repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), - writerPool.get()); + writerPool.get(), + _opCtx, + &(batchInProgress.promise), + &taskFuture); auto lastOpTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, {insertOp})); ASSERT_EQ(insertOp.getOpTime(), lastOpTime); |