diff options
Diffstat (limited to 'ace/Message_Queue_T.cpp')
-rw-r--r-- | ace/Message_Queue_T.cpp | 587 |
1 files changed, 332 insertions, 255 deletions
diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp index f1255f597a6..fa4bd8951d2 100644 --- a/ace/Message_Queue_T.cpp +++ b/ace/Message_Queue_T.cpp @@ -24,6 +24,11 @@ ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue) ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue) + +////////////////////////////////////// +// class ACE_Message_Queue_Iterator // +////////////////////////////////////// + template <ACE_SYNCH_DECL> ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) : queue_ (q), @@ -34,7 +39,7 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Messa template <ACE_SYNCH_DECL> int ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry) { - ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); if (this->curr_ != 0) { @@ -48,7 +53,7 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry) template <ACE_SYNCH_DECL> int ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const { - ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); return this->curr_ == 0; } @@ -56,7 +61,7 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const template <ACE_SYNCH_DECL> int ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void) { - ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); if (this->curr_) this->curr_ = this->curr_->next (); @@ -70,6 +75,11 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator) + +////////////////////////////////////////////// +// class ACE_Message_Queue_Reverse_Iterator // +////////////////////////////////////////////// + template <ACE_SYNCH_DECL> ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) : queue_ (q), @@ -80,7 +90,7 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Ite template <ACE_SYNCH_DECL> int ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry) { - ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); if (this->curr_ != 0) { @@ -94,7 +104,7 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&ent template <ACE_SYNCH_DECL> int ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const { - ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); return this->curr_ == 0; } @@ -102,7 +112,7 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const template <ACE_SYNCH_DECL> int ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void) { - ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); if (this->curr_) this->curr_ = this->curr_->prev (); @@ -114,6 +124,11 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const { } + +///////////////////////////// +// class ACE_Message_Queue // +///////////////////////////// + template <ACE_SYNCH_DECL> void ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const { @@ -236,9 +251,15 @@ ACE_Message_Queue<ACE_SYNCH_USE>::close (void) { this->cur_count_--; - this->cur_bytes_ -= this->head_->total_size (); + ACE_Message_Block *temp; + + // Decrement all the counts. + for (temp = this->head_; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ -= temp->size (); - ACE_Message_Block *temp = this->head_; + temp = this->head_; this->head_ = this->head_->next (); // Make sure to use <release> rather than <delete> since this is @@ -310,8 +331,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item) this->tail_ = new_item; } - // Make sure to count all the bytes in a composite message!!! - this->cur_bytes_ += new_item->total_size (); + // Make sure to count *all* the bytes in a composite message!!! + + for (ACE_Message_Block *temp = new_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ += temp->size (); this->cur_count_++; @@ -341,8 +366,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item) this->head_ = new_item; - // Make sure to count all the bytes in a composite message!!! - this->cur_bytes_ += new_item->total_size (); + // Make sure to count *all* the bytes in a composite message!!! + + for (ACE_Message_Block *temp = new_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ += temp->size (); this->cur_count_++; @@ -406,8 +435,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) } } - // Make sure to count all the bytes in a composite message!!! - this->cur_bytes_ += new_item->total_size (); + // Make sure to count *all* the bytes in a composite message!!! + + for (ACE_Message_Block *temp = new_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ += temp->size (); this->cur_count_++; @@ -425,9 +458,10 @@ template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) { if (this->head_ ==0) - ACE_ERROR_RETURN ((LM_ERROR, - ASYS_TEXT ("Attempting to dequeue from empty queue")), - -1); + { + ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Attempting to dequeue from empty queue")), -1); + } + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); first_item = this->head_; this->head_ = this->head_->next (); @@ -439,8 +473,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item // NULL... this->head_->prev (0); - // Subtract off all of the bytes associated with this message. - this->cur_bytes_ -= first_item->total_size (); + // Make sure to subtract off all of the bytes associated with this + // message. + for (ACE_Message_Block *temp = first_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ -= temp->size (); this->cur_count_--; @@ -736,9 +774,10 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void) // dtor: free message strategy and let base class dtor do the rest template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head, - ACE_Message_Block *&list_tail, - u_int status_flags) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages ( + ACE_Message_Block *&list_head, + ACE_Message_Block *&list_tail, + u_int status_flags) { int result = 0; @@ -753,121 +792,130 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&l // refresh priority status boundaries in the queue result = this->refresh_queue (current_time); if (result < 0) + { return result; + } - if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::PENDING) - && this->pending_head_ - && this->pending_tail_) + if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::PENDING) && + (this->pending_head_) && (this->pending_tail_)) + { + // patch up pointers for the new tail of the queue + if (this->pending_head_->prev ()) { - // patch up pointers for the new tail of the queue - if (this->pending_head_->prev ()) - { - this->tail_ = this->pending_head_->prev (); - this->pending_head_->prev ()->next (0); - } - else - { - // the list has become empty - this->head_ = 0; - this->tail_ = 0; - } - - // point to the head and tail of the list - list_head = this->pending_head_; - list_tail = this->pending_tail_; - - // cut the pending messages out of the queue entirely - this->pending_head_->prev (0); - this->pending_head_ = 0; - this->pending_tail_ = 0; + this->tail_ = this->pending_head_->prev (); + this->pending_head_->prev ()->next (0); } - - if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::LATE) - && this->late_head_ - && this->late_tail_) + else { - // Patch up pointers for the (possibly) new head and tail of the - // queue. - if (this->late_tail_->next ()) - this->late_tail_->next ()->prev (this->late_head_->prev ()); - else - this->tail_ = this->late_head_->prev (); - - if (this->late_head_->prev ()) - this->late_head_->prev ()->next (this->late_tail_->next ()); - else - this->head_ = this->late_tail_->next (); - - // put late messages behind pending messages (if any) being returned - this->late_head_->prev (list_tail); - if (list_tail) - list_tail->next (this->late_head_); - else - list_head = this->late_head_; - - list_tail = this->late_tail_; - - this->late_tail_->next (0); - this->late_head_ = 0; - this->late_tail_ = 0; + // the list has become empty + this->head_ = 0; + this->tail_ = 0; } - if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) - && this->beyond_late_head_ - && this->beyond_late_tail_) + // point to the head and tail of the list + list_head = this->pending_head_; + list_tail = this->pending_tail_; + + // cut the pending messages out of the queue entirely + this->pending_head_->prev (0); + this->pending_head_ = 0; + this->pending_tail_ = 0; + } + + if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::LATE) && + (this->late_head_) && (this->late_tail_)) + { + // patch up pointers for the (possibly) new head and tail of the queue + if (this->late_tail_->next ()) { - // Patch up pointers for the new tail of the queue - if (this->beyond_late_tail_->next ()) - { - this->head_ = this->beyond_late_tail_->next (); - this->beyond_late_tail_->next ()->prev (0); - } - else - { - // the list has become empty - this->head_ = 0; - this->tail_ = 0; - } - - // Put beyond late messages at the end of the list being - // returned. - if (list_tail) - { - this->beyond_late_head_->prev (list_tail); - list_tail->next (this->beyond_late_head_); - } - else - list_head = this->beyond_late_head_; + this->late_tail_->next ()->prev (this->late_head_->prev ()); + } + else + { + this->tail_ = this->late_head_->prev (); + } + if (this->late_head_->prev ()) + { + this->late_head_->prev ()->next (this->late_tail_->next ()); + } + else + { + this->head_ = this->late_tail_->next (); + } - list_tail = this->beyond_late_tail_; + // put late messages behind pending messages (if any) being returned + this->late_head_->prev (list_tail); + if (list_tail) + { + list_tail->next (this->late_head_); + } + else + { + list_head = this->late_head_; + } + list_tail = this->late_tail_; + + this->late_tail_->next (0); + this->late_head_ = 0; + this->late_tail_ = 0; + } + + if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) && + (this->beyond_late_head_) && (this->beyond_late_tail_)) + { + // patch up pointers for the new tail of the queue + if (this->beyond_late_tail_->next ()) + { + this->head_ = this->beyond_late_tail_->next (); + this->beyond_late_tail_->next ()->prev (0); + } + else + { + // the list has become empty + this->head_ = 0; + this->tail_ = 0; + } - this->beyond_late_tail_->next (0); - this->beyond_late_head_ = 0; - this->beyond_late_tail_ = 0; + // put beyond late messages at the end of the list being returned + if (list_tail) + { + this->beyond_late_head_->prev (list_tail); + list_tail->next (this->beyond_late_head_); + } + else + { + list_head = this->beyond_late_head_; } + list_tail = this->beyond_late_tail_; - // Decrement message and size counts for removed messages. - ACE_Message_Block *temp1; + this->beyond_late_tail_->next (0); + this->beyond_late_head_ = 0; + this->beyond_late_tail_ = 0; + } - for (temp1 = list_head; - temp1 != 0; - temp1 = temp1->next ()) - { - this->cur_count_--; + // decrement message and size counts for removed messages + ACE_Message_Block *temp1, *temp2; + for (temp1 = list_head; temp1 != 0; temp1 = temp1->next ()) + { + this->cur_count_--; - this->cur_bytes_ -= temp1->total_size (); + for (temp2 = temp1; temp2 != 0; temp2 = temp2->cont ()) + { + this->cur_bytes_ -= temp2->size (); } + } return result; } + // Detach all messages with status given in the passed flags from + // the queue and return them by setting passed head and tail pointers + // to the linked list they comprise. This method is intended primarily + // as a means of periodically harvesting messages that have missed + // their deadlines, but is available in its most general form. All + // messages are returned in priority order, from head to tail, as of + // the time this method was called. + -// Detach all messages with status given in the passed flags from the -// queue and return them by setting passed head and tail pointers to -// the linked list they comprise. This method is intended primarily -// as a means of periodically harvesting messages that have missed -// their deadlines, but is available in its most general form. All -// messages are returned in priority order, from head to tail, as of -// the time this method was called. template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, @@ -891,23 +939,26 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&firs // refresh priority status boundaries in the queue result = this->refresh_queue (current_time); if (result < 0) + { return result; + } // *now* it's appropriate to wait for an enqueued item result = this->wait_not_empty_cond (ace_mon, timeout); if (result == -1) + { return result; + } - // call the internal dequeue method, which selects an item from the - // highest priority status portion of the queue that has messages - // enqueued. - result = this->dequeue_head_i (first_item); + // call the internal dequeue method, which selects an + // item from the highest priority status portion of + // the queue that has messages enqueued. + result = dequeue_head_i (first_item); return result; } - -// Dequeue and return the <ACE_Message_Block *> at the (logical) head -// of the queue. + // Dequeue and return the <ACE_Message_Block *> + // at the (logical) head of the queue. template <ACE_SYNCH_DECL> void ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const @@ -945,121 +996,130 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); if (new_item == 0) + { return -1; + } int result = 0; - // Get the current time. + // get the current time ACE_Time_Value current_time = ACE_OS::gettimeofday (); - // Refresh priority status boundaries in the queue. - + // refresh priority status boundaries in the queue result = this->refresh_queue (current_time); if (result < 0) + { return result; + } - // Where we enqueue depends on the message's priority status. - switch (message_strategy_.priority_status (*new_item, - current_time)) - { + // where we enqueue depends on the message's priority status + switch (message_strategy_.priority_status (*new_item, current_time)) + { case ACE_Dynamic_Message_Strategy::PENDING: if (this->pending_tail_ == 0) - { - // Check for simple case of an empty pending queue, where - // all we need to do is insert <new_item> into the tail of - // the queue. - pending_head_ = new_item; - pending_tail_ = pending_head_; - return this->enqueue_tail_i (new_item); - } + { + // Check for simple case of an empty pending queue, where all we need to + // do is insert <new_item> into the tail of the queue. + pending_head_ = new_item; + pending_tail_ = pending_head_; + return this->enqueue_tail_i (new_item); + } else - { - // Enqueue the new message in priority order in the pending - // sublist - result = sublist_enqueue_i (new_item, - current_time, - this->pending_head_, - this->pending_tail_, - ACE_Dynamic_Message_Strategy::PENDING); - } + { + // enqueue the new message in priority order in the pending sublist + result = sublist_enqueue_i (new_item, + current_time, + this->pending_head_, + this->pending_tail_, + ACE_Dynamic_Message_Strategy::PENDING); + } + break; case ACE_Dynamic_Message_Strategy::LATE: if (this->late_tail_ == 0) + { + late_head_ = new_item; + late_tail_ = late_head_; + + if (this->pending_head_ == 0) { - late_head_ = new_item; - late_tail_ = late_head_; - - if (this->pending_head_ == 0) - // Check for simple case of an empty pending queue, - // where all we need to do is insert <new_item> into the - // tail of the queue. - return this->enqueue_tail_i (new_item); - else if (this->beyond_late_tail_ == 0) - // Check for simple case of an empty beyond late queue, where all - // we need to do is insert <new_item> into the head of the queue. - return this->enqueue_head_i (new_item); - else - { - // Otherwise, we can just splice the new message in - // between the pending and beyond late portions of the - // queue. - this->beyond_late_tail_->next (new_item); - new_item->prev (this->beyond_late_tail_); - this->pending_head_->prev (new_item); - new_item->next (this->pending_head_); - } + // Check for simple case of an empty pending queue, where all + // we need to do is insert <new_item> into the tail of the queue. + return this->enqueue_tail_i (new_item); } - else + else if (this->beyond_late_tail_ == 0) + { + // Check for simple case of an empty beyond late queue, where all + // we need to do is insert <new_item> into the head of the queue. + return this->enqueue_head_i (new_item); + } + else { - // Enqueue the new message in priority order in the late - // sublist - result = sublist_enqueue_i (new_item, - current_time, - this->late_head_, - this->late_tail_, - ACE_Dynamic_Message_Strategy::LATE); + // otherwise, we can just splice the new message in between + // the pending and beyond late portions of the queue + this->beyond_late_tail_->next (new_item); + new_item->prev (this->beyond_late_tail_); + this->pending_head_->prev (new_item); + new_item->next (this->pending_head_); } + } + else + { + // enqueue the new message in priority order in the late sublist + result = sublist_enqueue_i (new_item, + current_time, + this->late_head_, + this->late_tail_, + ACE_Dynamic_Message_Strategy::LATE); + } break; case ACE_Dynamic_Message_Strategy::BEYOND_LATE: if (this->beyond_late_tail_ == 0) + { + // Check for simple case of an empty beyond late queue, where all + // we need to do is insert <new_item> into the head of the queue. + beyond_late_head_ = new_item; + beyond_late_tail_ = beyond_late_head_; + return this->enqueue_head_i (new_item); + } + else + { + // all beyond late messages have the same (zero) priority, so + // just put the new one at the end of the beyond late messages + if (this->beyond_late_tail_->next ()) { - // Check for simple case of an empty beyond late queue, - // where all we need to do is insert <new_item> into the - // head of the queue. - beyond_late_head_ = new_item; - beyond_late_tail_ = beyond_late_head_; - return this->enqueue_head_i (new_item); + this->beyond_late_tail_->next ()->prev (new_item); } - else + else { - // all beyond late messages have the same (zero) priority, - // so just put the new one at the end of the beyond late - // messages - if (this->beyond_late_tail_->next ()) - this->beyond_late_tail_->next ()->prev (new_item); - else - this->tail_ = new_item; - - new_item->next (this->beyond_late_tail_->next ()); - this->beyond_late_tail_->next (new_item); - new_item->prev (this->beyond_late_tail_); - this->beyond_late_tail_ = new_item; + this->tail_ = new_item; } + new_item->next (this->beyond_late_tail_->next ()); + this->beyond_late_tail_->next (new_item); + new_item->prev (this->beyond_late_tail_); + this->beyond_late_tail_ = new_item; + } + break; - // should never get here, but just in case... + // should never get here, but just in case... default: result = -1; break; - } + } if (result < 0) + { return result; + } - this->cur_bytes_ += new_item->total_size (); + for (ACE_Message_Block *temp = new_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ += temp->size (); this->cur_count_++; @@ -1068,12 +1128,12 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item else return this->cur_count_; } + // Enqueue an <ACE_Message_Block *> in accordance with its priority. + // priority may be *dynamic* or *static* or a combination or *both* + // It calls the priority evaluation function passed into the Dynamic + // Message Queue constructor to update the priorities of all enqueued + // messages. -// Enqueue an <ACE_Message_Block *> in accordance with its priority. -// priority may be *dynamic* or *static* or a combination or *both* It -// calls the priority evaluation function passed into the Dynamic -// Message Queue constructor to update the priorities of all enqueued -// messages. template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item, @@ -1085,64 +1145,72 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block * int result = 0; ACE_Message_Block *current_item = 0; - // Find message after which to enqueue new item, based on message - // priority and priority status. + // find message after which to enqueue new item, + // based on message priority and priority status for (current_item = sublist_tail; current_item; current_item = current_item->prev ()) + { + if (message_strategy_.priority_status (*current_item, current_time) == status) { - if (message_strategy_.priority_status (*current_item, current_time) == status) - { - if (current_item->msg_priority () >= new_item->msg_priority ()) - break; - } - else - { - sublist_head = new_item; - break; - } + if (current_item->msg_priority () >= new_item->msg_priority ()) + { + break; + } } + else + { + sublist_head = new_item; + break; + } + } if (current_item == 0) + { + // if the new message has highest priority of any, + // put it at the head of the list (and sublist) + new_item->prev (0); + new_item->next (this->head_); + if (this->head_ != 0) { - // If the new message has highest priority of any, put it at the - // head of the list (and sublist). - new_item->prev (0); - new_item->next (this->head_); - if (this->head_ != 0) - this->head_->prev (new_item); - else - { - this->tail_ = new_item; - sublist_tail = new_item; - } - this->head_ = new_item; - sublist_head = new_item; + this->head_->prev (new_item); + } + else + { + this->tail_ = new_item; + sublist_tail = new_item; } + this->head_ = new_item; + sublist_head = new_item; + } else + { + // insert the new item into the list + new_item->next (current_item->next ()); + new_item->prev (current_item); + if (current_item->next ()) { - // insert the new item into the list - new_item->next (current_item->next ()); - new_item->prev (current_item); - - if (current_item->next ()) - current_item->next ()->prev (new_item); - else - this->tail_ = new_item; + current_item->next ()->prev (new_item); + } + else + { + this->tail_ = new_item; + } - current_item->next (new_item); + current_item->next (new_item); - // If the new item has lowest priority of any in the sublist, - // move the tail pointer of the sublist back to the new item - if (current_item == sublist_tail) - sublist_tail = new_item; + // if the new item has lowest priority of any in the sublist, + // move the tail pointer of the sublist back to the new item + if (current_item == sublist_tail) + { + sublist_tail = new_item; } + } return result; } + // enqueue a message in priority order within a given priority status sublist -// Enqueue a message in priority order within a given priority status -// sublist. template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) @@ -1177,8 +1245,7 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&fi first_item->prev (0); first_item->next (0); } - - // Second, try to dequeue from the head of the late list + // second, try to dequeue from the head of the late list else if (this->late_head_) { last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0; @@ -1245,7 +1312,10 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&fi // Make sure to subtract off all of the bytes associated with this // message. - this->cur_bytes_ -= first_item->total_size (); + for (ACE_Message_Block *temp = first_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ -= temp->size (); this->cur_count_--; @@ -1288,8 +1358,7 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_ // refresh priority status boundaries in the queue if (this->pending_head_) { - current_status = message_strategy_.priority_status (*this->pending_head_, - current_time); + current_status = message_strategy_.priority_status (*this->pending_head_, current_time); switch (current_status) { case ACE_Dynamic_Message_Strategy::BEYOND_LATE: @@ -1397,6 +1466,7 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_ -1); } } + return 0; } @@ -1489,11 +1559,11 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Val // function. template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *timeout) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head ( + ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) { - return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, - timeout); + return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, timeout); } // Private method to hide public base class method: just calls base @@ -1525,6 +1595,10 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_i // where it was placed after the queue is refreshed prior to the next // enqueue or dequeue operation. +///////////////////////////////////// +// class ACE_Message_Queue_Factory // +///////////////////////////////////// + template <ACE_SYNCH_DECL> ACE_Message_Queue<ACE_SYNCH_USE> * ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm, @@ -1632,4 +1706,7 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_th #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ #endif /* defined (VXWORKS) */ + + + #endif /* ACE_MESSAGE_QUEUE_T_C */ |