// Message_Queue.cpp // $Id$ #if !defined (ACE_MESSAGE_QUEUE_C) #define ACE_MESSAGE_QUEUE_C #define ACE_BUILD_DLL #include "ace/Message_Queue.h" #if !defined (__ACE_INLINE__) #include "ace/Message_Queue.i" #endif /* __ACE_INLINE__ */ ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue) template ACE_Message_Queue_Iterator::ACE_Message_Queue_Iterator (ACE_Message_Queue &q) : queue_ (q), curr_ (q.head_) { } template int ACE_Message_Queue_Iterator::next (ACE_Message_Block *&entry) { ACE_Read_Guard m (this->queue_.lock_); if (this->curr_ != 0) { entry = this->curr_; return 1; } else return 0; } template int ACE_Message_Queue_Iterator::done (void) const { ACE_Read_Guard m (this->queue_.lock_); return this->curr_ == 0; } template int ACE_Message_Queue_Iterator::advance (void) { ACE_Read_Guard m (this->queue_.lock_); this->curr_ = this->curr_->next (); return this->curr_ != 0; } template void ACE_Message_Queue_Iterator::dump (void) const { } ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator) template ACE_Message_Queue_Reverse_Iterator::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue &queue) : queue_ (queue), curr_ (queue_.tail_) { } template int ACE_Message_Queue_Reverse_Iterator::next (ACE_Message_Block *&entry) { ACE_Read_Guard m (this->queue_.lock_); if (this->curr_ != 0) { entry = this->curr_; return 1; } else return 0; } template int ACE_Message_Queue_Reverse_Iterator::done (void) const { ACE_Read_Guard m (this->queue_.lock_); return this->curr_ == 0; } template int ACE_Message_Queue_Reverse_Iterator::advance (void) { ACE_Read_Guard m (this->queue_.lock_); this->curr_ = this->curr_->prev (); return this->curr_ != 0; } template void ACE_Message_Queue_Reverse_Iterator::dump (void) const { } template void ACE_Message_Queue::dump (void) const { ACE_TRACE ("ACE_Message_Queue::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, "deactivated = %d\n" "low_water_mark = %d\n" "high_water_mark = %d\n" "cur_bytes = %d\n" "cur_count = %d\n" "head_ = %u\n" "tail_ = %u\n", this->deactivated_, this->low_water_mark_, this->high_water_mark_, this->cur_bytes_, this->cur_count_, this->head_, this->tail_)); ACE_DEBUG ((LM_DEBUG,"not_full_cond: \n")); not_full_cond_.dump (); ACE_DEBUG ((LM_DEBUG,"not_empty_cond: \n")); not_empty_cond_.dump (); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } template ACE_Message_Queue::ACE_Message_Queue (size_t hwm, size_t lwm, ACE_Notification_Strategy *ns) #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) : not_empty_cond_ (0), not_full_cond_ (0), enqueue_waiters_ (0), dequeue_waiters_ (0) #else : not_empty_cond_ (this->lock_), not_full_cond_ (this->lock_) #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ { ACE_TRACE ("ACE_Message_Queue::ACE_Message_Queue"); if (this->open (hwm, lwm, ns) == -1) ACE_ERROR ((LM_ERROR, "open")); } template ACE_Message_Queue::~ACE_Message_Queue (void) { ACE_TRACE ("ACE_Message_Queue::~ACE_Message_Queue"); if (this->head_ != 0 && this->close () == -1) ACE_ERROR ((LM_ERROR, "close")); } // Don't bother locking since if someone calls this function more than // once for the same queue, we're in bigger trouble than just // concurrency control! template int ACE_Message_Queue::open (size_t hwm, size_t lwm, ACE_Notification_Strategy *ns) { ACE_TRACE ("ACE_Message_Queue::open"); this->high_water_mark_ = hwm; this->low_water_mark_ = lwm; this->deactivated_ = 0; this->cur_bytes_ = 0; this->cur_count_ = 0; this->tail_ = 0; this->head_ = 0; this->notification_strategy_ = ns; return 0; } // Implementation of the public deactivate() method // (assumes locks are held). template int ACE_Message_Queue::deactivate_i (void) { ACE_TRACE ("ACE_Message_Queue::deactivate_i"); int current_status = this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; // Wakeup all waiters. #if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) this->not_empty_cond_.broadcast (); this->not_full_cond_.broadcast (); #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ this->deactivated_ = 1; return current_status; } template int ACE_Message_Queue::activate_i (void) { ACE_TRACE ("ACE_Message_Queue::activate_i"); int current_status = this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; this->deactivated_ = 0; return current_status; } // Clean up the queue if we have not already done so! template int ACE_Message_Queue::close (void) { ACE_TRACE ("ACE_Message_Queue::close"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); int res = this->deactivate_i (); // Free up the remaining message on the list for (this->tail_ = 0; this->head_ != 0; ) { this->cur_count_--; ACE_Message_Block *temp; // Decrement all the counts. for (temp = this->head_; temp != 0; temp = temp->cont ()) this->cur_bytes_ -= temp->size (); temp = this->head_; this->head_ = this->head_->next (); // Make sure to use rather than since this is // reference counted. temp->release (); } return res; } template int ACE_Message_Queue::signal_enqueue_waiters (void) { #if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) if (this->not_full_cond_.signal () != 0) return -1; #else if (this->enqueue_waiters_ > 0) { --this->enqueue_waiters_; return this->not_full_cond_.release (); } #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return 0; } template int ACE_Message_Queue::signal_dequeue_waiters (void) { #if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) // Tell any blocked threads that the queue has a new item! if (this->not_empty_cond_.signal () != 0) return -1; #else if (this->dequeue_waiters_ > 0) { --this->dequeue_waiters_; return this->not_empty_cond_.release (); } #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return 0; } // Actually put the node at the end (no locking so must be called with // locks held). template int ACE_Message_Queue::enqueue_tail_i (ACE_Message_Block *new_item) { ACE_TRACE ("ACE_Message_Queue::enqueue_tail_i"); if (new_item == 0) return -1; // List was empty, so build a new one. if (this->tail_ == 0) { this->head_ = new_item; this->tail_ = new_item; new_item->next (0); new_item->prev (0); } // Link at the end. else { new_item->next (0); this->tail_->next (new_item); new_item->prev (this->tail_); this->tail_ = new_item; } // Make sure to count *all* the bytes in a composite message!!! for (ACE_Message_Block *temp = new_item; temp != 0; temp = temp->cont ()) this->cur_bytes_ += temp->size (); this->cur_count_++; if (this->signal_dequeue_waiters () == -1) return -1; else return this->cur_count_; } // Actually put the node at the head (no locking) template int ACE_Message_Queue::enqueue_head_i (ACE_Message_Block *new_item) { ACE_TRACE ("ACE_Message_Queue::enqueue_head_i"); if (new_item == 0) return -1; new_item->prev (0); new_item->next (this->head_); if (this->head_ != 0) this->head_->prev (new_item); else this->tail_ = new_item; this->head_ = new_item; // Make sure to count *all* the bytes in a composite message!!! for (ACE_Message_Block *temp = new_item; temp != 0; temp = temp->cont ()) this->cur_bytes_ += temp->size (); this->cur_count_++; if (this->signal_dequeue_waiters () == -1) return -1; else return this->cur_count_; } // Actually put the node at its proper position relative to its // priority. template int ACE_Message_Queue::enqueue_i (ACE_Message_Block *new_item) { ACE_TRACE ("ACE_Message_Queue::enqueue_i"); if (new_item == 0) return -1; if (this->head_ == 0) // Check for simple case of an empty queue, where all we need to // do is insert into the head. return this->enqueue_head_i (new_item); else { ACE_Message_Block *temp; // Figure out where the new item goes relative to its priority. // We start looking from the highest priority to the lowest // priority. for (temp = this->tail_; temp != 0; temp = temp->prev ()) if (temp->msg_priority () >= new_item->msg_priority ()) // Break out when we've located an item that has higher // priority that . break; if (temp == 0) // Check for simple case of inserting at the head of the queue, // where all we need to do is insert before the // current head. return this->enqueue_head_i (new_item); else if (temp->next () == 0) // Check for simple case of inserting at the end of the // queue, where all we need to do is insert after // the current tail. return this->enqueue_tail_i (new_item); else { // Insert the message right before the item of equal or // higher priority. This ensures that FIFO order is // maintained when messages of the same priority are // inserted consecutively. new_item->prev (temp); new_item->next (temp->next ()); temp->next ()->prev (new_item); temp->next (new_item); } } // Make sure to count *all* the bytes in a composite message!!! for (ACE_Message_Block *temp = new_item; temp != 0; temp = temp->cont ()) this->cur_bytes_ += temp->size (); this->cur_count_++; if (this->signal_dequeue_waiters () == -1) return -1; else return this->cur_count_; } // Actually get the first ACE_Message_Block (no locking, so must be // called with locks held). This method assumes that the queue has at // least one item in it when it is called. template int ACE_Message_Queue::dequeue_head_i (ACE_Message_Block *&first_item) { ACE_TRACE ("ACE_Message_Queue::dequeue_head_i"); first_item = this->head_; this->head_ = this->head_->next (); if (this->head_ == 0) this->tail_ = 0; else // The prev pointer of the first message block has to point to // NULL... this->head_->prev (0); // Make sure to subtract off all of the bytes associated with this // message. for (ACE_Message_Block *temp = first_item; temp != 0; temp = temp->cont ()) this->cur_bytes_ -= temp->size (); this->cur_count_--; if (this->signal_enqueue_waiters () == -1) return -1; else return this->cur_count_; } // Take a look at the first item without removing it. template int ACE_Message_Queue::peek_dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue::peek_dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } // Wait for at least one item to become available while (this->is_empty_i ()) { if (this->not_empty_cond_.wait (tv) == -1) { if (errno == ETIME) errno = EWOULDBLOCK; return -1; } if (this->deactivated_) { errno = ESHUTDOWN; return -1; } } first_item = this->head_; return this->cur_count_; } template int ACE_Message_Queue::wait_not_full_cond (ACE_Guard &mon, ACE_Time_Value *tv) { #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) while (this->is_full_i ()) { ++this->enqueue_waiters_; // @@ Need to add sanity checks for failure... mon.release (); this->not_full_cond_.acquire (); mon.acquire (); } #else ACE_UNUSED_ARG (mon); // Wait while the queue is full. while (this->is_full_i ()) { if (this->not_full_cond_.wait (tv) == -1) { if (errno == ETIME) errno = EWOULDBLOCK; return -1; } if (this->deactivated_) { errno = ESHUTDOWN; return -1; } } #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return 0; } template int ACE_Message_Queue::wait_not_empty_cond (ACE_Guard &mon, ACE_Time_Value *tv) { #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) while (this->is_empty_i ()) { ++this->dequeue_waiters_; // @@ Need to add sanity checks for failure... mon.release (); this->not_empty_cond_.acquire (); mon.acquire (); } #else ACE_UNUSED_ARG (mon); // Wait while the queue is empty. while (this->is_empty_i ()) { if (this->not_empty_cond_.wait (tv) == -1) { if (errno == ETIME) errno = EWOULDBLOCK; return -1; } if (this->deactivated_) { errno = ESHUTDOWN; return -1; } } #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ return 0; } // Block indefinitely waiting for an item to arrive, // does not ignore alerts (e.g., signals). template int ACE_Message_Queue::enqueue_head (ACE_Message_Block *new_item, ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue::enqueue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } if (this->wait_not_full_cond (ace_mon, tv) == -1) return -1; int queue_count = this->enqueue_head_i (new_item); if (queue_count == -1) return -1; else { this->notify (); return queue_count; } } // Enqueue an into the in // accordance with its (0 is lowest priority). Returns // -1 on failure, else the number of items still on the queue. template int ACE_Message_Queue::enqueue_prio (ACE_Message_Block *new_item, ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue::enqueue_prio"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } if (this->wait_not_full_cond (ace_mon, tv) == -1) return -1; int queue_count = this->enqueue_i (new_item); if (queue_count == -1) return -1; else { this->notify (); return queue_count; } } template int ACE_Message_Queue::enqueue (ACE_Message_Block *new_item, ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue::enqueue"); return this->enqueue_prio (new_item, tv); } // Block indefinitely waiting for an item to arrive, // does not ignore alerts (e.g., signals). template int ACE_Message_Queue::enqueue_tail (ACE_Message_Block *new_item, ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue::enqueue_tail"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } if (this->wait_not_full_cond (ace_mon, tv) == -1) return -1; int queue_count = this->enqueue_tail_i (new_item); if (queue_count == -1) return -1; else { this->notify (); return queue_count; } } // Remove an item from the front of the queue. If TV == 0 block // indefinitely (or until an alert occurs). Otherwise, block for upto // the amount of time specified by TV. template int ACE_Message_Queue::dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *tv) { ACE_TRACE ("ACE_Message_Queue::dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); if (this->deactivated_) { errno = ESHUTDOWN; return -1; } if (this->wait_not_empty_cond (ace_mon, tv) == -1) return -1; return this->dequeue_head_i (first_item); } template int ACE_Message_Queue::notify (void) { ACE_TRACE ("ACE_Message_Queue::notify"); // By default, don't do anything. if (this->notification_strategy_ == 0) return 0; else return this->notification_strategy_->notify (); } #endif /* ACE_MESSAGE_QUEUE_C */