diff options
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r-- | ace/POSIX_Proactor.cpp | 969 |
1 files changed, 398 insertions, 571 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 7fe02c98da6..6e2e76ebdc5 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -14,10 +14,6 @@ #include "ace/POSIX_Proactor.i" #endif /* __ACE_INLINE__ */ -# if defined (ACE_HAS_SYSINFO) -# include <sys/systeminfo.h> -# endif /* ACE_HAS_SYS_INFO */ - class ACE_Export ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result { // = TITLE @@ -44,35 +40,6 @@ public: }; // ********************************************************************* -ACE_POSIX_Proactor::ACE_POSIX_Proactor (void) - : os_id_ (OS_UNDEFINED) -{ -#if defined(sun) - - os_id_ = OS_SUN; // set family - - char Buf [32]; - - ::memset(Buf,0,sizeof(Buf)); - - ACE_OS::sysinfo (SI_RELEASE , Buf, sizeof(Buf)-1); - - if (ACE_OS_String::strcasecmp (Buf , "5.6") == 0) - os_id_ = OS_SUN_56; - else if (ACE_OS_String::strcasecmp (Buf , "5.7") == 0) - os_id_ = OS_SUN_57; - else if (ACE_OS_String::strcasecmp (Buf , "5.8") == 0) - os_id_ = OS_SUN_58; - -#elif defined(HPUX) - - os_id_ = OS_HPUX; // set family - // do the same - -//#else defined (LINUX, __FreeBSD__ ...) -//setup here os_id_ -#endif -} ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void) { @@ -366,6 +333,10 @@ ACE_POSIX_Proactor::create_asynch_timer (ACE_Handler &handler, return implementation; } +ACE_POSIX_Proactor::ACE_POSIX_Proactor (void) +{ +} + #if 0 int ACE_POSIX_Proactor::handle_signal (int, siginfo_t *, ucontext_t *) @@ -473,7 +444,7 @@ public: virtual ~ACE_AIOCB_Notify_Pipe_Manager (void); // Destructor. - int notify (); + int notify (ACE_POSIX_Asynch_Result *result); // Send the result pointer through the notification pipe. virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); @@ -507,9 +478,6 @@ ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Pr // Open the pipe. this->pipe_.open (); - // Set write side in NONBLOCK mode - ACE::set_flags (this->pipe_.write_handle (), ACE_NONBLOCK); - // Let AIOCB_Proactor know about our handle posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ()); @@ -526,7 +494,7 @@ ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Pr // Issue an asynch_read on the read_stream of the notify pipe. if (this->read_stream_.read (this->message_block_, - 1, // enough to read 1 byte + sizeof (ACE_POSIX_Asynch_Result *), 0, // ACT 0) // Priority == -1) @@ -541,15 +509,14 @@ ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void) } int -ACE_AIOCB_Notify_Pipe_Manager::notify () +ACE_AIOCB_Notify_Pipe_Manager::notify (ACE_POSIX_Asynch_Result *result) { // Send the result pointer through the pipe. - char char_send = 0; - int ret_val = ACE::send (this->pipe_.write_handle (), - & char_send , - sizeof (char_send)); - - if (ret_val < 0 && errno != EWOULDBLOCK) + int return_val = ACE::send (this->pipe_.write_handle (), + ACE_reinterpret_cast (char *, + &result), + sizeof (result)); + if (return_val != sizeof (result)) ACE_ERROR_RETURN ((LM_ERROR, "(%P %t):%p\n", "ACE_AIOCB_Notify_Pipe_Manager::notify" @@ -559,10 +526,20 @@ ACE_AIOCB_Notify_Pipe_Manager::notify () } void -ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream - (const ACE_Asynch_Read_Stream::Result & /*result*/) +ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { - // 1. Start new read to avoid pipe overflow + // The message block actually contains the ACE_POSIX_Asynch_Result + // pointer. + ACE_POSIX_Asynch_Result *asynch_result = 0; + asynch_result = *(ACE_POSIX_Asynch_Result **) result.message_block ().rd_ptr (); + + // Do the upcall. + this->posix_aiocb_proactor_->application_specific_code + (asynch_result, + asynch_result->bytes_transferred(), // 0, No bytes transferred. + 1, // Result : True. + 0, // No completion key. + asynch_result->error()); //0, No error. // Set the message block properly. Put the <wr_ptr> back in the // initial position. @@ -572,7 +549,7 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream // One accept has completed. Issue a read to handle any // <post_completion>s in the future. if (this->read_stream_.read (this->message_block_, - 1, // enough to read 1 byte + sizeof (ACE_POSIX_Asynch_Result *), 0, // ACT 0) // Priority == -1) @@ -580,12 +557,8 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream "%N:%l:(%P | %t):%p\n", "ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:" "Read from pipe failed")); - - - // 2. Do the upcalls - // this->posix_aiocb_proactor_->process_result_queue (); } - + // Public constructor for common use. ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) : aiocb_notify_pipe_manager_ (0), @@ -595,10 +568,10 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) aiocb_list_cur_size_ (0), notify_pipe_read_handle_ (ACE_INVALID_HANDLE), num_deferred_aiocb_ (0), - num_started_aio_ (0) + num_started_aio_(0) { //check for correct value for max_aio_operations - check_max_aio_num (); + check_max_aio_num () ; ACE_NEW (aiocb_list_, aiocb *[aiocb_list_max_size_]); @@ -616,8 +589,7 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) } // Special protected constructor for ACE_SUN_Proactor -ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, - ACE_POSIX_Proactor::Proactor_Type ptype) +ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,int Flg) : aiocb_notify_pipe_manager_ (0), aiocb_list_ (0), result_list_ (0), @@ -625,12 +597,12 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, aiocb_list_cur_size_ (0), notify_pipe_read_handle_ (ACE_INVALID_HANDLE), num_deferred_aiocb_ (0), - num_started_aio_ (0) + num_started_aio_(0) { - ACE_UNUSED_ARG (ptype); + ACE_UNUSED_ARG (Flg); //check for correct value for max_aio_operations - check_max_aio_num (); + check_max_aio_num () ; ACE_NEW (aiocb_list_, aiocb *[aiocb_list_max_size_]); @@ -657,73 +629,60 @@ ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) // as nobody will notify client since now for (size_t ai = 0; ai < aiocb_list_max_size_; ai++) { - delete result_list_[ai]; + delete result_list_[ai] ; result_list_[ai] = 0; aiocb_list_[ai] = 0; } + delete [] aiocb_list_; aiocb_list_ = 0; delete [] result_list_; result_list_ = 0; - - clear_result_queue (); } void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h) { - notify_pipe_read_handle_ = h; + notify_pipe_read_handle_ = h ; } void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () { - long max_os_aio_num = ACE_OS ::sysconf (_SC_AIO_MAX); + long max_os_aio_num = ACE_OS ::sysconf ( _SC_AIO_MAX ); // Define max limit AIO's for concrete OS // -1 means that there is no limit, but it is not true - // (example, SunOS 5.6) + // ( example, SunOS 5.6) - if (max_os_aio_num > 0 - && aiocb_list_max_size_ > (unsigned long) max_os_aio_num - ) - aiocb_list_max_size_ = max_os_aio_num; + if ( max_os_aio_num > 0 + && aiocb_list_max_size_ > ( unsigned long ) max_os_aio_num + ) + aiocb_list_max_size_ = max_os_aio_num ; + +#if defined(HPUX) -#if defined (HPUX) // Although HPUX 11.00 allows to start 2048 AIO's // for all process in system // it has a limit 256 max elements for aio_suspend () // It is a pity, but ... - long max_os_listio_num = ACE_OS ::sysconf (_SC_AIO_LISTIO_MAX); - if (max_os_listio_num > 0 - && aiocb_list_max_size_ > (unsigned long) max_os_listio_num) - aiocb_list_max_size_ = max_os_listio_num; -#endif /* HPUX */ + long max_os_listio_num = ACE_OS ::sysconf ( _SC_AIO_LISTIO_MAX ); + if ( max_os_listio_num > 0 + && aiocb_list_max_size_ > ( unsigned long ) max_os_listio_num + ) + aiocb_list_max_size_ = max_os_listio_num ; + +#endif - // check for user-defined value + // The last check for user-defined value // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h - if (aiocb_list_max_size_ <= 0 - || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE) + if ( aiocb_list_max_size_ <= 0 + || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE + ) aiocb_list_max_size_ = ACE_AIO_MAX_SIZE; - // check for max number files to open - - int max_num_files = ACE::max_handles (); - - if (max_num_files > 0 - && aiocb_list_max_size_ > (unsigned long) max_num_files) - { - ACE::set_handle_limit (aiocb_list_max_size_); - - max_num_files = ACE::max_handles (); - } - - if (max_num_files > 0 - && aiocb_list_max_size_ > (unsigned long) max_num_files) - aiocb_list_max_size_ = (unsigned long) max_num_files; - ACE_DEBUG ((LM_DEBUG, "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n", aiocb_list_max_size_)); @@ -766,97 +725,10 @@ ACE_POSIX_AIOCB_Proactor::handle_events (void) } int -ACE_POSIX_AIOCB_Proactor::notify_completion(int sig_num) -{ - ACE_UNUSED_ARG (sig_num); - - return this->aiocb_notify_pipe_manager_->notify (); -} - -int ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result) { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1)); - - int ret_val = this->putq_result (result); - - return ret_val; -} - -int -ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result) -{ - // this protected method should be called with locked mutex_ - // we can't use GUARD as Proactor uses non-recursive mutex - - if (!result) - return -1; - - int sig_num = result->signal_number (); - int ret_val = this->result_queue_.enqueue_tail (result); - - if (ret_val == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"), - -1); - - this->notify_completion (sig_num); - - return 0; -} - -ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result (void) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0)); - - - ACE_POSIX_Asynch_Result* result = 0; - - if (this->result_queue_.dequeue_head (result) != 0) - return 0; - -// don;t waste time if queue is empty - it is normal -// or check queue size before dequeue_head -// ACE_ERROR_RETURN ((LM_ERROR, -// "%N:%l:(%P | %t):%p\n", -// "ACE_POSIX_AIOCB_Proactor::getq_result failed"), -// 0); - - return result; -} - -int ACE_POSIX_AIOCB_Proactor::clear_result_queue (void) -{ - int ret_val = 0; - ACE_POSIX_Asynch_Result* result = 0; - - while ((result = this->getq_result ()) != 0) - { - delete result; - ret_val++; - } - - return ret_val; -} - -int ACE_POSIX_AIOCB_Proactor::process_result_queue (void) -{ - int ret_val = 0; - ACE_POSIX_Asynch_Result* result = 0; - - while ((result = this->getq_result ()) != 0) - { - this->application_specific_code - (result, - result->bytes_transferred(), // 0, No bytes transferred. - 1, // Result : True. - 0, // No completion key. - result->error()); //0, No error. - - ret_val++; - } - - return ret_val; + // Notify to the completion queue. + return this->aiocb_notify_pipe_manager_->notify (result); } ACE_Asynch_Read_Stream_Impl * @@ -946,7 +818,6 @@ int ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds) { int result_suspend = 0; - int retval= 0; if (milli_seconds == ACE_INFINITE) // Indefinite blocking. @@ -967,58 +838,49 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds) // Check for errors if (result_suspend == -1) { - if (errno != EAGAIN && // Timeout - errno != EINTR ) // Interrupted call - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::handle_events:" - "aio_suspend failed\n")); + if (errno == EAGAIN) // Timeout + return 0; - // let continue work - // we should check "post_completed" queue + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_AIOCB_Proactor::handle_events:" + "aio_suspend failed\n"), + 0); // let continue work } - else - { - size_t index = 0; - size_t count = aiocb_list_max_size_; // max number to iterate - int error_status = 0; - int return_status = 0; - for (;; retval++) - { - ACE_POSIX_Asynch_Result *asynch_result = - find_completed_aio (error_status, - return_status, - index, - count); - - if (asynch_result == 0) - break; - - // Call the application code. - this->application_specific_code (asynch_result, - return_status, // Bytes transferred. - 1, // Success - 0, // No completion key. - error_status); // Error - } - } + size_t index = 0; + int error_status = 0; + int return_status = 0; + + int retval= 0; + + for ( ; ; ) + { + ACE_POSIX_Asynch_Result *asynch_result = + find_completed_aio (error_status, return_status,index); + + if (asynch_result == 0) + break; + + //at least one processed + retval = 1 ; // more informative retval++ - // process post_completed results - retval += this->process_result_queue (); + // Call the application code. + this->application_specific_code (asynch_result, + return_status, // Bytes transferred. + 1, // Success + 0, // No completion key. + error_status); // Error + } - return retval > 0 ? 1 : 0; + return retval; } ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, int &return_status, - size_t &index, - size_t &count) + size_t &index ) { - // parameter index defines initial slot to scan - // parameter count tells us how many slots should we scan - ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0)); ACE_POSIX_Asynch_Result *asynch_result = 0; @@ -1026,15 +888,11 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, error_status = 0; return_status= 0; - if (num_started_aio_ == 0) // save time + if ( num_started_aio_ == 0 ) // save time return asynch_result; - - for (; count > 0; index++ , count--) + for (; index < aiocb_list_max_size_; index++) { - if (index >= aiocb_list_max_size_) // like a wheel - index = 0; - if (aiocb_list_[index] == 0) // Dont process null blocks. continue; @@ -1066,10 +924,9 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, } // end for - if (count == 0) // all processed , nothing found + if (index >= this->aiocb_list_max_size_) // all processed return asynch_result; - - if (error_status == ECANCELED) + else if (error_status == ECANCELED) return_status = 0; else if (error_status == -1) return_status = 0; @@ -1095,8 +952,7 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, aiocb_list_cur_size_--; num_started_aio_--; // decrement count active aios - index++; // for next iteration - count--; // for next iteration + index++ ; // for next iteration this->start_deferred_aio (); //make attempt to start deferred AIO @@ -1120,8 +976,8 @@ ACE_POSIX_AIOCB_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *as } int -ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *result, - int op) +ACE_POSIX_AIOCB_Proactor::register_and_start_aio + (ACE_POSIX_Asynch_Result *result, int op) { ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::register_and_start_aio"); @@ -1133,8 +989,8 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul return ret_val; // Save operation code in the aiocb - switch (op) - { + switch ( op ) + { case 0 : result->aio_lio_opcode = LIO_READ; break; @@ -1145,97 +1001,79 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul default: ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::\n" - "register_and_start_aio: Invalid operation code\n"), - -1); - } + "%N:%l:(%P | %t)::\n" + "register_and_start_aio: Invalid operation code\n"), + -1); + } if (ret_val != 0) // No free slot { errno = EAGAIN; ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::\n" - "register_and_start_aio: " - "No space to store the <aio>info\n"), - -1); + "%N:%l:(%P | %t)::\n" + "register_and_start_aio: " + "No space to store the <aio>info\n"), + -1); } // Find a free slot and store. - - ret_val = allocate_aio_slot (result); - - if (ret_val < 0) - return -1; - - size_t index = ACE_static_cast (size_t, ret_val); - - result_list_[index] = result; //Store result ptr anyway - aiocb_list_cur_size_++; - - ret_val = start_aio (result); - - switch (ret_val) - { - case 0 : // started OK - aiocb_list_[index] = result; - return 0; - - case 1 : //OS AIO queue overflow - num_deferred_aiocb_ ++; - return 0; - - default: //Invalid request, there is no point - break; // to start it later - } - - result_list_[index] = 0; - aiocb_list_cur_size_--; - - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "register_and_start_aio: Invalid request to start <aio>\n")); - return -1; -} - -int -ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) -{ - size_t i = 0; - // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager // so make check for ACE_AIOCB_Notify_Pipe_Manager request - if (notify_pipe_read_handle_ == result->aio_fildes) // Notify_Pipe ? + size_t i = 0; + + if ( notify_pipe_read_handle_ == result->aio_fildes ) // Notify_Pipe ? { // should be free, - if (result_list_[i] != 0) // only 1 request + if ( result_list_[i] != 0 ) // only 1 request { // is allowed errno = EAGAIN; ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" - "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:" + "register_and_start_aio:" "internal Proactor error 0\n"), - -1); + -1 ); } } else //try to find free slot as usual, but starting from 1 { - for (i= 1; i < this->aiocb_list_max_size_; i++) + for ( i= 1; i < this->aiocb_list_max_size_; i++) if (result_list_[i] == 0) - break; + break ; } - if (i >= this->aiocb_list_max_size_) + if ( i >= this->aiocb_list_max_size_ ) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" - "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:" + "register_and_start_aio: " "internal Proactor error 1\n"), -1); + result_list_[i] = result; //Store result ptr anyway + aiocb_list_cur_size_++; + + ret_val = start_aio (result); + + switch ( ret_val ) + { + case 0 : // started OK + aiocb_list_[i] = result; + return 0 ; + + case 1 : //OS AIO queue overflow + num_deferred_aiocb_ ++ ; + return 0 ; + + default: //Invalid request, there is no point + break; // to start it later + } - //setup OS notification methods for this aio - result->aio_sigevent.sigev_notify = SIGEV_NONE; + result_list_[i] = 0; + aiocb_list_cur_size_--; - return ACE_static_cast (int, i); + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "register_and_start_aio: Invalid request to start <aio>\n")); + return -1; } // start_aio has new return codes @@ -1253,7 +1091,7 @@ ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result) // Start IO - switch (result->aio_lio_opcode ) + switch (result->aio_lio_opcode ) { case LIO_READ : ptype = "read "; @@ -1269,12 +1107,12 @@ ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result) break; } - if (ret_val == 0) - num_started_aio_ ++; + if (ret_val == 0 ) + num_started_aio_ ++ ; else // if (ret_val == -1) { - if (errno == EAGAIN) //Ok, it will be deferred AIO - ret_val = 1; + if ( errno == EAGAIN ) //Ok, it will be deferred AIO + ret_val = 1 ; else ACE_ERROR ((LM_ERROR, "%N:%l:(%P | %t)::start_aio: aio_%s %p\n", @@ -1294,22 +1132,22 @@ ACE_POSIX_AIOCB_Proactor::start_deferred_aio () // This protected method is called from // find_completed_aio after any AIO completion // We should call this method always with locked - // ACE_POSIX_AIOCB_Proactor::mutex_ + // ACE_POSIX_AIOCB_Proactor::mutex_ // // It tries to start the first deferred AIO // if such exists - if (num_deferred_aiocb_ == 0) - return 0; // nothing to do + if ( num_deferred_aiocb_ == 0 ) + return 0 ; // nothing to do size_t i = 0; - for (i= 0; i < this->aiocb_list_max_size_; i++) - if (result_list_[i] !=0 // check for - && aiocb_list_[i] ==0) // deferred AIO - break; + for ( i= 0; i < this->aiocb_list_max_size_; i++) + if ( result_list_[i] !=0 // check for + && aiocb_list_[i] ==0 ) // deferred AIO + break ; - if (i >= this->aiocb_list_max_size_) + if ( i >= this->aiocb_list_max_size_ ) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" "start_deferred_aio:" @@ -1320,18 +1158,18 @@ ACE_POSIX_AIOCB_Proactor::start_deferred_aio () int ret_val = start_aio (result); - switch (ret_val) + switch ( ret_val ) { case 0 : //started OK , decrement count of deferred AIOs aiocb_list_[i] = result; - num_deferred_aiocb_ --; - return 0; + num_deferred_aiocb_ -- ; + return 0 ; case 1 : - return 0; //try again later + return 0 ; //try again later default : // Invalid Parameters , should never be - break; + break ; } //AL notify user @@ -1339,17 +1177,17 @@ ACE_POSIX_AIOCB_Proactor::start_deferred_aio () result_list_[i] = 0; aiocb_list_cur_size_--; - num_deferred_aiocb_ --; + num_deferred_aiocb_ -- ; result->set_error (errno); result->set_bytes_transferred (0); - this->putq_result (result); // we are with locked mutex_ here ! + this->post_completion ( result ); return -1; } int -ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle) +ACE_POSIX_AIOCB_Proactor::cancel_aio ( ACE_HANDLE handle ) { // This new method should be called from // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel @@ -1366,66 +1204,61 @@ ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle) ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio"); + ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1)); + int num_total = 0; int num_cancelled = 0; + size_t ai = 0; - { - ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1)); - - size_t ai = 0; - - for (ai = 0; ai < aiocb_list_max_size_; ai++) - { - if (result_list_[ai] == 0) //skip empty slot - continue; - - if (result_list_[ai]->aio_fildes != handle) //skip not our slot - continue; + for (ai = 0; ai < aiocb_list_max_size_; ai++) + { + if ( result_list_[ai] == 0 ) //skip empty slot + continue ; - num_total++; + if ( result_list_[ai]->aio_fildes != handle ) //skip not our slot + continue ; - ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai]; + num_total++ ; - if (aiocb_list_ [ai] == 0) //deferred aio - { - num_cancelled ++; - num_deferred_aiocb_ --; + ACE_POSIX_Asynch_Result *asynch_result = result_list_[ai]; - aiocb_list_[ai] = 0; - result_list_[ai] = 0; - aiocb_list_cur_size_--; + if ( aiocb_list_ [ai] == 0 ) //deferred aio + { + num_cancelled ++ ; + num_deferred_aiocb_ -- ; - asynch_result->set_error (ECANCELED); - asynch_result->set_bytes_transferred (0); - this->putq_result (asynch_result); - // we are with locked mutex_ here ! - } - else //cancel started aio - { - int rc_cancel = this->cancel_aiocb (asynch_result); + aiocb_list_[ai] = 0; + result_list_[ai] = 0; + aiocb_list_cur_size_--; - if (rc_cancel == 0) //notification in the future - num_cancelled ++; //it is OS responsiblity - } - } + asynch_result->set_error (ECANCELED); + asynch_result->set_bytes_transferred (0); + this->post_completion ( asynch_result ); + } + else //cancel started aio + { + int rc_cancel = this->cancel_aiocb (asynch_result ); - } // release mutex_ + if ( rc_cancel == 0 ) //notification in the future + num_cancelled ++ ; //it is OS responsiblity + } + } - if (num_total == 0) + if ( num_total == 0 ) return 1; // ALLDONE - if (num_cancelled == num_total) + if ( num_cancelled == num_total ) return 0; // CANCELLED return 2; // NOT CANCELLED } int -ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result) +ACE_POSIX_AIOCB_Proactor::cancel_aiocb ( ACE_POSIX_Asynch_Result * result ) { // This new method is called from cancel_aio // to cancel concrete running AIO request - int rc = ::aio_cancel (0, result); + int rc = ::aio_cancel (0, result ); // Check the return value and return 0/1/2 appropriately. if (rc == AIO_CANCELED) @@ -1446,9 +1279,7 @@ ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result) // ********************************************************************* -ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations) - : ACE_POSIX_AIOCB_Proactor (max_aio_operations, - ACE_POSIX_Proactor::PROACTOR_SIG) +ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (void) { // = Set up the mask we'll use to block waiting for SIGRTMIN. Use that // to add it to the signal mask for this thread, and also set the process @@ -1468,15 +1299,9 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations) this->mask_signals (&this->RT_completion_signals_); // Set up the signal action for SIGRTMIN. this->setup_signal_handler (ACE_SIGRTMIN); - - // we do not have to create notify manager - return; } -ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set, - size_t max_aio_operations) - : ACE_POSIX_AIOCB_Proactor (max_aio_operations, - ACE_POSIX_Proactor::PROACTOR_SIG) +ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set) { // = Keep <Signal_set> with the Proactor, mask all the signals and // setup signal actions for the signals in the <signal_set>. @@ -1512,7 +1337,6 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set, // Mask all the signals. this->mask_signals (&this->RT_completion_signals_); - // we do not have to create notify manager return; } @@ -1536,7 +1360,7 @@ ACE_POSIX_SIG_Proactor::handle_events (void) } int -ACE_POSIX_SIG_Proactor::notify_completion (int sig_num) +ACE_POSIX_SIG_Proactor::post_completion (ACE_POSIX_Asynch_Result *result) { // Get this process id. pid_t pid = ACE_OS::getpid (); @@ -1549,29 +1373,105 @@ ACE_POSIX_SIG_Proactor::notify_completion (int sig_num) // Set the signal information. sigval value; #if defined (__FreeBSD__) - value.sigval_int = -1; + value.sigval_ptr = ACE_reinterpret_cast (void *, + result); #else - value.sival_int = -1; + value.sival_ptr = ACE_reinterpret_cast (void *, + result); #endif /* __FreeBSD__ */ - // Solaris 8 can "forget" to delivery - // two or more signals queued immediately. - // Just comment the following "if" statement - // and try this->post_completion(2) - - if (os_id_ == OS_SUN_58 && result_queue_.size() > 1) - return 0; - // Queue the signal. - if (sigqueue (pid, sig_num, value) == 0) - return 0; - - if (errno != EAGAIN) + if (sigqueue (pid, result->signal_number (), value) == -1) ACE_ERROR_RETURN ((LM_ERROR, "Error:%N:%l:(%P | %t):%p\n", "<sigqueue> failed"), -1); - return -1; + return 0; +} + +ACE_Asynch_Read_Stream_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_read_stream (void) +{ + ACE_Asynch_Read_Stream_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_SIG_Asynch_Read_Stream (this), + 0); + return implementation; +} + +ACE_Asynch_Write_Stream_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_write_stream (void) +{ + ACE_Asynch_Write_Stream_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_SIG_Asynch_Write_Stream (this), + 0); + return implementation; +} + +ACE_Asynch_Read_Dgram_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_read_dgram (void) +{ + ACE_Asynch_Read_Dgram_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_SIG_Asynch_Read_Dgram(this), + 0); + + return implementation; +} + +ACE_Asynch_Write_Dgram_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_write_dgram (void) +{ + ACE_Asynch_Write_Dgram_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_SIG_Asynch_Write_Dgram(this), + 0); + + + return implementation; +} + +ACE_Asynch_Read_File_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_read_file (void) +{ + ACE_Asynch_Read_File_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_SIG_Asynch_Read_File (this), + 0); + return implementation; +} + +ACE_Asynch_Write_File_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_write_file (void) +{ + ACE_Asynch_Write_File_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_SIG_Asynch_Write_File (this), + 0); + return implementation; +} + +ACE_Asynch_Accept_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_accept (void) +{ + ACE_Asynch_Accept_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Accept (this), + 0); + + // was ACE_POSIX_SIG_Asynch_Accept (this), + return implementation; +} + +ACE_Asynch_Transmit_File_Impl * +ACE_POSIX_SIG_Proactor::create_asynch_transmit_file (void) +{ + ACE_Asynch_Transmit_File_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_SIG_Asynch_Transmit_File (this), + 0); + return implementation; } ACE_Asynch_Result_Impl * @@ -1625,16 +1525,6 @@ ACE_POSIX_SIG_Proactor::create_asynch_timer (ACE_Handler &handler, return implementation; } - -void sig_handler (int sig_num, siginfo_t *, ucontext_t *) -{ - // Should never be called - ACE_DEBUG ((LM_DEBUG, - "%N:%l:(%P | %t)::sig_handler received signal: %d\n", - sig_num)); - return; -} - int ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const { @@ -1642,17 +1532,10 @@ ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const // passed to sigwaitinfo/sigtimedwait. Don't change the default // signal handler - having a handler and waiting for the signal can // produce undefined behavior. - - // But can not use SIG_DFL - // With SIG_DFL after delivering the first signal - // SIG_DFL handler resets SA_SIGINFO flags - // and we will lose all information sig_info - // At least all SunOS have such behavior - struct sigaction reaction; sigemptyset (&reaction.sa_mask); // Nothing else to mask. reaction.sa_flags = SA_SIGINFO; // Realtime flag. - reaction.sa_sigaction = ACE_SIGNAL_C_FUNC(sig_handler); // (SIG_DFL); + reaction.sa_sigaction = ACE_SIGNAL_C_FUNC (SIG_DFL); int sigaction_return = ACE_OS::sigaction (signal_number, &reaction, 0); @@ -1677,34 +1560,6 @@ ACE_POSIX_SIG_Proactor::mask_signals (const sigset_t *signals) const } int -ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) -{ - size_t i = 0; - - //try to find free slot as usual, starting from 0 - for (i = 0; i < this->aiocb_list_max_size_; i++) - if (result_list_[i] == 0) - break; - - if (i >= this->aiocb_list_max_size_) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::\n" - "ACE_POSIX_SIG_Proactor::allocate_aio_slot " - "internal Proactor error 1\n"), - -1); - - int retval = ACE_static_cast (int, i); - - // setup OS notification methods for this aio - // store index!!, not pointer in signal info - result->aio_sigevent.sigev_notify = SIGEV_SIGNAL; - result->aio_sigevent.sigev_signo = result->signal_number (); - result->aio_sigevent.sigev_value.sival_int = retval; - - return retval; -} - -int ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) { int result_sigwait = 0; @@ -1731,139 +1586,126 @@ ACE_POSIX_SIG_Proactor::handle_events (unsigned long milli_seconds) &timeout); } - size_t index = 0; // start index to scan aiocb list - size_t count = aiocb_list_max_size_; // max number to iterate - int error_status = 0; - int return_status = 0; - int flg_aio = 0; // 1 if AIO Completion possible - int flg_que = 0; // 1 if SIGQUEUE possible + // Check for errors + if (result_sigwait == -1) + { + // If failure is coz of timeout, then return *0* but set errno + // appropriately. + if (errno == EAGAIN) + return 0; + else + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_SIG_Proactor::handle_events:" + "sigtimedwait/sigwaitinfo failed"), + -1); + } + + // No errors, RT compleion signal is received. + + // Is the signo returned consistent with the sig info? + if (sig_info.si_signo != result_sigwait) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:%N:%l:(%P | %t):" + "ACE_POSIX_SIG_Proactor::handle_events:" + "Inconsistent signal number (%d) in the signal info block", + sig_info.si_signo), + -1); + + // Retrive the result pointer. + ACE_POSIX_Asynch_Result *asynch_result = 0; - // define index to start - // nothing will happen if it contains garbage #if defined (__FreeBSD__) - index = ACE_static_cast (size_t, sig_info.si_value.sigval_int); + asynch_result = + ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *, + sig_info.si_value.sigval_ptr); #else - index = ACE_static_cast (size_t, sig_info.si_value.sival_int); + asynch_result = + ACE_reinterpret_cast (ACE_POSIX_Asynch_Result *, + sig_info.si_value.sival_ptr); #endif - // Check for errors - // but let continue work in case of errors - // we should check "post_completed" queue - if (result_sigwait == -1) + // Check the <signal code> and act according to that. + if (sig_info.si_code == SI_ASYNCIO) { - if (errno != EAGAIN && // timeout - errno != EINTR ) // interrupted system call - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_SIG_Proactor::handle_events:" - "sigtimedwait/sigwaitinfo failed" - )); - } - else if (sig_info.si_signo != result_sigwait) - { - // No errors, RT compleion signal is received. - // Is the signo returned consistent with the sig info? - ACE_ERROR ((LM_ERROR, - "Error:%N:%l:(%P | %t):" - "ACE_POSIX_SIG_Proactor::handle_events:" - "Inconsistent signal number (%d) in the signal info block", - sig_info.si_signo - )); - } - else if (sig_info.si_code == SI_ASYNCIO) - flg_aio = 1; // AIO signal received - else if (sig_info.si_code == SI_QUEUE) - flg_que = 1; // SIGQUEUE received - else - { - // Unknown signal code. - // may some other third-party libraries could send it - // or message queue could also generate it ! - // So print the message and check our completions - ACE_ERROR ((LM_DEBUG, - "%N:%l:(%P | %t):" - "ACE_POSIX_SIG_Proactor::handle_events:\n" - "Unexpected signal code (%d) returned on completion querying\n", - sig_info.si_code)); - } + // Analyze error and return values. - // extra actions for different systems - if (os_id_ == OS_SUN_58) // Solaris 8 - { - // Solaris 8 never loses any AIO completion It can store more - // than 40000 notifications! So don't waste time to scan all - // aiocb list We know exactly what finished in case SI_ASYNCHIO - - // But we can easy have lost SI_QUEUE - - if (flg_aio) // AIO - correct behavior - count = 1; - flg_que=1; // not to miss "post_completed" results - } - else if (os_id_ == OS_SUN_56) // Solaris 6 - { - // 1. Solaris 6 always loses any RT signal, - // if it has more SIGQUEMAX=32 pending signals - // so we should scan the whole aiocb list - // 2. Moreover,it has one more bad habit - // to notify aio completion - // with SI_QUEUE code instead of SI_ASYNCIO. - - // this is reliable solution - flg_aio =1; // always find_completed_aio - flg_que =1; // always scan queue - count = aiocb_list_max_size_; - } - else // insert here specific for other systems - { - // this is reliable solution - flg_aio =1; // always find_completed_aio - flg_que =1; // always scan queue - count = aiocb_list_max_size_; - } - - // At this point we have - // if (flg_aio) - // scan aiocb list starting with "index" slot - // no more "count" times - // till we have no more AIO completed - // if (flg_que) - // check "post_completed" queue - - int ret_aio = 0; - int ret_que = 0; - - if (flg_aio) - for (;; ret_aio++) - { - ACE_POSIX_Asynch_Result *asynch_result = - find_completed_aio (error_status, - return_status, - index, - count); - - if (asynch_result == 0) - break; + int error_status = 0; + int return_status = 0; - // Call the application code. - this->application_specific_code (asynch_result, - return_status, // Bytes transferred. - 1, // Success - 0, // No completion key. - error_status); // Error - } + // Check the error status + error_status = aio_error (asynch_result); - // process post_completed results - if (flg_que) - ret_que = this->process_result_queue (); + // <aio_error> itself has failed. + if (error_status == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_SIG_Proactor::handle_events:" + "<aio_error> has failed"), + -1); + + // Completion signal has been received, so it can't be in + // progress. + if (error_status == EINPROGRESS) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_SIG_Proactor::handle_events:" + "Internal error: AIO in progress. " + "But completion signal was received"), + -1); + + // Handle cancel'ed asynchronous operation. We dont have to call + // <aio_return> in this case, since return_status is going to be + // -1. We will pass 0 for the <bytes_transferred> in this case + if (error_status == ECANCELED) + { + return_status = 0; + } + else + { + // Get the return_status of the <aio_> operation. + return_status = aio_return (asynch_result); + + // Failure. + if (return_status == -1) + { + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_SIG_Proactor::handle_events:" + "<aio_return> failed")); + return_status = 0; // zero bytes transferred + } + + } - // Uncomment this if you want to test - // and research the behavior of you system - // ACE_DEBUG ((LM_DEBUG, - // "(%t) NumAIO=%d NumQueue=%d\n", - // ret_aio, ret_que)); + // Error status and return status are obtained. Dispatch the + // completion. + this->application_specific_code (asynch_result, + return_status, + 1, // Result : True. + 0, // No completion key. + error_status); // Error. + } + else if (sig_info.si_code == SI_QUEUE) + { + this->application_specific_code + (asynch_result, + asynch_result->bytes_transferred(), // 0, No bytes transferred. + 1, // Result : True. + 0, // No completion key. + asynch_result->error()); //0, No error. + } + else + // Unknown signal code. + ACE_ERROR_RETURN ((LM_DEBUG, + "%N:%l:(%P | %t):", + "ACE_POSIX_SIG_Proactor::handle_events:\n" + "Unexpected signal code (%d) returned on completion querying\n", + sig_info.si_code), + -1); - return ret_aio + ret_que > 0 ? 1 : 0; + // Success + return 1; } // ********************************************************************* @@ -1911,22 +1753,7 @@ ACE_POSIX_Wakeup_Completion::complete (u_long /* bytes_transferred */, const void * /* completion_key */, u_long /* error */) { - this->handler_.handle_wakeup (); } -#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) - -template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *>; -template class ACE_Node<ACE_POSIX_Asynch_Result *>; -template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *>; - -#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) - -#pragma instantiate ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *> -#pragma instantiate ACE_Node<ACE_POSIX_Asynch_Result *> -#pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *> - -#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ - #endif /* ACE_HAS_AIO_CALLS */ |