diff options
-rw-r--r-- | ChangeLog-98b | 21 | ||||
-rw-r--r-- | ace/Message_Block.cpp | 287 | ||||
-rw-r--r-- | ace/Message_Block.h | 150 | ||||
-rw-r--r-- | ace/Message_Block.i | 131 | ||||
-rw-r--r-- | ace/Message_Queue_T.cpp | 1033 | ||||
-rw-r--r-- | ace/Message_Queue_T.h | 187 | ||||
-rw-r--r-- | tests/Dynamic_Priority_Test.cpp | 488 |
7 files changed, 1544 insertions, 753 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b index 45ef3f5344b..7f5070d5896 100644 --- a/ChangeLog-98b +++ b/ChangeLog-98b @@ -1,3 +1,24 @@ +Mon Jul 13 16:35:50 1998 Chris Gill <cdgill@cs.wustl.edu> + + * ace/Message_Block.{cpp, h, i} + ace/Message_Queue_T.{cpp, h}: + + Removed automatic deletion of beyond messages, replaced this + with a remove_messages method to be called by an external + "reaper" if at all. Reorganized dynamic message queues + to remove sources of overhead, especially in checking + message status while refreshing the queue: added separate + head and tail pointers for pending, late, and beyond late + protions of queue: only move these pointers, not messages + (except at enqueue). + + * tests/Dynamic_Priority_Test.cpp: + + Added performance tests for static and dynamic queues + which do best case, worst case, and randomized ordering + of messages, presenting each ordering to all queues + and clocking enqueue and dequeue performance. + Mon Jul 13 11:11:56 1998 Nanbor Wang <nanbor@cs.wustl.edu> * ace/Svc_Handler.h (ACE_SYNCH_USE>): Changed type of member diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp index b8a9f2fd5da..de2cbdaf666 100644 --- a/ace/Message_Block.cpp +++ b/ace/Message_Block.cpp @@ -750,14 +750,15 @@ ACE_Message_Block::operator= (const ACE_Message_Block &) ACE_Dynamic_Message_Strategy::ACE_Dynamic_Message_Strategy (u_long static_bit_field_mask, u_long static_bit_field_shift, - u_long pending_threshold, u_long dynamic_priority_max, u_long dynamic_priority_offset) : static_bit_field_mask_ (static_bit_field_mask) , static_bit_field_shift_ (static_bit_field_shift) - , pending_threshold_ (pending_threshold) , dynamic_priority_max_ (dynamic_priority_max) , dynamic_priority_offset_ (dynamic_priority_offset) + , max_late_ (0, dynamic_priority_offset - 1) + , min_pending_ (0, dynamic_priority_offset) + , pending_shift_ (0, dynamic_priority_max) { } // ctor @@ -767,18 +768,36 @@ ACE_Dynamic_Message_Strategy::~ACE_Dynamic_Message_Strategy () } // dtor - -int -ACE_Dynamic_Message_Strategy::drop_message (ACE_Message_Block * &mb) +void +ACE_Dynamic_Message_Strategy::dump (void) const { - ACE_UNUSED_ARG (mb); + ACE_TRACE ("ACE_Dynamic_Message_Strategy::dump"); - return 0; + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("static_bit_field_mask_ = %lu\n") + ASYS_TEXT ("static_bit_field_shift_ = %lu\n") + ASYS_TEXT ("dynamic_priority_max_ = %lu\n") + ASYS_TEXT ("dynamic_priority_offset_ = %lu\n") + ASYS_TEXT ("max_late_ = [%ld sec, %ld usec]\n") + ASYS_TEXT ("min_pending_ = [%ld sec, %ld usec]\n") + ASYS_TEXT ("pending_shift_ = [%ld sec, %ld usec]\n"), + this->static_bit_field_mask_, + this->static_bit_field_shift_, + this->dynamic_priority_max_, + this->dynamic_priority_offset_, + this->max_late_.sec (), + this->max_late_.usec (), + this->min_pending_.sec (), + this->min_pending_.usec (), + this->pending_shift_.sec (), + this->pending_shift_.usec ())); + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } - // cleanup policy for a message that is later than can be represented, - // and is being dropped from a dynamic message queue (this is a default - // method definition that does nothing, which derived classes may override - // to do things like deleting the message block object, etc). + // Dump the state of the strategy. + ///////////////////////////////////////// @@ -787,12 +806,10 @@ ACE_Dynamic_Message_Strategy::drop_message (ACE_Message_Block * &mb) ACE_Deadline_Message_Strategy:: ACE_Deadline_Message_Strategy (u_long static_bit_field_mask, u_long static_bit_field_shift, - u_long pending_threshold, u_long dynamic_priority_max, u_long dynamic_priority_offset) : ACE_Dynamic_Message_Strategy (static_bit_field_mask, static_bit_field_shift, - pending_threshold, dynamic_priority_max, dynamic_priority_offset) { @@ -804,122 +821,23 @@ ACE_Deadline_Message_Strategy::~ACE_Deadline_Message_Strategy () } // dtor -int -ACE_Deadline_Message_Strategy::update_priority (ACE_Message_Block & mb, - const ACE_Time_Value & tv) -{ - // The general formula for this deadline based dynamic priority - // function is to just subtract the current time and the execution - // time from the from the message deadline to get the time to deadline, - // then subtract the time to deadline from a constant C that depends on - // whether the time to deadline is negative (C is zero) or non-negative - // (C is the maximum allowed priority). But, to save operations for - // performance we use an optimized (albeit confusing: our apologies ;-) - // formula for the dynamic priority calculation. - - // first, compute the *negative* (additive inverse) of the time to deadline - ACE_Time_Value priority (tv); - priority -= mb.msg_deadline_time (); - - if (priority >= ACE_Time_Value::zero) - { - // if negative time to deadline is positive then the message is late: - // need to make sure the priority stays below the threshold - // between pending and late priority values - ACE_Time_Value - max_late (0, dynamic_priority_offset_ - 1); - - if (priority > max_late) - { - // if the message is later than can be represented, its priority is 0. - mb.msg_priority (0); - return 0; - } - } - else - { - // if negative time to deadline is negative then the message is pending: - // so, we need to shift priority upward by adding the maximum priority - // value and then make sure the value stays above the threshold between - // pending and late message priorities. - priority += - ACE_Time_Value (0, dynamic_priority_max_); - - ACE_Time_Value - min_pending (0, dynamic_priority_offset_); - - if (priority < min_pending) - { - priority = min_pending; - } - } - - // use (fast) bitwise operators to isolate and replace - // the dynamic portion of the message's priority - mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) | - ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) << - static_bit_field_shift_)); - - return 0; -} - // priority evaluation function based on time to deadline -int -ACE_Deadline_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb, - const ACE_Time_Value & tv) +void +ACE_Deadline_Message_Strategy::dump (void) const { - // first, compute the *negative* time to deadline - ACE_Time_Value priority (tv); - priority -= mb.msg_deadline_time (); + ACE_TRACE ("ACE_Deadline_Message_Strategy::dump"); - // construct a time value with the maximum late value that - // can be represented in the dynamic priority range - ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1); - - // if negative time to deadline is greater than the maximum value - // that can be represented, it is identified as being beyond late - return (priority > max_late) ? 1 : 0; -} - // returns true if the message is later than can can be represented - -///////////////////////////////////////////////// -// class ACE_Deadline_Cleanup_Message_Strategy // -///////////////////////////////////////////////// - -ACE_Deadline_Cleanup_Message_Strategy:: ACE_Deadline_Cleanup_Message_Strategy ( - u_long static_bit_field_mask, - u_long static_bit_field_shift, - u_long pending_threshold, - u_long dynamic_priority_max, - u_long dynamic_priority_offset) - - : ACE_Deadline_Message_Strategy (static_bit_field_mask, - static_bit_field_shift, - pending_threshold, - dynamic_priority_max, - dynamic_priority_offset) -{ -} -// ctor - -ACE_Deadline_Cleanup_Message_Strategy::~ACE_Deadline_Cleanup_Message_Strategy () -{ -} -// dtor + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); -int -ACE_Deadline_Cleanup_Message_Strategy::drop_message (ACE_Message_Block * &mb) -{ - // free the memory - delete mb; + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Dynamic_Message_Strategy base class: \n"))); + this->ACE_Dynamic_Message_Strategy::dump (); - // zero passed pointer to let caller know it's gone - mb = 0; + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\nderived class: ACE_Deadline_Message_Strategy\n"))); - return 0; + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } - // deletion cleanup policy for a message that is later than can be - // represented, and is being dropped from a dynamic message queue + // Dump the state of the strategy. + /////////////////////////////////////// // class ACE_Laxity_Message_Strategy // @@ -927,12 +845,10 @@ ACE_Deadline_Cleanup_Message_Strategy::drop_message (ACE_Message_Block * &mb) ACE_Laxity_Message_Strategy::ACE_Laxity_Message_Strategy (u_long static_bit_field_mask, u_long static_bit_field_shift, - u_long pending_threshold, u_long dynamic_priority_max, u_long dynamic_priority_offset) : ACE_Dynamic_Message_Strategy (static_bit_field_mask, static_bit_field_shift, - pending_threshold, dynamic_priority_max, dynamic_priority_offset) { @@ -945,126 +861,21 @@ ACE_Laxity_Message_Strategy::~ACE_Laxity_Message_Strategy () // dtor -int -ACE_Laxity_Message_Strategy::update_priority (ACE_Message_Block & mb, - const ACE_Time_Value & tv) +void +ACE_Laxity_Message_Strategy::dump (void) const { - // The general formula for this laxity based dynamic priority - // function is to just subtract the current time and the execution - // time from the from the message deadline to get the laxity, - // then subtract the laxity from a constant C that depends on whether - // the laxity is negative (C is zero) or non-negative (C is the maximum - // allowed priority). But, to save operations for performance we use - // an optimized (albeit confusing: our apologies ;-) formula - // for the dynamic priority calculation. - - // first, compute the *negative* laxity - ACE_Time_Value priority (tv); - priority += mb.msg_execution_time (); - priority -= mb.msg_deadline_time (); - - if (priority >= ACE_Time_Value::zero) - { - // if negative laxity is positive then the message is late: - // need to make sure the priority stays below the threshold - // between pending and late priority values - ACE_Time_Value - max_late (0, dynamic_priority_offset_ - 1); - - if (priority > max_late) - { - // if the message is later than can be represented, its priority is 0. - mb.msg_priority (0); - return 0; - } - } - else - { - // if negative laxity is negative then the message is pending: so, we - // need to shift priority upward by adding the maximum priority value - // and then make sure the value stays above the threshold between - // pending and late message priorities. - priority += - ACE_Time_Value (0, dynamic_priority_max_); - - ACE_Time_Value - min_pending (0, dynamic_priority_offset_); - - if (priority < min_pending) - { - priority = min_pending; - } - } - - // use (fast) bitwise operators to isolate and replace - // the dynamic portion of the message's priority - mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) | - ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) << - static_bit_field_shift_)); + ACE_TRACE ("ACE_Laxity_Message_Strategy::dump"); - return 0; -} - // priority evaluation function based on laxity - -int -ACE_Laxity_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb, - const ACE_Time_Value & tv) -{ - // first, compute the *negative* laxity - ACE_Time_Value priority (tv); - priority += mb.msg_execution_time (); - priority -= mb.msg_deadline_time (); - - // construct a time value with the maximum late value that - // can be represented in the dynamic priority range - ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1); - - // if negative laxity is greater than the maximum value that - // can be represented, it is identified as being beyond late - return (priority > max_late) ? 1 : 0; -} - // returns true if the message is later than can can be represented - -///////////////////////////////////////////////// -// class ACE_Laxity_Cleanup_Message_Strategy // -///////////////////////////////////////////////// - -ACE_Laxity_Cleanup_Message_Strategy:: ACE_Laxity_Cleanup_Message_Strategy ( - u_long static_bit_field_mask, - u_long static_bit_field_shift, - u_long pending_threshold, - u_long dynamic_priority_max, - u_long dynamic_priority_offset) - - : ACE_Laxity_Message_Strategy (static_bit_field_mask, - static_bit_field_shift, - pending_threshold, - dynamic_priority_max, - dynamic_priority_offset) -{ -} -// ctor - -ACE_Laxity_Cleanup_Message_Strategy::~ACE_Laxity_Cleanup_Message_Strategy () -{ -} -// dtor + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); -int -ACE_Laxity_Cleanup_Message_Strategy::drop_message (ACE_Message_Block * &mb) -{ - // free the memory - delete mb; + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Dynamic_Message_Strategy base class: \n"))); + this->ACE_Dynamic_Message_Strategy::dump (); - // zero passed pointer to let caller know it's gone - mb = 0; + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\nderived class: ACE_Laxity_Message_Strategy\n"))); - return 0; + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); } - // deletion cleanup policy for a message that is later than can be - // represented, and is being dropped from a dynamic message queue - - + // Dump the state of the strategy. diff --git a/ace/Message_Block.h b/ace/Message_Block.h index 6a20e85f332..26fd9dbaaa7 100644 --- a/ace/Message_Block.h +++ b/ace/Message_Block.h @@ -539,9 +539,18 @@ class ACE_Export ACE_Dynamic_Message_Strategy // class memebers before using the static member functions. public: + enum Priority_Status + { + PENDING = 0x01, // message can still make its deadline + LATE = 0x02, // message cannot make its deadline + BEYOND_LATE = 0x04, // message is so late its priority is undefined + ANY_STATUS = 0x07 // mask to match any priority status + }; + // message priority status: values are defined as bit flags + // so that status combinations may be specified easily. + ACE_Dynamic_Message_Strategy (u_long static_bit_field_mask, u_long static_bit_field_shift, - u_long pending_threshold, u_long dynamic_priority_max, u_long dynamic_priority_offset); // ctor @@ -549,25 +558,9 @@ public: virtual ~ACE_Dynamic_Message_Strategy (); // virtual dtor - virtual int update_priority (ACE_Message_Block & mb, - const ACE_Time_Value & tv) = 0; - // abstract dynamic priority evaluation function: - // updates the synamic priority bit field but does not - // alter the static priority bit field - - int is_pending (const ACE_Message_Block & mb, - const ACE_Time_Value & tv); - // returns true if the message has a pending (not late) priority value - - virtual int is_beyond_late (const ACE_Message_Block & mb, - const ACE_Time_Value & tv) = 0; - // returns true if the message is later than can can be represented - - virtual int drop_message (ACE_Message_Block * &mb); - // cleanup policy for a message that is later than can be represented, - // and is being dropped from a dynamic message queue (this is a default - // method definition that does nothing, which derived classes may override - // to do things like deleting the message block object, etc). + Priority_Status priority_status (ACE_Message_Block & mb, + const ACE_Time_Value & tv); + // updates the message's priority and returns its priority status u_long static_bit_field_mask (void); // get static bit field mask @@ -581,12 +574,6 @@ public: void static_bit_field_shift (u_long); // set left shift value to make room for static bit field - u_long pending_threshold (void); - // get pending threshold priority value - - void pending_threshold (u_long); - // set pending threshold priority value - u_long dynamic_priority_max (void); // get maximum supported priority value @@ -594,13 +581,20 @@ public: // set maximum supported priority value u_long dynamic_priority_offset (void); - // get axis shift to map signed range into unsigned range + // get offset to boundary between signed range and unsigned range void dynamic_priority_offset (u_long); - // set axis shift to map signed range into unsigned range + // set offset to boundary between signed range and unsigned range + + virtual void dump (void) const; + // Dump the state of the strategy. protected: + virtual void convert_priority (ACE_Time_Value & priority, + const ACE_Message_Block & mb) = 0; + // hook method for dynamic priority conversion + u_long static_bit_field_mask_; // this is a bit mask with all ones in the static bit field @@ -609,60 +603,41 @@ protected: // field: this value should be the logarithm base 2 of // (static_bit_field_mask_ + 1) - u_long pending_threshold_; - // threshold priority value below which a message is considered late - u_long dynamic_priority_max_; // maximum supported priority value u_long dynamic_priority_offset_; - // axis shift added to all values, in order to map signed - // range into unsigned range (priority is an unsigned value). + // offset to boundary between signed range and unsigned range + + ACE_Time_Value max_late_; + // maximum late time value that can be represented + + ACE_Time_Value min_pending_; + // minimum pending time value that can be represented + + ACE_Time_Value pending_shift_; + // time value by which to shift pending priority }; class ACE_Export ACE_Deadline_Message_Strategy : public ACE_Dynamic_Message_Strategy { public: - ACE_Deadline_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 - u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) - u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 - u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) + ACE_Deadline_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 + u_long static_bit_field_shift = 10, // 10 low order bits + u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 + u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) // ctor, with all arguments defaulted virtual ~ACE_Deadline_Message_Strategy (); // virtual dtor - virtual int update_priority (ACE_Message_Block & mb, - const ACE_Time_Value & tv); - // dynamic priority evaluation function based on time to - // deadline: updates the synamic priority bit field but - // does not alter the static priority bit field + virtual void convert_priority (ACE_Time_Value & priority, + const ACE_Message_Block & mb); + // dynamic priority conversion function based on time to deadline - int is_beyond_late (const ACE_Message_Block & mb, - const ACE_Time_Value & tv); - // returns true if the message is later than can can be represented -}; - - -class ACE_Export ACE_Deadline_Cleanup_Message_Strategy : public ACE_Deadline_Message_Strategy -{ -public: - - ACE_Deadline_Cleanup_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 - u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) - u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 - u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) - // ctor, with all arguments defaulted - - virtual ~ACE_Deadline_Cleanup_Message_Strategy (); - // virtual dtor - - virtual int drop_message (ACE_Message_Block * &mb); - // deletion cleanup policy for a message that is later than can be - // represented, and is being dropped from a dynamic message queue + virtual void dump (void) const; + // Dump the state of the strategy. }; @@ -670,49 +645,26 @@ class ACE_Export ACE_Laxity_Message_Strategy : public ACE_Dynamic_Message_Strate { public: - ACE_Laxity_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 - u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) - u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 - u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) + ACE_Laxity_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 + u_long static_bit_field_shift = 10, // 10 low order bits + u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 + u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) // ctor, with all arguments defaulted virtual ~ACE_Laxity_Message_Strategy (); // virtual dtor - virtual int update_priority (ACE_Message_Block & mb, - const ACE_Time_Value & tv); - // dynamic priority evaluation function based on laxity - // (time to deadline minus execution time): updates the - // dynamic priority bit field but does not alter the - // static priority bit field - - int is_beyond_late (const ACE_Message_Block & mb, - const ACE_Time_Value & tv); - // returns true if the message is later than can can be represented -}; - - -class ACE_Export ACE_Laxity_Cleanup_Message_Strategy : public ACE_Laxity_Message_Strategy -{ -public: - - ACE_Laxity_Cleanup_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 - u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) - u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 - u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) - // ctor, with all arguments defaulted - virtual ~ACE_Laxity_Cleanup_Message_Strategy (); - // virtual dtor + virtual void convert_priority (ACE_Time_Value & priority, + const ACE_Message_Block & mb); + // dynamic priority conversion function based on laxity - virtual int drop_message (ACE_Message_Block * &mb); - // deletion cleanup policy for a message that is later than can be - // represented, and is being dropped from a dynamic message queue + virtual void dump (void) const; + // Dump the state of the strategy. }; + #if defined (__ACE_INLINE__) #include "ace/Message_Block.i" #endif /* __ACE_INLINE__ */ diff --git a/ace/Message_Block.i b/ace/Message_Block.i index a4aae16d557..4f58c137176 100644 --- a/ace/Message_Block.i +++ b/ace/Message_Block.i @@ -365,15 +365,9 @@ ACE_Message_Block::locking_strategy (ACE_Lock *nls) } -ACE_INLINE int -ACE_Dynamic_Message_Strategy::is_pending (const ACE_Message_Block & mb, - const ACE_Time_Value & tv) -{ - return ((mb.msg_priority () < pending_threshold_) || - this->is_beyond_late (mb, tv)) - ? 0 : 1; -} - // returns true if the message has a pending (not late) priority value +//////////////////////////////////////// +// class ACE_Dynamic_Message_Strategy // +//////////////////////////////////////// ACE_INLINE u_long ACE_Dynamic_Message_Strategy::static_bit_field_mask (void) @@ -404,20 +398,6 @@ ACE_Dynamic_Message_Strategy::static_bit_field_shift (u_long ul) // set left shift value to make room for static bit field ACE_INLINE u_long -ACE_Dynamic_Message_Strategy::pending_threshold (void) -{ - return pending_threshold_; -} - // get pending threshold priority value - -ACE_INLINE void -ACE_Dynamic_Message_Strategy::pending_threshold (u_long ul) -{ - pending_threshold_ = ul; -} - // set pending threshold priority value - -ACE_INLINE u_long ACE_Dynamic_Message_Strategy::dynamic_priority_max (void) { return dynamic_priority_max_; @@ -427,7 +407,12 @@ ACE_Dynamic_Message_Strategy::dynamic_priority_max (void) ACE_INLINE void ACE_Dynamic_Message_Strategy::dynamic_priority_max (u_long ul) { + // pending_shift_ depends on dynamic_priority_max_: for performance + // reasons, the value in pending_shift_ is (re)calculated only when + // dynamic_priority_max_ is initialized or changes, and is stored + // as a class member rather than being a derived value. dynamic_priority_max_ = ul; + pending_shift_ = ACE_Time_Value (0, ul); } // set maximum supported priority value @@ -436,14 +421,108 @@ ACE_Dynamic_Message_Strategy::dynamic_priority_offset (void) { return dynamic_priority_offset_; } - // get axis shift to map signed range into unsigned range + // get offset for boundary between signed range and unsigned range ACE_INLINE void ACE_Dynamic_Message_Strategy::dynamic_priority_offset (u_long ul) { - dynamic_priority_offset_ = ul; + + + // max_late_ and min_pending_ depend on dynamic_priority_offset_: for + // performance reasons, the values in max_late_ and min_pending_ are + // (re)calculated only when dynamic_priority_offset_ is initialized + // or changes, and are stored as a class member rather than being + // derived each time one of their values is needed. + dynamic_priority_offset_ = ul; + max_late_ = ACE_Time_Value (0, ul - 1); + min_pending_ = ACE_Time_Value (0, ul); +} + // set offset for boundary between signed range and unsigned range + + +ACE_INLINE ACE_Dynamic_Message_Strategy::Priority_Status +ACE_Dynamic_Message_Strategy::priority_status (ACE_Message_Block & mb, + const ACE_Time_Value & tv) +{ + // default the message to have pending priority status + Priority_Status status = ACE_Dynamic_Message_Strategy::PENDING; + + // start with the passed absolute time as the message's priority, then + // call the polymorphic hook method to (at least partially) convert + // the absolute time and message attributes into the message's priority + ACE_Time_Value priority (tv); + convert_priority (priority, mb); + + // if the priority is negative, the message is pending + if (priority < ACE_Time_Value::zero) + { + // priority for pending messages must be shifted + // upward above the late priority range + priority += pending_shift_; + if (priority < min_pending_) + { + priority = min_pending_; + } + } + // otherwise, if the priority is greater than the maximum late + // priority value that can be represented, it is beyond late + else if (priority > max_late_) + { + // all messages that are beyond late are assigned lowest priority (zero) + mb.msg_priority (0); + return ACE_Dynamic_Message_Strategy::BEYOND_LATE; + } + // otherwise, the message is late, but its priority is correct + else + { + status = ACE_Dynamic_Message_Strategy::LATE; + } + + // use (fast) bitwise operators to isolate and replace + // the dynamic portion of the message's priority + mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) | + ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) << + static_bit_field_shift_)); + + return status; } - // set axis shift to map signed range into unsigned range + // returns the priority status of the message + + + +///////////////////////////////////////// +// class ACE_Deadline_Message_Strategy // +///////////////////////////////////////// + +ACE_INLINE void +ACE_Deadline_Message_Strategy::convert_priority (ACE_Time_Value & priority, + const ACE_Message_Block & mb) +{ + // Convert absolute time passed in tv to negative time + // to deadline of mb with respect to that absolute time. + priority -= mb.msg_deadline_time (); +} + // dynamic priority conversion function based on time to deadline + + +/////////////////////////////////////// +// class ACE_Laxity_Message_Strategy // +/////////////////////////////////////// + +ACE_INLINE void +ACE_Laxity_Message_Strategy::convert_priority (ACE_Time_Value & priority, + const ACE_Message_Block & mb) +{ + // Convert absolute time passed in tv to negative + // laxity of mb with respect to that absolute time. + priority += mb.msg_execution_time (); + priority -= mb.msg_deadline_time (); +} + // dynamic priority conversion function based on laxity + + + + diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp index 0e952d50b2b..3e30afa25f1 100644 --- a/ace/Message_Queue_T.cpp +++ b/ace/Message_Queue_T.cpp @@ -400,34 +400,34 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) // We start looking from the highest priority to the lowest // priority. - for (temp = this->head_; + for (temp = this->tail_; temp != 0; - temp = temp->next ()) - if (temp->msg_priority () < new_item->msg_priority ()) - // Break out when we've located an item that has lower - // priority that <new_item>. + temp = temp->prev ()) + if (temp->msg_priority () >= new_item->msg_priority ()) + // Break out when we've located an item that has + // greater or equal priority. break; if (temp == 0) - // Check for simple case of inserting at the tail of the queue, - // where all we need to do is insert <new_item> after the - // current tail. - return this->enqueue_tail_i (new_item); - else if (temp->prev () == 0) - // Check for simple case of inserting at the head of the - // queue, where all we need to do is insert <new_item> before - // the current head. + // Check for simple case of inserting at the head of the queue, + // where all we need to do is insert <new_item> before the + // current head. return this->enqueue_head_i (new_item); + else if (temp->next () == 0) + // Check for simple case of inserting at the tail of the + // queue, where all we need to do is insert <new_item> after + // the current tail. + return this->enqueue_tail_i (new_item); else { - // Insert the new message ahead of the item of - // lesser priority. This ensures that FIFO order is + // Insert the new message behind the message of + // greater or equal priority. This ensures that FIFO order is // maintained when messages of the same priority are // inserted consecutively. - new_item->next (temp); - new_item->prev (temp->prev ()); - temp->prev ()->next (new_item); - temp->prev (new_item); + new_item->prev (temp); + new_item->next (temp->next ()); + temp->next ()->prev (new_item); + temp->next (new_item); } } @@ -453,6 +453,11 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) { + if (this->head_ ==0) + { + ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Attempting to dequeue from empty queue")), -1); + } + ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); first_item = this->head_; this->head_ = this->head_->next (); @@ -483,7 +488,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -496,7 +501,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i // Wait for at least one item to become available. - if (this->wait_not_empty_cond (ace_mon, tv) == -1) + if (this->wait_not_empty_cond (ace_mon, timeout) == -1) return -1; first_item = this->head_; @@ -505,7 +510,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { int result = 0; #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) @@ -514,10 +519,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_ ++this->enqueue_waiters_; // @@ Need to add sanity checks for failure... mon.release (); - if (tv == 0) + if (timeout == 0) result = this->not_full_cond_.acquire (); else - result = this->not_full_cond_.acquire (*tv); + result = this->not_full_cond_.acquire (*timeout); int error = errno; mon.acquire (); errno = error; @@ -529,7 +534,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_ while (this->is_full_i ()) { - if (this->not_full_cond_.wait (tv) == -1) + if (this->not_full_cond_.wait (timeout) == -1) { if (errno == ETIME) errno = EWOULDBLOCK; @@ -549,7 +554,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_ template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { int result = 0; #if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE) @@ -558,11 +563,11 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX ++this->dequeue_waiters_; // @@ Need to add sanity checks for failure... mon.release (); - if (tv == 0) + if (timeout == 0) result = this->not_empty_cond_.acquire (); else { - result = this->not_empty_cond_.acquire (*tv); + result = this->not_empty_cond_.acquire (*timeout); if (result == -1 && errno == ETIME) errno = EWOULDBLOCK; } @@ -577,7 +582,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX while (this->is_empty_i ()) { - if (this->not_empty_cond_.wait (tv) == -1) + if (this->not_empty_cond_.wait (timeout) == -1) { if (errno == ETIME) errno = EWOULDBLOCK; @@ -600,7 +605,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -611,7 +616,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, return -1; } - if (this->wait_not_full_cond (ace_mon, tv) == -1) + if (this->wait_not_full_cond (ace_mon, timeout) == -1) return -1; int queue_count = this->enqueue_head_i (new_item); @@ -631,7 +636,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item, template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -642,7 +647,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item, return -1; } - if (this->wait_not_full_cond (ace_mon, tv) == -1) + if (this->wait_not_full_cond (ace_mon, timeout) == -1) return -1; int queue_count = this->enqueue_i (new_item); @@ -658,10 +663,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item, template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue"); - return this->enqueue_prio (new_item, tv); + return this->enqueue_prio (new_item, timeout); } // Block indefinitely waiting for an item to arrive, @@ -669,7 +674,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item, template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -680,7 +685,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, return -1; } - if (this->wait_not_full_cond (ace_mon, tv) == -1) + if (this->wait_not_full_cond (ace_mon, timeout) == -1) return -1; int queue_count = this->enqueue_tail_i (new_item); @@ -694,13 +699,13 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item, } } -// Remove an item from the front of the queue. If TV == 0 block +// Remove an item from the front of the queue. If timeout == 0 block // indefinitely (or until an alert occurs). Otherwise, block for upto -// the amount of time specified by TV. +// the amount of time specified by timeout. template <ACE_SYNCH_DECL> int ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1); @@ -711,8 +716,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, return -1; } - if (this->wait_not_empty_cond (ace_mon, tv) == -1) + if (this->wait_not_empty_cond (ace_mon, timeout) == -1) + { return -1; + } return this->dequeue_head_i (first_item); } @@ -742,6 +749,12 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue ( size_t lwm, ACE_Notification_Strategy *ns) : ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns) + , pending_head_ (0) + , pending_tail_ (0) + , late_head_ (0) + , late_tail_ (0) + , beyond_late_head_ (0) + , beyond_late_tail_ (0) , message_strategy_ (message_strategy) { // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the @@ -751,55 +764,157 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue ( template <ACE_SYNCH_DECL> ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void) { - delete &message_strategy_; + delete &(this->message_strategy_); } // dtor: free message strategy and let base class dtor do the rest template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages ( + ACE_Message_Block *&list_head, + ACE_Message_Block *&list_tail, + u_int status_flags) { - ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + int result = 0; - int result; + // start with an empty list + list_head = 0; + list_tail = 0; - // get the current time - ACE_Time_Value current_time = ACE_OS::gettimeofday (); - // refresh dynamic priority of the new message - result = message_strategy_.update_priority (*new_item, current_time); + // get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh priority status boundaries in the queue + result = this->refresh_queue (current_time); if (result < 0) { return result; } - // refresh dynamic priorities of messages in the queue - result = this->refresh_priorities (current_time); - if (result < 0) + if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::PENDING) && + (this->pending_head_) && (this->pending_tail_)) { - return result; + // patch up pointers for the new tail of the queue + if (this->pending_head_->prev ()) + { + this->tail_ = this->pending_head_->prev (); + this->pending_head_->prev ()->next (0); + } + else + { + // the list has become empty + this->head_ = 0; + this->tail_ = 0; + } + + // point to the head and tail of the list + list_head = this->pending_head_; + list_tail = this->pending_tail_; + + // cut the pending messages out of the queue entirely + this->pending_head_->prev (0); + this->pending_head_ = 0; + this->pending_tail_ = 0; } - // reorganize the queue according to the new priorities - result = this->refresh_queue (current_time); - if (result < 0) + if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::LATE) && + (this->late_head_) && (this->late_tail_)) { - return result; + // patch up pointers for the (possibly) new head and tail of the queue + if (this->late_tail_->next ()) + { + this->late_tail_->next ()->prev (this->late_head_->prev ()); + } + else + { + this->tail_ = this->late_head_->prev (); + } + if (this->late_head_->prev ()) + { + this->late_head_->prev ()->next (this->late_tail_->next ()); + } + else + { + this->head_ = this->late_tail_->next (); + } + + // put late messages behind pending messages (if any) being returned + this->late_head_->prev (list_tail); + if (list_tail) + { + list_tail->next (this->late_head_); + } + else + { + list_head = this->late_head_; + } + list_tail = this->late_tail_; + + this->late_tail_->next (0); + this->late_head_ = 0; + this->late_tail_ = 0; } - // invoke the base class method - result = ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (new_item); + if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) && + (this->beyond_late_head_) && (this->beyond_late_tail_)) + { + // patch up pointers for the new tail of the queue + if (this->beyond_late_tail_->next ()) + { + this->head_ = this->beyond_late_tail_->next (); + this->beyond_late_tail_->next ()->prev (0); + } + else + { + // the list has become empty + this->head_ = 0; + this->tail_ = 0; + } + + // put beyond late messages at the end of the list being returned + if (list_tail) + { + this->beyond_late_head_->prev (list_tail); + list_tail->next (this->beyond_late_head_); + } + else + { + list_head = this->beyond_late_head_; + } + list_tail = this->beyond_late_tail_; + + this->beyond_late_tail_->next (0); + this->beyond_late_head_ = 0; + this->beyond_late_tail_ = 0; + } + + // decrement message and size counts for removed messages + ACE_Message_Block *temp1, *temp2; + for (temp1 = list_head; temp1 != 0; temp1 = temp1->next ()) + { + this->cur_count_--; + + for (temp2 = temp1; temp2 != 0; temp2 = temp2->cont ()) + { + this->cur_bytes_ -= temp2->size (); + } + } return result; } - // Enqueue an <ACE_Message_Block *> in accordance with its priority. - // priority may be *dynamic* or *static* or a combination or *both* - // It calls the priority evaluation function passed into the Dynamic - // Message Queue constructor to update the priorities of all enqueued - // messages. + // Detach all messages with status given in the passed flags from + // the queue and return them by setting passed head and tail pointers + // to the linked list they comprise. This method is intended primarily + // as a means of periodically harvesting messages that have missed + // their deadlines, but is available in its most general form. All + // messages are returned in priority order, from head to tail, as of + // the time this method was called. + + template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head"); @@ -816,269 +931,636 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&firs // get the current time ACE_Time_Value current_time = ACE_OS::gettimeofday (); - // refresh dynamic priorities of messages in the queue - result = this->refresh_priorities (current_time); + // refresh priority status boundaries in the queue + result = this->refresh_queue (current_time); if (result < 0) { return result; } - // reorganize the queue according to the new priorities, - // possibly dropping messages which are later than can - // be represented by the range of priority values + // *now* it's appropriate to wait for an enqueued item + result = this->wait_not_empty_cond (ace_mon, timeout); + if (result == -1) + { + return result; + } + + // call the internal dequeue method, which selects an + // item from the highest priority status portion of + // the queue that has messages enqueued. + result = dequeue_head_i (first_item); + + return result; +} + // Dequeue and return the <ACE_Message_Block *> + // at the (logical) head of the queue. + +template <ACE_SYNCH_DECL> void +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump"); + ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this)); + + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n"))); + this->ACE_Message_Queue<ACE_SYNCH_USE>::dump (); + + ACE_DEBUG ((LM_DEBUG, + ASYS_TEXT ("pending_head_ = %u\n") + ASYS_TEXT ("pending_tail_ = %u\n") + ASYS_TEXT ("late_head_ = %u\n") + ASYS_TEXT ("late_tail_ = %u\n") + ASYS_TEXT ("beyond_late_head_ = %u\n") + ASYS_TEXT ("beyond_late_tail_ = %u\n"), + this->pending_head_, + this->pending_tail_, + this->late_head_, + this->late_tail_, + this->beyond_late_head_, + this->beyond_late_tail_)); + + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("message_strategy_ : \n"))); + message_strategy_.dump (); + + ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP)); +} + // dump the state of the queue + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item) +{ + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i"); + + int result = 0; + + // get the current time + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + // refresh priority status boundaries in the queue result = this->refresh_queue (current_time); if (result < 0) { return result; } - // *now* it's appropriate to wait for an enqueued item - result = this->wait_not_empty_cond (ace_mon, tv); - if (result == -1) + // where we enqueue depends on the message's priority status + switch (message_strategy_.priority_status (*new_item, current_time)) { - return result; + case ACE_Dynamic_Message_Strategy::PENDING: + if (this->pending_tail_ == 0) + { + // Check for simple case of an empty pending queue, where all we need to + // do is insert <new_item> into the tail of the queue. + pending_head_ = new_item; + pending_tail_ = pending_head_; + result = this->enqueue_tail_i (new_item); + } + else + { + // enqueue the new message in priority order in the pending sublist + result = sublist_enqueue_i (new_item, + current_time, + this->pending_head_, + this->pending_tail_, + ACE_Dynamic_Message_Strategy::PENDING); + } + + break; + + case ACE_Dynamic_Message_Strategy::LATE: + if (this->late_tail_ == 0) + { + late_head_ = new_item; + late_tail_ = late_head_; + + if (this->pending_head_ == 0) + { + // Check for simple case of an empty pending queue, where all + // we need to do is insert <new_item> into the tail of the queue. + result = this->enqueue_tail_i (new_item); + } + else if (this->beyond_late_tail_ == 0) + { + // Check for simple case of an empty beyond late queue, where all + // we need to do is insert <new_item> into the head of the queue. + result = this->enqueue_head_i (new_item); + } + else + { + // otherwise, we can just splice the new message in between + // the pending and beyond late portions of the queue + this->beyond_late_tail_->next (new_item); + new_item->prev (this->beyond_late_tail_); + this->pending_head_->prev (new_item); + new_item->next (this->pending_head_); + } + } + else + { + // enqueue the new message in priority order in the late sublist + result = sublist_enqueue_i (new_item, + current_time, + this->late_head_, + this->late_tail_, + ACE_Dynamic_Message_Strategy::LATE); + } + break; + + case ACE_Dynamic_Message_Strategy::BEYOND_LATE: + if (this->beyond_late_tail_ == 0) + { + // Check for simple case of an empty beyond late queue, where all + // we need to do is insert <new_item> into the head of the queue. + beyond_late_head_ = new_item; + beyond_late_tail_ = beyond_late_head_; + result = this->enqueue_head_i (new_item); + } + else + { + // all beyond late messages have the same (zero) priority, so + // just put the new one at the end of the beyond late messages + if (this->beyond_late_tail_->next ()) + { + this->beyond_late_tail_->next ()->prev (new_item); + } + else + { + this->tail_ = new_item; + } + + new_item->next (this->beyond_late_tail_->next ()); + this->beyond_late_tail_->next (new_item); + new_item->prev (this->beyond_late_tail_); + this->beyond_late_tail_ = new_item; + } + + break; + + // should never get here, but just in case... + default: + result = -1; + break; } - // invoke the internal virtual method - return this->dequeue_head_i (first_item); + return result; } - // Dequeue and return the <ACE_Message_Block *> at the head of the - // queue. + // Enqueue an <ACE_Message_Block *> in accordance with its priority. + // priority may be *dynamic* or *static* or a combination or *both* + // It calls the priority evaluation function passed into the Dynamic + // Message Queue constructor to update the priorities of all enqueued + // messages. -template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv) + +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item, + const ACE_Time_Value ¤t_time, + ACE_Message_Block *&sublist_head, + ACE_Message_Block *&sublist_tail, + ACE_Dynamic_Message_Strategy::Priority_Status status) { int result = 0; + ACE_Message_Block *current_item = 0; - // apply the priority update function to all enqueued - // messages, starting at the head of the queue - ACE_Message_Block *temp = ACE_Message_Queue<ACE_SYNCH_USE>::head_; - while (temp) + // find message after which to enqueue new item, + // based on message priority and priority status + for (current_item = sublist_tail; + current_item; + current_item = current_item->prev ()) { - result = message_strategy_.update_priority (*temp, tv); - if (result < 0) + if (message_strategy_.priority_status (*current_item, current_time) == status) + { + if (current_item->msg_priority () >= new_item->msg_priority ()) + { + break; + } + } + else { + sublist_head = new_item; break; } - - temp = temp->next (); + } + + if (current_item == 0) + { + // if the new message has highest priority of any, + // put it at the head of the list (and sublist) + result = enqueue_head_i (new_item); + sublist_head = new_item; + } + else + { + // insert the new item into the list + new_item->next (current_item->next ()); + new_item->prev (current_item); + if (current_item->next ()) + { + current_item->next ()->prev (new_item); + } + else + { + this->tail_ = new_item; + } + + current_item->next (new_item); + + // if the new item has lowest priority of any in the sublist, + // move the tail pointer of the sublist back to the new item + if (current_item == sublist_tail) + { + sublist_tail = new_item; + } } return result; } - // refresh the priorities in the queue according - // to a specific priority assignment function + // enqueue a message in priority order within a given priority status sublist + template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item) { - // Remove messages that are later than the priority range can represent - int result = remove_stale_messages (tv); - if (result < 0) + ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i"); + + int result = 0; + int last_in_subqueue = 0; + + // first, try to dequeue from the head of the pending list + if (this->pending_head_) { - return result; + first_item = this->pending_head_; + + if (0 == this->pending_head_->prev ()) + { + this->head_ = this->pending_head_->next (); + } + else + { + this->pending_head_->prev ()->next (this->pending_head_->next ()); + } + + if (0 == this->pending_head_->next ()) + { + this->tail_ = this->pending_head_->prev (); + this->pending_head_ = 0; + this->pending_tail_ = 0; + } + else + { + this->pending_head_->next ()->prev (this->pending_head_->prev ()); + this->pending_head_ = this->pending_head_->next (); + } + + first_item->prev (0); + first_item->next (0); + } + // second, try to dequeue from the head of the late list + else if (this->late_head_) + { + last_in_subqueue = + (this->late_head_ == this->late_tail_) ? 1 : 0; + + first_item = this->late_head_; + + if (0 == this->late_head_->prev ()) + { + this->head_ = this->late_head_->next (); + } + else + { + this->late_head_->prev ()->next (this->late_head_->next ()); + } + + if (0 == this->late_head_->next ()) + { + this->tail_ = this->late_head_->prev (); + } + else + { + this->late_head_->next ()->prev (this->late_head_->prev ()); + this->late_head_ = this->late_head_->next (); + } + + if (last_in_subqueue) + { + this->late_head_ = 0; + this->late_tail_ = 0; + } + + first_item->prev (0); + first_item->next (0); } + // finally, try to dequeue from the head of the beyond late list + else if (this->beyond_late_head_) + { + last_in_subqueue = + (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0; + + first_item = this->beyond_late_head_; + this->head_ = this->beyond_late_head_->next (); + + if (0 == this->beyond_late_head_->next ()) + { + this->tail_ = this->beyond_late_head_->prev (); + } + else + { + this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ()); + this->beyond_late_head_ = this->beyond_late_head_->next (); + } + + if (last_in_subqueue) + { + this->beyond_late_head_ = 0; + this->beyond_late_tail_ = 0; + } - // Refresh the order of messages in the queue, - // putting pending messages ahead of late messages - return reorder_queue (tv); + first_item->prev (0); + first_item->next (0); + } + else + { + // nothing to dequeue: set the pointer to zero and return an error code + first_item = 0; + result = -1; + } + + return result; } - // refresh the order of messages in the queue - // after refreshing their priorities + // Dequeue and return the <ACE_Message_Block *> at the head of the + // logical queue. Attempts first to dequeue from the pending + // portion of the queue, or if that is empty from the late portion, + // or if that is empty from the beyond late portion, or if that is + // empty just sets the passed pointer to zero and returns -1. + template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_stale_messages (const ACE_Time_Value & tv) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value ¤t_time) { - int result = 0; + int result; + + result = refresh_pending_queue (current_time); + + if (result != -1) + { + result = refresh_late_queue (current_time); + } - // start at the beginning of the list - ACE_Message_Block *current = head_; + return result; +} + // Refresh the queue using the strategy + // specific priority status function. - // maintain a list of dropped messages to - // be appended to the end of the list after - // the sweep is complete - ACE_Message_Block *append_list_head = 0; - ACE_Message_Block *append_list_tail = 0; - while (current) +template <ACE_SYNCH_DECL> int +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value ¤t_time) +{ + ACE_Dynamic_Message_Strategy::Priority_Status current_status; + + // refresh priority status boundaries in the queue + if (this->pending_head_) { - // messages that have overflowed the given time bounds must be removed - if (message_strategy_.is_beyond_late (*current, tv)) + current_status = message_strategy_.priority_status (*this->pending_head_, current_time); + switch (current_status) { - // find the end of the chain of overflowed messages - ACE_Message_Block *remove_head = current; - ACE_Message_Block *remove_tail = current; - while ((remove_tail) && (remove_tail->next ()) && - message_strategy_.is_beyond_late (*(remove_tail->next ()), tv)) - { - // extend the chain of messages to be removed - remove_tail = remove_tail->next (); - } + case ACE_Dynamic_Message_Strategy::BEYOND_LATE: - // fix up list pointers to bypass the overflowed message chain + // make sure the head of the beyond late queue is set + // (there may not have been any beyond late messages previously) + this->beyond_late_head_ = this->head_; - if (remove_tail->next ()) - { - remove_tail->next ()->prev (remove_head->prev ()); - } - else - { - tail_ = remove_head->prev (); - } + // zero out the late queue pointers, and set them only if + // there turn out to be late messages in the pending sublist + this->late_head_ = 0; + this->late_tail_ = 0; - if (remove_head->prev ()) - { - remove_head->prev ()->next (remove_tail->next ()); - } - else - { - head_ = remove_tail->next (); - } + // advance through the beyond late messages in the pending queue + do + { + this->pending_head_ = this->pending_head_->next (); + + if (this->pending_head_) + { + current_status = message_strategy_.priority_status (*this->pending_head_, + current_time); + } + else + { + break; // do while + } - // move the current pointer past the end of the chain - current = remove_tail->next (); - - // Cut the chain of overflowed messages out of the list - remove_head->prev (0); - remove_tail->next (0); - - // Call strategy's drop_message method on each overflowed message. - // Cannot just delete each message even though reference counting - // at the data bloc level means that the underlying data block will - // not be deleted if another message block is still pointing to it. - // If the entire set of message blocks is known in advance, they may - // be allocated on the stack instead of the heap (to speed performance), - // and the caller *cannot* surrender ownership of the memory to the - // list. Putting this policy in the strategy allows the correct memory - // management scheme to be configured in either case. - ACE_Message_Block *temp1 = remove_head; - ACE_Message_Block *temp2 = remove_head->next (); - ACE_Message_Block *size_temp; - size_t msg_size; - while (temp1) - { - // Make sure to count *all* the bytes in a composite message!!! - for (size_temp = temp1, msg_size = 0; - size_temp != 0; - size_temp = size_temp->cont ()) + } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); + + if (this->pending_head_) { - msg_size += size_temp->size (); + // point tail of beyond late sublist to previous item + this->beyond_late_tail_ = this->pending_head_->prev (); + + if (current_status == ACE_Dynamic_Message_Strategy::PENDING) + { + // there are no late messages left in the queue + break; // switch + } + else + { + if (current_status != ACE_Dynamic_Message_Strategy::LATE) + { + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"), + (int) current_status), + -1); + } + + // intentionally fall through to the next case + } + } + else + { + // there are no pending or late messages left in the queue + this->beyond_late_tail_ = this->tail_; + this->pending_head_ = 0; + this->pending_tail_ = 0; + + break; // switch } - result = message_strategy_.drop_message (temp1); - if (result < 0) + case ACE_Dynamic_Message_Strategy::LATE: + + // make sure the head of the late queue is set (there may not have been + // any late messages previously, or they may have all become beyond late) + if (this->late_head_ == 0) { - return result; + this->late_head_ = this->pending_head_; } - if (temp1) + // advance through the beyond late messages in the pending queue + do { - // if the message was not destroyed, zero out its priority and - // put it on the list to append to the back of the queue - temp1->msg_priority (0); - temp1->next (0); - if (append_list_tail) + this->pending_head_ = this->pending_head_->next (); + + if (this->pending_head_) { - temp1->prev (append_list_tail); - append_list_tail->next (temp1); + current_status = message_strategy_.priority_status (*this->pending_head_, + current_time); } else { - temp1->prev (0); - append_list_head = temp1; + break; // do while + } + + } while (current_status == ACE_Dynamic_Message_Strategy::LATE); + + if (this->pending_head_) + { + if (current_status != ACE_Dynamic_Message_Strategy::PENDING) + { + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unexpected message priority status [%d] (expected PENDING)"), + (int) current_status), + -1); } - append_list_tail = temp1; + + // point tail of late sublist to previous item + this->late_tail_ = this->pending_head_->prev (); } else { - // if the message was destroyed, decrease the message - // count and byte count in the message queue - this->cur_count_--; - this->cur_bytes_ -= msg_size; + // there are no pending messages left in the queue + this->late_tail_ = this->tail_; + this->pending_head_ = 0; + this->pending_tail_ = 0; } - temp1 = temp2; - temp2 = temp2 ? temp2->next () : temp2; - } - } - else - { - current = current->next (); - } - } + break; // switch - // append any saved dropped messages to the end of the queue - if (append_list_tail) - { - if (tail_) - { - tail_->next (append_list_head); - append_list_head->prev (tail_); - tail_ = append_list_tail; - } - else - { - head_ = append_list_head; - tail_ = append_list_tail; + case ACE_Dynamic_Message_Strategy::PENDING: + + // do nothing - the pending queue is unchanged + + break; // switch + + default: + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unknown message priority status [%d]"), + (int) current_status), + -1); + break; // switch } } - return result; + return 0; } - // Remove messages that are later than the priority range can represent. + // Refresh the pending queue using the strategy + // specific priority status function. template <ACE_SYNCH_DECL> int -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::reorder_queue (const ACE_Time_Value & tv) +ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value ¤t_time) { - // if the queue is not empty, and the first message is late, need to reorder - if ((head_) && (! message_strategy_.is_pending (*head_, tv))) + ACE_Dynamic_Message_Strategy::Priority_Status current_status; + + if (this->late_head_) { - // find the end of the chain of newly late messages - // (since the last time the queue was reordered) - ACE_Message_Block *reorder_head = head_; - ACE_Message_Block *reorder_tail = head_; - while ((reorder_tail) && (reorder_tail->next ()) && - reorder_tail->next ()->msg_priority () <= reorder_head->msg_priority ()) + current_status = message_strategy_.priority_status (*this->late_head_, + current_time); + switch (current_status) { - // extend the chain of messages to be removed - reorder_tail = reorder_tail->next (); - } + case ACE_Dynamic_Message_Strategy::BEYOND_LATE: - // if a proper subset of the queue is out of order, reorganize the queue - if (reorder_tail != tail_) - { - // fix up list pointers to bypass the overflowed message chain - if (reorder_tail->next ()) - { - reorder_tail->next ()->prev (reorder_head->prev ()); - } - else - { - tail_ = reorder_head->prev (); - } - if (reorder_head->prev ()) - { - reorder_head->prev ()->next (reorder_tail->next ()); - } - else - { - head_ = reorder_tail->next (); - } + // make sure the head of the beyond late queue is set + // (there may not have been any beyond late messages previously) + this->beyond_late_head_ = this->head_; + + // advance through the beyond late messages in the late queue + do + { + this->late_head_ = this->late_head_->next (); + + if (this->late_head_) + { + current_status = message_strategy_.priority_status (*this->late_head_, + current_time); + } + else + { + break; // do while + } + + } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE); + + if (this->late_head_) + { + // point tail of beyond late sublist to previous item + this->beyond_late_tail_ = this->late_head_->prev (); + + if (current_status == ACE_Dynamic_Message_Strategy::PENDING) + { + // there are no late messages left in the queue + this->late_head_ = 0; + this->late_tail_ = 0; + } + else if (current_status != ACE_Dynamic_Message_Strategy::LATE) + { + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"), + (int) current_status), + -1); + } + } + else + { + // there are no late messages left in the queue + this->beyond_late_tail_ = this->tail_; + this->late_head_ = 0; + this->late_tail_ = 0; + } + + break; // switch + + case ACE_Dynamic_Message_Strategy::LATE: + + // do nothing - the late queue is unchanged + + break; // switch + + case ACE_Dynamic_Message_Strategy::PENDING: + + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unexpected message priority status " + "[%d] (expected LATE or BEYOND_LATE)"), + (int) current_status), + -1); + + break; // switch + + default: + + // if we got here, something is *seriously* wrong with the queue + ACE_ERROR_RETURN((LM_ERROR, + ASYS_TEXT ("Unknown message priority status [%d]"), + (int) current_status), + -1); + + break; // switch } - } + } return 0; } - // Refresh the order of messages in the queue. + // Refresh the late queue using the strategy + // specific priority status function. template <ACE_SYNCH_DECL> int ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head ( ACE_Message_Block *&first_item, - ACE_Time_Value *tv) + ACE_Time_Value *timeout) { - return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, tv); + return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, timeout); } // private method to hide public base class method: just calls base class method @@ -1124,7 +1606,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t ACE_Notification_Strategy *ns, u_long static_bit_field_mask, u_long static_bit_field_shift, - u_long pending_threshold, u_long dynamic_priority_max, u_long dynamic_priority_offset) { @@ -1133,7 +1614,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t ACE_NEW_RETURN (adms, ACE_Deadline_Message_Strategy (static_bit_field_mask, static_bit_field_shift, - pending_threshold, dynamic_priority_max, dynamic_priority_offset), 0); @@ -1142,32 +1622,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t } // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue -template <ACE_SYNCH_DECL> -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * -ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_cleanup_message_queue (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns, - u_long static_bit_field_mask, - u_long static_bit_field_shift, - u_long pending_threshold, - u_long dynamic_priority_max, - u_long dynamic_priority_offset) -{ - ACE_Deadline_Cleanup_Message_Strategy *adcms; - - ACE_NEW_RETURN (adcms, - ACE_Deadline_Cleanup_Message_Strategy (static_bit_field_mask, - static_bit_field_shift, - pending_threshold, - dynamic_priority_max, - dynamic_priority_offset), - 0); - - return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adcms, hwm, lwm, ns); -} - // factory method for a dynamically prioritized (by time to deadline) - // ACE_Dynamic_Message_Queue, with automatic cleanup of beyond late messages - template <ACE_SYNCH_DECL> ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * @@ -1176,7 +1630,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw ACE_Notification_Strategy *ns, u_long static_bit_field_mask, u_long static_bit_field_shift, - u_long pending_threshold, u_long dynamic_priority_max, u_long dynamic_priority_offset) { @@ -1185,7 +1638,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw ACE_NEW_RETURN (alms, ACE_Laxity_Message_Strategy (static_bit_field_mask, static_bit_field_shift, - pending_threshold, dynamic_priority_max, dynamic_priority_offset), 0); @@ -1196,30 +1648,19 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue -template <ACE_SYNCH_DECL> -ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * -ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_cleanup_message_queue (size_t hwm, - size_t lwm, - ACE_Notification_Strategy *ns, - u_long static_bit_field_mask, - u_long static_bit_field_shift, - u_long pending_threshold, - u_long dynamic_priority_max, - u_long dynamic_priority_offset) -{ - ACE_Laxity_Message_Strategy *alcms; - - ACE_NEW_RETURN (alcms, - ACE_Laxity_Cleanup_Message_Strategy (static_bit_field_mask, - static_bit_field_shift, - pending_threshold, - dynamic_priority_max, - dynamic_priority_offset), - 0); +#if defined (VXWORKS) - return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alcms, hwm, lwm, ns); +ACE_Message_Queue_Vx * +ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages, + size_t max_message_length, + ACE_Notification_Strategy *ns) +{ + return new ACE_Message_Queue_Vx (max_messages, max_message_length, ns); } - // factory method for a dynamically prioritized (by laxity) - // ACE_Dynamic_Message_Queue, with automatic cleanup of beyond late messages + // factory method for a wrapped VxWorks message queue + +#endif /* defined (VXWORKS) */ + + #endif /* ACE_MESSAGE_QUEUE_T_C */ diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h index 1d140abb7e6..6022eb6e5e6 100644 --- a/ace/Message_Queue_T.h +++ b/ace/Message_Queue_T.h @@ -66,7 +66,7 @@ public: // = EWOULDBLOCK). virtual int peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv = 0); + ACE_Time_Value *timeout = 0); // Retrieve the first <ACE_Message_Block> without removing it. // Returns -1 on failure, else the number of items still on the // queue. @@ -160,7 +160,7 @@ public: virtual ACE_Notification_Strategy *notification_strategy (void); virtual void notification_strategy (ACE_Notification_Strategy *s); - void dump (void) const; + virtual void dump (void) const; // Dump the state of an object. ACE_ALLOC_HOOK_DECLARE; @@ -199,11 +199,11 @@ protected: // = Helper methods to factor out common #ifdef code. virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *tv); + ACE_Time_Value *timeout); // Wait for the queue to become non-full. virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon, - ACE_Time_Value *tv); + ACE_Time_Value *timeout); // Wait for the queue to become non-empty. virtual int signal_enqueue_waiters (void); @@ -343,32 +343,75 @@ class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE> // = TITLE // A derived class which adapts the <ACE_Message_Queue> // class in order to maintain dynamic priorities for enqueued - // <ACE_Message_Blocks> and manage the queue dynamically. + // <ACE_Message_Blocks> and manage the queue order according + // to these dynamic priorities. // // = DESCRIPTION - // Priorities and queue orderings are refreshed at each enqueue - // and dequeue operation. Head and tail enqueue methods were - // made private to prevent out-of-order messages from confusing - // the pending and late portions of the queue. Messages in the - // pending portion of the queue whose dynamic priority becomes - // negative are placed into the late portion of the queue. - // Messages in the late portion of the queue whose dynamic - // priority becomes positive are dropped. These behaviors - // support a limited schedule overrun corresponding to one full - // cycle through dynamic priority values. These behaviors can - // be modified in derived classes by providing alternative - // definitions for the appropriate virtual methods. + // + // The messages in the queue are managed so as to preserve + // a logical ordering with minimal overhead per enqueue and + // dequeue operation. For this reason, the actual order of + // messages in the linked list of the queue may differ from + // their priority order. As time passes, a message may change + // from pending status to late status, and eventually to beyond + // late status. To minimize reordering overhead under this + // design force, three separate boundaries are maintained + // within the linked list of messages. Messages are dequeued + // preferentially from the head of the pending portion, then + // the head of the late portion, and finally from the head + // of the beyond late portion. In this way, only the boundaries + // need to be maintained (which can be done efficiently, as + // aging messages maintain the same linked list order as they + // progress from one status to the next), with no reordering + // of the messages themselves, while providing correct priority + // ordered dequeueing semantics. + // + // Head and tail enqueue methods inherited from ACE_Message_Queue + // are made private to prevent out-of-order messages from confusing + // management of the various portions of the queue. Messages in + // the pending portion of the queue whose priority becomes late + // (according to the specific dynamic strategy) advance into + // the late portion of the queue. Messages in the late portion + // of the queue whose priority becomes later than can be represented + // advance to the beyond_late portion of the queue. These behaviors + // support a limited schedule overrun, with pending messages prioritized + // ahead of late messages, and late messages ahead of beyond late + // messages. These behaviors can be modified in derived classes by + // providing alternative definitions for the appropriate virtual methods. + // + // When filled with messages, the queue's linked list should look like: + // + // H T + // | | + // + // B - B - B - B - L - L - L - P - P - P - P - P + // + // | | | | | | + // BH BT LH LT PH PT + // + // Where the symbols are as follows: + // + // H = Head of the entire list + // T = Tail of the entire list + // B = Beyond late message + // BH = Beyond late messages Head + // BT = Beyond late messages Tail + // L = Late message + // LH = Late messages Head + // LT = Late messages Tail + // P = Pending message + // PH = Pending messages Head + // PT = Pending messages Tail // // Caveat: the virtual methods enqueue_tail, enqueue_head, // and peek_dequeue_head were made private, but for some // compilers it is possible to gain access to these methods - // through nefarious means: achieving this may result in + // through base class pointers: achieving this may result in // highly unpredictable results, as expectations about // where a message starts or remains between method // invocations may not hold for a dynamically managed // message queue. - - + // public: // = Initialization and termination methods. ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy, @@ -379,12 +422,26 @@ public: virtual ~ACE_Dynamic_Message_Queue (void); // Close down the message queue and release all resources. + virtual int remove_messages (ACE_Message_Block *&list_head, + ACE_Message_Block *&list_tail, + u_int status_flags); + // Detach all messages with status given in the passed flags from + // the queue and return them by setting passed head and tail pointers + // to the linked list they comprise. This method is intended primarily + // as a means of periodically harvesting messages that have missed + // their deadlines, but is available in its most general form. All + // messages are returned in priority order, from head to tail, as of + // the time this method was called. + virtual int dequeue_head (ACE_Message_Block *&first_item, ACE_Time_Value *timeout = 0); // Dequeue and return the <ACE_Message_Block *> at the head of the // queue. Returns -1 on failure, else the number of items still on // the queue. + virtual void dump (void) const; + // Dump the state of the queue. + ACE_ALLOC_HOOK_DECLARE; // Declare the dynamic allocation hooks. @@ -397,23 +454,49 @@ protected: // Message Queue constructor to update the priorities of all // enqueued messages. - virtual int refresh_priorities (const ACE_Time_Value & tv); - // Refresh the priorities in the queue according to a specific - // priority assignment function. + virtual int sublist_enqueue_i (ACE_Message_Block *new_item, + const ACE_Time_Value ¤t_time, + ACE_Message_Block *&sublist_head, + ACE_Message_Block *&sublist_tail, + ACE_Dynamic_Message_Strategy::Priority_Status status); + // enqueue a message in priority order within a given priority status sublist - virtual int refresh_queue (const ACE_Time_Value & tv); - // Refresh the order of messages in the queue after refreshing their - // priorities. + virtual int dequeue_head_i (ACE_Message_Block *&first_item); + // Dequeue and return the <ACE_Message_Block *> at the head of the + // logical queue. Attempts first to dequeue from the pending + // portion of the queue, or if that is empty from the late portion, + // or if that is empty from the beyond late portion, or if that is + // empty just sets the passed pointer to zero and returns -1. + + virtual int refresh_queue (const ACE_Time_Value & current_time); + // Refresh the queue using the strategy + // specific priority status function. - virtual int remove_stale_messages (const ACE_Time_Value & tv); - // Remove messages that are later than the priority range can represent. + virtual int refresh_pending_queue (const ACE_Time_Value & current_time); + // Refresh the pending queue using the strategy + // specific priority status function. - virtual int reorder_queue (const ACE_Time_Value & tv); - // Refresh the order of messages in the queue. + virtual int refresh_late_queue (const ACE_Time_Value & current_time); + // Refresh the late queue using the strategy + // specific priority status function. - ACE_Message_Block *pending_list_tail_; - // Pointer to tail of the pending messages (those whose priority is - // and has been non-negative) in the <ACE_Message_Block list>. + ACE_Message_Block *pending_head_; + // Pointer to head of the pending messages + + ACE_Message_Block *pending_tail_; + // Pointer to tail of the pending messages + + ACE_Message_Block *late_head_; + // Pointer to head of the late messages + + ACE_Message_Block *late_tail_; + // Pointer to tail of the late messages + + ACE_Message_Block *beyond_late_head_; + // Pointer to head of the beyond late messages + + ACE_Message_Block *beyond_late_tail_; + // Pointer to tail of the beyond late messages ACE_Dynamic_Message_Strategy &message_strategy_; // Pointer to a dynamic priority evaluation function. @@ -428,7 +511,7 @@ private: // but make them private so they're not accessible outside the class virtual int peek_dequeue_head (ACE_Message_Block *&first_item, - ACE_Time_Value *tv = 0); + ACE_Time_Value *timeout = 0); // private method to hide public base class method: just calls base class method virtual int enqueue_tail (ACE_Message_Block *new_item, @@ -469,46 +552,30 @@ public: ACE_Notification_Strategy * = 0, u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) // factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * - create_deadline_cleanup_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, - size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, - ACE_Notification_Strategy * = 0, - u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 - u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) - u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 - u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) - // factory method for a dynamically prioritized (by time to deadline) - // ACE_Dynamic_Message_Queue, with automatic deletion of beyond late messages - - - static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * create_laxity_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, ACE_Notification_Strategy * = 0, u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) // factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue - static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> * - create_laxity_cleanup_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM, - size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM, - ACE_Notification_Strategy * = 0, - u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1 - u_long static_bit_field_shift = 10, // 10 low order bits - u_long pending_threshold = 0x200000UL, // 2^(22-1) - u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1 - u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1) - // factory method for a dynamically prioritized (by laxity) - // ACE_Dynamic_Message_Queue, with automatic deletion of beyond late messages + +#if defined (VXWORKS) + + static ACE_Message_Queue_Vx * + create_Vx_message_queue (size_t max_messages, size_t max_message_length, + ACE_Notification_Strategy *ns = 0); + // factory method for a wrapped VxWorks message queue + +#endif /* defined (VXWORKS) */ + }; #if defined (__ACE_INLINE__) diff --git a/tests/Dynamic_Priority_Test.cpp b/tests/Dynamic_Priority_Test.cpp index 57b5758fee6..baeeeab9beb 100644 --- a/tests/Dynamic_Priority_Test.cpp +++ b/tests/Dynamic_Priority_Test.cpp @@ -33,8 +33,13 @@ // reachable deadlines. The producer then immediately enqueues all // messages. // -// In each test, the consumer is passed the filled queue and a string with -// the expected order in which the messages should dequeue. +// Two separate tests are run, one which verifies messages are correctly +// ordered my the given queues, and one which generates performance +// numbers for the various queues under increasing numbers of messages. +// In the first test, the consumer is passed the filled queue and a string +// with the expected order in which the messages should dequeue. In the +// second test, measurements are made as non-intrusive as possible, with +// no ordering checks. // // = AUTHOR // Chris Gill @@ -43,6 +48,8 @@ #include "ace/Message_Queue.h" #include "ace/Thread_Manager.h" +#include "ace/High_Res_Timer.h" +#include "ace/Sched_Params.h" #include "test_config.h" ACE_RCSID(tests, Dynamic_Priority_Test, "$Id$") @@ -52,18 +59,24 @@ USELIB("..\ace\aced.lib"); //--------------------------------------------------------------------------- #endif /* defined(__BORLANDC__) && __BORLANDC__ >= 0x0530 */ -// structure used to pass arguments to test functions +enum Test_Type {BEST, WORST, RANDOM}; + +// structure used to pass arguments to test functions struct ArgStruct { ACE_Message_Queue<ACE_MT_SYNCH> *queue_; const char *order_string_; ACE_Message_Block **array_; + u_int expected_count_; }; // order in which messages are sent static const char send_order [] = "abcdefghijklmnop"; +// order in which messages are received with "FIFO prioritization" (i.e., none) +static const char FIFO_receipt_order [] = "abcdefghijklmnop"; + // order in which messages are received with static prioritization static const char static_receipt_order [] = "ponmlkjihgfedcba"; @@ -73,29 +86,51 @@ static const char deadline_receipt_order [] = "hgfedcbaponmlkji"; // order in which messages are received with laxity prioritization static const char laxity_receipt_order [] = "hfgedbcapnomljki"; -// fast and slow execution time values (sec, usec) -static const ACE_Time_Value fast_execution (0, 10); -static const ACE_Time_Value slow_execution (0, 100); +// fast and slow execution time values (sec, usec), +// kept very small to allow comparison of deadline, +// laxity, and static strategies across a very wide +// range of message counts. +static const ACE_Time_Value fast_execution (0, 1); +static const ACE_Time_Value slow_execution (0, 2); // Make the queue be capable of being *very* large. static const long max_queue = LONG_MAX; -// The consumer dequeues a message from the passed Message_Queue, +#if defined (VXWORKS) +// VxWorks Message Queue parameters +vx_max_queue = INT_MAX +vx_msg_size = 32 +#endif /* defined (VXWORKS) */ + +// loading parameters (number of messages to push through queues) +// for performance tests +static int MIN_LOAD = 20; +static int MAX_LOAD = 1000; +static int LOAD_STEP = 20; + +// time offsets for a minute in the past (for the best case test) and +// two seconds in the future (for the worst case and randomized tests) +const static ACE_Time_Value far_past_offset (-60, 0); +const static ACE_Time_Value near_future_offset (2, 0); +const static ACE_Time_Value offset_step (0, 5); + +// The order consumer dequeues a message from the passed Message_Queue, // and checks its data character against the passed string of characters -// which has the expected ordering. The supplier and consumer do not +// which has the expected ordering. Suppliers and consumers do not // allocate or deallocate messages, to avoid timing delays and timing // jitter in the test: the main function is responsible for all // initialization allocation and cleanup before, between, and after -// (but not during) the transfer of messages from the supplier to the -// consumer. +// (but not during) the transfer of messages from a supplier to the +// corresponding consumer. static void * -consumer (void * args) +order_consumer (void * args) { ACE_ASSERT (args != 0); ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_; const char *receipt_order = ((ArgStruct *) args)->order_string_; + u_int expected_count = ((ArgStruct *) args)->expected_count_; ACE_ASSERT (receipt_order != 0); ACE_ASSERT (msg_queue != 0); @@ -111,30 +146,36 @@ consumer (void * args) int result = msg_queue->dequeue_head (mb); if (result == -1) + { break; + } local_count++; - ACE_ASSERT (*expected == *mb->rd_ptr ()); + ACE_ASSERT (*expected == *mb->rd_ptr ()); } ACE_ASSERT (local_count == ACE_OS::strlen (receipt_order)); + + ACE_ASSERT (local_count == expected_count); + return 0; } -// The producer runs through the passed send string, setting the read +// The order producer runs through the passed send string, setting the read // pointer of the current message to the current character position in // the string, and then queueing the message in the message list, where -// it is removed by the consumer thread. +// it is removed by the order consumer. static void * -producer (void *args) +order_producer (void *args) { ACE_ASSERT (args != 0); ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_; const char *send_order = ((ArgStruct *) args)->order_string_; ACE_Message_Block **block_array = ((ArgStruct *) args)->array_; + int expected_count = ((ArgStruct *) args)->expected_count_; ACE_ASSERT (send_order != 0); ACE_ASSERT (block_array != 0); @@ -142,7 +183,7 @@ producer (void *args) // iterate through the send order string and the message block array, // setting the current message block's read pointer to the current // position in the send order string. - int local_count; + int local_count = 0; const char *c; for (local_count = 0, c = send_order; *c != '\0'; ++local_count, ++c) { @@ -155,16 +196,21 @@ producer (void *args) *mb->rd_ptr () = *c; mb->wr_ptr (1); + // Enqueue the message block in priority order. if (msg_queue->enqueue_prio (mb) == -1) - ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("(%t) %p\n"), ASYS_TEXT ("put_next")), 0); + { + break; + } } + ACE_ASSERT (local_count == expected_count); + return 0; } -int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_order, const char *receipt_order) +int run_order_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_order, const char *receipt_order) { u_int i; u_int array_size = ACE_OS::strlen (send_order); @@ -178,6 +224,7 @@ int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_orde supplier_args.queue_ = msg_queue; supplier_args.order_string_ = send_order; + supplier_args.expected_count_ = ACE_OS::strlen (send_order); // allocate message blocks, fill in pointer array, set static information ACE_NEW_RETURN (supplier_args.array_, ACE_Message_Block * [array_size], -1); @@ -195,6 +242,7 @@ int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_orde consumer_args.queue_ = msg_queue; consumer_args.order_string_ = receipt_order; + consumer_args.expected_count_ = ACE_OS::strlen (receipt_order); consumer_args.array_ = 0; // Construct pending and late absolute deadline times. @@ -240,11 +288,11 @@ int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_orde } } - // run the producer - producer (&supplier_args); + // run the order test producer + order_producer (&supplier_args); - // run the consumer - consumer (&consumer_args); + // run the order test consumer + order_consumer (&consumer_args); // free all the allocated message blocks for (i = 0; i < array_size; ++i) @@ -259,40 +307,412 @@ int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_orde } +// The performance consumer starts a timer, dequeues all messages from the +// passed Message_Queue, stops the timer, and reports the number of +// dequeued messages, the elapsed time, and the average time per message. + +static void * +performance_consumer (void * args) +{ + ACE_High_Res_Timer timer; + + ACE_ASSERT (args != 0); + + ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_; + u_int expected_count = ((ArgStruct *) args)->expected_count_; + + ACE_ASSERT (msg_queue != 0); + + u_int local_count = 0; + ACE_Message_Block *mb = 0; + + // reset, then start timer + timer.reset (); + timer.start (); + + // Keep looping, reading a message out of the queue, until + // the expected number of messages have been dequeued. + for (local_count = 0; local_count < expected_count; ++local_count) + { + if (msg_queue->dequeue_head (mb) == -1) + { + break; + } + + // ACE_ASSERT ('a' == *mb->rd_ptr ()); + } + + // stop timer, obtain and report its elapsed time + timer.stop (); + ACE_Time_Value tv; + timer.elapsed_time (tv); + ACE_DEBUG ((LM_INFO, "%6u, %6u, %f", + local_count, + tv.msec (), + (double) tv.msec () / local_count)); + + ACE_ASSERT (local_count == expected_count); + + return 0; +} + +// The performance producer starts a timer, enqueues the passed messages setting the +// read pointer of each message to the first character position in the passed string, +// stops the timer, and reports the number of enqueued messages, the elapsed time, +// and the average time per message. + +static void * +performance_producer (void *args) +{ + ACE_High_Res_Timer timer; + + ACE_ASSERT (args != 0); + + ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_; + ACE_Message_Block **block_array = ((ArgStruct *) args)->array_; + int expected_count = ((ArgStruct *) args)->expected_count_; + + ACE_ASSERT (send_order != 0); + ACE_ASSERT (block_array != 0); + + // reset, then start timer + timer.reset (); + timer.start (); + + // iterate through the message block array, setting the character under + // the current message block's read pointer to null before enqueueing + // the message block. + int local_count = 0; + for (local_count = 0; local_count < expected_count; ++local_count) + { + // point to the current message block + ACE_Message_Block *mb = block_array [local_count]; + ACE_ASSERT (mb != 0); + + // Set a character in the current message block at its + // read pointer position, and adjust the write pointer + *mb->rd_ptr () = 'a'; + mb->wr_ptr (1); + + // Enqueue the message block in priority order. + if (msg_queue->enqueue_prio (mb) == -1) + { + break; + } + } + + // stop timer, obtain and report its elapsed time + timer.stop (); + ACE_Time_Value tv; + timer.elapsed_time (tv); + ACE_DEBUG ((LM_INFO, "%6u, %6u, %f, ", + local_count, + tv.msec (), + (double) tv.msec () / local_count)); + + ACE_ASSERT (local_count == expected_count); + + return 0; +} + +int run_performance_test (u_int min_load, u_int max_load, u_int load_step, + Test_Type test_type) +{ + ArgStruct supplier_args, consumer_args; // supplier and consumer argument strings + u_int load = 0; // message load + ACE_Time_Value *time_offsets; // pointer to array of time offsets + ACE_Time_Value current_time; // current time value + u_int shuffle_index; // used to shuffle arrays + int random_int; // also used to shuffle arrays + ACE_Message_Block *temp_block; // temporary message block pointer + ACE_Time_Value temp_time; // temporary time value + + // build a static queue, a deadline based dynamic + // queue, and a laxity based dynamic queue + + ACE_Message_Queue<ACE_MT_SYNCH> *static_queue = 0; + static_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue (max_queue); + ACE_ASSERT (static_queue != 0); + + ACE_Message_Queue<ACE_MT_SYNCH> *deadline_queue = 0; + deadline_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_deadline_message_queue (max_queue); + ACE_ASSERT (deadline_queue != 0); + + ACE_Message_Queue<ACE_MT_SYNCH> *laxity_queue = 0; + laxity_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_laxity_message_queue (max_queue); + ACE_ASSERT (laxity_queue != 0); + + // zero out unused struct members + supplier_args.order_string_ = 0; + consumer_args.order_string_ = 0; + consumer_args.array_ = 0; + + // print column headings for the specific test type + switch (test_type) + { + case BEST: + + ACE_DEBUG ((LM_INFO, + "\n\nenqueued, best static time, best static avg, " + "dequeued, best static time, best static avg, " + "enqueued, best deadline time, best deadline avg, " + "dequeued, best deadline time, best deadline avg, " + "enqueued, best laxity time, best laxity avg, " + "dequeued, best laxity time, best laxity avg\n")); + break; + + case WORST: + + ACE_DEBUG ((LM_INFO, + "\n\nenqueued, worst static time, worst static avg, " + "dequeued, worst static time, worst static avg, " + "enqueued, worst deadline time, worst deadline avg, " + "dequeued, worst deadline time, worst deadline avg, " + "enqueued, worst laxity time, worst laxity avg, " + "dequeued, worst laxity time, worst laxity avg\n")); + + break; + + case RANDOM: + + ACE_DEBUG ((LM_INFO, + "\n\nenqueued, random static time, random static avg, " + "dequeued, random static time, random static avg, " + "enqueued, random deadline time, random deadline avg, " + "dequeued, random deadline time, random deadline avg, " + "enqueued, random laxity time, random laxity avg, " + "dequeued, random laxity time, random laxity avg\n")); + break; + + default: + + ACE_ERROR_RETURN ((LM_ERROR, "unknown test type %d", test_type), -1); + } + + // iterate through the message loads, and at + // each load do an identical test on all queues + for (load = min_load; load <= max_load; load += load_step) + { + u_int i; + + supplier_args.expected_count_ = load; + consumer_args.expected_count_ = load; + + // allocate message blocks, fill in pointer array, set static information + ACE_NEW_RETURN (supplier_args.array_, ACE_Message_Block * [load], -1); + + // allocate array of timing offsets + ACE_NEW_RETURN (time_offsets, ACE_Time_Value [load], -1); + + // fill in information for all types of tests + for (i = 0; i < load; ++i) + { + // construct a message new block off the heap, to hold a single character + ACE_NEW_RETURN (supplier_args.array_[i], ACE_Message_Block (1), -1); + + // assign every other message short or long execution time + supplier_args.array_[i]->msg_execution_time (((i % 2) ? slow_execution : fast_execution)); + } + + // fill in information for the specific type of test + switch (test_type) + { + case BEST: + + // fill in best case information + time_offsets [0] = far_past_offset; + supplier_args.array_[0]->msg_priority (load); + for (i = 1; i < load; ++i) + { + // assign static (minimal) message priority in descending order + supplier_args.array_[i]->msg_priority (load - i); + + // assign time to deadline in descending order + time_offsets [i] = time_offsets [i - 1] + offset_step; + } + + break; + + case WORST: + + // fill in worst case information + time_offsets [0] = near_future_offset; + supplier_args.array_[0]->msg_priority (0); + for (i = 1; i < load; ++i) + { + // assign static (minimal) message priority in ascending order + supplier_args.array_[i]->msg_priority (i); + + // assign time to deadline in descending order + // (puts dynamic priority in ascending order) + time_offsets [i] = time_offsets [i - 1] - offset_step; + } + + break; + + case RANDOM: + + // fill in worst case information + time_offsets [0] = near_future_offset; + supplier_args.array_[0]->msg_priority (0); + for (i = 1; i < load; ++i) + { + // assign static (minimal) message priority in ascending order + supplier_args.array_[i]->msg_priority (i); + + // assign time to deadline in descending order + // (puts dynamic priority in ascending order) + time_offsets [i] = time_offsets [i - 1] - offset_step; + } + + // then shuffle the arrays in tandem + for (i = 0; i < load; ++i) + { + // choose a (pseudo) random integer (evenly distributed over [0, load-1]) + if (RAND_MAX >= load) + { + // discard integers in the tail of the random range that + // do not distribute evenly modulo the number of messages + do + { + random_int = ACE_OS::rand (); + } while (random_int >= (int)(RAND_MAX - (RAND_MAX % load))); + } + else if (RAND_MAX < load - 1) + { + // this should only happen for a *very* large messages + // relative to the system's representation size + ACE_ERROR_RETURN ((LM_ERROR, "Insufficient range of random numbers"), -1); + } + + shuffle_index = random_int % load; + + // swap the message at the current index with the one at the shuffle index + temp_block = supplier_args.array_[i]; + supplier_args.array_[i] = supplier_args.array_[shuffle_index]; + supplier_args.array_[shuffle_index] = temp_block; + + // swap the time at the current index with the one at the shuffle index + temp_time = time_offsets [i]; + time_offsets [i] = time_offsets [shuffle_index]; + time_offsets [shuffle_index] = temp_time; + } + + break; + + default: + + ACE_ERROR_RETURN ((LM_ERROR, "unknown test type %d", test_type), -1); + } + + // Set absolute time of deadline associated with each message. + current_time = ACE_OS::gettimeofday (); + for (i = 0; i < load; ++i) + { + supplier_args.array_[i]->msg_deadline_time (time_offsets [i] + current_time); + } + + // run the performance test producer and consumer on the static queue + supplier_args.queue_ = static_queue; + performance_producer (&supplier_args); + consumer_args.queue_ = static_queue; + performance_consumer (&consumer_args); + + // add a comma delimiter for most recent outputs + ACE_DEBUG ((LM_INFO, ", ")); + + // run the performance test producer and consumer on the deadline queue + supplier_args.queue_ = deadline_queue; + performance_producer (&supplier_args); + consumer_args.queue_ = deadline_queue; + performance_consumer (&consumer_args); + + // add a comma delimiter for most recent outputs + ACE_DEBUG ((LM_INFO, ", ")); + + // run the performance test producer and consumer on the laxity queue + supplier_args.queue_ = laxity_queue; + performance_producer (&supplier_args); + consumer_args.queue_ = laxity_queue; + performance_consumer (&consumer_args); + + // move to the next line of output + ACE_DEBUG ((LM_INFO, "\n")); + + // free all the allocated message blocks + for (i = 0; i < load; ++i) + { + delete supplier_args.array_[i]; + } + + // free the allocated pointer array + delete [] supplier_args.array_; + + } + + // free resources and leave + delete static_queue; + delete deadline_queue; + delete laxity_queue; + return 0; +} + int main (int, ASYS_TCHAR *[]) { ACE_START_TEST (ASYS_TEXT ("Dynamic_Priority_Test")); - ACE_Message_Queue<ACE_MT_SYNCH> *test_queue; + // Enable FIFO scheduling, e.g., RT scheduling class on Solaris. + if (ACE_OS::sched_params ( + ACE_Sched_Params ( + ACE_SCHED_FIFO, + ACE_Sched_Params::priority_min (ACE_SCHED_FIFO), + ACE_SCOPE_PROCESS)) != 0) + { + if (ACE_OS::last_error () == EPERM) + ACE_DEBUG ((LM_MAX, "preempt: user is not superuser, " + "so remain in time-sharing class\n")); + else + ACE_ERROR_RETURN ((LM_ERROR, "%n: ACE_OS::sched_params failed\n%a"), + -1); + } + + + ACE_Message_Queue<ACE_MT_SYNCH> *test_queue = 0; // test factory, static message queue test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue (max_queue); ACE_ASSERT (test_queue != 0); - run_test (test_queue, send_order, static_receipt_order); + run_order_test (test_queue, send_order, static_receipt_order); delete test_queue; - // test factory, dynamic message queue (deadline strategy, no cleanup) + // test factory, dynamic message queue (deadline strategy) test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_deadline_message_queue (max_queue); ACE_ASSERT (test_queue != 0); - run_test (test_queue, send_order, deadline_receipt_order); + run_order_test (test_queue, send_order, deadline_receipt_order); delete test_queue; - // test factory, dynamic message queue (laxity strategy, no cleanup) + // test factory, dynamic message queue (laxity strategy) test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_laxity_message_queue (max_queue); ACE_ASSERT (test_queue != 0); - run_test (test_queue, send_order, laxity_receipt_order); + run_order_test (test_queue, send_order, laxity_receipt_order); delete test_queue; - // test factory (deadline strategy, with cleanup) - test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_deadline_cleanup_message_queue (max_queue); +#if defined (VXWORKS) + // test factory for VxWorks message queue + test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_Vx_message_queue (vx_max_queue, vx_msg_size); ACE_ASSERT (test_queue != 0); + // (TBD - does message receipt order test make any sense for Vx Queue ? + // If so, uncomment order test, or if not remove order test, below) + // run_order_test (test_queue, send_order, static_receipt_order); delete test_queue; +#endif /* VXWORKS */ - // test factory (laxity strategy, with cleanup) - test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_laxity_cleanup_message_queue (max_queue); - ACE_ASSERT (test_queue != 0); - delete test_queue; + // For each of an increasing number of message loads, run the same performance + // test (best case, worst case, and randomized, over each kind of queue + run_performance_test (MIN_LOAD, MAX_LOAD, LOAD_STEP, BEST); + run_performance_test (MIN_LOAD, MAX_LOAD, LOAD_STEP, WORST); + run_performance_test (MIN_LOAD, MAX_LOAD, LOAD_STEP, RANDOM); ACE_END_TEST; return 0; |