author     levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>  1998-06-26 17:20:59 +0000
committer  levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>  1998-06-26 17:20:59 +0000
commit     c6254ae122ac7dae45f2dda4e52f453aed8183f8 (patch)
tree       97136f3ba676fc2f9fe5209d4fbf6c10f9c7acb8
parent     1ba488338e5bdc68020c86e3b5825bde7b9019ac (diff)
download   ATCD-c6254ae122ac7dae45f2dda4e52f453aed8183f8.tar.gz
On VxWorks, added ACE_Message_Queue_Vx to wrap native VxWorks message queues
-rw-r--r--  ace/Message_Queue.cpp    954
-rw-r--r--  ace/Message_Queue.h      435
-rw-r--r--  ace/Message_Queue.i      147
-rw-r--r--  ace/Message_Queue_T.cpp  984
-rw-r--r--  ace/Message_Queue_T.h    486
-rw-r--r--  ace/Message_Queue_T.i    146
6 files changed, 1839 insertions, 1313 deletions
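The change adds ACE_Message_Queue_Vx, a specialization of ACE_Message_Queue<ACE_NULL_SYNCH> that delegates to the native VxWorks msgQLib. Below is a minimal usage sketch, assuming a VxWorks build; the function name, buffer sizes, and payload are illustrative and not part of the change.

#include "ace/Message_Queue.h"

int
vx_queue_example (void)
{
  // First argument = maximum number of queued messages, second =
  // maximum length of one message, per the new constructor.
  ACE_Message_Queue_Vx queue (16, 64);

  ACE_Message_Block *out;
  ACE_NEW_RETURN (out, ACE_Message_Block (64), -1);
  ACE_OS::strcpy (out->wr_ptr (), "hello");

  // enqueue () reaches enqueue_head_i (), which sends size () bytes
  // starting at rd_ptr () via ::msgQSend (); continuation blocks of a
  // composite message are ignored.
  if (queue.enqueue (out) == -1)
    return -1;

  // dequeue_head () requires a caller-allocated block big enough for
  // the message (no chaining); it is filled in place by ::msgQReceive ().
  ACE_Message_Block *in;
  ACE_NEW_RETURN (in, ACE_Message_Block (64), -1);
  if (queue.dequeue_head (in) == -1)
    return -1;

  // ... consume the data at in->rd_ptr () ...
  out->release ();
  in->release ();
  return 0;
}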
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
index 171d5174742..6f1c2e53c3c 100644
--- a/ace/Message_Queue.cpp
+++ b/ace/Message_Queue.cpp
@@ -10,116 +10,16 @@
#include "ace/Message_Queue.i"
#endif /* __ACE_INLINE__ */
-ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
+#if defined (VXWORKS)
-ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
+////////////////////////////////
+// class ACE_Message_Queue_Vx //
+////////////////////////////////
-//////////////////////////////////////
-// class ACE_Message_Queue_Iterator //
-//////////////////////////////////////
-
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
- : queue_ (q),
- curr_ (q.head_)
-{
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- if (this->curr_ != 0)
- {
- entry = this->curr_;
- return 1;
- }
- else
- return 0;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- return this->curr_ == 0;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
+void
+ACE_Message_Queue_Vx::dump (void) const
{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- if (this->curr_)
- this->curr_ = this->curr_->next ();
- return this->curr_ != 0;
-}
-
-template <ACE_SYNCH_DECL> void
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
-{
-}
-
-ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
-
-//////////////////////////////////////////////
-// class ACE_Message_Queue_Reverse_Iterator //
-//////////////////////////////////////////////
-
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
- : queue_ (q),
- curr_ (queue_.tail_)
-{
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- if (this->curr_ != 0)
- {
- entry = this->curr_;
- return 1;
- }
- else
- return 0;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- return this->curr_ == 0;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- if (this->curr_)
- this->curr_ = this->curr_->prev ();
- return this->curr_ != 0;
-}
-
-template <ACE_SYNCH_DECL> void
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
-{
-}
-
-/////////////////////////////
-// class ACE_Message_Queue //
-/////////////////////////////
-
-template <ACE_SYNCH_DECL> void
-ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
+ ACE_TRACE ("ACE_Message_Queue_Vx::dump");
ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
ACE_DEBUG ((LM_DEBUG,
ASYS_TEXT ("deactivated = %d\n")
@@ -128,7 +28,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
ASYS_TEXT ("cur_bytes = %d\n")
ASYS_TEXT ("cur_count = %d\n")
ASYS_TEXT ("head_ = %u\n")
- ASYS_TEXT ("tail_ = %u\n"),
+ ASYS_TEXT ("MSG_Q_ID = %u\n"),
this->deactivated_,
this->low_water_mark_,
this->high_water_mark_,
@@ -136,86 +36,79 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
this->cur_count_,
this->head_,
this->tail_));
- ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_full_cond: \n")));
- not_full_cond_.dump ();
- ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_empty_cond: \n")));
- not_empty_cond_.dump ();
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- : not_empty_cond_ (0),
- not_full_cond_ (0),
- enqueue_waiters_ (0),
- dequeue_waiters_ (0)
-#else
- : not_empty_cond_ (this->lock_),
- not_full_cond_ (this->lock_)
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ACE_Message_Queue_Vx::ACE_Message_Queue_Vx (size_t max_messages,
+ size_t max_message_length,
+ ACE_Notification_Strategy *ns)
+ : ACE_Message_Queue<ACE_NULL_SYNCH> (0, 0, ns),
+ max_messages_ (ACE_static_cast (int, max_messages)),
+ max_message_length_ (ACE_static_cast (int, max_message_length))
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
+ ACE_TRACE ("ACE_Message_Queue_Vx::ACE_Message_Queue_Vx");
- if (this->open (hwm, lwm, ns) == -1)
+ if (this->open (max_messages_, max_message_length_, ns) == -1)
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("open")));
}
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
+ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
- if (this->head_ != 0 && this->close () == -1)
- ACE_ERROR ((LM_ERROR, ASYS_TEXT ("close")));
+ ACE_TRACE ("ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx");
}
// Don't bother locking since if someone calls this function more than
// once for the same queue, we're in bigger trouble than just
// concurrency control!
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
+int
+ACE_Message_Queue_Vx::open (size_t max_messages,
+ size_t max_message_length,
+ ACE_Notification_Strategy *ns)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
- this->high_water_mark_ = hwm;
- this->low_water_mark_ = lwm;
+ ACE_TRACE ("ACE_Message_Queue_Vx::open");
+ this->high_water_mark_ = 0;
+ this->low_water_mark_ = 0;
this->deactivated_ = 0;
this->cur_bytes_ = 0;
this->cur_count_ = 0;
- this->tail_ = 0;
this->head_ = 0;
this->notification_strategy_ = ns;
- return 0;
+ this->max_messages_ = ACE_static_cast (int, max_messages);
+ this->max_message_length_ = ACE_static_cast (int, max_message_length);
+
+ if (tail_)
+ {
+ // Had already created a msgQ, so delete it.
+ close ();
+ activate_i ();
+ }
+
+ return (this->tail_ =
+ ACE_reinterpret_cast (ACE_Message_Block *,
+ ::msgQCreate (max_messages_,
+ max_message_length_,
+ MSG_Q_FIFO))) == NULL ? -1 : 0;
}
// Implementation of the public deactivate() method
// (assumes locks are held).
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (void)
+int
+ACE_Message_Queue_Vx::deactivate_i (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::deactivate_i");
int current_status =
this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
- // Wakeup all waiters.
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- this->not_empty_cond_.broadcast ();
- this->not_full_cond_.broadcast ();
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
-
this->deactivated_ = 1;
return current_status;
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
+int
+ACE_Message_Queue_Vx::activate_i (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::activate_i");
int current_status =
this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
this->deactivated_ = 0;
@@ -224,755 +117,146 @@ ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
// Clean up the queue if we have not already done so!
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
+int
+ACE_Message_Queue_Vx::close (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- int res = this->deactivate_i ();
-
- // Free up the remaining message on the list
-
- for (this->tail_ = 0; this->head_ != 0; )
- {
- this->cur_count_--;
-
- ACE_Message_Block *temp;
+ ACE_TRACE ("ACE_Message_Queue_Vx::close");
+ // Don't lock, because we don't have a lock. It shouldn't be
+ // necessary, anyway.
- // Decrement all the counts.
- for (temp = this->head_;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ -= temp->size ();
+ this->deactivate_i ();
- temp = this->head_;
- this->head_ = this->head_->next ();
-
- // Make sure to use <release> rather than <delete> since this is
- // reference counted.
- temp->release ();
- }
+ // Don't bother to free up the remaining message on the list,
+ // because we don't have any way to iterate over what's in the
+ // queue.
- return res;
+ return ::msgQDelete (msgq ());
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
+int
+ACE_Message_Queue_Vx::signal_enqueue_waiters (void)
{
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- if (this->not_full_cond_.signal () != 0)
- return -1;
-#else
- if (this->enqueue_waiters_ > 0)
- {
- --this->enqueue_waiters_;
- return this->not_full_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ // No-op.
return 0;
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
+int
+ACE_Message_Queue_Vx::signal_dequeue_waiters (void)
{
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- // Tell any blocked threads that the queue has a new item!
- if (this->not_empty_cond_.signal () != 0)
- return -1;
-#else
- if (this->dequeue_waiters_ > 0)
- {
- --this->dequeue_waiters_;
- return this->not_empty_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ // No-op.
return 0;
}
-// Actually put the node at the end (no locking so must be called with
-// locks held).
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
+int
+ACE_Message_Queue_Vx::enqueue_tail_i (ACE_Message_Block *new_item)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
-
- if (new_item == 0)
- return -1;
-
- // List was empty, so build a new one.
- if (this->tail_ == 0)
- {
- this->head_ = new_item;
- this->tail_ = new_item;
- new_item->next (0);
- new_item->prev (0);
- }
- // Link at the end.
- else
- {
- new_item->next (0);
- this->tail_->next (new_item);
- new_item->prev (this->tail_);
- this->tail_ = new_item;
- }
-
- // Make sure to count *all* the bytes in a composite message!!!
-
- for (ACE_Message_Block *temp = new_item;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ += temp->size ();
-
- this->cur_count_++;
-
- if (this->signal_dequeue_waiters () == -1)
- return -1;
- else
- return this->cur_count_;
+ ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_tail_i");
+ // No-op. This should _never_ be called.
+ ACE_NOTSUP_RETURN (-1);
}
-// Actually put the node at the head (no locking)
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
+int
+ACE_Message_Queue_Vx::enqueue_head_i (ACE_Message_Block *new_item)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_head_i");
if (new_item == 0)
return -1;
- new_item->prev (0);
- new_item->next (this->head_);
-
- if (this->head_ != 0)
- this->head_->prev (new_item);
- else
- this->tail_ = new_item;
-
- this->head_ = new_item;
-
- // Make sure to count *all* the bytes in a composite message!!!
-
- for (ACE_Message_Block *temp = new_item;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ += temp->size ();
+ // Don't try to send a composite message!!!! Only the first
+ // block will be sent.
this->cur_count_++;
- if (this->signal_dequeue_waiters () == -1)
- return -1;
+ // Always use this method to actually send a message on the queue.
+ if (::msgQSend (msgq (),
+ new_item->rd_ptr (),
+ new_item->size (),
+ WAIT_FOREVER,
+ MSG_PRI_NORMAL) == OK)
+ return ::msgQNumMsgs (msgq ());
else
- return this->cur_count_;
+ return -1;
}
-// Actually put the node at its proper position relative to its
-// priority.
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+int
+ACE_Message_Queue_Vx::enqueue_i (ACE_Message_Block *new_item)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_i");
if (new_item == 0)
return -1;
if (this->head_ == 0)
- // Check for simple case of an empty queue, where all we need to
- // do is insert <new_item> into the head.
+ // Should always take this branch.
return this->enqueue_head_i (new_item);
else
- {
- ACE_Message_Block *temp;
-
- // Figure out where the new item goes relative to its priority.
- // We start looking from the highest priority to the lowest
- // priority.
-
- for (temp = this->head_;
- temp != 0;
- temp = temp->next ())
- if (temp->msg_priority () < new_item->msg_priority ())
- // Break out when we've located an item that has lower
- // priority that <new_item>.
- break;
-
- if (temp == 0)
- // Check for simple case of inserting at the tail of the queue,
- // where all we need to do is insert <new_item> after the
- // current tail.
- return this->enqueue_tail_i (new_item);
- else if (temp->prev () == 0)
- // Check for simple case of inserting at the head of the
- // queue, where all we need to do is insert <new_item> before
- // the current head.
- return this->enqueue_head_i (new_item);
- else
- {
- // Insert the new message ahead of the item of
- // lesser priority. This ensures that FIFO order is
- // maintained when messages of the same priority are
- // inserted consecutively.
- new_item->next (temp);
- new_item->prev (temp->prev ());
- temp->prev ()->next (new_item);
- temp->prev (new_item);
- }
- }
-
- // Make sure to count *all* the bytes in a composite message!!!
-
- for (ACE_Message_Block *temp = new_item;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ += temp->size ();
-
- this->cur_count_++;
-
- if (this->signal_dequeue_waiters () == -1)
- return -1;
- else
- return this->cur_count_;
+ ACE_NOTSUP_RETURN (-1);
}
// Actually get the first ACE_Message_Block (no locking, so must be
// called with locks held). This method assumes that the queue has at
// least one item in it when it is called.
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
+int
+ACE_Message_Queue_Vx::dequeue_head_i (ACE_Message_Block *&first_item)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
- first_item = this->head_;
- this->head_ = this->head_->next ();
+ ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_head_i");
- if (this->head_ == 0)
- this->tail_ = 0;
- else
- // The prev pointer of the first message block has to point to
- // NULL...
- this->head_->prev (0);
+ // We don't allocate a new Message_Block: the caller must provide
+ // it, and must ensure that it is big enough (without chaining).
- // Make sure to subtract off all of the bytes associated with this
- // message.
- for (ACE_Message_Block *temp = first_item;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ -= temp->size ();
-
- this->cur_count_--;
+ if (first_item == 0 || first_item->wr_ptr () == 0)
+ return -1;
- if (this->signal_enqueue_waiters () == -1)
+ if (::msgQReceive (msgq (),
+ first_item->wr_ptr (),
+ first_item->size (),
+ WAIT_FOREVER) == ERROR)
return -1;
else
- return this->cur_count_;
+ return ::msgQNumMsgs (msgq ());
}
// Take a look at the first item without removing it.
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+int
+ACE_Message_Queue_Vx::wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
+ ACE_Time_Value *tv)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- // Wait for at least one item to become available.
-
- if (this->wait_not_empty_cond (ace_mon, tv) == -1)
- return -1;
-
- first_item = this->head_;
- return this->cur_count_;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv)
-{
- int result = 0;
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- while (this->is_full_i () && result != -1)
- {
- ++this->enqueue_waiters_;
- // @@ Need to add sanity checks for failure...
- mon.release ();
- if (tv == 0)
- result = this->not_full_cond_.acquire ();
- else
- result = this->not_full_cond_.acquire (*tv);
- int error = errno;
- mon.acquire ();
- errno = error;
- }
-#else
- ACE_UNUSED_ARG (mon);
-
- // Wait while the queue is full.
-
- while (this->is_full_i ())
- {
- if (this->not_full_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- result = -1;
- break;
- }
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- result = -1;
- break;
- }
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
- return result;
+ // Always return here, and let the VxWorks message queue handle blocking.
+ return 0;
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv)
+int
+ACE_Message_Queue_Vx::wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
+ ACE_Time_Value *tv)
{
- int result = 0;
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- while (this->is_empty_i () && result != -1)
- {
- ++this->dequeue_waiters_;
- // @@ Need to add sanity checks for failure...
- mon.release ();
- if (tv == 0)
- result = this->not_empty_cond_.acquire ();
- else
- {
- result = this->not_empty_cond_.acquire (*tv);
- if (result == -1 && errno == ETIME)
- errno = EWOULDBLOCK;
- }
- int error = errno;
- mon.acquire ();
- errno = error;
- }
-#else
- ACE_UNUSED_ARG (mon);
-
- // Wait while the queue is empty.
-
- while (this->is_empty_i ())
- {
- if (this->not_empty_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- result = -1;
- break;
- }
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- result = -1;
- break;
- }
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
- return result;
+ // Always return here, and let the VxWorks message queue handle blocking.
+ return 0;
}
-// Block indefinitely waiting for an item to arrive, does not ignore
-// alerts (e.g., signals).
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+#if ! defined (ACE_REQUIRES_FUNC_DEFINITIONS)
+int
+ACE_Message_Queue_Vx::enqueue_tail (ACE_Message_Block *mb, ACE_Time_Value *tv)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
- return -1;
-
- int queue_count = this->enqueue_head_i (new_item);
-
- if (queue_count == -1)
- return -1;
- else
- {
- this->notify ();
- return queue_count;
- }
+ ACE_NOTSUP_RETURN (-1);
}
-// Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
-// accordance with its <msg_priority> (0 is lowest priority). Returns
-// -1 on failure, else the number of items still on the queue.
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+int
+ACE_Message_Queue_Vx::enqueue_head (ACE_Message_Block *mb, ACE_Time_Value *tv)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
- return -1;
-
- int queue_count = this->enqueue_i (new_item);
-
- if (queue_count == -1)
- return -1;
- else
- {
- this->notify ();
- return queue_count;
- }
+ ACE_NOTSUP_RETURN (-1);
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
+int
+ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&,
ACE_Time_Value *tv)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
- return this->enqueue_prio (new_item, tv);
-}
-
-// Block indefinitely waiting for an item to arrive,
-// does not ignore alerts (e.g., signals).
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
- return -1;
-
- int queue_count = this->enqueue_tail_i (new_item);
-
- if (queue_count == -1)
- return -1;
- else
- {
- this->notify ();
- return queue_count;
- }
-}
-
-// Remove an item from the front of the queue. If TV == 0 block
-// indefinitely (or until an alert occurs). Otherwise, block for upto
-// the amount of time specified by TV.
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- if (this->wait_not_empty_cond (ace_mon, tv) == -1)
- return -1;
-
- return this->dequeue_head_i (first_item);
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
-
- // By default, don't do anything.
- if (this->notification_strategy_ == 0)
- return 0;
- else
- return this->notification_strategy_->notify ();
-}
-
-
-/////////////////////////////////////
-// class ACE_Dynamic_Message_Queue //
-/////////////////////////////////////
-
- // = Initialization and termination methods.
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (
- ACE_Dynamic_Message_Strategy & message_strategy,
- size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
- : ACE_Message_Queue (hwm, lwm, ns)
- , message_strategy_ (message_strategy)
-{
- // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the
- // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor
-}
-
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
-{
- delete &message_strategy_;
-}
-// dtor: free message strategy and let base class dtor do the rest
-
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
-{
- ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
-
- int result = 0;
-
- // refresh dynamic priority of the new message
- result = (*priority_eval_func_ptr_) (*new_item, tv);
-
- // get the current time
- ACE_Time_Value current_time = ACE_OS::gettimeofday ();
-
- // refresh dynamic priorities of messages in the queue
- this->refresh_priorities (current_time);
-
- // reorganize the queue according to the new priorities
- this->refresh_queue (current_time);
-
- // if there is only one message in the pending list,
- // the pending list will be empty after a *successful*
- // dequeue operation
- int empty_pending = (head_ == pending_list_tail_) ? 1 : 0;
-
- // invoke the base class method
- result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item);
-
- // null out the pending list tail pointer if
- // the pending list is now empty
- if ((empty_pending) && (result > 0))
- {
- pending_list_tail_ = 0;
- }
-
- return result;
-}
- // Enqueue an <ACE_Message_Block *> in accordance with its priority.
- // priority may be *dynamic* or *static* or a combination or *both*
- // It calls the priority evaluation function passed into the Dynamic
- // Message Queue constructor to update the priorities of all enqueued
- // messages.
-
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
-{
- ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
-
- int result = 0;
-
- // get the current time
- ACE_Time_Value current_time = ACE_OS::gettimeofday ();
-
- // refresh dynamic priorities of messages in the queue
- result = this->refresh_priorities (current_time);
- if (result < 0)
- {
- return result;
- }
-
- // reorganize the queue according to the new priorities
- result = this->refresh_queue (current_time);
- if (result < 0)
- {
- return result;
- }
-
- // if there is only one message in the pending list,
- // the pending list will be empty after a *successful*
- // dequeue operation
- int empty_pending = (head_ == pending_list_tail_) ? 1 : 0;
-
- // invoke the base class method
- result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item);
-
- // null out the pending list tail pointer if
- // the pending list is now empty
- if ((empty_pending) && (result > 0))
- {
- pending_list_tail_ = 0;
- }
-
- return result;
-}
- // Dequeue and return the <ACE_Message_Block *> at the head of the
- // queue.
-
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv)
-{
- int result = 0;
-
- // apply the priority update function to all enqueued
- // messages, starting at the head of the queue
- ACE_Message_Block *temp = head_;
- while (temp)
- {
- result = (*priority_eval_func_ptr_) (*temp, tv);
- if (result < 0)
- {
- break;
- }
-
- temp = temp->next ();
- }
-
- return result;
-}
- // refresh the priorities in the queue according
- // to a specific priority assignment function
-
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv)
-{
- // first, drop any messages from the queue and delete them:
- // reference counting at the data block level means that the
- // underlying data block will not be deleted if another
- // message block is still pointing to it.
- ACE_Message_Block *temp = (pending_list_tail_)
- ? pending_list_tail_->next ()
- : head_;
-
- while (temp)
- {
- // messages that have overflowed the given time bounds must be removed
- if (message_strategy_.is_beyond_late (*temp, tv))
- {
- // find the end of the chain of overflowed messages
- ACE_Message_Block *remove_tail = temp;
- while ((remove_tail) && (remove_tail->next ()) &&
- message_strategy_.is_beyond_late (*(remove_tail->next ()), tv))
- {
- remove_tail = remove_tail->next ();
- }
-
- temp = remove_tail->next ();
- if (remove_temp->next ())
- {
- remove_temp->next ()->prev (0);
- }
- else if (remove_temp->prev ())
- {
- remove_temp->prev ()->next (0);
- }
- else
- {
- head_ = 0;
- tail_ = 0;
- }
- remove_temp->prev (0);
- remove_temp->next (0);
-
- temp = remove_temp->next ();
-
- }
- else
- {
- temp = temp->next ();
- }
- }
-}
- // refresh the order of messages in the queue
- // after refreshing their priorities
-
-/////////////////////////////////////
-// class ACE_Message_Queue_Factory //
-/////////////////////////////////////
-
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
-{
- return new ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns);
-}
- // factory method for a statically prioritized ACE_Message_Queue
-
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns,
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-{
- ACE_Deadline_Message_Strategy *adms;
-
- ACE_NEW_RETURN (adms,
- ACE_Deadline_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset),
- 0);
-
- return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns);
-}
- // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
-
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns,
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-{
- ACE_Laxity_Message_Strategy *alms;
-
- ACE_NEW_RETURN (alms,
- ACE_Laxity_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset),
- 0);
-
-
- return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns);
+ ACE_NOTSUP_RETURN (-1);
}
- // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
+#endif /* ! ACE_REQUIRES_FUNC_DEFINITIONS */
+#endif /* VXWORKS */
#endif /* ACE_MESSAGE_QUEUE_C */
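For reference, the methods added above boil down to the following msgQLib calls. This is only a sketch: error handling and the ACE_Message_Block bookkeeping are omitted, and the sizes are illustrative.

#include <msgQLib.h>

void
msgq_round_trip (void)
{
  // open (): create the native queue; the wrapper stores the id in tail_.
  MSG_Q_ID q = ::msgQCreate (16 /* max msgs */, 64 /* max bytes each */, MSG_Q_FIFO);

  // enqueue_head_i (): copy the block's contents into the native queue.
  char out[64] = "hello";
  ::msgQSend (q, out, sizeof out, WAIT_FOREVER, MSG_PRI_NORMAL);

  // dequeue_head_i (): receive into the caller-supplied buffer.
  char in[64];
  ::msgQReceive (q, in, sizeof in, WAIT_FOREVER);

  // message_count (): number of messages still queued.
  (void) ::msgQNumMsgs (q);

  // close (): destroy the native queue.
  ::msgQDelete (q);
}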
diff --git a/ace/Message_Queue.h b/ace/Message_Queue.h
index a6d625829ab..1544d800868 100644
--- a/ace/Message_Queue.h
+++ b/ace/Message_Queue.h
@@ -49,96 +49,62 @@ public:
};
};
-template <ACE_SYNCH_DECL>
-class ACE_Message_Queue : public ACE_Message_Queue_Base
+// Include the templates here.
+#include "ace/Message_Queue_T.h"
+
+#if defined (VXWORKS)
+# include <msgQLib.h>
+
+class ACE_Message_Queue_Vx : public ACE_Message_Queue<ACE_NULL_SYNCH>
{
// = TITLE
- // A threaded message queueing facility, modeled after the
- // queueing facilities in System V STREAMs.
+ // Wrapper for VxWorks message queues.
//
// = DESCRIPTION
- // An <ACE_Message_Queue> is the central queueing facility for
- // messages in the ASX framework. If <ACE_SYNCH_DECL> is
- // ACE_MT_SYNCH then all operations are thread-safe. Otherwise,
- // if it's <ACE_NULL_SYNCH> then there's no locking overhead.
+ // Specialization of ACE_Message_Queue to simply wrap VxWorks
+ // MsgQ. It does not use any synchronization, because it relies
+ // on the native MsgQ implementation to take care of that. The
+ // only system calls that it uses are VxWorks msgQLib calls, so
+ // it is suitable for use in interrupt service routines.
+ //
+ // NOTE: *Many* ACE_Message_Queue features are not supported with
+ // this specialization, including:
+ // * The two size arguments to the constructor and open () are
+ // interpreted differently. The first is interpreted as the
+ // maximum number of messages that can be queued. The second is
+ // interpreted as the maximum length of a message, in bytes.
+ // * dequeue_head () *requires* that the ACE_Message_Block
+ // pointer argument point to an ACE_Message_Block that was
+ // allocated by the caller. It must be big enough to support
+ // the received message, without using continuation. The
+ // pointer argument is not modified.
+ // * Message priority. MSG_Q_FIFO is hard-coded.
+ // * enqueue method timeouts.
+ // * peek_dequeue_head ().
+ // * ACE_Message_Queue_Iterators.
+ // * The ability to change low and high water marks after creation.
+ // * Message_Block chains. The continuation field of ACE_Message_Block
+ // is ignored; only the first block of a fragment chain is
+ // recognized.
public:
- friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>;
- friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>;
-
- // = Traits
- typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> ITERATOR;
- typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> REVERSE_ITERATOR;
-
// = Initialization and termination methods.
- ACE_Message_Queue (size_t hwm = DEFAULT_HWM,
- size_t lwm = DEFAULT_LWM,
- ACE_Notification_Strategy * = 0);
+ ACE_Message_Queue_Vx (size_t max_messages,
+ size_t max_message_length,
+ ACE_Notification_Strategy * = 0);
// Create a message queue with all the defaults.
- virtual int open (size_t hwm = DEFAULT_HWM,
- size_t lwm = DEFAULT_LWM,
+ virtual int open (size_t max_messages,
+ size_t max_message_length,
ACE_Notification_Strategy * = 0);
// Create a message queue with all the defaults.
virtual int close (void);
// Close down the message queue and release all resources.
- virtual ~ACE_Message_Queue (void);
+ virtual ~ACE_Message_Queue_Vx (void);
// Close down the message queue and release all resources.
- // = Enqueue and dequeue methods.
-
- // For all the following routines if <timeout> == 0, the caller will
- // block until action is possible, else will wait until the absolute
- // time specified in *<timeout> elapses). These calls will return,
- // however, when queue is closed, deactivated, when a signal occurs,
- // or if the time specified in timeout elapses, (in which case errno
- // = EWOULDBLOCK).
-
- virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv = 0);
- // Retrieve the first <ACE_Message_Block> without removing it.
- // Returns -1 on failure, else the number of items still on the
- // queue.
-
- virtual int enqueue_prio (ACE_Message_Block *new_item,
- ACE_Time_Value *timeout = 0);
- // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
- // accordance with its <msg_priority> (0 is lowest priority). FIFO
- // order is maintained when messages of the same priority are
- // inserted consecutively. Returns -1 on failure, else the number
- // of items still on the queue.
-
- virtual int enqueue (ACE_Message_Block *new_item,
- ACE_Time_Value *timeout = 0);
- // This is an alias for <enqueue_prio>. It's only here for
- // backwards compatibility and will go away in a subsequent release.
- // Please use <enqueue_prio> instead.
-
- virtual int enqueue_tail (ACE_Message_Block *new_item,
- ACE_Time_Value *timeout = 0);
- // Enqueue an <ACE_Message_Block *> at the end of the queue.
- // Returns -1 on failure, else the number of items still on the
- // queue.
-
- virtual int enqueue_head (ACE_Message_Block *new_item,
- ACE_Time_Value *timeout = 0);
- // Enqueue an <ACE_Message_Block *> at the head of the queue.
- // Returns -1 on failure, else the number of items still on the
- // queue.
-
- virtual int dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *timeout = 0);
- // Dequeue and return the <ACE_Message_Block *> at the head of the
- // queue. Returns -1 on failure, else the number of items still on
- // the queue.
-
- // = Check if queue is full/empty.
- virtual int is_full (void);
- // True if queue is full, else false.
- virtual int is_empty (void);
- // True if queue is empty, else false.
-
// = Queue statistic methods.
virtual size_t message_bytes (void);
// Number of total bytes on the queue.
@@ -157,39 +123,6 @@ public:
// = Activation control methods.
- virtual int deactivate (void);
- // Deactivate the queue and wakeup all threads waiting on the queue
- // so they can continue. No messages are removed from the queue,
- // however. Any other operations called until the queue is
- // activated again will immediately return -1 with <errno> ==
- // ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
- // call and WAS_ACTIVE if queue was active before the call.
-
- virtual int activate (void);
- // Reactivate the queue so that threads can enqueue and dequeue
- // messages again. Returns WAS_INACTIVE if queue was inactive
- // before the call and WAS_ACTIVE if queue was active before the
- // call.
-
- virtual int deactivated (void);
- // Returns true if <deactivated_> is enabled.
-
- // = Notification hook.
-
- virtual int notify (void);
- // This hook is automatically invoked by <enqueue_head>,
- // <enqueue_tail>, and <enqueue_prio> when a new item is inserted
- // into the queue. Subclasses can override this method to perform
- // specific notification strategies (e.g., signaling events for a
- // <WFMO_Reactor>, notifying a <Reactor>, etc.). In a
- // multi-threaded application with concurrent consumers, there is no
- // guarantee that the queue will be still be non-empty by the time
- // the notification occurs.
-
- // = Get/set the notification strategy for the <Message_Queue>
- virtual ACE_Notification_Strategy *notification_strategy (void);
- virtual void notification_strategy (ACE_Notification_Strategy *s);
-
void dump (void) const;
// Dump the state of an object.
@@ -197,11 +130,6 @@ public:
// Declare the dynamic allocation hooks.
protected:
- // = Routines that actually do the enqueueing and dequeueing.
- // These routines assume that locks are held by the corresponding
- // public methods. Since they are virtual, you can change the
- // queueing mechanism by subclassing from <ACE_Message_Queue>.
-
virtual int enqueue_i (ACE_Message_Block *new_item);
// Enqueue an <ACE_Message_Block *> in accordance with its priority.
@@ -228,11 +156,11 @@ protected:
// Activate the queue.
// = Helper methods to factor out common #ifdef code.
- virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
+ virtual int wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
ACE_Time_Value *tv);
// Wait for the queue to become non-full.
- virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
+ virtual int wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
ACE_Time_Value *tv);
// Wait for the queue to become non-empty.
@@ -242,264 +170,36 @@ protected:
virtual int signal_dequeue_waiters (void);
// Inform any threads waiting to dequeue that they can proceed.
- ACE_Message_Block *head_;
- // Pointer to head of ACE_Message_Block list.
-
- ACE_Message_Block *tail_;
- // Pointer to tail of ACE_Message_Block list.
-
- size_t low_water_mark_;
- // Lowest number before unblocking occurs.
-
- size_t high_water_mark_;
- // Greatest number of bytes before blocking.
-
- size_t cur_bytes_;
- // Current number of bytes in the queue.
-
- size_t cur_count_;
- // Current number of messages in the queue.
-
- int deactivated_;
- // Indicates that the queue is inactive.
-
- ACE_Notification_Strategy *notification_strategy_;
- // The notification strategy used when a new message is enqueued.
-
- // = Synchronization primitives for controlling concurrent access.
- ACE_SYNCH_MUTEX_T lock_;
- // Protect queue from concurrent access.
-
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- ACE_SYNCH_SEMAPHORE_T not_empty_cond_;
- // Used to make threads sleep until the queue is no longer empty.
-
- ACE_SYNCH_SEMAPHORE_T not_full_cond_;
- // Used to make threads sleep until the queue is no longer full.
-
- size_t dequeue_waiters_;
- // Number of threads waiting to dequeue a <Message_Block>.
-
- size_t enqueue_waiters_;
- // Number of threads waiting to enqueue a <Message_Block>.
-#else
- ACE_SYNCH_CONDITION_T not_empty_cond_;
- // Used to make threads sleep until the queue is no longer empty.
-
- ACE_SYNCH_CONDITION_T not_full_cond_;
- // Used to make threads sleep until the queue is no longer full.
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ MSG_Q_ID msgq ();
+ // Access the underlying msgQ.
private:
+ int max_messages_;
+ // Maximum number of messages that can be queued.
- // = Disallow these operations.
- ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &))
- ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &))
-};
-
-template <ACE_SYNCH_DECL>
-class ACE_Message_Queue_Iterator
-{
- // = TITLE
- // Iterator for the <ACE_Message_Queue>.
-public:
- // = Initialization method.
- ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue);
-
- // = Iteration methods.
- int next (ACE_Message_Block *&entry);
- // Pass back the <entry> that hasn't been seen in the queue.
- // Returns 0 when all items have been seen, else 1.
-
- int done (void) const;
- // Returns 1 when all items have been seen, else 0.
-
- int advance (void);
- // Move forward by one element in the queue. Returns 0 when all the
- // items in the set have been seen, else 1.
-
- void dump (void) const;
- // Dump the state of an object.
+ int max_message_length_;
+ // Maximum message size, in bytes.
- ACE_ALLOC_HOOK_DECLARE;
- // Declare the dynamic allocation hooks.
-
-private:
- ACE_Message_Queue <ACE_SYNCH_USE> &queue_;
- // Message_Queue we are iterating over.
-
- ACE_Message_Block *curr_;
- // Keeps track of how far we've advanced...
-};
+ int options_;
+ // Native message queue options.
-template <ACE_SYNCH_DECL>
-class ACE_Message_Queue_Reverse_Iterator
-{
- // = TITLE
- // Reverse Iterator for the <ACE_Message_Queue>.
-public:
- // = Initialization method.
- ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue);
-
- // = Iteration methods.
- int next (ACE_Message_Block *&entry);
- // Pass back the <entry> that hasn't been seen in the queue.
- // Returns 0 when all items have been seen, else 1.
-
- int done (void) const;
- // Returns 1 when all items have been seen, else 0.
-
- int advance (void);
- // Move forward by one element in the queue. Returns 0 when all the
- // items in the set have been seen, else 1.
-
- void dump (void) const;
- // Dump the state of an object.
-
- ACE_ALLOC_HOOK_DECLARE;
- // Declare the dynamic allocation hooks.
-
-private:
- ACE_Message_Queue <ACE_SYNCH_USE> &queue_;
- // Message_Queue we are iterating over.
-
- ACE_Message_Block *curr_;
- // Keeps track of how far we've advanced...
-};
-
-template <ACE_SYNCH_DECL>
-class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE>
-{
- // = TITLE
- // A derived class which adapts the <ACE_Message_Queue>
- // class in order to maintain dynamic priorities for enqueued
- // <ACE_Message_Blocks> and manage the queue dynamically.
- //
- // = DESCRIPTION
- // Priorities and queue orderings are refreshed at each enqueue
- // and dequeue operation. Head and tail enqueue methods were
- // made private to prevent out-of-order messages from confusing
- // the pending and late portions of the queue. Messages in the
- // pending portion of the queue whose dynamic priority becomes
- // negative are placed into the late portion of the queue.
- // Messages in the late portion of the queue whose dynamic
- // priority becomes positive are dropped. These behaviors
- // support a limited schedule overrun corresponding to one full
- // cycle through dynamic priority values. These behaviors can
- // be modified in derived classes by providing alternative
- // definitions for the appropriate virtual methods.
- //
-public:
- // = Initialization and termination methods.
- ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
- size_t hwm = DEFAULT_HWM,
- size_t lwm = DEFAULT_LWM,
- ACE_Notification_Strategy * = 0);
-
- virtual ~ACE_Dynamic_Message_Queue (void);
- // Close down the message queue and release all resources.
-
- ACE_ALLOC_HOOK_DECLARE;
- // Declare the dynamic allocation hooks.
-
-protected:
- virtual int enqueue_i (ACE_Message_Block *new_item);
- // Enqueue an <ACE_Message_Block *> in accordance with its priority.
- // priority may be *dynamic* or *static* or a combination or *both*
- // It calls the priority evaluation function passed into the Dynamic
- // Message Queue constructor to update the priorities of all
- // enqueued messages.
-
- virtual int dequeue_head_i (ACE_Message_Block *&first_item);
- // Dequeue and return the <ACE_Message_Block *> at the head of the
- // queue.
-
- virtual int refresh_priorities (const ACE_Time_Value & tv);
- // Refresh the priorities in the queue according to a specific
- // priority assignment function.
-
- virtual int refresh_queue (const ACE_Time_Value & tv);
- // Refresh the order of messages in the queue after refreshing their
- // priorities.
+ // = Disallow these operations.
+ ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_Vx &))
+ ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_Vx (const ACE_Message_Queue_Vx &))
- ACE_Message_Block *pending_list_tail_;
- // Pointer to tail of the pending messages (those whose priority is
- // and has been non-negative) in the <ACE_Message_Block list>.
+ ACE_UNIMPLEMENTED_FUNC (virtual int peek_dequeue_head
+ (ACE_Message_Block *&first_item,
+ ACE_Time_Value *tv = 0))
- ACE_Dynamic_Message_Strategy &message_strategy_;
- // Pointer to a dynamic priority evaluation function.
+ ACE_UNIMPLEMENTED_FUNC (virtual int enqueue_tail
+ (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0))
-private:
- // = Disallow these operations.
- ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &))
- ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &))
-
- // = These methods can wierdness in dynamically prioritized queue.
-
- // Disallow their use until and unless a coherent semantics for head
- // and tail enqueueing can be identified.
- ACE_UNIMPLEMENTED_FUNC (virtual int
- enqueue_tail (ACE_Message_Block *new_item,
- ACE_Time_Value *timeout = 0))
- ACE_UNIMPLEMENTED_FUNC (virtual int
- enqueue_head (ACE_Message_Block *new_item,
- ACE_Time_Value *timeout = 0))
-
- ACE_UNIMPLEMENTED_FUNC (virtual int
- peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv = 0))
- // Since messages are *dynamically* prioritized, it is not possible
- // to guarantee that the message at the head of the queue when this
- // method is called will still be at the head when the next method
- // is called: disallow its use until and unless a coherent semantics
- // for peeking at the head of the queue can be identified.
-};
-
-template <ACE_SYNCH_DECL>
-class ACE_Export ACE_Message_Queue_Factory
-{
- // = TITLE
- // ACE_Message_Queue_Factory is a static factory class template which
- // provides a separate factory method for each of the major kinds of
- // priority based message dispatching: static, earliest deadline first
- // (EDF), and minimum laxity first (MLF).
- //
- // = DESCRIPTION
- // The ACE_Dynamic_Message_Queue class assumes responsibility for
- // releasing the resources of the strategy with which it was
- // constructed: the user of a message queue constructed by
- // any of these factory methods is only responsible for
- // ensuring destruction of the message queue itself.
-
-public:
- static ACE_Message_Queue<ACE_SYNCH_USE> *
- create_static_message_queue (size_t hwm = DEFAULT_HWM,
- size_t lwm = DEFAULT_LWM,
- ACE_Notification_Strategy * = 0);
- // factory method for a statically prioritized ACE_Message_Queue
-
- static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
- create_deadline_message_queue (size_t hwm = DEFAULT_HWM,
- size_t lwm = DEFAULT_LWM,
- ACE_Notification_Strategy * = 0,
- u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
- u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
- u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
- u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
- // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
-
- static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
- create_laxity_message_queue (size_t hwm = DEFAULT_HWM,
- size_t lwm = DEFAULT_LWM,
- ACE_Notification_Strategy * = 0,
- u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
- u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
- u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
- u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
- // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
+ ACE_UNIMPLEMENTED_FUNC (virtual int enqueue_head
+ (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0))
};
+#endif /* VXWORKS */
// This must go here to avoid problems with circular includes.
#include "ace/Strategies.h"
@@ -508,13 +208,4 @@ public:
#include "ace/Message_Queue.i"
#endif /* __ACE_INLINE__ */
-#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
-#include "ace/Message_Queue.cpp"
-#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
-
-#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
-#pragma implementation ("Message_Queue.cpp")
-#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
-
#endif /* ACE_MESSAGE_QUEUE_H */
-
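As the header comments note, the two size arguments change meaning between the generic template and the VxWorks specialization. An illustrative comparison (the values are arbitrary):

// Generic template: both arguments are byte counts (water marks).
ACE_Message_Queue<ACE_MT_SYNCH> generic (16 * 1024,  // high water mark, bytes
                                         4 * 1024);  // low water mark, bytes

// VxWorks wrapper: a message count and a per-message byte limit,
// passed straight to ::msgQCreate ().
ACE_Message_Queue_Vx native (16,   // maximum number of queued messages
                             64);  // maximum length of one message, bytes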
diff --git a/ace/Message_Queue.i b/ace/Message_Queue.i
index 6697eb242a4..6ed62567167 100644
--- a/ace/Message_Queue.i
+++ b/ace/Message_Queue.i
@@ -1,148 +1,83 @@
/* -*- C++ -*- */
// $Id$
-// Message_Queue.i
+#if defined (VXWORKS)
+// Specialization to use native VxWorks Message Queues.
-template <ACE_SYNCH_DECL> ACE_INLINE ACE_Notification_Strategy *
-ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
+ACE_INLINE MSG_Q_ID
+ACE_Message_Queue_Vx::msgq ()
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
-
- return this->notification_strategy_;
+ // Hijack the tail_ field to store the MSG_Q_ID.
+ return ACE_reinterpret_cast (MSG_Q_ID, tail_);
}
-template <ACE_SYNCH_DECL> ACE_INLINE void
-ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
+ACE_INLINE int
+ACE_Message_Queue_Vx::is_empty_i (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
-
- this->notification_strategy_ = s;
-}
-
-// Check if queue is empty (does not hold locks).
-
-template <ACE_SYNCH_DECL> ACE_INLINE int
-ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::is_empty_i");
return this->cur_bytes_ <= 0 && this->cur_count_ <= 0;
}
-// Check if queue is full (does not hold locks).
-
-template <ACE_SYNCH_DECL> ACE_INLINE int
-ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void)
+ACE_INLINE int
+ACE_Message_Queue_Vx::is_full_i (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::is_full_i");
return this->cur_bytes_ > this->high_water_mark_;
}
-// Check if queue is empty (holds locks).
-
-template <ACE_SYNCH_DECL> ACE_INLINE int
-ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- return this->is_empty_i ();
-}
-
-// Check if queue is full (holds locks).
-
-template <ACE_SYNCH_DECL> ACE_INLINE int
-ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- return this->is_full_i ();
-}
-
-template <ACE_SYNCH_DECL> ACE_INLINE size_t
-ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void)
+ACE_INLINE size_t
+ACE_Message_Queue_Vx::high_water_mark (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
+ ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark");
+ // Don't need to guard, because this is fixed.
return this->high_water_mark_;
}
-template <ACE_SYNCH_DECL> ACE_INLINE void
-ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm)
+ACE_INLINE void
+ACE_Message_Queue_Vx::high_water_mark (size_t hwm)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
- ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
-
- this->high_water_mark_ = hwm;
+ ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark");
+ // Don't need to guard, because this is fixed.
+ errno = ENOTSUP;
}
-template <ACE_SYNCH_DECL> ACE_INLINE size_t
-ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void)
+ACE_INLINE size_t
+ACE_Message_Queue_Vx::low_water_mark (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
+ ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark");
+ // Don't need to guard, because this is fixed.
return this->low_water_mark_;
}
-template <ACE_SYNCH_DECL> ACE_INLINE void
-ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm)
+ACE_INLINE void
+ACE_Message_Queue_Vx::low_water_mark (size_t lwm)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
- ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
+ ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark");
+ // Don't need to guard, because this is fixed.
- this->low_water_mark_ = lwm;
+ errno = ENOTSUP;
}
// Return the current number of bytes in the queue.
-template <ACE_SYNCH_DECL> ACE_INLINE size_t
-ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
-
- return this->cur_bytes_;
-}
-
-// Return the current number of messages in the queue.
-
-template <ACE_SYNCH_DECL> ACE_INLINE size_t
-ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
+ACE_INLINE size_t
+ACE_Message_Queue_Vx::message_bytes (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
-
- return this->cur_count_;
+ ACE_TRACE ("ACE_Message_Queue_Vx::message_bytes");
+ ACE_NOTSUP_RETURN ((size_t) -1);
}
-template <ACE_SYNCH_DECL> ACE_INLINE int
-ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- return this->activate_i ();
-}
-
-template <ACE_SYNCH_DECL> ACE_INLINE int
-ACE_Message_Queue<ACE_SYNCH_USE>::deactivate (void)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- return this->deactivate_i ();
-}
+// Return the current number of messages in the queue.
-template <ACE_SYNCH_DECL> ACE_INLINE int
-ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
+ACE_INLINE size_t
+ACE_Message_Queue_Vx::message_count (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated");
+ ACE_TRACE ("ACE_Message_Queue_Vx::message_count");
+ // Don't need to guard, because this is a system call.
- return this->deactivated_;
+ return ::msgQNumMsgs (msgq ());
}
-ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
-
-
+#endif /* VXWORKS */
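The inline methods above also decide which of the inherited statistics and water-mark calls stay meaningful. A quick sketch, assuming a queue built as in the earlier examples (the function name is illustrative):

void
query_queue_stats (ACE_Message_Queue_Vx &queue)
{
  size_t msgs = queue.message_count ();    // delegates to ::msgQNumMsgs ()
  size_t bytes = queue.message_bytes ();   // not supported: returns (size_t) -1, errno = ENOTSUP

  queue.high_water_mark (8192);            // water marks are fixed at creation: only sets errno = ENOTSUP

  ACE_UNUSED_ARG (msgs);
  ACE_UNUSED_ARG (bytes);
}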
diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp
new file mode 100644
index 00000000000..bb5370db1c2
--- /dev/null
+++ b/ace/Message_Queue_T.cpp
@@ -0,0 +1,984 @@
+// $Id$
+
+#if !defined (ACE_MESSAGE_QUEUE_T_C)
+#define ACE_MESSAGE_QUEUE_T_C
+
+#define ACE_BUILD_DLL
+// #include Message_Queue.h instead of Message_Queue_T.h to avoid
+// circular include problems.
+#include "ace/Message_Queue.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/Message_Queue_T.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
+
+ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
+
+
+//////////////////////////////////////
+// class ACE_Message_Queue_Iterator //
+//////////////////////////////////////
+
+template <ACE_SYNCH_DECL>
+ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
+ : queue_ (q),
+ curr_ (q.head_)
+{
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
+{
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
+
+ if (this->curr_ != 0)
+ {
+ entry = this->curr_;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
+{
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
+
+ return this->curr_ == 0;
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
+{
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
+
+ if (this->curr_)
+ this->curr_ = this->curr_->next ();
+ return this->curr_ != 0;
+}
+
+template <ACE_SYNCH_DECL> void
+ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
+{
+}
+
+ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
+
+
+//////////////////////////////////////////////
+// class ACE_Message_Queue_Reverse_Iterator //
+//////////////////////////////////////////////
+
+template <ACE_SYNCH_DECL>
+ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
+ : queue_ (q),
+ curr_ (queue_.tail_)
+{
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
+{
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
+
+ if (this->curr_ != 0)
+ {
+ entry = this->curr_;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
+{
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
+
+ return this->curr_ == 0;
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
+{
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
+
+ if (this->curr_)
+ this->curr_ = this->curr_->prev ();
+ return this->curr_ != 0;
+}
+
+template <ACE_SYNCH_DECL> void
+ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
+{
+}
+
+
+/////////////////////////////
+// class ACE_Message_Queue //
+/////////////////////////////
+
+template <ACE_SYNCH_DECL> void
+ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("deactivated = %d\n")
+ ASYS_TEXT ("low_water_mark = %d\n")
+ ASYS_TEXT ("high_water_mark = %d\n")
+ ASYS_TEXT ("cur_bytes = %d\n")
+ ASYS_TEXT ("cur_count = %d\n")
+ ASYS_TEXT ("head_ = %u\n")
+ ASYS_TEXT ("tail_ = %u\n"),
+ this->deactivated_,
+ this->low_water_mark_,
+ this->high_water_mark_,
+ this->cur_bytes_,
+ this->cur_count_,
+ this->head_,
+ this->tail_));
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_full_cond: \n")));
+ not_full_cond_.dump ();
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_empty_cond: \n")));
+ not_empty_cond_.dump ();
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
+}
+
+template <ACE_SYNCH_DECL>
+ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
+#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ : not_empty_cond_ (0),
+ not_full_cond_ (0),
+ enqueue_waiters_ (0),
+ dequeue_waiters_ (0)
+#else
+ : not_empty_cond_ (this->lock_),
+ not_full_cond_ (this->lock_)
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
+
+ if (this->open (hwm, lwm, ns) == -1)
+ ACE_ERROR ((LM_ERROR, ASYS_TEXT ("open")));
+}
+
+template <ACE_SYNCH_DECL>
+ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
+ if (this->head_ != 0 && this->close () == -1)
+ ACE_ERROR ((LM_ERROR, ASYS_TEXT ("close")));
+}
+
+// Don't bother locking since if someone calls this function more than
+// once for the same queue, we're in bigger trouble than just
+// concurrency control!
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
+ this->high_water_mark_ = hwm;
+ this->low_water_mark_ = lwm;
+ this->deactivated_ = 0;
+ this->cur_bytes_ = 0;
+ this->cur_count_ = 0;
+ this->tail_ = 0;
+ this->head_ = 0;
+ this->notification_strategy_ = ns;
+ return 0;
+}
+
+// Implementation of the public deactivate() method
+// (assumes locks are held).
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
+ int current_status =
+ this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
+
+ // Wakeup all waiters.
+#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ this->not_empty_cond_.broadcast ();
+ this->not_full_cond_.broadcast ();
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+
+ this->deactivated_ = 1;
+ return current_status;
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
+ int current_status =
+ this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
+ this->deactivated_ = 0;
+ return current_status;
+}
+
+// Clean up the queue if we have not already done so!
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ int res = this->deactivate_i ();
+
+  // Free up the remaining messages on the list
+
+ for (this->tail_ = 0; this->head_ != 0; )
+ {
+ this->cur_count_--;
+
+ ACE_Message_Block *temp;
+
+ // Decrement all the counts.
+ for (temp = this->head_;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ -= temp->size ();
+
+ temp = this->head_;
+ this->head_ = this->head_->next ();
+
+ // Make sure to use <release> rather than <delete> since this is
+ // reference counted.
+ temp->release ();
+ }
+
+ return res;
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
+{
+#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ if (this->not_full_cond_.signal () != 0)
+ return -1;
+#else
+ if (this->enqueue_waiters_ > 0)
+ {
+ --this->enqueue_waiters_;
+ return this->not_full_cond_.release ();
+ }
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ return 0;
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
+{
+#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ // Tell any blocked threads that the queue has a new item!
+ if (this->not_empty_cond_.signal () != 0)
+ return -1;
+#else
+ if (this->dequeue_waiters_ > 0)
+ {
+ --this->dequeue_waiters_;
+ return this->not_empty_cond_.release ();
+ }
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ return 0;
+}
+
+// Actually put the node at the end (no locking so must be called with
+// locks held).
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
+
+ if (new_item == 0)
+ return -1;
+
+ // List was empty, so build a new one.
+ if (this->tail_ == 0)
+ {
+ this->head_ = new_item;
+ this->tail_ = new_item;
+ new_item->next (0);
+ new_item->prev (0);
+ }
+ // Link at the end.
+ else
+ {
+ new_item->next (0);
+ this->tail_->next (new_item);
+ new_item->prev (this->tail_);
+ this->tail_ = new_item;
+ }
+
+ // Make sure to count *all* the bytes in a composite message!!!
+
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
+
+ this->cur_count_++;
+
+ if (this->signal_dequeue_waiters () == -1)
+ return -1;
+ else
+ return this->cur_count_;
+}
+
+// Actually put the node at the head (no locking)
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
+
+ if (new_item == 0)
+ return -1;
+
+ new_item->prev (0);
+ new_item->next (this->head_);
+
+ if (this->head_ != 0)
+ this->head_->prev (new_item);
+ else
+ this->tail_ = new_item;
+
+ this->head_ = new_item;
+
+ // Make sure to count *all* the bytes in a composite message!!!
+
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
+
+ this->cur_count_++;
+
+ if (this->signal_dequeue_waiters () == -1)
+ return -1;
+ else
+ return this->cur_count_;
+}
+
+// Actually put the node at its proper position relative to its
+// priority.
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+
+ if (new_item == 0)
+ return -1;
+
+ if (this->head_ == 0)
+ // Check for simple case of an empty queue, where all we need to
+ // do is insert <new_item> into the head.
+ return this->enqueue_head_i (new_item);
+ else
+ {
+ ACE_Message_Block *temp;
+
+ // Figure out where the new item goes relative to its priority.
+ // We start looking from the highest priority to the lowest
+ // priority.
+
+ for (temp = this->head_;
+ temp != 0;
+ temp = temp->next ())
+ if (temp->msg_priority () < new_item->msg_priority ())
+ // Break out when we've located an item that has lower
+          // priority than <new_item>.
+ break;
+
+ if (temp == 0)
+ // Check for simple case of inserting at the tail of the queue,
+ // where all we need to do is insert <new_item> after the
+ // current tail.
+ return this->enqueue_tail_i (new_item);
+ else if (temp->prev () == 0)
+ // Check for simple case of inserting at the head of the
+ // queue, where all we need to do is insert <new_item> before
+ // the current head.
+ return this->enqueue_head_i (new_item);
+ else
+ {
+ // Insert the new message ahead of the item of
+ // lesser priority. This ensures that FIFO order is
+ // maintained when messages of the same priority are
+ // inserted consecutively.
+ new_item->next (temp);
+ new_item->prev (temp->prev ());
+ temp->prev ()->next (new_item);
+ temp->prev (new_item);
+ }
+ }
+
+ // Make sure to count *all* the bytes in a composite message!!!
+
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
+
+ this->cur_count_++;
+
+ if (this->signal_dequeue_waiters () == -1)
+ return -1;
+ else
+ return this->cur_count_;
+}
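+
+// For example, enqueueing blocks whose <msg_priority> values arrive in
+// the order 5, 7, 5 leaves the queue ordered 7, 5, 5 from head to
+// tail: higher priorities dequeue first, and FIFO order is preserved
+// among messages of equal priority.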
+
+// Actually get the first ACE_Message_Block (no locking, so must be
+// called with locks held). This method assumes that the queue has at
+// least one item in it when it is called.
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
+ first_item = this->head_;
+ this->head_ = this->head_->next ();
+
+ if (this->head_ == 0)
+ this->tail_ = 0;
+ else
+ // The prev pointer of the first message block has to point to
+ // NULL...
+ this->head_->prev (0);
+
+ // Make sure to subtract off all of the bytes associated with this
+ // message.
+ for (ACE_Message_Block *temp = first_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ -= temp->size ();
+
+ this->cur_count_--;
+
+ if (this->signal_enqueue_waiters () == -1)
+ return -1;
+ else
+ return this->cur_count_;
+}
+
+// Take a look at the first item without removing it.
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ // Wait for at least one item to become available.
+
+ if (this->wait_not_empty_cond (ace_mon, tv) == -1)
+ return -1;
+
+ first_item = this->head_;
+ return this->cur_count_;
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
+ ACE_Time_Value *tv)
+{
+ int result = 0;
+#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ while (this->is_full_i () && result != -1)
+ {
+ ++this->enqueue_waiters_;
+ // @@ Need to add sanity checks for failure...
+ mon.release ();
+ if (tv == 0)
+ result = this->not_full_cond_.acquire ();
+ else
+ result = this->not_full_cond_.acquire (*tv);
+ int error = errno;
+ mon.acquire ();
+ errno = error;
+ }
+#else
+ ACE_UNUSED_ARG (mon);
+
+ // Wait while the queue is full.
+
+ while (this->is_full_i ())
+ {
+ if (this->not_full_cond_.wait (tv) == -1)
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ result = -1;
+ break;
+ }
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ result = -1;
+ break;
+ }
+ }
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ return result;
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
+ ACE_Time_Value *tv)
+{
+ int result = 0;
+#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ while (this->is_empty_i () && result != -1)
+ {
+ ++this->dequeue_waiters_;
+ // @@ Need to add sanity checks for failure...
+ mon.release ();
+ if (tv == 0)
+ result = this->not_empty_cond_.acquire ();
+ else
+ {
+ result = this->not_empty_cond_.acquire (*tv);
+ if (result == -1 && errno == ETIME)
+ errno = EWOULDBLOCK;
+ }
+ int error = errno;
+ mon.acquire ();
+ errno = error;
+ }
+#else
+ ACE_UNUSED_ARG (mon);
+
+ // Wait while the queue is empty.
+
+ while (this->is_empty_i ())
+ {
+ if (this->not_empty_cond_.wait (tv) == -1)
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ result = -1;
+ break;
+ }
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ result = -1;
+ break;
+ }
+ }
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ return result;
+}
+
+// Block indefinitely waiting for space to become available in the
+// queue; does not ignore alerts (e.g., signals).
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ return -1;
+
+ int queue_count = this->enqueue_head_i (new_item);
+
+ if (queue_count == -1)
+ return -1;
+ else
+ {
+ this->notify ();
+ return queue_count;
+ }
+}
+
+// Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
+// accordance with its <msg_priority> (0 is lowest priority). Returns
+// -1 on failure, else the number of items still on the queue.
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ return -1;
+
+ int queue_count = this->enqueue_i (new_item);
+
+ if (queue_count == -1)
+ return -1;
+ else
+ {
+ this->notify ();
+ return queue_count;
+ }
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
+ return this->enqueue_prio (new_item, tv);
+}
+
+// Block indefinitely waiting for space to become available in the
+// queue; does not ignore alerts (e.g., signals).
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ return -1;
+
+ int queue_count = this->enqueue_tail_i (new_item);
+
+ if (queue_count == -1)
+ return -1;
+ else
+ {
+ this->notify ();
+ return queue_count;
+ }
+}
+
+// Remove an item from the front of the queue.  If TV == 0 block
+// indefinitely (or until an alert occurs).  Otherwise, block until the
+// absolute time specified in *TV elapses.
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ if (this->wait_not_empty_cond (ace_mon, tv) == -1)
+ return -1;
+
+ return this->dequeue_head_i (first_item);
+}
+
+template <ACE_SYNCH_DECL> int
+ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
+
+ // By default, don't do anything.
+ if (this->notification_strategy_ == 0)
+ return 0;
+ else
+ return this->notification_strategy_->notify ();
+}
+
+
+/////////////////////////////////////
+// class ACE_Dynamic_Message_Queue //
+/////////////////////////////////////
+
+ // = Initialization and termination methods.
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (
+ ACE_Dynamic_Message_Strategy & message_strategy,
+ size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
+  : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns)
+ , message_strategy_ (message_strategy)
+{
+ // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the
+ // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor
+}
+
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
+{
+ delete &message_strategy_;
+}
+// dtor: free message strategy and let base class dtor do the rest
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+{
+  ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+
+  int result = 0;
+
+  // get the current time
+  ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+  // refresh dynamic priorities of the messages already in the queue
+  result = this->refresh_priorities (current_time);
+  if (result < 0)
+    return result;
+
+  // reorganize the queue according to the new priorities
+  result = this->refresh_queue (current_time);
+  if (result < 0)
+    return result;
+
+  // invoke the base class method to insert the new item in
+  // accordance with its priority
+  return ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (new_item);
+}
+ // Enqueue an <ACE_Message_Block *> in accordance with its priority.
+ // The priority may be *dynamic* or *static* or a combination of *both*.
+ // It uses the priority evaluation strategy passed into the Dynamic
+ // Message Queue constructor to update the priorities of all
+ // enqueued messages.
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
+{
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
+
+ int result = 0;
+
+ // get the current time
+ ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+ // refresh dynamic priorities of messages in the queue
+ result = this->refresh_priorities (current_time);
+ if (result < 0)
+ {
+ return result;
+ }
+
+ // reorganize the queue according to the new priorities
+ result = this->refresh_queue (current_time);
+ if (result < 0)
+ {
+ return result;
+ }
+
+ // if there is only one message in the pending list,
+ // the pending list will be empty after a *successful*
+ // dequeue operation
+ int empty_pending = (head_ == pending_list_tail_) ? 1 : 0;
+
+ // invoke the base class method
+ result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item);
+
+ // null out the pending list tail pointer if
+ // the pending list is now empty
+  if (empty_pending && (result >= 0))
+ {
+ pending_list_tail_ = 0;
+ }
+
+ return result;
+}
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue.
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv)
+{
+ int result = 0;
+
+ // apply the priority update function to all enqueued
+ // messages, starting at the head of the queue
+ ACE_Message_Block *temp = head_;
+ while (temp)
+ {
+      // Assumption: the strategy exposes an update hook for
+      // recomputing a message's dynamic priority.
+      result = this->message_strategy_.update_priority (*temp, tv);
+ if (result < 0)
+ {
+ break;
+ }
+
+ temp = temp->next ();
+ }
+
+ return result;
+}
+ // refresh the priorities in the queue according
+ // to a specific priority assignment function
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv)
+{
+  // drop any overflowed messages from the queue and release them:
+  // reference counting at the data block level means that the
+  // underlying data block will not be deleted if another
+  // message block is still pointing to it.
+  ACE_Message_Block *temp = (pending_list_tail_)
+    ? pending_list_tail_->next ()
+    : head_;
+
+  while (temp)
+    {
+      // messages that have overflowed the given time bounds must be removed
+      if (message_strategy_.is_beyond_late (*temp, tv))
+        {
+          // find the end of the chain of overflowed messages
+          ACE_Message_Block *remove_tail = temp;
+          while (remove_tail->next () &&
+                 message_strategy_.is_beyond_late (*(remove_tail->next ()), tv))
+            remove_tail = remove_tail->next ();
+
+          // unlink the chain <temp> .. <remove_tail> from the queue
+          ACE_Message_Block *next_msg = remove_tail->next ();
+
+          if (temp->prev ())
+            temp->prev ()->next (next_msg);
+          else
+            head_ = next_msg;
+
+          if (next_msg)
+            next_msg->prev (temp->prev ());
+          else
+            tail_ = temp->prev ();
+
+          // release the removed messages and adjust the queue statistics
+          while (temp)
+            {
+              ACE_Message_Block *overflowed = temp;
+              temp = (temp == remove_tail) ? 0 : temp->next ();
+
+              for (ACE_Message_Block *t = overflowed;
+                   t != 0;
+                   t = t->cont ())
+                this->cur_bytes_ -= t->size ();
+
+              this->cur_count_--;
+
+              overflowed->prev (0);
+              overflowed->next (0);
+              overflowed->release ();
+            }
+
+          // continue scanning after the removed chain
+          temp = next_msg;
+        }
+      else
+        {
+          temp = temp->next ();
+        }
+    }
+
+  return 0;
+}
+ // refresh the order of messages in the queue
+ // after refreshing their priorities
+
+
+/////////////////////////////////////
+// class ACE_Message_Queue_Factory //
+/////////////////////////////////////
+
+template <ACE_SYNCH_DECL>
+ACE_Message_Queue<ACE_SYNCH_USE> *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
+{
+ return new ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns);
+}
+ // factory method for a statically prioritized ACE_Message_Queue
+
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns,
+ u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+{
+ ACE_Deadline_Message_Strategy *adms;
+
+ ACE_NEW_RETURN (adms,
+ ACE_Deadline_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset),
+ 0);
+
+ return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns);
+}
+ // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
+
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns,
+ u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+{
+ ACE_Laxity_Message_Strategy *alms;
+
+ ACE_NEW_RETURN (alms,
+ ACE_Laxity_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset),
+ 0);
+
+
+ return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns);
+}
+ // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
+
+
+#endif /* ACE_MESSAGE_QUEUE_T_C */
diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h
new file mode 100644
index 00000000000..cc181c04a99
--- /dev/null
+++ b/ace/Message_Queue_T.h
@@ -0,0 +1,486 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// ace
+//
+// = FILENAME
+// Message_Queue_T.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (ACE_MESSAGE_QUEUE_T_H)
+#define ACE_MESSAGE_QUEUE_T_H
+
+#include "ace/Synch.h"
+
+template <ACE_SYNCH_DECL>
+class ACE_Message_Queue : public ACE_Message_Queue_Base
+{
+ // = TITLE
+ // A threaded message queueing facility, modeled after the
+ // queueing facilities in System V STREAMs.
+ //
+ // = DESCRIPTION
+ // An <ACE_Message_Queue> is the central queueing facility for
+ // messages in the ASX framework. If <ACE_SYNCH_DECL> is
+ // ACE_MT_SYNCH then all operations are thread-safe. Otherwise,
+ // if it's <ACE_NULL_SYNCH> then there's no locking overhead.
+public:
+ friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>;
+ friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>;
+
+ // = Traits
+ typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> ITERATOR;
+ typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> REVERSE_ITERATOR;
+
+ // = Initialization and termination methods.
+ ACE_Message_Queue (size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0);
+  // Create a message queue with all the defaults.
+
+ virtual int open (size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0);
+ // Create a message queue with all the defaults.
+
+ virtual int close (void);
+ // Close down the message queue and release all resources.
+
+ virtual ~ACE_Message_Queue (void);
+ // Close down the message queue and release all resources.
+
+ // = Enqueue and dequeue methods.
+
+  // For all the following routines, if <timeout> == 0 the caller will
+  // block until action is possible; otherwise it will wait until the
+  // absolute time specified in *<timeout> elapses.  These calls will
+  // return, however, when the queue is closed or deactivated, when a
+  // signal occurs, or if the time specified in <timeout> elapses (in
+  // which case errno = EWOULDBLOCK).
+
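+  // As a usage sketch (hypothetical producer/consumer code, assuming a
+  // queue instantiated as ACE_Message_Queue<ACE_MT_SYNCH>):
+  //
+  //   ACE_Message_Queue<ACE_MT_SYNCH> queue;
+  //   ACE_Message_Block *mb = new ACE_Message_Block (128);
+  //
+  //   // blocks indefinitely until the message can be enqueued
+  //   if (queue.enqueue_tail (mb) == -1)
+  //     ACE_ERROR ((LM_ERROR, "enqueue_tail"));
+  //
+  //   // waits at most until the absolute time <deadline> for a message
+  //   ACE_Time_Value deadline = ACE_OS::gettimeofday () + ACE_Time_Value (5);
+  //   ACE_Message_Block *head = 0;
+  //   if (queue.dequeue_head (head, &deadline) == -1 && errno == EWOULDBLOCK)
+  //     ; // timed out without a message becoming available
+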
+ virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *tv = 0);
+ // Retrieve the first <ACE_Message_Block> without removing it.
+ // Returns -1 on failure, else the number of items still on the
+ // queue.
+
+ virtual int enqueue_prio (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0);
+ // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
+ // accordance with its <msg_priority> (0 is lowest priority). FIFO
+ // order is maintained when messages of the same priority are
+ // inserted consecutively. Returns -1 on failure, else the number
+ // of items still on the queue.
+
+ virtual int enqueue (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0);
+ // This is an alias for <enqueue_prio>. It's only here for
+ // backwards compatibility and will go away in a subsequent release.
+ // Please use <enqueue_prio> instead.
+
+ virtual int enqueue_tail (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0);
+ // Enqueue an <ACE_Message_Block *> at the end of the queue.
+ // Returns -1 on failure, else the number of items still on the
+ // queue.
+
+ virtual int enqueue_head (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0);
+ // Enqueue an <ACE_Message_Block *> at the head of the queue.
+ // Returns -1 on failure, else the number of items still on the
+ // queue.
+
+ virtual int dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout = 0);
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue. Returns -1 on failure, else the number of items still on
+ // the queue.
+
+ // = Check if queue is full/empty.
+ virtual int is_full (void);
+ // True if queue is full, else false.
+ virtual int is_empty (void);
+ // True if queue is empty, else false.
+
+ // = Queue statistic methods.
+ virtual size_t message_bytes (void);
+  // Total number of bytes on the queue.
+ virtual size_t message_count (void);
+  // Total number of messages on the queue.
+
+ // = Flow control routines
+ virtual size_t high_water_mark (void);
+ // Get high watermark.
+ virtual void high_water_mark (size_t hwm);
+ // Set high watermark.
+ virtual size_t low_water_mark (void);
+ // Get low watermark.
+ virtual void low_water_mark (size_t lwm);
+ // Set low watermark.
+
+ // = Activation control methods.
+
+ virtual int deactivate (void);
+ // Deactivate the queue and wakeup all threads waiting on the queue
+ // so they can continue. No messages are removed from the queue,
+ // however. Any other operations called until the queue is
+ // activated again will immediately return -1 with <errno> ==
+ // ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
+ // call and WAS_ACTIVE if queue was active before the call.
+
+ virtual int activate (void);
+ // Reactivate the queue so that threads can enqueue and dequeue
+ // messages again. Returns WAS_INACTIVE if queue was inactive
+ // before the call and WAS_ACTIVE if queue was active before the
+ // call.
+
+ virtual int deactivated (void);
+ // Returns true if <deactivated_> is enabled.
+
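+  // A common shutdown idiom (a sketch, assuming consumer threads are
+  // blocked in <dequeue_head>):
+  //
+  //   queue.deactivate ();  // wakes all waiters; their calls return -1
+  //                         // with errno == ESHUTDOWN
+  //   // ... join the consumer threads ...
+  //   queue.activate ();    // optionally re-enable the queue for reuse
+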
+ // = Notification hook.
+
+ virtual int notify (void);
+ // This hook is automatically invoked by <enqueue_head>,
+ // <enqueue_tail>, and <enqueue_prio> when a new item is inserted
+ // into the queue. Subclasses can override this method to perform
+ // specific notification strategies (e.g., signaling events for a
+ // <WFMO_Reactor>, notifying a <Reactor>, etc.). In a
+ // multi-threaded application with concurrent consumers, there is no
+  // guarantee that the queue will still be non-empty by the time
+ // the notification occurs.
+
+ // = Get/set the notification strategy for the <Message_Queue>
+ virtual ACE_Notification_Strategy *notification_strategy (void);
+ virtual void notification_strategy (ACE_Notification_Strategy *s);
+
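+  // For example (a sketch: ACE_Reactor_Notification_Strategy is the
+  // library-supplied strategy that calls <ACE_Reactor::notify> for a
+  // given handler; its constructor arguments are assumed here, and
+  // <my_handler> is some application-defined ACE_Event_Handler):
+  //
+  //   ACE_Reactor_Notification_Strategy ns (ACE_Reactor::instance (),
+  //                                         &my_handler,
+  //                                         ACE_Event_Handler::READ_MASK);
+  //   queue.notification_strategy (&ns);
+  //   // each successful enqueue now invokes ns.notify ()
+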
+ void dump (void) const;
+ // Dump the state of an object.
+
+ ACE_ALLOC_HOOK_DECLARE;
+ // Declare the dynamic allocation hooks.
+
+protected:
+ // = Routines that actually do the enqueueing and dequeueing.
+ // These routines assume that locks are held by the corresponding
+ // public methods. Since they are virtual, you can change the
+ // queueing mechanism by subclassing from <ACE_Message_Queue>.
+
+ virtual int enqueue_i (ACE_Message_Block *new_item);
+ // Enqueue an <ACE_Message_Block *> in accordance with its priority.
+
+ virtual int enqueue_tail_i (ACE_Message_Block *new_item);
+ // Enqueue an <ACE_Message_Block *> at the end of the queue.
+
+ virtual int enqueue_head_i (ACE_Message_Block *new_item);
+ // Enqueue an <ACE_Message_Block *> at the head of the queue.
+
+ virtual int dequeue_head_i (ACE_Message_Block *&first_item);
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue.
+
+ // = Check the boundary conditions (assumes locks are held).
+ virtual int is_full_i (void);
+ // True if queue is full, else false.
+ virtual int is_empty_i (void);
+ // True if queue is empty, else false.
+
+ // = Implementation of the public activate() and deactivate() methods above (assumes locks are held).
+ virtual int deactivate_i (void);
+ // Deactivate the queue.
+ virtual int activate_i (void);
+ // Activate the queue.
+
+ // = Helper methods to factor out common #ifdef code.
+ virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
+ ACE_Time_Value *tv);
+ // Wait for the queue to become non-full.
+
+ virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
+ ACE_Time_Value *tv);
+ // Wait for the queue to become non-empty.
+
+ virtual int signal_enqueue_waiters (void);
+  // Inform any threads waiting to enqueue that they can proceed.
+
+ virtual int signal_dequeue_waiters (void);
+  // Inform any threads waiting to dequeue that they can proceed.
+
+ ACE_Message_Block *head_;
+ // Pointer to head of ACE_Message_Block list.
+
+ ACE_Message_Block *tail_;
+ // Pointer to tail of ACE_Message_Block list.
+
+ size_t low_water_mark_;
+  // Lowest number of bytes before unblocking occurs.
+
+ size_t high_water_mark_;
+ // Greatest number of bytes before blocking.
+
+ size_t cur_bytes_;
+ // Current number of bytes in the queue.
+
+ size_t cur_count_;
+ // Current number of messages in the queue.
+
+ int deactivated_;
+ // Indicates that the queue is inactive.
+
+ ACE_Notification_Strategy *notification_strategy_;
+ // The notification strategy used when a new message is enqueued.
+
+ // = Synchronization primitives for controlling concurrent access.
+ ACE_SYNCH_MUTEX_T lock_;
+ // Protect queue from concurrent access.
+
+#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ ACE_SYNCH_SEMAPHORE_T not_empty_cond_;
+ // Used to make threads sleep until the queue is no longer empty.
+
+ ACE_SYNCH_SEMAPHORE_T not_full_cond_;
+ // Used to make threads sleep until the queue is no longer full.
+
+ size_t dequeue_waiters_;
+ // Number of threads waiting to dequeue a <Message_Block>.
+
+ size_t enqueue_waiters_;
+ // Number of threads waiting to enqueue a <Message_Block>.
+#else
+ ACE_SYNCH_CONDITION_T not_empty_cond_;
+ // Used to make threads sleep until the queue is no longer empty.
+
+ ACE_SYNCH_CONDITION_T not_full_cond_;
+ // Used to make threads sleep until the queue is no longer full.
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+
+private:
+
+ // = Disallow these operations.
+ ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &))
+ ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &))
+};
+
+template <ACE_SYNCH_DECL>
+class ACE_Message_Queue_Iterator
+{
+ // = TITLE
+ // Iterator for the <ACE_Message_Queue>.
+public:
+ // = Initialization method.
+ ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue);
+
+ // = Iteration methods.
+ int next (ACE_Message_Block *&entry);
+ // Pass back the <entry> that hasn't been seen in the queue.
+ // Returns 0 when all items have been seen, else 1.
+
+ int done (void) const;
+ // Returns 1 when all items have been seen, else 0.
+
+ int advance (void);
+ // Move forward by one element in the queue. Returns 0 when all the
+ // items in the set have been seen, else 1.
+
+ void dump (void) const;
+ // Dump the state of an object.
+
+ ACE_ALLOC_HOOK_DECLARE;
+ // Declare the dynamic allocation hooks.
+
+private:
+ ACE_Message_Queue <ACE_SYNCH_USE> &queue_;
+ // Message_Queue we are iterating over.
+
+ ACE_Message_Block *curr_;
+ // Keeps track of how far we've advanced...
+};
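+
+// A minimal traversal sketch (assuming <queue> is an
+// ACE_Message_Queue<ACE_MT_SYNCH>; each iterator call holds the
+// queue's read lock only for its own duration):
+//
+//   ACE_Message_Queue_Iterator<ACE_MT_SYNCH> iter (queue);
+//   for (ACE_Message_Block *mb = 0;
+//        iter.next (mb) != 0;
+//        iter.advance ())
+//     ACE_DEBUG ((LM_DEBUG, "message of %u bytes\n", mb->size ()));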
+
+template <ACE_SYNCH_DECL>
+class ACE_Message_Queue_Reverse_Iterator
+{
+ // = TITLE
+ // Reverse Iterator for the <ACE_Message_Queue>.
+public:
+ // = Initialization method.
+ ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue);
+
+ // = Iteration methods.
+ int next (ACE_Message_Block *&entry);
+ // Pass back the <entry> that hasn't been seen in the queue.
+ // Returns 0 when all items have been seen, else 1.
+
+ int done (void) const;
+ // Returns 1 when all items have been seen, else 0.
+
+ int advance (void);
+ // Move forward by one element in the queue. Returns 0 when all the
+ // items in the set have been seen, else 1.
+
+ void dump (void) const;
+ // Dump the state of an object.
+
+ ACE_ALLOC_HOOK_DECLARE;
+ // Declare the dynamic allocation hooks.
+
+private:
+ ACE_Message_Queue <ACE_SYNCH_USE> &queue_;
+ // Message_Queue we are iterating over.
+
+ ACE_Message_Block *curr_;
+ // Keeps track of how far we've advanced...
+};
+
+template <ACE_SYNCH_DECL>
+class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE>
+{
+ // = TITLE
+ // A derived class which adapts the <ACE_Message_Queue>
+ // class in order to maintain dynamic priorities for enqueued
+ // <ACE_Message_Blocks> and manage the queue dynamically.
+ //
+ // = DESCRIPTION
+ // Priorities and queue orderings are refreshed at each enqueue
+ // and dequeue operation. Head and tail enqueue methods were
+ // made private to prevent out-of-order messages from confusing
+ // the pending and late portions of the queue. Messages in the
+ // pending portion of the queue whose dynamic priority becomes
+ // negative are placed into the late portion of the queue.
+ // Messages in the late portion of the queue whose dynamic
+ // priority becomes positive are dropped. These behaviors
+ // support a limited schedule overrun corresponding to one full
+ // cycle through dynamic priority values. These behaviors can
+ // be modified in derived classes by providing alternative
+ // definitions for the appropriate virtual methods.
+ //
+public:
+ // = Initialization and termination methods.
+ ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
+ size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0);
+
+ virtual ~ACE_Dynamic_Message_Queue (void);
+ // Close down the message queue and release all resources.
+
+ ACE_ALLOC_HOOK_DECLARE;
+ // Declare the dynamic allocation hooks.
+
+protected:
+ virtual int enqueue_i (ACE_Message_Block *new_item);
+ // Enqueue an <ACE_Message_Block *> in accordance with its priority.
+  // The priority may be *dynamic* or *static* or a combination of *both*.
+  // It uses the priority evaluation strategy passed into the Dynamic
+  // Message Queue constructor to update the priorities of all
+ // enqueued messages.
+
+ virtual int dequeue_head_i (ACE_Message_Block *&first_item);
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue.
+
+ virtual int refresh_priorities (const ACE_Time_Value & tv);
+ // Refresh the priorities in the queue according to a specific
+ // priority assignment function.
+
+ virtual int refresh_queue (const ACE_Time_Value & tv);
+ // Refresh the order of messages in the queue after refreshing their
+ // priorities.
+
+ ACE_Message_Block *pending_list_tail_;
+  // Pointer to the tail of the pending messages (those whose priority
+  // is, and has been, non-negative) in the <ACE_Message_Block> list.
+
+ ACE_Dynamic_Message_Strategy &message_strategy_;
+  // Reference to the dynamic priority evaluation strategy.
+
+private:
+ // = Disallow these operations.
+ ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &))
+ ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &))
+
+  // = These methods can cause weirdness in a dynamically prioritized queue.
+
+ // Disallow their use until and unless a coherent semantics for head
+ // and tail enqueueing can be identified.
+ ACE_UNIMPLEMENTED_FUNC (virtual int
+ enqueue_tail (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0))
+ ACE_UNIMPLEMENTED_FUNC (virtual int
+ enqueue_head (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0))
+
+ ACE_UNIMPLEMENTED_FUNC (virtual int
+ peek_dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *tv = 0))
+ // Since messages are *dynamically* prioritized, it is not possible
+ // to guarantee that the message at the head of the queue when this
+ // method is called will still be at the head when the next method
+ // is called: disallow its use until and unless a coherent semantics
+ // for peeking at the head of the queue can be identified.
+};
+
+template <ACE_SYNCH_DECL>
+class ACE_Export ACE_Message_Queue_Factory
+{
+ // = TITLE
+ // ACE_Message_Queue_Factory is a static factory class template which
+ // provides a separate factory method for each of the major kinds of
+ // priority based message dispatching: static, earliest deadline first
+ // (EDF), and minimum laxity first (MLF).
+ //
+ // = DESCRIPTION
+ // The ACE_Dynamic_Message_Queue class assumes responsibility for
+ // releasing the resources of the strategy with which it was
+ // constructed: the user of a message queue constructed by
+ // any of these factory methods is only responsible for
+ // ensuring destruction of the message queue itself.
+
+public:
+ static ACE_Message_Queue<ACE_SYNCH_USE> *
+ create_static_message_queue (size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0);
+ // factory method for a statically prioritized ACE_Message_Queue
+
+ static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ create_deadline_message_queue (size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0,
+ u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
+ u_long static_bit_field_shift = 10, // 10 low order bits
+ u_long pending_threshold = 0x200000UL, // 2^(22-1)
+ u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
+ u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
+ // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
+
+ static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ create_laxity_message_queue (size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0,
+ u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
+ u_long static_bit_field_shift = 10, // 10 low order bits
+ u_long pending_threshold = 0x200000UL, // 2^(22-1)
+ u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
+ u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
+ // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
+};
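+
+// A usage sketch (assuming the default EDF parameters above; the
+// returned queue owns the deadline strategy created for it and must be
+// deleted by the caller):
+//
+//   ACE_Dynamic_Message_Queue<ACE_MT_SYNCH> *dq =
+//     ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_deadline_message_queue ();
+//   if (dq == 0)
+//     ACE_ERROR ((LM_ERROR, "create_deadline_message_queue"));
+//   // ... use enqueue_prio () and dequeue_head () as usual ...
+//   delete dq;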
+
+#if defined (__ACE_INLINE__)
+#include "ace/Message_Queue_T.i"
+#endif /* __ACE_INLINE__ */
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "ace/Message_Queue_T.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Message_Queue_T.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* ACE_MESSAGE_QUEUE_T_H */
diff --git a/ace/Message_Queue_T.i b/ace/Message_Queue_T.i
new file mode 100644
index 00000000000..e2f7878c0a2
--- /dev/null
+++ b/ace/Message_Queue_T.i
@@ -0,0 +1,146 @@
+/* -*- C++ -*- */
+// $Id$
+
+template <ACE_SYNCH_DECL> ACE_INLINE ACE_Notification_Strategy *
+ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
+
+ return this->notification_strategy_;
+}
+
+template <ACE_SYNCH_DECL> ACE_INLINE void
+ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
+
+ this->notification_strategy_ = s;
+}
+
+// Check if queue is empty (does not hold locks).
+
+template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i");
+ return this->cur_bytes_ <= 0 && this->cur_count_ <= 0;
+}
+
+// Check if queue is full (does not hold locks).
+
+template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i");
+ return this->cur_bytes_ > this->high_water_mark_;
+}
+
+// Check if queue is empty (holds locks).
+
+template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ return this->is_empty_i ();
+}
+
+// Check if queue is full (holds locks).
+
+template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ return this->is_full_i ();
+}
+
+template <ACE_SYNCH_DECL> ACE_INLINE size_t
+ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
+
+ return this->high_water_mark_;
+}
+
+template <ACE_SYNCH_DECL> ACE_INLINE void
+ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark");
+ ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
+
+ this->high_water_mark_ = hwm;
+}
+
+template <ACE_SYNCH_DECL> ACE_INLINE size_t
+ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
+
+ return this->low_water_mark_;
+}
+
+template <ACE_SYNCH_DECL> ACE_INLINE void
+ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark");
+ ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_);
+
+ this->low_water_mark_ = lwm;
+}
+
+// Return the current number of bytes in the queue.
+
+template <ACE_SYNCH_DECL> ACE_INLINE size_t
+ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
+
+ return this->cur_bytes_;
+}
+
+// Return the current number of messages in the queue.
+
+template <ACE_SYNCH_DECL> ACE_INLINE size_t
+ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0);
+
+ return this->cur_count_;
+}
+
+template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::activate (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ return this->activate_i ();
+}
+
+template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::deactivate (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ return this->deactivate_i ();
+}
+
+template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated");
+
+ return this->deactivated_;
+}
+
+ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
+
+