diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-03-20 13:01:55 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-06 04:45:54 +0000 |
commit | 22e37d4959198d64c9422e1433e7066cb454d216 (patch) | |
tree | 820aaa8735ff77445efbcbf71f2e3c966d272276 | |
parent | a29674836e2732727383f3ecbd78f99e790ea1ae (diff) | |
download | mongo-22e37d4959198d64c9422e1433e7066cb454d216.tar.gz |
SERVER-46821 Allow NetworkingBatons to send work to ASIOReactor after detach
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 130 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 13 | ||||
-rw-r--r-- | src/mongo/transport/baton.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/baton_asio_linux.h | 5 | ||||
-rw-r--r-- | src/mongo/transport/session_asio.h | 43 |
5 files changed, 86 insertions, 107 deletions
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index a988ef76420..a33fb359835 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -277,7 +277,7 @@ auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface, return std::pair(state, std::move(future)); } -AsyncDBClient* NetworkInterfaceTL::RequestState::client() noexcept { +AsyncDBClient* NetworkInterfaceTL::RequestState::getClient(const ConnectionHandle& conn) noexcept { if (!conn) { return nullptr; } @@ -321,9 +321,7 @@ void NetworkInterfaceTL::CommandStateBase::setTimer() { } void NetworkInterfaceTL::RequestState::returnConnection(Status status) noexcept { - // Settle the connection object on the reactor invariant(conn); - invariant(interface()->_reactor->onReactorThread()); auto connToReturn = std::exchange(conn, {}); @@ -369,25 +367,10 @@ void NetworkInterfaceTL::CommandStateBase::tryFinish(Status status) noexcept { } void NetworkInterfaceTL::RequestState::cancel() noexcept { - auto& reactor = interface()->_reactor; - - // If we failed, then get the client to finish up. - // Note: CommandState::returnConnection() and CommandState::cancel() run on the reactor - // thread only. One goes first and then the other, so there isn't a risk of canceling - // the next command to run on the connection. - if (reactor->onReactorThread()) { - if (auto clientPtr = client()) { - // If we have a client, cancel it - clientPtr->cancel(cmdState->baton); - } - } else { - ExecutorFuture<void>(reactor).getAsync([this, anchor = shared_from_this()](Status status) { - invariant(status.isOK()); - if (auto clientPtr = client()) { - // If we have a client, cancel it - clientPtr->cancel(cmdState->baton); - } - }); + auto connToCancel = weakConn.lock(); + if (auto clientPtr = getClient(connToCancel)) { + // If we have a client, cancel it + clientPtr->cancel(cmdState->baton); } } @@ -526,7 +509,8 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::CommandState::sendRequest(size return makeReadyFutureWith([this, requestState] { setTimer(); - return requestState->client()->runCommandRequest(*requestState->request, baton); + return RequestState::getClient(requestState->conn) + ->runCommandRequest(*requestState->request, baton); }) .then([this, requestState](RemoteCommandResponse response) { doMetadataHook(RemoteCommandOnAnyResponse(requestState->host, response)); @@ -748,6 +732,7 @@ void NetworkInterfaceTL::RequestState::send(StatusWith<ConnectionPool::Connectio request.emplace(remoteCommandRequest); host = request.get().target; conn = std::move(swConn.getValue()); + weakConn = conn; networkInterfaceHangCommandsAfterAcquireConn.pauseWhileSet(); @@ -781,74 +766,38 @@ void NetworkInterfaceTL::RequestState::resolve(Future<RemoteCommandResponse> fut return RemoteCommandOnAnyResponse(host, std::move(error), stopwatch.elapsed()); }); - if (baton) { - // If we have a baton then use it for the promise and then switch to the reactor to return - // our connection. - std::move(anyFuture) - .thenRunOn(baton) - .onCompletion([ this, anchor = shared_from_this() ](auto swr) noexcept { - auto response = uassertStatusOK(swr); - auto status = swr.getValue().status; - auto commandStatus = getStatusFromCommandResult(response.data); - - // Ignore maxTimeMS expiration errors for hedged reads. - if (isHedge && commandStatus == ErrorCodes::MaxTimeMSExpired) { - LOGV2_DEBUG(4660700, - 2, - "Hedged request returned status", - "requestId"_attr = request->id, - "target"_attr = request->target, - "status"_attr = commandStatus); - } else { - if (cmdState->finishLine.arriveStrongly()) { - if (isHedge) { - auto hm = HedgingMetrics::get(cmdState->interface->_svcCtx); - invariant(hm); - hm->incrementNumAdvantageouslyHedgedOperations(); - } - fulfilledPromise = true; - cmdState->fulfillFinalPromise(std::move(response)); - } - } + std::move(anyFuture) // + .thenRunOn(makeGuaranteedExecutor(baton, reactor)) // Switch to the baton/reactor. + .getAsync([ this, anchor = shared_from_this() ](auto swr) noexcept { + auto response = uassertStatusOK(swr); + auto status = response.status; - return status; - }) - .thenRunOn(reactor) - .getAsync( - [this, anchor = shared_from_this()](Status status) { returnConnection(status); }); - } else { - // If we do not have a baton, then we can fulfill the promise and return our connection in - // the same callback - std::move(anyFuture).thenRunOn(reactor).getAsync( - [ this, anchor = shared_from_this() ](auto swr) noexcept { - auto response = uassertStatusOK(swr); - auto status = response.status; - auto commandStatus = getStatusFromCommandResult(response.data); - ON_BLOCK_EXIT([&] { returnConnection(status); }); - - if (!cmdState->finishLine.arriveStrongly()) { - return; - } + returnConnection(status); - // Ignore maxTimeMS expiration errors for hedged reads - if (isHedge && commandStatus == ErrorCodes::MaxTimeMSExpired) { - LOGV2_DEBUG(4660701, - 2, - "Hedged request returned status", - "requestId"_attr = request->id, - "target"_attr = request->target, - "status"_attr = commandStatus); - } else { - if (isHedge) { - auto hm = HedgingMetrics::get(cmdState->interface->_svcCtx); - invariant(hm); - hm->incrementNumAdvantageouslyHedgedOperations(); - } - fulfilledPromise = true; - cmdState->fulfillFinalPromise(std::move(response)); + auto commandStatus = getStatusFromCommandResult(response.data); + + if (!cmdState->finishLine.arriveStrongly()) { + return; + } + + // Ignore maxTimeMS expiration errors for hedged reads + if (isHedge && commandStatus == ErrorCodes::MaxTimeMSExpired) { + LOGV2_DEBUG(4660701, + 2, + "Hedged request returned status", + "requestId"_attr = request->id, + "target"_attr = request->target, + "status"_attr = commandStatus); + } else { + if (isHedge) { + auto hm = HedgingMetrics::get(cmdState->interface->_svcCtx); + invariant(hm); + hm->incrementNumAdvantageouslyHedgedOperations(); } - }); - } + fulfilledPromise = true; + cmdState->fulfillFinalPromise(std::move(response)); + } + }); } NetworkInterfaceTL::ExhaustCommandState::ExhaustCommandState( @@ -915,8 +864,9 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendReque return makeReadyFutureWith( [this, requestState, clientCallback = std::move(clientCallback)]() mutable { setTimer(); - return requestState->client()->runExhaustCommandRequest( - *requestState->request, std::move(clientCallback), baton); + return RequestState::getClient(requestState->conn) + ->runExhaustCommandRequest( + *requestState->request, std::move(clientCallback), baton); }) .then([this, requestState] { return prevResponse; }); } diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 34bec18a411..d0c43e1277f 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -244,22 +244,20 @@ private: }; struct RequestState final : public std::enable_shared_from_this<RequestState> { + using ConnectionHandle = std::shared_ptr<ConnectionPool::ConnectionHandle::element_type>; + using WeakConnectionHandle = std::weak_ptr<ConnectionPool::ConnectionHandle::element_type>; RequestState(RequestManager* mgr, std::shared_ptr<CommandStateBase> cmdState_, size_t id) : cmdState{std::move(cmdState_)}, requestManager(mgr), reqId(id) {} ~RequestState(); /** - * Return the client object bound to the current command or nullptr if there isn't one. - * - * This is only useful on the networking thread (i.e. the reactor). + * Return the client for a given connection */ - AsyncDBClient* client() noexcept; + static AsyncDBClient* getClient(const ConnectionHandle& conn) noexcept; /** * Cancel the current client operation or do nothing if there is no client. - * - * This must be called from the networking thread (i.e. the reactor). */ void cancel() noexcept; @@ -295,7 +293,8 @@ private: boost::optional<RemoteCommandRequest> request; HostAndPort host; - ConnectionPool::ConnectionHandle conn; + ConnectionHandle conn; + WeakConnectionHandle weakConn; // Internal id of this request as tracked by the RequestManager. size_t reqId; diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h index a4c4f1bb2b2..b65e8a539ca 100644 --- a/src/mongo/transport/baton.h +++ b/src/mongo/transport/baton.h @@ -91,6 +91,8 @@ public: NetworkingBaton* networking() noexcept final { return this; } + + virtual bool canWait() noexcept = 0; }; } // namespace transport diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index 5e8064c2805..1d89194ee98 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -176,6 +176,11 @@ public: return std::move(pf.future); } + bool canWait() noexcept override { + stdx::lock_guard<Latch> lk(_mutex); + return _opCtx; + } + bool cancelSession(Session& session) noexcept override { const auto id = session.id(); diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 17e6a7c0301..e4a40043c0d 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -189,11 +189,12 @@ public: 3, "Cancelling outstanding I/O operations on connection to {remote}", "remote"_attr = _remote); - if (baton && baton->networking()) { - baton->networking()->cancelSession(*this); - } else { - getSocket().cancel(); + if (baton && baton->networking() && baton->networking()->cancelSession(*this)) { + // If we have a baton, it was for networking, and it owned our session, then we're done. + return; } + + getSocket().cancel(); } void setTimeout(boost::optional<Milliseconds> timeout) override { @@ -424,6 +425,7 @@ private: template <typename MutableBufferSequence> Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) { + // TODO SERVER-47229 Guard active ops for cancelation here. #ifdef MONGO_CONFIG_SSL if (_sslSocket) { return opportunisticRead(*_sslSocket, buffers, baton); @@ -449,6 +451,7 @@ private: template <typename ConstBufferSequence> Future<void> write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr) { + // TODO SERVER-47229 Guard active ops for cancelation here. #ifdef MONGO_CONFIG_SSL _ranHandshake = true; if (_sslSocket) { @@ -503,9 +506,19 @@ private: asyncBuffers += size; } - if (baton && baton->networking()) { - return baton->networking() - ->addSession(*this, NetworkingBaton::Type::In) + if (auto networkingBaton = baton ? baton->networking() : nullptr; + networkingBaton && networkingBaton->canWait()) { + return networkingBaton->addSession(*this, NetworkingBaton::Type::In) + .onError([](Status error) { + if (ErrorCodes::isCancelationError(error)) { + // If the baton has detached, it will cancel its polling. We catch that + // error here and return Status::OK so that we invoke + // opportunisticRead() again and switch to asio::async_read() below. + return Status::OK(); + } + + return error; + }) .then([&stream, asyncBuffers, baton, this] { return opportunisticRead(stream, asyncBuffers, baton); }); @@ -591,9 +604,19 @@ private: return std::move(*more); } - if (baton && baton->networking()) { - return baton->networking() - ->addSession(*this, NetworkingBaton::Type::Out) + if (auto networkingBaton = baton ? baton->networking() : nullptr; + networkingBaton && networkingBaton->canWait()) { + return networkingBaton->addSession(*this, NetworkingBaton::Type::Out) + .onError([](Status error) { + if (ErrorCodes::isCancelationError(error)) { + // If the baton has detached, it will cancel its polling. We catch that + // error here and return Status::OK so that we invoke + // opportunisticWrite() again and switch to asio::async_write() below. + return Status::OK(); + } + + return error; + }) .then([&stream, asyncBuffers, baton, this] { return opportunisticWrite(stream, asyncBuffers, baton); }); |