diff options
-rw-r--r-- | ace/POSIX_Asynch_IO.cpp | 62 | ||||
-rw-r--r-- | ace/POSIX_Asynch_IO.h | 12 | ||||
-rw-r--r-- | ace/POSIX_Proactor.cpp | 267 | ||||
-rw-r--r-- | ace/POSIX_Proactor.h | 110 | ||||
-rw-r--r-- | ace/Proactor.cpp | 94 | ||||
-rw-r--r-- | ace/Proactor.h | 42 | ||||
-rw-r--r-- | ace/Proactor_Impl.h | 20 | ||||
-rw-r--r-- | ace/WIN32_Proactor.cpp | 2 | ||||
-rw-r--r-- | ace/config-linux-lxpthreads.h | 2 | ||||
-rw-r--r-- | ace/config-lynxos.h | 10 |
10 files changed, 371 insertions, 250 deletions
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp index f789eb9cdec..a427e00c7ba 100644 --- a/ace/POSIX_Asynch_IO.cpp +++ b/ace/POSIX_Asynch_IO.cpp @@ -236,12 +236,6 @@ ACE_POSIX_SIG_Asynch_Operation::~ACE_POSIX_SIG_Asynch_Operation (void) { } -int -ACE_POSIX_SIG_Asynch_Operation::register_aio_with_proactor (ACE_POSIX_Asynch_Result *result) -{ - return this->posix_proactor ()->register_aio_with_proactor (result); -} - // ********************************************************************* u_long @@ -540,10 +534,6 @@ ACE_POSIX_SIG_Asynch_Read_Stream::shared_read (ACE_POSIX_Asynch_Read_Stream_Resu result->aio_sigevent.sigev_notify = SIGEV_SIGNAL; result->aio_sigevent.sigev_signo = result->signal_number (); result->aio_sigevent.sigev_value.sival_ptr = (void *) result; - - // Register the real-time signal with the Proactor. - if (this->register_aio_with_proactor (result) == -1) - return -1; // Fire off the aio read. if (aio_read (result) == -1) @@ -873,10 +863,6 @@ ACE_POSIX_SIG_Asynch_Write_Stream::shared_write (ACE_POSIX_Asynch_Write_Stream_R result->aio_sigevent.sigev_signo = result->signal_number (); result->aio_sigevent.sigev_value.sival_ptr = (void *) result; - // Register the real-time signal with the Proactor. - if (this->register_aio_with_proactor (result) == -1) - return -1; - // Fire off the aio write. if (aio_write (result) == -1) // Queueing failed. @@ -1957,9 +1943,8 @@ ACE_POSIX_SIG_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Acce // Do the work. if (this->register_accept_call_i (result) == -1) return -1; - - // Also register the real-time signal. - return this->posix_proactor_->register_aio_with_proactor (result); + + return 0; } int @@ -2476,7 +2461,9 @@ class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler // = DESCRIPTION // // This is a helper class for implementing - // <ACE_POSIX_Asynch_Transmit_File> in Unix systems. + // <ACE_POSIX_Asynch_Transmit_File> in Unix systems. This class + // abstracts out all the commonalities in the two different + // POSIX Transmit Handler implementations. public: virtual ~ACE_POSIX_Asynch_Transmit_Handler (void); @@ -2695,32 +2682,25 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_W // Check the success parameter. if (result.success () == 0) { + // Failure. ACE_ERROR ((LM_ERROR, "Asynch_Transmit_File failed.\n")); - - // Check the success parameter. - if (result.success () == 0) + + ACE_SEH_TRY { - // Failure. - ACE_ERROR ((LM_ERROR, - "Asynch_Transmit_File failed.\n")); - - ACE_SEH_TRY - { - this->result_->complete (this->bytes_transferred_, - 0, // Failure. - 0, // @@ Completion key. - 0); // @@ Error no. - } - ACE_SEH_FINALLY - { - // This is crucial to prevent memory leaks. This deletes - // the result pointer also. - delete this; - } + this->result_->complete (this->bytes_transferred_, + 0, // Failure. + 0, // @@ Completion key. + 0); // @@ Error no. + } + ACE_SEH_FINALLY + { + // This is crucial to prevent memory leaks. This deletes + // the result pointer also. + delete this; } } - + // Write stream successful. // Partial write to socket. @@ -2816,8 +2796,10 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read if (result.bytes_transferred () == 0) return; - // Increment offset and write data to network. + // Increment offset. this->file_offset_ += result.bytes_transferred (); + + // Write data to network. if (this->ws_.write (result.message_block (), result.bytes_transferred (), (void *)&this->data_act_, diff --git a/ace/POSIX_Asynch_IO.h b/ace/POSIX_Asynch_IO.h index 521ecd31b45..56224912845 100644 --- a/ace/POSIX_Asynch_IO.h +++ b/ace/POSIX_Asynch_IO.h @@ -172,12 +172,6 @@ protected: virtual ~ACE_POSIX_Asynch_Operation (void); // Destructor. - virtual int register_aio_with_proactor (ACE_POSIX_Asynch_Result *result) = 0; - // <Asynch_Operation> class call this method to register the <aio_> - // call with the Proactor, so that Proactor can do completion - // querying effectively. Different POSIX Proactor implementations do - // different things to register the <aio_>. - ACE_Proactor *proactor_; // Proactor that this Asynch IO will be registered with. @@ -245,12 +239,6 @@ protected: virtual ~ACE_POSIX_SIG_Asynch_Operation (void); // Destructor. - int register_aio_with_proactor (ACE_POSIX_Asynch_Result *result); - // <Asynch_Operation> classes call this, to register the real-time - // signal used to issue the <aio_> call with the Proator. This - // should be done so that the Proactor can wait for completions of - // the asynchronous calls, issued using those signals. - ACE_POSIX_SIG_Proactor *posix_sig_proactor_; // It is easy to get this specific implementation proactor here, // since it is the one that creates the correct POSIX_Asynch_* diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 3861e8aa110..5bfcc4e1c5a 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -720,13 +720,80 @@ ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *r ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void) { - // Make the sigset_t consisting of the completion signals. + // = Mask all the signals, keep a mask set with ACE_SIGRTMIN and set + // up signal handler for SIGRTMIN. + + // Mask all the signals. + if (this->mask_all () != 0) + return; + + // = Keep a mask set with ACE_SIGRTMIN. + + // Clear the signal set. if (sigemptyset (&this->RT_completion_signals_) == -1) ACE_ERROR ((LM_ERROR, "Error:%p\n", "Couldn't init the RT completion signal set")); + + // Add the signal number to the signal set. + if (sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1) + ACE_ERROR ((LM_ERROR, + "Error:%p\n", + "Couldnt init the RT completion signal set")); + + // Set up the signal handler for SIGRTMIN. + setup_signal_handler (ACE_SIGRTMIN); } +ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set) +{ + // = Keep <Signal_set> with the Proactor, mask all the signals and + // setup signal handlers for the signals in the <signal_set>. + + // = Keep <signal_set> with the Proactor. + + // Empty the signal set first. + if (sigemptyset (&this->RT_completion_signals_) == -1) + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "sigemptyset failed")); + + // Put the <signal_set>. + if (pthread_sigmask (SIG_SETMASK, &signal_set, 0) != 0) + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "pthread_sigmask failed")); + + // Get the <signal_set> back from the OS. + if (pthread_sigmask (SIG_SETMASK, 0, &this->RT_completion_signals_) != 0) + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%p\n", + "pthread_sigmask failed")); + + + // Mask all the signals. + if (this->mask_all () != 0) + return; + + // For each signal number present in the <signal_set>, set up the + // signal handler. + int member = 0; + for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++) + { + member = sigismember (&signal_set, + si); + if (member == -1) + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:" + "sigismember failed")); + else if (member == 1) + { + if (this->setup_signal_handler (si) == -1) + return; + } + } +} ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void) { @@ -754,23 +821,19 @@ ACE_POSIX_SIG_Proactor::post_completion (ACE_POSIX_Asynch_Result *result) pid_t pid = ACE_OS::getpid (); if (pid == (pid_t) -1) ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p", - "<getpid> failed\n"), + "Error:%N:%l(%P | %t):%p", + "<getpid> failed"), -1); // Set the signal information. sigval value; value.sival_ptr = (void *) result; - // Register the signal number with the Proactor. - if (this->register_aio_with_proactor (result) == -1) - return -1; - // Queue the signal. if (sigqueue (pid, result->signal_number (), value) == -1) ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p", - "<sigqueue> failed\n"), + "Error:%N:%l:(%P | %t):%p\n", + "<sigqueue> failed"), -1); return 0; } @@ -825,15 +888,131 @@ ACE_POSIX_SIG_Proactor::create_asynch_transmit_file (void) return implementation; } +ACE_Asynch_Result_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &tv, + ACE_HANDLE event, + int priority, + int signal_number) +{ + int is_member = 0; + + // Fix the signal number. + if (signal_number == -1) + { + int si; + for (si = ACE_SIGRTMAX; + (is_member == 0) && (si >= ACE_SIGRTMIN); + si--) + { + is_member = sigismember (&this->RT_completion_signals_, + si); + if (is_member == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::\n", + "ACE_POSIX_SIG_Proactor::create_asynch_timer:" + "sigismember failed"), + 0); + } + + if (is_member == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%N:%l:(%P | %t)::%s\n", + "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:" + "Signal mask set empty"), + 0); + else + // + 1 to nullify loop increment. + signal_number = si + 1; + } + + ACE_Asynch_Result_Impl *implementation; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Timer (handler, + act, + tv, + event, + priority, + signal_number), + 0); + return implementation; +} + +int +ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const +{ + // Set up the handler(actually Null handler) for this real-time + // signal. + struct sigaction reaction; + sigemptyset (&reaction.sa_mask); // Nothing else to mask. + reaction.sa_flags = SA_SIGINFO; // Realtime flag. +#if defined (SA_SIGACTION) + // Lynx says, it is better to set this bit, to be portable. + reaction.sa_flags &= SA_SIGACTION; +#endif /* SA_SIGACTION */ + reaction.sa_sigaction = null_handler; // Null handler function. + int sigaction_return = sigaction (signal_number, + &reaction, + 0); + if (sigaction_return == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "Proactor couldnt do sigaction for the RT SIGNAL"), + -1); + return 0; +} + +void +ACE_POSIX_SIG_Proactor::null_handler (int signal_number, + siginfo_t * /* info */, + void * /* context */) +{ + ACE_ERROR ((LM_ERROR, + "Error:(%P | %t):%s:Signal number %d\n" + "Mask all the RT signals for this thread", + "ACE_POSIX_SIG_Proactor::null_handler called", + signal_number)); +} + +int +ACE_POSIX_SIG_Proactor::mask_all (void) const +{ + sigset_t full_set; + + // Get full set. + if (sigfillset (&full_set) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:(%P | %t):%p\n", + "sigfillset failed"), + -1); + + // Mask them. + if (pthread_sigmask (SIG_SETMASK, &full_set, 0) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:(%P | %t):%p\n", + "pthread_sigmask failed"), + -1); + + return 0; +} + int ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) { int result_sigwait = 0; siginfo_t sig_info; + + // Mask all the signals. + if (this->mask_all () != 0) + return -1; + // Wait for the signals. if (milli_seconds == ACE_INFINITE) - result_sigwait = sigwaitinfo (&this->RT_completion_signals_, - &sig_info); + { + result_sigwait = sigwaitinfo (&this->RT_completion_signals_, + &sig_info); + } else { // Wait for <milli_seconds> amount of time. @@ -862,22 +1041,10 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) // No errors, RT compleion signal is received. - // We deal only with the signal numbers that have been registered - // already. - int member = sigismember (&this->RT_completion_signals_, - result_sigwait); - if (member != 1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_SIG_Proactor::handle_events:" - "Unexpected signal (%d) received while waiting for RT Completion Signal", - result_sigwait), - -1); - // Is the signo returned consistent with the sig info? if (sig_info.si_signo != result_sigwait) ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t):" + "Error:%N:%l:(%P | %t):" "ACE_POSIX_SIG_Proactor::handle_events:" "Inconsistent signal number (%d) in the signal info block", sig_info.si_signo), @@ -948,58 +1115,6 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) return 1; } -int -ACE_POSIX_SIG_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *result) -{ - // Nothing to do if this signal is already there in the signal set, - // that has been already masked out. - int member = sigismember (&this->RT_completion_signals_, - result->signal_number ()); - if (member == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_SIG_Proactor::register_aio_with_proactor:" - "sigismember failed"), - -1); - // Return if it is already there. - if (member == 1) - return 0; - - // Add the signal number to the signal set. - if (sigaddset (&this->RT_completion_signals_, result->signal_number ()) < 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:%p\n", - "Couldnt init the RT completion signal set"), - -1); - - // Mask them. - if (sigprocmask (SIG_BLOCK, &RT_completion_signals_, 0) < 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:%p\n", - "Couldnt mask the RT completion signals"), - -1); - - // Set up the handler(actually Null handler) for this real-time - // signal. - struct sigaction reaction; - sigemptyset (&reaction.sa_mask); // Nothing else to mask. - reaction.sa_flags = SA_SIGINFO; // Realtime flag. -#if defined (SA_SIGACTION) - // Lynx says, it is better to set this bit, to be portable. - reaction.sa_flags &= SA_SIGACTION; -#endif /* SA_SIGACTION */ - reaction.sa_sigaction = 0; // No handler function. - int sigaction_return = sigaction (result->signal_number (), - &reaction, - 0); - if (sigaction_return == -1) - ACE_ERROR ((LM_ERROR, - "Error:%p\n", - "Proactor couldnt do sigaction for the RT SIGNAL")); - return 0; -} - - // ********************************************************************* ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer (ACE_Handler &handler, diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h index 889ff0c4234..f868d354034 100644 --- a/ace/POSIX_Proactor.h +++ b/ace/POSIX_Proactor.h @@ -92,18 +92,18 @@ public: ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, - ACE_HANDLE event, - int priority, - int signal_number); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); virtual ACE_Asynch_Write_Stream_Result_Impl *create_asynch_write_stream_result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, u_long bytes_to_write, const void* act, - ACE_HANDLE event, - int priority, - int signal_number); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); virtual ACE_Asynch_Read_File_Result_Impl *create_asynch_read_file_result (ACE_Handler &handler, ACE_HANDLE handle, @@ -112,9 +112,9 @@ public: const void* act, u_long offset, u_long offset_high, - ACE_HANDLE event, - int priority, - int signal_number); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); virtual ACE_Asynch_Write_File_Result_Impl *create_asynch_write_file_result (ACE_Handler &handler, ACE_HANDLE handle, @@ -123,9 +123,9 @@ public: const void* act, u_long offset, u_long offset_high, - ACE_HANDLE event, - int priority, - int signal_number); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); virtual ACE_Asynch_Accept_Result_Impl *create_asynch_accept_result (ACE_Handler &handler, ACE_HANDLE listen_handle, @@ -133,9 +133,9 @@ public: ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, - ACE_HANDLE event, - int priority, - int signal_number); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler, ACE_HANDLE socket, @@ -147,16 +147,16 @@ public: u_long bytes_per_send, u_long flags, const void *act, - ACE_HANDLE event, - int priority, - int signal_number); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &tv, - ACE_HANDLE event, - int priority, - int signal_number); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); // Create a timer result object which can be used with the Timer // mechanism of the Proactor. @@ -171,25 +171,6 @@ protected: u_long error); // Protect against structured exceptions caused by user code when // dispatching handles. - - virtual int register_aio_with_proactor (ACE_POSIX_Asynch_Result *result) = 0; - // <Asynch_Operation> class call this method to register the <aio_> - // call with the Proactor, so that Proactor can do completion - // querying. Different <POSIX_Proactor> implementations do different - // things to register the <aio_>. - -#if 0 - ACE_Thread_Manager thr_mgr_; - // This will manage the thread in the Timer_Handler. - - ACE_Auto_Event event_; - // This event is used in conjunction with Reactor when we try to - // integrate the event loops of Reactor and the Proactor. - - int used_with_reactor_event_loop_; - // Flag that indicates whether we are used in conjunction with - // Reactor. -#endif /* 0 */ }; // Forward declarations. @@ -296,8 +277,11 @@ class ACE_Export ACE_POSIX_SIG_Proactor : public ACE_POSIX_Proactor // = TITLE // // This Proactor implementation does compeltion querying using - // POSIX Real Time signals. <sigtimedwait> call is used to get - // the notify/get the completions. + // POSIX Real Time signals. <sigtimedwait>/<sigwaitinfo> call is + // used to get the notify/get the completions. + // The real-time signals that are going to be used with this + // Proactor should be given apriori in the constructor, so that + // those signals can be masked from asynchornous delivery. // // = DESCRIPTION // @@ -309,7 +293,15 @@ class ACE_Export ACE_POSIX_SIG_Proactor : public ACE_POSIX_Proactor public: ACE_POSIX_SIG_Proactor (void); - // Constructor. + // This constructor masks only the <ACE_SIGRTMIN> + // real-time signal. Only this signal should be used to issue + // asynchronous operations using this Proctor. + + ACE_POSIX_SIG_Proactor (const sigset_t mask_set); + // This constructor should be used to tell the Proactor to mask and + // wait for the real-time signals specified in this set. Only these + // signals should be used by the asynchronous operations when they + // use this Proactor. virtual ~ACE_POSIX_SIG_Proactor (void); // Destructor. @@ -345,18 +337,33 @@ public: virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void); + virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler, + const void *act, + const ACE_Time_Value &tv, + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN) ; + // If <signal_number> is -1, check with the Proactor and use one of + // the signals that is present in the mask set (i.e. the signals for + // which the Proactor will be waiting) of the Proactor. If there are + // more than one signal, the higher numbered signal will be chosen. + protected: + int setup_signal_handler (int signal_number) const; + // To setup the handler for a real-time signbal. + + static void null_handler (int signal_number, siginfo_t *info, void *context); + // Dummy signal handler. This wont get called at all, since we are + // going to be masking the signal in all the threads. + + int mask_all (void) const; + // To mask all the signals in a thread. + virtual int handle_events (unsigned long milli_seconds); // Dispatch a single set of events. If <milli_seconds> elapses // before any events occur, return 0. Return 1 if a completion is // dispatched. Return -1 on errors. - int register_aio_with_proactor (ACE_POSIX_Asynch_Result *result); - // <Asynch_Operation> classes call this, to register the real-time - // signal used to issue the <aio_> call with the Proator. This - // should be done so that the Proactor can wait for completions of - // the asynchronous calls, issued using those signals. - sigset_t RT_completion_signals_; // These signals are used for completion notification by the // Proactor. The signals specified while issueing <Asynch @@ -373,7 +380,8 @@ class ACE_Export ACE_POSIX_Asynch_Timer : public ACE_POSIX_Asynch_Result // called. friend class ACE_POSIX_Proactor; - // The factory method for this class is with the POSIX_Proactor + friend class ACE_POSIX_SIG_Proactor; + // The factory method for this class is with the POSIX_Proactor // class. protected: @@ -382,7 +390,7 @@ protected: const ACE_Time_Value &tv, ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, - int signal_number = SIGRTMIN); + int signal_number = ACE_SIGRTMIN); // Constructor. virtual ~ACE_POSIX_Asynch_Timer (void) {} diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index 6e16958df67..bd2ce7a52e6 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -96,35 +96,28 @@ ACE_Proactor_Timer_Handler::svc (void) { ACE_Time_Value absolute_time; int empty_flag = 0; - + int result = 0; + while (this->shutting_down_ == 0) { - // If the timer queue is not empty + // Is the timer queue empty? empty_flag = this->proactor_.timer_queue ()->is_empty (); + if (!empty_flag) { // Get the earliest absolute time. - absolute_time = - this->proactor_.timer_queue ()->earliest_time () - - this->proactor_.timer_queue ()->gettimeofday (); -#if 0 - ACE_DEBUG ((LM_DEBUG, - "%N%l:(%t):Earliest Time %d sec, %d msec time\n", - absolute_time.sec (), - absolute_time.msec ())); -#endif - // Make it zero if it is negative. - if (absolute_time < ACE_Time_Value::zero) - absolute_time = ACE_Time_Value::zero; + absolute_time = this->proactor_.timer_queue ()->earliest_time (); + + // Block for absolute time. + result = this->timer_event_.wait (&absolute_time); } - - // Wait for event upto <absolute_time>. - int result = 0; - if (empty_flag) - result = this->timer_event_.wait (0); else - result = this->timer_event_.wait (&absolute_time); + { + // Wait for ever. + result = this->timer_event_.wait (); + } + // Check for timer expiries. if (result == -1) { switch (errno) @@ -142,7 +135,6 @@ ACE_Proactor_Timer_Handler::svc (void) } } } - return 0; } @@ -166,19 +158,21 @@ ACE_Proactor_Handle_Timeout_Upcall::timeout (TIMER_QUEUE &timer_queue, ASYS_TEXT ("(%t) No Proactor set in ACE_Proactor_Handle_Timeout_Upcall,") ASYS_TEXT (" no completion port to post timeout to?!@\n")), -1); - + // Create the Asynch_Timer. ACE_Asynch_Result_Impl *asynch_timer = this->proactor_->create_asynch_timer (*handler, - act, - time, - 0); + act, + time, + ACE_INVALID_HANDLE, + 0, + -1); if (asynch_timer == 0) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t):%p\n", "ACE_Proactor_Handle_Timeout_Upcall::timeout:" "create_asynch_timer failed"), -1); - + // Post a completion. if (asynch_timer->post_completion (this->proactor_->implementation ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -239,7 +233,7 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation, delete_timer_queue_ (0) { this->implementation (implementation); - + if (this->implementation () == 0) { #if defined (ACE_HAS_AIO_CALLS) @@ -248,8 +242,8 @@ ACE_Proactor::ACE_Proactor (ACE_Proactor_Impl *implementation, ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor); #elif defined (ACE_POSIX_SIG_PROACTOR) ACE_NEW (implementation, ACE_POSIX_SIG_Proactor); - #else /* Default is to use the AIOCB one */ - ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor); +#else /* Default is to use the SIG one */ + ACE_NEW (implementation, ACE_POSIX_SIG_Proactor); #endif #elif (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) // WIN_Proactor. @@ -637,7 +631,8 @@ ACE_Proactor::create_asynch_read_stream_result (ACE_Handler &handler, u_long bytes_to_read, const void* act, ACE_HANDLE event, - int priority) + int priority, + int signal_number) { return this->implementation ()->create_asynch_read_stream_result (handler, handle, @@ -645,7 +640,8 @@ ACE_Proactor::create_asynch_read_stream_result (ACE_Handler &handler, bytes_to_read, act, event, - priority); + priority, + signal_number); } @@ -656,7 +652,8 @@ ACE_Proactor::create_asynch_write_stream_result (ACE_Handler &handler, u_long bytes_to_write, const void* act, ACE_HANDLE event, - int priority) + int priority, + int signal_number) { return this->implementation ()->create_asynch_write_stream_result (handler, @@ -665,7 +662,8 @@ ACE_Proactor::create_asynch_write_stream_result (ACE_Handler &handler, bytes_to_write, act, event, - priority); + priority, + signal_number); } @@ -679,7 +677,8 @@ ACE_Proactor::create_asynch_read_file_result (ACE_Handler &handler, u_long offset, u_long offset_high, ACE_HANDLE event, - int priority) + int priority, + int signal_number) { return this->implementation ()->create_asynch_read_file_result (handler, @@ -690,7 +689,8 @@ ACE_Proactor::create_asynch_read_file_result (ACE_Handler &handler, offset, offset_high, event, - priority); + priority, + signal_number); } @@ -704,7 +704,8 @@ ACE_Proactor::create_asynch_write_file_result (ACE_Handler &handler, u_long offset, u_long offset_high, ACE_HANDLE event, - int priority) + int priority, + int signal_number) { return this->implementation ()->create_asynch_write_file_result (handler, @@ -715,7 +716,8 @@ ACE_Proactor::create_asynch_write_file_result (ACE_Handler &handler, offset, offset_high, event, - priority); + priority, + signal_number); } @@ -727,7 +729,8 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler, u_long bytes_to_read, const void* act, ACE_HANDLE event, - int priority) + int priority, + int signal_number) { return this->implementation ()->create_asynch_accept_result (handler, @@ -737,7 +740,8 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler, bytes_to_read, act, event, - priority); + priority, + signal_number); } ACE_Asynch_Transmit_File_Result_Impl * @@ -752,7 +756,8 @@ ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler, u_long flags, const void *act, ACE_HANDLE event, - int priority) + int priority, + int signal_number) { return this->implementation ()->create_asynch_transmit_file_result (handler, @@ -766,7 +771,8 @@ ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler, flags, act, event, - priority); + priority, + signal_number); } ACE_Asynch_Result_Impl * @@ -774,13 +780,15 @@ ACE_Proactor::create_asynch_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &tv, ACE_HANDLE event, - int priority) + int priority, + int signal_number) { return this->implementation ()->create_asynch_timer (handler, act, tv, event, - priority); + priority, + signal_number); } void diff --git a/ace/Proactor.h b/ace/Proactor.h index abc0d605a82..4f3904f43e4 100644 --- a/ace/Proactor.h +++ b/ace/Proactor.h @@ -304,8 +304,9 @@ public: ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, - ACE_HANDLE event, - int priority); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); // Create the correct implementation class for ACE_Asynch_Read_Stream::Result class. virtual ACE_Asynch_Write_Stream_Result_Impl *create_asynch_write_stream_result (ACE_Handler &handler, @@ -313,8 +314,9 @@ public: ACE_Message_Block &message_block, u_long bytes_to_write, const void* act, - ACE_HANDLE event, - int priority); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); // Create the correct implementation class for ACE_Asynch_Write_Stream::Result. virtual ACE_Asynch_Read_File_Result_Impl *create_asynch_read_file_result (ACE_Handler &handler, @@ -324,8 +326,9 @@ public: const void* act, u_long offset, u_long offset_high, - ACE_HANDLE event, - int priority); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); // Create the correct implementation class for ACE_Asynch_Read_File::Result. virtual ACE_Asynch_Write_File_Result_Impl *create_asynch_write_file_result (ACE_Handler &handler, @@ -335,8 +338,9 @@ public: const void* act, u_long offset, u_long offset_high, - ACE_HANDLE event, - int priority); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); // Create the correct implementation class for ACE_Asynch_Write_File::Result. virtual ACE_Asynch_Accept_Result_Impl *create_asynch_accept_result (ACE_Handler &handler, @@ -345,8 +349,9 @@ public: ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, - ACE_HANDLE event, - int priority); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); // Create the correct implementation class for ACE_Asynch_Accept::Result. virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler, @@ -359,17 +364,22 @@ public: u_long bytes_per_send, u_long flags, const void *act, - ACE_HANDLE event, - int priority); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); // Create the correct implementation class for ACE_Asynch_Transmit_File::Result. - + virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &tv, - ACE_HANDLE event, - int priority = 0); + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); // Create a timer result object which can be used with the Timer - // mechanism of the Proactor. + // mechanism of the Proactor. + // If <signal_number> is -1, <POSIX_SIG_Proactor> will create a + // Timer object with a meaningful signal number, choosing the + // largest signal number from the signal mask of the Proactor. protected: virtual void implementation (ACE_Proactor_Impl *implementation); diff --git a/ace/Proactor_Impl.h b/ace/Proactor_Impl.h index 347de43fcd7..edae3452e1a 100644 --- a/ace/Proactor_Impl.h +++ b/ace/Proactor_Impl.h @@ -113,7 +113,7 @@ public: ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, - ACE_HANDLE event, + ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN) = 0; // Create the correct implementation class for ACE_Asynch_Read_Stream::Result class. @@ -123,7 +123,7 @@ public: ACE_Message_Block &message_block, u_long bytes_to_write, const void* act, - ACE_HANDLE event, + ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN) = 0; // Create the correct implementation class for ACE_Asynch_Write_Stream::Result. @@ -135,7 +135,7 @@ public: const void* act, u_long offset, u_long offset_high, - ACE_HANDLE event, + ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN) = 0; // Create the correct implementation class for ACE_Asynch_Read_File::Result. @@ -147,7 +147,7 @@ public: const void* act, u_long offset, u_long offset_high, - ACE_HANDLE event, + ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN) = 0; // Create the correct implementation class for ACE_Asynch_Write_File::Result. @@ -158,7 +158,7 @@ public: ACE_Message_Block &message_block, u_long bytes_to_read, const void* act, - ACE_HANDLE event, + ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN) = 0; // Create the correct implementation class for ACE_Asynch_Accept::Result. @@ -173,7 +173,7 @@ public: u_long bytes_per_send, u_long flags, const void *act, - ACE_HANDLE event, + ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, int signal_number = ACE_SIGRTMIN) = 0; // Create the correct implementation class for ACE_Asynch_Transmit_File::Result. @@ -181,10 +181,12 @@ public: virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &tv, - ACE_HANDLE event, + ACE_HANDLE event = ACE_INVALID_HANDLE, int priority = 0, - int signal_number = ACE_SIGRTMIN) = 0; - // Create the correct implementation object for the Timer result. + int signal_number = 0) = 0; + // Create the correct implementation object for the Timer + // result. POSIX_SIG_Proactor will create a Timer object with a + // meaningful signal number, if you leave the signal number as 0. }; #endif /* (ACE_WIN32 && ACE_HAS_WINCE) || ACE_HAS_AIO_CALLS */ diff --git a/ace/WIN32_Proactor.cpp b/ace/WIN32_Proactor.cpp index e67e3bef3e6..9e94e4699ec 100644 --- a/ace/WIN32_Proactor.cpp +++ b/ace/WIN32_Proactor.cpp @@ -462,7 +462,7 @@ ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result *result) ) == FALSE) { delete result; - ACE_ERROR_RETURN ((LM_ERROR, "Failure in dealing with timers: PostQueuedCompletionStatus failed\n"), -1); + ACE_ERROR_RETURN ((LM_ERROR, "PostQueuedCompletionStatus failed\n"), -1); } return 0; diff --git a/ace/config-linux-lxpthreads.h b/ace/config-linux-lxpthreads.h index 5995b9c8dd8..240ba39f8c3 100644 --- a/ace/config-linux-lxpthreads.h +++ b/ace/config-linux-lxpthreads.h @@ -73,4 +73,6 @@ #include /**/ <pthread.h> +#define ACE_HAS_AIO_CALLS + #endif /* ACE_CONFIG_H */ diff --git a/ace/config-lynxos.h b/ace/config-lynxos.h index e60b2125bec..b6862299735 100644 --- a/ace/config-lynxos.h +++ b/ace/config-lynxos.h @@ -200,6 +200,12 @@ extern "C" // Aio works on lynx #define ACE_HAS_AIO_CALLS -// AIOCB Proactor doesnt work on lynx yet. -#define ACE_POSIX_SIG_PROACTOR +// AIOCB Proactor works on Lynx. But it is not +// multi-threaded. +// Lynx OS 3.0.0 lacks POSIX call <pthread_sigmask>. So,we cannot use +// SIG Proactor also, with multiple threads. So, let us use the AIOCB +// Proactor. Once <pthreadd_sigmask> is available on Lynx, we can turn +// on SIG Proactor for this platform. +// #define ACE_POSIX_SIG_PROACTOR +#define ACE_POSIX_AIOCB_PROACTOR #endif /* ACE_CONFIG_H */ |