summaryrefslogtreecommitdiff
path: root/protocols/ace/RMCast/Acknowledge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/ace/RMCast/Acknowledge.cpp')
-rw-r--r--protocols/ace/RMCast/Acknowledge.cpp110
1 files changed, 75 insertions, 35 deletions
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp
index 12a857f1c53..1056d6a3281 100644
--- a/protocols/ace/RMCast/Acknowledge.cpp
+++ b/protocols/ace/RMCast/Acknowledge.cpp
@@ -2,12 +2,18 @@
// author : Boris Kolpackov <boris@kolpackov.net>
// cvs-id : $Id$
-#include <ace/Time_Value.h> // ACE_Time_Value
-#include <ace/OS_NS_unistd.h>
-#include <ace/OS_NS_sys_time.h> // gettimeofday
+#include "ace/Time_Value.h" // ACE_Time_Value
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_NS_sys_time.h" // gettimeofday
#include "Acknowledge.h"
+/*
+#include <iostream>
+using std::cerr;
+using std::endl;
+*/
+
namespace ACE_RMCast
{
ACE_Time_Value const tick (0, 5000);
@@ -108,7 +114,9 @@ namespace ACE_RMCast
// Send NRTM.
//
- Profile_ptr nrtm (create_nrtm ());
+ u32 max_elem (NRTM::max_count (max_payload_size));
+
+ Profile_ptr nrtm (create_nrtm (max_elem));
if (nrtm.get ())
{
@@ -157,48 +165,66 @@ namespace ACE_RMCast
void Acknowledge::
track_queue (Address const& addr, Queue& q, Messages& msgs)
{
- NAK_ptr nak (new NAK (addr));
+ u32 max_elem (NAK::max_count (max_payload_size));
+ u32 count (0);
+
+ Queue::iterator i (q.begin ()), e (q.end ());
// Track existing losses.
//
- for (Queue::iterator i (q.begin ()), e (q.end ()); i != e; ++i)
+ while (i != e)
{
- u64 sn ((*i).ext_id_);
- Descr& d = (*i).int_id_;
+ NAK_ptr nak (new NAK (addr));
- if (d.lost ())
+ // Inner loop that fills NAK profile with up to max_elem elements.
+ //
+ for (; i != e && nak->count () < max_elem; ++i)
{
- d.timer (d.timer () - 1);
+ u64 sn ((*i).ext_id_);
+ Descr& d = (*i).int_id_;
- if (d.timer () == 0)
+ if (d.lost ())
{
- //@@ Need exp fallback.
- //
- d.nak_count (d.nak_count () + 1);
- d.timer ((d.nak_count () + 1) * nak_timeout);
+ d.timer (d.timer () - 1);
- nak->add (sn);
+ if (d.timer () == 0)
+ {
+ //@@ Need exp fallback.
+ //
+ d.nak_count (d.nak_count () + 1);
+ d.timer ((d.nak_count () + 1) * nak_timeout);
- //cerr << 6 << "NAK # " << d.nak_count () << ": "
- // << addr << " " << sn << endl;
+ nak->add (sn);
+
+ ++count;
+
+ //cerr << 6 << "NAK # " << d.nak_count () << ": "
+ // << addr << " " << sn << endl;
+ }
}
}
- }
- // Send NAK.
- //
- if (nak->count ())
- {
- // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
- // << endl;
+ // Send this NAK.
+ //
+ if (nak->count ())
+ {
+ // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
+ // << endl;
- Message_ptr m (new Message);
+ Message_ptr m (new Message);
- m->add (Profile_ptr (nak.release ()));
+ m->add (Profile_ptr (nak.release ()));
- msgs.push_back (m);
+ msgs.push_back (m);
+ }
}
+ /*
+ if (count > max_elem)
+ cerr << "NAC count : " << count << endl
+ << "NAK max : " << max_elem << endl;
+ */
+
// Detect and record new losses.
//
for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
@@ -256,6 +282,8 @@ namespace ACE_RMCast
// First message from this source.
//
hold_.bind (from, Queue (sn));
+ //@@ rm
+ //
hold_.find (from, e);
in_->recv (m);
@@ -300,13 +328,20 @@ namespace ACE_RMCast
void Acknowledge::
send (Message_ptr m)
{
- if (m->find (Data::id) != 0)
+ if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
{
- Lock l (mutex_);
+ u32 max_size (max_payload_size - data->size ());
+ u32 max_elem (NRTM::max_count (max_size));
- Profile_ptr nrtm (create_nrtm ());
+ if (max_elem > 0)
+ {
+ Lock l (mutex_);
+
+ Profile_ptr nrtm (create_nrtm (max_elem));
- if (nrtm.get ()) m->add (nrtm);
+ if (nrtm.get ())
+ m->add (nrtm);
+ }
nrtm_timer_ = nrtm_timeout; // Reset timer.
}
@@ -315,7 +350,7 @@ namespace ACE_RMCast
}
Profile_ptr Acknowledge::
- create_nrtm ()
+ create_nrtm (u32 max_elem)
{
// Prepare NRTM.
//
@@ -332,11 +367,16 @@ namespace ACE_RMCast
//@@ Should look for the highest known number.
//
nrtm->insert (addr, q.sn ());
+
+ if (--max_elem == 0)
+ break;
}
}
- if (nrtm->empty ()) return Profile_ptr (0);
- else return Profile_ptr (nrtm.release ());
+ if (nrtm->empty ())
+ return Profile_ptr (0);
+ else
+ return Profile_ptr (nrtm.release ());
}
ACE_THR_FUNC_RETURN Acknowledge::