summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-05-18 15:28:50 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-05-18 19:59:43 +0000
commit9395bf07b2c2e6a3204d10543f7d731754bd7c9d (patch)
tree9310f254e3f024b170dd9d6cba445b36c8f7f5bc /src/mongo/executor
parent20de257ec7f9f1def474e7a62375df364ae85f4b (diff)
downloadmongo-9395bf07b2c2e6a3204d10543f7d731754bd7c9d.tar.gz
SERVER-47437 Synchronize and unify NetworkInterfaceTL state components
This patch does the following: - Fixes a use-after-move scenario with operationKey. - Alters the cancelation pattern to kill remotely first, then locally. - Initializes the requestManager and timer members in the CommandState*::make() function. - Uses the existing mutex to synchronize connection resolution and cancelation. - Attempts to bind std::shared_ptr<RequestState> only to the request path.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/network_interface_tl.cpp269
-rw-r--r--src/mongo/executor/network_interface_tl.h74
-rw-r--r--src/mongo/executor/remote_command_request.h4
3 files changed, 147 insertions, 200 deletions
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 3353c59d3ec..96a8337023d 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -268,8 +268,9 @@ NetworkInterfaceTL::CommandStateBase::CommandStateBase(
: interface(interface_),
requestOnAny(std::move(request_)),
cbHandle(cbHandle_),
- finishLine(maxRequestFailures()),
- operationKey(request_.operationKey) {}
+ timer(interface->_reactor->makeTimer()),
+ finishLine(1),
+ operationKey(requestOnAny.operationKey) {}
NetworkInterfaceTL::CommandState::CommandState(NetworkInterfaceTL* interface_,
RemoteCommandRequestOnAny request_,
@@ -299,6 +300,8 @@ auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface,
state->tryFinish(swRequest.getValue().status);
});
+ state->requestManager = std::make_unique<RequestManager>(state.get());
+
{
stdx::lock_guard lk(interface->_inProgressMutex);
if (interface->inShutdown()) {
@@ -335,7 +338,6 @@ void NetworkInterfaceTL::CommandStateBase::setTimer() {
}
// TODO reform with SERVER-41459
- timer = interface->_reactor->makeTimer();
timer->waitUntil(deadline, baton).getAsync([this, anchor = shared_from_this()](Status status) {
if (!status.isOK()) {
return;
@@ -380,14 +382,8 @@ void NetworkInterfaceTL::CommandStateBase::tryFinish(Status status) noexcept {
LOGV2_DEBUG(
4646302, 2, "Finished request", "requestId"_attr = requestOnAny.id, "status"_attr = status);
- if (timer) {
- // The command has resolved one way or another,
- timer->cancel(baton);
- }
-
- if (!status.isOK() && requestManager) {
- requestManager->cancelRequests();
- }
+ // The command has resolved one way or another.
+ timer->cancel(baton);
if (interface->_counters) {
// Increment our counters for the integration test
@@ -400,11 +396,18 @@ void NetworkInterfaceTL::CommandStateBase::tryFinish(Status status) noexcept {
interface->_inProgress.erase(cbHandle);
}
- if (operationKey && requestManager) {
+ invariant(requestManager);
+ if (operationKey) {
// Kill operations for requests that we didn't use to fulfill the promise.
requestManager->killOperationsForPendingRequests();
}
+ if (!status.isOK()) {
+ // We cancel after we issue _killOperations because, if we cancel before, existing
+        // RequestStates may finish and destruct too quickly.
+ requestManager->cancelRequests();
+ }
+
networkInterfaceCommandsFailedWithErrorCode.shouldFail([&](const BSONObj& data) {
const auto errorCode = data.getIntField("errorCode");
if (errorCode != status.code()) {
@@ -474,12 +477,6 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
cmdState->deadline = cmdState->stopwatch.start() + cmdState->requestOnAny.timeout;
}
cmdState->baton = baton;
- cmdState->requestManager = std::make_unique<RequestManager>(cmdState->hedgeCount, cmdState);
-
- std::vector<std::shared_ptr<NetworkInterfaceTL::RequestState>> requestStates;
- for (size_t i = 0; i < cmdState->hedgeCount; i++) {
- requestStates.emplace_back(cmdState->requestManager->makeRequest());
- }
if (_svcCtx && cmdState->requestOnAny.hedgeOptions) {
auto hm = HedgingMetrics::get(_svcCtx);
@@ -536,22 +533,20 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
return Status::OK();
}
- RequestManager* rm = cmdState->requestManager.get();
-
// Attempt to get a connection to every target host
- for (size_t idx = 0; idx < request.target.size() && !rm->usedAllConn(); ++idx) {
+ for (size_t idx = 0; idx < request.target.size(); ++idx) {
auto connFuture = _pool->get(request.target[idx], request.sslMode, request.timeout);
// If connection future is ready or requests should be sent in order, send the request
// immediately.
if (connFuture.isReady() || targetHostsInAlphabeticalOrder) {
- rm->trySend(std::move(connFuture).getNoThrow(), idx);
+ cmdState->requestManager->trySend(std::move(connFuture).getNoThrow(), idx);
continue;
}
// Otherwise, schedule the request.
- std::move(connFuture).thenRunOn(_reactor).getAsync([requestStates, rm, idx](auto swConn) {
- rm->trySend(std::move(swConn), idx);
+ std::move(connFuture).thenRunOn(_reactor).getAsync([cmdState = cmdState, idx](auto swConn) {
+ cmdState->requestManager->trySend(std::move(swConn), idx);
});
}
@@ -572,10 +567,8 @@ void NetworkInterfaceTL::testEgress(const HostAndPort& hostAndPort,
}
}
-Future<RemoteCommandResponse> NetworkInterfaceTL::CommandState::sendRequest(size_t reqId) {
- auto requestState = requestManager->getRequest(reqId);
- invariant(requestState);
-
+Future<RemoteCommandResponse> NetworkInterfaceTL::CommandState::sendRequest(
+ std::shared_ptr<RequestState> requestState) {
return makeReadyFutureWith([this, requestState] {
setTimer();
return RequestState::getClient(requestState->conn)
@@ -602,73 +595,31 @@ void NetworkInterfaceTL::CommandState::fulfillFinalPromise(
promise.setFromStatusWith(std::move(response));
}
-std::shared_ptr<NetworkInterfaceTL::RequestState>
-NetworkInterfaceTL::RequestManager::makeRequest() {
- stdx::lock_guard<Latch> lk(mutex);
- auto reqId = requestCount.fetchAndAdd(1);
- auto requestState =
- std::make_shared<NetworkInterfaceTL::RequestState>(this, cmdState.lock(), reqId);
- requests[reqId] = requestState;
- return requestState;
-}
-
-std::shared_ptr<NetworkInterfaceTL::RequestState> NetworkInterfaceTL::RequestManager::getRequest(
- size_t reqId) {
- invariant(requestCount.load() > reqId);
- return requests[reqId].lock();
-}
-
-std::shared_ptr<NetworkInterfaceTL::RequestState>
-NetworkInterfaceTL::RequestManager::getNextRequest() {
- stdx::lock_guard<Latch> lk(mutex);
- if (sentIdx.load() < requests.size()) {
- auto requestState = requests[sentIdx.fetchAndAdd(1)].lock();
- invariant(requestState);
-
- if (sentIdx.load() > 1) {
- requestState->isHedge = true;
- }
- return requestState;
- } else {
- return nullptr;
- }
-}
-
-NetworkInterfaceTL::ConnStatus NetworkInterfaceTL::RequestManager::getConnStatus(size_t reqId) {
- invariant(requestCount.load() > reqId);
- return connStatus[reqId];
-}
-
-bool NetworkInterfaceTL::RequestManager::sentNone() const {
- return sentIdx.load() == 0;
-}
-
-bool NetworkInterfaceTL::RequestManager::sentAll() const {
- return sentIdx.load() == requests.size();
-}
-
-bool NetworkInterfaceTL::RequestManager::usedAllConn() const {
- return std::count(connStatus.begin(), connStatus.end(), ConnStatus::Unset) == 0;
-}
+NetworkInterfaceTL::RequestManager::RequestManager(CommandStateBase* cmdState_)
+ : cmdState{cmdState_},
+ requests(cmdState->maxConcurrentRequests(), std::weak_ptr<RequestState>()) {}
void NetworkInterfaceTL::RequestManager::cancelRequests() {
{
stdx::lock_guard<Latch> lk(mutex);
isLocked = true;
- if (sentNone()) {
+ if (sentIdx == 0) {
// We've canceled before any connections were acquired.
return;
}
}
for (size_t i = 0; i < requests.size(); i++) {
- LOGV2_DEBUG(4646301,
- 2,
- "Cancelling request",
- "requestId"_attr = cmdState.lock()->requestOnAny.id,
- "index"_attr = i);
+        // Note that right now, this will cause the connection to be discarded before most if not
+        // all responses to our _killOperations arrive.
+ // TODO SERVER-47602 should fix this.
if (auto requestState = requests[i].lock()) {
+ LOGV2_DEBUG(4646301,
+ 2,
+ "Cancelling request",
+ "requestId"_attr = cmdState->requestOnAny.id,
+ "index"_attr = i);
requestState->cancel();
}
}
@@ -679,7 +630,7 @@ void NetworkInterfaceTL::RequestManager::killOperationsForPendingRequests() {
stdx::lock_guard<Latch> lk(mutex);
isLocked = true;
- if (sentNone()) {
+ if (sentIdx == 0) {
// We've canceled before any connections were acquired.
return;
}
@@ -691,17 +642,19 @@ void NetworkInterfaceTL::RequestManager::killOperationsForPendingRequests() {
continue;
}
- // If the request was sent, send a remote command request to the target host
- // to kill the operation started by the request.
- bool hasAcquiredConn = getConnStatus(requestState->reqId) == ConnStatus::OK;
- if (!hasAcquiredConn || !requestState->request) {
+ auto conn = requestState->weakConn.lock();
+ if (!conn) {
+ // If there is nothing from weakConn, the networking has already finished.
continue;
}
+ // If the request was sent, send a remote command request to the target host
+ // to kill the operation started by the request.
+
LOGV2_DEBUG(4664801,
2,
"Sending remote _killOperations request to cancel command",
- "operationKey"_attr = cmdState.lock()->operationKey,
+ "operationKey"_attr = cmdState->operationKey,
"target"_attr = requestState->request->target,
"requestId"_attr = requestState->request->id);
@@ -714,55 +667,84 @@ void NetworkInterfaceTL::RequestManager::killOperationsForPendingRequests() {
void NetworkInterfaceTL::RequestManager::trySend(
StatusWith<ConnectionPool::ConnectionHandle> swConn, size_t idx) noexcept {
- auto cmdStatePtr = cmdState.lock();
- invariant(cmdStatePtr);
// Our connection wasn't any good
if (!swConn.isOK()) {
- connStatus[idx] = ConnStatus::Failed;
- if (!usedAllConn()) {
- return;
+ {
+ stdx::lock_guard<Latch> lk(mutex);
+
+ auto currentConnsResolved = ++connsResolved;
+ if (currentConnsResolved < cmdState->maxPossibleConns()) {
+ // If we still have connections outstanding, we don't need to fail the promise.
+ return;
+ }
+
+ if (sentIdx > 0) {
+ // If a request has been sent, we shouldn't fail the promise.
+ return;
+ }
+
+ if (isLocked) {
+ // If we've finished, obviously we don't need to fail the promise.
+ return;
+ }
}
// We're the last one, set the promise if it hasn't already been set via cancel or timeout
- if (cmdStatePtr->finishLine.arriveStrongly()) {
- auto& reactor = cmdStatePtr->interface->_reactor;
+ if (cmdState->finishLine.arriveStrongly()) {
+ auto& reactor = cmdState->interface->_reactor;
if (reactor->onReactorThread()) {
- cmdStatePtr->fulfillFinalPromise(swConn.getStatus());
+ cmdState->fulfillFinalPromise(std::move(swConn.getStatus()));
} else {
ExecutorFuture<void>(reactor, swConn.getStatus())
- .getAsync([this, cmdStatePtr](Status status) {
- cmdStatePtr->fulfillFinalPromise(std::move(status));
+ .getAsync([this, anchor = cmdState->shared_from_this()](Status status) {
+ cmdState->fulfillFinalPromise(std::move(status));
});
}
}
return;
}
- connStatus[idx] = ConnStatus::OK;
+ std::shared_ptr<RequestState> requestState;
{
stdx::lock_guard<Latch> lk(mutex);
- if (cmdStatePtr->finishLine.isReady() || sentAll() || isLocked) {
+
+ // Increment the number of conns we were able to resolve.
+ ++connsResolved;
+
+ auto haveSentAll = sentIdx >= cmdState->maxConcurrentRequests();
+ if (haveSentAll || isLocked) {
// Our command has already been satisfied or we have already sent out all
// the requests.
swConn.getValue()->indicateSuccess();
return;
}
- }
- auto requestState = getNextRequest();
- invariant(requestState);
+ auto currentSentIdx = sentIdx++;
+
+ requestState = std::make_shared<RequestState>(this, cmdState->shared_from_this(), idx);
+ requestState->isHedge = currentSentIdx > 0;
+
+ // Set conn/weakConn+request under the lock so they will always be observed during cancel.
+ requestState->conn = std::move(swConn.getValue());
+ requestState->weakConn = requestState->conn;
+
+ requestState->request = RemoteCommandRequest(cmdState->requestOnAny, idx);
+ requestState->host = requestState->request->target;
+
+ requests.at(currentSentIdx) = requestState;
+ }
LOGV2_DEBUG(4646300,
2,
"Sending request",
- "requestId"_attr = cmdStatePtr->requestOnAny.id,
- "target"_attr = cmdStatePtr->requestOnAny.target[idx]);
-
- RemoteCommandRequest request({cmdStatePtr->requestOnAny, idx});
+ "requestId"_attr = cmdState->requestOnAny.id,
+ "target"_attr = cmdState->requestOnAny.target[idx]);
if (requestState->isHedge) {
- invariant(cmdStatePtr->requestOnAny.hedgeOptions);
+ auto& request = *requestState->request;
+
+ invariant(request.hedgeOptions);
auto maxTimeMS = request.hedgeOptions->maxTimeMSForHedgedReads;
BSONObjBuilder updatedCmdBuilder;
@@ -774,47 +756,30 @@ void NetworkInterfaceTL::RequestManager::trySend(
2,
"Set maxTimeMS for request",
"maxTime"_attr = Milliseconds(maxTimeMS),
- "requestId"_attr = cmdStatePtr->requestOnAny.id,
- "target"_attr = cmdStatePtr->requestOnAny.target[idx]);
+ "requestId"_attr = cmdState->requestOnAny.id,
+ "target"_attr = cmdState->requestOnAny.target[idx]);
- if (cmdStatePtr->interface->_svcCtx) {
- auto hm = HedgingMetrics::get(cmdStatePtr->interface->_svcCtx);
+ if (cmdState->interface->_svcCtx) {
+ auto hm = HedgingMetrics::get(cmdState->interface->_svcCtx);
invariant(hm);
hm->incrementNumTotalHedgedOperations();
}
}
- requestState->send(std::move(swConn), request);
-}
-
-void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn,
- size_t idx) noexcept {
- invariant(requestManager);
- requestManager->trySend(std::move(swConn), idx);
-}
-
-void NetworkInterfaceTL::RequestState::send(StatusWith<ConnectionPool::ConnectionHandle> swConn,
- RemoteCommandRequest remoteCommandRequest) noexcept {
-
- // We have a connection and the command hasn't already been attempted
- request.emplace(remoteCommandRequest);
- host = request.get().target;
- conn = std::move(swConn.getValue());
- weakConn = conn;
-
networkInterfaceHangCommandsAfterAcquireConn.pauseWhileSet();
+ // We have a connection and the command hasn't already been attempted
LOGV2_DEBUG(4630601,
2,
"Request acquired a connection",
- "requestId"_attr = request->id,
- "target"_attr = request->target);
+ "requestId"_attr = requestState->request->id,
+ "target"_attr = requestState->request->target);
- if (auto counters = interface()->_counters) {
+ if (auto counters = cmdState->interface->_counters) {
counters->recordSent();
}
- resolve(cmdState->sendRequest(reqId));
+ requestState->resolve(cmdState->sendRequest(requestState));
}
void NetworkInterfaceTL::RequestState::resolve(Future<RemoteCommandResponse> future) noexcept {
@@ -901,6 +866,8 @@ auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface
Status{ErrorCodes::ExhaustCommandFinished, "Exhaust command finished"});
});
+ state->requestManager = std::make_unique<RequestManager>(state.get());
+
{
stdx::lock_guard lk(interface->_inProgressMutex);
if (interface->inShutdown()) {
@@ -913,9 +880,8 @@ auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface
return state;
}
-Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendRequest(size_t reqId) {
- auto requestState = requestManager->getRequest(reqId);
- invariant(requestState);
+Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendRequest(
+ std::shared_ptr<RequestState> requestState) {
setTimer();
requestState->getClient(requestState->conn)
@@ -1017,23 +983,20 @@ Status NetworkInterfaceTL::startExhaustCommand(const TaskExecutor::CallbackHandl
cmdState->deadline = cmdState->stopwatch.start() + cmdState->requestOnAny.timeout;
}
cmdState->baton = baton;
- cmdState->requestManager = std::make_unique<RequestManager>(1, cmdState);
-
- auto requestState = cmdState->requestManager->makeRequest();
+ cmdState->requestManager = std::make_unique<RequestManager>(cmdState.get());
// Attempt to get a connection to every target host
- for (size_t idx = 0; idx < request.target.size() && !cmdState->requestManager->usedAllConn();
- ++idx) {
+ for (size_t idx = 0; idx < request.target.size(); ++idx) {
auto connFuture = _pool->get(request.target[idx], request.sslMode, request.timeout);
if (connFuture.isReady()) {
- requestState->trySend(std::move(connFuture).getNoThrow(), idx);
+ cmdState->requestManager->trySend(std::move(connFuture).getNoThrow(), idx);
continue;
}
// For every connection future we didn't have immediately ready, schedule
- std::move(connFuture).thenRunOn(_reactor).getAsync([requestState, idx](auto swConn) {
- requestState->trySend(std::move(swConn), idx);
+ std::move(connFuture).thenRunOn(_reactor).getAsync([cmdState, idx](auto swConn) {
+ cmdState->requestManager->trySend(std::move(swConn), idx);
});
}
@@ -1092,34 +1055,26 @@ Status NetworkInterfaceTL::_killOperation(std::shared_ptr<RequestState> requestS
auto cbHandle = executor::TaskExecutor::CallbackHandle();
auto [killOpCmdState, future] = CommandState::make(this, killOpRequest, cbHandle);
killOpCmdState->deadline = killOpCmdState->stopwatch.start() + killOpRequest.timeout;
- killOpCmdState->requestManager = std::make_unique<RequestManager>(1, killOpCmdState);
-
- auto killOpRequestState = killOpCmdState->requestManager->makeRequest();
std::move(future).getAsync(
- [this, operationKey, target = target](StatusWith<RemoteCommandOnAnyResponse> swr) {
+ [this, operationKey, killOpRequest](StatusWith<RemoteCommandOnAnyResponse> swr) {
invariant(swr.isOK());
auto rs = std::move(swr.getValue());
LOGV2_DEBUG(51813,
2,
"Remote _killOperations request to cancel command finished with response",
"operationKey"_attr = operationKey,
- "target"_attr = target,
+ "target"_attr = killOpRequest.target,
"response"_attr =
redact(rs.isOK() ? rs.data.toString() : rs.status.toString()));
});
// Send the _killOperations request.
auto connFuture = _pool->get(target, sslMode, killOpRequest.kNoTimeout);
- if (connFuture.isReady()) {
- killOpRequestState->trySend(std::move(connFuture).getNoThrow(), 0);
- return Status::OK();
- }
-
std::move(connFuture)
.thenRunOn(_reactor)
- .getAsync([this, killOpRequestState, killOpRequest](auto swConn) {
- killOpRequestState->trySend(std::move(swConn), 0);
+ .getAsync([this, killOpCmdState = killOpCmdState](auto swConn) {
+ killOpCmdState->requestManager->trySend(std::move(swConn), 0);
});
return Status::OK();
} catch (const DBException& ex) {
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 648c1294f21..7dfb40fbba4 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -114,14 +114,8 @@ private:
/**
* Use the current RequestState to send out a command request.
*/
- virtual Future<RemoteCommandResponse> sendRequest(size_t reqId) = 0;
-
- /**
- * Return the maximum number of request failures this Command can tolerate
- */
- virtual size_t maxRequestFailures() {
- return 1;
- }
+ virtual Future<RemoteCommandResponse> sendRequest(
+ std::shared_ptr<RequestState> requestState) = 0;
/**
* Set a timer to fulfill the promise with a timeout error.
@@ -148,6 +142,24 @@ private:
*/
void doMetadataHook(const RemoteCommandOnAnyResponse& response);
+ /**
+ * Return the maximum amount of requests that can come from this command.
+ */
+ size_t maxConcurrentRequests() const noexcept {
+ if (!requestOnAny.hedgeOptions) {
+ return 1ull;
+ }
+
+ return requestOnAny.hedgeOptions->count + 1ull;
+ }
+
+ /**
+ * Return the most connections we expect to be able to acquire.
+ */
+ size_t maxPossibleConns() const noexcept {
+ return requestOnAny.target.size();
+ }
+
NetworkInterfaceTL* interface;
RemoteCommandRequestOnAny requestOnAny;
@@ -161,6 +173,8 @@ private:
std::unique_ptr<RequestManager> requestManager;
+ // TODO replace the finishLine with an atomic bool. It is no longer tracking allowed
+ // failures accurately.
StrongWeakFinishLine finishLine;
boost::optional<UUID> operationKey;
@@ -178,7 +192,8 @@ private:
RemoteCommandRequestOnAny request,
const TaskExecutor::CallbackHandle& cbHandle);
- Future<RemoteCommandResponse> sendRequest(size_t reqId) override;
+ Future<RemoteCommandResponse> sendRequest(
+ std::shared_ptr<RequestState> requestState) override;
void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override;
@@ -201,7 +216,8 @@ private:
const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandOnReplyFn&& onReply);
- Future<RemoteCommandResponse> sendRequest(size_t reqId) override;
+ Future<RemoteCommandResponse> sendRequest(
+ std::shared_ptr<RequestState> requestState) override;
void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override;
@@ -213,42 +229,26 @@ private:
RemoteCommandOnReplyFn onReplyFn;
};
- enum class ConnStatus { Unset, OK, Failed };
-
struct RequestManager {
- RequestManager(size_t numHedges, std::shared_ptr<CommandStateBase> cmdState_)
- : connStatus(cmdState_->requestOnAny.target.size(), ConnStatus::Unset),
- requests(numHedges),
- cmdState(cmdState_){};
-
- std::shared_ptr<RequestState> makeRequest();
- std::shared_ptr<RequestState> getRequest(size_t reqId);
- std::shared_ptr<RequestState> getNextRequest();
+ RequestManager(CommandStateBase* cmdState);
void trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn, size_t idx) noexcept;
void cancelRequests();
void killOperationsForPendingRequests();
- bool sentNone() const;
- bool sentAll() const;
+ CommandStateBase* cmdState;
+ std::vector<std::weak_ptr<RequestState>> requests;
- ConnStatus getConnStatus(size_t reqId);
- bool usedAllConn() const;
+ Mutex mutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::RequestManager::mutex");
- std::vector<ConnStatus> connStatus;
- std::vector<std::weak_ptr<RequestState>> requests;
- std::weak_ptr<CommandStateBase> cmdState;
+ // Number of connections we've resolved.
+ size_t connsResolved{0};
// Number of sent requests.
- AtomicWord<size_t> sentIdx{0};
-
- // Number of requests to send.
- AtomicWord<size_t> requestCount{0};
+ size_t sentIdx{0};
// Set to true when the command finishes or is canceled to block remaining requests.
bool isLocked{false};
-
- Mutex mutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::RequestManager::mutex");
};
struct RequestState final : public std::enable_shared_from_this<RequestState> {
@@ -277,14 +277,6 @@ private:
void returnConnection(Status status) noexcept;
/**
- * Attempt to send a request using the given connection
- */
- void trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn, size_t idx) noexcept;
-
- void send(StatusWith<ConnectionPool::ConnectionHandle> swConn,
- RemoteCommandRequest remoteCommandRequest) noexcept;
-
- /**
* Resolve an eventual response
*/
void resolve(Future<RemoteCommandResponse> future) noexcept;
diff --git a/src/mongo/executor/remote_command_request.h b/src/mongo/executor/remote_command_request.h
index 56c7c89dfb9..811692308e4 100644
--- a/src/mongo/executor/remote_command_request.h
+++ b/src/mongo/executor/remote_command_request.h
@@ -44,8 +44,8 @@ namespace executor {
struct RemoteCommandRequestBase {
struct HedgeOptions {
- size_t count;
- int maxTimeMSForHedgedReads;
+ size_t count = 0;
+ int maxTimeMSForHedgedReads = 0;
};
enum FireAndForgetMode { kOn, kOff };