diff options
author | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-06-16 16:09:36 +0000 |
---|---|---|
committer | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-06-16 16:09:36 +0000 |
commit | 772cdbff7e5d748c8621347626206ada9132aadb (patch) | |
tree | be9ffb0b3116c5a507c883b765ee0774967613f6 | |
parent | 8b8ec36db646e66b34648148b5406af94fd1d895 (diff) | |
download | ATCD-772cdbff7e5d748c8621347626206ada9132aadb.tar.gz |
*** empty log message ***
-rw-r--r-- | TAO/examples/RTScheduling/DT_Creator.cpp | 711 | ||||
-rw-r--r-- | TAO/examples/RTScheduling/DT_Creator.h | 66 | ||||
-rw-r--r-- | TAO/examples/RTScheduling/Job.idl | 11 | ||||
-rw-r--r-- | TAO/examples/RTScheduling/Job_i.cpp | 180 | ||||
-rw-r--r-- | TAO/examples/RTScheduling/Job_i.h | 76 |
5 files changed, 917 insertions, 127 deletions
diff --git a/TAO/examples/RTScheduling/DT_Creator.cpp b/TAO/examples/RTScheduling/DT_Creator.cpp index f98dbedc098..e58dfc2ee10 100644 --- a/TAO/examples/RTScheduling/DT_Creator.cpp +++ b/TAO/examples/RTScheduling/DT_Creator.cpp @@ -1,155 +1,557 @@ //$Id$ + #include "DT_Creator.h" -#include "ace/Arg_Shifter.h" #include "Thread_Task.h" #include "tao/ORB_Core.h" #include "Task_Stats.h" #include "ace/High_Res_Timer.h" #include "Task.h" +#include "DT_Creator.h" +#include "tao/RTScheduling/Current.h" + + +//long guid_counter; + +int +DT_Creator::dt_task_init (ACE_Arg_Shifter& arg_shifter) +{ + static int dt_index = 0; + int start_time; + int load; + int iter; + int importance; + char *job_name = 0; + int dist = 0; + const ACE_TCHAR* current_arg = 0; + if (arg_shifter.cur_arg_strncasecmp ("-Importance") == 0) + { + arg_shifter.consume_arg (); + current_arg = arg_shifter.get_current (); + importance = ACE_OS::atoi (current_arg); + arg_shifter.consume_arg (); + } + if ((current_arg = arg_shifter.get_the_parameter ("-Start_Time"))) + { + start_time = ACE_OS::atoi (current_arg); + arg_shifter.consume_arg (); + } + if ((current_arg = arg_shifter.get_the_parameter ("-Iter"))) + { + iter = ACE_OS::atoi (current_arg); + arg_shifter.consume_arg (); + } + if ((current_arg = arg_shifter.get_the_parameter ("-Load"))) + { + load = ACE_OS::atoi (current_arg); + arg_shifter.consume_arg (); + } + if ((current_arg = arg_shifter.get_the_parameter ("-JobName"))) + { + job_name = (char *)current_arg; + dist = 1; + arg_shifter.consume_arg (); + } + dt_list_ [dt_index++] = this->create_thr_task (importance, + start_time, + load, + iter, + dist, + job_name); + ACE_CHECK; + + return 0; + +} int log_index = 0; int DT_Creator::init (int argc, char *argv []) { + gsf_ = ACE_High_Res_Timer::global_scale_factor (); state_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>; shutdown_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>; active_dt_count_ = 0; - ACE_NEW_RETURN (log, char*[BUFSIZ],-1); + active_job_count_ = 0; + ACE_NEW_RETURN (log, char*[BUFSIZ * 100],-1); ACE_Arg_Shifter arg_shifter (argc, argv); const ACE_TCHAR* current_arg = 0; - - dt_count_ = 0; - int dt_index = 0; - int start_time; - int load; - int importance; - int total_load = 0; + dt_count_ = 0; + poa_count_ = 0; + int poa_count = 0; + job_count_ = 0; + int job_count = 0; while (arg_shifter.is_anything_left ()) { - if ((current_arg = arg_shifter.get_the_parameter ("-DT_Count"))) + if ((current_arg = arg_shifter.get_the_parameter ("-GuidSeed"))) { - dt_count_ = ACE_OS::atoi (current_arg); + guid_counter = (long) ACE_OS::atoi (current_arg); + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-DT_Count"))) + { + dt_count_ = ACE_OS::atoi (current_arg); ACE_NEW_RETURN (dt_list_, Thread_Task*[dt_count_], -1); active_dt_count_ = dt_count_; arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-POA_Count"))) + { + poa_count_ = ACE_OS::atoi (current_arg); + ACE_NEW_RETURN (poa_list_, POA_Holder*[poa_count_], -1); + arg_shifter.consume_arg (); } - if (arg_shifter.cur_arg_strncasecmp ("-DT_Task") == 0) + else if ((current_arg = arg_shifter.get_the_parameter ("-JOB_Count"))) + { + job_count_ = ACE_OS::atoi (current_arg); + active_job_count_ = job_count_; + ACE_NEW_RETURN (job_list_, Job_i*[job_count_], -1); + arg_shifter.consume_arg (); + } + else if (arg_shifter.cur_arg_strncasecmp ("-DT_Task") == 0) + { + arg_shifter.consume_arg (); + dt_task_init (arg_shifter + ACE_ENV_ARG_PARAMETER); + } + else if (arg_shifter.cur_arg_strncasecmp ("-POA") == 0) { arg_shifter.consume_arg (); - - if (arg_shifter.cur_arg_strncasecmp ("-Importance") == 0) - { - arg_shifter.consume_arg (); - current_arg = arg_shifter.get_current (); - importance = ACE_OS::atoi (current_arg); - arg_shifter.consume_arg (); - } - if ((current_arg = arg_shifter.get_the_parameter ("-Start_Time"))) - { - start_time = ACE_OS::atoi (current_arg); - arg_shifter.consume_arg (); - } - if ((current_arg = arg_shifter.get_the_parameter ("-Load"))) - { - load = ACE_OS::atoi (current_arg); - total_load += load; - arg_shifter.consume_arg (); - } - dt_list_ [dt_index++] = this->create_thr_task (importance, - start_time, - load); + POA_Holder *poa_holder; + + ACE_NEW_RETURN (poa_holder, POA_Holder (), -1); + + if (poa_holder->init (arg_shifter) == -1) + { + delete poa_holder; + return -1; + } + + this->poa_list_[poa_count++] = poa_holder; + } + else if (arg_shifter.cur_arg_strncasecmp ("-Job") == 0) + { + arg_shifter.consume_arg (); + + Job_i *job; + + ACE_NEW_RETURN (job, Job_i (this), -1); + + if (job->init (arg_shifter) == -1) + return -1; + + this->job_list_[job_count++] = job; + + } + else if ((current_arg = arg_shifter.get_the_parameter ("-OutFile"))) + { + file_name_ = CORBA::string_dup (current_arg); + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter ("-LogFile"))) + { + log_file_name_ = CORBA::string_dup (current_arg); + arg_shifter.consume_arg (); } + else + { + arg_shifter.ignore_arg (); + } } - - TASK_STATS::instance ()->init (total_load); - + return 0; - } -void -DT_Creator::create_distributable_threads (CORBA::ORB_ptr orb, - RTScheduling::Current_ptr current - ACE_ENV_ARG_DECL) +void +DT_Creator::register_synch_obj (ACE_ENV_SINGLE_ARG_DECL) { - ACE_NEW (barrier_, - ACE_Barrier (this->dt_count_ + 1)); - - orb_ = CORBA::ORB::_duplicate (orb); - - current_ = RTScheduling::Current::_duplicate (current); - - long flags; - flags = THR_NEW_LWP | THR_JOINABLE; - flags |= - orb_->orb_core ()->orb_params ()->scope_policy () | - orb_->orb_core ()->orb_params ()->sched_policy (); - - CORBA::Policy_var sched_param; - sched_param = CORBA::Policy::_duplicate (this->sched_param (100)); - const char * name = 0; - CORBA::Policy_ptr implicit_sched_param = 0; - current_->begin_scheduling_segment (name, - sched_param.in (), - implicit_sched_param - ACE_ENV_ARG_PARAMETER); + CosNaming::Name name (1); + name.length (1); + + CosNaming::NamingContext_var synch_context; + + ACE_TRY + { + // Try binding the sender context in the NS + name [0].id = + CORBA::string_dup ("Synch"); + + synch_context = this->naming_->bind_new_context (name + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // + // We reach here if there was no exception raised in + // <bind_new_context>. We then create a receiver context. + // + + } + ACE_CATCH (CosNaming::NamingContext::AlreadyBound, al_ex) + { + // + // The synch context already exists, probably created by the + // receiver(s). + // + + // Get the synch context. + name [0].id = + CORBA::string_dup ("Synch"); + + CORBA::Object_var object = + this->naming_->resolve (name + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + synch_context = CosNaming::NamingContext::_narrow (object.in ()); + + } + ACE_ENDTRY; + ACE_CHECK; + + ACE_CString synch_name ("Synch"); + ACE_Time_Value timestamp = ACE_OS::gettimeofday (); + + char buf [BUFSIZ]; + synch_name += CORBA::string_dup (ACE_OS::itoa (timestamp.sec (), buf, 10)); + + name [0].id = + CORBA::string_dup (synch_name.c_str ()); + + ACE_DEBUG ((LM_DEBUG, + "Synch Name %s\n", + synch_name.c_str ())); + + ACE_NEW (synch_, + Synch_i); + + Synch_var synch = synch_->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + // Register the synch object with the Synch context. + synch_context->rebind (name, + synch.in () + ACE_ENV_ARG_PARAMETER); ACE_CHECK; - - ACE_NEW (base_time_, - ACE_Time_Value (ACE_OS::gettimeofday ())); - - for (int i = 0; i < this->dt_count_; i++) + +} + + +int +DT_Creator::activate_root_poa (ACE_ENV_SINGLE_ARG_PARAMETER) +{ + CORBA::Object_var object = + orb_->resolve_initial_references ("RootPOA" + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + root_poa_ = + PortableServer::POA::_narrow (object.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + PortableServer::POAManager_var poa_manager = + root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + return 0; +} + +void +DT_Creator::activate_poa_list (ACE_ENV_SINGLE_ARG_DECL) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "DT_Creator::activate_poa_list\n")); + + if (poa_count_ > 0) { - ACE_Time_Value now (ACE_OS::gettimeofday ()); - - ACE_Time_Value elapsed_time = now - *base_time_; - - char buf [BUFSIZ]; - ACE_OS::sprintf (buf, "elapsed time = %d\n now = %d\n base_time_ = %d\n", - (int) elapsed_time.sec (), - (int) now.sec (), - (int) base_time_->sec()); - - log [log_index++] = ACE_OS::strdup (buf) ; - - ACE_hthread_t curr_thr; - ACE_Thread::self (curr_thr); - - if (dt_list_ [i]->start_time () != 0 && (elapsed_time.sec () < dt_list_[i]->start_time ())) - { - int suspension_time = dt_list_[i]->start_time () - elapsed_time.sec (); - ACE_OS::sprintf (buf,"suspension_tome = %d\n", - suspension_time); - log [log_index++] = ACE_OS::strdup (buf); - yield (suspension_time, - dt_list_[i]); - } - - sched_param = CORBA::Policy::_duplicate (this->sched_param (dt_list_ [i]->importance ())); - dt_list_ [i]->activate_task (current, - sched_param.in (), - flags, - base_time_, - barrier_ - ACE_ENV_ARG_PARAMETER); + + CORBA::Object_var object = + orb_->resolve_initial_references ("RTORB" + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + this->rt_orb_ = + RTCORBA::RTORB::_narrow (object.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + + } + + for (int i = 0; i < poa_count_; ++i) + { + poa_list_[i]->activate (this->rt_orb_.in(), this->root_poa_.in () + ACE_ENV_ARG_PARAMETER); ACE_CHECK; - } - - - while (active_dt_count_ > 0) +} + +void +DT_Creator::activate_job_list (ACE_ENV_SINGLE_ARG_DECL) +{ + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "DT_Creator::activate_job_list\n")); + + Job_i* job; + + for (int i = 0; i < job_count_; ++i) + { + job = job_list_[i]; + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "Activating job:%s\n", job->name ().c_str ())); + + // find your poa + PortableServer::POA_var host_poa = + root_poa_->find_POA (job->poa ().c_str (), 0 + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + PortableServer::ServantBase_var servant_var (job); + + // Register with poa. + PortableServer::ObjectId_var id; + + id = host_poa->activate_object (job + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + CORBA::Object_var server = + host_poa->id_to_reference (id.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + CORBA::String_var ior = + orb_->object_to_string (server.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + const ACE_CString &job_name = job->name (); + + CosNaming::Name_var name = + this->naming_->to_name (job_name.c_str () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->naming_->rebind (name.in (), + server.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_DEBUG ((LM_DEBUG, + "Activated Job List\n")); + +// ACE_NEW (base_time_, +// ACE_Time_Value (ACE_OS::gettimeofday ())); + + //base_hr_time_ = ACE_OS::gethrtime (); + + // active_job_count_++; + + } /* while */ + +} + +void +DT_Creator::activate_schedule (ACE_ENV_SINGLE_ARG_DECL) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Activating schedule, task count = %d\n", + dt_count_)); + + Thread_Task* task; + + for (int i = 0; i < dt_count_; ++i) { - yield (1,0); + task = dt_list_[i]; + + if (task->dist ()) + { + // resolve the object from the naming service + CosNaming::Name name (1); + name.length (1); + name[0].id = CORBA::string_dup (task->job ()); + + CORBA::Object_var obj = + this->naming_->resolve (name ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + Job_var job = Job::_narrow (obj.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // if (TAO_debug_level > 0) + // { + // Check that the object is configured with some + // PriorityModelPolicy. + CORBA::Policy_var policy = + job->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + RTCORBA::PriorityModelPolicy_var priority_policy = + RTCORBA::PriorityModelPolicy::_narrow (policy.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + if (CORBA::is_nil (priority_policy.in ())) + ACE_DEBUG ((LM_DEBUG, + "ERROR: Priority Model Policy not exposed!\n")); + else + { + /* + RTCORBA::PriorityModel priority_model = + priority_policy->priority_model (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + if (priority_model == RTCORBA::CLIENT_PROPAGATED) + ACE_DEBUG ((LM_DEBUG, + "%s priority_model = RTCORBA::CLIENT_PROPAGATED\n", task->job ())); + else + ACE_DEBUG ((LM_DEBUG, + "%s priority_model = RTCORBA::SERVER_DECLARED\n", task->job ())); + */ + } + //} /* if (TAO_debug_level > 0) */ + + task->job (job.in ()); + } } - current_->end_scheduling_segment (name - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - + if (TAO_debug_level > 0 && dt_count_ > 0) + ACE_DEBUG ((LM_DEBUG, + "Activated schedule, task count = %d\n", + dt_count_)); + +} + +int +DT_Creator::resolve_naming_service (ACE_ENV_SINGLE_ARG_DECL) +{ + CORBA::Object_var naming_obj = + this->orb_->resolve_initial_references ("NameService" + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + // Need to check return value for errors. + if (CORBA::is_nil (naming_obj.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Unable to resolve the Naming Service.\n"), + -1); + + this->naming_ = + CosNaming::NamingContextExt::_narrow (naming_obj.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + + //@@tmp hack, otherwise crashes on exit!..?? + CosNaming::NamingContextExt::_duplicate (this->naming_.in()); + return 0; +} + +void +DT_Creator::create_distributable_threads (RTScheduling::Current_ptr current + ACE_ENV_ARG_DECL) +{ + + // if (dt_count_ > 0) + // { + // ACE_NEW (barrier_, + // ACE_Barrier (this->dt_count_ + 1)); + + + current_ = RTScheduling::Current::_duplicate (current); + + long flags; + flags = THR_NEW_LWP | THR_JOINABLE; + flags |= + orb_->orb_core ()->orb_params ()->scope_policy () | + orb_->orb_core ()->orb_params ()->sched_policy (); + + CORBA::Policy_var sched_param; + sched_param = CORBA::Policy::_duplicate (this->sched_param (100)); + const char * name = 0; + //CORBA::Policy_ptr implicit_sched_param = sched_param.in (); + current_->begin_scheduling_segment (name, + sched_param.in (), + sched_param.in () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + while (!this->synch ()->synched ()) + { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "Waiting to Synch\n")); + + this->orb_->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + ACE_NEW (base_time_, + ACE_Time_Value (*(this->synch ()->base_time ()))); + + base_hr_time_ = ACE_OS::gethrtime (); + + for (int i = 0; i < this->dt_count_; i++) + { + ACE_Time_Value now (ACE_OS::gettimeofday ()); + + ACE_Time_Value elapsed_time = now - *base_time_; + + char buf [BUFSIZ]; + ACE_OS::sprintf (buf, "elapsed time = %d\n now = %d\n base_time_ = %d\n", + (int) elapsed_time.sec (), + (int) now.sec (), + (int) base_time_->sec()); + + log [log_index++] = ACE_OS::strdup (buf) ; + + ACE_hthread_t curr_thr; + ACE_Thread::self (curr_thr); + + if (dt_list_ [i]->start_time () != 0 && (elapsed_time.sec () < dt_list_[i]->start_time ())) + { + int suspension_time = dt_list_[i]->start_time () - elapsed_time.sec (); + ACE_OS::sprintf (buf,"suspension_tome = %d\n", + suspension_time); + log [log_index++] = ACE_OS::strdup (buf); + yield (suspension_time, + dt_list_[i]); + } + + sched_param = CORBA::Policy::_duplicate (this->sched_param (dt_list_ [i]->importance ())); + dt_list_ [i]->activate_task (current, + sched_param.in (), + flags, + base_time_, + base_hr_time_, + barrier_ + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + } + + /* + while (active_dt_count_ > 0 || active_job_count_ > 0) + { + yield(1,0); + } + */ + + current_->end_scheduling_segment (name + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + // } } void @@ -160,51 +562,81 @@ DT_Creator::dt_ended (void) --active_dt_count_; //ACE_DEBUG ((LM_DEBUG, "Active job count = %d\n",active_dt_count_)); char buf [BUFSIZ]; - ACE_OS::sprintf (buf,"Active job count = %d\n",active_dt_count_); + ACE_OS::sprintf (buf,"Active dt count = %d\n",active_dt_count_); log [log_index++] = ACE_OS::strdup (buf); } this->check_ifexit (); } void +DT_Creator::job_ended (void) +{ + { + ACE_GUARD (ACE_Lock, ace_mon, *state_lock_); + --active_job_count_; + char buf [BUFSIZ]; + ACE_OS::sprintf (buf,"Active job count = %d\n",active_job_count_); + log [log_index++] = ACE_OS::strdup (buf); + } + + this->check_ifexit (); +} + +void DT_Creator::check_ifexit (void) { static int shutdown = 0; { - ACE_GUARD (ACE_Lock, ace_mon, *shutdown_lock_); + ACE_GUARD (ACE_Lock, ace_mon, *shutdown_lock_); if (!shutdown) { // All tasks have finished and all jobs have been shutdown. - if (active_dt_count_ == 0) + if (active_dt_count_ == 0 && active_job_count_ == 0) { - + ACE_DEBUG ((LM_DEBUG, "Shutdown in progress ...\n")); - - TASK_STATS::instance ()->dump_samples ("schedule.dat", + + /* + for (int i = 0; i < dt_count_; i++) + { + dt_list_[i]->dump_stats (); + } + + for (int i = 0; i < job_count_; i ++) + { + job_list_[i]->dump_stats (); + } + */ + TASK_STATS::instance ()->dump_samples (file_name_, "#Schedule Output", ACE_High_Res_Timer::global_scale_factor ()); shutdown = 1; - - FILE* log_file = ACE_OS::fopen ("log_file", "w"); + + FILE* log_file = ACE_OS::fopen (log_file_name_, "w"); if (log_file != NULL) { // first dump what the caller has to say. ACE_OS::fprintf (log_file, "Log File\n"); - + for (int i = 0; i < log_index; i++) { ACE_OS::fprintf (log_file, "%s\n", log [i]); } - + ACE_OS::fclose (log_file); } + ACE_DEBUG ((LM_DEBUG, + "Log File Ready\n")); + + if (!orb_->work_pending ()) + { + orb_->destroy (); + } - // shutdown the ORB - orb_->shutdown (0); } } } @@ -233,3 +665,40 @@ DT_Creator::orb (void) { return this->orb_.in (); } + +void +DT_Creator::orb (CORBA::ORB_ptr orb) +{ + this->orb_ = CORBA::ORB::_duplicate (orb); +} + +ACE_Time_Value* +DT_Creator::base_time (void) +{ + return this->base_time_; +} + +void +DT_Creator::base_time (ACE_Time_Value* base_time) +{ + this->base_time_ = base_time; +} + +ACE_hrtime_t +DT_Creator::base_hr_time (void) +{ + return this->base_hr_time_; +} + +RTScheduling::Current_ptr +DT_Creator::current (void) +{ + return current_.in (); +} + + +Synch_i* +DT_Creator::synch (void) +{ + return this->synch_; +} diff --git a/TAO/examples/RTScheduling/DT_Creator.h b/TAO/examples/RTScheduling/DT_Creator.h index 8cf276d1446..9528eb51718 100644 --- a/TAO/examples/RTScheduling/DT_Creator.h +++ b/TAO/examples/RTScheduling/DT_Creator.h @@ -2,15 +2,22 @@ #ifndef DT_CREATOR_H #define DT_CREATOR_H +#include "orbsvcs/orbsvcs/CosNamingC.h" #include "ace/Service_Config.h" #include "ace/Service_Object.h" #include "tao/RTScheduling/RTScheduler.h" - +#include "ace/Arg_Shifter.h" +#include "POA_Holder.h" +#include "Job_i.h" +#include "Synch_i.h" class Thread_Task; class Task; + typedef Thread_Task **DT_LIST; +typedef POA_Holder **POA_LIST; +typedef Job_i **JOB_LIST; class DT_Creator : public ACE_Service_Object { @@ -19,11 +26,17 @@ class DT_Creator : public ACE_Service_Object ~DT_Creator (void); int init (int argc, char *argv []); - - virtual void create_distributable_threads (CORBA::ORB_ptr orb, - RTScheduling::Current_ptr current + + int dt_task_init (ACE_Arg_Shifter& arg_shifter + ACE_ENV_ARG_DECL_WITH_DEFAULTS); + + virtual void create_distributable_threads (RTScheduling::Current_ptr current ACE_ENV_ARG_DECL_WITH_DEFAULTS); - + + void activate_poa_list (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); + void activate_job_list (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); + void activate_schedule (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS); + virtual void yield (int suspend_time, Thread_Task* task) = 0; @@ -31,32 +44,73 @@ class DT_Creator : public ACE_Service_Object virtual Thread_Task* create_thr_task (int importance, int start_time, - int load) = 0; + int load, + int iter, + int dist, + char *job_name) = 0; // virtual Task* task (void) = 0; + + /// Resolve the naming service. + int resolve_naming_service (ACE_ENV_SINGLE_ARG_DECL); int dt_count (void); void dt_ended (void); + void job_ended (void); void check_ifexit (void); void log_msg (char* msg); + void orb (CORBA::ORB_ptr); CORBA::ORB_ptr orb (void); + ACE_Time_Value* base_time (void); + void base_time (ACE_Time_Value*); + + ACE_hrtime_t base_hr_time (void); + + virtual int total_load (void) = 0; + + RTScheduling::Current_ptr current (void); + + Synch_i* synch (void); + + void register_synch_obj (ACE_ENV_SINGLE_ARG_PARAMETER); + + int activate_root_poa (ACE_ENV_SINGLE_ARG_PARAMETER); + protected: + + DT_LIST dt_list_; + POA_LIST poa_list_; + JOB_LIST job_list_; //Fixed_Priority_Scheduler* scheduler_; ACE_Barrier* barrier_; int dt_count_; + int poa_count_; + int job_count_; CORBA::ORB_var orb_; /// Mutex to serialize access to our internal state. ACE_Lock* state_lock_; ACE_Lock* shutdown_lock_; int active_dt_count_; + int active_job_count_; char** log; ACE_Time_Value* base_time_; + ACE_hrtime_t base_hr_time_; RTScheduling::Current_var current_; + /// RT ORB + RTCORBA::RTORB_var rt_orb_; + /// Reference to the root poa. + PortableServer::POA_var root_poa_; + /// A naming context. + CosNaming::NamingContextExt_var naming_; + char* file_name_; + char* log_file_name_; + ACE_UINT32 gsf_; + Synch_i* synch_; }; diff --git a/TAO/examples/RTScheduling/Job.idl b/TAO/examples/RTScheduling/Job.idl new file mode 100644 index 00000000000..f1ba4b5cff2 --- /dev/null +++ b/TAO/examples/RTScheduling/Job.idl @@ -0,0 +1,11 @@ +// +// $Id$ +// + +interface Job +{ + void work (in unsigned long work, + in short importance); + + void shutdown (); +}; diff --git a/TAO/examples/RTScheduling/Job_i.cpp b/TAO/examples/RTScheduling/Job_i.cpp new file mode 100644 index 00000000000..1ae94e786e0 --- /dev/null +++ b/TAO/examples/RTScheduling/Job_i.cpp @@ -0,0 +1,180 @@ +//$Id$ +#include "Job_i.h" + +#include "tao/debug.h" +#include "ace/Arg_Shifter.h" +#include "DT_Creator.h" +#include "Task_Stats.h" +#include "ace/High_Res_Timer.h" + +//#include "Activity.h" + +Job_i::Job_i (DT_Creator* dt_creator) + : dt_creator_ (dt_creator), + guid_ (0) +{ + // create the stat object. + ACE_NEW (task_stats_, Task_Stats); + task_stats_->init (100); + +} + +const ACE_CString& +Job_i::name (void) +{ + return job_name_; +} + +const ACE_CString& +Job_i::poa (void) +{ + return POA_name_; +} + +int +Job_i::init (ACE_Arg_Shifter& arg_shifter) +{ + job_name_ = arg_shifter.get_current (); // Read the name of the Job + arg_shifter.consume_arg (); + + POA_name_ = arg_shifter.get_current (); // Read the name of the POA + arg_shifter.consume_arg (); + + return 0; +} + +void +Job_i::work (CORBA::ULong work, + CORBA::Short importance + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + + int priority; + ACE_hthread_t current; + ACE_Thread::self (current); + if (ACE_Thread::getprio (current, priority) == -1) + return; + + + static CORBA::ULong prime_number = 9619; + // ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor (); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "test_i::method: %d units of work\n", + work)); + + if (guid_ == 0) + ACE_OS::memcpy (&guid_, + dt_creator_->current ()->id (ACE_ENV_SINGLE_ARG_PARAMETER)->get_buffer (), + sizeof (dt_creator_->current ()->id (ACE_ENV_SINGLE_ARG_PARAMETER)->length ())); + + + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "%t Guid is %d, Importance is %d\n", + guid_, + importance)); + + char msg [BUFSIZ]; + ACE_OS::sprintf (msg,"Thread Priority is %d, Guid is %d\n",priority, guid_); + dt_creator_->log_msg (msg); + + // CORBA::Policy_ptr sched_param; + // CORBA::Policy_var sched_param; + // sched_param = CORBA::Policy::_duplicate (dt_creator_->sched_param (importance)); + /* + const char * name = 0; + CORBA::Policy_ptr implicit_sched_param = 0; + dt_creator_->current ()->update_scheduling_segment (name, + sched_param, + sched_param + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + */ + + for (; work != 0; work--) + { + // ACE_hrtime_t now = ACE_OS::gethrtime (); + + ACE_Time_Value run_time = ACE_OS::gettimeofday () - *(dt_creator_->base_time ()); + TASK_STATS::instance ()->sample (ACE_UINT64 (run_time.sec ()), + guid_); + + ACE_Time_Value count_down_time (1); + ACE_Countdown_Time count_down (&count_down_time); + + while (count_down_time > ACE_Time_Value::zero) + { + ACE::is_prime (prime_number, + 2, + prime_number / 2); + count_down.update (); + } + + // convert to microseconds + // #if !defined ACE_LACKS_LONGLONG_T + + // ACE_UINT32 elapsed_microseconds = ACE_UINT32((now - dt_creator_->base_hr_time ()) / gsf); + + // #else /* ! ACE_LACKS_LONGLONG_T */ + + // ACE_UINT32 elapsed_microseconds = (now - dt_creator_->base_hr_time ()) / gsf; + + // #endif /* ! ACE_LACKS_LONGLONG_T */ + + // #if defined (ACE_WIN32) + // elapsed_microseconds*=1000; // convert to uSec on Win32 + // #endif /* ACE_WIN32 */ + + // TASK_STATS::instance ()->sample (elapsed_microseconds, + // guid_); + + + } + + // task_stats_->sample (ACE_UINT64 (run_time.sec ()), + // guid_); + +} + +void +Job_i::post_work (int /*guid*/, + int /*importance*/) +{ +} + +int +Job_i::guid (void) +{ + return this->guid_; +} + +void +Job_i::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + dt_creator_->job_ended (); +} + +void +Job_i::dump_stats (void) +{ + char fname [BUFSIZ]; + ACE_OS::sprintf (fname, "Job_%d.dat",guid_); + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "File name %s\n", + fname)); + + + char msg [BUFSIZ]; + ACE_OS::sprintf (msg, "#Schedule Output for DT %d", guid_); + + task_stats_->dump_samples (fname, + msg, + ACE_High_Res_Timer::global_scale_factor ()); + +} diff --git a/TAO/examples/RTScheduling/Job_i.h b/TAO/examples/RTScheduling/Job_i.h new file mode 100644 index 00000000000..940cd1b9b51 --- /dev/null +++ b/TAO/examples/RTScheduling/Job_i.h @@ -0,0 +1,76 @@ +/* -*- C++ -*- */ +//============================================================================= +/** + * @file Job_i.h + * + * $Id$ + * + * This file defines the servant for the Job.idl interface + * + * @author Pradeep Gore <pradeep@cs.wustl.edu> + */ +//============================================================================= +#ifndef JOB_I_H +#define JOB_I_H + +#include "JobS.h" +#include "Task_Stats.h" + +class ACE_Arg_Shifter; +class DT_Creator; + +/** + * @class Job_i + * + * @brief Implements a Job that performs some cpu bound work. + * + */ +class Job_i : public POA_Job, public virtual PortableServer::RefCountServantBase +{ + public: + /// Constructor + Job_i (DT_Creator* dt_creator); + + + /// Init the state of this object. + int init (ACE_Arg_Shifter& arg_shifter); + + /// = Accessors + const ACE_CString& name (void); + const ACE_CString& poa (void); + + /// = inteface Job method implementation. + virtual void work (CORBA::ULong work, + CORBA::Short importance + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + virtual void post_work (int guid, + int importance); + + void dump_stats (void); + + virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC (( + CORBA::SystemException + )); + + int guid (void); + + protected: + /// The name of this Job + ACE_CString job_name_; + + /// The name of the POA that we live in. + ACE_CString POA_name_; + + DT_Creator* dt_creator_; + + Task_Stats *task_stats_; + + int guid_; +}; + +#endif /* JOB_I_H */ |