diff options
-rw-r--r-- | jstests/replsets/invalidate_images_when_minvalid.js | 162 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/insert_group.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/insert_group.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_utils.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_utils.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 5 | ||||
-rw-r--r-- | src/mongo/dbtests/repltests.cpp | 3 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 19 |
18 files changed, 307 insertions, 64 deletions
diff --git a/jstests/replsets/invalidate_images_when_minvalid.js b/jstests/replsets/invalidate_images_when_minvalid.js new file mode 100644 index 00000000000..dc4c1d8a3b3 --- /dev/null +++ b/jstests/replsets/invalidate_images_when_minvalid.js @@ -0,0 +1,162 @@ +/** + * When a minvalid timestamp `T` is set, a batch of operations applied earlier than `T` does not have + * a consistent snapshot of data. In this case, we must write an invalidated document to the + * `config.image_collection` for a retryable `findAndModify` that does not store images in the + * oplog. This test ensures this behavior. + * + * To test this we: + * -- Hold the stable timestamp back on a primary + * -- Perform a number of retryable `findAndModify`s. + * -- Manually set `minvalid` to a value in between the performed `findAndModify`s. + * -- Restart the primary + * -- Verify that operations after the `minvalid` timestamp are marked "valid" and those prior are + * marked "invalid". We set the `replBatchLimitOperations` to one to achieve this. This is + * necessary for the test due to manually setting minvalid. In production `minvalid` should + * always be unset, or set to the top of oplog. + * + * Because we restart the node, this only works on storage engines that persist data. + * + * @tags: [multiversion_incompatible, requires_majority_read_concern, requires_persistence] + */ + +// Skip db hash check because replset cannot reach consistent state. 
+TestData.skipCheckDBHashes = true; + +(function() { +"use strict"; + +let replTest = new ReplSetTest({name: "invalidate_images_when_minvalid", nodes: 1}); + +let nodes = replTest.startSet({ + setParameter: { + featureFlagRetryableFindAndModify: true, + storeFindAndModifyImagesInSideCollection: true, + replBatchLimitOperations: 1 + } +}); +replTest.initiate(); +let primary = replTest.getPrimary(); +let coll = primary.getDB("test")["invalidating"]; +let images = primary.getDB("config")["image_collection"]; +let minvalid = primary.getDB("local")["replset.minvalid"]; + +// Pause the WT stable timestamp to have a restart perform replication recovery on every operation +// in the test. +assert.commandWorked(primary.adminCommand({ + "configureFailPoint": 'WTPauseStableTimestamp', + "mode": 'alwaysOn', +})); + +function doRetryableFindAndModify(lsid, query, postImage, remove) { + // Performs a retryable findAndModify. The document matched by query must exist. The + // findAndModify will either be an update that sets the documents `updated: 1` or + // removes the document. Returns the timestamp associated with the generated oplog entry. + // + // `postImage` and `remove` are booleans. The server rejects removes that ask for a post image. + let cmd = { + findandmodify: coll.getName(), + lsid: {id: lsid}, + txnNumber: NumberLong(1), + stmtId: NumberInt(0), + query: query, + new: postImage, + upsert: false, + }; + + if (remove) { + cmd["remove"] = true; + } else { + cmd["update"] = {$set: {updated: 1}}; + } + + return assert.commandWorked(coll.runCommand(cmd))["operationTime"]; +} + +// Each write contains arguments for calling `doRetryableFindAndModify`. 
+let invalidatedWrites = [ + {uuid: UUID(), query: {_id: 1}, postImage: false, remove: false}, + {uuid: UUID(), query: {_id: 2}, postImage: true, remove: false}, + {uuid: UUID(), query: {_id: 3}, postImage: false, remove: true} +]; +let validWrites = [ + {uuid: UUID(), query: {_id: 4}, postImage: false, remove: false}, + {uuid: UUID(), query: {_id: 5}, postImage: true, remove: false}, + {uuid: UUID(), query: {_id: 6}, postImage: false, remove: true} +]; + +// Insert each document a query should match. +for (let idx = 0; idx < invalidatedWrites.length; ++idx) { + assert.commandWorked(coll.insert(invalidatedWrites[idx]["query"])); +} +for (let idx = 0; idx < validWrites.length; ++idx) { + assert.commandWorked(coll.insert(validWrites[idx]["query"])); +} + +// Perform `findAndModify`s. Record the timestamp of the last `invalidatedWrites` to set minvalid +// with. +let lastInvalidatedImageTs = null; +for (let idx = 0; idx < invalidatedWrites.length; ++idx) { + let write = invalidatedWrites[idx]; + lastInvalidatedImageTs = doRetryableFindAndModify( + write['uuid'], write['query'], write['postImage'], write['remove']); +} +for (let idx = 0; idx < validWrites.length; ++idx) { + let write = validWrites[idx]; + doRetryableFindAndModify(write['uuid'], write['query'], write['postImage'], write['remove']); +} + +let imageDocs = []; +images.find().forEach((x) => { + imageDocs.push(x); +}); + +jsTestLog({"MinValid": lastInvalidatedImageTs, "Pre-restart images": imageDocs}); +assert.commandWorked(minvalid.update({}, {$set: {ts: lastInvalidatedImageTs}})); + +replTest.restart(primary, undefined, true); + +primary = replTest.getPrimary(); +coll = primary.getDB("test")["invalidating"]; +images = primary.getDB("config")["image_collection"]; +minvalid = primary.getDB("local")["replset.minvalid"]; + +imageDocs = []; +images.find().forEach((x) => { + imageDocs.push(x); +}); +jsTestLog({"Post-restart images": imageDocs}); + +for (let idx = 0; idx < invalidatedWrites.length; ++idx) { + let 
write = invalidatedWrites[idx]; + let image = images.findOne({"_id.id": write["uuid"]}); + + assert.eq(1, image["txnNum"]); + assert.eq(true, image["invalidated"]); + assert.eq("minvalid suggests inconsistent snapshot", image["invalidatedReason"]); + if (write["postImage"]) { + assert.eq("postImage", image["imageKind"]); + } else { + assert.eq("preImage", image["imageKind"]); + } +} + +for (let idx = 0; idx < validWrites.length; ++idx) { + let write = validWrites[idx]; + let image = images.findOne({"_id.id": write["uuid"]}); + + assert.eq(1, image["txnNum"]); + assert.eq(false, image["invalidated"]); + if (write["postImage"]) { + assert.eq("postImage", image["imageKind"]); + + let postImage = write["query"]; + postImage["updated"] = 1; + assert.eq(postImage, image["image"]); + } else { + assert.eq("preImage", image["imageKind"]); + assert.eq(write["query"], image["image"]); + } +} + +replTest.stopSet(); +})(); diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 840cd23db3b..bc2ff044008 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -186,8 +186,9 @@ Status _applyOps(OperationContext* opCtx, OldClientContext ctx(opCtx, nss.ns()); const auto& op = entry.getValue(); + const bool isDataConsistent = true; status = repl::applyOperation_inlock( - opCtx, ctx.db(), &op, alwaysUpsert, oplogApplicationMode); + opCtx, ctx.db(), &op, alwaysUpsert, oplogApplicationMode, isDataConsistent); if (!status.isOK()) return status; @@ -251,8 +252,13 @@ Status _applyOps(OperationContext* opCtx, // ops doesn't stop the applyOps from trying to process the rest of the // ops. This is to leave the door open to parallelizing CRUD op // application in the future. 
- return repl::applyOperation_inlock( - opCtx, ctx.db(), &entry, alwaysUpsert, oplogApplicationMode); + const bool isDataConsistent = true; + return repl::applyOperation_inlock(opCtx, + ctx.db(), + &entry, + alwaysUpsert, + oplogApplicationMode, + isDataConsistent); }); } catch (const DBException& ex) { ab.append(false); diff --git a/src/mongo/db/repl/insert_group.cpp b/src/mongo/db/repl/insert_group.cpp index fc4870d77f8..308d49cd931 100644 --- a/src/mongo/db/repl/insert_group.cpp +++ b/src/mongo/db/repl/insert_group.cpp @@ -59,11 +59,13 @@ constexpr auto kInsertGroupMaxOpCount = 64; InsertGroup::InsertGroup(std::vector<const OplogEntry*>* ops, OperationContext* opCtx, InsertGroup::Mode mode, + const bool isDataConsistent, ApplyFunc applyOplogEntryOrGroupedInserts) : _doNotGroupBeforePoint(ops->cbegin()), _end(ops->cend()), _opCtx(opCtx), _mode(mode), + _isDataConsistent(isDataConsistent), _applyOplogEntryOrGroupedInserts(applyOplogEntryOrGroupedInserts) {} StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts( @@ -131,8 +133,8 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts( // Create an oplog entry group for grouped inserts. OplogEntryOrGroupedInserts groupedInserts(it, endOfGroupableOpsIterator); try { - // Apply the group of inserts by passing in groupedInserts. - uassertStatusOK(_applyOplogEntryOrGroupedInserts(_opCtx, groupedInserts, _mode)); + uassertStatusOK( + _applyOplogEntryOrGroupedInserts(_opCtx, groupedInserts, _mode, _isDataConsistent)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. 
return endOfGroupableOpsIterator - 1; diff --git a/src/mongo/db/repl/insert_group.h b/src/mongo/db/repl/insert_group.h index f89e8c016b5..4586ee997ab 100644 --- a/src/mongo/db/repl/insert_group.h +++ b/src/mongo/db/repl/insert_group.h @@ -51,12 +51,13 @@ public: using ConstIterator = std::vector<const OplogEntry*>::const_iterator; using Mode = OplogApplication::Mode; typedef std::function<Status( - OperationContext*, const OplogEntryOrGroupedInserts&, OplogApplication::Mode)> + OperationContext*, const OplogEntryOrGroupedInserts&, OplogApplication::Mode, bool)> ApplyFunc; InsertGroup(std::vector<const OplogEntry*>* ops, OperationContext* opCtx, Mode mode, + bool isDataConsistent, ApplyFunc applyOplogEntryOrGroupedInserts); /** @@ -77,6 +78,7 @@ private: // Passed to _applyOplogEntryOrGroupedInserts when applying grouped inserts. OperationContext* _opCtx; Mode _mode; + bool _isDataConsistent; // The function that does the actual oplog application. ApplyFunc _applyOplogEntryOrGroupedInserts; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index dce50640e27..c5aefc7b346 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -164,6 +164,16 @@ void applyImportCollectionDefault(OperationContext* opCtx, "isDryRun"_attr = isDryRun); } +StringData getInvalidatingReason(const OplogApplication::Mode mode, const bool isDataConsistent) { + if (mode == OplogApplication::Mode::kInitialSync) { + return "initial sync"_sd; + } else if (!isDataConsistent) { + return "minvalid suggests inconsistent snapshot"_sd; + } + + return ""_sd; +} + } // namespace ApplyImportCollectionFn applyImportCollection = applyImportCollectionDefault; @@ -248,6 +258,7 @@ void writeToImageCollection(OperationContext* opCtx, const Timestamp timestamp, repl::RetryImageEnum imageKind, const BSONObj& dataImage, + const StringData& invalidatedReason, bool* upsertConfigImage) { AutoGetCollection autoColl(opCtx, NamespaceString::kConfigImagesNamespace, 
LockMode::MODE_IX); repl::ImageEntry imageEntry; @@ -258,7 +269,7 @@ void writeToImageCollection(OperationContext* opCtx, imageEntry.setImage(dataImage); if (dataImage.isEmpty()) { imageEntry.setInvalidated(true); - imageEntry.setInvalidatedReason("initial sync"_sd); + imageEntry.setInvalidatedReason(invalidatedReason); } UpdateRequest request; @@ -1084,6 +1095,7 @@ Status applyOperation_inlock(OperationContext* opCtx, const OplogEntryOrGroupedInserts& opOrGroupedInserts, bool alwaysUpsert, OplogApplication::Mode mode, + const bool isDataConsistent, IncrementOpsAppliedStatsFn incrementOpsAppliedStats) { // Get the single oplog entry to be applied or the first oplog entry of grouped inserts. auto op = opOrGroupedInserts.getOp(); @@ -1432,7 +1444,7 @@ Status applyOperation_inlock(OperationContext* opCtx, request.setUpdateModification(std::move(updateMod)); request.setUpsert(upsert); request.setFromOplogApplication(true); - if (mode != OplogApplication::Mode::kInitialSync) { + if (mode != OplogApplication::Mode::kInitialSync && isDataConsistent) { if (op.getNeedsRetryImage() == repl::RetryImageEnum::kPreImage) { request.setReturnDocs(UpdateRequest::ReturnDocOption::RETURN_OLD); } else if (op.getNeedsRetryImage() == repl::RetryImageEnum::kPostImage) { @@ -1560,6 +1572,7 @@ Status applyOperation_inlock(OperationContext* opCtx, // initial sync, the value passed in here is conveniently // the empty BSONObj. ur.requestedDocImage, + getInvalidatingReason(mode, isDataConsistent), &upsertConfigImage); } @@ -1611,7 +1624,8 @@ Status applyOperation_inlock(OperationContext* opCtx, request.setNsString(requestNss); request.setQuery(deleteCriteria); if (mode != OplogApplication::Mode::kInitialSync && - op.getNeedsRetryImage() == repl::RetryImageEnum::kPreImage) { + op.getNeedsRetryImage() == repl::RetryImageEnum::kPreImage && + isDataConsistent) { // When in initial sync, we'll pass an empty image into // `writeToImageCollection`. 
request.setReturnDeleted(true); @@ -1624,17 +1638,13 @@ Status applyOperation_inlock(OperationContext* opCtx, // isn't strictly necessary for correctness -- the `config.transactions` table // is responsible for whether to retry. The motivation here is to simply reduce // the number of states related documents in the two collections can be in. - BSONObj imageDoc; - if (result.nDeleted > 0 && mode != OplogApplication::Mode::kInitialSync) { - imageDoc = result.requestedPreImage.get(); - } - writeToImageCollection(opCtx, op.getSessionId().get(), op.getTxnNumber().get(), op.getTimestamp(), repl::RetryImageEnum::kPreImage, - imageDoc, + result.requestedPreImage.value_or(BSONObj()), + getInvalidatingReason(mode, isDataConsistent), &upsertConfigImage); } diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 37b9f29fd98..e975ee42095 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -215,6 +215,7 @@ Status applyOperation_inlock(OperationContext* opCtx, const OplogEntryOrGroupedInserts& opOrGroupedInserts, bool alwaysUpsert, OplogApplication::Mode mode, + const bool isDataConsistent, IncrementOpsAppliedStatsFn incrementOpsAppliedStats = {}); /** diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 7604af715dc..2781cf72635 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -494,6 +494,10 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, pauseBatchApplicationAfterWritingOplogEntries.pauseWhileSet(opCtx); } + // Read `minValid` prior to it possibly being written to. + const bool isDataConsistent = + _consistencyMarkers->getMinValid(opCtx) < ops.front().getOpTime(); + // Reset consistency markers in case the node fails while applying ops. 
if (!getOptions().skipWritesToOplog) { _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp()); @@ -501,9 +505,9 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, } { + std::vector<Status> statusVector(_writerPool->getStats().options.maxThreads, Status::OK()); - // Doles out all the work to the writer pool threads. writerVectors is not modified, // but applyOplogBatchPerWorker will modify the vectors that it contains. invariant(writerVectors.size() == statusVector.size()); @@ -511,24 +515,25 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx, if (writerVectors[i].empty()) continue; - _writerPool->schedule( - [this, - &writer = writerVectors.at(i), - &status = statusVector.at(i), - &multikeyVector = multikeyVector.at(i)](auto scheduleStatus) { - invariant(scheduleStatus); + _writerPool->schedule([this, + &writer = writerVectors.at(i), + &status = statusVector.at(i), + &multikeyVector = multikeyVector.at(i), + isDataConsistent = isDataConsistent](auto scheduleStatus) { + invariant(scheduleStatus); - auto opCtx = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); - // This code path is only executed on secondaries and initial syncing nodes, - // so it is safe to exclude any writes from Flow Control. - opCtx->setShouldParticipateInFlowControl(false); - opCtx->setEnforceConstraints(false); + // This code path is only executed on secondaries and initial syncing nodes, + // so it is safe to exclude any writes from Flow Control. 
+ opCtx->setShouldParticipateInFlowControl(false); + opCtx->setEnforceConstraints(false); - status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] { - return applyOplogBatchPerWorker(opCtx.get(), &writer, &multikeyVector); - }); + status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown([&] { + return applyOplogBatchPerWorker( + opCtx.get(), &writer, &multikeyVector, isDataConsistent); }); + }); } _writerPool->waitForIdle(); @@ -745,7 +750,8 @@ void OplogApplierImpl::fillWriterVectors_forTest( Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx, const OplogEntryOrGroupedInserts& entryOrGroupedInserts, - OplogApplication::Mode oplogApplicationMode) { + OplogApplication::Mode oplogApplicationMode, + const bool isDataConsistent) { // Guarantees that applyOplogEntryOrGroupedInserts' context matches that of its calling // function, applyOplogBatchPerWorker. invariant(!opCtx->writesAreReplicated()); @@ -768,6 +774,7 @@ Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx, auto status = OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(opCtx, entryOrGroupedInserts, oplogApplicationMode, + isDataConsistent, incrementOpsAppliedStats, &replOpCounters); @@ -793,7 +800,8 @@ Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx, Status OplogApplierImpl::applyOplogBatchPerWorker(OperationContext* opCtx, std::vector<const OplogEntry*>* ops, - WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + WorkerMultikeyPathInfo* workerMultikeyPathInfo, + const bool isDataConsistent) { UnreplicatedWritesBlock uwb(opCtx); // Since we swap the locker in stash / unstash transaction resources, // ShouldNotConflictWithSecondaryBatchApplicationBlock will touch the locker that has been @@ -820,6 +828,7 @@ Status OplogApplierImpl::applyOplogBatchPerWorker(OperationContext* opCtx, ops, getOptions().mode, getOptions().allowNamespaceNotFoundErrorsOnCrudOps, + isDataConsistent, &applyOplogEntryOrGroupedInserts); if (!status.isOK()) return status; diff 
--git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h index 9eb1b15a169..574965802fc 100644 --- a/src/mongo/db/repl/oplog_applier_impl.h +++ b/src/mongo/db/repl/oplog_applier_impl.h @@ -140,7 +140,8 @@ protected: */ virtual Status applyOplogBatchPerWorker(OperationContext* opCtx, std::vector<const OplogEntry*>* ops, - WorkerMultikeyPathInfo* workerMultikeyPathInfo); + WorkerMultikeyPathInfo* workerMultikeyPathInfo, + const bool isDataConsistent); }; /** @@ -148,7 +149,8 @@ protected: */ Status applyOplogEntryOrGroupedInserts(OperationContext* opCtx, const OplogEntryOrGroupedInserts& entryOrGroupedInserts, - OplogApplication::Mode oplogApplicationMode); + OplogApplication::Mode oplogApplicationMode, + const bool isDataConsistent); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index fec78085aa2..eef15241e05 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -374,7 +374,8 @@ public: Status applyOplogBatchPerWorker(OperationContext* opCtx, std::vector<const OplogEntry*>* ops, - WorkerMultikeyPathInfo* workerMultikeyPathInfo) override; + WorkerMultikeyPathInfo* workerMultikeyPathInfo, + bool isDataConsistent) override; std::vector<OplogEntry> getOperationsApplied() { stdx::lock_guard lk(_mutex); @@ -390,7 +391,8 @@ private: Status TrackOpsAppliedApplier::applyOplogBatchPerWorker( OperationContext* opCtx, std::vector<const OplogEntry*>* ops, - WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + WorkerMultikeyPathInfo* workerMultikeyPathInfo, + const bool isDataConsistent) { stdx::lock_guard lk(_mutex); for (auto&& opPtr : *ops) { _operationsApplied.push_back(*opPtr); @@ -479,7 +481,9 @@ TEST_F(OplogApplierImplTest, TestApplyOplogGroupApplier oplogApplier( nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); - 
ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo)); + const bool dataIsConsistent = true; + ASSERT_OK( + oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent)); // Collection should be created after applyOplogEntryOrGroupedInserts() processes operation. ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } @@ -1470,7 +1474,8 @@ void testWorkerMultikeyPaths(OperationContext* opCtx, nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); WorkerMultikeyPathInfo pathInfo; std::vector<const OplogEntry*> ops = {&op}; - ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(opCtx, &ops, &pathInfo)); + const bool dataIsConsistent = true; + ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(opCtx, &ops, &pathInfo, dataIsConsistent)); ASSERT_EQ(pathInfo.size(), numPaths); } @@ -1536,7 +1541,9 @@ TEST_F(OplogApplierImplTest, OplogApplicationThreadFuncAddsMultipleWorkerMultike nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); WorkerMultikeyPathInfo pathInfo; std::vector<const OplogEntry*> ops = {&opA, &opB}; - ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo)); + const bool dataIsConsistent = true; + ASSERT_OK( + oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent)); ASSERT_EQ(pathInfo.size(), 2UL); } } @@ -1583,8 +1590,10 @@ TEST_F(OplogApplierImplTest, OplogApplicationThreadFuncFailsWhenCollectionCreati TestApplyOplogGroupApplier oplogApplier( nullptr, nullptr, OplogApplier::Options(OplogApplication::Mode::kSecondary)); std::vector<const OplogEntry*> ops = {&op}; - ASSERT_EQUALS(ErrorCodes::InvalidOptions, - oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, nullptr)); + const bool dataIsConsistent = true; + ASSERT_EQUALS( + ErrorCodes::InvalidOptions, + oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, nullptr, dataIsConsistent)); } TEST_F(OplogApplierImplTest, @@ 
-1968,7 +1977,9 @@ TEST_F(OplogApplierImplTest, ApplyGroupIgnoresUpdateOperationIfDocumentIsMissing {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); std::vector<const OplogEntry*> ops = {&op}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo)); + const bool dataIsConsistent = true; + ASSERT_OK( + oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent)); // Since the document was missing when we cloned data from the sync source, the collection // referenced by the failed operation should not be automatically created. @@ -1991,7 +2002,9 @@ TEST_F(OplogApplierImplTest, auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); std::vector<const OplogEntry*> ops = {&op0, &op1, &op2, &op3}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo)); + const bool dataIsConsistent = true; + ASSERT_OK( + oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent)); CollectionReader collectionReader(_opCtx.get(), nss); ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); @@ -2019,7 +2032,9 @@ TEST_F(OplogApplierImplTest, auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); std::vector<const OplogEntry*> ops = {&op0, &op1, &op2, &op3}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo)); + const bool dataIsConsistent = true; + ASSERT_OK( + oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &ops, &pathInfo, dataIsConsistent)); CollectionReader collectionReader(_opCtx.get(), nss); ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp index c7c6ecb8d82..cda89640fdf 100644 --- 
a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp @@ -208,7 +208,8 @@ Status OplogApplierImplTest::_applyOplogEntryOrGroupedInsertsWrapper( OplogApplication::Mode oplogApplicationMode) { UnreplicatedWritesBlock uwb(opCtx); DisableDocumentValidation validationDisabler(opCtx); - return applyOplogEntryOrGroupedInserts(opCtx, batch, oplogApplicationMode); + const bool dataIsConsistent = true; + return applyOplogEntryOrGroupedInserts(opCtx, batch, oplogApplicationMode, dataIsConsistent); } void OplogApplierImplTest::_testApplyOplogEntryOrGroupedInsertsCrudOperation( @@ -285,7 +286,9 @@ Status OplogApplierImplTest::runOpsSteadyState(std::vector<OplogEntry> ops) { opsPtrs.push_back(&op); } WorkerMultikeyPathInfo pathInfo; - return oplogApplier.applyOplogBatchPerWorker(_opCtx.get(), &opsPtrs, &pathInfo); + const bool dataIsConsistent = true; + return oplogApplier.applyOplogBatchPerWorker( + _opCtx.get(), &opsPtrs, &pathInfo, dataIsConsistent); } Status OplogApplierImplTest::runOpInitialSync(const OplogEntry& op) { diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp index 82caae399c7..a6ed6f8a0ed 100644 --- a/src/mongo/db/repl/oplog_applier_utils.cpp +++ b/src/mongo/db/repl/oplog_applier_utils.cpp @@ -189,6 +189,7 @@ Status OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( OperationContext* opCtx, const OplogEntryOrGroupedInserts& entryOrGroupedInserts, OplogApplication::Mode oplogApplicationMode, + const bool isDataConsistent, IncrementOpsAppliedStatsFn incrementOpsAppliedStats, OpCounters* opCounters) { invariant(DocumentValidationSettings::get(opCtx).isSchemaValidationDisabled()); @@ -230,6 +231,7 @@ Status OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( entryOrGroupedInserts, shouldAlwaysUpsert, oplogApplicationMode, + isDataConsistent, incrementOpsAppliedStats); if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { throw 
WriteConflictException(); @@ -276,6 +278,7 @@ Status OplogApplierUtils::applyOplogBatchCommon( std::vector<const OplogEntry*>* ops, OplogApplication::Mode oplogApplicationMode, bool allowNamespaceNotFoundErrorsOnCrudOps, + const bool isDataConsistent, InsertGroup::ApplyFunc applyOplogEntryOrGroupedInserts) noexcept { // We cannot do document validation, because document validation could have been disabled when @@ -284,7 +287,8 @@ Status OplogApplierUtils::applyOplogBatchCommon( // Group the operations by namespace in order to get larger groups for bulk inserts, but do not // mix up the current order of oplog entries within the same namespace (thus *stable* sort). stableSortByNamespace(ops); - InsertGroup insertGroup(ops, opCtx, oplogApplicationMode, applyOplogEntryOrGroupedInserts); + InsertGroup insertGroup( + ops, opCtx, oplogApplicationMode, isDataConsistent, applyOplogEntryOrGroupedInserts); for (auto it = ops->cbegin(); it != ops->cend(); ++it) { const OplogEntry& entry = **it; @@ -299,8 +303,8 @@ Status OplogApplierUtils::applyOplogBatchCommon( // If we didn't create a group, try to apply the op individually. 
try { - const Status status = - applyOplogEntryOrGroupedInserts(opCtx, &entry, oplogApplicationMode); + const Status status = applyOplogEntryOrGroupedInserts( + opCtx, &entry, oplogApplicationMode, isDataConsistent); if (!status.isOK()) { // Tried to apply an update operation but the document is missing, there must be diff --git a/src/mongo/db/repl/oplog_applier_utils.h b/src/mongo/db/repl/oplog_applier_utils.h index 661597c5e12..05a5a9b47e9 100644 --- a/src/mongo/db/repl/oplog_applier_utils.h +++ b/src/mongo/db/repl/oplog_applier_utils.h @@ -124,6 +124,7 @@ public: OperationContext* opCtx, const OplogEntryOrGroupedInserts& entryOrGroupedInserts, OplogApplication::Mode oplogApplicationMode, + bool isDataConsistent, IncrementOpsAppliedStatsFn incrementOpsAppliedStats, OpCounters* opCounters); @@ -136,6 +137,7 @@ public: std::vector<const OplogEntry*>* ops, OplogApplication::Mode oplogApplicationMode, bool allowNamespaceNotFoundErrorsOnCrudOps, + bool isDataConsistent, InsertGroup::ApplyFunc applyOplogEntryOrGroupedInserts) noexcept; }; diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index d3a02d2b8eb..477cfd0c029 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -926,7 +926,8 @@ std::vector<std::vector<const OplogEntry*>> TenantOplogApplier::_fillWriterVecto Status TenantOplogApplier::_applyOplogEntryOrGroupedInserts( OperationContext* opCtx, const OplogEntryOrGroupedInserts& entryOrGroupedInserts, - OplogApplication::Mode oplogApplicationMode) { + OplogApplication::Mode oplogApplicationMode, + const bool isDataConsistent) { // We must ensure the opCtx uses replicated writes, because that will ensure we get a // NotWritablePrimary error if a stepdown occurs. invariant(opCtx->writesAreReplicated()); @@ -975,12 +976,14 @@ Status TenantOplogApplier::_applyOplogEntryOrGroupedInserts( } // We don't count tenant application in the ops applied stats. 
auto incrementOpsAppliedStats = [] {}; - // We always use oplog application mode 'kInitialSync', because we're applying oplog entries to - // a cloned database the way initial sync does. + // We always use oplog application mode 'kInitialSync' and isDataConsistent 'false', because + // we're applying oplog entries to a cloned database the way initial sync does. + invariant(isDataConsistent == false); auto status = OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( opCtx, entryOrGroupedInserts, OplogApplication::Mode::kInitialSync, + isDataConsistent, incrementOpsAppliedStats, nullptr /* opCounters*/); LOGV2_DEBUG(4886009, @@ -1003,15 +1006,18 @@ Status TenantOplogApplier::_applyOplogBatchPerWorker(std::vector<const OplogEntr opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false); const bool allowNamespaceNotFoundErrorsOnCrudOps(true); + const bool isDataConsistent = false; auto status = OplogApplierUtils::applyOplogBatchCommon( opCtx.get(), ops, OplogApplication::Mode::kInitialSync, allowNamespaceNotFoundErrorsOnCrudOps, + isDataConsistent, [this](OperationContext* opCtx, const OplogEntryOrGroupedInserts& opOrInserts, - OplogApplication::Mode mode) { - return _applyOplogEntryOrGroupedInserts(opCtx, opOrInserts, mode); + OplogApplication::Mode mode, + const bool isDataConsistent) { + return _applyOplogEntryOrGroupedInserts(opCtx, opOrInserts, mode, isDataConsistent); }); if (!status.isOK()) { LOGV2_ERROR(4886008, diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h index 8abfc3fca14..9953150b657 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.h +++ b/src/mongo/db/repl/tenant_oplog_applier.h @@ -133,7 +133,8 @@ private: Status _applyOplogEntryOrGroupedInserts(OperationContext* opCtx, const OplogEntryOrGroupedInserts& entryOrGroupedInserts, - OplogApplication::Mode oplogApplicationMode); + OplogApplication::Mode oplogApplicationMode, + const bool isDataConsistent); std::vector<std::vector<const 
OplogEntry*>> _fillWriterVectors(OperationContext* opCtx, TenantOplogBatch* batch); diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index b5f0d1fece3..6ff76d34087 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -72,8 +72,13 @@ Status _applyOperationsForTransaction(OperationContext* opCtx, // inside. TODO(SERVER-46105) invariant(!op.isCommand()); AutoGetCollection coll(opCtx, op.getNss(), MODE_IX); - auto status = repl::applyOperation_inlock( - opCtx, coll.getDb(), &op, false /*alwaysUpsert*/, oplogApplicationMode); + const bool isDataConsistent = true; + auto status = repl::applyOperation_inlock(opCtx, + coll.getDb(), + &op, + false /*alwaysUpsert*/, + oplogApplicationMode, + isDataConsistent); if (!status.isOK()) { return status; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 78943ba20df..b6583805036 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -118,6 +118,7 @@ namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(WTPauseStableTimestamp); MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely); MONGO_FAIL_POINT_DEFINE(WTSetOldestTSToStableTS); @@ -1965,6 +1966,10 @@ void WiredTigerKVEngine::setJournalListener(JournalListener* jl) { } void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp, bool force) { + if (MONGO_unlikely(WTPauseStableTimestamp.shouldFail())) { + return; + } + if (stableTimestamp.isNull()) { return; } diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index bc9f020a8e6..2f2ecfdd1a7 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -248,8 +248,9 @@ protected: } repl::UnreplicatedWritesBlock uwb(&_opCtx); auto entry = 
uassertStatusOK(OplogEntry::parse(*i)); + const bool dataIsConsistent = true; uassertStatusOK(applyOperation_inlock( - &_opCtx, ctx.db(), &entry, false, getOplogApplicationMode())); + &_opCtx, ctx.db(), &entry, false, getOplogApplicationMode(), dataIsConsistent)); } } } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index dd25e73d3b0..8b4cfdc3cf2 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -801,8 +801,9 @@ public: } repl::OplogEntryOrGroupedInserts groupedInserts(opPtrs.cbegin(), opPtrs.cend()); + const bool dataIsConsistent = true; ASSERT_OK(repl::applyOplogEntryOrGroupedInserts( - _opCtx, groupedInserts, repl::OplogApplication::Mode::kSecondary)); + _opCtx, groupedInserts, repl::OplogApplication::Mode::kSecondary, dataIsConsistent)); for (std::int32_t idx = 0; idx < docsToInsert; ++idx) { OneOffRead oor(_opCtx, firstInsertTime.addTicks(idx).asTimestamp()); @@ -2876,7 +2877,8 @@ public: Status applyOplogBatchPerWorker(OperationContext* opCtx, std::vector<const repl::OplogEntry*>* operationsToApply, - WorkerMultikeyPathInfo* pathInfo) override; + WorkerMultikeyPathInfo* pathInfo, + bool isDataConsistent) override; private: // Pointer to the test's op context. This is distinct from the op context used in @@ -2893,13 +2895,16 @@ private: Status SecondaryReadsDuringBatchApplicationAreAllowedApplier::applyOplogBatchPerWorker( OperationContext* opCtx, std::vector<const repl::OplogEntry*>* operationsToApply, - WorkerMultikeyPathInfo* pathInfo) { + WorkerMultikeyPathInfo* pathInfo, + const bool isDataConsistent) { if (!_testOpCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode, MODE_X)) { return {ErrorCodes::BadValue, "Batch applied was not holding PBWM lock in MODE_X"}; } // Insert the document. A reader without a PBWM lock should not see it yet. 
- auto status = OplogApplierImpl::applyOplogBatchPerWorker(opCtx, operationsToApply, pathInfo); + const bool dataIsConsistent = true; + auto status = OplogApplierImpl::applyOplogBatchPerWorker( + opCtx, operationsToApply, pathInfo, dataIsConsistent); if (!status.isOK()) { return status; } @@ -3232,8 +3237,9 @@ public: auto start = repl::makeStartIndexBuildOplogEntry( startBuildOpTime, nss, "field_1", keyPattern, collUUID, indexBuildUUID); + const bool dataIsConsistent = true; ASSERT_OK(repl::applyOplogEntryOrGroupedInserts( - _opCtx, &start, repl::OplogApplication::Mode::kSecondary)); + _opCtx, &start, repl::OplogApplication::Mode::kSecondary, dataIsConsistent)); // We cannot use the OperationContext to wait for the thread to reach the fail point // because it also uses the ClockSourceMock. @@ -3259,8 +3265,9 @@ public: auto commit = repl::makeCommitIndexBuildOplogEntry( startBuildOpTime, nss, "field_1", keyPattern, collUUID, indexBuildUUID); + const bool dataIsConsistent = true; ASSERT_OK(repl::applyOplogEntryOrGroupedInserts( - _opCtx, &commit, repl::OplogApplication::Mode::kSecondary)); + _opCtx, &commit, repl::OplogApplication::Mode::kSecondary, dataIsConsistent)); // Reacquire read lock to check index metadata. AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IS); |