summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2014-12-31 18:25:38 -0500
committerBenety Goh <benety@mongodb.com>2015-01-06 13:46:05 -0500
commit9804789bbba304cb0649d7874ef0bb390f536b46 (patch)
tree131534e02a8730c11db39c33fb41625776f96bcb
parent9080534036974a91067cef0ace3aff60c65a123b (diff)
downloadmongo-9804789bbba304cb0649d7874ef0bb390f536b46.tar.gz
SERVER-16705 fixed leak in PortMessageServer::acceptedMP
This leak occurs when the server is shutdown before the connection handling thread starts.
-rw-r--r--src/mongo/util/net/message_server_port.cpp130
1 files changed, 57 insertions, 73 deletions
diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp
index 4fc0f96016a..a77a139abc3 100644
--- a/src/mongo/util/net/message_server_port.cpp
+++ b/src/mongo/util/net/message_server_port.cpp
@@ -29,17 +29,13 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-#define MONGO_PCH_WHITELISTED
#include "mongo/platform/basic.h"
-#include "mongo/pch.h"
-#undef MONGO_PCH_WHITELISTED
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
+#include <memory>
-#ifndef USE_ASIO
-
-
+#include "mongo/base/disallow_copying.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/server_options.h"
#include "mongo/db/stats/counters.h"
@@ -48,11 +44,13 @@
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/listen.h"
#include "mongo/util/net/message.h"
#include "mongo/util/net/message_port.h"
#include "mongo/util/net/message_server.h"
#include "mongo/util/net/ssl_manager.h"
+#include "mongo/util/scopeguard.h"
#ifdef __linux__ // TODO: consider making this ifndef _WIN32
# include <sys/resource.h>
@@ -66,6 +64,28 @@ namespace mongo {
using boost::scoped_ptr;
+namespace {
+
+ class MessagingPortWithHandler : public MessagingPort {
+ MONGO_DISALLOW_COPYING(MessagingPortWithHandler);
+
+ public:
+ MessagingPortWithHandler(const boost::shared_ptr<Socket>& socket,
+ MessageHandler* handler,
+ long long connectionId)
+ : MessagingPort(socket), _handler(handler) {
+ setConnectionId(connectionId);
+ }
+
+ MessageHandler* const getHandler() const { return _handler; }
+
+ private:
+ // Not owned.
+ MessageHandler* const _handler;
+ };
+
+} // namespace
+
class PortMessageServer : public MessageServer , public Listener {
public:
/**
@@ -79,24 +99,20 @@ namespace mongo {
Listener( "" , opts.ipList, opts.port ), _handler(handler) {
}
- virtual void acceptedMP(MessagingPort * p) {
+ virtual void accepted(boost::shared_ptr<Socket> psocket, long long connectionId ) {
+ ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2);
+ std::auto_ptr<MessagingPortWithHandler> portWithHandler(
+ new MessagingPortWithHandler(psocket, _handler, connectionId));
if ( ! Listener::globalTicketHolder.tryAcquire() ) {
log() << "connection refused because too many open connections: " << Listener::globalTicketHolder.used() << endl;
-
- // TODO: would be nice if we notified them...
- p->shutdown();
- delete p;
-
- sleepmillis(2); // otherwise we'll hard loop
return;
}
try {
#ifndef __linux__ // TODO: consider making this ifdef _WIN32
{
- HandleIncomingMsgParam* himParam = new HandleIncomingMsgParam(p, _handler);
- boost::thread thr(stdx::bind(&handleIncomingMsg, himParam));
+ boost::thread thr(stdx::bind(&handleIncomingMsg, portWithHandler.get()));
}
#else
pthread_attr_t attrs;
@@ -120,8 +136,8 @@ namespace mongo {
pthread_t thread;
- HandleIncomingMsgParam* himParam = new HandleIncomingMsgParam(p, _handler);
- int failed = pthread_create(&thread, &attrs, &handleIncomingMsg, himParam);
+ int failed =
+ pthread_create(&thread, &attrs, &handleIncomingMsg, portWithHandler.get());
pthread_attr_destroy(&attrs);
@@ -129,27 +145,19 @@ namespace mongo {
log() << "pthread_create failed: " << errnoWithDescription(failed) << endl;
throw boost::thread_resource_error(); // for consistency with boost::thread
}
-#endif
+#endif // __linux__
+
+ portWithHandler.release();
+ sleepAfterClosingPort.Dismiss();
}
catch ( boost::thread_resource_error& ) {
Listener::globalTicketHolder.release();
log() << "can't create new thread, closing connection" << endl;
-
- p->shutdown();
- delete p;
-
- sleepmillis(2);
}
catch ( ... ) {
Listener::globalTicketHolder.release();
log() << "unknown error accepting new socket" << endl;
-
- p->shutdown();
- delete p;
-
- sleepmillis(2);
}
-
}
virtual void setAsTimeTracker() {
@@ -170,19 +178,6 @@ namespace mongo {
MessageHandler* _handler;
/**
- * Simple holder for threadRun parameters. Should not destroy the objects it holds -
- * it is the responsibility of the caller to take care of them.
- */
- struct HandleIncomingMsgParam {
- HandleIncomingMsgParam(MessagingPort* inPort, MessageHandler* handler):
- inPort(inPort), handler(handler) {
- }
-
- MessagingPort* inPort;
- MessageHandler* handler;
- };
-
- /**
* Handles incoming messages from a given socket.
*
* Terminating conditions:
@@ -197,61 +192,52 @@ namespace mongo {
static void* handleIncomingMsg(void* arg) {
TicketHolderReleaser connTicketReleaser( &Listener::globalTicketHolder );
- scoped_ptr<HandleIncomingMsgParam> himArg(static_cast<HandleIncomingMsgParam*>(arg));
- MessagingPort* inPort = himArg->inPort;
- MessageHandler* handler = himArg->handler;
-
- {
- string threadName = "conn";
- if ( inPort->connectionId() > 0 )
- threadName = str::stream() << threadName << inPort->connectionId();
- setThreadName( threadName.c_str() );
- }
-
- verify( inPort );
- inPort->psock->setLogLevel(logger::LogSeverity::Debug(1));
- scoped_ptr<MessagingPort> p( inPort );
+ invariant(arg);
+ scoped_ptr<MessagingPortWithHandler> portWithHandler(
+ static_cast<MessagingPortWithHandler*>(arg));
+ MessageHandler* const handler = portWithHandler->getHandler();
- string otherSide;
+ setThreadName(std::string(str::stream() << "conn" << portWithHandler->connectionId()));
+ portWithHandler->psock->setLogLevel(logger::LogSeverity::Debug(1));
Message m;
try {
LastError * le = new LastError();
lastError.reset( le ); // lastError now has ownership
- otherSide = p->psock->remoteString();
-
- handler->connected( p.get() );
+ handler->connected(portWithHandler.get());
while ( ! inShutdown() ) {
m.reset();
- p->psock->clearCounters();
+ portWithHandler->psock->clearCounters();
- if ( ! p->recv(m) ) {
+ if (!portWithHandler->recv(m)) {
if (!serverGlobalParams.quiet) {
int conns = Listener::globalTicketHolder.used()-1;
const char* word = (conns == 1 ? " connection" : " connections");
- log() << "end connection " << otherSide << " (" << conns << word << " now open)" << endl;
+ log() << "end connection " << portWithHandler->psock->remoteString()
+ << " (" << conns << word << " now open)" << endl;
}
- p->shutdown();
+ portWithHandler->shutdown();
break;
}
- handler->process( m , p.get() , le );
- networkCounter.hit( p->psock->getBytesIn() , p->psock->getBytesOut() );
+ handler->process(m, portWithHandler.get(), le);
+ networkCounter.hit(portWithHandler->psock->getBytesIn(),
+ portWithHandler->psock->getBytesOut());
}
}
catch ( AssertionException& e ) {
log() << "AssertionException handling request, closing client connection: " << e << endl;
- p->shutdown();
+ portWithHandler->shutdown();
}
catch ( SocketException& e ) {
log() << "SocketException handling request, closing client connection: " << e << endl;
- p->shutdown();
+ portWithHandler->shutdown();
}
catch ( const DBException& e ) { // must be right above std::exception to avoid catching subclasses
log() << "DBException handling request, closing client connection: " << e << endl;
- p->shutdown();
+ portWithHandler->shutdown();
}
catch ( std::exception &e ) {
error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl;
@@ -264,7 +250,7 @@ namespace mongo {
if (manager)
manager->cleanupThreadLocals();
#endif
- handler->disconnected( p.get() );
+ handler->disconnected(portWithHandler.get());
return NULL;
}
@@ -275,6 +261,4 @@ namespace mongo {
return new PortMessageServer( opts , handler );
}
-}
-
-#endif
+} // namespace mongo