From faa3a50eb38ab1271067493effed24970671c3cd Mon Sep 17 00:00:00 2001 From: nanbor Date: Fri, 11 Dec 1998 08:12:12 +0000 Subject: ACE_Message_Queue_NT changes. --- ace/Message_Queue.cpp | 169 +++++++++++++++++++++++++++++++++++++++ ace/Message_Queue.h | 205 ++++++++++++++++++++++++++++++++++++++++++++++++ ace/Message_Queue.i | 67 ++++++++++++++++ ace/Message_Queue_T.cpp | 15 ++++ ace/Message_Queue_T.h | 16 ++++ ace/Message_Queue_T.i | 10 ++- 6 files changed, 480 insertions(+), 2 deletions(-) (limited to 'ace') diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp index 51156850dd0..445028cfd10 100644 --- a/ace/Message_Queue.cpp +++ b/ace/Message_Queue.cpp @@ -262,4 +262,173 @@ ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&, #endif /* VXWORKS */ +#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) + +ACE_Message_Queue_NT::ACE_Message_Queue_NT (size_t max_threads) + : max_cthrs_ (max_threads), + cur_thrs_ (0), + cur_bytes_ (0), + cur_count_ (0), + deactivated_ (0), + completion_port_ (ACE_INVALID_HANDLE) +{ + ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT"); + this->open (max_threads); +} + +int +ACE_Message_Queue_NT::open (size_t max_threads) +{ + ACE_TRACE ("ACE_Message_Queue_NT::open"); + this->max_cthrs_ = max_threads; + this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE, + NULL, + ACE_Message_Queue_Base::WAS_ACTIVE, + max_threads); + return (this->completion_port_ == NULL ? -1 : 0); +} + +int +ACE_Message_Queue_NT::close (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::close"); + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); + this->deactivate (); + return (::CloseHandle (this->completion_port_) ? 0 : -1 ); +} + +ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT"); + this->close (); +} + +int +ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item, + ACE_Time_Value *) +{ + ACE_TRACE ("ACE_Message_Queue_NT::enqueue"); + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); + if (!this->deactivated_) + { + size_t msize = new_item->size (); + if (::PostQueuedCompletionStatus (this->completion_port_, + msize, + this->deactivated_, + ACE_reinterpret_cast (LPOVERLAPPED, new_item))) + { + // Update the states once I succeed. + this->cur_bytes_ += msize; + return ++this->cur_count_; + } + } + else + errno = ESHUTDOWN; + + // Fail to enqueue the message. + return -1; +} + +int +ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head"); + + { + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); + if (this->deactivated_) // Make sure the MQ is not deactivated before + { // I proceed. + errno = ESHUTDOWN; // Operation on deactivated MQ not allowed. + return -1; + } + else + ++this->cur_thrs_; // Increase the waiting thread count. + } + + DWORD shutdown; + DWORD msize; + // Get a message from the completion port. + int retv = ::GetQueuedCompletionStatus (this->completion_port_, + &msize, + &shutdown, + ACE_reinterpret_cast (LPOVERLAPPED *, &first_item), + (timeout == 0 ? INFINITE : timeout->msec ())); + { + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); + --this->cur_thrs_; // Decrease waiting thread count. + if (retv) + { + if (!shutdown) + { // Really get a valid MB from the queue. + --this->cur_count_; + this->cur_bytes_ -= msize; + return this->cur_count_; + } + else // I am woken up by deactivate (). + errno = ESHUTDOWN; + } + } + return -1; +} + +int +ACE_Message_Queue_NT::deactivate (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::deactivate"); + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); + + if (this->deactivated_) // Check if I have been deactivated already. + return ACE_Message_Queue_Base::WAS_INACTIVE; + + this->deactivated_ = 1; + + // Get the number of shutdown messages necessary to wake up + // all waiting threads. + + for (size_t cntr = this->cur_thrs_ - this->cur_count_; + cntr > 0; cntr++) + ::PostQueuedCompletionStatus (this->completion_port_, + 0, + this->deactivated_, + NULL); + return ACE_Message_Queue_Base::WAS_ACTIVE; +} + +int +ACE_Message_Queue_NT::activate (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::activate"); + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); + if (!this->deactivated_) + return ACE_Message_Queue_Base::WAS_ACTIVE; + + this->deactivated_ = 0; + return ACE_Message_Queue_Base::WAS_INACTIVE; +} + +void +ACE_Message_Queue_NT::dump (void) const +{ + ACE_TRACE ("ACE_Message_Queue_NT::dump"); + + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("deactivated = %d\n") + ASYS_TEXT ("max_cthrs_ = %d\n") + ASYS_TEXT ("cur_thrs_ = %d\n") + ASYS_TEXT ("cur_bytes = %d\n") + ASYS_TEXT ("cur_count = %d\n") + ASYS_TEXT ("completion_port_ = %x\n"), + this->deactivated_, + this->max_cthrs_, + this->cur_thrs_, + this->cur_bytes_, + this->cur_count_, + this->completion_port_)); + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + +#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ + #endif /* ACE_MESSAGE_QUEUE_C */ diff --git a/ace/Message_Queue.h b/ace/Message_Queue.h index e9766f1ac95..76f7b05467c 100644 --- a/ace/Message_Queue.h +++ b/ace/Message_Queue.h @@ -52,6 +52,78 @@ public: WAS_INACTIVE = 2 // Message queue was inactive before activate() or deactivate(). }; + + ACE_Message_Queue_Base (void); + + virtual int close (void) = 0; + // Close down the message queue and release all resources. + + virtual ~ACE_Message_Queue_Base (void) = 0; + // Close down the message queue and release all resources. + + // = Enqueue and dequeue methods. + + // For the following enqueue and dequeue methods if == 0, + // the caller will block until action is possible, else will wait + // until the absolute time specified in * elapses). These + // calls will return, however, when queue is closed, deactivated, + // when a signal occurs, or if the time specified in timeout + // elapses, (in which case errno = EWOULDBLOCK). + + virtual int enqueue (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0) = 0; + // Enqueue a into the tail of the queue. + // Return -1 on failure, number of items in queue otherwise. + + virtual int dequeue (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout = 0) = 0; + // Dequeue and return the at the head of the + // queue. Returns -1 on failure, else the number of items still on + // the queue. + + // = Check if queue is full/empty. + virtual int is_full (void) = 0; + // True if queue is full, else false. + virtual int is_empty (void) = 0; + // True if queue is empty, else false. + + // = Queue statistic methods. + virtual size_t message_bytes (void) = 0; + // Number of total bytes on the queue. + virtual size_t message_count (void) = 0; + // Number of total messages on the queue. + + // = Activation control methods. + + virtual int deactivate (void) = 0; + // Deactivate the queue and wakeup all threads waiting on the queue + // so they can continue. No messages are removed from the queue, + // however. Any other operations called until the queue is + // activated again will immediately return -1 with == + // ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the + // call and WAS_ACTIVE if queue was active before the call. + + virtual int activate (void) = 0; + // Reactivate the queue so that threads can enqueue and dequeue + // messages again. Returns WAS_INACTIVE if queue was inactive + // before the call and WAS_ACTIVE if queue was active before the + // call. + + virtual int deactivated (void) = 0; + // Returns true if is enabled. + + // = Notification hook. + + virtual void dump (void) const = 0; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + // = Disallow these operations. + ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_Base &)) + ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_Base (const ACE_Message_Queue_Base &)) }; // Include the templates here. @@ -198,6 +270,139 @@ private: }; #endif /* VXWORKS */ +#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) +class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base +{ + // = TITLE + // Message Queue implementation using IO completion port on NT. + // + // = DESCRIPTION + // Implementation of a strip-downed ACE_Message_Queue using NT's + // IO completion port mechanism. + // + // NOTE: *Many* ACE_Message_Queue features are not supported with + // this implementation, including: + // * open method have different signatures. + // * dequeue_head () *requires* that the ACE_Message_Block + // pointer argument point to an ACE_Message_Block that was + // allocated by the caller. + // * peek_dequeue_head (). + // * ACE_Message_Queue_Iterators. + // * No flow control. + // * Message_Block chains. The continuation field of ACE_Message_Block + // * is ignored; only the first block of a fragment chain is + // * recognized. +public: + // = Initialization and termination methods. + ACE_Message_Queue_NT (size_t max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); + + virtual int open (size_t max_threads = ACE_Message_Queue_Base::DEFAULT_HWM); + // Initialize the Message Queue by creating a new NT I/O completion + // port. The first arguemnt specifies the number of threads + // released by the MQ that are allowed to run concurrently. Return + // 0 when succeeds, -1 otherwise. + + virtual int close (void); + // Close down the underlying I/O completion port. You need to + // re-open the MQ after this function is executed. + + virtual ~ACE_Message_Queue_NT (void); + // Close down the message queue and release all resources. + + // = Enqueue and dequeue methods. + + virtual int enqueue (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // Enqueue an at the end of the queue. + // Returns -1 on failure, else the number of items still on the + // queue. + + virtual int dequeue (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout = 0); + // Dequeue and return the at the head of the + // queue. Returns -1 on failure, else the number of items still on + // the queue. + + // = Check if queue is full/empty. + virtual int is_full (void); + // Always return false. + virtual int is_empty (void); + // True if queue is empty, else false. Notice the return value is + // only transient. + + // = Queue statistic methods (transient.) + virtual size_t message_bytes (void); + // Number of total bytes on the queue. + virtual size_t message_count (void); + // Number of total messages on the queue. + + virtual size_t max_threads (void); + // Get the max concurrent thread number. + + // = Activation control methods. + + virtual int deactivate (void); + // Deactivate the queue and wakeup all threads waiting on the queue + // so they can continue. Messages already in the queue get removed. + // If there are more messages in the queue than there are threads + // waiting on the queue, the left over messages will not be removed. + // Any other enqueue/dequeue operations called until the queue is + // activated again will immediately return -1 with == + // ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the + // call and WAS_ACTIVE if queue was active before the call. + + virtual int activate (void); + // Reactivate the queue so that threads can enqueue and dequeue + // messages again. Returns WAS_INACTIVE if queue was inactive + // before the call and WAS_ACTIVE if queue was active before the + // call. + + virtual int deactivated (void); + // Returns true if is enabled. + + // = Notification hook. + + virtual void dump (void) const; + // Dump the state of an object. + + virtual ACE_HANDLE completion_port (void); + // Get the handle to the underlying completion port. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + // = Internal states. + + size_t max_cthrs_; + // Maximum threads that can be released (and run) concurrently. + + size_t cur_thrs_; + // Current number of threads waiting to dequeue messages. + + size_t cur_bytes_; + // Current number of bytes in queue. + + size_t cur_count_; + // Current number of messages in the queue. + + ACE_Thread_Mutex lock_; + // Synchronizer. This should really be an ACE_Recursive_Thread_Mutex + // but since this class is only supported on NT, it's okay to use + // ACE_Thread_Mutex here. + + int deactivated_; + // Indicates that the queue is inactive. + + ACE_HANDLE completion_port_; + // Underlying NT IoCompletionPort. + + // = Disallow these operations. + ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_NT &)) + ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_NT (const ACE_Message_Queue_NT &)) +}; +#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ + // This must go here to avoid problems with circular includes. #include "ace/Strategies.h" diff --git a/ace/Message_Queue.i b/ace/Message_Queue.i index 90ef02953b9..e1f953b0f7a 100644 --- a/ace/Message_Queue.i +++ b/ace/Message_Queue.i @@ -1,6 +1,16 @@ /* -*- C++ -*- */ // $Id$ +ACE_INLINE +ACE_Message_Queue_Base::ACE_Message_Queue_Base (void) +{ +} + +ACE_INLINE +ACE_Message_Queue_Base::~ACE_Message_Queue_Base (void) +{ +} + #if defined (VXWORKS) // Specialization to use native VxWorks Message Queues. @@ -81,3 +91,60 @@ ACE_Message_Queue_Vx::message_count (void) } #endif /* VXWORKS */ + +#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) +ACE_INLINE int +ACE_Message_Queue_NT::is_full (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::is_full"); + return 0; // Always not full. +} + +ACE_INLINE int +ACE_Message_Queue_NT::is_empty (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::is_empty"); + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0); + + return (this->cur_bytes_ > 0 && this->cur_count_ > 0 ? 1 : 0); +} + +ACE_INLINE size_t +ACE_Message_Queue_NT::message_bytes (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::message_bytes"); + // Accessing to size_t must be atomic. + return this->cur_bytes_; +} + +ACE_INLINE size_t +ACE_Message_Queue_NT::message_count (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::message_count"); + // Accessing to size_t must be atomic. + return this->cur_count_; +} + +ACE_INLINE size_t +ACE_Message_Queue_NT::max_threads (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::max_threads"); + return this->max_cthrs_; +} + +ACE_INLINE int +ACE_Message_Queue_NT::deactivated (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::ceactivated"); + // Accessing to int must be atomic. + return this->deactivated_; +} + +ACE_INLINE ACE_HANDLE +ACE_Message_Queue_NT::completion_port (void) +{ + ACE_TRACE ("ACE_Message_Queue_NT::completion_port"); + return this->completion_port_; +} + +#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp index ff1df7d53d5..654353c7025 100644 --- a/ace/Message_Queue_T.cpp +++ b/ace/Message_Queue_T.cpp @@ -1705,6 +1705,21 @@ ACE_Message_Queue_Factory::create_Vx_message_queue (size_t max_me } // factory method for a wrapped VxWorks message queue +#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) + +template +ACE_Message_Queue_NT * +ACE_Message_Queue_Factory::create_NT_message_queue (size_t max_threads) +{ + ACE_Message_Queue_NT *tmp; + + ACE_NEW_RETURN (tmp, + ACE_Message_Queue_NT (max_threads); + 0); + return tmp; +} + +#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ #endif /* defined (VXWORKS) */ diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h index e5628daffce..d13250fa005 100644 --- a/ace/Message_Queue_T.h +++ b/ace/Message_Queue_T.h @@ -27,6 +27,10 @@ class ACE_Message_Queue_Vx; #endif /* defined (VXWORKS) */ +#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) +class ACE_Message_Queue_NT; +#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ + template class ACE_Message_Queue : public ACE_Message_Queue_Base { @@ -132,6 +136,10 @@ public: // Returns -1 on failure, else the number of items still on the // queue. + virtual int dequeue (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout = 0); + // Same as the following. + virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0); // Dequeue and return the at the head of the @@ -626,6 +634,14 @@ public: // factory method for a wrapped VxWorks message queue #endif /* defined (VXWORKS) */ + +#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0) + + static ACE_Message_Queue_NT * + create_NT_message_queue (size_t max_threads); + // factory method for a NT message queue. + +#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */ }; #if defined (__ACE_INLINE__) diff --git a/ace/Message_Queue_T.i b/ace/Message_Queue_T.i index 1391f82a99d..b5b40a48d07 100644 --- a/ace/Message_Queue_T.i +++ b/ace/Message_Queue_T.i @@ -1,6 +1,14 @@ /* -*- C++ -*- */ // $Id$ +template ACE_INLINE int +ACE_Message_Queue::dequeue (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout) +{ + ACE_TRACE ("ACE_Message_Queue::notification_strategy"); + return this->dequeue (first_item, timeout); +} + template ACE_INLINE ACE_Notification_Strategy * ACE_Message_Queue::notification_strategy (void) { @@ -142,5 +150,3 @@ ACE_Message_Queue::deactivated (void) } ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator) - - -- cgit v1.2.1