diff options
Diffstat (limited to 'ACE/protocols/ace/RMCast/Acknowledge.cpp')
-rw-r--r-- | ACE/protocols/ace/RMCast/Acknowledge.cpp | 389 |
1 files changed, 389 insertions, 0 deletions
diff --git a/ACE/protocols/ace/RMCast/Acknowledge.cpp b/ACE/protocols/ace/RMCast/Acknowledge.cpp new file mode 100644 index 00000000000..b647a255dad --- /dev/null +++ b/ACE/protocols/ace/RMCast/Acknowledge.cpp @@ -0,0 +1,389 @@ +// file : ace/RMCast/Acknowledge.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "ace/Time_Value.h" // ACE_Time_Value +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_stdlib.h" // abort +#include "ace/OS_NS_sys_time.h" // gettimeofday + +#include "Acknowledge.h" + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + Acknowledge:: + Acknowledge (Parameters const& params) + : params_ (params), + hold_ (params.addr_map_size ()), + cond_ (mutex_), + nrtm_timer_ (params_.nrtm_timeout ()), + stop_ (false) + { + } + + void Acknowledge:: + in_start (In_Element* in) + { + Element::in_start (in); + } + + void Acknowledge:: + out_start (Out_Element* out) + { + Element::out_start (out); + + tracker_mgr_.spawn (track_thunk, this); + } + + void Acknowledge:: + out_stop () + { + { + Lock l (mutex_); + stop_ = true; + cond_.signal (); + } + + tracker_mgr_.wait (); + + Element::out_stop (); + } + + void Acknowledge:: + collapse (Queue& q) + { + // I would normally use iterators in the logic below but ACE_Map_Manager + // iterates over entries in no particular order so it is pretty much + // unusable here. Instead we will do slow and cumbersome find's. + // + + u64 sn (q.sn () + 1); + + for (;; ++sn) + { + Queue::ENTRY* e; + + if (q.find (sn, e) == -1 || e->int_id_.lost ()) break; + + Message_ptr m (e->int_id_.msg ()); + q.unbind (sn); + + in_->recv (m); + } + + q.sn (sn - 1); + } + + void Acknowledge:: + track () + { + while (true) + { + Messages msgs; + + { + Lock l (mutex_); + + if (stop_) + break; + + if (hold_.current_size () != 0) + { + for (Map::iterator i (hold_.begin ()), e (hold_.end ()); + i != e; + ++i) + { + Queue& q = (*i).int_id_; + + if (q.current_size () == 0) continue; + + track_queue ((*i).ext_id_, q, msgs); + } + } + + if (--nrtm_timer_ == 0) + { + nrtm_timer_ = params_.nrtm_timeout (); + + // Send NRTM. + // + unsigned short max_payload_size ( + params_.max_packet_size () - max_service_size); + + u32 max_elem (NRTM::max_count (max_payload_size)); + + Profile_ptr nrtm (create_nrtm (max_elem)); + + if (nrtm.get ()) + { + Message_ptr m (new Message); + m->add (nrtm); + msgs.push_back (m); + + } + } + } + + // Send stuff off. + // + for (Messages::Iterator i (msgs); !i.done (); i.advance ()) + { + Message_ptr* ppm; + i.next (ppm); + send (*ppm); + } + + // Go to sleep but watch for "manual cancellation" request. + // + { + ACE_Time_Value time (ACE_OS::gettimeofday ()); + time += params_.tick (); + + Lock l (mutex_); + + while (!stop_) + { + if (cond_.wait (&time) == -1) + { + if (errno != ETIME) + ACE_OS::abort (); + else + break; + } + } + + if (stop_) + break; + } + } + } + + void Acknowledge:: + track_queue (Address const& addr, Queue& q, Messages& msgs) + { + unsigned short max_payload_size ( + params_.max_packet_size () - max_service_size); + + u32 max_elem (NAK::max_count (max_payload_size)); + u32 count (0); + + Queue::iterator i (q.begin ()), e (q.end ()); + + // Track existing losses. + // + while (i != e) + { + NAK_ptr nak (new NAK (addr)); + + // Inner loop that fills NAK profile with up to max_elem elements. + // + for (; i != e && nak->count () < max_elem; ++i) + { + u64 sn ((*i).ext_id_); + Descr& d = (*i).int_id_; + + if (d.lost ()) + { + d.timer (d.timer () - 1); + + if (d.timer () == 0) + { + //@@ Need exp fallback. + // + d.nak_count (d.nak_count () + 1); + d.timer ((d.nak_count () + 1) * params_.nak_timeout ()); + + nak->add (sn); + + ++count; + + // cerr << 6 << "NAK # " << d.nak_count () << ": " + // << addr << " " << sn << endl; + } + } + } + + // Send this NAK. + // + if (nak->count ()) + { + // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" + // << endl; + + Message_ptr m (new Message); + + m->add (Profile_ptr (nak.release ())); + + msgs.push_back (m); + } + } + + // Detect and record new losses. + // + for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn) + { + if (q.find (sn) == -1) + { + q.bind (sn, Descr (1)); + } + } + } + + void Acknowledge:: + recv (Message_ptr m) + { + // Handle NRTM. There could be some nasty interaction with code + // that handles data below (like missing message and NAK). This + // is why I hold the lock at the beginning (which may be not very + // efficient). + // + Lock l (mutex_); + + if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id))) + { + for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) + { + u64 sn (nrtm->find ((*i).ext_id_)); + + if (sn != 0) + { + Queue& q = (*i).int_id_; + + u64 old (q.max_sn ()); + + if (old < sn) + { + // Mark as lost. + // + q.bind (sn, Descr (1)); + } + } + } + } + + if (m->find (Data::id) || m->find (NoData::id)) + { + Address from ( + static_cast<From const*> (m->find (From::id))->address ()); + + u64 sn (static_cast<SN const*> (m->find (SN::id))->num ()); + + Map::ENTRY* e; + + if (hold_.find (from, e) == -1) + { + // First message from this source. + // + hold_.bind (from, Queue (sn)); + in_->recv (m); + } + else + { + Queue& q = e->int_id_; + + if (sn <= q.sn ()) + { + // Duplicate. + // + //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn + // << endl; + } + else if (sn == q.sn () + 1) + { + // Next message. + // + + q.rebind (sn, Descr (m)); + collapse (q); + } + else + { + // Some messages are missing. Insert this one into the queue. + // + q.rebind (sn, Descr (m)); + } + } + } + else + { + l.release (); + + // Just forward it up. + // + in_->recv (m); + } + } + + void Acknowledge:: + send (Message_ptr m) + { + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) + { + size_t max_payload_size ( + params_.max_packet_size () - max_service_size); + + if (max_payload_size > data->size ()) + { + u32 max_size (max_payload_size - data->size ()); + u32 max_elem (NRTM::max_count (max_size)); + + if (max_elem > 0) + { + Lock l (mutex_); + + Profile_ptr nrtm (create_nrtm (max_elem)); + + if (nrtm.get ()) + m->add (nrtm); + } + } + + nrtm_timer_ = params_.nrtm_timeout (); // Reset timer. + } + + out_->send (m); + } + + Profile_ptr Acknowledge:: + create_nrtm (u32 max_elem) + { + // Prepare NRTM. + // + NRTM_ptr nrtm (new NRTM ()); + + // Gather the information. + // + { + for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) + { + Address addr ((*i).ext_id_); + Queue& q = (*i).int_id_; + + //@@ Should look for the highest known number. + // + nrtm->insert (addr, q.sn ()); + + if (--max_elem == 0) + break; + } + } + + if (nrtm->empty ()) + return Profile_ptr (0); + else + return Profile_ptr (nrtm.release ()); + } + + ACE_THR_FUNC_RETURN Acknowledge:: + track_thunk (void* obj) + { + reinterpret_cast<Acknowledge*> (obj)->track (); + return 0; + } +} |