diff options
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 129 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 413 | ||||
-rw-r--r-- | src/mongo/db/repl/rollback_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 89 |
4 files changed, 387 insertions, 247 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 727b7156792..d8426660df1 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1013,7 +1013,12 @@ namespace { OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, std::vector<repl::ReplOperation> stmts, - const OplogSlot& prepareOplogSlot) { + const OplogSlot& oplogSlot, + repl::OpTime prevWriteOpTime, + StmtId stmtId, + const bool prepare, + const bool isPartialTxn, + const bool shouldWriteStateField) { BSONObjBuilder applyOpsBuilder; BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); for (auto& stmt : stmts) { @@ -1028,41 +1033,51 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - auto txnParticipant = TransactionParticipant::get(opCtx); - oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime(); - // Until we support multiple oplog entries per transaction, prevOpTime should always be null. - invariant(oplogLink.prevOpTime.isNull()); + oplogLink.prevOpTime = prevWriteOpTime; try { - // We are only given an oplog slot for prepared transactions. - auto prepare = !prepareOplogSlot.isNull(); if (prepare) { // TODO: SERVER-36814 Remove "prepare" field on applyOps. applyOpsBuilder.append("prepare", true); } + if (isPartialTxn) { + applyOpsBuilder.append("partialTxn", true); + } auto applyOpCmd = applyOpsBuilder.done(); - const StmtId stmtId(0); - auto times = replLogApplyOps( - opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot); - - auto txnState = [prepare]() -> boost::optional<DurableTxnStateEnum> { - if (serverGlobalParams.featureCompatibility.getVersion() < - ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) { + opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, oplogSlot); + + auto txnState = [prepare, + prevWriteOpTime, + isPartialTxn, + shouldWriteStateField]() -> boost::optional<DurableTxnStateEnum> { + if (!shouldWriteStateField || !prevWriteOpTime.isNull()) { + // TODO (SERVER-40678): Either remove the invariant on prepare or separate the + // checks of FCV and 'prevWriteOpTime' once we support implicit prepare on + // primaries. invariant(!prepare); return boost::none; } + if (isPartialTxn) { + return DurableTxnStateEnum::kInProgress; + } + return prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted; }(); - onWriteOpCompleted(opCtx, - cmdNss, - {stmtId}, - times.writeOpTime, - times.wallClockTime, - txnState, - boost::none /* startOpTime */); + if (prevWriteOpTime.isNull()) { + // Only update the transaction table on the first 'partialTxn' entry when using the + // multiple transaction oplog entries format. + auto startOpTime = isPartialTxn ? boost::make_optional(oplogSlot) : boost::none; + onWriteOpCompleted(opCtx, + cmdNss, + {stmtId}, + times.writeOpTime, + times.wallClockTime, + txnState, + startOpTime); + } return times; } catch (const AssertionException& e) { // Change the error code to TransactionTooLarge if it is BSONObjectTooLarge. @@ -1112,9 +1127,6 @@ void logOplogEntriesForTransaction(OperationContext* opCtx, invariant(!stmts.empty()); invariant(stmts.size() <= oplogSlots.size()); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); - sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); const auto txnParticipant = TransactionParticipant::get(opCtx); OpTimeBundle prevWriteOpTime; StmtId stmtId = 0; @@ -1128,23 +1140,20 @@ void logOplogEntriesForTransaction(OperationContext* opCtx, prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); // Note the logged statement IDs are not the same as the user-chosen statement IDs. stmtId = 0; - const NamespaceString cmdNss{"admin", "$cmd"}; auto oplogSlot = oplogSlots.begin(); - const auto startOpTime = *oplogSlot; for (const auto& stmt : stmts) { - bool isStartOfTxn = prevWriteOpTime.writeOpTime.isNull(); - prevWriteOpTime = logReplOperationForTransaction( - opCtx, sessionInfo, prevWriteOpTime.writeOpTime, stmtId, stmt, *oplogSlot++); - if (isStartOfTxn) { - // Update the transaction table only on the first transaction oplog entry. - onWriteOpCompleted(opCtx, - cmdNss, - {stmtId}, - prevWriteOpTime.writeOpTime, - prevWriteOpTime.wallClockTime, - DurableTxnStateEnum::kInProgress, - startOpTime); - } + // Wrap each individual operation in an applyOps entry. This is temporary until we + // implement the logic to pack multiple operations into a single applyOps entry for + // the larger transactions project. + // TODO (SERVER-40725): Modify this comment once packing logic is done. + prevWriteOpTime = logApplyOpsForTransaction(opCtx, + {stmt}, + *oplogSlot++, + prevWriteOpTime.writeOpTime, + stmtId, + false /* prepare */, + true /* isPartialTxn */, + true /* shouldWriteStateField */); stmtId++; } wuow.commit(); @@ -1279,10 +1288,25 @@ void OpObserverImpl::onUnpreparedTransactionCommit( return; repl::OpTime commitOpTime; + // As FCV downgrade/upgrade is racey, we want to avoid performing a FCV check multiple times in + // a single call into the OpObserver. Therefore, we store the result here and pass it as an + // argument. + const auto fcv = serverGlobalParams.featureCompatibility.getVersion(); if (!gUseMultipleOplogEntryFormatForTransactions || - serverGlobalParams.featureCompatibility.getVersion() < - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) { - commitOpTime = logApplyOpsForTransaction(opCtx, statements, OplogSlot()).writeOpTime; + fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) { + auto txnParticipant = TransactionParticipant::get(opCtx); + const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime(); + invariant(lastWriteOpTime.isNull()); + commitOpTime = logApplyOpsForTransaction( + opCtx, + statements, + OplogSlot(), + lastWriteOpTime, + StmtId(0), + false /* prepare */, + false /* isPartialTxn */, + fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) + .writeOpTime; } else { // Reserve all the optimes in advance, so we only need to get the optime mutex once. We // reserve enough entries for all statements in the transaction, plus one for the commit @@ -1382,9 +1406,12 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, return; } + // As FCV downgrade/upgrade is racey, we want to avoid performing a FCV check multiple times in + // a single call into the OpObserver. Therefore, we store the result here and pass it as an + // argument. + const auto fcv = serverGlobalParams.featureCompatibility.getVersion(); if (!gUseMultipleOplogEntryFormatForTransactions || - serverGlobalParams.featureCompatibility.getVersion() < - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) { + fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) { // We write the oplog entry in a side transaction so that we do not commit the now-prepared // transaction. // We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp @@ -1393,12 +1420,22 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, writeConflictRetry( opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { - // Writes to the oplog only require a Global intent lock. Lock::GlobalLock globalLock(opCtx, MODE_IX); WriteUnitOfWork wuow(opCtx); - logApplyOpsForTransaction(opCtx, statements, prepareOpTime); + auto txnParticipant = TransactionParticipant::get(opCtx); + const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime(); + invariant(lastWriteOpTime.isNull()); + logApplyOpsForTransaction( + opCtx, + statements, + prepareOpTime, + lastWriteOpTime, + StmtId(0), + true /* prepare */, + false /* isPartialTxn */, + fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42); wuow.commit(); }); } else { diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index f31f81d9463..2c4a7bab692 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -1360,21 +1360,12 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { auto uuid2 = CollectionUUID::gen(); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); - std::vector<InsertStatement> inserts1; - inserts1.emplace_back(0, - BSON("_id" << 0 << "data" - << "x")); - inserts1.emplace_back(1, - BSON("_id" << 1 << "data" - << "y")); + inserts1.emplace_back(0, BSON("_id" << 0)); + inserts1.emplace_back(1, BSON("_id" << 1)); std::vector<InsertStatement> inserts2; - inserts2.emplace_back(0, - BSON("_id" << 2 << "data" - << "z")); - inserts2.emplace_back(1, - BSON("_id" << 3 << "data" - << "w")); + inserts2.emplace_back(0, BSON("_id" << 2)); + inserts2.emplace_back(1, BSON("_id" << 3)); WriteUnitOfWork wuow(opCtx()); AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); @@ -1390,49 +1381,63 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - if (expectedStmtId++ < 4) { - ASSERT_EQ("i", oplogEntryObj["op"].String()); - ASSERT(oplogEntry.getInTxn()); - } else { - ASSERT_EQ("admin.$cmd"_sd, oplogEntryObj["ns"].String()); - ASSERT_EQ("c", oplogEntryObj["op"].String()); - } ASSERT(!oplogEntry.getPrepare()); - ASSERT_FALSE(oplogEntryObj.hasField("prepare")); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + expectedStmtId++; } - ASSERT_EQ(nss1, oplogEntries[0].getNss()); - ASSERT_EQ(uuid1, *oplogEntries[0].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("_id" << 0 << "data" - << "x"), - oplogEntries[0].getObject()); - ASSERT_FALSE(oplogEntries[0].getObject2()); - - ASSERT_EQ(nss1, oplogEntries[1].getNss()); - ASSERT_EQ(uuid1, *oplogEntries[1].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "data" - << "y"), - oplogEntries[1].getObject()); - ASSERT_FALSE(oplogEntries[1].getObject2()); - - ASSERT_EQ(nss2, oplogEntries[2].getNss()); - ASSERT_EQ(uuid2, *oplogEntries[2].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("_id" << 2 << "data" - << "z"), - oplogEntries[2].getObject()); - ASSERT_FALSE(oplogEntries[2].getObject2()); - - ASSERT_EQ(nss2, oplogEntries[3].getNss()); - ASSERT_EQ(uuid2, *oplogEntries[3].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("_id" << 3 << "data" - << "w"), - oplogEntries[3].getObject()); - ASSERT_FALSE(oplogEntries[3].getObject2()); - ASSERT_BSONOBJ_EQ(BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4), - oplogEntries[4].getObject()); + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 0))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 1))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 2))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 3))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[3].getObject()); + + oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[4].getObject()); } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { @@ -1476,37 +1481,46 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - if (expectedStmtId++ < 2) { - ASSERT_EQ("u", oplogEntryObj["op"].String()); - ASSERT(oplogEntry.getInTxn()); - } else { - ASSERT_EQ("admin.$cmd"_sd, oplogEntryObj["ns"].String()); - ASSERT_EQ("c", oplogEntryObj["op"].String()); - } ASSERT(!oplogEntry.getPrepare()); - ASSERT_FALSE(oplogEntryObj.hasField("prepare")); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + expectedStmtId++; } - ASSERT_EQ(nss1, oplogEntries[0].getNss()); - ASSERT_EQ(uuid1, *oplogEntries[0].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("$set" << BSON("data" - << "x")), - oplogEntries[0].getObject()); - ASSERT_TRUE(oplogEntries[0].getObject2()); - ASSERT_BSONOBJ_EQ(*oplogEntries[0].getObject2(), BSON("_id" << 0)); - - ASSERT_EQ(nss2, oplogEntries[1].getNss()); - ASSERT_EQ(uuid2, *oplogEntries[1].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("$set" << BSON("data" - << "y")), - oplogEntries[1].getObject()); - ASSERT_TRUE(oplogEntries[1].getObject2()); - ASSERT_BSONOBJ_EQ(*oplogEntries[1].getObject2(), BSON("_id" << 1)); - ASSERT_BSONOBJ_EQ(BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2), - oplogEntries[2].getObject()); + + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "u" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("$set" << BSON("data" + << "x")) + << "o2" + << BSON("_id" << 0))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "u" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("$set" << BSON("data" + << "y")) + << "o2" + << BSON("_id" << 1))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); + + oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); } TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) { @@ -1541,33 +1555,41 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) { checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - if (expectedStmtId++ < 2) { - ASSERT_EQ("d", oplogEntryObj["op"].String()); - ASSERT(oplogEntry.getInTxn()); - } else { - ASSERT_EQ("admin.$cmd"_sd, oplogEntryObj["ns"].String()); - ASSERT_EQ("c", oplogEntryObj["op"].String()); - } ASSERT(!oplogEntry.getPrepare()); - ASSERT_FALSE(oplogEntryObj.hasField("prepare")); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + expectedStmtId++; } - ASSERT_EQ(nss1, oplogEntries[0].getNss()); - ASSERT_EQ(uuid1, *oplogEntries[0].getUuid()); - ASSERT_BSONOBJ_EQ(oplogEntries[0].getObject(), BSON("_id" << 0)); - ASSERT_FALSE(oplogEntries[0].getObject2()); - - ASSERT_EQ(nss2, oplogEntries[1].getNss()); - ASSERT_EQ(uuid2, *oplogEntries[1].getUuid()); - ASSERT_BSONOBJ_EQ(oplogEntries[1].getObject(), BSON("_id" << 1)); - ASSERT_FALSE(oplogEntries[1].getObject2()); - ASSERT_BSONOBJ_EQ(BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2), - oplogEntries[2].getObject()); -} + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "d" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 0))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + + oExpected = oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "d" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 1))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); + + oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); +} TEST_F(OpObserverMultiEntryTransactionTest, PreparingEmptyTransactionOnlyWritesPrepareOplogEntryAndToTransactionTable) { @@ -1633,47 +1655,68 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - if (expectedStmtId++ < 4) { - ASSERT_TRUE(oplogEntry.isCrudOpType()); - ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert); - ASSERT_TRUE(oplogEntry.getInTxn()); - } else { - ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString()); - ASSERT_TRUE(oplogEntry.isCommand()); - ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction == - oplogEntry.getCommandType()); - } ASSERT(!oplogEntry.getPrepare()); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + expectedStmtId++; } - ASSERT_EQ(nss1, oplogEntries[0].getNss()); - ASSERT_EQ(uuid1, *oplogEntries[0].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("_id" << 0), oplogEntries[0].getObject()); - ASSERT_FALSE(oplogEntries[0].getObject2()); - - ASSERT_EQ(nss1, oplogEntries[1].getNss()); - ASSERT_EQ(uuid1, *oplogEntries[1].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("_id" << 1), oplogEntries[1].getObject()); - ASSERT_FALSE(oplogEntries[1].getObject2()); - - ASSERT_EQ(nss2, oplogEntries[2].getNss()); - ASSERT_EQ(uuid2, *oplogEntries[2].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("_id" << 2), oplogEntries[2].getObject()); - ASSERT_FALSE(oplogEntries[2].getObject2()); - - ASSERT_EQ(nss2, oplogEntries[3].getNss()); - ASSERT_EQ(uuid2, *oplogEntries[3].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("_id" << 3), oplogEntries[3].getObject()); - ASSERT_FALSE(oplogEntries[3].getObject2()); - ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); - ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[4].getObject()); - ASSERT_FALSE(oplogEntries[4].getObject2()); + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 0))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 1))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 2))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 3))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[3].getObject()); + + oExpected = BSON("prepareTransaction" << 1); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[4].getObject()); + ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); + txnParticipant.stashTransactionResources(opCtx()); assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); @@ -1726,42 +1769,48 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - if (expectedStmtId++ < 2) { - ASSERT_TRUE(oplogEntry.isCrudOpType()); - ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate); - ASSERT_TRUE(oplogEntry.getInTxn()); - } else { - ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString()); - ASSERT_TRUE(oplogEntry.isCommand()); - ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction == - oplogEntry.getCommandType()); - } ASSERT(!oplogEntry.getPrepare()); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + expectedStmtId++; } - ASSERT_EQ(nss1, oplogEntries[0].getNss()); - ASSERT_EQ(uuid1, *oplogEntries[0].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("$set" << BSON("data" - << "x")), - oplogEntries[0].getObject()); - ASSERT_TRUE(oplogEntries[0].getObject2()); - ASSERT_BSONOBJ_EQ(*oplogEntries[0].getObject2(), BSON("_id" << 0)); - - ASSERT_EQ(nss2, oplogEntries[1].getNss()); - ASSERT_EQ(uuid2, *oplogEntries[1].getUuid()); - ASSERT_BSONOBJ_EQ(BSON("$set" << BSON("data" - << "y")), - oplogEntries[1].getObject()); - ASSERT_TRUE(oplogEntries[1].getObject2()); - ASSERT_BSONOBJ_EQ(*oplogEntries[1].getObject2(), BSON("_id" << 1)); - ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); - ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject()); - ASSERT_FALSE(oplogEntries[2].getObject2()); + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "u" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("$set" << BSON("data" + << "x")) + << "o2" + << BSON("_id" << 0))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "u" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("$set" << BSON("data" + << "y")) + << "o2" + << BSON("_id" << 1))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); + + oExpected = BSON("prepareTransaction" << 1); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); + ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); txnParticipant.stashTransactionResources(opCtx()); @@ -1808,36 +1857,42 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId); oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); const auto& oplogEntry = oplogEntries.back(); - if (expectedStmtId++ < 2) { - ASSERT_TRUE(oplogEntry.isCrudOpType()); - ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete); - ASSERT_TRUE(oplogEntry.getInTxn()); - } else { - ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString()); - ASSERT_TRUE(oplogEntry.isCommand()); - ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction == - oplogEntry.getCommandType()); - } ASSERT(!oplogEntry.getPrepare()); ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + expectedStmtId++; } - ASSERT_EQ(nss1, oplogEntries[0].getNss()); - ASSERT_EQ(uuid1, *oplogEntries[0].getUuid()); - ASSERT_BSONOBJ_EQ(oplogEntries[0].getObject(), BSON("_id" << 0)); - ASSERT_FALSE(oplogEntries[0].getObject2()); - ASSERT_EQ(nss2, oplogEntries[1].getNss()); - ASSERT_EQ(uuid2, *oplogEntries[1].getUuid()); - ASSERT_BSONOBJ_EQ(oplogEntries[1].getObject(), BSON("_id" << 1)); - ASSERT_FALSE(oplogEntries[1].getObject2()); + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "d" + << "ns" + << nss1.toString() + << "ui" + << uuid1 + << "o" + << BSON("_id" << 0))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject()); + + oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op" + << "d" + << "ns" + << nss2.toString() + << "ui" + << uuid2 + << "o" + << BSON("_id" << 1))) + << "partialTxn" + << true); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); + + oExpected = BSON("prepareTransaction" << 1); + ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject()); ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); - ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject()); - ASSERT_FALSE(oplogEntries[2].getObject2()); - ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime()); txnParticipant.stashTransactionResources(opCtx()); assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared); @@ -1871,7 +1926,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); - ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand); + ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps); const auto startOpTime = insertEntry.getOpTime(); @@ -1947,7 +2003,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0])); - ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert); + ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand); + ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps); const auto startOpTime = insertEntry.getOpTime(); const auto prepareTimestamp = prepareOpTime.getTimestamp(); diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index db7a05df568..0bf542ea04b 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -811,7 +811,8 @@ Status RollbackImpl::_processRollbackOp(OperationContext* opCtx, const OplogEntr invariant(iter.hasNext()); const auto prepareOplogEntry = iter.next(opCtx); - if (prepareOplogEntry.getCommandType() == OplogEntry::CommandType::kApplyOps) { + if (prepareOplogEntry.getCommandType() == OplogEntry::CommandType::kApplyOps && + prepareOplogEntry.shouldPrepare()) { // Transform the prepare command into a normal applyOps command. If the // "prepare" field were not removed, the operation would be ignored. const auto swApplyOpsEntry = diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 6d6a9dc8d8f..d0c1139c437 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -2889,10 +2889,18 @@ public: assertOldestActiveTxnTimestampEquals(boost::none, nullTs); // first oplog entry should exist at firstOplogEntryTs and after it. - const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op" - << "i" - << "o" - << doc); + const auto firstOplogEntryFilter = + BSON("ts" << firstOplogEntryTs << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss.ns() + << "ui" + << coll->uuid().get() + << "o" + << doc)) + << "partialTxn" + << true)); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true); @@ -2901,10 +2909,18 @@ public: assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true); // second oplog entry should exist at secondOplogEntryTs and after it. - const auto secondOplogEntryFilter = BSON("ts" << secondOplogEntryTs << "op" - << "i" - << "o" - << doc2); + const auto secondOplogEntryFilter = + BSON("ts" << secondOplogEntryTs << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss.ns() + << "ui" + << coll->uuid().get() + << "o" + << doc2)) + << "partialTxn" + << true)); assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, presentTs, false); assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, beforeTxnTs, false); assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, firstOplogEntryTs, false); @@ -3082,10 +3098,18 @@ public: assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, true); // The first oplog entry should exist at firstOplogEntryTs and onwards. - const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op" - << "i" - << "o" - << doc); + const auto firstOplogEntryFilter = + BSON("ts" << firstOplogEntryTs << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss.ns() + << "ui" + << coll->uuid().get() + << "o" + << doc)) + << "partialTxn" + << true)); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true); @@ -3093,12 +3117,19 @@ public: assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, prepareEntryTs, true); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, commitEntryTs, true); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true); - // The second oplog entry should exist at secondOplogEntryTs and onwards. - const auto secondOplogEntryFilter = BSON("ts" << secondOplogEntryTs << "op" - << "i" - << "o" - << doc2); + const auto secondOplogEntryFilter = + BSON("ts" << secondOplogEntryTs << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss.ns() + << "ui" + << coll->uuid().get() + << "o" + << doc2)) + << "partialTxn" + << true)); assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, presentTs, false); assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, beforeTxnTs, false); assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, firstOplogEntryTs, false); @@ -3195,7 +3226,6 @@ public: txnParticipant.abortActiveTransaction(_opCtx); txnParticipant.stashTransactionResources(_opCtx); - const BSONObj query1 = BSON("_id" << 1); { // The prepare oplog entry should exist at prepareEntryTs and onwards. assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false); @@ -3211,11 +3241,26 @@ public: assertOplogDocumentExistsAtTimestamp(abortFilter, abortEntryTs, true); assertOplogDocumentExistsAtTimestamp(abortFilter, nullTs, true); + UUID ui = UUID::gen(); + { + AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); + auto coll = autoColl.getCollection(); + ASSERT(coll); + ui = coll->uuid().get(); + } + // The first oplog entry should exist at firstOplogEntryTs and onwards. - const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op" - << "i" - << "o" - << doc); + const auto firstOplogEntryFilter = BSON( + "ts" << firstOplogEntryTs << "o" << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ns" + << nss.ns() + << "ui" + << ui + << "o" + << doc)) + << "partialTxn" + << true)); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false); assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true); |