summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorXueruiFa <xuerui.fa@mongodb.com>2021-02-10 22:48:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-25 04:44:31 +0000
commit22101c2af0be081847d82dddf6f303726b81b8e6 (patch)
tree9ddc0d8969ee6b3c2cc4ef30278436bf6e7e6513
parent46d914bc91a5224bda0a715c15fc440d903c9ba2 (diff)
downloadmongo-22101c2af0be081847d82dddf6f303726b81b8e6.tar.gz
SERVER-53511: Make committed transactions aggregation pipeline
-rw-r--r--jstests/replsets/tenant_migration_commit_transaction_retry.js10
-rw-r--r--jstests/replsets/tenant_migration_fetch_committed_transactions.js142
-rw-r--r--jstests/replsets/tenant_migration_oplog_view.js41
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/oplog.cpp2
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp98
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.h17
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service_test.cpp6
-rw-r--r--src/mongo/db/repl/tenant_migration_util.cpp99
-rw-r--r--src/mongo/db/repl/tenant_migration_util.h20
10 files changed, 405 insertions, 31 deletions
diff --git a/jstests/replsets/tenant_migration_commit_transaction_retry.js b/jstests/replsets/tenant_migration_commit_transaction_retry.js
index 113324fa5f3..16334c4221c 100644
--- a/jstests/replsets/tenant_migration_commit_transaction_retry.js
+++ b/jstests/replsets/tenant_migration_commit_transaction_retry.js
@@ -125,16 +125,6 @@ assert.eq(txnEntryOnDonor, aggRes.cursor.firstBatch[0]);
// Test the client can retry commitTransaction for that transaction that committed prior to the
// migration.
-// Insert the config.transactions entry on the recipient, but with a dummy lastWriteOpTime. The
-// recipient should not need a real lastWriteOpTime to support a commitTransaction retry.
-txnEntryOnDonor.lastWriteOpTime.ts = new Timestamp(0, 0);
-assert.commandWorked(
- recipientPrimary.getCollection("config.transactions").insert([txnEntryOnDonor]));
-recipientRst.awaitLastOpCommitted();
-recipientRst.getSecondaries().forEach(node => {
- assert.eq(1, node.getCollection("config.transactions").count(txnEntryOnDonor));
-});
-
assert.commandWorked(recipientPrimary.adminCommand({
commitTransaction: 1,
lsid: txnEntryOnDonor._id,
diff --git a/jstests/replsets/tenant_migration_fetch_committed_transactions.js b/jstests/replsets/tenant_migration_fetch_committed_transactions.js
new file mode 100644
index 00000000000..821dd543106
--- /dev/null
+++ b/jstests/replsets/tenant_migration_fetch_committed_transactions.js
@@ -0,0 +1,142 @@
+/**
+ * Tests that the migration recipient will retrieve committed transactions on the donor with a
+ * 'lastWriteOpTime' before the stored 'startFetchingOpTime'. The recipient should store these
+ * committed transaction entries in its own 'config.transactions' collection.
+ *
+ * @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_eft,
+ * incompatible_with_windows_tls]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/core/txns/libs/prepare_helpers.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/replsets/rslib.js");
+load("jstests/libs/uuid_util.js");
+
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
+if (!tenantMigrationTest.isFeatureFlagEnabled()) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ return;
+}
+
+const tenantId = "testTenantId";
+const tenantDB = tenantMigrationTest.tenantDB(tenantId, "testDB");
+const nonTenantDB = tenantMigrationTest.nonTenantDB(tenantId, "testDB");
+const collName = "testColl";
+const tenantNS = `${tenantDB}.${collName}`;
+const transactionsNS = "config.transactions";
+
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+
+assert.commandWorked(donorPrimary.getCollection(tenantNS).insert([{_id: 0, x: 0}, {_id: 1, x: 1}],
+ {writeConcern: {w: "majority"}}));
+
+{
+ jsTestLog("Run and commit a transaction prior to the migration");
+ const session = donorPrimary.startSession({causalConsistency: false});
+ const sessionDb = session.getDatabase(tenantDB);
+ const sessionColl = sessionDb.getCollection(collName);
+
+ session.startTransaction({writeConcern: {w: "majority"}});
+ const findAndModifyRes0 = sessionColl.findAndModify({query: {x: 0}, remove: true});
+ assert.eq({_id: 0, x: 0}, findAndModifyRes0);
+ assert.commandWorked(session.commitTransaction_forTesting());
+ assert.sameMembers(sessionColl.find({}).toArray(), [{_id: 1, x: 1}]);
+ session.endSession();
+}
+
+// This should be the only transaction entry on the donor fetched by the recipient.
+const fetchedDonorTxnEntry = donorPrimary.getCollection(transactionsNS).find().toArray();
+
+{
+ jsTestLog("Run and abort a transaction prior to the migration");
+ const session = donorPrimary.startSession({causalConsistency: false});
+ const sessionDb = session.getDatabase(tenantDB);
+ const sessionColl = sessionDb.getCollection(collName);
+
+ session.startTransaction({writeConcern: {w: "majority"}});
+ const findAndModifyRes0 = sessionColl.findAndModify({query: {x: 1}, remove: true});
+ assert.eq({_id: 1, x: 1}, findAndModifyRes0);
+
+ // We prepare the transaction so that 'abortTransaction' will update the transactions table. We
+ // should later see that the recipient will not update its transactions table with this entry,
+ // since we only fetch committed transactions.
+ PrepareHelpers.prepareTransaction(session);
+
+ assert.commandWorked(session.abortTransaction_forTesting());
+ assert.sameMembers(sessionColl.find({}).toArray(), [{_id: 1, x: 1}]);
+ session.endSession();
+}
+
+{
+ jsTestLog("Run and commit a transaction that does not belong to the tenant");
+ const session = donorPrimary.startSession({causalConsistency: false});
+ const sessionDb = session.getDatabase(nonTenantDB);
+ const sessionColl = sessionDb.getCollection(collName);
+
+ session.startTransaction({writeConcern: {w: "majority"}});
+ assert.commandWorked(sessionColl.insert([{_id: 0, x: 0}, {_id: 1, x: 1}]));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ session.endSession();
+}
+
+const fpAfterRetrievingStartOpTime = configureFailPoint(
+ recipientPrimary, "fpAfterRetrievingStartOpTimesMigrationRecipientInstance", {action: "hang"});
+
+jsTestLog("Starting a migration");
+const migrationId = UUID();
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ tenantId,
+};
+assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
+fpAfterRetrievingStartOpTime.wait();
+
+{
+ jsTestLog("Run and commit a transaction in the middle of the migration");
+ const session = donorPrimary.startSession({causalConsistency: false});
+ const sessionDb = session.getDatabase(tenantDB);
+ const sessionColl = sessionDb.getCollection(collName);
+
+ session.startTransaction({writeConcern: {w: "majority"}});
+ assert.commandWorked(sessionColl.insert([{_id: 2, x: 2}, {_id: 3, x: 3}]));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ session.endSession();
+}
+
+jsTestLog("Waiting for migration to complete");
+fpAfterRetrievingStartOpTime.off();
+assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+
+const donorTxnEntries = donorPrimary.getCollection(transactionsNS).find().toArray();
+jsTestLog(`All donor entries: ${tojson(donorTxnEntries)}`);
+assert.eq(4, donorTxnEntries.length, `donor transaction entries: ${tojson(donorTxnEntries)}`);
+
+const recipientTxnEntries = recipientPrimary.getCollection(transactionsNS).find().toArray();
+
+// Verify that the recipient has fetched and written only the first committed transaction entry from
+// the donor.
+assert.eq(
+ 1, recipientTxnEntries.length, `recipient transaction entries: ${tojson(recipientTxnEntries)}`);
+assert.eq(
+ fetchedDonorTxnEntry,
+ recipientTxnEntries,
+ `fetched donor transaction entries: ${
+ tojson(
+ fetchedDonorTxnEntry)}; recipient transaction entries: ${tojson(recipientTxnEntries)}`);
+
+// Test that the client can retry 'commitTransaction' on the recipient.
+const recipientTxnEntry = recipientTxnEntries[0];
+assert.commandWorked(recipientPrimary.adminCommand({
+ commitTransaction: 1,
+ lsid: recipientTxnEntry._id,
+ txnNumber: recipientTxnEntry.txnNum,
+ autocommit: false,
+}));
+
+tenantMigrationTest.stop();
+})();
diff --git a/jstests/replsets/tenant_migration_oplog_view.js b/jstests/replsets/tenant_migration_oplog_view.js
index 18f9018f1e3..5803b97e836 100644
--- a/jstests/replsets/tenant_migration_oplog_view.js
+++ b/jstests/replsets/tenant_migration_oplog_view.js
@@ -45,12 +45,15 @@ if (!tenantMigrationTest.isFeatureFlagEnabled()) {
return;
}
+const dbName = "test";
+const collName = "collection";
+
const donorPrimary = tenantMigrationTest.getDonorPrimary();
const rsConn = new Mongo(donorRst.getURL());
const oplog = donorPrimary.getDB("local")["oplog.rs"];
const migrationOplogView = donorPrimary.getDB("local")["system.tenantMigration.oplogView"];
const session = rsConn.startSession({retryWrites: true});
-const collection = session.getDatabase("test")["collection"];
+const collection = session.getDatabase(dbName)[collName];
{
// Assert an oplog entry representing a retryable write only projects fields defined in the
@@ -118,6 +121,42 @@ const collection = session.getDatabase("test")["collection"];
assert.eq(viewEntry["postImageOpTime"]["ts"], resultOplogEntry["ts"]);
}
+{
+ // Assert that an oplog entry that belongs to a transaction will project its 'o.applyOps.ns'
+ // field. This is used to filter transactions that belong to the tenant.
+ const txnSession = rsConn.startSession();
+ const txnDb = txnSession.getDatabase(dbName);
+ const txnColl = txnDb.getCollection(collName);
+ assert.commandWorked(txnColl.insert({_id: 'insertDoc'}));
+
+ txnSession.startTransaction({writeConcern: {w: "majority"}});
+ assert.commandWorked(txnColl.insert({_id: 'transaction0'}));
+ assert.commandWorked(txnColl.insert({_id: 'transaction1'}));
+ assert.commandWorked(txnSession.commitTransaction_forTesting());
+
+ const txnEntryOnDonor =
+ donorPrimary.getCollection("config.transactions").find({state: "committed"}).toArray()[0];
+ jsTestLog(`Txn entry on donor: ${tojson(txnEntryOnDonor)}`);
+
+ const viewEntry = migrationOplogView.find({ts: txnEntryOnDonor.lastWriteOpTime.ts}).next();
+ jsTestLog(`Transaction view entry: ${tojson(viewEntry)}`);
+
+ // The following fields are filtered out of the view.
+ assert(!viewEntry.hasOwnProperty("txnNumber"));
+ assert(!viewEntry.hasOwnProperty("state"));
+ assert(!viewEntry.hasOwnProperty("preImageOpTime"));
+ assert(!viewEntry.hasOwnProperty("postImageOpTime"));
+ assert(!viewEntry.hasOwnProperty("stmtId"));
+
+ // Verify that the view entry has the following fields.
+ assert(viewEntry.hasOwnProperty("ns"));
+ assert(viewEntry.hasOwnProperty("ts"));
+ assert(viewEntry.hasOwnProperty("applyOpsNs"));
+
+ // Assert that 'applyOpsNs' contains the namespace of the inserts.
+ assert.eq(viewEntry.applyOpsNs, `${dbName}.${collName}`);
+}
+
donorRst.stopSet();
tenantMigrationTest.stop();
})();
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index f48421ba528..e4e29093151 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1309,6 +1309,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/client/clientdriver_network',
+ '$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface',
'$BUILD_DIR/mongo/db/transaction',
'cloner_utils',
'oplog_buffer_collection',
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index aec31adb5dd..fdde0d01cc3 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -661,7 +661,7 @@ void createOplog(OperationContext* opCtx,
});
createSlimOplogView(opCtx, ctx.db());
- tenant_migration_util::createRetryableWritesView(opCtx, ctx.db());
+ tenant_migration_util::createOplogViewForTenantMigrations(opCtx, ctx.db());
/* sync here so we don't get any surprising lag later when we try to sync */
service->getStorageEngine()->flushAllFiles(opCtx, /*callerHoldsReadLock*/ false);
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index ab648a0526d..18db4cc1973 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
#include "mongo/db/repl/cloner_utils.h"
#include "mongo/db/repl/data_replicator_external_state.h"
#include "mongo/db/repl/oplog_applier.h"
@@ -73,6 +74,36 @@ NamespaceString getOplogBufferNs(const UUID& migrationUUID) {
return NamespaceString(NamespaceString::kConfigDb,
kOplogBufferPrefix + migrationUUID.toString());
}
+
+boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx) {
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+
+ // Add kTenantMigrationOplogView, kSessionTransactionsTableNamespace, and kRsOplogNamespace
+ // to resolvedNamespaces since they are all used during different pipeline stages.
+ resolvedNamespaces[NamespaceString::kTenantMigrationOplogView.coll()] = {
+ NamespaceString::kTenantMigrationOplogView, std::vector<BSONObj>()};
+
+ resolvedNamespaces[NamespaceString::kSessionTransactionsTableNamespace.coll()] = {
+ NamespaceString::kSessionTransactionsTableNamespace, std::vector<BSONObj>()};
+
+ resolvedNamespaces[NamespaceString::kRsOplogNamespace.coll()] = {
+ NamespaceString::kRsOplogNamespace, std::vector<BSONObj>()};
+
+ return make_intrusive<ExpressionContext>(opCtx,
+ boost::none, /* explain */
+ false, /* fromMongos */
+ false, /* needsMerge */
+ true, /* allowDiskUse */
+ true, /* bypassDocumentValidation */
+ false, /* isMapReduceCommand */
+ NamespaceString::kSessionTransactionsTableNamespace,
+ boost::none, /* runtimeConstants */
+ nullptr, /* collator */
+ MongoProcessInterface::create(opCtx),
+ std::move(resolvedNamespaces),
+ boost::none); /* collUUID */
+}
+
} // namespace
// A convenient place to set test-specific parameters.
@@ -82,6 +113,7 @@ MONGO_FAIL_POINT_DEFINE(skipTenantMigrationRecipientAuth);
MONGO_FAIL_POINT_DEFINE(skipComparingRecipientAndDonorFCV);
MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration);
MONGO_FAIL_POINT_DEFINE(pauseAfterCreatingOplogBuffer);
+MONGO_FAIL_POINT_DEFINE(skipFetchingCommittedTransactions);
// Fails before waiting for the state doc to be majority replicated.
MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc);
@@ -770,9 +802,71 @@ void TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBefore
return;
}
+AggregateCommand TenantMigrationRecipientService::Instance::_makeCommittedTransactionsAggregation()
+ const {
+
+ auto opCtx = cc().makeOperationContext();
+ auto expCtx = makeExpressionContext(opCtx.get());
+
+ Timestamp startFetchingTimestamp;
+ {
+ stdx::lock_guard lk(_mutex);
+ invariant(_stateDoc.getStartFetchingDonorOpTime());
+ startFetchingTimestamp = _stateDoc.getStartFetchingDonorOpTime().get().getTimestamp();
+ }
+
+ auto serializedPipeline =
+ tenant_migration_util::createCommittedTransactionsPipelineForTenantMigrations(
+ expCtx, startFetchingTimestamp, getTenantId())
+ ->serializeToBson();
+
+ AggregateCommand aggRequest(NamespaceString::kSessionTransactionsTableNamespace,
+ std::move(serializedPipeline));
+
+ auto readConcern = repl::ReadConcernArgs(
+ boost::optional<LogicalTime>(startFetchingTimestamp),
+ boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
+ aggRequest.setReadConcern(readConcern.toBSONInner());
+
+ aggRequest.setHint(BSON(SessionTxnRecord::kSessionIdFieldName << 1));
+ aggRequest.setCursor(SimpleCursorOptions());
+ // We must set a writeConcern on internal commands.
+ aggRequest.setWriteConcern(WriteConcernOptions());
+
+ return aggRequest;
+}
+
void TenantMigrationRecipientService::Instance::_fetchCommittedTransactionsBeforeStartOpTime() {
- // TODO (SERVER-53511): Run the aggregation.
- return;
+ if (MONGO_unlikely(skipFetchingCommittedTransactions.shouldFail())) { // Test-only.
+ return;
+ }
+
+ auto aggRequest = _makeCommittedTransactionsAggregation();
+
+ auto statusWith = DBClientCursor::fromAggregationRequest(
+ _client.get(), std::move(aggRequest), true /* secondaryOk */, false /* useExhaust */);
+ if (!statusWith.isOK()) {
+ LOGV2_ERROR(5351100,
+ "Fetch committed transactions aggregation failed",
+ "error"_attr = statusWith.getStatus());
+ uassertStatusOK(statusWith.getStatus());
+ }
+
+ auto opCtx = cc().makeOperationContext();
+
+ auto cursor = statusWith.getValue().get();
+ while (cursor->more()) {
+ auto transactionEntry = cursor->next();
+ // TODO (SERVER-53513): Properly update 'config.transactions' and write a no-op entry
+ // instead of upserting the document.
+ uassertStatusOK(
+ tenant_migration_util::upsertCommittedTransactionEntry(opCtx.get(), transactionEntry));
+
+ stdx::lock_guard lk(_mutex);
+ if (_taskState.isInterrupted()) {
+ uassertStatusOK(_taskState.getInterruptStatus());
+ }
+ }
}
void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index fde90d1327a..ab48c653ce3 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -32,6 +32,7 @@
#include <boost/optional.hpp>
#include <memory>
+#include "mongo/db/pipeline/aggregate_command_gen.h"
#include "mongo/db/repl/oplog_fetcher.h"
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/tenant_all_database_cloner.h"
@@ -361,12 +362,18 @@ public:
void _fetchRetryableWritesOplogBeforeStartOpTime();
/**
- * Runs an aggregation that gets the donor's transactions entries in 'config.transactions'
- * with 'lastWriteOpTime' < 'startFetchingOpTime' and 'state: committed'.
+ * Runs the aggregation from '_makeCommittedTransactionsAggregation()' and migrates the
+ * resulting committed transactions entries into 'config.transactions'.
*/
void _fetchCommittedTransactionsBeforeStartOpTime();
/**
+ * Creates an aggregation pipeline to fetch transaction entries with 'lastWriteOpTime' <
+ * 'startFetchingDonorOpTime' and 'state: committed'.
+ */
+ AggregateCommand _makeCommittedTransactionsAggregation() const;
+
+ /**
* Starts the tenant oplog fetcher.
*/
void _startOplogFetcher();
@@ -492,8 +499,10 @@ public:
// Because the cloners and oplog fetcher use exhaust, we need a separate connection for
// each. The '_client' will be used for the cloners and other operations such as fetching
// optimes while the '_oplogFetcherClient' will be reserved for the oplog fetcher only.
- std::unique_ptr<DBClientConnection> _client; // (M)
- std::unique_ptr<DBClientConnection> _oplogFetcherClient; // (M)
+ //
+ // Follow DBClientCursor synchonization rules.
+ std::unique_ptr<DBClientConnection> _client; // (S)
+ std::unique_ptr<DBClientConnection> _oplogFetcherClient; // (S)
std::unique_ptr<OplogFetcherFactory> _createOplogFetcherFn =
std::make_unique<CreateOplogFetcherFn>(); // (M)
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index e060a53ac9e..7399633def5 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -232,6 +232,12 @@ public:
auto compFp = globalFailPointRegistry().find("skipComparingRecipientAndDonorFCV");
compFp->setMode(FailPoint::alwaysOn);
+ // Skip fetching committed transactions, as we will test this logic entirely in integration
+ // tests.
+ auto fetchCommittedTransactionsFp =
+ globalFailPointRegistry().find("skipFetchingCommittedTransactions");
+ fetchCommittedTransactionsFp->setMode(FailPoint::alwaysOn);
+
// Timestamps of "0 seconds" are not allowed, so we must advance our clock mock to the first
// real second.
_clkSource->advance(Milliseconds(1000));
diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp
index 247da7fdc89..035c62bf998 100644
--- a/src/mongo/db/repl/tenant_migration_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_util.cpp
@@ -29,10 +29,14 @@
#include "mongo/db/repl/tenant_migration_util.h"
+#include "mongo/bson/json.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/logical_time_validator.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/wait_for_majority_service.h"
@@ -83,7 +87,7 @@ void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecuto
}
}
-void createRetryableWritesView(OperationContext* opCtx, Database* db) {
+void createOplogViewForTenantMigrations(OperationContext* opCtx, Database* db) {
writeConflictRetry(
opCtx, "createDonorOplogView", NamespaceString::kTenantMigrationOplogView.ns(), [&] {
{
@@ -98,18 +102,32 @@ void createRetryableWritesView(OperationContext* opCtx, Database* db) {
wuow.commit();
}
- // First match entries with a `stmtId` so that we're filtering for retryable writes
- // oplog entries. Pass the result into the next stage of the pipeline and only project
- // the fields that a tenant migration recipient needs to refetch retryable writes oplog
- // entries: `ts`, `prevOpTime`, `preImageOpTime`, and `postImageOpTime`.
+ // Project the fields that a tenant migration recipient needs to refetch retryable
+ // writes oplog entries: `ts`, `prevOpTime`, `preImageOpTime`, and `postImageOpTime`.
+ // Also projects the first 'ns' field of 'applyOps' for transactions.
+ //
+ // We use two stages in this pipeline because 'o.applyOps' is an array but '$project'
+ // does not recognize numeric paths as array indices. As a result, we use one '$project'
+ // stage to get the first element in 'o.applyOps', then a second stage to store the 'ns'
+ // field of the element into 'applyOpsNs'.
+ BSONArrayBuilder pipeline;
+ pipeline.append(BSON("$project" << BSON("_id"
+ << "$ts"
+ << "ns" << 1 << "ts" << 1 << "prevOpTime" << 1
+ << "preImageOpTime" << 1 << "postImageOpTime"
+ << 1 << "applyOpsNs"
+ << BSON("$first"
+ << "$o.applyOps"))));
+ pipeline.append(BSON("$project" << BSON("_id"
+ << "$ts"
+ << "ns" << 1 << "ts" << 1 << "prevOpTime" << 1
+ << "preImageOpTime" << 1 << "postImageOpTime"
+ << 1 << "applyOpsNs"
+ << "$applyOpsNs.ns")));
+
CollectionOptions options;
options.viewOn = NamespaceString::kRsOplogNamespace.coll().toString();
- options.pipeline = BSON_ARRAY(
- BSON("$match" << BSON("stmtId" << BSON("$exists" << true)))
- << BSON("$project" << BSON("_id"
- << "$ts"
- << "ns" << 1 << "ts" << 1 << "prevOpTime" << 1
- << "preImageOpTime" << 1 << "postImageOpTime" << 1)));
+ options.pipeline = pipeline.arr();
WriteUnitOfWork wuow(opCtx);
uassertStatusOK(
@@ -118,6 +136,65 @@ void createRetryableWritesView(OperationContext* opCtx, Database* db) {
});
}
+std::unique_ptr<Pipeline, PipelineDeleter> createCommittedTransactionsPipelineForTenantMigrations(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const Timestamp& startFetchingTimestamp,
+ const std::string& tenantId) {
+ Pipeline::SourceContainer stages;
+ using Doc = Document;
+
+ // 1. Match config.transactions entries that have a 'lastWriteOpTime.ts' before
+ // 'startFetchingTimestamp' and 'state: committed', which indicates that it is a committed
+ // transaction. Retryable writes should not have the 'state' field.
+ stages.emplace_back(DocumentSourceMatch::createFromBson(
+ Doc{{"$match",
+ Doc{{"state", Value{"committed"_sd}},
+ {"lastWriteOpTime.ts", Doc{{"$lt", startFetchingTimestamp}}}}}}
+ .toBson()
+ .firstElement(),
+ expCtx));
+
+ // 2. Get all oplog entries that have a timestamp equal to 'lastWriteOpTime.ts'. Store these
+ // oplog entries in the 'oplogEntry' field.
+ stages.emplace_back(DocumentSourceLookUp::createFromBson(fromjson("{\
+ $lookup: {\
+ from: {db: 'local', coll: 'system.tenantMigration.oplogView'},\
+ localField: 'lastWriteOpTime.ts',\
+ foreignField: 'ts',\
+ as: 'oplogEntry'\
+ }}")
+ .firstElement(),
+ expCtx));
+
+ // 3. Filter out the entries that do not belong to the tenant.
+ stages.emplace_back(DocumentSourceMatch::createFromBson(fromjson("{\
+ $match: {\
+ 'oplogEntry.applyOpsNs': {$regex: '^" + tenantId + "_'}\
+ }}")
+ .firstElement(),
+ expCtx));
+
+ // 4. Unset the 'oplogEntry' field and return the committed transaction entries.
+ stages.emplace_back(DocumentSourceProject::createUnset(FieldPath("oplogEntry"), expCtx));
+
+ return Pipeline::create(std::move(stages), expCtx);
+}
+
+Status upsertCommittedTransactionEntry(OperationContext* opCtx, const BSONObj& entry) {
+ const auto nss = NamespaceString::kSessionTransactionsTableNamespace;
+ AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+ // Sanity check.
+ uassert(ErrorCodes::PrimarySteppedDown,
+ str::stream() << "No longer primary while attempting to insert transactions entry",
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
+
+ return writeConflictRetry(opCtx, "insertCommittedTransactionEntry", nss.ns(), [&]() -> Status {
+ Helpers::upsert(opCtx, nss.ns(), entry, false /* fromMigrate */);
+ return Status::OK();
+ });
+}
+
} // namespace tenant_migration_util
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h
index fe467ec5e0c..a1f1cd60d1d 100644
--- a/src/mongo/db/repl/tenant_migration_util.h
+++ b/src/mongo/db/repl/tenant_migration_util.h
@@ -37,6 +37,7 @@
#include "mongo/config.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/keys_collection_document_gen.h"
+#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/executor/scoped_task_executor.h"
#include "mongo/util/net/ssl_util.h"
@@ -146,9 +147,24 @@ void storeExternalClusterTimeKeyDocs(std::shared_ptr<executor::ScopedTaskExecuto
/**
* Creates a view on the oplog that allows a tenant migration recipient to fetch retryable writes
- * from a tenant migration donor.
+ * and transactions from a tenant migration donor.
*/
-void createRetryableWritesView(OperationContext* opCtx, Database* db);
+void createOplogViewForTenantMigrations(OperationContext* opCtx, Database* db);
+
+/**
+ * Creates a pipeline for fetching committed transactions on the donor before
+ * 'startFetchingTimestamp'. We use 'tenantId' to fetch transaction entries specific to a particular
+ * set of tenant databases.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> createCommittedTransactionsPipelineForTenantMigrations(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const Timestamp& startFetchingTimestamp,
+ const std::string& tenantId);
+
+/**
+ * Inserts a committed transactions entry into the 'config.transactions' collection.
+ */
+Status upsertCommittedTransactionEntry(OperationContext* opCtx, const BSONObj& entry);
} // namespace tenant_migration_util