From c44379cc7d9c7aa113989237ab0f56db12aa5219 Mon Sep 17 00:00:00 2001 From: "William R. Otte" Date: Mon, 24 Jul 2006 15:50:30 +0000 Subject: Repo restructuring --- ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.cpp | 286 +++++++++++++++++++ ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.h | 86 ++++++ ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.cpp | 380 +++++++++++++++++++++++++ ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.h | 92 ++++++ ACE/Kokyu/DSRT_Dispatch_Item_T.cpp | 35 +++ ACE/Kokyu/DSRT_Dispatch_Item_T.h | 103 +++++++ ACE/Kokyu/DSRT_Dispatch_Item_T.i | 70 +++++ ACE/Kokyu/DSRT_Dispatcher_Impl_T.cpp | 56 ++++ ACE/Kokyu/DSRT_Dispatcher_Impl_T.h | 210 ++++++++++++++ ACE/Kokyu/DSRT_Dispatcher_Impl_T.i | 57 ++++ ACE/Kokyu/DSRT_Sched_Queue_T.cpp | 285 +++++++++++++++++++ ACE/Kokyu/DSRT_Sched_Queue_T.h | 230 +++++++++++++++ ACE/Kokyu/Default_Dispatcher_Impl.cpp | 170 ++++++++++++ ACE/Kokyu/Default_Dispatcher_Impl.h | 77 +++++ ACE/Kokyu/Default_Dispatcher_Impl.i | 11 + ACE/Kokyu/Dispatch_Deferrer.cpp | 112 ++++++++ ACE/Kokyu/Dispatch_Deferrer.h | 95 +++++++ ACE/Kokyu/Dispatch_Deferrer.i | 29 ++ ACE/Kokyu/Dispatcher_Impl.cpp | 19 ++ ACE/Kokyu/Dispatcher_Impl.h | 74 +++++ ACE/Kokyu/Dispatcher_Impl.i | 31 +++ ACE/Kokyu/Dispatcher_Task.cpp | 189 +++++++++++++ ACE/Kokyu/Dispatcher_Task.h | 106 +++++++ ACE/Kokyu/Dispatcher_Task.i | 81 ++++++ ACE/Kokyu/Kokyu.cpp | 55 ++++ ACE/Kokyu/Kokyu.dsui | 37 +++ ACE/Kokyu/Kokyu.h | 115 ++++++++ ACE/Kokyu/Kokyu.i | 10 + ACE/Kokyu/Kokyu.mpc | 27 ++ ACE/Kokyu/Kokyu.mwc | 5 + ACE/Kokyu/Kokyu.pc.in | 11 + ACE/Kokyu/Kokyu_defs.cpp | 66 +++++ ACE/Kokyu/Kokyu_defs.h | 191 +++++++++++++ ACE/Kokyu/Kokyu_defs.i | 86 ++++++ ACE/Kokyu/Kokyu_dsrt.cpp | 179 ++++++++++++ ACE/Kokyu/Kokyu_dsrt.h | 177 ++++++++++++ ACE/Kokyu/Kokyu_dsrt.i | 11 + ACE/Kokyu/Makefile.am | 78 ++++++ ACE/Kokyu/README | 22 ++ ACE/Kokyu/docs/Kokyu.html | 416 ++++++++++++++++++++++++++++ ACE/Kokyu/docs/KokyuEC.jpg | Bin 0 -> 58534 bytes ACE/Kokyu/docs/kokyu1.jpg | Bin 0 -> 86414 bytes ACE/Kokyu/docs/kokyu2.jpg | Bin 0 -> 82664 bytes ACE/Kokyu/kokyu_config.h | 10 + ACE/Kokyu/kokyu_export.h | 55 ++++ ACE/Kokyu/tests/DSRT_MIF/DSRT_MIF.mpc | 7 + ACE/Kokyu/tests/DSRT_MIF/MIF.cpp | 185 +++++++++++++ ACE/Kokyu/tests/DSRT_MIF/Makefile.am | 35 +++ ACE/Kokyu/tests/DSRT_MIF/svc.conf | 4 + ACE/Kokyu/tests/DSRT_MIF/svc.conf.xml | 8 + ACE/Kokyu/tests/EDF/EDF.mpc | 6 + ACE/Kokyu/tests/EDF/Makefile.am | 35 +++ ACE/Kokyu/tests/EDF/README | 24 ++ ACE/Kokyu/tests/EDF/test.cpp | 162 +++++++++++ ACE/Kokyu/tests/FIFO/FIFO.mpc | 6 + ACE/Kokyu/tests/FIFO/Makefile.am | 35 +++ ACE/Kokyu/tests/FIFO/README | 24 ++ ACE/Kokyu/tests/FIFO/test.cpp | 166 +++++++++++ ACE/Kokyu/tests/Makefile.am | 15 + 59 files changed, 5147 insertions(+) create mode 100644 ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.cpp create mode 100644 ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.h create mode 100644 ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.cpp create mode 100644 ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.h create mode 100644 ACE/Kokyu/DSRT_Dispatch_Item_T.cpp create mode 100644 ACE/Kokyu/DSRT_Dispatch_Item_T.h create mode 100644 ACE/Kokyu/DSRT_Dispatch_Item_T.i create mode 100644 ACE/Kokyu/DSRT_Dispatcher_Impl_T.cpp create mode 100644 ACE/Kokyu/DSRT_Dispatcher_Impl_T.h create mode 100644 ACE/Kokyu/DSRT_Dispatcher_Impl_T.i create mode 100644 ACE/Kokyu/DSRT_Sched_Queue_T.cpp create mode 100644 ACE/Kokyu/DSRT_Sched_Queue_T.h create mode 100644 ACE/Kokyu/Default_Dispatcher_Impl.cpp create mode 100644 ACE/Kokyu/Default_Dispatcher_Impl.h create mode 100644 ACE/Kokyu/Default_Dispatcher_Impl.i create mode 100644 ACE/Kokyu/Dispatch_Deferrer.cpp create mode 100644 ACE/Kokyu/Dispatch_Deferrer.h create mode 100644 ACE/Kokyu/Dispatch_Deferrer.i create mode 100644 ACE/Kokyu/Dispatcher_Impl.cpp create mode 100644 ACE/Kokyu/Dispatcher_Impl.h create mode 100644 ACE/Kokyu/Dispatcher_Impl.i create mode 100644 ACE/Kokyu/Dispatcher_Task.cpp create mode 100644 ACE/Kokyu/Dispatcher_Task.h create mode 100644 ACE/Kokyu/Dispatcher_Task.i create mode 100644 ACE/Kokyu/Kokyu.cpp create mode 100644 ACE/Kokyu/Kokyu.dsui create mode 100644 ACE/Kokyu/Kokyu.h create mode 100644 ACE/Kokyu/Kokyu.i create mode 100644 ACE/Kokyu/Kokyu.mpc create mode 100644 ACE/Kokyu/Kokyu.mwc create mode 100644 ACE/Kokyu/Kokyu.pc.in create mode 100644 ACE/Kokyu/Kokyu_defs.cpp create mode 100644 ACE/Kokyu/Kokyu_defs.h create mode 100644 ACE/Kokyu/Kokyu_defs.i create mode 100644 ACE/Kokyu/Kokyu_dsrt.cpp create mode 100644 ACE/Kokyu/Kokyu_dsrt.h create mode 100644 ACE/Kokyu/Kokyu_dsrt.i create mode 100644 ACE/Kokyu/Makefile.am create mode 100644 ACE/Kokyu/README create mode 100644 ACE/Kokyu/docs/Kokyu.html create mode 100644 ACE/Kokyu/docs/KokyuEC.jpg create mode 100644 ACE/Kokyu/docs/kokyu1.jpg create mode 100644 ACE/Kokyu/docs/kokyu2.jpg create mode 100644 ACE/Kokyu/kokyu_config.h create mode 100644 ACE/Kokyu/kokyu_export.h create mode 100644 ACE/Kokyu/tests/DSRT_MIF/DSRT_MIF.mpc create mode 100644 ACE/Kokyu/tests/DSRT_MIF/MIF.cpp create mode 100644 ACE/Kokyu/tests/DSRT_MIF/Makefile.am create mode 100644 ACE/Kokyu/tests/DSRT_MIF/svc.conf create mode 100644 ACE/Kokyu/tests/DSRT_MIF/svc.conf.xml create mode 100644 ACE/Kokyu/tests/EDF/EDF.mpc create mode 100644 ACE/Kokyu/tests/EDF/Makefile.am create mode 100644 ACE/Kokyu/tests/EDF/README create mode 100644 ACE/Kokyu/tests/EDF/test.cpp create mode 100644 ACE/Kokyu/tests/FIFO/FIFO.mpc create mode 100644 ACE/Kokyu/tests/FIFO/Makefile.am create mode 100644 ACE/Kokyu/tests/FIFO/README create mode 100644 ACE/Kokyu/tests/FIFO/test.cpp create mode 100644 ACE/Kokyu/tests/Makefile.am (limited to 'ACE/Kokyu') diff --git a/ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.cpp b/ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.cpp new file mode 100644 index 00000000000..a5d28115fac --- /dev/null +++ b/ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.cpp @@ -0,0 +1,286 @@ +// $Id$ + +#ifndef DSRT_CV_DISPATCHER_IMPL_T_CPP +#define DSRT_CV_DISPATCHER_IMPL_T_CPP + +#include "DSRT_CV_Dispatcher_Impl_T.h" + +#if !defined (__ACE_INLINE__) +//#include "DSRT_CV_Dispatcher_Impl_T.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, DSRT_CV_Dispatcher_Impl_T, "$Id$") + +namespace Kokyu +{ + +/* +//@@VS: This is somehow not being recognized by MSVC, which results +//in a link error. For now, the definition has been moved to the .h +//file. Needs further investigation. + +template +int Comparator_Adapter_Generator::MoreEligible:: +operator ()(const DSRT_Dispatch_Item_var& item1, + const DSRT_Dispatch_Item_var& item2) +{ + int rc = qos_comparator_ (item1->qos (), item2->qos ()); + + //more eligible + if (rc == 1) + return 1; + + //if equally eligible, then resolve tie with the creation time of + //the item + if (rc == 0 && item1->insertion_time () < item2->insertion_time ()) + return 1; + + return 0; +} +*/ + +template +DSRT_CV_Dispatcher_Impl:: +DSRT_CV_Dispatcher_Impl (ACE_Sched_Params::Policy sched_policy, + int sched_scope) + :DSRT_Dispatcher_Impl (sched_policy, sched_scope), + run_cond_ (run_cond_lock_) +{ +} + +template int +DSRT_CV_Dispatcher_Impl:: +init_i (const DSRT_ConfigInfo&) +{ + return 0; +} + +template +int DSRT_CV_Dispatcher_Impl:: +schedule_i (Guid_t id, const DSRT_QoSDescriptor& qos) +{ + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):schedule_i enter\n")); +#endif + + DSRT_Dispatch_Item* item; + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + + if (ACE_OS::thr_setprio (thr_handle, + this->blocked_prio_, + this->sched_policy_) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("thr_setprio failed")), -1); + } + + ACE_NEW_RETURN (item, + DSRT_Dispatch_Item (id, qos), + -1); + item->thread_handle (thr_handle); + + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->synch_lock_, -1); + if (this->ready_queue_.insert (item) == -1) + return -1; + +#ifdef KOKYU_DSRT_LOGGING + this->ready_queue_.dump (); + + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):schedule_i after ready_q.insert\n")); +#endif + + DSRT_Dispatch_Item_var item_var; + + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, cond_guard, run_cond_lock_, -1); + this->ready_queue_.most_eligible (item_var); + + guard.release (); + + ACE_hthread_t most_eligible_thr_handle = item_var->thread_handle (); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):curr thr handle = %d\n", + thr_handle)); + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):curr scheduled thr handle = %d\n", + this->curr_scheduled_thr_handle_)); + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):most eligible thr handle = %d \n", + most_eligible_thr_handle)); +#endif + + if (this->curr_scheduled_thr_handle_ == thr_handle && + most_eligible_thr_handle != thr_handle) + { +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):curr sched thr handle = thr_handle & " + "most eligible thr handle != curr thr handle. " + "about to do a broadcast on CV to wake up most eligible\n")); +#endif + this->curr_scheduled_thr_handle_ = most_eligible_thr_handle; + //wake up the most eligible thread + this->run_cond_.broadcast (); + } + + //if the current thread is not the most eligible, then wait. + //if the current thread is most eligible, but some thread is + //scheduled currently, then wait. + while (most_eligible_thr_handle != thr_handle || + (most_eligible_thr_handle == thr_handle && + this->curr_scheduled_thr_handle_ != thr_handle && + this->curr_scheduled_thr_handle_ != 0)) + { + ACE_Time_Value tv (60,0); + tv += ACE_OS::gettimeofday (); + //wait a maximum of 1 min. This is an escape latch against lockups. +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T): About to block on cv\n")); +#endif + if (this->run_cond_.wait (&tv) == -1) + { + ACE_ERROR ((LM_ERROR, + "(%t|%T): run_cond.wait timed out -- Possible Lockup\n")); + } + this->ready_queue_.most_eligible (item_var); + most_eligible_thr_handle = item_var->thread_handle (); + } + this->curr_scheduled_guid_ = item_var->guid (); + this->curr_scheduled_thr_handle_ = most_eligible_thr_handle; + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T): %d is currently running\n", + thr_handle)); +#endif + + if (ACE_OS::thr_setprio (thr_handle, + this->active_prio_, + this->sched_policy_) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("thr_setprio failed"))); + } + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):schedule_i exit\n")); +#endif + + return 0; +} + +template +int DSRT_CV_Dispatcher_Impl:: +update_schedule_i (Guid_t guid, const DSRT_QoSDescriptor& qos) +{ + return this->schedule_i (guid, qos); +} + +template +int DSRT_CV_Dispatcher_Impl:: +update_schedule_i (Guid_t guid, Block_Flag_t flag) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->synch_lock_, -1); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t): update schedule for block entered\n")); +#endif + + DSRT_Dispatch_Item_var dispatch_item; + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + + int found = this->ready_queue_.find (guid, dispatch_item); + +#ifdef KOKYU_DSRT_LOGGING + if (found == 0) + ACE_DEBUG ((LM_DEBUG, "(%t|%T): %d found in ready queue\n", thr_handle)); + else + ACE_DEBUG ((LM_DEBUG, "(%t|%T): %d not found in ready queue\n", + thr_handle)); +#endif + + if (found == 0 && flag == BLOCK) + { + thr_handle = dispatch_item->thread_handle (); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t|%T): update schedule: %d found\n", thr_handle)); +#endif + + if (ACE_OS::thr_setprio (thr_handle, + this->blocked_prio_, + this->sched_policy_) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("thr_setprio failed"))); + } + + int rc = this->cancel_schedule (guid); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t): update schedule for block done\n")); +#endif + + return rc; + } + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t): update schedule for block done\n")); +#endif + + return -1; +} + +template int +DSRT_CV_Dispatcher_Impl:: +cancel_schedule_i (Guid_t guid) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->synch_lock_, -1); + + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t|%T): about to remove guid\n")); +#endif + + this->ready_queue_.remove (guid); + +#ifdef KOKYU_DSRT_LOGGING + this->ready_queue_.dump (); +#endif + + if (this->curr_scheduled_thr_handle_ == thr_handle) + { + this->curr_scheduled_guid_ = 0; + this->curr_scheduled_thr_handle_ = 0; + } + + ACE_GUARD_RETURN (cond_lock_t, + mon, this->run_cond_lock_, 0); + this->run_cond_.broadcast (); + return 0; +} + +template int +DSRT_CV_Dispatcher_Impl:: +shutdown_i () +{ + this->shutdown_flagged_ = 1; + return 0; +} + +} + +#endif /* DSRT_CV_DISPATCHER_IMPL_T_CPP */ diff --git a/ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.h b/ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.h new file mode 100644 index 00000000000..305afb55ccc --- /dev/null +++ b/ACE/Kokyu/DSRT_CV_Dispatcher_Impl_T.h @@ -0,0 +1,86 @@ +/* -*- C++ -*- */ +/** + * @file DSRT_CV_Dispatcher_Impl_T.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + */ + +#ifndef DSRT_CV_DISPATCHER_IMPL_T_H +#define DSRT_CV_DISPATCHER_IMPL_T_H +#include /**/ "ace/pre.h" +#include "ace/Task.h" +#include "ace/Copy_Disabled.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Kokyu_dsrt.h" +#include "DSRT_Sched_Queue_T.h" +#include "DSRT_Dispatcher_Impl_T.h" + +namespace Kokyu +{ + template + class DSRT_CV_Dispatcher_Impl : + public DSRT_Dispatcher_Impl, + public ACE_Copy_Disabled + { + public: + typedef typename + DSRT_Scheduler_Traits::Guid_t Guid_t; + + typedef typename + DSRT_Scheduler_Traits::QoSDescriptor_t DSRT_QoSDescriptor; + + DSRT_CV_Dispatcher_Impl (ACE_Sched_Params::Policy sched_policy, + int sched_scope); + + int init_i (const DSRT_ConfigInfo&); + + /// Schedule a thread dynamically based on the qos info supplied. + int schedule_i (Guid_t, const DSRT_QoSDescriptor&); + + /// Update the schedule for a thread. This could alter the current + /// schedule. + int update_schedule_i (Guid_t, const DSRT_QoSDescriptor&); + + /// Inform the scheduler that the caller thread is about to + /// block. This could alter the current schedule. + int update_schedule_i (Guid_t, Block_Flag_t); + + /// Cancel the schedule for a thread. This could alter the current + /// schedule. + int cancel_schedule_i (Guid_t); + + /// Shut down the dispatcher. The dispatcher will stop processing + /// requests. + int shutdown_i (); + + private: + typedef ACE_SYNCH_MUTEX cond_lock_t; + typedef ACE_SYNCH_CONDITION cond_t; + + cond_lock_t run_cond_lock_; + cond_t run_cond_; + }; + +} + +#if !defined (__ACE_INLINE__) +//#include "DSRT_CV_Dispatcher_Impl_T.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "DSRT_CV_Dispatcher_Impl_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("DSRT_CV_Dispatcher_Impl_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include /**/ "ace/post.h" +#endif /* DSRT_DIRECT_DISPATCHER_IMPL_T_H */ diff --git a/ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.cpp b/ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.cpp new file mode 100644 index 00000000000..74eefa681dc --- /dev/null +++ b/ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.cpp @@ -0,0 +1,380 @@ +// $Id$ + +#ifndef DSRT_DIRECT_DISPATCHER_IMPL_T_CPP +#define DSRT_DIRECT_DISPATCHER_IMPL_T_CPP + +#include "DSRT_Direct_Dispatcher_Impl_T.h" + +#if !defined (__ACE_INLINE__) +//#include "DSRT_Direct_Dispatcher_Impl_T.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, DSRT_Direct_Dispatcher_Impl_T, "$Id$") + +namespace Kokyu +{ + +/* +//@@VS: This is somehow not being recognized by MSVC, which results +//in a link error. For now, the definition has been moved to the .h +//file. Needs further investigation. + +template +int Comparator_Adapter_Generator::MoreEligible:: +operator ()(const DSRT_Dispatch_Item_var& item1, + const DSRT_Dispatch_Item_var& item2) +{ + int rc = qos_comparator_ (item1->qos (), item2->qos ()); + + //more eligible + if (rc == 1) + return 1; + + //if equally eligible, then resolve tie with the creation time of + //the item + if (rc == 0 && item1->insertion_time () < item2->insertion_time ()) + return 1; + + return 0; +} +*/ +template +DSRT_Direct_Dispatcher_Impl:: +DSRT_Direct_Dispatcher_Impl (ACE_Sched_Params::Policy sched_policy, + int sched_scope) + :DSRT_Dispatcher_Impl(sched_policy, sched_scope), + sched_queue_modified_ (0), + sched_queue_modified_cond_ (sched_queue_modified_cond_lock_) +{ + //Run scheduler thread at highest priority + if (this->activate (this->rt_thr_flags_, 1, 0, this->executive_prio_) == -1) + { + ACE_ERROR ((LM_ERROR, + "(%t|%T) cannot activate scheduler thread in RT mode." + "Trying in non RT mode\n")); + if (this->activate (this->non_rt_thr_flags_) == -1) + ACE_ERROR ((LM_ERROR, + "(%t|%T) cannot activate scheduler thread\n")); + } +} + +template int +DSRT_Direct_Dispatcher_Impl:: +init_i (const DSRT_ConfigInfo&) +{ + return 0; +} + +template int +DSRT_Direct_Dispatcher_Impl::svc (void) +{ + ACE_hthread_t scheduler_thr_handle; + ACE_Thread::self (scheduler_thr_handle); + +#ifdef KOKYU_DSRT_LOGGING + int prio; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("max prio=%d\n") + ACE_TEXT ("min prio=%d\n") + ACE_TEXT ("active prio=%d\n") + ACE_TEXT ("inactive prio=%d\n"), + max_prio_, + min_prio_, + active_prio_, + inactive_prio_)); + + if (ACE_OS::thr_getprio (scheduler_thr_handle, prio) == -1) + { + if (errno == ENOTSUP) + { + ACE_ERROR((LM_ERROR, + ACE_TEXT ("getprio not supported\n") + )); + } + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n") + ACE_TEXT ("thr_getprio failed"))); + } + } + + ACE_DEBUG ((LM_DEBUG, "(%t): Scheduler thread prio is %d\n", prio)); +#endif /*DSRT_LOGGING*/ + + while(1) + { + ACE_GUARD_RETURN (cond_lock_t, + mon, + sched_queue_modified_cond_lock_, + 0); + + if (this->shutdown_flagged_) + break; + + while (!sched_queue_modified_) + { +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T): sched thread about to wait on cv\n")); +#endif + sched_queue_modified_cond_.wait (); + } + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t|%T): sched thread done waiting on cv\n")); +#endif + + sched_queue_modified_ = 0; + + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, + synch_lock_mon, + this->synch_lock_, + 0); + + if (this->ready_queue_.current_size () <= 0) + continue; + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t|%T):Sched Queue contents===>\n")); + this->ready_queue_.dump (); +#endif + DSRT_Dispatch_Item_var item_var; + this->ready_queue_.most_eligible (item_var); + + ACE_hthread_t most_eligible_thr_handle = item_var->thread_handle (); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):curr scheduled thr handle = %d\n", + this->curr_scheduled_thr_handle_)); + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):most eligible thr handle = %d \n", + most_eligible_thr_handle)); +#endif + + if (this->curr_scheduled_thr_handle_ != most_eligible_thr_handle) + { + if (this->curr_scheduled_thr_handle_ != 0) + { + if (ACE_OS::thr_setprio (this->curr_scheduled_thr_handle_, + this->inactive_prio_, + this->sched_policy_) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("thr_setprio on curr_scheduled_thr_handle_ failed."))); + ACE_DEBUG ((LM_DEBUG, "thr_handle = %d, prio = %d\n", + this->curr_scheduled_thr_handle_, + this->inactive_prio_)); + } + } + + if (ACE_OS::thr_setprio (most_eligible_thr_handle, + this->active_prio_, this->sched_policy_) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("thr_setprio on most_eligible_thr_handle failed"))); + } + + this->curr_scheduled_thr_handle_ = most_eligible_thr_handle; + this->curr_scheduled_guid_ = item_var->guid (); + } + /*change all threads in blocked_prio_ to inactive_prio_*/ + this->ready_queue_.change_prio(this->blocked_prio_, this->inactive_prio_,this->sched_policy_); + } + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t): sched thread exiting\n")); +#endif + + return 0; +} + +template +int DSRT_Direct_Dispatcher_Impl:: +schedule_i (Guid_t id, const DSRT_QoSDescriptor& qos) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->synch_lock_, -1); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):schedule_i enter\n")); +#endif + + DSRT_Dispatch_Item* item; + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + + ACE_NEW_RETURN (item, + DSRT_Dispatch_Item (id, qos), + -1); + item->thread_handle (thr_handle); + + if (this->ready_queue_.insert (item) == -1) + return -1; + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):schedule_i after ready_q.insert\n")); +#endif + + if (ACE_OS::thr_setprio (thr_handle, + this->blocked_prio_, + this->sched_policy_) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("thr_setprio failed")), -1); + } + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):schedule_i after thr_setprio\n")); +#endif + + //ready_queue_.dump (); + + /*first release ready_queue_ lock. Otherwise if the scheduler gets the + sched_queue_modified_cond_lock first, then try to get the ready_queue_ lock + just when one thread who gets the ready_queue_ lock first, then try to get + sched_queue_modified_cond_lock. Deadlock happens. + */ + guard.release (); + + //@@ Perhaps the lock could be moved further down just before + //setting the condition variable? + ACE_GUARD_RETURN (cond_lock_t, + mon, this->sched_queue_modified_cond_lock_, 0); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):schedule_i after acquiring cond lock\n")); +#endif + + this->sched_queue_modified_ = 1; + this->sched_queue_modified_cond_.signal (); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):schedule_i exit\n")); +#endif + + return 0; +} + +template +int DSRT_Direct_Dispatcher_Impl:: +update_schedule_i (Guid_t guid, const DSRT_QoSDescriptor& qos) +{ + return this->schedule (guid, qos); +} + +template +int DSRT_Direct_Dispatcher_Impl:: +update_schedule_i (Guid_t guid, Block_Flag_t flag) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->synch_lock_, -1); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t): update schedule for block entered\n")); +#endif + + DSRT_Dispatch_Item_var dispatch_item; + ACE_hthread_t thr_handle; + //@@ Perhaps the lock could be got rid of. It looks like the state + //of this object is not getting modified here. It makes calls to + //other methods, which already are thread-safe. + //ACE_Guard mon(sched_queue_modified_cond_lock_); + + int found = this->ready_queue_.find (guid, dispatch_item); + if (found == 0 && flag == BLOCK) + { + thr_handle = dispatch_item->thread_handle (); + if (ACE_OS::thr_setprio (thr_handle, + this->blocked_prio_, + this->sched_policy_) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("thr_setprio failed"))); + } + + //monitor released because cancel_schedule would acquire the + //lock. Using recursive mutex creates lock up. + // + //@@ Need to investigate this further. Also we can consider + //using the Thread-Safe interface pattern. + //mon.release (); + int rc = this->cancel_schedule (guid); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t): update schedule for block done\n")); +#endif + + return rc; + } + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t): update schedule for block done\n")); +#endif + + return -1; +} + +template int +DSRT_Direct_Dispatcher_Impl:: +cancel_schedule_i (Guid_t guid) +{ + ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->synch_lock_, -1); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t): about to remove guid\n")); +#endif + + this->ready_queue_.remove (guid); + +#ifdef KOKYU_DSRT_LOGGING + this->ready_queue_.dump (); +#endif + + if (this->curr_scheduled_guid_ == guid) + { + this->curr_scheduled_guid_ = 0; + this->curr_scheduled_thr_handle_ = 0; + } + + //release ready_queue_ lock first before getting another lock + guard.release (); + + ACE_GUARD_RETURN (cond_lock_t, + mon, this->sched_queue_modified_cond_lock_, 0); + this->sched_queue_modified_ = 1; + this->sched_queue_modified_cond_.signal (); + return 0; +} + +template int +DSRT_Direct_Dispatcher_Impl:: +shutdown_i () +{ + this->shutdown_flagged_ = 1; + + ACE_Guard mon(this->sched_queue_modified_cond_lock_); + this->sched_queue_modified_ = 1; + this->sched_queue_modified_cond_.signal (); + // We have to wait until the scheduler executive thread shuts + // down. But we have acquired the lock and if we wait without + // releasing it, the scheduler thread will try to acquire it after + // it gets woken up by the above signal and it fails to acquire the + // lock. This will lead to a deadlock. So release the lock before we + // wait. + mon.release (); + this->wait (); + return 0; +} + +} + +#endif /* DSRT_DIRECT_DISPATCHER_IMPL_T_CPP */ diff --git a/ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.h b/ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.h new file mode 100644 index 00000000000..1e5dddfaecd --- /dev/null +++ b/ACE/Kokyu/DSRT_Direct_Dispatcher_Impl_T.h @@ -0,0 +1,92 @@ +/* -*- C++ -*- */ +/** + * @file DSRT_Direct_Dispatcher_Impl_T.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + */ + +#ifndef DSRT_DIRECT_DISPATCHER_IMPL_T_H +#define DSRT_DIRECT_DISPATCHER_IMPL_T_H +#include /**/ "ace/pre.h" + +#include "ace/Task.h" + + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ +#include "ace/Copy_Disabled.h" +#include "Kokyu_dsrt.h" +#include "DSRT_Sched_Queue_T.h" +#include "DSRT_Dispatcher_Impl_T.h" + +namespace Kokyu +{ + template + class DSRT_Direct_Dispatcher_Impl : + public ACE_Task_Base, + public DSRT_Dispatcher_Impl, + public ACE_Copy_Disabled + { + public: + typedef typename + DSRT_Scheduler_Traits::Guid_t Guid_t; + + typedef typename + DSRT_Scheduler_Traits::QoSDescriptor_t DSRT_QoSDescriptor; + + DSRT_Direct_Dispatcher_Impl (ACE_Sched_Params::Policy sched_policy, + int sched_scope); + + int init_i (const DSRT_ConfigInfo&); + + /// Schedule a thread dynamically based on the qos info supplied. + int schedule_i (Guid_t, const DSRT_QoSDescriptor&); + + /// Update the schedule for a thread. This could alter the current + /// schedule. + int update_schedule_i (Guid_t, const DSRT_QoSDescriptor&); + + /// Inform the scheduler that the caller thread is about to + /// block. This could alter the current schedule. + int update_schedule_i (Guid_t, Block_Flag_t); + + /// Cancel the schedule for a thread. This could alter the current + /// schedule. + int cancel_schedule_i (Guid_t); + + /// Shut down the dispatcher. The dispatcher will stop processing + /// requests. + int shutdown_i (); + + private: + typedef ACE_SYNCH_MUTEX cond_lock_t; + typedef ACE_SYNCH_CONDITION cond_t; + + u_int sched_queue_modified_; + cond_lock_t sched_queue_modified_cond_lock_; + cond_t sched_queue_modified_cond_; + + private: + int svc (void); + }; + +} + +#if !defined (__ACE_INLINE__) +//#include "DSRT_Direct_Dispatcher_Impl_T.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "DSRT_Direct_Dispatcher_Impl_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("DSRT_Direct_Dispatcher_Impl_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include /**/ "ace/post.h" +#endif /* DSRT_DIRECT_DISPATCHER_IMPL_T_H */ diff --git a/ACE/Kokyu/DSRT_Dispatch_Item_T.cpp b/ACE/Kokyu/DSRT_Dispatch_Item_T.cpp new file mode 100644 index 00000000000..b2cc29fa788 --- /dev/null +++ b/ACE/Kokyu/DSRT_Dispatch_Item_T.cpp @@ -0,0 +1,35 @@ +// $Id$ + +#ifndef DSRT_DISPATCH_ITEM_T_CPP +#define DSRT_DISPATCH_ITEM_T_CPP + +#include "DSRT_Dispatch_Item_T.h" + +#if ! defined (__ACE_INLINE__) +#include "DSRT_Dispatch_Item_T.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, DSRT_Dispatch_Item, "$Id$") + +namespace Kokyu +{ + +template +DSRT_Dispatch_Item_var:: +DSRT_Dispatch_Item_var (DSRT_Dispatch_Item *p) + :ACE_Strong_Bound_Ptr, + ACE_SYNCH_MUTEX> (p) +{ +} + +template +DSRT_Dispatch_Item_var:: +DSRT_Dispatch_Item_var (const DSRT_Dispatch_Item_var &r) + :ACE_Strong_Bound_Ptr, + ACE_SYNCH_MUTEX> (r) +{ +} + +} + +#endif /* DSRT_DISPATCH_ITEM_T_CPP */ diff --git a/ACE/Kokyu/DSRT_Dispatch_Item_T.h b/ACE/Kokyu/DSRT_Dispatch_Item_T.h new file mode 100644 index 00000000000..f87826832fe --- /dev/null +++ b/ACE/Kokyu/DSRT_Dispatch_Item_T.h @@ -0,0 +1,103 @@ +/* -*- C++ -*- */ +/** + * @file DSRT_Dispatch_Item_T.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + */ + +#ifndef DSRT_DISPATCH_ITEM_H +#define DSRT_DISPATCH_ITEM_H +#include /**/ "ace/pre.h" +#include "ace/Bound_Ptr.h" +#include "ace/Copy_Disabled.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Kokyu_dsrt.h" + +namespace Kokyu +{ + /** + * @class DSRT_Dispatch_Item + * + * @brief This stores information about a schedulable thread. + */ + + template + class DSRT_Dispatch_Item : private ACE_Copy_Disabled + { + typedef typename + DSRT_Scheduler_Traits::Guid_t Guid_t; + + typedef typename + DSRT_Scheduler_Traits::QoSDescriptor_t DSRT_QoSDescriptor; + + protected: + ACE_hthread_t thr_handle_; + Guid_t guid_; + DSRT_QoSDescriptor qos_; + ACE_Time_Value insertion_time_; + + public: + DSRT_Dispatch_Item (Guid_t guid, const DSRT_QoSDescriptor&); + + /// Get the guid. + Guid_t guid (); + + /// Get the associated qos value. + DSRT_QoSDescriptor qos (); + + /// Get the thread handle. + ACE_hthread_t thread_handle (); + + /// Set the thread handle. + void thread_handle (ACE_hthread_t &handle); + + /// Get the insertion time. + ACE_Time_Value insertion_time (); + + /// Set the insertion time. + void insertion_time (const ACE_Time_Value&); + }; + + /** + * @class DSRT_Dispatch_Item_var + * + * @brief Smart pointer to dynamically allocated + * DSRT_Dispatch_Item objects. + */ + template + class DSRT_Dispatch_Item_var : + public ACE_Strong_Bound_Ptr< + DSRT_Dispatch_Item, + ACE_SYNCH_MUTEX> + { + public: + explicit + DSRT_Dispatch_Item_var (DSRT_Dispatch_Item + *p = 0); + + DSRT_Dispatch_Item_var ( + const DSRT_Dispatch_Item_var &r); + }; +} + +#if defined (__ACE_INLINE__) +#include "DSRT_Dispatch_Item_T.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "DSRT_Dispatch_Item_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("DSRT_Dispatch_Item_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include /**/ "ace/post.h" +#endif /* DSRT_DISPATCH_ITEM_H */ diff --git a/ACE/Kokyu/DSRT_Dispatch_Item_T.i b/ACE/Kokyu/DSRT_Dispatch_Item_T.i new file mode 100644 index 00000000000..fca64cea38d --- /dev/null +++ b/ACE/Kokyu/DSRT_Dispatch_Item_T.i @@ -0,0 +1,70 @@ +/* -*- C++ -*- */ +/** + * @file DSRT_Dispatch_Item_T.i + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + */ + +namespace Kokyu +{ + +template +ACE_INLINE +DSRT_Dispatch_Item:: +DSRT_Dispatch_Item (Guid_t guid, const DSRT_QoSDescriptor& qos) + :guid_ (guid), qos_ (qos) +{ +} + +template +ACE_INLINE typename DSRT_Dispatch_Item::Guid_t +DSRT_Dispatch_Item:: +guid () +{ + return guid_; +} + +template +ACE_INLINE typename DSRT_Dispatch_Item::DSRT_QoSDescriptor +DSRT_Dispatch_Item:: +qos () +{ + return qos_; +} + +template +ACE_INLINE ACE_hthread_t +DSRT_Dispatch_Item:: +thread_handle () +{ + return thr_handle_; +} + +template +ACE_INLINE void +DSRT_Dispatch_Item:: +thread_handle (ACE_hthread_t &handle) +{ + thr_handle_ = handle; +} + +template +ACE_INLINE void +DSRT_Dispatch_Item:: +insertion_time (const ACE_Time_Value& tv) +{ + this->insertion_time_ = tv; +} + +template +ACE_INLINE ACE_Time_Value +DSRT_Dispatch_Item:: +insertion_time () +{ + return this->insertion_time_; +} + +} diff --git a/ACE/Kokyu/DSRT_Dispatcher_Impl_T.cpp b/ACE/Kokyu/DSRT_Dispatcher_Impl_T.cpp new file mode 100644 index 00000000000..4e7cace0a1e --- /dev/null +++ b/ACE/Kokyu/DSRT_Dispatcher_Impl_T.cpp @@ -0,0 +1,56 @@ +// $Id$ + +#ifndef DSRT_DISPATCHER_IMPL_T_CPP +#define DSRT_DISPATCHER_IMPL_T_CPP + +#include "DSRT_Dispatcher_Impl_T.h" + +#if ! defined (__ACE_INLINE__) +#include "DSRT_Dispatcher_Impl_T.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, DSRT_Dispatcher_Impl, "$Id$") + +namespace Kokyu +{ + +template +DSRT_Dispatcher_Impl:: +DSRT_Dispatcher_Impl (ACE_Sched_Params::Policy sched_policy, + int sched_scope) + : sched_policy_ (sched_policy), + sched_scope_ (sched_scope), + min_prio_ (ACE_Sched_Params::priority_min + (sched_policy_, + sched_scope)), + max_prio_ (ACE_Sched_Params::priority_max + (sched_policy_, + sched_scope)), + executive_prio_ (max_prio_), + blocked_prio_ (ACE_Sched_Params::previous_priority + (sched_policy_, + max_prio_, + sched_scope)), + inactive_prio_ (min_prio_), + active_prio_ (ACE_Sched_Params::next_priority + (sched_policy_, + min_prio_)), + shutdown_flagged_ (0), + non_rt_thr_flags_ (THR_NEW_LWP | THR_JOINABLE | THR_BOUND), + curr_scheduled_thr_handle_ (0) +{ + if (sched_policy_ == ACE_SCHED_FIFO) + rt_thr_flags_ = non_rt_thr_flags_ | THR_SCHED_FIFO; + else if (sched_policy_ == ACE_SCHED_RR) + rt_thr_flags_ = non_rt_thr_flags_ | THR_SCHED_RR; +} + +//virtual - so don't inline +template +DSRT_Dispatcher_Impl::~DSRT_Dispatcher_Impl () +{ +} + +} + +#endif /* DSRT_DISPATCHER_IMPL_T_CPP */ diff --git a/ACE/Kokyu/DSRT_Dispatcher_Impl_T.h b/ACE/Kokyu/DSRT_Dispatcher_Impl_T.h new file mode 100644 index 00000000000..e0883487bc6 --- /dev/null +++ b/ACE/Kokyu/DSRT_Dispatcher_Impl_T.h @@ -0,0 +1,210 @@ +/* -*- C++ -*- */ +/** + * @file DSRT_Dispatcher_Impl_T.h + * + * $Id$ + * + */ + +#ifndef DSRT_DISPATCHER_IMPL_H +#define DSRT_DISPATCHER_IMPL_H +#include /**/ "ace/pre.h" + +#include "ace/Synch_Traits.h" +#if defined (ACE_HAS_THREADS) +# include "ace/Recursive_Thread_Mutex.h" +#else +# include "ace/Null_Mutex.h" +#endif /* ACE_HAS_THREADS */ + +#include "Kokyu_dsrt.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + + + +namespace Kokyu +{ + /** + * @class Comparator_Adapter_Generator + * + * @brief Generates function object adapter that adapts the + * QoSComparator function object to compare between two schedulable + * items instead of QoSDescriptors. + * + * The QoSComparator function object that gets passed through the + * DSRT_Scheduler_Traits takes two qos values and + * determines the more eligible one. Since the INT_ID (key) for + * RB_Tree needs to be of type DSRT_Dispatch_Item_var + * , the QoSComparator needs to be adapted using an adapter + * to compare two schedulable items. This adapter compares the two + * using their qos values. Ties are resolved by giving preference to + * items which arrived earlier. Note that this class serves the + * purpose of a generator class, since it generates the adapter + * class for a given qos comparator function object. + */ + + template + class Comparator_Adapter_Generator + { + public: + typedef typename + DSRT_Scheduler_Traits::QoSComparator_t QoSComparator_t; + + /** + * @class More_Eligible + * + * @brief Actual function object that gets generated. + */ + class MoreEligible + { + public: + /** + * Function call operator to do comparison between two + * schedulable items. Returns 1 if item1 is more eligible than + * item2, otherwise 0. + */ + int operator () + (const DSRT_Dispatch_Item_var& item1, + const DSRT_Dispatch_Item_var& item2) + { + int rc = qos_comparator_ (item1->qos (), item2->qos ()); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T): MoreEligible:: qos_comparator returned %d\n", + rc)); +#endif + + //more eligible + if (rc == 1) + return rc; + + //if equally eligible, then resolve tie with the creation time of + //the item + if (rc == 0 && item1->insertion_time () < item2->insertion_time ()) + return 1; + + return 0; + } + + private: + QoSComparator_t qos_comparator_; + }; + + /** + * Facilitates return of the generated function object adapter. + */ + typedef MoreEligible RET; + }; + + /** + * @class DSRT_Dispatcher + * + * @brief Base class for DSRT dispatcher implementations + * + * The responsibility of this class is to act as a common base class + * for different DSRT dispatcher implementations. This is an + * abstract base class and cannot be instantiated. + */ + template + class DSRT_Dispatcher_Impl + { + public: + typedef typename DSRT_Scheduler_Traits::Guid_t Guid_t; + typedef typename DSRT_Scheduler_Traits::QoSDescriptor_t DSRT_QoSDescriptor; + + DSRT_Dispatcher_Impl (ACE_Sched_Params::Policy sched_policy, + int sched_scope); + + /// Configure the DSRT dispatcher. + int init (const DSRT_ConfigInfo&); + + /// Schedule a thread dynamically based on the qos info supplied. + int schedule (Guid_t guid, + const DSRT_QoSDescriptor&); + + /// Update the schedule for a thread. This could alter the current + /// schedule. + int update_schedule (Guid_t guid, + const DSRT_QoSDescriptor&); + + /// Inform the scheduler that the caller thread is about to + /// block. This could alter the current schedule. + int update_schedule (Guid_t guid, Block_Flag_t flag); + + /// Cancel the schedule for a thread. This could alter the current + /// schedule. + int cancel_schedule (Guid_t guid); + + /// Shut down the dispatcher. The dispatcher will stop processing + /// requests. + int shutdown (); + + virtual ~DSRT_Dispatcher_Impl (); + + private: + + //following an idiom to avoid public virtual functions. + //instead make them private and use the template method + //pattern - "Virtually Yours" article in CUJ Experts Forum + + virtual int init_i (const DSRT_ConfigInfo&)=0; + virtual int schedule_i (Guid_t guid, + const DSRT_QoSDescriptor&)=0; + virtual int update_schedule_i (Guid_t guid, + const DSRT_QoSDescriptor&)=0; + virtual int update_schedule_i (Guid_t guid, Block_Flag_t flag)=0; + virtual int cancel_schedule_i (Guid_t guid)=0; + virtual int shutdown_i ()=0; + + protected: + /// Generate the QoSComparator adapter. + typedef typename + Comparator_Adapter_Generator::RET + Queue_Item_Comparator_t; + + typedef Sched_Ready_Queue + DSRT_Sched_Queue_t; + + ACE_Sched_Params::Policy sched_policy_; + int sched_scope_; + + Priority_t min_prio_; + Priority_t max_prio_; + Priority_t executive_prio_; + Priority_t blocked_prio_; + Priority_t inactive_prio_; + Priority_t active_prio_; + + DSRT_Sched_Queue_t ready_queue_; + int shutdown_flagged_; + long non_rt_thr_flags_; + long rt_thr_flags_; + + ACE_SYNCH_RECURSIVE_MUTEX synch_lock_; + + ACE_hthread_t curr_scheduled_thr_handle_; + Guid_t curr_scheduled_guid_; + }; + +} //end of namespace + +#if defined (__ACE_INLINE__) +#include "DSRT_Dispatcher_Impl_T.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "DSRT_Dispatcher_Impl_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("DSRT_Dispatcher_Impl_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include /**/ "ace/post.h" +#endif /* DSRT_DISPATCHER_IMPL_H */ diff --git a/ACE/Kokyu/DSRT_Dispatcher_Impl_T.i b/ACE/Kokyu/DSRT_Dispatcher_Impl_T.i new file mode 100644 index 00000000000..e4983a17ba3 --- /dev/null +++ b/ACE/Kokyu/DSRT_Dispatcher_Impl_T.i @@ -0,0 +1,57 @@ +// $Id$ + +namespace Kokyu +{ + +template +ACE_INLINE int +DSRT_Dispatcher_Impl:: +init (const DSRT_ConfigInfo& config_info) +{ + return this->init_i (config_info); +} + +template +ACE_INLINE int +DSRT_Dispatcher_Impl:: +schedule (Guid_t guid, + const DSRT_QoSDescriptor& qos) +{ + return this->schedule_i (guid, qos); +} + +template +ACE_INLINE int +DSRT_Dispatcher_Impl:: +update_schedule (Guid_t guid, + const DSRT_QoSDescriptor& qos) +{ + return this->update_schedule_i (guid, qos); +} + +template +ACE_INLINE int +DSRT_Dispatcher_Impl:: +update_schedule (Guid_t guid, + Block_Flag_t flag) +{ + return this->update_schedule_i (guid, flag); +} + +template +ACE_INLINE int +DSRT_Dispatcher_Impl:: +cancel_schedule (Guid_t guid) +{ + return this->cancel_schedule_i (guid); +} + +template +ACE_INLINE int +DSRT_Dispatcher_Impl::shutdown () + +{ + return this->shutdown_i (); +} + +} diff --git a/ACE/Kokyu/DSRT_Sched_Queue_T.cpp b/ACE/Kokyu/DSRT_Sched_Queue_T.cpp new file mode 100644 index 00000000000..30845823aa1 --- /dev/null +++ b/ACE/Kokyu/DSRT_Sched_Queue_T.cpp @@ -0,0 +1,285 @@ +/* -*- C++ -*- */ +/** + * @file DSRT_Sched_Queue_T.cpp + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + */ +#ifndef DSRT_SCHED_QUEUE_T_CPP +#define DSRT_SCHED_QUEUE_T_CPP + +#include "DSRT_Sched_Queue_T.h" +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#if !defined (__ACE_INLINE__) +//#include "DSRT_Sched_Queue_T.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, + DSRT_Sched_Queue_T, + "$Id$") + +namespace Kokyu +{ +/* +//@@VS: This is somehow not being recognized by MSVC, which results +//in a link error. For now, the definition has been moved to the .h +//file. Needs further investigation. + +template +u_long +Sched_Ready_Queue:: +Guid_Hash::operator () (const Guid_t& id) +{ + typename DSRT_Scheduler_Traits::Guid_Hash guid_hash; + return guid_hash(id); +} +*/ +template +int Sched_Ready_Queue:: +current_size () +{ + return dispatch_items_prio_queue_.current_size (); +} + +template +int Sched_Ready_Queue:: +most_eligible (DSRT_Dispatch_Item_var& item) +{ + if (dispatch_items_prio_queue_.current_size () == 0) + return -1; + + PRIO_QUEUE_ITERATOR start = dispatch_items_prio_queue_.begin (); + PRIO_QUEUE_ENTRY &ent = (*start); + item = ent.item (); + return 0; +} + +template +int Sched_Ready_Queue:: +find (Guid_t guid, + DSRT_Dispatch_Item_var& found_item) +{ + ACE_GUARD_RETURN (ACE_LOCK, mon, lock_, -1); + RB_Tree_Dispatch_Item_Node* rb_tree_node; + + if (dispatch_items_hash_map_.find(guid, rb_tree_node) == -1) + { + return -1; + } + else + { + found_item = rb_tree_node->item (); + return 0; + } + + return 0; +} + +template +int Sched_Ready_Queue:: +insert (DSRT_Dispatch_Item* item) +{ + item->insertion_time (ACE_OS::gettimeofday ()); + DSRT_Dispatch_Item_var item_var(item); + + ACE_GUARD_RETURN (ACE_LOCK, mon, lock_, -1); + + RB_Tree_Dispatch_Item_Node* rb_tree_node; + Guid_t guid = item->guid (); + +#ifdef KOKYU_DSRT_LOGGING + ACE_hthread_t thr_handle = item->thread_handle (); + + ACE_DEBUG ((LM_DEBUG, + "(%t|%T) about to insert %d in sched queue\n", + thr_handle)); +#endif + + if (dispatch_items_hash_map_.find (guid, rb_tree_node) == -1) + { +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T) %d not found in hashmap\n", thr_handle)); +#endif + if (dispatch_items_prio_queue_.bind (item_var, + item_var, + rb_tree_node) == 0) + { +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t|%T): item bound in rbtree\n")); +#endif + if (dispatch_items_hash_map_.bind (guid, rb_tree_node) == 0) + { +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, "(%t|%T): item bound in hashmap\n")); + ACE_DEBUG ((LM_DEBUG, + "<===Hash Table contents Begin===>\n")); + dispatch_items_hash_map_.dump (); + ACE_DEBUG ((LM_DEBUG, + "<===Hash Table contents End=====>\n")); +#endif + return 0; + } + } + } + else + { +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T) %d found in hashmap\n", thr_handle)); +#endif + dispatch_items_hash_map_.unbind (guid); + dispatch_items_prio_queue_.unbind (rb_tree_node); + +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T) %d removed from hashmap and rbtree\n", thr_handle)); +#endif + if (dispatch_items_prio_queue_.bind (item_var, + item_var, + rb_tree_node) == 0) + { +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T) %d bound to rbtree\n", thr_handle)); +#endif + if (dispatch_items_hash_map_.bind (guid, rb_tree_node) == 0) + { +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T) %d bound to hashmap\n", thr_handle)); + ACE_DEBUG ((LM_DEBUG, + "<===Hash Table contents Begin===>\n")); + dispatch_items_hash_map_.dump (); + ACE_DEBUG ((LM_DEBUG, + "<===Hash Table contents End===>\n")); +#endif + return 0; + } + } + } + + return -1; +} + +template +int Sched_Ready_Queue:: +remove (Guid_t guid) +{ + ACE_GUARD_RETURN (ACE_LOCK, mon, lock_, -1); + RB_Tree_Dispatch_Item_Node* rb_tree_node; + + if (dispatch_items_hash_map_.find(guid, rb_tree_node) == 0) + { + dispatch_items_hash_map_.unbind (guid); + dispatch_items_prio_queue_.unbind (rb_tree_node); +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "<===Hash Table contents Begin===>\n")); + dispatch_items_hash_map_.dump (); + ACE_DEBUG ((LM_DEBUG, + "<===Hash Table contents End===>\n")); +#endif + + return 0; + } + + return -1; +} + +template +void Sched_Ready_Queue:: +dump () +{ + ACE_GUARD (ACE_LOCK, mon, lock_); + ACE_DEBUG ((LM_DEBUG, "(%t|%T):##########################\n")); + if (dispatch_items_prio_queue_.current_size ()) + { + PRIO_QUEUE_ITERATOR end_iter = dispatch_items_prio_queue_.end (); + PRIO_QUEUE_ITERATOR iter; + + iter = dispatch_items_prio_queue_.begin (); + while( iter != end_iter ) + { + PRIO_QUEUE_ENTRY &ent = (*iter); + DSRT_Dispatch_Item_var + item_var = ent.item (); + /* + int guid; + ACE_OS::memcpy (&guid, + item_var->guid ().get_buffer (), + item_var->guid ().length ()); + + ACE_DEBUG ((LM_DEBUG, "(%t|%T):guid %d, thr_handle = %d\n", + guid, item_var->thread_handle ())); + */ + ++iter; + } + } + ACE_DEBUG ((LM_DEBUG, "(%t|%T):##########################\n")); +} + +template +int Sched_Ready_Queue:: +change_prio(int old_prio, int new_prio, int policy) +{ + if (dispatch_items_prio_queue_.current_size ()) + { + PRIO_QUEUE_ITERATOR end_iter = dispatch_items_prio_queue_.end (); + PRIO_QUEUE_ITERATOR iter; + int prio; + + iter = dispatch_items_prio_queue_.begin (); + while( iter != end_iter ) + { + PRIO_QUEUE_ENTRY &ent = (*iter); + DSRT_Dispatch_Item_var + item_var = ent.item (); + ACE_OS::thr_getprio (item_var->thread_handle (), prio); + if (prio==old_prio) { + ACE_OS::thr_setprio(item_var->thread_handle (), new_prio, policy); + } + ++iter; + } + } + return(0); +} + +} + +#endif /* DSRT_SCHED_QUEUE_T_CPP */ diff --git a/ACE/Kokyu/DSRT_Sched_Queue_T.h b/ACE/Kokyu/DSRT_Sched_Queue_T.h new file mode 100644 index 00000000000..68ad4be8e69 --- /dev/null +++ b/ACE/Kokyu/DSRT_Sched_Queue_T.h @@ -0,0 +1,230 @@ +/* -*- C++ -*- */ +/** + * @file DSRT_Sched_Queue_T.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + */ + +#ifndef DSRT_SCHED_QUEUE_T_H +#define DSRT_SCHED_QUEUE_T_H +#include /**/ "ace/pre.h" + +#include "DSRT_Dispatch_Item_T.h" +#include "ace/RB_Tree.h" +#include "ace/Hash_Map_Manager_T.h" +#include "ace/Null_Mutex.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Kokyu_dsrt.h" + +namespace Kokyu +{ + + /** + * @class Sched_Ready_Queue + * + * @brief RB_Tree based template class for implementation of + * reordering queue. + * + * This queue is used as a priority queue to store schedulable + * entities. The item at the top of the RB_Tree is the most eligible + * item. The comparator used to determine the most eligible item is + * passed as a template parameter More_Eligible_Comparator + * . This is expected to be a functor which compares two + * schedulable items. The mutex type template parameter for RB_Tree + * is chosen to be a null mutex since all the methods in the + * enclosing Sched_Ready_Queue class are thread + * safe. Since QoS is used for comparison between two schedulable + * items, QoSDescriptor is the ideal candidate to be used as the key + * or the EXT_ID for RB_Tree instantiation. But two qos descriptors + * could be the same. The existing implementation of RB_Tree does + * not allow duplicate keys. In order to facilitate insertion of + * duplicate qos descriptors, the qos descriptors are contained in a + * DSRT_Dispatch_Item and this is used as the basis + * of comparison. To resolve tie between equal qos values, an + * insertion time stamp is maintained in each item and an item with + * an earlier time stamp is more eligible than an item with an + * identical qos value. Another requirement is that it should be + * possible to remove an item from the RB_Tree based on guid. Since + * we have already used up the qos descriptor for the key, we need a + * separate index into the list of schedulable items. The second + * index should be based on guid. This is achieved by using a hash + * map to store pairs. This makes the deletion + * of nodes from RB_Tree more efficient. + * + */ + template + class Sched_Ready_Queue + { + /// Extract the necessary types from the traits class + typedef typename DSRT_Scheduler_Traits::Guid_t Guid_t; + + typedef typename + DSRT_Scheduler_Traits::QoSDescriptor_t DSRT_QoSDescriptor_t; + + public: + + /** + * Given a guid, find an item in the priority queue. + * + * @param guid Guid of item + * + * @param found_item Reference to DSRT_Dispatch_Item_var + * to hold the found item. + * @return -1 if no item found and 0 otherwise. + */ + int find(Guid_t guid, + DSRT_Dispatch_Item_var& + found_item); + + + /** + * Insert an item in the priority queue. If item with same guid is + * already in the queue, the existing one is deleted and the new + * one inserted. A deletion and insertion has to happen instead of + * update since the rebalancing of the RB_Tree should take place. + * + * @param item DSRT_Dispatch_Item object containing guid and qos. + * + * @return -1 if insertion failed and 0 otherwise. + */ + int insert(DSRT_Dispatch_Item* item); + + /** + * Remove an item from the priority queue. + * + * @param guid Guid of item. + * + * @param qos QoS associated with item. + * + * @return -1 if removal failed and 0 otherwise. + */ + int remove(Guid_t guid); + + /** + * Returns current size of the priority queue. + */ + int current_size (); + + /** + * Get the most eligible item from the priority queue. + * + * @param item Item which is most eligible, i.e. one at the + * "top" of the priority queue. + * + * @return -1 if there are no items in the priority queue. + */ + int most_eligible (DSRT_Dispatch_Item_var& + item); + + /** + * change blocked_prio_ item to inactive_prio_ + */ + int change_prio (int old_prio, int new_prio, int policy); + + void dump(); + + private: + + /** + * @class Guid_Hash + * + * @brief Internal class to generate hash for guid. + * + * This acts just as a wrapper functor to the Hash functor passed + * as part of the traits class DSRT_Scheduler_Traits + * . + * + */ + class Guid_Hash + { + public: + /// Returns hash value. + u_long operator () (const typename DSRT_Scheduler_Traits::Guid_t &id) + { + typename DSRT_Scheduler_Traits::Guid_Hash guid_hash; + return guid_hash(id); + } + }; + + // RB_Tree related typedefs + typedef ACE_RB_Tree , + DSRT_Dispatch_Item_var, + More_Eligible_Comparator, + ACE_SYNCH_NULL_MUTEX> Dispatch_Items_Priority_Queue; + + + typedef + ACE_RB_Tree_Node, + DSRT_Dispatch_Item_var > + RB_Tree_Dispatch_Item_Node; + + typedef typename + Dispatch_Items_Priority_Queue::ITERATOR PRIO_QUEUE_ITERATOR; + + typedef typename + Dispatch_Items_Priority_Queue::ENTRY PRIO_QUEUE_ENTRY; + + // Hash map related typedefs + typedef ACE_Hash_Map_Manager_Ex, + ACE_SYNCH_NULL_MUTEX> + Dispatch_Items_Hash_Map; + + typedef ACE_Hash_Map_Iterator_Ex, + ACE_SYNCH_NULL_MUTEX> + Dispatch_Items_Hash_Map_Iterator; + + typedef ACE_Hash_Map_Entry + Dispatch_Items_Hash_Map_Entry; + + /** + * Lock used to protect the state of the scheduler queue. A + * separate lock is not used for the internal RB_Tree and hashmap. + */ + ACE_LOCK lock_; + + /** + * Hash table to maintain a second index into the list of + * schedulable items. This is for efficient removal of items from + * the RB_Tree based on guid. The guid is used as the key for the + * hash map, whereas the qos value is used as the key for the + * RB_Tree. + */ + Dispatch_Items_Hash_Map dispatch_items_hash_map_; + + /** + * RB_Tree implementation of priority queue of schedulable items. + */ + Dispatch_Items_Priority_Queue dispatch_items_prio_queue_; + }; +} + +#if !defined (__ACE_INLINE__) +//#include "DSRT_Sched_Queue_T.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "DSRT_Sched_Queue_T.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("DSRT_Sched_Queue_T.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include /**/ "ace/post.h" +#endif /* DSRT_SCHED_QUEUE_T_H */ diff --git a/ACE/Kokyu/Default_Dispatcher_Impl.cpp b/ACE/Kokyu/Default_Dispatcher_Impl.cpp new file mode 100644 index 00000000000..9403b46a892 --- /dev/null +++ b/ACE/Kokyu/Default_Dispatcher_Impl.cpp @@ -0,0 +1,170 @@ +// $Id$ + +#include "Default_Dispatcher_Impl.h" +#include "ace/Sched_Params.h" + +#if ! defined (__ACE_INLINE__) +#include "Default_Dispatcher_Impl.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, Default_Dispatcher_Impl, "$Id$") + +namespace Kokyu +{ + +Default_Dispatcher_Impl::Default_Dispatcher_Impl () + : activated_ (0) +{ +} + +int +Default_Dispatcher_Impl::init_i (const Dispatcher_Attributes& attrs) +{ + //create and init the dispatcher tasks here + + ACE_DEBUG ((LM_DEBUG, "entering init_t\n" )); + int size; + size = attrs.config_info_set_.size (); + + if (size == 0) + return -1; + + this->ntasks_ = size; + + Dispatcher_Task_Auto_Ptr * tasks_array=0; + ACE_NEW_RETURN (tasks_array, Dispatcher_Task_Auto_Ptr[ntasks_], -1); + + //ACE_DEBUG ((LM_DEBUG, "after new on task array\n" )); + tasks_.reset(tasks_array); + + //ACE_DEBUG ((LM_DEBUG, "task array auto_ptr set\n" )); + + ConfigInfoSet& config_set = + const_cast (attrs.config_info_set_); + ConfigInfoSet::ITERATOR iter(config_set); + int i=0; + + ConfigInfo* config; + for (;i tmp_task_auto_ptr (task); + tasks_[i++] = tmp_task_auto_ptr; + //I couldn't use reset because MSVC6 auto_ptr does not have reset method. + //So in configurations where the auto_ptr maps to the std::auto_ptr instead + //of ACE auto_ptr, this would be a problem. + //tasks_[i++].reset (task); + } + + this->thr_creation_flags_ = attrs.thread_creation_flags (); + + if (attrs.immediate_activation_ && !this->activated_) + { + this->activate_i (); + } + + curr_config_info_ = attrs.config_info_set_; + return 0; +} + +int +Default_Dispatcher_Impl::activate_i () +{ + int i; + + if (this->activated_) + return 0; + + for(i=0; iget_curr_config_info ().thread_priority_; + + if (this->tasks_[i]->activate (this->thr_creation_flags_, + 1, 1, priority) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("EC (%P|%t) cannot activate queue.") + ACE_TEXT ("Need superuser privilege to run in RT class\n")), + -1); + } + } + + this->activated_ = 1; + return 0; +} + +Dispatcher_Task* +Default_Dispatcher_Impl::find_task_with_preemption_prio (Priority_t prio) +{ + int i; + + if (prio >=0) + { + for( i=0; ipreemption_priority () == prio) + return tasks_[i].get(); + } + } + + return 0; +} + +int +Default_Dispatcher_Impl::dispatch_i (const Dispatch_Command* cmd, + const QoSDescriptor& qos_info) +{ + //delegate to the appropriate task + if (qos_info.preemption_priority_ < 0) + return -1; + + Dispatcher_Task* task = + find_task_with_preemption_prio (qos_info.preemption_priority_); + + //@@VS - We should insert this into the lowest prio queue. + //How do we know that the last queue is the lowest prio queue. + if (task == 0) + task = tasks_[ntasks_-1].get (); + + return task->enqueue (cmd, qos_info); +} + +int +Default_Dispatcher_Impl::shutdown_i () +{ + //This needs to be revisited based on mode transition and + //consistent cut through the queues + + //post shutdown command to all tasks + int i; + + for(i=0; ienqueue (shutdown_cmd, qos_info); + } + + //wait for all tasks to exit + for (i=0; iwait (); + } + + return 0; +} + +int +Shutdown_Task_Command::execute () +{ + return -1; +} + +} diff --git a/ACE/Kokyu/Default_Dispatcher_Impl.h b/ACE/Kokyu/Default_Dispatcher_Impl.h new file mode 100644 index 00000000000..f87204d1569 --- /dev/null +++ b/ACE/Kokyu/Default_Dispatcher_Impl.h @@ -0,0 +1,77 @@ +/* -*- C++ -*- */ +/** + * @file Default_Dispatcher_Impl.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + * Based on previous work by Tim Harrison (harrison@cs.wustl.edu), + * Chris Gill, Carlos O'Ryan and other members of the DOC group. + */ + +#ifndef DEFAULT_DISPATCHER_IMPL_H +#define DEFAULT_DISPATCHER_IMPL_H +#include /**/ "ace/pre.h" + +#include "ace/Task.h" + + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Auto_Ptr.h" +#include "kokyu_export.h" +#include "Kokyu_defs.h" +#include "Dispatcher_Impl.h" +#include "Dispatcher_Task.h" + +namespace Kokyu +{ + /** + * @class Default_Dispatcher_Impl + * + * @brief Default implementation class for EC dispatcher + * implementations + * + */ + class Default_Dispatcher_Impl : public Dispatcher_Impl + { + public: + Default_Dispatcher_Impl (); + + private: + int activate_i (); + int init_i (const Dispatcher_Attributes&); + int dispatch_i (const Dispatch_Command*, + const QoSDescriptor&); + int shutdown_i (); + Dispatcher_Task* find_task_with_preemption_prio (Priority_t); + + private: + typedef auto_ptr Dispatcher_Task_Auto_Ptr; + ACE_Auto_Array_Ptr tasks_; + int ntasks_; + ConfigInfoSet curr_config_info_; + int activated_; + }; + + class Shutdown_Task_Command : public Dispatch_Command + { + public: + /// Constructor + Shutdown_Task_Command (ACE_Allocator *mb_allocator = 0); + + /// Command callback + int execute (); + }; + +} //end of namespace + +#if defined (__ACE_INLINE__) +#include "Default_Dispatcher_Impl.i" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif /* DEFAULT_DISPATCHER_IMPL_H */ diff --git a/ACE/Kokyu/Default_Dispatcher_Impl.i b/ACE/Kokyu/Default_Dispatcher_Impl.i new file mode 100644 index 00000000000..bf1e76c3ce7 --- /dev/null +++ b/ACE/Kokyu/Default_Dispatcher_Impl.i @@ -0,0 +1,11 @@ +// $Id$ + +namespace Kokyu +{ +ACE_INLINE +Shutdown_Task_Command::Shutdown_Task_Command (ACE_Allocator *mb_allocator) + :Dispatch_Command(0,mb_allocator) +{ +} + +} diff --git a/ACE/Kokyu/Dispatch_Deferrer.cpp b/ACE/Kokyu/Dispatch_Deferrer.cpp new file mode 100644 index 00000000000..134a9a900e4 --- /dev/null +++ b/ACE/Kokyu/Dispatch_Deferrer.cpp @@ -0,0 +1,112 @@ +// $Id$ + +#include "ace/Sched_Params.h" +#include "Dispatch_Deferrer.h" +#include "Dispatcher_Task.h" + +#if ! defined (__ACE_INLINE__) +#include "Dispatch_Deferrer.i" +#endif /* __ACE_INLINE__ */ + +#include "kokyu_config.h" +#include "kokyu_dsui_families.h" +#include +ACE_RCSID(Kokyu, Dispatch_Deferrer, "$Id$") + +namespace Kokyu +{ + +int +Dispatch_Deferrer::init(const Dispatch_Deferrer_Attributes& attr) +{ + //set up reactor for timeouts + this->react_.open(0); + //Don't need any handles, since we're only using it for timeouts + + this->timers_.open(); + + this->task_ = attr.task_; + + return 0; +} + +int +Dispatch_Deferrer::dispatch (Dispatch_Queue_Item *qitem) +{ + ACE_ASSERT(qitem != 0); + + //setup timout + //For now, assume period = deadline + ACE_Time_Value tv; + tv = ACE_OS::gettimeofday() + qitem->qos_info().deadline_; + long timer_id = this->react_.schedule_timer(this, + 0, //NULL arg + tv); + if (timer_id < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("EC (%P|%t) cannot schedule Release Guard timer.") + ACE_TEXT ("ACE_Reactor.schedule_timer() returned -1\n")), + -1); + } + //else valid timer_id + this->timers_.bind(qitem,timer_id); + + //@BT INSTRUMENT with event ID: EVENT_DEFERRED_ENQUEUE Measure time + //between release and enqueue into dispatch queue because of RG + DSUI_EVENT_LOG(DISP_DEFERRER_FAM, EVENT_DEFERRED_ENQUEUE, timer_id, 0, NULL); + + //buffer until timer expires + return this->rgq_.enqueue_deadline(qitem,&tv); +} + + +int +Dispatch_Deferrer::handle_timeout (const ACE_Time_Value &, + const void *) +{ + //get all expired Dispatch_Queue_Items + ACE_Message_Block *begin,*end; + this->rgq_.remove_messages(begin,end, + (u_int) (ACE_Dynamic_Message_Strategy::LATE + | ACE_Dynamic_Message_Strategy::BEYOND_LATE)); + + //dispatch them back to Dispatcher_Impl + while (begin <= end) + { + Dispatch_Queue_Item *qitem = + dynamic_cast (begin); + + if (qitem == 0) + { + ACE_Message_Block::release (begin); + continue; + } + + + //remove timer for each enqueued qitem from reactor + long timer_id; + if (this->timers_.find(qitem,timer_id) < 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Could not cancel Release Guard timer.") + ACE_TEXT ("Unknown timer ID\n")), + -1); + } + //else got timer_id + this->react_.cancel_timer(timer_id); + + //@BT INSTRUMENT with event ID: EVENT_DEFERRED_DEQUEUE Measure + //time between release and enqueue into dispatch queue because + //of RG + DSUI_EVENT_LOG (DISP_DEFERRER_FAM, EVENT_DEFERRED_DEQUEUE, timer_id, 0, NULL); + + this->task_->enqueue(qitem); + + ++begin; + } + + return 0; +} + +} //namespace Kokyu diff --git a/ACE/Kokyu/Dispatch_Deferrer.h b/ACE/Kokyu/Dispatch_Deferrer.h new file mode 100644 index 00000000000..f110070a74c --- /dev/null +++ b/ACE/Kokyu/Dispatch_Deferrer.h @@ -0,0 +1,95 @@ +/* -*- C++ -*- */ +/** + * @file Dispatch_Deferrer.h + * + * $Id$ + * + * @author Bryan Thrall (thrall@cse.wustl.edu) + * + */ + +#ifndef DISPATCH_DEFERRER_H +#define DISPATCH_DEFERRER_H +#include /**/ "ace/pre.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "kokyu_export.h" +#include "Kokyu_defs.h" +#include "ace/Event_Handler.h" +#include "ace/Thread_Mutex.h" +#include "ace/Synch_T.h" +#include "ace/Message_Block.h" +#include "ace/Message_Queue.h" +#include "ace/Reactor.h" +#include "ace/Map.h" + +namespace Kokyu +{ + +class Dispatch_Task; //forward decl +class Dispatch_Queue_Item; //forward decl + +/** + * @class Dispatch_Deferrer + * + * @brief Part of the Release Guard protocol. When a Dispatch_Command + * needs to be dispatched later rather than when dispatch_i() is + * called on the Default_Dispatcher_Impl, it is passed to this + * object. When the appropriate time to dispatch the Dispatch_Command + * comes (last release time + period), this object calls enqueue() on + * the Dispatcher_Task. + */ +class Dispatch_Deferrer : public ACE_Event_Handler +{ + public: + Dispatch_Deferrer(); + //Default constructor + + ~Dispatch_Deferrer(); + //Destructor + + int init(const Dispatch_Deferrer_Attributes& attr); + + int dispatch (Dispatch_Queue_Item *qitem); + + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *act = 0); + //TODO: what if need higher resolution timers? + + private: + ACE_Deadline_Message_Strategy msg_strat_; + + ///Stores the Dispatch_Commands in earliest-release-time order, + ///until they are dispatched. I decided to use an + ///ACE_Dynamic_Message_Queue because it supports deadline + ///ordering. This decision is also good because we can simply store + ///the Dispatch_Queue_Item given to us by the + ///Default_Dispatcher_Impl rather than allocate some structure to + ///hold the Dispatch_Command and QoSDescriptor. + ACE_Dynamic_Message_Queue rgq_; + + //Stores timer_ids from the Reactor (longs) using the + //Dispatch_Queue_Item the timer is for as the key. Used to + //cancel timers if they expire and are enqueued before the + //callback happens. + typedef ACE_Map_Manager Timer_Map; + + Timer_Map timers_; + + ///Manages timers for the Dispatch_Commands + ACE_Reactor react_; + + Dispatcher_Task* task_; +}; + +} //namespace Kokyu + +#if defined (__ACE_INLINE__) +#include "Dispatch_Deferrer.i" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif //DISPATCH_DEFERRER_H diff --git a/ACE/Kokyu/Dispatch_Deferrer.i b/ACE/Kokyu/Dispatch_Deferrer.i new file mode 100644 index 00000000000..315afce5598 --- /dev/null +++ b/ACE/Kokyu/Dispatch_Deferrer.i @@ -0,0 +1,29 @@ +// $Id$ + +namespace Kokyu +{ + +ACE_INLINE +Dispatch_Deferrer_Attributes::Dispatch_Deferrer_Attributes() +{ +} + +ACE_INLINE +Dispatch_Deferrer::Dispatch_Deferrer() + : msg_strat_() + , rgq_(this->msg_strat_) + , timers_() + , react_() + , task_(0) +{ +} + +ACE_INLINE +Dispatch_Deferrer::~Dispatch_Deferrer() +{ + this->react_.close(); + + this->timers_.close(); +} + +} //namespace Kokyu diff --git a/ACE/Kokyu/Dispatcher_Impl.cpp b/ACE/Kokyu/Dispatcher_Impl.cpp new file mode 100644 index 00000000000..9ad5994eb63 --- /dev/null +++ b/ACE/Kokyu/Dispatcher_Impl.cpp @@ -0,0 +1,19 @@ +// $Id$ + +#include "Dispatcher_Impl.h" + +#if ! defined (__ACE_INLINE__) +#include "Dispatcher_Impl.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, Dispatcher_Impl, "$Id$") + +namespace Kokyu +{ + +//virtual - so don't inline +Dispatcher_Impl::~Dispatcher_Impl() +{ +} + +} diff --git a/ACE/Kokyu/Dispatcher_Impl.h b/ACE/Kokyu/Dispatcher_Impl.h new file mode 100644 index 00000000000..bc4f5c1064c --- /dev/null +++ b/ACE/Kokyu/Dispatcher_Impl.h @@ -0,0 +1,74 @@ +/* -*- C++ -*- */ +/** + * @file Dispatcher_Impl.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + * Based on previous work by Tim Harrison (harrison@cs.wustl.edu), + * Chris Gill, Carlos O'Ryan and other members of the DOC group. + */ + +#ifndef DISPATCHER_IMPL_H +#define DISPATCHER_IMPL_H +#include /**/ "ace/pre.h" + +#include "Kokyu_defs.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "kokyu_export.h" + +namespace Kokyu +{ + /** + * @class Dispatcher + * + * @brief Base class for EC dispatcher implementations + * + * The responsibility of this class is to act as a common base class + * for different EC dispatcher implementations. This is an + * abstract base class and cannot be instantiated. + */ + class Kokyu_Export Dispatcher_Impl + { + public: + /// Configure the dispatcher. + int init (const Dispatcher_Attributes&); + + int activate (); + + /// dispatch a command (eg. event) based on the QoS supplied. + int dispatch (const Dispatch_Command*, + const QoSDescriptor&); + + /// shutdown the dispatcher. + int shutdown (); + + virtual ~Dispatcher_Impl(); + + private: + //following an idiom to avoid public virtual functions. + //instead make them private and use the template method + //pattern - "Virtually Yours" article in CUJ Experts Forum + + virtual int init_i (const Dispatcher_Attributes&) =0; + virtual int dispatch_i (const Dispatch_Command*, + const QoSDescriptor&) =0; + virtual int shutdown_i () =0; + virtual int activate_i () =0; + + protected: + int thr_creation_flags_; + }; +} //end of namespace + +#if defined (__ACE_INLINE__) +#include "Dispatcher_Impl.i" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif /* DISPATCHER_IMPL_H */ diff --git a/ACE/Kokyu/Dispatcher_Impl.i b/ACE/Kokyu/Dispatcher_Impl.i new file mode 100644 index 00000000000..a1c31d16212 --- /dev/null +++ b/ACE/Kokyu/Dispatcher_Impl.i @@ -0,0 +1,31 @@ +// $Id$ + +namespace Kokyu +{ + +ACE_INLINE +int Dispatcher_Impl::init (const Dispatcher_Attributes& attr) +{ + return init_i (attr); +} + +ACE_INLINE +int Dispatcher_Impl::dispatch (const Dispatch_Command* cmd, + const QoSDescriptor& qos_info) +{ + return dispatch_i (cmd, qos_info); +} + +ACE_INLINE +int Dispatcher_Impl::shutdown () +{ + return shutdown_i (); +} + +ACE_INLINE +int Dispatcher_Impl::activate () +{ + return activate_i (); +} + +} diff --git a/ACE/Kokyu/Dispatcher_Task.cpp b/ACE/Kokyu/Dispatcher_Task.cpp new file mode 100644 index 00000000000..25ae499ec68 --- /dev/null +++ b/ACE/Kokyu/Dispatcher_Task.cpp @@ -0,0 +1,189 @@ +// $Id$ + +#include "Dispatcher_Task.h" + +#include "ace/Malloc_T.h" +#include "ace/OS_NS_errno.h" + +#if ! defined (__ACE_INLINE__) +#include "Dispatcher_Task.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, Dispatcher_Task, "$Id$") + +namespace +//anonymous namespace - use this to avoid polluting the global namespace +{ + const int ALLOC_POOL_CHUNKS = 200; +} + +namespace Kokyu +{ + +typedef ACE_Cached_Allocator +Dispatch_Queue_Item_Allocator; + +int +Dispatcher_Task::initialize () +{ + switch(curr_config_info_.dispatching_type_) + { + case FIFO_DISPATCHING: + ACE_NEW_RETURN ( + this->the_queue_, + ACE_Message_Queue, + -1); + break; + + case DEADLINE_DISPATCHING: + ACE_NEW_RETURN ( + this->the_queue_, + ACE_Dynamic_Message_Queue (deadline_msg_strategy_), + -1); + break; + + case LAXITY_DISPATCHING: + ACE_NEW_RETURN ( + this->the_queue_, + ACE_Dynamic_Message_Queue (laxity_msg_strategy_), + -1); + break; + + default: + return -1; + break; + } + + if (this->the_queue_ != 0) + { + this->msg_queue(this->the_queue_); + } + + if (this->allocator_ == 0) + { + ACE_NEW_RETURN (this->allocator_, + Dispatch_Queue_Item_Allocator(ALLOC_POOL_CHUNKS), + -1); + own_allocator_ = 1; + } + + return 0; +} + +int +Dispatcher_Task::svc (void) +{ + int done = 0; + + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + int prio; + + if (ACE_Thread::getprio (thr_handle, prio) == -1) + { + if (errno == ENOTSUP) + { + ACE_DEBUG((LM_DEBUG, + ACE_TEXT ("getprio not supported on this platform\n") + )); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("getprio failed")), + -1); + } + + //ACE_DEBUG ((LM_DEBUG, "(%t) Dispatcher Thread started prio=%d\n", prio)); + + while (!done) + { + ACE_Message_Block *mb; + if (this->getq (mb) == -1) + if (ACE_OS::last_error () == ESHUTDOWN) + return 0; + else + ACE_ERROR ((LM_ERROR, + "EC (%P|%t) getq error in Dispatching Queue\n")); + + //ACE_DEBUG ((LM_DEBUG, "(%t) : next command got from queue\n")); + + Dispatch_Queue_Item *qitem = + dynamic_cast (mb); + + if (qitem == 0) + { + ACE_Message_Block::release (mb); + continue; + } + + Dispatch_Command* command = qitem->command (); + + ACE_ASSERT(command != 0); + int result = command->execute (); + + if (command->can_be_deleted ()) + command->destroy (); + + ACE_Message_Block::release (mb); + + if (result == -1) + done = 1; + } + return 0; +} + +int +Dispatcher_Task::enqueue (const Dispatch_Command* cmd, + const QoSDescriptor& qos_info) +{ + void* buf = this->allocator_->malloc (sizeof (Dispatch_Queue_Item)); + + if (buf == 0) + return -1; + + ACE_Message_Block *mb = + new (buf) Dispatch_Queue_Item (cmd, + qos_info, + &(this->data_block_), + ACE_Message_Block::DONT_DELETE, + this->allocator_); + + this->putq (mb); + + return 0; +} + +int Dispatcher_Task::get_native_prio () +{ + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + int prio; + + if (ACE_Thread::getprio (thr_handle, prio) == -1) + { + if (errno == ENOTSUP) + { + ACE_DEBUG((LM_DEBUG, + ACE_TEXT ("getprior not supported on this platform\n") + )); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("getprio failed")), + -1); + } + + return prio; +} + +void Dispatch_Queue_Item::init_i (const QoSDescriptor& qos_info) +{ + this->msg_priority (qos_info.preemption_priority_); + this->msg_execution_time (qos_info.execution_time_); + this->msg_deadline_time (qos_info.deadline_); +} + +} + diff --git a/ACE/Kokyu/Dispatcher_Task.h b/ACE/Kokyu/Dispatcher_Task.h new file mode 100644 index 00000000000..0bb91313f47 --- /dev/null +++ b/ACE/Kokyu/Dispatcher_Task.h @@ -0,0 +1,106 @@ +/* -*- C++ -*- */ +/** + * @file Dispatcher_Task.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + * Based on previous work by Tim Harrison (harrison@cs.wustl.edu), + * Chris Gill, Carlos O'Ryan and other members of the DOC group. + */ + +#ifndef DISPATCHER_TASK_H +#define DISPATCHER_TASK_H +#include /**/ "ace/pre.h" +#include "ace/Task.h" +#include "ace/Lock_Adapter_T.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Kokyu_defs.h" + +namespace Kokyu +{ + +class Dispatch_Queue_Item : public ACE_Message_Block +{ +public: + Dispatch_Queue_Item ( + const Dispatch_Command* cmd, + const QoSDescriptor& qos_info, + ACE_Allocator* mb_allocator =0); + + Dispatch_Queue_Item ( + const Dispatch_Command* cmd, + const QoSDescriptor& qos_info, + ACE_Data_Block* data_block, + int flags, + ACE_Allocator* mb_allocator =0); + + Dispatch_Command* command (); + +private: + void init_i(const QoSDescriptor&); + +private: + const Dispatch_Command* command_; + QoSDescriptor qos_info_; +}; + +/** + * @class Dispatcher_Task + * + * @brief Implement the dispatching queues for FIFO and Priority + * dispatching. + * + */ +class Dispatcher_Task : public ACE_Task +{ +public: + /// Constructor + Dispatcher_Task (const ConfigInfo& config_info, + ACE_Thread_Manager* thr_manager = 0); + + + ~Dispatcher_Task (); + int initialize(); + + int enqueue (const Dispatch_Command* cmd, + const QoSDescriptor& qos_info); + + /// Process the events in the queue. + int svc (void); + + const ConfigInfo& get_curr_config_info() const; + Priority_t preemption_priority() const; + +private: + static int get_native_prio(); + +private: + ConfigInfo curr_config_info_; + + ACE_Allocator *allocator_; + int own_allocator_; + + /// Helper data structure to minimize memory allocations... + ACE_Locked_Data_Block > data_block_; + + /// The queue + ACE_Message_Queue* the_queue_; + + ACE_Deadline_Message_Strategy deadline_msg_strategy_; + ACE_Laxity_Message_Strategy laxity_msg_strategy_; +}; + +} //end of namespace + +#if defined (__ACE_INLINE__) +#include "Dispatcher_Task.i" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif /* DISPATCHER_TASK_H */ diff --git a/ACE/Kokyu/Dispatcher_Task.i b/ACE/Kokyu/Dispatcher_Task.i new file mode 100644 index 00000000000..e663509d86b --- /dev/null +++ b/ACE/Kokyu/Dispatcher_Task.i @@ -0,0 +1,81 @@ +// $Id$ + +namespace Kokyu +{ +ACE_INLINE +Dispatcher_Task::Dispatcher_Task (const ConfigInfo& config_info, + ACE_Thread_Manager* thr_manager) + : ACE_Task (thr_manager), + curr_config_info_ (config_info), + allocator_ (config_info.allocator_), + own_allocator_ (0), + deadline_msg_strategy_ (config_info.reordering_flags_.static_bit_field_mask_, + config_info.reordering_flags_.static_bit_field_shift_, + config_info.reordering_flags_.dynamic_priority_max_, + config_info.reordering_flags_.dynamic_priority_offset_), + laxity_msg_strategy_ (config_info.reordering_flags_.static_bit_field_mask_, + config_info.reordering_flags_.static_bit_field_shift_, + config_info.reordering_flags_.dynamic_priority_max_, + config_info.reordering_flags_.dynamic_priority_offset_) +{ + this->initialize(); +} + +ACE_INLINE +Dispatcher_Task::~Dispatcher_Task () +{ + if (own_allocator_) + { + delete allocator_; + } +} + +ACE_INLINE +Priority_t +Dispatcher_Task::preemption_priority() const +{ + return curr_config_info_.preemption_priority_; +} + + +ACE_INLINE +const ConfigInfo& +Dispatcher_Task::get_curr_config_info() const +{ + return curr_config_info_; +} + +ACE_INLINE +Dispatch_Queue_Item::Dispatch_Queue_Item ( + const Dispatch_Command* cmd, + const QoSDescriptor& qos_info, + ACE_Data_Block *data_block, + int flags, + ACE_Allocator* mb_allocator) + : ACE_Message_Block (data_block, + flags, + mb_allocator), + command_ (cmd), qos_info_ (qos_info) + +{ + this->init_i (qos_info); +} + +ACE_INLINE +Dispatch_Queue_Item::Dispatch_Queue_Item ( + const Dispatch_Command* cmd, + const QoSDescriptor& qos_info, + ACE_Allocator* mb_allocator) + : ACE_Message_Block (mb_allocator), + command_ (cmd), qos_info_ (qos_info) +{ + this->init_i (qos_info); +} + +ACE_INLINE +Dispatch_Command* +Dispatch_Queue_Item::command() +{ + return const_cast (command_); +} +} diff --git a/ACE/Kokyu/Kokyu.cpp b/ACE/Kokyu/Kokyu.cpp new file mode 100644 index 00000000000..e447f731493 --- /dev/null +++ b/ACE/Kokyu/Kokyu.cpp @@ -0,0 +1,55 @@ +// $Id$ + +#include "Kokyu.h" + +#include "Default_Dispatcher_Impl.h" + +#if ! defined (__ACE_INLINE__) +#include "Kokyu.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, Kokyu, "$Id$") + +namespace Kokyu +{ + +int Dispatcher::dispatch (const Dispatch_Command* cmd, const QoSDescriptor& qos) +{ + return dispatcher_impl_->dispatch (cmd, qos); +} + +int Dispatcher::shutdown () +{ + return dispatcher_impl_->shutdown (); +} + +int Dispatcher::activate () +{ + return dispatcher_impl_->activate (); +} + +void Dispatcher::implementation (Dispatcher_Impl* impl) +{ + auto_ptr tmp_impl (impl); + dispatcher_impl_ = tmp_impl; + + //I couldn't use reset because MSVC++ auto_ptr does not have reset method. + //So in configurations where the auto_ptr maps to the std::auto_ptr instead + //of ACE auto_ptr, this would be a problem. + //dispatcher_impl_.reset (impl); +} + +Dispatcher* +Dispatcher_Factory:: +create_dispatcher(const Dispatcher_Attributes& attrs) +{ + Dispatcher* disp; + Dispatcher_Impl* tmp; + ACE_NEW_RETURN (tmp, Default_Dispatcher_Impl, 0); + ACE_NEW_RETURN (disp, Dispatcher, 0); + disp->implementation (tmp); + tmp->init (attrs); + return disp; +} + +} diff --git a/ACE/Kokyu/Kokyu.dsui b/ACE/Kokyu/Kokyu.dsui new file mode 100644 index 00000000000..ab1e86740a1 --- /dev/null +++ b/ACE/Kokyu/Kokyu.dsui @@ -0,0 +1,37 @@ +# DSRT Dispatcher Impl (DSRT_Dispatcher_Impl.cpp) +DSTRM_EVENT DSRT_DISPATCH_IMPL 10 INIT2 0 "Initialize Dispatcher object" print_string + +# Dispatcher Task (Dispatcher_Task.cpp) +DSTRM_EVENT DISP_TASK 9 EVENT_DEFERRED 3 "Time event is deferred" print_string +DSTRM_EVENT DISP_TASK 9 EVENT_FINISHED_DISPATCHING 2 "End time of dispatching event" print_string +DSTRM_EVENT DISP_TASK 9 EVENT_START_DISPATCHING 1 "Start time of actually dispatching event" print_string +DSTRM_EVENT DISP_TASK 9 EVENT_DEQUEUED 0 "Time of event duqueue" print_string + +# Dispatch Deferrer (Dispatch_Deferrer.cpp) +DSTRM_EVENT DISP_DEFERRER 8 EVENT_DEFERRED_DEQUEUE 1 "Enqueue event in dispatch queue" print_string +DSTRM_EVENT DISP_DEFERRER 8 EVENT_DEFERRED_ENQUEUE 0 "Buffer event until timer expires" print_string + +# DSRT DIRECT Dispatcher (DSRT_DIRECT_Dispatcher_Impl_T.cpp) +DSTRM_EVENT DSRT_DIRECT_DISPATCH 7 SCHEDULE_EXIT 1 "Exit Schedule_i" print_string +DSTRM_EVENT DSRT_DIRECT_DISPATCH 7 SCHEDULE_ENTER 0 "Enter Schedule_i" print_string + +# DSRT CV Dispatcher (DSRT_CV_Dispatcher_Impl_T.cpp) +DSTRM_EVENT DSRT_CV_DISPATCH 6 INIT 13 "Enter DSRT_Dispatcher_Impl init" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 SHUTDOWN 12 "Enter shutdown_i function" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 CANCEL_SCHEDULE_END 11 "Exit cancel_schedule_i function" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 CANCEL_SCHEDULE_START 10 "Enter cancel_schedule_i function" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 UPDATE_SCHEDULE_END 9 "Exit update_schedule_i function" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 UPDATE_SCHEDULE_START 8 "Enter update_schedule_i function" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 RELEASE_GUARD_END 7 "Exit release_guard_i function" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 RELEASE_GUARD_START 6 "Enter release_guard_i function" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 INIT_I 5 "Initialization" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 NONRG_EVENT_RELEASED 4 "Event release time on the server side" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 RG_DELAYED_EVENT_RELEASED 3 "Event delayed by release guard release time on the server side" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 RG_EVENT_RELEASED 2 "Release guard Event release time on the server side" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 SCHEDULE_EXIT 1 "Exit Schedule_i" print_string +DSTRM_EVENT DSRT_CV_DISPATCH 6 SCHEDULE_ENTER 0 "Enter Schedule_i" print_string + +# DSRT_Dispatcher (Kokyu_dsrt.cpp) +DSTRM_EVENT DSRT_DISPATCH 5 CREATE_DSRT_DISPATCH_SCHED_END 2 "Done creating appropriate Scheduler" print_string +DSTRM_EVENT DSRT_DISPATCH 5 CREATE_DSRT_DISPATCH_SCHED_START 1 "Create appropriate Scheduler" print_string +DSTRM_EVENT DSRT_DISPATCH 5 SCHEDULE 0 "Schedule task" print_string diff --git a/ACE/Kokyu/Kokyu.h b/ACE/Kokyu/Kokyu.h new file mode 100644 index 00000000000..9414b9bfbe0 --- /dev/null +++ b/ACE/Kokyu/Kokyu.h @@ -0,0 +1,115 @@ +/* -*- C++ -*- */ +/** + * @file Kokyu.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + * Based on previous work by Tim Harrison Chris Gill, + * Carlos O'Ryan and other members of the DOC group. + */ + +#ifndef KOKYU_H +#define KOKYU_H +#include /**/ "ace/pre.h" +#include "ace/Copy_Disabled.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "kokyu_export.h" +#include "Kokyu_defs.h" + +//Currently I am not seeing a way to avoid including these here. The +//whole purpose of the pImpl idiom is to avoid this dependency. But +//using the auto_ptr<> to store the implementation causes a compile +//error (in the destructor) that the implementation definition is not +//found. Note that the auto-ptr::~auto_ptr() calls delete on the +//internal pointer and at this point the class definition needs to be +//visible. Need to revisit this and see whether there is a work +//around. +#include "Dispatcher_Impl.h" + +//################################################################# +//Beware that this interface will be subject to change in the future +//since this is the very initial release. We will be working on +//trying to merge the DSRT and EC mechanisms in the future. If you +//are a user of this interface, kindly let us know so that we can +//coordinate with you when we refactor this interface. +//################################################################## + +namespace Kokyu +{ + //class Dispatcher_Impl; + + /** + * @class Dispatcher + * + * @brief Interface class for dynamic scheduling of events + * + * The responsibility of this class is to forward all methods to + * its delegation/implementation class, e.g., + * @c Default_Dispatcher_Impl. This class follows the pImpl idiom + * or the bridge pattern to separate the implementation from the interface. + * Dispatcher is the class that users will be using to achieve + * dynamic dispatching of events in an event channel. + */ + class Kokyu_Export Dispatcher : private ACE_Copy_Disabled + { + public: + /// Dispatch a command object based on the qos info supplied. + int dispatch (const Dispatch_Command*, const QoSDescriptor&); + + /// Shut down the dispatcher. The dispatcher will stop processing requests. + int shutdown (); + + /// Supply this interface with an appripriate implementation. + void implementation (Dispatcher_Impl*); + + int activate (); + + /// Non virtual destructor. Read as this class not available + /// for inheritance. + ~Dispatcher (); + private: + /// Auto ptr to the implementation. Implementation will be created on the + /// heap and deleted automatically when the dispatcher object is destructed. + auto_ptr dispatcher_impl_; + }; + + typedef auto_ptr Dispatcher_Auto_Ptr; + + /** + * @class Dispatcher_Factory + * + * @brief Factory class to create one of the dispatcher interface + * objects - for events. + * + * Factory class creates a dispatcher for EC and configures the + * interface object with the appropriate implementation. + */ + class Kokyu_Export Dispatcher_Factory : private ACE_Copy_Disabled + { + public: + /** + * Create a dispatcher for dynamic dispatching of commands + * (eg. events). The caller is responsible for freeing the + * returned dynamically allocated memory. + * + * @param config Configuration information for the dispatcher. + * + * @return pointer to the dispatcher. + */ + static Dispatcher* + create_dispatcher (const Dispatcher_Attributes& attr); + }; +} //end of namespace + +#if defined (__ACE_INLINE__) +#include "Kokyu.i" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif /* KOKYU_H */ diff --git a/ACE/Kokyu/Kokyu.i b/ACE/Kokyu/Kokyu.i new file mode 100644 index 00000000000..0de9481c16d --- /dev/null +++ b/ACE/Kokyu/Kokyu.i @@ -0,0 +1,10 @@ +// $Id$ + +namespace Kokyu +{ +ACE_INLINE +Dispatcher::~Dispatcher() +{ +} + +} diff --git a/ACE/Kokyu/Kokyu.mpc b/ACE/Kokyu/Kokyu.mpc new file mode 100644 index 00000000000..aeb6e69941e --- /dev/null +++ b/ACE/Kokyu/Kokyu.mpc @@ -0,0 +1,27 @@ +// -*- MPC -*- now wouldn't this be cool... +// $Id$ + +project(Kokyu) : acelib, core { + sharedname = Kokyu + dynamicflags = KOKYU_BUILD_DLL + + Source_Files { + Dispatcher_Impl.cpp + Kokyu.cpp + Default_Dispatcher_Impl.cpp + Dispatcher_Task.cpp + Kokyu_defs.cpp + } + + Template_Files { + Kokyu_dsrt.cpp + DSRT_Direct_Dispatcher_Impl_T.cpp + DSRT_Dispatcher_Impl_T.cpp + DSRT_Dispatch_Item_T.cpp + DSRT_Sched_Queue_T.cpp + } + + Pkgconfig_Files { + Kokyu.pc.in + } +} diff --git a/ACE/Kokyu/Kokyu.mwc b/ACE/Kokyu/Kokyu.mwc new file mode 100644 index 00000000000..dd297256b34 --- /dev/null +++ b/ACE/Kokyu/Kokyu.mwc @@ -0,0 +1,5 @@ +// -*- MPC -*- +// $Id$ + +workspace { +} diff --git a/ACE/Kokyu/Kokyu.pc.in b/ACE/Kokyu/Kokyu.pc.in new file mode 100644 index 00000000000..f0130540d25 --- /dev/null +++ b/ACE/Kokyu/Kokyu.pc.in @@ -0,0 +1,11 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: Kokyu +Description: Kokyu Scheduling Framework +Requires: ACE +Version: @VERSION@ +Libs: -L${libdir} -lKokyu +Cflags: -I${includedir} diff --git a/ACE/Kokyu/Kokyu_defs.cpp b/ACE/Kokyu/Kokyu_defs.cpp new file mode 100644 index 00000000000..33ebae25f31 --- /dev/null +++ b/ACE/Kokyu/Kokyu_defs.cpp @@ -0,0 +1,66 @@ +// $Id$ + +#include "Kokyu_defs.h" + +#if ! defined (__ACE_INLINE__) +#include "Kokyu_defs.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, Kokyu_defs, "$Id$") + +namespace Kokyu +{ + Dispatch_Command::~Dispatch_Command (void) + { + } + + DSRT_ConfigInfo::DSRT_ConfigInfo () + :sched_policy_ (ACE_SCHED_RR), + sched_scope_ (ACE_SCOPE_THREAD) + { + } + +Dispatcher_Attributes::Dispatcher_Attributes() + :immediate_activation_ (0), + sched_policy_ (ACE_SCHED_FIFO), + sched_scope_ (ACE_SCOPE_THREAD), + base_thread_creation_flags_ (THR_NEW_LWP | THR_BOUND | THR_JOINABLE) +{ +} + +int Dispatcher_Attributes::thread_creation_flags () const +{ + int thread_creation_flags = base_thread_creation_flags_; + + switch (sched_policy_) + { + case ACE_SCHED_FIFO: + thread_creation_flags |= THR_SCHED_FIFO; + break; + + case ACE_SCHED_OTHER: + thread_creation_flags |= THR_SCHED_DEFAULT; + break; + + case ACE_SCHED_RR: + thread_creation_flags |= THR_SCHED_RR; + break; + } + + switch (sched_scope_) + { + case ACE_SCOPE_PROCESS: + case ACE_SCOPE_LWP: + thread_creation_flags |= THR_SCOPE_PROCESS; + break; + + case ACE_SCOPE_THREAD: + default: + thread_creation_flags |= THR_SCOPE_SYSTEM; + break; + } + return thread_creation_flags; +} + +} + diff --git a/ACE/Kokyu/Kokyu_defs.h b/ACE/Kokyu/Kokyu_defs.h new file mode 100644 index 00000000000..c2b83af847a --- /dev/null +++ b/ACE/Kokyu/Kokyu_defs.h @@ -0,0 +1,191 @@ +/* -*- C++ -*- */ +/** + * @file Kokyu_defs.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + */ + +#ifndef KOKYU_DEFS_H +#define KOKYU_DEFS_H +#include /**/ "ace/pre.h" +#include "ace/Array.h" +#include "ace/Time_Value.h" +#include "ace/Auto_Ptr.h" +#include "ace/Message_Block.h" +#include "ace/Sched_Params.h" +#include "ace/Malloc_Allocator.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "kokyu_export.h" + +namespace Kokyu +{ + typedef long Priority_t; + typedef ACE_Time_Value Deadline_t; //absolute deadline + typedef ACE_Time_Value Execution_Time_t; //execution time + //typedef int Guid_t; + + enum Dispatching_Type_t + // Defines the type of prioritization strategy + // to be used by a dispatching queue + { + FIFO_DISPATCHING, + DEADLINE_DISPATCHING, + LAXITY_DISPATCHING + }; + + enum Criticality_t + // Defines the criticality of the operation. + // For use with Dynamic Scheduler. + { + VERY_LOW_CRITICALITY, + LOW_CRITICALITY, + MEDIUM_CRITICALITY, + HIGH_CRITICALITY, + VERY_HIGH_CRITICALITY + }; + + enum Importance_t + // Defines the importance of the operation, + // which can be used by the RtecScheduler as a + // "tie-breaker" when other scheduling + // parameters are equal. + { + VERY_LOW_IMPORTANCE, + LOW_IMPORTANCE, + MEDIUM_IMPORTANCE, + HIGH_IMPORTANCE, + VERY_HIGH_IMPORTANCE + }; + + struct Kokyu_Export Reordering_Queue_Attributes + { + Reordering_Queue_Attributes (); + unsigned long static_bit_field_mask_; + unsigned long static_bit_field_shift_; + unsigned long dynamic_priority_max_; + unsigned long dynamic_priority_offset_; + }; + + struct Kokyu_Export ConfigInfo + { + Priority_t preemption_priority_; + + // OS priority of the dispatching thread associated with the queue + Priority_t thread_priority_; + + // type of dispatching queue + Dispatching_Type_t dispatching_type_; + + //allocator to be used for dynamic memory allocation. If each + //thread gets its own memory pool, contention will be less + ACE_Allocator *allocator_; + + Reordering_Queue_Attributes reordering_flags_; + + ConfigInfo (); + }; + + typedef ACE_Array ConfigInfoSet; + + class Kokyu_Export Dispatcher_Attributes + { + public: + ConfigInfoSet config_info_set_; + int immediate_activation_; + + public: + Dispatcher_Attributes (); + void sched_policy (int); + void sched_scope (int); + int thread_creation_flags () const; + + private: + int sched_policy_; + int sched_scope_; + int base_thread_creation_flags_; + }; + + + struct Kokyu_Export QoSDescriptor + { + Priority_t preemption_priority_; + Deadline_t deadline_; + Execution_Time_t execution_time_; + Importance_t importance_; + }; + + enum Block_Flag_t {BLOCK, UNBLOCK}; + + class Kokyu_Export Dispatch_Command + { + public: + Dispatch_Command(int dont_delete = 0, + ACE_Allocator *allocator = 0); + //dont_delete indicates whether this object needs to be deleted once processed. + //allocator indicates the ACE_Allocator, if any, from which this object was created. + //This same allocator has to be used for the deletion also + + /// Command callback + virtual int execute () = 0; + + int can_be_deleted () const; + + void destroy (void); + protected: + /// Destructor + // only inheritance is possible and object should be on heap, + // since object could be handed over to a different thread. + virtual ~Dispatch_Command (void); + + private: + int dont_delete_; + ACE_Allocator *allocator_; + //if this object has to be deleted, then delete it using the allocator + //if one is present. + }; + + enum DSRT_Sched_Type_t + { + DSRT_FP, + DSRT_MUF, + DSRT_MIF + }; + + enum DSRT_Dispatcher_Impl_t + { + DSRT_CV_BASED, + DSRT_OS_BASED + }; + + struct Kokyu_Export DSRT_ConfigInfo + { + //not used currently + DSRT_Sched_Type_t sched_strategy_; + + ACE_Sched_Params::Policy sched_policy_; + int sched_scope_; + + //type of implementation + DSRT_Dispatcher_Impl_t impl_type_; + + DSRT_ConfigInfo (); + }; + +} //end of namespace + +//to satisfy ACE_Array +ACE_INLINE bool operator != (const Kokyu::ConfigInfo& lhs, const Kokyu::ConfigInfo& rhs); + +#if defined (__ACE_INLINE__) +#include "Kokyu_defs.i" +#endif /* __ACE_INLINE__ */ + +#include /**/ "ace/post.h" +#endif /* KOKYU_DEFS_H */ diff --git a/ACE/Kokyu/Kokyu_defs.i b/ACE/Kokyu/Kokyu_defs.i new file mode 100644 index 00000000000..0cf793c2a1a --- /dev/null +++ b/ACE/Kokyu/Kokyu_defs.i @@ -0,0 +1,86 @@ +// $Id$ + + +namespace Kokyu +{ + +ACE_INLINE +void Dispatcher_Attributes::sched_policy(int policy) +{ + sched_policy_ = policy; +} + +ACE_INLINE +void Dispatcher_Attributes::sched_scope(int scope) +{ + sched_scope_ = scope; +} + +ACE_INLINE +Reordering_Queue_Attributes::Reordering_Queue_Attributes () + :static_bit_field_mask_ (0), // not used + static_bit_field_shift_ (0), // not used + dynamic_priority_max_ (0x7FFFFFFFUL), // 2^31-1 + dynamic_priority_offset_ (0x08000000UL) // 15/16th of dynamic prio range +{ + //bits for static priority = 0 + //max dynamic prio = 2^31 - 1 + //pending offset = 15/16th of the dynamic prio range + //which means that the LATE population will be in the + //1/16th part of the range. + + //For the Laxity and Deadline strategies these are the + //defaults defined in Message_Block.h + //static_bit_field_mask (0x3FFUL), // 2^(10) - 1 + //static_bit_field_shift (10), // 10 low order bits + //dynamic_priority_max (0x3FFFFFUL), // 2^(22)-1 + //dynamic_priority_offset (0x200000UL) // 2^(22-1) +} + +ACE_INLINE +Dispatch_Command::Dispatch_Command (int dont_delete, + ACE_Allocator *allocator) + :dont_delete_ (dont_delete), + allocator_ (allocator) +{ +} + +ACE_INLINE +int Dispatch_Command::can_be_deleted (void) const +{ + return !dont_delete_; +} + +ACE_INLINE +void Dispatch_Command::destroy (void) +{ + if (allocator_) + { + allocator_->free (this); + } + else + { + delete this; + } +} + +ACE_INLINE +ConfigInfo::ConfigInfo () + :preemption_priority_ (0), + thread_priority_ (0), + dispatching_type_ (FIFO_DISPATCHING), + allocator_ (0) +{ +} + +} + +//to satisfy ACE_Array +ACE_INLINE +bool operator != (const Kokyu::ConfigInfo& lhs, + const Kokyu::ConfigInfo& rhs) +{ + return (lhs.preemption_priority_ != rhs.preemption_priority_ || + lhs.thread_priority_ != rhs.thread_priority_ || + lhs.dispatching_type_ != rhs.dispatching_type_ ); +} diff --git a/ACE/Kokyu/Kokyu_dsrt.cpp b/ACE/Kokyu/Kokyu_dsrt.cpp new file mode 100644 index 00000000000..9cf8238994a --- /dev/null +++ b/ACE/Kokyu/Kokyu_dsrt.cpp @@ -0,0 +1,179 @@ +// $Id$ + +#include "Kokyu_dsrt.h" + +#include "ace/Dynamic_Service.h" +#include "DSRT_Direct_Dispatcher_Impl_T.h" +#include "DSRT_CV_Dispatcher_Impl_T.h" + +#if ! defined (__ACE_INLINE__) +#include "Kokyu_dsrt.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Kokyu, Kokyu, "$Id$") + +namespace Kokyu +{ + +template +void +DSRT_Dispatcher::implementation (DSRT_Dispatcher_Impl* impl) +{ + auto_ptr > tmp_impl (impl); + dispatcher_impl_ = tmp_impl; +} + +template +int +DSRT_Dispatcher::schedule (Guid_t guid, const DSRT_QoSDescriptor& qos) +{ + return dispatcher_impl_->schedule (guid, qos); +} + +template +int +DSRT_Dispatcher::update_schedule (Guid_t guid, const DSRT_QoSDescriptor& qos) +{ + return dispatcher_impl_->update_schedule (guid, qos); +} + +template +int +DSRT_Dispatcher::update_schedule (Guid_t guid, Kokyu::Block_Flag_t flag) +{ + return dispatcher_impl_->update_schedule (guid, flag); +} + +template +int +DSRT_Dispatcher::cancel_schedule (Guid_t guid) +{ + return dispatcher_impl_->cancel_schedule (guid); +} + +template +int DSRT_Dispatcher::shutdown () +{ + return dispatcher_impl_->shutdown (); +} + +template +DSRT_Dispatcher* +DSRT_Dispatcher_Factory:: +create_DSRT_dispatcher (const DSRT_ConfigInfo& config_info) +{ + ACE_UNUSED_ARG ((config_info)); + + DSRT_Dispatcher_Impl* tmp; + DSRT_Dispatcher* disp; + + switch (config_info.impl_type_) + { + case DSRT_OS_BASED: + ACE_NEW_RETURN (tmp, + DSRT_Direct_Dispatcher_Impl ( + config_info.sched_policy_, + config_info.sched_scope_), + 0); + break; + + case DSRT_CV_BASED: + default: + ACE_NEW_RETURN (tmp, + DSRT_CV_Dispatcher_Impl( + config_info.sched_policy_, + config_info.sched_scope_), + 0); + break; + } + + ACE_ASSERT (tmp != 0); + ACE_NEW_RETURN (disp, DSRT_Dispatcher, 0); + disp->implementation (tmp); + tmp->init (config_info); + return disp; +} + +template +int MUF_Comparator:: +operator ()(const QoSDescriptor_t& qos1, + const QoSDescriptor_t& qos2) +{ + if (qos1.criticality_ > qos2.criticality_) + { + return 1; + } + else if (qos2.criticality_ > qos1.criticality_) + { + return -1; + } + + typename QoSDescriptor_t::Now now_functor; + Time_t now = now_functor (); + + Time_t exec_time1 = qos1.exec_time_; + Time_t deadline1 = qos1.deadline_; + Time_t laxity1 = deadline1 - now - exec_time1; + Time_t exec_time2 = qos2.exec_time_; + Time_t deadline2 = qos2.deadline_; + Time_t laxity2 = deadline2 - now - exec_time2; + + if (laxity1 < laxity2) + { + return 1; + } + else if (laxity1 == laxity2) + { + return 0; + } + else + { + return -1; + } +} + +template +int MIF_Comparator:: +operator ()(const QoSDescriptor& qos1, + const QoSDescriptor& qos2) +{ +#ifdef KOKYU_DSRT_LOGGING + ACE_DEBUG ((LM_DEBUG, + "(%t|%T):qos1.importance = %d, qos2.importance = %d\n", + qos1.importance_, qos2.importance_)); +#endif + + if (qos1.importance_ > qos2.importance_) + { + return 1; + } + else if (qos1.importance_ == qos2.importance_) + { + return 0; + } + else + { + return -1; + } +} + +template +int Fixed_Priority_Comparator:: +operator ()(const QoSDescriptor& qos1, + const QoSDescriptor& qos2) +{ + if (qos1.priority_ > qos2.priority_) + { + return 1; + } + else if (qos1.priority_ == qos2.priority_) + { + return 0; + } + else + { + return -1; + } +} + +} diff --git a/ACE/Kokyu/Kokyu_dsrt.h b/ACE/Kokyu/Kokyu_dsrt.h new file mode 100644 index 00000000000..97aa4766ee6 --- /dev/null +++ b/ACE/Kokyu/Kokyu_dsrt.h @@ -0,0 +1,177 @@ +/* -*- C++ -*- */ +/** + * @file Kokyu_dsrt.h + * + * $Id$ + * + * @author Venkita Subramonian (venkita@cs.wustl.edu) + * + */ + +#ifndef KOKYU_DSRT_H +#define KOKYU_DSRT_H +#include /**/ "ace/pre.h" +#include "ace/Copy_Disabled.h" + +//#if !defined (ACE_LACKS_PRAGMA_ONCE) +//# pragma once +//#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "kokyu_export.h" +#include "Kokyu_defs.h" + +namespace Kokyu +{ + + template class DSRT_Dispatcher_Impl; + + /** + * @class DSRT_Dispatcher + * + * @brief Interface class for dynamic scheduling of threads + * + * The responsibility of this class is to forward all methods to + * its delegation/implementation class, e.g., + * @c Default_DSRT_Dispatcher_Impl. This class follows the pImpl idiom + * or the bridge pattern to separate the implementation from the interface. + * DSRT_Dispatcher is the class that users will be using to achieve + * dynamic scheduling of threads. + */ + template + class DSRT_Dispatcher : private ACE_Copy_Disabled + { + public: + typedef typename DSRT_Scheduler_Traits::Guid_t Guid_t; + typedef typename DSRT_Scheduler_Traits::QoSDescriptor_t DSRT_QoSDescriptor; + + // = Scheduling methods. + + /// Schedule a thread dynamically based on the qos info supplied. + int schedule (Guid_t guid, const DSRT_QoSDescriptor&); + + /// Update the schedule for a thread. This could alter the current schedule. + int update_schedule (Guid_t guid, const DSRT_QoSDescriptor&); + + /// Inform the scheduler that the caller thread is about to + /// block. This could alter the current schedule. + int update_schedule (Guid_t guid, Kokyu::Block_Flag_t flag); + + /// Cancel the schedule for a thread. This could alter the current schedule. + int cancel_schedule (Guid_t guid); + + /// Supply this interface with an appropriate implementation. + void implementation (DSRT_Dispatcher_Impl*); + + // = Termination methods. + + /// Shut down the dispatcher. The dispatcher will stop processing requests. + int shutdown (); + + /// Non virtual destructor. Read as this class not available + /// for inheritance. + ~DSRT_Dispatcher (); + + private: + /// Auto ptr to the implementation. Implementation will be created on the + /// heap and deleted automatically when the dispatcher object is destructed. + auto_ptr > dispatcher_impl_; + }; + + + /** + * @class DSRT_Dispatcher_Factory + * + * @brief Factory class to create one of the dispatcher interface + * objects - for events or DSRT threads. + * + * Factory class creates a dispatcher or DSRT dispatcher and configures + * the interface object with the appropriate implementation. + */ + + template + class DSRT_Dispatcher_Factory : private ACE_Copy_Disabled + { + public: + typedef auto_ptr > DSRT_Dispatcher_Auto_Ptr; + + /** + * Create a dispatcher for dynamic dispatching of threads. + * This will be used to dynamic scheduling of distributable threads for + * DSRTCORBA. The caller is responsible for freeing the memory. + * + * @param config Configuration information for the DSRT dispatcher. + * + * @return pointer to the DSRT dispatcher. + */ + static DSRT_Dispatcher* create_DSRT_dispatcher (const DSRT_ConfigInfo&); + }; + + /** + * @class MIF_Sched_Strategy + * + * @brief Strategy class implementing Maximum Importance First + * reordering strategy. + * + */ + template + class MIF_Comparator + { + public: + typedef typename QoSDesc::Importance_t Importance_t; + + int operator ()(const QoSDesc& qos1, + const QoSDesc& qos2); + }; + + /** + * @class Fixed_Priority_Sched_Strategy + * + * @brief Strategy class implementing Fixed Priority reordering + * strategy. + * + */ + template + class Fixed_Priority_Comparator + { + public: + typedef typename QoSDesc::Priority_t Priority_t; + + int operator ()(const QoSDesc& qos1, + const QoSDesc& qos2); + }; + + /** + * @class MUF_Sched_Strategy + * + * @brief Strategy class implementing Maximum Urgency First + * reordering strategy. + * + */ + template + class MUF_Comparator + { + public: + typedef typename QoSDesc::Criticality_t Criticality_t; + typedef typename QoSDesc::Time_t Time_t; + + int operator ()(const QoSDesc& qos1, + const QoSDesc& qos2); + }; + + +} //end of namespace + +#if defined (__ACE_INLINE__) +#include "Kokyu_dsrt.i" +#endif /* __ACE_INLINE__ */ + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "Kokyu_dsrt.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Kokyu_dsrt.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#include /**/ "ace/post.h" +#endif /* KOKYU_DSRT_H */ diff --git a/ACE/Kokyu/Kokyu_dsrt.i b/ACE/Kokyu/Kokyu_dsrt.i new file mode 100644 index 00000000000..1f9caf3c234 --- /dev/null +++ b/ACE/Kokyu/Kokyu_dsrt.i @@ -0,0 +1,11 @@ +// $Id$ + +namespace Kokyu +{ +template +ACE_INLINE +DSRT_Dispatcher::~DSRT_Dispatcher() +{ +} +} + diff --git a/ACE/Kokyu/Makefile.am b/ACE/Kokyu/Makefile.am new file mode 100644 index 00000000000..0df2e5495bb --- /dev/null +++ b/ACE/Kokyu/Makefile.am @@ -0,0 +1,78 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## + +includedir = @includedir@/Kokyu +pkgconfigdir = @libdir@/pkgconfig + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +SUBDIRS = \ + . \ + tests + +## Makefile.Kokyu.am + +lib_LTLIBRARIES = libKokyu.la + +libKokyu_la_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -DKOKYU_BUILD_DLL + +libKokyu_la_SOURCES = \ + Default_Dispatcher_Impl.cpp \ + Dispatcher_Impl.cpp \ + Dispatcher_Task.cpp \ + Kokyu.cpp \ + Kokyu_defs.cpp + +libKokyu_la_LDFLAGS = \ + -version-number @ACE_MAJOR@:@ACE_MINOR@:@ACE_BETA@ + +libKokyu_la_LIBADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +nobase_include_HEADERS = \ + DSRT_Direct_Dispatcher_Impl_T.cpp \ + DSRT_Direct_Dispatcher_Impl_T.h \ + DSRT_Dispatch_Item_T.cpp \ + DSRT_Dispatch_Item_T.h \ + DSRT_Dispatch_Item_T.i \ + DSRT_Dispatcher_Impl_T.cpp \ + DSRT_Dispatcher_Impl_T.h \ + DSRT_Dispatcher_Impl_T.i \ + DSRT_Sched_Queue_T.cpp \ + DSRT_Sched_Queue_T.h \ + Default_Dispatcher_Impl.h \ + Default_Dispatcher_Impl.i \ + Dispatcher_Impl.h \ + Dispatcher_Impl.i \ + Dispatcher_Task.h \ + Dispatcher_Task.i \ + Kokyu.h \ + Kokyu.i \ + Kokyu_defs.h \ + Kokyu_defs.i \ + Kokyu_dsrt.cpp \ + Kokyu_dsrt.h \ + Kokyu_dsrt.i + +pkgconfig_DATA = Kokyu.pc + +Kokyu.pc: ${top_builddir}/config.status ${srcdir}/Kokyu.pc.in + ${top_builddir}/config.status --file $@:${srcdir}/Kokyu.pc.in + +EXTRA_DIST = \ + Kokyu.pc.in + + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/Kokyu/README b/ACE/Kokyu/README new file mode 100644 index 00000000000..9db0e95225b --- /dev/null +++ b/ACE/Kokyu/README @@ -0,0 +1,22 @@ +# $Id: + +Kokyu is a portable middleware scheduling framework designed to +provide flexible scheduling and dispatching services within the +context of higher-level middleware. Kokyu currently provides real-time +scheduling and dispatching services for TAO's real-time Event Service +which mediates supplier-consumer relationships between application +operations. Kokyu also provides a scheduling and dispatching framework +for threads. This is being used by the TAO RTCORBA 1.2 scheduler +implementations. For more information, see + +ACE_wrappers/Kokyu/docs/Kokyu.html + +To build Kokyu on Unix systems, from under $ACE_ROOT/Kokyu do + +make + +To build Kokyu on Windows, open the workspace +$ACE_ROOT/Kokyu/Kokyu.dsw and build it. + +An mpc file has also been provided - $ACE_ROOT/Kokyu/Kokyu.mpc from +which IDE specific build files can be generated. diff --git a/ACE/Kokyu/docs/Kokyu.html b/ACE/Kokyu/docs/Kokyu.html new file mode 100644 index 00000000000..55c8016cd1c --- /dev/null +++ b/ACE/Kokyu/docs/Kokyu.html @@ -0,0 +1,416 @@ + + + + + + + + Kokyu + + + +
+

