summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/EC_Config/ECConfig.cpp')
-rw-r--r--TAO/orbsvcs/tests/EC_Config/ECConfig.cpp501
1 files changed, 501 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp b/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp
new file mode 100644
index 00000000000..9a0b356ebe9
--- /dev/null
+++ b/TAO/orbsvcs/tests/EC_Config/ECConfig.cpp
@@ -0,0 +1,501 @@
+// $Id$
+
+#ifndef ECCONFIG_C
+#define ECCONFIG_C
+
+#include <sstream> //for ostringstream
+
+#include "ace/Array.h"
+#include "ace/Bound_Ptr.h"
+#include "orbsvcs/Scheduler_Factory.h"
+#include "orbsvcs/Event_Utilities.h"
+#include "orbsvcs/Event_Service_Constants.h"
+#include "orbsvcs/Event/EC_Event_Channel.h"
+#include "orbsvcs/Event/EC_Kokyu_Factory.h"
+
+#include "ECConfig.h"
+
+namespace TestConfig {
+
+template <class SCHED_STRAT>
+ECConfig<SCHED_STRAT>::ECConfig (void)
+ : Test_Config (),
+ configured (false)
+{
+}
+
+template <class SCHED_STRAT>
+ECConfig<SCHED_STRAT>::~ECConfig (void)
+{
+ this->reset();
+}
+
+template <class SCHED_STRAT> void
+ECConfig<SCHED_STRAT>::reset (void)
+{
+ // We should do a lot of cleanup (disconnect from the EC,
+ // deactivate all the objects with the POA, etc.).
+
+ delete this->ec_impl;
+
+ delete this->sched_impl;
+
+ for(size_t i=0; i<consumers.size(); ++i) {
+ delete this->consumers[i];
+ }
+
+ for(size_t i=0; i<suppliers.size(); ++i) {
+ delete this->suppliers[i];
+ }
+
+ configured = false;
+}
+
+template <class SCHED_STRAT> int
+ECConfig<SCHED_STRAT>::configure (TCFG_SET_WPTR testconfigs)
+{
+ if (configured) {
+ this->reset(); //delete memory used by previous configuration
+ }
+
+ ACE_DECLARE_NEW_CORBA_ENV;
+ ACE_TRY
+ {
+ this->initEC();
+
+ ////////////////// EC ready; do config ////////////////////
+ size_t tsize = testconfigs->size();
+ supplier_cfgs.size(tsize);
+ consumer_cfgs.size(tsize);
+ testcfgs.size(tsize);
+ consumers.size(tsize);
+ suppliers.size(tsize);
+ for (size_t i=0; i<tsize; ++i)
+ {
+ //ACE_Weak_Bound_Ptr doesn't have operator*()! ??
+ //test_config_t *curcfg = (*testconfigs)[i];
+ test_config_t *curcfg = (*testconfigs.unsafe_get())[i];
+ testcfgs[i] = curcfg;
+
+ RtecScheduler::Criticality_t criticality;
+ switch (curcfg->criticality) {
+ case VERY_LOW_CRITICALITY :
+ criticality = RtecScheduler::VERY_LOW_CRITICALITY;
+ break;
+ case LOW_CRITICALITY :
+ criticality = RtecScheduler::LOW_CRITICALITY;
+ break;
+ case MEDIUM_CRITICALITY :
+ criticality = RtecScheduler::MEDIUM_CRITICALITY;
+ break;
+ case HIGH_CRITICALITY :
+ criticality = RtecScheduler::HIGH_CRITICALITY;
+ break;
+ case VERY_HIGH_CRITICALITY :
+ criticality = RtecScheduler::VERY_HIGH_CRITICALITY;
+ break;
+ }
+
+ RtecScheduler::Importance_t importance;
+ switch (curcfg->importance) {
+ case VERY_LOW_IMPORTANCE :
+ importance = RtecScheduler::VERY_LOW_IMPORTANCE;
+ break;
+ case LOW_IMPORTANCE :
+ importance = RtecScheduler::LOW_IMPORTANCE;
+ break;
+ case MEDIUM_IMPORTANCE :
+ importance = RtecScheduler::MEDIUM_IMPORTANCE;
+ break;
+ case HIGH_IMPORTANCE :
+ importance = RtecScheduler::HIGH_IMPORTANCE;
+ break;
+ case VERY_HIGH_IMPORTANCE :
+ importance = RtecScheduler::VERY_HIGH_IMPORTANCE;
+ break;
+ }
+
+ //create supplier RT_Info
+ std::ostringstream supp_entry_pt;
+ supp_entry_pt << "Supplier Event Class " << i; //unique RT_Info entry point
+ RtecScheduler::handle_t rt_info =
+ this->scheduler->create (supp_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER);
+ ACE_Time_Value tv (0, curcfg->period);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ this->scheduler->set (rt_info,
+ criticality,
+ time, time, time,
+ time,
+ importance,
+ time,
+ 0,
+ RtecScheduler::OPERATION
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (this->supplier_cfgs.set(rt_info,i) != 0) {
+ ACE_DEBUG((LM_DEBUG, "Could not set supplier RT_Info into config: %d of %d\n",
+ i,consumer_cfgs.max_size()));
+ return 1;
+ }
+
+ //create consumer RT_Info
+ std::ostringstream cons_entry_pt;
+ cons_entry_pt << "Consumer Event Class " << i; //unique RT_Info entry point
+ rt_info =
+ this->scheduler->create (cons_entry_pt.str().c_str() ACE_ENV_ARG_PARAMETER);
+ tv.set (0, curcfg->period);
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ this->scheduler->set (rt_info,
+ criticality,
+ time, time, time,
+ time,
+ importance,
+ time,
+ 0,
+ RtecScheduler::OPERATION
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ if (this->consumer_cfgs.set(rt_info,i) != 0) {
+ ACE_DEBUG((LM_DEBUG, "Could not set consumer RT_Info into config: %d of %d\n",
+ i,consumer_cfgs.max_size()));
+ return 1;
+ }
+ }
+
+ this->connectConsumers();
+ this->connectSuppliers();
+
+ ACE_DEBUG ((LM_DEBUG, "Consumer RT_Infos:\n"));
+ print_RT_Infos (this->consumer_cfgs);
+ ACE_DEBUG ((LM_DEBUG, "\nSupplier RT_Infos:\n"));
+ print_RT_Infos (this->supplier_cfgs);
+
+ ////////////////// Configured; compute schedule ///////////
+ ACE_DEBUG ((LM_DEBUG, "Computing schedule\n"));
+ RtecScheduler::RT_Info_Set_var infos;
+ RtecScheduler::Config_Info_Set_var configs;
+ RtecScheduler::Scheduling_Anomaly_Set_var anomalies;
+
+ // Obtain the range of valid priorities in the current
+ // platform, the scheduler hard-code this values in the
+ // generated file, but in the future we may just use the
+ // "logical" priorities and define the mapping to OS
+ // priorities at run-time.
+ int min_os_priority =
+ ACE_Sched_Params::priority_min (ACE_SCHED_FIFO,
+ ACE_SCOPE_THREAD);
+ int max_os_priority =
+ ACE_Sched_Params::priority_max (ACE_SCHED_FIFO,
+ ACE_SCOPE_THREAD);
+ this->scheduler->compute_scheduling (min_os_priority,
+ max_os_priority,
+ infos.out (),
+ configs.out (),
+ anomalies.out ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Dump the schedule to a file..
+ ACE_Scheduler_Factory::dump_schedule (infos.in (),
+ configs.in (),
+ anomalies.in (),
+ "ecconfig.out");
+
+ ///////////// Activate the EC /////////////////
+ ACE_DEBUG ((LM_DEBUG, "activating EC\n"));
+ this->ec_impl->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ ACE_DEBUG ((LM_DEBUG, "EC activated\n"));
+
+ configured = true;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig");
+ return 1;
+ }
+ ACE_ENDTRY;
+ return 0; //successful config
+}
+
+template <class SCHED_STRAT> int
+ECConfig<SCHED_STRAT>::run (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "Running ECConfig\n"));
+ if (!this->configured) {
+ ACE_DEBUG ((LM_DEBUG, "Tried to run before configured\n"));
+ return 1;
+ }
+
+ ACE_TRY
+ {
+ RtecEventComm::EventSet event (1);
+ event.length (1);
+
+ ACE_Array<int> evt_counts(this->testcfgs.size());
+ for(size_t i=0; i<this->testcfgs.size(); ++i)
+ {
+ //copy over total number of events per test_config_t to send
+ evt_counts[i] = this->testcfgs[i]->num_entities;
+ }
+
+ size_t num_done = 0; //total number of testcfgs which have no more events to push
+ while (num_done<this->testcfgs.size())
+ {
+ //for each consumer, push an event
+ for(size_t i=0; i<this->testcfgs.size() && i<this->consumer_proxys.size(); ++i)
+ {
+ if (evt_counts[i]<=0)
+ {
+ if (evt_counts[i]==0)
+ {
+ //just finished
+ ++num_done;
+ evt_counts[i]--; //indicate accounted for in num_done
+ } //else already incr num_done for this one
+ continue; //no more events of this to push
+ } //else...
+ test_config_t *tcfg = this->testcfgs[i];
+ ProxyList::TYPE curproxy = this->consumer_proxys[i];
+
+ event[0].header.type = ACE_ES_EVENT_UNDEFINED+tcfg->type;
+ event[0].header.source = supplier_ids[i];
+ event[0].header.ttl = 1;
+
+ curproxy->push (event ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ //event pushed, so decr corresponding evt_count
+ evt_counts[i]--;
+
+ // BT TODO sleep until next period expires
+ ACE_Time_Value rate (0, 10000);
+ ACE_OS::sleep (rate);
+
+ }
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0; //successful run
+}
+
+template <class SCHED_STRAT> int
+ECConfig<SCHED_STRAT>::initEC()
+{
+ TAO_EC_Kokyu_Factory::init_svcs ();
+
+ ACE_DEBUG ((LM_DEBUG, "Initializing event channel\n"));
+ ACE_TRY
+ {
+ // ORB initialization boiler plate...
+ int argc = 0;
+ char** argv = 0;
+ this->orb =
+ CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ CORBA::Object_var object =
+ orb->resolve_initial_references ("RootPOA" ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ this->poa =
+ PortableServer::POA::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ this->poa_manager =
+ poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ // DO these need to remain in scope beyond this function?
+
+ // Create a scheduling service
+ ACE_NEW_RETURN (this->sched_impl,SCHED_STRAT,1);
+
+ this->scheduler = sched_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ // Create an event channel implementation...
+ TAO_EC_Event_Channel_Attributes attributes (poa.in (),
+ poa.in ());
+ attributes.scheduler = scheduler.in (); // no need to dup
+
+ ACE_NEW_RETURN (this->ec_impl,TAO_EC_Event_Channel (attributes),1);
+
+ this->event_channel =
+ this->ec_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
+template <class SCHED_STRAT> int
+ECConfig<SCHED_STRAT>::connectConsumers()
+{
+ ACE_TRY
+ {
+ // The canonical protocol to connect to the EC
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
+ this->event_channel->for_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "connecting consumers\n"));
+ for (size_t i=0; i<this->consumer_cfgs.size() && i<this->testcfgs.size(); ++i)
+ {
+ ACE_NEW_RETURN(this->consumers[i],Consumer(i),1);
+
+ RtecScheduler::handle_t hndl = this->consumer_cfgs[i];
+ test_config_t *tcfg = this->testcfgs[i];
+
+ ACE_ConsumerQOS_Factory consumer_qos;
+ //consumer_qos.start_disjunction_group ();
+ // The types in the range [0,ACE_ES_EVENT_UNDEFINED) are
+ // reserved for the EC...
+ consumer_qos.insert_type (ACE_ES_EVENT_UNDEFINED+tcfg->type,
+ hndl);
+
+ RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy =
+ consumer_admin->obtain_push_supplier (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ RtecEventComm::PushConsumer_var consumerv =
+ consumers[i]->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ supplier_proxy->connect_push_consumer (consumerv.in (),
+ consumer_qos.get_ConsumerQOS ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0; //successful run
+}
+
+template <class SCHED_STRAT> int
+ECConfig<SCHED_STRAT>::connectSuppliers()
+{
+ ACE_TRY
+ {
+ // The canonical protocol to connect to the EC
+ RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
+ event_channel->for_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "connecting suppliers\n"));
+ supplier_ids.size(supplier_cfgs.size());
+ consumer_proxys.size(supplier_cfgs.size());
+ for (size_t i=0; i<supplier_cfgs.size() && i<this->testcfgs.size(); ++i)
+ {
+ ACE_NEW_RETURN(this->suppliers[i],Supplier(),1);
+
+ RtecScheduler::handle_t hndl = this->supplier_cfgs[i];
+ test_config_t *tcfg = this->testcfgs[i];
+
+ RtecEventComm::EventSourceID supplier_id = i;
+ this->supplier_ids[i] = supplier_id;
+
+ ACE_SupplierQOS_Factory supplier_qos;
+ supplier_qos.insert (supplier_id,
+ ACE_ES_EVENT_UNDEFINED+tcfg->type,
+ hndl,
+ 1); // number of calls, but what does that mean?
+
+ consumer_proxys[i] =
+ supplier_admin->obtain_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ RtecEventComm::PushSupplier_var supplier =
+ suppliers[i]->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ //PROBLEM: Occasional segfault here:
+ consumer_proxys[i]->connect_push_supplier (supplier.in (),
+ supplier_qos.get_SupplierQOS ()
+ ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "suppliers connected\n"));
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig");
+ return 1;
+ }
+ ACE_ENDTRY;
+
+ return 0;
+}
+
+template <class SCHED_STRAT> void
+ECConfig<SCHED_STRAT>::print_RT_Infos (ACE_Array<RtecScheduler::handle_t> cfg_set)
+{
+ char *rt_info_format = "{%20s, %10d, %10d, %10d, "
+ "%10d, %10d, "
+ "(RtecScheduler::Criticality_t) %d, "
+ "(RtecScheduler::Importance_t) %d, "
+ "%10d, %10d, %10d, %10d, %10d, "
+ "(RtecScheduler::Info_Type_t) %d }";
+ ACE_TRY
+ {
+ for (size_t i=0; i<cfg_set.size(); ++i) {
+ RtecScheduler::handle_t hndl = cfg_set[i];
+ RtecScheduler::RT_Info info = *(this->scheduler->get(hndl));
+ ACE_TRY_CHECK;
+
+ if (i!=0)
+ {
+ //finish previous line
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+ }
+ ACE_DEBUG ((LM_DEBUG, rt_info_format,
+ (const char *) info.entry_point,
+ info.handle,
+ ACE_CU64_TO_CU32 (info.worst_case_execution_time),
+ ACE_CU64_TO_CU32 (info.typical_execution_time),
+ ACE_CU64_TO_CU32 (info.cached_execution_time),
+ info.period,
+ info.criticality,
+ info.importance,
+ ACE_CU64_TO_CU32 (info.quantum),
+ info.threads,
+ info.priority,
+ info.preemption_subpriority,
+ info.preemption_priority,
+ info.info_type));
+ }
+ //finish last line
+ ACE_DEBUG ((LM_DEBUG, "\n"));
+ }
+ ACE_CATCHANY
+ {
+ ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "ECConfig");
+ }
+ ACE_ENDTRY;
+
+}
+
+} /* namespace TestConfig */
+
+#endif /* ECCONFIG_C */