summaryrefslogtreecommitdiff
path: root/ACE/protocols/ace/RMCast/Acknowledge.h
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/protocols/ace/RMCast/Acknowledge.h')
-rw-r--r--ACE/protocols/ace/RMCast/Acknowledge.h248
1 files changed, 248 insertions, 0 deletions
diff --git a/ACE/protocols/ace/RMCast/Acknowledge.h b/ACE/protocols/ace/RMCast/Acknowledge.h
new file mode 100644
index 00000000000..3d7654b1fec
--- /dev/null
+++ b/ACE/protocols/ace/RMCast/Acknowledge.h
@@ -0,0 +1,248 @@
+// file : ace/RMCast/Acknowledge.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_ACKNOWLEDGE_H
+#define ACE_RMCAST_ACKNOWLEDGE_H
+
+#include "ace/Hash_Map_Manager.h"
+#include "ace/Thread_Manager.h"
+
+#include "Stack.h"
+#include "Protocol.h"
+#include "Bits.h"
+#include "Parameters.h"
+
+#if !defined (ACE_RMCAST_DEFAULT_MAP_SIZE)
+#define ACE_RMCAST_DEFAULT_MAP_SIZE 10
+#endif /* ACE_RMCAST_DEFAULT_MAP_SIZE */
+
+#if !defined (ACE_RMCAST_DEFAULT_QUEUE_SIZE)
+#define ACE_RMCAST_DEFAULT_QUEUE_SIZE 10
+#endif /* ACE_RMCAST_DEFAULT_QUEUE_SIZE */
+
+namespace ACE_RMCast
+{
+ class Acknowledge : public Element
+ {
+ public:
+ Acknowledge (Parameters const& params);
+
+ virtual void
+ in_start (In_Element* in);
+
+ virtual void
+ out_start (Out_Element* out);
+
+ virtual void
+ out_stop ();
+
+ public:
+ virtual void
+ recv (Message_ptr m);
+
+ virtual void
+ send (Message_ptr m);
+
+ // Sun C++ 5.4 can't handle private here.
+ //
+ // private:
+ public:
+ struct Descr
+ {
+ //@@ There should be no default c-tor.
+ //
+ Descr ()
+ : nak_count_ (0), timer_ (1)
+ {
+ }
+
+ Descr (unsigned long timer)
+ : nak_count_ (0), timer_ (timer)
+ {
+ }
+
+ Descr (Message_ptr m)
+ : m_ (m)
+ {
+ }
+
+ public:
+ bool
+ lost () const
+ {
+ return m_.get () == 0;
+ }
+
+ public:
+ Message_ptr
+ msg ()
+ {
+ return m_;
+ }
+
+ void
+ msg (Message_ptr m)
+ {
+ m_ = m;
+ }
+
+ public:
+ unsigned long
+ nak_count () const
+ {
+ return nak_count_;
+ }
+
+ void
+ nak_count (unsigned long v)
+ {
+ nak_count_ = v;
+ }
+
+ unsigned long
+ timer () const
+ {
+ return timer_;
+ }
+
+ void
+ timer (unsigned long v)
+ {
+ timer_ = v;
+ }
+
+ private:
+ Message_ptr m_;
+
+ unsigned long nak_count_;
+ unsigned long timer_;
+ };
+
+ private:
+ struct Queue : ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex>
+ {
+ typedef ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> Base;
+
+ // Should never be here but required by ACE_Hash_Blah_Blah.
+ //
+ Queue ()
+ : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (0), max_sn_ (0)
+ {
+ }
+
+ Queue (u64 sn)
+ : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (sn), max_sn_ (sn)
+ {
+ }
+
+ Queue (Queue const& q)
+ : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (q.sn_), max_sn_ (sn_)
+ {
+ for (Queue::const_iterator i (q), e (q, 1); i != e; ++i)
+ {
+ bind ((*i).ext_id_, (*i).int_id_);
+ }
+ }
+
+ public:
+ int
+ bind (u64 sn, Descr const& d)
+ {
+ int r (Base::bind (sn, d));
+
+ if (r == 0 && sn > max_sn_) max_sn_ = sn;
+
+ return r;
+ }
+
+ int
+ rebind (u64 sn, Descr const& d)
+ {
+ int r (Base::rebind (sn, d));
+
+ if (r == 0 && sn > max_sn_) max_sn_ = sn;
+
+ return r;
+ }
+
+ int
+ unbind (u64 sn)
+ {
+ int r (Base::unbind (sn));
+
+ if (r == 0 && sn == max_sn_)
+ {
+ for (--max_sn_; max_sn_ >= sn_; --max_sn_)
+ {
+ if (find (max_sn_) == 0) break;
+ }
+ }
+
+ return r;
+ }
+
+ public:
+ u64
+ sn () const
+ {
+ return sn_;
+ }
+
+ void
+ sn (u64 sn)
+ {
+ sn_ = sn;
+ }
+
+ u64
+ max_sn () const
+ {
+ if (current_size () == 0) return sn_;
+
+ return max_sn_;
+ }
+
+ private:
+ u64 sn_, max_sn_;
+ };
+
+ typedef
+ ACE_Hash_Map_Manager_Ex<Address,
+ Queue,
+ AddressHasher,
+ ACE_Equal_To<Address>,
+ ACE_Null_Mutex>
+ Map;
+
+ private:
+ void
+ collapse (Queue& q);
+
+ void
+ track ();
+
+ void
+ track_queue (Address const& addr, Queue& q, Messages& msgs);
+
+ Profile_ptr
+ create_nrtm (u32 max_elem);
+
+ static ACE_THR_FUNC_RETURN
+ track_thunk (void* obj);
+
+ private:
+ Parameters const& params_;
+
+ Map hold_;
+ Mutex mutex_;
+ Condition cond_;
+
+ unsigned long nrtm_timer_;
+
+ bool stop_;
+ ACE_Thread_Manager tracker_mgr_;
+ };
+}
+
+#endif // ACE_RMCAST_ACKNOWLEDGE_H