+Kokyu - A middleware framework for flexible scheduling +and dispatching

+Introduction +
Strategized Scheduling framework +
Flexible Dispatching Framework +
Use of Kokyu within the TAO Real-time Event Channel(RTEC) +
Configuration of RTEC to use Kokyu dispatching +
Use of Kokyu within the Dynamic Scheduling +Real-time CORBA (DSRTCORBA) schedulers +
How to write a new DSRT scheduler using Kokyu +
Kokyu DSRTCORBA vs Kokyu RTEC +
Current status +
Future work +
Papers on Kokyu +
  +

+Introduction

+Kokyu is a portable middleware scheduling framework designed to provide +flexible scheduling and dispatching services within the context of higher-level +middleware. Kokyu currently provides real-time scheduling and dispatching +services for TAO’s real-time CORBA Event Service, which mediates supplier-consumer +relationships between application operations. Kokyu consists primarily +of two cooperating infrastructure segments, illustrated in Figure 1: +
+

+
Figure 1: Kokyu Scheduling and Dispatching Infrastructure

+ +
    +
  1. +A pluggable scheduling infrastructure with efficient support for adaptive +execution of diverse static, dynamic, and hybrid static/dynamic scheduling +heuristics.
  2. + +
  3. +A flexible dispatching infrastructure that allows composition of primitive +operating system and middleware mechanisms to enforce arbitrary scheduling +heuristics.
  4. +
