summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp9
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp3
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp5
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp11
-rw-r--r--src/mongo/db/repl/oplog_applier.h23
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp7
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.h4
-rw-r--r--src/mongo/db/repl/oplog_applier_test.cpp12
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp17
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp5
-rw-r--r--src/mongo/db/repl/sync_tail.cpp45
-rw-r--r--src/mongo/db/repl/sync_tail.h15
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp281
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp8
-rw-r--r--src/mongo/dbtests/repltests.cpp9
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp18
16 files changed, 273 insertions, 199 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index cbee6637c67..ae615085319 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -49,16 +49,17 @@ public:
OplogBuffer* oplogBuffer,
Observer* observer,
DataReplicatorExternalStateMock* externalState)
- : OplogApplier(executor, oplogBuffer, observer),
+ : OplogApplier(executor,
+ oplogBuffer,
+ observer,
+ OplogApplier::Options(OplogApplication::Mode::kSecondary)),
_observer(observer),
_externalState(externalState) {}
private:
void _run(OplogBuffer* oplogBuffer) final {}
void _shutdown() final {}
- StatusWith<OpTime> _multiApply(OperationContext* opCtx,
- Operations ops,
- boost::optional<repl::OplogApplication::Mode> mode) final {
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final {
return _externalState->multiApplyFn(opCtx, ops, _observer);
}
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 06b4394509f..c6221493aa2 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -389,8 +389,7 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence
std::vector<MultiApplier::OperationPtrs> writerVectors(1);
std::vector<MultiApplier::Operations> derivedOps;
// Derive ops for transactions if necessary.
- syncTail.fillWriterVectors(
- _opCtx.get(), &ops, &writerVectors, &derivedOps, OplogApplication::Mode::kInitialSync);
+ syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps);
const auto& opPtrs = writerVectors[0];
ASSERT_OK(runOpPtrsInitialSync(opPtrs));
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index e35dd64463e..4a8a488999e 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -924,7 +924,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
// Create oplog applier.
auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
- OplogApplier::Options options;
+ OplogApplier::Options options(OplogApplication::Mode::kInitialSync);
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.missingDocumentSourceForInitialSync = _syncSource;
options.beginApplyingOpTime = lastOpTime;
@@ -1219,8 +1219,7 @@ void InitialSyncer::_getNextApplierBatchCallback(
_fetchCount.store(0);
MultiApplier::MultiApplyFn applyBatchOfOperationsFn = [this](OperationContext* opCtx,
MultiApplier::Operations ops) {
- return _oplogApplier->multiApply(
- opCtx, std::move(ops), repl::OplogApplication::Mode::kInitialSync);
+ return _oplogApplier->multiApply(opCtx, std::move(ops));
};
OpTime lastApplied = ops.back().getOpTime();
invariant(ops.back().getWallClockTime());
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 24a484f1362..711bce604eb 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -82,8 +82,9 @@ std::size_t OplogApplier::calculateBatchLimitBytes(OperationContext* opCtx,
OplogApplier::OplogApplier(executor::TaskExecutor* executor,
OplogBuffer* oplogBuffer,
- Observer* observer)
- : _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer) {}
+ Observer* observer,
+ const OplogApplier::Options& options)
+ : _executor(executor), _oplogBuffer(oplogBuffer), _observer(observer), _options(options) {}
OplogBuffer* OplogApplier::getBuffer() const {
return _oplogBuffer;
@@ -289,11 +290,9 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
return std::move(ops);
}
-StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx,
- Operations ops,
- boost::optional<repl::OplogApplication::Mode> mode) {
+StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, Operations ops) {
_observer->onBatchBegin(ops);
- auto lastApplied = _multiApply(opCtx, std::move(ops), mode);
+ auto lastApplied = _multiApply(opCtx, std::move(ops));
_observer->onBatchEnd(lastApplied, {});
return lastApplied;
}
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index 9650036b496..e9a7daf26fc 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -35,6 +35,7 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/storage_interface.h"
@@ -62,6 +63,10 @@ public:
**/
class Options {
public:
+ Options() = delete;
+ explicit Options(OplogApplication::Mode inputMode) : mode(inputMode) {}
+
+ // TODO (SERVER-42039): Remove fields here that are redundant with the mode.
bool allowNamespaceNotFoundErrorsOnCrudOps = false;
bool relaxUniqueIndexConstraints = false;
bool skipWritesToOplog = false;
@@ -77,6 +82,8 @@ public:
// For replication recovery only. During replication rollback, this is used to keep track
// of the stable timestamp from which we replay the oplog.
boost::optional<Timestamp> stableTimestampForRecovery;
+
+ const OplogApplication::Mode mode;
};
/**
@@ -131,7 +138,10 @@ public:
* Obtains batches of operations from the OplogBuffer to apply.
* Reports oplog application progress using the Observer.
*/
- OplogApplier(executor::TaskExecutor* executor, OplogBuffer* oplogBuffer, Observer* observer);
+ OplogApplier(executor::TaskExecutor* executor,
+ OplogBuffer* oplogBuffer,
+ Observer* observer,
+ const Options& options);
virtual ~OplogApplier() = default;
@@ -195,9 +205,7 @@ public:
*
* TODO: remove when enqueue() is implemented.
*/
- StatusWith<OpTime> multiApply(OperationContext* opCtx,
- Operations ops,
- boost::optional<repl::OplogApplication::Mode> mode);
+ StatusWith<OpTime> multiApply(OperationContext* opCtx, Operations ops);
private:
/**
@@ -223,9 +231,7 @@ private:
* Called from multiApply() to apply a batch of operations in parallel.
* Implemented in subclasses but not visible otherwise.
*/
- virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx,
- Operations ops,
- boost::optional<repl::OplogApplication::Mode> mode) = 0;
+ virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) = 0;
// Used to schedule task for oplog application loop.
// Not owned by us.
@@ -242,6 +248,9 @@ private:
// Set to true if shutdown() has been called.
bool _inShutdown = false;
+
+ // Configures this OplogApplier.
+ const Options _options;
};
/**
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 9b3126cb781..d3133d19dcb 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -42,7 +42,7 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor,
StorageInterface* storageInterface,
const OplogApplier::Options& options,
ThreadPool* writerPool)
- : OplogApplier(executor, oplogBuffer, observer),
+ : OplogApplier(executor, oplogBuffer, observer, options),
_replCoord(replCoord),
_syncTail(
observer, consistencyMarkers, storageInterface, multiSyncApply, writerPool, options),
@@ -61,9 +61,8 @@ void OplogApplierImpl::_shutdown() {
_syncTail.shutdown();
}
-StatusWith<OpTime> OplogApplierImpl::_multiApply(
- OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) {
- return _syncTail.multiApply(opCtx, std::move(ops), mode);
+StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, Operations ops) {
+ return _syncTail.multiApply(opCtx, std::move(ops));
}
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h
index 741cfd4dcd3..d6e0e523220 100644
--- a/src/mongo/db/repl/oplog_applier_impl.h
+++ b/src/mongo/db/repl/oplog_applier_impl.h
@@ -67,9 +67,7 @@ private:
void _shutdown() override;
- StatusWith<OpTime> _multiApply(OperationContext* opCtx,
- Operations ops,
- boost::optional<repl::OplogApplication::Mode> mode) override;
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) override;
// Not owned by us.
ReplicationCoordinator* const _replCoord;
diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp
index 05040963774..5b086263114 100644
--- a/src/mongo/db/repl/oplog_applier_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_test.cpp
@@ -56,20 +56,20 @@ public:
void _run(OplogBuffer* oplogBuffer) final;
void _shutdown() final;
- StatusWith<OpTime> _multiApply(OperationContext* opCtx,
- Operations ops,
- boost::optional<repl::OplogApplication::Mode> mode) final;
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final;
};
OplogApplierMock::OplogApplierMock(OplogBuffer* oplogBuffer)
- : OplogApplier(nullptr, oplogBuffer, nullptr) {}
+ : OplogApplier(nullptr,
+ oplogBuffer,
+ nullptr,
+ OplogApplier::Options(OplogApplication::Mode::kSecondary)) {}
void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {}
void OplogApplierMock::_shutdown() {}
-StatusWith<OpTime> OplogApplierMock::_multiApply(
- OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) {
+StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) {
return OpTime();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index c92bcf8792c..f9cf6b956f0 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -226,14 +226,15 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
// interface. During steady state replication, there is no need to log details on every batch
// we apply (recovery); or track missing documents that are fetched from the sync source
// (initial sync).
- _oplogApplier = std::make_unique<OplogApplierImpl>(_oplogApplierTaskExecutor.get(),
- _oplogBuffer.get(),
- &noopOplogApplierObserver,
- replCoord,
- _replicationProcess->getConsistencyMarkers(),
- _storageInterface,
- OplogApplier::Options(),
- _writerPool.get());
+ _oplogApplier = std::make_unique<OplogApplierImpl>(
+ _oplogApplierTaskExecutor.get(),
+ _oplogBuffer.get(),
+ &noopOplogApplierObserver,
+ replCoord,
+ _replicationProcess->getConsistencyMarkers(),
+ _storageInterface,
+ OplogApplier::Options(OplogApplication::Mode::kSecondary),
+ _writerPool.get());
invariant(!_bgSync);
_bgSync =
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index e34606fe92b..9f3171d7b23 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -365,7 +365,7 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
RecoveryOplogApplierStats stats;
auto writerPool = OplogApplier::makeWriterPool();
- OplogApplier::Options options;
+ OplogApplier::Options options(OplogApplication::Mode::kRecovering);
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.skipWritesToOplog = true;
// During replication recovery, the stableTimestampForRecovery refers to the stable timestamp
@@ -394,8 +394,7 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
OplogApplier::Operations batch;
while (
!(batch = fassert(50763, oplogApplier.getNextApplierBatch(opCtx, batchLimits))).empty()) {
- applyThroughOpTime = uassertStatusOK(
- oplogApplier.multiApply(opCtx, std::move(batch), OplogApplication::Mode::kRecovering));
+ applyThroughOpTime = uassertStatusOK(oplogApplier.multiApply(opCtx, std::move(batch)));
}
stats.complete(applyThroughOpTime);
invariant(oplogBuffer.isEmpty(),
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index e7539892dad..7431107d29d 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -387,13 +387,6 @@ SyncTail::SyncTail(OplogApplier::Observer* observer,
_writerPool(writerPool),
_options(options) {}
-SyncTail::SyncTail(OplogApplier::Observer* observer,
- ReplicationConsistencyMarkers* consistencyMarkers,
- StorageInterface* storageInterface,
- MultiSyncApplyFunc func,
- ThreadPool* writerPool)
- : SyncTail(observer, consistencyMarkers, storageInterface, func, writerPool, {}) {}
-
SyncTail::~SyncTail() {}
const OplogApplier::Options& SyncTail::getOptions() const {
@@ -796,7 +789,7 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
// Apply the operations in this batch. 'multiApply' returns the optime of the last op that
// was applied, which should be the last optime in the batch.
auto lastOpTimeAppliedInBatch =
- fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch(), boost::none));
+ fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch()));
invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
// In order to provide resilience in the event of a crash in the middle of batch
@@ -1038,13 +1031,7 @@ Status multiSyncApply(OperationContext* opCtx,
ApplierHelpers::stableSortByNamespace(ops);
- // Assume we are recovering if oplog writes are disabled in the options.
- // Assume we are in initial sync if we have a host for fetching missing documents.
- const auto oplogApplicationMode = st->getOptions().skipWritesToOplog
- ? OplogApplication::Mode::kRecovering
- : (st->getOptions().missingDocumentSourceForInitialSync
- ? OplogApplication::Mode::kInitialSync
- : OplogApplication::Mode::kSecondary);
+ const auto oplogApplicationMode = st->getOptions().mode;
ApplierHelpers::InsertGroup insertGroup(ops, opCtx, oplogApplicationMode);
@@ -1123,8 +1110,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker,
- boost::optional<repl::OplogApplication::Mode> mode) {
+ SessionUpdateTracker* sessionUpdateTracker) {
const auto serviceContext = opCtx->getServiceContext();
const auto storageEngine = serviceContext->getStorageEngine();
@@ -1152,8 +1138,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
if (sessionUpdateTracker) {
if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) {
derivedOps->emplace_back(std::move(*newOplogWrites));
- _fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
}
@@ -1221,7 +1206,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
}
// Transaction entries cannot have different session updates.
_fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
} else {
// The applyOps entry was not generated as part of a transaction.
invariant(!op.getPrevWriteOpTimeInTransaction());
@@ -1229,7 +1214,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// Nested entries cannot have different session updates.
_fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
} catch (...) {
fassertFailedWithStatusNoTrace(
@@ -1244,7 +1229,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// If we see a commitTransaction command that is a part of a prepared transaction during
// initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers
// with the extracted operations.
- if (isPreparedCommit(op) && (mode == OplogApplication::Mode::kInitialSync)) {
+ if (isPreparedCommit(op) && (_options.mode == OplogApplication::Mode::kInitialSync)) {
auto logicalSessionId = op.getSessionId();
auto& partialTxnList = partialTxnOps[*logicalSessionId];
@@ -1262,8 +1247,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp()));
}
- _fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
continue;
}
@@ -1278,15 +1262,14 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
void SyncTail::fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps,
- boost::optional<repl::OplogApplication::Mode> mode) {
+ std::vector<MultiApplier::Operations>* derivedOps) {
SessionUpdateTracker sessionUpdateTracker;
- _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker, mode);
+ _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
auto newOplogWrites = sessionUpdateTracker.flushAll();
if (!newOplogWrites.empty()) {
derivedOps->emplace_back(std::move(newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
}
@@ -1318,9 +1301,7 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors
}
}
-StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx,
- MultiApplier::Operations ops,
- boost::optional<repl::OplogApplication::Mode> mode) {
+StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
invariant(!ops.empty());
LOG(2) << "replication batch size is " << ops.size();
@@ -1364,7 +1345,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx,
std::vector<MultiApplier::Operations> derivedOps;
std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
- fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps, mode);
+ fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
// Wait for writes to finish before applying ops.
_writerPool->waitForIdle();
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 5f9961c8b22..2bbdfbd71c8 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -97,11 +97,6 @@ public:
MultiSyncApplyFunc func,
ThreadPool* writerPool,
const OplogApplier::Options& options);
- SyncTail(OplogApplier::Observer* observer,
- ReplicationConsistencyMarkers* consistencyMarkers,
- StorageInterface* storageInterface,
- MultiSyncApplyFunc func,
- ThreadPool* writerPool);
virtual ~SyncTail();
/**
@@ -228,15 +223,12 @@ public:
* to at least the last optime of the batch. If 'minValid' is already greater than or equal
* to the last optime of this batch, it will not be updated.
*/
- StatusWith<OpTime> multiApply(OperationContext* opCtx,
- MultiApplier::Operations ops,
- boost::optional<repl::OplogApplication::Mode> mode);
+ StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops);
void fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps,
- boost::optional<repl::OplogApplication::Mode> mode);
+ std::vector<MultiApplier::Operations>* derivedOps);
private:
class OpQueueBatcher;
@@ -247,8 +239,7 @@ private:
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker,
- boost::optional<repl::OplogApplication::Mode> mode);
+ SessionUpdateTracker* sessionUpdateTracker);
/**
* Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 35b07187677..95c981f6ead 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -406,8 +406,9 @@ DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty
getConsistencyMarkers(),
getStorageInterface(),
noopApplyOperationFn,
- writerPool.get());
- syncTail.multiApply(_opCtx.get(), {}, boost::none).getStatus().ignore();
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ syncTail.multiApply(_opCtx.get(), {}).getStatus().ignore();
}
bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
@@ -431,9 +432,13 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1));
ASSERT_FALSE(op.isForCappedCollection);
- SyncTail syncTail(
- nullptr, consistencyMarkers, storageInterface, applyOperationFn, writerPool.get());
- auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op}, boost::none));
+ SyncTail syncTail(nullptr,
+ consistencyMarkers,
+ storageInterface,
+ applyOperationFn,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op}));
ASSERT_EQUALS(op.getOpTime(), lastOpTime);
ASSERT_EQUALS(1U, operationsApplied.size());
@@ -467,7 +472,12 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) {
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr);
+ SyncTail syncTail(nullptr,
+ nullptr,
+ nullptr,
+ {},
+ nullptr,
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
// Collection should be created after SyncTail::syncApply() processes operation.
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
@@ -590,14 +600,18 @@ private:
};
TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ _writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
// Apply a batch with only the first operation. This should result in the first oplog entry
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1}));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -612,7 +626,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
// Apply a batch with only the second operation. This should result in the second oplog entry
// being put in the oplog, but with no effect because the operation is part of a pending
// transaction.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2}));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -628,7 +642,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the two previous entries being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp}));
ASSERT_EQ(3U, oplogDocs().size());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -644,7 +658,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) {
// Skipping writes to oplog proves we're testing the code path which does not rely on reading
// the oplog.
- OplogApplier::Options applierOpts;
+ OplogApplier::Options applierOpts(OplogApplication::Mode::kSecondary);
applierOpts.skipWritesToOplog = true;
SyncTail syncTail(nullptr,
getConsistencyMarkers(),
@@ -655,8 +669,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) {
// Apply both inserts and the commit in a single batch. We expect no oplog entries to
// be inserted (because we've set skipWritesToOplog), and both entries to be committed.
- ASSERT_OK(
- syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}));
ASSERT_EQ(0U, oplogDocs().size());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -704,13 +717,17 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
StmtId(4),
insertOps.back().getOpTime());
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ _writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
// Insert the first entry in its own batch. This should result in the oplog entry being written
// but the entry should not be applied as it is part of a pending transaction.
const auto expectedStartOpTime = insertOps[0].getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_EQ(0U, _insertedDocs[_nss1].size());
ASSERT_EQ(0U, _insertedDocs[_nss2].size());
@@ -723,8 +740,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
// Insert the rest of the entries, including the commit. These entries should be added to the
// oplog, and all the entries including the first should be applied.
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}, boost::none));
+ ASSERT_OK(
+ syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}));
ASSERT_EQ(5U, oplogDocs().size());
ASSERT_EQ(3U, _insertedDocs[_nss1].size());
ASSERT_EQ(1U, _insertedDocs[_nss2].size());
@@ -836,8 +853,12 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) {
StmtId(2),
insertOps2.back().getOpTime());
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ _writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
// Note the insert counter so we can check it later. It is necessary to use opCounters as
// inserts are idempotent so we will not detect duplicate inserts just by checking inserts in
@@ -847,8 +868,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) {
// once.
ASSERT_OK(syncTail.multiApply(
_opCtx.get(),
- {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2},
- boost::none));
+ {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2}));
ASSERT_EQ(6U, oplogDocs().size());
ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore);
ASSERT_EQ(4U, _insertedDocs[_nss1].size());
@@ -950,14 +970,18 @@ private:
};
TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionSteadyState) {
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ _writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
// Apply a batch with the insert operations. This should result in the oplog entries
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
@@ -973,7 +997,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
// Apply a batch with only the prepare. This should result in the prepare being put in the
// oplog, and the two previous entries being applied (but in a transaction) along with the
// nested insert in the prepare oplog entry.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
ASSERT_EQ(3U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
@@ -987,7 +1011,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the three previous entries being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1000,14 +1024,18 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
}
TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactionCheckTxnTable) {
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ _writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
// Apply a batch with the insert operations. This should result in the oplog entries
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
checkTxnTable(_lsid,
_txnNum,
_insertOp1->getOpTime(),
@@ -1018,7 +1046,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio
// Apply a batch with only the prepare. This should result in the prepare being put in the
// oplog, and the two previous entries being applied (but in a transaction) along with the
// nested insert in the prepare oplog entry.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
checkTxnTable(_lsid,
_txnNum,
_prepareWithPrevOp->getOpTime(),
@@ -1028,7 +1056,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio
// Apply a batch with only the abort. This should result in the abort being put in the
// oplog and the transaction table being updated accordingly.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}));
ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1052,8 +1080,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(), {*_insertOp1, *_insertOp2}, OplogApplication::Mode::kInitialSync));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
@@ -1068,8 +1095,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, but, since this is initial sync, nothing else.
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(), {*_prepareWithPrevOp}, OplogApplication::Mode::kInitialSync));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
ASSERT_EQ(3U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -1083,8 +1109,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the three previous entries being applied.
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(), {*_commitPrepareWithPrevOp}, OplogApplication::Mode::kInitialSync));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1119,7 +1144,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with the insert operations. This should have no effect, because this is
// recovery.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1132,7 +1157,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with only the prepare applyOps. This should have no effect, since this is
// recovery.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1145,7 +1170,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with only the commit. This should result in the the three previous entries
// being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1158,14 +1183,18 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
}
TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedTransaction) {
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ _writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
@@ -1178,7 +1207,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and prepared insert being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1191,8 +1220,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT
}
TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTransaction) {
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ _writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
auto emptyPrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
{Timestamp(Seconds(1), 3), 1LL},
@@ -1206,7 +1239,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp}));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -1219,7 +1252,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and prepared insert being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1232,13 +1265,17 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr
}
TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPreparedTransaction) {
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, _writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ _writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
checkTxnTable(_lsid,
_txnNum,
_singlePrepareApplyOp->getOpTime(),
@@ -1248,7 +1285,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep
// Apply a batch with only the abort. This should result in the abort being put in the
// oplog and the transaction table being updated accordingly.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}));
ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1273,8 +1310,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, but, since this is initial sync, nothing else.
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(), {*_singlePrepareApplyOp}, OplogApplication::Mode::kInitialSync));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -1288,8 +1324,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the previous entry being applied.
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(), {*_commitSinglePrepareApplyOp}, OplogApplication::Mode::kInitialSync));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1325,7 +1360,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the prepare applyOps. This should have no effect, since this is
// recovery.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1338,7 +1373,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the commit. This should result in the previous entry being
// applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1353,7 +1388,12 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
void testWorkerMultikeyPaths(OperationContext* opCtx,
const OplogEntry& op,
unsigned long numPaths) {
- SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr);
+ SyncTail syncTail(nullptr,
+ nullptr,
+ nullptr,
+ {},
+ nullptr,
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
WorkerMultikeyPathInfo pathInfo;
MultiApplier::OperationPtrs ops = {&op};
ASSERT_OK(multiSyncApply(opCtx, &ops, &syncTail, &pathInfo));
@@ -1409,7 +1449,12 @@ TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) {
auto opA = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, docA);
auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7));
auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB);
- SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr);
+ SyncTail syncTail(nullptr,
+ nullptr,
+ nullptr,
+ {},
+ nullptr,
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
WorkerMultikeyPathInfo pathInfo;
MultiApplier::OperationPtrs ops = {&opA, &opB};
ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
@@ -1452,7 +1497,12 @@ TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) {
NamespaceString nss("foo." + _agent.getSuiteName() + "_" + _agent.getTestName());
auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
- SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr);
+ SyncTail syncTail(nullptr,
+ nullptr,
+ nullptr,
+ {},
+ nullptr,
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
MultiApplier::OperationPtrs ops = {&op};
ASSERT_EQUALS(ErrorCodes::InvalidOptions,
multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr));
@@ -1936,7 +1986,7 @@ DEATH_TEST_F(SyncTailTest,
return Status::OK();
};
auto writerPool = OplogApplier::makeWriterPool();
- OplogApplier::Options options;
+ OplogApplier::Options options(OplogApplication::Mode::kSecondary);
SyncTail syncTail(nullptr, // observer. not required by oplogApplication().
_consistencyMarkers.get(),
getStorageInterface(),
@@ -2555,9 +2605,13 @@ TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) {
date);
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}, boost::none));
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}));
checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date);
}
@@ -2586,9 +2640,13 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) {
Date_t::now());
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}, boost::none));
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}));
ASSERT_FALSE(docExists(
_opCtx.get(),
@@ -2630,9 +2688,13 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTa
date);
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}, boost::none));
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}));
checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date);
}
@@ -2662,9 +2724,13 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTa
Date_t::now());
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}, boost::none));
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}));
checkTxnTable(sessionInfo, newWriteOpTime, date);
}
@@ -2722,11 +2788,14 @@ TEST_F(SyncTailTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnSameSessi
txnInsertOpTime);
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}));
repl::checkTxnTable(_opCtx.get(),
*sessionInfo.getSessionId(),
@@ -2789,11 +2858,14 @@ TEST_F(SyncTailTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnSameSessi
date);
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(syncTail.multiApply(
- _opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}, boost::none));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}));
repl::checkTxnTable(_opCtx.get(),
*sessionInfo.getSessionId(),
@@ -2854,12 +2926,15 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
{Timestamp(Seconds(7), 0), 1LL}, ns3, BSON("_id" << 0), info);
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
ASSERT_OK(syncTail.multiApply(
_opCtx.get(),
- {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn},
- boost::none));
+ {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn}));
// The txnNum and optime of the only write were saved.
auto resultSingleDoc =
@@ -2929,9 +3004,13 @@ TEST_F(SyncTailTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnTable) {
outerInsertDate);
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog}, boost::none));
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog}));
checkTxnTable(insertSessionInfo, {Timestamp(40, 0), 1}, outerInsertDate);
}
@@ -2952,9 +3031,13 @@ TEST_F(SyncTailTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) {
preImageDate);
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}, boost::none));
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}));
ASSERT_FALSE(docExists(_opCtx.get(),
NamespaceString::kSessionTransactionsTableNamespace,
@@ -2977,9 +3060,13 @@ TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) {
Date_t::now());
auto writerPool = OplogApplier::makeWriterPool();
- SyncTail syncTail(
- nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}, boost::none));
+ SyncTail syncTail(nullptr,
+ getConsistencyMarkers(),
+ getStorageInterface(),
+ multiSyncApply,
+ writerPool.get(),
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}));
ASSERT_FALSE(docExists(
_opCtx.get(),
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index eca42666f91..6ebcfa0f6fd 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/drop_pending_collection_reaper.h"
+#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/replication_process.h"
@@ -91,14 +92,14 @@ void SyncTailOpObserver::onCreateCollection(OperationContext* opCtx,
// static
OplogApplier::Options SyncTailTest::makeInitialSyncOptions() {
- OplogApplier::Options options;
+ OplogApplier::Options options(OplogApplication::Mode::kInitialSync);
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123);
return options;
}
OplogApplier::Options SyncTailTest::makeRecoveryOptions() {
- OplogApplier::Options options;
+ OplogApplier::Options options(OplogApplication::Mode::kRecovering);
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.skipWritesToOplog = true;
return options;
@@ -214,7 +215,8 @@ Status SyncTailTest::runOpsSteadyState(std::vector<OplogEntry> ops) {
getConsistencyMarkers(),
getStorageInterface(),
SyncTail::MultiSyncApplyFunc(),
- nullptr);
+ nullptr,
+ OplogApplier::Options(OplogApplication::Mode::kSecondary));
MultiApplier::OperationPtrs opsPtrs;
for (auto& op : ops) {
opsPtrs.push_back(&op);
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index d0a5f23e876..cc19591c47e 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -1359,7 +1359,12 @@ class SyncTest : public SyncTail {
public:
bool returnEmpty;
explicit SyncTest(OplogApplier::Observer* observer)
- : SyncTail(observer, nullptr, nullptr, SyncTail::MultiSyncApplyFunc(), nullptr),
+ : SyncTail(observer,
+ nullptr,
+ nullptr,
+ SyncTail::MultiSyncApplyFunc(),
+ nullptr,
+ OplogApplier::Options(OplogApplication::Mode::kInitialSync)),
returnEmpty(false) {}
virtual ~SyncTest() {}
BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override {
@@ -1406,7 +1411,7 @@ public:
// this should fail because we can't connect
try {
- OplogApplier::Options options;
+ OplogApplier::Options options(OplogApplication::Mode::kInitialSync);
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123);
SyncTail badSource(
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 2b48c91607d..37824936a81 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -1386,10 +1386,9 @@ public:
nullptr, // replication coordinator. not required for multiApply().
_consistencyMarkers,
storageInterface,
- {},
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
- ASSERT_EQUALS(op2.getOpTime(),
- unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none)));
+ ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)));
AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX);
assertMultikeyPaths(
@@ -1485,7 +1484,7 @@ public:
DoNothingOplogApplierObserver observer;
auto storageInterface = repl::StorageInterface::get(_opCtx);
auto writerPool = repl::OplogApplier::makeWriterPool();
- repl::OplogApplier::Options options;
+ repl::OplogApplier::Options options(repl::OplogApplication::Mode::kInitialSync);
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123);
@@ -1498,7 +1497,7 @@ public:
storageInterface,
options,
writerPool.get());
- auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none));
+ auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops));
ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp());
// Wait for the index build to finish before making any assertions.
@@ -2567,8 +2566,13 @@ public:
auto storageInterface = repl::StorageInterface::get(_opCtx);
auto writerPool = repl::OplogApplier::makeWriterPool(1);
repl::SyncTail syncTail(
- nullptr, _consistencyMarkers, storageInterface, applyOperationFn, writerPool.get());
- auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}, boost::none));
+ nullptr,
+ _consistencyMarkers,
+ storageInterface,
+ applyOperationFn,
+ writerPool.get(),
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary));
+ auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}));
ASSERT_EQ(insertOp.getOpTime(), lastOpTime);
joinGuard.dismiss();