summaryrefslogtreecommitdiff
path: root/ace/Message_Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Message_Queue.cpp')
-rw-r--r--ace/Message_Queue.cpp508
1 files changed, 508 insertions, 0 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
new file mode 100644
index 00000000000..30b8a77abaa
--- /dev/null
+++ b/ace/Message_Queue.cpp
@@ -0,0 +1,508 @@
+// Message_Queue.cpp
+// $Id$
+
+#if !defined (ACE_MESSAGE_QUEUE_C)
+#define ACE_MESSAGE_QUEUE_C
+
+#define ACE_BUILD_DLL
+#include "ace/Message_Queue.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/Message_Queue.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
+
+template <ACE_SYNCH_1> void
+ACE_Message_Queue<ACE_SYNCH_2>::dump (void) const
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::dump");
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+ ACE_DEBUG ((LM_DEBUG,
+ "deactivated = %d\n"
+ "low_water_mark = %d\n"
+ "high_water_mark = %d\n"
+ "cur_bytes = %d\n"
+ "cur_count = %d\n",
+ this->deactivated_,
+ this->low_water_mark_,
+ this->high_water_mark_,
+ this->cur_bytes_,
+ this->cur_count_));
+ ACE_DEBUG ((LM_DEBUG,"notfull_cond: \n"));
+ notfull_cond_.dump();
+ ACE_DEBUG ((LM_DEBUG,"notempty_cond: \n"));
+ notempty_cond_.dump();
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
+}
+
+template <ACE_SYNCH_1>
+ACE_Message_Queue<ACE_SYNCH_2>::ACE_Message_Queue (size_t hwm,
+ size_t lwm)
+ : notfull_cond_ (this->lock_),
+ notempty_cond_ (this->lock_)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::ACE_Message_Queue");
+ if (this->open (hwm, lwm) == -1)
+ ACE_ERROR ((LM_ERROR, "open"));
+}
+
+template <ACE_SYNCH_1>
+ACE_Message_Queue<ACE_SYNCH_2>::~ACE_Message_Queue (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::~ACE_Message_Queue");
+ if (this->head_ != 0)
+ if (this->close () == -1)
+ ACE_ERROR ((LM_ERROR, "close"));
+}
+
+// Don't bother locking since if someone calls this function more than
+// once for the same queue, we're in bigger trouble than just
+// concurrency control!
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::open (size_t hwm, size_t lwm)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::open");
+ this->high_water_mark_ = hwm;
+ this->low_water_mark_ = lwm;
+ this->deactivated_ = 0;
+ this->cur_bytes_ = 0;
+ this->cur_count_ = 0;
+ this->tail_ = 0;
+ this->head_ = 0;
+ return 0;
+}
+
+// Implementation of the public deactivate() method
+// (assumes locks are held).
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::deactivate_i (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::deactivate_i");
+ int current_status =
+ this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
+
+ // Wakeup all waiters.
+ this->notempty_cond_.broadcast ();
+ this->notfull_cond_.broadcast ();
+
+ this->deactivated_ = 1;
+ return current_status;
+}
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::activate_i (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::activate_i");
+ int current_status =
+ this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
+ this->deactivated_ = 0;
+ return current_status;
+}
+
+// Clean up the queue if we have not already done so!
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::close (void)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::close");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ int res = this->deactivate_i ();
+
+ // Free up the remaining message on the list
+
+ for (this->tail_ = 0; this->head_ != 0; )
+ {
+ ACE_Message_Block *temp;
+
+ // Make sure we decrement all the counts.
+ for (temp = this->head_;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ -= temp->size ();
+
+ this->cur_count_--;
+
+ this->head_ = this->head_->next ();
+ delete temp;
+ }
+
+ return res;
+}
+
+// Actually put the node at the end (no locking so must be called with
+// locks held).
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail_i");
+
+ if (new_item == 0)
+ return -1;
+
+ // List was empty, so build a new one.
+ if (this->tail_ == 0)
+ {
+ this->head_ = new_item;
+ this->tail_ = new_item;
+ new_item->next (0);
+ new_item->prev (0);
+ }
+ // Link at the end.
+ else
+ {
+
+ new_item->next (0);
+ this->tail_->next (new_item);
+ new_item->prev (this->tail_);
+ this->tail_ = new_item;
+ }
+
+ // Make sure to count *all* the bytes in a composite message!!!
+
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
+
+ this->cur_count_++;
+
+ // Tell any blocked threads that the queue has a new item!
+ if (this->notempty_cond_.signal () != 0)
+ return -1;
+ else
+ return this->cur_count_;
+}
+
+// Actually put the node at the head (no locking)
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head_i");
+
+ if (new_item == 0)
+ return -1;
+
+ new_item->prev (0);
+ new_item->next (this->head_);
+
+ if (this->head_ != 0)
+ this->head_->prev (new_item);
+ else
+ this->tail_ = new_item;
+
+ this->head_ = new_item;
+
+ // Make sure to count *all* the bytes in a composite message!!!
+
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
+
+ this->cur_count_++;
+
+ // Tell any blocked threads that the queue has a new item!
+ if (this->notempty_cond_.signal () != 0)
+ return -1;
+ else
+ return this->cur_count_;
+}
+
+// Actually put the node at its proper position relative to its
+// priority.
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i");
+
+ if (new_item == 0)
+ return -1;
+
+ if (this->head_ == 0)
+ // Check for simple case of an empty queue, where all we need to
+ // do is insert <new_item> into the head.
+ return this->enqueue_head_i (new_item);
+ else
+ {
+ ACE_Message_Block *temp;
+
+ // Figure out where the new item goes relative to its priority.
+ // We start looking from the highest priority to the lowest
+ // priority.
+
+ for (temp = this->tail_;
+ temp != 0;
+ temp = temp->prev ())
+ {
+ if (temp->msg_priority () >= new_item->msg_priority ())
+ // Break out when we've located an item that has higher
+ // priority that <new_item>.
+ break;
+ }
+
+ if (temp == 0)
+ // Check for simple case of inserting at the head of the queue,
+ // where all we need to do is insert <new_item> before the
+ // current head.
+ return this->enqueue_head_i (new_item);
+ else if (temp->next () == 0)
+ // Check for simple case of inserting at the end of the
+ // queue, where all we need to do is insert <new_item> after
+ // the current tail.
+ return this->enqueue_tail_i (new_item);
+ else
+ {
+ // Insert the message right before the item of equal or
+ // higher priority. This ensures that FIFO order is
+ // maintained when messages of the same priority are
+ // inserted consecutively.
+ new_item->prev (temp);
+ new_item->next (temp->next ());
+ temp->next ()->prev (new_item);
+ temp->next (new_item);
+ }
+ }
+
+ // Make sure to count *all* the bytes in a composite message!!!
+
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
+
+ this->cur_count_++;
+
+ // Tell any blocked threads that the queue has a new item!
+ if (this->notempty_cond_.signal () != 0)
+ return -1;
+ else
+ return this->cur_count_;
+}
+
+// Actually get the first ACE_Message_Block (no locking, so must be called
+// with locks held). This method assumes that the queue has at least
+// one item in it when it is called.
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head_i (ACE_Message_Block *&first_item)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head_i");
+ first_item = this->head_;
+ this->head_ = this->head_->next ();
+
+ if (this->head_ == 0)
+ this->tail_ = 0;
+ else
+ // The prev pointer of the first message block has to point to
+ // NULL...
+ this->head_->prev (0);
+
+ // Make sure to subtract off all of the bytes associated with this
+ // message.
+ for (ACE_Message_Block *temp = first_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ -= temp->size ();
+
+ this->cur_count_--;
+
+#if 0
+ if (this->cur_bytes_ <= this->low_water_mark_)
+ // If queue is no longer full signal any waiting threads.
+#endif /* 0 */
+
+ if (this->notfull_cond_.signal () != 0)
+ return -1;
+ else
+ return this->cur_count_;
+}
+
+// Take a look at the first item without removing it.
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ // Wait for at least one item to become available
+
+ while (this->is_empty_i ())
+ {
+ if (this->notempty_cond_.wait (tv) == -1)
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+ }
+
+ first_item = this->head_;
+ return this->cur_count_;
+}
+
+// Block indefinitely waiting for an item to arrive,
+// does not ignore alerts (e.g., signals).
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ // Wait while the queue is full
+
+ while (this->is_full_i ())
+ {
+ if (this->notfull_cond_.wait (tv) == -1)
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+ }
+ return this->enqueue_head_i (new_item);
+}
+
+// Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
+// accordance with its <msg_priority> (0 is lowest priority). Returns
+// -1 on failure, else the number of items still on the queue.
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::enqueue (ACE_Message_Block *new_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue");
+
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ // Wait while the queue is full
+
+ while (this->is_full_i ())
+ {
+ if (this->notfull_cond_.wait (tv) == -1)
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+ }
+
+ return this->enqueue_i (new_item);
+}
+
+// Block indefinitely waiting for an item to arrive,
+// does not ignore alerts (e.g., signals).
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail (ACE_Message_Block *new_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ // Wait while the queue is full
+
+ while (this->is_full_i ())
+ {
+ if (this->notfull_cond_.wait (tv) == -1)
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+ }
+ return this->enqueue_tail_i (new_item);
+}
+
+// Remove an item from the front of the queue. If TV == 0 block
+// indefinitely (or until an alert occurs). Otherwise, block for upto
+// the amount of time specified by TV.
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ // Wait while the queue is empty.
+
+ while (this->is_empty_i ())
+ {
+ if (this->notempty_cond_.wait (tv) == -1)
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+ }
+
+ return this->dequeue_head_i (first_item);
+}
+
+#endif /* ACE_MESSAGE_QUEUE_C */