/**
 *    Copyright (C) 2022-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #pragma once #include "mongo/base/error_codes.h" #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/cursor_response.h" #include "mongo/executor/remote_command_response.h" #include "mongo/executor/remote_command_runner.h" #include "mongo/executor/remote_command_targeter.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/mongos_server_parameters_gen.h" #include "mongo/util/assert_util.h" #include "mongo/util/cancellation.h" #include "mongo/util/future.h" #include "mongo/util/future_util.h" #include "mongo/util/net/hostandport.h" #include #include #include namespace mongo { namespace executor { namespace remote_command_runner { namespace { // Only hedge commands that cannot trigger writes. const std::set supportedCmds{"collStats", "count", "dataSize", "dbStats", "distinct", "filemd5", "find", "listCollections", "listIndexes", "planCacheListFilters"}; /** * Given a vector of input Futures, whenAnyThat returns a Future which holds the value * of the first of those futures to resolve with a status, value, and index that * satisfies the conditions in the ConditionCallable Callable. */ template Future whenAnyThat(std::vector>&& futures, ConditionCallable&& shouldAccept) { invariant(futures.size() > 0); struct SharedBlock { SharedBlock(Promise result) : resultPromise(std::move(result)) {} // Tracks whether or not the resultPromise has been set. AtomicWord done{false}; // The promise corresponding to the resulting SemiFuture returned by this function. 
Promise resultPromise; }; Promise promise{NonNullPromiseTag{}}; auto future = promise.getFuture(); auto sharedBlock = std::make_shared(std::move(promise)); for (size_t i = 0; i < futures.size(); ++i) { std::move(futures[i]) .getAsync( [sharedBlock, myIndex = i, shouldAccept](StatusOrStatusWith value) { if (shouldAccept(value, myIndex)) { // If this is the first input future to complete and satisfy the // shouldAccept condition, change done to true and set the value on the // promise. if (!sharedBlock->done.swap(true)) { sharedBlock->resultPromise.setFrom(std::move(value)); } } }); } return future; } } // namespace /** * doHedgedRequest is a hedged version of the doRequest function. It asynchronously executes a * hedged request by sending commands to multiple targets through doRequest and then returns a * SemiFuture with the first result to become ready. * * In order to hedge, the command must be eligible for hedging, the hedgingMode server parameter * must be enabled, and multiple hosts must be provided by the targeter. If any of those conditions * is false, then the function will not hedge, and instead will just target the first host in the * vector provided by resolve. */ template SemiFuture> doHedgedRequest( CommandType cmd, OperationContext* opCtx, std::unique_ptr targeter, std::shared_ptr exec, CancellationToken token) { using SingleResponse = RemoteCommandRunnerResponse; // Set up cancellation token to cancel remaining hedged operations. CancellationSource hedgeCancellationToken{token}; return targeter->resolve(token) .thenRunOn(exec) .then([cmd, opCtx, exec, token, hedgeCancellationToken](std::vector targets) { uassert(ErrorCodes::HostNotFound, "No hosts available.", targets.size() != 0); bool shouldHedge = (gReadHedgingMode.load() == ReadHedgingMode::kOn) && (supportedCmds.count(CommandType::kCommandName.toString())); // When hedging is disabled, the requests vector will be of size 1. size_t hedgeCount = shouldHedge ? 
targets.size() : 1; std::vector> requests; for (size_t i = 0; i < hedgeCount; i++) { std::unique_ptr t = std::make_unique(targets[i]); requests.emplace_back( doRequest(cmd, opCtx, std::move(t), exec, hedgeCancellationToken.token()) .thenRunOn(exec)); } /** * When whenAnyThat is used in doHedgedRequest, the shouldAccept function always accepts * the future with index 0, which we treat as the "authoritative" request. This is the * codepath followed when we are not hedging or there is only 1 target provided. */ return whenAnyThat(std::move(requests), [](StatusWith response, size_t index) { Status commandStatus = response.getStatus(); if (index == 0) { return true; } if (commandStatus == ErrorCodes::MaxTimeMSExpired || commandStatus == ErrorCodes::StaleDbVersion || ErrorCodes::isStaleShardVersionError(commandStatus)) { return false; } return true; }); }) .onCompletion([hedgeCancellationToken](StatusWith result) mutable { // TODO SERVER-68101 add retry logic // TODO SERVER-68555 add extra error handling info hedgeCancellationToken.cancel(); return result; }) .semi(); } } // namespace remote_command_runner } // namespace executor } // namespace mongo