diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp | 58 |
1 files changed, 47 insertions, 11 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp index 3b382128bd7..3f4ba2d2008 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp @@ -160,7 +160,9 @@ TAO_EC_Gateway_IIOP::close_i (CORBA::Environment &ACE_TRY_ENV) CORBA::release (consumer); ACE_CHECK; } - this->consumer_proxy_map_.close (); + // Remove all the elements on the map. Calling close() does not + // work because the map is left in an inconsistent state. + this->consumer_proxy_map_.open (); } if (!CORBA::is_nil (this->default_consumer_proxy_.in ())) @@ -215,7 +217,7 @@ TAO_EC_Gateway_IIOP::update_consumer_i ( || CORBA::is_nil (this->rmt_ec_.in ())) return; - // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n")); + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n")); // = Connect as a supplier to the local EC RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = @@ -232,17 +234,37 @@ TAO_EC_Gateway_IIOP::update_consumer_i ( sub.dependencies[i].rt_info = this->rmt_info_; RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0; - RtecEventComm::EventSourceID sid = - sub.dependencies[i].event.header.source; - if (sid != 0 - && this->consumer_proxy_map_.find (sid, proxy) == -1) + const RtecEventComm::EventHeader &h = + sub.dependencies[i].event.header; + + RtecEventComm::EventSourceID sid = h.source; + + // ACE_DEBUG ((LM_DEBUG, + // "ECG (%t) trying (%d,%d)\n", + // sid, h.type)); + + // Skip all subscriptions that do not require an specific source + // id. + if (sid == 0) + continue; + + // Skip all the magic event types. + if (0 < h.type && h.type < ACE_ES_EVENT_UNDEFINED) + continue; + + if (this->consumer_proxy_map_.find (sid, proxy) == -1) { + // ACE_DEBUG ((LM_DEBUG, + // "ECG (%t) binding source %d\n", + // sid)); proxy = supplier_admin->obtain_push_consumer (ACE_TRY_ENV); ACE_CHECK; this->consumer_proxy_map_.bind (sid, proxy); } } - + // ACE_DEBUG ((LM_DEBUG, + // "ECG (%t) consumer map computed (%d entries)\n", + // this->consumer_proxy_map_.current_size ())); if (this->consumer_proxy_map_.current_size () > 0) { @@ -261,9 +283,16 @@ TAO_EC_Gateway_IIOP::update_consumer_i ( ++j) { RtecEventChannelAdmin::SupplierQOS pub; - pub.publications.length (sub.dependencies.length ()); + pub.publications.length (sub.dependencies.length () + 1); pub.is_gateway = 1; - int c = 0; + + pub.publications[0].event.header.type = + ACE_ES_DISJUNCTION_DESIGNATOR; + pub.publications[0].dependency_info.dependency_type = + RtecScheduler::TWO_WAY_CALL; + pub.publications[0].dependency_info.number_of_calls = 1; + pub.publications[0].dependency_info.rt_info = this->lcl_info_; + int c = 1; RtecEventComm::EventSourceID sid = (*j).ext_id_; for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k) @@ -271,7 +300,7 @@ TAO_EC_Gateway_IIOP::update_consumer_i ( const RtecEventComm::EventHeader& h = sub.dependencies[k].event.header; if (h.source != sid - || (1 <= h.type + || (0 < h.type && h.type < ACE_ES_EVENT_UNDEFINED)) continue; pub.publications[c].event.header = h; @@ -281,9 +310,16 @@ TAO_EC_Gateway_IIOP::update_consumer_i ( pub.publications[c].dependency_info.rt_info = this->lcl_info_; c++; } - if (c == 0) + // ACE_DEBUG ((LM_DEBUG, + // "ECG (%t) supplier id %d has %d elements\n", + // sid, c)); + if (c == 1) continue; + + // The prefix filter builder needs to know the number of + // elements in the disjunction + pub.publications[0].event.header.source = c - 1; pub.publications.length (c); (*j).int_id_->connect_push_supplier (supplier_ref.in (), pub, |