diff options
author | cdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-07-13 21:37:07 +0000 |
---|---|---|
committer | cdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-07-13 21:37:07 +0000 |
commit | 473ff435580383c6a6d340036880f0f9b1325c13 (patch) | |
tree | 2a884960410063b347d9daf081edea292ee6c760 /ace/Message_Queue_T.cpp | |
parent | ebe027a057115b891dd9896647194923eb69f46f (diff) | |
download | ATCD-473ff435580383c6a6d340036880f0f9b1325c13.tar.gz |
refined dynamic queues based on performance test results
Diffstat (limited to 'ace/Message_Queue_T.cpp')
-rw-r--r-- | ace/Message_Queue_T.cpp | 1033 |
1 files changed, 737 insertions, 296 deletions
diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp index 0e952d50b2b..3e30afa25f1 100644 --- a/ace/Message_Queue_T.cpp +++ b/ace/Message_Queue_T.cpp @@ -400,34 +400,34 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) // We start looking from the highest priority to the lowest // priority. - for (temp = this->head_; + for (temp = this->tail_; temp != 0; - temp = temp->next ()) - if (temp->msg_priority () < new_item->msg_priority ()) - // Break out when we've located an item that has lower - // priority that <new_item>. + temp = temp->prev ()) + if (temp->msg_priority () >= new_item->msg_priority ()) + // Break out when we've located an item that has + // greater or equal priority. break; if (temp == 0) - // Check for simple case of inserting at the tail of the queue, - // where all we need to do is insert <new_item> after the - // current tail. - return this->enqueue_tail_i (new_item); - else if (temp->prev () == 0) - // Check for simple case of inserting at the head of the - // queue, where all we need to do is insert <new_item> before - // the current head. + // Check for simple case of inserting at the head of the queue, + // where all we need to do is insert <new_item> before the + // current head. return this->enqueue_head_i (new_item); + else if (temp->next () == 0) + // Check for simple case of inserting at the tail of the + // queue, where all we need to do is insert <new_item> after + // the current tail. + return this->enqueue_tail_i (new_item); else { - // Insert the new message ahead of the item of - // lesser priority. This ensures that FIFO order is + // Insert the new message behind the message of + // greater or equal priority. This ensures that FIFO order is // maintained when messages of the same priority are // inserted consecutively. - new_item->next (temp); - new_item->prev (temp->prev ()); - temp->prev ()->next (new_item); - temp->prev (new_item); + new_item->prev (temp); + new_item->next (temp->next ()); + temp->next ()->prev (new_item); + temp->next (new_item); } } @@ -453,6 +453,11 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) { + if (this->head_ ==0) + { + ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Attempting to dequeue from empty queue")), -1); + } + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); first_item = this->head_; this->head_ = this->head_->next (); @@ -483,7 +488,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -496,7 +501,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i // Wait for at least one item to become available. - if (this->wait_not_empty_cond (ace_mon, tv) == -1) + if (this->wait_not_empty_cond (ace_mon, timeout) == -1) return -1; first_item = this->head_; @@ -505,7 +510,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { int result = 0; #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) @@ -514,10 +519,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_ ++this->enqueue_waiters_; // @@ Need to add sanity checks for failure... mon.release (); - if (tv == 0) + if (timeout == 0) result = this->not_full_cond_.acquire (); else - result = this->not_full_cond_.acquire (*tv); + result = this->not_full_cond_.acquire (*timeout); int error = errno; mon.acquire (); errno = error; @@ -529,7 +534,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_ while (this->is_full_i ()) { - if (this->not_full_cond_.wait (tv) == -1) + if (this->not_full_cond_.wait (timeout) == -1) { if (errno == ETIME) errno = EWOULDBLOCK; @@ -549,7 +554,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_ template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { int result = 0; #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) @@ -558,11 +563,11 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX ++this->dequeue_waiters_; // @@ Need to add sanity checks for failure... mon.release (); - if (tv == 0) + if (timeout == 0) result = this->not_empty_cond_.acquire (); else { - result = this->not_empty_cond_.acquire (*tv); + result = this->not_empty_cond_.acquire (*timeout); if (result == -1 && errno == ETIME) errno = EWOULDBLOCK; } @@ -577,7 +582,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX while (this->is_empty_i ()) { - if (this->not_empty_cond_.wait (tv) == -1) + if (this->not_empty_cond_.wait (timeout) == -1) { if (errno == ETIME) errno = EWOULDBLOCK; @@ -600,7 +605,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -611,7 +616,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, return -1; } - if (this->wait_not_full_cond (ace_mon, tv) == -1) + if (this->wait_not_full_cond (ace_mon, timeout) == -1) return -1; int queue_count = this->enqueue_head_i (new_item); @@ -631,7 +636,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -642,7 +647,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item, return -1; } - if (this->wait_not_full_cond (ace_mon, tv) == -1) + if (this->wait_not_full_cond (ace_mon, timeout) == -1) return -1; int queue_count = this->enqueue_i (new_item); @@ -658,10 +663,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item, template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue"); - return this->enqueue_prio (new_item, tv); + return this->enqueue_prio (new_item, timeout); } // Block indefinitely waiting for an item to arrive, @@ -669,7 +674,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item, template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -680,7 +685,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, return -1; } - if (this->wait_not_full_cond (ace_mon, tv) == -1) + if (this->wait_not_full_cond (ace_mon, timeout) == -1) return -1; int queue_count = this->enqueue_tail_i (new_item); @@ -694,13 +699,13 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, } } -// Remove an item from the front of the queue. If TV == 0 block +// Remove an item from the front of the queue. If timeout == 0 block // indefinitely (or until an alert occurs). Otherwise, block for upto -// the amount of time specified by TV. +// the amount of time specified by timeout. template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -711,8 +716,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, return -1; } - if (this->wait_not_empty_cond (ace_mon, tv) == -1) + if (this->wait_not_empty_cond (ace_mon, timeout) == -1) + { return -1; + } return this->dequeue_head_i (first_item); } @@ -742,6 +749,12 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue ( size_t lwm, ACE_Notification_Strategy *ns) : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns) + , pending_head_ (0) + , pending_tail_ (0) + , late_head_ (0) + , late_tail_ (0) + , beyond_late_head_ (0) + , beyond_late_tail_ (0) , message_strategy_ (message_strategy) { // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the @@ -751,55 +764,157 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue ( template <ACE_SYNCH_DECL> ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void) { - delete &message_strategy_; + delete &(this->message_strategy_); } // dtor: free message strategy and let base class dtor do the rest template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages ( + ACE_Message_Block *&list_head, + ACE_Message_Block *&list_tail, + u_int status_flags) { - ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + int result = 0; - int result; + // start with an empty list + list_head = 0; + list_tail = 0; - // get the current time - ACE_Time_Value current_time = ACE_OS::gettimeofday (); - // refresh dynamic priority of the new message - result = message_strategy_.update_priority (*new_item, current_time); + // get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh priority status boundaries in the queue + result = this->refresh_queue (current_time); if (result < 0) { return result; } - // refresh dynamic priorities of messages in the queue - result = this->refresh_priorities (current_time); - if (result < 0) + if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::PENDING) && + (this->pending_head_) && (this->pending_tail_)) { - return result; + // patch up pointers for the new tail of the queue + if (this->pending_head_->prev ()) + { + this->tail_ = this->pending_head_->prev (); + this->pending_head_->prev ()->next (0); + } + else + { + // the list has become empty + this->head_ = 0; + this->tail_ = 0; + } + + // point to the head and tail of the list + list_head = this->pending_head_; + list_tail = this->pending_tail_; + + // cut the pending messages out of the queue entirely + this->pending_head_->prev (0); + this->pending_head_ = 0; + this->pending_tail_ = 0; } - // reorganize the queue according to the new priorities - result = this->refresh_queue (current_time); - if (result < 0) + if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::LATE) && + (this->late_head_) && (this->late_tail_)) { - return result; + // patch up pointers for the (possibly) new head and tail of the queue + if (this->late_tail_->next ()) + { + this->late_tail_->next ()->prev (this->late_head_->prev ()); + } + else + { + this->tail_ = this->late_head_->prev (); + } + if (this->late_head_->prev ()) + { + this->late_head_->prev ()->next (this->late_tail_->next ()); + } + else + { + this->head_ = this->late_tail_->next (); + } + + // put late messages behind pending messages (if any) being returned + this->late_head_->prev (list_tail); + if (list_tail) + { + list_tail->next (this->late_head_); + } + else + { + list_head = this->late_head_; + } + list_tail = this->late_tail_; + + this->late_tail_->next (0); + this->late_head_ = 0; + this->late_tail_ = 0; } - // invoke the base class method - result = ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (new_item); + if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) && + (this->beyond_late_head_) && (this->beyond_late_tail_)) + { + // patch up pointers for the new tail of the queue + if (this->beyond_late_tail_->next ()) + { + this->head_ = this->beyond_late_tail_->next (); + this->beyond_late_tail_->next ()->prev (0); + } + else + { + // the list has become empty + this->head_ = 0; + this->tail_ = 0; + } + + // put beyond late messages at the end of the list being returned + if (list_tail) + { + this->beyond_late_head_->prev (list_tail); + list_tail->next (this->beyond_late_head_); + } + else + { + list_head = this->beyond_late_head_; + } + list_tail = this->beyond_late_tail_; + + this->beyond_late_tail_->next (0); + this->beyond_late_head_ = 0; + this->beyond_late_tail_ = 0; + } + + // decrement message and size counts for removed messages + ACE_Message_Block *temp1, *temp2; + for (temp1 = list_head; temp1 != 0; temp1 = temp1->next ()) + { + this->cur_count_--; + + for (temp2 = temp1; temp2 != 0; temp2 = temp2->cont ()) + { + this->cur_bytes_ -= temp2->size (); + } + } return result; } - // Enqueue an <ACE_Message_Block *> in accordance with its priority. - // priority may be *dynamic* or *static* or a combination or *both* - // It calls the priority evaluation function passed into the Dynamic - // Message Queue constructor to update the priorities of all enqueued - // messages. + // Detach all messages with status given in the passed flags from + // the queue and return them by setting passed head and tail pointers + // to the linked list they comprise. This method is intended primarily + // as a means of periodically harvesting messages that have missed + // their deadlines, but is available in its most general form. All + // messages are returned in priority order, from head to tail, as of + // the time this method was called. + + template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); @@ -816,269 +931,636 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&firs // get the current time ACE_Time_Value current_time = ACE_OS::gettimeofday (); - // refresh dynamic priorities of messages in the queue - result = this->refresh_priorities (current_time); + // refresh priority status boundaries in the queue + result = this->refresh_queue (current_time); if (result < 0) { return result; } - // reorganize the queue according to the new priorities, - // possibly dropping messages which are later than can - // be represented by the range of priority values + // *now* it's appropriate to wait for an enqueued item + result = this->wait_not_empty_cond (ace_mon, timeout); + if (result == -1) + { + return result; + } + + // call the internal dequeue method, which selects an + // item from the highest priority status portion of + // the queue that has messages enqueued. + result = dequeue_head_i (first_item); + + return result; +} + // Dequeue and return the <ACE_Message_Block *> + // at the (logical) head of the queue. + +template <ACE_SYNCH_DECL> void +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n"))); + this->ACE_Message_Queue<ACE_SYNCH_USE>::dump (); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("pending_head_ = %u\n") + ASYS_TEXT ("pending_tail_ = %u\n") + ASYS_TEXT ("late_head_ = %u\n") + ASYS_TEXT ("late_tail_ = %u\n") + ASYS_TEXT ("beyond_late_head_ = %u\n") + ASYS_TEXT ("beyond_late_tail_ = %u\n"), + this->pending_head_, + this->pending_tail_, + this->late_head_, + this->late_tail_, + this->beyond_late_head_, + this->beyond_late_tail_)); + + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("message_strategy_ : \n"))); + message_strategy_.dump (); + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + // dump the state of the queue + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + + int result = 0; + + // get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh priority status boundaries in the queue result = this->refresh_queue (current_time); if (result < 0) { return result; } - // *now* it's appropriate to wait for an enqueued item - result = this->wait_not_empty_cond (ace_mon, tv); - if (result == -1) + // where we enqueue depends on the message's priority status + switch (message_strategy_.priority_status (*new_item, current_time)) { - return result; + case ACE_Dynamic_Message_Strategy::PENDING: + if (this->pending_tail_ == 0) + { + // Check for simple case of an empty pending queue, where all we need to + // do is insert <new_item> into the tail of the queue. + pending_head_ = new_item; + pending_tail_ = pending_head_; + result = this->enqueue_tail_i (new_item); + } + else + { + // enqueue the new message in priority order in the pending sublist + result = sublist_enqueue_i (new_item, + current_time, + this->pending_head_, + this->pending_tail_, + ACE_Dynamic_Message_Strategy::PENDING); + } + + break; + + case ACE_Dynamic_Message_Strategy::LATE: + if (this->late_tail_ == 0) + { + late_head_ = new_item; + late_tail_ = late_head_; + + if (this->pending_head_ == 0) + { + // Check for simple case of an empty pending queue, where all + // we need to do is insert <new_item> into the tail of the queue. + result = this->enqueue_tail_i (new_item); + } + else if (this->beyond_late_tail_ == 0) + { + // Check for simple case of an empty beyond late queue, where all + // we need to do is insert <new_item> into the head of the queue. + result = this->enqueue_head_i (new_item); + } + else + { + // otherwise, we can just splice the new message in between + // the pending and beyond late portions of the queue + this->beyond_late_tail_->next (new_item); + new_item->prev (this->beyond_late_tail_); + this->pending_head_->prev (new_item); + new_item->next (this->pending_head_); + } + } + else + { + // enqueue the new message in priority order in the late sublist + result = sublist_enqueue_i (new_item, + current_time, + this->late_head_, + this->late_tail_, + ACE_Dynamic_Message_Strategy::LATE); + } + break; + + case ACE_Dynamic_Message_Strategy::BEYOND_LATE: + if (this->beyond_late_tail_ == 0) + { + // Check for simple case of an empty beyond late queue, where all + // we need to do is insert <new_item> into the head of the queue. + beyond_late_head_ = new_item; + beyond_late_tail_ = beyond_late_head_; + result = this->enqueue_head_i (new_item); + } + else + { + // all beyond late messages have the same (zero) priority, so + // just put the new one at the end of the beyond late messages + if (this->beyond_late_tail_->next ()) + { + this->beyond_late_tail_->next ()->prev (new_item); + } + else + { + this->tail_ = new_item; + } + + new_item->next (this->beyond_late_tail_->next ()); + this->beyond_late_tail_->next (new_item); + new_item->prev (this->beyond_late_tail_); + this->beyond_late_tail_ = new_item; + } + + break; + + // should never get here, but just in case... + default: + result = -1; + break; } - // invoke the internal virtual method - return this->dequeue_head_i (first_item); + return result; } - // Dequeue and return the <ACE_Message_Block *> at the head of the - // queue. + // Enqueue an <ACE_Message_Block *> in accordance with its priority. + // priority may be *dynamic* or *static* or a combination or *both* + // It calls the priority evaluation function passed into the Dynamic + // Message Queue constructor to update the priorities of all enqueued + // messages. -template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv) + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item, + const ACE_Time_Value ¤t_time, + ACE_Message_Block *&sublist_head, + ACE_Message_Block *&sublist_tail, + ACE_Dynamic_Message_Strategy::Priority_Status status) { int result = 0; + ACE_Message_Block *current_item = 0; - // apply the priority update function to all enqueued - // messages, starting at the head of the queue - ACE_Message_Block *temp = ACE_Message_Queue<ACE_SYNCH_USE>::head_; - while (temp) + // find message after which to enqueue new item, + // based on message priority and priority status + for (current_item = sublist_tail; + current_item; + current_item = current_item->prev ()) { - result = message_strategy_.update_priority (*temp, tv); - if (result < 0) + if (message_strategy_.priority_status (*current_item, current_time) == status) + { + if (current_item->msg_priority () >= new_item->msg_priority ()) + { + break; + } + } + else { + sublist_head = new_item; break; } - - temp = temp->next (); + } + + if (current_item == 0) + { + // if the new message has highest priority of any, + // put it at the head of the list (and sublist) + result = enqueue_head_i (new_item); + sublist_head = new_item; + } + else + { + // insert the new item into the list + new_item->next (current_item->next ()); + new_item->prev (current_item); + if (current_item->next ()) + { + current_item->next ()->prev (new_item); + } + else + { + this->tail_ = new_item; + } + + current_item->next (new_item); + + // if the new item has lowest priority of any in the sublist, + // move the tail pointer of the sublist back to the new item + if (current_item == sublist_tail) + { + sublist_tail = new_item; + } } return result; } - // refresh the priorities in the queue according - // to a specific priority assignment function + // enqueue a message in priority order within a given priority status sublist + template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) { - // Remove messages that are later than the priority range can represent - int result = remove_stale_messages (tv); - if (result < 0) + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); + + int result = 0; + int last_in_subqueue = 0; + + // first, try to dequeue from the head of the pending list + if (this->pending_head_) { - return result; + first_item = this->pending_head_; + + if (0 == this->pending_head_->prev ()) + { + this->head_ = this->pending_head_->next (); + } + else + { + this->pending_head_->prev ()->next (this->pending_head_->next ()); + } + + if (0 == this->pending_head_->next ()) + { + this->tail_ = this->pending_head_->prev (); + this->pending_head_ = 0; + this->pending_tail_ = 0; + } + else + { + this->pending_head_->next ()->prev (this->pending_head_->prev ()); + this->pending_head_ = this->pending_head_->next (); + } + + first_item->prev (0); + first_item->next (0); + } + // second, try to dequeue from the head of the late list + else if (this->late_head_) + { + last_in_subqueue = + (this->late_head_ == this->late_tail_) ? 1 : 0; + + first_item = this->late_head_; + + if (0 == this->late_head_->prev ()) + { + this->head_ = this->late_head_->next (); + } + else + { + this->late_head_->prev ()->next (this->late_head_->next ()); + } + + if (0 == this->late_head_->next ()) + { + this->tail_ = this->late_head_->prev (); + } + else + { + this->late_head_->next ()->prev (this->late_head_->prev ()); + this->late_head_ = this->late_head_->next (); + } + + if (last_in_subqueue) + { + this->late_head_ = 0; + this->late_tail_ = 0; + } + + first_item->prev (0); + first_item->next (0); } + // finally, try to dequeue from the head of the beyond late list + else if (this->beyond_late_head_) + { + last_in_subqueue = + (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0; + + first_item = this->beyond_late_head_; + this->head_ = this->beyond_late_head_->next (); + + if (0 == this->beyond_late_head_->next ()) + { + this->tail_ = this->beyond_late_head_->prev (); + } + else + { + this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ()); + this->beyond_late_head_ = this->beyond_late_head_->next (); + } + + if (last_in_subqueue) + { + this->beyond_late_head_ = 0; + this->beyond_late_tail_ = 0; + } - // Refresh the order of messages in the queue, - // putting pending messages ahead of late messages - return reorder_queue (tv); + first_item->prev (0); + first_item->next (0); + } + else + { + // nothing to dequeue: set the pointer to zero and return an error code + first_item = 0; + result = -1; + } + + return result; } - // refresh the order of messages in the queue - // after refreshing their priorities + // Dequeue and return the <ACE_Message_Block *> at the head of the + // logical queue. Attempts first to dequeue from the pending + // portion of the queue, or if that is empty from the late portion, + // or if that is empty from the beyond late portion, or if that is + // empty just sets the passed pointer to zero and returns -1. + template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_stale_messages (const ACE_Time_Value & tv) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value ¤t_time) { - int result = 0; + int result; + + result = refresh_pending_queue (current_time); + + if (result != -1) + { + result = refresh_late_queue (current_time); + } - // start at the beginning of the list - ACE_Message_Block *current = head_; + return result; +} + // Refresh the queue using the strategy + // specific priority status function. - // maintain a list of dropped messages to - // be appended to the end of the list after - // the sweep is complete - ACE_Message_Block *append_list_head = 0; - ACE_Message_Block *append_list_tail = 0; - while (current) +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value ¤t_time) +{ + ACE_Dynamic_Message_Strategy::Priority_Status current_status; + + // refresh priority status boundaries in the queue + if (this->pending_head_) { - // messages that have overflowed the given time bounds must be removed - if (message_strategy_.is_beyond_late (*current, tv)) + current_status = message_strategy_.priority_status (*this->pending_head_, current_time); + switch (current_status) { - // find the end of the chain of overflowed messages - ACE_Message_Block *remove_head = current; - ACE_Message_Block *remove_tail = current; - while ((remove_tail) && (remove_tail->next ()) && - message_strategy_.is_beyond_late (*(remove_tail->next ()), tv)) - { - // extend the chain of messages to be removed - remove_tail = remove_tail->next (); - } + case ACE_Dynamic_Message_Strategy::BEYOND_LATE: - // fix up list pointers to bypass the overflowed message chain + // make sure the head of the beyond late queue is set + // (there may not have been any beyond late messages previously) + this->beyond_late_head_ = this->head_; - if (remove_tail->next ()) - { - remove_tail->next ()->prev (remove_head->prev ()); - } - else - { - tail_ = remove_head->prev (); - } + // zero out the late queue pointers, and set them only if + // there turn out to be late messages in the pending sublist + this->late_head_ = 0; + this->late_tail_ = 0; - if (remove_head->prev ()) - { - remove_head->prev ()->next (remove_tail->next ()); - } - else - { - head_ = remove_tail->next (); - } + // advance through the beyond late messages in the pending queue + do + { + this->pending_head_ = this->pending_head_->next (); + + if (this->pending_head_) + { + current_status = message_strategy_.priority_status (*this->pending_head_, + current_time); + } + else + { + break; // do while + } - // move the current pointer past the end of the chain - current = remove_tail->next (); - - // Cut the chain of overflowed messages out of the list - remove_head->prev (0); - remove_tail->next (0); - - // Call strategy's drop_message method on each overflowed message. - // Cannot just delete each message even though reference counting - // at the data bloc level means that the underlying data block will - // not be deleted if another message block is still pointing to it. - // If the entire set of message blocks is known in advance, they may - // be allocated on the stack instead of the heap (to speed performance), - // and the caller *cannot* surrender ownership of the memory to the - // list. Putting this policy in the strategy allows the correct memory - // management scheme to be configured in either case. - ACE_Message_Block *temp1 = remove_head; - ACE_Message_Block *temp2 = remove_head->next (); - ACE_Message_Block *size_temp; - size_t msg_size; - while (temp1) - { - // Make sure to count *all* the bytes in a composite message!!! - for (size_temp = temp1, msg_size = 0; - size_temp != 0; - size_temp = size_temp->cont ()) + } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); + + if (this->pending_head_) { - msg_size += size_temp->size (); + // point tail of beyond late sublist to previous item + this->beyond_late_tail_ = this->pending_head_->prev (); + + if (current_status == ACE_Dynamic_Message_Strategy::PENDING) + { + // there are no late messages left in the queue + break; // switch + } + else + { + if (current_status != ACE_Dynamic_Message_Strategy::LATE) + { + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"), + (int) current_status), + -1); + } + + // intentionally fall through to the next case + } + } + else + { + // there are no pending or late messages left in the queue + this->beyond_late_tail_ = this->tail_; + this->pending_head_ = 0; + this->pending_tail_ = 0; + + break; // switch } - result = message_strategy_.drop_message (temp1); - if (result < 0) + case ACE_Dynamic_Message_Strategy::LATE: + + // make sure the head of the late queue is set (there may not have been + // any late messages previously, or they may have all become beyond late) + if (this->late_head_ == 0) { - return result; + this->late_head_ = this->pending_head_; } - if (temp1) + // advance through the beyond late messages in the pending queue + do { - // if the message was not destroyed, zero out its priority and - // put it on the list to append to the back of the queue - temp1->msg_priority (0); - temp1->next (0); - if (append_list_tail) + this->pending_head_ = this->pending_head_->next (); + + if (this->pending_head_) { - temp1->prev (append_list_tail); - append_list_tail->next (temp1); + current_status = message_strategy_.priority_status (*this->pending_head_, + current_time); } else { - temp1->prev (0); - append_list_head = temp1; + break; // do while + } + + } while (current_status == ACE_Dynamic_Message_Strategy::LATE); + + if (this->pending_head_) + { + if (current_status != ACE_Dynamic_Message_Strategy::PENDING) + { + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unexpected message priority status [%d] (expected PENDING)"), + (int) current_status), + -1); } - append_list_tail = temp1; + + // point tail of late sublist to previous item + this->late_tail_ = this->pending_head_->prev (); } else { - // if the message was destroyed, decrease the message - // count and byte count in the message queue - this->cur_count_--; - this->cur_bytes_ -= msg_size; + // there are no pending messages left in the queue + this->late_tail_ = this->tail_; + this->pending_head_ = 0; + this->pending_tail_ = 0; } - temp1 = temp2; - temp2 = temp2 ? temp2->next () : temp2; - } - } - else - { - current = current->next (); - } - } + break; // switch - // append any saved dropped messages to the end of the queue - if (append_list_tail) - { - if (tail_) - { - tail_->next (append_list_head); - append_list_head->prev (tail_); - tail_ = append_list_tail; - } - else - { - head_ = append_list_head; - tail_ = append_list_tail; + case ACE_Dynamic_Message_Strategy::PENDING: + + // do nothing - the pending queue is unchanged + + break; // switch + + default: + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unknown message priority status [%d]"), + (int) current_status), + -1); + break; // switch } } - return result; + return 0; } - // Remove messages that are later than the priority range can represent. + // Refresh the pending queue using the strategy + // specific priority status function. template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::reorder_queue (const ACE_Time_Value & tv) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value ¤t_time) { - // if the queue is not empty, and the first message is late, need to reorder - if ((head_) && (! message_strategy_.is_pending (*head_, tv))) + ACE_Dynamic_Message_Strategy::Priority_Status current_status; + + if (this->late_head_) { - // find the end of the chain of newly late messages - // (since the last time the queue was reordered) - ACE_Message_Block *reorder_head = head_; - ACE_Message_Block *reorder_tail = head_; - while ((reorder_tail) && (reorder_tail->next ()) && - reorder_tail->next ()->msg_priority () <= reorder_head->msg_priority ()) + current_status = message_strategy_.priority_status (*this->late_head_, + current_time); + switch (current_status) { - // extend the chain of messages to be removed - reorder_tail = reorder_tail->next (); - } + case ACE_Dynamic_Message_Strategy::BEYOND_LATE: - // if a proper subset of the queue is out of order, reorganize the queue - if (reorder_tail != tail_) - { - // fix up list pointers to bypass the overflowed message chain - if (reorder_tail->next ()) - { - reorder_tail->next ()->prev (reorder_head->prev ()); - } - else - { - tail_ = reorder_head->prev (); - } - if (reorder_head->prev ()) - { - reorder_head->prev ()->next (reorder_tail->next ()); - } - else - { - head_ = reorder_tail->next (); - } + // make sure the head of the beyond late queue is set + // (there may not have been any beyond late messages previously) + this->beyond_late_head_ = this->head_; + + // advance through the beyond late messages in the late queue + do + { + this->late_head_ = this->late_head_->next (); + + if (this->late_head_) + { + current_status = message_strategy_.priority_status (*this->late_head_, + current_time); + } + else + { + break; // do while + } + + } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); + + if (this->late_head_) + { + // point tail of beyond late sublist to previous item + this->beyond_late_tail_ = this->late_head_->prev (); + + if (current_status == ACE_Dynamic_Message_Strategy::PENDING) + { + // there are no late messages left in the queue + this->late_head_ = 0; + this->late_tail_ = 0; + } + else if (current_status != ACE_Dynamic_Message_Strategy::LATE) + { + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"), + (int) current_status), + -1); + } + } + else + { + // there are no late messages left in the queue + this->beyond_late_tail_ = this->tail_; + this->late_head_ = 0; + this->late_tail_ = 0; + } + + break; // switch + + case ACE_Dynamic_Message_Strategy::LATE: + + // do nothing - the late queue is unchanged + + break; // switch + + case ACE_Dynamic_Message_Strategy::PENDING: + + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unexpected message priority status " + "[%d] (expected LATE or BEYOND_LATE)"), + (int) current_status), + -1); + + break; // switch + + default: + + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unknown message priority status [%d]"), + (int) current_status), + -1); + + break; // switch } - } + } return 0; } - // Refresh the order of messages in the queue. + // Refresh the late queue using the strategy + // specific priority status function. template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head ( ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { - return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, tv); + return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, timeout); } // private method to hide public base class method: just calls base class method @@ -1124,7 +1606,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t ACE_Notification_Strategy *ns, u_long static_bit_field_mask, u_long static_bit_field_shift, - u_long pending_threshold, u_long dynamic_priority_max, u_long dynamic_priority_offset) { @@ -1133,7 +1614,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t ACE_NEW_RETURN (adms, ACE_Deadline_Message_Strategy (static_bit_field_mask, static_bit_field_shift, - pending_threshold, dynamic_priority_max, dynamic_priority_offset), 0); @@ -1142,32 +1622,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t } // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue -template <ACE_SYNCH_DECL> -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * -ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_cleanup_message_queue (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns, - u_long static_bit_field_mask, - u_long static_bit_field_shift, - u_long pending_threshold, - u_long dynamic_priority_max, - u_long dynamic_priority_offset) -{ - ACE_Deadline_Cleanup_Message_Strategy *adcms; - - ACE_NEW_RETURN (adcms, - ACE_Deadline_Cleanup_Message_Strategy (static_bit_field_mask, - static_bit_field_shift, - pending_threshold, - dynamic_priority_max, - dynamic_priority_offset), - 0); - - return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adcms, hwm, lwm, ns); -} - // factory method for a dynamically prioritized (by time to deadline) - // ACE_Dynamic_Message_Queue, with automatic cleanup of beyond late messages - template <ACE_SYNCH_DECL> ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * @@ -1176,7 +1630,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw ACE_Notification_Strategy *ns, u_long static_bit_field_mask, u_long static_bit_field_shift, - u_long pending_threshold, u_long dynamic_priority_max, u_long dynamic_priority_offset) { @@ -1185,7 +1638,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw ACE_NEW_RETURN (alms, ACE_Laxity_Message_Strategy (static_bit_field_mask, static_bit_field_shift, - pending_threshold, dynamic_priority_max, dynamic_priority_offset), 0); @@ -1196,30 +1648,19 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue -template <ACE_SYNCH_DECL> -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * -ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_cleanup_message_queue (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns, - u_long static_bit_field_mask, - u_long static_bit_field_shift, - u_long pending_threshold, - u_long dynamic_priority_max, - u_long dynamic_priority_offset) -{ - ACE_Laxity_Message_Strategy *alcms; - - ACE_NEW_RETURN (alcms, - ACE_Laxity_Cleanup_Message_Strategy (static_bit_field_mask, - static_bit_field_shift, - pending_threshold, - dynamic_priority_max, - dynamic_priority_offset), - 0); +#if defined (VXWORKS) - return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alcms, hwm, lwm, ns); +ACE_Message_Queue_Vx * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages, + size_t max_message_length, + ACE_Notification_Strategy *ns) +{ + return new ACE_Message_Queue_Vx (max_messages, max_message_length, ns); } - // factory method for a dynamically prioritized (by laxity) - // ACE_Dynamic_Message_Queue, with automatic cleanup of beyond late messages + // factory method for a wrapped VxWorks message queue + +#endif /* defined (VXWORKS) */ + + #endif /* ACE_MESSAGE_QUEUE_T_C */ |