summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/replsets/invalidate_images_when_minvalid.js162
-rw-r--r--src/mongo/db/repl/apply_ops.cpp12
-rw-r--r--src/mongo/db/repl/insert_group.cpp6
-rw-r--r--src/mongo/db/repl/insert_group.h4
-rw-r--r--src/mongo/db/repl/oplog.cpp28
-rw-r--r--src/mongo/db/repl/oplog.h1
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp43
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.h6
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp35
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp7
-rw-r--r--src/mongo/db/repl/oplog_applier_utils.cpp10
-rw-r--r--src/mongo/db/repl/oplog_applier_utils.h2
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.cpp16
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.h3
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp9
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp5
-rw-r--r--src/mongo/dbtests/repltests.cpp3
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp19
18 files changed, 307 insertions, 64 deletions
diff --git a/jstests/replsets/invalidate_images_when_minvalid.js b/jstests/replsets/invalidate_images_when_minvalid.js
new file mode 100644
index 00000000000..dc4c1d8a3b3
--- /dev/null
+++ b/jstests/replsets/invalidate_images_when_minvalid.js
@@ -0,0 +1,162 @@
+/**
+ * When a minvalid timestamp `T` is set, applying a batch of operations earlier than `T` does not have
+ * a consistent snapshot of data. In this case, we must write an invalidated document to the
+ * `config.image_collection` for a retryable `findAndModify` that does not store images in the
+ * oplog. This test ensures this behavior.
+ *
+ * To test this we:
+ * -- Hold the stable timestamp back on a primary
+ * -- Perform a number of retryable `findAndModify`s.
+ * -- Manually set `minvalid` to a value in between the performed `findAndModify`s.
+ * -- Restart the primary
+ * -- Verify that operations after the `minvalid` timestamp are marked "valid" and those prior are
+ * marked "invalid". We set the `replBatchLimitOperations` to one to achieve this. This is
+ * necessary for the test due to manually setting minvalid. In production `minvalid` should
+ * always be unset, or set to the top of oplog.
+ *
+ * Because we restart the node, this only works on storage engines that persist data.
+ *
+ * @tags: [multiversion_incompatible, requires_majority_read_concern, requires_persistence]
+ */
+
+// Skip the db hash check because the replica set cannot reach a consistent state.
+TestData.skipCheckDBHashes = true;
+
+(function() {
+"use strict";
+
+let replTest = new ReplSetTest({name: "invalidate_images_when_minvalid", nodes: 1});
+
+let nodes = replTest.startSet({
+ setParameter: {
+ featureFlagRetryableFindAndModify: true,
+ storeFindAndModifyImagesInSideCollection: true,
+ replBatchLimitOperations: 1
+ }
+});
+replTest.initiate();
+let primary = replTest.getPrimary();
+let coll = primary.getDB("test")["invalidating"];
+let images = primary.getDB("config")["image_collection"];
+let minvalid = primary.getDB("local")["replset.minvalid"];
+
+// Pause the WT stable timestamp to have a restart perform replication recovery on every operation
+// in the test.
+assert.commandWorked(primary.adminCommand({
+ "configureFailPoint": 'WTPauseStableTimestamp',
+ "mode": 'alwaysOn',
+}));
+
+function doRetryableFindAndModify(lsid, query, postImage, remove) {
+ // Performs a retryable findAndModify. The document matched by query must exist. The
+    // findAndModify will either be an update that sets the document's `updated: 1` or
+ // removes the document. Returns the timestamp associated with the generated oplog entry.
+ //
+ // `postImage` and `remove` are booleans. The server rejects removes that ask for a post image.
+ let cmd = {
+ findandmodify: coll.getName(),
+ lsid: {id: lsid},
+ txnNumber: NumberLong(1),
+ stmtId: NumberInt(0),
+ query: query,
+ new: postImage,
+ upsert: false,
+ };
+
+ if (remove) {
+ cmd["remove"] = true;
+ } else {
+ cmd["update"] = {$set: {updated: 1}};
+ }
+
+ return assert.commandWorked(coll.runCommand(cmd))["operationTime"];
+}
+
+// Each write contains arguments for calling `doRetryableFindAndModify`.
+let invalidatedWrites = [
+ {uuid: UUID(), query: {_id: 1}, postImage: false, remove: false},
+ {uuid: UUID(), query: {_id: 2}, postImage: true, remove: false},
+ {uuid: UUID(), query: {_id: 3}, postImage: false, remove: true}
+];
+let validWrites = [
+ {uuid: UUID(), query: {_id: 4}, postImage: false, remove: false},
+ {uuid: UUID(), query: {_id: 5}, postImage: true, remove: false},
+ {uuid: UUID(), query: {_id: 6}, postImage: false, remove: true}
+];
+
+// Insert each document a query should match.
+for (let idx = 0; idx < invalidatedWrites.length; ++idx) {
+ assert.commandWorked(coll.insert(invalidatedWrites[idx]["query"]));
+}
+for (let idx = 0; idx < validWrites.length; ++idx) {
+ assert.commandWorked(coll.insert(validWrites[idx]["query"]));
+}
+
+// Perform `findAndModify`s. Record the timestamp of the last `invalidatedWrites` to set minvalid
+// with.
+let lastInvalidatedImageTs = null;
+for (let idx = 0; idx < invalidatedWrites.length; ++idx) {
+ let write = invalidatedWrites[idx];
+ lastInvalidatedImageTs = doRetryableFindAndModify(
+ write['uuid'], write['query'], write['postImage'], write['remove']);
+}
+for (let idx = 0; idx < validWrites.length; ++idx) {
+ let write = validWrites[idx];
+ doRetryableFindAndModify(write['uuid'], write['query'], write['postImage'], write['remove']);
+}
+
+let imageDocs = [];
+images.find().forEach((x) => {
+ imageDocs.push(x);
+});
+
+jsTestLog({"MinValid": lastInvalidatedImageTs, "Pre-restart images": imageDocs});
+assert.commandWorked(minvalid.update({}, {$set: {ts: lastInvalidatedImageTs}}));
+
+replTest.restart(primary, undefined, true);
+
+primary = replTest.getPrimary();
+coll = primary.getDB("test")["invalidating"];
+images = primary.getDB("config")["image_collection"];
+minvalid = primary.getDB("local")["replset.minvalid"];
+
+imageDocs = [];
+images.find().forEach((x) => {
+ imageDocs.push(x);
+});
+jsTestLog({"Post-restart images": imageDocs});
+
+for (let idx = 0; idx < invalidatedWrites.length; ++idx) {
+ let write = invalidatedWrites[idx];
+ let image = images.findOne({"_id.id": write["uuid"]});
+
+ assert.eq(1, image["txnNum"]);
+ assert.eq(true, image["invalidated"]);
+ assert.eq("minvalid suggests inconsistent snapshot", image["invalidatedReason"]);
+ if (write["postImage"]) {
+ assert.eq("postImage", image["imageKind"]);
+ } else {
+ assert.eq("preImage", image["imageKind"]);
+ }
+}
+
+for (let idx = 0; idx < validWrites.length; ++idx) {
+ let write = validWrites[idx];
+ let image = images.findOne({"_id.id": write["uuid"]});
+
+ assert.eq(1, image["txnNum"]);
+ assert.eq(false, image["invalidated"]);
+ if (write["postImage"]) {
+ assert.eq("postImage", image["imageKind"]);
+
+ let postImage = write["query"];
+ postImage["updated"] = 1;
+ assert.eq(postImage, image["image"]);
+ } else {
+ assert.eq("preImage", image["imageKind"]);
+ assert.eq(write["query"], image["image"]);
+ }
+}
+
+replTest.stopSet();
+})();
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 840cd23db3b..bc2ff044008 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -186,8 +186,9 @@ Status _applyOps(OperationContext* opCtx,
OldClientContext ctx(opCtx, nss.ns());
const auto& op = entry.getValue();
+ const bool isDataConsistent = true;
status = repl::applyOperation_inlock(
- opCtx, ctx.db(), &op, alwaysUpsert, oplogApplicationMode);
+ opCtx, ctx.db(), &op, alwaysUpsert, oplogApplicationMode, isDataConsistent);
if (!status.isOK())
return status;
@@ -251,8 +252,13 @@ Status _applyOps(OperationContext* opCtx,
// ops doesn't stop the applyOps from trying to process the rest of the
// ops. This is to leave the door open to parallelizing CRUD op
// application in the future.
- return repl::applyOperation_inlock(
- opCtx, ctx.db(), &entry, alwaysUpsert, oplogApplicationMode);
+ const bool isDataConsistent = true;
+ return repl::applyOperation_inlock(opCtx,
+ ctx.db(),
+ &entry,
+ alwaysUpsert,
+ oplogApplicationMode,
+ isDataConsistent);
});
} catch (const DBException& ex) {
ab.append(false);
diff --git a/src/mongo/db/repl/insert_group.cpp b/src/mongo/db/repl/insert_group.cpp
index fc4870d77f8..308d49cd931 100644
--- a/src/mongo/db/repl/insert_group.cpp
+++ b/src/mongo/db/repl/insert_group.cpp
@@ -59,11 +59,13 @@ constexpr auto kInsertGroupMaxOpCount = 64;
InsertGroup::InsertGroup(std::vector<const OplogEntry*>* ops,
OperationContext* opCtx,
InsertGroup::Mode mode,
+ const bool isDataConsistent,
ApplyFunc applyOplogEntryOrGroupedInserts)
: _doNotGroupBeforePoint(ops->cbegin()),
_end(ops->cend()),
_opCtx(opCtx),
_mode(mode),
+ _isDataConsistent(isDataConsistent),
_applyOplogEntryOrGroupedInserts(applyOplogEntryOrGroupedInserts) {}
StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(
@@ -131,8 +133,8 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(
// Create an oplog entry group for grouped inserts.
OplogEntryOrGroupedInserts groupedInserts(it, endOfGroupableOpsIterator);
try {
- // Apply the group of inserts by passing in groupedInserts.
- uassertStatusOK(_applyOplogEntryOrGroupedInserts(_opCtx, groupedInserts, _mode));
+ uassertStatusOK(
+ _applyOplogEntryOrGroupedInserts(_opCtx, groupedInserts, _mode, _isDataConsistent));
// It succeeded, advance the oplogEntriesIterator to the end of the
// group of inserts.
return endOfGroupableOpsIterator - 1;
diff --git a/src/mongo/db/repl/insert_group.h b/src/mongo/db/repl/insert_group.h
index f89e8c016b5..4586ee997ab 100644
--- a/src/mongo/db/repl/insert_group.h
+++ b/src/mongo/db/repl/insert_group.h
@@ -51,12 +51,13 @@ public:
using ConstIterator = std::vector<const OplogEntry*>::const_iterator;
using Mode = OplogApplication::Mode;
typedef std::function<Status(
- OperationContext*, const OplogEntryOrGroupedInserts&, OplogApplication::Mode)>
+ OperationContext*, const OplogEntryOrGroupedInserts&, OplogApplication::Mode, bool)>
ApplyFunc;
InsertGroup(std::vector<const OplogEntry*>* ops,
OperationContext* opCtx,
Mode mode,
+ bool isDataConsistent,
ApplyFunc applyOplogEntryOrGroupedInserts);
/**
@@ -77,6 +78,7 @@ private:
// Passed to _applyOplogEntryOrGroupedInserts when applying grouped inserts.
OperationContext* _opCtx;
Mode _mode;
+ bool _isDataConsistent;
// The function that does the actual oplog application.
ApplyFunc _applyOplogEntryOrGroupedInserts;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index dce50640e27..c5aefc7b346 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -164,6 +164,16 @@ void applyImportCollectionDefault(OperationContext* opCtx,
"isDryRun"_attr = isDryRun);
}
+StringData getInvalidatingReason(const OplogApplication::Mode mode, const bool isDataConsistent) {
+ if (mode == OplogApplication::Mode::kInitialSync) {
+ return "initial sync"_sd;
+ } else if (!isDataConsistent) {
+ return "minvalid suggests inconsistent snapshot"_sd;
+ }
+
+ return ""_sd;
+}
+
} // namespace
ApplyImportCollectionFn applyImportCollection = applyImportCollectionDefault;
@@ -248,6 +258,7 @@ void writeToImageCollection(OperationContext* opCtx,
const Timestamp timestamp,
repl::RetryImageEnum imageKind,
const BSONObj& dataImage,
+ const StringData& invalidatedReason,
bool* upsertConfigImage) {
AutoGetCollection autoColl(opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IX);
repl::ImageEntry imageEntry;
@@ -258,7 +269,7 @@ void writeToImageCollection(OperationContext* opCtx,
imageEntry.setImage(dataImage);
if (dataImage.isEmpty()) {
imageEntry.setInvalidated(true);
- imageEntry.setInvalidatedReason("initial sync"_sd);
+ imageEntry.setInvalidatedReason(invalidatedReason);
}
UpdateRequest request;
@@ -1084,6 +1095,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
const OplogEntryOrGroupedInserts& opOrGroupedInserts,
bool alwaysUpsert,
OplogApplication::Mode mode,
+ const bool isDataConsistent,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
// Get the single oplog entry to be applied or the first oplog entry of grouped inserts.
auto op = opOrGroupedInserts.getOp();
@@ -1432,7 +1444,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
request.setUpdateModification(std::move(updateMod));
request.setUpsert(upsert);
request.setFromOplogApplication(true);
- if (mode != OplogApplication::Mode::kInitialSync) {
+ if (mode != OplogApplication::Mode::kInitialSync && isDataConsistent) {
if (op.getNeedsRetryImage() == repl::RetryImageEnum::kPreImage) {
request.setReturnDocs(UpdateRequest::ReturnDocOption::RETURN_OLD);
} else if (op.getNeedsRetryImage() == repl::RetryImageEnum::kPostImage) {
@@ -1560,6 +1572,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
// initial sync, the value passed in here is conveniently
// the empty BSONObj.
ur.requestedDocImage,
+ getInvalidatingReason(mode, isDataConsistent),
&upsertConfigImage);
}
@@ -1611,7 +1624,8 @@ Status applyOperation_inlock(OperationContext* opCtx,
request.setNsString(requestNss);
request.setQuery(deleteCriteria);
if (mode != OplogApplication::Mode::kInitialSync &&
- op.getNeedsRetryImage() == repl::RetryImageEnum::kPreImage) {
+ op.getNeedsRetryImage() == repl::RetryImageEnum::kPreImage &&
+ isDataConsistent) {
// When in initial sync, we'll pass an empty image into
// `writeToImageCollection`.
request.setReturnDeleted(true);
@@ -1624,17 +1638,13 @@ Status applyOperation_inlock(OperationContext* opCtx,
// isn't strictly necessary for correctness -- the `config.transactions` table
// is responsible for whether to retry. The motivation here is to simply reduce
// the number of states related documents in the two collections can be in.
- BSONObj imageDoc;
- if (result.nDeleted > 0 && mode != OplogApplication::Mode::kInitialSync) {
- imageDoc = result.requestedPreImage.get();
- }
-
writeToImageCollection(opCtx,
op.getSessionId().get(),
op.getTxnNumber().get(),
op.getTimestamp(),
repl::RetryImageEnum::kPreImage,
- imageDoc,
+ result.requestedPreImage.value_or(BSONObj()),
+ getInvalidatingReason(mode, isDataConsistent),
&upsertConfigImage);
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 37b9f29fd98..e975ee42095 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -215,6 +215,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
const OplogEntryOrGroupedInserts& opOrGroupedInserts,
bool alwaysUpsert,
OplogApplication::Mode mode,
+ const bool isDataConsistent,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats = {});
/**
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 7604af715dc..2781cf72635 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -494,6 +494,10 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
pauseBatchApplicationAfterWritingOplogEntries.pauseWhileSet(opCtx);
}
+ // Read `minValid` prior to it possibly being written to.
+ const bool isDataConsistent =
+ _consistencyMarkers->getMinValid(opCtx) < ops.front().getOpTime();
+
// Reset consistency markers in case the node fails while applying ops.
if (!getOptions().skipWritesToOplog) {
_consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
@@ -501,9 +505,9 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
}
{
+
std::vector<Status> statusVector(_writerPool->getStats().options.maxThreads,
Status::OK());
-
// Doles out all the work to the writer pool threads. writerVectors is not modified,
// but applyOplogBatchPerWorker will modify the vectors that it contains.
invariant(writerVectors.size() == statusVector.size());
@@ -511,24 +515,25 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
if (writerVectors[i].empty())
continue;
- _writerPool->schedule(
- [this,
- &writer = writerVectors.at(i),
- &status = statusVector.at(i),
- &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) {
- invariant(scheduleStatus);
+ _writerPool->schedule([this,
+ &writer = writerVectors.at(i),
+ &status = statusVector.at(i),
+ &multikeyVector = multikeyVector.at(i),
+ isDataConsistent = isDataConsistent](auto scheduleStatus) {
+ invariant(scheduleStatus);
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = cc().makeOperationContext();
- // This code path is only executed on secondaries and initial syncing nodes,
- // so it is safe to exclude any writes from Flow Control.
- opCtx->setShouldParticipateInFlowControl(false);
- opCtx->setEnforceConstraints(false);
+ // This code path is only executed on secondaries and initial syncing nodes,
+ // so it is safe to exclude any writes from Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
+ opCtx->setEnforceConstraints(false);
- status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
- return applyOplogBatchPerWorker(opCtx.get(), &writer, &multikeyVector);
- });
+ status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] {
+ return applyOplogBatchPerWorker(
+ opCtx.get(), &writer, &multikeyVector, isDataConsistent);
});
+ });
}
_writerPool->waitForIdle();
@@ -745,7 +750,8 @@ void OplogApplierImpl::fillWriterVectors_forTest(
Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx,
const OplogEntryOrGroupedInserts& entryOrGroupedInserts,
- OplogApplication::Mode oplogApplicationMode) {
+ OplogApplication::Mode oplogApplicationMode,
+ const bool isDataConsistent) {
// Guarantees that applyOplogEntryOrGroupedInserts' context matches that of its calling
// function, applyOplogBatchPerWorker.
invariant(!opCtx->writesAreReplicated());
@@ -768,6 +774,7 @@ Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx,
auto status = OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(opCtx,
entryOrGroupedInserts,
oplogApplicationMode,
+ isDataConsistent,
incrementOpsAppliedStats,
&replOpCounters);
@@ -793,7 +800,8 @@ Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx,
Status OplogApplierImpl::applyOplogBatchPerWorker(OperationContext* opCtx,
std::vector<const OplogEntry*>* ops,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo,
+ const bool isDataConsistent) {
UnreplicatedWritesBlock uwb(opCtx);
// Since we swap the locker in stash / unstash transaction resources,
// ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been
@@ -820,6 +828,7 @@ Status OplogApplierImpl::applyOplogBatchPerWorker(OperationContext* opCtx,
ops,
getOptions().mode,
getOptions().allowNamespaceNotFoundErrorsOnCrudOps,
+ isDataConsistent,
&applyOplogEntryOrGroupedInserts);
if (!status.isOK())
return status;
diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h
index 9eb1b15a169..574965802fc 100644
--- a/src/mongo/db/repl/oplog_applier_impl.h
+++ b/src/mongo/db/repl/oplog_applier_impl.h
@@ -140,7 +140,8 @@ protected:
*/
virtual Status applyOplogBatchPerWorker(OperationContext* opCtx,
std::vector<const OplogEntry*>* ops,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo);
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo,
+ const bool isDataConsistent);
};
/**
@@ -148,7 +149,8 @@ protected:
*/
Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx,
const OplogEntryOrGroupedInserts& entryOrGroupedInserts,
- OplogApplication::Mode oplogApplicationMode);
+ OplogApplication::Mode oplogApplicationMode,
+ const bool isDataConsistent);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index fec78085aa2..eef15241e05 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -374,7 +374,8 @@ public:
Status applyOplogBatchPerWorker(OperationContext* opCtx,
std::vector<const OplogEntry*>* ops,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo) override;
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo,
+ bool isDataConsistent) override;
std::vector<OplogEntry> getOperationsApplied() {
stdx::lock_guard lk(_mutex);
@@ -390,7 +391,8 @@ private:
Status TrackOpsAppliedApplier::applyOplogBatchPerWorker(
OperationContext* opCtx,
std::vector<const OplogEntry*>* ops,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo,
+ const bool isDataConsistent) {
stdx::lock_guard lk(_mutex);
for (auto&& opPtr : *ops) {
_operationsApplied.push_back(*opPtr);
@@ -479,7 +481,9 @@ TEST_F(OplogApplierImplTest,
TestApplyOplogGroupApplier oplogApplier(
nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
- ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo));
+ const bool dataIsConsistent = true;
+ ASSERT_OK(
+ oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent));
// Collection should be created after applyOplogEntryOrGroupedInserts() processes operation.
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
}
@@ -1470,7 +1474,8 @@ void testWorkerMultikeyPaths(OperationContext* opCtx,
nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
WorkerMultikeyPathInfo pathInfo;
std::vector<const OplogEntry*> ops = {&op};
- ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(opCtx, &ops, &pathInfo));
+ const bool dataIsConsistent = true;
+ ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(opCtx, &ops, &pathInfo, dataIsConsistent));
ASSERT_EQ(pathInfo.size(), numPaths);
}
@@ -1536,7 +1541,9 @@ TEST_F(OplogApplierImplTest, OplogApplicationThreadFuncAddsMultipleWorkerMultike
nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
WorkerMultikeyPathInfo pathInfo;
std::vector<const OplogEntry*> ops = {&opA, &opB};
- ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo));
+ const bool dataIsConsistent = true;
+ ASSERT_OK(
+ oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent));
ASSERT_EQ(pathInfo.size(), 2UL);
}
}
@@ -1583,8 +1590,10 @@ TEST_F(OplogApplierImplTest, OplogApplicationThreadFuncFailsWhenCollectionCreati
TestApplyOplogGroupApplier oplogApplier(
nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary));
std::vector<const OplogEntry*> ops = {&op};
- ASSERT_EQUALS(ErrorCodes::InvalidOptions,
- oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, nullptr));
+ const bool dataIsConsistent = true;
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidOptions,
+ oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, nullptr, dataIsConsistent));
}
TEST_F(OplogApplierImplTest,
@@ -1968,7 +1977,9 @@ TEST_F(OplogApplierImplTest, ApplyGroupIgnoresUpdateOperationIfDocumentIsMissing
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
std::vector<const OplogEntry*> ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo));
+ const bool dataIsConsistent = true;
+ ASSERT_OK(
+ oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent));
// Since the document was missing when we cloned data from the sync source, the collection
// referenced by the failed operation should not be automatically created.
@@ -1991,7 +2002,9 @@ TEST_F(OplogApplierImplTest,
auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
std::vector<const OplogEntry*> ops = {&op0, &op1, &op2, &op3};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo));
+ const bool dataIsConsistent = true;
+ ASSERT_OK(
+ oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent));
CollectionReader collectionReader(_opCtx.get(), nss);
ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next()));
@@ -2019,7 +2032,9 @@ TEST_F(OplogApplierImplTest,
auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
std::vector<const OplogEntry*> ops = {&op0, &op1, &op2, &op3};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo));
+ const bool dataIsConsistent = true;
+ ASSERT_OK(
+ oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent));
CollectionReader collectionReader(_opCtx.get(), nss);
ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next()));
diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
index c7c6ecb8d82..cda89640fdf 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
@@ -208,7 +208,8 @@ Status OplogApplierImplTest::_applyOplogEntryOrGroupedInsertsWrapper(
OplogApplication::Mode oplogApplicationMode) {
UnreplicatedWritesBlock uwb(opCtx);
DisableDocumentValidation validationDisabler(opCtx);
- return applyOplogEntryOrGroupedInserts(opCtx, batch, oplogApplicationMode);
+ const bool dataIsConsistent = true;
+ return applyOplogEntryOrGroupedInserts(opCtx, batch, oplogApplicationMode, dataIsConsistent);
}
void OplogApplierImplTest::_testApplyOplogEntryOrGroupedInsertsCrudOperation(
@@ -285,7 +286,9 @@ Status OplogApplierImplTest::runOpsSteadyState(std::vector<OplogEntry> ops) {
opsPtrs.push_back(&op);
}
WorkerMultikeyPathInfo pathInfo;
- return oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &opsPtrs, &pathInfo);
+ const bool dataIsConsistent = true;
+ return oplogApplier.applyOplogBatchPerWorker(
+ _opCtx.get(), &opsPtrs, &pathInfo, dataIsConsistent);
}
Status OplogApplierImplTest::runOpInitialSync(const OplogEntry& op) {
diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp
index 82caae399c7..a6ed6f8a0ed 100644
--- a/src/mongo/db/repl/oplog_applier_utils.cpp
+++ b/src/mongo/db/repl/oplog_applier_utils.cpp
@@ -189,6 +189,7 @@ Status OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(
OperationContext* opCtx,
const OplogEntryOrGroupedInserts& entryOrGroupedInserts,
OplogApplication::Mode oplogApplicationMode,
+ const bool isDataConsistent,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats,
OpCounters* opCounters) {
invariant(DocumentValidationSettings::get(opCtx).isSchemaValidationDisabled());
@@ -230,6 +231,7 @@ Status OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(
entryOrGroupedInserts,
shouldAlwaysUpsert,
oplogApplicationMode,
+ isDataConsistent,
incrementOpsAppliedStats);
if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
throw WriteConflictException();
@@ -276,6 +278,7 @@ Status OplogApplierUtils::applyOplogBatchCommon(
std::vector<const OplogEntry*>* ops,
OplogApplication::Mode oplogApplicationMode,
bool allowNamespaceNotFoundErrorsOnCrudOps,
+ const bool isDataConsistent,
InsertGroup::ApplyFunc applyOplogEntryOrGroupedInserts) noexcept {
// We cannot do document validation, because document validation could have been disabled when
@@ -284,7 +287,8 @@ Status OplogApplierUtils::applyOplogBatchCommon(
// Group the operations by namespace in order to get larger groups for bulk inserts, but do not
// mix up the current order of oplog entries within the same namespace (thus *stable* sort).
stableSortByNamespace(ops);
- InsertGroup insertGroup(ops, opCtx, oplogApplicationMode, applyOplogEntryOrGroupedInserts);
+ InsertGroup insertGroup(
+ ops, opCtx, oplogApplicationMode, isDataConsistent, applyOplogEntryOrGroupedInserts);
for (auto it = ops->cbegin(); it != ops->cend(); ++it) {
const OplogEntry& entry = **it;
@@ -299,8 +303,8 @@ Status OplogApplierUtils::applyOplogBatchCommon(
// If we didn't create a group, try to apply the op individually.
try {
- const Status status =
- applyOplogEntryOrGroupedInserts(opCtx, &entry, oplogApplicationMode);
+ const Status status = applyOplogEntryOrGroupedInserts(
+ opCtx, &entry, oplogApplicationMode, isDataConsistent);
if (!status.isOK()) {
// Tried to apply an update operation but the document is missing, there must be
diff --git a/src/mongo/db/repl/oplog_applier_utils.h b/src/mongo/db/repl/oplog_applier_utils.h
index 661597c5e12..05a5a9b47e9 100644
--- a/src/mongo/db/repl/oplog_applier_utils.h
+++ b/src/mongo/db/repl/oplog_applier_utils.h
@@ -124,6 +124,7 @@ public:
OperationContext* opCtx,
const OplogEntryOrGroupedInserts& entryOrGroupedInserts,
OplogApplication::Mode oplogApplicationMode,
+ bool isDataConsistent,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats,
OpCounters* opCounters);
@@ -136,6 +137,7 @@ public:
std::vector<const OplogEntry*>* ops,
OplogApplication::Mode oplogApplicationMode,
bool allowNamespaceNotFoundErrorsOnCrudOps,
+ bool isDataConsistent,
InsertGroup::ApplyFunc applyOplogEntryOrGroupedInserts) noexcept;
};
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index d3a02d2b8eb..477cfd0c029 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -926,7 +926,8 @@ std::vector<std::vector<const OplogEntry*>> TenantOplogApplier::_fillWriterVecto
Status TenantOplogApplier::_applyOplogEntryOrGroupedInserts(
OperationContext* opCtx,
const OplogEntryOrGroupedInserts& entryOrGroupedInserts,
- OplogApplication::Mode oplogApplicationMode) {
+ OplogApplication::Mode oplogApplicationMode,
+ const bool isDataConsistent) {
// We must ensure the opCtx uses replicated writes, because that will ensure we get a
// NotWritablePrimary error if a stepdown occurs.
invariant(opCtx->writesAreReplicated());
@@ -975,12 +976,14 @@ Status TenantOplogApplier::_applyOplogEntryOrGroupedInserts(
}
// We don't count tenant application in the ops applied stats.
auto incrementOpsAppliedStats = [] {};
- // We always use oplog application mode 'kInitialSync', because we're applying oplog entries to
- // a cloned database the way initial sync does.
+ // We always use oplog application mode 'kInitialSync' and isDataConsistent 'false', because
+ // we're applying oplog entries to a cloned database the way initial sync does.
+ invariant(isDataConsistent == false);
auto status = OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(
opCtx,
entryOrGroupedInserts,
OplogApplication::Mode::kInitialSync,
+ isDataConsistent,
incrementOpsAppliedStats,
nullptr /* opCounters*/);
LOGV2_DEBUG(4886009,
@@ -1003,15 +1006,18 @@ Status TenantOplogApplier::_applyOplogBatchPerWorker(std::vector<const OplogEntr
opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
const bool allowNamespaceNotFoundErrorsOnCrudOps(true);
+ const bool isDataConsistent = false;
auto status = OplogApplierUtils::applyOplogBatchCommon(
opCtx.get(),
ops,
OplogApplication::Mode::kInitialSync,
allowNamespaceNotFoundErrorsOnCrudOps,
+ isDataConsistent,
[this](OperationContext* opCtx,
const OplogEntryOrGroupedInserts& opOrInserts,
- OplogApplication::Mode mode) {
- return _applyOplogEntryOrGroupedInserts(opCtx, opOrInserts, mode);
+ OplogApplication::Mode mode,
+ const bool isDataConsistent) {
+ return _applyOplogEntryOrGroupedInserts(opCtx, opOrInserts, mode, isDataConsistent);
});
if (!status.isOK()) {
LOGV2_ERROR(4886008,
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index 8abfc3fca14..9953150b657 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -133,7 +133,8 @@ private:
Status _applyOplogEntryOrGroupedInserts(OperationContext* opCtx,
const OplogEntryOrGroupedInserts& entryOrGroupedInserts,
- OplogApplication::Mode oplogApplicationMode);
+ OplogApplication::Mode oplogApplicationMode,
+ const bool isDataConsistent);
std::vector<std::vector<const OplogEntry*>> _fillWriterVectors(OperationContext* opCtx,
TenantOplogBatch* batch);
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index b5f0d1fece3..6ff76d34087 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -72,8 +72,13 @@ Status _applyOperationsForTransaction(OperationContext* opCtx,
// inside. TODO(SERVER-46105)
invariant(!op.isCommand());
AutoGetCollection coll(opCtx, op.getNss(), MODE_IX);
- auto status = repl::applyOperation_inlock(
- opCtx, coll.getDb(), &op, false /*alwaysUpsert*/, oplogApplicationMode);
+ const bool isDataConsistent = true;
+ auto status = repl::applyOperation_inlock(opCtx,
+ coll.getDb(),
+ &op,
+ false /*alwaysUpsert*/,
+ oplogApplicationMode,
+ isDataConsistent);
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 78943ba20df..b6583805036 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -118,6 +118,7 @@ namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(WTPauseStableTimestamp);
MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely);
MONGO_FAIL_POINT_DEFINE(WTSetOldestTSToStableTS);
@@ -1965,6 +1966,10 @@ void WiredTigerKVEngine::setJournalListener(JournalListener* jl) {
}
void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, bool force) {
+ if (MONGO_unlikely(WTPauseStableTimestamp.shouldFail())) {
+ return;
+ }
+
if (stableTimestamp.isNull()) {
return;
}
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index bc9f020a8e6..2f2ecfdd1a7 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -248,8 +248,9 @@ protected:
}
repl::UnreplicatedWritesBlock uwb(&_opCtx);
auto entry = uassertStatusOK(OplogEntry::parse(*i));
+ const bool dataIsConsistent = true;
uassertStatusOK(applyOperation_inlock(
- &_opCtx, ctx.db(), &entry, false, getOplogApplicationMode()));
+ &_opCtx, ctx.db(), &entry, false, getOplogApplicationMode(), dataIsConsistent));
}
}
}
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index dd25e73d3b0..8b4cfdc3cf2 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -801,8 +801,9 @@ public:
}
repl::OplogEntryOrGroupedInserts groupedInserts(opPtrs.cbegin(), opPtrs.cend());
+ const bool dataIsConsistent = true;
ASSERT_OK(repl::applyOplogEntryOrGroupedInserts(
- _opCtx, groupedInserts, repl::OplogApplication::Mode::kSecondary));
+ _opCtx, groupedInserts, repl::OplogApplication::Mode::kSecondary, dataIsConsistent));
for (std::int32_t idx = 0; idx < docsToInsert; ++idx) {
OneOffRead oor(_opCtx, firstInsertTime.addTicks(idx).asTimestamp());
@@ -2876,7 +2877,8 @@ public:
Status applyOplogBatchPerWorker(OperationContext* opCtx,
std::vector<const repl::OplogEntry*>* operationsToApply,
- WorkerMultikeyPathInfo* pathInfo) override;
+ WorkerMultikeyPathInfo* pathInfo,
+ bool isDataConsistent) override;
private:
// Pointer to the test's op context. This is distinct from the op context used in
@@ -2893,13 +2895,16 @@ private:
Status SecondaryReadsDuringBatchApplicationAreAllowedApplier::applyOplogBatchPerWorker(
OperationContext* opCtx,
std::vector<const repl::OplogEntry*>* operationsToApply,
- WorkerMultikeyPathInfo* pathInfo) {
+ WorkerMultikeyPathInfo* pathInfo,
+ const bool isDataConsistent) {
if (!_testOpCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode, MODE_X)) {
return {ErrorCodes::BadValue, "Batch applied was not holding PBWM lock in MODE_X"};
}
// Insert the document. A reader without a PBWM lock should not see it yet.
- auto status = OplogApplierImpl::applyOplogBatchPerWorker(opCtx, operationsToApply, pathInfo);
+ const bool dataIsConsistent = true;
+ auto status = OplogApplierImpl::applyOplogBatchPerWorker(
+ opCtx, operationsToApply, pathInfo, dataIsConsistent);
if (!status.isOK()) {
return status;
}
@@ -3232,8 +3237,9 @@ public:
auto start = repl::makeStartIndexBuildOplogEntry(
startBuildOpTime, nss, "field_1", keyPattern, collUUID, indexBuildUUID);
+ const bool dataIsConsistent = true;
ASSERT_OK(repl::applyOplogEntryOrGroupedInserts(
- _opCtx, &start, repl::OplogApplication::Mode::kSecondary));
+ _opCtx, &start, repl::OplogApplication::Mode::kSecondary, dataIsConsistent));
// We cannot use the OperationContext to wait for the thread to reach the fail point
// because it also uses the ClockSourceMock.
@@ -3259,8 +3265,9 @@ public:
auto commit = repl::makeCommitIndexBuildOplogEntry(
startBuildOpTime, nss, "field_1", keyPattern, collUUID, indexBuildUUID);
+ const bool dataIsConsistent = true;
ASSERT_OK(repl::applyOplogEntryOrGroupedInserts(
- _opCtx, &commit, repl::OplogApplication::Mode::kSecondary));
+ _opCtx, &commit, repl::OplogApplication::Mode::kSecondary, dataIsConsistent));
// Reacquire read lock to check index metadata.
AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS);