summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp131
1 files changed, 60 insertions, 71 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
index 012486f7628..0b23e6bbb61 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
@@ -314,53 +314,21 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
{
Headers headers;
- {
- ACE_GUARD_THROW_EX (TAO_EC_ConsumerAdmin::Busy_Lock,
- ace_mon, this->event_channel_->consumer_admin ()->busy_lock (),
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
- ACE_CHECK;
-
- TAO_EC_ConsumerAdmin::SupplierSetIterator end =
- this->event_channel_->consumer_admin ()->end ();
- for (TAO_EC_ConsumerAdmin::SupplierSetIterator i =
- this->event_channel_->consumer_admin ()->begin ();
- i != end;
- ++i)
- {
- TAO_EC_ProxyPushSupplier* supplier = *i;
-
- const RtecEventChannelAdmin::ConsumerQOS& sub =
- supplier->subscriptions ();
- if (sub.is_gateway)
- continue;
- for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j)
- {
- const RtecEventComm::Event& event =
- sub.dependencies[j].event;
- RtecEventComm::EventType type = event.header.type;
-
- if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
- continue;
- headers.insert (event.header, 1);
- }
- }
- }
- CORBA::ULong count = 1;
- HeadersIterator i (headers);
- for (i.first (); !i.is_done (); i.next ())
- {
- count++;
- }
+ TAO_EC_Accumulate_Supplier_Headers worker (headers);
+ this->event_channel_->consumer_admin ()->for_each (&worker, ACE_TRY_ENV);
+ ACE_CHECK;
RtecEventChannelAdmin::DependencySet& dep = qos.dependencies;
- dep.length (count);
+ dep.length (headers.current_size () + 1);
dep[0].event.header.type = ACE_ES_DISJUNCTION_DESIGNATOR;
dep[0].event.header.source = 0;
dep[0].event.header.creation_time = ORBSVCS_Time::zero ();
dep[0].rt_info = 0;
- count = 1;
+
+ CORBA::ULong count = 1;
+ HeadersIterator i (headers);
for (i.first (); !i.is_done (); i.next ())
{
qos.dependencies[count++].event.header = *i.key ();
@@ -370,50 +338,71 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
void
TAO_EC_Basic_ObserverStrategy::fill_qos (
RtecEventChannelAdmin::SupplierQOS &qos,
- CORBA::Environment &)
+ CORBA::Environment &ACE_TRY_ENV)
{
Headers headers;
- {
- // @@ TODO locking in the consumer admin?
- TAO_EC_SupplierAdmin::ConsumerSetIterator end =
- this->event_channel_->supplier_admin ()->end ();
- for (TAO_EC_SupplierAdmin::ConsumerSetIterator i =
- this->event_channel_->supplier_admin ()->begin ();
- i != end;
- ++i)
- {
- TAO_EC_ProxyPushConsumer* consumer = *i;
- const RtecEventChannelAdmin::SupplierQOS& pub =
- consumer->publications ();
- if (pub.is_gateway)
- continue;
- for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
- {
- const RtecEventComm::Event& event =
- pub.publications[j].event;
- RtecEventComm::EventType type = event.header.type;
-
- if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
- continue;
- headers.insert (event.header, 1);
- }
- }
- }
+ TAO_EC_Accumulate_Consumer_Headers worker (headers);
+ this->event_channel_->supplier_admin ()->for_each (&worker,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+
+ qos.publications.length (headers.current_size ());
+
CORBA::ULong count = 0;
HeadersIterator i (headers);
for (i.first (); !i.is_done (); i.next ())
{
- count++;
+ qos.publications[count++].event.header = *i.key ();
}
- qos.publications.length (count);
- count = 0;
- for (i.first (); !i.is_done (); i.next ())
+}
+
+// ****************************************************************
+
+void
+TAO_EC_Accumulate_Supplier_Headers::work (TAO_EC_ProxyPushSupplier *supplier,
+ CORBA::Environment &)
+{
+ const RtecEventChannelAdmin::ConsumerQOS& sub =
+ supplier->subscriptions ();
+ if (sub.is_gateway)
+ return;
+ for (CORBA::ULong j = 0; j < sub.dependencies.length (); ++j)
{
- qos.publications[count++].event.header = *i.key ();
+ const RtecEventComm::Event& event =
+ sub.dependencies[j].event;
+ RtecEventComm::EventType type = event.header.type;
+
+ if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
+ continue;
+ this->headers_.insert (event.header, 1);
}
}
+// ****************************************************************
+
+void
+TAO_EC_Accumulate_Consumer_Headers::work (TAO_EC_ProxyPushConsumer *consumer,
+ CORBA::Environment &)
+{
+ const RtecEventChannelAdmin::SupplierQOS& pub =
+ consumer->publications ();
+ if (pub.is_gateway)
+ return;
+ for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
+ {
+ const RtecEventComm::Event& event =
+ pub.publications[j].event;
+ RtecEventComm::EventType type = event.header.type;
+
+ if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
+ continue;
+ this->headers_.insert (event.header, 1);
+ }
+}
+
+// ****************************************************************
+
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Map_Manager<RtecEventChannelAdmin::Observer_Handle,TAO_EC_Basic_ObserverStrategy::Observer_Entry,ACE_Null_Mutex>;