summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-22 20:35:27 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-22 20:35:27 +0000
commit1c1adb7754f8965462794c3205262e2eda3b636b (patch)
tree6d556aab09d32276b10b668dc56ce4ebee3adbf4
parent086c0a2b10b62250b89ccd52830de820ba030bff (diff)
downloadATCD-1c1adb7754f8965462794c3205262e2eda3b636b.tar.gz
ChangeLogTag:Tue Sep 22 15:33:48 1998 Carlos O'Ryan <coryan@cs.wustl.edu>
-rw-r--r--TAO/ChangeLog-98c14
-rw-r--r--TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp218
-rw-r--r--TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h36
3 files changed, 171 insertions, 97 deletions
diff --git a/TAO/ChangeLog-98c b/TAO/ChangeLog-98c
index cab17f5be89..6ea535688c3 100644
--- a/TAO/ChangeLog-98c
+++ b/TAO/ChangeLog-98c
@@ -1,8 +1,16 @@
+Tue Sep 22 15:33:48 1998 Carlos O'Ryan <coryan@cs.wustl.edu>
+
+ * orbsvcs/tests/EC_Mcast/EC_Mcast.h:
+ * orbsvcs/tests/EC_Mcast/EC_Mcast.cpp:
+ Added simple support for dynamic reconfiguration of the consumer
+ subscriptions; we still have to propagate this to the multicast
+ groups that we join.
+
Tue Sep 22 14:29:29 1998 Vishal Kachroo <vishal@merengue.cs.wustl.edu>
- Made changes to the ACE_DEBUG to print the server/client Process ID/ thread ID
- for all messages to console. Also modified README to include the description of \
- tests done by client.
+ Made changes to the ACE_DEBUG to print the server/client Process
+ ID/ thread ID for all messages to console. Also modified README to
+ include the description of tests done by client.
* examples/Simple/bank/README:
* examples/Simple/bank/ AccountManager_i.cpp
diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp
index 9e4d7475711..fa9fcf84649 100644
--- a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp
+++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.cpp
@@ -22,8 +22,7 @@
ACE_RCSID(EC_Mcast, EC_Mcast, "$Id$")
ECM_Driver::ECM_Driver (void)
- : lcl_name_ ("ECM"),
- event_period_ (25000),
+ : event_period_ (250000),
event_count_ (100),
config_filename_ (0),
pid_filename_ (0),
@@ -64,12 +63,10 @@ ECM_Driver::run (int argc, char* argv[])
ACE_DEBUG ((LM_DEBUG,
"Execution parameters:\n"
- " lcl name = <%s>\n"
" event period = <%d> (usecs)\n"
" event count = <%d>\n"
" config file name = <%s>\n"
" pid file name = <%s>\n",
- this->lcl_name_?this->lcl_name_:"nil",
this->event_period_,
this->event_count_,
@@ -146,45 +143,18 @@ ECM_Driver::run (int argc, char* argv[])
}
#endif /* 0 */
- CORBA::Object_var naming_obj =
- this->orb_->resolve_initial_references ("NameService");
- if (CORBA::is_nil (naming_obj.in ()))
- ACE_ERROR_RETURN ((LM_ERROR,
- " (%P|%t) Unable to get the Naming Service.\n"),
- 1);
-
- CosNaming::NamingContext_var naming_context =
- CosNaming::NamingContext::_narrow (naming_obj.in (), TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
ACE_Config_Scheduler scheduler_impl;
RtecScheduler::Scheduler_var scheduler =
scheduler_impl._this (TAO_TRY_ENV);
TAO_CHECK_ENV;
- // We use this buffer to generate the names of the local
- // services.
- const int bufsize = 512;
- char buf[bufsize];
-
CORBA::String_var str =
this->orb_->object_to_string (scheduler.in (), TAO_TRY_ENV);
TAO_CHECK_ENV;
ACE_DEBUG ((LM_DEBUG, "The (local) scheduler IOR is <%s>\n",
str.in ()));
- ACE_OS::strcpy (buf, "ScheduleService@");
- ACE_OS::strcat (buf, this->lcl_name_);
-
- // Register the servant with the Naming Context....
- CosNaming::Name schedule_name (1);
- schedule_name.length (1);
- schedule_name[0].id = CORBA::string_dup (buf);
- naming_context->bind (schedule_name, scheduler.in (), TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
- if (ACE_Scheduler_Factory::use_config (naming_context.in (),
- buf) == -1)
+ if (ACE_Scheduler_Factory::server (scheduler.in ()) == -1)
return -1;
// Create the EventService implementation, but don't start its
@@ -201,15 +171,6 @@ ECM_Driver::run (int argc, char* argv[])
ACE_DEBUG ((LM_DEBUG, "The (local) EC IOR is <%s>\n", str.in ()));
- ACE_OS::strcpy (buf, "EventChannel@");
- ACE_OS::strcat (buf, this->lcl_name_);
-
- CosNaming::Name channel_name (1);
- channel_name.length (1);
- channel_name[0].id = CORBA::string_dup (buf);
- naming_context->bind (channel_name, ec.in (), TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
ACE_DEBUG ((LM_DEBUG, "waiting to start\n"));
ACE_Time_Value tv (15, 0);
@@ -276,15 +237,10 @@ ECM_Driver::run (int argc, char* argv[])
TAO_CHECK_ENV;
ACE_DEBUG ((LM_DEBUG, "shutdown grace period\n"));
+
tv.set (5, 0);
if (this->orb_->run (&tv) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "orb->run"), -1);
-
- naming_context->unbind (schedule_name, TAO_TRY_ENV);
- TAO_CHECK_ENV;
-
- naming_context->unbind (channel_name, TAO_TRY_ENV);
- TAO_CHECK_ENV;
}
TAO_CATCH (CORBA::SystemException, sys_ex)
{
@@ -425,10 +381,6 @@ ECM_Driver::parse_args (int argc, char *argv [])
{
switch (opt)
{
- case 'l':
- this->lcl_name_ = get_opt.optarg;
- break;
-
case 'p':
this->pid_filename_ = get_opt.optarg;
break;
@@ -469,7 +421,7 @@ ECM_Driver::parse_args (int argc, char *argv [])
"-l <localname> "
"-p <pid file name> "
"-c <config file name> "
- "-g federation,federation,... "
+ "-f federation,federation,... "
"\n",
argv[0]));
return -1;
@@ -909,7 +861,7 @@ ECM_Supplier::push (const RtecEventComm::EventSet& events,
void
ECM_Supplier::disconnect_push_supplier (CORBA::Environment& _env)
{
- this->supplier_proxy_->disconnect_push_supplier (_env);
+ // this->supplier_proxy_->disconnect_push_supplier (_env);
}
void
@@ -920,7 +872,9 @@ ECM_Supplier::disconnect_push_consumer (CORBA::Environment &)
// ****************************************************************
ECM_Consumer::ECM_Consumer (ECM_Local_Federation *federation)
- : federation_ (federation)
+ : federation_ (federation),
+ supplier_proxy_ (0),
+ consumer_admin_ (0)
{
}
@@ -928,9 +882,10 @@ void
ECM_Consumer::open (const char* name,
RtecEventChannelAdmin::EventChannel_ptr ec,
RtecScheduler::Scheduler_ptr scheduler,
+ ACE_RANDR_TYPE &seed,
CORBA::Environment& _env)
{
- RtecScheduler::handle_t rt_info =
+ this->rt_info_ =
scheduler->create (name, _env);
TAO_CHECK_ENV_RETURN_VOID(_env);
@@ -939,35 +894,60 @@ ECM_Consumer::open (const char* name,
ACE_Time_Value tv (0, 2000);
TimeBase::TimeT time;
ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
- scheduler->set (rt_info,
- RtecScheduler::VERY_HIGH_CRITICALITY,
- time, time, time,
- 0,
- RtecScheduler::VERY_LOW_IMPORTANCE,
- time,
- 0,
- RtecScheduler::OPERATION,
- _env);
+ scheduler->set (this->rt_info_,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ 0,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 0,
+ RtecScheduler::OPERATION,
+ _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ // = Connect as a consumer.
+ this->consumer_admin_ = ec->for_consumers (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ this->connect (seed, _env);
+}
+
+void
+ECM_Consumer::connect (ACE_RANDR_TYPE &seed,
+ CORBA::Environment& _env)
+{
+ if (CORBA::is_nil (this->consumer_admin_.in ()))
+ return;
+
+ this->supplier_proxy_ =
+ this->consumer_admin_->obtain_push_supplier (_env);
TAO_CHECK_ENV_RETURN_VOID (_env);
ACE_ConsumerQOS_Factory qos;
qos.start_disjunction_group ();
- qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info);
+ qos.insert_type (ACE_ES_EVENT_SHUTDOWN,
+ this->rt_info_);
const ECM_Federation* federation = this->federation_->federation ();
for (int i = 0; i < federation->consumer_types (); ++i)
{
- qos.insert_type (federation->consumer_ipaddr (i), rt_info);
+ if (ACE_OS::rand_r (seed) < RAND_MAX / 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Federation %s leaves group %s\n",
+ federation->name (),
+ federation->consumer_name (i)));
+ this->federation_->subscribed_bit (i, 0);
+ continue;
+ }
+ ACE_DEBUG ((LM_DEBUG,
+ "Federation %s joins group %s\n",
+ federation->name (),
+ federation->consumer_name (i)));
+ this->federation_->subscribed_bit (i, 1);
+ qos.insert_type (federation->consumer_ipaddr (i),
+ this->rt_info_);
}
- // = Connect as a consumer.
- RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
- ec->for_consumers (_env);
- TAO_CHECK_ENV_RETURN_VOID (_env);
-
- this->supplier_proxy_ =
- consumer_admin->obtain_push_supplier (_env);
- TAO_CHECK_ENV_RETURN_VOID (_env);
-
RtecEventComm::PushConsumer_var objref = this->_this (_env);
TAO_CHECK_ENV_RETURN_VOID (_env);
@@ -978,15 +958,24 @@ ECM_Consumer::open (const char* name,
}
void
-ECM_Consumer::close (CORBA::Environment &_env)
+ECM_Consumer::disconnect (CORBA::Environment& _env)
{
- if (CORBA::is_nil (this->supplier_proxy_.in ()))
+ if (CORBA::is_nil (this->supplier_proxy_.in ())
+ || CORBA::is_nil (this->consumer_admin_.in ()))
return;
this->supplier_proxy_->disconnect_push_supplier (_env);
TAO_CHECK_ENV_RETURN_VOID (_env);
+ this->supplier_proxy_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+}
- this->supplier_proxy_ = 0;
+void
+ECM_Consumer::close (CORBA::Environment &_env)
+{
+ this->disconnect (_env);
+ this->consumer_admin_ =
+ RtecEventChannelAdmin::ConsumerAdmin::_nil ();
}
void
@@ -1017,8 +1006,17 @@ ECM_Local_Federation::ECM_Local_Federation (ECM_Federation *federation,
event_count_ (0),
last_publication_change_ (0),
last_subscription_change_ (0),
- mcast_eh_ (&receiver_)
+ mcast_eh_ (&receiver_),
+ subscription_change_period_ (10000),
+ publication_change_period_ (10000)
+{
+ ACE_NEW (this->subscription_subset_,
+ CORBA::Boolean[this->consumer_types ()]);
+}
+
+ECM_Local_Federation::~ECM_Local_Federation (void)
{
+ delete[] this->subscription_subset_;
}
void
@@ -1033,15 +1031,17 @@ ECM_Local_Federation::open (int event_count,
const int bufsize = 512;
char buf[bufsize];
ACE_OS::strcpy (buf, this->federation_->name ());
- ACE_OS::strcat (buf, "::supplier");
+ ACE_OS::strcat (buf, "/supplier");
this->supplier_.open (buf, period, ec, scheduler, _env);
TAO_CHECK_ENV_RETURN_VOID (_env);
ACE_OS::strcpy (buf, this->federation_->name ());
- ACE_OS::strcat (buf, "::consumer");
- this->consumer_.open (buf, ec, scheduler, _env);
+ ACE_OS::strcat (buf, "/consumer");
+ this->consumer_.open (buf, ec, scheduler, this->seed_, _env);
TAO_CHECK_ENV_RETURN_VOID (_env);
+
+ this->last_subscription_change_ = ACE_OS::gettimeofday ();
}
void
@@ -1101,6 +1101,24 @@ ECM_Local_Federation::supplier_timeout (RtecEventComm::PushConsumer_ptr consumer
TAO_CHECK_ENV_RETURN_VOID (_env);
this->send_count_++;
+
+ ACE_Time_Value delta = ACE_OS::gettimeofday () -
+ this->last_subscription_change_;
+
+ double p = double (ACE_OS::rand_r (this->seed_)) / RAND_MAX;
+ double maxp = double (delta.msec ()) / this->subscription_change_period_;
+
+ if (2 * p < maxp)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Reconfiguring federation %s: %f %f\n",
+ this->name (), p, maxp));
+ this->consumer_.disconnect (_env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+ this->consumer_.connect (this->seed_, _env);
+ TAO_CHECK_ENV_RETURN_VOID (_env);
+ this->last_subscription_change_ = ACE_OS::gettimeofday ();
+ }
}
void
@@ -1123,15 +1141,15 @@ ECM_Local_Federation::consumer_push (ACE_hrtime_t,
int j = 0;
for (; j < this->federation_->consumer_types (); ++j)
- {
- if (ACE_static_cast (ACE_CAST_CONST CORBA::ULong, e.header.type) ==
- this->federation_->consumer_ipaddr(j))
- {
- // @@ TODO check if the type is in the current
- // subscription list.
- break;
- }
- }
+ {
+ CORBA::ULong type = e.header.type;
+ if (type == this->federation_->consumer_ipaddr(j))
+ {
+ if (this->subscribed_bit (j) == 0)
+ this->unfiltered_count_++;
+ break;
+ }
+ }
if (j == this->federation_->consumer_types ())
this->invalid_count_++;
}
@@ -1236,6 +1254,24 @@ ECM_Local_Federation::dump_results (void) const
this->send_count_));
}
+void
+ECM_Local_Federation::subscribed_bit (CORBA::ULong i,
+ CORBA::Boolean x)
+{
+ if (i > this->consumer_types ())
+ return;
+ this->subscription_subset_[i] = x;
+}
+
+CORBA::Boolean
+ECM_Local_Federation::subscribed_bit (CORBA::ULong i) const
+{
+ if (i > this->consumer_types ())
+ return 0;
+ return this->subscription_subset_[i];
+}
+
+
// ****************************************************************
int
diff --git a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h
index 758f5c3734b..c15c909b3b7 100644
--- a/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h
+++ b/TAO/orbsvcs/tests/EC_Mcast/EC_Mcast.h
@@ -215,12 +215,19 @@ public:
void open (const char* name,
RtecEventChannelAdmin::EventChannel_ptr event_channel,
RtecScheduler::Scheduler_ptr scheduler,
+ ACE_RANDR_TYPE &seed,
CORBA::Environment& _env);
// This method connects the consumer to the EC.
void close (CORBA::Environment &_env);
// Disconnect from the EC.
+ void connect (ACE_RANDR_TYPE& seed,
+ CORBA::Environment &_env);
+ void disconnect (CORBA::Environment &_env);
+ // Disconnect from the supplier, but do not forget about it or close
+ // it.
+
// = The POA_RtecEventComm::PushComsumer methods.
virtual void push (const RtecEventComm::EventSet& events,
CORBA::Environment &_env);
@@ -230,8 +237,14 @@ private:
ECM_Local_Federation* federation_;
// To callback.
+ RtecScheduler::handle_t rt_info_;
+ // The handle for our RT_Info description.
+
RtecEventChannelAdmin::ProxyPushSupplier_var supplier_proxy_;
// We talk to the EC using this proxy.
+
+ RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin_;
+ // We talk to the EC using this proxy.
};
class ECM_Local_Federation
@@ -244,6 +257,8 @@ public:
ECM_Local_Federation (ECM_Federation *federation,
ECM_Driver *driver);
// Constructor.
+ ~ECM_Local_Federation (void);
+ // Destructor
void open (int event_count,
RtecScheduler::Period period,
@@ -284,6 +299,12 @@ public:
void dump_results (void) const;
// Report the results back to the user...
+ void subscribed_bit (CORBA::ULong i,
+ CORBA::Boolean x);
+ CORBA::Boolean subscribed_bit (CORBA::ULong i) const;
+ // Set&Get the subscribed bit; this defines the subset of events
+ // that we actually publish.
+
// = Delegate on the federation description
const char* name (void) const;
CORBA::UShort mcast_port (void) const;
@@ -344,6 +365,18 @@ private:
// dispatching of the event.
// @@ TODO Eventually we may need several of this objects to handle
// OS limitations on the number of multicast groups per socket.
+
+ ACE_RANDR_TYPE seed_;
+ // The seed for a random number generator.
+
+ CORBA::ULong subscription_change_period_;
+ // The (average) period between subscription changes, in usecs
+
+ CORBA::ULong publication_change_period_;
+ // The (average) period between publication changes, in usecs
+
+ CORBA::Boolean* subscription_subset_;
+ // The events we are actually subscribed to.
};
class ECM_Driver
@@ -439,9 +472,6 @@ private:
// Dump the results to the standard output.
private:
- char* lcl_name_;
- // The name of the "local" EC.
-
int event_period_;
// The events are generated using this interval.