summary refs log tree commit diff
diff options
context:
space:
mode:
author Ben Caimano <ben.caimano@10gen.com> 2020-03-20 13:01:55 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-04-06 04:45:54 +0000
commit 22e37d4959198d64c9422e1433e7066cb454d216 (patch)
tree 820aaa8735ff77445efbcbf71f2e3c966d272276
parent a29674836e2732727383f3ecbd78f99e790ea1ae (diff)
download mongo-22e37d4959198d64c9422e1433e7066cb454d216.tar.gz
SERVER-46821 Allow NetworkingBatons to send work to ASIOReactor after detach
-rw-r--r-- src/mongo/executor/network_interface_tl.cpp 130
-rw-r--r-- src/mongo/executor/network_interface_tl.h 13
-rw-r--r-- src/mongo/transport/baton.h 2
-rw-r--r-- src/mongo/transport/baton_asio_linux.h 5
-rw-r--r-- src/mongo/transport/session_asio.h 43
5 files changed, 86 insertions, 107 deletions
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index a988ef76420..a33fb359835 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -277,7 +277,7 @@ auto NetworkInterfaceTL::CommandState::make(NetworkInterfaceTL* interface,
return std::pair(state, std::move(future));
}
-AsyncDBClient* NetworkInterfaceTL::RequestState::client() noexcept {
+AsyncDBClient* NetworkInterfaceTL::RequestState::getClient(const ConnectionHandle& conn) noexcept {
if (!conn) {
return nullptr;
}
@@ -321,9 +321,7 @@ void NetworkInterfaceTL::CommandStateBase::setTimer() {
}
void NetworkInterfaceTL::RequestState::returnConnection(Status status) noexcept {
- // Settle the connection object on the reactor
invariant(conn);
- invariant(interface()->_reactor->onReactorThread());
auto connToReturn = std::exchange(conn, {});
@@ -369,25 +367,10 @@ void NetworkInterfaceTL::CommandStateBase::tryFinish(Status status) noexcept {
}
void NetworkInterfaceTL::RequestState::cancel() noexcept {
- auto& reactor = interface()->_reactor;
-
- // If we failed, then get the client to finish up.
- // Note: CommandState::returnConnection() and CommandState::cancel() run on the reactor
- // thread only. One goes first and then the other, so there isn't a risk of canceling
- // the next command to run on the connection.
- if (reactor->onReactorThread()) {
- if (auto clientPtr = client()) {
- // If we have a client, cancel it
- clientPtr->cancel(cmdState->baton);
- }
- } else {
- ExecutorFuture<void>(reactor).getAsync([this, anchor = shared_from_this()](Status status) {
- invariant(status.isOK());
- if (auto clientPtr = client()) {
- // If we have a client, cancel it
- clientPtr->cancel(cmdState->baton);
- }
- });
+ auto connToCancel = weakConn.lock();
+ if (auto clientPtr = getClient(connToCancel)) {
+ // If we have a client, cancel it
+ clientPtr->cancel(cmdState->baton);
}
}
@@ -526,7 +509,8 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::CommandState::sendRequest(size
return makeReadyFutureWith([this, requestState] {
setTimer();
- return requestState->client()->runCommandRequest(*requestState->request, baton);
+ return RequestState::getClient(requestState->conn)
+ ->runCommandRequest(*requestState->request, baton);
})
.then([this, requestState](RemoteCommandResponse response) {
doMetadataHook(RemoteCommandOnAnyResponse(requestState->host, response));
@@ -748,6 +732,7 @@ void NetworkInterfaceTL::RequestState::send(StatusWith<ConnectionPool::Connectio
request.emplace(remoteCommandRequest);
host = request.get().target;
conn = std::move(swConn.getValue());
+ weakConn = conn;
networkInterfaceHangCommandsAfterAcquireConn.pauseWhileSet();
@@ -781,74 +766,38 @@ void NetworkInterfaceTL::RequestState::resolve(Future<RemoteCommandResponse> fut
return RemoteCommandOnAnyResponse(host, std::move(error), stopwatch.elapsed());
});
- if (baton) {
- // If we have a baton then use it for the promise and then switch to the reactor to return
- // our connection.
- std::move(anyFuture)
- .thenRunOn(baton)
- .onCompletion([ this, anchor = shared_from_this() ](auto swr) noexcept {
- auto response = uassertStatusOK(swr);
- auto status = swr.getValue().status;
- auto commandStatus = getStatusFromCommandResult(response.data);
-
- // Ignore maxTimeMS expiration errors for hedged reads.
- if (isHedge && commandStatus == ErrorCodes::MaxTimeMSExpired) {
- LOGV2_DEBUG(4660700,
- 2,
- "Hedged request returned status",
- "requestId"_attr = request->id,
- "target"_attr = request->target,
- "status"_attr = commandStatus);
- } else {
- if (cmdState->finishLine.arriveStrongly()) {
- if (isHedge) {
- auto hm = HedgingMetrics::get(cmdState->interface->_svcCtx);
- invariant(hm);
- hm->incrementNumAdvantageouslyHedgedOperations();
- }
- fulfilledPromise = true;
- cmdState->fulfillFinalPromise(std::move(response));
- }
- }
+ std::move(anyFuture) //
+ .thenRunOn(makeGuaranteedExecutor(baton, reactor)) // Switch to the baton/reactor.
+ .getAsync([ this, anchor = shared_from_this() ](auto swr) noexcept {
+ auto response = uassertStatusOK(swr);
+ auto status = response.status;
- return status;
- })
- .thenRunOn(reactor)
- .getAsync(
- [this, anchor = shared_from_this()](Status status) { returnConnection(status); });
- } else {
- // If we do not have a baton, then we can fulfill the promise and return our connection in
- // the same callback
- std::move(anyFuture).thenRunOn(reactor).getAsync(
- [ this, anchor = shared_from_this() ](auto swr) noexcept {
- auto response = uassertStatusOK(swr);
- auto status = response.status;
- auto commandStatus = getStatusFromCommandResult(response.data);
- ON_BLOCK_EXIT([&] { returnConnection(status); });
-
- if (!cmdState->finishLine.arriveStrongly()) {
- return;
- }
+ returnConnection(status);
- // Ignore maxTimeMS expiration errors for hedged reads
- if (isHedge && commandStatus == ErrorCodes::MaxTimeMSExpired) {
- LOGV2_DEBUG(4660701,
- 2,
- "Hedged request returned status",
- "requestId"_attr = request->id,
- "target"_attr = request->target,
- "status"_attr = commandStatus);
- } else {
- if (isHedge) {
- auto hm = HedgingMetrics::get(cmdState->interface->_svcCtx);
- invariant(hm);
- hm->incrementNumAdvantageouslyHedgedOperations();
- }
- fulfilledPromise = true;
- cmdState->fulfillFinalPromise(std::move(response));
+ auto commandStatus = getStatusFromCommandResult(response.data);
+
+ if (!cmdState->finishLine.arriveStrongly()) {
+ return;
+ }
+
+ // Ignore maxTimeMS expiration errors for hedged reads
+ if (isHedge && commandStatus == ErrorCodes::MaxTimeMSExpired) {
+ LOGV2_DEBUG(4660701,
+ 2,
+ "Hedged request returned status",
+ "requestId"_attr = request->id,
+ "target"_attr = request->target,
+ "status"_attr = commandStatus);
+ } else {
+ if (isHedge) {
+ auto hm = HedgingMetrics::get(cmdState->interface->_svcCtx);
+ invariant(hm);
+ hm->incrementNumAdvantageouslyHedgedOperations();
}
- });
- }
+ fulfilledPromise = true;
+ cmdState->fulfillFinalPromise(std::move(response));
+ }
+ });
}
NetworkInterfaceTL::ExhaustCommandState::ExhaustCommandState(
@@ -915,8 +864,9 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendReque
return makeReadyFutureWith(
[this, requestState, clientCallback = std::move(clientCallback)]() mutable {
setTimer();
- return requestState->client()->runExhaustCommandRequest(
- *requestState->request, std::move(clientCallback), baton);
+ return RequestState::getClient(requestState->conn)
+ ->runExhaustCommandRequest(
+ *requestState->request, std::move(clientCallback), baton);
})
.then([this, requestState] { return prevResponse; });
}
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 34bec18a411..d0c43e1277f 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -244,22 +244,20 @@ private:
};
struct RequestState final : public std::enable_shared_from_this<RequestState> {
+ using ConnectionHandle = std::shared_ptr<ConnectionPool::ConnectionHandle::element_type>;
+ using WeakConnectionHandle = std::weak_ptr<ConnectionPool::ConnectionHandle::element_type>;
RequestState(RequestManager* mgr, std::shared_ptr<CommandStateBase> cmdState_, size_t id)
: cmdState{std::move(cmdState_)}, requestManager(mgr), reqId(id) {}
~RequestState();
/**
- * Return the client object bound to the current command or nullptr if there isn't one.
- *
- * This is only useful on the networking thread (i.e. the reactor).
+ * Return the client for a given connection
*/
- AsyncDBClient* client() noexcept;
+ static AsyncDBClient* getClient(const ConnectionHandle& conn) noexcept;
/**
* Cancel the current client operation or do nothing if there is no client.
- *
- * This must be called from the networking thread (i.e. the reactor).
*/
void cancel() noexcept;
@@ -295,7 +293,8 @@ private:
boost::optional<RemoteCommandRequest> request;
HostAndPort host;
- ConnectionPool::ConnectionHandle conn;
+ ConnectionHandle conn;
+ WeakConnectionHandle weakConn;
// Internal id of this request as tracked by the RequestManager.
size_t reqId;
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
index a4c4f1bb2b2..b65e8a539ca 100644
--- a/src/mongo/transport/baton.h
+++ b/src/mongo/transport/baton.h
@@ -91,6 +91,8 @@ public:
NetworkingBaton* networking() noexcept final {
return this;
}
+
+ virtual bool canWait() noexcept = 0;
};
} // namespace transport
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 5e8064c2805..1d89194ee98 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -176,6 +176,11 @@ public:
return std::move(pf.future);
}
+ bool canWait() noexcept override {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _opCtx;
+ }
+
bool cancelSession(Session& session) noexcept override {
const auto id = session.id();
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 17e6a7c0301..e4a40043c0d 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -189,11 +189,12 @@ public:
3,
"Cancelling outstanding I/O operations on connection to {remote}",
"remote"_attr = _remote);
- if (baton && baton->networking()) {
- baton->networking()->cancelSession(*this);
- } else {
- getSocket().cancel();
+ if (baton && baton->networking() && baton->networking()->cancelSession(*this)) {
+ // If we have a baton, it was for networking, and it owned our session, then we're done.
+ return;
}
+
+ getSocket().cancel();
}
void setTimeout(boost::optional<Milliseconds> timeout) override {
@@ -424,6 +425,7 @@ private:
template <typename MutableBufferSequence>
Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) {
+ // TODO SERVER-47229 Guard active ops for cancelation here.
#ifdef MONGO_CONFIG_SSL
if (_sslSocket) {
return opportunisticRead(*_sslSocket, buffers, baton);
@@ -449,6 +451,7 @@ private:
template <typename ConstBufferSequence>
Future<void> write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr) {
+ // TODO SERVER-47229 Guard active ops for cancelation here.
#ifdef MONGO_CONFIG_SSL
_ranHandshake = true;
if (_sslSocket) {
@@ -503,9 +506,19 @@ private:
asyncBuffers += size;
}
- if (baton && baton->networking()) {
- return baton->networking()
- ->addSession(*this, NetworkingBaton::Type::In)
+ if (auto networkingBaton = baton ? baton->networking() : nullptr;
+ networkingBaton && networkingBaton->canWait()) {
+ return networkingBaton->addSession(*this, NetworkingBaton::Type::In)
+ .onError([](Status error) {
+ if (ErrorCodes::isCancelationError(error)) {
+ // If the baton has detached, it will cancel its polling. We catch that
+ // error here and return Status::OK so that we invoke
+ // opportunisticRead() again and switch to asio::async_read() below.
+ return Status::OK();
+ }
+
+ return error;
+ })
.then([&stream, asyncBuffers, baton, this] {
return opportunisticRead(stream, asyncBuffers, baton);
});
@@ -591,9 +604,19 @@ private:
return std::move(*more);
}
- if (baton && baton->networking()) {
- return baton->networking()
- ->addSession(*this, NetworkingBaton::Type::Out)
+ if (auto networkingBaton = baton ? baton->networking() : nullptr;
+ networkingBaton && networkingBaton->canWait()) {
+ return networkingBaton->addSession(*this, NetworkingBaton::Type::Out)
+ .onError([](Status error) {
+ if (ErrorCodes::isCancelationError(error)) {
+ // If the baton has detached, it will cancel its polling. We catch that
+ // error here and return Status::OK so that we invoke
+ // opportunisticWrite() again and switch to asio::async_write() below.
+ return Status::OK();
+ }
+
+ return error;
+ })
.then([&stream, asyncBuffers, baton, this] {
return opportunisticWrite(stream, asyncBuffers, baton);
});