+The scheduler is responsible for specifying how operation dispatch requests +are ordered, by assigning priority levels and rates to tasks, and producing +a configuration specification for the dispatching mechanism. The dispatcher +is responsible for enforcing the ordering of operation dispatches using +different threads, requests queues, and timers configured according to +the scheduler’s specification. The combined framework provides an implicit +projection of scheduling heuristics into appropriate dispatching infrastructure +configurations, so that the scheduling and dispatching infrastructure segments +can be optimized both separately and in combination. +

+Strategized Scheduling framework

+The Kokyu scheduling framework is designed to support a variety of scheduling +heuristics including RMS, EDF, MLF, and MUF. In addition, this framework +provides a common environment to compare systematically both existing and +new scheduling strategies. This flexibility is achieved in the Kokyu framework +via the Strategy pattern, which allows parts of the sequence of steps in +an algorithm to be replaced, thus providing interchangeable variations +within a consistent algorithmic framework. The Kokyu scheduling framework +uses the Strategy pattern to encapsulate a family of scheduling algorithms +within a fixed CORBA IDL interface, thereby enabling different strategies +to be configured independently from applications that use them. +

+Flexible Dispatching Framework

+The right side of Figure 1 shows the essential features of Kokyu’s flexible +task dispatching infrastructure. Key features of the dispatching infrastructure +that are essential to performing our optimizations are as follows: +

