summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2019-05-01 15:12:45 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2019-05-06 11:44:40 -0400
commit74fbaeaa74a1e60f0a0e69c57f2ca2f3f96474da (patch)
treefb10a48c70890b4f7a791e122b8a7eff85890af8 /src/mongo
parent9b740facbac92320ed96ecb8ef169deb535f96cd (diff)
downloadmongo-74fbaeaa74a1e60f0a0e69c57f2ca2f3f96474da.tar.gz
SERVER-39797 Rollback implicit commit applyOps oplog entry in rollback-via-refetch
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp69
-rw-r--r--src/mongo/db/repl/rs_rollback.h4
-rw-r--r--src/mongo/db/repl/rs_rollback_test.cpp387
3 files changed, 416 insertions, 44 deletions
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index be2ddd5d13e..4d90bba29cc 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -218,7 +218,9 @@ Status FixUpInfo::recordDropTargetInfo(const BSONElement& dropTarget,
return Status::OK();
}
-Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo,
+Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(OperationContext* opCtx,
+ const OplogInterface& localOplog,
+ FixUpInfo& fixUpInfo,
const BSONObj& ourObj,
bool isNestedApplyOpsCommand) {
@@ -539,30 +541,53 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf
// }]
// }
// }
- if (first.type() != Array) {
- std::string message = str::stream()
- << "Expected applyOps argument to be an array; found " << redact(first);
- severe() << message;
- return Status(ErrorCodes::UnrecoverableRollbackError, message);
- }
- for (const auto& subopElement : first.Array()) {
- if (subopElement.type() != Object) {
+ // Additionally, for transactions, applyOps entries may be linked by their
+ // previousTransactionOpTimes. For those, we need to walk the chain and get to
+ // all the entries. We don't worry about the order that we walk the entries.
+ auto operations = first;
+ auto prevWriteOpTime = oplogEntry.getPrevWriteOpTimeInTransaction();
+ auto txnHistoryIter = prevWriteOpTime
+ ? localOplog.makeTransactionHistoryIterator(*prevWriteOpTime)
+ : nullptr;
+ do {
+ if (operations.type() != Array) {
std::string message = str::stream()
- << "Expected applyOps operations to be of Object type, but found "
- << redact(subopElement);
+ << "Expected applyOps argument to be an array; found "
+ << redact(operations);
severe() << message;
return Status(ErrorCodes::UnrecoverableRollbackError, message);
}
- // In applyOps, the object contains an array of different oplog entries, we call
- // updateFixUpInfoFromLocalOplogEntry here in order to record the information
- // needed for rollback that is contained within the applyOps, creating a nested
- // call.
- auto subStatus =
- updateFixUpInfoFromLocalOplogEntry(fixUpInfo, subopElement.Obj(), true);
- if (!subStatus.isOK()) {
- return subStatus;
+ for (const auto& subopElement : operations.Array()) {
+ if (subopElement.type() != Object) {
+ std::string message = str::stream()
+ << "Expected applyOps operations to be of Object type, but found "
+ << redact(subopElement);
+ severe() << message;
+ return Status(ErrorCodes::UnrecoverableRollbackError, message);
+ }
+ // In applyOps, the object contains an array of different oplog entries, we
+ // call
+ // updateFixUpInfoFromLocalOplogEntry here in order to record the
+ // information
+ // needed for rollback that is contained within the applyOps, creating a
+ // nested
+ // call.
+ auto subStatus = updateFixUpInfoFromLocalOplogEntry(
+ opCtx, localOplog, fixUpInfo, subopElement.Obj(), true);
+ if (!subStatus.isOK()) {
+ return subStatus;
+ }
}
- }
+ if (!txnHistoryIter || !txnHistoryIter->hasNext())
+ break;
+ try {
+ auto nextApplyOps = txnHistoryIter->next(opCtx);
+ operations = nextApplyOps.getObject().firstElement();
+ } catch (const DBException& ex) {
+ // If we can't get the full transaction history, we can't roll back;
+ return {ErrorCodes::UnrecoverableRollbackError, ex.reason()};
+ }
+ } while (1);
return Status::OK();
}
case OplogEntry::CommandType::kAbortTransaction: {
@@ -926,8 +951,8 @@ Status _syncRollback(OperationContext* opCtx,
log() << "Finding the Common Point";
try {
- auto processOperationForFixUp = [&how](const BSONObj& operation) {
- return updateFixUpInfoFromLocalOplogEntry(how, operation, false);
+ auto processOperationForFixUp = [&how, &opCtx, &localOplog](const BSONObj& operation) {
+ return updateFixUpInfoFromLocalOplogEntry(opCtx, localOplog, how, operation, false);
};
// Calls syncRollBackLocalOperations to run updateFixUpInfoFromLocalOplogEntry
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index da364d6ef3e..619327ff3eb 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -360,7 +360,9 @@ private:
* rolling back node from after the common point. "ourObj" is the oplog document that needs
* to be reverted.
*/
-Status updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo,
+Status updateFixUpInfoFromLocalOplogEntry(OperationContext* opCtx,
+ const OplogInterface& localOplog,
+ FixUpInfo& fixUpInfo,
const BSONObj& ourObj,
bool isNestedApplyOpsCommand);
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index b80c756aac5..acf1bf40d8b 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -1985,7 +1985,8 @@ TEST(RSRollbackTest, LocalEntryWithoutNsIsFatal) {
<< "o"
<< BSON("_id" << 1 << "a" << 1));
FixUpInfo fui;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false));
const auto invalidOplogEntry = BSON("op"
<< "i"
<< "ui"
@@ -1998,7 +1999,8 @@ TEST(RSRollbackTest, LocalEntryWithoutNsIsFatal) {
<< ""
<< "o"
<< BSON("_id" << 1 << "a" << 1));
- ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false),
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, invalidOplogEntry, false),
RSFatalException);
}
@@ -2016,7 +2018,8 @@ TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) {
<< "o"
<< BSON("_id" << 1 << "a" << 1));
FixUpInfo fui;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false));
const auto invalidOplogEntry = BSON("op"
<< "i"
<< "ui"
@@ -2029,7 +2032,8 @@ TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) {
<< "test.t"
<< "o"
<< BSONObj());
- ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false),
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, invalidOplogEntry, false),
RSFatalException);
}
@@ -2049,7 +2053,8 @@ TEST(RSRollbackTest, LocalUpdateEntryWithoutO2IsFatal) {
<< "o2"
<< BSON("_id" << 1));
FixUpInfo fui;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false));
const auto invalidOplogEntry = BSON("op"
<< "u"
<< "ui"
@@ -2062,7 +2067,8 @@ TEST(RSRollbackTest, LocalUpdateEntryWithoutO2IsFatal) {
<< "test.t"
<< "o"
<< BSON("_id" << 1 << "a" << 1));
- ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false),
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, invalidOplogEntry, false),
RSFatalException);
}
@@ -2082,7 +2088,8 @@ TEST(RSRollbackTest, LocalUpdateEntryWithEmptyO2IsFatal) {
<< "o2"
<< BSON("_id" << 1));
FixUpInfo fui;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false));
const auto invalidOplogEntry = BSON("op"
<< "u"
<< "ui"
@@ -2097,7 +2104,8 @@ TEST(RSRollbackTest, LocalUpdateEntryWithEmptyO2IsFatal) {
<< BSON("_id" << 1 << "a" << 1)
<< "o2"
<< BSONObj());
- ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false),
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, invalidOplogEntry, false),
RSFatalException);
}
@@ -2111,14 +2119,17 @@ DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutSessionIdIsFatal, "in
<< "o"
<< BSON("_id" << 1 << "a" << 1));
FixUpInfo fui;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false));
const auto txnNumber = BSON("txnNumber" << 1LL);
const auto noSessionIdOrStmtId = validOplogEntry.addField(txnNumber.firstElement());
const auto stmtId = BSON("stmtId" << 1);
const auto noSessionId = noSessionIdOrStmtId.addField(stmtId.firstElement());
- ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, noSessionId, false), RSFatalException);
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, noSessionId, false),
+ RSFatalException);
}
DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutStmtIdIsFatal, "invariant") {
@@ -2131,7 +2142,8 @@ DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutStmtIdIsFatal, "invar
<< "o"
<< BSON("_id" << 1 << "a" << 1));
FixUpInfo fui;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false));
const auto txnNumber = BSON("txnNumber" << 1LL);
const auto noSessionIdOrStmtId = validOplogEntry.addField(txnNumber.firstElement());
@@ -2139,7 +2151,9 @@ DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutStmtIdIsFatal, "invar
const auto lsid = makeLogicalSessionIdForTest();
const auto sessionId = BSON("lsid" << lsid.toBSON());
const auto noStmtId = noSessionIdOrStmtId.addField(sessionId.firstElement());
- ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, noStmtId, false), RSFatalException);
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, noStmtId, false),
+ RSFatalException);
}
TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutTxnTableUUIDIsFatal) {
@@ -2162,7 +2176,8 @@ TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutTxnTableUUIDIsFatal) {
<< lsid.toBSON());
FixUpInfo fui;
- ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false),
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false),
RSFatalException);
}
@@ -2179,7 +2194,8 @@ TEST_F(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetch
<< "o"
<< BSON("_id" << 2 << "a" << 2));
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithoutTxnNumber, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithoutTxnNumber, false));
ASSERT_EQ(fui.docsToRefetch.size(), 1U);
// If txnNumber is present, and the transaction table exists and has a UUID, the session
@@ -2204,7 +2220,8 @@ TEST_F(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetch
UUID transactionTableUUID = UUID::gen();
fui.transactionTableUUID = transactionTableUUID;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false));
ASSERT_EQ(fui.docsToRefetch.size(), 3U);
auto expectedObj = BSON("_id" << lsid.toBSON());
@@ -2246,7 +2263,8 @@ TEST_F(RSRollbackTest, LocalEntryWithPartialTxnAddsTransactionTableDocToBeRefetc
UUID transactionTableUUID = UUID::gen();
fui.transactionTableUUID = transactionTableUUID;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false));
ASSERT_EQ(fui.docsToRefetch.size(), 1U);
auto expectedObj = BSON("_id" << lsid.toBSON());
@@ -2278,7 +2296,8 @@ TEST_F(RSRollbackTest, LocalAbortTxnRefetchesTransactionTableEntry) {
UUID transactionTableUUID = UUID::gen();
fui.transactionTableUUID = transactionTableUUID;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, abortTxnEntry, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, abortTxnEntry, false));
ASSERT_EQ(fui.docsToRefetch.size(), 1U);
auto expectedObj = BSON("_id" << lsid.toBSON());
@@ -2286,7 +2305,7 @@ TEST_F(RSRollbackTest, LocalAbortTxnRefetchesTransactionTableEntry) {
ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end());
}
-TEST_F(RSRollbackTest, LocalEntryWithAbortedInTxnRefetchesOnlyTransactionTableEntry) {
+TEST_F(RSRollbackTest, LocalEntryWithAbortedPartialTxnRefetchesOnlyTransactionTableEntry) {
FixUpInfo fui;
// If txnNumber is present, and the transaction table exists and has a UUID, the session
@@ -2335,8 +2354,10 @@ TEST_F(RSRollbackTest, LocalEntryWithAbortedInTxnRefetchesOnlyTransactionTableEn
UUID transactionTableUUID = UUID::gen();
fui.transactionTableUUID = transactionTableUUID;
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, abortTxnEntry, false));
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, abortTxnEntry, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false));
ASSERT_EQ(fui.docsToRefetch.size(), 1U);
auto expectedObj = BSON("_id" << lsid.toBSON());
@@ -2344,6 +2365,329 @@ TEST_F(RSRollbackTest, LocalEntryWithAbortedInTxnRefetchesOnlyTransactionTableEn
ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end());
}
+TEST_F(RSRollbackTest, LocalEntryWithCommittedTxnRefetchesDocsAndTransactionTableEntry) {
+ FixUpInfo fui;
+ UUID uuid = UUID::gen();
+ auto lsid = makeLogicalSessionIdForTest();
+ auto commitTxnEntry =
+ BSON("ts" << Timestamp(Seconds(1), 2) << "t" << 1LL << "op"
+ << "c"
+ << "ns"
+ << "admin.$cmd"
+ << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ui"
+ << uuid
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 2 << "a" << 2)))
+ << "count"
+ << 2)
+ << "txnNumber"
+ << 1LL
+ << "stmtId"
+ << 2
+ << "lsid"
+ << lsid.toBSON()
+ << "prevOpTime"
+ << BSON("ts" << Timestamp(Seconds(1), 1) << "t" << 1LL));
+ auto commitTxnOperation = std::make_pair(commitTxnEntry, RecordId(2));
+
+ auto partialTxnEntry =
+ BSON("ts" << Timestamp(Seconds(1), 1) << "t" << 1LL << "op"
+ << "c"
+ << "ns"
+ << "admin.$cmd"
+ << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ui"
+ << uuid
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 1 << "a" << 1)))
+ << "partialTxn"
+ << true)
+ << "txnNumber"
+ << 1LL
+ << "stmtId"
+ << 1
+ << "lsid"
+ << lsid.toBSON()
+ << "prevOpTime"
+ << BSON("ts" << Timestamp(0, 0) << "t" << -1LL));
+
+ auto partialTxnOperation = std::make_pair(partialTxnEntry, RecordId(1));
+ UUID transactionTableUUID = UUID::gen();
+ fui.transactionTableUUID = transactionTableUUID;
+
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */,
+ OplogInterfaceMock({commitTxnOperation, partialTxnOperation}),
+ fui,
+ commitTxnEntry,
+ false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */,
+ OplogInterfaceMock({commitTxnOperation, partialTxnOperation}),
+ fui,
+ partialTxnEntry,
+ false));
+ ASSERT_EQ(fui.docsToRefetch.size(), 3U);
+
+ auto expectedObj = BSON("_id" << lsid.toBSON());
+ DocID expectedTxnDoc(expectedObj, expectedObj.firstElement(), transactionTableUUID);
+ ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end());
+
+ auto expectedCrudObj = BSON("_id" << 2);
+ auto expectedCrudDoc = DocID(expectedObj, expectedObj.firstElement(), transactionTableUUID);
+ ASSERT_TRUE(fui.docsToRefetch.find(expectedCrudDoc) != fui.docsToRefetch.end());
+
+ expectedCrudObj = BSON("_id" << 1);
+ expectedCrudDoc = DocID(expectedObj, expectedObj.firstElement(), transactionTableUUID);
+ ASSERT_TRUE(fui.docsToRefetch.find(expectedCrudDoc) != fui.docsToRefetch.end());
+}
+
+TEST_F(RSRollbackTest, RollbackFetchesTransactionOperationBeforeCommonPoint) {
+ createOplog(_opCtx.get());
+ CollectionOptions options;
+ options.uuid = UUID::gen();
+ auto coll = _createCollection(_opCtx.get(), "test.t", options);
+ options.uuid = UUID::gen();
+ auto txnTable = _createCollection(_opCtx.get(), "config.transactions", options);
+
+ auto commonOperation = makeOpAndRecordId(10);
+ UUID uuid = *coll->uuid();
+ auto lsid = makeLogicalSessionIdForTest();
+ auto commitTxnEntry =
+ BSON("ts" << Timestamp(Seconds(10), 12) << "t" << 10LL << "op"
+ << "c"
+ << "ns"
+ << "admin.$cmd"
+ << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ui"
+ << uuid
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 0 << "a" << 0)))
+ << "count"
+ << 3)
+ << "txnNumber"
+ << 1LL
+ << "stmtId"
+ << 3
+ << "lsid"
+ << lsid.toBSON()
+ << "prevOpTime"
+ << BSON("ts" << Timestamp(Seconds(10), 11) << "t" << 10LL));
+ auto commitTxnOperation = std::make_pair(commitTxnEntry, RecordId(12));
+
+ auto entryAfterCommonPoint =
+ BSON("ts" << Timestamp(Seconds(10), 11) << "t" << 10LL << "op"
+ << "c"
+ << "ns"
+ << "admin.$cmd"
+ << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ui"
+ << uuid
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 1 << "a" << 1)))
+ << "partialTxn"
+ << true)
+ << "txnNumber"
+ << 1LL
+ << "stmtId"
+ << 2
+ << "lsid"
+ << lsid.toBSON()
+ << "prevOpTime"
+ << BSON("ts" << Timestamp(Seconds(10), 9) << "t" << 10LL));
+ auto operationAfterCommonPoint = std::make_pair(entryAfterCommonPoint, RecordId(11));
+
+ auto entryBeforeCommonPoint =
+ BSON("ts" << Timestamp(Seconds(10), 9) << "t" << 10LL << "op"
+ << "c"
+ << "ns"
+ << "admin.$cmd"
+ << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ui"
+ << uuid
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 2 << "a" << 2)))
+ << "partialTxn"
+ << true)
+ << "txnNumber"
+ << 1LL
+ << "stmtId"
+ << 1
+ << "lsid"
+ << lsid.toBSON()
+ << "prevOpTime"
+ << BSON("ts" << Timestamp(0, 0) << "t" << -1LL));
+ auto operationBeforeCommonPoint = std::make_pair(entryBeforeCommonPoint, RecordId(9));
+
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog, const UUID& txnTableUuid)
+ : RollbackSourceMock(std::move(oplog)), _txnTableUuid(txnTableUuid) {}
+
+ std::pair<BSONObj, NamespaceString> findOneByUUID(const std::string& db,
+ UUID uuid,
+ const BSONObj& filter) const override {
+ int numFields = 0;
+ if (uuid == _txnTableUuid) {
+ // This unit test does not test transaction table fetches.
+ return {BSONObj(), NamespaceString::kSessionTransactionsTableNamespace};
+ }
+ for (const auto element : filter) {
+ ++numFields;
+ ASSERT_EQUALS("_id", element.fieldNameStringData()) << filter;
+ }
+ ASSERT_EQUALS(1, numFields) << filter;
+ searchedIds.insert(filter.firstElement().numberInt());
+ switch (filter.firstElement().numberInt()) {
+ case 0:
+ return {BSON("_id" << 0 << "v" << 0), NamespaceString()};
+ case 1:
+ return {BSON("_id" << 1 << "v" << 1), NamespaceString()};
+ case 2:
+ return {BSON("_id" << 2 << "v" << 3), NamespaceString()};
+ }
+ FAIL("Unexpected findOne request") << filter;
+ return {}; // Unreachable; why doesn't compiler know?
+ }
+
+ mutable std::multiset<int> searchedIds;
+
+ private:
+ UUID _txnTableUuid;
+
+ } rollbackSource(std::unique_ptr<OplogInterface>(
+ new OplogInterfaceMock({commonOperation, operationBeforeCommonPoint})),
+ *txnTable->uuid());
+
+ ASSERT_OK(syncRollback(_opCtx.get(),
+ OplogInterfaceMock({commitTxnOperation,
+ operationAfterCommonPoint,
+ commonOperation,
+ operationBeforeCommonPoint}),
+ rollbackSource,
+ {},
+ _coordinator,
+ _replicationProcess.get()));
+ ASSERT_EQUALS(3U, rollbackSource.searchedIds.size());
+ ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(0));
+ ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1));
+ ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2));
+}
+
+TEST_F(RSRollbackTest, RollbackIncompleteTransactionReturnsUnrecoverableRollbackError) {
+ createOplog(_opCtx.get());
+ CollectionOptions options;
+ options.uuid = UUID::gen();
+ auto coll = _createCollection(_opCtx.get(), "test.t", options);
+ options.uuid = UUID::gen();
+ auto txnTable = _createCollection(_opCtx.get(), "config.transactions", options);
+
+ auto commonOperation = makeOpAndRecordId(10);
+ UUID uuid = *coll->uuid();
+ auto lsid = makeLogicalSessionIdForTest();
+ auto commitTxnEntry =
+ BSON("ts" << Timestamp(Seconds(10), 12) << "t" << 10LL << "op"
+ << "c"
+ << "ns"
+ << "admin.$cmd"
+ << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ui"
+ << uuid
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 0 << "a" << 0)))
+ << "count"
+ << 3)
+ << "stmtId"
+ << 3
+ << "lsid"
+ << lsid.toBSON()
+ << "prevOpTime"
+ << BSON("ts" << Timestamp(Seconds(10), 11) << "t" << 10LL));
+ auto commitTxnOperation = std::make_pair(commitTxnEntry, RecordId(12));
+
+ auto entryAfterCommonPoint =
+ BSON("ts" << Timestamp(Seconds(10), 11) << "t" << 10LL << "op"
+ << "c"
+ << "ns"
+ << "admin.$cmd"
+ << "o"
+ << BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ui"
+ << uuid
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 1 << "a" << 1)))
+ << "partialTxn"
+ << true)
+ << "txnNumber"
+ << 1LL
+ << "stmtId"
+ << 2
+ << "lsid"
+ << lsid.toBSON()
+ << "prevOpTime"
+ << BSON("ts" << Timestamp(Seconds(10), 9) << "t" << 10LL));
+ auto operationAfterCommonPoint = std::make_pair(entryAfterCommonPoint, RecordId(11));
+
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog, const UUID& txnTableUuid)
+ : RollbackSourceMock(std::move(oplog)), _txnTableUuid(txnTableUuid) {}
+
+ std::pair<BSONObj, NamespaceString> findOneByUUID(const std::string& db,
+ UUID uuid,
+ const BSONObj& filter) const override {
+ if (uuid == _txnTableUuid) {
+ // This unit test does not test transaction table fetches.
+ return {BSONObj(), NamespaceString::kSessionTransactionsTableNamespace};
+ } else {
+ return {BSONObj(), NamespaceString()};
+ }
+ }
+
+ private:
+ UUID _txnTableUuid;
+ } rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({commonOperation})),
+ *txnTable->uuid());
+
+
+ auto status = syncRollback(
+ _opCtx.get(),
+ OplogInterfaceMock({commitTxnOperation, operationAfterCommonPoint, commonOperation}),
+ rollbackSource,
+ {},
+ _coordinator,
+ _replicationProcess.get());
+ ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
+}
+
TEST_F(RSRollbackTest, RollbackFailsIfTransactionDocumentRefetchReturnsDifferentNamespace) {
createOplog(_opCtx.get());
@@ -2376,7 +2720,8 @@ TEST_F(RSRollbackTest, RollbackFailsIfTransactionDocumentRefetchReturnsDifferent
fui.rbid = 1;
// The FixUpInfo will have an extra doc to refetch: the corresponding transaction table entry.
- ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false));
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(
+ nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false));
ASSERT_EQ(fui.docsToRefetch.size(), 2U);
{