summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@mongodb.com>2019-12-05 22:37:49 +0000
committerevergreen <evergreen@mongodb.com>2019-12-05 22:37:49 +0000
commitacac11eca3f8e76647273b20008f1f486e27518e (patch)
tree1ebf2436f4dc4549cb0593bad37880a9a1ddd8b6
parentd5d00a79c3ba6e7379edaa35fd6bcf820097f369 (diff)
downloadmongo-acac11eca3f8e76647273b20008f1f486e27518e.tar.gz
SERVER-44567 Reimplement CommandState destructors for v4.0
-rw-r--r--src/mongo/executor/network_interface_tl.cpp191
-rw-r--r--src/mongo/executor/network_interface_tl.h30
2 files changed, 125 insertions, 96 deletions
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 607f26d64f2..ec2382ea209 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -169,6 +169,47 @@ Date_t NetworkInterfaceTL::now() {
return _reactor->now();
}
+NetworkInterfaceTL::CommandState::CommandState(NetworkInterfaceTL* interface_,
+ RemoteCommandRequest request_,
+ const TaskExecutor::CallbackHandle& cbHandle_,
+ Promise<RemoteCommandResponse> promise_)
+ : interface(interface_),
+ request(std::move(request_)),
+ cbHandle(cbHandle_),
+ promise(std::move(promise_)) {
+ start = interface->now();
+ if (request.timeout != request.kNoTimeout) {
+ deadline = start + request.timeout;
+ }
+}
+
+
+auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface,
+ RemoteCommandRequest request,
+ const TaskExecutor::CallbackHandle& cbHandle,
+ Promise<RemoteCommandResponse> promise) {
+ auto state =
+ std::make_shared<CommandState>(interface, std::move(request), cbHandle, std::move(promise));
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(interface->_inProgressMutex);
+ interface->_inProgress.insert({cbHandle, state});
+ }
+
+ return state;
+}
+
+NetworkInterfaceTL::CommandState::~CommandState() {
+ // Each CommandState has its lifetime extended via binding to callbacks, all of which happen to
+ // be destructed when the client object is told to cancel. This is a very oblique way to force
+ // destruction of the CommandState before its interface is destroyed.
+ invariant(interface);
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(interface->_inProgressMutex);
+ interface->_inProgress.erase(cbHandle);
+ }
+}
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
@@ -192,22 +233,31 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
}
auto pf = makePromiseFuture<RemoteCommandResponse>();
- auto state = std::make_shared<CommandState>(request, cbHandle, std::move(pf.promise));
- {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgress.insert({state->cbHandle, state});
- }
-
- state->start = now();
- if (state->request.timeout != state->request.kNoTimeout) {
- state->deadline = state->start + state->request.timeout;
- }
+ auto cmdState = CommandState::make(this, request, cbHandle, std::move(pf.promise));
+
+ std::move(pf.future)
+ .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+ // The TransportLayer has, for historical reasons returned SocketException for
+ // network errors, but sharding assumes HostUnreachable on network errors.
+ if (error == ErrorCodes::SocketException) {
+ error = Status(ErrorCodes::HostUnreachable, error.reason());
+ }
+ return error;
+ })
+ .getAsync([this, cmdState, onFinish](StatusWith<RemoteCommandResponse> response) {
+ auto duration = now() - cmdState->start;
+ if (!response.isOK()) {
+ onFinish(RemoteCommandResponse(response.getStatus(), duration));
+ } else {
+ const auto& rs = response.getValue();
+ LOG(2) << "Request " << cmdState->request.id << " finished with response: "
+ << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+ onFinish(rs);
+ }
+ });
if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) {
log() << "Discarding command due to failpoint before acquireConn";
- std::move(pf.future).getAsync([onFinish](StatusWith<RemoteCommandResponse> response) {
- onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0}));
- });
return Status::OK();
}
@@ -221,12 +271,12 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
// return on the reactor thread.
//
// TODO: get rid of this cruft once we have a connection pool that's executor aware.
- auto connFuture = _reactor->execute([this, state, request, baton] {
+ auto connFuture = _reactor->execute([this, cmdState, request, baton] {
return makeReadyFutureWith([this, request] {
return _pool->get(request.target, request.sslMode, request.timeout);
})
- .tapError([state](Status error) {
- LOG(2) << "Failed to get connection from pool for request " << state->request.id
+ .tapError([cmdState](Status error) {
+ LOG(2) << "Failed to get connection from pool for request " << cmdState->request.id
<< ": " << error;
})
.then([this, baton](ConnectionPool::ConnectionHandle conn) {
@@ -238,74 +288,50 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
});
});
- auto remainingWork = [
- this,
- state,
- // TODO: once SERVER-35685 is done, stop using a `std::shared_ptr<Future>` here.
- future = std::make_shared<decltype(pf.future)>(std::move(pf.future)),
- baton,
- onFinish
- ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ auto resolver = [this, cmdState, baton](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
makeReadyFutureWith([&] {
- return _onAcquireConn(
- state, std::move(*future), std::move(*uassertStatusOK(swConn)), baton);
- })
- .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
- // The TransportLayer has, for historical reasons returned SocketException for
- // network errors, but sharding assumes HostUnreachable on network errors.
- if (error == ErrorCodes::SocketException) {
- error = Status(ErrorCodes::HostUnreachable, error.reason());
- }
- return error;
- })
- .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
- auto duration = now() - state->start;
- if (!response.isOK()) {
- onFinish(RemoteCommandResponse(response.getStatus(), duration));
- } else {
- const auto& rs = response.getValue();
- LOG(2) << "Request " << state->request.id << " finished with response: "
- << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
- onFinish(rs);
- }
- });
+ auto conn = std::move(*uassertStatusOK(swConn));
+
+ if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
+ conn->indicateSuccess();
+ return;
+ }
+
+ _onAcquireConn(cmdState, std::move(conn), baton);
+ }).getAsync([&](Status status) {
+ if (!status.isOK() && !cmdState->done.swap(true)) {
+ // done is potentially set via callbacks in _onAcquireConn(). This branch most
+ // likely means that _onAcquireConn() wasn't able to schedule async work
+ cmdState->promise.setError(std::move(status));
+ }
+ });
};
- if (baton) {
- // If we have a baton, we want to get back to the baton thread immediately after we get a
- // connection
- std::move(connFuture).getAsync([
- baton,
- rw = std::move(remainingWork)
- ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
- baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
- std::move(rw)(std::move(swConn));
- });
+ std::move(connFuture)
+ .getAsync([ this, resolver = std::move(resolver), baton ](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ if (baton) {
+ // If we have a baton, we want to get back to the baton thread immediately after we
+ // get a connection
+ baton->schedule(
+ [ resolver = std::move(resolver), swConn = std::move(swConn) ]() mutable {
+ std::move(resolver)(std::move(swConn));
+ });
+ } else {
+ // otherwise we're happy to run inline
+ std::move(resolver)(std::move(swConn));
+ }
});
- } else {
- // otherwise we're happy to run inline
- std::move(connFuture)
- .getAsync([rw = std::move(remainingWork)](
- StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
- std::move(rw)(std::move(swConn));
- });
- }
return Status::OK();
}
// This is only called from within a then() callback on a future, so throwing is equivalent to
// returning a ready Future with a not-OK status.
-Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
- std::shared_ptr<CommandState> state,
- Future<RemoteCommandResponse> future,
- CommandState::ConnHandle conn,
- const transport::BatonHandle& baton) {
- if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
- conn->indicateSuccess();
- return future;
- }
-
+void NetworkInterfaceTL::_onAcquireConn(std::shared_ptr<CommandState> state,
+ CommandState::ConnHandle conn,
+ const transport::BatonHandle& baton) {
if (state->done.load()) {
conn->indicateSuccess();
uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
@@ -369,7 +395,6 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
return RemoteCommandResponse(std::move(response));
})
.getAsync([this, state, baton](StatusWith<RemoteCommandResponse> swr) {
- _eraseInUseConn(state->cbHandle);
if (!swr.isOK()) {
state->conn->indicateFailure(swr.getStatus());
} else if (!swr.getValue().isOK()) {
@@ -379,8 +404,9 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
state->conn->indicateSuccess();
}
- if (state->done.swap(true))
+ if (state->done.swap(true)) {
return;
+ }
if (getTestCommandsEnabled()) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -397,13 +423,6 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
state->promise.setFromStatusWith(std::move(swr));
});
-
- return future;
-}
-
-void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbHandle) {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgress.erase(cbHandle);
}
void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
@@ -413,7 +432,11 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
if (it == _inProgress.end()) {
return;
}
- auto state = it->second;
+ auto state = it->second.lock();
+ if (!state) {
+ return;
+ }
+
_inProgress.erase(it);
lk.unlock();
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 048aaba7f5b..6df2097835a 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -82,12 +82,20 @@ public:
private:
struct CommandState {
- CommandState(RemoteCommandRequest request_,
- TaskExecutor::CallbackHandle cbHandle_,
- Promise<RemoteCommandResponse> promise_)
- : request(std::move(request_)),
- cbHandle(std::move(cbHandle_)),
- promise(std::move(promise_)) {}
+ CommandState(NetworkInterfaceTL* interface_,
+ RemoteCommandRequest request_,
+ const TaskExecutor::CallbackHandle& cbHandle_,
+ Promise<RemoteCommandResponse> promise_);
+ ~CommandState();
+
+ // Create a new CommandState in a shared_ptr
+ // Prefer this over raw construction
+ static auto make(NetworkInterfaceTL* interface,
+ RemoteCommandRequest request,
+ const TaskExecutor::CallbackHandle& cbHandle,
+ Promise<RemoteCommandResponse> promise);
+
+ NetworkInterfaceTL* interface;
RemoteCommandRequest request;
TaskExecutor::CallbackHandle cbHandle;
@@ -113,11 +121,9 @@ private:
};
void _run();
- void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
- Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
- Future<RemoteCommandResponse> future,
- CommandState::ConnHandle conn,
- const transport::BatonHandle& baton);
+ void _onAcquireConn(std::shared_ptr<CommandState> state,
+ CommandState::ConnHandle conn,
+ const transport::BatonHandle& baton);
std::string _instanceName;
ServiceContext* _svcCtx;
@@ -137,7 +143,7 @@ private:
stdx::thread _ioThread;
stdx::mutex _inProgressMutex;
- stdx::unordered_map<TaskExecutor::CallbackHandle, std::shared_ptr<CommandState>> _inProgress;
+ stdx::unordered_map<TaskExecutor::CallbackHandle, std::weak_ptr<CommandState>> _inProgress;
stdx::unordered_set<std::shared_ptr<transport::ReactorTimer>> _inProgressAlarms;
stdx::condition_variable _workReadyCond;