summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js310
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/op_observer_impl.cpp2
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp32
-rw-r--r--src/mongo/db/repl/image_collection_entry.idl3
-rw-r--r--src/mongo/db/repl/oplog.cpp1
6 files changed, 344 insertions, 5 deletions
diff --git a/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js b/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js
new file mode 100644
index 00000000000..26c49a816c4
--- /dev/null
+++ b/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js
@@ -0,0 +1,310 @@
+/**
+ * Test that retryable findAndModify commands will store pre- and post- images in the appropriate
+ * collections according to the parameter value of `storeFindAndModifyImagesInSideCollection`.
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/retryable_writes_util.js");
+ // Nothing to test when the configured storage engine cannot do retryable writes.
+ if (!RetryableWritesUtil.storageEngineSupportsRetryableWrites(jsTest.options().storageEngine)) {
+ jsTestLog("Retryable writes are not supported, skipping test");
+ return;
+ }
+
+ const numNodes = 2;
+ // Asserts that oplog 'entry' carries the given lsid, txnNum, stmtId and prevOpTime timestamp.
+ function checkOplogEntry(entry, lsid, txnNum, stmtId, prevTs, retryImageArgs) {
+ assert.neq(entry, null);
+ assert.neq(entry.lsid, null);
+ assert.eq(lsid, entry.lsid.id, entry);
+ assert.eq(txnNum, entry.txnNumber, entry);
+ assert.eq(stmtId, entry.stmtId, entry);
+ // The 'prevOpTime' link is compared by timestamp value.
+ const oplogPrevTs = entry.prevOpTime.ts;
+ assert.eq(prevTs.getTime(), oplogPrevTs.getTime(), entry);
+ // With 'needsRetryImage' the entry must name the image kind and carry no image optime links.
+ if (retryImageArgs.needsRetryImage) {
+ assert.eq(retryImageArgs.imageKind, entry.needsRetryImage, entry);
+ assert(!entry.hasOwnProperty("preImageOpTime"));
+ assert(!entry.hasOwnProperty("postImageOpTime"));
+ } else {
+ assert(!entry.hasOwnProperty("needsRetryImage"));
+ }
+ }
+ // Asserts that the session's 'config.transactions' document on 'conn' has the expected state.
+ function checkSessionCatalog(conn, sessionId, txnNum, expectedTs) {
+ const coll = conn.getDB('config').transactions;
+ const sessionDoc = coll.findOne({'_id.id': sessionId});
+ // Compare the recorded txnNum and the last-write timestamp against the expected values.
+ assert.eq(txnNum, sessionDoc.txnNum);
+ const writeTs = sessionDoc.lastWriteOpTime.ts;
+ assert.eq(expectedTs.getTime(), writeTs.getTime());
+ }
+ // Asserts that the session's 'config.image_collection' document on 'conn' has the expected state.
+ function checkImageCollection(conn, sessionInfo, expectedTs, expectedImage, expectedImageKind) {
+ const coll = conn.getDB('config').image_collection;
+ const imageDoc =
+ coll.findOne({'_id.id': sessionInfo.sessionId});
+ // Check txnNum, image contents, image kind and timestamp of the stored image document.
+ assert.eq(sessionInfo.txnNum, imageDoc.txnNum, imageDoc);
+ assert.eq(expectedImage, imageDoc.image, imageDoc);
+ assert.eq(expectedImageKind, imageDoc.imageKind, imageDoc);
+ assert.eq(expectedTs.getTime(), imageDoc.ts.getTime(), imageDoc);
+ }
+
+ function runTests(lsid, mainConn, primary, secondary, storeImagesInSideCollection, docId) {
+ const setParam = {
+ setParameter: 1,
+ storeFindAndModifyImagesInSideCollection: storeImagesInSideCollection
+ };
+ primary.adminCommand(setParam);
+
+ let txnNumber = NumberLong(docId);
+ let incrementTxnNumber = function() {
+ txnNumber = NumberLong(txnNumber + 1);
+ };
+
+ const oplog = primary.getDB('local').oplog.rs;
+
+ // ////////////////////////////////////////////////////////////////////////
+ // // Test findAndModify command (upsert)
+
+ let cmd = {
+ findAndModify: 'user',
+ query: {_id: docId},
+ update: {$set: {x: 1}},
+ new: true,
+ upsert: true,
+ lsid: {id: lsid},
+ txnNumber: txnNumber,
+ writeConcern: {w: numNodes},
+ };
+
+ assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+
+ ////////////////////////////////////////////////////////////////////////
+ // Test findAndModify command (in-place update, return pre-image)
+
+ incrementTxnNumber();
+ cmd = {
+ findAndModify: 'user',
+ query: {_id: docId},
+ update: {$inc: {x: 1}},
+ new: false,
+ upsert: false,
+ lsid: {id: lsid},
+ txnNumber: txnNumber,
+ writeConcern: {w: numNodes},
+ };
+
+ let expectedPreImage = mainConn.getDB('test').user.findOne({_id: docId});
+ let res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ assert.eq(res.value, expectedPreImage);
+ // Get update entry.
+ let updateOp = oplog.findOne({ns: 'test.user', op: 'u', txnNumber: txnNumber});
+ // Check that the findAndModify oplog entry and sessions record has the appropriate fields
+ // and values.
+ const expectedWriteTs = Timestamp(0, 0);
+ const expectedStmtId = 0;
+ let retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "preImage"};
+ checkOplogEntry(updateOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs);
+ checkSessionCatalog(primary, lsid, txnNumber, updateOp.ts);
+ checkSessionCatalog(secondary, lsid, txnNumber, updateOp.ts);
+ if (storeImagesInSideCollection) {
+ const sessionInfo = {sessionId: lsid, txnNum: txnNumber};
+ checkImageCollection(primary, sessionInfo, updateOp.ts, expectedPreImage, "preImage");
+ checkImageCollection(secondary, sessionInfo, updateOp.ts, expectedPreImage, "preImage");
+ } else {
+ // The preImage should be stored in the oplog.
+ const preImage =
+ oplog.findOne({ns: 'test.user', op: 'n', ts: updateOp.preImageOpTime.ts});
+ assert.eq(expectedPreImage, preImage.o);
+ }
+ // Assert that retrying the command will produce the same response.
+ let retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ assert.eq(res, retryRes);
+
+ ////////////////////////////////////////////////////////////////////////
+ // Test findAndModify command (in-place update, return post-image)
+
+ incrementTxnNumber();
+ cmd = {
+ findAndModify: 'user',
+ query: {_id: docId},
+ update: {$inc: {x: 1}},
+ new: true,
+ upsert: false,
+ lsid: {id: lsid},
+ txnNumber: txnNumber,
+ writeConcern: {w: numNodes},
+ };
+
+ res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ let expectedPostImage = mainConn.getDB('test').user.findOne({_id: docId});
+ // Get update entry.
+ updateOp = oplog.findOne({ns: 'test.user', op: 'u', txnNumber: txnNumber});
+ // Check that the findAndModify oplog entry and sessions record has the appropriate fields
+ // and values.
+ retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "postImage"};
+ checkOplogEntry(updateOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs);
+ checkSessionCatalog(primary, lsid, txnNumber, updateOp.ts);
+ checkSessionCatalog(secondary, lsid, txnNumber, updateOp.ts);
+ if (storeImagesInSideCollection) {
+ const sessionInfo = {sessionId: lsid, txnNum: txnNumber};
+ checkImageCollection(primary, sessionInfo, updateOp.ts, expectedPostImage, "postImage");
+ checkImageCollection(
+ secondary, sessionInfo, updateOp.ts, expectedPostImage, "postImage");
+ } else {
+ // The postImage should be stored in the oplog.
+ const postImage =
+ oplog.findOne({ns: 'test.user', op: 'n', ts: updateOp.postImageOpTime.ts});
+ assert.eq(expectedPostImage, postImage.o);
+ }
+ // Assert that retrying the command will produce the same response.
+ retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ assert.eq(res, retryRes);
+
+ ////////////////////////////////////////////////////////////////////////
+ // Test findAndModify command (replacement update, return pre-image)
+ incrementTxnNumber();
+ cmd = {
+ findAndModify: 'user',
+ query: {_id: docId},
+ update: {y: 1},
+ new: false,
+ upsert: false,
+ lsid: {id: lsid},
+ txnNumber: txnNumber,
+ writeConcern: {w: numNodes},
+ };
+
+ expectedPreImage = mainConn.getDB('test').user.findOne({_id: docId});
+ res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ // Get update entry.
+ updateOp = oplog.findOne({ns: 'test.user', op: 'u', txnNumber: txnNumber});
+ retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "preImage"};
+ // Check that the findAndModify oplog entry and sessions record has the appropriate fields
+ // and values.
+ checkOplogEntry(updateOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs);
+ checkSessionCatalog(primary, lsid, txnNumber, updateOp.ts);
+ checkSessionCatalog(secondary, lsid, txnNumber, updateOp.ts);
+ if (storeImagesInSideCollection) {
+ const sessionInfo = {sessionId: lsid, txnNum: txnNumber};
+ checkImageCollection(primary, sessionInfo, updateOp.ts, expectedPreImage, "preImage");
+ checkImageCollection(secondary, sessionInfo, updateOp.ts, expectedPreImage, "preImage");
+ } else {
+ // The preImage should be stored in the oplog.
+ const preImage =
+ oplog.findOne({ns: 'test.user', op: 'n', ts: updateOp.preImageOpTime.ts});
+ assert.eq(expectedPreImage, preImage.o);
+ }
+
+ // Assert that retrying the command will produce the same response.
+ retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ assert.eq(res, retryRes);
+
+ ////////////////////////////////////////////////////////////////////////
+ // Test findAndModify command (replacement update, return post-image)
+
+ incrementTxnNumber();
+ cmd = {
+ findAndModify: 'user',
+ query: {_id: docId},
+ update: {z: 1},
+ new: true,
+ upsert: false,
+ lsid: {id: lsid},
+ txnNumber: txnNumber,
+ writeConcern: {w: numNodes},
+ };
+
+ res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ expectedPostImage = mainConn.getDB('test').user.findOne({_id: docId});
+
+ // Get update entry.
+ updateOp = oplog.findOne({ns: 'test.user', op: 'u', txnNumber: txnNumber});
+ retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "postImage"};
+ // Check that the findAndModify oplog entry and sessions record has the appropriate fields
+ // and values.
+ checkOplogEntry(updateOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs);
+ checkSessionCatalog(primary, lsid, txnNumber, updateOp.ts);
+ checkSessionCatalog(secondary, lsid, txnNumber, updateOp.ts);
+ if (storeImagesInSideCollection) {
+ const sessionInfo = {sessionId: lsid, txnNum: txnNumber};
+ checkImageCollection(primary, sessionInfo, updateOp.ts, expectedPostImage, "postImage");
+ checkImageCollection(
+ secondary, sessionInfo, updateOp.ts, expectedPostImage, "postImage");
+ } else {
+ // The postImage should be stored in the oplog.
+ const postImage =
+ oplog.findOne({ns: 'test.user', op: 'n', ts: updateOp.postImageOpTime.ts});
+ assert.eq(expectedPostImage, postImage.o);
+ }
+ // Assert that retrying the command will produce the same response.
+ retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ assert.eq(res, retryRes);
+
+ ////////////////////////////////////////////////////////////////////////
+ // Test findAndModify command (remove, return pre-image)
+ incrementTxnNumber();
+ cmd = {
+ findAndModify: 'user',
+ query: {_id: docId},
+ remove: true,
+ new: false,
+ lsid: {id: lsid},
+ txnNumber: txnNumber,
+ writeConcern: {w: numNodes},
+ };
+
+ expectedPreImage = mainConn.getDB('test').user.findOne({_id: docId});
+ res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ // Get delete entry from top of oplog.
+ const deleteOp = oplog.findOne({ns: 'test.user', op: 'd', txnNumber: txnNumber});
+ //
+ retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "preImage"};
+ checkOplogEntry(deleteOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs);
+ checkSessionCatalog(primary, lsid, txnNumber, deleteOp.ts);
+ checkSessionCatalog(secondary, lsid, txnNumber, deleteOp.ts);
+ if (storeImagesInSideCollection) {
+ const sessionInfo = {sessionId: lsid, txnNum: txnNumber};
+ checkImageCollection(primary, sessionInfo, deleteOp.ts, expectedPreImage, "preImage");
+ checkImageCollection(secondary, sessionInfo, deleteOp.ts, expectedPreImage, "preImage");
+ } else {
+ // The preImage should be stored in the oplog.
+ const preImage =
+ oplog.findOne({ns: 'test.user', op: 'n', ts: deleteOp.preImageOpTime.ts});
+ assert.eq(expectedPreImage, preImage.o);
+ }
+ // Assert that retrying the command will produce the same response.
+ retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd));
+ assert.eq(res, retryRes);
+ }
+
+ const lsid = UUID();
+ // Test that retryable findAndModifys store pre- and post- images in 'config.image_collection'
+ // when 'storeFindAndModifyImagesInSideCollection'=true and in the oplog when it is false.
+ // Initiate the replica set with 'storeFindAndModifyImagesInSideCollection'=true to guarantee
+ // that the 'config.image_collection' exists. This is needed until we commit to a contract
+ // between the lifetime of this side collection, the parameter value, and FCV.
+ const rst = new ReplSetTest({
+ nodes: numNodes,
+ nodeOptions: {setParameter: {storeFindAndModifyImagesInSideCollection: true}}
+ });
+ rst.startSet();
+ rst.initiate();
+ runTests(lsid, rst.getPrimary(), rst.getPrimary(), rst.getSecondary(), true, 40);
+ runTests(lsid, rst.getPrimary(), rst.getPrimary(), rst.getSecondary(), false, 50);
+ rst.stopSet();
+ // Test that retryable findAndModifys will store pre- and post- images in the
+ // 'config.image_collection' table when run through a sharded cluster.
+ // TODO: Uncomment this block to get code coverage for sharded clusters. Shard servers will
+ // start with downgraded FCV, meaning the 'image_collection' won't be available on startup.
+ // const st = new ShardingTest({shards: {rs0: {nodes: numNodes, setParameter:
+ // {storeFindAndModifyImagesInSideCollection: true}}}});
+ // runTests(lsid, st.s, st.rs0.getPrimary(), st.rs0.getSecondary(), true, 60);
+ // runTests(lsid, st.s, st.rs0.getPrimary(), st.rs0.getSecondary(), true, 70);
+ // st.stop();
+ // runTests(true);
+})();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index adcc2cfae90..c954b4b9df5 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1816,6 +1816,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/repl/image_collection_entry',
'catalog_raii',
'catalog/collection_info_cache',
'concurrency/lock_manager',
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 45c03ab79b4..58dc8f6f6cc 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -174,7 +174,9 @@ void writeToImagesCollection(OperationContext* opCtx,
Timestamp ts) {
repl::ImageEntry imageEntry;
invariant(opCtx->getLogicalSessionId());
+ invariant(opCtx->getTxnNumber());
imageEntry.set_id(*opCtx->getLogicalSessionId());
+ imageEntry.setTxnNumber(*opCtx->getTxnNumber());
imageEntry.setTs(ts);
imageEntry.setImage(std::move(image));
imageEntry.setImageKind(imageKind);
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index 42ec244a59c..ee5b0e1295e 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/find_and_modify_result.h"
#include "mongo/db/query/find_and_modify_request.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/logger/redaction.h"
namespace mongo {
@@ -51,6 +52,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
const repl::OplogEntry& oplogWithCorrectLinks) {
auto opType = oplogEntry.getOpType();
auto ts = oplogEntry.getTimestamp();
+ const auto needsRetryImage = oplogEntry.getNeedsRetryImage();
if (opType == repl::OpTypeEnum::kDelete) {
uassert(
@@ -66,7 +68,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
uassert(40607,
str::stream() << "No pre-image available for findAndModify retry request:"
<< redact(request.toBSON()),
- oplogWithCorrectLinks.getPreImageOpTime());
+ oplogWithCorrectLinks.getPreImageOpTime() || needsRetryImage);
} else if (opType == repl::OpTypeEnum::kInsert) {
uassert(
40608,
@@ -98,7 +100,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
<< ts.toString()
<< ", oplog: "
<< redact(oplogEntry.toBSON()),
- oplogWithCorrectLinks.getPostImageOpTime());
+ oplogWithCorrectLinks.getPostImageOpTime() || needsRetryImage);
} else {
uassert(40612,
str::stream() << "findAndModify retry request: " << redact(request.toBSON())
@@ -107,7 +109,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
<< ts.toString()
<< ", oplog: "
<< redact(oplogEntry.toBSON()),
- oplogWithCorrectLinks.getPreImageOpTime());
+ oplogWithCorrectLinks.getPreImageOpTime() || needsRetryImage);
}
}
}
@@ -117,11 +119,31 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
* oplog.
*/
BSONObj extractPreOrPostImage(OperationContext* opCtx, const repl::OplogEntry& oplog) {
- invariant(oplog.getPreImageOpTime() || oplog.getPostImageOpTime());
+ invariant(oplog.getPreImageOpTime() || oplog.getPostImageOpTime() ||
+ oplog.getNeedsRetryImage());
+ DBDirectClient client(opCtx);
+ if (oplog.getNeedsRetryImage()) {
+ // Extract image from side collection.
+ auto sessionIdBson = oplog.getSessionId()->toBSON();
+ const auto txnNumber = *oplog.getTxnNumber();
+ const auto query = BSON("_id" << sessionIdBson << "txnNum" << *oplog.getTxnNumber());
+ auto imageDoc = client.findOne(NamespaceString::kConfigImagesNamespace.ns(), query);
+ uassert(5637601,
+ str::stream()
+ << "image collection no longer contains the complete write history of this "
+ "transaction, record with sessionId "
+ << sessionIdBson.toString()
+ << " and txnNumber: "
+ << txnNumber
+ << " cannot be found",
+ !imageDoc.isEmpty());
+ return imageDoc.getField(repl::ImageEntry::kImageFieldName).Obj().getOwned();
+ }
+
+ // Extract image from oplog.
auto opTime = oplog.getPreImageOpTime() ? oplog.getPreImageOpTime().value()
: oplog.getPostImageOpTime().value();
- DBDirectClient client(opCtx);
auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(),
opTime.asQuery(),
nullptr,
diff --git a/src/mongo/db/repl/image_collection_entry.idl b/src/mongo/db/repl/image_collection_entry.idl
index 2cafe05ae8d..bb1b304a4da 100644
--- a/src/mongo/db/repl/image_collection_entry.idl
+++ b/src/mongo/db/repl/image_collection_entry.idl
@@ -17,6 +17,9 @@ structs:
_id:
cpp_name: _id
type: LogicalSessionId
+ txnNum:
+ cpp_name: txnNumber
+ type: TxnNumber
ts:
cpp_name: ts
type: timestamp
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 46e9e45def2..f20e365021c 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -324,6 +324,7 @@ void writeToImageCollection(OperationContext* opCtx,
LogicalSessionId::parse(IDLParserErrorContext("ParseSessionIdWhenWritingToImageCollection"),
op.getField(OplogEntryBase::kSessionIdFieldName).Obj());
imageEntry.set_id(sessionId);
+ imageEntry.setTxnNumber(op.getField(OplogEntryBase::kTxnNumberFieldName).numberLong());
imageEntry.setTs(op["ts"].timestamp());
imageEntry.setImageKind(imageKind);
imageEntry.setImage(image);