summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMihai Andrei <mihai.andrei@mongodb.com>2019-10-10 15:00:17 +0000
committerevergreen <evergreen@mongodb.com>2019-10-10 15:00:17 +0000
commit6b94e0025991847cb4a145351bfeee12f4af92e4 (patch)
tree1b8e6b89aa88889859a135a273a2b7948c27d2b2
parentab77143b2163da066776bd965e0cd3a931d9aa10 (diff)
downloadmongo-6b94e0025991847cb4a145351bfeee12f4af92e4.tar.gz
SERVER-42998 Make multiSyncApply non-configurable
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp1
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp200
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.h31
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp154
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp13
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test_fixture.h28
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp1
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp1
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp110
9 files changed, 252 insertions, 287 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 357f5959aa2..33ea1ea0d16 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -143,7 +143,6 @@ std::unique_ptr<OplogApplier> DataReplicatorExternalStateImpl::makeOplogApplier(
_replicationCoordinator,
consistencyMarkers,
storageInterface,
- applyOplogGroup,
options,
writerPool);
}
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 899bae5683d..97b9d37797f 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -235,6 +235,13 @@ void addDerivedOps(OperationContext* opCtx,
}
}
+void stableSortByNamespace(MultiApplier::OperationPtrs* oplogEntryPointers) {
+ auto nssComparator = [](const OplogEntry* l, const OplogEntry* r) {
+ return l->getNss() < r->getNss();
+ };
+ std::stable_sort(oplogEntryPointers->begin(), oplogEntryPointers->end(), nssComparator);
+}
+
} // namespace
@@ -351,7 +358,6 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor,
ReplicationCoordinator* replCoord,
ReplicationConsistencyMarkers* consistencyMarkers,
StorageInterface* storageInterface,
- ApplyGroupFunc func,
const OplogApplier::Options& options,
ThreadPool* writerPool)
: OplogApplier(executor, oplogBuffer, observer, options),
@@ -359,7 +365,6 @@ OplogApplierImpl::OplogApplierImpl(executor::TaskExecutor* executor,
_writerPool(writerPool),
_storageInterface(storageInterface),
_consistencyMarkers(consistencyMarkers),
- _applyFunc(func),
_beginApplyingOpTime(options.beginApplyingOpTime) {}
void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
@@ -636,9 +641,8 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx,
// so it is safe to exclude any writes from Flow Control.
opCtx->setShouldParticipateInFlowControl(false);
- status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
- return _applyFunc(opCtx.get(), &writer, this, &multikeyVector);
- });
+ status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown(
+ [&] { return applyOplogGroup(opCtx.get(), &writer, &multikeyVector); });
});
}
@@ -832,6 +836,90 @@ void OplogApplierImpl::fillWriterVectors(
}
}
+Status OplogApplierImpl::applyOplogGroup(OperationContext* opCtx,
+ MultiApplier::OperationPtrs* ops,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
+
+ UnreplicatedWritesBlock uwb(opCtx);
+ DisableDocumentValidation validationDisabler(opCtx);
+ // Since we swap the locker in stash / unstash transaction resources,
+ // ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been
+ // destroyed by unstash in its destructor. Thus we set the flag explicitly.
+ opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+
+ // Explicitly start future read transactions without a timestamp.
+ opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+
+ // When querying indexes, we return the record matching the key if it exists, or an adjacent
+ // document. This means that it is possible for us to hit a prepare conflict if we query for an
+ // incomplete key and an adjacent key is prepared.
+ // We ignore prepare conflicts on secondaries because they may encounter prepare conflicts that
+ // did not occur on the primary.
+ opCtx->recoveryUnit()->setPrepareConflictBehavior(
+ PrepareConflictBehavior::kIgnoreConflictsAllowWrites);
+
+ stableSortByNamespace(ops);
+
+ const auto oplogApplicationMode = getOptions().mode;
+
+ InsertGroup insertGroup(ops, opCtx, oplogApplicationMode);
+
+ { // Ensure that the MultikeyPathTracker stops tracking paths.
+ ON_BLOCK_EXIT([opCtx] { MultikeyPathTracker::get(opCtx).stopTrackingMultikeyPathInfo(); });
+ MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo();
+
+ for (auto it = ops->cbegin(); it != ops->cend(); ++it) {
+ const OplogEntry& entry = **it;
+
+ // If we are successful in grouping and applying inserts, advance the current iterator
+ // past the end of the inserted group of entries.
+ auto groupResult = insertGroup.groupAndApplyInserts(it);
+ if (groupResult.isOK()) {
+ it = groupResult.getValue();
+ continue;
+ }
+
+ // If we didn't create a group, try to apply the op individually.
+ try {
+ const Status status = applyOplogEntryBatch(opCtx, &entry, oplogApplicationMode);
+
+ if (!status.isOK()) {
+ // Tried to apply an update operation but the document is missing, there must be
+ // a delete operation for the document later in the oplog.
+ if (status == ErrorCodes::UpdateOperationFailed &&
+ oplogApplicationMode == OplogApplication::Mode::kInitialSync) {
+ continue;
+ }
+
+ severe() << "Error applying operation (" << redact(entry.toBSON())
+ << "): " << causedBy(redact(status));
+ return status;
+ }
+ } catch (const DBException& e) {
+ // SERVER-24927 If we have a NamespaceNotFound exception, then this document will be
+ // dropped before initial sync or recovery ends anyways and we should ignore it.
+ if (e.code() == ErrorCodes::NamespaceNotFound && entry.isCrudOpType() &&
+ getOptions().allowNamespaceNotFoundErrorsOnCrudOps) {
+ continue;
+ }
+
+ severe() << "writer worker caught exception: " << redact(e)
+ << " on: " << redact(entry.toBSON());
+ return e.toStatus();
+ }
+ }
+ }
+
+ invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo());
+ invariant(workerMultikeyPathInfo->empty());
+ auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo();
+ if (!newPaths.empty()) {
+ workerMultikeyPathInfo->swap(newPaths);
+ }
+
+ return Status::OK();
+}
+
Status applyOplogEntryBatch(OperationContext* opCtx,
const OplogEntryBatch& batch,
OplogApplication::Mode oplogApplicationMode) {
@@ -924,108 +1012,6 @@ Status applyOplogEntryBatch(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-void stableSortByNamespace(MultiApplier::OperationPtrs* oplogEntryPointers) {
- if (oplogEntryPointers->size() < 1U) {
- return;
- }
- auto nssComparator = [](const OplogEntry* l, const OplogEntry* r) {
- return l->getNss() < r->getNss();
- };
- std::stable_sort(oplogEntryPointers->begin(), oplogEntryPointers->end(), nssComparator);
-}
-
-/**
- * This free function is used by the thread pool workers to write ops to the db.
- * This consumes the passed in OperationPtrs and callers should not make any assumptions about the
- * state of the container after calling. However, this function cannot modify the pointed-to
- * operations because the OperationPtrs container contains const pointers.
- */
-Status applyOplogGroup(OperationContext* opCtx,
- MultiApplier::OperationPtrs* ops,
- OplogApplierImpl* oai,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
- invariant(oai);
-
- UnreplicatedWritesBlock uwb(opCtx);
- DisableDocumentValidation validationDisabler(opCtx);
- // Since we swap the locker in stash / unstash transaction resources,
- // ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been
- // destroyed by unstash in its destructor. Thus we set the flag explicitly.
- opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
-
- // Explicitly start future read transactions without a timestamp.
- opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
-
- // When querying indexes, we return the record matching the key if it exists, or an adjacent
- // document. This means that it is possible for us to hit a prepare conflict if we query for an
- // incomplete key and an adjacent key is prepared.
- // We ignore prepare conflicts on secondaries because they may encounter prepare conflicts that
- // did not occur on the primary.
- opCtx->recoveryUnit()->setPrepareConflictBehavior(
- PrepareConflictBehavior::kIgnoreConflictsAllowWrites);
-
- stableSortByNamespace(ops);
-
- const auto oplogApplicationMode = oai->getOptions().mode;
-
- InsertGroup insertGroup(ops, opCtx, oplogApplicationMode);
-
- { // Ensure that the MultikeyPathTracker stops tracking paths.
- ON_BLOCK_EXIT([opCtx] { MultikeyPathTracker::get(opCtx).stopTrackingMultikeyPathInfo(); });
- MultikeyPathTracker::get(opCtx).startTrackingMultikeyPathInfo();
-
- for (auto it = ops->cbegin(); it != ops->cend(); ++it) {
- const OplogEntry& entry = **it;
-
- // If we are successful in grouping and applying inserts, advance the current iterator
- // past the end of the inserted group of entries.
- auto groupResult = insertGroup.groupAndApplyInserts(it);
- if (groupResult.isOK()) {
- it = groupResult.getValue();
- continue;
- }
-
- // If we didn't create a group, try to apply the op individually.
- try {
- const Status status = applyOplogEntryBatch(opCtx, &entry, oplogApplicationMode);
-
- if (!status.isOK()) {
- // Tried to apply an update operation but the document is missing, there must be
- // a delete operation for the document later in the oplog.
- if (status == ErrorCodes::UpdateOperationFailed &&
- oplogApplicationMode == OplogApplication::Mode::kInitialSync) {
- continue;
- }
-
- severe() << "Error applying operation (" << redact(entry.toBSON())
- << "): " << causedBy(redact(status));
- return status;
- }
- } catch (const DBException& e) {
- // SERVER-24927 If we have a NamespaceNotFound exception, then this document will be
- // dropped before initial sync or recovery ends anyways and we should ignore it.
- if (e.code() == ErrorCodes::NamespaceNotFound && entry.isCrudOpType() &&
- oai->getOptions().allowNamespaceNotFoundErrorsOnCrudOps) {
- continue;
- }
-
- severe() << "writer worker caught exception: " << redact(e)
- << " on: " << redact(entry.toBSON());
- return e.toStatus();
- }
- }
- }
-
- invariant(!MultikeyPathTracker::get(opCtx).isTrackingMultikeyPathInfo());
- invariant(workerMultikeyPathInfo->empty());
- auto newPaths = MultikeyPathTracker::get(opCtx).getMultikeyPathInfo();
- if (!newPaths.empty()) {
- workerMultikeyPathInfo->swap(newPaths);
- }
-
- return Status::OK();
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h
index 4d67f022d4d..02790b35103 100644
--- a/src/mongo/db/repl/oplog_applier_impl.h
+++ b/src/mongo/db/repl/oplog_applier_impl.h
@@ -58,10 +58,6 @@ class OplogApplierImpl : public OplogApplier {
OplogApplierImpl& operator=(const OplogApplierImpl&) = delete;
public:
- using ApplyGroupFunc = std::function<Status(OperationContext* opCtx,
- MultiApplier::OperationPtrs* ops,
- OplogApplierImpl* oai,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo)>;
/**
* Constructs this OplogApplier with specific options.
* During steady state replication, _run() obtains batches of operations to apply
@@ -75,7 +71,6 @@ public:
ReplicationCoordinator* replCoord,
ReplicationConsistencyMarkers* consistencyMarkers,
StorageInterface* storageInterface,
- ApplyGroupFunc func,
const Options& options,
ThreadPool* writerPool);
@@ -120,9 +115,6 @@ private:
ReplicationConsistencyMarkers* const _consistencyMarkers;
- // Function to use during _multiApply
- ApplyGroupFunc _applyFunc;
-
// Used to determine which operations should be applied during initial sync. If this is null,
// we will apply all operations that were fetched.
OpTime _beginApplyingOpTime = OpTime();
@@ -133,6 +125,19 @@ protected:
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps) noexcept;
+
+ /**
+ * This function is used by the thread pool workers to write ops to the db.
+ * This consumes the passed in OperationPtrs and callers should not make any assumptions about
+ * the state of the container after calling. However, this function cannot modify the pointed-to
+ * operations because the OperationPtrs container contains const pointers.
+ *
+ * This function has been marked as virtual to allow certain unit tests to skip oplog
+ * application.
+ */
+ virtual Status applyOplogGroup(OperationContext* opCtx,
+ MultiApplier::OperationPtrs* ops,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo);
};
/**
@@ -142,15 +147,5 @@ Status applyOplogEntryBatch(OperationContext* opCtx,
const OplogEntryBatch& batch,
OplogApplication::Mode oplogApplicationMode);
-/**
- * This free function is used by the thread pool workers to write ops to the db.
- * This consumes the passed in OperationPtrs and callers should not make any assumptions about the
- * state of the container after calling. However, this function cannot modify the pointed-to
- * operations because the OperationPtrs container contains const pointers.
- */
-Status applyOplogGroup(OperationContext* opCtx,
- MultiApplier::OperationPtrs* ops,
- OplogApplierImpl* oai,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index f085076bacd..cf171d1d0b8 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -99,27 +99,6 @@ OplogEntry makeOplogEntry(OpTypeEnum opType, NamespaceString nss, OptionalCollec
}
/**
- * Testing-only OplogApplierImpl
- */
-
-class OplogApplierImplForTest : public OplogApplierImpl {
-public:
- OplogApplierImplForTest();
-};
-
-// Minimal constructor that takes options, the only member accessed in fillWriterVectors.
-OplogApplierImplForTest::OplogApplierImplForTest()
- : OplogApplierImpl(nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
- nullptr) {}
-
-/**
* Creates collection options suitable for oplog.
*/
CollectionOptions createOplogCollectionOptions() {
@@ -308,17 +287,38 @@ TEST_F(OplogApplierImplTest, applyOplogEntryBatchCommand) {
ASSERT_TRUE(applyCmdCalled);
}
+/**
+ * Test only subclass of OplogApplierImpl that does not apply oplog entries, but tracks ops.
+ */
+class TrackOpsAppliedApplier : public OplogApplierImpl {
+public:
+ using OplogApplierImpl::OplogApplierImpl;
+
+ Status applyOplogGroup(OperationContext* opCtx,
+ MultiApplier::OperationPtrs* ops,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) override;
+ MultiApplier::Operations operationsApplied;
+};
+
+Status TrackOpsAppliedApplier::applyOplogGroup(OperationContext* opCtx,
+ MultiApplier::OperationPtrs* ops,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
+ for (auto&& opPtr : *ops) {
+ operationsApplied.push_back(*opPtr);
+ }
+ return Status::OK();
+}
+
DEATH_TEST_F(OplogApplierImplTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty()") {
auto writerPool = makeReplWriterPool();
NoopOplogApplierObserver observer;
- OplogApplierImpl oplogApplier(
+ TrackOpsAppliedApplier oplogApplier(
nullptr, // executor
nullptr, // oplogBuffer
&observer,
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- noopApplyOperationFn,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
oplogApplier.multiApply(_opCtx.get(), {}).getStatus().ignore();
@@ -331,37 +331,26 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
const NamespaceString& nss,
const CollectionOptions& options) {
auto writerPool = makeReplWriterPool();
- MultiApplier::Operations operationsApplied;
- auto applyOperationFn = [&operationsApplied](OperationContext* opCtx,
- MultiApplier::OperationPtrs* operationsToApply,
- OplogApplierImpl* oai,
- WorkerMultikeyPathInfo*) -> Status {
- for (auto&& opPtr : *operationsToApply) {
- operationsApplied.push_back(*opPtr);
- }
- return Status::OK();
- };
createCollection(opCtx, nss, options);
auto op = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss, BSON("a" << 1));
ASSERT_FALSE(op.isForCappedCollection);
NoopOplogApplierObserver observer;
- OplogApplierImpl oplogApplier(
+ TrackOpsAppliedApplier oplogApplier(
nullptr, // executor
nullptr, // oplogBuffer
&observer,
replCoord,
consistencyMarkers,
storageInterface,
- applyOperationFn,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
auto lastOpTime = unittest::assertGet(oplogApplier.multiApply(opCtx, {op}));
ASSERT_EQUALS(op.getOpTime(), lastOpTime);
- ASSERT_EQUALS(1U, operationsApplied.size());
- const auto& opApplied = operationsApplied.front();
+ ASSERT_EQUALS(1U, oplogApplier.operationsApplied.size());
+ const auto& opApplied = oplogApplier.operationsApplied.front();
ASSERT_EQUALS(op, opApplied);
// "isForCappedCollection" is not parsed from raw oplog entry document.
return opApplied.isForCappedCollection;
@@ -397,16 +386,9 @@ TEST_F(OplogApplierImplTest, ApplyGroupUsesApplyOplogEntryBatchToApplyOperation)
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- OplogApplierImpl oplogApplier(nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kSecondary),
- nullptr);
- ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo));
+ TestApplyOplogGroupApplier oplogApplier(
+ nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
+ ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo));
// Collection should be created after applyOplogEntryBatch() processes operation.
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
}
@@ -523,7 +505,6 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionSepar
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
_writerPool.get());
@@ -586,7 +567,6 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionAllAt
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering),
_writerPool.get());
@@ -644,7 +624,6 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionTwoBa
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
_writerPool.get());
@@ -765,7 +744,6 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) {
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
_writerPool.get());
@@ -877,7 +855,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
_writerPool.get());
@@ -936,7 +913,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
_writerPool.get());
@@ -986,7 +962,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
_writerPool.get());
// Apply a batch with the insert operations. This should result in the oplog entries
@@ -1055,7 +1030,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering),
_writerPool.get());
@@ -1109,7 +1083,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
_writerPool.get());
const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
@@ -1150,7 +1123,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
_writerPool.get());
@@ -1200,7 +1172,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
_writerPool.get());
@@ -1239,7 +1210,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
_writerPool.get());
@@ -1294,7 +1264,6 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kRecovering),
_writerPool.get());
@@ -1330,19 +1299,11 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
void testWorkerMultikeyPaths(OperationContext* opCtx,
const OplogEntry& op,
unsigned long numPaths) {
-
- OplogApplierImpl oplogApplier(nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kSecondary),
- nullptr);
+ TestApplyOplogGroupApplier oplogApplier(
+ nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
WorkerMultikeyPathInfo pathInfo;
MultiApplier::OperationPtrs ops = {&op};
- ASSERT_OK(applyOplogGroup(opCtx, &ops, &oplogApplier, &pathInfo));
+ ASSERT_OK(oplogApplier.applyOplogGroup(opCtx, &ops, &pathInfo));
ASSERT_EQ(pathInfo.size(), numPaths);
}
@@ -1396,18 +1357,11 @@ TEST_F(OplogApplierImplTest, ApplyGroupAddsMultipleWorkerMultikeyPathInfo) {
auto docB = BSON("_id" << 2 << "b" << BSON_ARRAY(6 << 7));
auto opB = makeInsertDocumentOplogEntry({Timestamp(Seconds(5), 0), 1LL}, nss, docB);
- OplogApplierImpl oplogApplier(nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kSecondary),
- nullptr);
+ TestApplyOplogGroupApplier oplogApplier(
+ nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
WorkerMultikeyPathInfo pathInfo;
MultiApplier::OperationPtrs ops = {&opA, &opB};
- ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo));
+ ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo));
ASSERT_EQ(pathInfo.size(), 2UL);
}
}
@@ -1448,18 +1402,11 @@ TEST_F(OplogApplierImplTest, ApplyGroupFailsWhenCollectionCreationTriesToMakeUUI
auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
- OplogApplierImpl oplogApplier(nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kSecondary),
- nullptr);
+ TestApplyOplogGroupApplier oplogApplier(
+ nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
MultiApplier::OperationPtrs ops = {&op};
ASSERT_EQUALS(ErrorCodes::InvalidOptions,
- applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, nullptr));
+ oplogApplier.applyOplogGroup(_opCtx.get(), &ops, nullptr));
}
TEST_F(OplogApplierImplTest, ApplyGroupDisablesDocumentValidationWhileApplyingOperations) {
@@ -1821,7 +1768,8 @@ TEST_F(OplogApplierImplTest,
}
TEST_F(OplogApplierImplTest, ApplyGroupIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) {
- OplogApplierImplForTest oplogApplier;
+ TestApplyOplogGroupApplier oplogApplier(
+ nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kInitialSync));
NamespaceString nss("test.t");
{
Lock::GlobalWrite globalLock(_opCtx.get());
@@ -1835,7 +1783,7 @@ TEST_F(OplogApplierImplTest, ApplyGroupIgnoresUpdateOperationIfDocumentIsMissing
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo));
+ ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo));
// Since the document was missing when we cloned data from the sync source, the collection
// referenced by the failed operation should not be automatically created.
@@ -1844,7 +1792,8 @@ TEST_F(OplogApplierImplTest, ApplyGroupIgnoresUpdateOperationIfDocumentIsMissing
TEST_F(OplogApplierImplTest, ApplyGroupSkipsDocumentOnNamespaceNotFoundDuringInitialSync) {
BSONObj emptyDoc;
- OplogApplierImplForTest oplogApplier;
+ TestApplyOplogGroupApplier oplogApplier(
+ nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kInitialSync));
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad");
auto doc1 = BSON("_id" << 1);
@@ -1856,7 +1805,7 @@ TEST_F(OplogApplierImplTest, ApplyGroupSkipsDocumentOnNamespaceNotFoundDuringIni
auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo));
+ ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo));
CollectionReader collectionReader(_opCtx.get(), nss);
ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next()));
@@ -1866,7 +1815,8 @@ TEST_F(OplogApplierImplTest, ApplyGroupSkipsDocumentOnNamespaceNotFoundDuringIni
TEST_F(OplogApplierImplTest, ApplyGroupSkipsIndexCreationOnNamespaceNotFoundDuringInitialSync) {
BSONObj emptyDoc;
- OplogApplierImplForTest oplogApplier;
+ TestApplyOplogGroupApplier oplogApplier(
+ nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kInitialSync));
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad");
auto doc1 = BSON("_id" << 1);
@@ -1880,7 +1830,7 @@ TEST_F(OplogApplierImplTest, ApplyGroupSkipsIndexCreationOnNamespaceNotFoundDuri
auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(applyOplogGroup(_opCtx.get(), &ops, &oplogApplier, &pathInfo));
+ ASSERT_OK(oplogApplier.applyOplogGroup(_opCtx.get(), &ops, &pathInfo));
CollectionReader collectionReader(_opCtx.get(), nss);
ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next()));
@@ -2478,7 +2428,6 @@ TEST_F(OplogApplierImplTxnTableTest, SimpleWriteWithTxn) {
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
@@ -2519,7 +2468,6 @@ TEST_F(OplogApplierImplTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable)
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
@@ -2574,7 +2522,6 @@ TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDelet
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
@@ -2616,7 +2563,6 @@ TEST_F(OplogApplierImplTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdat
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
@@ -2681,7 +2627,6 @@ TEST_F(OplogApplierImplTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnS
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
@@ -2751,7 +2696,6 @@ TEST_F(OplogApplierImplTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnS
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
@@ -2824,7 +2768,6 @@ TEST_F(OplogApplierImplTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
@@ -2908,7 +2851,6 @@ TEST_F(OplogApplierImplTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnT
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
@@ -2941,7 +2883,6 @@ TEST_F(OplogApplierImplTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable)
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
@@ -2976,7 +2917,6 @@ TEST_F(OplogApplierImplTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTabl
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
getStorageInterface(),
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
index dee065f19b6..bf21ae4ad84 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
@@ -204,22 +204,16 @@ Status OplogApplierImplTest::runOpSteadyState(const OplogEntry& op) {
}
Status OplogApplierImplTest::runOpsSteadyState(std::vector<OplogEntry> ops) {
- OplogApplierImpl oplogApplier(
- nullptr, // executor
- nullptr, // oplogBuffer
- nullptr, // observer
- nullptr, // replCoord
+ TestApplyOplogGroupApplier oplogApplier(
getConsistencyMarkers(),
getStorageInterface(),
- OplogApplierImpl::ApplyGroupFunc(),
- repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
- nullptr);
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary));
MultiApplier::OperationPtrs opsPtrs;
for (auto& op : ops) {
opsPtrs.push_back(&op);
}
WorkerMultikeyPathInfo pathInfo;
- return applyOplogGroup(_opCtx.get(), &opsPtrs, &oplogApplier, &pathInfo);
+ return oplogApplier.applyOplogGroup(_opCtx.get(), &opsPtrs, &pathInfo);
}
Status OplogApplierImplTest::runOpInitialSync(const OplogEntry& op) {
@@ -237,7 +231,6 @@ Status OplogApplierImplTest::runOpsInitialSync(std::vector<OplogEntry> ops) {
ReplicationCoordinator::get(_opCtx.get()),
getConsistencyMarkers(),
storageInterface,
- applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
writerPool.get());
// Idempotency tests apply the same batch of oplog entries multiple times in a loop, which would
diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h
index 76e060faabc..2d632351b62 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.h
+++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.h
@@ -45,6 +45,26 @@ class BSONObj;
class OperationContext;
namespace repl {
+
+/**
+ * Test only subclass of OplogApplierImpl that makes applyOplogGroup a public method.
+ */
+class TestApplyOplogGroupApplier : public OplogApplierImpl {
+public:
+ TestApplyOplogGroupApplier(ReplicationConsistencyMarkers* consistencyMarkers,
+ StorageInterface* storageInterface,
+ const OplogApplier::Options& options)
+ : OplogApplierImpl(nullptr,
+ nullptr,
+ nullptr,
+ nullptr,
+ consistencyMarkers,
+ storageInterface,
+ options,
+ nullptr) {}
+ using OplogApplierImpl::applyOplogGroup;
+};
+
/**
* OpObserver for OplogApplierImpl test fixture.
*/
@@ -116,14 +136,6 @@ protected:
ServiceContext* serviceContext;
OplogApplierImplOpObserver* _opObserver = nullptr;
- // Implements the OplogApplierImpl::ApplyGroupFn interface and does nothing.
- static Status noopApplyOperationFn(OperationContext*,
- MultiApplier::OperationPtrs*,
- OplogApplierImpl* oai,
- WorkerMultikeyPathInfo*) {
- return Status::OK();
- }
-
OpTime nextOpTime() {
static long long lastSecond = 1;
return OpTime(Timestamp(Seconds(lastSecond++), 0), 1LL);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 1c7603ca075..5be36ed3881 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -223,7 +223,6 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
replCoord,
_replicationProcess->getConsistencyMarkers(),
_storageInterface,
- applyOplogGroup,
OplogApplier::Options(OplogApplication::Mode::kSecondary),
_writerPool.get());
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index a38c75a3c6d..95f05e9d52c 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -363,7 +363,6 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
ReplicationCoordinator::get(opCtx),
_consistencyMarkers,
_storageInterface,
- applyOplogGroup,
OplogApplier::Options(OplogApplication::Mode::kRecovering),
writerPool.get());
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 7981f2abdf5..d0923437470 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -59,6 +59,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/oplog_applier_impl.h"
+#include "mongo/db/repl/oplog_applier_impl_test_fixture.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -1312,7 +1313,6 @@ public:
_coordinatorMock,
_consistencyMarkers,
storageInterface,
- repl::applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
writerPool.get());
ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)));
@@ -1397,7 +1397,6 @@ public:
_coordinatorMock,
_consistencyMarkers,
storageInterface,
- repl::applyOplogGroup,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kInitialSync),
writerPool.get());
auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops));
@@ -2460,6 +2459,76 @@ public:
}
};
+/**
+ * Test specific OplogApplierImpl subclass that allows for custom applyOplogGroup to be run during
+ * multiApply.
+ */
+class SecondaryReadsDuringBatchApplicationAreAllowedApplier : public repl::OplogApplierImpl {
+public:
+ SecondaryReadsDuringBatchApplicationAreAllowedApplier(
+ executor::TaskExecutor* executor,
+ repl::OplogBuffer* oplogBuffer,
+ Observer* observer,
+ repl::ReplicationCoordinator* replCoord,
+ repl::ReplicationConsistencyMarkers* consistencyMarkers,
+ repl::StorageInterface* storageInterface,
+ const OplogApplier::Options& options,
+ ThreadPool* writerPool,
+ OperationContext* opCtx,
+ Promise<bool>* promise,
+ stdx::future<bool>* taskFuture)
+ : repl::OplogApplierImpl(executor,
+ oplogBuffer,
+ observer,
+ replCoord,
+ consistencyMarkers,
+ storageInterface,
+ options,
+ writerPool),
+ _testOpCtx(opCtx),
+ _promise(promise),
+ _taskFuture(taskFuture) {}
+
+ Status applyOplogGroup(OperationContext* opCtx,
+ repl::MultiApplier::OperationPtrs* operationsToApply,
+ WorkerMultikeyPathInfo* pathInfo) override;
+
+private:
+ // Pointer to the test's op context. This is distinct from the op context used in
+ // applyOplogGroup.
+ OperationContext* _testOpCtx;
+ Promise<bool>* _promise;
+ stdx::future<bool>* _taskFuture;
+};
+
+
+// This apply operation function will block until the reader has tried acquiring a collection lock.
+// This returns BadValue statuses instead of asserting so that the worker threads can cleanly exit
+// and this test case fails without crashing the entire suite.
+Status SecondaryReadsDuringBatchApplicationAreAllowedApplier::applyOplogGroup(
+ OperationContext* opCtx,
+ repl::MultiApplier::OperationPtrs* operationsToApply,
+ WorkerMultikeyPathInfo* pathInfo) {
+ if (!_testOpCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode, MODE_X)) {
+ return {ErrorCodes::BadValue, "Batch applied was not holding PBWM lock in MODE_X"};
+ }
+
+ // Insert the document. A reader without a PBWM lock should not see it yet.
+ auto status = OplogApplierImpl::applyOplogGroup(opCtx, operationsToApply, pathInfo);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ // Signals the reader to acquire a collection read lock.
+ _promise->emplaceValue(true);
+
+ // Block while holding the PBWM lock until the reader is done.
+ if (!_taskFuture->get()) {
+ return {ErrorCodes::BadValue, "Client was holding PBWM lock in MODE_IS"};
+ }
+ return Status::OK();
+}
+
class SecondaryReadsDuringBatchApplicationAreAllowed : public StorageTimestampTest {
public:
void run() {
@@ -2498,55 +2567,28 @@ public:
taskThread.join();
});
- // This apply operation function will block until the reader has tried acquiring a
- // collection lock. This returns BadValue statuses instead of asserting so that the worker
- // threads can cleanly exit and this test case fails without crashing the entire suite.
- auto applyOperationFn = [&](OperationContext* opCtx,
- std::vector<const repl::OplogEntry*>* operationsToApply,
- repl::OplogApplierImpl* oa,
- std::vector<MultikeyPathInfo>* pathInfo) -> Status {
- if (!_opCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode,
- MODE_X)) {
- return {ErrorCodes::BadValue, "Batch applied was not holding PBWM lock in MODE_X"};
- }
-
- // Insert the document. A reader without a PBWM lock should not see it yet.
- auto status = repl::applyOplogGroup(opCtx, operationsToApply, oa, pathInfo);
- if (!status.isOK()) {
- return status;
- }
-
- // Signals the reader to acquire a collection read lock.
- batchInProgress.promise.emplaceValue(true);
-
- // Block while holding the PBWM lock until the reader is done.
- if (!taskFuture.get()) {
- return {ErrorCodes::BadValue, "Client was holding PBWM lock in MODE_IS"};
- }
- return Status::OK();
- };
-
// Make a simple insert operation.
BSONObj doc0 = BSON("_id" << 0 << "a" << 0);
auto insertOp = repl::OplogEntry(BSON("ts" << futureTs << "t" << 1LL << "v" << 2 << "op"
<< "i"
<< "ns" << ns.ns() << "ui" << uuid << "wall"
<< Date_t() << "o" << doc0));
-
DoNothingOplogApplierObserver observer;
// Apply the operation.
auto storageInterface = repl::StorageInterface::get(_opCtx);
auto writerPool = repl::makeReplWriterPool(1);
- repl::OplogApplierImpl oplogApplier(
+ SecondaryReadsDuringBatchApplicationAreAllowedApplier oplogApplier(
nullptr, // task executor. not required for multiApply().
nullptr, // oplog buffer. not required for multiApply().
&observer,
_coordinatorMock,
_consistencyMarkers,
storageInterface,
- applyOperationFn,
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
- writerPool.get());
+ writerPool.get(),
+ _opCtx,
+ &(batchInProgress.promise),
+ &taskFuture);
auto lastOpTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, {insertOp}));
ASSERT_EQ(insertOp.getOpTime(), lastOpTime);