summaryrefslogtreecommitdiff
path: root/ace/Message_Queue.cpp
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1997-08-08 20:28:00 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1997-08-08 20:28:00 +0000
commit7f0b8264a946a467068d241359aefa2c6e6a1a55 (patch)
tree41eae07b87cee304331f2cf7899f4993c60ba669 /ace/Message_Queue.cpp
parent4eb7522ebb90736cf0b69bf686207534736d8c4b (diff)
downloadATCD-7f0b8264a946a467068d241359aefa2c6e6a1a55.tar.gz
*** empty log message ***
Diffstat (limited to 'ace/Message_Queue.cpp')
-rw-r--r--ace/Message_Queue.cpp262
1 files changed, 115 insertions, 147 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
index ebffb72a842..2e6edb1306b 100644
--- a/ace/Message_Queue.cpp
+++ b/ace/Message_Queue.cpp
@@ -242,6 +242,38 @@ ACE_Message_Queue<ACE_SYNCH_2>::close (void)
return res;
}
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::signal_enqueue_waiters (void)
+#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ if (this->not_full_cond_.signal () != 0)
+ return -1;
+#else
+ if (this->enqueue_waiters_ > 0)
+ {
+ --this->enqueue_waiters_;
+ this->not_full_cond_.release ();
+ }
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ return 0;
+}
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::signal_dequeue_waiters (void)
+{
+#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ // Tell any blocked threads that the queue has a new item!
+ if (this->not_empty_cond_.signal () != 0)
+ return -1;
+#else
+ if (this->dequeue_waiters_ > 0)
+ {
+ --this->dequeue_waiters_;
+ this->not_empty_cond_.release ();
+ }
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ return 0;
+}
+
// Actually put the node at the end (no locking so must be called with
// locks held).
@@ -278,18 +310,11 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail_i (ACE_Message_Block *new_item)
this->cur_bytes_ += temp->size ();
this->cur_count_++;
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- // Tell any blocked threads that the queue has a new item!
- if (this->not_empty_cond_.signal () != 0)
+
+ if (this->signal_dequeue_waiters () == -1)
return -1;
-#else
- if (this->dequeue_waiters_ > 0)
- {
- --this->dequeue_waiters_;
- this->not_empty_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
- return this->cur_count_;
+ else
+ return this->cur_count_;
}
// Actually put the node at the head (no locking)
@@ -320,18 +345,11 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head_i (ACE_Message_Block *new_item)
this->cur_bytes_ += temp->size ();
this->cur_count_++;
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- // Tell any blocked threads that the queue has a new item!
- if (this->not_empty_cond_.signal () != 0)
+
+ if (this->signal_dequeue_waiters () == -1)
return -1;
-#else
- if (this->dequeue_waiters_ > 0)
- {
- --this->dequeue_waiters_;
- this->not_empty_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
- return this->cur_count_;
+ else
+ return this->cur_count_;
}
// Actually put the node at its proper position relative to its
@@ -360,12 +378,10 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i (ACE_Message_Block *new_item)
for (temp = this->tail_;
temp != 0;
temp = temp->prev ())
- {
- if (temp->msg_priority () >= new_item->msg_priority ())
- // Break out when we've located an item that has higher
- // priority that <new_item>.
- break;
- }
+ if (temp->msg_priority () >= new_item->msg_priority ())
+ // Break out when we've located an item that has higher
+ // priority that <new_item>.
+ break;
if (temp == 0)
// Check for simple case of inserting at the head of the queue,
@@ -398,18 +414,11 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_i (ACE_Message_Block *new_item)
this->cur_bytes_ += temp->size ();
this->cur_count_++;
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- // Tell any blocked threads that the queue has a new item!
- if (this->not_empty_cond_.signal () != 0)
+
+ if (this->signal_dequeue_waiters () == -1)
return -1;
-#else
- if (this->dequeue_waiters_ > 0)
- {
- --this->dequeue_waiters_;
- this->not_empty_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
- return this->cur_count_;
+ else
+ return this->cur_count_;
}
// Actually get the first ACE_Message_Block (no locking, so must be
@@ -439,17 +448,10 @@ ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head_i (ACE_Message_Block *&first_item)
this->cur_count_--;
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- if (this->not_full_cond_.signal () != 0)
+ if (this->signal_dequeue_waiters () == -1)
return -1;
-#else
- if (this->enqueue_waiters_ > 0)
- {
- --this->enqueue_waiters_;
- this->not_full_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
- return this->cur_count_;
+ else
+ return this->cur_count_;
}
// Take a look at the first item without removing it.
@@ -488,24 +490,12 @@ ACE_Message_Queue<ACE_SYNCH_2>::peek_dequeue_head (ACE_Message_Block *&first_ite
return this->cur_count_;
}
-// Block indefinitely waiting for an item to arrive,
-// does not ignore alerts (e.g., signals).
-
-template <ACE_SYNCH_1> int
-ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
+ ACE_Time_Value *tv)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- if (this->is_full_i ())
+ while (this->is_full_i ())
{
++this->enqueue_waiters_;
// @@ Need to add sanity checks for failure...
@@ -514,7 +504,7 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item,
ace_mon.acquire ();
}
#else
- // Wait while the queue is full
+ // Wait while the queue is full.
while (this->is_full_i ())
{
@@ -531,6 +521,59 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item,
}
}
#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+}
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
+ ACE_Time_Value *tv)
+{
+#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
+ while (this->is_empty_i ())
+ {
+ ++this->dequeue_waiters_;
+ // @@ Need to add sanity checks for failure...
+ ace_mon.release ();
+ this->not_empty_cond_.acquire ();
+ ace_mon.acquire ();
+ }
+#else
+ // Wait while the queue is empty.
+
+ while (this->is_empty_i ())
+ {
+ if (this->not_empty_cond_.wait (tv) == -1)
+ {
+ if (errno == ETIME)
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+ }
+#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+}
+
+// Block indefinitely waiting for an item to arrive,
+// does not ignore alerts (e.g., signals).
+
+template <ACE_SYNCH_1> int
+ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head (ACE_Message_Block *new_item,
+ ACE_Time_Value *tv)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_2>::enqueue_head");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_)
+ {
+ errno = ESHUTDOWN;
+ return -1;
+ }
+
+ if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ return -1;
int queue_count = this->enqueue_head_i (new_item);
@@ -560,33 +603,8 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_prio (ACE_Message_Block *new_item,
return -1;
}
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- if (this->is_full_i ())
- {
- ++this->enqueue_waiters_;
- // @@ Need to add sanity checks for failure...
- ace_mon.release ();
- this->not_full_cond_.acquire ();
- ace_mon.acquire ();
- }
-#else
- // Wait while the queue is full
-
- while (this->is_full_i ())
- {
- if (this->not_full_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- return -1;
- }
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ return -1;
int queue_count = this->enqueue_i (new_item);
@@ -623,33 +641,8 @@ ACE_Message_Queue<ACE_SYNCH_2>::enqueue_tail (ACE_Message_Block *new_item,
return -1;
}
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- if (this->is_full_i ())
- {
- ++this->enqueue_waiters_;
- // @@ Need to add sanity checks for failure...
- ace_mon.release ();
- this->not_full_cond_.acquire ();
- ace_mon.acquire ();
- }
-#else
- // Wait while the queue is full
-
- while (this->is_full_i ())
- {
- if (this->not_full_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- return -1;
- }
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ return -1;
int queue_count = this->enqueue_tail_i (new_item);
@@ -679,33 +672,8 @@ ACE_Message_Queue<ACE_SYNCH_2>::dequeue_head (ACE_Message_Block *&first_item,
return -1;
}
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- if (this->is_empty_i ())
- {
- ++this->dequeue_waiters_;
- // @@ Need to add sanity checks for failure...
- ace_mon.release ();
- this->not_empty_cond_.acquire ();
- ace_mon.acquire ();
- }
-#else
- // Wait while the queue is empty.
-
- while (this->is_empty_i ())
- {
- if (this->not_empty_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- return -1;
- }
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ if (this->wait_not_empty_cond (ace_mon, tv) == -1)
+ return -1;
return this->dequeue_head_i (first_item);
}