summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-12-02 00:08:12 +0000
committerthrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-12-02 00:08:12 +0000
commitaf3f97ac4930f05f6ba3d562988e030dd144ecd3 (patch)
treeac20ffcfce424cfe27ce7c8c644d4e15eca2efbc
parentd3a98f5c49315692c10fa4767dc99fbf930cf126 (diff)
downloadATCD-af3f97ac4930f05f6ba3d562988e030dd144ecd3.tar.gz
Added support for Release Guard algorithm, which is enacted if KOKYU_HAS_RELEASE_GUARD is defined at compile-time.
-rw-r--r--Kokyu/Dispatch_Deferrer.cpp98
-rw-r--r--Kokyu/Dispatch_Deferrer.h94
-rw-r--r--Kokyu/Dispatch_Deferrer.i29
-rw-r--r--Kokyu/Dispatcher_Task.cpp59
-rw-r--r--Kokyu/Dispatcher_Task.h28
-rw-r--r--Kokyu/Dispatcher_Task.i33
-rw-r--r--Kokyu/Kokyu.mpc1
-rw-r--r--Kokyu/Kokyu_defs.h16
-rw-r--r--Kokyu/Kokyu_defs.i11
9 files changed, 364 insertions, 5 deletions
diff --git a/Kokyu/Dispatch_Deferrer.cpp b/Kokyu/Dispatch_Deferrer.cpp
new file mode 100644
index 00000000000..a959f932fe0
--- /dev/null
+++ b/Kokyu/Dispatch_Deferrer.cpp
@@ -0,0 +1,98 @@
+// $Id$
+
+#include "ace/Sched_Params.h"
+#include "Dispatch_Deferrer.h"
+#include "Dispatcher_Task.h"
+
+#if ! defined (__ACE_INLINE__)
+#include "Dispatch_Deferrer.i"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Kokyu, Dispatch_Deferrer, "$Id$")
+
+namespace Kokyu
+{
+
+int
+Dispatch_Deferrer::init(const Dispatch_Deferrer_Attributes& attr)
+{
+ //set up reactor for timeouts
+ this->react_.open(0);
+ //Don't need any handles, since we're only using it for timeouts
+
+ this->timers_.open();
+
+ this->task_ = attr.task_;
+
+ return 0;
+}
+
+int
+Dispatch_Deferrer::dispatch (Dispatch_Queue_Item *qitem)
+{
+ ACE_ASSERT(qitem != 0);
+
+ //setup timout
+ //For now, assume period = deadline
+ ACE_Time_Value tv;
+ tv = ACE_OS::gettimeofday() + qitem->qos_info().deadline_;
+ long timer_id = this->react_.schedule_timer(this,
+ 0, //NULL arg
+ tv);
+ if (timer_id < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("EC (%P|%t) cannot schedule Release Guard timer.")
+ ACE_TEXT ("ACE_Reactor.schedule_timer() returned -1\n")),
+ -1);
+ }
+ //else valid timer_id
+ this->timers_.bind(qitem,timer_id);
+
+ //buffer until timer expires
+ return this->rgq_.enqueue_deadline(qitem,&tv);
+}
+
+
+int
+Dispatch_Deferrer::handle_timeout (const ACE_Time_Value &,
+ const void *)
+{
+ //get all expired Dispatch_Queue_Items
+ ACE_Message_Block *begin,*end;
+ this->rgq_.remove_messages(begin,end,
+ (u_int) (ACE_Dynamic_Message_Strategy::LATE
+ | ACE_Dynamic_Message_Strategy::BEYOND_LATE));
+
+ //dispatch them back to Dispatcher_Impl
+ while (begin <= end)
+ {
+ Dispatch_Queue_Item *qitem =
+ ACE_dynamic_cast(Dispatch_Queue_Item*, begin);
+
+ if (qitem == 0)
+ {
+ ACE_Message_Block::release (begin);
+ continue;
+ }
+
+
+ //remove timer for each enqueued qitem from reactor
+ long timer_id;
+ if (this->timers_.find(qitem,timer_id) < 0)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Could not cancel Release Guard timer.")
+ ACE_TEXT ("Unknown timer ID\n")),
+ -1);
+ }
+ //else got timer_id
+ this->react_.cancel_timer(timer_id);
+
+ this->task_->enqueue(qitem);
+
+ ++begin;
+ }
+}
+
+} //namespace Kokyu
diff --git a/Kokyu/Dispatch_Deferrer.h b/Kokyu/Dispatch_Deferrer.h
new file mode 100644
index 00000000000..c7ae8cc6cc1
--- /dev/null
+++ b/Kokyu/Dispatch_Deferrer.h
@@ -0,0 +1,94 @@
+/* -*- C++ -*- */
+/**
+ * @file Dispatch_Deferrer.h
+ *
+ * $Id$
+ *
+ * @author Bryan Thrall (thrall@cse.wustl.edu)
+ *
+ */
+
+#ifndef DISPATCH_DEFERRER_H
+#define DISPATCH_DEFERRER_H
+#include /**/ "ace/pre.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "kokyu_export.h"
+#include "Kokyu_defs.h"
+#include "ace/Event_Handler.h"
+#include "ace/Thread_Mutex.h"
+#include "ace/Message_Block.h"
+#include "ace/Message_Queue.h"
+#include "ace/Reactor.h"
+#include "ace/Map.h"
+
+namespace Kokyu
+{
+
+class Dispatch_Task; //forward decl
+class Dispatch_Queue_Item; //forward decl
+
+/**
+ * @class Dispatch_Deferrer
+ *
+ * @brief Part of the Release Guard protocol. When a Dispatch_Command
+ * needs to be dispatched later rather than when dispatch_i() is
+ * called on the Default_Dispatcher_Impl, it is passed to this
+ * object. When the appropriate time to dispatch the Dispatch_Command
+ * comes (last release time + period), this object calls enqueue() on
+ * the Dispatcher_Task.
+ */
+class Dispatch_Deferrer : public ACE_Event_Handler
+{
+ public:
+ Dispatch_Deferrer();
+ //Default constructor
+
+ ~Dispatch_Deferrer();
+ //Destructor
+
+ int init(const Dispatch_Deferrer_Attributes& attr);
+
+ int dispatch (Dispatch_Queue_Item *qitem);
+
+ virtual int handle_timeout (const ACE_Time_Value &current_time,
+ const void *act = 0);
+ //TODO: what if need higher resolution timers?
+
+ private:
+ ACE_Deadline_Message_Strategy msg_strat_;
+
+ ///Stores the Dispatch_Commands in earliest-release-time order,
+ ///until they are dispatched. I decided to use an
+ ///ACE_Dynamic_Message_Queue because it supports deadline
+ ///ordering. This decision is also good because we can simply store
+ ///the Dispatch_Queue_Item given to us by the
+ ///Default_Dispatcher_Impl rather than allocate some structure to
+ ///hold the Dispatch_Command and QoSDescriptor.
+ ACE_Dynamic_Message_Queue<ACE_SYNCH> rgq_;
+
+ //Stores timer_ids from the Reactor (longs) using the
+ //Dispatch_Queue_Item the timer is for as the key. Used to
+ //cancel timers if they expire and are enqueued before the
+ //callback happens.
+ typedef ACE_Map_Manager<Dispatch_Queue_Item*,long,ACE_Thread_Mutex> Timer_Map;
+
+ Timer_Map timers_;
+
+ ///Manages timers for the Dispatch_Commands
+ ACE_Reactor react_;
+
+ Dispatcher_Task* task_;
+};
+
+} //namespace Kokyu
+
+#if defined (__ACE_INLINE__)
+#include "Dispatch_Deferrer.i"
+#endif /* __ACE_INLINE__ */
+
+#include /**/ "ace/post.h"
+#endif //DISPATCH_DEFERRER_H
diff --git a/Kokyu/Dispatch_Deferrer.i b/Kokyu/Dispatch_Deferrer.i
new file mode 100644
index 00000000000..315afce5598
--- /dev/null
+++ b/Kokyu/Dispatch_Deferrer.i
@@ -0,0 +1,29 @@
+// $Id$
+
+namespace Kokyu
+{
+
+ACE_INLINE
+Dispatch_Deferrer_Attributes::Dispatch_Deferrer_Attributes()
+{
+}
+
+ACE_INLINE
+Dispatch_Deferrer::Dispatch_Deferrer()
+ : msg_strat_()
+ , rgq_(this->msg_strat_)
+ , timers_()
+ , react_()
+ , task_(0)
+{
+}
+
+ACE_INLINE
+Dispatch_Deferrer::~Dispatch_Deferrer()
+{
+ this->react_.close();
+
+ this->timers_.close();
+}
+
+} //namespace Kokyu
diff --git a/Kokyu/Dispatcher_Task.cpp b/Kokyu/Dispatcher_Task.cpp
index e6d86711865..5fcce5f328e 100644
--- a/Kokyu/Dispatcher_Task.cpp
+++ b/Kokyu/Dispatcher_Task.cpp
@@ -68,6 +68,13 @@ Dispatcher_Task::initialize ()
own_allocator_ = 1;
}
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ this->deferrer_attr_.task_ = this;
+ this->deferrer_.init(this->deferrer_attr_);
+
+ this->releases_.open();
+#endif //KOKYU_HAS_RELEASE_GUARD
+
return 0;
}
@@ -138,6 +145,13 @@ int
Dispatcher_Task::enqueue (const Dispatch_Command* cmd,
const QoSDescriptor& qos_info)
{
+ //Subclasses which override this function should now be overriding
+ //enqueue(ACE_Message_Block*) because that is where the main
+ //behavior is defined. This might invalidate those classes, but the
+ //decision seemed reasonable since it allows Dispatch_Deferrer to
+ //use the same Dispatch_Queue_Item as Dispatcher_Task without any
+ //more memory allocation and it doesn't break the interface.
+
void* buf = this->allocator_->malloc (sizeof (Dispatch_Queue_Item));
if (buf == 0)
@@ -145,12 +159,47 @@ Dispatcher_Task::enqueue (const Dispatch_Command* cmd,
ACE_Message_Block *mb =
new (buf) Dispatch_Queue_Item (cmd,
- qos_info,
- &(this->data_block_),
- ACE_Message_Block::DONT_DELETE,
- this->allocator_);
+ qos_info,
+ &(this->data_block_),
+ ACE_Message_Block::DONT_DELETE,
+ this->allocator_);
+
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ //if current release time < last release time + period then defer dispatch
+ ACE_Time_Value now(ACE_OS::gettimeofday());
+ long rel_msec;
+ if (this->releases_.find(qos_info,rel_msec) < 0)
+ {
+ //new QosDescriptor, so just set last release to zero
+ rel_msec = 0;
+ }
+ ACE_Time_Value release;
+ release.msec(rel_msec);
+ release += qos_info.deadline_;
+ if (now < release)
+ {
+ //defer until last release time + period
+ Dispatch_Queue_Item *qitem =
+ ACE_dynamic_cast(Dispatch_Queue_Item*, mb);
+
+ if (qitem == 0)
+ {
+ ACE_Message_Block::release (mb);
+ continue;
+ }
+
+ this->deferrer_.dispatch(qitem);
+ }
+ else
+ {
+ //release!
+#endif //KOKYU_HAS_RELEASE_GUARD
+
+ this->enqueue (mb);
- this->putq (mb);
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ } //else now >= release
+#endif //KOKYU_HAS_RELEASE_GUARD
return 0;
}
diff --git a/Kokyu/Dispatcher_Task.h b/Kokyu/Dispatcher_Task.h
index 0bb91313f47..cb18cb588c2 100644
--- a/Kokyu/Dispatcher_Task.h
+++ b/Kokyu/Dispatcher_Task.h
@@ -16,6 +16,11 @@
#include "ace/Task.h"
#include "ace/Lock_Adapter_T.h"
+#ifdef KOKYU_HAS_RELEASE_GUARD
+#include "ace/Map.h"
+#include "Dispatch_Deferrer.h"
+#endif //KOKYU_HAS_RELEASE_GUARD
+
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
@@ -42,6 +47,9 @@ public:
Dispatch_Command* command ();
+ //Any reason why this shouldn't be visible?
+ const QoSDescriptor& qos_info() const;
+
private:
void init_i(const QoSDescriptor&);
@@ -71,6 +79,8 @@ public:
int enqueue (const Dispatch_Command* cmd,
const QoSDescriptor& qos_info);
+ int enqueue (Dispatch_Queue_Item *qitem);
+
/// Process the events in the queue.
int svc (void);
@@ -94,6 +104,24 @@ private:
ACE_Deadline_Message_Strategy deadline_msg_strategy_;
ACE_Laxity_Message_Strategy laxity_msg_strategy_;
+
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ //TODO: What's the best way to identify periodic events?
+ //For now, use QoSDescriptor equivalence.
+ //Maps QoSDescriptors to last release time of events for that
+ //QoSDescriptor. Seems kind of wasteful to store the whole
+ //QoSDescriptor as the key, but I don't see any other way since each
+ //Dispatch_Queue_Item has its own instance of the QoSDescriptor.
+ //The release time is stored as a long as if from
+ //ACE_Time_Value.msec().
+ typedef ACE_Map_Manager<QoSDescriptor,long,ACE_SYNCH_NULL_MUTEX> Release_Time_Map;
+
+ Release_Time_Map releases_;
+
+ //For delaying dispatch until required by RG:
+ Dispatch_Deferrer_Attributes deferrer_attr_;
+ Dispatch_Deferrer deferrer_;
+#endif //KOKYU_HAS_RELEASE_GUARD
};
} //end of namespace
diff --git a/Kokyu/Dispatcher_Task.i b/Kokyu/Dispatcher_Task.i
index e663509d86b..c67a2acdaf8 100644
--- a/Kokyu/Dispatcher_Task.i
+++ b/Kokyu/Dispatcher_Task.i
@@ -17,6 +17,11 @@ Dispatcher_Task::Dispatcher_Task (const ConfigInfo& config_info,
config_info.reordering_flags_.static_bit_field_shift_,
config_info.reordering_flags_.dynamic_priority_max_,
config_info.reordering_flags_.dynamic_priority_offset_)
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ , releases_()
+ , deferrer_attr_()
+ , deferrer_()
+#endif //KOKYU_HAS_RELEASE_GUARD
{
this->initialize();
}
@@ -46,6 +51,26 @@ Dispatcher_Task::get_curr_config_info() const
}
ACE_INLINE
+int
+Dispatcher_Task::enqueue (Dispatch_Queue_Item *qitem)
+{
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ //update release time
+ //TODO: want release time before or after enqueuing call?
+ ACE_Time_Value release = ACE_OS::gettimeofday();
+#endif //KOKYU_HAS_RELEASE_GUARD
+
+ this->putq (qitem);
+
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ //if qos_info is not in map, this should add it
+ this->releases_.rebind(qitem->qos_info(),release.msec());
+#endif //KOKYU_HAS_RELEASE_GUARD
+
+ return 0;
+}
+
+ACE_INLINE
Dispatch_Queue_Item::Dispatch_Queue_Item (
const Dispatch_Command* cmd,
const QoSDescriptor& qos_info,
@@ -78,4 +103,12 @@ Dispatch_Queue_Item::command()
{
return const_cast<Dispatch_Command*> (command_);
}
+
+ACE_INLINE
+const QoSDescriptor&
+Dispatch_Queue_Item::qos_info() const
+{
+ return this->qos_info_;
}
+
+} //namespace Kokyu
diff --git a/Kokyu/Kokyu.mpc b/Kokyu/Kokyu.mpc
index 10d856daf66..f5c49bc64b2 100644
--- a/Kokyu/Kokyu.mpc
+++ b/Kokyu/Kokyu.mpc
@@ -3,6 +3,7 @@ project(Kokyu) : acelib {
dynamicflags = KOKYU_BUILD_DLL
Source_Files {
+ Dispatch_Deferrer.cpp
Dispatcher_Impl.cpp
Kokyu.cpp
Default_Dispatcher_Impl.cpp
diff --git a/Kokyu/Kokyu_defs.h b/Kokyu/Kokyu_defs.h
index 469fea15090..d83bd2edbfa 100644
--- a/Kokyu/Kokyu_defs.h
+++ b/Kokyu/Kokyu_defs.h
@@ -181,11 +181,27 @@ namespace Kokyu
DSRT_ConfigInfo ();
};
+
+#ifdef KOKYU_HAS_RELEASE_GUARD
+ class Dispatcher_Task; //forward declaration
+
+ class Dispatch_Deferrer_Attributes
+ {
+ public:
+ Dispatcher_Task* task_;
+
+ Dispatch_Deferrer_Attributes();
+ };
+#endif //KOKYU_HAS_RELEASE_GUARD
+
} //end of namespace
//to satisfy ACE_Array<ConfigInfo>
ACE_INLINE int operator != (const Kokyu::ConfigInfo& lhs, const Kokyu::ConfigInfo& rhs);
+//to satisfy ACE_Map_Manager<QoSDescriptor>
+ACE_INLINE int operator == (const Kokyu::QoSDescriptor& lhs, const Kokyu::QoSDescriptor& rhs);
+
#if defined (__ACE_INLINE__)
#include "Kokyu_defs.i"
#endif /* __ACE_INLINE__ */
diff --git a/Kokyu/Kokyu_defs.i b/Kokyu/Kokyu_defs.i
index 1ee9799d1de..035be9375d6 100644
--- a/Kokyu/Kokyu_defs.i
+++ b/Kokyu/Kokyu_defs.i
@@ -84,3 +84,14 @@ int operator != (const Kokyu::ConfigInfo& lhs,
lhs.thread_priority_ != rhs.thread_priority_ ||
lhs.dispatching_type_ != rhs.dispatching_type_ );
}
+
+//to satisfy ACE_Map_Manager<QoSDescriptor>
+ACE_INLINE
+int operator == (const Kokyu::QoSDescriptor& lhs,
+ const Kokyu::QoSDescriptor& rhs)
+{
+ return (lhs.preemption_priority_ == rhs.preemption_priority_ &&
+ lhs.deadline_ == rhs.deadline_ &&
+ lhs.execution_time_ == rhs.execution_time_ &&
+ lhs.importance_ == rhs.importance_ );
+}