summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMihai Andrei <mihai.andrei@mongodb.com>2019-09-12 19:59:08 +0000
committerevergreen <evergreen@mongodb.com>2019-09-12 19:59:08 +0000
commite7fa33abb338eda55653fe61665b5f17cc22b4d3 (patch)
tree321d255edc14c674a4d49df1e9f89efbd90eb344 /src
parentb40e542972c082e85098c09298eb56436bf57abb (diff)
downloadmongo-e7fa33abb338eda55653fe61665b5f17cc22b4d3.tar.gz
SERVER-42995 Remove redundant SyncTail methods
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp2
-rw-r--r--src/mongo/db/repl/sync_tail.cpp205
-rw-r--r--src/mongo/db/repl/sync_tail.h27
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp18
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp16
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.h4
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp5
7 files changed, 154 insertions, 123 deletions
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index e6405c3fe11..860505e3e7c 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -52,7 +52,7 @@ void OplogApplierImpl::_run(OplogBuffer* oplogBuffer) {
auto getNextApplierBatchFn = [this](OperationContext* opCtx, const BatchLimits& batchLimits) {
return getNextApplierBatch(opCtx, batchLimits);
};
- _syncTail.oplogApplication(oplogBuffer, getNextApplierBatchFn, _replCoord);
+ _syncTail.runLoop(oplogBuffer, getNextApplierBatchFn, _replCoord);
}
void OplogApplierImpl::_shutdown() {
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 165ab31c7a3..af355139e1e 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -463,6 +463,69 @@ void tryToGoLiveAsASecondary(OperationContext* opCtx,
}
}
+/**
+ * Updates a CRUD op's hash and isForCappedCollection field if necessary.
+ */
+void processCrudOp(OperationContext* opCtx,
+ OplogEntry* op,
+ uint32_t* hash,
+ StringMapHashedKey* hashedNs) {
+ const auto serviceContext = opCtx->getServiceContext();
+ const auto storageEngine = serviceContext->getStorageEngine();
+ const bool supportsDocLocking = storageEngine->supportsDocLocking();
+ CachedCollectionProperties collPropertiesCache;
+ auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, *hashedNs);
+
+ // For doc locking engines, include the _id of the document in the hash so we get
+ // parallelism even if all writes are to a single collection.
+ //
+ // For capped collections, this is illegal, since capped collections must preserve
+ // insertion order.
+ if (supportsDocLocking && !collProperties.isCapped) {
+ BSONElement id = op->getIdElement();
+ BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore,
+ collProperties.collator);
+ const size_t idHash = elementHasher.hash(id);
+ MurmurHash3_x86_32(&idHash, sizeof(idHash), *hash, hash);
+ }
+
+ if (op->getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) {
+ // Mark capped collection ops before storing them to ensure we do not attempt to
+ // bulk insert them.
+ op->isForCappedCollection = true;
+ }
+}
+
+/**
+ * Adds a single oplog entry to the appropriate writer vector.
+ */
+void addToWriterVector(OplogEntry* op,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ uint32_t hash) {
+ const uint32_t numWriters = writerVectors->size();
+ auto& writer = (*writerVectors)[hash % numWriters];
+ if (writer.empty()) {
+ writer.reserve(8); // Skip a few growth rounds
+ }
+ writer.push_back(op);
+}
+
+/**
+ * Adds a set of derivedOps to writerVectors.
+ */
+void addDerivedOps(OperationContext* opCtx,
+ MultiApplier::Operations* derivedOps,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors) {
+ for (auto&& op : *derivedOps) {
+ auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns());
+ uint32_t hash = static_cast<uint32_t>(hashedNs.hash());
+ if (op.isCrudOpType()) {
+ processCrudOp(opCtx, &op, &hash, &hashedNs);
+ }
+ addToWriterVector(&op, writerVectors, hash);
+ }
+}
+
} // namespace
class SyncTail::OpQueueBatcher {
@@ -627,20 +690,15 @@ private:
stdx::thread _thread; // Must be last so all other members are initialized before starting.
};
-void SyncTail::oplogApplication(OplogBuffer* oplogBuffer,
- OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
- ReplicationCoordinator* replCoord) {
+void SyncTail::runLoop(OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
+ ReplicationCoordinator* replCoord) {
// We don't start data replication for arbiters at all and it's not allowed to reconfig
// arbiterOnly field for any member.
invariant(!replCoord->getMemberState().arbiter());
OpQueueBatcher batcher(this, _storageInterface, oplogBuffer, getNextApplierBatchFn);
- _oplogApplication(replCoord, &batcher);
-}
-
-void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
- OpQueueBatcher* batcher) noexcept {
std::unique_ptr<ApplyBatchFinalizer> finalizer{
getGlobalServiceContext()->getStorageEngine()->isDurable()
? new ApplyBatchFinalizerForJournal(replCoord)
@@ -683,7 +741,7 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
// ready in time, we'll loop again so we can do the above checks periodically.
- OpQueue ops = batcher->getNextBatch(Seconds(1));
+ OpQueue ops = batcher.getNextBatch(Seconds(1));
if (ops.empty()) {
if (ops.mustShutdown()) {
// Shut down and exit oplog application loop.
@@ -780,6 +838,10 @@ bool SyncTail::inShutdown() const {
Status syncApply(OperationContext* opCtx,
const OplogEntryBatch& batch,
OplogApplication::Mode oplogApplicationMode) {
+ // Guarantees that syncApply's context matches that of its calling function, multiSyncApply.
+ invariant(!opCtx->writesAreReplicated());
+ invariant(documentValidationDisabled(opCtx));
+
auto op = batch.getOp();
// Count each log op application as a separate operation, for reporting purposes
CurOp individualOp(opCtx);
@@ -789,11 +851,6 @@ Status syncApply(OperationContext* opCtx,
auto incrementOpsAppliedStats = [] { opsAppliedStats.increment(1); };
auto applyOp = [&](Database* db) {
- // For non-initial-sync, we convert updates to upserts
- // to suppress errors when replaying oplog entries.
- UnreplicatedWritesBlock uwb(opCtx);
- DisableDocumentValidation validationDisabler(opCtx);
-
// We convert updates to upserts when not in initial sync because after rollback and during
// startup we may replay an update after a delete and crash since we do not ignore
// errors. In initial sync we simply ignore these update errors so there is no reason to
@@ -974,18 +1031,13 @@ Status multiSyncApply(OperationContext* opCtx,
* with transactions.
* sessionUpdateTracker - if provided, keeps track of session info from ops.
*/
-void SyncTail::_fillWriterVectors(OperationContext* opCtx,
- MultiApplier::Operations* ops,
- std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker) noexcept {
- const auto serviceContext = opCtx->getServiceContext();
- const auto storageEngine = serviceContext->getStorageEngine();
+void SyncTail::_deriveOpsAndFillWriterVectors(
+ OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ std::vector<MultiApplier::Operations>* derivedOps,
+ SessionUpdateTracker* sessionUpdateTracker) noexcept {
- const bool supportsDocLocking = storageEngine->supportsDocLocking();
- const uint32_t numWriters = writerVectors->size();
-
- CachedCollectionProperties collPropertiesCache;
LogicalSessionIdMap<std::vector<OplogEntry*>> partialTxnOps;
for (auto&& op : *ops) {
@@ -1006,10 +1058,11 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
if (sessionUpdateTracker) {
if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) {
derivedOps->emplace_back(std::move(*newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ addDerivedOps(opCtx, &derivedOps->back(), writerVectors);
}
}
+
// If this entry is part of a multi-oplog-entry transaction, ignore it until the commit.
// We must save it here because we are not guaranteed it has been written to the oplog
// yet.
@@ -1030,29 +1083,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
partialTxnList.clear();
}
- if (op.isCrudOpType()) {
- auto collProperties = collPropertiesCache.getCollectionProperties(opCtx, hashedNs);
-
- // For doc locking engines, include the _id of the document in the hash so we get
- // parallelism even if all writes are to a single collection.
- //
- // For capped collections, this is illegal, since capped collections must preserve
- // insertion order.
- if (supportsDocLocking && !collProperties.isCapped) {
- BSONElement id = op.getIdElement();
- BSONElementComparator elementHasher(BSONElementComparator::FieldNamesMode::kIgnore,
- collProperties.collator);
- const size_t idHash = elementHasher.hash(id);
- MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
- }
-
- if (op.getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) {
- // Mark capped collection ops before storing them to ensure we do not attempt to
- // bulk insert them.
- op.isForCappedCollection = true;
- }
- }
-
+ if (op.isCrudOpType())
+ processCrudOp(opCtx, &op, &hash, &hashedNs);
// Extract applyOps operations and fill writers with extracted operations using this
// function.
if (op.isTerminalApplyOps()) {
@@ -1070,14 +1102,15 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
partialTxnList.clear();
// Transaction entries cannot have different session updates.
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ addDerivedOps(opCtx, &derivedOps->back(), writerVectors);
} else {
// The applyOps entry was not generated as part of a transaction.
invariant(!op.getPrevWriteOpTimeInTransaction());
+
derivedOps->emplace_back(ApplyOps::extractOperations(op));
// Nested entries cannot have different session updates.
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ addDerivedOps(opCtx, &derivedOps->back(), writerVectors);
}
continue;
}
@@ -1093,15 +1126,11 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
partialTxnList.clear();
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ addDerivedOps(opCtx, &derivedOps->back(), writerVectors);
continue;
}
- auto& writer = (*writerVectors)[hash % numWriters];
- if (writer.empty()) {
- writer.reserve(8); // Skip a few growth rounds
- }
- writer.push_back(&op);
+ addToWriterVector(&op, writerVectors, hash);
}
}
@@ -1109,41 +1138,15 @@ void SyncTail::fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps) noexcept {
+
SessionUpdateTracker sessionUpdateTracker;
- _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
+ _deriveOpsAndFillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
auto newOplogWrites = sessionUpdateTracker.flushAll();
if (!newOplogWrites.empty()) {
derivedOps->emplace_back(std::move(newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
- }
-}
-
-void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors,
- std::vector<Status>* statusVector,
- std::vector<WorkerMultikeyPathInfo>* workerMultikeyPathInfo) {
- invariant(writerVectors.size() == statusVector->size());
- for (size_t i = 0; i < writerVectors.size(); i++) {
- if (writerVectors[i].empty())
- continue;
-
- _writerPool->schedule(
- [this,
- &writer = writerVectors.at(i),
- &status = statusVector->at(i),
- &workerMultikeyPathInfo = workerMultikeyPathInfo->at(i)](auto scheduleStatus) {
- invariant(scheduleStatus);
-
- auto opCtx = cc().makeOperationContext();
-
- // This code path is only executed on secondaries and initial syncing nodes, so it
- // is safe to exclude any writes from Flow Control.
- opCtx->setShouldParticipateInFlowControl(false);
-
- status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
- return _applyFunc(opCtx.get(), &writer, this, &workerMultikeyPathInfo);
- });
- });
+ _deriveOpsAndFillWriterVectors(
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
}
}
@@ -1212,7 +1215,33 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
{
std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
- _applyOps(writerVectors, &statusVector, &multikeyVector);
+
+ // Doles out all the work to the writer pool threads. writerVectors is not modified,
+ // but multiSyncApply will modify the vectors that it contains.
+ invariant(writerVectors.size() == statusVector.size());
+ for (size_t i = 0; i < writerVectors.size(); i++) {
+ if (writerVectors[i].empty())
+ continue;
+
+ _writerPool->schedule(
+ [this,
+ &writer = writerVectors.at(i),
+ &status = statusVector.at(i),
+ &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) {
+ invariant(scheduleStatus);
+
+ auto opCtx = cc().makeOperationContext();
+
+ // This code path is only executed on secondaries and initial syncing nodes,
+ // so it is safe to exclude any writes from Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
+
+ status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
+ return _applyFunc(opCtx.get(), &writer, this, &multikeyVector);
+ });
+ });
+ }
+
_writerPool->waitForIdle();
// If any of the statuses is not ok, return error.
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index b619851738c..770663e9dab 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -99,9 +99,9 @@ public:
* Retrieves operations from the OplogBuffer in batches that will be applied in parallel using
* multiApply().
*/
- void oplogApplication(OplogBuffer* oplogBuffer,
- OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
- ReplicationCoordinator* replCoord);
+ void runLoop(OplogBuffer* oplogBuffer,
+ OplogApplier::GetNextApplierBatchFn getNextApplierBatchFn,
+ ReplicationCoordinator* replCoord);
/**
* Shuts down oplogApplication() processing.
@@ -219,26 +219,15 @@ public:
private:
class OpQueueBatcher;
- void _oplogApplication(ReplicationCoordinator* replCoord, OpQueueBatcher* batcher) noexcept;
-
- void _fillWriterVectors(OperationContext* opCtx,
- MultiApplier::Operations* ops,
- std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker) noexcept;
-
- /**
- * Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes
- * non-const pointers to inner vectors into func.
- */
- void _applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors,
- std::vector<Status>* statusVector,
- std::vector<WorkerMultikeyPathInfo>* workerMultikeyPathInfo);
-
OplogApplier::Observer* const _observer;
ReplicationConsistencyMarkers* const _consistencyMarkers;
StorageInterface* const _storageInterface;
+ void _deriveOpsAndFillWriterVectors(OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ std::vector<MultiApplier::Operations>* derivedOps,
+ SessionUpdateTracker* sessionUpdateTracker) noexcept;
// Function to use during applyOps
MultiSyncApplyFunc _applyFunc;
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index bd445689df9..4c8f44ad8d8 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -191,7 +191,7 @@ auto parseFromOplogEntryArray(const BSONObj& obj, int elem) {
TEST_F(SyncTailTest, SyncApplyInsertDocumentDatabaseMissing) {
NamespaceString nss("test.t");
auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
- ASSERT_THROWS(syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary),
+ ASSERT_THROWS(_syncApplyWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary),
ExceptionFor<ErrorCodes::NamespaceNotFound>);
}
@@ -206,7 +206,7 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLookupByUUIDFails) {
createDatabase(_opCtx.get(), nss.db());
NamespaceString otherNss(nss.getSisterNS("othername"));
auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, kUuid);
- ASSERT_THROWS(syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary),
+ ASSERT_THROWS(_syncApplyWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary),
ExceptionFor<ErrorCodes::NamespaceNotFound>);
}
@@ -225,7 +225,7 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) {
// which in the case of this test just ignores such errors. This tests mostly that we don't
// implicitly create the collection and lock the database in MODE_X.
auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
- ASSERT_THROWS(syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary),
+ ASSERT_THROWS(_syncApplyWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary),
ExceptionFor<ErrorCodes::NamespaceNotFound>);
ASSERT_FALSE(collectionExists(_opCtx.get(), nss));
}
@@ -292,15 +292,11 @@ TEST_F(SyncTailTest, SyncApplyCommand) {
applyCmdCalled = true;
ASSERT_TRUE(opCtx);
ASSERT_TRUE(opCtx->lockState()->isDbLockedForMode(nss.db(), MODE_X));
- ASSERT_TRUE(opCtx->writesAreReplicated());
- ASSERT_FALSE(documentValidationDisabled(opCtx));
ASSERT_EQUALS(nss, collNss);
return Status::OK();
};
- ASSERT_TRUE(_opCtx->writesAreReplicated());
- ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
auto entry = OplogEntry(op);
- ASSERT_OK(syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kInitialSync));
+ ASSERT_OK(_syncApplyWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kInitialSync));
ASSERT_TRUE(applyCmdCalled);
}
@@ -2199,7 +2195,7 @@ TEST_F(SyncTailTest, LogSlowOpApplicationWhenSuccessful) {
auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
startCapturingLogMessages();
- ASSERT_OK(syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary));
+ ASSERT_OK(_syncApplyWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary));
// Use a builder for easier escaping. We expect the operation to be logged.
StringBuilder expected;
@@ -2221,7 +2217,7 @@ TEST_F(SyncTailTest, DoNotLogSlowOpApplicationWhenFailed) {
auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
startCapturingLogMessages();
- ASSERT_THROWS(syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary),
+ ASSERT_THROWS(_syncApplyWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary),
ExceptionFor<ErrorCodes::NamespaceNotFound>);
// Use a builder for easier escaping. We expect the operation to *not* be logged
@@ -2245,7 +2241,7 @@ TEST_F(SyncTailTest, DoNotLogNonSlowOpApplicationWhenSuccessful) {
auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
startCapturingLogMessages();
- ASSERT_OK(syncApply(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary));
+ ASSERT_OK(_syncApplyWrapper(_opCtx.get(), &entry, OplogApplication::Mode::kSecondary));
// Use a builder for easier escaping. We expect the operation to *not* be logged,
// since it wasn't slow to apply.
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 0af98d628b6..4bca63586f3 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -138,6 +138,16 @@ StorageInterface* SyncTailTest::getStorageInterface() const {
return StorageInterface::get(serviceContext);
}
+// Since syncApply is being tested outside of its calling function (multiSyncApply), we recreate the
+// necessary calling context.
+Status SyncTailTest::_syncApplyWrapper(OperationContext* opCtx,
+ const OplogEntryBatch& batch,
+ OplogApplication::Mode oplogApplicationMode) {
+ UnreplicatedWritesBlock uwb(opCtx);
+ DisableDocumentValidation validationDisabler(opCtx);
+ return syncApply(opCtx, batch, oplogApplicationMode);
+}
+
void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
const OplogEntry& op,
bool expectedApplyOpCalled) {
@@ -176,9 +186,9 @@ void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
ASSERT_BSONOBJ_EQ(op.getObject(), *deletedDoc);
return Status::OK();
};
- ASSERT_TRUE(_opCtx->writesAreReplicated());
- ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
- ASSERT_EQ(syncApply(_opCtx.get(), &op, OplogApplication::Mode::kSecondary), expectedError);
+
+ ASSERT_EQ(_syncApplyWrapper(_opCtx.get(), &op, OplogApplication::Mode::kSecondary),
+ expectedError);
ASSERT_EQ(applyOpCalled, expectedApplyOpCalled);
}
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h
index 4cac4f77bbf..4f2965527d1 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.h
+++ b/src/mongo/db/repl/sync_tail_test_fixture.h
@@ -108,6 +108,10 @@ protected:
const OplogEntry& op,
bool expectedApplyOpCalled);
+ Status _syncApplyWrapper(OperationContext* opCtx,
+ const OplogEntryBatch& batch,
+ OplogApplication::Mode oplogApplicationMode);
+
ServiceContext::UniqueOperationContext _opCtx;
std::unique_ptr<ReplicationConsistencyMarkers> _consistencyMarkers;
ServiceContext* serviceContext;
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index e0086ba1160..597b84fde43 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/drop_database.h"
#include "mongo/db/catalog/drop_indexes.h"
#include "mongo/db/catalog/index_catalog.h"
@@ -738,8 +739,10 @@ public:
class SecondaryArrayInsertTimes : public StorageTimestampTest {
public:
void run() {
- // In order for oplog application to assign timestamps, we must be in non-replicated mode.
+ // In order for oplog application to assign timestamps, we must be in non-replicated mode
+ // and disable document validation.
repl::UnreplicatedWritesBlock uwb(_opCtx);
+ DisableDocumentValidation validationDisabler(_opCtx);
// Create a new collection.
NamespaceString nss("unittests.timestampedUpdates");