// $Id$ #include "ace/Get_Opt.h" #include "ace/Task.h" #include "ace/High_Res_Timer.h" #include "ace/OS_NS_unistd.h" #include "tao/RTScheduling/RTScheduler_Manager.h" #include "testC.h" #include "FP_Scheduler.h" #include "ace/Argv_Type_Converter.h" ACE_RCSID(MT_Server, client, "$Id$") const char *ior = "file://test.ior"; int niterations = 5; int do_shutdown = 0; int enable_dynamic_scheduling = 0; int enable_yield = 1; class Worker : public ACE_Task_Base { // = TITLE // Run a server thread // // = DESCRIPTION // Use the ACE_Task_Base class to run server threads // public: Worker (CORBA::ORB_ptr orb, Simple_Server_ptr server_ptr, RTScheduling::Current_ptr current, Fixed_Priority_Scheduler* scheduler, CORBA::Short importance, CORBA::Long server_load); // int sleep_time); // ctor virtual int svc (void); // The thread entry point. private: CORBA::ORB_var orb_; // The orb Simple_Server_var server_; RTScheduling::Current_var scheduler_current_; Fixed_Priority_Scheduler* scheduler_; RTCORBA::Priority prio_; CORBA::Long server_load_; int sleep_time_; }; int parse_args (int argc, char *argv[]) { ACE_Get_Arg_Opt get_opts (argc, argv, "xk:i:ds"); int c; while ((c = get_opts ()) != -1) switch (c) { case 'x': do_shutdown = 1; break; case 'k': ior = get_opts.opt_arg (); break; case 'i': niterations = ACE_OS::atoi (get_opts.opt_arg ()); break; case 'd': enable_dynamic_scheduling = 1; break; case 's': enable_yield = 0; break; case '?': default: ACE_ERROR_RETURN ((LM_ERROR, "usage: %s " "-k " "-i " "-d (enable dynamic scheduling)" "-s (disable yield)" "\n", argv [0]), -1); } // Indicates sucessful parsing of the command line return 0; } int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) { ACE_Argv_Type_Converter convert (argc, argv); Fixed_Priority_Scheduler* scheduler=0; RTScheduling::Current_var current; int prio; int max_prio; ACE_Sched_Params::Policy sched_policy = ACE_SCHED_RR; int sched_scope = ACE_SCOPE_THREAD; long flags; if (sched_policy == ACE_SCHED_RR) flags = THR_NEW_LWP | 
THR_BOUND | THR_JOINABLE | THR_SCHED_RR; else flags = THR_NEW_LWP | THR_BOUND | THR_JOINABLE | THR_SCHED_FIFO; ACE_hthread_t main_thr_handle; ACE_Thread::self (main_thr_handle); max_prio = ACE_Sched_Params::priority_max (sched_policy, sched_scope); ACE_Sched_Params sched_params (sched_policy, max_prio); ACE_OS::sched_params (sched_params); if (ACE_Thread::getprio (main_thr_handle, prio) == -1) { if (errno == ENOTSUP) { ACE_ERROR((LM_ERROR, ACE_TEXT ("getprio not supported\n") )); } else { ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n") ACE_TEXT ("thr_getprio failed"))); } } ACE_DEBUG ((LM_DEBUG, "(%t): main thread prio is %d\n", prio)); ACE_TRY_NEW_ENV { CORBA::ORB_var orb = CORBA::ORB_init (convert.get_argc(), convert.get_ASCII_argv(), "" ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; if (parse_args (convert.get_argc(), convert.get_ASCII_argv()) != 0) return 1; CORBA::Object_var object = orb->string_to_object (ior ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; Simple_Server_var server = Simple_Server::_narrow (object.in () ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; if (CORBA::is_nil (server.in ())) { ACE_ERROR_RETURN ((LM_ERROR, "Object reference <%s> is nil\n", ior), 1); } if (enable_dynamic_scheduling) { ACE_DEBUG ((LM_DEBUG, "Dyn Sched enabled\n")); CORBA::Object_ptr manager_obj = orb->resolve_initial_references ("RTSchedulerManager" ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; TAO_RTScheduler_Manager_var manager = TAO_RTScheduler_Manager::_narrow (manager_obj ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type; if (enable_yield) { disp_impl_type = Kokyu::DSRT_CV_BASED; } else { disp_impl_type = Kokyu::DSRT_OS_BASED; } ACE_NEW_RETURN (scheduler, Fixed_Priority_Scheduler (orb.in (), disp_impl_type, sched_policy, sched_scope), -1); manager->rtscheduler (scheduler); CORBA::Object_var object = orb->resolve_initial_references ("RTScheduler_Current" ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; current = RTScheduling::Current::_narrow (object.in () ACE_ENV_ARG_PARAMETER); 
ACE_TRY_CHECK; } Worker worker1 (orb.in (), server.in (), current.in (), scheduler, 10, 15); if (worker1.activate (flags, 1, 0, max_prio) != 0) { ACE_ERROR ((LM_ERROR, "(%t|%T) cannot activate worker thread.\n")); } ACE_OS::sleep(2); Worker worker2 (orb.in (), server.in (), current.in (), scheduler, 12, 5); if (worker2.activate (flags, 1, 0, max_prio) != 0) { ACE_ERROR ((LM_ERROR, "(%t|%T) cannot activate scheduler thread in RT mode.\n")); } worker1.wait (); worker2.wait (); ACE_DEBUG ((LM_DEBUG, "(%t): wait for worker threads done in main thread\n")); if (do_shutdown) { if (enable_dynamic_scheduling) { FP_Scheduling::SegmentSchedulingParameter sched_param; sched_param.base_priority = 0; CORBA::Policy_var sched_param_policy = scheduler->create_segment_scheduling_parameter (sched_param); CORBA::Policy_ptr implicit_sched_param = 0; current->begin_scheduling_segment (0, sched_param_policy.in (), implicit_sched_param ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; } ACE_DEBUG ((LM_DEBUG, "(%t): about to call server shutdown\n")); server->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_TRY_CHECK; ACE_DEBUG ((LM_DEBUG, "after shutdown call in main thread\n")); if (enable_dynamic_scheduling) { current->end_scheduling_segment (0 ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; } } scheduler->shutdown (); ACE_DEBUG ((LM_DEBUG, "scheduler shutdown done\n")); } ACE_CATCHANY { ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Exception caught:"); return 1; } ACE_ENDTRY; return 0; } // **************************************************************** Worker::Worker (CORBA::ORB_ptr orb, Simple_Server_ptr server_ptr, RTScheduling::Current_ptr current, Fixed_Priority_Scheduler* scheduler, RTCORBA::Priority prio, CORBA::Long server_load) // int sleep_time) : orb_ (CORBA::ORB::_duplicate (orb)), server_ (Simple_Server::_duplicate (server_ptr)), scheduler_current_ (RTScheduling::Current::_duplicate (current)), scheduler_ (scheduler), prio_ (prio), server_load_ (server_load) // sleep_time_ (sleep_time) { } int 
Worker::svc (void)
{
  // Thread entry point.  Logs the priority the thread was activated
  // with, then makes one two-way <test_method> call on the server.
  // When dynamic scheduling is enabled the call is bracketed by a
  // scheduling segment whose base priority is <prio_>.
  // Returns 0 on success and -1 if a CORBA exception is raised.
  const char * name = 0;
  /*
  ACE_DEBUG ((LM_DEBUG, "(%t|%T):about to sleep for %d sec\n", sleep_time_));
  ACE_OS::sleep (sleep_time_);
  ACE_DEBUG ((LM_DEBUG, "(%t|%T):woke up from sleep for %d sec\n", sleep_time_));
  */
  ACE_hthread_t thr_handle;
  ACE_Thread::self (thr_handle);

  // Initialised so the debug print below never reads an indeterminate
  // value when getprio() fails.
  int prio = 0;

  if (ACE_Thread::getprio (thr_handle, prio) == -1)
    {
      if (errno == ENOTSUP)
        {
          ACE_ERROR((LM_ERROR,
                     ACE_TEXT ("getprio not supported\n")
                     ));
        }
      else
        {
          // %p consumes the following string argument; the comma is
          // required so that it is passed as an argument rather than
          // concatenated into the format string.
          ACE_ERROR ((LM_ERROR,
                      ACE_TEXT ("%p\n"),
                      ACE_TEXT ("thr_getprio failed")));
        }
    }

  ACE_DEBUG ((LM_DEBUG,
              "(%t|%T) worker activated with prio %d\n",
              prio));

  // Exceptions are caught here so that none can propagate out of the
  // thread entry point.
  ACE_TRY_NEW_ENV
    {
      if (enable_dynamic_scheduling)
        {
          FP_Scheduling::SegmentSchedulingParameter sched_param;
          sched_param.base_priority = prio_;
          CORBA::Policy_var sched_param_policy =
            scheduler_->create_segment_scheduling_parameter (sched_param);
          CORBA::Policy_ptr implicit_sched_param = 0;

          ACE_DEBUG ((LM_DEBUG, "(%t|%T):before begin_sched_segment\n"));
          scheduler_current_->begin_scheduling_segment (name,
                                                        sched_param_policy.in (),
                                                        implicit_sched_param
                                                        ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
          ACE_DEBUG ((LM_DEBUG, "(%t|%T):after begin_sched_segment\n"));
        }

      ACE_DEBUG ((LM_DEBUG, "(%t|%T):about to make two way call\n"));
      server_->test_method (server_load_ ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;
      ACE_DEBUG ((LM_DEBUG, "(%t|%T):two way call done\n"));

      if (enable_dynamic_scheduling)
        {
          // Pass the environment like every other CORBA call in this
          // file so the following check inspects the right one.
          scheduler_current_->end_scheduling_segment (name
                                                      ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Exception caught in worker thread:");
      return -1;
    }
  ACE_ENDTRY;

  ACE_DEBUG ((LM_DEBUG, "client worker thread (%t) done\n"));

  return 0;
}