summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp58
1 files changed, 47 insertions, 11 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
index 3b382128bd7..3f4ba2d2008 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
@@ -160,7 +160,9 @@ TAO_EC_Gateway_IIOP::close_i (CORBA::Environment &ACE_TRY_ENV)
CORBA::release (consumer);
ACE_CHECK;
}
- this->consumer_proxy_map_.close ();
+ // Remove all the elements on the map. Calling close() does not
+ // work because the map is left in an inconsistent state.
+ this->consumer_proxy_map_.open ();
}
if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
@@ -215,7 +217,7 @@ TAO_EC_Gateway_IIOP::update_consumer_i (
|| CORBA::is_nil (this->rmt_ec_.in ()))
return;
- // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n"));
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%t) update_consumer_i \n"));
// = Connect as a supplier to the local EC
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
@@ -232,17 +234,37 @@ TAO_EC_Gateway_IIOP::update_consumer_i (
sub.dependencies[i].rt_info = this->rmt_info_;
RtecEventChannelAdmin::ProxyPushConsumer_ptr proxy = 0;
- RtecEventComm::EventSourceID sid =
- sub.dependencies[i].event.header.source;
- if (sid != 0
- && this->consumer_proxy_map_.find (sid, proxy) == -1)
+ const RtecEventComm::EventHeader &h =
+ sub.dependencies[i].event.header;
+
+ RtecEventComm::EventSourceID sid = h.source;
+
+ // ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t) trying (%d,%d)\n",
+ // sid, h.type));
+
+ // Skip all subscriptions that do not require an specific source
+ // id.
+ if (sid == 0)
+ continue;
+
+ // Skip all the magic event types.
+ if (0 < h.type && h.type < ACE_ES_EVENT_UNDEFINED)
+ continue;
+
+ if (this->consumer_proxy_map_.find (sid, proxy) == -1)
{
+ // ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t) binding source %d\n",
+ // sid));
proxy = supplier_admin->obtain_push_consumer (ACE_TRY_ENV);
ACE_CHECK;
this->consumer_proxy_map_.bind (sid, proxy);
}
}
-
+ // ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t) consumer map computed (%d entries)\n",
+ // this->consumer_proxy_map_.current_size ()));
if (this->consumer_proxy_map_.current_size () > 0)
{
@@ -261,9 +283,16 @@ TAO_EC_Gateway_IIOP::update_consumer_i (
++j)
{
RtecEventChannelAdmin::SupplierQOS pub;
- pub.publications.length (sub.dependencies.length ());
+ pub.publications.length (sub.dependencies.length () + 1);
pub.is_gateway = 1;
- int c = 0;
+
+ pub.publications[0].event.header.type =
+ ACE_ES_DISJUNCTION_DESIGNATOR;
+ pub.publications[0].dependency_info.dependency_type =
+ RtecScheduler::TWO_WAY_CALL;
+ pub.publications[0].dependency_info.number_of_calls = 1;
+ pub.publications[0].dependency_info.rt_info = this->lcl_info_;
+ int c = 1;
RtecEventComm::EventSourceID sid = (*j).ext_id_;
for (CORBA::ULong k = 0; k < sub.dependencies.length (); ++k)
@@ -271,7 +300,7 @@ TAO_EC_Gateway_IIOP::update_consumer_i (
const RtecEventComm::EventHeader& h =
sub.dependencies[k].event.header;
if (h.source != sid
- || (1 <= h.type
+ || (0 < h.type
&& h.type < ACE_ES_EVENT_UNDEFINED))
continue;
pub.publications[c].event.header = h;
@@ -281,9 +310,16 @@ TAO_EC_Gateway_IIOP::update_consumer_i (
pub.publications[c].dependency_info.rt_info = this->lcl_info_;
c++;
}
- if (c == 0)
+ // ACE_DEBUG ((LM_DEBUG,
+ // "ECG (%t) supplier id %d has %d elements\n",
+ // sid, c));
+ if (c == 1)
continue;
+
+ // The prefix filter builder needs to know the number of
+ // elements in the disjunction
+ pub.publications[0].event.header.source = c - 1;
pub.publications.length (c);
(*j).int_id_->connect_push_supplier (supplier_ref.in (),
pub,