diff options
Diffstat (limited to 'TAO/orbsvcs')
23 files changed, 1500 insertions, 11 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp index 74adea8576b..af95e004b51 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.cpp @@ -32,6 +32,7 @@ #include "ace/Arg_Shifter.h" #include "ace/Sched_Params.h" #include "ace/OS_NS_strings.h" +#include "ace/Dynamic_Service.h" #if ! defined (__ACE_INLINE__) #include "EC_Default_Factory.i" @@ -46,6 +47,7 @@ TAO_EC_Default_Factory::~TAO_EC_Default_Factory (void) int TAO_EC_Default_Factory::init_svcs (void) { + TAO_EC_Simple_Queue_Full_Action::init_svcs(); return ACE_Service_Config::static_svcs ()-> insert (&ace_svc_desc_TAO_EC_Default_Factory); } @@ -439,6 +441,17 @@ TAO_EC_Default_Factory::init (int argc, ACE_TCHAR* argv[]) arg_shifter.consume_arg (); } + else if (ACE_OS::strcasecmp (arg, ACE_LIB_TEXT("-ECQueueFullServiceObject")) == 0) + { + arg_shifter.consume_arg (); + if (arg_shifter.is_parameter_next ()) + { + const char* opt = arg_shifter.get_current (); + this->queue_full_service_object_name_.set(opt); + arg_shifter.consume_arg (); + } + } + else if (ACE_OS::strcasecmp (arg, ACE_LIB_TEXT("-ECConsumerAdminLock")) == 0) { ACE_ERROR ((LM_ERROR, @@ -483,16 +496,51 @@ TAO_EC_Default_Factory::fini (void) // **************************************************************** +TAO_EC_Queue_Full_Service_Object* +TAO_EC_Default_Factory::find_service_object (const char* wanted, + const char* fallback) +{ + TAO_EC_Queue_Full_Service_Object* so = 0; + so = ACE_Dynamic_Service<TAO_EC_Queue_Full_Service_Object>::instance (wanted); + if (so != 0) + return so; + + ACE_ERROR ((LM_ERROR, + "EC (%P|%t) EC_Default_Factory::create_dispatching " + "unable to find queue full service object '%s'; " + "using '%s' instead\n", + wanted, + fallback)); + + so = ACE_Dynamic_Service<TAO_EC_Queue_Full_Service_Object>::instance (fallback); + if (so != 0) + return so; + + ACE_ERROR ((LM_ERROR, + "EC (%P|%t) EC_Default_Factory::create_dispatching " + "unable find default queue full service object '%s'; " + "aborting.\n", + fallback)); + ACE_OS::abort (); + return 0; // superfluous return to de-warn; we should never reach here +} + TAO_EC_Dispatching* TAO_EC_Default_Factory::create_dispatching (TAO_EC_Event_Channel_Base *) { if (this->dispatching_ == 0) return new TAO_EC_Reactive_Dispatching (); else if (this->dispatching_ == 1) - return new TAO_EC_MT_Dispatching (this->dispatching_threads_, - this->dispatching_threads_flags_, - this->dispatching_threads_priority_, - this->dispatching_threads_force_active_); + { + TAO_EC_Queue_Full_Service_Object* so = + this->find_service_object (this->queue_full_service_object_name_.fast_rep(), + TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME); + return new TAO_EC_MT_Dispatching (this->dispatching_threads_, + this->dispatching_threads_flags_, + this->dispatching_threads_priority_, + this->dispatching_threads_force_active_, + so); + } return 0; } @@ -950,3 +998,514 @@ ACE_STATIC_SVC_DEFINE (TAO_EC_Default_Factory, 0) ACE_FACTORY_DEFINE (TAO_RTEvent_Serv, TAO_EC_Default_Factory) +// **************************************************************** + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Dynamic_Service<TAO_EC_Queue_Full_Service_Object>; + +template class ACE_Node<ACE_Command_Base*>; +template class ACE_Unbounded_Queue<ACE_Command_Base*>; +template class ACE_Unbounded_Queue_Iterator<ACE_Command_Base*>; +template class ACE_Unbounded_Set<ACE_Static_Svc_Descriptor*>; +template class ACE_Unbounded_Set_Iterator<ACE_Static_Svc_Descriptor*>; +template class ACE_Node<ACE_Static_Svc_Descriptor*>; + +template class TAO_ESF_Proxy_Collection<TAO_EC_ProxyPushConsumer>; +template class TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>; +template class TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>; +template class ACE_Unbounded_Set<TAO_EC_ProxyPushConsumer *>; +template class ACE_Node<TAO_EC_ProxyPushConsumer *>; +template class ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushConsumer *>; +template class ACE_RB_Tree<TAO_EC_ProxyPushConsumer *, int, ACE_Less_Than<TAO_EC_ProxyPushConsumer *>, ACE_Null_Mutex>; +template class ACE_RB_Tree_Iterator<TAO_EC_ProxyPushConsumer *, int, ACE_Less_Than<TAO_EC_ProxyPushConsumer *>, ACE_Null_Mutex>; +template class ACE_RB_Tree_Reverse_Iterator<TAO_EC_ProxyPushConsumer *, int, ACE_Less_Than<TAO_EC_ProxyPushConsumer *>, ACE_Null_Mutex>; +template class ACE_RB_Tree_Iterator_Base<TAO_EC_ProxyPushConsumer *, int, ACE_Less_Than<TAO_EC_ProxyPushConsumer *>, ACE_Null_Mutex>; +template class ACE_RB_Tree_Node<TAO_EC_ProxyPushConsumer *, int>; +template class ACE_Less_Than<TAO_EC_ProxyPushConsumer *>; +template class TAO_ESF_Proxy_RB_Tree_Iterator<TAO_EC_ProxyPushConsumer>; + +template class TAO_ESF_Copy_On_Write_Collection< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator >; +template class TAO_ESF_Copy_On_Write_Collection< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator >; +template class TAO_ESF_Copy_On_Write_Collection< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator >; +template class TAO_ESF_Copy_On_Write_Collection< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator >; + +template class TAO_ESF_Proxy_Collection<TAO_EC_ProxyPushSupplier>; +template class TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>; +template class TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>; +template class ACE_Unbounded_Set<TAO_EC_ProxyPushSupplier *>; +template class ACE_Node<TAO_EC_ProxyPushSupplier *>; +template class ACE_Unbounded_Set_Iterator<TAO_EC_ProxyPushSupplier *>; +template class ACE_RB_Tree<TAO_EC_ProxyPushSupplier *, int, ACE_Less_Than<TAO_EC_ProxyPushSupplier *>, ACE_Null_Mutex>; +template class ACE_RB_Tree_Iterator<TAO_EC_ProxyPushSupplier *, int, ACE_Less_Than<TAO_EC_ProxyPushSupplier *>, ACE_Null_Mutex>; +template class ACE_RB_Tree_Iterator_Base<TAO_EC_ProxyPushSupplier *, int, ACE_Less_Than<TAO_EC_ProxyPushSupplier *>, ACE_Null_Mutex>; +template class ACE_RB_Tree_Reverse_Iterator<TAO_EC_ProxyPushSupplier *, int, ACE_Less_Than<TAO_EC_ProxyPushSupplier *>, ACE_Null_Mutex>; +template class ACE_RB_Tree_Node<TAO_EC_ProxyPushSupplier *, int>; +template class ACE_Less_Than<TAO_EC_ProxyPushSupplier *>; +template class TAO_ESF_Proxy_RB_Tree_Iterator<TAO_EC_ProxyPushSupplier>; + +#if defined (ACE_HAS_THREADS) +// +// To avoid duplicate instantiations of templates we must put the MT +// versions on this #ifdef, otherwise the ACE_SYNCH* macros expand to +// the ACE_NULL* versions, duplicating the non-MT versions below. +// We *cannot* use explicit ACE_Synch classes because that will not +// compile in platforms without threads. +// +template class TAO_ESF_Immediate_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Read<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Write<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_SYNCH> >; +template class ACE_Guard< TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_SYNCH> > >; +template class TAO_ESF_Connected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Reconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Disconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Shutdown_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_SYNCH> >; +template class TAO_ESF_Immediate_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Read<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Write<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_SYNCH> >; +template class ACE_Guard< TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_SYNCH> > >; +template class TAO_ESF_Connected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Reconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Disconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Shutdown_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_SYNCH> >; + +template class TAO_ESF_Immediate_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Read<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Write<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_SYNCH> >; +template class ACE_Guard< TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_SYNCH> > >; +template class TAO_ESF_Connected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Reconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Disconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Shutdown_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_SYNCH> >; +template class TAO_ESF_Immediate_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Read<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Write<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_SYNCH> >; +template class ACE_Guard< TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_SYNCH> > >; +template class TAO_ESF_Connected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Reconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Disconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Shutdown_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_SYNCH> >; + +template class TAO_ESF_Copy_On_Write_Read_Guard< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Write_Read_Guard< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Write_Read_Guard< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Write_Read_Guard< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + TAO_SYNCH_MUTEX>; +template class TAO_ESF_Copy_On_Write_Write_Guard< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Copy_On_Write_Write_Guard< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Copy_On_Write_Write_Guard< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_SYNCH>; +template class TAO_ESF_Copy_On_Write_Write_Guard< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_SYNCH>; + +#endif /* ACE_HAS_THREADS */ + +template class TAO_ESF_Immediate_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Read<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Write<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_NULL_SYNCH> >; +template class ACE_Guard< TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_NULL_SYNCH> > >; +template class TAO_ESF_Connected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Reconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Disconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Shutdown_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_NULL_SYNCH> >; +template class TAO_ESF_Immediate_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Read<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Write<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_NULL_SYNCH> >; +template class ACE_Guard< TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_NULL_SYNCH> > >; +template class TAO_ESF_Connected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Reconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Disconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushConsumer >; +template class TAO_ESF_Shutdown_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushConsumer, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_NULL_SYNCH> >; + +template class TAO_ESF_Immediate_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Read<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Write<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_NULL_SYNCH> >; +template class ACE_Guard< TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_NULL_SYNCH> > >; +template class TAO_ESF_Connected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Reconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Disconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Shutdown_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_NULL_SYNCH> >; +template class TAO_ESF_Immediate_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Read<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Write<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_NULL_SYNCH> >; +template class ACE_Guard< TAO_ESF_Busy_Lock_Adapter< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_NULL_SYNCH> > >; +template class TAO_ESF_Connected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Reconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Disconnected_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_NULL_SYNCH>,TAO_EC_ProxyPushSupplier >; +template class TAO_ESF_Shutdown_Command< + TAO_ESF_Delayed_Changes<TAO_EC_ProxyPushSupplier, + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_NULL_SYNCH> >; + +template class TAO_ESF_Copy_On_Write_Read_Guard< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Write_Read_Guard< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Write_Read_Guard< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Write_Read_Guard< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_Null_Mutex>; +template class TAO_ESF_Copy_On_Write_Write_Guard< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_List_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Copy_On_Write_Write_Guard< + TAO_ESF_Proxy_List<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_List_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Copy_On_Write_Write_Guard< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushConsumer>, + TAO_EC_Consumer_RB_Tree_Iterator, + ACE_NULL_SYNCH>; +template class TAO_ESF_Copy_On_Write_Write_Guard< + TAO_ESF_Proxy_RB_Tree<TAO_EC_ProxyPushSupplier>, + TAO_EC_Supplier_RB_Tree_Iterator, + ACE_NULL_SYNCH>; + +#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate ACE_Dynamic_Service<TAO_EC_Queue_Full_Service_Object> +// @@ TODO!!! + +#if defined (ACE_HAS_THREADS) +#endif /* ACE_HAS_THREADS */ + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.h b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.h index a5f3e0f5113..406739f25b8 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.h @@ -29,6 +29,8 @@ #include "ace/SString.h" #include "ace/Time_Value.h" +class TAO_EC_Queue_Full_Service_Object; + /** * @class TAO_EC_Default_Factory * @@ -158,6 +160,9 @@ protected: int dispatching_threads_flags_; int dispatching_threads_priority_; int dispatching_threads_force_active_; + ACE_CString queue_full_service_object_name_; + TAO_EC_Queue_Full_Service_Object* find_service_object (const char* wanted, + const char* fallback); /// Use this ORB to locate global resources. ACE_CString orbid_; diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.i b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.i index 08dbae36c08..3ff451b166a 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Default_Factory.i @@ -16,6 +16,7 @@ TAO_EC_Default_Factory::TAO_EC_Default_Factory (void) dispatching_threads_flags_ (TAO_EC_DEFAULT_DISPATCHING_THREADS_FLAGS), dispatching_threads_priority_ (TAO_EC_DEFAULT_DISPATCHING_THREADS_PRIORITY), dispatching_threads_force_active_ (TAO_EC_DEFAULT_DISPATCHING_THREADS_FORCE_ACTIVE), + queue_full_service_object_name_ (TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME), orbid_ (TAO_EC_DEFAULT_ORB_ID), consumer_control_ (TAO_EC_DEFAULT_CONSUMER_CONTROL), supplier_control_ (TAO_EC_DEFAULT_SUPPLIER_CONTROL), diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Defaults.h b/TAO/orbsvcs/orbsvcs/Event/EC_Defaults.h index 63c18e87104..bcabf232a45 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Defaults.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Defaults.h @@ -123,6 +123,10 @@ # define TAO_EC_DEFAULT_SUPPLIER_CONTROL_PERIOD 5000000 /* usecs */ #endif /* TAO_EC_DEFAULT_SUPPLIER_CONTROL_PERIOD */ +#ifndef TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME +# define TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME "EC_QueueFullSimpleActions" +#endif /* TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME */ + #ifndef TAO_EC_DEFAULT_CONSUMER_CONTROL_TIMEOUT # define TAO_EC_DEFAULT_CONSUMER_CONTROL_TIMEOUT 10000 /* usecs */ #endif /* TAO_EC_DEFAULT_CONSUMER_CONTROL_TIMEOUT */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp index ec794142b7e..20faf6a860c 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.cpp @@ -2,9 +2,11 @@ #include "EC_Dispatching_Task.h" #include "EC_ProxySupplier.h" +#include "EC_Defaults.h" #include "tao/ORB_Constants.h" #include "ace/OS_NS_errno.h" +#include "ace/OS_NS_strings.h" #if ! defined (__ACE_INLINE__) #include "EC_Dispatching_Task.i" @@ -14,6 +16,75 @@ ACE_RCSID (Event, EC_Dispatching, "$Id$") +TAO_EC_Simple_Queue_Full_Action::TAO_EC_Simple_Queue_Full_Action() + : queue_full_action_return_value_ (WAIT_TO_EMPTY) +{ +} + +/// Helper function to register the default action into the service +/// configurator. +int +TAO_EC_Simple_Queue_Full_Action::init_svcs (void) +{ + ACE_DEBUG ((LM_DEBUG, "(%P|%t) Simple_Queue_Full_Action::init_svcs()\n")); + return ACE_Service_Config::static_svcs ()-> + insert (&ace_svc_desc_TAO_EC_Simple_Queue_Full_Action); +} + +int +TAO_EC_Simple_Queue_Full_Action::init (int argc, char* argv[]) +{ + // Here we look at the args and set an internal flag indicating whether + // the default action should be to wait for the queue to not be full + // or whether it should be to silently discard the event. + + // @@ This should use the arg shifter stuff, but let's keep it simple to + // start. + + do { + if (argc == 0) + break; + + if (ACE_OS::strcasecmp ("wait", argv[0]) == 0) + this->queue_full_action_return_value_ = WAIT_TO_EMPTY; + else if (ACE_OS::strcasecmp ("discard", argv[0]) == 0) + this->queue_full_action_return_value_ = SILENTLY_DISCARD; +#if 0 + else + ; + // probably ought to print an error message here +#endif + } while (0); + + return 0; +} + +int +TAO_EC_Simple_Queue_Full_Action::fini (void) +{ + return 0; +} + +int +TAO_EC_Simple_Queue_Full_Action::queue_full_action (TAO_EC_Dispatching_Task */*task*/, + TAO_EC_ProxyPushSupplier */*proxy*/, + RtecEventComm::PushConsumer_ptr /*consumer*/, + RtecEventComm::EventSet& /*event*/ + ACE_ENV_ARG_DECL) +{ + return this->queue_full_action_return_value_; +} + +ACE_STATIC_SVC_DEFINE (TAO_EC_Simple_Queue_Full_Action, + ACE_TEXT (TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (TAO_EC_Simple_Queue_Full_Action), + ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ, + 0) +ACE_FACTORY_DEFINE (TAO_RTEvent, TAO_EC_Simple_Queue_Full_Action) + + + int TAO_EC_Queue::is_full_i (void) { @@ -71,6 +142,17 @@ TAO_EC_Dispatching_Task::push (TAO_EC_ProxyPushSupplier *proxy, RtecEventComm::EventSet& event ACE_ENV_ARG_DECL) { + if (this->msg_queue()->is_full ()) + { + int action = + this->queue_full_service_object_->queue_full_action (this, proxy, + consumer, event + ACE_ENV_ARG_PARAMETER); + if (action == TAO_EC_Queue_Full_Service_Object::SILENTLY_DISCARD) + return; + // if action == WAIT_TO_EMPTY then we just go ahead and queue it + } + if (this->allocator_ == 0) this->allocator_ = ACE_Allocator::instance (); @@ -86,7 +168,11 @@ TAO_EC_Dispatching_Task::push (TAO_EC_ProxyPushSupplier *proxy, event, this->data_block_.duplicate (), this->allocator_); + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t): task %@ queue size before putq: %d\n", + this, this->the_queue_.message_count ())); this->putq (mb); + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t): task %@ queue size after putq: %d\n", + this, this->the_queue_.message_count ())); } // **************************************************************** diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.h b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.h index 5fdaec09763..86869bc6e3c 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.h @@ -29,6 +29,52 @@ #include "ace/Task.h" #include "ace/Message_Block.h" #include "ace/Lock_Adapter_T.h" +#include "ace/Service_Config.h" +#include "ace/Global_Macros.h" + +// Forward decl +class TAO_EC_Dispatching_Task; + +class TAO_RTEvent_Serv_Export TAO_EC_Queue_Full_Service_Object : public ACE_Service_Object +{ +public: + enum QueueFullActionReturnValue + { + WAIT_TO_EMPTY = 0, + SILENTLY_DISCARD = -1 + }; + + // Called when + virtual int queue_full_action (TAO_EC_Dispatching_Task *task, + TAO_EC_ProxyPushSupplier *proxy, + RtecEventComm::PushConsumer_ptr consumer, + RtecEventComm::EventSet& event + ACE_ENV_ARG_DECL) = 0; +}; + +class TAO_RTEvent_Serv_Export TAO_EC_Simple_Queue_Full_Action : + public TAO_EC_Queue_Full_Service_Object +{ +public: + TAO_EC_Simple_Queue_Full_Action (); + + /// Helper function to register the default action into the service + /// configurator. + static int init_svcs (void); + + // = The Service_Object entry points + virtual int init (int argc, char* argv[]); + virtual int fini (void); + + virtual int queue_full_action (TAO_EC_Dispatching_Task *task, + TAO_EC_ProxyPushSupplier *proxy, + RtecEventComm::PushConsumer_ptr consumer, + RtecEventComm::EventSet& event + ACE_ENV_ARG_DECL); + +protected: + int queue_full_action_return_value_; +}; class TAO_RTEvent_Serv_Export TAO_EC_Queue : public ACE_Message_Queue<ACE_SYNCH> { @@ -54,7 +100,7 @@ class TAO_RTEvent_Serv_Export TAO_EC_Dispatching_Task : public ACE_Task<ACE_SYNC { public: /// Constructor - TAO_EC_Dispatching_Task (ACE_Thread_Manager* thr_manager = 0); + TAO_EC_Dispatching_Task (ACE_Thread_Manager* thr_manager = 0, TAO_EC_Queue_Full_Service_Object* queue_full_service_object = 0); /// Process the events in the queue. virtual int svc (void); @@ -73,6 +119,8 @@ private: /// The queue TAO_EC_Queue the_queue_; + + TAO_EC_Queue_Full_Service_Object* queue_full_service_object_; }; // **************************************************************** @@ -139,6 +187,9 @@ private: #include "EC_Dispatching_Task.i" #endif /* __ACE_INLINE__ */ +ACE_STATIC_SVC_DECLARE (TAO_EC_Simple_Queue_Full_Action) +ACE_FACTORY_DECLARE (TAO_RTEvent, TAO_EC_Simple_Queue_Full_Action) + #include /**/ "ace/post.h" #endif /* TAO_EC_DISPATCHING_TASK_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.i b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.i index 59fecf9da0e..12694120844 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.i +++ b/TAO/orbsvcs/orbsvcs/Event/EC_Dispatching_Task.i @@ -13,12 +13,23 @@ TAO_EC_Queue:: /// @todo The high water mark and low water mark shouldn't be /// hardcoded, check http://deuce.doc.wustl.edu/bugzilla/show_bug.cgi?id=565 +#ifndef TAO_EC_QUEUE_HWM +#define TAO_EC_QUEUE_HWM 16384 +//#define TAO_EC_QUEUE_HWM 2 +#endif + +#ifndef TAO_EC_QUEUE_LWM +#define TAO_EC_QUEUE_LWM 16 +//#define TAO_EC_QUEUE_LWM 1 +#endif + ACE_INLINE TAO_EC_Dispatching_Task:: -TAO_EC_Dispatching_Task (ACE_Thread_Manager* thr_manager) +TAO_EC_Dispatching_Task (ACE_Thread_Manager* thr_manager, TAO_EC_Queue_Full_Service_Object* so) : ACE_Task<ACE_SYNCH> (thr_manager), allocator_ (0), - the_queue_ (16384, 16) // @@ + the_queue_ (TAO_EC_QUEUE_HWM, TAO_EC_QUEUE_LWM), + queue_full_service_object_ (so) { this->msg_queue (&this->the_queue_); } diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_MT_Dispatching.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_MT_Dispatching.cpp index 5be3be22db3..d243e05c7ab 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_MT_Dispatching.cpp +++ b/TAO/orbsvcs/orbsvcs/Event/EC_MT_Dispatching.cpp @@ -7,14 +7,16 @@ ACE_RCSID(Event, EC_MT_Dispatching, "$Id$") TAO_EC_MT_Dispatching::TAO_EC_MT_Dispatching (int nthreads, int thread_creation_flags, int thread_priority, - int force_activate) + int force_activate, + TAO_EC_Queue_Full_Service_Object* service_object) : nthreads_ (nthreads), thread_creation_flags_ (thread_creation_flags), thread_priority_ (thread_priority), force_activate_ (force_activate), - task_ (&this->thread_manager_), - active_ (0) + active_ (0), + queue_full_service_object_ (service_object) { + this->task_.open (&this->thread_manager_); } void @@ -83,3 +85,4 @@ TAO_EC_MT_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy, this->task_.push (proxy, consumer, event ACE_ENV_ARG_PARAMETER); } + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_MT_Dispatching.h b/TAO/orbsvcs/orbsvcs/Event/EC_MT_Dispatching.h index 4be04076480..293fce9464b 100644 --- a/TAO/orbsvcs/orbsvcs/Event/EC_MT_Dispatching.h +++ b/TAO/orbsvcs/orbsvcs/Event/EC_MT_Dispatching.h @@ -41,7 +41,8 @@ public: TAO_EC_MT_Dispatching (int nthreads, int thread_creation_flags, int thread_priority, - int force_activate); + int force_activate, + TAO_EC_Queue_Full_Service_Object* queue_full_service_object_name); // = The EC_Dispatching methods. virtual void activate (void); @@ -83,6 +84,9 @@ private: /// Are the threads running? int active_; + + /// Service Object information + TAO_EC_Queue_Full_Service_Object* queue_full_service_object_; }; #include /**/ "ace/post.h" diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching.cpp new file mode 100644 index 00000000000..53547dd3bcf --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching.cpp @@ -0,0 +1,206 @@ +// $Id$ + +extern unsigned long EC_TPC_debug_level; + +#include "EC_TPC_Dispatching.h" +#include "EC_Defaults.h" + +#include <ace/Dynamic_Service.h> + +#if ! defined (__ACE_INLINE__) +#include "EC_TPC_Dispatching.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_TPC_Dispatching, "$Id$") + +#if !defined(TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE) +#define TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE 32 +#endif + +TAO_EC_TPC_Dispatching::TAO_EC_TPC_Dispatching (TAO_EC_Queue_Full_Service_Object* so) + : consumer_task_map_(TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE) + , queue_full_service_object_(so) +{ + ACE_ASSERT (this->queue_full_service_object_ != 0); +} + +TAO_EC_TPC_Dispatching::~TAO_EC_TPC_Dispatching () +{ + // No other dispatching strategy has a DTOR body. I can only + // assume that it's guaranteed that shutdown() is called before + // the DTOR, so the tear-down logic needs to go in the shutdown, + // and the DTOR need not call shutdown. +} + +int +TAO_EC_TPC_Dispatching::add_consumer (RtecEventComm::PushConsumer_ptr consumer + ACE_ENV_ARG_DECL) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + + // Duplicate the pointer and hold it safely + RtecEventComm::PushConsumer_var pc = + RtecEventComm::PushConsumer::_duplicate(consumer); + + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@)\n", pc.in())); + + TAO_EC_Dispatching_Task* dtask = + new TAO_EC_TPC_Dispatching_Task (&this->thread_manager_, this->queue_full_service_object_); + + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::add_consumer(%@): new task %@\n", pc.in(), dtask)); + + if ((dtask->activate (TAO_EC_DEFAULT_DISPATCHING_THREADS_FLAGS, + 1, // we only want one thread to dispatch to a consumer + 1, // magic number?? + TAO_EC_DEFAULT_DISPATCHING_THREADS_PRIORITY)) == -1) + { + ACE_DEBUG ((LM_WARNING, + "EC (%P|%t): TPC_Dispatching::add_consumer unable to activate" + " dispatching task for consumer (%@)\n", + consumer)); + delete dtask; + return -1; + } + + int bindresult = + this->consumer_task_map_.bind (RtecEventComm::PushConsumer::_duplicate(pc.in()), + dtask); + const char* explanation = 0; + if (bindresult == -1) + explanation = "general failure"; + else if (bindresult == 1) + explanation = "entry already exists"; + + if (explanation != 0) + { + ACE_DEBUG ((LM_WARNING, + "EC (%P|%t): TPC_Dispatching::add_consumer failed to bind consumer (%@)" + " and dispatch task in map (%s): %p\n", + consumer, explanation)); + dtask->putq (new TAO_EC_Shutdown_Task_Command); + dtask->wait (); + delete dtask; + return -1; + } + + return 0; +} + +int +TAO_EC_TPC_Dispatching::remove_consumer (RtecEventComm::PushConsumer_ptr consumer + ACE_ENV_ARG_DECL) +{ + ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + + TAO_EC_Dispatching_Task* dtask = 0; + + if (this->consumer_task_map_.find (consumer, dtask) == -1) + { + ACE_DEBUG ((LM_WARNING, + "EC (%P|%t): TPC_Dispatching::remove_consumer failed to" + " find consumer (%@) in map\n", consumer)); + return -1; + } + + // Must have found it...first try to unbind + if (this->consumer_task_map_.unbind (consumer) == -1) + { + ACE_DEBUG ((LM_WARNING, + "EC (%P|%t): TPC_Dispatching::remove_consumer failed to" + " unbind consumer (%@) and task in map\n", consumer)); + return -1; + } + + dtask->putq (new TAO_EC_Shutdown_Task_Command); + CORBA::release (consumer); // This matches the _duplicate in add_consumer + return 0; +} + +void +TAO_EC_TPC_Dispatching::activate (void) +{ +} + +void +TAO_EC_TPC_Dispatching::shutdown (void) +{ + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + + // The MT_Dispatching strategy sends a TAO_EC_Shutdown_Task_Command + // to the dispatching task. Is that what we should do here? + MAPTYPE::ITERATOR iter = this->consumer_task_map_.begin (); + while (! iter.done()) + { + MAPTYPE::ENTRY* entry; + if (! iter.next(entry)) + continue; + + entry->int_id_->putq (new TAO_EC_Shutdown_Task_Command); + iter.advance (); + } + + this->thread_manager_.wait (); // Wait for the threads to terminate + + // Now iterate again and call CORBA::release on the ext_id; + // we don't have to delete int_id_ b/c that happens in its close() method. + iter = this->consumer_task_map_.begin (); + while (! iter.done()) + { + MAPTYPE::ENTRY* entry; + if (! iter.next(entry)) + continue; + + CORBA::release (entry->ext_id_); + iter.advance (); + } + + this->consumer_task_map_.unbind_all (); +} + +void +TAO_EC_TPC_Dispatching::push (TAO_EC_ProxyPushSupplier* proxy, + RtecEventComm::PushConsumer_ptr consumer, + const RtecEventComm::EventSet& event, + TAO_EC_QOS_Info& qos_info + ACE_ENV_ARG_DECL) +{ + RtecEventComm::EventSet event_copy = event; + this->push_nocopy (proxy, consumer, event_copy, qos_info ACE_ENV_ARG_PARAMETER); +} + +void +TAO_EC_TPC_Dispatching::push_nocopy (TAO_EC_ProxyPushSupplier* proxy, + RtecEventComm::PushConsumer_ptr consumer, + RtecEventComm::EventSet& event, + TAO_EC_QOS_Info& + ACE_ENV_ARG_DECL) +{ + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) TPC_Dispatching::push_nocopy(supplier=%@,consumer=%@)\n", proxy, consumer)); + + ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_); + TAO_EC_Dispatching_Task* dtask; + + if (this->consumer_task_map_.find (consumer, dtask) == -1) + { + ACE_DEBUG ((LM_WARNING, + "EC (%P|%t): TPC_Dispatching::push_nocopy failed to" + " find consumer (%@) in map\n", consumer)); + } + else + { + dtask->push (proxy, consumer, event ACE_ENV_ARG_PARAMETER); + } +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Hash_Map_Manager_Ex<RtecEventComm::PushConsumer*, TAO_EC_Dispatching_Task*, ACE_Pointer_Hash<RtecEventComm::PushConsumer*>, ACE_Equal_To<RtecEventComm::PushConsumer*>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Base_Ex<RtecEventComm::PushConsumer*, TAO_EC_Dispatching_Task*, ACE_Pointer_Hash<RtecEventComm::PushConsumer*>, ACE_Equal_To<RtecEventComm::PushConsumer*>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Ex<RtecEventComm::PushConsumer*,TAO_EC_Dispatching_Task*, ACE_Pointer_Hash<RtecEventComm::PushConsumer*>,ACE_Equal_To<RtecEventComm::PushConsumer*>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator_Ex<RtecEventComm::PushConsumer*, TAO_EC_Dispatching_Task*, ACE_Pointer_Hash<RtecEventComm::PushConsumer*>, ACE_Equal_To<RtecEventComm::PushConsumer*>, ACE_Null_Mutex>; +template class ACE_Equal_To<RtecEventComm::PushConsumer*>; +template class ACE_Pointer_Hash<RtecEventComm::PushConsumer*>; +template class ACE_Hash_Map_Entry<RtecEventComm::PushConsumer*, TAO_EC_Dispatching_Task*>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching.h b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching.h new file mode 100644 index 00000000000..dbc8cda7061 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching.h @@ -0,0 +1,83 @@ +/* -*- C++ -*- */ +/** + * @file EC_TPC_Dispatching.h + * + * $Id$ + * + * @author Chris Cleeland <cleeland at ociweb.com> + * + */ + +#ifndef TAO_EC_TPC_DISPATCHING_H +#define TAO_EC_TPC_DISPATCHING_H +#include "ace/pre.h" + +#include "ace/Hash_Map_Manager_T.h" + +#include "EC_Dispatching.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "EC_TPC_Dispatching_Task.h" + +class TAO_EC_Event_Channel_Base; + +/** + * @class TAO_EC_TPC_Dispatching + * + * @brief Dispatching strategy that isolates deliveries to a consumer from any other. + * + * This strategy uses a thread per consumer, and was specifically designed to + * isolate the effects of an ill-behaved consumer from affecting other consumers. + */ +class TAO_RTEvent_Serv_Export TAO_EC_TPC_Dispatching : public TAO_EC_Dispatching +{ +public: + TAO_EC_TPC_Dispatching (TAO_EC_Queue_Full_Service_Object* so); + ~TAO_EC_TPC_Dispatching (); + + // = The EC_Dispatching methods. + virtual void activate (void); + virtual void shutdown (void); + virtual void push (TAO_EC_ProxyPushSupplier* proxy, + RtecEventComm::PushConsumer_ptr consumer, + const RtecEventComm::EventSet& event, + TAO_EC_QOS_Info& qos_info + ACE_ENV_ARG_DECL); + virtual void push_nocopy (TAO_EC_ProxyPushSupplier* proxy, + RtecEventComm::PushConsumer_ptr consumer, + RtecEventComm::EventSet& event, + TAO_EC_QOS_Info& qos_info + ACE_ENV_ARG_DECL); + + int add_consumer (RtecEventComm::PushConsumer_ptr consumer ACE_ENV_ARG_DECL); + int remove_consumer (RtecEventComm::PushConsumer_ptr consumer ACE_ENV_ARG_DECL); + +private: + // Use our own thread manager + ACE_Thread_Manager thread_manager_; + + typedef ACE_Hash_Map_Manager_Ex<RtecEventComm::PushConsumer_ptr,TAO_EC_Dispatching_Task*,ACE_Pointer_Hash<RtecEventComm::PushConsumer_ptr>,ACE_Equal_To<RtecEventComm::PushConsumer_ptr>,ACE_Null_Mutex> MAPTYPE; + + // Tweak the default size of this map by #defining + // TAO_EC_TPC_DISPATCHING_DEFAULT_MAP_SIZE + MAPTYPE consumer_task_map_; + + // Lock for modifying the map. It's not enough to have a lock only + // on the map, because we have to hold the map constant while doing + // multiple distinct map manipulations, such as in remove_consumer(). + ACE_SYNCH_MUTEX lock_; + + // Service object information + TAO_EC_Queue_Full_Service_Object* queue_full_service_object_; // @@ who will release? + // @@ check to see how the factory gets released... +}; + +#if defined (__ACE_INLINE__) +#include "EC_TPC_Dispatching.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" +#endif /* TAO_EC_TPC_DISPATCHING_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching.i b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching.i new file mode 100644 index 00000000000..cfa1da318d3 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching.i @@ -0,0 +1 @@ +// $Id$ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching_Task.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching_Task.cpp new file mode 100644 index 00000000000..5de281b24cd --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching_Task.cpp @@ -0,0 +1,20 @@ +// $Id$ + +#include "EC_TPC_Dispatching_Task.h" + +//ACE_RCS_ID(RTEvent, EC_TPC_Dispatching_Task, "$Id$") + +TAO_EC_TPC_Dispatching_Task::TAO_EC_TPC_Dispatching_Task +(ACE_Thread_Manager* thr_mgr, + TAO_EC_Queue_Full_Service_Object* so) + : TAO_EC_Dispatching_Task (thr_mgr, so) +{ +} + +int +TAO_EC_TPC_Dispatching_Task::close (u_long flags) +{ + ACE_UNUSED_ARG (flags); + delete this; + return 0; +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching_Task.h b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching_Task.h new file mode 100644 index 00000000000..7223dccfb2d --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Dispatching_Task.h @@ -0,0 +1,29 @@ +/* -*- C++ -*- */ +/** + * @file EC_TPC_Dispatching_Task.h + * + * $Id$ + * + * @author Chris Cleeland <cleeland at ociweb.com> + */ + +#ifndef TAO_EC_TPC_DISPATCHING_TASK_H +#define TAO_EC_TPC_DISPATCHING_TASK_H +#include "ace/pre.h" + +#include "EC_Dispatching_Task.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_RTEvent_Serv_Export TAO_EC_TPC_Dispatching_Task : public TAO_EC_Dispatching_Task +{ +public: + /// Constructor + TAO_EC_TPC_Dispatching_Task (ACE_Thread_Manager* thr_mgr, TAO_EC_Queue_Full_Service_Object* so); + virtual int close (u_long flags = 0); +}; + +#include "ace/post.h" +#endif /* TAO_EC_TPC_DISPATCHING_TASK_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Factory.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Factory.cpp new file mode 100644 index 00000000000..d350c76c677 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Factory.cpp @@ -0,0 +1,125 @@ +// $Id$ + +#include "EC_TPC_Factory.h" + +#include "EC_TPC_Dispatching.h" +#include "EC_TPC_ProxySupplier.h" +#include "EC_TPC_ProxyConsumer.h" + +#include "tao/ORB_Core.h" + +#include "ace/Arg_Shifter.h" +#include "ace/Sched_Params.h" +#include "ace/OS_NS_strings.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_TPC_Factory.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_TPC_Factory, "$Id$") + +unsigned long EC_TPC_debug_level; + +TAO_EC_TPC_Factory::TAO_EC_TPC_Factory (void) +{ + EC_TPC_debug_level = 0; +} + +TAO_EC_TPC_Factory::~TAO_EC_TPC_Factory (void) +{ +} + +int +TAO_EC_TPC_Factory::init_svcs (void) +{ + TAO_EC_Simple_Queue_Full_Action::init_svcs(); + return ACE_Service_Config::static_svcs ()-> + insert (&ace_svc_desc_TAO_EC_TPC_Factory); +} + +int +TAO_EC_TPC_Factory::init (int argc, char* argv[]) +{ + ACE_Arg_Shifter arg_shifter (argc, argv); + + while (arg_shifter.is_anything_left ()) + { + const char *arg = arg_shifter.get_current (); + + if (ACE_OS::strcasecmp (arg, "-ECDispatching") == 0) + { + arg_shifter.consume_arg (); + + if (arg_shifter.is_parameter_next ()) + { + // Here we print out a message indicating that this + // option isn't valid with this factory and that it's + // being ignored. + + ACE_DEBUG ((LM_ERROR, + "EC_TPC_Factory - " + "-ECDispatching not supported with TPC_Factory; ignoring the option and using thread-per-consumer dispatch strategy\n")); + arg_shifter.consume_arg (); + } + } + if (ACE_OS::strcasecmp (arg, "-ECTPCDebug") == 0) + { + arg_shifter.consume_arg (); + ++EC_TPC_debug_level; + } + else + { + arg_shifter.ignore_arg (); + } + + } + + return TAO_EC_Default_Factory::init (argc, argv); +} + +// **************************************************************** + +TAO_EC_Dispatching* +TAO_EC_TPC_Factory::create_dispatching (TAO_EC_Event_Channel_Base *) +{ + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) EC_TPC_Factory::create_dispatching\n")); + + TAO_EC_Queue_Full_Service_Object* so = + this->find_service_object (this->queue_full_service_object_name_.fast_rep(), + TAO_EC_DEFAULT_QUEUE_FULL_SERVICE_OBJECT_NAME); + + return new TAO_EC_TPC_Dispatching (so); +} + + +TAO_EC_ProxyPushSupplier* +TAO_EC_TPC_Factory::create_proxy_push_supplier (TAO_EC_Event_Channel_Base *ec) +{ + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) EC_TPC_Factory::create_proxy_push_supplier\n")); + return new TAO_EC_TPC_ProxyPushSupplier (ec, this->consumer_validate_connection_); +} + + +TAO_EC_ProxyPushConsumer* +TAO_EC_TPC_Factory::create_proxy_push_consumer (TAO_EC_Event_Channel_Base *ec) +{ + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t) EC_TPC_Factory::create_proxy_push_consumer\n")); + return new TAO_EC_TPC_ProxyPushConsumer (ec); +} + + +// **************************************************************** + +ACE_STATIC_SVC_DEFINE (TAO_EC_TPC_Factory, + ACE_TEXT ("EC_Factory"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (TAO_EC_TPC_Factory), + ACE_Service_Type::DELETE_THIS | ACE_Service_Type::DELETE_OBJ, + 0) +ACE_FACTORY_DEFINE (TAO_RTEvent_Serv, TAO_EC_TPC_Factory) + +// **************************************************************** + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Factory.h b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Factory.h new file mode 100644 index 00000000000..3d9455564e5 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Factory.h @@ -0,0 +1,67 @@ +/* -*- C++ -*- */ +/** + * @file EC_TPC_Factory.h + * + * $Id$ + * + */ + +#ifndef TAO_EC_TPC_FACTORY_H +#define TAO_EC_TPC_FACTORY_H +#include "ace/pre.h" + +#include "EC_Default_Factory.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +/** + * @class TAO_EC_TPC_Factory + * + * @brief A generic factory for EC experimentation. + * + * This class allows the user to experiment with different EC + * configurations. Using a command-line like interface the user + * can specify which strategies will this factory generate. + * Since the class can be dynamically loaded the strategies can be + * set in the service configurator file. + */ +class TAO_RTEvent_Serv_Export TAO_EC_TPC_Factory : public TAO_EC_Default_Factory +{ +public: + /// Constructor + TAO_EC_TPC_Factory (void); + + /// destructor... + virtual ~TAO_EC_TPC_Factory (void); + + /// Helper function to register the default factory into the service + /// configurator. + static int init_svcs (void); + + // = The Service_Object entry points + virtual int init (int argc, char* argv[]); + + // = The EC_Factory methods + virtual TAO_EC_Dispatching* + create_dispatching (TAO_EC_Event_Channel_Base*); + + virtual TAO_EC_ProxyPushSupplier* + create_proxy_push_supplier (TAO_EC_Event_Channel_Base*); + + virtual TAO_EC_ProxyPushConsumer* + create_proxy_push_consumer (TAO_EC_Event_Channel_Base*); +}; + +extern unsigned long EC_TPC_debug_level; + +#if defined (__ACE_INLINE__) +#include "EC_TPC_Factory.i" +#endif /* __ACE_INLINE__ */ + +ACE_STATIC_SVC_DECLARE (TAO_EC_TPC_Factory) +ACE_FACTORY_DECLARE (TAO_RTEvent_Serv, TAO_EC_TPC_Factory) + +#include "ace/post.h" +#endif /* TAO_EC_TPC_FACTORY_H */ diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Factory.i b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Factory.i new file mode 100644 index 00000000000..74e88caa0c5 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_Factory.i @@ -0,0 +1,2 @@ +// $Id$ + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxyConsumer.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxyConsumer.cpp new file mode 100644 index 00000000000..a9179ccae1b --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxyConsumer.cpp @@ -0,0 +1,43 @@ +// $Id$ + +#include "EC_TPC_ProxyConsumer.h" + +ACE_RCSID(Event, EC_TPC_ProxyConsumer, "$Id$") + +#include "EC_Event_Channel_Base.h" +#include "EC_TPC_Dispatching.h" + +extern unsigned long EC_TPC_debug_level; + +TAO_EC_TPC_ProxyPushConsumer::TAO_EC_TPC_ProxyPushConsumer (TAO_EC_Event_Channel_Base* ec) +: TAO_EC_Default_ProxyPushConsumer (ec) +{ +} + +TAO_EC_TPC_Dispatching* +TAO_EC_TPC_ProxyPushConsumer::tpc_dispatching () +{ + TAO_EC_Dispatching* dispatcher = this->event_channel_->dispatching (); + TAO_EC_TPC_Dispatching* tpcdispatcher = + ACE_dynamic_cast (TAO_EC_TPC_Dispatching*, dispatcher); + return tpcdispatcher; +} + +TAO_EC_TPC_ProxyPushConsumer::~TAO_EC_TPC_ProxyPushConsumer (void) +{ + // @@@ Do I need to call the logic to remove the consumer from the + // dispatch map in here? I'm not sure... But, if I do, then I need + // to fact that "remove" code out of just the + // disconnect_push_consumer. + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "RTEC (%P|%t): inside ~TAO_EC_TPC_ProxyPushConsumer (%x)\n", this)); +} + +void +TAO_EC_TPC_ProxyPushConsumer::disconnect_push_consumer ( + ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + this->tpc_dispatching ()->remove_consumer (this->_this(ACE_ENV_SINGLE_ARG_PARAMETER)); + BASECLASS::disconnect_push_consumer (ACE_ENV_SINGLE_ARG_PARAMETER); +} diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxyConsumer.h b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxyConsumer.h new file mode 100644 index 00000000000..4189082451e --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxyConsumer.h @@ -0,0 +1,45 @@ +/* -*- C++ -*- */ +/** + * @file EC_TPC_ProxyConsumer.h + * + * $Id$ + * + * @author Chris Cleeland <cleeland at ociweb.com > + * + */ + +#ifndef TAO_EC_TPC_PROXYCONSUMER_H +#define TAO_EC_TPC_PROXYCONSUMER_H +#include "ace/pre.h" + +#include "EC_Default_ProxyConsumer.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_EC_Dispatching; +class TAO_EC_TPC_Dispatching; + +/** + * @class TAO_EC_TPC_ProxyPushConsumer + * + */ +class TAO_RTEvent_Serv_Export TAO_EC_TPC_ProxyPushConsumer : + public TAO_EC_Default_ProxyPushConsumer +{ +public: + TAO_EC_TPC_ProxyPushConsumer (TAO_EC_Event_Channel_Base* ec); + virtual ~TAO_EC_TPC_ProxyPushConsumer (void); + + virtual void disconnect_push_consumer (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + typedef TAO_EC_Default_ProxyPushConsumer BASECLASS; + TAO_EC_TPC_Dispatching* tpc_dispatching (); +}; + +#include "ace/post.h" + +#endif diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxySupplier.cpp b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxySupplier.cpp new file mode 100644 index 00000000000..66495f649b5 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxySupplier.cpp @@ -0,0 +1,66 @@ +// $Id$ + +#include "EC_TPC_ProxySupplier.h" + +#if ! defined (__ACE_INLINE__) +#include "EC_TPC_ProxySupplier.i" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID(Event, EC_TPC_ProxySupplier, "$Id$") + +#include "EC_Event_Channel_Base.h" +#include "EC_TPC_Dispatching.h" + +extern unsigned long EC_TPC_debug_level; + +TAO_EC_TPC_ProxyPushSupplier::~TAO_EC_TPC_ProxyPushSupplier (void) +{ + +} + +void +TAO_EC_TPC_ProxyPushSupplier:: disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t): enter EC_TPC_ProxySupplier::disconnect_push_supplier (%@): refcount=%u,consumer=%@\n", this, this->refcount_, this->consumer_.in())); + + if (this->is_connected_i ()) + this->tpc_dispatching ()->remove_consumer (this->consumer_.in()); + BASECLASS::disconnect_push_supplier (); + + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t): leave EC_TPC_ProxySupplier::disconnect_push_supplier (%@)\n", this)); +} + +TAO_EC_TPC_Dispatching* +TAO_EC_TPC_ProxyPushSupplier::tpc_dispatching () +{ + TAO_EC_Dispatching* dispatcher = this->event_channel_->dispatching (); + TAO_EC_TPC_Dispatching* tpcdispatcher = + ACE_dynamic_cast (TAO_EC_TPC_Dispatching*, dispatcher); + return tpcdispatcher; +} + +void +TAO_EC_TPC_ProxyPushSupplier::connect_push_consumer ( + RtecEventComm::PushConsumer_ptr push_consumer, + const RtecEventChannelAdmin::ConsumerQOS& qos + ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException, + RtecEventChannelAdmin::AlreadyConnected, + RtecEventChannelAdmin::TypeError)) +{ + BASECLASS::connect_push_consumer (push_consumer, qos ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "EC (%P|%t): EC_ProxySupplier(%@): refcount=%u,consumer=%@\n", + this, this->refcount_, this->consumer_.in())); + + TAO_EC_TPC_Dispatching* tpcdispatcher = this->tpc_dispatching (); + + // the new dispatching task gets automatically created + tpcdispatcher->add_consumer (push_consumer); +} + diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxySupplier.h b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxySupplier.h new file mode 100644 index 00000000000..9797eb3c305 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxySupplier.h @@ -0,0 +1,60 @@ +/* -*- C++ -*- */ +/** + * @file EC_TPC_ProxySupplier.h + * + * $Id$ + * + * @author Chris Cleeland <cleeland at ociweb.com> + * + */ + +#ifndef TAO_EC_TPC_PROXYSUPPLIER_H +#define TAO_EC_TPC_PROXYSUPPLIER_H +#include "ace/pre.h" + +#include "EC_Default_ProxySupplier.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +class TAO_EC_Dispatching; +class TAO_EC_TPC_Dispatching; + +class TAO_RTEvent_Serv_Export TAO_EC_TPC_ProxyPushSupplier : + public TAO_EC_Default_ProxyPushSupplier +{ +public: + /// Constructor + TAO_EC_TPC_ProxyPushSupplier (TAO_EC_Event_Channel_Base* event_channel, int validate_connection); + + /// Dtor + virtual ~TAO_EC_TPC_ProxyPushSupplier (void); + + /*! These are overriden from the base class in order to maintain the + map in the dispatcher class. */ + + // = The RtecEventChannelAdmin::ProxyPushSupplier methods... + virtual void connect_push_consumer ( + RtecEventComm::PushConsumer_ptr push_consumer, + const RtecEventChannelAdmin::ConsumerQOS &qos + ACE_ENV_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException, + RtecEventChannelAdmin::AlreadyConnected, + RtecEventChannelAdmin::TypeError)); + + virtual void disconnect_push_supplier (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + typedef TAO_EC_Default_ProxyPushSupplier BASECLASS; + TAO_EC_TPC_Dispatching* tpc_dispatching (); +}; + +#if defined (__ACE_INLINE__) +#include "EC_TPC_ProxySupplier.i" +#endif /* __ACE_INLINE__ */ + +#include "ace/post.h" + +#endif diff --git a/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxySupplier.i b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxySupplier.i new file mode 100644 index 00000000000..7120a4017f2 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/EC_TPC_ProxySupplier.i @@ -0,0 +1,12 @@ +// $Id$ + +extern unsigned long EC_TPC_debug_level; + +ACE_INLINE +TAO_EC_TPC_ProxyPushSupplier::TAO_EC_TPC_ProxyPushSupplier (TAO_EC_Event_Channel_Base* ec, int validate_connection) +: TAO_EC_Default_ProxyPushSupplier (ec, validate_connection) +{ + if (EC_TPC_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "(%P|%t) EC_TPC_ProxyPushSupplier::CTOR (%@)\n", this)); +} + diff --git a/TAO/orbsvcs/orbsvcs/RTEvent_Serv.mpc b/TAO/orbsvcs/orbsvcs/RTEvent_Serv.mpc index e6ccf146625..3c19e408e47 100644 --- a/TAO/orbsvcs/orbsvcs/RTEvent_Serv.mpc +++ b/TAO/orbsvcs/orbsvcs/RTEvent_Serv.mpc @@ -76,6 +76,11 @@ project(RTEvent_Serv) : orbsvcslib, core, rtevent_skel, svc_utils, messaging { Event/EC_Trivial_Supplier_Filter.cpp Event/EC_Type_Filter.cpp Event/EC_UDP_Admin.cpp + Event/EC_TPC_Dispatching.cpp + Event/EC_TPC_Dispatching_Task.cpp + Event/EC_TPC_Factory.cpp + Event/EC_TPC_ProxyConsumer.cpp + Event/EC_TPC_ProxySupplier.cpp } Template_Files { @@ -88,3 +93,4 @@ project(RTEvent_Serv) : orbsvcslib, core, rtevent_skel, svc_utils, messaging { RTEvent_Serv.rc } } + |