Dispatching queues: Each task is assigned by our strategized +Kokyu scheduling framework  to a specific dispatching queue, each +of which has an associated queue number, a queueing discipline, and a unique +operating-system-specific priority for its single associated dispatching +thread. +

Dispatching threads: Operating-system thread priorities decrease +as the queue number increases, so that the 0th queue is served by the highest +priority thread. Each dispatching thread removes the task from the head +of its queue and runs its entry point function to completion before retrieving +the next task to dispatch. Adapters can be applied to operations to intercept +and possibly short-circuit the entry-point upcall. In general, however, +the outermost operation entry point must complete on each dispatch. +

Queueing disciplines: Dispatching thread priorities determine +which queue is active at any given time: the highest priority queue with +a task to dispatch is always active, preempting tasks in lower priority +queues. In addition, each queue may have a distinct discipline for determining +which of its enqueued tasks has the highest eligibility, and must ensure +the highest is at the head of the queue at the point when one is to be +dequeued. We consider three disciplines: +

    +
  • +Static – Tasks are ordered by a static subpriority value – results in FIFO +ordering if all static subpriorities are made the same; static queues at +different priority levels can be used to implement an RMS scheduling strategy.
  • + +
  • +Deadline – Tasks are ordered by time to deadline; a single deadline queue +can be used to implement the earliest deadline first (EDF) scheduling strategy.
  • + +
  • +Laxity – Tasks are ordered by slack time, or laxity – the time to deadline +minus the execution time; a single laxity queue can be used to implement +the minimum laxity first (MLF) scheduling strategy; laxity queues at different +priority levels can be used to implement the maximum urgency first (MUF) +scheduling strategy.
  • +
