diff options
author | cdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-30 23:11:15 +0000 |
---|---|---|
committer | cdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-06-30 23:11:15 +0000 |
commit | f0e720811d7cf00b02e81d9153eaec82f47ad2a2 (patch) | |
tree | b45b77cecc23d206426ec9009ce6474bead95fce | |
parent | cb091e2772e88b76edd2749c49ad34c7892ced61 (diff) | |
download | ATCD-f0e720811d7cf00b02e81d9153eaec82f47ad2a2.tar.gz |
Dynamic Priority Queue test
-rw-r--r-- | ChangeLog-98b | 19 | ||||
-rw-r--r-- | ace/Message_Block.cpp | 108 | ||||
-rw-r--r-- | ace/Message_Block.h | 49 | ||||
-rw-r--r-- | ace/Message_Block.i | 2 | ||||
-rw-r--r-- | ace/Message_Queue_T.cpp | 306 | ||||
-rw-r--r-- | ace/Message_Queue_T.h | 65 | ||||
-rw-r--r-- | tests/Dynamic_Priority_Test.cpp | 284 | ||||
-rw-r--r-- | tests/Dynamic_Priority_Test.dsp | 73 | ||||
-rw-r--r-- | tests/Makefile | 73 | ||||
-rw-r--r-- | tests/run_tests.bat | 1 | ||||
-rwxr-xr-x | tests/run_tests.sh | 1 | ||||
-rw-r--r-- | tests/tests.dsw | 12 |
12 files changed, 906 insertions, 87 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b index f0da6e5c8d2..e9eb4164fa4 100644 --- a/ChangeLog-98b +++ b/ChangeLog-98b @@ -1,3 +1,22 @@ +Tue Jun 30 18:03:15 1998 Chris Gill <cdgill@cs.wustl.edu> + + * ace/Message_Queue_T.{cpp, h} + ace/Message_Block.{cpp, h, i} + + Reorganized ACE_Dynamic_Message_Queue and related classes, + fixed bugs that turned up in testing: it's ready to put + on the road and see how it runs ;-) + + * tests/Dynamic_Priority_Test.{cpp, dsp} + tests/tests.dsw + tests/Makefile + tests/run_tests.{bat, sh} + + Added a test for the static and dynamic (both deadline and laxity + based) message queues, which assigns various message attributes, + pushes messages into the queue, and makes sure the resulting + dequeue order is correct for the given kind of queue. + Tue Jun 30 14:04:42 1998 Steve Huston <shuston@riverace.com> * ace/Message_Queue_T.cpp: Fixed references to an undeclared diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp index e4fb1c960d5..83fe322e9e9 100644 --- a/ace/Message_Block.cpp +++ b/ace/Message_Block.cpp @@ -765,9 +765,23 @@ ACE_Dynamic_Message_Strategy::~ACE_Dynamic_Message_Strategy () } // dtor -/////////////////////////////////////// + +int +ACE_Dynamic_Message_Strategy::drop_message (ACE_Message_Block * &mb) +{ + ACE_UNUSED_ARG (mb); + + return 0; +} + // cleanup policy for a message that is later than can be represented, + // and is being dropped from a dynamic message queue (this is a default + // method definition that does nothing, which derived classes may override + // to do things like deleting the message block object, etc). + + +///////////////////////////////////////// // class ACE_Deadline_Message_Strategy // -/////////////////////////////////////// +///////////////////////////////////////// ACE_Deadline_Message_Strategy:: ACE_Deadline_Message_Strategy (u_long static_bit_field_mask, u_long static_bit_field_shift, @@ -815,7 +829,9 @@ ACE_Deadline_Message_Strategy::update_priority (ACE_Message_Block & mb, if (priority > max_late) { - priority = max_late; + // if the message is later than can be represented, its priority is 0. + mb.msg_priority (0); + return 0; } } else @@ -864,6 +880,45 @@ ACE_Deadline_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb, } // returns true if the message is later than can can be represented +///////////////////////////////////////////////// +// class ACE_Deadline_Cleanup_Message_Strategy // +///////////////////////////////////////////////// + +ACE_Deadline_Cleanup_Message_Strategy:: ACE_Deadline_Cleanup_Message_Strategy ( + u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long pending_threshold, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) + + : ACE_Deadline_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + pending_threshold, + dynamic_priority_max, + dynamic_priority_offset) +{ +} +// ctor + +ACE_Deadline_Cleanup_Message_Strategy::~ACE_Deadline_Cleanup_Message_Strategy () +{ +} +// dtor + +int +ACE_Deadline_Cleanup_Message_Strategy::drop_message (ACE_Message_Block * &mb) +{ + // free the memory + delete mb; + + // zero passed pointer to let caller know it's gone + mb = 0; + + return 0; +} + // deletion cleanup policy for a message that is later than can be + // represented, and is being dropped from a dynamic message queue + /////////////////////////////////////// // class ACE_Laxity_Message_Strategy // /////////////////////////////////////// @@ -916,7 +971,9 @@ ACE_Laxity_Message_Strategy::update_priority (ACE_Message_Block & mb, if (priority > max_late) { - priority = max_late; + // if the message is later than can be represented, its priority is 0. + mb.msg_priority (0); + return 0; } } else @@ -966,6 +1023,49 @@ ACE_Laxity_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb, } // returns true if the message is later than can can be represented +///////////////////////////////////////////////// +// class ACE_Laxity_Cleanup_Message_Strategy // +///////////////////////////////////////////////// + +ACE_Laxity_Cleanup_Message_Strategy:: ACE_Laxity_Cleanup_Message_Strategy ( + u_long static_bit_field_mask, + u_long static_bit_field_shift, + u_long pending_threshold, + u_long dynamic_priority_max, + u_long dynamic_priority_offset) + + : ACE_Laxity_Message_Strategy (static_bit_field_mask, + static_bit_field_shift, + pending_threshold, + dynamic_priority_max, + dynamic_priority_offset) +{ +} +// ctor + +ACE_Laxity_Cleanup_Message_Strategy::~ACE_Laxity_Cleanup_Message_Strategy () +{ +} +// dtor + +int +ACE_Laxity_Cleanup_Message_Strategy::drop_message (ACE_Message_Block * &mb) +{ + // free the memory + delete mb; + + // zero passed pointer to let caller know it's gone + mb = 0; + + return 0; +} + // deletion cleanup policy for a message that is later than can be + // represented, and is being dropped from a dynamic message queue + + + + + #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) // These specializations aren't needed for the ACE library because // Service_Config.cpp has them: diff --git a/ace/Message_Block.h b/ace/Message_Block.h index 2f708fbc490..6a20e85f332 100644 --- a/ace/Message_Block.h +++ b/ace/Message_Block.h @@ -555,7 +555,7 @@ public: // updates the synamic priority bit field but does not // alter the static priority bit field - int is_pending (const ACE_Message_Block & mb, + int is_pending (const ACE_Message_Block & mb, const ACE_Time_Value & tv); // returns true if the message has a pending (not late) priority value @@ -563,6 +563,12 @@ public: const ACE_Time_Value & tv) = 0; // returns true if the message is later than can can be represented + virtual int drop_message (ACE_Message_Block * &mb); + // cleanup policy for a message that is later than can be represented, + // and is being dropped from a dynamic message queue (this is a default + // method definition that does nothing, which derived classes may override + // to do things like deleting the message block object, etc). + u_long static_bit_field_mask (void); // get static bit field mask @@ -639,6 +645,27 @@ public: // returns true if the message is later than can can be represented }; + +class ACE_Export ACE_Deadline_Cleanup_Message_Strategy : public ACE_Deadline_Message_Strategy +{ +public: + + ACE_Deadline_Cleanup_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 + u_long static_bit_field_shift = 10, // 10 low order bits + u_long pending_threshold = 0x200000UL, // 2^(22-1) + u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 + u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) + // ctor, with all arguments defaulted + + virtual ~ACE_Deadline_Cleanup_Message_Strategy (); + // virtual dtor + + virtual int drop_message (ACE_Message_Block * &mb); + // deletion cleanup policy for a message that is later than can be + // represented, and is being dropped from a dynamic message queue +}; + + class ACE_Export ACE_Laxity_Message_Strategy : public ACE_Dynamic_Message_Strategy { public: @@ -666,6 +693,26 @@ public: }; +class ACE_Export ACE_Laxity_Cleanup_Message_Strategy : public ACE_Laxity_Message_Strategy +{ +public: + + ACE_Laxity_Cleanup_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 + u_long static_bit_field_shift = 10, // 10 low order bits + u_long pending_threshold = 0x200000UL, // 2^(22-1) + u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 + u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) + // ctor, with all arguments defaulted + + virtual ~ACE_Laxity_Cleanup_Message_Strategy (); + // virtual dtor + + virtual int drop_message (ACE_Message_Block * &mb); + // deletion cleanup policy for a message that is later than can be + // represented, and is being dropped from a dynamic message queue +}; + + #if defined (__ACE_INLINE__) #include "ace/Message_Block.i" #endif /* __ACE_INLINE__ */ diff --git a/ace/Message_Block.i b/ace/Message_Block.i index 62774e6fb7d..a4aae16d557 100644 --- a/ace/Message_Block.i +++ b/ace/Message_Block.i @@ -365,7 +365,7 @@ ACE_Message_Block::locking_strategy (ACE_Lock *nls) } -ACE_INLINE int +ACE_INLINE int ACE_Dynamic_Message_Strategy::is_pending (const ACE_Message_Block & mb, const ACE_Time_Value & tv) { diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp index 5fa605ef6d3..1fda2393cb5 100644 --- a/ace/Message_Queue_T.cpp +++ b/ace/Message_Queue_T.cpp @@ -733,7 +733,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::notify (void) ///////////////////////////////////// // = Initialization and termination methods. -template <ACE_SYNCH_DECL> +template <ACE_SYNCH_DECL> ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue ( ACE_Dynamic_Message_Strategy & message_strategy, size_t hwm, @@ -743,35 +743,46 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue ( , message_strategy_ (message_strategy) { // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the - // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor + // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor } -template <ACE_SYNCH_DECL> +template <ACE_SYNCH_DECL> ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void) { delete &message_strategy_; } // dtor: free message strategy and let base class dtor do the rest -template <ACE_SYNCH_DECL> int +template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) { ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); int result = 0; - ACE_Time_Value tv(0); - - // refresh dynamic priority of the new message - result = this->message_strategy_.update_priority (*new_item, tv); // get the current time - ACE_Time_Value current_time = ACE_OS::gettimeofday (); + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh dynamic priority of the new message + result = message_strategy_.update_priority (*new_item, current_time); + if (result < 0) + { + return result; + } // refresh dynamic priorities of messages in the queue - this->refresh_priorities (current_time); + result = this->refresh_priorities (current_time); + if (result < 0) + { + return result; + } // reorganize the queue according to the new priorities - this->refresh_queue (current_time); + result = this->refresh_queue (current_time); + if (result < 0) + { + return result; + } // invoke the base class method result = ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (new_item); @@ -781,19 +792,28 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item // Enqueue an <ACE_Message_Block *> in accordance with its priority. // priority may be *dynamic* or *static* or a combination or *both* // It calls the priority evaluation function passed into the Dynamic - // Message Queue constructor to update the priorities of all enqueued + // Message Queue constructor to update the priorities of all enqueued // messages. -template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *tv) { - ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); + + if (this->deactivated_) + { + errno = ESHUTDOWN; + return -1; + } int result = 0; // get the current time ACE_Time_Value current_time = ACE_OS::gettimeofday (); - + // refresh dynamic priorities of messages in the queue result = this->refresh_priorities (current_time); if (result < 0) @@ -801,35 +821,29 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&fi return result; } - // reorganize the queue according to the new priorities + // reorganize the queue according to the new priorities, + // possibly dropping messages which are later than can + // be represented by the range of priority values result = this->refresh_queue (current_time); if (result < 0) { return result; } - // if there is only one message in the pending list, - // the pending list will be empty after a *successful* - // dequeue operation - int empty_pending = - (ACE_Message_Queue<ACE_SYNCH_USE>::head_ == pending_list_tail_) ? 1 : 0; - - // invoke the base class method - result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item); - - // null out the pending list tail pointer if - // the pending list is now empty - if ((empty_pending) && (result > 0)) + // *now* it's appropriate to wait for an enqueued item + result = this->wait_not_empty_cond (ace_mon, tv); + if (result == -1) { - pending_list_tail_ = 0; + return result; } - return result; + // invoke the internal virtual method + return this->dequeue_head_i (first_item); } // Dequeue and return the <ACE_Message_Block *> at the head of the // queue. -template <ACE_SYNCH_DECL> int +template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv) { int result = 0; @@ -839,12 +853,12 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Val ACE_Message_Block *temp = ACE_Message_Queue<ACE_SYNCH_USE>::head_; while (temp) { - result = this->message_strategy_.update_priority (*temp, tv); + result = message_strategy_.update_priority (*temp, tv); if (result < 0) { break; } - + temp = temp->next (); } @@ -853,58 +867,238 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Val // refresh the priorities in the queue according // to a specific priority assignment function -template <ACE_SYNCH_DECL> int +template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv) { - // first, drop any messages from the queue and delete them: - // reference counting at the data block level means that the - // underlying data block will not be deleted if another - // message block is still pointing to it. - ACE_Message_Block *temp = (pending_list_tail_) - ? pending_list_tail_->next () - : ACE_Message_Queue<ACE_SYNCH_USE>::head_; + // Remove messages that are later than the priority range can represent + int result = remove_stale_messages (tv); + if (result < 0) + { + return result; + } + + // Refresh the order of messages in the queue, + // putting pending messages ahead of late messages + return reorder_queue (tv); +} + // refresh the order of messages in the queue + // after refreshing their priorities + + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_stale_messages (const ACE_Time_Value & tv) +{ + int result = 0; - while (temp) + // start at the beginning of the list + ACE_Message_Block *current = head_; + + // maintain a list of dropped messages to + // be appended to the end of the list after + // the sweep is complete + ACE_Message_Block *append_list_head = 0; + ACE_Message_Block *append_list_tail = 0; + + while (current) { // messages that have overflowed the given time bounds must be removed - if (message_strategy_.is_beyond_late (*temp, tv)) + if (message_strategy_.is_beyond_late (*current, tv)) { // find the end of the chain of overflowed messages - ACE_Message_Block *remove_tail = temp; + ACE_Message_Block *remove_head = current; + ACE_Message_Block *remove_tail = current; while ((remove_tail) && (remove_tail->next ()) && message_strategy_.is_beyond_late (*(remove_tail->next ()), tv)) { + // extend the chain of messages to be removed remove_tail = remove_tail->next (); } - temp = remove_tail->next (); + // fix up list pointers to bypass the overflowed message chain + if (remove_tail->next ()) { - remove_tail->next ()->prev (0); + remove_tail->next ()->prev (remove_head->prev ()); + } + else + { + tail_ = remove_head->prev (); } - else if (remove_tail->prev ()) + + if (remove_head->prev ()) { - remove_tail->prev ()->next (0); + remove_head->prev ()->next (remove_tail->next ()); } else { - ACE_Message_Queue<ACE_SYNCH_USE>::head_ = 0; - ACE_Message_Queue<ACE_SYNCH_USE>::tail_ = 0; + head_ = remove_tail->next (); } - remove_tail->prev (0); + + // move the current pointer past the end of the chain + current = remove_tail->next (); + + // Cut the chain of overflowed messages out of the list + remove_head->prev (0); remove_tail->next (0); + + // Call strategy's drop_message method on each overflowed message. + // Cannot just delete each message even though reference counting + // at the data bloc level means that the underlying data block will + // not be deleted if another message block is still pointing to it. + // If the entire set of message blocks is known in advance, they may + // be allocated on the stack instead of the heap (to speed performance), + // and the caller *cannot* surrender ownership of the memory to the + // list. Putting this policy in the strategy allows the correct memory + // management scheme to be configured in either case. + ACE_Message_Block *temp1 = remove_head; + ACE_Message_Block *temp2 = remove_head->next (); + ACE_Message_Block *size_temp = 0; + size_t msg_size = 0; + while (temp1) + { + // Make sure to count *all* the bytes in a composite message!!! + for (size_temp = temp1, msg_size = 0; + size_temp != 0; + size_temp = size_temp->cont ()) + { + msg_size += size_temp->size (); + } + + result = message_strategy_.drop_message (temp1); + if (result < 0) + { + return result; + } + + if (temp1) + { + // if the message was not destroyed, zero out its priority and + // put it on the list to append to the back of the queue + temp1->msg_priority (0); + temp1->next (0); + if (append_list_tail) + { + temp1->prev (append_list_tail); + append_list_tail->next (temp1); + } + else + { + temp1->prev (0); + append_list_head = temp1; + } + append_list_tail = temp1; + } + else + { + // if the message was destroyed, decrease the message + // count and byte count in the message queue + this->cur_count_--; + this->cur_bytes_ -= msg_size; + } - temp = remove_tail->next (); + temp1 = temp2; + temp2 = temp2 ? temp2->next () : temp2; + } + } + else + { + current = current->next (); + } + } + // append any saved dropped messages to the end of the queue + if (append_list_tail) + { + if (tail_) + { + tail_->next (append_list_head); + append_list_head->prev (tail_); + tail_ = append_list_tail; } else { - temp = temp->next (); + head_ = append_list_head; + tail_ = append_list_tail; } } + + return result; } - // refresh the order of messages in the queue - // after refreshing their priorities + // Remove messages that are later than the priority range can represent. + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::reorder_queue (const ACE_Time_Value & tv) +{ + // if the queue is not empty, and the first message is late, need to reorder + if ((head_) && (! message_strategy_.is_pending (*head_, tv))) + { + // find the end of the chain of newly late messages + // (since the last time the queue was reordered) + ACE_Message_Block *reorder_head = head_; + ACE_Message_Block *reorder_tail = head_; + while ((reorder_tail) && (reorder_tail->next ()) && + reorder_tail->next ()->msg_priority () <= reorder_head->msg_priority ()) + { + // extend the chain of messages to be removed + reorder_tail = reorder_tail->next (); + } + + // if a proper subset of the queue is out of order, reorganize the queue + if (reorder_tail != tail_) + { + // fix up list pointers to bypass the overflowed message chain + if (reorder_tail->next ()) + { + reorder_tail->next ()->prev (reorder_head->prev ()); + } + else + { + tail_ = reorder_head->prev (); + } + if (reorder_head->prev ()) + { + reorder_head->prev ()->next (reorder_tail->next ()); + } + else + { + head_ = reorder_tail->next (); + } + } + } + + return 0; +} + // Refresh the order of messages in the queue. + + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head ( + ACE_Message_Block *&first_item, + ACE_Time_Value *tv) +{ + return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, tv); +} + // private method to hide public base class method: just calls base class method + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_tail ( + ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + return ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (new_item, timeout); +} + // private method to hide public base class method: just calls base class method + + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head ( + ACE_Message_Block *new_item, + ACE_Time_Value *timeout) +{ + return ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_head (new_item, timeout); +} + // private method to hide public base class method: just calls base class method + ///////////////////////////////////// diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h index 040c8494957..df7f13d56de 100644 --- a/ace/Message_Queue_T.h +++ b/ace/Message_Queue_T.h @@ -359,6 +359,16 @@ class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE> // be modified in derived classes by providing alternative // definitions for the appropriate virtual methods. // + // Caveat: the virtual methods enqueue_tail, enqueue_head, + // and peek_dequeue_head were made private, but for some + // compilers it is possible to gain access to these methods + // through nefarious means: achieving this may result in + // highly unpredictable results, as expectations about + // where a message starts or remains between method + // invocations may not hold for a dynamically managed + // message queue. + + public: // = Initialization and termination methods. ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, @@ -369,10 +379,17 @@ public: virtual ~ACE_Dynamic_Message_Queue (void); // Close down the message queue and release all resources. + virtual int dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *timeout = 0); + // Dequeue and return the <ACE_Message_Block *> at the head of the + // queue. Returns -1 on failure, else the number of items still on + // the queue. + ACE_ALLOC_HOOK_DECLARE; // Declare the dynamic allocation hooks. protected: + virtual int enqueue_i (ACE_Message_Block *new_item); // Enqueue an <ACE_Message_Block *> in accordance with its priority. // priority may be *dynamic* or *static* or a combination or *both* @@ -380,10 +397,6 @@ protected: // Message Queue constructor to update the priorities of all // enqueued messages. - virtual int dequeue_head_i (ACE_Message_Block *&first_item); - // Dequeue and return the <ACE_Message_Block *> at the head of the - // queue. - virtual int refresh_priorities (const ACE_Time_Value & tv); // Refresh the priorities in the queue according to a specific // priority assignment function. @@ -392,6 +405,12 @@ protected: // Refresh the order of messages in the queue after refreshing their // priorities. + virtual int remove_stale_messages (const ACE_Time_Value & tv); + // Remove messages that are later than the priority range can represent. + + virtual int reorder_queue (const ACE_Time_Value & tv); + // Refresh the order of messages in the queue. + ACE_Message_Block *pending_list_tail_; // Pointer to tail of the pending messages (those whose priority is // and has been non-negative) in the <ACE_Message_Block list>. @@ -400,33 +419,29 @@ protected: // Pointer to a dynamic priority evaluation function. private: - // = Disallow these operations. + // = Disallow public access to these operations. + ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) ACE_UNIMPLEMENTED_FUNC (ACE_Dynamic_Message_Queue (const ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> &)) - // = These methods can wierdness in dynamically prioritized queue. - - // Disallow their use until and unless a coherent semantics for head - // and tail enqueueing can be identified. - ACE_UNIMPLEMENTED_FUNC (virtual int - enqueue_tail (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0)) - ACE_UNIMPLEMENTED_FUNC (virtual int - enqueue_head (ACE_Message_Block *new_item, - ACE_Time_Value *timeout = 0)) - - ACE_UNIMPLEMENTED_FUNC (virtual int - peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv = 0)) - // Since messages are *dynamically* prioritized, it is not possible - // to guarantee that the message at the head of the queue when this - // method is called will still be at the head when the next method - // is called: disallow its use until and unless a coherent semantics - // for peeking at the head of the queue can be identified. + // provide definitions for these (just call base class method), + // but make them private so they're not accessible outside the class + + virtual int peek_dequeue_head (ACE_Message_Block *&first_item, + ACE_Time_Value *tv = 0); + // private method to hide public base class method: just calls base class method + + virtual int enqueue_tail (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // private method to hide public base class method: just calls base class method + + virtual int enqueue_head (ACE_Message_Block *new_item, + ACE_Time_Value *timeout = 0); + // private method to hide public base class method: just calls base class method }; template <ACE_SYNCH_DECL> -class ACE_Export ACE_Message_Queue_Factory +class ACE_Message_Queue_Factory { // = TITLE // ACE_Message_Queue_Factory is a static factory class template which diff --git a/tests/Dynamic_Priority_Test.cpp b/tests/Dynamic_Priority_Test.cpp new file mode 100644 index 00000000000..503e165881e --- /dev/null +++ b/tests/Dynamic_Priority_Test.cpp @@ -0,0 +1,284 @@ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// tests +// +// = FILENAME +// Dynamic_Priority_Test.cpp (based on Priority_Buffer_Test.cpp) +// +// = DESCRIPTION +// This is a test to verify and illustrate the static and dynamic +// priority mechanisms of the ACE_Message_Queue class and the +// ACE_Dynamic_Message_Queue class. As in the Priority_Buffer_Test, +// a producer generates messages and enqueues them, and a consumer +// dequeues them and checks their ordering. +// +// In these tests, every effort is made to ensure that there is plenty +// of time for the messages to be enqueued and dequeued, with messages +// that *should* meet their deadlines actually meeting them, +// while messages that should miss their deadlines are delayed +// so that they actually miss them. It is, however, remotely +// possible that this test could yield a false negative: +// the dynamic queues could work correctly but due to timing +// variations the test could indicate failure. +// +// Three message queues are obtained from the message queue factory, +// one static, two dynamic (one deadline based, and one laxity based) +// and the same supplier behavior is used each time: the messages +// are preallocated and their static information valued, the current +// time is obtained and deadlines are set, with half of the messages +// given late deadlines, and the other half of the messages given +// reachable deadlines. The producer then immediately enqueues all +// messages. +// +// In each test, the consumer is passed the filled queue and a string with +// the expected order in which the messages should dequeue. +// +// = AUTHOR +// Chris Gill +// +// ============================================================================ + +#include "ace/Message_Queue.h" +#include "ace/Thread_Manager.h" +#include "test_config.h" + +#if defined(__BORLANDC__) && __BORLANDC__ >= 0x0530 +USELIB("..\ace\aced.lib"); +//--------------------------------------------------------------------------- +#endif /* defined(__BORLANDC__) && __BORLANDC__ >= 0x0530 */ + +// structure used to pass arguments to test functions + +struct ArgStruct +{ + ACE_Message_Queue<ACE_MT_SYNCH> *queue_; + const char *order_string_; + ACE_Message_Block **array_; +}; + +// order in which messages are sent +static const char send_order [] = "abcdefghijklmnop"; + +// order in which messages are received with static prioritization +static const char static_receipt_order [] = "ponmlkjihgfedcba"; + +// order in which messages are received with deadline prioritization +static const char deadline_receipt_order [] = "hgfedcbaponmlkji"; + +// order in which messages are received with laxity prioritization +static const char laxity_receipt_order [] = "hfgedbcapnomljki"; + +// fast and slow execution time values (sec, usec) +static const ACE_Time_Value fast_execution (0, 10); +static const ACE_Time_Value slow_execution (0, 100); + +// Make the queue be capable of being *very* large. +static const long max_queue = LONG_MAX; + +// The consumer dequeues a message from the passed Message_Queue, +// and checks its data character against the passed string of characters +// which has the expected ordering. The supplier and consumer do not +// allocate or deallocate messages, to avoid timing delays and timing +// jitter in the test: the main function is responsible for all +// initialization allocation and cleanup before, between, and after +// (but not during) the transfer of messages from the supplier to the +// consumer. + +static void * +consumer (void * args) +{ + ACE_ASSERT (args != 0); + + ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_; + const char *receipt_order = ((ArgStruct *) args)->order_string_; + + ACE_ASSERT (receipt_order != 0); + ACE_ASSERT (msg_queue != 0); + + u_int local_count = 0; + + // Keep looping, reading a message out of the queue, until we + // reach the end of the receipt order string, which signals us to quit. + for (const char *expected = receipt_order; *expected != '\0'; ++expected) + { + ACE_Message_Block *mb = 0; + + int result = msg_queue->dequeue_head (mb); + + if (result == -1) + break; + + local_count++; + + ACE_ASSERT (*expected == *mb->rd_ptr ()); + } + + ACE_ASSERT (local_count == ACE_OS::strlen (receipt_order)); + return 0; +} + +// The producer runs through the passed send string, setting the read +// pointer of the current message to the current character position in +// the string, and then queueing the message in the message list, where +// it is removed by the consumer thread. + +static void * +producer (void *args) +{ + ACE_ASSERT (args != 0); + + ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_; + const char *send_order = ((ArgStruct *) args)->order_string_; + ACE_Message_Block **block_array = ((ArgStruct *) args)->array_; + + ACE_ASSERT (send_order != 0); + ACE_ASSERT (block_array != 0); + + // iterate through the send order string and the message block array, + // setting the current message block's read pointer to the current + // position in the send order string. + int local_count; + const char *c; + for (local_count = 0, c = send_order; *c != '\0'; ++local_count, ++c) + { + // point to the current message block + ACE_Message_Block *mb = block_array [local_count]; + ACE_ASSERT (mb != 0); + + // Set the current send character in the current message block + // at its read pointer position, and adjust the write pointer + *mb->rd_ptr () = *c; + mb->wr_ptr (1); + + // Enqueue the message block in priority order. + if (msg_queue->enqueue_prio (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("(%t) %p\n"), ASYS_TEXT ("put_next")), 0); + } + + return 0; +} + + +int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_order, const char *receipt_order) +{ + u_int i = 0; + u_int array_size = ACE_OS::strlen (send_order); + + ACE_ASSERT (msg_queue != 0); + ACE_ASSERT (send_order != 0); + ACE_ASSERT (receipt_order != 0); + ACE_ASSERT (ACE_OS::strlen (send_order) == ACE_OS::strlen (receipt_order)); + + ArgStruct supplier_args, consumer_args; + + supplier_args.queue_ = msg_queue; + supplier_args.order_string_ = send_order; + + // allocate message blocks, fill in pointer array, set static information + ACE_NEW_RETURN (supplier_args.array_, ACE_Message_Block * [array_size], -1); + for (i = 0; i < array_size; ++i) + { + // construct a message new block off the heap, to hold a single character + ACE_NEW_RETURN (supplier_args.array_[i], ACE_Message_Block (1), -1); + + // assign static (minimal) message priority in ascending order + supplier_args.array_[i]->msg_priority (i); + + // assign every other message short or long execution time + supplier_args.array_[i]->msg_execution_time (((i % 2) ? slow_execution : fast_execution)); + } + + consumer_args.queue_ = msg_queue; + consumer_args.order_string_ = receipt_order; + consumer_args.array_ = 0; + + // Construct pending and late absolute deadline times. + + ACE_Time_Value current_time (0, 0); + ACE_Time_Value future_deadline (1, 0); + ACE_Time_Value near_deadline (0, 500000); + ACE_Time_Value recent_deadline (0, -1); + ACE_Time_Value past_deadline (0, -500000); + + current_time = ACE_OS::gettimeofday (); + + future_deadline += current_time; + near_deadline += current_time; + recent_deadline += current_time; + past_deadline += current_time; + + // Set absolute time of deadline associated with the message. + for (i = 0; i < array_size; ++i) + { + switch ((4*i)/array_size) + { + case 0: + supplier_args.array_[i]->msg_deadline_time (future_deadline); + break; + + case 1: + supplier_args.array_[i]->msg_deadline_time (near_deadline); + break; + + case 2: + supplier_args.array_[i]->msg_deadline_time (recent_deadline); + break; + + case 3: + supplier_args.array_[i]->msg_deadline_time (past_deadline); + break; + + // should never reach here, but its better to make sure + default: + ACE_ASSERT ((4*i)/array_size < 4); + break; + } + } + + // run the producer + producer (&supplier_args); + + // run the consumer + consumer (&consumer_args); + + // free all the allocated message blocks + for (i = 0; i < array_size; ++i) + { + delete supplier_args.array_[i]; + } + + // free the allocated pointer array + delete [] supplier_args.array_; + + return 0; +} + + +int +main (int, ASYS_TCHAR *[]) +{ + ACE_START_TEST (ASYS_TEXT ("Dynamic_Priority_Test")); + + ACE_Message_Queue<ACE_MT_SYNCH> *test_queue = 0; + + test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue (max_queue); + run_test (test_queue, send_order, static_receipt_order); + delete test_queue; + + test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_deadline_message_queue (max_queue); + run_test (test_queue, send_order, deadline_receipt_order); + delete test_queue; + + test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_laxity_message_queue (max_queue); + run_test (test_queue, send_order, laxity_receipt_order); + delete test_queue; + + ACE_END_TEST; + return 0; +} + + + diff --git a/tests/Dynamic_Priority_Test.dsp b/tests/Dynamic_Priority_Test.dsp new file mode 100644 index 00000000000..fb6ac298771 --- /dev/null +++ b/tests/Dynamic_Priority_Test.dsp @@ -0,0 +1,73 @@ +# Microsoft Developer Studio Project File - Name="Dynamic_Priority_Test" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 5.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Console Application" 0x0103
+
+CFG=Dynamic_Priority_Test - Win32 Debug
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "Dynamic_Priority_Test.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "Dynamic_Priority_Test.mak"\
+ CFG="Dynamic_Priority_Test - Win32 Debug"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "Dynamic_Priority_Test - Win32 Debug" (based on\
+ "Win32 (x86) Console Application")
+!MESSAGE
+
+# Begin Project
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+RSC=rc.exe
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Debug"
+# PROP BASE Intermediate_Dir "Debug"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir "."
+# PROP Intermediate_Dir ".\DLL\Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /W3 /Gm /GX /Zi /Od /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /D "_MBCS" /YX /FD /c
+# ADD CPP /nologo /MDd /W3 /Gm /GX /Zi /Od /I "..\\" /D "WIN32" /D "_DEBUG" /D "_CONSOLE" /FD /c
+# SUBTRACT CPP /YX
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /d "_DEBUG"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib user32.lib gdi32.lib winspool.lib comdlg32.lib advapi32.lib shell32.lib ole32.lib oleaut32.lib uuid.lib odbc32.lib odbccp32.lib /nologo /subsystem:console /debug /machine:I386 /pdbtype:sept
+# ADD LINK32 aced.lib /nologo /subsystem:console /debug /machine:I386 /libpath:"..\ace"
+# SUBTRACT LINK32 /pdbtype:<none>
+# Begin Target
+
+# Name "Dynamic_Priority_Test - Win32 Debug"
+# Begin Group "Source Files"
+
+# PROP Default_Filter ".cpp .i"
+# Begin Source File
+
+SOURCE=.\Dynamic_Priority_Test.cpp
+# End Source File
+# End Group
+# Begin Group "Header Files"
+
+# PROP Default_Filter ".h"
+# Begin Source File
+
+SOURCE=.\test_config.h
+# End Source File
+# End Group
+# End Target
+# End Project
diff --git a/tests/Makefile b/tests/Makefile index 26a72f9c27e..42372936de3 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -35,6 +35,7 @@ BIN = Aio_Platform_Test \ Process_Mutex_Test \ Process_Strategy_Test \ Priority_Buffer_Test \ + Dynamic_Priority_Test \ Priority_Task_Test \ Priority_Reactor_Test \ Pipe_Test \ @@ -1553,6 +1554,78 @@ endif $(ACE_ROOT)/ace/Message_Queue.i \ $(ACE_ROOT)/ace/Thread_Manager.h \ $(ACE_ROOT)/ace/Thread_Manager.i test_config.h +.obj/Dynamic_Priority_Test.o .obj/Dynamic_Priority_Test.so .shobj/Dynamic_Priority_Test.o .shobj/Dynamic_Priority_Test.so: Dynamic_Priority_Test.cpp \ + $(ACE_ROOT)/ace/Message_Queue.h \ + $(ACE_ROOT)/ace/Message_Block.h \ + $(ACE_ROOT)/ace/ACE.h \ + $(ACE_ROOT)/ace/OS.h \ + $(ACE_ROOT)/ace/inc_user_config.h \ + $(ACE_ROOT)/ace/config.h \ + $(ACE_ROOT)/ace/streams.h \ + $(ACE_ROOT)/ace/Basic_Types.h \ + $(ACE_ROOT)/ace/Basic_Types.i \ + $(ACE_ROOT)/ace/OS.i \ + $(ACE_ROOT)/ace/Trace.h \ + $(ACE_ROOT)/ace/Log_Msg.h \ + $(ACE_ROOT)/ace/Log_Record.h \ + $(ACE_ROOT)/ace/ACE.i \ + $(ACE_ROOT)/ace/Log_Priority.h \ + $(ACE_ROOT)/ace/Log_Record.i \ + $(ACE_ROOT)/ace/Malloc.h \ + $(ACE_ROOT)/ace/Malloc.i \ + $(ACE_ROOT)/ace/Malloc_T.h \ + $(ACE_ROOT)/ace/Synch.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.h \ + $(ACE_ROOT)/ace/SV_Semaphore_Simple.i \ + $(ACE_ROOT)/ace/SV_Semaphore_Complex.i \ + $(ACE_ROOT)/ace/Synch.i \ + $(ACE_ROOT)/ace/Synch_T.h \ + $(ACE_ROOT)/ace/Event_Handler.h \ + $(ACE_ROOT)/ace/Event_Handler.i \ + $(ACE_ROOT)/ace/Synch_T.i \ + $(ACE_ROOT)/ace/Thread.h \ + $(ACE_ROOT)/ace/Thread.i \ + $(ACE_ROOT)/ace/Atomic_Op.i \ + $(ACE_ROOT)/ace/Free_List.h \ + $(ACE_ROOT)/ace/Free_List.i \ + $(ACE_ROOT)/ace/Malloc_T.i \ + $(ACE_ROOT)/ace/Memory_Pool.h \ + $(ACE_ROOT)/ace/Signal.h \ + $(ACE_ROOT)/ace/Containers.h \ + $(ACE_ROOT)/ace/Containers.i \ + $(ACE_ROOT)/ace/Signal.i \ + $(ACE_ROOT)/ace/Object_Manager.h \ + $(ACE_ROOT)/ace/Object_Manager.i \ + $(ACE_ROOT)/ace/Managed_Object.h \ + $(ACE_ROOT)/ace/Managed_Object.i \ + $(ACE_ROOT)/ace/Mem_Map.h \ + $(ACE_ROOT)/ace/Mem_Map.i \ + $(ACE_ROOT)/ace/Memory_Pool.i \ + $(ACE_ROOT)/ace/Message_Block.i \ + $(ACE_ROOT)/ace/IO_Cntl_Msg.h \ + $(ACE_ROOT)/ace/Strategies.h \ + $(ACE_ROOT)/ace/Strategies_T.h \ + $(ACE_ROOT)/ace/Service_Config.h \ + $(ACE_ROOT)/ace/Service_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.h \ + $(ACE_ROOT)/ace/Shared_Object.i \ + $(ACE_ROOT)/ace/Service_Object.i \ + $(ACE_ROOT)/ace/Service_Config.i \ + $(ACE_ROOT)/ace/Reactor.h \ + $(ACE_ROOT)/ace/Handle_Set.h \ + $(ACE_ROOT)/ace/Handle_Set.i \ + $(ACE_ROOT)/ace/Timer_Queue.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.h \ + $(ACE_ROOT)/ace/Timer_Queue_T.i \ + $(ACE_ROOT)/ace/Reactor.i \ + $(ACE_ROOT)/ace/Reactor_Impl.h \ + $(ACE_ROOT)/ace/Svc_Conf_Tokens.h \ + $(ACE_ROOT)/ace/Synch_Options.h \ + $(ACE_ROOT)/ace/Hash_Map_Manager.h \ + $(ACE_ROOT)/ace/Message_Queue.i \ + $(ACE_ROOT)/ace/Thread_Manager.h \ + $(ACE_ROOT)/ace/Thread_Manager.i test_config.h .obj/Priority_Task_Test.o .obj/Priority_Task_Test.so .shobj/Priority_Task_Test.o .shobj/Priority_Task_Test.so: Priority_Task_Test.cpp \ $(ACE_ROOT)/ace/Task.h \ $(ACE_ROOT)/ace/Service_Object.h \ diff --git a/tests/run_tests.bat b/tests/run_tests.bat index 47edfb3e66d..3f72acfe732 100644 --- a/tests/run_tests.bat +++ b/tests/run_tests.bat @@ -43,6 +43,7 @@ call %0 %dopure% Notify_Performance_Test call %0 %dopure% OrdMultiSet_Test call %0 %dopure% Pipe_Test call %0 %dopure% Priority_Buffer_Test +call %0 %dopure% Dynamic_Priority_Test call %0 %dopure% Priority_Reactor_Test call %0 %dopure% Priority_Task_Test call %0 %dopure% Process_Mutex_Test diff --git a/tests/run_tests.sh b/tests/run_tests.sh index cd66c79d4f1..39b86e789e8 100755 --- a/tests/run_tests.sh +++ b/tests/run_tests.sh @@ -115,6 +115,7 @@ run UPIPE_SAP_Test # uses UPIPE, Thread, Thread_Manager run Barrier_Test # uses Service_Config, Barrier run Buffer_Stream_Test # uses Service_Config, Module (Stream,Task, Message_Queue) run Priority_Buffer_Test # uses Service_Config, Message_Queue +run Dynamic_Priority_Test # uses ACE_Message_Queue, ACE_Dynamic_Message_Queue run Recursive_Mutex_Test # uses Service_Config, Recursive_Thread_Mutex test $chorus || test $LynxOS || run Time_Service_Test # uses libnetsvcs diff --git a/tests/tests.dsw b/tests/tests.dsw index cbe36d57073..379eae38b4d 100644 --- a/tests/tests.dsw +++ b/tests/tests.dsw @@ -63,6 +63,18 @@ Package=<4> ###############################################################################
+Project: "Dynamic_Priority_Test"=.\Dynamic_Priority_Test.dsp - Package Owner=<4>
+
+Package=<5>
+{{{
+}}}
+
+Package=<4>
+{{{
+}}}
+
+###############################################################################
+
Project: "Enum_Interfaces_Test"=.\Enum_Interfaces_Test.dsp - Package Owner=<4>
Package=<5>
|