diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h | 486 |
1 files changed, 486 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h new file mode 100644 index 00000000000..bf872280346 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/Dispatching_Modules.h @@ -0,0 +1,486 @@ +/* -*- C++ -*- */ +// $Id$ +// +// ============================================================================ +// +// = LIBRARY +// orbsvcs +// +// = FILENAME +// Dispatching_Module +// +// = AUTHOR +// Tim Harrison (harrison@cs.wustl.edu) +// +// = DESCRIPTION +// This file holds the different Event Service dispatching +// mechanisms. These include null-dispatching (EFD), +// single-threaded with (RTU) and without preemption (LAME), and a +// multithreaded implementation. +// +// ============================================================================ + +#ifndef ACE_DISPATCHING_MODULES_H +#define ACE_DISPATCHING_MODULES_H + +#include "tao/Timeprobe.h" +#include "ReactorTask.h" +#include "Event_Channel.h" + +// ************************************************************ + +// Forward declarations. +class ACE_ES_Dispatch_Queue; +class ACE_ES_Dispatch_Request; + +// ************************************************************ + +// Forward declarations. +class ACE_ES_Consumer_Module; + +class ACE_ES_Dispatching_Base : public ACE_Event_Handler +// = TITLE +// Event Service Dispatch Module base class +// +// = DESCRIPTION +// We inherit from ACE_Event_Handler so that we can be called back +// by the ReactorEx when requests are queued. The virtual +// dispatch_event method allows ACE_ES_Dispatch_Requests to call +// back the dispatching module when acting as command objects. When +// this implementation is used by the Event Channel it forwards all +// dispatch calls without any queuing. Therefore, it can be +// used to build an EFD. It is also inherited by the Priority +// Dispatching module. +{ +public: + ACE_ES_Dispatching_Base (ACE_EventChannel *channel); + // Default construction. + + virtual void open (ACE_ES_Consumer_Module *up, + ACE_ES_Correlation_Module *down); + // Link to adjacent modules. + + virtual void connected (ACE_Push_Consumer_Proxy *consumer, + CORBA::Environment &); + // Forward down_. + + virtual void disconnecting (ACE_Push_Consumer_Proxy *consumer, + CORBA::Environment &); + // Forward down_. + + virtual void disconnected (ACE_Push_Consumer_Proxy *consumer); + // Release any unneeded dispatching resources. + + // = Not needed. + // void connected (ACE_Push_Supplier_Proxy *supplier); + // void disconnecting (ACE_Push_Supplier_Proxy *supplier); + + virtual void push (ACE_ES_Dispatch_Request *request, + CORBA::Environment &) = 0; + // Forward up_. + + virtual int dispatch_event (ACE_ES_Dispatch_Request *request, + u_long &command_action); + // Called by ACE_ES_Dispatch_Requests when dequeued by RT_Tasks. + + virtual void dispatch_queue_closed (ACE_ES_Dispatch_Queue *q); + // Called when all the threads of a <q> have exited. + + virtual void shutdown (void); + // This is called by the Event Channel. This will attempt to shut + // down all of its threads gracefully. Wish it luck. + +protected: + ACE_EventChannel *channel_; + // Dat der channel. + + ACE_ES_MUTEX lock_; + // To synchronize thr_count_. + + int thr_count_; + // The total number of threads in the Dispatching Module. This will + // be the sum of all the Dispatch Queue threads. + + ACE_ES_Consumer_Module *up_; + // Next module up. + + ACE_ES_Correlation_Module *down_; + // Next module down. +}; + +// ************************************************************ + +class ACE_ES_Dispatch_Request : public ACE_RT_Task_Command +// = TITLE +// ACE Event Service Dispatch Request +// +// = DESCRIPTION +// Encapsulates a consumer and the events that will be sent to the +// consumer. Right now, this class keeps a single_event_ that can +// be used when only one event is sent to the consumer. Since this +// is frequently the case (except for correlations), this +// optimization reduces the amount of dynamic memory allocation is +// necessary. This class is also a GOF Command object since it can +// be dequeued by an RT_Task to call back the dispatching module +// for request dispatching. +{ +public: + typedef ACE_CORBA_Sequence<ACE_ES_Event_Container_var> Event_Set; + + ACE_ES_Dispatch_Request (void); + // Default construction. + + virtual ~ACE_ES_Dispatch_Request (void); + // Default destruction. + + ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consumer, + RtecScheduler::handle_t rt_info); + // All the events must be added after construction to the + // event_set. + + ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consumer, + const Event_Set &event_set, + RtecScheduler::handle_t rt_info); + // Set consumer_ to <consumer> and copy <event_set> to event_set_. + // <rt_info> describes the method receiving this dispatch. + + ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consumer, + const RtecEventComm::Time &time, + RtecScheduler::handle_t rt_info); + // Set consumer_ to <consumer> and sets single_event_.creation_time_ + // to <time>. Sets use_single_event_ to 1. <rt_info> describes the + // method receiving this dispatch. + + ACE_ES_Dispatch_Request (ACE_Push_Consumer_Proxy *consumer, + ACE_ES_Event_Container *event, + RtecScheduler::handle_t rt_info); + // Sets consumer_ and the first slot of event_set_. We use the + // event_set_ instead of the single_event_ so that we can just carry + // around the pointer to <event>. <rt_info> describes the method + // receiving this dispatch. + + RtecScheduler::handle_t rt_info (void); + // Description of the method receiving this request. + + void set (ACE_ES_Dispatching_Base *dispatching_module, + RtecScheduler::OS_Priority priority, + RtecScheduler::Sub_Priority sub_priority); + // For multi-threaded implementations, <dispatching_module> is + // called back when a request is dequeued. <priority> is the + // dispatch priority of the event. <sub_priority> is the enqueue + // priority of the event and will be forwarded to + // ACE_Message_Block. + + ACE_Push_Consumer_Proxy *consumer (void) const; + // Consumer accessor. + + const Event_Set &event_set (void) const; + // If accessed, make_copy will use event_set_. + + Event_Set &event_set (void); + // If accessed, make_copy will use event_set_. + + CORBA::ULong number_of_events (void) const; + // Returns 1 if we're using single_event, or event_set_.size (). + + void make_copy (RtecEventComm::EventSet &dest) const; + // Copy single_event or event_set into <dest>. + + virtual int execute (u_long &command_action); + // Calls dispatching_module_->dispatch_event. + + RtecScheduler::OS_Priority priority (void); + // Priority accessor. + +#if 0 + // @@ This cannot be done: the object would be allocated using this + // class operator new, but it will be removed using the + // ACE_Message_Block operator delete! + void *operator new (size_t nbytes); + // Allocates memory from a thread-specific memory pool. + + void operator delete (void *); + // Returns memory to a thread-specific memory pool. +#endif + +protected: + RtecScheduler::OS_Priority priority_; + + RtecScheduler::handle_t rt_info_; + // Describes the method receiving this dispatch. + + ACE_ES_Dispatching_Base *dispatching_module_; + // The dispatching module called back when we're dequeued by a + // thread. + + int use_single_event_; + // Is true if we're using a single event. Is 0 is we're using + // event_set_. + + ACE_Push_Consumer_Proxy *consumer_; + // The final destination for single_event_ or event_set_. + + ACE_ES_Event_Container single_event_; + // This is used for single event dispatches. + + Event_Set event_set_; + // This is used for event sets that need to be dispatched. +}; + +// ************************************************************ + +#if defined (ACE_WIN32) +class ACE_ES_ReactorEx_NS : public ACE_Notification_Strategy +// = TITLE +// Event Service ReactorEx Notification Strategy +// +// = DESCRIPTION +// Integrates the ACE_Message_Queue notification to signal a +// handle that will wake up the ACE_ES_Priority_Dispatching +// module. This is used in place of the +// ACE_ReactorEx_Notification_Strategy to avoid any queueing by +// the ReactorEx::notify mechanism. +{ +public: + ACE_ES_ReactorEx_NS (ACE_Event_Handler *eh); + // Stores away <eh> for when this->open is called. + + int open (void); + // Registers eh_ with the ReactorEx to be notified when this->notify + // is called. + + void shutdown (void); + // Removes self from the reactor. + + // = These result in eh_->handle_signal getting called. eh_ should + // point to a dispatching module. + virtual int notify (void); + virtual int notify (ACE_Event_Handler *, + ACE_Reactor_Mask mask); + + // ACE_HANDLE get_handle (void); + // Returns event_.handle (). + +private: + ACE_Auto_Event event_; + // Registered with the ReactorEx. +}; + +typedef ACE_ES_ReactorEx_NS ACE_ES_Notification_Strategy; + +#else // ******************************************************* + +class ACE_ES_Reactor_NS : public ACE_Reactor_Notification_Strategy +// = TITLE +// Event Service Reactor Notification Strategy +// +// = DESCRIPTION +// Maps to the ACE_Reactor_Notification_Strategy interface. This +// version is for non WIN32 platforms. +{ +public: + ACE_ES_Reactor_NS (ACE_Event_Handler *eh); + // Calls ACE_Reactor_Notification_Strategy with the ORB's reactor + // and signal mask. + + int open (void); + // Does nothing. + + void shutdown (void); + // Does nothing. +}; + +typedef ACE_ES_Reactor_NS ACE_ES_Notification_Strategy; + +#endif /* ACE_WIN32 */ + +// ************************************************************ + +class ACE_ES_MQ : public ACE_ES_QUEUE +// = TITLE +// Event Service Message Queue +{ + virtual int notify (void) { return 0;} + // Does nothing. +}; + +// ************************************************************ + +class ACE_ES_Dispatch_Queue : public ACE_RT_Task +// = TITLE +// Event Service Dispatch Queue +// +// = DESCRIPTION +// An encapsulation of a dispatch queue. By inheriting from +// ACE_RT_Task, we can make this zero-threaded or multi-threaded. +{ +public: + ACE_ES_Dispatch_Queue (ACE_ES_Dispatching_Base *dispatching_module, + ACE_ES_Notification_Strategy *notification_strategy); + // Stores <dispatching_module> for this->threads_closed. Stores + // away <notification_strategy> for this->synch_threads. + + int open_queue (RtecScheduler::Period &period, + int threads); + // This is a hack to get the channel to work with the new + // scheduler. + +#if 0 + int open_queue (RtecScheduler::OS_Priority priority, + int threads); + // Creates a name from the <priority> and tries to find a scheduling + // qos structure. If one is not found, but created, qos_ is set + // with default values. Either way, if qos_->thread_ > 0, it calls + // this->synch_threads. Otherwise, our message queue will use + // notification_strategy_. This will cause the ReactorEx to call + // back the dispatching_module_ when requests are queued on our + // message queue. Returns 0 on success, -1 on failure. +#endif + + virtual void threads_closed (void); + // Called when every thread has exited. This will call + // dispatching_module_->dispatch_queue_closed. + +private: + ACE_ES_Dispatching_Base *dispatching_module_; + // Used in threads_closed. + + ACE_ES_Notification_Strategy *notification_strategy_; + // Notifies the Dispatching Module when a request has been queued on + // our message queue. +}; + +// ************************************************************ + +class ACE_ES_Priority_Dispatching : public ACE_ES_Dispatching_Base +// = TITLE +// Event Service Priority Dispatching Module +// +// = DESCRIPTION +// Inherits from ACE_Event_Handler to utilitize the +// ACE_Message_Queue notification strategy. This implementation +// does priority dispatching without preemption. +{ +public: + ACE_ES_Priority_Dispatching (ACE_EventChannel *channel, + int threads_per_queue); + // Store <channel>. Spawns <threads_per_queue> thread for each + // dispatch queue. If != 0, then the channel is an MT_CHANNEL. If + // == 0, then the channel is an ST_CHANNEL. + + ~ACE_ES_Priority_Dispatching (void); + // Delete queues. + + void connected (ACE_Push_Consumer_Proxy *consumer, + CORBA::Environment &); + // Allocate any needed dispatching resources for this consumers + // priority. + + void disconnected (ACE_Push_Consumer_Proxy *consumer); + // Release unneeded dispatch queues. + + // = Not needed. + // void connected (ACE_Push_Supplier_Proxy *supplier); + // void disconnecting (ACE_Push_Supplier_Proxy *supplier); + + virtual void push (ACE_ES_Dispatch_Request *request, + CORBA::Environment &); + // Enqueues the request on the appropriate Dispatch Queue. + + virtual void shutdown (void); + // Closes all queues "asynchronously." When all queues are closed, + // deletes them all and then deletes itself. + + virtual void dispatch_queue_closed (ACE_ES_Dispatch_Queue *q); + // Called when all the threads of a <q> have exited. Deletes <q>. + + // virtual ACE_HANDLE get_handle (void) const; + // Get the I/O handle. + +protected: + virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + // Called when input has arrived on a message queue. This is used + // for single-threaded implementations. + + virtual int handle_input (ACE_HANDLE); + // For single-threaded implementations on non-win32 platforms that + // use the ACE_Reactor_Notification_Strategy. This just forwards + // all calls to this->handle_signal (). + + ACE_ES_Notification_Strategy notification_strategy_; + // Shared between all dispatch queues. + + void initialize_queues (void); + // This is a hack to create a queue for each of the 4 rate groups. + + ACE_ES_Dispatch_Queue *queues_[ACE_Scheduler_MAX_PRIORITIES]; + // Pointers to dispatch queues. + + ACE_ES_Dispatch_Queue *delete_me_queues_[ACE_Scheduler_MAX_PRIORITIES]; + // Pointers to dispatch queues that want to die. + + int queue_count_[ACE_Scheduler_MAX_PRIORITIES]; + // The number of consumers using each queue. + + int highest_priority_; + // The highest priority queue in queues_. This allows us to + // optimize the handle_signal method. + + int shutdown_; + // Make sure to only shutdown the dispatching module once. + + int threads_per_queue_; + // The number of threads to spawn for each dispatch queue. +}; + +// ************************************************************ + +class ACE_ES_EFD_Dispatching : public ACE_ES_Dispatching_Base +// = TITLE +// Event Service EFD Dispatching Module +// +// = DESCRIPTION +// Implements a zero-threaded dispatcher with no preemption. +{ +public: + ACE_ES_EFD_Dispatching (ACE_EventChannel *channel); + // Acquires the proper qos structure and passes <channel> onto to + // the dispatching base constructor. + + virtual void push (ACE_ES_Dispatch_Request *request, + CORBA::Environment &); + // Forward up_. +}; + +// ************************************************************ + +class ACE_ES_RTU_Dispatching : public ACE_ES_Priority_Dispatching +// = TITLE +// Event Service RTU Dispatching Module +// +// = DESCRIPTION +// Implements a single-threaded dispatcher with delayed preemption. +{ +public: + ACE_ES_RTU_Dispatching (ACE_EventChannel *channel); + // Store <channel>. + + virtual int dispatch_event (ACE_ES_Dispatch_Request *request, + u_long &command_action); + // Called by ACE_Dispatch_Queues and handle_signal when an event + // needs to be dequeued. Implements an RTU-like delayed preemption + // policy. + + virtual void push (ACE_ES_Dispatch_Request *request, + CORBA::Environment &); + // Calls ACE_ES_Priority_Dispatching::push and then checks if + // preemption is necessary. +}; + +#if defined (__ACE_INLINE__) +#include "Dispatching_Modules.i" +#endif /* __ACE_INLINE__ */ + +#endif /* ACE_DISPATCHING_MODULES_H */ + + |