summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2021-04-23 03:07:16 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-23 21:46:58 +0000
commit8467c08ff3b6945b9e9e66652df31cd8174a42d0 (patch)
treedd86f77666b234c1ecc2f9668faf2c0805011295
parent2882691604a80e36bdea975d7c040c28a55d5715 (diff)
downloadmongo-8467c08ff3b6945b9e9e66652df31cd8174a42d0.tar.gz
SERVER-55051: Donor should block non-timestamped reads after migration has committed
(cherry picked from commit 16cb79e780c4101118fdd3253bcacf5d63be886f)
-rw-r--r--jstests/libs/override_methods/inject_tenant_prefix.js5
-rw-r--r--jstests/replsets/libs/tenant_migration_recipient_sync_source.js11
-rw-r--r--jstests/replsets/libs/tenant_migration_test.js12
-rw-r--r--jstests/replsets/libs/tenant_migration_util.js24
-rw-r--r--jstests/replsets/tenant_migration_collection_ttl.js2
-rw-r--r--jstests/replsets/tenant_migration_concurrent_bulk_writes.js34
-rw-r--r--jstests/replsets/tenant_migration_concurrent_writes_on_donor.js9
-rw-r--r--jstests/replsets/tenant_migration_external_cluster_validation.js10
-rw-r--r--jstests/replsets/tenant_migration_read_your_own_writes.js122
-rw-r--r--jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js16
-rw-r--r--jstests/replsets/tenant_migration_sync_source_too_stale.js9
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker.h3
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker_util.cpp7
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker_util.h12
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp52
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_access_blocker.h6
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp2
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_access_blocker.h2
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_access_blocker_test.cpp16
-rw-r--r--src/mongo/db/service_entry_point_common.cpp8
20 files changed, 282 insertions, 80 deletions
diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js
index 7b8d8f49b72..0329cbdccce 100644
--- a/jstests/libs/override_methods/inject_tenant_prefix.js
+++ b/jstests/libs/override_methods/inject_tenant_prefix.js
@@ -237,6 +237,11 @@ function reformatResObjForLogging(resObj) {
* object so that only failed operations are retried.
*/
function modifyCmdObjForRetry(cmdObj, resObj) {
+ if (!resObj.hasOwnProperty("writeErrors") && ErrorCodes.isTenantMigrationError(resObj.code)) {
+ // If we get a top level error without writeErrors, retry the entire command.
+ return;
+ }
+
if (cmdObj.insert) {
let retryOps = [];
if (cmdObj.ordered === false) {
diff --git a/jstests/replsets/libs/tenant_migration_recipient_sync_source.js b/jstests/replsets/libs/tenant_migration_recipient_sync_source.js
index 594ebfbc68f..13656e4bf3b 100644
--- a/jstests/replsets/libs/tenant_migration_recipient_sync_source.js
+++ b/jstests/replsets/libs/tenant_migration_recipient_sync_source.js
@@ -23,9 +23,14 @@ const setUpMigrationSyncSourceTest = function() {
name: `${jsTestName()}_donor`,
nodes: 3,
settings: {chainingAllowed: false},
- nodeOptions:
- Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor,
- {setParameter: {tenantMigrationExcludeDonorHostTimeoutMS: 30 * 1000}}),
+ nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, {
+ setParameter: {
+ tenantMigrationExcludeDonorHostTimeoutMS: 30 * 1000,
+ // Allow non-timestamped reads on donor after migration completes for testing.
+ 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads':
+ tojson({mode: 'alwaysOn'}),
+ }
+ }),
});
donorRst.startSet();
donorRst.initiateWithHighElectionTimeout();
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js
index 654b2f88dcf..9f555ec264f 100644
--- a/jstests/replsets/libs/tenant_migration_test.js
+++ b/jstests/replsets/libs/tenant_migration_test.js
@@ -15,17 +15,22 @@ load("jstests/replsets/libs/tenant_migration_util.js");
* instead, with all nodes running the latest version.
*
* @param {string} [name] the name of the replica sets
+ * @param {boolean} [enableRecipientTesting] whether recipient would actually migrate tenant data
* @param {Object} [donorRst] the ReplSetTest instance to adopt for the donor
* @param {Object} [recipientRst] the ReplSetTest instance to adopt for the recipient
* @param {Object} [sharedOptions] an object that can contain 'nodes' <number>, the number of nodes
* each RST will contain, and 'setParameter' <object>, an object with various server parameters.
+ * @param {boolean} [allowDonorReadAfterMigration] whether donor would allow reads after a committed
+ * migration.
*/
function TenantMigrationTest({
name = "TenantMigrationTest",
enableRecipientTesting = true,
donorRst,
recipientRst,
- sharedOptions = {}
+ sharedOptions = {},
+ // Default this to true so it is easier for data consistency checks.
+ allowStaleReadsOnDonor = true,
}) {
const donorPassedIn = (donorRst !== undefined);
const recipientPassedIn = (recipientRst !== undefined);
@@ -65,6 +70,11 @@ function TenantMigrationTest({
tojson({mode: 'alwaysOn'});
}
+ if (allowStaleReadsOnDonor) {
+ setParameterOpts["failpoint.tenantMigrationDonorAllowsNonTimestampedReads"] =
+ tojson({mode: 'alwaysOn'});
+ }
+
let nodeOptions = isDonor ? migrationX509Options.donor : migrationX509Options.recipient;
nodeOptions["setParameter"] = setParameterOpts;
diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js
index 0ab2c45b09d..cc6ecb5bfb6 100644
--- a/jstests/replsets/libs/tenant_migration_util.js
+++ b/jstests/replsets/libs/tenant_migration_util.js
@@ -284,6 +284,22 @@ var TenantMigrationUtil = (function() {
const donorPrimary = donorRst.getPrimary();
const recipientPrimary = recipientRst.getPrimary();
+ // Allows listCollections and listIndexes on donor after migration for consistency checks.
+ const donorAllowsReadsAfterMigration =
+ assert
+ .commandWorked(donorPrimary.adminCommand({
+ getParameter: 1,
+ "failpoint.tenantMigrationDonorAllowsNonTimestampedReads": 1
+ }))["failpoint.tenantMigrationDonorAllowsNonTimestampedReads"]
+ .mode;
+ // Only turn on the failpoint if it is not already.
+ if (!donorAllowsReadsAfterMigration) {
+ assert.commandWorked(donorPrimary.adminCommand({
+ configureFailPoint: "tenantMigrationDonorAllowsNonTimestampedReads",
+ mode: "alwaysOn"
+ }));
+ }
+
// Filter out all dbs that don't belong to the tenant.
let combinedDBNames = [...donorPrimary.getDBNames(), ...recipientPrimary.getDBNames()];
combinedDBNames = combinedDBNames.filter(
@@ -330,6 +346,14 @@ var TenantMigrationUtil = (function() {
}
assert(success, 'dbhash mismatch between donor and recipient primaries');
}
+
+ // Reset failpoint on the donor after consistency checks if it wasn't enabled before.
+ if (!donorAllowsReadsAfterMigration) {
+ assert.commandWorked(donorPrimary.adminCommand({
+ configureFailPoint: "tenantMigrationDonorAllowsNonTimestampedReads",
+ mode: "off"
+ }));
+ }
}
/**
diff --git a/jstests/replsets/tenant_migration_collection_ttl.js b/jstests/replsets/tenant_migration_collection_ttl.js
index 0837b9a9477..cfffa252eef 100644
--- a/jstests/replsets/tenant_migration_collection_ttl.js
+++ b/jstests/replsets/tenant_migration_collection_ttl.js
@@ -23,6 +23,8 @@ const garbageCollectionOpts = {
ttlMonitorSleepSecs: 5,
// Allow reads on recipient before migration completes for testing.
'failpoint.tenantMigrationRecipientNotRejectReads': tojson({mode: 'alwaysOn'}),
+ // Allow non-timestamped reads on donor after migration completes for testing.
+ 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
};
const tenantMigrationTest = new TenantMigrationTest(
diff --git a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js
index c119b573da0..dc39487da6a 100644
--- a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js
+++ b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js
@@ -36,8 +36,10 @@ const donorRst = new ReplSetTest({
nodeOptions: Object.assign(migrationX509Options.donor, {
setParameter: {
internalInsertMaxBatchSize:
- kMaxBatchSize /* Decrease internal max batch size so we can still show writes are
+ kMaxBatchSize, /* Decrease internal max batch size so we can still show writes are
batched without inserting hundreds of documents. */
+ // Allow non-timestamped reads on donor after migration completes for testing.
+ 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
}
})
});
@@ -46,9 +48,9 @@ const recipientRst = new ReplSetTest({
name: 'recipient',
nodeOptions: Object.assign(migrationX509Options.recipient, {
setParameter: {
- internalInsertMaxBatchSize:
- kMaxBatchSize /* Decrease internal max batch size so we can still show writes are
- batched without inserting hundreds of documents. */
+ internalInsertMaxBatchSize: kMaxBatchSize /* Decrease internal max batch size so we can
+ still show writes are batched without
+ inserting hundreds of documents. */
},
})
});
@@ -272,9 +274,9 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) {
const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationBeforeLeavingBlockingState");
- // The failpoint below is used to ensure that a write to throw TenantMigrationConflict in the op
- // observer. Without this failpoint, the migration could have already aborted by the time the
- // write gets to the op observer.
+ // The failpoint below is used to ensure that a write to throw
+ // TenantMigrationConflict in the op observer. Without this failpoint, the migration
+ // could have already aborted by the time the write gets to the op observer.
const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState");
const migrationThread =
new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
@@ -369,8 +371,8 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) {
assert.eq(writeErrors.length, 1);
assert(writeErrors[0].errmsg);
- // The single write error should correspond to the first write after the migration started
- // blocking writes.
+ // The single write error should correspond to the first write after the migration
+ // started blocking writes.
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted);
@@ -444,8 +446,8 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) {
assert.eq(writeErrors.length, 1);
assert(writeErrors[0].errmsg);
- // The single write error should correspond to the first write after the migration started
- // blocking writes.
+ // The single write error should correspond to the first write after the migration
+ // started blocking writes.
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationCommitted);
@@ -493,9 +495,9 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) {
const abortFp = configureFailPoint(primaryDB, "abortTenantMigrationBeforeLeavingBlockingState");
- // The failpoint below is used to ensure that a write to throw TenantMigrationConflict in the op
- // observer. Without this failpoint, the migration could have already aborted by the time the
- // write gets to the op observer.
+ // The failpoint below is used to ensure that a write to throw
+ // TenantMigrationConflict in the op observer. Without this failpoint, the migration
+ // could have already aborted by the time the write gets to the op observer.
const blockFp = configureFailPoint(primaryDB, "pauseTenantMigrationBeforeLeavingBlockingState");
const migrationThread =
new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
@@ -525,8 +527,8 @@ function bulkWriteDocsUnordered(primaryHost, dbName, collName, numDocs) {
assert.eq(writeErrors.length, 1);
assert(writeErrors[0].errmsg);
- // The single write error should correspond to the first write after the migration started
- // blocking writes.
+ // The single write error should correspond to the first write after the migration
+ // started blocking writes.
assert.eq(writeErrors[0].index, kNumWriteBatchesWithoutMigrationConflict * kMaxBatchSize);
assert.eq(writeErrors[0].code, ErrorCodes.TenantMigrationAborted);
diff --git a/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js b/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js
index 29a3b9f1638..c512d1567aa 100644
--- a/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js
+++ b/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js
@@ -19,7 +19,14 @@ load("jstests/libs/uuid_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
-const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
+const tenantMigrationTest = new TenantMigrationTest({
+ name: jsTestName(),
+ sharedOptions: {
+ setParameter:
+ // Allow non-timestamped reads on donor after migration completes for testing.
+ {'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'})}
+ }
+});
if (!tenantMigrationTest.isFeatureFlagEnabled()) {
jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
return;
diff --git a/jstests/replsets/tenant_migration_external_cluster_validation.js b/jstests/replsets/tenant_migration_external_cluster_validation.js
index 7cd80d16a90..6034360588d 100644
--- a/jstests/replsets/tenant_migration_external_cluster_validation.js
+++ b/jstests/replsets/tenant_migration_external_cluster_validation.js
@@ -52,9 +52,13 @@ const donorRst = new ReplSetTest({
nodes: 2,
name: "donor",
keyFile: "jstests/libs/key1",
- nodeOptions: Object.assign(
- x509Options.donor,
- {setParameter: {"failpoint.alwaysValidateClientsClusterTime": tojson({mode: "alwaysOn"})}}),
+ nodeOptions: Object.assign(x509Options.donor, {
+ setParameter: {
+ "failpoint.alwaysValidateClientsClusterTime": tojson({mode: "alwaysOn"}),
+ // Allow non-timestamped reads on donor after migration completes for testing.
+ 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
+ }
+ }),
});
const recipientRst = new ReplSetTest({
diff --git a/jstests/replsets/tenant_migration_read_your_own_writes.js b/jstests/replsets/tenant_migration_read_your_own_writes.js
new file mode 100644
index 00000000000..6eeac579037
--- /dev/null
+++ b/jstests/replsets/tenant_migration_read_your_own_writes.js
@@ -0,0 +1,122 @@
+/**
+ * Tests that non-timestamped reads are not allowed on the donor after the migration has committed
+ * so that we typically provide read-your-own-write guarantees for primary reads across tenant
+ * migrations when there is no other failover.
+ *
+ * @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_eft,
+ * incompatible_with_windows_tls, incompatible_with_macos, requires_persistence]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/libs/uuid_util.js");
+load("jstests/libs/fail_point_util.js"); // For configureFailPoint().
+
+const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), allowStaleReadsOnDonor: false});
+
+if (!tenantMigrationTest.isFeatureFlagEnabled()) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ return;
+}
+const kTenantId = "testTenantId";
+const kDbName = tenantMigrationTest.tenantDB(kTenantId, "testDB");
+const kCollName = "testColl";
+
+const donorPrimary = tenantMigrationTest.getDonorPrimary();
+
+tenantMigrationTest.insertDonorDB(kDbName, kCollName, [...Array(10).keys()].map(x => ({x: x})));
+
+const donorDB = donorPrimary.getDB(kDbName);
+const cursor = assert
+ .commandWorked(donorDB.runCommand({
+ find: kCollName,
+ batchSize: 5,
+ }))
+ .cursor;
+assert.eq(5, cursor.firstBatch.length, tojson(cursor));
+assert.neq(0, cursor.id, tojson(cursor));
+jsTestLog(`Started cursor id ${cursor.id} on the donor before the migration`);
+
+const migrationId = UUID();
+const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationId),
+ tenantId: kTenantId,
+};
+const migrationRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
+assert.eq(migrationRes.state, TenantMigrationTest.State.kCommitted);
+
+// Test that getMore works after the migration has committed.
+jsTestLog(`Testing getMore on cursor id ${cursor.id} on the donor after the migration`);
+assert.commandWorked(donorDB.runCommand({getMore: cursor.id, collection: kCollName}));
+
+// Test that local and majority reads are no longer allowed on the donor.
+const testCases = {
+ find: {command: {find: kCollName}},
+ count: {command: {count: kCollName}},
+ distinct: {command: {distinct: kCollName, key: "x", query: {}}},
+ aggregate: {command: {aggregate: kCollName, pipeline: [{$match: {}}], cursor: {}}},
+ mapReduce: {
+ command: {
+ mapReduce: kCollName,
+ map: () => {
+ emit(this.x, 1);
+ },
+ reduce: (key, value) => {
+ return 1;
+ },
+ out: {inline: 1}
+ },
+ skipReadConcernMajority: true,
+ },
+ findAndModify: {
+ command: {findAndModify: kCollName, query: {x: 1}, update: {$set: {x: 1}}},
+ skipReadConcernMajority: true,
+ },
+ update: {
+ // No-op write due to stale read is also not allowed.
+ command: {update: kCollName, updates: [{q: {x: 1}, u: {$set: {x: 1}}}]},
+ skipReadConcernMajority: true,
+ },
+ delete: {
+ // No-op write due to stale read is also not allowed.
+ command: {delete: kCollName, deletes: [{q: {x: 100}, limit: 1}]},
+ skipReadConcernMajority: true,
+ },
+ listCollections: {
+ command: {listCollections: 1},
+ skipReadConcernMajority: true,
+ },
+ listIndexes: {
+ command: {listIndexes: kCollName},
+ skipReadConcernMajority: true,
+ },
+};
+
+const readConcerns = {
+ local: {level: "local"},
+ majority: {level: "majority"},
+};
+
+for (const [testCaseName, testCase] of Object.entries(testCases)) {
+ for (const [readConcernName, readConcern] of Object.entries(readConcerns)) {
+ if (testCase.skipReadConcernMajority && readConcernName === "majority") {
+ continue;
+ }
+ jsTest.log(`Testing ${testCaseName} with readConcern ${readConcernName}`);
+ let cmd = testCase.command;
+ cmd.readConcern = readConcern;
+ assert.commandFailedWithCode(donorDB.runCommand(cmd), ErrorCodes.TenantMigrationCommitted);
+ }
+}
+
+// Enable stale reads on the donor set so that end of test data consistency check can pass.
+tenantMigrationTest.getDonorRst().nodes.forEach(
+ node => assert.commandWorked(node.adminCommand(
+ {configureFailPoint: "tenantMigrationDonorAllowsNonTimestampedReads", mode: "alwaysOn"})));
+
+tenantMigrationTest.stop();
+})();
diff --git a/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js
index f33cf722c34..3069c919d33 100644
--- a/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js
+++ b/jstests/replsets/tenant_migration_recipient_sync_donor_timestamp.js
@@ -16,22 +16,11 @@ load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject()
load("jstests/replsets/libs/tenant_migration_test.js");
load("jstests/replsets/libs/tenant_migration_util.js");
-// Use a single node replSet to simplify the process.
-const donorRst = new ReplSetTest({
- nodes: 1,
- name: jsTestName() + "_donor",
- nodeOptions: TenantMigrationUtil.makeX509OptionsForTest().donor
-});
-
-donorRst.startSet();
-donorRst.initiate();
-
// Make the batch size small so that we can pause before all the batches are applied.
const tenantMigrationTest = new TenantMigrationTest(
- {name: jsTestName(), donorRst, sharedOptions: {setParameter: {tenantApplierBatchSizeOps: 2}}});
+ {name: jsTestName(), sharedOptions: {setParameter: {tenantApplierBatchSizeOps: 2}}});
if (!tenantMigrationTest.isFeatureFlagEnabled()) {
- donorRst.stopSet();
jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
return;
}
@@ -105,6 +94,5 @@ fpPauseOplogApplier.off();
jsTestLog("Waiting for migration to complete.");
assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
-donorRst.stopSet();
tenantMigrationTest.stop();
-})(); \ No newline at end of file
+})();
diff --git a/jstests/replsets/tenant_migration_sync_source_too_stale.js b/jstests/replsets/tenant_migration_sync_source_too_stale.js
index 07f8c315d77..2096394f3fc 100644
--- a/jstests/replsets/tenant_migration_sync_source_too_stale.js
+++ b/jstests/replsets/tenant_migration_sync_source_too_stale.js
@@ -10,7 +10,7 @@
* selection until it finds a sync source that is no longer too stale.
*
* @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_eft,
- * incompatible_with_windows_tls]
+ * incompatible_with_windows_tls, incompatible_with_macos, requires_persistence]
*/
(function() {
@@ -26,7 +26,12 @@ const donorRst = new ReplSetTest({
name: `${jsTestName()}_donor`,
nodes: 3,
settings: {chainingAllowed: false},
- nodeOptions: TenantMigrationUtil.makeX509OptionsForTest().donor,
+ nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().donor, {
+ setParameter: {
+ // Allow non-timestamped reads on donor after migration completes for testing.
+ 'failpoint.tenantMigrationDonorAllowsNonTimestampedReads': tojson({mode: 'alwaysOn'}),
+ }
+ }),
});
donorRst.startSet();
donorRst.initiateWithHighElectionTimeout();
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.h b/src/mongo/db/repl/tenant_migration_access_blocker.h
index b2498adfe84..0740d1561af 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker.h
@@ -66,7 +66,8 @@ public:
OperationType operationType) = 0;
virtual Status checkIfLinearizableReadWasAllowed(OperationContext* opCtx) = 0;
- virtual SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx) = 0;
+ virtual SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx,
+ StringData command) = 0;
//
// Called by index build user threads before acquiring an index build slot, and again right
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
index 4d80a4c3fe5..5af3448504f 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
@@ -130,10 +130,11 @@ TenantMigrationDonorDocument parseDonorStateDocument(const BSONObj& doc) {
return donorStateDoc;
}
-SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, StringData dbName) {
+SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, const OpMsgRequest& request) {
// We need to check both donor and recipient access blockers in the case where two
// migrations happen back-to-back before the old recipient state (from the first
// migration) is garbage collected.
+ auto dbName = request.getDatabase();
auto mtabPair = TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.getTenantMigrationAccessBlockerForDbName(dbName);
@@ -152,7 +153,7 @@ SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, StringData dbNam
std::vector<ExecutorFuture<void>> futures;
std::shared_ptr<executor::TaskExecutor> executor;
if (donorMtab) {
- auto canReadFuture = donorMtab->getCanReadFuture(opCtx);
+ auto canReadFuture = donorMtab->getCanReadFuture(opCtx, request.getCommandName());
if (canReadFuture.isReady()) {
auto status = canReadFuture.getNoThrow();
donorMtab->recordTenantMigrationError(status);
@@ -164,7 +165,7 @@ SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, StringData dbNam
futures.emplace_back(std::move(canReadFuture).semi().thenRunOn(executor));
}
if (recipientMtab) {
- auto canReadFuture = recipientMtab->getCanReadFuture(opCtx);
+ auto canReadFuture = recipientMtab->getCanReadFuture(opCtx, request.getCommandName());
if (canReadFuture.isReady()) {
auto status = canReadFuture.getNoThrow();
recipientMtab->recordTenantMigrationError(status);
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.h b/src/mongo/db/repl/tenant_migration_access_blocker_util.h
index 6a200163ff5..530632d1b1b 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_util.h
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.h
@@ -53,12 +53,14 @@ std::shared_ptr<TenantMigrationRecipientAccessBlocker> getTenantMigrationRecipie
TenantMigrationDonorDocument parseDonorStateDocument(const BSONObj& doc);
/**
- * If the operation has read concern "snapshot" or includes afterClusterTime, and the database is
- * in the read blocking state at the given atClusterTime or afterClusterTime or the selected read
- * timestamp, the promise will be set for the returned future when the migration is committed or
- * aborted. Note: for better performance, check if the future is immediately ready.
+ * Checks if a request is allowed to read based on the tenant migration states of this node as a
+ * donor or as a recipient. TenantMigrationCommitted is returned if the request needs to be
+ * re-routed to the new owner of the tenant. If the tenant is currently being migrated and the
+ * request needs to block, a future for when the request is unblocked is returned, and the promise
+ * will be set for the returned future when the migration is committed or aborted. Note: for better
+ * performance, check if the future is immediately ready.
*/
-SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, StringData dbName);
+SemiFuture<void> checkIfCanReadOrBlock(OperationContext* opCtx, const OpMsgRequest& request);
/**
* If the operation has read concern "linearizable", throws TenantMigrationCommitted error if the
diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp
index 1e6e452a08f..cb8ac0a7466 100644
--- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.cpp
@@ -47,8 +47,27 @@ namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(tenantMigrationDonorAllowsNonTimestampedReads);
+
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
+// Commands that are not allowed to run against the donor after a committed migration so that we
+// typically provide read-your-own-write guarantees for primary reads across tenant migrations.
+const StringMap<int> commandDenyListAfterMigration = {
+ {"find", 1},
+ {"count", 1},
+ {"distinct", 1},
+ {"aggregate", 1},
+ {"mapReduce", 1},
+ {"mapreduce", 1},
+ {"findAndModify", 1},
+ {"findandmodify", 1},
+ {"listCollections", 1},
+ {"listIndexes", 1},
+ {"update", 1},
+ {"delete", 1},
+};
+
} // namespace
TenantMigrationDonorAccessBlocker::TenantMigrationDonorAccessBlocker(
@@ -115,10 +134,10 @@ Status TenantMigrationDonorAccessBlocker::waitUntilCommittedOrAborted(OperationC
MONGO_UNREACHABLE;
}
-SharedSemiFuture<void> TenantMigrationDonorAccessBlocker::getCanReadFuture(
- OperationContext* opCtx) {
+SharedSemiFuture<void> TenantMigrationDonorAccessBlocker::getCanReadFuture(OperationContext* opCtx,
+ StringData command) {
auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- auto readTimestamp = [opCtx, &readConcernArgs]() -> std::optional<Timestamp> {
+ auto readTimestamp = [opCtx, &readConcernArgs]() -> boost::optional<Timestamp> {
if (auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime()) {
return afterClusterTime->asTimestamp();
}
@@ -128,20 +147,29 @@ SharedSemiFuture<void> TenantMigrationDonorAccessBlocker::getCanReadFuture(
if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
return repl::StorageInterface::get(opCtx)->getPointInTimeReadTimestamp(opCtx);
}
- return std::nullopt;
+ return boost::none;
}();
+
+ stdx::lock_guard<Latch> lk(_mutex);
if (!readTimestamp) {
- return SharedSemiFuture<void>();
+ if (!MONGO_unlikely(tenantMigrationDonorAllowsNonTimestampedReads.shouldFail()) &&
+ _state == State::kReject &&
+ commandDenyListAfterMigration.find(command) != commandDenyListAfterMigration.end()) {
+ LOGV2_DEBUG(5505100,
+ 1,
+ "Donor blocking non-timestamped reads after committed migration",
+ "command"_attr = command,
+ "tenantId"_attr = _tenantId);
+ return SharedSemiFuture<void>(
+ Status(ErrorCodes::TenantMigrationCommitted,
+ "Read must be re-routed to the new owner of this tenant"));
+ } else {
+ return SharedSemiFuture<void>();
+ }
}
- return _getCanDoClusterTimeReadFuture(opCtx, *readTimestamp);
-}
-
-SharedSemiFuture<void> TenantMigrationDonorAccessBlocker::_getCanDoClusterTimeReadFuture(
- OperationContext* opCtx, Timestamp readTimestamp) {
- stdx::unique_lock<Latch> ul(_mutex);
auto canRead = _state == State::kAllow || _state == State::kAborted ||
- _state == State::kBlockWrites || readTimestamp < *_blockTimestamp;
+ _state == State::kBlockWrites || *readTimestamp < *_blockTimestamp;
if (canRead) {
return SharedSemiFuture<void>();
diff --git a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h
index 290754a567f..405d19bc3a0 100644
--- a/src/mongo/db/repl/tenant_migration_donor_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_donor_access_blocker.h
@@ -189,7 +189,7 @@ public:
Status waitUntilCommittedOrAborted(OperationContext* opCtx, OperationType operationType) final;
Status checkIfLinearizableReadWasAllowed(OperationContext* opCtx) final;
- SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx) final;
+ SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx, StringData command) final;
//
// Called by index build user threads before acquiring an index build slot, and again right
@@ -270,10 +270,6 @@ private:
void _onMajorityCommitCommitOpTime(stdx::unique_lock<Latch>& lk);
void _onMajorityCommitAbortOpTime(stdx::unique_lock<Latch>& lk);
- // Helper for the method 'getCanReadFuture()'.
- SharedSemiFuture<void> _getCanDoClusterTimeReadFuture(OperationContext* opCtx,
- Timestamp readTimestamp);
-
ServiceContext* _serviceContext;
const std::string _tenantId;
const std::string _recipientConnString;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp
index 18040a2f596..d164ca37330 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.cpp
@@ -79,7 +79,7 @@ Status TenantMigrationRecipientAccessBlocker::waitUntilCommittedOrAborted(
}
SharedSemiFuture<void> TenantMigrationRecipientAccessBlocker::getCanReadFuture(
- OperationContext* opCtx) {
+ OperationContext* opCtx, StringData command) {
if (MONGO_unlikely(tenantMigrationRecipientNotRejectReads.shouldFail())) {
return SharedSemiFuture<void>();
}
diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h
index 3f622d62039..e06a6a82bf5 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker.h
@@ -90,7 +90,7 @@ public:
Status waitUntilCommittedOrAborted(OperationContext* opCtx, OperationType operationType) final;
Status checkIfLinearizableReadWasAllowed(OperationContext* opCtx) final;
- SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx) final;
+ SharedSemiFuture<void> getCanReadFuture(OperationContext* opCtx, StringData command) final;
//
// Called by index build user threads before acquiring an index build slot, and again right
diff --git a/src/mongo/db/repl/tenant_migration_recipient_access_blocker_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_access_blocker_test.cpp
index e0f1cdf002f..d84cd1a8bf5 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_access_blocker_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_access_blocker_test.cpp
@@ -145,25 +145,25 @@ TEST_F(TenantMigrationRecipientAccessBlockerTest, StateReject) {
// Default read concern.
ASSERT_THROWS_CODE(
- mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld);
+ mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld);
// Majority read concern.
ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern);
ASSERT_THROWS_CODE(
- mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld);
+ mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld);
// Snapshot read concern.
ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kSnapshotReadConcern);
opCtx()->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
Timestamp(1, 1));
ASSERT_THROWS_CODE(
- mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld);
+ mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld);
// Snapshot read concern with atClusterTime.
ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kSnapshotReadConcern);
ReadConcernArgs::get(opCtx()).setArgsAtClusterTimeForSnapshot(Timestamp(1, 1));
ASSERT_THROWS_CODE(
- mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld);
+ mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld);
}
TEST_F(TenantMigrationRecipientAccessBlockerTest, StateRejectBefore) {
@@ -194,22 +194,22 @@ TEST_F(TenantMigrationRecipientAccessBlockerTest, StateRejectBefore) {
}
// Default read concern.
- ASSERT_OK(mtab.getCanReadFuture(opCtx()).getNoThrow());
+ ASSERT_OK(mtab.getCanReadFuture(opCtx(), "find").getNoThrow());
// Majority read concern.
ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern);
- ASSERT_OK(mtab.getCanReadFuture(opCtx()).getNoThrow());
+ ASSERT_OK(mtab.getCanReadFuture(opCtx(), "find").getNoThrow());
// Snapshot read at a later timestamp.
ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kSnapshotReadConcern);
ReadConcernArgs::get(opCtx()).setArgsAtClusterTimeForSnapshot(Timestamp(3, 1));
- ASSERT_OK(mtab.getCanReadFuture(opCtx()).getNoThrow());
+ ASSERT_OK(mtab.getCanReadFuture(opCtx(), "find").getNoThrow());
// Snapshot read at an earlier timestamp.
ReadConcernArgs::get(opCtx()) = ReadConcernArgs(ReadConcernLevel::kSnapshotReadConcern);
ReadConcernArgs::get(opCtx()).setArgsAtClusterTimeForSnapshot(Timestamp(1, 1));
ASSERT_THROWS_CODE(
- mtab.getCanReadFuture(opCtx()).get(), DBException, ErrorCodes::SnapshotTooOld);
+ mtab.getCanReadFuture(opCtx(), "find").get(), DBException, ErrorCodes::SnapshotTooOld);
}
} // namespace repl
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index e2f3e5957e0..84a913cdcd8 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -812,8 +812,8 @@ Future<void> InvokeCommand::run() {
auto execContext = _ecd->getExecutionContext();
// TODO SERVER-53761: find out if we can do this more asynchronously. The client
// Strand is locked to current thread in ServiceStateMachine::Impl::startNewLoop().
- tenant_migration_access_blocker::checkIfCanReadOrBlock(
- execContext->getOpCtx(), execContext->getRequest().getDatabase())
+ tenant_migration_access_blocker::checkIfCanReadOrBlock(execContext->getOpCtx(),
+ execContext->getRequest())
.get(execContext->getOpCtx());
return runCommandInvocation(_ecd->getExecutionContext(), _ecd->getInvocation());
})
@@ -830,8 +830,8 @@ Future<void> CheckoutSessionAndInvokeCommand::run() {
auto execContext = _ecd->getExecutionContext();
// TODO SERVER-53761: find out if we can do this more asynchronously.
- tenant_migration_access_blocker::checkIfCanReadOrBlock(
- execContext->getOpCtx(), execContext->getRequest().getDatabase())
+ tenant_migration_access_blocker::checkIfCanReadOrBlock(execContext->getOpCtx(),
+ execContext->getRequest())
.get(execContext->getOpCtx());
return runCommandInvocation(_ecd->getExecutionContext(), _ecd->getInvocation());
})