summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- src/mongo/db/op_observer_impl.cpp 129
-rw-r--r-- src/mongo/db/op_observer_impl_test.cpp 413
-rw-r--r-- src/mongo/db/repl/rollback_impl.cpp 3
-rw-r--r-- src/mongo/dbtests/storage_timestamp_tests.cpp 89
4 files changed, 387 insertions, 247 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 727b7156792..d8426660df1 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1013,7 +1013,12 @@ namespace {
OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
std::vector<repl::ReplOperation> stmts,
- const OplogSlot& prepareOplogSlot) {
+ const OplogSlot& oplogSlot,
+ repl::OpTime prevWriteOpTime,
+ StmtId stmtId,
+ const bool prepare,
+ const bool isPartialTxn,
+ const bool shouldWriteStateField) {
BSONObjBuilder applyOpsBuilder;
BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
for (auto& stmt : stmts) {
@@ -1028,41 +1033,51 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- auto txnParticipant = TransactionParticipant::get(opCtx);
- oplogLink.prevOpTime = txnParticipant.getLastWriteOpTime();
- // Until we support multiple oplog entries per transaction, prevOpTime should always be null.
- invariant(oplogLink.prevOpTime.isNull());
+ oplogLink.prevOpTime = prevWriteOpTime;
try {
- // We are only given an oplog slot for prepared transactions.
- auto prepare = !prepareOplogSlot.isNull();
if (prepare) {
// TODO: SERVER-36814 Remove "prepare" field on applyOps.
applyOpsBuilder.append("prepare", true);
}
+ if (isPartialTxn) {
+ applyOpsBuilder.append("partialTxn", true);
+ }
auto applyOpCmd = applyOpsBuilder.done();
- const StmtId stmtId(0);
-
auto times = replLogApplyOps(
- opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot);
-
- auto txnState = [prepare]() -> boost::optional<DurableTxnStateEnum> {
- if (serverGlobalParams.featureCompatibility.getVersion() <
- ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) {
+ opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, oplogSlot);
+
+ auto txnState = [prepare,
+ prevWriteOpTime,
+ isPartialTxn,
+ shouldWriteStateField]() -> boost::optional<DurableTxnStateEnum> {
+ if (!shouldWriteStateField || !prevWriteOpTime.isNull()) {
+ // TODO (SERVER-40678): Either remove the invariant on prepare or separate the
+ // checks of FCV and 'prevWriteOpTime' once we support implicit prepare on
+ // primaries.
invariant(!prepare);
return boost::none;
}
+ if (isPartialTxn) {
+ return DurableTxnStateEnum::kInProgress;
+ }
+
return prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted;
}();
- onWriteOpCompleted(opCtx,
- cmdNss,
- {stmtId},
- times.writeOpTime,
- times.wallClockTime,
- txnState,
- boost::none /* startOpTime */);
+ if (prevWriteOpTime.isNull()) {
+ // Only update the transaction table on the first 'partialTxn' entry when using the
+ // multiple transaction oplog entries format.
+ auto startOpTime = isPartialTxn ? boost::make_optional(oplogSlot) : boost::none;
+ onWriteOpCompleted(opCtx,
+ cmdNss,
+ {stmtId},
+ times.writeOpTime,
+ times.wallClockTime,
+ txnState,
+ startOpTime);
+ }
return times;
} catch (const AssertionException& e) {
// Change the error code to TransactionTooLarge if it is BSONObjectTooLarge.
@@ -1112,9 +1127,6 @@ void logOplogEntriesForTransaction(OperationContext* opCtx,
invariant(!stmts.empty());
invariant(stmts.size() <= oplogSlots.size());
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
- sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
const auto txnParticipant = TransactionParticipant::get(opCtx);
OpTimeBundle prevWriteOpTime;
StmtId stmtId = 0;
@@ -1128,23 +1140,20 @@ void logOplogEntriesForTransaction(OperationContext* opCtx,
prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime();
// Note the logged statement IDs are not the same as the user-chosen statement IDs.
stmtId = 0;
- const NamespaceString cmdNss{"admin", "$cmd"};
auto oplogSlot = oplogSlots.begin();
- const auto startOpTime = *oplogSlot;
for (const auto& stmt : stmts) {
- bool isStartOfTxn = prevWriteOpTime.writeOpTime.isNull();
- prevWriteOpTime = logReplOperationForTransaction(
- opCtx, sessionInfo, prevWriteOpTime.writeOpTime, stmtId, stmt, *oplogSlot++);
- if (isStartOfTxn) {
- // Update the transaction table only on the first transaction oplog entry.
- onWriteOpCompleted(opCtx,
- cmdNss,
- {stmtId},
- prevWriteOpTime.writeOpTime,
- prevWriteOpTime.wallClockTime,
- DurableTxnStateEnum::kInProgress,
- startOpTime);
- }
+ // Wrap each individual operation in an applyOps entry. This is temporary until we
+ // implement the logic to pack multiple operations into a single applyOps entry for
+ // the larger transactions project.
+ // TODO (SERVER-40725): Modify this comment once packing logic is done.
+ prevWriteOpTime = logApplyOpsForTransaction(opCtx,
+ {stmt},
+ *oplogSlot++,
+ prevWriteOpTime.writeOpTime,
+ stmtId,
+ false /* prepare */,
+ true /* isPartialTxn */,
+ true /* shouldWriteStateField */);
stmtId++;
}
wuow.commit();
@@ -1279,10 +1288,25 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
return;
repl::OpTime commitOpTime;
+ // As FCV downgrade/upgrade is racy, we want to avoid performing an FCV check multiple times in
+ // a single call into the OpObserver. Therefore, we store the result here and pass it as an
+ // argument.
+ const auto fcv = serverGlobalParams.featureCompatibility.getVersion();
if (!gUseMultipleOplogEntryFormatForTransactions ||
- serverGlobalParams.featureCompatibility.getVersion() <
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) {
- commitOpTime = logApplyOpsForTransaction(opCtx, statements, OplogSlot()).writeOpTime;
+ fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime();
+ invariant(lastWriteOpTime.isNull());
+ commitOpTime = logApplyOpsForTransaction(
+ opCtx,
+ statements,
+ OplogSlot(),
+ lastWriteOpTime,
+ StmtId(0),
+ false /* prepare */,
+ false /* isPartialTxn */,
+ fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42)
+ .writeOpTime;
} else {
// Reserve all the optimes in advance, so we only need to get the optime mutex once. We
// reserve enough entries for all statements in the transaction, plus one for the commit
@@ -1382,9 +1406,12 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
return;
}
+ // As FCV downgrade/upgrade is racy, we want to avoid performing an FCV check multiple times in
+ // a single call into the OpObserver. Therefore, we store the result here and pass it as an
+ // argument.
+ const auto fcv = serverGlobalParams.featureCompatibility.getVersion();
if (!gUseMultipleOplogEntryFormatForTransactions ||
- serverGlobalParams.featureCompatibility.getVersion() <
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) {
+ fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) {
// We write the oplog entry in a side transaction so that we do not commit the now-prepared
// transaction.
// We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp
@@ -1393,12 +1420,22 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
writeConflictRetry(
opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
-
// Writes to the oplog only require a Global intent lock.
Lock::GlobalLock globalLock(opCtx, MODE_IX);
WriteUnitOfWork wuow(opCtx);
- logApplyOpsForTransaction(opCtx, statements, prepareOpTime);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime();
+ invariant(lastWriteOpTime.isNull());
+ logApplyOpsForTransaction(
+ opCtx,
+ statements,
+ prepareOpTime,
+ lastWriteOpTime,
+ StmtId(0),
+ true /* prepare */,
+ false /* isPartialTxn */,
+ fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42);
wuow.commit();
});
} else {
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index f31f81d9463..2c4a7bab692 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -1360,21 +1360,12 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
auto uuid2 = CollectionUUID::gen();
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "insert");
-
std::vector<InsertStatement> inserts1;
- inserts1.emplace_back(0,
- BSON("_id" << 0 << "data"
- << "x"));
- inserts1.emplace_back(1,
- BSON("_id" << 1 << "data"
- << "y"));
+ inserts1.emplace_back(0, BSON("_id" << 0));
+ inserts1.emplace_back(1, BSON("_id" << 1));
std::vector<InsertStatement> inserts2;
- inserts2.emplace_back(0,
- BSON("_id" << 2 << "data"
- << "z"));
- inserts2.emplace_back(1,
- BSON("_id" << 3 << "data"
- << "w"));
+ inserts2.emplace_back(0, BSON("_id" << 2));
+ inserts2.emplace_back(1, BSON("_id" << 3));
WriteUnitOfWork wuow(opCtx());
AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
@@ -1390,49 +1381,63 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- if (expectedStmtId++ < 4) {
- ASSERT_EQ("i", oplogEntryObj["op"].String());
- ASSERT(oplogEntry.getInTxn());
- } else {
- ASSERT_EQ("admin.$cmd"_sd, oplogEntryObj["ns"].String());
- ASSERT_EQ("c", oplogEntryObj["op"].String());
- }
ASSERT(!oplogEntry.getPrepare());
- ASSERT_FALSE(oplogEntryObj.hasField("prepare"));
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ expectedStmtId++;
}
- ASSERT_EQ(nss1, oplogEntries[0].getNss());
- ASSERT_EQ(uuid1, *oplogEntries[0].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("_id" << 0 << "data"
- << "x"),
- oplogEntries[0].getObject());
- ASSERT_FALSE(oplogEntries[0].getObject2());
-
- ASSERT_EQ(nss1, oplogEntries[1].getNss());
- ASSERT_EQ(uuid1, *oplogEntries[1].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "data"
- << "y"),
- oplogEntries[1].getObject());
- ASSERT_FALSE(oplogEntries[1].getObject2());
-
- ASSERT_EQ(nss2, oplogEntries[2].getNss());
- ASSERT_EQ(uuid2, *oplogEntries[2].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("_id" << 2 << "data"
- << "z"),
- oplogEntries[2].getObject());
- ASSERT_FALSE(oplogEntries[2].getObject2());
-
- ASSERT_EQ(nss2, oplogEntries[3].getNss());
- ASSERT_EQ(uuid2, *oplogEntries[3].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("_id" << 3 << "data"
- << "w"),
- oplogEntries[3].getObject());
- ASSERT_FALSE(oplogEntries[3].getObject2());
- ASSERT_BSONOBJ_EQ(BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4),
- oplogEntries[4].getObject());
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 0)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 1)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 2)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 3)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[3].getObject());
+
+ oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 4);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[4].getObject());
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
@@ -1476,37 +1481,46 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- if (expectedStmtId++ < 2) {
- ASSERT_EQ("u", oplogEntryObj["op"].String());
- ASSERT(oplogEntry.getInTxn());
- } else {
- ASSERT_EQ("admin.$cmd"_sd, oplogEntryObj["ns"].String());
- ASSERT_EQ("c", oplogEntryObj["op"].String());
- }
ASSERT(!oplogEntry.getPrepare());
- ASSERT_FALSE(oplogEntryObj.hasField("prepare"));
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ expectedStmtId++;
}
- ASSERT_EQ(nss1, oplogEntries[0].getNss());
- ASSERT_EQ(uuid1, *oplogEntries[0].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("$set" << BSON("data"
- << "x")),
- oplogEntries[0].getObject());
- ASSERT_TRUE(oplogEntries[0].getObject2());
- ASSERT_BSONOBJ_EQ(*oplogEntries[0].getObject2(), BSON("_id" << 0));
-
- ASSERT_EQ(nss2, oplogEntries[1].getNss());
- ASSERT_EQ(uuid2, *oplogEntries[1].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("$set" << BSON("data"
- << "y")),
- oplogEntries[1].getObject());
- ASSERT_TRUE(oplogEntries[1].getObject2());
- ASSERT_BSONOBJ_EQ(*oplogEntries[1].getObject2(), BSON("_id" << 1));
- ASSERT_BSONOBJ_EQ(BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2),
- oplogEntries[2].getObject());
+
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("$set" << BSON("data"
+ << "x"))
+ << "o2"
+ << BSON("_id" << 0)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("$set" << BSON("data"
+ << "y"))
+ << "o2"
+ << BSON("_id" << 1)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) {
@@ -1541,33 +1555,41 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) {
checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- if (expectedStmtId++ < 2) {
- ASSERT_EQ("d", oplogEntryObj["op"].String());
- ASSERT(oplogEntry.getInTxn());
- } else {
- ASSERT_EQ("admin.$cmd"_sd, oplogEntryObj["ns"].String());
- ASSERT_EQ("c", oplogEntryObj["op"].String());
- }
ASSERT(!oplogEntry.getPrepare());
- ASSERT_FALSE(oplogEntryObj.hasField("prepare"));
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ expectedStmtId++;
}
- ASSERT_EQ(nss1, oplogEntries[0].getNss());
- ASSERT_EQ(uuid1, *oplogEntries[0].getUuid());
- ASSERT_BSONOBJ_EQ(oplogEntries[0].getObject(), BSON("_id" << 0));
- ASSERT_FALSE(oplogEntries[0].getObject2());
-
- ASSERT_EQ(nss2, oplogEntries[1].getNss());
- ASSERT_EQ(uuid2, *oplogEntries[1].getUuid());
- ASSERT_BSONOBJ_EQ(oplogEntries[1].getObject(), BSON("_id" << 1));
- ASSERT_FALSE(oplogEntries[1].getObject2());
- ASSERT_BSONOBJ_EQ(BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2),
- oplogEntries[2].getObject());
-}
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 0)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected = oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 1)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ oExpected = BSON("commitTransaction" << 1 << "prepared" << false << "count" << 2);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
+}
TEST_F(OpObserverMultiEntryTransactionTest,
PreparingEmptyTransactionOnlyWritesPrepareOplogEntryAndToTransactionTable) {
@@ -1633,47 +1655,68 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- if (expectedStmtId++ < 4) {
- ASSERT_TRUE(oplogEntry.isCrudOpType());
- ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kInsert);
- ASSERT_TRUE(oplogEntry.getInTxn());
- } else {
- ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString());
- ASSERT_TRUE(oplogEntry.isCommand());
- ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction ==
- oplogEntry.getCommandType());
- }
ASSERT(!oplogEntry.getPrepare());
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ expectedStmtId++;
}
- ASSERT_EQ(nss1, oplogEntries[0].getNss());
- ASSERT_EQ(uuid1, *oplogEntries[0].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("_id" << 0), oplogEntries[0].getObject());
- ASSERT_FALSE(oplogEntries[0].getObject2());
-
- ASSERT_EQ(nss1, oplogEntries[1].getNss());
- ASSERT_EQ(uuid1, *oplogEntries[1].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("_id" << 1), oplogEntries[1].getObject());
- ASSERT_FALSE(oplogEntries[1].getObject2());
-
- ASSERT_EQ(nss2, oplogEntries[2].getNss());
- ASSERT_EQ(uuid2, *oplogEntries[2].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("_id" << 2), oplogEntries[2].getObject());
- ASSERT_FALSE(oplogEntries[2].getObject2());
-
- ASSERT_EQ(nss2, oplogEntries[3].getNss());
- ASSERT_EQ(uuid2, *oplogEntries[3].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("_id" << 3), oplogEntries[3].getObject());
- ASSERT_FALSE(oplogEntries[3].getObject2());
- ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
- ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[4].getObject());
- ASSERT_FALSE(oplogEntries[4].getObject2());
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 0)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 1)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 2)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 3)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[3].getObject());
+
+ oExpected = BSON("prepareTransaction" << 1);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[4].getObject());
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+
txnParticipant.stashTransactionResources(opCtx());
assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
@@ -1726,42 +1769,48 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- if (expectedStmtId++ < 2) {
- ASSERT_TRUE(oplogEntry.isCrudOpType());
- ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kUpdate);
- ASSERT_TRUE(oplogEntry.getInTxn());
- } else {
- ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString());
- ASSERT_TRUE(oplogEntry.isCommand());
- ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction ==
- oplogEntry.getCommandType());
- }
ASSERT(!oplogEntry.getPrepare());
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ expectedStmtId++;
}
- ASSERT_EQ(nss1, oplogEntries[0].getNss());
- ASSERT_EQ(uuid1, *oplogEntries[0].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("$set" << BSON("data"
- << "x")),
- oplogEntries[0].getObject());
- ASSERT_TRUE(oplogEntries[0].getObject2());
- ASSERT_BSONOBJ_EQ(*oplogEntries[0].getObject2(), BSON("_id" << 0));
-
- ASSERT_EQ(nss2, oplogEntries[1].getNss());
- ASSERT_EQ(uuid2, *oplogEntries[1].getUuid());
- ASSERT_BSONOBJ_EQ(BSON("$set" << BSON("data"
- << "y")),
- oplogEntries[1].getObject());
- ASSERT_TRUE(oplogEntries[1].getObject2());
- ASSERT_BSONOBJ_EQ(*oplogEntries[1].getObject2(), BSON("_id" << 1));
- ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
- ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject());
- ASSERT_FALSE(oplogEntries[2].getObject2());
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("$set" << BSON("data"
+ << "x"))
+ << "o2"
+ << BSON("_id" << 0)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("$set" << BSON("data"
+ << "y"))
+ << "o2"
+ << BSON("_id" << 1)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ oExpected = BSON("prepareTransaction" << 1);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
txnParticipant.stashTransactionResources(opCtx());
@@ -1808,36 +1857,42 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
checkSessionAndTransactionFields(oplogEntryObj, expectedStmtId);
oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
const auto& oplogEntry = oplogEntries.back();
- if (expectedStmtId++ < 2) {
- ASSERT_TRUE(oplogEntry.isCrudOpType());
- ASSERT(oplogEntry.getOpType() == repl::OpTypeEnum::kDelete);
- ASSERT_TRUE(oplogEntry.getInTxn());
- } else {
- ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getNss().toString());
- ASSERT_TRUE(oplogEntry.isCommand());
- ASSERT_TRUE(OplogEntry::CommandType::kPrepareTransaction ==
- oplogEntry.getCommandType());
- }
ASSERT(!oplogEntry.getPrepare());
ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ expectedStmtId++;
}
- ASSERT_EQ(nss1, oplogEntries[0].getNss());
- ASSERT_EQ(uuid1, *oplogEntries[0].getUuid());
- ASSERT_BSONOBJ_EQ(oplogEntries[0].getObject(), BSON("_id" << 0));
- ASSERT_FALSE(oplogEntries[0].getObject2());
- ASSERT_EQ(nss2, oplogEntries[1].getNss());
- ASSERT_EQ(uuid2, *oplogEntries[1].getUuid());
- ASSERT_BSONOBJ_EQ(oplogEntries[1].getObject(), BSON("_id" << 1));
- ASSERT_FALSE(oplogEntries[1].getObject2());
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns"
+ << nss1.toString()
+ << "ui"
+ << uuid1
+ << "o"
+ << BSON("_id" << 0)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns"
+ << nss2.toString()
+ << "ui"
+ << uuid2
+ << "o"
+ << BSON("_id" << 1)))
+ << "partialTxn"
+ << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ oExpected = BSON("prepareTransaction" << 1);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
- ASSERT_BSONOBJ_EQ(BSON("prepareTransaction" << 1), oplogEntries[2].getObject());
- ASSERT_FALSE(oplogEntries[2].getObject2());
-
ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
txnParticipant.stashTransactionResources(opCtx());
assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
@@ -1871,7 +1926,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
- ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand);
+ ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps);
const auto startOpTime = insertEntry.getOpTime();
@@ -1947,7 +2003,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
- ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand);
+ ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps);
const auto startOpTime = insertEntry.getOpTime();
const auto prepareTimestamp = prepareOpTime.getTimestamp();
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index db7a05df568..0bf542ea04b 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -811,7 +811,8 @@ Status RollbackImpl::_processRollbackOp(OperationContext* opCtx, const OplogEntr
invariant(iter.hasNext());
const auto prepareOplogEntry = iter.next(opCtx);
- if (prepareOplogEntry.getCommandType() == OplogEntry::CommandType::kApplyOps) {
+ if (prepareOplogEntry.getCommandType() == OplogEntry::CommandType::kApplyOps &&
+ prepareOplogEntry.shouldPrepare()) {
// Transform the prepare command into a normal applyOps command. If the
// "prepare" field were not removed, the operation would be ignored.
const auto swApplyOpsEntry =
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 6d6a9dc8d8f..d0c1139c437 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -2889,10 +2889,18 @@ public:
assertOldestActiveTxnTimestampEquals(boost::none, nullTs);
// first oplog entry should exist at firstOplogEntryTs and after it.
- const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op"
- << "i"
- << "o"
- << doc);
+ const auto firstOplogEntryFilter =
+ BSON("ts" << firstOplogEntryTs << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss.ns()
+ << "ui"
+ << coll->uuid().get()
+ << "o"
+ << doc))
+ << "partialTxn"
+ << true));
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true);
@@ -2901,10 +2909,18 @@ public:
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true);
// second oplog entry should exist at secondOplogEntryTs and after it.
- const auto secondOplogEntryFilter = BSON("ts" << secondOplogEntryTs << "op"
- << "i"
- << "o"
- << doc2);
+ const auto secondOplogEntryFilter =
+ BSON("ts" << secondOplogEntryTs << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss.ns()
+ << "ui"
+ << coll->uuid().get()
+ << "o"
+ << doc2))
+ << "partialTxn"
+ << true));
assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, presentTs, false);
assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, beforeTxnTs, false);
assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, firstOplogEntryTs, false);
@@ -3082,10 +3098,18 @@ public:
assertOplogDocumentExistsAtTimestamp(commitFilter, nullTs, true);
// The first oplog entry should exist at firstOplogEntryTs and onwards.
- const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op"
- << "i"
- << "o"
- << doc);
+ const auto firstOplogEntryFilter =
+ BSON("ts" << firstOplogEntryTs << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss.ns()
+ << "ui"
+ << coll->uuid().get()
+ << "o"
+ << doc))
+ << "partialTxn"
+ << true));
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true);
@@ -3093,12 +3117,19 @@ public:
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, prepareEntryTs, true);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, commitEntryTs, true);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, nullTs, true);
-
// The second oplog entry should exist at secondOplogEntryTs and onwards.
- const auto secondOplogEntryFilter = BSON("ts" << secondOplogEntryTs << "op"
- << "i"
- << "o"
- << doc2);
+ const auto secondOplogEntryFilter =
+ BSON("ts" << secondOplogEntryTs << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss.ns()
+ << "ui"
+ << coll->uuid().get()
+ << "o"
+ << doc2))
+ << "partialTxn"
+ << true));
assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, presentTs, false);
assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, beforeTxnTs, false);
assertOplogDocumentExistsAtTimestamp(secondOplogEntryFilter, firstOplogEntryTs, false);
@@ -3195,7 +3226,6 @@ public:
txnParticipant.abortActiveTransaction(_opCtx);
txnParticipant.stashTransactionResources(_opCtx);
- const BSONObj query1 = BSON("_id" << 1);
{
// The prepare oplog entry should exist at prepareEntryTs and onwards.
assertOplogDocumentExistsAtTimestamp(prepareFilter, presentTs, false);
@@ -3211,11 +3241,26 @@ public:
assertOplogDocumentExistsAtTimestamp(abortFilter, abortEntryTs, true);
assertOplogDocumentExistsAtTimestamp(abortFilter, nullTs, true);
+ UUID ui = UUID::gen();
+ {
+ AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX);
+ auto coll = autoColl.getCollection();
+ ASSERT(coll);
+ ui = coll->uuid().get();
+ }
+
// The first oplog entry should exist at firstOplogEntryTs and onwards.
- const auto firstOplogEntryFilter = BSON("ts" << firstOplogEntryTs << "op"
- << "i"
- << "o"
- << doc);
+ const auto firstOplogEntryFilter = BSON(
+ "ts" << firstOplogEntryTs << "o" << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns"
+ << nss.ns()
+ << "ui"
+ << ui
+ << "o"
+ << doc))
+ << "partialTxn"
+ << true));
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, presentTs, false);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, beforeTxnTs, false);
assertOplogDocumentExistsAtTimestamp(firstOplogEntryFilter, firstOplogEntryTs, true);