diff options
author | boris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-03-17 17:30:53 +0000 |
---|---|---|
committer | boris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2005-03-17 17:30:53 +0000 |
commit | 7509cf5a83d7170c77a420e865ec623ce903f7e3 (patch) | |
tree | e1f53b06358082bea5d193592b62a878524303db /protocols | |
parent | 3095f224fb9e440f88317f313b08a8a999905236 (diff) | |
download | ATCD-7509cf5a83d7170c77a420e865ec623ce903f7e3.tar.gz |
ChangeLogTag: Thu Mar 17 19:45:10 2005 Boris Kolpackov <boris@kolpackov.net>
Diffstat (limited to 'protocols')
-rw-r--r-- | protocols/ace/RMCast/Acknowledge.cpp | 37 | ||||
-rw-r--r-- | protocols/ace/RMCast/Acknowledge.h | 2 | ||||
-rw-r--r-- | protocols/ace/RMCast/Bits.h | 11 | ||||
-rw-r--r-- | protocols/ace/RMCast/Link.cpp | 41 | ||||
-rw-r--r-- | protocols/ace/RMCast/Link.h | 2 | ||||
-rw-r--r-- | protocols/ace/RMCast/Retransmit.cpp | 47 | ||||
-rw-r--r-- | protocols/ace/RMCast/Retransmit.h | 3 | ||||
-rw-r--r-- | protocols/ace/RMCast/Socket.cpp | 2 |
8 files changed, 114 insertions, 31 deletions
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp index 1fdf19a92d9..edad0f56696 100644 --- a/protocols/ace/RMCast/Acknowledge.cpp +++ b/protocols/ace/RMCast/Acknowledge.cpp @@ -14,7 +14,9 @@ namespace ACE_RMCast Acknowledge:: Acknowledge () - : nrtm_timer_ (nrtm_timeout) + : cond_ (mutex_), + nrtm_timer_ (nrtm_timeout), + stop_ (false) { } @@ -35,7 +37,12 @@ namespace ACE_RMCast void Acknowledge:: out_stop () { - tracker_mgr_.cancel_all (1); + { + Lock l (mutex_); + stop_ = true; + cond_.signal (); + } + tracker_mgr_.wait (); Element::out_stop (); @@ -76,6 +83,9 @@ namespace ACE_RMCast { Lock l (mutex_); + if (stop_) + break; + if (hold_.current_size () != 0) { for (Map::iterator i (hold_.begin ()), e (hold_.end ()); @@ -117,7 +127,28 @@ namespace ACE_RMCast send (*ppm); } - ACE_OS::sleep (tick); + // Go to sleep but watch for "manual cancellation" request. + // + { + ACE_Time_Value time (ACE_OS::gettimeofday ()); + time += tick; + + Lock l (mutex_); + + while (!stop_) + { + if (cond_.wait (&time) == -1) + { + if (errno != ETIME) + abort (); + else + break; + } + } + + if (stop_) + break; + } } } diff --git a/protocols/ace/RMCast/Acknowledge.h b/protocols/ace/RMCast/Acknowledge.h index 39de2f63a1c..5726f07d0d3 100644 --- a/protocols/ace/RMCast/Acknowledge.h +++ b/protocols/ace/RMCast/Acknowledge.h @@ -225,9 +225,11 @@ namespace ACE_RMCast private: Map hold_; Mutex mutex_; + Condition cond_; unsigned long nrtm_timer_; + bool stop_; ACE_Thread_Manager tracker_mgr_; }; diff --git a/protocols/ace/RMCast/Bits.h b/protocols/ace/RMCast/Bits.h index 7d9e02575d5..1b45580647b 100644 --- a/protocols/ace/RMCast/Bits.h +++ b/protocols/ace/RMCast/Bits.h @@ -1,5 +1,3 @@ -// -*- C++ -*- - // file : ace/RMCast/Bits.h // author : Boris Kolpackov <boris@kolpackov.net> // cvs-id : $Id$ @@ -8,15 +6,10 @@ #define ACE_RMCAST_BITS_H #include "ace/Auto_Ptr.h" +#include "ace/Thread_Mutex.h" +#include "ace/Condition_T.h" #include "ace/Synch_Traits.h" - -class ACE_Thread_Mutex; -template <typename T> class ACE_Guard; -template <typename T> class ACE_Guard; -template <typename T> class ACE_Condition; - - namespace ACE_RMCast { typedef ACE_SYNCH_MUTEX Mutex; diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp index 7b68367f4f0..d619243bbcb 100644 --- a/protocols/ace/RMCast/Link.cpp +++ b/protocols/ace/RMCast/Link.cpp @@ -8,6 +8,11 @@ namespace ACE_RMCast { + // Time period after which a manual cancellation request is + // checked for. + // + ACE_Time_Value const timeout (0, 500); + Link:: Link (Address const& addr) : addr_ (addr), @@ -15,7 +20,8 @@ namespace ACE_RMCast static_cast<ACE_UINT32> (INADDR_ANY)), AF_INET, IPPROTO_UDP, - 1) + 1), + stop_ (false) { srand (time (0)); @@ -82,7 +88,10 @@ namespace ACE_RMCast { // Stop receiving thread. // - recv_mgr_.cancel_all (1); + { + Lock l (mutex_); + stop_ = true; + } recv_mgr_.wait (); Element::in_stop (); @@ -167,7 +176,33 @@ namespace ACE_RMCast //@@ CDR-specific. // - size = rsock_.recv (data, 4, addr, MSG_PEEK); + // Block for up to timeout time waiting for an incomming message. + // + for (;;) + { + ACE_Time_Value t (timeout); + ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t); + + if (r == -1) + { + if (errno != ETIME) + abort (); + } + else + { + size = static_cast<size_t> (r); + break; + } + + // Check for cancellation request. + // + { + Lock l (mutex_); + if (stop_) + return; + } + } + if (size != 4 || addr == self_) { diff --git a/protocols/ace/RMCast/Link.h b/protocols/ace/RMCast/Link.h index 4c1495d2a37..8da695023f2 100644 --- a/protocols/ace/RMCast/Link.h +++ b/protocols/ace/RMCast/Link.h @@ -53,11 +53,13 @@ namespace ACE_RMCast ACE_SOCK_Dgram_Mcast rsock_; ACE_SOCK_Dgram ssock_; + bool stop_; ACE_Thread_Manager recv_mgr_; // Simulator. // Message_ptr hold_; + Mutex mutex_; }; } diff --git a/protocols/ace/RMCast/Retransmit.cpp b/protocols/ace/RMCast/Retransmit.cpp index 05e59e1879a..a72832aded6 100644 --- a/protocols/ace/RMCast/Retransmit.cpp +++ b/protocols/ace/RMCast/Retransmit.cpp @@ -12,6 +12,7 @@ namespace ACE_RMCast Retransmit:: Retransmit () + : cond_ (mutex_), stop_ (false) { } @@ -26,7 +27,12 @@ namespace ACE_RMCast void Retransmit:: out_stop () { - tracker_mgr_.cancel_all (1); + { + Lock l (mutex_); + stop_ = true; + cond_.signal (); + } + tracker_mgr_.wait (); Element::out_stop (); @@ -108,25 +114,40 @@ namespace ACE_RMCast { while (true) { + Lock l (mutex_); + + for (Queue::iterator i (queue_); !i.done ();) { - Lock l (mutex_); + if ((*i).int_id_.inc () >= 60) + { + u64 sn ((*i).ext_id_); + i.advance (); + queue_.unbind (sn); + } + else + { + i.advance (); + } + } - for (Queue::iterator i (queue_); !i.done ();) + // Go to sleep but watch for "manual cancellation" request. + // + ACE_Time_Value time (ACE_OS::gettimeofday ()); + time += tick; + + while (!stop_) + { + if (cond_.wait (&time) == -1) { - if ((*i).int_id_.inc () >= 60) - { - u64 sn ((*i).ext_id_); - i.advance (); - queue_.unbind (sn); - } + if (errno != ETIME) + abort (); else - { - i.advance (); - } + break; } } - ACE_OS::sleep (tick); + if (stop_) + break; } } } diff --git a/protocols/ace/RMCast/Retransmit.h b/protocols/ace/RMCast/Retransmit.h index fbfd20fa62c..fa910ee2837 100644 --- a/protocols/ace/RMCast/Retransmit.h +++ b/protocols/ace/RMCast/Retransmit.h @@ -6,7 +6,6 @@ #define ACE_RMCAST_RETRANSMIT_H #include <ace/Hash_Map_Manager.h> - #include <ace/Thread_Manager.h> #include "Stack.h" @@ -88,7 +87,9 @@ namespace ACE_RMCast private: Queue queue_; Mutex mutex_; + Condition cond_; + bool stop_; ACE_Thread_Manager tracker_mgr_; }; } diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp index 21e43c07f47..578897f6761 100644 --- a/protocols/ace/RMCast/Socket.cpp +++ b/protocols/ace/RMCast/Socket.cpp @@ -6,8 +6,6 @@ #include "ace/OS_NS_string.h" #include "ace/Unbounded_Queue.h" -#include "ace/Thread_Mutex.h" -#include "ace/Condition_T.h" #include "Stack.h" #include "Protocol.h" |