// $Id$ #ifndef ACE_MESSAGE_QUEUE_T_C #define ACE_MESSAGE_QUEUE_T_C #define ACE_BUILD_DLL // #include Message_Queue.h instead of Message_Queue_T.h to avoid // circular include problems. #include "ace/Message_Queue.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ #if !defined (__ACE_INLINE__) #include "ace/Message_Queue_T.i" #endif /* __ACE_INLINE__ */ #include "ace/Strategies.h" // Need ACE_Notification_Strategy ACE_RCSID(ace, Message_Queue_T, "$Id$") ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue) ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue) template ACE_Message_Queue_Iterator::ACE_Message_Queue_Iterator (ACE_Message_Queue &q) : queue_ (q), curr_ (q.head_) { } template int ACE_Message_Queue_Iterator::next (ACE_Message_Block *&entry) { ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) if (this->curr_ != 0) { entry = this->curr_; return 1; } else return 0; } template int ACE_Message_Queue_Iterator::done (void) const { ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) return this->curr_ == 0; } template int ACE_Message_Queue_Iterator::advance (void) { ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) if (this->curr_) this->curr_ = this->curr_->next (); return this->curr_ != 0; } template void ACE_Message_Queue_Iterator::dump (void) const { } ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator) template ACE_Message_Queue_Reverse_Iterator::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue &q) : queue_ (q), curr_ (queue_.tail_) { } template int ACE_Message_Queue_Reverse_Iterator::next (ACE_Message_Block *&entry) { ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) if (this->curr_ != 0) { entry = this->curr_; return 1; } else return 0; } template int ACE_Message_Queue_Reverse_Iterator::done (void) const { ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) return this->curr_ == 0; } template int ACE_Message_Queue_Reverse_Iterator::advance (void) { ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) if (this->curr_) this->curr_ = this->curr_->prev (); return this->curr_ != 0; } template void ACE_Message_Queue_Reverse_Iterator::dump (void) const { } template void ACE_Message_Queue::dump (void) const { ACE_TRACE ("ACE_Message_Queue::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("deactivated = %d\n") ASYS_TEXT ("low_water_mark = %d\n") ASYS_TEXT ("high_water_mark = %d\n") ASYS_TEXT ("cur_bytes = %d\n") ASYS_TEXT ("cur_count = %d\n") ASYS_TEXT ("head_ = %u\n") ASYS_TEXT ("tail_ = %u\n"), this->deactivated_, this->low_water_mark_, this->high_water_mark_, this->cur_bytes_, this->cur_count_, this->head_, this->tail_)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_full_cond: \n"))); not_full_cond_.dump (); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_empty_cond: \n"))); not_empty_cond_.dump (); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } template ACE_Message_Queue::ACE_Message_Queue (size_t hwm, size_t lwm, ACE_Notification_Strategy *ns) #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) : not_empty_cond_ (0), not_full_cond_ (0), enqueue_waiters_ (0), dequeue_waiters_ (0) #else : not_empty_cond_ (this->lock_), not_full_cond_ (this->lock_) #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ { ACE_TRACE ("ACE_Message_Queue::ACE_Message_Queue"); if (this->open (hwm, lwm, ns) == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("open"))); } template ACE_Message_Queue::~ACE_Message_Queue (void) { ACE_TRACE ("ACE_Message_Queue::~ACE_Message_Queue"); if (this->head_ != 0 && this->close () == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("close"))); } // Don't bother locking since if someone calls this function more than // once for the same queue, we're in bigger trouble than just // concurrency control! template int ACE_Message_Queue::open (size_t hwm, size_t lwm, ACE_Notification_Strategy *ns) { ACE_TRACE ("ACE_Message_Queue::open"); this->high_water_mark_ = hwm; this->low_water_mark_ = lwm; this->deactivated_ = 0; this->cur_bytes_ = 0; this->cur_count_ = 0; this->tail_ = 0; this->head_ = 0; this->notification_strategy_ = ns; return 0; } // Implementation of the public deactivate() method // (assumes locks are held). template int ACE_Message_Queue::deactivate_i (void) { ACE_TRACE ("ACE_Message_Queue::deactivate_i"); int current_status = this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; // Wakeup all waiters. #if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) this->not_empty_cond_.broadcast (); this->not_full_cond_.broadcast (); #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ this->deactivated_ = 1; return current_status; } template int ACE_Message_Queue::activate_i (void) { ACE_TRACE ("ACE_Message_Queue::activate_i"); int current_status = this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; this->deactivated_ = 0; return current_status; } // Clean up the queue if we have not already done so! template int ACE_Message_Queue::close (void) { ACE_TRACE ("ACE_Message_Queue::close"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); int res = this->deactivate_i (); // Free up the remaining message on the list for (this->tail_ = 0; this->head_ != 0; ) { this->cur_count_--; this->cur_bytes_ -= this->head_->total_size (); ACE_Message_Block *temp = this->head_; this->head_ = this->head_->next (); // Make sure to use rather than since this is // reference counted. temp->release (); } return res; } template int ACE_Message_Queue::signal_enqueue_waiters (void) { #if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) if (this->not_full_cond_.signal () != 0) return -1; #else if (this->enqueue_waiters_ > 0) { --this->enqueue_waiters_; return this->not_full_cond_.release (); } #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return 0; } template int ACE_Message_Queue::signal_dequeue_waiters (void) { #if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) // Tell any blocked threads that the queue has a new item! if (this->not_empty_cond_.signal () != 0) return -1; #else if (this->dequeue_waiters_ > 0) { --this->dequeue_waiters_; return this->not_empty_cond_.release (); } #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return 0; } // Actually put the node at the end (no locking so must be called with // locks held). template int ACE_Message_Queue::enqueue_tail_i (ACE_Message_Block *new_item) { ACE_TRACE ("ACE_Message_Queue::enqueue_tail_i"); if (new_item == 0) return -1; // List was empty, so build a new one. if (this->tail_ == 0) { this->head_ = new_item; this->tail_ = new_item; new_item->next (0); new_item->prev (0); } // Link at the end. else { new_item->next (0); this->tail_->next (new_item); new_item->prev (this->tail_); this->tail_ = new_item; } // Make sure to count all the bytes in a composite message!!! this->cur_bytes_ += new_item->total_size (); this->cur_count_++; if (this->signal_dequeue_waiters () == -1) return -1; else return this->cur_count_; } // Actually put the node at the head (no locking) template int ACE_Message_Queue::enqueue_head_i (ACE_Message_Block *new_item) { ACE_TRACE ("ACE_Message_Queue::enqueue_head_i"); if (new_item == 0) return -1; new_item->prev (0); new_item->next (this->head_); if (this->head_ != 0) this->head_->prev (new_item); else this->tail_ = new_item; this->head_ = new_item; // Make sure to count all the bytes in a composite message!!! this->cur_bytes_ += new_item->total_size (); this->cur_count_++; if (this->signal_dequeue_waiters () == -1) return -1; else return this->cur_count_; } // Actually put the node at its proper position relative to its // priority. template int ACE_Message_Queue::enqueue_i (ACE_Message_Block *new_item) { ACE_TRACE ("ACE_Message_Queue::enqueue_i"); if (new_item == 0) return -1; if (this->head_ == 0) // Check for simple case of an empty queue, where all we need to // do is insert into the head. return this->enqueue_head_i (new_item); else { ACE_Message_Block *temp; // Figure out where the new item goes relative to its priority. // We start looking from the highest priority to the lowest // priority. for (temp = this->tail_; temp != 0; temp = temp->prev ()) if (temp->msg_priority () >= new_item->msg_priority ()) // Break out when we've located an item that has // greater or equal priority. break; if (temp == 0) // Check for simple case of inserting at the head of the queue, // where all we need to do is insert before the // current head. return this->enqueue_head_i (new_item); else if (temp->next () == 0) // Check for simple case of inserting at the tail of the // queue, where all we need to do is insert after // the current tail. return this->enqueue_tail_i (new_item); else { // Insert the new message behind the message of // greater or equal priority. This ensures that FIFO order is // maintained when messages of the same priority are // inserted consecutively. new_item->prev (temp); new_item->next (temp->next ()); temp->next ()->prev (new_item); temp->next (new_item); } } // Make sure to count all the bytes in a composite message!!! this->cur_bytes_ += new_item->total_size (); this->cur_count_++; if (this->signal_dequeue_waiters () == -1) return -1; else return this->cur_count_; } // Actually get the first ACE_Message_Block (no locking, so must be // called with locks held). This method assumes that the queue has at // least one item in it when it is called. template int ACE_Message_Queue::dequeue_head_i (ACE_Message_Block *&first_item) { if (this->head_ ==0) ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("Attempting to dequeue from empty queue")), -1); ACE_TRACE ("ACE_Message_Queue::dequeue_head_i"); first_item = this->head_; this->head_ = this->head_->next (); if (this->head_ == 0) this->tail_ = 0; else // The prev pointer of the first message block has to point to // NULL... this->head_->prev (0); // Subtract off all of the bytes associated with this message. this->cur_bytes_ -= first_item->total_size (); this->cur_count_--; // Only signal enqueueing threads if we've fallen below the low // water mark. if (this->cur_bytes_ <= this->low_water_mark_ && this->signal_enqueue_waiters () == -1) return -1; else return this->cur_count_; } // Take a look at the first item without removing it. template int ACE_Message_Queue::peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue::peek_dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } // Wait for at least one item to become available. if (this->wait_not_empty_cond (ace_mon, timeout) == -1) return -1; first_item = this->head_; return this->cur_count_; } template int ACE_Message_Queue::wait_not_full_cond (ACE_Guard &mon, ACE_Time_Value *timeout) { int result = 0; #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) while (this->is_full_i () && result != -1) { ++this->enqueue_waiters_; // @@ Need to add sanity checks for failure... mon.release (); if (timeout == 0) result = this->not_full_cond_.acquire (); else result = this->not_full_cond_.acquire (*timeout); // Save/restore errno. ACE_Errno_Guard error (errno); mon.acquire (); } #else ACE_UNUSED_ARG (mon); // Wait while the queue is full. while (this->is_full_i ()) { if (this->not_full_cond_.wait (timeout) == -1) { if (errno == ETIME) errno = EWOULDBLOCK; result = -1; break; } if (this->deactivated_) { errno = ESHUTDOWN; result = -1; break; } } #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return result; } template int ACE_Message_Queue::wait_not_empty_cond (ACE_Guard &mon, ACE_Time_Value *timeout) { int result = 0; #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) while (this->is_empty_i () && result != -1) { ++this->dequeue_waiters_; // @@ Need to add sanity checks for failure... mon.release (); if (timeout == 0) result = this->not_empty_cond_.acquire (); else { result = this->not_empty_cond_.acquire (*timeout); if (result == -1 && errno == ETIME) errno = EWOULDBLOCK; } // Save/restore errno. ACE_Errno_Guard error (errno); mon.acquire (); } #else ACE_UNUSED_ARG (mon); // Wait while the queue is empty. while (this->is_empty_i ()) { if (this->not_empty_cond_.wait (timeout) == -1) { if (errno == ETIME) errno = EWOULDBLOCK; result = -1; break; } if (this->deactivated_) { errno = ESHUTDOWN; result = -1; break; } } #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return result; } // Block indefinitely waiting for an item to arrive, does not ignore // alerts (e.g., signals). template int ACE_Message_Queue::enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue::enqueue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } if (this->wait_not_full_cond (ace_mon, timeout) == -1) return -1; int queue_count = this->enqueue_head_i (new_item); if (queue_count == -1) return -1; else { this->notify (); return queue_count; } } // Enqueue an into the in // accordance with its (0 is lowest priority). Returns // -1 on failure, else the number of items still on the queue. template int ACE_Message_Queue::enqueue_prio (ACE_Message_Block *new_item, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue::enqueue_prio"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } if (this->wait_not_full_cond (ace_mon, timeout) == -1) return -1; int queue_count = this->enqueue_i (new_item); if (queue_count == -1) return -1; else { this->notify (); return queue_count; } } template int ACE_Message_Queue::enqueue (ACE_Message_Block *new_item, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue::enqueue"); return this->enqueue_prio (new_item, timeout); } // Block indefinitely waiting for an item to arrive, // does not ignore alerts (e.g., signals). template int ACE_Message_Queue::enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue::enqueue_tail"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } if (this->wait_not_full_cond (ace_mon, timeout) == -1) return -1; int queue_count = this->enqueue_tail_i (new_item); if (queue_count == -1) return -1; else { this->notify (); return queue_count; } } // Remove an item from the front of the queue. If timeout == 0 block // indefinitely (or until an alert occurs). Otherwise, block for upto // the amount of time specified by timeout. template int ACE_Message_Queue::dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue::dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } if (this->wait_not_empty_cond (ace_mon, timeout) == -1) return -1; return this->dequeue_head_i (first_item); } template int ACE_Message_Queue::notify (void) { ACE_TRACE ("ACE_Message_Queue::notify"); // By default, don't do anything. if (this->notification_strategy_ == 0) return 0; else return this->notification_strategy_->notify (); } // = Initialization and termination methods. template ACE_Dynamic_Message_Queue::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, size_t hwm, size_t lwm, ACE_Notification_Strategy *ns) : ACE_Message_Queue (hwm, lwm, ns), pending_head_ (0), pending_tail_ (0), late_head_ (0), late_tail_ (0), beyond_late_head_ (0), beyond_late_tail_ (0), message_strategy_ (message_strategy) { // Note, the ACE_Dynamic_Message_Queue assumes full responsibility // for the passed ACE_Dynamic_Message_Strategy object, and deletes // it in its own dtor } // dtor: free message strategy and let base class dtor do the rest. template ACE_Dynamic_Message_Queue::~ACE_Dynamic_Message_Queue (void) { delete &this->message_strategy_; } template int ACE_Dynamic_Message_Queue::remove_messages (ACE_Message_Block *&list_head, ACE_Message_Block *&list_tail, u_int status_flags) { // start with an empty list list_head = 0; list_tail = 0; // Get the current time ACE_Time_Value current_time = ACE_OS::gettimeofday (); // Refresh priority status boundaries in the queue. int result = this->refresh_queue (current_time); if (result < 0) return result; if (ACE_BIT_ENABLED (status_flags, (u_int) ACE_Dynamic_Message_Strategy::PENDING) && this->pending_head_ && this->pending_tail_) { // patch up pointers for the new tail of the queue if (this->pending_head_->prev ()) { this->tail_ = this->pending_head_->prev (); this->pending_head_->prev ()->next (0); } else { // the list has become empty this->head_ = 0; this->tail_ = 0; } // point to the head and tail of the list list_head = this->pending_head_; list_tail = this->pending_tail_; // cut the pending messages out of the queue entirely this->pending_head_->prev (0); this->pending_head_ = 0; this->pending_tail_ = 0; } if (ACE_BIT_ENABLED (status_flags, (u_int) ACE_Dynamic_Message_Strategy::LATE) && this->late_head_ && this->late_tail_) { // Patch up pointers for the (possibly) new head and tail of the // queue. if (this->late_tail_->next ()) this->late_tail_->next ()->prev (this->late_head_->prev ()); else this->tail_ = this->late_head_->prev (); if (this->late_head_->prev ()) this->late_head_->prev ()->next (this->late_tail_->next ()); else this->head_ = this->late_tail_->next (); // put late messages behind pending messages (if any) being returned this->late_head_->prev (list_tail); if (list_tail) list_tail->next (this->late_head_); else list_head = this->late_head_; list_tail = this->late_tail_; this->late_tail_->next (0); this->late_head_ = 0; this->late_tail_ = 0; } if (ACE_BIT_ENABLED (status_flags, (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) && this->beyond_late_head_ && this->beyond_late_tail_) { // Patch up pointers for the new tail of the queue if (this->beyond_late_tail_->next ()) { this->head_ = this->beyond_late_tail_->next (); this->beyond_late_tail_->next ()->prev (0); } else { // the list has become empty this->head_ = 0; this->tail_ = 0; } // Put beyond late messages at the end of the list being // returned. if (list_tail) { this->beyond_late_head_->prev (list_tail); list_tail->next (this->beyond_late_head_); } else list_head = this->beyond_late_head_; list_tail = this->beyond_late_tail_; this->beyond_late_tail_->next (0); this->beyond_late_head_ = 0; this->beyond_late_tail_ = 0; } // Decrement message and size counts for removed messages. ACE_Message_Block *temp1; for (temp1 = list_head; temp1 != 0; temp1 = temp1->next ()) { this->cur_count_--; this->cur_bytes_ -= temp1->total_size (); } return result; } // Detach all messages with status given in the passed flags from the // queue and return them by setting passed head and tail pointers to // the linked list they comprise. This method is intended primarily // as a means of periodically harvesting messages that have missed // their deadlines, but is available in its most general form. All // messages are returned in priority order, from head to tail, as of // the time this method was called. template int ACE_Dynamic_Message_Queue::dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Dynamic_Message_Queue::dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } int result; // get the current time ACE_Time_Value current_time = ACE_OS::gettimeofday (); // refresh priority status boundaries in the queue result = this->refresh_queue (current_time); if (result < 0) return result; // *now* it's appropriate to wait for an enqueued item result = this->wait_not_empty_cond (ace_mon, timeout); if (result == -1) return result; // call the internal dequeue method, which selects an item from the // highest priority status portion of the queue that has messages // enqueued. result = this->dequeue_head_i (first_item); return result; } // Dequeue and return the at the (logical) head // of the queue. template void ACE_Dynamic_Message_Queue::dump (void) const { ACE_TRACE ("ACE_Dynamic_Message_Queue::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Message_Queue (base class): \n"))); this->ACE_Message_Queue::dump (); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("pending_head_ = %u\n") ASYS_TEXT ("pending_tail_ = %u\n") ASYS_TEXT ("late_head_ = %u\n") ASYS_TEXT ("late_tail_ = %u\n") ASYS_TEXT ("beyond_late_head_ = %u\n") ASYS_TEXT ("beyond_late_tail_ = %u\n"), this->pending_head_, this->pending_tail_, this->late_head_, this->late_tail_, this->beyond_late_head_, this->beyond_late_tail_)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("message_strategy_ : \n"))); message_strategy_.dump (); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } // dump the state of the queue template int ACE_Dynamic_Message_Queue::enqueue_i (ACE_Message_Block *new_item) { ACE_TRACE ("ACE_Dynamic_Message_Queue::enqueue_i"); if (new_item == 0) return -1; int result = 0; // Get the current time. ACE_Time_Value current_time = ACE_OS::gettimeofday (); // Refresh priority status boundaries in the queue. result = this->refresh_queue (current_time); if (result < 0) return result; // Where we enqueue depends on the message's priority status. switch (message_strategy_.priority_status (*new_item, current_time)) { case ACE_Dynamic_Message_Strategy::PENDING: if (this->pending_tail_ == 0) { // Check for simple case of an empty pending queue, where // all we need to do is insert into the tail of // the queue. pending_head_ = new_item; pending_tail_ = pending_head_; return this->enqueue_tail_i (new_item); } else { // Enqueue the new message in priority order in the pending // sublist result = sublist_enqueue_i (new_item, current_time, this->pending_head_, this->pending_tail_, ACE_Dynamic_Message_Strategy::PENDING); } break; case ACE_Dynamic_Message_Strategy::LATE: if (this->late_tail_ == 0) { late_head_ = new_item; late_tail_ = late_head_; if (this->pending_head_ == 0) // Check for simple case of an empty pending queue, // where all we need to do is insert into the // tail of the queue. return this->enqueue_tail_i (new_item); else if (this->beyond_late_tail_ == 0) // Check for simple case of an empty beyond late queue, where all // we need to do is insert into the head of the queue. return this->enqueue_head_i (new_item); else { // Otherwise, we can just splice the new message in // between the pending and beyond late portions of the // queue. this->beyond_late_tail_->next (new_item); new_item->prev (this->beyond_late_tail_); this->pending_head_->prev (new_item); new_item->next (this->pending_head_); } } else { // Enqueue the new message in priority order in the late // sublist result = sublist_enqueue_i (new_item, current_time, this->late_head_, this->late_tail_, ACE_Dynamic_Message_Strategy::LATE); } break; case ACE_Dynamic_Message_Strategy::BEYOND_LATE: if (this->beyond_late_tail_ == 0) { // Check for simple case of an empty beyond late queue, // where all we need to do is insert into the // head of the queue. beyond_late_head_ = new_item; beyond_late_tail_ = beyond_late_head_; return this->enqueue_head_i (new_item); } else { // all beyond late messages have the same (zero) priority, // so just put the new one at the end of the beyond late // messages if (this->beyond_late_tail_->next ()) this->beyond_late_tail_->next ()->prev (new_item); else this->tail_ = new_item; new_item->next (this->beyond_late_tail_->next ()); this->beyond_late_tail_->next (new_item); new_item->prev (this->beyond_late_tail_); this->beyond_late_tail_ = new_item; } break; // should never get here, but just in case... default: result = -1; break; } if (result < 0) return result; this->cur_bytes_ += new_item->total_size (); this->cur_count_++; if (this->signal_dequeue_waiters () == -1) return -1; else return this->cur_count_; } // Enqueue an in accordance with its priority. // priority may be *dynamic* or *static* or a combination or *both* It // calls the priority evaluation function passed into the Dynamic // Message Queue constructor to update the priorities of all enqueued // messages. template int ACE_Dynamic_Message_Queue::sublist_enqueue_i (ACE_Message_Block *new_item, const ACE_Time_Value ¤t_time, ACE_Message_Block *&sublist_head, ACE_Message_Block *&sublist_tail, ACE_Dynamic_Message_Strategy::Priority_Status status) { int result = 0; ACE_Message_Block *current_item = 0; // Find message after which to enqueue new item, based on message // priority and priority status. for (current_item = sublist_tail; current_item; current_item = current_item->prev ()) { if (message_strategy_.priority_status (*current_item, current_time) == status) { if (current_item->msg_priority () >= new_item->msg_priority ()) break; } else { sublist_head = new_item; break; } } if (current_item == 0) { // If the new message has highest priority of any, put it at the // head of the list (and sublist). new_item->prev (0); new_item->next (this->head_); if (this->head_ != 0) this->head_->prev (new_item); else { this->tail_ = new_item; sublist_tail = new_item; } this->head_ = new_item; sublist_head = new_item; } else { // insert the new item into the list new_item->next (current_item->next ()); new_item->prev (current_item); if (current_item->next ()) current_item->next ()->prev (new_item); else this->tail_ = new_item; current_item->next (new_item); // If the new item has lowest priority of any in the sublist, // move the tail pointer of the sublist back to the new item if (current_item == sublist_tail) sublist_tail = new_item; } return result; } // Enqueue a message in priority order within a given priority status // sublist. template int ACE_Dynamic_Message_Queue::dequeue_head_i (ACE_Message_Block *&first_item) { ACE_TRACE ("ACE_Dynamic_Message_Queue::dequeue_head_i"); int result = 0; int last_in_subqueue = 0; // first, try to dequeue from the head of the pending list if (this->pending_head_) { first_item = this->pending_head_; if (0 == this->pending_head_->prev ()) this->head_ = this->pending_head_->next (); else this->pending_head_->prev ()->next (this->pending_head_->next ()); if (0 == this->pending_head_->next ()) { this->tail_ = this->pending_head_->prev (); this->pending_head_ = 0; this->pending_tail_ = 0; } else { this->pending_head_->next ()->prev (this->pending_head_->prev ()); this->pending_head_ = this->pending_head_->next (); } first_item->prev (0); first_item->next (0); } // Second, try to dequeue from the head of the late list else if (this->late_head_) { last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0; first_item = this->late_head_; if (0 == this->late_head_->prev ()) this->head_ = this->late_head_->next (); else this->late_head_->prev ()->next (this->late_head_->next ()); if (0 == this->late_head_->next ()) this->tail_ = this->late_head_->prev (); else { this->late_head_->next ()->prev (this->late_head_->prev ()); this->late_head_ = this->late_head_->next (); } if (last_in_subqueue) { this->late_head_ = 0; this->late_tail_ = 0; } first_item->prev (0); first_item->next (0); } // finally, try to dequeue from the head of the beyond late list else if (this->beyond_late_head_) { last_in_subqueue = (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0; first_item = this->beyond_late_head_; this->head_ = this->beyond_late_head_->next (); if (0 == this->beyond_late_head_->next ()) this->tail_ = this->beyond_late_head_->prev (); else { this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ()); this->beyond_late_head_ = this->beyond_late_head_->next (); } if (last_in_subqueue) { this->beyond_late_head_ = 0; this->beyond_late_tail_ = 0; } first_item->prev (0); first_item->next (0); } else { // nothing to dequeue: set the pointer to zero and return an error code first_item = 0; result = -1; } if (result < 0) return result; // Make sure to subtract off all of the bytes associated with this // message. this->cur_bytes_ -= first_item->total_size (); this->cur_count_--; // Only signal enqueueing threads if we've fallen below the low // water mark. if (this->cur_bytes_ <= this->low_water_mark_ && this->signal_enqueue_waiters () == -1) return -1; else return this->cur_count_; } // Dequeue and return the at the head of the // logical queue. Attempts first to dequeue from the pending portion // of the queue, or if that is empty from the late portion, or if that // is empty from the beyond late portion, or if that is empty just // sets the passed pointer to zero and returns -1. template int ACE_Dynamic_Message_Queue::refresh_queue (const ACE_Time_Value ¤t_time) { int result; result = refresh_pending_queue (current_time); if (result != -1) result = refresh_late_queue (current_time); return result; } // Refresh the queue using the strategy specific priority status // function. template int ACE_Dynamic_Message_Queue::refresh_pending_queue (const ACE_Time_Value ¤t_time) { ACE_Dynamic_Message_Strategy::Priority_Status current_status; // refresh priority status boundaries in the queue if (this->pending_head_) { current_status = message_strategy_.priority_status (*this->pending_head_, current_time); switch (current_status) { case ACE_Dynamic_Message_Strategy::BEYOND_LATE: // Make sure the head of the beyond late queue is set (there // may not have been any beyond late messages previously) this->beyond_late_head_ = this->head_; // Zero out the late queue pointers, and set them only if // there turn out to be late messages in the pending sublist this->late_head_ = 0; this->late_tail_ = 0; // Advance through the beyond late messages in the pending queue do { this->pending_head_ = this->pending_head_->next (); if (this->pending_head_) current_status = message_strategy_.priority_status (*this->pending_head_, current_time); else break; // do while } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); if (this->pending_head_) { // point tail of beyond late sublist to previous item this->beyond_late_tail_ = this->pending_head_->prev (); if (current_status == ACE_Dynamic_Message_Strategy::PENDING) // there are no late messages left in the queue break; // switch else if (current_status != ACE_Dynamic_Message_Strategy::LATE) { // if we got here, something is *seriously* wrong with the queue ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"), (int) current_status), -1); } /* FALLTHRU */ } else { // There are no pending or late messages left in the // queue. this->beyond_late_tail_ = this->tail_; this->pending_head_ = 0; this->pending_tail_ = 0; break; // switch } case ACE_Dynamic_Message_Strategy::LATE: // Make sure the head of the late queue is set (there may // not have been any late messages previously, or they may // have all become beyond late). if (this->late_head_ == 0) this->late_head_ = this->pending_head_; // advance through the beyond late messages in the pending queue do { this->pending_head_ = this->pending_head_->next (); if (this->pending_head_) current_status = message_strategy_.priority_status (*this->pending_head_, current_time); else break; // do while } while (current_status == ACE_Dynamic_Message_Strategy::LATE); if (this->pending_head_) { if (current_status != ACE_Dynamic_Message_Strategy::PENDING) // if we got here, something is *seriously* wrong with the queue ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Unexpected message priority status [%d] (expected PENDING)"), (int) current_status), -1); // Point tail of late sublist to previous item this->late_tail_ = this->pending_head_->prev (); } else { // there are no pending messages left in the queue this->late_tail_ = this->tail_; this->pending_head_ = 0; this->pending_tail_ = 0; } break; // switch case ACE_Dynamic_Message_Strategy::PENDING: // do nothing - the pending queue is unchanged break; // switch default: // if we got here, something is *seriously* wrong with the queue ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Unknown message priority status [%d]"), (int) current_status), -1); } } return 0; } // Refresh the pending queue using the strategy specific priority // status function. template int ACE_Dynamic_Message_Queue::refresh_late_queue (const ACE_Time_Value ¤t_time) { ACE_Dynamic_Message_Strategy::Priority_Status current_status; if (this->late_head_) { current_status = message_strategy_.priority_status (*this->late_head_, current_time); switch (current_status) { case ACE_Dynamic_Message_Strategy::BEYOND_LATE: // make sure the head of the beyond late queue is set // (there may not have been any beyond late messages previously) this->beyond_late_head_ = this->head_; // advance through the beyond late messages in the late queue do { this->late_head_ = this->late_head_->next (); if (this->late_head_) current_status = message_strategy_.priority_status (*this->late_head_, current_time); else break; // do while } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); if (this->late_head_) { // point tail of beyond late sublist to previous item this->beyond_late_tail_ = this->late_head_->prev (); if (current_status == ACE_Dynamic_Message_Strategy::PENDING) { // there are no late messages left in the queue this->late_head_ = 0; this->late_tail_ = 0; } else if (current_status != ACE_Dynamic_Message_Strategy::LATE) // if we got here, something is *seriously* wrong with the queue ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"), (int) current_status), -1); } else { // there are no late messages left in the queue this->beyond_late_tail_ = this->tail_; this->late_head_ = 0; this->late_tail_ = 0; } break; // switch case ACE_Dynamic_Message_Strategy::LATE: // do nothing - the late queue is unchanged break; // switch case ACE_Dynamic_Message_Strategy::PENDING: // if we got here, something is *seriously* wrong with the queue ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Unexpected message priority status ") ASYS_TEXT ("[%d] (expected LATE or BEYOND_LATE)"), (int) current_status), -1); default: // if we got here, something is *seriously* wrong with the queue ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Unknown message priority status [%d]"), (int) current_status), -1); } } return 0; } // Refresh the late queue using the strategy specific priority status // function. template int ACE_Dynamic_Message_Queue::peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout) { return ACE_Message_Queue::peek_dequeue_head (first_item, timeout); } // Private method to hide public base class method: just calls base // class method. template int ACE_Dynamic_Message_Queue::enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Dynamic_Message_Queue::enqueue_tail"); return this->enqueue_prio (new_item, timeout); } // Just call priority enqueue method: tail enqueue semantics for // dynamic message queues are unstable: the message may or may not be // where it was placed after the queue is refreshed prior to the next // enqueue or dequeue operation. template int ACE_Dynamic_Message_Queue::enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Dynamic_Message_Queue::enqueue_head"); return this->enqueue_prio (new_item, timeout); } // Just call priority enqueue method: head enqueue semantics for // dynamic message queues are unstable: the message may or may not be // where it was placed after the queue is refreshed prior to the next // enqueue or dequeue operation. template ACE_Message_Queue * ACE_Message_Queue_Factory::create_static_message_queue (size_t hwm, size_t lwm, ACE_Notification_Strategy *ns) { ACE_Message_Queue *tmp; ACE_NEW_RETURN (tmp, ACE_Message_Queue (hwm, lwm, ns), 0); return tmp; } // Factory method for a statically prioritized ACE_Message_Queue. template ACE_Dynamic_Message_Queue * ACE_Message_Queue_Factory::create_deadline_message_queue (size_t hwm, size_t lwm, ACE_Notification_Strategy *ns, u_long static_bit_field_mask, u_long static_bit_field_shift, u_long dynamic_priority_max, u_long dynamic_priority_offset) { ACE_Deadline_Message_Strategy *adms; ACE_NEW_RETURN (adms, ACE_Deadline_Message_Strategy (static_bit_field_mask, static_bit_field_shift, dynamic_priority_max, dynamic_priority_offset), 0); ACE_Dynamic_Message_Queue *tmp; ACE_NEW_RETURN (tmp, ACE_Dynamic_Message_Queue (*adms, hwm, lwm, ns), 0); return tmp; } // Factory method for a dynamically prioritized (by time to deadline) // ACE_Dynamic_Message_Queue. template ACE_Dynamic_Message_Queue * ACE_Message_Queue_Factory::create_laxity_message_queue (size_t hwm, size_t lwm, ACE_Notification_Strategy *ns, u_long static_bit_field_mask, u_long static_bit_field_shift, u_long dynamic_priority_max, u_long dynamic_priority_offset) { ACE_Laxity_Message_Strategy *alms; ACE_NEW_RETURN (alms, ACE_Laxity_Message_Strategy (static_bit_field_mask, static_bit_field_shift, dynamic_priority_max, dynamic_priority_offset), 0); ACE_Dynamic_Message_Queue *tmp; ACE_NEW_RETURN (tmp, ACE_Dynamic_Message_Queue (*alms, hwm, lwm, ns), 0); return tmp; } // Factory method for a dynamically prioritized (by laxity) // . #if defined (VXWORKS) template ACE_Message_Queue_Vx * ACE_Message_Queue_Factory::create_Vx_message_queue (size_t max_messages, size_t max_message_length, ACE_Notification_Strategy *ns) { ACE_Message_Queue_Vx *tmp; ACE_NEW_RETURN (tmp, ACE_Message_Queue_Vx (max_messages, max_message_length, ns), 0); return tmp; } // factory method for a wrapped VxWorks message queue #if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) template ACE_Message_Queue_NT * ACE_Message_Queue_Factory::create_NT_message_queue (size_t max_threads) { ACE_Message_Queue_NT *tmp; ACE_NEW_RETURN (tmp, ACE_Message_Queue_NT (max_threads); 0); return tmp; } #endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ #endif /* defined (VXWORKS) */ #endif /* ACE_MESSAGE_QUEUE_T_C */