From 7d5b3ce7bbd430a8d9cb231befa9233e372c66cb Mon Sep 17 00:00:00 2001 From: Alex Li Date: Tue, 16 May 2023 16:23:37 +0000 Subject: SERVER-73779 Add maxTimeMSOpOnly to network message if it is a hedge sent through async_rpc API --- src/mongo/executor/SConscript | 1 + src/mongo/executor/hedged_async_rpc.h | 17 ++++ src/mongo/executor/hedged_async_rpc_test.cpp | 113 +++++++++++++++++++++++--- src/mongo/executor/remote_command_request.cpp | 15 ++-- 4 files changed, 127 insertions(+), 19 deletions(-) diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index b112f2c6aa3..587c4713785 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -37,6 +37,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/db/api_parameters', '$BUILD_DIR/mongo/rpc/metadata', + '$BUILD_DIR/mongo/s/mongos_server_parameters', '$BUILD_DIR/mongo/util/net/network', ], ) diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h index 6ab98cb9b59..15868c390db 100644 --- a/src/mongo/executor/hedged_async_rpc.h +++ b/src/mongo/executor/hedged_async_rpc.h @@ -226,6 +226,7 @@ SemiFuture> sendHedgedCommand( }); } + const auto globalMaxTimeMSForHedgedReads = gMaxTimeMSForHedgedReads.load(); for (size_t i = 0; i < hostsToTarget; i++) { std::unique_ptr t = std::make_unique(targets[i]); // We explicitly pass "NeverRetryPolicy" here because the retry mechanism @@ -237,6 +238,22 @@ SemiFuture> sendHedgedCommand( hedgeCancellationToken.token(), std::make_shared(), genericArgs); + + // If the request is a hedged request, set maxTimeMSOpOnly to the smaller of + // the server parameter maxTimeMSForHedgedReads or remaining max time from the + // opCtx. + if (opts.isHedgeEnabled && i != 0) { + auto maxTimeMSOpOnly = globalMaxTimeMSForHedgedReads; + if (opCtx->hasDeadline()) { + if (auto remainingMaxTime = opCtx->getRemainingMaxTimeMillis().count(); + remainingMaxTime < maxTimeMSOpOnly) { + maxTimeMSOpOnly = remainingMaxTime; + } + } + + options->genericArgs.unstable.setMaxTimeMSOpOnly(maxTimeMSOpOnly); + } + options->baton = baton; requests.push_back( sendCommand(options, opCtx, std::move(t)).thenRunOn(proxyExec)); diff --git a/src/mongo/executor/hedged_async_rpc_test.cpp b/src/mongo/executor/hedged_async_rpc_test.cpp index de36a80b67c..b262e6d838f 100644 --- a/src/mongo/executor/hedged_async_rpc_test.cpp +++ b/src/mongo/executor/hedged_async_rpc_test.cpp @@ -185,6 +185,13 @@ TEST_F(HedgedAsyncRPCTest, FindHedgeRequestTwoHosts) { auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts); onCommand([&](const auto& request) { + // Only check maxTimeMSOpOnly on hedged requests + if (request.target != kTwoHosts[0]) { + ASSERT(request.cmdObj["maxTimeMSOpOnly"]); + ASSERT_EQ(request.cmdObj["maxTimeMSOpOnly"].Long(), gMaxTimeMSForHedgedReads.load()); + } else { + ASSERT(!request.cmdObj["maxTimeMSOpOnly"]); + } ASSERT(request.cmdObj["find"]); return CursorResponse(testNS, 0LL, {testFirstBatch}) .toBSON(CursorResponse::ResponseType::InitialResponse); @@ -216,6 +223,7 @@ TEST_F(HedgedAsyncRPCTest, HelloHedgeRequest) { auto resultFuture = sendHedgedCommandWithHosts(helloCmd, kTwoHosts); onCommand([&](const auto& request) { + ASSERT(!request.cmdObj["maxTimeMSOpOnly"]); ASSERT(request.cmdObj["hello"]); return helloReply.toBSON(); }); @@ -293,7 +301,7 @@ TEST_F(HedgedAsyncRPCTest, HelloHedgeRemoteErrorWithGenericReplyFields) { remoteErrorBson = remoteErrorBson.addFields(unstableFields.toBSON()); const auto rcr = RemoteCommandResponse(remoteErrorBson, Milliseconds(1)); network->scheduleResponse(hedged, now, rcr); - network->scheduleSuccessfulResponse(authoritative, now + Milliseconds(1000), rcr); + network->scheduleSuccessfulResponse(authoritative, now + Milliseconds(100), rcr); }); network->runUntil(now + Milliseconds(1500)); @@ -401,6 +409,85 @@ TEST_F(HedgedAsyncRPCTest, ExecutorShutdown) { ASSERT(ErrorCodes::isA(extraInfo->asLocal())); } +/** + * Check that hedged commands return expiration errors if timeout is exceeded. + * A lot of the deadline behavior is test only, but we only want to check that the timeout is + * set correctly and respected here. + */ +TEST_F(HedgedAsyncRPCTest, TimeoutExceeded) { + auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts); + + auto network = getNetworkInterfaceMock(); + auto now = getNetworkInterfaceMock()->now(); + network->enterNetwork(); + + // Send hedged requests to exceed max time, which would fail the operation with + // NetworkInterfaceMock's fatal error. In production, "MaxTimeMSExpired" would be returned, + // and these would be considered "success" by the network interface. But because there is + // no deadline set by service entry point, we rely on NetworkInterfaceMock's timeout checks. + performAuthoritativeHedgeBehavior( + network, + [&](NetworkInterfaceMock::NetworkOperationIterator authoritative, + NetworkInterfaceMock::NetworkOperationIterator hedged) { + network->scheduleSuccessfulResponse( + authoritative, now + Milliseconds(1100), testSuccessResponse); + network->scheduleSuccessfulResponse( + hedged, now + Milliseconds(1000), testSuccessResponse); + }); + + network->runUntil(now + Milliseconds(1500)); + + auto counters = network->getCounters(); + network->exitNetwork(); + ASSERT_EQ(counters.failed, 1); + ASSERT_EQ(counters.canceled, 1); + + auto error = resultFuture.getNoThrow().getStatus(); + ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); + + auto extraInfo = error.extraInfo(); + ASSERT(extraInfo); + + // Fails NetworkInterfaceMock's timeout check. + ASSERT(extraInfo->isLocal()); + auto localError = extraInfo->asLocal(); + ASSERT_EQ(localError, ErrorCodes::NetworkInterfaceExceededTimeLimit); +} + +/** + * Check that hedged commands send maxTimeMSOpOnly with opCtx deadline given that the deadline is + * smaller than the global maxTimeMSForHedgedReads default. + */ +TEST_F(HedgedAsyncRPCTest, OpCtxRemainingDeadline) { + const auto kDeadline = 100; + getOpCtx()->setDeadlineAfterNowBy(Milliseconds(kDeadline), ErrorCodes::MaxTimeMSExpired); + auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts); + + onCommand([&](const auto& request) { + // Only check deadline from opCtx here, success/fail doesn't matter. In a real system, + // this deadline would have real timeout effects on the target host. + if (request.target != kTwoHosts[0]) { + ASSERT(request.cmdObj["maxTimeMSOpOnly"]); + ASSERT_EQ(request.cmdObj["maxTimeMSOpOnly"].Long(), kDeadline); + } + ASSERT(request.cmdObj["find"]); + return CursorResponse(testNS, 0LL, {testFirstBatch}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + auto network = getNetworkInterfaceMock(); + auto now = getNetworkInterfaceMock()->now(); + network->enterNetwork(); + network->runUntil(now + Milliseconds(150)); + network->exitNetwork(); + + auto counters = getNetworkInterfaceCounters(); + ASSERT_EQ(counters.succeeded, 1); + ASSERT_EQ(counters.canceled, 1); + + std::move(resultFuture).get(); +} + /** * When a hedged command is sent and one request resolves with a non-ignorable error, we propagate * that error upwards and cancel the other requests. @@ -451,7 +538,7 @@ TEST_F(HedgedAsyncRPCTest, BothCommandsFailWithIgnorableError) { NetworkInterfaceMock::NetworkOperationIterator hedged) { network->scheduleResponse(hedged, now, testIgnorableErrorResponse); network->scheduleSuccessfulResponse( - authoritative, now + Milliseconds(1000), testIgnorableErrorResponse); + authoritative, now + Milliseconds(100), testIgnorableErrorResponse); }); network->runUntil(now + Milliseconds(1500)); @@ -486,7 +573,7 @@ TEST_F(HedgedAsyncRPCTest, AllCommandsFailWithIgnorableError) { [&](NetworkInterfaceMock::NetworkOperationIterator authoritative, NetworkInterfaceMock::NetworkOperationIterator hedged1) { network->scheduleResponse( - authoritative, now + Milliseconds(1000), testIgnorableErrorResponse); + authoritative, now + Milliseconds(100), testIgnorableErrorResponse); network->scheduleResponse(hedged1, now, testIgnorableErrorResponse); }); @@ -529,7 +616,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedFailsWithIgnorableErrorAuthoritativeSucceeds) { NetworkInterfaceMock::NetworkOperationIterator hedged) { network->scheduleResponse(hedged, now, testIgnorableErrorResponse); network->scheduleSuccessfulResponse( - authoritative, now + Milliseconds(1000), testSuccessResponse); + authoritative, now + Milliseconds(100), testSuccessResponse); }); network->runUntil(now + Milliseconds(1500)); @@ -570,7 +657,7 @@ TEST_F(HedgedAsyncRPCTest, AuthoritativeFailsWithIgnorableErrorHedgedCancelled) NetworkInterfaceMock::NetworkOperationIterator hedged) { network->scheduleResponse(authoritative, now, testIgnorableErrorResponse); network->scheduleSuccessfulResponse( - hedged, now + Milliseconds(1000), testSuccessResponse); + hedged, now + Milliseconds(100), testSuccessResponse); }); network->runUntil(now + Milliseconds(1500)); @@ -612,7 +699,7 @@ TEST_F(HedgedAsyncRPCTest, AuthoritativeFailsWithFatalErrorHedgedCancelled) { NetworkInterfaceMock::NetworkOperationIterator hedged) { network->scheduleResponse(authoritative, now, testFatalErrorResponse); network->scheduleSuccessfulResponse( - hedged, now + Milliseconds(1000), testSuccessResponse); + hedged, now + Milliseconds(100), testSuccessResponse); }); network->runUntil(now + Milliseconds(1500)); @@ -653,7 +740,7 @@ TEST_F(HedgedAsyncRPCTest, AuthoritativeSucceedsHedgedCancelled) { NetworkInterfaceMock::NetworkOperationIterator hedged) { network->scheduleSuccessfulResponse(authoritative, now, testSuccessResponse); network->scheduleSuccessfulResponse( - hedged, now + Milliseconds(1000), testFatalErrorResponse); + hedged, now + Milliseconds(100), testFatalErrorResponse); }); network->runUntil(now + Milliseconds(1500)); @@ -687,7 +774,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedSucceedsAuthoritativeCancelled) { [&](NetworkInterfaceMock::NetworkOperationIterator authoritative, NetworkInterfaceMock::NetworkOperationIterator hedged) { network->scheduleSuccessfulResponse( - authoritative, now + Milliseconds(1000), testFatalErrorResponse); + authoritative, now + Milliseconds(100), testFatalErrorResponse); network->scheduleSuccessfulResponse(hedged, now, testSuccessResponse); }); @@ -729,7 +816,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedThenAuthoritativeFailsWithIgnorableError) { [&](NetworkInterfaceMock::NetworkOperationIterator authoritative, NetworkInterfaceMock::NetworkOperationIterator hedged) { network->scheduleResponse( - authoritative, now + Milliseconds(1000), testIgnorableErrorResponse); + authoritative, now + Milliseconds(100), testIgnorableErrorResponse); network->scheduleResponse(hedged, now, testAlternateIgnorableErrorResponse); }); @@ -773,7 +860,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedFailsWithIgnorableErrorAuthoritativeFailsWithFa [&](NetworkInterfaceMock::NetworkOperationIterator authoritative, NetworkInterfaceMock::NetworkOperationIterator hedged) { network->scheduleResponse( - authoritative, now + Milliseconds(1000), testFatalErrorResponse); + authoritative, now + Milliseconds(100), testFatalErrorResponse); network->scheduleResponse(hedged, now, testIgnorableErrorResponse); }); @@ -815,7 +902,7 @@ TEST_F(HedgedAsyncRPCTest, AuthoritativeSucceedsHedgedFailsWithIgnorableError) { network, [&](NetworkInterfaceMock::NetworkOperationIterator authoritative, NetworkInterfaceMock::NetworkOperationIterator hedged) { - network->scheduleResponse(authoritative, now + Milliseconds(1000), testSuccessResponse); + network->scheduleResponse(authoritative, now + Milliseconds(100), testSuccessResponse); network->scheduleResponse(hedged, now, testIgnorableErrorResponse); }); @@ -851,7 +938,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedFailsWithFatalErrorAuthoritativeCanceled) { network, [&](NetworkInterfaceMock::NetworkOperationIterator authoritative, NetworkInterfaceMock::NetworkOperationIterator hedged) { - network->scheduleResponse(authoritative, now + Milliseconds(1000), testSuccessResponse); + network->scheduleResponse(authoritative, now + Milliseconds(100), testSuccessResponse); network->scheduleResponse(hedged, now, testFatalErrorResponse); }); @@ -977,7 +1064,7 @@ TEST_F(HedgedAsyncRPCTest, RemoteErrorAttemptedTargetsContainActual) { network, [&](NetworkInterfaceMock::NetworkOperationIterator authoritative, NetworkInterfaceMock::NetworkOperationIterator hedged) { - network->scheduleResponse(authoritative, now + Milliseconds(1000), testSuccessResponse); + network->scheduleResponse(authoritative, now + Milliseconds(100), testSuccessResponse); network->scheduleResponse(hedged, now, testFatalErrorResponse); }); diff --git a/src/mongo/executor/remote_command_request.cpp b/src/mongo/executor/remote_command_request.cpp index b3fa3fb687f..eef86bcd34e 100644 --- a/src/mongo/executor/remote_command_request.cpp +++ b/src/mongo/executor/remote_command_request.cpp @@ -38,6 +38,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/query_request_helper.h" #include "mongo/platform/atomic_word.h" +#include "mongo/s/mongos_server_parameters_gen.h" #include "mongo/util/str.h" using namespace fmt::literals; @@ -73,12 +74,14 @@ RemoteCommandRequestBase::RemoteCommandRequestBase(RequestId requestId, ? theCmdObj.addField(*opCtx->getComment()) : cmdObj = theCmdObj; - // maxTimeMSOpOnly is set in the network interface based on the remaining max time attached to - // the OpCtx. It should never be specified explicitly. - uassert(4924403, - str::stream() << "Command request object should not manually specify " - << query_request_helper::kMaxTimeMSOpOnlyField, - !cmdObj.hasField(query_request_helper::kMaxTimeMSOpOnlyField)); + // For hedged requests, adjust timeout + if (cmdObj.hasField("maxTimeMSOpOnly")) { + int maxTimeField = cmdObj["maxTimeMSOpOnly"].Number(); + if (auto maxTimeMSOpOnly = Milliseconds(maxTimeField); + timeout == executor::RemoteCommandRequest::kNoTimeout || maxTimeMSOpOnly < timeout) { + timeout = maxTimeMSOpOnly; + } + } if (options.hedgeOptions.isHedgeEnabled) { operationKey.emplace(UUID::gen()); -- cgit v1.2.1