From 5a6422ce9f4ad85efd1d6b26949ee43e6c1bcda9 Mon Sep 17 00:00:00 2001 From: William Schultz Date: Wed, 8 May 2019 17:02:05 -0400 Subject: SERVER-40676 Write implicit commit oplog entry on primary for new large transaction format --- src/mongo/db/op_observer_impl.cpp | 196 +++++++++++++++++++------- src/mongo/db/op_observer_impl_test.cpp | 80 +++++++---- src/mongo/dbtests/storage_timestamp_tests.cpp | 58 +++----- 3 files changed, 218 insertions(+), 116 deletions(-) diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 618fd020820..9289b22ed28 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -992,24 +992,22 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, namespace { -// Logs one applyOps, packing in as many operations as fit in a single applyOps entry. If -// isPartialTxn is not set, all operations are attempted to be packed, regardless of whether or -// not they fit; TransactionTooLarge will be thrown if this is not the case. +// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array +// field. Appends as many operations as possible until either the constructed object exceeds the +// 16MB limit or the maximum number of transaction statements allowed in one entry. // -// Returns an iterator to the first ReplOperation not packed in the batch. -std::pair::const_iterator> logApplyOpsForTransaction( - OperationContext* opCtx, +// If 'limitSize' is false, then it attempts to include all given operations, regardless of whether +// or not they fit. If the ops don't fit, BSONObjectTooLarge will be thrown in that case. +// +// Returns an iterator to the first statement that wasn't packed into the applyOps object. 
+std::vector::const_iterator packTransactionStatementsForApplyOps( + BSONObjBuilder* applyOpsBuilder, std::vector::const_iterator stmtBegin, std::vector::const_iterator stmtEnd, - const OplogSlot& oplogSlot, - repl::OpTime prevWriteOpTime, - StmtId stmtId, - const bool prepare, - const bool isPartialTxn, - const bool shouldWriteStateField) { - BSONObjBuilder applyOpsBuilder; - BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); + bool limitSize) { + std::vector::const_iterator stmtIter; + BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd)); for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) { const auto& stmt = *stmtIter; // Stop packing when either number of transaction operations is reached, or when the next @@ -1018,7 +1016,7 @@ std::pair::const_iterator> logApp // BSON overhead and the other applyOps fields. But if the array with a single operation // exceeds BSONObjMaxUserSize, we still log it, as a single max-length operation // should be able to be applied. - if (isPartialTxn && + if (limitSize && (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry || (opsArray.arrSize() > 0 && (opsArray.len() + OplogEntry::getDurableReplOperationSize(stmt) > @@ -1027,6 +1025,35 @@ std::pair::const_iterator> logApp opsArray.append(stmt.toBSON()); } opsArray.done(); + return stmtIter; +} + +// Logs one applyOps entry and may update the transactions table. Assumes that the given BSON +// builder object already has an 'applyOps' field appended pointing to the desired array of ops +// i.e. { "applyOps" : [op1, op2, ...] } +// +// @param prepare determines whether a 'prepare' field will be attached to the written oplog entry. +// @param isPartialTxn is this applyOps entry part of an in-progress multi oplog entry transaction. +// Should be set for all non-terminal ops of an unprepared multi oplog entry transaction. 
+// @param shouldWriteStateField determines whether a 'state' field will be included in the write to +// the transactions table. Only meaningful if 'updateTxnTable' is true. +// @param updateTxnTable determines whether the transactions table will be updated after the oplog +// entry is written. +// +// Returns the optime of the written oplog entry. +OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, + BSONObjBuilder* applyOpsBuilder, + const OplogSlot& oplogSlot, + repl::OpTime prevWriteOpTime, + StmtId stmtId, + const bool prepare, + const bool isPartialTxn, + const bool shouldWriteStateField, + const bool updateTxnTable, + boost::optional count) { + + // A 'prepare' oplog entry should never include a 'partialTxn' field. + invariant(!(isPartialTxn && prepare)); const NamespaceString cmdNss{"admin", "$cmd"}; @@ -1040,12 +1067,15 @@ std::pair::const_iterator> logApp try { if (prepare) { // TODO: SERVER-36814 Remove "prepare" field on applyOps. - applyOpsBuilder.append("prepare", true); + applyOpsBuilder->append("prepare", true); } if (isPartialTxn) { - applyOpsBuilder.append("partialTxn", true); + applyOpsBuilder->append("partialTxn", true); + } + if (count) { + applyOpsBuilder->append("count", *count); } - auto applyOpCmd = applyOpsBuilder.done(); + auto applyOpCmd = applyOpsBuilder->done(); auto times = replLogApplyOps( opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, oplogSlot); @@ -1053,7 +1083,7 @@ std::pair::const_iterator> logApp prevWriteOpTime, isPartialTxn, shouldWriteStateField]() -> boost::optional { - if (!shouldWriteStateField || !prevWriteOpTime.isNull()) { + if (!shouldWriteStateField) { // TODO (SERVER-40678): Either remove the invariant on prepare or separate the // checks of FCV and 'prevWriteOpTime' once we support implicit prepare on // primaries. @@ -1068,7 +1098,7 @@ std::pair::const_iterator> logApp return prepare ? 
DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted; }(); - if (prevWriteOpTime.isNull()) { + if (updateTxnTable) { // Only update the transaction table on the first 'partialTxn' entry when using the // multiple transaction oplog entries format. auto startOpTime = isPartialTxn ? boost::make_optional(oplogSlot) : boost::none; @@ -1080,7 +1110,7 @@ std::pair::const_iterator> logApp txnState, startOpTime); } - return {times, stmtIter}; + return times; } catch (const AssertionException& e) { // Change the error code to TransactionTooLarge if it is BSONObjectTooLarge. uassert(ErrorCodes::TransactionTooLarge, @@ -1091,6 +1121,36 @@ std::pair::const_iterator> logApp MONGO_UNREACHABLE; } +// Log a single applyOps for transactions without any attempt to pack operations. If the given +// statements would exceed the maximum BSON size limit of a single oplog entry, this will throw a +// TransactionTooLarge error. +// TODO (SERVER-39809): Consider removing this function once old transaction format is no longer +// needed. +OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, + const std::vector& statements, + const OplogSlot& oplogSlot, + repl::OpTime prevWriteOpTime, + StmtId stmtId, + const bool prepare, + const bool isPartialTxn, + const bool shouldWriteStateField, + const bool updateTxnTable, + boost::optional count) { + BSONObjBuilder applyOpsBuilder; + packTransactionStatementsForApplyOps( + &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */); + return logApplyOpsForTransaction(opCtx, + &applyOpsBuilder, + oplogSlot, + prevWriteOpTime, + stmtId, + prepare, + isPartialTxn, + shouldWriteStateField, + updateTxnTable, + count); +} + OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx, const OperationSessionInfo& sessionInfo, repl::OpTime prevOpTime, @@ -1126,7 +1186,8 @@ OpTimeBundle logReplOperationForTransaction(OperationContext* opCtx, // number of applyOps written will be used. 
The number of oplog entries written is returned. int logOplogEntriesForTransaction(OperationContext* opCtx, const std::vector& stmts, - const std::vector& oplogSlots) { + const std::vector& oplogSlots, + bool prepare) { invariant(!stmts.empty()); invariant(stmts.size() <= oplogSlots.size()); @@ -1144,18 +1205,49 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, // Note the logged statement IDs are not the same as the user-chosen statement IDs. stmtId = 0; auto oplogSlot = oplogSlots.begin(); - auto stmtsBegin = stmts.begin(); - while (stmtsBegin != stmts.end()) { - std::tie(prevWriteOpTime, stmtsBegin) = - logApplyOpsForTransaction(opCtx, - stmtsBegin, - stmts.end(), - *oplogSlot++, - prevWriteOpTime.writeOpTime, - stmtId, - false /* prepare */, - true /* isPartialTxn */, - true /* shouldWriteStateField */); + + // At the beginning of each loop iteration below, 'stmtsIter' will always point to the + // first statement of the sequence of remaining, unpacked transaction statements. If all + // statements have been packed, it should point to stmts.end(), which is the loop's + // termination condition. + auto stmtsIter = stmts.begin(); + while (stmtsIter != stmts.end()) { + + BSONObjBuilder applyOpsBuilder; + auto nextStmt = packTransactionStatementsForApplyOps( + &applyOpsBuilder, stmtsIter, stmts.end(), true /* limitSize */); + + // If we packed the last op, then the next oplog entry we log should be the implicit + // commit, i.e. we omit the 'partialTxn' field. + // + // TODO (SERVER-40678): Until we support implicit prepare for transactions, always + // log 'partialTxn' for prepared transactions. For unprepared transactions, update + // the transactions table on the first and last op. For prepared transactions, only + // update it on the first op. 
+ auto firstOp = stmtsIter == stmts.begin(); + auto lastOp = nextStmt == stmts.end(); + auto isPartialTxn = prepare || !lastOp; + auto updateTxnTable = firstOp || (lastOp && !prepare); + + // The 'count' field gives the total number of individual operations in the + // transaction. We do not include this if only logging a single applyOps entry for + // an unprepared transaction. It is never included here for prepared transactions. + auto count = (lastOp && !firstOp && !prepare) + ? boost::optional(stmts.size()) + : boost::none; + prevWriteOpTime = logApplyOpsForTransaction(opCtx, + &applyOpsBuilder, + *oplogSlot++, + prevWriteOpTime.writeOpTime, + stmtId, + false /* prepare */, + isPartialTxn, + true /* shouldWriteStateField */, + updateTxnTable, + count); + + // Advance the iterator to the beginning of the remaining unpacked statements. + stmtsIter = nextStmt; stmtId++; } wuow.commit(); @@ -1229,6 +1321,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, } // This is used only for multi-oplog-entry unprepared transactions +// TODO (SERVER-40728) Remove this unused function. repl::OpTime logCommitForUnpreparedTransaction(OperationContext* opCtx, StmtId stmtId, const repl::OpTime& prevOpTime, @@ -1302,27 +1395,25 @@ void OpObserverImpl::onUnpreparedTransactionCommit( invariant(lastWriteOpTime.isNull()); commitOpTime = logApplyOpsForTransaction( opCtx, - statements.begin(), - statements.end(), + statements, OplogSlot(), lastWriteOpTime, StmtId(0), false /* prepare */, false /* isPartialTxn */, - fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) - .first.writeOpTime; + fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42, + true /* updateTxnTable */, + boost::none) + .writeOpTime; } else { // Reserve all the optimes in advance, so we only need to get the optime mutex once. We - // reserve enough entries for all statements in the transaction, plus one for the commit - // oplog entry. 
- auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size() + 1); - auto commitSlot = oplogSlots.back(); - oplogSlots.pop_back(); + // reserve enough entries for all statements in the transaction. + auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size()); invariant(oplogSlots.size() == statements.size()); - int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots); - const auto prevWriteOpTime = oplogSlots[numOplogEntries - 1]; - commitOpTime = logCommitForUnpreparedTransaction( - opCtx, StmtId(numOplogEntries), prevWriteOpTime, commitSlot, statements.size()); + + // Log in-progress entries for the transaction along with the implicit commit. + int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false); + commitOpTime = oplogSlots[numOplogEntries - 1]; } invariant(!commitOpTime.isNull()); shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, commitOpTime); @@ -1431,14 +1522,15 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, invariant(lastWriteOpTime.isNull()); logApplyOpsForTransaction( opCtx, - statements.begin(), - statements.end(), + statements, prepareOpTime, lastWriteOpTime, StmtId(0), true /* prepare */, false /* isPartialTxn */, - fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42); + fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42, + true /* updateTxnTable */, + boost::none); wuow.commit(); }); } else { @@ -1460,8 +1552,8 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, // should not write any operations other than the prepare oplog entry. 
int nOperationOplogEntries = 0; if (!statements.empty()) { - nOperationOplogEntries = - logOplogEntriesForTransaction(opCtx, statements, reservedSlots); + nOperationOplogEntries = logOplogEntriesForTransaction( + opCtx, statements, reservedSlots, true /* prepare */); // We had reserved enough oplog slots for the worst case where each operation // produced one oplog entry. When operations are smaller and can be packed, diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 76a2e3d0ca6..5ef706c3f36 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -1359,6 +1359,38 @@ TEST_F(OpObserverMultiEntryTransactionTest, assertNoTxnRecord(); } +TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) { + const NamespaceString nss("testDB", "testColl"); + auto uuid = CollectionUUID::gen(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + std::vector inserts; + inserts.emplace_back(0, BSON("_id" << 0)); + + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection autoColl1(opCtx(), nss, MODE_IX); + opObserver().onInserts(opCtx(), nss, uuid, inserts.begin(), inserts.end(), false); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto oplogEntryObj = getNOplogEntries(opCtx(), 1)[0]; + checkSessionAndTransactionFields(oplogEntryObj, 0); + auto oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj)); + ASSERT(!oplogEntry.getPrepare()); + ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(repl::OpTime(), *oplogEntry.getPrevWriteOpTimeInTransaction()); + + // The implicit commit oplog entry. 
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss.toString() + << "ui" + << uuid + << "o" + << BSON("_id" << 0)))); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntry.getObject()); +} + TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { const NamespaceString nss1("testDB", "testColl"); const NamespaceString nss2("testDB2", "testColl2"); @@ -1379,7 +1411,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); opObserver().onUnpreparedTransactionCommit( opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - auto oplogEntryObjs = getNOplogEntries(opCtx(), 5); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); StmtId expectedStmtId = 0; std::vector oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -1430,6 +1462,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { << true); ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); + // This should be the implicit commit oplog entry, indicated by the absence of the 'partialTxn' + // field. 
oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" << "i" << "ns" @@ -1438,12 +1472,9 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { << uuid2 << "o" << BSON("_id" << 3))) - << "partialTxn" - << true); + << "count" + << 4); ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[3].getObject()); - - oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4); - ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[4].getObject()); } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { @@ -1479,7 +1510,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { opObserver().onUpdate(opCtx(), update2); opObserver().onUnpreparedTransactionCommit( opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); StmtId expectedStmtId = 0; std::vector oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -1510,6 +1541,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { << true); ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + // This should be the implicit commit oplog entry, indicated by the absence of the 'partialTxn' + // field. 
oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" << "u" << "ns" @@ -1521,12 +1554,9 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { << "y")) << "o2" << BSON("_id" << 1))) - << "partialTxn" - << true); + << "count" + << 2); ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); - - oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2); - ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) { @@ -1553,7 +1583,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) { opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none); opObserver().onUnpreparedTransactionCommit( opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); StmtId expectedStmtId = 0; std::vector oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -1581,6 +1611,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) { << true); ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + // This should be the implicit commit oplog entry, indicated by the absence of the 'partialTxn' + // field. 
oExpected = oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" << "d" << "ns" @@ -1589,12 +1621,9 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) { << uuid2 << "o" << BSON("_id" << 1))) - << "partialTxn" - << true); + << "count" + << 2); ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); - - oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2); - ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); } TEST_F(OpObserverMultiEntryTransactionTest, @@ -2082,7 +2111,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) { opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); opObserver().onUnpreparedTransactionCommit( opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); StmtId expectedStmtId = 0; std::vector oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -2128,12 +2157,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) { << "ui" << uuid2 << "o" - << BSON("_id" << 3))) - << "partialTxn" - << true); + << BSON("_id" << 3)))); ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); - oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4); - ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); } TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPackingTest) { @@ -2344,7 +2369,7 @@ TEST_F(OpObserverLargeMultiEntryTransactionTest, LargeTransactionCreatesMultiple txnParticipant.addTransactionOperation(opCtx(), operation2); opObserver().onUnpreparedTransactionCommit( opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); - auto oplogEntryObjs = getNOplogEntries(opCtx(), 3); + auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); StmtId expectedStmtId = 0; std::vector oplogEntries; mongo::repl::OpTime 
expectedPrevWriteOpTime; @@ -2363,11 +2388,8 @@ TEST_F(OpObserverLargeMultiEntryTransactionTest, LargeTransactionCreatesMultiple auto oExpected = BSON("applyOps" << BSON_ARRAY(operation1.toBSON()) << "partialTxn" << true); ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); - oExpected = BSON("applyOps" << BSON_ARRAY(operation2.toBSON()) << "partialTxn" << true); + oExpected = BSON("applyOps" << BSON_ARRAY(operation2.toBSON()) << "count" << 2); ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); - - oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2); - ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); } } // namespace diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 6a7823ef75f..e2213961d5f 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -2844,8 +2844,7 @@ public: MultiOplogEntryTransaction() : MultiDocumentTransactionTest("multiOplogEntryTransaction") { const auto currentTime = _clock->getClusterTime(); firstOplogEntryTs = currentTime.addTicks(1).asTimestamp(); - secondOplogEntryTs = currentTime.addTicks(2).asTimestamp(); - commitEntryTs = currentTime.addTicks(3).asTimestamp(); + commitEntryTs = currentTime.addTicks(2).asTimestamp(); } void run() { @@ -2874,19 +2873,27 @@ public: assertDocumentAtTimestamp(coll, presentTs, BSONObj()); assertDocumentAtTimestamp(coll, beforeTxnTs, BSONObj()); assertDocumentAtTimestamp(coll, firstOplogEntryTs, BSONObj()); - assertDocumentAtTimestamp(coll, secondOplogEntryTs, BSONObj()); assertFilteredDocumentAtTimestamp(coll, query1, commitEntryTs, doc); assertFilteredDocumentAtTimestamp(coll, query2, commitEntryTs, doc2); assertFilteredDocumentAtTimestamp(coll, query1, nullTs, doc); assertFilteredDocumentAtTimestamp(coll, query2, nullTs, doc2); - // Commit oplog entry should exist at commitEntryTs. 
- const auto commitFilter = BSON("ts" << commitEntryTs << "op" - << "c"); + // Implicit commit oplog entry should exist at commitEntryTs. + const auto commitFilter = + BSON("ts" << commitEntryTs << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss.ns() + << "ui" + << coll->uuid().get() + << "o" + << doc2)) + << "count" + << 2)); assertOplogDocumentExistsAtTimestamp(commitFilter, presentTs, false); assertOplogDocumentExistsAtTimestamp(commitFilter, beforeTxnTs, false); assertOplogDocumentExistsAtTimestamp(commitFilter, firstOplogEntryTs, false); - assertOplogDocumentExistsAtTimestamp(commitFilter, secondOplogEntryTs, false); assertOplogDocumentExistsAtTimestamp(commitFilter, commitEntryTs, true); assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, true); @@ -2895,7 +2902,6 @@ public: assertOldestActiveTxnTimestampEquals(boost::none, presentTs); assertOldestActiveTxnTimestampEquals(boost::none, beforeTxnTs); assertOldestActiveTxnTimestampEquals(firstOplogEntryTs, firstOplogEntryTs); - assertOldestActiveTxnTimestampEquals(firstOplogEntryTs, secondOplogEntryTs); assertOldestActiveTxnTimestampEquals(boost::none, commitEntryTs); assertOldestActiveTxnTimestampEquals(boost::none, nullTs); @@ -2915,30 +2921,9 @@ public: assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true); - assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, secondOplogEntryTs, true); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, commitEntryTs, true); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true); - // second oplog entry should exist at secondOplogEntryTs and after it. 
- const auto secondOplogEntryFilter = - BSON("ts" << secondOplogEntryTs << "o" - << BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" - << nss.ns() - << "ui" - << coll->uuid().get() - << "o" - << doc2)) - << "partialTxn" - << true)); - assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, presentTs, false); - assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, beforeTxnTs, false); - assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, firstOplogEntryTs, false); - assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, secondOplogEntryTs, true); - assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, commitEntryTs, true); - assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, nullTs, true); - // Session state should go to inProgress at firstOplogEntryTs, then to committed // at commitEntryTs getSessionTxnInfoAtTimestamp(presentTs, false); @@ -2946,24 +2931,22 @@ public: auto sessionInfo = getSessionTxnInfoAtTimestamp(firstOplogEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "inProgress"); ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs); - - sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true); - ASSERT_EQ(sessionInfo["state"].String(), "inProgress"); - // The transaction table is only updated at the start of the transaction. 
- ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs); + ASSERT_EQ(sessionInfo["startOpTime"]["ts"].timestamp(), firstOplogEntryTs); sessionInfo = getSessionTxnInfoAtTimestamp(commitEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "committed"); ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), commitEntryTs); + ASSERT_FALSE(sessionInfo.hasField("startOpTime")); sessionInfo = getSessionTxnInfoAtTimestamp(nullTs, true); ASSERT_EQ(sessionInfo["state"].String(), "committed"); ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), commitEntryTs); + ASSERT_FALSE(sessionInfo.hasField("startOpTime")); } } protected: - Timestamp firstOplogEntryTs, secondOplogEntryTs; + Timestamp firstOplogEntryTs; private: MultiOplogScopedSettings multiOplogSettings; @@ -3161,19 +3144,23 @@ public: auto sessionInfo = getSessionTxnInfoAtTimestamp(firstOplogEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "inProgress"); ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs); + ASSERT_EQ(sessionInfo["startOpTime"]["ts"].timestamp(), firstOplogEntryTs); sessionInfo = getSessionTxnInfoAtTimestamp(secondOplogEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "inProgress"); // The transaction table is only updated at the start of the transaction. 
ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), firstOplogEntryTs); + ASSERT_EQ(sessionInfo["startOpTime"]["ts"].timestamp(), firstOplogEntryTs); sessionInfo = getSessionTxnInfoAtTimestamp(prepareEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "prepared"); ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), prepareEntryTs); + ASSERT_EQ(sessionInfo["startOpTime"]["ts"].timestamp(), firstOplogEntryTs); sessionInfo = getSessionTxnInfoAtTimestamp(nullTs, true); ASSERT_EQ(sessionInfo["state"].String(), "committed"); ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), commitEntryTs); + ASSERT_FALSE(sessionInfo.hasField("startOpTime")); } } @@ -3286,6 +3273,7 @@ public: auto sessionInfo = getSessionTxnInfoAtTimestamp(abortEntryTs, true); ASSERT_EQ(sessionInfo["state"].String(), "aborted"); ASSERT_EQ(sessionInfo["lastWriteOpTime"]["ts"].timestamp(), abortEntryTs); + ASSERT_FALSE(sessionInfo.hasField("startOpTime")); } } -- cgit v1.2.1