diff options
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r-- | ace/POSIX_Proactor.cpp | 422 |
1 files changed, 184 insertions, 238 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 8634454e154..77220da3abd 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -1,6 +1,5 @@ -// $Id$ - -#define ACE_BUILD_DLL +// $Id$ + #include "ace/POSIX_Proactor.h" #if defined (ACE_HAS_AIO_CALLS) @@ -16,9 +15,14 @@ class ACE_Export ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result { // = TITLE - // This is result object is used by the <end_event_loop> of the + // + // This is result object is used by the <end_event_loop> of the // ACE_Proactor interface to wake up all the threads blocking // for completions. + // + // = DESCRIPTION + // + public: ACE_POSIX_Wakeup_Completion (ACE_Handler &handler, const void *act = 0, @@ -26,11 +30,11 @@ public: int priority = 0, int signal_number = ACE_SIGRTMIN); // Constructor. - + virtual ~ACE_POSIX_Wakeup_Completion (void); // Destructor. - - + + virtual void complete (u_long bytes_transferred = 0, int success = 1, const void *completion_key = 0, @@ -290,7 +294,7 @@ ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *) ACE_Time_Value timeout (0, 0); int result = 0; - for (;;) + while (1) { result = this->handle_events (timeout); if (result != 0 || errno == ETIME) @@ -339,19 +343,18 @@ int ACE_POSIX_Proactor::post_wakeup_completions (int how_many) { ACE_POSIX_Wakeup_Completion *wakeup_completion = 0; - for (ssize_t ci = 0; ci < how_many; ci++) { ACE_NEW_RETURN (wakeup_completion, ACE_POSIX_Wakeup_Completion (this->wakeup_handler_), -1); - + if (wakeup_completion->post_completion (this) == -1) return -1; } - + return 0; -} +} class ACE_Export ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler { @@ -498,96 +501,29 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream: "Read from pipe failed")); } -// Public constructor for common use. -ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) - : aiocb_notify_pipe_manager_ (0), - aiocb_list_ (0), - result_list_ (0), - aiocb_list_max_size_ (max_aio_operations), - aiocb_list_cur_size_ (0) -{ - if (aiocb_list_max_size_ > 8192) - // @@ Alex, this shouldn't be a magic number -- it should be a - // constant, e.g., ACE_AIO_MAX_SIZE or something. - aiocb_list_max_size_ = 8192; - - ACE_NEW (aiocb_list_, - aiocb *[aiocb_list_max_size_]); - ACE_NEW (result_list_, - ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]); - - // Initialize the array. - for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) - { - aiocb_list_[ai] = 0; - result_list_[ai] = 0; - } - - create_notify_manager (); -} +// ********************************************************************* -// Special protected constructor for ACE_SUN_Proactor -ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,int Flg) +ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (void) : aiocb_notify_pipe_manager_ (0), - aiocb_list_ (0), - result_list_ (0), - aiocb_list_max_size_ (max_aio_operations), + aiocb_list_max_size_ (ACE_RTSIG_MAX), aiocb_list_cur_size_ (0) { - ACE_UNUSED_ARG (Flg); - - if (aiocb_list_max_size_ > 8192) - // @@ Alex, this shouldn't be a magic number -- it should be a - // constant, e.g., ACE_AIO_MAX_SIZE or something. - aiocb_list_max_size_ = 8192; - - ACE_NEW (aiocb_list_, - aiocb *[aiocb_list_max_size_]); - ACE_NEW (result_list_, - ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]); - // Initialize the array. for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) { aiocb_list_[ai] = 0; - result_list_[ai] = 0; + result_list_ [ai] = 0; } - // @@ We should create Notify_Pipe_Manager in the derived class to - // provide correct calls for virtual functions !!! -} - -// Destructor. -ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) -{ - delete_notify_manager (); - - delete [] aiocb_list_; - aiocb_list_ = 0; - - delete [] result_list_; - result_list_ = 0; -} - -void -ACE_POSIX_AIOCB_Proactor::create_notify_manager (void) -{ // Accept Handler for aio_accept. Remember! this issues a Asynch_Read // on the notify pipe for doing the Asynch_Accept. - - if (aiocb_notify_pipe_manager_ == 0) - ACE_NEW (aiocb_notify_pipe_manager_, - ACE_AIOCB_Notify_Pipe_Manager (this)); + ACE_NEW (aiocb_notify_pipe_manager_, + ACE_AIOCB_Notify_Pipe_Manager (this)); } -void -ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void) +// Destructor. +ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) { - // We are responsible for delete as all pointers set to 0 after - // delete, it is save to delete twice - - delete aiocb_notify_pipe_manager_; - aiocb_notify_pipe_manager_ = 0; } int @@ -672,122 +608,132 @@ ACE_POSIX_AIOCB_Proactor::create_asynch_transmit_file (void) } int -ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds) +ACE_POSIX_AIOCB_Proactor::handle_events (unsigned long milli_seconds) { int result_suspend = 0; - if (milli_seconds == ACE_INFINITE) - // Indefinite blocking. - result_suspend = aio_suspend (aiocb_list_, - aiocb_list_max_size_, - 0); + { + // Indefinite blocking. + result_suspend = aio_suspend (this->aiocb_list_, + this->aiocb_list_max_size_, + 0); + } else { // Block on <aio_suspend> for <milli_seconds> timespec timeout; timeout.tv_sec = milli_seconds / 1000; timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000; - result_suspend = aio_suspend (aiocb_list_, - aiocb_list_max_size_, + result_suspend = aio_suspend (this->aiocb_list_, + this->aiocb_list_max_size_, &timeout); } // Check for errors if (result_suspend == -1) { - if (errno == EAGAIN) // Timeout + // If failure is because of timeout, then return *0*, otherwise + // return -1. + if (errno == EAGAIN) return 0; + else + { + ACE_DEBUG ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_AIOCB_Proactor::handle_events:" + "aio_suspend failed")); - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::handle_events:" - "aio_suspend failed\n"), - 0); // let continue work + return 0; + } } + // Retrive the result pointer. + ACE_POSIX_Asynch_Result *asynch_result = 0; + size_t ai; int error_status = 0; int return_status = 0; - ACE_POSIX_Asynch_Result *asynch_result = - find_completed_aio (error_status, return_status); + // !!! Protected area. + { + ACE_Guard<ACE_Thread_Mutex> locker (this->mtx_AIOCB_); - if (asynch_result == 0) - return 0; + for (ai = 0; ai < this->aiocb_list_max_size_; ai++) + { + // Dont process null blocks. + if (aiocb_list_ [ai] == 0) + continue; - // Call the application code. - this->application_specific_code (asynch_result, - return_status, // Bytes transferred. - 1, // Success - 0, // No completion key. - error_status); // Error - return 1; -} + // = Analyze error and return values. -ACE_POSIX_Asynch_Result * -ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, - int &return_status) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0)); + // Get the error status of the aio_ operation. + error_status = aio_error (aiocb_list_[ai]); + if (error_status == -1) + // <aio_error> itself has failed. + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_AIOCB_Proactor::handle_events:" + "<aio_error> has failed"), + -1); - size_t ai; - ACE_POSIX_Asynch_Result *asynch_result = 0; + // Continue the loop if <aio_> operation is still in progress. + if (error_status == EINPROGRESS) + continue; - error_status = 0; - return_status= 0; - - for (ai = 0; ai < aiocb_list_max_size_; ai++) - { - if (aiocb_list_[ai] == 0) // Dont process null blocks. - continue; + // Handle cancel'ed asynchronous operation. We dont have to call + // <aio_return> in this case, since return_status is going to be + // -1. We will pass 0 for the <bytes_transferred> in this case + if (error_status == ECANCELED) + { + return_status = 0; + break; + } + else if (error_status == 0) + { + // Error_status is not -1 and not EINPROGRESS. So, an <aio_> + // operation has finished (successfully or unsuccessfully!!!) + // Get the return_status of the <aio_> operation. + return_status = aio_return (aiocb_list_[ai]); - // Get the error status of the aio_ operation. - error_status = aio_error (aiocb_list_[ai]); + if (return_status == -1) + { + ACE_DEBUG ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_AIOCB_Proactor::handle_events:" + "<aio_return> failed to transfer any data\n")); - if (error_status == -1) // <aio_error> itself has failed. - { - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::find_completed_aio:" - "<aio_error> has failed\n")); - - // skip this operation - aiocb_list_[ai] = 0; - result_list_[ai] = 0; - aiocb_list_cur_size_--; - - continue; - } + return_status = 0; + } - // Continue the loop if <aio_> operation is still in progress. - if (error_status != EINPROGRESS) - break; + break; + } + } - } // end for + // Something should have completed. + ACE_ASSERT (ai != this->aiocb_list_max_size_); - if (ai >= this->aiocb_list_max_size_) // all processed - return asynch_result; - else if (error_status == ECANCELED) - return_status = 0; - else - return_status = aio_return (aiocb_list_[ai]); + // Retrive the result pointer. + asynch_result = this->result_list_ [ai]; - if (return_status == -1) - { - // was ACE_ERROR_RETURN - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::find_completed_aio:" - "<aio_return> failed\n")); - return_status = 0; // zero bytes transferred - } + // ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *, + // this->aiocb_list_[ai]); + // ACE_dynamic_cast (ACE_POSIX_Asynch_Result *, + // this->aiocb_list_[ai]); - asynch_result = result_list_[ai]; + // Invalidate entry in the aiocb list. + this->aiocb_list_[ai] = 0; + this->result_list_ [ai] = 0; + this->aiocb_list_cur_size_--; + } // !! End of protected area. - aiocb_list_[ai] = 0; - result_list_[ai] = 0; - aiocb_list_cur_size_--; + // Call the application code. + this->application_specific_code (asynch_result, + return_status, // Bytes transferred. + 1, // Success + 0, // No completion key. + error_status); // Error - return asynch_result; + // Success + return 1; } void @@ -805,79 +751,79 @@ ACE_POSIX_AIOCB_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *as } int -ACE_POSIX_AIOCB_Proactor::register_and_start_aio - (ACE_POSIX_Asynch_Result *result, int op) +ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor (ACE_POSIX_Asynch_Result *result, int operation) { - ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::register_and_start_aio"); - - ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1)); + ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::register_aio_with_proactor"); - int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0; + // Protect the atomic action , which is: find free slot , start IO , + // save ptr in the lists - if (result == 0) // Just check the status of the list - return ret_val; + ACE_Guard<ACE_Thread_Mutex> locker (this->mtx_AIOCB_); - // Non-zero ptr. Find a free slot and store. - if (ret_val == 0) + if (result == 0) { - for (size_t i= 0; i < this->aiocb_list_max_size_; i++) - if (aiocb_list_[i] == 0) - { - ret_val = start_aio (result, op); - - if (ret_val == 0) // Store the pointers. - { - aiocb_list_[i] = result; - result_list_[i] = result; - - aiocb_list_cur_size_++; - } - return ret_val; - } - - errno = EAGAIN; - ret_val = -1; + // Just check the status of the list. + if (this->aiocb_list_cur_size_ >= + this->aiocb_list_max_size_) + return -1; + else + return 0; } - - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::\n" - "register_and_start_aio: No space to store the <aio>info\n")); - return ret_val; -} -int -ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result, int op) -{ - ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio"); + // Non-zero ptr. Find a free slot and store. + + // Make sure again. + if (this->aiocb_list_cur_size_ >= + this->aiocb_list_max_size_) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:Asynch_Operation: No space to store the <aio> info.\n"), + -1); - int ret_val; - const ACE_TCHAR *ptype; + // Slot(s) available. Find a free one. + size_t ai; + for (ai = 0; + ai < this->aiocb_list_max_size_; + ai++) + if (this->aiocb_list_[ai] == 0) + break; - // Start IO + // Sanity check. + if (ai == this->aiocb_list_max_size_) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:Asynch_Operation: No space to store the <aio> info.\n"), + -1); - switch (op) + // Start the IO. + if (operation == 0) { - case 0: - ptype = "read "; - ret_val = aio_read (result); - break; - case 1: - ptype = "write"; - ret_val = aio_write (result); - break; - default: - ptype = "?????"; - ret_val = -1; - break; + // Read + if (aio_read (result) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "Asynch_Read_XXXX: aio_read queueing failed\n"), + -1); + } + } + else + { + // write + if (aio_write (result) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%p\n", + "Asynch_Read_XXXX: aio_read queueing failed\n"), + -1); + } } - - if (ret_val == -1) - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::start_aio: aio_%s %p\n", - ptype, - "queueing failed\n")); - return ret_val; + // Store the pointers. + this->aiocb_list_[ai] = result; + this->result_list_ [ai] = result; + + this->aiocb_list_cur_size_ ++; + + return 0; } // ********************************************************************* @@ -1085,7 +1031,7 @@ ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler, si); if (is_member == -1) ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::%s\n", + "%N:%l:(%P | %t)::\n", "ACE_POSIX_SIG_Proactor::create_asynch_timer:" "sigismember failed"), 0); @@ -1235,10 +1181,10 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) if (sig_info.si_code == SI_ASYNCIO) { // Analyze error and return values. - + int error_status = 0; int return_status = 0; - + // Check the error status error_status = aio_error (asynch_result); @@ -1267,25 +1213,25 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) { return_status = 0; } - else + else { // Get the return_status of the <aio_> operation. return_status = aio_return (asynch_result); // Failure. if (return_status == -1) - { - ACE_ERROR ((LM_ERROR, + { + ACE_DEBUG ((LM_ERROR, "%N:%l:(%P | %t)::%p\n", "ACE_POSIX_SIG_Proactor::handle_events:" - "<aio_return> failed")); - return_status = 0; // zero bytes transferred - } + "<aio_return> failed to transfer any data\n")); + return_status = 0; + } } - // Error status and return status are obtained. Dispatch the - // completion. + // error status and return status are obtained. Dispatch the + // completion . this->application_specific_code (asynch_result, return_status, 1, // Result : True. |