summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Zhang <jason.zhang@mongodb.com>2021-12-06 17:03:22 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-12-06 17:27:42 +0000
commit8c82e865d31fe143173753689bca0170bd715250 (patch)
tree4e111b6d577684fbd62fcf1d9dcbd588bbfdba0b
parent7d00ce21af91b582235da7586ff50537a99b3839 (diff)
downloadmongo-8c82e865d31fe143173753689bca0170bd715250.tar.gz
SERVER-58761 Make setFCV delete the config.transactions entries for all child sessions
-rw-r--r--jstests/multiVersion/internal_sessions_downgrade.js283
-rw-r--r--src/mongo/db/commands/set_feature_compatibility_version_command.cpp168
2 files changed, 451 insertions, 0 deletions
diff --git a/jstests/multiVersion/internal_sessions_downgrade.js b/jstests/multiVersion/internal_sessions_downgrade.js
new file mode 100644
index 00000000000..7b342b9b667
--- /dev/null
+++ b/jstests/multiVersion/internal_sessions_downgrade.js
@@ -0,0 +1,283 @@
+/*
+ * Test that internal sessions documents are properly removed from the config.transactions
+ * collection.
+ *
+ * @tags: [requires_fcv_51, featureFlagInternalTransactions]
+ */
+(function() {
+'use strict';
+
+const kConfigTxnNs = "config.transactions";
+const kDbName = "testDb";
+const kCollName = "testColl";
+
+(() => {
+ jsTest.log(
+ "Test downgrade updates existing parent session document with highest txnNumber of its " +
+ "child sessions");
+
+ const st = new ShardingTest({shards: {rs0: {nodes: 2}}});
+ const shard0Rst = st.rs0;
+ const shard0Primary = shard0Rst.getPrimary();
+
+ const testDB = shard0Primary.getDB(kDbName);
+
+ const sessionUUID = UUID();
+ const parentLsid = {id: sessionUUID};
+ const childLsid0 = {id: sessionUUID, txnNumber: NumberLong(5), txnUUID: UUID()};
+ const childLsid1 = {id: sessionUUID, txnNumber: NumberLong(6), txnUUID: UUID()};
+
+ // Create the parent and child sessions
+ assert.commandWorked(testDB.runCommand(
+ {insert: kCollName, documents: [{x: 0}], lsid: parentLsid, txnNumber: NumberLong(4)}));
+
+ assert.commandWorked(testDB.runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: childLsid0,
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ assert.commandWorked(testDB.adminCommand(
+ {commitTransaction: 1, lsid: childLsid0, txnNumber: NumberLong(0), autocommit: false}));
+
+ assert.commandWorked(testDB.runCommand({
+ insert: kCollName,
+ documents: [{x: 2}],
+ lsid: childLsid1,
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ assert.commandWorked(testDB.adminCommand(
+ {commitTransaction: 1, lsid: childLsid1, txnNumber: NumberLong(0), autocommit: false}));
+
+ const parentDocBeforeDowngrade =
+ shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID});
+
+ assert.commandWorked(shard0Primary.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
+
+ shard0Rst.nodes.forEach(node => {
+ // We expect every document except for the parent session documents to be deleted.
+ const collectionDocCount = node.getCollection(kConfigTxnNs).count();
+ assert.eq(collectionDocCount, 1);
+
+ const doc = node.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID});
+ assert.eq(doc.txnNum, NumberLong(6));
+ assert.eq(doc.lastWriteOpTime.ts, Timestamp(1, 0));
+ assert.eq(doc.lastWriteOpTime.t, NumberLong(1));
+ assert.gte(doc.lastWriteDate, parentDocBeforeDowngrade.lastWriteDate);
+ });
+
+ st.stop();
+})();
+
+(() => {
+ jsTest.log(
+ "Test downgrade upserts new transaction document if relevant parent session document " +
+ "does not exist and does not modify unrelated transaction documents.");
+
+ const st = new ShardingTest({shards: {rs0: {nodes: 2}}});
+ const shard0Rst = st.rs0;
+ const shard0Primary = shard0Rst.getPrimary();
+
+ const kDbName = "testDb";
+ const kCollName = "testColl";
+ const testDB = shard0Primary.getDB(kDbName);
+
+ const sessionUUID = UUID();
+ const unrelatedParentLsid = {id: UUID()};
+ const childLsid0 = {id: sessionUUID, txnNumber: NumberLong(7), txnUUID: UUID()};
+ const childLsid1 = {id: sessionUUID, txnNumber: NumberLong(8), txnUUID: UUID()};
+
+ // Start a parent session without related children.
+ assert.commandWorked(testDB.runCommand({
+ insert: kCollName,
+ documents: [{x: 0}],
+ lsid: unrelatedParentLsid,
+ txnNumber: NumberLong(10)
+ }));
+
+ assert.commandWorked(testDB.runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: childLsid0,
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ assert.commandWorked(testDB.adminCommand(
+ {commitTransaction: 1, lsid: childLsid0, txnNumber: NumberLong(0), autocommit: false}));
+
+ assert.commandWorked(testDB.runCommand({
+ insert: kCollName,
+ documents: [{x: 2}],
+ lsid: childLsid1,
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ assert.commandWorked(testDB.adminCommand(
+ {commitTransaction: 1, lsid: childLsid1, txnNumber: NumberLong(0), autocommit: false}));
+
+ const unrelatedParentDocBeforeDowngrade =
+ shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": unrelatedParentLsid.id});
+
+ assert.commandWorked(shard0Primary.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
+
+ shard0Rst.nodes.forEach(node => {
+ // We expect every document except for the parent session documents to be deleted.
+ const collectionDocCount = node.getCollection(kConfigTxnNs).count();
+ assert.eq(collectionDocCount, 2);
+
+ const upsertedDoc = node.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID});
+ assert.eq(upsertedDoc.txnNum, NumberLong(8));
+ assert.eq(upsertedDoc.lastWriteOpTime.ts, Timestamp(1, 0));
+ assert.eq(upsertedDoc.lastWriteOpTime.t, NumberLong(1));
+
+ const unrelatedParentDocAfterDowngrade =
+ node.getCollection(kConfigTxnNs).findOne({"_id.id": unrelatedParentLsid.id});
+ assert.eq(unrelatedParentDocAfterDowngrade, unrelatedParentDocBeforeDowngrade);
+
+ // For newly upserted session documents, we use the current wall clock time as the
+ // lastWriteDate, thus we expect it to be farther ahead than the unchanged session document.
+ assert.gte(upsertedDoc.lastWriteDate, unrelatedParentDocAfterDowngrade.lastWriteDate);
+ });
+
+ st.stop();
+})();
+
+(() => {
+ jsTest.log(
+ "Test downgrade does not modify a parent session document if it has a higher txnNumber " +
+ "than its children.");
+
+ const st = new ShardingTest({shards: {rs0: {nodes: 2}}});
+ const shard0Rst = st.rs0;
+ const shard0Primary = shard0Rst.getPrimary();
+
+ const kDbName = "testDb";
+ const kCollName = "testColl";
+ const testDB = shard0Primary.getDB(kDbName);
+
+ const sessionUUID = UUID();
+ const parentLsid = {id: sessionUUID};
+ const childLsid0 = {id: sessionUUID, txnNumber: NumberLong(11), txnUUID: UUID()};
+ const childLsid1 = {id: sessionUUID, txnNumber: NumberLong(12), txnUUID: UUID()};
+
+ // Start a parent session without related children.
+ assert.commandWorked(testDB.runCommand(
+ {insert: kCollName, documents: [{x: 0}], lsid: parentLsid, txnNumber: NumberLong(13)}));
+
+ assert.commandWorked(testDB.runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: childLsid0,
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ assert.commandWorked(testDB.adminCommand(
+ {commitTransaction: 1, lsid: childLsid0, txnNumber: NumberLong(0), autocommit: false}));
+
+ assert.commandWorked(testDB.runCommand({
+ insert: kCollName,
+ documents: [{x: 2}],
+ lsid: childLsid1,
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ const parentDocBeforeDowngrade =
+ shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID});
+
+ assert.commandWorked(testDB.adminCommand(
+ {commitTransaction: 1, lsid: childLsid1, txnNumber: NumberLong(0), autocommit: false}));
+
+ assert.commandWorked(shard0Primary.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
+
+ shard0Rst.nodes.forEach(node => {
+ // We expect every document except for the parent session documents to be deleted.
+ const collectionDocCount = node.getCollection(kConfigTxnNs).count();
+ assert.eq(collectionDocCount, 1);
+
+ const parentDocAfterDowngrade =
+ shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID});
+ assert.eq(parentDocAfterDowngrade, parentDocBeforeDowngrade);
+ });
+
+ st.stop();
+})();
+
+(() => {
+ jsTest.log("Test downgrade modifies a parent session document if it has a txnNumber equal to " +
+ " the highest txnNumber of its children.");
+
+ const st = new ShardingTest({shards: {rs0: {nodes: 2}}});
+ const shard0Rst = st.rs0;
+ const shard0Primary = shard0Rst.getPrimary();
+
+ const kDbName = "testDb";
+ const kCollName = "testColl";
+ const testDB = shard0Primary.getDB(kDbName);
+
+ const sessionUUID = UUID();
+ const parentLsid = {id: sessionUUID};
+ const childLsid0 = {id: sessionUUID, txnNumber: NumberLong(13), txnUUID: UUID()};
+ const childLsid1 = {id: sessionUUID, txnNumber: NumberLong(14), txnUUID: UUID()};
+
+ // Start a parent session without related children.
+ assert.commandWorked(testDB.runCommand(
+ {insert: kCollName, documents: [{x: 0}], lsid: parentLsid, txnNumber: NumberLong(14)}));
+
+ assert.commandWorked(testDB.runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: childLsid0,
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ assert.commandWorked(testDB.adminCommand(
+ {commitTransaction: 1, lsid: childLsid0, txnNumber: NumberLong(0), autocommit: false}));
+
+ assert.commandWorked(testDB.runCommand({
+ insert: kCollName,
+ documents: [{x: 2}],
+ lsid: childLsid1,
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ assert.commandWorked(testDB.adminCommand(
+ {commitTransaction: 1, lsid: childLsid1, txnNumber: NumberLong(0), autocommit: false}));
+
+ const parentDocBeforeDowngrade =
+ shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID});
+
+ assert.commandWorked(shard0Primary.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
+
+ shard0Rst.nodes.forEach(node => {
+ // We expect every document except for the parent session documents to be deleted.
+ const collectionDocCount = node.getCollection(kConfigTxnNs).count();
+ assert.eq(collectionDocCount, 1);
+
+ const doc = node.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID});
+ assert.eq(doc.txnNum, NumberLong(14));
+ assert.eq(doc.lastWriteOpTime.ts, Timestamp(1, 0));
+ assert.eq(doc.lastWriteOpTime.t, NumberLong(1));
+ assert.gte(doc.lastWriteDate, parentDocBeforeDowngrade.lastWriteDate);
+ });
+
+ st.stop();
+})();
+})();
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
index 311036f4a32..0afc4dd5348 100644
--- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
+++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
@@ -69,6 +69,8 @@
#include "mongo/db/s/resharding/resharding_coordinator_service.h"
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/session_catalog.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
#include "mongo/db/vector_clock.h"
#include "mongo/db/views/view_catalog.h"
@@ -655,6 +657,46 @@ private:
}
}
+ {
+
+ LOGV2(5876100, "Starting removal of internal sessions from config.transactions.");
+
+ // Due to the possibility that the shell or drivers have implicit sessions enabled, we
+ // cannot write to the config.transactions collection while we're in a session. So we
+ // construct a temporary client to as a work around.
+ auto newClient = opCtx->getServiceContext()->makeClient("InternalSessionsCleanup");
+
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillableByStepdown(lk);
+ }
+
+ AlternativeClientRegion acr(newClient);
+
+ auto setFcvCancellationThreadPool([] {
+ ThreadPool::Options options;
+ options.poolName = "SetFcvDowngradeCancellableOpCtxPool";
+ options.minThreads = 1;
+ options.maxThreads = 1;
+
+ auto threadPool = std::make_shared<ThreadPool>(std::move(options));
+ threadPool->startup();
+ return threadPool;
+ }());
+
+ CancelableOperationContextFactory factory(opCtx->getCancellationToken(),
+ setFcvCancellationThreadPool);
+
+ // We use a CancelableOperationContext in order to stop cleanup if the original opCtx
+ // has been interrupted.
+ auto newOpCtxPtr = factory.makeOperationContext(&cc());
+ auto newOpCtx = newOpCtxPtr.get();
+
+ _cleanupInternalSessions(newOpCtx);
+
+ LOGV2(5876101, "Completed removal of internal sessions from config.transactions.");
+ }
+
uassert(ErrorCodes::Error(549181),
"Failing downgrade due to 'failDowngrading' failpoint set",
!failDowngrading.shouldFail());
@@ -721,6 +763,132 @@ private:
}
}
+ /**
+ * Removes all child sessions from the config.transactions collection and updates the parent
+ * sessions to have the highest txnNumber of either itself or its child sessions.
+ */
+ void _cleanupInternalSessions(OperationContext* opCtx) {
+ _updateSessionDocuments(opCtx, _constructParentLsidToTxnNumberMap(opCtx));
+ _deleteChildSessionDocuments(opCtx);
+ }
+
+ /**
+ * Constructs a map consisting of a mapping between the parent session and the highest
+ * txnNumber of its child sessions.
+ */
+ LogicalSessionIdMap<TxnNumber> _constructParentLsidToTxnNumberMap(OperationContext* opCtx) {
+ DBDirectClient client(opCtx);
+
+ LogicalSessionIdMap<TxnNumber> parentLsidToTxnNum;
+ auto projection = BSON("_id" << 1 << "parentLsid" << 1);
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
+ BSON("parentLsid" << BSON("$exists" << true)),
+ {},
+ 0,
+ 0,
+ &projection);
+
+ while (cursor->more()) {
+ auto doc = cursor->next();
+ auto lsid = LogicalSessionId::parse(
+ IDLParserErrorContext("parse lsid for session document modification"),
+ doc.getField("_id").Obj());
+ auto parentLsid = LogicalSessionId::parse(
+ IDLParserErrorContext("parse parentLsid for session document modification"),
+ doc.getField("parentLsid").Obj());
+ auto txnNum = lsid.getTxnNumber();
+ if (auto it = parentLsidToTxnNum.find(parentLsid); it != parentLsidToTxnNum.end()) {
+ it->second = std::max(*txnNum, it->second);
+ } else {
+ parentLsidToTxnNum[parentLsid] = *txnNum;
+ }
+ }
+
+ return parentLsidToTxnNum;
+ }
+
+ /**
+ * Update each parent session's txnNumber to the highest txnNumber seen for that session
+ * (including child sessions). We do this to account for the case where a child session
+ * ran a transaction with a higher txnNumber than the last recorded txnNumber for a
+ * parent session. The parent session should know what the most recent txnNumber sent by
+ * the driver is.
+ */
+ void _updateSessionDocuments(OperationContext* opCtx,
+ const LogicalSessionIdMap<TxnNumber>& parentLsidToTxnNum) {
+ DBDirectClient client(opCtx);
+ write_ops::UpdateCommandRequest updateOp(
+ NamespaceString::kSessionTransactionsTableNamespace);
+ std::vector<write_ops::UpdateOpEntry> updates;
+ for (const auto& [lsid, txnNumber] : parentLsidToTxnNum) {
+ SessionTxnRecord modifiedDoc;
+ bool parentSessionExists = false;
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
+ BSON("_id" << lsid.toBSON()));
+ if ((parentSessionExists = cursor->more())) {
+ modifiedDoc = SessionTxnRecord::parse(
+ IDLParserErrorContext("parse transaction document to modify"), cursor->next());
+
+ // We do not want to override the transaction state of a parent session with a
+ // greater txnNumber than that of its child sessions.
+ if (modifiedDoc.getTxnNum() > txnNumber) {
+ continue;
+ }
+ }
+
+ // Upsert a new transaction document for a parent session if it doesn't already
+ // exist in the config.transactions collection.
+ if (!parentSessionExists) {
+ modifiedDoc.setSessionId(lsid);
+ }
+
+ modifiedDoc.setLastWriteDate(Date_t::now());
+ modifiedDoc.setTxnNum(txnNumber);
+
+ // We set this timestamp to ensure that retry attempts fail with
+ // IncompleteTransactionHistory. This is to stop us from double applying an
+ // operation.
+ modifiedDoc.setLastWriteOpTime(repl::OpTime(Timestamp(1, 0), 1));
+
+ write_ops::UpdateOpEntry updateEntry;
+ updateEntry.setQ(BSON("_id" << lsid.toBSON()));
+ updateEntry.setU(
+ write_ops::UpdateModification::parseFromClassicUpdate(modifiedDoc.toBSON()));
+ updateEntry.setUpsert(true);
+ updates.push_back(updateEntry);
+
+ if (updates.size() == write_ops::kMaxWriteBatchSize) {
+ updateOp.setUpdates(updates);
+ auto response = client.runCommand(updateOp.serialize({}));
+ uassertStatusOK(getStatusFromWriteCommandReply(response->getCommandReply()));
+ updates.clear();
+ }
+ }
+
+ if (updates.size() > 0) {
+ updateOp.setUpdates(updates);
+ auto response = client.runCommand(updateOp.serialize({}));
+ uassertStatusOK(getStatusFromWriteCommandReply(response->getCommandReply()));
+ }
+ }
+
+ /**
+ * Delete the remaining child sessions from the config.transactions collection.
+ */
+ void _deleteChildSessionDocuments(OperationContext* opCtx) {
+ DBDirectClient client(opCtx);
+
+ write_ops::DeleteCommandRequest deleteOp(
+ NamespaceString::kSessionTransactionsTableNamespace);
+ write_ops::DeleteOpEntry deleteEntry;
+ deleteEntry.setQ(BSON("_id.txnUUID" << BSON("$exists" << true)));
+ deleteEntry.setMulti(true);
+ deleteOp.setDeletes({deleteEntry});
+
+ auto response = client.runCommand(deleteOp.serialize({}));
+ uassertStatusOK(getStatusFromWriteCommandReply(response->getCommandReply()));
+ }
+
} setFeatureCompatibilityVersionCommand;
} // namespace