diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2021-10-19 14:52:57 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-29 00:50:10 +0000 |
commit | ea197fd5193fe034174584e60290dd90fe01d2b1 (patch) | |
tree | ac2e923c5dc87fd4115293b60f6ed926433b078a | |
parent | 5e8446f92d4826cfe2e8b9082efd0fbd540a9718 (diff) | |
download | mongo-ea197fd5193fe034174584e60290dd90fe01d2b1.tar.gz |
SERVER-58756 Store stmtIds for the operations in retryable internal transactions in applyOps oplog entries
20 files changed, 481 insertions, 39 deletions
diff --git a/buildscripts/idl/idl/generator.py b/buildscripts/idl/idl/generator.py index 51569af4d4f..00ea3113455 100644 --- a/buildscripts/idl/idl/generator.py +++ b/buildscripts/idl/idl/generator.py @@ -1258,8 +1258,13 @@ class _CppSourceFileWriter(_CppFileWriterBase): self._writer.write_line('++expectedFieldNumber;') if field.chained_struct_field: - self._writer.write_line('%s.%s(std::move(values));' % (_get_field_member_name( - field.chained_struct_field), _get_field_member_setter_name(field))) + if field.type.is_variant: + self._writer.write_line('%s.%s(%s(std::move(values)));' % + (_get_field_member_name(field.chained_struct_field), + _get_field_member_setter_name(field), field.type.cpp_type)) + else: + self._writer.write_line('%s.%s(std::move(values));' % (_get_field_member_name( + field.chained_struct_field), _get_field_member_setter_name(field))) else: self._writer.write_line('%s = std::move(values);' % (_get_field_member_name(field))) diff --git a/jstests/sharding/internal_sessions_end_sessions.js b/jstests/sharding/internal_sessions_end_sessions.js index 76297576b59..7d5d24b648b 100644 --- a/jstests/sharding/internal_sessions_end_sessions.js +++ b/jstests/sharding/internal_sessions_end_sessions.js @@ -99,7 +99,7 @@ numTransactionsCollEntries++; numImageCollEntries++; assert.eq(numTransactionsCollEntries, transactionsCollOnPrimary.find().itcount()); -// TODO (SERVER-58756): a retryable findAndModify command run inside a retryable internal +// TODO (SERVER-60540): a retryable findAndModify command run inside a retryable internal // transaction should have a config.image_collection entry like a regular retryable write. // assert.eq(numImageCollEntries, imageCollOnPrimary.find().itcount()); diff --git a/jstests/sharding/internal_sessions_reaping.js b/jstests/sharding/internal_sessions_reaping.js index ebf0916d174..e0ac3d7571a 100644 --- a/jstests/sharding/internal_sessions_reaping.js +++ b/jstests/sharding/internal_sessions_reaping.js @@ -145,7 +145,7 @@ assert.commandWorked(testDB.runCommand( jsTest.log("Verify that the config.transactions entry for the retryable internal transaction for " + "the findAndModify did not get reaped although there is already a new retryable write"); assert.eq(numTransactionsCollEntries, transactionsCollOnPrimary.find().itcount()); -// TODO (SERVER-58756): a retryable findAndModify command run inside a retryable internal +// TODO (SERVER-60540): a retryable findAndModify command run inside a retryable internal // transaction should have a config.image_collection entry like a regular retryable write. // assert.eq(numImageCollEntries, imageCollOnPrimary.find().itcount()); diff --git a/jstests/sharding/internal_transactions_for_retryable_writes_oplog_entries.js b/jstests/sharding/internal_transactions_for_retryable_writes_oplog_entries.js new file mode 100644 index 00000000000..6a87e3505c6 --- /dev/null +++ b/jstests/sharding/internal_transactions_for_retryable_writes_oplog_entries.js @@ -0,0 +1,227 @@ +/* + * Tests that the stmtIds for write statements in an internal transaction for retryable writes + * are stored in the individual operation entries in the applyOps oplog entry for the transaction. + * + * @tags: [requires_fcv_51, featureFlagInternalTransactions] + */ +(function() { +'use strict'; + +load('jstests/sharding/libs/sharded_transactions_helpers.js'); + +const kDbName = "testDb"; +const kCollName = "testColl"; + +const st = new ShardingTest({shards: 1}); +const mongosTestDB = st.s.getDB(kDbName); +const mongosTestColl = mongosTestDB.getCollection(kCollName); + +const kStmtIdsOption = { + isComplete: 1, + isIncomplete: 2, + isRepeated: 3 +}; + +/* + * Returns an array of NumberInts (i.e. stmtId type) based on the specified option: + * - If option is 'isComplete', returns [NumberInt(0), ..., NumberInt((numStmtIds-1)*10)]. + * - If option is 'isIncomplete', returns [NumberInt(-1), NumberInt(1), ..., + * NumberInt(numStmtIds-1)]. + * - If option is 'isRepeated', returns [NumberInt(1), ..., NumberInt(1)]. + */ +function makeCustomStmtIdsForTest(numStmtIds, option) { + switch (option) { + case kStmtIdsOption.isComplete: + return [...Array(numStmtIds).keys()].map(i => NumberInt(i * 10)); + case kStmtIdsOption.isIncomplete: + let stmtIds = [...Array(numStmtIds).keys()]; + stmtIds[0] = -1; + return stmtIds.map(i => NumberInt(i)); + case kStmtIdsOption.isRepeated: + return Array(numStmtIds).fill(1).map(i => NumberInt(i)); + } +} + +function verifyOplogEntries(cmdObj, + lsid, + txnNumber, + numWriteStatements, + {shouldStoreStmtIds, customStmtIdsOption, isPreparedTransaction}) { + const writeCmdObj = Object.assign(cmdObj, { + lsid: lsid, + txnNumber: NumberLong(txnNumber), + startTransaction: true, + autocommit: false, + }); + let stmtIds = null; + if (customStmtIdsOption) { + stmtIds = makeCustomStmtIdsForTest(numWriteStatements, customStmtIdsOption); + writeCmdObj.stmtIds = stmtIds; + } + const commitCmdObj = { + commitTransaction: 1, + lsid: lsid, + txnNumber: NumberLong(txnNumber), + autocommit: false, + }; + + const writeRes = mongosTestDB.runCommand(writeCmdObj); + if (customStmtIdsOption == kStmtIdsOption.isRepeated) { + assert.commandFailedWithCode(writeRes, 5875600); + assert.commandWorked(mongosTestColl.remove({})); + return; + } + assert.commandWorked(writeRes); + if (isPreparedTransaction) { + const shard0Primary = st.rs0.getPrimary(); + const isPreparedTransactionRes = assert.commandWorked(shard0Primary.adminCommand({ + prepareTransaction: 1, + lsid: lsid, + txnNumber: NumberLong(txnNumber), + autocommit: false, + writeConcern: {w: "majority"}, + })); + commitCmdObj.commitTimestamp = isPreparedTransactionRes.prepareTimestamp; + assert.commandWorked(shard0Primary.adminCommand(commitCmdObj)); + } + assert.commandWorked(mongosTestDB.adminCommand(commitCmdObj)); + + const oplogEntries = getOplogEntriesForTxn(st.rs0, lsid, txnNumber); + assert.eq(oplogEntries.length, isPreparedTransaction ? 2 : 1, oplogEntries); + + const applyOpsOplogEntry = oplogEntries[0]; + assert(!applyOpsOplogEntry.hasOwnProperty("stmtId")); + const operations = applyOpsOplogEntry.o.applyOps; + operations.forEach((operation, index) => { + if (shouldStoreStmtIds) { + const operationStmtId = stmtIds ? stmtIds[index] : index; + if (operationStmtId == -1) { + // Uninitialized stmtIds should be ignored. + assert(!operation.hasOwnProperty("stmtId"), operation); + } else { + assert.eq(operation.stmtId, operationStmtId, operation); + } + } else { + assert(!operation.hasOwnProperty("stmtId"), operation); + } + }); + + if (isPreparedTransaction) { + const commitOplogEntry = oplogEntries[1]; + assert(!commitOplogEntry.hasOwnProperty("stmtId")); + } + + assert.commandWorked(mongosTestColl.remove({})); +} + +function testInserts(lsid, txnNumber, testOptions) { + jsTest.log("Test batched inserts"); + const insertCmdObj = { + insert: kCollName, + documents: [{_id: 0, x: 0}, {_id: 1, x: 1}], + }; + verifyOplogEntries(insertCmdObj, lsid, txnNumber, insertCmdObj.documents.length, testOptions); +} + +function testUpdates(lsid, txnNumber, testOptions) { + jsTest.log("Test batched updates"); + assert.commandWorked(mongosTestColl.insert([{_id: 0, x: 0}, {_id: 1, x: 1}])); + const updateCmdObj = { + update: kCollName, + updates: [ + {q: {_id: 0, x: 0}, u: {$inc: {x: -10}}}, + {q: {_id: 1, x: 1}, u: {$inc: {x: 10}}}, + ], + }; + verifyOplogEntries(updateCmdObj, lsid, txnNumber, updateCmdObj.updates.length, testOptions); +} + +function testDeletes(lsid, txnNumber, testOptions) { + jsTest.log("Test batched deletes"); + assert.commandWorked(mongosTestColl.insert([{_id: 0, x: 0}, {_id: 1, x: 1}])); + const deleteCmdObj = { + delete: kCollName, + deletes: [ + {q: {_id: 0, x: 0}, limit: 0}, + {q: {_id: 1, x: 1}, limit: 0}, + ], + }; + verifyOplogEntries(deleteCmdObj, lsid, txnNumber, deleteCmdObj.deletes.length, testOptions); +} + +{ + jsTest.log("Test that oplog entries for non-internal transactions do not have stmtIds"); + const lsid = {id: UUID()}; + let txnNumber = 0; + const testOptions = {shouldStoreStmtIds: false}; + testInserts(lsid, txnNumber++, testOptions); + testUpdates(lsid, txnNumber++, testOptions); + testDeletes(lsid, txnNumber++, testOptions); +} + +{ + jsTest.log( + "Test that oplog entries for non-retryable internal transactions do not have stmtIds"); + const lsid = {id: UUID(), txnUUID: UUID()}; + let txnNumber = 0; + const testOptions = {shouldStoreStmtIds: false}; + testInserts(lsid, txnNumber++, testOptions); + testUpdates(lsid, txnNumber++, testOptions); + testDeletes(lsid, txnNumber++, testOptions); +} + +{ + jsTest.log("Test that oplog entries for retryable internal transactions have stmtIds"); + const lsid = {id: UUID(), txnNumber: NumberLong(0), txnUUID: UUID()}; + let txnNumber = 0; + + let runTests = ({isPreparedTransaction}) => { + jsTest.log("Test prepared transactions: " + isPreparedTransaction); + + jsTest.log("Test with default stmtIds"); + const testOptions0 = {shouldStoreStmtIds: true, isPreparedTransaction}; + testInserts(lsid, txnNumber++, testOptions0); + testUpdates(lsid, txnNumber++, testOptions0); + testDeletes(lsid, txnNumber++, testOptions0); + + jsTest.log("Test with custom and valid stmtIds"); + const testOptions1 = { + shouldStoreStmtIds: true, + customStmtIdsOption: kStmtIdsOption.isComplete, + isPreparedTransaction + }; + testInserts(lsid, txnNumber++, testOptions1); + testUpdates(lsid, txnNumber++, testOptions1); + testDeletes(lsid, txnNumber++, testOptions1); + + jsTest.log( + "Test with custom stmtIds containing -1. Verify that operation entries for write " + + "statements with stmtId=-1 do not have a 'stmtId' field"); + const testOptions2 = { + shouldStoreStmtIds: true, + customStmtIdsOption: kStmtIdsOption.isIncomplete, + isPreparedTransaction + }; + testInserts(lsid, txnNumber++, testOptions2); + testUpdates(lsid, txnNumber++, testOptions2); + testDeletes(lsid, txnNumber++, testOptions2); + + jsTest.log( + "Test with custom stmtIds containing repeats. Verify that the command fails with " + + "a uassert instead causes the mongod that executes it to crash"); + const testOptions3 = { + shouldStoreStmtIds: true, + customStmtIdsOption: kStmtIdsOption.isRepeated, + isPreparedTransaction + }; + testInserts(lsid, txnNumber++, testOptions3); + testUpdates(lsid, txnNumber++, testOptions3); + testDeletes(lsid, txnNumber++, testOptions3); + }; + + runTests({isPreparedTransaction: false}); + runTests({isPreparedTransaction: true}); +} + +st.stop(); +})(); diff --git a/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js index 21b26c3d98a..e11c7c857fb 100644 --- a/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js +++ b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js @@ -46,6 +46,7 @@ function testCommitAfterRetry(db, lsid, txnNumber) { documents: [{x: 0}], lsid: lsid, txnNumber: txnNumber, + stmtId: NumberInt(0), startTransaction: true, autocommit: false, }; @@ -62,6 +63,7 @@ function testCommitAfterRetry(db, lsid, txnNumber) { documents: [{x: 1}], lsid: lsid, txnNumber: txnNumber, + stmtId: NumberInt(1), autocommit: false, }; const insertRes0 = assert.commandFailedWithCode( diff --git a/jstests/sharding/libs/sharded_transactions_helpers.js b/jstests/sharding/libs/sharded_transactions_helpers.js index 205a804eeb6..573a11e099f 100644 --- a/jstests/sharding/libs/sharded_transactions_helpers.js +++ b/jstests/sharding/libs/sharded_transactions_helpers.js @@ -191,3 +191,11 @@ function flushRoutersAndRefreshShardMetadata(st, {ns, dbNames = []} = {}) { }); }); } + +function getOplogEntriesForTxn(rs, lsid, txnNumber) { + const filter = {txnNumber: NumberLong(txnNumber)}; + for (let k in lsid) { + filter["lsid." + k] = lsid[k]; + } + return rs.getPrimary().getCollection("local.oplog.rs").find(filter).sort({_id: 1}).toArray(); +} diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp index 525ed561db2..88f3a32874f 100644 --- a/src/mongo/db/logical_session_id_helpers.cpp +++ b/src/mongo/db/logical_session_id_helpers.cpp @@ -89,6 +89,10 @@ LogicalSessionId castToParentSessionId(const LogicalSessionId& sessionId) { return sessionId; } +bool isInternalSessionForRetryableWrite(const LogicalSessionId& sessionId) { + return sessionId.getTxnNumber().has_value(); +} + LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& fromClient, OperationContext* opCtx, std::initializer_list<Privilege> allowSpoof) { diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h index 3c799e2177d..f0b732f8862 100644 --- a/src/mongo/db/logical_session_id_helpers.h +++ b/src/mongo/db/logical_session_id_helpers.h @@ -61,6 +61,12 @@ boost::optional<LogicalSessionId> getParentSessionId(const LogicalSessionId& ses LogicalSessionId castToParentSessionId(const LogicalSessionId& sessionId); /** + * Returns true if the session with the given session id is an internal session for internal + * transactions for retryable writes. + */ +bool isInternalSessionForRetryableWrite(const LogicalSessionId& sessionId); + +/** * Factory functions to generate logical session records. */ LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& lsid, diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 234d7c3d0df..03488edae19 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -543,8 +543,14 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, return; } + const bool inRetryableInternalTransaction = + isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()); + for (auto iter = first; iter != last; iter++) { auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid, iter->doc); + if (inRetryableInternalTransaction) { + operation.setInitializedStatementIds(iter->stmtIds); + } operation.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(iter->doc)); txnParticipant.addTransactionOperation(opCtx, operation); @@ -651,9 +657,14 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg OpTimeBundle opTime; if (inMultiDocumentTransaction) { + const bool inRetryableInternalTransaction = + isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()); + auto operation = MutableOplogEntry::makeUpdateOperation( args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria); - + if (inRetryableInternalTransaction) { + operation.setInitializedStatementIds(args.updateArgs.stmtIds); + } operation.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc)); @@ -665,7 +676,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg txnParticipant.addTransactionOperation(opCtx, operation); } else { MutableOplogEntry oplogEntry; - oplogEntry.getDurableReplOperation().setDestinedRecipient( + oplogEntry.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc)); if (opCtx->getTxnNumber() && args.updateArgs.storeImageInSideCollection) { @@ -788,8 +799,15 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, tassert(5868700, "Attempted a retryable write within a multi-document transaction", args.retryableWritePreImageRecordingType == RetryableOptions::kNotRetryable); + + const bool inRetryableInternalTransaction = + isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()); + auto operation = MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId()); + if (inRetryableInternalTransaction) { + operation.setInitializedStatementIds({stmtId}); + } if (args.preImageRecordingEnabledForCollection) { tassert(5868701, @@ -1209,12 +1227,14 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, namespace { // Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array -// field. Appends as many operations as possible until either the constructed object exceeds the -// 16MB limit or the maximum number of transaction statements allowed in one entry. +// field. Appends as many operations as possible to the array (and their corresponding statement +// ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the +// maximum number of transaction statements allowed in one entry. // // Returns an iterator to the first statement that wasn't packed into the applyOps object. std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( BSONObjBuilder* applyOpsBuilder, + std::vector<StmtId>* stmtIdsWritten, std::vector<repl::ReplOperation>::iterator stmtBegin, std::vector<repl::ReplOperation>::iterator stmtEnd) { @@ -1234,6 +1254,8 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( BSONObjMaxUserSize))) break; opsArray.append(stmt.toBSON()); + const auto stmtIds = stmt.getStatementIds(); + stmtIdsWritten->insert(stmtIdsWritten->end(), stmtIds.begin(), stmtIds.end()); } try { // BSONArrayBuilder will throw a BSONObjectTooLarge exception if we exceeded the max BSON @@ -1264,7 +1286,12 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, MutableOplogEntry* oplogEntry, boost::optional<DurableTxnStateEnum> txnState, boost::optional<repl::OpTime> startOpTime, + std::vector<StmtId> stmtIdsWritten, const bool updateTxnTable) { + if (!stmtIdsWritten.empty()) { + invariant(isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId())); + } + const bool areInternalTransactionsEnabled = feature_flags::gFeatureFlagInternalTransactions.isEnabled( serverGlobalParams.featureCompatibility); @@ -1291,7 +1318,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, if (areInternalTransactionsEnabled) { sessionTxnRecord.setTxnRetryCounter(txnRetryCounter); } - onWriteOpCompleted(opCtx, {}, sessionTxnRecord); + onWriteOpCompleted(opCtx, std::move(stmtIdsWritten), sessionTxnRecord); } return times; } catch (const AssertionException& e) { @@ -1380,8 +1407,9 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, while (stmtsIter != stmts->end()) { BSONObjBuilder applyOpsBuilder; - auto nextStmt = - packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts->end()); + std::vector<StmtId> stmtIdsWritten; + auto nextStmt = packTransactionStatementsForApplyOps( + &applyOpsBuilder, &stmtIdsWritten, stmtsIter, stmts->end()); // If we packed the last op, then the next oplog entry we log should be the implicit // commit or implicit prepare, i.e. we omit the 'partialTxn' field. @@ -1441,8 +1469,8 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, auto txnState = isPartialTxn ? DurableTxnStateEnum::kInProgress : (implicitPrepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted); - prevWriteOpTime = - logApplyOpsForTransaction(opCtx, &oplogEntry, txnState, startOpTime, updateTxnTable); + prevWriteOpTime = logApplyOpsForTransaction( + opCtx, &oplogEntry, txnState, startOpTime, std::move(stmtIdsWritten), updateTxnTable); hangAfterLoggingApplyOpsForTransaction.pauseWhileSet(); @@ -1623,6 +1651,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, &oplogEntry, DurableTxnStateEnum::kPrepared, oplogSlot, + {}, true /* updateTxnTable */); } wuow.commit(); diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index ab63235a741..e914b93dbdb 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -36,6 +36,7 @@ #include "mongo/db/client.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_id.h" #include "mongo/db/query/datetime/date_time_support.h" #include "mongo/db/storage/recovery_unit.h" @@ -451,6 +452,11 @@ public: return _inMultiDocumentTransaction; } + bool isRetryableWrite() const { + return _txnNumber && + (!_inMultiDocumentTransaction || isInternalSessionForRetryableWrite(*_lsid)); + } + /** * Sets that this operation is part of a multi-document transaction. Once this is set, it cannot * be unset. diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index f43dcf8581d..de03842f680 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -572,8 +572,8 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, template <typename T> StmtId getStmtIdForWriteOp(OperationContext* opCtx, const T& wholeOp, size_t opIndex) { - return opCtx->getTxnNumber() ? write_ops::getStmtIdForWriteAt(wholeOp, opIndex) - : kUninitializedStmtId; + return opCtx->isRetryableWrite() ? write_ops::getStmtIdForWriteAt(wholeOp, opIndex) + : kUninitializedStmtId; } SingleWriteResult makeWriteResultForInsertOrDeleteRetry() { diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 3f3c88cd359..ee80f7afaf3 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -284,8 +284,10 @@ OpTime MutableOplogEntry::getOpTime() const { } size_t DurableOplogEntry::getDurableReplOperationSize(const DurableReplOperation& op) { + const auto stmtIds = variant_util::toVector<StmtId>(op.getStatementIds()); return sizeof(op) + op.getNss().size() + op.getObject().objsize() + - (op.getObject2() ? op.getObject2()->objsize() : 0); + (op.getObject2() ? op.getObject2()->objsize() : 0) + + (sizeof(std::vector<StmtId>) + (sizeof(StmtId) * stmtIds.size())); } StatusWith<DurableOplogEntry> DurableOplogEntry::parse(const BSONObj& object) { diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index c13fd9265df..f8d63ca61cc 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -41,6 +41,18 @@ namespace mongo { namespace repl { +namespace variant_util { +template <typename T> +std::vector<T> toVector(boost::optional<stdx::variant<T, std::vector<T>>> optVals) { + if (!optVals) { + return {}; + } + return stdx::visit(visit_helper::Overloaded{[](T val) { return std::vector<T>{val}; }, + [](const std::vector<T>& vals) { return vals; }}, + *optVals); +} +} // namespace variant_util + /** * The first oplog entry is a no-op with this message in its "msg" field. */ @@ -76,6 +88,25 @@ public: _fullPreImage = std::move(value); } + /** + * Sets the statement ids for this ReplOperation to 'stmtIds' if it does not contain any + * kUninitializedStmtId (i.e. placeholder statement id). + */ + void setInitializedStatementIds(const std::vector<StmtId>& stmtIds) & { + if (std::count(stmtIds.begin(), stmtIds.end(), kUninitializedStmtId) > 0) { + return; + } + if (stmtIds.size() > 1) { + DurableReplOperation::setStatementIds({{stmtIds}}); + } else if (stmtIds.size() == 1) { + DurableReplOperation::setStatementIds({{stmtIds.front()}}); + } + } + + std::vector<StmtId> getStatementIds() const { + return variant_util::toVector<StmtId>(DurableReplOperation::getStatementIds()); + } + private: BSONObj _preImageDocumentKey; BSONObj _fullPreImage; @@ -123,22 +154,16 @@ public: void setStatementIds(const std::vector<StmtId>& stmtIds) & { if (stmtIds.empty()) { - OplogEntryBase::setStatementIds(boost::none); + getDurableReplOperation().setStatementIds(boost::none); } else if (stmtIds.size() == 1) { - OplogEntryBase::setStatementIds({{stmtIds.front()}}); + getDurableReplOperation().setStatementIds({{stmtIds.front()}}); } else { - OplogEntryBase::setStatementIds({{stmtIds}}); + getDurableReplOperation().setStatementIds({{stmtIds}}); } } std::vector<StmtId> getStatementIds() const { - if (!OplogEntryBase::getStatementIds()) { - return {}; - } - return stdx::visit( - visit_helper::Overloaded{[](StmtId stmtId) { return std::vector<StmtId>{stmtId}; }, - [](const std::vector<StmtId>& stmtIds) { return stmtIds; }}, - *OplogEntryBase::getStatementIds()); + return variant_util::toVector<StmtId>(OplogEntryBase::getStatementIds()); } void setTxnNumber(boost::optional<std::int64_t> value) & { diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index 6b014e66e3e..06c9ee1fcab 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -59,8 +59,8 @@ enums: structs: DurableReplOperation: - description: "A document that represents an operation in a transaction. Should never be - used directly in server code. Instead, create an instance of ReplOperation." + description: "A document that represents an operation. Should never be used directly in + server code. Instead, create an instance of ReplOperation." fields: op: cpp_name: opType @@ -104,6 +104,13 @@ structs: optional: true description: "The destined recipient for this op under the new shard key pattern. Only included when a resharding operation is in progress." + stmtId: + cpp_name: statementIds + type: + variant: [StmtId, array<StmtId>] + optional: true + description: "Identifier of the transaction statement(s) which generated this oplog + entry" OplogEntryBase: description: A document in which the server stores an oplog entry. @@ -142,13 +149,6 @@ structs: optional: true description: "Used by tests in replication and also by production resharding code to store timestamps." - stmtId: - cpp_name: statementIds - type: - variant: [StmtId, array<StmtId>] - optional: true - description: "Identifier of the transaction statement(s) which generated this oplog - entry" prevOpTime: cpp_name: prevWriteOpTimeInTransaction type: optime diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index fa4194a2632..8e3a861b990 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1316,6 +1316,14 @@ void TransactionParticipant::Participant::addTransactionOperation( invariant(p().autoCommit && !*p().autoCommit && o().activeTxnNumber != kUninitializedTxnNumber); invariant(opCtx->lockState()->inAWriteUnitOfWork()); p().transactionOperations.push_back(operation); + const auto stmtIds = operation.getStatementIds(); + for (auto stmtId : stmtIds) { + auto [_, inserted] = p().transactionStmtIds.insert(stmtId); + uassert(5875600, + str::stream() << "Found two operations using the same stmtId of " << stmtId, + inserted); + } + p().transactionOperationBytes += repl::DurableOplogEntry::getDurableReplOperationSize(operation); if (!operation.getPreImage().isEmpty()) { @@ -1357,6 +1365,7 @@ void TransactionParticipant::Participant::clearOperationsInMemory(OperationConte invariant(p().autoCommit); p().transactionOperationBytes = 0; p().transactionOperations.clear(); + p().transactionStmtIds.clear(); p().numberOfPreImagesToWrite = 0; } @@ -2404,6 +2413,7 @@ void TransactionParticipant::Participant::_resetTransactionState( p().transactionOperationBytes = 0; p().transactionOperations.clear(); + p().transactionStmtIds.clear(); o(wl).prepareOpTime = repl::OpTime(); o(wl).recoveryPrepareOpTime = repl::OpTime(); p().autoCommit = boost::none; diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index f6d12ee9dc9..80dc173cd9c 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -1010,6 +1010,10 @@ private: // transaction. std::vector<repl::ReplOperation> transactionOperations; + // Holds stmtIds for operations which have been applied in the current multi-document + // transaction. + stdx::unordered_set<StmtId> transactionStmtIds; + // Total size in bytes of all operations within the _transactionOperations vector. size_t transactionOperationBytes{0}; diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index a07abc90d5f..7b6b95e018e 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -1369,7 +1369,7 @@ TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) { // Tests that a transaction aborts if it becomes too large based on the server parameter // 'transactionLimitBytes'. -TEST_F(TxnParticipantTest, TransactionExceedsSizeParameter) { +TEST_F(TxnParticipantTest, TransactionExceedsSizeParameterObjectField) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -1394,6 +1394,34 @@ TEST_F(TxnParticipantTest, TransactionExceedsSizeParameter) { ErrorCodes::TransactionTooLarge); } +TEST_F(TxnParticipantTest, TransactionExceedsSizeParameterStmtIdsField) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + auto oldLimit = gTransactionSizeLimitBytes.load(); + ON_BLOCK_EXIT([oldLimit] { gTransactionSizeLimitBytes.store(oldLimit); }); + + // Set a limit of 2.5 MB + gTransactionSizeLimitBytes.store(2 * 1024 * 1024 + 512 * 1024); + + // Two 1MB operations should succeed; three 1MB operations should fail. + int stmtId = 0; + auto makeOperation = [&] { + std::vector<StmtId> stmtIds; + stmtIds.resize(1024 * 1024 / sizeof(StmtId)); + std::generate(stmtIds.begin(), stmtIds.end(), [&stmtId] { return stmtId++; }); + auto operation = repl::DurableOplogEntry::makeInsertOperation(kNss, _uuid, BSONObj()); + operation.setInitializedStatementIds(stmtIds); + return operation; + }; + txnParticipant.addTransactionOperation(opCtx(), makeOperation()); + txnParticipant.addTransactionOperation(opCtx(), makeOperation()); + ASSERT_THROWS_CODE(txnParticipant.addTransactionOperation(opCtx(), makeOperation()), + AssertionException, + ErrorCodes::TransactionTooLarge); +} + TEST_F(TxnParticipantTest, StashInNestedSessionIsANoop) { auto outerScopedSession = checkOutSession(); Locker* originalLocker = opCtx()->lockState(); diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 533bac1f1f3..e9f1bd1617b 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -259,7 +259,8 @@ BatchWriteOp::BatchWriteOp(OperationContext* opCtx, const BatchedCommandRequest& : _opCtx(opCtx), _clientRequest(clientRequest), _batchTxnNum(_opCtx->getTxnNumber()), - _inTransaction(bool(TransactionRouter::get(opCtx))) { + _inTransaction(bool(TransactionRouter::get(opCtx))), + _isRetryableWrite(opCtx->isRetryableWrite()) { _writeOps.reserve(_clientRequest.sizeWriteOps()); for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) { @@ -467,7 +468,7 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(const TargetedWriteBatch& const auto batchType = _clientRequest.getBatchType(); boost::optional<std::vector<int32_t>> stmtIdsForOp; - if (_batchTxnNum) { + if (_isRetryableWrite) { stmtIdsForOp.emplace(); } @@ -551,7 +552,7 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(const TargetedWriteBatch& wcb.setIsTimeseriesNamespace(true); } - if (_batchTxnNum) { + if (_isRetryableWrite) { wcb.setStmtIds(std::move(stmtIdsForOp)); } diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index 73a9f8175a5..375ae643079 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -245,6 +245,7 @@ private: // Set to true if this write is part of a transaction. const bool _inTransaction{false}; + const bool _isRetryableWrite{false}; boost::optional<int> _nShardsOwningChunks; }; diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp index caf629be09b..f2c520f094b 100644 --- a/src/mongo/s/write_ops/batch_write_op_test.cpp +++ b/src/mongo/s/write_ops/batch_write_op_test.cpp @@ -1464,6 +1464,90 @@ TEST_F(BatchWriteOpTest, MultiOpTwoWCErrors) { ASSERT(clientResponse.isWriteConcernErrorSet()); } +TEST_F(BatchWriteOpTest, AttachingStmtIds) { + NamespaceString nss("foo.bar"); + ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED(), boost::none); + auto targeter = initTargeterFullRange(nss, endpoint); + + const std::vector<StmtId> stmtIds{1, 2, 3}; + const std::vector<LogicalSessionId> lsids{ + makeLogicalSessionIdForTest(), + makeLogicalSessionIdWithTxnUUIDForTest(), + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(), + }; + const BatchedCommandRequest originalRequest([&] { + write_ops::InsertCommandRequest insertOp(nss); + insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2), BSON("x" << 3)}); + insertOp.getWriteCommandRequestBase().setStmtIds({stmtIds}); + return insertOp; + }()); + + auto makeTargetedBatchedCommandRequest = [&] { + BatchWriteOp batchOp(_opCtx, originalRequest); + + OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned; + std::map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap(); + ASSERT_OK(batchOp.targetBatch(targeter, false, &targeted)); + ASSERT(!batchOp.isFinished()); + ASSERT_EQUALS(targeted.size(), 1u); + assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint); + + BatchedCommandRequest targetedRequest = + batchOp.buildBatchRequest(*targeted.begin()->second, targeter); + return targetedRequest; + }; + + { + // Verify that when the command is not running in a session, the targeted batched command + // request does not have stmtIds attached to it. + auto request = makeTargetedBatchedCommandRequest(); + ASSERT_FALSE(request.getInsertRequest().getStmtIds()); + } + + { + // Verify that when the command is running in a session but not as retryable writes or in a + // retryable internal transaction, the targeted batched command request does not have + // stmtIds to it. + for (auto& lsid : lsids) { + _opCtx->setLogicalSessionId(lsid); + auto targetedRequest = makeTargetedBatchedCommandRequest(); + ASSERT_FALSE(targetedRequest.getInsertRequest().getStmtIds()); + } + } + + { + // Verify that when the command is running in a session as retryable writes, the targeted + // batched command request has stmtIds attached to it. + _opCtx->setTxnNumber(0); + for (auto& lsid : lsids) { + _opCtx->setLogicalSessionId(lsid); + auto targetedRequest = makeTargetedBatchedCommandRequest(); + auto requestStmtIds = targetedRequest.getInsertRequest().getStmtIds(); + ASSERT(requestStmtIds); + ASSERT(*requestStmtIds == stmtIds); + } + } + + { + // Verify that when the command is running in a transaction, the targeted batched command + // request has stmtIds attached to it if and only if the transaction is a retryable internal + // transaction. + _opCtx->setTxnNumber(0); + _opCtx->setInMultiDocumentTransaction(); + for (auto& lsid : lsids) { + _opCtx->setLogicalSessionId(lsid); + auto request = makeTargetedBatchedCommandRequest(); + auto requestStmtIds = request.getInsertRequest().getStmtIds(); + if (isInternalSessionForRetryableWrite(lsid)) { + ASSERT(requestStmtIds); + ASSERT(*requestStmtIds == stmtIds); + } else { + ASSERT_FALSE(requestStmtIds); + } + } + } +} + // // Tests of batch size limit functionality // |