diff options
Diffstat (limited to 'protocols/ace/RMCast/Retransmit.cpp')
-rw-r--r-- | protocols/ace/RMCast/Retransmit.cpp | 158 |
1 files changed, 0 insertions, 158 deletions
diff --git a/protocols/ace/RMCast/Retransmit.cpp b/protocols/ace/RMCast/Retransmit.cpp deleted file mode 100644 index b968908f82b..00000000000 --- a/protocols/ace/RMCast/Retransmit.cpp +++ /dev/null @@ -1,158 +0,0 @@ -// file : ace/RMCast/Retransmit.cpp -// author : Boris Kolpackov <boris@kolpackov.net> -// cvs-id : $Id$ - -#include "ace/Time_Value.h" // ACE_Time_Value -#include "ace/OS_NS_stdlib.h" // abort -#include "ace/OS_NS_sys_time.h" // gettimeofday - -#include "Retransmit.h" - -/* -#include <iostream> -using std::cerr; -using std::endl; -*/ - -namespace ACE_RMCast -{ - Retransmit:: - Retransmit (Parameters const& params) - : params_ (params), - cond_ (mutex_), - stop_ (false) - { - } - - void Retransmit:: - out_start (Out_Element* out) - { - Element::out_start (out); - - tracker_mgr_.spawn (track_thunk, this); - } - - void Retransmit:: - out_stop () - { - { - Lock l (mutex_); - stop_ = true; - cond_.signal (); - } - - tracker_mgr_.wait (); - - Element::out_stop (); - } - - void Retransmit:: - send (Message_ptr m) - { - if (m->find (Data::id) != 0) - { - SN const* sn = static_cast<SN const*> (m->find (SN::id)); - - Lock l (mutex_); - queue_.bind (sn->num (), Descr (m->clone ())); - } - - out_->send (m); - } - - void Retransmit:: - recv (Message_ptr m) - { - if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id))) - { - Address to (static_cast<To const*> (m->find (To::id))->address ()); - - if (nak->address () == to) - { - Lock l (mutex_); - - for (NAK::iterator j (const_cast<NAK*> (nak)->begin ()); - !j.done (); - j.advance ()) - { - u64* psn; - j.next (psn); - - Message_ptr m; - - Queue::ENTRY* pair; - - if (queue_.find (*psn, pair) == 0) - { - //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl; - - m = pair->int_id_.message (); - - pair->int_id_.reset (); - } - else - { - //cerr << 4 << "message " << *psn << " not available" << endl; - - m = Message_ptr (new Message); - m->add (Profile_ptr (new SN (*psn))); - m->add (Profile_ptr (new NoData)); - } - - out_->send (m); - } - } - } - - in_->recv (m); - } - - ACE_THR_FUNC_RETURN Retransmit:: - track_thunk (void* obj) - { - reinterpret_cast<Retransmit*> (obj)->track (); - return 0; - } - - void Retransmit:: - track () - { - while (true) - { - Lock l (mutex_); - - for (Queue::iterator i (queue_); !i.done ();) - { - if ((*i).int_id_.inc () >= params_.retention_timeout ()) - { - u64 sn ((*i).ext_id_); - i.advance (); - queue_.unbind (sn); - } - else - { - i.advance (); - } - } - - // Go to sleep but watch for "manual cancellation" request. - // - ACE_Time_Value time (ACE_OS::gettimeofday ()); - time += params_.tick (); - - while (!stop_) - { - if (cond_.wait (&time) == -1) - { - if (errno != ETIME) - ACE_OS::abort (); - else - break; - } - } - - if (stop_) - break; - } - } -} |