diff options
-rw-r--r-- | jstests/replsets/tenant_migration_commit_transaction_retry.js | 10 | ||||
-rw-r--r-- | jstests/replsets/tenant_migration_fetch_committed_transactions.js | 142 | ||||
-rw-r--r-- | jstests/replsets/tenant_migration_oplog_view.js | 41 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 98 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service_test.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_util.cpp | 99 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_util.h | 20 |
10 files changed, 405 insertions, 31 deletions
diff --git a/jstests/replsets/tenant_migration_commit_transaction_retry.js b/jstests/replsets/tenant_migration_commit_transaction_retry.js index 113324fa5f3..16334c4221c 100644 --- a/jstests/replsets/tenant_migration_commit_transaction_retry.js +++ b/jstests/replsets/tenant_migration_commit_transaction_retry.js @@ -125,16 +125,6 @@ assert.eq(txnEntryOnDonor, aggRes.cursor.firstBatch[0]); // Test the client can retry commitTransaction for that transaction that committed prior to the // migration. -// Insert the config.transactions entry on the recipient, but with a dummy lastWriteOpTime. The -// recipient should not need a real lastWriteOpTime to support a commitTransaction retry. -txnEntryOnDonor.lastWriteOpTime.ts = new Timestamp(0, 0); -assert.commandWorked( - recipientPrimary.getCollection("config.transactions").insert([txnEntryOnDonor])); -recipientRst.awaitLastOpCommitted(); -recipientRst.getSecondaries().forEach(node => { - assert.eq(1, node.getCollection("config.transactions").count(txnEntryOnDonor)); -}); - assert.commandWorked(recipientPrimary.adminCommand({ commitTransaction: 1, lsid: txnEntryOnDonor._id, diff --git a/jstests/replsets/tenant_migration_fetch_committed_transactions.js b/jstests/replsets/tenant_migration_fetch_committed_transactions.js new file mode 100644 index 00000000000..821dd543106 --- /dev/null +++ b/jstests/replsets/tenant_migration_fetch_committed_transactions.js @@ -0,0 +1,142 @@ +/** + * Tests that the migration recipient will retrieve committed transactions on the donor with a + * 'lastWriteOpTime' before the stored 'startFetchingOpTime'. The recipient should store these + * committed transaction entries in its own 'config.transactions' collection. + * + * @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_eft, + * incompatible_with_windows_tls] + */ + +(function() { +"use strict"; + +load("jstests/core/txns/libs/prepare_helpers.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/replsets/rslib.js"); +load("jstests/libs/uuid_util.js"); + +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); +if (!tenantMigrationTest.isFeatureFlagEnabled()) { + jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); + return; +} + +const tenantId = "testTenantId"; +const tenantDB = tenantMigrationTest.tenantDB(tenantId, "testDB"); +const nonTenantDB = tenantMigrationTest.nonTenantDB(tenantId, "testDB"); +const collName = "testColl"; +const tenantNS = `${tenantDB}.${collName}`; +const transactionsNS = "config.transactions"; + +const donorPrimary = tenantMigrationTest.getDonorPrimary(); +const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + +assert.commandWorked(donorPrimary.getCollection(tenantNS).insert([{_id: 0, x: 0}, {_id: 1, x: 1}], + {writeConcern: {w: "majority"}})); + +{ + jsTestLog("Run and commit a transaction prior to the migration"); + const session = donorPrimary.startSession({causalConsistency: false}); + const sessionDb = session.getDatabase(tenantDB); + const sessionColl = sessionDb.getCollection(collName); + + session.startTransaction({writeConcern: {w: "majority"}}); + const findAndModifyRes0 = sessionColl.findAndModify({query: {x: 0}, remove: true}); + assert.eq({_id: 0, x: 0}, findAndModifyRes0); + assert.commandWorked(session.commitTransaction_forTesting()); + assert.sameMembers(sessionColl.find({}).toArray(), [{_id: 1, x: 1}]); + session.endSession(); +} + +// This should be the only transaction entry on the donor fetched by the recipient. +const fetchedDonorTxnEntry = donorPrimary.getCollection(transactionsNS).find().toArray(); + +{ + jsTestLog("Run and abort a transaction prior to the migration"); + const session = donorPrimary.startSession({causalConsistency: false}); + const sessionDb = session.getDatabase(tenantDB); + const sessionColl = sessionDb.getCollection(collName); + + session.startTransaction({writeConcern: {w: "majority"}}); + const findAndModifyRes0 = sessionColl.findAndModify({query: {x: 1}, remove: true}); + assert.eq({_id: 1, x: 1}, findAndModifyRes0); + + // We prepare the transaction so that 'abortTransaction' will update the transactions table. We + // should later see that the recipient will not update its transactions table with this entry, + // since we only fetch committed transactions. + PrepareHelpers.prepareTransaction(session); + + assert.commandWorked(session.abortTransaction_forTesting()); + assert.sameMembers(sessionColl.find({}).toArray(), [{_id: 1, x: 1}]); + session.endSession(); +} + +{ + jsTestLog("Run and commit a transaction that does not belong to the tenant"); + const session = donorPrimary.startSession({causalConsistency: false}); + const sessionDb = session.getDatabase(nonTenantDB); + const sessionColl = sessionDb.getCollection(collName); + + session.startTransaction({writeConcern: {w: "majority"}}); + assert.commandWorked(sessionColl.insert([{_id: 0, x: 0}, {_id: 1, x: 1}])); + assert.commandWorked(session.commitTransaction_forTesting()); + session.endSession(); +} + +const fpAfterRetrievingStartOpTime = configureFailPoint( + recipientPrimary, "fpAfterRetrievingStartOpTimesMigrationRecipientInstance", {action: "hang"}); + +jsTestLog("Starting a migration"); +const migrationId = UUID(); +const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + tenantId, +}; +assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); +fpAfterRetrievingStartOpTime.wait(); + +{ + jsTestLog("Run and commit a transaction in the middle of the migration"); + const session = donorPrimary.startSession({causalConsistency: false}); + const sessionDb = session.getDatabase(tenantDB); + const sessionColl = sessionDb.getCollection(collName); + + session.startTransaction({writeConcern: {w: "majority"}}); + assert.commandWorked(sessionColl.insert([{_id: 2, x: 2}, {_id: 3, x: 3}])); + assert.commandWorked(session.commitTransaction_forTesting()); + session.endSession(); +} + +jsTestLog("Waiting for migration to complete"); +fpAfterRetrievingStartOpTime.off(); +assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + +const donorTxnEntries = donorPrimary.getCollection(transactionsNS).find().toArray(); +jsTestLog(`All donor entries: ${tojson(donorTxnEntries)}`); +assert.eq(4, donorTxnEntries.length, `donor transaction entries: ${tojson(donorTxnEntries)}`); + +const recipientTxnEntries = recipientPrimary.getCollection(transactionsNS).find().toArray(); + +// Verify that the recipient has fetched and written only the first committed transaction entry from +// the donor. +assert.eq( + 1, recipientTxnEntries.length, `recipient transaction entries: ${tojson(recipientTxnEntries)}`); +assert.eq( + fetchedDonorTxnEntry, + recipientTxnEntries, + `fetched donor transaction entries: ${ + tojson( + fetchedDonorTxnEntry)}; recipient transaction entries: ${tojson(recipientTxnEntries)}`); + +// Test that the client can retry 'commitTransaction' on the recipient. +const recipientTxnEntry = recipientTxnEntries[0]; +assert.commandWorked(recipientPrimary.adminCommand({ + commitTransaction: 1, + lsid: recipientTxnEntry._id, + txnNumber: recipientTxnEntry.txnNum, + autocommit: false, +})); + +tenantMigrationTest.stop(); +})(); diff --git a/jstests/replsets/tenant_migration_oplog_view.js b/jstests/replsets/tenant_migration_oplog_view.js index 18f9018f1e3..5803b97e836 100644 --- a/jstests/replsets/tenant_migration_oplog_view.js +++ b/jstests/replsets/tenant_migration_oplog_view.js @@ -45,12 +45,15 @@ if (!tenantMigrationTest.isFeatureFlagEnabled()) { return; } +const dbName = "test"; +const collName = "collection"; + const donorPrimary = tenantMigrationTest.getDonorPrimary(); const rsConn = new Mongo(donorRst.getURL()); const oplog = donorPrimary.getDB("local")["oplog.rs"]; const migrationOplogView = donorPrimary.getDB("local")["system.tenantMigration.oplogView"]; const session = rsConn.startSession({retryWrites: true}); -const collection = session.getDatabase("test")["collection"]; +const collection = session.getDatabase(dbName)[collName]; { // Assert an oplog entry representing a retryable write only projects fields defined in the @@ -118,6 +121,42 @@ const collection = session.getDatabase("test")["collection"]; assert.eq(viewEntry["postImageOpTime"]["ts"], resultOplogEntry["ts"]); } +{ + // Assert that an oplog entry that belongs to a transaction will project its 'o.applyOps.ns' + // field. This is used to filter transactions that belong to the tenant. + const txnSession = rsConn.startSession(); + const txnDb = txnSession.getDatabase(dbName); + const txnColl = txnDb.getCollection(collName); + assert.commandWorked(txnColl.insert({_id: 'insertDoc'})); + + txnSession.startTransaction({writeConcern: {w: "majority"}}); + assert.commandWorked(txnColl.insert({_id: 'transaction0'})); + assert.commandWorked(txnColl.insert({_id: 'transaction1'})); + assert.commandWorked(txnSession.commitTransaction_forTesting()); + + const txnEntryOnDonor = + donorPrimary.getCollection("config.transactions").find({state: "committed"}).toArray()[0]; + jsTestLog(`Txn entry on donor: ${tojson(txnEntryOnDonor)}`); + + const viewEntry = migrationOplogView.find({ts: txnEntryOnDonor.lastWriteOpTime.ts}).next(); + jsTestLog(`Transaction view entry: ${tojson(viewEntry)}`); + + // The following fields are filtered out of the view. + assert(!viewEntry.hasOwnProperty("txnNumber")); + assert(!viewEntry.hasOwnProperty("state")); + assert(!viewEntry.hasOwnProperty("preImageOpTime")); + assert(!viewEntry.hasOwnProperty("postImageOpTime")); + assert(!viewEntry.hasOwnProperty("stmtId")); + + // Verify that the view entry has the following fields. + assert(viewEntry.hasOwnProperty("ns")); + assert(viewEntry.hasOwnProperty("ts")); + assert(viewEntry.hasOwnProperty("applyOpsNs")); + + // Assert that 'applyOpsNs' contains the namespace of the inserts. + assert.eq(viewEntry.applyOpsNs, `${dbName}.${collName}`); +} + donorRst.stopSet(); tenantMigrationTest.stop(); })(); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index f48421ba528..e4e29093151 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1309,6 +1309,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/client/clientdriver_network', + '$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface', '$BUILD_DIR/mongo/db/transaction', 'cloner_utils', 'oplog_buffer_collection', diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index aec31adb5dd..fdde0d01cc3 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -661,7 +661,7 @@ void createOplog(OperationContext* opCtx, }); createSlimOplogView(opCtx, ctx.db()); - tenant_migration_util::createRetryableWritesView(opCtx, ctx.db()); + tenant_migration_util::createOplogViewForTenantMigrations(opCtx, ctx.db()); /* sync here so we don't get any surprising lag later when we try to sync */ service->getStorageEngine()->flushAllFiles(opCtx, /*callerHoldsReadLock*/ false); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index ab648a0526d..18db4cc1973 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -40,6 +40,7 @@ #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/process_interface/mongo_process_interface.h" #include "mongo/db/repl/cloner_utils.h" #include "mongo/db/repl/data_replicator_external_state.h" #include "mongo/db/repl/oplog_applier.h" @@ -73,6 +74,36 @@ NamespaceString getOplogBufferNs(const UUID& migrationUUID) { return NamespaceString(NamespaceString::kConfigDb, kOplogBufferPrefix + migrationUUID.toString()); } + +boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx) { + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + + // Add kTenantMigrationOplogView, kSessionTransactionsTableNamespace, and kRsOplogNamespace + // to resolvedNamespaces since they are all used during different pipeline stages. + resolvedNamespaces[NamespaceString::kTenantMigrationOplogView.coll()] = { + NamespaceString::kTenantMigrationOplogView, std::vector<BSONObj>()}; + + resolvedNamespaces[NamespaceString::kSessionTransactionsTableNamespace.coll()] = { + NamespaceString::kSessionTransactionsTableNamespace, std::vector<BSONObj>()}; + + resolvedNamespaces[NamespaceString::kRsOplogNamespace.coll()] = { + NamespaceString::kRsOplogNamespace, std::vector<BSONObj>()}; + + return make_intrusive<ExpressionContext>(opCtx, + boost::none, /* explain */ + false, /* fromMongos */ + false, /* needsMerge */ + true, /* allowDiskUse */ + true, /* bypassDocumentValidation */ + false, /* isMapReduceCommand */ + NamespaceString::kSessionTransactionsTableNamespace, + boost::none, /* runtimeConstants */ + nullptr, /* collator */ + MongoProcessInterface::create(opCtx), + std::move(resolvedNamespaces), + boost::none); /* collUUID */ +} + } // namespace // A convenient place to set test-specific parameters. @@ -82,6 +113,7 @@ MONGO_FAIL_POINT_DEFINE(skipTenantMigrationRecipientAuth); MONGO_FAIL_POINT_DEFINE(skipComparingRecipientAndDonorFCV); MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration); MONGO_FAIL_POINT_DEFINE(pauseAfterCreatingOplogBuffer); +MONGO_FAIL_POINT_DEFINE(skipFetchingCommittedTransactions); // Fails before waiting for the state doc to be majority replicated. MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc); @@ -770,9 +802,71 @@ void TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBefore return; } +AggregateCommand TenantMigrationRecipientService::Instance::_makeCommittedTransactionsAggregation() + const { + + auto opCtx = cc().makeOperationContext(); + auto expCtx = makeExpressionContext(opCtx.get()); + + Timestamp startFetchingTimestamp; + { + stdx::lock_guard lk(_mutex); + invariant(_stateDoc.getStartFetchingDonorOpTime()); + startFetchingTimestamp = _stateDoc.getStartFetchingDonorOpTime().get().getTimestamp(); + } + + auto serializedPipeline = + tenant_migration_util::createCommittedTransactionsPipelineForTenantMigrations( + expCtx, startFetchingTimestamp, getTenantId()) + ->serializeToBson(); + + AggregateCommand aggRequest(NamespaceString::kSessionTransactionsTableNamespace, + std::move(serializedPipeline)); + + auto readConcern = repl::ReadConcernArgs( + boost::optional<LogicalTime>(startFetchingTimestamp), + boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern)); + aggRequest.setReadConcern(readConcern.toBSONInner()); + + aggRequest.setHint(BSON(SessionTxnRecord::kSessionIdFieldName << 1)); + aggRequest.setCursor(SimpleCursorOptions()); + // We must set a writeConcern on internal commands. + aggRequest.setWriteConcern(WriteConcernOptions()); + + return aggRequest; +} + void TenantMigrationRecipientService::Instance::_fetchCommittedTransactionsBeforeStartOpTime() { - // TODO (SERVER-53511): Run the aggregation. - return; + if (MONGO_unlikely(skipFetchingCommittedTransactions.shouldFail())) { // Test-only. + return; + } + + auto aggRequest = _makeCommittedTransactionsAggregation(); + + auto statusWith = DBClientCursor::fromAggregationRequest( + _client.get(), std::move(aggRequest), true /* secondaryOk */, false /* useExhaust */); + if (!statusWith.isOK()) { + LOGV2_ERROR(5351100, + "Fetch committed transactions aggregation failed", + "error"_attr = statusWith.getStatus()); + uassertStatusOK(statusWith.getStatus()); + } + + auto opCtx = cc().makeOperationContext(); + + auto cursor = statusWith.getValue().get(); + while (cursor->more()) { + auto transactionEntry = cursor->next(); + // TODO (SERVER-53513): Properly update 'config.transactions' and write a no-op entry + // instead of upserting the document. + uassertStatusOK( + tenant_migration_util::upsertCommittedTransactionEntry(opCtx.get(), transactionEntry)); + + stdx::lock_guard lk(_mutex); + if (_taskState.isInterrupted()) { + uassertStatusOK(_taskState.getInterruptStatus()); + } + } } void TenantMigrationRecipientService::Instance::_startOplogFetcher() { diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index fde90d1327a..ab48c653ce3 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -32,6 +32,7 @@ #include <boost/optional.hpp> #include <memory> +#include "mongo/db/pipeline/aggregate_command_gen.h" #include "mongo/db/repl/oplog_fetcher.h" #include "mongo/db/repl/primary_only_service.h" #include "mongo/db/repl/tenant_all_database_cloner.h" @@ -361,12 +362,18 @@ public: void _fetchRetryableWritesOplogBeforeStartOpTime(); /** - * Runs an aggregation that gets the donor's transactions entries in 'config.transactions' - * with 'lastWriteOpTime' < 'startFetchingOpTime' and 'state: committed'. + * Runs the aggregation from '_makeCommittedTransactionsAggregation()' and migrates the + * resulting committed transactions entries into 'config.transactions'. */ void _fetchCommittedTransactionsBeforeStartOpTime(); /** + * Creates an aggregation pipeline to fetch transaction entries with 'lastWriteOpTime' < + * 'startFetchingDonorOpTime' and 'state: committed'. + */ + AggregateCommand _makeCommittedTransactionsAggregation() const; + + /** * Starts the tenant oplog fetcher. */ void _startOplogFetcher(); @@ -492,8 +499,10 @@ public: // Because the cloners and oplog fetcher use exhaust, we need a separate connection for // each. The '_client' will be used for the cloners and other operations such as fetching // optimes while the '_oplogFetcherClient' will be reserved for the oplog fetcher only. - std::unique_ptr<DBClientConnection> _client; // (M) - std::unique_ptr<DBClientConnection> _oplogFetcherClient; // (M) + // + // Follow DBClientCursor synchonization rules. + std::unique_ptr<DBClientConnection> _client; // (S) + std::unique_ptr<DBClientConnection> _oplogFetcherClient; // (S) std::unique_ptr<OplogFetcherFactory> _createOplogFetcherFn = std::make_unique<CreateOplogFetcherFn>(); // (M) diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index e060a53ac9e..7399633def5 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -232,6 +232,12 @@ public: auto compFp = globalFailPointRegistry().find("skipComparingRecipientAndDonorFCV"); compFp->setMode(FailPoint::alwaysOn); + // Skip fetching committed transactions, as we will test this logic entirely in integration + // tests. + auto fetchCommittedTransactionsFp = + globalFailPointRegistry().find("skipFetchingCommittedTransactions"); + fetchCommittedTransactionsFp->setMode(FailPoint::alwaysOn); + // Timestamps of "0 seconds" are not allowed, so we must advance our clock mock to the first // real second. _clkSource->advance(Milliseconds(1000)); diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp index 247da7fdc89..035c62bf998 100644 --- a/src/mongo/db/repl/tenant_migration_util.cpp +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -29,10 +29,14 @@ #include "mongo/db/repl/tenant_migration_util.h" +#include "mongo/bson/json.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/logical_time_validator.h" +#include "mongo/db/pipeline/document_source_lookup.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/wait_for_majority_service.h" @@ -83,7 +87,7 @@ void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecuto } } -void createRetryableWritesView(OperationContext* opCtx, Database* db) { +void createOplogViewForTenantMigrations(OperationContext* opCtx, Database* db) { writeConflictRetry( opCtx, "createDonorOplogView", NamespaceString::kTenantMigrationOplogView.ns(), [&] { { @@ -98,18 +102,32 @@ void createRetryableWritesView(OperationContext* opCtx, Database* db) { wuow.commit(); } - // First match entries with a `stmtId` so that we're filtering for retryable writes - // oplog entries. Pass the result into the next stage of the pipeline and only project - // the fields that a tenant migration recipient needs to refetch retryable writes oplog - // entries: `ts`, `prevOpTime`, `preImageOpTime`, and `postImageOpTime`. + // Project the fields that a tenant migration recipient needs to refetch retryable + // writes oplog entries: `ts`, `prevOpTime`, `preImageOpTime`, and `postImageOpTime`. + // Also projects the first 'ns' field of 'applyOps' for transactions. + // + // We use two stages in this pipeline because 'o.applyOps' is an array but '$project' + // does not recognize numeric paths as array indices. As a result, we use one '$project' + // stage to get the first element in 'o.applyOps', then a second stage to store the 'ns' + // field of the element into 'applyOpsNs'. + BSONArrayBuilder pipeline; + pipeline.append(BSON("$project" << BSON("_id" + << "$ts" + << "ns" << 1 << "ts" << 1 << "prevOpTime" << 1 + << "preImageOpTime" << 1 << "postImageOpTime" + << 1 << "applyOpsNs" + << BSON("$first" + << "$o.applyOps")))); + pipeline.append(BSON("$project" << BSON("_id" + << "$ts" + << "ns" << 1 << "ts" << 1 << "prevOpTime" << 1 + << "preImageOpTime" << 1 << "postImageOpTime" + << 1 << "applyOpsNs" + << "$applyOpsNs.ns"))); + CollectionOptions options; options.viewOn = NamespaceString::kRsOplogNamespace.coll().toString(); - options.pipeline = BSON_ARRAY( - BSON("$match" << BSON("stmtId" << BSON("$exists" << true))) - << BSON("$project" << BSON("_id" - << "$ts" - << "ns" << 1 << "ts" << 1 << "prevOpTime" << 1 - << "preImageOpTime" << 1 << "postImageOpTime" << 1))); + options.pipeline = pipeline.arr(); WriteUnitOfWork wuow(opCtx); uassertStatusOK( @@ -118,6 +136,65 @@ void createRetryableWritesView(OperationContext* opCtx, Database* db) { }); } +std::unique_ptr<Pipeline, PipelineDeleter> createCommittedTransactionsPipelineForTenantMigrations( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const Timestamp& startFetchingTimestamp, + const std::string& tenantId) { + Pipeline::SourceContainer stages; + using Doc = Document; + + // 1. Match config.transactions entries that have a 'lastWriteOpTime.ts' before + // 'startFetchingTimestamp' and 'state: committed', which indicates that it is a committed + // transaction. Retryable writes should not have the 'state' field. + stages.emplace_back(DocumentSourceMatch::createFromBson( + Doc{{"$match", + Doc{{"state", Value{"committed"_sd}}, + {"lastWriteOpTime.ts", Doc{{"$lt", startFetchingTimestamp}}}}}} + .toBson() + .firstElement(), + expCtx)); + + // 2. Get all oplog entries that have a timestamp equal to 'lastWriteOpTime.ts'. Store these + // oplog entries in the 'oplogEntry' field. + stages.emplace_back(DocumentSourceLookUp::createFromBson(fromjson("{\ + $lookup: {\ + from: {db: 'local', coll: 'system.tenantMigration.oplogView'},\ + localField: 'lastWriteOpTime.ts',\ + foreignField: 'ts',\ + as: 'oplogEntry'\ + }}") + .firstElement(), + expCtx)); + + // 3. Filter out the entries that do not belong to the tenant. + stages.emplace_back(DocumentSourceMatch::createFromBson(fromjson("{\ + $match: {\ + 'oplogEntry.applyOpsNs': {$regex: '^" + tenantId + "_'}\ + }}") + .firstElement(), + expCtx)); + + // 4. Unset the 'oplogEntry' field and return the committed transaction entries. + stages.emplace_back(DocumentSourceProject::createUnset(FieldPath("oplogEntry"), expCtx)); + + return Pipeline::create(std::move(stages), expCtx); +} + +Status upsertCommittedTransactionEntry(OperationContext* opCtx, const BSONObj& entry) { + const auto nss = NamespaceString::kSessionTransactionsTableNamespace; + AutoGetCollection collection(opCtx, nss, MODE_IX); + + // Sanity check. + uassert(ErrorCodes::PrimarySteppedDown, + str::stream() << "No longer primary while attempting to insert transactions entry", + repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)); + + return writeConflictRetry(opCtx, "insertCommittedTransactionEntry", nss.ns(), [&]() -> Status { + Helpers::upsert(opCtx, nss.ns(), entry, false /* fromMigrate */); + return Status::OK(); + }); +} + } // namespace tenant_migration_util } // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h index fe467ec5e0c..a1f1cd60d1d 100644 --- a/src/mongo/db/repl/tenant_migration_util.h +++ b/src/mongo/db/repl/tenant_migration_util.h @@ -37,6 +37,7 @@ #include "mongo/config.h" #include "mongo/db/catalog/database.h" #include "mongo/db/keys_collection_document_gen.h" +#include "mongo/db/pipeline/pipeline.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/executor/scoped_task_executor.h" #include "mongo/util/net/ssl_util.h" @@ -146,9 +147,24 @@ void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecuto /** * Creates a view on the oplog that allows a tenant migration recipient to fetch retryable writes - * from a tenant migration donor. + * and transactions from a tenant migration donor. */ -void createRetryableWritesView(OperationContext* opCtx, Database* db); +void createOplogViewForTenantMigrations(OperationContext* opCtx, Database* db); + +/** + * Creates a pipeline for fetching committed transactions on the donor before + * 'startFetchingTimestamp'. We use 'tenantId' to fetch transaction entries specific to a particular + * set of tenant databases. + */ +std::unique_ptr<Pipeline, PipelineDeleter> createCommittedTransactionsPipelineForTenantMigrations( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const Timestamp& startFetchingTimestamp, + const std::string& tenantId); + +/** + * Inserts a committed transactions entry into the 'config.transactions' collection. + */ +Status upsertCommittedTransactionEntry(OperationContext* opCtx, const BSONObj& entry); } // namespace tenant_migration_util |