summaryrefslogtreecommitdiff
path: root/grid
diff options
context:
space:
mode:
authorDwight <dmerriman@gmail.com>2007-11-26 16:19:50 -0500
committerDwight <dmerriman@gmail.com>2007-11-26 16:19:50 -0500
commit178091b0ad390bf5171d1702e5ab87062440df5f (patch)
treebfe6c6b6633c832e2874a0cd6bd790bb05cd61d2 /grid
parent1b272bdfaf82da8fa0099898f4ed5ce36eea4942 (diff)
downloadmongo-178091b0ad390bf5171d1702e5ab87062440df5f.tar.gz
transport changes
Diffstat (limited to 'grid')
-rw-r--r--grid/protocol.h31
-rw-r--r--grid/protoimpl.h18
-rw-r--r--grid/protorecv.cpp48
-rw-r--r--grid/protosend.cpp78
4 files changed, 139 insertions, 36 deletions
diff --git a/grid/protocol.h b/grid/protocol.h
index 91765780487..8bf945ec88b 100644
--- a/grid/protocol.h
+++ b/grid/protocol.h
@@ -11,8 +11,13 @@ typedef WrappingInt MSGID;
struct Fragment;
+#if 1
#define ptrace(x)
-#define etrace(x) x
+#define etrace(x)
+#else
+#define ptrace(x) { cout << curTimeMillis() % 10000; x }
+#define etrace(x) { cout << curTimeMillis() % 10000; x }
+#endif
class F; // fragment
class MR; // message. R=receiver side.
@@ -96,7 +101,7 @@ public:
int n() { return f.size(); }
public:
int messageLenExpected;
- int nExpected;
+ int nExpected, nReceived;
void reportMissings(bool reportAll);
vector<F*> f;
vector<unsigned> reportTimes;
@@ -150,10 +155,30 @@ public:
class CS {
public:
- CS(ProtocolConnection& _pc) : pc(_pc) { }
+ CS(ProtocolConnection& _pc);
+
ProtocolConnection& pc;
vector<MS*> pendingSend;
void resetIt();
+
+ double delayMax;
+ double delay;
+ void delayGotMissing() {
+ double delayOld = delay;
+ if( delay == 0 )
+ delay = 2.0;
+ else
+ delay = delay * 1.25;
+ if( delay > delayMax ) delay = delayMax;
+ if( delay != delayOld )
+ cout << ".DELAY INCREASED TO " << delay << endl;
+ }
+ void delaySentMsg() {
+ if( delay != 0.0 ) {
+ delay = delay * 0.5;
+ cout << ".DELAY DECREASED TO " << delay << endl;
+ }
+ }
};
typedef map<EndPoint,ProtocolConnection*> EndPointToPC;
diff --git a/grid/protoimpl.h b/grid/protoimpl.h
index 8fa32ad6c36..5f019c6e732 100644
--- a/grid/protoimpl.h
+++ b/grid/protoimpl.h
@@ -112,13 +112,15 @@ inline void SEND(UDPConnection& c, Fragment &f, SockAddr& to, const char *extra=
lock lk(coutmutex);
DUMP(f, to, "\t\t\t\t\t>");
c.sendto((char *) &f, f.fragmentLen, to);
- cout << extra;
+ if( dumpPackets )
+ cout << extra;
DUMPDATA(f, "\t\t\t\t\t ");
}
// sender ->
inline void __sendFrag(ProtocolConnection *pc, EndPoint& to, F *f, bool retran) {
assert( f->internals->channel == to.channel );
+ ptrace( cout << ".sendfrag " << f->__num() << ' ' << retran << endl; )
SEND(pc->udpConnection, *f->internals, to.sa, retran ? " retran" : "");
}
@@ -160,9 +162,9 @@ inline void __sendRESET(ProtocolConnection *pc, EndPoint& to) {
inline void __sendMISSING(ProtocolConnection *pc, EndPoint& to,
MSGID msgid, vector<short>& ids) {
int n = ids.size();
- ptrace( cout << "..sendMISSING n:" << n << " firstmissing:" << ids[0] << ' ' << to.toString() << endl; )
+ ptrace( cout << "..sendMISSING n:" << n << " firstmissing:" << ids[0] << " last:" << ids[ids.size()-1] << to.toString() << endl; )
if( n > 256 ) {
- ptrace( cout << "..info: sendMISSING limiting to 256 ids" << endl; )
+ ptrace( cout << "\t..sendMISSING limiting to 256 ids" << endl; )
n = 256;
}
Fragment *f = (Fragment*) malloc(FragHeader + n*2);
@@ -173,7 +175,7 @@ inline void __sendMISSING(ProtocolConnection *pc, EndPoint& to,
short *s = (short *) f->data;
for( int i = 0; i < n; i++ )
*s++ = ids[i];
- ptrace( cout << "...sendMISSING fraglen:" << f->fragmentLen << endl; )
+// ptrace( cout << "...sendMISSING fraglen:" << f->fragmentLen << endl; )
SEND(pc->udpConnection, *f, to.sa);
free(f);
}
@@ -186,9 +188,11 @@ inline F* __recv(UDPConnection& c, SockAddr& from) {
n = c.recvfrom((char*) f, c.mtu(), from);
if( n >= 0 )
break;
- if( !goingAway )
- sleepsecs(60);
- cout << ".recvfrom returned error " << getLastError() << " socket:" << c.sock << endl;
+ if( !goingAway ) {
+ cout << ".recvfrom returned error " << getLastError() << " socket:" << c.sock << endl;
+ cout << "sleeping 2 seconds " << endl;
+ sleepsecs(2);
+ }
}
assert( f->fragmentLen == n );
{
diff --git a/grid/protorecv.cpp b/grid/protorecv.cpp
index cfb832468b4..74d488ba660 100644
--- a/grid/protorecv.cpp
+++ b/grid/protorecv.cpp
@@ -72,7 +72,7 @@ void MR::removeFromReceivingList() {
}
MR::MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from) :
- pc(*_pc), msgid(_msgid), from(_from)
+ pc(*_pc), msgid(_msgid), from(_from), nReceived(0)
{
messageLenExpected = nExpected = -1;
f.push_back(0);
@@ -98,18 +98,29 @@ void MR::reportMissings(bool reportAll) {
}
inline bool MR::complete() {
+ if( nReceived == nExpected ) {
+ assert( nExpected == (int) f.size() );
+ assert( f[0] != 0 );
+ return true;
+ }
+ return false;
+/*
if( nExpected > (int) f.size() || nExpected == -1 )
return false;
for( unsigned i = 0; i < f.size(); i++ )
if( f[i] == 0 )
return false;
return true;
+*/
}
/* true=msg complete */
bool MR::got(F *frag, EndPoint& fromAddr) {
MSGID msgid = frag->__msgid();
int i = frag->__num();
+ if( i == 544 && !frag->__isREQUESTACK() ) {
+ cout << "************ GOT LAST FRAGMENT #544" << endl;
+ }
if( nExpected < 0 && i == 0 ) {
messageLenExpected = frag->__firstFragMsgLen();
int mss = pc.udpConnection.mtu() - FragHeader;
@@ -123,31 +134,49 @@ bool MR::got(F *frag, EndPoint& fromAddr) {
/* we're simply seeing if we got the data, and then acking. */
delete frag;
frag = 0;
+ reportTimes.resize(f.size(), 0);
+ if( complete() ) {
+ __sendACK(&pc, fromAddr, msgid);
+ return true;
+ }
+ reportMissings(true);
+ return false;
}
else if( f[i] != 0 ) {
- ptrace( cout << ".dup packet i:" << i << ' ' << from.toString() << endl; )
+ ptrace( cout << "\t.dup packet i:" << i << ' ' << from.toString() << endl; )
delete frag;
return false;
}
- if( frag )
+ if( frag ) {
f[i] = frag;
+ nReceived++;
+ }
reportTimes.resize(f.size(), 0);
+
+ if( !complete() )
+ return false;
+ __sendACK(&pc, fromAddr, msgid);
+ return true;
+
+/*
if( f[0] == 0 || f[i] == 0 ) {
- reportMissings(frag == 0);
+// reportMissings(frag == 0);
return false;
}
- if( i+1 < nExpected /*not to the last frag yet*/ ) {
- if( i > 0 && f[i-1] == 0 )
- reportMissings(frag == 0);
+ if( i+1 < nExpected ) {
+//cout << "TEMP COMMENT" << endl;
+// if( i > 0 && f[i-1] == 0 )
+// reportMissings(frag == 0);
return false;
}
// last fragment received
if( !complete() ) {
- reportMissings(frag == 0);
+// reportMissings(frag == 0);
return false;
}
__sendACK(&pc, fromAddr, msgid);
return frag != 0;
+*/
}
MR* CR::recv() {
@@ -159,6 +188,7 @@ MR* CR::recv() {
x = received.back();
received.pop_back();
}
+ ptrace( cout << "TEMP: CR:recv complete! ***********************************"; )
return x;
}
@@ -184,7 +214,7 @@ void receiverThread() {
while( 1 ) {
F *f = __recv(mypc->udpConnection, fromAddr.sa);
fromAddr.channel = f->__channel();
- ptrace( cout << "..__recv() from:" << fromAddr.toString() << endl; )
+ ptrace( cout << "..__recv() from:" << fromAddr.toString() << " frag:" << f->__num() << endl; )
ProtocolConnection *pc = mypc;
if( fromAddr.channel != pc->myEnd.channel ) {
if( !pc->acceptAnyChannel() ) {
diff --git a/grid/protosend.cpp b/grid/protosend.cpp
index de76bd30499..9abef1f7916 100644
--- a/grid/protosend.cpp
+++ b/grid/protosend.cpp
@@ -53,27 +53,50 @@ void senderComplainThread() {
boost::thread sender(senderComplainThread);
-// received a MISSING message from the other end. retransmit.
-void gotMISSING(F* fr, ProtocolConnection *pc, EndPoint& from) {
- ptrace( cout << ".gotmis"; )
- lock lk(mutexs);
+inline MS* _slocked_getMSForFrag(F* fr, ProtocolConnection *pc) {
int id = fr->__msgid();
- ptrace( cout << "sing() msgid:" << id << endl; )
vector<MS*>::iterator it;
for( it = pc->cs.pendingSend.begin(); it<pc->cs.pendingSend.end(); it++ ) {
MS *m = *it;
- if( m->msgid == id ) {
- int n;
- short* s = fr->__getMissing(n);
- assert( n > 0 && n < 5000 );
- for( int i = 0; i < n; i++ ) {
- ptrace( cout << ".. resending frag #" << s[i] << ' ' << m->to.toString() << endl; )
- __sendFrag(pc, m->to, m->fragments[s[i]], true);
+ if( m->msgid == id )
+ return m;
+ }
+ return 0;
+}
+
+// received a MISSING message from the other end. retransmit.
+void gotMISSING(F* fr, ProtocolConnection *pc, EndPoint& from) {
+ lock lk(mutexs);
+
+ pc->cs.delayGotMissing();
+
+ ptrace( cout << ".gotMISSING() msgid:" << fr->__msgid() << endl; )
+ MS *m = _slocked_getMSForFrag(fr, pc);
+ if( m ) {
+ {
+ int df = tdiff(m->lastComplainTime, curTimeMillis()) * 2;
+ if( df < 10 )
+ df = 10;
+ if( df > 2000 )
+ df = 2000;
+ m->complainInterval = (unsigned) df;
+ cout << "TEMP: set complainInterval to " << m->complainInterval << endl;
+ }
+
+ int n;
+ short* s = fr->__getMissing(n);
+ assert( n > 0 && n < 5000 );
+ for( int i = 0; i < n; i++ ) {
+ // ptrace( cout << ".. resending frag #" << s[i] << ' ' << m->to.toString() << endl; )
+ __sendFrag(pc, m->to, m->fragments[s[i]], true);
+ if( i % 64 == 0 && pc->cs.delay >= 1.0 ) {
+ ptrace( cout << "SLEEP" << endl; )
+ sleepmillis((int) pc->cs.delay);
}
- return;
}
+ return;
}
- ptrace( cout << ".warning: got missing rq for an unknown msg id:" << id << ' ' << from.toString() << endl; )
+ ptrace( cout << "\t.warning: gotMISSING for an unknown msg id:" << fr->__msgid() << ' ' << from.toString() << endl; )
}
bool erasePending(ProtocolConnection *pc, int id) {
@@ -102,15 +125,24 @@ void gotACK(F* fr, ProtocolConnection *pc, EndPoint& from) {
void MS::send() {
/* flow control */
- ptrace( cout << "..MS::send() pending="; )
+ cout << "send() to:" << to.toString() << endl;
+ ptrace( cout << "..MS::send() pending=" << pc.cs.pendingSend.size() << endl; )
lock lk(mutexs);
- ptrace( cout << pc.cs.pendingSend.size() << endl; )
if( pc.first ) {
pc.first = false;
if( pc.myEnd.channel >= 0 )
__sendRESET(&pc, to);
pc.to = to;
+#if defined(_WIN32)
+ pc.cs.delayMax = 1.0;
+#else
+// cout << "SADDR:" << pc.to.sa.sa.sin_addr.s_addr << endl;
+ if( pc.to.sa.sa.sin_addr.s_addr == 0x100007f ) {
+// cout << "TEMP: LOCALHOST ************************************* " << endl;
+ pc.cs.delayMax = 1.0;
+ }
+#endif
}
while( pc.cs.pendingSend.size() >= 1 ) {
@@ -122,8 +154,20 @@ void MS::send() {
lastComplainTime = curTimeMillis();
pc.cs.pendingSend.push_back(this);
/* todo: pace */
- for( unsigned i = 0; i < fragments.size(); i++ )
+ for( unsigned i = 0; i < fragments.size(); i++ ) {
__sendFrag(&pc, to, fragments[i]);
+ if( i % 64 == 0 && pc.cs.delay >= 1.0 ) {
+ ptrace( cout << "SLEEP" << endl; )
+ sleepmillis((int) pc.cs.delay);
+ }
+ }
+
+ pc.cs.delaySentMsg();
+}
+
+CS::CS(ProtocolConnection& _pc) :
+ pc(_pc), delay(0), delayMax(10)
+{
}
void CS::resetIt() {