summaryrefslogtreecommitdiff
path: root/ACE/protocols/ace/TMCast/MTQueue.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/protocols/ace/TMCast/MTQueue.hpp')
-rw-r--r--ACE/protocols/ace/TMCast/MTQueue.hpp176
1 files changed, 176 insertions, 0 deletions
diff --git a/ACE/protocols/ace/TMCast/MTQueue.hpp b/ACE/protocols/ace/TMCast/MTQueue.hpp
new file mode 100644
index 00000000000..2905ea0564f
--- /dev/null
+++ b/ACE/protocols/ace/TMCast/MTQueue.hpp
@@ -0,0 +1,176 @@
+// $Id$
+// author : Boris Kolpackov <boris@dre.vanderbilt.edu>
+
+#ifndef TMCAST_MT_QUEUE_HPP
+#define TMCAST_MT_QUEUE_HPP
+
+#include "ace/Auto_Ptr.h"
+#include "ace/Unbounded_Set.h"
+#include "ace/Unbounded_Queue.h"
+#include "ace/os_include/sys/os_types.h"
+#include "ace/Condition_T.h"
+
+namespace ACE_TMCast
+{
+ template <typename T,
+ typename M,
+ typename C,
+ typename Q = ACE_Unbounded_Queue<T> >
+ class MTQueue
+ {
+ public:
+ typedef T ElementType;
+ typedef M MutexType;
+ typedef C ConditionalType;
+ typedef Q QueueType;
+
+ public:
+
+ MTQueue ()
+ : mutexp_ (new MutexType),
+ mutex_ (*mutexp_),
+ queue_ (),
+ signal_ (false)
+ {
+ }
+
+ MTQueue (MutexType& mutex)
+ : mutexp_ (),
+ mutex_ (mutex),
+ queue_ (),
+ signal_ (false)
+ {
+ }
+
+ public:
+ bool
+ empty () const
+ {
+ return queue_.is_empty ();
+ }
+
+ size_t
+ size () const
+ {
+ return queue_.size ();
+ }
+
+ // typedef typename QueueType::Empty Empty;
+
+ class Empty {};
+
+ T&
+ front ()
+ {
+ ACE_Unbounded_Queue_Iterator<T> f (queue_);
+ T* tmp;
+ if (!f.next (tmp)) throw Empty ();
+
+ return *tmp;
+ }
+
+
+ T const&
+ front () const
+ {
+ ACE_Unbounded_Queue_Const_Iterator<T> f (queue_);
+ T* tmp;
+ if (!f.next (tmp)) throw Empty ();
+
+ return *tmp;
+ }
+
+ /*
+ T&
+ back ()
+ {
+ return queue_.back ();
+ }
+
+
+ T const&
+ back () const
+ {
+ return queue_.back ();
+ }
+ */
+
+ void
+ push (T const& t)
+ {
+ signal_ = empty ();
+ queue_.enqueue_tail (t);
+ }
+
+ void
+ pop ()
+ {
+ T junk;
+ queue_.dequeue_head (junk);
+ }
+
+ public:
+ void
+ lock () const
+ {
+ mutex_.acquire ();
+ }
+
+ void
+ unlock () const
+ {
+ if (signal_)
+ {
+ signal_ = false;
+
+ for (ConditionalSetConstIterator_ i (cond_set_);
+ !i.done ();
+ i.advance ())
+ {
+ ConditionalType** c = 0;
+
+ i.next (c);
+
+ (*c)->signal ();
+ }
+ }
+
+ mutex_.release ();
+ }
+
+ void
+ subscribe (ConditionalType& c)
+ {
+ //@@ should check for duplicates
+ //
+ cond_set_.insert (&c);
+ }
+
+ void
+ unsubscribe (ConditionalType& c)
+ {
+ //@@ should check for absence
+ //
+ cond_set_.remove (&c);
+ }
+
+ private:
+ auto_ptr<MutexType> mutexp_;
+ MutexType& mutex_;
+ QueueType queue_;
+
+ typedef
+ ACE_Unbounded_Set<ConditionalType*>
+ ConditionalSet_;
+
+ typedef
+ ACE_Unbounded_Set_Const_Iterator<ConditionalType*>
+ ConditionalSetConstIterator_;
+
+ ConditionalSet_ cond_set_;
+
+ mutable bool signal_;
+ };
+}
+
+#endif // TMCAST_MT_QUEUE_HPP