diff options
Diffstat (limited to 'ACE/TAO/examples/Kokyu_dsrt_schedulers/muf_example/client.cpp')
-rw-r--r-- | ACE/TAO/examples/Kokyu_dsrt_schedulers/muf_example/client.cpp | 410 |
1 files changed, 410 insertions, 0 deletions
diff --git a/ACE/TAO/examples/Kokyu_dsrt_schedulers/muf_example/client.cpp b/ACE/TAO/examples/Kokyu_dsrt_schedulers/muf_example/client.cpp new file mode 100644 index 00000000000..e5b449da7e8 --- /dev/null +++ b/ACE/TAO/examples/Kokyu_dsrt_schedulers/muf_example/client.cpp @@ -0,0 +1,410 @@ +// $Id$ + +#include "ace/Get_Opt.h" +#include "ace/Task.h" +#include "ace/High_Res_Timer.h" +#include "ace/OS_NS_unistd.h" +#include "tao/RTScheduling/RTScheduler_Manager.h" +#include "testC.h" +#include "MUF_Scheduler.h" +#include "orbsvcs/Time_Utilities.h" + +ACE_RCSID(MT_Server, client, "$Id$") + +const ACE_TCHAR *ior = ACE_TEXT("file://test.ior"); +int niterations = 5; +int do_shutdown = 0; +int enable_dynamic_scheduling = 0; +int enable_yield = 1; + +class Worker : public ACE_Task_Base +{ + // = TITLE + // Run a server thread + // + // = DESCRIPTION + // Use the ACE_Task_Base class to run server threads + // +public: + Worker (CORBA::ORB_ptr orb, + Simple_Server_ptr server_ptr, + RTScheduling::Current_ptr current, + MUF_Scheduler* scheduler, + TimeBase::TimeT deadline, + TimeBase::TimeT estimated_initial_execution_time, + long criticality, + CORBA::Long server_load); + // int sleep_time); + // ctor + + virtual int svc (void); + // The thread entry point. + +private: + CORBA::ORB_var orb_; + // The orb + + Simple_Server_var server_; + RTScheduling::Current_var scheduler_current_; + MUF_Scheduler* scheduler_; + TimeBase::TimeT deadline_; + TimeBase::TimeT estimated_initial_execution_time_; + long criticality_; + CORBA::Long server_load_; + int sleep_time_; +}; + +int +parse_args (int argc, ACE_TCHAR *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("xk:i:ds")); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'x': + do_shutdown = 1; + break; + + case 'k': + ior = get_opts.opt_arg (); + break; + + case 'i': + niterations = ACE_OS::atoi (get_opts.opt_arg ()); + break; + + case 'd': + enable_dynamic_scheduling = 1; + break; + + case 's': + enable_yield = 0; + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-k <ior> " + "-i <niterations> " + "-d (enable dynamic scheduling)" + "-s (disable yield)" + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +int +ACE_TMAIN(int argc, ACE_TCHAR *argv[]) +{ + MUF_Scheduler* scheduler=0; + RTScheduling::Current_var current; + int prio; + int max_prio; + ACE_Sched_Params::Policy sched_policy = ACE_SCHED_RR; + int sched_scope = ACE_SCOPE_THREAD; + long flags; + + if (sched_policy == ACE_SCHED_RR) + flags = THR_NEW_LWP | THR_BOUND | THR_JOINABLE | THR_SCHED_RR; + else + flags = THR_NEW_LWP | THR_BOUND | THR_JOINABLE | THR_SCHED_FIFO; + + ACE_hthread_t main_thr_handle; + ACE_Thread::self (main_thr_handle); + + max_prio = ACE_Sched_Params::priority_max (sched_policy, + sched_scope); + + //FUZZ: disable check_for_lack_ACE_OS + ACE_Sched_Params sched_params (sched_policy, max_prio); + //FUZZ: enable check_for_lack_ACE_OS + + ACE_OS::sched_params (sched_params); + + if (ACE_Thread::getprio (main_thr_handle, prio) == -1) + { + if (errno == ENOTSUP) + { + ACE_ERROR((LM_ERROR, + ACE_TEXT ("getprio not supported\n") + )); + } + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n") + ACE_TEXT ("thr_getprio failed"))); + } + } + + ACE_DEBUG ((LM_DEBUG, "(%t): main thread prio is %d\n", prio)); + + try + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv); + + if (parse_args (argc, argv) != 0) + return 1; + + CORBA::Object_var object = + orb->string_to_object (ior); + + Simple_Server_var server = + Simple_Server::_narrow (object.in ()); + + if (CORBA::is_nil (server.in ())) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Object reference <%s> is nil.\n", + ior), + 1); + } + + if (enable_dynamic_scheduling) + { + ACE_DEBUG ((LM_DEBUG, "Dyn Sched enabled\n")); + CORBA::Object_ptr manager_obj = + orb->resolve_initial_references ("RTSchedulerManager"); + + TAO_RTScheduler_Manager_var manager = + TAO_RTScheduler_Manager::_narrow (manager_obj); + + Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type; + if (enable_yield) + { + disp_impl_type = Kokyu::DSRT_CV_BASED; + } + else + { + disp_impl_type = Kokyu::DSRT_OS_BASED; + } + + ACE_NEW_RETURN (scheduler, + MUF_Scheduler (orb.in (), + disp_impl_type, + sched_policy, + sched_scope), -1); + + manager->rtscheduler (scheduler); + + CORBA::Object_var object = + orb->resolve_initial_references ("RTScheduler_Current"); + + current = + RTScheduling::Current::_narrow (object.in ()); + + } + + TimeBase::TimeT deadline; + TimeBase::TimeT exec_time; + int criticality=0; + + ORBSVCS_Time::Time_Value_to_TimeT (deadline, + ACE_OS::gettimeofday () + + ACE_Time_Value (50,0) ); + + ORBSVCS_Time::Time_Value_to_TimeT (exec_time, + ACE_OS::gettimeofday () + + ACE_Time_Value (10,0) ); + + Worker worker1 (orb.in (), + server.in (), + current.in (), + scheduler, + deadline, + exec_time, + criticality, + 30); + + if (worker1.activate (flags, 1, 0, max_prio) != 0) + { + ACE_ERROR ((LM_ERROR, + "(%t|%T) cannot activate worker thread.\n")); + } + + ACE_OS::sleep(2); + + ORBSVCS_Time::Time_Value_to_TimeT (deadline, + ACE_OS::gettimeofday () + + ACE_Time_Value (30,0) ); + + ORBSVCS_Time::Time_Value_to_TimeT (exec_time, + ACE_OS::gettimeofday () + + ACE_Time_Value (10,0) ); + + criticality = 0; + Worker worker2 (orb.in (), + server.in (), + current.in (), + scheduler, + deadline, + exec_time, + criticality, + 10); + + if (worker2.activate (flags, 1, 0, max_prio) != 0) + { + ACE_ERROR ((LM_ERROR, + "(%t|%T) cannot activate scheduler thread in RT mode.\n")); + } + + ORBSVCS_Time::Time_Value_to_TimeT (deadline, + ACE_OS::gettimeofday () + + ACE_Time_Value (100,0) ); + + ORBSVCS_Time::Time_Value_to_TimeT (exec_time, + ACE_OS::gettimeofday () + + ACE_Time_Value (10,0) ); + criticality = 1; + + Worker worker3 (orb.in (), + server.in (), + current.in (), + scheduler, + deadline, + exec_time, + criticality, + 5); + if (worker3.activate (flags, 1, 0, max_prio) != 0) + { + ACE_ERROR ((LM_ERROR, + "(%t|%T) cannot activate scheduler thread in RT mode.\n")); + } + + worker1.wait (); + worker2.wait (); + worker3.wait (); + + ACE_DEBUG ((LM_DEBUG, + "(%t): wait for worker threads done in main thread\n")); + + if (do_shutdown) + { + if (enable_dynamic_scheduling) + { + MUF_Scheduling::SchedulingParameter sched_param; + sched_param.criticality = 0; + sched_param.deadline = 0; + sched_param.estimated_initial_execution_time = 0; + CORBA::Policy_var sched_param_policy = + scheduler->create_scheduling_parameter (sched_param); + CORBA::Policy_ptr implicit_sched_param = 0; + current->begin_scheduling_segment (0, + sched_param_policy.in (), + implicit_sched_param); + } + + ACE_DEBUG ((LM_DEBUG, "(%t): about to call server shutdown\n")); + server->shutdown (); + + ACE_DEBUG ((LM_DEBUG, "after shutdown call in main thread\n")); + + + if (enable_dynamic_scheduling) + { + current->end_scheduling_segment (0); + } + } + + scheduler->shutdown (); + ACE_DEBUG ((LM_DEBUG, "scheduler shutdown done\n")); + } + catch (const CORBA::Exception& ex) + { + ex._tao_print_exception ("Exception caught:"); + return 1; + } + + return 0; +} + +// **************************************************************** + +Worker::Worker (CORBA::ORB_ptr orb, + Simple_Server_ptr server_ptr, + RTScheduling::Current_ptr current, + MUF_Scheduler* scheduler, + TimeBase::TimeT deadline, + TimeBase::TimeT estimated_initial_execution_time, + long criticality, + CORBA::Long server_load) + // int sleep_time) + : orb_ (CORBA::ORB::_duplicate (orb)), + server_ (Simple_Server::_duplicate (server_ptr)), + scheduler_current_ (RTScheduling::Current::_duplicate (current)), + scheduler_ (scheduler), + deadline_ (deadline), + estimated_initial_execution_time_ ( estimated_initial_execution_time), + criticality_ (criticality), + server_load_ (server_load) + // sleep_time_ (sleep_time) +{ +} + +int +Worker::svc (void) +{ + const char * name = 0; + /* + ACE_DEBUG ((LM_DEBUG, "(%t|%T):about to sleep for %d sec\n", sleep_time_)); + ACE_OS::sleep (sleep_time_); + ACE_DEBUG ((LM_DEBUG, "(%t|%T):woke up from sleep for %d sec\n", sleep_time_)); + */ + ACE_hthread_t thr_handle; + ACE_Thread::self (thr_handle); + int prio; + + if (ACE_Thread::getprio (thr_handle, prio) == -1) + { + if (errno == ENOTSUP) + { + ACE_ERROR((LM_ERROR, + ACE_TEXT ("getprio not supported\n") + )); + } + else + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n") + ACE_TEXT ("thr_getprio failed"))); + } + } + + ACE_DEBUG ((LM_DEBUG, "(%t|%T) worker activated with prio %d\n", prio)); + + if (enable_dynamic_scheduling) + { + MUF_Scheduling::SchedulingParameter sched_param; + CORBA::Policy_var sched_param_policy; + sched_param.criticality = criticality_; + sched_param.deadline = deadline_; + sched_param.estimated_initial_execution_time = 0; + sched_param_policy = scheduler_->create_scheduling_parameter (sched_param); + CORBA::Policy_ptr implicit_sched_param = 0; + ACE_DEBUG ((LM_DEBUG, "(%t|%T):before begin_sched_segment\n")); + scheduler_current_->begin_scheduling_segment (name, + sched_param_policy.in (), + implicit_sched_param); + ACE_DEBUG ((LM_DEBUG, "(%t|%T):after begin_sched_segment\n")); + } + + ACE_DEBUG ((LM_DEBUG, "(%t|%T):about to make two way call\n")); + server_->test_method (server_load_); + ACE_DEBUG ((LM_DEBUG, "(%t|%T):two way call done\n")); + + if (enable_dynamic_scheduling) + { + scheduler_current_->end_scheduling_segment (name); + } + + ACE_DEBUG ((LM_DEBUG, "client worker thread (%t) done\n")); + + return 0; +} |