summaryrefslogtreecommitdiff
path: root/protocols/ace/RMCast/Acknowledge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/ace/RMCast/Acknowledge.cpp')
-rw-r--r--protocols/ace/RMCast/Acknowledge.cpp389
1 files changed, 0 insertions, 389 deletions
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp
deleted file mode 100644
index b647a255dad..00000000000
--- a/protocols/ace/RMCast/Acknowledge.cpp
+++ /dev/null
@@ -1,389 +0,0 @@
-// file : ace/RMCast/Acknowledge.cpp
-// author : Boris Kolpackov <boris@kolpackov.net>
-// cvs-id : $Id$
-
-#include "ace/Time_Value.h" // ACE_Time_Value
-#include "ace/OS_NS_unistd.h"
-#include "ace/OS_NS_stdlib.h" // abort
-#include "ace/OS_NS_sys_time.h" // gettimeofday
-
-#include "Acknowledge.h"
-
-/*
-#include <iostream>
-using std::cerr;
-using std::endl;
-*/
-
-namespace ACE_RMCast
-{
- Acknowledge::
- Acknowledge (Parameters const& params)
- : params_ (params),
- hold_ (params.addr_map_size ()),
- cond_ (mutex_),
- nrtm_timer_ (params_.nrtm_timeout ()),
- stop_ (false)
- {
- }
-
- void Acknowledge::
- in_start (In_Element* in)
- {
- Element::in_start (in);
- }
-
- void Acknowledge::
- out_start (Out_Element* out)
- {
- Element::out_start (out);
-
- tracker_mgr_.spawn (track_thunk, this);
- }
-
- void Acknowledge::
- out_stop ()
- {
- {
- Lock l (mutex_);
- stop_ = true;
- cond_.signal ();
- }
-
- tracker_mgr_.wait ();
-
- Element::out_stop ();
- }
-
- void Acknowledge::
- collapse (Queue& q)
- {
- // I would normally use iterators in the logic below but ACE_Map_Manager
- // iterates over entries in no particular order so it is pretty much
- // unusable here. Instead we will do slow and cumbersome find's.
- //
-
- u64 sn (q.sn () + 1);
-
- for (;; ++sn)
- {
- Queue::ENTRY* e;
-
- if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
-
- Message_ptr m (e->int_id_.msg ());
- q.unbind (sn);
-
- in_->recv (m);
- }
-
- q.sn (sn - 1);
- }
-
- void Acknowledge::
- track ()
- {
- while (true)
- {
- Messages msgs;
-
- {
- Lock l (mutex_);
-
- if (stop_)
- break;
-
- if (hold_.current_size () != 0)
- {
- for (Map::iterator i (hold_.begin ()), e (hold_.end ());
- i != e;
- ++i)
- {
- Queue& q = (*i).int_id_;
-
- if (q.current_size () == 0) continue;
-
- track_queue ((*i).ext_id_, q, msgs);
- }
- }
-
- if (--nrtm_timer_ == 0)
- {
- nrtm_timer_ = params_.nrtm_timeout ();
-
- // Send NRTM.
- //
- unsigned short max_payload_size (
- params_.max_packet_size () - max_service_size);
-
- u32 max_elem (NRTM::max_count (max_payload_size));
-
- Profile_ptr nrtm (create_nrtm (max_elem));
-
- if (nrtm.get ())
- {
- Message_ptr m (new Message);
- m->add (nrtm);
- msgs.push_back (m);
-
- }
- }
- }
-
- // Send stuff off.
- //
- for (Messages::Iterator i (msgs); !i.done (); i.advance ())
- {
- Message_ptr* ppm;
- i.next (ppm);
- send (*ppm);
- }
-
- // Go to sleep but watch for "manual cancellation" request.
- //
- {
- ACE_Time_Value time (ACE_OS::gettimeofday ());
- time += params_.tick ();
-
- Lock l (mutex_);
-
- while (!stop_)
- {
- if (cond_.wait (&time) == -1)
- {
- if (errno != ETIME)
- ACE_OS::abort ();
- else
- break;
- }
- }
-
- if (stop_)
- break;
- }
- }
- }
-
- void Acknowledge::
- track_queue (Address const& addr, Queue& q, Messages& msgs)
- {
- unsigned short max_payload_size (
- params_.max_packet_size () - max_service_size);
-
- u32 max_elem (NAK::max_count (max_payload_size));
- u32 count (0);
-
- Queue::iterator i (q.begin ()), e (q.end ());
-
- // Track existing losses.
- //
- while (i != e)
- {
- NAK_ptr nak (new NAK (addr));
-
- // Inner loop that fills NAK profile with up to max_elem elements.
- //
- for (; i != e && nak->count () < max_elem; ++i)
- {
- u64 sn ((*i).ext_id_);
- Descr& d = (*i).int_id_;
-
- if (d.lost ())
- {
- d.timer (d.timer () - 1);
-
- if (d.timer () == 0)
- {
- //@@ Need exp fallback.
- //
- d.nak_count (d.nak_count () + 1);
- d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
-
- nak->add (sn);
-
- ++count;
-
- // cerr << 6 << "NAK # " << d.nak_count () << ": "
- // << addr << " " << sn << endl;
- }
- }
- }
-
- // Send this NAK.
- //
- if (nak->count ())
- {
- // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
- // << endl;
-
- Message_ptr m (new Message);
-
- m->add (Profile_ptr (nak.release ()));
-
- msgs.push_back (m);
- }
- }
-
- // Detect and record new losses.
- //
- for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
- {
- if (q.find (sn) == -1)
- {
- q.bind (sn, Descr (1));
- }
- }
- }
-
- void Acknowledge::
- recv (Message_ptr m)
- {
- // Handle NRTM. There could be some nasty interaction with code
- // that handles data below (like missing message and NAK). This
- // is why I hold the lock at the beginning (which may be not very
- // efficient).
- //
- Lock l (mutex_);
-
- if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
- {
- for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
- {
- u64 sn (nrtm->find ((*i).ext_id_));
-
- if (sn != 0)
- {
- Queue& q = (*i).int_id_;
-
- u64 old (q.max_sn ());
-
- if (old < sn)
- {
- // Mark as lost.
- //
- q.bind (sn, Descr (1));
- }
- }
- }
- }
-
- if (m->find (Data::id) || m->find (NoData::id))
- {
- Address from (
- static_cast<From const*> (m->find (From::id))->address ());
-
- u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
-
- Map::ENTRY* e;
-
- if (hold_.find (from, e) == -1)
- {
- // First message from this source.
- //
- hold_.bind (from, Queue (sn));
- in_->recv (m);
- }
- else
- {
- Queue& q = e->int_id_;
-
- if (sn <= q.sn ())
- {
- // Duplicate.
- //
- //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
- // << endl;
- }
- else if (sn == q.sn () + 1)
- {
- // Next message.
- //
-
- q.rebind (sn, Descr (m));
- collapse (q);
- }
- else
- {
- // Some messages are missing. Insert this one into the queue.
- //
- q.rebind (sn, Descr (m));
- }
- }
- }
- else
- {
- l.release ();
-
- // Just forward it up.
- //
- in_->recv (m);
- }
- }
-
- void Acknowledge::
- send (Message_ptr m)
- {
- if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
- {
- size_t max_payload_size (
- params_.max_packet_size () - max_service_size);
-
- if (max_payload_size > data->size ())
- {
- u32 max_size (max_payload_size - data->size ());
- u32 max_elem (NRTM::max_count (max_size));
-
- if (max_elem > 0)
- {
- Lock l (mutex_);
-
- Profile_ptr nrtm (create_nrtm (max_elem));
-
- if (nrtm.get ())
- m->add (nrtm);
- }
- }
-
- nrtm_timer_ = params_.nrtm_timeout (); // Reset timer.
- }
-
- out_->send (m);
- }
-
- Profile_ptr Acknowledge::
- create_nrtm (u32 max_elem)
- {
- // Prepare NRTM.
- //
- NRTM_ptr nrtm (new NRTM ());
-
- // Gather the information.
- //
- {
- for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
- {
- Address addr ((*i).ext_id_);
- Queue& q = (*i).int_id_;
-
- //@@ Should look for the highest known number.
- //
- nrtm->insert (addr, q.sn ());
-
- if (--max_elem == 0)
- break;
- }
- }
-
- if (nrtm->empty ())
- return Profile_ptr (0);
- else
- return Profile_ptr (nrtm.release ());
- }
-
- ACE_THR_FUNC_RETURN Acknowledge::
- track_thunk (void* obj)
- {
- reinterpret_cast<Acknowledge*> (obj)->track ();
- return 0;
- }
-}