summaryrefslogtreecommitdiff
path: root/ace/Message_Queue_T.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ace/Message_Queue_T.cpp')
-rw-r--r--ace/Message_Queue_T.cpp587
1 files changed, 332 insertions, 255 deletions
diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp
index f1255f597a6..fa4bd8951d2 100644
--- a/ace/Message_Queue_T.cpp
+++ b/ace/Message_Queue_T.cpp
@@ -24,6 +24,11 @@ ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
+
+//////////////////////////////////////
+// class ACE_Message_Queue_Iterator //
+//////////////////////////////////////
+
template <ACE_SYNCH_DECL>
ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
: queue_ (q),
@@ -34,7 +39,7 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Messa
template <ACE_SYNCH_DECL> int
ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
{
- ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
if (this->curr_ != 0)
{
@@ -48,7 +53,7 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
template <ACE_SYNCH_DECL> int
ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
{
- ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
return this->curr_ == 0;
}
@@ -56,7 +61,7 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
template <ACE_SYNCH_DECL> int
ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
{
- ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
if (this->curr_)
this->curr_ = this->curr_->next ();
@@ -70,6 +75,11 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
+
+//////////////////////////////////////////////
+// class ACE_Message_Queue_Reverse_Iterator //
+//////////////////////////////////////////////
+
template <ACE_SYNCH_DECL>
ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
: queue_ (q),
@@ -80,7 +90,7 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Ite
template <ACE_SYNCH_DECL> int
ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
{
- ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
if (this->curr_ != 0)
{
@@ -94,7 +104,7 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&ent
template <ACE_SYNCH_DECL> int
ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
{
- ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
return this->curr_ == 0;
}
@@ -102,7 +112,7 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
template <ACE_SYNCH_DECL> int
ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
{
- ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1)
+ ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
if (this->curr_)
this->curr_ = this->curr_->prev ();
@@ -114,6 +124,11 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
{
}
+
+/////////////////////////////
+// class ACE_Message_Queue //
+/////////////////////////////
+
template <ACE_SYNCH_DECL> void
ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
{
@@ -236,9 +251,15 @@ ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
{
this->cur_count_--;
- this->cur_bytes_ -= this->head_->total_size ();
+ ACE_Message_Block *temp;
+
+ // Decrement all the counts.
+ for (temp = this->head_;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ -= temp->size ();
- ACE_Message_Block *temp = this->head_;
+ temp = this->head_;
this->head_ = this->head_->next ();
// Make sure to use <release> rather than <delete> since this is
@@ -310,8 +331,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
this->tail_ = new_item;
}
- // Make sure to count all the bytes in a composite message!!!
- this->cur_bytes_ += new_item->total_size ();
+ // Make sure to count *all* the bytes in a composite message!!!
+
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
this->cur_count_++;
@@ -341,8 +366,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
this->head_ = new_item;
- // Make sure to count all the bytes in a composite message!!!
- this->cur_bytes_ += new_item->total_size ();
+ // Make sure to count *all* the bytes in a composite message!!!
+
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
this->cur_count_++;
@@ -406,8 +435,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
}
}
- // Make sure to count all the bytes in a composite message!!!
- this->cur_bytes_ += new_item->total_size ();
+ // Make sure to count *all* the bytes in a composite message!!!
+
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
this->cur_count_++;
@@ -425,9 +458,10 @@ template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
{
if (this->head_ ==0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ASYS_TEXT ("Attempting to dequeue from empty queue")),
- -1);
+ {
+ ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Attempting to dequeue from empty queue")), -1);
+ }
+
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
first_item = this->head_;
this->head_ = this->head_->next ();
@@ -439,8 +473,12 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item
// NULL...
this->head_->prev (0);
- // Subtract off all of the bytes associated with this message.
- this->cur_bytes_ -= first_item->total_size ();
+ // Make sure to subtract off all of the bytes associated with this
+ // message.
+ for (ACE_Message_Block *temp = first_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ -= temp->size ();
this->cur_count_--;
@@ -736,9 +774,10 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
// dtor: free message strategy and let base class dtor do the rest
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head,
- ACE_Message_Block *&list_tail,
- u_int status_flags)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (
+ ACE_Message_Block *&list_head,
+ ACE_Message_Block *&list_tail,
+ u_int status_flags)
{
int result = 0;
@@ -753,121 +792,130 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&l
// refresh priority status boundaries in the queue
result = this->refresh_queue (current_time);
if (result < 0)
+ {
return result;
+ }
- if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::PENDING)
- && this->pending_head_
- && this->pending_tail_)
+ if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::PENDING) &&
+ (this->pending_head_) && (this->pending_tail_))
+ {
+ // patch up pointers for the new tail of the queue
+ if (this->pending_head_->prev ())
{
- // patch up pointers for the new tail of the queue
- if (this->pending_head_->prev ())
- {
- this->tail_ = this->pending_head_->prev ();
- this->pending_head_->prev ()->next (0);
- }
- else
- {
- // the list has become empty
- this->head_ = 0;
- this->tail_ = 0;
- }
-
- // point to the head and tail of the list
- list_head = this->pending_head_;
- list_tail = this->pending_tail_;
-
- // cut the pending messages out of the queue entirely
- this->pending_head_->prev (0);
- this->pending_head_ = 0;
- this->pending_tail_ = 0;
+ this->tail_ = this->pending_head_->prev ();
+ this->pending_head_->prev ()->next (0);
}
-
- if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::LATE)
- && this->late_head_
- && this->late_tail_)
+ else
{
- // Patch up pointers for the (possibly) new head and tail of the
- // queue.
- if (this->late_tail_->next ())
- this->late_tail_->next ()->prev (this->late_head_->prev ());
- else
- this->tail_ = this->late_head_->prev ();
-
- if (this->late_head_->prev ())
- this->late_head_->prev ()->next (this->late_tail_->next ());
- else
- this->head_ = this->late_tail_->next ();
-
- // put late messages behind pending messages (if any) being returned
- this->late_head_->prev (list_tail);
- if (list_tail)
- list_tail->next (this->late_head_);
- else
- list_head = this->late_head_;
-
- list_tail = this->late_tail_;
-
- this->late_tail_->next (0);
- this->late_head_ = 0;
- this->late_tail_ = 0;
+ // the list has become empty
+ this->head_ = 0;
+ this->tail_ = 0;
}
- if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE)
- && this->beyond_late_head_
- && this->beyond_late_tail_)
+ // point to the head and tail of the list
+ list_head = this->pending_head_;
+ list_tail = this->pending_tail_;
+
+ // cut the pending messages out of the queue entirely
+ this->pending_head_->prev (0);
+ this->pending_head_ = 0;
+ this->pending_tail_ = 0;
+ }
+
+ if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::LATE) &&
+ (this->late_head_) && (this->late_tail_))
+ {
+ // patch up pointers for the (possibly) new head and tail of the queue
+ if (this->late_tail_->next ())
{
- // Patch up pointers for the new tail of the queue
- if (this->beyond_late_tail_->next ())
- {
- this->head_ = this->beyond_late_tail_->next ();
- this->beyond_late_tail_->next ()->prev (0);
- }
- else
- {
- // the list has become empty
- this->head_ = 0;
- this->tail_ = 0;
- }
-
- // Put beyond late messages at the end of the list being
- // returned.
- if (list_tail)
- {
- this->beyond_late_head_->prev (list_tail);
- list_tail->next (this->beyond_late_head_);
- }
- else
- list_head = this->beyond_late_head_;
+ this->late_tail_->next ()->prev (this->late_head_->prev ());
+ }
+ else
+ {
+ this->tail_ = this->late_head_->prev ();
+ }
+ if (this->late_head_->prev ())
+ {
+ this->late_head_->prev ()->next (this->late_tail_->next ());
+ }
+ else
+ {
+ this->head_ = this->late_tail_->next ();
+ }
- list_tail = this->beyond_late_tail_;
+ // put late messages behind pending messages (if any) being returned
+ this->late_head_->prev (list_tail);
+ if (list_tail)
+ {
+ list_tail->next (this->late_head_);
+ }
+ else
+ {
+ list_head = this->late_head_;
+ }
+ list_tail = this->late_tail_;
+
+ this->late_tail_->next (0);
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
+ }
+
+ if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) &&
+ (this->beyond_late_head_) && (this->beyond_late_tail_))
+ {
+ // patch up pointers for the new tail of the queue
+ if (this->beyond_late_tail_->next ())
+ {
+ this->head_ = this->beyond_late_tail_->next ();
+ this->beyond_late_tail_->next ()->prev (0);
+ }
+ else
+ {
+ // the list has become empty
+ this->head_ = 0;
+ this->tail_ = 0;
+ }
- this->beyond_late_tail_->next (0);
- this->beyond_late_head_ = 0;
- this->beyond_late_tail_ = 0;
+ // put beyond late messages at the end of the list being returned
+ if (list_tail)
+ {
+ this->beyond_late_head_->prev (list_tail);
+ list_tail->next (this->beyond_late_head_);
+ }
+ else
+ {
+ list_head = this->beyond_late_head_;
}
+ list_tail = this->beyond_late_tail_;
- // Decrement message and size counts for removed messages.
- ACE_Message_Block *temp1;
+ this->beyond_late_tail_->next (0);
+ this->beyond_late_head_ = 0;
+ this->beyond_late_tail_ = 0;
+ }
- for (temp1 = list_head;
- temp1 != 0;
- temp1 = temp1->next ())
- {
- this->cur_count_--;
+ // decrement message and size counts for removed messages
+ ACE_Message_Block *temp1, *temp2;
+ for (temp1 = list_head; temp1 != 0; temp1 = temp1->next ())
+ {
+ this->cur_count_--;
- this->cur_bytes_ -= temp1->total_size ();
+ for (temp2 = temp1; temp2 != 0; temp2 = temp2->cont ())
+ {
+ this->cur_bytes_ -= temp2->size ();
}
+ }
return result;
}
+ // Detach all messages with status given in the passed flags from
+ // the queue and return them by setting passed head and tail pointers
+ // to the linked list they comprise. This method is intended primarily
+ // as a means of periodically harvesting messages that have missed
+ // their deadlines, but is available in its most general form. All
+ // messages are returned in priority order, from head to tail, as of
+ // the time this method was called.
+
-// Detach all messages with status given in the passed flags from the
-// queue and return them by setting passed head and tail pointers to
-// the linked list they comprise. This method is intended primarily
-// as a means of periodically harvesting messages that have missed
-// their deadlines, but is available in its most general form. All
-// messages are returned in priority order, from head to tail, as of
-// the time this method was called.
template <ACE_SYNCH_DECL> int
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
@@ -891,23 +939,26 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&firs
// refresh priority status boundaries in the queue
result = this->refresh_queue (current_time);
if (result < 0)
+ {
return result;
+ }
// *now* it's appropriate to wait for an enqueued item
result = this->wait_not_empty_cond (ace_mon, timeout);
if (result == -1)
+ {
return result;
+ }
- // call the internal dequeue method, which selects an item from the
- // highest priority status portion of the queue that has messages
- // enqueued.
- result = this->dequeue_head_i (first_item);
+ // call the internal dequeue method, which selects an
+ // item from the highest priority status portion of
+ // the queue that has messages enqueued.
+ result = dequeue_head_i (first_item);
return result;
}
-
-// Dequeue and return the <ACE_Message_Block *> at the (logical) head
-// of the queue.
+ // Dequeue and return the <ACE_Message_Block *>
+ // at the (logical) head of the queue.
template <ACE_SYNCH_DECL> void
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
@@ -945,121 +996,130 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item
ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
if (new_item == 0)
+ {
return -1;
+ }
int result = 0;
- // Get the current time.
+ // get the current time
ACE_Time_Value current_time = ACE_OS::gettimeofday ();
- // Refresh priority status boundaries in the queue.
-
+ // refresh priority status boundaries in the queue
result = this->refresh_queue (current_time);
if (result < 0)
+ {
return result;
+ }
- // Where we enqueue depends on the message's priority status.
- switch (message_strategy_.priority_status (*new_item,
- current_time))
- {
+ // where we enqueue depends on the message's priority status
+ switch (message_strategy_.priority_status (*new_item, current_time))
+ {
case ACE_Dynamic_Message_Strategy::PENDING:
if (this->pending_tail_ == 0)
- {
- // Check for simple case of an empty pending queue, where
- // all we need to do is insert <new_item> into the tail of
- // the queue.
- pending_head_ = new_item;
- pending_tail_ = pending_head_;
- return this->enqueue_tail_i (new_item);
- }
+ {
+ // Check for simple case of an empty pending queue, where all we need to
+ // do is insert <new_item> into the tail of the queue.
+ pending_head_ = new_item;
+ pending_tail_ = pending_head_;
+ return this->enqueue_tail_i (new_item);
+ }
else
- {
- // Enqueue the new message in priority order in the pending
- // sublist
- result = sublist_enqueue_i (new_item,
- current_time,
- this->pending_head_,
- this->pending_tail_,
- ACE_Dynamic_Message_Strategy::PENDING);
- }
+ {
+ // enqueue the new message in priority order in the pending sublist
+ result = sublist_enqueue_i (new_item,
+ current_time,
+ this->pending_head_,
+ this->pending_tail_,
+ ACE_Dynamic_Message_Strategy::PENDING);
+ }
+
break;
case ACE_Dynamic_Message_Strategy::LATE:
if (this->late_tail_ == 0)
+ {
+ late_head_ = new_item;
+ late_tail_ = late_head_;
+
+ if (this->pending_head_ == 0)
{
- late_head_ = new_item;
- late_tail_ = late_head_;
-
- if (this->pending_head_ == 0)
- // Check for simple case of an empty pending queue,
- // where all we need to do is insert <new_item> into the
- // tail of the queue.
- return this->enqueue_tail_i (new_item);
- else if (this->beyond_late_tail_ == 0)
- // Check for simple case of an empty beyond late queue, where all
- // we need to do is insert <new_item> into the head of the queue.
- return this->enqueue_head_i (new_item);
- else
- {
- // Otherwise, we can just splice the new message in
- // between the pending and beyond late portions of the
- // queue.
- this->beyond_late_tail_->next (new_item);
- new_item->prev (this->beyond_late_tail_);
- this->pending_head_->prev (new_item);
- new_item->next (this->pending_head_);
- }
+ // Check for simple case of an empty pending queue, where all
+ // we need to do is insert <new_item> into the tail of the queue.
+ return this->enqueue_tail_i (new_item);
}
- else
+ else if (this->beyond_late_tail_ == 0)
+ {
+ // Check for simple case of an empty beyond late queue, where all
+ // we need to do is insert <new_item> into the head of the queue.
+ return this->enqueue_head_i (new_item);
+ }
+ else
{
- // Enqueue the new message in priority order in the late
- // sublist
- result = sublist_enqueue_i (new_item,
- current_time,
- this->late_head_,
- this->late_tail_,
- ACE_Dynamic_Message_Strategy::LATE);
+ // otherwise, we can just splice the new message in between
+ // the pending and beyond late portions of the queue
+ this->beyond_late_tail_->next (new_item);
+ new_item->prev (this->beyond_late_tail_);
+ this->pending_head_->prev (new_item);
+ new_item->next (this->pending_head_);
}
+ }
+ else
+ {
+ // enqueue the new message in priority order in the late sublist
+ result = sublist_enqueue_i (new_item,
+ current_time,
+ this->late_head_,
+ this->late_tail_,
+ ACE_Dynamic_Message_Strategy::LATE);
+ }
break;
case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
if (this->beyond_late_tail_ == 0)
+ {
+ // Check for simple case of an empty beyond late queue, where all
+ // we need to do is insert <new_item> into the head of the queue.
+ beyond_late_head_ = new_item;
+ beyond_late_tail_ = beyond_late_head_;
+ return this->enqueue_head_i (new_item);
+ }
+ else
+ {
+ // all beyond late messages have the same (zero) priority, so
+ // just put the new one at the end of the beyond late messages
+ if (this->beyond_late_tail_->next ())
{
- // Check for simple case of an empty beyond late queue,
- // where all we need to do is insert <new_item> into the
- // head of the queue.
- beyond_late_head_ = new_item;
- beyond_late_tail_ = beyond_late_head_;
- return this->enqueue_head_i (new_item);
+ this->beyond_late_tail_->next ()->prev (new_item);
}
- else
+ else
{
- // all beyond late messages have the same (zero) priority,
- // so just put the new one at the end of the beyond late
- // messages
- if (this->beyond_late_tail_->next ())
- this->beyond_late_tail_->next ()->prev (new_item);
- else
- this->tail_ = new_item;
-
- new_item->next (this->beyond_late_tail_->next ());
- this->beyond_late_tail_->next (new_item);
- new_item->prev (this->beyond_late_tail_);
- this->beyond_late_tail_ = new_item;
+ this->tail_ = new_item;
}
+ new_item->next (this->beyond_late_tail_->next ());
+ this->beyond_late_tail_->next (new_item);
+ new_item->prev (this->beyond_late_tail_);
+ this->beyond_late_tail_ = new_item;
+ }
+
break;
- // should never get here, but just in case...
+ // should never get here, but just in case...
default:
result = -1;
break;
- }
+ }
if (result < 0)
+ {
return result;
+ }
- this->cur_bytes_ += new_item->total_size ();
+ for (ACE_Message_Block *temp = new_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ += temp->size ();
this->cur_count_++;
@@ -1068,12 +1128,12 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item
else
return this->cur_count_;
}
+ // Enqueue an <ACE_Message_Block *> in accordance with its priority.
+ // priority may be *dynamic* or *static* or a combination or *both*
+ // It calls the priority evaluation function passed into the Dynamic
+ // Message Queue constructor to update the priorities of all enqueued
+ // messages.
-// Enqueue an <ACE_Message_Block *> in accordance with its priority.
-// priority may be *dynamic* or *static* or a combination or *both* It
-// calls the priority evaluation function passed into the Dynamic
-// Message Queue constructor to update the priorities of all enqueued
-// messages.
template <ACE_SYNCH_DECL> int
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
@@ -1085,64 +1145,72 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *
int result = 0;
ACE_Message_Block *current_item = 0;
- // Find message after which to enqueue new item, based on message
- // priority and priority status.
+ // find message after which to enqueue new item,
+ // based on message priority and priority status
for (current_item = sublist_tail;
current_item;
current_item = current_item->prev ())
+ {
+ if (message_strategy_.priority_status (*current_item, current_time) == status)
{
- if (message_strategy_.priority_status (*current_item, current_time) == status)
- {
- if (current_item->msg_priority () >= new_item->msg_priority ())
- break;
- }
- else
- {
- sublist_head = new_item;
- break;
- }
+ if (current_item->msg_priority () >= new_item->msg_priority ())
+ {
+ break;
+ }
}
+ else
+ {
+ sublist_head = new_item;
+ break;
+ }
+ }
if (current_item == 0)
+ {
+ // if the new message has highest priority of any,
+ // put it at the head of the list (and sublist)
+ new_item->prev (0);
+ new_item->next (this->head_);
+ if (this->head_ != 0)
{
- // If the new message has highest priority of any, put it at the
- // head of the list (and sublist).
- new_item->prev (0);
- new_item->next (this->head_);
- if (this->head_ != 0)
- this->head_->prev (new_item);
- else
- {
- this->tail_ = new_item;
- sublist_tail = new_item;
- }
- this->head_ = new_item;
- sublist_head = new_item;
+ this->head_->prev (new_item);
+ }
+ else
+ {
+ this->tail_ = new_item;
+ sublist_tail = new_item;
}
+ this->head_ = new_item;
+ sublist_head = new_item;
+ }
else
+ {
+ // insert the new item into the list
+ new_item->next (current_item->next ());
+ new_item->prev (current_item);
+ if (current_item->next ())
{
- // insert the new item into the list
- new_item->next (current_item->next ());
- new_item->prev (current_item);
-
- if (current_item->next ())
- current_item->next ()->prev (new_item);
- else
- this->tail_ = new_item;
+ current_item->next ()->prev (new_item);
+ }
+ else
+ {
+ this->tail_ = new_item;
+ }
- current_item->next (new_item);
+ current_item->next (new_item);
- // If the new item has lowest priority of any in the sublist,
- // move the tail pointer of the sublist back to the new item
- if (current_item == sublist_tail)
- sublist_tail = new_item;
+ // if the new item has lowest priority of any in the sublist,
+ // move the tail pointer of the sublist back to the new item
+ if (current_item == sublist_tail)
+ {
+ sublist_tail = new_item;
}
+ }
return result;
}
+ // enqueue a message in priority order within a given priority status sublist
-// Enqueue a message in priority order within a given priority status
-// sublist.
template <ACE_SYNCH_DECL> int
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
@@ -1177,8 +1245,7 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&fi
first_item->prev (0);
first_item->next (0);
}
-
- // Second, try to dequeue from the head of the late list
+ // second, try to dequeue from the head of the late list
else if (this->late_head_)
{
last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0;
@@ -1245,7 +1312,10 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&fi
// Make sure to subtract off all of the bytes associated with this
// message.
- this->cur_bytes_ -= first_item->total_size ();
+ for (ACE_Message_Block *temp = first_item;
+ temp != 0;
+ temp = temp->cont ())
+ this->cur_bytes_ -= temp->size ();
this->cur_count_--;
@@ -1288,8 +1358,7 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_
// refresh priority status boundaries in the queue
if (this->pending_head_)
{
- current_status = message_strategy_.priority_status (*this->pending_head_,
- current_time);
+ current_status = message_strategy_.priority_status (*this->pending_head_, current_time);
switch (current_status)
{
case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
@@ -1397,6 +1466,7 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_
-1);
}
}
+
return 0;
}
@@ -1489,11 +1559,11 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Val
// function.
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *timeout)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (
+ ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout)
{
- return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item,
- timeout);
+ return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, timeout);
}
// Private method to hide public base class method: just calls base
@@ -1525,6 +1595,10 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_i
// where it was placed after the queue is refreshed prior to the next
// enqueue or dequeue operation.
+/////////////////////////////////////
+// class ACE_Message_Queue_Factory //
+/////////////////////////////////////
+
template <ACE_SYNCH_DECL>
ACE_Message_Queue<ACE_SYNCH_USE> *
ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
@@ -1632,4 +1706,7 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_th
#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
#endif /* defined (VXWORKS) */
+
+
+
#endif /* ACE_MESSAGE_QUEUE_T_C */