summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/async_client.cpp6
-rw-r--r--src/mongo/client/async_client.h3
-rw-r--r--src/mongo/executor/network_interface.h6
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.cpp33
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.h5
-rw-r--r--src/mongo/executor/network_interface_integration_test.cpp134
-rw-r--r--src/mongo/executor/network_interface_mock.cpp7
-rw-r--r--src/mongo/executor/network_interface_mock.h4
-rw-r--r--src/mongo/executor/network_interface_tl.cpp172
-rw-r--r--src/mongo/executor/network_interface_tl.h78
-rw-r--r--src/mongo/transport/transport_layer_asio_integration_test.cpp26
-rw-r--r--src/mongo/util/clock_source.h8
12 files changed, 434 insertions, 48 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index cd7a3f78bd9..1aceecbfd94 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -303,10 +303,12 @@ Future<void> AsyncDBClient::_continueReceiveExhaustResponse(
// Run callback
auto now = exhaustParameters.clkSource->now();
auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start);
+ bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome);
rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg));
- exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration));
+ exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration),
+ isMoreToComeSet);
- if (!OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome)) {
+ if (!isMoreToComeSet) {
return Status::OK();
}
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index 28e0bc2e9da..c67c690554c 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -47,7 +47,8 @@ namespace mongo {
class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> {
public:
- using RemoteCommandCallbackFn = unique_function<void(const executor::RemoteCommandResponse&)>;
+ using RemoteCommandCallbackFn =
+ unique_function<void(const executor::RemoteCommandResponse&, bool isMoreToComeSet)>;
struct ExhaustRequestParameters {
ExhaustRequestParameters(ExhaustRequestParameters&&) = default;
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 386893b4881..a228b17899b 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -56,6 +56,8 @@ public:
using Response = RemoteCommandResponse;
using RemoteCommandCompletionFn =
unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>;
+ using RemoteCommandOnReplyFn =
+ unique_function<void(const TaskExecutor::ResponseOnAnyStatus&, bool isMoreToComeSet)>;
virtual ~NetworkInterface();
@@ -150,6 +152,10 @@ public:
RemoteCommandRequestOnAny& request,
RemoteCommandCompletionFn&& onFinish,
const BatonHandle& baton = nullptr) = 0;
+ virtual Status startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequestOnAny& request,
+ RemoteCommandOnReplyFn&& onReply,
+ const BatonHandle& baton = nullptr) = 0;
Future<TaskExecutor::ResponseOnAnyStatus> startCommand(
const TaskExecutor::CallbackHandle& cbHandle,
diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp
index 847fcf4b5b8..6440f3eccf3 100644
--- a/src/mongo/executor/network_interface_integration_fixture.cpp
+++ b/src/mongo/executor/network_interface_integration_fixture.cpp
@@ -113,6 +113,39 @@ Future<RemoteCommandResponse> NetworkInterfaceIntegrationFixture::runCommand(
});
}
+Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand(
+ const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest request,
+ std::function<void(const RemoteCommandResponse&)> exhaustUtilCB,
+ const BatonHandle& baton) {
+ RemoteCommandRequestOnAny rcroa{request};
+ auto pf = makePromiseFuture<void>();
+
+ auto status = net().startExhaustCommand(
+ cbHandle,
+ rcroa,
+ [p = std::move(pf.promise), exhaustUtilCB = std::move(exhaustUtilCB)](
+ const TaskExecutor::ResponseOnAnyStatus& rs, bool isMoreToComeSet) mutable {
+ exhaustUtilCB(rs);
+
+ if (!rs.status.isOK()) {
+ invariant(!isMoreToComeSet);
+ p.setError(rs.status);
+ return;
+ }
+
+ if (!isMoreToComeSet) {
+ p.emplaceValue();
+ }
+ },
+ baton);
+
+ if (!status.isOK()) {
+ return status;
+ }
+ return std::move(pf.future);
+}
+
RemoteCommandResponse NetworkInterfaceIntegrationFixture::runCommandSync(
RemoteCommandRequest& request) {
auto deferred = runCommand(makeCallbackHandle(), request);
diff --git a/src/mongo/executor/network_interface_integration_fixture.h b/src/mongo/executor/network_interface_integration_fixture.h
index eb164359d89..6bfb64ef563 100644
--- a/src/mongo/executor/network_interface_integration_fixture.h
+++ b/src/mongo/executor/network_interface_integration_fixture.h
@@ -81,6 +81,11 @@ public:
Future<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest request);
+ Future<void> startExhaustCommand(
+ const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest request,
+ std::function<void(const RemoteCommandResponse&)> exhaustUtilCB,
+ const BatonHandle& baton = nullptr);
RemoteCommandResponse runCommandSync(RemoteCommandRequest& request);
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index cebca17052f..6838a4dc51c 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -46,6 +46,7 @@
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/message.h"
+#include "mongo/rpc/topology_version_gen.h"
#include "mongo/stdx/future.h"
#include "mongo/unittest/integration_test.h"
#include "mongo/unittest/unittest.h"
@@ -589,6 +590,139 @@ TEST_F(NetworkInterfaceTest, IsMasterRequestMissingInternalClientInfoWhenNotInte
assertNumOps(0u, 0u, 0u, 1u);
}
+class ExhaustRequestHandlerUtil {
+public:
+ struct responseOutcomeCount {
+ int _success = 0;
+ int _failed = 0;
+ };
+
+ std::function<void(const RemoteCommandResponse&)>&& getExhaustRequestCallbackFn() {
+ return std::move(_callbackFn);
+ }
+
+ ExhaustRequestHandlerUtil::responseOutcomeCount getCountersWhenReady() {
+ stdx::unique_lock<Latch> lk(_mutex);
+ _cv.wait(_mutex, [&] { return _replyUpdated; });
+ _replyUpdated = false;
+ return _responseOutcomeCount;
+ }
+
+private:
+ // set to true once '_responseOutcomeCount' has been updated. Used to indicate that a new
+ // response has been sent.
+ bool _replyUpdated = false;
+
+ // counter of how many successful and failed responses were received.
+ responseOutcomeCount _responseOutcomeCount;
+
+ Mutex _mutex = MONGO_MAKE_LATCH("ExhaustRequestHandlerUtil::_mutex");
+ stdx::condition_variable _cv;
+
+ // called when a server sends a new isMaster exhaust response. Updates _responseOutcomeCount
+ // and _replyUpdated.
+ std::function<void(const RemoteCommandResponse&)> _callbackFn =
+ [&](const executor::RemoteCommandResponse& response) {
+ {
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (response.status.isOK()) {
+ _responseOutcomeCount._success++;
+ } else {
+ _responseOutcomeCount._failed++;
+ }
+ _replyUpdated = true;
+ }
+
+ _cv.notify_all();
+ };
+};
+
+TEST_F(NetworkInterfaceTest, StartExhaustCommandShouldReceiveMultipleResponses) {
+ auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << 1000 << "topologyVersion"
+ << TopologyVersion(OID::max(), 0).toBSON());
+
+ auto request = makeTestCommand(boost::none, isMasterCmd);
+ auto cbh = makeCallbackHandle();
+ ExhaustRequestHandlerUtil exhaustRequestHandler;
+
+ auto exhaustFuture = startExhaustCommand(
+ cbh, std::move(request), exhaustRequestHandler.getExhaustRequestCallbackFn());
+
+ {
+ // The server sends a response either when a topology change occurs or when it has not sent
+ // a response in 'maxAwaitTimeMS'. In this case we expect a response every 'maxAwaitTimeMS'
+ // = 1000 (set in the isMaster cmd above)
+ auto counters = exhaustRequestHandler.getCountersWhenReady();
+ ASSERT(!exhaustFuture.isReady());
+
+ // The first response should be successful
+ ASSERT_EQ(counters._success, 1);
+ ASSERT_EQ(counters._failed, 0);
+ }
+
+ {
+ auto counters = exhaustRequestHandler.getCountersWhenReady();
+ ASSERT(!exhaustFuture.isReady());
+
+ // The second response should also be successful
+ ASSERT_EQ(counters._success, 2);
+ ASSERT_EQ(counters._failed, 0);
+ }
+
+ net().cancelCommand(cbh);
+ auto error = exhaustFuture.getNoThrow();
+ ASSERT((error == ErrorCodes::CallbackCanceled) || (error == ErrorCodes::HostUnreachable));
+
+ auto counters = exhaustRequestHandler.getCountersWhenReady();
+
+ // The command was cancelled so the 'fail' counter should be incremented
+ ASSERT_EQ(counters._success, 2);
+ ASSERT_EQ(counters._failed, 1);
+}
+
+TEST_F(NetworkInterfaceTest, StartExhaustCommandShouldStopOnFailure) {
+ // Both assetCommandOK and makeTestCommand target the first host in the connection string, so we
+ // are guaranteed that the failpoint is set on the same host that we run the exhaust command on.
+ auto configureFailpointCmd = BSON("configureFailPoint"
+ << "failCommand"
+ << "mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("errorCode" << ErrorCodes::CommandFailed
+ << "failCommands"
+ << BSON_ARRAY("isMaster")));
+ assertCommandOK("admin", configureFailpointCmd);
+
+ ON_BLOCK_EXIT([&] {
+ auto stopFpRequest = BSON("configureFailPoint"
+ << "failCommand"
+ << "mode"
+ << "off");
+ assertCommandOK("admin", stopFpRequest);
+ });
+
+ auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << 1000 << "topologyVersion"
+ << TopologyVersion(OID::max(), 0).toBSON());
+
+ auto request = makeTestCommand(boost::none, isMasterCmd);
+ auto cbh = makeCallbackHandle();
+ ExhaustRequestHandlerUtil exhaustRequestHandler;
+
+ auto exhaustFuture = startExhaustCommand(
+ cbh, std::move(request), exhaustRequestHandler.getExhaustRequestCallbackFn());
+
+ {
+ auto counters = exhaustRequestHandler.getCountersWhenReady();
+
+ auto error = exhaustFuture.getNoThrow();
+ ASSERT_EQ(error, ErrorCodes::CommandFailed);
+
+ // The response should be marked as failed
+ ASSERT_EQ(counters._success, 0);
+ ASSERT_EQ(counters._failed, 1);
+ }
+}
+
} // namespace
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 080fe35738e..0d329a64c1c 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -133,6 +133,13 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
return Status::OK();
}
+Status NetworkInterfaceMock::startExhaustCommand(const CallbackHandle& cbHandle,
+ RemoteCommandRequestOnAny& request,
+ RemoteCommandOnReplyFn&& onReply,
+ const BatonHandle& baton) {
+ MONGO_UNREACHABLE;
+}
+
void NetworkInterfaceMock::setHandshakeReplyForHost(
const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 2fb1d313500..c67ee4c58a1 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -113,6 +113,10 @@ public:
RemoteCommandRequestOnAny& request,
RemoteCommandCompletionFn&& onFinish,
const BatonHandle& baton = nullptr) override;
+ Status startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequestOnAny& request,
+ RemoteCommandOnReplyFn&& onReply,
+ const BatonHandle& baton = nullptr) override;
/**
* If the network operation is in the _unscheduled or _processing queues, moves the operation
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index fcc650295b5..0bb2d3bb539 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -222,15 +222,20 @@ Date_t NetworkInterfaceTL::now() {
return _reactor->now();
}
-NetworkInterfaceTL::CommandState::CommandState(NetworkInterfaceTL* interface_,
- RemoteCommandRequestOnAny request_,
- const TaskExecutor::CallbackHandle& cbHandle_)
+NetworkInterfaceTL::CommandStateBase::CommandStateBase(
+ NetworkInterfaceTL* interface_,
+ RemoteCommandRequestOnAny request_,
+ const TaskExecutor::CallbackHandle& cbHandle_)
: interface(interface_),
requestOnAny(std::move(request_)),
cbHandle(cbHandle_),
finishLine(maxRequestFailures()),
operationKey(request_.operationKey) {}
+NetworkInterfaceTL::CommandState::CommandState(NetworkInterfaceTL* interface_,
+ RemoteCommandRequestOnAny request_,
+ const TaskExecutor::CallbackHandle& cbHandle_)
+ : CommandStateBase(interface_, std::move(request_), cbHandle_) {}
auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface,
RemoteCommandRequestOnAny request,
@@ -270,7 +275,7 @@ AsyncDBClient* NetworkInterfaceTL::RequestState::client() noexcept {
return checked_cast<connection_pool_tl::TLConnection*>(conn.get())->client();
}
-void NetworkInterfaceTL::CommandState::setTimer() {
+void NetworkInterfaceTL::CommandStateBase::setTimer() {
if (deadline == RemoteCommandRequest::kNoExpirationDate) {
return;
}
@@ -301,7 +306,7 @@ void NetworkInterfaceTL::CommandState::setTimer() {
<< ", op was " << redact(requestOnAny.toString());
LOGV2_DEBUG(22595, 2, "{message}", "message"_attr = message);
- promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, message));
+ fulfillFinalPromise(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, message));
});
}
@@ -321,7 +326,7 @@ void NetworkInterfaceTL::RequestState::returnConnection(Status status) noexcept
connToReturn->indicateSuccess();
}
-void NetworkInterfaceTL::CommandState::tryFinish(Status status) noexcept {
+void NetworkInterfaceTL::CommandStateBase::tryFinish(Status status) noexcept {
invariant(finishLine.isReady());
if (timer) {
@@ -487,7 +492,8 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::CommandState::sendRequest() {
});
}
-void NetworkInterfaceTL::CommandState::doMetadataHook(const RemoteCommandOnAnyResponse& response) {
+void NetworkInterfaceTL::CommandStateBase::doMetadataHook(
+ const RemoteCommandOnAnyResponse& response) {
if (auto& hook = interface->_metadataHook; hook && !finishLine.isReady()) {
invariant(response.target);
@@ -496,6 +502,11 @@ void NetworkInterfaceTL::CommandState::doMetadataHook(const RemoteCommandOnAnyRe
}
}
+void NetworkInterfaceTL::CommandState::fulfillFinalPromise(
+ StatusWith<RemoteCommandOnAnyResponse> response) {
+ promise.setFromStatusWith(std::move(response));
+}
+
void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn,
size_t idx) noexcept {
trySend(std::move(swConn), {cmdState->requestOnAny, idx});
@@ -514,10 +525,10 @@ void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::Connec
if (cmdState->finishLine.arriveStrongly()) {
auto& reactor = interface()->_reactor;
if (reactor->onReactorThread()) {
- cmdState->promise.setError(swConn.getStatus());
+ cmdState->fulfillFinalPromise(swConn.getStatus());
} else {
ExecutorFuture<void>(reactor, swConn.getStatus()).getAsync([this](Status status) {
- cmdState->promise.setError(std::move(status));
+ cmdState->fulfillFinalPromise(std::move(status));
});
}
}
@@ -577,7 +588,7 @@ void NetworkInterfaceTL::RequestState::resolve(Future<RemoteCommandResponse> fut
auto response = uassertStatusOK(swr);
auto status = swr.getValue().status;
if (cmdState->finishLine.arriveStrongly()) {
- cmdState->promise.emplaceValue(std::move(response));
+ cmdState->fulfillFinalPromise(std::move(response));
}
return status;
@@ -597,11 +608,144 @@ void NetworkInterfaceTL::RequestState::resolve(Future<RemoteCommandResponse> fut
return;
}
- cmdState->promise.emplaceValue(std::move(response));
+ cmdState->fulfillFinalPromise(std::move(response));
});
}
}
+NetworkInterfaceTL::ExhaustCommandState::ExhaustCommandState(
+ NetworkInterfaceTL* interface_,
+ RemoteCommandRequestOnAny request_,
+ const TaskExecutor::CallbackHandle& cbHandle_,
+ RemoteCommandOnReplyFn&& onReply_)
+ : CommandStateBase(interface_, std::move(request_), cbHandle_),
+ onReplyFn(std::move(onReply_)) {}
+
+auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface,
+ RemoteCommandRequestOnAny request,
+ const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandOnReplyFn&& onReply) {
+ auto state = std::make_shared<ExhaustCommandState>(
+ interface, std::move(request), cbHandle, std::move(onReply));
+ auto [promise, future] = makePromiseFuture<void>();
+ state->promise = std::move(promise);
+ std::move(future)
+ .onError([state](Status error) {
+ stdx::lock_guard lk(state->_onReplyMutex);
+ state->onReplyFn(RemoteCommandOnAnyResponse(
+ boost::none, std::move(error), state->stopwatch.elapsed()),
+ false);
+ })
+ .getAsync([state](Status status) { state->tryFinish(status); });
+
+ {
+ stdx::lock_guard lk(interface->_inProgressMutex);
+ interface->_inProgress.insert({cbHandle, state});
+ }
+
+ return state;
+}
+
+Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendRequest() {
+ auto requestState = requestStatePtr.lock();
+ invariant(requestState);
+
+ auto clientCallback = [this, requestState](const RemoteCommandResponse& response,
+ bool isMoreToComeSet) {
+ // Stash this response on the command state to be used to fulfill the promise.
+ prevResponse = response;
+ auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response);
+ doMetadataHook(onAnyResponse);
+
+ // If the command failed, we will call 'onReply' as a part of the future chain paired with
+ // the promise. This is to be sure that all error paths will run 'onReply' only once upon
+ // future completion.
+ if (!getStatusFromCommandResult(response.data).isOK()) {
+ // The moreToCome bit should *not* be set if the command failed
+ invariant(!isMoreToComeSet);
+ return;
+ }
+
+ // Reset the stopwatch to measure the correct duration for the folowing reply
+ stopwatch.restart();
+ setTimer();
+
+ stdx::lock_guard lk(_onReplyMutex);
+ onReplyFn(onAnyResponse, isMoreToComeSet);
+ };
+
+ return makeReadyFutureWith(
+ [this, requestState, clientCallback = std::move(clientCallback)]() mutable {
+ setTimer();
+ return requestState->client()->runExhaustCommandRequest(
+ *requestState->request, std::move(clientCallback), baton);
+ })
+ .then([this, requestState] { return prevResponse; });
+}
+
+void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise(
+ StatusWith<RemoteCommandOnAnyResponse> response) {
+ auto status = !response.getStatus().isOK()
+ ? response.getStatus()
+ : getStatusFromCommandResult(response.getValue().data);
+
+ if (!status.isOK()) {
+ promise.setError(status);
+ return;
+ }
+
+ promise.emplaceValue();
+}
+
+Status NetworkInterfaceTL::startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequestOnAny& request,
+ RemoteCommandOnReplyFn&& onReply,
+ const BatonHandle& baton) {
+ if (inShutdown()) {
+ return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
+ }
+
+ LOG(kDiagnosticLogLevel) << "startCommand: " << redact(request.toString());
+
+ if (_metadataHook) {
+ BSONObjBuilder newMetadata(std::move(request.metadata));
+
+ auto status = _metadataHook->writeRequestMetadata(request.opCtx, &newMetadata);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ request.metadata = newMetadata.obj();
+ }
+
+ auto cmdState = ExhaustCommandState::make(this, request, cbHandle, std::move(onReply));
+ if (cmdState->requestOnAny.timeout != cmdState->requestOnAny.kNoTimeout) {
+ cmdState->deadline = cmdState->stopwatch.start() + cmdState->requestOnAny.timeout;
+ }
+ cmdState->baton = baton;
+
+ auto requestState = std::make_shared<RequestState>(cmdState);
+ cmdState->requestStatePtr = requestState;
+
+ // Attempt to get a connection to every target host
+ for (size_t idx = 0; idx < request.target.size() && !requestState->connFinishLine.isReady();
+ ++idx) {
+ auto connFuture = _pool->get(request.target[idx], request.sslMode, request.timeout);
+
+ if (connFuture.isReady()) {
+ requestState->trySend(std::move(connFuture).getNoThrow(), idx);
+ continue;
+ }
+
+ // For every connection future we didn't have immediately ready, schedule
+ std::move(connFuture).thenRunOn(_reactor).getAsync([requestState, idx](auto swConn) {
+ requestState->trySend(std::move(swConn), idx);
+ });
+ }
+
+ return Status::OK();
+}
+
void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
const BatonHandle&) {
stdx::unique_lock<Latch> lk(_inProgressMutex);
@@ -640,7 +784,7 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
"Canceling operation; original request was: {cmdStateToCancel_requestOnAny}",
"cmdStateToCancel_requestOnAny"_attr =
redact(cmdStateToCancel->requestOnAny.toString()));
- cmdStateToCancel->promise.setError(
+ cmdStateToCancel->fulfillFinalPromise(
{ErrorCodes::CallbackCanceled,
str::stream() << "Command canceled; original request was: "
<< redact(cmdStateToCancel->requestOnAny.toString())});
@@ -688,13 +832,13 @@ void NetworkInterfaceTL::_killOperation(std::shared_ptr<RequestState> requestSta
if (rs.isOK()) {
// _killOperations succeeded but the operation interrupted error is expected to be
// ignored in resolve() since cancelCommand() crossed the command finishLine first.
- cmdStateToKill->promise.setError(
+ cmdStateToKill->fulfillFinalPromise(
{ErrorCodes::CallbackCanceled,
str::stream() << "cancelCommand successfully issued remote interruption"});
} else {
// _killOperations timed out or failed due to other errors.
rs.status.addContext("operation's client canceled by cancelCommand");
- cmdStateToKill->promise.setError(std::move(rs.status));
+ cmdStateToKill->fulfillFinalPromise(std::move(rs.status));
}
});
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 8957cd875c6..d89489b1b65 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -74,6 +74,10 @@ public:
RemoteCommandRequestOnAny& request,
RemoteCommandCompletionFn&& onFinish,
const BatonHandle& baton) override;
+ Status startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequestOnAny& request,
+ RemoteCommandOnReplyFn&& onReply,
+ const BatonHandle& baton) override;
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
const BatonHandle& baton) override;
@@ -92,22 +96,16 @@ public:
private:
struct RequestState;
- struct CommandState final : public std::enable_shared_from_this<CommandState> {
- CommandState(NetworkInterfaceTL* interface_,
- RemoteCommandRequestOnAny request_,
- const TaskExecutor::CallbackHandle& cbHandle_);
- virtual ~CommandState() = default;
-
- // Create a new CommandState in a shared_ptr
- // Prefer this over raw construction
- static auto make(NetworkInterfaceTL* interface,
- RemoteCommandRequestOnAny request,
- const TaskExecutor::CallbackHandle& cbHandle);
+ struct CommandStateBase : public std::enable_shared_from_this<CommandStateBase> {
+ CommandStateBase(NetworkInterfaceTL* interface_,
+ RemoteCommandRequestOnAny request_,
+ const TaskExecutor::CallbackHandle& cbHandle_);
+ virtual ~CommandStateBase() = default;
/**
* Use the current RequestState to send out a command request.
*/
- virtual Future<RemoteCommandResponse> sendRequest();
+ virtual Future<RemoteCommandResponse> sendRequest() = 0;
/**
* Return the maximum number of request failures this Command can tolerate
@@ -122,6 +120,11 @@ private:
virtual void setTimer();
/**
+ * Fulfill the promise with the response.
+ */
+ virtual void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) = 0;
+
+ /**
* Fulfill the promise for the Command.
*
* This will throw/invariant if called multiple times. In an ideal world, this would do the
@@ -150,13 +153,56 @@ private:
std::weak_ptr<RequestState> requestStatePtr;
StrongWeakFinishLine finishLine;
- Promise<RemoteCommandOnAnyResponse> promise;
boost::optional<UUID> operationKey;
};
+ struct CommandState final : public CommandStateBase {
+ CommandState(NetworkInterfaceTL* interface_,
+ RemoteCommandRequestOnAny request_,
+ const TaskExecutor::CallbackHandle& cbHandle_);
+ ~CommandState() = default;
+
+ // Create a new CommandState in a shared_ptr
+ // Prefer this over raw construction
+ static auto make(NetworkInterfaceTL* interface,
+ RemoteCommandRequestOnAny request,
+ const TaskExecutor::CallbackHandle& cbHandle);
+
+ Future<RemoteCommandResponse> sendRequest() override;
+
+ void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override;
+
+ Promise<RemoteCommandOnAnyResponse> promise;
+ };
+
+ struct ExhaustCommandState final : public CommandStateBase {
+ ExhaustCommandState(NetworkInterfaceTL* interface_,
+ RemoteCommandRequestOnAny request_,
+ const TaskExecutor::CallbackHandle& cbHandle_,
+ RemoteCommandOnReplyFn&& onReply_);
+ virtual ~ExhaustCommandState() = default;
+
+ // Create a new ExhaustCommandState in a shared_ptr
+ // Prefer this over raw construction
+ static auto make(NetworkInterfaceTL* interface,
+ RemoteCommandRequestOnAny request,
+ const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandOnReplyFn&& onReply);
+
+ Future<RemoteCommandResponse> sendRequest() override;
+
+ void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override;
+
+ Promise<void> promise;
+ RemoteCommandResponse prevResponse;
+ Mutex _onReplyMutex =
+ MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_onReplyMutex");
+ RemoteCommandOnReplyFn onReplyFn;
+ };
+
struct RequestState final : public std::enable_shared_from_this<RequestState> {
- RequestState(std::shared_ptr<CommandState> cmdState_)
+ RequestState(std::shared_ptr<CommandStateBase> cmdState_)
: cmdState{std::move(cmdState_)},
connFinishLine(cmdState->requestOnAny.target.size()) {}
~RequestState();
@@ -198,7 +244,7 @@ private:
return cmdState->interface;
}
- std::shared_ptr<CommandState> cmdState;
+ std::shared_ptr<CommandStateBase> cmdState;
ClockSource::StopWatch stopwatch;
@@ -263,7 +309,7 @@ private:
Mutex _inProgressMutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_inProgressMutex");
- stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandState>> _inProgress;
+ stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandStateBase>> _inProgress;
stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<AlarmState>>
_inProgressAlarms;
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp
index 80aa571c2bd..ca8211af7b0 100644
--- a/src/mongo/transport/transport_layer_asio_integration_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp
@@ -178,7 +178,7 @@ private:
// called when a server sends a new isMaster exhaust response. Updates _reply and _replyUpdated.
AsyncDBClient::RemoteCommandCallbackFn _callbackFn =
- [&](const executor::RemoteCommandResponse& response) {
+ [&](const executor::RemoteCommandResponse& response, bool isMoreToComeSet) {
{
stdx::unique_lock<Latch> lk(_mutex);
_reply = response;
@@ -302,6 +302,18 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) {
nullptr};
assertOK(failpointHandle->runCommandRequest(configureFailPointRequest).get());
+ ON_BLOCK_EXIT([&] {
+ auto stopFpRequest = executor::RemoteCommandRequest{server,
+ "admin",
+ BSON("configureFailPoint"
+ << "failCommand"
+ << "mode"
+ << "off"),
+ BSONObj(),
+ nullptr};
+ assertOK(failpointHandle->runCommandRequest(stopFpRequest).get());
+ });
+
// Send a dummy topologyVersion because the mongod generates this and sends it to the client on
// the initial handshake.
auto isMasterRequest = executor::RemoteCommandRequest{
@@ -323,18 +335,6 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) {
ASSERT_OK(reply.status);
ASSERT_EQ(reply.data["ok"].Double(), 0.0);
}
-
- ON_BLOCK_EXIT([&] {
- auto stopFpRequest = executor::RemoteCommandRequest{server,
- "admin",
- BSON("configureFailPoint"
- << "failCommand"
- << "mode"
- << "off"),
- BSONObj(),
- nullptr};
- assertOK(failpointHandle->runCommandRequest(stopFpRequest).get());
- });
}
} // namespace
diff --git a/src/mongo/util/clock_source.h b/src/mongo/util/clock_source.h
index f57c2a35200..f07d83780a9 100644
--- a/src/mongo/util/clock_source.h
+++ b/src/mongo/util/clock_source.h
@@ -60,7 +60,7 @@ class ClockSource {
public:
/**
* A StopWatch tracks the time that its ClockSource believes has passed since the creation of
- * the StopWatch.
+ * the StopWatch or since 'restart' has been invoked.
*
* For microsecond accurate metrics, use a Timer instead.
*/
@@ -87,9 +87,13 @@ public:
return now() - _start;
}
+ auto restart() {
+ _start = now();
+ }
+
private:
ClockSource* const _clockSource;
- const Date_t _start;
+ Date_t _start;
};
virtual ~ClockSource() = default;