diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2017-08-14 13:12:43 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2017-08-17 15:49:14 -0400 |
commit | 3d3543b684d239b12e6dac97d2e3d57d4b0dbfc4 (patch) | |
tree | 3678cde306c8389224f91f42918beec8448547b8 | |
parent | c1aaff64cdf88d3ff2f0220033964fa6fcdb5513 (diff) | |
download | mongo-3d3543b684d239b12e6dac97d2e3d57d4b0dbfc4.tar.gz |
SERVER-30076 Use the UUID of the transactions collection for rollback via refetch
-rw-r--r-- | jstests/replsets/rollback_transaction_table.js | 213 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback_test.cpp | 119 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.h | 6 |
6 files changed, 327 insertions, 60 deletions
diff --git a/jstests/replsets/rollback_transaction_table.js b/jstests/replsets/rollback_transaction_table.js new file mode 100644 index 00000000000..e18a209eedb --- /dev/null +++ b/jstests/replsets/rollback_transaction_table.js @@ -0,0 +1,213 @@ +/** + * Test that the transaction collection can be rolled back properly, as long as the UUID of the + * collection has not changed between the sync source and the primary. + * + * 1. Initiate a 3-node replica set, with two data bearing nodes. + * 2. Run a transaction on the primary and await replication. + * 3. Partition the primary. + * 4. On the partitioned primary: + * - Run a transaction with a higher txnNumber for the first session id. + * - Run a new transaction for a second session id. + * 5. On the newly-stepped up primary: + * - Run a new transaction for a third session id. + * 5. Heal the partition. + * 6. Verify the partitioned primary's transaction collection properly rolled back: + * - The txnNumber for the first session id is the original value. + * - There is no record for the second session id. + * - A record for the third session id was created during oplog replay. + * + */ +(function() { + "use strict"; + + load("jstests/replsets/rslib.js"); + + function assertSameRecordOnBothConnections(primary, secondary, lsid) { + let primaryRecord = primary.getDB("config").transactions.findOne({"_id.id": lsid.id}); + let secondaryRecord = secondary.getDB("config").transactions.findOne({"_id.id": lsid.id}); + + assert.eq(bsonWoCompare(primaryRecord, secondaryRecord), + 0, + "expected transaction records: " + tojson(primaryRecord) + " and " + + tojson(secondaryRecord) + " to be the same for lsid: " + tojson(lsid)); + } + + function assertRecordHasTxnNumber(conn, lsid, txnNum) { + let recordTxnNum = conn.getDB("config").transactions.findOne({"_id.id": lsid.id}).txnNum; + assert.eq(recordTxnNum, + txnNum, + "expected node: " + conn + " to have txnNumber: " + txnNum + " for session id: " + + lsid + " - instead found: " + recordTxnNum); + } + + let testName = "rollback_transaction_table"; + let dbName = "test"; + + let replTest = new ReplSetTest({ + name: testName, + nodes: [ + // Primary flops between nodes 0 and 1. + {}, + {}, + // Arbiter to sway elections. + {rsConfig: {arbiterOnly: true}} + ], + useBridge: true, + nodeOptions: {setParameter: {rollbackMethod: "rollbackViaRefetch"}} + }); + let nodes = replTest.startSet(); + replTest.initiate(); + + let downstream = nodes[0]; + let upstream = nodes[1]; + let arbiter = nodes[2]; + + jsTestLog("Making sure 'downstream node' is the primary node."); + assert.eq(downstream, replTest.getPrimary()); + + jsTestLog("Running a transaction on the 'downstream node' and waiting for it to replicate."); + let firstLsid = {id: UUID()}; + let firstCmd = { + insert: "foo", + documents: [{_id: 10}, {_id: 30}], + ordered: false, + lsid: firstLsid, + txnNumber: NumberLong(5) + }; + + assert.commandWorked(downstream.getDB(dbName).runCommand(firstCmd)); + replTest.awaitReplication(); + + // Both data bearing nodes should have the same record for the first session id. + assertSameRecordOnBothConnections(downstream, upstream, firstLsid); + + assert.eq(downstream.getDB("config").transactions.find().itcount(), 1); + assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(5)); + + assert.eq(upstream.getDB("config").transactions.find().itcount(), 1); + assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5)); + + jsTestLog( + "Creating a partition between 'the downstream and arbiter node' and 'the upstream node.'"); + downstream.disconnect(upstream); + arbiter.disconnect(upstream); + + jsTestLog( + "Running a higher transaction for the existing session on only the 'downstream node.'"); + let higherTxnFirstCmd = { + insert: "foo", + documents: [{_id: 50}], + ordered: false, + lsid: firstLsid, + txnNumber: NumberLong(20) + }; + + assert.commandWorked(downstream.getDB(dbName).runCommand(higherTxnFirstCmd)); + + // Now the data bearing nodes should have different transaction table records for the first + // session id. + assert.eq(downstream.getDB("config").transactions.find().itcount(), 1); + assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(20)); + + assert.eq(upstream.getDB("config").transactions.find().itcount(), 1); + assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5)); + + jsTestLog("Running a transaction for a second session on the 'downstream node.'"); + let secondLsid = {id: UUID()}; + let secondCmd = { + insert: "foo", + documents: [{_id: 100}, {_id: 200}], + ordered: false, + lsid: secondLsid, + txnNumber: NumberLong(100) + }; + + assert.commandWorked(downstream.getDB(dbName).runCommand(secondCmd)); + + // Only the downstream node should have two transaction table records, one for the first and + // second session ids. + assert.eq(downstream.getDB("config").transactions.find().itcount(), 2); + assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(20)); + assertRecordHasTxnNumber(downstream, secondLsid, NumberLong(100)); + + assert.eq(upstream.getDB("config").transactions.find().itcount(), 1); + assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5)); + + // We do not disconnect the downstream node from the arbiter node at the same time as we + // disconnect it from the upstream node. This prevents a race where the transaction using the + // second session id must finish before the downstream node steps down from being the primary. + jsTestLog( + "Disconnecting the 'downstream node' from the 'arbiter node' and reconnecting the 'upstream node' to the 'arbiter node.'"); + downstream.disconnect(arbiter); + upstream.reconnect(arbiter); + + jsTestLog("Waiting for the 'upstream node' to become the new primary."); + waitForState(downstream, ReplSetTest.State.SECONDARY); + waitForState(upstream, ReplSetTest.State.PRIMARY); + assert.eq(upstream, replTest.getPrimary()); + + jsTestLog("Running a new transaction for a third session on the 'upstream node.'"); + let thirdLsid = {id: UUID()}; + let thirdCmd = { + insert: "foo", + documents: [{_id: 1000}, {_id: 2000}], + ordered: false, + lsid: thirdLsid, + txnNumber: NumberLong(1) + }; + + assert.commandWorked(upstream.getDB(dbName).runCommand(thirdCmd)); + + // Now the upstream node also has two transaction table records, but for the first and third + // session ids, not the first and second. + assert.eq(downstream.getDB("config").transactions.find().itcount(), 2); + assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(20)); + assertRecordHasTxnNumber(downstream, secondLsid, NumberLong(100)); + + assert.eq(upstream.getDB("config").transactions.find().itcount(), 2); + assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5)); + assertRecordHasTxnNumber(upstream, thirdLsid, NumberLong(1)); + + // Gets the rollback ID of the downstream node before rollback occurs. + let downstreamRBIDBefore = assert.commandWorked(downstream.adminCommand('replSetGetRBID')).rbid; + + jsTestLog("Reconnecting the 'downstream node.'"); + downstream.reconnect(upstream); + downstream.reconnect(arbiter); + + jsTestLog("Waiting for the 'downstream node' to complete rollback."); + replTest.awaitReplication(); + replTest.awaitSecondaryNodes(); + + // Ensure that connection to the downstream node is re-established, since the connection should + // have gotten killed during the downstream node's transition to ROLLBACK state. + reconnect(downstream); + + jsTestLog( + "Checking the rollback ID of the downstream node to confirm that a rollback occurred."); + assert.neq(downstreamRBIDBefore, + assert.commandWorked(downstream.adminCommand('replSetGetRBID')).rbid); + + // Verify the record for the first lsid rolled back to its original value, the record for the + // second lsid was removed, and the record for the third lsid was created during oplog replay. + jsTestLog("Verifying the transaction collection rolled back properly."); + + assertSameRecordOnBothConnections(downstream, upstream, firstLsid); + assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(5)); + assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5)); + + assert.isnull(downstream.getDB("config").transactions.findOne({"_id.id": secondLsid.id})); + assert.isnull(upstream.getDB("config").transactions.findOne({"_id.id": secondLsid.id})); + + assertSameRecordOnBothConnections(downstream, upstream, thirdLsid); + assertRecordHasTxnNumber(downstream, thirdLsid, NumberLong(1)); + assertRecordHasTxnNumber(upstream, thirdLsid, NumberLong(1)); + + assert.eq(downstream.getDB("config").transactions.find().itcount(), 2); + assert.eq(upstream.getDB("config").transactions.find().itcount(), 2); + + // Confirm the nodes are consistent. + replTest.checkReplicatedDataHashes(testName); + + replTest.stopSet(); +}()); diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 9532179b3d0..a6ac87e169a 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -256,19 +256,23 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf invariant(sessionId); invariant(oplogEntry.getStatementId()); - // TODO: SERVER-30076 - // Once collection uuids replace namespace strings for rollback, this will need to be - // changed to the uuid of the session transaction table collection. Need to add - // txnDoc.uuid with the proper uuid. - // DocID txnDoc; - // BSONObjBuilder txnBob; - // txnBob.append("_id", sessionId->toBSON()); - // txnDoc.ownedObj = txnBob.obj(); - // txnDoc._id = txnDoc.ownedObj.firstElement(); - // txnDoc.ns = NamespaceString::kSessionTransactionsTableNamespace.ns().c_str(); - // - // fixUpInfo.docsToRefetch.insert(txnDoc); - // fixUpInfo.refetchTransactionDocs = true; + auto transactionTableUUID = fixUpInfo.transactionTableUUID; + if (transactionTableUUID) { + BSONObjBuilder txnBob; + txnBob.append("_id", sessionId->toBSON()); + auto txnObj = txnBob.obj(); + + DocID txnDoc(txnObj, txnObj.firstElement(), transactionTableUUID.get()); + txnDoc.ns = NamespaceString::kSessionTransactionsTableNamespace.ns(); + + fixUpInfo.docsToRefetch.insert(txnDoc); + fixUpInfo.refetchTransactionDocs = true; + } else { + throw RSFatalException( + str::stream() << NamespaceString::kSessionTransactionsTableNamespace.ns() + << " does not have a UUID, but local op has a transaction number: " + << redact(oplogEntry.toBSON())); + } } if (oplogEntry.getOpType() == OpTypeEnum::kCommand) { @@ -1308,6 +1312,10 @@ Status _syncRollback(OperationContext* opCtx, uassert( 40506, "Upstream node rolled back. Need to retry our rollback.", how.rbid == requiredRBID); + // Find the UUID of the transactions collection. An OperationContext is required because the + // UUID is not known at compile time, so the SessionCatalog needs to load the collection. + how.transactionTableUUID = SessionCatalog::getTransactionTableUUID(opCtx); + log() << "Finding the Common Point"; try { diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h index b4659313608..74d59ae3c8f 100644 --- a/src/mongo/db/repl/rs_rollback.h +++ b/src/mongo/db/repl/rs_rollback.h @@ -276,6 +276,9 @@ struct FixUpInfo { stdx::unordered_map<UUID, std::pair<OpTime, NamespaceString>, UUID::Hash> collectionsToRemoveFromDropPendingCollections; + // The UUID of the transactions collection. Set at the beginning of rollback. + boost::optional<UUID> transactionTableUUID = boost::none; + // True if rollback requires re-fetching documents in the session transaction table. If true, // after rollback the in-memory transaction table is cleared. bool refetchTransactionDocs = false; diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 6555843ad26..ec966c4cb01 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -1975,53 +1975,78 @@ DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutStmtIdIsFatal, "invar RSFatalException); } -// TODO: Uncomment this test once transactions have been updated to work with the proper -// uuid. See SERVER-30076. -// TEST(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetched) { -// FixUpInfo fui; -// auto entryWithoutTxnNumber = -// BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" -// << "i" -// << "ui" -// << UUID::gen() -// << "ns" -// << "test.t2" -// << "o" -// << BSON("_id" << 2 << "a" << 2)); -// ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithoutTxnNumber)); -// -// // With no txnNumber present, no extra documents need to be refetched. -// ASSERT_EQ(fui.docsToRefetch.size(), 1U); -// -// UUID uuid = UUID::gen(); -// auto entryWithTxnNumber = -// BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" -// << "i" -// << "ui" -// << uuid -// << "ns" -// << "test.t" -// << "o" -// << BSON("_id" << 1 << "a" << 1) -// << "txnNumber" -// << 1LL -// << "stmtId" -// << 1 -// << "lsid" -// << makeLogicalSessionIdForTest().toBSON()); -// ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber)); -// -// // If txnNumber is present, the session transactions table document corresponding to the oplog -// // entry's sessionId also needs to be refetched. -// ASSERT_EQ(fui.docsToRefetch.size(), 3U); -// -// DocID expectedTxnDoc; -// expectedTxnDoc.ownedObj = BSON("_id" << entryWithTxnNumber["lsid"]); -// expectedTxnDoc._id = expectedTxnDoc.ownedObj.firstElement(); -// expectedTxnDoc.ns = NamespaceString::kSessionTransactionsTableNamespace.ns().c_str(); -// expectedTxnDoc.uuid = uuid; -// ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end()); -//} +TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutTxnTableUUIDIsFatal) { + // If txnNumber is present, but the transaction collection has no UUID, rollback fails. + UUID uuid = UUID::gen(); + auto lsid = makeLogicalSessionIdForTest(); + auto entryWithTxnNumber = + BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1) + << "txnNumber" + << 1LL + << "stmtId" + << 1 + << "lsid" + << lsid.toBSON()); + + FixUpInfo fui; + ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber).ignore(), + RSFatalException); +} + +TEST_F(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetched) { + FixUpInfo fui; + + // With no txnNumber present, no extra documents need to be refetched. + auto entryWithoutTxnNumber = + BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" + << "i" + << "ui" + << UUID::gen() + << "ns" + << "test.t2" + << "o" + << BSON("_id" << 2 << "a" << 2)); + + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithoutTxnNumber)); + ASSERT_EQ(fui.docsToRefetch.size(), 1U); + + // If txnNumber is present, and the transaction table exists and has a UUID, the session + // transactions table document corresponding to the oplog entry's sessionId also needs to be + // refetched. + UUID uuid = UUID::gen(); + auto lsid = makeLogicalSessionIdForTest(); + auto entryWithTxnNumber = + BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1) + << "txnNumber" + << 1LL + << "stmtId" + << 1 + << "lsid" + << lsid.toBSON()); + UUID transactionTableUUID = UUID::gen(); + fui.transactionTableUUID = transactionTableUUID; + + ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber)); + ASSERT_EQ(fui.docsToRefetch.size(), 3U); + + auto expectedObj = BSON("_id" << lsid.toBSON()); + DocID expectedTxnDoc(expectedObj, expectedObj.firstElement(), transactionTableUUID); + ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end()); +} TEST_F(RSRollbackTest, RollbackReturnsImmediatelyOnFailureToTransitionToRollback) { // On failing to transition to ROLLBACK, rollback() should return immediately and not call diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index e7b672aa1d5..4e0a28bb0ac 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -34,6 +34,7 @@ #include <boost/optional.hpp> +#include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" @@ -91,6 +92,17 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) { return sessionTransactionTable.get_ptr(); } +boost::optional<UUID> SessionCatalog::getTransactionTableUUID(OperationContext* opCtx) { + AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS); + + const auto coll = autoColl.getCollection(); + if (coll == nullptr) { + return boost::none; + } + + return coll->uuid(); +} + void SessionCatalog::onStepUp(OperationContext* opCtx) { DBDirectClient client(opCtx); diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index b4c4d6b3775..27aeb059669 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -75,6 +75,12 @@ public: static SessionCatalog* get(ServiceContext* service); /** + * Fetches the UUID of the transaction table, or an empty optional if the collection does not + * exist or has no UUID. Acquires a lock on the collection. Required for rollback via refetch. + */ + static boost::optional<UUID> getTransactionTableUUID(OperationContext* opCtx); + + /** * Invoked when the node enters the primary state. Ensures that the transactions collection is * created. Throws on severe exceptions due to which it is not safe to continue the step-up * process. |