diff options
author | Steve Huston <shuston@riverace.com> | 2002-04-25 19:50:17 +0000 |
---|---|---|
committer | Steve Huston <shuston@riverace.com> | 2002-04-25 19:50:17 +0000 |
commit | 8286d1808e4e4369bbe90c214b28a13ab8e9c009 (patch) | |
tree | 23892818bb314bea6233b2cf61449e8777f800fa /ace/POSIX_Proactor.cpp | |
parent | 90c994f617ee3cb95a7b2c379602dd822a0fb9be (diff) | |
download | ATCD-8286d1808e4e4369bbe90c214b28a13ab8e9c009.tar.gz |
ChangeLogTag:Thu Apr 25 15:46:39 2002 Steve Huston <shuston@riverace.com>
Diffstat (limited to 'ace/POSIX_Proactor.cpp')
-rw-r--r-- | ace/POSIX_Proactor.cpp | 356 |
1 files changed, 244 insertions, 112 deletions
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 381d7521303..c3fba575a62 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -562,23 +562,51 @@ ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Pr ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void) { + // 1. try to cancel pending aio + this->read_stream_.cancel (); + + // 2. close both handles + // Destuctor of ACE_Pipe does not close handles. + // We can not use ACE_Pipe::close() as it + // closes read_handle and than write_handle. + // In some systems close() may wait for + // completion for all asynch. pending requests. + // So we should close write_handle firstly + // to force read completion ( if 1. does not help ) + // and then read_handle and not vice versa + + ACE_HANDLE h = this->pipe_.write_handle (); + if (h != ACE_INVALID_HANDLE) + ACE_OS::closesocket (h); + + h = this->pipe_.read_handle (); + if ( h != ACE_INVALID_HANDLE) + ACE_OS::closesocket (h); + } + int ACE_AIOCB_Notify_Pipe_Manager::notify () { // Send the result pointer through the pipe. char char_send = 0; - int ret_val = ACE::send (this->pipe_.write_handle (), - & char_send , - sizeof (char_send)); + int ret_val = ACE::send (this->pipe_.write_handle (), + &char_send, + sizeof (char_send)); + + if (ret_val < 0) + { + if (errno != EWOULDBLOCK) +#if 0 + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("(%P %t):%p\n"), + ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify") + ACE_LIB_TEXT ("Error:Writing on to notify pipe failed"))); +#endif /* 0 */ + return -1; + } - if (ret_val < 0 && errno != EWOULDBLOCK) - ACE_ERROR_RETURN ((LM_ERROR, - "(%P %t):%p\n", - "ACE_AIOCB_Notify_Pipe_Manager::notify" - "Error:Writing on to notify pipe failed"), - -1); return 0; } @@ -595,15 +623,14 @@ ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream // One accept has completed. Issue a read to handle any // <post_completion>s in the future. - if (this->read_stream_.read (this->message_block_, - 1, // enough to read 1 byte - 0, // ACT - 0) // Priority - == -1) + if (-1 == this->read_stream_.read (this->message_block_, + 1, // enough to read 1 byte + 0, // ACT + 0)) // Priority ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t):%p\n", - "ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:" - "Read from pipe failed")); + ACE_LIB_TEXT ("%N:%l:(%P | %t):%p\n"), + ACE_LIB_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:") + ACE_LIB_TEXT ("Read from pipe failed"))); // 2. Do the upcalls @@ -621,22 +648,12 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) num_deferred_aiocb_ (0), num_started_aio_ (0) { - //check for correct value for max_aio_operations + // Check for correct value for max_aio_operations check_max_aio_num (); - ACE_NEW (aiocb_list_, - aiocb *[aiocb_list_max_size_]); - ACE_NEW (result_list_, - ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]); + this->create_result_aiocb_list (); - // Initialize the array. - for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) - { - aiocb_list_[ai] = 0; - result_list_[ai] = 0; - } - - create_notify_manager (); + this->create_notify_manager (); // start pseudo-asynchronous accept task // one per all future acceptors @@ -646,7 +663,7 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) // Special protected constructor for ACE_SUN_Proactor ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, - ACE_POSIX_Proactor::Proactor_Type ptype) + ACE_POSIX_Proactor::Proactor_Type) : aiocb_notify_pipe_manager_ (0), aiocb_list_ (0), result_list_ (0), @@ -656,15 +673,49 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, num_deferred_aiocb_ (0), num_started_aio_ (0) { - ACE_UNUSED_ARG (ptype); - //check for correct value for max_aio_operations - check_max_aio_num (); + this->check_max_aio_num (); + + this->create_result_aiocb_list (); + + // @@ We should create Notify_Pipe_Manager in the derived class to + // provide correct calls for virtual functions !!! +} + +// Destructor. +ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) +{ + this->close(); +} + +int +ACE_POSIX_AIOCB_Proactor::close (void) +{ + // stop asynch accept task + this->get_asynch_pseudo_task().stop (); + + this->delete_notify_manager (); + + this->clear_result_queue (); + + return this->delete_result_aiocb_list (); +} + +void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h) +{ + notify_pipe_read_handle_ = h; +} - ACE_NEW (aiocb_list_, - aiocb *[aiocb_list_max_size_]); - ACE_NEW (result_list_, - ACE_POSIX_Asynch_Result *[aiocb_list_max_size_]); +int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void) +{ + if (aiocb_list_ != 0) + return 0; + + ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1); + + ACE_NEW_RETURN (result_list_, + ACE_POSIX_Asynch_Result *[aiocb_list_max_size_], + -1); // Initialize the array. for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++) @@ -673,54 +724,99 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, result_list_[ai] = 0; } - - // @@ We should create Notify_Pipe_Manager in the derived class to - // provide correct calls for virtual functions !!! + return 0; } -// Destructor. -ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) +int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void) { - // stop asynch accept task - this->get_asynch_pseudo_task().stop (); + if (aiocb_list_ == 0) // already deleted + return 0; + + // try to cancel all uncomlpeted operarion + // POSIX systems may have hidden system threads + // that still can work with our aiocb's! + for (size_t ai = 0; ai < aiocb_list_max_size_; ai++) + { + if (this->aiocb_list_[ai] != 0) // active operation + this->cancel_aiocb (result_list_[ai]); + } - delete_notify_manager (); + int num_pending = 0; - // delete all uncomlpeted operarion - // as nobody will notify client since now for (size_t ai = 0; ai < aiocb_list_max_size_; ai++) { - delete result_list_[ai]; - result_list_[ai] = 0; - aiocb_list_[ai] = 0; + if (this->aiocb_list_[ai] == 0 ) // not active operation + continue; + + // Get the error and return status of the aio_ operation. + int error_status = 0; + int return_status = 0; + int flg_completed = this->get_result_status (result_list_[ai], + error_status, + return_status); + + //don't delete uncompleted AIOCB's + if (flg_completed == 0) // not completed !!! + { + num_pending++; +#if 0 + char * errtxt = ACE_OS::strerror (error_status); + if (errtxt == 0) + errtxt ="?????????"; + + char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )? + "WRITE":"READ" ; + + + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("slot=%d op=%s status=%d return=%d %s\n"), + ai, + op, + error_status, + return_status, + errtxt )); +#endif /* 0 */ + } + else // completed , OK + { + delete this->result_list_[ai]; + this->result_list_[ai] = 0; + this->aiocb_list_[ai] = 0; + } } - delete [] aiocb_list_; - aiocb_list_ = 0; + //if it is not possible cancel some operation (num_pending > 0 ), + //we can we do only one thing -report about this + //and complain about POSIX implementation + //we know that we have memory leaks, + //but it is better than segmentation fault! + ACE_DEBUG ((LM_DEBUG, + ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n") + ACE_LIB_TEXT(" number pending AIO=%d\n"), + num_pending + )); - delete [] result_list_; - result_list_ = 0; + delete [] this->aiocb_list_; + this->aiocb_list_ = 0; - clear_result_queue (); -} + delete [] this->result_list_; + this->result_list_ = 0; -void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h) -{ - notify_pipe_read_handle_ = h; + return (num_pending == 0 ? 0 : -1); + // ?? or just always return 0; } void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () { - long max_os_aio_num = ACE_OS ::sysconf (_SC_AIO_MAX); + long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX); // Define max limit AIO's for concrete OS // -1 means that there is no limit, but it is not true // (example, SunOS 5.6) - if (max_os_aio_num > 0 - && aiocb_list_max_size_ > (unsigned long) max_os_aio_num - ) + if (max_os_aio_num > 0 && + aiocb_list_max_size_ > (unsigned long) max_os_aio_num) aiocb_list_max_size_ = max_os_aio_num; #if defined (HPUX) @@ -729,7 +825,7 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () // it has a limit 256 max elements for aio_suspend () // It is a pity, but ... - long max_os_listio_num = ACE_OS ::sysconf (_SC_AIO_LISTIO_MAX); + long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX); if (max_os_listio_num > 0 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num) aiocb_list_max_size_ = max_os_listio_num; @@ -762,6 +858,39 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n", aiocb_list_max_size_)); +#if defined(__sgi) + + ACE_DEBUG((LM_DEBUG, + ACE_LIB_TEXT( "SGI IRIX specific: aio_init!\n"))); + +//typedef struct aioinit { +// int aio_threads; /* The number of aio threads to start (5) */ +// int aio_locks; /* Initial number of preallocated locks (3) */ +// int aio_num; /* estimated total simultanious aiobc structs (1000) */ +// int aio_usedba; /* Try to use DBA for raw I/O in lio_listio (0) */ +// int aio_debug; /* turn on debugging (0) */ +// int aio_numusers; /* max number of user sprocs making aio_* calls (5) */ +// int aio_reserved[3]; +//} aioinit_t; + + aioinit_t aioinit; + + aioinit.aio_threads = 10; /* The number of aio threads to start (5) */ + aioinit.aio_locks = 20; /* Initial number of preallocated locks (3) */ + /* estimated total simultaneous aiobc structs (1000) */ + aioinit.aio_num = aiocb_list_max_size_; + aioinit.aio_usedba = 0; /* Try to use DBA for raw IO in lio_listio (0) */ + aioinit.aio_debug = 0; /* turn on debugging (0) */ + aioinit.aio_numusers = 100; /* max number of user sprocs making aio_* calls (5) */ + aioinit.aio_reserved[0] = 0; + aioinit.aio_reserved[1] = 0; + aioinit.aio_reserved[2] = 0; + + aio_sgi_init (&aioinit); + +#endif + + return; } void @@ -834,7 +963,10 @@ ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result) "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"), -1); - this->notify_completion (sig_num); + // let try not to overflow signal queue or notification pipe + + if (this->result_queue_.size () == 1) + this->notify_completion (sig_num); return 0; } @@ -1054,6 +1186,47 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds) return retval > 0 ? 1 : 0; } +int +ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result, + int &error_status, + int &return_status) +{ + return_status = 0; + + // Get the error status of the aio_ operation. + error_status = aio_error (asynch_result); + +#if 0 + if (error_status == -1) // <aio_error> itself has failed. + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_AIOCB_Proactor::get_result_status:" + "<aio_error> has failed\n")); +#endif /* 0 */ + + if (error_status == EINPROGRESS) + { + return_status = 0; + return 0; // not completed + } + + return_status = aio_return (asynch_result); + + if (return_status < 0) + { + return_status = 0; // zero bytes transferred +#if 0 + if (error_status == 0) // nonsense + ACE_ERROR ((LM_ERROR, + "%N:%l:(%P | %t)::%p\n", + "ACE_POSIX_AIOCB_Proactor::get_result_status:" + "<aio_return> failed\n")); +#endif /* 0 */ + } + + return 1; // completed +} + ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, int &return_status, @@ -1082,30 +1255,9 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, if (aiocb_list_[index] == 0) // Dont process null blocks. continue; - // Get the error status of the aio_ operation. - error_status = aio_error (aiocb_list_[index]); - - if (error_status == -1) // <aio_error> itself has failed. - { - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::find_completed_aio:" - "<aio_error> has failed\n")); - - break; - - // we should notify user, otherwise : - // memory leak for result and "hanging" user - // what was before skip this operation - - //aiocb_list_[index] = 0; - //result_list_[index] = 0; - //aiocb_list_cur_size_--; - //continue; - } - - // Continue the loop if <aio_> operation is still in progress. - if (error_status != EINPROGRESS) + if (0 != this->get_result_status (result_list_[index], + error_status, + return_status)) // completed break; } // end for @@ -1113,25 +1265,6 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, if (count == 0) // all processed , nothing found return asynch_result; - if (error_status == ECANCELED) - return_status = 0; - else if (error_status == -1) - return_status = 0; - else - return_status = aio_return (aiocb_list_[index]); - - if (return_status == -1) - { - return_status = 0; // zero bytes transferred - - if (error_status == 0) // nonsense - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::find_completed_aio:" - "<aio_return> failed\n")); - } - - asynch_result = result_list_[index]; aiocb_list_[index] = 0; @@ -1196,7 +1329,7 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul if (ret_val != 0) // No free slot { - errno = EAGAIN; + errno = EAGAIN; ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:(%P | %t)::\n" "register_and_start_aio: " @@ -1570,8 +1703,7 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set, ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void) { - // stop asynch accept task - this->get_asynch_pseudo_task().stop (); + this->close (); // @@ Enable the masked signals again. } |