diff options
author | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-22 22:53:02 +0000 |
---|---|---|
committer | coryan <coryan@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-05-22 22:53:02 +0000 |
commit | 07f48b6c6f3fea082660939227837b832a2e4394 (patch) | |
tree | 800544c6d26be1590e3d848ab2e01e659ffbd195 | |
parent | dc15d6f74631ce74e423e1a0925e48db83c95916 (diff) | |
download | ATCD-07f48b6c6f3fea082660939227837b832a2e4394.tar.gz |
ChangeLogTag:Sat May 22 17:50:12 1999 Carlos O'Ryan <coryan@cs.wustl.edu>
25 files changed, 569 insertions, 197 deletions
diff --git a/TAO/ChangeLog-99c b/TAO/ChangeLog-99c index 217d00474b7..520982c98df 100644 --- a/TAO/ChangeLog-99c +++ b/TAO/ChangeLog-99c @@ -1,3 +1,64 @@ +Sat May 22 17:50:12 1999 Carlos O'Ryan <coryan@cs.wustl.edu> + + * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h: + * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i: + * orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp: + * orbsvcs/orbsvcs/Event/EC_Event_Channel.h: + * orbsvcs/orbsvcs/Event/EC_Event_Channel.i: + * orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp: + * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h: + * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp: + The EC attributes include parameters that control the level of + concurrency in the proxy supplier sets. + + * orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h: + * orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h: + * orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp: + * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h: + * orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp: + Added reference counting: several threads could be pushing on + one consumer proxy and another thread decides to disconnect from + it. The corresponging supplier set must be deleted only once + all the threads finish using it. + In many cases the implementation is trivial because there is a + clear owner (ex: the ConsumerAdmin), and in others the locking + is shared with the consumer proxy (Per_Supplier). + + * orbsvcs/orbsvcs/Event/EC_Gateway.h: + * orbsvcs/orbsvcs/Event/EC_Gateway.cpp: + The gateway was not properly synchronized. The locking in the + push() method is interesting: we don't hold the lock during the + complete dispatch, instead we keep track of the number of + threads pushing to the gateway. + If we receive an update_consumer() call during a push we post + the change until all the push() call finish. Only the last + update_consumer() is kept because they overwrite each other. + + * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h: + * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i: + * orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp: + We needed an unprotected accessor to the publications, to avoid + dead-locks during connect_push_consumer call. + Use the new reference counting on the EC_SupplierFiltering + class. + + * orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp: + We were holding locks during upcalls to the gateways and not + setting the QoS parameters correctly. + The update_consumer/update_supplier messages were backwards. + + * orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp: + Fixed several race conditions related to connect/disconnect + calls during pushes. + + * orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp: + Removed debug messages. + + * orbsvcs/tests/Event/Basic/Observer.h: + * orbsvcs/tests/Event/Basic/Observer.cpp: + * orbsvcs/tests/Event/Basic/svc.conf: + Fixed minor problems with the test itself. + Sat May 22 17:10:11 1999 Chris Gill <cdgill@cs.wustl.edu> * TAO version 0.3.23 released. diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp index 9df4ba2ea54..23b54f7bf11 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.cpp @@ -21,6 +21,8 @@ TAO_EC_ConsumerAdmin::TAO_EC_ConsumerAdmin (TAO_EC_Event_Channel *ec, { this->supplier_set_ = this->event_channel_->create_proxy_push_supplier_set (); + this->supplier_set_->busy_hwm (this->event_channel_->busy_hwm ()); + this->supplier_set_->max_write_delay (this->event_channel_->max_write_delay ()); } this->default_POA_ = this->event_channel_->consumer_poa (); diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h index 52f070b9995..4dee0091898 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.h @@ -77,12 +77,6 @@ public: typedef TAO_EC_ProxyPushSupplier_Set::Busy_Lock Busy_Lock; Busy_Lock& busy_lock (void); - void busy_hwm (CORBA::ULong hwm); - CORBA::ULong busy_hwm (void) const; - void max_write_delay (CORBA::ULong hwm); - CORBA::ULong max_write_delay (void) const; - // Delegate on the EC_ProxyPushSupplier.... - virtual void connected (TAO_EC_ProxyPushConsumer*, CORBA::Environment&); virtual void disconnected (TAO_EC_ProxyPushConsumer*, diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i index 9d316a56539..ee065379a5e 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ConsumerAdmin.i @@ -17,27 +17,3 @@ TAO_EC_ConsumerAdmin::busy_lock (void) { return this->supplier_set_->busy_lock (); } - -ACE_INLINE void -TAO_EC_ConsumerAdmin::busy_hwm (CORBA::ULong hwm) -{ - this->supplier_set_->busy_hwm (hwm); -} - -ACE_INLINE CORBA::ULong -TAO_EC_ConsumerAdmin::busy_hwm (void) const -{ - return this->supplier_set_->busy_hwm (); -} - -ACE_INLINE void -TAO_EC_ConsumerAdmin::max_write_delay (CORBA::ULong hwm) -{ - this->supplier_set_->max_write_delay (hwm); -} - -ACE_INLINE CORBA::ULong -TAO_EC_ConsumerAdmin::max_write_delay (void) const -{ - return this->supplier_set_->max_write_delay (); -} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp index 55a36426f62..72694d63538 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.cpp @@ -24,7 +24,9 @@ TAO_EC_Event_Channel (const TAO_EC_Event_Channel_Attributes& attr, factory_ (factory), own_factory_ (own_factory), consumer_reconnect_ (attr.consumer_reconnect), - supplier_reconnect_ (attr.supplier_reconnect) + supplier_reconnect_ (attr.supplier_reconnect), + busy_hwm_ (attr.busy_hwm), + max_write_delay_ (attr.max_write_delay) { if (this->factory_ == 0) { @@ -60,10 +62,6 @@ TAO_EC_Event_Channel (const TAO_EC_Event_Channel_Attributes& attr, this->scheduling_strategy_ = this->factory_->create_scheduling_strategy (this); - - this->consumer_admin_->busy_hwm (attr.consumer_admin_busy_hwm); - this->consumer_admin_->max_write_delay (attr.consumer_admin_max_write_delay); - } TAO_EC_Event_Channel::~TAO_EC_Event_Channel (void) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h index fd213d32c8b..264417b61d3 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.h @@ -66,8 +66,8 @@ public: int supplier_reconnect; // Can consumers or suppliers invoke connect_push_* multiple times? - int consumer_admin_busy_hwm; - int consumer_admin_max_write_delay; + int busy_hwm; + int max_write_delay; // Flags for the Consumer Admin RtecScheduler::Scheduler_ptr scheduler; @@ -197,6 +197,11 @@ public: RtecScheduler::Scheduler_ptr scheduler (void); // Obtain the scheduler, the user must release + int busy_hwm (void) const; + int max_write_delay (void) const; + // Control the concurrency of the delayed connect/disconnect + // operations. + // = The RtecEventChannelAdmin::EventChannel methods... virtual RtecEventChannelAdmin::ConsumerAdmin_ptr for_consumers (CORBA::Environment& env); @@ -268,6 +273,11 @@ private: int consumer_reconnect_; int supplier_reconnect_; // Consumer/Supplier reconnection flags + + int busy_hwm_; + int max_write_delay_; + // Control the level of concurrency in the supplier sets with + // delayed operations }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i index 8b4cb9999b2..003de1ebbef 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Event_Channel.i @@ -6,8 +6,8 @@ TAO_EC_Event_Channel_Attributes (PortableServer::POA_ptr s_poa, PortableServer::POA_ptr c_poa) : consumer_reconnect (0), supplier_reconnect (0), - consumer_admin_busy_hwm (1), - consumer_admin_max_write_delay (1), + busy_hwm (1), + max_write_delay (1), scheduler (RtecScheduler::Scheduler::_nil ()), supplier_poa (s_poa), consumer_poa (c_poa) @@ -169,3 +169,15 @@ TAO_EC_Event_Channel::scheduler (void) { return RtecScheduler::Scheduler::_duplicate (this->scheduler_.in ()); } + +ACE_INLINE int +TAO_EC_Event_Channel::busy_hwm (void) const +{ + return this->busy_hwm_; +} + +ACE_INLINE int +TAO_EC_Event_Channel::max_write_delay (void) const +{ + return this->max_write_delay_; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp index 47031fd4c21..9b61bcae6a8 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.cpp @@ -30,8 +30,12 @@ TAO_EC_Gateway::observer_handle (void) const // **************************************************************** TAO_EC_Gateway_IIOP::TAO_EC_Gateway_IIOP (void) - : consumer_ (this), - supplier_ (this) + : busy_count_ (0), + update_posted_ (0), + consumer_ (this), + consumer_is_active_ (0), + supplier_ (this), + supplier_is_active_ (0) { } @@ -48,6 +52,11 @@ TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, const char* rmt_name, CORBA::Environment &ACE_TRY_ENV) { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + if (!CORBA::is_nil (this->rmt_ec_.in ())) + return; + this->rmt_ec_ = RtecEventChannelAdmin::EventChannel::_duplicate (rmt_ec); this->lcl_ec_ = @@ -103,9 +112,16 @@ TAO_EC_Gateway_IIOP::init (RtecEventChannelAdmin::EventChannel_ptr rmt_ec, void TAO_EC_Gateway_IIOP::close (CORBA::Environment &ACE_TRY_ENV) { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + this->close_i (ACE_TRY_ENV); +} + + +void +TAO_EC_Gateway_IIOP::close_i (CORBA::Environment &ACE_TRY_ENV) +{ // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Closing gateway\n")); - if (CORBA::is_nil (this->supplier_proxy_.in ())) - return; if (this->consumer_proxy_map_.current_size () > 0) { @@ -113,75 +129,105 @@ TAO_EC_Gateway_IIOP::close (CORBA::Environment &ACE_TRY_ENV) j != this->consumer_proxy_map_.end (); ++j) { - (*j).int_id_->disconnect_push_consumer (ACE_TRY_ENV); - CORBA::release ((*j).int_id_); + RtecEventComm::PushConsumer_ptr consumer = (*j).int_id_; + if (CORBA::is_nil (consumer)) + continue; + consumer->disconnect_push_consumer (ACE_TRY_ENV); + CORBA::release (consumer); ACE_CHECK; } this->consumer_proxy_map_.close (); } - this->default_consumer_proxy_->disconnect_push_consumer (ACE_TRY_ENV); - ACE_CHECK; + if (!CORBA::is_nil (this->default_consumer_proxy_.in ())) + { + this->default_consumer_proxy_->disconnect_push_consumer (ACE_TRY_ENV); + ACE_CHECK; - this->default_consumer_proxy_ = - RtecEventChannelAdmin::ProxyPushConsumer::_nil (); + this->default_consumer_proxy_ = + RtecEventChannelAdmin::ProxyPushConsumer::_nil (); + } - { - PortableServer::POA_var poa = - this->supplier_._default_POA (ACE_TRY_ENV); - ACE_CHECK; - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->supplier_, ACE_TRY_ENV); - ACE_CHECK; - poa->deactivate_object (id.in (), ACE_TRY_ENV); - ACE_CHECK; - } + if (this->supplier_is_active_) + { + PortableServer::POA_var poa = + this->supplier_._default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (&this->supplier_, ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_CHECK; + this->supplier_is_active_ = 0; + } - this->supplier_proxy_->disconnect_push_supplier (ACE_TRY_ENV); - ACE_CHECK; + if (!CORBA::is_nil (this->supplier_proxy_.in ())) + { + this->supplier_proxy_->disconnect_push_supplier (ACE_TRY_ENV); + ACE_CHECK; - this->supplier_proxy_ = - RtecEventChannelAdmin::ProxyPushSupplier::_nil (); - { - PortableServer::POA_var poa = - this->consumer_._default_POA (ACE_TRY_ENV); - ACE_CHECK; - PortableServer::ObjectId_var id = - poa->servant_to_id (&this->consumer_, ACE_TRY_ENV); - ACE_CHECK; - poa->deactivate_object (id.in (), ACE_TRY_ENV); - ACE_CHECK; - } + this->supplier_proxy_ = + RtecEventChannelAdmin::ProxyPushSupplier::_nil (); + } + + if (this->consumer_is_active_) + { + PortableServer::POA_var poa = + this->consumer_._default_POA (ACE_TRY_ENV); + ACE_CHECK; + PortableServer::ObjectId_var id = + poa->servant_to_id (&this->consumer_, ACE_TRY_ENV); + ACE_CHECK; + poa->deactivate_object (id.in (), ACE_TRY_ENV); + ACE_CHECK; + this->consumer_is_active_ = 0; + } } void -TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS& c_qos, - CORBA::Environment& env) +TAO_EC_Gateway_IIOP::update_consumer ( + const RtecEventChannelAdmin::ConsumerQOS& c_qos, + CORBA::Environment& ACE_TRY_ENV) { - this->close (env); - TAO_CHECK_ENV_RETURN_VOID (env); - if (c_qos.dependencies.length () <= 1) return; - // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n")); + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + if (this->busy_count_ != 0) + { + this->update_posted_ = 1; + this->c_qos_ = c_qos; + return; + } + + this->update_consumer_i (c_qos, ACE_TRY_ENV); +} + +void +TAO_EC_Gateway_IIOP::update_consumer_i ( + const RtecEventChannelAdmin::ConsumerQOS& c_qos, + CORBA::Environment& ACE_TRY_ENV) +{ + this->close_i (ACE_TRY_ENV); + ACE_CHECK; + if (CORBA::is_nil (this->lcl_ec_.in ()) || CORBA::is_nil (this->rmt_ec_.in ())) return; + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Open gateway\n")); + // = Connect as a supplier to the local EC RtecEventChannelAdmin::SupplierAdmin_var supplier_admin = - this->lcl_ec_->for_suppliers (env); - TAO_CHECK_ENV_RETURN_VOID (env); - - this->default_consumer_proxy_ = - supplier_admin->obtain_push_consumer (env); - TAO_CHECK_ENV_RETURN_VOID (env); + this->lcl_ec_->for_suppliers (ACE_TRY_ENV); + ACE_CHECK; // Change the RT_Info in the consumer QoS. // On the same loop we discover the subscriptions by event source, // and fill the consumer proxy map. RtecEventChannelAdmin::ConsumerQOS sub = c_qos; + sub.is_gateway = 1; for (CORBA::ULong i = 0; i < sub.dependencies.length (); ++i) { sub.dependencies[i].rt_info = this->rmt_info_; @@ -192,19 +238,22 @@ TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS& if (sid != 0 && this->consumer_proxy_map_.find (sid, proxy) == -1) { - proxy = supplier_admin->obtain_push_consumer (env); - TAO_CHECK_ENV_RETURN_VOID (env); + proxy = supplier_admin->obtain_push_consumer (ACE_TRY_ENV); + ACE_CHECK; this->consumer_proxy_map_.bind (sid, proxy); } } - // Obtain a reference to our supplier personality... - RtecEventComm::PushSupplier_var supplier_ref = - this->supplier_._this (env); - TAO_CHECK_ENV_RETURN_VOID (env); if (this->consumer_proxy_map_.current_size () > 0) { + this->supplier_is_active_ = 1; + + // Obtain a reference to our supplier personality... + RtecEventComm::PushSupplier_var supplier_ref = + this->supplier_._this (ACE_TRY_ENV); + ACE_CHECK; + // For each subscription by source build the set of publications // (they may several, by type, for instance) and connect to the // consumer proxy. @@ -239,8 +288,8 @@ TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS& pub.publications.length (c); (*j).int_id_->connect_push_supplier (supplier_ref.in (), pub, - env); - TAO_CHECK_ENV_RETURN_VOID (env); + ACE_TRY_ENV); + ACE_CHECK; } } @@ -267,44 +316,59 @@ TAO_EC_Gateway_IIOP::update_consumer (const RtecEventChannelAdmin::ConsumerQOS& pub.publications[c].dependency_info.rt_info = this->lcl_info_; c++; } + if (c > 0) { + this->supplier_is_active_ = 1; + + // Obtain a reference to our supplier personality... + RtecEventComm::PushSupplier_var supplier_ref = + this->supplier_._this (ACE_TRY_ENV); + ACE_CHECK; + + // Obtain the consumer.... + this->default_consumer_proxy_ = + supplier_admin->obtain_push_consumer (ACE_TRY_ENV); + ACE_CHECK; + pub.publications.length (c); this->default_consumer_proxy_->connect_push_supplier (supplier_ref.in (), pub, - env); - TAO_CHECK_ENV_RETURN_VOID (env); + ACE_TRY_ENV); + ACE_CHECK; } - //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier ")); - //ACE_SupplierQOS_Factory::debug (pub); + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Supplier ")); + // ACE_SupplierQOS_Factory::debug (pub); RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = - this->rmt_ec_->for_consumers (env); - TAO_CHECK_ENV_RETURN_VOID (env); + this->rmt_ec_->for_consumers (ACE_TRY_ENV); + ACE_CHECK; this->supplier_proxy_ = - consumer_admin->obtain_push_supplier (env); - TAO_CHECK_ENV_RETURN_VOID (env); + consumer_admin->obtain_push_supplier (ACE_TRY_ENV); + ACE_CHECK; + this->consumer_is_active_ = 1; RtecEventComm::PushConsumer_var consumer_ref = - this->consumer_._this (env); - TAO_CHECK_ENV_RETURN_VOID (env); + this->consumer_._this (ACE_TRY_ENV); + ACE_CHECK; - //ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Consumer ")); - //ACE_ConsumerQOS_Factory::debug (sub); + // ACE_DEBUG ((LM_DEBUG, "ECG (%t) Gateway/Consumer ")); + // ACE_ConsumerQOS_Factory::debug (sub); this->supplier_proxy_->connect_push_consumer (consumer_ref.in (), sub, - env); - TAO_CHECK_ENV_RETURN_VOID (env); + ACE_TRY_ENV); + ACE_CHECK; } void -TAO_EC_Gateway_IIOP::update_supplier (const RtecEventChannelAdmin::SupplierQOS&, - CORBA::Environment&) +TAO_EC_Gateway_IIOP::update_supplier ( + const RtecEventChannelAdmin::SupplierQOS&, + CORBA::Environment&) { // Do nothing... } @@ -327,7 +391,7 @@ TAO_EC_Gateway_IIOP::disconnect_push_supplier (CORBA::Environment &) void TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events, - CORBA::Environment & env) + CORBA::Environment & ACE_TRY_ENV) { //ACE_DEBUG ((LM_DEBUG, "TAO_EC_Gateway_IIOP::push - ")); @@ -337,7 +401,13 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events, return; } - //ACE_DEBUG ((LM_DEBUG, "ECP: %d event(s) - ", events.length ())); + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + this->busy_count_++; + } + + // ACE_DEBUG ((LM_DEBUG, "ECG: %d event(s)\n", events.length ())); // @@ TODO, there is an extra data copy here, we should do the event // modification without it and only compact the necessary events. @@ -357,21 +427,43 @@ TAO_EC_Gateway_IIOP::push (const RtecEventComm::EventSet &events, // default consumer proxy. proxy = this->default_consumer_proxy_.in (); } + + if (CORBA::is_nil (proxy)) + continue; + out[0] = events[i]; out[0].header.ttl--; - proxy->push (out, env); - TAO_CHECK_ENV_RETURN_VOID (env); + + proxy->push (out, ACE_TRY_ENV); + ACE_CHECK; } + + { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + this->busy_count_--; + + if (this->busy_count_ == 0 && this->update_posted_ != 0) + { + this->update_posted_ = 0; + this->update_consumer_i (this->c_qos_, ACE_TRY_ENV); + } + } + } int -TAO_EC_Gateway_IIOP::shutdown (CORBA::Environment& TAO_IN_ENV) +TAO_EC_Gateway_IIOP::shutdown (CORBA::Environment& ACE_TRY_ENV) { - this->close (TAO_IN_ENV); - if (TAO_IN_ENV.exception () == 0) return -1; + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + + this->close_i (ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); - this->lcl_ec_ = 0; - this->rmt_ec_ = 0; + this->lcl_ec_ = + RtecEventChannelAdmin::EventChannel::_nil (); + this->rmt_ec_ = + RtecEventChannelAdmin::EventChannel::_nil (); return 0; } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h index 9d50dd0e8ae..50835bec895 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Gateway.h @@ -143,6 +143,25 @@ public: CORBA::Environment& env); private: + void close_i (CORBA::Environment& ); + + void update_consumer_i (const RtecEventChannelAdmin::ConsumerQOS& sub, + CORBA::Environment& env); + +private: + ACE_SYNCH_MUTEX lock_; + // Lock to synchronize internal changes + + CORBA::ULong busy_count_; + // How many threads are running push() we cannot make changes until + // that reaches 0 + + int update_posted_; + RtecEventChannelAdmin::ConsumerQOS c_qos_; + // An update_consumer() message arrived *while* we were doing a + // push() the modification is stored <pub_>, if multiple + // update_consumer messages arrive only the last one is executed. + RtecEventChannelAdmin::EventChannel_var rmt_ec_; RtecEventChannelAdmin::EventChannel_var lcl_ec_; // The remote and the local EC, so we can reconnect when the list changes. @@ -153,9 +172,13 @@ private: ACE_PushConsumer_Adapter<TAO_EC_Gateway_IIOP> consumer_; // Our consumer personality.... + int consumer_is_active_; + // If it is not 0 then we must deactivate the supplier ACE_PushSupplier_Adapter<TAO_EC_Gateway_IIOP> supplier_; // Our supplier personality.... + int supplier_is_active_; + // If it is not 0 then we must deactivate the supplier // We use a different Consumer_Proxy typedef ACE_Map_Manager<RtecEventComm::EventSourceID,RtecEventChannelAdmin::ProxyPushConsumer_ptr,ACE_Null_Mutex> Consumer_Map; diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp index 994683d9279..ac02d5b112f 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ObserverStrategy.cpp @@ -86,18 +86,21 @@ TAO_EC_Basic_ObserverStrategy::append_observer ( RtecEventChannel::EventChannel::SYNCHRONIZATION_ERROR, RtecEventChannel::EventChannel::CANT_APPEND_OBSERVER)) { - ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, - RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); - ACE_CHECK_RETURN (0); + { + ACE_GUARD_THROW_EX ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR()); + ACE_CHECK_RETURN (0); - this->handle_generator_++; - Observer_Entry entry (this->handle_generator_, - RtecEventChannelAdmin::Observer::_duplicate (obs)); + this->handle_generator_++; + Observer_Entry entry (this->handle_generator_, + RtecEventChannelAdmin::Observer::_duplicate (obs)); - if (this->observers_.bind (entry.handle, entry) == -1) - ACE_THROW_RETURN ( - RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(), - 0); + if (this->observers_.bind (entry.handle, entry) == -1) + ACE_THROW_RETURN ( + RtecEventChannelAdmin::EventChannel::CANT_APPEND_OBSERVER(), + 0); + } RtecEventChannelAdmin::ConsumerQOS c_qos; this->fill_qos (c_qos, ACE_TRY_ENV); @@ -133,11 +136,15 @@ TAO_EC_Basic_ObserverStrategy::remove_observer ( } void -TAO_EC_Basic_ObserverStrategy::connected (TAO_EC_ProxyPushConsumer*, - CORBA::Environment &ACE_TRY_ENV) +TAO_EC_Basic_ObserverStrategy::connected ( + TAO_EC_ProxyPushConsumer *consumer, + CORBA::Environment &ACE_TRY_ENV) { - RtecEventChannelAdmin::ConsumerQOS c_qos; - this->fill_qos (c_qos, ACE_TRY_ENV); + if (consumer->publications ().is_gateway) + return; + + RtecEventChannelAdmin::SupplierQOS s_qos; + this->fill_qos (s_qos, ACE_TRY_ENV); ACE_CHECK; int size = 0; @@ -165,17 +172,21 @@ TAO_EC_Basic_ObserverStrategy::connected (TAO_EC_ProxyPushConsumer*, for (int i = 0; i != size; ++i) { - copy[i]->update_consumer (c_qos, ACE_TRY_ENV); + copy[i]->update_supplier (s_qos, ACE_TRY_ENV); ACE_CHECK; } } void -TAO_EC_Basic_ObserverStrategy::disconnected (TAO_EC_ProxyPushConsumer*, - CORBA::Environment &ACE_TRY_ENV) +TAO_EC_Basic_ObserverStrategy::disconnected ( + TAO_EC_ProxyPushConsumer* consumer, + CORBA::Environment &ACE_TRY_ENV) { - RtecEventChannelAdmin::ConsumerQOS c_qos; - this->fill_qos (c_qos, ACE_TRY_ENV); + if (consumer->publications ().is_gateway) + return; + + RtecEventChannelAdmin::SupplierQOS s_qos; + this->fill_qos (s_qos, ACE_TRY_ENV); ACE_CHECK; int size = 0; @@ -203,17 +214,21 @@ TAO_EC_Basic_ObserverStrategy::disconnected (TAO_EC_ProxyPushConsumer*, for (int i = 0; i != size; ++i) { - copy[i]->update_consumer (c_qos, ACE_TRY_ENV); + copy[i]->update_supplier (s_qos, ACE_TRY_ENV); ACE_CHECK; } } void -TAO_EC_Basic_ObserverStrategy::connected (TAO_EC_ProxyPushSupplier*, - CORBA::Environment &ACE_TRY_ENV) +TAO_EC_Basic_ObserverStrategy::connected ( + TAO_EC_ProxyPushSupplier* supplier, + CORBA::Environment &ACE_TRY_ENV) { - RtecEventChannelAdmin::SupplierQOS s_qos; - this->fill_qos (s_qos, ACE_TRY_ENV); + if (supplier->subscriptions ().is_gateway) + return; + + RtecEventChannelAdmin::ConsumerQOS c_qos; + this->fill_qos (c_qos, ACE_TRY_ENV); ACE_CHECK; int size = 0; @@ -241,17 +256,21 @@ TAO_EC_Basic_ObserverStrategy::connected (TAO_EC_ProxyPushSupplier*, for (int i = 0; i != size; ++i) { - copy[i]->update_supplier (s_qos, ACE_TRY_ENV); + copy[i]->update_consumer (c_qos, ACE_TRY_ENV); ACE_CHECK; } } void -TAO_EC_Basic_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*, - CORBA::Environment &ACE_TRY_ENV) +TAO_EC_Basic_ObserverStrategy::disconnected ( + TAO_EC_ProxyPushSupplier* supplier, + CORBA::Environment &ACE_TRY_ENV) { - RtecEventChannelAdmin::SupplierQOS s_qos; - this->fill_qos (s_qos, ACE_TRY_ENV); + if (supplier->subscriptions ().is_gateway) + return; + + RtecEventChannelAdmin::ConsumerQOS c_qos; + this->fill_qos (c_qos, ACE_TRY_ENV); ACE_CHECK; int size = 0; @@ -279,7 +298,7 @@ TAO_EC_Basic_ObserverStrategy::disconnected (TAO_EC_ProxyPushSupplier*, for (int i = 0; i != size; ++i) { - copy[i]->update_supplier (s_qos, ACE_TRY_ENV); + copy[i]->update_consumer (c_qos, ACE_TRY_ENV); ACE_CHECK; } } @@ -316,7 +335,7 @@ TAO_EC_Basic_ObserverStrategy::fill_qos ( sub.dependencies[j].event; RtecEventComm::EventType type = event.header.type; - if (0 <= type && type <= ACE_ES_EVENT_UNDEFINED) + if (0 < type && type < ACE_ES_EVENT_UNDEFINED) continue; headers.insert (event.header, 1); } @@ -371,7 +390,7 @@ TAO_EC_Basic_ObserverStrategy::fill_qos ( pub.publications[j].event; RtecEventComm::EventType type = event.header.type; - if (0 <= type && type <= ACE_ES_EVENT_UNDEFINED) + if (0 < type && type < ACE_ES_EVENT_UNDEFINED) continue; headers.insert (event.header, 1); } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp index d7c8740deff..9aa562b6060 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.cpp @@ -17,10 +17,13 @@ ACE_RCSID(Event, EC_Per_Supplier_Filter, "$Id$") TAO_EC_Per_Supplier_Filter:: TAO_EC_Per_Supplier_Filter (TAO_EC_Event_Channel* ec) - : event_channel_ (ec) + : event_channel_ (ec), + refcnt_ (1) { this->supplier_set_ = this->event_channel_->create_proxy_push_supplier_set (); + this->supplier_set_->busy_hwm (this->event_channel_->busy_hwm ()); + this->supplier_set_->max_write_delay (this->event_channel_->max_write_delay ()); } TAO_EC_Per_Supplier_Filter::~TAO_EC_Per_Supplier_Filter (void) @@ -52,7 +55,7 @@ TAO_EC_Per_Supplier_Filter::connected (TAO_EC_ProxyPushSupplier* supplier, return; const RtecEventChannelAdmin::SupplierQOS& pub = - this->consumer_->publications (); + this->consumer_->publications_i (); for (CORBA::ULong j = 0; j < pub.publications.length (); ++j) { @@ -125,6 +128,25 @@ TAO_EC_Per_Supplier_Filter::push (const RtecEventComm::EventSet& event, } } +CORBA::ULong +TAO_EC_Per_Supplier_Filter::_incr_refcnt (void) +{ + this->refcnt_++; + return this->refcnt_; +} + +CORBA::ULong +TAO_EC_Per_Supplier_Filter::_decr_refcnt (void) +{ + this->refcnt_--; + if (this->refcnt_ == 0) + { + this->event_channel_->supplier_filter_builder ()->destroy (this); + return 0; + } + return this->refcnt_; +} + // **************************************************************** TAO_EC_SupplierFiltering* diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h index 9880d6d8a44..33201e670c1 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Per_Supplier_Filter.h @@ -64,6 +64,8 @@ public: virtual void shutdown (CORBA::Environment &env); virtual void push (const RtecEventComm::EventSet& event, CORBA::Environment &); + virtual CORBA::ULong _decr_refcnt (void); + virtual CORBA::ULong _incr_refcnt (void); private: TAO_EC_Event_Channel *event_channel_; @@ -75,6 +77,9 @@ private: TAO_EC_ProxyPushSupplier_Set* supplier_set_; // Keep the list of proxies for the consumers that may be interested // in our events. + + CORBA::ULong refcnt_; + // Reference counting }; // **************************************************************** diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp index 559434320de..c1512e55bcb 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.cpp @@ -35,16 +35,58 @@ void TAO_EC_ProxyPushConsumer::connected (TAO_EC_ProxyPushSupplier* supplier, CORBA::Environment &ACE_TRY_ENV) { - if (this->is_connected ()) - this->filter_->connected (supplier, ACE_TRY_ENV); + TAO_EC_SupplierFiltering* filter = 0; + { + ACE_GUARD_THROW_EX ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK; + + if (this->is_connected_i () == 0) + return; + + filter = this->filter_; + filter->_incr_refcnt (); + } + + filter->connected (supplier, ACE_TRY_ENV); + + { + ACE_GUARD_THROW_EX ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK; + filter->_decr_refcnt (); + } } void TAO_EC_ProxyPushConsumer::disconnected (TAO_EC_ProxyPushSupplier* supplier, CORBA::Environment &ACE_TRY_ENV) { - if (this->is_connected ()) - this->filter_->disconnected (supplier, ACE_TRY_ENV); + TAO_EC_SupplierFiltering* filter = 0; + { + ACE_GUARD_THROW_EX ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK; + + if (this->is_connected_i () == 0) + return; + + filter = this->filter_; + filter->_incr_refcnt (); + } + + filter->disconnected (supplier, ACE_TRY_ENV); + + { + ACE_GUARD_THROW_EX ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK; + filter->_decr_refcnt (); + } } void @@ -94,7 +136,7 @@ TAO_EC_ProxyPushConsumer::cleanup_i (void) RtecEventComm::PushSupplier::_nil (); this->filter_->unbind (this); - this->event_channel_->supplier_filter_builder ()->destroy (this->filter_); + this->filter_->_decr_refcnt (); this->filter_ = 0; } @@ -189,12 +231,37 @@ void TAO_EC_ProxyPushConsumer::push (const RtecEventComm::EventSet& event, CORBA::Environment &ACE_TRY_ENV) { - if (this->is_connected () == 0) - return; // @@ THROW something??? + TAO_EC_SupplierFiltering* filter = 0; + { + ACE_GUARD_THROW_EX ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK; + + if (this->is_connected_i () == 0) + return; // @@ THROW something??? + + filter = this->filter_; + filter->_incr_refcnt (); + + this->refcount_++; + } // No need to keep the lock, the filter_ class is supposed to be // thread safe.... - this->filter_->push (event, ACE_TRY_ENV); + filter->push (event, ACE_TRY_ENV); + + { + ACE_GUARD_THROW_EX ( + ACE_Lock, ace_mon, *this->lock_, + RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); + ACE_CHECK; + filter->_decr_refcnt (); + this->refcount_--; + if (this->refcount_ != 0) + return; + } + this->event_channel_->destroy_proxy_push_consumer (this); } void diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h index 528df12f129..3cc9cd8b303 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.h @@ -77,29 +77,33 @@ public: // The QoS (subscription) used to connect to the EC. virtual void connected (TAO_EC_ProxyPushSupplier* supplier, - CORBA::Environment &env); + CORBA::Environment &env); virtual void disconnected (TAO_EC_ProxyPushSupplier* supplier, - CORBA::Environment &env); + CORBA::Environment &env); // Concrete implementations can use this methods to keep track of // the consumers interested in this events. virtual void connected (TAO_EC_ProxyPushConsumer* consumer, - CORBA::Environment &env); + CORBA::Environment &env); virtual void disconnected (TAO_EC_ProxyPushConsumer* consumer, - CORBA::Environment &env); + CORBA::Environment &env); // Usually implemented as no-ops, but some configurations may // require this methods. virtual void shutdown (CORBA::Environment&); // The event channel is shutting down + const RtecEventChannelAdmin::SupplierQOS& publications_i (void) const; + // The QoS (subscription) used to connect to the EC, assumes the + // locks are held, use with care! + CORBA::ULong _incr_refcnt (void); CORBA::ULong _decr_refcnt (void); // Increment and decrement the reference count. // = The RtecEventChannelAdmin::ProxyPushConsumer methods... virtual void connect_push_supplier ( - RtecEventComm::PushSupplier_ptr push_supplier, + RtecEventComm::PushSupplier_ptr push_supplier, const RtecEventChannelAdmin::SupplierQOS& qos, CORBA::Environment &); virtual void push (const RtecEventComm::EventSet& event, diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i index 0ca9186a78d..e5abd1f309f 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyConsumer.i @@ -24,9 +24,14 @@ TAO_EC_ProxyPushConsumer::supplier (void) const ACE_INLINE const RtecEventChannelAdmin::SupplierQOS& TAO_EC_ProxyPushConsumer::publications (void) const { - // @@ TODO There should be a better way to signal errors here. - static RtecEventChannelAdmin::SupplierQOS empty_qos; - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, empty_qos); + // @@ TODO There should some way to signal errors here. + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, this->qos_); return this->qos_; } + +ACE_INLINE const RtecEventChannelAdmin::SupplierQOS& +TAO_EC_ProxyPushConsumer::publications_i (void) const +{ + return this->qos_; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp index 94631a3f23f..0b832320644 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxyPushSupplier_Set_T.cpp @@ -111,11 +111,6 @@ TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::connected ( ACE_NEW (command, TAO_EC_ProxyPushSupplier_Set::Connected_Command (this, supplier)); - - ACE_DEBUG ((LM_DEBUG, - "EC (%P|%t) Delayed connection command = %x\n", - command)); - this->command_queue_.enqueue_tail (command); this->write_delay_++; } @@ -141,9 +136,6 @@ TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::disconnected ( ACE_NEW (command, TAO_EC_ProxyPushSupplier_Set::Disconnected_Command (this, supplier)); - ACE_DEBUG ((LM_DEBUG, - "EC (%P|%t) Delayed disconnection command = %x\n", - command)); this->command_queue_.enqueue_tail (command); this->write_delay_++; @@ -188,10 +180,6 @@ TAO_EC_ProxyPushSupplier_Set_Delayed<ACE_SYNCH_USE>::execute_delayed_operations command->execute (); - ACE_DEBUG ((LM_DEBUG, - "EC (%P|%t) Executed delayed command = %x\n", - command)); - delete command; } } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp index 090bb115750..bbc4163bf0c 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_ProxySupplier.cpp @@ -218,7 +218,7 @@ TAO_EC_ProxyPushSupplier::disconnect_push_supplier ( this->deactivate (ACE_TRY_ENV); ACE_CHECK; - // Notify the event channel... + // Notify the event channel.... this->event_channel_->disconnected (this, ACE_TRY_ENV); this->_decr_refcnt (); @@ -259,6 +259,9 @@ TAO_EC_ProxyPushSupplier::filter (const RtecEventComm::EventSet& event, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK_RETURN (0); + if (this->is_connected_i () == 0) + return 0; + result = this->child_->filter (event, qos_info, ACE_TRY_ENV); if (this->refcount_ > 0) @@ -282,6 +285,9 @@ TAO_EC_ProxyPushSupplier::filter_nocopy (RtecEventComm::EventSet& event, RtecEventChannelAdmin::EventChannel::SYNCHRONIZATION_ERROR ()); ACE_CHECK_RETURN (0); + if (this->is_connected_i () == 0) + return 0; + result = this->child_->filter_nocopy (event, qos_info, ACE_TRY_ENV); if (this->refcount_ > 0) @@ -434,6 +440,9 @@ TAO_EC_ProxyPushSupplier::can_match ( { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->lock_, 0); + if (this->is_connected_i () == 0) + return 0; + return this->child_->can_match (header); } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h b/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h index 0e307aa59d5..e9167f64c5d 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_SupplierFiltering.h @@ -109,6 +109,11 @@ public: CORBA::Environment &) = 0; // The ProxyPushConsumer delegates on this class to actually send // the event. + + virtual CORBA::ULong _incr_refcnt (void) = 0; + virtual CORBA::ULong _decr_refcnt (void) = 0; + // Increment and decrement the reference count, locking must be + // provided by the user. }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp index e6a03a59a66..98a28ba0cd9 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.cpp @@ -83,6 +83,18 @@ TAO_EC_Trivial_Supplier_Filter::push (const RtecEventComm::EventSet& event, } } +CORBA::ULong +TAO_EC_Trivial_Supplier_Filter::_incr_refcnt (void) +{ + return 1; +} + +CORBA::ULong +TAO_EC_Trivial_Supplier_Filter::_decr_refcnt (void) +{ + return 1; +} + // **************************************************************** TAO_EC_Trivial_Supplier_Filter_Builder:: diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h index ac3d293bd68..26a6cd8cb1f 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Trivial_Supplier_Filter.h @@ -61,6 +61,8 @@ public: virtual void shutdown (CORBA::Environment &env); virtual void push (const RtecEventComm::EventSet& event, CORBA::Environment &); + virtual CORBA::ULong _decr_refcnt (void); + virtual CORBA::ULong _incr_refcnt (void); private: TAO_EC_Event_Channel *event_channel_; diff --git a/TAO/orbsvcs/tests/Event/Basic/Observer.cpp b/TAO/orbsvcs/tests/Event/Basic/Observer.cpp index f4d43214a54..3cdd72a624d 100644 --- a/TAO/orbsvcs/tests/Event/Basic/Observer.cpp +++ b/TAO/orbsvcs/tests/Event/Basic/Observer.cpp @@ -112,7 +112,15 @@ EC_Master::run (int argc, char* argv[]) } } - } + { + for (int i = 0; i != this->n_channels_; ++i) + { + this->channels_[i]->run_cleanup (ACE_TRY_ENV); + ACE_TRY_CHECK; + } + } + + } ACE_CATCHANY { ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "EC_Driver::run"); @@ -173,7 +181,7 @@ EC_Master::parse_args (int &argc, char *argv []) arg_shifter.consume_arg (); this->seed_ = ACE_OS::atoi (arg_shifter.get_current ()); } - + arg_shifter.ignore_arg (); } return 0; @@ -244,6 +252,13 @@ EC_Observer::execute_test (CORBA::Environment& ACE_TRY_ENV) RtecEventChannelAdmin::EventChannel_ptr rmt_ec = this->master_->channel (i)->event_channel_; + this->gwys_[i].init (rmt_ec, + this->event_channel_.in (), + RtecScheduler::Scheduler::_nil (), + RtecScheduler::Scheduler::_nil (), + 0, 0, + ACE_TRY_ENV); + RtecEventChannelAdmin::Observer_var obs = this->gwys_[i]._this (ACE_TRY_ENV); ACE_CHECK; @@ -251,19 +266,26 @@ EC_Observer::execute_test (CORBA::Environment& ACE_TRY_ENV) RtecEventChannelAdmin::Observer_Handle h = rmt_ec->append_observer (obs.in (), ACE_TRY_ENV); ACE_CHECK; - + this->gwys_[i].observer_handle (h); - this->gwys_[i].init (rmt_ec, - this->event_channel_.in (), - RtecScheduler::Scheduler::_nil (), - RtecScheduler::Scheduler::_nil (), - 0, 0, - ACE_TRY_ENV); ACE_CHECK; } - this->EC_Driver::execute_test (ACE_TRY_ENV); + if (this->allocate_tasks () == -1) + return; + + this->activate_tasks (ACE_TRY_ENV); + ACE_CHECK; + + if (this->verbose ()) + ACE_DEBUG ((LM_DEBUG, "EC_Observer[%d] (%P|%t) suppliers are active\n", + this->id_)); +} + +void +EC_Observer::run_cleanup (CORBA::Environment& ACE_TRY_ENV) +{ for (int j = 0; j != this->master_->channel_count (); ++j) { if (j == this->id_) @@ -275,7 +297,7 @@ EC_Observer::execute_test (CORBA::Environment& ACE_TRY_ENV) ACE_TRY_ENV); ACE_CHECK; - this->gwys_[j].close (ACE_TRY_ENV); + this->gwys_[j].shutdown (ACE_TRY_ENV); ACE_CHECK; } } @@ -300,7 +322,7 @@ EC_Observer::connect_consumer ( return; } unsigned int x = ACE_OS::rand_r (this->seed_); - if (x < RAND_MAX / 2) + if (x < RAND_MAX / 8) this->EC_Driver::connect_consumer (consumer_admin, i, ACE_TRY_ENV); } @@ -311,14 +333,20 @@ EC_Observer::consumer_push (void*, CORBA::Environment& ACE_TRY_ENV) { unsigned int x = ACE_OS::rand_r (this->seed_); - if (x < RAND_MAX / 2) + if (x < (RAND_MAX / 64)) { + if (this->verbose ()) + ACE_DEBUG ((LM_DEBUG, + "EC_Observer[%d] (%P|%t) reconnecting\n", this->id_)); + RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin = this->event_channel_->for_consumers (ACE_TRY_ENV); ACE_CHECK; - for (int i = 0; i < this->n_consumers_; ++i) + for (int i = 1; i < this->n_consumers_; ++i) { + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + if (this->consumers_[i]->connected ()) { this->consumers_[i]->disconnect (ACE_TRY_ENV); diff --git a/TAO/orbsvcs/tests/Event/Basic/Observer.h b/TAO/orbsvcs/tests/Event/Basic/Observer.h index 297a4096ae5..8b4d7ce5a9f 100644 --- a/TAO/orbsvcs/tests/Event/Basic/Observer.h +++ b/TAO/orbsvcs/tests/Event/Basic/Observer.h @@ -94,6 +94,7 @@ public: // add some command line args to enable/disable observerions void execute_test (CORBA::Environment& ACE_TRY_ENV); + void run_cleanup (CORBA::Environment& ACE_TRY_ENV); // Run the suppliers, using the <thread_manager> parameter void dump_results (void); @@ -111,6 +112,9 @@ private: int id_; TAO_EC_Gateway_IIOP *gwys_; + + ACE_SYNCH_MUTEX lock_; + // lock internal state }; #endif /* EC_OBSERVER_H */ diff --git a/TAO/orbsvcs/tests/Event/Basic/svc.conf b/TAO/orbsvcs/tests/Event/Basic/svc.conf new file mode 100644 index 00000000000..cd1a374e3f1 --- /dev/null +++ b/TAO/orbsvcs/tests/Event/Basic/svc.conf @@ -0,0 +1,2 @@ +# $Id$ +static EC_Factory "-ECpushsupplierset delayed -ECdispatching reactive -ECfiltering basic -ECproxyconsumerlock thread -ECproxysupplierlock thread -ECconsumeradminlock null -ECsupplieradminlock thread -ECsupplierfiltering null" diff --git a/TAO/orbsvcs/tests/Event/lib/Driver.cpp b/TAO/orbsvcs/tests/Event/lib/Driver.cpp index 534a6fb503e..e86c03dd7bf 100644 --- a/TAO/orbsvcs/tests/Event/lib/Driver.cpp +++ b/TAO/orbsvcs/tests/Event/lib/Driver.cpp @@ -922,6 +922,30 @@ EC_Driver::parse_args (int &argc, char *argv []) } } + else if (ACE_OS::strcmp (arg, "-busyhwm") == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + this->busy_hwm_ = + ACE_OS::atoi (arg_shifter.get_current ()); + arg_shifter.consume_arg (); + } + } + + else if (ACE_OS::strcmp (arg, "-maxwritedelay") == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + this->max_write_delay_ = + ACE_OS::atoi (arg_shifter.get_current ()); + arg_shifter.consume_arg (); + } + } + else { arg_shifter.ignore_arg (); @@ -952,12 +976,16 @@ EC_Driver::print_usage (void) " -supplier_tstart <type>\n" " -supplier_tcount <count>\n" " -supplier_tshift <shift>\n" + " -busy_hwm <value>\n" + " -max_write_delay <value>\n" )); } void -EC_Driver::modify_attributes (TAO_EC_Event_Channel_Attributes&) +EC_Driver::modify_attributes (TAO_EC_Event_Channel_Attributes& attr) { + attr.busy_hwm = this->busy_hwm_; + attr.max_write_delay = this->max_write_delay_; } void diff --git a/TAO/orbsvcs/tests/Event/lib/Driver.h b/TAO/orbsvcs/tests/Event/lib/Driver.h index e6adbb9f822..f64ea4d7398 100644 --- a/TAO/orbsvcs/tests/Event/lib/Driver.h +++ b/TAO/orbsvcs/tests/Event/lib/Driver.h @@ -169,7 +169,7 @@ public: const RtecEventComm::EventSet& event, CORBA::Environment& ACE_TRY_ENV); // One of the consumers in the test has received an event - + virtual void consumer_shutdown (void* consumer_cookie, CORBA::Environment& ACE_TRY_ENV); // One of the consumers has received a shutdown event @@ -384,6 +384,10 @@ protected: RtecEventChannelAdmin::EventChannel_var event_channel_; // The event channel object reference + + int busy_hwm_; + int max_write_delay_; + // Control the concurrency inside the EC. }; #if defined (__ACE_INLINE__) |