summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Li <alex.li@mongodb.com>2023-05-16 16:23:37 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-16 17:48:12 +0000
commit7d5b3ce7bbd430a8d9cb231befa9233e372c66cb (patch)
treeb8168845e8d76f0987aa4dc3dd7af866172180fb
parentf1b417202d283df43bd3f9833726097297d34f63 (diff)
downloadmongo-7d5b3ce7bbd430a8d9cb231befa9233e372c66cb.tar.gz
SERVER-73779 Add maxTimeMSOpOnly to network message if it is a hedge sent through async_rpc API
-rw-r--r--src/mongo/executor/SConscript1
-rw-r--r--src/mongo/executor/hedged_async_rpc.h17
-rw-r--r--src/mongo/executor/hedged_async_rpc_test.cpp113
-rw-r--r--src/mongo/executor/remote_command_request.cpp15
4 files changed, 127 insertions, 19 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index b112f2c6aa3..587c4713785 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -37,6 +37,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/db/api_parameters',
'$BUILD_DIR/mongo/rpc/metadata',
+ '$BUILD_DIR/mongo/s/mongos_server_parameters',
'$BUILD_DIR/mongo/util/net/network',
],
)
diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h
index 6ab98cb9b59..15868c390db 100644
--- a/src/mongo/executor/hedged_async_rpc.h
+++ b/src/mongo/executor/hedged_async_rpc.h
@@ -226,6 +226,7 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
});
}
+ const auto globalMaxTimeMSForHedgedReads = gMaxTimeMSForHedgedReads.load();
for (size_t i = 0; i < hostsToTarget; i++) {
std::unique_ptr<Targeter> t = std::make_unique<FixedTargeter>(targets[i]);
// We explicitly pass "NeverRetryPolicy" here because the retry mechanism
@@ -237,6 +238,22 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
hedgeCancellationToken.token(),
std::make_shared<NeverRetryPolicy>(),
genericArgs);
+
+ // If the request is a hedged request, set maxTimeMSOpOnly to the smaller of
+ // the server parameter maxTimeMSForHedgedReads or remaining max time from the
+ // opCtx.
+ if (opts.isHedgeEnabled && i != 0) {
+ auto maxTimeMSOpOnly = globalMaxTimeMSForHedgedReads;
+ if (opCtx->hasDeadline()) {
+ if (auto remainingMaxTime = opCtx->getRemainingMaxTimeMillis().count();
+ remainingMaxTime < maxTimeMSOpOnly) {
+ maxTimeMSOpOnly = remainingMaxTime;
+ }
+ }
+
+ options->genericArgs.unstable.setMaxTimeMSOpOnly(maxTimeMSOpOnly);
+ }
+
options->baton = baton;
requests.push_back(
sendCommand(options, opCtx, std::move(t)).thenRunOn(proxyExec));
diff --git a/src/mongo/executor/hedged_async_rpc_test.cpp b/src/mongo/executor/hedged_async_rpc_test.cpp
index de36a80b67c..b262e6d838f 100644
--- a/src/mongo/executor/hedged_async_rpc_test.cpp
+++ b/src/mongo/executor/hedged_async_rpc_test.cpp
@@ -185,6 +185,13 @@ TEST_F(HedgedAsyncRPCTest, FindHedgeRequestTwoHosts) {
auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts);
onCommand([&](const auto& request) {
+ // Only check maxTimeMSOpOnly on hedged requests
+ if (request.target != kTwoHosts[0]) {
+ ASSERT(request.cmdObj["maxTimeMSOpOnly"]);
+ ASSERT_EQ(request.cmdObj["maxTimeMSOpOnly"].Long(), gMaxTimeMSForHedgedReads.load());
+ } else {
+ ASSERT(!request.cmdObj["maxTimeMSOpOnly"]);
+ }
ASSERT(request.cmdObj["find"]);
return CursorResponse(testNS, 0LL, {testFirstBatch})
.toBSON(CursorResponse::ResponseType::InitialResponse);
@@ -216,6 +223,7 @@ TEST_F(HedgedAsyncRPCTest, HelloHedgeRequest) {
auto resultFuture = sendHedgedCommandWithHosts(helloCmd, kTwoHosts);
onCommand([&](const auto& request) {
+ ASSERT(!request.cmdObj["maxTimeMSOpOnly"]);
ASSERT(request.cmdObj["hello"]);
return helloReply.toBSON();
});
@@ -293,7 +301,7 @@ TEST_F(HedgedAsyncRPCTest, HelloHedgeRemoteErrorWithGenericReplyFields) {
remoteErrorBson = remoteErrorBson.addFields(unstableFields.toBSON());
const auto rcr = RemoteCommandResponse(remoteErrorBson, Milliseconds(1));
network->scheduleResponse(hedged, now, rcr);
- network->scheduleSuccessfulResponse(authoritative, now + Milliseconds(1000), rcr);
+ network->scheduleSuccessfulResponse(authoritative, now + Milliseconds(100), rcr);
});
network->runUntil(now + Milliseconds(1500));
@@ -402,6 +410,85 @@ TEST_F(HedgedAsyncRPCTest, ExecutorShutdown) {
}
/**
+ * Check that hedged commands return expiration errors if timeout is exceeded.
+ * A lot of the deadline behavior is test only, but we only want to check that the timeout is
+ * set correctly and respected here.
+ */
+TEST_F(HedgedAsyncRPCTest, TimeoutExceeded) {
+ auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts);
+
+ auto network = getNetworkInterfaceMock();
+ auto now = getNetworkInterfaceMock()->now();
+ network->enterNetwork();
+
+ // Send hedged requests to exceed max time, which would fail the operation with
+ // NetworkInterfaceMock's fatal error. In production, "MaxTimeMSExpired" would be returned,
+ // and these would be considered "success" by the network interface. But because there is
+ // no deadline set by service entry point, we rely on NetworkInterfaceMock's timeout checks.
+ performAuthoritativeHedgeBehavior(
+ network,
+ [&](NetworkInterfaceMock::NetworkOperationIterator authoritative,
+ NetworkInterfaceMock::NetworkOperationIterator hedged) {
+ network->scheduleSuccessfulResponse(
+ authoritative, now + Milliseconds(1100), testSuccessResponse);
+ network->scheduleSuccessfulResponse(
+ hedged, now + Milliseconds(1000), testSuccessResponse);
+ });
+
+ network->runUntil(now + Milliseconds(1500));
+
+ auto counters = network->getCounters();
+ network->exitNetwork();
+ ASSERT_EQ(counters.failed, 1);
+ ASSERT_EQ(counters.canceled, 1);
+
+ auto error = resultFuture.getNoThrow().getStatus();
+ ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
+
+ auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>();
+ ASSERT(extraInfo);
+
+ // Fails NetworkInterfaceMock's timeout check.
+ ASSERT(extraInfo->isLocal());
+ auto localError = extraInfo->asLocal();
+ ASSERT_EQ(localError, ErrorCodes::NetworkInterfaceExceededTimeLimit);
+}
+
+/**
+ * Check that hedged commands send maxTimeMSOpOnly with opCtx deadline given that the deadline is
+ * smaller than the global maxTimeMSForHedgedReads default.
+ */
+TEST_F(HedgedAsyncRPCTest, OpCtxRemainingDeadline) {
+ const auto kDeadline = 100;
+ getOpCtx()->setDeadlineAfterNowBy(Milliseconds(kDeadline), ErrorCodes::MaxTimeMSExpired);
+ auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts);
+
+ onCommand([&](const auto& request) {
+ // Only check deadline from opCtx here, success/fail doesn't matter. In a real system,
+ // this deadline would have real timeout effects on the target host.
+ if (request.target != kTwoHosts[0]) {
+ ASSERT(request.cmdObj["maxTimeMSOpOnly"]);
+ ASSERT_EQ(request.cmdObj["maxTimeMSOpOnly"].Long(), kDeadline);
+ }
+ ASSERT(request.cmdObj["find"]);
+ return CursorResponse(testNS, 0LL, {testFirstBatch})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ auto network = getNetworkInterfaceMock();
+ auto now = getNetworkInterfaceMock()->now();
+ network->enterNetwork();
+ network->runUntil(now + Milliseconds(150));
+ network->exitNetwork();
+
+ auto counters = getNetworkInterfaceCounters();
+ ASSERT_EQ(counters.succeeded, 1);
+ ASSERT_EQ(counters.canceled, 1);
+
+ std::move(resultFuture).get();
+}
+
+/**
* When a hedged command is sent and one request resolves with a non-ignorable error, we propagate
* that error upwards and cancel the other requests.
*/
@@ -451,7 +538,7 @@ TEST_F(HedgedAsyncRPCTest, BothCommandsFailWithIgnorableError) {
NetworkInterfaceMock::NetworkOperationIterator hedged) {
network->scheduleResponse(hedged, now, testIgnorableErrorResponse);
network->scheduleSuccessfulResponse(
- authoritative, now + Milliseconds(1000), testIgnorableErrorResponse);
+ authoritative, now + Milliseconds(100), testIgnorableErrorResponse);
});
network->runUntil(now + Milliseconds(1500));
@@ -486,7 +573,7 @@ TEST_F(HedgedAsyncRPCTest, AllCommandsFailWithIgnorableError) {
[&](NetworkInterfaceMock::NetworkOperationIterator authoritative,
NetworkInterfaceMock::NetworkOperationIterator hedged1) {
network->scheduleResponse(
- authoritative, now + Milliseconds(1000), testIgnorableErrorResponse);
+ authoritative, now + Milliseconds(100), testIgnorableErrorResponse);
network->scheduleResponse(hedged1, now, testIgnorableErrorResponse);
});
@@ -529,7 +616,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedFailsWithIgnorableErrorAuthoritativeSucceeds) {
NetworkInterfaceMock::NetworkOperationIterator hedged) {
network->scheduleResponse(hedged, now, testIgnorableErrorResponse);
network->scheduleSuccessfulResponse(
- authoritative, now + Milliseconds(1000), testSuccessResponse);
+ authoritative, now + Milliseconds(100), testSuccessResponse);
});
network->runUntil(now + Milliseconds(1500));
@@ -570,7 +657,7 @@ TEST_F(HedgedAsyncRPCTest, AuthoritativeFailsWithIgnorableErrorHedgedCancelled)
NetworkInterfaceMock::NetworkOperationIterator hedged) {
network->scheduleResponse(authoritative, now, testIgnorableErrorResponse);
network->scheduleSuccessfulResponse(
- hedged, now + Milliseconds(1000), testSuccessResponse);
+ hedged, now + Milliseconds(100), testSuccessResponse);
});
network->runUntil(now + Milliseconds(1500));
@@ -612,7 +699,7 @@ TEST_F(HedgedAsyncRPCTest, AuthoritativeFailsWithFatalErrorHedgedCancelled) {
NetworkInterfaceMock::NetworkOperationIterator hedged) {
network->scheduleResponse(authoritative, now, testFatalErrorResponse);
network->scheduleSuccessfulResponse(
- hedged, now + Milliseconds(1000), testSuccessResponse);
+ hedged, now + Milliseconds(100), testSuccessResponse);
});
network->runUntil(now + Milliseconds(1500));
@@ -653,7 +740,7 @@ TEST_F(HedgedAsyncRPCTest, AuthoritativeSucceedsHedgedCancelled) {
NetworkInterfaceMock::NetworkOperationIterator hedged) {
network->scheduleSuccessfulResponse(authoritative, now, testSuccessResponse);
network->scheduleSuccessfulResponse(
- hedged, now + Milliseconds(1000), testFatalErrorResponse);
+ hedged, now + Milliseconds(100), testFatalErrorResponse);
});
network->runUntil(now + Milliseconds(1500));
@@ -687,7 +774,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedSucceedsAuthoritativeCancelled) {
[&](NetworkInterfaceMock::NetworkOperationIterator authoritative,
NetworkInterfaceMock::NetworkOperationIterator hedged) {
network->scheduleSuccessfulResponse(
- authoritative, now + Milliseconds(1000), testFatalErrorResponse);
+ authoritative, now + Milliseconds(100), testFatalErrorResponse);
network->scheduleSuccessfulResponse(hedged, now, testSuccessResponse);
});
@@ -729,7 +816,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedThenAuthoritativeFailsWithIgnorableError) {
[&](NetworkInterfaceMock::NetworkOperationIterator authoritative,
NetworkInterfaceMock::NetworkOperationIterator hedged) {
network->scheduleResponse(
- authoritative, now + Milliseconds(1000), testIgnorableErrorResponse);
+ authoritative, now + Milliseconds(100), testIgnorableErrorResponse);
network->scheduleResponse(hedged, now, testAlternateIgnorableErrorResponse);
});
@@ -773,7 +860,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedFailsWithIgnorableErrorAuthoritativeFailsWithFa
[&](NetworkInterfaceMock::NetworkOperationIterator authoritative,
NetworkInterfaceMock::NetworkOperationIterator hedged) {
network->scheduleResponse(
- authoritative, now + Milliseconds(1000), testFatalErrorResponse);
+ authoritative, now + Milliseconds(100), testFatalErrorResponse);
network->scheduleResponse(hedged, now, testIgnorableErrorResponse);
});
@@ -815,7 +902,7 @@ TEST_F(HedgedAsyncRPCTest, AuthoritativeSucceedsHedgedFailsWithIgnorableError) {
network,
[&](NetworkInterfaceMock::NetworkOperationIterator authoritative,
NetworkInterfaceMock::NetworkOperationIterator hedged) {
- network->scheduleResponse(authoritative, now + Milliseconds(1000), testSuccessResponse);
+ network->scheduleResponse(authoritative, now + Milliseconds(100), testSuccessResponse);
network->scheduleResponse(hedged, now, testIgnorableErrorResponse);
});
@@ -851,7 +938,7 @@ TEST_F(HedgedAsyncRPCTest, HedgedFailsWithFatalErrorAuthoritativeCanceled) {
network,
[&](NetworkInterfaceMock::NetworkOperationIterator authoritative,
NetworkInterfaceMock::NetworkOperationIterator hedged) {
- network->scheduleResponse(authoritative, now + Milliseconds(1000), testSuccessResponse);
+ network->scheduleResponse(authoritative, now + Milliseconds(100), testSuccessResponse);
network->scheduleResponse(hedged, now, testFatalErrorResponse);
});
@@ -977,7 +1064,7 @@ TEST_F(HedgedAsyncRPCTest, RemoteErrorAttemptedTargetsContainActual) {
network,
[&](NetworkInterfaceMock::NetworkOperationIterator authoritative,
NetworkInterfaceMock::NetworkOperationIterator hedged) {
- network->scheduleResponse(authoritative, now + Milliseconds(1000), testSuccessResponse);
+ network->scheduleResponse(authoritative, now + Milliseconds(100), testSuccessResponse);
network->scheduleResponse(hedged, now, testFatalErrorResponse);
});
diff --git a/src/mongo/executor/remote_command_request.cpp b/src/mongo/executor/remote_command_request.cpp
index b3fa3fb687f..eef86bcd34e 100644
--- a/src/mongo/executor/remote_command_request.cpp
+++ b/src/mongo/executor/remote_command_request.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/query/query_request_helper.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/s/mongos_server_parameters_gen.h"
#include "mongo/util/str.h"
using namespace fmt::literals;
@@ -73,12 +74,14 @@ RemoteCommandRequestBase::RemoteCommandRequestBase(RequestId requestId,
? theCmdObj.addField(*opCtx->getComment())
: cmdObj = theCmdObj;
- // maxTimeMSOpOnly is set in the network interface based on the remaining max time attached to
- // the OpCtx. It should never be specified explicitly.
- uassert(4924403,
- str::stream() << "Command request object should not manually specify "
- << query_request_helper::kMaxTimeMSOpOnlyField,
- !cmdObj.hasField(query_request_helper::kMaxTimeMSOpOnlyField));
+ // For hedged requests, adjust timeout
+ if (cmdObj.hasField("maxTimeMSOpOnly")) {
+ int maxTimeField = cmdObj["maxTimeMSOpOnly"].Number();
+ if (auto maxTimeMSOpOnly = Milliseconds(maxTimeField);
+ timeout == executor::RemoteCommandRequest::kNoTimeout || maxTimeMSOpOnly < timeout) {
+ timeout = maxTimeMSOpOnly;
+ }
+ }
if (options.hedgeOptions.isHedgeEnabled) {
operationKey.emplace(UUID::gen());