diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_mock.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 23 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_test.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 45 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 281 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.cpp | 8 | ||||
-rw-r--r-- | src/mongo/dbtests/repltests.cpp | 9 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 18 |
16 files changed, 273 insertions, 199 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index cbee6637c67..ae615085319 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -49,16 +49,17 @@ public: OplogBuffer* oplogBuffer, Observer* observer, DataReplicatorExternalStateMock* externalState) - : OplogApplier(executor, oplogBuffer, observer), + : OplogApplier(executor, + oplogBuffer, + observer, + OplogApplier::Options(OplogApplication::Mode::kSecondary)), _observer(observer), _externalState(externalState) {} private: void _run(OplogBuffer* oplogBuffer) final {} void _shutdown() final {} - StatusWith<OpTime> _multiApply(OperationContext* opCtx, - Operations ops, - boost::optional<repl::OplogApplication::Mode> mode) final { + StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final { return _externalState->multiApplyFn(opCtx, ops, _observer); } diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 06b4394509f..c6221493aa2 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -389,8 +389,7 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence std::vector<MultiApplier::OperationPtrs> writerVectors(1); std::vector<MultiApplier::Operations> derivedOps; // Derive ops for transactions if necessary. - syncTail.fillWriterVectors( - _opCtx.get(), &ops, &writerVectors, &derivedOps, OplogApplication::Mode::kInitialSync); + syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps); const auto& opPtrs = writerVectors[0]; ASSERT_OK(runOpPtrsInitialSync(opPtrs)); diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index e35dd64463e..4a8a488999e 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -924,7 +924,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> // Create oplog applier. auto consistencyMarkers = _replicationProcess->getConsistencyMarkers(); - OplogApplier::Options options; + OplogApplier::Options options(OplogApplication::Mode::kInitialSync); options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.missingDocumentSourceForInitialSync = _syncSource; options.beginApplyingOpTime = lastOpTime; @@ -1219,8 +1219,7 @@ void InitialSyncer::_getNextApplierBatchCallback( _fetchCount.store(0); MultiApplier::MultiApplyFn applyBatchOfOperationsFn = [this](OperationContext* opCtx, MultiApplier::Operations ops) { - return _oplogApplier->multiApply( - opCtx, std::move(ops), repl::OplogApplication::Mode::kInitialSync); + return _oplogApplier->multiApply(opCtx, std::move(ops)); }; OpTime lastApplied = ops.back().getOpTime(); invariant(ops.back().getWallClockTime()); diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 24a484f1362..711bce604eb 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -82,8 +82,9 @@ std::size_t OplogApplier::calculateBatchLimitBytes(OperationContext* opCtx, OplogApplier::OplogApplier(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, - Observer* observer) - : _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer) {} + Observer* observer, + const OplogApplier::Options& options) + : _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer), _options(options) {} OplogBuffer* OplogApplier::getBuffer() const { return _oplogBuffer; @@ -289,11 +290,9 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch( return std::move(ops); } -StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, - Operations ops, - boost::optional<repl::OplogApplication::Mode> mode) { +StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, Operations ops) { _observer->onBatchBegin(ops); - auto lastApplied = _multiApply(opCtx, std::move(ops), mode); + auto lastApplied = _multiApply(opCtx, std::move(ops)); _observer->onBatchEnd(lastApplied, {}); return lastApplied; } diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index 9650036b496..e9a7daf26fc 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -35,6 +35,7 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/storage_interface.h" @@ -62,6 +63,10 @@ public: **/ class Options { public: + Options() = delete; + explicit Options(OplogApplication::Mode inputMode) : mode(inputMode) {} + + // TODO (SERVER-42039): Remove fields here that are redundant with the mode. bool allowNamespaceNotFoundErrorsOnCrudOps = false; bool relaxUniqueIndexConstraints = false; bool skipWritesToOplog = false; @@ -77,6 +82,8 @@ public: // For replication recovery only. During replication rollback, this is used to keep track // of the stable timestamp from which we replay the oplog. boost::optional<Timestamp> stableTimestampForRecovery; + + const OplogApplication::Mode mode; }; /** @@ -131,7 +138,10 @@ public: * Obtains batches of operations from the OplogBuffer to apply. * Reports oplog application progress using the Observer. */ - OplogApplier(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, Observer* observer); + OplogApplier(executor::TaskExecutor* executor, + OplogBuffer* oplogBuffer, + Observer* observer, + const Options& options); virtual ~OplogApplier() = default; @@ -195,9 +205,7 @@ public: * * TODO: remove when enqueue() is implemented. */ - StatusWith<OpTime> multiApply(OperationContext* opCtx, - Operations ops, - boost::optional<repl::OplogApplication::Mode> mode); + StatusWith<OpTime> multiApply(OperationContext* opCtx, Operations ops); private: /** @@ -223,9 +231,7 @@ private: * Called from multiApply() to apply a batch of operations in parallel. * Implemented in subclasses but not visible otherwise. */ - virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx, - Operations ops, - boost::optional<repl::OplogApplication::Mode> mode) = 0; + virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) = 0; // Used to schedule task for oplog application loop. // Not owned by us. @@ -242,6 +248,9 @@ private: // Set to true if shutdown() has been called. bool _inShutdown = false; + + // Configures this OplogApplier. + const Options _options; }; /** diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 9b3126cb781..d3133d19dcb 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -42,7 +42,7 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor, StorageInterface* storageInterface, const OplogApplier::Options& options, ThreadPool* writerPool) - : OplogApplier(executor, oplogBuffer, observer), + : OplogApplier(executor, oplogBuffer, observer, options), _replCoord(replCoord), _syncTail( observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options), @@ -61,9 +61,8 @@ void OplogApplierImpl::_shutdown() { _syncTail.shutdown(); } -StatusWith<OpTime> OplogApplierImpl::_multiApply( - OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) { - return _syncTail.multiApply(opCtx, std::move(ops), mode); +StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, Operations ops) { + return _syncTail.multiApply(opCtx, std::move(ops)); } } // namespace repl diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h index 741cfd4dcd3..d6e0e523220 100644 --- a/src/mongo/db/repl/oplog_applier_impl.h +++ b/src/mongo/db/repl/oplog_applier_impl.h @@ -67,9 +67,7 @@ private: void _shutdown() override; - StatusWith<OpTime> _multiApply(OperationContext* opCtx, - Operations ops, - boost::optional<repl::OplogApplication::Mode> mode) override; + StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) override; // Not owned by us. ReplicationCoordinator* const _replCoord; diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp index 05040963774..5b086263114 100644 --- a/src/mongo/db/repl/oplog_applier_test.cpp +++ b/src/mongo/db/repl/oplog_applier_test.cpp @@ -56,20 +56,20 @@ public: void _run(OplogBuffer* oplogBuffer) final; void _shutdown() final; - StatusWith<OpTime> _multiApply(OperationContext* opCtx, - Operations ops, - boost::optional<repl::OplogApplication::Mode> mode) final; + StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final; }; OplogApplierMock::OplogApplierMock(OplogBuffer* oplogBuffer) - : OplogApplier(nullptr, oplogBuffer, nullptr) {} + : OplogApplier(nullptr, + oplogBuffer, + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary)) {} void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {} void OplogApplierMock::_shutdown() {} -StatusWith<OpTime> OplogApplierMock::_multiApply( - OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) { +StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) { return OpTime(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c92bcf8792c..f9cf6b956f0 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -226,14 +226,15 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( // interface. During steady state replication, there is no need to log details on every batch // we apply (recovery); or track missing documents that are fetched from the sync source // (initial sync). - _oplogApplier = std::make_unique<OplogApplierImpl>(_oplogApplierTaskExecutor.get(), - _oplogBuffer.get(), - &noopOplogApplierObserver, - replCoord, - _replicationProcess->getConsistencyMarkers(), - _storageInterface, - OplogApplier::Options(), - _writerPool.get()); + _oplogApplier = std::make_unique<OplogApplierImpl>( + _oplogApplierTaskExecutor.get(), + _oplogBuffer.get(), + &noopOplogApplierObserver, + replCoord, + _replicationProcess->getConsistencyMarkers(), + _storageInterface, + OplogApplier::Options(OplogApplication::Mode::kSecondary), + _writerPool.get()); invariant(!_bgSync); _bgSync = diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index e34606fe92b..9f3171d7b23 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -365,7 +365,7 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, RecoveryOplogApplierStats stats; auto writerPool = OplogApplier::makeWriterPool(); - OplogApplier::Options options; + OplogApplier::Options options(OplogApplication::Mode::kRecovering); options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.skipWritesToOplog = true; // During replication recovery, the stableTimestampForRecovery refers to the stable timestamp @@ -394,8 +394,7 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, OplogApplier::Operations batch; while ( !(batch = fassert(50763, oplogApplier.getNextApplierBatch(opCtx, batchLimits))).empty()) { - applyThroughOpTime = uassertStatusOK( - oplogApplier.multiApply(opCtx, std::move(batch), OplogApplication::Mode::kRecovering)); + applyThroughOpTime = uassertStatusOK(oplogApplier.multiApply(opCtx, std::move(batch))); } stats.complete(applyThroughOpTime); invariant(oplogBuffer.isEmpty(), diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index e7539892dad..7431107d29d 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -387,13 +387,6 @@ SyncTail::SyncTail(OplogApplier::Observer* observer, _writerPool(writerPool), _options(options) {} -SyncTail::SyncTail(OplogApplier::Observer* observer, - ReplicationConsistencyMarkers* consistencyMarkers, - StorageInterface* storageInterface, - MultiSyncApplyFunc func, - ThreadPool* writerPool) - : SyncTail(observer, consistencyMarkers, storageInterface, func, writerPool, {}) {} - SyncTail::~SyncTail() {} const OplogApplier::Options& SyncTail::getOptions() const { @@ -796,7 +789,7 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord, // Apply the operations in this batch. 'multiApply' returns the optime of the last op that // was applied, which should be the last optime in the batch. auto lastOpTimeAppliedInBatch = - fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch(), boost::none)); + fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch())); invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch); // In order to provide resilience in the event of a crash in the middle of batch @@ -1038,13 +1031,7 @@ Status multiSyncApply(OperationContext* opCtx, ApplierHelpers::stableSortByNamespace(ops); - // Assume we are recovering if oplog writes are disabled in the options. - // Assume we are in initial sync if we have a host for fetching missing documents. - const auto oplogApplicationMode = st->getOptions().skipWritesToOplog - ? OplogApplication::Mode::kRecovering - : (st->getOptions().missingDocumentSourceForInitialSync - ? OplogApplication::Mode::kInitialSync - : OplogApplication::Mode::kSecondary); + const auto oplogApplicationMode = st->getOptions().mode; ApplierHelpers::InsertGroup insertGroup(ops, opCtx, oplogApplicationMode); @@ -1123,8 +1110,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker, - boost::optional<repl::OplogApplication::Mode> mode) { + SessionUpdateTracker* sessionUpdateTracker) { const auto serviceContext = opCtx->getServiceContext(); const auto storageEngine = serviceContext->getStorageEngine(); @@ -1152,8 +1138,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, if (sessionUpdateTracker) { if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) { derivedOps->emplace_back(std::move(*newOplogWrites)); - _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); } } @@ -1221,7 +1206,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, } // Transaction entries cannot have different session updates. _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); } else { // The applyOps entry was not generated as part of a transaction. invariant(!op.getPrevWriteOpTimeInTransaction()); @@ -1229,7 +1214,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // Nested entries cannot have different session updates. _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); } } catch (...) { fassertFailedWithStatusNoTrace( @@ -1244,7 +1229,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // If we see a commitTransaction command that is a part of a prepared transaction during // initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers // with the extracted operations. - if (isPreparedCommit(op) && (mode == OplogApplication::Mode::kInitialSync)) { + if (isPreparedCommit(op) && (_options.mode == OplogApplication::Mode::kInitialSync)) { auto logicalSessionId = op.getSessionId(); auto& partialTxnList = partialTxnOps[*logicalSessionId]; @@ -1262,8 +1247,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp())); } - _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); continue; } @@ -1278,15 +1262,14 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, void SyncTail::fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps, - boost::optional<repl::OplogApplication::Mode> mode) { + std::vector<MultiApplier::Operations>* derivedOps) { SessionUpdateTracker sessionUpdateTracker; - _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker, mode); + _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker); auto newOplogWrites = sessionUpdateTracker.flushAll(); if (!newOplogWrites.empty()) { derivedOps->emplace_back(std::move(newOplogWrites)); - _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); } } @@ -1318,9 +1301,7 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors } } -StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, - MultiApplier::Operations ops, - boost::optional<repl::OplogApplication::Mode> mode) { +StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { invariant(!ops.empty()); LOG(2) << "replication batch size is " << ops.size(); @@ -1364,7 +1345,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, std::vector<MultiApplier::Operations> derivedOps; std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads); - fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps, mode); + fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); // Wait for writes to finish before applying ops. _writerPool->waitForIdle(); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 5f9961c8b22..2bbdfbd71c8 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -97,11 +97,6 @@ public: MultiSyncApplyFunc func, ThreadPool* writerPool, const OplogApplier::Options& options); - SyncTail(OplogApplier::Observer* observer, - ReplicationConsistencyMarkers* consistencyMarkers, - StorageInterface* storageInterface, - MultiSyncApplyFunc func, - ThreadPool* writerPool); virtual ~SyncTail(); /** @@ -228,15 +223,12 @@ public: * to at least the last optime of the batch. If 'minValid' is already greater than or equal * to the last optime of this batch, it will not be updated. */ - StatusWith<OpTime> multiApply(OperationContext* opCtx, - MultiApplier::Operations ops, - boost::optional<repl::OplogApplication::Mode> mode); + StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops); void fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps, - boost::optional<repl::OplogApplication::Mode> mode); + std::vector<MultiApplier::Operations>* derivedOps); private: class OpQueueBatcher; @@ -247,8 +239,7 @@ private: MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker, - boost::optional<repl::OplogApplication::Mode> mode); + SessionUpdateTracker* sessionUpdateTracker); /** * Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 35b07187677..95c981f6ead 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -406,8 +406,9 @@ DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty getConsistencyMarkers(), getStorageInterface(), noopApplyOperationFn, - writerPool.get()); - syncTail.multiApply(_opCtx.get(), {}, boost::none).getStatus().ignore(); + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); + syncTail.multiApply(_opCtx.get(), {}).getStatus().ignore(); } bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, @@ -431,9 +432,13 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1)); ASSERT_FALSE(op.isForCappedCollection); - SyncTail syncTail( - nullptr, consistencyMarkers, storageInterface, applyOperationFn, writerPool.get()); - auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op}, boost::none)); + SyncTail syncTail(nullptr, + consistencyMarkers, + storageInterface, + applyOperationFn, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); + auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op})); ASSERT_EQUALS(op.getOpTime(), lastOpTime); ASSERT_EQUALS(1U, operationsApplied.size()); @@ -467,7 +472,12 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr); + SyncTail syncTail(nullptr, + nullptr, + nullptr, + {}, + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary)); ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); // Collection should be created after SyncTail::syncApply() processes operation. ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); @@ -590,14 +600,18 @@ private: }; TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + _writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); // Apply a batch with only the first operation. This should result in the first oplog entry // being put in the oplog and updating the transaction table, but not actually being applied // because they are part of a pending transaction. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1})); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -612,7 +626,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { // Apply a batch with only the second operation. This should result in the second oplog entry // being put in the oplog, but with no effect because the operation is part of a pending // transaction. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2})); ASSERT_EQ(2U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -628,7 +642,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the two previous entries being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp})); ASSERT_EQ(3U, oplogDocs().size()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -644,7 +658,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) { // Skipping writes to oplog proves we're testing the code path which does not rely on reading // the oplog. - OplogApplier::Options applierOpts; + OplogApplier::Options applierOpts(OplogApplication::Mode::kSecondary); applierOpts.skipWritesToOplog = true; SyncTail syncTail(nullptr, getConsistencyMarkers(), @@ -655,8 +669,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) { // Apply both inserts and the commit in a single batch. We expect no oplog entries to // be inserted (because we've set skipWritesToOplog), and both entries to be committed. - ASSERT_OK( - syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp})); ASSERT_EQ(0U, oplogDocs().size()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -704,13 +717,17 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { StmtId(4), insertOps.back().getOpTime()); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + _writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); // Insert the first entry in its own batch. This should result in the oplog entry being written // but the entry should not be applied as it is part of a pending transaction. const auto expectedStartOpTime = insertOps[0].getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]})); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_EQ(0U, _insertedDocs[_nss1].size()); ASSERT_EQ(0U, _insertedDocs[_nss2].size()); @@ -723,8 +740,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { // Insert the rest of the entries, including the commit. These entries should be added to the // oplog, and all the entries including the first should be applied. - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}, boost::none)); + ASSERT_OK( + syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp})); ASSERT_EQ(5U, oplogDocs().size()); ASSERT_EQ(3U, _insertedDocs[_nss1].size()); ASSERT_EQ(1U, _insertedDocs[_nss2].size()); @@ -836,8 +853,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) { StmtId(2), insertOps2.back().getOpTime()); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + _writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); // Note the insert counter so we can check it later. It is necessary to use opCounters as // inserts are idempotent so we will not detect duplicate inserts just by checking inserts in @@ -847,8 +868,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) { // once. ASSERT_OK(syncTail.multiApply( _opCtx.get(), - {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2}, - boost::none)); + {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2})); ASSERT_EQ(6U, oplogDocs().size()); ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore); ASSERT_EQ(4U, _insertedDocs[_nss1].size()); @@ -950,14 +970,18 @@ private: }; TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionSteadyState) { - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + _writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); // Apply a batch with the insert operations. This should result in the oplog entries // being put in the oplog and updating the transaction table, but not actually being applied // because they are part of a pending transaction. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); ASSERT_EQ(2U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); @@ -973,7 +997,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea // Apply a batch with only the prepare. This should result in the prepare being put in the // oplog, and the two previous entries being applied (but in a transaction) along with the // nested insert in the prepare oplog entry. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); ASSERT_EQ(3U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); @@ -987,7 +1011,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the three previous entries being committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -1000,14 +1024,18 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea } TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactionCheckTxnTable) { - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + _writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); // Apply a batch with the insert operations. This should result in the oplog entries // being put in the oplog and updating the transaction table, but not actually being applied // because they are part of a pending transaction. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); checkTxnTable(_lsid, _txnNum, _insertOp1->getOpTime(), @@ -1018,7 +1046,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio // Apply a batch with only the prepare. This should result in the prepare being put in the // oplog, and the two previous entries being applied (but in a transaction) along with the // nested insert in the prepare oplog entry. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); checkTxnTable(_lsid, _txnNum, _prepareWithPrevOp->getOpTime(), @@ -1028,7 +1056,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio // Apply a batch with only the abort. This should result in the abort being put in the // oplog and the transaction table being updated accordingly. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp})); ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -1052,8 +1080,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit // being put in the oplog and updating the transaction table, but not actually being applied // because they are part of a pending transaction. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), {*_insertOp1, *_insertOp2}, OplogApplication::Mode::kInitialSync)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); ASSERT_EQ(2U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); @@ -1068,8 +1095,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, but, since this is initial sync, nothing else. - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), {*_prepareWithPrevOp}, OplogApplication::Mode::kInitialSync)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); ASSERT_EQ(3U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -1083,8 +1109,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the three previous entries being applied. - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), {*_commitPrepareWithPrevOp}, OplogApplication::Mode::kInitialSync)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -1119,7 +1144,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco // Apply a batch with the insert operations. This should have no effect, because this is // recovery. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); ASSERT_TRUE(oplogDocs().empty()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1132,7 +1157,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco // Apply a batch with only the prepare applyOps. This should have no effect, since this is // recovery. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); ASSERT_TRUE(oplogDocs().empty()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1145,7 +1170,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco // Apply a batch with only the commit. This should result in the the three previous entries // being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); ASSERT_TRUE(oplogDocs().empty()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -1158,14 +1183,18 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco } TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedTransaction) { - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + _writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, and the nested insert being applied (but in a transaction). - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); @@ -1178,7 +1207,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and prepared insert being committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1191,8 +1220,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT } TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTransaction) { - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + _writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); auto emptyPrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId( {Timestamp(Seconds(1), 3), 1LL}, @@ -1206,7 +1239,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, and the nested insert being applied (but in a transaction). - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp})); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -1219,7 +1252,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and prepared insert being committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1232,13 +1265,17 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr } TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPreparedTransaction) { - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + _writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, and the nested insert being applied (but in a transaction). - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); checkTxnTable(_lsid, _txnNum, _singlePrepareApplyOp->getOpTime(), @@ -1248,7 +1285,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep // Apply a batch with only the abort. This should result in the abort being put in the // oplog and the transaction table being updated accordingly. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp})); ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1273,8 +1310,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, but, since this is initial sync, nothing else. - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), {*_singlePrepareApplyOp}, OplogApplication::Mode::kInitialSync)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -1288,8 +1324,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the previous entry being applied. - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), {*_commitSinglePrepareApplyOp}, OplogApplication::Mode::kInitialSync)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1325,7 +1360,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // Apply a batch with only the prepare applyOps. This should have no effect, since this is // recovery. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); ASSERT_TRUE(oplogDocs().empty()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1338,7 +1373,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // Apply a batch with only the commit. This should result in the previous entry being // applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); ASSERT_TRUE(oplogDocs().empty()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1353,7 +1388,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, void testWorkerMultikeyPaths(OperationContext* opCtx, const OplogEntry& op, unsigned long numPaths) { - SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr); + SyncTail syncTail(nullptr, + nullptr, + nullptr, + {}, + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary)); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&op}; ASSERT_OK(multiSyncApply(opCtx, &ops, &syncTail, &pathInfo)); @@ -1409,7 +1449,12 @@ TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) { auto opA = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, docA); auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7)); auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB); - SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr); + SyncTail syncTail(nullptr, + nullptr, + nullptr, + {}, + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary)); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&opA, &opB}; ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); @@ -1452,7 +1497,12 @@ TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName()); auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); - SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr); + SyncTail syncTail(nullptr, + nullptr, + nullptr, + {}, + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary)); MultiApplier::OperationPtrs ops = {&op}; ASSERT_EQUALS(ErrorCodes::InvalidOptions, multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr)); @@ -1936,7 +1986,7 @@ DEATH_TEST_F(SyncTailTest, return Status::OK(); }; auto writerPool = OplogApplier::makeWriterPool(); - OplogApplier::Options options; + OplogApplier::Options options(OplogApplication::Mode::kSecondary); SyncTail syncTail(nullptr, // observer. not required by oplogApplication(). _consistencyMarkers.get(), getStorageInterface(), @@ -2555,9 +2605,13 @@ TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) { date); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}, boost::none)); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp})); checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date); } @@ -2586,9 +2640,13 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) { Date_t::now()); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}, boost::none)); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp})); ASSERT_FALSE(docExists( _opCtx.get(), @@ -2630,9 +2688,13 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTa date); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}, boost::none)); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2})); checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date); } @@ -2662,9 +2724,13 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTa Date_t::now()); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}, boost::none)); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp})); checkTxnTable(sessionInfo, newWriteOpTime, date); } @@ -2722,11 +2788,14 @@ TEST_F(SyncTailTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnSameSessi txnInsertOpTime); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp})); repl::checkTxnTable(_opCtx.get(), *sessionInfo.getSessionId(), @@ -2789,11 +2858,14 @@ TEST_F(SyncTailTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnSameSessi date); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); - ASSERT_OK(syncTail.multiApply( - _opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}, boost::none)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp})); repl::checkTxnTable(_opCtx.get(), *sessionInfo.getSessionId(), @@ -2854,12 +2926,15 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) { {Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); ASSERT_OK(syncTail.multiApply( _opCtx.get(), - {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn}, - boost::none)); + {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn})); // The txnNum and optime of the only write were saved. auto resultSingleDoc = @@ -2929,9 +3004,13 @@ TEST_F(SyncTailTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnTable) { outerInsertDate); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog}, boost::none)); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog})); checkTxnTable(insertSessionInfo, {Timestamp(40, 0), 1}, outerInsertDate); } @@ -2952,9 +3031,13 @@ TEST_F(SyncTailTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) { preImageDate); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}, boost::none)); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog})); ASSERT_FALSE(docExists(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace, @@ -2977,9 +3060,13 @@ TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) { Date_t::now()); auto writerPool = OplogApplier::makeWriterPool(); - SyncTail syncTail( - nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}, boost::none)); + SyncTail syncTail(nullptr, + getConsistencyMarkers(), + getStorageInterface(), + multiSyncApply, + writerPool.get(), + OplogApplier::Options(OplogApplication::Mode::kSecondary)); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog})); ASSERT_FALSE(docExists( _opCtx.get(), diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index eca42666f91..6ebcfa0f6fd 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -38,6 +38,7 @@ #include "mongo/db/op_observer_registry.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" +#include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_process.h" @@ -91,14 +92,14 @@ void SyncTailOpObserver::onCreateCollection(OperationContext* opCtx, // static OplogApplier::Options SyncTailTest::makeInitialSyncOptions() { - OplogApplier::Options options; + OplogApplier::Options options(OplogApplication::Mode::kInitialSync); options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123); return options; } OplogApplier::Options SyncTailTest::makeRecoveryOptions() { - OplogApplier::Options options; + OplogApplier::Options options(OplogApplication::Mode::kRecovering); options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.skipWritesToOplog = true; return options; @@ -214,7 +215,8 @@ Status SyncTailTest::runOpsSteadyState(std::vector<OplogEntry> ops) { getConsistencyMarkers(), getStorageInterface(), SyncTail::MultiSyncApplyFunc(), - nullptr); + nullptr, + OplogApplier::Options(OplogApplication::Mode::kSecondary)); MultiApplier::OperationPtrs opsPtrs; for (auto& op : ops) { opsPtrs.push_back(&op); diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index d0a5f23e876..cc19591c47e 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -1359,7 +1359,12 @@ class SyncTest : public SyncTail { public: bool returnEmpty; explicit SyncTest(OplogApplier::Observer* observer) - : SyncTail(observer, nullptr, nullptr, SyncTail::MultiSyncApplyFunc(), nullptr), + : SyncTail(observer, + nullptr, + nullptr, + SyncTail::MultiSyncApplyFunc(), + nullptr, + OplogApplier::Options(OplogApplication::Mode::kInitialSync)), returnEmpty(false) {} virtual ~SyncTest() {} BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override { @@ -1406,7 +1411,7 @@ public: // this should fail because we can't connect try { - OplogApplier::Options options; + OplogApplier::Options options(OplogApplication::Mode::kInitialSync); options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123); SyncTail badSource( diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 2b48c91607d..37824936a81 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -1386,10 +1386,9 @@ public: nullptr, // replication coordinator. not required for multiApply(). _consistencyMarkers, storageInterface, - {}, + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary), writerPool.get()); - ASSERT_EQUALS(op2.getOpTime(), - unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none))); + ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops))); AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX); assertMultikeyPaths( @@ -1485,7 +1484,7 @@ public: DoNothingOplogApplierObserver observer; auto storageInterface = repl::StorageInterface::get(_opCtx); auto writerPool = repl::OplogApplier::makeWriterPool(); - repl::OplogApplier::Options options; + repl::OplogApplier::Options options(repl::OplogApplication::Mode::kInitialSync); options.allowNamespaceNotFoundErrorsOnCrudOps = true; options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123); @@ -1498,7 +1497,7 @@ public: storageInterface, options, writerPool.get()); - auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none)); + auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)); ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp()); // Wait for the index build to finish before making any assertions. @@ -2567,8 +2566,13 @@ public: auto storageInterface = repl::StorageInterface::get(_opCtx); auto writerPool = repl::OplogApplier::makeWriterPool(1); repl::SyncTail syncTail( - nullptr, _consistencyMarkers, storageInterface, applyOperationFn, writerPool.get()); - auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}, boost::none)); + nullptr, + _consistencyMarkers, + storageInterface, + applyOperationFn, + writerPool.get(), + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary)); + auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp})); ASSERT_EQ(insertOp.getOpTime(), lastOpTime); joinGuard.dismiss(); |