summaryrefslogtreecommitdiff
path: root/ACE/ace
diff options
context:
space:
mode:
authorJohnny Willemsen <jwillemsen@remedy.nl>2008-02-22 14:21:32 +0000
committerJohnny Willemsen <jwillemsen@remedy.nl>2008-02-22 14:21:32 +0000
commita3e24ec87b85631c0373f1cade818a45f786b59e (patch)
tree172f37f98309b7a7da016e7fd838edbc2649db2b /ACE/ace
parent0a75ac2fe413d75456bd1c03ecca5295d278bb9c (diff)
downloadATCD-a3e24ec87b85631c0373f1cade818a45f786b59e.tar.gz
Diffstat (limited to 'ACE/ace')
-rw-r--r--ACE/ace/Makefile.am1
-rw-r--r--ACE/ace/Message_Queue.cpp222
-rw-r--r--ACE/ace/Message_Queue.h198
-rw-r--r--ACE/ace/Message_Queue.inl124
-rw-r--r--ACE/ace/Message_Queue_NT.cpp237
-rw-r--r--ACE/ace/Message_Queue_NT.h231
-rw-r--r--ACE/ace/Message_Queue_NT.inl131
-rw-r--r--ACE/ace/ace.mpc1
8 files changed, 601 insertions, 544 deletions
diff --git a/ACE/ace/Makefile.am b/ACE/ace/Makefile.am
index 0f7248b7199..d5bc8bf0a99 100644
--- a/ACE/ace/Makefile.am
+++ b/ACE/ace/Makefile.am
@@ -157,6 +157,7 @@ libACE_la_SOURCES = \
Mem_Map.cpp \
Message_Block.cpp \
Message_Queue.cpp \
+ Message_Queue_NT.cpp \
Message_Queue_Vx.cpp \
Method_Request.cpp \
Msg_WFMO_Reactor.cpp \
diff --git a/ACE/ace/Message_Queue.cpp b/ACE/ace/Message_Queue.cpp
index 672a1f9876a..0ce105db50b 100644
--- a/ACE/ace/Message_Queue.cpp
+++ b/ACE/ace/Message_Queue.cpp
@@ -7,12 +7,10 @@
#include "ace/Message_Queue.inl"
#endif /* __ACE_INLINE__ */
-
ACE_RCSID (ace,
Message_Queue,
"$Id$")
-
ACE_BEGIN_VERSIONED_NAMESPACE_DECL
ACE_Message_Queue_Base::~ACE_Message_Queue_Base (void)
@@ -27,224 +25,4 @@ ACE_Message_Queue_Base::state (void)
return this->state_;
}
-#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
-
-ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
- : max_cthrs_ (max_threads),
- cur_thrs_ (0),
- cur_bytes_ (0),
- cur_length_ (0),
- cur_count_ (0),
- completion_port_ (ACE_INVALID_HANDLE)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
- this->open (max_threads);
-}
-
-int
-ACE_Message_Queue_NT::open (DWORD max_threads)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::open");
- this->max_cthrs_ = max_threads;
- this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
- 0,
- ACE_Message_Queue_Base::ACTIVATED,
- max_threads);
- return (this->completion_port_ == 0 ? -1 : 0);
-}
-
-int
-ACE_Message_Queue_NT::close (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::close");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
- this->deactivate ();
- return (::CloseHandle (this->completion_port_) ? 0 : -1 );
-}
-
-ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
- this->close ();
-}
-
-int
-ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
- ACE_Time_Value *)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
- if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
- {
- size_t const msize = new_item->total_size ();
- size_t const mlength = new_item->total_length ();
- // Note - we send ACTIVATED in the 3rd arg to tell the completion
- // routine it's _NOT_ being woken up because of deactivate().
- ULONG_PTR state_to_post;
- state_to_post = ACE_Message_Queue_Base::ACTIVATED;
- if (::PostQueuedCompletionStatus (this->completion_port_,
- static_cast<DWORD> (msize),
- state_to_post,
- reinterpret_cast<LPOVERLAPPED> (new_item)))
- {
- // Update the states once I succeed.
- this->cur_bytes_ += msize;
- this->cur_length_ += mlength;
- return ACE_Utils::truncate_cast<int> (++this->cur_count_);
- }
- }
- else
- errno = ESHUTDOWN;
-
- // Fail to enqueue the message.
- return -1;
-}
-
-int
-ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
- ACE_Time_Value *timeout)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
-
- {
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
-
- // Make sure the MQ is not deactivated before proceeding.
- if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
- {
- errno = ESHUTDOWN; // Operation on deactivated MQ not allowed.
- return -1;
- }
- else
- ++this->cur_thrs_; // Increase the waiting thread count.
- }
-
- ULONG_PTR queue_state;
- DWORD msize;
- // Get a message from the completion port.
- int retv = ::GetQueuedCompletionStatus (this->completion_port_,
- &msize,
- &queue_state,
- reinterpret_cast<LPOVERLAPPED *> (&first_item),
- (timeout == 0 ? INFINITE : timeout->msec ()));
- {
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
- --this->cur_thrs_; // Decrease waiting thread count.
- if (retv)
- {
- if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
- { // Really get a valid MB from the queue.
- --this->cur_count_;
- this->cur_bytes_ -= msize;
- this->cur_length_ -= first_item->total_length ();
- return ACE_Utils::truncate_cast<int> (this->cur_count_);
- }
- else // Woken up by deactivate () or pulse ().
- errno = ESHUTDOWN;
- }
- }
- return -1;
-}
-
-int
-ACE_Message_Queue_NT::deactivate (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
-
- int const previous_state = this->state_;
- if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
- {
- this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
-
- // Get the number of shutdown messages necessary to wake up all
- // waiting threads.
- DWORD cntr =
- this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
- while (cntr-- > 0)
- ::PostQueuedCompletionStatus (this->completion_port_,
- 0,
- this->state_,
- 0);
- }
- return previous_state;
-}
-
-int
-ACE_Message_Queue_NT::activate (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::activate");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
- int const previous_status = this->state_;
- this->state_ = ACE_Message_Queue_Base::ACTIVATED;
- return previous_status;
-}
-
-int
-ACE_Message_Queue_NT::pulse (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::pulse");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
-
- int const previous_state = this->state_;
- if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
- {
- this->state_ = ACE_Message_Queue_Base::PULSED;
-
- // Get the number of shutdown messages necessary to wake up all
- // waiting threads.
-
- DWORD cntr =
- this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
- while (cntr-- > 0)
- ::PostQueuedCompletionStatus (this->completion_port_,
- 0,
- this->state_,
- 0);
- }
- return previous_state;
-}
-
-void
-ACE_Message_Queue_NT::dump (void) const
-{
-#if defined (ACE_HAS_DUMP)
- ACE_TRACE ("ACE_Message_Queue_NT::dump");
-
- ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
- switch (this->state_)
- {
- case ACE_Message_Queue_Base::ACTIVATED:
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("state = ACTIVATED\n")));
- break;
- case ACE_Message_Queue_Base::DEACTIVATED:
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("state = DEACTIVATED\n")));
- break;
- case ACE_Message_Queue_Base::PULSED:
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("state = PULSED\n")));
- break;
- }
-
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("max_cthrs_ = %d\n")
- ACE_TEXT ("cur_thrs_ = %d\n")
- ACE_TEXT ("cur_bytes = %d\n")
- ACE_TEXT ("cur_length = %d\n")
- ACE_TEXT ("cur_count = %d\n")
- ACE_TEXT ("completion_port_ = %x\n"),
- this->max_cthrs_,
- this->cur_thrs_,
- this->cur_bytes_,
- this->cur_length_,
- this->cur_count_,
- this->completion_port_));
- ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
-#endif /* ACE_HAS_DUMP */
-}
-
-#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
-
ACE_END_VERSIONED_NAMESPACE_DECL
diff --git a/ACE/ace/Message_Queue.h b/ACE/ace/Message_Queue.h
index a3e81f02151..b5f847700fd 100644
--- a/ACE/ace/Message_Queue.h
+++ b/ACE/ace/Message_Queue.h
@@ -229,204 +229,6 @@ ACE_END_VERSIONED_NAMESPACE_DECL
// Include the templates here.
#include "ace/Message_Queue_T.h"
-ACE_BEGIN_VERSIONED_NAMESPACE_DECL
-
-#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
-/**
- * @class ACE_Message_Queue_NT
- *
- * @brief Message Queue implementation using IO completion port on NT.
- *
- * Implementation of a strip-downed ACE_Message_Queue using NT's
- * IO completion port mechanism.
- * @note *Many* ACE_Message_Queue features are not supported with
- * this implementation, including:
- * * <open> method have different signatures.
- * * <dequeue_head> *requires* that the ACE_Message_Block
- * pointer argument point to an ACE_Message_Block that was
- * allocated by the caller.
- * * <peek_dequeue_head>.
- * * <ACE_Message_Queue_Iterators>.
- * * No flow control.
- */
-class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base
-{
-public:
- // = Initialization and termination methods.
- ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
-
- /**
- * Initialize the Message Queue by creating a new NT I/O completion
- * port. The first arguemnt specifies the number of threads
- * released by the MQ that are allowed to run concurrently. Return
- * 0 when succeeds, -1 otherwise.
- */
- virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
-
- /// Close down the underlying I/O completion port. You need to
- /// re-open the MQ after this function is executed.
- virtual int close (void);
-
- /// Close down the message queue and release all resources.
- virtual ~ACE_Message_Queue_NT (void);
-
- // = Enqueue and dequeue methods.
-
- /**
- * Enqueue an <ACE_Message_Block *> at the end of the queue.
- * Returns -1 on failure, else the number of items still on the
- * queue.
- */
- virtual int enqueue_tail (ACE_Message_Block *new_item,
- ACE_Time_Value *timeout = 0);
- virtual int enqueue (ACE_Message_Block *new_item,
- ACE_Time_Value *timeout = 0);
-
- /**
- * Dequeue and return the <ACE_Message_Block *> at the head of the
- * queue. Returns -1 on failure, else the number of items still on
- * the queue.
- */
- virtual int dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *timeout = 0);
- virtual int dequeue (ACE_Message_Block *&first_item,
- ACE_Time_Value *timeout = 0);
-
- // = Check if queue is full/empty.
- /**
- * Always return false.
- */
-
- virtual bool is_full (void);
- /**
- * True if queue is empty, else false. Notice the return value is
- * only transient.
- */
- virtual bool is_empty (void);
-
- // = Queue statistic methods (transient.)
- /**
- * Number of total bytes on the queue, i.e., sum of the message
- * block sizes.
- */
- virtual size_t message_bytes (void);
-
- /**
- * Number of total length on the queue, i.e., sum of the message
- * block lengths.
- */
- virtual size_t message_length (void);
-
- /**
- * Number of total messages on the queue.
- */
- virtual size_t message_count (void);
-
- // = Manual changes to these stats (used when queued message blocks
- // change size or lengths).
- /**
- * New value of the number of total bytes on the queue, i.e., sum of
- * the message block sizes.
- */
- virtual void message_bytes (size_t new_size);
-
- /**
- * New value of the number of total length on the queue, i.e., sum
- * of the message block lengths.
- */
- virtual void message_length (size_t new_length);
-
- /// Get the max concurrent thread number.
- virtual DWORD max_threads (void);
-
- // = Activation control methods.
-
- /**
- * Deactivate the queue and wake up all threads waiting on the queue
- * so they can continue. No messages are removed from the queue,
- * however. Any other operations called until the queue is
- * activated again will immediately return -1 with @c errno
- * ESHUTDOWN.
- *
- * @retval The queue's state before this call.
- */
- virtual int deactivate (void);
-
- /**
- * Reactivate the queue so that threads can enqueue and dequeue
- * messages again. Returns the state of the queue before the call.
- */
- virtual int activate (void);
-
- /**
- * Pulse the queue to wake up any waiting threads. Changes the
- * queue state to PULSED; future enqueue/dequeue operations proceed
- * as in ACTIVATED state.
- *
- * @retval The queue's state before this call.
- */
- virtual int pulse (void);
-
- /// Returns true if the state of the queue is <DEACTIVATED>,
- /// but false if the queue's is <ACTIVATED> or <PULSED>.
- virtual int deactivated (void);
-
- // = Not currently implemented...
- int peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *timeout = 0);
- ACE_Notification_Strategy *notification_strategy (void);
- void notification_strategy (ACE_Notification_Strategy *s);
-
- // = Notification hook.
-
- /// Dump the state of an object.
- virtual void dump (void) const;
-
- /// Get the handle to the underlying completion port.
- virtual ACE_HANDLE completion_port (void);
-
- /// Declare the dynamic allocation hooks.
- ACE_ALLOC_HOOK_DECLARE;
-
-private:
-
- // Disallow copying and assignment.
- ACE_Message_Queue_NT (const ACE_Message_Queue_NT &);
- void operator= (const ACE_Message_Queue_NT &);
-
-private:
- // = Internal states.
-
- /// Maximum threads that can be released (and run) concurrently.
- DWORD max_cthrs_;
-
- /// Current number of threads waiting to dequeue messages.
- DWORD cur_thrs_;
-
- /// Current number of bytes in queue.
- size_t cur_bytes_;
-
- /// Current length of messages in queue.
- size_t cur_length_;
-
- /// Current number of messages in the queue.
- size_t cur_count_;
-
- /**
- * Synchronizer. This should really be an ACE_Recursive_Thread_Mutex
- * but since this class is only supported on NT, it's okay to use
- * ACE_Thread_Mutex here.
- */
- ACE_SYNCH_MUTEX lock_;
-
- /// Underlying NT IoCompletionPort.
- ACE_HANDLE completion_port_;
-
-};
-#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
-
-ACE_END_VERSIONED_NAMESPACE_DECL
-
#if defined (__ACE_INLINE__)
#include "ace/Message_Queue.inl"
#endif /* __ACE_INLINE__ */
diff --git a/ACE/ace/Message_Queue.inl b/ACE/ace/Message_Queue.inl
index 671ecc22040..aaadea9d3e6 100644
--- a/ACE/ace/Message_Queue.inl
+++ b/ACE/ace/Message_Queue.inl
@@ -2,10 +2,6 @@
//
// $Id$
-#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
-# include "ace/Guard_T.h"
-#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
-
ACE_BEGIN_VERSIONED_NAMESPACE_DECL
ACE_INLINE
@@ -13,124 +9,4 @@ ACE_Message_Queue_Base::ACE_Message_Queue_Base (void)
{
}
-#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
-
-ACE_INLINE int
-ACE_Message_Queue_NT::enqueue_tail (ACE_Message_Block *new_item,
- ACE_Time_Value *timeout)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::enqueue_tail");
- return this->enqueue (new_item, timeout);
-}
-
-ACE_INLINE int
-ACE_Message_Queue_NT::dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *timeout)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
- return this->dequeue (first_item, timeout);
-}
-
-ACE_INLINE bool
-ACE_Message_Queue_NT::is_full (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::is_full");
- return false; // Always not full.
-}
-
-ACE_INLINE bool
-ACE_Message_Queue_NT::is_empty (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::is_empty");
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, false);
-
- return this->cur_bytes_ > 0 || this->cur_count_ > 0 ? false : true;
-}
-
-ACE_INLINE size_t
-ACE_Message_Queue_NT::message_bytes (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::message_bytes");
- // Accessing to size_t must be atomic.
- return this->cur_bytes_;
-}
-
-ACE_INLINE size_t
-ACE_Message_Queue_NT::message_length (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::message_length");
- // Accessing to size_t must be atomic.
- return this->cur_length_;
-}
-
-ACE_INLINE size_t
-ACE_Message_Queue_NT::message_count (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::message_count");
- // Accessing to size_t must be atomic.
- return this->cur_count_;
-}
-
-ACE_INLINE void
-ACE_Message_Queue_NT::message_bytes (size_t new_value)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::message_bytes");
- ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
-
- this->cur_bytes_ = new_value;
-}
-
-ACE_INLINE void
-ACE_Message_Queue_NT::message_length (size_t new_value)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::message_length");
- ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
-
- this->cur_length_ = new_value;
-}
-
-ACE_INLINE DWORD
-ACE_Message_Queue_NT::max_threads (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::max_threads");
- return this->max_cthrs_;
-}
-
-ACE_INLINE int
-ACE_Message_Queue_NT::deactivated (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::deactivated");
- // Accessing to int must be atomic.
- return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
-}
-
-ACE_INLINE ACE_HANDLE
-ACE_Message_Queue_NT::completion_port (void)
-{
- ACE_TRACE ("ACE_Message_Queue_NT::completion_port");
- return this->completion_port_;
-}
-
-ACE_INLINE int
-ACE_Message_Queue_NT::peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *timeout)
-{
- ACE_UNUSED_ARG(first_item);
- ACE_UNUSED_ARG(timeout);
- ACE_NOTSUP_RETURN (-1);
-}
-
-ACE_INLINE ACE_Notification_Strategy *
-ACE_Message_Queue_NT::notification_strategy (void)
-{
- ACE_NOTSUP_RETURN (0);
-}
-
-ACE_INLINE void
-ACE_Message_Queue_NT::notification_strategy (ACE_Notification_Strategy *)
-{
-}
-
-#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
-
ACE_END_VERSIONED_NAMESPACE_DECL
diff --git a/ACE/ace/Message_Queue_NT.cpp b/ACE/ace/Message_Queue_NT.cpp
new file mode 100644
index 00000000000..3c4fbe99c64
--- /dev/null
+++ b/ACE/ace/Message_Queue_NT.cpp
@@ -0,0 +1,237 @@
+// $Id$
+
+#include "ace/Message_Queue_NT.h"
+#include "ace/Log_Msg.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/Message_Queue_NT.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID (ace,
+ Message_Queue_NT,
+ "$Id$")
+
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+
+ACE_Message_Queue_NT::ACE_Message_Queue_NT (DWORD max_threads)
+ : max_cthrs_ (max_threads),
+ cur_thrs_ (0),
+ cur_bytes_ (0),
+ cur_length_ (0),
+ cur_count_ (0),
+ completion_port_ (ACE_INVALID_HANDLE)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
+ this->open (max_threads);
+}
+
+int
+ACE_Message_Queue_NT::open (DWORD max_threads)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::open");
+ this->max_cthrs_ = max_threads;
+ this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
+ 0,
+ ACE_Message_Queue_Base::ACTIVATED,
+ max_threads);
+ return (this->completion_port_ == 0 ? -1 : 0);
+}
+
+int
+ACE_Message_Queue_NT::close (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::close");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+ this->deactivate ();
+ return (::CloseHandle (this->completion_port_) ? 0 : -1 );
+}
+
+ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
+ this->close ();
+}
+
+int
+ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
+ ACE_Time_Value *)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+ if (this->state_ != ACE_Message_Queue_Base::DEACTIVATED)
+ {
+ size_t const msize = new_item->total_size ();
+ size_t const mlength = new_item->total_length ();
+ // Note - we send ACTIVATED in the 3rd arg to tell the completion
+ // routine it's _NOT_ being woken up because of deactivate().
+ ULONG_PTR state_to_post;
+ state_to_post = ACE_Message_Queue_Base::ACTIVATED;
+ if (::PostQueuedCompletionStatus (this->completion_port_,
+ static_cast<DWORD> (msize),
+ state_to_post,
+ reinterpret_cast<LPOVERLAPPED> (new_item)))
+ {
+ // Update the states once I succeed.
+ this->cur_bytes_ += msize;
+ this->cur_length_ += mlength;
+ return ACE_Utils::truncate_cast<int> (++this->cur_count_);
+ }
+ }
+ else
+ errno = ESHUTDOWN;
+
+ // Fail to enqueue the message.
+ return -1;
+}
+
+int
+ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
+
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ // Make sure the MQ is not deactivated before proceeding.
+ if (this->state_ == ACE_Message_Queue_Base::DEACTIVATED)
+ {
+ errno = ESHUTDOWN; // Operation on deactivated MQ not allowed.
+ return -1;
+ }
+ else
+ ++this->cur_thrs_; // Increase the waiting thread count.
+ }
+
+ ULONG_PTR queue_state;
+ DWORD msize;
+ // Get a message from the completion port.
+ int retv = ::GetQueuedCompletionStatus (this->completion_port_,
+ &msize,
+ &queue_state,
+ reinterpret_cast<LPOVERLAPPED *> (&first_item),
+ (timeout == 0 ? INFINITE : timeout->msec ()));
+ {
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+ --this->cur_thrs_; // Decrease waiting thread count.
+ if (retv)
+ {
+ if (queue_state == ACE_Message_Queue_Base::ACTIVATED)
+ { // Really get a valid MB from the queue.
+ --this->cur_count_;
+ this->cur_bytes_ -= msize;
+ this->cur_length_ -= first_item->total_length ();
+ return ACE_Utils::truncate_cast<int> (this->cur_count_);
+ }
+ else // Woken up by deactivate () or pulse ().
+ errno = ESHUTDOWN;
+ }
+ }
+ return -1;
+}
+
+int
+ACE_Message_Queue_NT::deactivate (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ int const previous_state = this->state_;
+ if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
+ {
+ this->state_ = ACE_Message_Queue_Base::DEACTIVATED;
+
+ // Get the number of shutdown messages necessary to wake up all
+ // waiting threads.
+ DWORD cntr =
+ this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
+ while (cntr-- > 0)
+ ::PostQueuedCompletionStatus (this->completion_port_,
+ 0,
+ this->state_,
+ 0);
+ }
+ return previous_state;
+}
+
+int
+ACE_Message_Queue_NT::activate (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::activate");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+ int const previous_status = this->state_;
+ this->state_ = ACE_Message_Queue_Base::ACTIVATED;
+ return previous_status;
+}
+
+int
+ACE_Message_Queue_NT::pulse (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::pulse");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ int const previous_state = this->state_;
+ if (previous_state != ACE_Message_Queue_Base::DEACTIVATED)
+ {
+ this->state_ = ACE_Message_Queue_Base::PULSED;
+
+ // Get the number of shutdown messages necessary to wake up all
+ // waiting threads.
+
+ DWORD cntr =
+ this->cur_thrs_ - static_cast<DWORD> (this->cur_count_);
+ while (cntr-- > 0)
+ ::PostQueuedCompletionStatus (this->completion_port_,
+ 0,
+ this->state_,
+ 0);
+ }
+ return previous_state;
+}
+
+void
+ACE_Message_Queue_NT::dump (void) const
+{
+#if defined (ACE_HAS_DUMP)
+ ACE_TRACE ("ACE_Message_Queue_NT::dump");
+
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+ switch (this->state_)
+ {
+ case ACE_Message_Queue_Base::ACTIVATED:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("state = ACTIVATED\n")));
+ break;
+ case ACE_Message_Queue_Base::DEACTIVATED:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("state = DEACTIVATED\n")));
+ break;
+ case ACE_Message_Queue_Base::PULSED:
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("state = PULSED\n")));
+ break;
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("max_cthrs_ = %d\n")
+ ACE_TEXT ("cur_thrs_ = %d\n")
+ ACE_TEXT ("cur_bytes = %d\n")
+ ACE_TEXT ("cur_length = %d\n")
+ ACE_TEXT ("cur_count = %d\n")
+ ACE_TEXT ("completion_port_ = %x\n"),
+ this->max_cthrs_,
+ this->cur_thrs_,
+ this->cur_bytes_,
+ this->cur_length_,
+ this->cur_count_,
+ this->completion_port_));
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
+#endif /* ACE_HAS_DUMP */
+}
+
+#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
+
+ACE_END_VERSIONED_NAMESPACE_DECL
diff --git a/ACE/ace/Message_Queue_NT.h b/ACE/ace/Message_Queue_NT.h
new file mode 100644
index 00000000000..40de899f32a
--- /dev/null
+++ b/ACE/ace/Message_Queue_NT.h
@@ -0,0 +1,231 @@
+// -*- C++ -*-
+
+//=============================================================================
+/**
+ * @file Message_Queue_NT.h
+ *
+ * $Id$
+ *
+ * @author Douglas C. Schmidt <schmidt@cs.wustl.edu>
+ */
+//=============================================================================
+
+#ifndef ACE_MESSAGE_QUEUE_NT_H
+#define ACE_MESSAGE_QUEUE_NT_H
+#include /**/ "ace/pre.h"
+
+#include "ace/Message_Queue.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+# include "ace/Synch_Traits.h" /* Needed in ACE_Message_Queue_NT */
+# include "ace/Thread_Mutex.h" /* Needed in ACE_Message_Queue_NT */
+#endif
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+/**
+ * @class ACE_Message_Queue_NT
+ *
+ * @brief Message Queue implementation using IO completion port on NT.
+ *
+ * Implementation of a strip-downed ACE_Message_Queue using NT's
+ * IO completion port mechanism.
+ * @note *Many* ACE_Message_Queue features are not supported with
+ * this implementation, including:
+ * * <open> method have different signatures.
+ * * <dequeue_head> *requires* that the ACE_Message_Block
+ * pointer argument point to an ACE_Message_Block that was
+ * allocated by the caller.
+ * * <peek_dequeue_head>.
+ * * <ACE_Message_Queue_Iterators>.
+ * * No flow control.
+ */
+class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base
+{
+public:
+ // = Initialization and termination methods.
+ ACE_Message_Queue_NT (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
+
+ /**
+ * Initialize the Message Queue by creating a new NT I/O completion
+ * port. The first arguemnt specifies the number of threads
+ * released by the MQ that are allowed to run concurrently. Return
+ * 0 when succeeds, -1 otherwise.
+ */
+ virtual int open (DWORD max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
+
+ /// Close down the underlying I/O completion port. You need to
+ /// re-open the MQ after this function is executed.
+ virtual int close (void);
+
+ /// Close down the message queue and release all resources.
+ virtual ~ACE_Message_Queue_NT (void);
+
+ // = Enqueue and dequeue methods.
+
+ /**
+ * Enqueue an <ACE_Message_Block *> at the end of the queue.
+ * Returns -1 on failure, else the number of items still on the
+ * queue.
+ */
+ virtual int enqueue_tail (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0);
+ virtual int enqueue (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0);
+
+ /**
+ * Dequeue and return the <ACE_Message_Block *> at the head of the
+ * queue. Returns -1 on failure, else the number of items still on
+ * the queue.
+ */
+ virtual int dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout = 0);
+ virtual int dequeue (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout = 0);
+
+ // = Check if queue is full/empty.
+ /**
+ * Always return false.
+ */
+
+ virtual bool is_full (void);
+ /**
+ * True if queue is empty, else false. Notice the return value is
+ * only transient.
+ */
+ virtual bool is_empty (void);
+
+ // = Queue statistic methods (transient.)
+ /**
+ * Number of total bytes on the queue, i.e., sum of the message
+ * block sizes.
+ */
+ virtual size_t message_bytes (void);
+
+ /**
+ * Number of total length on the queue, i.e., sum of the message
+ * block lengths.
+ */
+ virtual size_t message_length (void);
+
+ /**
+ * Number of total messages on the queue.
+ */
+ virtual size_t message_count (void);
+
+ // = Manual changes to these stats (used when queued message blocks
+ // change size or lengths).
+ /**
+ * New value of the number of total bytes on the queue, i.e., sum of
+ * the message block sizes.
+ */
+ virtual void message_bytes (size_t new_size);
+
+ /**
+ * New value of the number of total length on the queue, i.e., sum
+ * of the message block lengths.
+ */
+ virtual void message_length (size_t new_length);
+
+ /// Get the max concurrent thread number.
+ virtual DWORD max_threads (void);
+
+ // = Activation control methods.
+
+ /**
+ * Deactivate the queue and wake up all threads waiting on the queue
+ * so they can continue. No messages are removed from the queue,
+ * however. Any other operations called until the queue is
+ * activated again will immediately return -1 with @c errno
+ * ESHUTDOWN.
+ *
+ * @retval The queue's state before this call.
+ */
+ virtual int deactivate (void);
+
+ /**
+ * Reactivate the queue so that threads can enqueue and dequeue
+ * messages again. Returns the state of the queue before the call.
+ */
+ virtual int activate (void);
+
+ /**
+ * Pulse the queue to wake up any waiting threads. Changes the
+ * queue state to PULSED; future enqueue/dequeue operations proceed
+ * as in ACTIVATED state.
+ *
+ * @retval The queue's state before this call.
+ */
+ virtual int pulse (void);
+
+ /// Returns true if the state of the queue is <DEACTIVATED>,
+ /// but false if the queue's is <ACTIVATED> or <PULSED>.
+ virtual int deactivated (void);
+
+ // = Not currently implemented...
+ int peek_dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout = 0);
+ ACE_Notification_Strategy *notification_strategy (void);
+ void notification_strategy (ACE_Notification_Strategy *s);
+
+ // = Notification hook.
+
+ /// Dump the state of an object.
+ virtual void dump (void) const;
+
+ /// Get the handle to the underlying completion port.
+ virtual ACE_HANDLE completion_port (void);
+
+ /// Declare the dynamic allocation hooks.
+ ACE_ALLOC_HOOK_DECLARE;
+
+private:
+
+ // Disallow copying and assignment.
+ ACE_Message_Queue_NT (const ACE_Message_Queue_NT &);
+ void operator= (const ACE_Message_Queue_NT &);
+
+private:
+ // = Internal states.
+
+ /// Maximum threads that can be released (and run) concurrently.
+ DWORD max_cthrs_;
+
+ /// Current number of threads waiting to dequeue messages.
+ DWORD cur_thrs_;
+
+ /// Current number of bytes in queue.
+ size_t cur_bytes_;
+
+ /// Current length of messages in queue.
+ size_t cur_length_;
+
+ /// Current number of messages in the queue.
+ size_t cur_count_;
+
+ /**
+ * Synchronizer. This should really be an ACE_Recursive_Thread_Mutex
+ * but since this class is only supported on NT, it's okay to use
+ * ACE_Thread_Mutex here.
+ */
+ ACE_SYNCH_MUTEX lock_;
+
+ /// Underlying NT IoCompletionPort.
+ ACE_HANDLE completion_port_;
+
+};
+#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
+
+ACE_END_VERSIONED_NAMESPACE_DECL
+
+#if defined (__ACE_INLINE__)
+#include "ace/Message_Queue.inl"
+#endif /* __ACE_INLINE__ */
+
+#include /**/ "ace/post.h"
+#endif /* ACE_MESSAGE_QUEUE_NT_H */
diff --git a/ACE/ace/Message_Queue_NT.inl b/ACE/ace/Message_Queue_NT.inl
new file mode 100644
index 00000000000..6db82111d5b
--- /dev/null
+++ b/ACE/ace/Message_Queue_NT.inl
@@ -0,0 +1,131 @@
+// -*- C++ -*-
+//
+// $Id$
+
+#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+# include "ace/Guard_T.h"
+#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
+
+ACE_BEGIN_VERSIONED_NAMESPACE_DECL
+
+#if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
+
+ACE_INLINE int
+ACE_Message_Queue_NT::enqueue_tail (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::enqueue_tail");
+ return this->enqueue (new_item, timeout);
+}
+
+ACE_INLINE int
+ACE_Message_Queue_NT::dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
+ return this->dequeue (first_item, timeout);
+}
+
+ACE_INLINE bool
+ACE_Message_Queue_NT::is_full (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::is_full");
+ return false; // Always not full.
+}
+
+ACE_INLINE bool
+ACE_Message_Queue_NT::is_empty (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::is_empty");
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, false);
+
+ return this->cur_bytes_ > 0 || this->cur_count_ > 0 ? false : true;
+}
+
+ACE_INLINE size_t
+ACE_Message_Queue_NT::message_bytes (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::message_bytes");
+ // Accessing to size_t must be atomic.
+ return this->cur_bytes_;
+}
+
+ACE_INLINE size_t
+ACE_Message_Queue_NT::message_length (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::message_length");
+ // Accessing to size_t must be atomic.
+ return this->cur_length_;
+}
+
+ACE_INLINE size_t
+ACE_Message_Queue_NT::message_count (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::message_count");
+ // Accessing to size_t must be atomic.
+ return this->cur_count_;
+}
+
+ACE_INLINE void
+ACE_Message_Queue_NT::message_bytes (size_t new_value)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::message_bytes");
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ this->cur_bytes_ = new_value;
+}
+
+ACE_INLINE void
+ACE_Message_Queue_NT::message_length (size_t new_value)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::message_length");
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ this->cur_length_ = new_value;
+}
+
+ACE_INLINE DWORD
+ACE_Message_Queue_NT::max_threads (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::max_threads");
+ return this->max_cthrs_;
+}
+
+ACE_INLINE int
+ACE_Message_Queue_NT::deactivated (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::deactivated");
+ // Accessing to int must be atomic.
+ return this->state_ == ACE_Message_Queue_Base::DEACTIVATED;
+}
+
+ACE_INLINE ACE_HANDLE
+ACE_Message_Queue_NT::completion_port (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::completion_port");
+ return this->completion_port_;
+}
+
+ACE_INLINE int
+ACE_Message_Queue_NT::peek_dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout)
+{
+ ACE_UNUSED_ARG(first_item);
+ ACE_UNUSED_ARG(timeout);
+ ACE_NOTSUP_RETURN (-1);
+}
+
+ACE_INLINE ACE_Notification_Strategy *
+ACE_Message_Queue_NT::notification_strategy (void)
+{
+ ACE_NOTSUP_RETURN (0);
+}
+
+ACE_INLINE void
+ACE_Message_Queue_NT::notification_strategy (ACE_Notification_Strategy *)
+{
+}
+
+#endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
+
+ACE_END_VERSIONED_NAMESPACE_DECL
diff --git a/ACE/ace/ace.mpc b/ACE/ace/ace.mpc
index a6053729a3d..3a8a1609054 100644
--- a/ACE/ace/ace.mpc
+++ b/ACE/ace/ace.mpc
@@ -120,6 +120,7 @@ project(ACE) : acedefaults, install, other, codecs, token, svcconf, uuid, fileca
MEM_Stream.cpp
Message_Block.cpp
Message_Queue.cpp
+ Message_Queue_NT.cpp
Message_Queue_Vx.cpp
Method_Request.cpp
MMAP_Memory_Pool.cpp