diff options
-rw-r--r-- | ChangeLog | 30 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 30 | ||||
-rw-r--r-- | ace/Asynch_IO.cpp | 15 | ||||
-rw-r--r-- | ace/POSIX_Asynch_IO.cpp | 13 | ||||
-rw-r--r-- | ace/POSIX_Proactor.cpp | 162 | ||||
-rw-r--r-- | ace/POSIX_Proactor.h | 19 | ||||
-rw-r--r-- | ace/SUN_Proactor.cpp | 47 | ||||
-rw-r--r-- | ace/SUN_Proactor.h | 4 | ||||
-rw-r--r-- | ace/WIN32_Asynch_IO.cpp | 10 | ||||
-rw-r--r-- | tests/Proactor_Test.cpp | 260 |
10 files changed, 355 insertions, 235 deletions
diff --git a/ChangeLog b/ChangeLog index a88b17a63e6..6c437cbe240 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,33 @@ +Mon Oct 28 20:38:27 2002 Steve Huston <shuston@riverace.com> + + * ace/Asynch_IO.cpp (ACE_Service_Handler::addresses()): Don't + print addresses from here. It's inappropriate for a framework + to be printing things out without being asked to. + + * ace/POSIX_Asynch_IO.cpp (ACE_POSIX_Asynch_Read_Stream::read): + * ace/WIN32_Asynch_IO.cpp (ACE_WIN32_Asynch_Read_Stream::read): + Don't print a message for a 0-byte/no space read - set errno to + ENOSPC so the caller can figure out what's going on. + + * ace/SUN_Proactor.{h cpp}: + * ace/POSIX_Proactor.{h cpp} (ACE_POSIX_AIOCB_Proactor):Change + 'return_status' arg to get_result_status(), find_completed_aio() + from int to size_t and rename transfer_count; get_result_status() + takes care of sensing -1 count and changing to 0. Passing back + a size_t smooths the path from here through to the result object. + Removed application_specific_code() - reuse the one from + ACE_POSIX_Proactor - this one called it; remove the middle-man. + + * tests/Proactor_Test.cpp: Added addresses() method implementations + to print address with session IDs; helps to match Sender/Receiver + pairs in the log. Also added some logging of basic send/recv info + to help try to track down why this facility doesn't work well. + Added a check for comparable sends/receives when a session ends. + Added a warning if there are outstanding I/O when the session + ends. This probably should be an error, but I haven't thought + through it enough to go that far. For the SUN Proactor, use one + thread by default (not 3) - aiosuspend() is not MT safe. + Mon Oct 28 12:48:14 2002 Nanbor Wang <nanbor@cs.wustl.edu> * bin/PerlACE/Process_Unix.pm (Spawn): Return 0 when the function diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index a88b17a63e6..6c437cbe240 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,33 @@ +Mon Oct 28 20:38:27 2002 Steve Huston <shuston@riverace.com> + + * ace/Asynch_IO.cpp (ACE_Service_Handler::addresses()): Don't + print addresses from here. It's inappropriate for a framework + to be printing things out without being asked to. + + * ace/POSIX_Asynch_IO.cpp (ACE_POSIX_Asynch_Read_Stream::read): + * ace/WIN32_Asynch_IO.cpp (ACE_WIN32_Asynch_Read_Stream::read): + Don't print a message for a 0-byte/no space read - set errno to + ENOSPC so the caller can figure out what's going on. + + * ace/SUN_Proactor.{h cpp}: + * ace/POSIX_Proactor.{h cpp} (ACE_POSIX_AIOCB_Proactor):Change + 'return_status' arg to get_result_status(), find_completed_aio() + from int to size_t and rename transfer_count; get_result_status() + takes care of sensing -1 count and changing to 0. Passing back + a size_t smooths the path from here through to the result object. + Removed application_specific_code() - reuse the one from + ACE_POSIX_Proactor - this one called it; remove the middle-man. + + * tests/Proactor_Test.cpp: Added addresses() method implementations + to print address with session IDs; helps to match Sender/Receiver + pairs in the log. Also added some logging of basic send/recv info + to help try to track down why this facility doesn't work well. + Added a check for comparable sends/receives when a session ends. + Added a warning if there are outstanding I/O when the session + ends. This probably should be an error, but I haven't thought + through it enough to go that far. For the SUN Proactor, use one + thread by default (not 3) - aiosuspend() is not MT safe. + Mon Oct 28 12:48:14 2002 Nanbor Wang <nanbor@cs.wustl.edu> * bin/PerlACE/Process_Unix.pm (Spawn): Return 0 when the function diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp index 6149f8daf1e..25c7610c60a 100644 --- a/ace/Asynch_IO.cpp +++ b/ace/Asynch_IO.cpp @@ -1197,20 +1197,9 @@ ACE_Service_Handler::~ACE_Service_Handler (void) } void -ACE_Service_Handler::addresses (const ACE_INET_Addr &remote_address, - const ACE_INET_Addr &local_address) +ACE_Service_Handler::addresses (const ACE_INET_Addr & /* remote_address */, + const ACE_INET_Addr & /* local_address */ ) { - // Default behavior is to print out the addresses. - ACE_TCHAR local_address_buf[BUFSIZ], remote_address_buf[BUFSIZ]; - if (local_address.addr_to_string (local_address_buf, sizeof local_address_buf) == -1) - ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("can't obtain local_address's address string"))); - - if (remote_address.addr_to_string (remote_address_buf, sizeof remote_address_buf) == -1) - ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("can't obtain remote_address's address string"))); - - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("On fd %d\n"), this->handle ())); - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("local address %s\n"), local_address_buf)); - ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("remote address %s\n"), remote_address_buf)); } void diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp index 8ecb78bba14..6147e8cbb58 100644 --- a/ace/POSIX_Asynch_IO.cpp +++ b/ace/POSIX_Asynch_IO.cpp @@ -370,15 +370,14 @@ ACE_POSIX_Asynch_Read_Stream::read (ACE_Message_Block &message_block, int signal_number) { size_t space = message_block.space (); - if ( bytes_to_read > space ) + if (bytes_to_read > space) bytes_to_read=space; - if ( bytes_to_read == 0 ) - ACE_ERROR_RETURN - ((LM_ERROR, - ACE_LIB_TEXT ("ACE_POSIX_Asynch_Read_Stream::read:") - ACE_LIB_TEXT ("Attempt to read 0 bytes or no space in the message block\n")), - -1); + if (bytes_to_read == 0) + { + errno = ENOSPC; + return -1; + } // Create the Asynch_Result. ACE_POSIX_Asynch_Read_Stream_Result *result = 0; diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 4fc723e56fa..63e7c1ea193 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -757,10 +757,10 @@ int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void) // Get the error and return status of the aio_ operation. int error_status = 0; - int return_status = 0; + size_t transfer_count = 0; int flg_completed = this->get_result_status (result_list_[ai], error_status, - return_status); + transfer_count); //don't delete uncompleted AIOCB's if (flg_completed == 0) // not completed !!! @@ -776,12 +776,12 @@ int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void) ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT("slot=%d op=%s status=%d return=%d %s\n"), - ai, - op, - error_status, - return_status, - errtxt )); + ACE_LIB_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"), + ai, + op, + error_status, + transfer_count, + errtxt)); #endif /* 0 */ } else // completed , OK @@ -792,16 +792,16 @@ int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void) } } - //if it is not possible cancel some operation (num_pending > 0 ), - //we can we do only one thing -report about this - //and complain about POSIX implementation - //we know that we have memory leaks, - //but it is better than segmentation fault! - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n") - ACE_LIB_TEXT(" number pending AIO=%d\n"), - num_pending - )); + // If it is not possible cancel some operation (num_pending > 0 ), + // we can do only one thing -report about this + // and complain about POSIX implementation. + // We know that we have memory leaks, but it is better than + // segmentation fault! + ACE_DEBUG + ((LM_DEBUG, + ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n") + ACE_LIB_TEXT(" number pending AIO=%d\n"), + num_pending)); delete [] this->aiocb_list_; this->aiocb_list_ = 0; @@ -1160,13 +1160,13 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds) size_t index = 0; size_t count = aiocb_list_max_size_; // max number to iterate int error_status = 0; - int return_status = 0; + size_t transfer_count = 0; for (;; retval++) { ACE_POSIX_Asynch_Result *asynch_result = find_completed_aio (error_status, - return_status, + transfer_count, index, count); @@ -1175,9 +1175,9 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds) // Call the application code. this->application_specific_code (asynch_result, - return_status, // Bytes transferred. - 0, // No completion key. - error_status); // Error + transfer_count, + 0, // No completion key. + error_status); } } @@ -1188,49 +1188,27 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds) } int -ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result, +ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result, int &error_status, - int &return_status) + size_t &transfer_count) { - return_status = 0; + transfer_count = 0; // Get the error status of the aio_ operation. error_status = aio_error (asynch_result); + if (error_status == EINPROGRESS) + return 0; // not completed -#if 0 - if (error_status == -1) // <aio_error> itself has failed. - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::get_result_status:" - "<aio_error> has failed\n")); -#endif /* 0 */ - - if (error_status == EINPROGRESS) - { - return_status = 0; - return 0; // not completed - } - - return_status = aio_return (asynch_result); - - if (return_status < 0) - { - return_status = 0; // zero bytes transferred -#if 0 - if (error_status == 0) // nonsense - ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_POSIX_AIOCB_Proactor::get_result_status:" - "<aio_return> failed\n")); -#endif /* 0 */ - } - - return 1; // completed + ssize_t op_return = aio_return (asynch_result); + if (op_return > 0) + transfer_count = ACE_static_cast (size_t, op_return); + // else transfer_count is already 0, error_status reports the error. + return 1; // completed } ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, - int &return_status, + size_t &transfer_count, size_t &index, size_t &count) { @@ -1241,12 +1219,8 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, ACE_POSIX_Asynch_Result *asynch_result = 0; - error_status = 0; - return_status= 0; - if (num_started_aio_ == 0) // save time - return asynch_result; - + return 0; for (; count > 0; index++ , count--) { @@ -1258,14 +1232,13 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, if (0 != this->get_result_status (result_list_[index], error_status, - return_status)) // completed + transfer_count)) // completed break; } // end for if (count == 0) // all processed , nothing found - return asynch_result; - + return 0; asynch_result = result_list_[index]; aiocb_list_[index] = 0; @@ -1273,7 +1246,7 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, aiocb_list_cur_size_--; num_started_aio_--; // decrement count active aios - index++; // for next iteration + index++; // for next iteration count--; // for next iteration this->start_deferred_aio (); @@ -1283,17 +1256,6 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status, return asynch_result; } -void -ACE_POSIX_AIOCB_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result, - size_t bytes_transferred, - const void *completion_key, - u_long error) -{ - ACE_POSIX_Proactor::application_specific_code (asynch_result, - bytes_transferred, - completion_key, - error); -} int ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *result, @@ -1329,27 +1291,22 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul if (ret_val != 0) // No free slot { errno = EAGAIN; - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t)::\n" - "register_and_start_aio: " - "No space to store the <aio>info\n"), - -1); + return -1; } // Find a free slot and store. - ret_val = allocate_aio_slot (result); + ssize_t slot = allocate_aio_slot (result); - if (ret_val < 0) + if (slot < 0) return -1; - size_t index = ACE_static_cast (size_t, ret_val); + size_t index = ACE_static_cast (size_t, slot); result_list_[index] = result; //Store result ptr anyway aiocb_list_cur_size_++; ret_val = start_aio (result); - switch (ret_val) { case 0 : // started OK @@ -1373,7 +1330,7 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul return -1; } -int +ssize_t ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) { size_t i = 0; @@ -1407,11 +1364,10 @@ ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) "internal Proactor error 1\n"), -1); - //setup OS notification methods for this aio result->aio_sigevent.sigev_notify = SIGEV_NONE; - return ACE_static_cast (int, i); + return ACE_static_cast (ssize_t, i); } // start_aio has new return codes @@ -1432,30 +1388,30 @@ ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result) switch (result->aio_lio_opcode ) { case LIO_READ : - ptype = "read "; + ptype = ACE_LIB_TEXT ("read "); ret_val = aio_read (result); break; case LIO_WRITE : - ptype = "write"; + ptype = ACE_LIB_TEXT ("write"); ret_val = aio_write (result); break; default: - ptype = "?????"; + ptype = ACE_LIB_TEXT ("?????"); ret_val = -1; break; } if (ret_val == 0) - num_started_aio_ ++; + this->num_started_aio_++; else // if (ret_val == -1) { if (errno == EAGAIN) //Ok, it will be deferred AIO ret_val = 1; else ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::start_aio: aio_%s %p\n", - ptype, - "queueing failed\n")); + ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio: aio_%s %p\n"), + ptype, + ACE_LIB_TEXT ("queueing failed\n"))); } return ret_val; @@ -1854,7 +1810,7 @@ ACE_POSIX_SIG_Proactor::mask_signals (const sigset_t *signals) const return 0; } -int +ssize_t ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) { size_t i = 0; @@ -1871,19 +1827,17 @@ ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result) "internal Proactor error 1\n"), -1); - int retval = ACE_static_cast (int, i); - // setup OS notification methods for this aio // store index!!, not pointer in signal info result->aio_sigevent.sigev_notify = SIGEV_SIGNAL; result->aio_sigevent.sigev_signo = result->signal_number (); #if defined (__FreeBSD__) - result->aio_sigevent.sigev_value.sigval_int = retval; + result->aio_sigevent.sigev_value.sigval_int = ACE_static_cast (int, i); #else - result->aio_sigevent.sigev_value.sival_int = retval; + result->aio_sigevent.sigev_value.sival_int = ACE_static_cast (int, i); #endif /* __FreeBSD__ */ - return retval; + return ACE_static_cast (ssize_t, i); } int @@ -1916,7 +1870,7 @@ ACE_POSIX_SIG_Proactor::handle_events (u_long milli_seconds) size_t index = 0; // start index to scan aiocb list size_t count = aiocb_list_max_size_; // max number to iterate int error_status = 0; - int return_status = 0; + size_t transfer_count = 0; int flg_aio = 0; // 1 if AIO Completion possible int flg_que = 0; // 1 if SIGQUEUE possible @@ -2020,7 +1974,7 @@ ACE_POSIX_SIG_Proactor::handle_events (u_long milli_seconds) { ACE_POSIX_Asynch_Result *asynch_result = find_completed_aio (error_status, - return_status, + transfer_count, index, count); @@ -2029,7 +1983,7 @@ ACE_POSIX_SIG_Proactor::handle_events (u_long milli_seconds) // Call the application code. this->application_specific_code (asynch_result, - return_status, // Bytes transferred. + transfer_count, 0, // No completion key. error_status); // Error } diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h index 168b6fcd8c2..ba54a6f9431 100644 --- a/ace/POSIX_Proactor.h +++ b/ace/POSIX_Proactor.h @@ -358,9 +358,9 @@ protected: /// Check AIO for completion, error and result status /// Return: 1 - AIO completed , 0 - not completed yet - virtual int get_result_status ( ACE_POSIX_Asynch_Result* asynch_result, - int & error_status, - int & return_status ); + virtual int get_result_status (ACE_POSIX_Asynch_Result *asynch_result, + int &error_status, + size_t &transfer_count); /// Task to process pseudo-asynchronous operations ACE_Asynch_Pseudo_Task & get_asynch_pseudo_task(); @@ -392,13 +392,6 @@ protected: */ virtual int handle_events (u_long milli_seconds); - /// We will call the base class's application_specific_code from - /// here. - void application_specific_code (ACE_POSIX_Asynch_Result *asynch_result, - size_t bytes_transferred, - const void *completion_key, - u_long error); - virtual int register_and_start_aio (ACE_POSIX_Asynch_Result *result, int op); @@ -413,12 +406,12 @@ protected: /// Extract the results of aio. ACE_POSIX_Asynch_Result *find_completed_aio (int &error_status, - int &return_status, + size_t &transfer_count, size_t &index, size_t &count); /// Find free slot to store result and aiocb pointer - virtual int allocate_aio_slot (ACE_POSIX_Asynch_Result *result); + virtual ssize_t allocate_aio_slot (ACE_POSIX_Asynch_Result *result); /// Notify queue of "post_completed" ACE_POSIX_Asynch_Results @@ -573,7 +566,7 @@ protected: */ /// Find free slot to store result and aiocb pointer - virtual int allocate_aio_slot (ACE_POSIX_Asynch_Result *result); + virtual ssize_t allocate_aio_slot (ACE_POSIX_Asynch_Result *result); /// Notify queue of "post_completed" ACE_POSIX_Asynch_Results diff --git a/ace/SUN_Proactor.cpp b/ace/SUN_Proactor.cpp index ffeee0fcc9c..5a53c4f8be9 100644 --- a/ace/SUN_Proactor.cpp +++ b/ace/SUN_Proactor.cpp @@ -120,22 +120,21 @@ ACE_SUN_Proactor::handle_events (u_long milli_seconds) else { int error_status = 0; - int return_status = 0; + size_t transfer_count = 0; ACE_POSIX_Asynch_Result *asynch_result = find_completed_aio (result, error_status, - return_status); + transfer_count); if (asynch_result != 0) { // Call the application code. this->application_specific_code (asynch_result, - return_status, // Bytes transferred. - 0, // No completion key. - error_status); // Error - retval ++ ; - + transfer_count, + 0, // No completion key. + error_status); // Error + retval++; } } @@ -149,12 +148,12 @@ ACE_SUN_Proactor::handle_events (u_long milli_seconds) int ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result, int &error_status, - int &return_status) + size_t &transfer_count) { // Get the error status of the aio_ operation. error_status = asynch_result->aio_resultp.aio_errno; - return_status = asynch_result->aio_resultp.aio_return; + ssize_t op_return = asynch_result->aio_resultp.aio_return; // ****** from Sun man pages ********************* // Upon completion of the operation both aio_return and aio_errno @@ -163,27 +162,26 @@ ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result, // so the client may detect a change in state // by initializing aio_return to this value. - if (return_status == AIO_INPROGRESS || error_status == EINPROGRESS) - { - return_status = 0; - return 0; // not completed - } + if (error_status == EINPROGRESS || op_return == AIO_INPROGRESS) + return 0; // not completed if (error_status == -1) // should never be ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t)::%p\n", - "ACE_SUN_Proactor::get_result_status:" - "<aio_errno> has failed\n")); + ACE_LIB_TEXT ("%N:%l:(%P | %t)::%p\n"), + ACE_LIB_TEXT ("ACE_SUN_Proactor::get_result_status:") + ACE_LIB_TEXT ("<aio_errno> has failed\n"))); - if (return_status < 0) + if (op_return < 0) { - return_status = 0; // zero bytes transferred + transfer_count = 0; // zero bytes transferred if (error_status == 0) // nonsense ACE_ERROR ((LM_ERROR, "%N:%l:(%P | %t)::%p\n", "ACE_SUN_Proactor::get_result_status:" "<aio_return> failed\n")); } + else + transfer_count = ACE_static_cast (size_t, op_return); return 1; // completed } @@ -191,18 +189,18 @@ ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result, ACE_POSIX_Asynch_Result * ACE_SUN_Proactor::find_completed_aio (aio_result_t *result, int &error_status, - int &return_status) + size_t &transfer_count) { ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, 0)); size_t ai; error_status = -1; - return_status = 0; + transfer_count = 0; // we call find_completed_aio always with result != 0 for (ai = 0; ai < aiocb_list_max_size_; ai++) - if (aiocb_list_[ai] !=0 && //check for non zero + if (aiocb_list_[ai] != 0 && //check for non zero result == &aiocb_list_[ai]->aio_resultp) break; @@ -213,7 +211,7 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result, if (this->get_result_status (asynch_result, error_status, - return_status) == 0) + transfer_count) == 0) { // should never be ACE_ERROR ((LM_ERROR, "%N:%l:(%P | %t)::%p\n", @@ -222,9 +220,6 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result, return 0; } - if (return_status < 0) - return_status = 0; // zero bytes transferred - aiocb_list_[ai] = 0; result_list_[ai] = 0; aiocb_list_cur_size_--; diff --git a/ace/SUN_Proactor.h b/ace/SUN_Proactor.h index c1ddbaf0cbc..9a08cc9e7f4 100644 --- a/ace/SUN_Proactor.h +++ b/ace/SUN_Proactor.h @@ -104,12 +104,12 @@ protected: /// Return: 1 - AIO completed , 0 - not completed yet virtual int get_result_status (ACE_POSIX_Asynch_Result* asynch_result, int &error_status, - int &return_status); + size_t &transfer_count); /// Extract the results of aio. ACE_POSIX_Asynch_Result *find_completed_aio (aio_result_t *result, int &error_status, - int &return_status); + size_t &transfer_count); /// From ACE_POSIX_AIOCB_Proactor. /// Attempt to cancel running request diff --git a/ace/WIN32_Asynch_IO.cpp b/ace/WIN32_Asynch_IO.cpp index 3a14601626c..c60519ef015 100644 --- a/ace/WIN32_Asynch_IO.cpp +++ b/ace/WIN32_Asynch_IO.cpp @@ -374,12 +374,10 @@ ACE_WIN32_Asynch_Read_Stream::read (ACE_Message_Block &message_block, bytes_to_read = space; if (bytes_to_read == 0) - ACE_ERROR_RETURN - ((LM_ERROR, - ACE_LIB_TEXT ("ACE_WIN32_Asynch_Read_Stream::read:") - ACE_LIB_TEXT ("Attempt to read 0 bytes or no space in the message block\n")), - -1); - + { + errno = ENOSPC; + return -1; + } // Create the Asynch_Result. ACE_WIN32_Asynch_Read_Stream_Result *result = 0; diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp index a99e8fffd49..2f7a4e00650 100644 --- a/tests/Proactor_Test.cpp +++ b/tests/Proactor_Test.cpp @@ -313,7 +313,7 @@ MyTask::stop () if (this->proactor_ != 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("End Proactor event loop\n"))); + ACE_TEXT (" (%t) Calling End Proactor event loop\n"))); ACE_Proactor::end_event_loop (); } @@ -360,6 +360,10 @@ public: long get_total_w (void) { return this->total_w_; } long get_total_r (void) { return this->total_r_; } + // This is called to pass the new connection's addresses. + virtual void addresses (const ACE_INET_Addr& peer, + const ACE_INET_Addr& local); + /// This is called after the new connection has been accepted. virtual void open (ACE_HANDLE handle, ACE_Message_Block &message_block); @@ -391,12 +395,12 @@ private: ACE_HANDLE handle_; ACE_SYNCH_MUTEX lock_; - long io_count_; + long io_count_; // Number of currently outstanding I/O requests int flg_cancel_; - size_t total_snd_; - size_t total_rcv_; - long total_w_; - long total_r_; + size_t total_snd_; // Number of bytes successfully sent + size_t total_rcv_; // Number of bytes successfully received + long total_w_; // Number of write operations + long total_r_; // Number of read operations }; class Acceptor : public ACE_Asynch_Acceptor<Receiver> @@ -492,7 +496,8 @@ Acceptor::on_new_receiver (Receiver & rcvr) this->sessions_++; this->list_receivers_[rcvr.index_] = &rcvr; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"), + ACE_TEXT ("(%t) Acceptor: receiver %d up; now have %d.\n"), + rcvr.index_, this->sessions_)); } @@ -513,22 +518,9 @@ Acceptor::on_delete_receiver (Receiver & rcvr) && this->list_receivers_[rcvr.index_] == &rcvr) this->list_receivers_[rcvr.index_] = 0; - ACE_TCHAR bufs [256]; - ACE_TCHAR bufr [256]; - - ACE_OS::sprintf (bufs, ACE_TEXT ("%d(%ld)"), - rcvr.get_total_snd (), - rcvr.get_total_w ()); - - ACE_OS::sprintf (bufr, ACE_TEXT ("%d(%ld)"), - rcvr.get_total_rcv (), - rcvr.get_total_r ()); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), + ACE_TEXT ("(%t) Acceptor: receiver %d gone; %d remain\n"), rcvr.index_, - bufs, - bufr, this->sessions_)); } @@ -571,6 +563,41 @@ Receiver::Receiver (Acceptor * acceptor, int index) Receiver::~Receiver (void) { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d dtor; %d sends (%d bytes); ") + ACE_TEXT ("%d recvs (%d bytes)\n"), + this->index_, + this->total_w_, this->total_snd_, + this->total_r_, this->total_rcv_)); + if (this->io_count_ != 0) + ACE_ERROR ((LM_WARNING, + ACE_TEXT ("(%t) Receiver %d deleted with ") + ACE_TEXT ("%d I/O outstanding\n"), + this->index_, + this->io_count_)); + + // This test bounces data back and forth between Senders and Receivers. + // Therefore, if there was significantly more data in one direction, that's + // a problem. Remember, the byte counts are unsigned values. + int issue_data_warning = 0; + if (this->total_snd_ > this->total_rcv_) + { + if (this->total_rcv_ == 0) + issue_data_warning = 1; + else if (this->total_snd_ / this->total_rcv_ > 2) + issue_data_warning = 1; + } + else + { + if (this->total_snd_ == 0) + issue_data_warning = 1; + else if (this->total_rcv_ / this->total_snd_ > 2) + issue_data_warning = 1; + } + if (issue_data_warning) + ACE_DEBUG ((LM_WARNING, + ACE_TEXT ("(%t) Above byte counts look odd; need review\n"))); + if (this->acceptor_ != 0) this->acceptor_->on_delete_receiver (*this); @@ -594,20 +621,44 @@ Receiver::cancel () void +Receiver::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&) +{ + ACE_TCHAR str[256]; + if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR))) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d connection from %s\n"), + this->index_, + str)); + else + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"), + this->index_, + ACE_TEXT ("addr_to_string"))); + return; +} + + +void Receiver::open (ACE_HANDLE handle, ACE_Message_Block &) { { ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); this->handle_ = handle; + int nodelay = 1; + ACE_SOCK_Stream option_setter (handle); + if (-1 == option_setter.set_option (SOL_SOCKET, + TCP_NODELAY, + &nodelay, + sizeof (nodelay))) // Don't buffer serial sends. + ACE_ERROR ((LM_ERROR, "%p\n", "set_option")); if (this->ws_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open"))); else if (this->rs_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open"))); else this->initiate_read_stream (); @@ -634,7 +685,7 @@ Receiver::initiate_read_stream (void) { mb->release (); ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")), -1); } @@ -657,7 +708,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes) { mb.release (); ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")), + ACE_TEXT ("(%t) Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")), -1); } @@ -665,7 +716,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes) { mb.release (); ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::write")), -1); } @@ -693,7 +744,7 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) LogLocker log_lock; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"), + ACE_TEXT ("(%t) **** Receiver %d: handle_read_stream() ****\n"), this->index_)); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"), @@ -730,6 +781,13 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("**** end of message ****************\n"))); } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d: read %d bytes ok\n"), + this->index_, + result.bytes_transferred ())); + } if (result.error () == 0 && result.bytes_transferred () > 0) { @@ -770,7 +828,7 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ()); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** Receiver::handle_write_stream() SessionId = %d ****\n"), + ACE_TEXT ("(%t) **** Receiver %d: handle_write_stream() ****\n"), this->index_)); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"), @@ -807,6 +865,13 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("**** end of message ****************\n"))); } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Receiver %d: wrote %d bytes ok\n"), + this->index_, + result.bytes_transferred ())); + } mb.release (); @@ -848,6 +913,10 @@ public: long get_total_w (void) { return this->total_w_; } long get_total_r (void) { return this->total_r_; } + // This is called to pass the new connection's addresses. + virtual void addresses (const ACE_INET_Addr& peer, + const ACE_INET_Addr& local); + virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result); // This is called when asynchronous reads from the socket complete @@ -969,7 +1038,8 @@ Connector::on_new_sender (Sender &sndr) this->sessions_++; this->list_senders_[sndr.index_] = &sndr; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Sender::CTOR sessions_ = %d\n"), + ACE_TEXT ("(%t) Connector: sender %d up; now have %d.\n"), + sndr.index_, this->sessions_)); } @@ -989,22 +1059,9 @@ Connector::on_delete_sender (Sender &sndr) && this->list_senders_[sndr.index_] == &sndr) this->list_senders_[sndr.index_] = 0; - ACE_TCHAR bufs [256]; - ACE_TCHAR bufr [256]; - - ACE_OS::sprintf (bufs, ACE_TEXT ("%d(%ld)"), - sndr.get_total_snd (), - sndr.get_total_w ()); - - ACE_OS::sprintf (bufr, ACE_TEXT ("%d(%ld)"), - sndr.get_total_rcv (), - sndr.get_total_r ()); - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"), + ACE_TEXT ("(%t) Connector: sender %d gone; %d remain\n"), sndr.index_, - bufs, - bufr, this->sessions_)); } @@ -1052,7 +1109,7 @@ Connector::start (const ACE_INET_Addr& addr, int num) if (this->open (1, 0, 1) != 0) { ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("(%t) %p\n"), ACE_LIB_TEXT ("Connector::open failed"))); return rc; } @@ -1062,8 +1119,8 @@ Connector::start (const ACE_INET_Addr& addr, int num) if (this->connect (addr) != 0) { ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("%p\n"), - ACE_LIB_TEXT ("Connector::connect failed"))); + ACE_TEXT ("(%t) %p\n"), + ACE_TEXT ("Connector::connect failed"))); break; } } @@ -1088,6 +1145,40 @@ Sender::Sender (Connector * connector, int index) Sender::~Sender (void) { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d dtor; %d sends (%d bytes); ") + ACE_TEXT ("%d recvs (%d bytes)\n"), + this->index_, + this->total_w_, this->total_snd_, + this->total_r_, this->total_rcv_)); + if (this->io_count_ != 0) + ACE_ERROR ((LM_WARNING, + ACE_TEXT ("(%t) Sender %d deleted with %d I/O outstanding\n"), + this->index_, + this->io_count_)); + + // This test bounces data back and forth between Senders and Receivers. + // Therefore, if there was significantly more data in one direction, that's + // a problem. Remember, the byte counts are unsigned values. + int issue_data_warning = 0; + if (this->total_snd_ > this->total_rcv_) + { + if (this->total_rcv_ == 0) + issue_data_warning = 1; + else if (this->total_snd_ / this->total_rcv_ > 2) + issue_data_warning = 1; + } + else + { + if (this->total_snd_ == 0) + issue_data_warning = 1; + else if (this->total_rcv_ / this->total_snd_ > 2) + issue_data_warning = 1; + } + if (issue_data_warning) + ACE_DEBUG ((LM_WARNING, + ACE_TEXT ("(%t) Above byte counts look odd; need review\n"))); + if (this->connector_ != 0) this->connector_->on_delete_sender (*this); @@ -1113,22 +1204,46 @@ Sender::cancel () void +Sender::addresses (const ACE_INET_Addr& /* peer */, const ACE_INET_Addr& local) +{ + ACE_TCHAR str[256]; + if (0 == local.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR))) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d connected on %s\n"), + this->index_, + str)); + else + ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"), + this->index_, + ACE_TEXT ("addr_to_string"))); + return; +} + + +void Sender::open (ACE_HANDLE handle, ACE_Message_Block &) { { ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); this->handle_ = handle; + int nodelay = 1; + ACE_SOCK_Stream option_setter (handle); + if (option_setter.set_option (SOL_SOCKET, + TCP_NODELAY, + &nodelay, + sizeof (nodelay))) // Don't buffer serial sends. + ACE_ERROR ((LM_ERROR, "%p\n", "set_option")); // Open ACE_Asynch_Write_Stream if (this->ws_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open"))); // Open ACE_Asynch_Read_Stream else if (this->rs_.open (*this, this->handle_) == -1) ACE_ERROR ((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open"))); else if (this->initiate_write_stream () == 0) @@ -1185,7 +1300,7 @@ Sender::initiate_write_stream (void) { mb1->release (); ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")), -1); } @@ -1203,7 +1318,7 @@ Sender::initiate_write_stream (void) { mb->release (); ACE_ERROR_RETURN((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Sender::ACE_Asynch_Stream::write")), -1); } @@ -1267,7 +1382,7 @@ Sender::initiate_read_stream (void) { mb1->release (); ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::readv")), -1); } @@ -1290,7 +1405,7 @@ Sender::initiate_read_stream (void) { mb->release (); ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("%p\n"), + ACE_TEXT ("(%t) %p\n"), ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::read")), -1); } @@ -1316,7 +1431,7 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) LogLocker log_lock; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** Sender::handle_write_stream() SessionId = %d ****\n"), + ACE_TEXT ("(%t) **** Sender %d: handle_write_stream() ****\n"), index_)); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"), @@ -1392,6 +1507,13 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("**** end of message ****************\n"))); } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d: wrote %d bytes ok\n"), + this->index_, + result.bytes_transferred ())); + } mb.release (); @@ -1399,9 +1521,11 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { this->total_snd_ += result.bytes_transferred (); - if (duplex != 0 && // full duplex, continue write - (this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control - this->initiate_write_stream (); + if (duplex != 0) // full duplex, continue write + { + if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control + this->initiate_write_stream (); + } else // half-duplex read reply, after read we will start write this->initiate_read_stream (); } @@ -1428,7 +1552,7 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) LogLocker log_lock; ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("**** Sender::handle_read_stream() SessionId = %d ****\n"), + ACE_TEXT ("(%t) **** Sender %d: handle_read_stream() ****\n"), index_)); ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"), @@ -1487,6 +1611,13 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("**** end of message ****************\n"))); } + else + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%t) Sender %d: read %d bytes ok\n"), + this->index_, + result.bytes_transferred ())); + } mb.release (); @@ -1591,15 +1722,16 @@ parse_args (int argc, ACE_TCHAR *argv[]) max_aio_operations = 512; // POSIX Proactor params #if defined (sun) proactor_type = SUN; // Proactor type for SunOS + threads = 1; // aiosuspend() not MT Safe. #else proactor_type = DEFAULT; // Proactor type = default -#endif threads = 3; // size of Proactor thread pool +#endif #if defined(__sgi) || defined (ACE_LINUX_COMMON_H) ACE_DEBUG (( LM_DEBUG, "Weak AIO implementation, test will work with 1 client")); - senders = 1; // number of senders + senders = 1; // number of senders #else senders = 20; // number of senders #endif @@ -1708,27 +1840,27 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) //Cancel all pending AIO on Connector and Senders ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Cancel Connector/Senders: sessions_=%d\n"), + ACE_TEXT ("(%t) Cancel Connector/Senders: sessions_=%d\n"), connector.get_number_sessions () )); //connector.cancel_all (); //Cancel all pending AIO on Acceptor And Receivers ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Cancel Acceptor/Receivers:sessions_=%d\n"), + ACE_TEXT ("(%t) Cancel Acceptor/Receivers:sessions_=%d\n"), acceptor.get_number_sessions () )); //acceptor.cancel_all (); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Stop Thread Pool Task\n") + ACE_TEXT ("(%t) Stop Thread Pool Task\n") )); task1.stop (); // As Proactor event loop now is inactive it is safe to destroy all // Senders ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Stop Connector/Senders: sessions_=%d\n"), + ACE_TEXT ("(%t) Stop Connector/Senders: sessions_=%d\n"), connector.get_number_sessions () )); connector.stop (); @@ -1736,7 +1868,7 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[]) // As Proactor event loop now is inactive it is safe to destroy all // Receivers ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("Stop Acceptor/Receivers:sessions_=%d\n"), + ACE_TEXT ("(%t) Stop Acceptor/Receivers:sessions_=%d\n"), acceptor.get_number_sessions () )); acceptor.stop (); |