diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2021-12-06 17:03:22 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-06 17:27:42 +0000 |
commit | 8c82e865d31fe143173753689bca0170bd715250 (patch) | |
tree | 4e111b6d577684fbd62fcf1d9dcbd588bbfdba0b | |
parent | 7d00ce21af91b582235da7586ff50537a99b3839 (diff) | |
download | mongo-8c82e865d31fe143173753689bca0170bd715250.tar.gz |
SERVER-58761 Make setFCV delete the config.transactions entries for all child sessions
-rw-r--r-- | jstests/multiVersion/internal_sessions_downgrade.js | 283 | ||||
-rw-r--r-- | src/mongo/db/commands/set_feature_compatibility_version_command.cpp | 168 |
2 files changed, 451 insertions, 0 deletions
diff --git a/jstests/multiVersion/internal_sessions_downgrade.js b/jstests/multiVersion/internal_sessions_downgrade.js new file mode 100644 index 00000000000..7b342b9b667 --- /dev/null +++ b/jstests/multiVersion/internal_sessions_downgrade.js @@ -0,0 +1,283 @@ +/* + * Test that internal sessions documents are properly removed from the config.transactions + * collection. + * + * @tags: [requires_fcv_51, featureFlagInternalTransactions] + */ +(function() { +'use strict'; + +const kConfigTxnNs = "config.transactions"; +const kDbName = "testDb"; +const kCollName = "testColl"; + +(() => { + jsTest.log( + "Test downgrade updates existing parent session document with highest txnNumber of its " + + "child sessions"); + + const st = new ShardingTest({shards: {rs0: {nodes: 2}}}); + const shard0Rst = st.rs0; + const shard0Primary = shard0Rst.getPrimary(); + + const testDB = shard0Primary.getDB(kDbName); + + const sessionUUID = UUID(); + const parentLsid = {id: sessionUUID}; + const childLsid0 = {id: sessionUUID, txnNumber: NumberLong(5), txnUUID: UUID()}; + const childLsid1 = {id: sessionUUID, txnNumber: NumberLong(6), txnUUID: UUID()}; + + // Create the parent and child sessions + assert.commandWorked(testDB.runCommand( + {insert: kCollName, documents: [{x: 0}], lsid: parentLsid, txnNumber: NumberLong(4)})); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: childLsid0, + txnNumber: NumberLong(0), + startTransaction: true, + autocommit: false + })); + + assert.commandWorked(testDB.adminCommand( + {commitTransaction: 1, lsid: childLsid0, txnNumber: NumberLong(0), autocommit: false})); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 2}], + lsid: childLsid1, + txnNumber: NumberLong(0), + startTransaction: true, + autocommit: false + })); + + assert.commandWorked(testDB.adminCommand( + {commitTransaction: 1, lsid: childLsid1, txnNumber: NumberLong(0), autocommit: false})); + + const parentDocBeforeDowngrade = + shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID}); + + assert.commandWorked(shard0Primary.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + + shard0Rst.nodes.forEach(node => { + // We expect every document except for the parent session documents to be deleted. + const collectionDocCount = node.getCollection(kConfigTxnNs).count(); + assert.eq(collectionDocCount, 1); + + const doc = node.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID}); + assert.eq(doc.txnNum, NumberLong(6)); + assert.eq(doc.lastWriteOpTime.ts, Timestamp(1, 0)); + assert.eq(doc.lastWriteOpTime.t, NumberLong(1)); + assert.gte(doc.lastWriteDate, parentDocBeforeDowngrade.lastWriteDate); + }); + + st.stop(); +})(); + +(() => { + jsTest.log( + "Test downgrade upserts new transaction document if relevant parent session document " + + "does not exist and does not modify unrelated transaction documents."); + + const st = new ShardingTest({shards: {rs0: {nodes: 2}}}); + const shard0Rst = st.rs0; + const shard0Primary = shard0Rst.getPrimary(); + + const kDbName = "testDb"; + const kCollName = "testColl"; + const testDB = shard0Primary.getDB(kDbName); + + const sessionUUID = UUID(); + const unrelatedParentLsid = {id: UUID()}; + const childLsid0 = {id: sessionUUID, txnNumber: NumberLong(7), txnUUID: UUID()}; + const childLsid1 = {id: sessionUUID, txnNumber: NumberLong(8), txnUUID: UUID()}; + + // Start a parent session without related children. + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 0}], + lsid: unrelatedParentLsid, + txnNumber: NumberLong(10) + })); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: childLsid0, + txnNumber: NumberLong(0), + startTransaction: true, + autocommit: false + })); + + assert.commandWorked(testDB.adminCommand( + {commitTransaction: 1, lsid: childLsid0, txnNumber: NumberLong(0), autocommit: false})); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 2}], + lsid: childLsid1, + txnNumber: NumberLong(0), + startTransaction: true, + autocommit: false + })); + + assert.commandWorked(testDB.adminCommand( + {commitTransaction: 1, lsid: childLsid1, txnNumber: NumberLong(0), autocommit: false})); + + const unrelatedParentDocBeforeDowngrade = + shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": unrelatedParentLsid.id}); + + assert.commandWorked(shard0Primary.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + + shard0Rst.nodes.forEach(node => { + // We expect every document except for the parent session documents to be deleted. + const collectionDocCount = node.getCollection(kConfigTxnNs).count(); + assert.eq(collectionDocCount, 2); + + const upsertedDoc = node.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID}); + assert.eq(upsertedDoc.txnNum, NumberLong(8)); + assert.eq(upsertedDoc.lastWriteOpTime.ts, Timestamp(1, 0)); + assert.eq(upsertedDoc.lastWriteOpTime.t, NumberLong(1)); + + const unrelatedParentDocAfterDowngrade = + node.getCollection(kConfigTxnNs).findOne({"_id.id": unrelatedParentLsid.id}); + assert.eq(unrelatedParentDocAfterDowngrade, unrelatedParentDocBeforeDowngrade); + + // For newly upserted session documents, we use the current wall clock time as the + // lastWriteDate, thus we expect it to be farther ahead than the unchanged session document. + assert.gte(upsertedDoc.lastWriteDate, unrelatedParentDocAfterDowngrade.lastWriteDate); + }); + + st.stop(); +})(); + +(() => { + jsTest.log( + "Test downgrade does not modify a parent session document if it has a higher txnNumber " + + "than its children."); + + const st = new ShardingTest({shards: {rs0: {nodes: 2}}}); + const shard0Rst = st.rs0; + const shard0Primary = shard0Rst.getPrimary(); + + const kDbName = "testDb"; + const kCollName = "testColl"; + const testDB = shard0Primary.getDB(kDbName); + + const sessionUUID = UUID(); + const parentLsid = {id: sessionUUID}; + const childLsid0 = {id: sessionUUID, txnNumber: NumberLong(11), txnUUID: UUID()}; + const childLsid1 = {id: sessionUUID, txnNumber: NumberLong(12), txnUUID: UUID()}; + + // Start a parent session without related children. + assert.commandWorked(testDB.runCommand( + {insert: kCollName, documents: [{x: 0}], lsid: parentLsid, txnNumber: NumberLong(13)})); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: childLsid0, + txnNumber: NumberLong(0), + startTransaction: true, + autocommit: false + })); + + assert.commandWorked(testDB.adminCommand( + {commitTransaction: 1, lsid: childLsid0, txnNumber: NumberLong(0), autocommit: false})); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 2}], + lsid: childLsid1, + txnNumber: NumberLong(0), + startTransaction: true, + autocommit: false + })); + + const parentDocBeforeDowngrade = + shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID}); + + assert.commandWorked(testDB.adminCommand( + {commitTransaction: 1, lsid: childLsid1, txnNumber: NumberLong(0), autocommit: false})); + + assert.commandWorked(shard0Primary.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + + shard0Rst.nodes.forEach(node => { + // We expect every document except for the parent session documents to be deleted. + const collectionDocCount = node.getCollection(kConfigTxnNs).count(); + assert.eq(collectionDocCount, 1); + + const parentDocAfterDowngrade = + shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID}); + assert.eq(parentDocAfterDowngrade, parentDocBeforeDowngrade); + }); + + st.stop(); +})(); + +(() => { + jsTest.log("Test downgrade modifies a parent session document if it has a txnNumber equal to " + + " the highest txnNumber of its children."); + + const st = new ShardingTest({shards: {rs0: {nodes: 2}}}); + const shard0Rst = st.rs0; + const shard0Primary = shard0Rst.getPrimary(); + + const kDbName = "testDb"; + const kCollName = "testColl"; + const testDB = shard0Primary.getDB(kDbName); + + const sessionUUID = UUID(); + const parentLsid = {id: sessionUUID}; + const childLsid0 = {id: sessionUUID, txnNumber: NumberLong(13), txnUUID: UUID()}; + const childLsid1 = {id: sessionUUID, txnNumber: NumberLong(14), txnUUID: UUID()}; + + // Start a parent session without related children. + assert.commandWorked(testDB.runCommand( + {insert: kCollName, documents: [{x: 0}], lsid: parentLsid, txnNumber: NumberLong(14)})); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: childLsid0, + txnNumber: NumberLong(0), + startTransaction: true, + autocommit: false + })); + + assert.commandWorked(testDB.adminCommand( + {commitTransaction: 1, lsid: childLsid0, txnNumber: NumberLong(0), autocommit: false})); + + assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 2}], + lsid: childLsid1, + txnNumber: NumberLong(0), + startTransaction: true, + autocommit: false + })); + + assert.commandWorked(testDB.adminCommand( + {commitTransaction: 1, lsid: childLsid1, txnNumber: NumberLong(0), autocommit: false})); + + const parentDocBeforeDowngrade = + shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID}); + + assert.commandWorked(shard0Primary.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + + shard0Rst.nodes.forEach(node => { + // We expect every document except for the parent session documents to be deleted. + const collectionDocCount = node.getCollection(kConfigTxnNs).count(); + assert.eq(collectionDocCount, 1); + + const doc = node.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID}); + assert.eq(doc.txnNum, NumberLong(14)); + assert.eq(doc.lastWriteOpTime.ts, Timestamp(1, 0)); + assert.eq(doc.lastWriteOpTime.t, NumberLong(1)); + assert.gte(doc.lastWriteDate, parentDocBeforeDowngrade.lastWriteDate); + }); + + st.stop(); +})(); +})(); diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp index 311036f4a32..0afc4dd5348 100644 --- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp @@ -69,6 +69,8 @@ #include "mongo/db/s/resharding/resharding_coordinator_service.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" #include "mongo/db/server_options.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/session_txn_record_gen.h" #include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h" #include "mongo/db/vector_clock.h" #include "mongo/db/views/view_catalog.h" @@ -655,6 +657,46 @@ private: } } + { + + LOGV2(5876100, "Starting removal of internal sessions from config.transactions."); + + // Due to the possibility that the shell or drivers have implicit sessions enabled, we + // cannot write to the config.transactions collection while we're in a session. So we + // construct a temporary client to as a work around. + auto newClient = opCtx->getServiceContext()->makeClient("InternalSessionsCleanup"); + + { + stdx::lock_guard<Client> lk(*newClient.get()); + newClient->setSystemOperationKillableByStepdown(lk); + } + + AlternativeClientRegion acr(newClient); + + auto setFcvCancellationThreadPool([] { + ThreadPool::Options options; + options.poolName = "SetFcvDowngradeCancellableOpCtxPool"; + options.minThreads = 1; + options.maxThreads = 1; + + auto threadPool = std::make_shared<ThreadPool>(std::move(options)); + threadPool->startup(); + return threadPool; + }()); + + CancelableOperationContextFactory factory(opCtx->getCancellationToken(), + setFcvCancellationThreadPool); + + // We use a CancelableOperationContext in order to stop cleanup if the original opCtx + // has been interrupted. + auto newOpCtxPtr = factory.makeOperationContext(&cc()); + auto newOpCtx = newOpCtxPtr.get(); + + _cleanupInternalSessions(newOpCtx); + + LOGV2(5876101, "Completed removal of internal sessions from config.transactions."); + } + uassert(ErrorCodes::Error(549181), "Failing downgrade due to 'failDowngrading' failpoint set", !failDowngrading.shouldFail()); @@ -721,6 +763,132 @@ private: } } + /** + * Removes all child sessions from the config.transactions collection and updates the parent + * sessions to have the highest txnNumber of either itself or its child sessions. + */ + void _cleanupInternalSessions(OperationContext* opCtx) { + _updateSessionDocuments(opCtx, _constructParentLsidToTxnNumberMap(opCtx)); + _deleteChildSessionDocuments(opCtx); + } + + /** + * Constructs a map consisting of a mapping between the parent session and the highest + * txnNumber of its child sessions. + */ + LogicalSessionIdMap<TxnNumber> _constructParentLsidToTxnNumberMap(OperationContext* opCtx) { + DBDirectClient client(opCtx); + + LogicalSessionIdMap<TxnNumber> parentLsidToTxnNum; + auto projection = BSON("_id" << 1 << "parentLsid" << 1); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, + BSON("parentLsid" << BSON("$exists" << true)), + {}, + 0, + 0, + &projection); + + while (cursor->more()) { + auto doc = cursor->next(); + auto lsid = LogicalSessionId::parse( + IDLParserErrorContext("parse lsid for session document modification"), + doc.getField("_id").Obj()); + auto parentLsid = LogicalSessionId::parse( + IDLParserErrorContext("parse parentLsid for session document modification"), + doc.getField("parentLsid").Obj()); + auto txnNum = lsid.getTxnNumber(); + if (auto it = parentLsidToTxnNum.find(parentLsid); it != parentLsidToTxnNum.end()) { + it->second = std::max(*txnNum, it->second); + } else { + parentLsidToTxnNum[parentLsid] = *txnNum; + } + } + + return parentLsidToTxnNum; + } + + /** + * Update each parent session's txnNumber to the highest txnNumber seen for that session + * (including child sessions). We do this to account for the case where a child session + * ran a transaction with a higher txnNumber than the last recorded txnNumber for a + * parent session. The parent session should know what the most recent txnNumber sent by + * the driver is. + */ + void _updateSessionDocuments(OperationContext* opCtx, + const LogicalSessionIdMap<TxnNumber>& parentLsidToTxnNum) { + DBDirectClient client(opCtx); + write_ops::UpdateCommandRequest updateOp( + NamespaceString::kSessionTransactionsTableNamespace); + std::vector<write_ops::UpdateOpEntry> updates; + for (const auto& [lsid, txnNumber] : parentLsidToTxnNum) { + SessionTxnRecord modifiedDoc; + bool parentSessionExists = false; + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, + BSON("_id" << lsid.toBSON())); + if ((parentSessionExists = cursor->more())) { + modifiedDoc = SessionTxnRecord::parse( + IDLParserErrorContext("parse transaction document to modify"), cursor->next()); + + // We do not want to override the transaction state of a parent session with a + // greater txnNumber than that of its child sessions. + if (modifiedDoc.getTxnNum() > txnNumber) { + continue; + } + } + + // Upsert a new transaction document for a parent session if it doesn't already + // exist in the config.transactions collection. + if (!parentSessionExists) { + modifiedDoc.setSessionId(lsid); + } + + modifiedDoc.setLastWriteDate(Date_t::now()); + modifiedDoc.setTxnNum(txnNumber); + + // We set this timestamp to ensure that retry attempts fail with + // IncompleteTransactionHistory. This is to stop us from double applying an + // operation. + modifiedDoc.setLastWriteOpTime(repl::OpTime(Timestamp(1, 0), 1)); + + write_ops::UpdateOpEntry updateEntry; + updateEntry.setQ(BSON("_id" << lsid.toBSON())); + updateEntry.setU( + write_ops::UpdateModification::parseFromClassicUpdate(modifiedDoc.toBSON())); + updateEntry.setUpsert(true); + updates.push_back(updateEntry); + + if (updates.size() == write_ops::kMaxWriteBatchSize) { + updateOp.setUpdates(updates); + auto response = client.runCommand(updateOp.serialize({})); + uassertStatusOK(getStatusFromWriteCommandReply(response->getCommandReply())); + updates.clear(); + } + } + + if (updates.size() > 0) { + updateOp.setUpdates(updates); + auto response = client.runCommand(updateOp.serialize({})); + uassertStatusOK(getStatusFromWriteCommandReply(response->getCommandReply())); + } + } + + /** + * Delete the remaining child sessions from the config.transactions collection. + */ + void _deleteChildSessionDocuments(OperationContext* opCtx) { + DBDirectClient client(opCtx); + + write_ops::DeleteCommandRequest deleteOp( + NamespaceString::kSessionTransactionsTableNamespace); + write_ops::DeleteOpEntry deleteEntry; + deleteEntry.setQ(BSON("_id.txnUUID" << BSON("$exists" << true))); + deleteEntry.setMulti(true); + deleteOp.setDeletes({deleteEntry}); + + auto response = client.runCommand(deleteOp.serialize({})); + uassertStatusOK(getStatusFromWriteCommandReply(response->getCommandReply())); + } + } setFeatureCompatibilityVersionCommand; } // namespace |