summary refs log tree commit diff
diff options
context:
space:
mode:
author Cheahuychou Mao <cheahuychou.mao@mongodb.com> 2020-08-26 14:30:17 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-09-04 04:40:00 +0000
commit a4bd6d8c91b03940bae67035ac04b41087a8c7fa (patch)
tree a0d00155bd93b5329ad106452e4926b4e20bd12d
parent a08a00b6c55c6bd43df2dd76893a6917f46cebdc (diff)
download mongo-a4bd6d8c91b03940bae67035ac04b41087a8c7fa.tar.gz
SERVER-50104 Implement proxy's retry logic for non-batch write commands
-rw-r--r-- jstests/libs/override_methods/inject_tenant_prefix.js 42
-rw-r--r-- jstests/replsets/writes_during_tenant_migration.js 17
-rw-r--r-- src/mongo/db/repl/tenant_migration_access_blocker.cpp 10
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_util.h 63
-rw-r--r-- src/mongo/db/service_entry_point_common.cpp 9
5 files changed, 81 insertions, 60 deletions
diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js
index f8915734d05..3d180875019 100644
--- a/jstests/libs/override_methods/inject_tenant_prefix.js
+++ b/jstests/libs/override_methods/inject_tenant_prefix.js
@@ -183,14 +183,23 @@ Mongo.prototype.runCommand = function(dbName, cmdObj, options) {
// applicable database names and namespaces.
const cmdObjWithDbPrefix = createCmdObjWithDbPrefix(cmdObj);
- let resObj = originalRunCommand.apply(
- this, [prependDbPrefixToDbNameIfApplicable(dbName), cmdObjWithDbPrefix, options]);
+ let numAttempts = 0;
- // Remove TestData.dbPrefix from all database names and namespaces in the resObj since tests
- // assume the command was run against the original database.
- removeDbPrefix(resObj);
+ while (true) {
+ numAttempts++;
+ let resObj = originalRunCommand.apply(
+ this, [prependDbPrefixToDbNameIfApplicable(dbName), cmdObjWithDbPrefix, options]);
- return resObj;
+ // Remove TestData.dbPrefix from all database names and namespaces in the resObj since tests
+ // assume the command was run against the original database.
+ removeDbPrefix(resObj);
+
+ if (resObj.code != ErrorCodes.TenantMigrationAborted) {
+ return resObj;
+ }
+ jsTest.log("Got TenantMigrationAborted after trying " + numAttempts +
+ " times, retrying command " + tojson(cmdObj));
+ }
};
Mongo.prototype.runCommandWithMetadata = function(dbName, metadata, commandArgs) {
@@ -198,14 +207,23 @@ Mongo.prototype.runCommandWithMetadata = function(dbName, metadata, commandArgs)
// applicable database names and namespaces.
const commandArgsWithDbPrefix = createCmdObjWithDbPrefix(commandArgs);
- let resObj = originalRunCommand.apply(
- this, [prependDbPrefixToDbNameIfApplicable(dbName), metadata, commandArgsWithDbPrefix]);
+ let numAttempts = 0;
- // Remove TestData.dbPrefix from all database names and namespaces in the resObj since tests
- // assume the command was run against the original database.
- removeDbPrefix(resObj);
+ while (true) {
+ numAttempts++;
+ let resObj = originalRunCommand.apply(
+ this, [prependDbPrefixToDbNameIfApplicable(dbName), metadata, commandArgsWithDbPrefix]);
- return resObj;
+ // Remove TestData.dbPrefix from all database names and namespaces in the resObj since tests
+ // assume the command was run against the original database.
+ removeDbPrefix(resObj);
+
+ if (resObj.code != ErrorCodes.TenantMigrationAborted) {
+ return resObj;
+ }
+ jsTest.log("Got TenantMigrationAborted after trying " + numAttempts +
+ " times, retrying command " + tojson(commandArgs));
+ }
};
OverrideHelpers.prependOverrideInParallelShell(
diff --git a/jstests/replsets/writes_during_tenant_migration.js b/jstests/replsets/writes_during_tenant_migration.js
index dc955518597..174c25bd17b 100644
--- a/jstests/replsets/writes_during_tenant_migration.js
+++ b/jstests/replsets/writes_during_tenant_migration.js
@@ -1,7 +1,6 @@
/**
* Tests that the donor blocks writes that are executed while the migration in the blocking state,
- * then rejects the writes if the migration commits and and internally retries the writes if the
- * migration aborts.
+ * then rejects the writes when the migration completes.
*
* Tenant migrations are not expected to be run on servers with ephemeralForTest, and in particular
* this test fails on ephemeralForTest because the donor has to wait for the write to set the
@@ -321,10 +320,10 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te
}
/**
- * Tests that the donor blocks writes that are executed in the blocking state and internally retries
- * them after the migration aborts.
+ * Tests that the donor blocks writes that are executed in the blocking state and rejects them after
+ * the migration aborts.
*/
-function testBlockedReadGetsUnblockedAndRetriedIfMigrationAborts(testCase, testOpts) {
+function testBlockedReadGetsUnblockedAndRejectedIfMigrationAborts(testCase, testOpts) {
let blockingFp =
configureFailPoint(testOpts.primaryDB, "pauseTenantMigrationAfterBlockingStarts");
let abortFp = configureFailPoint(testOpts.primaryDB, "abortTenantMigrationAfterBlockingStarts");
@@ -346,8 +345,8 @@ function testBlockedReadGetsUnblockedAndRetriedIfMigrationAborts(testCase, testO
blockingFp.wait();
// The migration should unpause and abort after the write is blocked. Verify that the write is
- // retried and succeeds.
- runCommand(testOpts);
+ // rejected.
+ runCommand(testOpts, ErrorCodes.TenantMigrationAborted);
// Verify that the migration aborted due to the simulated error.
resumeMigrationThread.join();
@@ -355,7 +354,7 @@ function testBlockedReadGetsUnblockedAndRetriedIfMigrationAborts(testCase, testO
abortFp.off();
assert.commandFailedWithCode(migrationThread.returnData(), ErrorCodes.TenantMigrationAborted);
- testCase.assertCommandSucceeded(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
+ testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName);
}
const isNotWriteCommand = "not a write command";
@@ -841,7 +840,7 @@ const testFuncs = {
inAborted: testWriteIsAcceptedIfSentAfterMigrationHasAborted,
inBlocking: testWriteBlocksIfMigrationIsInBlocking,
inBlockingThenCommitted: testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits,
- inBlockingThenAborted: testBlockedReadGetsUnblockedAndRetriedIfMigrationAborts
+ inBlockingThenAborted: testBlockedReadGetsUnblockedAndRejectedIfMigrationAborts
};
for (const [testName, testFunc] of Object.entries(testFuncs)) {
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_access_blocker.cpp
index c04104837e1..ef989757aa0 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker.cpp
@@ -84,9 +84,13 @@ void TenantMigrationAccessBlocker::checkIfCanWriteOrBlock(OperationContext* opCt
opCtx->waitForConditionOrInterrupt(
_transitionOutOfBlockingCV, ul, [&]() { return canWrite() || _access == Access::kReject; });
- uassert(ErrorCodes::TenantMigrationCommitted,
- "Write must be re-routed to the new owner of this database",
- _access == Access::kAllow);
+ auto status = onCompletion().getNoThrow();
+ if (status.isOK()) {
+ invariant(_access == Access::kReject);
+ uasserted(ErrorCodes::TenantMigrationCommitted,
+ "Write must be re-routed to the new owner of this database");
+ }
+ uassertStatusOK(status);
}
void TenantMigrationAccessBlocker::checkIfCanDoClusterTimeReadOrBlock(
diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h
index 274793a1b1f..15fce6eedd6 100644
--- a/src/mongo/db/repl/tenant_migration_donor_util.h
+++ b/src/mongo/db/repl/tenant_migration_donor_util.h
@@ -74,42 +74,43 @@ void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx, StringDat
void onWriteToDatabase(OperationContext* opCtx, StringData dbName);
/**
- * Runs the argument function 'callable' as many times as needed for it to complete or throw an
- * exception or return a non-OK status (as indicated in 'replyBuilder') other than
- * TenantMigrationConflict. Clears 'replyBuilder' before each retry.
+ * Runs the argument function 'callable'. If it throws a TenantMigrationConflict error (as indicated
+ * in 'replyBuilder'), clears 'replyBuilder' and blocks until the migration commits or aborts, then
+ * throws TenantMigrationCommitted or TenantMigrationAborted.
*/
template <typename Callable>
-void migrationConflictRetry(OperationContext* opCtx,
- Callable&& callable,
- rpc::ReplyBuilderInterface* replyBuilder) {
+void migrationConflictHandler(OperationContext* opCtx,
+ StringData dbName,
+ Callable&& callable,
+ rpc::ReplyBuilderInterface* replyBuilder) {
+ checkIfCanReadOrBlock(opCtx, dbName);
+
auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext());
- while (true) {
- try {
- // callable will modify replyBuilder.
- callable();
- auto replyBodyBuilder = replyBuilder->getBodyBuilder();
-
- // getStatusFromWriteCommandReply expects an 'ok' field.
- CommandHelpers::extractOrAppendOk(replyBodyBuilder);
-
- // Commands such as insert, update, delete, and applyOps return the result as a status
- // rather than throwing.
- const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj());
-
- if (status == ErrorCodes::TenantMigrationConflict) {
- uassertStatusOK(status);
- }
- break;
- } catch (const TenantMigrationConflictException& ex) {
- auto migrationConflictInfo = ex.extraInfo<TenantMigrationConflictInfo>();
- invariant(migrationConflictInfo);
-
- if (auto mtab = mtabByPrefix.getTenantMigrationAccessBlocker(
- migrationConflictInfo->getDatabasePrefix())) {
- mtab->checkIfCanWriteOrBlock(opCtx);
- }
+ try {
+ // callable will modify replyBuilder.
+ callable();
+ auto replyBodyBuilder = replyBuilder->getBodyBuilder();
+
+ // getStatusFromWriteCommandReply expects an 'ok' field.
+ CommandHelpers::extractOrAppendOk(replyBodyBuilder);
+
+ // Commands such as insert, update, delete, and applyOps return the result as a status
+ // rather than throwing.
+ const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj());
+
+ if (status == ErrorCodes::TenantMigrationConflict) {
+ uassertStatusOK(status);
+ }
+ return;
+ } catch (const TenantMigrationConflictException& ex) {
+ auto migrationConflictInfo = ex.extraInfo<TenantMigrationConflictInfo>();
+ invariant(migrationConflictInfo);
+
+ if (auto mtab = mtabByPrefix.getTenantMigrationAccessBlocker(
+ migrationConflictInfo->getDatabasePrefix())) {
replyBuilder->getBodyBuilder().resetToEmpty();
+ mtab->checkIfCanWriteOrBlock(opCtx);
}
}
}
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 6e15fb29bff..fb083bc6fa5 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -520,9 +520,9 @@ void invokeWithNoSession(OperationContext* opCtx,
const OpMsgRequest& request,
CommandInvocation* invocation,
rpc::ReplyBuilderInterface* replyBuilder) {
- tenant_migration_donor::checkIfCanReadOrBlock(opCtx, request.getDatabase());
- tenant_migration_donor::migrationConflictRetry(
+ tenant_migration_donor::migrationConflictHandler(
opCtx,
+ request.getDatabase(),
[&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
replyBuilder);
}
@@ -628,14 +628,13 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
}
}
- tenant_migration_donor::checkIfCanReadOrBlock(opCtx, request.getDatabase());
-
// Use the API parameters that were stored when the transaction was initiated.
APIParameters::get(opCtx) = txnParticipant.getAPIParameters(opCtx);
try {
- tenant_migration_donor::migrationConflictRetry(
+ tenant_migration_donor::migrationConflictHandler(
opCtx,
+ request.getDatabase(),
[&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
replyBuilder);
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>&) {