diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-08-08 20:28:00 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 1997-08-08 20:28:00 +0000 |
commit | 7f0b8264a946a467068d241359aefa2c6e6a1a55 (patch) | |
tree | 41eae07b87cee304331f2cf7899f4993c60ba669 /ace/Message_Queue.cpp | |
parent | 4eb7522ebb90736cf0b69bf686207534736d8c4b (diff) | |
download | ATCD-7f0b8264a946a467068d241359aefa2c6e6a1a55.tar.gz |
*** empty log message ***
Diffstat (limited to 'ace/Message_Queue.cpp')
-rw-r--r-- | ace/Message_Queue.cpp | 262 |
1 files changed, 115 insertions, 147 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp index ebffb72a842..2e6edb1306b 100644 --- a/ace/Message_Queue.cpp +++ b/ace/Message_Queue.cpp @@ -242,6 +242,38 @@ ACE_Message_Queue<ACE_SYNCH_2>::close (void) return res; } +template <ACE_SYNCH_1> int +ACE_Message_Queue<ACE_SYNCH_2>::signal_enqueue_waiters (void) +#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + if (this->not_full_cond_.signal () != 0) + return -1; +#else + if (this->enqueue_waiters_ > 0) + { + --this->enqueue_waiters_; + this->not_full_cond_.release (); + } +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + return 0; +} + +template <ACE_SYNCH_1> int +ACE_Message_Queue<ACE_SYNCH_2>::signal_dequeue_waiters (void) +{ +#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + // Tell any blocked threads that the queue has a new item! + if (this->not_empty_cond_.signal () != 0) + return -1; +#else + if (this->dequeue_waiters_ > 0) + { + --this->dequeue_waiters_; + this->not_empty_cond_.release (); + } +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + return 0; +} + // Actually put the node at the end (no locking so must be called with // locks held). @@ -278,18 +310,11 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail_i (ACE_Message_Block *new_item) this->cur_bytes_ += temp->size (); this->cur_count_++; -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - // Tell any blocked threads that the queue has a new item! - if (this->not_empty_cond_.signal () != 0) + + if (this->signal_dequeue_waiters () == -1) return -1; -#else - if (this->dequeue_waiters_ > 0) - { - --this->dequeue_waiters_; - this->not_empty_cond_.release (); - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ - return this->cur_count_; + else + return this->cur_count_; } // Actually put the node at the head (no locking) @@ -320,18 +345,11 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head_i (ACE_Message_Block *new_item) this->cur_bytes_ += temp->size (); this->cur_count_++; -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - // Tell any blocked threads that the queue has a new item! - if (this->not_empty_cond_.signal () != 0) + + if (this->signal_dequeue_waiters () == -1) return -1; -#else - if (this->dequeue_waiters_ > 0) - { - --this->dequeue_waiters_; - this->not_empty_cond_.release (); - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ - return this->cur_count_; + else + return this->cur_count_; } // Actually put the node at its proper position relative to its @@ -360,12 +378,10 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i (ACE_Message_Block *new_item) for (temp = this->tail_; temp != 0; temp = temp->prev ()) - { - if (temp->msg_priority () >= new_item->msg_priority ()) - // Break out when we've located an item that has higher - // priority that <new_item>. - break; - } + if (temp->msg_priority () >= new_item->msg_priority ()) + // Break out when we've located an item that has higher + // priority that <new_item>. + break; if (temp == 0) // Check for simple case of inserting at the head of the queue, @@ -398,18 +414,11 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i (ACE_Message_Block *new_item) this->cur_bytes_ += temp->size (); this->cur_count_++; -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - // Tell any blocked threads that the queue has a new item! - if (this->not_empty_cond_.signal () != 0) + + if (this->signal_dequeue_waiters () == -1) return -1; -#else - if (this->dequeue_waiters_ > 0) - { - --this->dequeue_waiters_; - this->not_empty_cond_.release (); - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ - return this->cur_count_; + else + return this->cur_count_; } // Actually get the first ACE_Message_Block (no locking, so must be @@ -439,17 +448,10 @@ ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head_i (ACE_Message_Block *&first_item) this->cur_count_--; -#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - if (this->not_full_cond_.signal () != 0) + if (this->signal_dequeue_waiters () == -1) return -1; -#else - if (this->enqueue_waiters_ > 0) - { - --this->enqueue_waiters_; - this->not_full_cond_.release (); - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ - return this->cur_count_; + else + return this->cur_count_; } // Take a look at the first item without removing it. @@ -488,24 +490,12 @@ ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head (ACE_Message_Block *&first_ite return this->cur_count_; } -// Block indefinitely waiting for an item to arrive, -// does not ignore alerts (e.g., signals). - -template <ACE_SYNCH_1> int -ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item, - ACE_Time_Value *tv) +template <ACE_SYNCH_1> int +ACE_Message_Queue<ACE_SYNCH_2>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + ACE_Time_Value *tv) { - ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); - - if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } - #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - if (this->is_full_i ()) + while (this->is_full_i ()) { ++this->enqueue_waiters_; // @@ Need to add sanity checks for failure... @@ -514,7 +504,7 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item, ace_mon.acquire (); } #else - // Wait while the queue is full + // Wait while the queue is full. while (this->is_full_i ()) { @@ -531,6 +521,59 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item, } } #endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ +} + +template <ACE_SYNCH_1> int +ACE_Message_Queue<ACE_SYNCH_2>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + ACE_Time_Value *tv) +{ +#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + while (this->is_empty_i ()) + { + ++this->dequeue_waiters_; + // @@ Need to add sanity checks for failure... + ace_mon.release (); + this->not_empty_cond_.acquire (); + ace_mon.acquire (); + } +#else + // Wait while the queue is empty. + + while (this->is_empty_i ()) + { + if (this->not_empty_cond_.wait (tv) == -1) + { + if (errno == ETIME) + errno = EWOULDBLOCK; + return -1; + } + if (this->deactivated_) + { + errno = ESHUTDOWN; + return -1; + } + } +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ +} + +// Block indefinitely waiting for an item to arrive, +// does not ignore alerts (e.g., signals). + +template <ACE_SYNCH_1> int +ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item, + ACE_Time_Value *tv) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->deactivated_) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_full_cond (ace_mon, tv) == -1) + return -1; int queue_count = this->enqueue_head_i (new_item); @@ -560,33 +603,8 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_prio (ACE_Message_Block *new_item, return -1; } -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - if (this->is_full_i ()) - { - ++this->enqueue_waiters_; - // @@ Need to add sanity checks for failure... - ace_mon.release (); - this->not_full_cond_.acquire (); - ace_mon.acquire (); - } -#else - // Wait while the queue is full - - while (this->is_full_i ()) - { - if (this->not_full_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - return -1; - } - if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + if (this->wait_not_full_cond (ace_mon, tv) == -1) + return -1; int queue_count = this->enqueue_i (new_item); @@ -623,33 +641,8 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail (ACE_Message_Block *new_item, return -1; } -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - if (this->is_full_i ()) - { - ++this->enqueue_waiters_; - // @@ Need to add sanity checks for failure... - ace_mon.release (); - this->not_full_cond_.acquire (); - ace_mon.acquire (); - } -#else - // Wait while the queue is full - - while (this->is_full_i ()) - { - if (this->not_full_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - return -1; - } - if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + if (this->wait_not_full_cond (ace_mon, tv) == -1) + return -1; int queue_count = this->enqueue_tail_i (new_item); @@ -679,33 +672,8 @@ ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head (ACE_Message_Block *&first_item, return -1; } -#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) - if (this->is_empty_i ()) - { - ++this->dequeue_waiters_; - // @@ Need to add sanity checks for failure... - ace_mon.release (); - this->not_empty_cond_.acquire (); - ace_mon.acquire (); - } -#else - // Wait while the queue is empty. - - while (this->is_empty_i ()) - { - if (this->not_empty_cond_.wait (tv) == -1) - { - if (errno == ETIME) - errno = EWOULDBLOCK; - return -1; - } - if (this->deactivated_) - { - errno = ESHUTDOWN; - return -1; - } - } -#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + if (this->wait_not_empty_cond (ace_mon, tv) == -1) + return -1; return this->dequeue_head_i (first_item); } |