From 7e83a1bb944f50b67f118923afd055d5cb5b1fa6 Mon Sep 17 00:00:00 2001 From: thrall Date: Wed, 8 Oct 2003 22:08:53 +0000 Subject: Lots of changes. Refactored RT_Info creation into TimeoutConsumer, Consumer. Spawns thread for orb->run(), shuts down correctly when all events sent from TimeoutConsumers. --- TAO/orbsvcs/tests/EC_Config/Config_Factory.h | 6 +- TAO/orbsvcs/tests/EC_Config/Consumer.cpp | 151 ++++++- TAO/orbsvcs/tests/EC_Config/Consumer.h | 75 +++- TAO/orbsvcs/tests/EC_Config/ECConfig.cpp | 506 ++++++++++++------------ TAO/orbsvcs/tests/EC_Config/ECConfig.h | 52 +-- TAO/orbsvcs/tests/EC_Config/Makefile | 8 +- TAO/orbsvcs/tests/EC_Config/Test.cpp | 64 ++- TAO/orbsvcs/tests/EC_Config/TestConfig.h | 2 +- TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp | 10 +- TAO/orbsvcs/tests/EC_Config/Test_Handler.h | 7 +- TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.cpp | 281 +++++++++++++ TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.h | 110 ++++++ TAO/orbsvcs/tests/EC_Config/test.xml | 6 +- TAO/orbsvcs/tests/EC_Config/testconfig.dtd | 1 + 14 files changed, 952 insertions(+), 327 deletions(-) create mode 100644 TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.cpp create mode 100644 TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.h diff --git a/TAO/orbsvcs/tests/EC_Config/Config_Factory.h b/TAO/orbsvcs/tests/EC_Config/Config_Factory.h index 95af81c0620..0c38b350f16 100644 --- a/TAO/orbsvcs/tests/EC_Config/Config_Factory.h +++ b/TAO/orbsvcs/tests/EC_Config/Config_Factory.h @@ -37,6 +37,10 @@ namespace ConfigFactory { class Config_Factory : ACE_Service_Object { public: + ///Constructor + Config_Factory (void) {} + + ///Destructor virtual ~Config_Factory (void); /// Create and destroy the TestConfig module. @@ -73,7 +77,7 @@ public: /// Constructor Default_Config_Factory (void); - /// destructor... + /// Destructor... virtual ~Default_Config_Factory (void); /// Helper function to register the default factory into the service diff --git a/TAO/orbsvcs/tests/EC_Config/Consumer.cpp b/TAO/orbsvcs/tests/EC_Config/Consumer.cpp index 78d35d206de..8acacceeea6 100644 --- a/TAO/orbsvcs/tests/EC_Config/Consumer.cpp +++ b/TAO/orbsvcs/tests/EC_Config/Consumer.cpp @@ -1,18 +1,159 @@ // $Id$ -//#include "ace/Thread.h" #include "Consumer.h" +#include //for ostringstream + +#include "ace/Thread.h" +#include "orbsvcs/Event_Utilities.h" //for ACE_Supplier/ConsumerQOS_Factory +#include "orbsvcs/RtecSchedulerC.h" + ACE_RCSID(EC_Examples, Consumer, "$Id$") Consumer::Consumer (void) - : _ordinal(-1) + : _consumer(this) + , _consumer_id(-1) +{ +} + +Consumer::~Consumer(void) +{ +} + +void +Consumer::connect (RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + int consumer_id, //unique identifier + long event_type, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + this->connect_impl(false, + scheduler, + entry_prefix, + consumer_id, + event_type, + 0, //period; ignored + RtecScheduler::VERY_LOW_IMPORTANCE, //ignored + RtecScheduler::VERY_LOW_CRITICALITY, //ignored + ec + ACE_ENV_ARG_PARAMETER); +} + +void +Consumer::connect (RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + int consumer_id, //unique identifier + long event_type, + TimeBase::TimeT period, + RtecScheduler::Importance_t importance, + RtecScheduler::Criticality_t criticality, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + this->connect_impl(true, + scheduler, + entry_prefix, + consumer_id, + event_type, + period, + importance, + criticality, + ec + ACE_ENV_ARG_PARAMETER); +} + +void +Consumer::connect_impl (bool set_rtinfo, //true if should set RT_Info + RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + int consumer_id, //unique identifier + long event_type, + TimeBase::TimeT period, + RtecScheduler::Importance_t importance, + RtecScheduler::Criticality_t criticality, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) { + this->_consumer_id = consumer_id; + + //create consumer RT_Info + std::ostringstream cons_entry_pt; + cons_entry_pt << entry_prefix; //unique RT_Info entry point + ACE_DEBUG((LM_DEBUG,"Creating %s\n",cons_entry_pt.str().c_str())); + RtecScheduler::handle_t rt_info = scheduler->create (cons_entry_pt.str().c_str() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + if (set_rtinfo) + { + scheduler->set (rt_info, + criticality, + period, period, period, + period, + importance, + period, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + ACE_DEBUG((LM_DEBUG,"Set Consumer %d RT_Info\n",this->_consumer_id)); + } else + { + ACE_DEBUG((LM_DEBUG,"NOT Set Consumer %d RT_Info\n",this->_consumer_id)); + } + + // Register as consumer of appropriate event type + ACE_ConsumerQOS_Factory consQoS; + consQoS.insert_type(event_type, + rt_info); + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_supplier_proxy = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + RtecEventComm::PushConsumer_var consumerv = + this->_consumer._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_supplier_proxy->connect_push_consumer (consumerv.in (), + consQoS.get_ConsumerQOS () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_DEBUG((LM_DEBUG,"Consumer %d connected\n",this->_consumer_id)); + ACE_DEBUG((LM_DEBUG,"\tEvent type: %d\n",event_type)); } -Consumer::Consumer(int ord) - : _ordinal(ord) +void +Consumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) { + //disconnect consumer + + if (! CORBA::is_nil (this->_supplier_proxy.in())) + { + this->_supplier_proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_supplier_proxy = RtecEventChannelAdmin::ProxyPushSupplier::_nil(); + + //Deactivate the servant + PortableServer::POA_var poa = + this->_consumer._default_POA(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (&this->_consumer ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object(id.in() ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + ACE_DEBUG((LM_DEBUG,"Consumer %d disconnected\n",this->_consumer_id)); + } else + { + ACE_DEBUG((LM_DEBUG,"Cannot disconnect; Consumer %d not connected!\n",this->_consumer_id)); + } } void @@ -33,7 +174,7 @@ Consumer::push (const RtecEventComm::EventSet& events ACE_Thread::getprio(handle,prio); //ACE_thread_t tid = ACE_Thread::self(); ACE_DEBUG ((LM_DEBUG, "Consumer #%d @%d (%P|%t) we received event type %d\n", - _ordinal,prio,events[0].header.type)); + this->_consumer_id,prio,events[0].header.type)); } void diff --git a/TAO/orbsvcs/tests/EC_Config/Consumer.h b/TAO/orbsvcs/tests/EC_Config/Consumer.h index d92e461fd20..800bc309f3b 100644 --- a/TAO/orbsvcs/tests/EC_Config/Consumer.h +++ b/TAO/orbsvcs/tests/EC_Config/Consumer.h @@ -10,51 +10,100 @@ // Consumer // // = AUTHOR -// Carlos O'Ryan (coryan@cs.wustl.edu) +// Bryan A. Thrall (thrall@cse.wustl.edu) // // ============================================================================ #ifndef CONSUMER_H #define CONSUMER_H -#include "orbsvcs/RtecEventCommS.h" +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/RtecEventCommC.h" +#include "orbsvcs/RtecSchedulerC.h" +#include "orbsvcs/Channel_Clients_T.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -class Consumer : public POA_RtecEventComm::PushConsumer +class Consumer { // = TITLE // Simple consumer object // // = DESCRIPTION - // This class is a consumer of events. - // It simply register for two event typesone event type - // The class is just a helper to simplify common tasks in EC - // tests, such as subscribing for a range of events, disconnecting - // from the EC, informing the driver of shutdown messages, etc. + // This class is a consumer of the events pushed by a TimeoutConsumer + // every timeout. // - // There are several ways to connect and disconnect this class, - // and it is up to the driver program to use the right one. + // It simply registers for the event type specified in its connect() + // function. // public: Consumer (void); - // Constructor + // Default Constructor. - Consumer(int ord); + virtual ~Consumer (void); + + void connect (RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + int consumer_id, //unique identifier + long event_type, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL); + // This method connects the consumer to the EC without setting anything + // in the RT_Info (such as period, criticality, etc.). The consumer + // subscribes to events with the specified event_type. + + void connect (RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + int consumer_id, //unique identifier + long event_type, + TimeBase::TimeT period, + RtecScheduler::Importance_t importance, + RtecScheduler::Criticality_t criticality, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL); + // This method connects the consumer to the EC, setting RT_Info values + // for period, criticality, and importance. The consumer subscribes + // to events with the specified event_type. + + void disconnect (ACE_ENV_SINGLE_ARG_DECL); + // Disconnect from the EC. // = The RtecEventComm::PushConsumer methods virtual void push (const RtecEventComm::EventSet& events ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)); + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException)); // The skeleton methods. +protected: + void connect_impl (bool set_rtinfo, //true if should set RT_Info + RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + int consumer_id, //unique identifier + long event_type, + TimeBase::TimeT period, + RtecScheduler::Importance_t importance, + RtecScheduler::Criticality_t criticality, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL); + // This method implements the Consumer::connect() methods; if the first + // parameter is false, then the RT_Info values are ignored. Otherwise, + // they are set. + private: - int _ordinal; + RtecEventChannelAdmin::ProxyPushSupplier_var _supplier_proxy; + // We talk to the EC (as a consumer) using this proxy. + + ACE_PushConsumer_Adapter _consumer; + // We connect to the EC as a consumer so we can receive the + // timeout events. + + int _consumer_id; }; #endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp b/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp index 470a931da06..370b0e28cba 100644 --- a/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp +++ b/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp @@ -4,59 +4,100 @@ #define ECCONFIG_C #include "ECConfig.h" +#include "Consumer.h" +#include "TimeoutConsumer.h" #include //for ostringstream #include "ace/Array.h" #include "ace/Bound_Ptr.h" +#include "ace/Thread_Manager.h" #include "orbsvcs/Scheduler_Factory.h" #include "orbsvcs/Event_Utilities.h" #include "orbsvcs/Event_Service_Constants.h" #include "orbsvcs/Event/EC_Event_Channel.h" #include "orbsvcs/Event/EC_Kokyu_Factory.h" +#include "orbsvcs/RtecSchedulerC.h" +#include "orbsvcs/RtecEventCommC.h" namespace TestConfig { template ECConfig::ECConfig (void) - : Test_Config (), - ec_impl(0), - sched_impl(0), - configured (0) //false + : Test_Config () + , ec_impl(0) + , sched_impl(0) + , periods(0) + , importances(0) + , crits(0) + , test_done(new ACE_RW_Mutex()) + , configured (0) //false { } template ECConfig::~ECConfig (void) { - this->reset(); + this->reset(ACE_ENV_SINGLE_ARG_PARAMETER); + + delete this->test_done; } template void -ECConfig::reset (void) +ECConfig::reset (ACE_ENV_SINGLE_ARG_DECL) { // We should do a lot of cleanup (disconnect from the EC, // deactivate all the objects with the POA, etc.). - delete this->ec_impl; + this->disconnect_suppliers(ACE_ENV_SINGLE_ARG_PARAMETER); //should release all read locks on this->test_done - delete this->sched_impl; + this->disconnect_consumers(ACE_ENV_SINGLE_ARG_PARAMETER); + + { + // Deactivate the EC + PortableServer::POA_var poa = + this->ec_impl->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (this->ec_impl ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; - for(size_t i=0; iconsumers[i]; + ACE_DEBUG ((LM_DEBUG, "EC deactivated\n")); } - for(size_t i=0; isuppliers[i]; + { + // Deactivate the Scheduler + PortableServer::POA_var poa = + this->sched_impl->_default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (this->sched_impl ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_DEBUG ((LM_DEBUG, "scheduler deactivated\n")); } + delete this->ec_impl; + this->ec_impl = 0; + + delete this->sched_impl; + this->sched_impl = 0; + + //TODO clear config_infos? + + //TODO clear RT_Infos from scheduler? + configured = 0; //false } template int ECConfig::configure (TCFG_SET_WPTR testconfigs) { - if (configured) { + if (this->configured) { ACE_DEBUG((LM_DEBUG,ACE_TEXT("Resetting EC\n"))); this->reset(); //delete memory used by previous configuration } @@ -70,119 +111,67 @@ ECConfig::configure (TCFG_SET_WPTR testconfigs) ////////////////// EC ready; do config //////////////////// size_t tsize = testconfigs->size(); - supplier_cfgs.size(tsize); - consumer_cfgs.size(tsize); - testcfgs.size(tsize); - consumers.size(tsize); - suppliers.size(tsize); + this->testcfgs.size(tsize); + this->periods.size(tsize); + this->importances.size(tsize); + this->crits.size(tsize); for (size_t i=0; itestcfgs[i] = curcfg; - RtecScheduler::Criticality_t criticality; switch (curcfg->criticality) { case VERY_LOW_CRITICALITY : - criticality = RtecScheduler::VERY_LOW_CRITICALITY; + this->crits[i] = RtecScheduler::VERY_LOW_CRITICALITY; break; case LOW_CRITICALITY : - criticality = RtecScheduler::LOW_CRITICALITY; + this->crits[i] = RtecScheduler::LOW_CRITICALITY; break; case MEDIUM_CRITICALITY : - criticality = RtecScheduler::MEDIUM_CRITICALITY; + this->crits[i] = RtecScheduler::MEDIUM_CRITICALITY; break; case HIGH_CRITICALITY : - criticality = RtecScheduler::HIGH_CRITICALITY; + this->crits[i] = RtecScheduler::HIGH_CRITICALITY; break; case VERY_HIGH_CRITICALITY : - criticality = RtecScheduler::VERY_HIGH_CRITICALITY; + this->crits[i] = RtecScheduler::VERY_HIGH_CRITICALITY; break; } - RtecScheduler::Importance_t importance; switch (curcfg->importance) { case VERY_LOW_IMPORTANCE : - importance = RtecScheduler::VERY_LOW_IMPORTANCE; + this->importances[i] = RtecScheduler::VERY_LOW_IMPORTANCE; break; case LOW_IMPORTANCE : - importance = RtecScheduler::LOW_IMPORTANCE; + this->importances[i] = RtecScheduler::LOW_IMPORTANCE; break; case MEDIUM_IMPORTANCE : - importance = RtecScheduler::MEDIUM_IMPORTANCE; + this->importances[i] = RtecScheduler::MEDIUM_IMPORTANCE; break; case HIGH_IMPORTANCE : - importance = RtecScheduler::HIGH_IMPORTANCE; + this->importances[i] = RtecScheduler::HIGH_IMPORTANCE; break; case VERY_HIGH_IMPORTANCE : - importance = RtecScheduler::VERY_HIGH_IMPORTANCE; + this->importances[i] = RtecScheduler::VERY_HIGH_IMPORTANCE; break; } - //create supplier RT_Info - std::ostringstream supp_entry_pt; - supp_entry_pt << "Supplier Event Class " << i; //unique RT_Info entry point - RtecScheduler::handle_t rt_info = - this->scheduler->create (supp_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER); - ACE_Time_Value tv (0, curcfg->period); - TimeBase::TimeT time; - ORBSVCS_Time::Time_Value_to_TimeT (time, tv); - this->scheduler->set (rt_info, - criticality, - time, time, time, - time, - importance, - time, - 0, - RtecScheduler::OPERATION - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - if (this->supplier_cfgs.set(rt_info,i) != 0) { - ACE_DEBUG((LM_DEBUG, "Could not set supplier RT_Info into config: %d of %d\n", - i,consumer_cfgs.max_size())); - return 1; - } - - //create consumer RT_Info - std::ostringstream cons_entry_pt; - cons_entry_pt << "Consumer Event Class " << i; //unique RT_Info entry point - rt_info = - this->scheduler->create (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER); - tv.set (0, curcfg->period); - ORBSVCS_Time::Time_Value_to_TimeT (time, tv); - this->scheduler->set (rt_info, - criticality, - time, time, time, - time, - importance, - time, - 0, - RtecScheduler::OPERATION - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - if (this->consumer_cfgs.set(rt_info,i) != 0) { - ACE_DEBUG((LM_DEBUG, "Could not set consumer RT_Info into config: %d of %d\n", - i,consumer_cfgs.max_size())); - return 1; - } + ACE_Time_Value tv; + tv.msec(curcfg->period); + ORBSVCS_Time::Time_Value_to_TimeT (this->periods[i], tv); } - this->connectConsumers(); - this->connectSuppliers(); - - ACE_DEBUG ((LM_DEBUG, "Consumer RT_Infos:\n")); - print_RT_Infos (this->consumer_cfgs); - ACE_DEBUG ((LM_DEBUG, "\nSupplier RT_Infos:\n")); - print_RT_Infos (this->supplier_cfgs); + this->connect_consumers(ACE_ENV_SINGLE_ARG_PARAMETER); + this->connect_suppliers(ACE_ENV_SINGLE_ARG_PARAMETER); ////////////////// Configured; compute schedule /////////// ACE_DEBUG ((LM_DEBUG, "Computing schedule\n")); RtecScheduler::RT_Info_Set_var infos; RtecScheduler::Config_Info_Set_var configs; RtecScheduler::Scheduling_Anomaly_Set_var anomalies; + RtecScheduler::Dependency_Set_var deps; // Obtain the range of valid priorities in the current // platform, the scheduler hard-code this values in the @@ -198,6 +187,7 @@ ECConfig::configure (TCFG_SET_WPTR testconfigs) this->scheduler->compute_scheduling (min_os_priority, max_os_priority, infos.out (), + deps.out (), configs.out (), anomalies.out () ACE_ENV_ARG_PARAMETER); @@ -205,6 +195,7 @@ ECConfig::configure (TCFG_SET_WPTR testconfigs) // Dump the schedule to a file.. ACE_Scheduler_Factory::dump_schedule (infos.in (), + deps.in(), configs.in (), anomalies.in (), "ecconfig.out"); @@ -237,51 +228,45 @@ ECConfig::run (void) ACE_TRY { - RtecEventComm::EventSet event (1); - event.length (1); + ACE_Thread_Manager *inst = ACE_Thread_Manager::instance(); - ACE_Array evt_counts(this->testcfgs.size()); - for(size_t i=0; itestcfgs.size(); ++i) + // Spawn orb thread (which calls orb.run(), then terminates on return) + ACE_DEBUG((LM_DEBUG,"SPAWNING ORB thread\n")); + int ret = inst->spawn(ECConfig::run_orb,&(this->orb)); + //no need for getting tid? + if (ret == -1) { - //copy over total number of events per test_config_t to send - evt_counts[i] = this->testcfgs[i]->num_entities; + ACE_DEBUG ((LM_DEBUG, "ERROR: Couldn't spawn ORB->run() thread: %s\n", + ACE_OS::strerror(errno))); + return 1; } - size_t num_done = 0; //total number of testcfgs which have no more events to push - while (num_donetestcfgs.size()) + // Block waiting for consumers to finish + //when can acquire write lock, all TimeoutConsumers are finished + ret = this->test_done->acquire_write(); + if (ret == -1) { - //for each consumer, push an event - for(size_t i=0; itestcfgs.size() && iconsumer_proxys.size(); ++i) - { - if (evt_counts[i]<=0) - { - if (evt_counts[i]==0) - { - //just finished - ++num_done; - evt_counts[i]--; //indicate accounted for in num_done - } //else already incr num_done for this one - continue; //no more events of this to push - } //else... - test_config_t *tcfg = this->testcfgs[i]; - ProxyList::TYPE curproxy = this->consumer_proxys[i]; - - event[0].header.type = ACE_ES_EVENT_UNDEFINED+tcfg->type; - event[0].header.source = supplier_ids[i]; - event[0].header.ttl = 1; - - curproxy->push (event ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - //event pushed, so decr corresponding evt_count - evt_counts[i]--; - - // BT TODO sleep until next period expires - ACE_Time_Value rate (0, 10000); - ACE_OS::sleep (rate); + ACE_DEBUG((LM_DEBUG, "ERROR: could not acquire write lock for ECConfig: %s\n", + ACE_OS::strerror(errno))); + return 1; + } - } + //all TimeoutConsumers done, so stop EC and ORB + //Shutdown EC + this->reset(); + + // Shutdown ORB + this->orb->shutdown(1); //argument is TRUE + + if (inst->wait() == -1) //wait for ORB thread to terminate + { + ACE_ERROR((LM_ERROR, "ERROR: Thread_Manager wait failed: %s\n", + ACE_OS::strerror(errno))); + return 1; } + + ACE_DEBUG ((LM_DEBUG, "suppliers finished\n")); + } ACE_CATCHANY { @@ -293,170 +278,145 @@ ECConfig::run (void) return 0; //successful run } -template int -ECConfig::initEC() +template void +ECConfig::initEC(ACE_ENV_SINGLE_ARG_DECL) { TAO_EC_Kokyu_Factory::init_svcs (); - ACE_DEBUG ((LM_DEBUG, ACE_TEXT("Initializing event channel\n"))); - ACE_TRY - { - // ORB initialization boiler plate... - int argc = 0; - char** argv = 0; - this->orb = - CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_DEBUG ((LM_DEBUG,ACE_TEXT("Initializing event channel\n"))); - ACE_DEBUG((LM_DEBUG,ACE_TEXT("Resolving initial references\n"))); + // ORB initialization boiler plate... + int argc = 0; + char** argv = 0; + this->orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_CHECK; - CORBA::Object_var object = - orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - this->poa = - PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - this->poa_manager = - poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - // DO these need to remain in scope beyond this function? + ACE_DEBUG((LM_DEBUG,ACE_TEXT("Resolving initial references\n"))); - ACE_DEBUG((LM_DEBUG,ACE_TEXT("Creating sched service\n"))); + CORBA::Object_var object = + orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + this->poa = + PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + this->poa_manager = + poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + // DO these need to remain in scope beyond this function? - // Create a scheduling service - ACE_NEW_RETURN (this->sched_impl,SCHED_STRAT,1); + ACE_DEBUG((LM_DEBUG,ACE_TEXT("Creating sched service\n"))); - this->scheduler = sched_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + // Create a scheduling service + ACE_NEW (this->sched_impl,SCHED_STRAT); - // Create an event channel implementation... - TAO_EC_Event_Channel_Attributes attributes (poa.in (), - poa.in ()); - attributes.scheduler = scheduler.in (); // no need to dup + this->scheduler = sched_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; - ACE_NEW_RETURN (this->ec_impl,TAO_EC_Event_Channel (attributes),1); + // Create an event channel implementation... + TAO_EC_Event_Channel_Attributes attributes (poa.in (), + poa.in ()); + attributes.scheduler = scheduler.in (); // no need to dup - ACE_DEBUG((LM_DEBUG,ACE_TEXT("Created ec_impl\n"))); + ACE_NEW (this->ec_impl,TAO_EC_Event_Channel (attributes)); - this->event_channel = - this->ec_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - } - ACE_CATCHANY - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig"); - return 1; - } - ACE_ENDTRY; + ACE_DEBUG((LM_DEBUG,ACE_TEXT("Created ec_impl\n"))); - return 0; + this->event_channel = + this->ec_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; } -template int -ECConfig::connectConsumers() +template void +ECConfig::connect_suppliers (ACE_ENV_SINGLE_ARG_DECL) { - ACE_TRY - { - // The canonical protocol to connect to the EC - RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = - this->event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + RtecEventComm::EventSet event(1); + event.length(1); - ACE_DEBUG ((LM_DEBUG, "connecting consumers\n")); - for (size_t i=0; iconsumer_cfgs.size() && itestcfgs.size(); ++i) - { - ACE_NEW_RETURN(this->consumers[i],Consumer(i),1); - - RtecScheduler::handle_t hndl = this->consumer_cfgs[i]; - test_config_t *tcfg = this->testcfgs[i]; - - ACE_ConsumerQOS_Factory consumer_qos; - //consumer_qos.start_disjunction_group (); - // The types in the range [0,ACE_ES_EVENT_UNDEFINED) are - // reserved for the EC... - consumer_qos.insert_type (ACE_ES_EVENT_UNDEFINED+tcfg->type, - hndl); - - RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy = - consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - RtecEventComm::PushConsumer_var consumerv = - consumers[i]->_this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - supplier_proxy->connect_push_consumer (consumerv.in (), - consumer_qos.get_ConsumerQOS () - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - } - } - ACE_CATCHANY + ACE_DEBUG((LM_DEBUG,"TimeoutConsumers to connect: %d\n",this->testcfgs.size())); + this->suppliers.size(this->testcfgs.size()); + for (size_t i=0; itestcfgs.size(); ++i) { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig"); - return 1; + ACE_NEW (this->suppliers[i], TimeoutConsumer ()); + + event[0].header.type = ACE_ES_EVENT_UNDEFINED+this->testcfgs[i]->type; + event[0].header.source = i; //supplier_id + event[0].header.ttl = 1; + + std::ostringstream entry_prefix; + entry_prefix << "TimeoutConsumer " << i; + + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer.connect() for %s\n",entry_prefix.str().c_str())); + this->suppliers[i]->connect (this->test_done, + this->scheduler.in(), + entry_prefix.str().c_str(), + this->periods[i], //period + this->importances[i], //importance + this->crits[i], //criticality + i, //supplier_id + this->testcfgs[i]->num_entities, //to_send + event, //event set + this->event_channel.in() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; } - ACE_ENDTRY; - - return 0; //successful run } -template int -ECConfig::connectSuppliers() +template void +ECConfig::connect_consumers (ACE_ENV_SINGLE_ARG_DECL) { - ACE_TRY + ACE_DEBUG((LM_DEBUG,"Consumers to connect: %d\n",this->testcfgs.size())); + this->consumers.size(this->testcfgs.size()); + for (size_t i=0; itestcfgs.size(); ++i) { - // The canonical protocol to connect to the EC - RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = - event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - ACE_DEBUG ((LM_DEBUG, "connecting suppliers\n")); - supplier_ids.size(supplier_cfgs.size()); - consumer_proxys.size(supplier_cfgs.size()); - for (size_t i=0; itestcfgs.size(); ++i) - { - ACE_NEW_RETURN(this->suppliers[i],Supplier(),1); - - RtecScheduler::handle_t hndl = this->supplier_cfgs[i]; - test_config_t *tcfg = this->testcfgs[i]; - - RtecEventComm::EventSourceID supplier_id = i; - this->supplier_ids[i] = supplier_id; - - ACE_SupplierQOS_Factory supplier_qos; - supplier_qos.insert (supplier_id, - ACE_ES_EVENT_UNDEFINED+tcfg->type, - hndl, - 1); // number of calls, but what does that mean? - - consumer_proxys[i] = - supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; - - RtecEventComm::PushSupplier_var supplier = - suppliers[i]->_this (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_NEW (this->consumers[i], Consumer ()); + + std::ostringstream entry_prefix; + entry_prefix << "Consumer " << i; + + ACE_DEBUG((LM_DEBUG,"Consumer.connect() for %s\n",entry_prefix.str().c_str())); + //don't set the RT_Info values + this->consumers[i]->connect (this->scheduler.in(), + entry_prefix.str().c_str(), + i, //consumer id + ACE_ES_EVENT_UNDEFINED+this->testcfgs[i]->type, //type + /* + this->periods[i], + this->importances[i], + this->crits[i], + */ + this->event_channel.in() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } +} - //PROBLEM: Occasional segfault here: - consumer_proxys[i]->connect_push_supplier (supplier.in (), - supplier_qos.get_SupplierQOS () - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - } +template void +ECConfig::disconnect_suppliers (ACE_ENV_SINGLE_ARG_DECL) +{ + for (size_t i = 0; i < this->suppliers.size(); ++i) + { + this->suppliers[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; - ACE_DEBUG ((LM_DEBUG, "suppliers connected\n")); + delete this->suppliers[i]; + this->suppliers[i] = 0; } - ACE_CATCHANY +} + +template void +ECConfig::disconnect_consumers (ACE_ENV_SINGLE_ARG_DECL) +{ + for (size_t i = 0; i < this->consumers.size(); ++i) { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig"); - return 1; - } - ACE_ENDTRY; + this->consumers[i]->disconnect (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; - return 0; + delete this->consumers[i]; + this->consumers[i] = 0; + } } template void @@ -507,6 +467,32 @@ ECConfig::print_RT_Infos (ACE_Array cfg_se } +template ACE_THR_FUNC_RETURN +ECConfig::run_orb(void *data) +{ + //ACE_DECLARE_NEW_CORBA_ENV; + ACE_TRY + { + ACE_DEBUG((LM_DEBUG, "ORB thread: Casting %x\n",data)); + + CORBA::ORB_var orb = *(ACE_reinterpret_cast(CORBA::ORB_var*,data)); + + ACE_DEBUG((LM_DEBUG, "ORB thread: Running orb\n")); + + orb->run(); + //this method returns when orb->shutdown() is called; then thread exits + + ACE_DEBUG((LM_DEBUG, "ORB thread: Shutdown\n")); + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION(ACE_ANY_EXCEPTION, "ECConfig ORB thread"); + } + ACE_ENDTRY; + + return 0; +} + } /* namespace TestConfig */ #endif /* ECCONFIG_C */ diff --git a/TAO/orbsvcs/tests/EC_Config/ECConfig.h b/TAO/orbsvcs/tests/EC_Config/ECConfig.h index cef06505ecc..e0b3b1c2384 100644 --- a/TAO/orbsvcs/tests/EC_Config/ECConfig.h +++ b/TAO/orbsvcs/tests/EC_Config/ECConfig.h @@ -15,7 +15,8 @@ #define ECCONFIG_H #include "ace/Array.h" -#include "ace/Synch.h" +//#include "ace/Synch.h" +#include "ace/RW_Mutex.h" #include "orbsvcs/RtecSchedulerS.h" //for POA_RtecScheduler #include "orbsvcs/RtecSchedulerC.h" #include "orbsvcs/RtecEventChannelAdminC.h" @@ -23,7 +24,7 @@ #include "TestConfig.h" #include "Consumer.h" -#include "Supplier.h" +#include "TimeoutConsumer.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -34,9 +35,12 @@ namespace TestConfig { typedef ACE_Array ProxyList; typedef ACE_Array ConfigList; typedef ACE_Array SupplierIDList; +typedef ACE_Array PeriodList; +typedef ACE_Array ImportanceList; +typedef ACE_Array CritList; typedef ACE_Array ConsumerList; -typedef ACE_Array SupplierList; +typedef ACE_Array SupplierList; template class ECConfig : public Test_Config { @@ -60,19 +64,26 @@ public: //events. protected: - virtual int initEC (void); + // TODO USE DEFAULTS FOR ANY OF THESE? - virtual int connectConsumers (void); + virtual void initEC (ACE_ENV_SINGLE_ARG_DECL); - virtual int connectSuppliers (void); + void connect_suppliers (ACE_ENV_SINGLE_ARG_DECL); - virtual void reset (void); - // + void disconnect_suppliers (ACE_ENV_SINGLE_ARG_DECL); + + void connect_consumers (ACE_ENV_SINGLE_ARG_DECL); + + void disconnect_consumers (ACE_ENV_SINGLE_ARG_DECL); + + virtual void reset (ACE_ENV_SINGLE_ARG_DECL); private: void print_RT_Infos (ACE_Array cfg_set); + static ACE_THR_FUNC_RETURN run_orb(void *data); //thread fcn for running ORB + Test_Config_Set testcfgs; //copy of the currently configured Test_Config_Set //using the same test_config_t objects @@ -86,30 +97,25 @@ private: RtecEventChannelAdmin::EventChannel_var event_channel; RtecScheduler::Scheduler_var scheduler; - /* - ACE_Strong_Bound_Ptr ec_impl; - ACE_Strong_Bound_Ptr sched_impl; - */ TAO_EC_Event_Channel *ec_impl; - POA_RtecScheduler::Scheduler *sched_impl; + SCHED_STRAT *sched_impl; - ProxyList consumer_proxys; - //proxy objects for pushing events to consumers + ConsumerList consumers; - ConfigList supplier_cfgs; - //RT_Infos generated by configure() for suppliers. + SupplierList suppliers; - ConfigList consumer_cfgs; - //RT_Infos generated by configure() for consumers. + PeriodList periods; - SupplierIDList supplier_ids; - //IDs of the suppliers + ImportanceList importances; - ConsumerList consumers; + CritList crits; - SupplierList suppliers; + ACE_RW_Mutex* test_done; + //TimeoutConsumers acquire read locks; when the ECConfig can acquire + //a write lock, all TimeoutConsumers must've finished, so the test + //is finished. int configured; //boolean }; diff --git a/TAO/orbsvcs/tests/EC_Config/Makefile b/TAO/orbsvcs/tests/EC_Config/Makefile index 511b965f130..fb585fceabb 100644 --- a/TAO/orbsvcs/tests/EC_Config/Makefile +++ b/TAO/orbsvcs/tests/EC_Config/Makefile @@ -23,7 +23,7 @@ ifeq (Event,$(findstring Event,$(TAO_ORBSVCS))) endif # Sched endif # Event -PSRC= Test.cpp Supplier.cpp Consumer.cpp ECConfig.cpp Service.cpp Config_Factory.cpp Test_Handler.cpp TestConfig.cpp +PSRC= Test.cpp Supplier.cpp Consumer.cpp ECConfig.cpp Service.cpp Config_Factory.cpp Test_Handler.cpp TestConfig.cpp TimeoutConsumer.cpp LDLIBS = -lTAO_RTOLDEvent -lTAO_RTEvent -lTAO_RTSched -lTAO_CosNaming -lTAO_Svc_Utils -lTAO_IORTable -lTAO_Messaging -lTAO_PortableServer -lTAO -lTAO_RTKokyuEvent -lACEXML_Parser # The complete path to orbsvcs/orbsvcs/Sched is required for DU/CXX @@ -32,13 +32,17 @@ CPPFLAGS += -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs \ -I$(TAO_ROOT)/orbsvcs/orbsvcs/Sched \ $(foreach svc, $(TAO_ORBSVCS), -DTAO_ORBSVCS_HAS_$(svc)) -Test_OBJS=$(addsuffix .o, Test Supplier Consumer ECConfig Config_Factory Test_Handler TestConfig) +Test_OBJS=$(addsuffix .o, Test Consumer ECConfig Config_Factory Test_Handler TestConfig TimeoutConsumer) Service_OBJS=$(addsuffix .o, Service Supplier Consumer ECConfig TestConfig) #---------------------------------------------------------------------------- # Include macros and targets #---------------------------------------------------------------------------- +# To allow debug, disable optimization: +debug=1 +optimize=0 + include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU include $(ACE_ROOT)/include/makeinclude/macros.GNU include $(TAO_ROOT)/rules.tao.GNU diff --git a/TAO/orbsvcs/tests/EC_Config/Test.cpp b/TAO/orbsvcs/tests/EC_Config/Test.cpp index 2941938a70b..705725f9e52 100644 --- a/TAO/orbsvcs/tests/EC_Config/Test.cpp +++ b/TAO/orbsvcs/tests/EC_Config/Test.cpp @@ -3,6 +3,8 @@ #include "ace/Array.h" #include "ace/Bound_Ptr.h" #include "ace/Synch.h" +#include "ace/Get_Opt.h" +#include "ace/String_Base.h" #include "ACEXML/parser/parser/Parser.h" #include "ACEXML/common/InputSource.h" #include "ACEXML/common/FileCharStream.h" @@ -14,7 +16,12 @@ using namespace TestConfig; -//NOTE: Read from a formatted file rather than hardcode the configuration. Check ACE_XML. +struct Arguments +{ + ACE_CString filename_; +}; + +int parse_args (int argc, char *argv[],Arguments &args); int main (int argc, char *argv[]) @@ -24,19 +31,24 @@ main (int argc, char *argv[]) ACEXML_TRY_NEW_ENV { ACEXML_Parser parser; - - // TODO parse args for config filename - - ACEXML_Char *filename = ACE_LIB_TEXT("test.xml"); - ACEXML_FileCharStream fcs; - if ((retval = fcs.open(filename)) != 0) { - ACE_DEBUG ((LM_DEBUG, "Could not open file %s\n",filename)); + Arguments args; + args.filename_.set(ACE_TEXT("test.xml")); + + // parse args for config filename + if (parse_args(argc,argv,args) == -1) + { + return 1; + } + + ACEXML_FileCharStream *fcs = new ACEXML_FileCharStream(); + if ((retval = fcs->open(args.filename_.c_str())) != 0) { + ACE_DEBUG ((LM_DEBUG, "Could not open file %s\n",args.filename_.c_str())); return retval; } - ACEXML_InputSource is (&fcs); + ACEXML_InputSource is (fcs); //takes responsibility of fcs - Test_Handler handler (filename); + Test_Handler handler (args.filename_.c_str()); ACEXML_DefaultHandler dflt; parser.setContentHandler (&handler); @@ -47,9 +59,14 @@ main (int argc, char *argv[]) parser.parse(&is); ACEXML_TRY_CHECK; + if ((retval = fcs->close()) != 0) { + ACE_DEBUG ((LM_DEBUG, "Could not close file %s\n",args.filename_.c_str())); + return retval; + } + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Finished parsing\n"))); - // TODO configure according to parsed XML + // configure according to parsed XML ConfigFactory::Default_Config_Factory fact; fact.init(argc,argv); @@ -89,3 +106,28 @@ main (int argc, char *argv[]) return retval; } + +int parse_args (int argc, char *argv[], Arguments &args) +{ + ACE_Get_Opt get_opts (argc, argv, "f:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'f': + args.filename_.set(get_opts.opt_arg()); + ACE_DEBUG((LM_DEBUG,ACE_TEXT("Filename argument: %s\n"),args.filename_.c_str())); + break; + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "[-f ] " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} diff --git a/TAO/orbsvcs/tests/EC_Config/TestConfig.h b/TAO/orbsvcs/tests/EC_Config/TestConfig.h index 536569f4b3b..1ab044cd160 100644 --- a/TAO/orbsvcs/tests/EC_Config/TestConfig.h +++ b/TAO/orbsvcs/tests/EC_Config/TestConfig.h @@ -35,7 +35,7 @@ typedef ACE_Weak_Bound_Ptr TCFG_SET_WPTR; // distinct values for each. typedef unsigned long Entity_Type_t; -typedef long Period_t; +typedef long Period_t; //in milliseconds enum Criticality_t { // Defines the criticality of the entity. diff --git a/TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp b/TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp index 6e11fa54b2c..5e5be2a5140 100644 --- a/TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp +++ b/TAO/orbsvcs/tests/EC_Config/Test_Handler.cpp @@ -9,9 +9,9 @@ #include //for atol #include //for istringstream -Test_Handler::Test_Handler (ACEXML_Char* fileName) +Test_Handler::Test_Handler (const char *filename) : configs_(new TestConfig::Test_Config_Set(0)), - fileName_(ACE::strnew (fileName)), + fileName_(filename), didtype_(0), //false didperiod_(0), didcrit_(0), @@ -23,8 +23,6 @@ Test_Handler::Test_Handler (ACEXML_Char* fileName) Test_Handler::~Test_Handler (void) { - delete[] this->fileName_; - const TestConfig::Test_Config_Set &cfgs = *this->configs_; for(size_t i=0; ifileName_, + ACE_DEBUG ((LM_DEBUG, "%s:%d:%d ", this->fileName_.c_str(), this->locator_->getLineNumber(), this->locator_->getColumnNumber())); ex.print(); @@ -419,7 +417,7 @@ Test_Handler::fatalError (ACEXML_SAXParseException& ex ACEXML_ENV_ARG_DECL_NOT_U ACE_THROW_SPEC ((ACEXML_SAXException)) { - ACE_DEBUG ((LM_DEBUG, "%s:%d:%d ", this->fileName_, + ACE_DEBUG ((LM_DEBUG, "%s:%d:%d ", this->fileName_.c_str(), this->locator_->getLineNumber(), this->locator_->getColumnNumber())); ex.print(); diff --git a/TAO/orbsvcs/tests/EC_Config/Test_Handler.h b/TAO/orbsvcs/tests/EC_Config/Test_Handler.h index d783386fecd..85320731c81 100644 --- a/TAO/orbsvcs/tests/EC_Config/Test_Handler.h +++ b/TAO/orbsvcs/tests/EC_Config/Test_Handler.h @@ -16,6 +16,7 @@ #include "TestConfig.h" +#include "ace/String_Base.h" #include "ACEXML/common/ContentHandler.h" #include "ACEXML/common/ErrorHandler.h" @@ -23,6 +24,8 @@ enum element { TESTCONFIG,TEST_CONFIG_T,TYPE,PERIOD,CRITICALITY,IMPORTANCE,NUM_ENTITIES }; +// TODO Change XML format to list supplier and client RT_Infos separate, plus dependencies + /** * @class Test_Handler * @@ -40,7 +43,7 @@ public: /* * Default constructor. */ - Test_Handler (ACEXML_Char* fileName); + Test_Handler (const char *filename); /* * Default destructor. @@ -167,7 +170,7 @@ public: private: TestConfig::TCFG_SET_SPTR configs_; - ACEXML_Char* fileName_; + ACE_CString fileName_; ACEXML_Locator* locator_; STACK scope_; diff --git a/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.cpp b/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.cpp new file mode 100644 index 00000000000..7546364855c --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.cpp @@ -0,0 +1,281 @@ +// $Id$ + +#include "TimeoutConsumer.h" + +#include //for ostringstream + +#include "orbsvcs/Event_Utilities.h" //for ACE_Supplier/ConsumerQOS_Factory +#include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/RtecSchedulerC.h" +#include "orbsvcs/RtecEventCommC.h" +#include "ace/RW_Mutex.h" + +ACE_RCSID(EC_Examples, Consumer, "$Id$") + +TimeoutConsumer::TimeoutConsumer (void) + : _to_send(0) + , _num_sent(0) + , _hold_mtx(0) + , _done(0) + , _supplier(this) + , _consumer(this) + , _events(0) +{ +} + +TimeoutConsumer::~TimeoutConsumer (void) +{ + // TODO this->disconnect() ??? + + if (this->_hold_mtx && this->_done!=0) + { + this->_done->release(); + this->_hold_mtx = 0; + } +} + +void +TimeoutConsumer::connect (ACE_RW_Mutex* done, + RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + TimeBase::TimeT period, + RtecScheduler::Importance_t importance, + RtecScheduler::Criticality_t criticality, + RtecEventComm::EventSourceID supplier_id, + size_t to_send, + const RtecEventComm::EventSet& events, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL) +{ + this->_supplier_id = supplier_id; + this->_to_send = to_send; + this->_num_sent = 0; + this->_hold_mtx = 0; + this->_done = done; + if (this->_done!= 0 && this->_num_sent_to_send) + { + int ret = done->acquire_read(); + if (ret == -1) + { + ACE_DEBUG((LM_DEBUG,"ERROR: Could not acquire read lock for TimeoutConsumer: %s\n", + ACE_OS::strerror(errno))); + } else + { + ACE_DEBUG((LM_DEBUG,"ACQUIRED read lock for TimeoutConsumer %d\n",this->_supplier_id)); + this->_hold_mtx = 1; + } + } else + { + ACE_DEBUG((LM_DEBUG,"Already done; did not grab read lock for TimeoutConsumer\n")); + } + + this->_events.length(events.length()); + for (size_t i=0; i_events[i] = events[i]; //copy event to local set + this->_events[i].header.source = this->_supplier_id; //make sure event source is this + } + + //create supplier RT_Info + std::ostringstream supp_entry_pt; + supp_entry_pt << entry_prefix << " Supplier " << this->_supplier_id; //unique RT_Info entry point + ACE_DEBUG((LM_DEBUG,"Creating %s\n",supp_entry_pt.str().c_str())); + RtecScheduler::handle_t rt_info = scheduler->create (supp_entry_pt.str().c_str() + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + scheduler->set (rt_info, + criticality, + period, period, period, + period, + importance, + period, + 0, + RtecScheduler::OPERATION + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Register as supplier of events + ACE_SupplierQOS_Factory supplierQOS; + for (size_t i=0; i_supplier_id, + events[i].header.type, + rt_info, + 1); + } + + RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = + ec->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_consumer_proxy = + supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + RtecEventComm::PushSupplier_var supplierv = + this->_supplier._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_consumer_proxy->connect_push_supplier (supplierv.in (), + supplierQOS.get_SupplierQOS () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer connected as event supplier\n")); + for (size_t i=0; icreate (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Register as consumer of timeout events + ACE_ConsumerQOS_Factory timeoutQOS; + timeoutQOS.insert_time(ACE_ES_EVENT_INTERVAL_TIMEOUT /*??*/, + period, //TimeBase::TimeT + rt_info); + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = + ec->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_supplier_proxy = + consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + RtecEventComm::PushConsumer_var consumerv = + this->_consumer._this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_supplier_proxy->connect_push_consumer (consumerv.in (), + timeoutQOS.get_ConsumerQOS () + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer connected as timeout consumer\n")); + std::ostringstream prd; + prd << period; + ACE_DEBUG((LM_DEBUG,"\tTimeout period: %s\n",prd.str().c_str())); +} + +void +TimeoutConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) +{ + if (! CORBA::is_nil (this->_consumer_proxy.in ())) + { + this->_consumer_proxy->disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_consumer_proxy = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); + + // Deactivate the servant + PortableServer::POA_var poa = + this->_supplier._default_POA (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (&this->_supplier ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object (id.in () ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + + //disconnect consumer ??? + if (! CORBA::is_nil (this->_supplier_proxy.in())) + { + this->_supplier_proxy->disconnect_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->_supplier_proxy = RtecEventChannelAdmin::ProxyPushSupplier::_nil(); + + //Deactivate the servant + PortableServer::POA_var poa = + this->_consumer._default_POA(ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (&this->_consumer ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + poa->deactivate_object(id.in() ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + } + + if (this->_hold_mtx && this->_done!=0) + { + this->_done->release(); + this->_hold_mtx = 0; + } +} + +void +TimeoutConsumer::push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (events.length () == 0) + { + ACE_DEBUG ((LM_DEBUG,"TimeoutConsumer (%P|%t) push but no events\n")); + return; + } + + ACE_DEBUG((LM_DEBUG,"TimeoutConsumer %d (%P|%t) received %d events:\n",this->_supplier_id, + events.length())); + if (this->_num_sent < this->_to_send) + { + for (size_t i=0; i_supplier_id)); + + //TODO send this->_events + ++this->_num_sent; + ACE_DEBUG((LM_DEBUG,"Sent events; %d sent\t%d total\n",this->_num_sent,this->_to_send)); + if (this->_num_sent >= this->_to_send) + { + //done + ACE_DEBUG((LM_DEBUG,"RELEASE read lock from TimeoutConsumer %d\n", + this->_supplier_id)); + this->_done->release(); + this->_hold_mtx = 0; + } + } + else + { + int prio = -1; + ACE_hthread_t handle; + ACE_Thread::self(handle); + ACE_Thread::getprio(handle,prio); + //ACE_thread_t tid = ACE_Thread::self(); + ACE_DEBUG ((LM_DEBUG, "TimeoutConsumer @%d (%P|%t) we received event type %d\n", + prio,events[0].header.type)); + } + } + } else + { + //do nothing + } +} + +void +TimeoutConsumer::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +void +TimeoutConsumer::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ +} + +// **************************************************************** + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.h b/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.h new file mode 100644 index 00000000000..db55b9970c3 --- /dev/null +++ b/TAO/orbsvcs/tests/EC_Config/TimeoutConsumer.h @@ -0,0 +1,110 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// ORBSVCS Real-time Event Channel examples +// +// = FILENAME +// TimeoutConsumer +// +// = AUTHOR +// Bryan Thrall (thrall@cse.wustl.edu) +// +// ============================================================================ + +#ifndef TIMEOUTCONSUMER_H +#define TIMEOUTCONSUMER_H + +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/RtecEventCommC.h" +#include "orbsvcs/RtecSchedulerC.h" +#include "orbsvcs/Channel_Clients_T.h" +//#include "ace/Task.h" +//#include "ace/Synch.h" +#include "ace/RW_Mutex.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TimeoutConsumer +{ + // = TITLE + // Simple consumer object which responds to timeout events. + // + // = DESCRIPTION + // This class is a consumer of timeout events. + // For each timeout event it consumes, it pushes a specified EventSet into the EC. + // + // There are several ways to connect and disconnect this class, + // and it is up to the driver program to use the right one. + // +public: + TimeoutConsumer (void); + // Default Constructor. + + virtual ~TimeoutConsumer (void); + + void connect (ACE_RW_Mutex* done, + RtecScheduler::Scheduler_ptr scheduler, + const char *entry_prefix, + TimeBase::TimeT period, + RtecScheduler::Importance_t importance, + RtecScheduler::Criticality_t criticality, + RtecEventComm::EventSourceID supplier_id, + size_t to_send, + const RtecEventComm::EventSet& events, + RtecEventChannelAdmin::EventChannel_ptr ec + ACE_ENV_ARG_DECL); + // This method connects the supplier to the EC. + + void disconnect (ACE_ENV_SINGLE_ARG_DECL); + // Disconnect from the EC. + + // = The RtecEventComm::PushConsumer methods + + virtual void push (const RtecEventComm::EventSet& events + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + // pushes _events as a supplier until _to_send pushes, then calls resume() on this task. + + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + + // = The RtecEventComm::PushSupplier methods + + virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + // The skeleton methods. + +private: + size_t _to_send; //number of times to push on timeout + size_t _num_sent; //number of pushes so far + int _hold_mtx; //1 when hold _done mutex; 0 else + ACE_RW_Mutex* _done; //release read lock when _num_sent >= _to_send + + RtecEventComm::EventSourceID _supplier_id; + // We generate an id based on the name.... + + RtecEventChannelAdmin::ProxyPushConsumer_var _consumer_proxy; + // We talk to the EC (as a supplier) using this proxy. + + ACE_PushSupplier_Adapter _supplier; + // We connect to the EC as a supplier so we can push events + // every time we receive a timeout event. + + RtecEventChannelAdmin::ProxyPushSupplier_var _supplier_proxy; + // We talk to the EC (as a consumer) using this proxy. + + ACE_PushConsumer_Adapter _consumer; + // We connect to the EC as a consumer so we can receive the + // timeout events. + + RtecEventComm::EventSet _events; + // set of events to push when a timeout event is received. +}; + +#endif /* CONSUMER_H */ diff --git a/TAO/orbsvcs/tests/EC_Config/test.xml b/TAO/orbsvcs/tests/EC_Config/test.xml index 5a37434eb0f..dd90d444e5b 100644 --- a/TAO/orbsvcs/tests/EC_Config/test.xml +++ b/TAO/orbsvcs/tests/EC_Config/test.xml @@ -3,16 +3,16 @@ 0 - 1000 + 700 10 1 - 2000 + 2100 10 - \ No newline at end of file + diff --git a/TAO/orbsvcs/tests/EC_Config/testconfig.dtd b/TAO/orbsvcs/tests/EC_Config/testconfig.dtd index 6330b60e817..c658b70003c 100644 --- a/TAO/orbsvcs/tests/EC_Config/testconfig.dtd +++ b/TAO/orbsvcs/tests/EC_Config/testconfig.dtd @@ -1,6 +1,7 @@ + -- cgit v1.2.1