author     Jason Chan <jason.chan@10gen.com>                 2021-05-29 23:11:47 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-05-30 04:04:55 +0000
commit     5ced9522d15db7887fd32b82f50ec4b9eed46c70 (patch)
tree       93a4e514a270c56e6313e1881609131f175be40e
parent     e4faa4d08720df067a62d8e48f66944a2f2b76cc (diff)
download   mongo-5ced9522d15db7887fd32b82f50ec4b9eed46c70.tar.gz
SERVER-57173: Add distinction to oplog application when data is inconsistent.
-rw-r--r--  jstests/replsets/invalidate_images_when_minvalid.js  162
-rw-r--r--  src/mongo/db/repl/applier_helpers.cpp                   5
-rw-r--r--  src/mongo/db/repl/applier_helpers.h                     3
-rw-r--r--  src/mongo/db/repl/apply_ops.cpp                        12
-rw-r--r--  src/mongo/db/repl/do_txn.cpp                            9
-rw-r--r--  src/mongo/db/repl/oplog.cpp                            27
-rw-r--r--  src/mongo/db/repl/oplog.h                               1
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                        47
-rw-r--r--  src/mongo/db/repl/sync_tail.h                          19
-rw-r--r--  src/mongo/db/repl/sync_tail_test.cpp                   95
-rw-r--r--  src/mongo/db/repl/sync_tail_test_fixture.cpp           12
-rw-r--r--  src/mongo/db/repl/sync_tail_test_fixture.h              3
-rw-r--r--  src/mongo/dbtests/repltests.cpp                         9
-rw-r--r--  src/mongo/dbtests/storage_timestamp_tests.cpp           8
14 files changed, 335 insertions, 77 deletions
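
At a high level, this change threads a new `isDataConsistent` flag from `SyncTail::multiApply` (where it is derived by comparing `minValid` against the first optime of the batch) down through `multiSyncApply`/`syncApply` into `applyOperation_inlock`, and uses it to decide whether a retryable `findAndModify` image written to `config.image_collection` must be marked invalidated. The snippet below is a minimal, self-contained sketch of just that decision, using the strings from the diff; `Mode` and `Timestamp` are simplified stand-ins, not the mongod types.

    #include <iostream>
    #include <string>

    enum class Mode { kInitialSync, kSecondary, kRecovering };

    struct Timestamp {
        unsigned long long secs;
        unsigned inc;
        bool operator<(const Timestamp& rhs) const {
            return secs < rhs.secs || (secs == rhs.secs && inc < rhs.inc);
        }
    };

    // Mirrors the new getInvalidatingReason() helper in oplog.cpp: initial sync and an
    // inconsistent snapshot each yield a reason string; empty means the image is usable.
    std::string invalidatingReason(Mode mode, bool isDataConsistent) {
        if (mode == Mode::kInitialSync)
            return "initial sync";
        if (!isDataConsistent)
            return "minvalid suggests inconsistent snapshot";
        return "";
    }

    int main() {
        // Mirrors the check added to SyncTail::multiApply: data is consistent only when
        // minValid is strictly earlier than the first optime of the batch being applied.
        Timestamp minValid{100, 3};
        Timestamp firstOpInBatch{100, 3};
        const bool isDataConsistent = minValid < firstOpInBatch;  // false for this batch

        std::cout << invalidatingReason(Mode::kSecondary, isDataConsistent) << "\n";
        return 0;
    }
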
diff --git a/jstests/replsets/invalidate_images_when_minvalid.js b/jstests/replsets/invalidate_images_when_minvalid.js
new file mode 100644
index 00000000000..53b39956dee
--- /dev/null
+++ b/jstests/replsets/invalidate_images_when_minvalid.js
@@ -0,0 +1,162 @@
+/**
+ * When a minvalid timestamp `T` is set, a node applying a batch of operations earlier than `T` does
+ * not have a consistent snapshot of the data. In that case, we must write an invalidated document
+ * to `config.image_collection` for any retryable `findAndModify` that does not store its images in
+ * the oplog. This test verifies that behavior.
+ *
+ * To test this we:
+ * -- Hold the stable timestamp back on a primary
+ * -- Perform a number of retryable `findAndModify`s.
+ * -- Manually set `minvalid` to a timestamp in between the performed `findAndModify`s.
+ * -- Restart the primary
+ * -- Verify that operations after the `minvalid` timestamp are marked "valid" and those at or
+ *    before it are marked "invalid". We set `replBatchLimitOperations` to one to achieve this;
+ *    that is only necessary because the test sets `minvalid` manually. In production, `minvalid`
+ *    should always be unset or set to the top of the oplog.
+ *
+ * Because we restart the node, this only works on storage engines that persist data.
+ *
+ * @tags: [requires_majority_read_concern, requires_persistence]
+ */
+
+// Skip the dbhash check because the replica set cannot reach a consistent state.
+TestData.skipCheckDBHashes = true;
+
+(function() {
+ "use strict";
+
+ let replTest = new ReplSetTest({name: "invalidate_images_when_minvalid", nodes: 1});
+
+ let nodes = replTest.startSet({
+ setParameter:
+ {storeFindAndModifyImagesInSideCollection: true, replBatchLimitOperations: 1}
+ });
+ replTest.initiate();
+ let primary = replTest.getPrimary();
+ let coll = primary.getDB("test")["invalidating"];
+ let images = primary.getDB("config")["image_collection"];
+ let minvalid = primary.getDB("local")["replset.minvalid"];
+
+    // Disable snapshotting so that a restart performs replication recovery over every operation
+    // in the test.
+ assert.commandWorked(primary.adminCommand({
+ "configureFailPoint": 'disableSnapshotting',
+ "mode": 'alwaysOn',
+ }));
+
+ function doRetryableFindAndModify(lsid, query, postImage, remove) {
+        // Performs a retryable findAndModify. The document matched by `query` must exist. The
+        // findAndModify either updates the document to set `updated: 1` or removes it. Returns
+        // the timestamp associated with the generated oplog entry.
+ //
+ // `postImage` and `remove` are booleans. The server rejects removes that ask for a post
+ // image.
+ let cmd = {
+ findandmodify: coll.getName(),
+ lsid: {id: lsid},
+ txnNumber: NumberLong(1),
+ stmtId: NumberInt(0),
+ query: query,
+ new: postImage,
+ upsert: false,
+ };
+
+ if (remove) {
+ cmd["remove"] = true;
+ } else {
+ cmd["update"] = {$set: {updated: 1}};
+ }
+
+ return assert.commandWorked(coll.runCommand(cmd))["operationTime"];
+ }
+
+ // Each write contains arguments for calling `doRetryableFindAndModify`.
+ let invalidatedWrites = [
+ {uuid: UUID(), query: {_id: 1}, postImage: false, remove: false},
+ {uuid: UUID(), query: {_id: 2}, postImage: true, remove: false},
+ {uuid: UUID(), query: {_id: 3}, postImage: false, remove: true}
+ ];
+ let validWrites = [
+ {uuid: UUID(), query: {_id: 4}, postImage: false, remove: false},
+ {uuid: UUID(), query: {_id: 5}, postImage: true, remove: false},
+ {uuid: UUID(), query: {_id: 6}, postImage: false, remove: true}
+ ];
+
+    // Insert the documents that the queries will match.
+ for (let idx = 0; idx < invalidatedWrites.length; ++idx) {
+ assert.commandWorked(coll.insert(invalidatedWrites[idx]["query"]));
+ }
+ for (let idx = 0; idx < validWrites.length; ++idx) {
+ assert.commandWorked(coll.insert(validWrites[idx]["query"]));
+ }
+
+    // Perform the `findAndModify`s. Record the timestamp of the last write in
+    // `invalidatedWrites`; that timestamp will later be written as the new
+    // `minvalid` value.
+ let lastInvalidatedImageTs = null;
+ for (let idx = 0; idx < invalidatedWrites.length; ++idx) {
+ let write = invalidatedWrites[idx];
+ lastInvalidatedImageTs = doRetryableFindAndModify(
+ write['uuid'], write['query'], write['postImage'], write['remove']);
+ }
+ for (let idx = 0; idx < validWrites.length; ++idx) {
+ let write = validWrites[idx];
+ doRetryableFindAndModify(
+ write['uuid'], write['query'], write['postImage'], write['remove']);
+ }
+
+ let imageDocs = [];
+ images.find().forEach((x) => {
+ imageDocs.push(x);
+ });
+
+ jsTestLog(tojson({"MinValid": lastInvalidatedImageTs, "Pre-restart images": imageDocs}));
+ assert.commandWorked(minvalid.update({}, {$set: {ts: lastInvalidatedImageTs}}));
+
+ replTest.restart(primary, undefined, true);
+
+ primary = replTest.getPrimary();
+ coll = primary.getDB("test")["invalidating"];
+ images = primary.getDB("config")["image_collection"];
+ minvalid = primary.getDB("local")["replset.minvalid"];
+
+ imageDocs = [];
+ images.find().forEach((x) => {
+ imageDocs.push(x);
+ });
+ jsTestLog(tojson({"Post-restart images": imageDocs}));
+
+ for (let idx = 0; idx < invalidatedWrites.length; ++idx) {
+ let write = invalidatedWrites[idx];
+ let image = images.findOne({"_id.id": write["uuid"]});
+
+ assert.eq(1, image["txnNum"]);
+ assert.eq(true, image["invalidated"]);
+ assert.eq("minvalid suggests inconsistent snapshot", image["invalidatedReason"]);
+ if (write["postImage"]) {
+ assert.eq("postImage", image["imageKind"]);
+ } else {
+ assert.eq("preImage", image["imageKind"]);
+ }
+ }
+
+ for (let idx = 0; idx < validWrites.length; ++idx) {
+ let write = validWrites[idx];
+ let image = images.findOne({"_id.id": write["uuid"]});
+
+ assert.eq(1, image["txnNum"]);
+ assert.eq(false, image["invalidated"]);
+ if (write["postImage"]) {
+ assert.eq("postImage", image["imageKind"]);
+
+ let postImage = write["query"];
+ postImage["updated"] = 1;
+ assert.eq(postImage, image["image"]);
+ } else {
+ assert.eq("preImage", image["imageKind"]);
+ assert.eq(write["query"], image["image"]);
+ }
+ }
+
+ replTest.stopSet();
+})();
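
The test above inspects documents in `config.image_collection` after the restart. The following self-contained sketch models the two entry shapes those assertions describe; `ImageEntryModel` is an illustrative stand-in, not the server's IDL-generated `ImageEntry` type, and the image strings are placeholders.

    #include <string>

    // A stand-in for the config.image_collection entry; only the fields the test asserts on.
    struct ImageEntryModel {
        long long txnNum;
        bool invalidated;
        std::string invalidatedReason;  // empty when the entry is usable
        std::string imageKind;          // "preImage" or "postImage"
        std::string image;              // pre/post image document; empty when invalidated
    };

    // Shape expected for a findAndModify applied at or before the forced minvalid: no
    // image is captured, so the entry is flagged invalidated with the new reason string.
    ImageEntryModel invalidatedEntry() {
        return {1, true, "minvalid suggests inconsistent snapshot", "preImage", ""};
    }

    // Shape expected for a findAndModify applied after minvalid: a real image is stored.
    ImageEntryModel validPostImageEntry() {
        return {1, false, "", "postImage", "{ _id: 5, updated: 1 }"};
    }
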
diff --git a/src/mongo/db/repl/applier_helpers.cpp b/src/mongo/db/repl/applier_helpers.cpp
index 9bcb21d42b1..3f78613867a 100644
--- a/src/mongo/db/repl/applier_helpers.cpp
+++ b/src/mongo/db/repl/applier_helpers.cpp
@@ -75,7 +75,8 @@ InsertGroup::InsertGroup(ApplierHelpers::OperationPtrs* ops,
InsertGroup::Mode mode)
: _doNotGroupBeforePoint(ops->cbegin()), _end(ops->cend()), _opCtx(opCtx), _mode(mode) {}
-StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(ConstIterator it) {
+StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(
+ ConstIterator it, const bool isDataConsistent) {
const auto& entry = **it;
// The following conditions must be met before attempting to group the oplog entries starting
@@ -188,7 +189,7 @@ StatusWith<InsertGroup::ConstIterator> InsertGroup::groupAndApplyInserts(ConstIt
auto groupedInsertObj = groupedInsertBuilder.done();
try {
// Apply the group of inserts.
- uassertStatusOK(SyncTail::syncApply(_opCtx, groupedInsertObj, _mode));
+ uassertStatusOK(SyncTail::syncApply(_opCtx, groupedInsertObj, _mode, isDataConsistent));
// It succeeded, advance the oplogEntriesIterator to the end of the
// group of inserts.
return endOfGroupableOpsIterator - 1;
diff --git a/src/mongo/db/repl/applier_helpers.h b/src/mongo/db/repl/applier_helpers.h
index fae2ce2404e..d4c5e4a3b5a 100644
--- a/src/mongo/db/repl/applier_helpers.h
+++ b/src/mongo/db/repl/applier_helpers.h
@@ -75,7 +75,8 @@ public:
* If the grouped insert is applied successfully, returns the iterator to the last standalone
* insert operation included in the applied grouped insert.
*/
- StatusWith<ConstIterator> groupAndApplyInserts(ConstIterator oplogEntriesIterator);
+ StatusWith<ConstIterator> groupAndApplyInserts(ConstIterator oplogEntriesIterator,
+ const bool isDataConsistent);
private:
// _doNotGroupBeforePoint is used to prevent retrying bad group inserts by marking the final op
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 563caf6cc12..86d8827b1f4 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -178,8 +178,9 @@ Status _applyOps(OperationContext* opCtx,
OldClientContext ctx(opCtx, nss.ns());
+ const bool isDataConsistent = true;
status = repl::applyOperation_inlock(
- opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
+ opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode, isDataConsistent);
if (!status.isOK())
return status;
@@ -234,8 +235,13 @@ Status _applyOps(OperationContext* opCtx,
// ops doesn't stop the applyOps from trying to process the rest of the
// ops. This is to leave the door open to parallelizing CRUD op
// application in the future.
- return repl::applyOperation_inlock(
- opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
+ const bool dataIsConsistent = true;
+ return repl::applyOperation_inlock(opCtx,
+ ctx.db(),
+ opObj,
+ alwaysUpsert,
+ oplogApplicationMode,
+ dataIsConsistent);
}
auto fieldO = opObj["o"];
diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp
index a8b5e0722ea..31a7263b752 100644
--- a/src/mongo/db/repl/do_txn.cpp
+++ b/src/mongo/db/repl/do_txn.cpp
@@ -171,8 +171,13 @@ Status _doTxn(OperationContext* opCtx,
// Setting alwaysUpsert to true makes sense only during oplog replay, and doTxn commands
// should not be executed during oplog replay.
const bool alwaysUpsert = false;
- status = repl::applyOperation_inlock(
- opCtx, db, opObj, alwaysUpsert, repl::OplogApplication::Mode::kApplyOpsCmd);
+ const bool dataIsConsistent = true;
+ status = repl::applyOperation_inlock(opCtx,
+ db,
+ opObj,
+ alwaysUpsert,
+ repl::OplogApplication::Mode::kApplyOpsCmd,
+ dataIsConsistent);
if (!status.isOK())
return status;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index eeb690ce3ce..2ca86e759ae 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -242,6 +242,16 @@ bool shouldBuildInForeground(OperationContext* opCtx,
}
+StringData getInvalidatingReason(const OplogApplication::Mode mode, const bool isDataConsistent) {
+ if (mode == OplogApplication::Mode::kInitialSync) {
+ return "initial sync"_sd;
+ } else if (!isDataConsistent) {
+ return "minvalid suggests inconsistent snapshot"_sd;
+ }
+
+ return ""_sd;
+}
+
} // namespace
void setOplogCollectionName(ServiceContext* service) {
@@ -317,6 +327,7 @@ void writeToImageCollection(OperationContext* opCtx,
const BSONObj& op,
const BSONObj& image,
repl::RetryImageEnum imageKind,
+ const StringData& invalidatedReason,
bool* upsertConfigImage) {
AutoGetCollection autoColl(opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IX);
repl::ImageEntry imageEntry;
@@ -330,7 +341,7 @@ void writeToImageCollection(OperationContext* opCtx,
imageEntry.setImage(image);
if (image.isEmpty()) {
imageEntry.setInvalidated(true);
- imageEntry.setInvalidatedReason("initial sync"_sd);
+ imageEntry.setInvalidatedReason(invalidatedReason);
}
UpdateRequest request(NamespaceString::kConfigImagesNamespace);
@@ -1120,6 +1131,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
const BSONObj& op,
bool alwaysUpsert,
OplogApplication::Mode mode,
+ const bool isDataConsistent,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
LOG(3) << "applying op: " << redact(op)
<< ", oplog application mode: " << OplogApplication::modeToString(mode);
@@ -1489,7 +1501,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
imageKind = repl::RetryImage_parse(
IDLParserErrorContext("applyUpdate"),
op.getField(OplogEntryBase::kNeedsRetryImageFieldName).String());
- if (mode != OplogApplication::Mode::kInitialSync) {
+ if (mode != OplogApplication::Mode::kInitialSync && isDataConsistent) {
if (imageKind == repl::RetryImageEnum::kPreImage) {
request.setReturnDocs(UpdateRequest::ReturnDocOption::RETURN_OLD);
} else if (imageKind == repl::RetryImageEnum::kPostImage) {
@@ -1555,8 +1567,12 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
if (op.hasField(OplogEntryBase::kNeedsRetryImageFieldName)) {
invariant(imageKind);
- writeToImageCollection(
- opCtx, op, ur.requestedDocImage, *imageKind, &upsertConfigImage);
+ writeToImageCollection(opCtx,
+ op,
+ ur.requestedDocImage,
+ *imageKind,
+ getInvalidatingReason(mode, isDataConsistent),
+ &upsertConfigImage);
}
wuow.commit();
return Status::OK();
@@ -1602,7 +1618,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
DeleteRequest request(requestNss);
request.setQuery(deleteCriteria);
if (mode != OplogApplication::Mode::kInitialSync &&
- op.hasField(OplogEntryBase::kNeedsRetryImageFieldName)) {
+ op.hasField(OplogEntryBase::kNeedsRetryImageFieldName) && isDataConsistent) {
request.setReturnDeleted(true);
}
boost::optional<BSONObj> preImage = deleteObject(opCtx, collection, request);
@@ -1613,6 +1629,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
op,
preImage.value_or(BSONObj()),
repl::RetryImageEnum::kPreImage,
+ getInvalidatingReason(mode, isDataConsistent),
&upsertConfigImage);
}
} else
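
The oplog.cpp hunks above make two related changes: the update/delete request only asks for a pre- or post-image when the mode is not initial sync and the data is consistent, and `writeToImageCollection` now records a caller-supplied reason whenever the image it receives is empty. The sketch below condenses that control flow into one self-contained function; the types are simplified stand-ins and the document strings are placeholders.

    #include <string>

    enum class Mode { kInitialSync, kSecondary };
    enum class RetryImage { kPreImage, kPostImage };

    struct ImageWrite {
        std::string image;
        bool invalidated = false;
        std::string invalidatedReason;
    };

    ImageWrite applyRetryableWrite(Mode mode, bool isDataConsistent, RetryImage kind) {
        std::string requestedDocImage;
        // Only ask the storage update/delete to return a document when the result can be
        // trusted, i.e. not during initial sync and not over an inconsistent snapshot.
        if (mode != Mode::kInitialSync && isDataConsistent) {
            requestedDocImage =
                (kind == RetryImage::kPreImage) ? "<old document>" : "<new document>";
        }

        // Mirrors writeToImageCollection(): an empty image is recorded as invalidated,
        // tagged with the reason derived from the mode and the consistency flag.
        ImageWrite entry{requestedDocImage};
        if (entry.image.empty()) {
            entry.invalidated = true;
            entry.invalidatedReason = (mode == Mode::kInitialSync)
                ? "initial sync"
                : "minvalid suggests inconsistent snapshot";
        }
        return entry;
    }
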
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 40cd7fea97e..6e29250efce 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -229,6 +229,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
const BSONObj& op,
bool alwaysUpsert,
OplogApplication::Mode mode,
+ const bool isDataConsistent,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats = {});
/**
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 2b5c94bcf64..ba87dafd096 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -372,7 +372,8 @@ std::unique_ptr<ThreadPool> SyncTail::makeWriterPool(int threadCount) {
// static
Status SyncTail::syncApply(OperationContext* opCtx,
const BSONObj& op,
- OplogApplication::Mode oplogApplicationMode) {
+ OplogApplication::Mode oplogApplicationMode,
+ const bool isDataConsistent) {
// Count each log op application as a separate operation, for reporting purposes
CurOp individualOp(opCtx);
@@ -395,8 +396,13 @@ Status SyncTail::syncApply(OperationContext* opCtx,
// wants to. We should ignore these errors intelligently while in RECOVERING and STARTUP
// mode (similar to initial sync) instead so we do not accidentally ignore real errors.
bool shouldAlwaysUpsert = (oplogApplicationMode != OplogApplication::Mode::kInitialSync);
- Status status = applyOperation_inlock(
- opCtx, db, op, shouldAlwaysUpsert, oplogApplicationMode, incrementOpsAppliedStats);
+ Status status = applyOperation_inlock(opCtx,
+ db,
+ op,
+ shouldAlwaysUpsert,
+ oplogApplicationMode,
+ isDataConsistent,
+ incrementOpsAppliedStats);
if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
throw WriteConflictException();
}
@@ -553,7 +559,8 @@ void applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors,
const SyncTail::MultiSyncApplyFunc& func,
SyncTail* st,
std::vector<Status>* statusVector,
- std::vector<WorkerMultikeyPathInfo>* workerMultikeyPathInfo) {
+ std::vector<WorkerMultikeyPathInfo>* workerMultikeyPathInfo,
+ const bool isDataConsistent) {
invariant(writerVectors.size() == statusVector->size());
for (size_t i = 0; i < writerVectors.size(); i++) {
if (!writerVectors[i].empty()) {
@@ -562,10 +569,11 @@ void applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors,
st,
&writer = writerVectors.at(i),
&status = statusVector->at(i),
- &workerMultikeyPathInfo = workerMultikeyPathInfo->at(i)
+ &workerMultikeyPathInfo = workerMultikeyPathInfo->at(i),
+ isDataConsistent
] {
auto opCtx = cc().makeOperationContext();
- status = func(opCtx.get(), &writer, st, &workerMultikeyPathInfo);
+ status = func(opCtx.get(), &writer, st, &workerMultikeyPathInfo, isDataConsistent);
}));
}
}
@@ -1392,7 +1400,8 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx,
Status multiSyncApply(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo,
+ const bool isDataConsistent) {
invariant(st);
UnreplicatedWritesBlock uwb(opCtx);
@@ -1421,7 +1430,7 @@ Status multiSyncApply(OperationContext* opCtx,
// If we are successful in grouping and applying inserts, advance the current iterator
// past the end of the inserted group of entries.
- auto groupResult = insertGroup.groupAndApplyInserts(it);
+ auto groupResult = insertGroup.groupAndApplyInserts(it, isDataConsistent);
if (groupResult.isOK()) {
it = groupResult.getValue();
continue;
@@ -1429,7 +1438,8 @@ Status multiSyncApply(OperationContext* opCtx,
// If we didn't create a group, try to apply the op individually.
try {
- const Status status = SyncTail::syncApply(opCtx, entry.raw, oplogApplicationMode);
+ const Status status =
+ SyncTail::syncApply(opCtx, entry.raw, oplogApplicationMode, isDataConsistent);
if (!status.isOK()) {
severe() << "Error applying operation (" << redact(entry.toBSON())
@@ -1464,7 +1474,8 @@ Status multiSyncApply(OperationContext* opCtx,
Status multiInitialSyncApply(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo) {
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo,
+ const bool isDataConsistent) {
invariant(st);
UnreplicatedWritesBlock uwb(opCtx);
@@ -1481,8 +1492,8 @@ Status multiInitialSyncApply(OperationContext* opCtx,
for (auto it = ops->begin(); it != ops->end(); ++it) {
auto& entry = **it;
try {
- const Status s =
- SyncTail::syncApply(opCtx, entry.raw, OplogApplication::Mode::kInitialSync);
+ const Status s = SyncTail::syncApply(
+ opCtx, entry.raw, OplogApplication::Mode::kInitialSync, isDataConsistent);
if (!s.isOK()) {
// In initial sync, update operations can cause documents to be missed during
// collection cloning. As a result, it is possible that a document that we
@@ -1600,6 +1611,10 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
// Wait for writes to finish before applying ops.
_writerPool->waitForIdle();
+ // Read `minValid` prior to it possibly being written to.
+ const bool isDataConsistent =
+ _consistencyMarkers->getMinValid(opCtx) < ops.front().getOpTime();
+
// Reset consistency markers in case the node fails while applying ops.
if (!_options.skipWritesToOplog) {
_consistencyMarkers->setOplogTruncateAfterPoint(opCtx, Timestamp());
@@ -1608,7 +1623,13 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
{
std::vector<Status> statusVector(_writerPool->getStats().numThreads, Status::OK());
- applyOps(writerVectors, _writerPool, _applyFunc, this, &statusVector, &multikeyVector);
+ applyOps(writerVectors,
+ _writerPool,
+ _applyFunc,
+ this,
+ &statusVector,
+ &multikeyVector,
+ isDataConsistent);
_writerPool->waitForIdle();
// If any of the statuses is not ok, return error.
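
In the sync_tail.cpp hunks above, the flag is computed once per batch in `SyncTail::multiApply`, read from the consistency markers before they are reset, and then captured into each writer-thread task by `applyOps`. Below is a minimal sketch of that fan-out, using a plain loop and `std::function` in place of the thread pool and `MultiSyncApplyFunc`; the `Op` alias and the hard-coded flag value are assumptions for illustration only.

    #include <functional>
    #include <iostream>
    #include <vector>

    using Op = int;  // stand-in for an oplog entry
    using ApplyFunc = std::function<void(const std::vector<Op>&, bool isDataConsistent)>;

    // Mirrors applyOps(): every writer vector receives the same batch-level flag.
    void applyOpsModel(const std::vector<std::vector<Op>>& writerVectors,
                       const ApplyFunc& func,
                       bool isDataConsistent) {
        for (const auto& writer : writerVectors) {
            if (!writer.empty())
                func(writer, isDataConsistent);
        }
    }

    int main() {
        // Mirrors multiApply(): compute the flag once, before the markers are reset,
        // by comparing minValid with the first optime in the batch (assumed false here).
        const bool isDataConsistent = false;

        applyOpsModel({{1, 2}, {3}},
                      [](const std::vector<Op>& ops, bool consistent) {
                          std::cout << "applying " << ops.size()
                                    << " ops, isDataConsistent=" << consistent << "\n";
                      },
                      isDataConsistent);
        return 0;
    }
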
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 6e7da616c6b..fff5600707e 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -66,11 +66,11 @@ class OpTime;
*/
class SyncTail {
public:
- using MultiSyncApplyFunc =
- stdx::function<Status(OperationContext* opCtx,
- MultiApplier::OperationPtrs* ops,
- SyncTail* st,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo)>;
+ using MultiSyncApplyFunc = stdx::function<Status(OperationContext* opCtx,
+ MultiApplier::OperationPtrs* ops,
+ SyncTail* st,
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo,
+ const bool isDataConsistent)>;
/**
* Maximum number of operations in each batch that can be applied using multiApply().
@@ -103,7 +103,8 @@ public:
*/
static Status syncApply(OperationContext* opCtx,
const BSONObj& o,
- OplogApplication::Mode oplogApplicationMode);
+ OplogApplication::Mode oplogApplicationMode,
+ const bool isDataConsistent);
/**
*
@@ -315,12 +316,14 @@ private:
Status multiSyncApply(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo);
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo,
+ const bool isDataConsistent);
Status multiInitialSyncApply(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
SyncTail* st,
- WorkerMultikeyPathInfo* workerMultikeyPathInfo);
+ WorkerMultikeyPathInfo* workerMultikeyPathInfo,
+ const bool isDataConsistent);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 4f010a9ad2d..d18a1e97428 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -225,9 +225,12 @@ auto parseFromOplogEntryArray(const BSONObj& obj, int elem) {
TEST_F(SyncTailTest, SyncApplyInsertDocumentDatabaseMissing) {
NamespaceString nss("test.t");
auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
- ASSERT_THROWS(
- SyncTail::syncApply(_opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary).ignore(),
- ExceptionFor<ErrorCodes::NamespaceNotFound>);
+ ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(),
+ op.toBSON(),
+ OplogApplication::Mode::kSecondary,
+ true /* isDataConsistent */)
+ .ignore(),
+ ExceptionFor<ErrorCodes::NamespaceNotFound>);
}
TEST_F(SyncTailTest, SyncApplyDeleteDocumentDatabaseMissing) {
@@ -241,9 +244,12 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionLookupByUUIDFails) {
createDatabase(_opCtx.get(), nss.db());
NamespaceString otherNss(nss.getSisterNS("othername"));
auto op = makeOplogEntry(OpTypeEnum::kInsert, otherNss, UUID::gen());
- ASSERT_THROWS(
- SyncTail::syncApply(_opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary).ignore(),
- ExceptionFor<ErrorCodes::NamespaceNotFound>);
+ ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(),
+ op.toBSON(),
+ OplogApplication::Mode::kSecondary,
+ true /* isDataConsistent */)
+ .ignore(),
+ ExceptionFor<ErrorCodes::NamespaceNotFound>);
}
TEST_F(SyncTailTest, SyncApplyDeleteDocumentCollectionLookupByUUIDFails) {
@@ -261,9 +267,12 @@ TEST_F(SyncTailTest, SyncApplyInsertDocumentCollectionMissing) {
// which in the case of this test just ignores such errors. This tests mostly that we don't
// implicitly create the collection and lock the database in MODE_X.
auto op = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
- ASSERT_THROWS(
- SyncTail::syncApply(_opCtx.get(), op.toBSON(), OplogApplication::Mode::kSecondary).ignore(),
- ExceptionFor<ErrorCodes::NamespaceNotFound>);
+ ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(),
+ op.toBSON(),
+ OplogApplication::Mode::kSecondary,
+ true /* isDataConsistent */)
+ .ignore(),
+ ExceptionFor<ErrorCodes::NamespaceNotFound>);
ASSERT_FALSE(collectionExists(_opCtx.get(), nss));
}
@@ -337,7 +346,8 @@ TEST_F(SyncTailTest, SyncApplyCommand) {
};
ASSERT_TRUE(_opCtx->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
- ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync));
+ ASSERT_OK(SyncTail::syncApply(
+ _opCtx.get(), op, OplogApplication::Mode::kInitialSync, true /* isDataConsistent */));
ASSERT_TRUE(applyCmdCalled);
}
@@ -351,7 +361,9 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) {
<< "t"));
// This test relies on the namespace type check in applyCommand_inlock().
ASSERT_THROWS(
- SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync).ignore(),
+ SyncTail::syncApply(
+ _opCtx.get(), op, OplogApplication::Mode::kInitialSync, true /* isDataConsistent */)
+ .ignore(),
ExceptionFor<ErrorCodes::InvalidNamespace>);
}
@@ -375,7 +387,8 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
auto applyOperationFn = [&operationsApplied](OperationContext* opCtx,
MultiApplier::OperationPtrs* operationsToApply,
SyncTail* st,
- WorkerMultikeyPathInfo*) -> Status {
+ WorkerMultikeyPathInfo*,
+ const bool isDataConsistent) -> Status {
for (auto&& opPtr : *operationsToApply) {
operationsApplied.push_back(*opPtr);
}
@@ -431,7 +444,8 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH
[&mutex, &operationsApplied](OperationContext* opCtx,
MultiApplier::OperationPtrs* operationsForWriterThreadToApply,
SyncTail* st,
- WorkerMultikeyPathInfo*) -> Status {
+ WorkerMultikeyPathInfo*,
+ const bool isDataConsistent) -> Status {
stdx::lock_guard<stdx::mutex> lock(mutex);
operationsApplied.emplace_back();
for (auto&& opPtr : *operationsForWriterThreadToApply) {
@@ -493,7 +507,8 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) {
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr);
- ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(
+ multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */));
// Collection should be created after SyncTail::syncApply() processes operation.
ASSERT_TRUE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
}
@@ -504,7 +519,7 @@ void testWorkerMultikeyPaths(OperationContext* opCtx,
SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr);
WorkerMultikeyPathInfo pathInfo;
MultiApplier::OperationPtrs ops = {&op};
- ASSERT_OK(multiSyncApply(opCtx, &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiSyncApply(opCtx, &ops, &syncTail, &pathInfo, true /* isDataConsistent */));
ASSERT_EQ(pathInfo.size(), numPaths);
}
@@ -560,7 +575,8 @@ TEST_F(SyncTailTest, MultiSyncApplyAddsMultipleWorkerMultikeyPathInfo) {
SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr);
WorkerMultikeyPathInfo pathInfo;
MultiApplier::OperationPtrs ops = {&opA, &opB};
- ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(
+ multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */));
ASSERT_EQ(pathInfo.size(), 2UL);
}
}
@@ -601,8 +617,9 @@ TEST_F(SyncTailTest, MultiSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) {
auto op = makeCreateCollectionOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss);
SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr);
MultiApplier::OperationPtrs ops = {&op};
- ASSERT_EQUALS(ErrorCodes::InvalidOptions,
- multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr));
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidOptions,
+ multiSyncApply(_opCtx.get(), &ops, &syncTail, nullptr, true /* isDataConsistent */));
}
TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMakeUUID) {
@@ -613,8 +630,9 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyFailsWhenCollectionCreationTriesToMake
SyncTail syncTail(nullptr, nullptr, nullptr, {}, nullptr);
MultiApplier::OperationPtrs ops = {&op};
- ASSERT_EQUALS(ErrorCodes::InvalidOptions,
- multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, nullptr));
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidOptions,
+ multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, nullptr, true /* isDataConsistent */));
}
TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperations) {
@@ -980,7 +998,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyDisablesDocumentValidationWhileApplyin
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiInitialSyncApply(
+ _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */));
ASSERT(syncTail.called);
}
@@ -1000,7 +1019,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyIgnoresUpdateOperationIfDocumentIsMiss
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), BSON("_id" << 0 << "x" << 2));
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiInitialSyncApply(
+ _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */));
// Since the missing document is not found on the sync source, the collection referenced by
// the failed operation should not be automatically created.
@@ -1024,7 +1044,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsDocumentOnNamespaceNotFound) {
auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiInitialSyncApply(
+ _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */));
ASSERT_EQUALS(syncTail.numFetched, 0U);
OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns());
@@ -1049,7 +1070,8 @@ TEST_F(SyncTailTest, MultiInitialSyncApplySkipsIndexCreationOnNamespaceNotFound)
auto op3 = makeInsertDocumentOplogEntry({Timestamp(Seconds(4), 0), 1LL}, nss, doc3);
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiInitialSyncApply(
+ _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */));
ASSERT_EQUALS(syncTail.numFetched, 0U);
OplogInterfaceLocal collectionReader(_opCtx.get(), nss.ns());
@@ -1072,7 +1094,8 @@ TEST_F(SyncTailTest,
{Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), updatedDocument);
MultiApplier::OperationPtrs ops = {&op};
WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiInitialSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
+ ASSERT_OK(multiInitialSyncApply(
+ _opCtx.get(), &ops, &syncTail, &pathInfo, true /* isDataConsistent */));
ASSERT_EQUALS(syncTail.numFetched, 1U);
// The collection referenced by "ns" in the failed operation is automatically created to hold
@@ -1528,8 +1551,10 @@ TEST_F(SyncTailTest, LogSlowOpApplicationWhenSuccessful) {
auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
startCapturingLogMessages();
- ASSERT_OK(
- SyncTail::syncApply(_opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary));
+ ASSERT_OK(SyncTail::syncApply(_opCtx.get(),
+ entry.toBSON(),
+ OplogApplication::Mode::kSecondary,
+ true /* isDataConsistent */));
// Use a builder for easier escaping. We expect the operation to be logged.
StringBuilder expected;
@@ -1550,10 +1575,12 @@ TEST_F(SyncTailTest, DoNotLogSlowOpApplicationWhenFailed) {
auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
startCapturingLogMessages();
- ASSERT_THROWS(
- SyncTail::syncApply(_opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary)
- .transitional_ignore(),
- ExceptionFor<ErrorCodes::NamespaceNotFound>);
+ ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(),
+ entry.toBSON(),
+ OplogApplication::Mode::kSecondary,
+ true /* isDataConsistent */)
+ .transitional_ignore(),
+ ExceptionFor<ErrorCodes::NamespaceNotFound>);
// Use a builder for easier escaping. We expect the operation to *not* be logged
// even thought it was slow, since we couldn't apply it successfully.
@@ -1576,8 +1603,10 @@ TEST_F(SyncTailTest, DoNotLogNonSlowOpApplicationWhenSuccessful) {
auto entry = makeOplogEntry(OpTypeEnum::kInsert, nss, {});
startCapturingLogMessages();
- ASSERT_OK(
- SyncTail::syncApply(_opCtx.get(), entry.toBSON(), OplogApplication::Mode::kSecondary));
+ ASSERT_OK(SyncTail::syncApply(_opCtx.get(),
+ entry.toBSON(),
+ OplogApplication::Mode::kSecondary,
+ true /* isDataConsistent */));
// Use a builder for easier escaping. We expect the operation to *not* be logged,
// since it wasn't slow to apply.
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index f8de8991db5..a87e368eedb 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -176,8 +176,10 @@ void SyncTailTest::_testSyncApplyCrudOperation(ErrorCodes::Error expectedError,
};
ASSERT_TRUE(_opCtx->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_opCtx.get()));
- ASSERT_EQ(SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kSecondary),
- expectedError);
+ ASSERT_EQ(
+ SyncTail::syncApply(
+ _opCtx.get(), op, OplogApplication::Mode::kSecondary, true /* isDataConsistent */),
+ expectedError);
ASSERT_EQ(applyOpCalled, expectedApplyOpCalled);
}
@@ -203,7 +205,8 @@ Status SyncTailTest::runOpsSteadyState(std::vector<OplogEntry> ops) {
opsPtrs.push_back(&op);
}
WorkerMultikeyPathInfo pathInfo;
- return multiSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo);
+ return multiSyncApply(
+ _opCtx.get(), &opsPtrs, &syncTail, &pathInfo, true /* isDataConsistent */);
}
Status SyncTailTest::runOpInitialSync(const OplogEntry& op) {
@@ -221,7 +224,8 @@ Status SyncTailTest::runOpsInitialSync(std::vector<OplogEntry> ops) {
opsPtrs.push_back(&op);
}
WorkerMultikeyPathInfo pathInfo;
- return multiInitialSyncApply(_opCtx.get(), &opsPtrs, &syncTail, &pathInfo);
+ return multiInitialSyncApply(
+ _opCtx.get(), &opsPtrs, &syncTail, &pathInfo, true /* isDataConsistent */);
}
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.h b/src/mongo/db/repl/sync_tail_test_fixture.h
index 25d682fcada..0b30db22f39 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.h
+++ b/src/mongo/db/repl/sync_tail_test_fixture.h
@@ -115,7 +115,8 @@ protected:
static Status noopApplyOperationFn(OperationContext*,
MultiApplier::OperationPtrs*,
SyncTail* st,
- WorkerMultikeyPathInfo*) {
+ WorkerMultikeyPathInfo*,
+ const bool isDataConsistent) {
return Status::OK();
}
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index d23ef54d90b..e27cd802129 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -251,8 +251,13 @@ protected:
mongo::unittest::log() << "op: " << *i << endl;
}
repl::UnreplicatedWritesBlock uwb(&_opCtx);
- uassertStatusOK(applyOperation_inlock(
- &_opCtx, ctx.db(), *i, false, OplogApplication::Mode::kSecondary));
+ const bool dataIsConsistent = true;
+ uassertStatusOK(applyOperation_inlock(&_opCtx,
+ ctx.db(),
+ *i,
+ false,
+ OplogApplication::Mode::kSecondary,
+ dataIsConsistent));
}
}
}
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 83891033cfe..ac2db9b5f4b 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -100,7 +100,7 @@ public:
private:
OperationContext* _opCtx;
};
-}
+} // namespace
const auto kIndexVersion = IndexDescriptor::IndexVersion::kV2;
@@ -2325,14 +2325,16 @@ public:
auto applyOperationFn = [&](OperationContext* opCtx,
std::vector<const repl::OplogEntry*>* operationsToApply,
repl::SyncTail* st,
- std::vector<MultikeyPathInfo>* pathInfo) -> Status {
+ std::vector<MultikeyPathInfo>* pathInfo,
+ const bool isDataConsistent) -> Status {
if (!_opCtx->lockState()->isLockHeldForMode(resourceIdParallelBatchWriterMode,
MODE_X)) {
return {ErrorCodes::BadValue, "Batch applied was not holding PBWM lock in MODE_X"};
}
// Insert the document. A reader without a PBWM lock should not see it yet.
- auto status = repl::multiSyncApply(opCtx, operationsToApply, st, pathInfo);
+ auto status = repl::multiSyncApply(
+ opCtx, operationsToApply, st, pathInfo, true /* isDataConsistent */);
if (!status.isOK()) {
return status;
}