summaryrefslogtreecommitdiff
path: root/grid/protorecv.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'grid/protorecv.cpp')
-rw-r--r--grid/protorecv.cpp48
1 files changed, 39 insertions, 9 deletions
diff --git a/grid/protorecv.cpp b/grid/protorecv.cpp
index cfb832468b4..74d488ba660 100644
--- a/grid/protorecv.cpp
+++ b/grid/protorecv.cpp
@@ -72,7 +72,7 @@ void MR::removeFromReceivingList() {
}
MR::MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from) :
- pc(*_pc), msgid(_msgid), from(_from)
+ pc(*_pc), msgid(_msgid), from(_from), nReceived(0)
{
messageLenExpected = nExpected = -1;
f.push_back(0);
@@ -98,18 +98,29 @@ void MR::reportMissings(bool reportAll) {
}
inline bool MR::complete() {
+ if( nReceived == nExpected ) {
+ assert( nExpected == (int) f.size() );
+ assert( f[0] != 0 );
+ return true;
+ }
+ return false;
+/*
if( nExpected > (int) f.size() || nExpected == -1 )
return false;
for( unsigned i = 0; i < f.size(); i++ )
if( f[i] == 0 )
return false;
return true;
+*/
}
/* true=msg complete */
bool MR::got(F *frag, EndPoint& fromAddr) {
MSGID msgid = frag->__msgid();
int i = frag->__num();
+ if( i == 544 && !frag->__isREQUESTACK() ) {
+ cout << "************ GOT LAST FRAGMENT #544" << endl;
+ }
if( nExpected < 0 && i == 0 ) {
messageLenExpected = frag->__firstFragMsgLen();
int mss = pc.udpConnection.mtu() - FragHeader;
@@ -123,31 +134,49 @@ bool MR::got(F *frag, EndPoint& fromAddr) {
/* we're simply seeing if we got the data, and then acking. */
delete frag;
frag = 0;
+ reportTimes.resize(f.size(), 0);
+ if( complete() ) {
+ __sendACK(&pc, fromAddr, msgid);
+ return true;
+ }
+ reportMissings(true);
+ return false;
}
else if( f[i] != 0 ) {
- ptrace( cout << ".dup packet i:" << i << ' ' << from.toString() << endl; )
+ ptrace( cout << "\t.dup packet i:" << i << ' ' << from.toString() << endl; )
delete frag;
return false;
}
- if( frag )
+ if( frag ) {
f[i] = frag;
+ nReceived++;
+ }
reportTimes.resize(f.size(), 0);
+
+ if( !complete() )
+ return false;
+ __sendACK(&pc, fromAddr, msgid);
+ return true;
+
+/*
if( f[0] == 0 || f[i] == 0 ) {
- reportMissings(frag == 0);
+// reportMissings(frag == 0);
return false;
}
- if( i+1 < nExpected /*not to the last frag yet*/ ) {
- if( i > 0 && f[i-1] == 0 )
- reportMissings(frag == 0);
+ if( i+1 < nExpected ) {
+//cout << "TEMP COMMENT" << endl;
+// if( i > 0 && f[i-1] == 0 )
+// reportMissings(frag == 0);
return false;
}
// last fragment received
if( !complete() ) {
- reportMissings(frag == 0);
+// reportMissings(frag == 0);
return false;
}
__sendACK(&pc, fromAddr, msgid);
return frag != 0;
+*/
}
MR* CR::recv() {
@@ -159,6 +188,7 @@ MR* CR::recv() {
x = received.back();
received.pop_back();
}
+ ptrace( cout << "TEMP: CR:recv complete! ***********************************"; )
return x;
}
@@ -184,7 +214,7 @@ void receiverThread() {
while( 1 ) {
F *f = __recv(mypc->udpConnection, fromAddr.sa);
fromAddr.channel = f->__channel();
- ptrace( cout << "..__recv() from:" << fromAddr.toString() << endl; )
+ ptrace( cout << "..__recv() from:" << fromAddr.toString() << " frag:" << f->__num() << endl; )
ProtocolConnection *pc = mypc;
if( fromAddr.channel != pc->myEnd.channel ) {
if( !pc->acceptAnyChannel() ) {