summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2019-05-08 17:02:05 -0400
committerWilliam Schultz <william.schultz@mongodb.com>2019-05-08 17:02:05 -0400
commit5a6422ce9f4ad85efd1d6b26949ee43e6c1bcda9 (patch)
treed2f0e737aa711635759869991e2a0bfb8e85aa87
parente0726e830cca9f4971722616eeb24b56321fe4b8 (diff)
downloadmongo-5a6422ce9f4ad85efd1d6b26949ee43e6c1bcda9.tar.gz
SERVER-40676 Write implicit commit oplog entry on primary for new large transaction format
-rw-r--r--src/mongo/db/op_observer_impl.cpp196
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp80
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp58
3 files changed, 218 insertions, 116 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 618fd020820..9289b22ed28 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -992,24 +992,22 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
namespace {
-// Logs one applyOps, packing in as many operations as fit in a single applyOps entry. If
-// isPartialTxn is not set, all operations are attempted to be packed, regardless of whether or
-// not they fit; TransactionTooLarge will be thrown if this is not the case.
+// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array
+// field. Appends as many operations as possible until either the constructed object exceeds the
+// 16MB limit or the maximum number of transaction statements allowed in one entry is reached.
//
-// Returns an iterator to the first ReplOperation not packed in the batch.
-std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApplyOpsForTransaction(
- OperationContext* opCtx,
+// If 'limitSize' is false, then it attempts to include all given operations, regardless of whether
+// or not they fit. If the ops don't fit, BSONObjectTooLarge will be thrown in that case.
+//
+// Returns an iterator to the first statement that wasn't packed into the applyOps object.
+std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApplyOps(
+ BSONObjBuilder* applyOpsBuilder,
std::vector<repl::ReplOperation>::const_iterator stmtBegin,
std::vector<repl::ReplOperation>::const_iterator stmtEnd,
- const OplogSlot& oplogSlot,
- repl::OpTime prevWriteOpTime,
- StmtId stmtId,
- const bool prepare,
- const bool isPartialTxn,
- const bool shouldWriteStateField) {
- BSONObjBuilder applyOpsBuilder;
- BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
+ bool limitSize) {
+
std::vector<repl::ReplOperation>::const_iterator stmtIter;
+ BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd));
for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) {
const auto& stmt = *stmtIter;
// Stop packing when either number of transaction operations is reached, or when the next
@@ -1018,7 +1016,7 @@ std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApp
// BSON overhead and the other applyOps fields. But if the array with a single operation
// exceeds BSONObjMaxUserSize, we still log it, as a single max-length operation
// should be able to be applied.
- if (isPartialTxn &&
+ if (limitSize &&
(opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry ||
(opsArray.arrSize() > 0 &&
(opsArray.len() + OplogEntry::getDurableReplOperationSize(stmt) >
@@ -1027,6 +1025,35 @@ std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApp
opsArray.append(stmt.toBSON());
}
opsArray.done();
+ return stmtIter;
+}
+
+// Logs one applyOps entry and may update the transactions table. Assumes that the given BSON
+// builder object already has an 'applyOps' field appended pointing to the desired array of ops
+// i.e. { "applyOps" : [op1, op2, ...] }
+//
+// @param prepare determines whether a 'prepare' field will be attached to the written oplog entry.
+// @param isPartialTxn is this applyOps entry part of an in-progress multi oplog entry transaction.
+// Should be set for all non-terminal ops of an unprepared multi oplog entry transaction.
+// @param shouldWriteStateField determines whether a 'state' field will be included in the write to
+// the transactions table. Only meaningful if 'updateTxnTable' is true.
+// @param updateTxnTable determines whether the transactions table will be updated after the oplog
+// entry is written.
+//
+// Returns the optime of the written oplog entry.
+OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
+ BSONObjBuilder* applyOpsBuilder,
+ const OplogSlot& oplogSlot,
+ repl::OpTime prevWriteOpTime,
+ StmtId stmtId,
+ const bool prepare,
+ const bool isPartialTxn,
+ const bool shouldWriteStateField,
+ const bool updateTxnTable,
+ boost::optional<long long> count) {
+
+ // A 'prepare' oplog entry should never include a 'partialTxn' field.
+ invariant(!(isPartialTxn && prepare));
const NamespaceString cmdNss{"admin", "$cmd"};
@@ -1040,12 +1067,15 @@ std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApp
try {
if (prepare) {
// TODO: SERVER-36814 Remove "prepare" field on applyOps.
- applyOpsBuilder.append("prepare", true);
+ applyOpsBuilder->append("prepare", true);
}
if (isPartialTxn) {
- applyOpsBuilder.append("partialTxn", true);
+ applyOpsBuilder->append("partialTxn", true);
+ }
+ if (count) {
+ applyOpsBuilder->append("count", *count);
}
- auto applyOpCmd = applyOpsBuilder.done();
+ auto applyOpCmd = applyOpsBuilder->done();
auto times = replLogApplyOps(
opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, oplogSlot);
@@ -1053,7 +1083,7 @@ std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApp
prevWriteOpTime,
isPartialTxn,
shouldWriteStateField]() -> boost::optional<DurableTxnStateEnum> {
- if (!shouldWriteStateField || !prevWriteOpTime.isNull()) {
+ if (!shouldWriteStateField) {
// TODO (SERVER-40678): Either remove the invariant on prepare or separate the
// checks of FCV and 'prevWriteOpTime' once we support implicit prepare on
// primaries.
@@ -1068,7 +1098,7 @@ std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApp
return prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted;
}();
- if (prevWriteOpTime.isNull()) {
+ if (updateTxnTable) {
// Only update the transaction table on the first 'partialTxn' entry when using the
// multiple transaction oplog entries format.
auto startOpTime = isPartialTxn ? boost::make_optional(oplogSlot) : boost::none;
@@ -1080,7 +1110,7 @@ std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApp
txnState,
startOpTime);
}
- return {times, stmtIter};
+ return times;
} catch (const AssertionException& e) {
// Change the error code to TransactionTooLarge if it is BSONObjectTooLarge.
uassert(ErrorCodes::TransactionTooLarge,
@@ -1091,6 +1121,36 @@ std::pair<OpTimeBundle, std::vector<repl::ReplOperation>::const_iterator> logApp
MONGO_UNREACHABLE;
}
+// Log a single applyOps for transactions without any attempt to pack operations. If the given
+// statements would exceed the maximum BSON size limit of a single oplog entry, this will throw a
+// TransactionTooLarge error.
+// TODO (SERVER-39809): Consider removing this function once old transaction format is no longer
+// needed.
+OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& statements,
+ const OplogSlot& oplogSlot,
+ repl::OpTime prevWriteOpTime,
+ StmtId stmtId,
+ const bool prepare,
+ const bool isPartialTxn,
+ const bool shouldWriteStateField,
+ const bool updateTxnTable,
+ boost::optional<long long> count) {
+ BSONObjBuilder applyOpsBuilder;
+ packTransactionStatementsForApplyOps(
+ &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */);
+ return logApplyOpsForTransaction(opCtx,
+ &applyOpsBuilder,
+ oplogSlot,
+ prevWriteOpTime,
+ stmtId,
+ prepare,
+ isPartialTxn,
+ shouldWriteStateField,
+ updateTxnTable,
+ count);
+}
+
OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx,
const OperationSessionInfo& sessionInfo,
repl::OpTime prevOpTime,
@@ -1126,7 +1186,8 @@ OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx,
// number of applyOps written will be used. The number of oplog entries written is returned.
int logOplogEntriesForTransaction(OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
- const std::vector<OplogSlot>& oplogSlots) {
+ const std::vector<OplogSlot>& oplogSlots,
+ bool prepare) {
invariant(!stmts.empty());
invariant(stmts.size() <= oplogSlots.size());
@@ -1144,18 +1205,49 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
// Note the logged statement IDs are not the same as the user-chosen statement IDs.
stmtId = 0;
auto oplogSlot = oplogSlots.begin();
- auto stmtsBegin = stmts.begin();
- while (stmtsBegin != stmts.end()) {
- std::tie(prevWriteOpTime, stmtsBegin) =
- logApplyOpsForTransaction(opCtx,
- stmtsBegin,
- stmts.end(),
- *oplogSlot++,
- prevWriteOpTime.writeOpTime,
- stmtId,
- false /* prepare */,
- true /* isPartialTxn */,
- true /* shouldWriteStateField */);
+
+ // At the beginning of each loop iteration below, 'stmtsIter' will always point to the
+ // first statement of the sequence of remaining, unpacked transaction statements. If all
+ // statements have been packed, it should point to stmts.end(), which is the loop's
+ // termination condition.
+ auto stmtsIter = stmts.begin();
+ while (stmtsIter != stmts.end()) {
+
+ BSONObjBuilder applyOpsBuilder;
+ auto nextStmt = packTransactionStatementsForApplyOps(
+ &applyOpsBuilder, stmtsIter, stmts.end(), true /* limitSize */);
+
+ // If we packed the last op, then the next oplog entry we log should be the implicit
+ // commit, i.e. we omit the 'partialTxn' field.
+ //
+ // TODO (SERVER-40678): Until we support implicit prepare for transactions, always
+ // log 'partialTxn' for prepared transactions. For unprepared transactions, update
+ // the transactions table on the first and last op. For prepared transactions, only
+ // update it on the first op.
+ auto firstOp = stmtsIter == stmts.begin();
+ auto lastOp = nextStmt == stmts.end();
+ auto isPartialTxn = prepare || !lastOp;
+ auto updateTxnTable = firstOp || (lastOp && !prepare);
+
+ // The 'count' field gives the total number of individual operations in the
+ // transaction. We do not include this if only logging a single applyOps entry for
+ // an unprepared transaction. It is never included here for prepared transactions.
+ auto count = (lastOp && !firstOp && !prepare)
+ ? boost::optional<long long>(stmts.size())
+ : boost::none;
+ prevWriteOpTime = logApplyOpsForTransaction(opCtx,
+ &applyOpsBuilder,
+ *oplogSlot++,
+ prevWriteOpTime.writeOpTime,
+ stmtId,
+ false /* prepare */,
+ isPartialTxn,
+ true /* shouldWriteStateField */,
+ updateTxnTable,
+ count);
+
+ // Advance the iterator to the beginning of the remaining unpacked statements.
+ stmtsIter = nextStmt;
stmtId++;
}
wuow.commit();
@@ -1229,6 +1321,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
}
// This is used only for multi-oplog-entry unprepared transactions
+// TODO (SERVER-40728) Remove this unused function.
repl::OpTime logCommitForUnpreparedTransaction(OperationContext* opCtx,
StmtId stmtId,
const repl::OpTime& prevOpTime,
@@ -1302,27 +1395,25 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
invariant(lastWriteOpTime.isNull());
commitOpTime = logApplyOpsForTransaction(
opCtx,
- statements.begin(),
- statements.end(),
+ statements,
OplogSlot(),
lastWriteOpTime,
StmtId(0),
false /* prepare */,
false /* isPartialTxn */,
- fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42)
- .first.writeOpTime;
+ fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42,
+ true /* updateTxnTable */,
+ boost::none)
+ .writeOpTime;
} else {
// Reserve all the optimes in advance, so we only need to get the optime mutex once. We
- // reserve enough entries for all statements in the transaction, plus one for the commit
- // oplog entry.
- auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size() + 1);
- auto commitSlot = oplogSlots.back();
- oplogSlots.pop_back();
+ // reserve enough entries for all statements in the transaction.
+ auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size());
invariant(oplogSlots.size() == statements.size());
- int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots);
- const auto prevWriteOpTime = oplogSlots[numOplogEntries - 1];
- commitOpTime = logCommitForUnpreparedTransaction(
- opCtx, StmtId(numOplogEntries), prevWriteOpTime, commitSlot, statements.size());
+
+ // Log in-progress entries for the transaction along with the implicit commit.
+ int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false);
+ commitOpTime = oplogSlots[numOplogEntries - 1];
}
invariant(!commitOpTime.isNull());
shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, commitOpTime);
@@ -1431,14 +1522,15 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
invariant(lastWriteOpTime.isNull());
logApplyOpsForTransaction(
opCtx,
- statements.begin(),
- statements.end(),
+ statements,
prepareOpTime,
lastWriteOpTime,
StmtId(0),
true /* prepare */,
false /* isPartialTxn */,
- fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42);
+ fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42,
+ true /* updateTxnTable */,
+ boost::none);
wuow.commit();
});
} else {
@@ -1460,8 +1552,8 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
// should not write any operations other than the prepare oplog entry.
int nOperationOplogEntries = 0;
if (!statements.empty()) {
- nOperationOplogEntries =
- logOplogEntriesForTransaction(opCtx, statements, reservedSlots);
+ nOperationOplogEntries = logOplogEntriesForTransaction(
+ opCtx, statements, reservedSlots, true /* prepare */);
// We had reserved enough oplog slots for the worst case where each operation
// produced one oplog entry. When operations are smaller and can be packed,
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 76a2e3d0ca6..5ef706c3f36 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -1359,6 +1359,38 @@ TEST_F(OpObserverMultiEntryTransactionTest,
assertNoTxnRecord();
}
+TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) {
+ const NamespaceString nss("testDB", "testColl");
+ auto uuid = CollectionUUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ std::vector<InsertStatement> inserts;
+ inserts.emplace_back(0, BSON("_id" << 0));
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss, MODE_IX);
+ opObserver().onInserts(opCtx(), nss, uuid, inserts.begin(), inserts.end(), false);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ auto oplogEntryObj = getNOplogEntries(opCtx(), 1)[0];
+ checkSessionAndTransactionFields(oplogEntryObj, 0);
+ auto oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ ASSERT(!oplogEntry.getPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(repl::OpTime(), *oplogEntry.getPrevWriteOpTimeInTransaction());
+
+ // The implicit commit oplog entry.
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss.toString()
+ << "ui"
+ << uuid
+ << "o"
+ << BSON("_id" << 0))));
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntry.getObject());
+}
+
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
const NamespaceString nss1("testDB", "testColl");
const NamespaceString nss2("testDB2", "testColl2");
@@ -1379,7 +1411,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
opObserver().onUnpreparedTransactionCommit(
opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 5);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -1430,6 +1462,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
<< true);
ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
+ // This should be the implicit commit oplog entry, indicated by the absence of the 'partialTxn'
+ // field.
oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
<< "i"
<< "ns"
@@ -1438,12 +1472,9 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
<< uuid2
<< "o"
<< BSON("_id" << 3)))
- << "partialTxn"
- << true);
+ << "count"
+ << 4);
ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[3].getObject());
-
- oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4);
- ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[4].getObject());
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
@@ -1479,7 +1510,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
opObserver().onUpdate(opCtx(), update2);
opObserver().onUnpreparedTransactionCommit(
opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -1510,6 +1541,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
<< true);
ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+ // This should be the implicit commit oplog entry, indicated by the absence of the 'partialTxn'
+ // field.
oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
<< "u"
<< "ns"
@@ -1521,12 +1554,9 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
<< "y"))
<< "o2"
<< BSON("_id" << 1)))
- << "partialTxn"
- << true);
+ << "count"
+ << 2);
ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
-
- oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2);
- ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) {
@@ -1553,7 +1583,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) {
opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none);
opObserver().onUnpreparedTransactionCommit(
opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -1581,6 +1611,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) {
<< true);
ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+ // This should be the implicit commit oplog entry, indicated by the absence of the 'partialTxn'
+ // field.
oExpected = oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
<< "d"
<< "ns"
@@ -1589,12 +1621,9 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) {
<< uuid2
<< "o"
<< BSON("_id" << 1)))
- << "partialTxn"
- << true);
+ << "count"
+ << 2);
ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
-
- oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2);
- ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
}
TEST_F(OpObserverMultiEntryTransactionTest,
@@ -2082,7 +2111,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) {
opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
opObserver().onUnpreparedTransactionCommit(
opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -2128,12 +2157,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) {
<< "ui"
<< uuid2
<< "o"
- << BSON("_id" << 3)))
- << "partialTxn"
- << true);
+ << BSON("_id" << 3))));
ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
- oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4);
- ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
}
TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPackingTest) {
@@ -2344,7 +2369,7 @@ TEST_F(OpObserverLargeMultiEntryTransactionTest, LargeTransactionCreatesMultiple
txnParticipant.addTransactionOperation(opCtx(), operation2);
opObserver().onUnpreparedTransactionCommit(
opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
- auto oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
StmtId expectedStmtId = 0;
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -2363,11 +2388,8 @@ TEST_F(OpObserverLargeMultiEntryTransactionTest, LargeTransactionCreatesMultiple
auto oExpected = BSON("applyOps" << BSON_ARRAY(operation1.toBSON()) << "partialTxn" << true);
ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
- oExpected = BSON("applyOps" << BSON_ARRAY(operation2.toBSON()) << "partialTxn" << true);
+ oExpected = BSON("applyOps" << BSON_ARRAY(operation2.toBSON()) << "count" << 2);
ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
-
- oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2);
- ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
}
} // namespace
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 6a7823ef75f..e2213961d5f 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -2844,8 +2844,7 @@ public:
MultiOplogEntryTransaction() : MultiDocumentTransactionTest("multiOplogEntryTransaction") {
const auto currentTime = _clock->getClusterTime();
firstOplogEntryTs = currentTime.addTicks(1).asTimestamp();
- secondOplogEntryTs = currentTime.addTicks(2).asTimestamp();
- commitEntryTs = currentTime.addTicks(3).asTimestamp();
+ commitEntryTs = currentTime.addTicks(2).asTimestamp();
}
void run() {
@@ -2874,19 +2873,27 @@ public:
assertDocumentAtTimestamp(coll, presentTs, BSONObj());
assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj());
assertDocumentAtTimestamp(coll, firstOplogEntryTs, BSONObj());
- assertDocumentAtTimestamp(coll, secondOplogEntryTs, BSONObj());
assertFilteredDocumentAtTimestamp(coll, query1, commitEntryTs, doc);
assertFilteredDocumentAtTimestamp(coll, query2, commitEntryTs, doc2);
assertFilteredDocumentAtTimestamp(coll, query1, nullTs, doc);
assertFilteredDocumentAtTimestamp(coll, query2, nullTs, doc2);
- // Commit oplog entry should exist at commitEntryTs.
- const auto commitFilter = BSON("ts" << commitEntryTs << "op"
- << "c");
+ // Implicit commit oplog entry should exist at commitEntryTs.
+ const auto commitFilter =
+ BSON("ts" << commitEntryTs << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss.ns()
+ << "ui"
+ << coll->uuid().get()
+ << "o"
+ << doc2))
+ << "count"
+ << 2));
assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false);
assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false);
assertOplogDocumentExistsAtTimestamp(commitFilter, firstOplogEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(commitFilter, secondOplogEntryTs, false);
assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, true);
assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, true);
@@ -2895,7 +2902,6 @@ public:
assertOldestActiveTxnTimestampEquals(boost::none, presentTs);
assertOldestActiveTxnTimestampEquals(boost::none, beforeTxnTs);
assertOldestActiveTxnTimestampEquals(firstOplogEntryTs, firstOplogEntryTs);
- assertOldestActiveTxnTimestampEquals(firstOplogEntryTs, secondOplogEntryTs);
assertOldestActiveTxnTimestampEquals(boost::none, commitEntryTs);
assertOldestActiveTxnTimestampEquals(boost::none, nullTs);
@@ -2915,30 +2921,9 @@ public:
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true);
- assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, secondOplogEntryTs, true);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, commitEntryTs, true);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true);
- // second oplog entry should exist at secondOplogEntryTs and after it.
- const auto secondOplogEntryFilter =
- BSON("ts" << secondOplogEntryTs << "o"
- << BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns"
- << nss.ns()
- << "ui"
- << coll->uuid().get()
- << "o"
- << doc2))
- << "partialTxn"
- << true));
- assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, presentTs, false);
- assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, beforeTxnTs, false);
- assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, firstOplogEntryTs, false);
- assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, secondOplogEntryTs, true);
- assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, commitEntryTs, true);
- assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, nullTs, true);
-
// Session state should go to inProgress at firstOplogEntryTs, then to committed
// at commitEntryTs
getSessionTxnInfoAtTimestamp(presentTs, false);
@@ -2946,24 +2931,22 @@ public:
auto sessionInfo = getSessionTxnInfoAtTimestamp(firstOplogEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "inProgress");
ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs);
-
- sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true);
- ASSERT_EQ(sessionInfo["state"].String(), "inProgress");
- // The transaction table is only updated at the start of the transaction.
- ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs);
+ ASSERT_EQ(sessionInfo["startOpTime"]["ts"].timestamp(), firstOplogEntryTs);
sessionInfo = getSessionTxnInfoAtTimestamp(commitEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "committed");
ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), commitEntryTs);
+ ASSERT_FALSE(sessionInfo.hasField("startOpTime"));
sessionInfo = getSessionTxnInfoAtTimestamp(nullTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "committed");
ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), commitEntryTs);
+ ASSERT_FALSE(sessionInfo.hasField("startOpTime"));
}
}
protected:
- Timestamp firstOplogEntryTs, secondOplogEntryTs;
+ Timestamp firstOplogEntryTs;
private:
MultiOplogScopedSettings multiOplogSettings;
@@ -3161,19 +3144,23 @@ public:
auto sessionInfo = getSessionTxnInfoAtTimestamp(firstOplogEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "inProgress");
ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs);
+ ASSERT_EQ(sessionInfo["startOpTime"]["ts"].timestamp(), firstOplogEntryTs);
sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "inProgress");
// The transaction table is only updated at the start of the transaction.
ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs);
+ ASSERT_EQ(sessionInfo["startOpTime"]["ts"].timestamp(), firstOplogEntryTs);
sessionInfo = getSessionTxnInfoAtTimestamp(prepareEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "prepared");
ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), prepareEntryTs);
+ ASSERT_EQ(sessionInfo["startOpTime"]["ts"].timestamp(), firstOplogEntryTs);
sessionInfo = getSessionTxnInfoAtTimestamp(nullTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "committed");
ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), commitEntryTs);
+ ASSERT_FALSE(sessionInfo.hasField("startOpTime"));
}
}
@@ -3286,6 +3273,7 @@ public:
auto sessionInfo = getSessionTxnInfoAtTimestamp(abortEntryTs, true);
ASSERT_EQ(sessionInfo["state"].String(), "aborted");
ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), abortEntryTs);
+ ASSERT_FALSE(sessionInfo.hasField("startOpTime"));
}
}