summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorXueruiFa <xuerui.fa@mongodb.com>2020-10-19 18:17:01 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-11-17 14:02:49 +0000
commit519bc2b24ecc6c29f2483c8b82786e16f61c2cba (patch)
tree1689c9416eb3d5fc06b616f60c7635e581c989f8
parent9b0e366a75a9cc25705969932b3374d21d4d13c9 (diff)
downloadmongo-519bc2b24ecc6c29f2483c8b82786e16f61c2cba.tar.gz
SERVER-51596: Create TenantMigrationTest test fixture for JS tests
-rw-r--r--jstests/replsets/libs/tenant_migration_test.js416
-rw-r--r--jstests/replsets/libs/tenant_migration_util.js258
-rw-r--r--jstests/replsets/tenant_migration_commit_transaction_retry.js15
-rw-r--r--jstests/replsets/tenant_migration_concurrent_bulk_writes.js239
-rw-r--r--jstests/replsets/tenant_migration_concurrent_migrations.js77
-rw-r--r--jstests/replsets/tenant_migration_concurrent_reads.js175
-rw-r--r--jstests/replsets/tenant_migration_concurrent_writes.js89
-rw-r--r--jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js147
-rw-r--r--jstests/replsets/tenant_migration_donor_abort_state_transition.js84
-rw-r--r--jstests/replsets/tenant_migration_donor_current_op.js71
-rw-r--r--jstests/replsets/tenant_migration_donor_initial_sync_recovery.js88
-rw-r--r--jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js78
-rw-r--r--jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js91
-rw-r--r--jstests/replsets/tenant_migration_donor_retry.js186
-rw-r--r--jstests/replsets/tenant_migration_donor_rollback_recovery.js117
-rw-r--r--jstests/replsets/tenant_migration_donor_startup_recovery.js101
-rw-r--r--jstests/replsets/tenant_migration_donor_state_machine.js79
-rw-r--r--jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js107
-rw-r--r--jstests/replsets/tenant_migration_large_txn.js54
-rw-r--r--jstests/replsets/tenant_migration_no_failover.js93
20 files changed, 1251 insertions, 1314 deletions
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js
new file mode 100644
index 00000000000..3875d2ab0d5
--- /dev/null
+++ b/jstests/replsets/libs/tenant_migration_test.js
@@ -0,0 +1,416 @@
+/**
+ * Wrapper around ReplSetTest for testing tenant migration behavior.
+ */
+
+"use strict";
+
+load("jstests/aggregation/extras/utils.js");
+load("jstests/replsets/rslib.js");
+load('jstests/libs/fail_point_util.js');
+
+/**
+ * This fixture allows the user to optionally pass in a custom ReplSetTest for the donor and
+ * recipient replica sets, to be used for the test.
+ *
+ * If the caller does not provide their own replica set, a two node replset will be initialized
+ * instead, with all nodes running the latest version.
+ *
+ * @param {string} [name] the name of the replica sets
+ * @param {Object} [donorRst] the ReplSetTest instance to adopt for the donor
+ * @param {Object} [recipientRst] the ReplSetTest instance to adopt for the recipient
+ */
+function TenantMigrationTest({name = "TenantMigrationTest", donorRst, recipientRst}) {
+ const donorPassedIn = (donorRst !== undefined);
+ const recipientPassedIn = (recipientRst !== undefined);
+
+ donorRst = donorPassedIn ? donorRst : performSetUp(true /* isDonor */);
+ recipientRst = recipientPassedIn ? recipientRst : performSetUp(false /* isDonor */);
+
+ donorRst.getPrimary();
+ donorRst.awaitReplication();
+
+ recipientRst.getPrimary();
+ recipientRst.awaitReplication();
+
+ /**
+ * Creates a ReplSetTest instance. The repl set will have 2 nodes.
+ */
+ function performSetUp(isDonor) {
+ let setParameterOpts = {"enableTenantMigrations": true};
+ if (TestData.logComponentVerbosity) {
+ setParameterOpts["logComponentVerbosity"] =
+ tojsononeline(TestData.logComponentVerbosity);
+ }
+
+ // TODO: SERVER-51734: Remove setting this failpoint.
+ if (!isDonor) {
+ setParameterOpts["failpoint.returnResponseOkForRecipientSyncDataCmd"] =
+ tojson({mode: 'alwaysOn'});
+ }
+
+ let nodeOptions = {};
+ nodeOptions["setParameter"] = setParameterOpts;
+
+ const rstName = `${name}_${(isDonor ? "donor" : "recipient")}`;
+ const rst = new ReplSetTest({name: rstName, nodes: 2, nodeOptions});
+ rst.startSet();
+ rst.initiateWithHighElectionTimeout();
+
+ return rst;
+ }
+
+ /**
+ * Runs a tenant migration with the given migration options and waits for the migration to be
+ * committed or aborted.
+ *
+ * Returns the result of the initial donorStartMigration if it was unsuccessful. Otherwise,
+ * returns the command response containing the migration state on the donor after the migration
+ * has completed.
+ */
+ this.runMigration = function(migrationOpts, retryOnRetryableErrors = false) {
+ const res = this.startMigration(migrationOpts, retryOnRetryableErrors);
+ if (!res.ok) {
+ return res;
+ }
+
+ return this.waitForMigrationToComplete(migrationOpts, retryOnRetryableErrors);
+ };
+
+ /**
+ * Starts a tenant migration by running the 'donorStartMigration' command once.
+ *
+ * Returns the result of the 'donorStartMigration' command.
+ */
+ this.startMigration = function(migrationOpts, retryOnRetryableErrors = false) {
+ return this.runDonorStartMigration(
+ migrationOpts, false /* waitForMigrationToComplete */, retryOnRetryableErrors);
+ };
+
+ /**
+ * Waits for a migration to complete by continuously polling the donor primary with
+ * 'donorStartMigration' until the returned state is committed or aborted. Must be used with
+ * startMigration, after the migration has been started for the specified tenantId.
+ *
+ * Returns the result of the last 'donorStartMigration' command executed.
+ */
+ this.waitForMigrationToComplete = function(migrationOpts, retryOnRetryableErrors = false) {
+ // Assert that the migration has already been started.
+ const tenantId = migrationOpts.tenantId;
+ assert(this.getDonorPrimary()
+ .getCollection(TenantMigrationTest.kConfigDonorsNS)
+ .findOne({tenantId}));
+ return this.runDonorStartMigration(
+ migrationOpts, true /* waitForMigrationToComplete */, retryOnRetryableErrors);
+ };
+
+ /**
+ * Executes the 'donorStartMigration' command on the donor primary.
+ *
+ * This will return on the first successful command if 'waitForMigrationToComplete' is
+ * set to false. Otherwise, it will continuously poll the donor primary until the
+ * migration has been committed or aborted.
+ *
+ * If 'retryOnRetryableErrors' is set, this function will retry if the command fails
+ * with a NotPrimary or network error.
+ */
+ this.runDonorStartMigration = function({
+ migrationIdString,
+ tenantId,
+ recipientConnectionString = recipientRst.getURL(),
+ readPreference = {mode: "primary"},
+ },
+ waitForMigrationToComplete,
+ retryOnRetryableErrors) {
+ const cmdObj = {
+ donorStartMigration: 1,
+ tenantId,
+ migrationId: UUID(migrationIdString),
+ recipientConnectionString,
+ readPreference,
+ };
+
+ let donorPrimary = this.getDonorPrimary();
+ let stateRes;
+
+ assert.soon(() => {
+ try {
+ stateRes = donorPrimary.adminCommand(cmdObj);
+
+ if (!stateRes.ok) {
+ // If retry is enabled and the command failed with a NotPrimary error, continue
+ // looping.
+ if (retryOnRetryableErrors && ErrorCodes.isNotPrimaryError(stateRes.code)) {
+ donorPrimary = donorRst.getPrimary();
+ return false;
+ }
+ return true;
+ }
+
+ // The command has been successfully executed. If we don't need to wait for the
+ // migration to complete, exit the loop.
+ if (!waitForMigrationToComplete) {
+ return true;
+ }
+
+ return (stateRes.state === TenantMigrationTest.State.kCommitted ||
+ stateRes.state === TenantMigrationTest.State.kAborted);
+ } catch (e) {
+ // If the thrown error is network related and we are allowing retryable errors,
+ // continue issuing commands.
+ if (retryOnRetryableErrors && isNetworkError(e)) {
+ return false;
+ }
+ throw e;
+ }
+ });
+ return stateRes;
+ };
+
+ /**
+ * Runs the donorForgetMigration command with the given migrationId and returns the response.
+ *
+ * If 'retryOnRetryableErrors' is set, this function will retry if the command fails with a
+ * NotPrimary or network error.
+ */
+ this.forgetMigration = function(migrationIdString, retryOnRetryableErrors = false) {
+ let donorPrimary = this.getDonorPrimary();
+
+ let res;
+
+ assert.soon(() => {
+ try {
+ res = donorPrimary.adminCommand(
+ {donorForgetMigration: 1, migrationId: UUID(migrationIdString)});
+
+ if (!res.ok) {
+ // If retry is enabled and the command failed with a NotPrimary error, continue
+ // looping.
+ if (retryOnRetryableErrors && ErrorCodes.isNotPrimaryError(res.code)) {
+ donorPrimary = donorRst.getPrimary();
+ return false;
+ }
+ }
+
+ return true;
+ } catch (e) {
+ if (retryOnRetryableErrors && isNetworkError(e)) {
+ return false;
+ }
+ throw e;
+ }
+ });
+ return res;
+ };
+
+ /**
+ * Asserts that durable and in-memory state for the migration 'migrationId' and 'tenantId' is
+ * eventually deleted from the given nodes.
+ */
+ this.waitForMigrationGarbageCollection = function(nodes, migrationId, tenantId) {
+ nodes.forEach(node => {
+ const configDonorsColl = node.getCollection("config.tenantMigrationDonors");
+ assert.soon(() => 0 === configDonorsColl.count({_id: migrationId}));
+
+ assert.soon(() => 0 ===
+ assert.commandWorked(node.adminCommand({serverStatus: 1}))
+ .repl.primaryOnlyServices.TenantMigrationDonorService);
+
+ let mtabs;
+ assert.soon(() => {
+ mtabs = assert.commandWorked(node.adminCommand({serverStatus: 1}))
+ .tenantMigrationAccessBlocker;
+ return !mtabs || !mtabs[tenantId];
+ }, tojson(mtabs));
+ });
+ };
+
+ /**
+ * Asserts that the migration 'migrationId' and 'tenantId' eventually goes to the
+ * expected state on all the given nodes.
+ */
+ this.waitForNodesToReachState = function(nodes, migrationId, tenantId, expectedState) {
+ nodes.forEach(node => {
+ assert.soon(() =>
+ this.isNodeInExpectedState(node, migrationId, tenantId, expectedState));
+ });
+ };
+
+ /**
+ * Asserts that the migration 'migrationId' and 'tenantId' is in the expected state on all the
+ * given nodes.
+ */
+ this.assertNodesInExpectedState = function(nodes, migrationId, tenantId, expectedState) {
+ nodes.forEach(node => {
+ assert(this.isNodeInExpectedState(node, migrationId, tenantId, expectedState));
+ });
+ };
+
+ /**
+ * Returns true if the durable and in-memory state for the migration 'migrationId' and
+ * 'tenantId' is in the expected state, and false otherwise.
+ */
+ this.isNodeInExpectedState = function(node, migrationId, tenantId, expectedState) {
+ const configDonorsColl =
+ this.getDonorPrimary().getCollection("config.tenantMigrationDonors");
+ if (configDonorsColl.findOne({_id: migrationId}).state !== expectedState) {
+ return false;
+ }
+
+ const expectedAccessState = (expectedState === TenantMigrationTest.State.kCommitted)
+ ? TenantMigrationTest.AccessState.kReject
+ : TenantMigrationTest.AccessState.kAborted;
+ const mtabs =
+ assert.commandWorked(node.adminCommand({serverStatus: 1})).tenantMigrationAccessBlocker;
+ return (mtabs[tenantId].state === expectedAccessState);
+ };
+
+ function loadDummyData() {
+ const numDocs = 20;
+ const testData = [];
+ for (let i = 0; i < numDocs; ++i) {
+ testData.push({_id: i, x: i});
+ }
+ return testData;
+ }
+
+ /**
+ * Inserts documents into the specified collection on the donor primary.
+ */
+ this.insertDonorDB = function(dbName, collName, data = loadDummyData()) {
+ jsTestLog(`Inserting data into collection ${collName} of DB ${dbName} on the donor`);
+ const db = this.getDonorPrimary().getDB(dbName);
+ const coll = db.getCollection(collName);
+
+ assert.commandWorked(coll.insertMany(data));
+ };
+
+ /**
+ * Verifies that the documents on the recipient primary are correct.
+ */
+ this.verifyReceipientDB = function(tenantId, dbName, collName, data = loadDummyData()) {
+ // TODO (SERVER-51734): Uncomment this line.
+ // const shouldMigrate = this.isNamespaceForTenant(tenantId, dbName);
+ const shouldMigrate = false;
+
+ jsTestLog(`Verifying that data in collection ${collName} of DB ${dbName} was ${
+ (shouldMigrate ? "" : "not")} migrated to the recipient`);
+
+ const db = this.getRecipientPrimary().getDB(dbName);
+ const coll = db.getCollection(collName);
+
+ const findRes = coll.find();
+ const numDocsFound = findRes.count();
+
+ if (!shouldMigrate) {
+ assert.eq(0,
+ numDocsFound,
+ `Find command on recipient collection ${collName} of DB ${
+ dbName} should return 0 docs, instead has count of ${numDocsFound}`);
+ return;
+ }
+
+ const numDocsExpected = data.length;
+ assert.eq(numDocsExpected,
+ numDocsFound,
+ `Find command on recipient collection ${collName} of DB ${dbName} should return ${
+ numDocsExpected} docs, instead has count of ${numDocsFound}`);
+
+ const docsReturned = findRes.sort({_id: 1}).toArray();
+ assert(arrayEq(docsReturned, data),
+ () => (`${tojson(docsReturned)} is not equal to ${tojson(data)}`));
+ };
+
+ /**
+ * Crafts a tenant database name.
+ */
+ this.tenantDB = function(tenantId, dbName) {
+ return `${tenantId}_${dbName}`;
+ };
+
+ /**
+ * Crafts a database name that does not belong to the tenant.
+ */
+ this.nonTenantDB = function(tenantId, dbName) {
+ return `non_${tenantId}_${dbName}`;
+ };
+
+ /**
+ * Determines if a database name belongs to the given tenant.
+ */
+ this.isNamespaceForTenant = function(tenantId, dbName) {
+ return dbName.startsWith(`${tenantId}_`);
+ };
+
+ /**
+ * Returns the TenantMigrationAccessBlocker associated with given the tenantId on the
+ * node.
+ */
+ this.getTenantMigrationAccessBlocker = function(node, tenantId) {
+ return assert.commandWorked(node.adminCommand({serverStatus: 1}))
+ .tenantMigrationAccessBlocker[tenantId];
+ };
+
+ /**
+ * Returns the donor ReplSetTest.
+ */
+ this.getDonorRst = function() {
+ return donorRst;
+ };
+
+ /**
+ * Returns the recipient ReplSetTest.
+ */
+ this.getRecipientRst = function() {
+ return recipientRst;
+ };
+
+ /**
+ * Returns the donor's primary.
+ */
+ this.getDonorPrimary = function() {
+ return this.getDonorRst().getPrimary();
+ };
+
+ /**
+ * Returns the recipient's primary.
+ */
+ this.getRecipientPrimary = function() {
+ return this.getRecipientRst().getPrimary();
+ };
+
+ /**
+ * Returns the recipient's connection string.
+ */
+ this.getRecipientConnString = function() {
+ return this.getRecipientRst().getURL();
+ };
+
+ /**
+ * Shuts down the donor and recipient sets, only if they were not passed in as parameters.
+ * If they were passed in, the test that initialized them should be responsible for shutting
+ * them down.
+ */
+ this.stop = function() {
+ if (!donorPassedIn)
+ donorRst.stopSet();
+ if (!recipientPassedIn)
+ recipientRst.stopSet();
+ };
+}
+
+TenantMigrationTest.State = {
+ kCommitted: "committed",
+ kAborted: "aborted",
+ kDataSync: "data sync",
+ kBlocking: "blocking",
+};
+
+TenantMigrationTest.AccessState = {
+ kAllow: "allow",
+ kBlockWrites: "blockWrites",
+ kBlockWritesAndReads: "blockWritesAndReads",
+ kReject: "reject",
+ kAborted: "aborted",
+};
+
+TenantMigrationTest.kConfigDonorsNS = "config.tenantMigrationDonors";
diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js
index b983cbecb36..8b903a9aa14 100644
--- a/jstests/replsets/libs/tenant_migration_util.js
+++ b/jstests/replsets/libs/tenant_migration_util.js
@@ -2,230 +2,112 @@
* Utilities for testing tenant migrations.
*/
var TenantMigrationUtil = (function() {
- // An object that mirrors the access states for the TenantMigrationAccessBlocker.
- const accessState = {
- kAllow: "allow",
- kBlockWrites: "blockWrites",
- kBlockWritesAndReads: "blockWritesAndReads",
- kReject: "reject",
- kAborted: "aborted"
- };
+ load("jstests/replsets/libs/tenant_migration_test.js");
/**
- * Runs the donorStartMigration command with the given migration options every 'intervalMS'
+ * Runs the donorStartMigration command with the given migration options
* until the migration commits or aborts, or until the command fails. Returns the last command
* response.
+ *
+ * If 'retryOnRetryableErrors' is set, this function will retry if the command fails with a
+ * NotPrimary or network error.
+ *
+ * Only use when it is necessary to run the donorStartMigration command in its own thread. For
+ * all other use cases, please consider the runMigration() function in the TenantMigrationTest
+ * fixture.
*/
- function startMigration(donorPrimaryHost, migrationOpts, intervalMS = 100) {
+ function runMigrationAsync(migrationOpts, donorRstArgs, retryOnRetryableErrors = false) {
const cmdObj = {
donorStartMigration: 1,
migrationId: UUID(migrationOpts.migrationIdString),
recipientConnectionString: migrationOpts.recipientConnString,
tenantId: migrationOpts.tenantId,
- readPreference: migrationOpts.readPreference
- };
- const donorPrimary = new Mongo(donorPrimaryHost);
-
- while (true) {
- const res = donorPrimary.adminCommand(cmdObj);
- if (!res.ok || res.state == "committed" || res.state == "aborted") {
- return res;
- }
- sleep(intervalMS);
- }
- }
-
- /**
- * Runs the donorForgetMigration command with the given migrationId and returns the response.
- */
- function forgetMigration(donorPrimaryHost, migrationIdString) {
- const donorPrimary = new Mongo(donorPrimaryHost);
- return donorPrimary.adminCommand(
- {donorForgetMigration: 1, migrationId: UUID(migrationIdString)});
- }
-
- /**
- * Runs the donorStartMigration command with the given migration options every 'intervalMS'
- * until the migration commits or aborts, or until the command fails with error other than
- * NotPrimary or network errors. Returns the last command response.
- */
- function startMigrationRetryOnRetryableErrors(donorRstArgs, migrationOpts, intervalMS = 100) {
- const cmdObj = {
- donorStartMigration: 1,
- migrationId: UUID(migrationOpts.migrationIdString),
- recipientConnectionString: migrationOpts.recipientConnString,
- tenantId: migrationOpts.tenantId,
- readPreference: migrationOpts.readPreference
+ readPreference: migrationOpts.readPreference || {mode: "primary"},
};
const donorRst = new ReplSetTest({rstArgs: donorRstArgs});
let donorPrimary = donorRst.getPrimary();
- while (true) {
- let res;
+ let res;
+ assert.soon(() => {
try {
res = donorPrimary.adminCommand(cmdObj);
- } catch (e) {
- if (isNetworkError(e)) {
- jsTest.log(`Ignoring network error ${tojson(e)} for command ${tojson(cmdObj)}`);
- continue;
+
+ if (!res.ok) {
+ // If retry is enabled and the command failed with a NotPrimary error, continue
+ // looping.
+ if (retryOnRetryableErrors && ErrorCodes.isNotPrimaryError(res.code)) {
+ donorPrimary = donorRst.getPrimary();
+ return false;
+ }
+ return true;
}
- throw e;
- }
- if (!res.ok) {
- if (!ErrorCodes.isNotPrimaryError(res.code)) {
- return res;
+ return (res.state === "committed" || res.state === "aborted");
+ } catch (e) {
+ // If the thrown error is network related and we are allowing retryable errors,
+ // continue issuing commands.
+ if (retryOnRetryableErrors && isNetworkError(e)) {
+ return false;
}
- donorPrimary = donorRst.getPrimary();
- } else if (res.state == "committed" || res.state == "aborted") {
- return res;
+ throw e;
}
- sleep(intervalMS);
- }
+ });
+ return res;
}
/**
- * Runs the donorForgetMigration command with the given migrationId until the command succeeds
- * or fails with error other than NotPrimary or network errors. Returns the last command
- * response.
+ * Runs the donorForgetMigration command with the given migrationId and returns the response.
+ *
+ * If 'retryOnRetryableErrors' is set, this function will retry if the command fails with a
+ * NotPrimary or network error.
+ *
+ * Only use when it is necessary to run the donorForgetMigration command in its own thread. For
+ * all other use cases, please consider the forgetMigration() function in the
+ * TenantMigrationTest fixture.
*/
- function forgetMigrationRetryOnRetryableErrors(donorRstArgs, migrationIdString) {
- const cmdObj = {donorForgetMigration: 1, migrationId: UUID(migrationIdString)};
-
+ function forgetMigrationAsync(migrationIdString, donorRstArgs, retryOnRetryableErrors = false) {
const donorRst = new ReplSetTest({rstArgs: donorRstArgs});
let donorPrimary = donorRst.getPrimary();
- while (true) {
- let res;
+ let res;
+
+ assert.soon(() => {
try {
- res = donorPrimary.adminCommand(cmdObj);
+ res = donorPrimary.adminCommand(
+ {donorForgetMigration: 1, migrationId: UUID(migrationIdString)});
+
+ if (!res.ok) {
+ // If retry is enabled and the command failed with a NotPrimary error, continue
+ // looping.
+ if (retryOnRetryableErrors && ErrorCodes.isNotPrimaryError(res.code)) {
+ donorPrimary = donorRst.getPrimary();
+ return false;
+ }
+ }
+
+ return true;
} catch (e) {
- if (isNetworkError(e)) {
- jsTest.log(`Ignoring network error ${tojson(e)} for command ${tojson(cmdObj)}`);
- continue;
+ if (retryOnRetryableErrors && isNetworkError(e)) {
+ return false;
}
throw e;
}
-
- if (res.ok || !ErrorCodes.isNotPrimaryError(res.code)) {
- return res;
- }
- donorPrimary = donorRst.getPrimary();
- }
- }
-
- /**
- * Returns true if the durable and in-memory state for the migration 'migrationId' and
- * 'tenantId' is in state "committed", and false otherwise.
- */
- function isMigrationCommitted(node, migrationId, tenantId) {
- const configDonorsColl = node.getCollection("config.tenantMigrationDonors");
- if (configDonorsColl.findOne({_id: migrationId}).state != "committed") {
- return false;
- }
- const mtabs = node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- return mtabs[tenantId].state === TenantMigrationUtil.accessState.kReject;
- }
-
- /**
- * Returns true if the durable and in-memory state for the migration 'migrationId' and
- * 'tenantId' is in state "aborted", and false otherwise.
- */
- function isMigrationAborted(node, migrationId, tenantId) {
- const configDonorsColl = node.getCollection("config.tenantMigrationDonors");
- if (configDonorsColl.findOne({_id: migrationId}).state != "aborted") {
- return false;
- }
- const mtabs = node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- return mtabs[tenantId].state === TenantMigrationUtil.accessState.kAborted;
- }
-
- /**
- * Asserts that the migration 'migrationId' and 'tenantId' is in state "committed" on all the
- * given nodes.
- */
- function assertMigrationCommitted(nodes, migrationId, tenantId) {
- nodes.forEach(node => {
- assert(isMigrationCommitted(node, migrationId, tenantId));
- });
- }
-
- /**
- * Asserts that the migration 'migrationId' and 'tenantId' eventually goes to state "committed"
- * on all the given nodes.
- */
- function waitForMigrationToCommit(nodes, migrationId, tenantId) {
- nodes.forEach(node => {
- assert.soon(() => isMigrationCommitted(node, migrationId, tenantId));
- });
- }
-
- /**
- * Asserts that the migration 'migrationId' and 'tenantId' eventually goes to state "aborted"
- * on all the given nodes.
- */
- function waitForMigrationToAbort(nodes, migrationId, tenantId) {
- nodes.forEach(node => {
- assert.soon(() => isMigrationAborted(node, migrationId, tenantId));
});
+ return res;
}
- /**
- * Asserts that durable and in-memory state for the migration 'migrationId' and 'tenantId' is
- * eventually deleted from the given nodes.
- */
- function waitForMigrationGarbageCollection(nodes, migrationId, tenantId) {
- nodes.forEach(node => {
- const configDonorsColl = node.getCollection("config.tenantMigrationDonors");
- assert.soon(() => 0 === configDonorsColl.count({_id: migrationId}));
-
- assert.soon(() => 0 ===
- node.adminCommand({serverStatus: 1})
- .repl.primaryOnlyServices.TenantMigrationDonorService);
-
- let mtabs;
- assert.soon(() => {
- mtabs = node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- return !mtabs || !mtabs[tenantId];
- }, tojson(mtabs));
- });
- }
-
- /**
- * Returns the TenantMigrationAccessBlocker associated with given the tenantId on the
- * node.
- */
- function getTenantMigrationAccessBlocker(node, tenantId) {
- return node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker[tenantId];
- }
-
- /**
- * Crafts a tenant database name, given the tenantId.
- */
- function tenantDB(tenantId, dbName) {
- return `${tenantId}_${dbName}`;
- }
-
- /**
- * Crafts a database name that does not belong to the tenant, given the tenantId.
- */
- function nonTenantDB(tenantId, dbName) {
- return `non_${tenantId}_${dbName}`;
+ function createRstArgs(donorRst) {
+ const donorRstArgs = {
+ name: donorRst.name,
+ nodeHosts: donorRst.nodes.map(node => `127.0.0.1:${node.port}`),
+ nodeOptions: donorRst.nodeOptions,
+ keyFile: donorRst.keyFile,
+ host: donorRst.host,
+ waitForKeys: false,
+ };
+ return donorRstArgs;
}
- return {
- accessState,
- startMigration,
- forgetMigration,
- startMigrationRetryOnRetryableErrors,
- forgetMigrationRetryOnRetryableErrors,
- assertMigrationCommitted,
- waitForMigrationToCommit,
- waitForMigrationToAbort,
- waitForMigrationGarbageCollection,
- getTenantMigrationAccessBlocker,
- tenantDB,
- nonTenantDB,
- };
+ return {runMigrationAsync, forgetMigrationAsync, createRstArgs};
})();
diff --git a/jstests/replsets/tenant_migration_commit_transaction_retry.js b/jstests/replsets/tenant_migration_commit_transaction_retry.js
index 49b07ec2f6c..a0eb97b217e 100644
--- a/jstests/replsets/tenant_migration_commit_transaction_retry.js
+++ b/jstests/replsets/tenant_migration_commit_transaction_retry.js
@@ -10,6 +10,7 @@
// Direct writes to config.transactions cannot be part of a session.
TestData.disableImplicitSessions = true;
+load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
load("jstests/replsets/rslib.js");
load("jstests/libs/uuid_util.js");
@@ -48,9 +49,10 @@ donorRst.initiate();
recipientRst.startSet();
recipientRst.initiate();
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst});
+
const kTenantId = "testTenantId";
-const kDbName = kTenantId + "_" +
- "testDb";
+const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDB");
const kCollName = "testColl";
const kNs = `${kDbName}.${kCollName}`;
@@ -80,18 +82,15 @@ jsTest.log("Run a migration to completion");
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
tenantId: kTenantId,
- readPreference: {mode: "primary"},
};
-assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
+assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
const donorDoc =
donorPrimary.getCollection("config.tenantMigrationDonors").findOne({tenantId: kTenantId});
-assert.commandWorked(
- donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId}));
-TenantMigrationUtil.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, kTenantId);
+assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
+tenantMigrationTest.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, kTenantId);
{
jsTest.log("Run another transaction after the migration");
diff --git a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js
index 98e7feb16cd..a4d4f4146da 100644
--- a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js
+++ b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js
@@ -12,6 +12,7 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
const kMaxBatchSize = 2;
@@ -123,36 +124,37 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
recipientRst.startSet();
recipientRst.initiate();
- let dbName = "bulkUnorderedInserts-committed_" + kTenantDefinedDbName;
- const primary = donorRst.getPrimary();
- const primaryDB = primary.getDB(dbName);
+ const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst});
- assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- const tenantId = dbName.split('_')[0];
+ const tenantId = "bulkUnorderedInserts-committed";
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
- let writeFp = configureFailPoint(
+ const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName);
+ const primary = donorRst.getPrimary();
+ const primaryDB = primary.getDB(dbName);
+
+ assert.commandWorked(primaryDB.runCommand({create: kCollName}));
+
+ const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
- let bulkWriteThread =
+ const bulkWriteThread =
new Thread(bulkWriteDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
bulkWriteThread.start();
writeFp.wait();
- let migrationRes =
- assert.commandWorked(TenantMigrationUtil.startMigration(primary.host, migrationOpts));
- assert.eq(migrationRes.state, "committed");
+ const migrationRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted);
writeFp.off();
bulkWriteThread.join();
- let bulkWriteRes = bulkWriteThread.returnData();
- let writeErrors = bulkWriteRes.res.writeErrors;
+ const bulkWriteRes = bulkWriteThread.returnData();
+ const writeErrors = bulkWriteRes.res.writeErrors;
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
assert.eq(writeErrors.length,
@@ -166,10 +168,12 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
assert(!err.errmsg);
}
- assert.eq(err.errInfo.recipientConnectionString, migrationOpts.recipientConnString);
- assert.eq(err.errInfo.tenantId, migrationOpts.tenantId);
+ assert.eq(err.errInfo.recipientConnectionString,
+ tenantMigrationTest.getRecipientConnString());
+ assert.eq(err.errInfo.tenantId, tenantId);
});
+ tenantMigrationTest.stop();
donorRst.stopSet();
recipientRst.stopSet();
})();
@@ -183,27 +187,31 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
recipientRst.startSet();
recipientRst.initiate();
- let dbName = "bulkUnorderedInserts-committed_" + kTenantDefinedDbName;
- const primary = donorRst.getPrimary();
- const primaryDB = primary.getDB(dbName);
+ const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst});
- assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- const tenantId = dbName.split('_')[0];
+ const tenantId = "bulkUnorderedInserts-committed";
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+
+ const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName);
+ const primary = donorRst.getPrimary();
+ const primaryDB = primary.getDB(dbName);
+
+ assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- let writeFp = configureFailPoint(
+ const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
- let bulkWriteThread =
+ const bulkWriteThread =
new Thread(bulkWriteDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
- let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts);
+ const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
+ const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
bulkWriteThread.start();
writeFp.wait();
@@ -218,8 +226,8 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
bulkWriteThread.join();
migrationThread.join();
- let migrationRes = assert.commandWorked(migrationThread.returnData());
- assert.eq(migrationRes.state, "committed");
+ const migrationRes = assert.commandWorked(migrationThread.returnData());
+ assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted);
let bulkWriteRes = bulkWriteThread.returnData();
let writeErrors = bulkWriteRes.res.writeErrors;
@@ -236,10 +244,12 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
assert.eq(err.errmsg, "");
}
- assert.eq(err.errInfo.recipientConnectionString, migrationOpts.recipientConnString);
- assert.eq(err.errInfo.tenantId, migrationOpts.tenantId);
+ assert.eq(err.errInfo.recipientConnectionString,
+ tenantMigrationTest.getRecipientConnString());
+ assert.eq(err.errInfo.tenantId, tenantId);
});
+ tenantMigrationTest.stop();
donorRst.stopSet();
recipientRst.stopSet();
})();
@@ -252,32 +262,36 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
recipientRst.startSet();
recipientRst.initiate();
- let dbName = "bulkUnorderedInserts-aborted_" + kTenantDefinedDbName;
- const primary = donorRst.getPrimary();
- const primaryDB = primary.getDB(dbName);
+ const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst});
- assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- const tenantId = dbName.split('_')[0];
+ const tenantId = "bulkUnorderedInserts-aborted";
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
+ tenantId,
};
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+
+ const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName);
+ const primary = donorRst.getPrimary();
+ const primaryDB = primary.getDB(dbName);
+
+ assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- let writeFp = configureFailPoint(
+ const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
- let bulkWriteThread =
+ const bulkWriteThread =
new Thread(bulkWriteDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps);
- let abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts");
+ const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts");
// The failpoint below is used to ensure that a write to throw TenantMigrationConflict in the op
// observer. Without this failpoint, the migration could have already aborted by the time the
// write gets to the op observer.
- let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts);
+ const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
+ const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
bulkWriteThread.start();
writeFp.wait();
@@ -294,11 +308,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
abortFp.off();
- let migrationRes = assert.commandWorked(migrationThread.returnData());
- assert.eq(migrationRes.state, "aborted");
+ const migrationRes = assert.commandWorked(migrationThread.returnData());
+ assert.eq(migrationRes.state, TenantMigrationTest.State.kAborted);
- let bulkWriteRes = bulkWriteThread.returnData();
- let writeErrors = bulkWriteRes.res.writeErrors;
+ const bulkWriteRes = bulkWriteThread.returnData();
+ const writeErrors = bulkWriteRes.res.writeErrors;
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
assert.eq(writeErrors.length,
@@ -313,6 +327,7 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
}
});
+ tenantMigrationTest.stop();
donorRst.stopSet();
recipientRst.stopSet();
})();
@@ -325,36 +340,37 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
recipientRst.startSet();
recipientRst.initiate();
- let dbName = "bulkOrderedInserts-committed_" + kTenantDefinedDbName;
- const primary = donorRst.getPrimary();
- const primaryDB = primary.getDB(dbName);
+ const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst});
- assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- const tenantId = dbName.split('_')[0];
+ const tenantId = "bulkOrderedInserts-committed";
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
- let writeFp = configureFailPoint(
+ const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName);
+ const primary = donorRst.getPrimary();
+ const primaryDB = primary.getDB(dbName);
+
+ assert.commandWorked(primaryDB.runCommand({create: kCollName}));
+
+ const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
- let bulkWriteThread =
+ const bulkWriteThread =
new Thread(bulkWriteDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
bulkWriteThread.start();
writeFp.wait();
- let migrationRes =
- assert.commandWorked(TenantMigrationUtil.startMigration(primary.host, migrationOpts));
- assert.eq(migrationRes.state, "committed");
+ const migrationRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted);
writeFp.off();
bulkWriteThread.join();
- let bulkWriteRes = bulkWriteThread.returnData();
- let writeErrors = bulkWriteRes.res.writeErrors;
+ const bulkWriteRes = bulkWriteThread.returnData();
+ const writeErrors = bulkWriteRes.res.writeErrors;
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
assert.eq(writeErrors.length, 1);
@@ -364,9 +380,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
// blocking writes.
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted);
- assert.eq(writeErrors[0].errInfo.recipientConnectionString, migrationOpts.recipientConnString);
- assert.eq(writeErrors[0].errInfo.tenantId, migrationOpts.tenantId);
+ assert.eq(writeErrors[0].errInfo.recipientConnectionString,
+ tenantMigrationTest.getRecipientConnString());
+ assert.eq(writeErrors[0].errInfo.tenantId, tenantId);
+ tenantMigrationTest.stop();
donorRst.stopSet();
recipientRst.stopSet();
})();
@@ -380,27 +398,31 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
recipientRst.startSet();
recipientRst.initiate();
- let dbName = "bulkOrderedInserts-committed_" + kTenantDefinedDbName;
- const primary = donorRst.getPrimary();
- const primaryDB = primary.getDB(dbName);
+ const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst});
- assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- const tenantId = dbName.split('_')[0];
+ const tenantId = "bulkOrderedInserts-committed";
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
+ tenantId,
};
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+
+ const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName);
+ const primary = donorRst.getPrimary();
+ const primaryDB = primary.getDB(dbName);
- let writeFp = configureFailPoint(
+ assert.commandWorked(primaryDB.runCommand({create: kCollName}));
+
+ const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
- let bulkWriteThread =
+ const bulkWriteThread =
new Thread(bulkWriteDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
- let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts);
+ const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
+ const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
bulkWriteThread.start();
writeFp.wait();
@@ -415,11 +437,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
bulkWriteThread.join();
migrationThread.join();
- let migrationRes = assert.commandWorked(migrationThread.returnData());
- assert.eq(migrationRes.state, "committed");
+ const migrationRes = assert.commandWorked(migrationThread.returnData());
+ assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted);
- let bulkWriteRes = bulkWriteThread.returnData();
- let writeErrors = bulkWriteRes.res.writeErrors;
+ const bulkWriteRes = bulkWriteThread.returnData();
+ const writeErrors = bulkWriteRes.res.writeErrors;
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
assert.eq(writeErrors.length, 1);
@@ -429,9 +451,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
// blocking writes.
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted);
- assert.eq(writeErrors[0].errInfo.recipientConnectionString, migrationOpts.recipientConnString);
- assert.eq(writeErrors[0].errInfo.tenantId, migrationOpts.tenantId);
+ assert.eq(writeErrors[0].errInfo.recipientConnectionString,
+ tenantMigrationTest.getRecipientConnString());
+ assert.eq(writeErrors[0].errInfo.tenantId, tenantId);
+ tenantMigrationTest.stop();
donorRst.stopSet();
recipientRst.stopSet();
})();
@@ -444,32 +468,36 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
recipientRst.startSet();
recipientRst.initiate();
- let dbName = "bulkOrderedInserts-aborted_" + kTenantDefinedDbName;
- const primary = donorRst.getPrimary();
- const primaryDB = primary.getDB(dbName);
+ const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst});
- assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- const tenantId = dbName.split('_')[0];
+ const tenantId = "bulkOrderedInserts-aborted";
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
+ tenantId,
};
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
- let writeFp = configureFailPoint(
+ const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName);
+ const primary = donorRst.getPrimary();
+ const primaryDB = primary.getDB(dbName);
+
+ assert.commandWorked(primaryDB.runCommand({create: kCollName}));
+
+ const writeFp = configureFailPoint(
primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict});
- let bulkWriteThread =
+ const bulkWriteThread =
new Thread(bulkWriteDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps);
- let abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts");
+ const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts");
// The failpoint below is used to ensure that a write to throw TenantMigrationConflict in the op
// observer. Without this failpoint, the migration could have already aborted by the time the
// write gets to the op observer.
- let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts);
+ const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
+ const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
bulkWriteThread.start();
writeFp.wait();
@@ -486,11 +514,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
abortFp.off();
- let migrationRes = assert.commandWorked(migrationThread.returnData());
- assert.eq(migrationRes.state, "aborted");
+ const migrationRes = assert.commandWorked(migrationThread.returnData());
+ assert.eq(migrationRes.state, TenantMigrationTest.State.kAborted);
- let bulkWriteRes = bulkWriteThread.returnData();
- let writeErrors = bulkWriteRes.res.writeErrors;
+ const bulkWriteRes = bulkWriteThread.returnData();
+ const writeErrors = bulkWriteRes.res.writeErrors;
assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted);
assert.eq(writeErrors.length, 1);
@@ -501,6 +529,7 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) {
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationAborted);
+ tenantMigrationTest.stop();
donorRst.stopSet();
recipientRst.stopSet();
})();
diff --git a/jstests/replsets/tenant_migration_concurrent_migrations.js b/jstests/replsets/tenant_migration_concurrent_migrations.js
index b749c6450c1..63796243762 100644
--- a/jstests/replsets/tenant_migration_concurrent_migrations.js
+++ b/jstests/replsets/tenant_migration_concurrent_migrations.js
@@ -15,7 +15,7 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
-load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
let startupParams = {};
startupParams["enableTenantMigrations"] = true;
@@ -35,81 +35,64 @@ rst1.initiate();
rst2.startSet();
rst2.initiate();
-const rst0Primary = rst0.getPrimary();
-const rst1Primary = rst1.getPrimary();
-
-const kConfigDonorsNS = "config.tenantMigrationDonors";
-const kTenantId = "testTenantId";
+const kTenantIdPrefix = "testTenantId";
// Test concurrent outgoing migrations to different recipients.
(() => {
- const tenantId = kTenantId + "ConcurrentOutgoingMigrationsToDifferentRecipient";
- const donorsColl = rst0Primary.getCollection(kConfigDonorsNS);
+ const tenantMigrationTest0 = new TenantMigrationTest({donorRst: rst0, recipientRst: rst1});
+ const tenantMigrationTest1 = new TenantMigrationTest({donorRst: rst0, recipientRst: rst2});
+ const tenantId0 = `${kTenantIdPrefix}_ConcurrentOutgoingMigrationsToDifferentRecipient0`;
+ const tenantId1 = `${kTenantIdPrefix}_ConcurrentOutgoingMigrationsToDifferentRecipient1`;
const migrationOpts0 = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst1.getURL(),
- tenantId: tenantId + "0",
- readPreference: {mode: "primary"}
+ tenantId: tenantId0,
};
const migrationOpts1 = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst2.getURL(),
- tenantId: tenantId + "1",
- readPreference: {mode: "primary"}
+ tenantId: tenantId1,
};
- let migrationThread0 =
- new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts0);
- let migrationThread1 =
- new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts1);
+ assert.commandWorked(tenantMigrationTest0.startMigration(migrationOpts0));
+ assert.commandWorked(tenantMigrationTest1.startMigration(migrationOpts1));
- migrationThread0.start();
- migrationThread1.start();
- migrationThread0.join();
- migrationThread1.join();
+ const stateRes0 =
+ assert.commandWorked(tenantMigrationTest0.waitForMigrationToComplete(migrationOpts0));
+ const stateRes1 =
+ assert.commandWorked(tenantMigrationTest1.waitForMigrationToComplete(migrationOpts1));
// Verify that both migrations succeeded.
- assert.commandWorked(migrationThread0.returnData());
- assert.commandWorked(migrationThread1.returnData());
- assert(donorsColl.findOne({tenantId: migrationOpts0.tenantId, state: "committed"}));
- assert(donorsColl.findOne({tenantId: migrationOpts1.tenantId, state: "committed"}));
+ assert(stateRes0.state, TenantMigrationTest.State.kCommitted);
+ assert(stateRes1.state, TenantMigrationTest.State.kCommitted);
})();
// Test concurrent incoming migrations from different donors.
(() => {
- const tenantId = kTenantId + "ConcurrentIncomingMigrations";
- const donorsColl0 = rst0Primary.getCollection(kConfigDonorsNS);
- const donorsColl1 = rst1Primary.getCollection(kConfigDonorsNS);
+ const tenantMigrationTest0 = new TenantMigrationTest({donorRst: rst0, recipientRst: rst2});
+ const tenantMigrationTest1 = new TenantMigrationTest({donorRst: rst1, recipientRst: rst2});
+ const tenantId0 = `${kTenantIdPrefix}_ConcurrentIncomingMigrations0`;
+ const tenantId1 = `${kTenantIdPrefix}_ConcurrentIncomingMigrations1`;
const migrationOpts0 = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst2.getURL(),
- tenantId: tenantId + "0",
- readPreference: {mode: "primary"}
+ tenantId: tenantId0,
};
const migrationOpts1 = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst2.getURL(),
- tenantId: tenantId + "1",
- readPreference: {mode: "primary"}
+ tenantId: tenantId1,
};
- let migrationThread0 =
- new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts0);
- let migrationThread1 =
- new Thread(TenantMigrationUtil.startMigration, rst1Primary.host, migrationOpts1);
+ assert.commandWorked(tenantMigrationTest0.startMigration(migrationOpts0));
+ assert.commandWorked(tenantMigrationTest1.startMigration(migrationOpts1));
- migrationThread0.start();
- migrationThread1.start();
- migrationThread0.join();
- migrationThread1.join();
+ const stateRes0 =
+ assert.commandWorked(tenantMigrationTest0.waitForMigrationToComplete(migrationOpts0));
+ const stateRes1 =
+ assert.commandWorked(tenantMigrationTest1.waitForMigrationToComplete(migrationOpts1));
// Verify that both migrations succeeded.
- assert.commandWorked(migrationThread0.returnData());
- assert.commandWorked(migrationThread1.returnData());
- assert(donorsColl0.findOne({tenantId: migrationOpts0.tenantId, state: "committed"}));
- assert(donorsColl1.findOne({tenantId: migrationOpts1.tenantId, state: "committed"}));
+ assert(stateRes0.state, TenantMigrationTest.State.kCommitted);
+ assert(stateRes1.state, TenantMigrationTest.State.kCommitted);
})();
// TODO (SERVER-50467): Ensure that tenant migration donor only removes a ReplicaSetMonitor for
diff --git a/jstests/replsets/tenant_migration_concurrent_reads.js b/jstests/replsets/tenant_migration_concurrent_reads.js
index e62ecbba0ef..6170d73891c 100644
--- a/jstests/replsets/tenant_migration_concurrent_reads.js
+++ b/jstests/replsets/tenant_migration_concurrent_reads.js
@@ -14,36 +14,14 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
-load("jstests/replsets/libs/tenant_migration_util.js");
-
-const donorRst = new ReplSetTest({
- nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
- name: 'donor',
- nodeOptions: {setParameter: {enableTenantMigrations: true}}
-});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: 'recipient',
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
-});
+load("jstests/replsets/libs/tenant_migration_test.js");
-donorRst.startSet();
-donorRst.initiate();
-recipientRst.startSet();
-recipientRst.initiate();
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
const kCollName = "testColl";
const kTenantDefinedDbName = "0";
-const kRecipientConnString = recipientRst.getURL();
const kMaxTimeMS = 5 * 1000;
-const kConfigDonorsNS = "config.tenantMigrationDonors";
/**
* To be used to resume a migration that is paused after entering the blocking state. Waits for the
@@ -80,29 +58,29 @@ function runCommand(db, cmd, expectedError) {
/**
* Tests that the donor rejects causal reads after the migration commits.
*/
-function testReadIsRejectedIfSentAfterMigrationHasCommitted(rst, testCase, dbName, collName) {
+function testReadIsRejectedIfSentAfterMigrationHasCommitted(testCase, dbName, collName) {
const tenantId = dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
- const primary = rst.getPrimary();
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = donorRst.getPrimary();
- const res =
- assert.commandWorked(TenantMigrationUtil.startMigration(primary.host, migrationOpts));
- assert.eq(res.state, "committed");
+ const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
// Wait for the last oplog entry on the primary to be visible in the committed snapshot view of
// the oplog on all the secondaries. This is to ensure that the write to put the migration into
// "committed" is majority-committed and that snapshot reads on the secondaries with unspecified
// atClusterTime have read timestamp >= commitTimestamp.
- rst.awaitLastOpCommitted();
+ donorRst.awaitLastOpCommitted();
- const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId});
- const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary];
+ const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({
+ tenantId: tenantId
+ });
+ const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary];
nodes.forEach(node => {
const db = node.getDB(dbName);
if (testCase.requiresReadTimestamp) {
@@ -121,31 +99,31 @@ function testReadIsRejectedIfSentAfterMigrationHasCommitted(rst, testCase, dbNam
/**
* Tests that the donor does not reject reads after the migration aborts.
*/
-function testReadIsAcceptedIfSentAfterMigrationHasAborted(rst, testCase, dbName, collName) {
+function testReadIsAcceptedIfSentAfterMigrationHasAborted(testCase, dbName, collName) {
const tenantId = dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
- const primary = rst.getPrimary();
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = donorRst.getPrimary();
- let abortFp = configureFailPoint(primary, "abortTenantMigrationAfterBlockingStarts");
- const res =
- assert.commandWorked(TenantMigrationUtil.startMigration(primary.host, migrationOpts));
- assert.eq(res.state, "aborted");
+ let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts");
+ const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kAborted);
abortFp.off();
// Wait for the last oplog entry on the primary to be visible in the committed snapshot view of
// the oplog on all the secondaries. This is to ensure that the write to put the migration into
// "aborted" is majority-committed and that snapshot reads on the secondaries with unspecified
// atClusterTime have read timestamp >= abortTimestamp.
- rst.awaitLastOpCommitted();
+ donorRst.awaitLastOpCommitted();
- const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId});
- const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary];
+ const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({
+ tenantId: tenantId
+ });
+ const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary];
nodes.forEach(node => {
const db = node.getDB(dbName);
if (testCase.requiresReadTimestamp) {
@@ -161,54 +139,52 @@ function testReadIsAcceptedIfSentAfterMigrationHasAborted(rst, testCase, dbName,
* Tests that the donor blocks clusterTime reads in the blocking state with readTimestamp >=
* blockingTimestamp but does not block linearizable reads.
*/
-function testReadBlocksIfMigrationIsInBlocking(rst, testCase, dbName, collName) {
+function testReadBlocksIfMigrationIsInBlocking(testCase, dbName, collName) {
const tenantId = dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
- const primary = rst.getPrimary();
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = donorRst.getPrimary();
- let blockingFp = configureFailPoint(primary, "pauseTenantMigrationAfterBlockingStarts");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts);
+ let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts");
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
// Wait for the migration to enter the blocking state.
- migrationThread.start();
blockingFp.wait();
// Wait for the last oplog entry on the primary to be visible in the committed snapshot view of
// the oplog on all secondaries to ensure that snapshot reads on the secondaries with
// unspecified atClusterTime have read timestamp >= blockTimestamp.
- rst.awaitLastOpCommitted();
+ donorRst.awaitLastOpCommitted();
- const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId});
+ const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({
+ tenantId: tenantId
+ });
const command = testCase.requiresReadTimestamp
? testCase.command(collName, donorDoc.blockTimestamp)
: testCase.command(collName);
command.maxTimeMS = kMaxTimeMS;
- const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary];
+ const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary];
nodes.forEach(node => {
const db = node.getDB(dbName);
runCommand(db, command, testCase.isLinearizableRead ? null : ErrorCodes.MaxTimeMSExpired);
});
blockingFp.off();
- migrationThread.join();
- const res = assert.commandWorked(migrationThread.returnData());
- assert.eq(res.state, "committed");
+ const stateRes =
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
}
/**
* Tests that the donor blocks clusterTime reads in the blocking state with readTimestamp >=
* blockingTimestamp, and unblocks the reads once the migration aborts.
*/
-function testBlockedReadGetsUnblockedAndRejectedIfMigrationCommits(
- rst, testCase, dbName, collName) {
+function testBlockedReadGetsUnblockedAndRejectedIfMigrationCommits(testCase, dbName, collName) {
if (testCase.isLinearizableRead) {
// Linearizable reads are not blocked.
return;
@@ -217,41 +193,40 @@ function testBlockedReadGetsUnblockedAndRejectedIfMigrationCommits(
const tenantId = dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
- const primary = rst.getPrimary();
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = donorRst.getPrimary();
- let blockingFp = configureFailPoint(primary, "pauseTenantMigrationAfterBlockingStarts");
+ let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts");
const targetBlockedReads =
assert
- .commandWorked(primary.adminCommand(
+ .commandWorked(donorPrimary.adminCommand(
{configureFailPoint: "tenantMigrationBlockRead", mode: "alwaysOn"}))
.count +
1;
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts);
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
let resumeMigrationThread =
- new Thread(resumeMigrationAfterBlockingRead, primary.host, targetBlockedReads);
+ new Thread(resumeMigrationAfterBlockingRead, donorPrimary.host, targetBlockedReads);
// Run the commands after the migration enters the blocking state.
resumeMigrationThread.start();
- migrationThread.start();
blockingFp.wait();
// Wait for the last oplog entry on the primary to be visible in the committed snapshot view of
// the oplog on all secondaries to ensure that snapshot reads on the secondaries with
// unspecified atClusterTime have read timestamp >= blockTimestamp.
- rst.awaitLastOpCommitted();
+ donorRst.awaitLastOpCommitted();
- const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId});
+ const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({
+ tenantId: tenantId
+ });
const command = testCase.requiresReadTimestamp
? testCase.command(collName, donorDoc.blockTimestamp)
: testCase.command(collName);
- const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary];
+ const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary];
// The migration should unpause and commit after the read is blocked. Verify that the read
// is rejected.
@@ -262,16 +237,16 @@ function testBlockedReadGetsUnblockedAndRejectedIfMigrationCommits(
// Verify that the migration succeeded.
resumeMigrationThread.join();
- migrationThread.join();
- const res = assert.commandWorked(migrationThread.returnData());
- assert.eq(res.state, "committed");
+ const stateRes =
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
}
/**
* Tests that the donor blocks clusterTime reads in the blocking state with readTimestamp >=
* blockingTimestamp, and rejects the reads once the migration commits.
*/
-function testBlockedReadGetsUnblockedAndSucceedsIfMigrationAborts(rst, testCase, dbName, collName) {
+function testBlockedReadGetsUnblockedAndSucceedsIfMigrationAborts(testCase, dbName, collName) {
if (testCase.isLinearizableRead) {
// Linearizable reads are not blocked.
return;
@@ -280,42 +255,41 @@ function testBlockedReadGetsUnblockedAndSucceedsIfMigrationAborts(rst, testCase,
const tenantId = dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
- const primary = rst.getPrimary();
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = donorRst.getPrimary();
- let blockingFp = configureFailPoint(primary, "pauseTenantMigrationAfterBlockingStarts");
- let abortFp = configureFailPoint(primary, "abortTenantMigrationAfterBlockingStarts");
+ let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts");
+ let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts");
const targetBlockedReads =
assert
- .commandWorked(primary.adminCommand(
+ .commandWorked(donorPrimary.adminCommand(
{configureFailPoint: "tenantMigrationBlockRead", mode: "alwaysOn"}))
.count +
1;
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts);
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
let resumeMigrationThread =
- new Thread(resumeMigrationAfterBlockingRead, primary.host, targetBlockedReads);
+ new Thread(resumeMigrationAfterBlockingRead, donorPrimary.host, targetBlockedReads);
// Run the commands after the migration enters the blocking state.
resumeMigrationThread.start();
- migrationThread.start();
blockingFp.wait();
// Wait for the last oplog entry on the primary to be visible in the committed snapshot view of
// the oplog on all secondaries to ensure that snapshot reads on the secondaries with
// unspecified atClusterTime have read timestamp >= blockTimestamp.
- rst.awaitLastOpCommitted();
+ donorRst.awaitLastOpCommitted();
- const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId});
+ const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({
+ tenantId: tenantId
+ });
const command = testCase.requiresReadTimestamp
? testCase.command(collName, donorDoc.blockTimestamp)
: testCase.command(collName);
- const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary];
+ const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary];
// The migration should unpause and abort after the read is blocked. Verify that the read
// unblocks.
@@ -326,10 +300,10 @@ function testBlockedReadGetsUnblockedAndSucceedsIfMigrationAborts(rst, testCase,
// Verify that the migration failed due to the simulated error.
resumeMigrationThread.join();
- migrationThread.join();
abortFp.off();
- const res = assert.commandWorked(migrationThread.returnData());
- assert.eq(res.state, "aborted");
+ const stateRes =
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kAborted);
}
const testCases = {
@@ -419,10 +393,9 @@ const testFuncs = {
for (const [testName, testFunc] of Object.entries(testFuncs)) {
for (const [testCaseName, testCase] of Object.entries(testCases)) {
let dbName = testCaseName + "-" + testName + "_" + kTenantDefinedDbName;
- testFunc(donorRst, testCase, dbName, kCollName);
+ testFunc(testCase, dbName, kCollName);
}
}
-donorRst.stopSet();
-recipientRst.stopSet();
+tenantMigrationTest.stop();
})();
diff --git a/jstests/replsets/tenant_migration_concurrent_writes.js b/jstests/replsets/tenant_migration_concurrent_writes.js
index de13c5d5a1f..26e7fdbcd6d 100644
--- a/jstests/replsets/tenant_migration_concurrent_writes.js
+++ b/jstests/replsets/tenant_migration_concurrent_writes.js
@@ -15,29 +15,12 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
-load("jstests/replsets/libs/tenant_migration_util.js");
-
-const donorRst = new ReplSetTest(
- {nodes: 1, name: 'donor', nodeOptions: {setParameter: {enableTenantMigrations: true}}});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: 'recipient',
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
-});
+load("jstests/replsets/libs/tenant_migration_test.js");
-donorRst.startSet();
-donorRst.initiate();
-recipientRst.startSet();
-recipientRst.initiate();
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
+const donorRst = tenantMigrationTest.getDonorRst();
const primary = donorRst.getPrimary();
-const kRecipientConnString = recipientRst.getURL();
const kCollName = "testColl";
const kTenantDefinedDbName = "0";
@@ -248,14 +231,11 @@ function testWriteIsRejectedIfSentAfterMigrationHasCommitted(testCase, testOpts)
const tenantId = testOpts.dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
- const res = assert.commandWorked(
- TenantMigrationUtil.startMigration(testOpts.primaryHost, migrationOpts));
- assert.eq(res.state, "committed");
+ const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
runCommand(testOpts, ErrorCodes.TenantMigrationCommitted);
testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
@@ -268,15 +248,12 @@ function testWriteIsAcceptedIfSentAfterMigrationHasAborted(testCase, testOpts) {
const tenantId = testOpts.dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
let abortFp = configureFailPoint(testOpts.primaryDB, "abortTenantMigrationAfterBlockingStarts");
- const res = assert.commandWorked(
- TenantMigrationUtil.startMigration(testOpts.primaryHost, migrationOpts));
- assert.eq(res.state, "aborted");
+ const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kAborted);
abortFp.off();
// Wait until the in-memory migration state is updated after the migration has majority
@@ -285,7 +262,7 @@ function testWriteIsAcceptedIfSentAfterMigrationHasAborted(testCase, testOpts) {
assert.soon(() => {
const mtabs =
testOpts.primaryDB.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- return mtabs[tenantId].state === TenantMigrationUtil.accessState.kAborted;
+ return mtabs[tenantId].state === TenantMigrationTest.AccessState.kAborted;
});
runCommand(testOpts);
@@ -299,27 +276,24 @@ function testWriteBlocksIfMigrationIsInBlocking(testCase, testOpts) {
const tenantId = testOpts.dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
let blockingFp =
configureFailPoint(testOpts.primaryDB, "pauseTenantMigrationAfterBlockingStarts");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, testOpts.primaryHost, migrationOpts);
+
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
// Run the command after the migration enters the blocking state.
- migrationThread.start();
blockingFp.wait();
testOpts.command.maxTimeMS = kMaxTimeMS;
runCommand(testOpts, ErrorCodes.MaxTimeMSExpired);
// Allow the migration to complete.
blockingFp.off();
- migrationThread.join();
- const res = assert.commandWorked(migrationThread.returnData());
- assert.eq(res.state, "committed");
+ const stateRes =
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
}
@@ -332,9 +306,7 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te
const tenantId = testOpts.dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
let blockingFp =
@@ -346,14 +318,12 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te
.count +
1;
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, testOpts.primaryHost, migrationOpts);
let resumeMigrationThread =
new Thread(resumeMigrationAfterBlockingWrite, testOpts.primaryHost, targetBlockedWrites);
// Run the command after the migration enters the blocking state.
resumeMigrationThread.start();
- migrationThread.start();
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
blockingFp.wait();
// The migration should unpause and commit after the write is blocked. Verify that the write is
@@ -362,9 +332,9 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te
// Verify that the migration succeeded.
resumeMigrationThread.join();
- migrationThread.join();
- const res = assert.commandWorked(migrationThread.returnData());
- assert.eq(res.state, "committed");
+ const stateRes =
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
}
@@ -377,9 +347,7 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts(testCase, tes
const tenantId = testOpts.dbName.split('_')[0];
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ tenantId,
};
let blockingFp =
@@ -392,14 +360,12 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts(testCase, tes
.count +
1;
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, testOpts.primaryHost, migrationOpts);
let resumeMigrationThread =
new Thread(resumeMigrationAfterBlockingWrite, testOpts.primaryHost, targetBlockedWrites);
// Run the command after the migration enters the blocking state.
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
resumeMigrationThread.start();
- migrationThread.start();
blockingFp.wait();
// The migration should unpause and abort after the write is blocked. Verify that the write is
@@ -408,10 +374,10 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts(testCase, tes
// Verify that the migration aborted due to the simulated error.
resumeMigrationThread.join();
- migrationThread.join();
+ const stateRes =
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
abortFp.off();
- const res = assert.commandWorked(migrationThread.returnData());
- assert.eq(res.state, "aborted");
+ assert.eq(stateRes.state, TenantMigrationTest.State.kAborted);
testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
}
@@ -969,6 +935,5 @@ for (const [testName, testFunc] of Object.entries(testFuncs)) {
}
}
-donorRst.stopSet();
-recipientRst.stopSet();
+tenantMigrationTest.stop();
})();
diff --git a/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js b/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js
index b5f13ab8f88..a9d5f60b80a 100644
--- a/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js
+++ b/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js
@@ -7,9 +7,8 @@
(function() {
'use strict';
-load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
-load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
/**
* Asserts that the number of recipientDataSync commands executed on the given recipient primary is
@@ -33,76 +32,55 @@ function generateUniqueTenantId() {
return chars[charIndex++];
}
-let startupParams = {};
-startupParams["enableTenantMigrations"] = true;
-// TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
-startupParams["failpoint.returnResponseOkForRecipientSyncDataCmd"] = tojson({mode: 'alwaysOn'});
+const donorRst = new ReplSetTest(
+ {nodes: 1, name: 'donorRst', nodeOptions: {setParameter: {enableTenantMigrations: true}}});
-const rst0 = new ReplSetTest({nodes: 1, name: 'rst0', nodeOptions: {setParameter: startupParams}});
-const rst1 = new ReplSetTest({nodes: 1, name: 'rst1', nodeOptions: {setParameter: startupParams}});
-const rst2 = new ReplSetTest({nodes: 1, name: 'rst2', nodeOptions: {setParameter: startupParams}});
+donorRst.startSet();
+donorRst.initiate();
-rst0.startSet();
-rst0.initiate();
+const tenantMigrationTest0 = new TenantMigrationTest({name: jsTestName(), donorRst});
-rst1.startSet();
-rst1.initiate();
-
-rst2.startSet();
-rst2.initiate();
-
-const rst0Primary = rst0.getPrimary();
-const rst1Primary = rst1.getPrimary();
-
-const kConfigDonorsNS = "config.tenantMigrationDonors";
+const donorPrimary = donorRst.getPrimary();
+const recipientPrimary = tenantMigrationTest0.getRecipientPrimary();
let numRecipientSyncDataCmdSent = 0;
// Test that a retry of a donorStartMigration command joins the existing migration that has
// completed but has not been garbage-collected.
(() => {
+ const tenantId = `${generateUniqueTenantId()}_RetryAfterMigrationCompletes`;
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst1.getURL(),
- tenantId: generateUniqueTenantId() + "RetryAfterMigrationCompletes",
- readPreference: {mode: "primary"}
+ tenantId,
};
- assert.commandWorked(TenantMigrationUtil.startMigration(rst0Primary.host, migrationOpts));
- assert.commandWorked(TenantMigrationUtil.startMigration(rst0Primary.host, migrationOpts));
+ assert.commandWorked(tenantMigrationTest0.runMigration(migrationOpts));
+ assert.commandWorked(tenantMigrationTest0.runMigration(migrationOpts));
// If the second donorStartMigration had started a duplicate migration, the recipient would have
// received four recipientSyncData commands instead of two.
numRecipientSyncDataCmdSent += 2;
- checkNumRecipientSyncDataCmdExecuted(rst1Primary, numRecipientSyncDataCmdSent);
+ checkNumRecipientSyncDataCmdExecuted(recipientPrimary, numRecipientSyncDataCmdSent);
})();
// Test that a retry of a donorStartMigration command joins the ongoing migration.
(() => {
+ const tenantId = `${generateUniqueTenantId()}_RetryBeforeMigrationCompletes`;
const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst1.getURL(),
- tenantId: generateUniqueTenantId() + "RetryBeforeMigrationCompletes",
- readPreference: {mode: "primary"}
+ tenantId,
};
- let migrationThread0 =
- new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts);
- let migrationThread1 =
- new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts);
-
- migrationThread0.start();
- migrationThread1.start();
- migrationThread0.join();
- migrationThread1.join();
+ assert.commandWorked(tenantMigrationTest0.startMigration(migrationOpts));
+ assert.commandWorked(tenantMigrationTest0.startMigration(migrationOpts));
- assert.commandWorked(migrationThread0.returnData());
- assert.commandWorked(migrationThread1.returnData());
+ assert.commandWorked(tenantMigrationTest0.waitForMigrationToComplete(migrationOpts));
+ assert.commandWorked(tenantMigrationTest0.waitForMigrationToComplete(migrationOpts));
// If the second donorStartMigration had started a duplicate migration, the recipient would have
// received four recipientSyncData commands instead of two.
numRecipientSyncDataCmdSent += 2;
- checkNumRecipientSyncDataCmdExecuted(rst1Primary, numRecipientSyncDataCmdSent);
+ checkNumRecipientSyncDataCmdExecuted(recipientPrimary, numRecipientSyncDataCmdSent);
})();
/**
@@ -111,15 +89,14 @@ let numRecipientSyncDataCmdSent = 0;
* has committed but not garbage-collected (i.e. the donor has not received donorForgetMigration).
*/
function testStartingConflictingMigrationAfterInitialMigrationCommitted(
- donorPrimary, migrationOpts0, migrationOpts1) {
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts0));
- assert.commandFailedWithCode(
- TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts1),
- ErrorCodes.ConflictingOperationInProgress);
+ tenantMigrationTest0, migrationOpts0, tenantMigrationTest1, migrationOpts1) {
+ tenantMigrationTest0.runMigration(migrationOpts0);
+ assert.commandFailedWithCode(tenantMigrationTest1.runMigration(migrationOpts1),
+ ErrorCodes.ConflictingOperationInProgress);
// If the second donorStartMigration had started a duplicate migration, there would be two donor
// state docs.
- let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ let configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
assert.eq(1, configDonorsColl.count({tenantId: migrationOpts0.tenantId}));
}
@@ -127,20 +104,12 @@ function testStartingConflictingMigrationAfterInitialMigrationCommitted(
* Tests that if the client runs multiple donorStartMigration commands that would start conflicting
* migrations, only one of the migrations will start and succeed.
*/
-function testConcurrentConflictingMigrations(donorPrimary, migrationOpts0, migrationOpts1) {
- let migrationThread0 =
- new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts0);
- let migrationThread1 =
- new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts1);
+function testConcurrentConflictingMigrations(
+ tenantMigrationTest0, migrationOpts0, tenantMigrationTest1, migrationOpts1) {
+ const res0 = tenantMigrationTest0.startMigration(migrationOpts0);
+ const res1 = tenantMigrationTest1.startMigration(migrationOpts1);
- migrationThread0.start();
- migrationThread1.start();
- migrationThread0.join();
- migrationThread1.join();
-
- const res0 = migrationThread0.returnData();
- const res1 = migrationThread1.returnData();
- let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ let configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
// Verify that only one migration succeeded.
assert(res0.ok || res1.ok);
@@ -163,83 +132,75 @@ function testConcurrentConflictingMigrations(donorPrimary, migrationOpts0, migra
// Test migrations with different migrationIds but identical settings.
(() => {
- let makeMigrationOpts = () => {
+ let makeTestParams = () => {
const migrationOpts0 = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst1.getURL(),
tenantId: generateUniqueTenantId() + "DiffMigrationId",
- readPreference: {mode: "primary"}
};
const migrationOpts1 = Object.extend({}, migrationOpts0, true);
migrationOpts1.migrationIdString = extractUUIDFromObject(UUID());
- return [migrationOpts0, migrationOpts1];
+ return [tenantMigrationTest0, migrationOpts0, tenantMigrationTest0, migrationOpts1];
};
- testStartingConflictingMigrationAfterInitialMigrationCommitted(rst0Primary,
- ...makeMigrationOpts());
- testConcurrentConflictingMigrations(rst0Primary, ...makeMigrationOpts());
+ testStartingConflictingMigrationAfterInitialMigrationCommitted(...makeTestParams());
+ testConcurrentConflictingMigrations(...makeTestParams());
})();
// Test reusing a migrationId for different migration settings.
// Test different tenantIds.
(() => {
- let makeMigrationOpts = () => {
+ let makeTestParams = () => {
const migrationOpts0 = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst1.getURL(),
tenantId: generateUniqueTenantId() + "DiffTenantId",
- readPreference: {mode: "primary"}
};
const migrationOpts1 = Object.extend({}, migrationOpts0, true);
migrationOpts1.tenantId = generateUniqueTenantId() + "DiffTenantId";
- return [migrationOpts0, migrationOpts1];
+ return [tenantMigrationTest0, migrationOpts0, tenantMigrationTest0, migrationOpts1];
};
- testStartingConflictingMigrationAfterInitialMigrationCommitted(rst0Primary,
- ...makeMigrationOpts());
- testConcurrentConflictingMigrations(rst0Primary, ...makeMigrationOpts());
+ testStartingConflictingMigrationAfterInitialMigrationCommitted(...makeTestParams());
+ testConcurrentConflictingMigrations(...makeTestParams());
})();
// Test different recipient connection strings.
(() => {
- let makeMigrationOpts = () => {
+ const tenantMigrationTest1 = new TenantMigrationTest({name: `${jsTestName()}1`, donorRst});
+
+ let makeTestParams = () => {
const migrationOpts0 = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst1.getURL(),
tenantId: generateUniqueTenantId() + "DiffRecipientConnString",
- readPreference: {mode: "primary"}
};
+ // The recipient connection string will be populated by the TenantMigrationTest fixture, so
+ // no need to set it here.
const migrationOpts1 = Object.extend({}, migrationOpts0, true);
- migrationOpts1.recipientConnString = rst2.getURL();
- return [migrationOpts0, migrationOpts1];
+ return [tenantMigrationTest0, migrationOpts0, tenantMigrationTest1, migrationOpts1];
};
- testStartingConflictingMigrationAfterInitialMigrationCommitted(rst0Primary,
- ...makeMigrationOpts());
- testConcurrentConflictingMigrations(rst0Primary, ...makeMigrationOpts());
+ testStartingConflictingMigrationAfterInitialMigrationCommitted(...makeTestParams());
+ testConcurrentConflictingMigrations(...makeTestParams());
+
+ tenantMigrationTest1.stop();
})();
// Test different cloning read preference.
(() => {
- let makeMigrationOpts = () => {
+ let makeTestParams = () => {
const migrationOpts0 = {
migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: rst1.getURL(),
tenantId: generateUniqueTenantId() + "DiffReadPref",
- readPreference: {mode: "primary"}
};
const migrationOpts1 = Object.extend({}, migrationOpts0, true);
migrationOpts1.readPreference = {mode: "secondary"};
- return [migrationOpts0, migrationOpts1];
+ return [tenantMigrationTest0, migrationOpts0, tenantMigrationTest0, migrationOpts1];
};
- testStartingConflictingMigrationAfterInitialMigrationCommitted(rst0Primary,
- ...makeMigrationOpts());
- testConcurrentConflictingMigrations(rst0Primary, ...makeMigrationOpts());
+ testStartingConflictingMigrationAfterInitialMigrationCommitted(...makeTestParams());
+ testConcurrentConflictingMigrations(...makeTestParams());
})();
-rst0.stopSet();
-rst1.stopSet();
-rst2.stopSet();
+tenantMigrationTest0.stop();
+donorRst.stopSet();
})();
diff --git a/jstests/replsets/tenant_migration_donor_abort_state_transition.js b/jstests/replsets/tenant_migration_donor_abort_state_transition.js
index 8ec5b6bedc7..4b40cc59980 100644
--- a/jstests/replsets/tenant_migration_donor_abort_state_transition.js
+++ b/jstests/replsets/tenant_migration_donor_abort_state_transition.js
@@ -9,29 +9,11 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/libs/parallelTester.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
-const kTenantId = "testTenantId";
-
-const donorRst = new ReplSetTest(
- {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: "recipientRst",
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
-});
-
-donorRst.startSet();
-donorRst.initiate();
-
-recipientRst.startSet();
-recipientRst.initiate();
+const kTenantIdPrefix = "testTenantId";
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
/**
* Starts a migration and forces the write to insert the donor's state doc to abort on the first few
@@ -40,19 +22,24 @@ recipientRst.initiate();
function testAbortInitialState(donorRst) {
const donorPrimary = donorRst.getPrimary();
+ // Force the storage transaction for the insert to abort prior to inserting the WiredTiger
+ // record.
+ let writeConflictFp = configureFailPoint(donorPrimary, "WTWriteConflictException");
+
+ const tenantId = `${kTenantIdPrefix}-initial`;
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- tenantId: kTenantId + "-initial",
- recipientConnString: recipientRst.getURL(),
- readPreference: {mode: "primary"},
+ tenantId,
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
};
- // Force the storage transaction for the insert to abort prior to inserting the WiredTiger
- // record.
- let writeConflictFp = configureFailPoint(donorPrimary, "WTWriteConflictException");
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+
+ // Run the migration in its own thread, since the initial 'donorStartMigration' command will
+ // hang due to the failpoint.
let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
migrationThread.start();
writeConflictFp.wait();
@@ -65,8 +52,8 @@ function testAbortInitialState(donorRst) {
// Verify that the migration completes successfully.
assert.commandWorked(migrationThread.returnData());
- TenantMigrationUtil.waitForMigrationToCommit(
- donorRst.nodes, migrationId, migrationOpts.tenantId);
+ tenantMigrationTest.waitForNodesToReachState(
+ donorRst.nodes, migrationId, tenantId, TenantMigrationTest.State.kCommitted);
}
/**
@@ -80,21 +67,18 @@ function testAbortStateTransition(donorRst, pauseFailPoint, setUpFailPoints, nex
nextState}" after reaching failpoint "${pauseFailPoint}"`);
const donorPrimary = donorRst.getPrimary();
+ const tenantId = `${kTenantIdPrefix}-${nextState}`;
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- tenantId: kTenantId + "-" + nextState,
- recipientConnString: recipientRst.getURL(),
- readPreference: {mode: "primary"},
+ tenantId,
};
setUpFailPoints.forEach(failPoint => configureFailPoint(donorPrimary, failPoint));
let pauseFp = configureFailPoint(donorPrimary, pauseFailPoint);
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
- migrationThread.start();
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
pauseFp.wait();
// Force the storage transaction for the write to transition to the next state to abort prior to
@@ -111,30 +95,36 @@ function testAbortStateTransition(donorRst, pauseFailPoint, setUpFailPoints, nex
opObserverFp.off();
// Verify that the migration completes successfully.
- assert.commandWorked(migrationThread.returnData());
- if (nextState == "aborted") {
- TenantMigrationUtil.waitForMigrationToAbort(
- donorRst.nodes, migrationId, migrationOpts.tenantId);
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ if (nextState === TenantMigrationTest.State.kAborted) {
+ tenantMigrationTest.waitForNodesToReachState(
+ donorRst.nodes, migrationId, tenantId, TenantMigrationTest.State.kAborted);
} else {
- TenantMigrationUtil.waitForMigrationToCommit(
- donorRst.nodes, migrationId, migrationOpts.tenantId);
+ tenantMigrationTest.waitForNodesToReachState(
+ donorRst.nodes, migrationId, tenantId, TenantMigrationTest.State.kCommitted);
}
}
+const donorRst = tenantMigrationTest.getDonorRst();
jsTest.log("Test aborting donor's state doc insert");
testAbortInitialState(donorRst);
jsTest.log("Test aborting donor's state doc update");
-[{pauseFailPoint: "pauseTenantMigrationAfterDataSync", nextState: "blocking"},
- {pauseFailPoint: "pauseTenantMigrationAfterBlockingStarts", nextState: "committed"},
+[{
+ pauseFailPoint: "pauseTenantMigrationAfterDataSync",
+ nextState: TenantMigrationTest.State.kBlocking
+},
+ {
+ pauseFailPoint: "pauseTenantMigrationAfterBlockingStarts",
+ nextState: TenantMigrationTest.State.kCommitted
+ },
{
pauseFailPoint: "pauseTenantMigrationAfterBlockingStarts",
setUpFailPoints: ["abortTenantMigrationAfterBlockingStarts"],
- nextState: "aborted"
+ nextState: TenantMigrationTest.State.kAborted
}].forEach(({pauseFailPoint, setUpFailPoints = [], nextState}) => {
testAbortStateTransition(donorRst, pauseFailPoint, setUpFailPoints, nextState);
});
-donorRst.stopSet();
-recipientRst.stopSet();
+tenantMigrationTest.stop();
}());
diff --git a/jstests/replsets/tenant_migration_donor_current_op.js b/jstests/replsets/tenant_migration_donor_current_op.js
index 5647a06d3f8..55481a20e8c 100644
--- a/jstests/replsets/tenant_migration_donor_current_op.js
+++ b/jstests/replsets/tenant_migration_donor_current_op.js
@@ -11,9 +11,8 @@
"use strict";
load("jstests/libs/fail_point_util.js");
-load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
-load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
// An object that mirrors the donor migration states.
const migrationStates = {
@@ -24,11 +23,9 @@ const migrationStates = {
kAborted: 4
};
-const donorRst = new ReplSetTest(
- {nodes: 1, name: 'donor', nodeOptions: {setParameter: {enableTenantMigrations: true}}});
const recipientRst = new ReplSetTest({
nodes: 1,
- name: 'recipient',
+ name: 'donor',
nodeOptions: {
setParameter: {
enableTenantMigrations: true,
@@ -37,31 +34,24 @@ const recipientRst = new ReplSetTest({
}
}
});
+recipientRst.startSet();
+recipientRst.initiate();
const kRecipientConnString = recipientRst.getURL();
const kTenantId = 'testTenantId';
-recipientRst.startSet();
-recipientRst.initiate();
-
(() => {
jsTestLog("Testing currentOp output for migration in data sync state");
- donorRst.startSet();
- donorRst.initiate();
- const donorPrimary = donorRst.getPrimary();
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst});
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: kRecipientConnString,
tenantId: kTenantId,
- readPreference: {mode: "primary"},
};
let fp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterDataSync");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
-
- migrationThread.start();
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
fp.wait();
const res = assert.commandWorked(
@@ -73,28 +63,22 @@ recipientRst.initiate();
assert.eq(res.inprog[0].migrationCompleted, false);
fp.off();
- migrationThread.join();
- donorRst.stopSet();
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ tenantMigrationTest.stop();
})();
(() => {
jsTestLog("Testing currentOp output for migration in blocking state");
- donorRst.startSet();
- donorRst.initiate();
- const donorPrimary = donorRst.getPrimary();
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst});
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: kRecipientConnString,
tenantId: kTenantId,
- readPreference: {mode: "primary"},
};
let fp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
-
- migrationThread.start();
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
fp.wait();
const res = assert.commandWorked(
@@ -107,25 +91,22 @@ recipientRst.initiate();
assert.eq(res.inprog[0].migrationCompleted, false);
fp.off();
- migrationThread.join();
- donorRst.stopSet();
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ tenantMigrationTest.stop();
})();
(() => {
jsTestLog("Testing currentOp output for aborted migration");
- donorRst.startSet();
- donorRst.initiate();
- const donorPrimary = donorRst.getPrimary();
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst});
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: kRecipientConnString,
tenantId: kTenantId,
- readPreference: {mode: "primary"},
};
configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts");
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
+ assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
const res = assert.commandWorked(
donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"}));
@@ -137,25 +118,21 @@ recipientRst.initiate();
assert(res.inprog[0].commitOrAbortOpTime);
assert(res.inprog[0].abortReason);
assert.eq(res.inprog[0].migrationCompleted, false);
-
- donorRst.stopSet();
+ tenantMigrationTest.stop();
})();
// Check currentOp while in committed state before and after a migration has completed.
(() => {
jsTestLog("Testing currentOp output for committed migration");
- donorRst.startSet();
- donorRst.initiate();
- const donorPrimary = donorRst.getPrimary();
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst});
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: kRecipientConnString,
tenantId: kTenantId,
- readPreference: {mode: "primary"},
};
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
+ assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
let res = donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"});
assert.eq(res.inprog.length, 1);
@@ -168,8 +145,7 @@ recipientRst.initiate();
jsTestLog("Testing currentOp output for a committed migration after donorForgetMigration");
- assert.commandWorked(
- TenantMigrationUtil.forgetMigration(donorPrimary.host, migrationOpts.migrationIdString));
+ assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
res = donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"});
assert.eq(res.inprog.length, 1);
@@ -180,8 +156,7 @@ recipientRst.initiate();
assert(res.inprog[0].commitOrAbortOpTime);
assert(res.inprog[0].expireAt);
assert.eq(res.inprog[0].migrationCompleted, true);
-
- donorRst.stopSet();
+ tenantMigrationTest.stop();
})();
recipientRst.stopSet();
diff --git a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js
index ad43691ecec..5897bc68806 100644
--- a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js
+++ b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js
@@ -11,46 +11,16 @@
"use strict";
load("jstests/libs/fail_point_util.js");
-load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
-load("jstests/replsets/libs/tenant_migration_util.js");
-
-const donorRst = new ReplSetTest(
- {nodes: 1, name: 'donor', nodeOptions: {setParameter: {enableTenantMigrations: true}}});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: 'recipient',
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
-});
-
-donorRst.startSet();
-donorRst.initiate();
+load("jstests/libs/parallelTester.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
-recipientRst.startSet();
-recipientRst.initiate();
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
const kMaxSleepTimeMS = 1000;
const kTenantId = 'testTenantId';
-const kConfigDonorsNS = "config.tenantMigrationDonors";
-let donorPrimary = donorRst.getPrimary();
-let kRecipientConnString = recipientRst.getURL();
-
-const migrationOpts = {
- migrationIdString: extractUUIDFromObject(UUID()),
- recipientConnString: kRecipientConnString,
- tenantId: kTenantId,
- readPreference: {mode: "primary"},
-};
-
-let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
+let donorPrimary = tenantMigrationTest.getDonorPrimary();
// Force the migration to pause after entering a randomly selected state to simulate a failure.
Random.setRandomSeed();
@@ -65,63 +35,68 @@ if (index < kMigrationFpNames.length) {
fp = configureFailPoint(donorPrimary, kMigrationFpNames[index]);
}
-migrationThread.start();
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(UUID()),
+ tenantId: kTenantId
+};
+assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
sleep(Math.random() * kMaxSleepTimeMS);
// Add the initial sync node and make sure that it does not step up.
-var initialSyncNode =
+const donorRst = tenantMigrationTest.getDonorRst();
+const initialSyncNode =
donorRst.add({rsConfig: {priority: 0, votes: 0}, setParameter: {enableTenantMigrations: true}});
donorRst.reInitiate();
jsTestLog("Waiting for initial sync to finish.");
donorRst.awaitSecondaryNodes();
-let configDonorsColl = initialSyncNode.getCollection(kConfigDonorsNS);
+let configDonorsColl = initialSyncNode.getCollection(TenantMigrationTest.kConfigDonorsNS);
let donorDoc = configDonorsColl.findOne({tenantId: kTenantId});
if (donorDoc) {
let state = donorDoc.state;
switch (state) {
- case "data sync":
- assert.soon(() => TenantMigrationUtil
+ case TenantMigrationTest.State.kDataSync:
+ assert.soon(() => tenantMigrationTest
.getTenantMigrationAccessBlocker(initialSyncNode, kTenantId)
- .state == TenantMigrationUtil.accessState.kAllow);
+ .state == TenantMigrationTest.AccessState.kAllow);
break;
- case "blocking":
- assert.soon(() => TenantMigrationUtil
+ case TenantMigrationTest.State.kBlocking:
+ assert.soon(() => tenantMigrationTest
.getTenantMigrationAccessBlocker(initialSyncNode, kTenantId)
- .state == TenantMigrationUtil.accessState.kBlockWritesAndReads);
+ .state == TenantMigrationTest.AccessState.kBlockWritesAndReads);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(initialSyncNode, kTenantId)
.blockTimestamp,
donorDoc.blockTimestamp) == 0);
break;
- case "committed":
- assert.soon(() => TenantMigrationUtil
+ case TenantMigrationTest.State.kCommitted:
+ assert.soon(() => tenantMigrationTest
.getTenantMigrationAccessBlocker(initialSyncNode, kTenantId)
- .state == TenantMigrationUtil.accessState.kReject);
+ .state == TenantMigrationTest.AccessState.kReject);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(initialSyncNode, kTenantId)
.commitOrAbortOpTime,
donorDoc.commitOrAbortOpTime) == 0);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(initialSyncNode, kTenantId)
.blockTimestamp,
donorDoc.blockTimestamp) == 0);
break;
- case "aborted":
- assert.soon(() => TenantMigrationUtil
+ case TenantMigrationTest.State.kAborted:
+ assert.soon(() => tenantMigrationTest
.getTenantMigrationAccessBlocker(initialSyncNode, kTenantId)
- .state == TenantMigrationUtil.accessState.kAborted);
+ .state == TenantMigrationTest.AccessState.kAborted);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(initialSyncNode, kTenantId)
.commitOrAbortOpTime,
donorDoc.commitOrAbortOpTime) == 0);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(initialSyncNode, kTenantId)
.blockTimestamp,
donorDoc.blockTimestamp) == 0);
@@ -135,7 +110,6 @@ if (fp) {
fp.off();
}
-migrationThread.join();
-donorRst.stopSet();
-recipientRst.stopSet();
+assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+tenantMigrationTest.stop();
})();
diff --git a/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js b/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js
index f3c0d424169..e93795322c4 100644
--- a/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js
+++ b/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js
@@ -10,58 +10,45 @@
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
const kMaxSleepTimeMS = 100;
const kTenantId = "testTenantId";
-let recipientStartupParams = {};
-recipientStartupParams["enableTenantMigrations"] = true;
-// TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
-recipientStartupParams["failpoint.returnResponseOkForRecipientSyncDataCmd"] =
- tojson({mode: 'alwaysOn'});
-
/**
* Runs the donorStartMigration command to start a migration, and interrupts the migration on the
* donor using the 'interruptFunc', and verifies the command response using the
* 'verifyCmdResponseFunc'.
*/
function testDonorStartMigrationInterrupt(interruptFunc, verifyCmdResponseFunc) {
- const donorRst = new ReplSetTest(
- {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}});
- const recipientRst = new ReplSetTest(
- {nodes: 1, name: "recipientRst", nodeOptions: {setParameter: recipientStartupParams}});
-
- donorRst.startSet();
- donorRst.initiate();
-
- recipientRst.startSet();
- recipientRst.initiate();
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
- const donorPrimary = donorRst.getPrimary();
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
tenantId: kTenantId,
- readPreference: {mode: "primary"},
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
};
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
- migrationThread.start();
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+
+ const runMigrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
+ runMigrationThread.start();
// Wait for to donorStartMigration command to start.
assert.soon(() => donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"})
.inprog.length > 0);
sleep(Math.random() * kMaxSleepTimeMS);
- interruptFunc(donorRst, migrationId, migrationOpts.tenantId);
- verifyCmdResponseFunc(migrationThread);
+ interruptFunc(donorRst, migrationId, kTenantId);
+ verifyCmdResponseFunc(runMigrationThread);
- donorRst.stopSet();
- recipientRst.stopSet();
+ tenantMigrationTest.stop();
}
/**
@@ -70,40 +57,26 @@ function testDonorStartMigrationInterrupt(interruptFunc, verifyCmdResponseFunc)
* 'verifyCmdResponseFunc'.
*/
function testDonorForgetMigrationInterrupt(interruptFunc, verifyCmdResponseFunc) {
- const donorRst = new ReplSetTest({
- nodes: 1,
- name: "donorRst",
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- }
- }
- });
- const recipientRst = new ReplSetTest(
- {nodes: 1, name: "recipientRst", nodeOptions: {setParameter: recipientStartupParams}});
-
- donorRst.startSet();
- donorRst.initiate();
-
- recipientRst.startSet();
- recipientRst.initiate();
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
- const donorPrimary = donorRst.getPrimary();
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
tenantId: kTenantId,
- readPreference: {mode: "primary"},
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
};
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
- let forgetMigrationThread = new Thread(
- TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString);
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+
+ assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ const forgetMigrationThread = new Thread(
+ TenantMigrationUtil.forgetMigrationAsync, migrationOpts.migrationIdString, donorRstArgs);
forgetMigrationThread.start();
- // Wait for to donorForgetMigration command to start.
+ // Wait for the donorForgetMigration command to start.
assert.soon(() => {
const res = assert.commandWorked(
donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"}));
@@ -114,8 +87,7 @@ function testDonorForgetMigrationInterrupt(interruptFunc, verifyCmdResponseFunc)
interruptFunc(donorRst, migrationId, migrationOpts.tenantId);
verifyCmdResponseFunc(forgetMigrationThread);
- donorRst.stopSet();
- recipientRst.stopSet();
+ tenantMigrationTest.stop();
}
/**
@@ -130,8 +102,8 @@ function assertCmdSucceededOrInterruptedDueToStepDown(cmdThread) {
* Asserts the command either succeeded or failed with a NotPrimary or shutdown or network error.
*/
function assertCmdSucceededOrInterruptedDueToShutDown(cmdThread) {
+ const res = cmdThread.returnData();
try {
- const res = cmdThread.returnData();
assert(res.ok || ErrorCodes.isNotPrimaryError(res.code) ||
ErrorCodes.isShutdownError(res.code));
} catch (e) {
diff --git a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js
index ef9629e5713..17537f760d9 100644
--- a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js
+++ b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js
@@ -10,10 +10,10 @@
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
const kMaxSleepTimeMS = 100;
-const kConfigDonorsNS = "config.tenantMigrationDonors";
const kTenantId = "testTenantId";
// Set the delay before a donor state doc is garbage collected to be short to speed up the test.
@@ -27,11 +27,14 @@ const kTTLMonitorSleepSecs = 1;
* primary stepped down or shut down after inserting the doc), asserts that the migration
* eventually commits.
*/
-function assertMigrationCommitsIfDurableStateExists(donorRst, migrationId, tenantId) {
- const donorPrimary = donorRst.getPrimary();
- const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+function assertMigrationCommitsIfDurableStateExists(tenantMigrationTest, migrationId, tenantId) {
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
+
+ const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
if (configDonorsColl.count({_id: migrationId}) > 0) {
- TenantMigrationUtil.waitForMigrationToCommit(donorRst.nodes, migrationId, tenantId);
+ tenantMigrationTest.waitForNodesToReachState(
+ donorRst.nodes, migrationId, tenantId, TenantMigrationTest.State.kCommitted);
}
}
@@ -42,47 +45,26 @@ function assertMigrationCommitsIfDurableStateExists(donorRst, migrationId, tenan
function testDonorStartMigrationInterrupt(interruptFunc) {
const donorRst = new ReplSetTest(
{nodes: 3, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}});
- const recipientRst = new ReplSetTest({
- nodes: 1,
- name: "recipientRst",
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint
- // 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
- });
donorRst.startSet();
donorRst.initiate();
- recipientRst.startSet();
- recipientRst.initiate();
-
- const donorPrimary = donorRst.getPrimary();
-
- const donorRstArgs = {
- name: donorRst.name,
- nodeHosts: donorRst.nodes.map(node => `127.0.0.1:${node.port}`),
- nodeOptions: donorRst.nodeOptions,
- keyFile: donorRst.keyFile,
- host: donorRst.host,
- waitForKeys: false,
- };
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
tenantId: kTenantId,
- readPreference: {mode: "primary"},
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
};
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
- let migrationThread = new Thread(
- TenantMigrationUtil.startMigrationRetryOnRetryableErrors, donorRstArgs, migrationOpts);
- migrationThread.start();
+ const runMigrationThread = new Thread(TenantMigrationUtil.runMigrationAsync,
+ migrationOpts,
+ donorRstArgs,
+ true /* retryOnRetryableErrors */);
+ runMigrationThread.start();
// Wait for to donorStartMigration command to start.
assert.soon(() => donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"})
@@ -91,11 +73,12 @@ function testDonorStartMigrationInterrupt(interruptFunc) {
sleep(Math.random() * kMaxSleepTimeMS);
interruptFunc(donorRst);
- assert.commandWorked(migrationThread.returnData());
- assertMigrationCommitsIfDurableStateExists(donorRst, migrationId, migrationOpts.tenantId);
+ assert.commandWorked(runMigrationThread.returnData());
+ assertMigrationCommitsIfDurableStateExists(
+ tenantMigrationTest, migrationId, migrationOpts.tenantId);
+ tenantMigrationTest.stop();
donorRst.stopSet();
- recipientRst.stopSet();
}
/**
@@ -136,30 +119,23 @@ function testDonorForgetMigrationInterrupt(interruptFunc) {
recipientRst.startSet();
recipientRst.initiate();
- let donorPrimary = donorRst.getPrimary();
-
- const donorRstArgs = {
- name: donorRst.name,
- nodeHosts: donorRst.nodes.map(node => `127.0.0.1:${node.port}`),
- nodeOptions: donorRst.nodeOptions,
- keyFile: donorRst.keyFile,
- host: donorRst.host,
- waitForKeys: false,
- };
+ const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst});
+ let donorPrimary = tenantMigrationTest.getDonorPrimary();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
tenantId: kTenantId,
- readPreference: {mode: "primary"},
+ recipientConnString: recipientRst.getURL(),
};
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
- let forgetMigrationThread =
- new Thread(TenantMigrationUtil.forgetMigrationRetryOnRetryableErrors,
- donorRstArgs,
- migrationOpts.migrationIdString);
+ assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ const forgetMigrationThread = new Thread(TenantMigrationUtil.forgetMigrationAsync,
+ migrationOpts.migrationIdString,
+ donorRstArgs,
+ true /* retryOnRetryableErrors */);
forgetMigrationThread.start();
// Wait for to donorForgetMigration command to start.
@@ -174,13 +150,14 @@ function testDonorForgetMigrationInterrupt(interruptFunc) {
donorPrimary = donorRst.getPrimary();
assert.commandWorkedOrFailedWithCode(
- TenantMigrationUtil.forgetMigration(donorPrimary.host, extractUUIDFromObject(migrationId)),
+ tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString),
ErrorCodes.NoSuchTenantMigration);
assert.commandWorked(forgetMigrationThread.returnData());
- TenantMigrationUtil.waitForMigrationGarbageCollection(
+ tenantMigrationTest.waitForMigrationGarbageCollection(
donorRst.nodes, migrationId, migrationOpts.tenantId);
+ tenantMigrationTest.stop();
donorRst.stopSet();
recipientRst.stopSet();
}
diff --git a/jstests/replsets/tenant_migration_donor_retry.js b/jstests/replsets/tenant_migration_donor_retry.js
index 7dfa60ad132..4bfe59595ae 100644
--- a/jstests/replsets/tenant_migration_donor_retry.js
+++ b/jstests/replsets/tenant_migration_donor_retry.js
@@ -11,14 +11,15 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
-const kTenantId = "testTenantId";
+const kTenantIdPrefix = "testTenantId";
let testNum = 0;
-const kConfigDonorsNS = "config.tenantMigrationDonors";
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
function makeTenantId() {
- return kTenantId + testNum++;
+ return kTenantIdPrefix + testNum++;
}
/**
@@ -29,17 +30,14 @@ function makeTenantId() {
* TODO: This function should be changed to testDonorRetryRecipientSyncDataCmdOnError once there is
* a way to differentiate between local and remote stepdown/shutdown error.
*/
-function testMigrationAbortsOnRecipientSyncDataCmdError(
- donorRst, recipientRst, errorCode, failMode) {
- const donorPrimary = donorRst.getPrimary();
- const recipientPrimary = recipientRst.getPrimary();
+function testMigrationAbortsOnRecipientSyncDataCmdError(errorCode, failMode) {
+ const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+ const tenantId = makeTenantId();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
- tenantId: kTenantId + makeTenantId(),
- readPreference: {mode: "primary"},
+ tenantId,
};
let fp = configureFailPoint(recipientPrimary,
@@ -51,9 +49,7 @@ function testMigrationAbortsOnRecipientSyncDataCmdError(
},
failMode);
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
- migrationThread.start();
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
// Verify that the command failed.
const times = failMode.times ? failMode.times : 1;
@@ -62,10 +58,10 @@ function testMigrationAbortsOnRecipientSyncDataCmdError(
}
fp.off();
- migrationThread.join();
- const res = assert.commandWorked(migrationThread.returnData());
- assert.eq(res.state, "aborted");
- assert.eq(res.abortReason.code, errorCode);
+ const stateRes =
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kAborted);
+ assert.eq(stateRes.abortReason.code, errorCode);
return migrationId;
}
@@ -78,19 +74,15 @@ function testMigrationAbortsOnRecipientSyncDataCmdError(
* TODO: This function should be changed to testDonorRetryRecipientForgetMigrationCmdOnError once
* there is a way to differentiate between local and remote stepdown/shutdown error.
*/
-function testMigrationAbortsOnRecipientForgetMigrationCmdError(donorRst, recipientRst, errorCode) {
- const donorPrimary = donorRst.getPrimary();
- const recipientPrimary = recipientRst.getPrimary();
- const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
-
+function testMigrationAbortsOnRecipientForgetMigrationCmdError(errorCode) {
+ const tenantId = makeTenantId();
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
- tenantId: makeTenantId(),
- readPreference: {mode: "primary"},
+ tenantId,
};
+ const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
let fp = configureFailPoint(recipientPrimary,
"failCommand",
{
@@ -100,56 +92,32 @@ function testMigrationAbortsOnRecipientForgetMigrationCmdError(donorRst, recipie
},
{times: 1});
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
- let forgetMigrationThread = new Thread(
- TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString);
- forgetMigrationThread.start();
+ const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
// Verify that the initial recipientForgetMigration command failed.
+ assert.commandFailedWithCode(
+ tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString), errorCode);
fp.wait();
-
- forgetMigrationThread.join();
- assert.commandFailedWithCode(forgetMigrationThread.returnData(), errorCode);
fp.off();
- const donorStateDoc = configDonorsColl.findOne({_id: migrationId});
- assert.eq("committed", donorStateDoc.state);
- assert(!donorStateDoc.expireAt);
+ assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
+ assert(!stateRes.expireAt);
}
-const donorRst = new ReplSetTest(
- {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: "recipientRst",
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
-});
-
-donorRst.startSet();
-donorRst.initiate();
-
-recipientRst.startSet();
-recipientRst.initiate();
-
-const donorPrimary = donorRst.getPrimary();
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
(() => {
jsTest.log(
"Test that the donor does not retry recipientSyncData (to make the recipient start cloning)" +
" on recipient stepdown errors");
- const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError(
- donorRst, recipientRst, ErrorCodes.NotWritablePrimary, {times: 1});
+ const migrationId =
+ testMigrationAbortsOnRecipientSyncDataCmdError(ErrorCodes.NotWritablePrimary, {times: 1});
- const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
assert(!configDonorsColl.findOne({_id: migrationId}).blockTimestamp);
- assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state);
+ assert.eq(TenantMigrationTest.State.kAborted,
+ configDonorsColl.findOne({_id: migrationId}).state);
})();
(() => {
@@ -157,12 +125,13 @@ const donorPrimary = donorRst.getPrimary();
"Test that the donor does not retry recipientSyncData (to make the recipient start cloning)" +
" on recipient shutdown errors");
- const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError(
- donorRst, recipientRst, ErrorCodes.ShutdownInProgress, {times: 1});
+ const migrationId =
+ testMigrationAbortsOnRecipientSyncDataCmdError(ErrorCodes.ShutdownInProgress, {times: 1});
- const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
assert(!configDonorsColl.findOne({_id: migrationId}).blockTimestamp);
- assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state);
+ assert.eq(TenantMigrationTest.State.kAborted,
+ configDonorsColl.findOne({_id: migrationId}).state);
})();
(() => {
@@ -170,12 +139,13 @@ const donorPrimary = donorRst.getPrimary();
"Test that the donor does not retry recipientSyncData (with returnAfterReachingDonorTimestamp) " +
"on stepdown errors");
- const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError(
- donorRst, recipientRst, ErrorCodes.NotWritablePrimary, {skip: 1});
+ const migrationId =
+ testMigrationAbortsOnRecipientSyncDataCmdError(ErrorCodes.NotWritablePrimary, {skip: 1});
- const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
assert(configDonorsColl.findOne({_id: migrationId}).blockTimestamp);
- assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state);
+ assert.eq(TenantMigrationTest.State.kAborted,
+ configDonorsColl.findOne({_id: migrationId}).state);
})();
(() => {
@@ -183,25 +153,23 @@ const donorPrimary = donorRst.getPrimary();
"Test that the donor does not retry recipientSyncData (with returnAfterReachingDonorTimestamp) " +
"on recipient shutdown errors");
- const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError(
- donorRst, recipientRst, ErrorCodes.ShutdownInProgress, {skip: 1});
+ const migrationId =
+ testMigrationAbortsOnRecipientSyncDataCmdError(ErrorCodes.ShutdownInProgress, {skip: 1});
- const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
assert(configDonorsColl.findOne({_id: migrationId}).blockTimestamp);
- assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state);
+ assert.eq(TenantMigrationTest.State.kAborted,
+ configDonorsColl.findOne({_id: migrationId}).state);
})();
(() => {
jsTest.log("Test that the donor does not retry recipientForgetMigration on stepdown errors");
- testMigrationAbortsOnRecipientForgetMigrationCmdError(
- donorRst, recipientRst, ErrorCodes.NotWritablePrimary);
+ testMigrationAbortsOnRecipientForgetMigrationCmdError(ErrorCodes.NotWritablePrimary);
})();
(() => {
jsTest.log("Test that the donor does not retry recipientForgetMigration on shutdown errors");
-
- testMigrationAbortsOnRecipientForgetMigrationCmdError(
- donorRst, recipientRst, ErrorCodes.ShutdownInProgress);
+ testMigrationAbortsOnRecipientForgetMigrationCmdError(ErrorCodes.ShutdownInProgress);
})();
// Each donor state doc is updated three times throughout the lifetime of a tenant migration:
@@ -214,20 +182,25 @@ const kWriteErrorTimeMS = 50;
(() => {
jsTest.log("Test that the donor retries state doc insert on retriable errors");
+ const tenantId = makeTenantId();
+
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
- tenantId: makeTenantId(),
- readPreference: {mode: "primary"},
+ tenantId,
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
};
let fp = configureFailPoint(donorPrimary, "failCollectionInserts", {
- collectionNS: kConfigDonorsNS,
+ collectionNS: TenantMigrationTest.kConfigDonorsNS,
});
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst());
+
+ // Start up a new thread to run this migration, since the 'failCollectionInserts' failpoint will
+ // cause the initial 'donorStartMigration' command to loop forever without returning.
+ const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
migrationThread.start();
// Make the insert keep failing for some time.
@@ -238,35 +211,41 @@ const kWriteErrorTimeMS = 50;
migrationThread.join();
assert.commandWorked(migrationThread.returnData());
- const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
- assert.eq("committed", configDonorsColl.findOne({_id: migrationId}).state);
+ const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
+ const donorStateDoc = configDonorsColl.findOne({_id: migrationId});
+ assert.eq(TenantMigrationTest.State.kCommitted, donorStateDoc.state);
})();
(() => {
jsTest.log("Test that the donor retries state doc update on retriable errors");
+ const tenantId = kTenantIdPrefix + "RetryOnStateDocUpdateError";
+
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
- tenantId: kTenantId + "RetryOnStateDocUpdateError",
- readPreference: {mode: "primary"},
+ tenantId,
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
};
- // Use a random number of skips to fail a random update to config.tenantMigrationDonors.
- let fp = configureFailPoint(donorPrimary,
- "failCollectionUpdates",
- {
- collectionNS: kConfigDonorsNS,
- },
- {skip: Math.floor(Math.random() * kTotalStateDocUpdates)});
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst());
- let migrationThread = new Thread((donorPrimaryHost, migrationOpts) => {
+ // Use a random number of skips to fail a random update to config.tenantMigrationDonors.
+ const fp = configureFailPoint(donorPrimary,
+ "failCollectionUpdates",
+ {
+ collectionNS: TenantMigrationTest.kConfigDonorsNS,
+ },
+ {skip: Math.floor(Math.random() * kTotalStateDocUpdates)});
+
+ // Start up a new thread to run this migration, since we want to continuously send
+ // 'donorStartMigration' commands while the 'failCollectionUpdates' failpoint is on.
+ const migrationThread = new Thread((migrationOpts, donorRstArgs) => {
load("jstests/replsets/libs/tenant_migration_util.js");
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimaryHost, migrationOpts));
- assert.commandWorked(
- TenantMigrationUtil.forgetMigration(donorPrimaryHost, migrationOpts.migrationIdString));
- }, donorPrimary.host, migrationOpts);
+ assert.commandWorked(TenantMigrationUtil.runMigrationAsync(migrationOpts, donorRstArgs));
+ assert.commandWorked(TenantMigrationUtil.forgetMigrationAsync(
+ migrationOpts.migrationIdString, donorRstArgs));
+ }, migrationOpts, donorRstArgs);
migrationThread.start();
// Make the update keep failing for some time.
@@ -275,12 +254,11 @@ const kWriteErrorTimeMS = 50;
fp.off();
migrationThread.join();
- const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+ const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
const donorStateDoc = configDonorsColl.findOne({_id: migrationId});
- assert.eq("committed", donorStateDoc.state);
+ assert.eq(donorStateDoc.state, TenantMigrationTest.State.kCommitted);
assert(donorStateDoc.expireAt);
})();
-donorRst.stopSet();
-recipientRst.stopSet();
+tenantMigrationTest.stop();
})();
diff --git a/jstests/replsets/tenant_migration_donor_rollback_recovery.js b/jstests/replsets/tenant_migration_donor_rollback_recovery.js
index 77036f11e3d..5db336581d3 100644
--- a/jstests/replsets/tenant_migration_donor_rollback_recovery.js
+++ b/jstests/replsets/tenant_migration_donor_rollback_recovery.js
@@ -10,9 +10,9 @@ load("jstests/libs/fail_point_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/replsets/libs/rollback_test.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
-const kConfigDonorsNS = "config.tenantMigrationDonors";
const kTenantId = "testTenantId";
const kMaxSleepTimeMS = 250;
@@ -70,24 +70,20 @@ function testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc) {
config.members[2].priority = 0;
donorRst.initiateWithHighElectionTimeout(config);
+ const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), recipientRst, donorRst});
+
const donorRollbackTest = new RollbackTest("donorRst", donorRst);
- const donorRstArgs = {
- name: donorRst.name,
- nodeHosts: donorRst.nodes.map(node => `127.0.0.1:${node.port}`),
- nodeOptions: donorRst.nodeOptions,
- keyFile: donorRst.keyFile,
- host: donorRst.host,
- waitForKeys: false,
- };
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
let donorPrimary = donorRollbackTest.getPrimary();
- setUpFunc(donorRstArgs, donorPrimary);
+ setUpFunc(tenantMigrationTest, donorRstArgs);
donorRollbackTest.awaitLastOpCommitted();
// Writes during this state will be rolled back.
donorRollbackTest.transitionToRollbackOperations();
- rollbackOpsFunc(donorRstArgs, donorPrimary);
+ rollbackOpsFunc(tenantMigrationTest, donorRstArgs);
// Transition to replication steady state.
donorRollbackTest.transitionToSyncSourceOperationsBeforeRollback();
@@ -99,7 +95,7 @@ function testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc) {
// secondary that the primary replicates data onto.
donorPrimary = donorRollbackTest.getPrimary();
let donorSecondary = donorRollbackTest.getSecondary();
- steadyStateFunc(donorPrimary, donorSecondary);
+ steadyStateFunc(tenantMigrationTest, donorPrimary, donorSecondary);
donorRollbackTest.stop();
}
@@ -114,23 +110,31 @@ function testRollbackInitialState() {
const migrationOpts = makeMigrationOpts(migrationId, kTenantId + "-initial");
let migrationThread;
- let setUpFunc = (donorRstArgs, donorPrimary) => {};
+ let setUpFunc = (tenantMigrationTest, donorRstArgs) => {};
- let rollbackOpsFunc = (donorRstArgs, donorPrimary) => {
- // Start the migration and wait for the primary to insert the state doc.
- migrationThread = new Thread(
- TenantMigrationUtil.startMigrationRetryOnRetryableErrors, donorRstArgs, migrationOpts);
+ let rollbackOpsFunc = (tenantMigrationTest, donorRstArgs) => {
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
+
+ // Start the migration asynchronously and wait for the primary to insert the state doc.
+ migrationThread = new Thread(TenantMigrationUtil.runMigrationAsync,
+ migrationOpts,
+ donorRstArgs,
+ true /* retryOnRetryableErrors */);
migrationThread.start();
assert.soon(() => {
- return 1 === donorPrimary.getCollection(kConfigDonorsNS).count({_id: migrationId});
+ return 1 === donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({
+ _id: migrationId
+ });
});
};
- let steadyStateFunc = (donorPrimary, donorSecondary) => {
+ let steadyStateFunc = (tenantMigrationTest, donorPrimary, donorSecondary) => {
// Verify that the migration restarted successfully on the new primary despite rollback.
assert.commandWorked(migrationThread.returnData());
- TenantMigrationUtil.assertMigrationCommitted(
- [donorPrimary, donorSecondary], migrationId, migrationOpts.tenantId);
+ tenantMigrationTest.assertNodesInExpectedState([donorPrimary, donorSecondary],
+ migrationId,
+ migrationOpts.tenantId,
+ TenantMigrationTest.State.kCommitted);
};
testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc);
@@ -151,32 +155,38 @@ function testRollBackStateTransition(pauseFailPoint, setUpFailPoints, nextState)
const migrationOpts = makeMigrationOpts(migrationId, kTenantId + "-" + nextState);
let migrationThread, pauseFp;
- let setUpFunc = (donorRstArgs, donorPrimary) => {
+ let setUpFunc = (tenantMigrationTest, donorRstArgs) => {
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
setUpFailPoints.forEach(failPoint => configureFailPoint(donorPrimary, failPoint));
pauseFp = configureFailPoint(donorPrimary, pauseFailPoint);
- migrationThread = new Thread(
- TenantMigrationUtil.startMigrationRetryOnRetryableErrors, donorRstArgs, migrationOpts);
+ migrationThread = new Thread(TenantMigrationUtil.runMigrationAsync,
+ migrationOpts,
+ donorRstArgs,
+ true /* retryOnRetryableErrors */);
migrationThread.start();
pauseFp.wait();
};
- let rollbackOpsFunc = (donorRstArgs, donorPrimary) => {
+ let rollbackOpsFunc = (tenantMigrationTest, donorRstArgs) => {
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
// Resume the migration and wait for the primary to do the write for the state transition.
pauseFp.off();
assert.soon(() => {
- return 1 === donorPrimary.getCollection(kConfigDonorsNS).count({
+ return 1 === donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({
_id: migrationId,
state: nextState
});
});
};
- let steadyStateFunc = (donorPrimary, donorSecondary) => {
+ let steadyStateFunc = (tenantMigrationTest, donorPrimary, donorSecondary) => {
// Verify that the migration resumed successfully on the new primary despite the rollback.
assert.commandWorked(migrationThread.returnData());
- TenantMigrationUtil.waitForMigrationToCommit(
- [donorPrimary, donorSecondary], migrationId, migrationOpts.tenantId);
+ tenantMigrationTest.waitForNodesToReachState([donorPrimary, donorSecondary],
+ migrationId,
+ migrationOpts.tenantId,
+ TenantMigrationTest.State.kCommitted);
};
testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc);
@@ -193,32 +203,33 @@ function testRollBackMarkingStateGarbageCollectable() {
const migrationOpts = makeMigrationOpts(migrationId, kTenantId + "-markGarbageCollectable");
let forgetMigrationThread;
- let setUpFunc = (donorRstArgs, donorPrimary) => {
- const res = assert.commandWorked(
- TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
- assert.eq("committed", res.state);
+ let setUpFunc = (tenantMigrationTest, donorRstArgs) => {
+ const res = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ assert.eq(TenantMigrationTest.State.kCommitted, res.state);
};
- let rollbackOpsFunc = (donorRstArgs, donorPrimary) => {
+ let rollbackOpsFunc = (tenantMigrationTest, donorRstArgs) => {
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
+
// Run donorForgetMigration and wait for the primary to do the write to mark the state doc
// as garbage collectable.
- forgetMigrationThread =
- new Thread(TenantMigrationUtil.forgetMigrationRetryOnRetryableErrors,
- donorRstArgs,
- migrationOpts.migrationIdString);
+ forgetMigrationThread = new Thread(TenantMigrationUtil.forgetMigrationAsync,
+ migrationOpts.migrationIdString,
+ donorRstArgs,
+ true /* retryOnRetryableErrors */);
forgetMigrationThread.start();
assert.soon(() => {
- return 1 === donorPrimary.getCollection(kConfigDonorsNS).count({
+ return 1 === donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({
_id: migrationId,
expireAt: {$exists: 1}
});
});
};
- let steadyStateFunc = (donorPrimary, donorSecondary) => {
+ let steadyStateFunc = (tenantMigrationTest, donorPrimary, donorSecondary) => {
// Verify that the migration state got garbage collected successfully despite the rollback.
assert.commandWorked(forgetMigrationThread.returnData());
- TenantMigrationUtil.waitForMigrationGarbageCollection(
+ tenantMigrationTest.waitForMigrationGarbageCollection(
[donorPrimary, donorSecondary], migrationId, migrationOpts.tenantId);
};
@@ -235,13 +246,13 @@ function testRollBackRandom() {
const migrationOpts = makeMigrationOpts(migrationId, kTenantId + "-random");
let migrationThread;
- let setUpFunc = (donorRstArgs, donorPrimary) => {
+ let setUpFunc = (tenantMigrationTest, donorRstArgs) => {
migrationThread = new Thread((donorRstArgs, migrationOpts) => {
load("jstests/replsets/libs/tenant_migration_util.js");
- assert.commandWorked(TenantMigrationUtil.startMigrationRetryOnRetryableErrors(
- donorRstArgs, migrationOpts));
- assert.commandWorked(TenantMigrationUtil.forgetMigrationRetryOnRetryableErrors(
- donorRstArgs, migrationOpts.migrationIdString));
+ assert.commandWorked(TenantMigrationUtil.runMigrationAsync(
+ migrationOpts, donorRstArgs, true /* retryOnRetryableErrors */));
+ assert.commandWorked(TenantMigrationUtil.forgetMigrationAsync(
+ migrationOpts.migrationIdString, donorRstArgs, true /* retryOnRetryableErrors */));
}, donorRstArgs, migrationOpts);
// Start the migration and wait for a random amount of time before transitioning to the
@@ -250,18 +261,22 @@ function testRollBackRandom() {
sleep(Math.random() * kMaxSleepTimeMS);
};
- let rollbackOpsFunc = (donorRstArgs, donorPrimary) => {
+ let rollbackOpsFunc = (tenantMigrationTest, donorRstArgs) => {
// Let the migration run in the rollback operations state for a random amount of time.
sleep(Math.random() * kMaxSleepTimeMS);
};
- let steadyStateFunc = (donorPrimary, donorSecondary) => {
+ let steadyStateFunc = (tenantMigrationTest, donorPrimary, donorSecondary) => {
// Verify that the migration completed and was garbage collected successfully despite the
// rollback.
migrationThread.join();
- if (donorPrimary.getCollection(kConfigDonorsNS).count({_id: migrationId}) > 0) {
- TenantMigrationUtil.waitForMigrationToCommit(
- [donorPrimary, donorSecondary], migrationId, migrationOpts.tenantId);
+ if (donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({
+ _id: migrationId
+ }) > 0) {
+ tenantMigrationTest.waitForNodesToReachState([donorPrimary, donorSecondary],
+ migrationId,
+ migrationOpts.tenantId,
+ TenantMigrationTest.State.kCommitted);
}
};
diff --git a/jstests/replsets/tenant_migration_donor_startup_recovery.js b/jstests/replsets/tenant_migration_donor_startup_recovery.js
index d43e7a163de..e197d1598f6 100644
--- a/jstests/replsets/tenant_migration_donor_startup_recovery.js
+++ b/jstests/replsets/tenant_migration_donor_startup_recovery.js
@@ -12,8 +12,8 @@
"use strict";
load("jstests/libs/fail_point_util.js");
-load("jstests/libs/parallelTester.js");
-load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
const donorRst = new ReplSetTest({
nodes: 1,
@@ -25,53 +25,17 @@ const donorRst = new ReplSetTest({
}
}
});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: 'recipient',
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
-});
donorRst.startSet();
donorRst.initiate();
-recipientRst.startSet();
-recipientRst.initiate();
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
const kMaxSleepTimeMS = 1000;
const kTenantId = 'testTenantId';
-const kConfigDonorsNS = "config.tenantMigrationDonors";
-
-let donorPrimary = donorRst.getPrimary();
-let kRecipientConnString = recipientRst.getURL();
-let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
-function startMigration(host, recipientConnString, tenantId) {
- const primary = new Mongo(host);
- try {
- primary.adminCommand({
- donorStartMigration: 1,
- migrationId: UUID(),
- recipientConnectionString: recipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"}
- });
- } catch (e) {
- if (isNetworkError(e)) {
- jsTestLog('Ignoring network error due to node being restarted: ' + tojson(e));
- return;
- }
- throw e;
- }
-}
-
-let migrationThread =
- new Thread(startMigration, donorPrimary.host, kRecipientConnString, kTenantId);
+let donorPrimary = tenantMigrationTest.getDonorPrimary();
+let configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
// Force the migration to pause after entering a randomly selected state to simulate a failure.
Random.setRandomSeed();
@@ -85,7 +49,21 @@ if (index < kMigrationFpNames.length) {
configureFailPoint(donorPrimary, kMigrationFpNames[index]);
}
-migrationThread.start();
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(UUID()),
+ tenantId: kTenantId,
+};
+
+try {
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
+} catch (e) {
+ if (isNetworkError(e)) {
+ jsTestLog('Ignoring network error due to node being restarted: ' + tojson(e));
+ return;
+ }
+ throw e;
+}
+
sleep(Math.random() * kMaxSleepTimeMS);
donorRst.stopSet(null /* signal */, true /*forRestart */);
donorRst.startSet({
@@ -97,52 +75,52 @@ donorRst.startSet({
});
donorPrimary = donorRst.getPrimary();
-configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
let donorDoc = configDonorsColl.findOne({tenantId: kTenantId});
if (donorDoc) {
let state = donorDoc.state;
switch (state) {
- case "data sync":
+ case TenantMigrationTest.State.kDataSync:
assert.soon(
- () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
- .state == TenantMigrationUtil.accessState.kAllow);
+ () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
+ .state == TenantMigrationTest.AccessState.kAllow);
break;
- case "blocking":
+ case TenantMigrationTest.State.kBlocking:
assert.soon(
- () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
- .state == TenantMigrationUtil.accessState.kBlockWritesAndReads);
+ () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
+ .state == TenantMigrationTest.AccessState.kBlockWritesAndReads);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
.blockTimestamp,
donorDoc.blockTimestamp) == 0);
break;
- case "committed":
+ case TenantMigrationTest.State.kCommitted:
assert.soon(
- () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
- .state == TenantMigrationUtil.accessState.kReject);
+ () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
+ .state == TenantMigrationTest.AccessState.kReject);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
.commitOrAbortOpTime,
donorDoc.commitOrAbortOpTime) == 0);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
.blockTimestamp,
donorDoc.blockTimestamp) == 0);
break;
- case "aborted":
+ case TenantMigrationTest.State.kAborted:
assert.soon(
- () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
- .state == TenantMigrationUtil.accessState.kAborted);
+ () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
+ .state == TenantMigrationTest.AccessState.kAborted);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
.commitOrAbortOpTime,
donorDoc.commitOrAbortOpTime) == 0);
assert.soon(
- () => bsonWoCompare(TenantMigrationUtil
+ () => bsonWoCompare(tenantMigrationTest
.getTenantMigrationAccessBlocker(donorPrimary, kTenantId)
.blockTimestamp,
donorDoc.blockTimestamp) == 0);
@@ -152,7 +130,6 @@ if (donorDoc) {
}
}
-migrationThread.join();
+tenantMigrationTest.stop();
donorRst.stopSet();
-recipientRst.stopSet();
})();
diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js
index 2f02cce130c..9c520ee8899 100644
--- a/jstests/replsets/tenant_migration_donor_state_machine.js
+++ b/jstests/replsets/tenant_migration_donor_state_machine.js
@@ -14,9 +14,8 @@
"use strict";
load("jstests/libs/fail_point_util.js");
-load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
-load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
let expectedNumRecipientSyncDataCmdSent = 0;
let expectedNumRecipientForgetMigrationCmdSent = 0;
@@ -45,8 +44,9 @@ function testDonorForgetMigrationAfterMigrationCompletes(
null == node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker);
});
- assert.soon(() =>
- 0 === donorPrimary.getCollection(kConfigDonorsNS).count({tenantId: tenantId}));
+ assert.soon(() => 0 === donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({
+ tenantId: tenantId
+ }));
assert.soon(() => 0 ===
donorPrimary.adminCommand({serverStatus: 1})
.repl.primaryOnlyServices.TenantMigrationDonorService);
@@ -72,58 +72,41 @@ const donorRst = new ReplSetTest({
}
}
});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: "recipient",
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
-});
donorRst.startSet();
donorRst.initiate();
-recipientRst.startSet();
-recipientRst.initiate();
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
+const recipientRst = tenantMigrationTest.getRecipientRst();
-const donorPrimary = donorRst.getPrimary();
-const recipientPrimary = recipientRst.getPrimary();
-const kRecipientConnString = recipientRst.getURL();
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
+const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
const kTenantId = "testDb";
-const kConfigDonorsNS = "config.tenantMigrationDonors";
-let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
+let configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
(() => {
jsTest.log("Test the case where the migration commits");
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
tenantId: kTenantId,
- readPreference: {mode: "primary"},
};
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts");
- migrationThread.start();
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
// Wait for the migration to enter the blocking state.
blockingFp.wait();
let mtabs = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- assert.eq(mtabs[kTenantId].state, TenantMigrationUtil.accessState.kBlockWritesAndReads);
+ assert.eq(mtabs[kTenantId].state, TenantMigrationTest.AccessState.kBlockWritesAndReads);
assert(mtabs[kTenantId].blockTimestamp);
let donorDoc = configDonorsColl.findOne({tenantId: kTenantId});
let blockOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne(
- {ns: kConfigDonorsNS, op: "u", "o.tenantId": kTenantId});
+ {ns: TenantMigrationTest.kConfigDonorsNS, op: "u", "o.tenantId": kTenantId});
assert.eq(donorDoc.state, "blocking");
assert.eq(donorDoc.blockTimestamp, blockOplogEntry.ts);
@@ -134,19 +117,19 @@ let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
// Allow the migration to complete.
blockingFp.off();
- migrationThread.join();
- const res = assert.commandWorked(migrationThread.returnData());
- assert.eq(res.state, "committed");
+ const stateRes =
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
donorDoc = configDonorsColl.findOne({tenantId: kTenantId});
- let commitOplogEntry =
- donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
- assert.eq(donorDoc.state, "committed");
+ let commitOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne(
+ {ns: TenantMigrationTest.kConfigDonorsNS, op: "u", o: donorDoc});
+ assert.eq(donorDoc.state, TenantMigrationTest.State.kCommitted);
assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts);
assert.soon(() => {
mtabs = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- return mtabs[kTenantId].state === TenantMigrationUtil.accessState.kReject;
+ return mtabs[kTenantId].state === TenantMigrationTest.AccessState.kReject;
});
assert(mtabs[kTenantId].commitOrAbortOpTime);
@@ -164,28 +147,25 @@ let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS);
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
tenantId: kTenantId,
- readPreference: {mode: "primary"},
};
let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts");
- const res =
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
- assert.eq(res.state, "aborted");
+ const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kAborted);
abortFp.off();
const donorDoc = configDonorsColl.findOne({tenantId: kTenantId});
- const abortOplogEntry =
- donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc});
- assert.eq(donorDoc.state, "aborted");
+ const abortOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne(
+ {ns: TenantMigrationTest.kConfigDonorsNS, op: "u", o: donorDoc});
+ assert.eq(donorDoc.state, TenantMigrationTest.State.kAborted);
assert.eq(donorDoc.commitOrAbortOpTime.ts, abortOplogEntry.ts);
assert.eq(donorDoc.abortReason.code, ErrorCodes.InternalError);
let mtabs;
assert.soon(() => {
mtabs = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker;
- return mtabs[kTenantId].state === TenantMigrationUtil.accessState.kAborted;
+ return mtabs[kTenantId].state === TenantMigrationTest.AccessState.kAborted;
});
assert(mtabs[kTenantId].commitOrAbortOpTime);
@@ -207,9 +187,7 @@ configDonorsColl.dropIndex({expireAt: 1});
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
tenantId: kTenantId,
- readPreference: {mode: "primary"},
};
// Verify that donorForgetMigration fails since the migration hasn't started.
@@ -217,9 +195,8 @@ configDonorsColl.dropIndex({expireAt: 1});
donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId}),
ErrorCodes.NoSuchTenantMigration);
- const res =
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
- assert.eq(res.state, "committed");
+ const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
assert.commandWorked(
donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId}));
@@ -228,6 +205,6 @@ configDonorsColl.dropIndex({expireAt: 1});
donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId}));
})();
+tenantMigrationTest.stop();
donorRst.stopSet();
-recipientRst.stopSet();
})();
diff --git a/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js b/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js
index d081a618827..190c0d5b6d2 100644
--- a/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js
+++ b/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js
@@ -12,6 +12,7 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
// Set the delay before a donor state doc is garbage collected to be short to speed up the test.
@@ -33,18 +34,6 @@ const donorRst = new ReplSetTest({
}
}
});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: 'recipient',
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- },
- }
-});
-const kRecipientConnString = recipientRst.getURL();
function insertDocument(primaryHost, dbName, collName) {
const primary = new Mongo(primaryHost);
@@ -59,29 +48,30 @@ function insertDocument(primaryHost, dbName, collName) {
donorRst.startSet();
donorRst.initiate();
- recipientRst.startSet();
- recipientRst.initiate();
- let dbName = "migrationOutcome-committed_" + kTenantDefinedDbName;
- const primary = donorRst.getPrimary();
- const primaryDB = primary.getDB(dbName);
-
- let writeFp = configureFailPoint(primaryDB, "hangWriteBeforeWaitingForMigrationDecision");
- let writeThread = new Thread(insertDocument, primary.host, dbName, kCollName);
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
const migrationId = UUID();
- assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- const tenantId = dbName.split('_')[0];
+ const tenantId = "migrationOutcome-committed";
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
+ tenantId,
};
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+
+ const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName);
+ const primary = donorRst.getPrimary();
+ const primaryDB = primary.getDB(dbName);
- let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts);
+ const writeFp = configureFailPoint(primaryDB, "hangWriteBeforeWaitingForMigrationDecision");
+ const writeThread = new Thread(insertDocument, primary.host, dbName, kCollName);
+
+ assert.commandWorked(primaryDB.runCommand({create: kCollName}));
+
+ const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
+ const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
migrationThread.start();
blockFp.wait();
@@ -93,21 +83,20 @@ function insertDocument(primaryHost, dbName, collName) {
migrationThread.join();
- let migrationRes = assert.commandWorked(migrationThread.returnData());
- assert.eq(migrationRes.state, "committed");
+ const migrationRes = assert.commandWorked(migrationThread.returnData());
+ assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted);
- assert.commandWorked(
- TenantMigrationUtil.forgetMigration(primary.host, migrationOpts.migrationIdString));
- TenantMigrationUtil.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, tenantId);
+ assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
+ tenantMigrationTest.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, tenantId);
writeFp.off();
writeThread.join();
- let writeRes = writeThread.returnData();
+ const writeRes = writeThread.returnData();
assert.commandFailedWithCode(writeRes, ErrorCodes.TenantMigrationCommitted);
+ tenantMigrationTest.stop();
donorRst.stopSet();
- recipientRst.stopSet();
})();
(() => {
@@ -116,30 +105,31 @@ function insertDocument(primaryHost, dbName, collName) {
donorRst.startSet();
donorRst.initiate();
- recipientRst.startSet();
- recipientRst.initiate();
- let dbName = "migrationOutcome-aborted_" + kTenantDefinedDbName;
- const primary = donorRst.getPrimary();
- const primaryDB = primary.getDB(dbName);
-
- let writeFp = configureFailPoint(primaryDB, "hangWriteBeforeWaitingForMigrationDecision");
- let writeThread = new Thread(insertDocument, primary.host, dbName, kCollName);
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
const migrationId = UUID();
- assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- const tenantId = dbName.split('_')[0];
+ const tenantId = "migrationOutcome-aborted";
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: kRecipientConnString,
- tenantId: tenantId,
- readPreference: {mode: "primary"},
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
+ tenantId,
};
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst);
+
+ const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName);
+ const primary = donorRst.getPrimary();
+ const primaryDB = primary.getDB(dbName);
+
+ const writeFp = configureFailPoint(primaryDB, "hangWriteBeforeWaitingForMigrationDecision");
+ const writeThread = new Thread(insertDocument, primary.host, dbName, kCollName);
+
+ assert.commandWorked(primaryDB.runCommand({create: kCollName}));
- let abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts");
- let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
- let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts);
+ const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts");
+ const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts");
+ const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
migrationThread.start();
blockFp.wait();
@@ -151,21 +141,20 @@ function insertDocument(primaryHost, dbName, collName) {
migrationThread.join();
- let migrationRes = assert.commandWorked(migrationThread.returnData());
- assert.eq(migrationRes.state, "aborted");
+ const migrationRes = assert.commandWorked(migrationThread.returnData());
+ assert.eq(migrationRes.state, TenantMigrationTest.State.kAborted);
abortFp.off();
- assert.commandWorked(
- TenantMigrationUtil.forgetMigration(primary.host, migrationOpts.migrationIdString));
- TenantMigrationUtil.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, tenantId);
+ assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString));
+ tenantMigrationTest.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, tenantId);
writeFp.off();
writeThread.join();
- let writeRes = writeThread.returnData();
+ const writeRes = writeThread.returnData();
assert.commandFailedWithCode(writeRes, ErrorCodes.TenantMigrationAborted);
+ tenantMigrationTest.stop();
donorRst.stopSet();
- recipientRst.stopSet();
})();
})();
diff --git a/jstests/replsets/tenant_migration_large_txn.js b/jstests/replsets/tenant_migration_large_txn.js
index d9e0c04cdc7..a97fc8b9196 100644
--- a/jstests/replsets/tenant_migration_large_txn.js
+++ b/jstests/replsets/tenant_migration_large_txn.js
@@ -14,41 +14,16 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
-const donorRst = new ReplSetTest({
- nodes: 1,
- name: "donor",
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- }
- }
-});
-const recipientRst = new ReplSetTest({
- nodes: 1,
- name: "recipient",
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
-});
-
-donorRst.startSet();
-donorRst.initiate();
-
-recipientRst.startSet();
-recipientRst.initiate();
+const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
const kTenantId = "testTenantId";
-const kDbName = kTenantId + "_" +
- "testDb";
+const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDB");
const kCollName = "testColl";
-const donorPrimary = donorRst.getPrimary();
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
/**
* Runs a large transaction (>16MB) on the given collection name that requires two applyOps oplog
@@ -77,32 +52,32 @@ function runTransaction(primaryHost, dbName, collName) {
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
+ recipientConnString: tenantMigrationTest.getRecipientConnString(),
tenantId: kTenantId,
- readPreference: {mode: "primary"},
};
+const donorRstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst());
// Start a migration, and pause it after the donor has majority-committed the initial state doc.
-let dataSyncFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterDataSync");
-let migrationThread =
- new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts);
+const dataSyncFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterDataSync");
+const migrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
migrationThread.start();
dataSyncFp.wait();
// Run a large transaction (>16MB) that will write two applyOps oplog entries. Pause
// commitTransaction after it has reserved oplog slots for the applyOps oplog entries and has
// written the first one.
-let logApplyOpsForTxnFp =
+const logApplyOpsForTxnFp =
configureFailPoint(donorPrimary, "hangAfterLoggingApplyOpsForTransaction", {}, {skip: 1});
-let txnThread = new Thread(runTransaction, donorPrimary.host, kDbName, kCollName);
+const txnThread = new Thread(runTransaction, donorPrimary.host, kDbName, kCollName);
txnThread.start();
logApplyOpsForTxnFp.wait();
// Allow the migration to move to the blocking state and commit.
dataSyncFp.off();
assert.soon(
- () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId).state ===
- TenantMigrationUtil.accessState.kBlockWritesAndReads);
+ () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId).state ===
+ TenantMigrationTest.AccessState.kBlockWritesAndReads);
logApplyOpsForTxnFp.off();
assert.commandWorked(migrationThread.returnData());
@@ -110,6 +85,5 @@ assert.commandWorked(migrationThread.returnData());
// blockingTimestamp .
txnThread.join();
-donorRst.stopSet();
-recipientRst.stopSet();
+tenantMigrationTest.stop();
})();
diff --git a/jstests/replsets/tenant_migration_no_failover.js b/jstests/replsets/tenant_migration_no_failover.js
index 404950e79b4..374f9140ab7 100644
--- a/jstests/replsets/tenant_migration_no_failover.js
+++ b/jstests/replsets/tenant_migration_no_failover.js
@@ -10,105 +10,36 @@
load("jstests/aggregation/extras/utils.js");
load("jstests/libs/fail_point_util.js");
load("jstests/libs/uuid_util.js");
-load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
-const numDocs = 20;
-const data = [];
-for (let i = 0; i < numDocs; ++i) {
- data.push({_id: i, x: i});
-}
-const insertDonorDB = (primary, dbName, collName) => {
- const db = primary.getDB(dbName);
- const coll = db.getCollection(collName);
-
- assert.commandWorked(coll.insertMany(data));
-};
-
-const verifyReceipientDB = (primary, dbName, collName, shouldMigrate) => {
- const db = primary.getDB(dbName);
- const coll = db.getCollection(collName);
-
- const findRes = coll.find();
- const numDocsFound = findRes.count();
-
- if (!shouldMigrate) {
- assert.eq(0,
- numDocsFound,
- `Find command on recipient collection ${collName} of DB ${
- dbName} should return 0 docs, instead has count of ${numDocsFound}`);
- return;
- }
-
- assert.eq(numDocs,
- numDocsFound,
- `Find command on recipient collection ${collName} of DB ${dbName} should return ${
- numDocs} docs, instead has count of ${numDocsFound}`);
-
- const docsReturned = findRes.sort({_id: 1}).toArray();
- assert(arrayEq(docsReturned, data),
- () => (`${tojson(docsReturned)} is not equal to ${tojson(data)}`));
-};
-
-const name = jsTestName();
-const donorRst = new ReplSetTest(
- {name: `${name}_donor`, nodes: 1, nodeOptions: {setParameter: {enableTenantMigrations: true}}});
-const recipientRst = new ReplSetTest({
- name: `${name}_recipient`,
- nodes: 1,
- nodeOptions: {
- setParameter: {
- enableTenantMigrations: true,
- // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'})
- }
- }
-});
-
-donorRst.startSet();
-donorRst.initiate();
-
-recipientRst.startSet();
-recipientRst.initiate();
-
-const tenantId = 'testTenantId';
+const tenantMigrationTest = new TenantMigrationTest(jsTestName());
+const tenantId = "testTenantId";
const dbNames = ["db0", "db1", "db2"];
-const tenantDBs = dbNames.map(dbName => TenantMigrationUtil.tenantDB(tenantId, dbName));
-const nonTenantDBs = dbNames.map(dbName => TenantMigrationUtil.nonTenantDB(tenantId, dbName));
+const tenantDBs = dbNames.map(dbName => tenantMigrationTest.tenantDB(tenantId, dbName));
+const nonTenantDBs = dbNames.map(dbName => tenantMigrationTest.nonTenantDB(tenantId, dbName));
const collNames = ["coll0", "coll1"];
-const donorPrimary = donorRst.getPrimary();
-const recipientPrimary = recipientRst.getPrimary();
-
for (const db of [...tenantDBs, ...nonTenantDBs]) {
for (const coll of collNames) {
- insertDonorDB(donorPrimary, db, coll);
+ tenantMigrationTest.insertDonorDB(db, coll);
}
}
const migrationId = UUID();
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationId),
- recipientConnString: recipientRst.getURL(),
tenantId,
- readPreference: {mode: "primary"}
};
-const res =
- assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts));
-assert.eq(res.state, "committed");
-
-for (const coll of collNames) {
- for (const db of tenantDBs) {
- // TODO (SERVER-51734): Change shouldMigrate to true.
- verifyReceipientDB(recipientPrimary, db, coll, false /* shouldMigrate */);
- }
+const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted);
- for (const db of nonTenantDBs) {
- verifyReceipientDB(recipientPrimary, db, coll, false /* shouldMigrate */);
+for (const db of [...tenantDBs, ...nonTenantDBs]) {
+ for (const coll of collNames) {
+ tenantMigrationTest.verifyReceipientDB(tenantId, db, coll);
}
}
-donorRst.stopSet();
-recipientRst.stopSet();
+tenantMigrationTest.stop();
})();