diff options
author | pradeep <pradeep@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2002-11-12 01:17:57 +0000 |
---|---|---|
committer | pradeep <pradeep@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2002-11-12 01:17:57 +0000 |
commit | 38b9c5540092225386e44e1e6c6323565cbbed3b (patch) | |
tree | 2efb9aecae08be85c871afc3cdc5a1448fdea2cf /TAO/orbsvcs/orbsvcs/Notify | |
parent | 6e88876b221cc28f52e7a00d48c6d0f7b3537851 (diff) | |
download | ATCD-38b9c5540092225386e44e1e6c6323565cbbed3b.tar.gz |
Mon Nov 11 20:11:56 2002 Pradeep Gore <pradeep@oomworks.com>
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify')
41 files changed, 763 insertions, 505 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h index 0a54fe04ee6..9beb79a6c68 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/CosEC_ProxyPushConsumer.h @@ -55,7 +55,7 @@ public: protected: ///= CosNotifyChannelAdmin::ProxyPushConsumer methods - void push (const CORBA::Any & data ACE_ENV_ARG_DECL) + virtual void push (const CORBA::Any & data ACE_ENV_ARG_DECL) ACE_THROW_SPEC (( CORBA::SystemException, CosEventComm::Disconnected diff --git a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h index c63e4dd2a61..41b2e895840 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Any/ProxyPushConsumer.h @@ -60,7 +60,7 @@ protected: CORBA::SystemException )); - void push (const CORBA::Any & data ACE_ENV_ARG_DECL) + virtual void push (const CORBA::Any & data ACE_ENV_ARG_DECL) ACE_THROW_SPEC (( CORBA::SystemException, CosEventComm::Disconnected diff --git a/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp b/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp index 54d3f2e4aab..b2dcaa416c4 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Builder.cpp @@ -36,7 +36,6 @@ ACE_RCSID(RT_Notify, TAO_NS_Builder, "$Id$") #include "Structured/StructuredProxyPushSupplier.h" #include "Sequence/SequenceProxyPushConsumer.h" #include "Sequence/SequenceProxyPushSupplier.h" -#include "Dispatch_Observer_T.h" #include "ETCL_FilterFactory.h" TAO_NS_Builder::TAO_NS_Builder (void) @@ -166,11 +165,6 @@ TAO_NS_Builder::build_event_channel (TAO_NS_EventChannelFactory* ecf, const CosN ec->event_manager_->init (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (ec_ret._retn ()); - /* Disable Retry Attempts */ - ec->event_manager_->event_dispatch_observer ()->max_retry_attempts (0); - ec->event_manager_->updates_dispatch_observer ()->max_retry_attempts (0); - /* Disable Retry Attempts */ - ec->set_qos (initial_qos ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (ec_ret._retn ()); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp index d2199303ca2..9924c38d391 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.cpp @@ -12,7 +12,6 @@ ACE_RCSID(RT_Notify, TAO_NS_Consumer, "$Id$") #include "ace/Unbounded_Queue.h" #include "tao/debug.h" #include "ProxySupplier.h" -#include "Dispatch_Observer_T.h" #include "Proxy.h" #include "Admin.h" #include "EventChannel.h" @@ -20,7 +19,7 @@ ACE_RCSID(RT_Notify, TAO_NS_Consumer, "$Id$") #include "Notify_Service.h" TAO_NS_Consumer::TAO_NS_Consumer (TAO_NS_ProxySupplier* proxy) - :proxy_ (proxy), event_dispatch_observer_ (0), event_collection_ (0), is_suspended_ (0) + :proxy_ (proxy), event_collection_ (0), is_suspended_ (0) { this->event_collection_ = new TAO_NS_Event_Collection (); } @@ -41,44 +40,29 @@ TAO_NS_Consumer::push (const TAO_NS_Event_var &event ACE_ENV_ARG_DECL) { if (this->is_suspended_ == 1) // If we're suspended, queue for later delivery. { - { - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); - this->event_collection_->enqueue_head (event); - } + ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); + this->event_collection_->enqueue_head (event); + + return; } ACE_TRY { this->push_i (event ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; - - if (this->event_dispatch_observer_ != 0) - { - this->event_dispatch_observer_->dispatch_success (this ACE_ENV_ARG_PARAMETER); - - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); - this->retry_count_ = 0; - } + } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist) + { + this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::SystemException, sysex) + { + this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; } ACE_CATCHANY { - if (TAO_debug_level > 0) - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_Consumer::push: error sending event. informing dispatch observer\n "); - } - //ACE_RE_THROW; - - if (this->event_dispatch_observer_ != 0) - { - { - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); - - ++this->retry_count_; - this->event_collection_->enqueue_head (event); - } - - this->event_dispatch_observer_->dispatch_failure (this, this->retry_count_ ACE_ENV_ARG_PARAMETER); - } } ACE_ENDTRY; } @@ -119,15 +103,9 @@ TAO_NS_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL) } void -TAO_NS_Consumer::dispatch_updates_i (const TAO_NS_EventTypeSeq & added, const TAO_NS_EventTypeSeq & removed +TAO_NS_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed ACE_ENV_ARG_DECL) { - CosNotification::EventTypeSeq cos_added; - CosNotification::EventTypeSeq cos_removed; - - added.populate (cos_added); - removed.populate (cos_removed); - if (!CORBA::is_nil (this->publish_.in ())) - this->publish_->offer_change (cos_added, cos_removed ACE_ENV_ARG_PARAMETER); + this->publish_->offer_change (added, removed ACE_ENV_ARG_PARAMETER); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h index c70f93b7a3d..0a72eaadce0 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.h @@ -58,9 +58,6 @@ public: /// Push <event> to this consumer. virtual void push (const CosNotification::StructuredEvent& event ACE_ENV_ARG_DECL) = 0; - /// Set Observer. - void event_dispatch_observer (TAO_NS_Event_Dispatch_Observer* event_dispatch_observer); - /// Dispatch the pending events void dispatch_pending (ACE_ENV_SINGLE_ARG_DECL); @@ -74,23 +71,20 @@ public: void resume (ACE_ENV_SINGLE_ARG_DECL); protected: - /// Get the shared Proxy Lock - TAO_SYNCH_MUTEX* proxy_lock (void); + // Dispatch updates + virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added, + const CosNotification::EventTypeSeq& removed + ACE_ENV_ARG_DECL); /// Push Implementation. virtual void push_i (const TAO_NS_Event_var& event ACE_ENV_ARG_DECL) = 0; - // Dispatch updates implementation. - virtual void dispatch_updates_i (const TAO_NS_EventTypeSeq & added, - const TAO_NS_EventTypeSeq & removed - ACE_ENV_ARG_DECL); + /// Get the shared Proxy Lock + TAO_SYNCH_MUTEX* proxy_lock (void); /// The Proxy that we associate with. TAO_NS_ProxySupplier* proxy_; - /// Observer - TAO_NS_Event_Dispatch_Observer* event_dispatch_observer_; - /// Events pending to be delivered. TAO_NS_Event_Collection* event_collection_; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl b/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl index aa56593f7ab..d2a5a104c85 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Consumer.inl @@ -14,12 +14,6 @@ TAO_NS_Consumer::proxy_supplier (void) return this->proxy_; } -ACE_INLINE void -TAO_NS_Consumer::event_dispatch_observer (TAO_NS_Event_Dispatch_Observer* event_dispatch_observer) -{ - this->event_dispatch_observer_ = event_dispatch_observer; -} - ACE_INLINE CORBA::Boolean TAO_NS_Consumer::is_suspended (void) { diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventType.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventType.cpp index ccee7778d4f..13d1e1f5168 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventType.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/EventType.cpp @@ -121,3 +121,10 @@ TAO_NS_EventType::is_special (void) const else return 0; } + +void +TAO_NS_EventType::dump (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%s,%s)", this->event_type_.domain_name.in (), this->event_type_.type_name.in ())); +} + diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventType.h b/TAO/orbsvcs/orbsvcs/Notify/EventType.h index 5bce8b9f532..231ec34effa 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventType.h +++ b/TAO/orbsvcs/orbsvcs/Notify/EventType.h @@ -66,6 +66,9 @@ public: const CosNotification::EventType& native (void) const; // Get the type underneath us. + /// Helper to print contents. + void dump (void); + protected: /// Init this object. void init_i (const char* domain_name, const char* type_name); diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp index 750253c0982..30be26405fc 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.cpp @@ -32,6 +32,31 @@ TAO_NS_EventTypeSeq::populate (CosNotification::EventTypeSeq& event_type_seq) co } void +TAO_NS_EventTypeSeq::populate_no_special (CosNotification::EventTypeSeq& event_type_seq) const +{ + // If the special exists in us, don't include it. + const TAO_NS_EventType& special = TAO_NS_EventType::special (); + + if (this->find (special) == 0) + { + event_type_seq.length (this->size () - 1); + } + else + event_type_seq.length (this->size ()); + + inherited::CONST_ITERATOR iter (*this); + + TAO_NS_EventType* event_type; + + CORBA::ULong i = 0; + for (iter.first (); iter.next (event_type); iter.advance (), ++i) + { + if (event_type->is_special () == 0) // if its not the special event type. + event_type_seq[i] = event_type->native (); + } +} + +void TAO_NS_EventTypeSeq::insert_seq (const CosNotification::EventTypeSeq& event_type_seq) { TAO_NS_EventType event_type; @@ -78,63 +103,121 @@ TAO_NS_EventTypeSeq::remove_seq (const TAO_NS_EventTypeSeq& event_type_seq) } void -TAO_NS_EventTypeSeq::init (TAO_NS_EventTypeSeq& seq_added, TAO_NS_EventTypeSeq& seq_remove_seq) +TAO_NS_EventTypeSeq::init (TAO_NS_EventTypeSeq& seq_added, TAO_NS_EventTypeSeq& seq_remove) { const TAO_NS_EventType& special = TAO_NS_EventType::special (); - if (this->find (special) == 0) + if (this->find (special) == 0) // If this object has the special type. { - if (seq_added.find (special) == 0) + if (seq_added.find (special) == 0) // if the seq. being added has the special type, you cannot be adding or removing anythings. * overrides. { - seq_added.reset (); - seq_remove_seq.reset (); + seq_added.reset (); // remove everything from the sequence bening added. + seq_remove.reset (); // remove everything form whats being removed. } - else + else // sequence being added does not have * { - this->reset (); - this->insert_seq (seq_added); + this->reset (); // take away the * from this object. + this->insert_seq (seq_added); // insert the sequence being added as the new list of types. - seq_remove_seq.reset (); - seq_remove_seq.insert (special); + seq_remove.reset (); // reset all that is being removed. + seq_remove.insert (special); // remove * } } - else + else // if this object does not have the special type. { - if (seq_added.find (special) == 0) + if (seq_added.find (special) == 0) // but the seq. being added has the special type, { - if (seq_remove_seq.find (special) == 0) + if (seq_remove.find (special) == 0) // and you're removing * as well { - seq_added.reset (); - seq_remove_seq.reset (); + seq_added.reset (); // ignore the request + seq_remove.reset (); // ignore the request } - else + else // seq being removed does not have the special type { - seq_remove_seq.reset (); - seq_remove_seq.insert_seq (*this); + seq_remove.reset (); // everything that we're subscribed for is being removed. + seq_remove.insert_seq (*this); - this->reset (); + this->reset (); // insert the special in this object. this->insert (special); - seq_added.reset (); + seq_added.reset (); // also clear our set and add only * seq_added.insert (special); } } - else + else // seq being added does not have special. { - if (seq_remove_seq.find (special) == 0) + if (seq_remove.find (special) == 0) // but we're removing everything. { - - seq_remove_seq.reset (); - seq_remove_seq.insert_seq (*this); - - this->reset (); - this->insert_seq (seq_added); + seq_remove.reset (); // move all that we have currently to removed. + seq_remove.insert_seq (*this); } - else + + // so now there are no specials anywhere.. { + //= First remove the duplicates in the added and removes lists. + // compute the intersection. + + TAO_NS_EventTypeSeq common; + common.intersection (seq_added, seq_remove); + + // remove the common elements from both the lists so Added {BCDK} and Removed {CDEA} will yield Added {BK} and Removed {EA} + seq_added.remove_seq (common); + seq_remove.remove_seq (common); + + // If we're already subscribed for an element we should not subscribe again (duplicate events). + // so if we currently subscribe for ABC and we Added {BK} we should now get ABCK as current subscription and Added {K} + common.reset (); + common.intersection (*this, seq_added); + // remove the common elements from the added list. i,e. doent ask to add what we're already added for. + seq_added.remove_seq (common); + // update the current subscription. this->insert_seq (seq_added); - this->remove_seq (seq_remove_seq); + + + // Similarly for removed.. if we're removing EA and now our current list looks like ABC we should emd up with + // current subscription BC and Removed {A} + common.reset (); + common.intersection (*this, seq_remove); + + seq_remove.reset (); + seq_remove.insert_seq (common); // only remove what we currently have. + + this->remove_seq (seq_remove); } } } } + +void +TAO_NS_EventTypeSeq::intersection (const TAO_NS_EventTypeSeq& rhs, const TAO_NS_EventTypeSeq& lhs) +{ + // linear search. + TAO_NS_EventTypeSeq::CONST_ITERATOR rhs_iter (rhs); + TAO_NS_EventType* rhs_event_type; + + TAO_NS_EventTypeSeq::CONST_ITERATOR lhs_iter (lhs); + TAO_NS_EventType* lhs_event_type; + + for (rhs_iter.first (); rhs_iter.next (rhs_event_type); rhs_iter.advance ()) + { + for (lhs_iter.first (); lhs_iter.next (lhs_event_type); lhs_iter.advance ()) + { + if (*rhs_event_type == *lhs_event_type) // if both are same add to this object. + this->insert (*rhs_event_type); + } + } +} + +void +TAO_NS_EventTypeSeq::dump (void) +{ + TAO_NS_EventTypeSeq::CONST_ITERATOR iter (*this); + + TAO_NS_EventType* event_type; + + for (iter.first (); iter.next (event_type); iter.advance ()) + { + event_type->dump (); + ACE_DEBUG ((LM_DEBUG, ", ")); + } +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.h b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.h index e4676c70a57..5104184f5f6 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.h +++ b/TAO/orbsvcs/orbsvcs/Notify/EventTypeSeq.h @@ -41,6 +41,9 @@ public: /// Preprocess the types added and removed. void init (TAO_NS_EventTypeSeq& added, TAO_NS_EventTypeSeq& removed); + /// Populate this sequence with the intersection of rhs and lhs. + void intersection (const TAO_NS_EventTypeSeq& rhs, const TAO_NS_EventTypeSeq& lhs); + /// insert_seq the contents of <event_type_seq> into this object. void insert_seq (const CosNotification::EventTypeSeq& event_type_seq); @@ -55,6 +58,13 @@ public: /// Populate <event_type_seq> with the contents of this object. void populate (CosNotification::EventTypeSeq& event_type) const; + + /// Populate <event_type_seq> with the contents of this object. + // Excludes the special event type. This is used to avoid sending * type updates to proxys. + void populate_no_special (CosNotification::EventTypeSeq& event_type) const; + + /// Print the contents. + void dump (void); }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.cpp b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.cpp index ea3736b1074..b73557e9d57 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.cpp @@ -23,16 +23,10 @@ ACE_RCSID(RT_Notify, TAO_NS_Event_Manager, "$Id$") #include "EventChannel.h" #include "EventChannelFactory.h" #include "Notify_Service.h" -#include "Pending_Worker_T.h" -#include "Event_Map_Observer_T.h" -#include "Dispatch_Observer_T.h" #include "Event_Map_T.h" TAO_NS_Event_Manager::TAO_NS_Event_Manager (void) - :consumer_map_ (0), supplier_map_ (0), - consumer_map_observer_ (0), supplier_map_observer_ (0), - event_dispatch_observer_(0), updates_dispatch_observer_ (0), - event_pending_worker_ (0), updates_pending_worker_ (0) + :consumer_map_ (0), supplier_map_ (0) { } @@ -41,17 +35,11 @@ TAO_NS_Event_Manager::~TAO_NS_Event_Manager () if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, "destroying consumer/supplier map count = %d/%d, \n", - this->consumer_map_->event_type_count (), this->supplier_map_->event_type_count ())); + this->consumer_map_->proxy_count (), this->supplier_map_->proxy_count ())); } delete this->consumer_map_; delete this->supplier_map_; - delete this->consumer_map_observer_; - delete this->supplier_map_observer_; - delete this->event_dispatch_observer_; - delete this->updates_dispatch_observer_; - delete this->event_pending_worker_; - delete this->updates_pending_worker_; } void @@ -67,77 +55,87 @@ TAO_NS_Event_Manager::init (ACE_ENV_SINGLE_ARG_DECL) CORBA::NO_MEMORY ()); ACE_CHECK; - ACE_NEW_THROW_EX (this->consumer_map_observer_, - TAO_NS_Consumer_Map_Observer (), - CORBA::NO_MEMORY ()); + this->consumer_map_->init (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK; - - ACE_NEW_THROW_EX (this->supplier_map_observer_, - TAO_NS_Supplier_Map_Observer (), - CORBA::NO_MEMORY ()); + this->supplier_map_->init (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK; +} - ACE_NEW_THROW_EX (this->event_dispatch_observer_, - TAO_NS_Event_Dispatch_Observer (), - CORBA::NO_MEMORY ()); - ACE_CHECK; +void +TAO_NS_Event_Manager::shutdown (void) +{ +} - ACE_NEW_THROW_EX (this->updates_dispatch_observer_, - TAO_NS_Updates_Dispatch_Observer (), - CORBA::NO_MEMORY ()); - ACE_CHECK; +void +TAO_NS_Event_Manager::connect (TAO_NS_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL) +{ + this->consumer_map_->connect (proxy_supplier ACE_ENV_ARG_PARAMETER); - ACE_NEW_THROW_EX (this->event_pending_worker_, - TAO_NS_Event_Pending_Worker (), - CORBA::NO_MEMORY ()); - ACE_CHECK; + // Inform about offered types. + TAO_NS_EventTypeSeq removed; + proxy_supplier->types_changed (this->offered_types (), removed ACE_ENV_ARG_PARAMETER); +} - ACE_NEW_THROW_EX (this->updates_pending_worker_, - TAO_NS_Updates_Pending_Worker (), - CORBA::NO_MEMORY ()); - ACE_CHECK; +void +TAO_NS_Event_Manager::disconnect (TAO_NS_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL) +{ + this->consumer_map_->disconnect (proxy_supplier ACE_ENV_ARG_PARAMETER); +} - this->consumer_map_->init (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; +void +TAO_NS_Event_Manager::connect (TAO_NS_ProxyConsumer* proxy_consumer ACE_ENV_ARG_DECL) +{ + this->supplier_map_->connect (proxy_consumer ACE_ENV_ARG_PARAMETER); - this->supplier_map_->init (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; + // Inform about subscription types. + TAO_NS_EventTypeSeq removed; + proxy_consumer->types_changed (this->subscription_types (), removed ACE_ENV_ARG_PARAMETER); +} - this->event_dispatch_observer_->init (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; +void +TAO_NS_Event_Manager::disconnect (TAO_NS_ProxyConsumer* proxy_consumer ACE_ENV_ARG_DECL) +{ + this->supplier_map_->disconnect (proxy_consumer ACE_ENV_ARG_PARAMETER); +} - this->updates_dispatch_observer_->init (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; +void +TAO_NS_Event_Manager::offer_change (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL) +{ + TAO_NS_EventTypeSeq new_added, last_removed; - this->consumer_map_observer_->init (this->supplier_map_, this->updates_dispatch_observer_); + this->publish (proxy_consumer, added, new_added ACE_ENV_ARG_PARAMETER); ACE_CHECK; - this->supplier_map_observer_->init (this->consumer_map_, this->updates_dispatch_observer_); + this->un_publish (proxy_consumer, removed, last_removed ACE_ENV_ARG_PARAMETER); ACE_CHECK; - this->consumer_map_->attach_observer (this->consumer_map_observer_); - this->supplier_map_->attach_observer (this->supplier_map_observer_); - - this->event_pending_worker_->init (this->event_dispatch_observer_, TAO_NS_PROPERTIES::instance()->update_period () ACE_ENV_ARG_PARAMETER); - ACE_CHECK; + TAO_NS_Consumer_Map::ENTRY::COLLECTION* updates_collection = this->consumer_map_->updates_collection (); - if (TAO_NS_PROPERTIES::instance()->updates () == 0) - this->updates_pending_worker_->worker_suspend (); + TAO_NS_ProxySupplier_Update_Worker worker (new_added, last_removed); - this->updates_pending_worker_->init (this->updates_dispatch_observer_, TAO_NS_PROPERTIES::instance()->update_period () ACE_ENV_ARG_PARAMETER); - ACE_CHECK; + if (updates_collection != 0) + updates_collection->for_each (&worker ACE_ENV_ARG_PARAMETER); } void -TAO_NS_Event_Manager::shutdown (void) +TAO_NS_Event_Manager::subscription_change (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL) { - this->event_pending_worker_->shutdown (); - this->updates_pending_worker_->shutdown (); + TAO_NS_EventTypeSeq new_added, last_removed; + + this->subscribe (proxy_supplier, added, new_added ACE_ENV_ARG_PARAMETER); + this->un_subscribe (proxy_supplier, removed, last_removed ACE_ENV_ARG_PARAMETER); + + TAO_NS_Supplier_Map::ENTRY::COLLECTION* updates_collection = this->supplier_map_->updates_collection (); + + TAO_NS_ProxyConsumer_Update_Worker worker (new_added, last_removed); + + if (updates_collection != 0) + updates_collection->for_each (&worker ACE_ENV_ARG_PARAMETER); } void -TAO_NS_Event_Manager::subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL) +TAO_NS_Event_Manager::subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& new_seq ACE_ENV_ARG_DECL) { TAO_NS_EventTypeSeq::CONST_ITERATOR iter (seq); @@ -145,13 +143,16 @@ TAO_NS_Event_Manager::subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO for (iter.first (); iter.next (event_type) != 0; iter.advance ()) { - consumer_map_->insert (proxy_supplier, *event_type ACE_ENV_ARG_PARAMETER); + int result = consumer_map_->insert (proxy_supplier, *event_type ACE_ENV_ARG_PARAMETER); ACE_CHECK; + + if (result == 1) + new_seq.insert (*event_type); } } void -TAO_NS_Event_Manager::un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL) +TAO_NS_Event_Manager::un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& last_seq ACE_ENV_ARG_DECL) { TAO_NS_EventTypeSeq::CONST_ITERATOR iter (seq); @@ -159,13 +160,16 @@ TAO_NS_Event_Manager::un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const for (iter.first (); iter.next (event_type) != 0; iter.advance ()) { - consumer_map_->remove (proxy_supplier, *event_type ACE_ENV_ARG_PARAMETER); + int result = consumer_map_->remove (proxy_supplier, *event_type ACE_ENV_ARG_PARAMETER); ACE_CHECK; + + if (result == 1) + last_seq.insert (*event_type); } } void -TAO_NS_Event_Manager::publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL) +TAO_NS_Event_Manager::publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& new_seq ACE_ENV_ARG_DECL) { TAO_NS_EventTypeSeq::CONST_ITERATOR iter (seq); @@ -173,14 +177,16 @@ TAO_NS_Event_Manager::publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_N for (iter.first (); iter.next (event_type) != 0; iter.advance ()) { - supplier_map_->insert (proxy_consumer, *event_type ACE_ENV_ARG_PARAMETER); + int result = supplier_map_->insert (proxy_consumer, *event_type ACE_ENV_ARG_PARAMETER); ACE_CHECK; - } + if (result == 1) + new_seq.insert (*event_type); + } } void -TAO_NS_Event_Manager::un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL) +TAO_NS_Event_Manager::un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& last_seq ACE_ENV_ARG_DECL) { TAO_NS_EventTypeSeq::CONST_ITERATOR iter (seq); @@ -188,8 +194,11 @@ TAO_NS_Event_Manager::un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TA for (iter.first (); iter.next (event_type) != 0; iter.advance ()) { - supplier_map_->remove (proxy_consumer, *event_type ACE_ENV_ARG_PARAMETER); + int result = supplier_map_->remove (proxy_consumer, *event_type ACE_ENV_ARG_PARAMETER); ACE_CHECK; + + if (result == 1) + last_seq.insert (*event_type); } } @@ -198,22 +207,6 @@ TAO_NS_Event_Manager::un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TA template class TAO_NS_Event_Map_T<TAO_NS_ProxySupplier, TAO_SYNCH_RW_MUTEX>; template class TAO_NS_Event_Map_T<TAO_NS_ProxyConsumer, TAO_SYNCH_RW_MUTEX>; -template class TAO_NS_Event_Map_Observer_T<TAO_NS_ProxyConsumer>; -template class TAO_NS_Event_Map_Observer_T<TAO_NS_ProxySupplier>; - -template class TAO_NS_Dispatch_Observer_T<TAO_NS_Peer>; -template class TAO_NS_Dispatch_Observer_T<TAO_NS_Consumer>; - -template class TAO_NS_Pending_Worker_T<TAO_NS_Peer>; -template class TAO_NS_Pending_Worker_T<TAO_NS_Consumer>; - -template class TAO_NS_Dispatch_Pending_Worker_T<TAO_NS_Consumer>; -template class TAO_NS_Dispatch_Pending_Worker_T<TAO_NS_Peer>; - -template class TAO_NS_Update_Removed_Worker<TAO_NS_ProxyConsumer>; -template class TAO_NS_Update_Removed_Worker<TAO_NS_ProxySupplier>; -template class TAO_NS_Update_Added_Worker<TAO_NS_ProxyConsumer>; - template class ACE_Hash<TAO_NS_EventType>; template class ACE_Equal_To<TAO_NS_EventType>; @@ -230,8 +223,6 @@ template class TAO_NS_Object_Find_Worker_T<TAO_NS_Proxy>; template class TAO_NS_Object_Find_Worker_T<TAO_NS_Admin>; template class TAO_NS_Object_Find_Worker_T<TAO_NS_EventChannel>; -template class TAO_NS_Update_Added_Worker<TAO_NS_ProxySupplier>; - template class ACE_Unbounded_Set<TAO_NS_EventType>; template class ACE_Unbounded_Set_Const_Iterator<TAO_NS_EventType>; template class ACE_Unbounded_Queue<ACE_Refcounted_Auto_Ptr<TAO_NS_Event, TAO_SYNCH_MUTEX> >; @@ -287,22 +278,6 @@ template class TAO_ESF_Shutdown_Proxy<TAO_NS_Proxy>; #pragma instantiate TAO_NS_Event_Map_T<TAO_NS_ProxySupplier, TAO_SYNCH_RW_MUTEX> #pragma instantiate TAO_NS_Event_Map_T<TAO_NS_ProxyConsumer, TAO_SYNCH_RW_MUTEX> -#pragma instantiate TAO_NS_Event_Map_Observer_T<TAO_NS_ProxyConsumer> -#pragma instantiate TAO_NS_Event_Map_Observer_T<TAO_NS_ProxySupplier> - -#pragma instantiate TAO_NS_Dispatch_Observer_T<TAO_NS_Peer> -#pragma instantiate TAO_NS_Dispatch_Observer_T<TAO_NS_Consumer> - -#pragma instantiate TAO_NS_Pending_Worker_T<TAO_NS_Peer> -#pragma instantiate TAO_NS_Pending_Worker_T<TAO_NS_Consumer> - -#pragma instantiate TAO_NS_Dispatch_Pending_Worker_T<TAO_NS_Consumer> -#pragma instantiate TAO_NS_Dispatch_Pending_Worker_T<TAO_NS_Peer> - -#pragma instantiate TAO_NS_Update_Removed_Worker<TAO_NS_ProxyConsumer> -#pragma instantiate TAO_NS_Update_Removed_Worker<TAO_NS_ProxySupplier> -#pragma instantiate TAO_NS_Update_Added_Worker<TAO_NS_ProxyConsumer> - #pragma instantiate ACE_Hash<TAO_NS_EventType> #pragma instantiate ACE_Equal_To<TAO_NS_EventType> @@ -319,8 +294,6 @@ template class TAO_ESF_Shutdown_Proxy<TAO_NS_Proxy>; #pragma instantiate TAO_NS_Object_Find_Worker_T<TAO_NS_Admin> #pragma instantiate TAO_NS_Object_Find_Worker_T<TAO_NS_EventChannel> -#pragma instantiate TAO_NS_Update_Added_Worker<TAO_NS_ProxySupplier> - #pragma instantiate ACE_Unbounded_Set<TAO_NS_EventType> #pragma instantiate ACE_Unbounded_Set_Const_Iterator<TAO_NS_EventType> #pragma instantiate ACE_Unbounded_Queue<ACE_Refcounted_Auto_Ptr<TAO_NS_Event, TAO_SYNCH_MUTEX> > diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.h b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.h index eddef6ed5b6..5da8ed87089 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.h @@ -20,7 +20,8 @@ #endif /* ACE_LACKS_PRAGMA_ONCE */ #include "Types.h" - +#include "EventTypeSeq.h" +#include "orbsvcs/ESF/ESF_Worker.h" /** * @class TAO_NS_Event_Manager @@ -43,54 +44,98 @@ public: /// Init void shutdown (void); - /// Subscribe <proxy_supplier> to the event type sequence list <seq>. - void subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL); + /// Connect ProxySupplier + void connect (TAO_NS_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL); - /// Unsubscribe <proxy_supplier> to the event type sequence list <seq>. - void un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL); + /// Disconnect ProxySupplier + void disconnect (TAO_NS_ProxySupplier* proxy_supplier ACE_ENV_ARG_DECL); - /// Subscribe <proxy_consumer> to the event type sequence list <seq>. - void publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL); + /// Connect ProxyConsumer + void connect (TAO_NS_ProxyConsumer* proxy_consumer ACE_ENV_ARG_DECL); - /// Subscribe <proxy_consumer> to the event type sequence list <seq>. - void un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq ACE_ENV_ARG_DECL); + /// Disconnect ProxyConsumer + void disconnect (TAO_NS_ProxyConsumer* proxy_consumer ACE_ENV_ARG_DECL); /// Map accessors. TAO_NS_Consumer_Map* consumer_map (void); TAO_NS_Supplier_Map* supplier_map (void); - /// Event Dispatch Observer - TAO_NS_Event_Dispatch_Observer* event_dispatch_observer (void); + /// Offer change received on <proxy_consumer>. + void offer_change (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL); - /// Update dispatch observer. - TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer (void); + /// Subscription change received on <proxy_supplier>. + void subscription_change (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL); + + /// What are the types being offered. + const TAO_NS_EventTypeSeq& offered_types (void); + + /// What are the types being subscribed. + const TAO_NS_EventTypeSeq& subscription_types (void); protected: + /// Subscribe <proxy_supplier> to the event type sequence list <seq>. + void subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& new_seq ACE_ENV_ARG_DECL); + + /// Unsubscribe <proxy_supplier> to the event type sequence list <seq>. + void un_subscribe (TAO_NS_ProxySupplier* proxy_supplier, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& last_seq ACE_ENV_ARG_DECL); + + /// Subscribe <proxy_consumer> to the event type sequence list <seq>. + void publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& new_seq ACE_ENV_ARG_DECL); + + /// Subscribe <proxy_consumer> to the event type sequence list <seq>. + void un_publish (TAO_NS_ProxyConsumer* proxy_consumer, const TAO_NS_EventTypeSeq& seq, TAO_NS_EventTypeSeq& last_seq ACE_ENV_ARG_DECL); + /// Consumer Map TAO_NS_Consumer_Map* consumer_map_; /// Supplier Map TAO_NS_Supplier_Map* supplier_map_; +}; + +/********************************************************************************/ - /// Consumer Map Observer - TAO_NS_Consumer_Map_Observer* consumer_map_observer_; +/** + * @class TAO_NS_ProxyConsumer_Update_Worker + * + * @brief Inform ProxyConsumer of updates. + * + */ +class TAO_Notify_Export TAO_NS_ProxyConsumer_Update_Worker : public TAO_ESF_Worker<TAO_NS_ProxyConsumer> +{ +public: + TAO_NS_ProxyConsumer_Update_Worker (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed); - /// Supplier Map Observer - TAO_NS_Supplier_Map_Observer* supplier_map_observer_; +protected: + ///= TAO_ESF_Worker method + void work (TAO_NS_ProxyConsumer* proxy ACE_ENV_ARG_DECL); - /// Event Dispatch Observer. - TAO_NS_Event_Dispatch_Observer* event_dispatch_observer_; + const TAO_NS_EventTypeSeq& added_; + const TAO_NS_EventTypeSeq& removed_; +}; - /// Update dispatch observer. - TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer_; +/********************************************************************************/ - /// Worker task that dispatches pending events. - TAO_NS_Event_Pending_Worker* event_pending_worker_; +/** + * @class TAO_NS_ProxySupplier_Update_Worker + * + * @brief Inform ProxySupplier of updates. + * + */ +class TAO_Notify_Export TAO_NS_ProxySupplier_Update_Worker : public TAO_ESF_Worker<TAO_NS_ProxySupplier> +{ +public: + TAO_NS_ProxySupplier_Update_Worker (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed); + +protected: + ///= TAO_ESF_Worker method + void work (TAO_NS_ProxySupplier* proxy ACE_ENV_ARG_DECL); - /// Worker task that dispatches pending update messges. - TAO_NS_Updates_Pending_Worker* updates_pending_worker_; + const TAO_NS_EventTypeSeq& added_; + const TAO_NS_EventTypeSeq& removed_; }; +/********************************************************************************/ + #if defined (__ACE_INLINE__) #include "Event_Manager.inl" #endif /* __ACE_INLINE__ */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.inl b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.inl index f988e9d2914..3ad395f331a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Manager.inl @@ -1,25 +1,57 @@ // $Id$ +#include "ProxyConsumer.h" +#include "ProxySupplier.h" +#include "Event_Map_T.h" + ACE_INLINE TAO_NS_Consumer_Map* TAO_NS_Event_Manager::consumer_map (void) { - return consumer_map_; + return this->consumer_map_; } ACE_INLINE TAO_NS_Supplier_Map* TAO_NS_Event_Manager::supplier_map (void) { - return supplier_map_; + return this->supplier_map_; +} + +ACE_INLINE const TAO_NS_EventTypeSeq& +TAO_NS_Event_Manager::offered_types (void) +{ + return this->supplier_map_->event_types (); +} + +ACE_INLINE const TAO_NS_EventTypeSeq& +TAO_NS_Event_Manager::subscription_types (void) +{ + return this->consumer_map_->event_types (); +} + +/********************************************************************************/ + +ACE_INLINE TAO_NS_ProxyConsumer_Update_Worker::TAO_NS_ProxyConsumer_Update_Worker (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed) + :added_ (added), removed_ (removed) +{ +} + +ACE_INLINE void +TAO_NS_ProxyConsumer_Update_Worker::work (TAO_NS_ProxyConsumer* proxy ACE_ENV_ARG_DECL) +{ + proxy->types_changed (added_, removed_ ACE_ENV_ARG_PARAMETER); } -ACE_INLINE TAO_NS_Event_Dispatch_Observer* -TAO_NS_Event_Manager::event_dispatch_observer (void) +/********************************************************************************/ + +ACE_INLINE TAO_NS_ProxySupplier_Update_Worker::TAO_NS_ProxySupplier_Update_Worker (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed) + :added_ (added), removed_ (removed) { - return this->event_dispatch_observer_; } -ACE_INLINE TAO_NS_Updates_Dispatch_Observer* -TAO_NS_Event_Manager::updates_dispatch_observer (void) +ACE_INLINE void +TAO_NS_ProxySupplier_Update_Worker::work (TAO_NS_ProxySupplier* proxy ACE_ENV_ARG_DECL) { - return this->updates_dispatch_observer_; + proxy->types_changed (added_, removed_ ACE_ENV_ARG_PARAMETER); } + +/********************************************************************************/ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.cpp index 546486af2c6..c97cba0453c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.cpp @@ -8,7 +8,6 @@ #include "Event_Map_Entry_T.h" #include "Properties.h" #include "Factory.h" -#include "Event_Map_Observer.h" #if ! defined (__ACE_INLINE__) #include "Event_Map_T.inl" @@ -18,7 +17,7 @@ ACE_RCSID(RT_Notify, TAO_NS_Event_Map_T, "$Id$") template <class PROXY, class ACE_LOCK> TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::TAO_NS_Event_Map_T (void) - :event_type_count_ (0), observer_ (0) + :proxy_count_ (0) { } @@ -32,8 +31,30 @@ template <class PROXY, class ACE_LOCK> void TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::init (ACE_ENV_SINGLE_ARG_DECL) { this->broadcast_entry_.init (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + + this->updates_entry_.init (ACE_ENV_SINGLE_ARG_PARAMETER); +} + +template <class PROXY, class ACE_LOCK> void +TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::connect (PROXY* proxy ACE_ENV_ARG_DECL) +{ + this->updates_entry_.connected (proxy ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_WRITE_GUARD (ACE_LOCK, ace_mon, this->lock_); + ++this->proxy_count_; } +template <class PROXY, class ACE_LOCK> void +TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::disconnect (PROXY* proxy ACE_ENV_ARG_DECL) +{ + this->updates_entry_.disconnected (proxy ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + ACE_WRITE_GUARD (ACE_LOCK, ace_mon, this->lock_); + --this->proxy_count_; +} template <class PROXY, class ACE_LOCK> int TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::insert (PROXY* proxy, const TAO_NS_EventType& event_type ACE_ENV_ARG_DECL) @@ -45,6 +66,7 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::insert (PROXY* proxy, const TAO_NS_EventTyp if (event_type.is_special () == 1) { entry = &this->broadcast_entry_; + result = 0; } else @@ -54,7 +76,7 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::insert (PROXY* proxy, const TAO_NS_EventTyp result = this->map_.find (event_type, entry); } - if (result == -1) + if (result == -1) // This type is being seen for the first time. { ACE_NEW_THROW_EX (entry, ENTRY (), @@ -72,19 +94,18 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::insert (PROXY* proxy, const TAO_NS_EventTyp if (map_.bind (event_type, entry) == -1) ACE_THROW_RETURN (CORBA::NO_MEMORY (), -1); - if (this->observer_ != 0) - this->observer_->type_added (event_type ACE_ENV_ARG_PARAMETER); + if (this->event_types_.insert (event_type) == -1) + return -1; - return ++event_type_count_; + return 1; } - else + else // Add to existing entry or the broadcast entry. { entry->connected (proxy ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (-1); - - ACE_WRITE_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1); - return ++event_type_count_; } + + return 0; } template <class PROXY, class ACE_LOCK> int @@ -92,42 +113,50 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::remove (PROXY* proxy, const TAO_NS_EventTyp { ENTRY* entry; - int result = -1; - if (event_type.is_special () == 1) { entry = &this->broadcast_entry_; - result = 0; + + entry->disconnected (proxy ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); } else { - ACE_READ_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1); - - result = this->map_.find (event_type, entry); - } + int result = -1; - if (result == 0) - { - entry->disconnected (proxy ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (-1); + { + ACE_READ_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1); - ACE_WRITE_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1); + result = this->map_.find (event_type, entry); + } - if (entry->count () == 0) + if (result == 0) { - if (this->observer_ != 0) - this->observer_->type_removed (event_type ACE_ENV_ARG_PARAMETER); + entry->disconnected (proxy ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); - /// @@TODO: Exec a strategy for removing entries. - /// Strategy 1: remove_immediately - /// Strategy 2: remove_bunch_after_threshold - /// Strategy 3: use cached allocator and 1 - } + if (entry->count () == 0) + { + /// Exec a strategy for removing entries. + /// Strategy 1: remove_immediately + /// Strategy 2: remove a bunch_after crossing a threshold + /// Strategy 3: use cached allocator and 1 - return --event_type_count_; + // Strategy 1: + ACE_WRITE_GUARD_RETURN (ACE_LOCK, ace_mon, this->lock_, -1); + + this->map_.unbind (event_type); + delete entry; + + if (this->event_types_.remove (event_type) == -1) + return -1; + + return 1; + } + } } - return -1; + return 0; } #endif /* TAO_NS_EVENT_MAP_T_C */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.h b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.h index 8a2fb014707..b32f7f5861e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.h @@ -24,8 +24,6 @@ #include "EventType.h" #include "Event_Map_Entry_T.h" -class TAO_NS_Event_Map_Observer; - /** * @class TAO_NS_Event_Map_T * @@ -38,7 +36,7 @@ class TAO_NS_Event_Map_T public: typedef TAO_NS_Event_Map_Entry_T<PROXY> ENTRY; - + /// Constuctor TAO_NS_Event_Map_T (void); @@ -48,13 +46,20 @@ public: /// Init void init (ACE_ENV_SINGLE_ARG_DECL); - /// Attach an Observer. - void attach_observer (TAO_NS_Event_Map_Observer* observer); + /// Connect a PROXY + void connect (PROXY* proxy ACE_ENV_ARG_DECL); + + /// Disconnect a PROXY + void disconnect (PROXY* proxy ACE_ENV_ARG_DECL); - /// Associate PROXY and event_type. returns count of PROXYs. + /// Associate PROXY and event_type. + /// Returns 1 if <event_type> is being seem for the 1st time otherwise returns 0. + /// Returns -1 on error. int insert (PROXY* proxy, const TAO_NS_EventType& event_type ACE_ENV_ARG_DECL); - /// Remove association of PROXY and event_type. returns count of PROXYs. + /// Remove association of PROXY and event_type. + /// Returns 1 if <event_type> is being seem for the last time otherwise returns 0. + /// Returns -1 on error. int remove (PROXY* proxy, const TAO_NS_EventType& event_type ACE_ENV_ARG_DECL); /// Find the collection mapped to the <event_type> @@ -63,8 +68,14 @@ public: /// Find the default broadcast list. ACE_TYPENAME ENTRY::COLLECTION* broadcast_collection (void); - /// Access count, number of different event types in the map. - int event_type_count (void); + /// Find the update list. This is all the PROXYS connected to this Map. + ACE_TYPENAME ENTRY::COLLECTION* updates_collection (void); + + /// Access all the event types available + const TAO_NS_EventTypeSeq& event_types (void); + + /// Access number of proxys connected in all. + int proxy_count (void); protected: /// The Map that stores eventtype to entry mapping. @@ -73,14 +84,17 @@ protected: /// The lock to use. ACE_LOCK lock_; - /// Count of items entered in the map. - int event_type_count_; + /// Count of proxys connected. + int proxy_count_; /// The default broadcast list for EventType::special. ENTRY broadcast_entry_; - /// Observer attached to us. - TAO_NS_Event_Map_Observer* observer_; + /// Update Entry - Keeps a list of all PROXY's connected to this Map. Updates are send to this list. + ENTRY updates_entry_; + + /// The event types that are available in this map. + TAO_NS_EventTypeSeq event_types_; }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.inl b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.inl index d9f1ed75d56..7ea2cedce8c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Event_Map_T.inl @@ -19,14 +19,20 @@ TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::broadcast_collection (void) return this->broadcast_entry_.collection (); } -template <class PROXY, class ACE_LOCK> ACE_INLINE void -TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::attach_observer (TAO_NS_Event_Map_Observer* observer) +template <class PROXY, class ACE_LOCK> ACE_INLINE ACE_TYPENAME TAO_NS_Event_Map_Entry_T<PROXY>::COLLECTION* +TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::updates_collection (void) { - this->observer_ = observer; + return this->updates_entry_.collection (); } template <class PROXY, class ACE_LOCK> ACE_INLINE int -TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::event_type_count (void) +TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::proxy_count (void) +{ + return this->proxy_count_; +} + +template <class PROXY, class ACE_LOCK> ACE_INLINE const TAO_NS_EventTypeSeq& +TAO_NS_Event_Map_T<PROXY, ACE_LOCK>::event_types (void) { - return this->event_type_count_; + return this->event_types_; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp index b1178ba0030..e573e691f2a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Dispatch.cpp @@ -50,20 +50,18 @@ TAO_NS_Method_Request_Dispatch::execute (ACE_ENV_SINGLE_ARG_DECL) ACE_TRY { - this->proxy_supplier_->consumer ()->push (this->event_ ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + TAO_NS_Consumer* consumer = this->proxy_supplier_->consumer (); + + if (consumer != 0) + { + consumer->push (this->event_ ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } } - ACE_CATCH (CORBA::UserException, ue) - { - ACE_PRINT_EXCEPTION (ue, - "TAO_NS_Method_Request_Dispatch::: error sending event. "); - //ACE_RE_THROW; - } - ACE_CATCH (CORBA::SystemException, se) + ACE_CATCHANY { - ACE_PRINT_EXCEPTION (se, - "TAO_NS_Method_Request_Dispatch::: error sending event. "); - //ACE_RE_THROW; + if (TAO_debug_level > 0) + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_Method_Request_Dispatch::: error sending event. \n "); } ACE_ENDTRY; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp new file mode 100644 index 00000000000..60049cf0bef --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.cpp @@ -0,0 +1,55 @@ +// $Id$ + +#include "Method_Request_Updates.h" + +#if ! defined (__ACE_INLINE__) +#include "Method_Request_Updates.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Notify, TAO_NS_Method_Request_Updates, "$id$") + +#include "tao/debug.h" +#include "Proxy.h" +#include "Peer.h" + +TAO_NS_Method_Request_Updates::TAO_NS_Method_Request_Updates (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed, TAO_NS_Proxy* proxy) + :added_ (added), removed_ (removed), proxy_ (proxy), refcountable_guard_ (*proxy) +{ +} + +TAO_NS_Method_Request_Updates::~TAO_NS_Method_Request_Updates () +{ +} + +TAO_NS_Method_Request* +TAO_NS_Method_Request_Updates::copy (void) +{ + /// @@use factory + return new TAO_NS_Method_Request_Updates (this->added_, this->removed_, this->proxy_); +} + +int +TAO_NS_Method_Request_Updates::execute (ACE_ENV_SINGLE_ARG_DECL) +{ + if (this->proxy_->has_shutdown ()) + return 0; // If we were shutdown while waiting in the queue, return with no action. + + ACE_TRY + { + TAO_NS_Peer* peer = this->proxy_->peer(); + + if (peer != 0) + { + peer->dispatch_updates (this->added_, this->removed_ ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + } + ACE_CATCHANY + { + if (TAO_debug_level > 0) + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_Method_Request_Updates::execute error sending updates\n "); + } + ACE_ENDTRY; + + return 0; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h new file mode 100644 index 00000000000..00e68a21f18 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +/** + * @file Method_Request_Updates.h + * + * $Id$ + * + * @author Pradeep Gore <pradeep@oomworks.com> + * + * + */ + +#ifndef TAO_NS_METHOD_REQUEST_UPDATES_H +#define TAO_NS_METHOD_REQUEST_UPDATES_H +#include "ace/pre.h" + +#include "notify_export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "Method_Request.h" +#include "Types.h" +#include "EventTypeSeq.h" + +/** + * @class TAO_NS_Method_Request_Updates + * + * @brief + * + */ +class TAO_Notify_Export TAO_NS_Method_Request_Updates : public TAO_NS_Method_Request +{ +public: + /// Constuctor + TAO_NS_Method_Request_Updates (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed, TAO_NS_Proxy* proxy); + + /// Destructor + ~TAO_NS_Method_Request_Updates (); + + /// Create a copy of this object. + TAO_NS_Method_Request* copy (void); + + /// Execute the Request + virtual int execute (ACE_ENV_SINGLE_ARG_DECL); + +private: + /// The Updates + const TAO_NS_EventTypeSeq added_; + const TAO_NS_EventTypeSeq removed_; + + /// The proxy that will receive the updates. + TAO_NS_Proxy* proxy_; + + /// Guard to automatically inc/decr ref count on the proxy. + TAO_NS_Refcountable_Guard refcountable_guard_; +}; + +#if defined (__ACE_INLINE__) +#include "Method_Request_Updates.inl" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_NS_METHOD_REQUEST_UPDATES_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl new file mode 100644 index 00000000000..bf5cc3848c2 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Method_Request_Updates.inl @@ -0,0 +1,3 @@ +// $Id$ + +#include "Method_Request_Updates.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.h index e5bef05033d..89bf1bf98c9 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.h @@ -14,7 +14,6 @@ #define NOTIFY_EVENTCHANNELFACTORY_I_H #include "ace/pre.h" -#include "Notify_ID_Pool_T.h" #include "orbsvcs/CosNotifyChannelAdminS.h" #include "notify_export.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Signal_Property_T.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Signal_Property_T.h index 925ad33185d..cdc3e80d223 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Signal_Property_T.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Signal_Property_T.h @@ -56,12 +56,8 @@ public: virtual int wait_for_change (const ACE_Time_Value* abstime); private: - ACE_UNIMPLEMENTED_FUNC ( - TAO_Notify_Signal_Property ( - const TAO_Notify_Signal_Property<ACE_LOCK, TYPE> &rhs)) - ACE_UNIMPLEMENTED_FUNC ( - TAO_Notify_Signal_Property& operator= ( - const TAO_Notify_Signal_Property<ACE_LOCK, TYPE> &rhs)) + ACE_UNIMPLEMENTED_FUNC (TAO_Notify_Signal_Property (const TAO_Notify_Signal_Property<ACE_LOCK, TYPE> &rhs)) + ACE_UNIMPLEMENTED_FUNC (TAO_Notify_Signal_Property& operator= (const TAO_Notify_Signal_Property<ACE_LOCK, TYPE> &rhs)) ACE_Atomic_Op <ACE_LOCK, TYPE> value_; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp index 57a8ac2cbaa..388aec1a7ca 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Peer.cpp @@ -9,7 +9,6 @@ ACE_RCSID(Notify, TAO_NS_Peer, "$id$") #include "tao/debug.h" -#include "Dispatch_Observer_T.h" #include "Proxy.h" #include "Proxy.h" #include "Admin.h" @@ -18,7 +17,6 @@ ACE_RCSID(Notify, TAO_NS_Peer, "$id$") #include "Notify_Service.h" TAO_NS_Peer::TAO_NS_Peer (void) - :updates_dispatch_observer_ (0), retry_count_ (1) { } @@ -39,61 +37,75 @@ TAO_NS_Peer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) } void -TAO_NS_Peer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL) +TAO_NS_Peer::handle_dispatch_exception (ACE_ENV_SINGLE_ARG_DECL) { - TAO_NS_Proxy* proxy = this->proxy (); - TAO_NS_EventTypeSeq& added = proxy->added_; - TAO_NS_EventTypeSeq& removed = proxy->removed_; - - if (added.size () == 0 && removed.size () == 0) - return; // Return if nothing to send. - - TAO_NS_EventTypeSeq added_copy; - TAO_NS_EventTypeSeq removed_copy; - TAO_NS_Reverse_Lock unlock (proxy->lock_); - - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, proxy->lock_); - - added_copy = added; - removed_copy = removed; - added.reset (); - removed.reset (); + // Sever all association when a remote client misbehaves. Other strategies like reties are possible but not implemented. + this->proxy ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); +} +void +TAO_NS_Peer::dispatch_updates (const TAO_NS_EventTypeSeq & added, const TAO_NS_EventTypeSeq & removed ACE_ENV_ARG_DECL) +{ ACE_TRY { - { - ACE_GUARD (TAO_NS_Reverse_Lock, ace_mon, unlock); + CosNotification::EventTypeSeq cos_added; + CosNotification::EventTypeSeq cos_removed; - this->dispatch_updates_i (added_copy, removed_copy ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; + const TAO_NS_EventTypeSeq& subscribed_types = this->proxy ()->subscribed_types (); + const TAO_NS_EventType& special = TAO_NS_EventType::special (); - if (this->updates_dispatch_observer_ != 0) - this->updates_dispatch_observer_->dispatch_success (this ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - } + // Don;t inform of types that we already know about. + // E.g. if we're subscribed for {A,B,C,F} + // and we receive an update with added list {A,B,G} + // then, we should only send {G} because peer already knows about {A, B} + // However if we're subscribed for everything, send all kinds of adds. - this->retry_count_ = 0; - } - ACE_CATCHANY - { - if (TAO_debug_level > 0) - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "Peer:dispatch_pending serror sending updates\n "); - } - //ACE_RE_THROW; + // Don;t inform of removed types that we don;t care about. + // e.g. if we're currently subscribed for {A,B,C,F} + // and we receive an update with removed list {A, B, D} + // then, we should only send {A,B} because the peer is not interested in D. + // However if we're subscribed for everything, send all kinds of removes. - ++this->retry_count_; + TAO_NS_EventTypeSeq added_result = added; + TAO_NS_EventTypeSeq removed_result; - if (this->updates_dispatch_observer_ != 0) + if (subscribed_types.find (special) != 0) { - /// Restore the lists. - added.insert_seq (added_copy); - removed.insert_seq (removed_copy); + added_result.remove_seq (subscribed_types); + removed_result.intersection (subscribed_types, removed); + } + else + { + removed_result = removed; + } - ACE_GUARD (TAO_NS_Reverse_Lock, ace_mon, unlock); + added_result.populate_no_special (cos_added); + removed_result.populate_no_special (cos_removed); - this->updates_dispatch_observer_->dispatch_failure (this, this->retry_count_ ACE_ENV_ARG_PARAMETER); + if (cos_added.length () != 0 || cos_removed.length () != 0) + { + this->dispatch_updates_i (cos_added, cos_removed ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; } } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist) + { + this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::NO_IMPLEMENT, no_impl) + { + // The peer does not implement the offer/subscription_change method + // Do nothing. Later, perhaps set a flag that helps us decide if we should dispatch_updates_i. + } + ACE_CATCH (CORBA::SystemException, sysex) + { + this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + // Do nothing + } ACE_ENDTRY; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Peer.h b/TAO/orbsvcs/orbsvcs/Notify/Peer.h index 2d827073fda..658d9728711 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Peer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Peer.h @@ -22,14 +22,11 @@ #include "orbsvcs/CosNotificationC.h" #include "Destroy_Callback.h" #include "EventTypeSeq.h" -//#include "Types.h" class TAO_NS_Proxy; class TAO_NS_QoSProperties; class TAO_NS_Peer; -template <class PEER> class TAO_NS_Dispatch_Observer_T; -typedef TAO_NS_Dispatch_Observer_T<TAO_NS_Peer> TAO_NS_Updates_Dispatch_Observer; /** * @class TAO_NS_Peer @@ -47,38 +44,32 @@ public: /// Destructor virtual ~TAO_NS_Peer (); + /// This method sigantures deliberately match the RefCounting methods required for ESF Proxy + CORBA::ULong _incr_refcnt (void); + CORBA::ULong _decr_refcnt (void); + /// Shutdown the peer. virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL); - /// Install the updates observer. - void updates_dispatch_observer (TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer); - /// Access Proxy. virtual TAO_NS_Proxy* proxy (void) = 0; - /// This method sigantures deliberately match the RefCounting methods required for ESF Proxy - CORBA::ULong _incr_refcnt (void); - CORBA::ULong _decr_refcnt (void); - - /// Dispatch Pending. - void dispatch_pending (ACE_ENV_SINGLE_ARG_DECL); + // Dispatch updates + virtual void dispatch_updates (const TAO_NS_EventTypeSeq & added, + const TAO_NS_EventTypeSeq & removed + ACE_ENV_ARG_DECL); /// QoS changed notification from the Peer. virtual void qos_changed (TAO_NS_QoSProperties& qos_properties); + /// Handle dispatch exceptions. + void handle_dispatch_exception (ACE_ENV_SINGLE_ARG_DECL); + protected: - // Dispatch updates implementation. - virtual void dispatch_updates_i (const TAO_NS_EventTypeSeq & added, - const TAO_NS_EventTypeSeq & removed + /// Implementation of Peer specific dispatch_updates + virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added, + const CosNotification::EventTypeSeq& removed ACE_ENV_ARG_DECL) = 0; - - ///= Data Members - - // Updates Dispatch Observer - TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer_; - - /// Retry count. How many times have we failed to contact the remote peer? - int retry_count_; }; #if defined (__ACE_INLINE__) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Peer.inl b/TAO/orbsvcs/orbsvcs/Notify/Peer.inl index e466a588442..9fc05856b37 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Peer.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Peer.inl @@ -13,9 +13,3 @@ TAO_NS_Peer::_decr_refcnt (void) { return this->proxy ()->_decr_refcnt (); } - -ACE_INLINE void -TAO_NS_Peer::updates_dispatch_observer (TAO_NS_Updates_Dispatch_Observer* updates_dispatch_observer) -{ - this->updates_dispatch_observer_ = updates_dispatch_observer; -} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy.cpp b/TAO/orbsvcs/orbsvcs/Notify/Proxy.cpp index 66cd7650ccc..78c619faa23 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Proxy.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy.cpp @@ -14,12 +14,12 @@ ACE_RCSID(RT_Notify, TAO_NS_Proxy, "$Id$") #include "EventChannel.h" #include "EventChannelFactory.h" #include "Notify_Service.h" +#include "Method_Request_Updates.h" +#include "Worker_Task.h" TAO_NS_Proxy::TAO_NS_Proxy (void) :updates_off_ (0) { - // Set initial proxy mode to broadcast. - this->subscribed_types_.insert (TAO_NS_EventType::special ()); } TAO_NS_Proxy::~TAO_NS_Proxy () @@ -27,19 +27,11 @@ TAO_NS_Proxy::~TAO_NS_Proxy () } void -TAO_NS_Proxy::type_added (const TAO_NS_EventType& added) +TAO_NS_Proxy::types_changed (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL_NOT_USED) { - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); - this->added_.insert (added); - this->removed_.remove (added); -} + TAO_NS_Method_Request_Updates request (added, removed, this); -void -TAO_NS_Proxy::type_removed (const TAO_NS_EventType& removed) -{ - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_); - this->removed_.insert (removed); - this->added_.remove (removed); + this->worker_task ()->exec (request); } CORBA::Boolean @@ -67,7 +59,7 @@ TAO_NS_Proxy::check_filters (TAO_NS_Event_var &event ACE_ENV_ARG_DECL) } CosNotification::EventTypeSeq* -TAO_NS_Proxy::obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_ARG_DECL) +TAO_NS_Proxy::obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode, const TAO_NS_EventTypeSeq& types ACE_ENV_ARG_DECL) ACE_THROW_SPEC (( CORBA::SystemException )) @@ -84,7 +76,7 @@ TAO_NS_Proxy::obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_A if (mode == CosNotifyChannelAdmin::ALL_NOW_UPDATES_OFF || mode == CosNotifyChannelAdmin::ALL_NOW_UPDATES_ON) { - this->added_.populate (event_type_seq); + types.populate (event_type_seq); } if (mode == CosNotifyChannelAdmin::NONE_NOW_UPDATES_ON || diff --git a/TAO/orbsvcs/orbsvcs/Notify/Proxy.h b/TAO/orbsvcs/orbsvcs/Notify/Proxy.h index 0bb33c1a91d..5f49f2321e5 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Proxy.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Proxy.h @@ -49,11 +49,8 @@ public: /// Check if this event passes the admin and proxy filters. CORBA::Boolean check_filters (TAO_NS_Event_var &event ACE_ENV_ARG_DECL); - /// Subscription type added - void type_added (const TAO_NS_EventType& added); - - /// Subscription type removed - void type_removed (const TAO_NS_EventType& removed); + /// Inform this proxy that the following types are being advertised. + void types_changed (const TAO_NS_EventTypeSeq& added, const TAO_NS_EventTypeSeq& removed ACE_ENV_ARG_DECL); /// Have updates been turned off. CORBA::Boolean updates_off (void); @@ -64,16 +61,16 @@ public: /// Access our Peer. virtual TAO_NS_Peer* peer (void) = 0; - /// Obtain Types. - virtual CosNotification::EventTypeSeq* obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode ACE_ENV_ARG_DECL) + /// Implement the Obtain Types. + virtual CosNotification::EventTypeSeq* obtain_types (CosNotifyChannelAdmin::ObtainInfoMode mode, const TAO_NS_EventTypeSeq& types ACE_ENV_ARG_DECL) ACE_THROW_SPEC (( CORBA::SystemException )); /// Notification of subscriptions/offers set at the admin. - virtual void admin_subscription (const CosNotification::EventTypeSeq & added, - const CosNotification::EventTypeSeq & removed - ACE_ENV_ARG_DECL) = 0; + virtual void admin_types_changed (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL) = 0; /// Override, TAO_NS_Object::qos_changed @@ -85,12 +82,6 @@ protected: /// Filter Administration TAO_NS_FilterAdmin filter_admin_; - /// Types added. - TAO_NS_EventTypeSeq added_; - - /// Types removed. - TAO_NS_EventTypeSeq removed_; - /// The types that we're subscribed with the event manager. TAO_NS_EventTypeSeq subscribed_types_; diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp index da2911b74c2..aec4fa2a758 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.cpp @@ -64,15 +64,18 @@ TAO_NS_ProxyConsumer::connect (TAO_NS_Supplier *supplier ACE_ENV_ARG_DECL) { supplier_ = supplier; - supplier->updates_dispatch_observer (this->event_manager_->updates_dispatch_observer ()); - this->parent_->subscribed_types (this->subscribed_types_ ACE_ENV_ARG_PARAMETER); // get the parents subscribed types. ACE_CHECK; // Inform QoS values. supplier_->qos_changed (this->qos_properties_); - event_manager_->publish (this, this->subscribed_types_ ACE_ENV_ARG_PARAMETER); + TAO_NS_EventTypeSeq removed; + + this->event_manager_->offer_change (this, this->subscribed_types_, removed ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->event_manager_->connect (this ACE_ENV_ARG_PARAMETER); ACE_CHECK; // Increment the global supplier count @@ -83,7 +86,12 @@ TAO_NS_ProxyConsumer::connect (TAO_NS_Supplier *supplier ACE_ENV_ARG_DECL) void TAO_NS_ProxyConsumer::disconnect (ACE_ENV_SINGLE_ARG_DECL) { - event_manager_->un_publish (this, this->subscribed_types_ ACE_ENV_ARG_PARAMETER); + TAO_NS_EventTypeSeq added; + + event_manager_->offer_change (this, added, this->subscribed_types_ ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->event_manager_->disconnect (this ACE_ENV_ARG_PARAMETER); ACE_CHECK; // Decrement the global supplier count diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.h b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.h index 781a03b6a0e..30852afa1f4 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.h +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer.h @@ -53,6 +53,7 @@ public: /// Shutdown (TAO_NS_Container_T method) virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL); + /// Start event propagation. virtual void push (TAO_NS_Event_var &event); /// Access our Peer. @@ -61,10 +62,10 @@ public: /// Access the Supplier TAO_NS_Supplier* supplier (void); -protected: /// Return 1 if connected int is_connected (void); +protected: /// The Supplier that we're connect to. TAO_NS_Supplier* supplier_; }; diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp index 23bee77b8fc..90a166ec142 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.cpp @@ -24,9 +24,9 @@ TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::~TAO_NS_ProxyConsumer_T () } template <class SERVANT_TYPE> void -TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::admin_subscription (const CosNotification::EventTypeSeq & added, - const CosNotification::EventTypeSeq & removed - ACE_ENV_ARG_DECL) +TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::admin_types_changed (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL) { this->offer_change (added, removed ACE_ENV_ARG_PARAMETER); } @@ -57,17 +57,15 @@ TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::offer_change (const CosNotification::Event TAO_NS_EventTypeSeq seq_added (added); TAO_NS_EventTypeSeq seq_removed (removed); - ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, - CORBA::INTERNAL ()); - ACE_CHECK; + { + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, + CORBA::INTERNAL ()); + ACE_CHECK; - this->subscribed_types_.init (seq_added, seq_removed); + this->subscribed_types_.init (seq_added, seq_removed); + } - if (this->is_connected () == 1) - { - event_manager_->publish (this, seq_added ACE_ENV_ARG_PARAMETER); - event_manager_->un_publish (this, seq_removed ACE_ENV_ARG_PARAMETER); - } + this->event_manager_->offer_change (this, seq_added, seq_removed ACE_ENV_ARG_PARAMETER); } template <class SERVANT_TYPE> CosNotification::EventTypeSeq* @@ -76,7 +74,7 @@ TAO_NS_ProxyConsumer_T<SERVANT_TYPE>::obtain_subscription_types (CosNotifyChanne CORBA::SystemException )) { - return this->obtain_types (mode ACE_ENV_ARG_PARAMETER); + return this->obtain_types (mode, this->event_manager_->subscription_types () ACE_ENV_ARG_PARAMETER); } #endif /* TAO_NS_PROXYCONSUMER_T_CPP */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h index 475559088ab..6ccc7dcec1d 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxyConsumer_T.h @@ -39,9 +39,9 @@ public: ~TAO_NS_ProxyConsumer_T (); /// Notification of subscriptions set at the admin. - virtual void admin_subscription (const CosNotification::EventTypeSeq & added, - const CosNotification::EventTypeSeq & removed - ACE_ENV_ARG_DECL); + virtual void admin_types_changed (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL); virtual CosNotifyChannelAdmin::SupplierAdmin_ptr MyAdmin (ACE_ENV_SINGLE_ARG_DECL) ACE_THROW_SPEC (( diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp index dc9270191de..8b358a98682 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier.cpp @@ -47,7 +47,7 @@ TAO_NS_ProxySupplier::connect (TAO_NS_Consumer *consumer ACE_ENV_ARG_DECL) , CosEventChannelAdmin::AlreadyConnected )) { - const TAO_NS_Atomic_Property_Long& consumer_count = this->admin_properties_->consumers (); + TAO_NS_Atomic_Property_Long& consumer_count = this->admin_properties_->consumers (); const TAO_NS_Property_Long& max_consumers = this->admin_properties_->max_consumers (); if (max_consumers != 0 && @@ -67,24 +67,37 @@ TAO_NS_ProxySupplier::connect (TAO_NS_Consumer *consumer ACE_ENV_ARG_DECL) { consumer_ = consumer; - consumer->event_dispatch_observer (this->event_manager_->event_dispatch_observer ()); - consumer->updates_dispatch_observer (this->event_manager_->updates_dispatch_observer ()); - // Inform QoS values. consumer_->qos_changed (this->qos_properties_); this->parent_->subscribed_types (this->subscribed_types_ ACE_ENV_ARG_PARAMETER); // get the parents subscribed types. ACE_CHECK; - event_manager_->subscribe (this, this->subscribed_types_ ACE_ENV_ARG_PARAMETER); + TAO_NS_EventTypeSeq removed; + + this->event_manager_->subscription_change (this, this->subscribed_types_, removed ACE_ENV_ARG_PARAMETER); + + this->event_manager_->connect (this ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + // Increment the global consumer count + ++consumer_count; } } void TAO_NS_ProxySupplier::disconnect (ACE_ENV_SINGLE_ARG_DECL) { - event_manager_->un_subscribe (this, this->subscribed_types_ ACE_ENV_ARG_PARAMETER); + TAO_NS_EventTypeSeq added; + + this->event_manager_->subscription_change (this, added, this->subscribed_types_ ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + this->event_manager_->disconnect (this ACE_ENV_ARG_PARAMETER); ACE_CHECK; + + // Decrement the global consumer count + this->admin_properties_->consumers ()--; } void diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp index 38196db6784..7db52521e8b 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.cpp @@ -32,9 +32,9 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::~TAO_NS_ProxySupplier_T () } template <class SERVANT_TYPE> void -TAO_NS_ProxySupplier_T<SERVANT_TYPE>::admin_subscription (const CosNotification::EventTypeSeq & added, - const CosNotification::EventTypeSeq & removed - ACE_ENV_ARG_DECL) +TAO_NS_ProxySupplier_T<SERVANT_TYPE>::admin_types_changed (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL) { this->subscription_change (added, removed ACE_ENV_ARG_PARAMETER); } @@ -97,7 +97,7 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::obtain_offered_types (CosNotifyChannelAdmi CORBA::SystemException )) { - return this->obtain_types (mode ACE_ENV_ARG_PARAMETER); + return this->obtain_types (mode, this->event_manager_->offered_types () ACE_ENV_ARG_PARAMETER); } template <class SERVANT_TYPE> void @@ -110,17 +110,15 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::subscription_change (const CosNotification TAO_NS_EventTypeSeq seq_added (added); TAO_NS_EventTypeSeq seq_removed (removed); - ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, - CORBA::INTERNAL ()); - ACE_CHECK; + { + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, + CORBA::INTERNAL ()); + ACE_CHECK; - this->subscribed_types_.init (seq_added, seq_removed); + this->subscribed_types_.init (seq_added, seq_removed); + } - if (this->is_connected () == 1) - { - event_manager_->subscribe (this, seq_added ACE_ENV_ARG_PARAMETER); - event_manager_->un_subscribe (this, seq_removed ACE_ENV_ARG_PARAMETER); - } + this->event_manager_->subscription_change (this, seq_added, seq_removed ACE_ENV_ARG_PARAMETER); } template <class SERVANT_TYPE> void @@ -131,13 +129,15 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::suspend_connection (ACE_ENV_SINGLE_ARG_DEC CosNotifyChannelAdmin::NotConnected )) { - ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, CORBA::INTERNAL ()); + { + ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_, CORBA::INTERNAL ()); - if (this->is_connected () == 0) - ACE_THROW (CosNotifyChannelAdmin::NotConnected ()); + if (this->is_connected () == 0) + ACE_THROW (CosNotifyChannelAdmin::NotConnected ()); - if (this->consumer_->is_suspended () == 1) - ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyInactive ()); + if (this->consumer_->is_suspended () == 1) + ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyInactive ()); + } this->consumer_->suspend (ACE_ENV_SINGLE_ARG_PARAMETER); } @@ -160,7 +160,7 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::resume_connection (ACE_ENV_SINGLE_ARG_DECL ACE_THROW (CosNotifyChannelAdmin::ConnectionAlreadyActive ()); } - this->consumer_->suspend (ACE_ENV_SINGLE_ARG_PARAMETER); + this->consumer_->resume (ACE_ENV_SINGLE_ARG_PARAMETER); } template <class SERVANT_TYPE> CosNotifyChannelAdmin::ConsumerAdmin_ptr @@ -179,6 +179,8 @@ TAO_NS_ProxySupplier_T<SERVANT_TYPE>::MyAdmin (ACE_ENV_SINGLE_ARG_DECL) return ret._retn (); } +/***************************** UNIMPLEMENTED METHODS***************************************/ + template <class SERVANT_TYPE> CosNotifyFilter::MappingFilter_ptr TAO_NS_ProxySupplier_T<SERVANT_TYPE>::priority_filter (ACE_ENV_SINGLE_ARG_DECL) ACE_THROW_SPEC (( diff --git a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h index 1364569e244..c0e8d2054fb 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h +++ b/TAO/orbsvcs/orbsvcs/Notify/ProxySupplier_T.h @@ -39,9 +39,9 @@ public: ~TAO_NS_ProxySupplier_T (); /// Notification of subscriptions set at the admin. - virtual void admin_subscription (const CosNotification::EventTypeSeq & added, - const CosNotification::EventTypeSeq & removed - ACE_ENV_ARG_DECL); + virtual void admin_types_changed (const CosNotification::EventTypeSeq & added, + const CosNotification::EventTypeSeq & removed + ACE_ENV_ARG_DECL); ///= POA_Notify_Internal methods /// POA_Notify_Internal::Event_Forwarder method diff --git a/TAO/orbsvcs/orbsvcs/Notify/Refcountable.cpp b/TAO/orbsvcs/orbsvcs/Notify/Refcountable.cpp index 46178ac6b46..c8d1073a3f5 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Refcountable.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Refcountable.cpp @@ -25,7 +25,7 @@ TAO_NS_Refcountable::_incr_refcnt (void) { ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0); - if (TAO_debug_level > 0 ) + if (TAO_debug_level > 1 ) ACE_DEBUG ((LM_DEBUG,"object:%x incr refcount = %d\n", this, refcount_+1 )); @@ -38,7 +38,7 @@ TAO_NS_Refcountable::_decr_refcnt (void) { ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock_, 0); - if (TAO_debug_level > 0 ) + if (TAO_debug_level > 1 ) ACE_DEBUG ((LM_DEBUG,"object:%x decr refcount = %d\n", this, refcount_-1 )); this->refcount_--; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp index 161905dbfa9..363ebde793b 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Sequence/SequencePushConsumer.cpp @@ -15,7 +15,6 @@ ACE_RCSID(Notify, TAO_NS_SequencePushConsumer, "$id$") #include "../ProxySupplier.h" #include "../Worker_Task.h" #include "../Consumer.h" -#include "../Dispatch_Observer_T.h" #include "Method_Request_Dispatch_EventBatch.h" #include "EventBatch.h" @@ -186,34 +185,19 @@ TAO_NS_SequencePushConsumer::push (const TAO_NS_Event_Collection event_collectio this->push_consumer_->push_structured_events (event_batch ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; - - if (this->event_dispatch_observer_ != 0) - { - this->event_dispatch_observer_->dispatch_success (this ACE_ENV_ARG_PARAMETER); - - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); - this->retry_count_ = 0; - } + } + ACE_CATCH (CORBA::OBJECT_NOT_EXIST, not_exist) + { + this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCH (CORBA::SystemException, sysex) + { + this->handle_dispatch_exception (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; } ACE_CATCHANY { - if (TAO_debug_level > 0) - { - ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "TAO_NS_SequenceConsumer::push: error sending event. informing dispatch observer\n "); - } - //ACE_RE_THROW; - - if (this->event_dispatch_observer_ != 0) - { - { - ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ()); - - ++this->retry_count_; - this->event_batch_.insert (event_collection); - } - - this->event_dispatch_observer_->dispatch_failure (this, this->retry_count_ ACE_ENV_ARG_PARAMETER); - } } ACE_ENDTRY; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Subscription_Change_Worker.inl b/TAO/orbsvcs/orbsvcs/Notify/Subscription_Change_Worker.inl index 82ebfbe8c69..cb6180d5d7a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Subscription_Change_Worker.inl +++ b/TAO/orbsvcs/orbsvcs/Notify/Subscription_Change_Worker.inl @@ -5,5 +5,5 @@ ACE_INLINE void TAO_NS_Subscription_Change_Worker::work (TAO_NS_Proxy* proxy ACE_ENV_ARG_DECL) { - proxy->admin_subscription (this->added_, this->removed_ ACE_ENV_ARG_PARAMETER); + proxy->admin_types_changed (this->added_, this->removed_ ACE_ENV_ARG_PARAMETER); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Supplier.cpp b/TAO/orbsvcs/orbsvcs/Notify/Supplier.cpp index f1867c3e2fc..28d6883d524 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Supplier.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Supplier.cpp @@ -31,15 +31,9 @@ TAO_NS_Supplier::proxy (void) } void -TAO_NS_Supplier::dispatch_updates_i (const TAO_NS_EventTypeSeq & added, const TAO_NS_EventTypeSeq & removed +TAO_NS_Supplier::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed ACE_ENV_ARG_DECL) { - CosNotification::EventTypeSeq cos_added; - CosNotification::EventTypeSeq cos_removed; - - added.populate (cos_added); - removed.populate (cos_removed); - if (!CORBA::is_nil (this->subscribe_.in ())) - this->subscribe_->subscription_change (cos_added, cos_removed ACE_ENV_ARG_PARAMETER); + this->subscribe_->subscription_change (added, removed ACE_ENV_ARG_PARAMETER); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Supplier.h b/TAO/orbsvcs/orbsvcs/Notify/Supplier.h index 813560ae893..f24bddae07c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Supplier.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Supplier.h @@ -47,8 +47,8 @@ public: protected: /// Dispatch updates implementation. - virtual void dispatch_updates_i (const TAO_NS_EventTypeSeq & added, - const TAO_NS_EventTypeSeq & removed + virtual void dispatch_updates_i (const CosNotification::EventTypeSeq& added, + const CosNotification::EventTypeSeq& removed ACE_ENV_ARG_DECL); /// The proxy that we associate with. diff --git a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp index 9b605767204..51f1b14f39e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/ThreadPool_Task.cpp @@ -132,6 +132,19 @@ void TAO_NS_ThreadPool_Task::shutdown (void) { this->msg_queue_.enqueue (new TAO_NS_Method_Request_Shutdown (this)); + + // We can not wait for ourselves to quit + if (this->thr_mgr ()) + { + // call this->thr_mgr ()->task () in the main thread will assert () + // fail in ACE_Thread_Manager::thread_desc_self (void) so I get + // task this way. + ACE_Thread_Descriptor *mydesc = this->thr_mgr ()->thread_descriptor (ACE_OS::thr_self ()); + + if (mydesc && mydesc->task () == this) + return; + } + this->wait (); return; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Types.h b/TAO/orbsvcs/orbsvcs/Notify/Types.h index 210cdce699c..bc17a2bd0ec 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Types.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Types.h @@ -29,9 +29,6 @@ template <class PROXY> class TAO_ESF_Proxy_Collection; template <class TYPE> class TAO_ESF_RefCount_Guard; template <class PROXY, class ACE_LOCK> class TAO_NS_Event_Map_T; -template <class PEER> class TAO_NS_Dispatch_Observer_T; -template <class PROXY> class TAO_NS_Event_Map_Observer_T; -template <class PEER> class TAO_NS_Pending_Worker_T; /** * Forward declare classes @@ -72,14 +69,5 @@ typedef TAO_ESF_RefCount_Guard<CORBA::ULong> TAO_NS_Object_RefCount_Guard; typedef TAO_NS_Event_Map_T<TAO_NS_ProxySupplier, TAO_SYNCH_RW_MUTEX> TAO_NS_Consumer_Map; typedef TAO_NS_Event_Map_T<TAO_NS_ProxyConsumer, TAO_SYNCH_RW_MUTEX> TAO_NS_Supplier_Map; -typedef TAO_NS_Event_Map_Observer_T<TAO_NS_ProxyConsumer> TAO_NS_Consumer_Map_Observer; -typedef TAO_NS_Event_Map_Observer_T<TAO_NS_ProxySupplier> TAO_NS_Supplier_Map_Observer; - -typedef TAO_NS_Dispatch_Observer_T<TAO_NS_Peer> TAO_NS_Updates_Dispatch_Observer; -typedef TAO_NS_Dispatch_Observer_T<TAO_NS_Consumer> TAO_NS_Event_Dispatch_Observer; - -typedef TAO_NS_Pending_Worker_T<TAO_NS_Peer> TAO_NS_Updates_Pending_Worker; -typedef TAO_NS_Pending_Worker_T<TAO_NS_Consumer> TAO_NS_Event_Pending_Worker; - #include "ace/post.h" #endif /* TAO_NS_TYPES_H */ |