diff options
author | pradeep <pradeep@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-08-30 01:30:39 +0000 |
---|---|---|
committer | pradeep <pradeep@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2000-08-30 01:30:39 +0000 |
commit | fb143fbe9869dd1964f8c5e2f24c26954e53ed12 (patch) | |
tree | 3eb7a807b23d50f0d01189a369d1073575619871 | |
parent | 7a87a90524230fe8bb9fefe01a52f85a5116c986 (diff) | |
download | ATCD-fb143fbe9869dd1964f8c5e2f24c26954e53ed12.tar.gz |
Tue Aug 29 20:28:30 2000 Pradeep Gore <pradeep@cs.wustl.edu>
42 files changed, 748 insertions, 497 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Command.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Command.h index 52ad40fd060..d14b4ea5a66 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Command.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Command.h @@ -30,7 +30,8 @@ #include "tao/corba.h" #include "notify_export.h" -class TAO_Notify_Worker_Task; +class TAO_Notify_Event_Processor; +class TAO_Notify_Event; class TAO_Notify_Export TAO_Notify_Command : public ACE_Message_Block { @@ -38,13 +39,31 @@ class TAO_Notify_Export TAO_Notify_Command : public ACE_Message_Block // TAO_Notify_Command // // = DESCRIPTION - // + // Base class for Command Objects. + // Command objects are executed by the Notify_Worker_Task. // public: - virtual int execute (TAO_Notify_Worker_Task* parent_task, CORBA::Environment&) = 0; + TAO_Notify_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event); + ~TAO_Notify_Command (); + + virtual int execute (CORBA::Environment& ACE_TRY_ENV) = 0; // Command callback + +protected: + // = Data Members + TAO_Notify_Event_Processor* event_processor_; + // The command objects should notify the event processor once they have successfully + // executed commands. + + TAO_Notify_Event* event_; + // The event that we "execute" this command on. }; + +#if defined (__ACE_INLINE__) +#include "Notify_Command.i" +#endif /* __ACE_INLINE__ */ + #include "ace/post.h" #endif /* TAO_NOTIFY_COMMAND_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Command.i b/TAO/orbsvcs/orbsvcs/Notify/Notify_Command.i new file mode 100644 index 00000000000..35f981fd2ef --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Command.i @@ -0,0 +1,20 @@ +//$Id$ + +#include "Notify_Command.h" +#include "Notify_Event.h" + +ACE_INLINE +TAO_Notify_Command::TAO_Notify_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event) + :event_processor_ (event_processor), + event_ (event) +{ + // Set the parameters that affect queuing in the message queue. + if (this->event_) + this->msg_priority (this->event_->priority ()); + // this->msg_execution_time (this->event_->); +} + +ACE_INLINE +TAO_Notify_Command::~TAO_Notify_Command () +{ +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ConsumerAdmin_i.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_ConsumerAdmin_i.cpp index e3269f6ce32..35ee448cf20 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ConsumerAdmin_i.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ConsumerAdmin_i.cpp @@ -38,11 +38,11 @@ TAO_Notify_ConsumerAdmin_i::TAO_Notify_ConsumerAdmin_i (TAO_Notify_EventChannel_ collection_factory_ (TAO_Notify_Factory::get_collection_factory ()), event_manager_ (event_channel->get_event_manager ()), event_listener_list_ (0), - filter_eval_task_ (0), - dispatching_task_ (0) + dispatching_task_ (0), + filter_eval_task_ (0) { // @@ Pradeep: don't forget the this-> stuff for local variables. - event_channel_->_add_ref (); // we don't want our parent to go away! + this->event_channel_->_add_ref (); // we don't want our parent to go away! } // Implementation skeleton destructor @@ -58,6 +58,9 @@ TAO_Notify_ConsumerAdmin_i::~TAO_Notify_ConsumerAdmin_i (void) this->event_channel_->consumer_admin_destroyed (this->my_id_); event_channel_->_remove_ref (); + + delete this->dispatching_task_; + delete this->filter_eval_task_; } CORBA::ULong @@ -97,7 +100,7 @@ void TAO_Notify_ConsumerAdmin_i::dispatch_event (TAO_Notify_Event &event, CORBA::Environment &ACE_TRY_ENV) { // Dispatch the event to all the registered listeners. - TAO_Notify_Dispatch_Command_Worker worker (&event, this->dispatching_task_); + TAO_Notify_Dispatch_Command_Worker worker (&event, this->event_manager_->event_processor ()); // Propogate the filter command. this->event_listener_list_->for_each (&worker, ACE_TRY_ENV); @@ -116,7 +119,7 @@ TAO_Notify_ConsumerAdmin_i::evaluate_filter (TAO_Notify_Event &event, CORBA::Boo // If the filter operator requires that each listener attached to this admin be evaluated, // we feed the listeners to the "listener filter evaluation" task. - TAO_Notify_Filter_Command_Worker worker (&event, this->filter_eval_task_, 0); + TAO_Notify_Filter_Command_Worker worker (&event, this->event_manager_->event_processor (), 0); // note the last param. we ask that the parent filter *not* be evaluated again // because we've done it here. @@ -145,11 +148,8 @@ TAO_Notify_ConsumerAdmin_i::evaluate_filter (TAO_Notify_Event &event, CORBA::Boo } void -TAO_Notify_ConsumerAdmin_i::proxy_pushsupplier_destroyed (CosNotifyChannelAdmin::ProxyID proxyID) +TAO_Notify_ConsumerAdmin_i::proxy_pushsupplier_destroyed (CosNotifyChannelAdmin::ProxyID /*proxyID*/) { - ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); - - this->proxy_pushsupplier_ids_.put (proxyID); } void @@ -174,10 +174,20 @@ TAO_Notify_ConsumerAdmin_i::init (CosNotifyChannelAdmin::AdminID my_id, this->event_listener_list_ = this->collection_factory_->create_event_listener_list (ACE_TRY_ENV); + ACE_CHECK; - // Save the task to forward filtering/dispatching commands to: - this->filter_eval_task_ = this->event_manager_->event_processor ()->get_listener_filter_eval_task (); - this->dispatching_task_ = this->event_manager_->event_processor ()->get_dispatching_task (); + // Create the task to forward filtering/dispatching commands to: + this->dispatching_task_ = + this->event_manager_objects_factory_->create_dispatching_task (ACE_TRY_ENV); + ACE_CHECK; + + this->filter_eval_task_ = + this->event_manager_objects_factory_->create_listener_eval_task (ACE_TRY_ENV); + ACE_CHECK; + + // open the tasks + this->dispatching_task_->open (0); + this->filter_eval_task_->open (0); // Initially we set up things so that all listeners are subscribed for // all the events so that things "work" even if we don't twiddle with @@ -227,6 +237,12 @@ TAO_Notify_ConsumerAdmin_i::destroy_i (CORBA::Environment &ACE_TRY_ENV) this->poa_factory_->destroy_POA (this->proxy_pushsupplier_POA_.in (), ACE_TRY_ENV); ACE_CHECK; + + this->dispatching_task_->shutdown (ACE_TRY_ENV); + ACE_CHECK; + + this->filter_eval_task_->shutdown (ACE_TRY_ENV); + ACE_CHECK; } void @@ -248,6 +264,18 @@ TAO_Notify_ConsumerAdmin_i::shutdown (CORBA::Environment &ACE_TRY_ENV) this->destroy_i (ACE_TRY_ENV); } +TAO_Notify_Worker_Task* +TAO_Notify_ConsumerAdmin_i::event_dispatch_task (void) +{ + return this->dispatching_task_; +} + +TAO_Notify_Worker_Task* +TAO_Notify_ConsumerAdmin_i::filter_eval_task (void) +{ + return this->filter_eval_task_; +} + void TAO_Notify_ConsumerAdmin_i::subscription_change (const CosNotification::EventTypeSeq & added, const CosNotification::EventTypeSeq & removed, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC (( @@ -442,48 +470,35 @@ TAO_Notify_ConsumerAdmin_i::obtain_notification_push_supplier (CosNotifyChannelA ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxySupplier::_nil ()); proxy_id = this->proxy_pushsupplier_ids_.get (); - this->proxy_pushsupplier_ids_.next (); } - ACE_TRY + switch (ctype) { - switch (ctype) - { - case CosNotifyChannelAdmin::ANY_EVENT: - { - obj = this->obtain_proxy_pushsupplier_i (proxy_id, ACE_TRY_ENV); - ACE_TRY_CHECK; - } - break; - case CosNotifyChannelAdmin::STRUCTURED_EVENT: - { - obj = this->obtain_struct_proxy_pushsupplier_i (proxy_id, ACE_TRY_ENV); - ACE_TRY_CHECK; - } - break; - - case CosNotifyChannelAdmin::SEQUENCE_EVENT: - { - obj = this->obtain_sequence_proxy_pushsupplier_i (proxy_id, - ACE_TRY_ENV); - ACE_TRY_CHECK; - } - break; - - default: - ACE_THROW_RETURN (CORBA::BAD_PARAM (), - CosNotifyChannelAdmin::ProxySupplier::_nil ()); - } - - // this->proxy_pushsupplier_ids_.next (); + case CosNotifyChannelAdmin::ANY_EVENT: + { + obj = this->obtain_proxy_pushsupplier_i (proxy_id, ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxySupplier::_nil ()); + } + break; + case CosNotifyChannelAdmin::STRUCTURED_EVENT: + { + obj = this->obtain_struct_proxy_pushsupplier_i (proxy_id, ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxySupplier::_nil ()); + } + break; + + case CosNotifyChannelAdmin::SEQUENCE_EVENT: + { + obj = this->obtain_sequence_proxy_pushsupplier_i (proxy_id, + ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxySupplier::_nil ()); + } + break; + + default: + ACE_THROW_RETURN (CORBA::BAD_PARAM (), + CosNotifyChannelAdmin::ProxySupplier::_nil ()); } - ACE_CATCHALL - { - this->proxy_pushsupplier_ids_.put (proxy_id); - ACE_RE_THROW; - } - ACE_ENDTRY; - ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxySupplier::_nil ()); return CosNotifyChannelAdmin::ProxySupplier::_narrow (obj.in (), ACE_TRY_ENV); @@ -663,32 +678,25 @@ TAO_Notify_ConsumerAdmin_i::pull_suppliers (CORBA::Environment &ACE_TRY_ENV) /****************************************************************************************************/ -TAO_Notify_Filter_Command_Worker::TAO_Notify_Filter_Command_Worker (TAO_Notify_Event* event, TAO_Notify_Worker_Task* task, CORBA::Boolean eval_parent) - :event_ (event), - task_ (task), - eval_parent_ (eval_parent) +TAO_Notify_Filter_Command_Worker::TAO_Notify_Filter_Command_Worker (TAO_Notify_Event* event, TAO_Notify_Event_Processor* event_processor, CORBA::Boolean eval_parent) + : event_ (event), + event_processor_ (event_processor), + eval_parent_ (eval_parent) { } void TAO_Notify_Filter_Command_Worker::work (TAO_Notify_EventListener* event_listener, CORBA::Environment &ACE_TRY_ENV) { - // @@ Pradeep: you should use ACE_NEW here.... - // @@ Pradeep: do you really need to allocate this guy from the - // heap? Maybe you can just allocate it from the stack and only if - // somebody really wants to keep it around you make a copy? The - // idea is to save allocations in the good case. - TAO_Notify_Listener_Filter_Eval_Command* mb = - new TAO_Notify_Listener_Filter_Eval_Command (this->event_, event_listener, this->eval_parent_); - - this->task_->process_event (mb, ACE_TRY_ENV); + this->event_processor_->evaluate_listener_filter (this->event_, event_listener, + this->eval_parent_, ACE_TRY_ENV); } /****************************************************************************************************/ -TAO_Notify_Dispatch_Command_Worker::TAO_Notify_Dispatch_Command_Worker (TAO_Notify_Event* event, TAO_Notify_Worker_Task* task) - :event_ (event), - task_ (task) +TAO_Notify_Dispatch_Command_Worker::TAO_Notify_Dispatch_Command_Worker (TAO_Notify_Event* event, TAO_Notify_Event_Processor* event_processor) + : event_ (event), + event_processor_ (event_processor) { } @@ -699,10 +707,7 @@ TAO_Notify_Dispatch_Command_Worker::~TAO_Notify_Dispatch_Command_Worker () void TAO_Notify_Dispatch_Command_Worker::work (TAO_Notify_EventListener* event_listener, CORBA::Environment &ACE_TRY_ENV) { - TAO_Notify_Event_Dispatch_Command* dispatch = - new TAO_Notify_Event_Dispatch_Command (this->event_, event_listener); - - task_->process_event (dispatch, ACE_TRY_ENV); + this->event_processor_->dispatch_event (this->event_, event_listener, ACE_TRY_ENV); } /****************************************************************************************************/ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ConsumerAdmin_i.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_ConsumerAdmin_i.h index 72916cf408f..40457951016 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ConsumerAdmin_i.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ConsumerAdmin_i.h @@ -38,6 +38,7 @@ class TAO_Notify_POA_Factory; class TAO_Notify_EMO_Factory; class TAO_Notify_Collection_Factory; class TAO_Notify_Worker_Task; +class TAO_Notify_Event_Processor; #if defined(_MSC_VER) #if (_MSC_VER >= 1200) @@ -81,7 +82,13 @@ class TAO_Notify_Export TAO_Notify_ConsumerAdmin_i : public TAO_Notify_EventList virtual void shutdown (CORBA::Environment &ACE_TRY_ENV); // Ask the listener to relinqish any bindings and prepare to be disposed. + virtual TAO_Notify_Worker_Task* event_dispatch_task (void); + // The Worker task associated with the event listener for event dispatching + + virtual TAO_Notify_Worker_Task* filter_eval_task (void); + // The Worker task associated with the event listener for filter evaluation. //= Admin Methods. + void init (CosNotifyChannelAdmin::AdminID myID, CosNotifyChannelAdmin::InterFilterGroupOperator myOperator, PortableServer::POA_ptr my_POA, @@ -359,24 +366,24 @@ protected: TAO_Notify_EventType_List subscription_list_; // The list of event types that all our proxys are interested in receiving. - TAO_Notify_EventListener_List* event_listener_list_; + TAO_Notify_EventListener_List* event_listener_list_; // The list of event listeners that have registered with us - TAO_Notify_ID_Pool_Ex<CosNotifyChannelAdmin::ProxyID, + TAO_Notify_ID_Pool_Ex<CosNotifyChannelAdmin::ProxyID, CosNotifyChannelAdmin::ProxyIDSeq> proxy_pushsupplier_ids_; - // Id generator for proxy push suppliers. + // Id generator for proxy push suppliers. - TAO_Notify_QoSAdmin_i qos_admin_; - // Handle QoS admin methods. + TAO_Notify_QoSAdmin_i qos_admin_; + // Handle QoS admin methods. TAO_Notify_FilterAdmin_i filter_admin_; // Handles the Filter Admin methods. - TAO_Notify_Worker_Task* filter_eval_task_; - // The task to forward filter evaluation commands to. - TAO_Notify_Worker_Task* dispatching_task_; - // The task to forward event dispatching commands to. + // The dispatching task to send events to a listener group affiliated with this admin. + + TAO_Notify_Worker_Task* filter_eval_task_; + // The filter evaluation task for this admin. }; /****************************************************************************************************/ @@ -390,14 +397,14 @@ class TAO_Notify_Export TAO_Notify_Filter_Command_Worker : public TAO_ESF_Worker // Enqueue each listener for the filter evaluation command. // public: - TAO_Notify_Filter_Command_Worker (TAO_Notify_Event* event, TAO_Notify_Worker_Task* task, CORBA::Boolean eval_parent); + TAO_Notify_Filter_Command_Worker (TAO_Notify_Event* event, TAO_Notify_Event_Processor* event_processor, CORBA::Boolean eval_parent); // = TAO_ESF_Worker method void work (TAO_Notify_EventListener* listener, CORBA::Environment &ACE_TRY_ENV); protected: TAO_Notify_Event* event_; - TAO_Notify_Worker_Task* task_; + TAO_Notify_Event_Processor* event_processor_; CORBA::Boolean eval_parent_; }; @@ -412,7 +419,7 @@ class TAO_Notify_Export TAO_Notify_Dispatch_Command_Worker : public TAO_ESF_Work // Worker to invoke the dispatch command for each member of the collection. // public: - TAO_Notify_Dispatch_Command_Worker (TAO_Notify_Event* event, TAO_Notify_Worker_Task* task); + TAO_Notify_Dispatch_Command_Worker (TAO_Notify_Event* event, TAO_Notify_Event_Processor* event_processor); ~TAO_Notify_Dispatch_Command_Worker (); // = TAO_ESF_Worker method @@ -420,7 +427,7 @@ public: protected: TAO_Notify_Event* event_; - TAO_Notify_Worker_Task* task_; + TAO_Notify_Event_Processor* event_processor_; }; /****************************************************************************************************/ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_Collection_Factory.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_Collection_Factory.cpp index fe110261e28..04143a328aa 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_Collection_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_Collection_Factory.cpp @@ -1,8 +1,9 @@ // $Id$ +#include "Notify_Default_Collection_Factory.h" #include "orbsvcs/ESF/ESF_Proxy_List.h" #include "orbsvcs/ESF/ESF_Immediate_Changes.h" -#include "Notify_Default_Collection_Factory.h" +#include "orbsvcs/ESF/ESF_Copy_On_Write.h" #include "Notify_Factory.h" ACE_RCSID(Notify, Notify_Default_Collection_Factory, "$Id$") @@ -46,13 +47,19 @@ TAO_Notify_Default_Collection_Factory::create_event_listener_list (CORBA::Enviro // fails with any sort of recursive calls. TAO_Notify_EventListener_List* listener_list = + new TAO_ESF_Copy_On_Write<TAO_Notify_EventListener, + TAO_ESF_Proxy_List<TAO_Notify_EventListener>, + TAO_Notify_EventListener_List_Iterator, + ACE_SYNCH> (); + return listener_list; + + /* new TAO_ESF_Immediate_Changes<TAO_Notify_EventListener, TAO_ESF_Proxy_List<TAO_Notify_EventListener>, TAO_Notify_EventListener_List_Iterator, ACE_SYNCH_MUTEX> (); - return listener_list; - /*ACE_NEW_THROW_EX (listener_list, +ACE_NEW_THROW_EX (listener_list, TAO_ESF_Immediate_Changes<TAO_Notify_EventListener, TAO_ESF_Proxy_List<TAO_Notify_EventListener>, TAO_Notify_EventListener_List_Iterator, @@ -64,13 +71,23 @@ TAO_Notify_Default_Collection_Factory::create_event_listener_list (CORBA::Enviro TAO_Notify_UpdateListener_List* TAO_Notify_Default_Collection_Factory::create_update_listener_list (CORBA::Environment &/*ACE_TRY_ENV*/) { - TAO_Notify_UpdateListener_List* listener_list = + + /* TAO_Notify_UpdateListener_List* listener_list = new TAO_ESF_Immediate_Changes<TAO_Notify_UpdateListener, TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, TAO_Notify_UpdateListener_List_Iterator, ACE_SYNCH_MUTEX> (); // ACE_SYNCH_NULL_MUTEX return listener_list; + */ + + TAO_Notify_UpdateListener_List* listener_list = + new TAO_ESF_Copy_On_Write<TAO_Notify_UpdateListener, + TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, + TAO_Notify_UpdateListener_List_Iterator, + ACE_SYNCH> (); + return listener_list; + } // **************************************************************** @@ -94,6 +111,13 @@ template class TAO_ESF_Immediate_Changes<TAO_Notify_EventListener, TAO_ESF_Proxy template class TAO_ESF_Immediate_Changes<TAO_Notify_UpdateListener, TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_UpdateListener *>, ACE_SYNCH_MUTEX>; template class TAO_ESF_Immediate_Changes<TAO_Notify_UpdateListener, TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_UpdateListener *>, ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Write<TAO_Notify_EventListener, TAO_ESF_Proxy_List<TAO_Notify_EventListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_EventListener *>, ACE_SYNCH>; +template class TAO_ESF_Copy_On_Write<TAO_Notify_UpdateListener, TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_UpdateListener *>, ACE_SYNCH>; + +template class TAO_ESF_Copy_On_Write_Collection<TAO_ESF_Proxy_List<TAO_Notify_EventListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_EventListener *> >; + +template class TAO_ESF_Copy_On_Write_Collection<TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_UpdateListener *> >; + template class TAO_ESF_Proxy_List<TAO_Notify_EventListener>; template class TAO_ESF_Proxy_Collection<TAO_Notify_EventListener>; @@ -109,6 +133,14 @@ template class TAO_ESF_Proxy_Collection<TAO_Notify_UpdateListener>; #pragma instantiate TAO_ESF_Immediate_Changes<TAO_Notify_UpdateListener, TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_UpdateListener *>, ACE_SYNCH_MUTEX> #pragma instantiate TAO_ESF_Immediate_Changes<TAO_Notify_UpdateListener, TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_UpdateListener *>, ACE_Null_Mutex> + +#pragma instantiate TAO_ESF_Copy_On_Write<TAO_Notify_EventListener, TAO_ESF_Proxy_List<TAO_Notify_EventListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_EventListener *>, ACE_SYNCH> +#pragma instantiate TAO_ESF_Copy_On_Write<TAO_Notify_UpdateListener, TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_UpdateListener *>, ACE_SYNCH> + +#pragma instantiate TAO_ESF_Copy_On_Write_Collection<TAO_ESF_Proxy_List<TAO_Notify_EventListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_EventListener *> > + +#pragma instantiate TAO_ESF_Copy_On_Write_Collection<TAO_ESF_Proxy_List<TAO_Notify_UpdateListener>, ACE_Unbounded_Set_Iterator<TAO_Notify_UpdateListener *> > + #pragma instantiate TAO_ESF_Proxy_List<TAO_Notify_EventListener> #pragma instantiate TAO_ESF_Proxy_Collection<TAO_Notify_EventListener> diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp index 537c55f0e6f..3b9b34be88e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.cpp @@ -109,46 +109,45 @@ TAO_Notify_Default_EMO_Factory::create_event_processor (TAO_Notify_Event_Manager TAO_Notify_Worker_Task* -TAO_Notify_Default_EMO_Factory::create_source_eval_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV) +TAO_Notify_Default_EMO_Factory::create_source_eval_task (CORBA::Environment &ACE_TRY_ENV) { // @@ pass the correct option to initialize this as passive/active object. TAO_Notify_Worker_Task* task; ACE_NEW_THROW_EX (task, - TAO_Notify_Worker_Task (event_manager,this->mt_source_eval_), + TAO_Notify_Worker_Task (this->mt_source_eval_), CORBA::NO_MEMORY ()); return task; } TAO_Notify_Worker_Task* -TAO_Notify_Default_EMO_Factory::create_lookup_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV) +TAO_Notify_Default_EMO_Factory::create_lookup_task (CORBA::Environment &ACE_TRY_ENV) { // @@ pass the correct option to initialize this as passive/active object. TAO_Notify_Worker_Task* task; ACE_NEW_THROW_EX (task, - TAO_Notify_Worker_Task (event_manager,this->mt_lookup_), + TAO_Notify_Worker_Task (this->mt_lookup_), CORBA::NO_MEMORY ()); return task; } TAO_Notify_Worker_Task* -TAO_Notify_Default_EMO_Factory::create_listener_eval_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV) +TAO_Notify_Default_EMO_Factory::create_listener_eval_task (CORBA::Environment &ACE_TRY_ENV) { // @@ pass the correct option to initialize this as passive/active object. TAO_Notify_Worker_Task* task; ACE_NEW_THROW_EX (task, - TAO_Notify_Worker_Task (event_manager, this->mt_listener_eval_), + TAO_Notify_Worker_Task (this->mt_listener_eval_), CORBA::NO_MEMORY ()); return task; } TAO_Notify_Worker_Task* -TAO_Notify_Default_EMO_Factory::create_dispatching_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV) +TAO_Notify_Default_EMO_Factory::create_dispatching_task (CORBA::Environment &ACE_TRY_ENV) { // @@ pass the correct option to initialize this as passive/active object. TAO_Notify_Worker_Task* task; ACE_NEW_THROW_EX (task, - TAO_Notify_Worker_Task (event_manager, - this->mt_dispatching_), + TAO_Notify_Worker_Task (this->mt_dispatching_), CORBA::NO_MEMORY ()); return task; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.h index f5ecb820f4d..487b0122389 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Default_EMO_Factory.h @@ -58,10 +58,10 @@ class TAO_Notify_Export TAO_Notify_Default_EMO_Factory : public TAO_Notify_EMO_F // Create event processor. // = Create processing tasks. - virtual TAO_Notify_Worker_Task* create_source_eval_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV); - virtual TAO_Notify_Worker_Task* create_lookup_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV); - virtual TAO_Notify_Worker_Task* create_listener_eval_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV); - virtual TAO_Notify_Worker_Task* create_dispatching_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV); + virtual TAO_Notify_Worker_Task* create_source_eval_task (CORBA::Environment &ACE_TRY_ENV); + virtual TAO_Notify_Worker_Task* create_lookup_task (CORBA::Environment &ACE_TRY_ENV); + virtual TAO_Notify_Worker_Task* create_listener_eval_task ( CORBA::Environment &ACE_TRY_ENV); + virtual TAO_Notify_Worker_Task* create_dispatching_task (CORBA::Environment &ACE_TRY_ENV); protected: // = Params read via the svc.conf diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.cpp index efd0e1941e2..f5f85c32964 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.cpp @@ -1,6 +1,10 @@ // $Id$ #include "Notify_Event.h" +#if ! defined (__ACE_INLINE__) +#include "Notify_Event.i" +#endif /* __ACE_INLINE__ */ + ACE_RCSID(Notify, Notify_Event, "$Id$") // @@ Pradeep: David is going to give you a hard time from having a @@ -114,7 +118,12 @@ TAO_Notify_EventType::get_native (void) const TAO_Notify_Event::TAO_Notify_Event (void) :lock_ (0), - refcount_ (1) + refcount_ (1), + event_reliability_ (CosNotification::BestEffort), + priority_ (CosNotification::DefaultPriority), + // start_time_ (0), + // stop_time_ (0), + timeout_ (0) { lock_ = new ACE_Lock_Adapter<ACE_SYNCH_MUTEX> (); } @@ -258,6 +267,8 @@ TAO_Notify_StructuredEvent::TAO_Notify_StructuredEvent (CosNotification::Structu event_type_ (notification->header.fixed_header.event_type), is_owner_ (1) { + + this->init_QoS (); } TAO_Notify_StructuredEvent::TAO_Notify_StructuredEvent (const CosNotification::StructuredEvent * notification) @@ -273,6 +284,34 @@ TAO_Notify_StructuredEvent::~TAO_Notify_StructuredEvent () delete this->data_; } +void +TAO_Notify_StructuredEvent::init_QoS (void) +{ + CosNotification::PropertySeq& qos = data_->header.variable_header; + + for (CORBA::ULong index = 0; index < qos.length (); ++index) + { + ACE_CString property_name(qos[index].name); + + if (property_name.compare (CosNotification::Priority) == 0) + { + qos[index].value >>= this->priority_; + } + else if (property_name.compare (CosNotification::StartTime)) + { + // qos[index].value >>= this->start_time_; + } + else if (property_name.compare (CosNotification::StopTime)) + { + // qos[index].value >>= this->stop_time_; + } + else if (property_name.compare (CosNotification::Timeout)) + { + qos[index].value >>= this->timeout_; + } + } +} + TAO_Notify_Event* TAO_Notify_StructuredEvent::clone (void) { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.h index 7e3789f17e1..cc6c9efe38c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.h @@ -129,6 +129,29 @@ public: virtual void do_push (CosNotifyComm::StructuredPushConsumer_ptr consumer, CORBA::Environment &ACE_TRY_ENV) const = 0; // Push self to <consumer> + // = QoS Properties. + // + + CORBA::Short event_reliability (void); + void event_reliability (CORBA::Short event_reliability); + // Not implemented. + + CORBA::Short priority (void); + void priority (CORBA::Short priority); + // Event priority + + TimeBase::UtcT start_time (void); + void start_time (TimeBase::UtcT start_time); + // Earliest delivery time. + + TimeBase::UtcT stop_time (void); + void stop_time (TimeBase::UtcT stop_time); + // Latest absolute expiry time for this event. + + TimeBase::TimeT timeout (void); + void timeout (TimeBase::TimeT timeout); + // Relative expiry time. + // = Refcounted lifetime void _incr_refcnt (void); void _decr_refcnt (void); @@ -139,6 +162,13 @@ public: CORBA::ULong refcount_; // The reference count. + + // = QoS properties + CORBA::Short event_reliability_; + CORBA::Short priority_; + TimeBase::UtcT start_time_; + TimeBase::UtcT stop_time_; + TimeBase::TimeT timeout_; }; // **************************************************************** @@ -204,6 +234,11 @@ public: virtual void do_push (CosNotifyComm::StructuredPushConsumer_ptr consumer, CORBA::Environment &ACE_TRY_ENV) const; protected: + + void init_QoS (void); + // Load the QoS properties specified for this event from <data_>. + + // = Data Members CosNotification::StructuredEvent* data_; // The data @@ -214,6 +249,11 @@ protected: // Do we own the data. }; + +#if defined (__ACE_INLINE__) +#include "Notify_Event.i" +#endif /* __ACE_INLINE__ */ + #include "ace/post.h" #endif /* TAO_NOTIFY_EVENT_H */ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.i b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.i new file mode 100644 index 00000000000..2e1541c89fd --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event.i @@ -0,0 +1,63 @@ +// $Id$ + +#include "Notify_Event.h" + +ACE_INLINE CORBA::Short +TAO_Notify_Event::event_reliability (void) +{ + return this->event_reliability_; +} + +ACE_INLINE void +TAO_Notify_Event::event_reliability (CORBA::Short event_reliability) +{ + this->event_reliability_ = event_reliability; +} + +ACE_INLINE CORBA::Short +TAO_Notify_Event::priority (void) +{ + return this->priority_; +} + +ACE_INLINE void +TAO_Notify_Event::priority (CORBA::Short priority) +{ + this->priority_ = priority; +} + +ACE_INLINE TimeBase::UtcT +TAO_Notify_Event::start_time (void) +{ + return this->start_time_; +} + +ACE_INLINE void +TAO_Notify_Event::start_time (TimeBase::UtcT start_time) +{ + this->start_time_ = start_time; +} + +ACE_INLINE TimeBase::UtcT +TAO_Notify_Event::stop_time (void) +{ + return this->stop_time_; +} + +ACE_INLINE void +TAO_Notify_Event::stop_time (TimeBase::UtcT stop_time) +{ + this->stop_time_ = stop_time; +} + +ACE_INLINE TimeBase::TimeT +TAO_Notify_Event::timeout (void) +{ + return this->timeout_; +} + +ACE_INLINE void +TAO_Notify_Event::timeout (TimeBase::TimeT timeout) +{ + this->timeout_ = timeout; +} diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.cpp index bcb00683b20..0bf24fd02f6 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannelFactory_i.cpp @@ -80,9 +80,6 @@ TAO_Notify_EventChannelFactory_i::get_ref (CORBA::Environment &ACE_TRY_ENV) void TAO_Notify_EventChannelFactory_i::event_channel_destroyed (CosNotifyChannelAdmin::ChannelID channel_id) { - ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); - - this->ec_ids_.put (channel_id); ACE_DEBUG ((LM_DEBUG, "event_channel_destroyed %d\n", channel_id)); } @@ -114,8 +111,6 @@ TAO_Notify_EventChannelFactory_i::create_channel(const CosNotification::QoSPrope CosNotification::UnsupportedAdmin )) { - ACE_DEBUG ((LM_DEBUG, - "In TAO_Notify_EventChannelFactory_i::create_channel\n")); TAO_Notify_EventChannel_i* channel = this->channel_objects_factory_->create_event_channel (this, ACE_TRY_ENV); @@ -131,21 +126,18 @@ TAO_Notify_EventChannelFactory_i::create_channel(const CosNotification::QoSPrope ec_id = this->ec_ids_.get (); ACE_DEBUG ((LM_DEBUG, "event_channel created %d\n", ec_id)); - channel->init (ec_id, initial_qos, initial_admin, this->my_POA_.in (), this->ec_POA_.in (), - ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannel::_nil ()); + } - CORBA::Object_var obj = this->poa_factory_-> - activate_object_with_id (ec_id, - this->ec_POA_.in (), - channel, - ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannel::_nil ()); + channel->init (ec_id, initial_qos, initial_admin, this->my_POA_.in (), this->ec_POA_.in (), + ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannel::_nil ()); - this->ec_ids_.next (); + CORBA::Object_var obj = this->poa_factory_-> + activate_object_with_id (ec_id, this->ec_POA_.in (), channel, + ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::EventChannel::_nil ()); - return CosNotifyChannelAdmin::EventChannel::_narrow (obj.in ()); - } + return CosNotifyChannelAdmin::EventChannel::_narrow (obj.in ()); } CosNotifyChannelAdmin::ChannelIDSeq* diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannel_i.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannel_i.cpp index 97f7d0338ea..6b3237d7e0e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannel_i.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_EventChannel_i.cpp @@ -107,19 +107,13 @@ TAO_Notify_EventChannel_i::get_event_manager (void) } void -TAO_Notify_EventChannel_i::consumer_admin_destroyed (CosNotifyChannelAdmin::AdminID CA_ID) +TAO_Notify_EventChannel_i::consumer_admin_destroyed (CosNotifyChannelAdmin::AdminID /*CA_ID*/) { - ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); - - this->consumer_admin_ids_.put (CA_ID); } void -TAO_Notify_EventChannel_i::supplier_admin_destroyed (CosNotifyChannelAdmin::AdminID SA_ID) +TAO_Notify_EventChannel_i::supplier_admin_destroyed (CosNotifyChannelAdmin::AdminID /*SA_ID*/) { - ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); - - this->supplier_admin_ids_.put (SA_ID); } PortableServer::POA_ptr @@ -227,25 +221,23 @@ TAO_Notify_EventChannel_i::new_for_consumers (CosNotifyChannelAdmin::InterFilter ACE_CHECK_RETURN (CosNotifyChannelAdmin::ConsumerAdmin::_nil ()); id = this->consumer_admin_ids_.get (); + } - consumer_admin->init (id, op, this->CA_POA_.in (), ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::ConsumerAdmin::_nil ()); - - CORBA::Object_var obj = - this->poa_factory_->activate_object_with_id (id, - this->CA_POA_.in (), - consumer_admin, - ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::ConsumerAdmin::_nil ()); + consumer_admin->init (id, op, this->CA_POA_.in (), ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::ConsumerAdmin::_nil ()); - this->consumer_admin_ids_.next (); + CORBA::Object_var obj = + this->poa_factory_->activate_object_with_id (id, + this->CA_POA_.in (), + consumer_admin, + ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::ConsumerAdmin::_nil ()); - // Register the group listener. - this->event_listener_list_->connected (consumer_admin, ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::ConsumerAdmin::_nil ()); + // Register the group listener. + this->event_listener_list_->connected (consumer_admin, ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::ConsumerAdmin::_nil ()); - return CosNotifyChannelAdmin::ConsumerAdmin::_narrow (obj.in ()); - } + return CosNotifyChannelAdmin::ConsumerAdmin::_narrow (obj.in ()); } CosNotifyChannelAdmin::SupplierAdmin_ptr @@ -264,21 +256,19 @@ TAO_Notify_EventChannel_i::new_for_suppliers (CosNotifyChannelAdmin::InterFilter ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, CORBA::INTERNAL ()); id = this->supplier_admin_ids_.get (); + } - supplieradmin->init (id, op, this->SA_POA_.in (), ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::SupplierAdmin::_nil ()); - - CORBA::Object_var obj = this->poa_factory_-> - activate_object_with_id (id, - this->SA_POA_.in (), - supplieradmin, - ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::SupplierAdmin::_nil ()); + supplieradmin->init (id, op, this->SA_POA_.in (), ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::SupplierAdmin::_nil ()); - supplier_admin_ids_.next (); + CORBA::Object_var obj = this->poa_factory_-> + activate_object_with_id (id, + this->SA_POA_.in (), + supplieradmin, + ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::SupplierAdmin::_nil ()); - return CosNotifyChannelAdmin::SupplierAdmin::_narrow (obj.in ()); - } + return CosNotifyChannelAdmin::SupplierAdmin::_narrow (obj.in ()); } CosNotifyChannelAdmin::ConsumerAdmin_ptr diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Dispatch_Command.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Dispatch_Command.cpp index 1c4d8d15f7b..dad635d4c7f 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Dispatch_Command.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Dispatch_Command.cpp @@ -3,11 +3,12 @@ #include "Notify_Event_Dispatch_Command.h" #include "Notify_Listeners.h" #include "Notify_Event.h" +#include "Notify_Event_Processor.h" ACE_RCSID(Notify, TAO_Notify_Event_Dispatch_Command, "$Id$") -TAO_Notify_Event_Dispatch_Command::TAO_Notify_Event_Dispatch_Command (TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener) - :event_ (event), +TAO_Notify_Event_Dispatch_Command::TAO_Notify_Event_Dispatch_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener) + :TAO_Notify_Command (event_processor, event), event_listener_ (event_listener) { event_listener_->_incr_refcnt (); @@ -22,7 +23,7 @@ TAO_Notify_Event_Dispatch_Command::~TAO_Notify_Event_Dispatch_Command () } int -TAO_Notify_Event_Dispatch_Command::execute (TAO_Notify_Worker_Task* /*parent_task*/, CORBA::Environment& ACE_TRY_ENV) +TAO_Notify_Event_Dispatch_Command::execute (CORBA::Environment& ACE_TRY_ENV) { this->event_listener_->dispatch_event (*this->event_, ACE_TRY_ENV); ACE_CHECK_RETURN (-1); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Dispatch_Command.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Dispatch_Command.h index 4f6d58e54e3..342f6f3a066 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Dispatch_Command.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Dispatch_Command.h @@ -41,15 +41,15 @@ class TAO_Notify_Export TAO_Notify_Event_Dispatch_Command : public TAO_Notify_Co // public: // = Initialization and termination code. - TAO_Notify_Event_Dispatch_Command (TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener); + TAO_Notify_Event_Dispatch_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener); + ~TAO_Notify_Event_Dispatch_Command (); - virtual int execute (TAO_Notify_Worker_Task* parent_task, CORBA::Environment& ACE_TRY_ENV); + virtual int execute (CORBA::Environment& ACE_TRY_ENV); // Command callback protected: // = Data Members - TAO_Notify_Event* event_; TAO_Notify_EventListener* event_listener_; }; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.cpp index b7217bd560a..fc671fa57a6 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.cpp @@ -48,7 +48,7 @@ TAO_Notify_Event_Manager::init (CORBA::Environment &ACE_TRY_ENV) this->updates_dispatching_task_ = // @@ add another method to RM - this->emo_factory_->create_dispatching_task (this, ACE_TRY_ENV); + this->emo_factory_->create_dispatching_task (ACE_TRY_ENV); ACE_CHECK; // Init the objects diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.h index 4d06aaea1bb..93603d73068 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.h @@ -29,10 +29,6 @@ #include "Notify_Listeners.h" #include "Notify_Collection.h" -//#include "Notify_Listeners.h" -//#include "orbsvcs/ESF/ESF_Worker.h" -// - class TAO_Notify_EventChannel_i; class TAO_Notify_Event; class TAO_Notify_Event_Processor; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.i b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.i index 1b979a93d17..8de0334aab4 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.i +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager.i @@ -6,7 +6,8 @@ ACE_INLINE void TAO_Notify_Event_Manager::process_event (TAO_Notify_Event* event, TAO_Notify_EventSource* event_source, CORBA::Environment &ACE_TRY_ENV) { - this->event_processor_->process_event (event, event_source, ACE_TRY_ENV); + this->event_processor_->evaluate_source_filter (event, event_source, ACE_TRY_ENV); + // Start by checking if the event passes through the Source's filter. } ACE_INLINE void diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager_Objects_Factory.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager_Objects_Factory.h index bc6c4972ce5..ac927c3c62a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager_Objects_Factory.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Manager_Objects_Factory.h @@ -55,10 +55,10 @@ class TAO_Notify_Export TAO_Notify_EMO_Factory : public ACE_Service_Object // Create event processor. // = Create processing tasks. - virtual TAO_Notify_Worker_Task* create_source_eval_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV)= 0; - virtual TAO_Notify_Worker_Task* create_lookup_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV)= 0; - virtual TAO_Notify_Worker_Task* create_listener_eval_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV)= 0; - virtual TAO_Notify_Worker_Task* create_dispatching_task (TAO_Notify_Event_Manager* event_manager, CORBA::Environment &ACE_TRY_ENV)= 0; + virtual TAO_Notify_Worker_Task* create_source_eval_task (CORBA::Environment &ACE_TRY_ENV)= 0; + virtual TAO_Notify_Worker_Task* create_lookup_task (CORBA::Environment &ACE_TRY_ENV)= 0; + virtual TAO_Notify_Worker_Task* create_listener_eval_task (CORBA::Environment &ACE_TRY_ENV)= 0; + virtual TAO_Notify_Worker_Task* create_dispatching_task (CORBA::Environment &ACE_TRY_ENV)= 0; }; #include "ace/post.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Processor.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Processor.cpp index 863f329b0d1..dee2aaa6e0e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Processor.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Processor.cpp @@ -3,29 +3,26 @@ #include "Notify_Event_Processor.h" #include "Notify_Worker_Task.h" #include "Notify_Source_Filter_Eval_Command.h" -#include "Notify_Factory.h" +#include "Notify_Listener_Filter_Eval_Command.h" #include "Notify_Event_Manager_Objects_Factory.h" +#include "Notify_Lookup_Command.h" +#include "Notify_Event_Dispatch_Command.h" +#include "Notify_Factory.h" +#include "Notify_Listeners.h" +#include "Notify_Event_Manager.h" ACE_RCSID(Notify, Notify_Event_Processor, "$Id$") -#define NOTIFY_EVENT_PROCESSING_STAGES 4 - -#define NOTIFY_SOURCE_FILTER_EVAL_STAGE 0 -#define NOTIFY_LOOKUP_STAGE 1 -#define NOTIFY_LISTENER_FILTER_EVAL_STAGE 2 -#define NOTIFY_DISPATCHING_STAGE 3 - TAO_Notify_Event_Processor::TAO_Notify_Event_Processor (TAO_Notify_Event_Manager* event_manager) :event_manager_ (event_manager), - first_task_ (0), - listener_filter_eval_task_ (0), - dispatching_task_ (0), + lookup_task_ (0), emo_factory_ (0) { } TAO_Notify_Event_Processor::~TAO_Notify_Event_Processor () { + delete this->lookup_task_; } void @@ -34,92 +31,57 @@ TAO_Notify_Event_Processor::init (CORBA::Environment& ACE_TRY_ENV) this->emo_factory_ = TAO_Notify_Factory::get_event_manager_objects_factory (); - // = Create the tasks. - TAO_Notify_Worker_Task* tasks[NOTIFY_EVENT_PROCESSING_STAGES]; - - tasks[NOTIFY_SOURCE_FILTER_EVAL_STAGE] = - this->emo_factory_->create_source_eval_task (this->event_manager_, - ACE_TRY_ENV); - ACE_CHECK; - - tasks[NOTIFY_LOOKUP_STAGE] = - this->emo_factory_->create_lookup_task (this->event_manager_, - ACE_TRY_ENV); - ACE_CHECK; - - tasks[NOTIFY_LISTENER_FILTER_EVAL_STAGE] = - this->emo_factory_->create_listener_eval_task (this->event_manager_, - ACE_TRY_ENV); + this->lookup_task_ = this->emo_factory_->create_lookup_task (ACE_TRY_ENV); ACE_CHECK; - tasks[NOTIFY_DISPATCHING_STAGE] = - this->emo_factory_->create_dispatching_task (this->event_manager_, - ACE_TRY_ENV); - ACE_CHECK; + this->lookup_task_->open (0); +} - // = Create the Modules. - TAO_Notify_Module* modules[NOTIFY_EVENT_PROCESSING_STAGES]; - - ACE_NEW_THROW_EX (modules[NOTIFY_SOURCE_FILTER_EVAL_STAGE], - TAO_Notify_Module ("1", - tasks[NOTIFY_SOURCE_FILTER_EVAL_STAGE]), - CORBA::NO_MEMORY ()); - - ACE_NEW_THROW_EX (modules[NOTIFY_LOOKUP_STAGE], - TAO_Notify_Module ("2", - tasks[NOTIFY_LOOKUP_STAGE]), - CORBA::NO_MEMORY ()); - - ACE_NEW_THROW_EX (modules[NOTIFY_LISTENER_FILTER_EVAL_STAGE], - TAO_Notify_Module ("3", - tasks[NOTIFY_LISTENER_FILTER_EVAL_STAGE]), - CORBA::NO_MEMORY ()); - - ACE_NEW_THROW_EX (modules[NOTIFY_DISPATCHING_STAGE], - TAO_Notify_Module ("4", - tasks[NOTIFY_DISPATCHING_STAGE]), - CORBA::NO_MEMORY ()); - - for (int index = NOTIFY_EVENT_PROCESSING_STAGES -1; - index > -1; --index) - // push modules backworks - { - if (this->processing_stream_.push (modules[index]) == -1) - ACE_THROW (CORBA::INTERNAL ()); - } - - // set the first stream - this->first_task_ = tasks[NOTIFY_SOURCE_FILTER_EVAL_STAGE]; - this->listener_filter_eval_task_ = tasks [NOTIFY_LISTENER_FILTER_EVAL_STAGE]; - this->dispatching_task_ = tasks [NOTIFY_DISPATCHING_STAGE]; +void +TAO_Notify_Event_Processor::shutdown (CORBA::Environment & ACE_TRY_ENV) +{ + this->lookup_task_->shutdown (ACE_TRY_ENV); } -TAO_Notify_Worker_Task* -TAO_Notify_Event_Processor::get_listener_filter_eval_task (void) +void +TAO_Notify_Event_Processor::evaluate_source_filter (TAO_Notify_Event* event, TAO_Notify_EventSource* event_source, CORBA::Environment& ACE_TRY_ENV) { - return this->listener_filter_eval_task_; + // TODO: use cache allocator here. + TAO_Notify_Source_Filter_Eval_Command* mb = + new TAO_Notify_Source_Filter_Eval_Command (this, event, event_source); + + event_source->filter_eval_task ()->process_event (mb, ACE_TRY_ENV); } -TAO_Notify_Worker_Task* -TAO_Notify_Event_Processor::get_dispatching_task (void) +void +TAO_Notify_Event_Processor::lookup_subscriptions (TAO_Notify_Event* event, TAO_Notify_EventSource* /*event_source*/, CORBA::Environment &ACE_TRY_ENV) { - return this->dispatching_task_; + TAO_Notify_Lookup_Command* lookup = + new TAO_Notify_Lookup_Command (this, event, this->event_manager_->event_map ()); + + this->lookup_task_->process_event (lookup, ACE_TRY_ENV); } void -TAO_Notify_Event_Processor::shutdown (CORBA::Environment &/*ACE_TRY_ENV*/) +TAO_Notify_Event_Processor::evaluate_listener_filter (TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener, CORBA::Boolean eval_parent, CORBA::Environment &ACE_TRY_ENV) { - this->processing_stream_.close (); - // this->first_task_->shutdown (ACE_TRY_ENV); - // This will post a "shutdown" message to all linked tasks. + // @@ Pradeep: you should use ACE_NEW here.... + // @@ Pradeep: do you really need to allocate this guy from the + // heap? Maybe you can just allocate it from the stack and only if + // somebody really wants to keep it around you make a copy? The + // idea is to save allocations in the good case. + + TAO_Notify_Listener_Filter_Eval_Command* mb = + new TAO_Notify_Listener_Filter_Eval_Command (this, event, event_listener, eval_parent); + + event_listener->filter_eval_task ()->process_event (mb, ACE_TRY_ENV); } void -TAO_Notify_Event_Processor::process_event (TAO_Notify_Event* event, TAO_Notify_EventSource* event_source, CORBA::Environment& ACE_TRY_ENV) +TAO_Notify_Event_Processor::dispatch_event (TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener, CORBA::Environment &ACE_TRY_ENV) { - // TODO: use cache allocator here. - TAO_Notify_Source_Filter_Eval_Command* mb = - new TAO_Notify_Source_Filter_Eval_Command (event, event_source); + TAO_Notify_Event_Dispatch_Command* dispatch = + new TAO_Notify_Event_Dispatch_Command (this, event, event_listener); - this->first_task_->process_event (mb, ACE_TRY_ENV); + event_listener->event_dispatch_task ()->process_event (dispatch, ACE_TRY_ENV); } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Processor.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Processor.h index 882f1a997e6..909ac0f8b4e 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Processor.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Event_Processor.h @@ -47,6 +47,7 @@ class TAO_Notify_Event_Manager; class TAO_Notify_EventSource; class TAO_Notify_Worker_Task; class TAO_Notify_EMO_Factory; +class TAO_Notify_EventListener; class TAO_Notify_Export TAO_Notify_Event_Processor { @@ -67,28 +68,26 @@ class TAO_Notify_Export TAO_Notify_Event_Processor void shutdown (CORBA::Environment &ACE_TRY_ENV); // Shutdown operations. - void process_event (TAO_Notify_Event* event, TAO_Notify_EventSource* event_source, CORBA::Environment& ACE_TRY_ENV); + void evaluate_source_filter(TAO_Notify_Event* event, TAO_Notify_EventSource* event_source, CORBA::Environment& ACE_TRY_ENV); // Event processing entry point. - // = Accessors - TAO_Notify_Worker_Task* get_listener_filter_eval_task (void); - TAO_Notify_Worker_Task* get_dispatching_task (void); + // = Callbacks for Source/Event Listeners. + void lookup_subscriptions (TAO_Notify_Event* event, TAO_Notify_EventSource* event_source, CORBA::Environment &ACE_TRY_ENV); + // This method is called by an Event_Source after it has successfully evaluated its filter. - protected: - typedef ACE_Module<ACE_MT_SYNCH> TAO_Notify_Module; - typedef ACE_Stream<ACE_MT_SYNCH> TAO_Notify_Stream; + void evaluate_listener_filter (TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener, CORBA::Boolean eval_parent, CORBA::Environment &ACE_TRY_ENV); + // This method is called by the subscription lookup command asking that <event> be delivered + // to <event_listener>. + void dispatch_event (TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener, CORBA::Environment &ACE_TRY_ENV); + // This method is called by an Event_Listener after it has successfully evaluated its filter. + +protected: // = Data Members TAO_Notify_Event_Manager* event_manager_; // The Event Manager - TAO_Notify_Stream processing_stream_; - // The processing stream. - - TAO_Notify_Worker_Task* first_task_; - TAO_Notify_Worker_Task* listener_filter_eval_task_; - TAO_Notify_Worker_Task* dispatching_task_; - // Tasks required for direct access. + TAO_Notify_Worker_Task* lookup_task_; TAO_Notify_EMO_Factory* emo_factory_; // Factory for manager ojects diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_FilterAdmin_i.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_FilterAdmin_i.cpp index 76f96833df2..b339e9ef888 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_FilterAdmin_i.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_FilterAdmin_i.cpp @@ -63,10 +63,7 @@ TAO_Notify_FilterAdmin_i::add_filter ( ACE_THROW_RETURN (CORBA::INTERNAL (), 0); else - { - this->filter_ids_.next (); - return new_id; - } + return new_id; } void TAO_Notify_FilterAdmin_i::remove_filter ( diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Filter_i.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Filter_i.cpp index 8ca6c1f211b..e6f357901c3 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Filter_i.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Filter_i.cpp @@ -88,10 +88,6 @@ TAO_Notify_Filter_i::add_constraints_i ( for (u_int i = 0; i < index; i++) // those from previous iterations. { - // Put the id back to the pool. - constraint_expr_ids_. - put (constraint_info_seq[index].constraint_id); - // Unbind from the Hash Table if (constraint_expr_list_.unbind (constraint_info_seq[index].constraint_id, @@ -139,8 +135,6 @@ TAO_Notify_Filter_i::add_constraints ( // Get an id. (*infoseq)[pop_index].constraint_id = constraint_expr_ids_.get (); - // <add_constraints_i> will put the ids back to the id pool if things - // go wrong. } this->add_constraints_i (*infoseq, diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ID_Pool_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_ID_Pool_T.cpp index e978a4ae31c..5730e44a04d 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ID_Pool_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ID_Pool_T.cpp @@ -11,8 +11,7 @@ template <class ID_TYPE> TAO_Notify_ID_Pool<ID_TYPE>::TAO_Notify_ID_Pool (void) - :id_ (0), - max_id_ (0) + : max_id_ (0) { // No-Op. } @@ -23,46 +22,10 @@ TAO_Notify_ID_Pool<ID_TYPE>::~TAO_Notify_ID_Pool () // No-Op. } -template <class ID_TYPE> void -TAO_Notify_ID_Pool<ID_TYPE>::put (ID_TYPE id ) -{ - if (this->active_list_.remove (id) == 0) // if removed successfully.. - { - // return to reuse list. - this->reuse_list_.insert (id); - } -} - template <class ID_TYPE> ID_TYPE TAO_Notify_ID_Pool<ID_TYPE>::get (void) { - return this->id_; -} - -template <class ID_TYPE> void -TAO_Notify_ID_Pool<ID_TYPE>::next (void) -{ - this->active_list_.insert (this->id_); - - if (this->reuse_list_.is_empty ()) - { - ++this->max_id_; // stretch the upper limit on the window of ids. - this->id_ = this->max_id_; - } - else - { - // remove any id from reuse list - ID_TYPE* id_next; - - // find the first id. - // (I wish ACE_Unbounded_Set had a <remove_any> method.) - ACE_Unbounded_Set_Iterator<ID_TYPE> iter (this->reuse_list_); - iter.first (); - iter.next (id_next); - - this->id_ = *id_next; - this->reuse_list_.remove (*id_next); - } + return this->max_id_++; } template <class ID_TYPE, class ID_TYPE_SEQ> diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ID_Pool_T.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_ID_Pool_T.h index a2f67fc9cef..cc0c56e4a34 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ID_Pool_T.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ID_Pool_T.h @@ -8,7 +8,7 @@ // Notify_ID_Pool_T.h // // = DESCRIPTION -// A class to generate ID's and recycle them. +// A class to generate ID's. // // = AUTHOR // Pradeep Gore <pradeep@cs.wustl.edu> @@ -35,10 +35,7 @@ class TAO_Notify_ID_Pool // // = DESCRIPTION // This class is used by factories that need to associate id's with the - // objects that they create.When the objects are destroyed, these id's - // can be reused, hence the need to recycle. - // Using <get> several times in succession gives back the same id. - // Only when <next> is called, the current id is moved to the active_list_ + // objects that they create. // The very first id generated is always 0.The condition is necessary to // support ids for default objects that require an id of 0. // @@ -53,16 +50,7 @@ public: ID_TYPE get (void); // Returns the current id. - void put (ID_TYPE id); - // Put an ID back so that it might be recycled. - - void next (void); - // Generates the next id. - // The current id is moved to the active_list_. - protected: - ID_TYPE id_; - // The next available id. ID_TYPE max_id_; // This is the last id circulating in the list.When there are no more id's @@ -71,21 +59,31 @@ public: // @@ Pradeep: do you really need to do this? I mean, with a long // or so you have 4 billion different IDs, recycling them is the // least of your problems, right? + + // @@ Carlos: absolutely. I did not realize this. + // The recycling logic can be done away with! + // Even if you need to recycle them, do you *really* need to keep // *both* the active and free collections around? Isn't it enough // to know which one is the maximum generated and the list of free // ones? Anything else is active, right? // Or if you know the active ones and the maximum given you know // that anything else is free, right? + + // Carlos: right. with one list amd the max, the elements of the other list could be determined. + // Also: in most cases the IDs will index some sort of map, right? // You could obtain the active list from the set of keys in the map. + + // @@ Carlos: no, the <active_list_> list is the only list of id's. + // well, actually because i convert these ID's to ObjectID's and put them in POA's, + // i could get the list from the POA but there is no iterator in POA to get objectid's, + // + // I guess it is a classical tradeoff between CPU and memory, but I // would tend to conserve memory in this case... ACE_Unbounded_Set <ID_TYPE> active_list_; // List of ids currently in use by clients of this class. - - ACE_Unbounded_Set <ID_TYPE> reuse_list_; - // List of ids returned to this list and eligible to be used again. }; template <class ID_TYPE, class ID_TYPE_SEQ> diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Listener_Filter_Eval_Command.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Listener_Filter_Eval_Command.cpp index f4be3401dbf..e05ab096455 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Listener_Filter_Eval_Command.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Listener_Filter_Eval_Command.cpp @@ -6,11 +6,12 @@ #include "Notify_Worker_Task.h" #include "Notify_Listeners.h" #include "Notify_Event.h" +#include "Notify_Event_Processor.h" ACE_RCSID(Notify, Notify_Listener_Filter_Eval_Command, "$Id$") -TAO_Notify_Listener_Filter_Eval_Command::TAO_Notify_Listener_Filter_Eval_Command (TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener, CORBA::Boolean eval_parent) - :event_ (event), +TAO_Notify_Listener_Filter_Eval_Command::TAO_Notify_Listener_Filter_Eval_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener, CORBA::Boolean eval_parent) + :TAO_Notify_Command (event_processor, event), event_listener_ (event_listener), eval_parent_ (eval_parent) { @@ -25,8 +26,7 @@ TAO_Notify_Listener_Filter_Eval_Command::~TAO_Notify_Listener_Filter_Eval_Comman } int -TAO_Notify_Listener_Filter_Eval_Command::execute (TAO_Notify_Worker_Task* parent_task, - CORBA::Environment& ACE_TRY_ENV) +TAO_Notify_Listener_Filter_Eval_Command::execute (CORBA::Environment& ACE_TRY_ENV) { CORBA::Boolean result = this->event_listener_->evaluate_filter (*this->event_, this->eval_parent_, ACE_TRY_ENV); @@ -34,15 +34,10 @@ TAO_Notify_Listener_Filter_Eval_Command::execute (TAO_Notify_Worker_Task* parent if (result == 1) { - TAO_Notify_Event_Dispatch_Command* dispatch = - new TAO_Notify_Event_Dispatch_Command (this->event_, this->event_listener_); + this->event_processor_->dispatch_event (this->event_, this->event_listener_, ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); - ACE_ASSERT (parent_task->next () != 0); - - TAO_Notify_Worker_Task* next_task = - ACE_static_cast (TAO_Notify_Worker_Task*, parent_task->next()); - - return next_task->process_event (dispatch, ACE_TRY_ENV); + return 0; } return -1; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Listener_Filter_Eval_Command.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Listener_Filter_Eval_Command.h index 3e27fe99ba5..394339bac7c 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Listener_Filter_Eval_Command.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Listener_Filter_Eval_Command.h @@ -29,6 +29,7 @@ class TAO_Notify_Event; class TAO_Notify_EventListener; +class TAO_Notify_Event_Processor; class TAO_Notify_Export TAO_Notify_Listener_Filter_Eval_Command : public TAO_Notify_Command { @@ -39,17 +40,16 @@ class TAO_Notify_Export TAO_Notify_Listener_Filter_Eval_Command : public TAO_Not // Listener filter evaluation command. // public: - TAO_Notify_Listener_Filter_Eval_Command (TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener, CORBA::Boolean eval_parent); + TAO_Notify_Listener_Filter_Eval_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event, TAO_Notify_EventListener* event_listener, CORBA::Boolean eval_parent); // The event, listener, and hint to pass (see the listener interface for details) ~TAO_Notify_Listener_Filter_Eval_Command(); - virtual int execute (TAO_Notify_Worker_Task* parent_task, CORBA::Environment& ACE_TRY_ENV); + virtual int execute (CORBA::Environment& ACE_TRY_ENV); // Command callback protected: // = Data Members - TAO_Notify_Event* event_; TAO_Notify_EventListener* event_listener_; CORBA::Boolean eval_parent_; }; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Listeners.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Listeners.h index 17217a171ee..2424c8db0af 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Listeners.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Listeners.h @@ -29,6 +29,7 @@ class TAO_Notify_Event; class TAO_Notify_EventType_List; +class TAO_Notify_Worker_Task; // @@ Pradeep: this file has three separate classes, that do not seem // related in anyway, you should move them to their own files. @@ -79,11 +80,17 @@ public: // Evaluates true if this event is acceptable by the listener. // The <eval_parent> is a hint to the listener to help it determine // if its wise to evaluate the parents filter too. This helps in - // implementing the "interfilter group operator" logic. + // implementing the "interfilter group operator" logic. virtual void shutdown (CORBA::Environment &ACE_TRY_ENV) = 0; // Ask the listener to relinquish any bindings and prepare to be // disposed. + + virtual TAO_Notify_Worker_Task* event_dispatch_task (void) = 0; + // The Worker task associated with the event listener for event dispatching + + virtual TAO_Notify_Worker_Task* filter_eval_task (void) = 0; + // The Worker task associated with the event listener for filter evaluation. }; // **************************************************************** @@ -97,9 +104,14 @@ class TAO_Notify_Export TAO_Notify_EventSource : virtual public TAO_Notify_RefCo // The event source suppliers events to the Notify Manager. // public: + // TODO: add a shutdown method to this interface!! + virtual CORBA::Boolean evaluate_filter (TAO_Notify_Event &event, CORBA::Environment &ACE_TRY_ENV) = 0; // Evaluates true if this event is acceptable by the Source. + + virtual TAO_Notify_Worker_Task* filter_eval_task (void) = 0; + // The Worker task associated with the event listener for filter evaluation. }; // **************************************************************** diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Lookup_Command.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Lookup_Command.cpp index aa422a1d747..275be23fff7 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Lookup_Command.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Lookup_Command.cpp @@ -4,14 +4,15 @@ #include "Notify_Lookup_Command.h" #include "Notify_Event_Manager.h" #include "Notify_Listener_Filter_Eval_Command.h" -#include "Notify_Worker_Task.h" #include "Notify_Event.h" #include "Notify_Event_Map.h" +#include "Notify_Event_Processor.h" ACE_RCSID(Notify, Notify_Lookup_Command, "$Id$") -TAO_Notify_Lookup_Command::TAO_Notify_Lookup_Command (TAO_Notify_Event* event) - :event_ (event) +TAO_Notify_Lookup_Command::TAO_Notify_Lookup_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event, TAO_Notify_Event_Map* event_map) + :TAO_Notify_Command (event_processor, event), + event_map_ (event_map) { this->event_->_incr_refcnt (); } @@ -22,14 +23,8 @@ TAO_Notify_Lookup_Command::~TAO_Notify_Lookup_Command () } int -TAO_Notify_Lookup_Command::execute (TAO_Notify_Worker_Task* parent_task, CORBA::Environment& ACE_TRY_ENV) +TAO_Notify_Lookup_Command::execute (CORBA::Environment& ACE_TRY_ENV) { - this->parent_task_ = parent_task; - // There better be a next task and of type TAO_Notify_Worker_Task ! - ACE_ASSERT (parent_task->next () != 0); - this->next_task_ = ACE_static_cast (TAO_Notify_Worker_Task*, - parent_task->next()); - // If the event is *not* the special event // send it to the list that matches it. // In any case send it to the default list. @@ -39,15 +34,12 @@ TAO_Notify_Lookup_Command::execute (TAO_Notify_Worker_Task* parent_task, CORBA:: event->event_type ().event_type_.type_name.in ())); #endif - TAO_Notify_Event_Map* event_map = - this->parent_task_->event_manager ()->event_map (); - if (!this->event_->is_special_event_type ()) { TAO_Notify_EventListener_List* listener_list; // find the subscription list for <event_type> - if (event_map->find (this->event_->event_type (), + if (event_map_->find (this->event_->event_type (), listener_list) == 0) { listener_list->for_each (this, ACE_TRY_ENV); @@ -58,7 +50,7 @@ TAO_Notify_Lookup_Command::execute (TAO_Notify_Worker_Task* parent_task, CORBA:: // if (this->default_subscription_list_->is_empty () == 0) // @@ can't do this test - is_empty is not impl. { - event_map->default_subscription_list ()->for_each (this, ACE_TRY_ENV); + event_map_->default_subscription_list ()->for_each (this, ACE_TRY_ENV); ACE_CHECK_RETURN (-1); } @@ -69,9 +61,6 @@ void TAO_Notify_Lookup_Command::work (TAO_Notify_EventListener* event_listener, CORBA::Environment &ACE_TRY_ENV) { - TAO_Notify_Listener_Filter_Eval_Command* mb = - new TAO_Notify_Listener_Filter_Eval_Command (this->event_, event_listener, 1); - // Notr the last parameter, we want the parent filter to be evaluated. - - this->next_task_->process_event (mb, ACE_TRY_ENV); + this->event_processor_->evaluate_listener_filter (this->event_, event_listener, 1, ACE_TRY_ENV); + // Note the last parameter, we want the parent filter to be evaluated. } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Lookup_Command.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Lookup_Command.h index d7637726f01..6c01563fda3 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Lookup_Command.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Lookup_Command.h @@ -30,7 +30,7 @@ class TAO_Notify_Event; class TAO_Notify_EventListener; -class TAO_Notify_Worker_Task; +class TAO_Notify_Event_Map; class TAO_Notify_Export TAO_Notify_Lookup_Command : public TAO_Notify_Command, public TAO_ESF_Worker<TAO_Notify_EventListener> { @@ -42,21 +42,19 @@ class TAO_Notify_Export TAO_Notify_Lookup_Command : public TAO_Notify_Command, p // public: // = Initialization and termination code - TAO_Notify_Lookup_Command (TAO_Notify_Event* event); + TAO_Notify_Lookup_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event, TAO_Notify_Event_Map* event_map); + ~TAO_Notify_Lookup_Command (); - virtual int execute (TAO_Notify_Worker_Task* parent_task, CORBA::Environment&); + virtual int execute (CORBA::Environment&); // Command callback // = TAO_ESF_Worker method void work (TAO_Notify_EventListener* listener, CORBA::Environment &ACE_TRY_ENV); protected: // = Data member - TAO_Notify_Event* event_; - // The evnt we are processing - - TAO_Notify_Worker_Task* parent_task_; - TAO_Notify_Worker_Task* next_task_; + TAO_Notify_Event_Map* event_map_; + // The event map to lookup in. }; #include "ace/post.h" diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxyConsumer_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxyConsumer_T.cpp index 4bb02f0cf68..fae3bc9241a 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxyConsumer_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxyConsumer_T.cpp @@ -7,12 +7,15 @@ #include "Notify_SupplierAdmin_i.h" #include "Notify_Factory.h" #include "Notify_Channel_Objects_Factory.h" +#include "Notify_Event_Manager_Objects_Factory.h" +#include "Notify_Worker_Task.h" ACE_RCSID(Notify, Notify_ProxyConsumer_T, "$Id$") template <class SERVANT_TYPE> TAO_Notify_ProxyConsumer<SERVANT_TYPE>::TAO_Notify_ProxyConsumer (TAO_Notify_SupplierAdmin_i* supplier_admin) - : supplier_admin_ (supplier_admin) + : supplier_admin_ (supplier_admin), + filter_eval_task_ (0) { event_manager_ = supplier_admin->get_event_manager (); supplier_admin_->_add_ref (); @@ -27,6 +30,18 @@ TAO_Notify_ProxyConsumer<SERVANT_TYPE>::init (CosNotifyChannelAdmin::ProxyID pro TAO_Notify_Factory::get_channel_objects_factory (); this->lock_ = cof->create_proxy_consumer_lock (ACE_TRY_ENV); + + // Create the task to forward filtering commands to: + + TAO_Notify_EMO_Factory* event_manager_objects_factory = + TAO_Notify_Factory::get_event_manager_objects_factory (); + + this->filter_eval_task_ = + event_manager_objects_factory->create_listener_eval_task (ACE_TRY_ENV); + ACE_CHECK; + + // open the tasks + this->filter_eval_task_->open (0); } // Implementation skeleton destructor @@ -37,10 +52,18 @@ TAO_Notify_ProxyConsumer<SERVANT_TYPE>::~TAO_Notify_ProxyConsumer (void) this->event_manager_->unregister_from_subscription_updates (this, ACE_TRY_ENV); + ACE_CHECK; + delete this->lock_; this->supplier_admin_->proxy_pushconsumer_destroyed (this->proxy_id_); supplier_admin_->_remove_ref (); + + // @@: Move this to on_disconnected + this->filter_eval_task_->shutdown (ACE_TRY_ENV); + ACE_CHECK; + + delete this->filter_eval_task_; } template <class SERVANT_TYPE> CORBA::Boolean @@ -66,6 +89,12 @@ TAO_Notify_ProxyConsumer<SERVANT_TYPE>::evaluate_filter (TAO_Notify_Event &event ); } +template <class SERVANT_TYPE> TAO_Notify_Worker_Task* +TAO_Notify_ProxyConsumer<SERVANT_TYPE>::filter_eval_task (void) +{ + return this->filter_eval_task_; +} + template <class SERVANT_TYPE> void TAO_Notify_ProxyConsumer<SERVANT_TYPE>::on_connected (CORBA::Environment &ACE_TRY_ENV) { diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxyConsumer_T.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxyConsumer_T.h index d7cb4d8efbf..1f5242be110 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxyConsumer_T.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxyConsumer_T.h @@ -59,6 +59,9 @@ public: virtual CORBA::Boolean evaluate_filter (TAO_Notify_Event &event, CORBA::Environment &ACE_TRY_ENV); // Evaluates true if this event is acceptable by the Source. + TAO_Notify_Worker_Task* filter_eval_task (void); + // The Worker task associated with the event listener for filter evaluation. + virtual CosNotifyChannelAdmin::SupplierAdmin_ptr MyAdmin (CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC (( CORBA::SystemException @@ -82,14 +85,17 @@ virtual void offer_change ( CosNotifyComm::InvalidEventType )); - protected: -// = Helper methods - void on_connected (CORBA::Environment &ACE_TRY_ENV); - // Derived classes should call this when their suppliers connect. +protected: + // = Helper methods + void on_connected (CORBA::Environment &ACE_TRY_ENV); + // Derived classes should call this when their suppliers connect. + + // = Data members + TAO_Notify_SupplierAdmin_i* supplier_admin_; + // My parent supplier admin. - // = Data members - TAO_Notify_SupplierAdmin_i* supplier_admin_; - // My parent supplier admin. + TAO_Notify_Worker_Task* filter_eval_task_; + // The filter evaluation task for this listener. }; #if defined (ACE_TEMPLATES_REQUIRE_SOURCE) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp index 9a73c5e1729..022df9b5b72 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.cpp @@ -8,13 +8,17 @@ #include "Notify_ConsumerAdmin_i.h" #include "Notify_Factory.h" #include "Notify_Channel_Objects_Factory.h" +#include "Notify_Event_Manager_Objects_Factory.h" +#include "Notify_Worker_Task.h" ACE_RCSID(Notify, Notify_ProxySupplier_T, "$Id$") template <class SERVANT_TYPE> TAO_Notify_ProxySupplier<SERVANT_TYPE>::TAO_Notify_ProxySupplier (TAO_Notify_ConsumerAdmin_i* consumer_admin) :consumer_admin_ (consumer_admin), - is_suspended_ (0) + is_suspended_ (0), + dispatching_task_ (0), + filter_eval_task_ (0) { event_manager_ = consumer_admin->get_event_manager (); } @@ -30,6 +34,22 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::init (CosNotifyChannelAdmin::ProxyID pro TAO_Notify_Factory::get_channel_objects_factory (); this->lock_ = cof->create_proxy_supplier_lock (ACE_TRY_ENV); + + TAO_Notify_EMO_Factory* event_manager_objects_factory = + TAO_Notify_Factory::get_event_manager_objects_factory (); + + // Create the task to forward filtering/dispatching commands to: + this->dispatching_task_ = + event_manager_objects_factory->create_dispatching_task (ACE_TRY_ENV); + ACE_CHECK; + + this->filter_eval_task_ = + event_manager_objects_factory->create_listener_eval_task (ACE_TRY_ENV); + ACE_CHECK; + + // open the tasks + this->dispatching_task_->open (0); + this->filter_eval_task_->open (0); } // Implementation skeleton destructor @@ -54,6 +74,9 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::~TAO_Notify_ProxySupplier (void) this->consumer_admin_->proxy_pushsupplier_destroyed (this->proxy_id_); consumer_admin_->_remove_ref (ACE_TRY_ENV); + + delete this->dispatching_task_; + delete this->filter_eval_task_; } template <class SERVANT_TYPE> CORBA::Boolean @@ -105,6 +128,18 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::dispatch_event (TAO_Notify_Event &event, this->dispatch_event_i (event, ACE_TRY_ENV); } +template <class SERVANT_TYPE> TAO_Notify_Worker_Task* +TAO_Notify_ProxySupplier<SERVANT_TYPE>::event_dispatch_task (void) +{ + return this->dispatching_task_; +} + +template <class SERVANT_TYPE> TAO_Notify_Worker_Task* +TAO_Notify_ProxySupplier<SERVANT_TYPE>::filter_eval_task (void) +{ + return this->filter_eval_task_; +} + template <class SERVANT_TYPE> void TAO_Notify_ProxySupplier<SERVANT_TYPE>::subscription_change (const CosNotification::EventTypeSeq & added, const CosNotification::EventTypeSeq & removed, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC (( @@ -157,14 +192,16 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::on_connected (CORBA::Environment &ACE_TR template <class SERVANT_TYPE> void TAO_Notify_ProxySupplier<SERVANT_TYPE>::on_disconnected (CORBA::Environment &ACE_TRY_ENV) { - ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, - CORBA::INTERNAL ()); - ACE_CHECK; + { + ACE_GUARD_THROW_EX (ACE_Lock, ace_mon, *this->lock_, + CORBA::INTERNAL ()); + ACE_CHECK; - if (this->is_connected_ == 0) - return; + if (this->is_connected_ == 0) + return; - this->is_connected_ = 0; + this->is_connected_ = 0; + } CosNotification::EventTypeSeq removed; @@ -182,6 +219,14 @@ TAO_Notify_ProxySupplier<SERVANT_TYPE>::on_disconnected (CORBA::Environment &ACE this->event_manager_->unregister_from_publication_updates (this, ACE_TRY_ENV); ACE_CHECK; + + // shutdown the tasks. + + this->dispatching_task_->shutdown (ACE_TRY_ENV); + ACE_CHECK; + + this->filter_eval_task_->shutdown (ACE_TRY_ENV); + ACE_CHECK; } template <class SERVANT_TYPE> void diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.h index 240f89ca5e3..a44601f32d6 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_ProxySupplier_T.h @@ -52,8 +52,15 @@ public: // = Notify_Event_Listener methods virtual void dispatch_event (TAO_Notify_Event &event, CORBA::Environment &ACE_TRY_ENV); + virtual CORBA::Boolean evaluate_filter (TAO_Notify_Event &event, CORBA::Boolean eval_parent, CORBA::Environment &ACE_TRY_ENV); + virtual TAO_Notify_Worker_Task* event_dispatch_task (void); + // The Worker task associated with the event listener for event dispatching + + virtual TAO_Notify_Worker_Task* filter_eval_task (void); + // The Worker task associated with the event listener for filter evaluation. + // = Interface methods virtual CosNotifyChannelAdmin::ConsumerAdmin_ptr MyAdmin ( CORBA::Environment &ACE_TRY_ENV @@ -152,6 +159,12 @@ public: typedef ACE_Unbounded_Queue<TAO_Notify_Event*> TAO_Notify_Event_List; TAO_Notify_Event_List event_list_; // A list of events populated when we're suspended. + + TAO_Notify_Worker_Task* dispatching_task_; + // The dispatching task to send events to a listener group affiliated with this listener. + + TAO_Notify_Worker_Task* filter_eval_task_; + // The filter evaluation task for this listener. }; #if defined (ACE_TEMPLATES_REQUIRE_SOURCE) diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_QoSAdmin_i.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_QoSAdmin_i.cpp index 6267d640b62..01d329da0ba 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_QoSAdmin_i.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_QoSAdmin_i.cpp @@ -6,6 +6,17 @@ ACE_RCSID(Notify, Notify_QoSAdmin_i, "$Id$") // Implementation skeleton constructor TAO_Notify_QoSAdmin_i::TAO_Notify_QoSAdmin_i (void) + :event_reliability_ (CosNotification::BestEffort), + connection_reliability_ (CosNotification::BestEffort), + priority_ (CosNotification::DefaultPriority), + timeout_ (0), + start_time_supported_ (0), + stop_time_supported_ (0), + max_events_per_consumer_ (0), + order_policy_ (CosNotification::AnyOrder), + discard_policy_ (CosNotification::AnyOrder), + maximum_batch_size_ (1), + pacing_interval_ (0) { } @@ -26,16 +37,75 @@ CosNotification::QoSProperties * TAO_Notify_QoSAdmin_i::get_qos ( } void -TAO_Notify_QoSAdmin_i::set_qos ( - const CosNotification::QoSProperties & /*qos*/, - CORBA::Environment & //ACE_TRY_ENV +TAO_Notify_QoSAdmin_i::set_qos (const CosNotification::QoSProperties & qos, + CORBA::Environment & ACE_TRY_ENV ) ACE_THROW_SPEC (( CORBA::SystemException, CosNotification::UnsupportedQoS )) { - //Add your implementation here + for (CORBA::ULong index = 0; index < qos.length (); ++index) + { + ACE_CString property_name(qos[index].name); + + if (property_name.compare (CosNotification::EventReliability) == 0) + { + CosNotification::PropertyErrorSeq err_seq(1); + err_seq.length (1); + + err_seq[0].code = CosNotification::UNSUPPORTED_PROPERTY; + err_seq[0].name = CORBA::string_dup (CosNotification::EventReliability); + + ACE_THROW (CosNotification::UnsupportedQoS (err_seq)); + } + else if (property_name.compare (CosNotification::ConnectionReliability) == 0) + { + CosNotification::PropertyErrorSeq err_seq(1); + err_seq.length (1); + + err_seq[0].code = CosNotification::UNSUPPORTED_PROPERTY; + err_seq[0].name = CORBA::string_dup (CosNotification::ConnectionReliability); + + ACE_THROW (CosNotification::UnsupportedQoS (err_seq)); + } + else if (property_name.compare (CosNotification::Priority) == 0) + { + qos[index].value >>= this->priority_; + } + else if (property_name.compare (CosNotification::Timeout)) + { + qos[index].value >>= this->timeout_; + } + else if (property_name.compare (CosNotification::StartTimeSupported)) + { + qos[index].value >>= CORBA::Any::to_boolean (this->start_time_supported_); + } + else if (property_name.compare (CosNotification::StopTimeSupported)) + { + qos[index].value >>= CORBA::Any::to_boolean (this->stop_time_supported_); + } + else if (property_name.compare (CosNotification::MaxEventsPerConsumer)) + { + qos[index].value >>= this->max_events_per_consumer_; + } + else if (property_name.compare (CosNotification::OrderPolicy)) + { + qos[index].value >>= this->order_policy_; + } + else if (property_name.compare (CosNotification::DiscardPolicy)) + { + qos[index].value >>= this->discard_policy_; + } + else if (property_name.compare (CosNotification::MaximumBatchSize)) + { + qos[index].value >>= this->maximum_batch_size_; + } + else if (property_name.compare (CosNotification::DiscardPolicy)) + { + qos[index].value >>= this->pacing_interval_; + } + } return; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_QoSAdmin_i.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_QoSAdmin_i.h index 1e3f92f5ef2..4a615cee2ae 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_QoSAdmin_i.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_QoSAdmin_i.h @@ -86,7 +86,7 @@ protected: */ - TimeBase::UtcT timeout_; + TimeBase::TimeT timeout_; // Expiry time CORBA::Boolean start_time_supported_; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Source_Filter_Eval_Command.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Source_Filter_Eval_Command.cpp index 07d4194c5ae..ee701550b85 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Source_Filter_Eval_Command.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Source_Filter_Eval_Command.cpp @@ -5,11 +5,12 @@ #include "Notify_Lookup_Command.h" #include "Notify_Worker_Task.h" #include "Notify_Event.h" +#include "Notify_Event_Processor.h" ACE_RCSID(Notify, Notify_Source_Filter_Eval_Command, "$Id$") -TAO_Notify_Source_Filter_Eval_Command::TAO_Notify_Source_Filter_Eval_Command (TAO_Notify_Event* event, TAO_Notify_EventSource* event_source) - :event_ (event), +TAO_Notify_Source_Filter_Eval_Command::TAO_Notify_Source_Filter_Eval_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event, TAO_Notify_EventSource* event_source) + :TAO_Notify_Command (event_processor, event), event_source_ (event_source) { this->event_source_->_incr_refcnt (); @@ -23,8 +24,7 @@ TAO_Notify_Source_Filter_Eval_Command::~TAO_Notify_Source_Filter_Eval_Command () } int -TAO_Notify_Source_Filter_Eval_Command::execute (TAO_Notify_Worker_Task* parent_task, - CORBA::Environment& ACE_TRY_ENV) +TAO_Notify_Source_Filter_Eval_Command::execute (CORBA::Environment& ACE_TRY_ENV) { CORBA::Boolean result = this->event_source_->evaluate_filter (*this->event_, ACE_TRY_ENV); @@ -32,16 +32,12 @@ TAO_Notify_Source_Filter_Eval_Command::execute (TAO_Notify_Worker_Task* parent_t if (result == 1) { - TAO_Notify_Lookup_Command* lookup = - new TAO_Notify_Lookup_Command (this->event_); + this->event_processor_-> + lookup_subscriptions (this->event_, this->event_source_, ACE_TRY_ENV); + ACE_CHECK_RETURN (-1); - ACE_ASSERT (parent_task->next () != 0); - - TAO_Notify_Worker_Task* next_task = - ACE_static_cast (TAO_Notify_Worker_Task*, parent_task->next()); - - return next_task->process_event (lookup, ACE_TRY_ENV); + return 0; } - - return -1; + else + return -1; } diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Source_Filter_Eval_Command.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Source_Filter_Eval_Command.h index 78b37977a09..4ec6829e654 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Source_Filter_Eval_Command.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Source_Filter_Eval_Command.h @@ -31,7 +31,6 @@ class TAO_Notify_Event; class TAO_Notify_EventSource; -class TAO_Notify_Worker_Task; class TAO_Notify_Export TAO_Notify_Source_Filter_Eval_Command : public TAO_Notify_Command { @@ -43,15 +42,17 @@ class TAO_Notify_Export TAO_Notify_Source_Filter_Eval_Command : public TAO_Notif // public: // = Initialization and termination code - TAO_Notify_Source_Filter_Eval_Command (TAO_Notify_Event* event, TAO_Notify_EventSource* event_source); + TAO_Notify_Source_Filter_Eval_Command (TAO_Notify_Event_Processor* event_processor, TAO_Notify_Event* event, TAO_Notify_EventSource* event_source); + // Constructor. + ~TAO_Notify_Source_Filter_Eval_Command (); + // Destructor. - virtual int execute (TAO_Notify_Worker_Task* parent_task, CORBA::Environment& ACE_TRY_ENV); + virtual int execute (CORBA::Environment& ACE_TRY_ENV); // Command callback protected: // = Data Members - TAO_Notify_Event* event_; TAO_Notify_EventSource* event_source_; }; diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_SupplierAdmin_i.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_SupplierAdmin_i.cpp index bbe309849cb..696285c0801 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_SupplierAdmin_i.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_SupplierAdmin_i.cpp @@ -39,11 +39,8 @@ TAO_Notify_SupplierAdmin_i::~TAO_Notify_SupplierAdmin_i () } void -TAO_Notify_SupplierAdmin_i::proxy_pushconsumer_destroyed (CosNotifyChannelAdmin::ProxyID proxyID) +TAO_Notify_SupplierAdmin_i::proxy_pushconsumer_destroyed (CosNotifyChannelAdmin::ProxyID /*proxyID*/) { - ACE_GUARD (ACE_Lock, ace_mon, *this->lock_); - - this->proxy_pushconsumer_ids_.put (proxyID); } void @@ -240,38 +237,37 @@ TAO_Notify_SupplierAdmin_i::obtain_notification_push_consumer (CosNotifyChannelA ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxyConsumer::_nil ()); proxy_id = this->proxy_pushconsumer_ids_.get (); + } + + switch (ctype) + { + case CosNotifyChannelAdmin::ANY_EVENT: + { + obj = this->obtain_proxy_pushconsumer_i (proxy_id, ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxyConsumer::_nil ()); + } + break; - switch (ctype) + case CosNotifyChannelAdmin::STRUCTURED_EVENT: { - case CosNotifyChannelAdmin::ANY_EVENT: - { - obj = this->obtain_proxy_pushconsumer_i (proxy_id, ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxyConsumer::_nil ()); - } - break; - case CosNotifyChannelAdmin::STRUCTURED_EVENT: - { - obj = this->obtain_struct_proxy_pushconsumer_i (proxy_id, + obj = this->obtain_struct_proxy_pushconsumer_i (proxy_id, + ACE_TRY_ENV); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxyConsumer::_nil ()); + } + break; + + case CosNotifyChannelAdmin::SEQUENCE_EVENT: + { + obj = this->obtain_sequence_proxy_pushconsumer_i (proxy_id, ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxyConsumer::_nil ()); - } - break; - - case CosNotifyChannelAdmin::SEQUENCE_EVENT: - { - obj = this->obtain_sequence_proxy_pushconsumer_i (proxy_id, - ACE_TRY_ENV); - ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxyConsumer::_nil ()); - } - break; - - default: - ACE_THROW_RETURN (CORBA::BAD_PARAM (), - CosNotifyChannelAdmin::ProxyConsumer::_nil ()); + ACE_CHECK_RETURN (CosNotifyChannelAdmin::ProxyConsumer::_nil ()); } + break; - this->proxy_pushconsumer_ids_.next (); - } + default: + ACE_THROW_RETURN (CORBA::BAD_PARAM (), + CosNotifyChannelAdmin::ProxyConsumer::_nil ()); + } return CosNotifyChannelAdmin::ProxyConsumer::_narrow (obj.in (), ACE_TRY_ENV); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Update_Dispatch_Command.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Update_Dispatch_Command.cpp index 8f27dba7914..0a94673baa2 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Update_Dispatch_Command.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Update_Dispatch_Command.cpp @@ -8,7 +8,8 @@ ACE_RCSID(Notify, Notify_Update_Dispatch_Command, "$Id$") TAO_Notify_Update_Dispatch_Command::TAO_Notify_Update_Dispatch_Command (TAO_Notify_UpdateListener* update_listener, TAO_Notify_EventType_List& added, TAO_Notify_EventType_List& removed) - :update_listener_ (update_listener), + :TAO_Notify_Command (0,0), + update_listener_ (update_listener), added_ (added), removed_ (removed) { @@ -21,7 +22,7 @@ TAO_Notify_Update_Dispatch_Command::~TAO_Notify_Update_Dispatch_Command () } int -TAO_Notify_Update_Dispatch_Command::execute (TAO_Notify_Worker_Task* /*parent_task*/, CORBA::Environment& ACE_TRY_ENV) +TAO_Notify_Update_Dispatch_Command::execute (CORBA::Environment& ACE_TRY_ENV) { this->update_listener_->dispatch_update (this->added_, this->removed_, ACE_TRY_ENV); diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Update_Dispatch_Command.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Update_Dispatch_Command.h index 6ae6a7216fd..c8ab14ad24f 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Update_Dispatch_Command.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Update_Dispatch_Command.h @@ -43,9 +43,10 @@ class TAO_Notify_Export TAO_Notify_Update_Dispatch_Command : public TAO_Notify_C public: // = Initialization and termination code TAO_Notify_Update_Dispatch_Command (TAO_Notify_UpdateListener* listener, TAO_Notify_EventType_List& added, TAO_Notify_EventType_List& removed); + ~TAO_Notify_Update_Dispatch_Command (); - virtual int execute (TAO_Notify_Worker_Task* parent_task, CORBA::Environment& ACE_TRY_ENV); + virtual int execute (CORBA::Environment& ACE_TRY_ENV); // Command callback protected: diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Worker_Task.cpp b/TAO/orbsvcs/orbsvcs/Notify/Notify_Worker_Task.cpp index b9ee44296a1..c953b206aba 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Worker_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Worker_Task.cpp @@ -5,18 +5,11 @@ ACE_RCSID(Notify, Notify_Worker_Task, "$Id$") -TAO_Notify_Worker_Task::TAO_Notify_Worker_Task (TAO_Notify_Event_Manager* event_manager, CORBA::Boolean activate_object) - :event_manager_ (event_manager), - activate_object_ (activate_object) +TAO_Notify_Worker_Task::TAO_Notify_Worker_Task (CORBA::Boolean activate_object) + :activate_object_ (activate_object) { } -TAO_Notify_Event_Manager* -TAO_Notify_Worker_Task::event_manager (void) -{ - return this->event_manager_; -} - int TAO_Notify_Worker_Task::open (void* /*args*/) { @@ -61,7 +54,7 @@ TAO_Notify_Worker_Task::process_event (TAO_Notify_Command *mb, CORBA::Environmen // @@ Create Reactive_Worker and MT_Worker if (this->activate_object_ == 0) { - int result = mb->execute (this, ACE_TRY_ENV); + int result = mb->execute (ACE_TRY_ENV); ACE_Message_Block::release (mb); return result; } @@ -95,7 +88,7 @@ TAO_Notify_Worker_Task::svc (void) continue; } - int result = command->execute (this, ACE_TRY_ENV); + int result = command->execute (ACE_TRY_ENV); ACE_TRY_CHECK; ACE_Message_Block::release (mb); @@ -115,22 +108,14 @@ TAO_Notify_Worker_Task::svc (void) /**************************************************************************/ -int -TAO_Notify_Shutdown_Command::execute (TAO_Notify_Worker_Task* parent_task, - CORBA::Environment& ACE_TRY_ENV) +TAO_Notify_Shutdown_Command::TAO_Notify_Shutdown_Command (void) + :TAO_Notify_Command (0,0) { - if (parent_task->next()) // if there are other tasks, tell them!. - { - TAO_Notify_Worker_Task* next_task = - ACE_static_cast (TAO_Notify_Worker_Task*, parent_task->next()); - - TAO_Notify_Shutdown_Command * mb = new TAO_Notify_Shutdown_Command (); - // ACE_Message_Block::duplicate (this); // increment our ref count. - // @@ investigate crash due to duplicate. - - next_task->process_event (mb, ACE_TRY_ENV); - } +} +int +TAO_Notify_Shutdown_Command::execute (CORBA::Environment& /*ACE_TRY_ENV*/) +{ return -1; } /**************************************************************************/ diff --git a/TAO/orbsvcs/orbsvcs/Notify/Notify_Worker_Task.h b/TAO/orbsvcs/orbsvcs/Notify/Notify_Worker_Task.h index 5933500078c..deb46a215dc 100644 --- a/TAO/orbsvcs/orbsvcs/Notify/Notify_Worker_Task.h +++ b/TAO/orbsvcs/orbsvcs/Notify/Notify_Worker_Task.h @@ -18,75 +18,72 @@ // // ============================================================================ -#ifndef TAO_NOTIFY_WORKER_TASK_H -#define TAO_NOTIFY_WORKER_TASK_H +#ifndef TAO_NOTIFY_WORKER_TASK_H +#define TAO_NOTIFY_WORKER_TASK_H #include "ace/pre.h" #include "ace/Task.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once -#endif /* ACE_LACKS_PRAGMA_ONCE */ +#endif /* ACE_LACKS_PRAGMA_ONCE */ #include "tao/corba.h" #include "Notify_Command.h" #include "notify_export.h" -class TAO_Notify_Event_Manager; +class TAO_Notify_Event_Processor; -class TAO_Notify_Export TAO_Notify_Worker_Task : public ACE_Task<ACE_SYNCH> +class TAO_Notify_Export TAO_Notify_Worker_Task : public ACE_Task<ACE_SYNCH> { // = TITLE // TAO_Notify_Worker_Task // // = DESCRIPTION - // A worker task that we use for various event processing jobs. - // Also see TAO_Notify_Command - This task executes Notify_Command objects. + // A worker task that we use for various event processing jobs. + // Also see TAO_Notify_Command - This task executes Notify_Command objects. // public: // = Initialization and termination code - TAO_Notify_Worker_Task (TAO_Notify_Event_Manager* event_manager, CORBA::Boolean activate_object); + TAO_Notify_Worker_Task (CORBA::Boolean activate_object); // Constructor. virtual int open (void *args); // Activate the task if <active_object_> is set. virtual int svc (void); - // svc command objects stored in the message queue. + // svc command processes objects stored in the message queue. - void shutdown (CORBA::Environment& ACE_TRY_ENV); + void shutdown (CORBA::Environment& ACE_TRY_ENV); // shutdown this task. - virtual int process_event (TAO_Notify_Command *mb, CORBA::Environment& ACE_TRY_ENV, ACE_Time_Value *tv = 0); - // Allows the producer to pass messages to the <Message_Block>. - - TAO_Notify_Event_Manager* event_manager (void); - // access manager. + virtual int process_event (TAO_Notify_Command *mb, CORBA::Environment& ACE_TRY_ENV, ACE_Time_Value *tv = 0); + // Allows the producer to pass messages to the <Message_Block>. protected: virtual int close (u_long); // Close hook. - TAO_Notify_Event_Manager* event_manager_; - // The Event Manager using us. - CORBA::Boolean activate_object_; - // Is this an active object. + // Is this an active object. }; //**************************************************************************************** -class TAO_Notify_Export TAO_Notify_Shutdown_Command : public TAO_Notify_Command +class TAO_Notify_Export TAO_Notify_Shutdown_Command : public TAO_Notify_Command { // = TITLE // TAO_Notify_Shutdown_Command // // = DESCRIPTION - // Shutdown command to shutdown the task. + // Shutdown command to shutdown the task. // public: - virtual int execute (TAO_Notify_Worker_Task* parent_task, CORBA::Environment&); - // Returns -1. This signals worker threads to finish servicing requests. + + TAO_Notify_Shutdown_Command (void); + + virtual int execute (CORBA::Environment& ACE_TRY_ENV); + // Returns -1. This signals worker threads to finish servicing requests. }; //**************************************************************************************** |