author     Jack Mulrow <jack.mulrow@mongodb.com>   2017-08-14 13:12:43 -0400
committer  Jack Mulrow <jack.mulrow@mongodb.com>   2017-08-17 15:49:14 -0400
commit     3d3543b684d239b12e6dac97d2e3d57d4b0dbfc4 (patch)
tree       3678cde306c8389224f91f42918beec8448547b8
parent     c1aaff64cdf88d3ff2f0220033964fa6fcdb5513 (diff)
download   mongo-3d3543b684d239b12e6dac97d2e3d57d4b0dbfc4.tar.gz
SERVER-30076 Use the UUID of the transactions collection for rollback via refetch
-rw-r--r--  jstests/replsets/rollback_transaction_table.js   213
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                  34
-rw-r--r--  src/mongo/db/repl/rs_rollback.h                     3
-rw-r--r--  src/mongo/db/repl/rs_rollback_test.cpp             119
-rw-r--r--  src/mongo/db/session_catalog.cpp                    12
-rw-r--r--  src/mongo/db/session_catalog.h                       6
6 files changed, 327 insertions, 60 deletions
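
For context on what this patch touches (this note is not part of the commit): a write tagged with a logical session id (lsid) and a txnNumber (a "transaction" in the terms of the test below) causes the server to track per-session progress in the config.transactions collection. A minimal mongo shell sketch, reusing the command and query shapes from the test below; names and values are illustrative only:

    var lsid = {id: UUID()};
    assert.commandWorked(db.runCommand({
        insert: "foo",
        documents: [{_id: 1}],
        ordered: false,
        lsid: lsid,
        txnNumber: NumberLong(5)
    }));
    // config.transactions keeps one record per session, keyed by the lsid; its txnNum field
    // reflects the most recent transaction number seen for that session.
    printjson(db.getSiblingDB("config").transactions.findOne({"_id.id": lsid.id}));

Rollback via refetch must be able to re-fetch these records from the sync source, which is why this commit records the collection's UUID in FixUpInfo.
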
diff --git a/jstests/replsets/rollback_transaction_table.js b/jstests/replsets/rollback_transaction_table.js
new file mode 100644
index 00000000000..e18a209eedb
--- /dev/null
+++ b/jstests/replsets/rollback_transaction_table.js
@@ -0,0 +1,213 @@
+/**
+ * Test that the transaction collection can be rolled back properly, as long as the UUID of the
+ * collection has not changed between the sync source and the primary.
+ *
+ * 1. Initiate a 3-node replica set, with two data bearing nodes.
+ * 2. Run a transaction on the primary and await replication.
+ * 3. Partition the primary.
+ * 4. On the partitioned primary:
+ * - Run a transaction with a higher txnNumber for the first session id.
+ * - Run a new transaction for a second session id.
+ * 5. On the newly-stepped up primary:
+ * - Run a new transaction for a third session id.
+ * 6. Heal the partition.
+ * 7. Verify the partitioned primary's transaction collection properly rolled back:
+ * - The txnNumber for the first session id is the original value.
+ * - There is no record for the second session id.
+ * - A record for the third session id was created during oplog replay.
+ *
+ */
+(function() {
+ "use strict";
+
+ load("jstests/replsets/rslib.js");
+
+ function assertSameRecordOnBothConnections(primary, secondary, lsid) {
+ let primaryRecord = primary.getDB("config").transactions.findOne({"_id.id": lsid.id});
+ let secondaryRecord = secondary.getDB("config").transactions.findOne({"_id.id": lsid.id});
+
+ assert.eq(bsonWoCompare(primaryRecord, secondaryRecord),
+ 0,
+ "expected transaction records: " + tojson(primaryRecord) + " and " +
+ tojson(secondaryRecord) + " to be the same for lsid: " + tojson(lsid));
+ }
+
+ function assertRecordHasTxnNumber(conn, lsid, txnNum) {
+ let recordTxnNum = conn.getDB("config").transactions.findOne({"_id.id": lsid.id}).txnNum;
+ assert.eq(recordTxnNum,
+ txnNum,
+ "expected node: " + conn + " to have txnNumber: " + txnNum + " for session id: " +
+ lsid + " - instead found: " + recordTxnNum);
+ }
+
+ let testName = "rollback_transaction_table";
+ let dbName = "test";
+
+ let replTest = new ReplSetTest({
+ name: testName,
+ nodes: [
+ // Primary flops between nodes 0 and 1.
+ {},
+ {},
+ // Arbiter to sway elections.
+ {rsConfig: {arbiterOnly: true}}
+ ],
+ useBridge: true,
+ nodeOptions: {setParameter: {rollbackMethod: "rollbackViaRefetch"}}
+ });
+ let nodes = replTest.startSet();
+ replTest.initiate();
+
+ let downstream = nodes[0];
+ let upstream = nodes[1];
+ let arbiter = nodes[2];
+
+ jsTestLog("Making sure 'downstream node' is the primary node.");
+ assert.eq(downstream, replTest.getPrimary());
+
+ jsTestLog("Running a transaction on the 'downstream node' and waiting for it to replicate.");
+ let firstLsid = {id: UUID()};
+ let firstCmd = {
+ insert: "foo",
+ documents: [{_id: 10}, {_id: 30}],
+ ordered: false,
+ lsid: firstLsid,
+ txnNumber: NumberLong(5)
+ };
+
+ assert.commandWorked(downstream.getDB(dbName).runCommand(firstCmd));
+ replTest.awaitReplication();
+
+ // Both data bearing nodes should have the same record for the first session id.
+ assertSameRecordOnBothConnections(downstream, upstream, firstLsid);
+
+ assert.eq(downstream.getDB("config").transactions.find().itcount(), 1);
+ assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(5));
+
+ assert.eq(upstream.getDB("config").transactions.find().itcount(), 1);
+ assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5));
+
+ jsTestLog(
+ "Creating a partition between 'the downstream and arbiter node' and 'the upstream node.'");
+ downstream.disconnect(upstream);
+ arbiter.disconnect(upstream);
+
+ jsTestLog(
+ "Running a higher transaction for the existing session on only the 'downstream node.'");
+ let higherTxnFirstCmd = {
+ insert: "foo",
+ documents: [{_id: 50}],
+ ordered: false,
+ lsid: firstLsid,
+ txnNumber: NumberLong(20)
+ };
+
+ assert.commandWorked(downstream.getDB(dbName).runCommand(higherTxnFirstCmd));
+
+ // Now the data bearing nodes should have different transaction table records for the first
+ // session id.
+ assert.eq(downstream.getDB("config").transactions.find().itcount(), 1);
+ assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(20));
+
+ assert.eq(upstream.getDB("config").transactions.find().itcount(), 1);
+ assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5));
+
+ jsTestLog("Running a transaction for a second session on the 'downstream node.'");
+ let secondLsid = {id: UUID()};
+ let secondCmd = {
+ insert: "foo",
+ documents: [{_id: 100}, {_id: 200}],
+ ordered: false,
+ lsid: secondLsid,
+ txnNumber: NumberLong(100)
+ };
+
+ assert.commandWorked(downstream.getDB(dbName).runCommand(secondCmd));
+
+ // Only the downstream node should have two transaction table records, one each for the first
+ // and second session ids.
+ assert.eq(downstream.getDB("config").transactions.find().itcount(), 2);
+ assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(20));
+ assertRecordHasTxnNumber(downstream, secondLsid, NumberLong(100));
+
+ assert.eq(upstream.getDB("config").transactions.find().itcount(), 1);
+ assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5));
+
+ // We do not disconnect the downstream node from the arbiter node at the same time as we
+ // disconnect it from the upstream node. This avoids a race by ensuring the transaction using
+ // the second session id finishes before the downstream node steps down from being the primary.
+ jsTestLog(
+ "Disconnecting the 'downstream node' from the 'arbiter node' and reconnecting the 'upstream node' to the 'arbiter node.'");
+ downstream.disconnect(arbiter);
+ upstream.reconnect(arbiter);
+
+ jsTestLog("Waiting for the 'upstream node' to become the new primary.");
+ waitForState(downstream, ReplSetTest.State.SECONDARY);
+ waitForState(upstream, ReplSetTest.State.PRIMARY);
+ assert.eq(upstream, replTest.getPrimary());
+
+ jsTestLog("Running a new transaction for a third session on the 'upstream node.'");
+ let thirdLsid = {id: UUID()};
+ let thirdCmd = {
+ insert: "foo",
+ documents: [{_id: 1000}, {_id: 2000}],
+ ordered: false,
+ lsid: thirdLsid,
+ txnNumber: NumberLong(1)
+ };
+
+ assert.commandWorked(upstream.getDB(dbName).runCommand(thirdCmd));
+
+ // Now the upstream node also has two transaction table records, but for the first and third
+ // session ids, not the first and second.
+ assert.eq(downstream.getDB("config").transactions.find().itcount(), 2);
+ assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(20));
+ assertRecordHasTxnNumber(downstream, secondLsid, NumberLong(100));
+
+ assert.eq(upstream.getDB("config").transactions.find().itcount(), 2);
+ assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5));
+ assertRecordHasTxnNumber(upstream, thirdLsid, NumberLong(1));
+
+ // Gets the rollback ID of the downstream node before rollback occurs.
+ let downstreamRBIDBefore = assert.commandWorked(downstream.adminCommand('replSetGetRBID')).rbid;
+
+ jsTestLog("Reconnecting the 'downstream node.'");
+ downstream.reconnect(upstream);
+ downstream.reconnect(arbiter);
+
+ jsTestLog("Waiting for the 'downstream node' to complete rollback.");
+ replTest.awaitReplication();
+ replTest.awaitSecondaryNodes();
+
+ // Ensure that the connection to the downstream node is re-established, since it should have
+ // been killed during the downstream node's transition to ROLLBACK state.
+ reconnect(downstream);
+
+ jsTestLog(
+ "Checking the rollback ID of the downstream node to confirm that a rollback occurred.");
+ assert.neq(downstreamRBIDBefore,
+ assert.commandWorked(downstream.adminCommand('replSetGetRBID')).rbid);
+
+ // Verify the record for the first lsid rolled back to its original value, the record for the
+ // second lsid was removed, and the record for the third lsid was created during oplog replay.
+ jsTestLog("Verifying the transaction collection rolled back properly.");
+
+ assertSameRecordOnBothConnections(downstream, upstream, firstLsid);
+ assertRecordHasTxnNumber(downstream, firstLsid, NumberLong(5));
+ assertRecordHasTxnNumber(upstream, firstLsid, NumberLong(5));
+
+ assert.isnull(downstream.getDB("config").transactions.findOne({"_id.id": secondLsid.id}));
+ assert.isnull(upstream.getDB("config").transactions.findOne({"_id.id": secondLsid.id}));
+
+ assertSameRecordOnBothConnections(downstream, upstream, thirdLsid);
+ assertRecordHasTxnNumber(downstream, thirdLsid, NumberLong(1));
+ assertRecordHasTxnNumber(upstream, thirdLsid, NumberLong(1));
+
+ assert.eq(downstream.getDB("config").transactions.find().itcount(), 2);
+ assert.eq(upstream.getDB("config").transactions.find().itcount(), 2);
+
+ // Confirm the nodes are consistent.
+ replTest.checkReplicatedDataHashes(testName);
+
+ replTest.stopSet();
+}());
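
The premise of the test above is that config.transactions can only be rolled back via refetch when its UUID has not changed between the sync source and the primary. A small shell sketch of how that precondition could be checked explicitly; this is not part of the test, and it assumes listCollections exposes the UUID under info.uuid once the server assigns collection UUIDs:

    function getTransactionsCollectionUUID(conn) {
        // listCollections reports each collection's UUID under info.uuid when UUIDs are assigned.
        let infos = conn.getDB("config").getCollectionInfos({name: "transactions"});
        return infos.length === 1 ? infos[0].info.uuid : undefined;
    }

    // For example, after replTest.awaitReplication() both data-bearing nodes should agree:
    assert.eq(tojson(getTransactionsCollectionUUID(downstream)),
              tojson(getTransactionsCollectionUUID(upstream)));
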
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 9532179b3d0..a6ac87e169a 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -256,19 +256,23 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf
invariant(sessionId);
invariant(oplogEntry.getStatementId());
- // TODO: SERVER-30076
- // Once collection uuids replace namespace strings for rollback, this will need to be
- // changed to the uuid of the session transaction table collection. Need to add
- // txnDoc.uuid with the proper uuid.
- // DocID txnDoc;
- // BSONObjBuilder txnBob;
- // txnBob.append("_id", sessionId->toBSON());
- // txnDoc.ownedObj = txnBob.obj();
- // txnDoc._id = txnDoc.ownedObj.firstElement();
- // txnDoc.ns = NamespaceString::kSessionTransactionsTableNamespace.ns().c_str();
- //
- // fixUpInfo.docsToRefetch.insert(txnDoc);
- // fixUpInfo.refetchTransactionDocs = true;
+ auto transactionTableUUID = fixUpInfo.transactionTableUUID;
+ if (transactionTableUUID) {
+ BSONObjBuilder txnBob;
+ txnBob.append("_id", sessionId->toBSON());
+ auto txnObj = txnBob.obj();
+
+ DocID txnDoc(txnObj, txnObj.firstElement(), transactionTableUUID.get());
+ txnDoc.ns = NamespaceString::kSessionTransactionsTableNamespace.ns();
+
+ fixUpInfo.docsToRefetch.insert(txnDoc);
+ fixUpInfo.refetchTransactionDocs = true;
+ } else {
+ throw RSFatalException(
+ str::stream() << NamespaceString::kSessionTransactionsTableNamespace.ns()
+ << " does not have a UUID, but local op has a transaction number: "
+ << redact(oplogEntry.toBSON()));
+ }
}
if (oplogEntry.getOpType() == OpTypeEnum::kCommand) {
@@ -1308,6 +1312,10 @@ Status _syncRollback(OperationContext* opCtx,
uassert(
40506, "Upstream node rolled back. Need to retry our rollback.", how.rbid == requiredRBID);
+ // Find the UUID of the transactions collection. An OperationContext is required because the
+ // UUID is not known at compile time, so the SessionCatalog needs to load the collection.
+ how.transactionTableUUID = SessionCatalog::getTransactionTableUUID(opCtx);
+
log() << "Finding the Common Point";
try {
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index b4659313608..74d59ae3c8f 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -276,6 +276,9 @@ struct FixUpInfo {
stdx::unordered_map<UUID, std::pair<OpTime, NamespaceString>, UUID::Hash>
collectionsToRemoveFromDropPendingCollections;
+ // The UUID of the transactions collection. Set at the beginning of rollback.
+ boost::optional<UUID> transactionTableUUID = boost::none;
+
// True if rollback requires re-fetching documents in the session transaction table. If true,
// after rollback the in-memory transaction table is cleared.
bool refetchTransactionDocs = false;
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 6555843ad26..ec966c4cb01 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -1975,53 +1975,78 @@ DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutStmtIdIsFatal, "invar
RSFatalException);
}
-// TODO: Uncomment this test once transactions have been updated to work with the proper
-// uuid. See SERVER-30076.
-// TEST(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetched) {
-// FixUpInfo fui;
-// auto entryWithoutTxnNumber =
-// BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op"
-// << "i"
-// << "ui"
-// << UUID::gen()
-// << "ns"
-// << "test.t2"
-// << "o"
-// << BSON("_id" << 2 << "a" << 2));
-// ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithoutTxnNumber));
-//
-// // With no txnNumber present, no extra documents need to be refetched.
-// ASSERT_EQ(fui.docsToRefetch.size(), 1U);
-//
-// UUID uuid = UUID::gen();
-// auto entryWithTxnNumber =
-// BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op"
-// << "i"
-// << "ui"
-// << uuid
-// << "ns"
-// << "test.t"
-// << "o"
-// << BSON("_id" << 1 << "a" << 1)
-// << "txnNumber"
-// << 1LL
-// << "stmtId"
-// << 1
-// << "lsid"
-// << makeLogicalSessionIdForTest().toBSON());
-// ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber));
-//
-// // If txnNumber is present, the session transactions table document corresponding to the oplog
-// // entry's sessionId also needs to be refetched.
-// ASSERT_EQ(fui.docsToRefetch.size(), 3U);
-//
-// DocID expectedTxnDoc;
-// expectedTxnDoc.ownedObj = BSON("_id" << entryWithTxnNumber["lsid"]);
-// expectedTxnDoc._id = expectedTxnDoc.ownedObj.firstElement();
-// expectedTxnDoc.ns = NamespaceString::kSessionTransactionsTableNamespace.ns().c_str();
-// expectedTxnDoc.uuid = uuid;
-// ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end());
-//}
+TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutTxnTableUUIDIsFatal) {
+ // If txnNumber is present, but the transaction collection has no UUID, rollback fails.
+ UUID uuid = UUID::gen();
+ auto lsid = makeLogicalSessionIdForTest();
+ auto entryWithTxnNumber =
+ BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op"
+ << "i"
+ << "ui"
+ << uuid
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 1 << "a" << 1)
+ << "txnNumber"
+ << 1LL
+ << "stmtId"
+ << 1
+ << "lsid"
+ << lsid.toBSON());
+
+ FixUpInfo fui;
+ ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber).ignore(),
+ RSFatalException);
+}
+
+TEST_F(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetched) {
+ FixUpInfo fui;
+
+ // With no txnNumber present, no extra documents need to be refetched.
+ auto entryWithoutTxnNumber =
+ BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op"
+ << "i"
+ << "ui"
+ << UUID::gen()
+ << "ns"
+ << "test.t2"
+ << "o"
+ << BSON("_id" << 2 << "a" << 2));
+
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithoutTxnNumber));
+ ASSERT_EQ(fui.docsToRefetch.size(), 1U);
+
+ // If txnNumber is present, and the transaction table exists and has a UUID, the session
+ // transactions table document corresponding to the oplog entry's sessionId also needs to be
+ // refetched.
+ UUID uuid = UUID::gen();
+ auto lsid = makeLogicalSessionIdForTest();
+ auto entryWithTxnNumber =
+ BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op"
+ << "i"
+ << "ui"
+ << uuid
+ << "ns"
+ << "test.t"
+ << "o"
+ << BSON("_id" << 1 << "a" << 1)
+ << "txnNumber"
+ << 1LL
+ << "stmtId"
+ << 1
+ << "lsid"
+ << lsid.toBSON());
+ UUID transactionTableUUID = UUID::gen();
+ fui.transactionTableUUID = transactionTableUUID;
+
+ ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber));
+ ASSERT_EQ(fui.docsToRefetch.size(), 3U);
+
+ auto expectedObj = BSON("_id" << lsid.toBSON());
+ DocID expectedTxnDoc(expectedObj, expectedObj.firstElement(), transactionTableUUID);
+ ASSERT_TRUE(fui.docsToRefetch.find(expectedTxnDoc) != fui.docsToRefetch.end());
+}
TEST_F(RSRollbackTest, RollbackReturnsImmediatelyOnFailureToTransitionToRollback) {
// On failing to transition to ROLLBACK, rollback() should return immediately and not call
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index e7b672aa1d5..4e0a28bb0ac 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -34,6 +34,7 @@
#include <boost/optional.hpp>
+#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
@@ -91,6 +92,17 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) {
return sessionTransactionTable.get_ptr();
}
+boost::optional<UUID> SessionCatalog::getTransactionTableUUID(OperationContext* opCtx) {
+ AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS);
+
+ const auto coll = autoColl.getCollection();
+ if (coll == nullptr) {
+ return boost::none;
+ }
+
+ return coll->uuid();
+}
+
void SessionCatalog::onStepUp(OperationContext* opCtx) {
DBDirectClient client(opCtx);
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index b4c4d6b3775..27aeb059669 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -75,6 +75,12 @@ public:
static SessionCatalog* get(ServiceContext* service);
/**
+ * Fetches the UUID of the transaction table, or an empty optional if the collection does not
+ * exist or has no UUID. Acquires a lock on the collection. Required for rollback via refetch.
+ */
+ static boost::optional<UUID> getTransactionTableUUID(OperationContext* opCtx);
+
+ /**
* Invoked when the node enters the primary state. Ensures that the transactions collection is
* created. Throws on severe exceptions due to which it is not safe to continue the step-up
* process.