summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Aaron <aaron@10gen.com>, 2010-04-08 12:14:20 -0700
committer: Aaron <aaron@10gen.com>, 2010-04-08 12:14:20 -0700
commit: 1b0b94cb9cb43421f93e5121d9c9c5402756a729 (patch)
tree: 27e54f7f14bf6c6c494abef8803f950c0e00d7c4
parent: ec721d26e0ca3fe324ae5e67f54b87f6714abe4a (diff)
download: mongo-1b0b94cb9cb43421f93e5121d9c9c5402756a729.tar.gz
SERVER-832 use send/recv timeouts
-rw-r--r-- client/dbclient.cpp 12
-rw-r--r-- client/dbclient.h 9
-rw-r--r-- db/repl.cpp 2
-rw-r--r-- util/httpclient.cpp 10
-rw-r--r-- util/message.cpp 217
-rw-r--r-- util/message.h 16
-rw-r--r-- util/sock.h 15
7 files changed, 157 insertions, 124 deletions
diff --git a/client/dbclient.cpp b/client/dbclient.cpp
index 581a8db4324..5a4a267cc61 100644
--- a/client/dbclient.cpp
+++ b/client/dbclient.cpp
@@ -451,7 +451,7 @@ namespace mongo {
// we keep around SockAddr for connection life -- maybe MessagingPort
// requires that?
server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
- p = auto_ptr<MessagingPort>(new MessagingPort());
+ p = auto_ptr<MessagingPort>(new MessagingPort( _timeout ));
if (server->getAddr() == "0.0.0.0"){
failed = true;
@@ -1006,4 +1006,14 @@ namespace mongo {
}
}
+ bool serverAlive( const string &uri ) {
+ DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts
+ string err;
+ if ( !c.connect( uri, err ) )
+ return false;
+ if ( !c.simpleCommand( "admin", 0, "ping" ) )
+ return false;
+ return true;
+ }
+
} // namespace mongo
diff --git a/client/dbclient.h b/client/dbclient.h
index be344719a89..9d9f7ea83b2 100644
--- a/client/dbclient.h
+++ b/client/dbclient.h
@@ -754,14 +754,16 @@ namespace mongo {
void _checkConnection();
void checkConnection() { if( failed ) _checkConnection(); }
map< string, pair<string,string> > authCache;
+ int _timeout;
public:
/**
@param _autoReconnect if true, automatically reconnect on a connection failure
@param cp used by DBClientPaired. You do not need to specify this parameter
+ @param timeout tcp timeout in seconds - this is for read/write, not connect
*/
- DBClientConnection(bool _autoReconnect=false,DBClientPaired* cp=0) :
- clientPaired(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0) { }
+ DBClientConnection(bool _autoReconnect=false,DBClientPaired* cp=0,int timeout=0) :
+ clientPaired(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _timeout(timeout) { }
/** Connect to a Mongo database server.
@@ -924,6 +926,9 @@ namespace mongo {
}
};
+ /** pings server to check if it's up
+ */
+ bool serverAlive( const string &uri );
DBClientBase * createDirectClient();
diff --git a/db/repl.cpp b/db/repl.cpp
index 2ab3a11b5a3..14b6df44f16 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -1606,7 +1606,7 @@ namespace mongo {
bool ReplSource::connect() {
if ( conn.get() == 0 ) {
- conn = auto_ptr<DBClientConnection>(new DBClientConnection());
+ conn = auto_ptr<DBClientConnection>(new DBClientConnection( false, 0, replPair ? 20 : 0 /* tcp timeout */));
string errmsg;
ReplInfo r("trying to connect to sync source");
if ( !conn->connect(hostName.c_str(), errmsg) ||
diff --git a/util/httpclient.cpp b/util/httpclient.cpp
index 08b6220569e..32a7c25d330 100644
--- a/util/httpclient.cpp
+++ b/util/httpclient.cpp
@@ -94,15 +94,11 @@ namespace mongo {
{
const char * out = req.c_str();
int toSend = req.size();
- while ( toSend ){
- int did = p.send( out , toSend );
- toSend -= did;
- out += did;
- }
+ p.send( out , toSend, "_go" );
}
char buf[4096];
- int got = p.recv( buf , 4096 );
+ int got = p.unsafe_recv( buf , 4096 );
buf[got] = 0;
int rc;
@@ -114,7 +110,7 @@ namespace mongo {
if ( result )
sb << buf;
- while ( ( got = p.recv( buf , 4096 ) ) > 0){
+ while ( ( got = p.unsafe_recv( buf , 4096 ) ) > 0){
if ( result )
sb << buf;
}
diff --git a/util/message.cpp b/util/message.cpp
index 948284764eb..b469218c18b 100644
--- a/util/message.cpp
+++ b/util/message.cpp
@@ -26,6 +26,7 @@
#include <fcntl.h>
#include <errno.h>
#include "../db/cmdline.h"
+#include "../client/dbclient.h"
namespace mongo {
@@ -107,6 +108,7 @@ namespace mongo {
}
prebindOptions( sock );
+
if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) {
int x = errno;
log() << "listen(): bind() failed " << OUTPUT_ERRNOX(x) << " for socket: " << me.toString() << endl;
@@ -188,8 +190,10 @@ namespace mongo {
}
~PiggyBackData() {
- flush();
- delete( _cur );
+ DESTRUCTOR_GUARD (
+ flush();
+ delete( _cur );
+ );
}
void append( Message& m ) {
@@ -202,13 +206,12 @@ namespace mongo {
_cur += m.data->len;
}
- int flush() {
+ void flush() {
if ( _buf == _cur )
- return 0;
+ return;
- int x = _port->send( _buf , len() );
+ _port->send( _buf , len(), "flush" );
_cur = _buf;
- return x;
}
int len() {
@@ -251,14 +254,15 @@ namespace mongo {
ports.closeAll();
}
- MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far) {
+ MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far), _timeout() {
ports.insert(this);
}
- MessagingPort::MessagingPort() {
+ MessagingPort::MessagingPort( int timeout ) {
ports.insert(this);
sock = -1;
piggyBackData = 0;
+ _timeout = timeout;
}
void MessagingPort::shutdown() {
@@ -295,6 +299,10 @@ namespace mongo {
return false;
}
+ if ( _timeout > 0 ) {
+ setSockTimeouts( sock, _timeout );
+ }
+
ConnectBG bg;
bg.sock = sock;
bg.farEnd = farEnd;
@@ -328,93 +336,61 @@ namespace mongo {
}
bool MessagingPort::recv(Message& m) {
-again:
- mmm( out() << "* recv() sock:" << this->sock << endl; )
- int len = -1;
-
- char *lenbuf = (char *) &len;
- int lft = 4;
- while ( 1 ) {
- int x = recv( lenbuf, lft );
- if ( x == 0 ) {
- DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl;
- m.reset();
- return false;
- }
- if ( x < 0 ) {
- log() << "MessagingPort recv() " << OUTPUT_ERRNO << " " << farEnd.toString()<<endl;
- m.reset();
- return false;
- }
- lft -= x;
- if ( lft == 0 )
- break;
- lenbuf += x;
- log() << "MessagingPort recv() got " << x << " bytes wanted 4, lft=" << lft << endl;
- assert( lft > 0 );
- }
-
- if ( len < 0 || len > 16000000 ) {
- if ( len == -1 ) {
- // Endian check from the database, after connecting, to see what mode server is running in.
- unsigned foo = 0x10203040;
- int x = send( (char *) &foo, 4 );
- if ( x <= 0 ) {
- log() << "MessagingPort endian send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
+ try {
+ again:
+ mmm( out() << "* recv() sock:" << this->sock << endl; )
+ int len = -1;
+
+ char *lenbuf = (char *) &len;
+ int lft = 4;
+ recv( lenbuf, lft );
+
+ if ( len < 0 || len > 16000000 ) {
+ if ( len == -1 ) {
+ // Endian check from the database, after connecting, to see what mode server is running in.
+ unsigned foo = 0x10203040;
+ send( (char *) &foo, 4, "endian" );
+ goto again;
+ }
+
+ if ( len == 542393671 ){
+ // an http GET
+ log() << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl;
+ string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n";
+ stringstream ss;
+ ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg;
+ string s = ss.str();
+ send( s.c_str(), s.size(), "http" );
return false;
}
- goto again;
- }
-
- if ( len == 542393671 ){
- // an http GET
- log() << "looks like you're trying to access db over http on native driver port. please add 1000 for webserver" << endl;
- string msg = "You are trying to access MongoDB on the native driver port. For http diagnostic access, add 1000 to the port number\n";
- stringstream ss;
- ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg;
- string s = ss.str();
- send( s.c_str(), s.size() );
- return false;
- }
- log() << "bad recv() len: " << len << '\n';
- return false;
- }
-
- int z = (len+1023)&0xfffffc00;
- assert(z>=len);
- MsgData *md = (MsgData *) malloc(z);
- assert(md);
- md->len = len;
-
- if ( len <= 0 ) {
- out() << "got a length of " << len << ", something is wrong" << endl;
- return false;
- }
-
- char *p = (char *) &md->id;
- int left = len -4;
- while ( 1 ) {
- int x = recv( p, left );
- if ( x == 0 ) {
- DEV out() << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
- m.reset();
+ log() << "bad recv() len: " << len << '\n';
return false;
}
- if ( x < 0 ) {
- log() << "MessagingPort recv() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
- m.reset();
+
+ int z = (len+1023)&0xfffffc00;
+ assert(z>=len);
+ MsgData *md = (MsgData *) malloc(z);
+ assert(md);
+ md->len = len;
+
+ if ( len <= 0 ) {
+ out() << "got a length of " << len << ", something is wrong" << endl;
return false;
}
- left -= x;
- p += x;
- if ( left <= 0 )
- break;
+
+ char *p = (char *) &md->id;
+ int left = len -4;
+ recv( p, left );
+
+ m.setData(md, true);
+ return true;
+
+ } catch ( const SocketException & ) {
+ m.reset();
+ return false;
}
-
- m.setData(md, true);
- return true;
}
-
+
void MessagingPort::reply(Message& received, Message& response) {
say(/*received.from, */response, received.data->id);
}
@@ -454,8 +430,6 @@ again:
toSend.data->id = nextMessageId();
toSend.data->responseTo = responseTo;
- int x = -100;
-
if ( piggyBackData && piggyBackData->len() ) {
mmm( out() << "* have piggy back" << endl; )
if ( ( piggyBackData->len() + toSend.data->len ) > 1300 ) {
@@ -464,28 +438,67 @@ again:
}
else {
piggyBackData->append( toSend );
- x = piggyBackData->flush();
+ piggyBackData->flush();
+ return;
}
}
- if ( x == -100 )
- x = send( (char*)toSend.data, toSend.data->len );
-
- if ( x <= 0 ) {
- log() << "MessagingPort say send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
- throw SocketException();
- }
-
+ send( (char*)toSend.data, toSend.data->len, "say" );
}
- int MessagingPort::send( const char * data , const int len ){
- return ::send( sock , data , len , portSendFlags );
+ // sends all data or throws an exception
+ void MessagingPort::send( const char * data , int len, const char *context ){
+ while( len > 0 ) {
+ int ret = ::send( sock , data , len , portSendFlags );
+ if ( ret == -1 ) {
+ if ( errno != EAGAIN || _timeout == 0 ) {
+ log() << "MessagingPort " << context << " send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
+ throw SocketException();
+ } else {
+ if ( !serverAlive( farEnd.toString() ) ) {
+ log() << "MessagingPort " << context << " send() remote dead " << farEnd.toString() << endl;
+ throw SocketException();
+ }
+ }
+ } else {
+ assert( ret <= len );
+ len -= ret;
+ data += ret;
+ }
+ }
}
- int MessagingPort::recv( char * buf , int max ){
- return ::recv( sock , buf , max , portRecvFlags );
+ void MessagingPort::recv( char * buf , int len ){
+ while( len > 0 ) {
+ int ret = ::recv( sock , buf , len , portRecvFlags );
+ if ( ret == 0 ) {
+ DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl;
+ throw SocketException();
+ }
+ if ( ret == -1 ) {
+ if ( errno != EAGAIN || _timeout == 0 ) {
+ log() << "MessagingPort recv() " << OUTPUT_ERRNO << " " << farEnd.toString()<<endl;
+ throw SocketException();
+ } else {
+ if ( !serverAlive( farEnd.toString() ) ) {
+ log() << "MessagingPort recv() remote dead " << farEnd.toString() << endl;
+ throw SocketException();
+ }
+ }
+ } else {
+ if ( len <= 4 && ret != len )
+ log() << "MessagingPort recv() got " << ret << " bytes wanted len=" << len << endl;
+ assert( ret <= len );
+ len -= ret;
+ buf += ret;
+ }
+ }
}
+ int MessagingPort::unsafe_recv( char *buf, int max ) {
+ return ::recv( sock , buf , max , portRecvFlags );
+ }
+
void MessagingPort::piggyBack( Message& toSend , int responseTo ) {
if ( toSend.data->len > 1300 ) {
diff --git a/util/message.h b/util/message.h
index 8175aba30d1..7b8f1c4c58c 100644
--- a/util/message.h
+++ b/util/message.h
@@ -59,7 +59,12 @@ namespace mongo {
class MessagingPort : public AbstractMessagingPort {
public:
MessagingPort(int sock, const SockAddr& farEnd);
- MessagingPort();
+
+ // in some cases the timeout will actually be 2x this value - eg we do a partial send,
+ // then the timeout fires, then we try to send again, then the timeout fires again with
+ // no data sent, then we detect that the other side is down
+ MessagingPort(int timeout = 0);
+
virtual ~MessagingPort();
void shutdown();
@@ -79,13 +84,18 @@ namespace mongo {
virtual unsigned remotePort();
- int send( const char * data , const int len );
- int recv( char * data , int max );
+ // send len or throw SocketException
+ void send( const char * data , int len, const char *context );
+ // recv len or throw SocketException
+ void recv( char * data , int len );
+
+ int unsafe_recv( char *buf, int max );
private:
int sock;
PiggyBackData * piggyBackData;
public:
SockAddr farEnd;
+ int _timeout;
friend class PiggyBackData;
};
diff --git a/util/sock.h b/util/sock.h
index 99d5f8868e2..70b1d586302 100644
--- a/util/sock.h
+++ b/util/sock.h
@@ -101,15 +101,14 @@ namespace mongo {
return "/tmp/mongodb-" + BSONObjBuilder::numStr(port) + ".sock";
}
- inline void setSockReceiveTimeout(int sock, int secs) {
-// todo - finish - works?
+ inline void setSockTimeouts(int sock, int secs) {
struct timeval tv;
- tv.tv_sec = 0;//secs;
- tv.tv_usec = 1000;
- int rc = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *) &tv, sizeof(tv));
- if ( rc ) {
- out() << "ERROR: setsockopt RCVTIMEO failed rc:" << rc << " " << OUTPUT_ERRNO << " secs:" << secs << " sock:" << sock << endl;
- }
+ tv.tv_sec = secs;
+ tv.tv_usec = 0;
+ massert( 13083, "unable to set SO_RCVTIMEO",
+ setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (void *) &tv, sizeof(tv) ) == 0 );
+ massert( 13084, "unable to set SO_SNDTIMEO",
+ setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (void *) &tv, sizeof(tv) ) == 0 );
}
// If an ip address is passed in, just return that. If a hostname is passed