author     Ben Caimano <ben.caimano@mongodb.com>  2019-12-05 22:37:49 +0000
committer  evergreen <evergreen@mongodb.com>      2019-12-05 22:37:49 +0000
commit     acac11eca3f8e76647273b20008f1f486e27518e (patch)
tree       1ebf2436f4dc4549cb0593bad37880a9a1ddd8b6
parent     d5d00a79c3ba6e7379edaa35fd6bcf820097f369 (diff)
download   mongo-acac11eca3f8e76647273b20008f1f486e27518e.tar.gz
SERVER-44567 Reimplement CommandState destructors for v4.0
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp  191
-rw-r--r--  src/mongo/executor/network_interface_tl.h     30
2 files changed, 125 insertions(+), 96 deletions(-)
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 607f26d64f2..ec2382ea209 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -169,6 +169,47 @@ Date_t NetworkInterfaceTL::now() {
     return _reactor->now();
 }
 
+NetworkInterfaceTL::CommandState::CommandState(NetworkInterfaceTL* interface_,
+                                               RemoteCommandRequest request_,
+                                               const TaskExecutor::CallbackHandle& cbHandle_,
+                                               Promise<RemoteCommandResponse> promise_)
+    : interface(interface_),
+      request(std::move(request_)),
+      cbHandle(cbHandle_),
+      promise(std::move(promise_)) {
+    start = interface->now();
+    if (request.timeout != request.kNoTimeout) {
+        deadline = start + request.timeout;
+    }
+}
+
+
+auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface,
+                                            RemoteCommandRequest request,
+                                            const TaskExecutor::CallbackHandle& cbHandle,
+                                            Promise<RemoteCommandResponse> promise) {
+    auto state = std::make_shared<CommandState>(
+        interface, std::move(request), cbHandle, std::move(promise));
+
+    {
+        stdx::lock_guard<stdx::mutex> lk(interface->_inProgressMutex);
+        interface->_inProgress.insert({cbHandle, state});
+    }
+
+    return state;
+}
+
+NetworkInterfaceTL::CommandState::~CommandState() {
+    // Each CommandState has its lifetime extended via binding to callbacks, all of which happen to
+    // be destructed when the client object is told to cancel. This is a very oblique way to force
+    // destruction of the CommandState before its interface is destroyed.
+    invariant(interface);
+
+    {
+        stdx::lock_guard<stdx::mutex> lk(interface->_inProgressMutex);
+        interface->_inProgress.erase(cbHandle);
+    }
+}
+
 Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                         RemoteCommandRequest& request,
@@ -192,22 +233,31 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
     }
 
     auto pf = makePromiseFuture<RemoteCommandResponse>();
-    auto state = std::make_shared<CommandState>(request, cbHandle, std::move(pf.promise));
-    {
-        stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
-        _inProgress.insert({state->cbHandle, state});
-    }
-
-    state->start = now();
-    if (state->request.timeout != state->request.kNoTimeout) {
-        state->deadline = state->start + state->request.timeout;
-    }
+    auto cmdState = CommandState::make(this, request, cbHandle, std::move(pf.promise));
+
+    std::move(pf.future)
+        .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+            // The TransportLayer has, for historical reasons returned SocketException for
+            // network errors, but sharding assumes HostUnreachable on network errors.
+            if (error == ErrorCodes::SocketException) {
+                error = Status(ErrorCodes::HostUnreachable, error.reason());
+            }
+            return error;
+        })
+        .getAsync([this, cmdState, onFinish](StatusWith<RemoteCommandResponse> response) {
+            auto duration = now() - cmdState->start;
+            if (!response.isOK()) {
+                onFinish(RemoteCommandResponse(response.getStatus(), duration));
+            } else {
+                const auto& rs = response.getValue();
+                LOG(2) << "Request " << cmdState->request.id << " finished with response: "
+                       << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+                onFinish(rs);
+            }
+        });
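The new make()/~CommandState() pair above is the heart of this change: registration in the interface's in-progress table happens in a factory, and deregistration happens in the destructor, so a table entry can never outlive its state. A minimal standalone sketch of that pattern follows; the names Registry, Entry, and Handle are hypothetical stand-ins, not taken from the MongoDB source:

    #include <memory>
    #include <mutex>
    #include <unordered_map>

    // Hypothetical stand-in for TaskExecutor::CallbackHandle.
    using Handle = int;

    class Registry;

    struct Entry {
        // Prefer make() over raw construction so every Entry is registered.
        static std::shared_ptr<Entry> make(Registry* registry, Handle handle);
        ~Entry();  // Deregisters itself, mirroring ~CommandState().

        Entry(Registry* r, Handle h) : registry(r), handle(h) {}

        Registry* registry;
        Handle handle;
    };

    class Registry {
    public:
        // The map holds weak_ptrs so it never extends an Entry's lifetime;
        // only the callbacks bound to the operation keep it alive.
        std::mutex mutex;
        std::unordered_map<Handle, std::weak_ptr<Entry>> inProgress;
    };

    std::shared_ptr<Entry> Entry::make(Registry* registry, Handle handle) {
        auto entry = std::make_shared<Entry>(registry, handle);
        std::lock_guard<std::mutex> lk(registry->mutex);
        registry->inProgress.insert({handle, entry});
        return entry;
    }

    Entry::~Entry() {
        // When the last callback holding this Entry goes away, the destructor
        // removes the table entry; no explicit erase on the success path.
        std::lock_guard<std::mutex> lk(registry->mutex);
        registry->inProgress.erase(handle);
    }

The weak_ptr map matches the header change at the bottom of this diff; the cost is that readers of the map, such as cancelCommand below, must lock() before use.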
 
     if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) {
         log() << "Discarding command due to failpoint before acquireConn";
-        std::move(pf.future).getAsync([onFinish](StatusWith<RemoteCommandResponse> response) {
-            onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0}));
-        });
         return Status::OK();
     }
 
@@ -221,12 +271,12 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
     // return on the reactor thread.
     //
     // TODO: get rid of this cruft once we have a connection pool that's executor aware.
-    auto connFuture = _reactor->execute([this, state, request, baton] {
+    auto connFuture = _reactor->execute([this, cmdState, request, baton] {
         return makeReadyFutureWith([this, request] {
             return _pool->get(request.target, request.sslMode, request.timeout);
         })
-            .tapError([state](Status error) {
-                LOG(2) << "Failed to get connection from pool for request " << state->request.id
+            .tapError([cmdState](Status error) {
+                LOG(2) << "Failed to get connection from pool for request " << cmdState->request.id
                        << ": " << error;
             })
             .then([this, baton](ConnectionPool::ConnectionHandle conn) {
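In the next hunk, the new resolver wraps its body in makeReadyFutureWith, so an exception thrown by uassertStatusOK(swConn) surfaces as an errored future instead of escaping the callback. A rough standalone sketch of that exception-to-status trampoline, using a toy Status type rather than MongoDB's real Future/Status API (an assumption made to keep the sketch self-contained):

    #include <exception>
    #include <functional>
    #include <iostream>
    #include <string>

    // Toy stand-in for mongo::Status.
    struct Status {
        bool ok;
        std::string reason;
    };

    // Run a callable that may throw; capture any exception as a failed Status.
    // This mirrors the role makeReadyFutureWith plays for the resolver below.
    Status runToStatus(const std::function<void()>& fn) {
        try {
            fn();
            return Status{true, ""};
        } catch (const std::exception& ex) {
            return Status{false, ex.what()};
        }
    }

    int main() {
        auto status = runToStatus([] {
            throw std::runtime_error("connection pool returned an error");
        });
        if (!status.ok) {
            // The caller sees an ordinary error value, e.g. to set on a promise.
            std::cout << "failed: " << status.reason << "\n";
        }
    }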
@@ -238,74 +288,50 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
             });
     });
 
-    auto remainingWork = [
-        this,
-        state,
-        // TODO: once SERVER-35685 is done, stop using a `std::shared_ptr<Future>` here.
-        future = std::make_shared<decltype(pf.future)>(std::move(pf.future)),
-        baton,
-        onFinish
-    ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+    auto resolver = [this, cmdState, baton](
+        StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
         makeReadyFutureWith([&] {
-            return _onAcquireConn(
-                state, std::move(*future), std::move(*uassertStatusOK(swConn)), baton);
-        })
-            .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
-                // The TransportLayer has, for historical reasons returned SocketException for
-                // network errors, but sharding assumes HostUnreachable on network errors.
-                if (error == ErrorCodes::SocketException) {
-                    error = Status(ErrorCodes::HostUnreachable, error.reason());
-                }
-                return error;
-            })
-            .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
-                auto duration = now() - state->start;
-                if (!response.isOK()) {
-                    onFinish(RemoteCommandResponse(response.getStatus(), duration));
-                } else {
-                    const auto& rs = response.getValue();
-                    LOG(2) << "Request " << state->request.id << " finished with response: "
-                           << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
-                    onFinish(rs);
-                }
-            });
+            auto conn = std::move(*uassertStatusOK(swConn));
+
+            if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
+                conn->indicateSuccess();
+                return;
+            }
+
+            _onAcquireConn(cmdState, std::move(conn), baton);
+        }).getAsync([&](Status status) {
+            if (!status.isOK() && !cmdState->done.swap(true)) {
+                // done is potentially set via callbacks in _onAcquireConn(). This branch most
+                // likely means that _onAcquireConn() wasn't able to schedule async work
+                cmdState->promise.setError(std::move(status));
+            }
+        });
     };
 
-    if (baton) {
-        // If we have a baton, we want to get back to the baton thread immediately after we get a
-        // connection
-        std::move(connFuture).getAsync([
-            baton,
-            rw = std::move(remainingWork)
-        ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
-            baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
-                std::move(rw)(std::move(swConn));
-            });
+    std::move(connFuture)
+        .getAsync([ this, resolver = std::move(resolver), baton ](
+            StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+            if (baton) {
+                // If we have a baton, we want to get back to the baton thread immediately after we
+                // get a connection
+                baton->schedule(
+                    [ resolver = std::move(resolver), swConn = std::move(swConn) ]() mutable {
+                        std::move(resolver)(std::move(swConn));
+                    });
+            } else {
+                // otherwise we're happy to run inline
+                std::move(resolver)(std::move(swConn));
+            }
         });
-    } else {
-        // otherwise we're happy to run inline
-        std::move(connFuture)
-            .getAsync([rw = std::move(remainingWork)](
-                StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
-                std::move(rw)(std::move(swConn));
-            });
-    }
 
     return Status::OK();
 }
 
 // This is only called from within a then() callback on a future, so throwing is equivalent to
 // returning a ready Future with a not-OK status.
-Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
-    std::shared_ptr<CommandState> state,
-    Future<RemoteCommandResponse> future,
-    CommandState::ConnHandle conn,
-    const transport::BatonHandle& baton) {
-    if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
-        conn->indicateSuccess();
-        return future;
-    }
-
+void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state,
+                                        CommandState::ConnHandle conn,
+                                        const transport::BatonHandle& baton) {
     if (state->done.load()) {
         conn->indicateSuccess();
         uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
@@ -369,7 +395,6 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
             return RemoteCommandResponse(std::move(response));
         })
         .getAsync([this, state, baton](StatusWith<RemoteCommandResponse> swr) {
-            _eraseInUseConn(state->cbHandle);
             if (!swr.isOK()) {
                 state->conn->indicateFailure(swr.getStatus());
             } else if (!swr.getValue().isOK()) {
@@ -379,8 +404,9 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
                 state->conn->indicateSuccess();
             }
 
-            if (state->done.swap(true))
+            if (state->done.swap(true)) {
                 return;
+            }
 
             if (getTestCommandsEnabled()) {
                 stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -397,13 +423,6 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
 
             state->promise.setFromStatusWith(std::move(swr));
         });
-
-    return future;
-}
-
-void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbHandle) {
-    stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
-    _inProgress.erase(cbHandle);
 }
 
 void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
@@ -413,7 +432,11 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
     if (it == _inProgress.end()) {
         return;
     }
-    auto state = it->second;
+    auto state = it->second.lock();
+    if (!state) {
+        return;
+    }
+    _inProgress.erase(it);
 
     lk.unlock();
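cancelCommand above now has to tolerate entries whose CommandState has already been destroyed, since the map only holds weak_ptrs. A small standalone sketch of that lookup-lock-erase sequence, reusing the hypothetical Registry, Entry, and Handle types from the first sketch:

    // Cancel the operation registered under `handle`, if it is still alive.
    // Mirrors the weak_ptr handling in NetworkInterfaceTL::cancelCommand.
    void cancel(Registry& registry, Handle handle) {
        std::shared_ptr<Entry> entry;
        {
            std::lock_guard<std::mutex> lk(registry.mutex);
            auto it = registry.inProgress.find(handle);
            if (it == registry.inProgress.end()) {
                return;  // never registered, or already erased
            }
            entry = it->second.lock();
            if (!entry) {
                return;  // state already destroyed; nothing to cancel
            }
            // Erase eagerly so a second cancel for the same handle is a no-op.
            registry.inProgress.erase(it);
        }
        // ... signal cancellation on `entry` outside the lock ...
    }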
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 048aaba7f5b..6df2097835a 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -82,12 +82,20 @@ public:
 
 private:
     struct CommandState {
-        CommandState(RemoteCommandRequest request_,
-                     TaskExecutor::CallbackHandle cbHandle_,
-                     Promise<RemoteCommandResponse> promise_)
-            : request(std::move(request_)),
-              cbHandle(std::move(cbHandle_)),
-              promise(std::move(promise_)) {}
+        CommandState(NetworkInterfaceTL* interface_,
+                     RemoteCommandRequest request_,
+                     const TaskExecutor::CallbackHandle& cbHandle_,
+                     Promise<RemoteCommandResponse> promise_);
+        ~CommandState();
+
+        // Create a new CommandState in a shared_ptr
+        // Prefer this over raw construction
+        static auto make(NetworkInterfaceTL* interface,
+                         RemoteCommandRequest request,
+                         const TaskExecutor::CallbackHandle& cbHandle,
+                         Promise<RemoteCommandResponse> promise);
+
+        NetworkInterfaceTL* interface;
 
         RemoteCommandRequest request;
         TaskExecutor::CallbackHandle cbHandle;
@@ -113,11 +121,9 @@ private:
     };
 
     void _run();
-    void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
-    Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
-                                                 Future<RemoteCommandResponse> future,
-                                                 CommandState::ConnHandle conn,
-                                                 const transport::BatonHandle& baton);
+    void _onAcquireConn(std::shared_ptr<CommandState> state,
+                        CommandState::ConnHandle conn,
+                        const transport::BatonHandle& baton);
 
     std::string _instanceName;
     ServiceContext* _svcCtx;
@@ -137,7 +143,7 @@ private:
     stdx::thread _ioThread;
 
     stdx::mutex _inProgressMutex;
-    stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<CommandState>> _inProgress;
+    stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandState>> _inProgress;
     stdx::unordered_set<std::shared_ptr<transport::ReactorTimer>> _inProgressAlarms;
 
     stdx::condition_variable _workReadyCond;
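One more pattern worth calling out from startCommand: the connection-ready continuation now branches in a single place, scheduling the resolver on the baton when one exists and otherwise running it inline. A minimal sketch of that dispatch shape, with a hypothetical Baton type that only imitates the deferral behavior of MongoDB's transport::Baton, not its API:

    #include <functional>
    #include <iostream>
    #include <queue>
    #include <utility>

    // Hypothetical stand-in for transport::Baton: queues work to run later on
    // the baton's owning thread (drained manually here for the demo).
    struct Baton {
        std::queue<std::function<void()>> tasks;
        void schedule(std::function<void()> task) {
            tasks.push(std::move(task));
        }
        void drain() {
            while (!tasks.empty()) {
                tasks.front()();
                tasks.pop();
            }
        }
    };

    // Run `work` on the baton when one exists, otherwise inline; the same
    // branch the new connFuture continuation makes around its resolver.
    template <typename Work>
    void dispatch(Baton* baton, Work&& work) {
        if (baton) {
            baton->schedule(std::forward<Work>(work));
        } else {
            std::forward<Work>(work)();
        }
    }

    int main() {
        dispatch(nullptr, [] { std::cout << "ran inline\n"; });

        Baton baton;
        dispatch(&baton, [] { std::cout << "ran on baton\n"; });
        baton.drain();  // a real baton's event loop would do this
    }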