summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-03-20 13:01:55 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-01 19:04:36 +0000
commit40a3ba610db44735061b13f0a1703de1bc0a6f37 (patch)
tree7d5bca0c193fb96c8159b052419d45856abcaee3 /src/mongo/transport
parent2a89894a26de04beadd39b5f0e0ab98b98d1d62a (diff)
downloadmongo-40a3ba610db44735061b13f0a1703de1bc0a6f37.tar.gz
SERVER-46821 Allow NetworkingBatons to send work to ASIOReactor after detach
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/baton.h2
-rw-r--r--src/mongo/transport/baton_asio_linux.h5
-rw-r--r--src/mongo/transport/session_asio.h43
3 files changed, 40 insertions, 10 deletions
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
index a4c4f1bb2b2..b65e8a539ca 100644
--- a/src/mongo/transport/baton.h
+++ b/src/mongo/transport/baton.h
@@ -91,6 +91,8 @@ public:
NetworkingBaton* networking() noexcept final {
return this;
}
+
+ virtual bool canWait() noexcept = 0;
};
} // namespace transport
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 5e8064c2805..1d89194ee98 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -176,6 +176,11 @@ public:
return std::move(pf.future);
}
+ bool canWait() noexcept override {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _opCtx;
+ }
+
bool cancelSession(Session& session) noexcept override {
const auto id = session.id();
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 17e6a7c0301..e4a40043c0d 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -189,11 +189,12 @@ public:
3,
"Cancelling outstanding I/O operations on connection to {remote}",
"remote"_attr = _remote);
- if (baton && baton->networking()) {
- baton->networking()->cancelSession(*this);
- } else {
- getSocket().cancel();
+ if (baton && baton->networking() && baton->networking()->cancelSession(*this)) {
+ // If we have a baton, it was for networking, and it owned our session, then we're done.
+ return;
}
+
+ getSocket().cancel();
}
void setTimeout(boost::optional<Milliseconds> timeout) override {
@@ -424,6 +425,7 @@ private:
template <typename MutableBufferSequence>
Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) {
+ // TODO SERVER-47229 Guard active ops for cancelation here.
#ifdef MONGO_CONFIG_SSL
if (_sslSocket) {
return opportunisticRead(*_sslSocket, buffers, baton);
@@ -449,6 +451,7 @@ private:
template <typename ConstBufferSequence>
Future<void> write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr) {
+ // TODO SERVER-47229 Guard active ops for cancelation here.
#ifdef MONGO_CONFIG_SSL
_ranHandshake = true;
if (_sslSocket) {
@@ -503,9 +506,19 @@ private:
asyncBuffers += size;
}
- if (baton && baton->networking()) {
- return baton->networking()
- ->addSession(*this, NetworkingBaton::Type::In)
+ if (auto networkingBaton = baton ? baton->networking() : nullptr;
+ networkingBaton && networkingBaton->canWait()) {
+ return networkingBaton->addSession(*this, NetworkingBaton::Type::In)
+ .onError([](Status error) {
+ if (ErrorCodes::isCancelationError(error)) {
+ // If the baton has detached, it will cancel its polling. We catch that
+ // error here and return Status::OK so that we invoke
+ // opportunisticRead() again and switch to asio::async_read() below.
+ return Status::OK();
+ }
+
+ return error;
+ })
.then([&stream, asyncBuffers, baton, this] {
return opportunisticRead(stream, asyncBuffers, baton);
});
@@ -591,9 +604,19 @@ private:
return std::move(*more);
}
- if (baton && baton->networking()) {
- return baton->networking()
- ->addSession(*this, NetworkingBaton::Type::Out)
+ if (auto networkingBaton = baton ? baton->networking() : nullptr;
+ networkingBaton && networkingBaton->canWait()) {
+ return networkingBaton->addSession(*this, NetworkingBaton::Type::Out)
+ .onError([](Status error) {
+ if (ErrorCodes::isCancelationError(error)) {
+ // If the baton has detached, it will cancel its polling. We catch that
+ // error here and return Status::OK so that we invoke
+ // opportunisticWrite() again and switch to asio::async_write() below.
+ return Status::OK();
+ }
+
+ return error;
+ })
.then([&stream, asyncBuffers, baton, this] {
return opportunisticWrite(stream, asyncBuffers, baton);
});