diff options
author | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-26 17:20:59 +0000 |
---|---|---|
committer | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-26 17:20:59 +0000 |
commit | c6254ae122ac7dae45f2dda4e52f453aed8183f8 (patch) | |
tree | 97136f3ba676fc2f9fe5209d4fbf6c10f9c7acb8 | |
parent | 1ba488338e5bdc68020c86e3b5825bde7b9019ac (diff) | |
download | ATCD-c6254ae122ac7dae45f2dda4e52f453aed8183f8.tar.gz |
On VxWorks, added ACE_Message_Queue_Vx to wrap native VxWorks message queues
-rw-r--r-- | ace/Message_Queue.cpp | 954 | ||||
-rw-r--r-- | ace/Message_Queue.h | 435 | ||||
-rw-r--r-- | ace/Message_Queue.i | 147 | ||||
-rw-r--r-- | ace/Message_Queue_T.cpp | 984 | ||||
-rw-r--r-- | ace/Message_Queue_T.h | 486 | ||||
-rw-r--r-- | ace/Message_Queue_T.i | 146 |
6 files changed, 1839 insertions, 1313 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp index 171d5174742..6f1c2e53c3c 100644 --- a/ace/Message_Queue.cpp +++ b/ace/Message_Queue.cpp @@ -10,116 +10,16 @@ #include "ace/Message_Queue.i" #endif /* __ACE_INLINE__ */ -ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue) +#if defined (VXWORKS) -ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue) +//////////////////////////////// +// class ACE_Message_Queue_Vx // +//////////////////////////////// -////////////////////////////////////// -// class ACE_Message_Queue_Iterator // -////////////////////////////////////// - -template <ACE_SYNCH_DECL> -ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) - : queue_ (q), - curr_ (q.head_) -{ -} - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry) -{ - ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); - - if (this->curr_ != 0) - { - entry = this->curr_; - return 1; - } - else - return 0; -} - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const -{ - ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); - - return this->curr_ == 0; -} - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void) +void +ACE_Message_Queue_Vx::dump (void) const { - ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); - - if (this->curr_) - this->curr_ = this->curr_->next (); - return this->curr_ != 0; -} - -template <ACE_SYNCH_DECL> void -ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const -{ -} - -ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator) - -////////////////////////////////////////////// -// class ACE_Message_Queue_Reverse_Iterator // -////////////////////////////////////////////// - -template <ACE_SYNCH_DECL> -ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) - : queue_ (q), - curr_ (queue_.tail_) -{ -} - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry) -{ - ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); - - if (this->curr_ != 0) - { - entry = this->curr_; - return 1; - } - else - return 0; -} - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const -{ - ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); - - return this->curr_ == 0; -} - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void) -{ - ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); - - if (this->curr_) - this->curr_ = this->curr_->prev (); - return this->curr_ != 0; -} - -template <ACE_SYNCH_DECL> void -ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const -{ -} - -///////////////////////////// -// class ACE_Message_Queue // -///////////////////////////// - -template <ACE_SYNCH_DECL> void -ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump"); + ACE_TRACE ("ACE_Message_Queue_Vx::dump"); ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("deactivated = %d\n") @@ -128,7 +28,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const ASYS_TEXT ("cur_bytes = %d\n") ASYS_TEXT ("cur_count = %d\n") ASYS_TEXT ("head_ = %u\n") - ASYS_TEXT ("tail_ = %u\n"), + ASYS_TEXT ("MSG_Q_ID = %u\n"), this->deactivated_, this->low_water_mark_, this->high_water_mark_, @@ -136,86 +36,79 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const this->cur_count_, this->head_, this->tail_)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_full_cond: \n"))); - not_full_cond_.dump (); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_empty_cond: \n"))); - not_empty_cond_.dump (); ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } -template <ACE_SYNCH_DECL> -ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns) -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - : not_empty_cond_ (0), - not_full_cond_ (0), - enqueue_waiters_ (0), - dequeue_waiters_ (0) -#else - : not_empty_cond_ (this->lock_), - not_full_cond_ (this->lock_) -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ +ACE_Message_Queue_Vx::ACE_Message_Queue_Vx (size_t max_messages, + size_t max_message_length, + ACE_Notification_Strategy *ns) + : ACE_Message_Queue<ACE_NULL_SYNCH> (0, 0, ns), + max_messages_ (ACE_static_cast (int, max_messages)), + max_message_length_ (ACE_static_cast (int, max_message_length)) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue"); + ACE_TRACE ("ACE_Message_Queue_Vx::ACE_Message_Queue_Vx"); - if (this->open (hwm, lwm, ns) == -1) + if (this->open (max_messages_, max_message_length_, ns) == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("open"))); } -template <ACE_SYNCH_DECL> -ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void) +ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue"); - if (this->head_ != 0 && this->close () == -1) - ACE_ERROR ((LM_ERROR, ASYS_TEXT ("close"))); + ACE_TRACE ("ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx"); } // Don't bother locking since if someone calls this function more than // once for the same queue, we're in bigger trouble than just // concurrency control! -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns) +int +ACE_Message_Queue_Vx::open (size_t max_messages, + size_t max_message_length, + ACE_Notification_Strategy *ns) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open"); - this->high_water_mark_ = hwm; - this->low_water_mark_ = lwm; + ACE_TRACE ("ACE_Message_Queue_Vx::open"); + this->high_water_mark_ = 0; + this->low_water_mark_ = 0; this->deactivated_ = 0; this->cur_bytes_ = 0; this->cur_count_ = 0; - this->tail_ = 0; this->head_ = 0; this->notification_strategy_ = ns; - return 0; + this->max_messages_ = ACE_static_cast (int, max_messages); + this->max_message_length_ = ACE_static_cast (int, max_message_length); + + if (tail_) + { + // Had already created a msgQ, so delete it. + close (); + activate_i (); + } + + return (this->tail_ = + ACE_reinterpret_cast (ACE_Message_Block *, + ::msgQCreate (max_messages_, + max_message_length_, + MSG_Q_FIFO))) == NULL ? -1 : 0; } // Implementation of the public deactivate() method // (assumes locks are held). -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (void) +int +ACE_Message_Queue_Vx::deactivate_i (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i"); + ACE_TRACE ("ACE_Message_Queue_Vx::deactivate_i"); int current_status = this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; - // Wakeup all waiters. -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - this->not_empty_cond_.broadcast (); - this->not_full_cond_.broadcast (); -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ - this->deactivated_ = 1; return current_status; } -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void) +int +ACE_Message_Queue_Vx::activate_i (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i"); + ACE_TRACE ("ACE_Message_Queue_Vx::activate_i"); int current_status = this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; this->deactivated_ = 0; @@ -224,755 +117,146 @@ ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void) // Clean up the queue if we have not already done so! -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::close (void) +int +ACE_Message_Queue_Vx::close (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - int res = this->deactivate_i (); - - // Free up the remaining message on the list - - for (this->tail_ = 0; this->head_ != 0; ) - { - this->cur_count_--; - - ACE_Message_Block *temp; + ACE_TRACE ("ACE_Message_Queue_Vx::close"); + // Don't lock, because we don't have a lock. It shouldn't be + // necessary, anyways. - // Decrement all the counts. - for (temp = this->head_; - temp != 0; - temp = temp->cont ()) - this->cur_bytes_ -= temp->size (); + this->deactivate_i (); - temp = this->head_; - this->head_ = this->head_->next (); - - // Make sure to use <release> rather than <delete> since this is - // reference counted. - temp->release (); - } + // Don't bother to free up the remaining message on the list, + // because we don't have any way to iterate over what's in the + // queue. - return res; + return ::msgQDelete (msgq ()); } -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void) +int +ACE_Message_Queue_Vx::signal_enqueue_waiters (void) { -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - if (this->not_full_cond_.signal () != 0) - return -1; -#else - if (this->enqueue_waiters_ > 0) - { - --this->enqueue_waiters_; - return this->not_full_cond_.release (); - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + // No-op. return 0; } -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void) +int +ACE_Message_Queue_Vx::signal_dequeue_waiters (void) { -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - // Tell any blocked threads that the queue has a new item! - if (this->not_empty_cond_.signal () != 0) - return -1; -#else - if (this->dequeue_waiters_ > 0) - { - --this->dequeue_waiters_; - return this->not_empty_cond_.release (); - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + // No-op. return 0; } -// Actually put the node at the end (no locking so must be called with -// locks held). - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item) +int +ACE_Message_Queue_Vx::enqueue_tail_i (ACE_Message_Block *new_item) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i"); - - if (new_item == 0) - return -1; - - // List was empty, so build a new one. - if (this->tail_ == 0) - { - this->head_ = new_item; - this->tail_ = new_item; - new_item->next (0); - new_item->prev (0); - } - // Link at the end. - else - { - new_item->next (0); - this->tail_->next (new_item); - new_item->prev (this->tail_); - this->tail_ = new_item; - } - - // Make sure to count *all* the bytes in a composite message!!! - - for (ACE_Message_Block *temp = new_item; - temp != 0; - temp = temp->cont ()) - this->cur_bytes_ += temp->size (); - - this->cur_count_++; - - if (this->signal_dequeue_waiters () == -1) - return -1; - else - return this->cur_count_; + ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_tail_i"); + // No-op. This should _never_ be called. + ACE_NOTSUP_RETURN (-1); } -// Actually put the node at the head (no locking) - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item) +int +ACE_Message_Queue_Vx::enqueue_head_i (ACE_Message_Block *new_item) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i"); + ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_head_i"); if (new_item == 0) return -1; - new_item->prev (0); - new_item->next (this->head_); - - if (this->head_ != 0) - this->head_->prev (new_item); - else - this->tail_ = new_item; - - this->head_ = new_item; - - // Make sure to count *all* the bytes in a composite message!!! - - for (ACE_Message_Block *temp = new_item; - temp != 0; - temp = temp->cont ()) - this->cur_bytes_ += temp->size (); + // Don't try to send a composite message!!!! Only the first + // block will be sent. this->cur_count_++; - if (this->signal_dequeue_waiters () == -1) - return -1; + // Always use this method to actually send a message on the queue. + if (::msgQSend (msgq (), + new_item->rd_ptr (), + new_item->size (), + WAIT_FOREVER, + MSG_PRI_NORMAL) == OK) + return ::msgQNumMsgs (msgq ()); else - return this->cur_count_; + return -1; } -// Actually put the node at its proper position relative to its -// priority. - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +int +ACE_Message_Queue_Vx::enqueue_i (ACE_Message_Block *new_item) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_i"); if (new_item == 0) return -1; if (this->head_ == 0) - // Check for simple case of an empty queue, where all we need to - // do is insert <new_item> into the head. + // Should always take this branch. return this->enqueue_head_i (new_item); else - { - ACE_Message_Block *temp; - - // Figure out where the new item goes relative to its priority. - // We start looking from the highest priority to the lowest - // priority. - - for (temp = this->head_; - temp != 0; - temp = temp->next ()) - if (temp->msg_priority () < new_item->msg_priority ()) - // Break out when we've located an item that has lower - // priority that <new_item>. - break; - - if (temp == 0) - // Check for simple case of inserting at the tail of the queue, - // where all we need to do is insert <new_item> after the - // current tail. - return this->enqueue_tail_i (new_item); - else if (temp->prev () == 0) - // Check for simple case of inserting at the head of the - // queue, where all we need to do is insert <new_item> before - // the current head. - return this->enqueue_head_i (new_item); - else - { - // Insert the new message ahead of the item of - // lesser priority. This ensures that FIFO order is - // maintained when messages of the same priority are - // inserted consecutively. - new_item->next (temp); - new_item->prev (temp->prev ()); - temp->prev ()->next (new_item); - temp->prev (new_item); - } - } - - // Make sure to count *all* the bytes in a composite message!!! - - for (ACE_Message_Block *temp = new_item; - temp != 0; - temp = temp->cont ()) - this->cur_bytes_ += temp->size (); - - this->cur_count_++; - - if (this->signal_dequeue_waiters () == -1) - return -1; - else - return this->cur_count_; + ACE_NOTSUP_RETURN (-1); } // Actually get the first ACE_Message_Block (no locking, so must be // called with locks held). This method assumes that the queue has at // least one item in it when it is called. -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) +int +ACE_Message_Queue_Vx::dequeue_head_i (ACE_Message_Block *&first_item) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); - first_item = this->head_; - this->head_ = this->head_->next (); + ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_head_i"); - if (this->head_ == 0) - this->tail_ = 0; - else - // The prev pointer of the first message block has to point to - // NULL... - this->head_->prev (0); + // We don't allocate a new Message_Block: the caller must provide + // it, and must ensure that it is big enough (without chaining). - // Make sure to subtract off all of the bytes associated with this - // message. - for (ACE_Message_Block *temp = first_item; - temp != 0; - temp = temp->cont ()) - this->cur_bytes_ -= temp->size (); - - this->cur_count_--; + if (first_item == 0 || first_item->wr_ptr () == 0) + return -1; - if (this->signal_enqueue_waiters () == -1) + if (::msgQReceive (msgq (), + first_item->wr_ptr (), + first_item->size (), + WAIT_FOREVER) == ERROR) return -1; else - return this->cur_count_; + return ::msgQNumMsgs (msgq ()); } // Take a look at the first item without removing it. -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) +int +ACE_Message_Queue_Vx::wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon, + ACE_Time_Value *tv) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } - - // Wait for at least one item to become available. - - if (this->wait_not_empty_cond (ace_mon, tv) == -1) - return -1; - - first_item = this->head_; - return this->cur_count_; -} - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *tv) -{ - int result = 0; -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - while (this->is_full_i () && result != -1) - { - ++this->enqueue_waiters_; - // @@ Need to add sanity checks for failure... - mon.release (); - if (tv == 0) - result = this->not_full_cond_.acquire (); - else - result = this->not_full_cond_.acquire (*tv); - int error = errno; - mon.acquire (); - errno = error; - } -#else - ACE_UNUSED_ARG (mon); - - // Wait while the queue is full. - - while (this->is_full_i ()) - { - if (this->not_full_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - result = -1; - break; - } - if (this->deactivated_) - { - errno = ESHUTDOWN; - result = -1; - break; - } - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ - return result; + // Always return here, and let the VxWorks message queue handle blocking. + return 0; } -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *tv) +int +ACE_Message_Queue_Vx::wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon, + ACE_Time_Value *tv) { - int result = 0; -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - while (this->is_empty_i () && result != -1) - { - ++this->dequeue_waiters_; - // @@ Need to add sanity checks for failure... - mon.release (); - if (tv == 0) - result = this->not_empty_cond_.acquire (); - else - { - result = this->not_empty_cond_.acquire (*tv); - if (result == -1 && errno == ETIME) - errno = EWOULDBLOCK; - } - int error = errno; - mon.acquire (); - errno = error; - } -#else - ACE_UNUSED_ARG (mon); - - // Wait while the queue is empty. - - while (this->is_empty_i ()) - { - if (this->not_empty_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - result = -1; - break; - } - if (this->deactivated_) - { - errno = ESHUTDOWN; - result = -1; - break; - } - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ - return result; + // Always return here, and let the VxWorks message queue handle blocking. + return 0; } -// Block indefinitely waiting for an item to arrive, does not ignore -// alerts (e.g., signals). - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, - ACE_Time_Value *tv) +#if ! defined (ACE_REQUIRES_FUNC_DEFINITIONS) +int +ACE_Message_Queue_Vx::enqueue_tail (ACE_Message_Block *mb, ACE_Time_Value *tv) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } - - if (this->wait_not_full_cond (ace_mon, tv) == -1) - return -1; - - int queue_count = this->enqueue_head_i (new_item); - - if (queue_count == -1) - return -1; - else - { - this->notify (); - return queue_count; - } + ACE_NOTSUP_RETURN (-1); } -// Enqueue an <ACE_Message_Block *> into the <Message_Queue> in -// accordance with its <msg_priority> (0 is lowest priority). Returns -// -1 on failure, else the number of items still on the queue. - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item, - ACE_Time_Value *tv) +int +ACE_Message_Queue_Vx::enqueue_head (ACE_Message_Block *mb, ACE_Time_Value *tv) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } - - if (this->wait_not_full_cond (ace_mon, tv) == -1) - return -1; - - int queue_count = this->enqueue_i (new_item); - - if (queue_count == -1) - return -1; - else - { - this->notify (); - return queue_count; - } + ACE_NOTSUP_RETURN (-1); } -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item, +int +ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&, ACE_Time_Value *tv) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue"); - return this->enqueue_prio (new_item, tv); -} - -// Block indefinitely waiting for an item to arrive, -// does not ignore alerts (e.g., signals). - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, - ACE_Time_Value *tv) -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } - - if (this->wait_not_full_cond (ace_mon, tv) == -1) - return -1; - - int queue_count = this->enqueue_tail_i (new_item); - - if (queue_count == -1) - return -1; - else - { - this->notify (); - return queue_count; - } -} - -// Remove an item from the front of the queue. If TV == 0 block -// indefinitely (or until an alert occurs). Otherwise, block for upto -// the amount of time specified by TV. - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } - - if (this->wait_not_empty_cond (ace_mon, tv) == -1) - return -1; - - return this->dequeue_head_i (first_item); -} - -template <ACE_SYNCH_DECL> int -ACE_Message_Queue<ACE_SYNCH_USE>::notify (void) -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify"); - - // By default, don't do anything. - if (this->notification_strategy_ == 0) - return 0; - else - return this->notification_strategy_->notify (); -} - - -///////////////////////////////////// -// class ACE_Dynamic_Message_Queue // -///////////////////////////////////// - - // = Initialization and termination methods. -template <ACE_SYNCH_DECL> -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue ( - ACE_Dynamic_Message_Strategy & message_strategy, - size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns) - : ACE_Message_Queue (hwm, lwm, ns) - , message_strategy_ (message_strategy) -{ - // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the - // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor -} - -template <ACE_SYNCH_DECL> -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void) -{ - delete &message_strategy_; -} -// dtor: free message strategy and let base class dtor do the rest - -template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) -{ - ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); - - int result = 0; - - // refresh dynamic priority of the new message - result = (*priority_eval_func_ptr_) (*new_item, tv); - - // get the current time - ACE_Time_Value current_time = ACE_OS::gettimeofday (); - - // refresh dynamic priorities of messages in the queue - this->refresh_priorities (current_time); - - // reorganize the queue according to the new priorities - this->refresh_queue (current_time); - - // if there is only one message in the pending list, - // the pending list will be empty after a *successful* - // dequeue operation - int empty_pending = (head_ == pending_list_tail_) ? 1 : 0; - - // invoke the base class method - result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item); - - // null out the pending list tail pointer if - // the pending list is now empty - if ((empty_pending) && (result > 0)) - { - pending_list_tail_ = 0; - } - - return result; -} - // Enqueue an <ACE_Message_Block *> in accordance with its priority. - // priority may be *dynamic* or *static* or a combination or *both* - // It calls the priority evaluation function passed into the Dynamic - // Message Queue constructor to update the priorities of all enqueued - // messages. - -template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) -{ - ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); - - int result = 0; - - // get the current time - ACE_Time_Value current_time = ACE_OS::gettimeofday (); - - // refresh dynamic priorities of messages in the queue - result = this->refresh_priorities (current_time); - if (result < 0) - { - return result; - } - - // reorganize the queue according to the new priorities - result = this->refresh_queue (current_time); - if (result < 0) - { - return result; - } - - // if there is only one message in the pending list, - // the pending list will be empty after a *successful* - // dequeue operation - int empty_pending = (head_ == pending_list_tail_) ? 1 : 0; - - // invoke the base class method - result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item); - - // null out the pending list tail pointer if - // the pending list is now empty - if ((empty_pending) && (result > 0)) - { - pending_list_tail_ = 0; - } - - return result; -} - // Dequeue and return the <ACE_Message_Block *> at the head of the - // queue. - -template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv) -{ - int result = 0; - - // apply the priority update function to all enqueued - // messages, starting at the head of the queue - ACE_Message_Block *temp = head_; - while (temp) - { - result = (*priority_eval_func_ptr_) (*temp, tv); - if (result < 0) - { - break; - } - - temp = temp->next (); - } - - return result; -} - // refresh the priorities in the queue according - // to a specific priority assignment function - -template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv) -{ - // first, drop any messages from the queue and delete them: - // reference counting at the data block level means that the - // underlying data block will not be deleted if another - // message block is still pointing to it. - ACE_Message_Block *temp = (pending_list_tail_) - ? pending_list_tail_->next () - : head_; - - while (temp) - { - // messages that have overflowed the given time bounds must be removed - if (message_strategy_.is_beyond_late (*temp, tv)) - { - // find the end of the chain of overflowed messages - ACE_Message_Block *remove_tail = temp; - while ((remove_tail) && (remove_tail->next ()) && - message_strategy_.is_beyond_late (*(remove_tail->next ()), tv)) - { - remove_tail = remove_tail->next (); - } - - temp = remove_tail->next (); - if (remove_temp->next ()) - { - remove_temp->next ()->prev (0); - } - else if (remove_temp->prev ()) - { - remove_temp->prev ()->next (0); - } - else - { - head_ = 0; - tail_ = 0; - } - remove_temp->prev (0); - remove_temp->next (0); - - temp = remove_temp->next (); - - } - else - { - temp = temp->next (); - } - } -} - // refresh the order of messages in the queue - // after refreshing their priorities - -///////////////////////////////////// -// class ACE_Message_Queue_Factory // -///////////////////////////////////// - -template <ACE_SYNCH_DECL> -ACE_Message_Queue<ACE_SYNCH_USE> * -ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns) -{ - return new ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns); -} - // factory method for a statically prioritized ACE_Message_Queue - -template <ACE_SYNCH_DECL> -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * -ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns, - u_long static_bit_field_mask, - u_long static_bit_field_shift, - u_long pending_threshold, - u_long dynamic_priority_max, - u_long dynamic_priority_offset) -{ - ACE_Deadline_Message_Strategy *adms; - - ACE_NEW_RETURN (adms, - ACE_Deadline_Message_Strategy (static_bit_field_mask, - static_bit_field_shift, - pending_threshold, - dynamic_priority_max, - dynamic_priority_offset), - 0); - - return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns); -} - // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue - -template <ACE_SYNCH_DECL> -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * -ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns, - u_long static_bit_field_mask, - u_long static_bit_field_shift, - u_long pending_threshold, - u_long dynamic_priority_max, - u_long dynamic_priority_offset) -{ - ACE_Laxity_Message_Strategy *alms; - - ACE_NEW_RETURN (alms, - ACE_Laxity_Message_Strategy (static_bit_field_mask, - static_bit_field_shift, - pending_threshold, - dynamic_priority_max, - dynamic_priority_offset), - 0); - - - return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns); + ACE_NOTSUP_RETURN (-1); } - // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue +#endif /* ! ACE_REQUIRES_FUNC_DEFINITIONS */ +#endif /* VXWORKS */ #endif /* ACE_MESSAGE_QUEUE_C */ diff --git a/ace/Message_Queue.h b/ace/Message_Queue.h index a6d625829ab..1544d800868 100644 --- a/ace/Message_Queue.h +++ b/ace/Message_Queue.h @@ -49,96 +49,62 @@ public: }; }; -template <ACE_SYNCH_DECL> -class ACE_Message_Queue : public ACE_Message_Queue_Base +// Include the templates here. +#include "ace/Message_Queue_T.h" + +#if defined (VXWORKS) +# include <msgQLib.h> + +class ACE_Message_Queue_Vx : public ACE_Message_Queue<ACE_NULL_SYNCH> { // = TITLE - // A threaded message queueing facility, modeled after the - // queueing facilities in System V STREAMs. + // Wrapper for VxWorks message queues. // // = DESCRIPTION - // An <ACE_Message_Queue> is the central queueing facility for - // messages in the ASX framework. If <ACE_SYNCH_DECL> is - // ACE_MT_SYNCH then all operations are thread-safe. Otherwise, - // if it's <ACE_NULL_SYNCH> then there's no locking overhead. + // Specialization of ACE_Message_Queue to simply wrap VxWorks + // MsgQ. It does not use any synchronization, because it relies + // on the native MsgQ implementation to take care of that. The + // only system calls that it uses are VxWorks msgQLib calls, so + // it is suitable for use in iterrupt service routines. + // + // NOTE: *Many* ACE_Message_Queue features are not supported with + // this specialization, including: + // * The two size arguments to the constructor and open () are + // interpreted differently. The first is interpreted as the + // maximum number of bytes in a message. The second is + // interpreted as the maximum number of messages that can be + // queued. + // * dequeue_head () *requires* that the ACE_Message_Block + // pointer argument point to an ACE_Message_Block that was + // allocated by the caller. It must be big enough to support + // the received message, without using continutation. The + // pointer argument is not modified. + // * Message priority. MSG_Q_FIFO is hard-coded. + // * enqueue method timeouts. + // * peek_dequeue_head (). + // * ACE_Message_Queue_Iterators. + // * The ability to change low and high water marks after creation. + // * Message_Block chains. The continuation field of ACE_Message_Block + // * is ignored; only the first block of a fragment chain is + // * recognized. public: - friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; - friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; - - // = Traits - typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> ITERATOR; - typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> REVERSE_ITERATOR; - // = Initialization and termination methods. - ACE_Message_Queue (size_t hwm = DEFAULT_HWM, - size_t lwm = DEFAULT_LWM, - ACE_Notification_Strategy * = 0); + ACE_Message_Queue_Vx (size_t max_messages, + size_t max_message_length, + ACE_Notification_Strategy * = 0); // Create a message queue with all the defaults. - virtual int open (size_t hwm = DEFAULT_HWM, - size_t lwm = DEFAULT_LWM, + virtual int open (size_t max_messages, + size_t max_message_length, ACE_Notification_Strategy * = 0); // Create a message queue with all the defaults. virtual int close (void); // Close down the message queue and release all resources. - virtual ~ACE_Message_Queue (void); + virtual ~ACE_Message_Queue_Vx (void); // Close down the message queue and release all resources. - // = Enqueue and dequeue methods. - - // For all the following routines if <timeout> == 0, the caller will - // block until action is possible, else will wait until the absolute - // time specified in *<timeout> elapses). These calls will return, - // however, when queue is closed, deactivated, when a signal occurs, - // or if the time specified in timeout elapses, (in which case errno - // = EWOULDBLOCK). - - virtual int peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv = 0); - // Retrieve the first <ACE_Message_Block> without removing it. - // Returns -1 on failure, else the number of items still on the - // queue. - - virtual int enqueue_prio (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0); - // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in - // accordance with its <msg_priority> (0 is lowest priority). FIFO - // order is maintained when messages of the same priority are - // inserted consecutively. Returns -1 on failure, else the number - // of items still on the queue. - - virtual int enqueue (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0); - // This is an alias for <enqueue_prio>. It's only here for - // backwards compatibility and will go away in a subsequent release. - // Please use <enqueue_prio> instead. - - virtual int enqueue_tail (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0); - // Enqueue an <ACE_Message_Block *> at the end of the queue. - // Returns -1 on failure, else the number of items still on the - // queue. - - virtual int enqueue_head (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0); - // Enqueue an <ACE_Message_Block *> at the head of the queue. - // Returns -1 on failure, else the number of items still on the - // queue. - - virtual int dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *timeout = 0); - // Dequeue and return the <ACE_Message_Block *> at the head of the - // queue. Returns -1 on failure, else the number of items still on - // the queue. - - // = Check if queue is full/empty. - virtual int is_full (void); - // True if queue is full, else false. - virtual int is_empty (void); - // True if queue is empty, else false. - // = Queue statistic methods. virtual size_t message_bytes (void); // Number of total bytes on the queue. @@ -157,39 +123,6 @@ public: // = Activation control methods. - virtual int deactivate (void); - // Deactivate the queue and wakeup all threads waiting on the queue - // so they can continue. No messages are removed from the queue, - // however. Any other operations called until the queue is - // activated again will immediately return -1 with <errno> == - // ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the - // call and WAS_ACTIVE if queue was active before the call. - - virtual int activate (void); - // Reactivate the queue so that threads can enqueue and dequeue - // messages again. Returns WAS_INACTIVE if queue was inactive - // before the call and WAS_ACTIVE if queue was active before the - // call. - - virtual int deactivated (void); - // Returns true if <deactivated_> is enabled. - - // = Notification hook. - - virtual int notify (void); - // This hook is automatically invoked by <enqueue_head>, - // <enqueue_tail>, and <enqueue_prio> when a new item is inserted - // into the queue. Subclasses can override this method to perform - // specific notification strategies (e.g., signaling events for a - // <WFMO_Reactor>, notifying a <Reactor>, etc.). In a - // multi-threaded application with concurrent consumers, there is no - // guarantee that the queue will be still be non-empty by the time - // the notification occurs. - - // = Get/set the notification strategy for the <Message_Queue> - virtual ACE_Notification_Strategy *notification_strategy (void); - virtual void notification_strategy (ACE_Notification_Strategy *s); - void dump (void) const; // Dump the state of an object. @@ -197,11 +130,6 @@ public: // Declare the dynamic allocation hooks. protected: - // = Routines that actually do the enqueueing and dequeueing. - // These routines assume that locks are held by the corresponding - // public methods. Since they are virtual, you can change the - // queueing mechanism by subclassing from <ACE_Message_Queue>. - virtual int enqueue_i (ACE_Message_Block *new_item); // Enqueue an <ACE_Message_Block *> in accordance with its priority. @@ -228,11 +156,11 @@ protected: // Activate the queue. // = Helper methods to factor out common #ifdef code. - virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + virtual int wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon, ACE_Time_Value *tv); // Wait for the queue to become non-full. - virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + virtual int wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon, ACE_Time_Value *tv); // Wait for the queue to become non-empty. @@ -242,264 +170,36 @@ protected: virtual int signal_dequeue_waiters (void); // Inform any threads waiting to dequeue that they can procede. - ACE_Message_Block *head_; - // Pointer to head of ACE_Message_Block list. - - ACE_Message_Block *tail_; - // Pointer to tail of ACE_Message_Block list. - - size_t low_water_mark_; - // Lowest number before unblocking occurs. - - size_t high_water_mark_; - // Greatest number of bytes before blocking. - - size_t cur_bytes_; - // Current number of bytes in the queue. - - size_t cur_count_; - // Current number of messages in the queue. - - int deactivated_; - // Indicates that the queue is inactive. - - ACE_Notification_Strategy *notification_strategy_; - // The notification strategy used when a new message is enqueued. - - // = Synchronization primitives for controlling concurrent access. - ACE_SYNCH_MUTEX_T lock_; - // Protect queue from concurrent access. - -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - ACE_SYNCH_SEMAPHORE_T not_empty_cond_; - // Used to make threads sleep until the queue is no longer empty. - - ACE_SYNCH_SEMAPHORE_T not_full_cond_; - // Used to make threads sleep until the queue is no longer full. - - size_t dequeue_waiters_; - // Number of threads waiting to dequeue a <Message_Block>. - - size_t enqueue_waiters_; - // Number of threads waiting to enqueue a <Message_Block>. -#else - ACE_SYNCH_CONDITION_T not_empty_cond_; - // Used to make threads sleep until the queue is no longer empty. - - ACE_SYNCH_CONDITION_T not_full_cond_; - // Used to make threads sleep until the queue is no longer full. -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + MSG_Q_ID msgq (); + // Access the underlying msgQ. private: + int max_messages_; + // Maximum number of messages that can be queued. - // = Disallow these operations. - ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &)) - ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &)) -}; - -template <ACE_SYNCH_DECL> -class ACE_Message_Queue_Iterator -{ - // = TITLE - // Iterator for the <ACE_Message_Queue>. -public: - // = Initialization method. - ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); - - // = Iteration methods. - int next (ACE_Message_Block *&entry); - // Pass back the <entry> that hasn't been seen in the queue. - // Returns 0 when all items have been seen, else 1. - - int done (void) const; - // Returns 1 when all items have been seen, else 0. - - int advance (void); - // Move forward by one element in the queue. Returns 0 when all the - // items in the set have been seen, else 1. - - void dump (void) const; - // Dump the state of an object. + int max_message_length_; + // Maximum message size, in bytes. - ACE_ALLOC_HOOK_DECLARE; - // Declare the dynamic allocation hooks. - -private: - ACE_Message_Queue <ACE_SYNCH_USE> &queue_; - // Message_Queue we are iterating over. - - ACE_Message_Block *curr_; - // Keeps track of how far we've advanced... -}; + int options_; + // Native message queue options. -template <ACE_SYNCH_DECL> -class ACE_Message_Queue_Reverse_Iterator -{ - // = TITLE - // Reverse Iterator for the <ACE_Message_Queue>. -public: - // = Initialization method. - ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); - - // = Iteration methods. - int next (ACE_Message_Block *&entry); - // Pass back the <entry> that hasn't been seen in the queue. - // Returns 0 when all items have been seen, else 1. - - int done (void) const; - // Returns 1 when all items have been seen, else 0. - - int advance (void); - // Move forward by one element in the queue. Returns 0 when all the - // items in the set have been seen, else 1. - - void dump (void) const; - // Dump the state of an object. - - ACE_ALLOC_HOOK_DECLARE; - // Declare the dynamic allocation hooks. - -private: - ACE_Message_Queue <ACE_SYNCH_USE> &queue_; - // Message_Queue we are iterating over. - - ACE_Message_Block *curr_; - // Keeps track of how far we've advanced... -}; - -template <ACE_SYNCH_DECL> -class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE> -{ - // = TITLE - // A derived class which adapts the <ACE_Message_Queue> - // class in order to maintain dynamic priorities for enqueued - // <ACE_Message_Blocks> and manage the queue dynamically. - // - // = DESCRIPTION - // Priorities and queue orderings are refreshed at each enqueue - // and dequeue operation. Head and tail enqueue methods were - // made private to prevent out-of-order messages from confusing - // the pending and late portions of the queue. Messages in the - // pending portion of the queue whose dynamic priority becomes - // negative are placed into the late portion of the queue. - // Messages in the late portion of the queue whose dynamic - // priority becomes positive are dropped. These behaviors - // support a limited schedule overrun corresponding to one full - // cycle through dynamic priority values. These behaviors can - // be modified in derived classes by providing alternative - // definitions for the appropriate virtual methods. - // -public: - // = Initialization and termination methods. - ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, - size_t hwm = DEFAULT_HWM, - size_t lwm = DEFAULT_LWM, - ACE_Notification_Strategy * = 0); - - virtual ~ACE_Dynamic_Message_Queue (void); - // Close down the message queue and release all resources. - - ACE_ALLOC_HOOK_DECLARE; - // Declare the dynamic allocation hooks. - -protected: - virtual int enqueue_i (ACE_Message_Block *new_item); - // Enqueue an <ACE_Message_Block *> in accordance with its priority. - // priority may be *dynamic* or *static* or a combination or *both* - // It calls the priority evaluation function passed into the Dynamic - // Message Queue constructor to update the priorities of all - // enqueued messages. - - virtual int dequeue_head_i (ACE_Message_Block *&first_item); - // Dequeue and return the <ACE_Message_Block *> at the head of the - // queue. - - virtual int refresh_priorities (const ACE_Time_Value & tv); - // Refresh the priorities in the queue according to a specific - // priority assignment function. - - virtual int refresh_queue (const ACE_Time_Value & tv); - // Refresh the order of messages in the queue after refreshing their - // priorities. + // = Disallow these operations. + ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_Vx &)) + ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_Vx (const ACE_Message_Queue_Vx &)) - ACE_Message_Block *pending_list_tail_; - // Pointer to tail of the pending messages (those whose priority is - // and has been non-negative) in the <ACE_Message_Block list>. + ACE_UNIMPLEMENTED_FUNC (virtual int peek_dequeue_head + (ACE_Message_Block *&first_item, + ACE_Time_Value *tv = 0)) - ACE_Dynamic_Message_Strategy &message_strategy_; - // Pointer to a dynamic priority evaluation function. + ACE_UNIMPLEMENTED_FUNC (virtual int enqueue_tail + (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0)) -private: - // = Disallow these operations. - ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) - ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) - - // = These methods can wierdness in dynamically prioritized queue. - - // Disallow their use until and unless a coherent semantics for head - // and tail enqueueing can be identified. - ACE_UNIMPLEMENTED_FUNC (virtual int - enqueue_tail (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0)) - ACE_UNIMPLEMENTED_FUNC (virtual int - enqueue_head (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0)) - - ACE_UNIMPLEMENTED_FUNC (virtual int - peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv = 0)) - // Since messages are *dynamically* prioritized, it is not possible - // to guarantee that the message at the head of the queue when this - // method is called will still be at the head when the next method - // is called: disallow its use until and unless a coherent semantics - // for peeking at the head of the queue can be identified. -}; - -template <ACE_SYNCH_DECL> -class ACE_Export ACE_Message_Queue_Factory -{ - // = TITLE - // ACE_Message_Queue_Factory is a static factory class template which - // provides a separate factory method for each of the major kinds of - // priority based message dispatching: static, earliest deadline first - // (EDF), and minimum laxity first (MLF). - // - // = DESCRIPTION - // The ACE_Dynamic_Message_Queue class assumes responsibility for - // releasing the resources of the strategy with which it was - // constructed: the user of a message queue constructed by - // any of these factory methods is only responsible for - // ensuring destruction of the message queue itself. - -public: - static ACE_Message_Queue<ACE_SYNCH_USE> * - create_static_message_queue (size_t hwm = DEFAULT_HWM, - size_t lwm = DEFAULT_LWM, - ACE_Notification_Strategy * = 0); - // factory method for a statically prioritized ACE_Message_Queue - - static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * - create_deadline_message_queue (size_t hwm = DEFAULT_HWM, - size_t lwm = DEFAULT_LWM, - ACE_Notification_Strategy * = 0, - u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 - u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) - u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 - u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) - // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue - - static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * - create_laxity_message_queue (size_t hwm = DEFAULT_HWM, - size_t lwm = DEFAULT_LWM, - ACE_Notification_Strategy * = 0, - u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 - u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) - u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 - u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) - // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue + ACE_UNIMPLEMENTED_FUNC (virtual int enqueue_head + (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0)) }; +#endif /* VXWORKS */ // This must go here to avoid problems with circular includes. #include "ace/Strategies.h" @@ -508,13 +208,4 @@ public: #include "ace/Message_Queue.i" #endif /* __ACE_INLINE__ */ -#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) -#include "ace/Message_Queue.cpp" -#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ - -#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) -#pragma implementation ("Message_Queue.cpp") -#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ - #endif /* ACE_MESSAGE_QUEUE_H */ - diff --git a/ace/Message_Queue.i b/ace/Message_Queue.i index 6697eb242a4..6ed62567167 100644 --- a/ace/Message_Queue.i +++ b/ace/Message_Queue.i @@ -1,148 +1,83 @@ /* -*- C++ -*- */ // $Id$ -// Message_Queue.i +#if defined (VXWORKS) +// Specialization to use native VxWorks Message Queues. -template <ACE_SYNCH_DECL> ACE_INLINE ACE_Notification_Strategy * -ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void) +ACE_INLINE MSG_Q_ID +ACE_Message_Queue_Vx::msgq () { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy"); - - return this->notification_strategy_; + // Hijack the tail_ field to store the MSG_Q_ID. + return ACE_reinterpret_cast (MSG_Q_ID, tail_); } -template <ACE_SYNCH_DECL> ACE_INLINE void -ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s) +ACE_INLINE int +ACE_Message_Queue_Vx::is_empty_i (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy"); - - this->notification_strategy_ = s; -} - -// Check if queue is empty (does not hold locks). - -template <ACE_SYNCH_DECL> ACE_INLINE int -ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void) -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i"); + ACE_TRACE ("ACE_Message_Queue_Vx::is_empty_i"); return this->cur_bytes_ <= 0 && this->cur_count_ <= 0; } -// Check if queue is full (does not hold locks). - -template <ACE_SYNCH_DECL> ACE_INLINE int -ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void) +ACE_INLINE int +ACE_Message_Queue_Vx::is_full_i (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i"); + ACE_TRACE ("ACE_Message_Queue_Vx::is_full_i"); return this->cur_bytes_ > this->high_water_mark_; } -// Check if queue is empty (holds locks). - -template <ACE_SYNCH_DECL> ACE_INLINE int -ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void) -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - return this->is_empty_i (); -} - -// Check if queue is full (holds locks). - -template <ACE_SYNCH_DECL> ACE_INLINE int -ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void) -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - return this->is_full_i (); -} - -template <ACE_SYNCH_DECL> ACE_INLINE size_t -ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void) +ACE_INLINE size_t +ACE_Message_Queue_Vx::high_water_mark (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark"); + // Don't need to guard, because this is fixed. return this->high_water_mark_; } -template <ACE_SYNCH_DECL> ACE_INLINE void -ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm) +ACE_INLINE void +ACE_Message_Queue_Vx::high_water_mark (size_t hwm) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark"); - ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_); - - this->high_water_mark_ = hwm; + ACE_TRACE ("ACE_Message_Queue_Vx::high_water_mark"); + // Don't need to guard, because this is fixed. + errno = ENOTSUP; } -template <ACE_SYNCH_DECL> ACE_INLINE size_t -ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void) +ACE_INLINE size_t +ACE_Message_Queue_Vx::low_water_mark (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark"); + // Don't need to guard, because this is fixed. return this->low_water_mark_; } -template <ACE_SYNCH_DECL> ACE_INLINE void -ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm) +ACE_INLINE void +ACE_Message_Queue_Vx::low_water_mark (size_t lwm) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark"); - ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_); + ACE_TRACE ("ACE_Message_Queue_Vx::low_water_mark"); + // Don't need to guard, because this is fixed. - this->low_water_mark_ = lwm; + errno = ENOTSUP; } // Return the current number of bytes in the queue. -template <ACE_SYNCH_DECL> ACE_INLINE size_t -ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void) -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); - - return this->cur_bytes_; -} - -// Return the current number of messages in the queue. - -template <ACE_SYNCH_DECL> ACE_INLINE size_t -ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void) +ACE_INLINE size_t +ACE_Message_Queue_Vx::message_bytes (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); - - return this->cur_count_; + ACE_TRACE ("ACE_Message_Queue_Vx::message_bytes"); + ACE_NOTSUP_RETURN ((size_t) -1); } -template <ACE_SYNCH_DECL> ACE_INLINE int -ACE_Message_Queue<ACE_SYNCH_USE>::activate (void) -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - return this->activate_i (); -} - -template <ACE_SYNCH_DECL> ACE_INLINE int -ACE_Message_Queue<ACE_SYNCH_USE>::deactivate (void) -{ - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - return this->deactivate_i (); -} +// Return the current number of messages in the queue. -template <ACE_SYNCH_DECL> ACE_INLINE int -ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void) +ACE_INLINE size_t +ACE_Message_Queue_Vx::message_count (void) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated"); + ACE_TRACE ("ACE_Message_Queue_Vx::message_count"); + // Don't need to guard, because this is a system call. - return this->deactivated_; + return ::msgQNumMsgs (msgq ()); } -ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator) - - +#endif /* VXWORKS */ diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp new file mode 100644 index 00000000000..bb5370db1c2 --- /dev/null +++ b/ace/Message_Queue_T.cpp @@ -0,0 +1,984 @@ +// $Id$ + +#if !defined (ACE_MESSAGE_QUEUE_T_C) +#define ACE_MESSAGE_QUEUE_T_C + +#define ACE_BUILD_DLL +// #include Message_Queue.h instead of Message_Queue_T.h to avoid +// circular include problems. +#include "ace/Message_Queue.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Message_Queue_T.i" +#endif /* __ACE_INLINE__ */ + +ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue) + +ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue) + + +////////////////////////////////////// +// class ACE_Message_Queue_Iterator // +////////////////////////////////////// + +template <ACE_SYNCH_DECL> +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) + : queue_ (q), + curr_ (q.head_) +{ +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry) +{ + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); + + if (this->curr_ != 0) + { + entry = this->curr_; + return 1; + } + else + return 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const +{ + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); + + return this->curr_ == 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void) +{ + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); + + if (this->curr_) + this->curr_ = this->curr_->next (); + return this->curr_ != 0; +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const +{ +} + +ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator) + + +////////////////////////////////////////////// +// class ACE_Message_Queue_Reverse_Iterator // +////////////////////////////////////////////// + +template <ACE_SYNCH_DECL> +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) + : queue_ (q), + curr_ (queue_.tail_) +{ +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry) +{ + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); + + if (this->curr_ != 0) + { + entry = this->curr_; + return 1; + } + else + return 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const +{ + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); + + return this->curr_ == 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void) +{ + ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_); + + if (this->curr_) + this->curr_ = this->curr_->prev (); + return this->curr_ != 0; +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const +{ +} + + +///////////////////////////// +// class ACE_Message_Queue // +///////////////////////////// + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("deactivated = %d\n") + ASYS_TEXT ("low_water_mark = %d\n") + ASYS_TEXT ("high_water_mark = %d\n") + ASYS_TEXT ("cur_bytes = %d\n") + ASYS_TEXT ("cur_count = %d\n") + ASYS_TEXT ("head_ = %u\n") + ASYS_TEXT ("tail_ = %u\n"), + this->deactivated_, + this->low_water_mark_, + this->high_water_mark_, + this->cur_bytes_, + this->cur_count_, + this->head_, + this->tail_)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_full_cond: \n"))); + not_full_cond_.dump (); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_empty_cond: \n"))); + not_empty_cond_.dump (); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +template <ACE_SYNCH_DECL> +ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) +#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + : not_empty_cond_ (0), + not_full_cond_ (0), + enqueue_waiters_ (0), + dequeue_waiters_ (0) +#else + : not_empty_cond_ (this->lock_), + not_full_cond_ (this->lock_) +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue"); + + if (this->open (hwm, lwm, ns) == -1) + ACE_ERROR ((LM_ERROR, ASYS_TEXT ("open"))); +} + +template <ACE_SYNCH_DECL> +ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue"); + if (this->head_ != 0 && this->close () == -1) + ACE_ERROR ((LM_ERROR, ASYS_TEXT ("close"))); +} + +// Don't bother locking since if someone calls this function more than +// once for the same queue, we're in bigger trouble than just +// concurrency control! + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open"); + this->high_water_mark_ = hwm; + this->low_water_mark_ = lwm; + this->deactivated_ = 0; + this->cur_bytes_ = 0; + this->cur_count_ = 0; + this->tail_ = 0; + this->head_ = 0; + this->notification_strategy_ = ns; + return 0; +} + +// Implementation of the public deactivate() method +// (assumes locks are held). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i"); + int current_status = + this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; + + // Wakeup all waiters. +#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + this->not_empty_cond_.broadcast (); + this->not_full_cond_.broadcast (); +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + + this->deactivated_ = 1; + return current_status; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i"); + int current_status = + this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE; + this->deactivated_ = 0; + return current_status; +} + +// Clean up the queue if we have not already done so! + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::close (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + int res = this->deactivate_i (); + + // Free up the remaining message on the list + + for (this->tail_ = 0; this->head_ != 0; ) + { + this->cur_count_--; + + ACE_Message_Block *temp; + + // Decrement all the counts. + for (temp = this->head_; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ -= temp->size (); + + temp = this->head_; + this->head_ = this->head_->next (); + + // Make sure to use <release> rather than <delete> since this is + // reference counted. + temp->release (); + } + + return res; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void) +{ +#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + if (this->not_full_cond_.signal () != 0) + return -1; +#else + if (this->enqueue_waiters_ > 0) + { + --this->enqueue_waiters_; + return this->not_full_cond_.release (); + } +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + return 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void) +{ +#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + // Tell any blocked threads that the queue has a new item! + if (this->not_empty_cond_.signal () != 0) + return -1; +#else + if (this->dequeue_waiters_ > 0) + { + --this->dequeue_waiters_; + return this->not_empty_cond_.release (); + } +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + return 0; +} + +// Actually put the node at the end (no locking so must be called with +// locks held). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i"); + + if (new_item == 0) + return -1; + + // List was empty, so build a new one. + if (this->tail_ == 0) + { + this->head_ = new_item; + this->tail_ = new_item; + new_item->next (0); + new_item->prev (0); + } + // Link at the end. + else + { + new_item->next (0); + this->tail_->next (new_item); + new_item->prev (this->tail_); + this->tail_ = new_item; + } + + // Make sure to count *all* the bytes in a composite message!!! + + for (ACE_Message_Block *temp = new_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ += temp->size (); + + this->cur_count_++; + + if (this->signal_dequeue_waiters () == -1) + return -1; + else + return this->cur_count_; +} + +// Actually put the node at the head (no locking) + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i"); + + if (new_item == 0) + return -1; + + new_item->prev (0); + new_item->next (this->head_); + + if (this->head_ != 0) + this->head_->prev (new_item); + else + this->tail_ = new_item; + + this->head_ = new_item; + + // Make sure to count *all* the bytes in a composite message!!! + + for (ACE_Message_Block *temp = new_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ += temp->size (); + + this->cur_count_++; + + if (this->signal_dequeue_waiters () == -1) + return -1; + else + return this->cur_count_; +} + +// Actually put the node at its proper position relative to its +// priority. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + + if (new_item == 0) + return -1; + + if (this->head_ == 0) + // Check for simple case of an empty queue, where all we need to + // do is insert <new_item> into the head. + return this->enqueue_head_i (new_item); + else + { + ACE_Message_Block *temp; + + // Figure out where the new item goes relative to its priority. + // We start looking from the highest priority to the lowest + // priority. + + for (temp = this->head_; + temp != 0; + temp = temp->next ()) + if (temp->msg_priority () < new_item->msg_priority ()) + // Break out when we've located an item that has lower + // priority that <new_item>. + break; + + if (temp == 0) + // Check for simple case of inserting at the tail of the queue, + // where all we need to do is insert <new_item> after the + // current tail. + return this->enqueue_tail_i (new_item); + else if (temp->prev () == 0) + // Check for simple case of inserting at the head of the + // queue, where all we need to do is insert <new_item> before + // the current head. + return this->enqueue_head_i (new_item); + else + { + // Insert the new message ahead of the item of + // lesser priority. This ensures that FIFO order is + // maintained when messages of the same priority are + // inserted consecutively. + new_item->next (temp); + new_item->prev (temp->prev ()); + temp->prev ()->next (new_item); + temp->prev (new_item); + } + } + + // Make sure to count *all* the bytes in a composite message!!! + + for (ACE_Message_Block *temp = new_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ += temp->size (); + + this->cur_count_++; + + if (this->signal_dequeue_waiters () == -1) + return -1; + else + return this->cur_count_; +} + +// Actually get the first ACE_Message_Block (no locking, so must be +// called with locks held). This method assumes that the queue has at +// least one item in it when it is called. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); + first_item = this->head_; + this->head_ = this->head_->next (); + + if (this->head_ == 0) + this->tail_ = 0; + else + // The prev pointer of the first message block has to point to + // NULL... + this->head_->prev (0); + + // Make sure to subtract off all of the bytes associated with this + // message. + for (ACE_Message_Block *temp = first_item; + temp != 0; + temp = temp->cont ()) + this->cur_bytes_ -= temp->size (); + + this->cur_count_--; + + if (this->signal_enqueue_waiters () == -1) + return -1; + else + return this->cur_count_; +} + +// Take a look at the first item without removing it. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *tv) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->deactivated_) + { + errno = ESHUTDOWN; + return -1; + } + + // Wait for at least one item to become available. + + if (this->wait_not_empty_cond (ace_mon, tv) == -1) + return -1; + + first_item = this->head_; + return this->cur_count_; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + ACE_Time_Value *tv) +{ + int result = 0; +#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + while (this->is_full_i () && result != -1) + { + ++this->enqueue_waiters_; + // @@ Need to add sanity checks for failure... + mon.release (); + if (tv == 0) + result = this->not_full_cond_.acquire (); + else + result = this->not_full_cond_.acquire (*tv); + int error = errno; + mon.acquire (); + errno = error; + } +#else + ACE_UNUSED_ARG (mon); + + // Wait while the queue is full. + + while (this->is_full_i ()) + { + if (this->not_full_cond_.wait (tv) == -1) + { + if (errno == ETIME) + errno = EWOULDBLOCK; + result = -1; + break; + } + if (this->deactivated_) + { + errno = ESHUTDOWN; + result = -1; + break; + } + } +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + return result; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + ACE_Time_Value *tv) +{ + int result = 0; +#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + while (this->is_empty_i () && result != -1) + { + ++this->dequeue_waiters_; + // @@ Need to add sanity checks for failure... + mon.release (); + if (tv == 0) + result = this->not_empty_cond_.acquire (); + else + { + result = this->not_empty_cond_.acquire (*tv); + if (result == -1 && errno == ETIME) + errno = EWOULDBLOCK; + } + int error = errno; + mon.acquire (); + errno = error; + } +#else + ACE_UNUSED_ARG (mon); + + // Wait while the queue is empty. + + while (this->is_empty_i ()) + { + if (this->not_empty_cond_.wait (tv) == -1) + { + if (errno == ETIME) + errno = EWOULDBLOCK; + result = -1; + break; + } + if (this->deactivated_) + { + errno = ESHUTDOWN; + result = -1; + break; + } + } +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + return result; +} + +// Block indefinitely waiting for an item to arrive, does not ignore +// alerts (e.g., signals). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, + ACE_Time_Value *tv) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->deactivated_) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_full_cond (ace_mon, tv) == -1) + return -1; + + int queue_count = this->enqueue_head_i (new_item); + + if (queue_count == -1) + return -1; + else + { + this->notify (); + return queue_count; + } +} + +// Enqueue an <ACE_Message_Block *> into the <Message_Queue> in +// accordance with its <msg_priority> (0 is lowest priority). Returns +// -1 on failure, else the number of items still on the queue. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item, + ACE_Time_Value *tv) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->deactivated_) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_full_cond (ace_mon, tv) == -1) + return -1; + + int queue_count = this->enqueue_i (new_item); + + if (queue_count == -1) + return -1; + else + { + this->notify (); + return queue_count; + } +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item, + ACE_Time_Value *tv) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue"); + return this->enqueue_prio (new_item, tv); +} + +// Block indefinitely waiting for an item to arrive, +// does not ignore alerts (e.g., signals). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *tv) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->deactivated_) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_full_cond (ace_mon, tv) == -1) + return -1; + + int queue_count = this->enqueue_tail_i (new_item); + + if (queue_count == -1) + return -1; + else + { + this->notify (); + return queue_count; + } +} + +// Remove an item from the front of the queue. If TV == 0 block +// indefinitely (or until an alert occurs). Otherwise, block for upto +// the amount of time specified by TV. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *tv) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->deactivated_) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_empty_cond (ace_mon, tv) == -1) + return -1; + + return this->dequeue_head_i (first_item); +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::notify (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify"); + + // By default, don't do anything. + if (this->notification_strategy_ == 0) + return 0; + else + return this->notification_strategy_->notify (); +} + + +///////////////////////////////////// +// class ACE_Dynamic_Message_Queue // +///////////////////////////////////// + + // = Initialization and termination methods. +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue ( + ACE_Dynamic_Message_Strategy & message_strategy, + size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) + : ACE_Message_Queue (hwm, lwm, ns) + , message_strategy_ (message_strategy) +{ + // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the + // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor +} + +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void) +{ + delete &message_strategy_; +} +// dtor: free message strategy and let base class dtor do the rest + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + + int result = 0; + + // refresh dynamic priority of the new message + result = (*priority_eval_func_ptr_) (*new_item, tv); + + // get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh dynamic priorities of messages in the queue + this->refresh_priorities (current_time); + + // reorganize the queue according to the new priorities + this->refresh_queue (current_time); + + // if there is only one message in the pending list, + // the pending list will be empty after a *successful* + // dequeue operation + int empty_pending = (head_ == pending_list_tail_) ? 1 : 0; + + // invoke the base class method + result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item); + + // null out the pending list tail pointer if + // the pending list is now empty + if ((empty_pending) && (result > 0)) + { + pending_list_tail_ = 0; + } + + return result; +} + // Enqueue an <ACE_Message_Block *> in accordance with its priority. + // priority may be *dynamic* or *static* or a combination or *both* + // It calls the priority evaluation function passed into the Dynamic + // Message Queue constructor to update the priorities of all enqueued + // messages. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); + + int result = 0; + + // get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh dynamic priorities of messages in the queue + result = this->refresh_priorities (current_time); + if (result < 0) + { + return result; + } + + // reorganize the queue according to the new priorities + result = this->refresh_queue (current_time); + if (result < 0) + { + return result; + } + + // if there is only one message in the pending list, + // the pending list will be empty after a *successful* + // dequeue operation + int empty_pending = (head_ == pending_list_tail_) ? 1 : 0; + + // invoke the base class method + result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item); + + // null out the pending list tail pointer if + // the pending list is now empty + if ((empty_pending) && (result > 0)) + { + pending_list_tail_ = 0; + } + + return result; +} + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv) +{ + int result = 0; + + // apply the priority update function to all enqueued + // messages, starting at the head of the queue + ACE_Message_Block *temp = head_; + while (temp) + { + result = (*priority_eval_func_ptr_) (*temp, tv); + if (result < 0) + { + break; + } + + temp = temp->next (); + } + + return result; +} + // refresh the priorities in the queue according + // to a specific priority assignment function + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv) +{ + // first, drop any messages from the queue and delete them: + // reference counting at the data block level means that the + // underlying data block will not be deleted if another + // message block is still pointing to it. + ACE_Message_Block *temp = (pending_list_tail_) + ? pending_list_tail_->next () + : head_; + + while (temp) + { + // messages that have overflowed the given time bounds must be removed + if (message_strategy_.is_beyond_late (*temp, tv)) + { + // find the end of the chain of overflowed messages + ACE_Message_Block *remove_tail = temp; + while ((remove_tail) && (remove_tail->next ()) && + message_strategy_.is_beyond_late (*(remove_tail->next ()), tv)) + { + remove_tail = remove_tail->next (); + } + + temp = remove_tail->next (); + if (remove_temp->next ()) + { + remove_temp->next ()->prev (0); + } + else if (remove_temp->prev ()) + { + remove_temp->prev ()->next (0); + } + else + { + head_ = 0; + tail_ = 0; + } + remove_temp->prev (0); + remove_temp->next (0); + + temp = remove_temp->next (); + + } + else + { + temp = temp->next (); + } + } +} + // refresh the order of messages in the queue + // after refreshing their priorities + + +///////////////////////////////////// +// class ACE_Message_Queue_Factory // +///////////////////////////////////// + +template <ACE_SYNCH_DECL> +ACE_Message_Queue<ACE_SYNCH_USE> * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) +{ + return new ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns); +} + // factory method for a statically prioritized ACE_Message_Queue + +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns, + u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long pending_threshold, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) +{ + ACE_Deadline_Message_Strategy *adms; + + ACE_NEW_RETURN (adms, + ACE_Deadline_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + pending_threshold, + dynamic_priority_max, + dynamic_priority_offset), + 0); + + return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns); +} + // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue + +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns, + u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long pending_threshold, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) +{ + ACE_Laxity_Message_Strategy *alms; + + ACE_NEW_RETURN (alms, + ACE_Laxity_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + pending_threshold, + dynamic_priority_max, + dynamic_priority_offset), + 0); + + + return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns); +} + // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue + + +#endif /* ACE_MESSAGE_QUEUE_T_C */ diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h new file mode 100644 index 00000000000..cc181c04a99 --- /dev/null +++ b/ace/Message_Queue_T.h @@ -0,0 +1,486 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// Message_Queue_T.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (ACE_MESSAGE_QUEUE_T_H) +#define ACE_MESSAGE_QUEUE_T_H + +#include "ace/Synch.h" + +template <ACE_SYNCH_DECL> +class ACE_Message_Queue : public ACE_Message_Queue_Base +{ + // = TITLE + // A threaded message queueing facility, modeled after the + // queueing facilities in System V STREAMs. + // + // = DESCRIPTION + // An <ACE_Message_Queue> is the central queueing facility for + // messages in the ASX framework. If <ACE_SYNCH_DECL> is + // ACE_MT_SYNCH then all operations are thread-safe. Otherwise, + // if it's <ACE_NULL_SYNCH> then there's no locking overhead. +public: + friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; + friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; + + // = Traits + typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> ITERATOR; + typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> REVERSE_ITERATOR; + + // = Initialization and termination methods. + ACE_Message_Queue (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + + // Create a message queue with all the defaults. + virtual int open (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + // Create a message queue with all the defaults. + + virtual int close (void); + // Close down the message queue and release all resources. + + virtual ~ACE_Message_Queue (void); + // Close down the message queue and release all resources. + + // = Enqueue and dequeue methods. + + // For all the following routines if <timeout> == 0, the caller will + // block until action is possible, else will wait until the absolute + // time specified in *<timeout> elapses). These calls will return, + // however, when queue is closed, deactivated, when a signal occurs, + // or if the time specified in timeout elapses, (in which case errno + // = EWOULDBLOCK). + + virtual int peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *tv = 0); + // Retrieve the first <ACE_Message_Block> without removing it. + // Returns -1 on failure, else the number of items still on the + // queue. + + virtual int enqueue_prio (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in + // accordance with its <msg_priority> (0 is lowest priority). FIFO + // order is maintained when messages of the same priority are + // inserted consecutively. Returns -1 on failure, else the number + // of items still on the queue. + + virtual int enqueue (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // This is an alias for <enqueue_prio>. It's only here for + // backwards compatibility and will go away in a subsequent release. + // Please use <enqueue_prio> instead. + + virtual int enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // Enqueue an <ACE_Message_Block *> at the end of the queue. + // Returns -1 on failure, else the number of items still on the + // queue. + + virtual int enqueue_head (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // Enqueue an <ACE_Message_Block *> at the head of the queue. + // Returns -1 on failure, else the number of items still on the + // queue. + + virtual int dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout = 0); + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. Returns -1 on failure, else the number of items still on + // the queue. + + // = Check if queue is full/empty. + virtual int is_full (void); + // True if queue is full, else false. + virtual int is_empty (void); + // True if queue is empty, else false. + + // = Queue statistic methods. + virtual size_t message_bytes (void); + // Number of total bytes on the queue. + virtual size_t message_count (void); + // Number of total messages on the queue. + + // = Flow control routines + virtual size_t high_water_mark (void); + // Get high watermark. + virtual void high_water_mark (size_t hwm); + // Set high watermark. + virtual size_t low_water_mark (void); + // Get low watermark. + virtual void low_water_mark (size_t lwm); + // Set low watermark. + + // = Activation control methods. + + virtual int deactivate (void); + // Deactivate the queue and wakeup all threads waiting on the queue + // so they can continue. No messages are removed from the queue, + // however. Any other operations called until the queue is + // activated again will immediately return -1 with <errno> == + // ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the + // call and WAS_ACTIVE if queue was active before the call. + + virtual int activate (void); + // Reactivate the queue so that threads can enqueue and dequeue + // messages again. Returns WAS_INACTIVE if queue was inactive + // before the call and WAS_ACTIVE if queue was active before the + // call. + + virtual int deactivated (void); + // Returns true if <deactivated_> is enabled. + + // = Notification hook. + + virtual int notify (void); + // This hook is automatically invoked by <enqueue_head>, + // <enqueue_tail>, and <enqueue_prio> when a new item is inserted + // into the queue. Subclasses can override this method to perform + // specific notification strategies (e.g., signaling events for a + // <WFMO_Reactor>, notifying a <Reactor>, etc.). In a + // multi-threaded application with concurrent consumers, there is no + // guarantee that the queue will be still be non-empty by the time + // the notification occurs. + + // = Get/set the notification strategy for the <Message_Queue> + virtual ACE_Notification_Strategy *notification_strategy (void); + virtual void notification_strategy (ACE_Notification_Strategy *s); + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +protected: + // = Routines that actually do the enqueueing and dequeueing. + // These routines assume that locks are held by the corresponding + // public methods. Since they are virtual, you can change the + // queueing mechanism by subclassing from <ACE_Message_Queue>. + + virtual int enqueue_i (ACE_Message_Block *new_item); + // Enqueue an <ACE_Message_Block *> in accordance with its priority. + + virtual int enqueue_tail_i (ACE_Message_Block *new_item); + // Enqueue an <ACE_Message_Block *> at the end of the queue. + + virtual int enqueue_head_i (ACE_Message_Block *new_item); + // Enqueue an <ACE_Message_Block *> at the head of the queue. + + virtual int dequeue_head_i (ACE_Message_Block *&first_item); + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. + + // = Check the boundary conditions (assumes locks are held). + virtual int is_full_i (void); + // True if queue is full, else false. + virtual int is_empty_i (void); + // True if queue is empty, else false. + + // = Implementation of the public activate() and deactivate() methods above (assumes locks are held). + virtual int deactivate_i (void); + // Deactivate the queue. + virtual int activate_i (void); + // Activate the queue. + + // = Helper methods to factor out common #ifdef code. + virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + ACE_Time_Value *tv); + // Wait for the queue to become non-full. + + virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + ACE_Time_Value *tv); + // Wait for the queue to become non-empty. + + virtual int signal_enqueue_waiters (void); + // Inform any threads waiting to enqueue that they can procede. + + virtual int signal_dequeue_waiters (void); + // Inform any threads waiting to dequeue that they can procede. + + ACE_Message_Block *head_; + // Pointer to head of ACE_Message_Block list. + + ACE_Message_Block *tail_; + // Pointer to tail of ACE_Message_Block list. + + size_t low_water_mark_; + // Lowest number before unblocking occurs. + + size_t high_water_mark_; + // Greatest number of bytes before blocking. + + size_t cur_bytes_; + // Current number of bytes in the queue. + + size_t cur_count_; + // Current number of messages in the queue. + + int deactivated_; + // Indicates that the queue is inactive. + + ACE_Notification_Strategy *notification_strategy_; + // The notification strategy used when a new message is enqueued. + + // = Synchronization primitives for controlling concurrent access. + ACE_SYNCH_MUTEX_T lock_; + // Protect queue from concurrent access. + +#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + ACE_SYNCH_SEMAPHORE_T not_empty_cond_; + // Used to make threads sleep until the queue is no longer empty. + + ACE_SYNCH_SEMAPHORE_T not_full_cond_; + // Used to make threads sleep until the queue is no longer full. + + size_t dequeue_waiters_; + // Number of threads waiting to dequeue a <Message_Block>. + + size_t enqueue_waiters_; + // Number of threads waiting to enqueue a <Message_Block>. +#else + ACE_SYNCH_CONDITION_T not_empty_cond_; + // Used to make threads sleep until the queue is no longer empty. + + ACE_SYNCH_CONDITION_T not_full_cond_; + // Used to make threads sleep until the queue is no longer full. +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + +private: + + // = Disallow these operations. + ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &)) + ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &)) +}; + +template <ACE_SYNCH_DECL> +class ACE_Message_Queue_Iterator +{ + // = TITLE + // Iterator for the <ACE_Message_Queue>. +public: + // = Initialization method. + ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); + + // = Iteration methods. + int next (ACE_Message_Block *&entry); + // Pass back the <entry> that hasn't been seen in the queue. + // Returns 0 when all items have been seen, else 1. + + int done (void) const; + // Returns 1 when all items have been seen, else 0. + + int advance (void); + // Move forward by one element in the queue. Returns 0 when all the + // items in the set have been seen, else 1. + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + ACE_Message_Queue <ACE_SYNCH_USE> &queue_; + // Message_Queue we are iterating over. + + ACE_Message_Block *curr_; + // Keeps track of how far we've advanced... +}; + +template <ACE_SYNCH_DECL> +class ACE_Message_Queue_Reverse_Iterator +{ + // = TITLE + // Reverse Iterator for the <ACE_Message_Queue>. +public: + // = Initialization method. + ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); + + // = Iteration methods. + int next (ACE_Message_Block *&entry); + // Pass back the <entry> that hasn't been seen in the queue. + // Returns 0 when all items have been seen, else 1. + + int done (void) const; + // Returns 1 when all items have been seen, else 0. + + int advance (void); + // Move forward by one element in the queue. Returns 0 when all the + // items in the set have been seen, else 1. + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + ACE_Message_Queue <ACE_SYNCH_USE> &queue_; + // Message_Queue we are iterating over. + + ACE_Message_Block *curr_; + // Keeps track of how far we've advanced... +}; + +template <ACE_SYNCH_DECL> +class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE> +{ + // = TITLE + // A derived class which adapts the <ACE_Message_Queue> + // class in order to maintain dynamic priorities for enqueued + // <ACE_Message_Blocks> and manage the queue dynamically. + // + // = DESCRIPTION + // Priorities and queue orderings are refreshed at each enqueue + // and dequeue operation. Head and tail enqueue methods were + // made private to prevent out-of-order messages from confusing + // the pending and late portions of the queue. Messages in the + // pending portion of the queue whose dynamic priority becomes + // negative are placed into the late portion of the queue. + // Messages in the late portion of the queue whose dynamic + // priority becomes positive are dropped. These behaviors + // support a limited schedule overrun corresponding to one full + // cycle through dynamic priority values. These behaviors can + // be modified in derived classes by providing alternative + // definitions for the appropriate virtual methods. + // +public: + // = Initialization and termination methods. + ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, + size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + + virtual ~ACE_Dynamic_Message_Queue (void); + // Close down the message queue and release all resources. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +protected: + virtual int enqueue_i (ACE_Message_Block *new_item); + // Enqueue an <ACE_Message_Block *> in accordance with its priority. + // priority may be *dynamic* or *static* or a combination or *both* + // It calls the priority evaluation function passed into the Dynamic + // Message Queue constructor to update the priorities of all + // enqueued messages. + + virtual int dequeue_head_i (ACE_Message_Block *&first_item); + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. + + virtual int refresh_priorities (const ACE_Time_Value & tv); + // Refresh the priorities in the queue according to a specific + // priority assignment function. + + virtual int refresh_queue (const ACE_Time_Value & tv); + // Refresh the order of messages in the queue after refreshing their + // priorities. + + ACE_Message_Block *pending_list_tail_; + // Pointer to tail of the pending messages (those whose priority is + // and has been non-negative) in the <ACE_Message_Block list>. + + ACE_Dynamic_Message_Strategy &message_strategy_; + // Pointer to a dynamic priority evaluation function. + +private: + // = Disallow these operations. + ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) + ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) + + // = These methods can wierdness in dynamically prioritized queue. + + // Disallow their use until and unless a coherent semantics for head + // and tail enqueueing can be identified. + ACE_UNIMPLEMENTED_FUNC (virtual int + enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0)) + ACE_UNIMPLEMENTED_FUNC (virtual int + enqueue_head (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0)) + + ACE_UNIMPLEMENTED_FUNC (virtual int + peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *tv = 0)) + // Since messages are *dynamically* prioritized, it is not possible + // to guarantee that the message at the head of the queue when this + // method is called will still be at the head when the next method + // is called: disallow its use until and unless a coherent semantics + // for peeking at the head of the queue can be identified. +}; + +template <ACE_SYNCH_DECL> +class ACE_Export ACE_Message_Queue_Factory +{ + // = TITLE + // ACE_Message_Queue_Factory is a static factory class template which + // provides a separate factory method for each of the major kinds of + // priority based message dispatching: static, earliest deadline first + // (EDF), and minimum laxity first (MLF). + // + // = DESCRIPTION + // The ACE_Dynamic_Message_Queue class assumes responsibility for + // releasing the resources of the strategy with which it was + // constructed: the user of a message queue constructed by + // any of these factory methods is only responsible for + // ensuring destruction of the message queue itself. + +public: + static ACE_Message_Queue<ACE_SYNCH_USE> * + create_static_message_queue (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + // factory method for a statically prioritized ACE_Message_Queue + + static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * + create_deadline_message_queue (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0, + u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 + u_long static_bit_field_shift = 10, // 10 low order bits + u_long pending_threshold = 0x200000UL, // 2^(22-1) + u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 + u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) + // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue + + static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * + create_laxity_message_queue (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0, + u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 + u_long static_bit_field_shift = 10, // 10 low order bits + u_long pending_threshold = 0x200000UL, // 2^(22-1) + u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 + u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) + // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue +}; + +#if defined (__ACE_INLINE__) +#include "ace/Message_Queue_T.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ace/Message_Queue_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Message_Queue_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_MESSAGE_QUEUE_T_H */ diff --git a/ace/Message_Queue_T.i b/ace/Message_Queue_T.i new file mode 100644 index 00000000000..e2f7878c0a2 --- /dev/null +++ b/ace/Message_Queue_T.i @@ -0,0 +1,146 @@ +/* -*- C++ -*- */ +// $Id$ + +template <ACE_SYNCH_DECL> ACE_INLINE ACE_Notification_Strategy * +ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy"); + + return this->notification_strategy_; +} + +template <ACE_SYNCH_DECL> ACE_INLINE void +ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy"); + + this->notification_strategy_ = s; +} + +// Check if queue is empty (does not hold locks). + +template <ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i"); + return this->cur_bytes_ <= 0 && this->cur_count_ <= 0; +} + +// Check if queue is full (does not hold locks). + +template <ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i"); + return this->cur_bytes_ > this->high_water_mark_; +} + +// Check if queue is empty (holds locks). + +template <ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->is_empty_i (); +} + +// Check if queue is full (holds locks). + +template <ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->is_full_i (); +} + +template <ACE_SYNCH_DECL> ACE_INLINE size_t +ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + + return this->high_water_mark_; +} + +template <ACE_SYNCH_DECL> ACE_INLINE void +ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark"); + ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_); + + this->high_water_mark_ = hwm; +} + +template <ACE_SYNCH_DECL> ACE_INLINE size_t +ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + + return this->low_water_mark_; +} + +template <ACE_SYNCH_DECL> ACE_INLINE void +ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark"); + ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_); + + this->low_water_mark_ = lwm; +} + +// Return the current number of bytes in the queue. + +template <ACE_SYNCH_DECL> ACE_INLINE size_t +ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + + return this->cur_bytes_; +} + +// Return the current number of messages in the queue. + +template <ACE_SYNCH_DECL> ACE_INLINE size_t +ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + + return this->cur_count_; +} + +template <ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue<ACE_SYNCH_USE>::activate (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->activate_i (); +} + +template <ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue<ACE_SYNCH_USE>::deactivate (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->deactivate_i (); +} + +template <ACE_SYNCH_DECL> ACE_INLINE int +ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated"); + + return this->deactivated_; +} + +ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator) + + |