summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2021-10-19 14:52:57 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-29 00:50:10 +0000
commitea197fd5193fe034174584e60290dd90fe01d2b1 (patch)
treeac2e923c5dc87fd4115293b60f6ed926433b078a
parent5e8446f92d4826cfe2e8b9082efd0fbd540a9718 (diff)
downloadmongo-ea197fd5193fe034174584e60290dd90fe01d2b1.tar.gz
SERVER-58756 Store stmtIds for the operations in retryable internal transactions in applyOps oplog entries
-rw-r--r--buildscripts/idl/idl/generator.py9
-rw-r--r--jstests/sharding/internal_sessions_end_sessions.js2
-rw-r--r--jstests/sharding/internal_sessions_reaping.js2
-rw-r--r--jstests/sharding/internal_transactions_for_retryable_writes_oplog_entries.js227
-rw-r--r--jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js2
-rw-r--r--jstests/sharding/libs/sharded_transactions_helpers.js8
-rw-r--r--src/mongo/db/logical_session_id_helpers.cpp4
-rw-r--r--src/mongo/db/logical_session_id_helpers.h6
-rw-r--r--src/mongo/db/op_observer_impl.cpp47
-rw-r--r--src/mongo/db/operation_context.h6
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp4
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp4
-rw-r--r--src/mongo/db/repl/oplog_entry.h45
-rw-r--r--src/mongo/db/repl/oplog_entry.idl18
-rw-r--r--src/mongo/db/transaction_participant.cpp10
-rw-r--r--src/mongo/db/transaction_participant.h4
-rw-r--r--src/mongo/db/transaction_participant_test.cpp30
-rw-r--r--src/mongo/s/write_ops/batch_write_op.cpp7
-rw-r--r--src/mongo/s/write_ops/batch_write_op.h1
-rw-r--r--src/mongo/s/write_ops/batch_write_op_test.cpp84
20 files changed, 481 insertions, 39 deletions
diff --git a/buildscripts/idl/idl/generator.py b/buildscripts/idl/idl/generator.py
index 51569af4d4f..00ea3113455 100644
--- a/buildscripts/idl/idl/generator.py
+++ b/buildscripts/idl/idl/generator.py
@@ -1258,8 +1258,13 @@ class _CppSourceFileWriter(_CppFileWriterBase):
self._writer.write_line('++expectedFieldNumber;')
if field.chained_struct_field:
- self._writer.write_line('%s.%s(std::move(values));' % (_get_field_member_name(
- field.chained_struct_field), _get_field_member_setter_name(field)))
+ if field.type.is_variant:
+ self._writer.write_line('%s.%s(%s(std::move(values)));' %
+ (_get_field_member_name(field.chained_struct_field),
+ _get_field_member_setter_name(field), field.type.cpp_type))
+ else:
+ self._writer.write_line('%s.%s(std::move(values));' % (_get_field_member_name(
+ field.chained_struct_field), _get_field_member_setter_name(field)))
else:
self._writer.write_line('%s = std::move(values);' % (_get_field_member_name(field)))
diff --git a/jstests/sharding/internal_sessions_end_sessions.js b/jstests/sharding/internal_sessions_end_sessions.js
index 76297576b59..7d5d24b648b 100644
--- a/jstests/sharding/internal_sessions_end_sessions.js
+++ b/jstests/sharding/internal_sessions_end_sessions.js
@@ -99,7 +99,7 @@ numTransactionsCollEntries++;
numImageCollEntries++;
assert.eq(numTransactionsCollEntries, transactionsCollOnPrimary.find().itcount());
-// TODO (SERVER-58756): a retryable findAndModify command run inside a retryable internal
+// TODO (SERVER-60540): a retryable findAndModify command run inside a retryable internal
// transaction should have a config.image_collection entry like a regular retryable write.
// assert.eq(numImageCollEntries, imageCollOnPrimary.find().itcount());
diff --git a/jstests/sharding/internal_sessions_reaping.js b/jstests/sharding/internal_sessions_reaping.js
index ebf0916d174..e0ac3d7571a 100644
--- a/jstests/sharding/internal_sessions_reaping.js
+++ b/jstests/sharding/internal_sessions_reaping.js
@@ -145,7 +145,7 @@ assert.commandWorked(testDB.runCommand(
jsTest.log("Verify that the config.transactions entry for the retryable internal transaction for " +
"the findAndModify did not get reaped although there is already a new retryable write");
assert.eq(numTransactionsCollEntries, transactionsCollOnPrimary.find().itcount());
-// TODO (SERVER-58756): a retryable findAndModify command run inside a retryable internal
+// TODO (SERVER-60540): a retryable findAndModify command run inside a retryable internal
// transaction should have a config.image_collection entry like a regular retryable write.
// assert.eq(numImageCollEntries, imageCollOnPrimary.find().itcount());
diff --git a/jstests/sharding/internal_transactions_for_retryable_writes_oplog_entries.js b/jstests/sharding/internal_transactions_for_retryable_writes_oplog_entries.js
new file mode 100644
index 00000000000..6a87e3505c6
--- /dev/null
+++ b/jstests/sharding/internal_transactions_for_retryable_writes_oplog_entries.js
@@ -0,0 +1,227 @@
+/*
+ * Tests that the stmtIds for write statements in an internal transaction for retryable writes
+ * are stored in the individual operation entries in the applyOps oplog entry for the transaction.
+ *
+ * @tags: [requires_fcv_51, featureFlagInternalTransactions]
+ */
+(function() {
+'use strict';
+
+load('jstests/sharding/libs/sharded_transactions_helpers.js');
+
+const kDbName = "testDb";
+const kCollName = "testColl";
+
+const st = new ShardingTest({shards: 1});
+const mongosTestDB = st.s.getDB(kDbName);
+const mongosTestColl = mongosTestDB.getCollection(kCollName);
+
+const kStmtIdsOption = {
+ isComplete: 1,
+ isIncomplete: 2,
+ isRepeated: 3
+};
+
+/*
+ * Returns an array of NumberInts (i.e. stmtId type) based on the specified option:
+ * - If option is 'isComplete', returns [NumberInt(0), ..., NumberInt((numStmtIds-1)*10)].
+ * - If option is 'isIncomplete', returns [NumberInt(-1), NumberInt(1), ...,
+ * NumberInt(numStmtIds-1)].
+ * - If option is 'isRepeated', returns [NumberInt(1), ..., NumberInt(1)].
+ */
+function makeCustomStmtIdsForTest(numStmtIds, option) {
+ switch (option) {
+ case kStmtIdsOption.isComplete:
+ return [...Array(numStmtIds).keys()].map(i => NumberInt(i * 10));
+ case kStmtIdsOption.isIncomplete:
+ let stmtIds = [...Array(numStmtIds).keys()];
+ stmtIds[0] = -1;
+ return stmtIds.map(i => NumberInt(i));
+ case kStmtIdsOption.isRepeated:
+ return Array(numStmtIds).fill(1).map(i => NumberInt(i));
+ }
+}
+
+function verifyOplogEntries(cmdObj,
+ lsid,
+ txnNumber,
+ numWriteStatements,
+ {shouldStoreStmtIds, customStmtIdsOption, isPreparedTransaction}) {
+ const writeCmdObj = Object.assign(cmdObj, {
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ startTransaction: true,
+ autocommit: false,
+ });
+ let stmtIds = null;
+ if (customStmtIdsOption) {
+ stmtIds = makeCustomStmtIdsForTest(numWriteStatements, customStmtIdsOption);
+ writeCmdObj.stmtIds = stmtIds;
+ }
+ const commitCmdObj = {
+ commitTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false,
+ };
+
+ const writeRes = mongosTestDB.runCommand(writeCmdObj);
+ if (customStmtIdsOption == kStmtIdsOption.isRepeated) {
+ assert.commandFailedWithCode(writeRes, 5875600);
+ assert.commandWorked(mongosTestColl.remove({}));
+ return;
+ }
+ assert.commandWorked(writeRes);
+ if (isPreparedTransaction) {
+ const shard0Primary = st.rs0.getPrimary();
+ const isPreparedTransactionRes = assert.commandWorked(shard0Primary.adminCommand({
+ prepareTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false,
+ writeConcern: {w: "majority"},
+ }));
+ commitCmdObj.commitTimestamp = isPreparedTransactionRes.prepareTimestamp;
+ assert.commandWorked(shard0Primary.adminCommand(commitCmdObj));
+ }
+ assert.commandWorked(mongosTestDB.adminCommand(commitCmdObj));
+
+ const oplogEntries = getOplogEntriesForTxn(st.rs0, lsid, txnNumber);
+ assert.eq(oplogEntries.length, isPreparedTransaction ? 2 : 1, oplogEntries);
+
+ const applyOpsOplogEntry = oplogEntries[0];
+ assert(!applyOpsOplogEntry.hasOwnProperty("stmtId"));
+ const operations = applyOpsOplogEntry.o.applyOps;
+ operations.forEach((operation, index) => {
+ if (shouldStoreStmtIds) {
+ const operationStmtId = stmtIds ? stmtIds[index] : index;
+ if (operationStmtId == -1) {
+ // Uninitialized stmtIds should be ignored.
+ assert(!operation.hasOwnProperty("stmtId"), operation);
+ } else {
+ assert.eq(operation.stmtId, operationStmtId, operation);
+ }
+ } else {
+ assert(!operation.hasOwnProperty("stmtId"), operation);
+ }
+ });
+
+ if (isPreparedTransaction) {
+ const commitOplogEntry = oplogEntries[1];
+ assert(!commitOplogEntry.hasOwnProperty("stmtId"));
+ }
+
+ assert.commandWorked(mongosTestColl.remove({}));
+}
+
+function testInserts(lsid, txnNumber, testOptions) {
+ jsTest.log("Test batched inserts");
+ const insertCmdObj = {
+ insert: kCollName,
+ documents: [{_id: 0, x: 0}, {_id: 1, x: 1}],
+ };
+ verifyOplogEntries(insertCmdObj, lsid, txnNumber, insertCmdObj.documents.length, testOptions);
+}
+
+function testUpdates(lsid, txnNumber, testOptions) {
+ jsTest.log("Test batched updates");
+ assert.commandWorked(mongosTestColl.insert([{_id: 0, x: 0}, {_id: 1, x: 1}]));
+ const updateCmdObj = {
+ update: kCollName,
+ updates: [
+ {q: {_id: 0, x: 0}, u: {$inc: {x: -10}}},
+ {q: {_id: 1, x: 1}, u: {$inc: {x: 10}}},
+ ],
+ };
+ verifyOplogEntries(updateCmdObj, lsid, txnNumber, updateCmdObj.updates.length, testOptions);
+}
+
+function testDeletes(lsid, txnNumber, testOptions) {
+ jsTest.log("Test batched deletes");
+ assert.commandWorked(mongosTestColl.insert([{_id: 0, x: 0}, {_id: 1, x: 1}]));
+ const deleteCmdObj = {
+ delete: kCollName,
+ deletes: [
+ {q: {_id: 0, x: 0}, limit: 0},
+ {q: {_id: 1, x: 1}, limit: 0},
+ ],
+ };
+ verifyOplogEntries(deleteCmdObj, lsid, txnNumber, deleteCmdObj.deletes.length, testOptions);
+}
+
+{
+ jsTest.log("Test that oplog entries for non-internal transactions do not have stmtIds");
+ const lsid = {id: UUID()};
+ let txnNumber = 0;
+ const testOptions = {shouldStoreStmtIds: false};
+ testInserts(lsid, txnNumber++, testOptions);
+ testUpdates(lsid, txnNumber++, testOptions);
+ testDeletes(lsid, txnNumber++, testOptions);
+}
+
+{
+ jsTest.log(
+ "Test that oplog entries for non-retryable internal transactions do not have stmtIds");
+ const lsid = {id: UUID(), txnUUID: UUID()};
+ let txnNumber = 0;
+ const testOptions = {shouldStoreStmtIds: false};
+ testInserts(lsid, txnNumber++, testOptions);
+ testUpdates(lsid, txnNumber++, testOptions);
+ testDeletes(lsid, txnNumber++, testOptions);
+}
+
+{
+ jsTest.log("Test that oplog entries for retryable internal transactions have stmtIds");
+ const lsid = {id: UUID(), txnNumber: NumberLong(0), txnUUID: UUID()};
+ let txnNumber = 0;
+
+ let runTests = ({isPreparedTransaction}) => {
+ jsTest.log("Test prepared transactions: " + isPreparedTransaction);
+
+ jsTest.log("Test with default stmtIds");
+ const testOptions0 = {shouldStoreStmtIds: true, isPreparedTransaction};
+ testInserts(lsid, txnNumber++, testOptions0);
+ testUpdates(lsid, txnNumber++, testOptions0);
+ testDeletes(lsid, txnNumber++, testOptions0);
+
+ jsTest.log("Test with custom and valid stmtIds");
+ const testOptions1 = {
+ shouldStoreStmtIds: true,
+ customStmtIdsOption: kStmtIdsOption.isComplete,
+ isPreparedTransaction
+ };
+ testInserts(lsid, txnNumber++, testOptions1);
+ testUpdates(lsid, txnNumber++, testOptions1);
+ testDeletes(lsid, txnNumber++, testOptions1);
+
+ jsTest.log(
+ "Test with custom stmtIds containing -1. Verify that operation entries for write " +
+ "statements with stmtId=-1 do not have a 'stmtId' field");
+ const testOptions2 = {
+ shouldStoreStmtIds: true,
+ customStmtIdsOption: kStmtIdsOption.isIncomplete,
+ isPreparedTransaction
+ };
+ testInserts(lsid, txnNumber++, testOptions2);
+ testUpdates(lsid, txnNumber++, testOptions2);
+ testDeletes(lsid, txnNumber++, testOptions2);
+
+ jsTest.log(
+ "Test with custom stmtIds containing repeats. Verify that the command fails with " +
+ "a uassert instead causes the mongod that executes it to crash");
+ const testOptions3 = {
+ shouldStoreStmtIds: true,
+ customStmtIdsOption: kStmtIdsOption.isRepeated,
+ isPreparedTransaction
+ };
+ testInserts(lsid, txnNumber++, testOptions3);
+ testUpdates(lsid, txnNumber++, testOptions3);
+ testDeletes(lsid, txnNumber++, testOptions3);
+ };
+
+ runTests({isPreparedTransaction: false});
+ runTests({isPreparedTransaction: true});
+}
+
+st.stop();
+})();
diff --git a/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js
index 21b26c3d98a..e11c7c857fb 100644
--- a/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js
+++ b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js
@@ -46,6 +46,7 @@ function testCommitAfterRetry(db, lsid, txnNumber) {
documents: [{x: 0}],
lsid: lsid,
txnNumber: txnNumber,
+ stmtId: NumberInt(0),
startTransaction: true,
autocommit: false,
};
@@ -62,6 +63,7 @@ function testCommitAfterRetry(db, lsid, txnNumber) {
documents: [{x: 1}],
lsid: lsid,
txnNumber: txnNumber,
+ stmtId: NumberInt(1),
autocommit: false,
};
const insertRes0 = assert.commandFailedWithCode(
diff --git a/jstests/sharding/libs/sharded_transactions_helpers.js b/jstests/sharding/libs/sharded_transactions_helpers.js
index 205a804eeb6..573a11e099f 100644
--- a/jstests/sharding/libs/sharded_transactions_helpers.js
+++ b/jstests/sharding/libs/sharded_transactions_helpers.js
@@ -191,3 +191,11 @@ function flushRoutersAndRefreshShardMetadata(st, {ns, dbNames = []} = {}) {
});
});
}
+
+function getOplogEntriesForTxn(rs, lsid, txnNumber) {
+ const filter = {txnNumber: NumberLong(txnNumber)};
+ for (let k in lsid) {
+ filter["lsid." + k] = lsid[k];
+ }
+ return rs.getPrimary().getCollection("local.oplog.rs").find(filter).sort({_id: 1}).toArray();
+}
diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp
index 525ed561db2..88f3a32874f 100644
--- a/src/mongo/db/logical_session_id_helpers.cpp
+++ b/src/mongo/db/logical_session_id_helpers.cpp
@@ -89,6 +89,10 @@ LogicalSessionId castToParentSessionId(const LogicalSessionId& sessionId) {
return sessionId;
}
+bool isInternalSessionForRetryableWrite(const LogicalSessionId& sessionId) {
+ return sessionId.getTxnNumber().has_value();
+}
+
LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& fromClient,
OperationContext* opCtx,
std::initializer_list<Privilege> allowSpoof) {
diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h
index 3c799e2177d..f0b732f8862 100644
--- a/src/mongo/db/logical_session_id_helpers.h
+++ b/src/mongo/db/logical_session_id_helpers.h
@@ -61,6 +61,12 @@ boost::optional<LogicalSessionId> getParentSessionId(const LogicalSessionId& ses
LogicalSessionId castToParentSessionId(const LogicalSessionId& sessionId);
/**
+ * Returns true if the session with the given session id is an internal session for internal
+ * transactions for retryable writes.
+ */
+bool isInternalSessionForRetryableWrite(const LogicalSessionId& sessionId);
+
+/**
* Factory functions to generate logical session records.
*/
LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& lsid,
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 234d7c3d0df..03488edae19 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -543,8 +543,14 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
return;
}
+ const bool inRetryableInternalTransaction =
+ isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
+
for (auto iter = first; iter != last; iter++) {
auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid, iter->doc);
+ if (inRetryableInternalTransaction) {
+ operation.setInitializedStatementIds(iter->stmtIds);
+ }
operation.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(iter->doc));
txnParticipant.addTransactionOperation(opCtx, operation);
@@ -651,9 +657,14 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
+ const bool inRetryableInternalTransaction =
+ isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
+
auto operation = MutableOplogEntry::makeUpdateOperation(
args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria);
-
+ if (inRetryableInternalTransaction) {
+ operation.setInitializedStatementIds(args.updateArgs.stmtIds);
+ }
operation.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc));
@@ -665,7 +676,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
MutableOplogEntry oplogEntry;
- oplogEntry.getDurableReplOperation().setDestinedRecipient(
+ oplogEntry.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc));
if (opCtx->getTxnNumber() && args.updateArgs.storeImageInSideCollection) {
@@ -788,8 +799,15 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
tassert(5868700,
"Attempted a retryable write within a multi-document transaction",
args.retryableWritePreImageRecordingType == RetryableOptions::kNotRetryable);
+
+ const bool inRetryableInternalTransaction =
+ isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
+
auto operation =
MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId());
+ if (inRetryableInternalTransaction) {
+ operation.setInitializedStatementIds({stmtId});
+ }
if (args.preImageRecordingEnabledForCollection) {
tassert(5868701,
@@ -1209,12 +1227,14 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
namespace {
// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array
-// field. Appends as many operations as possible until either the constructed object exceeds the
-// 16MB limit or the maximum number of transaction statements allowed in one entry.
+// field. Appends as many operations as possible to the array (and their corresponding statement
+// ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the
+// maximum number of transaction statements allowed in one entry.
//
// Returns an iterator to the first statement that wasn't packed into the applyOps object.
std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
BSONObjBuilder* applyOpsBuilder,
+ std::vector<StmtId>* stmtIdsWritten,
std::vector<repl::ReplOperation>::iterator stmtBegin,
std::vector<repl::ReplOperation>::iterator stmtEnd) {
@@ -1234,6 +1254,8 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
BSONObjMaxUserSize)))
break;
opsArray.append(stmt.toBSON());
+ const auto stmtIds = stmt.getStatementIds();
+ stmtIdsWritten->insert(stmtIdsWritten->end(), stmtIds.begin(), stmtIds.end());
}
try {
// BSONArrayBuilder will throw a BSONObjectTooLarge exception if we exceeded the max BSON
@@ -1264,7 +1286,12 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
MutableOplogEntry* oplogEntry,
boost::optional<DurableTxnStateEnum> txnState,
boost::optional<repl::OpTime> startOpTime,
+ std::vector<StmtId> stmtIdsWritten,
const bool updateTxnTable) {
+ if (!stmtIdsWritten.empty()) {
+ invariant(isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()));
+ }
+
const bool areInternalTransactionsEnabled =
feature_flags::gFeatureFlagInternalTransactions.isEnabled(
serverGlobalParams.featureCompatibility);
@@ -1291,7 +1318,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
if (areInternalTransactionsEnabled) {
sessionTxnRecord.setTxnRetryCounter(txnRetryCounter);
}
- onWriteOpCompleted(opCtx, {}, sessionTxnRecord);
+ onWriteOpCompleted(opCtx, std::move(stmtIdsWritten), sessionTxnRecord);
}
return times;
} catch (const AssertionException& e) {
@@ -1380,8 +1407,9 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
while (stmtsIter != stmts->end()) {
BSONObjBuilder applyOpsBuilder;
- auto nextStmt =
- packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts->end());
+ std::vector<StmtId> stmtIdsWritten;
+ auto nextStmt = packTransactionStatementsForApplyOps(
+ &applyOpsBuilder, &stmtIdsWritten, stmtsIter, stmts->end());
// If we packed the last op, then the next oplog entry we log should be the implicit
// commit or implicit prepare, i.e. we omit the 'partialTxn' field.
@@ -1441,8 +1469,8 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
auto txnState = isPartialTxn
? DurableTxnStateEnum::kInProgress
: (implicitPrepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted);
- prevWriteOpTime =
- logApplyOpsForTransaction(opCtx, &oplogEntry, txnState, startOpTime, updateTxnTable);
+ prevWriteOpTime = logApplyOpsForTransaction(
+ opCtx, &oplogEntry, txnState, startOpTime, std::move(stmtIdsWritten), updateTxnTable);
hangAfterLoggingApplyOpsForTransaction.pauseWhileSet();
@@ -1623,6 +1651,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
&oplogEntry,
DurableTxnStateEnum::kPrepared,
oplogSlot,
+ {},
true /* updateTxnTable */);
}
wuow.commit();
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index ab63235a741..e914b93dbdb 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -36,6 +36,7 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/locker.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_id.h"
#include "mongo/db/query/datetime/date_time_support.h"
#include "mongo/db/storage/recovery_unit.h"
@@ -451,6 +452,11 @@ public:
return _inMultiDocumentTransaction;
}
+ bool isRetryableWrite() const {
+ return _txnNumber &&
+ (!_inMultiDocumentTransaction || isInternalSessionForRetryableWrite(*_lsid));
+ }
+
/**
* Sets that this operation is part of a multi-document transaction. Once this is set, it cannot
* be unset.
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index f43dcf8581d..de03842f680 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -572,8 +572,8 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
template <typename T>
StmtId getStmtIdForWriteOp(OperationContext* opCtx, const T& wholeOp, size_t opIndex) {
- return opCtx->getTxnNumber() ? write_ops::getStmtIdForWriteAt(wholeOp, opIndex)
- : kUninitializedStmtId;
+ return opCtx->isRetryableWrite() ? write_ops::getStmtIdForWriteAt(wholeOp, opIndex)
+ : kUninitializedStmtId;
}
SingleWriteResult makeWriteResultForInsertOrDeleteRetry() {
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 3f3c88cd359..ee80f7afaf3 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -284,8 +284,10 @@ OpTime MutableOplogEntry::getOpTime() const {
}
size_t DurableOplogEntry::getDurableReplOperationSize(const DurableReplOperation& op) {
+ const auto stmtIds = variant_util::toVector<StmtId>(op.getStatementIds());
return sizeof(op) + op.getNss().size() + op.getObject().objsize() +
- (op.getObject2() ? op.getObject2()->objsize() : 0);
+ (op.getObject2() ? op.getObject2()->objsize() : 0) +
+ (sizeof(std::vector<StmtId>) + (sizeof(StmtId) * stmtIds.size()));
}
StatusWith<DurableOplogEntry> DurableOplogEntry::parse(const BSONObj& object) {
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index c13fd9265df..f8d63ca61cc 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -41,6 +41,18 @@
namespace mongo {
namespace repl {
+namespace variant_util {
+template <typename T>
+std::vector<T> toVector(boost::optional<stdx::variant<T, std::vector<T>>> optVals) {
+ if (!optVals) {
+ return {};
+ }
+ return stdx::visit(visit_helper::Overloaded{[](T val) { return std::vector<T>{val}; },
+ [](const std::vector<T>& vals) { return vals; }},
+ *optVals);
+}
+} // namespace variant_util
+
/**
* The first oplog entry is a no-op with this message in its "msg" field.
*/
@@ -76,6 +88,25 @@ public:
_fullPreImage = std::move(value);
}
+ /**
+ * Sets the statement ids for this ReplOperation to 'stmtIds' if it does not contain any
+ * kUninitializedStmtId (i.e. placeholder statement id).
+ */
+ void setInitializedStatementIds(const std::vector<StmtId>& stmtIds) & {
+ if (std::count(stmtIds.begin(), stmtIds.end(), kUninitializedStmtId) > 0) {
+ return;
+ }
+ if (stmtIds.size() > 1) {
+ DurableReplOperation::setStatementIds({{stmtIds}});
+ } else if (stmtIds.size() == 1) {
+ DurableReplOperation::setStatementIds({{stmtIds.front()}});
+ }
+ }
+
+ std::vector<StmtId> getStatementIds() const {
+ return variant_util::toVector<StmtId>(DurableReplOperation::getStatementIds());
+ }
+
private:
BSONObj _preImageDocumentKey;
BSONObj _fullPreImage;
@@ -123,22 +154,16 @@ public:
void setStatementIds(const std::vector<StmtId>& stmtIds) & {
if (stmtIds.empty()) {
- OplogEntryBase::setStatementIds(boost::none);
+ getDurableReplOperation().setStatementIds(boost::none);
} else if (stmtIds.size() == 1) {
- OplogEntryBase::setStatementIds({{stmtIds.front()}});
+ getDurableReplOperation().setStatementIds({{stmtIds.front()}});
} else {
- OplogEntryBase::setStatementIds({{stmtIds}});
+ getDurableReplOperation().setStatementIds({{stmtIds}});
}
}
std::vector<StmtId> getStatementIds() const {
- if (!OplogEntryBase::getStatementIds()) {
- return {};
- }
- return stdx::visit(
- visit_helper::Overloaded{[](StmtId stmtId) { return std::vector<StmtId>{stmtId}; },
- [](const std::vector<StmtId>& stmtIds) { return stmtIds; }},
- *OplogEntryBase::getStatementIds());
+ return variant_util::toVector<StmtId>(OplogEntryBase::getStatementIds());
}
void setTxnNumber(boost::optional<std::int64_t> value) & {
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index 6b014e66e3e..06c9ee1fcab 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -59,8 +59,8 @@ enums:
structs:
DurableReplOperation:
- description: "A document that represents an operation in a transaction. Should never be
- used directly in server code. Instead, create an instance of ReplOperation."
+ description: "A document that represents an operation. Should never be used directly in
+ server code. Instead, create an instance of ReplOperation."
fields:
op:
cpp_name: opType
@@ -104,6 +104,13 @@ structs:
optional: true
description: "The destined recipient for this op under the new shard key pattern.
Only included when a resharding operation is in progress."
+ stmtId:
+ cpp_name: statementIds
+ type:
+ variant: [StmtId, array<StmtId>]
+ optional: true
+ description: "Identifier of the transaction statement(s) which generated this oplog
+ entry"
OplogEntryBase:
description: A document in which the server stores an oplog entry.
@@ -142,13 +149,6 @@ structs:
optional: true
description: "Used by tests in replication and also by production resharding code to
store timestamps."
- stmtId:
- cpp_name: statementIds
- type:
- variant: [StmtId, array<StmtId>]
- optional: true
- description: "Identifier of the transaction statement(s) which generated this oplog
- entry"
prevOpTime:
cpp_name: prevWriteOpTimeInTransaction
type: optime
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index fa4194a2632..8e3a861b990 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -1316,6 +1316,14 @@ void TransactionParticipant::Participant::addTransactionOperation(
invariant(p().autoCommit && !*p().autoCommit && o().activeTxnNumber != kUninitializedTxnNumber);
invariant(opCtx->lockState()->inAWriteUnitOfWork());
p().transactionOperations.push_back(operation);
+ const auto stmtIds = operation.getStatementIds();
+ for (auto stmtId : stmtIds) {
+ auto [_, inserted] = p().transactionStmtIds.insert(stmtId);
+ uassert(5875600,
+ str::stream() << "Found two operations using the same stmtId of " << stmtId,
+ inserted);
+ }
+
p().transactionOperationBytes +=
repl::DurableOplogEntry::getDurableReplOperationSize(operation);
if (!operation.getPreImage().isEmpty()) {
@@ -1357,6 +1365,7 @@ void TransactionParticipant::Participant::clearOperationsInMemory(OperationConte
invariant(p().autoCommit);
p().transactionOperationBytes = 0;
p().transactionOperations.clear();
+ p().transactionStmtIds.clear();
p().numberOfPreImagesToWrite = 0;
}
@@ -2404,6 +2413,7 @@ void TransactionParticipant::Participant::_resetTransactionState(
p().transactionOperationBytes = 0;
p().transactionOperations.clear();
+ p().transactionStmtIds.clear();
o(wl).prepareOpTime = repl::OpTime();
o(wl).recoveryPrepareOpTime = repl::OpTime();
p().autoCommit = boost::none;
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index f6d12ee9dc9..80dc173cd9c 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -1010,6 +1010,10 @@ private:
// transaction.
std::vector<repl::ReplOperation> transactionOperations;
+ // Holds stmtIds for operations which have been applied in the current multi-document
+ // transaction.
+ stdx::unordered_set<StmtId> transactionStmtIds;
+
// Total size in bytes of all operations within the _transactionOperations vector.
size_t transactionOperationBytes{0};
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index a07abc90d5f..7b6b95e018e 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -1369,7 +1369,7 @@ TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) {
// Tests that a transaction aborts if it becomes too large based on the server parameter
// 'transactionLimitBytes'.
-TEST_F(TxnParticipantTest, TransactionExceedsSizeParameter) {
+TEST_F(TxnParticipantTest, TransactionExceedsSizeParameterObjectField) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
@@ -1394,6 +1394,34 @@ TEST_F(TxnParticipantTest, TransactionExceedsSizeParameter) {
ErrorCodes::TransactionTooLarge);
}
+TEST_F(TxnParticipantTest, TransactionExceedsSizeParameterStmtIdsField) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ auto oldLimit = gTransactionSizeLimitBytes.load();
+ ON_BLOCK_EXIT([oldLimit] { gTransactionSizeLimitBytes.store(oldLimit); });
+
+ // Set a limit of 2.5 MB
+ gTransactionSizeLimitBytes.store(2 * 1024 * 1024 + 512 * 1024);
+
+ // Two 1MB operations should succeed; three 1MB operations should fail.
+ int stmtId = 0;
+ auto makeOperation = [&] {
+ std::vector<StmtId> stmtIds;
+ stmtIds.resize(1024 * 1024 / sizeof(StmtId));
+ std::generate(stmtIds.begin(), stmtIds.end(), [&stmtId] { return stmtId++; });
+ auto operation = repl::DurableOplogEntry::makeInsertOperation(kNss, _uuid, BSONObj());
+ operation.setInitializedStatementIds(stmtIds);
+ return operation;
+ };
+ txnParticipant.addTransactionOperation(opCtx(), makeOperation());
+ txnParticipant.addTransactionOperation(opCtx(), makeOperation());
+ ASSERT_THROWS_CODE(txnParticipant.addTransactionOperation(opCtx(), makeOperation()),
+ AssertionException,
+ ErrorCodes::TransactionTooLarge);
+}
+
TEST_F(TxnParticipantTest, StashInNestedSessionIsANoop) {
auto outerScopedSession = checkOutSession();
Locker* originalLocker = opCtx()->lockState();
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 533bac1f1f3..e9f1bd1617b 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -259,7 +259,8 @@ BatchWriteOp::BatchWriteOp(OperationContext* opCtx, const BatchedCommandRequest&
: _opCtx(opCtx),
_clientRequest(clientRequest),
_batchTxnNum(_opCtx->getTxnNumber()),
- _inTransaction(bool(TransactionRouter::get(opCtx))) {
+ _inTransaction(bool(TransactionRouter::get(opCtx))),
+ _isRetryableWrite(opCtx->isRetryableWrite()) {
_writeOps.reserve(_clientRequest.sizeWriteOps());
for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) {
@@ -467,7 +468,7 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(const TargetedWriteBatch&
const auto batchType = _clientRequest.getBatchType();
boost::optional<std::vector<int32_t>> stmtIdsForOp;
- if (_batchTxnNum) {
+ if (_isRetryableWrite) {
stmtIdsForOp.emplace();
}
@@ -551,7 +552,7 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest(const TargetedWriteBatch&
wcb.setIsTimeseriesNamespace(true);
}
- if (_batchTxnNum) {
+ if (_isRetryableWrite) {
wcb.setStmtIds(std::move(stmtIdsForOp));
}
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 73a9f8175a5..375ae643079 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -245,6 +245,7 @@ private:
// Set to true if this write is part of a transaction.
const bool _inTransaction{false};
+ const bool _isRetryableWrite{false};
boost::optional<int> _nShardsOwningChunks;
};
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index caf629be09b..f2c520f094b 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -1464,6 +1464,90 @@ TEST_F(BatchWriteOpTest, MultiOpTwoWCErrors) {
ASSERT(clientResponse.isWriteConcernErrorSet());
}
+// Verifies that BatchWriteOp::buildBatchRequest attaches stmtIds to the targeted batch
+// request only when the command runs as a retryable write or inside a retryable internal
+// transaction — not for unsessioned commands, plain sessions, or ordinary transactions.
+TEST_F(BatchWriteOpTest, AttachingStmtIds) {
+ NamespaceString nss("foo.bar");
+ ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED(), boost::none);
+ auto targeter = initTargeterFullRange(nss, endpoint);
+
+ const std::vector<StmtId> stmtIds{1, 2, 3};
+ // One lsid of each flavor: a plain session, a session with a txnUUID (internal
+ // transaction), and one with both txnNumber and txnUUID (retryable internal transaction).
+ const std::vector<LogicalSessionId> lsids{
+ makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdWithTxnUUIDForTest(),
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(),
+ };
+ const BatchedCommandRequest originalRequest([&] {
+ write_ops::InsertCommandRequest insertOp(nss);
+ insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2), BSON("x" << 3)});
+ insertOp.getWriteCommandRequestBase().setStmtIds({stmtIds});
+ return insertOp;
+ }());
+
+ // Targets the original request against the single full-range shard and returns the
+ // resulting batched command request; BatchWriteOp reads the session/txn state off
+ // _opCtx at construction, so each call reflects the current opCtx configuration.
+ auto makeTargetedBatchedCommandRequest = [&] {
+ BatchWriteOp batchOp(_opCtx, originalRequest);
+
+ OwnedPointerMap<ShardId, TargetedWriteBatch> targetedOwned;
+ std::map<ShardId, TargetedWriteBatch*>& targeted = targetedOwned.mutableMap();
+ ASSERT_OK(batchOp.targetBatch(targeter, false, &targeted));
+ ASSERT(!batchOp.isFinished());
+ ASSERT_EQUALS(targeted.size(), 1u);
+ assertEndpointsEqual(targeted.begin()->second->getEndpoint(), endpoint);
+
+ BatchedCommandRequest targetedRequest =
+ batchOp.buildBatchRequest(*targeted.begin()->second, targeter);
+ return targetedRequest;
+ };
+
+ {
+ // Verify that when the command is not running in a session, the targeted batched command
+ // request does not have stmtIds attached to it.
+ auto request = makeTargetedBatchedCommandRequest();
+ ASSERT_FALSE(request.getInsertRequest().getStmtIds());
+ }
+
+ {
+ // Verify that when the command is running in a session but not as retryable writes or in a
+ // retryable internal transaction, the targeted batched command request does not have
+ // stmtIds attached to it.
+ for (auto& lsid : lsids) {
+ _opCtx->setLogicalSessionId(lsid);
+ auto targetedRequest = makeTargetedBatchedCommandRequest();
+ ASSERT_FALSE(targetedRequest.getInsertRequest().getStmtIds());
+ }
+ }
+
+ {
+ // Verify that when the command is running in a session as retryable writes, the targeted
+ // batched command request has stmtIds attached to it.
+ _opCtx->setTxnNumber(0);
+ for (auto& lsid : lsids) {
+ _opCtx->setLogicalSessionId(lsid);
+ auto targetedRequest = makeTargetedBatchedCommandRequest();
+ auto requestStmtIds = targetedRequest.getInsertRequest().getStmtIds();
+ ASSERT(requestStmtIds);
+ ASSERT(*requestStmtIds == stmtIds);
+ }
+ }
+
+ {
+ // Verify that when the command is running in a transaction, the targeted batched command
+ // request has stmtIds attached to it if and only if the transaction is a retryable internal
+ // transaction.
+ _opCtx->setTxnNumber(0);
+ _opCtx->setInMultiDocumentTransaction();
+ for (auto& lsid : lsids) {
+ _opCtx->setLogicalSessionId(lsid);
+ auto request = makeTargetedBatchedCommandRequest();
+ auto requestStmtIds = request.getInsertRequest().getStmtIds();
+ if (isInternalSessionForRetryableWrite(lsid)) {
+ ASSERT(requestStmtIds);
+ ASSERT(*requestStmtIds == stmtIds);
+ } else {
+ ASSERT_FALSE(requestStmtIds);
+ }
+ }
+ }
+}
+
//
// Tests of batch size limit functionality
//