diff options
author | thrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2004-05-25 21:38:52 +0000 |
---|---|---|
committer | thrall <thrall@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2004-05-25 21:38:52 +0000 |
commit | b2c36806de01b12099d3d546385935adc9d5800d (patch) | |
tree | b78d76e6e45582a072441c7f4f7dd0bdedb1ddb3 | |
parent | 7f6aee383581cdce6b1cc8064454db7cfcf14623 (diff) | |
download | ATCD-b2c36806de01b12099d3d546385935adc9d5800d.tar.gz |
Fixed problem of Client sending to only one Middle; next, set up multiple consumer subscriptions, then join semantics in the Server
9 files changed, 242 insertions, 150 deletions
diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp index 1fe20dd823d..1998c7319c7 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_EC.cpp @@ -13,6 +13,7 @@ #include "orbsvcs/Event/EC_Kokyu_Factory.h" #include "orbsvcs/Time_Utilities.h" #include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Scheduler_Factory.h" #include "tao/ORB_Core.h" #include "Kokyu_EC.h" @@ -30,6 +31,7 @@ namespace { int config_run = 0; ACE_CString sched_type ="rms"; + ACE_CString ior_output_filename; FILE * ior_output_file; } /* @@ -158,6 +160,44 @@ public: ); ACE_CHECK; + //DEBUG: print out schedule + RtecScheduler::Scheduler_ptr scheduler = this->scheduler(ACE_ENV_SINGLE_ARG_PARAMETER); + //RtecEventChannelAdmin::EventChannel_ptr event_channel = this->event_channel(ACE_ENV_SINGLE_ARG_DECL); + + RtecScheduler::RT_Info_Set_var infos; + RtecScheduler::Config_Info_Set_var configs; + RtecScheduler::Dependency_Set_var dependencies; + RtecScheduler::Scheduling_Anomaly_Set unsafe_anomalies; + RtecScheduler::Scheduling_Anomaly_Set_var anomalies; + + int min_os_priority = + ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + int max_os_priority = + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + + scheduler->compute_scheduling (min_os_priority, + max_os_priority, + infos.out (), + dependencies.out (), + configs.out (), + anomalies.out () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + std::stringstream sched_out; + ACE_CString ior_prefix(ior_output_filename.c_str(),ior_output_filename.length()-4); //cut off '.ior' + sched_out << "schedule_" << ior_prefix.c_str() << ".out"; + + ACE_DEBUG((LM_DEBUG,"Consumer_EC writing schedule to %s\n",sched_out.str().c_str())); + + ACE_Scheduler_Factory::dump_schedule (infos.in (), + dependencies.in (), + configs.in (), + anomalies.in (), + sched_out.str().c_str()); + ////END DEBUG ACE_DEBUG((LM_DEBUG,"Consumer_EC set_up_supp_and_cons() DONE\n")); } //set_up_supp_and_cons() @@ -303,7 +343,8 @@ while ((c = get_opts ()) != -1) switch (c) { case 'o': - ior_output_file = ACE_OS::fopen (get_opts.opt_arg (), "w"); + ior_output_filename = get_opts.opt_arg(); + ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w"); if (ior_output_file == 0) ACE_ERROR_RETURN ((LM_ERROR, "Unable to open %s for writing: %p\n", @@ -324,7 +365,8 @@ while ((c = get_opts ()) != -1) // Indicates sucessful parsing of the command line if (ior_output_file == 0) { - ior_output_file = ACE_OS::fopen ("consumer_ec.ior", "w"); + ior_output_filename = "consumer_ec.ior"; + ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w"); } return 0; } diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp index efca4690cfa..df8f14809db 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Consumer_Supplier_EC.cpp @@ -12,6 +12,7 @@ #include "orbsvcs/Event/EC_Kokyu_Factory.h" #include "orbsvcs/Time_Utilities.h" #include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Scheduler_Factory.h" #include "tao/ORB_Core.h" #include "Kokyu_EC.h" @@ -31,6 +32,7 @@ namespace int supp_id = 3; ACE_CString sched_type = "rms"; ACE_CString ior_input_file = "file://consumer_ec.ior"; + ACE_CString ior_output_filename; FILE * ior_output_file; } @@ -177,7 +179,7 @@ public: RtecScheduler::VERY_LOW_IMPORTANCE, supplier_impl1_3, "supplier1_3", - supp1_3_types[0] + supp1_3_types ACE_ENV_ARG_PARAMETER ); ACE_CHECK; @@ -192,6 +194,44 @@ public: add_dummy_supplier(types); ACE_CHECK; + //DEBUG: print out schedule + RtecScheduler::Scheduler_ptr scheduler = this->scheduler(ACE_ENV_SINGLE_ARG_PARAMETER); + //RtecEventChannelAdmin::EventChannel_ptr event_channel = this->event_channel(ACE_ENV_SINGLE_ARG_DECL); + + RtecScheduler::RT_Info_Set_var infos; + RtecScheduler::Config_Info_Set_var configs; + RtecScheduler::Dependency_Set_var dependencies; + RtecScheduler::Scheduling_Anomaly_Set unsafe_anomalies; + RtecScheduler::Scheduling_Anomaly_Set_var anomalies; + + int min_os_priority = + ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + int max_os_priority = + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + + scheduler->compute_scheduling (min_os_priority, + max_os_priority, + infos.out (), + dependencies.out (), + configs.out (), + anomalies.out () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + std::stringstream sched_out; + ACE_CString ior_prefix(ior_output_filename.c_str(),ior_output_filename.length()-4); //cut off '.ior' + sched_out << "schedule_" << ior_prefix.c_str() << ".out"; + + ACE_DEBUG((LM_DEBUG,"Supplier_EC writing schedule to %s\n",sched_out.str().c_str())); + + ACE_Scheduler_Factory::dump_schedule (infos.in (), + dependencies.in (), + configs.in (), + anomalies.in (), + sched_out.str().c_str()); + ////END DEBUG //Kokyu_EC::start(ACE_ENV_SINGLE_ARG_PARAMETER); //ACE_CHECK; } @@ -335,7 +375,8 @@ int parse_args (int argc, char *argv[]) { case 'o': { - ior_output_file = ACE_OS::fopen (get_opts.opt_arg (), "w"); + ior_output_filename = get_opts.opt_arg(); + ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w"); if (ior_output_file == 0) ACE_ERROR_RETURN ((LM_ERROR, "Unable to open %s for writing: %p\n", @@ -376,7 +417,8 @@ int parse_args (int argc, char *argv[]) } if (ior_output_file == 0) { - ior_output_file = ACE_OS::fopen ("consumer_supplier_ec.ior", "w"); + ior_output_filename = "consumer_supplier_ec.ior"; + ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w"); } // Indicates sucessful parsing of the command line return 0; diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp index 30e8ab245cf..eb6fdcfb5c6 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.cpp @@ -1,12 +1,7 @@ // $Id$ #include "Dynamic_Supplier.h" -//#include "ace/Time_Value.h" -//#include "ace/Thread.h" //for ACE_Thread::self() -//#include "ace/Counter.h" -//#include "ace/OS_NS_sys_time.h" #include "orbsvcs/Event_Service_Constants.h" -//#include "orbsvcs/Event/EC_Event_Channel.h" #include "orbsvcs/RtecEventCommC.h" #if defined (ACE_HAS_DSUI) @@ -17,11 +12,11 @@ ACE_RCSID(EC_Examples, Dynamic_Supplier, "$Id$") -Dynamic_Supplier::Dynamic_Supplier(RtecEventComm::EventSourceID id, - RtecEventComm::EventType normal_type1, - RtecEventComm::EventType normal_type2, - RtecEventComm::EventType ft_type1, - RtecEventComm::EventType ft_type2, +Dynamic_Supplier::Dynamic_Supplier(SourceID id, + EventType normal_type1, + EventType normal_type2, + EventType ft_type1, + EventType ft_type2, Service_Handler *handler) : Supplier(id,normal_type1,ft_type1,handler) , norm_type2_(normal_type2) @@ -44,17 +39,15 @@ Dynamic_Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL) ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (%P|%t) handle_service_start() DONE\n")); } - PushConsumer_Vector *proxies = &(this->consumer_proxy_); - RtecEventComm::EventSet eventA (1); eventA.length (1); eventA[0].header.source = this->id_; eventA[0].header.ttl = 1; RtecEventComm::EventSet eventB (1); - eventA.length (1); - eventA[0].header.source = this->id_; - eventA[0].header.ttl = 1; + eventB.length (1); + eventB[0].header.source = this->id_; + eventB[0].header.ttl = 1; switch (this->mode_) { case FAULT_TOLERANT: @@ -93,31 +86,18 @@ Dynamic_Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL) oid.type = eventA[0].header.type; DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_START, 0, sizeof(Object_ID), (char*)&oid); //TODO: BUG? This code pushes eventA/B to ALL consumers! - for(PushConsumer_Vector::Iterator iter(*proxies); - !iter.done(); iter.advance()) - { - PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported! - iter.next(proxy); - ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventA %d\n",this->id_,eventA[0].header.type)); - (*proxy)->push (eventA ACE_ENV_ARG_PARAMETER); - } + ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventA %d\n",this->id_,eventA[0].header.type)); + this->consumer_proxy_->push (eventA ACE_ENV_ARG_PARAMETER); DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid); oid.type = eventB[0].header.type; DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_START, 0, sizeof(Object_ID), (char*)&oid); - for(PushConsumer_Vector::Iterator iter(*proxies); - !iter.done(); iter.advance()) - { - PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported! - iter.next(proxy); - ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventB %d\n",this->id_,eventB[0].header.type)); - (*proxy)->push (eventB ACE_ENV_ARG_PARAMETER); - } - DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid); - //DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, m_id, 0, NULL); + ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t pushing eventB %d\n",this->id_,eventB[0].header.type)); + this->consumer_proxy_->push (eventB ACE_ENV_ARG_PARAMETER); ACE_DEBUG((LM_DEBUG,"Dynamic_Supplier (id %d) in thread %t ONE_WAY_CALL_DONE at %u\n",this->id_,ACE_OS::gettimeofday().msec())); + DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid); if (this->handler_ != 0) { diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.h index 2fc9cdc853b..82e5eb1990b 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.h +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Dynamic_Supplier.h @@ -35,11 +35,11 @@ class Dynamic_Supplier : public Supplier public: ///Calls up to Service_Handler every time a timeout ///occurs. normal_channel_id1 is used as id returned by get_id(). - Dynamic_Supplier(RtecEventComm::EventSourceID id, - RtecEventComm::EventType normal_type1, - RtecEventComm::EventType normal_type2, - RtecEventComm::EventType ft_type1, - RtecEventComm::EventType ft_type2, + Dynamic_Supplier(SourceID id, + EventType normal_type1, + EventType normal_type2, + EventType ft_type1, + EventType ft_type2, Service_Handler *handler = 0); virtual ~Dynamic_Supplier(void); diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp index a9cc4004d0c..ba890822a96 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.cpp @@ -96,9 +96,9 @@ Kokyu_EC::init(const char* schedule_discipline, PortableServer::POA_ptr poa) ec_impl_ = ec; consumer_admin_ = ec_impl_->for_consumers(ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; supplier_admin_ = ec_impl_->for_suppliers(ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; } ACE_CATCHALL { return -1; @@ -126,7 +126,7 @@ Kokyu_EC::register_consumer ( { RtecScheduler::handle_t consumer1_rt_info = scheduler_impl_->create (entry_point ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; scheduler_impl_->set (consumer1_rt_info, info.criticality, @@ -139,26 +139,30 @@ Kokyu_EC::register_consumer ( info.threads, info.info_type ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; ACE_ConsumerQOS_Factory consumer_qos1; if (type != ACE_ES_EVENT_INTERVAL_TIMEOUT) - consumer_qos1.insert_type (type, consumer1_rt_info); + { + ACE_DEBUG((LM_DEBUG,"Kokyu_EC::register_consumer() inserting type %d into RT_Info %d\n",type,consumer1_rt_info)); + consumer_qos1.insert_type (type, consumer1_rt_info); + } else - consumer_qos1.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, - info.period, //in 100s of nanosec - consumer1_rt_info); - + { + consumer_qos1.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, + info.period, //in 100s of nanosec + consumer1_rt_info); + } proxy_supplier = consumer_admin_->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; proxy_supplier->connect_push_consumer (consumer, consumer_qos1.get_ConsumerQOS () ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; return consumer1_rt_info; } @@ -166,7 +170,7 @@ RtEventChannelAdmin::handle_t Kokyu_EC::register_supplier ( const char * entry_point, RtecEventComm::EventSourceID source, - RtecEventComm::EventType type, + EventType_Vector& supp_types, //RtecEventComm::EventType type, RtecEventComm::PushSupplier_ptr supplier, RtecEventChannelAdmin::ProxyPushConsumer_out proxy_consumer ACE_ENV_ARG_DECL @@ -180,24 +184,32 @@ Kokyu_EC::register_supplier ( { RtecScheduler::handle_t supplier1_rt_info = scheduler_impl_->create (entry_point ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; ACE_SupplierQOS_Factory supplier_qos1; - supplier_qos1.insert (source, - type, - supplier1_rt_info, - 1 /* number of calls, but what does that mean? */); + for(EventType_Vector::Iterator iter(supp_types); + !iter.done(); iter.advance()) + { + EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported! + iter.next(type); + + ACE_DEBUG((LM_DEBUG,"Kokyu_EC::register_supplier() inserting type %d into RT_Info %d\n",*type,supplier1_rt_info)); + supplier_qos1.insert (source, + *type, + supplier1_rt_info, + 1 /* number of calls, but what does that mean? */); + } proxy_consumer = supplier_admin_->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; proxy_consumer->connect_push_supplier (supplier, supplier_qos1.get_SupplierQOS () ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; return supplier1_rt_info; -} +} //register_suppliers() void Kokyu_EC::add_dependency ( @@ -248,9 +260,9 @@ Kokyu_EC::start (ACE_ENV_SINGLE_ARG_DECL) configs.out (), anomalies.out () ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; ec_impl_->activate (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; //@BT: EC activated is roughly equivalent to having the DT scheduler ready to run //DSTRM_EVENT (MAIN_GROUP_FAM, SCHEDULER_STARTED, 1, 0, NULL); @@ -276,7 +288,7 @@ void Kokyu_EC::add_supplier_with_timeout( Supplier * supplier_impl, const char * supp_entry_point, - RtecEventComm::EventType supp_type, + EventType_Vector& supp_types, //RtecEventComm::EventType supp_type, Timeout_Consumer * timeout_consumer_impl, const char * timeout_entry_point, ACE_Time_Value period, @@ -291,7 +303,7 @@ Kokyu_EC::add_supplier_with_timeout( , RtecScheduler::SYNCHRONIZATION_FAILURE )) { - add_supplier(supplier_impl,supp_entry_point,supp_type ACE_ENV_ARG_PARAMETER); + add_supplier(supplier_impl,supp_entry_point,supp_types ACE_ENV_ARG_PARAMETER); ACE_CHECK; add_timeout_consumer(supplier_impl,timeout_consumer_impl,timeout_entry_point,period,crit,imp ACE_ENV_ARG_PARAMETER); ACE_CHECK; @@ -338,10 +350,8 @@ Kokyu_EC::add_timeout_consumer( ACE_CHECK; //don't need to save supplier_timeout_consumer_rt_info because only used to set dependency here: - Supplier::RT_Info_Vector supp_infos = supplier_impl->rt_info(); this->add_dependency (supplier_timeout_consumer_rt_info, - supp_infos[0], - //supplier_impl->rt_info(), + supplier_impl->rt_info(), 1, RtecBase::TWO_WAY_CALL ACE_ENV_ARG_PARAMETER); @@ -355,7 +365,7 @@ void Kokyu_EC::add_supplier( Supplier * supplier_impl, const char * entry_point, - RtecEventComm::EventType type + EventType_Vector& supp_types //RtecEventComm::EventType type ACE_ENV_ARG_DECL ) ACE_THROW_SPEC (( @@ -365,7 +375,7 @@ Kokyu_EC::add_supplier( , RtecScheduler::SYNCHRONIZATION_FAILURE )) { - RtecEventComm::EventSourceID supplier_id = supplier_impl->get_id(); + Supplier::SourceID supplier_id = supplier_impl->get_id(); RtecEventChannelAdmin::ProxyPushConsumer_var consumer_proxy; RtecEventComm::PushSupplier_var supplier; @@ -376,18 +386,14 @@ Kokyu_EC::add_supplier( RtecScheduler::handle_t supplier_rt_info = this->register_supplier(entry_point, supplier_id, - type, + supp_types, //type, supplier.in(), consumer_proxy.out() ACE_ENV_ARG_PARAMETER); ACE_CHECK; - Supplier::PushConsumer_Vector cons_proxies; - cons_proxies.push_back(consumer_proxy); - supplier_impl->set_consumer_proxy(cons_proxies); - Supplier::RT_Info_Vector supp_infos; - supp_infos.push_back(supplier_rt_info); - supplier_impl->rt_info(supp_infos); + supplier_impl->set_consumer_proxy(consumer_proxy); + supplier_impl->rt_info(supplier_rt_info); this->suppliers_.push_back(supplier_impl); } //add_supplier() @@ -403,7 +409,7 @@ Kokyu_EC::add_consumer_with_supplier( RtecScheduler::Importance_t cons_imp, Supplier * supplier_impl, const char * supp_entry_point, - RtecEventComm::EventType supp_type + EventType_Vector& supp_types //RtecEventComm::EventType supp_type ACE_ENV_ARG_DECL ) ACE_THROW_SPEC (( @@ -415,14 +421,12 @@ Kokyu_EC::add_consumer_with_supplier( { add_consumer(consumer_impl,cons_entry_point,cons_period,cons_type,cons_crit,cons_imp ACE_ENV_ARG_PARAMETER); ACE_CHECK; - add_supplier(supplier_impl,supp_entry_point,supp_type ACE_ENV_ARG_PARAMETER); + add_supplier(supplier_impl,supp_entry_point,supp_types ACE_ENV_ARG_PARAMETER); ACE_CHECK; - Supplier::RT_Info_Vector supp_infos = supplier_impl->rt_info(); Consumer::RT_Info_Vector cons_infos = consumer_impl->rt_info(); this->add_dependency (cons_infos[0],//consumer_impl->rt_info(), - supp_infos[0], - //supplier_impl->rt_info(), + supplier_impl->rt_info(), 1, RtecBase::TWO_WAY_CALL ACE_ENV_ARG_PARAMETER); @@ -479,7 +483,7 @@ Kokyu_EC::add_consumer( } //add_consumer() void -Kokyu_EC::add_dummy_supplier(EventType_Vector& types +Kokyu_EC::add_dummy_supplier(EventType_Vector& supp_types ACE_ENV_ARG_DECL ) ACE_THROW_SPEC (( @@ -496,7 +500,7 @@ Kokyu_EC::add_dummy_supplier(EventType_Vector& types //known event types. RtecScheduler::handle_t general_rt_info = this->scheduler_->create ("Generalized Supplier (DUMMY)" ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; ACE_Time_Value exec(0,500); //some arbitrary execution time RtecScheduler::Time exec_time; @@ -515,12 +519,12 @@ Kokyu_EC::add_dummy_supplier(EventType_Vector& types 0, //threads RtecScheduler::OPERATION ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; - RtecEventComm::EventSourceID general_id = 3000; //we assume 3000 is big enough to not overlap w/ actual suppliers + Supplier::SourceID general_id = 3000; //we assume 3000 is big enough to not overlap w/ actual suppliers ACE_SupplierQOS_Factory general_qos; //NOTE that this is kind of hard-cody since it assumes types between UNDEF+6 and UNDEF+9 - for(EventType_Vector::Iterator iter(types); + for(EventType_Vector::Iterator iter(supp_types); !iter.done(); iter.advance()) { EventType_Vector::TYPE *type; //would rather const to ensure we don't change it, but not supported! @@ -534,7 +538,7 @@ Kokyu_EC::add_dummy_supplier(EventType_Vector& types } //we need to actually connect to the EC so the scheduler will set up the dependencies right - RtecEventComm::EventType supp_type = general_id; //some arbitrary type, never pushed + Supplier::EventType supp_type = general_id; //some arbitrary type, never pushed Supplier * general_impl; ACE_NEW(general_impl, Supplier(general_id,supp_type,supp_type)); @@ -542,23 +546,19 @@ Kokyu_EC::add_dummy_supplier(EventType_Vector& types RtecEventComm::PushSupplier_var general; general = general_impl->_this(ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer; proxy_consumer = this->supplier_admin_->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_TRY_CHECK; + ACE_CHECK; proxy_consumer->connect_push_supplier (general.in(), general_qos.get_SupplierQOS() ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - Supplier::PushConsumer_Vector proxies(1); - proxies.push_back(proxy_consumer); - general_impl->set_consumer_proxy(proxies); - Supplier::RT_Info_Vector infos(1); - infos.push_back(general_rt_info); - general_impl->rt_info(infos); + ACE_CHECK; + + general_impl->set_consumer_proxy(proxy_consumer); + general_impl->rt_info(general_rt_info); this->suppliers_.push_back(general_impl); } //add_dummy_supplier diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h index caac54bd51c..56bccc56135 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Kokyu_EC.h @@ -49,7 +49,7 @@ public: virtual RtEventChannelAdmin::handle_t register_supplier ( const char * entry_point, RtecEventComm::EventSourceID source, - RtecEventComm::EventType type, + EventType_Vector& supp_types, //RtecEventComm::EventType type, RtecEventComm::PushSupplier_ptr supplier, RtecEventChannelAdmin::ProxyPushConsumer_out proxy_consumer ACE_ENV_ARG_DECL @@ -93,7 +93,7 @@ public: void add_supplier_with_timeout( Supplier * supplier_impl, const char * supp_entry_point, - RtecEventComm::EventType supp_type, + EventType_Vector& supp_types, //RtecEventComm::EventType supp_type, Timeout_Consumer * timeout_consumer_impl, const char * timeout_entry_point, ACE_Time_Value period, @@ -129,7 +129,7 @@ public: void add_supplier( Supplier * supplier_impl, const char * entry_point, - RtecEventComm::EventType type + EventType_Vector& supp_types //RtecEventComm::EventType type ACE_ENV_ARG_DECL ) ACE_THROW_SPEC (( @@ -149,7 +149,7 @@ public: RtecScheduler::Importance_t cons_imp, Supplier * supplier_impl, const char * supp_entry_point, - RtecEventComm::EventType supp_type + EventType_Vector& supp_types //RtecEventComm::EventType supp_type ACE_ENV_ARG_DECL ) ACE_THROW_SPEC (( @@ -176,7 +176,7 @@ public: , RtecScheduler::SYNCHRONIZATION_FAILURE )); - void add_dummy_supplier(EventType_Vector& types + void add_dummy_supplier(EventType_Vector& supp_types ACE_ENV_ARG_DECL ) ACE_THROW_SPEC (( diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp index d182e1e9062..fe688bc7ed6 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.cpp @@ -17,9 +17,9 @@ ACE_RCSID(EC_Examples, Supplier, "$Id$") -Supplier::Supplier (RtecEventComm::EventSourceID id, - RtecEventComm::EventType norm_type, - RtecEventComm::EventType ft_type, +Supplier::Supplier (SourceID id, + EventType norm_type, + EventType ft_type, Service_Handler *handler) : id_ (id) , norm_type_(norm_type) @@ -34,27 +34,18 @@ Supplier::~Supplier (void) } void -Supplier::set_consumer_proxy(PushConsumer_Vector consumer_proxies) +Supplier::set_consumer_proxy(ConsumerProxy proxy) { - this->consumer_proxy_.clear(); - - for(PushConsumer_Vector::Iterator iter(consumer_proxies); - !iter.done(); iter.advance()) - { - PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported! - iter.next(proxy); - - this->consumer_proxy_.push_back(*proxy); - } + this->consumer_proxy_ = proxy; } void -Supplier::rt_info(RT_Info_Vector& supplier_rt_info) +Supplier::rt_info(InfoHandle supplier_rt_info) { this->rt_info_ = supplier_rt_info; } -Supplier::RT_Info_Vector& +Supplier::InfoHandle Supplier::rt_info(void) { return this->rt_info_; @@ -103,14 +94,8 @@ Supplier::timeout_occured (ACE_ENV_SINGLE_ARG_DECL) ACE_DEBUG((LM_DEBUG,"Supplier (id %d) in thread %t ONE_WAY_CALL_START at %u\n",this->id_,ACE_OS::gettimeofday().msec())); DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_START, 0, sizeof(Object_ID), (char*)&oid); - for(PushConsumer_Vector::Iterator iter(this->consumer_proxy_); - !iter.done(); iter.advance()) - { - PushConsumer_Vector::TYPE *proxy; //would rather const to ensure we don't change it, but not supported! - iter.next(proxy); + this->consumer_proxy_->push (event ACE_ENV_ARG_PARAMETER); - (*proxy)->push (event ACE_ENV_ARG_PARAMETER); - } //DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, m_id, 0, NULL); ACE_DEBUG((LM_DEBUG,"Supplier (id %d) in thread %t ONE_WAY_CALL_DONE at %u\n",this->id_,ACE_OS::gettimeofday().msec())); DSTRM_EVENT (WORKER_GROUP_FAM, ONE_WAY_CALL_DONE, 0, sizeof(Object_ID), (char*)&oid); @@ -129,14 +114,14 @@ Supplier::disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) { } -RtecEventComm::EventSourceID +Supplier::SourceID Supplier::get_id(void) const { return this->id_; } void -Supplier::mode(Supplier::mode_t mode) +Supplier::mode(mode_t mode) { ACE_DEBUG((LM_DEBUG,"Supplier (%P|%t) changing mode from %d to %d\n",this->mode_,mode)); this->mode_ = mode; diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h index 44adcaa76b5..6186a4eb9b6 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier.h @@ -37,7 +37,7 @@ class Supplier : public POA_RtecEventComm::PushSupplier // // = DESCRIPTION // This class is a supplier of events. - // It simply register for two event typesone event type + // It simply registers for two event types. // The class is just a helper to simplify common tasks in EC // tests, such as subscribing for a range of events, disconnecting // from the EC, informing the driver of shutdown messages, etc. @@ -51,9 +51,12 @@ public: FAULT_TOLERANT }; - typedef ACE_Vector<RtecEventChannelAdmin::ProxyPushConsumer_var> PushConsumer_Vector; + typedef RtecEventChannelAdmin::ProxyPushConsumer_var ConsumerProxy; + typedef RtecEventComm::EventSourceID SourceID; + typedef RtecEventComm::EventType EventType; + typedef RtecScheduler::handle_t InfoHandle; - Supplier (RtecEventComm::EventSourceID id, RtecEventComm::EventType norm_type, RtecEventComm::EventType ft_type, + Supplier (SourceID id, EventType norm_type, EventType ft_type, Service_Handler * handler = 0); // Constructor @@ -67,14 +70,12 @@ public: virtual void timeout_occured (ACE_ENV_SINGLE_ARG_DECL); - void set_consumer_proxy(PushConsumer_Vector consumer_proxies); + void set_consumer_proxy(ConsumerProxy consumer_proxies); - typedef ACE_Vector<RtecScheduler::handle_t> RT_Info_Vector; + void rt_info(InfoHandle supplier_rt_info); + InfoHandle rt_info(void); - void rt_info(RT_Info_Vector& supplier_rt_info); - RT_Info_Vector& rt_info(void); - - RtecEventComm::EventSourceID get_id(void) const; + SourceID get_id(void) const; void mode(mode_t mode); mode_t mode(void) const; @@ -82,15 +83,15 @@ public: Service_Handler * handler(void) const; protected: - RtecEventComm::EventSourceID id_; - RtecEventComm::EventType norm_type_; - RtecEventComm::EventType ft_type_; + SourceID id_; + EventType norm_type_; + EventType ft_type_; - PushConsumer_Vector consumer_proxy_; + ConsumerProxy consumer_proxy_; mode_t mode_; - RT_Info_Vector rt_info_; + InfoHandle rt_info_; Service_Handler *handler_; }; //class Supplier diff --git a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp index c8d40714f21..8dfee69b2a3 100644 --- a/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp +++ b/TAO/orbsvcs/examples/RtEC/Federated_Kokyu/dynamic_topology_test/Supplier_EC.cpp @@ -14,6 +14,7 @@ #include "orbsvcs/Event/EC_Kokyu_Factory.h" #include "orbsvcs/Time_Utilities.h" #include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Scheduler_Factory.h" #include "tao/ORB_Core.h" #include "Kokyu_EC.h" @@ -31,7 +32,8 @@ namespace { int config_run = 0; - ACE_CString sched_type ="rms"; + ACE_CString sched_type = "rms"; + ACE_CString ior_output_filename; FILE * ior_output_file; typedef ACE_Vector<const char*> Filename_Array; @@ -223,7 +225,7 @@ public: Mode_Handler *mode_handler; ACE_NEW(mode_handler, - Mode_Handler(100)); //mode switch after first event + Mode_Handler(5)); //mode switch after first event Supplier *supplier_impl1_1; Timeout_Consumer *timeout_consumer_impl1_1; ACE_NEW(supplier_impl1_1, @@ -235,7 +237,7 @@ public: ACE_Time_Value tv(1,200000); //period DEBUG: set to much longer period add_supplier_with_timeout(supplier_impl1_1, "supplier1_1", - supp1_1_types[0], + supp1_1_types, timeout_consumer_impl1_1, "supplier1_1_timeout_consumer", tv, @@ -282,7 +284,7 @@ public: RtecScheduler::VERY_LOW_IMPORTANCE, supplier_impl1_2, "supplier1_2", - supp1_2_types[0] + supp1_2_types ACE_ENV_ARG_PARAMETER ); ACE_CHECK; @@ -297,6 +299,44 @@ public: add_dummy_supplier(types); ACE_CHECK; + //DEBUG: print out schedule + RtecScheduler::Scheduler_ptr scheduler = this->scheduler(ACE_ENV_SINGLE_ARG_PARAMETER); + //RtecEventChannelAdmin::EventChannel_ptr event_channel = this->event_channel(ACE_ENV_SINGLE_ARG_DECL); + + RtecScheduler::RT_Info_Set_var infos; + RtecScheduler::Config_Info_Set_var configs; + RtecScheduler::Dependency_Set_var dependencies; + RtecScheduler::Scheduling_Anomaly_Set unsafe_anomalies; + RtecScheduler::Scheduling_Anomaly_Set_var anomalies; + + int min_os_priority = + ACE_Sched_Params::priority_min (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + int max_os_priority = + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO, + ACE_SCOPE_THREAD); + + scheduler->compute_scheduling (min_os_priority, + max_os_priority, + infos.out (), + dependencies.out (), + configs.out (), + anomalies.out () + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + std::stringstream sched_out; + ACE_CString ior_prefix(ior_output_filename.c_str(),ior_output_filename.length()-4); //cut off '.ior' + sched_out << "schedule_" << ior_prefix.c_str() << ".out"; + + ACE_DEBUG((LM_DEBUG,"Supplier_EC writing schedule to %s\n",sched_out.str().c_str())); + + ACE_Scheduler_Factory::dump_schedule (infos.in (), + dependencies.in (), + configs.in (), + anomalies.in (), + sched_out.str().c_str()); + ////END DEBUG Kokyu_EC::start(ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK; } //start() @@ -444,7 +484,8 @@ int parse_args (int argc, char *argv[]) switch (c) { case 'o': - ior_output_file = ACE_OS::fopen (get_opts.opt_arg (), "w"); + ior_output_filename = get_opts.opt_arg(); + ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w"); if (ior_output_file == 0) { ACE_ERROR_RETURN ((LM_ERROR, @@ -477,7 +518,8 @@ int parse_args (int argc, char *argv[]) } if (ior_output_file == 0) { - ior_output_file = ACE_OS::fopen ("supplier_ec.ior", "w"); + ior_output_filename = "supplier_ec.ior"; + ior_output_file = ACE_OS::fopen (ior_output_filename.c_str(), "w"); } if (ior_input_files.size() == 0) { |