summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog-98b21
-rw-r--r--ace/Message_Block.cpp287
-rw-r--r--ace/Message_Block.h150
-rw-r--r--ace/Message_Block.i131
-rw-r--r--ace/Message_Queue_T.cpp1033
-rw-r--r--ace/Message_Queue_T.h187
-rw-r--r--tests/Dynamic_Priority_Test.cpp488
7 files changed, 1544 insertions, 753 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b
index 45ef3f5344b..7f5070d5896 100644
--- a/ChangeLog-98b
+++ b/ChangeLog-98b
@@ -1,3 +1,24 @@
+Mon Jul 13 16:35:50 1998 Chris Gill <cdgill@cs.wustl.edu>
+
+ * ace/Message_Block.{cpp, h, i}
+ ace/Message_Queue_T.{cpp, h}:
+
+ Removed automatic deletion of beyond messages, replaced this
+ with a remove_messages method to be called by an external
+ "reaper" if at all. Reorganized dynamic message queues
+ to remove sources of overhead, especially in checking
+ message status while refreshing the queue: added separate
+ head and tail pointers for pending, late, and beyond late
+ protions of queue: only move these pointers, not messages
+ (except at enqueue).
+
+ * tests/Dynamic_Priority_Test.cpp:
+
+ Added performance tests for static and dynamic queues
+ which do best case, worst case, and randomized ordering
+ of messages, presenting each ordering to all queues
+ and clocking enqueue and dequeue performance.
+
Mon Jul 13 11:11:56 1998 Nanbor Wang <nanbor@cs.wustl.edu>
* ace/Svc_Handler.h (ACE_SYNCH_USE>): Changed type of member
diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp
index b8a9f2fd5da..de2cbdaf666 100644
--- a/ace/Message_Block.cpp
+++ b/ace/Message_Block.cpp
@@ -750,14 +750,15 @@ ACE_Message_Block::operator= (const ACE_Message_Block &)
ACE_Dynamic_Message_Strategy::ACE_Dynamic_Message_Strategy (u_long static_bit_field_mask,
u_long static_bit_field_shift,
- u_long pending_threshold,
u_long dynamic_priority_max,
u_long dynamic_priority_offset)
: static_bit_field_mask_ (static_bit_field_mask)
, static_bit_field_shift_ (static_bit_field_shift)
- , pending_threshold_ (pending_threshold)
, dynamic_priority_max_ (dynamic_priority_max)
, dynamic_priority_offset_ (dynamic_priority_offset)
+ , max_late_ (0, dynamic_priority_offset - 1)
+ , min_pending_ (0, dynamic_priority_offset)
+ , pending_shift_ (0, dynamic_priority_max)
{
}
// ctor
@@ -767,18 +768,36 @@ ACE_Dynamic_Message_Strategy::~ACE_Dynamic_Message_Strategy ()
}
// dtor
-
-int
-ACE_Dynamic_Message_Strategy::drop_message (ACE_Message_Block * &mb)
+void
+ACE_Dynamic_Message_Strategy::dump (void) const
{
- ACE_UNUSED_ARG (mb);
+ ACE_TRACE ("ACE_Dynamic_Message_Strategy::dump");
- return 0;
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("static_bit_field_mask_ = %lu\n")
+ ASYS_TEXT ("static_bit_field_shift_ = %lu\n")
+ ASYS_TEXT ("dynamic_priority_max_ = %lu\n")
+ ASYS_TEXT ("dynamic_priority_offset_ = %lu\n")
+ ASYS_TEXT ("max_late_ = [%ld sec, %ld usec]\n")
+ ASYS_TEXT ("min_pending_ = [%ld sec, %ld usec]\n")
+ ASYS_TEXT ("pending_shift_ = [%ld sec, %ld usec]\n"),
+ this->static_bit_field_mask_,
+ this->static_bit_field_shift_,
+ this->dynamic_priority_max_,
+ this->dynamic_priority_offset_,
+ this->max_late_.sec (),
+ this->max_late_.usec (),
+ this->min_pending_.sec (),
+ this->min_pending_.usec (),
+ this->pending_shift_.sec (),
+ this->pending_shift_.usec ()));
+
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
- // cleanup policy for a message that is later than can be represented,
- // and is being dropped from a dynamic message queue (this is a default
- // method definition that does nothing, which derived classes may override
- // to do things like deleting the message block object, etc).
+ // Dump the state of the strategy.
+
/////////////////////////////////////////
@@ -787,12 +806,10 @@ ACE_Dynamic_Message_Strategy::drop_message (ACE_Message_Block * &mb)
ACE_Deadline_Message_Strategy:: ACE_Deadline_Message_Strategy (u_long static_bit_field_mask,
u_long static_bit_field_shift,
- u_long pending_threshold,
u_long dynamic_priority_max,
u_long dynamic_priority_offset)
: ACE_Dynamic_Message_Strategy (static_bit_field_mask,
static_bit_field_shift,
- pending_threshold,
dynamic_priority_max,
dynamic_priority_offset)
{
@@ -804,122 +821,23 @@ ACE_Deadline_Message_Strategy::~ACE_Deadline_Message_Strategy ()
}
// dtor
-int
-ACE_Deadline_Message_Strategy::update_priority (ACE_Message_Block & mb,
- const ACE_Time_Value & tv)
-{
- // The general formula for this deadline based dynamic priority
- // function is to just subtract the current time and the execution
- // time from the from the message deadline to get the time to deadline,
- // then subtract the time to deadline from a constant C that depends on
- // whether the time to deadline is negative (C is zero) or non-negative
- // (C is the maximum allowed priority). But, to save operations for
- // performance we use an optimized (albeit confusing: our apologies ;-)
- // formula for the dynamic priority calculation.
-
- // first, compute the *negative* (additive inverse) of the time to deadline
- ACE_Time_Value priority (tv);
- priority -= mb.msg_deadline_time ();
-
- if (priority >= ACE_Time_Value::zero)
- {
- // if negative time to deadline is positive then the message is late:
- // need to make sure the priority stays below the threshold
- // between pending and late priority values
- ACE_Time_Value
- max_late (0, dynamic_priority_offset_ - 1);
-
- if (priority > max_late)
- {
- // if the message is later than can be represented, its priority is 0.
- mb.msg_priority (0);
- return 0;
- }
- }
- else
- {
- // if negative time to deadline is negative then the message is pending:
- // so, we need to shift priority upward by adding the maximum priority
- // value and then make sure the value stays above the threshold between
- // pending and late message priorities.
- priority +=
- ACE_Time_Value (0, dynamic_priority_max_);
-
- ACE_Time_Value
- min_pending (0, dynamic_priority_offset_);
-
- if (priority < min_pending)
- {
- priority = min_pending;
- }
- }
-
- // use (fast) bitwise operators to isolate and replace
- // the dynamic portion of the message's priority
- mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) |
- ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) <<
- static_bit_field_shift_));
-
- return 0;
-}
- // priority evaluation function based on time to deadline
-int
-ACE_Deadline_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb,
- const ACE_Time_Value & tv)
+void
+ACE_Deadline_Message_Strategy::dump (void) const
{
- // first, compute the *negative* time to deadline
- ACE_Time_Value priority (tv);
- priority -= mb.msg_deadline_time ();
+ ACE_TRACE ("ACE_Deadline_Message_Strategy::dump");
- // construct a time value with the maximum late value that
- // can be represented in the dynamic priority range
- ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1);
-
- // if negative time to deadline is greater than the maximum value
- // that can be represented, it is identified as being beyond late
- return (priority > max_late) ? 1 : 0;
-}
- // returns true if the message is later than can can be represented
-
-/////////////////////////////////////////////////
-// class ACE_Deadline_Cleanup_Message_Strategy //
-/////////////////////////////////////////////////
-
-ACE_Deadline_Cleanup_Message_Strategy:: ACE_Deadline_Cleanup_Message_Strategy (
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-
- : ACE_Deadline_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset)
-{
-}
-// ctor
-
-ACE_Deadline_Cleanup_Message_Strategy::~ACE_Deadline_Cleanup_Message_Strategy ()
-{
-}
-// dtor
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
-int
-ACE_Deadline_Cleanup_Message_Strategy::drop_message (ACE_Message_Block * &mb)
-{
- // free the memory
- delete mb;
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Dynamic_Message_Strategy base class: \n")));
+ this->ACE_Dynamic_Message_Strategy::dump ();
- // zero passed pointer to let caller know it's gone
- mb = 0;
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\nderived class: ACE_Deadline_Message_Strategy\n")));
- return 0;
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
- // deletion cleanup policy for a message that is later than can be
- // represented, and is being dropped from a dynamic message queue
+ // Dump the state of the strategy.
+
///////////////////////////////////////
// class ACE_Laxity_Message_Strategy //
@@ -927,12 +845,10 @@ ACE_Deadline_Cleanup_Message_Strategy::drop_message (ACE_Message_Block * &mb)
ACE_Laxity_Message_Strategy::ACE_Laxity_Message_Strategy (u_long static_bit_field_mask,
u_long static_bit_field_shift,
- u_long pending_threshold,
u_long dynamic_priority_max,
u_long dynamic_priority_offset)
: ACE_Dynamic_Message_Strategy (static_bit_field_mask,
static_bit_field_shift,
- pending_threshold,
dynamic_priority_max,
dynamic_priority_offset)
{
@@ -945,126 +861,21 @@ ACE_Laxity_Message_Strategy::~ACE_Laxity_Message_Strategy ()
// dtor
-int
-ACE_Laxity_Message_Strategy::update_priority (ACE_Message_Block & mb,
- const ACE_Time_Value & tv)
+void
+ACE_Laxity_Message_Strategy::dump (void) const
{
- // The general formula for this laxity based dynamic priority
- // function is to just subtract the current time and the execution
- // time from the from the message deadline to get the laxity,
- // then subtract the laxity from a constant C that depends on whether
- // the laxity is negative (C is zero) or non-negative (C is the maximum
- // allowed priority). But, to save operations for performance we use
- // an optimized (albeit confusing: our apologies ;-) formula
- // for the dynamic priority calculation.
-
- // first, compute the *negative* laxity
- ACE_Time_Value priority (tv);
- priority += mb.msg_execution_time ();
- priority -= mb.msg_deadline_time ();
-
- if (priority >= ACE_Time_Value::zero)
- {
- // if negative laxity is positive then the message is late:
- // need to make sure the priority stays below the threshold
- // between pending and late priority values
- ACE_Time_Value
- max_late (0, dynamic_priority_offset_ - 1);
-
- if (priority > max_late)
- {
- // if the message is later than can be represented, its priority is 0.
- mb.msg_priority (0);
- return 0;
- }
- }
- else
- {
- // if negative laxity is negative then the message is pending: so, we
- // need to shift priority upward by adding the maximum priority value
- // and then make sure the value stays above the threshold between
- // pending and late message priorities.
- priority +=
- ACE_Time_Value (0, dynamic_priority_max_);
-
- ACE_Time_Value
- min_pending (0, dynamic_priority_offset_);
-
- if (priority < min_pending)
- {
- priority = min_pending;
- }
- }
-
- // use (fast) bitwise operators to isolate and replace
- // the dynamic portion of the message's priority
- mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) |
- ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) <<
- static_bit_field_shift_));
+ ACE_TRACE ("ACE_Laxity_Message_Strategy::dump");
- return 0;
-}
- // priority evaluation function based on laxity
-
-int
-ACE_Laxity_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb,
- const ACE_Time_Value & tv)
-{
- // first, compute the *negative* laxity
- ACE_Time_Value priority (tv);
- priority += mb.msg_execution_time ();
- priority -= mb.msg_deadline_time ();
-
- // construct a time value with the maximum late value that
- // can be represented in the dynamic priority range
- ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1);
-
- // if negative laxity is greater than the maximum value that
- // can be represented, it is identified as being beyond late
- return (priority > max_late) ? 1 : 0;
-}
- // returns true if the message is later than can can be represented
-
-/////////////////////////////////////////////////
-// class ACE_Laxity_Cleanup_Message_Strategy //
-/////////////////////////////////////////////////
-
-ACE_Laxity_Cleanup_Message_Strategy:: ACE_Laxity_Cleanup_Message_Strategy (
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-
- : ACE_Laxity_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset)
-{
-}
-// ctor
-
-ACE_Laxity_Cleanup_Message_Strategy::~ACE_Laxity_Cleanup_Message_Strategy ()
-{
-}
-// dtor
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
-int
-ACE_Laxity_Cleanup_Message_Strategy::drop_message (ACE_Message_Block * &mb)
-{
- // free the memory
- delete mb;
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Dynamic_Message_Strategy base class: \n")));
+ this->ACE_Dynamic_Message_Strategy::dump ();
- // zero passed pointer to let caller know it's gone
- mb = 0;
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("\nderived class: ACE_Laxity_Message_Strategy\n")));
- return 0;
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
}
- // deletion cleanup policy for a message that is later than can be
- // represented, and is being dropped from a dynamic message queue
-
-
+ // Dump the state of the strategy.
diff --git a/ace/Message_Block.h b/ace/Message_Block.h
index 6a20e85f332..26fd9dbaaa7 100644
--- a/ace/Message_Block.h
+++ b/ace/Message_Block.h
@@ -539,9 +539,18 @@ class ACE_Export ACE_Dynamic_Message_Strategy
// class memebers before using the static member functions.
public:
+ enum Priority_Status
+ {
+ PENDING = 0x01, // message can still make its deadline
+ LATE = 0x02, // message cannot make its deadline
+ BEYOND_LATE = 0x04, // message is so late its priority is undefined
+ ANY_STATUS = 0x07 // mask to match any priority status
+ };
+ // message priority status: values are defined as bit flags
+ // so that status combinations may be specified easily.
+
ACE_Dynamic_Message_Strategy (u_long static_bit_field_mask,
u_long static_bit_field_shift,
- u_long pending_threshold,
u_long dynamic_priority_max,
u_long dynamic_priority_offset);
// ctor
@@ -549,25 +558,9 @@ public:
virtual ~ACE_Dynamic_Message_Strategy ();
// virtual dtor
- virtual int update_priority (ACE_Message_Block & mb,
- const ACE_Time_Value & tv) = 0;
- // abstract dynamic priority evaluation function:
- // updates the synamic priority bit field but does not
- // alter the static priority bit field
-
- int is_pending (const ACE_Message_Block & mb,
- const ACE_Time_Value & tv);
- // returns true if the message has a pending (not late) priority value
-
- virtual int is_beyond_late (const ACE_Message_Block & mb,
- const ACE_Time_Value & tv) = 0;
- // returns true if the message is later than can can be represented
-
- virtual int drop_message (ACE_Message_Block * &mb);
- // cleanup policy for a message that is later than can be represented,
- // and is being dropped from a dynamic message queue (this is a default
- // method definition that does nothing, which derived classes may override
- // to do things like deleting the message block object, etc).
+ Priority_Status priority_status (ACE_Message_Block & mb,
+ const ACE_Time_Value & tv);
+ // updates the message's priority and returns its priority status
u_long static_bit_field_mask (void);
// get static bit field mask
@@ -581,12 +574,6 @@ public:
void static_bit_field_shift (u_long);
// set left shift value to make room for static bit field
- u_long pending_threshold (void);
- // get pending threshold priority value
-
- void pending_threshold (u_long);
- // set pending threshold priority value
-
u_long dynamic_priority_max (void);
// get maximum supported priority value
@@ -594,13 +581,20 @@ public:
// set maximum supported priority value
u_long dynamic_priority_offset (void);
- // get axis shift to map signed range into unsigned range
+ // get offset to boundary between signed range and unsigned range
void dynamic_priority_offset (u_long);
- // set axis shift to map signed range into unsigned range
+ // set offset to boundary between signed range and unsigned range
+
+ virtual void dump (void) const;
+ // Dump the state of the strategy.
protected:
+ virtual void convert_priority (ACE_Time_Value & priority,
+ const ACE_Message_Block & mb) = 0;
+ // hook method for dynamic priority conversion
+
u_long static_bit_field_mask_;
// this is a bit mask with all ones in the static bit field
@@ -609,60 +603,41 @@ protected:
// field: this value should be the logarithm base 2 of
// (static_bit_field_mask_ + 1)
- u_long pending_threshold_;
- // threshold priority value below which a message is considered late
-
u_long dynamic_priority_max_;
// maximum supported priority value
u_long dynamic_priority_offset_;
- // axis shift added to all values, in order to map signed
- // range into unsigned range (priority is an unsigned value).
+ // offset to boundary between signed range and unsigned range
+
+ ACE_Time_Value max_late_;
+ // maximum late time value that can be represented
+
+ ACE_Time_Value min_pending_;
+ // minimum pending time value that can be represented
+
+ ACE_Time_Value pending_shift_;
+ // time value by which to shift pending priority
};
class ACE_Export ACE_Deadline_Message_Strategy : public ACE_Dynamic_Message_Strategy
{
public:
- ACE_Deadline_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
- u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
- u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
- u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
+ ACE_Deadline_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
+ u_long static_bit_field_shift = 10, // 10 low order bits
+ u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
+ u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
// ctor, with all arguments defaulted
virtual ~ACE_Deadline_Message_Strategy ();
// virtual dtor
- virtual int update_priority (ACE_Message_Block & mb,
- const ACE_Time_Value & tv);
- // dynamic priority evaluation function based on time to
- // deadline: updates the synamic priority bit field but
- // does not alter the static priority bit field
+ virtual void convert_priority (ACE_Time_Value & priority,
+ const ACE_Message_Block & mb);
+ // dynamic priority conversion function based on time to deadline
- int is_beyond_late (const ACE_Message_Block & mb,
- const ACE_Time_Value & tv);
- // returns true if the message is later than can can be represented
-};
-
-
-class ACE_Export ACE_Deadline_Cleanup_Message_Strategy : public ACE_Deadline_Message_Strategy
-{
-public:
-
- ACE_Deadline_Cleanup_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
- u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
- u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
- u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
- // ctor, with all arguments defaulted
-
- virtual ~ACE_Deadline_Cleanup_Message_Strategy ();
- // virtual dtor
-
- virtual int drop_message (ACE_Message_Block * &mb);
- // deletion cleanup policy for a message that is later than can be
- // represented, and is being dropped from a dynamic message queue
+ virtual void dump (void) const;
+ // Dump the state of the strategy.
};
@@ -670,49 +645,26 @@ class ACE_Export ACE_Laxity_Message_Strategy : public ACE_Dynamic_Message_Strate
{
public:
- ACE_Laxity_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
- u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
- u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
- u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
+ ACE_Laxity_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
+ u_long static_bit_field_shift = 10, // 10 low order bits
+ u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
+ u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
// ctor, with all arguments defaulted
virtual ~ACE_Laxity_Message_Strategy ();
// virtual dtor
- virtual int update_priority (ACE_Message_Block & mb,
- const ACE_Time_Value & tv);
- // dynamic priority evaluation function based on laxity
- // (time to deadline minus execution time): updates the
- // dynamic priority bit field but does not alter the
- // static priority bit field
-
- int is_beyond_late (const ACE_Message_Block & mb,
- const ACE_Time_Value & tv);
- // returns true if the message is later than can can be represented
-};
-
-
-class ACE_Export ACE_Laxity_Cleanup_Message_Strategy : public ACE_Laxity_Message_Strategy
-{
-public:
-
- ACE_Laxity_Cleanup_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
- u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
- u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
- u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
- // ctor, with all arguments defaulted
- virtual ~ACE_Laxity_Cleanup_Message_Strategy ();
- // virtual dtor
+ virtual void convert_priority (ACE_Time_Value & priority,
+ const ACE_Message_Block & mb);
+ // dynamic priority conversion function based on laxity
- virtual int drop_message (ACE_Message_Block * &mb);
- // deletion cleanup policy for a message that is later than can be
- // represented, and is being dropped from a dynamic message queue
+ virtual void dump (void) const;
+ // Dump the state of the strategy.
};
+
#if defined (__ACE_INLINE__)
#include "ace/Message_Block.i"
#endif /* __ACE_INLINE__ */
diff --git a/ace/Message_Block.i b/ace/Message_Block.i
index a4aae16d557..4f58c137176 100644
--- a/ace/Message_Block.i
+++ b/ace/Message_Block.i
@@ -365,15 +365,9 @@ ACE_Message_Block::locking_strategy (ACE_Lock *nls)
}
-ACE_INLINE int
-ACE_Dynamic_Message_Strategy::is_pending (const ACE_Message_Block & mb,
- const ACE_Time_Value & tv)
-{
- return ((mb.msg_priority () < pending_threshold_) ||
- this->is_beyond_late (mb, tv))
- ? 0 : 1;
-}
- // returns true if the message has a pending (not late) priority value
+////////////////////////////////////////
+// class ACE_Dynamic_Message_Strategy //
+////////////////////////////////////////
ACE_INLINE u_long
ACE_Dynamic_Message_Strategy::static_bit_field_mask (void)
@@ -404,20 +398,6 @@ ACE_Dynamic_Message_Strategy::static_bit_field_shift (u_long ul)
// set left shift value to make room for static bit field
ACE_INLINE u_long
-ACE_Dynamic_Message_Strategy::pending_threshold (void)
-{
- return pending_threshold_;
-}
- // get pending threshold priority value
-
-ACE_INLINE void
-ACE_Dynamic_Message_Strategy::pending_threshold (u_long ul)
-{
- pending_threshold_ = ul;
-}
- // set pending threshold priority value
-
-ACE_INLINE u_long
ACE_Dynamic_Message_Strategy::dynamic_priority_max (void)
{
return dynamic_priority_max_;
@@ -427,7 +407,12 @@ ACE_Dynamic_Message_Strategy::dynamic_priority_max (void)
ACE_INLINE void
ACE_Dynamic_Message_Strategy::dynamic_priority_max (u_long ul)
{
+ // pending_shift_ depends on dynamic_priority_max_: for performance
+ // reasons, the value in pending_shift_ is (re)calculated only when
+ // dynamic_priority_max_ is initialized or changes, and is stored
+ // as a class member rather than being a derived value.
dynamic_priority_max_ = ul;
+ pending_shift_ = ACE_Time_Value (0, ul);
}
// set maximum supported priority value
@@ -436,14 +421,108 @@ ACE_Dynamic_Message_Strategy::dynamic_priority_offset (void)
{
return dynamic_priority_offset_;
}
- // get axis shift to map signed range into unsigned range
+ // get offset for boundary between signed range and unsigned range
ACE_INLINE void
ACE_Dynamic_Message_Strategy::dynamic_priority_offset (u_long ul)
{
- dynamic_priority_offset_ = ul;
+
+
+ // max_late_ and min_pending_ depend on dynamic_priority_offset_: for
+ // performance reasons, the values in max_late_ and min_pending_ are
+ // (re)calculated only when dynamic_priority_offset_ is initialized
+ // or changes, and are stored as a class member rather than being
+ // derived each time one of their values is needed.
+ dynamic_priority_offset_ = ul;
+ max_late_ = ACE_Time_Value (0, ul - 1);
+ min_pending_ = ACE_Time_Value (0, ul);
+}
+ // set offset for boundary between signed range and unsigned range
+
+
+ACE_INLINE ACE_Dynamic_Message_Strategy::Priority_Status
+ACE_Dynamic_Message_Strategy::priority_status (ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ // default the message to have pending priority status
+ Priority_Status status = ACE_Dynamic_Message_Strategy::PENDING;
+
+ // start with the passed absolute time as the message's priority, then
+ // call the polymorphic hook method to (at least partially) convert
+ // the absolute time and message attributes into the message's priority
+ ACE_Time_Value priority (tv);
+ convert_priority (priority, mb);
+
+ // if the priority is negative, the message is pending
+ if (priority < ACE_Time_Value::zero)
+ {
+ // priority for pending messages must be shifted
+ // upward above the late priority range
+ priority += pending_shift_;
+ if (priority < min_pending_)
+ {
+ priority = min_pending_;
+ }
+ }
+ // otherwise, if the priority is greater than the maximum late
+ // priority value that can be represented, it is beyond late
+ else if (priority > max_late_)
+ {
+ // all messages that are beyond late are assigned lowest priority (zero)
+ mb.msg_priority (0);
+ return ACE_Dynamic_Message_Strategy::BEYOND_LATE;
+ }
+ // otherwise, the message is late, but its priority is correct
+ else
+ {
+ status = ACE_Dynamic_Message_Strategy::LATE;
+ }
+
+ // use (fast) bitwise operators to isolate and replace
+ // the dynamic portion of the message's priority
+ mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) |
+ ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) <<
+ static_bit_field_shift_));
+
+ return status;
}
- // set axis shift to map signed range into unsigned range
+ // returns the priority status of the message
+
+
+
+/////////////////////////////////////////
+// class ACE_Deadline_Message_Strategy //
+/////////////////////////////////////////
+
+ACE_INLINE void
+ACE_Deadline_Message_Strategy::convert_priority (ACE_Time_Value & priority,
+ const ACE_Message_Block & mb)
+{
+ // Convert absolute time passed in tv to negative time
+ // to deadline of mb with respect to that absolute time.
+ priority -= mb.msg_deadline_time ();
+}
+ // dynamic priority conversion function based on time to deadline
+
+
+///////////////////////////////////////
+// class ACE_Laxity_Message_Strategy //
+///////////////////////////////////////
+
+ACE_INLINE void
+ACE_Laxity_Message_Strategy::convert_priority (ACE_Time_Value & priority,
+ const ACE_Message_Block & mb)
+{
+ // Convert absolute time passed in tv to negative
+ // laxity of mb with respect to that absolute time.
+ priority += mb.msg_execution_time ();
+ priority -= mb.msg_deadline_time ();
+}
+ // dynamic priority conversion function based on laxity
+
+
+
+
diff --git a/ace/Message_Queue_T.cpp b/ace/Message_Queue_T.cpp
index 0e952d50b2b..3e30afa25f1 100644
--- a/ace/Message_Queue_T.cpp
+++ b/ace/Message_Queue_T.cpp
@@ -400,34 +400,34 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
// We start looking from the highest priority to the lowest
// priority.
- for (temp = this->head_;
+ for (temp = this->tail_;
temp != 0;
- temp = temp->next ())
- if (temp->msg_priority () < new_item->msg_priority ())
- // Break out when we've located an item that has lower
- // priority that <new_item>.
+ temp = temp->prev ())
+ if (temp->msg_priority () >= new_item->msg_priority ())
+ // Break out when we've located an item that has
+ // greater or equal priority.
break;
if (temp == 0)
- // Check for simple case of inserting at the tail of the queue,
- // where all we need to do is insert <new_item> after the
- // current tail.
- return this->enqueue_tail_i (new_item);
- else if (temp->prev () == 0)
- // Check for simple case of inserting at the head of the
- // queue, where all we need to do is insert <new_item> before
- // the current head.
+ // Check for simple case of inserting at the head of the queue,
+ // where all we need to do is insert <new_item> before the
+ // current head.
return this->enqueue_head_i (new_item);
+ else if (temp->next () == 0)
+ // Check for simple case of inserting at the tail of the
+ // queue, where all we need to do is insert <new_item> after
+ // the current tail.
+ return this->enqueue_tail_i (new_item);
else
{
- // Insert the new message ahead of the item of
- // lesser priority. This ensures that FIFO order is
+ // Insert the new message behind the message of
+ // greater or equal priority. This ensures that FIFO order is
// maintained when messages of the same priority are
// inserted consecutively.
- new_item->next (temp);
- new_item->prev (temp->prev ());
- temp->prev ()->next (new_item);
- temp->prev (new_item);
+ new_item->prev (temp);
+ new_item->next (temp->next ());
+ temp->next ()->prev (new_item);
+ temp->next (new_item);
}
}
@@ -453,6 +453,11 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
{
+ if (this->head_ ==0)
+ {
+ ACE_ERROR_RETURN((LM_ERROR, ASYS_TEXT ("Attempting to dequeue from empty queue")), -1);
+ }
+
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
first_item = this->head_;
this->head_ = this->head_->next ();
@@ -483,7 +488,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -496,7 +501,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i
// Wait for at least one item to become available.
- if (this->wait_not_empty_cond (ace_mon, tv) == -1)
+ if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
return -1;
first_item = this->head_;
@@ -505,7 +510,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (ACE_Message_Block *&first_i
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
int result = 0;
#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
@@ -514,10 +519,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_
++this->enqueue_waiters_;
// @@ Need to add sanity checks for failure...
mon.release ();
- if (tv == 0)
+ if (timeout == 0)
result = this->not_full_cond_.acquire ();
else
- result = this->not_full_cond_.acquire (*tv);
+ result = this->not_full_cond_.acquire (*timeout);
int error = errno;
mon.acquire ();
errno = error;
@@ -529,7 +534,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_
while (this->is_full_i ())
{
- if (this->not_full_cond_.wait (tv) == -1)
+ if (this->not_full_cond_.wait (timeout) == -1)
{
if (errno == ETIME)
errno = EWOULDBLOCK;
@@ -549,7 +554,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
int result = 0;
#if defined (ACE_HAS_OPTIMIZED_MESSAGE_QUEUE)
@@ -558,11 +563,11 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX
++this->dequeue_waiters_;
// @@ Need to add sanity checks for failure...
mon.release ();
- if (tv == 0)
+ if (timeout == 0)
result = this->not_empty_cond_.acquire ();
else
{
- result = this->not_empty_cond_.acquire (*tv);
+ result = this->not_empty_cond_.acquire (*timeout);
if (result == -1 && errno == ETIME)
errno = EWOULDBLOCK;
}
@@ -577,7 +582,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX
while (this->is_empty_i ())
{
- if (this->not_empty_cond_.wait (tv) == -1)
+ if (this->not_empty_cond_.wait (timeout) == -1)
{
if (errno == ETIME)
errno = EWOULDBLOCK;
@@ -600,7 +605,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -611,7 +616,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
return -1;
}
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ if (this->wait_not_full_cond (ace_mon, timeout) == -1)
return -1;
int queue_count = this->enqueue_head_i (new_item);
@@ -631,7 +636,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_head (ACE_Message_Block *new_item,
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -642,7 +647,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
return -1;
}
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ if (this->wait_not_full_cond (ace_mon, timeout) == -1)
return -1;
int queue_count = this->enqueue_i (new_item);
@@ -658,10 +663,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_prio (ACE_Message_Block *new_item,
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue");
- return this->enqueue_prio (new_item, tv);
+ return this->enqueue_prio (new_item, timeout);
}
// Block indefinitely waiting for an item to arrive,
@@ -669,7 +674,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue (ACE_Message_Block *new_item,
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -680,7 +685,7 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
return -1;
}
- if (this->wait_not_full_cond (ace_mon, tv) == -1)
+ if (this->wait_not_full_cond (ace_mon, timeout) == -1)
return -1;
int queue_count = this->enqueue_tail_i (new_item);
@@ -694,13 +699,13 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_tail (ACE_Message_Block *new_item,
}
}
-// Remove an item from the front of the queue. If TV == 0 block
+// Remove an item from the front of the queue. If timeout == 0 block
// indefinitely (or until an alert occurs). Otherwise, block for upto
-// the amount of time specified by TV.
+// the amount of time specified by timeout.
template <ACE_SYNCH_DECL> int
ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX_T, ace_mon, this->lock_, -1);
@@ -711,8 +716,10 @@ ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
return -1;
}
- if (this->wait_not_empty_cond (ace_mon, tv) == -1)
+ if (this->wait_not_empty_cond (ace_mon, timeout) == -1)
+ {
return -1;
+ }
return this->dequeue_head_i (first_item);
}
@@ -742,6 +749,12 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (
size_t lwm,
ACE_Notification_Strategy *ns)
: ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns)
+ , pending_head_ (0)
+ , pending_tail_ (0)
+ , late_head_ (0)
+ , late_tail_ (0)
+ , beyond_late_head_ (0)
+ , beyond_late_tail_ (0)
, message_strategy_ (message_strategy)
{
// note, the ACE_Dynamic_Message_Queue assumes full responsibility for the
@@ -751,55 +764,157 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (
template <ACE_SYNCH_DECL>
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
{
- delete &message_strategy_;
+ delete &(this->message_strategy_);
}
// dtor: free message strategy and let base class dtor do the rest
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_messages (
+ ACE_Message_Block *&list_head,
+ ACE_Message_Block *&list_tail,
+ u_int status_flags)
{
- ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+ int result = 0;
- int result;
+ // start with an empty list
+ list_head = 0;
+ list_tail = 0;
- // get the current time
- ACE_Time_Value current_time = ACE_OS::gettimeofday ();
- // refresh dynamic priority of the new message
- result = message_strategy_.update_priority (*new_item, current_time);
+ // get the current time
+ ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+ // refresh priority status boundaries in the queue
+ result = this->refresh_queue (current_time);
if (result < 0)
{
return result;
}
- // refresh dynamic priorities of messages in the queue
- result = this->refresh_priorities (current_time);
- if (result < 0)
+ if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::PENDING) &&
+ (this->pending_head_) && (this->pending_tail_))
{
- return result;
+ // patch up pointers for the new tail of the queue
+ if (this->pending_head_->prev ())
+ {
+ this->tail_ = this->pending_head_->prev ();
+ this->pending_head_->prev ()->next (0);
+ }
+ else
+ {
+ // the list has become empty
+ this->head_ = 0;
+ this->tail_ = 0;
+ }
+
+ // point to the head and tail of the list
+ list_head = this->pending_head_;
+ list_tail = this->pending_tail_;
+
+ // cut the pending messages out of the queue entirely
+ this->pending_head_->prev (0);
+ this->pending_head_ = 0;
+ this->pending_tail_ = 0;
}
- // reorganize the queue according to the new priorities
- result = this->refresh_queue (current_time);
- if (result < 0)
+ if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::LATE) &&
+ (this->late_head_) && (this->late_tail_))
{
- return result;
+ // patch up pointers for the (possibly) new head and tail of the queue
+ if (this->late_tail_->next ())
+ {
+ this->late_tail_->next ()->prev (this->late_head_->prev ());
+ }
+ else
+ {
+ this->tail_ = this->late_head_->prev ();
+ }
+ if (this->late_head_->prev ())
+ {
+ this->late_head_->prev ()->next (this->late_tail_->next ());
+ }
+ else
+ {
+ this->head_ = this->late_tail_->next ();
+ }
+
+ // put late messages behind pending messages (if any) being returned
+ this->late_head_->prev (list_tail);
+ if (list_tail)
+ {
+ list_tail->next (this->late_head_);
+ }
+ else
+ {
+ list_head = this->late_head_;
+ }
+ list_tail = this->late_tail_;
+
+ this->late_tail_->next (0);
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
}
- // invoke the base class method
- result = ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (new_item);
+ if ((status_flags & (u_int) ACE_Dynamic_Message_Strategy::BEYOND_LATE) &&
+ (this->beyond_late_head_) && (this->beyond_late_tail_))
+ {
+ // patch up pointers for the new tail of the queue
+ if (this->beyond_late_tail_->next ())
+ {
+ this->head_ = this->beyond_late_tail_->next ();
+ this->beyond_late_tail_->next ()->prev (0);
+ }
+ else
+ {
+ // the list has become empty
+ this->head_ = 0;
+ this->tail_ = 0;
+ }
+
+ // put beyond late messages at the end of the list being returned
+ if (list_tail)
+ {
+ this->beyond_late_head_->prev (list_tail);
+ list_tail->next (this->beyond_late_head_);
+ }
+ else
+ {
+ list_head = this->beyond_late_head_;
+ }
+ list_tail = this->beyond_late_tail_;
+
+ this->beyond_late_tail_->next (0);
+ this->beyond_late_head_ = 0;
+ this->beyond_late_tail_ = 0;
+ }
+
+ // decrement message and size counts for removed messages
+ ACE_Message_Block *temp1, *temp2;
+ for (temp1 = list_head; temp1 != 0; temp1 = temp1->next ())
+ {
+ this->cur_count_--;
+
+ for (temp2 = temp1; temp2 != 0; temp2 = temp2->cont ())
+ {
+ this->cur_bytes_ -= temp2->size ();
+ }
+ }
return result;
}
- // Enqueue an <ACE_Message_Block *> in accordance with its priority.
- // priority may be *dynamic* or *static* or a combination or *both*
- // It calls the priority evaluation function passed into the Dynamic
- // Message Queue constructor to update the priorities of all enqueued
- // messages.
+ // Detach all messages with status given in the passed flags from
+ // the queue and return them by setting passed head and tail pointers
+ // to the linked list they comprise. This method is intended primarily
+ // as a means of periodically harvesting messages that have missed
+ // their deadlines, but is available in its most general form. All
+ // messages are returned in priority order, from head to tail, as of
+ // the time this method was called.
+
+
template <ACE_SYNCH_DECL> int
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head");
@@ -816,269 +931,636 @@ ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head (ACE_Message_Block *&firs
// get the current time
ACE_Time_Value current_time = ACE_OS::gettimeofday ();
- // refresh dynamic priorities of messages in the queue
- result = this->refresh_priorities (current_time);
+ // refresh priority status boundaries in the queue
+ result = this->refresh_queue (current_time);
if (result < 0)
{
return result;
}
- // reorganize the queue according to the new priorities,
- // possibly dropping messages which are later than can
- // be represented by the range of priority values
+ // *now* it's appropriate to wait for an enqueued item
+ result = this->wait_not_empty_cond (ace_mon, timeout);
+ if (result == -1)
+ {
+ return result;
+ }
+
+ // call the internal dequeue method, which selects an
+ // item from the highest priority status portion of
+ // the queue that has messages enqueued.
+ result = dequeue_head_i (first_item);
+
+ return result;
+}
+ // Dequeue and return the <ACE_Message_Block *>
+ // at the (logical) head of the queue.
+
+template <ACE_SYNCH_DECL> void
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump (void) const
+{
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dump");
+ ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
+
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("ACE_Message_Queue<ACE_SYNCH_USE> (base class): \n")));
+ this->ACE_Message_Queue<ACE_SYNCH_USE>::dump ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ASYS_TEXT ("pending_head_ = %u\n")
+ ASYS_TEXT ("pending_tail_ = %u\n")
+ ASYS_TEXT ("late_head_ = %u\n")
+ ASYS_TEXT ("late_tail_ = %u\n")
+ ASYS_TEXT ("beyond_late_head_ = %u\n")
+ ASYS_TEXT ("beyond_late_tail_ = %u\n"),
+ this->pending_head_,
+ this->pending_tail_,
+ this->late_head_,
+ this->late_tail_,
+ this->beyond_late_head_,
+ this->beyond_late_tail_));
+
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("message_strategy_ : \n")));
+ message_strategy_.dump ();
+
+ ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
+}
+ // dump the state of the queue
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+
+ int result = 0;
+
+ // get the current time
+ ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+ // refresh priority status boundaries in the queue
result = this->refresh_queue (current_time);
if (result < 0)
{
return result;
}
- // *now* it's appropriate to wait for an enqueued item
- result = this->wait_not_empty_cond (ace_mon, tv);
- if (result == -1)
+ // where we enqueue depends on the message's priority status
+ switch (message_strategy_.priority_status (*new_item, current_time))
{
- return result;
+ case ACE_Dynamic_Message_Strategy::PENDING:
+ if (this->pending_tail_ == 0)
+ {
+ // Check for simple case of an empty pending queue, where all we need to
+ // do is insert <new_item> into the tail of the queue.
+ pending_head_ = new_item;
+ pending_tail_ = pending_head_;
+ result = this->enqueue_tail_i (new_item);
+ }
+ else
+ {
+ // enqueue the new message in priority order in the pending sublist
+ result = sublist_enqueue_i (new_item,
+ current_time,
+ this->pending_head_,
+ this->pending_tail_,
+ ACE_Dynamic_Message_Strategy::PENDING);
+ }
+
+ break;
+
+ case ACE_Dynamic_Message_Strategy::LATE:
+ if (this->late_tail_ == 0)
+ {
+ late_head_ = new_item;
+ late_tail_ = late_head_;
+
+ if (this->pending_head_ == 0)
+ {
+ // Check for simple case of an empty pending queue, where all
+ // we need to do is insert <new_item> into the tail of the queue.
+ result = this->enqueue_tail_i (new_item);
+ }
+ else if (this->beyond_late_tail_ == 0)
+ {
+ // Check for simple case of an empty beyond late queue, where all
+ // we need to do is insert <new_item> into the head of the queue.
+ result = this->enqueue_head_i (new_item);
+ }
+ else
+ {
+ // otherwise, we can just splice the new message in between
+ // the pending and beyond late portions of the queue
+ this->beyond_late_tail_->next (new_item);
+ new_item->prev (this->beyond_late_tail_);
+ this->pending_head_->prev (new_item);
+ new_item->next (this->pending_head_);
+ }
+ }
+ else
+ {
+ // enqueue the new message in priority order in the late sublist
+ result = sublist_enqueue_i (new_item,
+ current_time,
+ this->late_head_,
+ this->late_tail_,
+ ACE_Dynamic_Message_Strategy::LATE);
+ }
+ break;
+
+ case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
+ if (this->beyond_late_tail_ == 0)
+ {
+ // Check for simple case of an empty beyond late queue, where all
+ // we need to do is insert <new_item> into the head of the queue.
+ beyond_late_head_ = new_item;
+ beyond_late_tail_ = beyond_late_head_;
+ result = this->enqueue_head_i (new_item);
+ }
+ else
+ {
+ // all beyond late messages have the same (zero) priority, so
+ // just put the new one at the end of the beyond late messages
+ if (this->beyond_late_tail_->next ())
+ {
+ this->beyond_late_tail_->next ()->prev (new_item);
+ }
+ else
+ {
+ this->tail_ = new_item;
+ }
+
+ new_item->next (this->beyond_late_tail_->next ());
+ this->beyond_late_tail_->next (new_item);
+ new_item->prev (this->beyond_late_tail_);
+ this->beyond_late_tail_ = new_item;
+ }
+
+ break;
+
+ // should never get here, but just in case...
+ default:
+ result = -1;
+ break;
}
- // invoke the internal virtual method
- return this->dequeue_head_i (first_item);
+ return result;
}
- // Dequeue and return the <ACE_Message_Block *> at the head of the
- // queue.
+ // Enqueue an <ACE_Message_Block *> in accordance with its priority.
+ // priority may be *dynamic* or *static* or a combination or *both*
+ // It calls the priority evaluation function passed into the Dynamic
+ // Message Queue constructor to update the priorities of all enqueued
+ // messages.
-template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv)
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::sublist_enqueue_i (ACE_Message_Block *new_item,
+ const ACE_Time_Value &current_time,
+ ACE_Message_Block *&sublist_head,
+ ACE_Message_Block *&sublist_tail,
+ ACE_Dynamic_Message_Strategy::Priority_Status status)
{
int result = 0;
+ ACE_Message_Block *current_item = 0;
- // apply the priority update function to all enqueued
- // messages, starting at the head of the queue
- ACE_Message_Block *temp = ACE_Message_Queue<ACE_SYNCH_USE>::head_;
- while (temp)
+ // find message after which to enqueue new item,
+ // based on message priority and priority status
+ for (current_item = sublist_tail;
+ current_item;
+ current_item = current_item->prev ())
{
- result = message_strategy_.update_priority (*temp, tv);
- if (result < 0)
+ if (message_strategy_.priority_status (*current_item, current_time) == status)
+ {
+ if (current_item->msg_priority () >= new_item->msg_priority ())
+ {
+ break;
+ }
+ }
+ else
{
+ sublist_head = new_item;
break;
}
-
- temp = temp->next ();
+ }
+
+ if (current_item == 0)
+ {
+ // if the new message has highest priority of any,
+ // put it at the head of the list (and sublist)
+ result = enqueue_head_i (new_item);
+ sublist_head = new_item;
+ }
+ else
+ {
+ // insert the new item into the list
+ new_item->next (current_item->next ());
+ new_item->prev (current_item);
+ if (current_item->next ())
+ {
+ current_item->next ()->prev (new_item);
+ }
+ else
+ {
+ this->tail_ = new_item;
+ }
+
+ current_item->next (new_item);
+
+ // if the new item has lowest priority of any in the sublist,
+ // move the tail pointer of the sublist back to the new item
+ if (current_item == sublist_tail)
+ {
+ sublist_tail = new_item;
+ }
}
return result;
}
- // refresh the priorities in the queue according
- // to a specific priority assignment function
+ // enqueue a message in priority order within a given priority status sublist
+
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
{
- // Remove messages that are later than the priority range can represent
- int result = remove_stale_messages (tv);
- if (result < 0)
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
+
+ int result = 0;
+ int last_in_subqueue = 0;
+
+ // first, try to dequeue from the head of the pending list
+ if (this->pending_head_)
{
- return result;
+ first_item = this->pending_head_;
+
+ if (0 == this->pending_head_->prev ())
+ {
+ this->head_ = this->pending_head_->next ();
+ }
+ else
+ {
+ this->pending_head_->prev ()->next (this->pending_head_->next ());
+ }
+
+ if (0 == this->pending_head_->next ())
+ {
+ this->tail_ = this->pending_head_->prev ();
+ this->pending_head_ = 0;
+ this->pending_tail_ = 0;
+ }
+ else
+ {
+ this->pending_head_->next ()->prev (this->pending_head_->prev ());
+ this->pending_head_ = this->pending_head_->next ();
+ }
+
+ first_item->prev (0);
+ first_item->next (0);
+ }
+ // second, try to dequeue from the head of the late list
+ else if (this->late_head_)
+ {
+ last_in_subqueue =
+ (this->late_head_ == this->late_tail_) ? 1 : 0;
+
+ first_item = this->late_head_;
+
+ if (0 == this->late_head_->prev ())
+ {
+ this->head_ = this->late_head_->next ();
+ }
+ else
+ {
+ this->late_head_->prev ()->next (this->late_head_->next ());
+ }
+
+ if (0 == this->late_head_->next ())
+ {
+ this->tail_ = this->late_head_->prev ();
+ }
+ else
+ {
+ this->late_head_->next ()->prev (this->late_head_->prev ());
+ this->late_head_ = this->late_head_->next ();
+ }
+
+ if (last_in_subqueue)
+ {
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
+ }
+
+ first_item->prev (0);
+ first_item->next (0);
}
+ // finally, try to dequeue from the head of the beyond late list
+ else if (this->beyond_late_head_)
+ {
+ last_in_subqueue =
+ (this->beyond_late_head_ == this->beyond_late_tail_) ? 1 : 0;
+
+ first_item = this->beyond_late_head_;
+ this->head_ = this->beyond_late_head_->next ();
+
+ if (0 == this->beyond_late_head_->next ())
+ {
+ this->tail_ = this->beyond_late_head_->prev ();
+ }
+ else
+ {
+ this->beyond_late_head_->next ()->prev (this->beyond_late_head_->prev ());
+ this->beyond_late_head_ = this->beyond_late_head_->next ();
+ }
+
+ if (last_in_subqueue)
+ {
+ this->beyond_late_head_ = 0;
+ this->beyond_late_tail_ = 0;
+ }
- // Refresh the order of messages in the queue,
- // putting pending messages ahead of late messages
- return reorder_queue (tv);
+ first_item->prev (0);
+ first_item->next (0);
+ }
+ else
+ {
+ // nothing to dequeue: set the pointer to zero and return an error code
+ first_item = 0;
+ result = -1;
+ }
+
+ return result;
}
- // refresh the order of messages in the queue
- // after refreshing their priorities
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // logical queue. Attempts first to dequeue from the pending
+ // portion of the queue, or if that is empty from the late portion,
+ // or if that is empty from the beyond late portion, or if that is
+ // empty just sets the passed pointer to zero and returns -1.
+
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::remove_stale_messages (const ACE_Time_Value & tv)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value &current_time)
{
- int result = 0;
+ int result;
+
+ result = refresh_pending_queue (current_time);
+
+ if (result != -1)
+ {
+ result = refresh_late_queue (current_time);
+ }
- // start at the beginning of the list
- ACE_Message_Block *current = head_;
+ return result;
+}
+ // Refresh the queue using the strategy
+ // specific priority status function.
- // maintain a list of dropped messages to
- // be appended to the end of the list after
- // the sweep is complete
- ACE_Message_Block *append_list_head = 0;
- ACE_Message_Block *append_list_tail = 0;
- while (current)
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_pending_queue (const ACE_Time_Value &current_time)
+{
+ ACE_Dynamic_Message_Strategy::Priority_Status current_status;
+
+ // refresh priority status boundaries in the queue
+ if (this->pending_head_)
{
- // messages that have overflowed the given time bounds must be removed
- if (message_strategy_.is_beyond_late (*current, tv))
+ current_status = message_strategy_.priority_status (*this->pending_head_, current_time);
+ switch (current_status)
{
- // find the end of the chain of overflowed messages
- ACE_Message_Block *remove_head = current;
- ACE_Message_Block *remove_tail = current;
- while ((remove_tail) && (remove_tail->next ()) &&
- message_strategy_.is_beyond_late (*(remove_tail->next ()), tv))
- {
- // extend the chain of messages to be removed
- remove_tail = remove_tail->next ();
- }
+ case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
- // fix up list pointers to bypass the overflowed message chain
+ // make sure the head of the beyond late queue is set
+ // (there may not have been any beyond late messages previously)
+ this->beyond_late_head_ = this->head_;
- if (remove_tail->next ())
- {
- remove_tail->next ()->prev (remove_head->prev ());
- }
- else
- {
- tail_ = remove_head->prev ();
- }
+ // zero out the late queue pointers, and set them only if
+ // there turn out to be late messages in the pending sublist
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
- if (remove_head->prev ())
- {
- remove_head->prev ()->next (remove_tail->next ());
- }
- else
- {
- head_ = remove_tail->next ();
- }
+ // advance through the beyond late messages in the pending queue
+ do
+ {
+ this->pending_head_ = this->pending_head_->next ();
+
+ if (this->pending_head_)
+ {
+ current_status = message_strategy_.priority_status (*this->pending_head_,
+ current_time);
+ }
+ else
+ {
+ break; // do while
+ }
- // move the current pointer past the end of the chain
- current = remove_tail->next ();
-
- // Cut the chain of overflowed messages out of the list
- remove_head->prev (0);
- remove_tail->next (0);
-
- // Call strategy's drop_message method on each overflowed message.
- // Cannot just delete each message even though reference counting
- // at the data bloc level means that the underlying data block will
- // not be deleted if another message block is still pointing to it.
- // If the entire set of message blocks is known in advance, they may
- // be allocated on the stack instead of the heap (to speed performance),
- // and the caller *cannot* surrender ownership of the memory to the
- // list. Putting this policy in the strategy allows the correct memory
- // management scheme to be configured in either case.
- ACE_Message_Block *temp1 = remove_head;
- ACE_Message_Block *temp2 = remove_head->next ();
- ACE_Message_Block *size_temp;
- size_t msg_size;
- while (temp1)
- {
- // Make sure to count *all* the bytes in a composite message!!!
- for (size_temp = temp1, msg_size = 0;
- size_temp != 0;
- size_temp = size_temp->cont ())
+ } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
+
+ if (this->pending_head_)
{
- msg_size += size_temp->size ();
+ // point tail of beyond late sublist to previous item
+ this->beyond_late_tail_ = this->pending_head_->prev ();
+
+ if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
+ {
+ // there are no late messages left in the queue
+ break; // switch
+ }
+ else
+ {
+ if (current_status != ACE_Dynamic_Message_Strategy::LATE)
+ {
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
+ (int) current_status),
+ -1);
+ }
+
+ // intentionally fall through to the next case
+ }
+ }
+ else
+ {
+ // there are no pending or late messages left in the queue
+ this->beyond_late_tail_ = this->tail_;
+ this->pending_head_ = 0;
+ this->pending_tail_ = 0;
+
+ break; // switch
}
- result = message_strategy_.drop_message (temp1);
- if (result < 0)
+ case ACE_Dynamic_Message_Strategy::LATE:
+
+ // make sure the head of the late queue is set (there may not have been
+ // any late messages previously, or they may have all become beyond late)
+ if (this->late_head_ == 0)
{
- return result;
+ this->late_head_ = this->pending_head_;
}
- if (temp1)
+ // advance through the beyond late messages in the pending queue
+ do
{
- // if the message was not destroyed, zero out its priority and
- // put it on the list to append to the back of the queue
- temp1->msg_priority (0);
- temp1->next (0);
- if (append_list_tail)
+ this->pending_head_ = this->pending_head_->next ();
+
+ if (this->pending_head_)
{
- temp1->prev (append_list_tail);
- append_list_tail->next (temp1);
+ current_status = message_strategy_.priority_status (*this->pending_head_,
+ current_time);
}
else
{
- temp1->prev (0);
- append_list_head = temp1;
+ break; // do while
+ }
+
+ } while (current_status == ACE_Dynamic_Message_Strategy::LATE);
+
+ if (this->pending_head_)
+ {
+ if (current_status != ACE_Dynamic_Message_Strategy::PENDING)
+ {
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unexpected message priority status [%d] (expected PENDING)"),
+ (int) current_status),
+ -1);
}
- append_list_tail = temp1;
+
+ // point tail of late sublist to previous item
+ this->late_tail_ = this->pending_head_->prev ();
}
else
{
- // if the message was destroyed, decrease the message
- // count and byte count in the message queue
- this->cur_count_--;
- this->cur_bytes_ -= msg_size;
+ // there are no pending messages left in the queue
+ this->late_tail_ = this->tail_;
+ this->pending_head_ = 0;
+ this->pending_tail_ = 0;
}
- temp1 = temp2;
- temp2 = temp2 ? temp2->next () : temp2;
- }
- }
- else
- {
- current = current->next ();
- }
- }
+ break; // switch
- // append any saved dropped messages to the end of the queue
- if (append_list_tail)
- {
- if (tail_)
- {
- tail_->next (append_list_head);
- append_list_head->prev (tail_);
- tail_ = append_list_tail;
- }
- else
- {
- head_ = append_list_head;
- tail_ = append_list_tail;
+ case ACE_Dynamic_Message_Strategy::PENDING:
+
+ // do nothing - the pending queue is unchanged
+
+ break; // switch
+
+ default:
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unknown message priority status [%d]"),
+ (int) current_status),
+ -1);
+ break; // switch
}
}
- return result;
+ return 0;
}
- // Remove messages that are later than the priority range can represent.
+ // Refresh the pending queue using the strategy
+ // specific priority status function.
template <ACE_SYNCH_DECL> int
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::reorder_queue (const ACE_Time_Value & tv)
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_late_queue (const ACE_Time_Value &current_time)
{
- // if the queue is not empty, and the first message is late, need to reorder
- if ((head_) && (! message_strategy_.is_pending (*head_, tv)))
+ ACE_Dynamic_Message_Strategy::Priority_Status current_status;
+
+ if (this->late_head_)
{
- // find the end of the chain of newly late messages
- // (since the last time the queue was reordered)
- ACE_Message_Block *reorder_head = head_;
- ACE_Message_Block *reorder_tail = head_;
- while ((reorder_tail) && (reorder_tail->next ()) &&
- reorder_tail->next ()->msg_priority () <= reorder_head->msg_priority ())
+ current_status = message_strategy_.priority_status (*this->late_head_,
+ current_time);
+ switch (current_status)
{
- // extend the chain of messages to be removed
- reorder_tail = reorder_tail->next ();
- }
+ case ACE_Dynamic_Message_Strategy::BEYOND_LATE:
- // if a proper subset of the queue is out of order, reorganize the queue
- if (reorder_tail != tail_)
- {
- // fix up list pointers to bypass the overflowed message chain
- if (reorder_tail->next ())
- {
- reorder_tail->next ()->prev (reorder_head->prev ());
- }
- else
- {
- tail_ = reorder_head->prev ();
- }
- if (reorder_head->prev ())
- {
- reorder_head->prev ()->next (reorder_tail->next ());
- }
- else
- {
- head_ = reorder_tail->next ();
- }
+ // make sure the head of the beyond late queue is set
+ // (there may not have been any beyond late messages previously)
+ this->beyond_late_head_ = this->head_;
+
+ // advance through the beyond late messages in the late queue
+ do
+ {
+ this->late_head_ = this->late_head_->next ();
+
+ if (this->late_head_)
+ {
+ current_status = message_strategy_.priority_status (*this->late_head_,
+ current_time);
+ }
+ else
+ {
+ break; // do while
+ }
+
+ } while (current_status == ACE_Dynamic_Message_Strategy::BEYOND_LATE);
+
+ if (this->late_head_)
+ {
+ // point tail of beyond late sublist to previous item
+ this->beyond_late_tail_ = this->late_head_->prev ();
+
+ if (current_status == ACE_Dynamic_Message_Strategy::PENDING)
+ {
+ // there are no late messages left in the queue
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
+ }
+ else if (current_status != ACE_Dynamic_Message_Strategy::LATE)
+ {
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unexpected message priority status [%d] (expected LATE)"),
+ (int) current_status),
+ -1);
+ }
+ }
+ else
+ {
+ // there are no late messages left in the queue
+ this->beyond_late_tail_ = this->tail_;
+ this->late_head_ = 0;
+ this->late_tail_ = 0;
+ }
+
+ break; // switch
+
+ case ACE_Dynamic_Message_Strategy::LATE:
+
+ // do nothing - the late queue is unchanged
+
+ break; // switch
+
+ case ACE_Dynamic_Message_Strategy::PENDING:
+
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unexpected message priority status "
+ "[%d] (expected LATE or BEYOND_LATE)"),
+ (int) current_status),
+ -1);
+
+ break; // switch
+
+ default:
+
+ // if we got here, something is *seriously* wrong with the queue
+ ACE_ERROR_RETURN((LM_ERROR,
+ ASYS_TEXT ("Unknown message priority status [%d]"),
+ (int) current_status),
+ -1);
+
+ break; // switch
}
- }
+ }
return 0;
}
- // Refresh the order of messages in the queue.
+ // Refresh the late queue using the strategy
+ // specific priority status function.
template <ACE_SYNCH_DECL> int
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (
ACE_Message_Block *&first_item,
- ACE_Time_Value *tv)
+ ACE_Time_Value *timeout)
{
- return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, tv);
+ return ACE_Message_Queue<ACE_SYNCH_USE>::peek_dequeue_head (first_item, timeout);
}
// private method to hide public base class method: just calls base class method
@@ -1124,7 +1606,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t
ACE_Notification_Strategy *ns,
u_long static_bit_field_mask,
u_long static_bit_field_shift,
- u_long pending_threshold,
u_long dynamic_priority_max,
u_long dynamic_priority_offset)
{
@@ -1133,7 +1614,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t
ACE_NEW_RETURN (adms,
ACE_Deadline_Message_Strategy (static_bit_field_mask,
static_bit_field_shift,
- pending_threshold,
dynamic_priority_max,
dynamic_priority_offset),
0);
@@ -1142,32 +1622,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t
}
// factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_cleanup_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns,
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-{
- ACE_Deadline_Cleanup_Message_Strategy *adcms;
-
- ACE_NEW_RETURN (adcms,
- ACE_Deadline_Cleanup_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset),
- 0);
-
- return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adcms, hwm, lwm, ns);
-}
- // factory method for a dynamically prioritized (by time to deadline)
- // ACE_Dynamic_Message_Queue, with automatic cleanup of beyond late messages
-
template <ACE_SYNCH_DECL>
ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
@@ -1176,7 +1630,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw
ACE_Notification_Strategy *ns,
u_long static_bit_field_mask,
u_long static_bit_field_shift,
- u_long pending_threshold,
u_long dynamic_priority_max,
u_long dynamic_priority_offset)
{
@@ -1185,7 +1638,6 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw
ACE_NEW_RETURN (alms,
ACE_Laxity_Message_Strategy (static_bit_field_mask,
static_bit_field_shift,
- pending_threshold,
dynamic_priority_max,
dynamic_priority_offset),
0);
@@ -1196,30 +1648,19 @@ ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hw
// factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
-template <ACE_SYNCH_DECL>
-ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
-ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_cleanup_message_queue (size_t hwm,
- size_t lwm,
- ACE_Notification_Strategy *ns,
- u_long static_bit_field_mask,
- u_long static_bit_field_shift,
- u_long pending_threshold,
- u_long dynamic_priority_max,
- u_long dynamic_priority_offset)
-{
- ACE_Laxity_Message_Strategy *alcms;
-
- ACE_NEW_RETURN (alcms,
- ACE_Laxity_Cleanup_Message_Strategy (static_bit_field_mask,
- static_bit_field_shift,
- pending_threshold,
- dynamic_priority_max,
- dynamic_priority_offset),
- 0);
+#if defined (VXWORKS)
- return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alcms, hwm, lwm, ns);
+ACE_Message_Queue_Vx *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_Vx_message_queue (size_t max_messages,
+ size_t max_message_length,
+ ACE_Notification_Strategy *ns)
+{
+ return new ACE_Message_Queue_Vx (max_messages, max_message_length, ns);
}
- // factory method for a dynamically prioritized (by laxity)
- // ACE_Dynamic_Message_Queue, with automatic cleanup of beyond late messages
+ // factory method for a wrapped VxWorks message queue
+
+#endif /* defined (VXWORKS) */
+
+
#endif /* ACE_MESSAGE_QUEUE_T_C */
diff --git a/ace/Message_Queue_T.h b/ace/Message_Queue_T.h
index 1d140abb7e6..6022eb6e5e6 100644
--- a/ace/Message_Queue_T.h
+++ b/ace/Message_Queue_T.h
@@ -66,7 +66,7 @@ public:
// = EWOULDBLOCK).
virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv = 0);
+ ACE_Time_Value *timeout = 0);
// Retrieve the first <ACE_Message_Block> without removing it.
// Returns -1 on failure, else the number of items still on the
// queue.
@@ -160,7 +160,7 @@ public:
virtual ACE_Notification_Strategy *notification_strategy (void);
virtual void notification_strategy (ACE_Notification_Strategy *s);
- void dump (void) const;
+ virtual void dump (void) const;
// Dump the state of an object.
ACE_ALLOC_HOOK_DECLARE;
@@ -199,11 +199,11 @@ protected:
// = Helper methods to factor out common #ifdef code.
virtual int wait_not_full_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv);
+ ACE_Time_Value *timeout);
// Wait for the queue to become non-full.
virtual int wait_not_empty_cond (ACE_Guard<ACE_SYNCH_MUTEX_T> &mon,
- ACE_Time_Value *tv);
+ ACE_Time_Value *timeout);
// Wait for the queue to become non-empty.
virtual int signal_enqueue_waiters (void);
@@ -343,32 +343,75 @@ class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE>
// = TITLE
// A derived class which adapts the <ACE_Message_Queue>
// class in order to maintain dynamic priorities for enqueued
- // <ACE_Message_Blocks> and manage the queue dynamically.
+ // <ACE_Message_Blocks> and manage the queue order according
+ // to these dynamic priorities.
//
// = DESCRIPTION
- // Priorities and queue orderings are refreshed at each enqueue
- // and dequeue operation. Head and tail enqueue methods were
- // made private to prevent out-of-order messages from confusing
- // the pending and late portions of the queue. Messages in the
- // pending portion of the queue whose dynamic priority becomes
- // negative are placed into the late portion of the queue.
- // Messages in the late portion of the queue whose dynamic
- // priority becomes positive are dropped. These behaviors
- // support a limited schedule overrun corresponding to one full
- // cycle through dynamic priority values. These behaviors can
- // be modified in derived classes by providing alternative
- // definitions for the appropriate virtual methods.
+ //
+ // The messages in the queue are managed so as to preserve
+ // a logical ordering with minimal overhead per enqueue and
+ // dequeue operation. For this reason, the actual order of
+ // messages in the linked list of the queue may differ from
+ // their priority order. As time passes, a message may change
+ // from pending status to late status, and eventually to beyond
+ // late status. To minimize reordering overhead under this
+ // design force, three separate boundaries are maintained
+ // within the linked list of messages. Messages are dequeued
+ // preferentially from the head of the pending portion, then
+ // the head of the late portion, and finally from the head
+ // of the beyond late portion. In this way, only the boundaries
+ // need to be maintained (which can be done efficiently, as
+ // aging messages maintain the same linked list order as they
+ // progress from one status to the next), with no reordering
+ // of the messages themselves, while providing correct priority
+ // ordered dequeueing semantics.
+ //
+ // Head and tail enqueue methods inherited from ACE_Message_Queue
+ // are made private to prevent out-of-order messages from confusing
+ // management of the various portions of the queue. Messages in
+ // the pending portion of the queue whose priority becomes late
+ // (according to the specific dynamic strategy) advance into
+ // the late portion of the queue. Messages in the late portion
+ // of the queue whose priority becomes later than can be represented
+ // advance to the beyond_late portion of the queue. These behaviors
+ // support a limited schedule overrun, with pending messages prioritized
+ // ahead of late messages, and late messages ahead of beyond late
+ // messages. These behaviors can be modified in derived classes by
+ // providing alternative definitions for the appropriate virtual methods.
+ //
+ // When filled with messages, the queue's linked list should look like:
+ //
+ // H T
+ // | |
+ //
+ // B - B - B - B - L - L - L - P - P - P - P - P
+ //
+ // | | | | | |
+ // BH BT LH LT PH PT
+ //
+ // Where the symbols are as follows:
+ //
+ // H = Head of the entire list
+ // T = Tail of the entire list
+ // B = Beyond late message
+ // BH = Beyond late messages Head
+ // BT = Beyond late messages Tail
+ // L = Late message
+ // LH = Late messages Head
+ // LT = Late messages Tail
+ // P = Pending message
+ // PH = Pending messages Head
+ // PT = Pending messages Tail
//
// Caveat: the virtual methods enqueue_tail, enqueue_head,
// and peek_dequeue_head were made private, but for some
// compilers it is possible to gain access to these methods
- // through nefarious means: achieving this may result in
+ // through base class pointers: achieving this may result in
// highly unpredictable results, as expectations about
// where a message starts or remains between method
// invocations may not hold for a dynamically managed
// message queue.
-
-
+ //
public:
// = Initialization and termination methods.
ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
@@ -379,12 +422,26 @@ public:
virtual ~ACE_Dynamic_Message_Queue (void);
// Close down the message queue and release all resources.
+ virtual int remove_messages (ACE_Message_Block *&list_head,
+ ACE_Message_Block *&list_tail,
+ u_int status_flags);
+ // Detach all messages with status given in the passed flags from
+ // the queue and return them by setting passed head and tail pointers
+ // to the linked list they comprise. This method is intended primarily
+ // as a means of periodically harvesting messages that have missed
+ // their deadlines, but is available in its most general form. All
+ // messages are returned in priority order, from head to tail, as of
+ // the time this method was called.
+
virtual int dequeue_head (ACE_Message_Block *&first_item,
ACE_Time_Value *timeout = 0);
// Dequeue and return the <ACE_Message_Block *> at the head of the
// queue. Returns -1 on failure, else the number of items still on
// the queue.
+ virtual void dump (void) const;
+ // Dump the state of the queue.
+
ACE_ALLOC_HOOK_DECLARE;
// Declare the dynamic allocation hooks.
@@ -397,23 +454,49 @@ protected:
// Message Queue constructor to update the priorities of all
// enqueued messages.
- virtual int refresh_priorities (const ACE_Time_Value & tv);
- // Refresh the priorities in the queue according to a specific
- // priority assignment function.
+ virtual int sublist_enqueue_i (ACE_Message_Block *new_item,
+ const ACE_Time_Value &current_time,
+ ACE_Message_Block *&sublist_head,
+ ACE_Message_Block *&sublist_tail,
+ ACE_Dynamic_Message_Strategy::Priority_Status status);
+ // enqueue a message in priority order within a given priority status sublist
- virtual int refresh_queue (const ACE_Time_Value & tv);
- // Refresh the order of messages in the queue after refreshing their
- // priorities.
+ virtual int dequeue_head_i (ACE_Message_Block *&first_item);
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // logical queue. Attempts first to dequeue from the pending
+ // portion of the queue, or if that is empty from the late portion,
+ // or if that is empty from the beyond late portion, or if that is
+ // empty just sets the passed pointer to zero and returns -1.
+
+ virtual int refresh_queue (const ACE_Time_Value & current_time);
+ // Refresh the queue using the strategy
+ // specific priority status function.
- virtual int remove_stale_messages (const ACE_Time_Value & tv);
- // Remove messages that are later than the priority range can represent.
+ virtual int refresh_pending_queue (const ACE_Time_Value & current_time);
+ // Refresh the pending queue using the strategy
+ // specific priority status function.
- virtual int reorder_queue (const ACE_Time_Value & tv);
- // Refresh the order of messages in the queue.
+ virtual int refresh_late_queue (const ACE_Time_Value & current_time);
+ // Refresh the late queue using the strategy
+ // specific priority status function.
- ACE_Message_Block *pending_list_tail_;
- // Pointer to tail of the pending messages (those whose priority is
- // and has been non-negative) in the <ACE_Message_Block list>.
+ ACE_Message_Block *pending_head_;
+ // Pointer to head of the pending messages
+
+ ACE_Message_Block *pending_tail_;
+ // Pointer to tail of the pending messages
+
+ ACE_Message_Block *late_head_;
+ // Pointer to head of the late messages
+
+ ACE_Message_Block *late_tail_;
+ // Pointer to tail of the late messages
+
+ ACE_Message_Block *beyond_late_head_;
+ // Pointer to head of the beyond late messages
+
+ ACE_Message_Block *beyond_late_tail_;
+ // Pointer to tail of the beyond late messages
ACE_Dynamic_Message_Strategy &message_strategy_;
// Pointer to a dynamic priority evaluation function.
@@ -428,7 +511,7 @@ private:
// but make them private so they're not accessible outside the class
virtual int peek_dequeue_head (ACE_Message_Block *&first_item,
- ACE_Time_Value *tv = 0);
+ ACE_Time_Value *timeout = 0);
// private method to hide public base class method: just calls base class method
virtual int enqueue_tail (ACE_Message_Block *new_item,
@@ -469,46 +552,30 @@ public:
ACE_Notification_Strategy * = 0,
u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
// factory method for a dynamically prioritized (by time to deadline) ACE_Dynamic_Message_Queue
static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
- create_deadline_cleanup_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
- size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
- ACE_Notification_Strategy * = 0,
- u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
- u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
- u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
- u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
- // factory method for a dynamically prioritized (by time to deadline)
- // ACE_Dynamic_Message_Queue, with automatic deletion of beyond late messages
-
-
- static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
create_laxity_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
ACE_Notification_Strategy * = 0,
u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
// factory method for a dynamically prioritized (by laxity) ACE_Dynamic_Message_Queue
- static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
- create_laxity_cleanup_message_queue (size_t hwm = ACE_Message_Queue_Base::DEFAULT_HWM,
- size_t lwm = ACE_Message_Queue_Base::DEFAULT_LWM,
- ACE_Notification_Strategy * = 0,
- u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
- u_long static_bit_field_shift = 10, // 10 low order bits
- u_long pending_threshold = 0x200000UL, // 2^(22-1)
- u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
- u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
- // factory method for a dynamically prioritized (by laxity)
- // ACE_Dynamic_Message_Queue, with automatic deletion of beyond late messages
+
+#if defined (VXWORKS)
+
+ static ACE_Message_Queue_Vx *
+ create_Vx_message_queue (size_t max_messages, size_t max_message_length,
+ ACE_Notification_Strategy *ns = 0);
+ // factory method for a wrapped VxWorks message queue
+
+#endif /* defined (VXWORKS) */
+
};
#if defined (__ACE_INLINE__)
diff --git a/tests/Dynamic_Priority_Test.cpp b/tests/Dynamic_Priority_Test.cpp
index 57b5758fee6..baeeeab9beb 100644
--- a/tests/Dynamic_Priority_Test.cpp
+++ b/tests/Dynamic_Priority_Test.cpp
@@ -33,8 +33,13 @@
// reachable deadlines. The producer then immediately enqueues all
// messages.
//
-// In each test, the consumer is passed the filled queue and a string with
-// the expected order in which the messages should dequeue.
+// Two separate tests are run, one which verifies messages are correctly
+// ordered my the given queues, and one which generates performance
+// numbers for the various queues under increasing numbers of messages.
+// In the first test, the consumer is passed the filled queue and a string
+// with the expected order in which the messages should dequeue. In the
+// second test, measurements are made as non-intrusive as possible, with
+// no ordering checks.
//
// = AUTHOR
// Chris Gill
@@ -43,6 +48,8 @@
#include "ace/Message_Queue.h"
#include "ace/Thread_Manager.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Sched_Params.h"
#include "test_config.h"
ACE_RCSID(tests, Dynamic_Priority_Test, "$Id$")
@@ -52,18 +59,24 @@ USELIB("..\ace\aced.lib");
//---------------------------------------------------------------------------
#endif /* defined(__BORLANDC__) && __BORLANDC__ >= 0x0530 */
-// structure used to pass arguments to test functions
+enum Test_Type {BEST, WORST, RANDOM};
+
+// structure used to pass arguments to test functions
struct ArgStruct
{
ACE_Message_Queue<ACE_MT_SYNCH> *queue_;
const char *order_string_;
ACE_Message_Block **array_;
+ u_int expected_count_;
};
// order in which messages are sent
static const char send_order [] = "abcdefghijklmnop";
+// order in which messages are received with "FIFO prioritization" (i.e., none)
+static const char FIFO_receipt_order [] = "abcdefghijklmnop";
+
// order in which messages are received with static prioritization
static const char static_receipt_order [] = "ponmlkjihgfedcba";
@@ -73,29 +86,51 @@ static const char deadline_receipt_order [] = "hgfedcbaponmlkji";
// order in which messages are received with laxity prioritization
static const char laxity_receipt_order [] = "hfgedbcapnomljki";
-// fast and slow execution time values (sec, usec)
-static const ACE_Time_Value fast_execution (0, 10);
-static const ACE_Time_Value slow_execution (0, 100);
+// fast and slow execution time values (sec, usec),
+// kept very small to allow comparison of deadline,
+// laxity, and static strategies across a very wide
+// range of message counts.
+static const ACE_Time_Value fast_execution (0, 1);
+static const ACE_Time_Value slow_execution (0, 2);
// Make the queue be capable of being *very* large.
static const long max_queue = LONG_MAX;
-// The consumer dequeues a message from the passed Message_Queue,
+#if defined (VXWORKS)
+// VxWorks Message Queue parameters
+vx_max_queue = INT_MAX
+vx_msg_size = 32
+#endif /* defined (VXWORKS) */
+
+// loading parameters (number of messages to push through queues)
+// for performance tests
+static int MIN_LOAD = 20;
+static int MAX_LOAD = 1000;
+static int LOAD_STEP = 20;
+
+// time offsets for a minute in the past (for the best case test) and
+// two seconds in the future (for the worst case and randomized tests)
+const static ACE_Time_Value far_past_offset (-60, 0);
+const static ACE_Time_Value near_future_offset (2, 0);
+const static ACE_Time_Value offset_step (0, 5);
+
+// The order consumer dequeues a message from the passed Message_Queue,
// and checks its data character against the passed string of characters
-// which has the expected ordering. The supplier and consumer do not
+// which has the expected ordering. Suppliers and consumers do not
// allocate or deallocate messages, to avoid timing delays and timing
// jitter in the test: the main function is responsible for all
// initialization allocation and cleanup before, between, and after
-// (but not during) the transfer of messages from the supplier to the
-// consumer.
+// (but not during) the transfer of messages from a supplier to the
+// corresponding consumer.
static void *
-consumer (void * args)
+order_consumer (void * args)
{
ACE_ASSERT (args != 0);
ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_;
const char *receipt_order = ((ArgStruct *) args)->order_string_;
+ u_int expected_count = ((ArgStruct *) args)->expected_count_;
ACE_ASSERT (receipt_order != 0);
ACE_ASSERT (msg_queue != 0);
@@ -111,30 +146,36 @@ consumer (void * args)
int result = msg_queue->dequeue_head (mb);
if (result == -1)
+ {
break;
+ }
local_count++;
- ACE_ASSERT (*expected == *mb->rd_ptr ());
+ ACE_ASSERT (*expected == *mb->rd_ptr ());
}
ACE_ASSERT (local_count == ACE_OS::strlen (receipt_order));
+
+ ACE_ASSERT (local_count == expected_count);
+
return 0;
}
-// The producer runs through the passed send string, setting the read
+// The order producer runs through the passed send string, setting the read
// pointer of the current message to the current character position in
// the string, and then queueing the message in the message list, where
-// it is removed by the consumer thread.
+// it is removed by the order consumer.
static void *
-producer (void *args)
+order_producer (void *args)
{
ACE_ASSERT (args != 0);
ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_;
const char *send_order = ((ArgStruct *) args)->order_string_;
ACE_Message_Block **block_array = ((ArgStruct *) args)->array_;
+ int expected_count = ((ArgStruct *) args)->expected_count_;
ACE_ASSERT (send_order != 0);
ACE_ASSERT (block_array != 0);
@@ -142,7 +183,7 @@ producer (void *args)
// iterate through the send order string and the message block array,
// setting the current message block's read pointer to the current
// position in the send order string.
- int local_count;
+ int local_count = 0;
const char *c;
for (local_count = 0, c = send_order; *c != '\0'; ++local_count, ++c)
{
@@ -155,16 +196,21 @@ producer (void *args)
*mb->rd_ptr () = *c;
mb->wr_ptr (1);
+
// Enqueue the message block in priority order.
if (msg_queue->enqueue_prio (mb) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("(%t) %p\n"), ASYS_TEXT ("put_next")), 0);
+ {
+ break;
+ }
}
+ ACE_ASSERT (local_count == expected_count);
+
return 0;
}
-int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_order, const char *receipt_order)
+int run_order_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_order, const char *receipt_order)
{
u_int i;
u_int array_size = ACE_OS::strlen (send_order);
@@ -178,6 +224,7 @@ int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_orde
supplier_args.queue_ = msg_queue;
supplier_args.order_string_ = send_order;
+ supplier_args.expected_count_ = ACE_OS::strlen (send_order);
// allocate message blocks, fill in pointer array, set static information
ACE_NEW_RETURN (supplier_args.array_, ACE_Message_Block * [array_size], -1);
@@ -195,6 +242,7 @@ int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_orde
consumer_args.queue_ = msg_queue;
consumer_args.order_string_ = receipt_order;
+ consumer_args.expected_count_ = ACE_OS::strlen (receipt_order);
consumer_args.array_ = 0;
// Construct pending and late absolute deadline times.
@@ -240,11 +288,11 @@ int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_orde
}
}
- // run the producer
- producer (&supplier_args);
+ // run the order test producer
+ order_producer (&supplier_args);
- // run the consumer
- consumer (&consumer_args);
+ // run the order test consumer
+ order_consumer (&consumer_args);
// free all the allocated message blocks
for (i = 0; i < array_size; ++i)
@@ -259,40 +307,412 @@ int run_test (ACE_Message_Queue<ACE_MT_SYNCH>* msg_queue, const char *send_orde
}
+// The performance consumer starts a timer, dequeues all messages from the
+// passed Message_Queue, stops the timer, and reports the number of
+// dequeued messages, the elapsed time, and the average time per message.
+
+static void *
+performance_consumer (void * args)
+{
+ ACE_High_Res_Timer timer;
+
+ ACE_ASSERT (args != 0);
+
+ ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_;
+ u_int expected_count = ((ArgStruct *) args)->expected_count_;
+
+ ACE_ASSERT (msg_queue != 0);
+
+ u_int local_count = 0;
+ ACE_Message_Block *mb = 0;
+
+ // reset, then start timer
+ timer.reset ();
+ timer.start ();
+
+ // Keep looping, reading a message out of the queue, until
+ // the expected number of messages have been dequeued.
+ for (local_count = 0; local_count < expected_count; ++local_count)
+ {
+ if (msg_queue->dequeue_head (mb) == -1)
+ {
+ break;
+ }
+
+ // ACE_ASSERT ('a' == *mb->rd_ptr ());
+ }
+
+ // stop timer, obtain and report its elapsed time
+ timer.stop ();
+ ACE_Time_Value tv;
+ timer.elapsed_time (tv);
+ ACE_DEBUG ((LM_INFO, "%6u, %6u, %f",
+ local_count,
+ tv.msec (),
+ (double) tv.msec () / local_count));
+
+ ACE_ASSERT (local_count == expected_count);
+
+ return 0;
+}
+
+// The performance producer starts a timer, enqueues the passed messages setting the
+// read pointer of each message to the first character position in the passed string,
+// stops the timer, and reports the number of enqueued messages, the elapsed time,
+// and the average time per message.
+
+static void *
+performance_producer (void *args)
+{
+ ACE_High_Res_Timer timer;
+
+ ACE_ASSERT (args != 0);
+
+ ACE_Message_Queue<ACE_MT_SYNCH> *msg_queue = ((ArgStruct *) args)->queue_;
+ ACE_Message_Block **block_array = ((ArgStruct *) args)->array_;
+ int expected_count = ((ArgStruct *) args)->expected_count_;
+
+ ACE_ASSERT (send_order != 0);
+ ACE_ASSERT (block_array != 0);
+
+ // reset, then start timer
+ timer.reset ();
+ timer.start ();
+
+ // iterate through the message block array, setting the character under
+ // the current message block's read pointer to null before enqueueing
+ // the message block.
+ int local_count = 0;
+ for (local_count = 0; local_count < expected_count; ++local_count)
+ {
+ // point to the current message block
+ ACE_Message_Block *mb = block_array [local_count];
+ ACE_ASSERT (mb != 0);
+
+ // Set a character in the current message block at its
+ // read pointer position, and adjust the write pointer
+ *mb->rd_ptr () = 'a';
+ mb->wr_ptr (1);
+
+ // Enqueue the message block in priority order.
+ if (msg_queue->enqueue_prio (mb) == -1)
+ {
+ break;
+ }
+ }
+
+ // stop timer, obtain and report its elapsed time
+ timer.stop ();
+ ACE_Time_Value tv;
+ timer.elapsed_time (tv);
+ ACE_DEBUG ((LM_INFO, "%6u, %6u, %f, ",
+ local_count,
+ tv.msec (),
+ (double) tv.msec () / local_count));
+
+ ACE_ASSERT (local_count == expected_count);
+
+ return 0;
+}
+
+int run_performance_test (u_int min_load, u_int max_load, u_int load_step,
+ Test_Type test_type)
+{
+ ArgStruct supplier_args, consumer_args; // supplier and consumer argument strings
+ u_int load = 0; // message load
+ ACE_Time_Value *time_offsets; // pointer to array of time offsets
+ ACE_Time_Value current_time; // current time value
+ u_int shuffle_index; // used to shuffle arrays
+ int random_int; // also used to shuffle arrays
+ ACE_Message_Block *temp_block; // temporary message block pointer
+ ACE_Time_Value temp_time; // temporary time value
+
+ // build a static queue, a deadline based dynamic
+ // queue, and a laxity based dynamic queue
+
+ ACE_Message_Queue<ACE_MT_SYNCH> *static_queue = 0;
+ static_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue (max_queue);
+ ACE_ASSERT (static_queue != 0);
+
+ ACE_Message_Queue<ACE_MT_SYNCH> *deadline_queue = 0;
+ deadline_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_deadline_message_queue (max_queue);
+ ACE_ASSERT (deadline_queue != 0);
+
+ ACE_Message_Queue<ACE_MT_SYNCH> *laxity_queue = 0;
+ laxity_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_laxity_message_queue (max_queue);
+ ACE_ASSERT (laxity_queue != 0);
+
+ // zero out unused struct members
+ supplier_args.order_string_ = 0;
+ consumer_args.order_string_ = 0;
+ consumer_args.array_ = 0;
+
+ // print column headings for the specific test type
+ switch (test_type)
+ {
+ case BEST:
+
+ ACE_DEBUG ((LM_INFO,
+ "\n\nenqueued, best static time, best static avg, "
+ "dequeued, best static time, best static avg, "
+ "enqueued, best deadline time, best deadline avg, "
+ "dequeued, best deadline time, best deadline avg, "
+ "enqueued, best laxity time, best laxity avg, "
+ "dequeued, best laxity time, best laxity avg\n"));
+ break;
+
+ case WORST:
+
+ ACE_DEBUG ((LM_INFO,
+ "\n\nenqueued, worst static time, worst static avg, "
+ "dequeued, worst static time, worst static avg, "
+ "enqueued, worst deadline time, worst deadline avg, "
+ "dequeued, worst deadline time, worst deadline avg, "
+ "enqueued, worst laxity time, worst laxity avg, "
+ "dequeued, worst laxity time, worst laxity avg\n"));
+
+ break;
+
+ case RANDOM:
+
+ ACE_DEBUG ((LM_INFO,
+ "\n\nenqueued, random static time, random static avg, "
+ "dequeued, random static time, random static avg, "
+ "enqueued, random deadline time, random deadline avg, "
+ "dequeued, random deadline time, random deadline avg, "
+ "enqueued, random laxity time, random laxity avg, "
+ "dequeued, random laxity time, random laxity avg\n"));
+ break;
+
+ default:
+
+ ACE_ERROR_RETURN ((LM_ERROR, "unknown test type %d", test_type), -1);
+ }
+
+ // iterate through the message loads, and at
+ // each load do an identical test on all queues
+ for (load = min_load; load <= max_load; load += load_step)
+ {
+ u_int i;
+
+ supplier_args.expected_count_ = load;
+ consumer_args.expected_count_ = load;
+
+ // allocate message blocks, fill in pointer array, set static information
+ ACE_NEW_RETURN (supplier_args.array_, ACE_Message_Block * [load], -1);
+
+ // allocate array of timing offsets
+ ACE_NEW_RETURN (time_offsets, ACE_Time_Value [load], -1);
+
+ // fill in information for all types of tests
+ for (i = 0; i < load; ++i)
+ {
+ // construct a message new block off the heap, to hold a single character
+ ACE_NEW_RETURN (supplier_args.array_[i], ACE_Message_Block (1), -1);
+
+ // assign every other message short or long execution time
+ supplier_args.array_[i]->msg_execution_time (((i % 2) ? slow_execution : fast_execution));
+ }
+
+ // fill in information for the specific type of test
+ switch (test_type)
+ {
+ case BEST:
+
+ // fill in best case information
+ time_offsets [0] = far_past_offset;
+ supplier_args.array_[0]->msg_priority (load);
+ for (i = 1; i < load; ++i)
+ {
+ // assign static (minimal) message priority in descending order
+ supplier_args.array_[i]->msg_priority (load - i);
+
+ // assign time to deadline in descending order
+ time_offsets [i] = time_offsets [i - 1] + offset_step;
+ }
+
+ break;
+
+ case WORST:
+
+ // fill in worst case information
+ time_offsets [0] = near_future_offset;
+ supplier_args.array_[0]->msg_priority (0);
+ for (i = 1; i < load; ++i)
+ {
+ // assign static (minimal) message priority in ascending order
+ supplier_args.array_[i]->msg_priority (i);
+
+ // assign time to deadline in descending order
+ // (puts dynamic priority in ascending order)
+ time_offsets [i] = time_offsets [i - 1] - offset_step;
+ }
+
+ break;
+
+ case RANDOM:
+
+ // fill in worst case information
+ time_offsets [0] = near_future_offset;
+ supplier_args.array_[0]->msg_priority (0);
+ for (i = 1; i < load; ++i)
+ {
+ // assign static (minimal) message priority in ascending order
+ supplier_args.array_[i]->msg_priority (i);
+
+ // assign time to deadline in descending order
+ // (puts dynamic priority in ascending order)
+ time_offsets [i] = time_offsets [i - 1] - offset_step;
+ }
+
+ // then shuffle the arrays in tandem
+ for (i = 0; i < load; ++i)
+ {
+ // choose a (pseudo) random integer (evenly distributed over [0, load-1])
+ if (RAND_MAX >= load)
+ {
+ // discard integers in the tail of the random range that
+ // do not distribute evenly modulo the number of messages
+ do
+ {
+ random_int = ACE_OS::rand ();
+ } while (random_int >= (int)(RAND_MAX - (RAND_MAX % load)));
+ }
+ else if (RAND_MAX < load - 1)
+ {
+ // this should only happen for a *very* large messages
+ // relative to the system's representation size
+ ACE_ERROR_RETURN ((LM_ERROR, "Insufficient range of random numbers"), -1);
+ }
+
+ shuffle_index = random_int % load;
+
+ // swap the message at the current index with the one at the shuffle index
+ temp_block = supplier_args.array_[i];
+ supplier_args.array_[i] = supplier_args.array_[shuffle_index];
+ supplier_args.array_[shuffle_index] = temp_block;
+
+ // swap the time at the current index with the one at the shuffle index
+ temp_time = time_offsets [i];
+ time_offsets [i] = time_offsets [shuffle_index];
+ time_offsets [shuffle_index] = temp_time;
+ }
+
+ break;
+
+ default:
+
+ ACE_ERROR_RETURN ((LM_ERROR, "unknown test type %d", test_type), -1);
+ }
+
+ // Set absolute time of deadline associated with each message.
+ current_time = ACE_OS::gettimeofday ();
+ for (i = 0; i < load; ++i)
+ {
+ supplier_args.array_[i]->msg_deadline_time (time_offsets [i] + current_time);
+ }
+
+ // run the performance test producer and consumer on the static queue
+ supplier_args.queue_ = static_queue;
+ performance_producer (&supplier_args);
+ consumer_args.queue_ = static_queue;
+ performance_consumer (&consumer_args);
+
+ // add a comma delimiter for most recent outputs
+ ACE_DEBUG ((LM_INFO, ", "));
+
+ // run the performance test producer and consumer on the deadline queue
+ supplier_args.queue_ = deadline_queue;
+ performance_producer (&supplier_args);
+ consumer_args.queue_ = deadline_queue;
+ performance_consumer (&consumer_args);
+
+ // add a comma delimiter for most recent outputs
+ ACE_DEBUG ((LM_INFO, ", "));
+
+ // run the performance test producer and consumer on the laxity queue
+ supplier_args.queue_ = laxity_queue;
+ performance_producer (&supplier_args);
+ consumer_args.queue_ = laxity_queue;
+ performance_consumer (&consumer_args);
+
+ // move to the next line of output
+ ACE_DEBUG ((LM_INFO, "\n"));
+
+ // free all the allocated message blocks
+ for (i = 0; i < load; ++i)
+ {
+ delete supplier_args.array_[i];
+ }
+
+ // free the allocated pointer array
+ delete [] supplier_args.array_;
+
+ }
+
+ // free resources and leave
+ delete static_queue;
+ delete deadline_queue;
+ delete laxity_queue;
+ return 0;
+}
+
int
main (int, ASYS_TCHAR *[])
{
ACE_START_TEST (ASYS_TEXT ("Dynamic_Priority_Test"));
- ACE_Message_Queue<ACE_MT_SYNCH> *test_queue;
+ // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
+ if (ACE_OS::sched_params (
+ ACE_Sched_Params (
+ ACE_SCHED_FIFO,
+ ACE_Sched_Params::priority_min (ACE_SCHED_FIFO),
+ ACE_SCOPE_PROCESS)) != 0)
+ {
+ if (ACE_OS::last_error () == EPERM)
+ ACE_DEBUG ((LM_MAX, "preempt: user is not superuser, "
+ "so remain in time-sharing class\n"));
+ else
+ ACE_ERROR_RETURN ((LM_ERROR, "%n: ACE_OS::sched_params failed\n%a"),
+ -1);
+ }
+
+
+ ACE_Message_Queue<ACE_MT_SYNCH> *test_queue = 0;
// test factory, static message queue
test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_static_message_queue (max_queue);
ACE_ASSERT (test_queue != 0);
- run_test (test_queue, send_order, static_receipt_order);
+ run_order_test (test_queue, send_order, static_receipt_order);
delete test_queue;
- // test factory, dynamic message queue (deadline strategy, no cleanup)
+ // test factory, dynamic message queue (deadline strategy)
test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_deadline_message_queue (max_queue);
ACE_ASSERT (test_queue != 0);
- run_test (test_queue, send_order, deadline_receipt_order);
+ run_order_test (test_queue, send_order, deadline_receipt_order);
delete test_queue;
- // test factory, dynamic message queue (laxity strategy, no cleanup)
+ // test factory, dynamic message queue (laxity strategy)
test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_laxity_message_queue (max_queue);
ACE_ASSERT (test_queue != 0);
- run_test (test_queue, send_order, laxity_receipt_order);
+ run_order_test (test_queue, send_order, laxity_receipt_order);
delete test_queue;
- // test factory (deadline strategy, with cleanup)
- test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_deadline_cleanup_message_queue (max_queue);
+#if defined (VXWORKS)
+ // test factory for VxWorks message queue
+ test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_Vx_message_queue (vx_max_queue, vx_msg_size);
ACE_ASSERT (test_queue != 0);
+ // (TBD - does message receipt order test make any sense for Vx Queue ?
+ // If so, uncomment order test, or if not remove order test, below)
+ // run_order_test (test_queue, send_order, static_receipt_order);
delete test_queue;
+#endif /* VXWORKS */
- // test factory (laxity strategy, with cleanup)
- test_queue = ACE_Message_Queue_Factory<ACE_MT_SYNCH>::create_laxity_cleanup_message_queue (max_queue);
- ACE_ASSERT (test_queue != 0);
- delete test_queue;
+ // For each of an increasing number of message loads, run the same performance
+ // test (best case, worst case, and randomized, over each kind of queue
+ run_performance_test (MIN_LOAD, MAX_LOAD, LOAD_STEP, BEST);
+ run_performance_test (MIN_LOAD, MAX_LOAD, LOAD_STEP, WORST);
+ run_performance_test (MIN_LOAD, MAX_LOAD, LOAD_STEP, RANDOM);
ACE_END_TEST;
return 0;