summaryrefslogtreecommitdiff
path: root/protocols
diff options
context:
space:
mode:
authorboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-03-17 17:30:53 +0000
committerboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-03-17 17:30:53 +0000
commit7509cf5a83d7170c77a420e865ec623ce903f7e3 (patch)
treee1f53b06358082bea5d193592b62a878524303db /protocols
parent3095f224fb9e440f88317f313b08a8a999905236 (diff)
downloadATCD-7509cf5a83d7170c77a420e865ec623ce903f7e3.tar.gz
ChangeLogTag: Thu Mar 17 19:45:10 2005 Boris Kolpackov <boris@kolpackov.net>
Diffstat (limited to 'protocols')
-rw-r--r--protocols/ace/RMCast/Acknowledge.cpp37
-rw-r--r--protocols/ace/RMCast/Acknowledge.h2
-rw-r--r--protocols/ace/RMCast/Bits.h11
-rw-r--r--protocols/ace/RMCast/Link.cpp41
-rw-r--r--protocols/ace/RMCast/Link.h2
-rw-r--r--protocols/ace/RMCast/Retransmit.cpp47
-rw-r--r--protocols/ace/RMCast/Retransmit.h3
-rw-r--r--protocols/ace/RMCast/Socket.cpp2
8 files changed, 114 insertions, 31 deletions
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp
index 1fdf19a92d9..edad0f56696 100644
--- a/protocols/ace/RMCast/Acknowledge.cpp
+++ b/protocols/ace/RMCast/Acknowledge.cpp
@@ -14,7 +14,9 @@ namespace ACE_RMCast
Acknowledge::
Acknowledge ()
- : nrtm_timer_ (nrtm_timeout)
+ : cond_ (mutex_),
+ nrtm_timer_ (nrtm_timeout),
+ stop_ (false)
{
}
@@ -35,7 +37,12 @@ namespace ACE_RMCast
void Acknowledge::
out_stop ()
{
- tracker_mgr_.cancel_all (1);
+ {
+ Lock l (mutex_);
+ stop_ = true;
+ cond_.signal ();
+ }
+
tracker_mgr_.wait ();
Element::out_stop ();
@@ -76,6 +83,9 @@ namespace ACE_RMCast
{
Lock l (mutex_);
+ if (stop_)
+ break;
+
if (hold_.current_size () != 0)
{
for (Map::iterator i (hold_.begin ()), e (hold_.end ());
@@ -117,7 +127,28 @@ namespace ACE_RMCast
send (*ppm);
}
- ACE_OS::sleep (tick);
+ // Go to sleep but watch for "manual cancellation" request.
+ //
+ {
+ ACE_Time_Value time (ACE_OS::gettimeofday ());
+ time += tick;
+
+ Lock l (mutex_);
+
+ while (!stop_)
+ {
+ if (cond_.wait (&time) == -1)
+ {
+ if (errno != ETIME)
+ abort ();
+ else
+ break;
+ }
+ }
+
+ if (stop_)
+ break;
+ }
}
}
diff --git a/protocols/ace/RMCast/Acknowledge.h b/protocols/ace/RMCast/Acknowledge.h
index 39de2f63a1c..5726f07d0d3 100644
--- a/protocols/ace/RMCast/Acknowledge.h
+++ b/protocols/ace/RMCast/Acknowledge.h
@@ -225,9 +225,11 @@ namespace ACE_RMCast
private:
Map hold_;
Mutex mutex_;
+ Condition cond_;
unsigned long nrtm_timer_;
+ bool stop_;
ACE_Thread_Manager tracker_mgr_;
};
diff --git a/protocols/ace/RMCast/Bits.h b/protocols/ace/RMCast/Bits.h
index 7d9e02575d5..1b45580647b 100644
--- a/protocols/ace/RMCast/Bits.h
+++ b/protocols/ace/RMCast/Bits.h
@@ -1,5 +1,3 @@
-// -*- C++ -*-
-
// file : ace/RMCast/Bits.h
// author : Boris Kolpackov <boris@kolpackov.net>
// cvs-id : $Id$
@@ -8,15 +6,10 @@
#define ACE_RMCAST_BITS_H
#include "ace/Auto_Ptr.h"
+#include "ace/Thread_Mutex.h"
+#include "ace/Condition_T.h"
#include "ace/Synch_Traits.h"
-
-class ACE_Thread_Mutex;
-template <typename T> class ACE_Guard;
-template <typename T> class ACE_Guard;
-template <typename T> class ACE_Condition;
-
-
namespace ACE_RMCast
{
typedef ACE_SYNCH_MUTEX Mutex;
diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp
index 7b68367f4f0..d619243bbcb 100644
--- a/protocols/ace/RMCast/Link.cpp
+++ b/protocols/ace/RMCast/Link.cpp
@@ -8,6 +8,11 @@
namespace ACE_RMCast
{
+ // Time period after which a manual cancellation request is
+ // checked for.
+ //
+ ACE_Time_Value const timeout (0, 500);
+
Link::
Link (Address const& addr)
: addr_ (addr),
@@ -15,7 +20,8 @@ namespace ACE_RMCast
static_cast<ACE_UINT32> (INADDR_ANY)),
AF_INET,
IPPROTO_UDP,
- 1)
+ 1),
+ stop_ (false)
{
srand (time (0));
@@ -82,7 +88,10 @@ namespace ACE_RMCast
{
// Stop receiving thread.
//
- recv_mgr_.cancel_all (1);
+ {
+ Lock l (mutex_);
+ stop_ = true;
+ }
recv_mgr_.wait ();
Element::in_stop ();
@@ -167,7 +176,33 @@ namespace ACE_RMCast
//@@ CDR-specific.
//
- size = rsock_.recv (data, 4, addr, MSG_PEEK);
+ // Block for up to timeout time waiting for an incomming message.
+ //
+ for (;;)
+ {
+ ACE_Time_Value t (timeout);
+ ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
+
+ if (r == -1)
+ {
+ if (errno != ETIME)
+ abort ();
+ }
+ else
+ {
+ size = static_cast<size_t> (r);
+ break;
+ }
+
+ // Check for cancellation request.
+ //
+ {
+ Lock l (mutex_);
+ if (stop_)
+ return;
+ }
+ }
+
if (size != 4 || addr == self_)
{
diff --git a/protocols/ace/RMCast/Link.h b/protocols/ace/RMCast/Link.h
index 4c1495d2a37..8da695023f2 100644
--- a/protocols/ace/RMCast/Link.h
+++ b/protocols/ace/RMCast/Link.h
@@ -53,11 +53,13 @@ namespace ACE_RMCast
ACE_SOCK_Dgram_Mcast rsock_;
ACE_SOCK_Dgram ssock_;
+ bool stop_;
ACE_Thread_Manager recv_mgr_;
// Simulator.
//
Message_ptr hold_;
+
Mutex mutex_;
};
}
diff --git a/protocols/ace/RMCast/Retransmit.cpp b/protocols/ace/RMCast/Retransmit.cpp
index 05e59e1879a..a72832aded6 100644
--- a/protocols/ace/RMCast/Retransmit.cpp
+++ b/protocols/ace/RMCast/Retransmit.cpp
@@ -12,6 +12,7 @@ namespace ACE_RMCast
Retransmit::
Retransmit ()
+ : cond_ (mutex_), stop_ (false)
{
}
@@ -26,7 +27,12 @@ namespace ACE_RMCast
void Retransmit::
out_stop ()
{
- tracker_mgr_.cancel_all (1);
+ {
+ Lock l (mutex_);
+ stop_ = true;
+ cond_.signal ();
+ }
+
tracker_mgr_.wait ();
Element::out_stop ();
@@ -108,25 +114,40 @@ namespace ACE_RMCast
{
while (true)
{
+ Lock l (mutex_);
+
+ for (Queue::iterator i (queue_); !i.done ();)
{
- Lock l (mutex_);
+ if ((*i).int_id_.inc () >= 60)
+ {
+ u64 sn ((*i).ext_id_);
+ i.advance ();
+ queue_.unbind (sn);
+ }
+ else
+ {
+ i.advance ();
+ }
+ }
- for (Queue::iterator i (queue_); !i.done ();)
+ // Go to sleep but watch for "manual cancellation" request.
+ //
+ ACE_Time_Value time (ACE_OS::gettimeofday ());
+ time += tick;
+
+ while (!stop_)
+ {
+ if (cond_.wait (&time) == -1)
{
- if ((*i).int_id_.inc () >= 60)
- {
- u64 sn ((*i).ext_id_);
- i.advance ();
- queue_.unbind (sn);
- }
+ if (errno != ETIME)
+ abort ();
else
- {
- i.advance ();
- }
+ break;
}
}
- ACE_OS::sleep (tick);
+ if (stop_)
+ break;
}
}
}
diff --git a/protocols/ace/RMCast/Retransmit.h b/protocols/ace/RMCast/Retransmit.h
index fbfd20fa62c..fa910ee2837 100644
--- a/protocols/ace/RMCast/Retransmit.h
+++ b/protocols/ace/RMCast/Retransmit.h
@@ -6,7 +6,6 @@
#define ACE_RMCAST_RETRANSMIT_H
#include <ace/Hash_Map_Manager.h>
-
#include <ace/Thread_Manager.h>
#include "Stack.h"
@@ -88,7 +87,9 @@ namespace ACE_RMCast
private:
Queue queue_;
Mutex mutex_;
+ Condition cond_;
+ bool stop_;
ACE_Thread_Manager tracker_mgr_;
};
}
diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp
index 21e43c07f47..578897f6761 100644
--- a/protocols/ace/RMCast/Socket.cpp
+++ b/protocols/ace/RMCast/Socket.cpp
@@ -6,8 +6,6 @@
#include "ace/OS_NS_string.h"
#include "ace/Unbounded_Queue.h"
-#include "ace/Thread_Mutex.h"
-#include "ace/Condition_T.h"
#include "Stack.h"
#include "Protocol.h"