summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-05-22 22:53:02 +0000
committercoryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-05-22 22:53:02 +0000
commit07f48b6c6f3fea082660939227837b832a2e4394 (patch)
tree800544c6d26be1590e3d848ab2e01e659ffbd195
parentdc15d6f74631ce74e423e1a0925e48db83c95916 (diff)
downloadATCD-07f48b6c6f3fea082660939227837b832a2e4394.tar.gz
ChangeLogTag:Sat May 22 17:50:12 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
-rw-r--r--TAO/ChangeLog-99c61
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h6
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i24
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp8
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h14
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i16
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp250
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h23
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp83
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp26
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h5
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp83
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h14
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i11
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp12
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp11
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h5
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp12
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h2
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Observer.cpp56
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/Observer.h4
-rw-r--r--TAO/orbsvcs/tests/Event/Basic/svc.conf2
-rw-r--r--TAO/orbsvcs/tests/Event/lib/Driver.cpp30
-rw-r--r--TAO/orbsvcs/tests/Event/lib/Driver.h6
25 files changed, 569 insertions, 197 deletions
diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c
index 217d00474b7..520982c98df 100644
--- a/TAO/ChangeLog-99c
+++ b/TAO/ChangeLog-99c
@@ -1,3 +1,64 @@
+Sat May 22 17:50:12 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
+
+ * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h:
+ * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i:
+ * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp:
+ * orbsvcs/orbsvcs/Event/EC_Event_Channel.h:
+ * orbsvcs/orbsvcs/Event/EC_Event_Channel.i:
+ * orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp:
+ * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h:
+ * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp:
+ The EC attributes include parameters that control the level of
+ concurrency in the proxy supplier sets.
+
+ * orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h:
+ * orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h:
+ * orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp:
+ * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h:
+ * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp:
+ Added reference counting: several threads could be pushing on
+ one consumer proxy and another thread decides to disconnect from
+ it. The corresponging supplier set must be deleted only once
+ all the threads finish using it.
+ In many cases the implementation is trivial because there is a
+ clear owner (ex: the ConsumerAdmin), and in others the locking
+ is shared with the consumer proxy (Per_Supplier).
+
+ * orbsvcs/orbsvcs/Event/EC_Gateway.h:
+ * orbsvcs/orbsvcs/Event/EC_Gateway.cpp:
+ The gateway was not properly synchronized. The locking in the
+ push() method is interesting: we don't hold the lock during the
+ complete dispatch, instead we keep track of the number of
+ threads pushing to the gateway.
+ If we receive an update_consumer() call during a push we post
+ the change until all the push() call finish. Only the last
+ update_consumer() is kept because they overwrite each other.
+
+ * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h:
+ * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i:
+ * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp:
+ We needed an unprotected accessor to the publications, to avoid
+ dead-locks during connect_push_consumer call.
+ Use the new reference counting on the EC_SupplierFiltering
+ class.
+
+ * orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp:
+ We were holding locks during upcalls to the gateways and not
+ setting the QoS parameters correctly.
+ The update_consumer/update_supplier messages were backwards.
+
+ * orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp:
+ Fixed several race conditions related to connect/disconnect
+ calls during pushes.
+
+ * orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp:
+ Removed debug messages.
+
+ * orbsvcs/tests/Event/Basic/Observer.h:
+ * orbsvcs/tests/Event/Basic/Observer.cpp:
+ * orbsvcs/tests/Event/Basic/svc.conf:
+ Fixed minor problems with the test itself.
+
Sat May 22 17:10:11 1999 Chris Gill <cdgill@cs.wustl.edu>
* TAO version 0.3.23 released.
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp
index 9df4ba2ea54..23b54f7bf11 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp
@@ -21,6 +21,8 @@ TAO_EC_ConsumerAdmin::TAO_EC_ConsumerAdmin (TAO_EC_Event_Channel *ec,
{
this->supplier_set_ =
this->event_channel_->create_proxy_push_supplier_set ();
+ this->supplier_set_->busy_hwm (this->event_channel_->busy_hwm ());
+ this->supplier_set_->max_write_delay (this->event_channel_->max_write_delay ());
}
this->default_POA_ =
this->event_channel_->consumer_poa ();
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h
index 52f070b9995..4dee0091898 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h
@@ -77,12 +77,6 @@ public:
typedef TAO_EC_ProxyPushSupplier_Set::Busy_Lock Busy_Lock;
Busy_Lock& busy_lock (void);
- void busy_hwm (CORBA::ULong hwm);
- CORBA::ULong busy_hwm (void) const;
- void max_write_delay (CORBA::ULong hwm);
- CORBA::ULong max_write_delay (void) const;
- // Delegate on the EC_ProxyPushSupplier....
-
virtual void connected (TAO_EC_ProxyPushConsumer*,
CORBA::Environment&);
virtual void disconnected (TAO_EC_ProxyPushConsumer*,
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i
index 9d316a56539..ee065379a5e 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i
@@ -17,27 +17,3 @@ TAO_EC_ConsumerAdmin::busy_lock (void)
{
return this->supplier_set_->busy_lock ();
}
-
-ACE_INLINE void
-TAO_EC_ConsumerAdmin::busy_hwm (CORBA::ULong hwm)
-{
- this->supplier_set_->busy_hwm (hwm);
-}
-
-ACE_INLINE CORBA::ULong
-TAO_EC_ConsumerAdmin::busy_hwm (void) const
-{
- return this->supplier_set_->busy_hwm ();
-}
-
-ACE_INLINE void
-TAO_EC_ConsumerAdmin::max_write_delay (CORBA::ULong hwm)
-{
- this->supplier_set_->max_write_delay (hwm);
-}
-
-ACE_INLINE CORBA::ULong
-TAO_EC_ConsumerAdmin::max_write_delay (void) const
-{
- return this->supplier_set_->max_write_delay ();
-}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp
index 55a36426f62..72694d63538 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp
@@ -24,7 +24,9 @@ TAO_EC_Event_Channel (const TAO_EC_Event_Channel_Attributes& attr,
factory_ (factory),
own_factory_ (own_factory),
consumer_reconnect_ (attr.consumer_reconnect),
- supplier_reconnect_ (attr.supplier_reconnect)
+ supplier_reconnect_ (attr.supplier_reconnect),
+ busy_hwm_ (attr.busy_hwm),
+ max_write_delay_ (attr.max_write_delay)
{
if (this->factory_ == 0)
{
@@ -60,10 +62,6 @@ TAO_EC_Event_Channel (const TAO_EC_Event_Channel_Attributes& attr,
this->scheduling_strategy_ =
this->factory_->create_scheduling_strategy (this);
-
- this->consumer_admin_->busy_hwm (attr.consumer_admin_busy_hwm);
- this->consumer_admin_->max_write_delay (attr.consumer_admin_max_write_delay);
-
}
TAO_EC_Event_Channel::~TAO_EC_Event_Channel (void)
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h
index fd213d32c8b..264417b61d3 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h
@@ -66,8 +66,8 @@ public:
int supplier_reconnect;
// Can consumers or suppliers invoke connect_push_* multiple times?
- int consumer_admin_busy_hwm;
- int consumer_admin_max_write_delay;
+ int busy_hwm;
+ int max_write_delay;
// Flags for the Consumer Admin
RtecScheduler::Scheduler_ptr scheduler;
@@ -197,6 +197,11 @@ public:
RtecScheduler::Scheduler_ptr scheduler (void);
// Obtain the scheduler, the user must release
+ int busy_hwm (void) const;
+ int max_write_delay (void) const;
+ // Control the concurrency of the delayed connect/disconnect
+ // operations.
+
// = The RtecEventChannelAdmin::EventChannel methods...
virtual RtecEventChannelAdmin::ConsumerAdmin_ptr
for_consumers (CORBA::Environment& env);
@@ -268,6 +273,11 @@ private:
int consumer_reconnect_;
int supplier_reconnect_;
// Consumer/Supplier reconnection flags
+
+ int busy_hwm_;
+ int max_write_delay_;
+ // Control the level of concurrency in the supplier sets with
+ // delayed operations
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i
index 8b4cb9999b2..003de1ebbef 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i
@@ -6,8 +6,8 @@ TAO_EC_Event_Channel_Attributes (PortableServer::POA_ptr s_poa,
PortableServer::POA_ptr c_poa)
: consumer_reconnect (0),
supplier_reconnect (0),
- consumer_admin_busy_hwm (1),
- consumer_admin_max_write_delay (1),
+ busy_hwm (1),
+ max_write_delay (1),
scheduler (RtecScheduler::Scheduler::_nil ()),
supplier_poa (s_poa),
consumer_poa (c_poa)
@@ -169,3 +169,15 @@ TAO_EC_Event_Channel::scheduler (void)
{
return RtecScheduler::Scheduler::_duplicate (this->scheduler_.in ());
}
+
+ACE_INLINE int
+TAO_EC_Event_Channel::busy_hwm (void) const
+{
+ return this->busy_hwm_;
+}
+
+ACE_INLINE int
+TAO_EC_Event_Channel::max_write_delay (void) const
+{
+ return this->max_write_delay_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
index 47031fd4c21..9b61bcae6a8 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp
@@ -30,8 +30,12 @@ TAO_EC_Gateway::observer_handle (void) const
// ****************************************************************
TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void)
- : consumer_ (this),
- supplier_ (this)
+ : busy_count_ (0),
+ update_posted_ (0),
+ consumer_ (this),
+ consumer_is_active_ (0),
+ supplier_ (this),
+ supplier_is_active_ (0)
{
}
@@ -48,6 +52,11 @@ TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
const char* rmt_name,
CORBA::Environment &ACE_TRY_ENV)
{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (!CORBA::is_nil (this->rmt_ec_.in ()))
+ return;
+
this->rmt_ec_ =
RtecEventChannelAdmin::EventChannel::_duplicate (rmt_ec);
this->lcl_ec_ =
@@ -103,9 +112,16 @@ TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec,
void
TAO_EC_Gateway_IIOP::close (CORBA::Environment &ACE_TRY_ENV)
{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ this->close_i (ACE_TRY_ENV);
+}
+
+
+void
+TAO_EC_Gateway_IIOP::close_i (CORBA::Environment &ACE_TRY_ENV)
+{
// ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n"));
- if (CORBA::is_nil (this->supplier_proxy_.in ()))
- return;
if (this->consumer_proxy_map_.current_size () > 0)
{
@@ -113,75 +129,105 @@ TAO_EC_Gateway_IIOP::close (CORBA::Environment &ACE_TRY_ENV)
j != this->consumer_proxy_map_.end ();
++j)
{
- (*j).int_id_->disconnect_push_consumer (ACE_TRY_ENV);
- CORBA::release ((*j).int_id_);
+ RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_;
+ if (CORBA::is_nil (consumer))
+ continue;
+ consumer->disconnect_push_consumer (ACE_TRY_ENV);
+ CORBA::release (consumer);
ACE_CHECK;
}
this->consumer_proxy_map_.close ();
}
- this->default_consumer_proxy_->disconnect_push_consumer (ACE_TRY_ENV);
- ACE_CHECK;
+ if (!CORBA::is_nil (this->default_consumer_proxy_.in ()))
+ {
+ this->default_consumer_proxy_->disconnect_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
- this->default_consumer_proxy_ =
- RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+ this->default_consumer_proxy_ =
+ RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
+ }
- {
- PortableServer::POA_var poa =
- this->supplier_._default_POA (ACE_TRY_ENV);
- ACE_CHECK;
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->supplier_, ACE_TRY_ENV);
- ACE_CHECK;
- poa->deactivate_object (id.in (), ACE_TRY_ENV);
- ACE_CHECK;
- }
+ if (this->supplier_is_active_)
+ {
+ PortableServer::POA_var poa =
+ this->supplier_._default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->supplier_, ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+ this->supplier_is_active_ = 0;
+ }
- this->supplier_proxy_->disconnect_push_supplier (ACE_TRY_ENV);
- ACE_CHECK;
+ if (!CORBA::is_nil (this->supplier_proxy_.in ()))
+ {
+ this->supplier_proxy_->disconnect_push_supplier (ACE_TRY_ENV);
+ ACE_CHECK;
- this->supplier_proxy_ =
- RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
- {
- PortableServer::POA_var poa =
- this->consumer_._default_POA (ACE_TRY_ENV);
- ACE_CHECK;
- PortableServer::ObjectId_var id =
- poa->servant_to_id (&this->consumer_, ACE_TRY_ENV);
- ACE_CHECK;
- poa->deactivate_object (id.in (), ACE_TRY_ENV);
- ACE_CHECK;
- }
+ this->supplier_proxy_ =
+ RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
+ }
+
+ if (this->consumer_is_active_)
+ {
+ PortableServer::POA_var poa =
+ this->consumer_._default_POA (ACE_TRY_ENV);
+ ACE_CHECK;
+ PortableServer::ObjectId_var id =
+ poa->servant_to_id (&this->consumer_, ACE_TRY_ENV);
+ ACE_CHECK;
+ poa->deactivate_object (id.in (), ACE_TRY_ENV);
+ ACE_CHECK;
+ this->consumer_is_active_ = 0;
+ }
}
void
-TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS& c_qos,
- CORBA::Environment& env)
+TAO_EC_Gateway_IIOP::update_consumer (
+ const RtecEventChannelAdmin::ConsumerQOS& c_qos,
+ CORBA::Environment& ACE_TRY_ENV)
{
- this->close (env);
- TAO_CHECK_ENV_RETURN_VOID (env);
-
if (c_qos.dependencies.length () <= 1)
return;
- // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n"));
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ if (this->busy_count_ != 0)
+ {
+ this->update_posted_ = 1;
+ this->c_qos_ = c_qos;
+ return;
+ }
+
+ this->update_consumer_i (c_qos, ACE_TRY_ENV);
+}
+
+void
+TAO_EC_Gateway_IIOP::update_consumer_i (
+ const RtecEventChannelAdmin::ConsumerQOS& c_qos,
+ CORBA::Environment& ACE_TRY_ENV)
+{
+ this->close_i (ACE_TRY_ENV);
+ ACE_CHECK;
+
if (CORBA::is_nil (this->lcl_ec_.in ())
|| CORBA::is_nil (this->rmt_ec_.in ()))
return;
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n"));
+
// = Connect as a supplier to the local EC
RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
- this->lcl_ec_->for_suppliers (env);
- TAO_CHECK_ENV_RETURN_VOID (env);
-
- this->default_consumer_proxy_ =
- supplier_admin->obtain_push_consumer (env);
- TAO_CHECK_ENV_RETURN_VOID (env);
+ this->lcl_ec_->for_suppliers (ACE_TRY_ENV);
+ ACE_CHECK;
// Change the RT_Info in the consumer QoS.
// On the same loop we discover the subscriptions by event source,
// and fill the consumer proxy map.
RtecEventChannelAdmin::ConsumerQOS sub = c_qos;
+ sub.is_gateway = 1;
for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i)
{
sub.dependencies[i].rt_info = this->rmt_info_;
@@ -192,19 +238,22 @@ TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS&
if (sid != 0
&& this->consumer_proxy_map_.find (sid, proxy) == -1)
{
- proxy = supplier_admin->obtain_push_consumer (env);
- TAO_CHECK_ENV_RETURN_VOID (env);
+ proxy = supplier_admin->obtain_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
this->consumer_proxy_map_.bind (sid, proxy);
}
}
- // Obtain a reference to our supplier personality...
- RtecEventComm::PushSupplier_var supplier_ref =
- this->supplier_._this (env);
- TAO_CHECK_ENV_RETURN_VOID (env);
if (this->consumer_proxy_map_.current_size () > 0)
{
+ this->supplier_is_active_ = 1;
+
+ // Obtain a reference to our supplier personality...
+ RtecEventComm::PushSupplier_var supplier_ref =
+ this->supplier_._this (ACE_TRY_ENV);
+ ACE_CHECK;
+
// For each subscription by source build the set of publications
// (they may several, by type, for instance) and connect to the
// consumer proxy.
@@ -239,8 +288,8 @@ TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS&
pub.publications.length (c);
(*j).int_id_->connect_push_supplier (supplier_ref.in (),
pub,
- env);
- TAO_CHECK_ENV_RETURN_VOID (env);
+ ACE_TRY_ENV);
+ ACE_CHECK;
}
}
@@ -267,44 +316,59 @@ TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS&
pub.publications[c].dependency_info.rt_info = this->lcl_info_;
c++;
}
+
if (c > 0)
{
+ this->supplier_is_active_ = 1;
+
+ // Obtain a reference to our supplier personality...
+ RtecEventComm::PushSupplier_var supplier_ref =
+ this->supplier_._this (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ // Obtain the consumer....
+ this->default_consumer_proxy_ =
+ supplier_admin->obtain_push_consumer (ACE_TRY_ENV);
+ ACE_CHECK;
+
pub.publications.length (c);
this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (),
pub,
- env);
- TAO_CHECK_ENV_RETURN_VOID (env);
+ ACE_TRY_ENV);
+ ACE_CHECK;
}
- //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
- //ACE_SupplierQOS_Factory::debug (pub);
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier "));
+ // ACE_SupplierQOS_Factory::debug (pub);
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
- this->rmt_ec_->for_consumers (env);
- TAO_CHECK_ENV_RETURN_VOID (env);
+ this->rmt_ec_->for_consumers (ACE_TRY_ENV);
+ ACE_CHECK;
this->supplier_proxy_ =
- consumer_admin->obtain_push_supplier (env);
- TAO_CHECK_ENV_RETURN_VOID (env);
+ consumer_admin->obtain_push_supplier (ACE_TRY_ENV);
+ ACE_CHECK;
+ this->consumer_is_active_ = 1;
RtecEventComm::PushConsumer_var consumer_ref =
- this->consumer_._this (env);
- TAO_CHECK_ENV_RETURN_VOID (env);
+ this->consumer_._this (ACE_TRY_ENV);
+ ACE_CHECK;
- //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Consumer "));
- //ACE_ConsumerQOS_Factory::debug (sub);
+ // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Consumer "));
+ // ACE_ConsumerQOS_Factory::debug (sub);
this->supplier_proxy_->connect_push_consumer (consumer_ref.in (),
sub,
- env);
- TAO_CHECK_ENV_RETURN_VOID (env);
+ ACE_TRY_ENV);
+ ACE_CHECK;
}
void
-TAO_EC_Gateway_IIOP::update_supplier (const RtecEventChannelAdmin::SupplierQOS&,
- CORBA::Environment&)
+TAO_EC_Gateway_IIOP::update_supplier (
+ const RtecEventChannelAdmin::SupplierQOS&,
+ CORBA::Environment&)
{
// Do nothing...
}
@@ -327,7 +391,7 @@ TAO_EC_Gateway_IIOP::disconnect_push_supplier (CORBA::Environment &)
void
TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events,
- CORBA::Environment & env)
+ CORBA::Environment & ACE_TRY_ENV)
{
//ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push - "));
@@ -337,7 +401,13 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events,
return;
}
- //ACE_DEBUG ((LM_DEBUG, "ECP: %d event(s) - ", events.length ()));
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ this->busy_count_++;
+ }
+
+ // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ()));
// @@ TODO, there is an extra data copy here, we should do the event
// modification without it and only compact the necessary events.
@@ -357,21 +427,43 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events,
// default consumer proxy.
proxy = this->default_consumer_proxy_.in ();
}
+
+ if (CORBA::is_nil (proxy))
+ continue;
+
out[0] = events[i];
out[0].header.ttl--;
- proxy->push (out, env);
- TAO_CHECK_ENV_RETURN_VOID (env);
+
+ proxy->push (out, ACE_TRY_ENV);
+ ACE_CHECK;
}
+
+ {
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
+ this->busy_count_--;
+
+ if (this->busy_count_ == 0 && this->update_posted_ != 0)
+ {
+ this->update_posted_ = 0;
+ this->update_consumer_i (this->c_qos_, ACE_TRY_ENV);
+ }
+ }
+
}
int
-TAO_EC_Gateway_IIOP::shutdown (CORBA::Environment& TAO_IN_ENV)
+TAO_EC_Gateway_IIOP::shutdown (CORBA::Environment& ACE_TRY_ENV)
{
- this->close (TAO_IN_ENV);
- if (TAO_IN_ENV.exception () == 0) return -1;
+ ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1);
+
+ this->close_i (ACE_TRY_ENV);
+ ACE_CHECK_RETURN (-1);
- this->lcl_ec_ = 0;
- this->rmt_ec_ = 0;
+ this->lcl_ec_ =
+ RtecEventChannelAdmin::EventChannel::_nil ();
+ this->rmt_ec_ =
+ RtecEventChannelAdmin::EventChannel::_nil ();
return 0;
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
index 9d50dd0e8ae..50835bec895 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h
@@ -143,6 +143,25 @@ public:
CORBA::Environment& env);
private:
+ void close_i (CORBA::Environment& );
+
+ void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub,
+ CORBA::Environment& env);
+
+private:
+ ACE_SYNCH_MUTEX lock_;
+ // Lock to synchronize internal changes
+
+ CORBA::ULong busy_count_;
+ // How many threads are running push() we cannot make changes until
+ // that reaches 0
+
+ int update_posted_;
+ RtecEventChannelAdmin::ConsumerQOS c_qos_;
+ // An update_consumer() message arrived *while* we were doing a
+ // push() the modification is stored <pub_>, if multiple
+ // update_consumer messages arrive only the last one is executed.
+
RtecEventChannelAdmin::EventChannel_var rmt_ec_;
RtecEventChannelAdmin::EventChannel_var lcl_ec_;
// The remote and the local EC, so we can reconnect when the list changes.
@@ -153,9 +172,13 @@ private:
ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_;
// Our consumer personality....
+ int consumer_is_active_;
+ // If it is not 0 then we must deactivate the supplier
ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_;
// Our supplier personality....
+ int supplier_is_active_;
+ // If it is not 0 then we must deactivate the supplier
// We use a different Consumer_Proxy
typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map;
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
index 994683d9279..ac02d5b112f 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp
@@ -86,18 +86,21 @@ TAO_EC_Basic_ObserverStrategy::append_observer (
RtecEventChannel::EventChannel::SYNCHRONIZATION_ERROR,
RtecEventChannel::EventChannel::CANT_APPEND_OBSERVER))
{
- ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
- RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
- ACE_CHECK_RETURN (0);
+ {
+ ACE_GUARD_THROW_EX (
+ ACE_Lock, ace_mon, *this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR());
+ ACE_CHECK_RETURN (0);
- this->handle_generator_++;
- Observer_Entry entry (this->handle_generator_,
- RtecEventChannelAdmin::Observer::_duplicate (obs));
+ this->handle_generator_++;
+ Observer_Entry entry (this->handle_generator_,
+ RtecEventChannelAdmin::Observer::_duplicate (obs));
- if (this->observers_.bind (entry.handle, entry) == -1)
- ACE_THROW_RETURN (
- RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(),
- 0);
+ if (this->observers_.bind (entry.handle, entry) == -1)
+ ACE_THROW_RETURN (
+ RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(),
+ 0);
+ }
RtecEventChannelAdmin::ConsumerQOS c_qos;
this->fill_qos (c_qos, ACE_TRY_ENV);
@@ -133,11 +136,15 @@ TAO_EC_Basic_ObserverStrategy::remove_observer (
}
void
-TAO_EC_Basic_ObserverStrategy::connected (TAO_EC_ProxyPushConsumer*,
- CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_Basic_ObserverStrategy::connected (
+ TAO_EC_ProxyPushConsumer *consumer,
+ CORBA::Environment &ACE_TRY_ENV)
{
- RtecEventChannelAdmin::ConsumerQOS c_qos;
- this->fill_qos (c_qos, ACE_TRY_ENV);
+ if (consumer->publications ().is_gateway)
+ return;
+
+ RtecEventChannelAdmin::SupplierQOS s_qos;
+ this->fill_qos (s_qos, ACE_TRY_ENV);
ACE_CHECK;
int size = 0;
@@ -165,17 +172,21 @@ TAO_EC_Basic_ObserverStrategy::connected (TAO_EC_ProxyPushConsumer*,
for (int i = 0; i != size; ++i)
{
- copy[i]->update_consumer (c_qos, ACE_TRY_ENV);
+ copy[i]->update_supplier (s_qos, ACE_TRY_ENV);
ACE_CHECK;
}
}
void
-TAO_EC_Basic_ObserverStrategy::disconnected (TAO_EC_ProxyPushConsumer*,
- CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_Basic_ObserverStrategy::disconnected (
+ TAO_EC_ProxyPushConsumer* consumer,
+ CORBA::Environment &ACE_TRY_ENV)
{
- RtecEventChannelAdmin::ConsumerQOS c_qos;
- this->fill_qos (c_qos, ACE_TRY_ENV);
+ if (consumer->publications ().is_gateway)
+ return;
+
+ RtecEventChannelAdmin::SupplierQOS s_qos;
+ this->fill_qos (s_qos, ACE_TRY_ENV);
ACE_CHECK;
int size = 0;
@@ -203,17 +214,21 @@ TAO_EC_Basic_ObserverStrategy::disconnected (TAO_EC_ProxyPushConsumer*,
for (int i = 0; i != size; ++i)
{
- copy[i]->update_consumer (c_qos, ACE_TRY_ENV);
+ copy[i]->update_supplier (s_qos, ACE_TRY_ENV);
ACE_CHECK;
}
}
void
-TAO_EC_Basic_ObserverStrategy::connected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_Basic_ObserverStrategy::connected (
+ TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &ACE_TRY_ENV)
{
- RtecEventChannelAdmin::SupplierQOS s_qos;
- this->fill_qos (s_qos, ACE_TRY_ENV);
+ if (supplier->subscriptions ().is_gateway)
+ return;
+
+ RtecEventChannelAdmin::ConsumerQOS c_qos;
+ this->fill_qos (c_qos, ACE_TRY_ENV);
ACE_CHECK;
int size = 0;
@@ -241,17 +256,21 @@ TAO_EC_Basic_ObserverStrategy::connected (TAO_EC_ProxyPushSupplier*,
for (int i = 0; i != size; ++i)
{
- copy[i]->update_supplier (s_qos, ACE_TRY_ENV);
+ copy[i]->update_consumer (c_qos, ACE_TRY_ENV);
ACE_CHECK;
}
}
void
-TAO_EC_Basic_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*,
- CORBA::Environment &ACE_TRY_ENV)
+TAO_EC_Basic_ObserverStrategy::disconnected (
+ TAO_EC_ProxyPushSupplier* supplier,
+ CORBA::Environment &ACE_TRY_ENV)
{
- RtecEventChannelAdmin::SupplierQOS s_qos;
- this->fill_qos (s_qos, ACE_TRY_ENV);
+ if (supplier->subscriptions ().is_gateway)
+ return;
+
+ RtecEventChannelAdmin::ConsumerQOS c_qos;
+ this->fill_qos (c_qos, ACE_TRY_ENV);
ACE_CHECK;
int size = 0;
@@ -279,7 +298,7 @@ TAO_EC_Basic_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*,
for (int i = 0; i != size; ++i)
{
- copy[i]->update_supplier (s_qos, ACE_TRY_ENV);
+ copy[i]->update_consumer (c_qos, ACE_TRY_ENV);
ACE_CHECK;
}
}
@@ -316,7 +335,7 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
sub.dependencies[j].event;
RtecEventComm::EventType type = event.header.type;
- if (0 <= type && type <= ACE_ES_EVENT_UNDEFINED)
+ if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
continue;
headers.insert (event.header, 1);
}
@@ -371,7 +390,7 @@ TAO_EC_Basic_ObserverStrategy::fill_qos (
pub.publications[j].event;
RtecEventComm::EventType type = event.header.type;
- if (0 <= type && type <= ACE_ES_EVENT_UNDEFINED)
+ if (0 < type && type < ACE_ES_EVENT_UNDEFINED)
continue;
headers.insert (event.header, 1);
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp
index d7c8740deff..9aa562b6060 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp
@@ -17,10 +17,13 @@ ACE_RCSID(Event, EC_Per_Supplier_Filter, "$Id$")
TAO_EC_Per_Supplier_Filter::
TAO_EC_Per_Supplier_Filter (TAO_EC_Event_Channel* ec)
- : event_channel_ (ec)
+ : event_channel_ (ec),
+ refcnt_ (1)
{
this->supplier_set_ =
this->event_channel_->create_proxy_push_supplier_set ();
+ this->supplier_set_->busy_hwm (this->event_channel_->busy_hwm ());
+ this->supplier_set_->max_write_delay (this->event_channel_->max_write_delay ());
}
TAO_EC_Per_Supplier_Filter::~TAO_EC_Per_Supplier_Filter (void)
@@ -52,7 +55,7 @@ TAO_EC_Per_Supplier_Filter::connected (TAO_EC_ProxyPushSupplier* supplier,
return;
const RtecEventChannelAdmin::SupplierQOS& pub =
- this->consumer_->publications ();
+ this->consumer_->publications_i ();
for (CORBA::ULong j = 0; j < pub.publications.length (); ++j)
{
@@ -125,6 +128,25 @@ TAO_EC_Per_Supplier_Filter::push (const RtecEventComm::EventSet& event,
}
}
+CORBA::ULong
+TAO_EC_Per_Supplier_Filter::_incr_refcnt (void)
+{
+ this->refcnt_++;
+ return this->refcnt_;
+}
+
+CORBA::ULong
+TAO_EC_Per_Supplier_Filter::_decr_refcnt (void)
+{
+ this->refcnt_--;
+ if (this->refcnt_ == 0)
+ {
+ this->event_channel_->supplier_filter_builder ()->destroy (this);
+ return 0;
+ }
+ return this->refcnt_;
+}
+
// ****************************************************************
TAO_EC_SupplierFiltering*
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h
index 9880d6d8a44..33201e670c1 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h
@@ -64,6 +64,8 @@ public:
virtual void shutdown (CORBA::Environment &env);
virtual void push (const RtecEventComm::EventSet& event,
CORBA::Environment &);
+ virtual CORBA::ULong _decr_refcnt (void);
+ virtual CORBA::ULong _incr_refcnt (void);
private:
TAO_EC_Event_Channel *event_channel_;
@@ -75,6 +77,9 @@ private:
TAO_EC_ProxyPushSupplier_Set* supplier_set_;
// Keep the list of proxies for the consumers that may be interested
// in our events.
+
+ CORBA::ULong refcnt_;
+ // Reference counting
};
// ****************************************************************
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp
index 559434320de..c1512e55bcb 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp
@@ -35,16 +35,58 @@ void
TAO_EC_ProxyPushConsumer::connected (TAO_EC_ProxyPushSupplier* supplier,
CORBA::Environment &ACE_TRY_ENV)
{
- if (this->is_connected ())
- this->filter_->connected (supplier, ACE_TRY_ENV);
+ TAO_EC_SupplierFiltering* filter = 0;
+ {
+ ACE_GUARD_THROW_EX (
+ ACE_Lock, ace_mon, *this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+ ACE_CHECK;
+
+ if (this->is_connected_i () == 0)
+ return;
+
+ filter = this->filter_;
+ filter->_incr_refcnt ();
+ }
+
+ filter->connected (supplier, ACE_TRY_ENV);
+
+ {
+ ACE_GUARD_THROW_EX (
+ ACE_Lock, ace_mon, *this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+ ACE_CHECK;
+ filter->_decr_refcnt ();
+ }
}
void
TAO_EC_ProxyPushConsumer::disconnected (TAO_EC_ProxyPushSupplier* supplier,
CORBA::Environment &ACE_TRY_ENV)
{
- if (this->is_connected ())
- this->filter_->disconnected (supplier, ACE_TRY_ENV);
+ TAO_EC_SupplierFiltering* filter = 0;
+ {
+ ACE_GUARD_THROW_EX (
+ ACE_Lock, ace_mon, *this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+ ACE_CHECK;
+
+ if (this->is_connected_i () == 0)
+ return;
+
+ filter = this->filter_;
+ filter->_incr_refcnt ();
+ }
+
+ filter->disconnected (supplier, ACE_TRY_ENV);
+
+ {
+ ACE_GUARD_THROW_EX (
+ ACE_Lock, ace_mon, *this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+ ACE_CHECK;
+ filter->_decr_refcnt ();
+ }
}
void
@@ -94,7 +136,7 @@ TAO_EC_ProxyPushConsumer::cleanup_i (void)
RtecEventComm::PushSupplier::_nil ();
this->filter_->unbind (this);
- this->event_channel_->supplier_filter_builder ()->destroy (this->filter_);
+ this->filter_->_decr_refcnt ();
this->filter_ = 0;
}
@@ -189,12 +231,37 @@ void
TAO_EC_ProxyPushConsumer::push (const RtecEventComm::EventSet& event,
CORBA::Environment &ACE_TRY_ENV)
{
- if (this->is_connected () == 0)
- return; // @@ THROW something???
+ TAO_EC_SupplierFiltering* filter = 0;
+ {
+ ACE_GUARD_THROW_EX (
+ ACE_Lock, ace_mon, *this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+ ACE_CHECK;
+
+ if (this->is_connected_i () == 0)
+ return; // @@ THROW something???
+
+ filter = this->filter_;
+ filter->_incr_refcnt ();
+
+ this->refcount_++;
+ }
// No need to keep the lock, the filter_ class is supposed to be
// thread safe....
- this->filter_->push (event, ACE_TRY_ENV);
+ filter->push (event, ACE_TRY_ENV);
+
+ {
+ ACE_GUARD_THROW_EX (
+ ACE_Lock, ace_mon, *this->lock_,
+ RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
+ ACE_CHECK;
+ filter->_decr_refcnt ();
+ this->refcount_--;
+ if (this->refcount_ != 0)
+ return;
+ }
+ this->event_channel_->destroy_proxy_push_consumer (this);
}
void
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h
index 528df12f129..3cc9cd8b303 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h
@@ -77,29 +77,33 @@ public:
// The QoS (subscription) used to connect to the EC.
virtual void connected (TAO_EC_ProxyPushSupplier* supplier,
- CORBA::Environment &env);
+ CORBA::Environment &env);
virtual void disconnected (TAO_EC_ProxyPushSupplier* supplier,
- CORBA::Environment &env);
+ CORBA::Environment &env);
// Concrete implementations can use this methods to keep track of
// the consumers interested in this events.
virtual void connected (TAO_EC_ProxyPushConsumer* consumer,
- CORBA::Environment &env);
+ CORBA::Environment &env);
virtual void disconnected (TAO_EC_ProxyPushConsumer* consumer,
- CORBA::Environment &env);
+ CORBA::Environment &env);
// Usually implemented as no-ops, but some configurations may
// require this methods.
virtual void shutdown (CORBA::Environment&);
// The event channel is shutting down
+ const RtecEventChannelAdmin::SupplierQOS& publications_i (void) const;
+ // The QoS (subscription) used to connect to the EC, assumes the
+ // locks are held, use with care!
+
CORBA::ULong _incr_refcnt (void);
CORBA::ULong _decr_refcnt (void);
// Increment and decrement the reference count.
// = The RtecEventChannelAdmin::ProxyPushConsumer methods...
virtual void connect_push_supplier (
- RtecEventComm::PushSupplier_ptr push_supplier,
+ RtecEventComm::PushSupplier_ptr push_supplier,
const RtecEventChannelAdmin::SupplierQOS& qos,
CORBA::Environment &);
virtual void push (const RtecEventComm::EventSet& event,
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i
index 0ca9186a78d..e5abd1f309f 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i
@@ -24,9 +24,14 @@ TAO_EC_ProxyPushConsumer::supplier (void) const
ACE_INLINE const RtecEventChannelAdmin::SupplierQOS&
TAO_EC_ProxyPushConsumer::publications (void) const
{
- // @@ TODO There should be a better way to signal errors here.
- static RtecEventChannelAdmin::SupplierQOS empty_qos;
- ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, empty_qos);
+ // @@ TODO There should some way to signal errors here.
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, this->qos_);
return this->qos_;
}
+
+ACE_INLINE const RtecEventChannelAdmin::SupplierQOS&
+TAO_EC_ProxyPushConsumer::publications_i (void) const
+{
+ return this->qos_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp
index 94631a3f23f..0b832320644 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp
@@ -111,11 +111,6 @@ TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::connected (
ACE_NEW (command,
TAO_EC_ProxyPushSupplier_Set::Connected_Command (this,
supplier));
-
- ACE_DEBUG ((LM_DEBUG,
- "EC (%P|%t) Delayed connection command = %x\n",
- command));
-
this->command_queue_.enqueue_tail (command);
this->write_delay_++;
}
@@ -141,9 +136,6 @@ TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::disconnected (
ACE_NEW (command,
TAO_EC_ProxyPushSupplier_Set::Disconnected_Command (this,
supplier));
- ACE_DEBUG ((LM_DEBUG,
- "EC (%P|%t) Delayed disconnection command = %x\n",
- command));
this->command_queue_.enqueue_tail (command);
this->write_delay_++;
@@ -188,10 +180,6 @@ TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::execute_delayed_operations
command->execute ();
- ACE_DEBUG ((LM_DEBUG,
- "EC (%P|%t) Executed delayed command = %x\n",
- command));
-
delete command;
}
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
index 090bb115750..bbc4163bf0c 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp
@@ -218,7 +218,7 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier (
this->deactivate (ACE_TRY_ENV);
ACE_CHECK;
- // Notify the event channel...
+ // Notify the event channel....
this->event_channel_->disconnected (this, ACE_TRY_ENV);
this->_decr_refcnt ();
@@ -259,6 +259,9 @@ TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK_RETURN (0);
+ if (this->is_connected_i () == 0)
+ return 0;
+
result =
this->child_->filter (event, qos_info, ACE_TRY_ENV);
if (this->refcount_ > 0)
@@ -282,6 +285,9 @@ TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event,
RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ());
ACE_CHECK_RETURN (0);
+ if (this->is_connected_i () == 0)
+ return 0;
+
result =
this->child_->filter_nocopy (event, qos_info, ACE_TRY_ENV);
if (this->refcount_ > 0)
@@ -434,6 +440,9 @@ TAO_EC_ProxyPushSupplier::can_match (
{
ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0);
+ if (this->is_connected_i () == 0)
+ return 0;
+
return this->child_->can_match (header);
}
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h b/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h
index 0e307aa59d5..e9167f64c5d 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h
@@ -109,6 +109,11 @@ public:
CORBA::Environment &) = 0;
// The ProxyPushConsumer delegates on this class to actually send
// the event.
+
+ virtual CORBA::ULong _incr_refcnt (void) = 0;
+ virtual CORBA::ULong _decr_refcnt (void) = 0;
+ // Increment and decrement the reference count, locking must be
+ // provided by the user.
};
#if defined (__ACE_INLINE__)
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp
index e6a03a59a66..98a28ba0cd9 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp
@@ -83,6 +83,18 @@ TAO_EC_Trivial_Supplier_Filter::push (const RtecEventComm::EventSet& event,
}
}
+CORBA::ULong
+TAO_EC_Trivial_Supplier_Filter::_incr_refcnt (void)
+{
+ return 1;
+}
+
+CORBA::ULong
+TAO_EC_Trivial_Supplier_Filter::_decr_refcnt (void)
+{
+ return 1;
+}
+
// ****************************************************************
TAO_EC_Trivial_Supplier_Filter_Builder::
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h
index ac3d293bd68..26a6cd8cb1f 100644
--- a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h
+++ b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h
@@ -61,6 +61,8 @@ public:
virtual void shutdown (CORBA::Environment &env);
virtual void push (const RtecEventComm::EventSet& event,
CORBA::Environment &);
+ virtual CORBA::ULong _decr_refcnt (void);
+ virtual CORBA::ULong _incr_refcnt (void);
private:
TAO_EC_Event_Channel *event_channel_;
diff --git a/TAO/orbsvcs/tests/Event/Basic/Observer.cpp b/TAO/orbsvcs/tests/Event/Basic/Observer.cpp
index f4d43214a54..3cdd72a624d 100644
--- a/TAO/orbsvcs/tests/Event/Basic/Observer.cpp
+++ b/TAO/orbsvcs/tests/Event/Basic/Observer.cpp
@@ -112,7 +112,15 @@ EC_Master::run (int argc, char* argv[])
}
}
- }
+ {
+ for (int i = 0; i != this->n_channels_; ++i)
+ {
+ this->channels_[i]->run_cleanup (ACE_TRY_ENV);
+ ACE_TRY_CHECK;
+ }
+ }
+
+ }
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "EC_Driver::run");
@@ -173,7 +181,7 @@ EC_Master::parse_args (int &argc, char *argv [])
arg_shifter.consume_arg ();
this->seed_ = ACE_OS::atoi (arg_shifter.get_current ());
}
-
+
arg_shifter.ignore_arg ();
}
return 0;
@@ -244,6 +252,13 @@ EC_Observer::execute_test (CORBA::Environment& ACE_TRY_ENV)
RtecEventChannelAdmin::EventChannel_ptr rmt_ec =
this->master_->channel (i)->event_channel_;
+ this->gwys_[i].init (rmt_ec,
+ this->event_channel_.in (),
+ RtecScheduler::Scheduler::_nil (),
+ RtecScheduler::Scheduler::_nil (),
+ 0, 0,
+ ACE_TRY_ENV);
+
RtecEventChannelAdmin::Observer_var obs =
this->gwys_[i]._this (ACE_TRY_ENV);
ACE_CHECK;
@@ -251,19 +266,26 @@ EC_Observer::execute_test (CORBA::Environment& ACE_TRY_ENV)
RtecEventChannelAdmin::Observer_Handle h =
rmt_ec->append_observer (obs.in (), ACE_TRY_ENV);
ACE_CHECK;
-
+
this->gwys_[i].observer_handle (h);
- this->gwys_[i].init (rmt_ec,
- this->event_channel_.in (),
- RtecScheduler::Scheduler::_nil (),
- RtecScheduler::Scheduler::_nil (),
- 0, 0,
- ACE_TRY_ENV);
ACE_CHECK;
}
- this->EC_Driver::execute_test (ACE_TRY_ENV);
+ if (this->allocate_tasks () == -1)
+ return;
+
+ this->activate_tasks (ACE_TRY_ENV);
+ ACE_CHECK;
+
+ if (this->verbose ())
+ ACE_DEBUG ((LM_DEBUG, "EC_Observer[%d] (%P|%t) suppliers are active\n",
+ this->id_));
+}
+
+void
+EC_Observer::run_cleanup (CORBA::Environment& ACE_TRY_ENV)
+{
for (int j = 0; j != this->master_->channel_count (); ++j)
{
if (j == this->id_)
@@ -275,7 +297,7 @@ EC_Observer::execute_test (CORBA::Environment& ACE_TRY_ENV)
ACE_TRY_ENV);
ACE_CHECK;
- this->gwys_[j].close (ACE_TRY_ENV);
+ this->gwys_[j].shutdown (ACE_TRY_ENV);
ACE_CHECK;
}
}
@@ -300,7 +322,7 @@ EC_Observer::connect_consumer (
return;
}
unsigned int x = ACE_OS::rand_r (this->seed_);
- if (x < RAND_MAX / 2)
+ if (x < RAND_MAX / 8)
this->EC_Driver::connect_consumer (consumer_admin, i,
ACE_TRY_ENV);
}
@@ -311,14 +333,20 @@ EC_Observer::consumer_push (void*,
CORBA::Environment& ACE_TRY_ENV)
{
unsigned int x = ACE_OS::rand_r (this->seed_);
- if (x < RAND_MAX / 2)
+ if (x < (RAND_MAX / 64))
{
+ if (this->verbose ())
+ ACE_DEBUG ((LM_DEBUG,
+ "EC_Observer[%d] (%P|%t) reconnecting\n", this->id_));
+
RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
this->event_channel_->for_consumers (ACE_TRY_ENV);
ACE_CHECK;
- for (int i = 0; i < this->n_consumers_; ++i)
+ for (int i = 1; i < this->n_consumers_; ++i)
{
+ ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
+
if (this->consumers_[i]->connected ())
{
this->consumers_[i]->disconnect (ACE_TRY_ENV);
diff --git a/TAO/orbsvcs/tests/Event/Basic/Observer.h b/TAO/orbsvcs/tests/Event/Basic/Observer.h
index 297a4096ae5..8b4d7ce5a9f 100644
--- a/TAO/orbsvcs/tests/Event/Basic/Observer.h
+++ b/TAO/orbsvcs/tests/Event/Basic/Observer.h
@@ -94,6 +94,7 @@ public:
// add some command line args to enable/disable observerions
void execute_test (CORBA::Environment& ACE_TRY_ENV);
+ void run_cleanup (CORBA::Environment& ACE_TRY_ENV);
// Run the suppliers, using the <thread_manager> parameter
void dump_results (void);
@@ -111,6 +112,9 @@ private:
int id_;
TAO_EC_Gateway_IIOP *gwys_;
+
+ ACE_SYNCH_MUTEX lock_;
+ // lock internal state
};
#endif /* EC_OBSERVER_H */
diff --git a/TAO/orbsvcs/tests/Event/Basic/svc.conf b/TAO/orbsvcs/tests/Event/Basic/svc.conf
new file mode 100644
index 00000000000..cd1a374e3f1
--- /dev/null
+++ b/TAO/orbsvcs/tests/Event/Basic/svc.conf
@@ -0,0 +1,2 @@
+# $Id$
+static EC_Factory "-ECpushsupplierset delayed -ECdispatching reactive -ECfiltering basic -ECproxyconsumerlock thread -ECproxysupplierlock thread -ECconsumeradminlock null -ECsupplieradminlock thread -ECsupplierfiltering null"
diff --git a/TAO/orbsvcs/tests/Event/lib/Driver.cpp b/TAO/orbsvcs/tests/Event/lib/Driver.cpp
index 534a6fb503e..e86c03dd7bf 100644
--- a/TAO/orbsvcs/tests/Event/lib/Driver.cpp
+++ b/TAO/orbsvcs/tests/Event/lib/Driver.cpp
@@ -922,6 +922,30 @@ EC_Driver::parse_args (int &argc, char *argv [])
}
}
+ else if (ACE_OS::strcmp (arg, "-busyhwm") == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ this->busy_hwm_ =
+ ACE_OS::atoi (arg_shifter.get_current ());
+ arg_shifter.consume_arg ();
+ }
+ }
+
+ else if (ACE_OS::strcmp (arg, "-maxwritedelay") == 0)
+ {
+ arg_shifter.consume_arg ();
+
+ if (arg_shifter.is_parameter_next ())
+ {
+ this->max_write_delay_ =
+ ACE_OS::atoi (arg_shifter.get_current ());
+ arg_shifter.consume_arg ();
+ }
+ }
+
else
{
arg_shifter.ignore_arg ();
@@ -952,12 +976,16 @@ EC_Driver::print_usage (void)
" -supplier_tstart <type>\n"
" -supplier_tcount <count>\n"
" -supplier_tshift <shift>\n"
+ " -busy_hwm <value>\n"
+ " -max_write_delay <value>\n"
));
}
void
-EC_Driver::modify_attributes (TAO_EC_Event_Channel_Attributes&)
+EC_Driver::modify_attributes (TAO_EC_Event_Channel_Attributes& attr)
{
+ attr.busy_hwm = this->busy_hwm_;
+ attr.max_write_delay = this->max_write_delay_;
}
void
diff --git a/TAO/orbsvcs/tests/Event/lib/Driver.h b/TAO/orbsvcs/tests/Event/lib/Driver.h
index e6adbb9f822..f64ea4d7398 100644
--- a/TAO/orbsvcs/tests/Event/lib/Driver.h
+++ b/TAO/orbsvcs/tests/Event/lib/Driver.h
@@ -169,7 +169,7 @@ public:
const RtecEventComm::EventSet& event,
CORBA::Environment& ACE_TRY_ENV);
// One of the consumers in the test has received an event
-
+
virtual void consumer_shutdown (void* consumer_cookie,
CORBA::Environment& ACE_TRY_ENV);
// One of the consumers has received a shutdown event
@@ -384,6 +384,10 @@ protected:
RtecEventChannelAdmin::EventChannel_var event_channel_;
// The event channel object reference
+
+ int busy_hwm_;
+ int max_write_delay_;
+ // Control the concurrency inside the EC.
};
#if defined (__ACE_INLINE__)