diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp | 131 |
1 files changed, 60 insertions, 71 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp index 012486f7628..0b23e6bbb61 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp @@ -314,53 +314,21 @@ TAO_EC_Basic_ObserverStrategy::fill_qos ( { Headers headers; - { - ACE_GUARD_THROW_EX (TAO_EC_ConsumerAdmin::Busy_Lock, - ace_mon, this->event_channel_->consumer_admin ()->busy_lock (), - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); - ACE_CHECK; - - TAO_EC_ConsumerAdmin::SupplierSetIterator end = - this->event_channel_->consumer_admin ()->end (); - for (TAO_EC_ConsumerAdmin::SupplierSetIterator i = - this->event_channel_->consumer_admin ()->begin (); - i != end; - ++i) - { - TAO_EC_ProxyPushSupplier* supplier = *i; - - const RtecEventChannelAdmin::ConsumerQOS& sub = - supplier->subscriptions (); - if (sub.is_gateway) - continue; - for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j) - { - const RtecEventComm::Event& event = - sub.dependencies[j].event; - RtecEventComm::EventType type = event.header.type; - - if (0 < type && type < ACE_ES_EVENT_UNDEFINED) - continue; - headers.insert (event.header, 1); - } - } - } - CORBA::ULong count = 1; - HeadersIterator i (headers); - for (i.first (); !i.is_done (); i.next ()) - { - count++; - } + TAO_EC_Accumulate_Supplier_Headers worker (headers); + this->event_channel_->consumer_admin ()->for_each (&worker, ACE_TRY_ENV); + ACE_CHECK; RtecEventChannelAdmin::DependencySet& dep = qos.dependencies; - dep.length (count); + dep.length (headers.current_size () + 1); dep[0].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR; dep[0].event.header.source = 0; dep[0].event.header.creation_time = ORBSVCS_Time::zero (); dep[0].rt_info = 0; - count = 1; + + CORBA::ULong count = 1; + HeadersIterator i (headers); for (i.first (); !i.is_done (); i.next ()) { qos.dependencies[count++].event.header = *i.key (); @@ -370,50 +338,71 @@ TAO_EC_Basic_ObserverStrategy::fill_qos ( void TAO_EC_Basic_ObserverStrategy::fill_qos ( RtecEventChannelAdmin::SupplierQOS &qos, - CORBA::Environment &) + CORBA::Environment &ACE_TRY_ENV) { Headers headers; - { - // @@ TODO locking in the consumer admin? - TAO_EC_SupplierAdmin::ConsumerSetIterator end = - this->event_channel_->supplier_admin ()->end (); - for (TAO_EC_SupplierAdmin::ConsumerSetIterator i = - this->event_channel_->supplier_admin ()->begin (); - i != end; - ++i) - { - TAO_EC_ProxyPushConsumer* consumer = *i; - const RtecEventChannelAdmin::SupplierQOS& pub = - consumer->publications (); - if (pub.is_gateway) - continue; - for (CORBA::ULong j = 0; j < pub.publications.length (); ++j) - { - const RtecEventComm::Event& event = - pub.publications[j].event; - RtecEventComm::EventType type = event.header.type; - - if (0 < type && type < ACE_ES_EVENT_UNDEFINED) - continue; - headers.insert (event.header, 1); - } - } - } + TAO_EC_Accumulate_Consumer_Headers worker (headers); + this->event_channel_->supplier_admin ()->for_each (&worker, + ACE_TRY_ENV); + ACE_CHECK; + + qos.publications.length (headers.current_size ()); + CORBA::ULong count = 0; HeadersIterator i (headers); for (i.first (); !i.is_done (); i.next ()) { - count++; + qos.publications[count++].event.header = *i.key (); } - qos.publications.length (count); - count = 0; - for (i.first (); !i.is_done (); i.next ()) +} + +// **************************************************************** + +void +TAO_EC_Accumulate_Supplier_Headers::work (TAO_EC_ProxyPushSupplier *supplier, + CORBA::Environment &) +{ + const RtecEventChannelAdmin::ConsumerQOS& sub = + supplier->subscriptions (); + if (sub.is_gateway) + return; + for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j) { - qos.publications[count++].event.header = *i.key (); + const RtecEventComm::Event& event = + sub.dependencies[j].event; + RtecEventComm::EventType type = event.header.type; + + if (0 < type && type < ACE_ES_EVENT_UNDEFINED) + continue; + this->headers_.insert (event.header, 1); } } +// **************************************************************** + +void +TAO_EC_Accumulate_Consumer_Headers::work (TAO_EC_ProxyPushConsumer *consumer, + CORBA::Environment &) +{ + const RtecEventChannelAdmin::SupplierQOS& pub = + consumer->publications (); + if (pub.is_gateway) + return; + for (CORBA::ULong j = 0; j < pub.publications.length (); ++j) + { + const RtecEventComm::Event& event = + pub.publications[j].event; + RtecEventComm::EventType type = event.header.type; + + if (0 < type && type < ACE_ES_EVENT_UNDEFINED) + continue; + this->headers_.insert (event.header, 1); + } +} + +// **************************************************************** + #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Map_Manager<RtecEventChannelAdmin::Observer_Handle,TAO_EC_Basic_ObserverStrategy::Observer_Entry,ACE_Null_Mutex>; |