diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp | 238 |
1 files changed, 52 insertions, 186 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp index 8271f4e3faa..0c7411f72d6 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp @@ -6,62 +6,30 @@ #include "Notify_ProxySupplier_T.h" #include "Notify_Event_Manager.h" #include "Notify_ConsumerAdmin_i.h" -#include "Notify_Factory.h" -#include "Notify_Channel_Objects_Factory.h" -#include "Notify_Event_Manager_Objects_Factory.h" -#include "Notify_Worker_Task.h" -#include "Notify_AdminProperties.h" - -ACE_RCSID(Notify, Notify_ProxySupplier_T, "$Id$") template <class SERVANT_TYPE> -TAO_Notify_ProxySupplier<SERVANT_TYPE>::TAO_Notify_ProxySupplier (TAO_Notify_ConsumerAdmin_i* consumer_admin) - :consumer_admin_ (consumer_admin), - is_suspended_ (0), - dispatching_task_ (0), - filter_eval_task_ (0) +TAO_Notify_ProxySupplier<SERVANT_TYPE>::TAO_Notify_ProxySupplier (TAO_Notify_ConsumerAdmin_i* consumeradmin, TAO_Notify_Resource_Manager* resource_manager) + :TAO_Notify_Proxy<SERVANT_TYPE> (resource_manager), + myadmin_ (consumeradmin), + is_suspended_ (0) { - event_manager_ = consumer_admin->get_event_manager (); + event_manager_ = consumeradmin->get_event_manager (); } -template <class SERVANT_TYPE> void -TAO_Notify_ProxySupplier<SERVANT_TYPE>::init (CosNotifyChannelAdmin::ProxyID proxy_id, CORBA::Environment& ACE_TRY_ENV) +// Implementation skeleton destructor +template <class SERVANT_TYPE> +TAO_Notify_ProxySupplier<SERVANT_TYPE>::~TAO_Notify_ProxySupplier (void) { - consumer_admin_->_add_ref (ACE_TRY_ENV); - - this->proxy_id_ = proxy_id; - - TAO_Notify_CO_Factory* cof = - TAO_Notify_Factory::get_channel_objects_factory (); - - this->lock_ = cof->create_proxy_supplier_lock (ACE_TRY_ENV); - - TAO_Notify_EMO_Factory* event_manager_objects_factory = - TAO_Notify_Factory::get_event_manager_objects_factory (); - - // Create the task to forward filtering/dispatching commands to: - this->dispatching_task_ = - event_manager_objects_factory->create_dispatching_task (ACE_TRY_ENV); - ACE_CHECK; + if (!is_destroyed_) + this->cleanup_i (); - this->filter_eval_task_ = - event_manager_objects_factory->create_listener_eval_task (ACE_TRY_ENV); - ACE_CHECK; - - // Get hold of the admin properties. - TAO_Notify_AdminProperties* const admin_properties = - this->event_manager_->admin_properties (); - - // Init the tasks - this->dispatching_task_->init_task (admin_properties); - this->filter_eval_task_->init_task (admin_properties); + this->myadmin_->proxy_pushsupplier_destroyed (this->myID_); } -// Implementation skeleton destructor -template <class SERVANT_TYPE> -TAO_Notify_ProxySupplier<SERVANT_TYPE>::~TAO_Notify_ProxySupplier (void) +template <class SERVANT_TYPE> void +TAO_Notify_ProxySupplier<SERVANT_TYPE>::cleanup_i (CORBA::Environment& ACE_TRY_ENV) { - ACE_DECLARE_NEW_CORBA_ENV; + this->is_destroyed_ = 1; this->event_manager_->unregister_from_publication_updates (this, ACE_TRY_ENV); @@ -73,42 +41,8 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::~TAO_Notify_ProxySupplier (void) added.length (0); this->event_manager_->subscribe_for_events (this, + 0, added, removed, ACE_TRY_ENV); - - delete this->lock_; - - this->consumer_admin_->proxy_pushsupplier_destroyed (this->proxy_id_); - consumer_admin_->_remove_ref (ACE_TRY_ENV); - - delete this->dispatching_task_; - delete this->filter_eval_task_; -} - -template <class SERVANT_TYPE> CORBA::Boolean -TAO_Notify_ProxySupplier<SERVANT_TYPE>::evaluate_filter (TAO_Notify_Event &event, CORBA::Boolean eval_parent, CORBA::Environment &ACE_TRY_ENV) -{ - if (eval_parent == 1) - { - CosNotifyChannelAdmin::InterFilterGroupOperator filter_operator = - consumer_admin_->MyOperator (ACE_TRY_ENV); - // Inter-filter group operator. - - CORBA::Boolean bval = - this->consumer_admin_->get_filter_admin ().match (event, ACE_TRY_ENV); - ACE_CHECK_RETURN (0); - - if ((bval == 1 && filter_operator == CosNotifyChannelAdmin::AND_OP) || - (bval == 0 && filter_operator == CosNotifyChannelAdmin::OR_OP)) - { - return this->filter_admin_.match (event, ACE_TRY_ENV); - } - else if (bval == 1 && filter_operator == CosNotifyChannelAdmin::OR_OP) - return 1; - else - return 0; - } - else - return this->filter_admin_.match (event, ACE_TRY_ENV); } template <class SERVANT_TYPE> void @@ -121,30 +55,34 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::dispatch_event (TAO_Notify_Event &event, return; } - if (this->is_suspended_ == 1) + // check if it passes the parent filter. + CORBA::Boolean bval = + this->myadmin_->get_filter_admin ().match (event, + ACE_TRY_ENV); + ACE_CHECK; + + if (bval == 0) // If the filter did not match, don't send the event. + return; + + // Do we need to check our filter too. + if (myadmin_->MyOperator (ACE_TRY_ENV) == CosNotifyChannelAdmin::AND_OP) { - ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); + bval = this->filter_admin_.match (event, + ACE_TRY_ENV); ACE_CHECK; + if (bval == 0) // If the filter did not match, don't send the event. + return; + } + + if (this->is_suspended_ == 1) + { this->event_list_.enqueue_tail (event.clone ()); } else this->dispatch_event_i (event, ACE_TRY_ENV); } -template <class SERVANT_TYPE> TAO_Notify_Worker_Task* -TAO_Notify_ProxySupplier<SERVANT_TYPE>::event_dispatch_task (void) -{ - return this->dispatching_task_; -} - -template <class SERVANT_TYPE> TAO_Notify_Worker_Task* -TAO_Notify_ProxySupplier<SERVANT_TYPE>::filter_eval_task (void) -{ - return this->filter_eval_task_; -} - template <class SERVANT_TYPE> void TAO_Notify_ProxySupplier<SERVANT_TYPE>::subscription_change (const CosNotification::EventTypeSeq & added, const CosNotification::EventTypeSeq & removed, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC (( @@ -155,102 +93,36 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::subscription_change (const CosNotificati if (this->is_connected_ == 1) { this->event_manager_->subscribe_for_events (this, + &this->subscription_list_, added, removed, ACE_TRY_ENV); } - - { - ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - ACE_CHECK; - - // simply update our subscription list. - this->subscription_list_.insert_seq (added); - this->subscription_list_.remove_seq (removed); - } + else // simply update our subscription list. + { + this->subscription_list_.insert_seq (added); + this->subscription_list_.remove_seq (removed); + } } template <class SERVANT_TYPE> void TAO_Notify_ProxySupplier<SERVANT_TYPE>::on_connected (CORBA::Environment &ACE_TRY_ENV) { - // Get hold of the admin properties. - TAO_Notify_AdminProperties* const admin_properties = - this->event_manager_->admin_properties (); - - TAO_Notify_Property_Long* const consumer_count = - admin_properties->consumers (); - - if (admin_properties->max_consumers () != 0 && - consumer_count->value () >= admin_properties->max_consumers ()) - ACE_THROW (CORBA::IMP_LIMIT ()); // we've reached the limit of consumers connected. - // register with CA - this->consumer_admin_->register_listener (this, ACE_TRY_ENV); + this->myadmin_->register_listener (this, ACE_TRY_ENV); ACE_CHECK; - CosNotification::EventTypeSeq added; - - CosNotification::EventTypeSeq removed (0); - removed.length (0); - // subscribe it to our current subscriptions. - added.length (this->subscription_list_.size ()); + CosNotification::EventTypeSeq added (this->subscription_list_.size ()); + CosNotification::EventTypeSeq removed (0); this->subscription_list_.populate (added); + removed.length (0); this->event_manager_->subscribe_for_events (this, + 0, added, removed, ACE_TRY_ENV); ACE_CHECK; this->event_manager_->register_for_publication_updates (this, ACE_TRY_ENV); - ACE_CHECK; - - (*consumer_count)++; -} - -template <class SERVANT_TYPE> void -TAO_Notify_ProxySupplier<SERVANT_TYPE>::on_disconnected (CORBA::Environment &ACE_TRY_ENV) -{ - { - ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - ACE_CHECK; - - if (this->is_connected_ == 0) - return; - - this->is_connected_ = 0; - } - - CosNotification::EventTypeSeq removed; - - CosNotification::EventTypeSeq added (0); - added.length (0); - - // unsubscribe it to our current subscriptions. - removed.length (this->subscription_list_.size ()); - - this->subscription_list_.populate (removed); - - this->event_manager_->subscribe_for_events (this, - added, removed, ACE_TRY_ENV); - ACE_CHECK; - - this->event_manager_->unregister_from_publication_updates (this, ACE_TRY_ENV); - ACE_CHECK; - - // shutdown the tasks. - - this->dispatching_task_->shutdown (ACE_TRY_ENV); - ACE_CHECK; - - this->filter_eval_task_->shutdown (ACE_TRY_ENV); - ACE_CHECK; - - // Get hold of the admin properties. - TAO_Notify_AdminProperties* const admin_properties = - this->event_manager_->admin_properties (); - - (*(admin_properties->consumers ()))--; } template <class SERVANT_TYPE> void @@ -274,19 +146,13 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::resume_connection (CORBA::Environment &A { TAO_Notify_Event* event; - this->is_suspended_ = 0; + while (this->event_list_.dequeue_head (event) == 0) + { + this->dispatch_event_i (*event, ACE_TRY_ENV); + delete event; + } - { - ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - ACE_CHECK; - - while (this->event_list_.dequeue_head (event) == 0) - { - this->dispatch_event_i (*event, ACE_TRY_ENV); - delete event; - } - } + this->is_suspended_ = 0; } template <class SERVANT_TYPE> CosNotifyChannelAdmin::ConsumerAdmin_ptr @@ -295,7 +161,7 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::MyAdmin (CORBA::Environment &ACE_TRY_ENV CORBA::SystemException )) { - return this->consumer_admin_->get_ref (ACE_TRY_ENV); + return this->myadmin_->get_ref (ACE_TRY_ENV); } template <class SERVANT_TYPE> CosNotifyFilter::MappingFilter_ptr |