summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp238
1 files changed, 52 insertions, 186 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp
index 8271f4e3faa..0c7411f72d6 100644
--- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp
+++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp
@@ -6,62 +6,30 @@
#include "Notify_ProxySupplier_T.h"
#include "Notify_Event_Manager.h"
#include "Notify_ConsumerAdmin_i.h"
-#include "Notify_Factory.h"
-#include "Notify_Channel_Objects_Factory.h"
-#include "Notify_Event_Manager_Objects_Factory.h"
-#include "Notify_Worker_Task.h"
-#include "Notify_AdminProperties.h"
-
-ACE_RCSID(Notify, Notify_ProxySupplier_T, "$Id$")
template <class SERVANT_TYPE>
-TAO_Notify_ProxySupplier<SERVANT_TYPE>::TAO_Notify_ProxySupplier (TAO_Notify_ConsumerAdmin_i* consumer_admin)
- :consumer_admin_ (consumer_admin),
- is_suspended_ (0),
- dispatching_task_ (0),
- filter_eval_task_ (0)
+TAO_Notify_ProxySupplier<SERVANT_TYPE>::TAO_Notify_ProxySupplier (TAO_Notify_ConsumerAdmin_i* consumeradmin, TAO_Notify_Resource_Manager* resource_manager)
+ :TAO_Notify_Proxy<SERVANT_TYPE> (resource_manager),
+ myadmin_ (consumeradmin),
+ is_suspended_ (0)
{
- event_manager_ = consumer_admin->get_event_manager ();
+ event_manager_ = consumeradmin->get_event_manager ();
}
-template <class SERVANT_TYPE> void
-TAO_Notify_ProxySupplier<SERVANT_TYPE>::init (CosNotifyChannelAdmin::ProxyID proxy_id, CORBA::Environment& ACE_TRY_ENV)
+// Implementation skeleton destructor
+template <class SERVANT_TYPE>
+TAO_Notify_ProxySupplier<SERVANT_TYPE>::~TAO_Notify_ProxySupplier (void)
{
- consumer_admin_->_add_ref (ACE_TRY_ENV);
-
- this->proxy_id_ = proxy_id;
-
- TAO_Notify_CO_Factory* cof =
- TAO_Notify_Factory::get_channel_objects_factory ();
-
- this->lock_ = cof->create_proxy_supplier_lock (ACE_TRY_ENV);
-
- TAO_Notify_EMO_Factory* event_manager_objects_factory =
- TAO_Notify_Factory::get_event_manager_objects_factory ();
-
- // Create the task to forward filtering/dispatching commands to:
- this->dispatching_task_ =
- event_manager_objects_factory->create_dispatching_task (ACE_TRY_ENV);
- ACE_CHECK;
+ if (!is_destroyed_)
+ this->cleanup_i ();
- this->filter_eval_task_ =
- event_manager_objects_factory->create_listener_eval_task (ACE_TRY_ENV);
- ACE_CHECK;
-
- // Get hold of the admin properties.
- TAO_Notify_AdminProperties* const admin_properties =
- this->event_manager_->admin_properties ();
-
- // Init the tasks
- this->dispatching_task_->init_task (admin_properties);
- this->filter_eval_task_->init_task (admin_properties);
+ this->myadmin_->proxy_pushsupplier_destroyed (this->myID_);
}
-// Implementation skeleton destructor
-template <class SERVANT_TYPE>
-TAO_Notify_ProxySupplier<SERVANT_TYPE>::~TAO_Notify_ProxySupplier (void)
+template <class SERVANT_TYPE> void
+TAO_Notify_ProxySupplier<SERVANT_TYPE>::cleanup_i (CORBA::Environment& ACE_TRY_ENV)
{
- ACE_DECLARE_NEW_CORBA_ENV;
+ this->is_destroyed_ = 1;
this->event_manager_->unregister_from_publication_updates (this,
ACE_TRY_ENV);
@@ -73,42 +41,8 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::~TAO_Notify_ProxySupplier (void)
added.length (0);
this->event_manager_->subscribe_for_events (this,
+ 0,
added, removed, ACE_TRY_ENV);
-
- delete this->lock_;
-
- this->consumer_admin_->proxy_pushsupplier_destroyed (this->proxy_id_);
- consumer_admin_->_remove_ref (ACE_TRY_ENV);
-
- delete this->dispatching_task_;
- delete this->filter_eval_task_;
-}
-
-template <class SERVANT_TYPE> CORBA::Boolean
-TAO_Notify_ProxySupplier<SERVANT_TYPE>::evaluate_filter (TAO_Notify_Event &event, CORBA::Boolean eval_parent, CORBA::Environment &ACE_TRY_ENV)
-{
- if (eval_parent == 1)
- {
- CosNotifyChannelAdmin::InterFilterGroupOperator filter_operator =
- consumer_admin_->MyOperator (ACE_TRY_ENV);
- // Inter-filter group operator.
-
- CORBA::Boolean bval =
- this->consumer_admin_->get_filter_admin ().match (event, ACE_TRY_ENV);
- ACE_CHECK_RETURN (0);
-
- if ((bval == 1 && filter_operator == CosNotifyChannelAdmin::AND_OP) ||
- (bval == 0 && filter_operator == CosNotifyChannelAdmin::OR_OP))
- {
- return this->filter_admin_.match (event, ACE_TRY_ENV);
- }
- else if (bval == 1 && filter_operator == CosNotifyChannelAdmin::OR_OP)
- return 1;
- else
- return 0;
- }
- else
- return this->filter_admin_.match (event, ACE_TRY_ENV);
}
template <class SERVANT_TYPE> void
@@ -121,30 +55,34 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::dispatch_event (TAO_Notify_Event &event,
return;
}
- if (this->is_suspended_ == 1)
+ // check if it passes the parent filter.
+ CORBA::Boolean bval =
+ this->myadmin_->get_filter_admin ().match (event,
+ ACE_TRY_ENV);
+ ACE_CHECK;
+
+ if (bval == 0) // If the filter did not match, don't send the event.
+ return;
+
+ // Do we need to check our filter too.
+ if (myadmin_->MyOperator (ACE_TRY_ENV) == CosNotifyChannelAdmin::AND_OP)
{
- ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
+ bval = this->filter_admin_.match (event,
+ ACE_TRY_ENV);
ACE_CHECK;
+ if (bval == 0) // If the filter did not match, don't send the event.
+ return;
+ }
+
+ if (this->is_suspended_ == 1)
+ {
this->event_list_.enqueue_tail (event.clone ());
}
else
this->dispatch_event_i (event, ACE_TRY_ENV);
}
-template <class SERVANT_TYPE> TAO_Notify_Worker_Task*
-TAO_Notify_ProxySupplier<SERVANT_TYPE>::event_dispatch_task (void)
-{
- return this->dispatching_task_;
-}
-
-template <class SERVANT_TYPE> TAO_Notify_Worker_Task*
-TAO_Notify_ProxySupplier<SERVANT_TYPE>::filter_eval_task (void)
-{
- return this->filter_eval_task_;
-}
-
template <class SERVANT_TYPE> void
TAO_Notify_ProxySupplier<SERVANT_TYPE>::subscription_change (const CosNotification::EventTypeSeq & added, const CosNotification::EventTypeSeq & removed, CORBA::Environment &ACE_TRY_ENV)
ACE_THROW_SPEC ((
@@ -155,102 +93,36 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::subscription_change (const CosNotificati
if (this->is_connected_ == 1)
{
this->event_manager_->subscribe_for_events (this,
+ &this->subscription_list_,
added, removed, ACE_TRY_ENV);
}
-
- {
- ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- ACE_CHECK;
-
- // simply update our subscription list.
- this->subscription_list_.insert_seq (added);
- this->subscription_list_.remove_seq (removed);
- }
+ else // simply update our subscription list.
+ {
+ this->subscription_list_.insert_seq (added);
+ this->subscription_list_.remove_seq (removed);
+ }
}
template <class SERVANT_TYPE> void
TAO_Notify_ProxySupplier<SERVANT_TYPE>::on_connected (CORBA::Environment &ACE_TRY_ENV)
{
- // Get hold of the admin properties.
- TAO_Notify_AdminProperties* const admin_properties =
- this->event_manager_->admin_properties ();
-
- TAO_Notify_Property_Long* const consumer_count =
- admin_properties->consumers ();
-
- if (admin_properties->max_consumers () != 0 &&
- consumer_count->value () >= admin_properties->max_consumers ())
- ACE_THROW (CORBA::IMP_LIMIT ()); // we've reached the limit of consumers connected.
-
// register with CA
- this->consumer_admin_->register_listener (this, ACE_TRY_ENV);
+ this->myadmin_->register_listener (this, ACE_TRY_ENV);
ACE_CHECK;
- CosNotification::EventTypeSeq added;
-
- CosNotification::EventTypeSeq removed (0);
- removed.length (0);
-
// subscribe it to our current subscriptions.
- added.length (this->subscription_list_.size ());
+ CosNotification::EventTypeSeq added (this->subscription_list_.size ());
+ CosNotification::EventTypeSeq removed (0);
this->subscription_list_.populate (added);
+ removed.length (0);
this->event_manager_->subscribe_for_events (this,
+ 0,
added, removed, ACE_TRY_ENV);
ACE_CHECK;
this->event_manager_->register_for_publication_updates (this, ACE_TRY_ENV);
- ACE_CHECK;
-
- (*consumer_count)++;
-}
-
-template <class SERVANT_TYPE> void
-TAO_Notify_ProxySupplier<SERVANT_TYPE>::on_disconnected (CORBA::Environment &ACE_TRY_ENV)
-{
- {
- ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- ACE_CHECK;
-
- if (this->is_connected_ == 0)
- return;
-
- this->is_connected_ = 0;
- }
-
- CosNotification::EventTypeSeq removed;
-
- CosNotification::EventTypeSeq added (0);
- added.length (0);
-
- // unsubscribe it to our current subscriptions.
- removed.length (this->subscription_list_.size ());
-
- this->subscription_list_.populate (removed);
-
- this->event_manager_->subscribe_for_events (this,
- added, removed, ACE_TRY_ENV);
- ACE_CHECK;
-
- this->event_manager_->unregister_from_publication_updates (this, ACE_TRY_ENV);
- ACE_CHECK;
-
- // shutdown the tasks.
-
- this->dispatching_task_->shutdown (ACE_TRY_ENV);
- ACE_CHECK;
-
- this->filter_eval_task_->shutdown (ACE_TRY_ENV);
- ACE_CHECK;
-
- // Get hold of the admin properties.
- TAO_Notify_AdminProperties* const admin_properties =
- this->event_manager_->admin_properties ();
-
- (*(admin_properties->consumers ()))--;
}
template <class SERVANT_TYPE> void
@@ -274,19 +146,13 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::resume_connection (CORBA::Environment &A
{
TAO_Notify_Event* event;
- this->is_suspended_ = 0;
+ while (this->event_list_.dequeue_head (event) == 0)
+ {
+ this->dispatch_event_i (*event, ACE_TRY_ENV);
+ delete event;
+ }
- {
- ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_,
- CORBA::INTERNAL ());
- ACE_CHECK;
-
- while (this->event_list_.dequeue_head (event) == 0)
- {
- this->dispatch_event_i (*event, ACE_TRY_ENV);
- delete event;
- }
- }
+ this->is_suspended_ = 0;
}
template <class SERVANT_TYPE> CosNotifyChannelAdmin::ConsumerAdmin_ptr
@@ -295,7 +161,7 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::MyAdmin (CORBA::Environment &ACE_TRY_ENV
CORBA::SystemException
))
{
- return this->consumer_admin_->get_ref (ACE_TRY_ENV);
+ return this->myadmin_->get_ref (ACE_TRY_ENV);
}
template <class SERVANT_TYPE> CosNotifyFilter::MappingFilter_ptr