diff options
-rw-r--r-- | src/mongo/client/async_client.cpp | 6 | ||||
-rw-r--r-- | src/mongo/client/async_client.h | 3 | ||||
-rw-r--r-- | src/mongo/executor/network_interface.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_integration_fixture.cpp | 33 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_integration_fixture.h | 5 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_integration_test.cpp | 134 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 7 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 4 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 172 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 78 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio_integration_test.cpp | 26 | ||||
-rw-r--r-- | src/mongo/util/clock_source.h | 8 |
12 files changed, 434 insertions, 48 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index cd7a3f78bd9..1aceecbfd94 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -303,10 +303,12 @@ Future<void> AsyncDBClient::_continueReceiveExhaustResponse( // Run callback auto now = exhaustParameters.clkSource->now(); auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start); + bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome); rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg)); - exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration)); + exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration), + isMoreToComeSet); - if (!OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome)) { + if (!isMoreToComeSet) { return Status::OK(); } diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h index 28e0bc2e9da..c67c690554c 100644 --- a/src/mongo/client/async_client.h +++ b/src/mongo/client/async_client.h @@ -47,7 +47,8 @@ namespace mongo { class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> { public: - using RemoteCommandCallbackFn = unique_function<void(const executor::RemoteCommandResponse&)>; + using RemoteCommandCallbackFn = + unique_function<void(const executor::RemoteCommandResponse&, bool isMoreToComeSet)>; struct ExhaustRequestParameters { ExhaustRequestParameters(ExhaustRequestParameters&&) = default; diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 386893b4881..a228b17899b 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -56,6 +56,8 @@ public: using Response = RemoteCommandResponse; using RemoteCommandCompletionFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>; + using RemoteCommandOnReplyFn = + unique_function<void(const TaskExecutor::ResponseOnAnyStatus&, bool isMoreToComeSet)>; virtual ~NetworkInterface(); @@ -150,6 +152,10 @@ public: RemoteCommandRequestOnAny& request, RemoteCommandCompletionFn&& onFinish, const BatonHandle& baton = nullptr) = 0; + virtual Status startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequestOnAny& request, + RemoteCommandOnReplyFn&& onReply, + const BatonHandle& baton = nullptr) = 0; Future<TaskExecutor::ResponseOnAnyStatus> startCommand( const TaskExecutor::CallbackHandle& cbHandle, diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp index 847fcf4b5b8..6440f3eccf3 100644 --- a/src/mongo/executor/network_interface_integration_fixture.cpp +++ b/src/mongo/executor/network_interface_integration_fixture.cpp @@ -113,6 +113,39 @@ Future<RemoteCommandResponse> NetworkInterfaceIntegrationFixture::runCommand( }); } +Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand( + const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest request, + std::function<void(const RemoteCommandResponse&)> exhaustUtilCB, + const BatonHandle& baton) { + RemoteCommandRequestOnAny rcroa{request}; + auto pf = makePromiseFuture<void>(); + + auto status = net().startExhaustCommand( + cbHandle, + rcroa, + [p = std::move(pf.promise), exhaustUtilCB = std::move(exhaustUtilCB)]( + const TaskExecutor::ResponseOnAnyStatus& rs, bool isMoreToComeSet) mutable { + exhaustUtilCB(rs); + + if (!rs.status.isOK()) { + invariant(!isMoreToComeSet); + p.setError(rs.status); + return; + } + + if (!isMoreToComeSet) { + p.emplaceValue(); + } + }, + baton); + + if (!status.isOK()) { + return status; + } + return std::move(pf.future); +} + RemoteCommandResponse NetworkInterfaceIntegrationFixture::runCommandSync( RemoteCommandRequest& request) { auto deferred = runCommand(makeCallbackHandle(), request); diff --git a/src/mongo/executor/network_interface_integration_fixture.h b/src/mongo/executor/network_interface_integration_fixture.h index eb164359d89..6bfb64ef563 100644 --- a/src/mongo/executor/network_interface_integration_fixture.h +++ b/src/mongo/executor/network_interface_integration_fixture.h @@ -81,6 +81,11 @@ public: Future<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest request); + Future<void> startExhaustCommand( + const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest request, + std::function<void(const RemoteCommandResponse&)> exhaustUtilCB, + const BatonHandle& baton = nullptr); RemoteCommandResponse runCommandSync(RemoteCommandRequest& request); diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp index cebca17052f..6838a4dc51c 100644 --- a/src/mongo/executor/network_interface_integration_test.cpp +++ b/src/mongo/executor/network_interface_integration_test.cpp @@ -46,6 +46,7 @@ #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/message.h" +#include "mongo/rpc/topology_version_gen.h" #include "mongo/stdx/future.h" #include "mongo/unittest/integration_test.h" #include "mongo/unittest/unittest.h" @@ -589,6 +590,139 @@ TEST_F(NetworkInterfaceTest, IsMasterRequestMissingInternalClientInfoWhenNotInte assertNumOps(0u, 0u, 0u, 1u); } +class ExhaustRequestHandlerUtil { +public: + struct responseOutcomeCount { + int _success = 0; + int _failed = 0; + }; + + std::function<void(const RemoteCommandResponse&)>&& getExhaustRequestCallbackFn() { + return std::move(_callbackFn); + } + + ExhaustRequestHandlerUtil::responseOutcomeCount getCountersWhenReady() { + stdx::unique_lock<Latch> lk(_mutex); + _cv.wait(_mutex, [&] { return _replyUpdated; }); + _replyUpdated = false; + return _responseOutcomeCount; + } + +private: + // set to true once '_responseOutcomeCount' has been updated. Used to indicate that a new + // response has been sent. + bool _replyUpdated = false; + + // counter of how many successful and failed responses were received. + responseOutcomeCount _responseOutcomeCount; + + Mutex _mutex = MONGO_MAKE_LATCH("ExhaustRequestHandlerUtil::_mutex"); + stdx::condition_variable _cv; + + // called when a server sends a new isMaster exhaust response. Updates _responseOutcomeCount + // and _replyUpdated. + std::function<void(const RemoteCommandResponse&)> _callbackFn = + [&](const executor::RemoteCommandResponse& response) { + { + stdx::unique_lock<Latch> lk(_mutex); + if (response.status.isOK()) { + _responseOutcomeCount._success++; + } else { + _responseOutcomeCount._failed++; + } + _replyUpdated = true; + } + + _cv.notify_all(); + }; +}; + +TEST_F(NetworkInterfaceTest, StartExhaustCommandShouldReceiveMultipleResponses) { + auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << 1000 << "topologyVersion" + << TopologyVersion(OID::max(), 0).toBSON()); + + auto request = makeTestCommand(boost::none, isMasterCmd); + auto cbh = makeCallbackHandle(); + ExhaustRequestHandlerUtil exhaustRequestHandler; + + auto exhaustFuture = startExhaustCommand( + cbh, std::move(request), exhaustRequestHandler.getExhaustRequestCallbackFn()); + + { + // The server sends a response either when a topology change occurs or when it has not sent + // a response in 'maxAwaitTimeMS'. In this case we expect a response every 'maxAwaitTimeMS' + // = 1000 (set in the isMaster cmd above) + auto counters = exhaustRequestHandler.getCountersWhenReady(); + ASSERT(!exhaustFuture.isReady()); + + // The first response should be successful + ASSERT_EQ(counters._success, 1); + ASSERT_EQ(counters._failed, 0); + } + + { + auto counters = exhaustRequestHandler.getCountersWhenReady(); + ASSERT(!exhaustFuture.isReady()); + + // The second response should also be successful + ASSERT_EQ(counters._success, 2); + ASSERT_EQ(counters._failed, 0); + } + + net().cancelCommand(cbh); + auto error = exhaustFuture.getNoThrow(); + ASSERT((error == ErrorCodes::CallbackCanceled) || (error == ErrorCodes::HostUnreachable)); + + auto counters = exhaustRequestHandler.getCountersWhenReady(); + + // The command was cancelled so the 'fail' counter should be incremented + ASSERT_EQ(counters._success, 2); + ASSERT_EQ(counters._failed, 1); +} + +TEST_F(NetworkInterfaceTest, StartExhaustCommandShouldStopOnFailure) { + // Both assetCommandOK and makeTestCommand target the first host in the connection string, so we + // are guaranteed that the failpoint is set on the same host that we run the exhaust command on. + auto configureFailpointCmd = BSON("configureFailPoint" + << "failCommand" + << "mode" + << "alwaysOn" + << "data" + << BSON("errorCode" << ErrorCodes::CommandFailed + << "failCommands" + << BSON_ARRAY("isMaster"))); + assertCommandOK("admin", configureFailpointCmd); + + ON_BLOCK_EXIT([&] { + auto stopFpRequest = BSON("configureFailPoint" + << "failCommand" + << "mode" + << "off"); + assertCommandOK("admin", stopFpRequest); + }); + + auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << 1000 << "topologyVersion" + << TopologyVersion(OID::max(), 0).toBSON()); + + auto request = makeTestCommand(boost::none, isMasterCmd); + auto cbh = makeCallbackHandle(); + ExhaustRequestHandlerUtil exhaustRequestHandler; + + auto exhaustFuture = startExhaustCommand( + cbh, std::move(request), exhaustRequestHandler.getExhaustRequestCallbackFn()); + + { + auto counters = exhaustRequestHandler.getCountersWhenReady(); + + auto error = exhaustFuture.getNoThrow(); + ASSERT_EQ(error, ErrorCodes::CommandFailed); + + // The response should be marked as failed + ASSERT_EQ(counters._success, 0); + ASSERT_EQ(counters._failed, 1); + } +} + } // namespace } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 080fe35738e..0d329a64c1c 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -133,6 +133,13 @@ Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, return Status::OK(); } +Status NetworkInterfaceMock::startExhaustCommand(const CallbackHandle& cbHandle, + RemoteCommandRequestOnAny& request, + RemoteCommandOnReplyFn&& onReply, + const BatonHandle& baton) { + MONGO_UNREACHABLE; +} + void NetworkInterfaceMock::setHandshakeReplyForHost( const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) { stdx::lock_guard<stdx::mutex> lk(_mutex); diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 2fb1d313500..c67ee4c58a1 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -113,6 +113,10 @@ public: RemoteCommandRequestOnAny& request, RemoteCommandCompletionFn&& onFinish, const BatonHandle& baton = nullptr) override; + Status startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequestOnAny& request, + RemoteCommandOnReplyFn&& onReply, + const BatonHandle& baton = nullptr) override; /** * If the network operation is in the _unscheduled or _processing queues, moves the operation diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index fcc650295b5..0bb2d3bb539 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -222,15 +222,20 @@ Date_t NetworkInterfaceTL::now() { return _reactor->now(); } -NetworkInterfaceTL::CommandState::CommandState(NetworkInterfaceTL* interface_, - RemoteCommandRequestOnAny request_, - const TaskExecutor::CallbackHandle& cbHandle_) +NetworkInterfaceTL::CommandStateBase::CommandStateBase( + NetworkInterfaceTL* interface_, + RemoteCommandRequestOnAny request_, + const TaskExecutor::CallbackHandle& cbHandle_) : interface(interface_), requestOnAny(std::move(request_)), cbHandle(cbHandle_), finishLine(maxRequestFailures()), operationKey(request_.operationKey) {} +NetworkInterfaceTL::CommandState::CommandState(NetworkInterfaceTL* interface_, + RemoteCommandRequestOnAny request_, + const TaskExecutor::CallbackHandle& cbHandle_) + : CommandStateBase(interface_, std::move(request_), cbHandle_) {} auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface, RemoteCommandRequestOnAny request, @@ -270,7 +275,7 @@ AsyncDBClient* NetworkInterfaceTL::RequestState::client() noexcept { return checked_cast<connection_pool_tl::TLConnection*>(conn.get())->client(); } -void NetworkInterfaceTL::CommandState::setTimer() { +void NetworkInterfaceTL::CommandStateBase::setTimer() { if (deadline == RemoteCommandRequest::kNoExpirationDate) { return; } @@ -301,7 +306,7 @@ void NetworkInterfaceTL::CommandState::setTimer() { << ", op was " << redact(requestOnAny.toString()); LOGV2_DEBUG(22595, 2, "{message}", "message"_attr = message); - promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, message)); + fulfillFinalPromise(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, message)); }); } @@ -321,7 +326,7 @@ void NetworkInterfaceTL::RequestState::returnConnection(Status status) noexcept connToReturn->indicateSuccess(); } -void NetworkInterfaceTL::CommandState::tryFinish(Status status) noexcept { +void NetworkInterfaceTL::CommandStateBase::tryFinish(Status status) noexcept { invariant(finishLine.isReady()); if (timer) { @@ -487,7 +492,8 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::CommandState::sendRequest() { }); } -void NetworkInterfaceTL::CommandState::doMetadataHook(const RemoteCommandOnAnyResponse& response) { +void NetworkInterfaceTL::CommandStateBase::doMetadataHook( + const RemoteCommandOnAnyResponse& response) { if (auto& hook = interface->_metadataHook; hook && !finishLine.isReady()) { invariant(response.target); @@ -496,6 +502,11 @@ void NetworkInterfaceTL::CommandState::doMetadataHook(const RemoteCommandOnAnyRe } } +void NetworkInterfaceTL::CommandState::fulfillFinalPromise( + StatusWith<RemoteCommandOnAnyResponse> response) { + promise.setFromStatusWith(std::move(response)); +} + void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::ConnectionHandle> swConn, size_t idx) noexcept { trySend(std::move(swConn), {cmdState->requestOnAny, idx}); @@ -514,10 +525,10 @@ void NetworkInterfaceTL::RequestState::trySend(StatusWith<ConnectionPool::Connec if (cmdState->finishLine.arriveStrongly()) { auto& reactor = interface()->_reactor; if (reactor->onReactorThread()) { - cmdState->promise.setError(swConn.getStatus()); + cmdState->fulfillFinalPromise(swConn.getStatus()); } else { ExecutorFuture<void>(reactor, swConn.getStatus()).getAsync([this](Status status) { - cmdState->promise.setError(std::move(status)); + cmdState->fulfillFinalPromise(std::move(status)); }); } } @@ -577,7 +588,7 @@ void NetworkInterfaceTL::RequestState::resolve(Future<RemoteCommandResponse> fut auto response = uassertStatusOK(swr); auto status = swr.getValue().status; if (cmdState->finishLine.arriveStrongly()) { - cmdState->promise.emplaceValue(std::move(response)); + cmdState->fulfillFinalPromise(std::move(response)); } return status; @@ -597,11 +608,144 @@ void NetworkInterfaceTL::RequestState::resolve(Future<RemoteCommandResponse> fut return; } - cmdState->promise.emplaceValue(std::move(response)); + cmdState->fulfillFinalPromise(std::move(response)); }); } } +NetworkInterfaceTL::ExhaustCommandState::ExhaustCommandState( + NetworkInterfaceTL* interface_, + RemoteCommandRequestOnAny request_, + const TaskExecutor::CallbackHandle& cbHandle_, + RemoteCommandOnReplyFn&& onReply_) + : CommandStateBase(interface_, std::move(request_), cbHandle_), + onReplyFn(std::move(onReply_)) {} + +auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface, + RemoteCommandRequestOnAny request, + const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandOnReplyFn&& onReply) { + auto state = std::make_shared<ExhaustCommandState>( + interface, std::move(request), cbHandle, std::move(onReply)); + auto [promise, future] = makePromiseFuture<void>(); + state->promise = std::move(promise); + std::move(future) + .onError([state](Status error) { + stdx::lock_guard lk(state->_onReplyMutex); + state->onReplyFn(RemoteCommandOnAnyResponse( + boost::none, std::move(error), state->stopwatch.elapsed()), + false); + }) + .getAsync([state](Status status) { state->tryFinish(status); }); + + { + stdx::lock_guard lk(interface->_inProgressMutex); + interface->_inProgress.insert({cbHandle, state}); + } + + return state; +} + +Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendRequest() { + auto requestState = requestStatePtr.lock(); + invariant(requestState); + + auto clientCallback = [this, requestState](const RemoteCommandResponse& response, + bool isMoreToComeSet) { + // Stash this response on the command state to be used to fulfill the promise. + prevResponse = response; + auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response); + doMetadataHook(onAnyResponse); + + // If the command failed, we will call 'onReply' as a part of the future chain paired with + // the promise. This is to be sure that all error paths will run 'onReply' only once upon + // future completion. + if (!getStatusFromCommandResult(response.data).isOK()) { + // The moreToCome bit should *not* be set if the command failed + invariant(!isMoreToComeSet); + return; + } + + // Reset the stopwatch to measure the correct duration for the folowing reply + stopwatch.restart(); + setTimer(); + + stdx::lock_guard lk(_onReplyMutex); + onReplyFn(onAnyResponse, isMoreToComeSet); + }; + + return makeReadyFutureWith( + [this, requestState, clientCallback = std::move(clientCallback)]() mutable { + setTimer(); + return requestState->client()->runExhaustCommandRequest( + *requestState->request, std::move(clientCallback), baton); + }) + .then([this, requestState] { return prevResponse; }); +} + +void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise( + StatusWith<RemoteCommandOnAnyResponse> response) { + auto status = !response.getStatus().isOK() + ? response.getStatus() + : getStatusFromCommandResult(response.getValue().data); + + if (!status.isOK()) { + promise.setError(status); + return; + } + + promise.emplaceValue(); +} + +Status NetworkInterfaceTL::startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequestOnAny& request, + RemoteCommandOnReplyFn&& onReply, + const BatonHandle& baton) { + if (inShutdown()) { + return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"}; + } + + LOG(kDiagnosticLogLevel) << "startCommand: " << redact(request.toString()); + + if (_metadataHook) { + BSONObjBuilder newMetadata(std::move(request.metadata)); + + auto status = _metadataHook->writeRequestMetadata(request.opCtx, &newMetadata); + if (!status.isOK()) { + return status; + } + + request.metadata = newMetadata.obj(); + } + + auto cmdState = ExhaustCommandState::make(this, request, cbHandle, std::move(onReply)); + if (cmdState->requestOnAny.timeout != cmdState->requestOnAny.kNoTimeout) { + cmdState->deadline = cmdState->stopwatch.start() + cmdState->requestOnAny.timeout; + } + cmdState->baton = baton; + + auto requestState = std::make_shared<RequestState>(cmdState); + cmdState->requestStatePtr = requestState; + + // Attempt to get a connection to every target host + for (size_t idx = 0; idx < request.target.size() && !requestState->connFinishLine.isReady(); + ++idx) { + auto connFuture = _pool->get(request.target[idx], request.sslMode, request.timeout); + + if (connFuture.isReady()) { + requestState->trySend(std::move(connFuture).getNoThrow(), idx); + continue; + } + + // For every connection future we didn't have immediately ready, schedule + std::move(connFuture).thenRunOn(_reactor).getAsync([requestState, idx](auto swConn) { + requestState->trySend(std::move(swConn), idx); + }); + } + + return Status::OK(); +} + void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, const BatonHandle&) { stdx::unique_lock<Latch> lk(_inProgressMutex); @@ -640,7 +784,7 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan "Canceling operation; original request was: {cmdStateToCancel_requestOnAny}", "cmdStateToCancel_requestOnAny"_attr = redact(cmdStateToCancel->requestOnAny.toString())); - cmdStateToCancel->promise.setError( + cmdStateToCancel->fulfillFinalPromise( {ErrorCodes::CallbackCanceled, str::stream() << "Command canceled; original request was: " << redact(cmdStateToCancel->requestOnAny.toString())}); @@ -688,13 +832,13 @@ void NetworkInterfaceTL::_killOperation(std::shared_ptr<RequestState> requestSta if (rs.isOK()) { // _killOperations succeeded but the operation interrupted error is expected to be // ignored in resolve() since cancelCommand() crossed the command finishLine first. - cmdStateToKill->promise.setError( + cmdStateToKill->fulfillFinalPromise( {ErrorCodes::CallbackCanceled, str::stream() << "cancelCommand successfully issued remote interruption"}); } else { // _killOperations timed out or failed due to other errors. rs.status.addContext("operation's client canceled by cancelCommand"); - cmdStateToKill->promise.setError(std::move(rs.status)); + cmdStateToKill->fulfillFinalPromise(std::move(rs.status)); } }); diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 8957cd875c6..d89489b1b65 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -74,6 +74,10 @@ public: RemoteCommandRequestOnAny& request, RemoteCommandCompletionFn&& onFinish, const BatonHandle& baton) override; + Status startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequestOnAny& request, + RemoteCommandOnReplyFn&& onReply, + const BatonHandle& baton) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, const BatonHandle& baton) override; @@ -92,22 +96,16 @@ public: private: struct RequestState; - struct CommandState final : public std::enable_shared_from_this<CommandState> { - CommandState(NetworkInterfaceTL* interface_, - RemoteCommandRequestOnAny request_, - const TaskExecutor::CallbackHandle& cbHandle_); - virtual ~CommandState() = default; - - // Create a new CommandState in a shared_ptr - // Prefer this over raw construction - static auto make(NetworkInterfaceTL* interface, - RemoteCommandRequestOnAny request, - const TaskExecutor::CallbackHandle& cbHandle); + struct CommandStateBase : public std::enable_shared_from_this<CommandStateBase> { + CommandStateBase(NetworkInterfaceTL* interface_, + RemoteCommandRequestOnAny request_, + const TaskExecutor::CallbackHandle& cbHandle_); + virtual ~CommandStateBase() = default; /** * Use the current RequestState to send out a command request. */ - virtual Future<RemoteCommandResponse> sendRequest(); + virtual Future<RemoteCommandResponse> sendRequest() = 0; /** * Return the maximum number of request failures this Command can tolerate @@ -122,6 +120,11 @@ private: virtual void setTimer(); /** + * Fulfill the promise with the response. + */ + virtual void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) = 0; + + /** * Fulfill the promise for the Command. * * This will throw/invariant if called multiple times. In an ideal world, this would do the @@ -150,13 +153,56 @@ private: std::weak_ptr<RequestState> requestStatePtr; StrongWeakFinishLine finishLine; - Promise<RemoteCommandOnAnyResponse> promise; boost::optional<UUID> operationKey; }; + struct CommandState final : public CommandStateBase { + CommandState(NetworkInterfaceTL* interface_, + RemoteCommandRequestOnAny request_, + const TaskExecutor::CallbackHandle& cbHandle_); + ~CommandState() = default; + + // Create a new CommandState in a shared_ptr + // Prefer this over raw construction + static auto make(NetworkInterfaceTL* interface, + RemoteCommandRequestOnAny request, + const TaskExecutor::CallbackHandle& cbHandle); + + Future<RemoteCommandResponse> sendRequest() override; + + void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override; + + Promise<RemoteCommandOnAnyResponse> promise; + }; + + struct ExhaustCommandState final : public CommandStateBase { + ExhaustCommandState(NetworkInterfaceTL* interface_, + RemoteCommandRequestOnAny request_, + const TaskExecutor::CallbackHandle& cbHandle_, + RemoteCommandOnReplyFn&& onReply_); + virtual ~ExhaustCommandState() = default; + + // Create a new ExhaustCommandState in a shared_ptr + // Prefer this over raw construction + static auto make(NetworkInterfaceTL* interface, + RemoteCommandRequestOnAny request, + const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandOnReplyFn&& onReply); + + Future<RemoteCommandResponse> sendRequest() override; + + void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override; + + Promise<void> promise; + RemoteCommandResponse prevResponse; + Mutex _onReplyMutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_onReplyMutex"); + RemoteCommandOnReplyFn onReplyFn; + }; + struct RequestState final : public std::enable_shared_from_this<RequestState> { - RequestState(std::shared_ptr<CommandState> cmdState_) + RequestState(std::shared_ptr<CommandStateBase> cmdState_) : cmdState{std::move(cmdState_)}, connFinishLine(cmdState->requestOnAny.target.size()) {} ~RequestState(); @@ -198,7 +244,7 @@ private: return cmdState->interface; } - std::shared_ptr<CommandState> cmdState; + std::shared_ptr<CommandStateBase> cmdState; ClockSource::StopWatch stopwatch; @@ -263,7 +309,7 @@ private: Mutex _inProgressMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_inProgressMutex"); - stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandState>> _inProgress; + stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandStateBase>> _inProgress; stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<AlarmState>> _inProgressAlarms; diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp index 80aa571c2bd..ca8211af7b0 100644 --- a/src/mongo/transport/transport_layer_asio_integration_test.cpp +++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp @@ -178,7 +178,7 @@ private: // called when a server sends a new isMaster exhaust response. Updates _reply and _replyUpdated. AsyncDBClient::RemoteCommandCallbackFn _callbackFn = - [&](const executor::RemoteCommandResponse& response) { + [&](const executor::RemoteCommandResponse& response, bool isMoreToComeSet) { { stdx::unique_lock<Latch> lk(_mutex); _reply = response; @@ -302,6 +302,18 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) { nullptr}; assertOK(failpointHandle->runCommandRequest(configureFailPointRequest).get()); + ON_BLOCK_EXIT([&] { + auto stopFpRequest = executor::RemoteCommandRequest{server, + "admin", + BSON("configureFailPoint" + << "failCommand" + << "mode" + << "off"), + BSONObj(), + nullptr}; + assertOK(failpointHandle->runCommandRequest(stopFpRequest).get()); + }); + // Send a dummy topologyVersion because the mongod generates this and sends it to the client on // the initial handshake. auto isMasterRequest = executor::RemoteCommandRequest{ @@ -323,18 +335,6 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) { ASSERT_OK(reply.status); ASSERT_EQ(reply.data["ok"].Double(), 0.0); } - - ON_BLOCK_EXIT([&] { - auto stopFpRequest = executor::RemoteCommandRequest{server, - "admin", - BSON("configureFailPoint" - << "failCommand" - << "mode" - << "off"), - BSONObj(), - nullptr}; - assertOK(failpointHandle->runCommandRequest(stopFpRequest).get()); - }); } } // namespace diff --git a/src/mongo/util/clock_source.h b/src/mongo/util/clock_source.h index f57c2a35200..f07d83780a9 100644 --- a/src/mongo/util/clock_source.h +++ b/src/mongo/util/clock_source.h @@ -60,7 +60,7 @@ class ClockSource { public: /** * A StopWatch tracks the time that its ClockSource believes has passed since the creation of - * the StopWatch. + * the StopWatch or since 'restart' has been invoked. * * For microsecond accurate metrics, use a Timer instead. */ @@ -87,9 +87,13 @@ public: return now() - _start; } + auto restart() { + _start = now(); + } + private: ClockSource* const _clockSource; - const Date_t _start; + Date_t _start; }; virtual ~ClockSource() = default; |