diff options
-rw-r--r-- | jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js | 310 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_retryability.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/image_collection_entry.idl | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 1 |
6 files changed, 344 insertions, 5 deletions
diff --git a/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js b/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js new file mode 100644 index 00000000000..26c49a816c4 --- /dev/null +++ b/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js @@ -0,0 +1,310 @@ +/** + * Test that retryable findAndModify commands will store pre- and post- images in the appropriate + * collections according to the parameter value of `storeFindAndModifyImagesInSideCollection`. + */ +(function() { + "use strict"; + + load("jstests/libs/retryable_writes_util.js"); + + if (!RetryableWritesUtil.storageEngineSupportsRetryableWrites(jsTest.options().storageEngine)) { + jsTestLog("Retryable writes are not supported, skipping test"); + return; + } + + const numNodes = 2; + + function checkOplogEntry(entry, lsid, txnNum, stmtId, prevTs, retryImageArgs) { + assert.neq(entry, null); + assert.neq(entry.lsid, null); + assert.eq(lsid, entry.lsid.id, entry); + assert.eq(txnNum, entry.txnNumber, entry); + assert.eq(stmtId, entry.stmtId, entry); + + const oplogPrevTs = entry.prevOpTime.ts; + assert.eq(prevTs.getTime(), oplogPrevTs.getTime(), entry); + + if (retryImageArgs.needsRetryImage) { + assert.eq(retryImageArgs.imageKind, entry.needsRetryImage, entry); + assert(!entry.hasOwnProperty("preImageOpTime")); + assert(!entry.hasOwnProperty("postImageOpTime")); + } else { + assert(!entry.hasOwnProperty("needsRetryImage")); + } + } + + function checkSessionCatalog(conn, sessionId, txnNum, expectedTs) { + const coll = conn.getDB('config').transactions; + const sessionDoc = coll.findOne({'_id.id': sessionId}); + + assert.eq(txnNum, sessionDoc.txnNum); + const writeTs = sessionDoc.lastWriteOpTime.ts; + assert.eq(expectedTs.getTime(), writeTs.getTime()); + } + + function checkImageCollection(conn, sessionInfo, expectedTs, expectedImage, expectedImageKind) { + const coll = conn.getDB('config').image_collection; + const imageDoc = + coll.findOne({'_id.id': sessionInfo.sessionId}); + + assert.eq(sessionInfo.txnNum, imageDoc.txnNum, imageDoc); + assert.eq(expectedImage, imageDoc.image, imageDoc); + assert.eq(expectedImageKind, imageDoc.imageKind, imageDoc); + assert.eq(expectedTs.getTime(), imageDoc.ts.getTime(), imageDoc); + } + + function runTests(lsid, mainConn, primary, secondary, storeImagesInSideCollection, docId) { + const setParam = { + setParameter: 1, + storeFindAndModifyImagesInSideCollection: storeImagesInSideCollection + }; + primary.adminCommand(setParam); + + let txnNumber = NumberLong(docId); + let incrementTxnNumber = function() { + txnNumber = NumberLong(txnNumber + 1); + }; + + const oplog = primary.getDB('local').oplog.rs; + + // //////////////////////////////////////////////////////////////////////// + // // Test findAndModify command (upsert) + + let cmd = { + findAndModify: 'user', + query: {_id: docId}, + update: {$set: {x: 1}}, + new: true, + upsert: true, + lsid: {id: lsid}, + txnNumber: txnNumber, + writeConcern: {w: numNodes}, + }; + + assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + + //////////////////////////////////////////////////////////////////////// + // Test findAndModify command (in-place update, return pre-image) + + incrementTxnNumber(); + cmd = { + findAndModify: 'user', + query: {_id: docId}, + update: {$inc: {x: 1}}, + new: false, + upsert: false, + lsid: {id: lsid}, + txnNumber: txnNumber, + writeConcern: {w: numNodes}, + }; + + let expectedPreImage = mainConn.getDB('test').user.findOne({_id: docId}); + let res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + assert.eq(res.value, expectedPreImage); + // Get update entry. + let updateOp = oplog.findOne({ns: 'test.user', op: 'u', txnNumber: txnNumber}); + // Check that the findAndModify oplog entry and sessions record has the appropriate fields + // and values. + const expectedWriteTs = Timestamp(0, 0); + const expectedStmtId = 0; + let retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "preImage"}; + checkOplogEntry(updateOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs); + checkSessionCatalog(primary, lsid, txnNumber, updateOp.ts); + checkSessionCatalog(secondary, lsid, txnNumber, updateOp.ts); + if (storeImagesInSideCollection) { + const sessionInfo = {sessionId: lsid, txnNum: txnNumber}; + checkImageCollection(primary, sessionInfo, updateOp.ts, expectedPreImage, "preImage"); + checkImageCollection(secondary, sessionInfo, updateOp.ts, expectedPreImage, "preImage"); + } else { + // The preImage should be stored in the oplog. + const preImage = + oplog.findOne({ns: 'test.user', op: 'n', ts: updateOp.preImageOpTime.ts}); + assert.eq(expectedPreImage, preImage.o); + } + // Assert that retrying the command will produce the same response. + let retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + assert.eq(res, retryRes); + + //////////////////////////////////////////////////////////////////////// + // Test findAndModify command (in-place update, return post-image) + + incrementTxnNumber(); + cmd = { + findAndModify: 'user', + query: {_id: docId}, + update: {$inc: {x: 1}}, + new: true, + upsert: false, + lsid: {id: lsid}, + txnNumber: txnNumber, + writeConcern: {w: numNodes}, + }; + + res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + let expectedPostImage = mainConn.getDB('test').user.findOne({_id: docId}); + // Get update entry. + updateOp = oplog.findOne({ns: 'test.user', op: 'u', txnNumber: txnNumber}); + // Check that the findAndModify oplog entry and sessions record has the appropriate fields + // and values. + retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "postImage"}; + checkOplogEntry(updateOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs); + checkSessionCatalog(primary, lsid, txnNumber, updateOp.ts); + checkSessionCatalog(secondary, lsid, txnNumber, updateOp.ts); + if (storeImagesInSideCollection) { + const sessionInfo = {sessionId: lsid, txnNum: txnNumber}; + checkImageCollection(primary, sessionInfo, updateOp.ts, expectedPostImage, "postImage"); + checkImageCollection( + secondary, sessionInfo, updateOp.ts, expectedPostImage, "postImage"); + } else { + // The postImage should be stored in the oplog. + const postImage = + oplog.findOne({ns: 'test.user', op: 'n', ts: updateOp.postImageOpTime.ts}); + assert.eq(expectedPostImage, postImage.o); + } + // Assert that retrying the command will produce the same response. + retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + assert.eq(res, retryRes); + + //////////////////////////////////////////////////////////////////////// + // Test findAndModify command (replacement update, return pre-image) + incrementTxnNumber(); + cmd = { + findAndModify: 'user', + query: {_id: docId}, + update: {y: 1}, + new: false, + upsert: false, + lsid: {id: lsid}, + txnNumber: txnNumber, + writeConcern: {w: numNodes}, + }; + + expectedPreImage = mainConn.getDB('test').user.findOne({_id: docId}); + res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + // Get update entry. + updateOp = oplog.findOne({ns: 'test.user', op: 'u', txnNumber: txnNumber}); + retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "preImage"}; + // Check that the findAndModify oplog entry and sessions record has the appropriate fields + // and values. + checkOplogEntry(updateOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs); + checkSessionCatalog(primary, lsid, txnNumber, updateOp.ts); + checkSessionCatalog(secondary, lsid, txnNumber, updateOp.ts); + if (storeImagesInSideCollection) { + const sessionInfo = {sessionId: lsid, txnNum: txnNumber}; + checkImageCollection(primary, sessionInfo, updateOp.ts, expectedPreImage, "preImage"); + checkImageCollection(secondary, sessionInfo, updateOp.ts, expectedPreImage, "preImage"); + } else { + // The preImage should be stored in the oplog. + const preImage = + oplog.findOne({ns: 'test.user', op: 'n', ts: updateOp.preImageOpTime.ts}); + assert.eq(expectedPreImage, preImage.o); + } + + // Assert that retrying the command will produce the same response. + retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + assert.eq(res, retryRes); + + //////////////////////////////////////////////////////////////////////// + // Test findAndModify command (replacement update, return post-image) + + incrementTxnNumber(); + cmd = { + findAndModify: 'user', + query: {_id: docId}, + update: {z: 1}, + new: true, + upsert: false, + lsid: {id: lsid}, + txnNumber: txnNumber, + writeConcern: {w: numNodes}, + }; + + res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + expectedPostImage = mainConn.getDB('test').user.findOne({_id: docId}); + + // Get update entry. + updateOp = oplog.findOne({ns: 'test.user', op: 'u', txnNumber: txnNumber}); + retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "postImage"}; + // Check that the findAndModify oplog entry and sessions record has the appropriate fields + // and values. + checkOplogEntry(updateOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs); + checkSessionCatalog(primary, lsid, txnNumber, updateOp.ts); + checkSessionCatalog(secondary, lsid, txnNumber, updateOp.ts); + if (storeImagesInSideCollection) { + const sessionInfo = {sessionId: lsid, txnNum: txnNumber}; + checkImageCollection(primary, sessionInfo, updateOp.ts, expectedPostImage, "postImage"); + checkImageCollection( + secondary, sessionInfo, updateOp.ts, expectedPostImage, "postImage"); + } else { + // The postImage should be stored in the oplog. + const postImage = + oplog.findOne({ns: 'test.user', op: 'n', ts: updateOp.postImageOpTime.ts}); + assert.eq(expectedPostImage, postImage.o); + } + // Assert that retrying the command will produce the same response. + retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + assert.eq(res, retryRes); + + //////////////////////////////////////////////////////////////////////// + // Test findAndModify command (remove, return pre-image) + incrementTxnNumber(); + cmd = { + findAndModify: 'user', + query: {_id: docId}, + remove: true, + new: false, + lsid: {id: lsid}, + txnNumber: txnNumber, + writeConcern: {w: numNodes}, + }; + + expectedPreImage = mainConn.getDB('test').user.findOne({_id: docId}); + res = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + // Get delete entry from top of oplog. + const deleteOp = oplog.findOne({ns: 'test.user', op: 'd', txnNumber: txnNumber}); + // + retryArgs = {needsRetryImage: storeImagesInSideCollection, imageKind: "preImage"}; + checkOplogEntry(deleteOp, lsid, txnNumber, expectedStmtId, expectedWriteTs, retryArgs); + checkSessionCatalog(primary, lsid, txnNumber, deleteOp.ts); + checkSessionCatalog(secondary, lsid, txnNumber, deleteOp.ts); + if (storeImagesInSideCollection) { + const sessionInfo = {sessionId: lsid, txnNum: txnNumber}; + checkImageCollection(primary, sessionInfo, deleteOp.ts, expectedPreImage, "preImage"); + checkImageCollection(secondary, sessionInfo, deleteOp.ts, expectedPreImage, "preImage"); + } else { + // The preImage should be stored in the oplog. + const preImage = + oplog.findOne({ns: 'test.user', op: 'n', ts: deleteOp.preImageOpTime.ts}); + assert.eq(expectedPreImage, preImage.o); + } + // Assert that retrying the command will produce the same response. + retryRes = assert.commandWorked(mainConn.getDB('test').runCommand(cmd)); + assert.eq(res, retryRes); + } + + const lsid = UUID(); + // Test that retryable findAndModifys will store pre- and post- images in the oplog when + // 'storeFindAndModifyImagesInSideCollection'=false. + // Initiate the replica set with 'storeFindAndModifyImagesInSideCollection'=true to guarantee + // that the 'config.image_collection' exists. This is needed until we commit to a contract + // between the lifetime of this side collection, the parameter value, and FCV. + const rst = new ReplSetTest({ + nodes: numNodes, + nodeOptions: {setParameter: {storeFindAndModifyImagesInSideCollection: true}} + }); + rst.startSet(); + rst.initiate(); + runTests(lsid, rst.getPrimary(), rst.getPrimary(), rst.getSecondary(), true, 40); + runTests(lsid, rst.getPrimary(), rst.getPrimary(), rst.getSecondary(), false, 50); + rst.stopSet(); + // Test that retryable findAndModifys will store pre- and post- images in the + // 'config.image_collection' table. + // TODO: Uncomment this block to get code coverage for sharded clusters. Shard servers will + // start with downgraded FCV, meaning the 'image_collection' won't be available on startup. + // const st = new ShardingTest({shards: {rs0: {nodes: numNodes, setParameter: + // {storeFindAndModifyImagesInSideCollection: true}}}}); + // runTests(lsid, st.s, st.rs0.getPrimary(), st.rs0.getSecondary(), true, 60); + // runTests(lsid, st.s, st.rs0.getPrimary(), st.rs0.getSecondary(), true, 70); + // st.stop(); + // runTests(true); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index adcc2cfae90..c954b4b9df5 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1816,6 +1816,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/repl/image_collection_entry', 'catalog_raii', 'catalog/collection_info_cache', 'concurrency/lock_manager', diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 45c03ab79b4..58dc8f6f6cc 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -174,7 +174,9 @@ void writeToImagesCollection(OperationContext* opCtx, Timestamp ts) { repl::ImageEntry imageEntry; invariant(opCtx->getLogicalSessionId()); + invariant(opCtx->getTxnNumber()); imageEntry.set_id(*opCtx->getLogicalSessionId()); + imageEntry.setTxnNumber(*opCtx->getTxnNumber()); imageEntry.setTs(ts); imageEntry.setImage(std::move(image)); imageEntry.setImageKind(imageKind); diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp index 42ec244a59c..ee5b0e1295e 100644 --- a/src/mongo/db/ops/write_ops_retryability.cpp +++ b/src/mongo/db/ops/write_ops_retryability.cpp @@ -36,6 +36,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/ops/find_and_modify_result.h" #include "mongo/db/query/find_and_modify_request.h" +#include "mongo/db/repl/image_collection_entry_gen.h" #include "mongo/logger/redaction.h" namespace mongo { @@ -51,6 +52,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, const repl::OplogEntry& oplogWithCorrectLinks) { auto opType = oplogEntry.getOpType(); auto ts = oplogEntry.getTimestamp(); + const auto needsRetryImage = oplogEntry.getNeedsRetryImage(); if (opType == repl::OpTypeEnum::kDelete) { uassert( @@ -66,7 +68,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, uassert(40607, str::stream() << "No pre-image available for findAndModify retry request:" << redact(request.toBSON()), - oplogWithCorrectLinks.getPreImageOpTime()); + oplogWithCorrectLinks.getPreImageOpTime() || needsRetryImage); } else if (opType == repl::OpTypeEnum::kInsert) { uassert( 40608, @@ -98,7 +100,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, << ts.toString() << ", oplog: " << redact(oplogEntry.toBSON()), - oplogWithCorrectLinks.getPostImageOpTime()); + oplogWithCorrectLinks.getPostImageOpTime() || needsRetryImage); } else { uassert(40612, str::stream() << "findAndModify retry request: " << redact(request.toBSON()) @@ -107,7 +109,7 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, << ts.toString() << ", oplog: " << redact(oplogEntry.toBSON()), - oplogWithCorrectLinks.getPreImageOpTime()); + oplogWithCorrectLinks.getPreImageOpTime() || needsRetryImage); } } } @@ -117,11 +119,31 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request, * oplog. */ BSONObj extractPreOrPostImage(OperationContext* opCtx, const repl::OplogEntry& oplog) { - invariant(oplog.getPreImageOpTime() || oplog.getPostImageOpTime()); + invariant(oplog.getPreImageOpTime() || oplog.getPostImageOpTime() || + oplog.getNeedsRetryImage()); + DBDirectClient client(opCtx); + if (oplog.getNeedsRetryImage()) { + // Extract image from side collection. + auto sessionIdBson = oplog.getSessionId()->toBSON(); + const auto txnNumber = *oplog.getTxnNumber(); + const auto query = BSON("_id" << sessionIdBson << "txnNum" << *oplog.getTxnNumber()); + auto imageDoc = client.findOne(NamespaceString::kConfigImagesNamespace.ns(), query); + uassert(5637601, + str::stream() + << "image collection no longer contains the complete write history of this " + "transaction, record with sessionId " + << sessionIdBson.toString() + << " and txnNumber: " + << txnNumber + << " cannot be found", + !imageDoc.isEmpty()); + return imageDoc.getField(repl::ImageEntry::kImageFieldName).Obj().getOwned(); + } + + // Extract image from oplog. auto opTime = oplog.getPreImageOpTime() ? oplog.getPreImageOpTime().value() : oplog.getPostImageOpTime().value(); - DBDirectClient client(opCtx); auto oplogDoc = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery(), nullptr, diff --git a/src/mongo/db/repl/image_collection_entry.idl b/src/mongo/db/repl/image_collection_entry.idl index 2cafe05ae8d..bb1b304a4da 100644 --- a/src/mongo/db/repl/image_collection_entry.idl +++ b/src/mongo/db/repl/image_collection_entry.idl @@ -17,6 +17,9 @@ structs: _id: cpp_name: _id type: LogicalSessionId + txnNum: + cpp_name: txnNumber + type: TxnNumber ts: cpp_name: ts type: timestamp diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 46e9e45def2..f20e365021c 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -324,6 +324,7 @@ void writeToImageCollection(OperationContext* opCtx, LogicalSessionId::parse(IDLParserErrorContext("ParseSessionIdWhenWritingToImageCollection"), op.getField(OplogEntryBase::kSessionIdFieldName).Obj()); imageEntry.set_id(sessionId); + imageEntry.setTxnNumber(op.getField(OplogEntryBase::kTxnNumberFieldName).numberLong()); imageEntry.setTs(op["ts"].timestamp()); imageEntry.setImageKind(imageKind); imageEntry.setImage(image); |