diff options
author | Benety Goh <benety@mongodb.com> | 2014-12-31 18:25:38 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-01-06 13:46:05 -0500 |
commit | 9804789bbba304cb0649d7874ef0bb390f536b46 (patch) | |
tree | 131534e02a8730c11db39c33fb41625776f96bcb | |
parent | 9080534036974a91067cef0ace3aff60c65a123b (diff) | |
download | mongo-9804789bbba304cb0649d7874ef0bb390f536b46.tar.gz |
SERVER-16705 fixed leak in PortMessageServer::acceptedMP
This leak occurs when the server is shutdown before the connection handling thread starts.
-rw-r--r-- | src/mongo/util/net/message_server_port.cpp | 130 |
1 files changed, 57 insertions, 73 deletions
diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp index 4fc0f96016a..a77a139abc3 100644 --- a/src/mongo/util/net/message_server_port.cpp +++ b/src/mongo/util/net/message_server_port.cpp @@ -29,17 +29,13 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork -#define MONGO_PCH_WHITELISTED #include "mongo/platform/basic.h" -#include "mongo/pch.h" -#undef MONGO_PCH_WHITELISTED #include <boost/scoped_ptr.hpp> #include <boost/thread/thread.hpp> +#include <memory> -#ifndef USE_ASIO - - +#include "mongo/base/disallow_copying.h" #include "mongo/db/lasterror.h" #include "mongo/db/server_options.h" #include "mongo/db/stats/counters.h" @@ -48,11 +44,13 @@ #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" #include "mongo/util/net/listen.h" #include "mongo/util/net/message.h" #include "mongo/util/net/message_port.h" #include "mongo/util/net/message_server.h" #include "mongo/util/net/ssl_manager.h" +#include "mongo/util/scopeguard.h" #ifdef __linux__ // TODO: consider making this ifndef _WIN32 # include <sys/resource.h> @@ -66,6 +64,28 @@ namespace mongo { using boost::scoped_ptr; +namespace { + + class MessagingPortWithHandler : public MessagingPort { + MONGO_DISALLOW_COPYING(MessagingPortWithHandler); + + public: + MessagingPortWithHandler(const boost::shared_ptr<Socket>& socket, + MessageHandler* handler, + long long connectionId) + : MessagingPort(socket), _handler(handler) { + setConnectionId(connectionId); + } + + MessageHandler* const getHandler() const { return _handler; } + + private: + // Not owned. + MessageHandler* const _handler; + }; + +} // namespace + class PortMessageServer : public MessageServer , public Listener { public: /** @@ -79,24 +99,20 @@ namespace mongo { Listener( "" , opts.ipList, opts.port ), _handler(handler) { } - virtual void acceptedMP(MessagingPort * p) { + virtual void accepted(boost::shared_ptr<Socket> psocket, long long connectionId ) { + ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2); + std::auto_ptr<MessagingPortWithHandler> portWithHandler( + new MessagingPortWithHandler(psocket, _handler, connectionId)); if ( ! Listener::globalTicketHolder.tryAcquire() ) { log() << "connection refused because too many open connections: " << Listener::globalTicketHolder.used() << endl; - - // TODO: would be nice if we notified them... - p->shutdown(); - delete p; - - sleepmillis(2); // otherwise we'll hard loop return; } try { #ifndef __linux__ // TODO: consider making this ifdef _WIN32 { - HandleIncomingMsgParam* himParam = new HandleIncomingMsgParam(p, _handler); - boost::thread thr(stdx::bind(&handleIncomingMsg, himParam)); + boost::thread thr(stdx::bind(&handleIncomingMsg, portWithHandler.get())); } #else pthread_attr_t attrs; @@ -120,8 +136,8 @@ namespace mongo { pthread_t thread; - HandleIncomingMsgParam* himParam = new HandleIncomingMsgParam(p, _handler); - int failed = pthread_create(&thread, &attrs, &handleIncomingMsg, himParam); + int failed = + pthread_create(&thread, &attrs, &handleIncomingMsg, portWithHandler.get()); pthread_attr_destroy(&attrs); @@ -129,27 +145,19 @@ namespace mongo { log() << "pthread_create failed: " << errnoWithDescription(failed) << endl; throw boost::thread_resource_error(); // for consistency with boost::thread } -#endif +#endif // __linux__ + + portWithHandler.release(); + sleepAfterClosingPort.Dismiss(); } catch ( boost::thread_resource_error& ) { Listener::globalTicketHolder.release(); log() << "can't create new thread, closing connection" << endl; - - p->shutdown(); - delete p; - - sleepmillis(2); } catch ( ... ) { Listener::globalTicketHolder.release(); log() << "unknown error accepting new socket" << endl; - - p->shutdown(); - delete p; - - sleepmillis(2); } - } virtual void setAsTimeTracker() { @@ -170,19 +178,6 @@ namespace mongo { MessageHandler* _handler; /** - * Simple holder for threadRun parameters. Should not destroy the objects it holds - - * it is the responsibility of the caller to take care of them. - */ - struct HandleIncomingMsgParam { - HandleIncomingMsgParam(MessagingPort* inPort, MessageHandler* handler): - inPort(inPort), handler(handler) { - } - - MessagingPort* inPort; - MessageHandler* handler; - }; - - /** * Handles incoming messages from a given socket. * * Terminating conditions: @@ -197,61 +192,52 @@ namespace mongo { static void* handleIncomingMsg(void* arg) { TicketHolderReleaser connTicketReleaser( &Listener::globalTicketHolder ); - scoped_ptr<HandleIncomingMsgParam> himArg(static_cast<HandleIncomingMsgParam*>(arg)); - MessagingPort* inPort = himArg->inPort; - MessageHandler* handler = himArg->handler; - - { - string threadName = "conn"; - if ( inPort->connectionId() > 0 ) - threadName = str::stream() << threadName << inPort->connectionId(); - setThreadName( threadName.c_str() ); - } - - verify( inPort ); - inPort->psock->setLogLevel(logger::LogSeverity::Debug(1)); - scoped_ptr<MessagingPort> p( inPort ); + invariant(arg); + scoped_ptr<MessagingPortWithHandler> portWithHandler( + static_cast<MessagingPortWithHandler*>(arg)); + MessageHandler* const handler = portWithHandler->getHandler(); - string otherSide; + setThreadName(std::string(str::stream() << "conn" << portWithHandler->connectionId())); + portWithHandler->psock->setLogLevel(logger::LogSeverity::Debug(1)); Message m; try { LastError * le = new LastError(); lastError.reset( le ); // lastError now has ownership - otherSide = p->psock->remoteString(); - - handler->connected( p.get() ); + handler->connected(portWithHandler.get()); while ( ! inShutdown() ) { m.reset(); - p->psock->clearCounters(); + portWithHandler->psock->clearCounters(); - if ( ! p->recv(m) ) { + if (!portWithHandler->recv(m)) { if (!serverGlobalParams.quiet) { int conns = Listener::globalTicketHolder.used()-1; const char* word = (conns == 1 ? " connection" : " connections"); - log() << "end connection " << otherSide << " (" << conns << word << " now open)" << endl; + log() << "end connection " << portWithHandler->psock->remoteString() + << " (" << conns << word << " now open)" << endl; } - p->shutdown(); + portWithHandler->shutdown(); break; } - handler->process( m , p.get() , le ); - networkCounter.hit( p->psock->getBytesIn() , p->psock->getBytesOut() ); + handler->process(m, portWithHandler.get(), le); + networkCounter.hit(portWithHandler->psock->getBytesIn(), + portWithHandler->psock->getBytesOut()); } } catch ( AssertionException& e ) { log() << "AssertionException handling request, closing client connection: " << e << endl; - p->shutdown(); + portWithHandler->shutdown(); } catch ( SocketException& e ) { log() << "SocketException handling request, closing client connection: " << e << endl; - p->shutdown(); + portWithHandler->shutdown(); } catch ( const DBException& e ) { // must be right above std::exception to avoid catching subclasses log() << "DBException handling request, closing client connection: " << e << endl; - p->shutdown(); + portWithHandler->shutdown(); } catch ( std::exception &e ) { error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl; @@ -264,7 +250,7 @@ namespace mongo { if (manager) manager->cleanupThreadLocals(); #endif - handler->disconnected( p.get() ); + handler->disconnected(portWithHandler.get()); return NULL; } @@ -275,6 +261,4 @@ namespace mongo { return new PortMessageServer( opts , handler ); } -} - -#endif +} // namespace mongo |