diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2021-04-23 03:07:16 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-23 21:46:58 +0000 |
commit | 8467c08ff3b6945b9e9e66652df31cd8174a42d0 (patch) | |
tree | dd86f77666b234c1ecc2f9668faf2c0805011295 | |
parent | 2882691604a80e36bdea975d7c040c28a55d5715 (diff) | |
download | mongo-8467c08ff3b6945b9e9e66652df31cd8174a42d0.tar.gz |
SERVER-55051: Donor should block non-timestamped reads after migration has committed
(cherry picked from commit 16cb79e780c4101118fdd3253bcacf5d63be886f)
20 files changed, 282 insertions, 80 deletions
diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js index 7b8d8f49b72..0329cbdccce 100644 --- a/jstests/libs/override_methods/inject_tenant_prefix.js +++ b/jstests/libs/override_methods/inject_tenant_prefix.js @@ -237,6 +237,11 @@ function reformatResObjForLogging(resObj) { * object so that only failed operations are retried. */ function modifyCmdObjForRetry(cmdObj, resObj) { + if (!resObj.hasOwnProperty("writeErrors") && ErrorCodes.isTenantMigrationError(resObj.code)) { + // If we get a top level error without writeErrors, retry the entire command. + return; + } + if (cmdObj.insert) { let retryOps = []; if (cmdObj.ordered === false) { diff --git a/jstests/replsets/libs/tenant_migration_recipient_sync_source.js b/jstests/replsets/libs/tenant_migration_recipient_sync_source.js index 594ebfbc68f..13656e4bf3b 100644 --- a/jstests/replsets/libs/tenant_migration_recipient_sync_source.js +++ b/jstests/replsets/libs/tenant_migration_recipient_sync_source.js @@ -23,9 +23,14 @@ const setUpMigrationSyncSourceTest = function() { name: `${jsTestName()}_donor`, nodes: 3, settings: {chainingAllowed: false}, - nodeOptions: - Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, - {setParameter: {tenantMigrationExcludeDonorHostTimeoutMS: 30 * 1000}}), + nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, { + setParameter: { + tenantMigrationExcludeDonorHostTimeoutMS: 30 * 1000, + // Allow non-timestamped reads on donor after migration completes for testing. + 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': + tojson({mode: 'alwaysOn'}), + } + }), }); donorRst.startSet(); donorRst.initiateWithHighElectionTimeout(); diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js index 654b2f88dcf..9f555ec264f 100644 --- a/jstests/replsets/libs/tenant_migration_test.js +++ b/jstests/replsets/libs/tenant_migration_test.js @@ -15,17 +15,22 @@ load("jstests/replsets/libs/tenant_migration_util.js"); * instead, with all nodes running the latest version. * * @param {string} [name] the name of the replica sets + * @param {boolean} [enableRecipientTesting] whether recipient would actually migrate tenant data * @param {Object} [donorRst] the ReplSetTest instance to adopt for the donor * @param {Object} [recipientRst] the ReplSetTest instance to adopt for the recipient * @param {Object} [sharedOptions] an object that can contain 'nodes' <number>, the number of nodes * each RST will contain, and 'setParameter' <object>, an object with various server parameters. + * @param {boolean} [allowDonorReadAfterMigration] whether donor would allow reads after a committed + * migration. */ function TenantMigrationTest({ name = "TenantMigrationTest", enableRecipientTesting = true, donorRst, recipientRst, - sharedOptions = {} + sharedOptions = {}, + // Default this to true so it is easier for data consistency checks. + allowStaleReadsOnDonor = true, }) { const donorPassedIn = (donorRst !== undefined); const recipientPassedIn = (recipientRst !== undefined); @@ -65,6 +70,11 @@ function TenantMigrationTest({ tojson({mode: 'alwaysOn'}); } + if (allowStaleReadsOnDonor) { + setParameterOpts["failpoint.tenantMigrationDonorAllowsNonTimestampedReads"] = + tojson({mode: 'alwaysOn'}); + } + let nodeOptions = isDonor ? migrationX509Options.donor : migrationX509Options.recipient; nodeOptions["setParameter"] = setParameterOpts; diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js index 0ab2c45b09d..cc6ecb5bfb6 100644 --- a/jstests/replsets/libs/tenant_migration_util.js +++ b/jstests/replsets/libs/tenant_migration_util.js @@ -284,6 +284,22 @@ var TenantMigrationUtil = (function() { const donorPrimary = donorRst.getPrimary(); const recipientPrimary = recipientRst.getPrimary(); + // Allows listCollections and listIndexes on donor after migration for consistency checks. + const donorAllowsReadsAfterMigration = + assert + .commandWorked(donorPrimary.adminCommand({ + getParameter: 1, + "failpoint.tenantMigrationDonorAllowsNonTimestampedReads": 1 + }))["failpoint.tenantMigrationDonorAllowsNonTimestampedReads"] + .mode; + // Only turn on the failpoint if it is not already. + if (!donorAllowsReadsAfterMigration) { + assert.commandWorked(donorPrimary.adminCommand({ + configureFailPoint: "tenantMigrationDonorAllowsNonTimestampedReads", + mode: "alwaysOn" + })); + } + // Filter out all dbs that don't belong to the tenant. let combinedDBNames = [...donorPrimary.getDBNames(), ...recipientPrimary.getDBNames()]; combinedDBNames = combinedDBNames.filter( @@ -330,6 +346,14 @@ var TenantMigrationUtil = (function() { } assert(success, 'dbhash mismatch between donor and recipient primaries'); } + + // Reset failpoint on the donor after consistency checks if it wasn't enabled before. + if (!donorAllowsReadsAfterMigration) { + assert.commandWorked(donorPrimary.adminCommand({ + configureFailPoint: "tenantMigrationDonorAllowsNonTimestampedReads", + mode: "off" + })); + } } /** diff --git a/jstests/replsets/tenant_migration_collection_ttl.js b/jstests/replsets/tenant_migration_collection_ttl.js index 0837b9a9477..cfffa252eef 100644 --- a/jstests/replsets/tenant_migration_collection_ttl.js +++ b/jstests/replsets/tenant_migration_collection_ttl.js @@ -23,6 +23,8 @@ const garbageCollectionOpts = { ttlMonitorSleepSecs: 5, // Allow reads on recipient before migration completes for testing. 'failpoint.tenantMigrationRecipientNotRejectReads': tojson({mode: 'alwaysOn'}), + // Allow non-timestamped reads on donor after migration completes for testing. + 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}), }; const tenantMigrationTest = new TenantMigrationTest( diff --git a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js index c119b573da0..dc39487da6a 100644 --- a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js +++ b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js @@ -36,8 +36,10 @@ const donorRst = new ReplSetTest({ nodeOptions: Object.assign(migrationX509Options.donor, { setParameter: { internalInsertMaxBatchSize: - kMaxBatchSize /* Decrease internal max batch size so we can still show writes are + kMaxBatchSize, /* Decrease internal max batch size so we can still show writes are batched without inserting hundreds of documents. */ + // Allow non-timestamped reads on donor after migration completes for testing. + 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}), } }) }); @@ -46,9 +48,9 @@ const recipientRst = new ReplSetTest({ name: 'recipient', nodeOptions: Object.assign(migrationX509Options.recipient, { setParameter: { - internalInsertMaxBatchSize: - kMaxBatchSize /* Decrease internal max batch size so we can still show writes are - batched without inserting hundreds of documents. */ + internalInsertMaxBatchSize: kMaxBatchSize /* Decrease internal max batch size so we can + still show writes are batched without + inserting hundreds of documents. */ }, }) }); @@ -272,9 +274,9 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) { const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationBeforeLeavingBlockingState"); - // The failpoint below is used to ensure that a write to throw TenantMigrationConflict in the op - // observer. Without this failpoint, the migration could have already aborted by the time the - // write gets to the op observer. + // The failpoint below is used to ensure that a write to throw + // TenantMigrationConflict in the op observer. Without this failpoint, the migration + // could have already aborted by the time the write gets to the op observer. const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState"); const migrationThread = new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); @@ -369,8 +371,8 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) { assert.eq(writeErrors.length, 1); assert(writeErrors[0].errmsg); - // The single write error should correspond to the first write after the migration started - // blocking writes. + // The single write error should correspond to the first write after the migration + // started blocking writes. assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize); assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted); @@ -444,8 +446,8 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) { assert.eq(writeErrors.length, 1); assert(writeErrors[0].errmsg); - // The single write error should correspond to the first write after the migration started - // blocking writes. + // The single write error should correspond to the first write after the migration + // started blocking writes. assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize); assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted); @@ -493,9 +495,9 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) { const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationBeforeLeavingBlockingState"); - // The failpoint below is used to ensure that a write to throw TenantMigrationConflict in the op - // observer. Without this failpoint, the migration could have already aborted by the time the - // write gets to the op observer. + // The failpoint below is used to ensure that a write to throw + // TenantMigrationConflict in the op observer. Without this failpoint, the migration + // could have already aborted by the time the write gets to the op observer. const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState"); const migrationThread = new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); @@ -525,8 +527,8 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) { assert.eq(writeErrors.length, 1); assert(writeErrors[0].errmsg); - // The single write error should correspond to the first write after the migration started - // blocking writes. + // The single write error should correspond to the first write after the migration + // started blocking writes. assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize); assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationAborted); diff --git a/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js b/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js index 29a3b9f1638..c512d1567aa 100644 --- a/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js +++ b/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js @@ -19,7 +19,14 @@ load("jstests/libs/uuid_util.js"); load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); -const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); +const tenantMigrationTest = new TenantMigrationTest({ + name: jsTestName(), + sharedOptions: { + setParameter: + // Allow non-timestamped reads on donor after migration completes for testing. + {'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'})} + } +}); if (!tenantMigrationTest.isFeatureFlagEnabled()) { jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); return; diff --git a/jstests/replsets/tenant_migration_external_cluster_validation.js b/jstests/replsets/tenant_migration_external_cluster_validation.js index 7cd80d16a90..6034360588d 100644 --- a/jstests/replsets/tenant_migration_external_cluster_validation.js +++ b/jstests/replsets/tenant_migration_external_cluster_validation.js @@ -52,9 +52,13 @@ const donorRst = new ReplSetTest({ nodes: 2, name: "donor", keyFile: "jstests/libs/key1", - nodeOptions: Object.assign( - x509Options.donor, - {setParameter: {"failpoint.alwaysValidateClientsClusterTime": tojson({mode: "alwaysOn"})}}), + nodeOptions: Object.assign(x509Options.donor, { + setParameter: { + "failpoint.alwaysValidateClientsClusterTime": tojson({mode: "alwaysOn"}), + // Allow non-timestamped reads on donor after migration completes for testing. + 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}), + } + }), }); const recipientRst = new ReplSetTest({ diff --git a/jstests/replsets/tenant_migration_read_your_own_writes.js b/jstests/replsets/tenant_migration_read_your_own_writes.js new file mode 100644 index 00000000000..6eeac579037 --- /dev/null +++ b/jstests/replsets/tenant_migration_read_your_own_writes.js @@ -0,0 +1,122 @@ +/** + * Tests that non-timestamped reads are not allowed on the donor after the migration has committed + * so that we typically provide read-your-own-write guarantees for primary reads across tenant + * migrations when there is no other failover. + * + * @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_eft, + * incompatible_with_windows_tls, incompatible_with_macos, requires_persistence] + */ + +(function() { +"use strict"; + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/libs/uuid_util.js"); +load("jstests/libs/fail_point_util.js"); // For configureFailPoint(). + +const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), allowStaleReadsOnDonor: false}); + +if (!tenantMigrationTest.isFeatureFlagEnabled()) { + jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); + return; +} +const kTenantId = "testTenantId"; +const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDB"); +const kCollName = "testColl"; + +const donorPrimary = tenantMigrationTest.getDonorPrimary(); + +tenantMigrationTest.insertDonorDB(kDbName, kCollName, [...Array(10).keys()].map(x => ({x: x}))); + +const donorDB = donorPrimary.getDB(kDbName); +const cursor = assert + .commandWorked(donorDB.runCommand({ + find: kCollName, + batchSize: 5, + })) + .cursor; +assert.eq(5, cursor.firstBatch.length, tojson(cursor)); +assert.neq(0, cursor.id, tojson(cursor)); +jsTestLog(`Started cursor id ${cursor.id} on the donor before the migration`); + +const migrationId = UUID(); +const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationId), + tenantId: kTenantId, +}; +const migrationRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); +assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted); + +// Test that getMore works after the migration has committed. +jsTestLog(`Testing getMore on cursor id ${cursor.id} on the donor after the migration`); +assert.commandWorked(donorDB.runCommand({getMore: cursor.id, collection: kCollName})); + +// Test that local and majority reads are no longer allowed on the donor. +const testCases = { + find: {command: {find: kCollName}}, + count: {command: {count: kCollName}}, + distinct: {command: {distinct: kCollName, key: "x", query: {}}}, + aggregate: {command: {aggregate: kCollName, pipeline: [{$match: {}}], cursor: {}}}, + mapReduce: { + command: { + mapReduce: kCollName, + map: () => { + emit(this.x, 1); + }, + reduce: (key, value) => { + return 1; + }, + out: {inline: 1} + }, + skipReadConcernMajority: true, + }, + findAndModify: { + command: {findAndModify: kCollName, query: {x: 1}, update: {$set: {x: 1}}}, + skipReadConcernMajority: true, + }, + update: { + // No-op write due to stale read is also not allowed. + command: {update: kCollName, updates: [{q: {x: 1}, u: {$set: {x: 1}}}]}, + skipReadConcernMajority: true, + }, + delete: { + // No-op write due to stale read is also not allowed. + command: {delete: kCollName, deletes: [{q: {x: 100}, limit: 1}]}, + skipReadConcernMajority: true, + }, + listCollections: { + command: {listCollections: 1}, + skipReadConcernMajority: true, + }, + listIndexes: { + command: {listIndexes: kCollName}, + skipReadConcernMajority: true, + }, +}; + +const readConcerns = { + local: {level: "local"}, + majority: {level: "majority"}, +}; + +for (const [testCaseName, testCase] of Object.entries(testCases)) { + for (const [readConcernName, readConcern] of Object.entries(readConcerns)) { + if (testCase.skipReadConcernMajority && readConcernName === "majority") { + continue; + } + jsTest.log(`Testing ${testCaseName} with readConcern ${readConcernName}`); + let cmd = testCase.command; + cmd.readConcern = readConcern; + assert.commandFailedWithCode(donorDB.runCommand(cmd), ErrorCodes.TenantMigrationCommitted); + } +} + +// Enable stale reads on the donor set so that end of test data consistency check can pass. +tenantMigrationTest.getDonorRst().nodes.forEach( + node => assert.commandWorked(node.adminCommand( + {configureFailPoint: "tenantMigrationDonorAllowsNonTimestampedReads", mode: "alwaysOn"}))); + +tenantMigrationTest.stop(); +})(); diff --git a/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js index f33cf722c34..3069c919d33 100644 --- a/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js +++ b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js @@ -16,22 +16,11 @@ load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject() load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/replsets/libs/tenant_migration_util.js"); -// Use a single node replSet to simplify the process. -const donorRst = new ReplSetTest({ - nodes: 1, - name: jsTestName() + "_donor", - nodeOptions: TenantMigrationUtil.makeX509OptionsForTest().donor -}); - -donorRst.startSet(); -donorRst.initiate(); - // Make the batch size small so that we can pause before all the batches are applied. const tenantMigrationTest = new TenantMigrationTest( - {name: jsTestName(), donorRst, sharedOptions: {setParameter: {tenantApplierBatchSizeOps: 2}}}); + {name: jsTestName(), sharedOptions: {setParameter: {tenantApplierBatchSizeOps: 2}}}); if (!tenantMigrationTest.isFeatureFlagEnabled()) { - donorRst.stopSet(); jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); return; } @@ -105,6 +94,5 @@ fpPauseOplogApplier.off(); jsTestLog("Waiting for migration to complete."); assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); -donorRst.stopSet(); tenantMigrationTest.stop(); -})();
\ No newline at end of file +})(); diff --git a/jstests/replsets/tenant_migration_sync_source_too_stale.js b/jstests/replsets/tenant_migration_sync_source_too_stale.js index 07f8c315d77..2096394f3fc 100644 --- a/jstests/replsets/tenant_migration_sync_source_too_stale.js +++ b/jstests/replsets/tenant_migration_sync_source_too_stale.js @@ -10,7 +10,7 @@ * selection until it finds a sync source that is no longer too stale. * * @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_eft, - * incompatible_with_windows_tls] + * incompatible_with_windows_tls, incompatible_with_macos, requires_persistence] */ (function() { @@ -26,7 +26,12 @@ const donorRst = new ReplSetTest({ name: `${jsTestName()}_donor`, nodes: 3, settings: {chainingAllowed: false}, - nodeOptions: TenantMigrationUtil.makeX509OptionsForTest().donor, + nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, { + setParameter: { + // Allow non-timestamped reads on donor after migration completes for testing. + 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}), + } + }), }); donorRst.startSet(); donorRst.initiateWithHighElectionTimeout(); diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h index b2498adfe84..0740d1561af 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_access_blocker.h @@ -66,7 +66,8 @@ public: OperationType operationType) = 0; virtual Status checkIfLinearizableReadWasAllowed(OperationContext* opCtx) = 0; - virtual SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx) = 0; + virtual SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx, + StringData command) = 0; // // Called by index build user threads before acquiring an index build slot, and again right diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp index 4d80a4c3fe5..5af3448504f 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp @@ -130,10 +130,11 @@ TenantMigrationDonorDocument parseDonorStateDocument(const BSONObj& doc) { return donorStateDoc; } -SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, StringData dbName) { +SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, const OpMsgRequest& request) { // We need to check both donor and recipient access blockers in the case where two // migrations happen back-to-back before the old recipient state (from the first // migration) is garbage collected. + auto dbName = request.getDatabase(); auto mtabPair = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .getTenantMigrationAccessBlockerForDbName(dbName); @@ -152,7 +153,7 @@ SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, StringData dbNam std::vector<ExecutorFuture<void>> futures; std::shared_ptr<executor::TaskExecutor> executor; if (donorMtab) { - auto canReadFuture = donorMtab->getCanReadFuture(opCtx); + auto canReadFuture = donorMtab->getCanReadFuture(opCtx, request.getCommandName()); if (canReadFuture.isReady()) { auto status = canReadFuture.getNoThrow(); donorMtab->recordTenantMigrationError(status); @@ -164,7 +165,7 @@ SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, StringData dbNam futures.emplace_back(std::move(canReadFuture).semi().thenRunOn(executor)); } if (recipientMtab) { - auto canReadFuture = recipientMtab->getCanReadFuture(opCtx); + auto canReadFuture = recipientMtab->getCanReadFuture(opCtx, request.getCommandName()); if (canReadFuture.isReady()) { auto status = canReadFuture.getNoThrow(); recipientMtab->recordTenantMigrationError(status); diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.h b/src/mongo/db/repl/tenant_migration_access_blocker_util.h index 6a200163ff5..530632d1b1b 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.h +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.h @@ -53,12 +53,14 @@ std::shared_ptr<TenantMigrationRecipientAccessBlocker> getTenantMigrationRecipie TenantMigrationDonorDocument parseDonorStateDocument(const BSONObj& doc); /** - * If the operation has read concern "snapshot" or includes afterClusterTime, and the database is - * in the read blocking state at the given atClusterTime or afterClusterTime or the selected read - * timestamp, the promise will be set for the returned future when the migration is committed or - * aborted. Note: for better performance, check if the future is immediately ready. + * Checks if a request is allowed to read based on the tenant migration states of this node as a + * donor or as a recipient. TenantMigrationCommitted is returned if the request needs to be + * re-routed to the new owner of the tenant. If the tenant is currently being migrated and the + * request needs to block, a future for when the request is unblocked is returned, and the promise + * will be set for the returned future when the migration is committed or aborted. Note: for better + * performance, check if the future is immediately ready. */ -SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, StringData dbName); +SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, const OpMsgRequest& request); /** * If the operation has read concern "linearizable", throws TenantMigrationCommitted error if the diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp index 1e6e452a08f..cb8ac0a7466 100644 --- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp @@ -47,8 +47,27 @@ namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(tenantMigrationDonorAllowsNonTimestampedReads); + const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); +// Commands that are not allowed to run against the donor after a committed migration so that we +// typically provide read-your-own-write guarantees for primary reads across tenant migrations. +const StringMap<int> commandDenyListAfterMigration = { + {"find", 1}, + {"count", 1}, + {"distinct", 1}, + {"aggregate", 1}, + {"mapReduce", 1}, + {"mapreduce", 1}, + {"findAndModify", 1}, + {"findandmodify", 1}, + {"listCollections", 1}, + {"listIndexes", 1}, + {"update", 1}, + {"delete", 1}, +}; + } // namespace TenantMigrationDonorAccessBlocker::TenantMigrationDonorAccessBlocker( @@ -115,10 +134,10 @@ Status TenantMigrationDonorAccessBlocker::waitUntilCommittedOrAborted(OperationC MONGO_UNREACHABLE; } -SharedSemiFuture<void> TenantMigrationDonorAccessBlocker::getCanReadFuture( - OperationContext* opCtx) { +SharedSemiFuture<void> TenantMigrationDonorAccessBlocker::getCanReadFuture(OperationContext* opCtx, + StringData command) { auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - auto readTimestamp = [opCtx, &readConcernArgs]() -> std::optional<Timestamp> { + auto readTimestamp = [opCtx, &readConcernArgs]() -> boost::optional<Timestamp> { if (auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime()) { return afterClusterTime->asTimestamp(); } @@ -128,20 +147,29 @@ SharedSemiFuture<void> TenantMigrationDonorAccessBlocker::getCanReadFuture( if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { return repl::StorageInterface::get(opCtx)->getPointInTimeReadTimestamp(opCtx); } - return std::nullopt; + return boost::none; }(); + + stdx::lock_guard<Latch> lk(_mutex); if (!readTimestamp) { - return SharedSemiFuture<void>(); + if (!MONGO_unlikely(tenantMigrationDonorAllowsNonTimestampedReads.shouldFail()) && + _state == State::kReject && + commandDenyListAfterMigration.find(command) != commandDenyListAfterMigration.end()) { + LOGV2_DEBUG(5505100, + 1, + "Donor blocking non-timestamped reads after committed migration", + "command"_attr = command, + "tenantId"_attr = _tenantId); + return SharedSemiFuture<void>( + Status(ErrorCodes::TenantMigrationCommitted, + "Read must be re-routed to the new owner of this tenant")); + } else { + return SharedSemiFuture<void>(); + } } - return _getCanDoClusterTimeReadFuture(opCtx, *readTimestamp); -} - -SharedSemiFuture<void> TenantMigrationDonorAccessBlocker::_getCanDoClusterTimeReadFuture( - OperationContext* opCtx, Timestamp readTimestamp) { - stdx::unique_lock<Latch> ul(_mutex); auto canRead = _state == State::kAllow || _state == State::kAborted || - _state == State::kBlockWrites || readTimestamp < *_blockTimestamp; + _state == State::kBlockWrites || *readTimestamp < *_blockTimestamp; if (canRead) { return SharedSemiFuture<void>(); diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h index 290754a567f..405d19bc3a0 100644 --- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h @@ -189,7 +189,7 @@ public: Status waitUntilCommittedOrAborted(OperationContext* opCtx, OperationType operationType) final; Status checkIfLinearizableReadWasAllowed(OperationContext* opCtx) final; - SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx) final; + SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx, StringData command) final; // // Called by index build user threads before acquiring an index build slot, and again right @@ -270,10 +270,6 @@ private: void _onMajorityCommitCommitOpTime(stdx::unique_lock<Latch>& lk); void _onMajorityCommitAbortOpTime(stdx::unique_lock<Latch>& lk); - // Helper for the method 'getCanReadFuture()'. - SharedSemiFuture<void> _getCanDoClusterTimeReadFuture(OperationContext* opCtx, - Timestamp readTimestamp); - ServiceContext* _serviceContext; const std::string _tenantId; const std::string _recipientConnString; diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp index 18040a2f596..d164ca37330 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp @@ -79,7 +79,7 @@ Status TenantMigrationRecipientAccessBlocker::waitUntilCommittedOrAborted( } SharedSemiFuture<void> TenantMigrationRecipientAccessBlocker::getCanReadFuture( - OperationContext* opCtx) { + OperationContext* opCtx, StringData command) { if (MONGO_unlikely(tenantMigrationRecipientNotRejectReads.shouldFail())) { return SharedSemiFuture<void>(); } diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h index 3f622d62039..e06a6a82bf5 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h +++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h @@ -90,7 +90,7 @@ public: Status waitUntilCommittedOrAborted(OperationContext* opCtx, OperationType operationType) final; Status checkIfLinearizableReadWasAllowed(OperationContext* opCtx) final; - SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx) final; + SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx, StringData command) final; // // Called by index build user threads before acquiring an index build slot, and again right diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_access_blocker_test.cpp index e0f1cdf002f..d84cd1a8bf5 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker_test.cpp @@ -145,25 +145,25 @@ TEST_F(TenantMigrationRecipientAccessBlockerTest, StateReject) { // Default read concern. ASSERT_THROWS_CODE( - mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld); + mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld); // Majority read concern. ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern); ASSERT_THROWS_CODE( - mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld); + mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld); // Snapshot read concern. ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kSnapshotReadConcern); opCtx()->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, Timestamp(1, 1)); ASSERT_THROWS_CODE( - mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld); + mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld); // Snapshot read concern with atClusterTime. ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kSnapshotReadConcern); ReadConcernArgs::get(opCtx()).setArgsAtClusterTimeForSnapshot(Timestamp(1, 1)); ASSERT_THROWS_CODE( - mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld); + mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld); } TEST_F(TenantMigrationRecipientAccessBlockerTest, StateRejectBefore) { @@ -194,22 +194,22 @@ TEST_F(TenantMigrationRecipientAccessBlockerTest, StateRejectBefore) { } // Default read concern. - ASSERT_OK(mtab.getCanReadFuture(opCtx()).getNoThrow()); + ASSERT_OK(mtab.getCanReadFuture(opCtx(), "find").getNoThrow()); // Majority read concern. ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern); - ASSERT_OK(mtab.getCanReadFuture(opCtx()).getNoThrow()); + ASSERT_OK(mtab.getCanReadFuture(opCtx(), "find").getNoThrow()); // Snapshot read at a later timestamp. ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kSnapshotReadConcern); ReadConcernArgs::get(opCtx()).setArgsAtClusterTimeForSnapshot(Timestamp(3, 1)); - ASSERT_OK(mtab.getCanReadFuture(opCtx()).getNoThrow()); + ASSERT_OK(mtab.getCanReadFuture(opCtx(), "find").getNoThrow()); // Snapshot read at an earlier timestamp. ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kSnapshotReadConcern); ReadConcernArgs::get(opCtx()).setArgsAtClusterTimeForSnapshot(Timestamp(1, 1)); ASSERT_THROWS_CODE( - mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld); + mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld); } } // namespace repl diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index e2f3e5957e0..84a913cdcd8 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -812,8 +812,8 @@ Future<void> InvokeCommand::run() { auto execContext = _ecd->getExecutionContext(); // TODO SERVER-53761: find out if we can do this more asynchronously. The client // Strand is locked to current thread in ServiceStateMachine::Impl::startNewLoop(). - tenant_migration_access_blocker::checkIfCanReadOrBlock( - execContext->getOpCtx(), execContext->getRequest().getDatabase()) + tenant_migration_access_blocker::checkIfCanReadOrBlock(execContext->getOpCtx(), + execContext->getRequest()) .get(execContext->getOpCtx()); return runCommandInvocation(_ecd->getExecutionContext(), _ecd->getInvocation()); }) @@ -830,8 +830,8 @@ Future<void> CheckoutSessionAndInvokeCommand::run() { auto execContext = _ecd->getExecutionContext(); // TODO SERVER-53761: find out if we can do this more asynchronously. - tenant_migration_access_blocker::checkIfCanReadOrBlock( - execContext->getOpCtx(), execContext->getRequest().getDatabase()) + tenant_migration_access_blocker::checkIfCanReadOrBlock(execContext->getOpCtx(), + execContext->getRequest()) .get(execContext->getOpCtx()); return runCommandInvocation(_ecd->getExecutionContext(), _ecd->getInvocation()); }) |