diff options
author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2020-09-10 22:06:30 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-17 18:42:37 +0000 |
commit | 96f566cef77449cf4dd304840fe770393106fc65 (patch) | |
tree | bb16ad0219c6ca38a214566f539d7be53c8d1047 | |
parent | a7714cfefeb609b30bbf2d9340418ce809d32146 (diff) | |
download | mongo-96f566cef77449cf4dd304840fe770393106fc65.tar.gz |
SERVER-50616 TenantMigrationDonor should retry its steps until success, the donor node is stepping down or shutting down, or the donor gets an error that should lead to an abort decision
-rw-r--r-- | jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js | 38 | ||||
-rw-r--r-- | jstests/replsets/libs/tenant_migration_util.js | 36 | ||||
-rw-r--r-- | jstests/replsets/tenant_migration_donor_failover_and_shutdown.js | 141 | ||||
-rw-r--r-- | jstests/replsets/tenant_migration_donor_retry.js | 278 | ||||
-rw-r--r-- | jstests/replsets/tenant_migration_donor_state_machine.js | 58 | ||||
-rw-r--r-- | jstests/replsets/writes_during_tenant_migration.js | 14 | ||||
-rw-r--r-- | src/mongo/db/commands/tenant_migration_donor_cmds.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 432 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.h | 28 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/util/SConscript | 11 | ||||
-rw-r--r-- | src/mongo/util/future_util.cpp | 59 | ||||
-rw-r--r-- | src/mongo/util/future_util.h | 25 |
14 files changed, 847 insertions, 279 deletions
diff --git a/jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js b/jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js index ee567a13290..9d79de3a0a3 100644 --- a/jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js +++ b/jstests/replsets/conflicting_donor_start_tenant_migration_cmds.js @@ -9,20 +9,7 @@ load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); - -/** - * Starts a tenant migration on the given donor primary according the given migration options. - */ -function startMigration(donorPrimaryHost, migrationOpts) { - const donorPrimary = new Mongo(donorPrimaryHost); - return donorPrimary.adminCommand({ - donorStartMigration: 1, - migrationId: UUID(migrationOpts.migrationIdString), - recipientConnectionString: migrationOpts.recipientConnString, - databasePrefix: migrationOpts.dbPrefix, - readPreference: migrationOpts.readPreference - }); -} +load("jstests/replsets/libs/tenant_migration_util.js"); /** * Asserts that the number of recipientDataSync commands executed on the given recipient primary is @@ -79,8 +66,8 @@ let numRecipientSyncDataCmdSent = 0; readPreference: {mode: "primary"} }; - assert.commandWorked(startMigration(rst0Primary.host, migrationOpts)); - assert.commandWorked(startMigration(rst0Primary.host, migrationOpts)); + assert.commandWorked(TenantMigrationUtil.startMigration(rst0Primary.host, migrationOpts)); + assert.commandWorked(TenantMigrationUtil.startMigration(rst0Primary.host, migrationOpts)); // If the second donorStartMigration had started a duplicate migration, the recipient would have // received four recipientSyncData commands instead of two. @@ -97,8 +84,10 @@ let numRecipientSyncDataCmdSent = 0; readPreference: {mode: "primary"} }; - let migrationThread0 = new Thread(startMigration, rst0Primary.host, migrationOpts); - let migrationThread1 = new Thread(startMigration, rst0Primary.host, migrationOpts); + let migrationThread0 = + new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts); + let migrationThread1 = + new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts); migrationThread0.start(); migrationThread1.start(); @@ -121,9 +110,10 @@ let numRecipientSyncDataCmdSent = 0; */ function testStartingConflictingMigrationAfterInitialMigrationCommitted( donorPrimary, migrationOpts0, migrationOpts1) { - assert.commandWorked(startMigration(donorPrimary.host, migrationOpts0)); - assert.commandFailedWithCode(startMigration(donorPrimary.host, migrationOpts1), - ErrorCodes.ConflictingOperationInProgress); + assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts0)); + assert.commandFailedWithCode( + TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts1), + ErrorCodes.ConflictingOperationInProgress); // If the second donorStartMigration had started a duplicate migration, there would be two donor // state docs. @@ -136,8 +126,10 @@ function testStartingConflictingMigrationAfterInitialMigrationCommitted( * migrations, only one of the migrations will start and succeed. */ function testConcurrentConflictingMigrations(donorPrimary, migrationOpts0, migrationOpts1) { - let migrationThread0 = new Thread(startMigration, rst0Primary.host, migrationOpts0); - let migrationThread1 = new Thread(startMigration, rst0Primary.host, migrationOpts1); + let migrationThread0 = + new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts0); + let migrationThread1 = + new Thread(TenantMigrationUtil.startMigration, rst0Primary.host, migrationOpts1); migrationThread0.start(); migrationThread1.start(); diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js new file mode 100644 index 00000000000..41750650d06 --- /dev/null +++ b/jstests/replsets/libs/tenant_migration_util.js @@ -0,0 +1,36 @@ +/** + * Utilities for testing tenant migrations. + */ +var TenantMigrationUtil = (function() { + // An object that mirrors the access states for the TenantMigrationAccessBlocker. + const accessState = {kAllow: 0, kBlockingWrites: 1, kBlockingReadsAndWrites: 2, kReject: 3}; + + function startMigration(donorPrimaryHost, migrationOpts) { + const donorPrimary = new Mongo(donorPrimaryHost); + return donorPrimary.adminCommand({ + donorStartMigration: 1, + migrationId: UUID(migrationOpts.migrationIdString), + recipientConnectionString: migrationOpts.recipientConnString, + databasePrefix: migrationOpts.dbPrefix, + readPreference: migrationOpts.readPreference + }); + } + + function forgetMigration(donorPrimaryHost, migrationIdString) { + const migrationId = UUID(migrationIdString); + const donorPrimary = new Mongo(donorPrimaryHost); + while (true) { + let res = + donorPrimary.adminCommand({donorForgetMigration: 1, migrationId: migrationId}); + if (res.ok || res.code != ErrorCodes.NoSuchTenantMigration) { + return res; + } + } + } + + return { + accessState, + startMigration, + forgetMigration, + }; +})();
\ No newline at end of file diff --git a/jstests/replsets/tenant_migration_donor_failover_and_shutdown.js b/jstests/replsets/tenant_migration_donor_failover_and_shutdown.js new file mode 100644 index 00000000000..a29ee85c96d --- /dev/null +++ b/jstests/replsets/tenant_migration_donor_failover_and_shutdown.js @@ -0,0 +1,141 @@ +/** + * Tests that the migration is interrupted successfully on stepdown and shutdown. + * + * @tags: [requires_fcv_47, incompatible_with_eft] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallelTester.js"); +load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); + +const kMaxSleepTimeMS = 1000; + +/** + * Runs the donorStartMigration command to start a migration, and interrupts the migration on the + * donor using the 'interruptFunc', and asserts that the command either succeeds or fails with an + * expected error. + */ +function testDonorStartMigrationInterrupt(interruptFunc, isExpectedErrorFunc) { + const donorRst = new ReplSetTest( + {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}}); + const recipientRst = new ReplSetTest({ + nodes: 1, + name: "recipientRst", + nodeOptions: {setParameter: {enableTenantMigrations: true}} + }); + + donorRst.startSet(); + donorRst.initiate(); + + recipientRst.startSet(); + recipientRst.initiate(); + + const donorPrimary = donorRst.getPrimary(); + + const migrationId = UUID(); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + recipientConnString: recipientRst.getURL(), + dbPrefix: "testDbPrefix", + readPreference: {mode: "primary"}, + }; + + let migrationThread = + new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); + migrationThread.start(); + sleep(Math.random() * kMaxSleepTimeMS); + interruptFunc(donorRst); + migrationThread.join(); + + const res = migrationThread.returnData(); + assert(res.ok || isExpectedErrorFunc(res.code), tojson(res)); + + donorRst.stopSet(); + recipientRst.stopSet(); +} + +/** + * Starts a migration and waits for it to commit, then runs the donorForgetMigration, and interrupts + * the donor using the 'interruptFunc', and asserts that the command either succeeds or fails with + * an expected error. + */ +function testDonorForgetMigrationInterrupt(interruptFunc, isExpectedErrorFunc) { + const donorRst = new ReplSetTest( + {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}}); + const recipientRst = new ReplSetTest({ + nodes: 1, + name: "recipientRst", + nodeOptions: {setParameter: {enableTenantMigrations: true}} + }); + + donorRst.startSet(); + donorRst.initiate(); + + recipientRst.startSet(); + recipientRst.initiate(); + + const donorPrimary = donorRst.getPrimary(); + + const migrationId = UUID(); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + recipientConnString: recipientRst.getURL(), + dbPrefix: "testDbPrefix", + readPreference: {mode: "primary"}, + }; + + assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); + let forgetMigrationThread = new Thread( + TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString); + forgetMigrationThread.start(); + sleep(Math.random() * kMaxSleepTimeMS); + interruptFunc(donorRst); + forgetMigrationThread.join(); + + const res = forgetMigrationThread.returnData(); + assert(res.ok || isExpectedErrorFunc(res.code), tojson(res)); + + donorRst.stopSet(); + recipientRst.stopSet(); +} + +(() => { + jsTest.log("Test that the donorStartMigration command is interrupted successfully on stepdown"); + testDonorStartMigrationInterrupt((donorRst) => { + assert.commandWorked( + donorRst.getPrimary().adminCommand({replSetStepDown: 1000, force: true})); + }, (errorCode) => ErrorCodes.isNotPrimaryError(errorCode)); +})(); + +(() => { + jsTest.log("Test that the donorStartMigration command is interrupted successfully on shutdown"); + testDonorStartMigrationInterrupt( + (donorRst) => { + donorRst.stopSet(); + }, + (errorCode) => + ErrorCodes.isNotPrimaryError(errorCode) || ErrorCodes.isShutdownError(errorCode)); +})(); + +(() => { + jsTest.log("Test that the donorForgetMigration is interrupted successfully on stepdown"); + testDonorForgetMigrationInterrupt((donorRst) => { + assert.commandWorked( + donorRst.getPrimary().adminCommand({replSetStepDown: 1000, force: true})); + }, (errorCode) => ErrorCodes.isNotPrimaryError(errorCode)); +})(); + +(() => { + jsTest.log("Test that the donorForgetMigration is interrupted successfully on shutdown"); + testDonorForgetMigrationInterrupt( + (donorRst) => { + donorRst.stopSet(); + }, + (errorCode) => + ErrorCodes.isNotPrimaryError(errorCode) || ErrorCodes.isShutdownError(errorCode)); +})(); +})(); diff --git a/jstests/replsets/tenant_migration_donor_retry.js b/jstests/replsets/tenant_migration_donor_retry.js new file mode 100644 index 00000000000..effedf7495f --- /dev/null +++ b/jstests/replsets/tenant_migration_donor_retry.js @@ -0,0 +1,278 @@ +/** + * Tests that the donor retries its steps until success, or it gets an error that should lead to + * an abort decision. + * + * @tags: [requires_fcv_47, incompatible_with_eft] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallelTester.js"); +load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); + +const kDBPrefix = "testDb"; +let testNum = 0; +const kConfigDonorsNS = "config.tenantMigrationDonors"; + +function makeDbPrefix() { + return kDBPrefix + testNum++; +} + +/** + * Starts a migration from 'donorRst' and 'recipientRst', uses failCommand to force the + * recipientSyncData command to fail with the given 'errorCode', and asserts the donor does not + * retry on that error and aborts the migration. + * + * TODO: This function should be changed to testDonorRetryRecipientSyncDataCmdOnError once there is + * a way to differentiate between local and remote stepdown/shutdown error. + */ +function testMigrationAbortsOnRecipientSyncDataCmdError( + donorRst, recipientRst, errorCode, failMode) { + const donorPrimary = donorRst.getPrimary(); + const recipientPrimary = recipientRst.getPrimary(); + + const migrationId = UUID(); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + recipientConnString: recipientRst.getURL(), + dbPrefix: kDBPrefix + makeDbPrefix(), + readPreference: {mode: "primary"}, + }; + + let fp = configureFailPoint(recipientPrimary, + "failCommand", + { + failInternalCommands: true, + errorCode: errorCode, + failCommands: ["recipientSyncData"], + }, + failMode); + + let migrationThread = + new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); + migrationThread.start(); + + // Verify that the command failed. + const times = failMode.times ? failMode.times : 1; + for (let i = 0; i < times; i++) { + fp.wait(); + } + fp.off(); + + migrationThread.join(); + assert.commandFailedWithCode(migrationThread.returnData(), ErrorCodes.TenantMigrationAborted); + + return migrationId; +} + +/** + * Starts a migration from 'donorRst' and 'recipientRst', uses failCommand to force the + * recipientForgetMigration command to fail with the given 'errorCode', and asserts the donor does + * not retry on that error and aborts the migration. + * + * TODO: This function should be changed to testDonorRetryRecipientForgetMigrationCmdOnError once + * there is a way to differentiate between local and remote stepdown/shutdown error. + */ +function testMigrationAbortsOnRecipientForgetMigrationCmdError(donorRst, recipientRst, errorCode) { + const donorPrimary = donorRst.getPrimary(); + const recipientPrimary = recipientRst.getPrimary(); + const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + + const migrationId = UUID(); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + recipientConnString: recipientRst.getURL(), + dbPrefix: makeDbPrefix(), + readPreference: {mode: "primary"}, + }; + + let fp = configureFailPoint(recipientPrimary, + "failCommand", + { + failInternalCommands: true, + errorCode: errorCode, + failCommands: ["recipientForgetMigration"], + }, + {times: 1}); + + assert.commandWorked(TenantMigrationUtil.startMigration(donorPrimary.host, migrationOpts)); + let forgetMigrationThread = new Thread( + TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString); + forgetMigrationThread.start(); + + // Verify that the initial recipientForgetMigration command failed. + fp.wait(); + + forgetMigrationThread.join(); + assert.commandFailedWithCode(forgetMigrationThread.returnData(), errorCode); + fp.off(); + + const donorStateDoc = configDonorsColl.findOne({_id: migrationId}); + assert.eq("committed", donorStateDoc.state); + assert(donorStateDoc.expireAt); +} + +const donorRst = new ReplSetTest( + {nodes: 1, name: "donorRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}}); +const recipientRst = new ReplSetTest( + {nodes: 1, name: "recipientRst", nodeOptions: {setParameter: {enableTenantMigrations: true}}}); + +donorRst.startSet(); +donorRst.initiate(); + +recipientRst.startSet(); +recipientRst.initiate(); + +const donorPrimary = donorRst.getPrimary(); + +(() => { + jsTest.log( + "Test that the donor does not retry recipientSyncData (to make the recipient start cloning)" + + " on recipient stepdown errors"); + + const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError( + donorRst, recipientRst, ErrorCodes.NotWritablePrimary, {times: 1}); + + const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + assert(!configDonorsColl.findOne({_id: migrationId}).blockTimestamp); + assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state); +})(); + +(() => { + jsTest.log( + "Test that the donor does not retry recipientSyncData (to make the recipient start cloning)" + + " on recipient shutdown errors"); + + const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError( + donorRst, recipientRst, ErrorCodes.ShutdownInProgress, {times: 1}); + + const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + assert(!configDonorsColl.findOne({_id: migrationId}).blockTimestamp); + assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state); +})(); + +(() => { + jsTest.log( + "Test that the donor does not retry recipientSyncData (with returnAfterReachingTimestamp) " + + "on stepdown errors"); + + const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError( + donorRst, recipientRst, ErrorCodes.NotWritablePrimary, {skip: 1}); + + const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + assert(configDonorsColl.findOne({_id: migrationId}).blockTimestamp); + assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state); +})(); + +(() => { + jsTest.log( + "Test that the donor does not retry recipientSyncData (with returnAfterReachingTimestamp) " + + "on recipient shutdown errors"); + + const migrationId = testMigrationAbortsOnRecipientSyncDataCmdError( + donorRst, recipientRst, ErrorCodes.ShutdownInProgress, {skip: 1}); + + const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + assert(configDonorsColl.findOne({_id: migrationId}).blockTimestamp); + assert.eq("aborted", configDonorsColl.findOne({_id: migrationId}).state); +})(); + +(() => { + jsTest.log("Test that the donor does not retry recipientForgetMigration on stepdown errors"); + testMigrationAbortsOnRecipientForgetMigrationCmdError( + donorRst, recipientRst, ErrorCodes.NotWritablePrimary); +})(); + +(() => { + jsTest.log("Test that the donor does not retry recipientForgetMigration on shutdown errors"); + + testMigrationAbortsOnRecipientForgetMigrationCmdError( + donorRst, recipientRst, ErrorCodes.ShutdownInProgress); +})(); + +// Each donor state doc is updated three times throughout the lifetime of a tenant migration: +// - Set the "state" to "blocking" +// - Set the "state" to "commit"/"abort" +// - Set the "expireAt" to make the doc garbage collectable by the TTL index. +const kTotalStateDocUpdates = 3; +const kWriteErrorTimeMS = 50; + +(() => { + jsTest.log("Test that the donor retries state doc insert on retriable errors"); + + const migrationId = UUID(); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + recipientConnString: recipientRst.getURL(), + dbPrefix: makeDbPrefix(), + readPreference: {mode: "primary"}, + }; + + let fp = configureFailPoint(donorPrimary, "failCollectionInserts", { + collectionNS: kConfigDonorsNS, + }); + + let migrationThread = + new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); + migrationThread.start(); + + // Make the insert keep failing for some time. + fp.wait(); + sleep(kWriteErrorTimeMS); + fp.off(); + + migrationThread.join(); + assert.commandWorked(migrationThread.returnData()); + + const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + assert.eq("committed", configDonorsColl.findOne({_id: migrationId}).state); +})(); + +(() => { + jsTest.log("Test that the donor retries state doc update on retriable errors"); + + const migrationId = UUID(); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + recipientConnString: recipientRst.getURL(), + dbPrefix: kDBPrefix + "RetryOnStateDocUpdateError", + readPreference: {mode: "primary"}, + }; + + // Use a random number of skips to fail a random update to config.tenantMigrationDonors. + let fp = configureFailPoint(donorPrimary, + "failCollectionUpdates", + { + collectionNS: kConfigDonorsNS, + }, + {skip: Math.floor(Math.random() * kTotalStateDocUpdates)}); + + let startMigrationThread = + new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); + let forgetMigrationThread = new Thread( + TenantMigrationUtil.forgetMigration, donorPrimary.host, migrationOpts.migrationIdString); + startMigrationThread.start(); + forgetMigrationThread.start(); + + // Make the update keep failing for some time. + fp.wait(); + sleep(kWriteErrorTimeMS); + fp.off(); + + startMigrationThread.join(); + forgetMigrationThread.join(); + assert.commandWorked(startMigrationThread.returnData()); + assert.commandWorked(forgetMigrationThread.returnData()); + + const configDonorsColl = donorPrimary.getCollection(kConfigDonorsNS); + const donorStateDoc = configDonorsColl.findOne({_id: migrationId}); + assert.eq("committed", donorStateDoc.state); + assert(donorStateDoc.expireAt); +})(); + +donorRst.stopSet(); +recipientRst.stopSet(); +})(); diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js index 48787ac019a..0404c528f59 100644 --- a/jstests/replsets/tenant_migration_donor_state_machine.js +++ b/jstests/replsets/tenant_migration_donor_state_machine.js @@ -16,6 +16,7 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); let expectedNumRecipientSyncDataCmdSent = 0; let expectedNumRecipientForgetMigrationCmdSent = 0; @@ -51,14 +52,6 @@ function testDonorForgetMigration(donorRst, recipientRst, migrationId, dbPrefix) assert.eq(Object.keys(donorRecipientMonitorPoolStats).length, 0); } -// An object that mirrors the access states for the TenantMigrationAccessBlocker. -const accessState = { - kAllow: 0, - kBlockingWrites: 1, - kBlockingReadsAndWrites: 2, - kReject: 3 -}; - const donorRst = new ReplSetTest({ nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], name: "donor", @@ -97,23 +90,15 @@ configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0}); (() => { jsTest.log("Test the case where the migration commits"); const migrationId = UUID(); - - function startMigration(host, recipientConnString, dbPrefix, migrationIdString) { - const primary = new Mongo(host); - assert.commandWorked(primary.adminCommand({ - donorStartMigration: 1, - migrationId: UUID(migrationIdString), - recipientConnectionString: recipientConnString, - databasePrefix: dbPrefix, - readPreference: {mode: "primary"} - })); - } - - let migrationThread = new Thread(startMigration, - donorPrimary.host, - kRecipientConnString, - kDBPrefix, - extractUUIDFromObject(migrationId)); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + recipientConnString: recipientRst.getURL(), + dbPrefix: kDBPrefix, + readPreference: {mode: "primary"}, + }; + + let migrationThread = + new Thread(TenantMigrationUtil.startMigration, donorPrimary.host, migrationOpts); let blockingFp = configureFailPoint(donorPrimary, "pauseTenantMigrationAfterBlockingStarts"); migrationThread.start(); @@ -121,7 +106,7 @@ configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0}); blockingFp.wait(); let mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - assert.eq(mtab[kDBPrefix].access, accessState.kBlockingReadsAndWrites); + assert.eq(mtab[kDBPrefix].access, TenantMigrationUtil.accessState.kBlockingReadsAndWrites); assert(mtab[kDBPrefix].blockTimestamp); let donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix}); @@ -134,16 +119,18 @@ configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0}); blockingFp.off(); migrationThread.join(); - mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - assert.eq(mtab[kDBPrefix].access, accessState.kReject); - assert(mtab[kDBPrefix].commitOrAbortOpTime); - donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix}); let commitOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc}); assert.eq(donorDoc.state, "committed"); assert.eq(donorDoc.commitOrAbortOpTime.ts, commitOplogEntry.ts); + assert.soon(() => { + mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; + return mtab[kDBPrefix].access === TenantMigrationUtil.accessState.kReject; + }); + assert(mtab[kDBPrefix].commitOrAbortOpTime); + expectedNumRecipientSyncDataCmdSent += 2; const recipientSyncDataMetrics = recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData; @@ -168,16 +155,19 @@ configDonorsColl.createIndex({expireAt: 1}, {expireAfterSeconds: 0}); ErrorCodes.TenantMigrationAborted); abortFp.off(); - const mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; - assert.eq(mtab[kDBPrefix].access, accessState.kAllow); - assert(!mtab[kDBPrefix].commitOrAbortOpTime); - const donorDoc = configDonorsColl.findOne({databasePrefix: kDBPrefix}); const abortOplogEntry = donorPrimary.getDB("local").oplog.rs.findOne({ns: kConfigDonorsNS, op: "u", o: donorDoc}); assert.eq(donorDoc.state, "aborted"); assert.eq(donorDoc.commitOrAbortOpTime.ts, abortOplogEntry.ts); + let mtab; + assert.soon(() => { + mtab = donorPrimary.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; + return mtab[kDBPrefix].access === TenantMigrationUtil.accessState.kAllow; + }); + assert(!mtab[kDBPrefix].commitOrAbortOpTime); + expectedNumRecipientSyncDataCmdSent += 2; const recipientSyncDataMetrics = recipientPrimary.adminCommand({serverStatus: 1}).metrics.commands.recipientSyncData; diff --git a/jstests/replsets/writes_during_tenant_migration.js b/jstests/replsets/writes_during_tenant_migration.js index 33f1c97b878..e56b12dbd05 100644 --- a/jstests/replsets/writes_during_tenant_migration.js +++ b/jstests/replsets/writes_during_tenant_migration.js @@ -14,6 +14,7 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); const donorRst = new ReplSetTest( {nodes: 1, name: 'donor', nodeOptions: {setParameter: {enableTenantMigrations: true}}}); @@ -260,6 +261,15 @@ function testWriteIsAcceptedIfSentAfterMigrationHasAborted(testCase, testOpts) { ErrorCodes.TenantMigrationAborted); abortFp.off(); + // Wait until the in-memory migration state is updated after the migration has majority + // committed the abort decision. Otherwise, the command below is expected to block and then get + // rejected. + assert.soon(() => { + const mtab = + testOpts.primaryDB.adminCommand({serverStatus: 1}).tenantMigrationAccessBlocker; + return mtab[dbPrefix].access === TenantMigrationUtil.accessState.kAllow; + }); + runCommand(testOpts); testCase.assertCommandSucceeded(testOpts.primaryDB, testOpts.dbName, testOpts.collName); } @@ -327,7 +337,7 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te * Tests that the donor blocks writes that are executed in the blocking state and rejects them after * the migration aborts. */ -function testBlockedReadGetsUnblockedAndRejectedIfMigrationAborts(testCase, testOpts) { +function testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts(testCase, testOpts) { let blockingFp = configureFailPoint(testOpts.primaryDB, "pauseTenantMigrationAfterBlockingStarts"); let abortFp = configureFailPoint(testOpts.primaryDB, "abortTenantMigrationAfterBlockingStarts"); @@ -844,7 +854,7 @@ const testFuncs = { inAborted: testWriteIsAcceptedIfSentAfterMigrationHasAborted, inBlocking: testWriteBlocksIfMigrationIsInBlocking, inBlockingThenCommitted: testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits, - inBlockingThenAborted: testBlockedReadGetsUnblockedAndRejectedIfMigrationAborts + inBlockingThenAborted: testBlockedWriteGetsUnblockedAndRejectedIfMigrationAborts }; for (const [testName, testFunc] of Object.entries(testFuncs)) { diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp index 5478df86f57..0cf8d415ac9 100644 --- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp +++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp @@ -88,7 +88,7 @@ public: TenantMigrationDonorService::Instance::getOrCreate(donorService, donorStateDoc); uassertStatusOK(donor->checkIfOptionsConflict(donorStateDoc)); - donor->getDecisionFuture().get(); + donor->getDecisionFuture().get(opCtx); } void doCheckAuthorization(OperationContext* opCtx) const {} @@ -182,7 +182,7 @@ public: donor); donor.get().get()->onReceiveDonorForgetMigration(); - donor.get().get()->getCompletionFuture().get(); + donor.get().get()->getCompletionFuture().get(opCtx); } private: diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 99b7db99e5f..c4d20c00f6a 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1328,6 +1328,7 @@ env.Library( 'tenant_migration_donor_service.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/util/future_util', 'primary_only_service', 'repl_server_parameters', 'tenant_migration_donor', diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index a7bc9ad78b4..1fe5e546783 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -45,6 +45,7 @@ #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/logv2/log.h" +#include "mongo/util/future_util.h" namespace mongo { @@ -62,6 +63,21 @@ std::shared_ptr<TenantMigrationAccessBlocker> getTenantMigrationAccessBlocker( .getTenantMigrationAccessBlockerForDbPrefix(dbPrefix); } +bool shouldStopInsertingDonorStateDoc(Status status) { + return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress || + ErrorCodes::isShutdownError(status) || ErrorCodes::isNotPrimaryError(status); +} + +bool shouldStopUpdatingDonorStateDoc(Status status) { + return status.isOK() || ErrorCodes::isShutdownError(status) || + ErrorCodes::isNotPrimaryError(status); +} + +bool shouldStopSendingRecipientCommand(Status status) { + return status.isOK() || ErrorCodes::isShutdownError(status) || + ErrorCodes::isNotPrimaryError(status); +} + } // namespace TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext, @@ -113,170 +129,212 @@ void TenantMigrationDonorService::Instance::interrupt(Status status) { } } -repl::OpTime TenantMigrationDonorService::Instance::_insertStateDocument() { - const auto stateDocBson = _stateDoc.toBSON(); - - auto opCtxHolder = cc().makeOperationContext(); - auto opCtx = opCtxHolder.get(); - DBDirectClient dbClient(opCtx); - - const auto commandResponse = dbClient.runCommand([&] { - write_ops::Insert insertOp(_stateDocumentsNS); - insertOp.setDocuments({stateDocBson}); - return insertOp.serialize({}); - }()); - const auto commandReply = commandResponse->getCommandReply(); - uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); - - return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); +ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDocument( + std::shared_ptr<executor::ScopedTaskExecutor> executor) { + return AsyncTry([this] { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + DBDirectClient dbClient(opCtx); + + auto commandResponse = dbClient.runCommand([&] { + write_ops::Update updateOp(_stateDocumentsNS); + auto updateModification = + write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON()); + write_ops::UpdateOpEntry updateEntry( + BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()), + updateModification); + updateEntry.setMulti(false); + updateEntry.setUpsert(true); + updateOp.setUpdates({updateEntry}); + + return updateOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + }) + .until([](StatusWith<repl::OpTime> swOpTime) { + return shouldStopInsertingDonorStateDoc(swOpTime.getStatus()); + }) + .on(**executor); } -repl::OpTime TenantMigrationDonorService::Instance::_updateStateDocument( +ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDocument( + std::shared_ptr<executor::ScopedTaskExecutor> executor, const TenantMigrationDonorStateEnum nextState) { - boost::optional<repl::OpTime> updateOpTime; - - auto opCtxHolder = cc().makeOperationContext(); - auto opCtx = opCtxHolder.get(); - - uassertStatusOK( - writeConflictRetry(opCtx, "updateStateDoc", _stateDocumentsNS.ns(), [&]() -> Status { - AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); - - if (!collection) { - return Status(ErrorCodes::NamespaceNotFound, - str::stream() << _stateDocumentsNS.ns() << " does not exist"); - } - - WriteUnitOfWork wuow(opCtx); - - const auto originalStateDocBson = _stateDoc.toBSON(); - const auto originalRecordId = Helpers::findOne( - opCtx, collection.getCollection(), originalStateDocBson, false /* requireIndex */); - const auto originalSnapshot = - Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson); - invariant(!originalRecordId.isNull()); - - // Reserve an opTime for the write. - auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; - - // Update the state. - _stateDoc.setState(nextState); - switch (nextState) { - case TenantMigrationDonorStateEnum::kBlocking: - _stateDoc.setBlockTimestamp(oplogSlot.getTimestamp()); - break; - case TenantMigrationDonorStateEnum::kCommitted: - case TenantMigrationDonorStateEnum::kAborted: - _stateDoc.setCommitOrAbortOpTime(oplogSlot); - break; - default: - MONGO_UNREACHABLE; - } - const auto updatedStateDocBson = _stateDoc.toBSON(); - - CollectionUpdateArgs args; - args.criteria = BSON("_id" << _stateDoc.getId()); - args.oplogSlot = oplogSlot; - args.update = updatedStateDocBson; - - collection->updateDocument(opCtx, - originalRecordId, - originalSnapshot, - updatedStateDocBson, - false, - nullptr /* OpDebug* */, - &args); - - wuow.commit(); - - updateOpTime = oplogSlot; - return Status::OK(); - })); - - invariant(updateOpTime); - return updateOpTime.get(); + const auto originalStateDocBson = _stateDoc.toBSON(); + + return AsyncTry([this, executor, nextState, originalStateDocBson] { + boost::optional<repl::OpTime> updateOpTime; + + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + uassertStatusOK(writeConflictRetry( + opCtx, "updateStateDoc", _stateDocumentsNS.ns(), [&]() -> Status { + AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); + if (!collection) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() + << _stateDocumentsNS.ns() << " does not exist"); + } + + WriteUnitOfWork wuow(opCtx); + + const auto originalRecordId = Helpers::findOne(opCtx, + collection.getCollection(), + originalStateDocBson, + false /* requireIndex */); + const auto originalSnapshot = Snapshotted<BSONObj>( + opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson); + invariant(!originalRecordId.isNull()); + + // Reserve an opTime for the write. + auto oplogSlot = + repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; + + // Update the state. + _stateDoc.setState(nextState); + switch (nextState) { + case TenantMigrationDonorStateEnum::kBlocking: + _stateDoc.setBlockTimestamp(oplogSlot.getTimestamp()); + break; + case TenantMigrationDonorStateEnum::kCommitted: + case TenantMigrationDonorStateEnum::kAborted: + _stateDoc.setCommitOrAbortOpTime(oplogSlot); + break; + default: + MONGO_UNREACHABLE; + } + const auto updatedStateDocBson = _stateDoc.toBSON(); + + CollectionUpdateArgs args; + args.criteria = BSON("_id" << _stateDoc.getId()); + args.oplogSlot = oplogSlot; + args.update = updatedStateDocBson; + + collection->updateDocument(opCtx, + originalRecordId, + originalSnapshot, + updatedStateDocBson, + false, + nullptr /* OpDebug* */, + &args); + + wuow.commit(); + + updateOpTime = oplogSlot; + return Status::OK(); + })); + + invariant(updateOpTime); + return updateOpTime.get(); + }) + .until([](StatusWith<repl::OpTime> swOpTime) { + return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus()); + }) + .on(**executor); } -repl::OpTime TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable() { - auto opCtxHolder = cc().makeOperationContext(); - auto opCtx = opCtxHolder.get(); - DBDirectClient dbClient(opCtx); - - _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + - Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); - - auto commandResponse = dbClient.runCommand([&] { - write_ops::Update updateOp(_stateDocumentsNS); - auto updateModification = - write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON()); - write_ops::UpdateOpEntry updateEntry( - BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()), - updateModification); - updateEntry.setMulti(false); - updateEntry.setUpsert(false); - updateOp.setUpdates({updateEntry}); - - return updateOp.serialize({}); - }()); - - const auto commandReply = commandResponse->getCommandReply(); - uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); - - return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); +ExecutorFuture<repl::OpTime> +TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable( + std::shared_ptr<executor::ScopedTaskExecutor> executor) { + return AsyncTry([this] { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + DBDirectClient dbClient(opCtx); + + _stateDoc.setExpireAt( + _serviceContext->getFastClockSource()->now() + + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); + + auto commandResponse = dbClient.runCommand([&] { + write_ops::Update updateOp(_stateDocumentsNS); + auto updateModification = + write_ops::UpdateModification::parseFromClassicUpdate(_stateDoc.toBSON()); + write_ops::UpdateOpEntry updateEntry( + BSON(TenantMigrationDonorDocument::kIdFieldName << _stateDoc.getId()), + updateModification); + updateEntry.setMulti(false); + updateEntry.setUpsert(false); + updateOp.setUpdates({updateEntry}); + + return updateOp.serialize({}); + }()); + + const auto commandReply = commandResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); + + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + }) + .until([](StatusWith<repl::OpTime> swOpTime) { + return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus()); + }) + .on(**executor); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime) { + std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime) { return WaitForMajorityService::get(_serviceContext) .waitUntilMajority(std::move(opTime)) .thenRunOn(**executor); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient( - OperationContext* opCtx, - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - RemoteCommandTargeter* recipientTargeter, + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, const BSONObj& cmdObj) { - HostAndPort recipientHost = - uassertStatusOK(recipientTargeter->findHost(opCtx, ReadPreferenceSetting())); - - executor::RemoteCommandRequest request(recipientHost, - NamespaceString::kAdminDb.toString(), - std::move(cmdObj), - rpc::makeEmptyMetadata(), - nullptr, - kRecipientSyncDataTimeout); - - auto recipientSyncDataResponsePF = - makePromiseFuture<executor::TaskExecutor::RemoteCommandCallbackArgs>(); - auto promisePtr = std::make_shared<Promise<executor::TaskExecutor::RemoteCommandCallbackArgs>>( - std::move(recipientSyncDataResponsePF.promise)); - - auto scheduleResult = - (**executor)->scheduleRemoteCommand(std::move(request), [promisePtr](const auto& args) { - promisePtr->emplaceValue(args); - }); - - if (!scheduleResult.isOK()) { - // Since the command failed to be scheduled, the callback above did not and will not run. - // Thus, it is safe to fulfill the promise here without worrying about synchronizing access - // with the executor's thread. - promisePtr->setError(scheduleResult.getStatus()); - } - - return std::move(recipientSyncDataResponsePF.future) - .thenRunOn(**executor) - .then([this](auto args) -> Status { - if (!args.response.status.isOK()) { - return args.response.status; - } - return getStatusFromCommandResult(args.response.data); - }); + return AsyncTry([this, executor, recipientTargeterRS, cmdObj] { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + HostAndPort recipientHost = + uassertStatusOK(recipientTargeterRS->findHost(opCtx, ReadPreferenceSetting())); + + executor::RemoteCommandRequest request(recipientHost, + NamespaceString::kAdminDb.toString(), + std::move(cmdObj), + rpc::makeEmptyMetadata(), + nullptr, + kRecipientSyncDataTimeout); + + auto recipientSyncDataResponsePF = + makePromiseFuture<executor::TaskExecutor::RemoteCommandCallbackArgs>(); + auto promisePtr = + std::make_shared<Promise<executor::TaskExecutor::RemoteCommandCallbackArgs>>( + std::move(recipientSyncDataResponsePF.promise)); + + auto scheduleResult = + (**executor) + ->scheduleRemoteCommand(std::move(request), [promisePtr](const auto& args) { + promisePtr->emplaceValue(args); + }); + + if (!scheduleResult.isOK()) { + // Since the command failed to be scheduled, the callback above did not and will + // not run. Thus, it is safe to fulfill the promise here without worrying about + // synchronizing access with the executor's thread. + promisePtr->setError(scheduleResult.getStatus()); + } + + return std::move(recipientSyncDataResponsePF.future) + .thenRunOn(**executor) + .then([this](auto args) -> Status { + if (!args.response.status.isOK()) { + return args.response.status; + } + return getStatusFromCommandResult(args.response.data); + }); + }) + .until([](Status status) { return shouldStopSendingRecipientCommand(status); }) + .on(**executor); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - RemoteCommandTargeter* recipientTargeter) { + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) { if (skipSendingRecipientSyncDataCommand.shouldFail()) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -295,18 +353,14 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa return request.toBSON(BSONObj()); }()); - return _sendCommandToRecipient(opCtx, executor, recipientTargeter, cmdObj); + return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - RemoteCommandTargeter* recipientTargeter) { - auto opCtxHolder = cc().makeOperationContext(); - auto opCtx = opCtxHolder.get(); - - return _sendCommandToRecipient(opCtx, - executor, - recipientTargeter, + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) { + return _sendCommandToRecipient(executor, + recipientTargeterRS, RecipientForgetMigration(_stateDoc.getId()).toBSON(BSONObj())); } @@ -314,7 +368,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { auto recipientUri = uassertStatusOK(MongoURI::parse(_stateDoc.getRecipientConnectionString().toString())); - auto recipientTargeter = std::shared_ptr<RemoteCommandTargeterRS>( + auto recipientTargeterRS = std::shared_ptr<RemoteCommandTargeterRS>( new RemoteCommandTargeterRS(recipientUri.getSetName(), recipientUri.getServers()), [this, setName = recipientUri.getSetName()](RemoteCommandTargeterRS* p) { ReplicaSetMonitor::remove(setName); @@ -324,11 +378,12 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( return ExecutorFuture<void>(**executor) .then([this, executor] { // Enter "dataSync" state. - const auto opTime = _insertStateDocument(); - return _waitForMajorityWriteConcern(executor, std::move(opTime)); + return _insertStateDocument(executor).then([this, executor](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime)); + }); }) - .then([this, executor, recipientTargeter] { - return _sendRecipientSyncDataCommand(executor, recipientTargeter.get()); + .then([this, executor, recipientTargeterRS] { + return _sendRecipientSyncDataCommand(executor, recipientTargeterRS); }) .then([this, executor] { // Enter "blocking" state. @@ -336,11 +391,13 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix()); invariant(mtab); mtab->startBlockingWrites(); - const auto opTime = _updateStateDocument(TenantMigrationDonorStateEnum::kBlocking); - return _waitForMajorityWriteConcern(executor, std::move(opTime)); + return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kBlocking) + .then([this, executor](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime)); + }); }) - .then([this, executor, recipientTargeter] { - return _sendRecipientSyncDataCommand(executor, recipientTargeter.get()); + .then([this, executor, recipientTargeterRS] { + return _sendRecipientSyncDataCommand(executor, recipientTargeterRS); }) .then([this] { auto opCtxHolder = cc().makeOperationContext(); @@ -360,24 +417,25 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( uasserted(ErrorCodes::InternalError, "simulate a tenant migration error"); }); }) - .then([this] { + .then([this, executor] { // Enter "commit" state. - _updateStateDocument(TenantMigrationDonorStateEnum::kCommitted); - auto mtab = - getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix()); - invariant(mtab); - return mtab->onCompletion(); + return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kCommitted) + .then([this, executor](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime)); + }); }) .onError([this, executor](Status status) { auto mtab = getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix()); if (status == ErrorCodes::ConflictingOperationInProgress || !mtab) { - return SharedSemiFuture<void>(status); + return ExecutorFuture<void>(**executor, status); } else { // Enter "abort" state. _abortReason.emplace(status); - _updateStateDocument(TenantMigrationDonorStateEnum::kAborted); - return mtab->onCompletion(); + return _updateStateDocument(executor, TenantMigrationDonorStateEnum::kAborted) + .then([this, executor](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime)); + }); } }) .onCompletion([this](Status status) { @@ -396,14 +454,18 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } if (status.isOK()) { - _decisionPromise.emplaceValue(); - } else { + // The migration commited or aborted successfully. if (_abortReason) { - status.addContext(str::stream() - << "Tenant migration with id \"" << _stateDoc.getId() - << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix() - << "\" aborted due to " << _abortReason); + _decisionPromise.setError( + {ErrorCodes::TenantMigrationAborted, + str::stream() << "Tenant migration with id \"" << _stateDoc.getId() + << "\" and dbPrefix \"" << _stateDoc.getDatabasePrefix() + << "\" aborted due to " << _abortReason}); + } else { + _decisionPromise.emplaceValue(); } + } else { + // There was a conflicting migration or this node is stepping down or shutting down. _decisionPromise.setError(status); } }) @@ -412,13 +474,15 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( return _receiveDonorForgetMigrationPromise.getFuture(); }) .then([this, executor] { - const auto opTime = _markStateDocumentAsGarbageCollectable(); - return _waitForMajorityWriteConcern(executor, std::move(opTime)); + return _markStateDocumentAsGarbageCollectable(executor).then( + [this, executor](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime)); + }); }) - .then([this, executor, recipientTargeter] { - return _sendRecipientForgetMigrationCommand(executor, recipientTargeter.get()); + .then([this, executor, recipientTargeterRS] { + return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS); }) - .onCompletion([this, executor](Status status) { + .onCompletion([this](Status status) { LOGV2(4920400, "Marked migration state as garbage collectable", "migrationId"_attr = _stateDoc.getId(), diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 97ad6b3f4dc..af1409ab0db 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -100,7 +100,8 @@ public: * Inserts the state document to _stateDocumentsNS and returns the opTime for the insert * oplog entry. */ - repl::OpTime _insertStateDocument(); + ExecutorFuture<repl::OpTime> _insertStateDocument( + std::shared_ptr<executor::ScopedTaskExecutor> executor); /** * Updates the state document to have the given state. Then, persists the updated document @@ -108,41 +109,44 @@ public: * commitOrAbortTimestamp depending on the state. Returns the opTime for the update oplog * entry. */ - repl::OpTime _updateStateDocument(const TenantMigrationDonorStateEnum nextState); + ExecutorFuture<repl::OpTime> _updateStateDocument( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const TenantMigrationDonorStateEnum nextState); /** - * Sets the "expireAt" time for the state document to be garbage collected. + * Sets the "expireAt" time for the state document to be garbage collected, and returns the + * the opTime for the write. */ - repl::OpTime _markStateDocumentAsGarbageCollectable(); + ExecutorFuture<repl::OpTime> _markStateDocumentAsGarbageCollectable( + std::shared_ptr<executor::ScopedTaskExecutor> executor); /** * Waits for given opTime to be majority committed. */ ExecutorFuture<void> _waitForMajorityWriteConcern( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, repl::OpTime opTime); + std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime); /** * Sends the given command to the recipient replica set. */ ExecutorFuture<void> _sendCommandToRecipient( - OperationContext* opCtx, - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - RemoteCommandTargeter* recipientTargeter, + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, const BSONObj& cmdObj); /** * Sends the recipientSyncData command to the recipient replica set. */ ExecutorFuture<void> _sendRecipientSyncDataCommand( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - RemoteCommandTargeter* recipientTargeter); + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS); /** * Sends the recipientForgetMigration command to the recipient replica set. */ ExecutorFuture<void> _sendRecipientForgetMigrationCommand( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - RemoteCommandTargeter* recipientTargeter); + std::shared_ptr<executor::ScopedTaskExecutor> executor, + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS); ServiceContext* _serviceContext; diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 94331d29d1f..8b7b0e82f39 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -123,6 +123,7 @@ env.Library( '$BUILD_DIR/mongo/db/rs_local_client', '$BUILD_DIR/mongo/db/session_catalog', '$BUILD_DIR/mongo/idl/server_parameter', + '$BUILD_DIR/mongo/util/future_util', 'resharding_util', ], ) diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index a2df539fb88..fd6f45ef4ef 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -510,6 +510,16 @@ env.Benchmark( ], ) +env.Library( + target='future_util', + source=[ + 'future_util.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/executor/task_executor_interface' + ], +) + env.Benchmark( target='hash_table_bm', source='hash_table_bm.cpp', @@ -672,6 +682,7 @@ icuEnv.CppUnitTest( 'diagnostic_info' if get_option('use-diagnostic-latches') == 'on' else [], 'dns_query', 'fail_point', + 'future_util', 'icu', 'latch_analyzer' if get_option('use-diagnostic-latches') == 'on' else [], 'md5', diff --git a/src/mongo/util/future_util.cpp b/src/mongo/util/future_util.cpp new file mode 100644 index 00000000000..72517448cc8 --- /dev/null +++ b/src/mongo/util/future_util.cpp @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/util/future_util.h" + +namespace mongo { + +ExecutorFuture<void> sleepUntil(std::shared_ptr<executor::TaskExecutor> executor, + const Date_t& date) { + auto [promise, future] = makePromiseFuture<void>(); + auto taskCompletionPromise = std::make_shared<Promise<void>>(std::move(promise)); + + auto scheduledWorkHandle = executor->scheduleWorkAt( + date, [taskCompletionPromise](const executor::TaskExecutor::CallbackArgs& args) mutable { + if (args.status.isOK()) { + taskCompletionPromise->emplaceValue(); + } else { + taskCompletionPromise->setError(args.status); + } + }); + + if (!scheduledWorkHandle.isOK()) { + taskCompletionPromise->setError(scheduledWorkHandle.getStatus()); + } + return std::move(future).thenRunOn(executor); +} + +ExecutorFuture<void> sleepFor(std::shared_ptr<executor::TaskExecutor> executor, + Milliseconds duration) { + return sleepUntil(executor, executor->now() + duration); +} + +} // namespace mongo diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h index 6c1d71ac134..06f035f8ef6 100644 --- a/src/mongo/util/future_util.h +++ b/src/mongo/util/future_util.h @@ -28,6 +28,7 @@ */ #pragma once +#include "mongo/executor/task_executor.h" #include "mongo/util/future.h" namespace mongo { @@ -36,32 +37,12 @@ namespace mongo { * Returns a future which will be fulfilled at the given date. */ ExecutorFuture<void> sleepUntil(std::shared_ptr<executor::TaskExecutor> executor, - const Date_t& date) { - auto [promise, future] = makePromiseFuture<void>(); - auto taskCompletionPromise = std::make_shared<Promise<void>>(std::move(promise)); - - auto scheduledWorkHandle = executor->scheduleWorkAt( - date, [taskCompletionPromise](const executor::TaskExecutor::CallbackArgs& args) mutable { - if (args.status.isOK()) { - taskCompletionPromise->emplaceValue(); - } else { - taskCompletionPromise->setError(args.status); - } - }); - - if (!scheduledWorkHandle.isOK()) { - taskCompletionPromise->setError(scheduledWorkHandle.getStatus()); - } - return std::move(future).thenRunOn(executor); -} - + const Date_t& date); /** * Returns a future which will be fulfilled after the given duration. */ ExecutorFuture<void> sleepFor(std::shared_ptr<executor::TaskExecutor> executor, - Milliseconds duration) { - return sleepUntil(executor, executor->now() + duration); -} + Milliseconds duration); namespace future_util_details { |