summaryrefslogtreecommitdiff
path: root/ace/Message_Queue_T.h
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>2001-01-26 18:44:08 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>2001-01-26 18:44:08 +0000
commit991986bf2f012206fe7e29f4d6600093141ca2e5 (patch)
tree9906d454611b0ddf2904537f5d6d5456218b2b1a /ace/Message_Queue_T.h
parent0e7e3bcf2c5abe39e892c578052a054cd87e4314 (diff)
downloadATCD-991986bf2f012206fe7e29f4d6600093141ca2e5.tar.gz
ChangeLogTag:Fri Jan 26 11:18:15 2001 Douglas C. Schmidt <schmidt@ace.cs.wustl.edu>
Diffstat (limited to 'ace/Message_Queue_T.h')
-rw-r--r--ace/Message_Queue_T.h311
1 files changed, 302 insertions, 9 deletions
diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h
index 7cc47832eed..b26a58c2b8d 100644
--- a/ace/Message_Queue_T.h
+++ b/ace/Message_Queue_T.h
@@ -36,7 +36,7 @@ class ACE_Message_Queue_NT;
* queueing facilities in System V STREAMs.
*
* An <ACE_Message_Queue> is the central queueing facility for
- * messages in the ASX framework. If <ACE_SYNCH_DECL> is
+ * messages in the ACE framework. If <ACE_SYNCH_DECL> is
* <ACE_MT_SYNCH> then all operations are thread-safe.
* Otherwise, if it's <ACE_NULL_SYNCH> then there's no locking
* overhead.
@@ -185,20 +185,24 @@ public:
// = Check if queue is full/empty.
/// True if queue is full, else false.
- /// True if queue is empty, else false.
virtual int is_full (void);
+ /// True if queue is empty, else false.
virtual int is_empty (void);
// = Queue statistic methods.
/**
* Number of total bytes on the queue, i.e., sum of the message
* block sizes.
+ */
+ virtual size_t message_bytes (void);
+ /**
* Number of total length on the queue, i.e., sum of the message
* block lengths.
- * Number of total messages on the queue.
*/
- virtual size_t message_bytes (void);
virtual size_t message_length (void);
+ /**
+ * Number of total messages on the queue.
+ */
virtual size_t message_count (void);
// = Manual changes to these stats (used when queued message blocks
@@ -206,29 +210,35 @@ public:
/**
* New value of the number of total bytes on the queue, i.e., sum of
* the message block sizes.
+ */
+ virtual void message_bytes (size_t new_size);
+ /**
* New value of the number of total length on the queue, i.e., sum
* of the message block lengths.
*/
- virtual void message_bytes (size_t new_size);
virtual void message_length (size_t new_length);
// = Flow control methods.
/**
* Get high watermark.
+ */
+ virtual size_t high_water_mark (void);
+ /**
* Set the high watermark, which determines how many bytes can be
* stored in a queue before it's considered "full."
*/
- virtual size_t high_water_mark (void);
virtual void high_water_mark (size_t hwm);
/**
* Get low watermark.
+ */
+ virtual size_t low_water_mark (void);
+ /**
* Set the low watermark, which determines how many bytes must be in
* the queue before supplier threads are allowed to enqueue
* additional <ACE_Message_Block>s.
*/
- virtual size_t low_water_mark (void);
virtual void low_water_mark (size_t lwm);
// = Activation control methods.
@@ -275,10 +285,8 @@ public:
/// Returns a reference to the lock used by the <ACE_Message_Queue>.
ACE_SYNCH_MUTEX_T &lock (void)
{
- //
// The Sun Forte 6 (CC 5.1) compiler is only happy if this is in the
// header file (j.russell.noseworthy@objectsciences.com)
- //
return this->lock_;
}
@@ -741,6 +749,291 @@ public:
#endif /* ACE_WIN32 && ACE_HAS_WINNT4 != 0 */
};
+/**
+ * @class ACE_Message_Queue_Ex
+ *
+ * @brief A threaded message queueing facility, modeled after the
+ * queueing facilities in System V STREAMs.
+ *
+ * An <ACE_Message_Queue_Ex> is the templatized wrapper of the central
+ * queueing facility for messages in the ACE framework. If
+ * <ACE_SYNCH_DECL> is <ACE_MT_SYNCH> then all operations are
+ * thread-safe. Otherwise, if it's <ACE_NULL_SYNCH> then there's no
+ * locking overhead.
+ */
+template <class ACE_MESSAGE_TYPE, ACE_SYNCH_DECL>
+class ACE_Message_Queue_Ex
+{
+public:
+
+ // = Defualt priority value.
+ enum
+ {
+ DEFUALT_PRIORITY = 0
+ };
+
+#if 0
+ // @@ Iterators are not implemented yet...
+
+ friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>;
+ friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>;
+
+ // = Traits
+ typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE>
+ ITERATOR;
+ typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>
+ REVERSE_ITERATOR;
+#endif /* 0 */
+
+ // = Initialization and termination methods.
+
+ /**
+ * Initialize an <ACE_Message_Queue>. The <high_water_mark>
+ * determines how many bytes can be stored in a queue before it's
+ * considered "full." Supplier threads must block until the queue
+ * is no longer full. The <low_water_mark> determines how many
+ * bytes must be in the queue before supplier threads are allowed to
+ * enqueue additional <ACE_Message_Block>s. By default, the
+ * <high_water_mark> equals the <low_water_mark>, which means that
+ * suppliers will be able to enqueue new messages as soon as a
+ * consumer removes any message from the queue. Making the
+ * <low_water_mark> smaller than the <high_water_mark> forces
+ * consumers to drain more messages from the queue before suppliers
+ * can enqueue new messages, which can minimize the "silly window
+ * syndrome."
+ */
+ ACE_Message_Queue_Ex (size_t high_water_mark = ACE_Message_Queue_Base::DEFAULT_HWM,
+ size_t low_water_mark = ACE_Message_Queue_Base::DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0);
+
+ /**
+ * Initialize an <ACE_Message_Queue>. The <high_water_mark>
+ * determines how many bytes can be stored in a queue before it's
+ * considered "full." Supplier threads must block until the queue
+ * is no longer full. The <low_water_mark> determines how many
+ * bytes must be in the queue before supplier threads are allowed to
+ * enqueue additional <ACE_Message_Block>s. By default, the
+ * <high_water_mark> equals the <low_water_mark>, which means that
+ * suppliers will be able to enqueue new messages as soon as a
+ * consumer removes any message from the queue. Making the
+ * <low_water_mark> smaller than the <high_water_mark> forces
+ * consumers to drain more messages from the queue before suppliers
+ * can enqueue new messages, which can minimize the "silly window
+ * syndrome."
+ */
+ int open (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
+ size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0);
+
+ /// Close down the message queue and release all resources.
+ int close (void);
+
+ /// Close down the message queue and release all resources.
+ ~ACE_Message_Queue_Ex (void);
+
+ // = Enqueue and dequeue methods.
+
+ // For the following enqueue and dequeue methods if <timeout> == 0,
+ // the caller will block until action is possible, else will wait
+ // until the absolute time specified in *<timeout> elapses). These
+ // calls will return, however, when queue is closed, deactivated,
+ // when a signal occurs, or if the time specified in timeout
+ // elapses, (in which case errno = EWOULDBLOCK).
+
+ /**
+ * Retrieve the first <ACE_Message_Block> without removing it. Note
+ * that <timeout> uses <{absolute}> time rather than <{relative}>
+ * time. If the <timeout> elapses without receiving a message -1 is
+ * returned and <errno> is set to <EWOULDBLOCK>. If the queue is
+ * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>.
+ * Otherwise, returns -1 on failure, else the number of items still
+ * on the queue.
+ */
+ int peek_dequeue_head (ACE_MESSAGE_TYPE *&first_item,
+ ACE_Time_Value *timeout = 0);
+
+ /**
+ * Enqueue an <ACE_Message_Block *> into the <Message_Queue> in
+ * accordance with its <msg_priority> (0 is lowest priority). FIFO
+ * order is maintained when messages of the same priority are
+ * inserted consecutively. Note that <timeout> uses <{absolute}>
+ * time rather than <{relative}> time. If the <timeout> elapses
+ * without receiving a message -1 is returned and <errno> is set to
+ * <EWOULDBLOCK>. If the queue is deactivated -1 is returned and
+ * <errno> is set to <ESHUTDOWN>. Otherwise, returns -1 on failure,
+ * else the number of items still on the queue.
+ */
+ int enqueue_prio (ACE_MESSAGE_TYPE *new_item,
+ ACE_Time_Value *timeout = 0);
+
+ /**
+ * This is an alias for <enqueue_prio>. It's only here for
+ * backwards compatibility and will go away in a subsequent release.
+ * Please use <enqueue_prio> instead. Note that <timeout> uses
+ * <{absolute}> time rather than <{relative}> time.
+ */
+ int enqueue (ACE_MESSAGE_TYPE *new_item,
+ ACE_Time_Value *timeout = 0);
+
+ /**
+ * Enqueue an <ACE_Message_Block *> at the end of the queue. Note
+ * that <timeout> uses <{absolute}> time rather than <{relative}>
+ * time. If the <timeout> elapses without receiving a message -1 is
+ * returned and <errno> is set to <EWOULDBLOCK>. If the queue is
+ * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>.
+ * Otherwise, returns -1 on failure, else the number of items still
+ * on the queue.
+ */
+ int enqueue_tail (ACE_MESSAGE_TYPE *new_item,
+ ACE_Time_Value *timeout = 0);
+
+ /**
+ * Enqueue an <ACE_Message_Block *> at the head of the queue. Note
+ * that <timeout> uses <{absolute}> time rather than <{relative}>
+ * time. If the <timeout> elapses without receiving a message -1 is
+ * returned and <errno> is set to <EWOULDBLOCK>. If the queue is
+ * deactivated -1 is returned and <errno> is set to <ESHUTDOWN>.
+ * Otherwise, returns -1 on failure, else the number of items still
+ * on the queue.
+ */
+ int enqueue_head (ACE_MESSAGE_TYPE *new_item,
+ ACE_Time_Value *timeout = 0);
+
+ /// This method is an alias for the following <dequeue_head> method.
+ int dequeue (ACE_MESSAGE_TYPE *&first_item,
+ ACE_Time_Value *timeout = 0);
+ // This method is an alias for the following <dequeue_head> method.
+
+ /**
+ * Dequeue and return the <ACE_Message_Block *> at the head of the
+ * queue. Note that <timeout> uses <{absolute}> time rather than
+ * <{relative}> time. If the <timeout> elapses without receiving a
+ * message -1 is returned and <errno> is set to <EWOULDBLOCK>. If
+ * the queue is deactivated -1 is returned and <errno> is set to
+ * <ESHUTDOWN>. Otherwise, returns -1 on failure, else the number
+ * of items still on the queue.
+ */
+ int dequeue_head (ACE_MESSAGE_TYPE *&first_item,
+ ACE_Time_Value *timeout = 0);
+
+ // = Check if queue is full/empty.
+ /// True if queue is full, else false.
+ int is_full (void);
+ /// True if queue is empty, else false.
+ int is_empty (void);
+
+
+ // = Queue statistic methods.
+ /**
+ * Number of total bytes on the queue, i.e., sum of the message
+ * block sizes.
+ */
+ size_t message_bytes (void);
+ /**
+ * Number of total length on the queue, i.e., sum of the message
+ * block lengths.
+ */
+ size_t message_length (void);
+ /**
+ * Number of total messages on the queue.
+ */
+ size_t message_count (void);
+
+ // = Manual changes to these stats (used when queued message blocks
+ // change size or lengths).
+ /**
+ * New value of the number of total bytes on the queue, i.e., sum of
+ * the message block sizes.
+ */
+ virtual void message_bytes (size_t new_size);
+ /**
+ * New value of the number of total length on the queue, i.e., sum
+ * of the message block lengths.
+ */
+ virtual void message_length (size_t new_length);
+
+ // = Flow control methods.
+ /**
+ * Get high watermark.
+ */
+ virtual size_t high_water_mark (void);
+ /**
+ * Set the high watermark, which determines how many bytes can be
+ * stored in a queue before it's considered "full."
+ */
+ virtual void high_water_mark (size_t hwm);
+
+ /**
+ * Get low watermark.
+ */
+ virtual size_t low_water_mark (void);
+ /**
+ * Set the low watermark, which determines how many bytes must be in
+ * the queue before supplier threads are allowed to enqueue
+ * additional <ACE_Message_Block>s.
+ */
+ virtual void low_water_mark (size_t lwm);
+
+ // = Activation control methods.
+
+ /**
+ * Deactivate the queue and wakeup all threads waiting on the queue
+ * so they can continue. No messages are removed from the queue,
+ * however. Any other operations called until the queue is
+ * activated again will immediately return -1 with <errno> ==
+ * ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the
+ * call and WAS_ACTIVE if queue was active before the call.
+ */
+ int deactivate (void);
+
+ /**
+ * Reactivate the queue so that threads can enqueue and dequeue
+ * messages again. Returns WAS_INACTIVE if queue was inactive
+ * before the call and WAS_ACTIVE if queue was active before the
+ * call.
+ */
+ int activate (void);
+
+ /// Returns true if <deactivated_> is enabled.
+ int deactivated (void);
+
+ // = Notification hook.
+
+ /**
+ * This hook is automatically invoked by <enqueue_head>,
+ * <enqueue_tail>, and <enqueue_prio> when a new item is inserted
+ * into the queue. Subclasses can override this method to perform
+ * specific notification strategies (e.g., signaling events for a
+ * <WFMO_Reactor>, notifying a <Reactor>, etc.). In a
+ * multi-threaded application with concurrent consumers, there is no
+ * guarantee that the queue will be still be non-empty by the time
+ * the notification occurs.
+ */
+ int notify (void);
+
+ /// Get/set the notification strategy for the <Message_Queue>
+ ACE_Notification_Strategy *notification_strategy (void);
+ void notification_strategy (ACE_Notification_Strategy *s);
+
+ /// Returns a reference to the lock used by the <ACE_Message_Queue_Ex>.
+ ACE_SYNCH_MUTEX_T &lock (void)
+ {
+ // The Sun Forte 6 (CC 5.1) compiler is only happy if this is in the
+ // header file (j.russell.noseworthy@objectsciences.com)
+ return this->lock_;
+ }
+
+ /// Dump the state of an object.
+ void dump (void) const;
+
+ /// Declare the dynamic allocation hooks.
+ ACE_ALLOC_HOOK_DECLARE;
+
+private:
+ /// Implement this via an <ACE_Message_Queue>.
+ ACE_Message_Queue<ACE_SYNCH> *queue_;
+};
+
#if defined (__ACE_INLINE__)
#include "ace/Message_Queue_T.i"
#endif /* __ACE_INLINE__ */