diff options
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r-- | ace/POSIX_Proactor.cpp | 209 |
1 files changed, 61 insertions, 148 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 68c8aaf7be0..38aac828254 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -13,38 +13,6 @@ #include "ace/POSIX_Proactor.i" #endif /* __ACE_INLINE__ */ -class ACE_Export ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result -{ - // = TITLE - // - // This is result object is used by the <end_event_loop> of the - // ACE_Proactor interface to wake up all the threads blocking - // for completions. - // - // = DESCRIPTION - // - -public: - ACE_POSIX_Wakeup_Completion (ACE_Handler &handler, - const void *act = 0, - ACE_HANDLE event = ACE_INVALID_HANDLE, - int priority = 0, - int signal_number = ACE_SIGRTMIN); - // Constructor. - - virtual ~ACE_POSIX_Wakeup_Completion (void); - // Destructor. - - - virtual void complete (u_long bytes_transferred = 0, - int success = 1, - const void *completion_key = 0, - u_long error = 0); - // This method calls the <handler>'s <handle_wakeup> method. -}; - -// ********************************************************************* - ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void) { this->close (); @@ -80,7 +48,7 @@ ACE_POSIX_Proactor::close_dispatch_threads (int) size_t ACE_POSIX_Proactor::number_of_threads (void) const { - // @@ Implement it. + // @@ Implement it. ACE_NOTSUP_RETURN (0); } @@ -139,7 +107,7 @@ ACE_POSIX_Proactor::create_asynch_write_stream_result (ACE_Handler &handler, bytes_to_write, act, event, - priority, + priority, signal_number), 0); return implementation; @@ -322,7 +290,7 @@ void ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result, u_long bytes_transferred, int success, - const void */* completion_key*/, + const void *completion_key, u_long error) { ACE_SEH_TRY @@ -330,7 +298,7 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r // Call completion hook asynch_result->complete (bytes_transferred, success, - 0, // No completion key. + (void *) completion_key, error); } ACE_SEH_FINALLY @@ -340,23 +308,6 @@ ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_r } } -int -ACE_POSIX_Proactor::post_wakeup_completions (int how_many) -{ - ACE_POSIX_Wakeup_Completion *wakeup_completion = 0; - for (ssize_t ci = 0; ci < how_many; ci++) - { - ACE_NEW_RETURN (wakeup_completion, - ACE_POSIX_Wakeup_Completion (this->wakeup_handler_), - -1); - - if (wakeup_completion->post_completion (this) == -1) - return -1; - } - - return 0; -} - // ********************************************************************* class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler @@ -378,7 +329,7 @@ class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler // completion of the <Asynch_Read_Stream> and calls the // <handle_read_stream> of this class. This class calls // <complete> on the <POSIX_Asynch_Result *> and thus calls the - // application handler. + // application handler. // Handling the MessageBlock: // We give this message block to read the result pointer through // the notify pipe. We expect that to read 4 bytes from the @@ -389,10 +340,10 @@ public: ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor); // Constructor. You need the posix proactor because you need to call // <application_specific_code> - + virtual ~ACE_AIOCB_Notify_Pipe_Manager (void); // Destructor. - + int notify (ACE_POSIX_Asynch_Result *result); // Send the result pointer through the notification pipe. @@ -403,10 +354,10 @@ public: private: ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_; // The implementation proactor class. - + ACE_Message_Block message_block_; // Message block to get ACE_POSIX_Asynch_Result pointer from the - // pipe. + // pipe. ACE_Pipe pipe_; // Pipe for the communication between Proactor and the @@ -459,8 +410,7 @@ ACE_AIOCB_Notify_Pipe_Manager::notify (ACE_POSIX_Asynch_Result *result) { // Send the result pointer through the pipe. int return_val = ACE::send (this->pipe_.write_handle (), - ACE_reinterpret_cast (char *, - &result), + (char *) &result, sizeof (result)); if (return_val != sizeof (result)) ACE_ERROR_RETURN ((LM_ERROR, @@ -475,7 +425,7 @@ void ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { // The message block actually contains the ACE_POSIX_Asynch_Result - // pointer. + // pointer. ACE_POSIX_Asynch_Result *asynch_result = 0; asynch_result = *(ACE_POSIX_Asynch_Result **) result.message_block ().rd_ptr (); @@ -513,10 +463,7 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void) { // Initialize the array. for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) - { - aiocb_list_[ai] = 0; - result_list_ [ai] = 0; - } + aiocb_list_[ai] = 0; // Accept Handler for aio_accept. Remember! this issues a Asynch_Read // on the notify pipe for doing the Asynch_Accept. @@ -615,12 +562,10 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) { int result_suspend = 0; if (milli_seconds == ACE_INFINITE) - { - // Indefinite blocking. - result_suspend = aio_suspend (this->aiocb_list_, - this->aiocb_list_max_size_, - 0); - } + // Indefinite blocking. + result_suspend = aio_suspend (this->aiocb_list_, + this->aiocb_list_max_size_, + 0); else { // Block on <aio_suspend> for <milli_seconds> @@ -631,12 +576,12 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) this->aiocb_list_max_size_, &timeout); } - + // Check for errors if (result_suspend == -1) { // If failure is because of timeout, then return *0*, otherwise - // return -1. + // return -1. if (errno == EAGAIN) return 0; else @@ -656,7 +601,7 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) // Dont process null blocks. if (aiocb_list_ [ai] == 0) continue; - + // Analyze error and return values. // Get the error status of the aio_ operation. @@ -688,21 +633,17 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) // This AIO has finished. break; } - + // Something should have completed. ACE_ASSERT (ai != this->aiocb_list_max_size_); - + // Retrive the result pointer. - ACE_POSIX_Asynch_Result *asynch_result = this->result_list_ [ai]; - - // ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *, - // this->aiocb_list_[ai]); - // ACE_dynamic_cast (ACE_POSIX_Asynch_Result *, - // this->aiocb_list_[ai]); + ACE_POSIX_Asynch_Result *asynch_result = + (ACE_POSIX_Asynch_Result *) this->aiocb_list_[ai]; + // Invalidate entry in the aiocb list. this->aiocb_list_[ai] = 0; - this->result_list_ [ai] = 0; this->aiocb_list_cur_size_--; // Call the application code. @@ -711,7 +652,7 @@ ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) 1, // Success 0, // No completion key. error_status); // Error - + // Success return 1; } @@ -769,9 +710,7 @@ ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *r -1); // Store the pointers. - this->aiocb_list_[ai] = result; - this->result_list_ [ai] = result; - + this->aiocb_list_[ai] = result; this->aiocb_list_cur_size_ ++; return 0; @@ -783,19 +722,19 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void) { // = Mask all the signals, keep a mask set with ACE_SIGRTMIN and set // up signal handler for SIGRTMIN. - + // Mask all the signals. if (this->mask_all () != 0) return; - + // = Keep a mask set with ACE_SIGRTMIN. - + // Clear the signal set. if (sigemptyset (&this->RT_completion_signals_) == -1) ACE_ERROR ((LM_ERROR, "Error:%p\n", "Couldn't init the RT completion signal set")); - + // Add the signal number to the signal set. if (sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1) ACE_ERROR ((LM_ERROR, @@ -809,16 +748,16 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void) ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set) { // = Keep <Signal_set> with the Proactor, mask all the signals and - // setup signal handlers for the signals in the <signal_set>. - + // setup signal handlers for the signals in the <signal_set>. + // = Keep <signal_set> with the Proactor. - + // Empty the signal set first. if (sigemptyset (&this->RT_completion_signals_) == -1) ACE_ERROR ((LM_ERROR, "Error:(%P | %t):%p\n", "sigemptyset failed")); - + // Put the <signal_set>. if (ACE_OS::pthread_sigmask (SIG_SETMASK, &signal_set, 0) != 0) ACE_ERROR ((LM_ERROR, @@ -832,11 +771,11 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set) "ACE_OS::pthread_sigmask failed")); - // Mask all the signals. + // Mask all the signals. if (this->mask_all () != 0) return; - - // For each signal number present in the <signal_set>, set up the + + // For each signal number present in the <signal_set>, set up the // signal handler. int member = 0; for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++) @@ -885,12 +824,11 @@ ACE_POSIX_SIG_Proactor::post_completion (ACE_POSIX_Asynch_Result *result) "Error:%N:%l(%P | %t):%p", "<getpid> failed"), -1); - + // Set the signal information. sigval value; - value.sival_ptr = ACE_reinterpret_cast (void *, - result); - + value.sival_ptr = (void *) result; + // Queue the signal. if (sigqueue (pid, result->signal_number (), value) == -1) ACE_ERROR_RETURN ((LM_ERROR, @@ -977,7 +915,7 @@ ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler, "sigismember failed"), 0); } - + if (is_member == 0) ACE_ERROR_RETURN ((LM_ERROR, "Error:%N:%l:(%P | %t)::%s\n", @@ -988,7 +926,7 @@ ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler, // + 1 to nullify loop increment. signal_number = si + 1; } - + ACE_Asynch_Result_Impl *implementation; ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Timer (handler, @@ -1005,12 +943,12 @@ int ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const { // Set up the handler(actually Null handler) for this real-time - // signal. + // signal. struct sigaction reaction; sigemptyset (&reaction.sa_mask); // Nothing else to mask. reaction.sa_flags = SA_SIGINFO; // Realtime flag. #if defined (SA_SIGACTION) - // Lynx says, it is better to set this bit, to be portable. + // Lynx says, it is better to set this bit, to be portable. reaction.sa_flags &= SA_SIGACTION; #endif /* SA_SIGACTION */ reaction.sa_sigaction = null_handler; // Null handler function. @@ -1031,9 +969,9 @@ ACE_POSIX_SIG_Proactor::null_handler (int signal_number, void * /* context */) { ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):ACE_POSIX_SIG_Proactor::null_handler called," - "Signal number %d," - "Mask all the RT signals for this thread\n", + "Error:(%P | %t):%s:Signal number %d\n" + "Mask all the RT signals for this thread", + "ACE_POSIX_SIG_Proactor::null_handler called", signal_number)); } @@ -1041,21 +979,21 @@ int ACE_POSIX_SIG_Proactor::mask_all (void) const { sigset_t full_set; - + // Get full set. if (sigfillset (&full_set) != 0) ACE_ERROR_RETURN ((LM_ERROR, "Error:(%P | %t):%p\n", "sigfillset failed"), -1); - + // Mask them. if (ACE_OS::pthread_sigmask (SIG_SETMASK, &full_set, 0) != 0) ACE_ERROR_RETURN ((LM_ERROR, "Error:(%P | %t):%p\n", "pthread_sigmask failed"), -1); - + return 0; } @@ -1068,7 +1006,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) // Mask all the signals. if (this->mask_all () != 0) return -1; - + // Wait for the signals. if (milli_seconds == ACE_INFINITE) { @@ -1085,7 +1023,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) &sig_info, &timeout); } - + // Check for errors if (result_sigwait == -1) { @@ -1100,9 +1038,9 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) "sigtimedwait/sigwaitinfo failed"), -1); } - + // No errors, RT compleion signal is received. - + // Is the signo returned consistent with the sig info? if (sig_info.si_signo != result_sigwait) ACE_ERROR_RETURN ((LM_ERROR, @@ -1111,15 +1049,15 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) "Inconsistent signal number (%d) in the signal info block", sig_info.si_signo), -1); - + // Retrive the result pointer. - ACE_POSIX_Asynch_Result *asynch_result = ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *, - sig_info.si_value.sival_ptr); + ACE_POSIX_Asynch_Result *asynch_result = + (ACE_POSIX_Asynch_Result *) sig_info.si_value.sival_ptr; // Check the <signal code> and act according to that. if (sig_info.si_code == SI_ASYNCIO) { - // Analyze error and return values. + // Analyze error and return values. // Check the error status int error_status = aio_error (asynch_result); @@ -1130,11 +1068,11 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) "ACE_POSIX_SIG_Proactor::handle_events:" "<aio_error> has failed"), -1); - + // Completion signal has been received, so it can't be in // progress. ACE_ASSERT (error_status != EINPROGRESS); - + // Error_status is not -1 and not EINPROGRESS. So, an <aio_> // operation has finished (successfully or unsuccessfully!!!) // Get the return_status of the <aio_> operation. @@ -1152,7 +1090,7 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) this->application_specific_code (asynch_result, return_status, 1, // Result : True. - 0, // No completion key. + 0, // No completion_signal. error_status); // Error. } } @@ -1205,29 +1143,4 @@ ACE_POSIX_Asynch_Timer::complete (u_long bytes_transferred, this->handler_.handle_time_out (this->time_, this->act ()); } -// ********************************************************************* - -ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion (ACE_Handler &handler, - const void *act, - ACE_HANDLE event, - int priority, - int signal_number) - : ACE_Asynch_Result_Impl (), - ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number) -{ -} - -ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void) -{ -} - -void -ACE_POSIX_Wakeup_Completion::complete (u_long /* bytes_transferred */, - int /* success */, - const void * /* completion_key */, - u_long /* error */) -{ - this->handler_.handle_wakeup (); -} - #endif /* ACE_HAS_AIO_CALLS */ |