diff options
author | Johnny Willemsen <jwillemsen@remedy.nl> | 2008-02-22 14:21:32 +0000 |
---|---|---|
committer | Johnny Willemsen <jwillemsen@remedy.nl> | 2008-02-22 14:21:32 +0000 |
commit | a3e24ec87b85631c0373f1cade818a45f786b59e (patch) | |
tree | 172f37f98309b7a7da016e7fd838edbc2649db2b /ACE/ace | |
parent | 0a75ac2fe413d75456bd1c03ecca5295d278bb9c (diff) | |
download | ATCD-a3e24ec87b85631c0373f1cade818a45f786b59e.tar.gz |
Diffstat (limited to 'ACE/ace')
-rw-r--r-- | ACE/ace/Makefile.am | 1 | ||||
-rw-r--r-- | ACE/ace/Message_Queue.cpp | 222 | ||||
-rw-r--r-- | ACE/ace/Message_Queue.h | 198 | ||||
-rw-r--r-- | ACE/ace/Message_Queue.inl | 124 | ||||
-rw-r--r-- | ACE/ace/Message_Queue_NT.cpp | 237 | ||||
-rw-r--r-- | ACE/ace/Message_Queue_NT.h | 231 | ||||
-rw-r--r-- | ACE/ace/Message_Queue_NT.inl | 131 | ||||
-rw-r--r-- | ACE/ace/ace.mpc | 1 |
8 files changed, 601 insertions, 544 deletions
diff --git a/ACE/ace/Makefile.am b/ACE/ace/Makefile.am index 0f7248b7199..d5bc8bf0a99 100644 --- a/ACE/ace/Makefile.am +++ b/ACE/ace/Makefile.am @@ -157,6 +157,7 @@ libACE_la_SOURCES = \ Mem_Map.cpp \ Message_Block.cpp \ Message_Queue.cpp \ + Message_Queue_NT.cpp \ Message_Queue_Vx.cpp \ Method_Request.cpp \ Msg_WFMO_Reactor.cpp \ diff --git a/ACE/ace/Message_Queue.cpp b/ACE/ace/Message_Queue.cpp index 672a1f9876a..0ce105db50b 100644 --- a/ACE/ace/Message_Queue.cpp +++ b/ACE/ace/Message_Queue.cpp @@ -7,12 +7,10 @@ #include "ace/Message_Queue.inl" #endif /* __ACE_INLINE__ */ - ACE_RCSID (ace, Message_Queue, "$Id$") - ACE_BEGIN_VERSIONED_NAMESPACE_DECL ACE_Message_Queue_Base::~ACE_Message_Queue_Base (void) @@ -27,224 +25,4 @@ ACE_Message_Queue_Base::state (void) return this->state_; } -#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) - -ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads) - : max_cthrs_ (max_threads), - cur_thrs_ (0), - cur_bytes_ (0), - cur_length_ (0), - cur_count_ (0), - completion_port_ (ACE_INVALID_HANDLE) -{ - ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT"); - this->open (max_threads); -} - -int -ACE_Message_Queue_NT::open (DWORD max_threads) -{ - ACE_TRACE ("ACE_Message_Queue_NT::open"); - this->max_cthrs_ = max_threads; - this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE, - 0, - ACE_Message_Queue_Base::ACTIVATED, - max_threads); - return (this->completion_port_ == 0 ? -1 : 0); -} - -int -ACE_Message_Queue_NT::close (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::close"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); - this->deactivate (); - return (::CloseHandle (this->completion_port_) ? 0 : -1 ); -} - -ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT"); - this->close (); -} - -int -ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item, - ACE_Time_Value *) -{ - ACE_TRACE ("ACE_Message_Queue_NT::enqueue"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); - if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED) - { - size_t const msize = new_item->total_size (); - size_t const mlength = new_item->total_length (); - // Note - we send ACTIVATED in the 3rd arg to tell the completion - // routine it's _NOT_ being woken up because of deactivate(). - ULONG_PTR state_to_post; - state_to_post = ACE_Message_Queue_Base::ACTIVATED; - if (::PostQueuedCompletionStatus (this->completion_port_, - static_cast<DWORD> (msize), - state_to_post, - reinterpret_cast<LPOVERLAPPED> (new_item))) - { - // Update the states once I succeed. - this->cur_bytes_ += msize; - this->cur_length_ += mlength; - return ACE_Utils::truncate_cast<int> (++this->cur_count_); - } - } - else - errno = ESHUTDOWN; - - // Fail to enqueue the message. - return -1; -} - -int -ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item, - ACE_Time_Value *timeout) -{ - ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head"); - - { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); - - // Make sure the MQ is not deactivated before proceeding. - if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) - { - errno = ESHUTDOWN; // Operation on deactivated MQ not allowed. - return -1; - } - else - ++this->cur_thrs_; // Increase the waiting thread count. - } - - ULONG_PTR queue_state; - DWORD msize; - // Get a message from the completion port. - int retv = ::GetQueuedCompletionStatus (this->completion_port_, - &msize, - &queue_state, - reinterpret_cast<LPOVERLAPPED *> (&first_item), - (timeout == 0 ? INFINITE : timeout->msec ())); - { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); - --this->cur_thrs_; // Decrease waiting thread count. - if (retv) - { - if (queue_state == ACE_Message_Queue_Base::ACTIVATED) - { // Really get a valid MB from the queue. - --this->cur_count_; - this->cur_bytes_ -= msize; - this->cur_length_ -= first_item->total_length (); - return ACE_Utils::truncate_cast<int> (this->cur_count_); - } - else // Woken up by deactivate () or pulse (). - errno = ESHUTDOWN; - } - } - return -1; -} - -int -ACE_Message_Queue_NT::deactivate (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::deactivate"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); - - int const previous_state = this->state_; - if (previous_state != ACE_Message_Queue_Base::DEACTIVATED) - { - this->state_ = ACE_Message_Queue_Base::DEACTIVATED; - - // Get the number of shutdown messages necessary to wake up all - // waiting threads. - DWORD cntr = - this->cur_thrs_ - static_cast<DWORD> (this->cur_count_); - while (cntr-- > 0) - ::PostQueuedCompletionStatus (this->completion_port_, - 0, - this->state_, - 0); - } - return previous_state; -} - -int -ACE_Message_Queue_NT::activate (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::activate"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); - int const previous_status = this->state_; - this->state_ = ACE_Message_Queue_Base::ACTIVATED; - return previous_status; -} - -int -ACE_Message_Queue_NT::pulse (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::pulse"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); - - int const previous_state = this->state_; - if (previous_state != ACE_Message_Queue_Base::DEACTIVATED) - { - this->state_ = ACE_Message_Queue_Base::PULSED; - - // Get the number of shutdown messages necessary to wake up all - // waiting threads. - - DWORD cntr = - this->cur_thrs_ - static_cast<DWORD> (this->cur_count_); - while (cntr-- > 0) - ::PostQueuedCompletionStatus (this->completion_port_, - 0, - this->state_, - 0); - } - return previous_state; -} - -void -ACE_Message_Queue_NT::dump (void) const -{ -#if defined (ACE_HAS_DUMP) - ACE_TRACE ("ACE_Message_Queue_NT::dump"); - - ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); - switch (this->state_) - { - case ACE_Message_Queue_Base::ACTIVATED: - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("state = ACTIVATED\n"))); - break; - case ACE_Message_Queue_Base::DEACTIVATED: - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("state = DEACTIVATED\n"))); - break; - case ACE_Message_Queue_Base::PULSED: - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("state = PULSED\n"))); - break; - } - - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("max_cthrs_ = %d\n") - ACE_TEXT ("cur_thrs_ = %d\n") - ACE_TEXT ("cur_bytes = %d\n") - ACE_TEXT ("cur_length = %d\n") - ACE_TEXT ("cur_count = %d\n") - ACE_TEXT ("completion_port_ = %x\n"), - this->max_cthrs_, - this->cur_thrs_, - this->cur_bytes_, - this->cur_length_, - this->cur_count_, - this->completion_port_)); - ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); -#endif /* ACE_HAS_DUMP */ -} - -#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ - ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/ace/Message_Queue.h b/ACE/ace/Message_Queue.h index a3e81f02151..b5f847700fd 100644 --- a/ACE/ace/Message_Queue.h +++ b/ACE/ace/Message_Queue.h @@ -229,204 +229,6 @@ ACE_END_VERSIONED_NAMESPACE_DECL // Include the templates here. #include "ace/Message_Queue_T.h" -ACE_BEGIN_VERSIONED_NAMESPACE_DECL - -#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) -/** - * @class ACE_Message_Queue_NT - * - * @brief Message Queue implementation using IO completion port on NT. - * - * Implementation of a strip-downed ACE_Message_Queue using NT's - * IO completion port mechanism. - * @note *Many* ACE_Message_Queue features are not supported with - * this implementation, including: - * * <open> method have different signatures. - * * <dequeue_head> *requires* that the ACE_Message_Block - * pointer argument point to an ACE_Message_Block that was - * allocated by the caller. - * * <peek_dequeue_head>. - * * <ACE_Message_Queue_Iterators>. - * * No flow control. - */ -class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base -{ -public: - // = Initialization and termination methods. - ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); - - /** - * Initialize the Message Queue by creating a new NT I/O completion - * port. The first arguemnt specifies the number of threads - * released by the MQ that are allowed to run concurrently. Return - * 0 when succeeds, -1 otherwise. - */ - virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); - - /// Close down the underlying I/O completion port. You need to - /// re-open the MQ after this function is executed. - virtual int close (void); - - /// Close down the message queue and release all resources. - virtual ~ACE_Message_Queue_NT (void); - - // = Enqueue and dequeue methods. - - /** - * Enqueue an <ACE_Message_Block *> at the end of the queue. - * Returns -1 on failure, else the number of items still on the - * queue. - */ - virtual int enqueue_tail (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0); - virtual int enqueue (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0); - - /** - * Dequeue and return the <ACE_Message_Block *> at the head of the - * queue. Returns -1 on failure, else the number of items still on - * the queue. - */ - virtual int dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *timeout = 0); - virtual int dequeue (ACE_Message_Block *&first_item, - ACE_Time_Value *timeout = 0); - - // = Check if queue is full/empty. - /** - * Always return false. - */ - - virtual bool is_full (void); - /** - * True if queue is empty, else false. Notice the return value is - * only transient. - */ - virtual bool is_empty (void); - - // = Queue statistic methods (transient.) - /** - * Number of total bytes on the queue, i.e., sum of the message - * block sizes. - */ - virtual size_t message_bytes (void); - - /** - * Number of total length on the queue, i.e., sum of the message - * block lengths. - */ - virtual size_t message_length (void); - - /** - * Number of total messages on the queue. - */ - virtual size_t message_count (void); - - // = Manual changes to these stats (used when queued message blocks - // change size or lengths). - /** - * New value of the number of total bytes on the queue, i.e., sum of - * the message block sizes. - */ - virtual void message_bytes (size_t new_size); - - /** - * New value of the number of total length on the queue, i.e., sum - * of the message block lengths. - */ - virtual void message_length (size_t new_length); - - /// Get the max concurrent thread number. - virtual DWORD max_threads (void); - - // = Activation control methods. - - /** - * Deactivate the queue and wake up all threads waiting on the queue - * so they can continue. No messages are removed from the queue, - * however. Any other operations called until the queue is - * activated again will immediately return -1 with @c errno - * ESHUTDOWN. - * - * @retval The queue's state before this call. - */ - virtual int deactivate (void); - - /** - * Reactivate the queue so that threads can enqueue and dequeue - * messages again. Returns the state of the queue before the call. - */ - virtual int activate (void); - - /** - * Pulse the queue to wake up any waiting threads. Changes the - * queue state to PULSED; future enqueue/dequeue operations proceed - * as in ACTIVATED state. - * - * @retval The queue's state before this call. - */ - virtual int pulse (void); - - /// Returns true if the state of the queue is <DEACTIVATED>, - /// but false if the queue's is <ACTIVATED> or <PULSED>. - virtual int deactivated (void); - - // = Not currently implemented... - int peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *timeout = 0); - ACE_Notification_Strategy *notification_strategy (void); - void notification_strategy (ACE_Notification_Strategy *s); - - // = Notification hook. - - /// Dump the state of an object. - virtual void dump (void) const; - - /// Get the handle to the underlying completion port. - virtual ACE_HANDLE completion_port (void); - - /// Declare the dynamic allocation hooks. - ACE_ALLOC_HOOK_DECLARE; - -private: - - // Disallow copying and assignment. - ACE_Message_Queue_NT (const ACE_Message_Queue_NT &); - void operator= (const ACE_Message_Queue_NT &); - -private: - // = Internal states. - - /// Maximum threads that can be released (and run) concurrently. - DWORD max_cthrs_; - - /// Current number of threads waiting to dequeue messages. - DWORD cur_thrs_; - - /// Current number of bytes in queue. - size_t cur_bytes_; - - /// Current length of messages in queue. - size_t cur_length_; - - /// Current number of messages in the queue. - size_t cur_count_; - - /** - * Synchronizer. This should really be an ACE_Recursive_Thread_Mutex - * but since this class is only supported on NT, it's okay to use - * ACE_Thread_Mutex here. - */ - ACE_SYNCH_MUTEX lock_; - - /// Underlying NT IoCompletionPort. - ACE_HANDLE completion_port_; - -}; -#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ - -ACE_END_VERSIONED_NAMESPACE_DECL - #if defined (__ACE_INLINE__) #include "ace/Message_Queue.inl" #endif /* __ACE_INLINE__ */ diff --git a/ACE/ace/Message_Queue.inl b/ACE/ace/Message_Queue.inl index 671ecc22040..aaadea9d3e6 100644 --- a/ACE/ace/Message_Queue.inl +++ b/ACE/ace/Message_Queue.inl @@ -2,10 +2,6 @@ // // $Id$ -#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) -# include "ace/Guard_T.h" -#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ - ACE_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE @@ -13,124 +9,4 @@ ACE_Message_Queue_Base::ACE_Message_Queue_Base (void) { } -#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) - -ACE_INLINE int -ACE_Message_Queue_NT::enqueue_tail (ACE_Message_Block *new_item, - ACE_Time_Value *timeout) -{ - ACE_TRACE ("ACE_Message_Queue_NT::enqueue_tail"); - return this->enqueue (new_item, timeout); -} - -ACE_INLINE int -ACE_Message_Queue_NT::dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *timeout) -{ - ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head"); - return this->dequeue (first_item, timeout); -} - -ACE_INLINE bool -ACE_Message_Queue_NT::is_full (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::is_full"); - return false; // Always not full. -} - -ACE_INLINE bool -ACE_Message_Queue_NT::is_empty (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::is_empty"); - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, false); - - return this->cur_bytes_ > 0 || this->cur_count_ > 0 ? false : true; -} - -ACE_INLINE size_t -ACE_Message_Queue_NT::message_bytes (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::message_bytes"); - // Accessing to size_t must be atomic. - return this->cur_bytes_; -} - -ACE_INLINE size_t -ACE_Message_Queue_NT::message_length (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::message_length"); - // Accessing to size_t must be atomic. - return this->cur_length_; -} - -ACE_INLINE size_t -ACE_Message_Queue_NT::message_count (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::message_count"); - // Accessing to size_t must be atomic. - return this->cur_count_; -} - -ACE_INLINE void -ACE_Message_Queue_NT::message_bytes (size_t new_value) -{ - ACE_TRACE ("ACE_Message_Queue_NT::message_bytes"); - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); - - this->cur_bytes_ = new_value; -} - -ACE_INLINE void -ACE_Message_Queue_NT::message_length (size_t new_value) -{ - ACE_TRACE ("ACE_Message_Queue_NT::message_length"); - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); - - this->cur_length_ = new_value; -} - -ACE_INLINE DWORD -ACE_Message_Queue_NT::max_threads (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::max_threads"); - return this->max_cthrs_; -} - -ACE_INLINE int -ACE_Message_Queue_NT::deactivated (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::deactivated"); - // Accessing to int must be atomic. - return this->state_ == ACE_Message_Queue_Base::DEACTIVATED; -} - -ACE_INLINE ACE_HANDLE -ACE_Message_Queue_NT::completion_port (void) -{ - ACE_TRACE ("ACE_Message_Queue_NT::completion_port"); - return this->completion_port_; -} - -ACE_INLINE int -ACE_Message_Queue_NT::peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *timeout) -{ - ACE_UNUSED_ARG(first_item); - ACE_UNUSED_ARG(timeout); - ACE_NOTSUP_RETURN (-1); -} - -ACE_INLINE ACE_Notification_Strategy * -ACE_Message_Queue_NT::notification_strategy (void) -{ - ACE_NOTSUP_RETURN (0); -} - -ACE_INLINE void -ACE_Message_Queue_NT::notification_strategy (ACE_Notification_Strategy *) -{ -} - -#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ - ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/ace/Message_Queue_NT.cpp b/ACE/ace/Message_Queue_NT.cpp new file mode 100644 index 00000000000..3c4fbe99c64 --- /dev/null +++ b/ACE/ace/Message_Queue_NT.cpp @@ -0,0 +1,237 @@ +// $Id$ + +#include "ace/Message_Queue_NT.h" +#include "ace/Log_Msg.h" + +#if !defined (__ACE_INLINE__) +#include "ace/Message_Queue_NT.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID (ace, + Message_Queue_NT, + "$Id$") + + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) + +ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads) + : max_cthrs_ (max_threads), + cur_thrs_ (0), + cur_bytes_ (0), + cur_length_ (0), + cur_count_ (0), + completion_port_ (ACE_INVALID_HANDLE) +{ + ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT"); + this->open (max_threads); +} + +int +ACE_Message_Queue_NT::open (DWORD max_threads) +{ + ACE_TRACE ("ACE_Message_Queue_NT::open"); + this->max_cthrs_ = max_threads; + this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE, + 0, + ACE_Message_Queue_Base::ACTIVATED, + max_threads); + return (this->completion_port_ == 0 ? -1 : 0); +} + +int +ACE_Message_Queue_NT::close (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::close"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + this->deactivate (); + return (::CloseHandle (this->completion_port_) ? 0 : -1 ); +} + +ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT"); + this->close (); +} + +int +ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item, + ACE_Time_Value *) +{ + ACE_TRACE ("ACE_Message_Queue_NT::enqueue"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED) + { + size_t const msize = new_item->total_size (); + size_t const mlength = new_item->total_length (); + // Note - we send ACTIVATED in the 3rd arg to tell the completion + // routine it's _NOT_ being woken up because of deactivate(). + ULONG_PTR state_to_post; + state_to_post = ACE_Message_Queue_Base::ACTIVATED; + if (::PostQueuedCompletionStatus (this->completion_port_, + static_cast<DWORD> (msize), + state_to_post, + reinterpret_cast<LPOVERLAPPED> (new_item))) + { + // Update the states once I succeed. + this->cur_bytes_ += msize; + this->cur_length_ += mlength; + return ACE_Utils::truncate_cast<int> (++this->cur_count_); + } + } + else + errno = ESHUTDOWN; + + // Fail to enqueue the message. + return -1; +} + +int +ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head"); + + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + + // Make sure the MQ is not deactivated before proceeding. + if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED) + { + errno = ESHUTDOWN; // Operation on deactivated MQ not allowed. + return -1; + } + else + ++this->cur_thrs_; // Increase the waiting thread count. + } + + ULONG_PTR queue_state; + DWORD msize; + // Get a message from the completion port. + int retv = ::GetQueuedCompletionStatus (this->completion_port_, + &msize, + &queue_state, + reinterpret_cast<LPOVERLAPPED *> (&first_item), + (timeout == 0 ? INFINITE : timeout->msec ())); + { + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + --this->cur_thrs_; // Decrease waiting thread count. + if (retv) + { + if (queue_state == ACE_Message_Queue_Base::ACTIVATED) + { // Really get a valid MB from the queue. + --this->cur_count_; + this->cur_bytes_ -= msize; + this->cur_length_ -= first_item->total_length (); + return ACE_Utils::truncate_cast<int> (this->cur_count_); + } + else // Woken up by deactivate () or pulse (). + errno = ESHUTDOWN; + } + } + return -1; +} + +int +ACE_Message_Queue_NT::deactivate (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::deactivate"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + + int const previous_state = this->state_; + if (previous_state != ACE_Message_Queue_Base::DEACTIVATED) + { + this->state_ = ACE_Message_Queue_Base::DEACTIVATED; + + // Get the number of shutdown messages necessary to wake up all + // waiting threads. + DWORD cntr = + this->cur_thrs_ - static_cast<DWORD> (this->cur_count_); + while (cntr-- > 0) + ::PostQueuedCompletionStatus (this->completion_port_, + 0, + this->state_, + 0); + } + return previous_state; +} + +int +ACE_Message_Queue_NT::activate (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::activate"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + int const previous_status = this->state_; + this->state_ = ACE_Message_Queue_Base::ACTIVATED; + return previous_status; +} + +int +ACE_Message_Queue_NT::pulse (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::pulse"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + + int const previous_state = this->state_; + if (previous_state != ACE_Message_Queue_Base::DEACTIVATED) + { + this->state_ = ACE_Message_Queue_Base::PULSED; + + // Get the number of shutdown messages necessary to wake up all + // waiting threads. + + DWORD cntr = + this->cur_thrs_ - static_cast<DWORD> (this->cur_count_); + while (cntr-- > 0) + ::PostQueuedCompletionStatus (this->completion_port_, + 0, + this->state_, + 0); + } + return previous_state; +} + +void +ACE_Message_Queue_NT::dump (void) const +{ +#if defined (ACE_HAS_DUMP) + ACE_TRACE ("ACE_Message_Queue_NT::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + switch (this->state_) + { + case ACE_Message_Queue_Base::ACTIVATED: + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("state = ACTIVATED\n"))); + break; + case ACE_Message_Queue_Base::DEACTIVATED: + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("state = DEACTIVATED\n"))); + break; + case ACE_Message_Queue_Base::PULSED: + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("state = PULSED\n"))); + break; + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("max_cthrs_ = %d\n") + ACE_TEXT ("cur_thrs_ = %d\n") + ACE_TEXT ("cur_bytes = %d\n") + ACE_TEXT ("cur_length = %d\n") + ACE_TEXT ("cur_count = %d\n") + ACE_TEXT ("completion_port_ = %x\n"), + this->max_cthrs_, + this->cur_thrs_, + this->cur_bytes_, + this->cur_length_, + this->cur_count_, + this->completion_port_)); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +#endif /* ACE_HAS_DUMP */ +} + +#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/ace/Message_Queue_NT.h b/ACE/ace/Message_Queue_NT.h new file mode 100644 index 00000000000..40de899f32a --- /dev/null +++ b/ACE/ace/Message_Queue_NT.h @@ -0,0 +1,231 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file Message_Queue_NT.h + * + * $Id$ + * + * @author Douglas C. Schmidt <schmidt@cs.wustl.edu> + */ +//============================================================================= + +#ifndef ACE_MESSAGE_QUEUE_NT_H +#define ACE_MESSAGE_QUEUE_NT_H +#include /**/ "ace/pre.h" + +#include "ace/Message_Queue.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) +# include "ace/Synch_Traits.h" /* Needed in ACE_Message_Queue_NT */ +# include "ace/Thread_Mutex.h" /* Needed in ACE_Message_Queue_NT */ +#endif + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) +/** + * @class ACE_Message_Queue_NT + * + * @brief Message Queue implementation using IO completion port on NT. + * + * Implementation of a strip-downed ACE_Message_Queue using NT's + * IO completion port mechanism. + * @note *Many* ACE_Message_Queue features are not supported with + * this implementation, including: + * * <open> method have different signatures. + * * <dequeue_head> *requires* that the ACE_Message_Block + * pointer argument point to an ACE_Message_Block that was + * allocated by the caller. + * * <peek_dequeue_head>. + * * <ACE_Message_Queue_Iterators>. + * * No flow control. + */ +class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base +{ +public: + // = Initialization and termination methods. + ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); + + /** + * Initialize the Message Queue by creating a new NT I/O completion + * port. The first arguemnt specifies the number of threads + * released by the MQ that are allowed to run concurrently. Return + * 0 when succeeds, -1 otherwise. + */ + virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); + + /// Close down the underlying I/O completion port. You need to + /// re-open the MQ after this function is executed. + virtual int close (void); + + /// Close down the message queue and release all resources. + virtual ~ACE_Message_Queue_NT (void); + + // = Enqueue and dequeue methods. + + /** + * Enqueue an <ACE_Message_Block *> at the end of the queue. + * Returns -1 on failure, else the number of items still on the + * queue. + */ + virtual int enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + virtual int enqueue (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + + /** + * Dequeue and return the <ACE_Message_Block *> at the head of the + * queue. Returns -1 on failure, else the number of items still on + * the queue. + */ + virtual int dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout = 0); + virtual int dequeue (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout = 0); + + // = Check if queue is full/empty. + /** + * Always return false. + */ + + virtual bool is_full (void); + /** + * True if queue is empty, else false. Notice the return value is + * only transient. + */ + virtual bool is_empty (void); + + // = Queue statistic methods (transient.) + /** + * Number of total bytes on the queue, i.e., sum of the message + * block sizes. + */ + virtual size_t message_bytes (void); + + /** + * Number of total length on the queue, i.e., sum of the message + * block lengths. + */ + virtual size_t message_length (void); + + /** + * Number of total messages on the queue. + */ + virtual size_t message_count (void); + + // = Manual changes to these stats (used when queued message blocks + // change size or lengths). + /** + * New value of the number of total bytes on the queue, i.e., sum of + * the message block sizes. + */ + virtual void message_bytes (size_t new_size); + + /** + * New value of the number of total length on the queue, i.e., sum + * of the message block lengths. + */ + virtual void message_length (size_t new_length); + + /// Get the max concurrent thread number. + virtual DWORD max_threads (void); + + // = Activation control methods. + + /** + * Deactivate the queue and wake up all threads waiting on the queue + * so they can continue. No messages are removed from the queue, + * however. Any other operations called until the queue is + * activated again will immediately return -1 with @c errno + * ESHUTDOWN. + * + * @retval The queue's state before this call. + */ + virtual int deactivate (void); + + /** + * Reactivate the queue so that threads can enqueue and dequeue + * messages again. Returns the state of the queue before the call. + */ + virtual int activate (void); + + /** + * Pulse the queue to wake up any waiting threads. Changes the + * queue state to PULSED; future enqueue/dequeue operations proceed + * as in ACTIVATED state. + * + * @retval The queue's state before this call. + */ + virtual int pulse (void); + + /// Returns true if the state of the queue is <DEACTIVATED>, + /// but false if the queue's is <ACTIVATED> or <PULSED>. + virtual int deactivated (void); + + // = Not currently implemented... + int peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout = 0); + ACE_Notification_Strategy *notification_strategy (void); + void notification_strategy (ACE_Notification_Strategy *s); + + // = Notification hook. + + /// Dump the state of an object. + virtual void dump (void) const; + + /// Get the handle to the underlying completion port. + virtual ACE_HANDLE completion_port (void); + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + +private: + + // Disallow copying and assignment. + ACE_Message_Queue_NT (const ACE_Message_Queue_NT &); + void operator= (const ACE_Message_Queue_NT &); + +private: + // = Internal states. + + /// Maximum threads that can be released (and run) concurrently. + DWORD max_cthrs_; + + /// Current number of threads waiting to dequeue messages. + DWORD cur_thrs_; + + /// Current number of bytes in queue. + size_t cur_bytes_; + + /// Current length of messages in queue. + size_t cur_length_; + + /// Current number of messages in the queue. + size_t cur_count_; + + /** + * Synchronizer. This should really be an ACE_Recursive_Thread_Mutex + * but since this class is only supported on NT, it's okay to use + * ACE_Thread_Mutex here. + */ + ACE_SYNCH_MUTEX lock_; + + /// Underlying NT IoCompletionPort. + ACE_HANDLE completion_port_; + +}; +#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "ace/Message_Queue.inl" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif /* ACE_MESSAGE_QUEUE_NT_H */ diff --git a/ACE/ace/Message_Queue_NT.inl b/ACE/ace/Message_Queue_NT.inl new file mode 100644 index 00000000000..6db82111d5b --- /dev/null +++ b/ACE/ace/Message_Queue_NT.inl @@ -0,0 +1,131 @@ +// -*- C++ -*- +// +// $Id$ + +#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) +# include "ace/Guard_T.h" +#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +#if defined (ACE_HAS_WIN32_OVERLAPPED_IO) + +ACE_INLINE int +ACE_Message_Queue_NT::enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_NT::enqueue_tail"); + return this->enqueue (new_item, timeout); +} + +ACE_INLINE int +ACE_Message_Queue_NT::dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head"); + return this->dequeue (first_item, timeout); +} + +ACE_INLINE bool +ACE_Message_Queue_NT::is_full (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::is_full"); + return false; // Always not full. +} + +ACE_INLINE bool +ACE_Message_Queue_NT::is_empty (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::is_empty"); + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, false); + + return this->cur_bytes_ > 0 || this->cur_count_ > 0 ? false : true; +} + +ACE_INLINE size_t +ACE_Message_Queue_NT::message_bytes (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::message_bytes"); + // Accessing to size_t must be atomic. + return this->cur_bytes_; +} + +ACE_INLINE size_t +ACE_Message_Queue_NT::message_length (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::message_length"); + // Accessing to size_t must be atomic. + return this->cur_length_; +} + +ACE_INLINE size_t +ACE_Message_Queue_NT::message_count (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::message_count"); + // Accessing to size_t must be atomic. + return this->cur_count_; +} + +ACE_INLINE void +ACE_Message_Queue_NT::message_bytes (size_t new_value) +{ + ACE_TRACE ("ACE_Message_Queue_NT::message_bytes"); + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + this->cur_bytes_ = new_value; +} + +ACE_INLINE void +ACE_Message_Queue_NT::message_length (size_t new_value) +{ + ACE_TRACE ("ACE_Message_Queue_NT::message_length"); + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + this->cur_length_ = new_value; +} + +ACE_INLINE DWORD +ACE_Message_Queue_NT::max_threads (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::max_threads"); + return this->max_cthrs_; +} + +ACE_INLINE int +ACE_Message_Queue_NT::deactivated (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::deactivated"); + // Accessing to int must be atomic. + return this->state_ == ACE_Message_Queue_Base::DEACTIVATED; +} + +ACE_INLINE ACE_HANDLE +ACE_Message_Queue_NT::completion_port (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::completion_port"); + return this->completion_port_; +} + +ACE_INLINE int +ACE_Message_Queue_NT::peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + ACE_UNUSED_ARG(first_item); + ACE_UNUSED_ARG(timeout); + ACE_NOTSUP_RETURN (-1); +} + +ACE_INLINE ACE_Notification_Strategy * +ACE_Message_Queue_NT::notification_strategy (void) +{ + ACE_NOTSUP_RETURN (0); +} + +ACE_INLINE void +ACE_Message_Queue_NT::notification_strategy (ACE_Notification_Strategy *) +{ +} + +#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */ + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/ace/ace.mpc b/ACE/ace/ace.mpc index a6053729a3d..3a8a1609054 100644 --- a/ACE/ace/ace.mpc +++ b/ACE/ace/ace.mpc @@ -120,6 +120,7 @@ project(ACE) : acedefaults, install, other, codecs, token, svcconf, uuid, fileca MEM_Stream.cpp Message_Block.cpp Message_Queue.cpp + Message_Queue_NT.cpp Message_Queue_Vx.cpp Method_Request.cpp MMAP_Memory_Pool.cpp |