diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp | 321 |
1 files changed, 99 insertions, 222 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp index 9b61bcae6a8..1f68c500a66 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp @@ -30,12 +30,8 @@ TAO_EC_Gateway::observer_handle (void) const // **************************************************************** TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void) - : busy_count_ (0), - update_posted_ (0), - consumer_ (this), - consumer_is_active_ (0), - supplier_ (this), - supplier_is_active_ (0) + : consumer_ (this), + supplier_ (this) { } @@ -50,78 +46,58 @@ TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, RtecScheduler::Scheduler_ptr lcl_sched, const char* lcl_name, const char* rmt_name, - CORBA::Environment &ACE_TRY_ENV) + CORBA::Environment &TAO_IN_ENV) { - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); - - if (!CORBA::is_nil (this->rmt_ec_.in ())) - return; - this->rmt_ec_ = RtecEventChannelAdmin::EventChannel::_duplicate (rmt_ec); this->lcl_ec_ = RtecEventChannelAdmin::EventChannel::_duplicate (lcl_ec); - if (!CORBA::is_nil (rmt_sched)) - { - this->rmt_info_ = - rmt_sched->create (rmt_name, ACE_TRY_ENV); - ACE_CHECK; - - // @@ TODO Many things are hard-coded in the RT_Info here. - - // The worst case execution time is far less than 500 usecs, but - // that is a safe estimate.... - ACE_Time_Value tv (0, 500); - TimeBase::TimeT time; - ORBSVCS_Time::Time_Value_to_TimeT (time, tv); - rmt_sched->set (this->rmt_info_, - RtecScheduler::VERY_HIGH_CRITICALITY, - time, time, time, - 25000 * 10, - RtecScheduler::VERY_LOW_IMPORTANCE, - time, - 0, - RtecScheduler::OPERATION, - ACE_TRY_ENV); - ACE_CHECK; - } - - if (!CORBA::is_nil (lcl_sched)) - { - this->lcl_info_ = - lcl_sched->create (lcl_name, ACE_TRY_ENV); - ACE_CHECK; - - ACE_Time_Value tv (0, 500); - TimeBase::TimeT time; - ORBSVCS_Time::Time_Value_to_TimeT (time, tv); - lcl_sched->set (this->lcl_info_, - RtecScheduler::VERY_HIGH_CRITICALITY, - time, time, time, - 25000 * 10, - RtecScheduler::VERY_LOW_IMPORTANCE, - time, - 1, - RtecScheduler::REMOTE_DEPENDANT, - ACE_TRY_ENV); - ACE_CHECK; - } -} - -void -TAO_EC_Gateway_IIOP::close (CORBA::Environment &ACE_TRY_ENV) -{ - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + this->rmt_info_ = + rmt_sched->create (rmt_name, TAO_IN_ENV); + if (TAO_IN_ENV.exception () != 0) return; + + // @@ TODO Many things are hard-coded in the RT_Info here. + + // The worst case execution time is far less than 500 usecs, but + // that is a safe estimate.... + ACE_Time_Value tv (0, 500); + TimeBase::TimeT time; + ORBSVCS_Time::Time_Value_to_TimeT (time, tv); + rmt_sched->set (this->rmt_info_, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 25000 * 10, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 0, + RtecScheduler::OPERATION, + TAO_IN_ENV); + if (TAO_IN_ENV.exception () != 0) return; + + this->lcl_info_ = + lcl_sched->create (lcl_name, TAO_IN_ENV); + if (TAO_IN_ENV.exception () != 0) return; + + lcl_sched->set (this->lcl_info_, + RtecScheduler::VERY_HIGH_CRITICALITY, + time, time, time, + 25000 * 10, + RtecScheduler::VERY_LOW_IMPORTANCE, + time, + 1, + RtecScheduler::REMOTE_DEPENDANT, + TAO_IN_ENV); + if (TAO_IN_ENV.exception () != 0) return; - this->close_i (ACE_TRY_ENV); } - void -TAO_EC_Gateway_IIOP::close_i (CORBA::Environment &ACE_TRY_ENV) +TAO_EC_Gateway_IIOP::close (CORBA::Environment &env) { // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n")); + if (CORBA::is_nil (this->supplier_proxy_.in ())) + return; if (this->consumer_proxy_map_.current_size () > 0) { @@ -129,105 +105,52 @@ TAO_EC_Gateway_IIOP::close_i (CORBA::Environment &ACE_TRY_ENV) j != this->consumer_proxy_map_.end (); ++j) { - RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_; - if (CORBA::is_nil (consumer)) - continue; - consumer->disconnect_push_consumer (ACE_TRY_ENV); - CORBA::release (consumer); - ACE_CHECK; + (*j).int_id_->disconnect_push_consumer (env); + CORBA::release ((*j).int_id_); + TAO_CHECK_ENV_RETURN_VOID (env); } this->consumer_proxy_map_.close (); } - if (!CORBA::is_nil (this->default_consumer_proxy_.in ())) - { - this->default_consumer_proxy_->disconnect_push_consumer (ACE_TRY_ENV); - ACE_CHECK; + this->default_consumer_proxy_->disconnect_push_consumer (env); + TAO_CHECK_ENV_RETURN_VOID (env); + this->default_consumer_proxy_ = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); - this->default_consumer_proxy_ = - RtecEventChannelAdmin::ProxyPushConsumer::_nil (); - } - - if (this->supplier_is_active_) - { - PortableServer::POA_var poa = - this->supplier_._default_POA (ACE_TRY_ENV); - ACE_CHECK; - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->supplier_, ACE_TRY_ENV); - ACE_CHECK; - poa->deactivate_object (id.in (), ACE_TRY_ENV); - ACE_CHECK; - this->supplier_is_active_ = 0; - } - - if (!CORBA::is_nil (this->supplier_proxy_.in ())) - { - this->supplier_proxy_->disconnect_push_supplier (ACE_TRY_ENV); - ACE_CHECK; - - this->supplier_proxy_ = - RtecEventChannelAdmin::ProxyPushSupplier::_nil (); - } - - if (this->consumer_is_active_) - { - PortableServer::POA_var poa = - this->consumer_._default_POA (ACE_TRY_ENV); - ACE_CHECK; - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->consumer_, ACE_TRY_ENV); - ACE_CHECK; - poa->deactivate_object (id.in (), ACE_TRY_ENV); - ACE_CHECK; - this->consumer_is_active_ = 0; - } + this->supplier_proxy_->disconnect_push_supplier (env); + TAO_CHECK_ENV_RETURN_VOID (env); + this->supplier_proxy_ = + RtecEventChannelAdmin::ProxyPushSupplier::_nil (); } void -TAO_EC_Gateway_IIOP::update_consumer ( - const RtecEventChannelAdmin::ConsumerQOS& c_qos, - CORBA::Environment& ACE_TRY_ENV) +TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS& c_qos, + CORBA::Environment& env) { + this->close (env); + TAO_CHECK_ENV_RETURN_VOID (env); + if (c_qos.dependencies.length () <= 1) return; - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); - - if (this->busy_count_ != 0) - { - this->update_posted_ = 1; - this->c_qos_ = c_qos; - return; - } - - this->update_consumer_i (c_qos, ACE_TRY_ENV); -} - -void -TAO_EC_Gateway_IIOP::update_consumer_i ( - const RtecEventChannelAdmin::ConsumerQOS& c_qos, - CORBA::Environment& ACE_TRY_ENV) -{ - this->close_i (ACE_TRY_ENV); - ACE_CHECK; - + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n")); if (CORBA::is_nil (this->lcl_ec_.in ()) || CORBA::is_nil (this->rmt_ec_.in ())) return; - // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n")); - // = Connect as a supplier to the local EC RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = - this->lcl_ec_->for_suppliers (ACE_TRY_ENV); - ACE_CHECK; + this->lcl_ec_->for_suppliers (env); + TAO_CHECK_ENV_RETURN_VOID (env); + + this->default_consumer_proxy_ = + supplier_admin->obtain_push_consumer (env); + TAO_CHECK_ENV_RETURN_VOID (env); // Change the RT_Info in the consumer QoS. // On the same loop we discover the subscriptions by event source, // and fill the consumer proxy map. RtecEventChannelAdmin::ConsumerQOS sub = c_qos; - sub.is_gateway = 1; for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i) { sub.dependencies[i].rt_info = this->rmt_info_; @@ -238,22 +161,19 @@ TAO_EC_Gateway_IIOP::update_consumer_i ( if (sid != 0 && this->consumer_proxy_map_.find (sid, proxy) == -1) { - proxy = supplier_admin->obtain_push_consumer (ACE_TRY_ENV); - ACE_CHECK; + proxy = supplier_admin->obtain_push_consumer (env); + TAO_CHECK_ENV_RETURN_VOID (env); this->consumer_proxy_map_.bind (sid, proxy); } } + // Obtain a reference to our supplier personality... + RtecEventComm::PushSupplier_var supplier_ref = + this->supplier_._this (env); + TAO_CHECK_ENV_RETURN_VOID (env); if (this->consumer_proxy_map_.current_size () > 0) { - this->supplier_is_active_ = 1; - - // Obtain a reference to our supplier personality... - RtecEventComm::PushSupplier_var supplier_ref = - this->supplier_._this (ACE_TRY_ENV); - ACE_CHECK; - // For each subscription by source build the set of publications // (they may several, by type, for instance) and connect to the // consumer proxy. @@ -262,7 +182,7 @@ TAO_EC_Gateway_IIOP::update_consumer_i ( ++j) { RtecEventChannelAdmin::SupplierQOS pub; - pub.publications.length (sub.dependencies.length ()); + pub.publications.length (sub.dependencies.length () - 1); pub.is_gateway = 1; int c = 0; @@ -288,8 +208,8 @@ TAO_EC_Gateway_IIOP::update_consumer_i ( pub.publications.length (c); (*j).int_id_->connect_push_supplier (supplier_ref.in (), pub, - ACE_TRY_ENV); - ACE_CHECK; + env); + TAO_CHECK_ENV_RETURN_VOID (env); } } @@ -316,59 +236,44 @@ TAO_EC_Gateway_IIOP::update_consumer_i ( pub.publications[c].dependency_info.rt_info = this->lcl_info_; c++; } - if (c > 0) { - this->supplier_is_active_ = 1; - - // Obtain a reference to our supplier personality... - RtecEventComm::PushSupplier_var supplier_ref = - this->supplier_._this (ACE_TRY_ENV); - ACE_CHECK; - - // Obtain the consumer.... - this->default_consumer_proxy_ = - supplier_admin->obtain_push_consumer (ACE_TRY_ENV); - ACE_CHECK; - pub.publications.length (c); this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (), pub, - ACE_TRY_ENV); - ACE_CHECK; + env); + TAO_CHECK_ENV_RETURN_VOID (env); } - // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier ")); - // ACE_SupplierQOS_Factory::debug (pub); + //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier ")); + //ACE_SupplierQOS_Factory::debug (pub); RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = - this->rmt_ec_->for_consumers (ACE_TRY_ENV); - ACE_CHECK; + this->rmt_ec_->for_consumers (env); + TAO_CHECK_ENV_RETURN_VOID (env); this->supplier_proxy_ = - consumer_admin->obtain_push_supplier (ACE_TRY_ENV); - ACE_CHECK; + consumer_admin->obtain_push_supplier (env); + TAO_CHECK_ENV_RETURN_VOID (env); - this->consumer_is_active_ = 1; RtecEventComm::PushConsumer_var consumer_ref = - this->consumer_._this (ACE_TRY_ENV); - ACE_CHECK; + this->consumer_._this (env); + TAO_CHECK_ENV_RETURN_VOID (env); - // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Consumer ")); - // ACE_ConsumerQOS_Factory::debug (sub); + //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Consumer ")); + //ACE_ConsumerQOS_Factory::debug (sub); this->supplier_proxy_->connect_push_consumer (consumer_ref.in (), sub, - ACE_TRY_ENV); - ACE_CHECK; + env); + TAO_CHECK_ENV_RETURN_VOID (env); } void -TAO_EC_Gateway_IIOP::update_supplier ( - const RtecEventChannelAdmin::SupplierQOS&, - CORBA::Environment&) +TAO_EC_Gateway_IIOP::update_supplier (const RtecEventChannelAdmin::SupplierQOS&, + CORBA::Environment&) { // Do nothing... } @@ -391,7 +296,7 @@ TAO_EC_Gateway_IIOP::disconnect_push_supplier (CORBA::Environment &) void TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events, - CORBA::Environment & ACE_TRY_ENV) + CORBA::Environment & env) { //ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push - ")); @@ -401,13 +306,7 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events, return; } - { - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); - - this->busy_count_++; - } - - // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ())); + //ACE_DEBUG ((LM_DEBUG, "ECP: %d event(s) - ", events.length ())); // @@ TODO, there is an extra data copy here, we should do the event // modification without it and only compact the necessary events. @@ -427,43 +326,21 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events, // default consumer proxy. proxy = this->default_consumer_proxy_.in (); } - - if (CORBA::is_nil (proxy)) - continue; - out[0] = events[i]; out[0].header.ttl--; - - proxy->push (out, ACE_TRY_ENV); - ACE_CHECK; + proxy->push (out, env); + TAO_CHECK_ENV_RETURN_VOID (env); } - - { - ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); - - this->busy_count_--; - - if (this->busy_count_ == 0 && this->update_posted_ != 0) - { - this->update_posted_ = 0; - this->update_consumer_i (this->c_qos_, ACE_TRY_ENV); - } - } - } int -TAO_EC_Gateway_IIOP::shutdown (CORBA::Environment& ACE_TRY_ENV) +TAO_EC_Gateway_IIOP::shutdown (CORBA::Environment& TAO_IN_ENV) { - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + this->close (TAO_IN_ENV); + if (TAO_IN_ENV.exception () == 0) return -1; - this->close_i (ACE_TRY_ENV); - ACE_CHECK_RETURN (-1); - - this->lcl_ec_ = - RtecEventChannelAdmin::EventChannel::_nil (); - this->rmt_ec_ = - RtecEventChannelAdmin::EventChannel::_nil (); + this->lcl_ec_ = 0; + this->rmt_ec_ = 0; return 0; } |