diff options
author | Aaron <aaron@10gen.com> | 2010-04-08 12:14:20 -0700 |
---|---|---|
committer | Aaron <aaron@10gen.com> | 2010-04-08 12:14:20 -0700 |
commit | 1b0b94cb9cb43421f93e5121d9c9c5402756a729 (patch) | |
tree | 27e54f7f14bf6c6c494abef8803f950c0e00d7c4 | |
parent | ec721d26e0ca3fe324ae5e67f54b87f6714abe4a (diff) | |
download | mongo-1b0b94cb9cb43421f93e5121d9c9c5402756a729.tar.gz |
SERVER-832 use send/recv timeouts
-rw-r--r-- | client/dbclient.cpp | 12 | ||||
-rw-r--r-- | client/dbclient.h | 9 | ||||
-rw-r--r-- | db/repl.cpp | 2 | ||||
-rw-r--r-- | util/httpclient.cpp | 10 | ||||
-rw-r--r-- | util/message.cpp | 217 | ||||
-rw-r--r-- | util/message.h | 16 | ||||
-rw-r--r-- | util/sock.h | 15 |
7 files changed, 157 insertions, 124 deletions
diff --git a/client/dbclient.cpp b/client/dbclient.cpp index 581a8db4324..5a4a267cc61 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -451,7 +451,7 @@ namespace mongo { // we keep around SockAddr for connection life -- maybe MessagingPort // requires that? server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port)); - p = auto_ptr<MessagingPort>(new MessagingPort()); + p = auto_ptr<MessagingPort>(new MessagingPort( _timeout )); if (server->getAddr() == "0.0.0.0"){ failed = true; @@ -1006,4 +1006,14 @@ namespace mongo { } } + bool serverAlive( const string &uri ) { + DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts + string err; + if ( !c.connect( uri, err ) ) + return false; + if ( !c.simpleCommand( "admin", 0, "ping" ) ) + return false; + return true; + } + } // namespace mongo diff --git a/client/dbclient.h b/client/dbclient.h index be344719a89..9d9f7ea83b2 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -754,14 +754,16 @@ namespace mongo { void _checkConnection(); void checkConnection() { if( failed ) _checkConnection(); } map< string, pair<string,string> > authCache; + int _timeout; public: /** @param _autoReconnect if true, automatically reconnect on a connection failure @param cp used by DBClientPaired. You do not need to specify this parameter + @param timeout tcp timeout in seconds - this is for read/write, not connect */ - DBClientConnection(bool _autoReconnect=false,DBClientPaired* cp=0) : - clientPaired(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0) { } + DBClientConnection(bool _autoReconnect=false,DBClientPaired* cp=0,int timeout=0) : + clientPaired(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _timeout(timeout) { } /** Connect to a Mongo database server. @@ -924,6 +926,9 @@ namespace mongo { } }; + /** pings server to check if it's up + */ + bool serverAlive( const string &uri ); DBClientBase * createDirectClient(); diff --git a/db/repl.cpp b/db/repl.cpp index 2ab3a11b5a3..14b6df44f16 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -1606,7 +1606,7 @@ namespace mongo { bool ReplSource::connect() { if ( conn.get() == 0 ) { - conn = auto_ptr<DBClientConnection>(new DBClientConnection()); + conn = auto_ptr<DBClientConnection>(new DBClientConnection( false, 0, replPair ? 20 : 0 /* tcp timeout */)); string errmsg; ReplInfo r("trying to connect to sync source"); if ( !conn->connect(hostName.c_str(), errmsg) || diff --git a/util/httpclient.cpp b/util/httpclient.cpp index 08b6220569e..32a7c25d330 100644 --- a/util/httpclient.cpp +++ b/util/httpclient.cpp @@ -94,15 +94,11 @@ namespace mongo { { const char * out = req.c_str(); int toSend = req.size(); - while ( toSend ){ - int did = p.send( out , toSend ); - toSend -= did; - out += did; - } + p.send( out , toSend, "_go" ); } char buf[4096]; - int got = p.recv( buf , 4096 ); + int got = p.unsafe_recv( buf , 4096 ); buf[got] = 0; int rc; @@ -114,7 +110,7 @@ namespace mongo { if ( result ) sb << buf; - while ( ( got = p.recv( buf , 4096 ) ) > 0){ + while ( ( got = p.unsafe_recv( buf , 4096 ) ) > 0){ if ( result ) sb << buf; } diff --git a/util/message.cpp b/util/message.cpp index 948284764eb..b469218c18b 100644 --- a/util/message.cpp +++ b/util/message.cpp @@ -26,6 +26,7 @@ #include <fcntl.h> #include <errno.h> #include "../db/cmdline.h" +#include "../client/dbclient.h" namespace mongo { @@ -107,6 +108,7 @@ namespace mongo { } prebindOptions( sock ); + if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) { int x = errno; log() << "listen(): bind() failed " << OUTPUT_ERRNOX(x) << " for socket: " << me.toString() << endl; @@ -188,8 +190,10 @@ namespace mongo { } ~PiggyBackData() { - flush(); - delete( _cur ); + DESTRUCTOR_GUARD ( + flush(); + delete( _cur ); + ); } void append( Message& m ) { @@ -202,13 +206,12 @@ namespace mongo { _cur += m.data->len; } - int flush() { + void flush() { if ( _buf == _cur ) - return 0; + return; - int x = _port->send( _buf , len() ); + _port->send( _buf , len(), "flush" ); _cur = _buf; - return x; } int len() { @@ -251,14 +254,15 @@ namespace mongo { ports.closeAll(); } - MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far) { + MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far), _timeout() { ports.insert(this); } - MessagingPort::MessagingPort() { + MessagingPort::MessagingPort( int timeout ) { ports.insert(this); sock = -1; piggyBackData = 0; + _timeout = timeout; } void MessagingPort::shutdown() { @@ -295,6 +299,10 @@ namespace mongo { return false; } + if ( _timeout > 0 ) { + setSockTimeouts( sock, _timeout ); + } + ConnectBG bg; bg.sock = sock; bg.farEnd = farEnd; @@ -328,93 +336,61 @@ namespace mongo { } bool MessagingPort::recv(Message& m) { -again: - mmm( out() << "* recv() sock:" << this->sock << endl; ) - int len = -1; - - char *lenbuf = (char *) &len; - int lft = 4; - while ( 1 ) { - int x = recv( lenbuf, lft ); - if ( x == 0 ) { - DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; - m.reset(); - return false; - } - if ( x < 0 ) { - log() << "MessagingPort recv() " << OUTPUT_ERRNO << " " << farEnd.toString()<<endl; - m.reset(); - return false; - } - lft -= x; - if ( lft == 0 ) - break; - lenbuf += x; - log() << "MessagingPort recv() got " << x << " bytes wanted 4, lft=" << lft << endl; - assert( lft > 0 ); - } - - if ( len < 0 || len > 16000000 ) { - if ( len == -1 ) { - // Endian check from the database, after connecting, to see what mode server is running in. - unsigned foo = 0x10203040; - int x = send( (char *) &foo, 4 ); - if ( x <= 0 ) { - log() << "MessagingPort endian send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; + try { + again: + mmm( out() << "* recv() sock:" << this->sock << endl; ) + int len = -1; + + char *lenbuf = (char *) &len; + int lft = 4; + recv( lenbuf, lft ); + + if ( len < 0 || len > 16000000 ) { + if ( len == -1 ) { + // Endian check from the database, after connecting, to see what mode server is running in. + unsigned foo = 0x10203040; + send( (char *) &foo, 4, "endian" ); + goto again; + } + + if ( len == 542393671 ){ + // an http GET + log() << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl; + string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n"; + stringstream ss; + ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; + string s = ss.str(); + send( s.c_str(), s.size(), "http" ); return false; } - goto again; - } - - if ( len == 542393671 ){ - // an http GET - log() << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl; - string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n"; - stringstream ss; - ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; - string s = ss.str(); - send( s.c_str(), s.size() ); - return false; - } - log() << "bad recv() len: " << len << '\n'; - return false; - } - - int z = (len+1023)&0xfffffc00; - assert(z>=len); - MsgData *md = (MsgData *) malloc(z); - assert(md); - md->len = len; - - if ( len <= 0 ) { - out() << "got a length of " << len << ", something is wrong" << endl; - return false; - } - - char *p = (char *) &md->id; - int left = len -4; - while ( 1 ) { - int x = recv( p, left ); - if ( x == 0 ) { - DEV out() << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; - m.reset(); + log() << "bad recv() len: " << len << '\n'; return false; } - if ( x < 0 ) { - log() << "MessagingPort recv() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; - m.reset(); + + int z = (len+1023)&0xfffffc00; + assert(z>=len); + MsgData *md = (MsgData *) malloc(z); + assert(md); + md->len = len; + + if ( len <= 0 ) { + out() << "got a length of " << len << ", something is wrong" << endl; return false; } - left -= x; - p += x; - if ( left <= 0 ) - break; + + char *p = (char *) &md->id; + int left = len -4; + recv( p, left ); + + m.setData(md, true); + return true; + + } catch ( const SocketException & ) { + m.reset(); + return false; } - - m.setData(md, true); - return true; } - + void MessagingPort::reply(Message& received, Message& response) { say(/*received.from, */response, received.data->id); } @@ -454,8 +430,6 @@ again: toSend.data->id = nextMessageId(); toSend.data->responseTo = responseTo; - int x = -100; - if ( piggyBackData && piggyBackData->len() ) { mmm( out() << "* have piggy back" << endl; ) if ( ( piggyBackData->len() + toSend.data->len ) > 1300 ) { @@ -464,28 +438,67 @@ again: } else { piggyBackData->append( toSend ); - x = piggyBackData->flush(); + piggyBackData->flush(); + return; } } - if ( x == -100 ) - x = send( (char*)toSend.data, toSend.data->len ); - - if ( x <= 0 ) { - log() << "MessagingPort say send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; - throw SocketException(); - } - + send( (char*)toSend.data, toSend.data->len, "say" ); } - int MessagingPort::send( const char * data , const int len ){ - return ::send( sock , data , len , portSendFlags ); + // sends all data or throws an exception + void MessagingPort::send( const char * data , int len, const char *context ){ + while( len > 0 ) { + int ret = ::send( sock , data , len , portSendFlags ); + if ( ret == -1 ) { + if ( errno != EAGAIN || _timeout == 0 ) { + log() << "MessagingPort " << context << " send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; + throw SocketException(); + } else { + if ( !serverAlive( farEnd.toString() ) ) { + log() << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl; + throw SocketException(); + } + } + } else { + assert( ret <= len ); + len -= ret; + data += ret; + } + } } - int MessagingPort::recv( char * buf , int max ){ - return ::recv( sock , buf , max , portRecvFlags ); + void MessagingPort::recv( char * buf , int len ){ + while( len > 0 ) { + int ret = ::recv( sock , buf , len , portRecvFlags ); + if ( ret == 0 ) { + DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; + throw SocketException(); + } + if ( ret == -1 ) { + if ( errno != EAGAIN || _timeout == 0 ) { + log() << "MessagingPort recv() " << OUTPUT_ERRNO << " " << farEnd.toString()<<endl; + throw SocketException(); + } else { + if ( !serverAlive( farEnd.toString() ) ) { + log() << "MessagingPort recv() remote dead " << farEnd.toString() << endl; + throw SocketException(); + } + } + } else { + if ( len <= 4 && ret != len ) + log() << "MessagingPort recv() got " << ret << " bytes wanted len=" << len << endl; + assert( ret <= len ); + len -= ret; + buf += ret; + } + } } + int MessagingPort::unsafe_recv( char *buf, int max ) { + return ::recv( sock , buf , max , portRecvFlags ); + } + void MessagingPort::piggyBack( Message& toSend , int responseTo ) { if ( toSend.data->len > 1300 ) { diff --git a/util/message.h b/util/message.h index 8175aba30d1..7b8f1c4c58c 100644 --- a/util/message.h +++ b/util/message.h @@ -59,7 +59,12 @@ namespace mongo { class MessagingPort : public AbstractMessagingPort { public: MessagingPort(int sock, const SockAddr& farEnd); - MessagingPort(); + + // in some cases the timeout will actually be 2x this value - eg we do a partial send, + // then the timeout fires, then we try to send again, then the timeout fires again with + // no data sent, then we detect that the other side is down + MessagingPort(int timeout = 0); + virtual ~MessagingPort(); void shutdown(); @@ -79,13 +84,18 @@ namespace mongo { virtual unsigned remotePort(); - int send( const char * data , const int len ); - int recv( char * data , int max ); + // send len or throw SocketException + void send( const char * data , int len, const char *context ); + // recv len or throw SocketException + void recv( char * data , int len ); + + int unsafe_recv( char *buf, int max ); private: int sock; PiggyBackData * piggyBackData; public: SockAddr farEnd; + int _timeout; friend class PiggyBackData; }; diff --git a/util/sock.h b/util/sock.h index 99d5f8868e2..70b1d586302 100644 --- a/util/sock.h +++ b/util/sock.h @@ -101,15 +101,14 @@ namespace mongo { return "/tmp/mongodb-" + BSONObjBuilder::numStr(port) + ".sock"; } - inline void setSockReceiveTimeout(int sock, int secs) { -// todo - finish - works? + inline void setSockTimeouts(int sock, int secs) { struct timeval tv; - tv.tv_sec = 0;//secs; - tv.tv_usec = 1000; - int rc = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv)); - if ( rc ) { - out() << "ERROR: setsockopt RCVTIMEO failed rc:" << rc << " " << OUTPUT_ERRNO << " secs:" << secs << " sock:" << sock << endl; - } + tv.tv_sec = secs; + tv.tv_usec = 0; + massert( 13083, "unable to set SO_RCVTIMEO", + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof(tv) ) == 0 ); + massert( 13084, "unable to set SO_SNDTIMEO", + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof(tv) ) == 0 ); } // If an ip address is passed in, just return that. If a hostname is passed |