+Any discipline for which a maximal eligibility may be selected can be employed +to manage a given dispatching queue in this approach. Scheduling strategies +can be constructed from one or more queues of each discipline alone, or +combinations of queues with different disciplines can be used. Figure 2  +illustrates the general queueing mechanism used by the dispatching modules +in the Kokyu dispatching framework. +
+

+

Figure 2: Example Queueing Mechanism in a Kokyu Dispatching Module

+ +

In addition, this figure shows how the output information provided by +the Kokyu scheduling framework is used to configure and operate a dispatching +module. During system initialization, each dispatching module obtains the +thread priority and dispatching type for each of its queues, typically +from the scheduling service’s output interface. Next, each queue is assigned +a unique dispatching priority number, a unique thread priority, and an +enumerated dispatching type. Finally, each dispatching module has an ordered +queue of pending dispatches per dispatching priority. To preserve QoS guarantees, +operations are inserted into the appropriate dispatching queue according +to their assigned dispatching priority. Operations within a dispatching +queue are ordered by their assigned dispatching subpriority. To minimize +priority inversions, operations are dispatched from the queue with the +highest thread priority, preempting any operation executing in a lower +priority thread. To minimize preemption overhead, there is no preemption +within a given priority queue. The following three values are defined for +the dispatching type: +

    +
  • +STATIC DISPATCHING: This type specifies a queue that only considers +the static portion of an operation’s dispatching subpriority.
  • + +
  • +DEADLINE DISPATCHING: This type specifies a queue that considers +the dynamic and static portions of an operation’s dispatching subpriority, +and updates the dynamic portion according to the time remaining until the +operation’s deadline.
  • + +
  • +LAXITY DISPATCHING: This type specifies a queue that considers the +dynamic and static portions of an operation’s dispatching subpriority, +and updates the dynamic portion according to the operation’s laxity.
  • +
