diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2020-12-15 22:36:13 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-05 06:30:02 +0000 |
commit | d1be5c5e96b7f9e00560340445d819522603d0b4 (patch) | |
tree | c39a856e305a35508ae986324d6944a195a6e888 /src | |
parent | 7378eaed9f0bd1d63f5e1339965e76fc1e2b4e6a (diff) | |
download | mongo-d1be5c5e96b7f9e00560340445d819522603d0b4.tar.gz |
SERVER-53420 Make transport Session source/sink/poll functions noexcept
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/transport/mock_session.h | 126 | ||||
-rw-r--r-- | src/mongo/transport/session.h | 13 | ||||
-rw-r--r-- | src/mongo/transport/session_asio.h | 33 |
3 files changed, 102 insertions, 70 deletions
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index b1ed38a9081..2b5eeefb35d 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -41,13 +41,67 @@ namespace mongo { namespace transport { -class MockSession : public Session { +class MockSessionBase : public Session { +public: + MockSessionBase() = default; + + explicit MockSessionBase(HostAndPort remote, + HostAndPort local, + SockAddr remoteAddr, + SockAddr localAddr) + : _remote(std::move(remote)), + _local(std::move(local)), + _remoteAddr(std::move(remoteAddr)), + _localAddr(std::move(localAddr)) {} + + const HostAndPort& remote() const override { + return _remote; + } + + const HostAndPort& local() const override { + return _local; + } + + const SockAddr& remoteAddr() const override { + return _remoteAddr; + } + + const SockAddr& localAddr() const override { + return _localAddr; + } + + void cancelAsyncOperations(const BatonHandle& handle = nullptr) override {} + + void setTimeout(boost::optional<Milliseconds>) override {} + + bool isConnected() override { + return true; + } + +#ifdef MONGO_CONFIG_SSL + const SSLConfiguration* getSSLConfiguration() const override { + return nullptr; + } + + const std::shared_ptr<SSLManagerInterface> getSSLManager() const override { + return nullptr; + } +#endif + +private: + const HostAndPort _remote; + const HostAndPort _local; + const SockAddr _remoteAddr; + const SockAddr _localAddr; +}; + +class MockSession : public MockSessionBase { MockSession(const MockSession&) = delete; MockSession& operator=(const MockSession&) = delete; public: static std::shared_ptr<MockSession> create(TransportLayer* tl) { - std::shared_ptr<MockSession> handle(new MockSession(tl)); + auto handle = std::make_shared<MockSession>(tl); return handle; } @@ -56,8 +110,8 @@ public: SockAddr remoteAddr, SockAddr localAddr, TransportLayer* tl) { - std::shared_ptr<MockSession> handle(new MockSession( - std::move(remote), std::move(local), std::move(remoteAddr), std::move(localAddr), tl)); + auto handle = std::make_shared<MockSession>( + std::move(remote), std::move(local), std::move(remoteAddr), std::move(localAddr), tl); return handle; } @@ -65,29 +119,13 @@ public: return _tl; } - const HostAndPort& remote() const override { - return _remote; - } - - const HostAndPort& local() const override { - return _local; - } - - const SockAddr& remoteAddr() const override { - return _remoteAddr; - } - - const SockAddr& localAddr() const override { - return _localAddr; - } - void end() override { if (!_tl || !_tl->owns(id())) return; _tl->_sessions[id()].ended = true; } - StatusWith<Message> sourceMessage() override { + StatusWith<Message> sourceMessage() noexcept override { if (!_tl || _tl->inShutdown()) { return TransportLayer::ShutdownStatus; } else if (!_tl->owns(id())) { @@ -99,15 +137,15 @@ public: return Message(); // Subclasses can do something different. } - Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) override { + Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) noexcept override { return Future<Message>::makeReady(sourceMessage()); } - Status waitForData() override { + Status waitForData() noexcept override { return asyncWaitForData().getNoThrow(); } - Future<void> asyncWaitForData() override { + Future<void> asyncWaitForData() noexcept override { auto fp = makePromiseFuture<void>(); stdx::lock_guard<Latch> lk(_waitForDataMutex); _waitForDataQueue.emplace_back(std::move(fp.promise)); @@ -123,7 +161,7 @@ public: promise.emplaceValue(); } - Status sinkMessage(Message message) override { + Status sinkMessage(Message message) noexcept override { if (!_tl || _tl->inShutdown()) { return TransportLayer::ShutdownStatus; } else if (!_tl->owns(id())) { @@ -135,48 +173,24 @@ public: return Status::OK(); } - Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) override { + Future<void> asyncSinkMessage(Message message, + const BatonHandle& handle = nullptr) noexcept override { return Future<void>::makeReady(sinkMessage(message)); } - void cancelAsyncOperations(const BatonHandle& handle = nullptr) override {} - - void setTimeout(boost::optional<Milliseconds>) override {} - - bool isConnected() override { - return true; - } - -#ifdef MONGO_CONFIG_SSL - const SSLConfiguration* getSSLConfiguration() const override { - return nullptr; - } - - const std::shared_ptr<SSLManagerInterface> getSSLManager() const override { - return nullptr; - } -#endif - explicit MockSession(TransportLayer* tl) - : _tl(checked_cast<TransportLayerMock*>(tl)), _remote(), _local() {} + : MockSessionBase(), _tl(checked_cast<TransportLayerMock*>(tl)) {} explicit MockSession(HostAndPort remote, HostAndPort local, SockAddr remoteAddr, SockAddr localAddr, TransportLayer* tl) - : _tl(checked_cast<TransportLayerMock*>(tl)), - _remote(std::move(remote)), - _local(std::move(local)), - _remoteAddr(std::move(remoteAddr)), - _localAddr(std::move(localAddr)) {} + : MockSessionBase( + std::move(remote), std::move(local), std::move(remoteAddr), std::move(localAddr)), + _tl(checked_cast<TransportLayerMock*>(tl)) {} protected: - TransportLayerMock* _tl; - - HostAndPort _remote; - HostAndPort _local; - SockAddr _remoteAddr; - SockAddr _localAddr; + TransportLayerMock* const _tl; mutable Mutex _waitForDataMutex = MONGO_MAKE_LATCH("MockSession::_waitForDataMutex"); std::list<Promise<void>> _waitForDataQueue; diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index 0ea3ff0db79..445c0fef78b 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -112,22 +112,23 @@ public: /** * Source (receive) a new Message from the remote host for this Session. */ - virtual StatusWith<Message> sourceMessage() = 0; - virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0; + virtual StatusWith<Message> sourceMessage() noexcept = 0; + virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) noexcept = 0; /** * Waits for the availability of incoming data. */ - virtual Status waitForData() = 0; - virtual Future<void> asyncWaitForData() = 0; + virtual Status waitForData() noexcept = 0; + virtual Future<void> asyncWaitForData() noexcept = 0; /** * Sink (send) a Message to the remote host for this Session. * * Async version will keep the buffer alive until the operation completes. */ - virtual Status sinkMessage(Message message) = 0; - virtual Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) = 0; + virtual Status sinkMessage(Message message) noexcept = 0; + virtual Future<void> asyncSinkMessage(Message message, + const BatonHandle& handle = nullptr) noexcept = 0; /** * Cancel any outstanding async operations. There is no way to cancel synchronous calls. diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 64516a97ac5..ef8ce3a6f28 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -170,29 +170,37 @@ public: } } - StatusWith<Message> sourceMessage() override { + StatusWith<Message> sourceMessage() noexcept override try { ensureSync(); return sourceMessageImpl().getNoThrow(); + } catch (const DBException& ex) { + return ex.toStatus(); } - Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) override { + Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) noexcept override try { ensureAsync(); return sourceMessageImpl(baton); + } catch (const DBException& ex) { + return ex.toStatus(); } - Status waitForData() override { + Status waitForData() noexcept override try { ensureSync(); asio::error_code ec; getSocket().wait(asio::ip::tcp::socket::wait_read, ec); return errorCodeToStatus(ec); + } catch (const DBException& ex) { + return ex.toStatus(); } - Future<void> asyncWaitForData() override { + Future<void> asyncWaitForData() noexcept override try { ensureAsync(); return getSocket().async_wait(asio::ip::tcp::socket::wait_read, UseFuture{}); + } catch (const DBException& ex) { + return ex.toStatus(); } - Status sinkMessage(Message message) override { + Status sinkMessage(Message message) noexcept override try { ensureSync(); return write(asio::buffer(message.buf(), message.size())) @@ -202,9 +210,12 @@ public: } }) .getNoThrow(); + } catch (const DBException& ex) { + return ex.toStatus(); } - Future<void> asyncSinkMessage(Message message, const BatonHandle& baton = nullptr) override { + Future<void> asyncSinkMessage(Message message, + const BatonHandle& baton = nullptr) noexcept override try { ensureAsync(); return write(asio::buffer(message.buf(), message.size()), baton) .then([this, message /*keep the buffer alive*/]() { @@ -212,6 +223,8 @@ public: networkCounter.hitPhysicalOut(message.size()); } }); + } catch (const DBException& ex) { + return ex.toStatus(); } void cancelAsyncOperations(const BatonHandle& baton = nullptr) override { @@ -345,10 +358,14 @@ protected: // which also means no timeout. auto timeout = _configuredTimeout.value_or(Milliseconds{0}); getSocket().set_option(ASIOSocketTimeoutOption<SO_SNDTIMEO>(timeout), ec); - uassertStatusOK(errorCodeToStatus(ec)); + if (auto status = errorCodeToStatus(ec); !status.isOK()) { + tasserted(5342000, status.reason()); + } getSocket().set_option(ASIOSocketTimeoutOption<SO_RCVTIMEO>(timeout), ec); - uassertStatusOK(errorCodeToStatus(ec)); + if (auto status = errorCodeToStatus(ec); !status.isOK()) { + tasserted(5342001, status.reason()); + } _socketTimeout = _configuredTimeout; } |