summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-06-16 16:09:36 +0000
committeryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-06-16 16:09:36 +0000
commit772cdbff7e5d748c8621347626206ada9132aadb (patch)
treebe9ffb0b3116c5a507c883b765ee0774967613f6
parent8b8ec36db646e66b34648148b5406af94fd1d895 (diff)
downloadATCD-772cdbff7e5d748c8621347626206ada9132aadb.tar.gz
*** empty log message ***
-rw-r--r--TAO/examples/RTScheduling/DT_Creator.cpp711
-rw-r--r--TAO/examples/RTScheduling/DT_Creator.h66
-rw-r--r--TAO/examples/RTScheduling/Job.idl11
-rw-r--r--TAO/examples/RTScheduling/Job_i.cpp180
-rw-r--r--TAO/examples/RTScheduling/Job_i.h76
5 files changed, 917 insertions, 127 deletions
diff --git a/TAO/examples/RTScheduling/DT_Creator.cpp b/TAO/examples/RTScheduling/DT_Creator.cpp
index f98dbedc098..e58dfc2ee10 100644
--- a/TAO/examples/RTScheduling/DT_Creator.cpp
+++ b/TAO/examples/RTScheduling/DT_Creator.cpp
@@ -1,155 +1,557 @@
//$Id$
+
#include "DT_Creator.h"
-#include "ace/Arg_Shifter.h"
#include "Thread_Task.h"
#include "tao/ORB_Core.h"
#include "Task_Stats.h"
#include "ace/High_Res_Timer.h"
#include "Task.h"
+#include "DT_Creator.h"
+#include "tao/RTScheduling/Current.h"
+
+
+//long guid_counter;
+
+int
+DT_Creator::dt_task_init (ACE_Arg_Shifter& arg_shifter)
+{
+ static int dt_index = 0;
+ int start_time;
+ int load;
+ int iter;
+ int importance;
+ char *job_name = 0;
+ int dist = 0;
+ const ACE_TCHAR* current_arg = 0;
+ if (arg_shifter.cur_arg_strncasecmp ("-Importance") == 0)
+ {
+ arg_shifter.consume_arg ();
+ current_arg = arg_shifter.get_current ();
+ importance = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ if ((current_arg = arg_shifter.get_the_parameter ("-Start_Time")))
+ {
+ start_time = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ if ((current_arg = arg_shifter.get_the_parameter ("-Iter")))
+ {
+ iter = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ if ((current_arg = arg_shifter.get_the_parameter ("-Load")))
+ {
+ load = ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ if ((current_arg = arg_shifter.get_the_parameter ("-JobName")))
+ {
+ job_name = (char *)current_arg;
+ dist = 1;
+ arg_shifter.consume_arg ();
+ }
+ dt_list_ [dt_index++] = this->create_thr_task (importance,
+ start_time,
+ load,
+ iter,
+ dist,
+ job_name);
+ ACE_CHECK;
+
+ return 0;
+
+}
int log_index = 0;
int
DT_Creator::init (int argc, char *argv [])
{
+ gsf_ = ACE_High_Res_Timer::global_scale_factor ();
state_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>;
shutdown_lock_ = new ACE_Lock_Adapter <TAO_SYNCH_MUTEX>;
active_dt_count_ = 0;
- ACE_NEW_RETURN (log, char*[BUFSIZ],-1);
+ active_job_count_ = 0;
+ ACE_NEW_RETURN (log, char*[BUFSIZ * 100],-1);
ACE_Arg_Shifter arg_shifter (argc, argv);
const ACE_TCHAR* current_arg = 0;
-
- dt_count_ = 0;
- int dt_index = 0;
- int start_time;
- int load;
- int importance;
- int total_load = 0;
+ dt_count_ = 0;
+ poa_count_ = 0;
+ int poa_count = 0;
+ job_count_ = 0;
+ int job_count = 0;
while (arg_shifter.is_anything_left ())
{
- if ((current_arg = arg_shifter.get_the_parameter ("-DT_Count")))
+ if ((current_arg = arg_shifter.get_the_parameter ("-GuidSeed")))
{
- dt_count_ = ACE_OS::atoi (current_arg);
+ guid_counter = (long) ACE_OS::atoi (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-DT_Count")))
+ {
+ dt_count_ = ACE_OS::atoi (current_arg);
ACE_NEW_RETURN (dt_list_, Thread_Task*[dt_count_], -1);
active_dt_count_ = dt_count_;
arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-POA_Count")))
+ {
+ poa_count_ = ACE_OS::atoi (current_arg);
+ ACE_NEW_RETURN (poa_list_, POA_Holder*[poa_count_], -1);
+ arg_shifter.consume_arg ();
}
- if (arg_shifter.cur_arg_strncasecmp ("-DT_Task") == 0)
+ else if ((current_arg = arg_shifter.get_the_parameter ("-JOB_Count")))
+ {
+ job_count_ = ACE_OS::atoi (current_arg);
+ active_job_count_ = job_count_;
+ ACE_NEW_RETURN (job_list_, Job_i*[job_count_], -1);
+ arg_shifter.consume_arg ();
+ }
+ else if (arg_shifter.cur_arg_strncasecmp ("-DT_Task") == 0)
+ {
+ arg_shifter.consume_arg ();
+ dt_task_init (arg_shifter
+ ACE_ENV_ARG_PARAMETER);
+ }
+ else if (arg_shifter.cur_arg_strncasecmp ("-POA") == 0)
{
arg_shifter.consume_arg ();
-
- if (arg_shifter.cur_arg_strncasecmp ("-Importance") == 0)
- {
- arg_shifter.consume_arg ();
- current_arg = arg_shifter.get_current ();
- importance = ACE_OS::atoi (current_arg);
- arg_shifter.consume_arg ();
- }
- if ((current_arg = arg_shifter.get_the_parameter ("-Start_Time")))
- {
- start_time = ACE_OS::atoi (current_arg);
- arg_shifter.consume_arg ();
- }
- if ((current_arg = arg_shifter.get_the_parameter ("-Load")))
- {
- load = ACE_OS::atoi (current_arg);
- total_load += load;
- arg_shifter.consume_arg ();
- }
- dt_list_ [dt_index++] = this->create_thr_task (importance,
- start_time,
- load);
+ POA_Holder *poa_holder;
+
+ ACE_NEW_RETURN (poa_holder, POA_Holder (), -1);
+
+ if (poa_holder->init (arg_shifter) == -1)
+ {
+ delete poa_holder;
+ return -1;
+ }
+
+ this->poa_list_[poa_count++] = poa_holder;
+ }
+ else if (arg_shifter.cur_arg_strncasecmp ("-Job") == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ Job_i *job;
+
+ ACE_NEW_RETURN (job, Job_i (this), -1);
+
+ if (job->init (arg_shifter) == -1)
+ return -1;
+
+ this->job_list_[job_count++] = job;
+
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-OutFile")))
+ {
+ file_name_ = CORBA::string_dup (current_arg);
+ arg_shifter.consume_arg ();
+ }
+ else if ((current_arg = arg_shifter.get_the_parameter ("-LogFile")))
+ {
+ log_file_name_ = CORBA::string_dup (current_arg);
+ arg_shifter.consume_arg ();
}
+ else
+ {
+ arg_shifter.ignore_arg ();
+ }
}
-
- TASK_STATS::instance ()->init (total_load);
-
+
return 0;
-
}
-void
-DT_Creator::create_distributable_threads (CORBA::ORB_ptr orb,
- RTScheduling::Current_ptr current
- ACE_ENV_ARG_DECL)
+void
+DT_Creator::register_synch_obj (ACE_ENV_SINGLE_ARG_DECL)
{
- ACE_NEW (barrier_,
- ACE_Barrier (this->dt_count_ + 1));
-
- orb_ = CORBA::ORB::_duplicate (orb);
-
- current_ = RTScheduling::Current::_duplicate (current);
-
- long flags;
- flags = THR_NEW_LWP | THR_JOINABLE;
- flags |=
- orb_->orb_core ()->orb_params ()->scope_policy () |
- orb_->orb_core ()->orb_params ()->sched_policy ();
-
- CORBA::Policy_var sched_param;
- sched_param = CORBA::Policy::_duplicate (this->sched_param (100));
- const char * name = 0;
- CORBA::Policy_ptr implicit_sched_param = 0;
- current_->begin_scheduling_segment (name,
- sched_param.in (),
- implicit_sched_param
- ACE_ENV_ARG_PARAMETER);
+ CosNaming::Name name (1);
+ name.length (1);
+
+ CosNaming::NamingContext_var synch_context;
+
+ ACE_TRY
+ {
+ // Try binding the sender context in the NS
+ name [0].id =
+ CORBA::string_dup ("Synch");
+
+ synch_context = this->naming_->bind_new_context (name
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ //
+ // We reach here if there was no exception raised in
+ // <bind_new_context>. We then create a receiver context.
+ //
+
+ }
+ ACE_CATCH (CosNaming::NamingContext::AlreadyBound, al_ex)
+ {
+ //
+ // The synch context already exists, probably created by the
+ // receiver(s).
+ //
+
+ // Get the synch context.
+ name [0].id =
+ CORBA::string_dup ("Synch");
+
+ CORBA::Object_var object =
+ this->naming_->resolve (name
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ synch_context = CosNaming::NamingContext::_narrow (object.in ());
+
+ }
+ ACE_ENDTRY;
+ ACE_CHECK;
+
+ ACE_CString synch_name ("Synch");
+ ACE_Time_Value timestamp = ACE_OS::gettimeofday ();
+
+ char buf [BUFSIZ];
+ synch_name += CORBA::string_dup (ACE_OS::itoa (timestamp.sec (), buf, 10));
+
+ name [0].id =
+ CORBA::string_dup (synch_name.c_str ());
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Synch Name %s\n",
+ synch_name.c_str ()));
+
+ ACE_NEW (synch_,
+ Synch_i);
+
+ Synch_var synch = synch_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // Register the synch object with the Synch context.
+ synch_context->rebind (name,
+ synch.in ()
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
-
- ACE_NEW (base_time_,
- ACE_Time_Value (ACE_OS::gettimeofday ()));
-
- for (int i = 0; i < this->dt_count_; i++)
+
+}
+
+
+int
+DT_Creator::activate_root_poa (ACE_ENV_SINGLE_ARG_PARAMETER)
+{
+ CORBA::Object_var object =
+ orb_->resolve_initial_references ("RootPOA"
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ root_poa_ =
+ PortableServer::POA::_narrow (object.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ PortableServer::POAManager_var poa_manager =
+ root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ return 0;
+}
+
+void
+DT_Creator::activate_poa_list (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "DT_Creator::activate_poa_list\n"));
+
+ if (poa_count_ > 0)
{
- ACE_Time_Value now (ACE_OS::gettimeofday ());
-
- ACE_Time_Value elapsed_time = now - *base_time_;
-
- char buf [BUFSIZ];
- ACE_OS::sprintf (buf, "elapsed time = %d\n now = %d\n base_time_ = %d\n",
- (int) elapsed_time.sec (),
- (int) now.sec (),
- (int) base_time_->sec());
-
- log [log_index++] = ACE_OS::strdup (buf) ;
-
- ACE_hthread_t curr_thr;
- ACE_Thread::self (curr_thr);
-
- if (dt_list_ [i]->start_time () != 0 && (elapsed_time.sec () < dt_list_[i]->start_time ()))
- {
- int suspension_time = dt_list_[i]->start_time () - elapsed_time.sec ();
- ACE_OS::sprintf (buf,"suspension_tome = %d\n",
- suspension_time);
- log [log_index++] = ACE_OS::strdup (buf);
- yield (suspension_time,
- dt_list_[i]);
- }
-
- sched_param = CORBA::Policy::_duplicate (this->sched_param (dt_list_ [i]->importance ()));
- dt_list_ [i]->activate_task (current,
- sched_param.in (),
- flags,
- base_time_,
- barrier_
- ACE_ENV_ARG_PARAMETER);
+
+ CORBA::Object_var object =
+ orb_->resolve_initial_references ("RTORB"
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ this->rt_orb_ =
+ RTCORBA::RTORB::_narrow (object.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+
+ }
+
+ for (int i = 0; i < poa_count_; ++i)
+ {
+ poa_list_[i]->activate (this->rt_orb_.in(), this->root_poa_.in ()
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
-
}
-
-
- while (active_dt_count_ > 0)
+}
+
+void
+DT_Creator::activate_job_list (ACE_ENV_SINGLE_ARG_DECL)
+{
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "DT_Creator::activate_job_list\n"));
+
+ Job_i* job;
+
+ for (int i = 0; i < job_count_; ++i)
+ {
+ job = job_list_[i];
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "Activating job:%s\n", job->name ().c_str ()));
+
+ // find your poa
+ PortableServer::POA_var host_poa =
+ root_poa_->find_POA (job->poa ().c_str (), 0
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ PortableServer::ServantBase_var servant_var (job);
+
+ // Register with poa.
+ PortableServer::ObjectId_var id;
+
+ id = host_poa->activate_object (job
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ CORBA::Object_var server =
+ host_poa->id_to_reference (id.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ CORBA::String_var ior =
+ orb_->object_to_string (server.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ const ACE_CString &job_name = job->name ();
+
+ CosNaming::Name_var name =
+ this->naming_->to_name (job_name.c_str ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ this->naming_->rebind (name.in (),
+ server.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Activated Job List\n"));
+
+// ACE_NEW (base_time_,
+// ACE_Time_Value (ACE_OS::gettimeofday ()));
+
+ //base_hr_time_ = ACE_OS::gethrtime ();
+
+ // active_job_count_++;
+
+ } /* while */
+
+}
+
+void
+DT_Creator::activate_schedule (ACE_ENV_SINGLE_ARG_DECL)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Activating schedule, task count = %d\n",
+ dt_count_));
+
+ Thread_Task* task;
+
+ for (int i = 0; i < dt_count_; ++i)
{
- yield (1,0);
+ task = dt_list_[i];
+
+ if (task->dist ())
+ {
+ // resolve the object from the naming service
+ CosNaming::Name name (1);
+ name.length (1);
+ name[0].id = CORBA::string_dup (task->job ());
+
+ CORBA::Object_var obj =
+ this->naming_->resolve (name ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ Job_var job = Job::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ // if (TAO_debug_level > 0)
+ // {
+ // Check that the object is configured with some
+ // PriorityModelPolicy.
+ CORBA::Policy_var policy =
+ job->_get_policy (RTCORBA::PRIORITY_MODEL_POLICY_TYPE
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ RTCORBA::PriorityModelPolicy_var priority_policy =
+ RTCORBA::PriorityModelPolicy::_narrow (policy.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ if (CORBA::is_nil (priority_policy.in ()))
+ ACE_DEBUG ((LM_DEBUG,
+ "ERROR: Priority Model Policy not exposed!\n"));
+ else
+ {
+ /*
+ RTCORBA::PriorityModel priority_model =
+ priority_policy->priority_model (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+
+ if (priority_model == RTCORBA::CLIENT_PROPAGATED)
+ ACE_DEBUG ((LM_DEBUG,
+ "%s priority_model = RTCORBA::CLIENT_PROPAGATED\n", task->job ()));
+ else
+ ACE_DEBUG ((LM_DEBUG,
+ "%s priority_model = RTCORBA::SERVER_DECLARED\n", task->job ()));
+ */
+ }
+ //} /* if (TAO_debug_level > 0) */
+
+ task->job (job.in ());
+ }
}
- current_->end_scheduling_segment (name
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
+ if (TAO_debug_level > 0 && dt_count_ > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Activated schedule, task count = %d\n",
+ dt_count_));
+
+}
+
+int
+DT_Creator::resolve_naming_service (ACE_ENV_SINGLE_ARG_DECL)
+{
+ CORBA::Object_var naming_obj =
+ this->orb_->resolve_initial_references ("NameService"
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ // Need to check return value for errors.
+ if (CORBA::is_nil (naming_obj.in ()))
+ ACE_ERROR_RETURN ((LM_ERROR,
+ " (%P|%t) Unable to resolve the Naming Service.\n"),
+ -1);
+
+ this->naming_ =
+ CosNaming::NamingContextExt::_narrow (naming_obj.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
+ //@@tmp hack, otherwise crashes on exit!..??
+ CosNaming::NamingContextExt::_duplicate (this->naming_.in());
+ return 0;
+}
+
+void
+DT_Creator::create_distributable_threads (RTScheduling::Current_ptr current
+ ACE_ENV_ARG_DECL)
+{
+
+ // if (dt_count_ > 0)
+ // {
+ // ACE_NEW (barrier_,
+ // ACE_Barrier (this->dt_count_ + 1));
+
+
+ current_ = RTScheduling::Current::_duplicate (current);
+
+ long flags;
+ flags = THR_NEW_LWP | THR_JOINABLE;
+ flags |=
+ orb_->orb_core ()->orb_params ()->scope_policy () |
+ orb_->orb_core ()->orb_params ()->sched_policy ();
+
+ CORBA::Policy_var sched_param;
+ sched_param = CORBA::Policy::_duplicate (this->sched_param (100));
+ const char * name = 0;
+ //CORBA::Policy_ptr implicit_sched_param = sched_param.in ();
+ current_->begin_scheduling_segment (name,
+ sched_param.in (),
+ sched_param.in ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ while (!this->synch ()->synched ())
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "Waiting to Synch\n"));
+
+ this->orb_->perform_work (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+
+ ACE_NEW (base_time_,
+ ACE_Time_Value (*(this->synch ()->base_time ())));
+
+ base_hr_time_ = ACE_OS::gethrtime ();
+
+ for (int i = 0; i < this->dt_count_; i++)
+ {
+ ACE_Time_Value now (ACE_OS::gettimeofday ());
+
+ ACE_Time_Value elapsed_time = now - *base_time_;
+
+ char buf [BUFSIZ];
+ ACE_OS::sprintf (buf, "elapsed time = %d\n now = %d\n base_time_ = %d\n",
+ (int) elapsed_time.sec (),
+ (int) now.sec (),
+ (int) base_time_->sec());
+
+ log [log_index++] = ACE_OS::strdup (buf) ;
+
+ ACE_hthread_t curr_thr;
+ ACE_Thread::self (curr_thr);
+
+ if (dt_list_ [i]->start_time () != 0 && (elapsed_time.sec () < dt_list_[i]->start_time ()))
+ {
+ int suspension_time = dt_list_[i]->start_time () - elapsed_time.sec ();
+ ACE_OS::sprintf (buf,"suspension_tome = %d\n",
+ suspension_time);
+ log [log_index++] = ACE_OS::strdup (buf);
+ yield (suspension_time,
+ dt_list_[i]);
+ }
+
+ sched_param = CORBA::Policy::_duplicate (this->sched_param (dt_list_ [i]->importance ()));
+ dt_list_ [i]->activate_task (current,
+ sched_param.in (),
+ flags,
+ base_time_,
+ base_hr_time_,
+ barrier_
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ }
+
+ /*
+ while (active_dt_count_ > 0 || active_job_count_ > 0)
+ {
+ yield(1,0);
+ }
+ */
+
+ current_->end_scheduling_segment (name
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ // }
}
void
@@ -160,51 +562,81 @@ DT_Creator::dt_ended (void)
--active_dt_count_;
//ACE_DEBUG ((LM_DEBUG, "Active job count = %d\n",active_dt_count_));
char buf [BUFSIZ];
- ACE_OS::sprintf (buf,"Active job count = %d\n",active_dt_count_);
+ ACE_OS::sprintf (buf,"Active dt count = %d\n",active_dt_count_);
log [log_index++] = ACE_OS::strdup (buf);
}
this->check_ifexit ();
}
void
+DT_Creator::job_ended (void)
+{
+ {
+ ACE_GUARD (ACE_Lock, ace_mon, *state_lock_);
+ --active_job_count_;
+ char buf [BUFSIZ];
+ ACE_OS::sprintf (buf,"Active job count = %d\n",active_job_count_);
+ log [log_index++] = ACE_OS::strdup (buf);
+ }
+
+ this->check_ifexit ();
+}
+
+void
DT_Creator::check_ifexit (void)
{
static int shutdown = 0;
{
- ACE_GUARD (ACE_Lock, ace_mon, *shutdown_lock_);
+ ACE_GUARD (ACE_Lock, ace_mon, *shutdown_lock_);
if (!shutdown)
{
// All tasks have finished and all jobs have been shutdown.
- if (active_dt_count_ == 0)
+ if (active_dt_count_ == 0 && active_job_count_ == 0)
{
-
+
ACE_DEBUG ((LM_DEBUG, "Shutdown in progress ...\n"));
-
- TASK_STATS::instance ()->dump_samples ("schedule.dat",
+
+ /*
+ for (int i = 0; i < dt_count_; i++)
+ {
+ dt_list_[i]->dump_stats ();
+ }
+
+ for (int i = 0; i < job_count_; i ++)
+ {
+ job_list_[i]->dump_stats ();
+ }
+ */
+ TASK_STATS::instance ()->dump_samples (file_name_,
"#Schedule Output",
ACE_High_Res_Timer::global_scale_factor ());
shutdown = 1;
-
- FILE* log_file = ACE_OS::fopen ("log_file", "w");
+
+ FILE* log_file = ACE_OS::fopen (log_file_name_, "w");
if (log_file != NULL)
{
// first dump what the caller has to say.
ACE_OS::fprintf (log_file, "Log File\n");
-
+
for (int i = 0; i < log_index; i++)
{
ACE_OS::fprintf (log_file, "%s\n", log [i]);
}
-
+
ACE_OS::fclose (log_file);
}
+ ACE_DEBUG ((LM_DEBUG,
+ "Log File Ready\n"));
+
+ if (!orb_->work_pending ())
+ {
+ orb_->destroy ();
+ }
- // shutdown the ORB
- orb_->shutdown (0);
}
}
}
@@ -233,3 +665,40 @@ DT_Creator::orb (void)
{
return this->orb_.in ();
}
+
+void
+DT_Creator::orb (CORBA::ORB_ptr orb)
+{
+ this->orb_ = CORBA::ORB::_duplicate (orb);
+}
+
+ACE_Time_Value*
+DT_Creator::base_time (void)
+{
+ return this->base_time_;
+}
+
+void
+DT_Creator::base_time (ACE_Time_Value* base_time)
+{
+ this->base_time_ = base_time;
+}
+
+ACE_hrtime_t
+DT_Creator::base_hr_time (void)
+{
+ return this->base_hr_time_;
+}
+
+RTScheduling::Current_ptr
+DT_Creator::current (void)
+{
+ return current_.in ();
+}
+
+
+Synch_i*
+DT_Creator::synch (void)
+{
+ return this->synch_;
+}
diff --git a/TAO/examples/RTScheduling/DT_Creator.h b/TAO/examples/RTScheduling/DT_Creator.h
index 8cf276d1446..9528eb51718 100644
--- a/TAO/examples/RTScheduling/DT_Creator.h
+++ b/TAO/examples/RTScheduling/DT_Creator.h
@@ -2,15 +2,22 @@
#ifndef DT_CREATOR_H
#define DT_CREATOR_H
+#include "orbsvcs/orbsvcs/CosNamingC.h"
#include "ace/Service_Config.h"
#include "ace/Service_Object.h"
#include "tao/RTScheduling/RTScheduler.h"
-
+#include "ace/Arg_Shifter.h"
+#include "POA_Holder.h"
+#include "Job_i.h"
+#include "Synch_i.h"
class Thread_Task;
class Task;
+
typedef Thread_Task **DT_LIST;
+typedef POA_Holder **POA_LIST;
+typedef Job_i **JOB_LIST;
class DT_Creator : public ACE_Service_Object
{
@@ -19,11 +26,17 @@ class DT_Creator : public ACE_Service_Object
~DT_Creator (void);
int init (int argc, char *argv []);
-
- virtual void create_distributable_threads (CORBA::ORB_ptr orb,
- RTScheduling::Current_ptr current
+
+ int dt_task_init (ACE_Arg_Shifter& arg_shifter
+ ACE_ENV_ARG_DECL_WITH_DEFAULTS);
+
+ virtual void create_distributable_threads (RTScheduling::Current_ptr current
ACE_ENV_ARG_DECL_WITH_DEFAULTS);
-
+
+ void activate_poa_list (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
+ void activate_job_list (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
+ void activate_schedule (ACE_ENV_SINGLE_ARG_DECL_WITH_DEFAULTS);
+
virtual void yield (int suspend_time,
Thread_Task* task) = 0;
@@ -31,32 +44,73 @@ class DT_Creator : public ACE_Service_Object
virtual Thread_Task* create_thr_task (int importance,
int start_time,
- int load) = 0;
+ int load,
+ int iter,
+ int dist,
+ char *job_name) = 0;
// virtual Task* task (void) = 0;
+
+ /// Resolve the naming service.
+ int resolve_naming_service (ACE_ENV_SINGLE_ARG_DECL);
int dt_count (void);
void dt_ended (void);
+ void job_ended (void);
void check_ifexit (void);
void log_msg (char* msg);
+ void orb (CORBA::ORB_ptr);
CORBA::ORB_ptr orb (void);
+ ACE_Time_Value* base_time (void);
+ void base_time (ACE_Time_Value*);
+
+ ACE_hrtime_t base_hr_time (void);
+
+ virtual int total_load (void) = 0;
+
+ RTScheduling::Current_ptr current (void);
+
+ Synch_i* synch (void);
+
+ void register_synch_obj (ACE_ENV_SINGLE_ARG_PARAMETER);
+
+ int activate_root_poa (ACE_ENV_SINGLE_ARG_PARAMETER);
+
protected:
+
+
DT_LIST dt_list_;
+ POA_LIST poa_list_;
+ JOB_LIST job_list_;
//Fixed_Priority_Scheduler* scheduler_;
ACE_Barrier* barrier_;
int dt_count_;
+ int poa_count_;
+ int job_count_;
CORBA::ORB_var orb_;
/// Mutex to serialize access to our internal state.
ACE_Lock* state_lock_;
ACE_Lock* shutdown_lock_;
int active_dt_count_;
+ int active_job_count_;
char** log;
ACE_Time_Value* base_time_;
+ ACE_hrtime_t base_hr_time_;
RTScheduling::Current_var current_;
+ /// RT ORB
+ RTCORBA::RTORB_var rt_orb_;
+ /// Reference to the root poa.
+ PortableServer::POA_var root_poa_;
+ /// A naming context.
+ CosNaming::NamingContextExt_var naming_;
+ char* file_name_;
+ char* log_file_name_;
+ ACE_UINT32 gsf_;
+ Synch_i* synch_;
};
diff --git a/TAO/examples/RTScheduling/Job.idl b/TAO/examples/RTScheduling/Job.idl
new file mode 100644
index 00000000000..f1ba4b5cff2
--- /dev/null
+++ b/TAO/examples/RTScheduling/Job.idl
@@ -0,0 +1,11 @@
+//
+// $Id$
+//
+
+interface Job
+{
+ void work (in unsigned long work,
+ in short importance);
+
+ void shutdown ();
+};
diff --git a/TAO/examples/RTScheduling/Job_i.cpp b/TAO/examples/RTScheduling/Job_i.cpp
new file mode 100644
index 00000000000..1ae94e786e0
--- /dev/null
+++ b/TAO/examples/RTScheduling/Job_i.cpp
@@ -0,0 +1,180 @@
+//$Id$
+#include "Job_i.h"
+
+#include "tao/debug.h"
+#include "ace/Arg_Shifter.h"
+#include "DT_Creator.h"
+#include "Task_Stats.h"
+#include "ace/High_Res_Timer.h"
+
+//#include "Activity.h"
+
+Job_i::Job_i (DT_Creator* dt_creator)
+ : dt_creator_ (dt_creator),
+ guid_ (0)
+{
+ // create the stat object.
+ ACE_NEW (task_stats_, Task_Stats);
+ task_stats_->init (100);
+
+}
+
+const ACE_CString&
+Job_i::name (void)
+{
+ return job_name_;
+}
+
+const ACE_CString&
+Job_i::poa (void)
+{
+ return POA_name_;
+}
+
+int
+Job_i::init (ACE_Arg_Shifter& arg_shifter)
+{
+ job_name_ = arg_shifter.get_current (); // Read the name of the Job
+ arg_shifter.consume_arg ();
+
+ POA_name_ = arg_shifter.get_current (); // Read the name of the POA
+ arg_shifter.consume_arg ();
+
+ return 0;
+}
+
+void
+Job_i::work (CORBA::ULong work,
+ CORBA::Short importance
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+
+ int priority;
+ ACE_hthread_t current;
+ ACE_Thread::self (current);
+ if (ACE_Thread::getprio (current, priority) == -1)
+ return;
+
+
+ static CORBA::ULong prime_number = 9619;
+ // ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "test_i::method: %d units of work\n",
+ work));
+
+ if (guid_ == 0)
+ ACE_OS::memcpy (&guid_,
+ dt_creator_->current ()->id (ACE_ENV_SINGLE_ARG_PARAMETER)->get_buffer (),
+ sizeof (dt_creator_->current ()->id (ACE_ENV_SINGLE_ARG_PARAMETER)->length ()));
+
+
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "%t Guid is %d, Importance is %d\n",
+ guid_,
+ importance));
+
+ char msg [BUFSIZ];
+ ACE_OS::sprintf (msg,"Thread Priority is %d, Guid is %d\n",priority, guid_);
+ dt_creator_->log_msg (msg);
+
+ // CORBA::Policy_ptr sched_param;
+ // CORBA::Policy_var sched_param;
+ // sched_param = CORBA::Policy::_duplicate (dt_creator_->sched_param (importance));
+ /*
+ const char * name = 0;
+ CORBA::Policy_ptr implicit_sched_param = 0;
+ dt_creator_->current ()->update_scheduling_segment (name,
+ sched_param,
+ sched_param
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ */
+
+ for (; work != 0; work--)
+ {
+ // ACE_hrtime_t now = ACE_OS::gethrtime ();
+
+ ACE_Time_Value run_time = ACE_OS::gettimeofday () - *(dt_creator_->base_time ());
+ TASK_STATS::instance ()->sample (ACE_UINT64 (run_time.sec ()),
+ guid_);
+
+ ACE_Time_Value count_down_time (1);
+ ACE_Countdown_Time count_down (&count_down_time);
+
+ while (count_down_time > ACE_Time_Value::zero)
+ {
+ ACE::is_prime (prime_number,
+ 2,
+ prime_number / 2);
+ count_down.update ();
+ }
+
+ // convert to microseconds
+ // #if !defined ACE_LACKS_LONGLONG_T
+
+ // ACE_UINT32 elapsed_microseconds = ACE_UINT32((now - dt_creator_->base_hr_time ()) / gsf);
+
+ // #else /* ! ACE_LACKS_LONGLONG_T */
+
+ // ACE_UINT32 elapsed_microseconds = (now - dt_creator_->base_hr_time ()) / gsf;
+
+ // #endif /* ! ACE_LACKS_LONGLONG_T */
+
+ // #if defined (ACE_WIN32)
+ // elapsed_microseconds*=1000; // convert to uSec on Win32
+ // #endif /* ACE_WIN32 */
+
+ // TASK_STATS::instance ()->sample (elapsed_microseconds,
+ // guid_);
+
+
+ }
+
+ // task_stats_->sample (ACE_UINT64 (run_time.sec ()),
+ // guid_);
+
+}
+
+void
+Job_i::post_work (int /*guid*/,
+ int /*importance*/)
+{
+}
+
+int
+Job_i::guid (void)
+{
+ return this->guid_;
+}
+
+void
+Job_i::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ dt_creator_->job_ended ();
+}
+
+void
+Job_i::dump_stats (void)
+{
+ char fname [BUFSIZ];
+ ACE_OS::sprintf (fname, "Job_%d.dat",guid_);
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "File name %s\n",
+ fname));
+
+
+ char msg [BUFSIZ];
+ ACE_OS::sprintf (msg, "#Schedule Output for DT %d", guid_);
+
+ task_stats_->dump_samples (fname,
+ msg,
+ ACE_High_Res_Timer::global_scale_factor ());
+
+}
diff --git a/TAO/examples/RTScheduling/Job_i.h b/TAO/examples/RTScheduling/Job_i.h
new file mode 100644
index 00000000000..940cd1b9b51
--- /dev/null
+++ b/TAO/examples/RTScheduling/Job_i.h
@@ -0,0 +1,76 @@
+/* -*- C++ -*- */
+//=============================================================================
+/**
+ * @file Job_i.h
+ *
+ * $Id$
+ *
+ * This file defines the servant for the Job.idl interface
+ *
+ * @author Pradeep Gore <pradeep@cs.wustl.edu>
+ */
+//=============================================================================
+#ifndef JOB_I_H
+#define JOB_I_H
+
+#include "JobS.h"
+#include "Task_Stats.h"
+
+class ACE_Arg_Shifter;
+class DT_Creator;
+
+/**
+ * @class Job_i
+ *
+ * @brief Implements a Job that performs some cpu bound work.
+ *
+ */
+class Job_i : public POA_Job, public virtual PortableServer::RefCountServantBase
+{
+ public:
+ /// Constructor
+ Job_i (DT_Creator* dt_creator);
+
+
+ /// Init the state of this object.
+ int init (ACE_Arg_Shifter& arg_shifter);
+
+ /// = Accessors
+ const ACE_CString& name (void);
+ const ACE_CString& poa (void);
+
+ /// = inteface Job method implementation.
+ virtual void work (CORBA::ULong work,
+ CORBA::Short importance
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ virtual void post_work (int guid,
+ int importance);
+
+ void dump_stats (void);
+
+ virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((
+ CORBA::SystemException
+ ));
+
+ int guid (void);
+
+ protected:
+ /// The name of this Job
+ ACE_CString job_name_;
+
+ /// The name of the POA that we live in.
+ ACE_CString POA_name_;
+
+ DT_Creator* dt_creator_;
+
+ Task_Stats *task_stats_;
+
+ int guid_;
+};
+
+#endif /* JOB_I_H */