diff options
author | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-26 17:20:59 +0000 |
---|---|---|
committer | levine <levine@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-26 17:20:59 +0000 |
commit | 5e5d58c08793b701904f7b7b5df3bd1a1e6d963e (patch) | |
tree | 97136f3ba676fc2f9fe5209d4fbf6c10f9c7acb8 /ace/Message_Queue_T.h | |
parent | 878c85fbf7e12f155d9029ddbc5c9826f36fdf61 (diff) | |
download | ATCD-5e5d58c08793b701904f7b7b5df3bd1a1e6d963e.tar.gz |
On VxWorks, added ACE_Message_Queue_Vx to wrap native VxWorks message queues
Diffstat (limited to 'ace/Message_Queue_T.h')
-rw-r--r-- | ace/Message_Queue_T.h | 486 |
1 files changed, 486 insertions, 0 deletions
diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h new file mode 100644 index 00000000000..cc181c04a99 --- /dev/null +++ b/ace/Message_Queue_T.h @@ -0,0 +1,486 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// ace +// +// = FILENAME +// Message_Queue_T.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (ACE_MESSAGE_QUEUE_T_H) +#define ACE_MESSAGE_QUEUE_T_H + +#include "ace/Synch.h" + +template <ACE_SYNCH_DECL> +class ACE_Message_Queue : public ACE_Message_Queue_Base +{ + // = TITLE + // A threaded message queueing facility, modeled after the + // queueing facilities in System V STREAMs. + // + // = DESCRIPTION + // An <ACE_Message_Queue> is the central queueing facility for + // messages in the ASX framework. If <ACE_SYNCH_DECL> is + // ACE_MT_SYNCH then all operations are thread-safe. Otherwise, + // if it's <ACE_NULL_SYNCH> then there's no locking overhead. +public: + friend class ACE_Message_Queue_Iterator<ACE_SYNCH_USE>; + friend class ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>; + + // = Traits + typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE> ITERATOR; + typedef ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE> REVERSE_ITERATOR; + + // = Initialization and termination methods. + ACE_Message_Queue (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + + // Create a message queue with all the defaults. + virtual int open (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + // Create a message queue with all the defaults. + + virtual int close (void); + // Close down the message queue and release all resources. + + virtual ~ACE_Message_Queue (void); + // Close down the message queue and release all resources. + + // = Enqueue and dequeue methods. + + // For all the following routines if <timeout> == 0, the caller will + // block until action is possible, else will wait until the absolute + // time specified in *<timeout> elapses). These calls will return, + // however, when queue is closed, deactivated, when a signal occurs, + // or if the time specified in timeout elapses, (in which case errno + // = EWOULDBLOCK). + + virtual int peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *tv = 0); + // Retrieve the first <ACE_Message_Block> without removing it. + // Returns -1 on failure, else the number of items still on the + // queue. + + virtual int enqueue_prio (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // Enqueue an <ACE_Message_Block *> into the <Message_Queue> in + // accordance with its <msg_priority> (0 is lowest priority). FIFO + // order is maintained when messages of the same priority are + // inserted consecutively. Returns -1 on failure, else the number + // of items still on the queue. + + virtual int enqueue (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // This is an alias for <enqueue_prio>. It's only here for + // backwards compatibility and will go away in a subsequent release. + // Please use <enqueue_prio> instead. + + virtual int enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // Enqueue an <ACE_Message_Block *> at the end of the queue. + // Returns -1 on failure, else the number of items still on the + // queue. + + virtual int enqueue_head (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // Enqueue an <ACE_Message_Block *> at the head of the queue. + // Returns -1 on failure, else the number of items still on the + // queue. + + virtual int dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout = 0); + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. Returns -1 on failure, else the number of items still on + // the queue. + + // = Check if queue is full/empty. + virtual int is_full (void); + // True if queue is full, else false. + virtual int is_empty (void); + // True if queue is empty, else false. + + // = Queue statistic methods. + virtual size_t message_bytes (void); + // Number of total bytes on the queue. + virtual size_t message_count (void); + // Number of total messages on the queue. + + // = Flow control routines + virtual size_t high_water_mark (void); + // Get high watermark. + virtual void high_water_mark (size_t hwm); + // Set high watermark. + virtual size_t low_water_mark (void); + // Get low watermark. + virtual void low_water_mark (size_t lwm); + // Set low watermark. + + // = Activation control methods. + + virtual int deactivate (void); + // Deactivate the queue and wakeup all threads waiting on the queue + // so they can continue. No messages are removed from the queue, + // however. Any other operations called until the queue is + // activated again will immediately return -1 with <errno> == + // ESHUTDOWN. Returns WAS_INACTIVE if queue was inactive before the + // call and WAS_ACTIVE if queue was active before the call. + + virtual int activate (void); + // Reactivate the queue so that threads can enqueue and dequeue + // messages again. Returns WAS_INACTIVE if queue was inactive + // before the call and WAS_ACTIVE if queue was active before the + // call. + + virtual int deactivated (void); + // Returns true if <deactivated_> is enabled. + + // = Notification hook. + + virtual int notify (void); + // This hook is automatically invoked by <enqueue_head>, + // <enqueue_tail>, and <enqueue_prio> when a new item is inserted + // into the queue. Subclasses can override this method to perform + // specific notification strategies (e.g., signaling events for a + // <WFMO_Reactor>, notifying a <Reactor>, etc.). In a + // multi-threaded application with concurrent consumers, there is no + // guarantee that the queue will be still be non-empty by the time + // the notification occurs. + + // = Get/set the notification strategy for the <Message_Queue> + virtual ACE_Notification_Strategy *notification_strategy (void); + virtual void notification_strategy (ACE_Notification_Strategy *s); + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +protected: + // = Routines that actually do the enqueueing and dequeueing. + // These routines assume that locks are held by the corresponding + // public methods. Since they are virtual, you can change the + // queueing mechanism by subclassing from <ACE_Message_Queue>. + + virtual int enqueue_i (ACE_Message_Block *new_item); + // Enqueue an <ACE_Message_Block *> in accordance with its priority. + + virtual int enqueue_tail_i (ACE_Message_Block *new_item); + // Enqueue an <ACE_Message_Block *> at the end of the queue. + + virtual int enqueue_head_i (ACE_Message_Block *new_item); + // Enqueue an <ACE_Message_Block *> at the head of the queue. + + virtual int dequeue_head_i (ACE_Message_Block *&first_item); + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. + + // = Check the boundary conditions (assumes locks are held). + virtual int is_full_i (void); + // True if queue is full, else false. + virtual int is_empty_i (void); + // True if queue is empty, else false. + + // = Implementation of the public activate() and deactivate() methods above (assumes locks are held). + virtual int deactivate_i (void); + // Deactivate the queue. + virtual int activate_i (void); + // Activate the queue. + + // = Helper methods to factor out common #ifdef code. + virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + ACE_Time_Value *tv); + // Wait for the queue to become non-full. + + virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, + ACE_Time_Value *tv); + // Wait for the queue to become non-empty. + + virtual int signal_enqueue_waiters (void); + // Inform any threads waiting to enqueue that they can procede. + + virtual int signal_dequeue_waiters (void); + // Inform any threads waiting to dequeue that they can procede. + + ACE_Message_Block *head_; + // Pointer to head of ACE_Message_Block list. + + ACE_Message_Block *tail_; + // Pointer to tail of ACE_Message_Block list. + + size_t low_water_mark_; + // Lowest number before unblocking occurs. + + size_t high_water_mark_; + // Greatest number of bytes before blocking. + + size_t cur_bytes_; + // Current number of bytes in the queue. + + size_t cur_count_; + // Current number of messages in the queue. + + int deactivated_; + // Indicates that the queue is inactive. + + ACE_Notification_Strategy *notification_strategy_; + // The notification strategy used when a new message is enqueued. + + // = Synchronization primitives for controlling concurrent access. + ACE_SYNCH_MUTEX_T lock_; + // Protect queue from concurrent access. + +#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) + ACE_SYNCH_SEMAPHORE_T not_empty_cond_; + // Used to make threads sleep until the queue is no longer empty. + + ACE_SYNCH_SEMAPHORE_T not_full_cond_; + // Used to make threads sleep until the queue is no longer full. + + size_t dequeue_waiters_; + // Number of threads waiting to dequeue a <Message_Block>. + + size_t enqueue_waiters_; + // Number of threads waiting to enqueue a <Message_Block>. +#else + ACE_SYNCH_CONDITION_T not_empty_cond_; + // Used to make threads sleep until the queue is no longer empty. + + ACE_SYNCH_CONDITION_T not_full_cond_; + // Used to make threads sleep until the queue is no longer full. +#endif /* ACE_HAS_OPTIMIZED_MESSAGE_QUEUE */ + +private: + + // = Disallow these operations. + ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &)) + ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &)) +}; + +template <ACE_SYNCH_DECL> +class ACE_Message_Queue_Iterator +{ + // = TITLE + // Iterator for the <ACE_Message_Queue>. +public: + // = Initialization method. + ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); + + // = Iteration methods. + int next (ACE_Message_Block *&entry); + // Pass back the <entry> that hasn't been seen in the queue. + // Returns 0 when all items have been seen, else 1. + + int done (void) const; + // Returns 1 when all items have been seen, else 0. + + int advance (void); + // Move forward by one element in the queue. Returns 0 when all the + // items in the set have been seen, else 1. + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + ACE_Message_Queue <ACE_SYNCH_USE> &queue_; + // Message_Queue we are iterating over. + + ACE_Message_Block *curr_; + // Keeps track of how far we've advanced... +}; + +template <ACE_SYNCH_DECL> +class ACE_Message_Queue_Reverse_Iterator +{ + // = TITLE + // Reverse Iterator for the <ACE_Message_Queue>. +public: + // = Initialization method. + ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &queue); + + // = Iteration methods. + int next (ACE_Message_Block *&entry); + // Pass back the <entry> that hasn't been seen in the queue. + // Returns 0 when all items have been seen, else 1. + + int done (void) const; + // Returns 1 when all items have been seen, else 0. + + int advance (void); + // Move forward by one element in the queue. Returns 0 when all the + // items in the set have been seen, else 1. + + void dump (void) const; + // Dump the state of an object. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +private: + ACE_Message_Queue <ACE_SYNCH_USE> &queue_; + // Message_Queue we are iterating over. + + ACE_Message_Block *curr_; + // Keeps track of how far we've advanced... +}; + +template <ACE_SYNCH_DECL> +class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE> +{ + // = TITLE + // A derived class which adapts the <ACE_Message_Queue> + // class in order to maintain dynamic priorities for enqueued + // <ACE_Message_Blocks> and manage the queue dynamically. + // + // = DESCRIPTION + // Priorities and queue orderings are refreshed at each enqueue + // and dequeue operation. Head and tail enqueue methods were + // made private to prevent out-of-order messages from confusing + // the pending and late portions of the queue. Messages in the + // pending portion of the queue whose dynamic priority becomes + // negative are placed into the late portion of the queue. + // Messages in the late portion of the queue whose dynamic + // priority becomes positive are dropped. These behaviors + // support a limited schedule overrun corresponding to one full + // cycle through dynamic priority values. These behaviors can + // be modified in derived classes by providing alternative + // definitions for the appropriate virtual methods. + // +public: + // = Initialization and termination methods. + ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, + size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + + virtual ~ACE_Dynamic_Message_Queue (void); + // Close down the message queue and release all resources. + + ACE_ALLOC_HOOK_DECLARE; + // Declare the dynamic allocation hooks. + +protected: + virtual int enqueue_i (ACE_Message_Block *new_item); + // Enqueue an <ACE_Message_Block *> in accordance with its priority. + // priority may be *dynamic* or *static* or a combination or *both* + // It calls the priority evaluation function passed into the Dynamic + // Message Queue constructor to update the priorities of all + // enqueued messages. + + virtual int dequeue_head_i (ACE_Message_Block *&first_item); + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. + + virtual int refresh_priorities (const ACE_Time_Value & tv); + // Refresh the priorities in the queue according to a specific + // priority assignment function. + + virtual int refresh_queue (const ACE_Time_Value & tv); + // Refresh the order of messages in the queue after refreshing their + // priorities. + + ACE_Message_Block *pending_list_tail_; + // Pointer to tail of the pending messages (those whose priority is + // and has been non-negative) in the <ACE_Message_Block list>. + + ACE_Dynamic_Message_Strategy &message_strategy_; + // Pointer to a dynamic priority evaluation function. + +private: + // = Disallow these operations. + ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) + ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) + + // = These methods can wierdness in dynamically prioritized queue. + + // Disallow their use until and unless a coherent semantics for head + // and tail enqueueing can be identified. + ACE_UNIMPLEMENTED_FUNC (virtual int + enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0)) + ACE_UNIMPLEMENTED_FUNC (virtual int + enqueue_head (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0)) + + ACE_UNIMPLEMENTED_FUNC (virtual int + peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *tv = 0)) + // Since messages are *dynamically* prioritized, it is not possible + // to guarantee that the message at the head of the queue when this + // method is called will still be at the head when the next method + // is called: disallow its use until and unless a coherent semantics + // for peeking at the head of the queue can be identified. +}; + +template <ACE_SYNCH_DECL> +class ACE_Export ACE_Message_Queue_Factory +{ + // = TITLE + // ACE_Message_Queue_Factory is a static factory class template which + // provides a separate factory method for each of the major kinds of + // priority based message dispatching: static, earliest deadline first + // (EDF), and minimum laxity first (MLF). + // + // = DESCRIPTION + // The ACE_Dynamic_Message_Queue class assumes responsibility for + // releasing the resources of the strategy with which it was + // constructed: the user of a message queue constructed by + // any of these factory methods is only responsible for + // ensuring destruction of the message queue itself. + +public: + static ACE_Message_Queue<ACE_SYNCH_USE> * + create_static_message_queue (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0); + // factory method for a statically prioritized ACE_Message_Queue + + static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * + create_deadline_message_queue (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0, + u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 + u_long static_bit_field_shift = 10, // 10 low order bits + u_long pending_threshold = 0x200000UL, // 2^(22-1) + u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 + u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) + // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue + + static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * + create_laxity_message_queue (size_t hwm = DEFAULT_HWM, + size_t lwm = DEFAULT_LWM, + ACE_Notification_Strategy * = 0, + u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 + u_long static_bit_field_shift = 10, // 10 low order bits + u_long pending_threshold = 0x200000UL, // 2^(22-1) + u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 + u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) + // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue +}; + +#if defined (__ACE_INLINE__) +#include "ace/Message_Queue_T.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ace/Message_Queue_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Message_Queue_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_MESSAGE_QUEUE_T_H */ |