diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-22 20:35:27 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-22 20:35:27 +0000 |
commit | 8156aae78b9586201f737cf1e6d9c6fd36f84df1 (patch) | |
tree | 6d556aab09d32276b10b668dc56ce4ebee3adbf4 | |
parent | f346071f703c01bc988bff7403284f162eb0d32e (diff) | |
download | ATCD-8156aae78b9586201f737cf1e6d9c6fd36f84df1.tar.gz |
ChangeLogTag:Tue Sep 22 15:33:48 1998 Carlos O'Ryan <coryan@cs.wustl.edu>
-rw-r--r-- | TAO/ChangeLog-98c | 14 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp | 218 | ||||
-rw-r--r-- | TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h | 36 |
3 files changed, 171 insertions, 97 deletions
diff --git a/TAO/ChangeLog-98c b/TAO/ChangeLog-98c index cab17f5be89..6ea535688c3 100644 --- a/TAO/ChangeLog-98c +++ b/TAO/ChangeLog-98c @@ -1,8 +1,16 @@ +Tue Sep 22 15:33:48 1998 Carlos O'Ryan <coryan@cs.wustl.edu> + + * orbsvcs/tests/EC_Mcast/EC_Mcast.h: + * orbsvcs/tests/EC_Mcast/EC_Mcast.cpp: + Added simple support for dynamic reconfiguration of the consumer + subscriptions; we still have to propagate this to the multicast + groups that we join. + Tue Sep 22 14:29:29 1998 Vishal Kachroo <vishal@merengue.cs.wustl.edu> - Made changes to the ACE_DEBUG to print the server/client Process ID/ thread ID - for all messages to console. Also modified README to include the description of \ - tests done by client. + Made changes to the ACE_DEBUG to print the server/client Process + ID/ thread ID for all messages to console. Also modified README to + include the description of tests done by client. * examples/Simple/bank/README: * examples/Simple/bank/ AccountManager_i.cpp diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp index 9e4d7475711..fa9fcf84649 100644 --- a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp +++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp @@ -22,8 +22,7 @@ ACE_RCSID(EC_Mcast, EC_Mcast, "$Id$") ECM_Driver::ECM_Driver (void) - : lcl_name_ ("ECM"), - event_period_ (25000), + : event_period_ (250000), event_count_ (100), config_filename_ (0), pid_filename_ (0), @@ -64,12 +63,10 @@ ECM_Driver::run (int argc, char* argv[]) ACE_DEBUG ((LM_DEBUG, "Execution parameters:\n" - " lcl name = <%s>\n" " event period = <%d> (usecs)\n" " event count = <%d>\n" " config file name = <%s>\n" " pid file name = <%s>\n", - this->lcl_name_?this->lcl_name_:"nil", this->event_period_, this->event_count_, @@ -146,45 +143,18 @@ ECM_Driver::run (int argc, char* argv[]) } #endif /* 0 */ - CORBA::Object_var naming_obj = - this->orb_->resolve_initial_references ("NameService"); - if (CORBA::is_nil (naming_obj.in ())) - ACE_ERROR_RETURN ((LM_ERROR, - " (%P|%t) Unable to get the Naming Service.\n"), - 1); - - CosNaming::NamingContext_var naming_context = - CosNaming::NamingContext::_narrow (naming_obj.in (), TAO_TRY_ENV); - TAO_CHECK_ENV; - ACE_Config_Scheduler scheduler_impl; RtecScheduler::Scheduler_var scheduler = scheduler_impl._this (TAO_TRY_ENV); TAO_CHECK_ENV; - // We use this buffer to generate the names of the local - // services. - const int bufsize = 512; - char buf[bufsize]; - CORBA::String_var str = this->orb_->object_to_string (scheduler.in (), TAO_TRY_ENV); TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n", str.in ())); - ACE_OS::strcpy (buf, "ScheduleService@"); - ACE_OS::strcat (buf, this->lcl_name_); - - // Register the servant with the Naming Context.... - CosNaming::Name schedule_name (1); - schedule_name.length (1); - schedule_name[0].id = CORBA::string_dup (buf); - naming_context->bind (schedule_name, scheduler.in (), TAO_TRY_ENV); - TAO_CHECK_ENV; - - if (ACE_Scheduler_Factory::use_config (naming_context.in (), - buf) == -1) + if (ACE_Scheduler_Factory::server (scheduler.in ()) == -1) return -1; // Create the EventService implementation, but don't start its @@ -201,15 +171,6 @@ ECM_Driver::run (int argc, char* argv[]) ACE_DEBUG ((LM_DEBUG, "The (local) EC IOR is <%s>\n", str.in ())); - ACE_OS::strcpy (buf, "EventChannel@"); - ACE_OS::strcat (buf, this->lcl_name_); - - CosNaming::Name channel_name (1); - channel_name.length (1); - channel_name[0].id = CORBA::string_dup (buf); - naming_context->bind (channel_name, ec.in (), TAO_TRY_ENV); - TAO_CHECK_ENV; - ACE_DEBUG ((LM_DEBUG, "waiting to start\n")); ACE_Time_Value tv (15, 0); @@ -276,15 +237,10 @@ ECM_Driver::run (int argc, char* argv[]) TAO_CHECK_ENV; ACE_DEBUG ((LM_DEBUG, "shutdown grace period\n")); + tv.set (5, 0); if (this->orb_->run (&tv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1); - - naming_context->unbind (schedule_name, TAO_TRY_ENV); - TAO_CHECK_ENV; - - naming_context->unbind (channel_name, TAO_TRY_ENV); - TAO_CHECK_ENV; } TAO_CATCH (CORBA::SystemException, sys_ex) { @@ -425,10 +381,6 @@ ECM_Driver::parse_args (int argc, char *argv []) { switch (opt) { - case 'l': - this->lcl_name_ = get_opt.optarg; - break; - case 'p': this->pid_filename_ = get_opt.optarg; break; @@ -469,7 +421,7 @@ ECM_Driver::parse_args (int argc, char *argv []) "-l <localname> " "-p <pid file name> " "-c <config file name> " - "-g federation,federation,... " + "-f federation,federation,... " "\n", argv[0])); return -1; @@ -909,7 +861,7 @@ ECM_Supplier::push (const RtecEventComm::EventSet& events, void ECM_Supplier::disconnect_push_supplier (CORBA::Environment& _env) { - this->supplier_proxy_->disconnect_push_supplier (_env); + // this->supplier_proxy_->disconnect_push_supplier (_env); } void @@ -920,7 +872,9 @@ ECM_Supplier::disconnect_push_consumer (CORBA::Environment &) // **************************************************************** ECM_Consumer::ECM_Consumer (ECM_Local_Federation *federation) - : federation_ (federation) + : federation_ (federation), + supplier_proxy_ (0), + consumer_admin_ (0) { } @@ -928,9 +882,10 @@ void ECM_Consumer::open (const char* name, RtecEventChannelAdmin::EventChannel_ptr ec, RtecScheduler::Scheduler_ptr scheduler, + ACE_RANDR_TYPE &seed, CORBA::Environment& _env) { - RtecScheduler::handle_t rt_info = + this->rt_info_ = scheduler->create (name, _env); TAO_CHECK_ENV_RETURN_VOID(_env); @@ -939,35 +894,60 @@ ECM_Consumer::open (const char* name, ACE_Time_Value tv (0, 2000); TimeBase::TimeT time; ORBSVCS_Time::Time_Value_to_TimeT (time, tv); - scheduler->set (rt_info, - RtecScheduler::VERY_HIGH_CRITICALITY, - time, time, time, - 0, - RtecScheduler::VERY_LOW_IMPORTANCE, - time, - 0, - RtecScheduler::OPERATION, - _env); + scheduler->set (this->rt_info_, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 0, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION, + _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + // = Connect as a consumer. + this->consumer_admin_ = ec->for_consumers (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); + + this->connect (seed, _env); +} + +void +ECM_Consumer::connect (ACE_RANDR_TYPE &seed, + CORBA::Environment& _env) +{ + if (CORBA::is_nil (this->consumer_admin_.in ())) + return; + + this->supplier_proxy_ = + this->consumer_admin_->obtain_push_supplier (_env); TAO_CHECK_ENV_RETURN_VOID (_env); ACE_ConsumerQOS_Factory qos; qos.start_disjunction_group (); - qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info); + qos.insert_type (ACE_ES_EVENT_SHUTDOWN, + this->rt_info_); const ECM_Federation* federation = this->federation_->federation (); for (int i = 0; i < federation->consumer_types (); ++i) { - qos.insert_type (federation->consumer_ipaddr (i), rt_info); + if (ACE_OS::rand_r (seed) < RAND_MAX / 2) + { + ACE_DEBUG ((LM_DEBUG, + "Federation %s leaves group %s\n", + federation->name (), + federation->consumer_name (i))); + this->federation_->subscribed_bit (i, 0); + continue; + } + ACE_DEBUG ((LM_DEBUG, + "Federation %s joins group %s\n", + federation->name (), + federation->consumer_name (i))); + this->federation_->subscribed_bit (i, 1); + qos.insert_type (federation->consumer_ipaddr (i), + this->rt_info_); } - // = Connect as a consumer. - RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = - ec->for_consumers (_env); - TAO_CHECK_ENV_RETURN_VOID (_env); - - this->supplier_proxy_ = - consumer_admin->obtain_push_supplier (_env); - TAO_CHECK_ENV_RETURN_VOID (_env); - RtecEventComm::PushConsumer_var objref = this->_this (_env); TAO_CHECK_ENV_RETURN_VOID (_env); @@ -978,15 +958,24 @@ ECM_Consumer::open (const char* name, } void -ECM_Consumer::close (CORBA::Environment &_env) +ECM_Consumer::disconnect (CORBA::Environment& _env) { - if (CORBA::is_nil (this->supplier_proxy_.in ())) + if (CORBA::is_nil (this->supplier_proxy_.in ()) + || CORBA::is_nil (this->consumer_admin_.in ())) return; this->supplier_proxy_->disconnect_push_supplier (_env); TAO_CHECK_ENV_RETURN_VOID (_env); + this->supplier_proxy_ = + RtecEventChannelAdmin::ProxyPushSupplier::_nil (); +} - this->supplier_proxy_ = 0; +void +ECM_Consumer::close (CORBA::Environment &_env) +{ + this->disconnect (_env); + this->consumer_admin_ = + RtecEventChannelAdmin::ConsumerAdmin::_nil (); } void @@ -1017,8 +1006,17 @@ ECM_Local_Federation::ECM_Local_Federation (ECM_Federation *federation, event_count_ (0), last_publication_change_ (0), last_subscription_change_ (0), - mcast_eh_ (&receiver_) + mcast_eh_ (&receiver_), + subscription_change_period_ (10000), + publication_change_period_ (10000) +{ + ACE_NEW (this->subscription_subset_, + CORBA::Boolean[this->consumer_types ()]); +} + +ECM_Local_Federation::~ECM_Local_Federation (void) { + delete[] this->subscription_subset_; } void @@ -1033,15 +1031,17 @@ ECM_Local_Federation::open (int event_count, const int bufsize = 512; char buf[bufsize]; ACE_OS::strcpy (buf, this->federation_->name ()); - ACE_OS::strcat (buf, "::supplier"); + ACE_OS::strcat (buf, "/supplier"); this->supplier_.open (buf, period, ec, scheduler, _env); TAO_CHECK_ENV_RETURN_VOID (_env); ACE_OS::strcpy (buf, this->federation_->name ()); - ACE_OS::strcat (buf, "::consumer"); - this->consumer_.open (buf, ec, scheduler, _env); + ACE_OS::strcat (buf, "/consumer"); + this->consumer_.open (buf, ec, scheduler, this->seed_, _env); TAO_CHECK_ENV_RETURN_VOID (_env); + + this->last_subscription_change_ = ACE_OS::gettimeofday (); } void @@ -1101,6 +1101,24 @@ ECM_Local_Federation::supplier_timeout (RtecEventComm::PushConsumer_ptr consumer TAO_CHECK_ENV_RETURN_VOID (_env); this->send_count_++; + + ACE_Time_Value delta = ACE_OS::gettimeofday () - + this->last_subscription_change_; + + double p = double (ACE_OS::rand_r (this->seed_)) / RAND_MAX; + double maxp = double (delta.msec ()) / this->subscription_change_period_; + + if (2 * p < maxp) + { + ACE_DEBUG ((LM_DEBUG, + "Reconfiguring federation %s: %f %f\n", + this->name (), p, maxp)); + this->consumer_.disconnect (_env); + TAO_CHECK_ENV_RETURN_VOID (_env); + this->consumer_.connect (this->seed_, _env); + TAO_CHECK_ENV_RETURN_VOID (_env); + this->last_subscription_change_ = ACE_OS::gettimeofday (); + } } void @@ -1123,15 +1141,15 @@ ECM_Local_Federation::consumer_push (ACE_hrtime_t, int j = 0; for (; j < this->federation_->consumer_types (); ++j) - { - if (ACE_static_cast (ACE_CAST_CONST CORBA::ULong, e.header.type) == - this->federation_->consumer_ipaddr(j)) - { - // @@ TODO check if the type is in the current - // subscription list. - break; - } - } + { + CORBA::ULong type = e.header.type; + if (type == this->federation_->consumer_ipaddr(j)) + { + if (this->subscribed_bit (j) == 0) + this->unfiltered_count_++; + break; + } + } if (j == this->federation_->consumer_types ()) this->invalid_count_++; } @@ -1236,6 +1254,24 @@ ECM_Local_Federation::dump_results (void) const this->send_count_)); } +void +ECM_Local_Federation::subscribed_bit (CORBA::ULong i, + CORBA::Boolean x) +{ + if (i > this->consumer_types ()) + return; + this->subscription_subset_[i] = x; +} + +CORBA::Boolean +ECM_Local_Federation::subscribed_bit (CORBA::ULong i) const +{ + if (i > this->consumer_types ()) + return 0; + return this->subscription_subset_[i]; +} + + // **************************************************************** int diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h index 758f5c3734b..c15c909b3b7 100644 --- a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h +++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h @@ -215,12 +215,19 @@ public: void open (const char* name, RtecEventChannelAdmin::EventChannel_ptr event_channel, RtecScheduler::Scheduler_ptr scheduler, + ACE_RANDR_TYPE &seed, CORBA::Environment& _env); // This method connects the consumer to the EC. void close (CORBA::Environment &_env); // Disconnect from the EC. + void connect (ACE_RANDR_TYPE& seed, + CORBA::Environment &_env); + void disconnect (CORBA::Environment &_env); + // Disconnect from the supplier, but do not forget about it or close + // it. + // = The POA_RtecEventComm::PushComsumer methods. virtual void push (const RtecEventComm::EventSet& events, CORBA::Environment &_env); @@ -230,8 +237,14 @@ private: ECM_Local_Federation* federation_; // To callback. + RtecScheduler::handle_t rt_info_; + // The handle for our RT_Info description. + RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_; // We talk to the EC using this proxy. + + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin_; + // We talk to the EC using this proxy. }; class ECM_Local_Federation @@ -244,6 +257,8 @@ public: ECM_Local_Federation (ECM_Federation *federation, ECM_Driver *driver); // Constructor. + ~ECM_Local_Federation (void); + // Destructor void open (int event_count, RtecScheduler::Period period, @@ -284,6 +299,12 @@ public: void dump_results (void) const; // Report the results back to the user... + void subscribed_bit (CORBA::ULong i, + CORBA::Boolean x); + CORBA::Boolean subscribed_bit (CORBA::ULong i) const; + // Set&Get the subscribed bit; this defines the subset of events + // that we actually publish. + // = Delegate on the federation description const char* name (void) const; CORBA::UShort mcast_port (void) const; @@ -344,6 +365,18 @@ private: // dispatching of the event. // @@ TODO Eventually we may need several of this objects to handle // OS limitations on the number of multicast groups per socket. + + ACE_RANDR_TYPE seed_; + // The seed for a random number generator. + + CORBA::ULong subscription_change_period_; + // The (average) period between subscription changes, in usecs + + CORBA::ULong publication_change_period_; + // The (average) period between publication changes, in usecs + + CORBA::Boolean* subscription_subset_; + // The events we are actually subscribed to. }; class ECM_Driver @@ -439,9 +472,6 @@ private: // Dump the results to the standard output. private: - char* lcl_name_; - // The name of the "local" EC. - int event_period_; // The events are generated using this interval. |