diff options
author | XueruiFa <xuerui.fa@mongodb.com> | 2020-10-19 18:17:01 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-17 14:02:49 +0000 |
commit | 519bc2b24ecc6c29f2483c8b82786e16f61c2cba (patch) | |
tree | 1689c9416eb3d5fc06b616f60c7635e581c989f8 /jstests | |
parent | 9b0e366a75a9cc25705969932b3374d21d4d13c9 (diff) | |
download | mongo-519bc2b24ecc6c29f2483c8b82786e16f61c2cba.tar.gz |
SERVER-51596: Create TenantMigrationTest test fixture for JS tests
Diffstat (limited to 'jstests')
20 files changed, 1251 insertions, 1314 deletions
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js new file mode 100644 index 00000000000..3875d2ab0d5 --- /dev/null +++ b/jstests/replsets/libs/tenant_migration_test.js @@ -0,0 +1,416 @@ +/** + * Wrapper around ReplSetTest for testing tenant migration behavior. + */ + +"use strict"; + +load("jstests/aggregation/extras/utils.js"); +load("jstests/replsets/rslib.js"); +load('jstests/libs/fail_point_util.js'); + +/** + * This fixture allows the user to optionally pass in a custom ReplSetTest for the donor and + * recipient replica sets, to be used for the test. + * + * If the caller does not provide their own replica set, a two node replset will be initialized + * instead, with all nodes running the latest version. + * + * @param {string} [name] the name of the replica sets + * @param {Object} [donorRst] the ReplSetTest instance to adopt for the donor + * @param {Object} [recipientRst] the ReplSetTest instance to adopt for the recipient + */ +function TenantMigrationTest({name = "TenantMigrationTest", donorRst, recipientRst}) { + const donorPassedIn = (donorRst !== undefined); + const recipientPassedIn = (recipientRst !== undefined); + + donorRst = donorPassedIn ? donorRst : performSetUp(true /* isDonor */); + recipientRst = recipientPassedIn ? recipientRst : performSetUp(false /* isDonor */); + + donorRst.getPrimary(); + donorRst.awaitReplication(); + + recipientRst.getPrimary(); + recipientRst.awaitReplication(); + + /** + * Creates a ReplSetTest instance. The repl set will have 2 nodes. + */ + function performSetUp(isDonor) { + let setParameterOpts = {"enableTenantMigrations": true}; + if (TestData.logComponentVerbosity) { + setParameterOpts["logComponentVerbosity"] = + tojsononeline(TestData.logComponentVerbosity); + } + + // TODO: SERVER-51734: Remove setting this failpoint. + if (!isDonor) { + setParameterOpts["failpoint.returnResponseOkForRecipientSyncDataCmd"] = + tojson({mode: 'alwaysOn'}); + } + + let nodeOptions = {}; + nodeOptions["setParameter"] = setParameterOpts; + + const rstName = `${name}_${(isDonor ? "donor" : "recipient")}`; + const rst = new ReplSetTest({name: rstName, nodes: 2, nodeOptions}); + rst.startSet(); + rst.initiateWithHighElectionTimeout(); + + return rst; + } + + /** + * Runs a tenant migration with the given migration options and waits for the migration to be + * committed or aborted. + * + * Returns the result of the initial donorStartMigration if it was unsuccessful. Otherwise, + * returns the command response containing the migration state on the donor after the migration + * has completed. + */ + this.runMigration = function(migrationOpts, retryOnRetryableErrors = false) { + const res = this.startMigration(migrationOpts, retryOnRetryableErrors); + if (!res.ok) { + return res; + } + + return this.waitForMigrationToComplete(migrationOpts, retryOnRetryableErrors); + }; + + /** + * Starts a tenant migration by running the 'donorStartMigration' command once. + * + * Returns the result of the 'donorStartMigration' command. + */ + this.startMigration = function(migrationOpts, retryOnRetryableErrors = false) { + return this.runDonorStartMigration( + migrationOpts, false /* waitForMigrationToComplete */, retryOnRetryableErrors); + }; + + /** + * Waits for a migration to complete by continuously polling the donor primary with + * 'donorStartMigration' until the returned state is committed or aborted. Must be used with + * startMigration, after the migration has been started for the specified tenantId. + * + * Returns the result of the last 'donorStartMigration' command executed. + */ + this.waitForMigrationToComplete = function(migrationOpts, retryOnRetryableErrors = false) { + // Assert that the migration has already been started. + const tenantId = migrationOpts.tenantId; + assert(this.getDonorPrimary() + .getCollection(TenantMigrationTest.kConfigDonorsNS) + .findOne({tenantId})); + return this.runDonorStartMigration( + migrationOpts, true /* waitForMigrationToComplete */, retryOnRetryableErrors); + }; + + /** + * Executes the 'donorStartMigration' command on the donor primary. + * + * This will return on the first successful command if 'waitForMigrationToComplete' is + * set to false. Otherwise, it will continuously poll the donor primary until the + * migration has been committed or aborted. + * + * If 'retryOnRetryableErrors' is set, this function will retry if the command fails + * with a NotPrimary or network error. + */ + this.runDonorStartMigration = function({ + migrationIdString, + tenantId, + recipientConnectionString = recipientRst.getURL(), + readPreference = {mode: "primary"}, + }, + waitForMigrationToComplete, + retryOnRetryableErrors) { + const cmdObj = { + donorStartMigration: 1, + tenantId, + migrationId: UUID(migrationIdString), + recipientConnectionString, + readPreference, + }; + + let donorPrimary = this.getDonorPrimary(); + let stateRes; + + assert.soon(() => { + try { + stateRes = donorPrimary.adminCommand(cmdObj); + + if (!stateRes.ok) { + // If retry is enabled and the command failed with a NotPrimary error, continue + // looping. + if (retryOnRetryableErrors && ErrorCodes.isNotPrimaryError(stateRes.code)) { + donorPrimary = donorRst.getPrimary(); + return false; + } + return true; + } + + // The command has been successfully executed. If we don't need to wait for the + // migration to complete, exit the loop. + if (!waitForMigrationToComplete) { + return true; + } + + return (stateRes.state === TenantMigrationTest.State.kCommitted || + stateRes.state === TenantMigrationTest.State.kAborted); + } catch (e) { + // If the thrown error is network related and we are allowing retryable errors, + // continue issuing commands. + if (retryOnRetryableErrors && isNetworkError(e)) { + return false; + } + throw e; + } + }); + return stateRes; + }; + + /** + * Runs the donorForgetMigration command with the given migrationId and returns the response. + * + * If 'retryOnRetryableErrors' is set, this function will retry if the command fails with a + * NotPrimary or network error. + */ + this.forgetMigration = function(migrationIdString, retryOnRetryableErrors = false) { + let donorPrimary = this.getDonorPrimary(); + + let res; + + assert.soon(() => { + try { + res = donorPrimary.adminCommand( + {donorForgetMigration: 1, migrationId: UUID(migrationIdString)}); + + if (!res.ok) { + // If retry is enabled and the command failed with a NotPrimary error, continue + // looping. + if (retryOnRetryableErrors && ErrorCodes.isNotPrimaryError(res.code)) { + donorPrimary = donorRst.getPrimary(); + return false; + } + } + + return true; + } catch (e) { + if (retryOnRetryableErrors && isNetworkError(e)) { + return false; + } + throw e; + } + }); + return res; + }; + + /** + * Asserts that durable and in-memory state for the migration 'migrationId' and 'tenantId' is + * eventually deleted from the given nodes. + */ + this.waitForMigrationGarbageCollection = function(nodes, migrationId, tenantId) { + nodes.forEach(node => { + const configDonorsColl = node.getCollection("config.tenantMigrationDonors"); + assert.soon(() => 0 === configDonorsColl.count({_id: migrationId})); + + assert.soon(() => 0 === + assert.commandWorked(node.adminCommand({serverStatus: 1})) + .repl.primaryOnlyServices.TenantMigrationDonorService); + + let mtabs; + assert.soon(() => { + mtabs = assert.commandWorked(node.adminCommand({serverStatus: 1})) + .tenantMigrationAccessBlocker; + return !mtabs || !mtabs[tenantId]; + }, tojson(mtabs)); + }); + }; + + /** + * Asserts that the migration 'migrationId' and 'tenantId' eventually goes to the + * expected state on all the given nodes. + */ + this.waitForNodesToReachState = function(nodes, migrationId, tenantId, expectedState) { + nodes.forEach(node => { + assert.soon(() => + this.isNodeInExpectedState(node, migrationId, tenantId, expectedState)); + }); + }; + + /** + * Asserts that the migration 'migrationId' and 'tenantId' is in the expected state on all the + * given nodes. + */ + this.assertNodesInExpectedState = function(nodes, migrationId, tenantId, expectedState) { + nodes.forEach(node => { + assert(this.isNodeInExpectedState(node, migrationId, tenantId, expectedState)); + }); + }; + + /** + * Returns true if the durable and in-memory state for the migration 'migrationId' and + * 'tenantId' is in the expected state, and false otherwise. + */ + this.isNodeInExpectedState = function(node, migrationId, tenantId, expectedState) { + const configDonorsColl = + this.getDonorPrimary().getCollection("config.tenantMigrationDonors"); + if (configDonorsColl.findOne({_id: migrationId}).state !== expectedState) { + return false; + } + + const expectedAccessState = (expectedState === TenantMigrationTest.State.kCommitted) + ? TenantMigrationTest.AccessState.kReject + : TenantMigrationTest.AccessState.kAborted; + const mtabs = + assert.commandWorked(node.adminCommand({serverStatus: 1})).tenantMigrationAccessBlocker; + return (mtabs[tenantId].state === expectedAccessState); + }; + + function loadDummyData() { + const numDocs = 20; + const testData = []; + for (let i = 0; i < numDocs; ++i) { + testData.push({_id: i, x: i}); + } + return testData; + } + + /** + * Inserts documents into the specified collection on the donor primary. + */ + this.insertDonorDB = function(dbName, collName, data = loadDummyData()) { + jsTestLog(`Inserting data into collection ${collName} of DB ${dbName} on the donor`); + const db = this.getDonorPrimary().getDB(dbName); + const coll = db.getCollection(collName); + + assert.commandWorked(coll.insertMany(data)); + }; + + /** + * Verifies that the documents on the recipient primary are correct. + */ + this.verifyReceipientDB = function(tenantId, dbName, collName, data = loadDummyData()) { + // TODO (SERVER-51734): Uncomment this line. + // const shouldMigrate = this.isNamespaceForTenant(tenantId, dbName); + const shouldMigrate = false; + + jsTestLog(`Verifying that data in collection ${collName} of DB ${dbName} was ${ + (shouldMigrate ? "" : "not")} migrated to the recipient`); + + const db = this.getRecipientPrimary().getDB(dbName); + const coll = db.getCollection(collName); + + const findRes = coll.find(); + const numDocsFound = findRes.count(); + + if (!shouldMigrate) { + assert.eq(0, + numDocsFound, + `Find command on recipient collection ${collName} of DB ${ + dbName} should return 0 docs, instead has count of ${numDocsFound}`); + return; + } + + const numDocsExpected = data.length; + assert.eq(numDocsExpected, + numDocsFound, + `Find command on recipient collection ${collName} of DB ${dbName} should return ${ + numDocsExpected} docs, instead has count of ${numDocsFound}`); + + const docsReturned = findRes.sort({_id: 1}).toArray(); + assert(arrayEq(docsReturned, data), + () => (`${tojson(docsReturned)} is not equal to ${tojson(data)}`)); + }; + + /** + * Crafts a tenant database name. + */ + this.tenantDB = function(tenantId, dbName) { + return `${tenantId}_${dbName}`; + }; + + /** + * Crafts a database name that does not belong to the tenant. + */ + this.nonTenantDB = function(tenantId, dbName) { + return `non_${tenantId}_${dbName}`; + }; + + /** + * Determines if a database name belongs to the given tenant. + */ + this.isNamespaceForTenant = function(tenantId, dbName) { + return dbName.startsWith(`${tenantId}_`); + }; + + /** + * Returns the TenantMigrationAccessBlocker associated with given the tenantId on the + * node. + */ + this.getTenantMigrationAccessBlocker = function(node, tenantId) { + return assert.commandWorked(node.adminCommand({serverStatus: 1})) + .tenantMigrationAccessBlocker[tenantId]; + }; + + /** + * Returns the donor ReplSetTest. + */ + this.getDonorRst = function() { + return donorRst; + }; + + /** + * Returns the recipient ReplSetTest. + */ + this.getRecipientRst = function() { + return recipientRst; + }; + + /** + * Returns the donor's primary. + */ + this.getDonorPrimary = function() { + return this.getDonorRst().getPrimary(); + }; + + /** + * Returns the recipient's primary. + */ + this.getRecipientPrimary = function() { + return this.getRecipientRst().getPrimary(); + }; + + /** + * Returns the recipient's connection string. + */ + this.getRecipientConnString = function() { + return this.getRecipientRst().getURL(); + }; + + /** + * Shuts down the donor and recipient sets, only if they were not passed in as parameters. + * If they were passed in, the test that initialized them should be responsible for shutting + * them down. + */ + this.stop = function() { + if (!donorPassedIn) + donorRst.stopSet(); + if (!recipientPassedIn) + recipientRst.stopSet(); + }; +} + +TenantMigrationTest.State = { + kCommitted: "committed", + kAborted: "aborted", + kDataSync: "data sync", + kBlocking: "blocking", +}; + +TenantMigrationTest.AccessState = { + kAllow: "allow", + kBlockWrites: "blockWrites", + kBlockWritesAndReads: "blockWritesAndReads", + kReject: "reject", + kAborted: "aborted", +}; + +TenantMigrationTest.kConfigDonorsNS = "config.tenantMigrationDonors"; diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js index b983cbecb36..8b903a9aa14 100644 --- a/jstests/replsets/libs/tenant_migration_util.js +++ b/jstests/replsets/libs/tenant_migration_util.js @@ -2,230 +2,112 @@ * Utilities for testing tenant migrations. */ var TenantMigrationUtil = (function() { - // An object that mirrors the access states for the TenantMigrationAccessBlocker. - const accessState = { - kAllow: "allow", - kBlockWrites: "blockWrites", - kBlockWritesAndReads: "blockWritesAndReads", - kReject: "reject", - kAborted: "aborted" - }; + load("jstests/replsets/libs/tenant_migration_test.js"); /** - * Runs the donorStartMigration command with the given migration options every 'intervalMS' + * Runs the donorStartMigration command with the given migration options * until the migration commits or aborts, or until the command fails. Returns the last command * response. + * + * If 'retryOnRetryableErrors' is set, this function will retry if the command fails with a + * NotPrimary or network error. + * + * Only use when it is necessary to run the donorStartMigration command in its own thread. For + * all other use cases, please consider the runMigration() function in the TenantMigrationTest + * fixture. */ - function startMigration(donorPrimaryHost, migrationOpts, intervalMS = 100) { + function runMigrationAsync(migrationOpts, donorRstArgs, retryOnRetryableErrors = false) { const cmdObj = { donorStartMigration: 1, migrationId: UUID(migrationOpts.migrationIdString), recipientConnectionString: migrationOpts.recipientConnString, tenantId: migrationOpts.tenantId, - readPreference: migrationOpts.readPreference - }; - const donorPrimary = new Mongo(donorPrimaryHost); - - while (true) { - const res = donorPrimary.adminCommand(cmdObj); - if (!res.ok || res.state == "committed" || res.state == "aborted") { - return res; - } - sleep(intervalMS); - } - } - - /** - * Runs the donorForgetMigration command with the given migrationId and returns the response. - */ - function forgetMigration(donorPrimaryHost, migrationIdString) { - const donorPrimary = new Mongo(donorPrimaryHost); - return donorPrimary.adminCommand( - {donorForgetMigration: 1, migrationId: UUID(migrationIdString)}); - } - - /** - * Runs the donorStartMigration command with the given migration options every 'intervalMS' - * until the migration commits or aborts, or until the command fails with error other than - * NotPrimary or network errors. Returns the last command response. - */ - function startMigrationRetryOnRetryableErrors(donorRstArgs, migrationOpts, intervalMS = 100) { - const cmdObj = { - donorStartMigration: 1, - migrationId: UUID(migrationOpts.migrationIdString), - recipientConnectionString: migrationOpts.recipientConnString, - tenantId: migrationOpts.tenantId, - readPreference: migrationOpts.readPreference + readPreference: migrationOpts.readPreference || {mode: "primary"}, }; const donorRst = new ReplSetTest({rstArgs: donorRstArgs}); let donorPrimary = donorRst.getPrimary(); - while (true) { - let res; + let res; + assert.soon(() => { try { res = donorPrimary.adminCommand(cmdObj); - } catch (e) { - if (isNetworkError(e)) { - jsTest.log(`Ignoring network error ${tojson(e)} for command ${tojson(cmdObj)}`); - continue; + + if (!res.ok) { + // If retry is enabled and the command failed with a NotPrimary error, continue + // looping. + if (retryOnRetryableErrors && ErrorCodes.isNotPrimaryError(res.code)) { + donorPrimary = donorRst.getPrimary(); + return false; + } + return true; } - throw e; - } - if (!res.ok) { - if (!ErrorCodes.isNotPrimaryError(res.code)) { - return res; + return (res.state === "committed" || res.state === "aborted"); + } catch (e) { + // If the thrown error is network related and we are allowing retryable errors, + // continue issuing commands. + if (retryOnRetryableErrors && isNetworkError(e)) { + return false; } - donorPrimary = donorRst.getPrimary(); - } else if (res.state == "committed" || res.state == "aborted") { - return res; + throw e; } - sleep(intervalMS); - } + }); + return res; } /** - * Runs the donorForgetMigration command with the given migrationId until the command succeeds - * or fails with error other than NotPrimary or network errors. Returns the last command - * response. + * Runs the donorForgetMigration command with the given migrationId and returns the response. + * + * If 'retryOnRetryableErrors' is set, this function will retry if the command fails with a + * NotPrimary or network error. + * + * Only use when it is necessary to run the donorForgetMigration command in its own thread. For + * all other use cases, please consider the forgetMigration() function in the + * TenantMigrationTest fixture. */ - function forgetMigrationRetryOnRetryableErrors(donorRstArgs, migrationIdString) { - const cmdObj = {donorForgetMigration: 1, migrationId: UUID(migrationIdString)}; - + function forgetMigrationAsync(migrationIdString, donorRstArgs, retryOnRetryableErrors = false) { const donorRst = new ReplSetTest({rstArgs: donorRstArgs}); let donorPrimary = donorRst.getPrimary(); - while (true) { - let res; + let res; + + assert.soon(() => { try { - res = donorPrimary.adminCommand(cmdObj); + res = donorPrimary.adminCommand( + {donorForgetMigration: 1, migrationId: UUID(migrationIdString)}); + + if (!res.ok) { + // If retry is enabled and the command failed with a NotPrimary error, continue + // looping. + if (retryOnRetryableErrors && ErrorCodes.isNotPrimaryError(res.code)) { + donorPrimary = donorRst.getPrimary(); + return false; + } + } + + return true; } catch (e) { - if (isNetworkError(e)) { - jsTest.log(`Ignoring network error ${tojson(e)} for command ${tojson(cmdObj)}`); - continue; + if (retryOnRetryableErrors && isNetworkError(e)) { + return false; } throw e; } - - if (res.ok || !ErrorCodes.isNotPrimaryError(res.code)) { - return res; - } - donorPrimary = donorRst.getPrimary(); - } - } - - /** - * Returns true if the durable and in-memory state for the migration 'migrationId' and - * 'tenantId' is in state "committed", and false otherwise. - */ - function isMigrationCommitted(node, migrationId, tenantId) { - const configDonorsColl = node.getCollection("config.tenantMigrationDonors"); - if (configDonorsColl.findOne({_id: migrationId}).state != "committed") { - return false; - } - const mtabs = node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - return mtabs[tenantId].state === TenantMigrationUtil.accessState.kReject; - } - - /** - * Returns true if the durable and in-memory state for the migration 'migrationId' and - * 'tenantId' is in state "aborted", and false otherwise. - */ - function isMigrationAborted(node, migrationId, tenantId) { - const configDonorsColl = node.getCollection("config.tenantMigrationDonors"); - if (configDonorsColl.findOne({_id: migrationId}).state != "aborted") { - return false; - } - const mtabs = node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - return mtabs[tenantId].state === TenantMigrationUtil.accessState.kAborted; - } - - /** - * Asserts that the migration 'migrationId' and 'tenantId' is in state "committed" on all the - * given nodes. - */ - function assertMigrationCommitted(nodes, migrationId, tenantId) { - nodes.forEach(node => { - assert(isMigrationCommitted(node, migrationId, tenantId)); - }); - } - - /** - * Asserts that the migration 'migrationId' and 'tenantId' eventually goes to state "committed" - * on all the given nodes. - */ - function waitForMigrationToCommit(nodes, migrationId, tenantId) { - nodes.forEach(node => { - assert.soon(() => isMigrationCommitted(node, migrationId, tenantId)); - }); - } - - /** - * Asserts that the migration 'migrationId' and 'tenantId' eventually goes to state "aborted" - * on all the given nodes. - */ - function waitForMigrationToAbort(nodes, migrationId, tenantId) { - nodes.forEach(node => { - assert.soon(() => isMigrationAborted(node, migrationId, tenantId)); }); + return res; } - /** - * Asserts that durable and in-memory state for the migration 'migrationId' and 'tenantId' is - * eventually deleted from the given nodes. - */ - function waitForMigrationGarbageCollection(nodes, migrationId, tenantId) { - nodes.forEach(node => { - const configDonorsColl = node.getCollection("config.tenantMigrationDonors"); - assert.soon(() => 0 === configDonorsColl.count({_id: migrationId})); - - assert.soon(() => 0 === - node.adminCommand({serverStatus: 1}) - .repl.primaryOnlyServices.TenantMigrationDonorService); - - let mtabs; - assert.soon(() => { - mtabs = node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - return !mtabs || !mtabs[tenantId]; - }, tojson(mtabs)); - }); - } - - /** - * Returns the TenantMigrationAccessBlocker associated with given the tenantId on the - * node. - */ - function getTenantMigrationAccessBlocker(node, tenantId) { - return node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker[tenantId]; - } - - /** - * Crafts a tenant database name, given the tenantId. - */ - function tenantDB(tenantId, dbName) { - return `${tenantId}_${dbName}`; - } - - /** - * Crafts a database name that does not belong to the tenant, given the tenantId. - */ - function nonTenantDB(tenantId, dbName) { - return `non_${tenantId}_${dbName}`; + function createRstArgs(donorRst) { + const donorRstArgs = { + name: donorRst.name, + nodeHosts: donorRst.nodes.map(node => `127.0.0.1:${node.port}`), + nodeOptions: donorRst.nodeOptions, + keyFile: donorRst.keyFile, + host: donorRst.host, + waitForKeys: false, + }; + return donorRstArgs; } - return { - accessState, - startMigration, - forgetMigration, - startMigrationRetryOnRetryableErrors, - forgetMigrationRetryOnRetryableErrors, - assertMigrationCommitted, - waitForMigrationToCommit, - waitForMigrationToAbort, - waitForMigrationGarbageCollection, - getTenantMigrationAccessBlocker, - tenantDB, - nonTenantDB, - }; + return {runMigrationAsync, forgetMigrationAsync, createRstArgs}; })(); diff --git a/jstests/replsets/tenant_migration_commit_transaction_retry.js b/jstests/replsets/tenant_migration_commit_transaction_retry.js index 49b07ec2f6c..a0eb97b217e 100644 --- a/jstests/replsets/tenant_migration_commit_transaction_retry.js +++ b/jstests/replsets/tenant_migration_commit_transaction_retry.js @@ -10,6 +10,7 @@ // Direct writes to config.transactions cannot be part of a session. TestData.disableImplicitSessions = true; +load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); load("jstests/replsets/rslib.js"); load("jstests/libs/uuid_util.js"); @@ -48,9 +49,10 @@ donorRst.initiate(); recipientRst.startSet(); recipientRst.initiate(); +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); + const kTenantId = "testTenantId"; -const kDbName = kTenantId + "_" + - "testDb"; +const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDB"); const kCollName = "testColl"; const kNs = `${kDbName}.${kCollName}`; @@ -80,18 +82,15 @@ jsTest.log("Run a migration to completion"); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), tenantId: kTenantId, - readPreference: {mode: "primary"}, }; -assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); +assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); const donorDoc = donorPrimary.getCollection("config.tenantMigrationDonors").findOne({tenantId: kTenantId}); -assert.commandWorked( - donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId})); -TenantMigrationUtil.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, kTenantId); +assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); +tenantMigrationTest.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, kTenantId); { jsTest.log("Run another transaction after the migration"); diff --git a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js index 98e7feb16cd..a4d4f4146da 100644 --- a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js +++ b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js @@ -12,6 +12,7 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); const kMaxBatchSize = 2; @@ -123,36 +124,37 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { recipientRst.startSet(); recipientRst.initiate(); - let dbName = "bulkUnorderedInserts-committed_" + kTenantDefinedDbName; - const primary = donorRst.getPrimary(); - const primaryDB = primary.getDB(dbName); + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); - assert.commandWorked(primaryDB.runCommand({create: kCollName})); - const tenantId = dbName.split('_')[0]; + const tenantId = "bulkUnorderedInserts-committed"; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; - let writeFp = configureFailPoint( + const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName); + const primary = donorRst.getPrimary(); + const primaryDB = primary.getDB(dbName); + + assert.commandWorked(primaryDB.runCommand({create: kCollName})); + + const writeFp = configureFailPoint( primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict}); - let bulkWriteThread = + const bulkWriteThread = new Thread(bulkWriteDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps); bulkWriteThread.start(); writeFp.wait(); - let migrationRes = - assert.commandWorked(TenantMigrationUtil.startMigration(primary.host, migrationOpts)); - assert.eq(migrationRes.state, "committed"); + const migrationRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted); writeFp.off(); bulkWriteThread.join(); - let bulkWriteRes = bulkWriteThread.returnData(); - let writeErrors = bulkWriteRes.res.writeErrors; + const bulkWriteRes = bulkWriteThread.returnData(); + const writeErrors = bulkWriteRes.res.writeErrors; assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted); assert.eq(writeErrors.length, @@ -166,10 +168,12 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { assert(!err.errmsg); } - assert.eq(err.errInfo.recipientConnectionString, migrationOpts.recipientConnString); - assert.eq(err.errInfo.tenantId, migrationOpts.tenantId); + assert.eq(err.errInfo.recipientConnectionString, + tenantMigrationTest.getRecipientConnString()); + assert.eq(err.errInfo.tenantId, tenantId); }); + tenantMigrationTest.stop(); donorRst.stopSet(); recipientRst.stopSet(); })(); @@ -183,27 +187,31 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { recipientRst.startSet(); recipientRst.initiate(); - let dbName = "bulkUnorderedInserts-committed_" + kTenantDefinedDbName; - const primary = donorRst.getPrimary(); - const primaryDB = primary.getDB(dbName); + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); - assert.commandWorked(primaryDB.runCommand({create: kCollName})); - const tenantId = dbName.split('_')[0]; + const tenantId = "bulkUnorderedInserts-committed"; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); + + const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName); + const primary = donorRst.getPrimary(); + const primaryDB = primary.getDB(dbName); + + assert.commandWorked(primaryDB.runCommand({create: kCollName})); - let writeFp = configureFailPoint( + const writeFp = configureFailPoint( primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict}); - let bulkWriteThread = + const bulkWriteThread = new Thread(bulkWriteDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps); - let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts); + const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); + const migrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); bulkWriteThread.start(); writeFp.wait(); @@ -218,8 +226,8 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { bulkWriteThread.join(); migrationThread.join(); - let migrationRes = assert.commandWorked(migrationThread.returnData()); - assert.eq(migrationRes.state, "committed"); + const migrationRes = assert.commandWorked(migrationThread.returnData()); + assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted); let bulkWriteRes = bulkWriteThread.returnData(); let writeErrors = bulkWriteRes.res.writeErrors; @@ -236,10 +244,12 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { assert.eq(err.errmsg, ""); } - assert.eq(err.errInfo.recipientConnectionString, migrationOpts.recipientConnString); - assert.eq(err.errInfo.tenantId, migrationOpts.tenantId); + assert.eq(err.errInfo.recipientConnectionString, + tenantMigrationTest.getRecipientConnString()); + assert.eq(err.errInfo.tenantId, tenantId); }); + tenantMigrationTest.stop(); donorRst.stopSet(); recipientRst.stopSet(); })(); @@ -252,32 +262,36 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { recipientRst.startSet(); recipientRst.initiate(); - let dbName = "bulkUnorderedInserts-aborted_" + kTenantDefinedDbName; - const primary = donorRst.getPrimary(); - const primaryDB = primary.getDB(dbName); + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); - assert.commandWorked(primaryDB.runCommand({create: kCollName})); - const tenantId = dbName.split('_')[0]; + const tenantId = "bulkUnorderedInserts-aborted"; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + recipientConnString: tenantMigrationTest.getRecipientConnString(), + tenantId, }; + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); + + const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName); + const primary = donorRst.getPrimary(); + const primaryDB = primary.getDB(dbName); + + assert.commandWorked(primaryDB.runCommand({create: kCollName})); - let writeFp = configureFailPoint( + const writeFp = configureFailPoint( primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict}); - let bulkWriteThread = + const bulkWriteThread = new Thread(bulkWriteDocsUnordered, primary.host, dbName, kCollName, kNumWriteOps); - let abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts"); + const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts"); // The failpoint below is used to ensure that a write to throw TenantMigrationConflict in the op // observer. Without this failpoint, the migration could have already aborted by the time the // write gets to the op observer. - let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts); + const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); + const migrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); bulkWriteThread.start(); writeFp.wait(); @@ -294,11 +308,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { abortFp.off(); - let migrationRes = assert.commandWorked(migrationThread.returnData()); - assert.eq(migrationRes.state, "aborted"); + const migrationRes = assert.commandWorked(migrationThread.returnData()); + assert.eq(migrationRes.state, TenantMigrationTest.State.kAborted); - let bulkWriteRes = bulkWriteThread.returnData(); - let writeErrors = bulkWriteRes.res.writeErrors; + const bulkWriteRes = bulkWriteThread.returnData(); + const writeErrors = bulkWriteRes.res.writeErrors; assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted); assert.eq(writeErrors.length, @@ -313,6 +327,7 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { } }); + tenantMigrationTest.stop(); donorRst.stopSet(); recipientRst.stopSet(); })(); @@ -325,36 +340,37 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { recipientRst.startSet(); recipientRst.initiate(); - let dbName = "bulkOrderedInserts-committed_" + kTenantDefinedDbName; - const primary = donorRst.getPrimary(); - const primaryDB = primary.getDB(dbName); + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); - assert.commandWorked(primaryDB.runCommand({create: kCollName})); - const tenantId = dbName.split('_')[0]; + const tenantId = "bulkOrderedInserts-committed"; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; - let writeFp = configureFailPoint( + const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName); + const primary = donorRst.getPrimary(); + const primaryDB = primary.getDB(dbName); + + assert.commandWorked(primaryDB.runCommand({create: kCollName})); + + const writeFp = configureFailPoint( primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict}); - let bulkWriteThread = + const bulkWriteThread = new Thread(bulkWriteDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps); bulkWriteThread.start(); writeFp.wait(); - let migrationRes = - assert.commandWorked(TenantMigrationUtil.startMigration(primary.host, migrationOpts)); - assert.eq(migrationRes.state, "committed"); + const migrationRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted); writeFp.off(); bulkWriteThread.join(); - let bulkWriteRes = bulkWriteThread.returnData(); - let writeErrors = bulkWriteRes.res.writeErrors; + const bulkWriteRes = bulkWriteThread.returnData(); + const writeErrors = bulkWriteRes.res.writeErrors; assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted); assert.eq(writeErrors.length, 1); @@ -364,9 +380,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { // blocking writes. assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize); assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted); - assert.eq(writeErrors[0].errInfo.recipientConnectionString, migrationOpts.recipientConnString); - assert.eq(writeErrors[0].errInfo.tenantId, migrationOpts.tenantId); + assert.eq(writeErrors[0].errInfo.recipientConnectionString, + tenantMigrationTest.getRecipientConnString()); + assert.eq(writeErrors[0].errInfo.tenantId, tenantId); + tenantMigrationTest.stop(); donorRst.stopSet(); recipientRst.stopSet(); })(); @@ -380,27 +398,31 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { recipientRst.startSet(); recipientRst.initiate(); - let dbName = "bulkOrderedInserts-committed_" + kTenantDefinedDbName; - const primary = donorRst.getPrimary(); - const primaryDB = primary.getDB(dbName); + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); - assert.commandWorked(primaryDB.runCommand({create: kCollName})); - const tenantId = dbName.split('_')[0]; + const tenantId = "bulkOrderedInserts-committed"; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + recipientConnString: tenantMigrationTest.getRecipientConnString(), + tenantId, }; + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); + + const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName); + const primary = donorRst.getPrimary(); + const primaryDB = primary.getDB(dbName); - let writeFp = configureFailPoint( + assert.commandWorked(primaryDB.runCommand({create: kCollName})); + + const writeFp = configureFailPoint( primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict}); - let bulkWriteThread = + const bulkWriteThread = new Thread(bulkWriteDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps); - let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts); + const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); + const migrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); bulkWriteThread.start(); writeFp.wait(); @@ -415,11 +437,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { bulkWriteThread.join(); migrationThread.join(); - let migrationRes = assert.commandWorked(migrationThread.returnData()); - assert.eq(migrationRes.state, "committed"); + const migrationRes = assert.commandWorked(migrationThread.returnData()); + assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted); - let bulkWriteRes = bulkWriteThread.returnData(); - let writeErrors = bulkWriteRes.res.writeErrors; + const bulkWriteRes = bulkWriteThread.returnData(); + const writeErrors = bulkWriteRes.res.writeErrors; assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted); assert.eq(writeErrors.length, 1); @@ -429,9 +451,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { // blocking writes. assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize); assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted); - assert.eq(writeErrors[0].errInfo.recipientConnectionString, migrationOpts.recipientConnString); - assert.eq(writeErrors[0].errInfo.tenantId, migrationOpts.tenantId); + assert.eq(writeErrors[0].errInfo.recipientConnectionString, + tenantMigrationTest.getRecipientConnString()); + assert.eq(writeErrors[0].errInfo.tenantId, tenantId); + tenantMigrationTest.stop(); donorRst.stopSet(); recipientRst.stopSet(); })(); @@ -444,32 +468,36 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { recipientRst.startSet(); recipientRst.initiate(); - let dbName = "bulkOrderedInserts-aborted_" + kTenantDefinedDbName; - const primary = donorRst.getPrimary(); - const primaryDB = primary.getDB(dbName); + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); - assert.commandWorked(primaryDB.runCommand({create: kCollName})); - const tenantId = dbName.split('_')[0]; + const tenantId = "bulkOrderedInserts-aborted"; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + recipientConnString: tenantMigrationTest.getRecipientConnString(), + tenantId, }; + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); - let writeFp = configureFailPoint( + const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName); + const primary = donorRst.getPrimary(); + const primaryDB = primary.getDB(dbName); + + assert.commandWorked(primaryDB.runCommand({create: kCollName})); + + const writeFp = configureFailPoint( primaryDB, "hangDuringBatchInsert", {}, {skip: kNumWriteBatchesWithoutMigrationConflict}); - let bulkWriteThread = + const bulkWriteThread = new Thread(bulkWriteDocsOrdered, primary.host, dbName, kCollName, kNumWriteOps); - let abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts"); + const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts"); // The failpoint below is used to ensure that a write to throw TenantMigrationConflict in the op // observer. Without this failpoint, the migration could have already aborted by the time the // write gets to the op observer. - let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts); + const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); + const migrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); bulkWriteThread.start(); writeFp.wait(); @@ -486,11 +514,11 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { abortFp.off(); - let migrationRes = assert.commandWorked(migrationThread.returnData()); - assert.eq(migrationRes.state, "aborted"); + const migrationRes = assert.commandWorked(migrationThread.returnData()); + assert.eq(migrationRes.state, TenantMigrationTest.State.kAborted); - let bulkWriteRes = bulkWriteThread.returnData(); - let writeErrors = bulkWriteRes.res.writeErrors; + const bulkWriteRes = bulkWriteThread.returnData(); + const writeErrors = bulkWriteRes.res.writeErrors; assert.eq(primaryDB[kCollName].count(), bulkWriteRes.res.nInserted); assert.eq(writeErrors.length, 1); @@ -501,6 +529,7 @@ function retryFailedWrites(primaryDB, collName, writeErrors, ops) { assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize); assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationAborted); + tenantMigrationTest.stop(); donorRst.stopSet(); recipientRst.stopSet(); })(); diff --git a/jstests/replsets/tenant_migration_concurrent_migrations.js b/jstests/replsets/tenant_migration_concurrent_migrations.js index b749c6450c1..63796243762 100644 --- a/jstests/replsets/tenant_migration_concurrent_migrations.js +++ b/jstests/replsets/tenant_migration_concurrent_migrations.js @@ -15,7 +15,7 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); -load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); let startupParams = {}; startupParams["enableTenantMigrations"] = true; @@ -35,81 +35,64 @@ rst1.initiate(); rst2.startSet(); rst2.initiate(); -const rst0Primary = rst0.getPrimary(); -const rst1Primary = rst1.getPrimary(); - -const kConfigDonorsNS = "config.tenantMigrationDonors"; -const kTenantId = "testTenantId"; +const kTenantIdPrefix = "testTenantId"; // Test concurrent outgoing migrations to different recipients. (() => { - const tenantId = kTenantId + "ConcurrentOutgoingMigrationsToDifferentRecipient"; - const donorsColl = rst0Primary.getCollection(kConfigDonorsNS); + const tenantMigrationTest0 = new TenantMigrationTest({donorRst: rst0, recipientRst: rst1}); + const tenantMigrationTest1 = new TenantMigrationTest({donorRst: rst0, recipientRst: rst2}); + const tenantId0 = `${kTenantIdPrefix}_ConcurrentOutgoingMigrationsToDifferentRecipient0`; + const tenantId1 = `${kTenantIdPrefix}_ConcurrentOutgoingMigrationsToDifferentRecipient1`; const migrationOpts0 = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst1.getURL(), - tenantId: tenantId + "0", - readPreference: {mode: "primary"} + tenantId: tenantId0, }; const migrationOpts1 = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst2.getURL(), - tenantId: tenantId + "1", - readPreference: {mode: "primary"} + tenantId: tenantId1, }; - let migrationThread0 = - new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts0); - let migrationThread1 = - new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts1); + assert.commandWorked(tenantMigrationTest0.startMigration(migrationOpts0)); + assert.commandWorked(tenantMigrationTest1.startMigration(migrationOpts1)); - migrationThread0.start(); - migrationThread1.start(); - migrationThread0.join(); - migrationThread1.join(); + const stateRes0 = + assert.commandWorked(tenantMigrationTest0.waitForMigrationToComplete(migrationOpts0)); + const stateRes1 = + assert.commandWorked(tenantMigrationTest1.waitForMigrationToComplete(migrationOpts1)); // Verify that both migrations succeeded. - assert.commandWorked(migrationThread0.returnData()); - assert.commandWorked(migrationThread1.returnData()); - assert(donorsColl.findOne({tenantId: migrationOpts0.tenantId, state: "committed"})); - assert(donorsColl.findOne({tenantId: migrationOpts1.tenantId, state: "committed"})); + assert(stateRes0.state, TenantMigrationTest.State.kCommitted); + assert(stateRes1.state, TenantMigrationTest.State.kCommitted); })(); // Test concurrent incoming migrations from different donors. (() => { - const tenantId = kTenantId + "ConcurrentIncomingMigrations"; - const donorsColl0 = rst0Primary.getCollection(kConfigDonorsNS); - const donorsColl1 = rst1Primary.getCollection(kConfigDonorsNS); + const tenantMigrationTest0 = new TenantMigrationTest({donorRst: rst0, recipientRst: rst2}); + const tenantMigrationTest1 = new TenantMigrationTest({donorRst: rst1, recipientRst: rst2}); + const tenantId0 = `${kTenantIdPrefix}_ConcurrentIncomingMigrations0`; + const tenantId1 = `${kTenantIdPrefix}_ConcurrentIncomingMigrations1`; const migrationOpts0 = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst2.getURL(), - tenantId: tenantId + "0", - readPreference: {mode: "primary"} + tenantId: tenantId0, }; const migrationOpts1 = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst2.getURL(), - tenantId: tenantId + "1", - readPreference: {mode: "primary"} + tenantId: tenantId1, }; - let migrationThread0 = - new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts0); - let migrationThread1 = - new Thread(TenantMigrationUtil.startMigration, rst1Primary.host, migrationOpts1); + assert.commandWorked(tenantMigrationTest0.startMigration(migrationOpts0)); + assert.commandWorked(tenantMigrationTest1.startMigration(migrationOpts1)); - migrationThread0.start(); - migrationThread1.start(); - migrationThread0.join(); - migrationThread1.join(); + const stateRes0 = + assert.commandWorked(tenantMigrationTest0.waitForMigrationToComplete(migrationOpts0)); + const stateRes1 = + assert.commandWorked(tenantMigrationTest1.waitForMigrationToComplete(migrationOpts1)); // Verify that both migrations succeeded. - assert.commandWorked(migrationThread0.returnData()); - assert.commandWorked(migrationThread1.returnData()); - assert(donorsColl0.findOne({tenantId: migrationOpts0.tenantId, state: "committed"})); - assert(donorsColl1.findOne({tenantId: migrationOpts1.tenantId, state: "committed"})); + assert(stateRes0.state, TenantMigrationTest.State.kCommitted); + assert(stateRes1.state, TenantMigrationTest.State.kCommitted); })(); // TODO (SERVER-50467): Ensure that tenant migration donor only removes a ReplicaSetMonitor for diff --git a/jstests/replsets/tenant_migration_concurrent_reads.js b/jstests/replsets/tenant_migration_concurrent_reads.js index e62ecbba0ef..6170d73891c 100644 --- a/jstests/replsets/tenant_migration_concurrent_reads.js +++ b/jstests/replsets/tenant_migration_concurrent_reads.js @@ -14,36 +14,14 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); -load("jstests/replsets/libs/tenant_migration_util.js"); - -const donorRst = new ReplSetTest({ - nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], - name: 'donor', - nodeOptions: {setParameter: {enableTenantMigrations: true}} -}); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: 'recipient', - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } -}); +load("jstests/replsets/libs/tenant_migration_test.js"); -donorRst.startSet(); -donorRst.initiate(); -recipientRst.startSet(); -recipientRst.initiate(); +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); const kCollName = "testColl"; const kTenantDefinedDbName = "0"; -const kRecipientConnString = recipientRst.getURL(); const kMaxTimeMS = 5 * 1000; -const kConfigDonorsNS = "config.tenantMigrationDonors"; /** * To be used to resume a migration that is paused after entering the blocking state. Waits for the @@ -80,29 +58,29 @@ function runCommand(db, cmd, expectedError) { /** * Tests that the donor rejects causal reads after the migration commits. */ -function testReadIsRejectedIfSentAfterMigrationHasCommitted(rst, testCase, dbName, collName) { +function testReadIsRejectedIfSentAfterMigrationHasCommitted(testCase, dbName, collName) { const tenantId = dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; - const primary = rst.getPrimary(); + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = donorRst.getPrimary(); - const res = - assert.commandWorked(TenantMigrationUtil.startMigration(primary.host, migrationOpts)); - assert.eq(res.state, "committed"); + const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of // the oplog on all the secondaries. This is to ensure that the write to put the migration into // "committed" is majority-committed and that snapshot reads on the secondaries with unspecified // atClusterTime have read timestamp >= commitTimestamp. - rst.awaitLastOpCommitted(); + donorRst.awaitLastOpCommitted(); - const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId}); - const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary]; + const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({ + tenantId: tenantId + }); + const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary]; nodes.forEach(node => { const db = node.getDB(dbName); if (testCase.requiresReadTimestamp) { @@ -121,31 +99,31 @@ function testReadIsRejectedIfSentAfterMigrationHasCommitted(rst, testCase, dbNam /** * Tests that the donor does not reject reads after the migration aborts. */ -function testReadIsAcceptedIfSentAfterMigrationHasAborted(rst, testCase, dbName, collName) { +function testReadIsAcceptedIfSentAfterMigrationHasAborted(testCase, dbName, collName) { const tenantId = dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; - const primary = rst.getPrimary(); + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = donorRst.getPrimary(); - let abortFp = configureFailPoint(primary, "abortTenantMigrationAfterBlockingStarts"); - const res = - assert.commandWorked(TenantMigrationUtil.startMigration(primary.host, migrationOpts)); - assert.eq(res.state, "aborted"); + let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts"); + const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kAborted); abortFp.off(); // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of // the oplog on all the secondaries. This is to ensure that the write to put the migration into // "aborted" is majority-committed and that snapshot reads on the secondaries with unspecified // atClusterTime have read timestamp >= abortTimestamp. - rst.awaitLastOpCommitted(); + donorRst.awaitLastOpCommitted(); - const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId}); - const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary]; + const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({ + tenantId: tenantId + }); + const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary]; nodes.forEach(node => { const db = node.getDB(dbName); if (testCase.requiresReadTimestamp) { @@ -161,54 +139,52 @@ function testReadIsAcceptedIfSentAfterMigrationHasAborted(rst, testCase, dbName, * Tests that the donor blocks clusterTime reads in the blocking state with readTimestamp >= * blockingTimestamp but does not block linearizable reads. */ -function testReadBlocksIfMigrationIsInBlocking(rst, testCase, dbName, collName) { +function testReadBlocksIfMigrationIsInBlocking(testCase, dbName, collName) { const tenantId = dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; - const primary = rst.getPrimary(); + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = donorRst.getPrimary(); - let blockingFp = configureFailPoint(primary, "pauseTenantMigrationAfterBlockingStarts"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts); + let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts"); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); // Wait for the migration to enter the blocking state. - migrationThread.start(); blockingFp.wait(); // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of // the oplog on all secondaries to ensure that snapshot reads on the secondaries with // unspecified atClusterTime have read timestamp >= blockTimestamp. - rst.awaitLastOpCommitted(); + donorRst.awaitLastOpCommitted(); - const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId}); + const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({ + tenantId: tenantId + }); const command = testCase.requiresReadTimestamp ? testCase.command(collName, donorDoc.blockTimestamp) : testCase.command(collName); command.maxTimeMS = kMaxTimeMS; - const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary]; + const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary]; nodes.forEach(node => { const db = node.getDB(dbName); runCommand(db, command, testCase.isLinearizableRead ? null : ErrorCodes.MaxTimeMSExpired); }); blockingFp.off(); - migrationThread.join(); - const res = assert.commandWorked(migrationThread.returnData()); - assert.eq(res.state, "committed"); + const stateRes = + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); } /** * Tests that the donor blocks clusterTime reads in the blocking state with readTimestamp >= * blockingTimestamp, and unblocks the reads once the migration aborts. */ -function testBlockedReadGetsUnblockedAndRejectedIfMigrationCommits( - rst, testCase, dbName, collName) { +function testBlockedReadGetsUnblockedAndRejectedIfMigrationCommits(testCase, dbName, collName) { if (testCase.isLinearizableRead) { // Linearizable reads are not blocked. return; @@ -217,41 +193,40 @@ function testBlockedReadGetsUnblockedAndRejectedIfMigrationCommits( const tenantId = dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; - const primary = rst.getPrimary(); + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = donorRst.getPrimary(); - let blockingFp = configureFailPoint(primary, "pauseTenantMigrationAfterBlockingStarts"); + let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts"); const targetBlockedReads = assert - .commandWorked(primary.adminCommand( + .commandWorked(donorPrimary.adminCommand( {configureFailPoint: "tenantMigrationBlockRead", mode: "alwaysOn"})) .count + 1; - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); let resumeMigrationThread = - new Thread(resumeMigrationAfterBlockingRead, primary.host, targetBlockedReads); + new Thread(resumeMigrationAfterBlockingRead, donorPrimary.host, targetBlockedReads); // Run the commands after the migration enters the blocking state. resumeMigrationThread.start(); - migrationThread.start(); blockingFp.wait(); // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of // the oplog on all secondaries to ensure that snapshot reads on the secondaries with // unspecified atClusterTime have read timestamp >= blockTimestamp. - rst.awaitLastOpCommitted(); + donorRst.awaitLastOpCommitted(); - const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId}); + const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({ + tenantId: tenantId + }); const command = testCase.requiresReadTimestamp ? testCase.command(collName, donorDoc.blockTimestamp) : testCase.command(collName); - const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary]; + const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary]; // The migration should unpause and commit after the read is blocked. Verify that the read // is rejected. @@ -262,16 +237,16 @@ function testBlockedReadGetsUnblockedAndRejectedIfMigrationCommits( // Verify that the migration succeeded. resumeMigrationThread.join(); - migrationThread.join(); - const res = assert.commandWorked(migrationThread.returnData()); - assert.eq(res.state, "committed"); + const stateRes = + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); } /** * Tests that the donor blocks clusterTime reads in the blocking state with readTimestamp >= * blockingTimestamp, and rejects the reads once the migration commits. */ -function testBlockedReadGetsUnblockedAndSucceedsIfMigrationAborts(rst, testCase, dbName, collName) { +function testBlockedReadGetsUnblockedAndSucceedsIfMigrationAborts(testCase, dbName, collName) { if (testCase.isLinearizableRead) { // Linearizable reads are not blocked. return; @@ -280,42 +255,41 @@ function testBlockedReadGetsUnblockedAndSucceedsIfMigrationAborts(rst, testCase, const tenantId = dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; - const primary = rst.getPrimary(); + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = donorRst.getPrimary(); - let blockingFp = configureFailPoint(primary, "pauseTenantMigrationAfterBlockingStarts"); - let abortFp = configureFailPoint(primary, "abortTenantMigrationAfterBlockingStarts"); + let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts"); + let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts"); const targetBlockedReads = assert - .commandWorked(primary.adminCommand( + .commandWorked(donorPrimary.adminCommand( {configureFailPoint: "tenantMigrationBlockRead", mode: "alwaysOn"})) .count + 1; - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); let resumeMigrationThread = - new Thread(resumeMigrationAfterBlockingRead, primary.host, targetBlockedReads); + new Thread(resumeMigrationAfterBlockingRead, donorPrimary.host, targetBlockedReads); // Run the commands after the migration enters the blocking state. resumeMigrationThread.start(); - migrationThread.start(); blockingFp.wait(); // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of // the oplog on all secondaries to ensure that snapshot reads on the secondaries with // unspecified atClusterTime have read timestamp >= blockTimestamp. - rst.awaitLastOpCommitted(); + donorRst.awaitLastOpCommitted(); - const donorDoc = primary.getCollection(kConfigDonorsNS).findOne({tenantId: tenantId}); + const donorDoc = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({ + tenantId: tenantId + }); const command = testCase.requiresReadTimestamp ? testCase.command(collName, donorDoc.blockTimestamp) : testCase.command(collName); - const nodes = testCase.isSupportedOnSecondaries ? rst.nodes : [primary]; + const nodes = testCase.isSupportedOnSecondaries ? donorRst.nodes : [donorPrimary]; // The migration should unpause and abort after the read is blocked. Verify that the read // unblocks. @@ -326,10 +300,10 @@ function testBlockedReadGetsUnblockedAndSucceedsIfMigrationAborts(rst, testCase, // Verify that the migration failed due to the simulated error. resumeMigrationThread.join(); - migrationThread.join(); abortFp.off(); - const res = assert.commandWorked(migrationThread.returnData()); - assert.eq(res.state, "aborted"); + const stateRes = + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kAborted); } const testCases = { @@ -419,10 +393,9 @@ const testFuncs = { for (const [testName, testFunc] of Object.entries(testFuncs)) { for (const [testCaseName, testCase] of Object.entries(testCases)) { let dbName = testCaseName + "-" + testName + "_" + kTenantDefinedDbName; - testFunc(donorRst, testCase, dbName, kCollName); + testFunc(testCase, dbName, kCollName); } } -donorRst.stopSet(); -recipientRst.stopSet(); +tenantMigrationTest.stop(); })(); diff --git a/jstests/replsets/tenant_migration_concurrent_writes.js b/jstests/replsets/tenant_migration_concurrent_writes.js index de13c5d5a1f..26e7fdbcd6d 100644 --- a/jstests/replsets/tenant_migration_concurrent_writes.js +++ b/jstests/replsets/tenant_migration_concurrent_writes.js @@ -15,29 +15,12 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); -load("jstests/replsets/libs/tenant_migration_util.js"); - -const donorRst = new ReplSetTest( - {nodes: 1, name: 'donor', nodeOptions: {setParameter: {enableTenantMigrations: true}}}); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: 'recipient', - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } -}); +load("jstests/replsets/libs/tenant_migration_test.js"); -donorRst.startSet(); -donorRst.initiate(); -recipientRst.startSet(); -recipientRst.initiate(); +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); +const donorRst = tenantMigrationTest.getDonorRst(); const primary = donorRst.getPrimary(); -const kRecipientConnString = recipientRst.getURL(); const kCollName = "testColl"; const kTenantDefinedDbName = "0"; @@ -248,14 +231,11 @@ function testWriteIsRejectedIfSentAfterMigrationHasCommitted(testCase, testOpts) const tenantId = testOpts.dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; - const res = assert.commandWorked( - TenantMigrationUtil.startMigration(testOpts.primaryHost, migrationOpts)); - assert.eq(res.state, "committed"); + const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); runCommand(testOpts, ErrorCodes.TenantMigrationCommitted); testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName); @@ -268,15 +248,12 @@ function testWriteIsAcceptedIfSentAfterMigrationHasAborted(testCase, testOpts) { const tenantId = testOpts.dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; let abortFp = configureFailPoint(testOpts.primaryDB, "abortTenantMigrationAfterBlockingStarts"); - const res = assert.commandWorked( - TenantMigrationUtil.startMigration(testOpts.primaryHost, migrationOpts)); - assert.eq(res.state, "aborted"); + const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kAborted); abortFp.off(); // Wait until the in-memory migration state is updated after the migration has majority @@ -285,7 +262,7 @@ function testWriteIsAcceptedIfSentAfterMigrationHasAborted(testCase, testOpts) { assert.soon(() => { const mtabs = testOpts.primaryDB.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - return mtabs[tenantId].state === TenantMigrationUtil.accessState.kAborted; + return mtabs[tenantId].state === TenantMigrationTest.AccessState.kAborted; }); runCommand(testOpts); @@ -299,27 +276,24 @@ function testWriteBlocksIfMigrationIsInBlocking(testCase, testOpts) { const tenantId = testOpts.dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; let blockingFp = configureFailPoint(testOpts.primaryDB, "pauseTenantMigrationAfterBlockingStarts"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, testOpts.primaryHost, migrationOpts); + + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); // Run the command after the migration enters the blocking state. - migrationThread.start(); blockingFp.wait(); testOpts.command.maxTimeMS = kMaxTimeMS; runCommand(testOpts, ErrorCodes.MaxTimeMSExpired); // Allow the migration to complete. blockingFp.off(); - migrationThread.join(); - const res = assert.commandWorked(migrationThread.returnData()); - assert.eq(res.state, "committed"); + const stateRes = + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName); } @@ -332,9 +306,7 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te const tenantId = testOpts.dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; let blockingFp = @@ -346,14 +318,12 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te .count + 1; - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, testOpts.primaryHost, migrationOpts); let resumeMigrationThread = new Thread(resumeMigrationAfterBlockingWrite, testOpts.primaryHost, targetBlockedWrites); // Run the command after the migration enters the blocking state. resumeMigrationThread.start(); - migrationThread.start(); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); blockingFp.wait(); // The migration should unpause and commit after the write is blocked. Verify that the write is @@ -362,9 +332,9 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te // Verify that the migration succeeded. resumeMigrationThread.join(); - migrationThread.join(); - const res = assert.commandWorked(migrationThread.returnData()); - assert.eq(res.state, "committed"); + const stateRes = + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName); } @@ -377,9 +347,7 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts(testCase, tes const tenantId = testOpts.dbName.split('_')[0]; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + tenantId, }; let blockingFp = @@ -392,14 +360,12 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts(testCase, tes .count + 1; - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, testOpts.primaryHost, migrationOpts); let resumeMigrationThread = new Thread(resumeMigrationAfterBlockingWrite, testOpts.primaryHost, targetBlockedWrites); // Run the command after the migration enters the blocking state. + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); resumeMigrationThread.start(); - migrationThread.start(); blockingFp.wait(); // The migration should unpause and abort after the write is blocked. Verify that the write is @@ -408,10 +374,10 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts(testCase, tes // Verify that the migration aborted due to the simulated error. resumeMigrationThread.join(); - migrationThread.join(); + const stateRes = + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); abortFp.off(); - const res = assert.commandWorked(migrationThread.returnData()); - assert.eq(res.state, "aborted"); + assert.eq(stateRes.state, TenantMigrationTest.State.kAborted); testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName); } @@ -969,6 +935,5 @@ for (const [testName, testFunc] of Object.entries(testFuncs)) { } } -donorRst.stopSet(); -recipientRst.stopSet(); +tenantMigrationTest.stop(); })(); diff --git a/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js b/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js index b5f13ab8f88..a9d5f60b80a 100644 --- a/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js +++ b/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js @@ -7,9 +7,8 @@ (function() { 'use strict'; -load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); -load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); /** * Asserts that the number of recipientDataSync commands executed on the given recipient primary is @@ -33,76 +32,55 @@ function generateUniqueTenantId() { return chars[charIndex++]; } -let startupParams = {}; -startupParams["enableTenantMigrations"] = true; -// TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. -startupParams["failpoint.returnResponseOkForRecipientSyncDataCmd"] = tojson({mode: 'alwaysOn'}); +const donorRst = new ReplSetTest( + {nodes: 1, name: 'donorRst', nodeOptions: {setParameter: {enableTenantMigrations: true}}}); -const rst0 = new ReplSetTest({nodes: 1, name: 'rst0', nodeOptions: {setParameter: startupParams}}); -const rst1 = new ReplSetTest({nodes: 1, name: 'rst1', nodeOptions: {setParameter: startupParams}}); -const rst2 = new ReplSetTest({nodes: 1, name: 'rst2', nodeOptions: {setParameter: startupParams}}); +donorRst.startSet(); +donorRst.initiate(); -rst0.startSet(); -rst0.initiate(); +const tenantMigrationTest0 = new TenantMigrationTest({name: jsTestName(), donorRst}); -rst1.startSet(); -rst1.initiate(); - -rst2.startSet(); -rst2.initiate(); - -const rst0Primary = rst0.getPrimary(); -const rst1Primary = rst1.getPrimary(); - -const kConfigDonorsNS = "config.tenantMigrationDonors"; +const donorPrimary = donorRst.getPrimary(); +const recipientPrimary = tenantMigrationTest0.getRecipientPrimary(); let numRecipientSyncDataCmdSent = 0; // Test that a retry of a donorStartMigration command joins the existing migration that has // completed but has not been garbage-collected. (() => { + const tenantId = `${generateUniqueTenantId()}_RetryAfterMigrationCompletes`; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst1.getURL(), - tenantId: generateUniqueTenantId() + "RetryAfterMigrationCompletes", - readPreference: {mode: "primary"} + tenantId, }; - assert.commandWorked(TenantMigrationUtil.startMigration(rst0Primary.host, migrationOpts)); - assert.commandWorked(TenantMigrationUtil.startMigration(rst0Primary.host, migrationOpts)); + assert.commandWorked(tenantMigrationTest0.runMigration(migrationOpts)); + assert.commandWorked(tenantMigrationTest0.runMigration(migrationOpts)); // If the second donorStartMigration had started a duplicate migration, the recipient would have // received four recipientSyncData commands instead of two. numRecipientSyncDataCmdSent += 2; - checkNumRecipientSyncDataCmdExecuted(rst1Primary, numRecipientSyncDataCmdSent); + checkNumRecipientSyncDataCmdExecuted(recipientPrimary, numRecipientSyncDataCmdSent); })(); // Test that a retry of a donorStartMigration command joins the ongoing migration. (() => { + const tenantId = `${generateUniqueTenantId()}_RetryBeforeMigrationCompletes`; const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst1.getURL(), - tenantId: generateUniqueTenantId() + "RetryBeforeMigrationCompletes", - readPreference: {mode: "primary"} + tenantId, }; - let migrationThread0 = - new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts); - let migrationThread1 = - new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts); - - migrationThread0.start(); - migrationThread1.start(); - migrationThread0.join(); - migrationThread1.join(); + assert.commandWorked(tenantMigrationTest0.startMigration(migrationOpts)); + assert.commandWorked(tenantMigrationTest0.startMigration(migrationOpts)); - assert.commandWorked(migrationThread0.returnData()); - assert.commandWorked(migrationThread1.returnData()); + assert.commandWorked(tenantMigrationTest0.waitForMigrationToComplete(migrationOpts)); + assert.commandWorked(tenantMigrationTest0.waitForMigrationToComplete(migrationOpts)); // If the second donorStartMigration had started a duplicate migration, the recipient would have // received four recipientSyncData commands instead of two. numRecipientSyncDataCmdSent += 2; - checkNumRecipientSyncDataCmdExecuted(rst1Primary, numRecipientSyncDataCmdSent); + checkNumRecipientSyncDataCmdExecuted(recipientPrimary, numRecipientSyncDataCmdSent); })(); /** @@ -111,15 +89,14 @@ let numRecipientSyncDataCmdSent = 0; * has committed but not garbage-collected (i.e. the donor has not received donorForgetMigration). */ function testStartingConflictingMigrationAfterInitialMigrationCommitted( - donorPrimary, migrationOpts0, migrationOpts1) { - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts0)); - assert.commandFailedWithCode( - TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts1), - ErrorCodes.ConflictingOperationInProgress); + tenantMigrationTest0, migrationOpts0, tenantMigrationTest1, migrationOpts1) { + tenantMigrationTest0.runMigration(migrationOpts0); + assert.commandFailedWithCode(tenantMigrationTest1.runMigration(migrationOpts1), + ErrorCodes.ConflictingOperationInProgress); // If the second donorStartMigration had started a duplicate migration, there would be two donor // state docs. - let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + let configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); assert.eq(1, configDonorsColl.count({tenantId: migrationOpts0.tenantId})); } @@ -127,20 +104,12 @@ function testStartingConflictingMigrationAfterInitialMigrationCommitted( * Tests that if the client runs multiple donorStartMigration commands that would start conflicting * migrations, only one of the migrations will start and succeed. */ -function testConcurrentConflictingMigrations(donorPrimary, migrationOpts0, migrationOpts1) { - let migrationThread0 = - new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts0); - let migrationThread1 = - new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts1); +function testConcurrentConflictingMigrations( + tenantMigrationTest0, migrationOpts0, tenantMigrationTest1, migrationOpts1) { + const res0 = tenantMigrationTest0.startMigration(migrationOpts0); + const res1 = tenantMigrationTest1.startMigration(migrationOpts1); - migrationThread0.start(); - migrationThread1.start(); - migrationThread0.join(); - migrationThread1.join(); - - const res0 = migrationThread0.returnData(); - const res1 = migrationThread1.returnData(); - let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + let configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); // Verify that only one migration succeeded. assert(res0.ok || res1.ok); @@ -163,83 +132,75 @@ function testConcurrentConflictingMigrations(donorPrimary, migrationOpts0, migra // Test migrations with different migrationIds but identical settings. (() => { - let makeMigrationOpts = () => { + let makeTestParams = () => { const migrationOpts0 = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst1.getURL(), tenantId: generateUniqueTenantId() + "DiffMigrationId", - readPreference: {mode: "primary"} }; const migrationOpts1 = Object.extend({}, migrationOpts0, true); migrationOpts1.migrationIdString = extractUUIDFromObject(UUID()); - return [migrationOpts0, migrationOpts1]; + return [tenantMigrationTest0, migrationOpts0, tenantMigrationTest0, migrationOpts1]; }; - testStartingConflictingMigrationAfterInitialMigrationCommitted(rst0Primary, - ...makeMigrationOpts()); - testConcurrentConflictingMigrations(rst0Primary, ...makeMigrationOpts()); + testStartingConflictingMigrationAfterInitialMigrationCommitted(...makeTestParams()); + testConcurrentConflictingMigrations(...makeTestParams()); })(); // Test reusing a migrationId for different migration settings. // Test different tenantIds. (() => { - let makeMigrationOpts = () => { + let makeTestParams = () => { const migrationOpts0 = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst1.getURL(), tenantId: generateUniqueTenantId() + "DiffTenantId", - readPreference: {mode: "primary"} }; const migrationOpts1 = Object.extend({}, migrationOpts0, true); migrationOpts1.tenantId = generateUniqueTenantId() + "DiffTenantId"; - return [migrationOpts0, migrationOpts1]; + return [tenantMigrationTest0, migrationOpts0, tenantMigrationTest0, migrationOpts1]; }; - testStartingConflictingMigrationAfterInitialMigrationCommitted(rst0Primary, - ...makeMigrationOpts()); - testConcurrentConflictingMigrations(rst0Primary, ...makeMigrationOpts()); + testStartingConflictingMigrationAfterInitialMigrationCommitted(...makeTestParams()); + testConcurrentConflictingMigrations(...makeTestParams()); })(); // Test different recipient connection strings. (() => { - let makeMigrationOpts = () => { + const tenantMigrationTest1 = new TenantMigrationTest({name: `${jsTestName()}1`, donorRst}); + + let makeTestParams = () => { const migrationOpts0 = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst1.getURL(), tenantId: generateUniqueTenantId() + "DiffRecipientConnString", - readPreference: {mode: "primary"} }; + // The recipient connection string will be populated by the TenantMigrationTest fixture, so + // no need to set it here. const migrationOpts1 = Object.extend({}, migrationOpts0, true); - migrationOpts1.recipientConnString = rst2.getURL(); - return [migrationOpts0, migrationOpts1]; + return [tenantMigrationTest0, migrationOpts0, tenantMigrationTest1, migrationOpts1]; }; - testStartingConflictingMigrationAfterInitialMigrationCommitted(rst0Primary, - ...makeMigrationOpts()); - testConcurrentConflictingMigrations(rst0Primary, ...makeMigrationOpts()); + testStartingConflictingMigrationAfterInitialMigrationCommitted(...makeTestParams()); + testConcurrentConflictingMigrations(...makeTestParams()); + + tenantMigrationTest1.stop(); })(); // Test different cloning read preference. (() => { - let makeMigrationOpts = () => { + let makeTestParams = () => { const migrationOpts0 = { migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: rst1.getURL(), tenantId: generateUniqueTenantId() + "DiffReadPref", - readPreference: {mode: "primary"} }; const migrationOpts1 = Object.extend({}, migrationOpts0, true); migrationOpts1.readPreference = {mode: "secondary"}; - return [migrationOpts0, migrationOpts1]; + return [tenantMigrationTest0, migrationOpts0, tenantMigrationTest0, migrationOpts1]; }; - testStartingConflictingMigrationAfterInitialMigrationCommitted(rst0Primary, - ...makeMigrationOpts()); - testConcurrentConflictingMigrations(rst0Primary, ...makeMigrationOpts()); + testStartingConflictingMigrationAfterInitialMigrationCommitted(...makeTestParams()); + testConcurrentConflictingMigrations(...makeTestParams()); })(); -rst0.stopSet(); -rst1.stopSet(); -rst2.stopSet(); +tenantMigrationTest0.stop(); +donorRst.stopSet(); })(); diff --git a/jstests/replsets/tenant_migration_donor_abort_state_transition.js b/jstests/replsets/tenant_migration_donor_abort_state_transition.js index 8ec5b6bedc7..4b40cc59980 100644 --- a/jstests/replsets/tenant_migration_donor_abort_state_transition.js +++ b/jstests/replsets/tenant_migration_donor_abort_state_transition.js @@ -9,29 +9,11 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/uuid_util.js"); load("jstests/libs/parallelTester.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); -const kTenantId = "testTenantId"; - -const donorRst = new ReplSetTest( - {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}}); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: "recipientRst", - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } -}); - -donorRst.startSet(); -donorRst.initiate(); - -recipientRst.startSet(); -recipientRst.initiate(); +const kTenantIdPrefix = "testTenantId"; +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); /** * Starts a migration and forces the write to insert the donor's state doc to abort on the first few @@ -40,19 +22,24 @@ recipientRst.initiate(); function testAbortInitialState(donorRst) { const donorPrimary = donorRst.getPrimary(); + // Force the storage transaction for the insert to abort prior to inserting the WiredTiger + // record. + let writeConflictFp = configureFailPoint(donorPrimary, "WTWriteConflictException"); + + const tenantId = `${kTenantIdPrefix}-initial`; const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - tenantId: kTenantId + "-initial", - recipientConnString: recipientRst.getURL(), - readPreference: {mode: "primary"}, + tenantId, + recipientConnString: tenantMigrationTest.getRecipientConnString(), }; - // Force the storage transaction for the insert to abort prior to inserting the WiredTiger - // record. - let writeConflictFp = configureFailPoint(donorPrimary, "WTWriteConflictException"); + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); + + // Run the migration in its own thread, since the initial 'donorStartMigration' command will + // hang due to the failpoint. let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); migrationThread.start(); writeConflictFp.wait(); @@ -65,8 +52,8 @@ function testAbortInitialState(donorRst) { // Verify that the migration completes successfully. assert.commandWorked(migrationThread.returnData()); - TenantMigrationUtil.waitForMigrationToCommit( - donorRst.nodes, migrationId, migrationOpts.tenantId); + tenantMigrationTest.waitForNodesToReachState( + donorRst.nodes, migrationId, tenantId, TenantMigrationTest.State.kCommitted); } /** @@ -80,21 +67,18 @@ function testAbortStateTransition(donorRst, pauseFailPoint, setUpFailPoints, nex nextState}" after reaching failpoint "${pauseFailPoint}"`); const donorPrimary = donorRst.getPrimary(); + const tenantId = `${kTenantIdPrefix}-${nextState}`; const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - tenantId: kTenantId + "-" + nextState, - recipientConnString: recipientRst.getURL(), - readPreference: {mode: "primary"}, + tenantId, }; setUpFailPoints.forEach(failPoint => configureFailPoint(donorPrimary, failPoint)); let pauseFp = configureFailPoint(donorPrimary, pauseFailPoint); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); - migrationThread.start(); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); pauseFp.wait(); // Force the storage transaction for the write to transition to the next state to abort prior to @@ -111,30 +95,36 @@ function testAbortStateTransition(donorRst, pauseFailPoint, setUpFailPoints, nex opObserverFp.off(); // Verify that the migration completes successfully. - assert.commandWorked(migrationThread.returnData()); - if (nextState == "aborted") { - TenantMigrationUtil.waitForMigrationToAbort( - donorRst.nodes, migrationId, migrationOpts.tenantId); + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + if (nextState === TenantMigrationTest.State.kAborted) { + tenantMigrationTest.waitForNodesToReachState( + donorRst.nodes, migrationId, tenantId, TenantMigrationTest.State.kAborted); } else { - TenantMigrationUtil.waitForMigrationToCommit( - donorRst.nodes, migrationId, migrationOpts.tenantId); + tenantMigrationTest.waitForNodesToReachState( + donorRst.nodes, migrationId, tenantId, TenantMigrationTest.State.kCommitted); } } +const donorRst = tenantMigrationTest.getDonorRst(); jsTest.log("Test aborting donor's state doc insert"); testAbortInitialState(donorRst); jsTest.log("Test aborting donor's state doc update"); -[{pauseFailPoint: "pauseTenantMigrationAfterDataSync", nextState: "blocking"}, - {pauseFailPoint: "pauseTenantMigrationAfterBlockingStarts", nextState: "committed"}, +[{ + pauseFailPoint: "pauseTenantMigrationAfterDataSync", + nextState: TenantMigrationTest.State.kBlocking +}, + { + pauseFailPoint: "pauseTenantMigrationAfterBlockingStarts", + nextState: TenantMigrationTest.State.kCommitted + }, { pauseFailPoint: "pauseTenantMigrationAfterBlockingStarts", setUpFailPoints: ["abortTenantMigrationAfterBlockingStarts"], - nextState: "aborted" + nextState: TenantMigrationTest.State.kAborted }].forEach(({pauseFailPoint, setUpFailPoints = [], nextState}) => { testAbortStateTransition(donorRst, pauseFailPoint, setUpFailPoints, nextState); }); -donorRst.stopSet(); -recipientRst.stopSet(); +tenantMigrationTest.stop(); }()); diff --git a/jstests/replsets/tenant_migration_donor_current_op.js b/jstests/replsets/tenant_migration_donor_current_op.js index 5647a06d3f8..55481a20e8c 100644 --- a/jstests/replsets/tenant_migration_donor_current_op.js +++ b/jstests/replsets/tenant_migration_donor_current_op.js @@ -11,9 +11,8 @@ "use strict"; load("jstests/libs/fail_point_util.js"); -load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); -load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); // An object that mirrors the donor migration states. const migrationStates = { @@ -24,11 +23,9 @@ const migrationStates = { kAborted: 4 }; -const donorRst = new ReplSetTest( - {nodes: 1, name: 'donor', nodeOptions: {setParameter: {enableTenantMigrations: true}}}); const recipientRst = new ReplSetTest({ nodes: 1, - name: 'recipient', + name: 'donor', nodeOptions: { setParameter: { enableTenantMigrations: true, @@ -37,31 +34,24 @@ const recipientRst = new ReplSetTest({ } } }); +recipientRst.startSet(); +recipientRst.initiate(); const kRecipientConnString = recipientRst.getURL(); const kTenantId = 'testTenantId'; -recipientRst.startSet(); -recipientRst.initiate(); - (() => { jsTestLog("Testing currentOp output for migration in data sync state"); - donorRst.startSet(); - donorRst.initiate(); - const donorPrimary = donorRst.getPrimary(); + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst}); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: kRecipientConnString, tenantId: kTenantId, - readPreference: {mode: "primary"}, }; let fp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterDataSync"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); - - migrationThread.start(); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); fp.wait(); const res = assert.commandWorked( @@ -73,28 +63,22 @@ recipientRst.initiate(); assert.eq(res.inprog[0].migrationCompleted, false); fp.off(); - migrationThread.join(); - donorRst.stopSet(); + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + tenantMigrationTest.stop(); })(); (() => { jsTestLog("Testing currentOp output for migration in blocking state"); - donorRst.startSet(); - donorRst.initiate(); - const donorPrimary = donorRst.getPrimary(); + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst}); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: kRecipientConnString, tenantId: kTenantId, - readPreference: {mode: "primary"}, }; let fp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); - - migrationThread.start(); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); fp.wait(); const res = assert.commandWorked( @@ -107,25 +91,22 @@ recipientRst.initiate(); assert.eq(res.inprog[0].migrationCompleted, false); fp.off(); - migrationThread.join(); - donorRst.stopSet(); + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + tenantMigrationTest.stop(); })(); (() => { jsTestLog("Testing currentOp output for aborted migration"); - donorRst.startSet(); - donorRst.initiate(); - const donorPrimary = donorRst.getPrimary(); + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst}); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: kRecipientConnString, tenantId: kTenantId, - readPreference: {mode: "primary"}, }; configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts"); - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); + assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); const res = assert.commandWorked( donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"})); @@ -137,25 +118,21 @@ recipientRst.initiate(); assert(res.inprog[0].commitOrAbortOpTime); assert(res.inprog[0].abortReason); assert.eq(res.inprog[0].migrationCompleted, false); - - donorRst.stopSet(); + tenantMigrationTest.stop(); })(); // Check currentOp while in committed state before and after a migration has completed. (() => { jsTestLog("Testing currentOp output for committed migration"); - donorRst.startSet(); - donorRst.initiate(); - const donorPrimary = donorRst.getPrimary(); + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst}); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: kRecipientConnString, tenantId: kTenantId, - readPreference: {mode: "primary"}, }; - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); + assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); let res = donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"}); assert.eq(res.inprog.length, 1); @@ -168,8 +145,7 @@ recipientRst.initiate(); jsTestLog("Testing currentOp output for a committed migration after donorForgetMigration"); - assert.commandWorked( - TenantMigrationUtil.forgetMigration(donorPrimary.host, migrationOpts.migrationIdString)); + assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); res = donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"}); assert.eq(res.inprog.length, 1); @@ -180,8 +156,7 @@ recipientRst.initiate(); assert(res.inprog[0].commitOrAbortOpTime); assert(res.inprog[0].expireAt); assert.eq(res.inprog[0].migrationCompleted, true); - - donorRst.stopSet(); + tenantMigrationTest.stop(); })(); recipientRst.stopSet(); diff --git a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js index ad43691ecec..5897bc68806 100644 --- a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js +++ b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js @@ -11,46 +11,16 @@ "use strict"; load("jstests/libs/fail_point_util.js"); -load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); -load("jstests/replsets/libs/tenant_migration_util.js"); - -const donorRst = new ReplSetTest( - {nodes: 1, name: 'donor', nodeOptions: {setParameter: {enableTenantMigrations: true}}}); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: 'recipient', - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } -}); - -donorRst.startSet(); -donorRst.initiate(); +load("jstests/libs/parallelTester.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); -recipientRst.startSet(); -recipientRst.initiate(); +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); const kMaxSleepTimeMS = 1000; const kTenantId = 'testTenantId'; -const kConfigDonorsNS = "config.tenantMigrationDonors"; -let donorPrimary = donorRst.getPrimary(); -let kRecipientConnString = recipientRst.getURL(); - -const migrationOpts = { - migrationIdString: extractUUIDFromObject(UUID()), - recipientConnString: kRecipientConnString, - tenantId: kTenantId, - readPreference: {mode: "primary"}, -}; - -let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); +let donorPrimary = tenantMigrationTest.getDonorPrimary(); // Force the migration to pause after entering a randomly selected state to simulate a failure. Random.setRandomSeed(); @@ -65,63 +35,68 @@ if (index < kMigrationFpNames.length) { fp = configureFailPoint(donorPrimary, kMigrationFpNames[index]); } -migrationThread.start(); +const migrationOpts = { + migrationIdString: extractUUIDFromObject(UUID()), + tenantId: kTenantId +}; +assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); sleep(Math.random() * kMaxSleepTimeMS); // Add the initial sync node and make sure that it does not step up. -var initialSyncNode = +const donorRst = tenantMigrationTest.getDonorRst(); +const initialSyncNode = donorRst.add({rsConfig: {priority: 0, votes: 0}, setParameter: {enableTenantMigrations: true}}); donorRst.reInitiate(); jsTestLog("Waiting for initial sync to finish."); donorRst.awaitSecondaryNodes(); -let configDonorsColl = initialSyncNode.getCollection(kConfigDonorsNS); +let configDonorsColl = initialSyncNode.getCollection(TenantMigrationTest.kConfigDonorsNS); let donorDoc = configDonorsColl.findOne({tenantId: kTenantId}); if (donorDoc) { let state = donorDoc.state; switch (state) { - case "data sync": - assert.soon(() => TenantMigrationUtil + case TenantMigrationTest.State.kDataSync: + assert.soon(() => tenantMigrationTest .getTenantMigrationAccessBlocker(initialSyncNode, kTenantId) - .state == TenantMigrationUtil.accessState.kAllow); + .state == TenantMigrationTest.AccessState.kAllow); break; - case "blocking": - assert.soon(() => TenantMigrationUtil + case TenantMigrationTest.State.kBlocking: + assert.soon(() => tenantMigrationTest .getTenantMigrationAccessBlocker(initialSyncNode, kTenantId) - .state == TenantMigrationUtil.accessState.kBlockWritesAndReads); + .state == TenantMigrationTest.AccessState.kBlockWritesAndReads); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(initialSyncNode, kTenantId) .blockTimestamp, donorDoc.blockTimestamp) == 0); break; - case "committed": - assert.soon(() => TenantMigrationUtil + case TenantMigrationTest.State.kCommitted: + assert.soon(() => tenantMigrationTest .getTenantMigrationAccessBlocker(initialSyncNode, kTenantId) - .state == TenantMigrationUtil.accessState.kReject); + .state == TenantMigrationTest.AccessState.kReject); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(initialSyncNode, kTenantId) .commitOrAbortOpTime, donorDoc.commitOrAbortOpTime) == 0); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(initialSyncNode, kTenantId) .blockTimestamp, donorDoc.blockTimestamp) == 0); break; - case "aborted": - assert.soon(() => TenantMigrationUtil + case TenantMigrationTest.State.kAborted: + assert.soon(() => tenantMigrationTest .getTenantMigrationAccessBlocker(initialSyncNode, kTenantId) - .state == TenantMigrationUtil.accessState.kAborted); + .state == TenantMigrationTest.AccessState.kAborted); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(initialSyncNode, kTenantId) .commitOrAbortOpTime, donorDoc.commitOrAbortOpTime) == 0); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(initialSyncNode, kTenantId) .blockTimestamp, donorDoc.blockTimestamp) == 0); @@ -135,7 +110,6 @@ if (fp) { fp.off(); } -migrationThread.join(); -donorRst.stopSet(); -recipientRst.stopSet(); +assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); +tenantMigrationTest.stop(); })(); diff --git a/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js b/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js index f3c0d424169..e93795322c4 100644 --- a/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js +++ b/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js @@ -10,58 +10,45 @@ load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); const kMaxSleepTimeMS = 100; const kTenantId = "testTenantId"; -let recipientStartupParams = {}; -recipientStartupParams["enableTenantMigrations"] = true; -// TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. -recipientStartupParams["failpoint.returnResponseOkForRecipientSyncDataCmd"] = - tojson({mode: 'alwaysOn'}); - /** * Runs the donorStartMigration command to start a migration, and interrupts the migration on the * donor using the 'interruptFunc', and verifies the command response using the * 'verifyCmdResponseFunc'. */ function testDonorStartMigrationInterrupt(interruptFunc, verifyCmdResponseFunc) { - const donorRst = new ReplSetTest( - {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}}); - const recipientRst = new ReplSetTest( - {nodes: 1, name: "recipientRst", nodeOptions: {setParameter: recipientStartupParams}}); - - donorRst.startSet(); - donorRst.initiate(); - - recipientRst.startSet(); - recipientRst.initiate(); + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); - const donorPrimary = donorRst.getPrimary(); + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), tenantId: kTenantId, - readPreference: {mode: "primary"}, + recipientConnString: tenantMigrationTest.getRecipientConnString(), }; - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); - migrationThread.start(); + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); + + const runMigrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); + runMigrationThread.start(); // Wait for to donorStartMigration command to start. assert.soon(() => donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"}) .inprog.length > 0); sleep(Math.random() * kMaxSleepTimeMS); - interruptFunc(donorRst, migrationId, migrationOpts.tenantId); - verifyCmdResponseFunc(migrationThread); + interruptFunc(donorRst, migrationId, kTenantId); + verifyCmdResponseFunc(runMigrationThread); - donorRst.stopSet(); - recipientRst.stopSet(); + tenantMigrationTest.stop(); } /** @@ -70,40 +57,26 @@ function testDonorStartMigrationInterrupt(interruptFunc, verifyCmdResponseFunc) * 'verifyCmdResponseFunc'. */ function testDonorForgetMigrationInterrupt(interruptFunc, verifyCmdResponseFunc) { - const donorRst = new ReplSetTest({ - nodes: 1, - name: "donorRst", - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - } - } - }); - const recipientRst = new ReplSetTest( - {nodes: 1, name: "recipientRst", nodeOptions: {setParameter: recipientStartupParams}}); - - donorRst.startSet(); - donorRst.initiate(); - - recipientRst.startSet(); - recipientRst.initiate(); + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); - const donorPrimary = donorRst.getPrimary(); + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), tenantId: kTenantId, - readPreference: {mode: "primary"}, + recipientConnString: tenantMigrationTest.getRecipientConnString(), }; - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); - let forgetMigrationThread = new Thread( - TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString); + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); + + assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + const forgetMigrationThread = new Thread( + TenantMigrationUtil.forgetMigrationAsync, migrationOpts.migrationIdString, donorRstArgs); forgetMigrationThread.start(); - // Wait for to donorForgetMigration command to start. + // Wait for the donorForgetMigration command to start. assert.soon(() => { const res = assert.commandWorked( donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"})); @@ -114,8 +87,7 @@ function testDonorForgetMigrationInterrupt(interruptFunc, verifyCmdResponseFunc) interruptFunc(donorRst, migrationId, migrationOpts.tenantId); verifyCmdResponseFunc(forgetMigrationThread); - donorRst.stopSet(); - recipientRst.stopSet(); + tenantMigrationTest.stop(); } /** @@ -130,8 +102,8 @@ function assertCmdSucceededOrInterruptedDueToStepDown(cmdThread) { * Asserts the command either succeeded or failed with a NotPrimary or shutdown or network error. */ function assertCmdSucceededOrInterruptedDueToShutDown(cmdThread) { + const res = cmdThread.returnData(); try { - const res = cmdThread.returnData(); assert(res.ok || ErrorCodes.isNotPrimaryError(res.code) || ErrorCodes.isShutdownError(res.code)); } catch (e) { diff --git a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js index ef9629e5713..17537f760d9 100644 --- a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js +++ b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js @@ -10,10 +10,10 @@ load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); const kMaxSleepTimeMS = 100; -const kConfigDonorsNS = "config.tenantMigrationDonors"; const kTenantId = "testTenantId"; // Set the delay before a donor state doc is garbage collected to be short to speed up the test. @@ -27,11 +27,14 @@ const kTTLMonitorSleepSecs = 1; * primary stepped down or shut down after inserting the doc), asserts that the migration * eventually commits. */ -function assertMigrationCommitsIfDurableStateExists(donorRst, migrationId, tenantId) { - const donorPrimary = donorRst.getPrimary(); - const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); +function assertMigrationCommitsIfDurableStateExists(tenantMigrationTest, migrationId, tenantId) { + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + + const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); if (configDonorsColl.count({_id: migrationId}) > 0) { - TenantMigrationUtil.waitForMigrationToCommit(donorRst.nodes, migrationId, tenantId); + tenantMigrationTest.waitForNodesToReachState( + donorRst.nodes, migrationId, tenantId, TenantMigrationTest.State.kCommitted); } } @@ -42,47 +45,26 @@ function assertMigrationCommitsIfDurableStateExists(donorRst, migrationId, tenan function testDonorStartMigrationInterrupt(interruptFunc) { const donorRst = new ReplSetTest( {nodes: 3, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}}); - const recipientRst = new ReplSetTest({ - nodes: 1, - name: "recipientRst", - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint - // 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } - }); donorRst.startSet(); donorRst.initiate(); - recipientRst.startSet(); - recipientRst.initiate(); - - const donorPrimary = donorRst.getPrimary(); - - const donorRstArgs = { - name: donorRst.name, - nodeHosts: donorRst.nodes.map(node => `127.0.0.1:${node.port}`), - nodeOptions: donorRst.nodeOptions, - keyFile: donorRst.keyFile, - host: donorRst.host, - waitForKeys: false, - }; + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst}); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), tenantId: kTenantId, - readPreference: {mode: "primary"}, + recipientConnString: tenantMigrationTest.getRecipientConnString(), }; + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); - let migrationThread = new Thread( - TenantMigrationUtil.startMigrationRetryOnRetryableErrors, donorRstArgs, migrationOpts); - migrationThread.start(); + const runMigrationThread = new Thread(TenantMigrationUtil.runMigrationAsync, + migrationOpts, + donorRstArgs, + true /* retryOnRetryableErrors */); + runMigrationThread.start(); // Wait for to donorStartMigration command to start. assert.soon(() => donorPrimary.adminCommand({currentOp: true, desc: "tenant donor migration"}) @@ -91,11 +73,12 @@ function testDonorStartMigrationInterrupt(interruptFunc) { sleep(Math.random() * kMaxSleepTimeMS); interruptFunc(donorRst); - assert.commandWorked(migrationThread.returnData()); - assertMigrationCommitsIfDurableStateExists(donorRst, migrationId, migrationOpts.tenantId); + assert.commandWorked(runMigrationThread.returnData()); + assertMigrationCommitsIfDurableStateExists( + tenantMigrationTest, migrationId, migrationOpts.tenantId); + tenantMigrationTest.stop(); donorRst.stopSet(); - recipientRst.stopSet(); } /** @@ -136,30 +119,23 @@ function testDonorForgetMigrationInterrupt(interruptFunc) { recipientRst.startSet(); recipientRst.initiate(); - let donorPrimary = donorRst.getPrimary(); - - const donorRstArgs = { - name: donorRst.name, - nodeHosts: donorRst.nodes.map(node => `127.0.0.1:${node.port}`), - nodeOptions: donorRst.nodeOptions, - keyFile: donorRst.keyFile, - host: donorRst.host, - waitForKeys: false, - }; + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); + let donorPrimary = tenantMigrationTest.getDonorPrimary(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), tenantId: kTenantId, - readPreference: {mode: "primary"}, + recipientConnString: recipientRst.getURL(), }; + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); - let forgetMigrationThread = - new Thread(TenantMigrationUtil.forgetMigrationRetryOnRetryableErrors, - donorRstArgs, - migrationOpts.migrationIdString); + assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + const forgetMigrationThread = new Thread(TenantMigrationUtil.forgetMigrationAsync, + migrationOpts.migrationIdString, + donorRstArgs, + true /* retryOnRetryableErrors */); forgetMigrationThread.start(); // Wait for to donorForgetMigration command to start. @@ -174,13 +150,14 @@ function testDonorForgetMigrationInterrupt(interruptFunc) { donorPrimary = donorRst.getPrimary(); assert.commandWorkedOrFailedWithCode( - TenantMigrationUtil.forgetMigration(donorPrimary.host, extractUUIDFromObject(migrationId)), + tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString), ErrorCodes.NoSuchTenantMigration); assert.commandWorked(forgetMigrationThread.returnData()); - TenantMigrationUtil.waitForMigrationGarbageCollection( + tenantMigrationTest.waitForMigrationGarbageCollection( donorRst.nodes, migrationId, migrationOpts.tenantId); + tenantMigrationTest.stop(); donorRst.stopSet(); recipientRst.stopSet(); } diff --git a/jstests/replsets/tenant_migration_donor_retry.js b/jstests/replsets/tenant_migration_donor_retry.js index 7dfa60ad132..4bfe59595ae 100644 --- a/jstests/replsets/tenant_migration_donor_retry.js +++ b/jstests/replsets/tenant_migration_donor_retry.js @@ -11,14 +11,15 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); -const kTenantId = "testTenantId"; +const kTenantIdPrefix = "testTenantId"; let testNum = 0; -const kConfigDonorsNS = "config.tenantMigrationDonors"; +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); function makeTenantId() { - return kTenantId + testNum++; + return kTenantIdPrefix + testNum++; } /** @@ -29,17 +30,14 @@ function makeTenantId() { * TODO: This function should be changed to testDonorRetryRecipientSyncDataCmdOnError once there is * a way to differentiate between local and remote stepdown/shutdown error. */ -function testMigrationAbortsOnRecipientSyncDataCmdError( - donorRst, recipientRst, errorCode, failMode) { - const donorPrimary = donorRst.getPrimary(); - const recipientPrimary = recipientRst.getPrimary(); +function testMigrationAbortsOnRecipientSyncDataCmdError(errorCode, failMode) { + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + const tenantId = makeTenantId(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), - tenantId: kTenantId + makeTenantId(), - readPreference: {mode: "primary"}, + tenantId, }; let fp = configureFailPoint(recipientPrimary, @@ -51,9 +49,7 @@ function testMigrationAbortsOnRecipientSyncDataCmdError( }, failMode); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); - migrationThread.start(); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); // Verify that the command failed. const times = failMode.times ? failMode.times : 1; @@ -62,10 +58,10 @@ function testMigrationAbortsOnRecipientSyncDataCmdError( } fp.off(); - migrationThread.join(); - const res = assert.commandWorked(migrationThread.returnData()); - assert.eq(res.state, "aborted"); - assert.eq(res.abortReason.code, errorCode); + const stateRes = + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kAborted); + assert.eq(stateRes.abortReason.code, errorCode); return migrationId; } @@ -78,19 +74,15 @@ function testMigrationAbortsOnRecipientSyncDataCmdError( * TODO: This function should be changed to testDonorRetryRecipientForgetMigrationCmdOnError once * there is a way to differentiate between local and remote stepdown/shutdown error. */ -function testMigrationAbortsOnRecipientForgetMigrationCmdError(donorRst, recipientRst, errorCode) { - const donorPrimary = donorRst.getPrimary(); - const recipientPrimary = recipientRst.getPrimary(); - const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); - +function testMigrationAbortsOnRecipientForgetMigrationCmdError(errorCode) { + const tenantId = makeTenantId(); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), - tenantId: makeTenantId(), - readPreference: {mode: "primary"}, + tenantId, }; + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); let fp = configureFailPoint(recipientPrimary, "failCommand", { @@ -100,56 +92,32 @@ function testMigrationAbortsOnRecipientForgetMigrationCmdError(donorRst, recipie }, {times: 1}); - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); - let forgetMigrationThread = new Thread( - TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString); - forgetMigrationThread.start(); + const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); // Verify that the initial recipientForgetMigration command failed. + assert.commandFailedWithCode( + tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString), errorCode); fp.wait(); - - forgetMigrationThread.join(); - assert.commandFailedWithCode(forgetMigrationThread.returnData(), errorCode); fp.off(); - const donorStateDoc = configDonorsColl.findOne({_id: migrationId}); - assert.eq("committed", donorStateDoc.state); - assert(!donorStateDoc.expireAt); + assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); + assert(!stateRes.expireAt); } -const donorRst = new ReplSetTest( - {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}}); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: "recipientRst", - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } -}); - -donorRst.startSet(); -donorRst.initiate(); - -recipientRst.startSet(); -recipientRst.initiate(); - -const donorPrimary = donorRst.getPrimary(); +const donorPrimary = tenantMigrationTest.getDonorPrimary(); (() => { jsTest.log( "Test that the donor does not retry recipientSyncData (to make the recipient start cloning)" + " on recipient stepdown errors"); - const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError( - donorRst, recipientRst, ErrorCodes.NotWritablePrimary, {times: 1}); + const migrationId = + testMigrationAbortsOnRecipientSyncDataCmdError(ErrorCodes.NotWritablePrimary, {times: 1}); - const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); assert(!configDonorsColl.findOne({_id: migrationId}).blockTimestamp); - assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state); + assert.eq(TenantMigrationTest.State.kAborted, + configDonorsColl.findOne({_id: migrationId}).state); })(); (() => { @@ -157,12 +125,13 @@ const donorPrimary = donorRst.getPrimary(); "Test that the donor does not retry recipientSyncData (to make the recipient start cloning)" + " on recipient shutdown errors"); - const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError( - donorRst, recipientRst, ErrorCodes.ShutdownInProgress, {times: 1}); + const migrationId = + testMigrationAbortsOnRecipientSyncDataCmdError(ErrorCodes.ShutdownInProgress, {times: 1}); - const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); assert(!configDonorsColl.findOne({_id: migrationId}).blockTimestamp); - assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state); + assert.eq(TenantMigrationTest.State.kAborted, + configDonorsColl.findOne({_id: migrationId}).state); })(); (() => { @@ -170,12 +139,13 @@ const donorPrimary = donorRst.getPrimary(); "Test that the donor does not retry recipientSyncData (with returnAfterReachingDonorTimestamp) " + "on stepdown errors"); - const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError( - donorRst, recipientRst, ErrorCodes.NotWritablePrimary, {skip: 1}); + const migrationId = + testMigrationAbortsOnRecipientSyncDataCmdError(ErrorCodes.NotWritablePrimary, {skip: 1}); - const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); assert(configDonorsColl.findOne({_id: migrationId}).blockTimestamp); - assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state); + assert.eq(TenantMigrationTest.State.kAborted, + configDonorsColl.findOne({_id: migrationId}).state); })(); (() => { @@ -183,25 +153,23 @@ const donorPrimary = donorRst.getPrimary(); "Test that the donor does not retry recipientSyncData (with returnAfterReachingDonorTimestamp) " + "on recipient shutdown errors"); - const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError( - donorRst, recipientRst, ErrorCodes.ShutdownInProgress, {skip: 1}); + const migrationId = + testMigrationAbortsOnRecipientSyncDataCmdError(ErrorCodes.ShutdownInProgress, {skip: 1}); - const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); assert(configDonorsColl.findOne({_id: migrationId}).blockTimestamp); - assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state); + assert.eq(TenantMigrationTest.State.kAborted, + configDonorsColl.findOne({_id: migrationId}).state); })(); (() => { jsTest.log("Test that the donor does not retry recipientForgetMigration on stepdown errors"); - testMigrationAbortsOnRecipientForgetMigrationCmdError( - donorRst, recipientRst, ErrorCodes.NotWritablePrimary); + testMigrationAbortsOnRecipientForgetMigrationCmdError(ErrorCodes.NotWritablePrimary); })(); (() => { jsTest.log("Test that the donor does not retry recipientForgetMigration on shutdown errors"); - - testMigrationAbortsOnRecipientForgetMigrationCmdError( - donorRst, recipientRst, ErrorCodes.ShutdownInProgress); + testMigrationAbortsOnRecipientForgetMigrationCmdError(ErrorCodes.ShutdownInProgress); })(); // Each donor state doc is updated three times throughout the lifetime of a tenant migration: @@ -214,20 +182,25 @@ const kWriteErrorTimeMS = 50; (() => { jsTest.log("Test that the donor retries state doc insert on retriable errors"); + const tenantId = makeTenantId(); + const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), - tenantId: makeTenantId(), - readPreference: {mode: "primary"}, + tenantId, + recipientConnString: tenantMigrationTest.getRecipientConnString(), }; let fp = configureFailPoint(donorPrimary, "failCollectionInserts", { - collectionNS: kConfigDonorsNS, + collectionNS: TenantMigrationTest.kConfigDonorsNS, }); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); + const donorRstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst()); + + // Start up a new thread to run this migration, since the 'failCollectionInserts' failpoint will + // cause the initial 'donorStartMigration' command to loop forever without returning. + const migrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); migrationThread.start(); // Make the insert keep failing for some time. @@ -238,35 +211,41 @@ const kWriteErrorTimeMS = 50; migrationThread.join(); assert.commandWorked(migrationThread.returnData()); - const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); - assert.eq("committed", configDonorsColl.findOne({_id: migrationId}).state); + const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); + const donorStateDoc = configDonorsColl.findOne({_id: migrationId}); + assert.eq(TenantMigrationTest.State.kCommitted, donorStateDoc.state); })(); (() => { jsTest.log("Test that the donor retries state doc update on retriable errors"); + const tenantId = kTenantIdPrefix + "RetryOnStateDocUpdateError"; + const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), - tenantId: kTenantId + "RetryOnStateDocUpdateError", - readPreference: {mode: "primary"}, + tenantId, + recipientConnString: tenantMigrationTest.getRecipientConnString(), }; - // Use a random number of skips to fail a random update to config.tenantMigrationDonors. - let fp = configureFailPoint(donorPrimary, - "failCollectionUpdates", - { - collectionNS: kConfigDonorsNS, - }, - {skip: Math.floor(Math.random() * kTotalStateDocUpdates)}); + const donorRstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst()); - let migrationThread = new Thread((donorPrimaryHost, migrationOpts) => { + // Use a random number of skips to fail a random update to config.tenantMigrationDonors. + const fp = configureFailPoint(donorPrimary, + "failCollectionUpdates", + { + collectionNS: TenantMigrationTest.kConfigDonorsNS, + }, + {skip: Math.floor(Math.random() * kTotalStateDocUpdates)}); + + // Start up a new thread to run this migration, since we want to continuously send + // 'donorStartMigration' commands while the 'failCollectionUpdates' failpoint is on. + const migrationThread = new Thread((migrationOpts, donorRstArgs) => { load("jstests/replsets/libs/tenant_migration_util.js"); - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimaryHost, migrationOpts)); - assert.commandWorked( - TenantMigrationUtil.forgetMigration(donorPrimaryHost, migrationOpts.migrationIdString)); - }, donorPrimary.host, migrationOpts); + assert.commandWorked(TenantMigrationUtil.runMigrationAsync(migrationOpts, donorRstArgs)); + assert.commandWorked(TenantMigrationUtil.forgetMigrationAsync( + migrationOpts.migrationIdString, donorRstArgs)); + }, migrationOpts, donorRstArgs); migrationThread.start(); // Make the update keep failing for some time. @@ -275,12 +254,11 @@ const kWriteErrorTimeMS = 50; fp.off(); migrationThread.join(); - const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); const donorStateDoc = configDonorsColl.findOne({_id: migrationId}); - assert.eq("committed", donorStateDoc.state); + assert.eq(donorStateDoc.state, TenantMigrationTest.State.kCommitted); assert(donorStateDoc.expireAt); })(); -donorRst.stopSet(); -recipientRst.stopSet(); +tenantMigrationTest.stop(); })(); diff --git a/jstests/replsets/tenant_migration_donor_rollback_recovery.js b/jstests/replsets/tenant_migration_donor_rollback_recovery.js index 77036f11e3d..5db336581d3 100644 --- a/jstests/replsets/tenant_migration_donor_rollback_recovery.js +++ b/jstests/replsets/tenant_migration_donor_rollback_recovery.js @@ -10,9 +10,9 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/uuid_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/replsets/libs/rollback_test.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); -const kConfigDonorsNS = "config.tenantMigrationDonors"; const kTenantId = "testTenantId"; const kMaxSleepTimeMS = 250; @@ -70,24 +70,20 @@ function testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc) { config.members[2].priority = 0; donorRst.initiateWithHighElectionTimeout(config); + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), recipientRst, donorRst}); + const donorRollbackTest = new RollbackTest("donorRst", donorRst); - const donorRstArgs = { - name: donorRst.name, - nodeHosts: donorRst.nodes.map(node => `127.0.0.1:${node.port}`), - nodeOptions: donorRst.nodeOptions, - keyFile: donorRst.keyFile, - host: donorRst.host, - waitForKeys: false, - }; + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); let donorPrimary = donorRollbackTest.getPrimary(); - setUpFunc(donorRstArgs, donorPrimary); + setUpFunc(tenantMigrationTest, donorRstArgs); donorRollbackTest.awaitLastOpCommitted(); // Writes during this state will be rolled back. donorRollbackTest.transitionToRollbackOperations(); - rollbackOpsFunc(donorRstArgs, donorPrimary); + rollbackOpsFunc(tenantMigrationTest, donorRstArgs); // Transition to replication steady state. donorRollbackTest.transitionToSyncSourceOperationsBeforeRollback(); @@ -99,7 +95,7 @@ function testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc) { // secondary that the primary replicates data onto. donorPrimary = donorRollbackTest.getPrimary(); let donorSecondary = donorRollbackTest.getSecondary(); - steadyStateFunc(donorPrimary, donorSecondary); + steadyStateFunc(tenantMigrationTest, donorPrimary, donorSecondary); donorRollbackTest.stop(); } @@ -114,23 +110,31 @@ function testRollbackInitialState() { const migrationOpts = makeMigrationOpts(migrationId, kTenantId + "-initial"); let migrationThread; - let setUpFunc = (donorRstArgs, donorPrimary) => {}; + let setUpFunc = (tenantMigrationTest, donorRstArgs) => {}; - let rollbackOpsFunc = (donorRstArgs, donorPrimary) => { - // Start the migration and wait for the primary to insert the state doc. - migrationThread = new Thread( - TenantMigrationUtil.startMigrationRetryOnRetryableErrors, donorRstArgs, migrationOpts); + let rollbackOpsFunc = (tenantMigrationTest, donorRstArgs) => { + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + + // Start the migration asynchronously and wait for the primary to insert the state doc. + migrationThread = new Thread(TenantMigrationUtil.runMigrationAsync, + migrationOpts, + donorRstArgs, + true /* retryOnRetryableErrors */); migrationThread.start(); assert.soon(() => { - return 1 === donorPrimary.getCollection(kConfigDonorsNS).count({_id: migrationId}); + return 1 === donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({ + _id: migrationId + }); }); }; - let steadyStateFunc = (donorPrimary, donorSecondary) => { + let steadyStateFunc = (tenantMigrationTest, donorPrimary, donorSecondary) => { // Verify that the migration restarted successfully on the new primary despite rollback. assert.commandWorked(migrationThread.returnData()); - TenantMigrationUtil.assertMigrationCommitted( - [donorPrimary, donorSecondary], migrationId, migrationOpts.tenantId); + tenantMigrationTest.assertNodesInExpectedState([donorPrimary, donorSecondary], + migrationId, + migrationOpts.tenantId, + TenantMigrationTest.State.kCommitted); }; testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc); @@ -151,32 +155,38 @@ function testRollBackStateTransition(pauseFailPoint, setUpFailPoints, nextState) const migrationOpts = makeMigrationOpts(migrationId, kTenantId + "-" + nextState); let migrationThread, pauseFp; - let setUpFunc = (donorRstArgs, donorPrimary) => { + let setUpFunc = (tenantMigrationTest, donorRstArgs) => { + const donorPrimary = tenantMigrationTest.getDonorPrimary(); setUpFailPoints.forEach(failPoint => configureFailPoint(donorPrimary, failPoint)); pauseFp = configureFailPoint(donorPrimary, pauseFailPoint); - migrationThread = new Thread( - TenantMigrationUtil.startMigrationRetryOnRetryableErrors, donorRstArgs, migrationOpts); + migrationThread = new Thread(TenantMigrationUtil.runMigrationAsync, + migrationOpts, + donorRstArgs, + true /* retryOnRetryableErrors */); migrationThread.start(); pauseFp.wait(); }; - let rollbackOpsFunc = (donorRstArgs, donorPrimary) => { + let rollbackOpsFunc = (tenantMigrationTest, donorRstArgs) => { + const donorPrimary = tenantMigrationTest.getDonorPrimary(); // Resume the migration and wait for the primary to do the write for the state transition. pauseFp.off(); assert.soon(() => { - return 1 === donorPrimary.getCollection(kConfigDonorsNS).count({ + return 1 === donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({ _id: migrationId, state: nextState }); }); }; - let steadyStateFunc = (donorPrimary, donorSecondary) => { + let steadyStateFunc = (tenantMigrationTest, donorPrimary, donorSecondary) => { // Verify that the migration resumed successfully on the new primary despite the rollback. assert.commandWorked(migrationThread.returnData()); - TenantMigrationUtil.waitForMigrationToCommit( - [donorPrimary, donorSecondary], migrationId, migrationOpts.tenantId); + tenantMigrationTest.waitForNodesToReachState([donorPrimary, donorSecondary], + migrationId, + migrationOpts.tenantId, + TenantMigrationTest.State.kCommitted); }; testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc); @@ -193,32 +203,33 @@ function testRollBackMarkingStateGarbageCollectable() { const migrationOpts = makeMigrationOpts(migrationId, kTenantId + "-markGarbageCollectable"); let forgetMigrationThread; - let setUpFunc = (donorRstArgs, donorPrimary) => { - const res = assert.commandWorked( - TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); - assert.eq("committed", res.state); + let setUpFunc = (tenantMigrationTest, donorRstArgs) => { + const res = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + assert.eq(TenantMigrationTest.State.kCommitted, res.state); }; - let rollbackOpsFunc = (donorRstArgs, donorPrimary) => { + let rollbackOpsFunc = (tenantMigrationTest, donorRstArgs) => { + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + // Run donorForgetMigration and wait for the primary to do the write to mark the state doc // as garbage collectable. - forgetMigrationThread = - new Thread(TenantMigrationUtil.forgetMigrationRetryOnRetryableErrors, - donorRstArgs, - migrationOpts.migrationIdString); + forgetMigrationThread = new Thread(TenantMigrationUtil.forgetMigrationAsync, + migrationOpts.migrationIdString, + donorRstArgs, + true /* retryOnRetryableErrors */); forgetMigrationThread.start(); assert.soon(() => { - return 1 === donorPrimary.getCollection(kConfigDonorsNS).count({ + return 1 === donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({ _id: migrationId, expireAt: {$exists: 1} }); }); }; - let steadyStateFunc = (donorPrimary, donorSecondary) => { + let steadyStateFunc = (tenantMigrationTest, donorPrimary, donorSecondary) => { // Verify that the migration state got garbage collected successfully despite the rollback. assert.commandWorked(forgetMigrationThread.returnData()); - TenantMigrationUtil.waitForMigrationGarbageCollection( + tenantMigrationTest.waitForMigrationGarbageCollection( [donorPrimary, donorSecondary], migrationId, migrationOpts.tenantId); }; @@ -235,13 +246,13 @@ function testRollBackRandom() { const migrationOpts = makeMigrationOpts(migrationId, kTenantId + "-random"); let migrationThread; - let setUpFunc = (donorRstArgs, donorPrimary) => { + let setUpFunc = (tenantMigrationTest, donorRstArgs) => { migrationThread = new Thread((donorRstArgs, migrationOpts) => { load("jstests/replsets/libs/tenant_migration_util.js"); - assert.commandWorked(TenantMigrationUtil.startMigrationRetryOnRetryableErrors( - donorRstArgs, migrationOpts)); - assert.commandWorked(TenantMigrationUtil.forgetMigrationRetryOnRetryableErrors( - donorRstArgs, migrationOpts.migrationIdString)); + assert.commandWorked(TenantMigrationUtil.runMigrationAsync( + migrationOpts, donorRstArgs, true /* retryOnRetryableErrors */)); + assert.commandWorked(TenantMigrationUtil.forgetMigrationAsync( + migrationOpts.migrationIdString, donorRstArgs, true /* retryOnRetryableErrors */)); }, donorRstArgs, migrationOpts); // Start the migration and wait for a random amount of time before transitioning to the @@ -250,18 +261,22 @@ function testRollBackRandom() { sleep(Math.random() * kMaxSleepTimeMS); }; - let rollbackOpsFunc = (donorRstArgs, donorPrimary) => { + let rollbackOpsFunc = (tenantMigrationTest, donorRstArgs) => { // Let the migration run in the rollback operations state for a random amount of time. sleep(Math.random() * kMaxSleepTimeMS); }; - let steadyStateFunc = (donorPrimary, donorSecondary) => { + let steadyStateFunc = (tenantMigrationTest, donorPrimary, donorSecondary) => { // Verify that the migration completed and was garbage collected successfully despite the // rollback. migrationThread.join(); - if (donorPrimary.getCollection(kConfigDonorsNS).count({_id: migrationId}) > 0) { - TenantMigrationUtil.waitForMigrationToCommit( - [donorPrimary, donorSecondary], migrationId, migrationOpts.tenantId); + if (donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({ + _id: migrationId + }) > 0) { + tenantMigrationTest.waitForNodesToReachState([donorPrimary, donorSecondary], + migrationId, + migrationOpts.tenantId, + TenantMigrationTest.State.kCommitted); } }; diff --git a/jstests/replsets/tenant_migration_donor_startup_recovery.js b/jstests/replsets/tenant_migration_donor_startup_recovery.js index d43e7a163de..e197d1598f6 100644 --- a/jstests/replsets/tenant_migration_donor_startup_recovery.js +++ b/jstests/replsets/tenant_migration_donor_startup_recovery.js @@ -12,8 +12,8 @@ "use strict"; load("jstests/libs/fail_point_util.js"); -load("jstests/libs/parallelTester.js"); -load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); const donorRst = new ReplSetTest({ nodes: 1, @@ -25,53 +25,17 @@ const donorRst = new ReplSetTest({ } } }); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: 'recipient', - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } -}); donorRst.startSet(); donorRst.initiate(); -recipientRst.startSet(); -recipientRst.initiate(); +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst}); const kMaxSleepTimeMS = 1000; const kTenantId = 'testTenantId'; -const kConfigDonorsNS = "config.tenantMigrationDonors"; - -let donorPrimary = donorRst.getPrimary(); -let kRecipientConnString = recipientRst.getURL(); -let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); -function startMigration(host, recipientConnString, tenantId) { - const primary = new Mongo(host); - try { - primary.adminCommand({ - donorStartMigration: 1, - migrationId: UUID(), - recipientConnectionString: recipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"} - }); - } catch (e) { - if (isNetworkError(e)) { - jsTestLog('Ignoring network error due to node being restarted: ' + tojson(e)); - return; - } - throw e; - } -} - -let migrationThread = - new Thread(startMigration, donorPrimary.host, kRecipientConnString, kTenantId); +let donorPrimary = tenantMigrationTest.getDonorPrimary(); +let configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); // Force the migration to pause after entering a randomly selected state to simulate a failure. Random.setRandomSeed(); @@ -85,7 +49,21 @@ if (index < kMigrationFpNames.length) { configureFailPoint(donorPrimary, kMigrationFpNames[index]); } -migrationThread.start(); +const migrationOpts = { + migrationIdString: extractUUIDFromObject(UUID()), + tenantId: kTenantId, +}; + +try { + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); +} catch (e) { + if (isNetworkError(e)) { + jsTestLog('Ignoring network error due to node being restarted: ' + tojson(e)); + return; + } + throw e; +} + sleep(Math.random() * kMaxSleepTimeMS); donorRst.stopSet(null /* signal */, true /*forRestart */); donorRst.startSet({ @@ -97,52 +75,52 @@ donorRst.startSet({ }); donorPrimary = donorRst.getPrimary(); -configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); +configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); let donorDoc = configDonorsColl.findOne({tenantId: kTenantId}); if (donorDoc) { let state = donorDoc.state; switch (state) { - case "data sync": + case TenantMigrationTest.State.kDataSync: assert.soon( - () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId) - .state == TenantMigrationUtil.accessState.kAllow); + () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId) + .state == TenantMigrationTest.AccessState.kAllow); break; - case "blocking": + case TenantMigrationTest.State.kBlocking: assert.soon( - () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId) - .state == TenantMigrationUtil.accessState.kBlockWritesAndReads); + () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId) + .state == TenantMigrationTest.AccessState.kBlockWritesAndReads); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(donorPrimary, kTenantId) .blockTimestamp, donorDoc.blockTimestamp) == 0); break; - case "committed": + case TenantMigrationTest.State.kCommitted: assert.soon( - () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId) - .state == TenantMigrationUtil.accessState.kReject); + () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId) + .state == TenantMigrationTest.AccessState.kReject); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(donorPrimary, kTenantId) .commitOrAbortOpTime, donorDoc.commitOrAbortOpTime) == 0); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(donorPrimary, kTenantId) .blockTimestamp, donorDoc.blockTimestamp) == 0); break; - case "aborted": + case TenantMigrationTest.State.kAborted: assert.soon( - () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId) - .state == TenantMigrationUtil.accessState.kAborted); + () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId) + .state == TenantMigrationTest.AccessState.kAborted); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(donorPrimary, kTenantId) .commitOrAbortOpTime, donorDoc.commitOrAbortOpTime) == 0); assert.soon( - () => bsonWoCompare(TenantMigrationUtil + () => bsonWoCompare(tenantMigrationTest .getTenantMigrationAccessBlocker(donorPrimary, kTenantId) .blockTimestamp, donorDoc.blockTimestamp) == 0); @@ -152,7 +130,6 @@ if (donorDoc) { } } -migrationThread.join(); +tenantMigrationTest.stop(); donorRst.stopSet(); -recipientRst.stopSet(); })(); diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js index 2f02cce130c..9c520ee8899 100644 --- a/jstests/replsets/tenant_migration_donor_state_machine.js +++ b/jstests/replsets/tenant_migration_donor_state_machine.js @@ -14,9 +14,8 @@ "use strict"; load("jstests/libs/fail_point_util.js"); -load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); -load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); let expectedNumRecipientSyncDataCmdSent = 0; let expectedNumRecipientForgetMigrationCmdSent = 0; @@ -45,8 +44,9 @@ function testDonorForgetMigrationAfterMigrationCompletes( null == node.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker); }); - assert.soon(() => - 0 === donorPrimary.getCollection(kConfigDonorsNS).count({tenantId: tenantId})); + assert.soon(() => 0 === donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS).count({ + tenantId: tenantId + })); assert.soon(() => 0 === donorPrimary.adminCommand({serverStatus: 1}) .repl.primaryOnlyServices.TenantMigrationDonorService); @@ -72,58 +72,41 @@ const donorRst = new ReplSetTest({ } } }); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: "recipient", - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } -}); donorRst.startSet(); donorRst.initiate(); -recipientRst.startSet(); -recipientRst.initiate(); +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst}); +const recipientRst = tenantMigrationTest.getRecipientRst(); -const donorPrimary = donorRst.getPrimary(); -const recipientPrimary = recipientRst.getPrimary(); -const kRecipientConnString = recipientRst.getURL(); +const donorPrimary = tenantMigrationTest.getDonorPrimary(); +const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); const kTenantId = "testDb"; -const kConfigDonorsNS = "config.tenantMigrationDonors"; -let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); +let configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); (() => { jsTest.log("Test the case where the migration commits"); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), tenantId: kTenantId, - readPreference: {mode: "primary"}, }; - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts"); - migrationThread.start(); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); // Wait for the migration to enter the blocking state. blockingFp.wait(); let mtabs = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - assert.eq(mtabs[kTenantId].state, TenantMigrationUtil.accessState.kBlockWritesAndReads); + assert.eq(mtabs[kTenantId].state, TenantMigrationTest.AccessState.kBlockWritesAndReads); assert(mtabs[kTenantId].blockTimestamp); let donorDoc = configDonorsColl.findOne({tenantId: kTenantId}); let blockOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne( - {ns: kConfigDonorsNS, op: "u", "o.tenantId": kTenantId}); + {ns: TenantMigrationTest.kConfigDonorsNS, op: "u", "o.tenantId": kTenantId}); assert.eq(donorDoc.state, "blocking"); assert.eq(donorDoc.blockTimestamp, blockOplogEntry.ts); @@ -134,19 +117,19 @@ let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); // Allow the migration to complete. blockingFp.off(); - migrationThread.join(); - const res = assert.commandWorked(migrationThread.returnData()); - assert.eq(res.state, "committed"); + const stateRes = + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); donorDoc = configDonorsColl.findOne({tenantId: kTenantId}); - let commitOplogEntry = - donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc}); - assert.eq(donorDoc.state, "committed"); + let commitOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne( + {ns: TenantMigrationTest.kConfigDonorsNS, op: "u", o: donorDoc}); + assert.eq(donorDoc.state, TenantMigrationTest.State.kCommitted); assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts); assert.soon(() => { mtabs = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - return mtabs[kTenantId].state === TenantMigrationUtil.accessState.kReject; + return mtabs[kTenantId].state === TenantMigrationTest.AccessState.kReject; }); assert(mtabs[kTenantId].commitOrAbortOpTime); @@ -164,28 +147,25 @@ let configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), tenantId: kTenantId, - readPreference: {mode: "primary"}, }; let abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationAfterBlockingStarts"); - const res = - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); - assert.eq(res.state, "aborted"); + const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kAborted); abortFp.off(); const donorDoc = configDonorsColl.findOne({tenantId: kTenantId}); - const abortOplogEntry = - donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc}); - assert.eq(donorDoc.state, "aborted"); + const abortOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne( + {ns: TenantMigrationTest.kConfigDonorsNS, op: "u", o: donorDoc}); + assert.eq(donorDoc.state, TenantMigrationTest.State.kAborted); assert.eq(donorDoc.commitOrAbortOpTime.ts, abortOplogEntry.ts); assert.eq(donorDoc.abortReason.code, ErrorCodes.InternalError); let mtabs; assert.soon(() => { mtabs = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - return mtabs[kTenantId].state === TenantMigrationUtil.accessState.kAborted; + return mtabs[kTenantId].state === TenantMigrationTest.AccessState.kAborted; }); assert(mtabs[kTenantId].commitOrAbortOpTime); @@ -207,9 +187,7 @@ configDonorsColl.dropIndex({expireAt: 1}); const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), tenantId: kTenantId, - readPreference: {mode: "primary"}, }; // Verify that donorForgetMigration fails since the migration hasn't started. @@ -217,9 +195,8 @@ configDonorsColl.dropIndex({expireAt: 1}); donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId}), ErrorCodes.NoSuchTenantMigration); - const res = - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); - assert.eq(res.state, "committed"); + const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); assert.commandWorked( donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId})); @@ -228,6 +205,6 @@ configDonorsColl.dropIndex({expireAt: 1}); donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId})); })(); +tenantMigrationTest.stop(); donorRst.stopSet(); -recipientRst.stopSet(); })(); diff --git a/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js b/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js index d081a618827..190c0d5b6d2 100644 --- a/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js +++ b/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js @@ -12,6 +12,7 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); // Set the delay before a donor state doc is garbage collected to be short to speed up the test. @@ -33,18 +34,6 @@ const donorRst = new ReplSetTest({ } } }); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: 'recipient', - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - }, - } -}); -const kRecipientConnString = recipientRst.getURL(); function insertDocument(primaryHost, dbName, collName) { const primary = new Mongo(primaryHost); @@ -59,29 +48,30 @@ function insertDocument(primaryHost, dbName, collName) { donorRst.startSet(); donorRst.initiate(); - recipientRst.startSet(); - recipientRst.initiate(); - let dbName = "migrationOutcome-committed_" + kTenantDefinedDbName; - const primary = donorRst.getPrimary(); - const primaryDB = primary.getDB(dbName); - - let writeFp = configureFailPoint(primaryDB, "hangWriteBeforeWaitingForMigrationDecision"); - let writeThread = new Thread(insertDocument, primary.host, dbName, kCollName); + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst}); const migrationId = UUID(); - assert.commandWorked(primaryDB.runCommand({create: kCollName})); - const tenantId = dbName.split('_')[0]; + const tenantId = "migrationOutcome-committed"; const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + recipientConnString: tenantMigrationTest.getRecipientConnString(), + tenantId, }; + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); + + const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName); + const primary = donorRst.getPrimary(); + const primaryDB = primary.getDB(dbName); - let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts); + const writeFp = configureFailPoint(primaryDB, "hangWriteBeforeWaitingForMigrationDecision"); + const writeThread = new Thread(insertDocument, primary.host, dbName, kCollName); + + assert.commandWorked(primaryDB.runCommand({create: kCollName})); + + const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); + const migrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); migrationThread.start(); blockFp.wait(); @@ -93,21 +83,20 @@ function insertDocument(primaryHost, dbName, collName) { migrationThread.join(); - let migrationRes = assert.commandWorked(migrationThread.returnData()); - assert.eq(migrationRes.state, "committed"); + const migrationRes = assert.commandWorked(migrationThread.returnData()); + assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted); - assert.commandWorked( - TenantMigrationUtil.forgetMigration(primary.host, migrationOpts.migrationIdString)); - TenantMigrationUtil.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, tenantId); + assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); + tenantMigrationTest.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, tenantId); writeFp.off(); writeThread.join(); - let writeRes = writeThread.returnData(); + const writeRes = writeThread.returnData(); assert.commandFailedWithCode(writeRes, ErrorCodes.TenantMigrationCommitted); + tenantMigrationTest.stop(); donorRst.stopSet(); - recipientRst.stopSet(); })(); (() => { @@ -116,30 +105,31 @@ function insertDocument(primaryHost, dbName, collName) { donorRst.startSet(); donorRst.initiate(); - recipientRst.startSet(); - recipientRst.initiate(); - let dbName = "migrationOutcome-aborted_" + kTenantDefinedDbName; - const primary = donorRst.getPrimary(); - const primaryDB = primary.getDB(dbName); - - let writeFp = configureFailPoint(primaryDB, "hangWriteBeforeWaitingForMigrationDecision"); - let writeThread = new Thread(insertDocument, primary.host, dbName, kCollName); + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst}); const migrationId = UUID(); - assert.commandWorked(primaryDB.runCommand({create: kCollName})); - const tenantId = dbName.split('_')[0]; + const tenantId = "migrationOutcome-aborted"; const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: kRecipientConnString, - tenantId: tenantId, - readPreference: {mode: "primary"}, + recipientConnString: tenantMigrationTest.getRecipientConnString(), + tenantId, }; + const donorRstArgs = TenantMigrationUtil.createRstArgs(donorRst); + + const dbName = tenantMigrationTest.tenantDB(tenantId, kTenantDefinedDbName); + const primary = donorRst.getPrimary(); + const primaryDB = primary.getDB(dbName); + + const writeFp = configureFailPoint(primaryDB, "hangWriteBeforeWaitingForMigrationDecision"); + const writeThread = new Thread(insertDocument, primary.host, dbName, kCollName); + + assert.commandWorked(primaryDB.runCommand({create: kCollName})); - let abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts"); - let blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); - let migrationThread = - new Thread(TenantMigrationUtil.startMigration, primary.host, migrationOpts); + const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationAfterBlockingStarts"); + const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationAfterBlockingStarts"); + const migrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); migrationThread.start(); blockFp.wait(); @@ -151,21 +141,20 @@ function insertDocument(primaryHost, dbName, collName) { migrationThread.join(); - let migrationRes = assert.commandWorked(migrationThread.returnData()); - assert.eq(migrationRes.state, "aborted"); + const migrationRes = assert.commandWorked(migrationThread.returnData()); + assert.eq(migrationRes.state, TenantMigrationTest.State.kAborted); abortFp.off(); - assert.commandWorked( - TenantMigrationUtil.forgetMigration(primary.host, migrationOpts.migrationIdString)); - TenantMigrationUtil.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, tenantId); + assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); + tenantMigrationTest.waitForMigrationGarbageCollection(donorRst.nodes, migrationId, tenantId); writeFp.off(); writeThread.join(); - let writeRes = writeThread.returnData(); + const writeRes = writeThread.returnData(); assert.commandFailedWithCode(writeRes, ErrorCodes.TenantMigrationAborted); + tenantMigrationTest.stop(); donorRst.stopSet(); - recipientRst.stopSet(); })(); })(); diff --git a/jstests/replsets/tenant_migration_large_txn.js b/jstests/replsets/tenant_migration_large_txn.js index d9e0c04cdc7..a97fc8b9196 100644 --- a/jstests/replsets/tenant_migration_large_txn.js +++ b/jstests/replsets/tenant_migration_large_txn.js @@ -14,41 +14,16 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); -const donorRst = new ReplSetTest({ - nodes: 1, - name: "donor", - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - } - } -}); -const recipientRst = new ReplSetTest({ - nodes: 1, - name: "recipient", - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } -}); - -donorRst.startSet(); -donorRst.initiate(); - -recipientRst.startSet(); -recipientRst.initiate(); +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); const kTenantId = "testTenantId"; -const kDbName = kTenantId + "_" + - "testDb"; +const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDB"); const kCollName = "testColl"; -const donorPrimary = donorRst.getPrimary(); +const donorPrimary = tenantMigrationTest.getDonorPrimary(); /** * Runs a large transaction (>16MB) on the given collection name that requires two applyOps oplog @@ -77,32 +52,32 @@ function runTransaction(primaryHost, dbName, collName) { const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), + recipientConnString: tenantMigrationTest.getRecipientConnString(), tenantId: kTenantId, - readPreference: {mode: "primary"}, }; +const donorRstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst()); // Start a migration, and pause it after the donor has majority-committed the initial state doc. -let dataSyncFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterDataSync"); -let migrationThread = - new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); +const dataSyncFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterDataSync"); +const migrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); migrationThread.start(); dataSyncFp.wait(); // Run a large transaction (>16MB) that will write two applyOps oplog entries. Pause // commitTransaction after it has reserved oplog slots for the applyOps oplog entries and has // written the first one. -let logApplyOpsForTxnFp = +const logApplyOpsForTxnFp = configureFailPoint(donorPrimary, "hangAfterLoggingApplyOpsForTransaction", {}, {skip: 1}); -let txnThread = new Thread(runTransaction, donorPrimary.host, kDbName, kCollName); +const txnThread = new Thread(runTransaction, donorPrimary.host, kDbName, kCollName); txnThread.start(); logApplyOpsForTxnFp.wait(); // Allow the migration to move to the blocking state and commit. dataSyncFp.off(); assert.soon( - () => TenantMigrationUtil.getTenantMigrationAccessBlocker(donorPrimary, kTenantId).state === - TenantMigrationUtil.accessState.kBlockWritesAndReads); + () => tenantMigrationTest.getTenantMigrationAccessBlocker(donorPrimary, kTenantId).state === + TenantMigrationTest.AccessState.kBlockWritesAndReads); logApplyOpsForTxnFp.off(); assert.commandWorked(migrationThread.returnData()); @@ -110,6 +85,5 @@ assert.commandWorked(migrationThread.returnData()); // blockingTimestamp . txnThread.join(); -donorRst.stopSet(); -recipientRst.stopSet(); +tenantMigrationTest.stop(); })(); diff --git a/jstests/replsets/tenant_migration_no_failover.js b/jstests/replsets/tenant_migration_no_failover.js index 404950e79b4..374f9140ab7 100644 --- a/jstests/replsets/tenant_migration_no_failover.js +++ b/jstests/replsets/tenant_migration_no_failover.js @@ -10,105 +10,36 @@ load("jstests/aggregation/extras/utils.js"); load("jstests/libs/fail_point_util.js"); load("jstests/libs/uuid_util.js"); -load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); -const numDocs = 20; -const data = []; -for (let i = 0; i < numDocs; ++i) { - data.push({_id: i, x: i}); -} -const insertDonorDB = (primary, dbName, collName) => { - const db = primary.getDB(dbName); - const coll = db.getCollection(collName); - - assert.commandWorked(coll.insertMany(data)); -}; - -const verifyReceipientDB = (primary, dbName, collName, shouldMigrate) => { - const db = primary.getDB(dbName); - const coll = db.getCollection(collName); - - const findRes = coll.find(); - const numDocsFound = findRes.count(); - - if (!shouldMigrate) { - assert.eq(0, - numDocsFound, - `Find command on recipient collection ${collName} of DB ${ - dbName} should return 0 docs, instead has count of ${numDocsFound}`); - return; - } - - assert.eq(numDocs, - numDocsFound, - `Find command on recipient collection ${collName} of DB ${dbName} should return ${ - numDocs} docs, instead has count of ${numDocsFound}`); - - const docsReturned = findRes.sort({_id: 1}).toArray(); - assert(arrayEq(docsReturned, data), - () => (`${tojson(docsReturned)} is not equal to ${tojson(data)}`)); -}; - -const name = jsTestName(); -const donorRst = new ReplSetTest( - {name: `${name}_donor`, nodes: 1, nodeOptions: {setParameter: {enableTenantMigrations: true}}}); -const recipientRst = new ReplSetTest({ - name: `${name}_recipient`, - nodes: 1, - nodeOptions: { - setParameter: { - enableTenantMigrations: true, - // TODO SERVER-51734: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'. - 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}) - } - } -}); - -donorRst.startSet(); -donorRst.initiate(); - -recipientRst.startSet(); -recipientRst.initiate(); - -const tenantId = 'testTenantId'; +const tenantMigrationTest = new TenantMigrationTest(jsTestName()); +const tenantId = "testTenantId"; const dbNames = ["db0", "db1", "db2"]; -const tenantDBs = dbNames.map(dbName => TenantMigrationUtil.tenantDB(tenantId, dbName)); -const nonTenantDBs = dbNames.map(dbName => TenantMigrationUtil.nonTenantDB(tenantId, dbName)); +const tenantDBs = dbNames.map(dbName => tenantMigrationTest.tenantDB(tenantId, dbName)); +const nonTenantDBs = dbNames.map(dbName => tenantMigrationTest.nonTenantDB(tenantId, dbName)); const collNames = ["coll0", "coll1"]; -const donorPrimary = donorRst.getPrimary(); -const recipientPrimary = recipientRst.getPrimary(); - for (const db of [...tenantDBs, ...nonTenantDBs]) { for (const coll of collNames) { - insertDonorDB(donorPrimary, db, coll); + tenantMigrationTest.insertDonorDB(db, coll); } } const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), - recipientConnString: recipientRst.getURL(), tenantId, - readPreference: {mode: "primary"} }; -const res = - assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); -assert.eq(res.state, "committed"); - -for (const coll of collNames) { - for (const db of tenantDBs) { - // TODO (SERVER-51734): Change shouldMigrate to true. - verifyReceipientDB(recipientPrimary, db, coll, false /* shouldMigrate */); - } +const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); +assert.eq(stateRes.state, TenantMigrationTest.State.kCommitted); - for (const db of nonTenantDBs) { - verifyReceipientDB(recipientPrimary, db, coll, false /* shouldMigrate */); +for (const db of [...tenantDBs, ...nonTenantDBs]) { + for (const coll of collNames) { + tenantMigrationTest.verifyReceipientDB(tenantId, db, coll); } } -donorRst.stopSet(); -recipientRst.stopSet(); +tenantMigrationTest.stop(); })(); |