summaryrefslogtreecommitdiff
path: root/ace/Message_Queue_T.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Message_Queue_T.cpp')
-rw-r--r--ace/Message_Queue_T.cpp1033
1 files changed, 737 insertions, 296 deletions
diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp
index 0e952d50b2b..3e30afa25f1 100644
--- a/ace/Message_Queue_T.cpp
+++ b/ace/Message_Queue_T.cpp
@@ -400,34 +400,34 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
// We start looking from the highest priority to the lowest
// priority.
- for (temp = this->head_;
+ for (temp = this->tail_;
temp != 0;
- temp = temp->next ())
- if (temp->msg_priority () < new_item->msg_priority ())
- // Break out when we've located an item that has lower
- // priority that <new_item>.
+ temp = temp->prev ())
+ if (temp->msg_priority () >= new_item->msg_priority ())
+ // Break out when we've located an item that has
+ // greater or equal priority.
break;
if (temp == 0)
- // Check for simple case of inserting at the tail of the queue,
- // where all we need to do is insert <new_item> after the
- // current tail.
- return this->enqueue_tail_i (new_item);
- else if (temp->prev () == 0)
- // Check for simple case of inserting at the head of the
- // queue, where all we need to do is insert <new_item> before
- // the current head.
+ // Check for simple case of inserting at the head of the queue,
+ // where all we need to do is insert <new_item> before the
+ // current head.
return this->enqueue_head_i (new_item);
+ else if (temp->next () == 0)
+ // Check for simple case of inserting at the tail of the
+ // queue, where all we need to do is insert <new_item> after
+ // the current tail.
+ return this->enqueue_tail_i (new_item);
else
{
- // Insert the new message ahead of the item of
- // lesser priority. This ensures that FIFO order is
+ // Insert the new message behind the message of
+ // greater or equal priority. This ensures that FIFO order is
// maintained when messages of the same priority are
// inserted consecutively.
- new_item->next (temp);
- new_item->prev (temp->prev ());
- temp->prev ()->next (new_item);
- temp->prev (new_item);
+ new_item->prev (temp);
+ new_item->next (temp->next ());
+ temp->next ()->prev (new_item);
+ temp->next (new_item);
}
}
@@ -453,6 +453,11 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
{
+ if (this->head_ ==0)
+ {
+ ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Attempting to dequeue from empty queue")), -1);
+ }
+
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
first_item = this->head_;
this->head_ = this->head_->next ();
@@ -483,7 +488,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -496,7 +501,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i
// Wait for at least one item to become available.
- if (this->wait_not_empty_cond (ace_mon, tv) == -1)
+ if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
return -1;
first_item = this->head_;
@@ -505,7 +510,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
int result = 0;
#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
@@ -514,10 +519,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_
++this->enqueue_waiters_;
// @@ Need to add sanity checks for failure...
mon.release ();
- if (tv == 0)
+ if (timeout == 0)
result = this->not_full_cond_.acquire ();
else
- result = this->not_full_cond_.acquire (*tv);
+ result = this->not_full_cond_.acquire (*timeout);
int error = errno;
mon.acquire ();
errno = error;
@@ -529,7 +534,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_
while (this->is_full_i ())
{
- if (this->not_full_cond_.wait (tv) == -1)
+ if (this->not_full_cond_.wait (timeout) == -1)
{
if (errno == ETIME)
errno = EWOULDBLOCK;
@@ -549,7 +554,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
int result = 0;
#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
@@ -558,11 +563,11 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX
++this->dequeue_waiters_;
// @@ Need to add sanity checks for failure...
mon.release ();
- if (tv == 0)
+ if (timeout == 0)
result = this->not_empty_cond_.acquire ();
else
{
- result = this->not_empty_cond_.acquire (*tv);
+ result = this->not_empty_cond_.acquire (*timeout);
if (result == -1 && errno == ETIME)
errno = EWOULDBLOCK;
}
@@ -577,7 +582,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX
while (this->is_empty_i ())
{
- if (this->not_empty_cond_.wait (tv) == -1)
+ if (this->not_empty_cond_.wait (timeout) == -1)
{
if (errno == ETIME)
errno = EWOULDBLOCK;
@@ -600,7 +605,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -611,7 +616,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
return -1;
}
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ if (this->wait_not_full_cond (ace_mon, timeout) == -1)
return -1;
int queue_count = this->enqueue_head_i (new_item);
@@ -631,7 +636,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -642,7 +647,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
return -1;
}
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ if (this->wait_not_full_cond (ace_mon, timeout) == -1)
return -1;
int queue_count = this->enqueue_i (new_item);
@@ -658,10 +663,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
- return this->enqueue_prio (new_item, tv);
+ return this->enqueue_prio (new_item, timeout);
}
// Block indefinitely waiting for an item to arrive,
@@ -669,7 +674,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -680,7 +685,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
return -1;
}
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ if (this->wait_not_full_cond (ace_mon, timeout) == -1)
return -1;
int queue_count = this->enqueue_tail_i (new_item);
@@ -694,13 +699,13 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
}
}
-// Remove an item from the front of the queue. If TV == 0 block
+// Remove an item from the front of the queue. If timeout == 0 block
// indefinitely (or until an alert occurs). Otherwise, block for upto
-// the amount of time specified by TV.
+// the amount of time specified by timeout.
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -711,8 +716,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
return -1;
}
- if (this->wait_not_empty_cond (ace_mon, tv) == -1)
+ if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
+ {
return -1;
+ }
return this->dequeue_head_i (first_item);
}
@@ -742,6 +749,12 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (
size_t lwm,
ACE_Notification_Strategy *ns)
: ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns)
+ , pending_head_ (0)
+ , pending_tail_ (0)
+ , late_head_ (0)
+ , late_tail_ (0)
+ , beyond_late_head_ (0)
+ , beyond_late_tail_ (0)
, message_strategy_ (message_strategy)
{
// note, the ACE_Dynamic_Message_Queue assumes full responsibility for the
@@ -751,55 +764,157 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (
template <ACE_SYNCH_DECL>
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
{
- delete &message_strategy_;
+ delete &(this->message_strategy_);
}
// dtor: free message strategy and let base class dtor do the rest
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (
+ ACE_Message_Block *&list_head,
+ ACE_Message_Block *&list_tail,
+ u_int status_flags)
{
- ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+ int result = 0;
- int result;
+ // start with an empty list
+ list_head = 0;
+ list_tail = 0;
- // get the current time
- ACE_Time_Value current_time = ACE_OS::gettimeofday ();
- // refresh dynamic priority of the new message
- result = message_strategy_.update_priority (*new_item, current_time);
+ // get the current time
+ ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+ // refresh priority status boundaries in the queue
+ result = this->refresh_queue (current_time);
if (result < 0)
{
return result;
}
- // refresh dynamic priorities of messages in the queue
- result = this->refresh_priorities (current_time);
- if (result < 0)
+ if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::PENDING) &&
+ (this->pending_head_) && (this->pending_tail_))
{
- return result;
+ // patch up pointers for the new tail of the queue
+ if (this->pending_head_->prev ())
+ {
+ this->tail_ = this->pending_head_->prev ();
+ this->pending_head_->prev ()->next (0);
+ }
+ else
+ {
+ // the list has become empty
+ this->head_ = 0;
+ this->tail_ = 0;
+ }
+
+ // point to the head and tail of the list
+ list_head = this->pending_head_;
+ list_tail = this->pending_tail_;
+
+ // cut the pending messages out of the queue entirely
+ this->pending_head_->prev (0);
+ this->pending_head_ = 0;
+ this->pending_tail_ = 0;
}
- // reorganize the queue according to the new priorities
- result = this->refresh_queue (current_time);
- if (result < 0)
+ if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::LATE) &&
+ (this->late_head_) && (this->late_tail_))
{
- return result;
+ // patch up pointers for the (possibly) new head and tail of the queue
+ if (this->late_tail_->next ())
+ {
+ this->late_tail_->next ()->prev (this->late_head_->prev ());
+ }
+ else
+ {
+ this->tail_ = this->late_head_->prev ();
+ }
+ if (this->late_head_->prev ())
+ {
+ this->late_head_->prev ()->next (this->late_tail_->next ());
+ }
+ else
+ {
+ this->head_ = this->late_tail_->next ();
+ }
+
+ // put late messages behind pending messages (if any) being returned
+ this->late_head_->prev (list_tail);
+ if (list_tail)
+ {
+ list_tail->next (this->late_head_);
+ }
+ else
+ {
+ list_head = this->late_head_;
+ }
+ list_tail = this->late_tail_;
+
+ this->late_tail_->next (0);
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
}
- // invoke the base class method
- result = ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (new_item);
+ if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) &&
+ (this->beyond_late_head_) && (this->beyond_late_tail_))
+ {
+ // patch up pointers for the new tail of the queue
+ if (this->beyond_late_tail_->next ())
+ {
+ this->head_ = this->beyond_late_tail_->next ();
+ this->beyond_late_tail_->next ()->prev (0);
+ }
+ else
+ {
+ // the list has become empty
+ this->head_ = 0;
+ this->tail_ = 0;
+ }
+
+ // put beyond late messages at the end of the list being returned
+ if (list_tail)
+ {
+ this->beyond_late_head_->prev (list_tail);
+ list_tail->next (this->beyond_late_head_);
+ }
+ else
+ {
+ list_head = this->beyond_late_head_;
+ }
+ list_tail = this->beyond_late_tail_;
+
+ this->beyond_late_tail_->next (0);
+ this->beyond_late_head_ = 0;
+ this->beyond_late_tail_ = 0;
+ }
+
+ // decrement message and size counts for removed messages
+ ACE_Message_Block *temp1, *temp2;
+ for (temp1 = list_head; temp1 != 0; temp1 = temp1->next ())
+ {
+ this->cur_count_--;
+
+ for (temp2 = temp1; temp2 != 0; temp2 = temp2->cont ())
+ {
+ this->cur_bytes_ -= temp2->size ();
+ }
+ }
return result;
}
- // Enqueue an <ACE_Message_Block *> in accordance with its priority.
- // priority may be *dynamic* or *static* or a combination or *both*
- // It calls the priority evaluation function passed into the Dynamic
- // Message Queue constructor to update the priorities of all enqueued
- // messages.
+ // Detach all messages with status given in the passed flags from
+ // the queue and return them by setting passed head and tail pointers
+ // to the linked list they comprise. This method is intended primarily
+ // as a means of periodically harvesting messages that have missed
+ // their deadlines, but is available in its most general form. All
+ // messages are returned in priority order, from head to tail, as of
+ // the time this method was called.
+
+
template <ACE_SYNCH_DECL> int
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
@@ -816,269 +931,636 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&firs
// get the current time
ACE_Time_Value current_time = ACE_OS::gettimeofday ();
- // refresh dynamic priorities of messages in the queue
- result = this->refresh_priorities (current_time);
+ // refresh priority status boundaries in the queue
+ result = this->refresh_queue (current_time);
if (result < 0)
{
return result;
}
- // reorganize the queue according to the new priorities,
- // possibly dropping messages which are later than can
- // be represented by the range of priority values
+ // *now* it's appropriate to wait for an enqueued item
+ result = this->wait_not_empty_cond (ace_mon, timeout);
+ if (result == -1)
+ {
+ return result;
+ }
+
+ // call the internal dequeue method, which selects an
+ // item from the highest priority status portion of
+ // the queue that has messages enqueued.
+ result = dequeue_head_i (first_item);
+
+ return result;
+}
+ // Dequeue and return the <ACE_Message_Block *>
+ // at the (logical) head of the queue.
+
+template <ACE_SYNCH_DECL> void
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
+{
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
+ this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("pending_head_ = %u\n")
+ ASYS_TEXT ("pending_tail_ = %u\n")
+ ASYS_TEXT ("late_head_ = %u\n")
+ ASYS_TEXT ("late_tail_ = %u\n")
+ ASYS_TEXT ("beyond_late_head_ = %u\n")
+ ASYS_TEXT ("beyond_late_tail_ = %u\n"),
+ this->pending_head_,
+ this->pending_tail_,
+ this->late_head_,
+ this->late_tail_,
+ this->beyond_late_head_,
+ this->beyond_late_tail_));
+
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("message_strategy_ : \n")));
+ message_strategy_.dump ();
+
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
+}
+ // dump the state of the queue
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+
+ int result = 0;
+
+ // get the current time
+ ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+ // refresh priority status boundaries in the queue
result = this->refresh_queue (current_time);
if (result < 0)
{
return result;
}
- // *now* it's appropriate to wait for an enqueued item
- result = this->wait_not_empty_cond (ace_mon, tv);
- if (result == -1)
+ // where we enqueue depends on the message's priority status
+ switch (message_strategy_.priority_status (*new_item, current_time))
{
- return result;
+ case ACE_Dynamic_Message_Strategy::PENDING:
+ if (this->pending_tail_ == 0)
+ {
+ // Check for simple case of an empty pending queue, where all we need to
+ // do is insert <new_item> into the tail of the queue.
+ pending_head_ = new_item;
+ pending_tail_ = pending_head_;
+ result = this->enqueue_tail_i (new_item);
+ }
+ else
+ {
+ // enqueue the new message in priority order in the pending sublist
+ result = sublist_enqueue_i (new_item,
+ current_time,
+ this->pending_head_,
+ this->pending_tail_,
+ ACE_Dynamic_Message_Strategy::PENDING);
+ }
+
+ break;
+
+ case ACE_Dynamic_Message_Strategy::LATE:
+ if (this->late_tail_ == 0)
+ {
+ late_head_ = new_item;
+ late_tail_ = late_head_;
+
+ if (this->pending_head_ == 0)
+ {
+ // Check for simple case of an empty pending queue, where all
+ // we need to do is insert <new_item> into the tail of the queue.
+ result = this->enqueue_tail_i (new_item);
+ }
+ else if (this->beyond_late_tail_ == 0)
+ {
+ // Check for simple case of an empty beyond late queue, where all
+ // we need to do is insert <new_item> into the head of the queue.
+ result = this->enqueue_head_i (new_item);
+ }
+ else
+ {
+ // otherwise, we can just splice the new message in between
+ // the pending and beyond late portions of the queue
+ this->beyond_late_tail_->next (new_item);
+ new_item->prev (this->beyond_late_tail_);
+ this->pending_head_->prev (new_item);
+ new_item->next (this->pending_head_);
+ }
+ }
+ else
+ {
+ // enqueue the new message in priority order in the late sublist
+ result = sublist_enqueue_i (new_item,
+ current_time,
+ this->late_head_,
+ this->late_tail_,
+ ACE_Dynamic_Message_Strategy::LATE);
+ }
+ break;
+
+ case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
+ if (this->beyond_late_tail_ == 0)
+ {
+ // Check for simple case of an empty beyond late queue, where all
+ // we need to do is insert <new_item> into the head of the queue.
+ beyond_late_head_ = new_item;
+ beyond_late_tail_ = beyond_late_head_;
+ result = this->enqueue_head_i (new_item);
+ }
+ else
+ {
+ // all beyond late messages have the same (zero) priority, so
+ // just put the new one at the end of the beyond late messages
+ if (this->beyond_late_tail_->next ())
+ {
+ this->beyond_late_tail_->next ()->prev (new_item);
+ }
+ else
+ {
+ this->tail_ = new_item;
+ }
+
+ new_item->next (this->beyond_late_tail_->next ());
+ this->beyond_late_tail_->next (new_item);
+ new_item->prev (this->beyond_late_tail_);
+ this->beyond_late_tail_ = new_item;
+ }
+
+ break;
+
+ // should never get here, but just in case...
+ default:
+ result = -1;
+ break;
}
- // invoke the internal virtual method
- return this->dequeue_head_i (first_item);
+ return result;
}
- // Dequeue and return the <ACE_Message_Block *> at the head of the
- // queue.
+ // Enqueue an <ACE_Message_Block *> in accordance with its priority.
+ // priority may be *dynamic* or *static* or a combination or *both*
+ // It calls the priority evaluation function passed into the Dynamic
+ // Message Queue constructor to update the priorities of all enqueued
+ // messages.
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv)
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
+ const ACE_Time_Value &current_time,
+ ACE_Message_Block *&sublist_head,
+ ACE_Message_Block *&sublist_tail,
+ ACE_Dynamic_Message_Strategy::Priority_Status status)
{
int result = 0;
+ ACE_Message_Block *current_item = 0;
- // apply the priority update function to all enqueued
- // messages, starting at the head of the queue
- ACE_Message_Block *temp = ACE_Message_Queue<ACE_SYNCH_USE>::head_;
- while (temp)
+ // find message after which to enqueue new item,
+ // based on message priority and priority status
+ for (current_item = sublist_tail;
+ current_item;
+ current_item = current_item->prev ())
{
- result = message_strategy_.update_priority (*temp, tv);
- if (result < 0)
+ if (message_strategy_.priority_status (*current_item, current_time) == status)
+ {
+ if (current_item->msg_priority () >= new_item->msg_priority ())
+ {
+ break;
+ }
+ }
+ else
{
+ sublist_head = new_item;
break;
}
-
- temp = temp->next ();
+ }
+
+ if (current_item == 0)
+ {
+ // if the new message has highest priority of any,
+ // put it at the head of the list (and sublist)
+ result = enqueue_head_i (new_item);
+ sublist_head = new_item;
+ }
+ else
+ {
+ // insert the new item into the list
+ new_item->next (current_item->next ());
+ new_item->prev (current_item);
+ if (current_item->next ())
+ {
+ current_item->next ()->prev (new_item);
+ }
+ else
+ {
+ this->tail_ = new_item;
+ }
+
+ current_item->next (new_item);
+
+ // if the new item has lowest priority of any in the sublist,
+ // move the tail pointer of the sublist back to the new item
+ if (current_item == sublist_tail)
+ {
+ sublist_tail = new_item;
+ }
}
return result;
}
- // refresh the priorities in the queue according
- // to a specific priority assignment function
+ // enqueue a message in priority order within a given priority status sublist
+
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
{
- // Remove messages that are later than the priority range can represent
- int result = remove_stale_messages (tv);
- if (result < 0)
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
+
+ int result = 0;
+ int last_in_subqueue = 0;
+
+ // first, try to dequeue from the head of the pending list
+ if (this->pending_head_)
{
- return result;
+ first_item = this->pending_head_;
+
+ if (0 == this->pending_head_->prev ())
+ {
+ this->head_ = this->pending_head_->next ();
+ }
+ else
+ {
+ this->pending_head_->prev ()->next (this->pending_head_->next ());
+ }
+
+ if (0 == this->pending_head_->next ())
+ {
+ this->tail_ = this->pending_head_->prev ();
+ this->pending_head_ = 0;
+ this->pending_tail_ = 0;
+ }
+ else
+ {
+ this->pending_head_->next ()->prev (this->pending_head_->prev ());
+ this->pending_head_ = this->pending_head_->next ();
+ }
+
+ first_item->prev (0);
+ first_item->next (0);
+ }
+ // second, try to dequeue from the head of the late list
+ else if (this->late_head_)
+ {
+ last_in_subqueue =
+ (this->late_head_ == this->late_tail_) ? 1 : 0;
+
+ first_item = this->late_head_;
+
+ if (0 == this->late_head_->prev ())
+ {
+ this->head_ = this->late_head_->next ();
+ }
+ else
+ {
+ this->late_head_->prev ()->next (this->late_head_->next ());
+ }
+
+ if (0 == this->late_head_->next ())
+ {
+ this->tail_ = this->late_head_->prev ();
+ }
+ else
+ {
+ this->late_head_->next ()->prev (this->late_head_->prev ());
+ this->late_head_ = this->late_head_->next ();
+ }
+
+ if (last_in_subqueue)
+ {
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
+ }
+
+ first_item->prev (0);
+ first_item->next (0);
}
+ // finally, try to dequeue from the head of the beyond late list
+ else if (this->beyond_late_head_)
+ {
+ last_in_subqueue =
+ (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
+
+ first_item = this->beyond_late_head_;
+ this->head_ = this->beyond_late_head_->next ();
+
+ if (0 == this->beyond_late_head_->next ())
+ {
+ this->tail_ = this->beyond_late_head_->prev ();
+ }
+ else
+ {
+ this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
+ this->beyond_late_head_ = this->beyond_late_head_->next ();
+ }
+
+ if (last_in_subqueue)
+ {
+ this->beyond_late_head_ = 0;
+ this->beyond_late_tail_ = 0;
+ }
- // Refresh the order of messages in the queue,
- // putting pending messages ahead of late messages
- return reorder_queue (tv);
+ first_item->prev (0);
+ first_item->next (0);
+ }
+ else
+ {
+ // nothing to dequeue: set the pointer to zero and return an error code
+ first_item = 0;
+ result = -1;
+ }
+
+ return result;
}
- // refresh the order of messages in the queue
- // after refreshing their priorities
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // logical queue. Attempts first to dequeue from the pending
+ // portion of the queue, or if that is empty from the late portion,
+ // or if that is empty from the beyond late portion, or if that is
+ // empty just sets the passed pointer to zero and returns -1.
+
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_stale_messages (const ACE_Time_Value & tv)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value &current_time)
{
- int result = 0;
+ int result;
+
+ result = refresh_pending_queue (current_time);
+
+ if (result != -1)
+ {
+ result = refresh_late_queue (current_time);
+ }
- // start at the beginning of the list
- ACE_Message_Block *current = head_;
+ return result;
+}
+ // Refresh the queue using the strategy
+ // specific priority status function.
- // maintain a list of dropped messages to
- // be appended to the end of the list after
- // the sweep is complete
- ACE_Message_Block *append_list_head = 0;
- ACE_Message_Block *append_list_tail = 0;
- while (current)
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value &current_time)
+{
+ ACE_Dynamic_Message_Strategy::Priority_Status current_status;
+
+ // refresh priority status boundaries in the queue
+ if (this->pending_head_)
{
- // messages that have overflowed the given time bounds must be removed
- if (message_strategy_.is_beyond_late (*current, tv))
+ current_status = message_strategy_.priority_status (*this->pending_head_, current_time);
+ switch (current_status)
{
- // find the end of the chain of overflowed messages
- ACE_Message_Block *remove_head = current;
- ACE_Message_Block *remove_tail = current;
- while ((remove_tail) && (remove_tail->next ()) &&
- message_strategy_.is_beyond_late (*(remove_tail->next ()), tv))
- {
- // extend the chain of messages to be removed
- remove_tail = remove_tail->next ();
- }
+ case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
- // fix up list pointers to bypass the overflowed message chain
+ // make sure the head of the beyond late queue is set
+ // (there may not have been any beyond late messages previously)
+ this->beyond_late_head_ = this->head_;
- if (remove_tail->next ())
- {
- remove_tail->next ()->prev (remove_head->prev ());
- }
- else
- {
- tail_ = remove_head->prev ();
- }
+ // zero out the late queue pointers, and set them only if
+ // there turn out to be late messages in the pending sublist
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
- if (remove_head->prev ())
- {
- remove_head->prev ()->next (remove_tail->next ());
- }
- else
- {
- head_ = remove_tail->next ();
- }
+ // advance through the beyond late messages in the pending queue
+ do
+ {
+ this->pending_head_ = this->pending_head_->next ();
+
+ if (this->pending_head_)
+ {
+ current_status = message_strategy_.priority_status (*this->pending_head_,
+ current_time);
+ }
+ else
+ {
+ break; // do while
+ }
- // move the current pointer past the end of the chain
- current = remove_tail->next ();
-
- // Cut the chain of overflowed messages out of the list
- remove_head->prev (0);
- remove_tail->next (0);
-
- // Call strategy's drop_message method on each overflowed message.
- // Cannot just delete each message even though reference counting
- // at the data bloc level means that the underlying data block will
- // not be deleted if another message block is still pointing to it.
- // If the entire set of message blocks is known in advance, they may
- // be allocated on the stack instead of the heap (to speed performance),
- // and the caller *cannot* surrender ownership of the memory to the
- // list. Putting this policy in the strategy allows the correct memory
- // management scheme to be configured in either case.
- ACE_Message_Block *temp1 = remove_head;
- ACE_Message_Block *temp2 = remove_head->next ();
- ACE_Message_Block *size_temp;
- size_t msg_size;
- while (temp1)
- {
- // Make sure to count *all* the bytes in a composite message!!!
- for (size_temp = temp1, msg_size = 0;
- size_temp != 0;
- size_temp = size_temp->cont ())
+ } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
+
+ if (this->pending_head_)
{
- msg_size += size_temp->size ();
+ // point tail of beyond late sublist to previous item
+ this->beyond_late_tail_ = this->pending_head_->prev ();
+
+ if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
+ {
+ // there are no late messages left in the queue
+ break; // switch
+ }
+ else
+ {
+ if (current_status != ACE_Dynamic_Message_Strategy::LATE)
+ {
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
+ (int) current_status),
+ -1);
+ }
+
+ // intentionally fall through to the next case
+ }
+ }
+ else
+ {
+ // there are no pending or late messages left in the queue
+ this->beyond_late_tail_ = this->tail_;
+ this->pending_head_ = 0;
+ this->pending_tail_ = 0;
+
+ break; // switch
}
- result = message_strategy_.drop_message (temp1);
- if (result < 0)
+ case ACE_Dynamic_Message_Strategy::LATE:
+
+ // make sure the head of the late queue is set (there may not have been
+ // any late messages previously, or they may have all become beyond late)
+ if (this->late_head_ == 0)
{
- return result;
+ this->late_head_ = this->pending_head_;
}
- if (temp1)
+ // advance through the beyond late messages in the pending queue
+ do
{
- // if the message was not destroyed, zero out its priority and
- // put it on the list to append to the back of the queue
- temp1->msg_priority (0);
- temp1->next (0);
- if (append_list_tail)
+ this->pending_head_ = this->pending_head_->next ();
+
+ if (this->pending_head_)
{
- temp1->prev (append_list_tail);
- append_list_tail->next (temp1);
+ current_status = message_strategy_.priority_status (*this->pending_head_,
+ current_time);
}
else
{
- temp1->prev (0);
- append_list_head = temp1;
+ break; // do while
+ }
+
+ } while (current_status == ACE_Dynamic_Message_Strategy::LATE);
+
+ if (this->pending_head_)
+ {
+ if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
+ {
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
+ (int) current_status),
+ -1);
}
- append_list_tail = temp1;
+
+ // point tail of late sublist to previous item
+ this->late_tail_ = this->pending_head_->prev ();
}
else
{
- // if the message was destroyed, decrease the message
- // count and byte count in the message queue
- this->cur_count_--;
- this->cur_bytes_ -= msg_size;
+ // there are no pending messages left in the queue
+ this->late_tail_ = this->tail_;
+ this->pending_head_ = 0;
+ this->pending_tail_ = 0;
}
- temp1 = temp2;
- temp2 = temp2 ? temp2->next () : temp2;
- }
- }
- else
- {
- current = current->next ();
- }
- }
+ break; // switch
- // append any saved dropped messages to the end of the queue
- if (append_list_tail)
- {
- if (tail_)
- {
- tail_->next (append_list_head);
- append_list_head->prev (tail_);
- tail_ = append_list_tail;
- }
- else
- {
- head_ = append_list_head;
- tail_ = append_list_tail;
+ case ACE_Dynamic_Message_Strategy::PENDING:
+
+ // do nothing - the pending queue is unchanged
+
+ break; // switch
+
+ default:
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unknown message priority status [%d]"),
+ (int) current_status),
+ -1);
+ break; // switch
}
}
- return result;
+ return 0;
}
- // Remove messages that are later than the priority range can represent.
+ // Refresh the pending queue using the strategy
+ // specific priority status function.
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::reorder_queue (const ACE_Time_Value & tv)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value &current_time)
{
- // if the queue is not empty, and the first message is late, need to reorder
- if ((head_) && (! message_strategy_.is_pending (*head_, tv)))
+ ACE_Dynamic_Message_Strategy::Priority_Status current_status;
+
+ if (this->late_head_)
{
- // find the end of the chain of newly late messages
- // (since the last time the queue was reordered)
- ACE_Message_Block *reorder_head = head_;
- ACE_Message_Block *reorder_tail = head_;
- while ((reorder_tail) && (reorder_tail->next ()) &&
- reorder_tail->next ()->msg_priority () <= reorder_head->msg_priority ())
+ current_status = message_strategy_.priority_status (*this->late_head_,
+ current_time);
+ switch (current_status)
{
- // extend the chain of messages to be removed
- reorder_tail = reorder_tail->next ();
- }
+ case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
- // if a proper subset of the queue is out of order, reorganize the queue
- if (reorder_tail != tail_)
- {
- // fix up list pointers to bypass the overflowed message chain
- if (reorder_tail->next ())
- {
- reorder_tail->next ()->prev (reorder_head->prev ());
- }
- else
- {
- tail_ = reorder_head->prev ();
- }
- if (reorder_head->prev ())
- {
- reorder_head->prev ()->next (reorder_tail->next ());
- }
- else
- {
- head_ = reorder_tail->next ();
- }
+ // make sure the head of the beyond late queue is set
+ // (there may not have been any beyond late messages previously)
+ this->beyond_late_head_ = this->head_;
+
+ // advance through the beyond late messages in the late queue
+ do
+ {
+ this->late_head_ = this->late_head_->next ();
+
+ if (this->late_head_)
+ {
+ current_status = message_strategy_.priority_status (*this->late_head_,
+ current_time);
+ }
+ else
+ {
+ break; // do while
+ }
+
+ } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
+
+ if (this->late_head_)
+ {
+ // point tail of beyond late sublist to previous item
+ this->beyond_late_tail_ = this->late_head_->prev ();
+
+ if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
+ {
+ // there are no late messages left in the queue
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
+ }
+ else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
+ {
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
+ (int) current_status),
+ -1);
+ }
+ }
+ else
+ {
+ // there are no late messages left in the queue
+ this->beyond_late_tail_ = this->tail_;
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
+ }
+
+ break; // switch
+
+ case ACE_Dynamic_Message_Strategy::LATE:
+
+ // do nothing - the late queue is unchanged
+
+ break; // switch
+
+ case ACE_Dynamic_Message_Strategy::PENDING:
+
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unexpected message priority status "
+ "[%d] (expected LATE or BEYOND_LATE)"),
+ (int) current_status),
+ -1);
+
+ break; // switch
+
+ default:
+
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unknown message priority status [%d]"),
+ (int) current_status),
+ -1);
+
+ break; // switch
}
- }
+ }
return 0;
}
- // Refresh the order of messages in the queue.
+ // Refresh the late queue using the strategy
+ // specific priority status function.
template <ACE_SYNCH_DECL> int
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (
ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
- return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, tv);
+ return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, timeout);
}
// private method to hide public base class method: just calls base class method
@@ -1124,7 +1606,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t
ACE_Notification_Strategy *ns,
u_long static_bit_field_mask,
u_long static_bit_field_shift,
- u_long pending_threshold,
u_long dynamic_priority_max,
u_long dynamic_priority_offset)
{
@@ -1133,7 +1614,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t
ACE_NEW_RETURN (adms,
ACE_Deadline_Message_Strategy (static_bit_field_mask,
static_bit_field_shift,
- pending_threshold,
dynamic_priority_max,
dynamic_priority_offset),
0);
@@ -1142,32 +1622,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t
}
// factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_cleanup_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns,
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-{
- ACE_Deadline_Cleanup_Message_Strategy *adcms;
-
- ACE_NEW_RETURN (adcms,
- ACE_Deadline_Cleanup_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset),
- 0);
-
- return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adcms, hwm, lwm, ns);
-}
- // factory method for a dynamically prioritized (by time to deadline)
- // ACE_Dynamic_Message_Queue, with automatic cleanup of beyond late messages
-
template <ACE_SYNCH_DECL>
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
@@ -1176,7 +1630,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw
ACE_Notification_Strategy *ns,
u_long static_bit_field_mask,
u_long static_bit_field_shift,
- u_long pending_threshold,
u_long dynamic_priority_max,
u_long dynamic_priority_offset)
{
@@ -1185,7 +1638,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw
ACE_NEW_RETURN (alms,
ACE_Laxity_Message_Strategy (static_bit_field_mask,
static_bit_field_shift,
- pending_threshold,
dynamic_priority_max,
dynamic_priority_offset),
0);
@@ -1196,30 +1648,19 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw
// factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_cleanup_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns,
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-{
- ACE_Laxity_Message_Strategy *alcms;
-
- ACE_NEW_RETURN (alcms,
- ACE_Laxity_Cleanup_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset),
- 0);
+#if defined (VXWORKS)
- return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alcms, hwm, lwm, ns);
+ACE_Message_Queue_Vx *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
+ size_t max_message_length,
+ ACE_Notification_Strategy *ns)
+{
+ return new ACE_Message_Queue_Vx (max_messages, max_message_length, ns);
}
- // factory method for a dynamically prioritized (by laxity)
- // ACE_Dynamic_Message_Queue, with automatic cleanup of beyond late messages
+ // factory method for a wrapped VxWorks message queue
+
+#endif /* defined (VXWORKS) */
+
+
#endif /* ACE_MESSAGE_QUEUE_T_C */