diff options
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r-- | ace/POSIX_Proactor.cpp | 434 |
1 files changed, 116 insertions, 318 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 68c8aaf7be0..3861e8aa110 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -13,38 +13,6 @@ #include "ace/POSIX_Proactor.i" #endif /* __ACE_INLINE__ */ -class ACE_Export ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result -{ - // = TITLE - // - // This is result object is used by the <end_event_loop> of the - // ACE_Proactor interface to wake up all the threads blocking - // for completions. - // - // = DESCRIPTION - // - -public: - ACE_POSIX_Wakeup_Completion (ACE_Handler &handler, - const void *act = 0, - ACE_HANDLE event = ACE_INVALID_HANDLE, - int priority = 0, - int signal_number = ACE_SIGRTMIN); - // Constructor. - - virtual ~ACE_POSIX_Wakeup_Completion (void); - // Destructor. - - - virtual void complete (u_long bytes_transferred = 0, - int success = 1, - const void *completion_key = 0, - u_long error = 0); - // This method calls the <handler>'s <handle_wakeup> method. -}; - -// ********************************************************************* - ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void) { this->close (); @@ -80,7 +48,7 @@ ACE_POSIX_Proactor::close_dispatch_threads (int) size_t ACE_POSIX_Proactor::number_of_threads (void) const { - // @@ Implement it. + // @@ Implement it. ACE_NOTSUP_RETURN (0); } @@ -139,7 +107,7 @@ ACE_POSIX_Proactor::create_asynch_write_stream_result (ACE_Handler &handler, bytes_to_write, act, event, - priority, + priority, signal_number), 0); return implementation; @@ -322,7 +290,7 @@ void ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result, u_long bytes_transferred, int success, - const void */* completion_key*/, + const void *completion_key, u_long error) { ACE_SEH_TRY @@ -330,7 +298,7 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r // Call completion hook asynch_result->complete (bytes_transferred, success, - 0, // No completion key. + (void *) completion_key, error); } ACE_SEH_FINALLY @@ -340,23 +308,6 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r } } -int -ACE_POSIX_Proactor::post_wakeup_completions (int how_many) -{ - ACE_POSIX_Wakeup_Completion *wakeup_completion = 0; - for (ssize_t ci = 0; ci < how_many; ci++) - { - ACE_NEW_RETURN (wakeup_completion, - ACE_POSIX_Wakeup_Completion (this->wakeup_handler_), - -1); - - if (wakeup_completion->post_completion (this) == -1) - return -1; - } - - return 0; -} - // ********************************************************************* class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler @@ -378,7 +329,7 @@ class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler // completion of the <Asynch_Read_Stream> and calls the // <handle_read_stream> of this class. This class calls // <complete> on the <POSIX_Asynch_Result *> and thus calls the - // application handler. + // application handler. // Handling the MessageBlock: // We give this message block to read the result pointer through // the notify pipe. We expect that to read 4 bytes from the @@ -389,10 +340,10 @@ public: ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor); // Constructor. You need the posix proactor because you need to call // <application_specific_code> - + virtual ~ACE_AIOCB_Notify_Pipe_Manager (void); // Destructor. - + int notify (ACE_POSIX_Asynch_Result *result); // Send the result pointer through the notification pipe. @@ -403,10 +354,10 @@ public: private: ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_; // The implementation proactor class. - + ACE_Message_Block message_block_; // Message block to get ACE_POSIX_Asynch_Result pointer from the - // pipe. + // pipe. ACE_Pipe pipe_; // Pipe for the communication between Proactor and the @@ -459,8 +410,7 @@ ACE_AIOCB_Notify_Pipe_Manager::notify (ACE_POSIX_Asynch_Result *result) { // Send the result pointer through the pipe. int return_val = ACE::send (this->pipe_.write_handle (), - ACE_reinterpret_cast (char *, - &result), + (char *) &result, sizeof (result)); if (return_val != sizeof (result)) ACE_ERROR_RETURN ((LM_ERROR, @@ -475,7 +425,7 @@ void ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { // The message block actually contains the ACE_POSIX_Asynch_Result - // pointer. + // pointer. ACE_POSIX_Asynch_Result *asynch_result = 0; asynch_result = *(ACE_POSIX_Asynch_Result **) result.message_block ().rd_ptr (); @@ -513,10 +463,7 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void) { // Initialize the array. for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) - { - aiocb_list_[ai] = 0; - result_list_ [ai] = 0; - } + aiocb_list_[ai] = 0; // Accept Handler for aio_accept. Remember! this issues a Asynch_Read // on the notify pipe for doing the Asynch_Accept. @@ -615,12 +562,10 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) { int result_suspend = 0; if (milli_seconds == ACE_INFINITE) - { - // Indefinite blocking. - result_suspend = aio_suspend (this->aiocb_list_, - this->aiocb_list_max_size_, - 0); - } + // Indefinite blocking. + result_suspend = aio_suspend (this->aiocb_list_, + this->aiocb_list_max_size_, + 0); else { // Block on <aio_suspend> for <milli_seconds> @@ -631,12 +576,12 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) this->aiocb_list_max_size_, &timeout); } - + // Check for errors if (result_suspend == -1) { // If failure is because of timeout, then return *0*, otherwise - // return -1. + // return -1. if (errno == EAGAIN) return 0; else @@ -656,7 +601,7 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) // Dont process null blocks. if (aiocb_list_ [ai] == 0) continue; - + // Analyze error and return values. // Get the error status of the aio_ operation. @@ -688,21 +633,17 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) // This AIO has finished. break; } - + // Something should have completed. ACE_ASSERT (ai != this->aiocb_list_max_size_); - + // Retrive the result pointer. - ACE_POSIX_Asynch_Result *asynch_result = this->result_list_ [ai]; - - // ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *, - // this->aiocb_list_[ai]); - // ACE_dynamic_cast (ACE_POSIX_Asynch_Result *, - // this->aiocb_list_[ai]); + ACE_POSIX_Asynch_Result *asynch_result = + (ACE_POSIX_Asynch_Result *) this->aiocb_list_[ai]; + // Invalidate entry in the aiocb list. this->aiocb_list_[ai] = 0; - this->result_list_ [ai] = 0; this->aiocb_list_cur_size_--; // Call the application code. @@ -711,7 +652,7 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) 1, // Success 0, // No completion key. error_status); // Error - + // Success return 1; } @@ -769,9 +710,7 @@ ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *r -1); // Store the pointers. - this->aiocb_list_[ai] = result; - this->result_list_ [ai] = result; - + this->aiocb_list_[ai] = result; this->aiocb_list_cur_size_ ++; return 0; @@ -781,80 +720,13 @@ ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *r ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void) { - // = Mask all the signals, keep a mask set with ACE_SIGRTMIN and set - // up signal handler for SIGRTMIN. - - // Mask all the signals. - if (this->mask_all () != 0) - return; - - // = Keep a mask set with ACE_SIGRTMIN. - - // Clear the signal set. + // Make the sigset_t consisting of the completion signals. if (sigemptyset (&this->RT_completion_signals_) == -1) ACE_ERROR ((LM_ERROR, "Error:%p\n", "Couldn't init the RT completion signal set")); - - // Add the signal number to the signal set. - if (sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1) - ACE_ERROR ((LM_ERROR, - "Error:%p\n", - "Couldnt init the RT completion signal set")); - - // Set up the signal handler for SIGRTMIN. - setup_signal_handler (ACE_SIGRTMIN); } -ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set) -{ - // = Keep <Signal_set> with the Proactor, mask all the signals and - // setup signal handlers for the signals in the <signal_set>. - - // = Keep <signal_set> with the Proactor. - - // Empty the signal set first. - if (sigemptyset (&this->RT_completion_signals_) == -1) - ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%p\n", - "sigemptyset failed")); - - // Put the <signal_set>. - if (ACE_OS::pthread_sigmask (SIG_SETMASK, &signal_set, 0) != 0) - ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%p\n", - "pthread_sigmask failed")); - - // Get the <signal_set> back from the OS. - if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, &this->RT_completion_signals_) != 0) - ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%p\n", - "ACE_OS::pthread_sigmask failed")); - - - // Mask all the signals. - if (this->mask_all () != 0) - return; - - // For each signal number present in the <signal_set>, set up the - // signal handler. - int member = 0; - for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++) - { - member = sigismember (&signal_set, - si); - if (member == -1) - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:" - "sigismember failed")); - else if (member == 1) - { - if (this->setup_signal_handler (si) == -1) - return; - } - } -} ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void) { @@ -882,20 +754,23 @@ ACE_POSIX_SIG_Proactor::post_completion (ACE_POSIX_Asynch_Result *result) pid_t pid = ACE_OS::getpid (); if (pid == (pid_t) -1) ACE_ERROR_RETURN ((LM_ERROR, - "Error:%N:%l(%P | %t):%p", - "<getpid> failed"), + "Error:(%P | %t):%p", + "<getpid> failed\n"), -1); - + // Set the signal information. sigval value; - value.sival_ptr = ACE_reinterpret_cast (void *, - result); - + value.sival_ptr = (void *) result; + + // Register the signal number with the Proactor. + if (this->register_aio_with_proactor (result) == -1) + return -1; + // Queue the signal. if (sigqueue (pid, result->signal_number (), value) == -1) ACE_ERROR_RETURN ((LM_ERROR, - "Error:%N:%l:(%P | %t):%p\n", - "<sigqueue> failed"), + "Error:(%P | %t):%p", + "<sigqueue> failed\n"), -1); return 0; } @@ -950,131 +825,15 @@ ACE_POSIX_SIG_Proactor::create_asynch_transmit_file (void) return implementation; } -ACE_Asynch_Result_Impl * -ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler, - const void *act, - const ACE_Time_Value &tv, - ACE_HANDLE event, - int priority, - int signal_number) -{ - int is_member = 0; - - // Fix the signal number. - if (signal_number == -1) - { - int si; - for (si = ACE_SIGRTMAX; - (is_member == 0) && (si >= ACE_SIGRTMIN); - si--) - { - is_member = sigismember (&this->RT_completion_signals_, - si); - if (is_member == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::\n", - "ACE_POSIX_SIG_Proactor::create_asynch_timer:" - "sigismember failed"), - 0); - } - - if (is_member == 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:%N:%l:(%P | %t)::%s\n", - "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:" - "Signal mask set empty"), - 0); - else - // + 1 to nullify loop increment. - signal_number = si + 1; - } - - ACE_Asynch_Result_Impl *implementation; - ACE_NEW_RETURN (implementation, - ACE_POSIX_Asynch_Timer (handler, - act, - tv, - event, - priority, - signal_number), - 0); - return implementation; -} - -int -ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const -{ - // Set up the handler(actually Null handler) for this real-time - // signal. - struct sigaction reaction; - sigemptyset (&reaction.sa_mask); // Nothing else to mask. - reaction.sa_flags = SA_SIGINFO; // Realtime flag. -#if defined (SA_SIGACTION) - // Lynx says, it is better to set this bit, to be portable. - reaction.sa_flags &= SA_SIGACTION; -#endif /* SA_SIGACTION */ - reaction.sa_sigaction = null_handler; // Null handler function. - int sigaction_return = sigaction (signal_number, - &reaction, - 0); - if (sigaction_return == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:%p\n", - "Proactor couldnt do sigaction for the RT SIGNAL"), - -1); - return 0; -} - -void -ACE_POSIX_SIG_Proactor::null_handler (int signal_number, - siginfo_t * /* info */, - void * /* context */) -{ - ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):ACE_POSIX_SIG_Proactor::null_handler called," - "Signal number %d," - "Mask all the RT signals for this thread\n", - signal_number)); -} - -int -ACE_POSIX_SIG_Proactor::mask_all (void) const -{ - sigset_t full_set; - - // Get full set. - if (sigfillset (&full_set) != 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p\n", - "sigfillset failed"), - -1); - - // Mask them. - if (ACE_OS::pthread_sigmask (SIG_SETMASK, &full_set, 0) != 0) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p\n", - "pthread_sigmask failed"), - -1); - - return 0; -} - int ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) { int result_sigwait = 0; siginfo_t sig_info; - - // Mask all the signals. - if (this->mask_all () != 0) - return -1; - - // Wait for the signals. + if (milli_seconds == ACE_INFINITE) - { - result_sigwait = sigwaitinfo (&this->RT_completion_signals_, - &sig_info); - } + result_sigwait = sigwaitinfo (&this->RT_completion_signals_, + &sig_info); else { // Wait for <milli_seconds> amount of time. @@ -1085,7 +844,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) &sig_info, &timeout); } - + // Check for errors if (result_sigwait == -1) { @@ -1100,26 +859,38 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) "sigtimedwait/sigwaitinfo failed"), -1); } - + // No errors, RT compleion signal is received. - + + // We deal only with the signal numbers that have been registered + // already. + int member = sigismember (&this->RT_completion_signals_, + result_sigwait); + if (member != 1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_SIG_Proactor::handle_events:" + "Unexpected signal (%d) received while waiting for RT Completion Signal", + result_sigwait), + -1); + // Is the signo returned consistent with the sig info? if (sig_info.si_signo != result_sigwait) ACE_ERROR_RETURN ((LM_ERROR, - "Error:%N:%l:(%P | %t):" + "%N:%l:(%P | %t):" "ACE_POSIX_SIG_Proactor::handle_events:" "Inconsistent signal number (%d) in the signal info block", sig_info.si_signo), -1); - + // Retrive the result pointer. - ACE_POSIX_Asynch_Result *asynch_result = ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *, - sig_info.si_value.sival_ptr); + ACE_POSIX_Asynch_Result *asynch_result = + (ACE_POSIX_Asynch_Result *) sig_info.si_value.sival_ptr; // Check the <signal code> and act according to that. if (sig_info.si_code == SI_ASYNCIO) { - // Analyze error and return values. + // Analyze error and return values. // Check the error status int error_status = aio_error (asynch_result); @@ -1130,11 +901,11 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) "ACE_POSIX_SIG_Proactor::handle_events:" "<aio_error> has failed"), -1); - + // Completion signal has been received, so it can't be in // progress. ACE_ASSERT (error_status != EINPROGRESS); - + // Error_status is not -1 and not EINPROGRESS. So, an <aio_> // operation has finished (successfully or unsuccessfully!!!) // Get the return_status of the <aio_> operation. @@ -1152,7 +923,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) this->application_specific_code (asynch_result, return_status, 1, // Result : True. - 0, // No completion key. + 0, // No completion_signal. error_status); // Error. } } @@ -1177,6 +948,58 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) return 1; } +int +ACE_POSIX_SIG_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *result) +{ + // Nothing to do if this signal is already there in the signal set, + // that has been already masked out. + int member = sigismember (&this->RT_completion_signals_, + result->signal_number ()); + if (member == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_SIG_Proactor::register_aio_with_proactor:" + "sigismember failed"), + -1); + // Return if it is already there. + if (member == 1) + return 0; + + // Add the signal number to the signal set. + if (sigaddset (&this->RT_completion_signals_, result->signal_number ()) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "Couldnt init the RT completion signal set"), + -1); + + // Mask them. + if (sigprocmask (SIG_BLOCK, &RT_completion_signals_, 0) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "Couldnt mask the RT completion signals"), + -1); + + // Set up the handler(actually Null handler) for this real-time + // signal. + struct sigaction reaction; + sigemptyset (&reaction.sa_mask); // Nothing else to mask. + reaction.sa_flags = SA_SIGINFO; // Realtime flag. +#if defined (SA_SIGACTION) + // Lynx says, it is better to set this bit, to be portable. + reaction.sa_flags &= SA_SIGACTION; +#endif /* SA_SIGACTION */ + reaction.sa_sigaction = 0; // No handler function. + int sigaction_return = sigaction (result->signal_number (), + &reaction, + 0); + if (sigaction_return == -1) + ACE_ERROR ((LM_ERROR, + "Error:%p\n", + "Proactor couldnt do sigaction for the RT SIGNAL")); + return 0; +} + + // ********************************************************************* ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer (ACE_Handler &handler, @@ -1205,29 +1028,4 @@ ACE_POSIX_Asynch_Timer::complete (u_long bytes_transferred, this->handler_.handle_time_out (this->time_, this->act ()); } -// ********************************************************************* - -ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion (ACE_Handler &handler, - const void *act, - ACE_HANDLE event, - int priority, - int signal_number) - : ACE_Asynch_Result_Impl (), - ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number) -{ -} - -ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void) -{ -} - -void -ACE_POSIX_Wakeup_Completion::complete (u_long /* bytes_transferred */, - int /* success */, - const void * /* completion_key */, - u_long /* error */) -{ - this->handler_.handle_wakeup (); -} - #endif /* ACE_HAS_AIO_CALLS */ |