diff options
Diffstat (limited to 'TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp | 296 |
1 files changed, 238 insertions, 58 deletions
diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp index cd524449573..482856f1cc3 100644 --- a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp +++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp @@ -7,11 +7,13 @@ #include "orbsvcs/Event_Utilities.h" #include "orbsvcs/Event_Service_Constants.h" +#include "orbsvcs/Scheduler_Factory.h" #include "orbsvcs/Time_Utilities.h" - -#include "orbsvcs/Event/EC_Event_Channel.h" -#include "orbsvcs/Event/EC_Default_Factory.h" - +#include "orbsvcs/RtecEventChannelAdminC.h" +#include "orbsvcs/Sched/Config_Scheduler.h" +#include "orbsvcs/Runtime_Scheduler.h" +#include "orbsvcs/Event/Event_Channel.h" +#include "orbsvcs/Event/Module_Factory.h" #include "EC_Mcast.h" #if !defined (__ACE_INLINE__) @@ -118,17 +120,57 @@ ECM_Driver::run (int argc, char* argv[]) } } - TAO_EC_Event_Channel_Attributes attr (root_poa.in (), - root_poa.in ()); - TAO_EC_Event_Channel ec_impl (attr); +#if 0 + int min_priority = + ACE_Sched_Params::priority_min (ACE_SCHED_FIFO); + // Enable FIFO scheduling, e.g., RT scheduling class on Solaris. + + if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO, + min_priority, + ACE_SCOPE_PROCESS)) != 0) + { + if (ACE_OS::last_error () == EPERM) + ACE_DEBUG ((LM_DEBUG, + "%s: user is not superuser, " + "so remain in time-sharing class\n", argv[0])); + else + ACE_ERROR ((LM_ERROR, + "%s: ACE_OS::sched_params failed\n", argv[0])); + } + + if (ACE_OS::thr_setprio (min_priority) == -1) + { + ACE_DEBUG ((LM_DEBUG, "(%P|%t) main thr_setprio failed\n")); + } +#endif /* 0 */ + + ACE_Config_Scheduler scheduler_impl; + RtecScheduler::Scheduler_var scheduler = + scheduler_impl._this (TAO_TRY_ENV); + TAO_CHECK_ENV; + + CORBA::String_var str = + this->orb_->object_to_string (scheduler.in (), TAO_TRY_ENV); + TAO_CHECK_ENV; + ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n", + str.in ())); + + if (ACE_Scheduler_Factory::server (scheduler.in ()) == -1) + return -1; + + // Create the EventService implementation, but don't start its + // internal threads. + TAO_Reactive_Module_Factory module_factory; + ACE_EventChannel ec_impl (0, + ACE_DEFAULT_EVENT_CHANNEL_TYPE, + &module_factory); // Register Event_Service with the Naming Service. RtecEventChannelAdmin::EventChannel_var ec = ec_impl._this (TAO_TRY_ENV); TAO_CHECK_ENV; - CORBA::String_var str = - this->orb_->object_to_string (ec.in (), TAO_TRY_ENV); + str = this->orb_->object_to_string (ec.in (), TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "The (local) EC IOR is <%s>\n", str.in ())); @@ -136,35 +178,45 @@ ECM_Driver::run (int argc, char* argv[]) poa_manager->activate (TAO_TRY_ENV); TAO_CHECK_ENV; - ec_impl.activate (TAO_TRY_ENV); + RtecEventChannelAdmin::EventChannel_var local_ec = + ec_impl._this (TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "EC_Mcast: local EC objref ready\n")); - this->open_federations (ec.in (), + this->open_federations (local_ec.in (), + scheduler.in (), TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "EC_Mcast: open_federations done\n")); - this->open_senders (ec.in (), + this->open_senders (local_ec.in (), + scheduler.in (), TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "EC_Mcast: open_senders done\n")); - this->open_receivers (ec.in (), + this->open_receivers (local_ec.in (), + scheduler.in (), TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "EC_Mcast: open_receivers done\n")); - this->activate_federations (ec.in (), + this->activate_federations (local_ec.in (), + scheduler.in (), TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "EC_Mcast: activate_federations done\n")); + ACE_DEBUG ((LM_DEBUG, "EC_Mcast: activate the EC\n")); + + // Create the EC internal threads + ec_impl.activate (); + ACE_DEBUG ((LM_DEBUG, "EC_Mcast: running the test\n")); if (this->orb_->run () == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); @@ -180,9 +232,14 @@ ECM_Driver::run (int argc, char* argv[]) TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "EC_Mcast: shutdown the EC\n")); + ec_impl.shutdown (); + + ACE_DEBUG ((LM_DEBUG, "EC_Mcast: shutdown grace period\n")); + + ACE_Time_Value tv (5, 0); + if (this->orb_->run (&tv) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); - ec_impl.shutdown (TAO_TRY_ENV); - TAO_CHECK_ENV; } TAO_CATCH (CORBA::SystemException, sys_ex) { @@ -209,28 +266,28 @@ ECM_Driver::federation_has_shutdown (ECM_Local_Federation *federation, void ECM_Driver::open_federations (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment &TAO_IN_ENV) { for (int i = 0; i < this->local_federations_count_; ++i) { this->local_federations_[i]->open (this->event_count_, - ec, TAO_IN_ENV); + this->event_period_, + ec, scheduler, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); } } void ECM_Driver::activate_federations (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment &TAO_IN_ENV) { this->federations_running_ = this->local_federations_count_; - RtecEventComm::Time interval = this->event_period_; - interval *= 10; for (int i = 0; i < this->local_federations_count_; ++i) { - this->local_federations_[i]->activate (ec, - interval, - TAO_IN_ENV); + this->local_federations_[i]->activate (this->event_period_, + ec, scheduler, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); } } @@ -247,12 +304,13 @@ ECM_Driver::close_federations (CORBA::Environment &TAO_IN_ENV) void ECM_Driver::open_senders (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment &TAO_IN_ENV) { if (this->endpoint_.dgram ().open (ACE_Addr::sap_any) == -1) { // @@ TODO throw an application specific exception. - TAO_IN_ENV.exception (new CORBA::COMM_FAILURE ()); + TAO_IN_ENV.exception (new CORBA::COMM_FAILURE (CORBA::COMPLETED_NO)); return; } ACE_INET_Addr ignore_from; @@ -265,6 +323,7 @@ ECM_Driver::open_senders (RtecEventChannelAdmin::EventChannel_ptr ec, { this->all_federations_[i]->open (&this->endpoint_, ec, + scheduler, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); } @@ -283,11 +342,13 @@ ECM_Driver::close_senders (CORBA::Environment &TAO_IN_ENV) void ECM_Driver::open_receivers (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment &TAO_IN_ENV) { for (int i = 0; i < this->local_federations_count_; ++i) { this->local_federations_[i]->open_receiver (ec, + scheduler, &this->endpoint_, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); @@ -575,13 +636,20 @@ ECM_Federation::ECM_Federation (char* name, void ECM_Federation::open (TAO_ECG_UDP_Out_Endpoint *endpoint, RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment &TAO_IN_ENV) { + const int bufsize = 512; + char buf[bufsize]; + ACE_OS::strcpy (buf, this->name ()); + ACE_OS::strcat (buf, "/sender"); + RtecUDPAdmin::AddrServer_var addr_server = this->addr_server (TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); - this->sender_.init (ec, + this->sender_.init (ec, scheduler, + buf, addr_server.in (), endpoint, TAO_IN_ENV); @@ -590,17 +658,31 @@ ECM_Federation::open (TAO_ECG_UDP_Out_Endpoint *endpoint, // @@ TODO Make this a parameter.... this->sender_.mtu (64); + RtecScheduler::handle_t rt_info = + scheduler->create (buf, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV); + // The worst case execution time is far less than 2 // milliseconds, but that is a safe estimate.... ACE_Time_Value tv (0, 2000); TimeBase::TimeT time; ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION, + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); ACE_ConsumerQOS_Factory qos; qos.start_disjunction_group (); for (int i = 0; i < this->consumer_types (); ++i) { - qos.insert_type (this->consumer_ipaddr (i), 0); + qos.insert_type (this->consumer_ipaddr (i), rt_info); } RtecEventChannelAdmin::ConsumerQOS qos_copy = qos.get_ConsumerQOS (); this->sender_.open (qos_copy, TAO_IN_ENV); @@ -632,9 +714,34 @@ ECM_Supplier::ECM_Supplier (ECM_Local_Federation* federation) void ECM_Supplier::open (const char* name, + RtecScheduler::Period_t period, RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment &TAO_IN_ENV) { + RtecScheduler::handle_t rt_info = + scheduler->create (name, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + + // The execution times are set to reasonable values, but + // actually they are changed on the real execution, i.e. we + // lie to the scheduler to obtain right priorities; but we + // don't care if the set is schedulable. + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + + scheduler->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + period, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::OPERATION, + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + this->supplier_id_ = ACE::crc32 (name); ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name, this->supplier_id_)); @@ -644,11 +751,11 @@ ECM_Supplier::open (const char* name, { qos.insert (this->supplier_id_, this->federation_->supplier_ipaddr (i), - 0, 1); + rt_info, 1); } qos.insert (this->supplier_id_, ACE_ES_EVENT_SHUTDOWN, - 0, 1); + rt_info, 1); RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = ec->for_suppliers (TAO_IN_ENV); @@ -680,15 +787,50 @@ ECM_Supplier::close (CORBA::Environment &TAO_IN_ENV) } void -ECM_Supplier::activate (RtecEventChannelAdmin::EventChannel_ptr ec, - RtecEventComm::Time interval, +ECM_Supplier::activate (const char* name, + RtecScheduler::Period_t period, + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment &TAO_IN_ENV) { + const int bufsize = 512; + char buf[bufsize]; + ACE_OS::strcpy (buf, "consumer_"); + ACE_OS::strcat (buf, name); + RtecScheduler::handle_t rt_info = + scheduler->create (buf, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + + // The execution times are set to reasonable values, but + // actually they are changed on the real execution, i.e. we + // lie to the scheduler to obtain right priorities; but we + // don't care if the set is schedulable. + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + period, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::OPERATION, + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); + + // Also connect our consumer for timeout events from the EC. + int interval = period / 10; + ACE_Time_Value tv_timeout (interval / ACE_ONE_SECOND_IN_USECS, + interval % ACE_ONE_SECOND_IN_USECS); + TimeBase::TimeT timeout; + ORBSVCS_Time::Time_Value_to_TimeT (timeout, tv_timeout); + ACE_ConsumerQOS_Factory consumer_qos; consumer_qos.start_disjunction_group (); consumer_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT, - interval, - 0); + timeout, + rt_info); // = Connect as a consumer. RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = @@ -754,16 +896,31 @@ ECM_Consumer::ECM_Consumer (ECM_Local_Federation *federation) } void -ECM_Consumer::open (const char*, +ECM_Consumer::open (const char* name, RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, ACE_RANDR_TYPE &seed, CORBA::Environment& TAO_IN_ENV) { + this->rt_info_ = + scheduler->create (name, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV); + // The worst case execution time is far less than 2 // milliseconds, but that is a safe estimate.... ACE_Time_Value tv (0, 2000); TimeBase::TimeT time; ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (this->rt_info_, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION, + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); // = Connect as a consumer. this->consumer_admin_ = ec->for_consumers (TAO_IN_ENV); @@ -785,12 +942,12 @@ ECM_Consumer::connect (ACE_RANDR_TYPE &seed, ACE_ConsumerQOS_Factory qos; qos.start_disjunction_group (); - qos.insert_type (ACE_ES_EVENT_SHUTDOWN, 0); + qos.insert_type (ACE_ES_EVENT_SHUTDOWN, + this->rt_info_); const ECM_Federation* federation = this->federation_->federation (); for (int i = 0; i < federation->consumer_types (); ++i) { - unsigned int x = ACE_OS::rand_r (seed); - if (x < RAND_MAX / 2) + if (ACE_OS::rand_r (seed) < RAND_MAX / 2) { ACE_DEBUG ((LM_DEBUG, "Federation %s leaves group %s\n", @@ -804,7 +961,8 @@ ECM_Consumer::connect (ACE_RANDR_TYPE &seed, federation->name (), federation->consumer_name (i))); this->federation_->subscribed_bit (i, 1); - qos.insert_type (federation->consumer_ipaddr (i), 0); + qos.insert_type (federation->consumer_ipaddr (i), + this->rt_info_); } RtecEventComm::PushConsumer_var objref = this->_this (TAO_IN_ENV); @@ -881,7 +1039,9 @@ ECM_Local_Federation::~ECM_Local_Federation (void) void ECM_Local_Federation::open (int event_count, + RtecScheduler::Period_t period, RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment& TAO_IN_ENV) { this->event_count_ = event_count; @@ -891,12 +1051,12 @@ ECM_Local_Federation::open (int event_count, ACE_OS::strcpy (buf, this->federation_->name ()); ACE_OS::strcat (buf, "/supplier"); - this->supplier_.open (buf, ec, TAO_IN_ENV); + this->supplier_.open (buf, period, ec, scheduler, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); ACE_OS::strcpy (buf, this->federation_->name ()); ACE_OS::strcat (buf, "/consumer"); - this->consumer_.open (buf, ec, this->seed_, TAO_IN_ENV); + this->consumer_.open (buf, ec, scheduler, this->seed_, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); this->last_subscription_change_ = ACE_OS::gettimeofday (); @@ -913,11 +1073,14 @@ ECM_Local_Federation::close (CORBA::Environment &TAO_IN_ENV) } void -ECM_Local_Federation::activate (RtecEventChannelAdmin::EventChannel_ptr ec, - RtecEventComm::Time interval, +ECM_Local_Federation::activate (RtecScheduler::Period_t period, + RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, CORBA::Environment& TAO_IN_ENV) { - this->supplier_.activate (ec, interval, TAO_IN_ENV); + this->supplier_.activate (this->federation_->name (), + period, + ec, scheduler, TAO_IN_ENV); } void @@ -933,8 +1096,8 @@ ECM_Local_Federation::supplier_timeout (RtecEventComm::PushConsumer_ptr consumer ACE_hrtime_t t = ACE_OS::gethrtime (); ORBSVCS_Time::hrtime_to_TimeT (s.header.creation_time, t); - s.header.ec_recv_time = ORBSVCS_Time::zero (); - s.header.ec_send_time = ORBSVCS_Time::zero (); + s.header.ec_recv_time = ORBSVCS_Time::zero; + s.header.ec_send_time = ORBSVCS_Time::zero; s.data.x = 0; s.data.y = 0; @@ -960,15 +1123,14 @@ ECM_Local_Federation::supplier_timeout (RtecEventComm::PushConsumer_ptr consumer ACE_Time_Value delta = ACE_OS::gettimeofday () - this->last_subscription_change_; - unsigned int x = ACE_OS::rand_r (this->seed_); - double p = double (x) / RAND_MAX; + double p = double (ACE_OS::rand_r (this->seed_)) / RAND_MAX; double maxp = double (delta.msec ()) / this->subscription_change_period_; if (4 * p < maxp) { ACE_DEBUG ((LM_DEBUG, - "Reconfiguring federation %s: %f %f [%d]\n", - this->name (), p, maxp, x)); + "Reconfiguring federation %s: %f %f\n", + this->name (), p, maxp)); this->consumer_.disconnect (TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); this->consumer_.connect (this->seed_, TAO_IN_ENV); @@ -1013,9 +1175,15 @@ ECM_Local_Federation::consumer_push (ACE_hrtime_t, void ECM_Local_Federation::open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec, + RtecScheduler::Scheduler_ptr scheduler, TAO_ECG_UDP_Out_Endpoint* ignore_from, CORBA::Environment &TAO_IN_ENV) { + const int bufsize = 512; + char buf[bufsize]; + ACE_OS::strcpy (buf, this->name ()); + ACE_OS::strcat (buf, "/receiver"); + RtecUDPAdmin::AddrServer_var addr_server = this->federation_->addr_server (TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); @@ -1025,19 +1193,33 @@ ECM_Local_Federation::open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec, // @@ This should be parameters... ACE_Time_Value expire_interval (1, 0); const int max_timeouts = 5; - this->receiver_.init (ec, + this->receiver_.init (ec, scheduler, + buf, ignore_from, addr_server.in (), - reactor, - expire_interval, - max_timeouts, + reactor, expire_interval, max_timeouts, TAO_IN_ENV); TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); - const int bufsize = 512; - char buf[bufsize]; - ACE_OS::strcpy (buf, this->name ()); - ACE_OS::strcat (buf, "/receiver"); + RtecScheduler::handle_t rt_info = + scheduler->create (buf, TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID(TAO_IN_ENV); + + // The worst case execution time is far less than 2 + // milliseconds, but that is a safe estimate.... + ACE_Time_Value tv (0, 2000); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + scheduler->set (rt_info, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::OPERATION, + TAO_IN_ENV); + TAO_CHECK_ENV_RETURN_VOID (TAO_IN_ENV); RtecEventComm::EventSourceID source = ACE::crc32 (buf); @@ -1051,7 +1233,7 @@ ECM_Local_Federation::open_receiver (RtecEventChannelAdmin::EventChannel_ptr ec, { qos.insert (source, this->consumer_ipaddr (i), - 0, 1); + rt_info, 1); } RtecEventChannelAdmin::SupplierQOS qos_copy = @@ -1122,8 +1304,6 @@ ECM_Local_Federation::subscribed_bit (int i) const int main (int argc, char *argv []) { - TAO_EC_Default_Factory::init_svcs (); - ECM_Driver driver; return driver.run (argc, argv); } |