+ +

+Use of Kokyu within the TAO Real-time Event Channel(RTEC)

+Figure 3 shows the sequence of operations that take place in the Kokyu +based dispatching module in the TAO RTEC. The client application registers +all relevant operations with the scheduler along with their real-time requirements. +This is done through the concept of an RT_Info +(see +TAO/orbsvcs/orbsvcs/RtecScheduler.idl) structure which is a structure that +contains the execution time, criticality, period, etc of an operation.  +The client then calls compute_schedule +method on the scheduler. The scheduler creates a dependency graphs of all +operations and partitions operations into equivalence classes based on +the scheduling parameters supplied. The scheduler can be configured to +have any scheduling policy which determines the equivalence class partitioning +(queues) and possibly a partial ordering of operations within an equivalence +class (ordering within a queue). Once this is done, the scheduler has the +configuration information for the Kokyu dispatcher like the number of dispatch +queues, priorities for the threads processing each queue, etc. +

When the client calls activate +on the event channel, the EC inturn activates the Kokyu based EC dispatching +module. The EC dispatching module queries the dispatch configuration from +the scheduler and uses that to create the Kokyu dispatcher with the appropriate +number of lanes and threads. When an event is pushed into the EC, the EC +pushes the event to the appropriate consumers, who are subscribed to that +event. For each consumer, the EC queries the scheduler for the RT_Info +of that consumer. It then hands over the event to the Kokyu based dispatching +module. The dispatching module then enqueues the event into the appropriate +queue for processing by the thread watching that queue. +

