diff options
author | Henrik Edin <henrik.edin@mongodb.com> | 2017-07-27 12:58:10 -0400 |
---|---|---|
committer | Henrik Edin <henrik.edin@mongodb.com> | 2017-09-15 13:18:36 -0400 |
commit | 699f4020febee8b8bf0e3cbc056c01776b52a238 (patch) | |
tree | a720205a2b04abb081292564943c6e0d4f14a484 /src/mongo/transport | |
parent | f1bf0b33b4f1ce7bb50f208ef5e2d736ef5eba68 (diff) | |
download | mongo-699f4020febee8b8bf0e3cbc056c01776b52a238.tar.gz |
SERVER-30136 Move session stats tracking to the service entry point. At the same time, unify logging of connection accepted/refused/closed in this class too to make the transport layers cleaner.
Diffstat (limited to 'src/mongo/transport')
20 files changed, 150 insertions, 142 deletions
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index 834e76ab16b..986110eaacd 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -45,6 +45,29 @@ class ServiceEntryPoint { MONGO_DISALLOW_COPYING(ServiceEntryPoint); public: + /** + * Stats for sessions open. + */ + struct Stats { + /** + * Returns the number of sessions currently open. + */ + size_t numOpenSessions = 0; + + /** + * Returns the total number of sessions that have ever been created. + */ + size_t numCreatedSessions = 0; + + /** + * Returns the number of available sessions we could still open. Only relevant + * when we are operating under a transport::Session limit (for example, in the + * legacy implementation, we respect a maximum number of connections). If there + * is no session limit, returns std::numeric_limits<int>::max(). + */ + size_t numAvailableSessions = 0; + }; + virtual ~ServiceEntryPoint() = default; /** @@ -58,6 +81,16 @@ public: virtual void endAllSessions(transport::Session::TagMask tags) = 0; /** + * Returns high-level stats about current sessions. + */ + virtual Stats sessionStats() const = 0; + + /** + * Returns the number of sessions currently open. + */ + virtual size_t numOpenSessions() const = 0; + + /** * Processes a request and fills out a DbResponse. */ virtual DbResponse handleRequest(OperationContext* opCtx, const Message& request) = 0; diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp index f4ca1bd4c60..62f148f9ec1 100644 --- a/src/mongo/transport/service_entry_point_impl.cpp +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -42,7 +42,37 @@ #include "mongo/util/processinfo.h" #include "mongo/util/scopeguard.h" +#if !defined(_WIN32) +#include <sys/resource.h> +#endif + namespace mongo { +ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) { + + const auto supportedMax = [] { +#ifdef _WIN32 + return serverGlobalParams.maxConns; +#else + struct rlimit limit; + verify(getrlimit(RLIMIT_NOFILE, &limit) == 0); + + size_t max = (size_t)(limit.rlim_cur * .8); + + LOG(1) << "fd limit" + << " hard:" << limit.rlim_max << " soft:" << limit.rlim_cur << " max conn: " << max; + + return std::min(max, serverGlobalParams.maxConns); +#endif + }(); + + // If we asked for more connections than supported, inform the user. + if (supportedMax < serverGlobalParams.maxConns && + serverGlobalParams.maxConns != DEFAULT_MAX_CONN) { + log() << " --maxConns too high, can only handle " << supportedMax; + } + + _maxNumConnections = supportedMax; +} void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { // Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs @@ -56,15 +86,47 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { SSMListIterator ssmIt; const auto sync = (_svcCtx->getServiceExecutor() == nullptr); - auto ssm = ServiceStateMachine::create(_svcCtx, std::move(session), sync); + const bool quiet = serverGlobalParams.quiet.load(); + size_t connectionCount; + + auto ssm = ServiceStateMachine::create(_svcCtx, session, sync); { stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); - ssmIt = _sessions.emplace(_sessions.begin(), ssm); + connectionCount = _sessions.size() + 1; + if (connectionCount <= _maxNumConnections) { + ssmIt = _sessions.emplace(_sessions.begin(), ssm); + _currentConnections.store(connectionCount); + _createdConnections.addAndFetch(1); + } } - ssm->setCleanupHook([this, ssmIt] { - stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); - _sessions.erase(ssmIt); + // Checking if we successfully added a connection above. Separated from the lock so we don't log + // while holding it. + if (connectionCount > _maxNumConnections) { + if (!quiet) { + log() << "connection refused because too many open connections: " << connectionCount; + } + return; + } + + if (!quiet) { + const auto word = (connectionCount == 1 ? " connection"_sd : " connections"_sd); + log() << "connection accepted from " << session->remote() << " #" << session->id() << " (" + << connectionCount << word << " now open)"; + } + + ssm->setCleanupHook([ this, ssmIt, session = std::move(session) ] { + size_t connectionCount; + auto remote = session->remote(); + { + stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); + _sessions.erase(ssmIt); + connectionCount = _sessions.size(); + _currentConnections.store(connectionCount); + } + const auto word = (connectionCount == 1 ? " connection"_sd : " connections"_sd); + log() << "end connection " << remote << " (" << connectionCount << word << " now open)"; + }); if (!sync) { @@ -122,9 +184,15 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) { } } -std::size_t ServiceEntryPointImpl::getNumberOfConnections() const { - stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex); - return _sessions.size(); +ServiceEntryPoint::Stats ServiceEntryPointImpl::sessionStats() const { + + size_t sessionCount = _currentConnections.load(); + + ServiceEntryPoint::Stats ret; + ret.numOpenSessions = sessionCount; + ret.numCreatedSessions = _createdConnections.load(); + ret.numAvailableSessions = _maxNumConnections - sessionCount; + return ret; } } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h index fa1f92eb545..324846a1e94 100644 --- a/src/mongo/transport/service_entry_point_impl.h +++ b/src/mongo/transport/service_entry_point_impl.h @@ -54,13 +54,17 @@ class ServiceEntryPointImpl : public ServiceEntryPoint { MONGO_DISALLOW_COPYING(ServiceEntryPointImpl); public: - explicit ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {} + explicit ServiceEntryPointImpl(ServiceContext* svcCtx); void startSession(transport::SessionHandle session) final; void endAllSessions(transport::Session::TagMask tags) final; - std::size_t getNumberOfConnections() const; + Stats sessionStats() const final; + + size_t numOpenSessions() const final { + return _currentConnections.load(); + } private: using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>; @@ -71,6 +75,10 @@ private: mutable stdx::mutex _sessionsMutex; SSMList _sessions; + + size_t _maxNumConnections{DEFAULT_MAX_CONN}; + AtomicWord<size_t> _currentConnections{0}; + AtomicWord<size_t> _createdConnections{0}; }; } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp index 0f1fd2f07bd..eb70533ce27 100644 --- a/src/mongo/transport/service_entry_point_mock.cpp +++ b/src/mongo/transport/service_entry_point_mock.cpp @@ -110,4 +110,12 @@ void ServiceEntryPointMock::endAllSessions(transport::Session::TagMask) { } } +ServiceEntryPoint::Stats ServiceEntryPointMock::sessionStats() const { + return {}; +} + +size_t ServiceEntryPointMock::numOpenSessions() const { + return 0ULL; +} + } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h index 719454f16dd..769245f46ce 100644 --- a/src/mongo/transport/service_entry_point_mock.h +++ b/src/mongo/transport/service_entry_point_mock.h @@ -67,6 +67,10 @@ public: void endAllSessions(transport::Session::TagMask tags) override; + Stats sessionStats() const override; + + size_t numOpenSessions() const override; + DbResponse handleRequest(OperationContext* opCtx, const Message& request) override; private: diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index f1efcbf8580..d675cdf7bff 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -131,10 +131,6 @@ void ServiceEntryPointTestSuite::MockTLHarness::asyncWait(Ticket&& ticket, return _asyncWait(std::move(ticket), std::move(callback)); } -TransportLayer::Stats ServiceEntryPointTestSuite::MockTLHarness::sessionStats() { - return Stats(); -} - void ServiceEntryPointTestSuite::MockTLHarness::end(const SessionHandle& session) { return _end(session); } diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h index 7c31e38f5b0..96461584b83 100644 --- a/src/mongo/transport/service_entry_point_test_suite.h +++ b/src/mongo/transport/service_entry_point_test_suite.h @@ -110,7 +110,6 @@ public: Status wait(transport::Ticket&& ticket) override; void asyncWait(transport::Ticket&& ticket, TicketCallback callback) override; - Stats sessionStats() override; void end(const transport::SessionHandle& session) override; Status setup() override; Status start() override; diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 28eda505906..4146aef841a 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -463,24 +463,11 @@ ServiceStateMachine::State ServiceStateMachine::state() { void ServiceStateMachine::_cleanupSession(ThreadGuard& guard) { _state.store(State::Ended); - auto tl = _session()->getTransportLayer(); - auto remote = _session()->remote(); - _inMessage.reset(); // By ignoring the return value of Client::releaseCurrent() we destroy the session. // _dbClient is now nullptr and _dbClientPtr is invalid and should never be accessed. Client::releaseCurrent(); - - if (!serverGlobalParams.quiet.load()) { - // Get the number of open sessions minus 1 (this one will get cleaned up when - // this SSM gets destroyed) - // TODO Swich to using ServiceEntryPointImpl::getNumberOfConnections(), or move this - // into the ServiceEntryPoint - auto conns = tl->sessionStats().numOpenSessions - 1; - const char* word = (conns == 1 ? " connection" : " connections"); - log() << "end connection " << remote << " (" << conns << word << " now open)"; - } } } // namespace mongo diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index ea02cd98163..38707f53966 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -76,6 +76,14 @@ public: void endAllSessions(transport::Session::TagMask tags) override {} + Stats sessionStats() const override { + return {}; + } + + size_t numOpenSessions() const override { + return 0ULL; + } + void setUassertInHandler() { _uassertInHandler = true; } diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 41b9da16885..2a0a7427eab 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -75,10 +75,6 @@ public: } } - virtual ~ASIOSession() { - _tl->_currentConnections.subtractAndFetch(1); - } - TransportLayer* getTransportLayer() const override { return _tl; } diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index 47b0e379992..383e3bd0cd5 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -64,29 +64,6 @@ public: friend class Session; - /** - * Stats for sessions open in the Transport Layer. - */ - struct Stats { - /** - * Returns the number of sessions currently open in the transport layer. - */ - size_t numOpenSessions = 0; - - /** - * Returns the total number of sessions that have ever been created by this TransportLayer. - */ - size_t numCreatedSessions = 0; - - /** - * Returns the number of available sessions we could still open. Only relevant - * when we are operating under a transport::Session limit (for example, in the - * legacy implementation, we respect a maximum number of connections). If there - * is no session limit, returns std::numeric_limits<int>::max(). - */ - size_t numAvailableSessions = 0; - }; - virtual ~TransportLayer() = default; /** @@ -148,11 +125,6 @@ public: virtual void asyncWait(Ticket&& ticket, TicketCallback callback) = 0; /** - * Returns the number of sessions currently open in the transport layer. - */ - virtual Stats sessionStats() = 0; - - /** * End the given Session. Tickets for this Session that have already been * started via wait() or asyncWait() will complete, but may return a failed Status. * Future calls to wait() or asyncWait() for this Session will fail. If this diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 5fbba53dcbd..b1dd15f57d7 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -122,15 +122,6 @@ void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) { ownedASIOTicket = std::move(ownedASIOTicket) ](Status status) { callback(status); }); } -TransportLayer::Stats TransportLayerASIO::sessionStats() { - TransportLayer::Stats ret; - auto sessionCount = _currentConnections.load(); - ret.numOpenSessions = sessionCount; - ret.numCreatedSessions = _createdConnections.load(); - ret.numAvailableSessions = static_cast<size_t>(_listenerOptions.maxConns) - sessionCount; - return ret; -} - // Must not be called while holding the TransportLayerASIO mutex. void TransportLayerASIO::end(const SessionHandle& session) { auto asioSession = checked_pointer_cast<ASIOSession>(session); @@ -306,23 +297,8 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { return; } - size_t connCount = _currentConnections.addAndFetch(1); - if (connCount > _listenerOptions.maxConns) { - log() << "connection refused because too many open connections: " << connCount; - _currentConnections.subtractAndFetch(1); - _acceptConnection(acceptor); - return; - } - std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket))); - _createdConnections.addAndFetch(1); - if (!serverGlobalParams.quiet.load()) { - const auto word = (connCount == 1 ? " connection"_sd : " connections"_sd); - log() << "connection accepted from " << session->remote() << " #" << session->id() - << " (" << connCount << word << " now open)"; - } - _sep->startSession(std::move(session)); _acceptConnection(acceptor); }; diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 5df87057f0e..c29bc63e56d 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -102,8 +102,6 @@ public: void asyncWait(Ticket&& ticket, TicketCallback callback) final; - Stats sessionStats() final; - void end(const SessionHandle& session) final; Status setup() final; @@ -166,9 +164,6 @@ private: ServiceEntryPoint* const _sep = nullptr; AtomicWord<bool> _running{false}; Options _listenerOptions; - - AtomicWord<size_t> _createdConnections{0}; - AtomicWord<size_t> _currentConnections{0}; }; } // namespace transport diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp index 920baa5e5c4..5986d1f8fb4 100644 --- a/src/mongo/transport/transport_layer_legacy.cpp +++ b/src/mongo/transport/transport_layer_legacy.cpp @@ -153,15 +153,6 @@ Ticket TransportLayerLegacy::sourceMessage(const SessionHandle& session, stdx::make_unique<LegacyTicket>(std::move(legacySession), expiration, std::move(sourceCb))); } -TransportLayer::Stats TransportLayerLegacy::sessionStats() { - Stats stats; - stats.numOpenSessions = _currentConnections.load(); - stats.numAvailableSessions = Listener::globalTicketHolder.available(); - stats.numCreatedSessions = Listener::globalConnectionNumber.load(); - - return stats; -} - Ticket TransportLayerLegacy::sinkMessage(const SessionHandle& session, const Message& message, Date_t expiration) { @@ -207,7 +198,6 @@ void TransportLayerLegacy::_closeConnection(Connection* conn) { conn->closed = true; conn->amp->shutdown(); - Listener::globalTicketHolder.release(); } void TransportLayerLegacy::shutdown() { @@ -220,8 +210,6 @@ void TransportLayerLegacy::_destroy(LegacySession& session) { if (!session.conn()->closed) { _closeConnection(session.conn()); } - - _currentConnections.subtractAndFetch(1); } Status TransportLayerLegacy::_runTicket(Ticket ticket) { @@ -268,16 +256,8 @@ Status TransportLayerLegacy::_runTicket(Ticket ticket) { } void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp) { - if (!Listener::globalTicketHolder.tryAcquire()) { - log() << "connection refused because too many open connections: " - << Listener::globalTicketHolder.used(); - amp->shutdown(); - return; - } - amp->setLogLevel(logger::LogSeverity::Debug(1)); - _currentConnections.addAndFetch(1); auto session = LegacySession::create(std::move(amp), this); invariant(_sep); _sep->startSession(std::move(session)); diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h index 07a0d808066..796d47de1f4 100644 --- a/src/mongo/transport/transport_layer_legacy.h +++ b/src/mongo/transport/transport_layer_legacy.h @@ -83,8 +83,6 @@ public: Status wait(Ticket&& ticket) override; void asyncWait(Ticket&& ticket, TicketCallback callback) override; - Stats sessionStats() override; - void end(const SessionHandle& session) override; void shutdown() override; @@ -225,7 +223,6 @@ private: stdx::thread _listenerThread; AtomicWord<bool> _running; - AtomicWord<size_t> _currentConnections{0}; Options _options; }; diff --git a/src/mongo/transport/transport_layer_legacy_test.cpp b/src/mongo/transport/transport_layer_legacy_test.cpp index 39ec7236d3e..3341fa49e47 100644 --- a/src/mongo/transport/transport_layer_legacy_test.cpp +++ b/src/mongo/transport/transport_layer_legacy_test.cpp @@ -61,6 +61,14 @@ public: _sessions.clear(); } + Stats sessionStats() const override { + return {}; + } + + size_t numOpenSessions() const override { + return 0ULL; + } + transport::TransportLayerLegacy* tll = nullptr; std::list<transport::SessionHandle> _sessions; diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index c841cf7993c..0bcd5b3a76e 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -70,7 +70,7 @@ void TransportLayerManager::asyncWait(Ticket&& ticket, TicketCallback callback) } template <typename Callable> -void TransportLayerManager::_foreach(Callable&& cb) { +void TransportLayerManager::_foreach(Callable&& cb) const { { stdx::lock_guard<stdx::mutex> lk(_tlsMutex); for (auto&& tl : _tls) { @@ -79,25 +79,6 @@ void TransportLayerManager::_foreach(Callable&& cb) { } } -TransportLayer::Stats TransportLayerManager::sessionStats() { - Stats stats; - - _foreach([&](TransportLayer* tl) { - Stats s = tl->sessionStats(); - - stats.numOpenSessions += s.numOpenSessions; - stats.numCreatedSessions += s.numCreatedSessions; - if (std::numeric_limits<size_t>::max() - stats.numAvailableSessions < - s.numAvailableSessions) { - stats.numAvailableSessions = std::numeric_limits<size_t>::max(); - } else { - stats.numAvailableSessions += s.numAvailableSessions; - } - }); - - return stats; -} - void TransportLayerManager::end(const SessionHandle& session) { session->getTransportLayer()->end(session); } diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index 3c91e31719d..8349fc56f32 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -69,8 +69,6 @@ public: Status wait(Ticket&& ticket) override; void asyncWait(Ticket&& ticket, TicketCallback callback) override; - Stats sessionStats() override; - void end(const SessionHandle& session) override; Status start() override; @@ -95,9 +93,9 @@ public: private: template <typename Callable> - void _foreach(Callable&& cb); + void _foreach(Callable&& cb) const; - stdx::mutex _tlsMutex; + mutable stdx::mutex _tlsMutex; std::vector<std::unique_ptr<TransportLayer>> _tls; }; diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp index 9b51fda6f5c..44c43b2d236 100644 --- a/src/mongo/transport/transport_layer_mock.cpp +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -89,10 +89,6 @@ void TransportLayerMock::asyncWait(Ticket&& ticket, TicketCallback callback) { callback(wait(std::move(ticket))); } -TransportLayer::Stats TransportLayerMock::sessionStats() { - return Stats(); -} - SessionHandle TransportLayerMock::createSession() { auto session = MockSession::create(this); Session::Id sessionId = session->id(); diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h index 208a7d93c97..fd8670818ad 100644 --- a/src/mongo/transport/transport_layer_mock.h +++ b/src/mongo/transport/transport_layer_mock.h @@ -61,8 +61,6 @@ public: Status wait(Ticket&& ticket) override; void asyncWait(Ticket&& ticket, TicketCallback callback) override; - Stats sessionStats() override; - SessionHandle createSession(); SessionHandle get(Session::Id id); bool owns(Session::Id id); |