diff options
author | Dwight <dmerriman@gmail.com> | 2007-11-17 21:10:00 -0500 |
---|---|---|
committer | Dwight <dmerriman@gmail.com> | 2007-11-17 21:10:00 -0500 |
commit | ab654fb9eda000661cde7f3273dac59fcd47caf8 (patch) | |
tree | f60e0be572a51e982bb1eb8eb3117ef6eec128c3 /grid | |
parent | a1e51472252fc1ef7d6b9c7e74490cd953f43968 (diff) | |
download | mongo-ab654fb9eda000661cde7f3273dac59fcd47caf8.tar.gz |
transport
Diffstat (limited to 'grid')
-rw-r--r-- | grid/message.cpp | 181 | ||||
-rw-r--r-- | grid/message.h | 23 | ||||
-rw-r--r-- | grid/protocol.h | 203 | ||||
-rw-r--r-- | grid/protoimpl.h | 144 | ||||
-rw-r--r-- | grid/protorecv.cpp | 229 | ||||
-rw-r--r-- | grid/protosend.cpp | 123 |
6 files changed, 789 insertions, 114 deletions
diff --git a/grid/message.cpp b/grid/message.cpp index a4d733ef510..6450bbaf872 100644 --- a/grid/message.cpp +++ b/grid/message.cpp @@ -7,131 +7,87 @@ #include "message.h"
#include <time.h>
#include "../util/goodies.h"
+#include "protocol.h"
+#include "protoimpl.h"
-const int FragMax = 1480;
-const int MSS = FragMax - 8;
-
-#pragma pack(push)
-#pragma pack(1)
-
-struct Fragment {
- enum { MinFragmentLen = 8 + 1 };
- int msgId;
- short fragmentLen;
- short fragmentNo;
- char data[1];
- int fragmentDataLen() { return fragmentLen - 8; }
- char* fragmentData() { return data; }
-
- bool ok(int nRead) {
- if( nRead < MinFragmentLen || fragmentLen > nRead || fragmentLen < MinFragmentLen ) {
- cout << "recv: fragment bad. fragmentLen:" << fragmentLen << " nRead:" << nRead << endl;
- return false;
- }
- if( fragmentNo == 0 && fragmentLen < MinFragmentLen + MsgDataHeaderSize ) {
- cout << "recv: bad first fragment. fragmentLen:" << fragmentLen << endl;
- return false;
- }
- return true;
- }
-
- MsgData* startOfMsgData() { assert(fragmentNo == 0); return (MsgData *) data; }
-};
-#pragma pack(pop)
-
-int NextMsgId = -1000;
+MSGID NextMsgId;
struct MsgStart {
MsgStart() {
- srand((unsigned) time(0));
- NextMsgId = rand() ^ (int) time(0);
- assert(MsgDataHeaderSize == 20);
- assert(sizeof(Fragment) == 9);
+ NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
+ assert(MsgDataHeaderSize == 16);
+ assert(sizeof(Fragment) == FragHeader+16);
}
} msgstart;
-MessagingPort::MessagingPort() {
+int nextChannel = curTimeMillis() & 0x3fff;
+
+MessagingPort::MessagingPort(int c) {
+ myChannel = c;
+ if( c == AUTOASSIGNCHANNEL ) {
+ myChannel = nextChannel++;
+ if( myChannel > 0x7000 ) {
+ cout << "warning: myChannel is getting high and there is no checks on this!" << endl;
+ assert(false);
+ }
+ }
+ pc = 0;
}
MessagingPort::~MessagingPort() {
+ delete pc; pc = 0;
}
void MessagingPort::init(int myUdpPort) {
SockAddr me(myUdpPort);
if( !conn.init(me) ) {
- cout << "conn init failure in MessagingPort::init " << myUdpPort << endl;
+ cout << "/conn init failure in MessagingPort::init " << myUdpPort << endl;
exit(2);
}
+ EndPoint ep;
+ ep.channel = myChannel;
+ ep.sa = me;
+ cout << "/Initializing MessagingPort " << ep.toString() << endl;
+ pc = new ProtocolConnection(conn, ep);
}
-/* this is a temp implementation. it will only work if there is a single entity messaging the receiver! */
bool MessagingPort::recv(Message& m) {
- int n = conn.recvfrom(buf, BufSize, m.from);
- Fragment *ff = (Fragment *) buf;
- if( !ff->ok(n) )
- return false;
+ MR *mr = pc->cr.recv();
- MsgData *somd = ff->startOfMsgData();
+ Fragment *first = mr->f[0]->internals;
+ m.channel = first->channel;
+ m.from = mr->from.sa;
+ MsgData *somd = first->startOfMsgData();
int totalLen = somd->len;
- if( ff->fragmentDataLen() >= totalLen ) {
- // it's a short message, we got it all in one packet
+
+ if( mr->n() == 1 ) {
+ // only one fragment, so use its buffer instead of making
+ // a copy
m.setData(somd, false);
return true;
}
- /* we'll need to read more */
- char *msgData = (char *) malloc(totalLen);
- m.setData((MsgData*) msgData, true);
- char *p = msgData;
- memcpy(p, somd, ff->fragmentDataLen());
- int sofar = ff->fragmentDataLen();
- int wanted = totalLen;
- p += ff->fragmentDataLen();
- wanted -= ff->fragmentDataLen();
-
- /* note out of order, retransmits not done. just get us going on localhost */
- int msgid = ff->msgId;
- int expectedFragmentNo = 1;
- SockAddr from;
- while( 1 ) {
- char b[FragMax];
- int n = conn.recvfrom(b, sizeof(b), from);
- Fragment *f = (Fragment *) b;
- if( !f->ok(n) )
- return false;
- if( f->msgId != msgid ) {
- cout << "bad fragment, wrong msg id, expected:" << msgid << " got:" << f->msgId << endl;
- return false;
- }
- if( f->fragmentNo != expectedFragmentNo ) {
- cout << "bad fragment, wrong fragmentNo, expected:" << expectedFragmentNo << " got:" << f->fragmentNo << endl;
- return false;
- }
- if( from != m.from ) {
- cout << "wrong sender? impl not done for multiple 'clients'" << endl;
- assert(false);
- return false;
- }
-
- memcpy(p, f->fragmentData(), f->fragmentDataLen());
- p += f->fragmentDataLen();
- wanted -= f->fragmentDataLen();
- expectedFragmentNo++;
-
- if( wanted <= 0 ) {
- assert( wanted == 0 );
- break;
- }
+ MsgData *fullmsg = (MsgData *) malloc(totalLen);
+ char *p = (char *) fullmsg;
+ for( int i = 0; i < mr->n(); i++ ) {
+ Fragment *frag = mr->f[i]->internals;
+ memcpy(p, frag->data, frag->fragmentDataLen());
+ p += frag->fragmentDataLen();
}
+ assert( p - ((char *)fullmsg) == totalLen );
+
+ mr->freeFrags();
+ m.setData(fullmsg, true);
return true;
}
void MessagingPort::reply(Message& received, Message& response) {
- say(received.from, response, received.data->id);
+ say(received.channel, received.from, response, received.data->id);
}
bool MessagingPort::call(SockAddr& to, Message& toSend, Message& response) {
- say(to, toSend);
+ assert( myChannel >= 0 );
+ say(myChannel, to, toSend);
while( 1 ) {
bool ok = recv(response);
if( !ok )
@@ -145,27 +101,34 @@ bool MessagingPort::call(SockAddr& to, Message& toSend, Message& response) { return true;
}
-void MessagingPort::say(SockAddr& to, Message& toSend, int responseTo) {
- toSend.data->reserved = 0;
- toSend.data->id = NextMsgId++;
-// cout << "TEMP: sending msgid " << toSend.data->id << endl;
+void MessagingPort::say(int channel, SockAddr& to, Message& toSend, int responseTo) {
+ MSGID msgid = NextMsgId;
+ ++NextMsgId;
+ toSend.data->id = msgid;
toSend.data->responseTo = responseTo;
+ toSend.channel = channel; assert(channel>0);
+
+ EndPoint ep;
+ ep.channel = channel;
+ ep.sa = to;
+ MS *ms = new MS(pc, ep, msgid);
+ int mss = conn.mtu() - FragHeader;
int left = toSend.data->len;
- assert( left > 0 && left <= 16 * 1024 * 1024 );
- Fragment *f = (Fragment *) buf;
- f->msgId = toSend.data->id;
- f->fragmentNo = 0;
+ int i = 0;
char *p = (char *) toSend.data;
- while( left > 0 ) {
- int l = left > MSS ? MSS : left;
- f->fragmentLen = l + 8;
- memcpy(f->data, p, l);
- p += l;
- left -= l;
- conn.sendto(buf, l+8, to);
- f->fragmentNo++;
-
- sleepmillis(50);
+ while( left>0 ) {
+ int datalen = left>=mss ? mss : left;
+ Fragment *frag = (Fragment *)malloc(mss+FragHeader);
+ frag->msgId = msgid;
+ frag->channel = channel;
+ frag->fragmentLen = datalen + FragHeader;
+ frag->fragmentNo = i++;
+ memcpy(frag->data, p, datalen);
+ p += datalen;
+ ms->fragments.push_back(new F(frag));
+ left -= datalen;
}
+
+ ms->send();
}
diff --git a/grid/message.h b/grid/message.h index 83ced78ed11..953b4162387 100644 --- a/grid/message.h +++ b/grid/message.h @@ -3,6 +3,7 @@ #pragma once
#include "../util/sock.h"
+#include "protocol.h"
class Message;
@@ -10,7 +11,11 @@ class MessagingPort { public:
enum { DBPort = 27017 };
- MessagingPort();
+ /* channels: if you are a server you can pass ANYCHANNEL to indicate you never initiate a
+ msg to someone yourself. the default will assign a new channel id to the messagingport.
+ */
+ enum { AUTOASSIGNCHANNEL = -1, ANYCHANNEL = -2 };
+ MessagingPort(int channel = AUTOASSIGNCHANNEL);
~MessagingPort();
void init(int myUdpPort);
@@ -21,10 +26,16 @@ public: bool recv(Message& m);
void reply(Message& received, Message& response);
bool call(SockAddr& to, Message& toSend, Message& response);
- void say(SockAddr& to, Message& toSend, int responseTo = -1);
+ void say(int channel, SockAddr& to, Message& toSend, int responseTo = -1);
+ void say(SockAddr& to, Message& toSend, int responseTo = -1) {
+ say(channel(), to, toSend, responseTo);
+ }
+ int channel() { return myChannel; }
private:
+ ProtocolConnection *pc;
UDPConnection conn;
+ int myChannel;
enum { BufSize = 64 * 1024 };
char buf[BufSize];
};
@@ -47,8 +58,7 @@ enum Operations { struct MsgData {
int len; /* len of the msg, including this field */
- int reserved;
- int id; /* request/reply id's match... */
+ MSGID id; /* request/reply id's match... */
int responseTo; /* id of the message we are responding to */
int operation;
char _data[4];
@@ -62,11 +72,14 @@ inline int MsgData::dataLen() { return len - MsgDataHeaderSize; } class Message {
public:
- Message() { data = 0; freeIt = false; }
+ Message() { data = 0; freeIt = false; channel = -1000; }
~Message() { reset(); }
SockAddr from;
MsgData *data;
+ int channel;
+
+// int channel() { return data->channel; }
void reset() {
if( freeIt && data )
diff --git a/grid/protocol.h b/grid/protocol.h new file mode 100644 index 00000000000..bb95503415d --- /dev/null +++ b/grid/protocol.h @@ -0,0 +1,203 @@ +// protocol.h
+
+#pragma once
+
+#include "boost/thread/mutex.hpp"
+#include "boost/thread/condition.hpp"
+#include "../util/sock.h"
+#include "../util/goodies.h"
+
+typedef WrappingInt MSGID;
+
+struct Fragment;
+
+class F; // fragment
+class MR; // message. R=receiver side.
+class CR; // connection receiver side
+class MS; // message S=sender side.
+class CS; // connection sender side
+class ProtocolConnection; // connection overall
+
+/* ip:port:channel
+ We have one receiver thread per process (per ip address destination), and then
+ multiplex messages out to multiple connections(generally one per thread) which
+ each have a 'channel'.
+*/
+class EndPoint {
+public:
+ int channel;
+ SockAddr sa;
+ bool operator<(const EndPoint& r) const {
+ if( channel != r.channel )
+ return channel < r.channel;
+ return sa < r.sa;
+ }
+ string toString() {
+ stringstream out;
+ out << sa.toString() << ':' << channel;
+ return out.str();
+ }
+};
+
+/* the double underscore stuff here is the actual implementation glue.
+ wanted to keep that stuff clean and separate. so put your implementation
+ of these in protoimpl.h.
+*/
+
+// sender ->
+void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *); // transmit a fragment
+void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to, MSGID msgid, int fragNo); // transmit the REQUEST ACK msg
+
+// receiver ->
+void __sendACK(ProtocolConnection *pc, EndPoint& to, MSGID msgid); // transmit ACK
+void __sendMISSING(ProtocolConnection *pc, EndPoint& to, MSGID msgid, vector<short>& ids); // transmit MISSING
+
+// -> receiver
+F* __recv(UDPConnection& c, SockAddr& from); /* recv from socket a fragment and pass back */
+
+class F {
+public:
+ F(Fragment *f);
+ ~F();
+ int __num(); //frag #
+ MSGID __msgid();
+ int __channel();
+ bool __isREQUESTACK(); // if true, this is just a request for acknowledgement not real data
+
+ // sender side:
+ bool __isACK(); // if true, this is an ack of a message
+ bool __isMISSING(); // if true, list of frags to retransmit
+ short* __getMissing(int& n); // get the missing fragno list
+
+ Fragment *internals;
+ enum { NORMAL, ACK, MISSING, REQUESTACK, RESET } op;
+};
+
+class MR {
+public:
+ MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from);
+ ~MR() { freeFrags(); }
+ void freeFrags() {
+ for( unsigned i = 0; i < f.size(); i++ )
+ delete f[i];
+ }
+ bool got(F *f, EndPoint& from); // received a fragment
+ bool gotFirst() { return f[0] != 0; }
+ ProtocolConnection& pc;
+ void removeFromReceivingList();
+ bool complete();
+ const MSGID msgid;
+ int n() { return f.size(); }
+public:
+ void reportMissings(bool reportAll);
+ vector<F*> f;
+ vector<unsigned> reportTimes;
+ EndPoint from;
+};
+
+class MsgTracker {
+public:
+ std::list<MSGID> recentlyReceivedList;
+ std::set<MSGID> recentlyReceived;
+ MSGID lastFullyReceived;
+
+ void reset() {
+ recentlyReceivedList.clear();
+ recentlyReceived.clear();
+ lastFullyReceived = 0;
+ }
+
+ void got(MSGID m) {
+ unsigned sz = recentlyReceived.size();
+ if( sz > 256 ) {
+ recentlyReceived.erase(recentlyReceivedList.front());
+ recentlyReceivedList.pop_front();
+ }
+ recentlyReceivedList.push_back(m);
+ recentlyReceived.insert(m);
+ if( m > lastFullyReceived || sz == 0 )
+ lastFullyReceived = m;
+ }
+};
+
+class CR {
+ friend class MR;
+public:
+ ~CR() { cout << ".warning: ~CR() not implemented" << endl; }
+ CR(ProtocolConnection& _pc) : pc(_pc) { }
+ MR* recv();
+public:
+ MR* getPendingMsg(F *fr, EndPoint& fromAddr);
+ bool oldMessageId(int channel, MSGID m);
+ void queueReceived(MR*);
+
+ ProtocolConnection& pc;
+ boost::condition receivedSome;
+ vector<MR*> received;
+ map<int,MR*> pendingMessages; /* partly received msgs */
+ map<int,MsgTracker*> trackers; /* channel -> tracker */
+};
+
+/* -- sender side ------------------------------------------------*/
+
+class CS {
+public:
+ CS(ProtocolConnection& _pc) : pc(_pc) { }
+ ProtocolConnection& pc;
+ vector<MS*> pendingSend;
+};
+
+typedef map<EndPoint,ProtocolConnection*> EndPointToPC;
+extern EndPointToPC pcMap;
+
+/* -- overall Connection object ----------------------------------*/
+
+#pragma warning( disable: 4355 )
+
+class ProtocolConnection {
+public:
+ ProtocolConnection(UDPConnection& c, EndPoint& e) : udpConnection(c), myEnd(e), cs(*this), cr(*this) {
+ init();
+ }
+ ~ProtocolConnection() {
+ cout << ".warning: ~ProtocolConnection() not implemented (leaks mem)" << endl;
+ }
+ bool acceptAnyChannel() const;
+ UDPConnection& udpConnection;
+ /* note the channel for myEnd might be "any" --
+ so you can't use channel for sending. Use MS/MR.to
+ for that.
+ */
+ EndPoint myEnd;
+ CR cr;
+ CS cs;
+ bool first;
+private:
+ void init();
+};
+
+/* -- sender side ------------------------------------------------*/
+
+class MS {
+public:
+ MS(ProtocolConnection *_pc, EndPoint &_to, MSGID _msgid) :
+ pc(*_pc), to(_to), msgid(_msgid), complainInterval(50) { }
+ ~MS() {
+ for( unsigned i = 0; i < fragments.size(); i++ )
+ delete fragments[i];
+ }
+
+ /* init fragments, then call this */
+ void send();
+
+ vector<F*> fragments;
+
+ /* request retrainsmissions. */
+ bool complain(unsigned now);
+
+ ProtocolConnection& pc;
+ EndPoint to;
+ const MSGID msgid;
+ unsigned lastComplainTime;
+ unsigned complainInterval;
+};
diff --git a/grid/protoimpl.h b/grid/protoimpl.h new file mode 100644 index 00000000000..f4047bbaad3 --- /dev/null +++ b/grid/protoimpl.h @@ -0,0 +1,144 @@ +// protoimpl.h
+
+#pragma once
+
+#include "message.h"
+
+const int FragMax = 1480;
+const int FragHeader = 10;
+const int MSS = FragMax - FragHeader;
+
+#pragma pack(push)
+#pragma pack(1)
+
+struct Fragment {
+ enum { MinFragmentLen = FragHeader + 1 };
+ MSGID msgId;
+ short channel;
+ short fragmentLen;
+ short fragmentNo;
+ char data[16];
+ int fragmentDataLen() { return fragmentLen - FragHeader; }
+ char* fragmentData() { return data; }
+
+ bool ok(int nRead) {
+ if( nRead < MinFragmentLen || fragmentLen > nRead || fragmentLen < MinFragmentLen ) {
+ cout << "recv: fragment bad. fragmentLen:" << fragmentLen << " nRead:" << nRead << endl;
+ return false;
+ }
+ if( fragmentNo == 0 && fragmentLen < MinFragmentLen + MsgDataHeaderSize ) {
+ cout << "recv: bad first fragment. fragmentLen:" << fragmentLen << endl;
+ return false;
+ }
+ return true;
+ }
+
+ MsgData* startOfMsgData() { assert(fragmentNo == 0); return (MsgData *) data; }
+};
+#pragma pack(pop)
+
+// sender ->
+inline void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *f) {
+ assert( f->internals->channel == to.channel );
+ pc->udpConnection.sendto((char *) f->internals, f->internals->fragmentLen, to.sa);
+}
+
+inline void __sendREQUESTACK(ProtocolConnection *pc, EndPoint& to,
+ MSGID msgid, int fragNo) {
+ Fragment f;
+ f.msgId = msgid;
+ f.channel = to.channel; assert( f.channel >= 0 );
+ f.fragmentNo = ((short) -fragNo) -1;
+ f.fragmentLen = FragHeader;
+ cout << ".requesting ack, fragno=" << f.fragmentNo << " msg:" << f.msgId << ' ' << to.toString() << endl;
+ pc->udpConnection.sendto((char *)&f, f.fragmentLen, to.sa);
+}
+
+// receiver ->
+inline void __sendACK(ProtocolConnection *pc, EndPoint& to, MSGID msgid) {
+ cout << "...__sendACK() to:" << to.toString() << " msg:" << msgid << endl;
+ Fragment f;
+ f.msgId = msgid;
+ f.channel = to.channel; assert( f.channel >= 0 );
+ f.fragmentNo = -32768;
+ f.fragmentLen = FragHeader;
+ pc->udpConnection.sendto((char *)&f, f.fragmentLen, to.sa);
+}
+
+/* this is to clear old state for the channel in terms of what msgids are
+ already sent.
+*/
+inline void __sendRESET(ProtocolConnection *pc, EndPoint& to) {
+ Fragment f;
+ f.msgId = -1;
+ f.channel = to.channel; assert( f.channel >= 0 );
+ f.fragmentNo = -32766;
+ f.fragmentLen = FragHeader;
+ cout << "...__sendRESET() to:" << to.toString() << endl;
+ pc->udpConnection.sendto((char *)&f, f.fragmentLen, to.sa);
+}
+
+inline void __sendMISSING(ProtocolConnection *pc, EndPoint& to,
+ MSGID msgid, vector<short>& ids) {
+ int n = ids.size(); cout << "..sendMISSING n:" << n << ' ' << to.toString() << endl;
+ if( n > 256 ) {
+ cout << "..info: sendMISSING: n:" << n << ' ' << to.toString() << endl;
+ n = 256;
+ }
+ Fragment *f = (Fragment*) malloc(FragHeader + n*2);
+ f->msgId = msgid;
+ f->channel = to.channel; assert( f->channel >= 0 );
+ f->fragmentNo = -32767;
+ f->fragmentLen = FragHeader + n*2;
+ short *s = (short *) f->data;
+ for( int i = 0; i < n; i++ )
+ *s++ = ids[i];
+ cout << "...sendMISSING fraglen:" << f->fragmentLen << endl;
+ pc->udpConnection.sendto((char *)f, f->fragmentLen, to.sa);
+// TEMPcomment free(f);
+}
+
+// -> receiver
+inline F* __recv(UDPConnection& c, SockAddr& from) {
+ Fragment *f = (Fragment *) malloc(c.mtu());
+ int n;
+ while( 1 ) {
+ n = c.recvfrom((char*) f, c.mtu(), from);
+ if( n >= 0 )
+ break;
+ if( !goingAway )
+ sleepsecs(60);
+ cout << ".recvfrom returned error " << getLastError() << " socket:" << c.sock << endl;
+ }
+ assert( f->fragmentLen == n );
+ return new F(f);
+}
+
+inline F::F(Fragment *f) : internals(f), op(NORMAL) {
+ if( internals->fragmentNo < 0 ) {
+ if( internals->fragmentNo == -32768 ) {
+ op = ACK;
+ cout << ".got ACK msg:" << internals->msgId << endl;
+ } else if( internals->fragmentNo == -32767 ) {
+ op = MISSING;
+ cout << ".got MISSING" << endl;
+ } else if( internals->fragmentNo == -32766 ) {
+ op = RESET;
+ } else {
+ op = REQUESTACK;
+ internals->fragmentNo = -(internals->fragmentNo+1);
+ cout << ".got REQUESTACK frag:" << internals->fragmentNo << " msg:" << internals->msgId << endl;
+ }
+ }
+}
+inline F::~F() { free(internals); internals=0; }
+inline int F::__num() { return internals->fragmentNo; }
+inline MSGID F::__msgid() { return internals->msgId; }
+inline int F::__channel() { return internals->channel; }
+inline bool F::__isREQUESTACK() { return op == REQUESTACK; }
+inline bool F::__isACK() { return op == ACK; }
+inline bool F::__isMISSING() { return op == MISSING; }
+inline short* F::__getMissing(int& n) {
+ n = internals->fragmentDataLen() / 2;
+ return (short *) internals->fragmentData();
+}
diff --git a/grid/protorecv.cpp b/grid/protorecv.cpp new file mode 100644 index 00000000000..bb684209dd0 --- /dev/null +++ b/grid/protorecv.cpp @@ -0,0 +1,229 @@ +// protorecv.cpp
+
+#include "stdafx.h"
+#include "protocol.h"
+#include "boost/thread.hpp"
+#include "../util/goodies.h"
+#include "../util/sock.h"
+#include "protoimpl.h"
+
+using namespace boost;
+typedef boost::mutex::scoped_lock lock;
+
+boost::mutex mutexr;
+boost::condition threadActivate; // creating a new thread, grabbing threadUseThisOne
+ProtocolConnection *threadUseThisOne = 0;
+void receiverThread();
+
+map<SockAddr,ProtocolConnection*> firstPCForThisAddress;
+
+EndPointToPC pcMap;
+
+inline bool ProtocolConnection::acceptAnyChannel() const {
+ return myEnd.channel == MessagingPort::ANYCHANNEL;
+}
+
+void ProtocolConnection::init() {
+ first = true;
+ lock lk(mutexr);
+ if( firstPCForThisAddress.count(myEnd.sa) == 0 ) {
+ firstPCForThisAddress[myEnd.sa] = this;
+ // need a receiver thread
+ boost::thread receiver(receiverThread);
+ threadUseThisOne = this;
+ pcMap[myEnd] = this;
+ threadActivate.notify_one();
+ return;
+ }
+ pcMap[myEnd] = this;
+}
+
+/* find message for fragment */
+MR* CR::getPendingMsg(F *fr, EndPoint& fromAddr) {
+ MR*& m = pendingMessages[fr->__msgid()];
+ if( m == 0 )
+ m = new MR(&pc, fr->__msgid(), fromAddr);
+ return m;
+}
+
+void MR::removeFromReceivingList() {
+ pc.cr.pendingMessages.erase(msgid);
+}
+
+MR::MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from) :
+ pc(*_pc), msgid(_msgid), from(_from)
+{
+ f.push_back(0);
+}
+
+void MR::reportMissings(bool reportAll) {
+ vector<short> missing;
+ unsigned t = curTimeMillis();
+ for( unsigned i = 0; i < f.size(); i++ ) {
+ if( f[i] == 0 ) {
+ int diff = tdiff(reportTimes[i],t);
+ assert( diff >= 0 || reportTimes[i] == 0 );
+ if( diff > 100 || reportTimes[i]==0 ||
+ (reportAll && diff > 1) ) {
+ reportTimes[i] = t;
+ assert( i < 25000 );
+ missing.push_back(i);
+ }
+ }
+ }
+ if( !missing.empty() )
+ __sendMISSING(&pc, from, msgid, missing);
+}
+
+inline bool MR::complete() {
+ for( unsigned i = 0; i < f.size(); i++ )
+ if( f[i] == 0 )
+ return false;
+ return true;
+}
+
+/* true=msg complete */
+bool MR::got(F *frag, EndPoint& fromAddr) {
+ MSGID msgid = frag->__msgid();
+ int i = frag->__num();
+ if( i >= (int) f.size() )
+ f.resize(i+1, 0);
+ if( frag->__isREQUESTACK() ) {
+ cout << "...got(): processing a REQUESTACK" << endl;
+ /* we're simply seeing if we got the data, and then acking. */
+ delete frag;
+ frag = 0;
+ }
+ else if( f[i] != 0 ) {
+ cout << ".dup packet i:" << i << ' ' << from.toString() << endl;
+ delete frag;
+ return false;
+ }
+ if( frag )
+ f[i] = frag;
+ reportTimes.resize(f.size(), 0);
+ if( f[0] == 0 || f[i] == 0 ) {
+ reportMissings(frag == 0);
+ return false;
+ }
+ if( i+1 < n() /*not to the last frag yet*/ ) {
+ if( i > 0 && f[i-1] == 0 )
+ reportMissings(frag == 0);
+ return false;
+ }
+ // last fragment received
+ if( !complete() ) {
+ reportMissings(frag == 0);
+ return false;
+ }
+ __sendACK(&pc, fromAddr, msgid);
+ return frag != 0;
+}
+
+MR* CR::recv() {
+ MR *x;
+ {
+ lock lk(mutexr);
+ while( received.empty() )
+ receivedSome.wait(lk);
+ x = received.back();
+ received.pop_back();
+ }
+ return x;
+}
+
+// this is how we tell protosend.cpp we got these
+void gotACK(F*, ProtocolConnection *, EndPoint&);
+void gotMISSING(F*, ProtocolConnection *, EndPoint&);
+
+void receiverThread() {
+ ProtocolConnection *mypc;
+ {
+ lock lk(mutexr);
+ while( 1 ) {
+ if( threadUseThisOne != 0 ) {
+ mypc = threadUseThisOne;
+ threadUseThisOne = 0;
+ break;
+ }
+ threadActivate.wait(lk);
+ }
+ }
+
+ EndPoint fromAddr;
+ while( 1 ) {
+ F *f = __recv(mypc->udpConnection, fromAddr.sa);
+ fromAddr.channel = f->__channel();
+ cout << "..__recv() from:" << fromAddr.toString() << endl;
+ ProtocolConnection *pc = mypc;
+ if( fromAddr.channel != pc->myEnd.channel ) {
+ if( !pc->acceptAnyChannel() ) {
+ cout << ".WARNING: wrong channel\n";
+ cout << ". expected:" << pc->myEnd.channel << " got:" << fromAddr.channel << '\n';
+ cout << ". this may be ok if you just restarted" << endl;
+ delete f;
+ continue;
+ }
+ }
+
+ MsgTracker *& track = pc->cr.trackers[f->__channel()];
+ if( track == 0 ) {
+ cout << "..creating MsgTracker for channel " << f->__channel() << endl;
+ track = new MsgTracker();
+ }
+
+ if( f->op != F::NORMAL ) {
+ if( f->__isACK() ) {
+ gotACK(f, pc, fromAddr);
+ delete f;
+ continue;
+ }
+ if( f->__isMISSING() ) {
+ cout << "....temp: calling gotMISSING" << endl;
+ gotMISSING(f, pc, fromAddr);
+ delete f;
+ continue;
+ }
+ if( f->op == F::RESET ) {
+ cout << ".got RESET" << endl;
+ track->reset();
+ delete f;
+ continue;
+ }
+ }
+
+ if( track->recentlyReceived.count(f->__msgid()) ) {
+ // already done with it. ignore, other than acking.
+ if( f->__isREQUESTACK() )
+ __sendACK(pc, fromAddr, f->__msgid());
+ else {
+ cout << ".ignoring packet about msg already received msg:" << f->__msgid() << " op:" << f->op << endl;
+ }
+ delete f;
+ continue;
+ }
+
+ if( f->__msgid() <= track->lastFullyReceived && !track->recentlyReceivedList.empty() ) {
+ // reconnect on an old channel?
+ cout << ".warning: strange msgid:" << f->__msgid() << " received, last:" << track->lastFullyReceived
+ << " conn:" << fromAddr.toString() << endl;
+ }
+
+ MR *m = pc->cr.getPendingMsg(f, fromAddr); /* todo: optimize for single fragment case? */
+ if( m == 0 ) {
+ cout << "..getPendingMsg() returns 0" << endl;
+ delete f;
+ continue;
+ }
+ if( m->got(f, fromAddr) ) {
+ track->got(m->msgid);
+ m->removeFromReceivingList();
+ {
+ lock lk(mutexr);
+ pc->cr.received.push_back(m);
+ pc->cr.receivedSome.notify_one();
+ }
+ }
+ }
+}
+
diff --git a/grid/protosend.cpp b/grid/protosend.cpp new file mode 100644 index 00000000000..045a24ae673 --- /dev/null +++ b/grid/protosend.cpp @@ -0,0 +1,123 @@ +// protosend.cpp
+
+/* todo: redo locking! */
+
+#include "stdafx.h"
+#include "protocol.h"
+#include "boost/thread.hpp"
+#include "../util/goodies.h"
+#include "../util/sock.h"
+#include "protoimpl.h"
+using namespace boost;
+typedef boost::mutex::scoped_lock lock;
+
+/* todo: granularity? */
+boost::mutex mutexs;
+boost::condition msgSent;
+
+bool erasePending(ProtocolConnection *pc, int id);
+
+inline bool MS::complain(unsigned now) {
+ if( tdiff(lastComplainTime, now) < (int) complainInterval )
+ return false;
+ if( complainInterval > 4000 ) {
+ cout << ".no ack of send after " << complainInterval/1000 <<
+ " seconds " << to.toString() << endl;
+ if( complainInterval > 35000 ) {
+ cout << ".GIVING UP on sending message" << endl;
+ erasePending(&pc, msgid);
+ return true;
+ }
+ }
+ complainInterval *= 2;
+ lastComplainTime = now;
+ __sendREQUESTACK(&pc, to, msgid, fragments.size()-1);
+ return false;
+}
+
+// where's my ack?
+void senderComplainThread() {
+ while( 1 ) {
+ sleepmillis(50);
+ unsigned now = curTimeMillis();
+ for( EndPointToPC::iterator i = pcMap.begin(); i != pcMap.end(); i++ ) {
+ ProtocolConnection *pc = i->second;
+ for( vector<MS*>::iterator j = pc->cs.pendingSend.begin(); j < pc->cs.pendingSend.end(); j++ )
+ if( (*j)->complain(now) )
+ break;
+ }
+ }
+}
+
+boost::thread sender(senderComplainThread);
+
+// received a MISSING message from the other end. retransmit.
+void gotMISSING(F* fr, ProtocolConnection *pc, EndPoint& from) {
+ cout << "gotmis";
+ lock lk(mutexs);
+ int id = fr->__msgid();
+ cout << "sing() impl " << id << endl;
+ vector<MS*>::iterator it;
+ for( it = pc->cs.pendingSend.begin(); it<pc->cs.pendingSend.end(); it++ ) {
+ MS *m = *it;
+ if( m->msgid == id ) {
+ int n;
+ short* s = fr->__getMissing(n);
+ assert( n > 0 && n < 5000 );
+ for( int i = 0; i < n; i++ ) {
+ cout << "..sending missing fragment " << i << ' ' << m->to.toString() << endl;
+ __sendFrag(pc, m->to, m->fragments[i]);
+ }
+ return;
+ }
+ }
+ cout << ".warning: got missing rq for an unknown msg id:" << id
+ << ' ' << from.toString() << endl;
+}
+
+bool erasePending(ProtocolConnection *pc, int id) {
+ vector<MS*>::iterator it;
+ for( it = pc->cs.pendingSend.begin(); it < pc->cs.pendingSend.end(); it++ ) {
+ MS *m = *it;
+ if( m->msgid == id ) {
+ pc->cs.pendingSend.erase(it);
+ cout << "..gotACK/erase: found pendingSend msg:" << id << endl;
+ delete m;
+ msgSent.notify_one();
+ return true;
+ }
+ }
+ return false;
+}
+
+// received an ACK message. so we can discard our saved copy we were trying to send, we
+// are done with it.
+void gotACK(F* fr, ProtocolConnection *pc, EndPoint& from) {
+ lock lk(mutexs);
+ if( erasePending(pc, fr->__msgid()) )
+ return;
+ cout << ".warning: got ack for an unknown msg id:" << fr->__msgid()
+ << ' ' << from.toString() << endl;
+}
+
+void MS::send() {
+ /* flow control */
+ cout << "..MS::send() pending=";
+ lock lk(mutexs);
+ cout << pc.cs.pendingSend.size() << endl;
+
+ if( pc.first ) {
+ pc.first = false;
+ if( pc.myEnd.channel >= 0 )
+ __sendRESET(&pc, to);
+ }
+
+ while( pc.cs.pendingSend.size() >= 10 )
+ msgSent.wait(lk);
+ lastComplainTime = curTimeMillis();
+ pc.cs.pendingSend.push_back(this);
+ /* todo: pace */
+ for( unsigned i = 0; i < fragments.size(); i++ )
+ __sendFrag(&pc, to, fragments[i]);
+}
+
|