+

+

Figure 3: Kokyu based dispatching module within TAO RTEC

+ +

+Configuration of RTEC to use Kokyu dispatching

+Static configuration: In the svc.conf file, make sure you +have the following configuration for Kokyu dispatching. You can combine +this with other -ECxxx options. +

static EC_Factory "-ECdispatching kokyu +SCHED_FIFO -ECscheduling kokyu -ECfiltering kokyu" +

To run the threads in the real-time FIFO class, use SCHED_FIFO. You +could use SCHED_RR and SCHED_OTHER also. +
The default is SCHED_FIFO. +

In your program, call +

TAO_EC_Kokyu_Factory::init_svcs (); +

to statically create the EC Kokyu dispatching and other Kokyu related +modules. +

Dynamic configuration: In the svc.conf file, make sure +you have the following configuration for Kokyu dispatching. You can combine +this with other -ECxxx options. +

dynamic EC_Factory Service_Object * +TAO_RTKokyuEvent:_make_TAO_EC_Kokyu_Factory() "-ECdispatching kokyu -ECscheduling +kokyu -ECfiltering kokyu" +

+Use of Kokyu within the Dynamic Scheduling +Real-time CORBA (DSRTCORBA) schedulers

+An initial implementation of mechanisms to support DSRTCORBA schedulers +have been released. DSRTCORBA uses the concept of distributed threads, +which traverse multiple end systems giving the application the illusion +of a single logical thread executing an end-to-end task. The distributed +thread carries with it the scheduling parameters like importance, deadline, +etc so that it can get scheduled by a local scheduler on each endsystem. +The Kokyu DSRT dispatching framework is used as an enforcing mechanism. +

The DSRT schedulers are available in the directory $TAO_ROOT/examples/Kokyu_dsrt_schedulers. +They use the Kokyu DSRT +
dispatching classes present in $ACE_ROOT/Kokyu. These act as wrappers/adapters +around the Kokyu DSRT dispatcher. The Kokyu DSRT dispatcher is responsible +for scheduling threads which ask the dispatcher to schedule themselves. +Currently there are two implementations for the Kokyu DSRT dispatcher. +One uses a condition-variable based approach for scheduling threads and +the other manipulates priorities of threads and relies on the OS scheduler +for dispatching the threads appropriately. +

+CV-based approach:

+In this approach, it is assumed that the threads "yield" on a regular basis +to the scheduler by calling update_scheduling_segment. Only one +thread is running at any point in time. All the other threads are blocked +on a condition variable. When the currently running thread yields, it will +cause the condition variable to be signalled. All the eligible threads +are stored in a scheduler queue (rbtree), the most eligible thread determined +by the scheduling discipline. This approach has the drawback that it requires +a cooperative threading model, where threads yield voluntarily on a regular +basis. The application threads are responsible for doing this voluntary +yielding. +

+OS-based approach:

+This approach relies on the OS scheduler to do the actual thread dispatching. +The Kokyu DSRT dispatcher manipulates the priorities of the threads. The +scheduler maintains a queue (rbtree) of threads. The scheduler also has +an executive thread, which runs at the maximum available priority. This +thread runs in a continuous loop until the dispatcher is shut down. The +executive thread is responsible for selecting the most eligible thread +from the scheduler queue and bump up its priority if necessary while bumping +down the priority of the currently running thread, if it is not the most +eligible. There are four priority levels required for this mechanism to +work, listed in descending order of priorities. For example, a thread running +at Active priority will preempt a +
thread running at Inactive priority level. +
    +
  1. +Executive priority - priority at which the scheduler executive thread runs.
  2. + +
  3. +Blocked priority - this is the priority to which threads about to block +on remote calls will be bumped up to.
  4. + +
  5. +Active priority - this is the priority to which the most eligible thread +is set to.
  6. + +
  7. +Inactive priority - this is the priority to which all threads except the +most eligible thread is set to.
  8. +
+As soon as a thread asks to be scheduled, a wrapper object is created and +inserted into the queue. This object carries the qos (sched params) associated +with that thread. A condition variable is signalled to inform the executive +thread that the queue is "dirty". The scheduler thread picks up the most +eligble one and sets its priority to active and sets the currently +running thread priority to +
inactive. +

The drawback to this approach is that it relies on the OS scheduler +to dispatch the threads. Also, with the current implementation, there is +only one thread running at active priority and others are all at inactive +level. This will create undesirable effects with multi-processor systems, +which could select any one of the inactive level threads and this +could cause priority inversions. +

+How to write a new DSRT scheduler using Kokyu

+One can use one of the schedulers as a starting point. The variation points +are +
    +
  1. +The scheduler parameters that need to be propagated along with the service +context.
  2. + +
  3. +The QoS comparison function, that determines which thread is more eligible.
  4. +
+To aid (1), we have created a Svc_Ctxt_DSRT_QoS idl interface (see ./Kokyu_qos.pidl). +This interface currently has the necessary things to be propagated for +FP, MIF and MUF schedulers. This can be altered if necessary to accomodate +new sched params. The idea here is to let the IDL compiler generate the +marshalling code (including Any operators) so that these parameters can +be shipped across in the service context in an encapsulated CDR. +

To create customized QoS comparator functions, we used the idea of C++ +traits to let the user define customized comparator functions. For example, +the MIF scheduler uses the following traits class. +

  struct MIF_Scheduler_Traits +
  { +
    typedef RTScheduling::Current::IdType Guid_t; +

    struct _QoSDescriptor_t +
    { +
      typedef long Importance_t; +
      Importance_t importance_; +
    }; +

    typedef _QoSDescriptor_t QoSDescriptor_t; +

    typedef Kokyu::MIF_Comparator<QoSDescriptor_t> +QoSComparator_t; +

    class _Guid_Hash +
    { +
    public: +
      u_long operator () (const Guid_t& +id) +
      { +
        return ACE::hash_pjw +((const char *) id.get_buffer (), +
                              +id.length ()); +
      } +
    }; +

    typedef _Guid_Hash Guid_Hash; +
  }; +

The idea of traits makes the Kokyu dispatcher more flexible in terms +of creating new schedulers. For example, the Kokyu classes do not care +about what concrete type Guid is. It could be an OctetSequence for some +applications, whereas it could be an int for some others. The exact type +is defined by the application (in this case, the MIF scheduler) using the +traits class. In the above traits class the Guid's type is defined to be +an octet sequence (indirectly). The Kokyu dispatcher expects the following +typedef's to +
be present in the traits class: +

Guid_t - Type of GUID. +
QoSDescriptor_t - aggregate for scheduler parameters +
QoSComparator_t - used by the scheduler queue to determine +most eligible item +
Guid_Hash - used by the internal hash map in the scheduler +to hash the guid. +

It is also expected that the following operator be defined for comparing +QoS parameters. This comparator function will be used by the scheduler +queue to determine the most eligible item in the queue. +

QoSComparator_t::operator ()(const QoSDescriptor_t& qos1, +
            +const QoSDescriptor_t& qos2) +

+Kokyu DSRTCORBA vs Kokyu RTEC

+Currently we have separate interfaces for DSRTCORBA and RTEC dispatching +mechanisms. Once we get more use cases and experience, there is a possibility +of these getting merged in the future. The RTEC related dispatching interface +is in Kokyu::Dispatcher (Kokyu.h) and DSRTCORBA related dispatching +interface is in Kokyu::DSRT_Dispatcher (Kokyu_dsrt.h) +

+Current status

+Kokyu dispatching framework is available as a separate module under ACE_wrappers/Kokyu +as part of the ACE/TAO +distribution. Note that this module is not dependent on TAO, though +it is built on top of ACE. The TAO Event Channel uses the Strategy and +Service Configurator patterns to use configurable dispatching modules. +A Kokyu based EC dispatching module is available in the TAO/orbsvcs/orbsvcs/RTKokyuEvent +module. This module acts as an adapter between the Kokyu dispatcher and +the RTEC. +

Kokyu scheduling framework is available under the TAO source tree (TAO/orbsvcs/orbsvcs/Sched). +

An example using the RTEC Kokyu dispatching module is available under +TAO/orbsvcs/examples/RtEC/Kokyu. +

+Future work

+ +
    +
  1. +Currently there is no support for timers in the Kokyu dispatching module. +We plan to do this in the near future.
  2. + +
  3. +It looks like there is a general structure to the different schedulers. +May be this can be abstracted using templates or some similar mechanism.
  4. + +
  5. +Thread sched policy and sched scope are currently being passed explicitly +from the application to the scheduler. This can be changed later to get +this information from the ORB. This requires the usage of RTORB and the +actual values can be set using svc.conf parameters for RT_ORB_Loader.
  6. + +
      +
  7. +See whether the approaches could be extended to multiprocessor systems.
  8. +
+ +

+Papers on Kokyu

+ +
    +
  1. +Christopher D. Gill, Dissertation:Flexible +Scheduling in Middleware for Distributed Rate-Based Real-Time Applications
  2. + +
  3. +Christopher D. Gill, David L. Levine, and Douglas C. Schmidt The +Design and Performance of a Real-Time CORBA Scheduling Service, Real-Time +Systems: the International Journal of Time-Critical Computing Systems, +special issue on Real-Time Middleware, guest editor Wei Zhao, March 2001, +Vol. 20 No. 2
  4. + +
  5. +Christopher D. Gill, Douglas C. Schmidt, and Ron Cytron, Multi-Paradigm +Scheduling for Distributed Real-Time Embedded Computing, IEEE Proceedings +Special Issue on Modeling and Design of Embedded Systems, Volume 91, Number +1, January 2003.
  6. +
