diff options
Diffstat (limited to 'grid/message.cpp')
-rw-r--r-- | grid/message.cpp | 354 |
1 files changed, 179 insertions, 175 deletions
diff --git a/grid/message.cpp b/grid/message.cpp index 17d451590a1..2035c3021cf 100644 --- a/grid/message.cpp +++ b/grid/message.cpp @@ -1,15 +1,15 @@ /** * Copyright (C) 2008 10gen Inc. -* +* * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. -* +* * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. -* +* * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ @@ -31,83 +31,83 @@ /* listener ------------------------------------------------------------------- */ void Listener::listen() { - SockAddr me(port); - int sock = socket(AF_INET, SOCK_STREAM, 0); - if( sock == INVALID_SOCKET ) { - log() << "ERROR: listen(): invalid socket? " << errno << endl; - return; - } - prebindOptions( sock ); - if( ::bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) { - log() << "listen(): bind() failed errno:" << errno << endl; - if( errno == 98 ) - log() << "98 == addr already in use" << endl; - closesocket(sock); - return; - } - - if( ::listen(sock, 128) != 0 ) { - log() << "listen(): listen() failed " << errno << endl; - closesocket(sock); - return; - } - - SockAddr from; - while( 1 ) { - int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); - if( s < 0 ) { - log() << "Listener: accept() returns " << s << " errno:" << errno << endl; - continue; - } - disableNagle(s); - log() << "connection accepted from " << from.toString() << endl; - accepted( new MessagingPort(s, from) ); - } + SockAddr me(port); + int sock = socket(AF_INET, SOCK_STREAM, 0); + if ( sock == INVALID_SOCKET ) { + log() << "ERROR: listen(): invalid socket? " << errno << endl; + return; + } + prebindOptions( sock ); + if ( ::bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) { + log() << "listen(): bind() failed errno:" << errno << endl; + if ( errno == 98 ) + log() << "98 == addr already in use" << endl; + closesocket(sock); + return; + } + + if ( ::listen(sock, 128) != 0 ) { + log() << "listen(): listen() failed " << errno << endl; + closesocket(sock); + return; + } + + SockAddr from; + while ( 1 ) { + int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); + if ( s < 0 ) { + log() << "Listener: accept() returns " << s << " errno:" << errno << endl; + continue; + } + disableNagle(s); + log() << "connection accepted from " << from.toString() << endl; + accepted( new MessagingPort(s, from) ); + } } /* messagingport -------------------------------------------------------------- */ MSGID NextMsgId; struct MsgStart { - MsgStart() { - NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); - assert(MsgDataHeaderSize == 16); - } + MsgStart() { + NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); + assert(MsgDataHeaderSize == 16); + } } msgstart; -// we "new" this so it guaranteed to still be around when other automatic global vars +// we "new" this so it guaranteed to still be around when other automatic global vars // are being destructed during termination. set<MessagingPort*>& ports = *(new set<MessagingPort*>()); -void closeAllSockets() { - for( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) - (*i)->shutdown(); +void closeAllSockets() { + for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) + (*i)->shutdown(); } -MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), farEnd(_far) { - ports.insert(this); +MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), farEnd(_far) { + ports.insert(this); } MessagingPort::MessagingPort() { - ports.insert(this); - sock = -1; + ports.insert(this); + sock = -1; } -void MessagingPort::shutdown() { - if( sock >= 0 ) { - closesocket(sock); - sock = -1; - } +void MessagingPort::shutdown() { + if ( sock >= 0 ) { + closesocket(sock); + sock = -1; + } } -MessagingPort::~MessagingPort() { - shutdown(); - ports.erase(this); +MessagingPort::~MessagingPort() { + shutdown(); + ports.erase(this); } #include "../util/background.h" -class ConnectBG : public BackgroundJob { +class ConnectBG : public BackgroundJob { public: int sock; int res; @@ -119,13 +119,13 @@ public: bool MessagingPort::connect(SockAddr& _far) { - farEnd = _far; + farEnd = _far; - sock = socket(AF_INET, SOCK_STREAM, 0); - if( sock == INVALID_SOCKET ) { - log() << "ERROR: connect(): invalid socket? " << errno << endl; - return false; - } + sock = socket(AF_INET, SOCK_STREAM, 0); + if ( sock == INVALID_SOCKET ) { + log() << "ERROR: connect(): invalid socket? " << errno << endl; + return false; + } #if 0 long fl = fcntl(sock, F_GETFL, 0); @@ -134,12 +134,13 @@ bool MessagingPort::connect(SockAddr& _far) fcntl(sock, F_SETFL, fl); int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); - if( res ) { - if( errno == EINPROGRESS ) - //log() << "connect(): failed errno:" << errno << ' ' << farEnd.getPort() << endl; - closesocket(sock); sock = -1; - return false; - } + if ( res ) { + if ( errno == EINPROGRESS ) + //log() << "connect(): failed errno:" << errno << ' ' << farEnd.getPort() << endl; + closesocket(sock); + sock = -1; + return false; + } #endif @@ -149,137 +150,140 @@ bool MessagingPort::connect(SockAddr& _far) bg.go(); // int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); - if( bg.wait(5000) ) { - if( bg.res ) { - closesocket(sock); sock = -1; + if ( bg.wait(5000) ) { + if ( bg.res ) { + closesocket(sock); + sock = -1; return false; } } - else { + else { // time out the connect - closesocket(sock); sock = -1; + closesocket(sock); + sock = -1; bg.wait(); // so bg stays in scope until bg thread terminates return false; } - disableNagle(sock); - return true; + disableNagle(sock); + return true; } bool MessagingPort::recv(Message& m) { again: - mmm( cout << "* recv() sock:" << this->sock << endl; ) - int len = -1; - - char *lenbuf = (char *) &len; - int lft = 4; - while( 1 ) { - int x = ::recv(sock, lenbuf, lft, 0); - if( x == 0 ) { - DEV cout << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; - m.reset(); - return false; - } - if( x < 0 ) { + mmm( cout << "* recv() sock:" << this->sock << endl; ) + int len = -1; + + char *lenbuf = (char *) &len; + int lft = 4; + while ( 1 ) { + int x = ::recv(sock, lenbuf, lft, 0); + if ( x == 0 ) { + DEV cout << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; + m.reset(); + return false; + } + if ( x < 0 ) { log() << "MessagingPort recv() error " << errno << ' ' << farEnd.toString()<<endl; - m.reset(); - return false; - } - lft -= x; - if( lft == 0 ) - break; - lenbuf += x; - log() << "MessagingPort recv() got " << x << " bytes wanted 4, lft=" << lft << endl; - assert( lft > 0 ); - } - - if( len < 0 || len > 16000000 ) { - if( len == -1 ) { - // Endian check from the database, after connecting, to see what mode server is running in. - unsigned foo = 0x10203040; - int x = ::send(sock, (char *) &foo, 4, 0); - if( x <= 0 ) { - log() << "MessagingPort endian send() error " << errno << ' ' << farEnd.toString() << endl; - return false; - } - goto again; - } - log() << "bad recv() len: " << len << '\n'; - return false; - } - - int z = (len+1023)&0xfffffc00; assert(z>=len); - MsgData *md = (MsgData *) malloc(z); - md->len = len; - - if ( len <= 0 ){ - cout << "got a length of " << len << ", something is wrong" << endl; - return false; - } - - char *p = (char *) &md->id; - int left = len -4; - while( 1 ) { - int x = ::recv(sock, p, left, 0); - if( x == 0 ) { - DEV cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; - m.reset(); - return false; - } - if( x < 0 ) { - log() << "MessagingPort recv() error " << errno << ' ' << farEnd.toString() << endl; - m.reset(); - return false; - } - left -= x; - p += x; - if( left <= 0 ) - break; - } - - m.setData(md, true); - return true; + m.reset(); + return false; + } + lft -= x; + if ( lft == 0 ) + break; + lenbuf += x; + log() << "MessagingPort recv() got " << x << " bytes wanted 4, lft=" << lft << endl; + assert( lft > 0 ); + } + + if ( len < 0 || len > 16000000 ) { + if ( len == -1 ) { + // Endian check from the database, after connecting, to see what mode server is running in. + unsigned foo = 0x10203040; + int x = ::send(sock, (char *) &foo, 4, 0); + if ( x <= 0 ) { + log() << "MessagingPort endian send() error " << errno << ' ' << farEnd.toString() << endl; + return false; + } + goto again; + } + log() << "bad recv() len: " << len << '\n'; + return false; + } + + int z = (len+1023)&0xfffffc00; + assert(z>=len); + MsgData *md = (MsgData *) malloc(z); + md->len = len; + + if ( len <= 0 ) { + cout << "got a length of " << len << ", something is wrong" << endl; + return false; + } + + char *p = (char *) &md->id; + int left = len -4; + while ( 1 ) { + int x = ::recv(sock, p, left, 0); + if ( x == 0 ) { + DEV cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; + m.reset(); + return false; + } + if ( x < 0 ) { + log() << "MessagingPort recv() error " << errno << ' ' << farEnd.toString() << endl; + m.reset(); + return false; + } + left -= x; + p += x; + if ( left <= 0 ) + break; + } + + m.setData(md, true); + return true; } void MessagingPort::reply(Message& received, Message& response) { - say(/*received.from, */response, received.data->id); + say(/*received.from, */response, received.data->id); } void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) { - say(/*received.from, */response, responseTo); + say(/*received.from, */response, responseTo); } bool MessagingPort::call(Message& toSend, Message& response) { - mmm( cout << "*call()" << endl; ) - MSGID old = toSend.data->id; - say(/*to,*/ toSend); - while( 1 ) { - bool ok = recv(response); - if( !ok ) - return false; - //cout << "got response: " << response.data->responseTo << endl; - if( response.data->responseTo == toSend.data->id ) - break; - cout << "********************" << endl; - cout << "ERROR: MessagingPort::call() wrong id got:" << response.data->responseTo << " expect:" << toSend.data->id << endl; - cout << " old:" << old << endl; - cout << " response msgid:" << response.data->id << endl; - cout << " response len: " << response.data->len << endl; - assert(false); - response.reset(); - } - mmm( cout << "*call() end" << endl; ) - return true; + mmm( cout << "*call()" << endl; ) + MSGID old = toSend.data->id; + say(/*to,*/ toSend); + while ( 1 ) { + bool ok = recv(response); + if ( !ok ) + return false; + //cout << "got response: " << response.data->responseTo << endl; + if ( response.data->responseTo == toSend.data->id ) + break; + cout << "********************" << endl; + cout << "ERROR: MessagingPort::call() wrong id got:" << response.data->responseTo << " expect:" << toSend.data->id << endl; + cout << " old:" << old << endl; + cout << " response msgid:" << response.data->id << endl; + cout << " response len: " << response.data->len << endl; + assert(false); + response.reset(); + } + mmm( cout << "*call() end" << endl; ) + return true; } void MessagingPort::say(Message& toSend, int responseTo) { - mmm( cout << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) - MSGID msgid = NextMsgId; - ++NextMsgId; - toSend.data->id = msgid; - toSend.data->responseTo = responseTo; - int x = ::send(sock, (char *) toSend.data, toSend.data->len, 0); - if( x <= 0 ) { + mmm( cout << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) + MSGID msgid = NextMsgId; + ++NextMsgId; + toSend.data->id = msgid; + toSend.data->responseTo = responseTo; + int x = ::send(sock, (char *) toSend.data, toSend.data->len, 0); + if ( x <= 0 ) { log() << "MessagingPort say send() error " << errno << ' ' << farEnd.toString() << endl; - } + } } |