diff options
Diffstat (limited to 'ace/Proactor.h')
-rw-r--r-- | ace/Proactor.h | 492 |
1 files changed, 250 insertions, 242 deletions
diff --git a/ace/Proactor.h b/ace/Proactor.h index 7c08bfac14b..2766a512b48 100644 --- a/ace/Proactor.h +++ b/ace/Proactor.h @@ -10,36 +10,45 @@ // Proactor.h // // = AUTHOR -// Irfan Pyarali <irfan@cs.wustl.edu>, -// Tim Harrison <harrison@cs.wustl.edu> and +// Irfan Pyarali (irfan@cs.wustl.edu), +// Tim Harrison (harrison@cs.wustl.edu) and // Alexander Babu Arulanthu <alex@cs.wustl.edu> // // ============================================================================ -#if !defined (ACE_PROACTOR_H) +#ifndef ACE_PROACTOR_H #define ACE_PROACTOR_H #include "ace/OS.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) -#pragma once +# pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ -#if ((defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || (defined (ACE_HAS_AIO_CALLS))) -// This only works on Win32 platforms and on Unix platforms supporting -// POSIX aio calls. - #include "ace/Asynch_IO.h" -#include "ace/Asynch_IO_Impl.h" #include "ace/Thread_Manager.h" +#include "ace/Event_Handler.h" + #include "ace/Timer_Queue.h" #include "ace/Timer_List.h" #include "ace/Timer_Heap.h" #include "ace/Timer_Wheel.h" +#include "ace/Free_List.h" +#include "ace/Pipe.h" + +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \ + (defined (ACE_HAS_AIO_CALLS)) +// This only works on Win32 platforms and on Unix platforms supporting +// aio calls. // Forward declarations. -class ACE_Proactor_Impl; +class ACE_Asynch_Result; +class ACE_Asynch_Operation; class ACE_Proactor_Timer_Handler; +class ACE_Proactor; +#if defined (ACE_HAS_AIO_CALLS) +class ACE_AIO_Accept_Handler; +#endif /* ACE_HAS_AIO_CALLS */ class ACE_Export ACE_Proactor_Handle_Timeout_Upcall { @@ -49,28 +58,26 @@ class ACE_Export ACE_Proactor_Handle_Timeout_Upcall // = DESCRIPTION // This class implements the functor required by the Timer // Queue to call <handle_timeout> on ACE_Handlers. - +public: + friend class ACE_Proactor; + // Proactor has special privileges, access needed to: proactor (). + typedef ACE_Timer_Queue_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> - TIMER_QUEUE; - // Type def for the timer queue. - - friend class ACE_Proactor; - // The main Proactor class has special permissions. + TIMER_QUEUE; -public: ACE_Proactor_Handle_Timeout_Upcall (void); // Constructor. - + int timeout (TIMER_QUEUE &timer_queue, - ACE_Handler *handler, - const void *arg, - const ACE_Time_Value &cur_time); + ACE_Handler *handler, + const void *arg, + const ACE_Time_Value &cur_time); // This method is called when the timer expires. int cancellation (TIMER_QUEUE &timer_queue, - ACE_Handler *handler); + ACE_Handler *handler); // This method is called when the timer is canceled. int deletion (TIMER_QUEUE &timer_queue, @@ -84,11 +91,10 @@ protected: // Set the proactor. This will fail, if one is already set! ACE_Proactor *proactor_; - // Handle to the proactor. This is needed for posting a timer result - // to the Proactor's completion queue. + // Handle to the proactor. This is needed for the completion port. }; -class ACE_Export ACE_Proactor +class ACE_Export ACE_Proactor : public ACE_Event_Handler { // = TITLE // A manager for asynchronous event demultiplexing. @@ -97,58 +103,84 @@ class ACE_Export ACE_Proactor // See the Proactor pattern description at // http://www.cs.wustl.edu/~schmidt/proactor.ps.gz for more // details. +public: + friend class ACE_Proactor_Timer_Handler; + // Timer Handler has special privileges because Access needed to: + // thr_mgr_ + + friend class ACE_Proactor_Handle_Timeout_Upcall; + // Access needed to: Asynch_Timer, and completion_port_. - // = Here are the private typedefs that the <ACE_Proactor> uses. + friend class ACE_Asynch_Operation; + // For POSIX4-compliant-Unix systems, the + // <register_aio_with_proactor> call is used by + // <ACE_Asynch_Operation> to store some information with the + // Proactor after an <aio_> call is issued, so that the Proactor can + // retrive this information to do <aio_return> and <aio_error>. +#if defined (ACE_HAS_AIO_CALLS) + friend class ACE_Asynch_Accept_Handler; + // For POSIX4 implementation, this class takes care of doing the + // Asynch_Accept. + + friend class ACE_AIO_Accept_Handler; + // We need also this class with the Proactor to take care of + // Asynch_Accept when we use AIO_CONTROL_BLOCKS. +#endif /* ACE_HAS_AIO_CALLS */ + + // = Here are the typedefs that the <ACE_Proactor> uses. + + // @@ Can these typedefs be capitalized? + typedef ACE_Timer_Queue_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX> + Timer_Queue; typedef ACE_Timer_Queue_Iterator_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, - ACE_SYNCH_RECURSIVE_MUTEX> - TIMER_QUEUE_ITERATOR; + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX> + Timer_Queue_Iterator; typedef ACE_Timer_List_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, - ACE_SYNCH_RECURSIVE_MUTEX> - TIMER_LIST; + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX> + Timer_List; typedef ACE_Timer_List_Iterator_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, - ACE_SYNCH_RECURSIVE_MUTEX> - TIMER_LIST_ITERATOR; + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX> + Timer_List_Iterator; typedef ACE_Timer_Heap_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, - ACE_SYNCH_RECURSIVE_MUTEX> - TIMER_HEAP; + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX> + Timer_Heap; typedef ACE_Timer_Heap_Iterator_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, - ACE_SYNCH_RECURSIVE_MUTEX> - TIMER_HEAP_ITERATOR; + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX> + Timer_Heap_Iterator; typedef ACE_Timer_Wheel_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, - ACE_SYNCH_RECURSIVE_MUTEX> - TIMER_WHEEL; + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX> + Timer_Wheel; typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, - ACE_SYNCH_RECURSIVE_MUTEX> - TIMER_WHEEL_ITERATOR; - - // = Friendship. - - friend class ACE_Proactor_Timer_Handler; - // Timer handler runs a thread and manages the timers, on behalf of - // the Proactor. - -public: - typedef ACE_Timer_Queue_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, - ACE_SYNCH_RECURSIVE_MUTEX> - TIMER_QUEUE; - // Public type. - - ACE_Proactor (ACE_Proactor_Impl *implementation = 0, - int delete_implementation = 0, - TIMER_QUEUE *tq = 0); - // Constructor. If <implementation> is 0, the correct implementation - // object will be created. <delete_implementation> flag determines - // whether the implementation object should be deleted by the - // Proactor or not. If <tq> is 0, a new TIMER_QUEUE is created. + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX> + Timer_Wheel_Iterator; + + enum POSIX_COMPLETION_STRATEGY + { + // Use the real time signals and do <sigtimedwait> on the + // signals. + RT_SIGNALS, + // Store the <aio> control blocks with the <Proactor> and do + // <aio_suspend> on them, + AIO_CONTROL_BLOCKS + }; + // For Posix4-Compliat-Unix systems how the completion of the + // asynchronous calls should be got from the OS. + + ACE_Proactor (size_t number_of_threads = 0, + Timer_Queue *tq = 0, + int used_with_reactor_event_loop = 0, + POSIX_COMPLETION_STRATEGY completion_strategy = AIO_CONTROL_BLOCKS); + // A do nothing constructor. virtual ~ACE_Proactor (void); // Virtual destruction. @@ -177,9 +209,7 @@ public: static int end_event_loop (void); // Instruct the <ACE_Proactor::instance> to terminate its event - // loop. - // This method wakes up all the threads blocked on waiting for - // completions and end the event loop. + // loop. static int event_loop_done (void); // Report if the <ACE_Proactor::instance> event loop is finished. @@ -188,14 +218,14 @@ public: // Close the IO completion port. virtual int register_handle (ACE_HANDLE handle, - const void *completion_key); + const void *completion_key); // This method adds the <handle> to the I/O completion port. This - // function is a no-op function for Unix systems and returns 0; + // function is a no-op function for Unix systems. // = Timer management. virtual long schedule_timer (ACE_Handler &handler, - const void *act, - const ACE_Time_Value &time); + const void *act, + const ACE_Time_Value &time); // Schedule a <handler> that will expire after <time>. If it // expires then <act> is passed in as the value to the <handler>'s // <handle_timeout> callback method. This method returns a @@ -204,30 +234,30 @@ public: // up to values of greater than 2 billion timers. As long as timers // don't stay around longer than this there should be no problems // with accidentally deleting the wrong timer. Returns -1 on - // failure (which is guaranteed never to be a valid <timer_id>). + // failure (which is guaranteed never to be a valid <timer_id>. virtual long schedule_repeating_timer (ACE_Handler &handler, - const void *act, - const ACE_Time_Value &interval); + const void *act, + const ACE_Time_Value &interval); // Same as above except <interval> it is used to reschedule the // <handler> automatically. virtual long schedule_timer (ACE_Handler &handler, - const void *act, - const ACE_Time_Value &time, - const ACE_Time_Value &interval); + const void *act, + const ACE_Time_Value &time, + const ACE_Time_Value &interval); // This combines the above two methods into one. Mostly for backward // compatibility. virtual int cancel_timer (ACE_Handler &handler, - int dont_call_handle_close = 1); + int dont_call_handle_close = 1); // Cancel all timers associated with this <handler>. Returns number // of timers cancelled. virtual int cancel_timer (long timer_id, - const void **act = 0, - int dont_call_handle_close = 1); + const void **act = 0, + int dont_call_handle_close = 1); // Cancel the single <ACE_Handler> that matches the <timer_id> value // (which was returned from the <schedule> method). If <act> is // non-NULL then it will be set to point to the ``magic cookie'' @@ -238,16 +268,21 @@ public: virtual int handle_events (ACE_Time_Value &wait_time); // Dispatch a single set of events. If <wait_time> elapses before - // any events occur, return 0. Return 1 on success i.e., when a - // completion is dispatched, non-zero (-1) on errors and errno is - // set accordingly. + // any events occur, return. Return 0 on success, non-zero (-1) on + // timeouts/errors and errno is set accordingly. virtual int handle_events (void); // Block indefinitely until at least one event is dispatched. - // Dispatch a single set of events. If <wait_time> elapses before - // any events occur, return 0. Return 1 on success i.e., when a - // completion is dispatched, non-zero (-1) on errors and errno is - // set accordingly. + // Return 0 on success, non-zero (-1) on timeouts/errors and errno + // is set accordingly. + + virtual int post_completion (ACE_Asynch_Result *result); + // Post a result to the completion port of the Proactor. If errors + // occur, the result will be deleted by this method. If successful, + // the result will be deleted by the Proactor when the result is + // removed from the completion port. Therefore, the result should + // have been dynamically allocated and should be orphaned by the + // user once this method is called. int wake_up_dispatch_threads (void); // Add wakeup dispatch threads (reinit). @@ -259,176 +294,148 @@ public: void number_of_threads (size_t threads); // Number of thread used as a parameter to CreatIoCompletionPort. - TIMER_QUEUE *timer_queue (void) const; - void timer_queue (TIMER_QUEUE *timer_queue); + Timer_Queue *timer_queue (void) const; + void timer_queue (Timer_Queue *); // Get/Set timer queue. - + virtual ACE_HANDLE get_handle (void) const; // Get the event handle. - // It is a no-op in POSIX platforms and it returns - // ACE_INVALID_HANDLE. - virtual ACE_Proactor_Impl *implementation (void) const; - // Get the implementation class. +#if defined (ACE_HAS_AIO_CALLS) +#if 0 + void posix_completion_strategy (POSIX_COMPLETION_STRATEGY strategy); + // Set the completion strategy. +#endif /* 0 */ - // - // = Factory methods for the operations - // - // Note that the user does not have to use or know about these - // methods. - - virtual ACE_Asynch_Read_Stream_Impl *create_asynch_read_stream (void); - // Create the correct implementation class for doing Asynch_Read_Stream. - - virtual ACE_Asynch_Write_Stream_Impl *create_asynch_write_stream (void); - // Create the correct implementation class for doing Asynch_Write_Stream. + POSIX_COMPLETION_STRATEGY posix_completion_strategy (void); + // Return the completion strategy used. - virtual ACE_Asynch_Read_File_Impl *create_asynch_read_file (void); - // Create the correct implementation class for doing Asynch_Read_File. - - virtual ACE_Asynch_Write_File_Impl *create_asynch_write_file (void); - // Create the correct implementation class for doing Asynch_Write_File. - - virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void); - // Create the correct implementation class for doing Asynch_Accept. - - virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void); - // Create the correct implementation class for doing Asynch_Transmit_File. - - // - // = Factory methods for the results - // - // Note that the user does not have to use or know about these - // methods unless they want to "fake" results. - - virtual ACE_Asynch_Read_Stream_Result_Impl *create_asynch_read_stream_result (ACE_Handler &handler, - ACE_HANDLE handle, - ACE_Message_Block &message_block, - u_long bytes_to_read, - const void* act, - ACE_HANDLE event = ACE_INVALID_HANDLE, - int priority = 0, - int signal_number = ACE_SIGRTMIN); - // Create the correct implementation class for ACE_Asynch_Read_Stream::Result class. - - virtual ACE_Asynch_Write_Stream_Result_Impl *create_asynch_write_stream_result (ACE_Handler &handler, - ACE_HANDLE handle, - ACE_Message_Block &message_block, - u_long bytes_to_write, - const void* act, - ACE_HANDLE event = ACE_INVALID_HANDLE, - int priority = 0, - int signal_number = ACE_SIGRTMIN); - // Create the correct implementation class for ACE_Asynch_Write_Stream::Result. - - virtual ACE_Asynch_Read_File_Result_Impl *create_asynch_read_file_result (ACE_Handler &handler, - ACE_HANDLE handle, - ACE_Message_Block &message_block, - u_long bytes_to_read, - const void* act, - u_long offset, - u_long offset_high, - ACE_HANDLE event = ACE_INVALID_HANDLE, - int priority = 0, - int signal_number = ACE_SIGRTMIN); - // Create the correct implementation class for ACE_Asynch_Read_File::Result. - - virtual ACE_Asynch_Write_File_Result_Impl *create_asynch_write_file_result (ACE_Handler &handler, - ACE_HANDLE handle, - ACE_Message_Block &message_block, - u_long bytes_to_write, - const void* act, - u_long offset, - u_long offset_high, - ACE_HANDLE event = ACE_INVALID_HANDLE, - int priority = 0, - int signal_number = ACE_SIGRTMIN); - // Create the correct implementation class for ACE_Asynch_Write_File::Result. - - virtual ACE_Asynch_Accept_Result_Impl *create_asynch_accept_result (ACE_Handler &handler, - ACE_HANDLE listen_handle, - ACE_HANDLE accept_handle, - ACE_Message_Block &message_block, - u_long bytes_to_read, - const void* act, - ACE_HANDLE event = ACE_INVALID_HANDLE, - int priority = 0, - int signal_number = ACE_SIGRTMIN); - // Create the correct implementation class for ACE_Asynch_Accept::Result. - - virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler, - ACE_HANDLE socket, - ACE_HANDLE file, - ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer, - u_long bytes_to_write, - u_long offset, - u_long offset_high, - u_long bytes_per_send, - u_long flags, - const void *act, - ACE_HANDLE event = ACE_INVALID_HANDLE, - int priority = 0, - int signal_number = ACE_SIGRTMIN); - // Create the correct implementation class for ACE_Asynch_Transmit_File::Result. - - virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler, - const void *act, - const ACE_Time_Value &tv, - ACE_HANDLE event = ACE_INVALID_HANDLE, - int priority = 0, - int signal_number = ACE_SIGRTMIN); - // Create a timer result object which can be used with the Timer - // mechanism of the Proactor. - // If <signal_number> is -1, <POSIX_SIG_Proactor> will create a - // Timer object with a meaningful signal number, choosing the - // largest signal number from the signal mask of the Proactor. + int notify_asynch_accept (ACE_Asynch_Accept::Result* result); + // Asynch_Accept calls this function to notify an accept to the + // Proactor. +#endif /* ACE_HAS_AIO_CALLS */ protected: + virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); + // Called when object is signaled by OS (either via UNIX signals or + // when a Win32 object becomes signaled). + + virtual int handle_close (ACE_HANDLE handle, + ACE_Reactor_Mask close_mask); + // Called when object is removed from the ACE_Reactor. + + void application_specific_code (ACE_Asynch_Result *asynch_result, + u_long bytes_transferred, + int success, + const void *completion_key, + u_long error); + // Protect against structured exceptions caused by user code when + // dispatching handles. + + virtual int handle_events (unsigned long milli_seconds); + // Dispatch a single set of events. If <milli_seconds> elapses + // before any events occur, return. + + // @@ Alex, many C++ compilers don't like nested classes. Can you + // please bring this into the "outer scope" and add an "ACE_" prefix + // to it? + class ACE_Export Asynch_Timer : protected ACE_Asynch_Result + { + // = TITLE + // This class is posted to the completion port when a timer + // expires. When the complete method of this object is + // called, the <handler>'s handle_timeout method will be + // called. + public: + friend class ACE_Proactor_Handle_Timeout_Upcall; + // Timer Handler has special privileges + // Access needed to: convert Asynch_Timer into an OVERLAPPED + + Asynch_Timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &tv, + ACE_HANDLE event = ACE_INVALID_HANDLE); + + protected: + virtual void complete (u_long bytes_transferred, + int success, + const void *completion_key, + u_long error = 0); + // This method calls the <handler>'s handle_timeout method + + ACE_Time_Value time_; + // Time value requested by caller + }; + +#if defined (ACE_HAS_AIO_CALLS) + POSIX_COMPLETION_STRATEGY posix_completion_strategy_; + // Flag that indicates how the completion status is got from the OS + // on the POSIX4-Compliant-Unix systems. + + sigset_t RT_completion_signals_; + // These signals are used for completion notification by the + // Proactor. + // These signals are masked in the current process. + // By default, ACE_SIG_AIO_READ and ACE_SIG_AIO_WRITE are + // the two signals used for completion notification. But if the + // user has specified someother signals in any of the + // read/write/transmit operations, some other signals might also + // have got masked. + + ACE_AIO_Accept_Handler* aio_accept_handler_; + // This class takes care of doing <accept> when we use + // AIO_CONTROL_BLOCKS strategy. + + aiocb *aiocb_list_ [ACE_RTSIG_MAX]; + // Use an array to keep track of all the aio's issued + // currently. We'll limit the array size to Maximum RT signals that + // can be queued in a process. This is the upper limit how many aio + // operations can be pending at a time. + + size_t aiocb_list_max_size_; + // To maintain the maximum size of the array (list). + + size_t aiocb_list_cur_size_; + // To maintain the current size of the array (list). +#elif defined (ACE_WIN32) + ACE_HANDLE completion_port_; + // Handle for the completion port. Unix doesnt have completion + // ports. + + size_t number_of_threads_; + // This number is passed to the <CreatIOCompletionPort> system + // call. +#endif /* ACE_HAS_AIO_CALLS */ + + Timer_Queue *timer_queue_; + // Timer Queue. - static int post_wakeup_completions (int how_many); - // Post <how_many> completions to the completion port so that all - // threads can wake up. This is used in conjunction with the - // <run_event_loop>. + int delete_timer_queue_; + // Flag on whether to delete the timer queue. + + ACE_Proactor_Timer_Handler *timer_handler_; + // Handles timeouts events. - virtual void implementation (ACE_Proactor_Impl *implementation); - // Set the implementation class. + ACE_Thread_Manager thr_mgr_; + // This will manage the thread in the Timer_Handler. - ACE_Proactor_Impl *implementation_; - // Delegation/implementation class that all methods will be - // forwarded to. + ACE_Auto_Event event_; + // This event is used in conjunction with Reactor when we try to + // integrate the event loops of Reactor and the Proactor. - int delete_implementation_; - // Flag used to indicate whether we are responsible for cleaning up - // the implementation instance. + int used_with_reactor_event_loop_; + // Flag that indicates whether we are used in conjunction with + // Reactor. +private: static ACE_Proactor *proactor_; // Pointer to a process-wide <ACE_Proactor>. static int delete_proactor_; // Must delete the <proactor_> if non-0. - - ACE_Proactor_Timer_Handler *timer_handler_; - // Handles timeout events. - - ACE_Thread_Manager thr_mgr_; - // This will manage the thread in the Timer_Handler. - - TIMER_QUEUE *timer_queue_; - // Timer Queue. - - int delete_timer_queue_; - // Flag on whether to delete the timer queue. static sig_atomic_t end_event_loop_; // Terminate the proactor event loop. - - static sig_atomic_t event_loop_thread_count_; - // Number of threads in the event loop. - -private: - ACE_Proactor (const ACE_Proactor &); - ACE_Proactor &operator= (const ACE_Proactor &); - // Deny access since member-wise won't work... }; #if defined (__ACE_INLINE__) @@ -441,7 +448,7 @@ class ACE_Export ACE_Proactor public: class Timer_Queue {}; ACE_Proactor (size_t /* number_of_threads */ = 0, - Timer_Queue * /* tq */ = 0) {} + Timer_Queue * /* tq */ = 0) {} virtual int handle_events (void) { return -1; } virtual int handle_events (ACE_Time_Value &) { return -1; } @@ -466,5 +473,6 @@ public: static sig_atomic_t event_loop_done (void); // Placeholder to enable compilation on non-Win32 platforms }; -#endif /* ACE_WIN32 && !ACE_HAS_WINCE || ACE_HAS_AIO_CALLS*/ + +#endif /* ACE_WIN32 */ #endif /* ACE_PROACTOR_H */ |