diff options
author | Jason Chan <jason.chan@10gen.com> | 2021-05-29 23:11:47 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-30 04:04:55 +0000 |
commit | 5ced9522d15db7887fd32b82f50ec4b9eed46c70 (patch) | |
tree | 93a4e514a270c56e6313e1881609131f175be40e | |
parent | e4faa4d08720df067a62d8e48f66944a2f2b76cc (diff) | |
download | mongo-5ced9522d15db7887fd32b82f50ec4b9eed46c70.tar.gz |
SERVER-57173: Add distinction to oplog application when data is inconsistent.
-rw-r--r-- | jstests/replsets/invalidate_images_when_minvalid.js | 162 | ||||
-rw-r--r-- | src/mongo/db/repl/applier_helpers.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/applier_helpers.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/do_txn.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 47 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 95 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test_fixture.h | 3 | ||||
-rw-r--r-- | src/mongo/dbtests/repltests.cpp | 9 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 8 |
14 files changed, 335 insertions, 77 deletions
diff --git a/jstests/replsets/invalidate_images_when_minvalid.js b/jstests/replsets/invalidate_images_when_minvalid.js new file mode 100644 index 00000000000..53b39956dee --- /dev/null +++ b/jstests/replsets/invalidate_images_when_minvalid.js @@ -0,0 +1,162 @@ +/** + * When a minvalid timestamp `T` is set, applying a batch of operations earlier than `T` do not have + * a consistent snapshot of data. In this case, we must write an invalidated document to the + * `config.image_collection` for a retryable `findAndModify` that does not store images in the + * oplog. This test ensures this behavior. + * + * To test this we: + * -- Hold the stable timestamp back on a primary + * -- Perform a number of retryable `findAndModify`s. + * -- Manually set `minvalid` to a value inbetween what performed `findAndModify`s. + * -- Restart the primary + * -- Verify that operations after the `minvalid` timestamp are marked "valid" and those prior are + * marked "invalid". We set the `replBatchLimitOperations` to one to achieve this. This is + * necessary for the test due to manually setting minvalid. In production `minvalid` should + * always be unset, or set to the top of oplog. + * + * Because we restart the node, this only works on storage engines that persist data. + * + * @tags: [requires_majority_read_concern, requires_persistence] + */ + +// Skip db hash check because replset cannot reach consistent state. +TestData.skipCheckDBHashes = true; + +(function() { + "use strict"; + + let replTest = new ReplSetTest({name: "invalidate_images_when_minvalid", nodes: 1}); + + let nodes = replTest.startSet({ + setParameter: + {storeFindAndModifyImagesInSideCollection: true, replBatchLimitOperations: 1} + }); + replTest.initiate(); + let primary = replTest.getPrimary(); + let coll = primary.getDB("test")["invalidating"]; + let images = primary.getDB("config")["image_collection"]; + let minvalid = primary.getDB("local")["replset.minvalid"]; + + // Disable snapshotting to have a restart perform replication recovery on every operation in the + // test. + assert.commandWorked(primary.adminCommand({ + "configureFailPoint": 'disableSnapshotting', + "mode": 'alwaysOn', + })); + + function doRetryableFindAndModify(lsid, query, postImage, remove) { + // Performs a retryable findAndModify. The document matched by query must exist. The + // findAndModify will either be an update that sets the documents `updated: 1` or + // removes the document. Returns the timestamp associated with the generated oplog entry. + // + // `postImage` and `remove` are booleans. The server rejects removes that ask for a post + // image. + let cmd = { + findandmodify: coll.getName(), + lsid: {id: lsid}, + txnNumber: NumberLong(1), + stmtId: NumberInt(0), + query: query, + new: postImage, + upsert: false, + }; + + if (remove) { + cmd["remove"] = true; + } else { + cmd["update"] = {$set: {updated: 1}}; + } + + return assert.commandWorked(coll.runCommand(cmd))["operationTime"]; + } + + // Each write contains arguments for calling `doRetryableFindAndModify`. + let invalidatedWrites = [ + {uuid: UUID(), query: {_id: 1}, postImage: false, remove: false}, + {uuid: UUID(), query: {_id: 2}, postImage: true, remove: false}, + {uuid: UUID(), query: {_id: 3}, postImage: false, remove: true} + ]; + let validWrites = [ + {uuid: UUID(), query: {_id: 4}, postImage: false, remove: false}, + {uuid: UUID(), query: {_id: 5}, postImage: true, remove: false}, + {uuid: UUID(), query: {_id: 6}, postImage: false, remove: true} + ]; + + // Insert each document a query should match. + for (let idx = 0; idx < invalidatedWrites.length; ++idx) { + assert.commandWorked(coll.insert(invalidatedWrites[idx]["query"])); + } + for (let idx = 0; idx < validWrites.length; ++idx) { + assert.commandWorked(coll.insert(validWrites[idx]["query"])); + } + + // Perform `findAndModify`s. Record the timestamp of the last `invalidatedWrites` to set + // minvalid + // with. + let lastInvalidatedImageTs = null; + for (let idx = 0; idx < invalidatedWrites.length; ++idx) { + let write = invalidatedWrites[idx]; + lastInvalidatedImageTs = doRetryableFindAndModify( + write['uuid'], write['query'], write['postImage'], write['remove']); + } + for (let idx = 0; idx < validWrites.length; ++idx) { + let write = validWrites[idx]; + doRetryableFindAndModify( + write['uuid'], write['query'], write['postImage'], write['remove']); + } + + let imageDocs = []; + images.find().forEach((x) => { + imageDocs.push(x); + }); + + jsTestLog(tojson({"MinValid": lastInvalidatedImageTs, "Pre-restart images": imageDocs})); + assert.commandWorked(minvalid.update({}, {$set: {ts: lastInvalidatedImageTs}})); + + replTest.restart(primary, undefined, true); + + primary = replTest.getPrimary(); + coll = primary.getDB("test")["invalidating"]; + images = primary.getDB("config")["image_collection"]; + minvalid = primary.getDB("local")["replset.minvalid"]; + + imageDocs = []; + images.find().forEach((x) => { + imageDocs.push(x); + }); + jsTestLog(tojson({"Post-restart images": imageDocs})); + + for (let idx = 0; idx < invalidatedWrites.length; ++idx) { + let write = invalidatedWrites[idx]; + let image = images.findOne({"_id.id": write["uuid"]}); + + assert.eq(1, image["txnNum"]); + assert.eq(true, image["invalidated"]); + assert.eq("minvalid suggests inconsistent snapshot", image["invalidatedReason"]); + if (write["postImage"]) { + assert.eq("postImage", image["imageKind"]); + } else { + assert.eq("preImage", image["imageKind"]); + } + } + + for (let idx = 0; idx < validWrites.length; ++idx) { + let write = validWrites[idx]; + let image = images.findOne({"_id.id": write["uuid"]}); + + assert.eq(1, image["txnNum"]); + assert.eq(false, image["invalidated"]); + if (write["postImage"]) { + assert.eq("postImage", image["imageKind"]); + + let postImage = write["query"]; + postImage["updated"] = 1; + assert.eq(postImage, image["image"]); + } else { + assert.eq("preImage", image["imageKind"]); + assert.eq(write["query"], image["image"]); + } + } + + replTest.stopSet(); +})(); diff --git a/src/mongo/db/repl/applier_helpers.cpp b/src/mongo/db/repl/applier_helpers.cpp index 9bcb21d42b1..3f78613867a 100644 --- a/src/mongo/db/repl/applier_helpers.cpp +++ b/src/mongo/db/repl/applier_helpers.cpp @@ -75,7 +75,8 @@ InsertGroup::InsertGroup(ApplierHelpers::OperationPtrs* ops, InsertGroup::Mode mode) : _doNotGroupBeforePoint(ops->cbegin()), _end(ops->cend()), _opCtx(opCtx), _mode(mode) {} -StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(ConstIterator it) { +StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts( + ConstIterator it, const bool isDataConsistent) { const auto& entry = **it; // The following conditions must be met before attempting to group the oplog entries starting @@ -188,7 +189,7 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(ConstIt auto groupedInsertObj = groupedInsertBuilder.done(); try { // Apply the group of inserts. - uassertStatusOK(SyncTail::syncApply(_opCtx, groupedInsertObj, _mode)); + uassertStatusOK(SyncTail::syncApply(_opCtx, groupedInsertObj, _mode, isDataConsistent)); // It succeeded, advance the oplogEntriesIterator to the end of the // group of inserts. return endOfGroupableOpsIterator - 1; diff --git a/src/mongo/db/repl/applier_helpers.h b/src/mongo/db/repl/applier_helpers.h index fae2ce2404e..d4c5e4a3b5a 100644 --- a/src/mongo/db/repl/applier_helpers.h +++ b/src/mongo/db/repl/applier_helpers.h @@ -75,7 +75,8 @@ public: * If the grouped insert is applied successfully, returns the iterator to the last standalone * insert operation included in the applied grouped insert. */ - StatusWith<ConstIterator> groupAndApplyInserts(ConstIterator oplogEntriesIterator); + StatusWith<ConstIterator> groupAndApplyInserts(ConstIterator oplogEntriesIterator, + const bool isDataConsistent); private: // _doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 563caf6cc12..86d8827b1f4 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -178,8 +178,9 @@ Status _applyOps(OperationContext* opCtx, OldClientContext ctx(opCtx, nss.ns()); + const bool isDataConsistent = true; status = repl::applyOperation_inlock( - opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode); + opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode, isDataConsistent); if (!status.isOK()) return status; @@ -234,8 +235,13 @@ Status _applyOps(OperationContext* opCtx, // ops doesn't stop the applyOps from trying to process the rest of the // ops. This is to leave the door open to parallelizing CRUD op // application in the future. - return repl::applyOperation_inlock( - opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode); + const bool dataIsConsistent = true; + return repl::applyOperation_inlock(opCtx, + ctx.db(), + opObj, + alwaysUpsert, + oplogApplicationMode, + dataIsConsistent); } auto fieldO = opObj["o"]; diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp index a8b5e0722ea..31a7263b752 100644 --- a/src/mongo/db/repl/do_txn.cpp +++ b/src/mongo/db/repl/do_txn.cpp @@ -171,8 +171,13 @@ Status _doTxn(OperationContext* opCtx, // Setting alwaysUpsert to true makes sense only during oplog replay, and doTxn commands // should not be executed during oplog replay. const bool alwaysUpsert = false; - status = repl::applyOperation_inlock( - opCtx, db, opObj, alwaysUpsert, repl::OplogApplication::Mode::kApplyOpsCmd); + const bool dataIsConsistent = true; + status = repl::applyOperation_inlock(opCtx, + db, + opObj, + alwaysUpsert, + repl::OplogApplication::Mode::kApplyOpsCmd, + dataIsConsistent); if (!status.isOK()) return status; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index eeb690ce3ce..2ca86e759ae 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -242,6 +242,16 @@ bool shouldBuildInForeground(OperationContext* opCtx, } +StringData getInvalidatingReason(const OplogApplication::Mode mode, const bool isDataConsistent) { + if (mode == OplogApplication::Mode::kInitialSync) { + return "initial sync"_sd; + } else if (!isDataConsistent) { + return "minvalid suggests inconsistent snapshot"_sd; + } + + return ""_sd; +} + } // namespace void setOplogCollectionName(ServiceContext* service) { @@ -317,6 +327,7 @@ void writeToImageCollection(OperationContext* opCtx, const BSONObj& op, const BSONObj& image, repl::RetryImageEnum imageKind, + const StringData& invalidatedReason, bool* upsertConfigImage) { AutoGetCollection autoColl(opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IX); repl::ImageEntry imageEntry; @@ -330,7 +341,7 @@ void writeToImageCollection(OperationContext* opCtx, imageEntry.setImage(image); if (image.isEmpty()) { imageEntry.setInvalidated(true); - imageEntry.setInvalidatedReason("initial sync"_sd); + imageEntry.setInvalidatedReason(invalidatedReason); } UpdateRequest request(NamespaceString::kConfigImagesNamespace); @@ -1120,6 +1131,7 @@ Status applyOperation_inlock(OperationContext* opCtx, const BSONObj& op, bool alwaysUpsert, OplogApplication::Mode mode, + const bool isDataConsistent, IncrementOpsAppliedStatsFn incrementOpsAppliedStats) { LOG(3) << "applying op: " << redact(op) << ", oplog application mode: " << OplogApplication::modeToString(mode); @@ -1489,7 +1501,7 @@ Status applyOperation_inlock(OperationContext* opCtx, imageKind = repl::RetryImage_parse( IDLParserErrorContext("applyUpdate"), op.getField(OplogEntryBase::kNeedsRetryImageFieldName).String()); - if (mode != OplogApplication::Mode::kInitialSync) { + if (mode != OplogApplication::Mode::kInitialSync && isDataConsistent) { if (imageKind == repl::RetryImageEnum::kPreImage) { request.setReturnDocs(UpdateRequest::ReturnDocOption::RETURN_OLD); } else if (imageKind == repl::RetryImageEnum::kPostImage) { @@ -1555,8 +1567,12 @@ Status applyOperation_inlock(OperationContext* opCtx, } if (op.hasField(OplogEntryBase::kNeedsRetryImageFieldName)) { invariant(imageKind); - writeToImageCollection( - opCtx, op, ur.requestedDocImage, *imageKind, &upsertConfigImage); + writeToImageCollection(opCtx, + op, + ur.requestedDocImage, + *imageKind, + getInvalidatingReason(mode, isDataConsistent), + &upsertConfigImage); } wuow.commit(); return Status::OK(); @@ -1602,7 +1618,7 @@ Status applyOperation_inlock(OperationContext* opCtx, DeleteRequest request(requestNss); request.setQuery(deleteCriteria); if (mode != OplogApplication::Mode::kInitialSync && - op.hasField(OplogEntryBase::kNeedsRetryImageFieldName)) { + op.hasField(OplogEntryBase::kNeedsRetryImageFieldName) && isDataConsistent) { request.setReturnDeleted(true); } boost::optional<BSONObj> preImage = deleteObject(opCtx, collection, request); @@ -1613,6 +1629,7 @@ Status applyOperation_inlock(OperationContext* opCtx, op, preImage.value_or(BSONObj()), repl::RetryImageEnum::kPreImage, + getInvalidatingReason(mode, isDataConsistent), &upsertConfigImage); } } else diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 40cd7fea97e..6e29250efce 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -229,6 +229,7 @@ Status applyOperation_inlock(OperationContext* opCtx, const BSONObj& op, bool alwaysUpsert, OplogApplication::Mode mode, + const bool isDataConsistent, IncrementOpsAppliedStatsFn incrementOpsAppliedStats = {}); /** diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 2b5c94bcf64..ba87dafd096 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -372,7 +372,8 @@ std::unique_ptr<ThreadPool> SyncTail::makeWriterPool(int threadCount) { // static Status SyncTail::syncApply(OperationContext* opCtx, const BSONObj& op, - OplogApplication::Mode oplogApplicationMode) { + OplogApplication::Mode oplogApplicationMode, + const bool isDataConsistent) { // Count each log op application as a separate operation, for reporting purposes CurOp individualOp(opCtx); @@ -395,8 +396,13 @@ Status SyncTail::syncApply(OperationContext* opCtx, // wants to. We should ignore these errors intelligently while in RECOVERING and STARTUP // mode (similar to initial sync) instead so we do not accidentally ignore real errors. bool shouldAlwaysUpsert = (oplogApplicationMode != OplogApplication::Mode::kInitialSync); - Status status = applyOperation_inlock( - opCtx, db, op, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats); + Status status = applyOperation_inlock(opCtx, + db, + op, + shouldAlwaysUpsert, + oplogApplicationMode, + isDataConsistent, + incrementOpsAppliedStats); if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { throw WriteConflictException(); } @@ -553,7 +559,8 @@ void applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors, const SyncTail::MultiSyncApplyFunc& func, SyncTail* st, std::vector<Status>* statusVector, - std::vector<WorkerMultikeyPathInfo>* workerMultikeyPathInfo) { + std::vector<WorkerMultikeyPathInfo>* workerMultikeyPathInfo, + const bool isDataConsistent) { invariant(writerVectors.size() == statusVector->size()); for (size_t i = 0; i < writerVectors.size(); i++) { if (!writerVectors[i].empty()) { @@ -562,10 +569,11 @@ void applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors, st, &writer = writerVectors.at(i), &status = statusVector->at(i), - &workerMultikeyPathInfo = workerMultikeyPathInfo->at(i) + &workerMultikeyPathInfo = workerMultikeyPathInfo->at(i), + isDataConsistent ] { auto opCtx = cc().makeOperationContext(); - status = func(opCtx.get(), &writer, st, &workerMultikeyPathInfo); + status = func(opCtx.get(), &writer, st, &workerMultikeyPathInfo, isDataConsistent); })); } } @@ -1392,7 +1400,8 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, Status multiSyncApply(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + WorkerMultikeyPathInfo* workerMultikeyPathInfo, + const bool isDataConsistent) { invariant(st); UnreplicatedWritesBlock uwb(opCtx); @@ -1421,7 +1430,7 @@ Status multiSyncApply(OperationContext* opCtx, // If we are successful in grouping and applying inserts, advance the current iterator // past the end of the inserted group of entries. - auto groupResult = insertGroup.groupAndApplyInserts(it); + auto groupResult = insertGroup.groupAndApplyInserts(it, isDataConsistent); if (groupResult.isOK()) { it = groupResult.getValue(); continue; @@ -1429,7 +1438,8 @@ Status multiSyncApply(OperationContext* opCtx, // If we didn't create a group, try to apply the op individually. try { - const Status status = SyncTail::syncApply(opCtx, entry.raw, oplogApplicationMode); + const Status status = + SyncTail::syncApply(opCtx, entry.raw, oplogApplicationMode, isDataConsistent); if (!status.isOK()) { severe() << "Error applying operation (" << redact(entry.toBSON()) @@ -1464,7 +1474,8 @@ Status multiSyncApply(OperationContext* opCtx, Status multiInitialSyncApply(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo) { + WorkerMultikeyPathInfo* workerMultikeyPathInfo, + const bool isDataConsistent) { invariant(st); UnreplicatedWritesBlock uwb(opCtx); @@ -1481,8 +1492,8 @@ Status multiInitialSyncApply(OperationContext* opCtx, for (auto it = ops->begin(); it != ops->end(); ++it) { auto& entry = **it; try { - const Status s = - SyncTail::syncApply(opCtx, entry.raw, OplogApplication::Mode::kInitialSync); + const Status s = SyncTail::syncApply( + opCtx, entry.raw, OplogApplication::Mode::kInitialSync, isDataConsistent); if (!s.isOK()) { // In initial sync, update operations can cause documents to be missed during // collection cloning. As a result, it is possible that a document that we @@ -1600,6 +1611,10 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O // Wait for writes to finish before applying ops. _writerPool->waitForIdle(); + // Read `minValid` prior to it possibly being written to. + const bool isDataConsistent = + _consistencyMarkers->getMinValid(opCtx) < ops.front().getOpTime(); + // Reset consistency markers in case the node fails while applying ops. if (!_options.skipWritesToOplog) { _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp()); @@ -1608,7 +1623,13 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O { std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK()); - applyOps(writerVectors, _writerPool, _applyFunc, this, &statusVector, &multikeyVector); + applyOps(writerVectors, + _writerPool, + _applyFunc, + this, + &statusVector, + &multikeyVector, + isDataConsistent); _writerPool->waitForIdle(); // If any of the statuses is not ok, return error. diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 6e7da616c6b..fff5600707e 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -66,11 +66,11 @@ class OpTime; */ class SyncTail { public: - using MultiSyncApplyFunc = - stdx::function<Status(OperationContext* opCtx, - MultiApplier::OperationPtrs* ops, - SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo)>; + using MultiSyncApplyFunc = stdx::function<Status(OperationContext* opCtx, + MultiApplier::OperationPtrs* ops, + SyncTail* st, + WorkerMultikeyPathInfo* workerMultikeyPathInfo, + const bool isDataConsistent)>; /** * Maximum number of operations in each batch that can be applied using multiApply(). @@ -103,7 +103,8 @@ public: */ static Status syncApply(OperationContext* opCtx, const BSONObj& o, - OplogApplication::Mode oplogApplicationMode); + OplogApplication::Mode oplogApplicationMode, + const bool isDataConsistent); /** * @@ -315,12 +316,14 @@ private: Status multiSyncApply(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo); + WorkerMultikeyPathInfo* workerMultikeyPathInfo, + const bool isDataConsistent); Status multiInitialSyncApply(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, SyncTail* st, - WorkerMultikeyPathInfo* workerMultikeyPathInfo); + WorkerMultikeyPathInfo* workerMultikeyPathInfo, + const bool isDataConsistent); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 4f010a9ad2d..d18a1e97428 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -225,9 +225,12 @@ auto parseFromOplogEntryArray(const BSONObj& obj, int elem) { TEST_F(SyncTailTest, SyncApplyInsertDocumentDatabaseMissing) { NamespaceString nss("test.t"); auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - ASSERT_THROWS( - SyncTail::syncApply(_opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary).ignore(), - ExceptionFor<ErrorCodes::NamespaceNotFound>); + ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(), + op.toBSON(), + OplogApplication::Mode::kSecondary, + true /* isDataConsistent */) + .ignore(), + ExceptionFor<ErrorCodes::NamespaceNotFound>); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentDatabaseMissing) { @@ -241,9 +244,12 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLookupByUUIDFails) { createDatabase(_opCtx.get(), nss.db()); NamespaceString otherNss(nss.getSisterNS("othername")); auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, UUID::gen()); - ASSERT_THROWS( - SyncTail::syncApply(_opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary).ignore(), - ExceptionFor<ErrorCodes::NamespaceNotFound>); + ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(), + op.toBSON(), + OplogApplication::Mode::kSecondary, + true /* isDataConsistent */) + .ignore(), + ExceptionFor<ErrorCodes::NamespaceNotFound>); } TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLookupByUUIDFails) { @@ -261,9 +267,12 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) { // which in the case of this test just ignores such errors. This tests mostly that we don't // implicitly create the collection and lock the database in MODE_X. auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); - ASSERT_THROWS( - SyncTail::syncApply(_opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary).ignore(), - ExceptionFor<ErrorCodes::NamespaceNotFound>); + ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(), + op.toBSON(), + OplogApplication::Mode::kSecondary, + true /* isDataConsistent */) + .ignore(), + ExceptionFor<ErrorCodes::NamespaceNotFound>); ASSERT_FALSE(collectionExists(_opCtx.get(), nss)); } @@ -337,7 +346,8 @@ TEST_F(SyncTailTest, SyncApplyCommand) { }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); - ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync)); + ASSERT_OK(SyncTail::syncApply( + _opCtx.get(), op, OplogApplication::Mode::kInitialSync, true /* isDataConsistent */)); ASSERT_TRUE(applyCmdCalled); } @@ -351,7 +361,9 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { << "t")); // This test relies on the namespace type check in applyCommand_inlock(). ASSERT_THROWS( - SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync).ignore(), + SyncTail::syncApply( + _opCtx.get(), op, OplogApplication::Mode::kInitialSync, true /* isDataConsistent */) + .ignore(), ExceptionFor<ErrorCodes::InvalidNamespace>); } @@ -375,7 +387,8 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, auto applyOperationFn = [&operationsApplied](OperationContext* opCtx, MultiApplier::OperationPtrs* operationsToApply, SyncTail* st, - WorkerMultikeyPathInfo*) -> Status { + WorkerMultikeyPathInfo*, + const bool isDataConsistent) -> Status { for (auto&& opPtr : *operationsToApply) { operationsApplied.push_back(*opPtr); } @@ -431,7 +444,8 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH [&mutex, &operationsApplied](OperationContext* opCtx, MultiApplier::OperationPtrs* operationsForWriterThreadToApply, SyncTail* st, - WorkerMultikeyPathInfo*) -> Status { + WorkerMultikeyPathInfo*, + const bool isDataConsistent) -> Status { stdx::lock_guard<stdx::mutex> lock(mutex); operationsApplied.emplace_back(); for (auto&& opPtr : *operationsForWriterThreadToApply) { @@ -493,7 +507,8 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr); - ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK( + multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */)); // Collection should be created after SyncTail::syncApply() processes operation. ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); } @@ -504,7 +519,7 @@ void testWorkerMultikeyPaths(OperationContext* opCtx, SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&op}; - ASSERT_OK(multiSyncApply(opCtx, &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiSyncApply(opCtx, &ops, &syncTail, &pathInfo, true /* isDataConsistent */)); ASSERT_EQ(pathInfo.size(), numPaths); } @@ -560,7 +575,8 @@ TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) { SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr); WorkerMultikeyPathInfo pathInfo; MultiApplier::OperationPtrs ops = {&opA, &opB}; - ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK( + multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */)); ASSERT_EQ(pathInfo.size(), 2UL); } } @@ -601,8 +617,9 @@ TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss); SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr); MultiApplier::OperationPtrs ops = {&op}; - ASSERT_EQUALS(ErrorCodes::InvalidOptions, - multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr)); + ASSERT_EQUALS( + ErrorCodes::InvalidOptions, + multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr, true /* isDataConsistent */)); } TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) { @@ -613,8 +630,9 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMake SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr); MultiApplier::OperationPtrs ops = {&op}; - ASSERT_EQUALS(ErrorCodes::InvalidOptions, - multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, nullptr)); + ASSERT_EQUALS( + ErrorCodes::InvalidOptions, + multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, nullptr, true /* isDataConsistent */)); } TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) { @@ -980,7 +998,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyin {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiInitialSyncApply( + _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */)); ASSERT(syncTail.called); } @@ -1000,7 +1019,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMiss {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2)); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiInitialSyncApply( + _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */)); // Since the missing document is not found on the sync source, the collection referenced by // the failed operation should not be automatically created. @@ -1024,7 +1044,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) { auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiInitialSyncApply( + _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */)); ASSERT_EQUALS(syncTail.numFetched, 0U); OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns()); @@ -1049,7 +1070,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound) auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3); MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiInitialSyncApply( + _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */)); ASSERT_EQUALS(syncTail.numFetched, 0U); OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns()); @@ -1072,7 +1094,8 @@ TEST_F(SyncTailTest, {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), updatedDocument); MultiApplier::OperationPtrs ops = {&op}; WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); + ASSERT_OK(multiInitialSyncApply( + _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */)); ASSERT_EQUALS(syncTail.numFetched, 1U); // The collection referenced by "ns" in the failed operation is automatically created to hold @@ -1528,8 +1551,10 @@ TEST_F(SyncTailTest, LogSlowOpApplicationWhenSuccessful) { auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); startCapturingLogMessages(); - ASSERT_OK( - SyncTail::syncApply(_opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary)); + ASSERT_OK(SyncTail::syncApply(_opCtx.get(), + entry.toBSON(), + OplogApplication::Mode::kSecondary, + true /* isDataConsistent */)); // Use a builder for easier escaping. We expect the operation to be logged. StringBuilder expected; @@ -1550,10 +1575,12 @@ TEST_F(SyncTailTest, DoNotLogSlowOpApplicationWhenFailed) { auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); startCapturingLogMessages(); - ASSERT_THROWS( - SyncTail::syncApply(_opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary) - .transitional_ignore(), - ExceptionFor<ErrorCodes::NamespaceNotFound>); + ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(), + entry.toBSON(), + OplogApplication::Mode::kSecondary, + true /* isDataConsistent */) + .transitional_ignore(), + ExceptionFor<ErrorCodes::NamespaceNotFound>); // Use a builder for easier escaping. We expect the operation to *not* be logged // even thought it was slow, since we couldn't apply it successfully. @@ -1576,8 +1603,10 @@ TEST_F(SyncTailTest, DoNotLogNonSlowOpApplicationWhenSuccessful) { auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {}); startCapturingLogMessages(); - ASSERT_OK( - SyncTail::syncApply(_opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary)); + ASSERT_OK(SyncTail::syncApply(_opCtx.get(), + entry.toBSON(), + OplogApplication::Mode::kSecondary, + true /* isDataConsistent */)); // Use a builder for easier escaping. We expect the operation to *not* be logged, // since it wasn't slow to apply. diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index f8de8991db5..a87e368eedb 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -176,8 +176,10 @@ void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError, }; ASSERT_TRUE(_opCtx->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_opCtx.get())); - ASSERT_EQ(SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kSecondary), - expectedError); + ASSERT_EQ( + SyncTail::syncApply( + _opCtx.get(), op, OplogApplication::Mode::kSecondary, true /* isDataConsistent */), + expectedError); ASSERT_EQ(applyOpCalled, expectedApplyOpCalled); } @@ -203,7 +205,8 @@ Status SyncTailTest::runOpsSteadyState(std::vector<OplogEntry> ops) { opsPtrs.push_back(&op); } WorkerMultikeyPathInfo pathInfo; - return multiSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo); + return multiSyncApply( + _opCtx.get(), &opsPtrs, &syncTail, &pathInfo, true /* isDataConsistent */); } Status SyncTailTest::runOpInitialSync(const OplogEntry& op) { @@ -221,7 +224,8 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) { opsPtrs.push_back(&op); } WorkerMultikeyPathInfo pathInfo; - return multiInitialSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo); + return multiInitialSyncApply( + _opCtx.get(), &opsPtrs, &syncTail, &pathInfo, true /* isDataConsistent */); } diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h index 25d682fcada..0b30db22f39 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.h +++ b/src/mongo/db/repl/sync_tail_test_fixture.h @@ -115,7 +115,8 @@ protected: static Status noopApplyOperationFn(OperationContext*, MultiApplier::OperationPtrs*, SyncTail* st, - WorkerMultikeyPathInfo*) { + WorkerMultikeyPathInfo*, + const bool isDataConsistent) { return Status::OK(); } diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index d23ef54d90b..e27cd802129 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -251,8 +251,13 @@ protected: mongo::unittest::log() << "op: " << *i << endl; } repl::UnreplicatedWritesBlock uwb(&_opCtx); - uassertStatusOK(applyOperation_inlock( - &_opCtx, ctx.db(), *i, false, OplogApplication::Mode::kSecondary)); + const bool dataIsConsistent = true; + uassertStatusOK(applyOperation_inlock(&_opCtx, + ctx.db(), + *i, + false, + OplogApplication::Mode::kSecondary, + dataIsConsistent)); } } } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 83891033cfe..ac2db9b5f4b 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -100,7 +100,7 @@ public: private: OperationContext* _opCtx; }; -} +} // namespace const auto kIndexVersion = IndexDescriptor::IndexVersion::kV2; @@ -2325,14 +2325,16 @@ public: auto applyOperationFn = [&](OperationContext* opCtx, std::vector<const repl::OplogEntry*>* operationsToApply, repl::SyncTail* st, - std::vector<MultikeyPathInfo>* pathInfo) -> Status { + std::vector<MultikeyPathInfo>* pathInfo, + const bool isDataConsistent) -> Status { if (!_opCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode, MODE_X)) { return {ErrorCodes::BadValue, "Batch applied was not holding PBWM lock in MODE_X"}; } // Insert the document. A reader without a PBWM lock should not see it yet. - auto status = repl::multiSyncApply(opCtx, operationsToApply, st, pathInfo); + auto status = repl::multiSyncApply( + opCtx, operationsToApply, st, pathInfo, true /* isDataConsistent */); if (!status.isOK()) { return status; } |