/* -*- C++ -*- */ //============================================================================= /** * @file Proactor.h * * $Id$ * * @author Irfan Pyarali <irfan@cs.wustl.edu> * @author Tim Harrison <harrison@cs.wustl.edu> * @author Alexander Babu Arulanthu <alex@cs.wustl.edu> * @author Alexander Libman <alibman@ihug.com.au> */ //============================================================================= #ifndef ACE_PROACTOR_H #define ACE_PROACTOR_H #include "ace/pre.h" #include "ace/config-all.h" #include "ace/ACE_export.h" #include "ace/OS.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) #pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ #if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) // This only works on Win32 platforms and on Unix platforms supporting // POSIX aio calls. # include "ace/Asynch_IO.h" # include "ace/Asynch_IO_Impl.h" # include "ace/Thread_Manager.h" # include "ace/Timer_Queue.h" # include "ace/Timer_List.h" # include "ace/Timer_Heap.h" # include "ace/Timer_Wheel.h" // Forward declarations. class ACE_Proactor_Impl; class ACE_Proactor_Timer_Handler; /** * @class ACE_Proactor_Handle_Timeout_Upcall * * @brief Functor for <ACE_Timer_Queue>. * * This class implements the functor required by the Timer * Queue to call <handle_timeout> on ACE_Handlers. */ class ACE_Export ACE_Proactor_Handle_Timeout_Upcall { /// Type def for the timer queue. typedef ACE_Timer_Queue_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_QUEUE; /// The main Proactor class has special permissions. friend class ACE_Proactor; public: /// Constructor. ACE_Proactor_Handle_Timeout_Upcall (void); /// This method is called when the timer expires. int timeout (TIMER_QUEUE &timer_queue, ACE_Handler *handler, const void *arg, const ACE_Time_Value &cur_time); /// This method is called when the timer is canceled. int cancellation (TIMER_QUEUE &timer_queue, ACE_Handler *handler); /// This method is called when the timer queue is destroyed and the /// timer is still contained in it. int deletion (TIMER_QUEUE &timer_queue, ACE_Handler *handler, const void *arg); protected: /// Set the proactor. This will fail, if one is already set! int proactor (ACE_Proactor &proactor); /// Handle to the proactor. This is needed for posting a timer result /// to the Proactor's completion queue. ACE_Proactor *proactor_; }; /** * @class ACE_Proactor * * @brief A manager for asynchronous event demultiplexing. * * See the Proactor pattern description at * http://www.cs.wustl.edu/~schmidt/proactor.ps.gz for more * details. */ class ACE_Export ACE_Proactor { // = Here are the private typedefs that the <ACE_Proactor> uses. typedef ACE_Timer_Queue_Iterator_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_QUEUE_ITERATOR; typedef ACE_Timer_List_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_LIST; typedef ACE_Timer_List_Iterator_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_LIST_ITERATOR; typedef ACE_Timer_Heap_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_HEAP; typedef ACE_Timer_Heap_Iterator_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_HEAP_ITERATOR; typedef ACE_Timer_Wheel_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_WHEEL; typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_WHEEL_ITERATOR; // = Friendship. /// Timer handler runs a thread and manages the timers, on behalf of /// the Proactor. friend class ACE_Proactor_Timer_Handler; public: /// Public type. typedef ACE_Timer_Queue_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_QUEUE; /** * Constructor. If <implementation> is 0, the correct implementation * object will be created. <delete_implementation> flag determines * whether the implementation object should be deleted by the * Proactor or not. If <tq> is 0, a new TIMER_QUEUE is created. */ ACE_Proactor (ACE_Proactor_Impl *implementation = 0, int delete_implementation = 0, TIMER_QUEUE *tq = 0); /// Virtual destruction. virtual ~ACE_Proactor (void); /// Get pointer to a process-wide <ACE_Proactor>. <threads> should /// be part of another method. static ACE_Proactor *instance (size_t threads = 0); /// Set pointer to a process-wide <ACE_Proactor> and return existing /// pointer. static ACE_Proactor *instance (ACE_Proactor * proactor, int delete_proactor = 0); /// Delete the dynamically allocated Singleton. static void close_singleton (void); /// Cleanup method, used by the <ACE_Object_Manager> to destroy the /// singleton. static void cleanup (void *instance, void *arg); // = Proactor event loop management methods. /// Run the event loop until the <ACE_Proactor::handle_events> method /// returns -1 or the <end_event_loop> method is invoked. static int run_event_loop (void); /** * Run the event loop until the <ACE_Proactor::handle_events> method * returns -1, the <end_event_loop> method is invoked, or the * <ACE_Time_Value> expires. */ static int run_event_loop (ACE_Time_Value &tv); /** * Instruct the <ACE_Proactor::instance> to terminate its event * loop. * This method wakes up all the threads blocked on waiting for * completions and end the event loop. */ static int end_event_loop (void); /** * Resets the <ACE_Proactor::end_event_loop_> static so that the * <run_event_loop> method can be restarted. */ static int reset_event_loop (void); /** * The singleton proactor is used by the <ACE_Service_Config>. * Therefore, we must check for the reconfiguration request and * handle it after handling an event. */ static int check_reconfiguration (ACE_Proactor *); /// Report if the <ACE_Proactor::instance> event loop is finished. static int event_loop_done (void); /// Close the associated @c ACE_Proactor_Impl implementation object. /** * If @arg delete_implementation was specified to the @c open() method, * the implementation object is also deleted. */ virtual int close (void); /** * You can add a hook to various run_event methods and the hook will * be called after handling every proactor event. If this function * returns 0, proactor_run_event_loop will check for the return value of * handle_events. If it is -1, the the proactor_run_event_loop will return * (pre-maturely.) */ typedef int (*PROACTOR_EVENT_HOOK)(ACE_Proactor *); // These methods work with an instance of a proactor. /** * Run the event loop until the * <ACE_Proactor::handle_events> * method returns -1 or the <end_proactor_event_loop> method is invoked. */ virtual int proactor_run_event_loop (PROACTOR_EVENT_HOOK = 0); /** * Run the event loop until the <ACE_Proactor::handle_events> * method returns -1, the * <end_proactor_event_loop> method is invoked, * or the <ACE_Time_Value> * expires. */ virtual int proactor_run_event_loop (ACE_Time_Value &tv, PROACTOR_EVENT_HOOK = 0); /** * Instruct the ACE_Proactor to terminate its event loop * and notifies the ACE_Proactor so that it can wake up * and close down gracefully. */ virtual int proactor_end_event_loop (void); /// Report if the ACE_Proactor event loop is finished. virtual int proactor_event_loop_done (void); /// Resets the <ACE_Reactor::end_event_loop_> static so that the /// <run_event_loop> method can be restarted. virtual int proactor_reset_event_loop (void); /// This method adds the <handle> to the I/O completion port. This /// function is a no-op function for Unix systems and returns 0; virtual int register_handle (ACE_HANDLE handle, const void *completion_key); // = Timer management. /** * Schedule a <handler> that will expire after <time>. If it * expires then <act> is passed in as the value to the <handler>'s * <handle_timeout> callback method. This method returns a * <timer_id>. This <timer_id> can be used to cancel a timer before * it expires. The cancellation ensures that <timer_ids> are unique * up to values of greater than 2 billion timers. As long as timers * don't stay around longer than this there should be no problems * with accidentally deleting the wrong timer. Returns -1 on * failure (which is guaranteed never to be a valid <timer_id>). */ virtual long schedule_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &time); virtual long schedule_repeating_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &interval); // Same as above except <interval> it is used to reschedule the // <handler> automatically. /// This combines the above two methods into one. Mostly for backward /// compatibility. virtual long schedule_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &time, const ACE_Time_Value &interval); /// Cancel all timers associated with this <handler>. Returns number /// of timers cancelled. virtual int cancel_timer (ACE_Handler &handler, int dont_call_handle_close = 1); /** * Cancel the single <ACE_Handler> that matches the <timer_id> value * (which was returned from the <schedule> method). If <act> is * non-NULL then it will be set to point to the ``magic cookie'' * argument passed in when the <Handler> was registered. This makes * it possible to free up the memory and avoid memory leaks. * Returns 1 if cancellation succeeded and 0 if the <timer_id> * wasn't found. */ virtual int cancel_timer (long timer_id, const void **act = 0, int dont_call_handle_close = 1); /** * Dispatch a single set of events, waiting up to a specified time limit * if necessary. * @param wait_time the time to wait for an event to occur. This is * a relative time. On successful return, the time is updated to * reflect the amount of time spent waiting for event(s) to occur. * @return Returns 0 if no events occur before the wait_time expires. * Returns 1 when a completion is dispatched. On error, returns -1 * and sets errno accordingly. */ virtual int handle_events (ACE_Time_Value &wait_time); /** * Block indefinitely until at least one event is dispatched. * @return Returns 1 when a completion is dispatched. On error, returns -1 * and sets errno accordingly. */ virtual int handle_events (void); /// Add wakeup dispatch threads (reinit). int wake_up_dispatch_threads (void); /// Close all dispatch threads. int close_dispatch_threads (int wait); /// Get number of thread used as a parameter to CreatIoCompletionPort. size_t number_of_threads (void) const; /// Set number of thread used as a parameter to CreatIoCompletionPort. void number_of_threads (size_t threads); /// Get timer queue. TIMER_QUEUE *timer_queue (void) const; /// Set timer queue. void timer_queue (TIMER_QUEUE *timer_queue); /** * Get the event handle. * It is a no-op in POSIX platforms and it returns * ACE_INVALID_HANDLE. */ virtual ACE_HANDLE get_handle (void) const; /// Get the implementation class. virtual ACE_Proactor_Impl *implementation (void) const; // = Factory methods for the operations // Note that the user does not have to use or know about these // methods. /// Create the correct implementation class for doing /// Asynch_Read_Stream. virtual ACE_Asynch_Read_Stream_Impl *create_asynch_read_stream (void); /// Create the correct implementation class for doing /// Asynch_Write_Stream. virtual ACE_Asynch_Write_Stream_Impl *create_asynch_write_stream (void); /// Create the correct implementation class for doing /// Asynch_Read_File. virtual ACE_Asynch_Read_File_Impl *create_asynch_read_file (void); /// Create the correct implementation class for doing /// Asynch_Write_File. virtual ACE_Asynch_Write_File_Impl *create_asynch_write_file (void); /// Create the correct implementation class for doing Asynch_Accept. virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void); /// Create the correct implementation class for doing Asynch_Connect. virtual ACE_Asynch_Connect_Impl *create_asynch_connect (void); /// Create the correct implementation class for doing /// Asynch_Transmit_File. virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void); /// Create the correct implementation class for doing /// Asynch_Read_Dgram. virtual ACE_Asynch_Read_Dgram_Impl *create_asynch_read_dgram (void); /// Create the correct implementation class for doing /// Asynch_Write_Dgram. virtual ACE_Asynch_Write_Dgram_Impl *create_asynch_write_dgram (void); // = Factory methods for the results // Note that the user does not have to use or know about these // methods unless they want to "fake" results. /// Create the correct implementation class for /// ACE_Asynch_Read_Stream::Result class. virtual ACE_Asynch_Read_Stream_Result_Impl * create_asynch_read_stream_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /// Create the correct implementation class for /// ACE_Asynch_Write_Stream::Result. virtual ACE_Asynch_Write_Stream_Result_Impl * create_asynch_write_stream_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_write, const void* act, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /// Create the correct implementation class for /// ACE_Asynch_Read_File::Result. virtual ACE_Asynch_Read_File_Result_Impl * create_asynch_read_file_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, u_long offset, u_long offset_high, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /// Create the correct implementation class for /// ACE_Asynch_Write_File::Result. virtual ACE_Asynch_Write_File_Result_Impl * create_asynch_write_file_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_write, const void* act, u_long offset, u_long offset_high, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /// Create the correct implementation class for /// ACE_Asynch_Read_Dgram::Result. virtual ACE_Asynch_Read_Dgram_Result_Impl * create_asynch_read_dgram_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block *message_block, size_t bytes_to_read, int flags, int protocol_family, const void* act, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /// Create the correct implementation class for /// ACE_Asynch_Write_Dgram::Result. virtual ACE_Asynch_Write_Dgram_Result_Impl * create_asynch_write_dgram_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block *message_block, size_t bytes_to_write, int flags, const void* act, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /// Create the correct implementation class for ACE_Asynch_Accept::Result. virtual ACE_Asynch_Accept_Result_Impl * create_asynch_accept_result (ACE_Handler &handler, ACE_HANDLE listen_handle, ACE_HANDLE accept_handle, ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /// Create the correct implementation class for ACE_Asynch_Connect::Result virtual ACE_Asynch_Connect_Result_Impl * create_asynch_connect_result (ACE_Handler &handler, ACE_HANDLE connect_handle, const void* act, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /// Create the correct implementation class for /// ACE_Asynch_Transmit_File::Result. virtual ACE_Asynch_Transmit_File_Result_Impl * create_asynch_transmit_file_result (ACE_Handler &handler, ACE_HANDLE socket, ACE_HANDLE file, ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer, u_long bytes_to_write, u_long offset, u_long offset_high, u_long bytes_per_send, u_long flags, const void *act, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); /** * Create a timer result object which can be used with the Timer * mechanism of the Proactor. * If <signal_number> is -1, <POSIX_SIG_Proactor> will create a * Timer object with a meaningful signal number, choosing the * largest signal number from the signal mask of the Proactor. */ virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &tv, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN); protected: /** * Post <how_many> completions to the completion port so that all * threads can wake up. This is used in conjunction with the * <run_event_loop>. */ static int post_wakeup_completions (int how_many); /** * Post <how_many> completions to the completion port so that all * threads can wake up. This is used in conjunction with the * <proactor_run_event_loop>. */ virtual int proactor_post_wakeup_completions (int how_many); /// Set the implementation class. virtual void implementation (ACE_Proactor_Impl *implementation); /// Delegation/implementation class that all methods will be /// forwarded to. ACE_Proactor_Impl *implementation_; /// Flag used to indicate whether we are responsible for cleaning up /// the implementation instance. int delete_implementation_; /// Pointer to a process-wide <ACE_Proactor>. static ACE_Proactor *proactor_; /// Must delete the <proactor_> if non-0. static int delete_proactor_; /// Handles timeout events. ACE_Proactor_Timer_Handler *timer_handler_; /// This will manage the thread in the Timer_Handler. ACE_Thread_Manager thr_mgr_; /// Timer Queue. TIMER_QUEUE *timer_queue_; /// Flag on whether to delete the timer queue. int delete_timer_queue_; /// Terminate the proactor event loop. sig_atomic_t end_event_loop_; /// Number of threads in the event loop. sig_atomic_t event_loop_thread_count_; /// Mutex to protect work with lists. ACE_SYNCH_MUTEX mutex_; private: /// Deny access since member-wise won't work... ACE_Proactor (const ACE_Proactor &); ACE_Proactor &operator= (const ACE_Proactor &); }; # if defined (__ACE_INLINE__) # include "ace/Proactor.i" # endif /* __ACE_INLINE__ */ #else /* NOT WIN32 or POSIX with AIO features. */ class ACE_Time_Value; class ACE_Export ACE_Proactor { public: class Timer_Queue {}; ACE_Proactor (size_t /* number_of_threads */ = 0, Timer_Queue * /* tq */ = 0) {} virtual int handle_events (void) { return -1; } virtual int handle_events (ACE_Time_Value &) { return -1; } /// Placeholder to enable compilation on non-Win32 platforms static ACE_Proactor *instance (size_t threads = 0); /// Placeholder to enable compilation on non-Win32 platforms static ACE_Proactor *instance (ACE_Proactor *); /// Placeholder to enable compilation on non-Win32 platforms static void close_singleton (void); /// Placeholder to enable compilation on non-Win32 platforms static int run_event_loop (void); /// Placeholder to enable compilation on non-Win32 platforms static int run_event_loop (ACE_Time_Value &tv); /// Placeholder to enable compilation on non-Win32 platforms static int end_event_loop (void); /// Placeholder to enable compilation on non-Win32 platforms static sig_atomic_t event_loop_done (void); }; #endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ #include "ace/post.h" #endif /* ACE_PROACTOR_H */