summaryrefslogtreecommitdiff
path: root/grid
diff options
context:
space:
mode:
authorDwight <dmerriman@gmail.com>2008-06-06 09:43:15 -0400
committerDwight <dmerriman@gmail.com>2008-06-06 09:43:15 -0400
commit3051b961cac30f9bf81ac72b816ddb5e8e3c2ee9 (patch)
tree85a5b1cb376b067eee5cf668d42deff78a870627 /grid
parent877b72efcdd55f9fc9b271c707d4b489e551793d (diff)
downloadmongo-3051b961cac30f9bf81ac72b816ddb5e8e3c2ee9.tar.gz
dos2unix
Diffstat (limited to 'grid')
-rw-r--r--grid/message.cpp392
-rw-r--r--grid/message.h258
-rw-r--r--grid/protocol.h522
-rw-r--r--grid/protoimpl.h484
-rw-r--r--grid/protorecv.cpp772
-rw-r--r--grid/protosend.cpp376
6 files changed, 1402 insertions, 1402 deletions
diff --git a/grid/message.cpp b/grid/message.cpp
index 0a10667e49f..caf2cd7e8a8 100644
--- a/grid/message.cpp
+++ b/grid/message.cpp
@@ -1,196 +1,196 @@
-/* message
-
- todo: authenticate; encrypt?
-*/
-
-#include "stdafx.h"
-#include "message.h"
-#include <time.h>
-#include "../util/goodies.h"
-
-// if you want trace output:
-#define mmm(x)
-
-/* listener ------------------------------------------------------------------- */
-
-void Listener::listen() {
- SockAddr me(port);
- int sock = socket(AF_INET, SOCK_STREAM, 0);
- if( sock == INVALID_SOCKET ) {
- cout << "ERROR: listen(): invalid socket? " << errno << endl;
- return;
- }
- prebindOptions( sock );
- if( bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) {
- cout << "listen(): bind() failed errno:" << errno << endl;
- if( errno == 98 )
- cout << "98 == addr already in use" << endl;
- closesocket(sock);
- return;
- }
-
- if( ::listen(sock, 128) != 0 ) {
- cout << "listen(): listen() failed " << errno << endl;
- closesocket(sock);
- return;
- }
-
- SockAddr from;
- while( 1 ) {
- int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
- if( s < 0 ) {
- cout << "Listener: accept() returns " << s << " errno:" << errno << endl;
- continue;
- }
- disableNagle(s);
- cout << "Listener: connection accepted from " << from.toString() << endl;
- accepted( new MessagingPort(s, from) );
- }
-}
-
-/* messagingport -------------------------------------------------------------- */
-
-MSGID NextMsgId;
-struct MsgStart {
- MsgStart() {
- NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
- assert(MsgDataHeaderSize == 16);
- }
-} msgstart;
-
-MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), farEnd(_far) { }
-
-MessagingPort::MessagingPort() {
- sock = -1;
-}
-
-void MessagingPort::shutdown() {
- if( sock >= 0 ) {
- closesocket(sock);
- sock = -1;
- }
-}
-
-MessagingPort::~MessagingPort() {
- shutdown();
-}
-
-bool MessagingPort::connect(SockAddr& _far)
-{
- farEnd = _far;
-
- sock = socket(AF_INET, SOCK_STREAM, 0);
- if( sock == INVALID_SOCKET ) {
- cout << "ERROR: connect(): invalid socket? " << errno << endl;
- return false;
- }
- if( ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize) ) {
- cout << "ERROR: connect(): connect() failed" << errno << endl;
- closesocket(sock); sock = -1;
- return false;
- }
- disableNagle(sock);
- return true;
-}
-
-bool MessagingPort::recv(Message& m) {
- mmm( cout << "* recv() sock:" << this->sock << endl; )
- int len = -1;
-
- char *lenbuf = (char *) &len;
- int lft = 4;
- while( 1 ) {
- int x = ::recv(sock, lenbuf, lft, 0);
- if( x == 0 ) {
- cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
- m.reset();
- return false;
- }
- if( x < 0 ) {
- cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString()<<endl;
- m.reset();
- return false;
- }
- lft -= x;
- if( lft == 0 )
- break;
- lenbuf += x;
- cout << "MessagingPort::recv(): got " << x << " bytes wanted 4, lft=" << lft << endl;
- assert( lft > 0 );
- }
-
-// assert( x == 4 );
-
- assert( len >= 0 && len <= 16000000 );
-
- int z = (len+1023)&0xfffffc00; assert(z>=len);
- MsgData *md = (MsgData *) malloc(z);
- md->len = len;
-
- if ( len <= 0 ){
- cout << "got a length of 0, something is wrong" << endl;
- return false;
- }
-
- char *p = (char *) &md->id;
- int left = len -4;
- while( 1 ) {
- int x = ::recv(sock, p, left, 0);
- if( x == 0 ) {
- cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
- m.reset();
- return false;
- }
- if( x < 0 ) {
- cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString() << endl;
- m.reset();
- return false;
- }
- left -= x;
- p += x;
- if( left <= 0 )
- break;
- }
-
- m.setData(md, true);
- return true;
-}
-
-void MessagingPort::reply(Message& received, Message& response) {
- say(received.from, response, received.data->id);
-}
-
-bool MessagingPort::call(SockAddr& to, Message& toSend, Message& response) {
- mmm( cout << "*call()" << endl; )
- MSGID old = toSend.data->id;
- say(to, toSend);
- while( 1 ) {
- bool ok = recv(response);
- if( !ok )
- return false;
- //cout << "got response: " << response.data->responseTo << endl;
- if( response.data->responseTo == toSend.data->id )
- break;
- cout << "********************" << endl;
- cout << "ERROR: MessagingPort::call() wrong id got:" << response.data->responseTo << " expect:" << toSend.data->id << endl;
- cout << " old:" << old << endl;
- cout << " response msgid:" << response.data->id << endl;
- cout << " response len: " << response.data->len << endl;
- assert(false);
- response.reset();
- }
- mmm( cout << "*call() end" << endl; )
- return true;
-}
-
-void MessagingPort::say(SockAddr& to, Message& toSend, int responseTo) {
- mmm( cout << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
- MSGID msgid = NextMsgId;
- ++NextMsgId;
- toSend.data->id = msgid;
- toSend.data->responseTo = responseTo;
- int x = ::send(sock, (char *) toSend.data, toSend.data->len, 0);
- if( x <= 0 ) {
- cout << "MessagingPort::say: send() error " << errno << ' ' << farEnd.toString() << endl;
- }
-}
+/* message
+
+ todo: authenticate; encrypt?
+*/
+
+#include "stdafx.h"
+#include "message.h"
+#include <time.h>
+#include "../util/goodies.h"
+
+// if you want trace output:
+#define mmm(x)
+
+/* listener ------------------------------------------------------------------- */
+
+void Listener::listen() {
+ SockAddr me(port);
+ int sock = socket(AF_INET, SOCK_STREAM, 0);
+ if( sock == INVALID_SOCKET ) {
+ cout << "ERROR: listen(): invalid socket? " << errno << endl;
+ return;
+ }
+ prebindOptions( sock );
+ if( bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) {
+ cout << "listen(): bind() failed errno:" << errno << endl;
+ if( errno == 98 )
+ cout << "98 == addr already in use" << endl;
+ closesocket(sock);
+ return;
+ }
+
+ if( ::listen(sock, 128) != 0 ) {
+ cout << "listen(): listen() failed " << errno << endl;
+ closesocket(sock);
+ return;
+ }
+
+ SockAddr from;
+ while( 1 ) {
+ int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
+ if( s < 0 ) {
+ cout << "Listener: accept() returns " << s << " errno:" << errno << endl;
+ continue;
+ }
+ disableNagle(s);
+ cout << "Listener: connection accepted from " << from.toString() << endl;
+ accepted( new MessagingPort(s, from) );
+ }
+}
+
+/* messagingport -------------------------------------------------------------- */
+
+MSGID NextMsgId;
+struct MsgStart {
+ MsgStart() {
+ NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
+ assert(MsgDataHeaderSize == 16);
+ }
+} msgstart;
+
+MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), farEnd(_far) { }
+
+MessagingPort::MessagingPort() {
+ sock = -1;
+}
+
+void MessagingPort::shutdown() {
+ if( sock >= 0 ) {
+ closesocket(sock);
+ sock = -1;
+ }
+}
+
+MessagingPort::~MessagingPort() {
+ shutdown();
+}
+
+bool MessagingPort::connect(SockAddr& _far)
+{
+ farEnd = _far;
+
+ sock = socket(AF_INET, SOCK_STREAM, 0);
+ if( sock == INVALID_SOCKET ) {
+ cout << "ERROR: connect(): invalid socket? " << errno << endl;
+ return false;
+ }
+ if( ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize) ) {
+ cout << "ERROR: connect(): connect() failed" << errno << endl;
+ closesocket(sock); sock = -1;
+ return false;
+ }
+ disableNagle(sock);
+ return true;
+}
+
+bool MessagingPort::recv(Message& m) {
+ mmm( cout << "* recv() sock:" << this->sock << endl; )
+ int len = -1;
+
+ char *lenbuf = (char *) &len;
+ int lft = 4;
+ while( 1 ) {
+ int x = ::recv(sock, lenbuf, lft, 0);
+ if( x == 0 ) {
+ cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
+ m.reset();
+ return false;
+ }
+ if( x < 0 ) {
+ cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString()<<endl;
+ m.reset();
+ return false;
+ }
+ lft -= x;
+ if( lft == 0 )
+ break;
+ lenbuf += x;
+ cout << "MessagingPort::recv(): got " << x << " bytes wanted 4, lft=" << lft << endl;
+ assert( lft > 0 );
+ }
+
+// assert( x == 4 );
+
+ assert( len >= 0 && len <= 16000000 );
+
+ int z = (len+1023)&0xfffffc00; assert(z>=len);
+ MsgData *md = (MsgData *) malloc(z);
+ md->len = len;
+
+ if ( len <= 0 ){
+ cout << "got a length of 0, something is wrong" << endl;
+ return false;
+ }
+
+ char *p = (char *) &md->id;
+ int left = len -4;
+ while( 1 ) {
+ int x = ::recv(sock, p, left, 0);
+ if( x == 0 ) {
+ cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
+ m.reset();
+ return false;
+ }
+ if( x < 0 ) {
+ cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString() << endl;
+ m.reset();
+ return false;
+ }
+ left -= x;
+ p += x;
+ if( left <= 0 )
+ break;
+ }
+
+ m.setData(md, true);
+ return true;
+}
+
+void MessagingPort::reply(Message& received, Message& response) {
+ say(received.from, response, received.data->id);
+}
+
+bool MessagingPort::call(SockAddr& to, Message& toSend, Message& response) {
+ mmm( cout << "*call()" << endl; )
+ MSGID old = toSend.data->id;
+ say(to, toSend);
+ while( 1 ) {
+ bool ok = recv(response);
+ if( !ok )
+ return false;
+ //cout << "got response: " << response.data->responseTo << endl;
+ if( response.data->responseTo == toSend.data->id )
+ break;
+ cout << "********************" << endl;
+ cout << "ERROR: MessagingPort::call() wrong id got:" << response.data->responseTo << " expect:" << toSend.data->id << endl;
+ cout << " old:" << old << endl;
+ cout << " response msgid:" << response.data->id << endl;
+ cout << " response len: " << response.data->len << endl;
+ assert(false);
+ response.reset();
+ }
+ mmm( cout << "*call() end" << endl; )
+ return true;
+}
+
+void MessagingPort::say(SockAddr& to, Message& toSend, int responseTo) {
+ mmm( cout << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
+ MSGID msgid = NextMsgId;
+ ++NextMsgId;
+ toSend.data->id = msgid;
+ toSend.data->responseTo = responseTo;
+ int x = ::send(sock, (char *) toSend.data, toSend.data->len, 0);
+ if( x <= 0 ) {
+ cout << "MessagingPort::say: send() error " << errno << ' ' << farEnd.toString() << endl;
+ }
+}
diff --git a/grid/message.h b/grid/message.h
index 1395fa51e0a..604e9a67343 100644
--- a/grid/message.h
+++ b/grid/message.h
@@ -1,129 +1,129 @@
-// message.h
-
-#pragma once
-
-#include "../util/sock.h"
-
-class Message;
-class MessagingPort;
-typedef WrappingInt MSGID;
-const int DBPort = 27017;
-
-class Listener {
-public:
- Listener(int p) : port(p) { }
- void listen(); // never returns (start a thread)
-
- /* spawn a thread, etc., then return */
- virtual void accepted(MessagingPort *mp) = 0;
-private:
- int port;
-};
-
-class AbstractMessagingPort {
-public:
- virtual void reply(Message& received, Message& response) = 0;
-};
-
-class MessagingPort : public AbstractMessagingPort {
-public:
- MessagingPort(int sock, SockAddr& farEnd);
- MessagingPort();
- ~MessagingPort();
-
- void shutdown();
-
- bool connect(SockAddr& farEnd);
-
- /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
- also, the Message data will go out of scope on the subsequent recv call.
- */
- bool recv(Message& m);
- void reply(Message& received, Message& response);
- bool call(SockAddr& to, Message& toSend, Message& response);
- void say(SockAddr& to, Message& toSend, int responseTo = -1);
-
-private:
- int sock;
- SockAddr farEnd;
-};
-
-#pragma pack(push)
-#pragma pack(1)
-
-enum Operations {
- opReply = 1, /* reply. responseTo is set. */
-
- dbMsg = 1000, /* generic msg command followed by a string */
-
- dbUpdate = 2001, /* update object */
- dbInsert = 2002,
-// dbGetByOID = 2003,
- dbQuery = 2004,
- dbGetMore = 2005,
- dbDelete = 2006,
- dbKillCursors = 2007
-};
-
-struct MsgData {
- int len; /* len of the msg, including this field */
- MSGID id; /* request/reply id's match... */
- int responseTo; /* id of the message we are responding to */
- int operation;
- char _data[4];
-
- int dataLen();
-};
-const int MsgDataHeaderSize = sizeof(MsgData) - 4;
-inline int MsgData::dataLen() { return len - MsgDataHeaderSize; }
-
-#pragma pack(pop)
-
-class Message {
-public:
- Message() { data = 0; freeIt = false; }
- Message( void * _data , bool _freeIt ){ data = (MsgData*)_data; freeIt = _freeIt; };
- ~Message() { reset(); }
-
- SockAddr from;
- MsgData *data;
-
- Message& operator=(Message& r) {
- assert( data == 0 );
- data = r.data;
- assert( r.freeIt );
- r.freeIt = false;
- r.data = 0;
- freeIt = true;
- return *this;
- }
-
- void reset() {
- if( freeIt && data )
- free(data);
- data = 0; freeIt = false;
- }
-
- void setData(MsgData *d, bool _freeIt) {
- assert( data == 0 );
- freeIt = _freeIt;
- data = d;
- }
- void setData(int operation, const char *msgtxt) {
- setData(operation, msgtxt, strlen(msgtxt)+1);
- }
- void setData(int operation, const char *msgdata, int len) {
- assert(data == 0);
- int dataLen = len + sizeof(MsgData) - 4;
- MsgData *d = (MsgData *) malloc(dataLen);
- memcpy(d->_data, msgdata, len);
- d->len = dataLen;
- d->operation = operation;
- freeIt= true;
- data = d;
- }
-
-private:
- bool freeIt;
-};
-
+// message.h
+
+#pragma once
+
+#include "../util/sock.h"
+
+class Message;
+class MessagingPort;
+typedef WrappingInt MSGID;
+const int DBPort = 27017;
+
+class Listener {
+public:
+ Listener(int p) : port(p) { }
+ void listen(); // never returns (start a thread)
+
+ /* spawn a thread, etc., then return */
+ virtual void accepted(MessagingPort *mp) = 0;
+private:
+ int port;
+};
+
+class AbstractMessagingPort {
+public:
+ virtual void reply(Message& received, Message& response) = 0;
+};
+
+class MessagingPort : public AbstractMessagingPort {
+public:
+ MessagingPort(int sock, SockAddr& farEnd);
+ MessagingPort();
+ ~MessagingPort();
+
+ void shutdown();
+
+ bool connect(SockAddr& farEnd);
+
+ /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
+ also, the Message data will go out of scope on the subsequent recv call.
+ */
+ bool recv(Message& m);
+ void reply(Message& received, Message& response);
+ bool call(SockAddr& to, Message& toSend, Message& response);
+ void say(SockAddr& to, Message& toSend, int responseTo = -1);
+
+private:
+ int sock;
+ SockAddr farEnd;
+};
+
+#pragma pack(push)
+#pragma pack(1)
+
+enum Operations {
+ opReply = 1, /* reply. responseTo is set. */
+
+ dbMsg = 1000, /* generic msg command followed by a string */
+
+ dbUpdate = 2001, /* update object */
+ dbInsert = 2002,
+// dbGetByOID = 2003,
+ dbQuery = 2004,
+ dbGetMore = 2005,
+ dbDelete = 2006,
+ dbKillCursors = 2007
+};
+
+struct MsgData {
+ int len; /* len of the msg, including this field */
+ MSGID id; /* request/reply id's match... */
+ int responseTo; /* id of the message we are responding to */
+ int operation;
+ char _data[4];
+
+ int dataLen();
+};
+const int MsgDataHeaderSize = sizeof(MsgData) - 4;
+inline int MsgData::dataLen() { return len - MsgDataHeaderSize; }
+
+#pragma pack(pop)
+
+class Message {
+public:
+ Message() { data = 0; freeIt = false; }
+ Message( void * _data , bool _freeIt ){ data = (MsgData*)_data; freeIt = _freeIt; };
+ ~Message() { reset(); }
+
+ SockAddr from;
+ MsgData *data;
+
+ Message& operator=(Message& r) {
+ assert( data == 0 );
+ data = r.data;
+ assert( r.freeIt );
+ r.freeIt = false;
+ r.data = 0;
+ freeIt = true;
+ return *this;
+ }
+
+ void reset() {
+ if( freeIt && data )
+ free(data);
+ data = 0; freeIt = false;
+ }
+
+ void setData(MsgData *d, bool _freeIt) {
+ assert( data == 0 );
+ freeIt = _freeIt;
+ data = d;
+ }
+ void setData(int operation, const char *msgtxt) {
+ setData(operation, msgtxt, strlen(msgtxt)+1);
+ }
+ void setData(int operation, const char *msgdata, int len) {
+ assert(data == 0);
+ int dataLen = len + sizeof(MsgData) - 4;
+ MsgData *d = (MsgData *) malloc(dataLen);
+ memcpy(d->_data, msgdata, len);
+ d->len = dataLen;
+ d->operation = operation;
+ freeIt= true;
+ data = d;
+ }
+
+private:
+ bool freeIt;
+};
+
diff --git a/grid/protocol.h b/grid/protocol.h
index f98c6cde1f4..e7d0298ec9b 100644
--- a/grid/protocol.h
+++ b/grid/protocol.h
@@ -1,261 +1,261 @@
-// protocol.h
-
-#pragma once
-
-NOT USED
-
-#include "boost/thread/mutex.hpp"
-#include "boost/thread/condition.hpp"
-#include "../util/sock.h"
-#include "../util/goodies.h"
-
-typedef WrappingInt MSGID;
-
-struct Fragment;
-
-#if 0
-#define ptrace(x)
-#else
-#define ptrace(x) { cout << curTimeMillis() % 10000; x }
-#endif
-
-#if 1
-#define etrace(x)
-#else
-#define etrace(x) { cout << curTimeMillis() % 10000; x }
-#endif
-
-class F; // fragment
-class MR; // message. R=receiver side.
-class CR; // connection receiver side
-class MS; // message S=sender side.
-class CS; // connection sender side
-class ProtocolConnection; // connection overall
-
-/* ip:port:channel
- We have one receiver thread per process (per ip address destination), and then
- multiplex messages out to multiple connections(generally one per thread) which
- each have a 'channel'.
-*/
-class EndPoint {
-public:
- EndPoint() : channel(0) { }
- int channel;
- SockAddr sa;
- bool operator<(const EndPoint& r) const {
- if( channel != r.channel )
- return channel < r.channel;
- return sa < r.sa;
- }
- string toString() {
- stringstream out;
- out << sa.toString() << ':';
- if( channel == -2 ) out << "ANYCHANNEL";
- else if( channel == -1 ) out << "AUTOASSIGNCHANNEL";
- else out << channel;
- return out.str();
- }
-};
-
-/* the double underscore stuff here is the actual implementation glue.
- wanted to keep that stuff clean and separate. so put your implementation
- of these in protoimpl.h.
-*/
-
-void __sendRESET(ProtocolConnection *pc);
-
-// sender ->
-void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *, bool retran=false); // transmit a fragment
-void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to, MSGID msgid, int fragNo); // transmit the REQUEST ACK msg
-
-// receiver ->
-void __sendACK(ProtocolConnection *pc, MSGID msgid); // transmit ACK
-void __sendMISSING(ProtocolConnection *pc, EndPoint& to, MSGID msgid, vector<short>& ids); // transmit MISSING
-
-// -> receiver
-F* __recv(UDPConnection& c, SockAddr& from); /* recv from socket a fragment and pass back */
-
-class F {
-public:
- F(Fragment *f);
- ~F();
- int __num(); //frag #
- int __len();
- MSGID __msgid();
- int __channel();
- bool __isREQUESTACK(); // if true, this is just a request for acknowledgement not real data
- int __firstFragMsgLen(); // only works on first fragment
-
- // sender side:
- bool __isACK(); // if true, this is an ack of a message
- bool __isMISSING(); // if true, list of frags to retransmit
- short* __getMissing(int& n); // get the missing fragno list
-
- Fragment *internals;
- enum { NORMAL, ACK, MISSING, REQUESTACK, RESET } op;
-};
-
-class MR {
-public:
- MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from);
- ~MR() { freeFrags(); }
- void freeFrags() {
- for( unsigned i = 0; i < f.size(); i++ )
- delete f[i];
- }
- bool got(F *f, EndPoint& from); // received a fragment
- bool gotFirst() { return f[0] != 0; }
- ProtocolConnection& pc;
- void removeFromReceivingList();
- bool complete();
- const MSGID msgid;
- int n() { return f.size(); }
-public:
- int messageLenExpected;
- int nExpected, nReceived;
- void reportMissings(bool reportAll);
- vector<F*> f;
- vector<unsigned> reportTimes;
- EndPoint from;
-};
-
-/* this is for knowing what is already received. we might get dup packets later and need
- to ignore them. */
-class MsgTracker {
-public:
- std::list<MSGID> recentlyReceivedList;
- std::set<MSGID> recentlyReceived;
- MSGID lastFullyReceived;
-
- void reset() {
- recentlyReceivedList.clear();
- recentlyReceived.clear();
- lastFullyReceived = 0;
- }
-
- void got(MSGID m) {
- unsigned sz = recentlyReceived.size();
- if( sz > 256 ) {
- recentlyReceived.erase(recentlyReceivedList.front());
- recentlyReceivedList.pop_front();
- }
- recentlyReceivedList.push_back(m);
- recentlyReceived.insert(m);
- if( m > lastFullyReceived || sz == 0 )
- lastFullyReceived = m;
- }
-};
-
-class CR {
- friend class MR;
-public:
- ~CR() { ptrace( cout << ".warning: ~CR() not implemented" << endl; ) }
- CR(ProtocolConnection& _pc) : pc(_pc) { }
- MR* recv();
-public:
- MR* getPendingMsg(F *fr);
- bool oldMessageId(int channel, MSGID m);
- void queueReceived(MR*);
-
- ProtocolConnection& pc;
- boost::condition receivedSome;
- vector<MR*> received; /* ready to dequeue and use */
- map<int,MR*> pendingMessages; /* partly received msgs */
- MsgTracker oldMsgTracker;
-};
-
-/* -- sender side ------------------------------------------------*/
-
-class CS {
-public:
- CS(ProtocolConnection& _pc);
-
- ProtocolConnection& pc;
- vector<MS*> pendingSend;
- boost::condition msgSent;
- void resetIt();
-
- double delayMax;
- double delay;
- void delayGotMissing() {
- double delayOld = delay;
- if( delay == 0 )
- delay = 2.0;
- else
- delay = delay * 1.25;
- if( delay > delayMax ) delay = delayMax;
- if( delay != delayOld )
- cout << ".DELAY INCREASED TO " << delay << endl;
- }
- void delaySentMsg() {
- if( delay != 0.0 ) {
- delay = delay * 0.5;
- if( delay<0.5 ) delay = 0;
- cout << ".DELAY DECREASED TO " << delay << endl;
- }
- }
-};
-
-typedef map<EndPoint,ProtocolConnection*> EndPointToPC;
-extern EndPointToPC pcMap; /* the *far* endpoint -> pc */
-
-/* -- overall Connection object ----------------------------------*/
-
-#pragma warning( disable: 4355 )
-
-class ProtocolConnection {
-public:
- string toString();
-
- ProtocolConnection(ProtocolConnection& par, EndPoint& to);
- ProtocolConnection(UDPConnection& c, EndPoint& e, SockAddr *_farEnd);
- ~ProtocolConnection();
-
- void shutdown();
- bool acceptAnyChannel() const;
- UDPConnection& udpConnection;
- /* note the channel for myEnd might be "any" for the any pc -
- so you can't use that channel for sending. Use MS/MR
- for that.
- */
- EndPoint myEnd;
- EndPoint farEnd;
-
- /* if this was instantiated automatically for an acceptAnyChannel(),
- keep a ptr back to it and queue received msgs there.
- */
- ProtocolConnection *parent;
-
- CR cr;
- CS cs;
- bool first; // true if yet to send first message on this conn
-
-private:
- void init();
-};
-
-/* -- sender side ------------------------------------------------*/
-
-class MS {
-public:
- MS(ProtocolConnection *_pc, EndPoint &_to, MSGID _msgid) :
- pc(_pc), to(_to), msgid(_msgid), complainInterval(50) { }
- ~MS() {
- for( unsigned i = 0; i < fragments.size(); i++ )
- delete fragments[i];
- }
-
- /* init fragments, then call this */
- void send();
-
- vector<F*> fragments;
-
- /* request retrainsmissions. */
- bool complain(unsigned now);
-
- ProtocolConnection* pc;
- EndPoint to;
- const MSGID msgid;
- unsigned lastComplainTime;
- unsigned complainInterval;
-};
+// protocol.h
+
+#pragma once
+
+NOT USED
+
+#include "boost/thread/mutex.hpp"
+#include "boost/thread/condition.hpp"
+#include "../util/sock.h"
+#include "../util/goodies.h"
+
+typedef WrappingInt MSGID;
+
+struct Fragment;
+
+#if 0
+#define ptrace(x)
+#else
+#define ptrace(x) { cout << curTimeMillis() % 10000; x }
+#endif
+
+#if 1
+#define etrace(x)
+#else
+#define etrace(x) { cout << curTimeMillis() % 10000; x }
+#endif
+
+class F; // fragment
+class MR; // message. R=receiver side.
+class CR; // connection receiver side
+class MS; // message S=sender side.
+class CS; // connection sender side
+class ProtocolConnection; // connection overall
+
+/* ip:port:channel
+ We have one receiver thread per process (per ip address destination), and then
+ multiplex messages out to multiple connections(generally one per thread) which
+ each have a 'channel'.
+*/
+class EndPoint {
+public:
+ EndPoint() : channel(0) { }
+ int channel;
+ SockAddr sa;
+ bool operator<(const EndPoint& r) const {
+ if( channel != r.channel )
+ return channel < r.channel;
+ return sa < r.sa;
+ }
+ string toString() {
+ stringstream out;
+ out << sa.toString() << ':';
+ if( channel == -2 ) out << "ANYCHANNEL";
+ else if( channel == -1 ) out << "AUTOASSIGNCHANNEL";
+ else out << channel;
+ return out.str();
+ }
+};
+
+/* the double underscore stuff here is the actual implementation glue.
+ wanted to keep that stuff clean and separate. so put your implementation
+ of these in protoimpl.h.
+*/
+
+void __sendRESET(ProtocolConnection *pc);
+
+// sender ->
+void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *, bool retran=false); // transmit a fragment
+void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to, MSGID msgid, int fragNo); // transmit the REQUEST ACK msg
+
+// receiver ->
+void __sendACK(ProtocolConnection *pc, MSGID msgid); // transmit ACK
+void __sendMISSING(ProtocolConnection *pc, EndPoint& to, MSGID msgid, vector<short>& ids); // transmit MISSING
+
+// -> receiver
+F* __recv(UDPConnection& c, SockAddr& from); /* recv from socket a fragment and pass back */
+
+class F {
+public:
+ F(Fragment *f);
+ ~F();
+ int __num(); //frag #
+ int __len();
+ MSGID __msgid();
+ int __channel();
+ bool __isREQUESTACK(); // if true, this is just a request for acknowledgement not real data
+ int __firstFragMsgLen(); // only works on first fragment
+
+ // sender side:
+ bool __isACK(); // if true, this is an ack of a message
+ bool __isMISSING(); // if true, list of frags to retransmit
+ short* __getMissing(int& n); // get the missing fragno list
+
+ Fragment *internals;
+ enum { NORMAL, ACK, MISSING, REQUESTACK, RESET } op;
+};
+
+class MR {
+public:
+ MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from);
+ ~MR() { freeFrags(); }
+ void freeFrags() {
+ for( unsigned i = 0; i < f.size(); i++ )
+ delete f[i];
+ }
+ bool got(F *f, EndPoint& from); // received a fragment
+ bool gotFirst() { return f[0] != 0; }
+ ProtocolConnection& pc;
+ void removeFromReceivingList();
+ bool complete();
+ const MSGID msgid;
+ int n() { return f.size(); }
+public:
+ int messageLenExpected;
+ int nExpected, nReceived;
+ void reportMissings(bool reportAll);
+ vector<F*> f;
+ vector<unsigned> reportTimes;
+ EndPoint from;
+};
+
+/* this is for knowing what is already received. we might get dup packets later and need
+ to ignore them. */
+class MsgTracker {
+public:
+ std::list<MSGID> recentlyReceivedList;
+ std::set<MSGID> recentlyReceived;
+ MSGID lastFullyReceived;
+
+ void reset() {
+ recentlyReceivedList.clear();
+ recentlyReceived.clear();
+ lastFullyReceived = 0;
+ }
+
+ void got(MSGID m) {
+ unsigned sz = recentlyReceived.size();
+ if( sz > 256 ) {
+ recentlyReceived.erase(recentlyReceivedList.front());
+ recentlyReceivedList.pop_front();
+ }
+ recentlyReceivedList.push_back(m);
+ recentlyReceived.insert(m);
+ if( m > lastFullyReceived || sz == 0 )
+ lastFullyReceived = m;
+ }
+};
+
+class CR {
+ friend class MR;
+public:
+ ~CR() { ptrace( cout << ".warning: ~CR() not implemented" << endl; ) }
+ CR(ProtocolConnection& _pc) : pc(_pc) { }
+ MR* recv();
+public:
+ MR* getPendingMsg(F *fr);
+ bool oldMessageId(int channel, MSGID m);
+ void queueReceived(MR*);
+
+ ProtocolConnection& pc;
+ boost::condition receivedSome;
+ vector<MR*> received; /* ready to dequeue and use */
+ map<int,MR*> pendingMessages; /* partly received msgs */
+ MsgTracker oldMsgTracker;
+};
+
+/* -- sender side ------------------------------------------------*/
+
+class CS {
+public:
+ CS(ProtocolConnection& _pc);
+
+ ProtocolConnection& pc;
+ vector<MS*> pendingSend;
+ boost::condition msgSent;
+ void resetIt();
+
+ double delayMax;
+ double delay;
+ void delayGotMissing() {
+ double delayOld = delay;
+ if( delay == 0 )
+ delay = 2.0;
+ else
+ delay = delay * 1.25;
+ if( delay > delayMax ) delay = delayMax;
+ if( delay != delayOld )
+ cout << ".DELAY INCREASED TO " << delay << endl;
+ }
+ void delaySentMsg() {
+ if( delay != 0.0 ) {
+ delay = delay * 0.5;
+ if( delay<0.5 ) delay = 0;
+ cout << ".DELAY DECREASED TO " << delay << endl;
+ }
+ }
+};
+
+typedef map<EndPoint,ProtocolConnection*> EndPointToPC;
+extern EndPointToPC pcMap; /* the *far* endpoint -> pc */
+
+/* -- overall Connection object ----------------------------------*/
+
+#pragma warning( disable: 4355 )
+
+class ProtocolConnection {
+public:
+ string toString();
+
+ ProtocolConnection(ProtocolConnection& par, EndPoint& to);
+ ProtocolConnection(UDPConnection& c, EndPoint& e, SockAddr *_farEnd);
+ ~ProtocolConnection();
+
+ void shutdown();
+ bool acceptAnyChannel() const;
+ UDPConnection& udpConnection;
+ /* note the channel for myEnd might be "any" for the any pc -
+ so you can't use that channel for sending. Use MS/MR
+ for that.
+ */
+ EndPoint myEnd;
+ EndPoint farEnd;
+
+ /* if this was instantiated automatically for an acceptAnyChannel(),
+ keep a ptr back to it and queue received msgs there.
+ */
+ ProtocolConnection *parent;
+
+ CR cr;
+ CS cs;
+ bool first; // true if yet to send first message on this conn
+
+private:
+ void init();
+};
+
+/* -- sender side ------------------------------------------------*/
+
+class MS {
+public:
+ MS(ProtocolConnection *_pc, EndPoint &_to, MSGID _msgid) :
+ pc(_pc), to(_to), msgid(_msgid), complainInterval(50) { }
+ ~MS() {
+ for( unsigned i = 0; i < fragments.size(); i++ )
+ delete fragments[i];
+ }
+
+ /* init fragments, then call this */
+ void send();
+
+ vector<F*> fragments;
+
+ /* request retrainsmissions. */
+ bool complain(unsigned now);
+
+ ProtocolConnection* pc;
+ EndPoint to;
+ const MSGID msgid;
+ unsigned lastComplainTime;
+ unsigned complainInterval;
+};
diff --git a/grid/protoimpl.h b/grid/protoimpl.h
index 5ad19a47250..9ca5277e527 100644
--- a/grid/protoimpl.h
+++ b/grid/protoimpl.h
@@ -1,242 +1,242 @@
-// protoimpl.h
-
-#pragma once
-
-/* packet dumping level of detail. */
-const bool dumpPackets = false; // this must be true to get anything at all
-const bool dumpIP = false; // output the ip address
-const bool dumpBytesDetailed = false; // more data output
-
-#include "message.h"
-
-extern boost::mutex coutmutex;
-
-const int FragMax = 1480;
-const int FragHeader = 10;
-const int MSS = FragMax - FragHeader;
-
-#pragma pack(push)
-#pragma pack(1)
-
-struct Fragment {
- enum { MinFragmentLen = FragHeader + 1 };
- MSGID msgId;
- short channel;
- short fragmentLen;
- short fragmentNo;
- char data[16];
- int fragmentDataLen() { return fragmentLen - FragHeader; }
- char* fragmentData() { return data; }
-
- bool ok(int nRead) {
- if( nRead < MinFragmentLen || fragmentLen > nRead || fragmentLen < MinFragmentLen ) {
- ptrace( cout << ".recv: fragment bad. fragmentLen:" << fragmentLen << " nRead:" << nRead << endl; )
- return false;
- }
- if( fragmentNo == 0 && fragmentLen < MinFragmentLen + MsgDataHeaderSize ) {
- ptrace( cout << ".recv: bad first fragment. fragmentLen:" << fragmentLen << endl; )
- return false;
- }
- return true;
- }
-
- MsgData* startOfMsgData() { assert(fragmentNo == 0); return (MsgData *) data; }
-};
-#pragma pack(pop)
-
-inline void DUMP(Fragment& f, SockAddr& t, const char *tabs) {
- if( !dumpPackets )
- return;
- cout << tabs << curTimeMillis() % 10000 << ' ';
- short s = f.fragmentNo;
- if( s == -32768 )
- cout << "ACK M:" << f.msgId % 1000;
- else if( s == -32767 )
- cout << "MISSING";
- else if( s == -32766 )
- cout << "RESET ch:" << f.channel;
- else if( s < 0 )
- cout << "REQUESTACK";
- else
- cout << '#' << s << ' ' << f.fragmentLen << " M:" << f.msgId % 1000;
- cout << ' ';
- if( dumpIP )
- cout << t.toString();
-}
-
-inline void DUMPDATA(Fragment& f, const char *tabs) {
- if( !dumpPackets )
- return;
- if( f.fragmentNo >= 0 ) {
- cout << '\n' << tabs;
- int x = f.fragmentDataLen();
- if( dumpBytesDetailed ) {
- char *p = (char *) &f;
- cout << hex << *((unsigned*)p) << ' '; p+=4;
- cout << *((short*)p) << ' '; p+=2;
- cout << *((short*)p) << ' '; p+=2;
- cout << *((short*)p) << '|'; p+=2;
- if( x < 16 ) cout << "???";
- else {
- for( int i = 0; i < 4; i++ ) { // MSGDATA
- cout << *((unsigned*)p);
- cout << (i < 3 ? ' ' : '|');
- p += 4;
- }
- cout << '\n' << tabs;
- x -= 16;
- if( x > 32 ) x = 32;
- while( x-- > 0 ) { cout << (unsigned) (unsigned char) *p++ << ' '; }
- }
- }
- else {
- char *p = f.data;
- if( f.fragmentNo == 0 ) {
- p += 16; x -= 16;
- }
- if( x > 28 ) x = 28;
- for( int i = 0; i < x; i++ ) {
- if( *p == 0 ) cout << (char) 0xb0;
- else cout << (*p >= 32 ? *p : '.');
- p++;
- }
- }
- }
- cout << dec << endl;
-}
-
-inline void SEND(UDPConnection& c, Fragment &f, SockAddr& to, const char *extra="") {
- lock lk(coutmutex);
- DUMP(f, to, "\t\t\t\t\t>");
- c.sendto((char *) &f, f.fragmentLen, to);
- if( dumpPackets )
- cout << extra;
- DUMPDATA(f, "\t\t\t\t\t ");
-}
-
-// sender ->
-inline void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *f, bool retran) {
- assert( f->internals->channel == to.channel );
- ptrace( cout << ".sendfrag " << f->__num() << ' ' << retran << endl; )
- SEND(pc->udpConnection, *f->internals, to.sa, retran ? " retran" : "");
-}
-
-inline void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to,
- MSGID msgid, int fragNo) {
- Fragment f;
- f.msgId = msgid;
- f.channel = to.channel; assert( f.channel >= 0 );
- f.fragmentNo = ((short) -fragNo) -1;
- f.fragmentLen = FragHeader;
- ptrace( cout << ".requesting ack, fragno=" << f.fragmentNo << " msg:" << f.msgId << ' ' << to.toString() << endl; )
- SEND(pc->udpConnection, f, to.sa);
-}
-
-// receiver ->
-inline void __sendACK(ProtocolConnection *pc, MSGID msgid) {
- ptrace( cout << "...__sendACK() to:" << pc->farEnd.toString() << " msg:" << msgid << endl; )
- Fragment f;
- f.msgId = msgid;
- f.channel = pc->farEnd.channel; assert( f.channel >= 0 );
- f.fragmentNo = -32768;
- f.fragmentLen = FragHeader;
- SEND(pc->udpConnection, f, pc->farEnd.sa);
-}
-
-/* this is to clear old state for the channel in terms of what msgids are
- already sent.
-*/
-inline void __sendRESET(ProtocolConnection *pc) {
- Fragment f;
- f.msgId = -1;
- f.channel = pc->farEnd.channel; assert( f.channel >= 0 );
- f.fragmentNo = -32766;
- f.fragmentLen = FragHeader;
- ptrace( cout << "...__sendRESET() to:" << pc->farEnd.toString() << endl; )
- SEND(pc->udpConnection, f, pc->farEnd.sa);
-}
-
-inline void __sendMISSING(ProtocolConnection *pc, EndPoint& to,
- MSGID msgid, vector<short>& ids) {
- int n = ids.size();
- ptrace( cout << "..sendMISSING n:" << n << " firstmissing:" << ids[0] << " last:" << ids[ids.size()-1] << to.toString() << endl; )
- if( n > 256 ) {
- ptrace( cout << "\t..sendMISSING limiting to 256 ids" << endl; )
- n = 256;
- }
- Fragment *f = (Fragment*) malloc(FragHeader + n*2);
- f->msgId = msgid;
- f->channel = to.channel; assert( f->channel >= 0 );
- f->fragmentNo = -32767;
- f->fragmentLen = FragHeader + n*2;
- short *s = (short *) f->data;
- for( int i = 0; i < n; i++ )
- *s++ = ids[i];
-// ptrace( cout << "...sendMISSING fraglen:" << f->fragmentLen << endl; )
- SEND(pc->udpConnection, *f, to.sa);
- free(f);
-}
-
-// -> receiver
-inline F* __recv(UDPConnection& c, SockAddr& from) {
- Fragment *f = (Fragment *) malloc(MaxMTU);
- int n;
- while( 1 ) {
-// n = c.recvfrom((char*) f, c.mtu(), from);
- n = c.recvfrom((char*) f, MaxMTU, from);
-// cout << "recvfrom returned " << n << endl;
- if( n >= 0 )
- break;
- if( !goingAway ) {
- cout << ".recvfrom returned error " << getLastError() << " socket:" << c.sock << endl;
- cout << "sleeping 2 seconds " << endl;
- sleepsecs(2);
- }
- }
- assert( f->fragmentLen == n );
- if( f->fragmentNo > 0 ) {
- // don't waste tons of space if the maxmtu is 16k but we get 1480
- unsigned newsz = (f->fragmentLen + 255) & 0xffffff00;
- if( newsz < MaxMTU )
- f = (Fragment *) realloc(f, newsz);
- }
- {
- lock lk(coutmutex);
- DUMP(*f, from, "\t\t\t\t\t\t\t\t\t\t<");
- DUMPDATA(*f, "\t\t\t\t\t\t\t\t\t\t ");
- }
- return new F(f);
-}
-
-inline F::F(Fragment *f) : internals(f), op(NORMAL) {
- if( internals->fragmentNo < 0 ) {
- if( internals->fragmentNo == -32768 ) {
- op = ACK;
- ptrace( cout << ".got ACK msg:" << internals->msgId << endl; )
- } else if( internals->fragmentNo == -32767 ) {
- op = MISSING;
- ptrace( cout << ".got MISSING" << endl; )
- } else if( internals->fragmentNo == -32766 ) {
- op = RESET;
- } else {
- op = REQUESTACK;
- internals->fragmentNo = -(internals->fragmentNo+1);
- ptrace( cout << ".got REQUESTACK frag:" << internals->fragmentNo << " msg:" << internals->msgId << endl; )
- }
- }
-}
-inline F::~F() { free(internals); internals=0; }
-inline int F::__num() { return internals->fragmentNo; }
-inline int F::__len() { return internals->fragmentLen; }
-inline MSGID F::__msgid() { return internals->msgId; }
-inline int F::__channel() { return internals->channel; }
-inline bool F::__isREQUESTACK() { return op == REQUESTACK; }
-inline bool F::__isACK() { return op == ACK; }
-inline bool F::__isMISSING() { return op == MISSING; }
-inline short* F::__getMissing(int& n) {
- n = internals->fragmentDataLen() / 2;
- return (short *) internals->fragmentData();
-}
-inline int F::__firstFragMsgLen() {
- return internals->startOfMsgData()->len;
-}
+// protoimpl.h
+
+#pragma once
+
+/* packet dumping level of detail. */
+const bool dumpPackets = false; // this must be true to get anything at all
+const bool dumpIP = false; // output the ip address
+const bool dumpBytesDetailed = false; // more data output
+
+#include "message.h"
+
+extern boost::mutex coutmutex;
+
+const int FragMax = 1480;
+const int FragHeader = 10;
+const int MSS = FragMax - FragHeader;
+
+#pragma pack(push)
+#pragma pack(1)
+
+struct Fragment {
+ enum { MinFragmentLen = FragHeader + 1 };
+ MSGID msgId;
+ short channel;
+ short fragmentLen;
+ short fragmentNo;
+ char data[16];
+ int fragmentDataLen() { return fragmentLen - FragHeader; }
+ char* fragmentData() { return data; }
+
+ bool ok(int nRead) {
+ if( nRead < MinFragmentLen || fragmentLen > nRead || fragmentLen < MinFragmentLen ) {
+ ptrace( cout << ".recv: fragment bad. fragmentLen:" << fragmentLen << " nRead:" << nRead << endl; )
+ return false;
+ }
+ if( fragmentNo == 0 && fragmentLen < MinFragmentLen + MsgDataHeaderSize ) {
+ ptrace( cout << ".recv: bad first fragment. fragmentLen:" << fragmentLen << endl; )
+ return false;
+ }
+ return true;
+ }
+
+ MsgData* startOfMsgData() { assert(fragmentNo == 0); return (MsgData *) data; }
+};
+#pragma pack(pop)
+
+inline void DUMP(Fragment& f, SockAddr& t, const char *tabs) {
+ if( !dumpPackets )
+ return;
+ cout << tabs << curTimeMillis() % 10000 << ' ';
+ short s = f.fragmentNo;
+ if( s == -32768 )
+ cout << "ACK M:" << f.msgId % 1000;
+ else if( s == -32767 )
+ cout << "MISSING";
+ else if( s == -32766 )
+ cout << "RESET ch:" << f.channel;
+ else if( s < 0 )
+ cout << "REQUESTACK";
+ else
+ cout << '#' << s << ' ' << f.fragmentLen << " M:" << f.msgId % 1000;
+ cout << ' ';
+ if( dumpIP )
+ cout << t.toString();
+}
+
+inline void DUMPDATA(Fragment& f, const char *tabs) {
+ if( !dumpPackets )
+ return;
+ if( f.fragmentNo >= 0 ) {
+ cout << '\n' << tabs;
+ int x = f.fragmentDataLen();
+ if( dumpBytesDetailed ) {
+ char *p = (char *) &f;
+ cout << hex << *((unsigned*)p) << ' '; p+=4;
+ cout << *((short*)p) << ' '; p+=2;
+ cout << *((short*)p) << ' '; p+=2;
+ cout << *((short*)p) << '|'; p+=2;
+ if( x < 16 ) cout << "???";
+ else {
+ for( int i = 0; i < 4; i++ ) { // MSGDATA
+ cout << *((unsigned*)p);
+ cout << (i < 3 ? ' ' : '|');
+ p += 4;
+ }
+ cout << '\n' << tabs;
+ x -= 16;
+ if( x > 32 ) x = 32;
+ while( x-- > 0 ) { cout << (unsigned) (unsigned char) *p++ << ' '; }
+ }
+ }
+ else {
+ char *p = f.data;
+ if( f.fragmentNo == 0 ) {
+ p += 16; x -= 16;
+ }
+ if( x > 28 ) x = 28;
+ for( int i = 0; i < x; i++ ) {
+ if( *p == 0 ) cout << (char) 0xb0;
+ else cout << (*p >= 32 ? *p : '.');
+ p++;
+ }
+ }
+ }
+ cout << dec << endl;
+}
+
+inline void SEND(UDPConnection& c, Fragment &f, SockAddr& to, const char *extra="") {
+ lock lk(coutmutex);
+ DUMP(f, to, "\t\t\t\t\t>");
+ c.sendto((char *) &f, f.fragmentLen, to);
+ if( dumpPackets )
+ cout << extra;
+ DUMPDATA(f, "\t\t\t\t\t ");
+}
+
+// sender ->
+inline void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *f, bool retran) {
+ assert( f->internals->channel == to.channel );
+ ptrace( cout << ".sendfrag " << f->__num() << ' ' << retran << endl; )
+ SEND(pc->udpConnection, *f->internals, to.sa, retran ? " retran" : "");
+}
+
+inline void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to,
+ MSGID msgid, int fragNo) {
+ Fragment f;
+ f.msgId = msgid;
+ f.channel = to.channel; assert( f.channel >= 0 );
+ f.fragmentNo = ((short) -fragNo) -1;
+ f.fragmentLen = FragHeader;
+ ptrace( cout << ".requesting ack, fragno=" << f.fragmentNo << " msg:" << f.msgId << ' ' << to.toString() << endl; )
+ SEND(pc->udpConnection, f, to.sa);
+}
+
+// receiver ->
+inline void __sendACK(ProtocolConnection *pc, MSGID msgid) {
+ ptrace( cout << "...__sendACK() to:" << pc->farEnd.toString() << " msg:" << msgid << endl; )
+ Fragment f;
+ f.msgId = msgid;
+ f.channel = pc->farEnd.channel; assert( f.channel >= 0 );
+ f.fragmentNo = -32768;
+ f.fragmentLen = FragHeader;
+ SEND(pc->udpConnection, f, pc->farEnd.sa);
+}
+
+/* this is to clear old state for the channel in terms of what msgids are
+ already sent.
+*/
+inline void __sendRESET(ProtocolConnection *pc) {
+ Fragment f;
+ f.msgId = -1;
+ f.channel = pc->farEnd.channel; assert( f.channel >= 0 );
+ f.fragmentNo = -32766;
+ f.fragmentLen = FragHeader;
+ ptrace( cout << "...__sendRESET() to:" << pc->farEnd.toString() << endl; )
+ SEND(pc->udpConnection, f, pc->farEnd.sa);
+}
+
+inline void __sendMISSING(ProtocolConnection *pc, EndPoint& to,
+ MSGID msgid, vector<short>& ids) {
+ int n = ids.size();
+ ptrace( cout << "..sendMISSING n:" << n << " firstmissing:" << ids[0] << " last:" << ids[ids.size()-1] << to.toString() << endl; )
+ if( n > 256 ) {
+ ptrace( cout << "\t..sendMISSING limiting to 256 ids" << endl; )
+ n = 256;
+ }
+ Fragment *f = (Fragment*) malloc(FragHeader + n*2);
+ f->msgId = msgid;
+ f->channel = to.channel; assert( f->channel >= 0 );
+ f->fragmentNo = -32767;
+ f->fragmentLen = FragHeader + n*2;
+ short *s = (short *) f->data;
+ for( int i = 0; i < n; i++ )
+ *s++ = ids[i];
+// ptrace( cout << "...sendMISSING fraglen:" << f->fragmentLen << endl; )
+ SEND(pc->udpConnection, *f, to.sa);
+ free(f);
+}
+
+// -> receiver
+inline F* __recv(UDPConnection& c, SockAddr& from) {
+ Fragment *f = (Fragment *) malloc(MaxMTU);
+ int n;
+ while( 1 ) {
+// n = c.recvfrom((char*) f, c.mtu(), from);
+ n = c.recvfrom((char*) f, MaxMTU, from);
+// cout << "recvfrom returned " << n << endl;
+ if( n >= 0 )
+ break;
+ if( !goingAway ) {
+ cout << ".recvfrom returned error " << getLastError() << " socket:" << c.sock << endl;
+ cout << "sleeping 2 seconds " << endl;
+ sleepsecs(2);
+ }
+ }
+ assert( f->fragmentLen == n );
+ if( f->fragmentNo > 0 ) {
+ // don't waste tons of space if the maxmtu is 16k but we get 1480
+ unsigned newsz = (f->fragmentLen + 255) & 0xffffff00;
+ if( newsz < MaxMTU )
+ f = (Fragment *) realloc(f, newsz);
+ }
+ {
+ lock lk(coutmutex);
+ DUMP(*f, from, "\t\t\t\t\t\t\t\t\t\t<");
+ DUMPDATA(*f, "\t\t\t\t\t\t\t\t\t\t ");
+ }
+ return new F(f);
+}
+
+inline F::F(Fragment *f) : internals(f), op(NORMAL) {
+ if( internals->fragmentNo < 0 ) {
+ if( internals->fragmentNo == -32768 ) {
+ op = ACK;
+ ptrace( cout << ".got ACK msg:" << internals->msgId << endl; )
+ } else if( internals->fragmentNo == -32767 ) {
+ op = MISSING;
+ ptrace( cout << ".got MISSING" << endl; )
+ } else if( internals->fragmentNo == -32766 ) {
+ op = RESET;
+ } else {
+ op = REQUESTACK;
+ internals->fragmentNo = -(internals->fragmentNo+1);
+ ptrace( cout << ".got REQUESTACK frag:" << internals->fragmentNo << " msg:" << internals->msgId << endl; )
+ }
+ }
+}
+inline F::~F() { free(internals); internals=0; }
+inline int F::__num() { return internals->fragmentNo; }
+inline int F::__len() { return internals->fragmentLen; }
+inline MSGID F::__msgid() { return internals->msgId; }
+inline int F::__channel() { return internals->channel; }
+inline bool F::__isREQUESTACK() { return op == REQUESTACK; }
+inline bool F::__isACK() { return op == ACK; }
+inline bool F::__isMISSING() { return op == MISSING; }
+inline short* F::__getMissing(int& n) {
+ n = internals->fragmentDataLen() / 2;
+ return (short *) internals->fragmentData();
+}
+inline int F::__firstFragMsgLen() {
+ return internals->startOfMsgData()->len;
+}
diff --git a/grid/protorecv.cpp b/grid/protorecv.cpp
index 0639939ff08..49200d8ab56 100644
--- a/grid/protorecv.cpp
+++ b/grid/protorecv.cpp
@@ -1,386 +1,386 @@
-// protorecv.cpp
-
-#include "stdafx.h"
-#include "protocol.h"
-#include "boost/thread.hpp"
-#include "../util/goodies.h"
-#include "../util/sock.h"
-#include "protoimpl.h"
-#include "../db/introspect.h"
-
-boost::mutex biglock;
-boost::mutex coutmutex;
-boost::mutex threadStarterMutex;
-boost::condition threadActivate; // creating a new thread, grabbing threadUseThisOne
-ProtocolConnection *threadUseThisOne = 0;
-void receiverThread();
-
-map<SockAddr,ProtocolConnection*> firstPCForThisAddress;
-
-/* todo: eventually support multiple listeners on diff ports. not
- bothering yet.
-*/
-ProtocolConnection *any = 0;
-
-EndPointToPC pcMap;
-
-class GeneralInspector : public SingleResultObjCursor {
- Cursor* clone() { return new GeneralInspector(*this); }
- void fill() {
- b.append("version", "1.0.0.1");
- b.append("versionDesc", "none");
- b.append("nConnections", pcMap.size());
- }
-public:
- GeneralInspector() { reg("intr.general"); }
-} _geninspectorproto;
-
-#include <signal.h>
-
-/* our version of netstat */
-void sighandler(int x) {
- cout << "ProtocolConnections:" << endl;
- lock lk(biglock);
- EndPointToPC::iterator it = pcMap.begin();
- while( it != pcMap.end() ) {
- cout << " conn " << it->second->toString() << endl;
- it++;
- }
- cout << "any: ";
- if( any ) cout << any->toString();
- cout << "\ndone" << endl;
-}
-
-struct SetSignal {
- SetSignal() {
-#if !defined(_WIN32)
- signal(SIGUSR2, sighandler);
-#endif
- }
-} setSignal;
-
-string ProtocolConnection::toString() {
- stringstream out;
- out << myEnd.toString() << " <> " <<
- farEnd.toString() << " rcvd.size:" << cr.received.size() <<
- " pndngMsgs.size:" << cr.pendingMessages.size() <<
- " pndngSnd.size:" << cs.pendingSend.size();
- return out.str();
-}
-
-ProtocolConnection::~ProtocolConnection() {
- cout << ".warning: ~ProtocolConnection() not implemented (leaks mem etc)" << endl;
- if( any == this )
- any = 0;
-}
-
-void ProtocolConnection::shutdown() {
- ptrace( cout << ".shutdown()" << endl; )
- if( acceptAnyChannel() || first )
- return;
- ptrace( cout << ". to:" << to.toString() << endl; )
- __sendRESET(this);
-}
-
-inline ProtocolConnection::ProtocolConnection(ProtocolConnection& par, EndPoint& _to) :
- udpConnection(par.udpConnection), myEnd(par.myEnd), cs(*this), cr(*this)
-{
- parent = &par;
- farEnd = _to;
- first = true;
- // todo: LOCK
-
- assert(pcMap.count(farEnd) == 0);
-// pcMap[myEnd] = this;
-}
-
-inline bool ProtocolConnection::acceptAnyChannel() const {
- return myEnd.channel == MessagingPort::ANYCHANNEL;
-}
-
-inline void ProtocolConnection::init() {
- first = true;
- lock lk(biglock);
- lock tslk(threadStarterMutex);
-
- if( acceptAnyChannel() ) {
- assert( any == 0 );
- any = this;
- }
- else {
- pcMap[farEnd] = this;
- }
-
- if( firstPCForThisAddress.count(myEnd.sa) == 0 ) {
- firstPCForThisAddress[myEnd.sa] = this;
- // need a receiver thread. one per port # we listen on. shared by channels.
- boost::thread receiver(receiverThread);
- threadUseThisOne = this;
- threadActivate.notify_one();
- return;
- }
-}
-
-ProtocolConnection::ProtocolConnection(UDPConnection& c, EndPoint& e, SockAddr *_farEnd) :
- udpConnection(c), myEnd(e), cs(*this), cr(*this)
-{
- parent = this;
- if( _farEnd ) {
- farEnd.channel = myEnd.channel;
- farEnd.sa = *_farEnd;
- }
- init();
-}
-
-/* find message for fragment */
-MR* CR::getPendingMsg(F *fr) {
- MR *m;
- map<int,MR*>::iterator i = pendingMessages.find(fr->__msgid());
- if( i == pendingMessages.end() ) {
- if( pendingMessages.size() > 20 ) {
- cout << ".warning: pendingMessages.size()>20, ignoring msg until we dequeue" << endl;
- return 0;
- }
- m = new MR(&pc, fr->__msgid(), pc.farEnd);
- pendingMessages[fr->__msgid()] = m;
- }
- else
- m = i->second;
- return m;
-/*
- MR*& m = pendingMessages[fr->__msgid()];
- if( m == 0 )
- m = new MR(&pc, fr->__msgid(), fromAddr);
- return m;
-*/
-}
-
-void MR::removeFromReceivingList() {
- pc.cr.pendingMessages.erase(msgid);
-}
-
-MR::MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from) :
- pc(*_pc), msgid(_msgid), from(_from), nReceived(0)
-{
- messageLenExpected = nExpected = -1;
- f.push_back(0);
-}
-
-void MR::reportMissings(bool reportAll) {
- vector<short> missing;
- unsigned t = curTimeMillis();
- for( unsigned i = 0; i < f.size(); i++ ) {
- if( f[i] == 0 ) {
- int diff = tdiff(reportTimes[i],t);
- assert( diff >= 0 || reportTimes[i] == 0 );
- if( diff > 100 || reportTimes[i]==0 ||
- (reportAll && diff > 1) ) {
- reportTimes[i] = t;
- assert( i < 25000 );
- missing.push_back(i);
- }
- }
- }
- if( !missing.empty() )
- __sendMISSING(&pc, from, msgid, missing);
-}
-
-inline bool MR::complete() {
- if( nReceived == nExpected ) {
- assert( nExpected == (int) f.size() );
- assert( f[0] != 0 );
- return true;
- }
- return false;
-/*
- if( nExpected > (int) f.size() || nExpected == -1 )
- return false;
- for( unsigned i = 0; i < f.size(); i++ )
- if( f[i] == 0 )
- return false;
- return true;
-*/
-}
-
-/* true=msg complete */
-bool MR::got(F *frag, EndPoint& fromAddr) {
- MSGID msgid = frag->__msgid();
- int i = frag->__num();
- if( i == 544 && !frag->__isREQUESTACK() ) {
- cout << "************ GOT LAST FRAGMENT #544" << endl;
- }
- if( nExpected < 0 && i == 0 ) {
- messageLenExpected = frag->__firstFragMsgLen();
- if( messageLenExpected == frag->__len()-FragHeader )
- nExpected = 1;
- else {
- int mss = frag->__len()-FragHeader;
- assert( messageLenExpected > mss );
- nExpected = (messageLenExpected + mss - 1) / mss;
- ptrace( cout << ".got first frag, expect:" << nExpected << "packets, expectedLen:" << messageLenExpected << endl; )
- }
- }
- if( i >= (int) f.size() )
- f.resize(i+1, 0);
- if( frag->__isREQUESTACK() ) {
- ptrace( cout << "...got(): processing a REQUESTACK" << endl; )
- /* we're simply seeing if we got the data, and then acking. */
- delete frag;
- frag = 0;
- reportTimes.resize(f.size(), 0);
- if( complete() ) {
- __sendACK(&pc, msgid);
- return true;
- }
- reportMissings(true);
- return false;
- }
- else if( f[i] != 0 ) {
- ptrace( cout << "\t.dup packet i:" << i << ' ' << from.toString() << endl; )
- delete frag;
- return false;
- }
- if( frag ) {
- f[i] = frag;
- nReceived++;
- }
- reportTimes.resize(f.size(), 0);
-
- if( !complete() )
- return false;
- __sendACK(&pc, msgid);
- return true;
-
-/*
- if( f[0] == 0 || f[i] == 0 ) {
-// reportMissings(frag == 0);
- return false;
- }
- if( i+1 < nExpected ) {
-//cout << "TEMP COMMENT" << endl;
-// if( i > 0 && f[i-1] == 0 )
-// reportMissings(frag == 0);
- return false;
- }
- // last fragment received
- if( !complete() ) {
-// reportMissings(frag == 0);
- return false;
- }
- __sendACK(&pc, fromAddr, msgid);
- return frag != 0;
-*/
-}
-
-MR* CR::recv() {
- MR *x;
- {
- lock lk(biglock);
- while( received.empty() )
- receivedSome.wait(lk);
- x = received.back();
- received.pop_back();
- }
- return x;
-}
-
-// this is how we tell protosend.cpp we got these
-void gotACK(F*, ProtocolConnection *);
-void gotMISSING(F*, ProtocolConnection *);
-
-void receiverThread() {
- ProtocolConnection *startingConn; // this thread manages many; this is just initiator or the parent for acceptany
- UDPConnection *uc;
- {
- lock lk(threadStarterMutex);
- while( 1 ) {
- if( threadUseThisOne != 0 ) {
- uc = &threadUseThisOne->udpConnection;
- startingConn = threadUseThisOne;
- threadUseThisOne = 0;
- break;
- }
- threadActivate.wait(lk);
- }
- }
-
- cout << "\n.Activating a new receiverThread\n " << startingConn->toString() << '\n' << endl;
-
- EndPoint fromAddr;
- while( 1 ) {
- F *f = __recv(*uc, fromAddr.sa);
- lock l(biglock);
- fromAddr.channel = f->__channel();
- ptrace( cout << "..__recv() from:" << fromAddr.toString() << " frag:" << f->__num() << endl; )
- assert( fromAddr.channel >= 0 );
- EndPointToPC::iterator it = pcMap.find(fromAddr);
- ProtocolConnection *mypc;
- if( it == pcMap.end() ) {
- if( !startingConn->acceptAnyChannel() ) {
- cout << ".WARNING: got packet from an unknown endpoint:" << fromAddr.toString() << endl;
- cout << ". this may be ok if you just restarted" << endl;
- delete f;
- continue;
- }
- cout << ".New connection accepted from " << fromAddr.toString() << endl;
- mypc = new ProtocolConnection(*startingConn, fromAddr);
- pcMap[fromAddr] = mypc;
- }
- else
- mypc = it->second;
-
- assert( fromAddr.channel == mypc->farEnd.channel );
- MsgTracker& track = mypc->cr.oldMsgTracker;
-
- if( f->op != F::NORMAL ) {
- if( f->__isACK() ) {
- gotACK(f, mypc);
- delete f;
- continue;
- }
- if( f->__isMISSING() ) {
- gotMISSING(f, mypc);
- delete f;
- continue;
- }
- if( f->op == F::RESET ) {
- ptrace( cout << ".got RESET" << endl; )
- track.reset();
- mypc->cs.resetIt();
- delete f;
- continue;
- }
- }
-
- if( track.recentlyReceived.count(f->__msgid()) ) {
- // already done with it. ignore, other than acking.
- if( f->__isREQUESTACK() )
- __sendACK(mypc, f->__msgid());
- else {
- ptrace( cout << ".ignoring packet about msg already received msg:" << f->__msgid() << " op:" << f->op << endl; )
- }
- delete f;
- continue;
- }
-
- if( f->__msgid() <= track.lastFullyReceived && !track.recentlyReceivedList.empty() ) {
- // reconnect on an old channel?
- ptrace( cout << ".warning: strange msgid:" << f->__msgid() << " received, last:" << track->lastFullyReceived << " conn:" << fromAddr.toString() << endl; )
- }
-
- MR *m = mypc->cr.getPendingMsg(f); /* todo: optimize for single fragment case? */
- if( m == 0 ) {
- ptrace( cout << "..getPendingMsg() returns 0" << endl; )
- delete f;
- continue;
- }
- if( m->got(f, fromAddr) ) {
- track.got(m->msgid);
- m->removeFromReceivingList();
- {
- mypc->parent->cr.received.push_back(m);
- mypc->parent->cr.receivedSome.notify_one();
- }
- }
- }
-}
+// protorecv.cpp
+
+#include "stdafx.h"
+#include "protocol.h"
+#include "boost/thread.hpp"
+#include "../util/goodies.h"
+#include "../util/sock.h"
+#include "protoimpl.h"
+#include "../db/introspect.h"
+
+boost::mutex biglock;
+boost::mutex coutmutex;
+boost::mutex threadStarterMutex;
+boost::condition threadActivate; // creating a new thread, grabbing threadUseThisOne
+ProtocolConnection *threadUseThisOne = 0;
+void receiverThread();
+
+map<SockAddr,ProtocolConnection*> firstPCForThisAddress;
+
+/* todo: eventually support multiple listeners on diff ports. not
+ bothering yet.
+*/
+ProtocolConnection *any = 0;
+
+EndPointToPC pcMap;
+
+class GeneralInspector : public SingleResultObjCursor {
+ Cursor* clone() { return new GeneralInspector(*this); }
+ void fill() {
+ b.append("version", "1.0.0.1");
+ b.append("versionDesc", "none");
+ b.append("nConnections", pcMap.size());
+ }
+public:
+ GeneralInspector() { reg("intr.general"); }
+} _geninspectorproto;
+
+#include <signal.h>
+
+/* our version of netstat */
+void sighandler(int x) {
+ cout << "ProtocolConnections:" << endl;
+ lock lk(biglock);
+ EndPointToPC::iterator it = pcMap.begin();
+ while( it != pcMap.end() ) {
+ cout << " conn " << it->second->toString() << endl;
+ it++;
+ }
+ cout << "any: ";
+ if( any ) cout << any->toString();
+ cout << "\ndone" << endl;
+}
+
+struct SetSignal {
+ SetSignal() {
+#if !defined(_WIN32)
+ signal(SIGUSR2, sighandler);
+#endif
+ }
+} setSignal;
+
+string ProtocolConnection::toString() {
+ stringstream out;
+ out << myEnd.toString() << " <> " <<
+ farEnd.toString() << " rcvd.size:" << cr.received.size() <<
+ " pndngMsgs.size:" << cr.pendingMessages.size() <<
+ " pndngSnd.size:" << cs.pendingSend.size();
+ return out.str();
+}
+
+ProtocolConnection::~ProtocolConnection() {
+ cout << ".warning: ~ProtocolConnection() not implemented (leaks mem etc)" << endl;
+ if( any == this )
+ any = 0;
+}
+
+void ProtocolConnection::shutdown() {
+ ptrace( cout << ".shutdown()" << endl; )
+ if( acceptAnyChannel() || first )
+ return;
+ ptrace( cout << ". to:" << to.toString() << endl; )
+ __sendRESET(this);
+}
+
+inline ProtocolConnection::ProtocolConnection(ProtocolConnection& par, EndPoint& _to) :
+ udpConnection(par.udpConnection), myEnd(par.myEnd), cs(*this), cr(*this)
+{
+ parent = &par;
+ farEnd = _to;
+ first = true;
+ // todo: LOCK
+
+ assert(pcMap.count(farEnd) == 0);
+// pcMap[myEnd] = this;
+}
+
+inline bool ProtocolConnection::acceptAnyChannel() const {
+ return myEnd.channel == MessagingPort::ANYCHANNEL;
+}
+
+inline void ProtocolConnection::init() {
+ first = true;
+ lock lk(biglock);
+ lock tslk(threadStarterMutex);
+
+ if( acceptAnyChannel() ) {
+ assert( any == 0 );
+ any = this;
+ }
+ else {
+ pcMap[farEnd] = this;
+ }
+
+ if( firstPCForThisAddress.count(myEnd.sa) == 0 ) {
+ firstPCForThisAddress[myEnd.sa] = this;
+ // need a receiver thread. one per port # we listen on. shared by channels.
+ boost::thread receiver(receiverThread);
+ threadUseThisOne = this;
+ threadActivate.notify_one();
+ return;
+ }
+}
+
+ProtocolConnection::ProtocolConnection(UDPConnection& c, EndPoint& e, SockAddr *_farEnd) :
+ udpConnection(c), myEnd(e), cs(*this), cr(*this)
+{
+ parent = this;
+ if( _farEnd ) {
+ farEnd.channel = myEnd.channel;
+ farEnd.sa = *_farEnd;
+ }
+ init();
+}
+
+/* find message for fragment */
+MR* CR::getPendingMsg(F *fr) {
+ MR *m;
+ map<int,MR*>::iterator i = pendingMessages.find(fr->__msgid());
+ if( i == pendingMessages.end() ) {
+ if( pendingMessages.size() > 20 ) {
+ cout << ".warning: pendingMessages.size()>20, ignoring msg until we dequeue" << endl;
+ return 0;
+ }
+ m = new MR(&pc, fr->__msgid(), pc.farEnd);
+ pendingMessages[fr->__msgid()] = m;
+ }
+ else
+ m = i->second;
+ return m;
+/*
+ MR*& m = pendingMessages[fr->__msgid()];
+ if( m == 0 )
+ m = new MR(&pc, fr->__msgid(), fromAddr);
+ return m;
+*/
+}
+
+void MR::removeFromReceivingList() {
+ pc.cr.pendingMessages.erase(msgid);
+}
+
+MR::MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from) :
+ pc(*_pc), msgid(_msgid), from(_from), nReceived(0)
+{
+ messageLenExpected = nExpected = -1;
+ f.push_back(0);
+}
+
+void MR::reportMissings(bool reportAll) {
+ vector<short> missing;
+ unsigned t = curTimeMillis();
+ for( unsigned i = 0; i < f.size(); i++ ) {
+ if( f[i] == 0 ) {
+ int diff = tdiff(reportTimes[i],t);
+ assert( diff >= 0 || reportTimes[i] == 0 );
+ if( diff > 100 || reportTimes[i]==0 ||
+ (reportAll && diff > 1) ) {
+ reportTimes[i] = t;
+ assert( i < 25000 );
+ missing.push_back(i);
+ }
+ }
+ }
+ if( !missing.empty() )
+ __sendMISSING(&pc, from, msgid, missing);
+}
+
+inline bool MR::complete() {
+ if( nReceived == nExpected ) {
+ assert( nExpected == (int) f.size() );
+ assert( f[0] != 0 );
+ return true;
+ }
+ return false;
+/*
+ if( nExpected > (int) f.size() || nExpected == -1 )
+ return false;
+ for( unsigned i = 0; i < f.size(); i++ )
+ if( f[i] == 0 )
+ return false;
+ return true;
+*/
+}
+
+/* true=msg complete */
+bool MR::got(F *frag, EndPoint& fromAddr) {
+ MSGID msgid = frag->__msgid();
+ int i = frag->__num();
+ if( i == 544 && !frag->__isREQUESTACK() ) {
+ cout << "************ GOT LAST FRAGMENT #544" << endl;
+ }
+ if( nExpected < 0 && i == 0 ) {
+ messageLenExpected = frag->__firstFragMsgLen();
+ if( messageLenExpected == frag->__len()-FragHeader )
+ nExpected = 1;
+ else {
+ int mss = frag->__len()-FragHeader;
+ assert( messageLenExpected > mss );
+ nExpected = (messageLenExpected + mss - 1) / mss;
+ ptrace( cout << ".got first frag, expect:" << nExpected << "packets, expectedLen:" << messageLenExpected << endl; )
+ }
+ }
+ if( i >= (int) f.size() )
+ f.resize(i+1, 0);
+ if( frag->__isREQUESTACK() ) {
+ ptrace( cout << "...got(): processing a REQUESTACK" << endl; )
+ /* we're simply seeing if we got the data, and then acking. */
+ delete frag;
+ frag = 0;
+ reportTimes.resize(f.size(), 0);
+ if( complete() ) {
+ __sendACK(&pc, msgid);
+ return true;
+ }
+ reportMissings(true);
+ return false;
+ }
+ else if( f[i] != 0 ) {
+ ptrace( cout << "\t.dup packet i:" << i << ' ' << from.toString() << endl; )
+ delete frag;
+ return false;
+ }
+ if( frag ) {
+ f[i] = frag;
+ nReceived++;
+ }
+ reportTimes.resize(f.size(), 0);
+
+ if( !complete() )
+ return false;
+ __sendACK(&pc, msgid);
+ return true;
+
+/*
+ if( f[0] == 0 || f[i] == 0 ) {
+// reportMissings(frag == 0);
+ return false;
+ }
+ if( i+1 < nExpected ) {
+//cout << "TEMP COMMENT" << endl;
+// if( i > 0 && f[i-1] == 0 )
+// reportMissings(frag == 0);
+ return false;
+ }
+ // last fragment received
+ if( !complete() ) {
+// reportMissings(frag == 0);
+ return false;
+ }
+ __sendACK(&pc, fromAddr, msgid);
+ return frag != 0;
+*/
+}
+
+MR* CR::recv() {
+ MR *x;
+ {
+ lock lk(biglock);
+ while( received.empty() )
+ receivedSome.wait(lk);
+ x = received.back();
+ received.pop_back();
+ }
+ return x;
+}
+
+// this is how we tell protosend.cpp we got these
+void gotACK(F*, ProtocolConnection *);
+void gotMISSING(F*, ProtocolConnection *);
+
+void receiverThread() {
+ ProtocolConnection *startingConn; // this thread manages many; this is just initiator or the parent for acceptany
+ UDPConnection *uc;
+ {
+ lock lk(threadStarterMutex);
+ while( 1 ) {
+ if( threadUseThisOne != 0 ) {
+ uc = &threadUseThisOne->udpConnection;
+ startingConn = threadUseThisOne;
+ threadUseThisOne = 0;
+ break;
+ }
+ threadActivate.wait(lk);
+ }
+ }
+
+ cout << "\n.Activating a new receiverThread\n " << startingConn->toString() << '\n' << endl;
+
+ EndPoint fromAddr;
+ while( 1 ) {
+ F *f = __recv(*uc, fromAddr.sa);
+ lock l(biglock);
+ fromAddr.channel = f->__channel();
+ ptrace( cout << "..__recv() from:" << fromAddr.toString() << " frag:" << f->__num() << endl; )
+ assert( fromAddr.channel >= 0 );
+ EndPointToPC::iterator it = pcMap.find(fromAddr);
+ ProtocolConnection *mypc;
+ if( it == pcMap.end() ) {
+ if( !startingConn->acceptAnyChannel() ) {
+ cout << ".WARNING: got packet from an unknown endpoint:" << fromAddr.toString() << endl;
+ cout << ". this may be ok if you just restarted" << endl;
+ delete f;
+ continue;
+ }
+ cout << ".New connection accepted from " << fromAddr.toString() << endl;
+ mypc = new ProtocolConnection(*startingConn, fromAddr);
+ pcMap[fromAddr] = mypc;
+ }
+ else
+ mypc = it->second;
+
+ assert( fromAddr.channel == mypc->farEnd.channel );
+ MsgTracker& track = mypc->cr.oldMsgTracker;
+
+ if( f->op != F::NORMAL ) {
+ if( f->__isACK() ) {
+ gotACK(f, mypc);
+ delete f;
+ continue;
+ }
+ if( f->__isMISSING() ) {
+ gotMISSING(f, mypc);
+ delete f;
+ continue;
+ }
+ if( f->op == F::RESET ) {
+ ptrace( cout << ".got RESET" << endl; )
+ track.reset();
+ mypc->cs.resetIt();
+ delete f;
+ continue;
+ }
+ }
+
+ if( track.recentlyReceived.count(f->__msgid()) ) {
+ // already done with it. ignore, other than acking.
+ if( f->__isREQUESTACK() )
+ __sendACK(mypc, f->__msgid());
+ else {
+ ptrace( cout << ".ignoring packet about msg already received msg:" << f->__msgid() << " op:" << f->op << endl; )
+ }
+ delete f;
+ continue;
+ }
+
+ if( f->__msgid() <= track.lastFullyReceived && !track.recentlyReceivedList.empty() ) {
+ // reconnect on an old channel?
+ ptrace( cout << ".warning: strange msgid:" << f->__msgid() << " received, last:" << track->lastFullyReceived << " conn:" << fromAddr.toString() << endl; )
+ }
+
+ MR *m = mypc->cr.getPendingMsg(f); /* todo: optimize for single fragment case? */
+ if( m == 0 ) {
+ ptrace( cout << "..getPendingMsg() returns 0" << endl; )
+ delete f;
+ continue;
+ }
+ if( m->got(f, fromAddr) ) {
+ track.got(m->msgid);
+ m->removeFromReceivingList();
+ {
+ mypc->parent->cr.received.push_back(m);
+ mypc->parent->cr.receivedSome.notify_one();
+ }
+ }
+ }
+}
diff --git a/grid/protosend.cpp b/grid/protosend.cpp
index 2d22450ced1..dfd31036127 100644
--- a/grid/protosend.cpp
+++ b/grid/protosend.cpp
@@ -1,188 +1,188 @@
-// protosend.cpp
-
-/* todo: redo locking! */
-
-#include "stdafx.h"
-#include "protocol.h"
-#include "boost/thread.hpp"
-#include "../util/goodies.h"
-#include "../util/sock.h"
-#include "protoimpl.h"
-using namespace boost;
-typedef boost::mutex::scoped_lock lock;
-
-/* todo: granularity? */
-extern boost::mutex biglock;
-
-void senderComplainThread();
-
-boost::thread sender(senderComplainThread);
-
-bool erasePending(ProtocolConnection *pc, int id);
-
-inline bool MS::complain(unsigned now) {
- if( tdiff(lastComplainTime, now) < (int) complainInterval )
- return false;
- if( complainInterval > 4000 ) {
- etrace( cout << ".no ack of send after " << complainInterval/1000 << " seconds " << to.toString() << endl; )
- if( complainInterval > 35000 ) {
- etrace( cout << ".GIVING UP on sending message" << endl; )
- erasePending(pc, msgid);
- return true;
- }
- }
- complainInterval *= 2;
- lastComplainTime = now;
- __sendREQUESTACK(pc, to, msgid, fragments.size()-1);
- return false;
-}
-
-// where's my ack?
-void senderComplainThread() {
- sleepmillis(200);
-
- while( 1 ) {
- sleepmillis(50);
- {
- lock lk(biglock);
- unsigned now = curTimeMillis();
- /* todo: more efficient data structure here. */
- for( EndPointToPC::iterator i = pcMap.begin(); i != pcMap.end(); i++ ) {
- ProtocolConnection *pc = i->second;
- for( vector<MS*>::iterator j = pc->cs.pendingSend.begin(); j < pc->cs.pendingSend.end(); j++ )
- if( (*j)->complain(now) )
- break;
- }
- }
- }
-}
-
-inline MS* _slocked_getMSForFrag(F* fr, ProtocolConnection *pc) {
- int id = fr->__msgid();
- vector<MS*>::iterator it;
- for( it = pc->cs.pendingSend.begin(); it<pc->cs.pendingSend.end(); it++ ) {
- MS *m = *it;
- if( m->msgid == id )
- return m;
- }
- return 0;
-}
-
-// received a MISSING message from the other end. retransmit.
-void gotMISSING(F* fr, ProtocolConnection *pc) {
- pc->cs.delayGotMissing();
-
- ptrace( cout << ".gotMISSING() msgid:" << fr->__msgid() << endl; )
- MS *m = _slocked_getMSForFrag(fr, pc);
- if( m ) {
- {
- int df = tdiff(m->lastComplainTime, curTimeMillis()) * 2;
- if( df < 10 )
- df = 10;
- if( df > 2000 )
- df = 2000;
- m->complainInterval = (unsigned) df;
- cout << "TEMP: set complainInterval to " << m->complainInterval << endl;
- }
-
- int n;
- short* s = fr->__getMissing(n);
- assert( n > 0 && n < 5000 );
- for( int i = 0; i < n; i++ ) {
- // ptrace( cout << ".. resending frag #" << s[i] << ' ' << m->to.toString() << endl; )
- __sendFrag(pc, m->to, m->fragments[s[i]], true);
- if( i % 64 == 0 && pc->cs.delay >= 1.0 ) {
- ptrace( cout << "SLEEP" << endl; )
- sleepmillis((int) pc->cs.delay);
- }
- }
- return;
- }
- ptrace( cout << "\t.warning: gotMISSING for an unknown msg id:" << fr->__msgid() << ' ' << pc->farEnd.toString() << endl; )
-}
-
-/* done sending a msg, clean up and notify sender to unblock */
-bool erasePending(ProtocolConnection *pc, int id) {
- vector<MS*>::iterator it;
- CS& cs = pc->cs;
- for( it = cs.pendingSend.begin(); it < cs.pendingSend.end(); it++ ) {
- MS *m = *it;
- if( m->msgid == id ) {
- cs.pendingSend.erase(it);
- ptrace( cout << "..gotACK/erase: found pendingSend msg:" << id << endl; )
- delete m;
- cs.msgSent.notify_one();
- return true;
- }
- }
- return false;
-}
-
-// received an ACK message. so we can discard our saved copy we were trying to send, we
-// are done with it.
-void gotACK(F* fr, ProtocolConnection *pc) {
- if( erasePending(pc, fr->__msgid()) )
- return;
- ptrace( cout << ".warning: got ack for an unknown msg id:" << fr->__msgid() << ' ' << pc->farEnd.toString() << endl; )
-}
-
-void MS::send() {
- /* flow control */
- lock lk(biglock);
- ptrace( cout << "..MS::send() pending=" << pc->cs.pendingSend.size() << endl; )
-
- if( pc->acceptAnyChannel() ) {
- EndPointToPC::iterator it = pcMap.find(to);
- if( it == pcMap.end() ) {
- cout << ".ERROR: can't find ProtocolConnection object for " << to.toString() << endl;
- assert(false);
- return;
- }
- /* switch to the child protocolconnection */
- pc = it->second;
- }
- else {
- assert(pc->myEnd.channel == to.channel);
- }
-
- if( pc->first ) {
- pc->first = false;
- if( pc->myEnd.channel >= 0 )
- __sendRESET(pc);
- assert( pc->farEnd.channel == to.channel);
- if( pc->farEnd.sa.isLocalHost() )
- pc->cs.delayMax = 1.0;
- }
-
- // not locked here, probably ok to call size()
- if( pc->cs.pendingSend.size() >= 1 ) {
- cout << ".waiting for queued sends to complete " << pc->cs.pendingSend.size() << endl;
- while( pc->cs.pendingSend.size() >= 1 )
- pc->cs.msgSent.wait(lk);
- cout << ".waitend" << endl;
- }
-
- lastComplainTime = curTimeMillis();
- pc->cs.pendingSend.push_back(this);
- /* todo: pace */
- for( unsigned i = 0; i < fragments.size(); i++ ) {
- __sendFrag(pc, to, fragments[i]);
- if( i % 64 == 0 && pc->cs.delay >= 1.0 ) {
- ptrace( cout << ".sleep " << pc->cs.delay << endl; )
- sleepmillis((int) pc->cs.delay);
- }
- }
-
- pc->cs.delaySentMsg();
-}
-
-CS::CS(ProtocolConnection& _pc) :
- pc(_pc), delay(0), delayMax(10)
-{
-}
-
-void CS::resetIt() {
- for( vector<MS*>::iterator i = pendingSend.begin(); i < pendingSend.end(); i++ )
- delete (*i);
- pendingSend.clear();
-}
+// protosend.cpp
+
+/* todo: redo locking! */
+
+#include "stdafx.h"
+#include "protocol.h"
+#include "boost/thread.hpp"
+#include "../util/goodies.h"
+#include "../util/sock.h"
+#include "protoimpl.h"
+using namespace boost;
+typedef boost::mutex::scoped_lock lock;
+
+/* todo: granularity? */
+extern boost::mutex biglock;
+
+void senderComplainThread();
+
+boost::thread sender(senderComplainThread);
+
+bool erasePending(ProtocolConnection *pc, int id);
+
+inline bool MS::complain(unsigned now) {
+ if( tdiff(lastComplainTime, now) < (int) complainInterval )
+ return false;
+ if( complainInterval > 4000 ) {
+ etrace( cout << ".no ack of send after " << complainInterval/1000 << " seconds " << to.toString() << endl; )
+ if( complainInterval > 35000 ) {
+ etrace( cout << ".GIVING UP on sending message" << endl; )
+ erasePending(pc, msgid);
+ return true;
+ }
+ }
+ complainInterval *= 2;
+ lastComplainTime = now;
+ __sendREQUESTACK(pc, to, msgid, fragments.size()-1);
+ return false;
+}
+
+// where's my ack?
+void senderComplainThread() {
+ sleepmillis(200);
+
+ while( 1 ) {
+ sleepmillis(50);
+ {
+ lock lk(biglock);
+ unsigned now = curTimeMillis();
+ /* todo: more efficient data structure here. */
+ for( EndPointToPC::iterator i = pcMap.begin(); i != pcMap.end(); i++ ) {
+ ProtocolConnection *pc = i->second;
+ for( vector<MS*>::iterator j = pc->cs.pendingSend.begin(); j < pc->cs.pendingSend.end(); j++ )
+ if( (*j)->complain(now) )
+ break;
+ }
+ }
+ }
+}
+
+inline MS* _slocked_getMSForFrag(F* fr, ProtocolConnection *pc) {
+ int id = fr->__msgid();
+ vector<MS*>::iterator it;
+ for( it = pc->cs.pendingSend.begin(); it<pc->cs.pendingSend.end(); it++ ) {
+ MS *m = *it;
+ if( m->msgid == id )
+ return m;
+ }
+ return 0;
+}
+
+// received a MISSING message from the other end. retransmit.
+void gotMISSING(F* fr, ProtocolConnection *pc) {
+ pc->cs.delayGotMissing();
+
+ ptrace( cout << ".gotMISSING() msgid:" << fr->__msgid() << endl; )
+ MS *m = _slocked_getMSForFrag(fr, pc);
+ if( m ) {
+ {
+ int df = tdiff(m->lastComplainTime, curTimeMillis()) * 2;
+ if( df < 10 )
+ df = 10;
+ if( df > 2000 )
+ df = 2000;
+ m->complainInterval = (unsigned) df;
+ cout << "TEMP: set complainInterval to " << m->complainInterval << endl;
+ }
+
+ int n;
+ short* s = fr->__getMissing(n);
+ assert( n > 0 && n < 5000 );
+ for( int i = 0; i < n; i++ ) {
+ // ptrace( cout << ".. resending frag #" << s[i] << ' ' << m->to.toString() << endl; )
+ __sendFrag(pc, m->to, m->fragments[s[i]], true);
+ if( i % 64 == 0 && pc->cs.delay >= 1.0 ) {
+ ptrace( cout << "SLEEP" << endl; )
+ sleepmillis((int) pc->cs.delay);
+ }
+ }
+ return;
+ }
+ ptrace( cout << "\t.warning: gotMISSING for an unknown msg id:" << fr->__msgid() << ' ' << pc->farEnd.toString() << endl; )
+}
+
+/* done sending a msg, clean up and notify sender to unblock */
+bool erasePending(ProtocolConnection *pc, int id) {
+ vector<MS*>::iterator it;
+ CS& cs = pc->cs;
+ for( it = cs.pendingSend.begin(); it < cs.pendingSend.end(); it++ ) {
+ MS *m = *it;
+ if( m->msgid == id ) {
+ cs.pendingSend.erase(it);
+ ptrace( cout << "..gotACK/erase: found pendingSend msg:" << id << endl; )
+ delete m;
+ cs.msgSent.notify_one();
+ return true;
+ }
+ }
+ return false;
+}
+
+// received an ACK message. so we can discard our saved copy we were trying to send, we
+// are done with it.
+void gotACK(F* fr, ProtocolConnection *pc) {
+ if( erasePending(pc, fr->__msgid()) )
+ return;
+ ptrace( cout << ".warning: got ack for an unknown msg id:" << fr->__msgid() << ' ' << pc->farEnd.toString() << endl; )
+}
+
+void MS::send() {
+ /* flow control */
+ lock lk(biglock);
+ ptrace( cout << "..MS::send() pending=" << pc->cs.pendingSend.size() << endl; )
+
+ if( pc->acceptAnyChannel() ) {
+ EndPointToPC::iterator it = pcMap.find(to);
+ if( it == pcMap.end() ) {
+ cout << ".ERROR: can't find ProtocolConnection object for " << to.toString() << endl;
+ assert(false);
+ return;
+ }
+ /* switch to the child protocolconnection */
+ pc = it->second;
+ }
+ else {
+ assert(pc->myEnd.channel == to.channel);
+ }
+
+ if( pc->first ) {
+ pc->first = false;
+ if( pc->myEnd.channel >= 0 )
+ __sendRESET(pc);
+ assert( pc->farEnd.channel == to.channel);
+ if( pc->farEnd.sa.isLocalHost() )
+ pc->cs.delayMax = 1.0;
+ }
+
+ // not locked here, probably ok to call size()
+ if( pc->cs.pendingSend.size() >= 1 ) {
+ cout << ".waiting for queued sends to complete " << pc->cs.pendingSend.size() << endl;
+ while( pc->cs.pendingSend.size() >= 1 )
+ pc->cs.msgSent.wait(lk);
+ cout << ".waitend" << endl;
+ }
+
+ lastComplainTime = curTimeMillis();
+ pc->cs.pendingSend.push_back(this);
+ /* todo: pace */
+ for( unsigned i = 0; i < fragments.size(); i++ ) {
+ __sendFrag(pc, to, fragments[i]);
+ if( i % 64 == 0 && pc->cs.delay >= 1.0 ) {
+ ptrace( cout << ".sleep " << pc->cs.delay << endl; )
+ sleepmillis((int) pc->cs.delay);
+ }
+ }
+
+ pc->cs.delaySentMsg();
+}
+
+CS::CS(ProtocolConnection& _pc) :
+ pc(_pc), delay(0), delayMax(10)
+{
+}
+
+void CS::resetIt() {
+ for( vector<MS*>::iterator i = pendingSend.begin(); i < pendingSend.end(); i++ )
+ delete (*i);
+ pendingSend.clear();
+}