summaryrefslogtreecommitdiff
path: root/ace/Message_Queue.cpp
diff options
context:
space:
mode:
authorlevine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-26 17:20:59 +0000
committerlevine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-26 17:20:59 +0000
commit5e5d58c08793b701904f7b7b5df3bd1a1e6d963e (patch)
tree97136f3ba676fc2f9fe5209d4fbf6c10f9c7acb8 /ace/Message_Queue.cpp
parent878c85fbf7e12f155d9029ddbc5c9826f36fdf61 (diff)
downloadATCD-5e5d58c08793b701904f7b7b5df3bd1a1e6d963e.tar.gz
On VxWorks, added ACE_Message_Queue_Vx to wrap native VxWorks message queues
Diffstat (limited to 'ace/Message_Queue.cpp')
-rw-r--r--ace/Message_Queue.cpp954
1 files changed, 119 insertions, 835 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
index 171d5174742..6f1c2e53c3c 100644
--- a/ace/Message_Queue.cpp
+++ b/ace/Message_Queue.cpp
@@ -10,116 +10,16 @@
#include "ace/Message_Queue.i"
#endif /* __ACE_INLINE__ */
-ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
+#if defined (VXWORKS)
-ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
+////////////////////////////////
+// class ACE_Message_Queue_Vx //
+////////////////////////////////
-//////////////////////////////////////
-// class ACE_Message_Queue_Iterator //
-//////////////////////////////////////
-
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
- : queue_ (q),
- curr_ (q.head_)
-{
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- if (this->curr_ != 0)
- {
- entry = this->curr_;
- return 1;
- }
- else
- return 0;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- return this->curr_ == 0;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void)
+void
+ACE_Message_Queue_Vx::dump (void) const
{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- if (this->curr_)
- this->curr_ = this->curr_->next ();
- return this->curr_ != 0;
-}
-
-template <ACE_SYNCH_DECL> void
-ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
-{
-}
-
-ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
-
-//////////////////////////////////////////////
-// class ACE_Message_Queue_Reverse_Iterator //
-//////////////////////////////////////////////
-
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
- : queue_ (q),
- curr_ (queue_.tail_)
-{
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry)
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- if (this->curr_ != 0)
- {
- entry = this->curr_;
- return 1;
- }
- else
- return 0;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- return this->curr_ == 0;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void)
-{
- ACE_Read_Guard<ACE_SYNCH_MUTEX_T> m (this->queue_.lock_);
-
- if (this->curr_)
- this->curr_ = this->curr_->prev ();
- return this->curr_ != 0;
-}
-
-template <ACE_SYNCH_DECL> void
-ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
-{
-}
-
-/////////////////////////////
-// class ACE_Message_Queue //
-/////////////////////////////
-
-template <ACE_SYNCH_DECL> void
-ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump");
+ ACE_TRACE ("ACE_Message_Queue_Vx::dump");
ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
ACE_DEBUG ((LM_DEBUG,
ASYS_TEXT ("deactivated = %d\n")
@@ -128,7 +28,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
ASYS_TEXT ("cur_bytes = %d\n")
ASYS_TEXT ("cur_count = %d\n")
ASYS_TEXT ("head_ = %u\n")
- ASYS_TEXT ("tail_ = %u\n"),
+ ASYS_TEXT ("MSG_Q_ID = %u\n"),
this->deactivated_,
this->low_water_mark_,
this->high_water_mark_,
@@ -136,86 +36,79 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
this->cur_count_,
this->head_,
this->tail_));
- ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_full_cond: \n")));
- not_full_cond_.dump ();
- ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("not_empty_cond: \n")));
- not_empty_cond_.dump ();
ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- : not_empty_cond_ (0),
- not_full_cond_ (0),
- enqueue_waiters_ (0),
- dequeue_waiters_ (0)
-#else
- : not_empty_cond_ (this->lock_),
- not_full_cond_ (this->lock_)
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ACE_Message_Queue_Vx::ACE_Message_Queue_Vx (size_t max_messages,
+ size_t max_message_length,
+ ACE_Notification_Strategy *ns)
+ : ACE_Message_Queue<ACE_NULL_SYNCH> (0, 0, ns),
+ max_messages_ (ACE_static_cast (int, max_messages)),
+ max_message_length_ (ACE_static_cast (int, max_message_length))
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue");
+ ACE_TRACE ("ACE_Message_Queue_Vx::ACE_Message_Queue_Vx");
- if (this->open (hwm, lwm, ns) == -1)
+ if (this->open (max_messages_, max_message_length_, ns) == -1)
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("open")));
}
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void)
+ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue");
- if (this->head_ != 0 && this->close () == -1)
- ACE_ERROR ((LM_ERROR, ASYS_TEXT ("close")));
+ ACE_TRACE ("ACE_Message_Queue_Vx::~ACE_Message_Queue_Vx");
}
// Don't bother locking since if someone calls this function more than
// once for the same queue, we're in bigger trouble than just
// concurrency control!
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
+int
+ACE_Message_Queue_Vx::open (size_t max_messages,
+ size_t max_message_length,
+ ACE_Notification_Strategy *ns)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open");
- this->high_water_mark_ = hwm;
- this->low_water_mark_ = lwm;
+ ACE_TRACE ("ACE_Message_Queue_Vx::open");
+ this->high_water_mark_ = 0;
+ this->low_water_mark_ = 0;
this->deactivated_ = 0;
this->cur_bytes_ = 0;
this->cur_count_ = 0;
- this->tail_ = 0;
this->head_ = 0;
this->notification_strategy_ = ns;
- return 0;
+ this->max_messages_ = ACE_static_cast (int, max_messages);
+ this->max_message_length_ = ACE_static_cast (int, max_message_length);
+
+ if (tail_)
+ {
+ // Had already created a msgQ, so delete it.
+ close ();
+ activate_i ();
+ }
+
+ return (this->tail_ =
+ ACE_reinterpret_cast (ACE_Message_Block *,
+ ::msgQCreate (max_messages_,
+ max_message_length_,
+ MSG_Q_FIFO))) == NULL ? -1 : 0;
}
// Implementation of the public deactivate() method
// (assumes locks are held).
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (void)
+int
+ACE_Message_Queue_Vx::deactivate_i (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::deactivate_i");
int current_status =
this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
- // Wakeup all waiters.
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- this->not_empty_cond_.broadcast ();
- this->not_full_cond_.broadcast ();
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
-
this->deactivated_ = 1;
return current_status;
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
+int
+ACE_Message_Queue_Vx::activate_i (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::activate_i");
int current_status =
this->deactivated_ ? WAS_INACTIVE : WAS_ACTIVE;
this->deactivated_ = 0;
@@ -224,755 +117,146 @@ ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void)
// Clean up the queue if we have not already done so!
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::close (void)
+int
+ACE_Message_Queue_Vx::close (void)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- int res = this->deactivate_i ();
-
- // Free up the remaining message on the list
-
- for (this->tail_ = 0; this->head_ != 0; )
- {
- this->cur_count_--;
-
- ACE_Message_Block *temp;
+ ACE_TRACE ("ACE_Message_Queue_Vx::close");
+ // Don't lock, because we don't have a lock. It shouldn't be
+ // necessary, anyways.
- // Decrement all the counts.
- for (temp = this->head_;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ -= temp->size ();
+ this->deactivate_i ();
- temp = this->head_;
- this->head_ = this->head_->next ();
-
- // Make sure to use <release> rather than <delete> since this is
- // reference counted.
- temp->release ();
- }
+ // Don't bother to free up the remaining message on the list,
+ // because we don't have any way to iterate over what's in the
+ // queue.
- return res;
+ return ::msgQDelete (msgq ());
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void)
+int
+ACE_Message_Queue_Vx::signal_enqueue_waiters (void)
{
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- if (this->not_full_cond_.signal () != 0)
- return -1;
-#else
- if (this->enqueue_waiters_ > 0)
- {
- --this->enqueue_waiters_;
- return this->not_full_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ // No-op.
return 0;
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void)
+int
+ACE_Message_Queue_Vx::signal_dequeue_waiters (void)
{
-#if !defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- // Tell any blocked threads that the queue has a new item!
- if (this->not_empty_cond_.signal () != 0)
- return -1;
-#else
- if (this->dequeue_waiters_ > 0)
- {
- --this->dequeue_waiters_;
- return this->not_empty_cond_.release ();
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
+ // No-op.
return 0;
}
-// Actually put the node at the end (no locking so must be called with
-// locks held).
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item)
+int
+ACE_Message_Queue_Vx::enqueue_tail_i (ACE_Message_Block *new_item)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i");
-
- if (new_item == 0)
- return -1;
-
- // List was empty, so build a new one.
- if (this->tail_ == 0)
- {
- this->head_ = new_item;
- this->tail_ = new_item;
- new_item->next (0);
- new_item->prev (0);
- }
- // Link at the end.
- else
- {
- new_item->next (0);
- this->tail_->next (new_item);
- new_item->prev (this->tail_);
- this->tail_ = new_item;
- }
-
- // Make sure to count *all* the bytes in a composite message!!!
-
- for (ACE_Message_Block *temp = new_item;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ += temp->size ();
-
- this->cur_count_++;
-
- if (this->signal_dequeue_waiters () == -1)
- return -1;
- else
- return this->cur_count_;
+ ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_tail_i");
+ // No-op. This should _never_ be called.
+ ACE_NOTSUP_RETURN (-1);
}
-// Actually put the node at the head (no locking)
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item)
+int
+ACE_Message_Queue_Vx::enqueue_head_i (ACE_Message_Block *new_item)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_head_i");
if (new_item == 0)
return -1;
- new_item->prev (0);
- new_item->next (this->head_);
-
- if (this->head_ != 0)
- this->head_->prev (new_item);
- else
- this->tail_ = new_item;
-
- this->head_ = new_item;
-
- // Make sure to count *all* the bytes in a composite message!!!
-
- for (ACE_Message_Block *temp = new_item;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ += temp->size ();
+ // Don't try to send a composite message!!!! Only the first
+ // block will be sent.
this->cur_count_++;
- if (this->signal_dequeue_waiters () == -1)
- return -1;
+ // Always use this method to actually send a message on the queue.
+ if (::msgQSend (msgq (),
+ new_item->rd_ptr (),
+ new_item->size (),
+ WAIT_FOREVER,
+ MSG_PRI_NORMAL) == OK)
+ return ::msgQNumMsgs (msgq ());
else
- return this->cur_count_;
+ return -1;
}
-// Actually put the node at its proper position relative to its
-// priority.
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+int
+ACE_Message_Queue_Vx::enqueue_i (ACE_Message_Block *new_item)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+ ACE_TRACE ("ACE_Message_Queue_Vx::enqueue_i");
if (new_item == 0)
return -1;
if (this->head_ == 0)
- // Check for simple case of an empty queue, where all we need to
- // do is insert <new_item> into the head.
+ // Should always take this branch.
return this->enqueue_head_i (new_item);
else
- {
- ACE_Message_Block *temp;
-
- // Figure out where the new item goes relative to its priority.
- // We start looking from the highest priority to the lowest
- // priority.
-
- for (temp = this->head_;
- temp != 0;
- temp = temp->next ())
- if (temp->msg_priority () < new_item->msg_priority ())
- // Break out when we've located an item that has lower
- // priority that <new_item>.
- break;
-
- if (temp == 0)
- // Check for simple case of inserting at the tail of the queue,
- // where all we need to do is insert <new_item> after the
- // current tail.
- return this->enqueue_tail_i (new_item);
- else if (temp->prev () == 0)
- // Check for simple case of inserting at the head of the
- // queue, where all we need to do is insert <new_item> before
- // the current head.
- return this->enqueue_head_i (new_item);
- else
- {
- // Insert the new message ahead of the item of
- // lesser priority. This ensures that FIFO order is
- // maintained when messages of the same priority are
- // inserted consecutively.
- new_item->next (temp);
- new_item->prev (temp->prev ());
- temp->prev ()->next (new_item);
- temp->prev (new_item);
- }
- }
-
- // Make sure to count *all* the bytes in a composite message!!!
-
- for (ACE_Message_Block *temp = new_item;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ += temp->size ();
-
- this->cur_count_++;
-
- if (this->signal_dequeue_waiters () == -1)
- return -1;
- else
- return this->cur_count_;
+ ACE_NOTSUP_RETURN (-1);
}
// Actually get the first ACE_Message_Block (no locking, so must be
// called with locks held). This method assumes that the queue has at
// least one item in it when it is called.
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
+int
+ACE_Message_Queue_Vx::dequeue_head_i (ACE_Message_Block *&first_item)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
- first_item = this->head_;
- this->head_ = this->head_->next ();
+ ACE_TRACE ("ACE_Message_Queue_Vx::dequeue_head_i");
- if (this->head_ == 0)
- this->tail_ = 0;
- else
- // The prev pointer of the first message block has to point to
- // NULL...
- this->head_->prev (0);
+ // We don't allocate a new Message_Block: the caller must provide
+ // it, and must ensure that it is big enough (without chaining).
- // Make sure to subtract off all of the bytes associated with this
- // message.
- for (ACE_Message_Block *temp = first_item;
- temp != 0;
- temp = temp->cont ())
- this->cur_bytes_ -= temp->size ();
-
- this->cur_count_--;
+ if (first_item == 0 || first_item->wr_ptr () == 0)
+ return -1;
- if (this->signal_enqueue_waiters () == -1)
+ if (::msgQReceive (msgq (),
+ first_item->wr_ptr (),
+ first_item->size (),
+ WAIT_FOREVER) == ERROR)
return -1;
else
- return this->cur_count_;
+ return ::msgQNumMsgs (msgq ());
}
// Take a look at the first item without removing it.
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+int
+ACE_Message_Queue_Vx::wait_not_full_cond (ACE_Guard<ACE_Null_Mutex> &mon,
+ ACE_Time_Value *tv)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- // Wait for at least one item to become available.
-
- if (this->wait_not_empty_cond (ace_mon, tv) == -1)
- return -1;
-
- first_item = this->head_;
- return this->cur_count_;
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv)
-{
- int result = 0;
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- while (this->is_full_i () && result != -1)
- {
- ++this->enqueue_waiters_;
- // @@ Need to add sanity checks for failure...
- mon.release ();
- if (tv == 0)
- result = this->not_full_cond_.acquire ();
- else
- result = this->not_full_cond_.acquire (*tv);
- int error = errno;
- mon.acquire ();
- errno = error;
- }
-#else
- ACE_UNUSED_ARG (mon);
-
- // Wait while the queue is full.
-
- while (this->is_full_i ())
- {
- if (this->not_full_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- result = -1;
- break;
- }
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- result = -1;
- break;
- }
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
- return result;
+ // Always return here, and let the VxWorks message queue handle blocking.
+ return 0;
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv)
+int
+ACE_Message_Queue_Vx::wait_not_empty_cond (ACE_Guard<ACE_Null_Mutex> &mon,
+ ACE_Time_Value *tv)
{
- int result = 0;
-#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
- while (this->is_empty_i () && result != -1)
- {
- ++this->dequeue_waiters_;
- // @@ Need to add sanity checks for failure...
- mon.release ();
- if (tv == 0)
- result = this->not_empty_cond_.acquire ();
- else
- {
- result = this->not_empty_cond_.acquire (*tv);
- if (result == -1 && errno == ETIME)
- errno = EWOULDBLOCK;
- }
- int error = errno;
- mon.acquire ();
- errno = error;
- }
-#else
- ACE_UNUSED_ARG (mon);
-
- // Wait while the queue is empty.
-
- while (this->is_empty_i ())
- {
- if (this->not_empty_cond_.wait (tv) == -1)
- {
- if (errno == ETIME)
- errno = EWOULDBLOCK;
- result = -1;
- break;
- }
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- result = -1;
- break;
- }
- }
-#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */
- return result;
+ // Always return here, and let the VxWorks message queue handle blocking.
+ return 0;
}
-// Block indefinitely waiting for an item to arrive, does not ignore
-// alerts (e.g., signals).
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+#if ! defined (ACE_REQUIRES_FUNC_DEFINITIONS)
+int
+ACE_Message_Queue_Vx::enqueue_tail (ACE_Message_Block *mb, ACE_Time_Value *tv)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
- return -1;
-
- int queue_count = this->enqueue_head_i (new_item);
-
- if (queue_count == -1)
- return -1;
- else
- {
- this->notify ();
- return queue_count;
- }
+ ACE_NOTSUP_RETURN (-1);
}
-// Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
-// accordance with its <msg_priority> (0 is lowest priority). Returns
-// -1 on failure, else the number of items still on the queue.
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+int
+ACE_Message_Queue_Vx::enqueue_head (ACE_Message_Block *mb, ACE_Time_Value *tv)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
- return -1;
-
- int queue_count = this->enqueue_i (new_item);
-
- if (queue_count == -1)
- return -1;
- else
- {
- this->notify ();
- return queue_count;
- }
+ ACE_NOTSUP_RETURN (-1);
}
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
+int
+ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&,
ACE_Time_Value *tv)
{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
- return this->enqueue_prio (new_item, tv);
-}
-
-// Block indefinitely waiting for an item to arrive,
-// does not ignore alerts (e.g., signals).
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
- return -1;
-
- int queue_count = this->enqueue_tail_i (new_item);
-
- if (queue_count == -1)
- return -1;
- else
- {
- this->notify ();
- return queue_count;
- }
-}
-
-// Remove an item from the front of the queue. If TV == 0 block
-// indefinitely (or until an alert occurs). Otherwise, block for upto
-// the amount of time specified by TV.
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
-
- if (this->deactivated_)
- {
- errno = ESHUTDOWN;
- return -1;
- }
-
- if (this->wait_not_empty_cond (ace_mon, tv) == -1)
- return -1;
-
- return this->dequeue_head_i (first_item);
-}
-
-template <ACE_SYNCH_DECL> int
-ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
-{
- ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify");
-
- // By default, don't do anything.
- if (this->notification_strategy_ == 0)
- return 0;
- else
- return this->notification_strategy_->notify ();
-}
-
-
-/////////////////////////////////////
-// class ACE_Dynamic_Message_Queue //
-/////////////////////////////////////
-
- // = Initialization and termination methods.
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (
- ACE_Dynamic_Message_Strategy & message_strategy,
- size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
- : ACE_Message_Queue (hwm, lwm, ns)
- , message_strategy_ (message_strategy)
-{
- // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the
- // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor
-}
-
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
-{
- delete &message_strategy_;
-}
-// dtor: free message strategy and let base class dtor do the rest
-
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
-{
- ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
-
- int result = 0;
-
- // refresh dynamic priority of the new message
- result = (*priority_eval_func_ptr_) (*new_item, tv);
-
- // get the current time
- ACE_Time_Value current_time = ACE_OS::gettimeofday ();
-
- // refresh dynamic priorities of messages in the queue
- this->refresh_priorities (current_time);
-
- // reorganize the queue according to the new priorities
- this->refresh_queue (current_time);
-
- // if there is only one message in the pending list,
- // the pending list will be empty after a *successful*
- // dequeue operation
- int empty_pending = (head_ == pending_list_tail_) ? 1 : 0;
-
- // invoke the base class method
- result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item);
-
- // null out the pending list tail pointer if
- // the pending list is now empty
- if ((empty_pending) && (result > 0))
- {
- pending_list_tail_ = 0;
- }
-
- return result;
-}
- // Enqueue an <ACE_Message_Block *> in accordance with its priority.
- // priority may be *dynamic* or *static* or a combination or *both*
- // It calls the priority evaluation function passed into the Dynamic
- // Message Queue constructor to update the priorities of all enqueued
- // messages.
-
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
-{
- ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
-
- int result = 0;
-
- // get the current time
- ACE_Time_Value current_time = ACE_OS::gettimeofday ();
-
- // refresh dynamic priorities of messages in the queue
- result = this->refresh_priorities (current_time);
- if (result < 0)
- {
- return result;
- }
-
- // reorganize the queue according to the new priorities
- result = this->refresh_queue (current_time);
- if (result < 0)
- {
- return result;
- }
-
- // if there is only one message in the pending list,
- // the pending list will be empty after a *successful*
- // dequeue operation
- int empty_pending = (head_ == pending_list_tail_) ? 1 : 0;
-
- // invoke the base class method
- result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item);
-
- // null out the pending list tail pointer if
- // the pending list is now empty
- if ((empty_pending) && (result > 0))
- {
- pending_list_tail_ = 0;
- }
-
- return result;
-}
- // Dequeue and return the <ACE_Message_Block *> at the head of the
- // queue.
-
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv)
-{
- int result = 0;
-
- // apply the priority update function to all enqueued
- // messages, starting at the head of the queue
- ACE_Message_Block *temp = head_;
- while (temp)
- {
- result = (*priority_eval_func_ptr_) (*temp, tv);
- if (result < 0)
- {
- break;
- }
-
- temp = temp->next ();
- }
-
- return result;
-}
- // refresh the priorities in the queue according
- // to a specific priority assignment function
-
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv)
-{
- // first, drop any messages from the queue and delete them:
- // reference counting at the data block level means that the
- // underlying data block will not be deleted if another
- // message block is still pointing to it.
- ACE_Message_Block *temp = (pending_list_tail_)
- ? pending_list_tail_->next ()
- : head_;
-
- while (temp)
- {
- // messages that have overflowed the given time bounds must be removed
- if (message_strategy_.is_beyond_late (*temp, tv))
- {
- // find the end of the chain of overflowed messages
- ACE_Message_Block *remove_tail = temp;
- while ((remove_tail) && (remove_tail->next ()) &&
- message_strategy_.is_beyond_late (*(remove_tail->next ()), tv))
- {
- remove_tail = remove_tail->next ();
- }
-
- temp = remove_tail->next ();
- if (remove_temp->next ())
- {
- remove_temp->next ()->prev (0);
- }
- else if (remove_temp->prev ())
- {
- remove_temp->prev ()->next (0);
- }
- else
- {
- head_ = 0;
- tail_ = 0;
- }
- remove_temp->prev (0);
- remove_temp->next (0);
-
- temp = remove_temp->next ();
-
- }
- else
- {
- temp = temp->next ();
- }
- }
-}
- // refresh the order of messages in the queue
- // after refreshing their priorities
-
-/////////////////////////////////////
-// class ACE_Message_Queue_Factory //
-/////////////////////////////////////
-
-template <ACE_SYNCH_DECL>
-ACE_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns)
-{
- return new ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns);
-}
- // factory method for a statically prioritized ACE_Message_Queue
-
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns,
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-{
- ACE_Deadline_Message_Strategy *adms;
-
- ACE_NEW_RETURN (adms,
- ACE_Deadline_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset),
- 0);
-
- return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns);
-}
- // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
-
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns,
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-{
- ACE_Laxity_Message_Strategy *alms;
-
- ACE_NEW_RETURN (alms,
- ACE_Laxity_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset),
- 0);
-
-
- return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns);
+ ACE_NOTSUP_RETURN (-1);
}
- // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
+#endif /* ! ACE_REQUIRES_FUNC_DEFINITIONS */
+#endif /* VXWORKS */
#endif /* ACE_MESSAGE_QUEUE_C */