diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2019-05-01 15:12:45 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2019-05-06 11:44:40 -0400 |
commit | 74fbaeaa74a1e60f0a0e69c57f2ca2f3f96474da (patch) | |
tree | fb10a48c70890b4f7a791e122b8a7eff85890af8 /src/mongo | |
parent | 9b740facbac92320ed96ecb8ef169deb535f96cd (diff) | |
download | mongo-74fbaeaa74a1e60f0a0e69c57f2ca2f3f96474da.tar.gz |
SERVER-39797 Rollback implicit commit applyOps oplog entry in rollback-via-refetch
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 69 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback_test.cpp | 387 |
3 files changed, 416 insertions, 44 deletions
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index be2ddd5d13e..4d90bba29cc 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -218,7 +218,9 @@ Status FixUpInfo::recordDropTargetInfo(const BSONElement& dropTarget, return Status::OK(); } -Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo, +Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(OperationContext* opCtx, + const OplogInterface& localOplog, + FixUpInfo& fixUpInfo, const BSONObj& ourObj, bool isNestedApplyOpsCommand) { @@ -539,30 +541,53 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf // }] // } // } - if (first.type() != Array) { - std::string message = str::stream() - << "Expected applyOps argument to be an array; found " << redact(first); - severe() << message; - return Status(ErrorCodes::UnrecoverableRollbackError, message); - } - for (const auto& subopElement : first.Array()) { - if (subopElement.type() != Object) { + // Additionally, for transactions, applyOps entries may be linked by their + // previousTransactionOpTimes. For those, we need to walk the chain and get to + // all the entries. We don't worry about the order that we walk the entries. + auto operations = first; + auto prevWriteOpTime = oplogEntry.getPrevWriteOpTimeInTransaction(); + auto txnHistoryIter = prevWriteOpTime + ? localOplog.makeTransactionHistoryIterator(*prevWriteOpTime) + : nullptr; + do { + if (operations.type() != Array) { std::string message = str::stream() - << "Expected applyOps operations to be of Object type, but found " - << redact(subopElement); + << "Expected applyOps argument to be an array; found " + << redact(operations); severe() << message; return Status(ErrorCodes::UnrecoverableRollbackError, message); } - // In applyOps, the object contains an array of different oplog entries, we call - // updateFixUpInfoFromLocalOplogEntry here in order to record the information - // needed for rollback that is contained within the applyOps, creating a nested - // call. - auto subStatus = - updateFixUpInfoFromLocalOplogEntry(fixUpInfo, subopElement.Obj(), true); - if (!subStatus.isOK()) { - return subStatus; + for (const auto& subopElement : operations.Array()) { + if (subopElement.type() != Object) { + std::string message = str::stream() + << "Expected applyOps operations to be of Object type, but found " + << redact(subopElement); + severe() << message; + return Status(ErrorCodes::UnrecoverableRollbackError, message); + } + // In applyOps, the object contains an array of different oplog entries, we + // call + // updateFixUpInfoFromLocalOplogEntry here in order to record the + // information + // needed for rollback that is contained within the applyOps, creating a + // nested + // call. + auto subStatus = updateFixUpInfoFromLocalOplogEntry( + opCtx, localOplog, fixUpInfo, subopElement.Obj(), true); + if (!subStatus.isOK()) { + return subStatus; + } } - } + if (!txnHistoryIter || !txnHistoryIter->hasNext()) + break; + try { + auto nextApplyOps = txnHistoryIter->next(opCtx); + operations = nextApplyOps.getObject().firstElement(); + } catch (const DBException& ex) { + // If we can't get the full transaction history, we can't roll back; + return {ErrorCodes::UnrecoverableRollbackError, ex.reason()}; + } + } while (1); return Status::OK(); } case OplogEntry::CommandType::kAbortTransaction: { @@ -926,8 +951,8 @@ Status _syncRollback(OperationContext* opCtx, log() << "Finding the Common Point"; try { - auto processOperationForFixUp = [&how](const BSONObj& operation) { - return updateFixUpInfoFromLocalOplogEntry(how, operation, false); + auto processOperationForFixUp = [&how, &opCtx, &localOplog](const BSONObj& operation) { + return updateFixUpInfoFromLocalOplogEntry(opCtx, localOplog, how, operation, false); }; // Calls syncRollBackLocalOperations to run updateFixUpInfoFromLocalOplogEntry diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h index da364d6ef3e..619327ff3eb 100644 --- a/src/mongo/db/repl/rs_rollback.h +++ b/src/mongo/db/repl/rs_rollback.h @@ -360,7 +360,9 @@ private: * rolling back node from after the common point. "ourObj" is the oplog document that needs * to be reverted. */ -Status updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInfo, +Status updateFixUpInfoFromLocalOplogEntry(OperationContext* opCtx, + const OplogInterface& localOplog, + FixUpInfo& fixUpInfo, const BSONObj& ourObj, bool isNestedApplyOpsCommand); diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index b80c756aac5..acf1bf40d8b 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -1985,7 +1985,8 @@ TEST(RSRollbackTest, LocalEntryWithoutNsIsFatal) { << "o" << BSON("_id" << 1 << "a" << 1)); FixUpInfo fui; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false)); const auto invalidOplogEntry = BSON("op" << "i" << "ui" @@ -1998,7 +1999,8 @@ TEST(RSRollbackTest, LocalEntryWithoutNsIsFatal) { << "" << "o" << BSON("_id" << 1 << "a" << 1)); - ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false), + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, invalidOplogEntry, false), RSFatalException); } @@ -2016,7 +2018,8 @@ TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) { << "o" << BSON("_id" << 1 << "a" << 1)); FixUpInfo fui; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false)); const auto invalidOplogEntry = BSON("op" << "i" << "ui" @@ -2029,7 +2032,8 @@ TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) { << "test.t" << "o" << BSONObj()); - ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false), + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, invalidOplogEntry, false), RSFatalException); } @@ -2049,7 +2053,8 @@ TEST(RSRollbackTest, LocalUpdateEntryWithoutO2IsFatal) { << "o2" << BSON("_id" << 1)); FixUpInfo fui; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false)); const auto invalidOplogEntry = BSON("op" << "u" << "ui" @@ -2062,7 +2067,8 @@ TEST(RSRollbackTest, LocalUpdateEntryWithoutO2IsFatal) { << "test.t" << "o" << BSON("_id" << 1 << "a" << 1)); - ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false), + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, invalidOplogEntry, false), RSFatalException); } @@ -2082,7 +2088,8 @@ TEST(RSRollbackTest, LocalUpdateEntryWithEmptyO2IsFatal) { << "o2" << BSON("_id" << 1)); FixUpInfo fui; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false)); const auto invalidOplogEntry = BSON("op" << "u" << "ui" @@ -2097,7 +2104,8 @@ TEST(RSRollbackTest, LocalUpdateEntryWithEmptyO2IsFatal) { << BSON("_id" << 1 << "a" << 1) << "o2" << BSONObj()); - ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, invalidOplogEntry, false), + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, invalidOplogEntry, false), RSFatalException); } @@ -2111,14 +2119,17 @@ DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutSessionIdIsFatal, "in << "o" << BSON("_id" << 1 << "a" << 1)); FixUpInfo fui; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false)); const auto txnNumber = BSON("txnNumber" << 1LL); const auto noSessionIdOrStmtId = validOplogEntry.addField(txnNumber.firstElement()); const auto stmtId = BSON("stmtId" << 1); const auto noSessionId = noSessionIdOrStmtId.addField(stmtId.firstElement()); - ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, noSessionId, false), RSFatalException); + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, noSessionId, false), + RSFatalException); } DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutStmtIdIsFatal, "invariant") { @@ -2131,7 +2142,8 @@ DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutStmtIdIsFatal, "invar << "o" << BSON("_id" << 1 << "a" << 1)); FixUpInfo fui; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, validOplogEntry, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, validOplogEntry, false)); const auto txnNumber = BSON("txnNumber" << 1LL); const auto noSessionIdOrStmtId = validOplogEntry.addField(txnNumber.firstElement()); @@ -2139,7 +2151,9 @@ DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutStmtIdIsFatal, "invar const auto lsid = makeLogicalSessionIdForTest(); const auto sessionId = BSON("lsid" << lsid.toBSON()); const auto noStmtId = noSessionIdOrStmtId.addField(sessionId.firstElement()); - ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, noStmtId, false), RSFatalException); + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, noStmtId, false), + RSFatalException); } TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutTxnTableUUIDIsFatal) { @@ -2162,7 +2176,8 @@ TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutTxnTableUUIDIsFatal) { << lsid.toBSON()); FixUpInfo fui; - ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false), + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false), RSFatalException); } @@ -2179,7 +2194,8 @@ TEST_F(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetch << "o" << BSON("_id" << 2 << "a" << 2)); - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithoutTxnNumber, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithoutTxnNumber, false)); ASSERT_EQ(fui.docsToRefetch.size(), 1U); // If txnNumber is present, and the transaction table exists and has a UUID, the session @@ -2204,7 +2220,8 @@ TEST_F(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetch UUID transactionTableUUID = UUID::gen(); fui.transactionTableUUID = transactionTableUUID; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false)); ASSERT_EQ(fui.docsToRefetch.size(), 3U); auto expectedObj = BSON("_id" << lsid.toBSON()); @@ -2246,7 +2263,8 @@ TEST_F(RSRollbackTest, LocalEntryWithPartialTxnAddsTransactionTableDocToBeRefetc UUID transactionTableUUID = UUID::gen(); fui.transactionTableUUID = transactionTableUUID; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false)); ASSERT_EQ(fui.docsToRefetch.size(), 1U); auto expectedObj = BSON("_id" << lsid.toBSON()); @@ -2278,7 +2296,8 @@ TEST_F(RSRollbackTest, LocalAbortTxnRefetchesTransactionTableEntry) { UUID transactionTableUUID = UUID::gen(); fui.transactionTableUUID = transactionTableUUID; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, abortTxnEntry, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, abortTxnEntry, false)); ASSERT_EQ(fui.docsToRefetch.size(), 1U); auto expectedObj = BSON("_id" << lsid.toBSON()); @@ -2286,7 +2305,7 @@ TEST_F(RSRollbackTest, LocalAbortTxnRefetchesTransactionTableEntry) { ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end()); } -TEST_F(RSRollbackTest, LocalEntryWithAbortedInTxnRefetchesOnlyTransactionTableEntry) { +TEST_F(RSRollbackTest, LocalEntryWithAbortedPartialTxnRefetchesOnlyTransactionTableEntry) { FixUpInfo fui; // If txnNumber is present, and the transaction table exists and has a UUID, the session @@ -2335,8 +2354,10 @@ TEST_F(RSRollbackTest, LocalEntryWithAbortedInTxnRefetchesOnlyTransactionTableEn UUID transactionTableUUID = UUID::gen(); fui.transactionTableUUID = transactionTableUUID; - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, abortTxnEntry, false)); - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, abortTxnEntry, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false)); ASSERT_EQ(fui.docsToRefetch.size(), 1U); auto expectedObj = BSON("_id" << lsid.toBSON()); @@ -2344,6 +2365,329 @@ TEST_F(RSRollbackTest, LocalEntryWithAbortedInTxnRefetchesOnlyTransactionTableEn ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end()); } +TEST_F(RSRollbackTest, LocalEntryWithCommittedTxnRefetchesDocsAndTransactionTableEntry) { + FixUpInfo fui; + UUID uuid = UUID::gen(); + auto lsid = makeLogicalSessionIdForTest(); + auto commitTxnEntry = + BSON("ts" << Timestamp(Seconds(1), 2) << "t" << 1LL << "op" + << "c" + << "ns" + << "admin.$cmd" + << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 2 << "a" << 2))) + << "count" + << 2) + << "txnNumber" + << 1LL + << "stmtId" + << 2 + << "lsid" + << lsid.toBSON() + << "prevOpTime" + << BSON("ts" << Timestamp(Seconds(1), 1) << "t" << 1LL)); + auto commitTxnOperation = std::make_pair(commitTxnEntry, RecordId(2)); + + auto partialTxnEntry = + BSON("ts" << Timestamp(Seconds(1), 1) << "t" << 1LL << "op" + << "c" + << "ns" + << "admin.$cmd" + << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1))) + << "partialTxn" + << true) + << "txnNumber" + << 1LL + << "stmtId" + << 1 + << "lsid" + << lsid.toBSON() + << "prevOpTime" + << BSON("ts" << Timestamp(0, 0) << "t" << -1LL)); + + auto partialTxnOperation = std::make_pair(partialTxnEntry, RecordId(1)); + UUID transactionTableUUID = UUID::gen(); + fui.transactionTableUUID = transactionTableUUID; + + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, + OplogInterfaceMock({commitTxnOperation, partialTxnOperation}), + fui, + commitTxnEntry, + false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, + OplogInterfaceMock({commitTxnOperation, partialTxnOperation}), + fui, + partialTxnEntry, + false)); + ASSERT_EQ(fui.docsToRefetch.size(), 3U); + + auto expectedObj = BSON("_id" << lsid.toBSON()); + DocID expectedTxnDoc(expectedObj, expectedObj.firstElement(), transactionTableUUID); + ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end()); + + auto expectedCrudObj = BSON("_id" << 2); + auto expectedCrudDoc = DocID(expectedObj, expectedObj.firstElement(), transactionTableUUID); + ASSERT_TRUE(fui.docsToRefetch.find(expectedCrudDoc) != fui.docsToRefetch.end()); + + expectedCrudObj = BSON("_id" << 1); + expectedCrudDoc = DocID(expectedObj, expectedObj.firstElement(), transactionTableUUID); + ASSERT_TRUE(fui.docsToRefetch.find(expectedCrudDoc) != fui.docsToRefetch.end()); +} + +TEST_F(RSRollbackTest, RollbackFetchesTransactionOperationBeforeCommonPoint) { + createOplog(_opCtx.get()); + CollectionOptions options; + options.uuid = UUID::gen(); + auto coll = _createCollection(_opCtx.get(), "test.t", options); + options.uuid = UUID::gen(); + auto txnTable = _createCollection(_opCtx.get(), "config.transactions", options); + + auto commonOperation = makeOpAndRecordId(10); + UUID uuid = *coll->uuid(); + auto lsid = makeLogicalSessionIdForTest(); + auto commitTxnEntry = + BSON("ts" << Timestamp(Seconds(10), 12) << "t" << 10LL << "op" + << "c" + << "ns" + << "admin.$cmd" + << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 0 << "a" << 0))) + << "count" + << 3) + << "txnNumber" + << 1LL + << "stmtId" + << 3 + << "lsid" + << lsid.toBSON() + << "prevOpTime" + << BSON("ts" << Timestamp(Seconds(10), 11) << "t" << 10LL)); + auto commitTxnOperation = std::make_pair(commitTxnEntry, RecordId(12)); + + auto entryAfterCommonPoint = + BSON("ts" << Timestamp(Seconds(10), 11) << "t" << 10LL << "op" + << "c" + << "ns" + << "admin.$cmd" + << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1))) + << "partialTxn" + << true) + << "txnNumber" + << 1LL + << "stmtId" + << 2 + << "lsid" + << lsid.toBSON() + << "prevOpTime" + << BSON("ts" << Timestamp(Seconds(10), 9) << "t" << 10LL)); + auto operationAfterCommonPoint = std::make_pair(entryAfterCommonPoint, RecordId(11)); + + auto entryBeforeCommonPoint = + BSON("ts" << Timestamp(Seconds(10), 9) << "t" << 10LL << "op" + << "c" + << "ns" + << "admin.$cmd" + << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 2 << "a" << 2))) + << "partialTxn" + << true) + << "txnNumber" + << 1LL + << "stmtId" + << 1 + << "lsid" + << lsid.toBSON() + << "prevOpTime" + << BSON("ts" << Timestamp(0, 0) << "t" << -1LL)); + auto operationBeforeCommonPoint = std::make_pair(entryBeforeCommonPoint, RecordId(9)); + + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog, const UUID& txnTableUuid) + : RollbackSourceMock(std::move(oplog)), _txnTableUuid(txnTableUuid) {} + + std::pair<BSONObj, NamespaceString> findOneByUUID(const std::string& db, + UUID uuid, + const BSONObj& filter) const override { + int numFields = 0; + if (uuid == _txnTableUuid) { + // This unit test does not test transaction table fetches. + return {BSONObj(), NamespaceString::kSessionTransactionsTableNamespace}; + } + for (const auto element : filter) { + ++numFields; + ASSERT_EQUALS("_id", element.fieldNameStringData()) << filter; + } + ASSERT_EQUALS(1, numFields) << filter; + searchedIds.insert(filter.firstElement().numberInt()); + switch (filter.firstElement().numberInt()) { + case 0: + return {BSON("_id" << 0 << "v" << 0), NamespaceString()}; + case 1: + return {BSON("_id" << 1 << "v" << 1), NamespaceString()}; + case 2: + return {BSON("_id" << 2 << "v" << 3), NamespaceString()}; + } + FAIL("Unexpected findOne request") << filter; + return {}; // Unreachable; why doesn't compiler know? + } + + mutable std::multiset<int> searchedIds; + + private: + UUID _txnTableUuid; + + } rollbackSource(std::unique_ptr<OplogInterface>( + new OplogInterfaceMock({commonOperation, operationBeforeCommonPoint})), + *txnTable->uuid()); + + ASSERT_OK(syncRollback(_opCtx.get(), + OplogInterfaceMock({commitTxnOperation, + operationAfterCommonPoint, + commonOperation, + operationBeforeCommonPoint}), + rollbackSource, + {}, + _coordinator, + _replicationProcess.get())); + ASSERT_EQUALS(3U, rollbackSource.searchedIds.size()); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(0)); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1)); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2)); +} + +TEST_F(RSRollbackTest, RollbackIncompleteTransactionReturnsUnrecoverableRollbackError) { + createOplog(_opCtx.get()); + CollectionOptions options; + options.uuid = UUID::gen(); + auto coll = _createCollection(_opCtx.get(), "test.t", options); + options.uuid = UUID::gen(); + auto txnTable = _createCollection(_opCtx.get(), "config.transactions", options); + + auto commonOperation = makeOpAndRecordId(10); + UUID uuid = *coll->uuid(); + auto lsid = makeLogicalSessionIdForTest(); + auto commitTxnEntry = + BSON("ts" << Timestamp(Seconds(10), 12) << "t" << 10LL << "op" + << "c" + << "ns" + << "admin.$cmd" + << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 0 << "a" << 0))) + << "count" + << 3) + << "stmtId" + << 3 + << "lsid" + << lsid.toBSON() + << "prevOpTime" + << BSON("ts" << Timestamp(Seconds(10), 11) << "t" << 10LL)); + auto commitTxnOperation = std::make_pair(commitTxnEntry, RecordId(12)); + + auto entryAfterCommonPoint = + BSON("ts" << Timestamp(Seconds(10), 11) << "t" << 10LL << "op" + << "c" + << "ns" + << "admin.$cmd" + << "o" + << BSON("applyOps" << BSON_ARRAY(BSON("op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1))) + << "partialTxn" + << true) + << "txnNumber" + << 1LL + << "stmtId" + << 2 + << "lsid" + << lsid.toBSON() + << "prevOpTime" + << BSON("ts" << Timestamp(Seconds(10), 9) << "t" << 10LL)); + auto operationAfterCommonPoint = std::make_pair(entryAfterCommonPoint, RecordId(11)); + + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog, const UUID& txnTableUuid) + : RollbackSourceMock(std::move(oplog)), _txnTableUuid(txnTableUuid) {} + + std::pair<BSONObj, NamespaceString> findOneByUUID(const std::string& db, + UUID uuid, + const BSONObj& filter) const override { + if (uuid == _txnTableUuid) { + // This unit test does not test transaction table fetches. + return {BSONObj(), NamespaceString::kSessionTransactionsTableNamespace}; + } else { + return {BSONObj(), NamespaceString()}; + } + } + + private: + UUID _txnTableUuid; + } rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({commonOperation})), + *txnTable->uuid()); + + + auto status = syncRollback( + _opCtx.get(), + OplogInterfaceMock({commitTxnOperation, operationAfterCommonPoint, commonOperation}), + rollbackSource, + {}, + _coordinator, + _replicationProcess.get()); + ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); +} + TEST_F(RSRollbackTest, RollbackFailsIfTransactionDocumentRefetchReturnsDifferentNamespace) { createOplog(_opCtx.get()); @@ -2376,7 +2720,8 @@ TEST_F(RSRollbackTest, RollbackFailsIfTransactionDocumentRefetchReturnsDifferent fui.rbid = 1; // The FixUpInfo will have an extra doc to refetch: the corresponding transaction table entry. - ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false)); + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry( + nullptr /* opCtx */, OplogInterfaceMock(), fui, entryWithTxnNumber, false)); ASSERT_EQ(fui.docsToRefetch.size(), 2U); { |