diff options
author | thrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-12-02 00:08:12 +0000 |
---|---|---|
committer | thrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-12-02 00:08:12 +0000 |
commit | af3f97ac4930f05f6ba3d562988e030dd144ecd3 (patch) | |
tree | ac20ffcfce424cfe27ce7c8c644d4e15eca2efbc | |
parent | d3a98f5c49315692c10fa4767dc99fbf930cf126 (diff) | |
download | ATCD-af3f97ac4930f05f6ba3d562988e030dd144ecd3.tar.gz |
Added support for Release Guard algorithm, which is enacted if KOKYU_HAS_RELEASE_GUARD is defined at compile-time.
-rw-r--r-- | Kokyu/Dispatch_Deferrer.cpp | 98 | ||||
-rw-r--r-- | Kokyu/Dispatch_Deferrer.h | 94 | ||||
-rw-r--r-- | Kokyu/Dispatch_Deferrer.i | 29 | ||||
-rw-r--r-- | Kokyu/Dispatcher_Task.cpp | 59 | ||||
-rw-r--r-- | Kokyu/Dispatcher_Task.h | 28 | ||||
-rw-r--r-- | Kokyu/Dispatcher_Task.i | 33 | ||||
-rw-r--r-- | Kokyu/Kokyu.mpc | 1 | ||||
-rw-r--r-- | Kokyu/Kokyu_defs.h | 16 | ||||
-rw-r--r-- | Kokyu/Kokyu_defs.i | 11 |
9 files changed, 364 insertions, 5 deletions
diff --git a/Kokyu/Dispatch_Deferrer.cpp b/Kokyu/Dispatch_Deferrer.cpp new file mode 100644 index 00000000000..a959f932fe0 --- /dev/null +++ b/Kokyu/Dispatch_Deferrer.cpp @@ -0,0 +1,98 @@ +// $Id$ + +#include "ace/Sched_Params.h" +#include "Dispatch_Deferrer.h" +#include "Dispatcher_Task.h" + +#if ! defined (__ACE_INLINE__) +#include "Dispatch_Deferrer.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, Dispatch_Deferrer, "$Id$") + +namespace Kokyu +{ + +int +Dispatch_Deferrer::init(const Dispatch_Deferrer_Attributes& attr) +{ + //set up reactor for timeouts + this->react_.open(0); + //Don't need any handles, since we're only using it for timeouts + + this->timers_.open(); + + this->task_ = attr.task_; + + return 0; +} + +int +Dispatch_Deferrer::dispatch (Dispatch_Queue_Item *qitem) +{ + ACE_ASSERT(qitem != 0); + + //setup timout + //For now, assume period = deadline + ACE_Time_Value tv; + tv = ACE_OS::gettimeofday() + qitem->qos_info().deadline_; + long timer_id = this->react_.schedule_timer(this, + 0, //NULL arg + tv); + if (timer_id < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("EC (%P|%t) cannot schedule Release Guard timer.") + ACE_TEXT ("ACE_Reactor.schedule_timer() returned -1\n")), + -1); + } + //else valid timer_id + this->timers_.bind(qitem,timer_id); + + //buffer until timer expires + return this->rgq_.enqueue_deadline(qitem,&tv); +} + + +int +Dispatch_Deferrer::handle_timeout (const ACE_Time_Value &, + const void *) +{ + //get all expired Dispatch_Queue_Items + ACE_Message_Block *begin,*end; + this->rgq_.remove_messages(begin,end, + (u_int) (ACE_Dynamic_Message_Strategy::LATE + | ACE_Dynamic_Message_Strategy::BEYOND_LATE)); + + //dispatch them back to Dispatcher_Impl + while (begin <= end) + { + Dispatch_Queue_Item *qitem = + ACE_dynamic_cast(Dispatch_Queue_Item*, begin); + + if (qitem == 0) + { + ACE_Message_Block::release (begin); + continue; + } + + + //remove timer for each enqueued qitem from reactor + long timer_id; + if (this->timers_.find(qitem,timer_id) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Could not cancel Release Guard timer.") + ACE_TEXT ("Unknown timer ID\n")), + -1); + } + //else got timer_id + this->react_.cancel_timer(timer_id); + + this->task_->enqueue(qitem); + + ++begin; + } +} + +} //namespace Kokyu diff --git a/Kokyu/Dispatch_Deferrer.h b/Kokyu/Dispatch_Deferrer.h new file mode 100644 index 00000000000..c7ae8cc6cc1 --- /dev/null +++ b/Kokyu/Dispatch_Deferrer.h @@ -0,0 +1,94 @@ +/* -*- C++ -*- */ +/** + * @file Dispatch_Deferrer.h + * + * $Id$ + * + * @author Bryan Thrall (thrall@cse.wustl.edu) + * + */ + +#ifndef DISPATCH_DEFERRER_H +#define DISPATCH_DEFERRER_H +#include /**/ "ace/pre.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "kokyu_export.h" +#include "Kokyu_defs.h" +#include "ace/Event_Handler.h" +#include "ace/Thread_Mutex.h" +#include "ace/Message_Block.h" +#include "ace/Message_Queue.h" +#include "ace/Reactor.h" +#include "ace/Map.h" + +namespace Kokyu +{ + +class Dispatch_Task; //forward decl +class Dispatch_Queue_Item; //forward decl + +/** + * @class Dispatch_Deferrer + * + * @brief Part of the Release Guard protocol. When a Dispatch_Command + * needs to be dispatched later rather than when dispatch_i() is + * called on the Default_Dispatcher_Impl, it is passed to this + * object. When the appropriate time to dispatch the Dispatch_Command + * comes (last release time + period), this object calls enqueue() on + * the Dispatcher_Task. + */ +class Dispatch_Deferrer : public ACE_Event_Handler +{ + public: + Dispatch_Deferrer(); + //Default constructor + + ~Dispatch_Deferrer(); + //Destructor + + int init(const Dispatch_Deferrer_Attributes& attr); + + int dispatch (Dispatch_Queue_Item *qitem); + + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *act = 0); + //TODO: what if need higher resolution timers? + + private: + ACE_Deadline_Message_Strategy msg_strat_; + + ///Stores the Dispatch_Commands in earliest-release-time order, + ///until they are dispatched. I decided to use an + ///ACE_Dynamic_Message_Queue because it supports deadline + ///ordering. This decision is also good because we can simply store + ///the Dispatch_Queue_Item given to us by the + ///Default_Dispatcher_Impl rather than allocate some structure to + ///hold the Dispatch_Command and QoSDescriptor. + ACE_Dynamic_Message_Queue<ACE_SYNCH> rgq_; + + //Stores timer_ids from the Reactor (longs) using the + //Dispatch_Queue_Item the timer is for as the key. Used to + //cancel timers if they expire and are enqueued before the + //callback happens. + typedef ACE_Map_Manager<Dispatch_Queue_Item*,long,ACE_Thread_Mutex> Timer_Map; + + Timer_Map timers_; + + ///Manages timers for the Dispatch_Commands + ACE_Reactor react_; + + Dispatcher_Task* task_; +}; + +} //namespace Kokyu + +#if defined (__ACE_INLINE__) +#include "Dispatch_Deferrer.i" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif //DISPATCH_DEFERRER_H diff --git a/Kokyu/Dispatch_Deferrer.i b/Kokyu/Dispatch_Deferrer.i new file mode 100644 index 00000000000..315afce5598 --- /dev/null +++ b/Kokyu/Dispatch_Deferrer.i @@ -0,0 +1,29 @@ +// $Id$ + +namespace Kokyu +{ + +ACE_INLINE +Dispatch_Deferrer_Attributes::Dispatch_Deferrer_Attributes() +{ +} + +ACE_INLINE +Dispatch_Deferrer::Dispatch_Deferrer() + : msg_strat_() + , rgq_(this->msg_strat_) + , timers_() + , react_() + , task_(0) +{ +} + +ACE_INLINE +Dispatch_Deferrer::~Dispatch_Deferrer() +{ + this->react_.close(); + + this->timers_.close(); +} + +} //namespace Kokyu diff --git a/Kokyu/Dispatcher_Task.cpp b/Kokyu/Dispatcher_Task.cpp index e6d86711865..5fcce5f328e 100644 --- a/Kokyu/Dispatcher_Task.cpp +++ b/Kokyu/Dispatcher_Task.cpp @@ -68,6 +68,13 @@ Dispatcher_Task::initialize () own_allocator_ = 1; } +#ifdef KOKYU_HAS_RELEASE_GUARD + this->deferrer_attr_.task_ = this; + this->deferrer_.init(this->deferrer_attr_); + + this->releases_.open(); +#endif //KOKYU_HAS_RELEASE_GUARD + return 0; } @@ -138,6 +145,13 @@ int Dispatcher_Task::enqueue (const Dispatch_Command* cmd, const QoSDescriptor& qos_info) { + //Subclasses which override this function should now be overriding + //enqueue(ACE_Message_Block*) because that is where the main + //behavior is defined. This might invalidate those classes, but the + //decision seemed reasonable since it allows Dispatch_Deferrer to + //use the same Dispatch_Queue_Item as Dispatcher_Task without any + //more memory allocation and it doesn't break the interface. + void* buf = this->allocator_->malloc (sizeof (Dispatch_Queue_Item)); if (buf == 0) @@ -145,12 +159,47 @@ Dispatcher_Task::enqueue (const Dispatch_Command* cmd, ACE_Message_Block *mb = new (buf) Dispatch_Queue_Item (cmd, - qos_info, - &(this->data_block_), - ACE_Message_Block::DONT_DELETE, - this->allocator_); + qos_info, + &(this->data_block_), + ACE_Message_Block::DONT_DELETE, + this->allocator_); + +#ifdef KOKYU_HAS_RELEASE_GUARD + //if current release time < last release time + period then defer dispatch + ACE_Time_Value now(ACE_OS::gettimeofday()); + long rel_msec; + if (this->releases_.find(qos_info,rel_msec) < 0) + { + //new QosDescriptor, so just set last release to zero + rel_msec = 0; + } + ACE_Time_Value release; + release.msec(rel_msec); + release += qos_info.deadline_; + if (now < release) + { + //defer until last release time + period + Dispatch_Queue_Item *qitem = + ACE_dynamic_cast(Dispatch_Queue_Item*, mb); + + if (qitem == 0) + { + ACE_Message_Block::release (mb); + continue; + } + + this->deferrer_.dispatch(qitem); + } + else + { + //release! +#endif //KOKYU_HAS_RELEASE_GUARD + + this->enqueue (mb); - this->putq (mb); +#ifdef KOKYU_HAS_RELEASE_GUARD + } //else now >= release +#endif //KOKYU_HAS_RELEASE_GUARD return 0; } diff --git a/Kokyu/Dispatcher_Task.h b/Kokyu/Dispatcher_Task.h index 0bb91313f47..cb18cb588c2 100644 --- a/Kokyu/Dispatcher_Task.h +++ b/Kokyu/Dispatcher_Task.h @@ -16,6 +16,11 @@ #include "ace/Task.h" #include "ace/Lock_Adapter_T.h" +#ifdef KOKYU_HAS_RELEASE_GUARD +#include "ace/Map.h" +#include "Dispatch_Deferrer.h" +#endif //KOKYU_HAS_RELEASE_GUARD + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ @@ -42,6 +47,9 @@ public: Dispatch_Command* command (); + //Any reason why this shouldn't be visible? + const QoSDescriptor& qos_info() const; + private: void init_i(const QoSDescriptor&); @@ -71,6 +79,8 @@ public: int enqueue (const Dispatch_Command* cmd, const QoSDescriptor& qos_info); + int enqueue (Dispatch_Queue_Item *qitem); + /// Process the events in the queue. int svc (void); @@ -94,6 +104,24 @@ private: ACE_Deadline_Message_Strategy deadline_msg_strategy_; ACE_Laxity_Message_Strategy laxity_msg_strategy_; + +#ifdef KOKYU_HAS_RELEASE_GUARD + //TODO: What's the best way to identify periodic events? + //For now, use QoSDescriptor equivalence. + //Maps QoSDescriptors to last release time of events for that + //QoSDescriptor. Seems kind of wasteful to store the whole + //QoSDescriptor as the key, but I don't see any other way since each + //Dispatch_Queue_Item has its own instance of the QoSDescriptor. + //The release time is stored as a long as if from + //ACE_Time_Value.msec(). + typedef ACE_Map_Manager<QoSDescriptor,long,ACE_SYNCH_NULL_MUTEX> Release_Time_Map; + + Release_Time_Map releases_; + + //For delaying dispatch until required by RG: + Dispatch_Deferrer_Attributes deferrer_attr_; + Dispatch_Deferrer deferrer_; +#endif //KOKYU_HAS_RELEASE_GUARD }; } //end of namespace diff --git a/Kokyu/Dispatcher_Task.i b/Kokyu/Dispatcher_Task.i index e663509d86b..c67a2acdaf8 100644 --- a/Kokyu/Dispatcher_Task.i +++ b/Kokyu/Dispatcher_Task.i @@ -17,6 +17,11 @@ Dispatcher_Task::Dispatcher_Task (const ConfigInfo& config_info, config_info.reordering_flags_.static_bit_field_shift_, config_info.reordering_flags_.dynamic_priority_max_, config_info.reordering_flags_.dynamic_priority_offset_) +#ifdef KOKYU_HAS_RELEASE_GUARD + , releases_() + , deferrer_attr_() + , deferrer_() +#endif //KOKYU_HAS_RELEASE_GUARD { this->initialize(); } @@ -46,6 +51,26 @@ Dispatcher_Task::get_curr_config_info() const } ACE_INLINE +int +Dispatcher_Task::enqueue (Dispatch_Queue_Item *qitem) +{ +#ifdef KOKYU_HAS_RELEASE_GUARD + //update release time + //TODO: want release time before or after enqueuing call? + ACE_Time_Value release = ACE_OS::gettimeofday(); +#endif //KOKYU_HAS_RELEASE_GUARD + + this->putq (qitem); + +#ifdef KOKYU_HAS_RELEASE_GUARD + //if qos_info is not in map, this should add it + this->releases_.rebind(qitem->qos_info(),release.msec()); +#endif //KOKYU_HAS_RELEASE_GUARD + + return 0; +} + +ACE_INLINE Dispatch_Queue_Item::Dispatch_Queue_Item ( const Dispatch_Command* cmd, const QoSDescriptor& qos_info, @@ -78,4 +103,12 @@ Dispatch_Queue_Item::command() { return const_cast<Dispatch_Command*> (command_); } + +ACE_INLINE +const QoSDescriptor& +Dispatch_Queue_Item::qos_info() const +{ + return this->qos_info_; } + +} //namespace Kokyu diff --git a/Kokyu/Kokyu.mpc b/Kokyu/Kokyu.mpc index 10d856daf66..f5c49bc64b2 100644 --- a/Kokyu/Kokyu.mpc +++ b/Kokyu/Kokyu.mpc @@ -3,6 +3,7 @@ project(Kokyu) : acelib { dynamicflags = KOKYU_BUILD_DLL Source_Files { + Dispatch_Deferrer.cpp Dispatcher_Impl.cpp Kokyu.cpp Default_Dispatcher_Impl.cpp diff --git a/Kokyu/Kokyu_defs.h b/Kokyu/Kokyu_defs.h index 469fea15090..d83bd2edbfa 100644 --- a/Kokyu/Kokyu_defs.h +++ b/Kokyu/Kokyu_defs.h @@ -181,11 +181,27 @@ namespace Kokyu DSRT_ConfigInfo (); }; + +#ifdef KOKYU_HAS_RELEASE_GUARD + class Dispatcher_Task; //forward declaration + + class Dispatch_Deferrer_Attributes + { + public: + Dispatcher_Task* task_; + + Dispatch_Deferrer_Attributes(); + }; +#endif //KOKYU_HAS_RELEASE_GUARD + } //end of namespace //to satisfy ACE_Array<ConfigInfo> ACE_INLINE int operator != (const Kokyu::ConfigInfo& lhs, const Kokyu::ConfigInfo& rhs); +//to satisfy ACE_Map_Manager<QoSDescriptor> +ACE_INLINE int operator == (const Kokyu::QoSDescriptor& lhs, const Kokyu::QoSDescriptor& rhs); + #if defined (__ACE_INLINE__) #include "Kokyu_defs.i" #endif /* __ACE_INLINE__ */ diff --git a/Kokyu/Kokyu_defs.i b/Kokyu/Kokyu_defs.i index 1ee9799d1de..035be9375d6 100644 --- a/Kokyu/Kokyu_defs.i +++ b/Kokyu/Kokyu_defs.i @@ -84,3 +84,14 @@ int operator != (const Kokyu::ConfigInfo& lhs, lhs.thread_priority_ != rhs.thread_priority_ || lhs.dispatching_type_ != rhs.dispatching_type_ ); } + +//to satisfy ACE_Map_Manager<QoSDescriptor> +ACE_INLINE +int operator == (const Kokyu::QoSDescriptor& lhs, + const Kokyu::QoSDescriptor& rhs) +{ + return (lhs.preemption_priority_ == rhs.preemption_priority_ && + lhs.deadline_ == rhs.deadline_ && + lhs.execution_time_ == rhs.execution_time_ && + lhs.importance_ == rhs.importance_ ); +} |