+ + + diff --git a/ACE/Kokyu/docs/KokyuEC.jpg b/ACE/Kokyu/docs/KokyuEC.jpg new file mode 100644 index 00000000000..b0c8103d2aa Binary files /dev/null and b/ACE/Kokyu/docs/KokyuEC.jpg differ diff --git a/ACE/Kokyu/docs/kokyu1.jpg b/ACE/Kokyu/docs/kokyu1.jpg new file mode 100644 index 00000000000..268c6ae3302 Binary files /dev/null and b/ACE/Kokyu/docs/kokyu1.jpg differ diff --git a/ACE/Kokyu/docs/kokyu2.jpg b/ACE/Kokyu/docs/kokyu2.jpg new file mode 100644 index 00000000000..f5e2386774e Binary files /dev/null and b/ACE/Kokyu/docs/kokyu2.jpg differ diff --git a/ACE/Kokyu/kokyu_config.h b/ACE/Kokyu/kokyu_config.h new file mode 100644 index 00000000000..4eab747376e --- /dev/null +++ b/ACE/Kokyu/kokyu_config.h @@ -0,0 +1,10 @@ +/* $Id$ */ +#ifndef KOKYU_CONFIG_H +#define KOKYU_CONFIG_H + +#define CONFIG_DSTREAM_DSRT_DISPATCH +#define CONFIG_DSTREAM_DSRT_CV_DISPATCH +#define CONFIG_DSTREAM_DSRT_DIRECT_DISPATCH +#define CONFIG_DSTREAM_DSRT_DISPATCH_IMPL + +#endif /* KOKYU_CONFIG_H */ diff --git a/ACE/Kokyu/kokyu_export.h b/ACE/Kokyu/kokyu_export.h new file mode 100644 index 00000000000..8b15053da15 --- /dev/null +++ b/ACE/Kokyu/kokyu_export.h @@ -0,0 +1,55 @@ +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl Kokyu +// ------------------------------ +#ifndef KOKYU_EXPORT_H +#define KOKYU_EXPORT_H + +#include "ace/config-all.h" + +#if defined (ACE_AS_STATIC_LIBS) +# if !defined (KOKYU_HAS_DLL) +# define KOKYU_HAS_DLL 0 +# endif /* ! KOKYU_HAS_DLL */ +#else +# if !defined (KOKYU_HAS_DLL) +# define KOKYU_HAS_DLL 1 +# endif /* ! KOKYU_HAS_DLL */ +#endif + +#if defined (KOKYU_HAS_DLL) && (KOKYU_HAS_DLL == 1) +# if defined (KOKYU_BUILD_DLL) +# define Kokyu_Export ACE_Proper_Export_Flag +# define KOKYU_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define KOKYU_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* KOKYU_BUILD_DLL */ +# define Kokyu_Export ACE_Proper_Import_Flag +# define KOKYU_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define KOKYU_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* KOKYU_BUILD_DLL */ +#else /* KOKYU_HAS_DLL == 1 */ +# define Kokyu_Export +# define KOKYU_SINGLETON_DECLARATION(T) +# define KOKYU_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* KOKYU_HAS_DLL == 1 */ + +// Set KOKYU_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (KOKYU_NTRACE) +# if (ACE_NTRACE == 1) +# define KOKYU_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define KOKYU_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !KOKYU_NTRACE */ + +#if (KOKYU_NTRACE == 1) +# define KOKYU_TRACE(X) +#else /* (KOKYU_NTRACE == 1) */ +# define KOKYU_TRACE(X) ACE_TRACE_IMPL(X) +#endif /* (KOKYU_NTRACE == 1) */ + +#endif /* KOKYU_EXPORT_H */ + +// End of auto generated file. diff --git a/ACE/Kokyu/tests/DSRT_MIF/DSRT_MIF.mpc b/ACE/Kokyu/tests/DSRT_MIF/DSRT_MIF.mpc new file mode 100644 index 00000000000..62da58ed055 --- /dev/null +++ b/ACE/Kokyu/tests/DSRT_MIF/DSRT_MIF.mpc @@ -0,0 +1,7 @@ +// -*- MPC -*- +// $Id$ + +project: kokyu { + avoids += ace_for_tao + exename = MIF +} diff --git a/ACE/Kokyu/tests/DSRT_MIF/MIF.cpp b/ACE/Kokyu/tests/DSRT_MIF/MIF.cpp new file mode 100644 index 00000000000..4c943533ad6 --- /dev/null +++ b/ACE/Kokyu/tests/DSRT_MIF/MIF.cpp @@ -0,0 +1,185 @@ +// $Id$ + +#include "ace/ACE.h" +#include "ace/Auto_Ptr.h" +#include "ace/Task.h" +#include "ace/Sched_Params.h" +#include "ace/Atomic_Op.h" +#include "ace/High_Res_Timer.h" +#include "ace/Barrier.h" +#include "ace/Lock_Adapter_T.h" +#include "ace/Countdown_Time.h" + +#include "Kokyu_dsrt.h" + +ACE_Atomic_Op guid=0; + +struct mif_scheduler_traits +{ + typedef int Guid_t; + + struct QoSDescriptor_t + { + typedef long Importance_t; + + long importance_; + }; + + /* + static Time_t now() + { + ACE_Time_Value now = ACE_OS::gettimeofday (); + return now.sec () * 10000000 + now.usec () * 10; + } + */ + + typedef Kokyu::MIF_Comparator QoSComparator_t; + + struct Guid_Hash + { + u_long operator () (const Guid_t& guid) + { + return guid; + } + }; +}; + + +class MyTask : public ACE_Task_Base +{ +public: + + MyTask (ACE_Barrier& bar, + Kokyu::DSRT_Dispatcher* dispatcher, + mif_scheduler_traits::QoSDescriptor_t& qos, + int exec_duration) + :barrier_ (bar), + dispatcher_ (dispatcher), + qos_ (qos), + guid_ (++guid), + exec_duration_ (exec_duration) + {} + + int svc (void); + + private: + ACE_Barrier& barrier_; + Kokyu::DSRT_Dispatcher* dispatcher_; + mif_scheduler_traits::QoSDescriptor_t qos_; + mif_scheduler_traits::Guid_t guid_; + int exec_duration_; +}; + +int MyTask::svc (void) +{ + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + + ACE_DEBUG ((LM_DEBUG, "(%t|%T): task activated\n")); + ACE_ASSERT (dispatcher_ != 0); + + (void) dispatcher_->schedule (guid_, qos_); + + barrier_.wait (); + + long prime_number = 9619899; + + ACE_High_Res_Timer timer; + ACE_Time_Value elapsed_time; + ACE_Time_Value seconds_tracker(0,0); + + ACE_Time_Value one_second (1,0); + ACE_Time_Value compute_count_down_time (exec_duration_, 0); + ACE_Countdown_Time compute_count_down (&compute_count_down_time); + + timer.start (); + while (compute_count_down_time > ACE_Time_Value::zero) + { + ACE::is_prime (prime_number, + 2, + prime_number / 2); + + compute_count_down.update (); + timer.stop (); + timer.elapsed_time (elapsed_time); + seconds_tracker += elapsed_time; + if (seconds_tracker >= one_second) + { + seconds_tracker.set (0,0); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Currently running guid=%d") + ACE_TEXT (", qos_.importance=%d \n"), + guid_, qos_.importance_)); + } + timer.reset (); + timer.start (); + } + + dispatcher_->cancel_schedule (this->guid_); + return 0; +} + +int ACE_TMAIN (int,ACE_TCHAR**) +{ + Kokyu::DSRT_ConfigInfo config_info; + + // config_info.scheduler_type_ = Kokyu::SCHED_MIF; + config_info.impl_type_ = Kokyu::DSRT_OS_BASED; + + ACE_Barrier bar (3); + + ACE_DEBUG ((LM_DEBUG, "before create_dispatcher\n" )); + + config_info.sched_strategy_ = Kokyu::DSRT_MIF; + + Kokyu::DSRT_Dispatcher_Factory::DSRT_Dispatcher_Auto_Ptr + disp (Kokyu::DSRT_Dispatcher_Factory:: + create_DSRT_dispatcher (config_info)); + + ACE_DEBUG ((LM_DEBUG, "after create_dispatcher\n" )); + + ACE_ASSERT (disp.get () != 0); + + mif_scheduler_traits::QoSDescriptor_t qos1, qos2, qos3; + + qos1.importance_ = 1; + qos2.importance_ = 2; + qos3.importance_ = 3; + + MyTask mytask1 (bar, disp.get (), qos1, 15); + MyTask mytask2 (bar, disp.get (), qos2, 6); + MyTask mytask3 (bar, disp.get (), qos3, 4); + + long flags = THR_BOUND | THR_SCHED_FIFO; + + if (mytask1.activate (flags) == -1) + { + flags = THR_BOUND; + if (mytask1.activate (flags) == -1) + ACE_ERROR ((LM_ERROR, + "EC (%P|%t) cannot activate task\n")); + } + + if (mytask2.activate (flags) == -1) + { + flags = THR_BOUND; + if (mytask2.activate (flags) == -1) + ACE_ERROR ((LM_ERROR, + "EC (%P|%t) cannot activate task\n")); + } + + if (mytask3.activate (flags) == -1) + { + flags = THR_BOUND; + if (mytask3.activate (flags) == -1) + ACE_ERROR ((LM_ERROR, + "EC (%P|%t) cannot activate task\n")); + } + + disp->shutdown (); + + ACE_DEBUG ((LM_DEBUG, "main thread exiting\n")); + + return 0; +} + diff --git a/ACE/Kokyu/tests/DSRT_MIF/Makefile.am b/ACE/Kokyu/tests/DSRT_MIF/Makefile.am new file mode 100644 index 00000000000..9e51858b971 --- /dev/null +++ b/ACE/Kokyu/tests/DSRT_MIF/Makefile.am @@ -0,0 +1,35 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -include /home/jtc/ACE/ACE-config3/MPC/config -include /home/jtc/ACE/ACE-config3/MPC/templates -type automake ACE.mwc + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +## Makefile.DSRT_MIF.am +noinst_PROGRAMS = MIF + +MIF_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(ACE_ROOT)/Kokyu + +MIF_SOURCES = \ + MIF.cpp + +MIF_LDADD = \ + $(ACE_BUILDDIR)/Kokyu/libKokyu.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/Kokyu/tests/DSRT_MIF/svc.conf b/ACE/Kokyu/tests/DSRT_MIF/svc.conf new file mode 100644 index 00000000000..247c85945d9 --- /dev/null +++ b/ACE/Kokyu/tests/DSRT_MIF/svc.conf @@ -0,0 +1,4 @@ +# +dynamic DSRT_Scheduler_Impl Service_Object * +Kokyu:_make_MIF_Scheduler_Impl() +"-sched_scope thread -sched_policy fifo -min_importance 0 -max_importance 10" \ No newline at end of file diff --git a/ACE/Kokyu/tests/DSRT_MIF/svc.conf.xml b/ACE/Kokyu/tests/DSRT_MIF/svc.conf.xml new file mode 100644 index 00000000000..aef4b3ea0a9 --- /dev/null +++ b/ACE/Kokyu/tests/DSRT_MIF/svc.conf.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/ACE/Kokyu/tests/EDF/EDF.mpc b/ACE/Kokyu/tests/EDF/EDF.mpc new file mode 100644 index 00000000000..d0c7749e28e --- /dev/null +++ b/ACE/Kokyu/tests/EDF/EDF.mpc @@ -0,0 +1,6 @@ +// -*- MPC -*- +// $Id$ + +project: kokyu { + exename = EDF +} diff --git a/ACE/Kokyu/tests/EDF/Makefile.am b/ACE/Kokyu/tests/EDF/Makefile.am new file mode 100644 index 00000000000..22b09e8251a --- /dev/null +++ b/ACE/Kokyu/tests/EDF/Makefile.am @@ -0,0 +1,35 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -include /home/jtc/ACE/ACE-config3/MPC/config -include /home/jtc/ACE/ACE-config3/MPC/templates -type automake ACE.mwc + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +## Makefile.EDF.am +noinst_PROGRAMS = EDF + +EDF_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(ACE_ROOT)/Kokyu + +EDF_SOURCES = \ + test.cpp + +EDF_LDADD = \ + $(ACE_BUILDDIR)/Kokyu/libKokyu.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/Kokyu/tests/EDF/README b/ACE/Kokyu/tests/EDF/README new file mode 100644 index 00000000000..4f209605e14 --- /dev/null +++ b/ACE/Kokyu/tests/EDF/README @@ -0,0 +1,24 @@ +This example is a very simple example, showing how to use the Kokyu +dispatcher to dispatch command objects in a EDF manner. The test +configures the Kokyu dispatcher with a single EDF lane. Typically this +would be done by an EDF scheduler which assigns priorities to tasks +based on the deadlines for tasks. To ensure that the command objects +enqueued in the dispatcher are dispatched in the correct order, we +enqueue the command objects and *then* activate the dispatcher. Only +when the dispatcher is activated, the thread watching each dispatch +queue starts running. + +To run this example, + +./test -p + +The following is the expected output + +Deadline of command1 is 1065966081 +Deadline of command2 is 1065966131 +Deadline of command3 is 1065966031 +command 3 executed +command 1 executed +command 2 executed + +Note that the deadlines are absolute deadlines. diff --git a/ACE/Kokyu/tests/EDF/test.cpp b/ACE/Kokyu/tests/EDF/test.cpp new file mode 100644 index 00000000000..0c6939ee613 --- /dev/null +++ b/ACE/Kokyu/tests/EDF/test.cpp @@ -0,0 +1,162 @@ +// $Id$ + +#include "ace/Auto_Ptr.h" + +#include "Kokyu.h" +#include "ace/Task.h" +#include "ace/Sched_Params.h" +#include "ace/SString.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_strings.h" +#include "ace/OS_NS_sys_time.h" + +ACE_CString sched_policy_str = "fifo"; + +int parse_args (int argc, ACE_TCHAR *argv[]); + +class MyCommand : public Kokyu::Dispatch_Command +{ +public: + MyCommand(int i) + :Kokyu::Dispatch_Command(1),id_(i) + { + } + int execute(); + +private: + int id_; +}; + +int MyCommand::execute() +{ + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + int prio; + + if (ACE_Thread::getprio (thr_handle, prio) == -1) + { + if (errno == ENOTSUP) + { + ACE_DEBUG((LM_DEBUG, + ACE_TEXT ("getprior not supported on this platform\n") + )); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("getprio failed")), + -1); + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t|prio=%d) | command %d executed\n"), + prio, id_)); + return 0; +} + +int ACE_TMAIN (int argc, ACE_TCHAR** argv) +{ + Kokyu::ConfigInfoSet config_info(3); + + int sched_policy=ACE_SCHED_FIFO; + + Kokyu::Dispatcher_Attributes attrs; + + if (parse_args (argc, argv) == -1) + return 0; + + if (ACE_OS::strcasecmp(sched_policy_str.c_str(), "fifo") == 0) + { + sched_policy = ACE_SCHED_FIFO; + } + else if (ACE_OS::strcasecmp(sched_policy_str.c_str(), "other") == 0) + { + sched_policy = ACE_SCHED_OTHER; + } + else if (ACE_OS::strcasecmp(sched_policy_str.c_str(), "rr") == 0) + { + sched_policy = ACE_SCHED_RR; + } + + attrs.sched_policy (sched_policy); + + Kokyu::Priority_t min_prio = + ACE_Sched_Params::priority_min (sched_policy); + + config_info[0].preemption_priority_ = 1; + config_info[0].thread_priority_ = min_prio; + config_info[0].dispatching_type_ = Kokyu::DEADLINE_DISPATCHING; + + ACE_DEBUG ((LM_DEBUG, "before create_dispatcher\n" )); + + attrs.config_info_set_ = config_info; + auto_ptr + disp (Kokyu::Dispatcher_Factory::create_dispatcher (attrs)); + + ACE_ASSERT (disp.get() != 0); + + MyCommand cmd1(1), cmd2(2), cmd3(3); + + Kokyu::QoSDescriptor qos1, qos2, qos3; + + // Get the current time. + ACE_Time_Value current_time = ACE_OS::gettimeofday (); + + ACE_Time_Value deadline1, deadline2, deadline3; + + deadline1 = current_time + ACE_Time_Value(150,0); + deadline2 = current_time + ACE_Time_Value(200,0); + deadline3 = current_time + ACE_Time_Value(100,0); + + qos1.preemption_priority_ = 1; + qos1.deadline_ = deadline1; + qos2.preemption_priority_ = 1; + qos2.deadline_ = deadline2; + qos3.preemption_priority_ = 1; + qos3.deadline_ = deadline3; + + ACE_DEBUG ((LM_DEBUG, "Deadline of command1 is %d\n", + qos1.deadline_.sec ())); + disp->dispatch (&cmd1, qos1); + + ACE_DEBUG ((LM_DEBUG, "Deadline of command2 is %d\n", + qos2.deadline_.sec ())); + disp->dispatch (&cmd2, qos2); + + ACE_DEBUG ((LM_DEBUG, "Deadline of command3 is %d\n", + qos3.deadline_.sec ())); + disp->dispatch (&cmd3, qos3); + + disp->activate (); + + disp->shutdown (); + + ACE_DEBUG ((LM_DEBUG, "after shutdown\n")); + + return 0; +} + +int parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("p:")); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'p': + sched_policy_str = ACE_TEXT_ALWAYS_CHAR(get_opts.opt_arg ()); + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s %s" + "\n", + argv [0], + "-p "), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} diff --git a/ACE/Kokyu/tests/FIFO/FIFO.mpc b/ACE/Kokyu/tests/FIFO/FIFO.mpc new file mode 100644 index 00000000000..b6afad4c977 --- /dev/null +++ b/ACE/Kokyu/tests/FIFO/FIFO.mpc @@ -0,0 +1,6 @@ +// -*- MPC -*- +// $Id$ + +project: kokyu { + exename = FIFO +} diff --git a/ACE/Kokyu/tests/FIFO/Makefile.am b/ACE/Kokyu/tests/FIFO/Makefile.am new file mode 100644 index 00000000000..9852af122e0 --- /dev/null +++ b/ACE/Kokyu/tests/FIFO/Makefile.am @@ -0,0 +1,35 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -include /home/jtc/ACE/ACE-config3/MPC/config -include /home/jtc/ACE/ACE-config3/MPC/templates -type automake ACE.mwc + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + +## Makefile.FIFO.am +noinst_PROGRAMS = FIFO + +FIFO_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -I$(ACE_ROOT)/Kokyu + +FIFO_SOURCES = \ + test.cpp + +FIFO_LDADD = \ + $(ACE_BUILDDIR)/Kokyu/libKokyu.la \ + $(ACE_BUILDDIR)/ace/libACE.la + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/Kokyu/tests/FIFO/README b/ACE/Kokyu/tests/FIFO/README new file mode 100644 index 00000000000..5700e4a5038 --- /dev/null +++ b/ACE/Kokyu/tests/FIFO/README @@ -0,0 +1,24 @@ +This example is a very simple example, showing how to use the Kokyu +dispatcher to dispatch command objects in a FIFO manner. The test +configures the Kokyu dispatcher with 3 FIFO lanes, each having a +different priority. Typically this would be done by an RMS scheduler +which assigns priorities to tasks based on the rate of tasks. To +ensure that the command objects enqueued in the dispatcher are +dispatched in the correct order, we enqueue the command objects and +*then* activate the dispatcher. Only when the dispatcher is activated, +the thread watching each dispatch queue starts running. + +To run this example, + +./test -p + +The following is the expected output + +Priority of command1 is 2 +Priority of command2 is 3 +Priority of command3 is 1 +command 3 executed +command 1 executed +command 2 executed + +Note that a lower number means a higher priority for the task. diff --git a/ACE/Kokyu/tests/FIFO/test.cpp b/ACE/Kokyu/tests/FIFO/test.cpp new file mode 100644 index 00000000000..deb25823a3b --- /dev/null +++ b/ACE/Kokyu/tests/FIFO/test.cpp @@ -0,0 +1,166 @@ +// $Id$ + +#include "ace/Auto_Ptr.h" + +#include "Kokyu.h" +#include "ace/Task.h" +#include "ace/SString.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_strings.h" + +ACE_CString sched_policy_str = "fifo"; + +int parse_args (int argc, ACE_TCHAR *argv[]); + +class MyCommand : public Kokyu::Dispatch_Command +{ +public: + MyCommand(int i) + :Kokyu::Dispatch_Command(1),id_(i) + { + } + int execute (); + +private: + int id_; +}; + +int MyCommand::execute() +{ + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + int prio; + + if (ACE_Thread::getprio (thr_handle, prio) == -1) + { + if (errno == ENOTSUP) + { + ACE_DEBUG((LM_DEBUG, + ACE_TEXT ("getprior not supported on this platform\n") + )); + return 0; + } + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("getprio failed")), + -1); + } + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t|prio=%d) | command %d executed\n"), + prio, id_)); + return 0; +} + +int ACE_TMAIN (int argc, ACE_TCHAR** argv) +{ + Kokyu::ConfigInfoSet config_info(3); + + int hi_prio, me_prio, lo_prio; + int sched_policy=ACE_SCHED_FIFO; + + Kokyu::Dispatcher_Attributes attrs; + + if (parse_args (argc, argv) == -1) + return 0; + + if (ACE_OS::strcasecmp(sched_policy_str.c_str(), "fifo") == 0) + { + sched_policy = ACE_SCHED_FIFO; + } + else if (ACE_OS::strcasecmp(sched_policy_str.c_str(), "other") == 0) + { + sched_policy = ACE_SCHED_OTHER; + } + else if (ACE_OS::strcasecmp(sched_policy_str.c_str(), "rr") == 0) + { + sched_policy = ACE_SCHED_RR; + } + + attrs.sched_policy (sched_policy); + + hi_prio = ACE_Sched_Params::priority_max (sched_policy); + me_prio = ACE_Sched_Params::previous_priority (sched_policy, + hi_prio); + lo_prio = ACE_Sched_Params::previous_priority (sched_policy, + me_prio); + + config_info[0].preemption_priority_ = 1; + config_info[0].thread_priority_ = hi_prio ; + config_info[0].dispatching_type_ = Kokyu::FIFO_DISPATCHING; + + config_info[1].preemption_priority_ = 2; + config_info[1].thread_priority_ = me_prio; + config_info[1].dispatching_type_ = Kokyu::FIFO_DISPATCHING; + + config_info[2].preemption_priority_ = 3; + config_info[2].thread_priority_ = lo_prio; + config_info[2].dispatching_type_ = Kokyu::FIFO_DISPATCHING; + + attrs.config_info_set_ = config_info; + + ACE_DEBUG ((LM_DEBUG, "before create_dispatcher\n" )); + auto_ptr + disp (Kokyu::Dispatcher_Factory::create_dispatcher (attrs)); + + ACE_ASSERT (disp.get() != 0); + + MyCommand cmd1(1), cmd2(2), cmd3(3); + + Kokyu::QoSDescriptor qos1, qos2, qos3; + + qos1.preemption_priority_ = 2; + ACE_DEBUG ((LM_DEBUG, "Priority of command1 is %d\n", + qos1.preemption_priority_)); + + qos2.preemption_priority_ = 3; + ACE_DEBUG ((LM_DEBUG, "Priority of command2 is %d\n", + qos2.preemption_priority_)); + + qos3.preemption_priority_ = 1; + ACE_DEBUG ((LM_DEBUG, "Priority of command3 is %d\n", + qos3.preemption_priority_)); + + if (disp->dispatch (&cmd1, qos1) == -1 || + disp->dispatch (&cmd2, qos2) == -1 || + disp->dispatch (&cmd3, qos3) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "Error in dispatching command object\n"), -1); + + if (disp->activate () == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Error activating dispatcher. ") + ACE_TEXT ("You might not have superuser privileges ") + ACE_TEXT ("to run FIFO class. Try \"-p other\"\n")), -1); + } + + disp->shutdown (); + + ACE_DEBUG ((LM_DEBUG, "after shutdown\n")); + return 0; +} + +int parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("p:")); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'p': + sched_policy_str = ACE_TEXT_ALWAYS_CHAR(get_opts.opt_arg ()); + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s %s" + "\n", + argv [0], + "-p "), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} diff --git a/ACE/Kokyu/tests/Makefile.am b/ACE/Kokyu/tests/Makefile.am new file mode 100644 index 00000000000..39737061deb --- /dev/null +++ b/ACE/Kokyu/tests/Makefile.am @@ -0,0 +1,15 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -include /home/jtc/ACE/ACE-config3/MPC/config -include /home/jtc/ACE/ACE-config3/MPC/templates -type automake ACE.mwc + +SUBDIRS = \ + DSRT_MIF \ + EDF \ + FIFO + -- cgit v1.2.1