summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-24 16:33:06 +0000
committercdgill <cdgill@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-06-24 16:33:06 +0000
commit7c1c009a9f05509740534c4aa9d86b3f98c08f48 (patch)
tree2f8d3ef6af798cd9475ee7d0fc6da37251e537ec
parent532f007a3ba02d737b31ec46d23c891b90f2800f (diff)
downloadATCD-7c1c009a9f05509740534c4aa9d86b3f98c08f48.tar.gz
added dynamic message queues
-rw-r--r--TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.cpp124
-rw-r--r--TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.h119
-rw-r--r--ace/Message_Block.cpp32
-rw-r--r--ace/Message_Block.h31
-rw-r--r--ace/Message_Block.i29
-rw-r--r--ace/Message_Queue.cpp530
-rw-r--r--ace/Message_Queue.h269
-rw-r--r--ace/Message_Queue.i78
8 files changed, 962 insertions, 250 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.cpp b/TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.cpp
index ff0190d0688..bd1bb443d10 100644
--- a/TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.cpp
+++ b/TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.cpp
@@ -794,7 +794,7 @@ ACE_RMS_Scheduler_Strategy::~ACE_RMS_Scheduler_Strategy ()
long
ACE_RMS_Scheduler_Strategy::dynamic_subpriority (Dispatch_Entry &entry,
- u_long current_time)
+ u_long current_time)
{
return 0;
}
@@ -1137,13 +1137,13 @@ ACE_RMS_Dyn_Scheduler_Strategy::dynamic_subpriority (Dispatch_Entry &entry,
if (entry.task_entry ().rt_info ()->criticality <
RtecScheduler::HIGH_CRITICALITY)
{
- long laxity =
- entry.deadline ().low - current_time -
- entry.task_entry ().rt_info ()->worst_case_execution_time.low;
+ long laxity =
+ entry.deadline ().low - current_time -
+ entry.task_entry ().rt_info ()->worst_case_execution_time.low;
- return (laxity > 0) ? LONG_MAX - laxity : laxity;
+ return (laxity > 0) ? LONG_MAX - laxity : laxity;
}
-
+
return 0;
}
// = returns a dynamic subpriority value for the given entry and the
@@ -1216,118 +1216,6 @@ ACE_RMS_Dyn_Scheduler_Strategy::minimum_critical_priority ()
// = returns 0 for minimum critical priority number
-/////////////////////////////////////////////
-// Runtime Dispatch Subpriority Strategies //
-/////////////////////////////////////////////
-
-ACE_Dispatch_Subpriority_Strategy::ACE_Dispatch_Subpriority_Strategy (long frame_size)
- : frame_size_ (frame_size)
- , dynamic_max_ (LONG_MAX)
- , static_max_ (0)
- , static_bits_ (0)
-{
- // some platforms don't support floating point numbers, so compute delimiters
- // for static and dynamic subpriority representations the long way (once)
- long doubler = 1;
- while ((dynamic_max_ / 2) > frame_size)
- {
- dynamic_max_ /= 2;
- doubler *= 2;
- static_max_ = doubler - 1;
- ++static_bits_;
- }
-
- max_time_.set (0, dynamic_max_);
- min_time_.set (0, - dynamic_max_ - 1);
-}
-
-
-RtecScheduler::Preemption_Subpriority
-ACE_Dispatch_Subpriority_Strategy::response_function (
- const ACE_Time_Value &time_metric,
- RtecScheduler::Preemption_Subpriority static_subpriority)
-{
- RtecScheduler::Preemption_Subpriority dynamic_subpriority;
-
- if (time_metric < min_time_)
- {
- dynamic_subpriority = - dynamic_max_ - 1;
- static_subpriority = static_max_;
- }
- else if (time_metric > max_time_)
- {
- dynamic_subpriority = dynamic_max_;
- static_subpriority = static_max_;
- }
- else
- {
- dynamic_subpriority = time_metric.usec () +
- time_metric.sec () * ACE_ONE_SECOND_IN_USECS;
-
- if (static_subpriority > static_max_)
- {
- static_subpriority = static_max_;
- }
- }
-
- return (dynamic_subpriority > 0)
- ? (LONG_MAX -
- (dynamic_subpriority << static_bits_) -
- static_subpriority)
- : (LONG_MIN +
- ((-dynamic_subpriority) << static_bits_) +
- static_subpriority);
-}
-
-
-ACE_Deadline_Subpriority_Strategy::ACE_Deadline_Subpriority_Strategy (long frame_size)
- : ACE_Dispatch_Subpriority_Strategy (frame_size)
-{
-}
-
-RtecScheduler::Preemption_Subpriority
-ACE_Deadline_Subpriority_Strategy::runtime_subpriority (
- const ACE_Time_Value &current_time,
- const ACE_Time_Value &deadline_time,
- const ACE_Time_Value &execution_time,
- RtecScheduler::Preemption_Subpriority static_subpriority)
-{
- ACE_Time_Value time_to_deadline (deadline_time);
- time_to_deadline -= current_time;
-
- return response_function (time_to_deadline, static_subpriority);
-}
-
-ACE_Laxity_Subpriority_Strategy::ACE_Laxity_Subpriority_Strategy (long frame_size)
- : ACE_Dispatch_Subpriority_Strategy (frame_size)
-{
-}
-
-RtecScheduler::Preemption_Subpriority
-ACE_Laxity_Subpriority_Strategy::runtime_subpriority (
- const ACE_Time_Value &current_time,
- const ACE_Time_Value &deadline_time,
- const ACE_Time_Value &execution_time,
- RtecScheduler::Preemption_Subpriority static_subpriority)
-{
- ACE_Time_Value laxity (deadline_time);
- laxity -= current_time;
- laxity -= execution_time;
-
- return response_function (laxity, static_subpriority);
-}
-
-RtecScheduler::Preemption_Subpriority
-ACE_Static_Subpriority_Strategy::runtime_subpriority (
- const ACE_Time_Value &current_time,
- const ACE_Time_Value &deadline_time,
- const ACE_Time_Value &execution_time,
- RtecScheduler::Preemption_Subpriority static_subpriority)
-{
- // map passed static subpriority directly to dispatch subpriority
- return static_subpriority;
-}
-
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Node<Dispatch_Entry *>;
diff --git a/TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.h b/TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.h
index 8310e1c47e8..e65c11d83e0 100644
--- a/TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.h
+++ b/TAO/orbsvcs/orbsvcs/Sched/Strategy_Scheduler.h
@@ -139,8 +139,8 @@ public:
// is greater in the order, 0 if they are equivalent, or 1 if the
// second Dispatch_Entry is greater in the order
- virtual long dynamic_subpriority (Dispatch_Entry &entry,
- u_long current_time) = 0;
+ virtual long dynamic_subpriority (Dispatch_Entry &entry,
+ u_long current_time) = 0;
// = returns a dynamic subpriority value
// for the given timeline entry at the current time
@@ -204,8 +204,8 @@ public:
protected:
- virtual long dynamic_subpriority (Dispatch_Entry &entry,
- u_long current_time);
+ virtual long dynamic_subpriority (Dispatch_Entry &entry,
+ u_long current_time);
// = returns a dynamic subpriority value at the current time for
// the given timeline entry: if the operation has
// non-negative laxity, then the value is positive, and a lower
@@ -263,8 +263,8 @@ public:
protected:
- virtual long dynamic_subpriority (Dispatch_Entry &entry,
- u_long current_time);
+ virtual long dynamic_subpriority (Dispatch_Entry &entry,
+ u_long current_time);
// = just returns 0: all operations have
// the same dynamic subpriority value
@@ -317,7 +317,7 @@ public:
protected:
- virtual long dynamic_subpriority (Dispatch_Entry &entry,
+ virtual long dynamic_subpriority (Dispatch_Entry &entry,
u_long current_time);
// = returns a dynamic subpriority value at the current time for
// the given timeline entry: if the operation has
@@ -372,8 +372,8 @@ public:
protected:
- virtual long dynamic_subpriority (Dispatch_Entry &entry,
- u_long current_time);
+ virtual long dynamic_subpriority (Dispatch_Entry &entry,
+ u_long current_time);
// = returns a dynamic subpriority value at the current time for the
// given timeline entry: if the operation has non-negative
// time to deadline, then value is positive, and a shorter time to
@@ -433,8 +433,8 @@ public:
protected:
- virtual long dynamic_subpriority (Dispatch_Entry &entry,
- u_long current_time);
+ virtual long dynamic_subpriority (Dispatch_Entry &entry,
+ u_long current_time);
// = returns a dynamic subpriority value at the current time for the
// given timeline entry: if the operation is in the
// critical set, the dynamic subpriority value is always 0; if the
@@ -465,103 +465,6 @@ private:
};
-/////////////////////////////////////////////
-// Runtime Dispatch Subpriority Strategies //
-/////////////////////////////////////////////
-
-class TAO_ORBSVCS_Export ACE_Dispatch_Subpriority_Strategy
-{
-public:
-
- ACE_Dispatch_Subpriority_Strategy (long frame_size = ACE_ONE_SECOND_IN_USECS);
- // ctor: frame size is the number of microseconds in a complete
- // dispatch frame (defaults to one million = one second).
-
- virtual RtecScheduler::Preemption_Subpriority
- runtime_subpriority (const ACE_Time_Value &current_time,
- const ACE_Time_Value &deadline_time,
- const ACE_Time_Value &execution_time,
- RtecScheduler::Preemption_Subpriority static_subpriority) = 0;
- // abstract method to compute the dispatch subpriority for an operation
-
- virtual RtecScheduler::Preemption_Subpriority
- response_function (const ACE_Time_Value &time_metric,
- RtecScheduler::Preemption_Subpriority static_subpriority);
- // response function for run time subpriority: stepwise linear function that
- // gives appoximately the same dispatching behavior as a hyperbolic function,
- // but does not require us to use floating point math.
-
-
-protected:
-
- long frame_size_;
- // number of microseconds per scheduling frame
-
- long dynamic_max_;
- // max value that a dynamic priority representation can have
-
- long static_max_;
- // number of bits available for static subpriority representation
-
- u_int static_bits_;
- // number of bits available for static subpriority representation
-
- ACE_Time_Value max_time_;
- // maximum time value that can be represented
-
- ACE_Time_Value min_time_;
- // minimum time value that can be represented
-
-};
-
-
-class TAO_ORBSVCS_Export ACE_Deadline_Subpriority_Strategy
- : public ACE_Dispatch_Subpriority_Strategy
-{
-public:
-
- ACE_Deadline_Subpriority_Strategy (long frame_size = ACE_ONE_SECOND_IN_USECS);
- // ctor: frame size is the number of microseconds in a complete
- // dispatch frame (defaults to one million = one second).
-
-
- virtual RtecScheduler::Preemption_Subpriority
- runtime_subpriority (const ACE_Time_Value &current_time,
- const ACE_Time_Value &deadline_time,
- const ACE_Time_Value &execution_time,
- RtecScheduler::Preemption_Subpriority static_subpriority);
-};
-
-class TAO_ORBSVCS_Export ACE_Laxity_Subpriority_Strategy
- : public ACE_Dispatch_Subpriority_Strategy
-{
-public:
-
- ACE_Laxity_Subpriority_Strategy (long frame_size = ACE_ONE_SECOND_IN_USECS);
- // ctor: frame size is the number of microseconds in a complete
- // dispatch frame (defaults to one million = one second).
-
-
- virtual RtecScheduler::Preemption_Subpriority
- runtime_subpriority (const ACE_Time_Value &current_time,
- const ACE_Time_Value &deadline_time,
- const ACE_Time_Value &execution_time,
- RtecScheduler::Preemption_Subpriority static_subpriority);
-};
-
-class TAO_ORBSVCS_Export ACE_Static_Subpriority_Strategy
- : public ACE_Dispatch_Subpriority_Strategy
-{
-public:
-
- virtual RtecScheduler::Preemption_Subpriority
- runtime_subpriority (const ACE_Time_Value &current_time,
- const ACE_Time_Value &deadline_time,
- const ACE_Time_Value &execution_time,
- RtecScheduler::Preemption_Subpriority static_subpriority);
-};
-
-
#if defined (__ACE_INLINE__)
#include "Strategy_Scheduler.i"
#endif /* __ACE_INLINE__ */
diff --git a/ace/Message_Block.cpp b/ace/Message_Block.cpp
index 42d6550dedf..ee420545f3e 100644
--- a/ace/Message_Block.cpp
+++ b/ace/Message_Block.cpp
@@ -220,6 +220,8 @@ ACE_Message_Block::ACE_Message_Block (const char *data,
0, // locking strategy
ACE_Message_Block::DONT_DELETE, // flags
0, // priority
+ ACE_Time_Value::zero, // execution time
+ ACE_Time_Value::zero, // absolute time of deadline
0) == -1) // data block
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("ACE_Message_Block")));
}
@@ -236,6 +238,8 @@ ACE_Message_Block::ACE_Message_Block (void)
0, // locking strategy
ACE_Message_Block::DONT_DELETE, // flags
0, // priority
+ ACE_Time_Value::zero, // execution time
+ ACE_Time_Value::zero, // absolute time of deadline
0) == -1) // data block
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("ACE_Message_Block")));
}
@@ -246,7 +250,9 @@ ACE_Message_Block::ACE_Message_Block (size_t size,
const char *msg_data,
ACE_Allocator *allocator_strategy,
ACE_Lock *locking_strategy,
- u_long priority)
+ u_long priority,
+ const ACE_Time_Value & execution_time,
+ const ACE_Time_Value & deadline_time)
{
ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
@@ -258,6 +264,8 @@ ACE_Message_Block::ACE_Message_Block (size_t size,
locking_strategy,
msg_data ? ACE_Message_Block::DONT_DELETE : 0,
priority,
+ execution_time,
+ deadline_time,
0) == -1) // data block
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("ACE_Message_Block")));
}
@@ -269,7 +277,9 @@ ACE_Message_Block::init (size_t size,
const char *msg_data,
ACE_Allocator *allocator_strategy,
ACE_Lock *locking_strategy,
- u_long priority)
+ u_long priority,
+ const ACE_Time_Value & execution_time,
+ const ACE_Time_Value & deadline_time)
{
ACE_TRACE ("ACE_Message_Block::init");
@@ -281,6 +291,8 @@ ACE_Message_Block::init (size_t size,
locking_strategy,
msg_data ? ACE_Message_Block::DONT_DELETE : 0,
priority,
+ execution_time,
+ deadline_time,
0); // data block
}
@@ -299,6 +311,8 @@ ACE_Message_Block::init (const char *data,
0, // locking strategy
ACE_Message_Block::DONT_DELETE, // flags
0, // priority
+ ACE_Time_Value::zero, // execution time
+ ACE_Time_Value::zero, // absolute time of deadline
0); // data block
}
@@ -310,6 +324,8 @@ ACE_Message_Block::ACE_Message_Block (size_t size,
ACE_Lock *locking_strategy,
Message_Flags flags,
u_long priority,
+ const ACE_Time_Value & execution_time,
+ const ACE_Time_Value & deadline_time,
ACE_Data_Block *db)
{
ACE_TRACE ("ACE_Message_Block::ACE_Message_Block");
@@ -322,6 +338,8 @@ ACE_Message_Block::ACE_Message_Block (size_t size,
locking_strategy,
flags,
priority,
+ execution_time,
+ deadline_time,
db) == -1)
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("ACE_Message_Block")));
}
@@ -338,6 +356,8 @@ ACE_Message_Block::ACE_Message_Block (ACE_Data_Block *data_block)
0, // locking strategy
0, // flags
0, // priority
+ ACE_Time_Value::zero, // execution time
+ ACE_Time_Value::zero, // absolute time of deadline
data_block) == -1) // data block
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("ACE_Message_Block")));
}
@@ -351,11 +371,15 @@ ACE_Message_Block::init_i (size_t size,
ACE_Lock *locking_strategy,
Message_Flags flags,
u_long priority,
+ const ACE_Time_Value & execution_time,
+ const ACE_Time_Value & deadline_time,
ACE_Data_Block *db)
{
ACE_TRACE ("ACE_Message_Block::init_i");
this->priority_ = priority;
+ this->execution_time_ = execution_time;
+ this->deadline_time_ = deadline_time;
this->cont_ = msg_cont;
this->next_ = 0;
this->prev_ = 0;
@@ -591,6 +615,8 @@ ACE_Message_Block::duplicate (void) const
0, // locking strategy
0, // flags
this->priority_, // priority
+ this->execution_time_, // execution time
+ this->deadline_time_, // absolute time to deadline
// Get a pointer to a
// "duplicated" <ACE_Data_Block>
// (will simply increment the
@@ -683,6 +709,8 @@ ACE_Message_Block::clone (Message_Flags mask) const
0, // locking strategy
0, // flags
this->priority_, // priority
+ this->execution_time_, // execution time
+ this->deadline_time_, // absolute time to deadline
db); // data_block
if (nb == 0)
{
diff --git a/ace/Message_Block.h b/ace/Message_Block.h
index 16387310589..92f82ae4843 100644
--- a/ace/Message_Block.h
+++ b/ace/Message_Block.h
@@ -24,6 +24,7 @@
// Forward declaration.
class ACE_Data_Block;
class ACE_Lock;
+class ACE_Time_Value;
class ACE_Export ACE_Message_Block
{
@@ -113,7 +114,9 @@ public:
const char *data = 0,
ACE_Allocator *allocator_strategy = 0,
ACE_Lock *locking_strategy = 0,
- u_long priority = 0);
+ u_long priority = 0,
+ const ACE_Time_Value & execution_time = ACE_Time_Value::zero,
+ const ACE_Time_Value & deadline_time = ACE_Time_Value::zero);
// Create an initialized message of type <type> containing <size>
// bytes. The <cont> argument initializes the continuation field in
// the <Message_Block>. If <data> == 0 then we create and own the
@@ -138,7 +141,9 @@ public:
const char *data = 0,
ACE_Allocator *allocator_strategy = 0,
ACE_Lock *locking_strategy = 0,
- u_long priority = 0);
+ u_long priority = 0,
+ const ACE_Time_Value & execution_time = ACE_Time_Value::zero,
+ const ACE_Time_Value & deadline_time = ACE_Time_Value::zero);
// Create an initialized message of type <type> containing <size>
// bytes. The <cont> argument initializes the continuation field in
// the <Message_Block>. If <data> == 0 then we create and own the
@@ -186,6 +191,18 @@ public:
void msg_priority (u_long priority);
// Set priority of the message.
+ const ACE_Time_Value & msg_execution_time (void) const;
+ // Get execution time associated with the message.
+
+ void msg_execution_time (const ACE_Time_Value & et);
+ // Set execution time associated with the message.
+
+ const ACE_Time_Value & msg_deadline_time (void) const;
+ // Get absolute time of deadline associated with the message.
+
+ void msg_deadline_time (const ACE_Time_Value & dt);
+ // Set absolute time of deadline associated with the message.
+
// = Deep copy and shallow copy methods.
virtual ACE_Message_Block *clone (Message_Flags mask = 0) const;
@@ -314,6 +331,8 @@ private:
ACE_Lock *locking_strategy,
Message_Flags flags,
u_long priority,
+ const ACE_Time_Value & execution_time,
+ const ACE_Time_Value & deadline_time,
ACE_Data_Block *db);
// Perform the actual initialization.
@@ -328,6 +347,8 @@ private:
ACE_Lock *locking_strategy,
Message_Flags flags,
u_long priority,
+ const ACE_Time_Value & execution_time,
+ const ACE_Time_Value & deadline_time,
ACE_Data_Block *db);
// Perform the actual initialization.
@@ -340,6 +361,12 @@ private:
u_long priority_;
// Priority of message.
+ ACE_Time_Value execution_time_;
+ // execution time associated with the message
+
+ ACE_Time_Value deadline_time_;
+ // absolute deadline time for message
+
// = Links to other ACE_Message_Block *s.
ACE_Message_Block *cont_;
// Pointer to next message block in the chain.
diff --git a/ace/Message_Block.i b/ace/Message_Block.i
index a6f3b5fbd7f..ed98549b3b4 100644
--- a/ace/Message_Block.i
+++ b/ace/Message_Block.i
@@ -175,6 +175,35 @@ ACE_Message_Block::msg_priority (u_long pri)
this->priority_ = pri;
}
+ACE_INLINE const ACE_Time_Value &
+ACE_Message_Block::msg_execution_time (void) const
+{
+ ACE_TRACE ("ACE_Message_Block::msg_execution_time (void)");
+ return this->execution_time_;
+}
+
+
+ACE_INLINE void
+ACE_Message_Block::msg_execution_time (const ACE_Time_Value & et)
+{
+ ACE_TRACE ("ACE_Message_Block::msg_execution_time (const ACE_Time_Value & et)");
+ this->execution_time_ = et;
+}
+
+ACE_INLINE const ACE_Time_Value &
+ACE_Message_Block::msg_deadline_time (void) const
+{
+ ACE_TRACE ("ACE_Message_Block::msg_deadline_time (void)");
+ return this->deadline_time_;
+}
+
+ACE_INLINE void
+ACE_Message_Block::msg_deadline_time (const ACE_Time_Value & dt)
+{
+ ACE_TRACE ("ACE_Message_Block::msg_deadline_time (const ACE_Time_Value & et)");
+ this->deadline_time_ = dt;
+}
+
ACE_INLINE char *
ACE_Message_Block::base (void) const
{
diff --git a/ace/Message_Queue.cpp b/ace/Message_Queue.cpp
index 4d9c46d53b1..2fb8e432421 100644
--- a/ace/Message_Queue.cpp
+++ b/ace/Message_Queue.cpp
@@ -12,6 +12,12 @@
ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue)
+ACE_ALLOC_HOOK_DEFINE(ACE_Dynamic_Message_Queue)
+
+//////////////////////////////////////
+// class ACE_Message_Queue_Iterator //
+//////////////////////////////////////
+
template <ACE_SYNCH_DECL>
ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
: queue_ (q),
@@ -58,6 +64,10 @@ ACE_Message_Queue_Iterator<ACE_SYNCH_USE>::dump (void) const
ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Iterator)
+//////////////////////////////////////////////
+// class ACE_Message_Queue_Reverse_Iterator //
+//////////////////////////////////////////////
+
template <ACE_SYNCH_DECL>
ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::ACE_Message_Queue_Reverse_Iterator (ACE_Message_Queue <ACE_SYNCH_USE> &q)
: queue_ (q),
@@ -102,6 +112,10 @@ ACE_Message_Queue_Reverse_Iterator<ACE_SYNCH_USE>::dump (void) const
{
}
+/////////////////////////////
+// class ACE_Message_Queue //
+/////////////////////////////
+
template <ACE_SYNCH_DECL> void
ACE_Message_Queue<ACE_SYNCH_USE>::dump (void) const
{
@@ -377,34 +391,34 @@ ACE_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
// We start looking from the highest priority to the lowest
// priority.
- for (temp = this->tail_;
+ for (temp = this->head_;
temp != 0;
- temp = temp->prev ())
- if (temp->msg_priority () >= new_item->msg_priority ())
- // Break out when we've located an item that has higher
- // priority that <new_item>.
- break;
+ temp = temp->next ())
+ if (temp->msg_priority () < new_item->msg_priority ())
+ // Break out when we've located an item that has lower
+ // priority that <new_item>.
+ break;
if (temp == 0)
- // Check for simple case of inserting at the head of the queue,
- // where all we need to do is insert <new_item> before the
- // current head.
- return this->enqueue_head_i (new_item);
- else if (temp->next () == 0)
- // Check for simple case of inserting at the end of the
- // queue, where all we need to do is insert <new_item> after
- // the current tail.
+ // Check for simple case of inserting at the tail of the queue,
+ // where all we need to do is insert <new_item> after the
+ // current tail.
return this->enqueue_tail_i (new_item);
+ else if (temp->prev () == 0)
+ // Check for simple case of inserting at the head of the
+ // queue, where all we need to do is insert <new_item> before
+ // the current head.
+ return this->enqueue_head_i (new_item);
else
{
- // Insert the message right before the item of equal or
- // higher priority. This ensures that FIFO order is
+ // Insert the new message ahead of the item of
+ // lesser priority. This ensures that FIFO order is
// maintained when messages of the same priority are
// inserted consecutively.
- new_item->prev (temp);
- new_item->next (temp->next ());
- temp->next ()->prev (new_item);
- temp->next (new_item);
+ new_item->next (temp);
+ new_item->prev (temp->prev ());
+ temp->prev ()->next (new_item);
+ temp->prev (new_item);
}
}
@@ -706,4 +720,480 @@ ACE_Message_Queue<ACE_SYNCH_USE>::notify (void)
return this->notification_strategy_->notify ();
}
+////////////////////////////////////////
+// class ACE_Dynamic_Message_Strategy //
+////////////////////////////////////////
+
+ACE_Dynamic_Message_Strategy::ACE_Dynamic_Message_Strategy (u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+ : static_bit_field_mask_ (static_bit_field_mask)
+ , static_bit_field_shift_ (static_bit_field_shift)
+ , pending_threshold_ (pending_threshold)
+ , dynamic_priority_max_ (dynamic_priority_max)
+ , dynamic_priority_offset_ (dynamic_priority_offset)
+{
+}
+// ctor
+
+ACE_Dynamic_Message_Strategy::~ACE_Dynamic_Message_Strategy ()
+{
+}
+// dtor
+
+///////////////////////////////////////
+// class ACE_Deadline_Message_Strategy //
+///////////////////////////////////////
+
+ACE_Deadline_Message_Strategy:: ACE_Deadline_Message_Strategy (u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+ : ACE_Dynamic_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset)
+{
+}
+// ctor
+
+ACE_Deadline_Message_Strategy::~ACE_Deadline_Message_Strategy ()
+{
+}
+// dtor
+
+int
+ACE_Deadline_Message_Strategy::update_priority (ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ // The general formula for this deadline based dynamic priority
+ // function is to just subtract the current time and the execution
+ // time from the from the message deadline to get the time to deadline,
+ // then subtract the time to deadline from a constant C that depends on
+ // whether the time to deadline is negative (C is zero) or non-negative
+ // (C is the maximum allowed priority). But, to save operations for
+ // performance we use an optimized (albeit confusing: our apologies ;-)
+ // formula for the dynamic priority calculation.
+
+ // first, compute the *negative* (additive inverse) of the time to deadline
+ ACE_Time_Value priority (tv);
+ priority -= mb.msg_deadline_time ();
+
+ if (priority >= ACE_Time_Value::zero)
+ {
+ // if negative time to deadline is positive then the message is late:
+ // need to make sure the priority stays below the threshold
+ // between pending and late priority values
+ ACE_Time_Value
+ max_late (0, dynamic_priority_offset_ - 1);
+
+ if (priority > max_late)
+ {
+ priority = max_late;
+ }
+ }
+ else
+ {
+ // if negative time to deadline is negative then the message is pending:
+ // so, we need to shift priority upward by adding the maximum priority
+ // value and then make sure the value stays above the threshold between
+ // pending and late message priorities.
+ priority +=
+ ACE_Time_Value (0, dynamic_priority_max_);
+
+ ACE_Time_Value
+ min_pending (0, dynamic_priority_offset_);
+
+ if (priority < min_pending)
+ {
+ priority = min_pending;
+ }
+ }
+
+ // use (fast) bitwise operators to isolate and replace
+ // the dynamic portion of the message's priority
+ mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) |
+ ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) <<
+ static_bit_field_shift_));
+
+ return 0;
+}
+ // priority evaluation function based on time to deadline
+
+int
+ACE_Deadline_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ // first, compute the *negative* time to deadline
+ ACE_Time_Value priority (tv);
+ priority -= mb.msg_deadline_time ();
+
+ // construct a time value with the maximum late value that
+ // can be represented in the dynamic priority range
+ ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1);
+
+ // if negative time to deadline is greater than the maximum value
+ // that can be represented, it is identified as being beyond late
+ return (priority > max_late) ? 1 : 0;
+}
+ // returns true if the message is later than can can be represented
+
+///////////////////////////////////////
+// class ACE_Laxity_Message_Strategy //
+///////////////////////////////////////
+
+ACE_Laxity_Message_Strategy::ACE_Laxity_Message_Strategy (u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+ : ACE_Dynamic_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset)
+{
+}
+// ctor
+
+ACE_Laxity_Message_Strategy::~ACE_Laxity_Message_Strategy ()
+{
+}
+// dtor
+
+
+int
+ACE_Laxity_Message_Strategy::update_priority (ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ // The general formula for this laxity based dynamic priority
+ // function is to just subtract the current time and the execution
+ // time from the from the message deadline to get the laxity,
+ // then subtract the laxity from a constant C that depends on whether
+ // the laxity is negative (C is zero) or non-negative (C is the maximum
+ // allowed priority). But, to save operations for performance we use
+ // an optimized (albeit confusing: our apologies ;-) formula
+ // for the dynamic priority calculation.
+
+ // first, compute the *negative* laxity
+ ACE_Time_Value priority (tv);
+ priority += mb.msg_execution_time ();
+ priority -= mb.msg_deadline_time ();
+
+ if (priority >= ACE_Time_Value::zero)
+ {
+ // if negative laxity is positive then the message is late:
+ // need to make sure the priority stays below the threshold
+ // between pending and late priority values
+ ACE_Time_Value
+ max_late (0, dynamic_priority_offset_ - 1);
+
+ if (priority > max_late)
+ {
+ priority = max_late;
+ }
+ }
+ else
+ {
+ // if negative laxity is negative then the message is pending: so, we
+ // need to shift priority upward by adding the maximum priority value
+ // and then make sure the value stays above the threshold between
+ // pending and late message priorities.
+ priority +=
+ ACE_Time_Value (0, dynamic_priority_max_);
+
+ ACE_Time_Value
+ min_pending (0, dynamic_priority_offset_);
+
+ if (priority < min_pending)
+ {
+ priority = min_pending;
+ }
+ }
+
+ // use (fast) bitwise operators to isolate and replace
+ // the dynamic portion of the message's priority
+ mb.msg_priority((mb.msg_priority() & static_bit_field_mask_) |
+ ((priority.usec () + ACE_ONE_SECOND_IN_USECS * priority.sec ()) <<
+ static_bit_field_shift_));
+
+ return 0;
+}
+ // priority evaluation function based on laxity
+
+int
+ACE_Laxity_Message_Strategy::is_beyond_late (const ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ // first, compute the *negative* laxity
+ ACE_Time_Value priority (tv);
+ priority += mb.msg_execution_time ();
+ priority -= mb.msg_deadline_time ();
+
+ // construct a time value with the maximum late value that
+ // can be represented in the dynamic priority range
+ ACE_Time_Value max_late (0, dynamic_priority_offset_ - 1);
+
+ // if negative laxity is greater than the maximum value that
+ // can be represented, it is identified as being beyond late
+ return (priority > max_late) ? 1 : 0;
+}
+ // returns true if the message is later than can can be represented
+
+
+/////////////////////////////////////
+// class ACE_Dynamic_Message_Queue //
+/////////////////////////////////////
+
+ // = Initialization and termination methods.
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::ACE_Dynamic_Message_Queue (
+ ACE_Dynamic_Message_Strategy & message_strategy,
+ size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
+ : ACE_Message_Queue (hwm, lwm, ns)
+ , message_strategy_ (message_strategy)
+{
+ // note, the ACE_Dynamic_Message_Queue assumes full responsibility for the
+ // passed ACE_Dynamic_Message_Strategy object, and deletes it in its own dtor
+}
+
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::~ACE_Dynamic_Message_Queue (void)
+{
+ delete &message_strategy_;
+}
+// dtor: free message strategy and let base class dtor do the rest
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i (ACE_Message_Block *new_item)
+{
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::enqueue_i");
+
+ int result = 0;
+
+ // refresh dynamic priority of the new message
+ result = (*priority_eval_func_ptr_) (*new_item, tv);
+
+ // get the current time
+ ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+ // refresh dynamic priorities of messages in the queue
+ this->refresh_priorities (current_time);
+
+ // reorganize the queue according to the new priorities
+ this->refresh_queue (current_time);
+
+ // if there is only one message in the pending list,
+ // the pending list will be empty after a *successful*
+ // dequeue operation
+ int empty_pending = (head_ == pending_list_tail_) ? 1 : 0;
+
+ // invoke the base class method
+ result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item);
+
+ // null out the pending list tail pointer if
+ // the pending list is now empty
+ if ((empty_pending) && (result > 0))
+ {
+ pending_list_tail_ = 0;
+ }
+
+ return result;
+}
+ // Enqueue an <ACE_Message_Block *> in accordance with its priority.
+ // priority may be *dynamic* or *static* or a combination or *both*
+ // It calls the priority evaluation function passed into the Dynamic
+ // Message Queue constructor to update the priorities of all enqueued
+ // messages.
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (ACE_Message_Block *&first_item)
+{
+ ACE_TRACE ("ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i");
+
+ int result = 0;
+
+ // get the current time
+ ACE_Time_Value current_time = ACE_OS::gettimeofday ();
+
+ // refresh dynamic priorities of messages in the queue
+ result = this->refresh_priorities (current_time);
+ if (result < 0)
+ {
+ return result;
+ }
+
+ // reorganize the queue according to the new priorities
+ result = this->refresh_queue (current_time);
+ if (result < 0)
+ {
+ return result;
+ }
+
+ // if there is only one message in the pending list,
+ // the pending list will be empty after a *successful*
+ // dequeue operation
+ int empty_pending = (head_ == pending_list_tail_) ? 1 : 0;
+
+ // invoke the base class method
+ result = ACE_Message_Queue<ACE_SYNCH_USE>::dequeue_head_i (first_item);
+
+ // null out the pending list tail pointer if
+ // the pending list is now empty
+ if ((empty_pending) && (result > 0))
+ {
+ pending_list_tail_ = 0;
+ }
+
+ return result;
+}
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue.
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_priorities (const ACE_Time_Value & tv)
+{
+ int result = 0;
+
+ // apply the priority update function to all enqueued
+ // messages, starting at the head of the queue
+ ACE_Message_Block *temp = head_;
+ while (temp)
+ {
+ result = (*priority_eval_func_ptr_) (*temp, tv);
+ if (result < 0)
+ {
+ break;
+ }
+
+ temp = temp->next ();
+ }
+
+ return result;
+}
+ // refresh the priorities in the queue according
+ // to a specific priority assignment function
+
+template <ACE_SYNCH_DECL> int
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE>::refresh_queue (const ACE_Time_Value & tv)
+{
+ // first, drop any messages from the queue and delete them:
+ // reference counting at the data block level means that the
+ // underlying data block will not be deleted if another
+ // message block is still pointing to it.
+ ACE_Message_Block *temp = (pending_list_tail_)
+ ? pending_list_tail_->next ();
+ : head_;
+
+ while (temp)
+ {
+ // messages that have overflowed the given time bounds must be removed
+ if (message_strategy_.is_beyond_late (*temp, tv))
+ {
+ // find the end of the chain of overflowed messages
+ ACE_Message_Block *remove_tail = temp;
+ while ((remove_tail) && (remove_tail->next ()) &&
+ message_strategy_.is_beyond_late (*(remove_tail->next ()), tv))
+ {
+ remove_tail = remove_tail->next ();
+ }
+
+ temp = remove_tail->next ();
+ if (remove_temp->next ())
+ {
+ remove_temp->next ()->prev (0);
+ }
+ else if (remove_temp->prev ())
+ {
+ remove_temp->prev ()->next (0);
+ }
+ else
+ {
+ head_ = 0;
+ tail_ = 0;
+ }
+ remove_temp->prev (0);
+ remove_temp->next (0);
+
+ temp = remove_temp->next ();
+
+ }
+ else
+ {
+ temp = temp->next ();
+ }
+ }
+}
+ // refresh the order of messages in the queue
+ // after refreshing their priorities
+
+/////////////////////////////////////
+// class ACE_Message_Queue_Factory //
+/////////////////////////////////////
+
+template <ACE_SYNCH_DECL>
+ACE_Message_Queue<ACE_SYNCH_USE> *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_static_message_queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns)
+{
+ return new ACE_Message_Queue<ACE_SYNCH_USE> (hwm, lwm, ns);
+}
+
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_deadline_message_queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns,
+ u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+{
+ ACE_Deadline_Message_Strategy *adms;
+
+ ACE_NEW_RETURN (adms,
+ ACE_Deadline_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset),
+ 0);
+
+ return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*adms, hwm, lwm, ns);
+}
+
+template <ACE_SYNCH_DECL>
+ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ACE_Message_Queue_Factory<ACE_SYNCH_USE>::create_laxity_message_queue (size_t hwm,
+ size_t lwm,
+ ACE_Notification_Strategy *ns,
+ u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset)
+{
+ ACE_Laxity_Message_Strategy *alms;
+
+ ACE_NEW_RETURN (alms,
+ ACE_Laxity_Message_Strategy (static_bit_field_mask,
+ static_bit_field_shift,
+ pending_threshold,
+ dynamic_priority_max,
+ dynamic_priority_offset),
+ 0);
+
+
+ return new ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> (*alms, hwm, lwm, ns);
+}
+
+
#endif /* ACE_MESSAGE_QUEUE_C */
diff --git a/ace/Message_Queue.h b/ace/Message_Queue.h
index 0dc14ea4776..91873b8964b 100644
--- a/ace/Message_Queue.h
+++ b/ace/Message_Queue.h
@@ -363,6 +363,275 @@ private:
// Keeps track of how far we've advanced...
};
+class ACE_Export ACE_Dynamic_Message_Strategy
+{
+ // = TITLE
+ // An abstract base class which provides dynamic priority evaluation
+ // methods for use by the ACE_Dynamic_Message_Queue class
+ // or any other class which needs to manage the priorities
+ // of a collection of ACE_Message_Blocks dynamically
+ //
+ // = DESCRIPTION
+ // Methods for deadline and laxity based priority evaluation
+ // are provided. These methods assume a specific partitioning
+ // of the message priority number into a higher order dynamic
+ // bit field and a lower order static priority bit field. The
+ // default partitioning assumes an unsigned dynamic message
+ // priority field of 22 bits and an unsigned static message
+ // priority field of 10 bits. This corresponds to the initial
+ // values of the static class members. To provide a different
+ // partitioning, assign a different set of values to the static
+ // class memebers before using the static member functions.
+public:
+
+ ACE_Dynamic_Message_Strategy (u_long static_bit_field_mask,
+ u_long static_bit_field_shift,
+ u_long pending_threshold,
+ u_long dynamic_priority_max,
+ u_long dynamic_priority_offset);
+ // ctor
+
+ virtual ~ACE_Dynamic_Message_Strategy ();
+ // virtual dtor
+
+ virtual int update_priority (ACE_Message_Block & mb,
+ const ACE_Time_Value & tv) = 0;
+ // abstract dynamic priority evaluation function:
+ // updates the synamic priority bit field but does not
+ // alter the static priority bit field
+
+ int is_pending (const ACE_Message_Block & mb,
+ const ACE_Time_Value & tv);
+ // returns true if the message has a pending (not late) priority value
+
+ virtual int is_beyond_late (const ACE_Message_Block & mb,
+ const ACE_Time_Value & tv) = 0;
+ // returns true if the message is later than can can be represented
+
+ u_long static_bit_field_mask (void);
+ // get static bit field mask
+
+ void static_bit_field_mask (u_long);
+ // set static bit field mask
+
+ u_long static_bit_field_shift (void);
+ // get left shift value to make room for static bit field
+
+ void static_bit_field_shift (u_long);
+ // set left shift value to make room for static bit field
+
+ u_long pending_threshold (void);
+ // get pending threshold priority value
+
+ void pending_threshold (u_long);
+ // set pending threshold priority value
+
+ u_long dynamic_priority_max (void);
+ // get maximum supported priority value
+
+ void dynamic_priority_max (u_long);
+ // set maximum supported priority value
+
+ u_long dynamic_priority_offset (void);
+ // get axis shift to map signed range into unsigned range
+
+ void dynamic_priority_offset (u_long);
+ // set axis shift to map signed range into unsigned range
+
+protected:
+
+ u_long static_bit_field_mask_;
+ // this is a bit mask with all ones in the static bit field
+
+ u_long static_bit_field_shift_;
+ // this is a left shift value to make room for static bit
+ // field: this value should be the logarithm base 2 of
+ // (static_bit_field_mask_ + 1)
+
+ u_long pending_threshold_;
+ // threshold priority value below which a message is considered late
+
+ u_long dynamic_priority_max_;
+ // maximum supported priority value
+
+ u_long dynamic_priority_offset_;
+ // axis shift added to all values, in order to map signed
+ // range into unsigned range (priority is an unsigned value).
+};
+
+class ACE_Export ACE_Deadline_Message_Strategy : public ACE_Dynamic_Message_Strategy
+{
+public:
+
+ ACE_Deadline_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
+ u_long static_bit_field_shift = 10, // 10 low order bits
+ u_long pending_threshold = 0x200000UL, // 2^(22-1)
+ u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
+ u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
+ // ctor, with all arguments defaulted
+
+ virtual ~ACE_Deadline_Message_Strategy ();
+ // virtual dtor
+
+ virtual int update_priority (ACE_Message_Block & mb,
+ const ACE_Time_Value & tv);
+ // dynamic priority evaluation function based on time to
+ // deadline: updates the synamic priority bit field but
+ // does not alter the static priority bit field
+
+ int is_beyond_late (const ACE_Message_Block & mb,
+ const ACE_Time_Value & tv);
+ // returns true if the message is later than can can be represented
+};
+
+class ACE_Export ACE_Laxity_Message_Strategy : public ACE_Dynamic_Message_Strategy
+{
+public:
+
+ ACE_Laxity_Message_Strategy (u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
+ u_long static_bit_field_shift = 10, // 10 low order bits
+ u_long pending_threshold = 0x200000UL, // 2^(22-1)
+ u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
+ u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
+ // ctor, with all arguments defaulted
+
+ virtual ~ACE_Laxity_Message_Strategy ();
+ // virtual dtor
+
+ virtual int update_priority (ACE_Message_Block & mb,
+ const ACE_Time_Value & tv);
+ // dynamic priority evaluation function based on time to
+ // deadline: updates the dynamic priority bit field but
+ // does not alter the static priority bit field
+
+ int is_beyond_late (const ACE_Message_Block & mb,
+ const ACE_Time_Value & tv);
+ // returns true if the message is later than can can be represented
+};
+
+
+template <ACE_SYNCH_DECL>
+class ACE_Dynamic_Message_Queue : public ACE_Message_Queue<ACE_SYNCH_USE>
+{
+ // = TITLE
+ // A derived class which adapts the ACE_Message_Queue
+ // class in order to maintain dynamic priorities for enqueued
+ // ACE_Message_Blocks and manage the queue dynamically.
+ //
+ // = DESCRIPTION
+ // Priorities and queue orderings are refreshed at each enqueue and
+ // dequeue operation. Head and tail enqueue methods were made private
+ // to prevent out-of-order messages from confusing the pending
+ // and late portions of the queue. Messages in the pending portion of
+ // the queue whose dynamic priority becomes negative are placed into
+ // the late portion of the queue. Messages in the late portion of
+ // the queue whose dynamic priority becomes positive are dropped.
+ // These behaviors support a limited schedule overrun corresponding
+ // to one full cycle through dynamic priority values. These behaviors
+ // can be modified in derived classes by providing alternative
+ // definitions for the appropriate virtual methods.
+ //
+public:
+
+ // = Initialization and termination methods.
+ ACE_Dynamic_Message_Queue (ACE_Dynamic_Message_Strategy & message_strategy,
+ size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0);
+
+ virtual ~ACE_Dynamic_Message_Queue (void);
+ // Close down the message queue and release all resources.
+
+ ACE_ALLOC_HOOK_DECLARE;
+ // Declare the dynamic allocation hooks.
+
+protected:
+
+ virtual int enqueue_i (ACE_Message_Block *new_item);
+ // Enqueue an <ACE_Message_Block *> in accordance with its priority.
+ // priority may be *dynamic* or *static* or a combination or *both*
+ // It calls the priority evaluation function passed into the Dynamic
+ // Message Queue constructor to update the priorities of all enqueued
+ // messages.
+
+ virtual int dequeue_head_i (ACE_Message_Block *&first_item);
+ // Dequeue and return the <ACE_Message_Block *> at the head of the
+ // queue.
+
+ virtual int refresh_priorities (const ACE_Time_Value & tv);
+ // refresh the priorities in the queue according
+ // to a specific priority assignment function
+
+ virtual int refresh_queue (const ACE_Time_Value & tv);
+ // refresh the order of messages in the queue
+ // after refreshing their priorities
+
+ ACE_Message_Block *pending_list_tail_;
+ // Pointer to tail of the pending messages (those whose priority is
+ // and has been non-negative) in the ACE_Message_Block list.
+
+ ACE_Dynamic_Message_Strategy & message_strategy_;
+ // Pointer to a dynamic priority evaluation function
+
+private:
+
+ // = Disallow these operations.
+ ACE_UNIMPLEMENTED_FUNC (void operator= (const ACE_Message_Queue<ACE_SYNCH_USE> &))
+ ACE_UNIMPLEMENTED_FUNC (ACE_Message_Queue (const ACE_Message_Queue<ACE_SYNCH_USE> &))
+
+ // External invocation of these could cause extreme wierdness
+ // in a dynamically prioritized queue: disallow their use until
+ // and unless a coherent semantics for head and tail enqueueing
+ // can be identified.
+ ACE_UNIMPLEMENTED_FUNC (virtual int
+ enqueue_tail (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0))
+ ACE_UNIMPLEMENTED_FUNC (virtual int
+ enqueue_head (ACE_Message_Block *new_item,
+ ACE_Time_Value *timeout = 0))
+
+ // As messages are *dynamically* prioritized, it is not possible to
+ // guarantee that the message at the head of the queue when this
+ // method is called will still be at the head when the next method
+ // is called: disallow its use until and unless a coherent semantics
+ // for peeking at the head of the queue can be identified.
+ ACE_UNIMPLEMENTED_FUNC (virtual int
+ peek_dequeue_head (ACE_Message_Block *&first_item,
+ ACE_Time_Value *tv = 0))
+};
+
+
+template <ACE_SYNCH_DECL>
+class ACE_Export ACE_Message_Queue_Factory
+{
+public:
+
+ static ACE_Message_Queue<ACE_SYNCH_USE> *
+ create_static_message_queue (size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0);
+
+ static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ create_deadline_message_queue (size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0,
+ u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
+ u_long static_bit_field_shift = 10, // 10 low order bits
+ u_long pending_threshold = 0x200000UL, // 2^(22-1)
+ u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
+ u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
+
+ static ACE_Dynamic_Message_Queue<ACE_SYNCH_USE> *
+ create_laxity_message_queue (size_t hwm = DEFAULT_HWM,
+ size_t lwm = DEFAULT_LWM,
+ ACE_Notification_Strategy * = 0,
+ u_long static_bit_field_mask = 0x3FFUL, // 2^(10) - 1
+ u_long static_bit_field_shift = 10, // 10 low order bits
+ u_long pending_threshold = 0x200000UL, // 2^(22-1)
+ u_long dynamic_priority_max = 0x3FFFFFUL, // 2^(22)-1
+ u_long dynamic_priority_offset = 0x200000UL); // 2^(22-1)
+};
+
// This must go here to avoid problems with circular includes.
#include "ace/Strategies.h"
diff --git a/ace/Message_Queue.i b/ace/Message_Queue.i
index 6697eb242a4..804e8ea6e21 100644
--- a/ace/Message_Queue.i
+++ b/ace/Message_Queue.i
@@ -145,4 +145,82 @@ ACE_Message_Queue<ACE_SYNCH_USE>::deactivated (void)
ACE_ALLOC_HOOK_DEFINE(ACE_Message_Queue_Reverse_Iterator)
+ACE_INLINE int
+ACE_Dynamic_Message_Strategy::is_pending (const ACE_Message_Block & mb,
+ const ACE_Time_Value & tv)
+{
+ return ((mb.msg_priority () < pending_threshold_) ||
+ this->is_beyond_late (mb, tv))
+ ? 0 : 1;
+}
+ // returns true if the message has a pending (not late) priority value
+
+ACE_INLINE u_long
+ACE_Dynamic_Message_Strategy::static_bit_field_mask (void)
+{
+ return static_bit_field_mask_;
+}
+ // get static bit field mask
+
+ACE_INLINE void
+ACE_Dynamic_Message_Strategy::static_bit_field_mask (u_long ul)
+{
+ static_bit_field_mask_ = ul;
+}
+ // set static bit field mask
+ACE_INLINE u_long
+ACE_Dynamic_Message_Strategy::static_bit_field_shift (void)
+{
+ return static_bit_field_shift_;
+}
+ // get left shift value to make room for static bit field
+
+ACE_INLINE void
+ACE_Dynamic_Message_Strategy::static_bit_field_shift (u_long ul)
+{
+ static_bit_field_shift_ = ul;
+}
+ // set left shift value to make room for static bit field
+
+ACE_INLINE u_long
+ACE_Dynamic_Message_Strategy::pending_threshold (void)
+{
+ return pending_threshold_;
+}
+ // get pending threshold priority value
+
+ACE_INLINE void
+ACE_Dynamic_Message_Strategy::pending_threshold (u_long ul)
+{
+ pending_threshold_ = ul;
+}
+ // set pending threshold priority value
+
+ACE_INLINE u_long
+ACE_Dynamic_Message_Strategy::dynamic_priority_max (void)
+{
+ return dynamic_priority_max_;
+}
+ // get maximum supported priority value
+
+ACE_INLINE void
+ACE_Dynamic_Message_Strategy::dynamic_priority_max (u_long ul)
+{
+ dynamic_priority_max_ = ul;
+}
+ // set maximum supported priority value
+
+ACE_INLINE u_long
+ACE_Dynamic_Message_Strategy::dynamic_priority_offset (void)
+{
+ return dynamic_priority_offset_;
+}
+ // get axis shift to map signed range into unsigned range
+
+ACE_INLINE void
+ACE_Dynamic_Message_Strategy::dynamic_priority_offset (u_long ul)
+{
+ dynamic_priority_offset_ = ul;
+}
+ // set axis shift to map signed range into unsigned range