diff options
Diffstat (limited to 'protocols/ace/RMCast/Acknowledge.cpp')
-rw-r--r-- | protocols/ace/RMCast/Acknowledge.cpp | 110 |
1 files changed, 75 insertions, 35 deletions
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp index 12a857f1c53..1056d6a3281 100644 --- a/protocols/ace/RMCast/Acknowledge.cpp +++ b/protocols/ace/RMCast/Acknowledge.cpp @@ -2,12 +2,18 @@ // author : Boris Kolpackov <boris@kolpackov.net> // cvs-id : $Id$ -#include <ace/Time_Value.h> // ACE_Time_Value -#include <ace/OS_NS_unistd.h> -#include <ace/OS_NS_sys_time.h> // gettimeofday +#include "ace/Time_Value.h" // ACE_Time_Value +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_sys_time.h" // gettimeofday #include "Acknowledge.h" +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + namespace ACE_RMCast { ACE_Time_Value const tick (0, 5000); @@ -108,7 +114,9 @@ namespace ACE_RMCast // Send NRTM. // - Profile_ptr nrtm (create_nrtm ()); + u32 max_elem (NRTM::max_count (max_payload_size)); + + Profile_ptr nrtm (create_nrtm (max_elem)); if (nrtm.get ()) { @@ -157,48 +165,66 @@ namespace ACE_RMCast void Acknowledge:: track_queue (Address const& addr, Queue& q, Messages& msgs) { - NAK_ptr nak (new NAK (addr)); + u32 max_elem (NAK::max_count (max_payload_size)); + u32 count (0); + + Queue::iterator i (q.begin ()), e (q.end ()); // Track existing losses. // - for (Queue::iterator i (q.begin ()), e (q.end ()); i != e; ++i) + while (i != e) { - u64 sn ((*i).ext_id_); - Descr& d = (*i).int_id_; + NAK_ptr nak (new NAK (addr)); - if (d.lost ()) + // Inner loop that fills NAK profile with up to max_elem elements. + // + for (; i != e && nak->count () < max_elem; ++i) { - d.timer (d.timer () - 1); + u64 sn ((*i).ext_id_); + Descr& d = (*i).int_id_; - if (d.timer () == 0) + if (d.lost ()) { - //@@ Need exp fallback. - // - d.nak_count (d.nak_count () + 1); - d.timer ((d.nak_count () + 1) * nak_timeout); + d.timer (d.timer () - 1); - nak->add (sn); + if (d.timer () == 0) + { + //@@ Need exp fallback. + // + d.nak_count (d.nak_count () + 1); + d.timer ((d.nak_count () + 1) * nak_timeout); - //cerr << 6 << "NAK # " << d.nak_count () << ": " - // << addr << " " << sn << endl; + nak->add (sn); + + ++count; + + //cerr << 6 << "NAK # " << d.nak_count () << ": " + // << addr << " " << sn << endl; + } } } - } - // Send NAK. - // - if (nak->count ()) - { - // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" - // << endl; + // Send this NAK. + // + if (nak->count ()) + { + // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" + // << endl; - Message_ptr m (new Message); + Message_ptr m (new Message); - m->add (Profile_ptr (nak.release ())); + m->add (Profile_ptr (nak.release ())); - msgs.push_back (m); + msgs.push_back (m); + } } + /* + if (count > max_elem) + cerr << "NAC count : " << count << endl + << "NAK max : " << max_elem << endl; + */ + // Detect and record new losses. // for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn) @@ -256,6 +282,8 @@ namespace ACE_RMCast // First message from this source. // hold_.bind (from, Queue (sn)); + //@@ rm + // hold_.find (from, e); in_->recv (m); @@ -300,13 +328,20 @@ namespace ACE_RMCast void Acknowledge:: send (Message_ptr m) { - if (m->find (Data::id) != 0) + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) { - Lock l (mutex_); + u32 max_size (max_payload_size - data->size ()); + u32 max_elem (NRTM::max_count (max_size)); - Profile_ptr nrtm (create_nrtm ()); + if (max_elem > 0) + { + Lock l (mutex_); + + Profile_ptr nrtm (create_nrtm (max_elem)); - if (nrtm.get ()) m->add (nrtm); + if (nrtm.get ()) + m->add (nrtm); + } nrtm_timer_ = nrtm_timeout; // Reset timer. } @@ -315,7 +350,7 @@ namespace ACE_RMCast } Profile_ptr Acknowledge:: - create_nrtm () + create_nrtm (u32 max_elem) { // Prepare NRTM. // @@ -332,11 +367,16 @@ namespace ACE_RMCast //@@ Should look for the highest known number. // nrtm->insert (addr, q.sn ()); + + if (--max_elem == 0) + break; } } - if (nrtm->empty ()) return Profile_ptr (0); - else return Profile_ptr (nrtm.release ()); + if (nrtm->empty ()) + return Profile_ptr (0); + else + return Profile_ptr (nrtm.release ()); } ACE_THR_FUNC_RETURN Acknowledge:: |