summaryrefslogtreecommitdiff
path: root/ace
diff options
context:
space:
mode:
authornanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-12-11 08:12:12 +0000
committernanbor <nanbor@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-12-11 08:12:12 +0000
commitfaa3a50eb38ab1271067493effed24970671c3cd (patch)
tree19116443796e651c37c02da3de798f2c65a10a78 /ace
parent5426463c5f26ca39ee0bf9f0ce0d5bc1c10bca2f (diff)
downloadATCD-faa3a50eb38ab1271067493effed24970671c3cd.tar.gz
ACE_Message_Queue_NT changes.
Diffstat (limited to 'ace')
-rw-r--r--ace/Message_Queue.cpp169
-rw-r--r--ace/Message_Queue.h205
-rw-r--r--ace/Message_Queue.i67
-rw-r--r--ace/Message_Queue_T.cpp15
-rw-r--r--ace/Message_Queue_T.h16
-rw-r--r--ace/Message_Queue_T.i10
6 files changed, 480 insertions, 2 deletions
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
index 51156850dd0..445028cfd10 100644
--- a/ace/Message_Queue.cpp
+++ b/ace/Message_Queue.cpp
@@ -262,4 +262,173 @@ ACE_Message_Queue_Vx::peek_dequeue_head (ACE_Message_Block *&,
#endif /* VXWORKS */
+#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
+
+ACE_Message_Queue_NT::ACE_Message_Queue_NT (size_t max_threads)
+ : max_cthrs_ (max_threads),
+ cur_thrs_ (0),
+ cur_bytes_ (0),
+ cur_count_ (0),
+ deactivated_ (0),
+ completion_port_ (ACE_INVALID_HANDLE)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::ACE_Message_Queue_NT");
+ this->open (max_threads);
+}
+
+int
+ACE_Message_Queue_NT::open (size_t max_threads)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::open");
+ this->max_cthrs_ = max_threads;
+ this->completion_port_ = ::CreateIoCompletionPort (ACE_INVALID_HANDLE,
+ NULL,
+ ACE_Message_Queue_Base::WAS_ACTIVE,
+ max_threads);
+ return (this->completion_port_ == NULL ? -1 : 0);
+}
+
+int
+ACE_Message_Queue_NT::close (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::close");
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+ this->deactivate ();
+ return (::CloseHandle (this->completion_port_) ? 0 : -1 );
+}
+
+ACE_Message_Queue_NT::~ACE_Message_Queue_NT (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::~ACE_Message_Queue_NT");
+ this->close ();
+}
+
+int
+ACE_Message_Queue_NT::enqueue (ACE_Message_Block *new_item,
+ ACE_Time_Value *)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::enqueue");
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+ if (!this->deactivated_)
+ {
+ size_t msize = new_item->size ();
+ if (::PostQueuedCompletionStatus (this->completion_port_,
+ msize,
+ this->deactivated_,
+ ACE_reinterpret_cast (LPOVERLAPPED, new_item)))
+ {
+ // Update the states once I succeed.
+ this->cur_bytes_ += msize;
+ return ++this->cur_count_;
+ }
+ }
+ else
+ errno = ESHUTDOWN;
+
+ // Fail to enqueue the message.
+ return -1;
+}
+
+int
+ACE_Message_Queue_NT::dequeue (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::dequeue_head");
+
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+ if (this->deactivated_) // Make sure the MQ is not deactivated before
+ { // I proceed.
+ errno = ESHUTDOWN; // Operation on deactivated MQ not allowed.
+ return -1;
+ }
+ else
+ ++this->cur_thrs_; // Increase the waiting thread count.
+ }
+
+ DWORD shutdown;
+ DWORD msize;
+ // Get a message from the completion port.
+ int retv = ::GetQueuedCompletionStatus (this->completion_port_,
+ &msize,
+ &shutdown,
+ ACE_reinterpret_cast (LPOVERLAPPED *, &first_item),
+ (timeout == 0 ? INFINITE : timeout->msec ()));
+ {
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+ --this->cur_thrs_; // Decrease waiting thread count.
+ if (retv)
+ {
+ if (!shutdown)
+ { // Really get a valid MB from the queue.
+ --this->cur_count_;
+ this->cur_bytes_ -= msize;
+ return this->cur_count_;
+ }
+ else // I am woken up by deactivate ().
+ errno = ESHUTDOWN;
+ }
+ }
+ return -1;
+}
+
+int
+ACE_Message_Queue_NT::deactivate (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::deactivate");
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+
+ if (this->deactivated_) // Check if I have been deactivated already.
+ return ACE_Message_Queue_Base::WAS_INACTIVE;
+
+ this->deactivated_ = 1;
+
+ // Get the number of shutdown messages necessary to wake up
+ // all waiting threads.
+
+ for (size_t cntr = this->cur_thrs_ - this->cur_count_;
+ cntr > 0; cntr++)
+ ::PostQueuedCompletionStatus (this->completion_port_,
+ 0,
+ this->deactivated_,
+ NULL);
+ return ACE_Message_Queue_Base::WAS_ACTIVE;
+}
+
+int
+ACE_Message_Queue_NT::activate (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::activate");
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+ if (!this->deactivated_)
+ return ACE_Message_Queue_Base::WAS_ACTIVE;
+
+ this->deactivated_ = 0;
+ return ACE_Message_Queue_Base::WAS_INACTIVE;
+}
+
+void
+ACE_Message_Queue_NT::dump (void) const
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::dump");
+
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("deactivated = %d\n")
+ ASYS_TEXT ("max_cthrs_ = %d\n")
+ ASYS_TEXT ("cur_thrs_ = %d\n")
+ ASYS_TEXT ("cur_bytes = %d\n")
+ ASYS_TEXT ("cur_count = %d\n")
+ ASYS_TEXT ("completion_port_ = %x\n"),
+ this->deactivated_,
+ this->max_cthrs_,
+ this->cur_thrs_,
+ this->cur_bytes_,
+ this->cur_count_,
+ this->completion_port_));
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
+}
+
+#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
+
#endif /* ACE_MESSAGE_QUEUE_C */
diff --git a/ace/Message_Queue.h b/ace/Message_Queue.h
index e9766f1ac95..76f7b05467c 100644
--- a/ace/Message_Queue.h
+++ b/ace/Message_Queue.h
@@ -52,6 +52,78 @@ public:
WAS_INACTIVE = 2
// Message queue was inactive before activate() or deactivate().
};
+
+ ACE_Message_Queue_Base (void);
+
+ virtual int close (void) = 0;
+ // Close down the message queue and release all resources.
+
+ virtual ~ACE_Message_Queue_Base (void) = 0;
+ // Close down the message queue and release all resources.
+
+ // = Enqueue and dequeue methods.
+
+ // For the following enqueue and dequeue methods if <timeout> == 0,
+ // the caller will block until action is possible, else will wait
+ // until the absolute time specified in *<timeout> elapses). These
+ // calls will return, however, when queue is closed, deactivated,
+ // when a signal occurs, or if the time specified in timeout
+ // elapses, (in which case errno = EWOULDBLOCK).
+
+ virtual int enqueue (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0) = 0;
+ // Enqueue a <ACE_Message_Block *> into the tail of the queue.
+ // Return -1 on failure, number of items in queue otherwise.
+
+ virtual int dequeue (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout = 0) = 0;
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue. Returns -1 on failure, else the number of items still on
+ // the queue.
+
+ // = Check if queue is full/empty.
+ virtual int is_full (void) = 0;
+ // True if queue is full, else false.
+ virtual int is_empty (void) = 0;
+ // True if queue is empty, else false.
+
+ // = Queue statistic methods.
+ virtual size_t message_bytes (void) = 0;
+ // Number of total bytes on the queue.
+ virtual size_t message_count (void) = 0;
+ // Number of total messages on the queue.
+
+ // = Activation control methods.
+
+ virtual int deactivate (void) = 0;
+ // Deactivate the queue and wakeup all threads waiting on the queue
+ // so they can continue. No messages are removed from the queue,
+ // however. Any other operations called until the queue is
+ // activated again will immediately return -1 with <errno> ==
+ // ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
+ // call and WAS_ACTIVE if queue was active before the call.
+
+ virtual int activate (void) = 0;
+ // Reactivate the queue so that threads can enqueue and dequeue
+ // messages again. Returns WAS_INACTIVE if queue was inactive
+ // before the call and WAS_ACTIVE if queue was active before the
+ // call.
+
+ virtual int deactivated (void) = 0;
+ // Returns true if <deactivated_> is enabled.
+
+ // = Notification hook.
+
+ virtual void dump (void) const = 0;
+ // Dump the state of an object.
+
+ ACE_ALLOC_HOOK_DECLARE;
+ // Declare the dynamic allocation hooks.
+
+private:
+ // = Disallow these operations.
+ ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_Base &))
+ ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_Base (const ACE_Message_Queue_Base &))
};
// Include the templates here.
@@ -198,6 +270,139 @@ private:
};
#endif /* VXWORKS */
+#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
+class ACE_Export ACE_Message_Queue_NT : public ACE_Message_Queue_Base
+{
+ // = TITLE
+ // Message Queue implementation using IO completion port on NT.
+ //
+ // = DESCRIPTION
+ // Implementation of a strip-downed ACE_Message_Queue using NT's
+ // IO completion port mechanism.
+ //
+ // NOTE: *Many* ACE_Message_Queue features are not supported with
+ // this implementation, including:
+ // * open method have different signatures.
+ // * dequeue_head () *requires* that the ACE_Message_Block
+ // pointer argument point to an ACE_Message_Block that was
+ // allocated by the caller.
+ // * peek_dequeue_head ().
+ // * ACE_Message_Queue_Iterators.
+ // * No flow control.
+ // * Message_Block chains. The continuation field of ACE_Message_Block
+ // * is ignored; only the first block of a fragment chain is
+ // * recognized.
+public:
+ // = Initialization and termination methods.
+ ACE_Message_Queue_NT (size_t max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
+
+ virtual int open (size_t max_threads = ACE_Message_Queue_Base::DEFAULT_HWM);
+ // Initialize the Message Queue by creating a new NT I/O completion
+ // port. The first arguemnt specifies the number of threads
+ // released by the MQ that are allowed to run concurrently. Return
+ // 0 when succeeds, -1 otherwise.
+
+ virtual int close (void);
+ // Close down the underlying I/O completion port. You need to
+ // re-open the MQ after this function is executed.
+
+ virtual ~ACE_Message_Queue_NT (void);
+ // Close down the message queue and release all resources.
+
+ // = Enqueue and dequeue methods.
+
+ virtual int enqueue (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0);
+ // Enqueue an <ACE_Message_Block *> at the end of the queue.
+ // Returns -1 on failure, else the number of items still on the
+ // queue.
+
+ virtual int dequeue (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout = 0);
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue. Returns -1 on failure, else the number of items still on
+ // the queue.
+
+ // = Check if queue is full/empty.
+ virtual int is_full (void);
+ // Always return false.
+ virtual int is_empty (void);
+ // True if queue is empty, else false. Notice the return value is
+ // only transient.
+
+ // = Queue statistic methods (transient.)
+ virtual size_t message_bytes (void);
+ // Number of total bytes on the queue.
+ virtual size_t message_count (void);
+ // Number of total messages on the queue.
+
+ virtual size_t max_threads (void);
+ // Get the max concurrent thread number.
+
+ // = Activation control methods.
+
+ virtual int deactivate (void);
+ // Deactivate the queue and wakeup all threads waiting on the queue
+ // so they can continue. Messages already in the queue get removed.
+ // If there are more messages in the queue than there are threads
+ // waiting on the queue, the left over messages will not be removed.
+ // Any other enqueue/dequeue operations called until the queue is
+ // activated again will immediately return -1 with <errno> ==
+ // ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
+ // call and WAS_ACTIVE if queue was active before the call.
+
+ virtual int activate (void);
+ // Reactivate the queue so that threads can enqueue and dequeue
+ // messages again. Returns WAS_INACTIVE if queue was inactive
+ // before the call and WAS_ACTIVE if queue was active before the
+ // call.
+
+ virtual int deactivated (void);
+ // Returns true if <deactivated_> is enabled.
+
+ // = Notification hook.
+
+ virtual void dump (void) const;
+ // Dump the state of an object.
+
+ virtual ACE_HANDLE completion_port (void);
+ // Get the handle to the underlying completion port.
+
+ ACE_ALLOC_HOOK_DECLARE;
+ // Declare the dynamic allocation hooks.
+
+private:
+ // = Internal states.
+
+ size_t max_cthrs_;
+ // Maximum threads that can be released (and run) concurrently.
+
+ size_t cur_thrs_;
+ // Current number of threads waiting to dequeue messages.
+
+ size_t cur_bytes_;
+ // Current number of bytes in queue.
+
+ size_t cur_count_;
+ // Current number of messages in the queue.
+
+ ACE_Thread_Mutex lock_;
+ // Synchronizer. This should really be an ACE_Recursive_Thread_Mutex
+ // but since this class is only supported on NT, it's okay to use
+ // ACE_Thread_Mutex here.
+
+ int deactivated_;
+ // Indicates that the queue is inactive.
+
+ ACE_HANDLE completion_port_;
+ // Underlying NT IoCompletionPort.
+
+ // = Disallow these operations.
+ ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue_NT &))
+ ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue_NT (const ACE_Message_Queue_NT &))
+};
+#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
+
// This must go here to avoid problems with circular includes.
#include "ace/Strategies.h"
diff --git a/ace/Message_Queue.i b/ace/Message_Queue.i
index 90ef02953b9..e1f953b0f7a 100644
--- a/ace/Message_Queue.i
+++ b/ace/Message_Queue.i
@@ -1,6 +1,16 @@
/* -*- C++ -*- */
// $Id$
+ACE_INLINE
+ACE_Message_Queue_Base::ACE_Message_Queue_Base (void)
+{
+}
+
+ACE_INLINE
+ACE_Message_Queue_Base::~ACE_Message_Queue_Base (void)
+{
+}
+
#if defined (VXWORKS)
// Specialization to use native VxWorks Message Queues.
@@ -81,3 +91,60 @@ ACE_Message_Queue_Vx::message_count (void)
}
#endif /* VXWORKS */
+
+#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
+ACE_INLINE int
+ACE_Message_Queue_NT::is_full (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::is_full");
+ return 0; // Always not full.
+}
+
+ACE_INLINE int
+ACE_Message_Queue_NT::is_empty (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::is_empty");
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0);
+
+ return (this->cur_bytes_ > 0 && this->cur_count_ > 0 ? 1 : 0);
+}
+
+ACE_INLINE size_t
+ACE_Message_Queue_NT::message_bytes (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::message_bytes");
+ // Accessing to size_t must be atomic.
+ return this->cur_bytes_;
+}
+
+ACE_INLINE size_t
+ACE_Message_Queue_NT::message_count (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::message_count");
+ // Accessing to size_t must be atomic.
+ return this->cur_count_;
+}
+
+ACE_INLINE size_t
+ACE_Message_Queue_NT::max_threads (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::max_threads");
+ return this->max_cthrs_;
+}
+
+ACE_INLINE int
+ACE_Message_Queue_NT::deactivated (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::ceactivated");
+ // Accessing to int must be atomic.
+ return this->deactivated_;
+}
+
+ACE_INLINE ACE_HANDLE
+ACE_Message_Queue_NT::completion_port (void)
+{
+ ACE_TRACE ("ACE_Message_Queue_NT::completion_port");
+ return this->completion_port_;
+}
+
+#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp
index ff1df7d53d5..654353c7025 100644
--- a/ace/Message_Queue_T.cpp
+++ b/ace/Message_Queue_T.cpp
@@ -1705,6 +1705,21 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_me
}
// factory method for a wrapped VxWorks message queue
+#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
+
+template <ACE_SYNCH_DECL>
+ACE_Message_Queue_NT *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_NT_message_queue (size_t max_threads)
+{
+ ACE_Message_Queue_NT *tmp;
+
+ ACE_NEW_RETURN (tmp,
+ ACE_Message_Queue_NT (max_threads);
+ 0);
+ return tmp;
+}
+
+#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
#endif /* defined (VXWORKS) */
diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h
index e5628daffce..d13250fa005 100644
--- a/ace/Message_Queue_T.h
+++ b/ace/Message_Queue_T.h
@@ -27,6 +27,10 @@
class ACE_Message_Queue_Vx;
#endif /* defined (VXWORKS) */
+#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
+class ACE_Message_Queue_NT;
+#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
+
template <ACE_SYNCH_DECL>
class ACE_Message_Queue : public ACE_Message_Queue_Base
{
@@ -132,6 +136,10 @@ public:
// Returns -1 on failure, else the number of items still on the
// queue.
+ virtual int dequeue (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout = 0);
+ // Same as the following.
+
virtual int dequeue_head (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0);
// Dequeue and return the <ACE_Message_Block *> at the head of the
@@ -626,6 +634,14 @@ public:
// factory method for a wrapped VxWorks message queue
#endif /* defined (VXWORKS) */
+
+#if defined (ACE_WIN32) && (ACE_HAS_WINNT4 != 0)
+
+ static ACE_Message_Queue_NT *
+ create_NT_message_queue (size_t max_threads);
+ // factory method for a NT message queue.
+
+#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
};
#if defined (__ACE_INLINE__)
diff --git a/ace/Message_Queue_T.i b/ace/Message_Queue_T.i
index 1391f82a99d..b5b40a48d07 100644
--- a/ace/Message_Queue_T.i
+++ b/ace/Message_Queue_T.i
@@ -1,6 +1,14 @@
/* -*- C++ -*- */
// $Id$
+template <ACE_SYNCH_DECL> ACE_INLINE int
+ACE_Message_Queue<ACE_SYNCH_USE>::dequeue (ACE_Message_Block *&first_item,
+ ACE_Time_Value *timeout)
+{
+ ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy");
+ return this->dequeue (first_item, timeout);
+}
+
template <ACE_SYNCH_DECL> ACE_INLINE ACE_Notification_Strategy *
ACE_Message_Queue<ACE_SYNCH_USE>::notification_strategy (void)
{
@@ -142,5 +150,3 @@ ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
}
ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
-
-