diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-03-20 13:01:55 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-01 19:04:36 +0000 |
commit | 40a3ba610db44735061b13f0a1703de1bc0a6f37 (patch) | |
tree | 7d5bca0c193fb96c8159b052419d45856abcaee3 /src/mongo/transport | |
parent | 2a89894a26de04beadd39b5f0e0ab98b98d1d62a (diff) | |
download | mongo-40a3ba610db44735061b13f0a1703de1bc0a6f37.tar.gz |
SERVER-46821 Allow NetworkingBatons to send work to ASIOReactor after detach
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/baton.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/baton_asio_linux.h | 5 | ||||
-rw-r--r-- | src/mongo/transport/session_asio.h | 43 |
3 files changed, 40 insertions, 10 deletions
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h index a4c4f1bb2b2..b65e8a539ca 100644 --- a/src/mongo/transport/baton.h +++ b/src/mongo/transport/baton.h @@ -91,6 +91,8 @@ public: NetworkingBaton* networking() noexcept final { return this; } + + virtual bool canWait() noexcept = 0; }; } // namespace transport diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index 5e8064c2805..1d89194ee98 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -176,6 +176,11 @@ public: return std::move(pf.future); } + bool canWait() noexcept override { + stdx::lock_guard<Latch> lk(_mutex); + return _opCtx; + } + bool cancelSession(Session& session) noexcept override { const auto id = session.id(); diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 17e6a7c0301..e4a40043c0d 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -189,11 +189,12 @@ public: 3, "Cancelling outstanding I/O operations on connection to {remote}", "remote"_attr = _remote); - if (baton && baton->networking()) { - baton->networking()->cancelSession(*this); - } else { - getSocket().cancel(); + if (baton && baton->networking() && baton->networking()->cancelSession(*this)) { + // If we have a baton, it was for networking, and it owned our session, then we're done. + return; } + + getSocket().cancel(); } void setTimeout(boost::optional<Milliseconds> timeout) override { @@ -424,6 +425,7 @@ private: template <typename MutableBufferSequence> Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) { + // TODO SERVER-47229 Guard active ops for cancelation here. #ifdef MONGO_CONFIG_SSL if (_sslSocket) { return opportunisticRead(*_sslSocket, buffers, baton); @@ -449,6 +451,7 @@ private: template <typename ConstBufferSequence> Future<void> write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr) { + // TODO SERVER-47229 Guard active ops for cancelation here. #ifdef MONGO_CONFIG_SSL _ranHandshake = true; if (_sslSocket) { @@ -503,9 +506,19 @@ private: asyncBuffers += size; } - if (baton && baton->networking()) { - return baton->networking() - ->addSession(*this, NetworkingBaton::Type::In) + if (auto networkingBaton = baton ? baton->networking() : nullptr; + networkingBaton && networkingBaton->canWait()) { + return networkingBaton->addSession(*this, NetworkingBaton::Type::In) + .onError([](Status error) { + if (ErrorCodes::isCancelationError(error)) { + // If the baton has detached, it will cancel its polling. We catch that + // error here and return Status::OK so that we invoke + // opportunisticRead() again and switch to asio::async_read() below. + return Status::OK(); + } + + return error; + }) .then([&stream, asyncBuffers, baton, this] { return opportunisticRead(stream, asyncBuffers, baton); }); @@ -591,9 +604,19 @@ private: return std::move(*more); } - if (baton && baton->networking()) { - return baton->networking() - ->addSession(*this, NetworkingBaton::Type::Out) + if (auto networkingBaton = baton ? baton->networking() : nullptr; + networkingBaton && networkingBaton->canWait()) { + return networkingBaton->addSession(*this, NetworkingBaton::Type::Out) + .onError([](Status error) { + if (ErrorCodes::isCancelationError(error)) { + // If the baton has detached, it will cancel its polling. We catch that + // error here and return Status::OK so that we invoke + // opportunisticWrite() again and switch to asio::async_write() below. + return Status::OK(); + } + + return error; + }) .then([&stream, asyncBuffers, baton, this] { return opportunisticWrite(stream, asyncBuffers, baton); }); |