diff options
author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2020-08-26 14:30:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-04 04:40:00 +0000 |
commit | a4bd6d8c91b03940bae67035ac04b41087a8c7fa (patch) | |
tree | a0d00155bd93b5329ad106452e4926b4e20bd12d | |
parent | a08a00b6c55c6bd43df2dd76893a6917f46cebdc (diff) | |
download | mongo-a4bd6d8c91b03940bae67035ac04b41087a8c7fa.tar.gz |
SERVER-50104 Implement proxy's retry logic for non-batch write commands
-rw-r--r-- | jstests/libs/override_methods/inject_tenant_prefix.js | 42 | ||||
-rw-r--r-- | jstests/replsets/writes_during_tenant_migration.js | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_access_blocker.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_util.h | 63 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 9 |
5 files changed, 81 insertions, 60 deletions
diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js index f8915734d05..3d180875019 100644 --- a/jstests/libs/override_methods/inject_tenant_prefix.js +++ b/jstests/libs/override_methods/inject_tenant_prefix.js @@ -183,14 +183,23 @@ Mongo.prototype.runCommand = function(dbName, cmdObj, options) { // applicable database names and namespaces. const cmdObjWithDbPrefix = createCmdObjWithDbPrefix(cmdObj); - let resObj = originalRunCommand.apply( - this, [prependDbPrefixToDbNameIfApplicable(dbName), cmdObjWithDbPrefix, options]); + let numAttempts = 0; - // Remove TestData.dbPrefix from all database names and namespaces in the resObj since tests - // assume the command was run against the original database. - removeDbPrefix(resObj); + while (true) { + numAttempts++; + let resObj = originalRunCommand.apply( + this, [prependDbPrefixToDbNameIfApplicable(dbName), cmdObjWithDbPrefix, options]); - return resObj; + // Remove TestData.dbPrefix from all database names and namespaces in the resObj since tests + // assume the command was run against the original database. + removeDbPrefix(resObj); + + if (resObj.code != ErrorCodes.TenantMigrationAborted) { + return resObj; + } + jsTest.log("Got TenantMigrationAborted after trying " + numAttempts + + " times, retrying command " + tojson(cmdObj)); + } }; Mongo.prototype.runCommandWithMetadata = function(dbName, metadata, commandArgs) { @@ -198,14 +207,23 @@ Mongo.prototype.runCommandWithMetadata = function(dbName, metadata, commandArgs) // applicable database names and namespaces. const commandArgsWithDbPrefix = createCmdObjWithDbPrefix(commandArgs); - let resObj = originalRunCommand.apply( - this, [prependDbPrefixToDbNameIfApplicable(dbName), metadata, commandArgsWithDbPrefix]); + let numAttempts = 0; - // Remove TestData.dbPrefix from all database names and namespaces in the resObj since tests - // assume the command was run against the original database. - removeDbPrefix(resObj); + while (true) { + numAttempts++; + let resObj = originalRunCommand.apply( + this, [prependDbPrefixToDbNameIfApplicable(dbName), metadata, commandArgsWithDbPrefix]); - return resObj; + // Remove TestData.dbPrefix from all database names and namespaces in the resObj since tests + // assume the command was run against the original database. + removeDbPrefix(resObj); + + if (resObj.code != ErrorCodes.TenantMigrationAborted) { + return resObj; + } + jsTest.log("Got TenantMigrationAborted after trying " + numAttempts + + " times, retrying command " + tojson(commandArgs)); + } }; OverrideHelpers.prependOverrideInParallelShell( diff --git a/jstests/replsets/writes_during_tenant_migration.js b/jstests/replsets/writes_during_tenant_migration.js index dc955518597..174c25bd17b 100644 --- a/jstests/replsets/writes_during_tenant_migration.js +++ b/jstests/replsets/writes_during_tenant_migration.js @@ -1,7 +1,6 @@ /** * Tests that the donor blocks writes that are executed while the migration in the blocking state, - * then rejects the writes if the migration commits and and internally retries the writes if the - * migration aborts. + * then rejects the writes when the migration completes. * * Tenant migrations are not expected to be run on servers with ephemeralForTest, and in particular * this test fails on ephemeralForTest because the donor has to wait for the write to set the @@ -321,10 +320,10 @@ function testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits(testCase, te } /** - * Tests that the donor blocks writes that are executed in the blocking state and internally retries - * them after the migration aborts. + * Tests that the donor blocks writes that are executed in the blocking state and rejects them after + * the migration aborts. */ -function testBlockedReadGetsUnblockedAndRetriedIfMigrationAborts(testCase, testOpts) { +function testBlockedReadGetsUnblockedAndRejectedIfMigrationAborts(testCase, testOpts) { let blockingFp = configureFailPoint(testOpts.primaryDB, "pauseTenantMigrationAfterBlockingStarts"); let abortFp = configureFailPoint(testOpts.primaryDB, "abortTenantMigrationAfterBlockingStarts"); @@ -346,8 +345,8 @@ function testBlockedReadGetsUnblockedAndRetriedIfMigrationAborts(testCase, testO blockingFp.wait(); // The migration should unpause and abort after the write is blocked. Verify that the write is - // retried and succeeds. - runCommand(testOpts); + // rejected. + runCommand(testOpts, ErrorCodes.TenantMigrationAborted); // Verify that the migration aborted due to the simulated error. resumeMigrationThread.join(); @@ -355,7 +354,7 @@ function testBlockedReadGetsUnblockedAndRetriedIfMigrationAborts(testCase, testO abortFp.off(); assert.commandFailedWithCode(migrationThread.returnData(), ErrorCodes.TenantMigrationAborted); - testCase.assertCommandSucceeded(testOpts.primaryDB, testOpts.dbName, testOpts.collName); + testCase.assertCommandFailed(testOpts.primaryDB, testOpts.dbName, testOpts.collName); } const isNotWriteCommand = "not a write command"; @@ -841,7 +840,7 @@ const testFuncs = { inAborted: testWriteIsAcceptedIfSentAfterMigrationHasAborted, inBlocking: testWriteBlocksIfMigrationIsInBlocking, inBlockingThenCommitted: testBlockedWriteGetsUnblockedAndRejectedIfMigrationCommits, - inBlockingThenAborted: testBlockedReadGetsUnblockedAndRetriedIfMigrationAborts + inBlockingThenAborted: testBlockedReadGetsUnblockedAndRejectedIfMigrationAborts }; for (const [testName, testFunc] of Object.entries(testFuncs)) { diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_access_blocker.cpp index c04104837e1..ef989757aa0 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker.cpp @@ -84,9 +84,13 @@ void TenantMigrationAccessBlocker::checkIfCanWriteOrBlock(OperationContext* opCt opCtx->waitForConditionOrInterrupt( _transitionOutOfBlockingCV, ul, [&]() { return canWrite() || _access == Access::kReject; }); - uassert(ErrorCodes::TenantMigrationCommitted, - "Write must be re-routed to the new owner of this database", - _access == Access::kAllow); + auto status = onCompletion().getNoThrow(); + if (status.isOK()) { + invariant(_access == Access::kReject); + uasserted(ErrorCodes::TenantMigrationCommitted, + "Write must be re-routed to the new owner of this database"); + } + uassertStatusOK(status); } void TenantMigrationAccessBlocker::checkIfCanDoClusterTimeReadOrBlock( diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h index 274793a1b1f..15fce6eedd6 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.h +++ b/src/mongo/db/repl/tenant_migration_donor_util.h @@ -74,42 +74,43 @@ void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx, StringDat void onWriteToDatabase(OperationContext* opCtx, StringData dbName); /** - * Runs the argument function 'callable' as many times as needed for it to complete or throw an - * exception or return a non-OK status (as indicated in 'replyBuilder') other than - * TenantMigrationConflict. Clears 'replyBuilder' before each retry. + * Runs the argument function 'callable'. If it throws a TenantMigrationConflict error (as indicated + * in 'replyBuilder'), clears 'replyBuilder' and blocks until the migration commits or aborts, then + * throws TenantMigrationCommitted or TenantMigrationAborted. */ template <typename Callable> -void migrationConflictRetry(OperationContext* opCtx, - Callable&& callable, - rpc::ReplyBuilderInterface* replyBuilder) { +void migrationConflictHandler(OperationContext* opCtx, + StringData dbName, + Callable&& callable, + rpc::ReplyBuilderInterface* replyBuilder) { + checkIfCanReadOrBlock(opCtx, dbName); + auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()); - while (true) { - try { - // callable will modify replyBuilder. - callable(); - auto replyBodyBuilder = replyBuilder->getBodyBuilder(); - - // getStatusFromWriteCommandReply expects an 'ok' field. - CommandHelpers::extractOrAppendOk(replyBodyBuilder); - - // Commands such as insert, update, delete, and applyOps return the result as a status - // rather than throwing. - const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj()); - - if (status == ErrorCodes::TenantMigrationConflict) { - uassertStatusOK(status); - } - break; - } catch (const TenantMigrationConflictException& ex) { - auto migrationConflictInfo = ex.extraInfo<TenantMigrationConflictInfo>(); - invariant(migrationConflictInfo); - - if (auto mtab = mtabByPrefix.getTenantMigrationAccessBlocker( - migrationConflictInfo->getDatabasePrefix())) { - mtab->checkIfCanWriteOrBlock(opCtx); - } + try { + // callable will modify replyBuilder. + callable(); + auto replyBodyBuilder = replyBuilder->getBodyBuilder(); + + // getStatusFromWriteCommandReply expects an 'ok' field. + CommandHelpers::extractOrAppendOk(replyBodyBuilder); + + // Commands such as insert, update, delete, and applyOps return the result as a status + // rather than throwing. + const auto status = getStatusFromWriteCommandReply(replyBodyBuilder.asTempObj()); + + if (status == ErrorCodes::TenantMigrationConflict) { + uassertStatusOK(status); + } + return; + } catch (const TenantMigrationConflictException& ex) { + auto migrationConflictInfo = ex.extraInfo<TenantMigrationConflictInfo>(); + invariant(migrationConflictInfo); + + if (auto mtab = mtabByPrefix.getTenantMigrationAccessBlocker( + migrationConflictInfo->getDatabasePrefix())) { replyBuilder->getBodyBuilder().resetToEmpty(); + mtab->checkIfCanWriteOrBlock(opCtx); } } } diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 6e15fb29bff..fb083bc6fa5 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -520,9 +520,9 @@ void invokeWithNoSession(OperationContext* opCtx, const OpMsgRequest& request, CommandInvocation* invocation, rpc::ReplyBuilderInterface* replyBuilder) { - tenant_migration_donor::checkIfCanReadOrBlock(opCtx, request.getDatabase()); - tenant_migration_donor::migrationConflictRetry( + tenant_migration_donor::migrationConflictHandler( opCtx, + request.getDatabase(), [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); }, replyBuilder); } @@ -628,14 +628,13 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, } } - tenant_migration_donor::checkIfCanReadOrBlock(opCtx, request.getDatabase()); - // Use the API parameters that were stored when the transaction was initiated. APIParameters::get(opCtx) = txnParticipant.getAPIParameters(opCtx); try { - tenant_migration_donor::migrationConflictRetry( + tenant_migration_donor::migrationConflictHandler( opCtx, + request.getDatabase(), [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); }, replyBuilder); } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>&) { |