diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-02-03 06:18:15 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-03 07:19:42 +0000 |
commit | 71287a3a8f033923ca9c2735c72da4460fbbf06d (patch) | |
tree | 16b9482e36180928ca9a3a4dd7b8d8384ecc55ac | |
parent | 1d8aaed7f9286ce90317d03fc1815bb58b43e31d (diff) | |
download | mongo-71287a3a8f033923ca9c2735c72da4460fbbf06d.tar.gz |
SERVER-60524 Make retryable internal transactions retryable across data placement changes
8 files changed, 400 insertions, 0 deletions
diff --git a/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js b/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js index a54b2cf0fb0..dee7fe15ccc 100644 --- a/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js +++ b/jstests/noPassthrough/store_retryable_find_and_modify_images_in_side_collection.js @@ -68,6 +68,8 @@ function assertRetryCommand(cmdResponse, retryResponse) { // The retry response can contain a different 'clusterTime' from the initial response. delete cmdResponse.$clusterTime; delete retryResponse.$clusterTime; + // The retry response contains the "retriedStmtId" field but the initial response does not. + delete retryResponse.retriedStmtId; assert.eq(cmdResponse, retryResponse); } diff --git a/jstests/replsets/tenant_migration_recipient_fetches_synthetic_find_and_modify_oplog_entries.js b/jstests/replsets/tenant_migration_recipient_fetches_synthetic_find_and_modify_oplog_entries.js index bfbf1c960e5..05708a93e7e 100644 --- a/jstests/replsets/tenant_migration_recipient_fetches_synthetic_find_and_modify_oplog_entries.js +++ b/jstests/replsets/tenant_migration_recipient_fetches_synthetic_find_and_modify_oplog_entries.js @@ -107,6 +107,8 @@ delete cmdResponse.$clusterTime; delete retryResponse.$clusterTime; delete cmdResponse.operationTime; delete retryResponse.operationTime; +// The retry response contains the "retriedStmtId" field but the initial response does not. 
+delete retryResponse.retriedStmtId; assert.eq(0, bsonWoCompare(cmdResponse, retryResponse), retryResponse); assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); diff --git a/jstests/sharding/internal_transactions_for_retryable_writes_retry_in_different_batches.js b/jstests/sharding/internal_transactions_for_retryable_writes_retry_in_different_batches.js new file mode 100644 index 00000000000..b7c78f2bc1b --- /dev/null +++ b/jstests/sharding/internal_transactions_for_retryable_writes_retry_in_different_batches.js @@ -0,0 +1,373 @@ +/* + * Test that retryable writes executed using or without using internal transactions execute exactly + * once regardless of how they are batched on retries, and that the responses from mongods include a + * "retriedStmtIds" field containing the statement ids for retried statements. + * + * @tags: [requires_fcv_52, featureFlagInternalTransactions] + */ +(function() { +"use strict"; + +load("jstests/sharding/libs/sharded_transactions_helpers.js"); + +const st = new ShardingTest({shards: 1}); + +const kDbName = "testDb"; +const kCollName = "testColl"; +const shard0TestDB = st.rs0.getPrimary().getDB(kDbName); +const shard0TestColl = shard0TestDB.getCollection(kCollName); + +const stmtId1 = NumberInt(1); +const stmtId2 = NumberInt(2); + +/* + * Returns a new command object created from 'cmdObj' and the session/transaction fields defined in + * 'sessionOpts'. + */ +function makeCmdObjWithTxnFields(cmdObj, sessionOpts) { + const cmdObjWithTxnFields = Object.assign({}, cmdObj); + cmdObjWithTxnFields.lsid = sessionOpts.lsid; + cmdObjWithTxnFields.txnNumber = sessionOpts.txnNumber; + if (sessionOpts.isTransaction) { + cmdObjWithTxnFields.autocommit = false; + } + return cmdObjWithTxnFields; +} + +/* + * Runs all the commands in 'cmdObjs' in a retryable write or transaction as defined in + * 'sessionOpts', and returns the responses to those commands in the given order. 
+ */ +function runCommandsWithSessionOpts(cmdObjs, sessionOpts) { + let cmdResponses = []; + + cmdObjs.forEach((cmdObj, index) => { + const cmdObjWithTxnFields = makeCmdObjWithTxnFields(cmdObj, sessionOpts); + if (sessionOpts.isTransaction && index == 0) { + cmdObjWithTxnFields.startTransaction = true; + } + cmdResponses.push(assert.commandWorked(shard0TestDB.runCommand(cmdObjWithTxnFields))); + }); + if (sessionOpts.isTransaction) { + assert.commandWorked(shard0TestDB.adminCommand( + makeCommitTransactionCmdObj(sessionOpts.lsid, sessionOpts.txnNumber))); + } + + return cmdResponses; +} + +// Test that write statements executed in the same command are not re-executed when they are +// present in separate commands, and the right stmtId is returned in the "retriedStmtIds" field in +// each retry response. + +function runInsertTestRetryStatementsSeparately(makeSessionOptionsFunc) { + const {initialSessionOpts, retrySessionOpts} = makeSessionOptionsFunc(); + + jsTest.log(`Test executing two insert statements in a single command in ${ + tojson(initialSessionOpts)} then retrying them in two separate commands in ${ + tojson(retrySessionOpts)}`); + + const [initialRes] = runCommandsWithSessionOpts( + [{insert: kCollName, documents: [{_id: 1}, {_id: 2}], stmtIds: [stmtId1, stmtId2]}], + initialSessionOpts); + const [retryRes1, retryRes2] = runCommandsWithSessionOpts( + [ + {insert: kCollName, documents: [{_id: 1}], stmtIds: [stmtId1]}, + {insert: kCollName, documents: [{_id: 2}], stmtIds: [stmtId2]} + ], + retrySessionOpts); + + assert.eq(initialRes.n, 2); + assert(!initialRes.hasOwnProperty("retriedStmtIds")); + assert.eq(retryRes1.n, 1); + assert.eq(retryRes1.retriedStmtIds, [stmtId1]); + assert.eq(retryRes2.n, 1); + assert.eq(retryRes2.retriedStmtIds, [stmtId2]); + assert.eq(shard0TestColl.find({_id: 1}).itcount(), 1); + assert.eq(shard0TestColl.find({_id: 2}).itcount(), 1); + + assert.commandWorked(shard0TestColl.remove({})); +} + +function 
runUpdateTestRetryStatementsSeparately(makeSessionOptionsFunc) { + const {initialSessionOpts, retrySessionOpts} = makeSessionOptionsFunc(); + + jsTest.log(`Test executing two update statements in a single command in ${ + tojson(initialSessionOpts)} then retrying them in two separate commands in ${ + tojson(retrySessionOpts)}`); + + assert.commandWorked(shard0TestColl.insert([{_id: 1, x: 0}, {_id: 2, x: 0}])); + + const [initialRes] = runCommandsWithSessionOpts( + [{ + update: kCollName, + updates: [{q: {_id: 1}, u: {$inc: {x: 1}}}, {q: {_id: 2}, u: {$inc: {x: 1}}}], + stmtIds: [stmtId1, stmtId2] + }], + initialSessionOpts); + const [retryRes1, retryRes2] = runCommandsWithSessionOpts( + [ + {update: kCollName, updates: [{q: {_id: 1}, u: {$inc: {x: 1}}}], stmtIds: [stmtId1]}, + {update: kCollName, updates: [{q: {_id: 2}, u: {$inc: {x: 1}}}], stmtIds: [stmtId2]} + ], + retrySessionOpts); + + assert.eq(initialRes.nModified, 2); + assert(!initialRes.hasOwnProperty("retriedStmtIds")); + assert.eq(retryRes1.nModified, 1); + assert.eq(retryRes1.retriedStmtIds, [stmtId1]); + assert.eq(retryRes2.nModified, 1); + assert.eq(retryRes2.retriedStmtIds, [stmtId2]); + assert.eq(shard0TestColl.find({_id: 1, x: 1}).itcount(), 1); + assert.eq(shard0TestColl.find({_id: 2, x: 1}).itcount(), 1); + + assert.commandWorked(shard0TestColl.remove({})); +} + +function runDeleteTestRetryStatementsSeparately(makeSessionOptionsFunc) { + const {initialSessionOpts, retrySessionOpts} = makeSessionOptionsFunc(); + + jsTest.log(`Test executing two delete statements in a single command in ${ + tojson(initialSessionOpts)} then retrying them in two separate commands in ${ + tojson(retrySessionOpts)}`); + + assert.commandWorked(shard0TestColl.insert([{_id: 1}, {_id: 2}])); + + const [initialRes] = + runCommandsWithSessionOpts([{ + delete: kCollName, + deletes: [{q: {_id: 1}, limit: 1}, {q: {_id: 2}, limit: 1}], + stmtIds: [stmtId1, stmtId2] + }], + initialSessionOpts); + const [retryRes1, retryRes2] = 
runCommandsWithSessionOpts( + [ + {delete: kCollName, deletes: [{q: {_id: 1}, limit: 1}], stmtIds: [stmtId1]}, + {delete: kCollName, deletes: [{q: {_id: 2}, limit: 1}], stmtIds: [stmtId2]} + ], + retrySessionOpts); + + assert.eq(initialRes.n, 2); + assert(!initialRes.hasOwnProperty("retriedStmtIds")); + assert.eq(retryRes1.n, 1); + assert.eq(retryRes1.retriedStmtIds, [stmtId1]); + assert.eq(retryRes2.n, 1); + assert.eq(retryRes2.retriedStmtIds, [stmtId2]); + assert.eq(shard0TestColl.find({_id: 1}).itcount(), 0); + assert.eq(shard0TestColl.find({_id: 2}).itcount(), 0); + + assert.commandWorked(shard0TestColl.remove({})); +} + +// Test that an executed write statement does not re-execute when it is present in a command +// containing un-executed write statements, and that its stmtId is returned in the "retriedStmtIds" +// field in the response. + +function runInsertTestRetryWithAdditionalStatement(makeSessionOptionsFunc) { + const {initialSessionOpts, retrySessionOpts} = makeSessionOptionsFunc(); + + jsTest.log(`Test executing an insert statement in a command in ${ + tojson(initialSessionOpts)} then retrying it with un-executed insert statement in a command in ${ + tojson(retrySessionOpts)}`); + + const [initialRes] = runCommandsWithSessionOpts( + [{insert: kCollName, documents: [{_id: 1}], stmtIds: [stmtId1]}], initialSessionOpts); + const [retryRes] = runCommandsWithSessionOpts( + [{insert: kCollName, documents: [{_id: 1}, {_id: 2}], stmtIds: [stmtId1, stmtId2]}], + retrySessionOpts); + + assert.eq(initialRes.n, 1); + assert(!initialRes.hasOwnProperty("retriedStmtIds")); + assert.eq(retryRes.n, 2); + assert.eq(retryRes.retriedStmtIds, [stmtId1]); + assert.eq(shard0TestColl.find({_id: 1}).itcount(), 1); + assert.eq(shard0TestColl.find({_id: 2}).itcount(), 1); + + assert.commandWorked(shard0TestColl.remove({})); +} + +function runUpdateTestRetryWithAdditionalStatement(makeSessionOptionsFunc) { + const {initialSessionOpts, retrySessionOpts} = makeSessionOptionsFunc(); 
+ + jsTest.log(`Test executing an update statement in a command in ${ + tojson(initialSessionOpts)} then retrying it with an un-executed update statement in a command in ${ + tojson(retrySessionOpts)}`); + + assert.commandWorked(shard0TestColl.insert([{_id: 1, x: 0}, {_id: 2, x: 0}])); + + const [initialRes] = runCommandsWithSessionOpts( + [{update: kCollName, updates: [{q: {_id: 1}, u: {$inc: {x: 1}}}], stmtIds: [stmtId1]}], + initialSessionOpts); + const [retryRes] = runCommandsWithSessionOpts( + [{ + update: kCollName, + updates: [{q: {_id: 1}, u: {$inc: {x: 1}}}, {q: {_id: 2}, u: {$inc: {x: 1}}}], + stmtIds: [stmtId1, stmtId2] + }], + retrySessionOpts); + + assert.eq(initialRes.nModified, 1); + assert(!initialRes.hasOwnProperty("retriedStmtIds")); + assert.eq(retryRes.nModified, 2); + assert.eq(retryRes.retriedStmtIds, [stmtId1]); + assert.eq(shard0TestColl.find({_id: 1, x: 1}).itcount(), 1); + assert.eq(shard0TestColl.find({_id: 2, x: 1}).itcount(), 1); + + assert.commandWorked(shard0TestColl.remove({})); +} + +function runDeleteTestRetryWithAdditionalStatement(makeSessionOptionsFunc) { + const {initialSessionOpts, retrySessionOpts} = makeSessionOptionsFunc(); + + jsTest.log(`Test executing a delete statement in a command in ${ + tojson(initialSessionOpts)} then retrying it with an un-executed delete statement in a command in ${ + tojson(retrySessionOpts)}`); + + assert.commandWorked(shard0TestColl.insert([{_id: 1}, {_id: 2}])); + + const [initialRes] = runCommandsWithSessionOpts( + [{delete: kCollName, deletes: [{q: {_id: 1}, limit: 1}], stmtIds: [stmtId1]}], + initialSessionOpts); + const [retryRes] = + runCommandsWithSessionOpts([{ + delete: kCollName, + deletes: [{q: {_id: 1}, limit: 1}, {q: {_id: 2}, limit: 1}], + stmtIds: [stmtId1, stmtId2] + }], + retrySessionOpts); + + assert.eq(initialRes.n, 1); + assert(!initialRes.hasOwnProperty("retriedStmtIds")); + assert.eq(retryRes.n, 2); + assert.eq(retryRes.retriedStmtIds, [stmtId1]); + 
assert.eq(shard0TestColl.find({_id: 1}).itcount(), 0); + assert.eq(shard0TestColl.find({_id: 2}).itcount(), 0); + + assert.commandWorked(shard0TestColl.remove({})); +} + +// Test that the response to a retried findAndModify command contains the "retriedStmtId" field. + +function runFindAndModifyTest(makeSessionOptionsFunc) { + const {initialSessionOpts, retrySessionOpts} = makeSessionOptionsFunc(); + + jsTest.log(`Test executing a findAndModify statement in a command in ${ + tojson(initialSessionOpts)} then retrying it in a command in ${tojson(retrySessionOpts)}`); + + const cmdObj = { + findAndModify: kCollName, + query: {_id: 1, x: 0}, + update: {$inc: {x: 1}}, + upsert: true, + stmtId: stmtId1 + }; + const [initialRes] = runCommandsWithSessionOpts([cmdObj], initialSessionOpts); + const [retryRes] = runCommandsWithSessionOpts([cmdObj], retrySessionOpts); + + assert.eq(initialRes.lastErrorObject, retryRes.lastErrorObject); + assert.eq(initialRes.value, retryRes.value); + assert(!initialRes.hasOwnProperty("retriedStmtIds")); + assert.eq(retryRes.retriedStmtId, stmtId1); + assert.eq(shard0TestColl.find({_id: 1, x: 1}).itcount(), 1); + + assert.commandWorked(shard0TestColl.remove({})); +} + +function runTests(makeSessionOptionsFunc) { + runInsertTestRetryStatementsSeparately(makeSessionOptionsFunc); + runUpdateTestRetryStatementsSeparately(makeSessionOptionsFunc); + runDeleteTestRetryStatementsSeparately(makeSessionOptionsFunc); + + runInsertTestRetryWithAdditionalStatement(makeSessionOptionsFunc); + runUpdateTestRetryWithAdditionalStatement(makeSessionOptionsFunc); + runDeleteTestRetryWithAdditionalStatement(makeSessionOptionsFunc); + + runFindAndModifyTest(makeSessionOptionsFunc); +} + +{ + let makeSessionOptions = () => { + const sessionUUID = UUID(); + // Retryable writes in the parent session. + const initialSessionOpts = { + lsid: {id: sessionUUID}, + txnNumber: NumberLong(5), + isTransaction: false + }; + // Retryable writes in the parent session. 
+ const retrySessionOpts = { + lsid: {id: sessionUUID}, + txnNumber: NumberLong(5), + isTransaction: false + }; + return {initialSessionOpts, retrySessionOpts}; + }; + + runTests(makeSessionOptions); +} + +{ + let makeSessionOptions = () => { + const sessionUUID = UUID(); + // Retryable writes in the parent session. + const initialSessionOpts = { + lsid: {id: sessionUUID}, + txnNumber: NumberLong(5), + isTransaction: false + }; + // Internal transaction for retryable writes in a child session. + const retrySessionOpts = { + lsid: {id: sessionUUID, txnNumber: NumberLong(5), txnUUID: UUID()}, + txnNumber: NumberLong(0), + isTransaction: true + }; + return {initialSessionOpts, retrySessionOpts}; + }; + + runTests(makeSessionOptions); +} + +{ + let makeSessionOptions = () => { + const sessionUUID = UUID(); + // Internal transaction for retryable writes in a child session. + const initialSessionOpts = { + lsid: {id: sessionUUID, txnNumber: NumberLong(5), txnUUID: UUID()}, + txnNumber: NumberLong(0), + isTransaction: true + }; + // Retryable writes in the parent session. + const retrySessionOpts = { + lsid: {id: sessionUUID}, + txnNumber: NumberLong(5), + isTransaction: false + }; + return {initialSessionOpts, retrySessionOpts}; + }; + + runTests(makeSessionOptions); +} + +{ + let makeSessionOptions = () => { + const sessionUUID = UUID(); + // Internal transaction for retryable writes in a child session. + const initialSessionOpts = { + lsid: {id: sessionUUID, txnNumber: NumberLong(5), txnUUID: UUID()}, + txnNumber: NumberLong(0), + isTransaction: true + }; + // Internal transaction for retryable writes in a child session. 
+ const retrySessionOpts = { + lsid: {id: sessionUUID, txnNumber: NumberLong(5), txnUUID: UUID()}, + txnNumber: NumberLong(0), + isTransaction: true + }; + return {initialSessionOpts, retrySessionOpts}; + }; + + runTests(makeSessionOptions); +} + +st.stop(); +})(); diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 6518620c51c..dab13fcc40c 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -645,6 +645,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun( RetryableWritesStats::get(opCtx)->incrementRetriedCommandsCount(); RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); auto findAndModifyReply = parseOplogEntryForFindAndModify(opCtx, req, *entry); + findAndModifyReply.setRetriedStmtId(stmtId); // Make sure to wait for writeConcern on the opTime that will include this // write. Needs to set to the system last opTime to get the latest term in an diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index e90d4ac25ce..865a69dbc7b 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -458,6 +458,9 @@ void populateReply(OperationContext* opCtx, auto& replyBase = cmdReply->getWriteCommandReplyBase(); replyBase.setN(nVal); + if (!result.retriedStmtIds.empty()) { + replyBase.setRetriedStmtIds(result.retriedStmtIds); + } if (!errors.empty()) { replyBase.setWriteErrors(errors); diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl index 91c82db928b..cf58747c7c5 100644 --- a/src/mongo/db/ops/write_ops.idl +++ b/src/mongo/db/ops/write_ops.idl @@ -91,6 +91,12 @@ structs: type: array<object_owned> optional: true unstable: false + retriedStmtIds: + description: "The statement numbers for the write statements that had already been + executed, thus were not executed by this command." 
+ type: array<int> + optional: true + unstable: true InsertCommandReply: description: "Contains information related to insert command reply." @@ -298,6 +304,12 @@ structs: description: "The document after the write, if the 'new' field of the request is true. Otherwise, the document before the write." unstable: false + retriedStmtId: + description: "The statement number for this findAndModify statement if it had + already been executed, thus was not executed by this command." + type: int + optional: true + unstable: true commands: diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 32213015ea7..748c70da401 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -724,6 +724,7 @@ WriteResult performInserts(OperationContext* opCtx, } else if (wasAlreadyExecuted) { containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); + out.retriedStmtIds.push_back(stmtId); out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry()); } } @@ -1022,6 +1023,7 @@ WriteResult performUpdates(OperationContext* opCtx, containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); out.results.emplace_back(parseOplogEntryForUpdate(*entry)); + out.retriedStmtIds.push_back(stmtId); continue; } } @@ -1247,6 +1249,7 @@ WriteResult performDeletes(OperationContext* opCtx, containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry()); + out.retriedStmtIds.push_back(stmtId); continue; } diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h index 377a440524f..548a3034713 100644 --- a/src/mongo/db/ops/write_ops_exec.h +++ b/src/mongo/db/ops/write_ops_exec.h @@ -56,6 +56,10 @@ struct WriteResult { */ std::vector<StatusWith<SingleWriteResult>> results; + // Stores the statement ids for the ops that had already been executed, thus were not 
executed + // by this write. + std::vector<StmtId> retriedStmtIds; + // In case of an error, whether the operation can continue. bool canContinue = true; }; |