diff options
author | Dwight <dmerriman@gmail.com> | 2008-06-06 09:43:15 -0400 |
---|---|---|
committer | Dwight <dmerriman@gmail.com> | 2008-06-06 09:43:15 -0400 |
commit | 3051b961cac30f9bf81ac72b816ddb5e8e3c2ee9 (patch) | |
tree | 85a5b1cb376b067eee5cf668d42deff78a870627 /grid | |
parent | 877b72efcdd55f9fc9b271c707d4b489e551793d (diff) | |
download | mongo-3051b961cac30f9bf81ac72b816ddb5e8e3c2ee9.tar.gz |
dos2unix
Diffstat (limited to 'grid')
-rw-r--r-- | grid/message.cpp | 392 | ||||
-rw-r--r-- | grid/message.h | 258 | ||||
-rw-r--r-- | grid/protocol.h | 522 | ||||
-rw-r--r-- | grid/protoimpl.h | 484 | ||||
-rw-r--r-- | grid/protorecv.cpp | 772 | ||||
-rw-r--r-- | grid/protosend.cpp | 376 |
6 files changed, 1402 insertions, 1402 deletions
diff --git a/grid/message.cpp b/grid/message.cpp index 0a10667e49f..caf2cd7e8a8 100644 --- a/grid/message.cpp +++ b/grid/message.cpp @@ -1,196 +1,196 @@ -/* message
-
- todo: authenticate; encrypt?
-*/
-
-#include "stdafx.h"
-#include "message.h"
-#include <time.h>
-#include "../util/goodies.h"
-
-// if you want trace output:
-#define mmm(x)
-
-/* listener ------------------------------------------------------------------- */
-
-void Listener::listen() {
- SockAddr me(port);
- int sock = socket(AF_INET, SOCK_STREAM, 0);
- if( sock == INVALID_SOCKET ) {
- cout << "ERROR: listen(): invalid socket? " << errno << endl;
- return;
- }
- prebindOptions( sock );
- if( bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) {
- cout << "listen(): bind() failed errno:" << errno << endl;
- if( errno == 98 )
- cout << "98 == addr already in use" << endl;
- closesocket(sock);
- return;
- }
-
- if( ::listen(sock, 128) != 0 ) {
- cout << "listen(): listen() failed " << errno << endl;
- closesocket(sock);
- return;
- }
-
- SockAddr from;
- while( 1 ) {
- int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
- if( s < 0 ) {
- cout << "Listener: accept() returns " << s << " errno:" << errno << endl;
- continue;
- }
- disableNagle(s);
- cout << "Listener: connection accepted from " << from.toString() << endl;
- accepted( new MessagingPort(s, from) );
- }
-}
-
-/* messagingport -------------------------------------------------------------- */
-
-MSGID NextMsgId;
-struct MsgStart {
- MsgStart() {
- NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
- assert(MsgDataHeaderSize == 16);
- }
-} msgstart;
-
-MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), farEnd(_far) { }
-
-MessagingPort::MessagingPort() {
- sock = -1;
-}
-
-void MessagingPort::shutdown() {
- if( sock >= 0 ) {
- closesocket(sock);
- sock = -1;
- }
-}
-
-MessagingPort::~MessagingPort() {
- shutdown();
-}
-
-bool MessagingPort::connect(SockAddr& _far)
-{
- farEnd = _far;
-
- sock = socket(AF_INET, SOCK_STREAM, 0);
- if( sock == INVALID_SOCKET ) {
- cout << "ERROR: connect(): invalid socket? " << errno << endl;
- return false;
- }
- if( ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize) ) {
- cout << "ERROR: connect(): connect() failed" << errno << endl;
- closesocket(sock); sock = -1;
- return false;
- }
- disableNagle(sock);
- return true;
-}
-
-bool MessagingPort::recv(Message& m) {
- mmm( cout << "* recv() sock:" << this->sock << endl; )
- int len = -1;
-
- char *lenbuf = (char *) &len;
- int lft = 4;
- while( 1 ) {
- int x = ::recv(sock, lenbuf, lft, 0);
- if( x == 0 ) {
- cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
- m.reset();
- return false;
- }
- if( x < 0 ) {
- cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString()<<endl;
- m.reset();
- return false;
- }
- lft -= x;
- if( lft == 0 )
- break;
- lenbuf += x;
- cout << "MessagingPort::recv(): got " << x << " bytes wanted 4, lft=" << lft << endl;
- assert( lft > 0 );
- }
-
-// assert( x == 4 );
-
- assert( len >= 0 && len <= 16000000 );
-
- int z = (len+1023)&0xfffffc00; assert(z>=len);
- MsgData *md = (MsgData *) malloc(z);
- md->len = len;
-
- if ( len <= 0 ){
- cout << "got a length of 0, something is wrong" << endl;
- return false;
- }
-
- char *p = (char *) &md->id;
- int left = len -4;
- while( 1 ) {
- int x = ::recv(sock, p, left, 0);
- if( x == 0 ) {
- cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
- m.reset();
- return false;
- }
- if( x < 0 ) {
- cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString() << endl;
- m.reset();
- return false;
- }
- left -= x;
- p += x;
- if( left <= 0 )
- break;
- }
-
- m.setData(md, true);
- return true;
-}
-
-void MessagingPort::reply(Message& received, Message& response) {
- say(received.from, response, received.data->id);
-}
-
-bool MessagingPort::call(SockAddr& to, Message& toSend, Message& response) {
- mmm( cout << "*call()" << endl; )
- MSGID old = toSend.data->id;
- say(to, toSend);
- while( 1 ) {
- bool ok = recv(response);
- if( !ok )
- return false;
- //cout << "got response: " << response.data->responseTo << endl;
- if( response.data->responseTo == toSend.data->id )
- break;
- cout << "********************" << endl;
- cout << "ERROR: MessagingPort::call() wrong id got:" << response.data->responseTo << " expect:" << toSend.data->id << endl;
- cout << " old:" << old << endl;
- cout << " response msgid:" << response.data->id << endl;
- cout << " response len: " << response.data->len << endl;
- assert(false);
- response.reset();
- }
- mmm( cout << "*call() end" << endl; )
- return true;
-}
-
-void MessagingPort::say(SockAddr& to, Message& toSend, int responseTo) {
- mmm( cout << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
- MSGID msgid = NextMsgId;
- ++NextMsgId;
- toSend.data->id = msgid;
- toSend.data->responseTo = responseTo;
- int x = ::send(sock, (char *) toSend.data, toSend.data->len, 0);
- if( x <= 0 ) {
- cout << "MessagingPort::say: send() error " << errno << ' ' << farEnd.toString() << endl;
- }
-}
+/* message + + todo: authenticate; encrypt? +*/ + +#include "stdafx.h" +#include "message.h" +#include <time.h> +#include "../util/goodies.h" + +// if you want trace output: +#define mmm(x) + +/* listener ------------------------------------------------------------------- */ + +void Listener::listen() { + SockAddr me(port); + int sock = socket(AF_INET, SOCK_STREAM, 0); + if( sock == INVALID_SOCKET ) { + cout << "ERROR: listen(): invalid socket? " << errno << endl; + return; + } + prebindOptions( sock ); + if( bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) { + cout << "listen(): bind() failed errno:" << errno << endl; + if( errno == 98 ) + cout << "98 == addr already in use" << endl; + closesocket(sock); + return; + } + + if( ::listen(sock, 128) != 0 ) { + cout << "listen(): listen() failed " << errno << endl; + closesocket(sock); + return; + } + + SockAddr from; + while( 1 ) { + int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); + if( s < 0 ) { + cout << "Listener: accept() returns " << s << " errno:" << errno << endl; + continue; + } + disableNagle(s); + cout << "Listener: connection accepted from " << from.toString() << endl; + accepted( new MessagingPort(s, from) ); + } +} + +/* messagingport -------------------------------------------------------------- */ + +MSGID NextMsgId; +struct MsgStart { + MsgStart() { + NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); + assert(MsgDataHeaderSize == 16); + } +} msgstart; + +MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), farEnd(_far) { } + +MessagingPort::MessagingPort() { + sock = -1; +} + +void MessagingPort::shutdown() { + if( sock >= 0 ) { + closesocket(sock); + sock = -1; + } +} + +MessagingPort::~MessagingPort() { + shutdown(); +} + +bool MessagingPort::connect(SockAddr& _far) +{ + farEnd = _far; + + sock = socket(AF_INET, SOCK_STREAM, 0); + if( sock == INVALID_SOCKET ) { + cout << "ERROR: connect(): invalid socket? " << errno << endl; + return false; + } + if( ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize) ) { + cout << "ERROR: connect(): connect() failed" << errno << endl; + closesocket(sock); sock = -1; + return false; + } + disableNagle(sock); + return true; +} + +bool MessagingPort::recv(Message& m) { + mmm( cout << "* recv() sock:" << this->sock << endl; ) + int len = -1; + + char *lenbuf = (char *) &len; + int lft = 4; + while( 1 ) { + int x = ::recv(sock, lenbuf, lft, 0); + if( x == 0 ) { + cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; + m.reset(); + return false; + } + if( x < 0 ) { + cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString()<<endl; + m.reset(); + return false; + } + lft -= x; + if( lft == 0 ) + break; + lenbuf += x; + cout << "MessagingPort::recv(): got " << x << " bytes wanted 4, lft=" << lft << endl; + assert( lft > 0 ); + } + +// assert( x == 4 ); + + assert( len >= 0 && len <= 16000000 ); + + int z = (len+1023)&0xfffffc00; assert(z>=len); + MsgData *md = (MsgData *) malloc(z); + md->len = len; + + if ( len <= 0 ){ + cout << "got a length of 0, something is wrong" << endl; + return false; + } + + char *p = (char *) &md->id; + int left = len -4; + while( 1 ) { + int x = ::recv(sock, p, left, 0); + if( x == 0 ) { + cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; + m.reset(); + return false; + } + if( x < 0 ) { + cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString() << endl; + m.reset(); + return false; + } + left -= x; + p += x; + if( left <= 0 ) + break; + } + + m.setData(md, true); + return true; +} + +void MessagingPort::reply(Message& received, Message& response) { + say(received.from, response, received.data->id); +} + +bool MessagingPort::call(SockAddr& to, Message& toSend, Message& response) { + mmm( cout << "*call()" << endl; ) + MSGID old = toSend.data->id; + say(to, toSend); + while( 1 ) { + bool ok = recv(response); + if( !ok ) + return false; + //cout << "got response: " << response.data->responseTo << endl; + if( response.data->responseTo == toSend.data->id ) + break; + cout << "********************" << endl; + cout << "ERROR: MessagingPort::call() wrong id got:" << response.data->responseTo << " expect:" << toSend.data->id << endl; + cout << " old:" << old << endl; + cout << " response msgid:" << response.data->id << endl; + cout << " response len: " << response.data->len << endl; + assert(false); + response.reset(); + } + mmm( cout << "*call() end" << endl; ) + return true; +} + +void MessagingPort::say(SockAddr& to, Message& toSend, int responseTo) { + mmm( cout << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) + MSGID msgid = NextMsgId; + ++NextMsgId; + toSend.data->id = msgid; + toSend.data->responseTo = responseTo; + int x = ::send(sock, (char *) toSend.data, toSend.data->len, 0); + if( x <= 0 ) { + cout << "MessagingPort::say: send() error " << errno << ' ' << farEnd.toString() << endl; + } +} diff --git a/grid/message.h b/grid/message.h index 1395fa51e0a..604e9a67343 100644 --- a/grid/message.h +++ b/grid/message.h @@ -1,129 +1,129 @@ -// message.h
-
-#pragma once
-
-#include "../util/sock.h"
-
-class Message;
-class MessagingPort;
-typedef WrappingInt MSGID;
-const int DBPort = 27017;
-
-class Listener {
-public:
- Listener(int p) : port(p) { }
- void listen(); // never returns (start a thread)
-
- /* spawn a thread, etc., then return */
- virtual void accepted(MessagingPort *mp) = 0;
-private:
- int port;
-};
-
-class AbstractMessagingPort {
-public:
- virtual void reply(Message& received, Message& response) = 0;
-};
-
-class MessagingPort : public AbstractMessagingPort {
-public:
- MessagingPort(int sock, SockAddr& farEnd);
- MessagingPort();
- ~MessagingPort();
-
- void shutdown();
-
- bool connect(SockAddr& farEnd);
-
- /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
- also, the Message data will go out of scope on the subsequent recv call.
- */
- bool recv(Message& m);
- void reply(Message& received, Message& response);
- bool call(SockAddr& to, Message& toSend, Message& response);
- void say(SockAddr& to, Message& toSend, int responseTo = -1);
-
-private:
- int sock;
- SockAddr farEnd;
-};
-
-#pragma pack(push)
-#pragma pack(1)
-
-enum Operations {
- opReply = 1, /* reply. responseTo is set. */
-
- dbMsg = 1000, /* generic msg command followed by a string */
-
- dbUpdate = 2001, /* update object */
- dbInsert = 2002,
-// dbGetByOID = 2003,
- dbQuery = 2004,
- dbGetMore = 2005,
- dbDelete = 2006,
- dbKillCursors = 2007
-};
-
-struct MsgData {
- int len; /* len of the msg, including this field */
- MSGID id; /* request/reply id's match... */
- int responseTo; /* id of the message we are responding to */
- int operation;
- char _data[4];
-
- int dataLen();
-};
-const int MsgDataHeaderSize = sizeof(MsgData) - 4;
-inline int MsgData::dataLen() { return len - MsgDataHeaderSize; }
-
-#pragma pack(pop)
-
-class Message {
-public:
- Message() { data = 0; freeIt = false; }
- Message( void * _data , bool _freeIt ){ data = (MsgData*)_data; freeIt = _freeIt; };
- ~Message() { reset(); }
-
- SockAddr from;
- MsgData *data;
-
- Message& operator=(Message& r) {
- assert( data == 0 );
- data = r.data;
- assert( r.freeIt );
- r.freeIt = false;
- r.data = 0;
- freeIt = true;
- return *this;
- }
-
- void reset() {
- if( freeIt && data )
- free(data);
- data = 0; freeIt = false;
- }
-
- void setData(MsgData *d, bool _freeIt) {
- assert( data == 0 );
- freeIt = _freeIt;
- data = d;
- }
- void setData(int operation, const char *msgtxt) {
- setData(operation, msgtxt, strlen(msgtxt)+1);
- }
- void setData(int operation, const char *msgdata, int len) {
- assert(data == 0);
- int dataLen = len + sizeof(MsgData) - 4;
- MsgData *d = (MsgData *) malloc(dataLen);
- memcpy(d->_data, msgdata, len);
- d->len = dataLen;
- d->operation = operation;
- freeIt= true;
- data = d;
- }
-
-private:
- bool freeIt;
-};
-
+// message.h + +#pragma once + +#include "../util/sock.h" + +class Message; +class MessagingPort; +typedef WrappingInt MSGID; +const int DBPort = 27017; + +class Listener { +public: + Listener(int p) : port(p) { } + void listen(); // never returns (start a thread) + + /* spawn a thread, etc., then return */ + virtual void accepted(MessagingPort *mp) = 0; +private: + int port; +}; + +class AbstractMessagingPort { +public: + virtual void reply(Message& received, Message& response) = 0; +}; + +class MessagingPort : public AbstractMessagingPort { +public: + MessagingPort(int sock, SockAddr& farEnd); + MessagingPort(); + ~MessagingPort(); + + void shutdown(); + + bool connect(SockAddr& farEnd); + + /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's. + also, the Message data will go out of scope on the subsequent recv call. + */ + bool recv(Message& m); + void reply(Message& received, Message& response); + bool call(SockAddr& to, Message& toSend, Message& response); + void say(SockAddr& to, Message& toSend, int responseTo = -1); + +private: + int sock; + SockAddr farEnd; +}; + +#pragma pack(push) +#pragma pack(1) + +enum Operations { + opReply = 1, /* reply. responseTo is set. */ + + dbMsg = 1000, /* generic msg command followed by a string */ + + dbUpdate = 2001, /* update object */ + dbInsert = 2002, +// dbGetByOID = 2003, + dbQuery = 2004, + dbGetMore = 2005, + dbDelete = 2006, + dbKillCursors = 2007 +}; + +struct MsgData { + int len; /* len of the msg, including this field */ + MSGID id; /* request/reply id's match... */ + int responseTo; /* id of the message we are responding to */ + int operation; + char _data[4]; + + int dataLen(); +}; +const int MsgDataHeaderSize = sizeof(MsgData) - 4; +inline int MsgData::dataLen() { return len - MsgDataHeaderSize; } + +#pragma pack(pop) + +class Message { +public: + Message() { data = 0; freeIt = false; } + Message( void * _data , bool _freeIt ){ data = (MsgData*)_data; freeIt = _freeIt; }; + ~Message() { reset(); } + + SockAddr from; + MsgData *data; + + Message& operator=(Message& r) { + assert( data == 0 ); + data = r.data; + assert( r.freeIt ); + r.freeIt = false; + r.data = 0; + freeIt = true; + return *this; + } + + void reset() { + if( freeIt && data ) + free(data); + data = 0; freeIt = false; + } + + void setData(MsgData *d, bool _freeIt) { + assert( data == 0 ); + freeIt = _freeIt; + data = d; + } + void setData(int operation, const char *msgtxt) { + setData(operation, msgtxt, strlen(msgtxt)+1); + } + void setData(int operation, const char *msgdata, int len) { + assert(data == 0); + int dataLen = len + sizeof(MsgData) - 4; + MsgData *d = (MsgData *) malloc(dataLen); + memcpy(d->_data, msgdata, len); + d->len = dataLen; + d->operation = operation; + freeIt= true; + data = d; + } + +private: + bool freeIt; +}; + diff --git a/grid/protocol.h b/grid/protocol.h index f98c6cde1f4..e7d0298ec9b 100644 --- a/grid/protocol.h +++ b/grid/protocol.h @@ -1,261 +1,261 @@ -// protocol.h
-
-#pragma once
-
-NOT USED
-
-#include "boost/thread/mutex.hpp"
-#include "boost/thread/condition.hpp"
-#include "../util/sock.h"
-#include "../util/goodies.h"
-
-typedef WrappingInt MSGID;
-
-struct Fragment;
-
-#if 0
-#define ptrace(x)
-#else
-#define ptrace(x) { cout << curTimeMillis() % 10000; x }
-#endif
-
-#if 1
-#define etrace(x)
-#else
-#define etrace(x) { cout << curTimeMillis() % 10000; x }
-#endif
-
-class F; // fragment
-class MR; // message. R=receiver side.
-class CR; // connection receiver side
-class MS; // message S=sender side.
-class CS; // connection sender side
-class ProtocolConnection; // connection overall
-
-/* ip:port:channel
- We have one receiver thread per process (per ip address destination), and then
- multiplex messages out to multiple connections(generally one per thread) which
- each have a 'channel'.
-*/
-class EndPoint {
-public:
- EndPoint() : channel(0) { }
- int channel;
- SockAddr sa;
- bool operator<(const EndPoint& r) const {
- if( channel != r.channel )
- return channel < r.channel;
- return sa < r.sa;
- }
- string toString() {
- stringstream out;
- out << sa.toString() << ':';
- if( channel == -2 ) out << "ANYCHANNEL";
- else if( channel == -1 ) out << "AUTOASSIGNCHANNEL";
- else out << channel;
- return out.str();
- }
-};
-
-/* the double underscore stuff here is the actual implementation glue.
- wanted to keep that stuff clean and separate. so put your implementation
- of these in protoimpl.h.
-*/
-
-void __sendRESET(ProtocolConnection *pc);
-
-// sender ->
-void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *, bool retran=false); // transmit a fragment
-void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to, MSGID msgid, int fragNo); // transmit the REQUEST ACK msg
-
-// receiver ->
-void __sendACK(ProtocolConnection *pc, MSGID msgid); // transmit ACK
-void __sendMISSING(ProtocolConnection *pc, EndPoint& to, MSGID msgid, vector<short>& ids); // transmit MISSING
-
-// -> receiver
-F* __recv(UDPConnection& c, SockAddr& from); /* recv from socket a fragment and pass back */
-
-class F {
-public:
- F(Fragment *f);
- ~F();
- int __num(); //frag #
- int __len();
- MSGID __msgid();
- int __channel();
- bool __isREQUESTACK(); // if true, this is just a request for acknowledgement not real data
- int __firstFragMsgLen(); // only works on first fragment
-
- // sender side:
- bool __isACK(); // if true, this is an ack of a message
- bool __isMISSING(); // if true, list of frags to retransmit
- short* __getMissing(int& n); // get the missing fragno list
-
- Fragment *internals;
- enum { NORMAL, ACK, MISSING, REQUESTACK, RESET } op;
-};
-
-class MR {
-public:
- MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from);
- ~MR() { freeFrags(); }
- void freeFrags() {
- for( unsigned i = 0; i < f.size(); i++ )
- delete f[i];
- }
- bool got(F *f, EndPoint& from); // received a fragment
- bool gotFirst() { return f[0] != 0; }
- ProtocolConnection& pc;
- void removeFromReceivingList();
- bool complete();
- const MSGID msgid;
- int n() { return f.size(); }
-public:
- int messageLenExpected;
- int nExpected, nReceived;
- void reportMissings(bool reportAll);
- vector<F*> f;
- vector<unsigned> reportTimes;
- EndPoint from;
-};
-
-/* this is for knowing what is already received. we might get dup packets later and need
- to ignore them. */
-class MsgTracker {
-public:
- std::list<MSGID> recentlyReceivedList;
- std::set<MSGID> recentlyReceived;
- MSGID lastFullyReceived;
-
- void reset() {
- recentlyReceivedList.clear();
- recentlyReceived.clear();
- lastFullyReceived = 0;
- }
-
- void got(MSGID m) {
- unsigned sz = recentlyReceived.size();
- if( sz > 256 ) {
- recentlyReceived.erase(recentlyReceivedList.front());
- recentlyReceivedList.pop_front();
- }
- recentlyReceivedList.push_back(m);
- recentlyReceived.insert(m);
- if( m > lastFullyReceived || sz == 0 )
- lastFullyReceived = m;
- }
-};
-
-class CR {
- friend class MR;
-public:
- ~CR() { ptrace( cout << ".warning: ~CR() not implemented" << endl; ) }
- CR(ProtocolConnection& _pc) : pc(_pc) { }
- MR* recv();
-public:
- MR* getPendingMsg(F *fr);
- bool oldMessageId(int channel, MSGID m);
- void queueReceived(MR*);
-
- ProtocolConnection& pc;
- boost::condition receivedSome;
- vector<MR*> received; /* ready to dequeue and use */
- map<int,MR*> pendingMessages; /* partly received msgs */
- MsgTracker oldMsgTracker;
-};
-
-/* -- sender side ------------------------------------------------*/
-
-class CS {
-public:
- CS(ProtocolConnection& _pc);
-
- ProtocolConnection& pc;
- vector<MS*> pendingSend;
- boost::condition msgSent;
- void resetIt();
-
- double delayMax;
- double delay;
- void delayGotMissing() {
- double delayOld = delay;
- if( delay == 0 )
- delay = 2.0;
- else
- delay = delay * 1.25;
- if( delay > delayMax ) delay = delayMax;
- if( delay != delayOld )
- cout << ".DELAY INCREASED TO " << delay << endl;
- }
- void delaySentMsg() {
- if( delay != 0.0 ) {
- delay = delay * 0.5;
- if( delay<0.5 ) delay = 0;
- cout << ".DELAY DECREASED TO " << delay << endl;
- }
- }
-};
-
-typedef map<EndPoint,ProtocolConnection*> EndPointToPC;
-extern EndPointToPC pcMap; /* the *far* endpoint -> pc */
-
-/* -- overall Connection object ----------------------------------*/
-
-#pragma warning( disable: 4355 )
-
-class ProtocolConnection {
-public:
- string toString();
-
- ProtocolConnection(ProtocolConnection& par, EndPoint& to);
- ProtocolConnection(UDPConnection& c, EndPoint& e, SockAddr *_farEnd);
- ~ProtocolConnection();
-
- void shutdown();
- bool acceptAnyChannel() const;
- UDPConnection& udpConnection;
- /* note the channel for myEnd might be "any" for the any pc -
- so you can't use that channel for sending. Use MS/MR
- for that.
- */
- EndPoint myEnd;
- EndPoint farEnd;
-
- /* if this was instantiated automatically for an acceptAnyChannel(),
- keep a ptr back to it and queue received msgs there.
- */
- ProtocolConnection *parent;
-
- CR cr;
- CS cs;
- bool first; // true if yet to send first message on this conn
-
-private:
- void init();
-};
-
-/* -- sender side ------------------------------------------------*/
-
-class MS {
-public:
- MS(ProtocolConnection *_pc, EndPoint &_to, MSGID _msgid) :
- pc(_pc), to(_to), msgid(_msgid), complainInterval(50) { }
- ~MS() {
- for( unsigned i = 0; i < fragments.size(); i++ )
- delete fragments[i];
- }
-
- /* init fragments, then call this */
- void send();
-
- vector<F*> fragments;
-
- /* request retrainsmissions. */
- bool complain(unsigned now);
-
- ProtocolConnection* pc;
- EndPoint to;
- const MSGID msgid;
- unsigned lastComplainTime;
- unsigned complainInterval;
-};
+// protocol.h + +#pragma once + +NOT USED + +#include "boost/thread/mutex.hpp" +#include "boost/thread/condition.hpp" +#include "../util/sock.h" +#include "../util/goodies.h" + +typedef WrappingInt MSGID; + +struct Fragment; + +#if 0 +#define ptrace(x) +#else +#define ptrace(x) { cout << curTimeMillis() % 10000; x } +#endif + +#if 1 +#define etrace(x) +#else +#define etrace(x) { cout << curTimeMillis() % 10000; x } +#endif + +class F; // fragment +class MR; // message. R=receiver side. +class CR; // connection receiver side +class MS; // message S=sender side. +class CS; // connection sender side +class ProtocolConnection; // connection overall + +/* ip:port:channel + We have one receiver thread per process (per ip address destination), and then + multiplex messages out to multiple connections(generally one per thread) which + each have a 'channel'. +*/ +class EndPoint { +public: + EndPoint() : channel(0) { } + int channel; + SockAddr sa; + bool operator<(const EndPoint& r) const { + if( channel != r.channel ) + return channel < r.channel; + return sa < r.sa; + } + string toString() { + stringstream out; + out << sa.toString() << ':'; + if( channel == -2 ) out << "ANYCHANNEL"; + else if( channel == -1 ) out << "AUTOASSIGNCHANNEL"; + else out << channel; + return out.str(); + } +}; + +/* the double underscore stuff here is the actual implementation glue. + wanted to keep that stuff clean and separate. so put your implementation + of these in protoimpl.h. +*/ + +void __sendRESET(ProtocolConnection *pc); + +// sender -> +void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *, bool retran=false); // transmit a fragment +void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to, MSGID msgid, int fragNo); // transmit the REQUEST ACK msg + +// receiver -> +void __sendACK(ProtocolConnection *pc, MSGID msgid); // transmit ACK +void __sendMISSING(ProtocolConnection *pc, EndPoint& to, MSGID msgid, vector<short>& ids); // transmit MISSING + +// -> receiver +F* __recv(UDPConnection& c, SockAddr& from); /* recv from socket a fragment and pass back */ + +class F { +public: + F(Fragment *f); + ~F(); + int __num(); //frag # + int __len(); + MSGID __msgid(); + int __channel(); + bool __isREQUESTACK(); // if true, this is just a request for acknowledgement not real data + int __firstFragMsgLen(); // only works on first fragment + + // sender side: + bool __isACK(); // if true, this is an ack of a message + bool __isMISSING(); // if true, list of frags to retransmit + short* __getMissing(int& n); // get the missing fragno list + + Fragment *internals; + enum { NORMAL, ACK, MISSING, REQUESTACK, RESET } op; +}; + +class MR { +public: + MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from); + ~MR() { freeFrags(); } + void freeFrags() { + for( unsigned i = 0; i < f.size(); i++ ) + delete f[i]; + } + bool got(F *f, EndPoint& from); // received a fragment + bool gotFirst() { return f[0] != 0; } + ProtocolConnection& pc; + void removeFromReceivingList(); + bool complete(); + const MSGID msgid; + int n() { return f.size(); } +public: + int messageLenExpected; + int nExpected, nReceived; + void reportMissings(bool reportAll); + vector<F*> f; + vector<unsigned> reportTimes; + EndPoint from; +}; + +/* this is for knowing what is already received. we might get dup packets later and need + to ignore them. */ +class MsgTracker { +public: + std::list<MSGID> recentlyReceivedList; + std::set<MSGID> recentlyReceived; + MSGID lastFullyReceived; + + void reset() { + recentlyReceivedList.clear(); + recentlyReceived.clear(); + lastFullyReceived = 0; + } + + void got(MSGID m) { + unsigned sz = recentlyReceived.size(); + if( sz > 256 ) { + recentlyReceived.erase(recentlyReceivedList.front()); + recentlyReceivedList.pop_front(); + } + recentlyReceivedList.push_back(m); + recentlyReceived.insert(m); + if( m > lastFullyReceived || sz == 0 ) + lastFullyReceived = m; + } +}; + +class CR { + friend class MR; +public: + ~CR() { ptrace( cout << ".warning: ~CR() not implemented" << endl; ) } + CR(ProtocolConnection& _pc) : pc(_pc) { } + MR* recv(); +public: + MR* getPendingMsg(F *fr); + bool oldMessageId(int channel, MSGID m); + void queueReceived(MR*); + + ProtocolConnection& pc; + boost::condition receivedSome; + vector<MR*> received; /* ready to dequeue and use */ + map<int,MR*> pendingMessages; /* partly received msgs */ + MsgTracker oldMsgTracker; +}; + +/* -- sender side ------------------------------------------------*/ + +class CS { +public: + CS(ProtocolConnection& _pc); + + ProtocolConnection& pc; + vector<MS*> pendingSend; + boost::condition msgSent; + void resetIt(); + + double delayMax; + double delay; + void delayGotMissing() { + double delayOld = delay; + if( delay == 0 ) + delay = 2.0; + else + delay = delay * 1.25; + if( delay > delayMax ) delay = delayMax; + if( delay != delayOld ) + cout << ".DELAY INCREASED TO " << delay << endl; + } + void delaySentMsg() { + if( delay != 0.0 ) { + delay = delay * 0.5; + if( delay<0.5 ) delay = 0; + cout << ".DELAY DECREASED TO " << delay << endl; + } + } +}; + +typedef map<EndPoint,ProtocolConnection*> EndPointToPC; +extern EndPointToPC pcMap; /* the *far* endpoint -> pc */ + +/* -- overall Connection object ----------------------------------*/ + +#pragma warning( disable: 4355 ) + +class ProtocolConnection { +public: + string toString(); + + ProtocolConnection(ProtocolConnection& par, EndPoint& to); + ProtocolConnection(UDPConnection& c, EndPoint& e, SockAddr *_farEnd); + ~ProtocolConnection(); + + void shutdown(); + bool acceptAnyChannel() const; + UDPConnection& udpConnection; + /* note the channel for myEnd might be "any" for the any pc - + so you can't use that channel for sending. Use MS/MR + for that. + */ + EndPoint myEnd; + EndPoint farEnd; + + /* if this was instantiated automatically for an acceptAnyChannel(), + keep a ptr back to it and queue received msgs there. + */ + ProtocolConnection *parent; + + CR cr; + CS cs; + bool first; // true if yet to send first message on this conn + +private: + void init(); +}; + +/* -- sender side ------------------------------------------------*/ + +class MS { +public: + MS(ProtocolConnection *_pc, EndPoint &_to, MSGID _msgid) : + pc(_pc), to(_to), msgid(_msgid), complainInterval(50) { } + ~MS() { + for( unsigned i = 0; i < fragments.size(); i++ ) + delete fragments[i]; + } + + /* init fragments, then call this */ + void send(); + + vector<F*> fragments; + + /* request retrainsmissions. */ + bool complain(unsigned now); + + ProtocolConnection* pc; + EndPoint to; + const MSGID msgid; + unsigned lastComplainTime; + unsigned complainInterval; +}; diff --git a/grid/protoimpl.h b/grid/protoimpl.h index 5ad19a47250..9ca5277e527 100644 --- a/grid/protoimpl.h +++ b/grid/protoimpl.h @@ -1,242 +1,242 @@ -// protoimpl.h
-
-#pragma once
-
-/* packet dumping level of detail. */
-const bool dumpPackets = false; // this must be true to get anything at all
-const bool dumpIP = false; // output the ip address
-const bool dumpBytesDetailed = false; // more data output
-
-#include "message.h"
-
-extern boost::mutex coutmutex;
-
-const int FragMax = 1480;
-const int FragHeader = 10;
-const int MSS = FragMax - FragHeader;
-
-#pragma pack(push)
-#pragma pack(1)
-
-struct Fragment {
- enum { MinFragmentLen = FragHeader + 1 };
- MSGID msgId;
- short channel;
- short fragmentLen;
- short fragmentNo;
- char data[16];
- int fragmentDataLen() { return fragmentLen - FragHeader; }
- char* fragmentData() { return data; }
-
- bool ok(int nRead) {
- if( nRead < MinFragmentLen || fragmentLen > nRead || fragmentLen < MinFragmentLen ) {
- ptrace( cout << ".recv: fragment bad. fragmentLen:" << fragmentLen << " nRead:" << nRead << endl; )
- return false;
- }
- if( fragmentNo == 0 && fragmentLen < MinFragmentLen + MsgDataHeaderSize ) {
- ptrace( cout << ".recv: bad first fragment. fragmentLen:" << fragmentLen << endl; )
- return false;
- }
- return true;
- }
-
- MsgData* startOfMsgData() { assert(fragmentNo == 0); return (MsgData *) data; }
-};
-#pragma pack(pop)
-
-inline void DUMP(Fragment& f, SockAddr& t, const char *tabs) {
- if( !dumpPackets )
- return;
- cout << tabs << curTimeMillis() % 10000 << ' ';
- short s = f.fragmentNo;
- if( s == -32768 )
- cout << "ACK M:" << f.msgId % 1000;
- else if( s == -32767 )
- cout << "MISSING";
- else if( s == -32766 )
- cout << "RESET ch:" << f.channel;
- else if( s < 0 )
- cout << "REQUESTACK";
- else
- cout << '#' << s << ' ' << f.fragmentLen << " M:" << f.msgId % 1000;
- cout << ' ';
- if( dumpIP )
- cout << t.toString();
-}
-
-inline void DUMPDATA(Fragment& f, const char *tabs) {
- if( !dumpPackets )
- return;
- if( f.fragmentNo >= 0 ) {
- cout << '\n' << tabs;
- int x = f.fragmentDataLen();
- if( dumpBytesDetailed ) {
- char *p = (char *) &f;
- cout << hex << *((unsigned*)p) << ' '; p+=4;
- cout << *((short*)p) << ' '; p+=2;
- cout << *((short*)p) << ' '; p+=2;
- cout << *((short*)p) << '|'; p+=2;
- if( x < 16 ) cout << "???";
- else {
- for( int i = 0; i < 4; i++ ) { // MSGDATA
- cout << *((unsigned*)p);
- cout << (i < 3 ? ' ' : '|');
- p += 4;
- }
- cout << '\n' << tabs;
- x -= 16;
- if( x > 32 ) x = 32;
- while( x-- > 0 ) { cout << (unsigned) (unsigned char) *p++ << ' '; }
- }
- }
- else {
- char *p = f.data;
- if( f.fragmentNo == 0 ) {
- p += 16; x -= 16;
- }
- if( x > 28 ) x = 28;
- for( int i = 0; i < x; i++ ) {
- if( *p == 0 ) cout << (char) 0xb0;
- else cout << (*p >= 32 ? *p : '.');
- p++;
- }
- }
- }
- cout << dec << endl;
-}
-
-inline void SEND(UDPConnection& c, Fragment &f, SockAddr& to, const char *extra="") {
- lock lk(coutmutex);
- DUMP(f, to, "\t\t\t\t\t>");
- c.sendto((char *) &f, f.fragmentLen, to);
- if( dumpPackets )
- cout << extra;
- DUMPDATA(f, "\t\t\t\t\t ");
-}
-
-// sender ->
-inline void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *f, bool retran) {
- assert( f->internals->channel == to.channel );
- ptrace( cout << ".sendfrag " << f->__num() << ' ' << retran << endl; )
- SEND(pc->udpConnection, *f->internals, to.sa, retran ? " retran" : "");
-}
-
-inline void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to,
- MSGID msgid, int fragNo) {
- Fragment f;
- f.msgId = msgid;
- f.channel = to.channel; assert( f.channel >= 0 );
- f.fragmentNo = ((short) -fragNo) -1;
- f.fragmentLen = FragHeader;
- ptrace( cout << ".requesting ack, fragno=" << f.fragmentNo << " msg:" << f.msgId << ' ' << to.toString() << endl; )
- SEND(pc->udpConnection, f, to.sa);
-}
-
-// receiver ->
-inline void __sendACK(ProtocolConnection *pc, MSGID msgid) {
- ptrace( cout << "...__sendACK() to:" << pc->farEnd.toString() << " msg:" << msgid << endl; )
- Fragment f;
- f.msgId = msgid;
- f.channel = pc->farEnd.channel; assert( f.channel >= 0 );
- f.fragmentNo = -32768;
- f.fragmentLen = FragHeader;
- SEND(pc->udpConnection, f, pc->farEnd.sa);
-}
-
-/* this is to clear old state for the channel in terms of what msgids are
- already sent.
-*/
-inline void __sendRESET(ProtocolConnection *pc) {
- Fragment f;
- f.msgId = -1;
- f.channel = pc->farEnd.channel; assert( f.channel >= 0 );
- f.fragmentNo = -32766;
- f.fragmentLen = FragHeader;
- ptrace( cout << "...__sendRESET() to:" << pc->farEnd.toString() << endl; )
- SEND(pc->udpConnection, f, pc->farEnd.sa);
-}
-
-inline void __sendMISSING(ProtocolConnection *pc, EndPoint& to,
- MSGID msgid, vector<short>& ids) {
- int n = ids.size();
- ptrace( cout << "..sendMISSING n:" << n << " firstmissing:" << ids[0] << " last:" << ids[ids.size()-1] << to.toString() << endl; )
- if( n > 256 ) {
- ptrace( cout << "\t..sendMISSING limiting to 256 ids" << endl; )
- n = 256;
- }
- Fragment *f = (Fragment*) malloc(FragHeader + n*2);
- f->msgId = msgid;
- f->channel = to.channel; assert( f->channel >= 0 );
- f->fragmentNo = -32767;
- f->fragmentLen = FragHeader + n*2;
- short *s = (short *) f->data;
- for( int i = 0; i < n; i++ )
- *s++ = ids[i];
-// ptrace( cout << "...sendMISSING fraglen:" << f->fragmentLen << endl; )
- SEND(pc->udpConnection, *f, to.sa);
- free(f);
-}
-
-// -> receiver
-inline F* __recv(UDPConnection& c, SockAddr& from) {
- Fragment *f = (Fragment *) malloc(MaxMTU);
- int n;
- while( 1 ) {
-// n = c.recvfrom((char*) f, c.mtu(), from);
- n = c.recvfrom((char*) f, MaxMTU, from);
-// cout << "recvfrom returned " << n << endl;
- if( n >= 0 )
- break;
- if( !goingAway ) {
- cout << ".recvfrom returned error " << getLastError() << " socket:" << c.sock << endl;
- cout << "sleeping 2 seconds " << endl;
- sleepsecs(2);
- }
- }
- assert( f->fragmentLen == n );
- if( f->fragmentNo > 0 ) {
- // don't waste tons of space if the maxmtu is 16k but we get 1480
- unsigned newsz = (f->fragmentLen + 255) & 0xffffff00;
- if( newsz < MaxMTU )
- f = (Fragment *) realloc(f, newsz);
- }
- {
- lock lk(coutmutex);
- DUMP(*f, from, "\t\t\t\t\t\t\t\t\t\t<");
- DUMPDATA(*f, "\t\t\t\t\t\t\t\t\t\t ");
- }
- return new F(f);
-}
-
-inline F::F(Fragment *f) : internals(f), op(NORMAL) {
- if( internals->fragmentNo < 0 ) {
- if( internals->fragmentNo == -32768 ) {
- op = ACK;
- ptrace( cout << ".got ACK msg:" << internals->msgId << endl; )
- } else if( internals->fragmentNo == -32767 ) {
- op = MISSING;
- ptrace( cout << ".got MISSING" << endl; )
- } else if( internals->fragmentNo == -32766 ) {
- op = RESET;
- } else {
- op = REQUESTACK;
- internals->fragmentNo = -(internals->fragmentNo+1);
- ptrace( cout << ".got REQUESTACK frag:" << internals->fragmentNo << " msg:" << internals->msgId << endl; )
- }
- }
-}
-inline F::~F() { free(internals); internals=0; }
-inline int F::__num() { return internals->fragmentNo; }
-inline int F::__len() { return internals->fragmentLen; }
-inline MSGID F::__msgid() { return internals->msgId; }
-inline int F::__channel() { return internals->channel; }
-inline bool F::__isREQUESTACK() { return op == REQUESTACK; }
-inline bool F::__isACK() { return op == ACK; }
-inline bool F::__isMISSING() { return op == MISSING; }
-inline short* F::__getMissing(int& n) {
- n = internals->fragmentDataLen() / 2;
- return (short *) internals->fragmentData();
-}
-inline int F::__firstFragMsgLen() {
- return internals->startOfMsgData()->len;
-}
+// protoimpl.h + +#pragma once + +/* packet dumping level of detail. */ +const bool dumpPackets = false; // this must be true to get anything at all +const bool dumpIP = false; // output the ip address +const bool dumpBytesDetailed = false; // more data output + +#include "message.h" + +extern boost::mutex coutmutex; + +const int FragMax = 1480; +const int FragHeader = 10; +const int MSS = FragMax - FragHeader; + +#pragma pack(push) +#pragma pack(1) + +struct Fragment { + enum { MinFragmentLen = FragHeader + 1 }; + MSGID msgId; + short channel; + short fragmentLen; + short fragmentNo; + char data[16]; + int fragmentDataLen() { return fragmentLen - FragHeader; } + char* fragmentData() { return data; } + + bool ok(int nRead) { + if( nRead < MinFragmentLen || fragmentLen > nRead || fragmentLen < MinFragmentLen ) { + ptrace( cout << ".recv: fragment bad. fragmentLen:" << fragmentLen << " nRead:" << nRead << endl; ) + return false; + } + if( fragmentNo == 0 && fragmentLen < MinFragmentLen + MsgDataHeaderSize ) { + ptrace( cout << ".recv: bad first fragment. fragmentLen:" << fragmentLen << endl; ) + return false; + } + return true; + } + + MsgData* startOfMsgData() { assert(fragmentNo == 0); return (MsgData *) data; } +}; +#pragma pack(pop) + +inline void DUMP(Fragment& f, SockAddr& t, const char *tabs) { + if( !dumpPackets ) + return; + cout << tabs << curTimeMillis() % 10000 << ' '; + short s = f.fragmentNo; + if( s == -32768 ) + cout << "ACK M:" << f.msgId % 1000; + else if( s == -32767 ) + cout << "MISSING"; + else if( s == -32766 ) + cout << "RESET ch:" << f.channel; + else if( s < 0 ) + cout << "REQUESTACK"; + else + cout << '#' << s << ' ' << f.fragmentLen << " M:" << f.msgId % 1000; + cout << ' '; + if( dumpIP ) + cout << t.toString(); +} + +inline void DUMPDATA(Fragment& f, const char *tabs) { + if( !dumpPackets ) + return; + if( f.fragmentNo >= 0 ) { + cout << '\n' << tabs; + int x = f.fragmentDataLen(); + if( dumpBytesDetailed ) { + char *p = (char *) &f; + cout << hex << *((unsigned*)p) << ' '; p+=4; + cout << *((short*)p) << ' '; p+=2; + cout << *((short*)p) << ' '; p+=2; + cout << *((short*)p) << '|'; p+=2; + if( x < 16 ) cout << "???"; + else { + for( int i = 0; i < 4; i++ ) { // MSGDATA + cout << *((unsigned*)p); + cout << (i < 3 ? ' ' : '|'); + p += 4; + } + cout << '\n' << tabs; + x -= 16; + if( x > 32 ) x = 32; + while( x-- > 0 ) { cout << (unsigned) (unsigned char) *p++ << ' '; } + } + } + else { + char *p = f.data; + if( f.fragmentNo == 0 ) { + p += 16; x -= 16; + } + if( x > 28 ) x = 28; + for( int i = 0; i < x; i++ ) { + if( *p == 0 ) cout << (char) 0xb0; + else cout << (*p >= 32 ? *p : '.'); + p++; + } + } + } + cout << dec << endl; +} + +inline void SEND(UDPConnection& c, Fragment &f, SockAddr& to, const char *extra="") { + lock lk(coutmutex); + DUMP(f, to, "\t\t\t\t\t>"); + c.sendto((char *) &f, f.fragmentLen, to); + if( dumpPackets ) + cout << extra; + DUMPDATA(f, "\t\t\t\t\t "); +} + +// sender -> +inline void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *f, bool retran) { + assert( f->internals->channel == to.channel ); + ptrace( cout << ".sendfrag " << f->__num() << ' ' << retran << endl; ) + SEND(pc->udpConnection, *f->internals, to.sa, retran ? " retran" : ""); +} + +inline void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to, + MSGID msgid, int fragNo) { + Fragment f; + f.msgId = msgid; + f.channel = to.channel; assert( f.channel >= 0 ); + f.fragmentNo = ((short) -fragNo) -1; + f.fragmentLen = FragHeader; + ptrace( cout << ".requesting ack, fragno=" << f.fragmentNo << " msg:" << f.msgId << ' ' << to.toString() << endl; ) + SEND(pc->udpConnection, f, to.sa); +} + +// receiver -> +inline void __sendACK(ProtocolConnection *pc, MSGID msgid) { + ptrace( cout << "...__sendACK() to:" << pc->farEnd.toString() << " msg:" << msgid << endl; ) + Fragment f; + f.msgId = msgid; + f.channel = pc->farEnd.channel; assert( f.channel >= 0 ); + f.fragmentNo = -32768; + f.fragmentLen = FragHeader; + SEND(pc->udpConnection, f, pc->farEnd.sa); +} + +/* this is to clear old state for the channel in terms of what msgids are + already sent. +*/ +inline void __sendRESET(ProtocolConnection *pc) { + Fragment f; + f.msgId = -1; + f.channel = pc->farEnd.channel; assert( f.channel >= 0 ); + f.fragmentNo = -32766; + f.fragmentLen = FragHeader; + ptrace( cout << "...__sendRESET() to:" << pc->farEnd.toString() << endl; ) + SEND(pc->udpConnection, f, pc->farEnd.sa); +} + +inline void __sendMISSING(ProtocolConnection *pc, EndPoint& to, + MSGID msgid, vector<short>& ids) { + int n = ids.size(); + ptrace( cout << "..sendMISSING n:" << n << " firstmissing:" << ids[0] << " last:" << ids[ids.size()-1] << to.toString() << endl; ) + if( n > 256 ) { + ptrace( cout << "\t..sendMISSING limiting to 256 ids" << endl; ) + n = 256; + } + Fragment *f = (Fragment*) malloc(FragHeader + n*2); + f->msgId = msgid; + f->channel = to.channel; assert( f->channel >= 0 ); + f->fragmentNo = -32767; + f->fragmentLen = FragHeader + n*2; + short *s = (short *) f->data; + for( int i = 0; i < n; i++ ) + *s++ = ids[i]; +// ptrace( cout << "...sendMISSING fraglen:" << f->fragmentLen << endl; ) + SEND(pc->udpConnection, *f, to.sa); + free(f); +} + +// -> receiver +inline F* __recv(UDPConnection& c, SockAddr& from) { + Fragment *f = (Fragment *) malloc(MaxMTU); + int n; + while( 1 ) { +// n = c.recvfrom((char*) f, c.mtu(), from); + n = c.recvfrom((char*) f, MaxMTU, from); +// cout << "recvfrom returned " << n << endl; + if( n >= 0 ) + break; + if( !goingAway ) { + cout << ".recvfrom returned error " << getLastError() << " socket:" << c.sock << endl; + cout << "sleeping 2 seconds " << endl; + sleepsecs(2); + } + } + assert( f->fragmentLen == n ); + if( f->fragmentNo > 0 ) { + // don't waste tons of space if the maxmtu is 16k but we get 1480 + unsigned newsz = (f->fragmentLen + 255) & 0xffffff00; + if( newsz < MaxMTU ) + f = (Fragment *) realloc(f, newsz); + } + { + lock lk(coutmutex); + DUMP(*f, from, "\t\t\t\t\t\t\t\t\t\t<"); + DUMPDATA(*f, "\t\t\t\t\t\t\t\t\t\t "); + } + return new F(f); +} + +inline F::F(Fragment *f) : internals(f), op(NORMAL) { + if( internals->fragmentNo < 0 ) { + if( internals->fragmentNo == -32768 ) { + op = ACK; + ptrace( cout << ".got ACK msg:" << internals->msgId << endl; ) + } else if( internals->fragmentNo == -32767 ) { + op = MISSING; + ptrace( cout << ".got MISSING" << endl; ) + } else if( internals->fragmentNo == -32766 ) { + op = RESET; + } else { + op = REQUESTACK; + internals->fragmentNo = -(internals->fragmentNo+1); + ptrace( cout << ".got REQUESTACK frag:" << internals->fragmentNo << " msg:" << internals->msgId << endl; ) + } + } +} +inline F::~F() { free(internals); internals=0; } +inline int F::__num() { return internals->fragmentNo; } +inline int F::__len() { return internals->fragmentLen; } +inline MSGID F::__msgid() { return internals->msgId; } +inline int F::__channel() { return internals->channel; } +inline bool F::__isREQUESTACK() { return op == REQUESTACK; } +inline bool F::__isACK() { return op == ACK; } +inline bool F::__isMISSING() { return op == MISSING; } +inline short* F::__getMissing(int& n) { + n = internals->fragmentDataLen() / 2; + return (short *) internals->fragmentData(); +} +inline int F::__firstFragMsgLen() { + return internals->startOfMsgData()->len; +} diff --git a/grid/protorecv.cpp b/grid/protorecv.cpp index 0639939ff08..49200d8ab56 100644 --- a/grid/protorecv.cpp +++ b/grid/protorecv.cpp @@ -1,386 +1,386 @@ -// protorecv.cpp
-
-#include "stdafx.h"
-#include "protocol.h"
-#include "boost/thread.hpp"
-#include "../util/goodies.h"
-#include "../util/sock.h"
-#include "protoimpl.h"
-#include "../db/introspect.h"
-
-boost::mutex biglock;
-boost::mutex coutmutex;
-boost::mutex threadStarterMutex;
-boost::condition threadActivate; // creating a new thread, grabbing threadUseThisOne
-ProtocolConnection *threadUseThisOne = 0;
-void receiverThread();
-
-map<SockAddr,ProtocolConnection*> firstPCForThisAddress;
-
-/* todo: eventually support multiple listeners on diff ports. not
- bothering yet.
-*/
-ProtocolConnection *any = 0;
-
-EndPointToPC pcMap;
-
-class GeneralInspector : public SingleResultObjCursor {
- Cursor* clone() { return new GeneralInspector(*this); }
- void fill() {
- b.append("version", "1.0.0.1");
- b.append("versionDesc", "none");
- b.append("nConnections", pcMap.size());
- }
-public:
- GeneralInspector() { reg("intr.general"); }
-} _geninspectorproto;
-
-#include <signal.h>
-
-/* our version of netstat */
-void sighandler(int x) {
- cout << "ProtocolConnections:" << endl;
- lock lk(biglock);
- EndPointToPC::iterator it = pcMap.begin();
- while( it != pcMap.end() ) {
- cout << " conn " << it->second->toString() << endl;
- it++;
- }
- cout << "any: ";
- if( any ) cout << any->toString();
- cout << "\ndone" << endl;
-}
-
-struct SetSignal {
- SetSignal() {
-#if !defined(_WIN32)
- signal(SIGUSR2, sighandler);
-#endif
- }
-} setSignal;
-
-string ProtocolConnection::toString() {
- stringstream out;
- out << myEnd.toString() << " <> " <<
- farEnd.toString() << " rcvd.size:" << cr.received.size() <<
- " pndngMsgs.size:" << cr.pendingMessages.size() <<
- " pndngSnd.size:" << cs.pendingSend.size();
- return out.str();
-}
-
-ProtocolConnection::~ProtocolConnection() {
- cout << ".warning: ~ProtocolConnection() not implemented (leaks mem etc)" << endl;
- if( any == this )
- any = 0;
-}
-
-void ProtocolConnection::shutdown() {
- ptrace( cout << ".shutdown()" << endl; )
- if( acceptAnyChannel() || first )
- return;
- ptrace( cout << ". to:" << to.toString() << endl; )
- __sendRESET(this);
-}
-
-inline ProtocolConnection::ProtocolConnection(ProtocolConnection& par, EndPoint& _to) :
- udpConnection(par.udpConnection), myEnd(par.myEnd), cs(*this), cr(*this)
-{
- parent = ∥
- farEnd = _to;
- first = true;
- // todo: LOCK
-
- assert(pcMap.count(farEnd) == 0);
-// pcMap[myEnd] = this;
-}
-
-inline bool ProtocolConnection::acceptAnyChannel() const {
- return myEnd.channel == MessagingPort::ANYCHANNEL;
-}
-
-inline void ProtocolConnection::init() {
- first = true;
- lock lk(biglock);
- lock tslk(threadStarterMutex);
-
- if( acceptAnyChannel() ) {
- assert( any == 0 );
- any = this;
- }
- else {
- pcMap[farEnd] = this;
- }
-
- if( firstPCForThisAddress.count(myEnd.sa) == 0 ) {
- firstPCForThisAddress[myEnd.sa] = this;
- // need a receiver thread. one per port # we listen on. shared by channels.
- boost::thread receiver(receiverThread);
- threadUseThisOne = this;
- threadActivate.notify_one();
- return;
- }
-}
-
-ProtocolConnection::ProtocolConnection(UDPConnection& c, EndPoint& e, SockAddr *_farEnd) :
- udpConnection(c), myEnd(e), cs(*this), cr(*this)
-{
- parent = this;
- if( _farEnd ) {
- farEnd.channel = myEnd.channel;
- farEnd.sa = *_farEnd;
- }
- init();
-}
-
-/* find message for fragment */
-MR* CR::getPendingMsg(F *fr) {
- MR *m;
- map<int,MR*>::iterator i = pendingMessages.find(fr->__msgid());
- if( i == pendingMessages.end() ) {
- if( pendingMessages.size() > 20 ) {
- cout << ".warning: pendingMessages.size()>20, ignoring msg until we dequeue" << endl;
- return 0;
- }
- m = new MR(&pc, fr->__msgid(), pc.farEnd);
- pendingMessages[fr->__msgid()] = m;
- }
- else
- m = i->second;
- return m;
-/*
- MR*& m = pendingMessages[fr->__msgid()];
- if( m == 0 )
- m = new MR(&pc, fr->__msgid(), fromAddr);
- return m;
-*/
-}
-
-void MR::removeFromReceivingList() {
- pc.cr.pendingMessages.erase(msgid);
-}
-
-MR::MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from) :
- pc(*_pc), msgid(_msgid), from(_from), nReceived(0)
-{
- messageLenExpected = nExpected = -1;
- f.push_back(0);
-}
-
-void MR::reportMissings(bool reportAll) {
- vector<short> missing;
- unsigned t = curTimeMillis();
- for( unsigned i = 0; i < f.size(); i++ ) {
- if( f[i] == 0 ) {
- int diff = tdiff(reportTimes[i],t);
- assert( diff >= 0 || reportTimes[i] == 0 );
- if( diff > 100 || reportTimes[i]==0 ||
- (reportAll && diff > 1) ) {
- reportTimes[i] = t;
- assert( i < 25000 );
- missing.push_back(i);
- }
- }
- }
- if( !missing.empty() )
- __sendMISSING(&pc, from, msgid, missing);
-}
-
-inline bool MR::complete() {
- if( nReceived == nExpected ) {
- assert( nExpected == (int) f.size() );
- assert( f[0] != 0 );
- return true;
- }
- return false;
-/*
- if( nExpected > (int) f.size() || nExpected == -1 )
- return false;
- for( unsigned i = 0; i < f.size(); i++ )
- if( f[i] == 0 )
- return false;
- return true;
-*/
-}
-
-/* true=msg complete */
-bool MR::got(F *frag, EndPoint& fromAddr) {
- MSGID msgid = frag->__msgid();
- int i = frag->__num();
- if( i == 544 && !frag->__isREQUESTACK() ) {
- cout << "************ GOT LAST FRAGMENT #544" << endl;
- }
- if( nExpected < 0 && i == 0 ) {
- messageLenExpected = frag->__firstFragMsgLen();
- if( messageLenExpected == frag->__len()-FragHeader )
- nExpected = 1;
- else {
- int mss = frag->__len()-FragHeader;
- assert( messageLenExpected > mss );
- nExpected = (messageLenExpected + mss - 1) / mss;
- ptrace( cout << ".got first frag, expect:" << nExpected << "packets, expectedLen:" << messageLenExpected << endl; )
- }
- }
- if( i >= (int) f.size() )
- f.resize(i+1, 0);
- if( frag->__isREQUESTACK() ) {
- ptrace( cout << "...got(): processing a REQUESTACK" << endl; )
- /* we're simply seeing if we got the data, and then acking. */
- delete frag;
- frag = 0;
- reportTimes.resize(f.size(), 0);
- if( complete() ) {
- __sendACK(&pc, msgid);
- return true;
- }
- reportMissings(true);
- return false;
- }
- else if( f[i] != 0 ) {
- ptrace( cout << "\t.dup packet i:" << i << ' ' << from.toString() << endl; )
- delete frag;
- return false;
- }
- if( frag ) {
- f[i] = frag;
- nReceived++;
- }
- reportTimes.resize(f.size(), 0);
-
- if( !complete() )
- return false;
- __sendACK(&pc, msgid);
- return true;
-
-/*
- if( f[0] == 0 || f[i] == 0 ) {
-// reportMissings(frag == 0);
- return false;
- }
- if( i+1 < nExpected ) {
-//cout << "TEMP COMMENT" << endl;
-// if( i > 0 && f[i-1] == 0 )
-// reportMissings(frag == 0);
- return false;
- }
- // last fragment received
- if( !complete() ) {
-// reportMissings(frag == 0);
- return false;
- }
- __sendACK(&pc, fromAddr, msgid);
- return frag != 0;
-*/
-}
-
-MR* CR::recv() {
- MR *x;
- {
- lock lk(biglock);
- while( received.empty() )
- receivedSome.wait(lk);
- x = received.back();
- received.pop_back();
- }
- return x;
-}
-
-// this is how we tell protosend.cpp we got these
-void gotACK(F*, ProtocolConnection *);
-void gotMISSING(F*, ProtocolConnection *);
-
-void receiverThread() {
- ProtocolConnection *startingConn; // this thread manages many; this is just initiator or the parent for acceptany
- UDPConnection *uc;
- {
- lock lk(threadStarterMutex);
- while( 1 ) {
- if( threadUseThisOne != 0 ) {
- uc = &threadUseThisOne->udpConnection;
- startingConn = threadUseThisOne;
- threadUseThisOne = 0;
- break;
- }
- threadActivate.wait(lk);
- }
- }
-
- cout << "\n.Activating a new receiverThread\n " << startingConn->toString() << '\n' << endl;
-
- EndPoint fromAddr;
- while( 1 ) {
- F *f = __recv(*uc, fromAddr.sa);
- lock l(biglock);
- fromAddr.channel = f->__channel();
- ptrace( cout << "..__recv() from:" << fromAddr.toString() << " frag:" << f->__num() << endl; )
- assert( fromAddr.channel >= 0 );
- EndPointToPC::iterator it = pcMap.find(fromAddr);
- ProtocolConnection *mypc;
- if( it == pcMap.end() ) {
- if( !startingConn->acceptAnyChannel() ) {
- cout << ".WARNING: got packet from an unknown endpoint:" << fromAddr.toString() << endl;
- cout << ". this may be ok if you just restarted" << endl;
- delete f;
- continue;
- }
- cout << ".New connection accepted from " << fromAddr.toString() << endl;
- mypc = new ProtocolConnection(*startingConn, fromAddr);
- pcMap[fromAddr] = mypc;
- }
- else
- mypc = it->second;
-
- assert( fromAddr.channel == mypc->farEnd.channel );
- MsgTracker& track = mypc->cr.oldMsgTracker;
-
- if( f->op != F::NORMAL ) {
- if( f->__isACK() ) {
- gotACK(f, mypc);
- delete f;
- continue;
- }
- if( f->__isMISSING() ) {
- gotMISSING(f, mypc);
- delete f;
- continue;
- }
- if( f->op == F::RESET ) {
- ptrace( cout << ".got RESET" << endl; )
- track.reset();
- mypc->cs.resetIt();
- delete f;
- continue;
- }
- }
-
- if( track.recentlyReceived.count(f->__msgid()) ) {
- // already done with it. ignore, other than acking.
- if( f->__isREQUESTACK() )
- __sendACK(mypc, f->__msgid());
- else {
- ptrace( cout << ".ignoring packet about msg already received msg:" << f->__msgid() << " op:" << f->op << endl; )
- }
- delete f;
- continue;
- }
-
- if( f->__msgid() <= track.lastFullyReceived && !track.recentlyReceivedList.empty() ) {
- // reconnect on an old channel?
- ptrace( cout << ".warning: strange msgid:" << f->__msgid() << " received, last:" << track->lastFullyReceived << " conn:" << fromAddr.toString() << endl; )
- }
-
- MR *m = mypc->cr.getPendingMsg(f); /* todo: optimize for single fragment case? */
- if( m == 0 ) {
- ptrace( cout << "..getPendingMsg() returns 0" << endl; )
- delete f;
- continue;
- }
- if( m->got(f, fromAddr) ) {
- track.got(m->msgid);
- m->removeFromReceivingList();
- {
- mypc->parent->cr.received.push_back(m);
- mypc->parent->cr.receivedSome.notify_one();
- }
- }
- }
-}
+// protorecv.cpp + +#include "stdafx.h" +#include "protocol.h" +#include "boost/thread.hpp" +#include "../util/goodies.h" +#include "../util/sock.h" +#include "protoimpl.h" +#include "../db/introspect.h" + +boost::mutex biglock; +boost::mutex coutmutex; +boost::mutex threadStarterMutex; +boost::condition threadActivate; // creating a new thread, grabbing threadUseThisOne +ProtocolConnection *threadUseThisOne = 0; +void receiverThread(); + +map<SockAddr,ProtocolConnection*> firstPCForThisAddress; + +/* todo: eventually support multiple listeners on diff ports. not + bothering yet. +*/ +ProtocolConnection *any = 0; + +EndPointToPC pcMap; + +class GeneralInspector : public SingleResultObjCursor { + Cursor* clone() { return new GeneralInspector(*this); } + void fill() { + b.append("version", "1.0.0.1"); + b.append("versionDesc", "none"); + b.append("nConnections", pcMap.size()); + } +public: + GeneralInspector() { reg("intr.general"); } +} _geninspectorproto; + +#include <signal.h> + +/* our version of netstat */ +void sighandler(int x) { + cout << "ProtocolConnections:" << endl; + lock lk(biglock); + EndPointToPC::iterator it = pcMap.begin(); + while( it != pcMap.end() ) { + cout << " conn " << it->second->toString() << endl; + it++; + } + cout << "any: "; + if( any ) cout << any->toString(); + cout << "\ndone" << endl; +} + +struct SetSignal { + SetSignal() { +#if !defined(_WIN32) + signal(SIGUSR2, sighandler); +#endif + } +} setSignal; + +string ProtocolConnection::toString() { + stringstream out; + out << myEnd.toString() << " <> " << + farEnd.toString() << " rcvd.size:" << cr.received.size() << + " pndngMsgs.size:" << cr.pendingMessages.size() << + " pndngSnd.size:" << cs.pendingSend.size(); + return out.str(); +} + +ProtocolConnection::~ProtocolConnection() { + cout << ".warning: ~ProtocolConnection() not implemented (leaks mem etc)" << endl; + if( any == this ) + any = 0; +} + +void ProtocolConnection::shutdown() { + ptrace( cout << ".shutdown()" << endl; ) + if( acceptAnyChannel() || first ) + return; + ptrace( cout << ". to:" << to.toString() << endl; ) + __sendRESET(this); +} + +inline ProtocolConnection::ProtocolConnection(ProtocolConnection& par, EndPoint& _to) : + udpConnection(par.udpConnection), myEnd(par.myEnd), cs(*this), cr(*this) +{ + parent = ∥ + farEnd = _to; + first = true; + // todo: LOCK + + assert(pcMap.count(farEnd) == 0); +// pcMap[myEnd] = this; +} + +inline bool ProtocolConnection::acceptAnyChannel() const { + return myEnd.channel == MessagingPort::ANYCHANNEL; +} + +inline void ProtocolConnection::init() { + first = true; + lock lk(biglock); + lock tslk(threadStarterMutex); + + if( acceptAnyChannel() ) { + assert( any == 0 ); + any = this; + } + else { + pcMap[farEnd] = this; + } + + if( firstPCForThisAddress.count(myEnd.sa) == 0 ) { + firstPCForThisAddress[myEnd.sa] = this; + // need a receiver thread. one per port # we listen on. shared by channels. + boost::thread receiver(receiverThread); + threadUseThisOne = this; + threadActivate.notify_one(); + return; + } +} + +ProtocolConnection::ProtocolConnection(UDPConnection& c, EndPoint& e, SockAddr *_farEnd) : + udpConnection(c), myEnd(e), cs(*this), cr(*this) +{ + parent = this; + if( _farEnd ) { + farEnd.channel = myEnd.channel; + farEnd.sa = *_farEnd; + } + init(); +} + +/* find message for fragment */ +MR* CR::getPendingMsg(F *fr) { + MR *m; + map<int,MR*>::iterator i = pendingMessages.find(fr->__msgid()); + if( i == pendingMessages.end() ) { + if( pendingMessages.size() > 20 ) { + cout << ".warning: pendingMessages.size()>20, ignoring msg until we dequeue" << endl; + return 0; + } + m = new MR(&pc, fr->__msgid(), pc.farEnd); + pendingMessages[fr->__msgid()] = m; + } + else + m = i->second; + return m; +/* + MR*& m = pendingMessages[fr->__msgid()]; + if( m == 0 ) + m = new MR(&pc, fr->__msgid(), fromAddr); + return m; +*/ +} + +void MR::removeFromReceivingList() { + pc.cr.pendingMessages.erase(msgid); +} + +MR::MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from) : + pc(*_pc), msgid(_msgid), from(_from), nReceived(0) +{ + messageLenExpected = nExpected = -1; + f.push_back(0); +} + +void MR::reportMissings(bool reportAll) { + vector<short> missing; + unsigned t = curTimeMillis(); + for( unsigned i = 0; i < f.size(); i++ ) { + if( f[i] == 0 ) { + int diff = tdiff(reportTimes[i],t); + assert( diff >= 0 || reportTimes[i] == 0 ); + if( diff > 100 || reportTimes[i]==0 || + (reportAll && diff > 1) ) { + reportTimes[i] = t; + assert( i < 25000 ); + missing.push_back(i); + } + } + } + if( !missing.empty() ) + __sendMISSING(&pc, from, msgid, missing); +} + +inline bool MR::complete() { + if( nReceived == nExpected ) { + assert( nExpected == (int) f.size() ); + assert( f[0] != 0 ); + return true; + } + return false; +/* + if( nExpected > (int) f.size() || nExpected == -1 ) + return false; + for( unsigned i = 0; i < f.size(); i++ ) + if( f[i] == 0 ) + return false; + return true; +*/ +} + +/* true=msg complete */ +bool MR::got(F *frag, EndPoint& fromAddr) { + MSGID msgid = frag->__msgid(); + int i = frag->__num(); + if( i == 544 && !frag->__isREQUESTACK() ) { + cout << "************ GOT LAST FRAGMENT #544" << endl; + } + if( nExpected < 0 && i == 0 ) { + messageLenExpected = frag->__firstFragMsgLen(); + if( messageLenExpected == frag->__len()-FragHeader ) + nExpected = 1; + else { + int mss = frag->__len()-FragHeader; + assert( messageLenExpected > mss ); + nExpected = (messageLenExpected + mss - 1) / mss; + ptrace( cout << ".got first frag, expect:" << nExpected << "packets, expectedLen:" << messageLenExpected << endl; ) + } + } + if( i >= (int) f.size() ) + f.resize(i+1, 0); + if( frag->__isREQUESTACK() ) { + ptrace( cout << "...got(): processing a REQUESTACK" << endl; ) + /* we're simply seeing if we got the data, and then acking. */ + delete frag; + frag = 0; + reportTimes.resize(f.size(), 0); + if( complete() ) { + __sendACK(&pc, msgid); + return true; + } + reportMissings(true); + return false; + } + else if( f[i] != 0 ) { + ptrace( cout << "\t.dup packet i:" << i << ' ' << from.toString() << endl; ) + delete frag; + return false; + } + if( frag ) { + f[i] = frag; + nReceived++; + } + reportTimes.resize(f.size(), 0); + + if( !complete() ) + return false; + __sendACK(&pc, msgid); + return true; + +/* + if( f[0] == 0 || f[i] == 0 ) { +// reportMissings(frag == 0); + return false; + } + if( i+1 < nExpected ) { +//cout << "TEMP COMMENT" << endl; +// if( i > 0 && f[i-1] == 0 ) +// reportMissings(frag == 0); + return false; + } + // last fragment received + if( !complete() ) { +// reportMissings(frag == 0); + return false; + } + __sendACK(&pc, fromAddr, msgid); + return frag != 0; +*/ +} + +MR* CR::recv() { + MR *x; + { + lock lk(biglock); + while( received.empty() ) + receivedSome.wait(lk); + x = received.back(); + received.pop_back(); + } + return x; +} + +// this is how we tell protosend.cpp we got these +void gotACK(F*, ProtocolConnection *); +void gotMISSING(F*, ProtocolConnection *); + +void receiverThread() { + ProtocolConnection *startingConn; // this thread manages many; this is just initiator or the parent for acceptany + UDPConnection *uc; + { + lock lk(threadStarterMutex); + while( 1 ) { + if( threadUseThisOne != 0 ) { + uc = &threadUseThisOne->udpConnection; + startingConn = threadUseThisOne; + threadUseThisOne = 0; + break; + } + threadActivate.wait(lk); + } + } + + cout << "\n.Activating a new receiverThread\n " << startingConn->toString() << '\n' << endl; + + EndPoint fromAddr; + while( 1 ) { + F *f = __recv(*uc, fromAddr.sa); + lock l(biglock); + fromAddr.channel = f->__channel(); + ptrace( cout << "..__recv() from:" << fromAddr.toString() << " frag:" << f->__num() << endl; ) + assert( fromAddr.channel >= 0 ); + EndPointToPC::iterator it = pcMap.find(fromAddr); + ProtocolConnection *mypc; + if( it == pcMap.end() ) { + if( !startingConn->acceptAnyChannel() ) { + cout << ".WARNING: got packet from an unknown endpoint:" << fromAddr.toString() << endl; + cout << ". this may be ok if you just restarted" << endl; + delete f; + continue; + } + cout << ".New connection accepted from " << fromAddr.toString() << endl; + mypc = new ProtocolConnection(*startingConn, fromAddr); + pcMap[fromAddr] = mypc; + } + else + mypc = it->second; + + assert( fromAddr.channel == mypc->farEnd.channel ); + MsgTracker& track = mypc->cr.oldMsgTracker; + + if( f->op != F::NORMAL ) { + if( f->__isACK() ) { + gotACK(f, mypc); + delete f; + continue; + } + if( f->__isMISSING() ) { + gotMISSING(f, mypc); + delete f; + continue; + } + if( f->op == F::RESET ) { + ptrace( cout << ".got RESET" << endl; ) + track.reset(); + mypc->cs.resetIt(); + delete f; + continue; + } + } + + if( track.recentlyReceived.count(f->__msgid()) ) { + // already done with it. ignore, other than acking. + if( f->__isREQUESTACK() ) + __sendACK(mypc, f->__msgid()); + else { + ptrace( cout << ".ignoring packet about msg already received msg:" << f->__msgid() << " op:" << f->op << endl; ) + } + delete f; + continue; + } + + if( f->__msgid() <= track.lastFullyReceived && !track.recentlyReceivedList.empty() ) { + // reconnect on an old channel? + ptrace( cout << ".warning: strange msgid:" << f->__msgid() << " received, last:" << track->lastFullyReceived << " conn:" << fromAddr.toString() << endl; ) + } + + MR *m = mypc->cr.getPendingMsg(f); /* todo: optimize for single fragment case? */ + if( m == 0 ) { + ptrace( cout << "..getPendingMsg() returns 0" << endl; ) + delete f; + continue; + } + if( m->got(f, fromAddr) ) { + track.got(m->msgid); + m->removeFromReceivingList(); + { + mypc->parent->cr.received.push_back(m); + mypc->parent->cr.receivedSome.notify_one(); + } + } + } +} diff --git a/grid/protosend.cpp b/grid/protosend.cpp index 2d22450ced1..dfd31036127 100644 --- a/grid/protosend.cpp +++ b/grid/protosend.cpp @@ -1,188 +1,188 @@ -// protosend.cpp
-
-/* todo: redo locking! */
-
-#include "stdafx.h"
-#include "protocol.h"
-#include "boost/thread.hpp"
-#include "../util/goodies.h"
-#include "../util/sock.h"
-#include "protoimpl.h"
-using namespace boost;
-typedef boost::mutex::scoped_lock lock;
-
-/* todo: granularity? */
-extern boost::mutex biglock;
-
-void senderComplainThread();
-
-boost::thread sender(senderComplainThread);
-
-bool erasePending(ProtocolConnection *pc, int id);
-
-inline bool MS::complain(unsigned now) {
- if( tdiff(lastComplainTime, now) < (int) complainInterval )
- return false;
- if( complainInterval > 4000 ) {
- etrace( cout << ".no ack of send after " << complainInterval/1000 << " seconds " << to.toString() << endl; )
- if( complainInterval > 35000 ) {
- etrace( cout << ".GIVING UP on sending message" << endl; )
- erasePending(pc, msgid);
- return true;
- }
- }
- complainInterval *= 2;
- lastComplainTime = now;
- __sendREQUESTACK(pc, to, msgid, fragments.size()-1);
- return false;
-}
-
-// where's my ack?
-void senderComplainThread() {
- sleepmillis(200);
-
- while( 1 ) {
- sleepmillis(50);
- {
- lock lk(biglock);
- unsigned now = curTimeMillis();
- /* todo: more efficient data structure here. */
- for( EndPointToPC::iterator i = pcMap.begin(); i != pcMap.end(); i++ ) {
- ProtocolConnection *pc = i->second;
- for( vector<MS*>::iterator j = pc->cs.pendingSend.begin(); j < pc->cs.pendingSend.end(); j++ )
- if( (*j)->complain(now) )
- break;
- }
- }
- }
-}
-
-inline MS* _slocked_getMSForFrag(F* fr, ProtocolConnection *pc) {
- int id = fr->__msgid();
- vector<MS*>::iterator it;
- for( it = pc->cs.pendingSend.begin(); it<pc->cs.pendingSend.end(); it++ ) {
- MS *m = *it;
- if( m->msgid == id )
- return m;
- }
- return 0;
-}
-
-// received a MISSING message from the other end. retransmit.
-void gotMISSING(F* fr, ProtocolConnection *pc) {
- pc->cs.delayGotMissing();
-
- ptrace( cout << ".gotMISSING() msgid:" << fr->__msgid() << endl; )
- MS *m = _slocked_getMSForFrag(fr, pc);
- if( m ) {
- {
- int df = tdiff(m->lastComplainTime, curTimeMillis()) * 2;
- if( df < 10 )
- df = 10;
- if( df > 2000 )
- df = 2000;
- m->complainInterval = (unsigned) df;
- cout << "TEMP: set complainInterval to " << m->complainInterval << endl;
- }
-
- int n;
- short* s = fr->__getMissing(n);
- assert( n > 0 && n < 5000 );
- for( int i = 0; i < n; i++ ) {
- // ptrace( cout << ".. resending frag #" << s[i] << ' ' << m->to.toString() << endl; )
- __sendFrag(pc, m->to, m->fragments[s[i]], true);
- if( i % 64 == 0 && pc->cs.delay >= 1.0 ) {
- ptrace( cout << "SLEEP" << endl; )
- sleepmillis((int) pc->cs.delay);
- }
- }
- return;
- }
- ptrace( cout << "\t.warning: gotMISSING for an unknown msg id:" << fr->__msgid() << ' ' << pc->farEnd.toString() << endl; )
-}
-
-/* done sending a msg, clean up and notify sender to unblock */
-bool erasePending(ProtocolConnection *pc, int id) {
- vector<MS*>::iterator it;
- CS& cs = pc->cs;
- for( it = cs.pendingSend.begin(); it < cs.pendingSend.end(); it++ ) {
- MS *m = *it;
- if( m->msgid == id ) {
- cs.pendingSend.erase(it);
- ptrace( cout << "..gotACK/erase: found pendingSend msg:" << id << endl; )
- delete m;
- cs.msgSent.notify_one();
- return true;
- }
- }
- return false;
-}
-
-// received an ACK message. so we can discard our saved copy we were trying to send, we
-// are done with it.
-void gotACK(F* fr, ProtocolConnection *pc) {
- if( erasePending(pc, fr->__msgid()) )
- return;
- ptrace( cout << ".warning: got ack for an unknown msg id:" << fr->__msgid() << ' ' << pc->farEnd.toString() << endl; )
-}
-
-void MS::send() {
- /* flow control */
- lock lk(biglock);
- ptrace( cout << "..MS::send() pending=" << pc->cs.pendingSend.size() << endl; )
-
- if( pc->acceptAnyChannel() ) {
- EndPointToPC::iterator it = pcMap.find(to);
- if( it == pcMap.end() ) {
- cout << ".ERROR: can't find ProtocolConnection object for " << to.toString() << endl;
- assert(false);
- return;
- }
- /* switch to the child protocolconnection */
- pc = it->second;
- }
- else {
- assert(pc->myEnd.channel == to.channel);
- }
-
- if( pc->first ) {
- pc->first = false;
- if( pc->myEnd.channel >= 0 )
- __sendRESET(pc);
- assert( pc->farEnd.channel == to.channel);
- if( pc->farEnd.sa.isLocalHost() )
- pc->cs.delayMax = 1.0;
- }
-
- // not locked here, probably ok to call size()
- if( pc->cs.pendingSend.size() >= 1 ) {
- cout << ".waiting for queued sends to complete " << pc->cs.pendingSend.size() << endl;
- while( pc->cs.pendingSend.size() >= 1 )
- pc->cs.msgSent.wait(lk);
- cout << ".waitend" << endl;
- }
-
- lastComplainTime = curTimeMillis();
- pc->cs.pendingSend.push_back(this);
- /* todo: pace */
- for( unsigned i = 0; i < fragments.size(); i++ ) {
- __sendFrag(pc, to, fragments[i]);
- if( i % 64 == 0 && pc->cs.delay >= 1.0 ) {
- ptrace( cout << ".sleep " << pc->cs.delay << endl; )
- sleepmillis((int) pc->cs.delay);
- }
- }
-
- pc->cs.delaySentMsg();
-}
-
-CS::CS(ProtocolConnection& _pc) :
- pc(_pc), delay(0), delayMax(10)
-{
-}
-
-void CS::resetIt() {
- for( vector<MS*>::iterator i = pendingSend.begin(); i < pendingSend.end(); i++ )
- delete (*i);
- pendingSend.clear();
-}
+// protosend.cpp + +/* todo: redo locking! */ + +#include "stdafx.h" +#include "protocol.h" +#include "boost/thread.hpp" +#include "../util/goodies.h" +#include "../util/sock.h" +#include "protoimpl.h" +using namespace boost; +typedef boost::mutex::scoped_lock lock; + +/* todo: granularity? */ +extern boost::mutex biglock; + +void senderComplainThread(); + +boost::thread sender(senderComplainThread); + +bool erasePending(ProtocolConnection *pc, int id); + +inline bool MS::complain(unsigned now) { + if( tdiff(lastComplainTime, now) < (int) complainInterval ) + return false; + if( complainInterval > 4000 ) { + etrace( cout << ".no ack of send after " << complainInterval/1000 << " seconds " << to.toString() << endl; ) + if( complainInterval > 35000 ) { + etrace( cout << ".GIVING UP on sending message" << endl; ) + erasePending(pc, msgid); + return true; + } + } + complainInterval *= 2; + lastComplainTime = now; + __sendREQUESTACK(pc, to, msgid, fragments.size()-1); + return false; +} + +// where's my ack? +void senderComplainThread() { + sleepmillis(200); + + while( 1 ) { + sleepmillis(50); + { + lock lk(biglock); + unsigned now = curTimeMillis(); + /* todo: more efficient data structure here. */ + for( EndPointToPC::iterator i = pcMap.begin(); i != pcMap.end(); i++ ) { + ProtocolConnection *pc = i->second; + for( vector<MS*>::iterator j = pc->cs.pendingSend.begin(); j < pc->cs.pendingSend.end(); j++ ) + if( (*j)->complain(now) ) + break; + } + } + } +} + +inline MS* _slocked_getMSForFrag(F* fr, ProtocolConnection *pc) { + int id = fr->__msgid(); + vector<MS*>::iterator it; + for( it = pc->cs.pendingSend.begin(); it<pc->cs.pendingSend.end(); it++ ) { + MS *m = *it; + if( m->msgid == id ) + return m; + } + return 0; +} + +// received a MISSING message from the other end. retransmit. +void gotMISSING(F* fr, ProtocolConnection *pc) { + pc->cs.delayGotMissing(); + + ptrace( cout << ".gotMISSING() msgid:" << fr->__msgid() << endl; ) + MS *m = _slocked_getMSForFrag(fr, pc); + if( m ) { + { + int df = tdiff(m->lastComplainTime, curTimeMillis()) * 2; + if( df < 10 ) + df = 10; + if( df > 2000 ) + df = 2000; + m->complainInterval = (unsigned) df; + cout << "TEMP: set complainInterval to " << m->complainInterval << endl; + } + + int n; + short* s = fr->__getMissing(n); + assert( n > 0 && n < 5000 ); + for( int i = 0; i < n; i++ ) { + // ptrace( cout << ".. resending frag #" << s[i] << ' ' << m->to.toString() << endl; ) + __sendFrag(pc, m->to, m->fragments[s[i]], true); + if( i % 64 == 0 && pc->cs.delay >= 1.0 ) { + ptrace( cout << "SLEEP" << endl; ) + sleepmillis((int) pc->cs.delay); + } + } + return; + } + ptrace( cout << "\t.warning: gotMISSING for an unknown msg id:" << fr->__msgid() << ' ' << pc->farEnd.toString() << endl; ) +} + +/* done sending a msg, clean up and notify sender to unblock */ +bool erasePending(ProtocolConnection *pc, int id) { + vector<MS*>::iterator it; + CS& cs = pc->cs; + for( it = cs.pendingSend.begin(); it < cs.pendingSend.end(); it++ ) { + MS *m = *it; + if( m->msgid == id ) { + cs.pendingSend.erase(it); + ptrace( cout << "..gotACK/erase: found pendingSend msg:" << id << endl; ) + delete m; + cs.msgSent.notify_one(); + return true; + } + } + return false; +} + +// received an ACK message. so we can discard our saved copy we were trying to send, we +// are done with it. +void gotACK(F* fr, ProtocolConnection *pc) { + if( erasePending(pc, fr->__msgid()) ) + return; + ptrace( cout << ".warning: got ack for an unknown msg id:" << fr->__msgid() << ' ' << pc->farEnd.toString() << endl; ) +} + +void MS::send() { + /* flow control */ + lock lk(biglock); + ptrace( cout << "..MS::send() pending=" << pc->cs.pendingSend.size() << endl; ) + + if( pc->acceptAnyChannel() ) { + EndPointToPC::iterator it = pcMap.find(to); + if( it == pcMap.end() ) { + cout << ".ERROR: can't find ProtocolConnection object for " << to.toString() << endl; + assert(false); + return; + } + /* switch to the child protocolconnection */ + pc = it->second; + } + else { + assert(pc->myEnd.channel == to.channel); + } + + if( pc->first ) { + pc->first = false; + if( pc->myEnd.channel >= 0 ) + __sendRESET(pc); + assert( pc->farEnd.channel == to.channel); + if( pc->farEnd.sa.isLocalHost() ) + pc->cs.delayMax = 1.0; + } + + // not locked here, probably ok to call size() + if( pc->cs.pendingSend.size() >= 1 ) { + cout << ".waiting for queued sends to complete " << pc->cs.pendingSend.size() << endl; + while( pc->cs.pendingSend.size() >= 1 ) + pc->cs.msgSent.wait(lk); + cout << ".waitend" << endl; + } + + lastComplainTime = curTimeMillis(); + pc->cs.pendingSend.push_back(this); + /* todo: pace */ + for( unsigned i = 0; i < fragments.size(); i++ ) { + __sendFrag(pc, to, fragments[i]); + if( i % 64 == 0 && pc->cs.delay >= 1.0 ) { + ptrace( cout << ".sleep " << pc->cs.delay << endl; ) + sleepmillis((int) pc->cs.delay); + } + } + + pc->cs.delaySentMsg(); +} + +CS::CS(ProtocolConnection& _pc) : + pc(_pc), delay(0), delayMax(10) +{ +} + +void CS::resetIt() { + for( vector<MS*>::iterator i = pendingSend.begin(); i < pendingSend.end(); i++ ) + delete (*i); + pendingSend.clear(); +} |