summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2020-12-15 22:36:13 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-05 06:30:02 +0000
commitd1be5c5e96b7f9e00560340445d819522603d0b4 (patch)
treec39a856e305a35508ae986324d6944a195a6e888 /src
parent7378eaed9f0bd1d63f5e1339965e76fc1e2b4e6a (diff)
downloadmongo-d1be5c5e96b7f9e00560340445d819522603d0b4.tar.gz
SERVER-53420 Make transport Session source/sink/poll functions noexcept
Diffstat (limited to 'src')
-rw-r--r--src/mongo/transport/mock_session.h126
-rw-r--r--src/mongo/transport/session.h13
-rw-r--r--src/mongo/transport/session_asio.h33
3 files changed, 102 insertions, 70 deletions
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index b1ed38a9081..2b5eeefb35d 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -41,13 +41,67 @@
namespace mongo {
namespace transport {
-class MockSession : public Session {
+class MockSessionBase : public Session {
+public:
+ MockSessionBase() = default;
+
+ explicit MockSessionBase(HostAndPort remote,
+ HostAndPort local,
+ SockAddr remoteAddr,
+ SockAddr localAddr)
+ : _remote(std::move(remote)),
+ _local(std::move(local)),
+ _remoteAddr(std::move(remoteAddr)),
+ _localAddr(std::move(localAddr)) {}
+
+ const HostAndPort& remote() const override {
+ return _remote;
+ }
+
+ const HostAndPort& local() const override {
+ return _local;
+ }
+
+ const SockAddr& remoteAddr() const override {
+ return _remoteAddr;
+ }
+
+ const SockAddr& localAddr() const override {
+ return _localAddr;
+ }
+
+ void cancelAsyncOperations(const BatonHandle& handle = nullptr) override {}
+
+ void setTimeout(boost::optional<Milliseconds>) override {}
+
+ bool isConnected() override {
+ return true;
+ }
+
+#ifdef MONGO_CONFIG_SSL
+ const SSLConfiguration* getSSLConfiguration() const override {
+ return nullptr;
+ }
+
+ const std::shared_ptr<SSLManagerInterface> getSSLManager() const override {
+ return nullptr;
+ }
+#endif
+
+private:
+ const HostAndPort _remote;
+ const HostAndPort _local;
+ const SockAddr _remoteAddr;
+ const SockAddr _localAddr;
+};
+
+class MockSession : public MockSessionBase {
MockSession(const MockSession&) = delete;
MockSession& operator=(const MockSession&) = delete;
public:
static std::shared_ptr<MockSession> create(TransportLayer* tl) {
- std::shared_ptr<MockSession> handle(new MockSession(tl));
+ auto handle = std::make_shared<MockSession>(tl);
return handle;
}
@@ -56,8 +110,8 @@ public:
SockAddr remoteAddr,
SockAddr localAddr,
TransportLayer* tl) {
- std::shared_ptr<MockSession> handle(new MockSession(
- std::move(remote), std::move(local), std::move(remoteAddr), std::move(localAddr), tl));
+ auto handle = std::make_shared<MockSession>(
+ std::move(remote), std::move(local), std::move(remoteAddr), std::move(localAddr), tl);
return handle;
}
@@ -65,29 +119,13 @@ public:
return _tl;
}
- const HostAndPort& remote() const override {
- return _remote;
- }
-
- const HostAndPort& local() const override {
- return _local;
- }
-
- const SockAddr& remoteAddr() const override {
- return _remoteAddr;
- }
-
- const SockAddr& localAddr() const override {
- return _localAddr;
- }
-
void end() override {
if (!_tl || !_tl->owns(id()))
return;
_tl->_sessions[id()].ended = true;
}
- StatusWith<Message> sourceMessage() override {
+ StatusWith<Message> sourceMessage() noexcept override {
if (!_tl || _tl->inShutdown()) {
return TransportLayer::ShutdownStatus;
} else if (!_tl->owns(id())) {
@@ -99,15 +137,15 @@ public:
return Message(); // Subclasses can do something different.
}
- Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) override {
+ Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) noexcept override {
return Future<Message>::makeReady(sourceMessage());
}
- Status waitForData() override {
+ Status waitForData() noexcept override {
return asyncWaitForData().getNoThrow();
}
- Future<void> asyncWaitForData() override {
+ Future<void> asyncWaitForData() noexcept override {
auto fp = makePromiseFuture<void>();
stdx::lock_guard<Latch> lk(_waitForDataMutex);
_waitForDataQueue.emplace_back(std::move(fp.promise));
@@ -123,7 +161,7 @@ public:
promise.emplaceValue();
}
- Status sinkMessage(Message message) override {
+ Status sinkMessage(Message message) noexcept override {
if (!_tl || _tl->inShutdown()) {
return TransportLayer::ShutdownStatus;
} else if (!_tl->owns(id())) {
@@ -135,48 +173,24 @@ public:
return Status::OK();
}
- Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) override {
+ Future<void> asyncSinkMessage(Message message,
+ const BatonHandle& handle = nullptr) noexcept override {
return Future<void>::makeReady(sinkMessage(message));
}
- void cancelAsyncOperations(const BatonHandle& handle = nullptr) override {}
-
- void setTimeout(boost::optional<Milliseconds>) override {}
-
- bool isConnected() override {
- return true;
- }
-
-#ifdef MONGO_CONFIG_SSL
- const SSLConfiguration* getSSLConfiguration() const override {
- return nullptr;
- }
-
- const std::shared_ptr<SSLManagerInterface> getSSLManager() const override {
- return nullptr;
- }
-#endif
-
explicit MockSession(TransportLayer* tl)
- : _tl(checked_cast<TransportLayerMock*>(tl)), _remote(), _local() {}
+ : MockSessionBase(), _tl(checked_cast<TransportLayerMock*>(tl)) {}
explicit MockSession(HostAndPort remote,
HostAndPort local,
SockAddr remoteAddr,
SockAddr localAddr,
TransportLayer* tl)
- : _tl(checked_cast<TransportLayerMock*>(tl)),
- _remote(std::move(remote)),
- _local(std::move(local)),
- _remoteAddr(std::move(remoteAddr)),
- _localAddr(std::move(localAddr)) {}
+ : MockSessionBase(
+ std::move(remote), std::move(local), std::move(remoteAddr), std::move(localAddr)),
+ _tl(checked_cast<TransportLayerMock*>(tl)) {}
protected:
- TransportLayerMock* _tl;
-
- HostAndPort _remote;
- HostAndPort _local;
- SockAddr _remoteAddr;
- SockAddr _localAddr;
+ TransportLayerMock* const _tl;
mutable Mutex _waitForDataMutex = MONGO_MAKE_LATCH("MockSession::_waitForDataMutex");
std::list<Promise<void>> _waitForDataQueue;
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index 0ea3ff0db79..445c0fef78b 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -112,22 +112,23 @@ public:
/**
* Source (receive) a new Message from the remote host for this Session.
*/
- virtual StatusWith<Message> sourceMessage() = 0;
- virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) = 0;
+ virtual StatusWith<Message> sourceMessage() noexcept = 0;
+ virtual Future<Message> asyncSourceMessage(const BatonHandle& handle = nullptr) noexcept = 0;
/**
* Waits for the availability of incoming data.
*/
- virtual Status waitForData() = 0;
- virtual Future<void> asyncWaitForData() = 0;
+ virtual Status waitForData() noexcept = 0;
+ virtual Future<void> asyncWaitForData() noexcept = 0;
/**
* Sink (send) a Message to the remote host for this Session.
*
* Async version will keep the buffer alive until the operation completes.
*/
- virtual Status sinkMessage(Message message) = 0;
- virtual Future<void> asyncSinkMessage(Message message, const BatonHandle& handle = nullptr) = 0;
+ virtual Status sinkMessage(Message message) noexcept = 0;
+ virtual Future<void> asyncSinkMessage(Message message,
+ const BatonHandle& handle = nullptr) noexcept = 0;
/**
* Cancel any outstanding async operations. There is no way to cancel synchronous calls.
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 64516a97ac5..ef8ce3a6f28 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -170,29 +170,37 @@ public:
}
}
- StatusWith<Message> sourceMessage() override {
+ StatusWith<Message> sourceMessage() noexcept override try {
ensureSync();
return sourceMessageImpl().getNoThrow();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) override {
+ Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) noexcept override try {
ensureAsync();
return sourceMessageImpl(baton);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- Status waitForData() override {
+ Status waitForData() noexcept override try {
ensureSync();
asio::error_code ec;
getSocket().wait(asio::ip::tcp::socket::wait_read, ec);
return errorCodeToStatus(ec);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- Future<void> asyncWaitForData() override {
+ Future<void> asyncWaitForData() noexcept override try {
ensureAsync();
return getSocket().async_wait(asio::ip::tcp::socket::wait_read, UseFuture{});
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- Status sinkMessage(Message message) override {
+ Status sinkMessage(Message message) noexcept override try {
ensureSync();
return write(asio::buffer(message.buf(), message.size()))
@@ -202,9 +210,12 @@ public:
}
})
.getNoThrow();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- Future<void> asyncSinkMessage(Message message, const BatonHandle& baton = nullptr) override {
+ Future<void> asyncSinkMessage(Message message,
+ const BatonHandle& baton = nullptr) noexcept override try {
ensureAsync();
return write(asio::buffer(message.buf(), message.size()), baton)
.then([this, message /*keep the buffer alive*/]() {
@@ -212,6 +223,8 @@ public:
networkCounter.hitPhysicalOut(message.size());
}
});
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
void cancelAsyncOperations(const BatonHandle& baton = nullptr) override {
@@ -345,10 +358,14 @@ protected:
// which also means no timeout.
auto timeout = _configuredTimeout.value_or(Milliseconds{0});
getSocket().set_option(ASIOSocketTimeoutOption<SO_SNDTIMEO>(timeout), ec);
- uassertStatusOK(errorCodeToStatus(ec));
+ if (auto status = errorCodeToStatus(ec); !status.isOK()) {
+ tasserted(5342000, status.reason());
+ }
getSocket().set_option(ASIOSocketTimeoutOption<SO_RCVTIMEO>(timeout), ec);
- uassertStatusOK(errorCodeToStatus(ec));
+ if (auto status = errorCodeToStatus(ec); !status.isOK()) {
+ tasserted(5342001, status.reason());
+ }
_socketTimeout = _configuredTimeout;
}