summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp321
1 files changed, 99 insertions, 222 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
index 9b61bcae6a8..1f68c500a66 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
@@ -30,12 +30,8 @@ TAO_EC_Gateway::observer_handle (void) const
// ****************************************************************
TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
- : busy_count_ (0),
- update_posted_ (0),
- consumer_ (this),
- consumer_is_active_ (0),
- supplier_ (this),
- supplier_is_active_ (0)
+ : consumer_ (this),
+ supplier_ (this)
{
}
@@ -50,78 +46,58 @@ TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
RtecScheduler::Scheduler_ptr lcl_sched,
const char* lcl_name,
const char* rmt_name,
- CORBA::Environment &ACE_TRY_ENV)
+ CORBA::Environment &TAO_IN_ENV)
{
- ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
-
- if (!CORBA::is_nil (this->rmt_ec_.in ()))
- return;
-
this->rmt_ec_ =
RtecEventChannelAdmin::EventChannel::_duplicate (rmt_ec);
this->lcl_ec_ =
RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec);
- if (!CORBA::is_nil (rmt_sched))
- {
- this->rmt_info_ =
- rmt_sched->create (rmt_name, ACE_TRY_ENV);
- ACE_CHECK;
-
- // @@ TODO Many things are hard-coded in the RT_Info here.
-
- // The worst case execution time is far less than 500 usecs, but
- // that is a safe estimate....
- ACE_Time_Value tv (0, 500);
- TimeBase::TimeT time;
- ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
- rmt_sched->set (this->rmt_info_,
- RtecScheduler::VERY_HIGH_CRITICALITY,
- time, time, time,
- 25000 * 10,
- RtecScheduler::VERY_LOW_IMPORTANCE,
- time,
- 0,
- RtecScheduler::OPERATION,
- ACE_TRY_ENV);
- ACE_CHECK;
- }
-
- if (!CORBA::is_nil (lcl_sched))
- {
- this->lcl_info_ =
- lcl_sched->create (lcl_name, ACE_TRY_ENV);
- ACE_CHECK;
-
- ACE_Time_Value tv (0, 500);
- TimeBase::TimeT time;
- ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
- lcl_sched->set (this->lcl_info_,
- RtecScheduler::VERY_HIGH_CRITICALITY,
- time, time, time,
- 25000 * 10,
- RtecScheduler::VERY_LOW_IMPORTANCE,
- time,
- 1,
- RtecScheduler::REMOTE_DEPENDANT,
- ACE_TRY_ENV);
- ACE_CHECK;
- }
-}
-
-void
-TAO_EC_Gateway_IIOP::close (CORBA::Environment &ACE_TRY_ENV)
-{
- ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+ this->rmt_info_ =
+ rmt_sched->create (rmt_name, TAO_IN_ENV);
+ if (TAO_IN_ENV.exception () != 0) return;
+
+ // @@ TODO Many things are hard-coded in the RT_Info here.
+
+ // The worst case execution time is far less than 500 usecs, but
+ // that is a safe estimate....
+ ACE_Time_Value tv (0, 500);
+ TimeBase::TimeT time;
+ ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
+ rmt_sched->set (this->rmt_info_,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ 25000 * 10,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 0,
+ RtecScheduler::OPERATION,
+ TAO_IN_ENV);
+ if (TAO_IN_ENV.exception () != 0) return;
+
+ this->lcl_info_ =
+ lcl_sched->create (lcl_name, TAO_IN_ENV);
+ if (TAO_IN_ENV.exception () != 0) return;
+
+ lcl_sched->set (this->lcl_info_,
+ RtecScheduler::VERY_HIGH_CRITICALITY,
+ time, time, time,
+ 25000 * 10,
+ RtecScheduler::VERY_LOW_IMPORTANCE,
+ time,
+ 1,
+ RtecScheduler::REMOTE_DEPENDANT,
+ TAO_IN_ENV);
+ if (TAO_IN_ENV.exception () != 0) return;
- this->close_i (ACE_TRY_ENV);
}
-
void
-TAO_EC_Gateway_IIOP::close_i (CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_Gateway_IIOP::close (CORBA::Environment &env)
{
// ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
+ if (CORBA::is_nil (this->supplier_proxy_.in ()))
+ return;
if (this->consumer_proxy_map_.current_size () > 0)
{
@@ -129,105 +105,52 @@ TAO_EC_Gateway_IIOP::close_i (CORBA::Environment &ACE_TRY_ENV)
j != this->consumer_proxy_map_.end ();
++j)
{
- RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
- if (CORBA::is_nil (consumer))
- continue;
- consumer->disconnect_push_consumer (ACE_TRY_ENV);
- CORBA::release (consumer);
- ACE_CHECK;
+ (*j).int_id_->disconnect_push_consumer (env);
+ CORBA::release ((*j).int_id_);
+ TAO_CHECK_ENV_RETURN_VOID (env);
}
this->consumer_proxy_map_.close ();
}
- if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
- {
- this->default_consumer_proxy_->disconnect_push_consumer (ACE_TRY_ENV);
- ACE_CHECK;
+ this->default_consumer_proxy_->disconnect_push_consumer (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
+ this->default_consumer_proxy_ =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
- this->default_consumer_proxy_ =
- RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
- }
-
- if (this->supplier_is_active_)
- {
- PortableServer::POA_var poa =
- this->supplier_._default_POA (ACE_TRY_ENV);
- ACE_CHECK;
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->supplier_, ACE_TRY_ENV);
- ACE_CHECK;
- poa->deactivate_object (id.in (), ACE_TRY_ENV);
- ACE_CHECK;
- this->supplier_is_active_ = 0;
- }
-
- if (!CORBA::is_nil (this->supplier_proxy_.in ()))
- {
- this->supplier_proxy_->disconnect_push_supplier (ACE_TRY_ENV);
- ACE_CHECK;
-
- this->supplier_proxy_ =
- RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
- }
-
- if (this->consumer_is_active_)
- {
- PortableServer::POA_var poa =
- this->consumer_._default_POA (ACE_TRY_ENV);
- ACE_CHECK;
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->consumer_, ACE_TRY_ENV);
- ACE_CHECK;
- poa->deactivate_object (id.in (), ACE_TRY_ENV);
- ACE_CHECK;
- this->consumer_is_active_ = 0;
- }
+ this->supplier_proxy_->disconnect_push_supplier (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
+ this->supplier_proxy_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
}
void
-TAO_EC_Gateway_IIOP::update_consumer (
- const RtecEventChannelAdmin::ConsumerQOS& c_qos,
- CORBA::Environment& ACE_TRY_ENV)
+TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS& c_qos,
+ CORBA::Environment& env)
{
+ this->close (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
+
if (c_qos.dependencies.length () <= 1)
return;
- ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
-
- if (this->busy_count_ != 0)
- {
- this->update_posted_ = 1;
- this->c_qos_ = c_qos;
- return;
- }
-
- this->update_consumer_i (c_qos, ACE_TRY_ENV);
-}
-
-void
-TAO_EC_Gateway_IIOP::update_consumer_i (
- const RtecEventChannelAdmin::ConsumerQOS& c_qos,
- CORBA::Environment& ACE_TRY_ENV)
-{
- this->close_i (ACE_TRY_ENV);
- ACE_CHECK;
-
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n"));
if (CORBA::is_nil (this->lcl_ec_.in ())
|| CORBA::is_nil (this->rmt_ec_.in ()))
return;
- // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n"));
-
// = Connect as a supplier to the local EC
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
- this->lcl_ec_->for_suppliers (ACE_TRY_ENV);
- ACE_CHECK;
+ this->lcl_ec_->for_suppliers (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
+
+ this->default_consumer_proxy_ =
+ supplier_admin->obtain_push_consumer (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
// Change the RT_Info in the consumer QoS.
// On the same loop we discover the subscriptions by event source,
// and fill the consumer proxy map.
RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
- sub.is_gateway = 1;
for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
{
sub.dependencies[i].rt_info = this->rmt_info_;
@@ -238,22 +161,19 @@ TAO_EC_Gateway_IIOP::update_consumer_i (
if (sid != 0
&& this->consumer_proxy_map_.find (sid, proxy) == -1)
{
- proxy = supplier_admin->obtain_push_consumer (ACE_TRY_ENV);
- ACE_CHECK;
+ proxy = supplier_admin->obtain_push_consumer (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
this->consumer_proxy_map_.bind (sid, proxy);
}
}
+ // Obtain a reference to our supplier personality...
+ RtecEventComm::PushSupplier_var supplier_ref =
+ this->supplier_._this (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
if (this->consumer_proxy_map_.current_size () > 0)
{
- this->supplier_is_active_ = 1;
-
- // Obtain a reference to our supplier personality...
- RtecEventComm::PushSupplier_var supplier_ref =
- this->supplier_._this (ACE_TRY_ENV);
- ACE_CHECK;
-
// For each subscription by source build the set of publications
// (they may several, by type, for instance) and connect to the
// consumer proxy.
@@ -262,7 +182,7 @@ TAO_EC_Gateway_IIOP::update_consumer_i (
++j)
{
RtecEventChannelAdmin::SupplierQOS pub;
- pub.publications.length (sub.dependencies.length ());
+ pub.publications.length (sub.dependencies.length () - 1);
pub.is_gateway = 1;
int c = 0;
@@ -288,8 +208,8 @@ TAO_EC_Gateway_IIOP::update_consumer_i (
pub.publications.length (c);
(*j).int_id_->connect_push_supplier (supplier_ref.in (),
pub,
- ACE_TRY_ENV);
- ACE_CHECK;
+ env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
}
}
@@ -316,59 +236,44 @@ TAO_EC_Gateway_IIOP::update_consumer_i (
pub.publications[c].dependency_info.rt_info = this->lcl_info_;
c++;
}
-
if (c > 0)
{
- this->supplier_is_active_ = 1;
-
- // Obtain a reference to our supplier personality...
- RtecEventComm::PushSupplier_var supplier_ref =
- this->supplier_._this (ACE_TRY_ENV);
- ACE_CHECK;
-
- // Obtain the consumer....
- this->default_consumer_proxy_ =
- supplier_admin->obtain_push_consumer (ACE_TRY_ENV);
- ACE_CHECK;
-
pub.publications.length (c);
this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
pub,
- ACE_TRY_ENV);
- ACE_CHECK;
+ env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
}
- // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
- // ACE_SupplierQOS_Factory::debug (pub);
+ //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
+ //ACE_SupplierQOS_Factory::debug (pub);
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
- this->rmt_ec_->for_consumers (ACE_TRY_ENV);
- ACE_CHECK;
+ this->rmt_ec_->for_consumers (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
this->supplier_proxy_ =
- consumer_admin->obtain_push_supplier (ACE_TRY_ENV);
- ACE_CHECK;
+ consumer_admin->obtain_push_supplier (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
- this->consumer_is_active_ = 1;
RtecEventComm::PushConsumer_var consumer_ref =
- this->consumer_._this (ACE_TRY_ENV);
- ACE_CHECK;
+ this->consumer_._this (env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
- // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Consumer "));
- // ACE_ConsumerQOS_Factory::debug (sub);
+ //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Consumer "));
+ //ACE_ConsumerQOS_Factory::debug (sub);
this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
sub,
- ACE_TRY_ENV);
- ACE_CHECK;
+ env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
}
void
-TAO_EC_Gateway_IIOP::update_supplier (
- const RtecEventChannelAdmin::SupplierQOS&,
- CORBA::Environment&)
+TAO_EC_Gateway_IIOP::update_supplier (const RtecEventChannelAdmin::SupplierQOS&,
+ CORBA::Environment&)
{
// Do nothing...
}
@@ -391,7 +296,7 @@ TAO_EC_Gateway_IIOP::disconnect_push_supplier (CORBA::Environment &)
void
TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events,
- CORBA::Environment & ACE_TRY_ENV)
+ CORBA::Environment & env)
{
//ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push - "));
@@ -401,13 +306,7 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events,
return;
}
- {
- ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
-
- this->busy_count_++;
- }
-
- // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ()));
+ //ACE_DEBUG ((LM_DEBUG, "ECP: %d event(s) - ", events.length ()));
// @@ TODO, there is an extra data copy here, we should do the event
// modification without it and only compact the necessary events.
@@ -427,43 +326,21 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events,
// default consumer proxy.
proxy = this->default_consumer_proxy_.in ();
}
-
- if (CORBA::is_nil (proxy))
- continue;
-
out[0] = events[i];
out[0].header.ttl--;
-
- proxy->push (out, ACE_TRY_ENV);
- ACE_CHECK;
+ proxy->push (out, env);
+ TAO_CHECK_ENV_RETURN_VOID (env);
}
-
- {
- ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
-
- this->busy_count_--;
-
- if (this->busy_count_ == 0 && this->update_posted_ != 0)
- {
- this->update_posted_ = 0;
- this->update_consumer_i (this->c_qos_, ACE_TRY_ENV);
- }
- }
-
}
int
-TAO_EC_Gateway_IIOP::shutdown (CORBA::Environment& ACE_TRY_ENV)
+TAO_EC_Gateway_IIOP::shutdown (CORBA::Environment& TAO_IN_ENV)
{
- ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+ this->close (TAO_IN_ENV);
+ if (TAO_IN_ENV.exception () == 0) return -1;
- this->close_i (ACE_TRY_ENV);
- ACE_CHECK_RETURN (-1);
-
- this->lcl_ec_ =
- RtecEventChannelAdmin::EventChannel::_nil ();
- this->rmt_ec_ =
- RtecEventChannelAdmin::EventChannel::_nil ();
+ this->lcl_ec_ = 0;
+ this->rmt_ec_ = 0;
return 0;
}