summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2019-05-06 10:17:00 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2019-05-06 10:17:00 -0400
commit1b8d3d355363b3b2a5b8565df60f07a08393ced6 (patch)
treeaed490c00c4c7eedd72e8957490ff82edda87d10 /src/mongo
parent30f602bb9c8799e9b2b0d1c608d13fdfb24d2ce2 (diff)
downloadmongo-1b8d3d355363b3b2a5b8565df60f07a08393ced6.tar.gz
SERVER-40725 Pack multiple operations into a single applyOps on primary for large transactions
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/op_observer_impl.cpp115
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp347
-rw-r--r--src/mongo/db/transaction_participant.idl10
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp40
4 files changed, 444 insertions, 68 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 3ffb4988165..83ef9fe73e4 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -992,17 +992,38 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
namespace {
-OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
- std::vector<repl::ReplOperation> stmts,
- const OplogSlot& oplogSlot,
- repl::OpTime prevWriteOpTime,
- StmtId stmtId,
- const bool prepare,
- const bool isPartialTxn,
- const bool shouldWriteStateField) {
+// Logs one applyOps, packing in as many operations as fit in a single applyOps entry. If
+// isPartialTxn is not set, we attempt to pack all operations into the single entry, regardless
+// of whether or not they fit; TransactionTooLarge will be thrown if they do not.
+//
+// Returns an iterator to the first ReplOperation not packed in the batch.
+std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApplyOpsForTransaction(
+ OperationContext* opCtx,
+ std::vector<repl::ReplOperation>::const_iterator stmtBegin,
+ std::vector<repl::ReplOperation>::const_iterator stmtEnd,
+ const OplogSlot& oplogSlot,
+ repl::OpTime prevWriteOpTime,
+ StmtId stmtId,
+ const bool prepare,
+ const bool isPartialTxn,
+ const bool shouldWriteStateField) {
BSONObjBuilder applyOpsBuilder;
BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
- for (auto& stmt : stmts) {
+ std::vector<repl::ReplOperation>::const_iterator stmtIter;
+ for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) {
+ const auto& stmt = *stmtIter;
+ // Stop packing when either the maximum number of transaction operations is reached, or
+ // when the next one would put the array over the maximum BSON Object User Size. We rely
+ // on the headroom between BSONObjMaxUserSize and BSONObjMaxInternalSize to cover the
+ // BSON overhead and the other applyOps fields. But if the array with a single operation
+ // exceeds BSONObjMaxUserSize, we still log it, as a single max-length operation
+ // should be able to be applied.
+ if (isPartialTxn &&
+ (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry ||
+ (opsArray.arrSize() > 0 &&
+ (opsArray.len() + OplogEntry::getDurableReplOperationSize(stmt) >
+ BSONObjMaxUserSize))))
+ break;
opsArray.append(stmt.toBSON());
}
opsArray.done();
@@ -1059,7 +1080,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
txnState,
startOpTime);
}
- return times;
+ return {times, stmtIter};
} catch (const AssertionException& e) {
// Change the error code to TransactionTooLarge if it is BSONObjectTooLarge.
uassert(ErrorCodes::TransactionTooLarge,
@@ -1099,12 +1120,13 @@ OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx,
return times;
}
-// This function expects that the size of 'oplogSlots' be at least as big as the size of 'stmts'. If
-// there are more oplog slots than statements, then only the first n slots are used, where n is the
-// size of 'stmts'.
-void logOplogEntriesForTransaction(OperationContext* opCtx,
- const std::vector<repl::ReplOperation>& stmts,
- const std::vector<OplogSlot>& oplogSlots) {
+// This function expects that the size of 'oplogSlots' be at least as big as the size of 'stmts'
+// in the worst case, where each operation requires an applyOps entry of its own. If fewer
+// applyOps entries are written than there are oplog slots, only the first N slots are used,
+// where N is the number of applyOps entries written. The number of oplog entries written is returned.
+int logOplogEntriesForTransaction(OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& stmts,
+ const std::vector<OplogSlot>& oplogSlots) {
invariant(!stmts.empty());
invariant(stmts.size() <= oplogSlots.size());
@@ -1122,23 +1144,23 @@ void logOplogEntriesForTransaction(OperationContext* opCtx,
// Note the logged statement IDs are not the same as the user-chosen statement IDs.
stmtId = 0;
auto oplogSlot = oplogSlots.begin();
- for (const auto& stmt : stmts) {
- // Wrap each individual operation in an applyOps entry. This is temporary until we
- // implement the logic to pack multiple operations into a single applyOps entry for
- // the larger transactions project.
- // TODO (SERVER-40725): Modify this comment once packing logic is done.
- prevWriteOpTime = logApplyOpsForTransaction(opCtx,
- {stmt},
- *oplogSlot++,
- prevWriteOpTime.writeOpTime,
- stmtId,
- false /* prepare */,
- true /* isPartialTxn */,
- true /* shouldWriteStateField */);
+ auto stmtsBegin = stmts.begin();
+ while (stmtsBegin != stmts.end()) {
+ std::tie(prevWriteOpTime, stmtsBegin) =
+ logApplyOpsForTransaction(opCtx,
+ stmtsBegin,
+ stmts.end(),
+ *oplogSlot++,
+ prevWriteOpTime.writeOpTime,
+ stmtId,
+ false /* prepare */,
+ true /* isPartialTxn */,
+ true /* shouldWriteStateField */);
stmtId++;
}
wuow.commit();
});
+ return stmtId;
}
void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
@@ -1280,14 +1302,15 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
invariant(lastWriteOpTime.isNull());
commitOpTime = logApplyOpsForTransaction(
opCtx,
- statements,
+ statements.begin(),
+ statements.end(),
OplogSlot(),
lastWriteOpTime,
StmtId(0),
false /* prepare */,
false /* isPartialTxn */,
fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42)
- .writeOpTime;
+ .first.writeOpTime;
} else {
// Reserve all the optimes in advance, so we only need to get the optime mutex once. We
// reserve enough entries for all statements in the transaction, plus one for the commit
@@ -1296,12 +1319,10 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
auto commitSlot = oplogSlots.back();
oplogSlots.pop_back();
invariant(oplogSlots.size() == statements.size());
- logOplogEntriesForTransaction(opCtx, statements, oplogSlots);
- commitOpTime = logCommitForUnpreparedTransaction(opCtx,
- statements.size() /* stmtId */,
- oplogSlots.back(),
- commitSlot,
- statements.size());
+ int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots);
+ const auto prevWriteOpTime = oplogSlots[numOplogEntries - 1];
+ commitOpTime = logCommitForUnpreparedTransaction(
+ opCtx, StmtId(numOplogEntries), prevWriteOpTime, commitSlot, statements.size());
}
invariant(!commitOpTime.isNull());
shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, commitOpTime);
@@ -1410,7 +1431,8 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
invariant(lastWriteOpTime.isNull());
logApplyOpsForTransaction(
opCtx,
- statements,
+ statements.begin(),
+ statements.end(),
prepareOpTime,
lastWriteOpTime,
StmtId(0),
@@ -1436,16 +1458,23 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
WriteUnitOfWork wuow(opCtx);
// It is possible that the transaction resulted in no changes. In that case, we
// should not write any operations other than the prepare oplog entry.
+ int nOperationOplogEntries = 0;
if (!statements.empty()) {
- logOplogEntriesForTransaction(opCtx, statements, reservedSlots);
-
- // The prevOpTime is the OpTime of the second last entry in the reserved slots.
- prevOpTime = reservedSlots.rbegin()[1];
+ nOperationOplogEntries =
+ logOplogEntriesForTransaction(opCtx, statements, reservedSlots);
+
+ // We had reserved enough oplog slots for the worst case where each operation
+ // produced one oplog entry. When operations are smaller and can be packed,
+ // we will waste the extra slots. The prepare will still use the last
+ // reserved slot, because the transaction participant has already used
+ // that as the prepare time. So we set the prevOpTime to the last applyOps,
+ // to make the prevOpTime links work correctly.
+ prevOpTime = reservedSlots[nOperationOplogEntries - 1];
}
auto startTxnSlot = reservedSlots.front();
const auto startOpTime = startTxnSlot;
logPrepareTransaction(
- opCtx, statements.size() /* stmtId */, prevOpTime, prepareOpTime, startOpTime);
+ opCtx, StmtId(nOperationOplogEntries), prevOpTime, prepareOpTime, startOpTime);
wuow.commit();
});
}
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 2c4a7bab692..76a2e3d0ca6 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -1326,13 +1326,19 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
class OpObserverMultiEntryTransactionTest : public OpObserverTransactionTest {
void setUp() override {
gUseMultipleOplogEntryFormatForTransactions = true;
+ _prevPackingLimit = gMaxNumberOfTransactionOperationsInSingleOplogEntry;
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = 1;
OpObserverTransactionTest::setUp();
}
void tearDown() override {
OpObserverTransactionTest::tearDown();
gUseMultipleOplogEntryFormatForTransactions = false;
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = _prevPackingLimit;
}
+
+private:
+ int _prevPackingLimit;
};
TEST_F(OpObserverMultiEntryTransactionTest,
@@ -1911,11 +1917,14 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
inserts1.emplace_back(0,
BSON("_id" << 0 << "data"
<< "x"));
+ inserts1.emplace_back(1,
+ BSON("_id" << 1 << "data"
+ << "y"));
opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
repl::OpTime prepareOpTime;
- auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
@@ -1923,17 +1932,21 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
opObserver().onTransactionPrepare(
opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
- const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
- ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand);
- ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ const auto insertEntry1 = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
+ ASSERT_TRUE(insertEntry1.getOpType() == repl::OpTypeEnum::kCommand);
+ ASSERT_TRUE(insertEntry1.getCommandType() == OplogEntry::CommandType::kApplyOps);
- const auto startOpTime = insertEntry.getOpTime();
+ const auto insertEntry2 = assertGet(OplogEntry::parse(oplogEntryObjs[1]));
+ ASSERT_TRUE(insertEntry2.getOpType() == repl::OpTypeEnum::kCommand);
+ ASSERT_TRUE(insertEntry2.getCommandType() == OplogEntry::CommandType::kApplyOps);
+
+ const auto startOpTime = insertEntry1.getOpTime();
const auto prepareTimestamp = prepareOpTime.getTimestamp();
- const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1]));
+ const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[2]));
ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp());
ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction);
@@ -1960,16 +1973,18 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
commitTimestamp,
txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ oplogEntryObjs = getNOplogEntries(opCtx(), 4);
const auto commitOplogObj = oplogEntryObjs.back();
- // Statement id's for the insert and prepare should be 0 and 1 respectively.
- const auto expectedCommitStmtId = 2;
+ // Statement id's for the inserts and prepare should be [0,1] and 2 respectively.
+ const auto expectedCommitStmtId = 3;
checkSessionAndTransactionFields(commitOplogObj, expectedCommitStmtId);
auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj));
auto o = commitEntry.getObject();
auto oExpected = BSON(
"commitTransaction" << 1 << "commitTimestamp" << commitTimestamp << "prepared" << true);
ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_TRUE(commitEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(*commitEntry.getPrevWriteOpTimeInTransaction(), prepareEntry.getOpTime());
assertTxnRecord(txnNum(), commitSlot, DurableTxnStateEnum::kCommitted);
// startTimestamp should no longer be set once the transaction has been committed.
@@ -2037,11 +2052,323 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
auto o = abortEntry.getObject();
auto oExpected = BSON("abortTransaction" << 1);
ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_TRUE(abortEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(*abortEntry.getPrevWriteOpTimeInTransaction(), prepareEntry.getOpTime());
assertTxnRecord(txnNum(), abortSlot, DurableTxnStateEnum::kAborted);
// startOpTime should no longer be set once a transaction has been aborted.
assertTxnRecordStartOpTime(boost::none);
}
+TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) {
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = std::numeric_limits<int>::max();
+
+ const NamespaceString nss1("testDB", "testColl");
+ const NamespaceString nss2("testDB2", "testColl2");
+ auto uuid1 = CollectionUUID::gen();
+ auto uuid2 = CollectionUUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0, BSON("_id" << 0));
+ inserts1.emplace_back(1, BSON("_id" << 1));
+ std::vector<InsertStatement> inserts2;
+ inserts2.emplace_back(0, BSON("_id" << 2));
+ inserts2.emplace_back(1, BSON("_id" << 3));
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+ opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ StmtId expectedStmtId = 0;
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT(!oplogEntry.getPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ expectedStmtId++;
+ }
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 0))
+ << BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 1))
+ << BSON("op"
+ << "i"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 2))
+ << BSON("op"
+ << "i"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 3)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+ oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPackingTest) {
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = std::numeric_limits<int>::max();
+
+ const NamespaceString nss1("testDB", "testColl");
+ const NamespaceString nss2("testDB2", "testColl2");
+ auto uuid1 = CollectionUUID::gen();
+ auto uuid2 = CollectionUUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0, BSON("_id" << 0));
+ inserts1.emplace_back(1, BSON("_id" << 1));
+ std::vector<InsertStatement> inserts2;
+ inserts2.emplace_back(0, BSON("_id" << 2));
+ inserts2.emplace_back(1, BSON("_id" << 3));
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+ opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
+
+ repl::OpTime prepareOpTime;
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 5);
+ prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ StmtId expectedStmtId = 0;
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT(!oplogEntry.getPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ expectedStmtId++;
+ }
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 0))
+ << BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 1))
+ << BSON("op"
+ << "i"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 2))
+ << BSON("op"
+ << "i"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 3)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+ oExpected = BSON("prepareTransaction" << 1);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedPackingTest) {
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = std::numeric_limits<int>::max();
+ const NamespaceString nss1("testDB", "testColl");
+ auto uuid1 = CollectionUUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ inserts1.emplace_back(1,
+ BSON("_id" << 1 << "data"
+ << "y"));
+
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
+ repl::OpTime prepareOpTime;
+ auto reservedSlots = repl::getNextOpTimes(opCtx(), 3);
+ prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(
+ opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+
+ const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand);
+ ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps);
+
+ const auto startOpTime = insertEntry.getOpTime();
+
+ const auto prepareTimestamp = prepareOpTime.getTimestamp();
+
+ const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1]));
+ ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kPrepareTransaction);
+
+ // Reserve oplog entry for the commit oplog entry.
+ OplogSlot commitSlot = repl::getNextOpTime(opCtx());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecordStartOpTime(startOpTime);
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+
+ // Mimic committing the transaction.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+
+ // commitTimestamp must be greater than the prepareTimestamp.
+ auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
+
+ txnParticipant.transitionToCommittingWithPrepareforTest(opCtx());
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ commitTimestamp,
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ const auto commitOplogObj = oplogEntryObjs.back();
+ // Statement id is based on number of operations + prepare, not actual number of applyOps.
+ const auto expectedCommitStmtId = 3;
+ checkSessionAndTransactionFields(commitOplogObj, expectedCommitStmtId);
+ auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj));
+ auto o = commitEntry.getObject();
+ auto oExpected = BSON(
+ "commitTransaction" << 1 << "commitTimestamp" << commitTimestamp << "prepared" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_TRUE(commitEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(*commitEntry.getPrevWriteOpTimeInTransaction(), prepareEntry.getOpTime());
+
+ assertTxnRecord(txnNum(), commitSlot, DurableTxnStateEnum::kCommitted);
+ // startTimestamp should no longer be set once the transaction has been committed.
+ assertTxnRecordStartOpTime(boost::none);
+}
+
+class OpObserverLargeMultiEntryTransactionTest : public OpObserverLargeTransactionTest {
+ void setUp() override {
+ gUseMultipleOplogEntryFormatForTransactions = true;
+ OpObserverTransactionTest::setUp();
+ }
+
+ void tearDown() override {
+ OpObserverTransactionTest::tearDown();
+ gUseMultipleOplogEntryFormatForTransactions = false;
+ }
+};
+
+// Tests that a large transaction may be committed. This test creates a transaction with two
+// operations that together are just big enough to exceed the size limit, which should result in a
+// two oplog entry transaction.
+TEST_F(OpObserverLargeMultiEntryTransactionTest, LargeTransactionCreatesMultipleOplogEntries) {
+ const NamespaceString nss("testDB", "testColl");
+ auto uuid = CollectionUUID::gen();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ // This size is crafted such that two operations of this size are not too big to fit in a single
+ // oplog entry, but two operations plus oplog overhead are too big to fit in a single oplog
+ // entry.
+ constexpr size_t kHalfTransactionSize = BSONObjMaxInternalSize / 2 - 175;
+ std::unique_ptr<uint8_t[]> halfTransactionData(new uint8_t[kHalfTransactionSize]());
+ auto operation1 = repl::OplogEntry::makeInsertOperation(
+ nss,
+ uuid,
+ BSON(
+ "_id" << 0 << "data"
+ << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral)));
+ auto operation2 = repl::OplogEntry::makeInsertOperation(
+ nss,
+ uuid,
+ BSON(
+ "_id" << 0 << "data"
+ << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral)));
+ txnParticipant.addTransactionOperation(opCtx(), operation1);
+ txnParticipant.addTransactionOperation(opCtx(), operation2);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ StmtId expectedStmtId = 0;
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT(!oplogEntry.getPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ expectedStmtId++;
+ }
+
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(operation1.toBSON()) << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(operation2.toBSON()) << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/transaction_participant.idl b/src/mongo/db/transaction_participant.idl
index 61a5f990e48..6add17293df 100644
--- a/src/mongo/db/transaction_participant.idl
+++ b/src/mongo/db/transaction_participant.idl
@@ -64,3 +64,13 @@ server_parameters:
cpp_vartype: bool
cpp_varname: gUseMultipleOplogEntryFormatForTransactions
default: false
+
+ maxNumberOfTransactionOperationsInSingleOplogEntry:
+ description: >-
+ Maximum number of operations to pack into a single oplog entry, when multi-oplog
+ format for transactions is in use.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: gMaxNumberOfTransactionOperationsInSingleOplogEntry
+ default: 2147483647 # INT_MAX
+ validator: { gte: 1 }
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 5803f091224..e83430fb809 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -2823,20 +2823,31 @@ public:
}
};
+class MultiOplogScopedSettings {
+public:
+ MultiOplogScopedSettings()
+ : _prevPackingLimit(gMaxNumberOfTransactionOperationsInSingleOplogEntry) {
+ gUseMultipleOplogEntryFormatForTransactions = true;
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = 1;
+ }
+ ~MultiOplogScopedSettings() {
+ gUseMultipleOplogEntryFormatForTransactions = false;
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = _prevPackingLimit;
+ }
+
+private:
+ int _prevPackingLimit;
+};
+
class MultiOplogEntryTransaction : public MultiDocumentTransactionTest {
public:
MultiOplogEntryTransaction() : MultiDocumentTransactionTest("multiOplogEntryTransaction") {
- gUseMultipleOplogEntryFormatForTransactions = true;
const auto currentTime = _clock->getClusterTime();
firstOplogEntryTs = currentTime.addTicks(1).asTimestamp();
secondOplogEntryTs = currentTime.addTicks(2).asTimestamp();
commitEntryTs = currentTime.addTicks(3).asTimestamp();
}
- ~MultiOplogEntryTransaction() {
- gUseMultipleOplogEntryFormatForTransactions = false;
- }
-
void run() {
auto txnParticipant = TransactionParticipant::get(_opCtx);
ASSERT(txnParticipant);
@@ -2953,13 +2964,15 @@ public:
protected:
Timestamp firstOplogEntryTs, secondOplogEntryTs;
+
+private:
+ MultiOplogScopedSettings multiOplogSettings;
};
class CommitPreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest {
public:
CommitPreparedMultiOplogEntryTransaction()
: MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") {
- gUseMultipleOplogEntryFormatForTransactions = true;
const auto currentTime = _clock->getClusterTime();
firstOplogEntryTs = currentTime.addTicks(1).asTimestamp();
secondOplogEntryTs = currentTime.addTicks(2).asTimestamp();
@@ -2967,10 +2980,6 @@ public:
commitEntryTs = currentTime.addTicks(4).asTimestamp();
}
- ~CommitPreparedMultiOplogEntryTransaction() {
- gUseMultipleOplogEntryFormatForTransactions = false;
- }
-
void run() {
auto txnParticipant = TransactionParticipant::get(_opCtx);
ASSERT(txnParticipant);
@@ -3170,23 +3179,21 @@ public:
protected:
Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs;
+
+private:
+ MultiOplogScopedSettings multiOplogSettings;
};
class AbortPreparedMultiOplogEntryTransaction : public MultiDocumentTransactionTest {
public:
AbortPreparedMultiOplogEntryTransaction()
: MultiDocumentTransactionTest("preparedMultiOplogEntryTransaction") {
- gUseMultipleOplogEntryFormatForTransactions = true;
const auto currentTime = _clock->getClusterTime();
firstOplogEntryTs = currentTime.addTicks(1).asTimestamp();
prepareEntryTs = currentTime.addTicks(2).asTimestamp();
abortEntryTs = currentTime.addTicks(3).asTimestamp();
}
- ~AbortPreparedMultiOplogEntryTransaction() {
- gUseMultipleOplogEntryFormatForTransactions = false;
- }
-
void run() {
auto txnParticipant = TransactionParticipant::get(_opCtx);
ASSERT(txnParticipant);
@@ -3284,6 +3291,9 @@ public:
protected:
Timestamp firstOplogEntryTs, secondOplogEntryTs, prepareEntryTs, abortEntryTs;
+
+private:
+ MultiOplogScopedSettings multiOplogSettings;
};
class PreparedMultiDocumentTransaction : public MultiDocumentTransactionTest {