diff options
author | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:30 +0000 |
---|---|---|
committer | William R. Otte <wotte@dre.vanderbilt.edu> | 2006-07-24 15:50:30 +0000 |
commit | c44379cc7d9c7aa113989237ab0f56db12aa5219 (patch) | |
tree | 66a84b20d47f2269d8bdc6e0323f338763424d3a /ACE/ace/Message_Queue_T.cpp | |
parent | 3aff90f4a822fcf5d902bbfbcc9fa931d6191a8c (diff) | |
download | ATCD-c44379cc7d9c7aa113989237ab0f56db12aa5219.tar.gz |
Repo restructuring
Diffstat (limited to 'ACE/ace/Message_Queue_T.cpp')
-rw-r--r-- | ACE/ace/Message_Queue_T.cpp | 2813 |
1 files changed, 2813 insertions, 0 deletions
diff --git a/ACE/ace/Message_Queue_T.cpp b/ACE/ace/Message_Queue_T.cpp new file mode 100644 index 00000000000..729f22806eb --- /dev/null +++ b/ACE/ace/Message_Queue_T.cpp @@ -0,0 +1,2813 @@ +// $Id$ + +#ifndef ACE_MESSAGE_QUEUE_T_CPP +#define ACE_MESSAGE_QUEUE_T_CPP + +// #include Message_Queue.h instead of Message_Queue_T.h to avoid +// circular include problems. +#include "ace/Message_Queue.h" +#include "ace/Log_Msg.h" +#include "ace/OS_NS_sys_time.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Notification_Strategy.h" +#include "ace/Truncate.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue) +ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue) +ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex) +ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Ex_N) + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump (void) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dump"); + + this->queue_.dump (); +#endif /* ACE_HAS_DUMP */ +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (size_t new_value) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes"); + + this->queue_.message_bytes (new_value); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (size_t new_value) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length"); + + this->queue_.message_length (new_value); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex"); + + if (this->queue_.open (hwm, lwm, ns) == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("ACE_Message_Queue_Ex"))); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex"); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::open"); + + return this->queue_.open (hwm, lwm, ns); +} + +// Clean up the queue if we have not already done so! + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::close"); + + return this->queue_.close (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush"); + + return this->queue_.flush (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::flush_i"); + + return this->queue_.flush_i (); +} + +// Take a look at the first item without removing it. + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::peek_dequeue_head"); + + ACE_Message_Block *mb = 0; + + int const cur_count = this->queue_.peek_dequeue_head (mb, timeout); + + if (cur_count != -1) + first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ()); + + return cur_count; +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head"); + + ACE_Message_Block *mb = 0; + + ACE_NEW_RETURN (mb, + ACE_Message_Block ((char *) new_item, + sizeof (*new_item), + ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY), + -1); + + int const result = this->queue_.enqueue_head (mb, timeout); + if (result == -1) + // Zap the message. + mb->release (); + return result; +} + +// Enqueue an <ACE_MESSAGE_TYPE *> into the <Message_Queue> in +// accordance with its <msg_priority> (0 is lowest priority). Returns +// -1 on failure, else the number of items still on the queue. + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue"); + + return this->enqueue_prio (new_item, timeout); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_prio"); + + ACE_Message_Block *mb = 0; + + ACE_NEW_RETURN (mb, + ACE_Message_Block ((char *) new_item, + sizeof (*new_item), + ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY), + -1); + + int const result = this->queue_.enqueue_prio (mb, timeout); + if (result == -1) + // Zap the message. + mb->release (); + + return result; +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_deadline"); + + ACE_Message_Block *mb = 0; + + ACE_NEW_RETURN (mb, + ACE_Message_Block ((char *) new_item, + sizeof (*new_item), + ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY ), + -1); + + int const result = this->queue_.enqueue_deadline (mb, timeout); + if (result == -1) + // Zap the message. + mb->release (); + + return result; +} + +// Block indefinitely waiting for an item to arrive, +// does not ignore alerts (e.g., signals). + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail"); + + ACE_Message_Block *mb = 0; + + ACE_NEW_RETURN (mb, + ACE_Message_Block ((char *) new_item, + sizeof (*new_item), + ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY), + -1); + + int const result = this->queue_.enqueue_tail (mb, timeout); + if (result == -1) + // Zap the message. + mb->release (); + return result; +} + +// Remove an item from the front of the queue. If timeout == 0 block +// indefinitely (or until an alert occurs). Otherwise, block for upto +// the amount of time specified by timeout. + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head (ACE_MESSAGE_TYPE *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_head"); + + ACE_Message_Block *mb = 0; + + int const cur_count = this->queue_.dequeue_head (mb, timeout); + + // Dequeue the message. + if (cur_count != -1) + { + first_item = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ()); + // Delete the message block. + mb->release (); + } + + return cur_count; +} + +// Remove the item with the lowest priority from the queue. If timeout == 0 +// block indefinitely (or until an alert occurs). Otherwise, block for upto +// the amount of time specified by timeout. + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio (ACE_MESSAGE_TYPE *&dequeued, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_prio"); + + ACE_Message_Block *mb = 0; + + int const cur_count = this->queue_.dequeue_prio (mb, timeout); + + // Dequeue the message. + if (cur_count != -1) + { + dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ()); + // Delete the message block. + mb->release (); + } + + return cur_count; +} + +// Remove an item from the end of the queue. If timeout == 0 block +// indefinitely (or until an alert occurs). Otherwise, block for upto +// the amount of time specified by timeout. + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail (ACE_MESSAGE_TYPE *&dequeued, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_tail"); + + ACE_Message_Block *mb = 0; + + int const cur_count = this->queue_.dequeue_tail (mb, timeout); + + // Dequeue the message. + if (cur_count != -1) + { + dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ()); + // Delete the message block. + mb->release (); + } + + return cur_count; +} + +// Remove an item with the lowest deadline time. If timeout == 0 block +// indefinitely (or until an alert occurs). Otherwise, block for upto +// the amount of time specified by timeout. + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline (ACE_MESSAGE_TYPE *&dequeued, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue_deadline"); + + ACE_Message_Block *mb = 0; + + int const cur_count = this->queue_.dequeue_deadline (mb, timeout); + + // Dequeue the message. + if (cur_count != -1) + { + dequeued = reinterpret_cast<ACE_MESSAGE_TYPE *> (mb->base ()); + // Delete the message block. + mb->release (); + } + + return cur_count; +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notify"); + + return this->queue_.notify (); +} + + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> +ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N + (size_t high_water_mark, + size_t low_water_mark, + ACE_Notification_Strategy *ns): + ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE> (high_water_mark, + low_water_mark, + ns) +{ + ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::ACE_Message_Queue_Ex_N"); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> +ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::~ACE_Message_Queue_Ex_N"); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head + (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_head"); + + // Create a chained ACE_Message_Blocks wrappers around the 'chained' + // ACE_MESSAGE_TYPES. + ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item); + if (0 == mb) + { + return -1; + } + + int result = this->queue_.enqueue_head (mb, timeout); + if (-1 == result) + { + // Zap the messages. + mb->release (); + } + return result; +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail + (ACE_MESSAGE_TYPE *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::enqueue_tail"); + + // Create a chained ACE_Message_Blocks wrappers around the 'chained' + // ACE_MESSAGE_TYPES. + ACE_Message_Block *mb = this->wrap_with_mbs_i (new_item); + if (0 == mb) + { + return -1; + } + + int result = this->queue_.enqueue_tail (mb, timeout); + if (-1 == result) + { + // Zap the message. + mb->release (); + } + return result; +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Message_Block * +ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i + (ACE_MESSAGE_TYPE *new_item) +{ + ACE_TRACE ("ACE_Message_Queue_Ex_N<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::wrap_with_mbs_i"); + + // We need to keep a reference to the head of the chain + ACE_Message_Block *mb_head = 0; + + ACE_NEW_RETURN (mb_head, + ACE_Message_Block ((char *) new_item, + sizeof (*new_item), + ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY), + 0); + + // mb_tail will point to the last ACE_Message_Block + ACE_Message_Block *mb_tail = mb_head; + + // Run through rest of the messages and wrap them + for (ACE_MESSAGE_TYPE *pobj = new_item->next (); pobj; pobj = pobj->next ()) + { + ACE_Message_Block *mb_temp = 0; + ACE_NEW_NORETURN (mb_temp, + ACE_Message_Block ((char *) pobj, + sizeof (*pobj), + ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::DEFAULT_PRIORITY)); + if (mb_temp == 0) + { + mb_head->release (); + mb_head = 0; + break; + } + + mb_tail->next (mb_temp); + mb_tail = mb_temp; + } + + return mb_head; +} + +ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator) + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue (ACE_MESSAGE_TYPE *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::dequeue"); + + return this->dequeue_head (first_item, timeout); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_Notification_Strategy * +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy"); + + return this->queue_.notification_strategy (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::notification_strategy"); + + this->queue_.notification_strategy (s); +} + +// Check if queue is empty (holds locks). + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_empty"); + + return this->queue_.is_empty (); +} + +// Check if queue is full (holds locks). + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::is_full"); + + return this->queue_.is_full (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark"); + + return this->queue_.high_water_mark (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark (size_t hwm) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::high_water_mark"); + + this->queue_.high_water_mark (hwm); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark"); + + return this->queue_.low_water_mark (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> void +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark (size_t lwm) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::low_water_mark"); + + this->queue_.low_water_mark (lwm); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_bytes"); + + return this->queue_.message_bytes (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_length"); + + return this->queue_.message_length (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> size_t +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::message_count"); + + return this->queue_.message_count (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivate"); + + return this->queue_.deactivate (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::activate"); + + return this->queue_.activate (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::pulse"); + + return this->queue_.pulse (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::deactivated"); + + return this->queue_.deactivated (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> int +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state (void) +{ + ACE_TRACE ("ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::state"); + + return this->queue_.state (); +} + +template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T & +ACE_Message_Queue_Ex<ACE_MESSAGE_TYPE, ACE_SYNCH_USE>::lock (void) +{ + return this->queue_.lock (); +} + +template <ACE_SYNCH_DECL> +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) + : queue_ (q), + curr_ (q.head_) +{ +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry) +{ + ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + + if (this->curr_ != 0) + { + entry = this->curr_; + return 1; + } + + return 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::done (void) const +{ + ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + + return this->curr_ == 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::advance (void) +{ + ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + + if (this->curr_) + this->curr_ = this->curr_->next (); + return this->curr_ != 0; +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const +{ +#if defined (ACE_HAS_DUMP) +#endif /* ACE_HAS_DUMP */ +} + +ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator) + +template <ACE_SYNCH_DECL> +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q) + : queue_ (q), + curr_ (queue_.tail_) +{ +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::next (ACE_Message_Block *&entry) +{ + ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + + if (this->curr_ != 0) + { + entry = this->curr_; + return 1; + } + + return 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::done (void) const +{ + ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + + return this->curr_ == 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::advance (void) +{ + ACE_READ_GUARD_RETURN (ACE_SYNCH_MUTEX_T, m, this->queue_.lock_, -1) + + if (this->curr_) + this->curr_ = this->curr_->prev (); + return this->curr_ != 0; +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const +{ +#if defined (ACE_HAS_DUMP) +#endif /* ACE_HAS_DUMP */ +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue"); + return this->dequeue_head (first_item, timeout); +} + +template <ACE_SYNCH_DECL> ACE_Notification_Strategy * +ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy"); + + return this->notification_strategy_; +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (ACE_Notification_Strategy *s) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy"); + + this->notification_strategy_ = s; +} + +// Check if queue is empty (does not hold locks). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty_i"); + return this->tail_ == 0; +} + +// Check if queue is full (does not hold locks). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full_i"); + return this->cur_bytes_ >= this->high_water_mark_; +} + +// Check if queue is empty (holds locks). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::is_empty (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_empty"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->is_empty_i (); +} + +// Check if queue is full (holds locks). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::is_full (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::is_full"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->is_full_i (); +} + +template <ACE_SYNCH_DECL> size_t +ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + + return this->high_water_mark_; +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark (size_t hwm) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::high_water_mark"); + ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_); + + this->high_water_mark_ = hwm; +} + +template <ACE_SYNCH_DECL> size_t +ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + + return this->low_water_mark_; +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark (size_t lwm) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::low_water_mark"); + ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_); + + this->low_water_mark_ = lwm; +} + +template <ACE_SYNCH_DECL> size_t +ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + + return this->cur_bytes_; +} + +template <ACE_SYNCH_DECL> size_t +ACE_Message_Queue<ACE_SYNCH_USE>::message_length (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + + return this->cur_length_; +} + +template <ACE_SYNCH_DECL> size_t +ACE_Message_Queue<ACE_SYNCH_USE>::message_count (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_count"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, 0); + + return this->cur_count_; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::deactivate () +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->deactivate_i (0); // Not a pulse +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::activate (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->activate_i (); +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::pulse () +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::pulse"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + return this->deactivate_i (1); // Just a pulse +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivated"); + + return this->state_ == ACE_Message_Queue_Base::DEACTIVATED; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::state (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::state"); + + return this->state_; +} + +template <ACE_SYNCH_DECL> ACE_SYNCH_MUTEX_T & +ACE_Message_Queue<ACE_SYNCH_USE>::lock (void) +{ + return this->lock_; +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + switch (this->state_) + { + case ACE_Message_Queue_Base::ACTIVATED: + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("state = ACTIVATED\n"))); + break; + case ACE_Message_Queue_Base::DEACTIVATED: + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("state = DEACTIVATED\n"))); + break; + case ACE_Message_Queue_Base::PULSED: + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("state = PULSED\n"))); + break; + } + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("low_water_mark = %d\n") + ACE_LIB_TEXT ("high_water_mark = %d\n") + ACE_LIB_TEXT ("cur_bytes = %d\n") + ACE_LIB_TEXT ("cur_length = %d\n") + ACE_LIB_TEXT ("cur_count = %d\n") + ACE_LIB_TEXT ("head_ = %u\n") + ACE_LIB_TEXT ("tail_ = %u\n"), + this->low_water_mark_, + this->high_water_mark_, + this->cur_bytes_, + this->cur_length_, + this->cur_count_, + this->head_, + this->tail_)); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_full_cond: \n"))); + not_full_cond_.dump (); + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("not_empty_cond: \n"))); + not_empty_cond_.dump (); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +#endif /* ACE_HAS_DUMP */ +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes (size_t new_value) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_bytes"); + ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_); + + this->cur_bytes_ = new_value; +} + +template <ACE_SYNCH_DECL> void +ACE_Message_Queue<ACE_SYNCH_USE>::message_length (size_t new_value) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::message_length"); + ACE_GUARD (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_); + + this->cur_length_ = new_value; +} + +template <ACE_SYNCH_DECL> +ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) + : not_empty_cond_ (this->lock_), + not_full_cond_ (this->lock_) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::ACE_Message_Queue"); + + if (this->open (hwm, lwm, ns) == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("open"))); +} + +template <ACE_SYNCH_DECL> +ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::~ACE_Message_Queue"); + if (this->head_ != 0 && this->close () == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("close"))); +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::flush_i (void) +{ + int number_flushed = 0; + + // Remove all the <ACE_Message_Block>s in the <ACE_Message_Queue> + // and <release> their memory. + for (this->tail_ = 0; this->head_ != 0; ) + { + ++number_flushed; + + size_t mb_bytes = 0; + size_t mb_length = 0; + this->head_->total_size_and_length (mb_bytes, + mb_length); + // Subtract off all of the bytes associated with this message. + this->cur_bytes_ -= mb_bytes; + this->cur_length_ -= mb_length; + --this->cur_count_; + + ACE_Message_Block *temp = this->head_; + this->head_ = this->head_->next (); + + // Make sure to use <release> rather than <delete> since this is + // reference counted. + temp->release (); + } + + return number_flushed; +} + +// Don't bother locking since if someone calls this function more than +// once for the same queue, we're in bigger trouble than just +// concurrency control! + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::open (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::open"); + this->high_water_mark_ = hwm; + this->low_water_mark_ = lwm; + this->state_ = ACE_Message_Queue_Base::ACTIVATED; + this->cur_bytes_ = 0; + this->cur_length_ = 0; + this->cur_count_ = 0; + this->tail_ = 0; + this->head_ = 0; + this->notification_strategy_ = ns; + return 0; +} + +// Implementation of the public deactivate() method +// (assumes locks are held). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i (int pulse) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::deactivate_i"); + int const previous_state = this->state_; + + if (previous_state != ACE_Message_Queue_Base::DEACTIVATED) + { + // Wakeup all waiters. + this->not_empty_cond_.broadcast (); + this->not_full_cond_.broadcast (); + + if (pulse) + this->state_ = ACE_Message_Queue_Base::PULSED; + else + this->state_ = ACE_Message_Queue_Base::DEACTIVATED; + } + return previous_state; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::activate_i (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::activate_i"); + int const previous_state = this->state_; + this->state_ = ACE_Message_Queue_Base::ACTIVATED; + return previous_state; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::flush (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::flush"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + // Free up the remaining messages on the queue. + return this->flush_i (); +} + +// Clean up the queue if we have not already done so! + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::close (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::close"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + int const result = this->deactivate_i (); + + // Free up the remaining messages on the queue. + this->flush_i (); + + return result; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::signal_enqueue_waiters (void) +{ + if (this->not_full_cond_.signal () != 0) + return -1; + return 0; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::signal_dequeue_waiters (void) +{ + // Tell any blocked threads that the queue has a new item! + if (this->not_empty_cond_.signal () != 0) + return -1; + return 0; +} + +// Actually put the node at the end (no locking so must be called with +// locks held). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail_i"); + + if (new_item == 0) + return -1; + + // Update the queued size and length, taking into account any chained + // blocks (total_size_and_length() counts all continuation blocks). + // Keep count of how many blocks we're adding and, if there is a chain of + // blocks, find the end in seq_tail and be sure they're properly + // back-connected along the way. + ACE_Message_Block *seq_tail = new_item; + ++this->cur_count_; + new_item->total_size_and_length (this->cur_bytes_, + this->cur_length_); + while (seq_tail->next () != 0) + { + seq_tail->next ()->prev (seq_tail); + seq_tail = seq_tail->next (); + ++this->cur_count_; + seq_tail->total_size_and_length (this->cur_bytes_, + this->cur_length_); + } + + // List was empty, so build a new one. + if (this->tail_ == 0) + { + this->head_ = new_item; + this->tail_ = seq_tail; + // seq_tail->next (0); This is a condition of the while() loop above. + new_item->prev (0); + } + // Link at the end. + else + { + // seq_tail->next (0); This is a condition of the while() loop above. + this->tail_->next (new_item); + new_item->prev (this->tail_); + this->tail_ = seq_tail; + } + + if (this->signal_dequeue_waiters () == -1) + return -1; + else + return ACE_Utils::Truncate (this->cur_count_); +} + +// Actually put the node(s) at the head (no locking) + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head_i"); + + if (new_item == 0) + return -1; + + // Update the queued size and length, taking into account any chained + // blocks (total_size_and_length() counts all continuation blocks). + // Keep count of how many blocks we're adding and, if there is a chain of + // blocks, find the end in seq_tail and be sure they're properly + // back-connected along the way. + ACE_Message_Block *seq_tail = new_item; + ++this->cur_count_; + new_item->total_size_and_length (this->cur_bytes_, + this->cur_length_); + while (seq_tail->next () != 0) + { + seq_tail->next ()->prev (seq_tail); + seq_tail = seq_tail->next (); + ++this->cur_count_; + seq_tail->total_size_and_length (this->cur_bytes_, + this->cur_length_); + } + + new_item->prev (0); + seq_tail->next (this->head_); + + if (this->head_ != 0) + this->head_->prev (seq_tail); + else + this->tail_ = seq_tail; + + this->head_ = new_item; + + if (this->signal_dequeue_waiters () == -1) + return -1; + else + return ACE_Utils::Truncate (this->cur_count_); +} + +// Actually put the node at its proper position relative to its +// priority. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + + if (new_item == 0) + return -1; + + // Since this method uses enqueue_head_i() and enqueue_tail_i() for + // special situations, and this method doesn't support enqueueing + // chains of blocks off the 'next' pointer, make sure the new_item's + // next pointer is 0. + new_item->next (0); + + if (this->head_ == 0) + // Check for simple case of an empty queue, where all we need to + // do is insert <new_item> into the head. + return this->enqueue_head_i (new_item); + else + { + ACE_Message_Block *temp = 0; + + // Figure out where the new item goes relative to its priority. + // We start looking from the lowest priority (at the tail) to + // the highest priority (at the head). + + for (temp = this->tail_; + temp != 0; + temp = temp->prev ()) + if (temp->msg_priority () >= new_item->msg_priority ()) + // Break out when we've located an item that has + // greater or equal priority. + break; + + if (temp == 0) + // Check for simple case of inserting at the head of the queue, + // where all we need to do is insert <new_item> before the + // current head. + return this->enqueue_head_i (new_item); + else if (temp->next () == 0) + // Check for simple case of inserting at the tail of the + // queue, where all we need to do is insert <new_item> after + // the current tail. + return this->enqueue_tail_i (new_item); + else + { + // Insert the new message behind the message of greater or + // equal priority. This ensures that FIFO order is + // maintained when messages of the same priority are + // inserted consecutively. + new_item->prev (temp); + new_item->next (temp->next ()); + temp->next ()->prev (new_item); + temp->next (new_item); + } + } + + // Make sure to count all the bytes in a composite message!!! + new_item->total_size_and_length (this->cur_bytes_, + this->cur_length_); + ++this->cur_count_; + + if (this->signal_dequeue_waiters () == -1) + return -1; + else + return ACE_Utils::Truncate (this->cur_count_); +} + +// Actually put the node at its proper position relative to its +// deadline time. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i (ACE_Message_Block *new_item) +{ +#if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS) + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline_i"); + + if (new_item == 0) + return -1; + + // Since this method uses enqueue_head_i() and enqueue_tail_i() for + // special situations, and this method doesn't support enqueueing + // chains of blocks off the 'next' pointer, make sure the new_item's + // next pointer is 0. + new_item->next (0); + + if (this->head_ == 0) + // Check for simple case of an empty queue, where all we need to + // do is insert <new_item> into the head. + return this->enqueue_head_i (new_item); + else + { + ACE_Message_Block *temp = 0; + + // Figure out where the new item goes relative to its priority. + // We start looking from the smallest deadline to the highest + // deadline. + + for (temp = this->head_; + temp != 0; + temp = temp->next ()) + if (new_item->msg_deadline_time () < temp->msg_deadline_time ()) + // Break out when we've located an item that has + // greater or equal priority. + break; + + if (temp == 0 || temp->next () == 0) + // Check for simple case of inserting at the tail of the queue, + // where all we need to do is insert <new_item> after the + // current tail. + return this->enqueue_tail_i (new_item); + else + { + // Insert the new message behind the message of + // lesser or equal deadline time. This ensures that FIFO order is + // maintained when messages of the same priority are + // inserted consecutively. + new_item->prev (temp); + new_item->next (temp->next ()); + temp->next ()->prev (new_item); + temp->next (new_item); + } + } + + // Make sure to count all the bytes in a composite message!!! + new_item->total_size_and_length (this->cur_bytes_, + this->cur_length_); + ++this->cur_count_; + + if (this->signal_dequeue_waiters () == -1) + return -1; + else + return this->cur_count_; +#else + return this->enqueue_tail_i (new_item); +#endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */ +} + +// Actually get the first ACE_Message_Block (no locking, so must be +// called with locks held). This method assumes that the queue has at +// least one item in it when it is called. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) +{ + if (this->head_ ==0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("Attempting to dequeue from empty queue")), + -1); + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); + first_item = this->head_; + this->head_ = this->head_->next (); + + if (this->head_ == 0) + this->tail_ = 0; + else + // The prev pointer of first message block must point to 0... + this->head_->prev (0); + + size_t mb_bytes = 0; + size_t mb_length = 0; + first_item->total_size_and_length (mb_bytes, + mb_length); + // Subtract off all of the bytes associated with this message. + this->cur_bytes_ -= mb_bytes; + this->cur_length_ -= mb_length; + --this->cur_count_; + + if (this->cur_count_ == 0 && this->head_ == this->tail_) + this->head_ = this->tail_ = 0; + + // Make sure that the prev and next fields are 0! + first_item->prev (0); + first_item->next (0); + + // Only signal enqueueing threads if we've fallen below the low + // water mark. + if (this->cur_bytes_ <= this->low_water_mark_ + && this->signal_enqueue_waiters () == -1) + return -1; + else + return ACE_Utils::Truncate (this->cur_count_); +} + +// Get the earliest (i.e., FIFO) ACE_Message_Block with the lowest +// priority (no locking, so must be called with locks held). This +// method assumes that the queue has at least one item in it when it +// is called. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i (ACE_Message_Block *&dequeued) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio_i"); + + if (this->head_ == 0) + return -1; + + // Find the earliest (i.e., FIFO) message enqueued with the lowest + // priority. + ACE_Message_Block *chosen = 0; + u_long priority = ULONG_MAX; + + for (ACE_Message_Block *temp = this->tail_; + temp != 0; + temp = temp->prev ()) + { + // Find the first version of the earliest message (i.e., + // preserve FIFO order for messages at the same priority). + if (temp->msg_priority () <= priority) + { + priority = temp->msg_priority (); + chosen = temp; + } + } + + // If every message block is the same priority, pass back the first + // one. + if (chosen == 0) + chosen = this->head_; + + // Patch up the queue. If we don't have a previous then we are at + // the head of the queue. + if (chosen->prev () == 0) + this->head_ = chosen->next (); + else + chosen->prev ()->next (chosen->next ()); + + if (chosen->next () == 0) + this->tail_ = chosen->prev (); + else + chosen->next ()->prev (chosen->prev ()); + + // Pass back the chosen block + dequeued = chosen; + + size_t mb_bytes = 0; + size_t mb_length = 0; + dequeued->total_size_and_length (mb_bytes, + mb_length); + // Subtract off all of the bytes associated with this message. + this->cur_bytes_ -= mb_bytes; + this->cur_length_ -= mb_length; + --this->cur_count_; + + if (this->cur_count_ == 0 && this->head_ == this->tail_) + this->head_ = this->tail_ = 0; + + // Make sure that the prev and next fields are 0! + dequeued->prev (0); + dequeued->next (0); + + // Only signal enqueueing threads if we've fallen below the low + // water mark. + if (this->cur_bytes_ <= this->low_water_mark_ + && this->signal_enqueue_waiters () == -1) + return -1; + else + return ACE_Utils::Truncate (this->cur_count_); +} + +// Actually get the last ACE_Message_Block (no locking, so must be +// called with locks held). This method assumes that the queue has at +// least one item in it when it is called. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i (ACE_Message_Block *&dequeued) +{ + if (this->head_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("Attempting to dequeue from empty queue")), + -1); + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail_i"); + dequeued = this->tail_; + if (this->tail_->prev () == 0) + { + this->head_ = 0; + this->tail_ = 0; + } + else + { + this->tail_->prev ()->next (0); + this->tail_ = this->tail_->prev (); + } + + size_t mb_bytes = 0; + size_t mb_length = 0; + dequeued->total_size_and_length (mb_bytes, + mb_length); + // Subtract off all of the bytes associated with this message. + this->cur_bytes_ -= mb_bytes; + this->cur_length_ -= mb_length; + --this->cur_count_; + + if (this->cur_count_ == 0 && this->head_ == this->tail_) + this->head_ = this->tail_ = 0; + + // Make sure that the prev and next fields are 0! + dequeued->prev (0); + dequeued->next (0); + + // Only signal enqueueing threads if we've fallen below the low + // water mark. + if (this->cur_bytes_ <= this->low_water_mark_ + && this->signal_enqueue_waiters () == -1) + return -1; + else + return ACE_Utils::Truncate (this->cur_count_); +} + +// Actually get the ACE_Message_Block with the lowest deadline time +// (no locking, so must be called with locks held). This method assumes +// that the queue has at least one item in it when it is called. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i (ACE_Message_Block *&dequeued) +{ +#if defined (ACE_HAS_TIMED_MESSAGE_BLOCKS) + if (this->head_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("Attempting to dequeue from empty queue")), + -1); + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline_i"); + + // Find the last message enqueued with the lowest deadline time + ACE_Message_Block* chosen = 0; + ACE_Time_Value deadline = ACE_Time_Value::max_time; + for (ACE_Message_Block *temp = this->head_; temp != 0; temp = temp->next ()) + if (temp->msg_deadline_time () < deadline) + { + deadline = temp->msg_deadline_time (); + chosen = temp; + } + + // If every message block is the same deadline time, + // pass back the first one + if (chosen == 0) + chosen = this->head_; + + // Patch up the queue. If we don't have a previous + // then we are at the head of the queue. + if (chosen->prev () == 0) + this->head_ = chosen->next (); + else + chosen->prev ()->next (chosen->next ()); + + if (chosen->next () == 0) + this->tail_ = chosen->prev (); + else + chosen->next ()->prev (chosen->prev ()); + + // Pass back the chosen block + dequeued = chosen; + + size_t mb_bytes = 0; + size_t mb_length = 0; + dequeued->total_size_and_length (mb_bytes, + mb_length); + // Subtract off all of the bytes associated with this message. + this->cur_bytes_ -= mb_bytes; + this->cur_length_ -= mb_length; + --this->cur_count_; + + if (this->cur_count_ == 0 && this->head_ == this->tail_) + this->head_ = this->tail_ = 0; + + // Make sure that the prev and next fields are 0! + dequeued->prev (0); + dequeued->next (0); + + // Only signal enqueueing threads if we've fallen below the low + // water mark. + if (this->cur_bytes_ <= this->low_water_mark_ + && this->signal_enqueue_waiters () == -1) + return -1; + else + return this->cur_count_; +#else + return this->dequeue_head_i (dequeued); +#endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */ +} + +// Take a look at the first item without removing it. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + // Wait for at least one item to become available. + + if (this->wait_not_empty_cond (ace_mon, timeout) == -1) + return -1; + + first_item = this->head_; + return ACE_Utils::Truncate (this->cur_count_); +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &, + ACE_Time_Value *timeout) +{ + int result = 0; + + // Wait while the queue is full. + + while (this->is_full_i ()) + { + if (this->not_full_cond_.wait (timeout) == -1) + { + if (errno == ETIME) + errno = EWOULDBLOCK; + result = -1; + break; + } + if (this->state_ != ACE_Message_Queue_Base::ACTIVATED) + { + errno = ESHUTDOWN; + result = -1; + break; + } + } + return result; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond + (ACE_Guard<ACE_SYNCH_MUTEX_T> &, ACE_Time_Value *timeout) +{ + int result = 0; + + // Wait while the queue is empty. + + while (this->is_empty_i ()) + { + if (this->not_empty_cond_.wait (timeout) == -1) + { + if (errno == ETIME) + errno = EWOULDBLOCK; + result = -1; + break; + } + if (this->state_ != ACE_Message_Queue_Base::ACTIVATED) + { + errno = ESHUTDOWN; + result = -1; + break; + } + } + return result; +} + +// Block indefinitely waiting for an item to arrive, does not ignore +// alerts (e.g., signals). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head"); + int queue_count = 0; + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_full_cond (ace_mon, timeout) == -1) + return -1; + + queue_count = this->enqueue_head_i (new_item); + + if (queue_count == -1) + return -1; + + this->notify (); + } + return queue_count; +} + +// Enqueue an <ACE_Message_Block *> into the <Message_Queue> in +// accordance with its <msg_priority> (0 is lowest priority). Returns +// -1 on failure, else the number of items still on the queue. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio"); + int queue_count = 0; + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_full_cond (ace_mon, timeout) == -1) + return -1; + + queue_count = this->enqueue_i (new_item); + + if (queue_count == -1) + return -1; + + this->notify (); + } + return queue_count; +} + +// Enqueue an <ACE_Message_Block *> into the <Message_Queue> in +// accordance with its <msg_deadline_time>. Returns +// -1 on failure, else the number of items still on the queue. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline (ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_deadline"); + int queue_count = 0; + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_full_cond (ace_mon, timeout) == -1) + return -1; + + queue_count = this->enqueue_deadline_i (new_item); + + if (queue_count == -1) + return -1; + + this->notify (); + } + return queue_count; +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue"); + return this->enqueue_prio (new_item, timeout); +} + +// Block indefinitely waiting for an item to arrive, +// does not ignore alerts (e.g., signals). + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail"); + int queue_count = 0; + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_full_cond (ace_mon, timeout) == -1) + return -1; + + queue_count = this->enqueue_tail_i (new_item); + + if (queue_count == -1) + return -1; + + this->notify (); + } + return queue_count; +} + +// Remove an item from the front of the queue. If timeout == 0 block +// indefinitely (or until an alert occurs). Otherwise, block for upto +// the amount of time specified by timeout. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_empty_cond (ace_mon, timeout) == -1) + return -1; + + return this->dequeue_head_i (first_item); +} + +// Remove item with the lowest priority from the queue. If timeout == 0 block +// indefinitely (or until an alert occurs). Otherwise, block for upto +// the amount of time specified by timeout. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio (ACE_Message_Block *&dequeued, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_prio"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_empty_cond (ace_mon, timeout) == -1) + return -1; + + return this->dequeue_prio_i (dequeued); +} + +// Remove an item from the end of the queue. If timeout == 0 block +// indefinitely (or until an alert occurs). Otherwise, block for upto +// the amount of time specified by timeout. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail (ACE_Message_Block *&dequeued, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_tail"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_empty_cond (ace_mon, timeout) == -1) + return -1; + + return this->dequeue_tail_i (dequeued); +} + +// Remove an item with the lowest deadline time. If timeout == 0 block +// indefinitely (or until an alert occurs). Otherwise, block for upto +// the amount of time specified by timeout. + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline (ACE_Message_Block *&dequeued, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_deadline"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + if (this->wait_not_empty_cond (ace_mon, timeout) == -1) + return -1; + + return this->dequeue_deadline_i (dequeued); +} + +template <ACE_SYNCH_DECL> int +ACE_Message_Queue<ACE_SYNCH_USE>::notify (void) +{ + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notify"); + + // By default, don't do anything. + if (this->notification_strategy_ == 0) + return 0; + else + return this->notification_strategy_->notify (); +} + + +// = Initialization and termination methods. +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, + size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) + : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns), + pending_head_ (0), + pending_tail_ (0), + late_head_ (0), + late_tail_ (0), + beyond_late_head_ (0), + beyond_late_tail_ (0), + message_strategy_ (message_strategy) +{ + // Note, the ACE_Dynamic_Message_Queue assumes full responsibility + // for the passed ACE_Dynamic_Message_Strategy object, and deletes + // it in its own dtor +} + +// dtor: free message strategy and let base class dtor do the rest. + +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void) +{ + delete &this->message_strategy_; +} + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (ACE_Message_Block *&list_head, + ACE_Message_Block *&list_tail, + u_int status_flags) +{ + // start with an empty list + list_head = 0; + list_tail = 0; + + // Get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // Refresh priority status boundaries in the queue. + int result = this->refresh_queue (current_time); + if (result < 0) + return result; + + if (ACE_BIT_ENABLED (status_flags, + (u_int) ACE_Dynamic_Message_Strategy::PENDING) + && this->pending_head_ + && this->pending_tail_) + { + // patch up pointers for the new tail of the queue + if (this->pending_head_->prev ()) + { + this->tail_ = this->pending_head_->prev (); + this->pending_head_->prev ()->next (0); + } + else + { + // the list has become empty + this->head_ = 0; + this->tail_ = 0; + } + + // point to the head and tail of the list + list_head = this->pending_head_; + list_tail = this->pending_tail_; + + // cut the pending messages out of the queue entirely + this->pending_head_->prev (0); + this->pending_head_ = 0; + this->pending_tail_ = 0; + } + + if (ACE_BIT_ENABLED (status_flags, + (u_int) ACE_Dynamic_Message_Strategy::LATE) + && this->late_head_ + && this->late_tail_) + { + // Patch up pointers for the (possibly) new head and tail of the + // queue. + if (this->late_tail_->next ()) + this->late_tail_->next ()->prev (this->late_head_->prev ()); + else + this->tail_ = this->late_head_->prev (); + + if (this->late_head_->prev ()) + this->late_head_->prev ()->next (this->late_tail_->next ()); + else + this->head_ = this->late_tail_->next (); + + // put late messages behind pending messages (if any) being returned + this->late_head_->prev (list_tail); + if (list_tail) + list_tail->next (this->late_head_); + else + list_head = this->late_head_; + + list_tail = this->late_tail_; + + this->late_tail_->next (0); + this->late_head_ = 0; + this->late_tail_ = 0; + } + + if (ACE_BIT_ENABLED (status_flags, + (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) + && this->beyond_late_head_ + && this->beyond_late_tail_) + { + // Patch up pointers for the new tail of the queue + if (this->beyond_late_tail_->next ()) + { + this->head_ = this->beyond_late_tail_->next (); + this->beyond_late_tail_->next ()->prev (0); + } + else + { + // the list has become empty + this->head_ = 0; + this->tail_ = 0; + } + + // Put beyond late messages at the end of the list being + // returned. + if (list_tail) + { + this->beyond_late_head_->prev (list_tail); + list_tail->next (this->beyond_late_head_); + } + else + list_head = this->beyond_late_head_; + + list_tail = this->beyond_late_tail_; + + this->beyond_late_tail_->next (0); + this->beyond_late_head_ = 0; + this->beyond_late_tail_ = 0; + } + + // Decrement message and size counts for removed messages. + ACE_Message_Block *temp1; + + for (temp1 = list_head; + temp1 != 0; + temp1 = temp1->next ()) + { + --this->cur_count_; + + size_t mb_bytes = 0; + size_t mb_length = 0; + temp1->total_size_and_length (mb_bytes, + mb_length); + // Subtract off all of the bytes associated with this message. + this->cur_bytes_ -= mb_bytes; + this->cur_length_ -= mb_length; + } + + return result; +} + +// Detach all messages with status given in the passed flags from the +// queue and return them by setting passed head and tail pointers to +// the linked list they comprise. This method is intended primarily +// as a means of periodically harvesting messages that have missed +// their deadlines, but is available in its most general form. All +// messages are returned in priority order, from head to tail, as of +// the time this method was called. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; + return -1; + } + + int result; + + // get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh priority status boundaries in the queue + result = this->refresh_queue (current_time); + if (result < 0) + return result; + + // *now* it's appropriate to wait for an enqueued item + result = this->wait_not_empty_cond (ace_mon, timeout); + if (result == -1) + return result; + + // call the internal dequeue method, which selects an item from the + // highest priority status portion of the queue that has messages + // enqueued. + result = this->dequeue_head_i (first_item); + + return result; +} + +// Dequeue and return the <ACE_Message_Block *> at the (logical) head +// of the queue. + +template <ACE_SYNCH_DECL> void +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n"))); + this->ACE_Message_Queue<ACE_SYNCH_USE>::dump (); + + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT ("pending_head_ = %u\n") + ACE_LIB_TEXT ("pending_tail_ = %u\n") + ACE_LIB_TEXT ("late_head_ = %u\n") + ACE_LIB_TEXT ("late_tail_ = %u\n") + ACE_LIB_TEXT ("beyond_late_head_ = %u\n") + ACE_LIB_TEXT ("beyond_late_tail_ = %u\n"), + this->pending_head_, + this->pending_tail_, + this->late_head_, + this->late_tail_, + this->beyond_late_head_, + this->beyond_late_tail_)); + + ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("message_strategy_ : \n"))); + message_strategy_.dump (); + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +#endif /* ACE_HAS_DUMP */ +} + // dump the state of the queue + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + + if (new_item == 0) + return -1; + + int result = 0; + + // Get the current time. + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // Refresh priority status boundaries in the queue. + + result = this->refresh_queue (current_time); + if (result < 0) + return result; + + // Where we enqueue depends on the message's priority status. + switch (message_strategy_.priority_status (*new_item, + current_time)) + { + case ACE_Dynamic_Message_Strategy::PENDING: + if (this->pending_tail_ == 0) + { + // Check for simple case of an empty pending queue, where + // all we need to do is insert <new_item> into the tail of + // the queue. + pending_head_ = new_item; + pending_tail_ = pending_head_; + return this->enqueue_tail_i (new_item); + } + else + { + // Enqueue the new message in priority order in the pending + // sublist + result = sublist_enqueue_i (new_item, + current_time, + this->pending_head_, + this->pending_tail_, + ACE_Dynamic_Message_Strategy::PENDING); + } + break; + + case ACE_Dynamic_Message_Strategy::LATE: + if (this->late_tail_ == 0) + { + late_head_ = new_item; + late_tail_ = late_head_; + + if (this->pending_head_ == 0) + // Check for simple case of an empty pending queue, + // where all we need to do is insert <new_item> into the + // tail of the queue. + return this->enqueue_tail_i (new_item); + else if (this->beyond_late_tail_ == 0) + // Check for simple case of an empty beyond late queue, where all + // we need to do is insert <new_item> into the head of the queue. + return this->enqueue_head_i (new_item); + else + { + // Otherwise, we can just splice the new message in + // between the pending and beyond late portions of the + // queue. + this->beyond_late_tail_->next (new_item); + new_item->prev (this->beyond_late_tail_); + this->pending_head_->prev (new_item); + new_item->next (this->pending_head_); + } + } + else + { + // Enqueue the new message in priority order in the late + // sublist + result = sublist_enqueue_i (new_item, + current_time, + this->late_head_, + this->late_tail_, + ACE_Dynamic_Message_Strategy::LATE); + } + break; + + case ACE_Dynamic_Message_Strategy::BEYOND_LATE: + if (this->beyond_late_tail_ == 0) + { + // Check for simple case of an empty beyond late queue, + // where all we need to do is insert <new_item> into the + // head of the queue. + beyond_late_head_ = new_item; + beyond_late_tail_ = beyond_late_head_; + return this->enqueue_head_i (new_item); + } + else + { + // all beyond late messages have the same (zero) priority, + // so just put the new one at the end of the beyond late + // messages + if (this->beyond_late_tail_->next ()) + this->beyond_late_tail_->next ()->prev (new_item); + else + this->tail_ = new_item; + + new_item->next (this->beyond_late_tail_->next ()); + this->beyond_late_tail_->next (new_item); + new_item->prev (this->beyond_late_tail_); + this->beyond_late_tail_ = new_item; + } + + break; + + // should never get here, but just in case... + default: + result = -1; + break; + } + + if (result < 0) + return result; + + size_t mb_bytes = 0; + size_t mb_length = 0; + new_item->total_size_and_length (mb_bytes, + mb_length); + this->cur_bytes_ += mb_bytes; + this->cur_length_ += mb_length; + ++this->cur_count_; + + if (this->signal_dequeue_waiters () == -1) + return -1; + else + return this->cur_count_; +} + +// Enqueue an <ACE_Message_Block *> in accordance with its priority. +// priority may be *dynamic* or *static* or a combination or *both* It +// calls the priority evaluation function passed into the Dynamic +// Message Queue constructor to update the priorities of all enqueued +// messages. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item, + const ACE_Time_Value ¤t_time, + ACE_Message_Block *&sublist_head, + ACE_Message_Block *&sublist_tail, + ACE_Dynamic_Message_Strategy::Priority_Status status) +{ + int result = 0; + ACE_Message_Block *current_item = 0; + + // Find message after which to enqueue new item, based on message + // priority and priority status. + for (current_item = sublist_tail; + current_item; + current_item = current_item->prev ()) + { + if (message_strategy_.priority_status (*current_item, current_time) == status) + { + if (current_item->msg_priority () >= new_item->msg_priority ()) + break; + } + else + { + sublist_head = new_item; + break; + } + } + + if (current_item == 0) + { + // If the new message has highest priority of any, put it at the + // head of the list (and sublist). + new_item->prev (0); + new_item->next (this->head_); + if (this->head_ != 0) + this->head_->prev (new_item); + else + { + this->tail_ = new_item; + sublist_tail = new_item; + } + this->head_ = new_item; + sublist_head = new_item; + } + else + { + // insert the new item into the list + new_item->next (current_item->next ()); + new_item->prev (current_item); + + if (current_item->next ()) + current_item->next ()->prev (new_item); + else + this->tail_ = new_item; + + current_item->next (new_item); + + // If the new item has lowest priority of any in the sublist, + // move the tail pointer of the sublist back to the new item + if (current_item == sublist_tail) + sublist_tail = new_item; + } + + return result; +} + +// Enqueue a message in priority order within a given priority status +// sublist. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); + + int result = 0; + int last_in_subqueue = 0; + + // first, try to dequeue from the head of the pending list + if (this->pending_head_) + { + first_item = this->pending_head_; + + if (0 == this->pending_head_->prev ()) + this->head_ = this->pending_head_->next (); + else + this->pending_head_->prev ()->next (this->pending_head_->next ()); + + if (0 == this->pending_head_->next ()) + { + this->tail_ = this->pending_head_->prev (); + this->pending_head_ = 0; + this->pending_tail_ = 0; + } + else + { + this->pending_head_->next ()->prev (this->pending_head_->prev ()); + this->pending_head_ = this->pending_head_->next (); + } + + first_item->prev (0); + first_item->next (0); + } + + // Second, try to dequeue from the head of the late list + else if (this->late_head_) + { + last_in_subqueue = this->late_head_ == this->late_tail_ ? 1 : 0; + + first_item = this->late_head_; + + if (0 == this->late_head_->prev ()) + this->head_ = this->late_head_->next (); + else + this->late_head_->prev ()->next (this->late_head_->next ()); + + if (0 == this->late_head_->next ()) + this->tail_ = this->late_head_->prev (); + else + { + this->late_head_->next ()->prev (this->late_head_->prev ()); + this->late_head_ = this->late_head_->next (); + } + + if (last_in_subqueue) + { + this->late_head_ = 0; + this->late_tail_ = 0; + } + + first_item->prev (0); + first_item->next (0); + } + // finally, try to dequeue from the head of the beyond late list + else if (this->beyond_late_head_) + { + last_in_subqueue = + (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0; + + first_item = this->beyond_late_head_; + this->head_ = this->beyond_late_head_->next (); + + if (0 == this->beyond_late_head_->next ()) + this->tail_ = this->beyond_late_head_->prev (); + else + { + this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ()); + this->beyond_late_head_ = this->beyond_late_head_->next (); + } + + if (last_in_subqueue) + { + this->beyond_late_head_ = 0; + this->beyond_late_tail_ = 0; + } + + first_item->prev (0); + first_item->next (0); + } + else + { + // nothing to dequeue: set the pointer to zero and return an error code + first_item = 0; + result = -1; + } + + if (result < 0) + return result; + + size_t mb_bytes = 0; + size_t mb_length = 0; + first_item->total_size_and_length (mb_bytes, + mb_length); + // Subtract off all of the bytes associated with this message. + this->cur_bytes_ -= mb_bytes; + this->cur_length_ -= mb_length; + --this->cur_count_; + + // Only signal enqueueing threads if we've fallen below the low + // water mark. + if (this->cur_bytes_ <= this->low_water_mark_ + && this->signal_enqueue_waiters () == -1) + return -1; + else + return this->cur_count_; +} + +// Dequeue and return the <ACE_Message_Block *> at the head of the +// logical queue. Attempts first to dequeue from the pending portion +// of the queue, or if that is empty from the late portion, or if that +// is empty from the beyond late portion, or if that is empty just +// sets the passed pointer to zero and returns -1. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value ¤t_time) +{ + int result; + + result = refresh_pending_queue (current_time); + + if (result != -1) + result = refresh_late_queue (current_time); + + return result; +} + +// Refresh the queue using the strategy specific priority status +// function. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value ¤t_time) +{ + ACE_Dynamic_Message_Strategy::Priority_Status current_status; + + // refresh priority status boundaries in the queue + if (this->pending_head_) + { + current_status = message_strategy_.priority_status (*this->pending_head_, + current_time); + switch (current_status) + { + case ACE_Dynamic_Message_Strategy::BEYOND_LATE: + // Make sure the head of the beyond late queue is set (there + // may not have been any beyond late messages previously) + this->beyond_late_head_ = this->head_; + + // Zero out the late queue pointers, and set them only if + // there turn out to be late messages in the pending sublist + this->late_head_ = 0; + this->late_tail_ = 0; + + // Advance through the beyond late messages in the pending queue + do + { + this->pending_head_ = this->pending_head_->next (); + + if (this->pending_head_) + current_status = message_strategy_.priority_status (*this->pending_head_, + current_time); + else + break; // do while + + } + while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); + + if (this->pending_head_) + { + // point tail of beyond late sublist to previous item + this->beyond_late_tail_ = this->pending_head_->prev (); + + if (current_status == ACE_Dynamic_Message_Strategy::PENDING) + // there are no late messages left in the queue + break; // switch + else if (current_status != ACE_Dynamic_Message_Strategy::LATE) + { + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"), + (int) current_status), + -1); + } + /* FALLTHRU */ + } + else + { + // There are no pending or late messages left in the + // queue. + this->beyond_late_tail_ = this->tail_; + this->pending_head_ = 0; + this->pending_tail_ = 0; + break; // switch + } + + case ACE_Dynamic_Message_Strategy::LATE: + // Make sure the head of the late queue is set (there may + // not have been any late messages previously, or they may + // have all become beyond late). + if (this->late_head_ == 0) + this->late_head_ = this->pending_head_; + + // advance through the beyond late messages in the pending queue + do + { + this->pending_head_ = this->pending_head_->next (); + + if (this->pending_head_) + current_status = message_strategy_.priority_status (*this->pending_head_, + current_time); + else + break; // do while + + } + while (current_status == ACE_Dynamic_Message_Strategy::LATE); + + if (this->pending_head_) + { + if (current_status != ACE_Dynamic_Message_Strategy::PENDING) + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected PENDING)"), + (int) current_status), + -1); + + // Point tail of late sublist to previous item + this->late_tail_ = this->pending_head_->prev (); + } + else + { + // there are no pending messages left in the queue + this->late_tail_ = this->tail_; + this->pending_head_ = 0; + this->pending_tail_ = 0; + } + + break; // switch + case ACE_Dynamic_Message_Strategy::PENDING: + // do nothing - the pending queue is unchanged + break; // switch + default: + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ACE_LIB_TEXT ("Unknown message priority status [%d]"), + (int) current_status), + -1); + } + } + return 0; +} + +// Refresh the pending queue using the strategy specific priority +// status function. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value ¤t_time) +{ + ACE_Dynamic_Message_Strategy::Priority_Status current_status; + + if (this->late_head_) + { + current_status = message_strategy_.priority_status (*this->late_head_, + current_time); + switch (current_status) + { + case ACE_Dynamic_Message_Strategy::BEYOND_LATE: + + // make sure the head of the beyond late queue is set + // (there may not have been any beyond late messages previously) + this->beyond_late_head_ = this->head_; + + // advance through the beyond late messages in the late queue + do + { + this->late_head_ = this->late_head_->next (); + + if (this->late_head_) + current_status = message_strategy_.priority_status (*this->late_head_, + current_time); + else + break; // do while + + } + while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); + + if (this->late_head_) + { + // point tail of beyond late sublist to previous item + this->beyond_late_tail_ = this->late_head_->prev (); + + if (current_status == ACE_Dynamic_Message_Strategy::PENDING) + { + // there are no late messages left in the queue + this->late_head_ = 0; + this->late_tail_ = 0; + } + else if (current_status != ACE_Dynamic_Message_Strategy::LATE) + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("Unexpected message priority status [%d] (expected LATE)"), + (int) current_status), + -1); + } + else + { + // there are no late messages left in the queue + this->beyond_late_tail_ = this->tail_; + this->late_head_ = 0; + this->late_tail_ = 0; + } + + break; // switch + + case ACE_Dynamic_Message_Strategy::LATE: + // do nothing - the late queue is unchanged + break; // switch + + case ACE_Dynamic_Message_Strategy::PENDING: + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("Unexpected message priority status ") + ACE_LIB_TEXT ("[%d] (expected LATE or BEYOND_LATE)"), + (int) current_status), + -1); + default: + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("Unknown message priority status [%d]"), + (int) current_status), + -1); + } + } + + return 0; +} + +// Refresh the late queue using the strategy specific priority status +// function. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, + timeout); +} + +// Private method to hide public base class method: just calls base +// class method. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail"); + return this->enqueue_prio (new_item, timeout); +} + +// Just call priority enqueue method: tail enqueue semantics for +// dynamic message queues are unstable: the message may or may not be +// where it was placed after the queue is refreshed prior to the next +// enqueue or dequeue operation. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head"); + return this->enqueue_prio (new_item, timeout); +} + +// Just call priority enqueue method: head enqueue semantics for +// dynamic message queues are unstable: the message may or may not be +// where it was placed after the queue is refreshed prior to the next +// enqueue or dequeue operation. + +template <ACE_SYNCH_DECL> +ACE_Message_Queue<ACE_SYNCH_USE> * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns) +{ + ACE_Message_Queue<ACE_SYNCH_USE> *tmp = 0; + + ACE_NEW_RETURN (tmp, + ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns), + 0); + return tmp; +} + +// Factory method for a statically prioritized ACE_Message_Queue. + +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns, + u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) +{ + ACE_Deadline_Message_Strategy *adms = 0; + + ACE_NEW_RETURN (adms, + ACE_Deadline_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + dynamic_priority_max, + dynamic_priority_offset), + 0); + + ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0; + ACE_NEW_RETURN (tmp, + ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns), + 0); + return tmp; +} + +// Factory method for a dynamically prioritized (by time to deadline) +// ACE_Dynamic_Message_Queue. + +template <ACE_SYNCH_DECL> +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm, + size_t lwm, + ACE_Notification_Strategy *ns, + u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) +{ + ACE_Laxity_Message_Strategy *alms = 0; + + ACE_NEW_RETURN (alms, + ACE_Laxity_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + dynamic_priority_max, + dynamic_priority_offset), + 0); + + ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *tmp = 0; + ACE_NEW_RETURN (tmp, + ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns), + 0); + return tmp; +} + +// Factory method for a dynamically prioritized (by laxity) +// <ACE_Dynamic_Message_Queue>. + +#if defined (ACE_VXWORKS) + +template <ACE_SYNCH_DECL> +ACE_Message_Queue_Vx * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages, + size_t max_message_length, + ACE_Notification_Strategy *ns) +{ + ACE_Message_Queue_Vx *tmp = 0; + + ACE_NEW_RETURN (tmp, + ACE_Message_Queue_Vx (max_messages, max_message_length, ns), + 0); + return tmp; +} + // factory method for a wrapped VxWorks message queue + +#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) + +template <ACE_SYNCH_DECL> +ACE_Message_Queue_NT * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads) +{ + ACE_Message_Queue_NT *tmp = 0; + + ACE_NEW_RETURN (tmp, + ACE_Message_Queue_NT (max_threads); + 0); + return tmp; +} + +#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ +#endif /* defined (ACE_VXWORKS) */ + +ACE_END_VERSIONED_NAMESPACE_DECL + +#endif /* !ACE_MESSAGE_QUEUE_T_CPP */ |