From 5fcae293c1af65794800a488f284a061e05464c7 Mon Sep 17 00:00:00 2001 From: thrall Date: Wed, 7 Jan 2004 19:16:42 +0000 Subject: Converted to use ACE_Reactor instead of EC timeouts. Also, consumers should do work (untested), and comments indicate some new locations to put DSUI instrumentation. --- TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp | 24 +++- TAO/orbsvcs/examples/RtEC/test_driver/Consumer.h | 8 ++ TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp | 118 ++++++++++++------- .../RtEC/test_driver/Makefile.test_driver_app | 12 +- .../RtEC/test_driver/Makefile.test_driver_lib | 13 ++- TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp | 8 +- .../examples/RtEC/test_driver/TimeoutConsumer.cpp | 64 +++++++++-- .../examples/RtEC/test_driver/TimeoutConsumer.h | 17 ++- .../RtEC/test_driver/Timer_Event_Handler.cpp | 31 +++++ .../RtEC/test_driver/Timer_Event_Handler.h | 24 ++++ TAO/orbsvcs/examples/RtEC/test_driver/cpuload.cpp | 128 +++++++++++++++++++++ TAO/orbsvcs/examples/RtEC/test_driver/cpuload.h | 15 +++ TAO/orbsvcs/examples/RtEC/test_driver/example.xml | 4 +- .../examples/RtEC/test_driver/test_driver.mpc | 3 +- 14 files changed, 398 insertions(+), 71 deletions(-) create mode 100644 TAO/orbsvcs/examples/RtEC/test_driver/Timer_Event_Handler.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/test_driver/Timer_Event_Handler.h create mode 100644 TAO/orbsvcs/examples/RtEC/test_driver/cpuload.cpp create mode 100644 TAO/orbsvcs/examples/RtEC/test_driver/cpuload.h diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp index 9f95d03187a..0359a3610c8 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.cpp @@ -1,6 +1,7 @@ // $Id$ #include "Consumer.h" +#include "cpuload.h" #include //for ostringstream @@ -86,8 +87,15 @@ Consumer::connect_impl (bool set_rtinfo, //true if should set RT_Info RtecScheduler::handle_t rt_info = scheduler->create (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER); ACE_CHECK; + + //TODO: How get period if !set_rtinfo? if (set_rtinfo) { + //ORBSVCS_Time::TimeT_to_Time_Value(this->_work_time,period); + //this->_work_time *= 0.5; //only work for half the period + //TODO: How much work should we do? + this->_work_time.set(1,0); //based on DT test, work is 1 second long + //ignore period, since it will propagate from Supplier RtecScheduler::Period_t p = 0;//period; @@ -139,6 +147,9 @@ Consumer::connect_impl (bool set_rtinfo, //true if should set RT_Info ACE_DEBUG((LM_DEBUG,"Consumer %d connected\n",this->_consumer_id)); ACE_DEBUG((LM_DEBUG,"\tEvent type: %d\n",event_type)); + + //calibrate + this->_work.calibrate(); } void @@ -180,7 +191,7 @@ Consumer::push (const RtecEventComm::EventSet& events "Consumer (%P|%t) no events\n")); return; } - + /* int prio = -1; ACE_hthread_t handle; ACE_Thread::self(handle); @@ -188,6 +199,17 @@ Consumer::push (const RtecEventComm::EventSet& events //ACE_thread_t tid = ACE_Thread::self(); ACE_DEBUG ((LM_DEBUG, "Consumer #%d @%d (%P|%t) we received event type %d\n", this->_consumer_id,prio,events[0].header.type)); + */ + + //@BT INSTRUMENT with event ID: EVENT_WORK_START Measure time + //when work triggered by event starts. + + //do work + timeval load = (timeval) this->_work_time; + this->_work.run(load); + + //@BT INSTRUMENT with event ID: EVENT_WORK_END Measure time when + //work triggered by event finishes. } void diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.h b/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.h index 800bc309f3b..deb8bc56133 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.h +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Consumer.h @@ -17,11 +17,15 @@ #ifndef CONSUMER_H #define CONSUMER_H +#include "ace/Time_Value.h" + #include "orbsvcs/RtecEventChannelAdminC.h" #include "orbsvcs/RtecEventCommC.h" #include "orbsvcs/RtecSchedulerC.h" #include "orbsvcs/Channel_Clients_T.h" +#include "cpuload.h" + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ @@ -103,7 +107,11 @@ private: // We connect to the EC as a consumer so we can receive the // timeout events. + ACE_Time_Value _work_time; + int _consumer_id; + + CPULoad _work; }; #endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp index f9f4833492b..61a4e078897 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp +++ b/TAO/orbsvcs/examples/RtEC/test_driver/ECConfig.cpp @@ -26,6 +26,9 @@ #include "orbsvcs/RtecEventCommC.h" #include "orbsvcs/Event/EC_Gateway_Sched.h" +//REACTOR CHANGE +#include "tao/ORB_Core.h" + namespace TestConfig { //TODO: Obviously, we can't just hardcode these! @@ -54,11 +57,12 @@ ECConfig::ECConfig (void) , periods(0) , importances(0) , crits(0) - , test_done(new ACE_RW_Mutex()) //, udp_mcast_address(ACE_DEFAULT_MULTICAST_ADDR ":10001") , configured (0) //false , use_federated (1) //TODO Check whether or not FEDERATED; default to true + //, use_federated (0) //TODO Check whether or not FEDERATED; default to false { + this->test_done = new ACE_RW_Mutex(); } template @@ -385,10 +389,13 @@ ECConfig::run (void) ACE_TRY { ACE_Thread_Manager *inst = ACE_Thread_Manager::instance(); + ACE_Reactor *reactor = ACE_Reactor::instance(); // Spawn orb thread (which calls orb.run(), then terminates on return) ACE_DEBUG((LM_DEBUG,"SPAWNING ORB thread\n")); - int ret = inst->spawn(ECConfig::run_orb,&(this->orb)); + //int ret = inst->spawn(ECConfig::run_orb,&(this->orb)); + int ret = inst->spawn(ECConfig::run_orb,this->test_done); + //int ret = inst->spawn(ECConfig::run_orb,reactor); //no need for getting tid? if (ret == -1) { @@ -397,22 +404,25 @@ ECConfig::run (void) return 1; } - // Block waiting for consumers to finish - //when can acquire write lock, all Suppliers are finished - ret = this->test_done->acquire_write(); - if (ret == -1) - { - ACE_DEBUG((LM_DEBUG, "ERROR: could not acquire write lock for ECConfig: %s\n", - ACE_OS::strerror(errno))); - return 1; - } + /* + orb->run(); + //this method returns when orb->shutdown() is called; then thread exits + */ - //all Suppliers done, so stop EC and ORB - //Shutdown EC - this->reset(); + //REACTOR CHANGE + //orb->orb_core()->reactor()->run_reactor_event_loop(); + ACE_DEBUG((LM_DEBUG,"Starting Reactor loop; work? %d\n", + reactor->work_pending())); - // Shutdown ORB - this->orb->shutdown(1); //argument is TRUE + //@BT INSTRUMENT with event ID: EVENT_TEST_BEGIN Measure time + //when test starts being able to push events. + reactor->run_reactor_event_loop(); + //this method returns when end_reactor_event_loop() is called; then thread exits + ACE_CHECK; + + //REACTOR CHANGE END + + ACE_DEBUG((LM_DEBUG, "ORB thread: Shutdown\n")); if (inst->wait() == -1) //wait for ORB thread to terminate { @@ -421,6 +431,10 @@ ECConfig::run (void) return 1; } + //all Suppliers done, so stop EC and ORB + //Shutdown EC + //this->reset(); + ACE_DEBUG ((LM_DEBUG, "suppliers finished\n")); } @@ -636,26 +650,35 @@ ECConfig::connect_consumers (ACE_ENV_SINGLE_ARG_DECL) template void ECConfig::disconnect_suppliers (ACE_ENV_SINGLE_ARG_DECL) { - for (size_t i = 0; i < this->suppliers.size(); ++i) + if (this->configured) { - this->suppliers[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; + for (size_t i = 0; i < this->suppliers.size(); ++i) + { + ACE_DEBUG((LM_DEBUG,"Disconnecting supplier %d\n",i)); + this->suppliers[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; - delete this->suppliers[i]; - this->suppliers[i] = 0; + delete this->suppliers[i]; + this->suppliers[i] = 0; + } + this->suppliers.size(0); } } template void ECConfig::disconnect_consumers (ACE_ENV_SINGLE_ARG_DECL) { - for (size_t i = 0; i < this->consumers.size(); ++i) + if (this->configured) { - this->consumers[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; + for (size_t i = 0; i < this->consumers.size(); ++i) + { + this->consumers[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; - delete this->consumers[i]; - this->consumers[i] = 0; + delete this->consumers[i]; + this->consumers[i] = 0; + } + this->consumers.size(0); } } @@ -767,26 +790,35 @@ ECConfig::barrier(bool is_supplier) template ACE_THR_FUNC_RETURN ECConfig::run_orb(void *data) { - //ACE_DECLARE_NEW_CORBA_ENV; - ACE_TRY + ACE_RW_Mutex *test_done = ACE_reinterpret_cast(ACE_RW_Mutex*,data); + printf("test_done: %p\n",test_done); + //test_done->dump(); + //const ACE_rwlock_t& lock = test_done->lock(); + //printf("Number of: readers=%d\twriters=%d\n",lock.num_waiting_readers_,lock.num_waiting_writers_); + //printf("acquire_read(): %d\n",test_done->acquire_read()); + //printf("acquire_write(): %d\n",test_done->acquire_write()); + //std::exit(0); + + // Block waiting for consumers to finish + //when can acquire write lock, all Suppliers are finished + int ret = test_done->acquire_write(); + if (ret == -1) { - ACE_DEBUG((LM_DEBUG, "ORB thread: Casting %x\n",data)); - - CORBA::ORB_var orb = *(ACE_reinterpret_cast(CORBA::ORB_var*,data)); - - ACE_DEBUG((LM_DEBUG, "ORB thread: Running orb\n")); - - orb->run(); - //this method returns when orb->shutdown() is called; then thread exits - - ACE_DEBUG((LM_DEBUG, "ORB thread: Shutdown\n")); + ACE_DEBUG((LM_DEBUG, "ERROR: could not acquire write lock for ECConfig: %s\n", + ACE_OS::strerror(errno))); + return 0; } - ACE_CATCHANY - { - ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, "ECConfig ORB thread"); - } - ACE_ENDTRY; + //@BT INSTRUMENT with event ID: EVENT_TEST_END Measure time when + //all events have been pushed. + + //REACTOR CHANGE + // Shutdown ORB + //this->orb->shutdown(1); //argument is TRUE + //orb->orb_core()->reactor()->end_reactor_event_loop(); + ACE_DEBUG((LM_DEBUG,"DONE; stopping reactor event loop\n")); + ACE_Reactor::instance()->end_reactor_event_loop(); + ACE_DEBUG((LM_DEBUG,"ORB Thread exiting\n")); return 0; } diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Makefile.test_driver_app b/TAO/orbsvcs/examples/RtEC/test_driver/Makefile.test_driver_app index 2e676afbb16..7263a2a4d89 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/Makefile.test_driver_app +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Makefile.test_driver_app @@ -26,7 +26,7 @@ FILES = \ #---------------------------------------------------------------------------- # Include macros and targets #---------------------------------------------------------------------------- -LDLIBS = -lwrappers -lTest_Driver -lACEXML_Parser -lACEXML -lKokyu -ldsui -lTAO_RTSched -lTAO_CosNaming -lTAO_IORTable -lTAO_RTEvent -lTAO_Svc_Utils -lTAO_Messaging -lTAO_PortableServer -lTAO_IORInterceptor -lTAO_ObjRefTemplate -lTAO_Valuetype -lTAO -lACE +LDLIBS = -lwrappers -lTest_Driver -lACEXML_Parser -lACEXML -lKokyu -ldsui -lTAO_RTSchedEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_IORTable -lTAO_RTEvent -lTAO_Svc_Utils -lTAO_Messaging -lTAO_PortableServer -lTAO_IORInterceptor -lTAO_ObjRefTemplate -lTAO_Valuetype -lTAO -lACE include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU ## We don't need the ACELIB setting from wrapper_macros.GNU ACELIB = @@ -52,6 +52,7 @@ $(OUTPUT_DIRECTORY): CURRENT_COMPONENTS := $(shell sh $(ACE_ROOT)/bin/ace_components --orbsvcs) +ifeq (RTSchedEvent, $(findstring RTSchedEvent, $(CURRENT_COMPONENTS))) ifeq (Sched, $(findstring Sched, $(CURRENT_COMPONENTS))) ifeq (Naming, $(findstring Naming, $(CURRENT_COMPONENTS))) ifeq (RTEvent, $(findstring RTEvent, $(CURRENT_COMPONENTS))) @@ -65,6 +66,9 @@ endif else all: comp_warning endif +else + all: comp_warning +endif OBJS = $(addsuffix .o, $(notdir $(FILES))) SRC = $(addsuffix .cpp, $(FILES)) @@ -80,7 +84,7 @@ include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU include $(ACE_ROOT)/include/makeinclude/rules.local.GNU include $(TAO_ROOT)/taoconfig.mk -CPPFLAGS += -I../../../../../ACEXML/common -I../../../../../Kokyu -I$(DATASTREAM_ROOT)/include -I../../../../orbsvcs -I../../../.. -I../../../../tao -I../../../../.. +CPPFLAGS += -I$(ACE_ROOT)/ACEXML/common -I$(ACE_ROOT)/Kokyu -I$(DATASTREAM_ROOT)/include -I$(TAO_ROOT)/orbsvcs -I$(TAO_ROOT) -I$(TAO_ROOT)/tao -I$(ACE_ROOT) ifeq ($(static_libs),1) ifneq ($(LIB),) CPPFLAGS += -DTAO_AS_STATIC_LIBS -DTAO_AS_STATIC_LIBS -DACE_AS_STATIC_LIBS @@ -88,14 +92,14 @@ ifeq ($(static_libs),1) endif -LDFLAGS += -L. -L$(DATASTREAM_ROOT)/lib -L../../../../../lib +LDFLAGS += -L. -L$(DATASTREAM_ROOT)/lib -L$(ACE_ROOT)/lib #---------------------------------------------------------------------------- # Local targets #---------------------------------------------------------------------------- comp_warning: @echo This project will not be built due to one of the following missing components: - @echo Sched Naming RTEvent + @echo RTSchedEvent Sched Naming RTEvent ifndef kylix diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Makefile.test_driver_lib b/TAO/orbsvcs/examples/RtEC/test_driver/Makefile.test_driver_lib index cb4427dea6f..7fd1eaab536 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/Makefile.test_driver_lib +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Makefile.test_driver_lib @@ -22,13 +22,14 @@ ifndef CIAO_ROOT endif FILES = \ - Config_Factory \ Consumer \ Supplier \ TimeoutConsumer \ TestConfig \ Test_Handler \ - AddrServer + Config_Factory \ + Timer_Event_Handler \ + cpuload #---------------------------------------------------------------------------- # Include macros and targets @@ -51,7 +52,7 @@ $(TEMPINCDIR): @-test -d $(TEMPINCDIR) || mkdir -p $(TEMPINCDIR) $(ACE_NUL_STDERR) endif -OUTPUT_DIRECTORY = ../../../../../lib +OUTPUT_DIRECTORY = $(ACE_ROOT)/lib all: $(OUTPUT_DIRECTORY) $(OUTPUT_DIRECTORY): @@ -106,7 +107,7 @@ include $(ACE_ROOT)/include/makeinclude/macros.GNU include $(ACE_ROOT)/include/makeinclude/rules.common.GNU include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU -INSTALL_CHECK = ../../../../../lib +INSTALL_CHECK = $(ACE_ROOT)/lib ifeq ($(INSTALL_CHECK),.) INSLIB = $(PWD) else @@ -117,7 +118,7 @@ include $(ACE_ROOT)/include/makeinclude/rules.lib.GNU include $(ACE_ROOT)/include/makeinclude/rules.local.GNU include $(TAO_ROOT)/taoconfig.mk -CPPFLAGS += -I../../../../../ACEXML/common -I../../../../../Kokyu -I$(DATASTREAM_ROOT)/include -I../../../../orbsvcs -I../../../.. -I../../../../tao -I../../../../.. +CPPFLAGS += -I$(ACE_ROOT)/ACEXML/common -I$(ACE_ROOT)/Kokyu -I$(DATASTREAM_ROOT)/include -I$(TAO_ROOT)/orbsvcs -I$(TAO_ROOT) -I$(TAO_ROOT)/tao -I$(ACE_ROOT) ifeq ($(static_libs),1) ifneq ($(LIB),) CPPFLAGS += -DTAO_AS_STATIC_LIBS -DTAO_AS_STATIC_LIBS -DACE_AS_STATIC_LIBS @@ -125,7 +126,7 @@ ifeq ($(static_libs),1) endif -LDFLAGS += -L$(DATASTREAM_ROOT)/lib -L../../../../../lib +LDFLAGS += -L$(DATASTREAM_ROOT)/lib -L$(ACE_ROOT)/lib #---------------------------------------------------------------------------- # Local targets diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp index 006adaa3ae2..9698626f935 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Supplier.cpp @@ -23,6 +23,9 @@ Supplier::update(ACE_ENV_SINGLE_ARG_DECL) if (this->_num_sent < this->_to_send) { + //@BT INSTRUMENT with event ID: EVENT_PUSH Measure time + //when event is pushed by client. + //send this->_events this->_consumer_proxy->push(this->_events ACE_ENV_ARG_PARAMETER); @@ -145,6 +148,8 @@ Supplier::connect (ACE_RW_Mutex* done, this->timeoutconsumer.connect(scheduler,supp_entry_pt.str().c_str(),period, importance,criticality,ec ACE_ENV_ARG_PARAMETER); + //REACTOR CHANGE + /* //Add Scheduler dependency between TimeoutConsumer and Supplier scheduler->add_dependency (this->timeoutconsumer.get_RT_Info(), rt_info, @@ -152,7 +157,8 @@ Supplier::connect (ACE_RW_Mutex* done, RtecBase::TWO_WAY_CALL ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; - + */ + //REACTOR CHANGE END } void diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp index eddffc8ab60..87a19b30fea 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp +++ b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.cpp @@ -5,29 +5,36 @@ #include //for ostringstream #include "orbsvcs/Event_Utilities.h" //for ACE_ConsumerQOS_Factory -//#include "orbsvcs/Event_Service_Constants.h" //#include "orbsvcs/RtecSchedulerC.h" //#include "orbsvcs/RtecEventCommC.h" +//REACTOR CHANGE +#include "Timer_Event_Handler.h" +#include "orbsvcs/Event_Service_Constants.h" //for ORBSVCS_Time +#include "tao/ORB_Core.h" +#include "ace/Reactor.h" ACE_RCSID(EC_Examples, Consumer, "$Id$") TimeoutConsumer::TimeoutConsumer (Timeout_Observer* obs) : _observer(obs) , _consumer(this) + //REACTOR CHANGE + , _handler(0) + //REACTOR CHANGE END { } TimeoutConsumer::~TimeoutConsumer (void) { - // TODO this->disconnect() ??? + this->disconnect(); } - +/* RtecScheduler::handle_t TimeoutConsumer::get_RT_Info(void) { return this->_rt_info; } - +*/ void TimeoutConsumer::connect (RtecScheduler::Scheduler_ptr scheduler, const char *entry_prefix, @@ -37,6 +44,7 @@ TimeoutConsumer::connect (RtecScheduler::Scheduler_ptr scheduler, RtecEventChannelAdmin::EventChannel_ptr ec ACE_ENV_ARG_DECL) { + /* this->_scheduler = scheduler; //create consumer RT_Info @@ -58,7 +66,7 @@ TimeoutConsumer::connect (RtecScheduler::Scheduler_ptr scheduler, // Register as consumer of timeout events ACE_ConsumerQOS_Factory timeoutQOS; - timeoutQOS.insert_time(ACE_ES_EVENT_INTERVAL_TIMEOUT /*??*/, + timeoutQOS.insert_time(ACE_ES_EVENT_INTERVAL_TIMEOUT, period, //TimeBase::TimeT this->_rt_info); @@ -83,6 +91,29 @@ TimeoutConsumer::connect (RtecScheduler::Scheduler_ptr scheduler, std::ostringstream prd; prd << period; ACE_DEBUG((LM_DEBUG,"\tTimeout period: %s\n",prd.str().c_str())); + */ + + //REACTOR CHANGE + ACE_UNUSED_ARG(entry_prefix); + ACE_UNUSED_ARG(importance); + ACE_UNUSED_ARG(criticality); + ACE_UNUSED_ARG(scheduler); + ACE_UNUSED_ARG(ec); + + this->entry_pt << entry_prefix << " TimeoutConsumer"; //unique RT_Info entry point + ACE_DEBUG((LM_DEBUG,"Creating %s\n",this->entry_pt.str().c_str())); + + this->_handler = new Timer_Event_Handler(this); + ACE_Time_Value interval; + ORBSVCS_Time::TimeT_to_Time_Value(interval,period); + //TAO_ORB_Core *core = ec->orb_core(); + //ACE_Reactor *reactor = core->reactor(); + ACE_Reactor *reactor = ACE_Reactor::instance(); + this->_timer_id = reactor->schedule_timer(this->_handler, + 0, + interval, + interval); + //REACTOR CHANGE END } void @@ -91,11 +122,12 @@ TimeoutConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) //disconnect consumer if (! CORBA::is_nil (this->_supplier_proxy.in())) { + //REACTOR CHANGE + /* this->_supplier_proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK; this->_supplier_proxy = RtecEventChannelAdmin::ProxyPushSupplier::_nil(); - //Deactivate the servant PortableServer::POA_var poa = this->_consumer._default_POA(ACE_ENV_SINGLE_ARG_PARAMETER); @@ -105,7 +137,15 @@ TimeoutConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) ACE_CHECK; poa->deactivate_object(id.in() ACE_ENV_ARG_PARAMETER); ACE_CHECK; + */ + //REACTOR CHANGE END } + + //REACTOR CHANGE + ACE_Reactor::instance()->cancel_timer(this->_timer_id); + delete this->_handler; + this->_handler = 0; + //REACTOR CHANGE END } void @@ -113,26 +153,28 @@ TimeoutConsumer::push (const RtecEventComm::EventSet& events ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)) { + /* RtecScheduler::RT_Info* info = this->_scheduler->get(this->_rt_info ACE_ENV_ARG_PARAMETER); ACE_CHECK; + */ if (events.length () == 0) { - ACE_DEBUG ((LM_DEBUG,"TimeoutConsumer %s (%P|%t) push but no events\n",info->entry_point.in())); + ACE_DEBUG ((LM_DEBUG,"TimeoutConsumer %s (%P|%t) push but no events\n",this->entry_pt.str().c_str())); return; } - ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) received %d events:\n",info->entry_point.in(), + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) received %d events:\n",this->entry_pt.str().c_str(), events.length())); for (size_t i=0; ientry_point.in())); + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) received timeout event\n",this->entry_pt.str().c_str())); if (this->_observer != 0) { - ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) updating observer\n",info->entry_point.in())); + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %s (%P|%t) updating observer\n",this->entry_pt.str().c_str())); this->_observer->update(); } } @@ -144,7 +186,7 @@ TimeoutConsumer::push (const RtecEventComm::EventSet& events ACE_Thread::getprio(handle,prio); //ACE_thread_t tid = ACE_Thread::self(); ACE_DEBUG ((LM_DEBUG, "TimeoutConsumer %s @%d (%P|%t) we received event type %d\n", - info->entry_point.in(),prio,events[0].header.type)); + this->entry_pt.str().c_str(),prio,events[0].header.type)); } } } diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h index 0e82cec212b..954bcf764d0 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h +++ b/TAO/orbsvcs/examples/RtEC/test_driver/TimeoutConsumer.h @@ -17,12 +17,18 @@ #ifndef TIMEOUTCONSUMER_H #define TIMEOUTCONSUMER_H +#include //for ostringstream + #include "orbsvcs/RtecEventChannelAdminC.h" #include "orbsvcs/RtecEventCommC.h" #include "orbsvcs/RtecSchedulerC.h" #include "orbsvcs/Channel_Clients_T.h" #include "TestConfig.h" +//REACTOR CHANGE +class Timer_Event_Handler; //forward decl +//REACTOR CHANGE END + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ @@ -65,7 +71,7 @@ public: virtual ~TimeoutConsumer (void); - RtecScheduler::handle_t get_RT_Info(void); + //RtecScheduler::handle_t get_RT_Info(void); void connect (RtecScheduler::Scheduler_ptr scheduler, const char *entry_prefix, @@ -93,7 +99,8 @@ public: private: Timeout_Observer* _observer; - RtecScheduler::handle_t _rt_info; + //RtecScheduler::handle_t _rt_info; + std::ostringstream entry_pt; RtecScheduler::Scheduler_ptr _scheduler; @@ -106,6 +113,12 @@ private: RtecEventComm::EventSet _events; // set of events to push when a timeout event is received. + + //REACTOR CHANGE + Timer_Event_Handler *_handler; + + long _timer_id; + //REACTOR CHANGE END }; #endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Timer_Event_Handler.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/Timer_Event_Handler.cpp new file mode 100644 index 00000000000..861fb06573f --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Timer_Event_Handler.cpp @@ -0,0 +1,31 @@ +// $Id$ + +#include "Timer_Event_Handler.h" +#include "TimeoutConsumer.h" +#include "orbsvcs/Event_Service_Constants.h" + +Timer_Event_Handler::Timer_Event_Handler (TimeoutConsumer* cons) + : _consumer(cons) +{ +} + +int +Timer_Event_Handler::handle_timeout (const ACE_Time_Value &tv, + const void* arg) +{ + ACE_UNUSED_ARG(tv); + ACE_UNUSED_ARG(arg); + + RtecEventComm::EventSet event(1); + event.length(1); + + event[0].header.type = ACE_ES_EVENT_INTERVAL_TIMEOUT; + event[0].header.source = ACE_ES_EVENT_UNDEFINED; //for now, fake the supplier ID + event[0].header.ttl = 1; + + //need to create ACE_ENV, since one isn't passed into this function + ACE_DECLARE_NEW_CORBA_ENV; + this->_consumer->push(event ACE_ENV_ARG_PARAMETER); + + return 0; +} diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/Timer_Event_Handler.h b/TAO/orbsvcs/examples/RtEC/test_driver/Timer_Event_Handler.h new file mode 100644 index 00000000000..b92da0c2727 --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/test_driver/Timer_Event_Handler.h @@ -0,0 +1,24 @@ +// $Id$ + +#ifndef TIMER_EVENT_HANDLER_H_ +#define TIMER_EVENT_HANDLER_H_ + +#include +#include +#include "TimeoutConsumer.h" + +class Timer_Event_Handler : public ACE_Event_Handler { + + public: + + Timer_Event_Handler (TimeoutConsumer* cons); + + virtual int handle_timeout (const ACE_Time_Value &tv, + const void* arg); + + private: + + TimeoutConsumer *_consumer; +}; + +#endif /* TIMER_EVENT_HANDLER_H_ */ diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/cpuload.cpp b/TAO/orbsvcs/examples/RtEC/test_driver/cpuload.cpp new file mode 100644 index 00000000000..79912ef36bd --- /dev/null +++ b/TAO/orbsvcs/examples/RtEC/test_driver/cpuload.cpp @@ -0,0 +1,128 @@ +// $Id$ + +#include "cpuload.h" + +#include +#include +#include + +namespace +{ + int calibration_time = 1000; //msec + unsigned long loop_count=0; + int calibration_done = 0; + volatile int done = 0; + unsigned long per_iter_loop_count; + unsigned long calibrated_loop_count; +} + +static void +handle_signal (int signum) +{ + switch (signum) + { + case SIGVTALRM: + //printf ("VTALRM recvd\n"); + per_iter_loop_count = loop_count; + done = 1; + break; + + default: + printf ("signal %d unexpected\n", signum); + } +} + +int is_prime (const unsigned long n, + const unsigned long min_factor, + const unsigned long max_factor) +{ + if (n > 3) + for (unsigned long factor = min_factor; + factor <= max_factor; + ++factor) + if (n / factor * factor == n) + return factor; + + return 0; +} + +void do_computation (unsigned long& loop_count) +{ + static unsigned long prime_number = 961989999; + ++loop_count; + + is_prime (prime_number, 2, prime_number/2); +} + +unsigned long do_calibrate () +{ + per_iter_loop_count = 0; + loop_count = 0; + + signal (SIGVTALRM, handle_signal); + itimerval timerval; + + timerval.it_value.tv_sec = 0; + timerval.it_value.tv_usec = calibration_time*1000; + + if (setitimer(ITIMER_VIRTUAL, &timerval, 0) != 0) + { + printf("setitimer failed\n"); + } + + for (;!done;) + { + do_computation (loop_count); + } + //printf("iter done\n"); + done = 0; + return per_iter_loop_count; +} + +void CPULoad::run (struct timeval& tv) +{ + unsigned long loops=0; + unsigned long iters = (unsigned long) + (((float)calibrated_loop_count/(float)(calibration_time*1000)) * + (tv.tv_usec + tv.tv_sec*1000000)); + iters = iters + 1; + + // printf("tv = %lu sec %lu usec, iters = %lu\n", + // tv.tv_sec, tv.tv_usec, iters); + + for (;iters>0;--iters) + { + //loops is really not used here. It is used only during calibration + do_computation (loops); + } +} + +void CPULoad::calibrate (int num_iters) +{ + if (calibration_done) + return; + + printf("calibrating ...\n"); + for (int i=0; i + +class CPULoad +{ + public: + static void run (timeval& tv); + static void CPULoad::calibrate (int num_iters = 10); +}; + +#endif diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/example.xml b/TAO/orbsvcs/examples/RtEC/test_driver/example.xml index c08ee57e902..62ed8bad87c 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/example.xml +++ b/TAO/orbsvcs/examples/RtEC/test_driver/example.xml @@ -6,14 +6,14 @@ 1000 - 10 + 1 1 3000 - 10 + 1 diff --git a/TAO/orbsvcs/examples/RtEC/test_driver/test_driver.mpc b/TAO/orbsvcs/examples/RtEC/test_driver/test_driver.mpc index 33fee1717af..6ffac882fea 100644 --- a/TAO/orbsvcs/examples/RtEC/test_driver/test_driver.mpc +++ b/TAO/orbsvcs/examples/RtEC/test_driver/test_driver.mpc @@ -8,7 +8,8 @@ project(test_driver_lib): orbsvcslib, rtevent, rtsched, rtkokyuevent, kokyu, ace TestConfig.cpp Test_Handler.cpp Config_Factory.cpp - AddrServer.cpp + Timer_Event_Handler.cpp + cpuload.cpp } Template_Files { -- cgit v1.2.1