summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog30
-rw-r--r--ChangeLogs/ChangeLog-03a30
-rw-r--r--ace/Asynch_IO.cpp15
-rw-r--r--ace/POSIX_Asynch_IO.cpp13
-rw-r--r--ace/POSIX_Proactor.cpp162
-rw-r--r--ace/POSIX_Proactor.h19
-rw-r--r--ace/SUN_Proactor.cpp47
-rw-r--r--ace/SUN_Proactor.h4
-rw-r--r--ace/WIN32_Asynch_IO.cpp10
-rw-r--r--tests/Proactor_Test.cpp260
10 files changed, 355 insertions, 235 deletions
diff --git a/ChangeLog b/ChangeLog
index a88b17a63e6..6c437cbe240 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,33 @@
+Mon Oct 28 20:38:27 2002 Steve Huston <shuston@riverace.com>
+
+ * ace/Asynch_IO.cpp (ACE_Service_Handler::addresses()): Don't
+ print addresses from here. It's inappropriate for a framework
+ to be printing things out without being asked to.
+
+ * ace/POSIX_Asynch_IO.cpp (ACE_POSIX_Asynch_Read_Stream::read):
+ * ace/WIN32_Asynch_IO.cpp (ACE_WIN32_Asynch_Read_Stream::read):
+ Don't print a message for a 0-byte/no space read - set errno to
+ ENOSPC so the caller can figure out what's going on.
+
+ * ace/SUN_Proactor.{h cpp}:
+ * ace/POSIX_Proactor.{h cpp} (ACE_POSIX_AIOCB_Proactor):Change
+ 'return_status' arg to get_result_status(), find_completed_aio()
+ from int to size_t and rename transfer_count; get_result_status()
+ takes care of sensing -1 count and changing to 0. Passing back
+ a size_t smooths the path from here through to the result object.
+ Removed application_specific_code() - reuse the one from
+ ACE_POSIX_Proactor - this one called it; remove the middle-man.
+
+ * tests/Proactor_Test.cpp: Added addresses() method implementations
+ to print address with session IDs; helps to match Sender/Receiver
+ pairs in the log. Also added some logging of basic send/recv info
+ to help try to track down why this facility doesn't work well.
+ Added a check for comparable sends/receives when a session ends.
+ Added a warning if there are outstanding I/O when the session
+ ends. This probably should be an error, but I haven't thought
+ through it enough to go that far. For the SUN Proactor, use one
+ thread by default (not 3) - aiosuspend() is not MT safe.
+
Mon Oct 28 12:48:14 2002 Nanbor Wang <nanbor@cs.wustl.edu>
* bin/PerlACE/Process_Unix.pm (Spawn): Return 0 when the function
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index a88b17a63e6..6c437cbe240 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,33 @@
+Mon Oct 28 20:38:27 2002 Steve Huston <shuston@riverace.com>
+
+ * ace/Asynch_IO.cpp (ACE_Service_Handler::addresses()): Don't
+ print addresses from here. It's inappropriate for a framework
+ to be printing things out without being asked to.
+
+ * ace/POSIX_Asynch_IO.cpp (ACE_POSIX_Asynch_Read_Stream::read):
+ * ace/WIN32_Asynch_IO.cpp (ACE_WIN32_Asynch_Read_Stream::read):
+ Don't print a message for a 0-byte/no space read - set errno to
+ ENOSPC so the caller can figure out what's going on.
+
+ * ace/SUN_Proactor.{h cpp}:
+ * ace/POSIX_Proactor.{h cpp} (ACE_POSIX_AIOCB_Proactor):Change
+ 'return_status' arg to get_result_status(), find_completed_aio()
+ from int to size_t and rename transfer_count; get_result_status()
+ takes care of sensing -1 count and changing to 0. Passing back
+ a size_t smooths the path from here through to the result object.
+ Removed application_specific_code() - reuse the one from
+ ACE_POSIX_Proactor - this one called it; remove the middle-man.
+
+ * tests/Proactor_Test.cpp: Added addresses() method implementations
+ to print address with session IDs; helps to match Sender/Receiver
+ pairs in the log. Also added some logging of basic send/recv info
+ to help try to track down why this facility doesn't work well.
+ Added a check for comparable sends/receives when a session ends.
+ Added a warning if there are outstanding I/O when the session
+ ends. This probably should be an error, but I haven't thought
+ through it enough to go that far. For the SUN Proactor, use one
+ thread by default (not 3) - aiosuspend() is not MT safe.
+
Mon Oct 28 12:48:14 2002 Nanbor Wang <nanbor@cs.wustl.edu>
* bin/PerlACE/Process_Unix.pm (Spawn): Return 0 when the function
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp
index 6149f8daf1e..25c7610c60a 100644
--- a/ace/Asynch_IO.cpp
+++ b/ace/Asynch_IO.cpp
@@ -1197,20 +1197,9 @@ ACE_Service_Handler::~ACE_Service_Handler (void)
}
void
-ACE_Service_Handler::addresses (const ACE_INET_Addr &remote_address,
- const ACE_INET_Addr &local_address)
+ACE_Service_Handler::addresses (const ACE_INET_Addr & /* remote_address */,
+ const ACE_INET_Addr & /* local_address */ )
{
- // Default behavior is to print out the addresses.
- ACE_TCHAR local_address_buf[BUFSIZ], remote_address_buf[BUFSIZ];
- if (local_address.addr_to_string (local_address_buf, sizeof local_address_buf) == -1)
- ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("can't obtain local_address's address string")));
-
- if (remote_address.addr_to_string (remote_address_buf, sizeof remote_address_buf) == -1)
- ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("can't obtain remote_address's address string")));
-
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("On fd %d\n"), this->handle ()));
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("local address %s\n"), local_address_buf));
- ACE_DEBUG ((LM_DEBUG, ACE_LIB_TEXT ("remote address %s\n"), remote_address_buf));
}
void
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp
index 8ecb78bba14..6147e8cbb58 100644
--- a/ace/POSIX_Asynch_IO.cpp
+++ b/ace/POSIX_Asynch_IO.cpp
@@ -370,15 +370,14 @@ ACE_POSIX_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
int signal_number)
{
size_t space = message_block.space ();
- if ( bytes_to_read > space )
+ if (bytes_to_read > space)
bytes_to_read=space;
- if ( bytes_to_read == 0 )
- ACE_ERROR_RETURN
- ((LM_ERROR,
- ACE_LIB_TEXT ("ACE_POSIX_Asynch_Read_Stream::read:")
- ACE_LIB_TEXT ("Attempt to read 0 bytes or no space in the message block\n")),
- -1);
+ if (bytes_to_read == 0)
+ {
+ errno = ENOSPC;
+ return -1;
+ }
// Create the Asynch_Result.
ACE_POSIX_Asynch_Read_Stream_Result *result = 0;
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index 4fc723e56fa..63e7c1ea193 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -757,10 +757,10 @@ int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
// Get the error and return status of the aio_ operation.
int error_status = 0;
- int return_status = 0;
+ size_t transfer_count = 0;
int flg_completed = this->get_result_status (result_list_[ai],
error_status,
- return_status);
+ transfer_count);
//don't delete uncompleted AIOCB's
if (flg_completed == 0) // not completed !!!
@@ -776,12 +776,12 @@ int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT("slot=%d op=%s status=%d return=%d %s\n"),
- ai,
- op,
- error_status,
- return_status,
- errtxt ));
+ ACE_LIB_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
+ ai,
+ op,
+ error_status,
+ transfer_count,
+ errtxt));
#endif /* 0 */
}
else // completed , OK
@@ -792,16 +792,16 @@ int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
}
}
- //if it is not possible cancel some operation (num_pending > 0 ),
- //we can we do only one thing -report about this
- //and complain about POSIX implementation
- //we know that we have memory leaks,
- //but it is better than segmentation fault!
- ACE_DEBUG ((LM_DEBUG,
- ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
- ACE_LIB_TEXT(" number pending AIO=%d\n"),
- num_pending
- ));
+ // If it is not possible cancel some operation (num_pending > 0 ),
+ // we can do only one thing -report about this
+ // and complain about POSIX implementation.
+ // We know that we have memory leaks, but it is better than
+ // segmentation fault!
+ ACE_DEBUG
+ ((LM_DEBUG,
+ ACE_LIB_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
+ ACE_LIB_TEXT(" number pending AIO=%d\n"),
+ num_pending));
delete [] this->aiocb_list_;
this->aiocb_list_ = 0;
@@ -1160,13 +1160,13 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds)
size_t index = 0;
size_t count = aiocb_list_max_size_; // max number to iterate
int error_status = 0;
- int return_status = 0;
+ size_t transfer_count = 0;
for (;; retval++)
{
ACE_POSIX_Asynch_Result *asynch_result =
find_completed_aio (error_status,
- return_status,
+ transfer_count,
index,
count);
@@ -1175,9 +1175,9 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds)
// Call the application code.
this->application_specific_code (asynch_result,
- return_status, // Bytes transferred.
- 0, // No completion key.
- error_status); // Error
+ transfer_count,
+ 0, // No completion key.
+ error_status);
}
}
@@ -1188,49 +1188,27 @@ ACE_POSIX_AIOCB_Proactor::handle_events (u_long milli_seconds)
}
int
-ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
+ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result,
int &error_status,
- int &return_status)
+ size_t &transfer_count)
{
- return_status = 0;
+ transfer_count = 0;
// Get the error status of the aio_ operation.
error_status = aio_error (asynch_result);
+ if (error_status == EINPROGRESS)
+ return 0; // not completed
-#if 0
- if (error_status == -1) // <aio_error> itself has failed.
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_AIOCB_Proactor::get_result_status:"
- "<aio_error> has failed\n"));
-#endif /* 0 */
-
- if (error_status == EINPROGRESS)
- {
- return_status = 0;
- return 0; // not completed
- }
-
- return_status = aio_return (asynch_result);
-
- if (return_status < 0)
- {
- return_status = 0; // zero bytes transferred
-#if 0
- if (error_status == 0) // nonsense
- ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_POSIX_AIOCB_Proactor::get_result_status:"
- "<aio_return> failed\n"));
-#endif /* 0 */
- }
-
- return 1; // completed
+ ssize_t op_return = aio_return (asynch_result);
+ if (op_return > 0)
+ transfer_count = ACE_static_cast (size_t, op_return);
+ // else transfer_count is already 0, error_status reports the error.
+ return 1; // completed
}
ACE_POSIX_Asynch_Result *
ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
- int &return_status,
+ size_t &transfer_count,
size_t &index,
size_t &count)
{
@@ -1241,12 +1219,8 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
ACE_POSIX_Asynch_Result *asynch_result = 0;
- error_status = 0;
- return_status= 0;
-
if (num_started_aio_ == 0) // save time
- return asynch_result;
-
+ return 0;
for (; count > 0; index++ , count--)
{
@@ -1258,14 +1232,13 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
if (0 != this->get_result_status (result_list_[index],
error_status,
- return_status)) // completed
+ transfer_count)) // completed
break;
} // end for
if (count == 0) // all processed , nothing found
- return asynch_result;
-
+ return 0;
asynch_result = result_list_[index];
aiocb_list_[index] = 0;
@@ -1273,7 +1246,7 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
aiocb_list_cur_size_--;
num_started_aio_--; // decrement count active aios
- index++; // for next iteration
+ index++; // for next iteration
count--; // for next iteration
this->start_deferred_aio ();
@@ -1283,17 +1256,6 @@ ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
return asynch_result;
}
-void
-ACE_POSIX_AIOCB_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
- size_t bytes_transferred,
- const void *completion_key,
- u_long error)
-{
- ACE_POSIX_Proactor::application_specific_code (asynch_result,
- bytes_transferred,
- completion_key,
- error);
-}
int
ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *result,
@@ -1329,27 +1291,22 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul
if (ret_val != 0) // No free slot
{
errno = EAGAIN;
- ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:(%P | %t)::\n"
- "register_and_start_aio: "
- "No space to store the <aio>info\n"),
- -1);
+ return -1;
}
// Find a free slot and store.
- ret_val = allocate_aio_slot (result);
+ ssize_t slot = allocate_aio_slot (result);
- if (ret_val < 0)
+ if (slot < 0)
return -1;
- size_t index = ACE_static_cast (size_t, ret_val);
+ size_t index = ACE_static_cast (size_t, slot);
result_list_[index] = result; //Store result ptr anyway
aiocb_list_cur_size_++;
ret_val = start_aio (result);
-
switch (ret_val)
{
case 0 : // started OK
@@ -1373,7 +1330,7 @@ ACE_POSIX_AIOCB_Proactor::register_and_start_aio (ACE_POSIX_Asynch_Result *resul
return -1;
}
-int
+ssize_t
ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
{
size_t i = 0;
@@ -1407,11 +1364,10 @@ ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
"internal Proactor error 1\n"),
-1);
-
//setup OS notification methods for this aio
result->aio_sigevent.sigev_notify = SIGEV_NONE;
- return ACE_static_cast (int, i);
+ return ACE_static_cast (ssize_t, i);
}
// start_aio has new return codes
@@ -1432,30 +1388,30 @@ ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result)
switch (result->aio_lio_opcode )
{
case LIO_READ :
- ptype = "read ";
+ ptype = ACE_LIB_TEXT ("read ");
ret_val = aio_read (result);
break;
case LIO_WRITE :
- ptype = "write";
+ ptype = ACE_LIB_TEXT ("write");
ret_val = aio_write (result);
break;
default:
- ptype = "?????";
+ ptype = ACE_LIB_TEXT ("?????");
ret_val = -1;
break;
}
if (ret_val == 0)
- num_started_aio_ ++;
+ this->num_started_aio_++;
else // if (ret_val == -1)
{
if (errno == EAGAIN) //Ok, it will be deferred AIO
ret_val = 1;
else
ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::start_aio: aio_%s %p\n",
- ptype,
- "queueing failed\n"));
+ ACE_LIB_TEXT ("%N:%l:(%P | %t)::start_aio: aio_%s %p\n"),
+ ptype,
+ ACE_LIB_TEXT ("queueing failed\n")));
}
return ret_val;
@@ -1854,7 +1810,7 @@ ACE_POSIX_SIG_Proactor::mask_signals (const sigset_t *signals) const
return 0;
}
-int
+ssize_t
ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
{
size_t i = 0;
@@ -1871,19 +1827,17 @@ ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
"internal Proactor error 1\n"),
-1);
- int retval = ACE_static_cast (int, i);
-
// setup OS notification methods for this aio
// store index!!, not pointer in signal info
result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
result->aio_sigevent.sigev_signo = result->signal_number ();
#if defined (__FreeBSD__)
- result->aio_sigevent.sigev_value.sigval_int = retval;
+ result->aio_sigevent.sigev_value.sigval_int = ACE_static_cast (int, i);
#else
- result->aio_sigevent.sigev_value.sival_int = retval;
+ result->aio_sigevent.sigev_value.sival_int = ACE_static_cast (int, i);
#endif /* __FreeBSD__ */
- return retval;
+ return ACE_static_cast (ssize_t, i);
}
int
@@ -1916,7 +1870,7 @@ ACE_POSIX_SIG_Proactor::handle_events (u_long milli_seconds)
size_t index = 0; // start index to scan aiocb list
size_t count = aiocb_list_max_size_; // max number to iterate
int error_status = 0;
- int return_status = 0;
+ size_t transfer_count = 0;
int flg_aio = 0; // 1 if AIO Completion possible
int flg_que = 0; // 1 if SIGQUEUE possible
@@ -2020,7 +1974,7 @@ ACE_POSIX_SIG_Proactor::handle_events (u_long milli_seconds)
{
ACE_POSIX_Asynch_Result *asynch_result =
find_completed_aio (error_status,
- return_status,
+ transfer_count,
index,
count);
@@ -2029,7 +1983,7 @@ ACE_POSIX_SIG_Proactor::handle_events (u_long milli_seconds)
// Call the application code.
this->application_specific_code (asynch_result,
- return_status, // Bytes transferred.
+ transfer_count,
0, // No completion key.
error_status); // Error
}
diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h
index 168b6fcd8c2..ba54a6f9431 100644
--- a/ace/POSIX_Proactor.h
+++ b/ace/POSIX_Proactor.h
@@ -358,9 +358,9 @@ protected:
/// Check AIO for completion, error and result status
/// Return: 1 - AIO completed , 0 - not completed yet
- virtual int get_result_status ( ACE_POSIX_Asynch_Result* asynch_result,
- int & error_status,
- int & return_status );
+ virtual int get_result_status (ACE_POSIX_Asynch_Result *asynch_result,
+ int &error_status,
+ size_t &transfer_count);
/// Task to process pseudo-asynchronous operations
ACE_Asynch_Pseudo_Task & get_asynch_pseudo_task();
@@ -392,13 +392,6 @@ protected:
*/
virtual int handle_events (u_long milli_seconds);
- /// We will call the base class's application_specific_code from
- /// here.
- void application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
- size_t bytes_transferred,
- const void *completion_key,
- u_long error);
-
virtual int register_and_start_aio (ACE_POSIX_Asynch_Result *result,
int op);
@@ -413,12 +406,12 @@ protected:
/// Extract the results of aio.
ACE_POSIX_Asynch_Result *find_completed_aio (int &error_status,
- int &return_status,
+ size_t &transfer_count,
size_t &index,
size_t &count);
/// Find free slot to store result and aiocb pointer
- virtual int allocate_aio_slot (ACE_POSIX_Asynch_Result *result);
+ virtual ssize_t allocate_aio_slot (ACE_POSIX_Asynch_Result *result);
/// Notify queue of "post_completed" ACE_POSIX_Asynch_Results
@@ -573,7 +566,7 @@ protected:
*/
/// Find free slot to store result and aiocb pointer
- virtual int allocate_aio_slot (ACE_POSIX_Asynch_Result *result);
+ virtual ssize_t allocate_aio_slot (ACE_POSIX_Asynch_Result *result);
/// Notify queue of "post_completed" ACE_POSIX_Asynch_Results
diff --git a/ace/SUN_Proactor.cpp b/ace/SUN_Proactor.cpp
index ffeee0fcc9c..5a53c4f8be9 100644
--- a/ace/SUN_Proactor.cpp
+++ b/ace/SUN_Proactor.cpp
@@ -120,22 +120,21 @@ ACE_SUN_Proactor::handle_events (u_long milli_seconds)
else
{
int error_status = 0;
- int return_status = 0;
+ size_t transfer_count = 0;
ACE_POSIX_Asynch_Result *asynch_result =
find_completed_aio (result,
error_status,
- return_status);
+ transfer_count);
if (asynch_result != 0)
{
// Call the application code.
this->application_specific_code (asynch_result,
- return_status, // Bytes transferred.
- 0, // No completion key.
- error_status); // Error
- retval ++ ;
-
+ transfer_count,
+ 0, // No completion key.
+ error_status); // Error
+ retval++;
}
}
@@ -149,12 +148,12 @@ ACE_SUN_Proactor::handle_events (u_long milli_seconds)
int
ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
int &error_status,
- int &return_status)
+ size_t &transfer_count)
{
// Get the error status of the aio_ operation.
error_status = asynch_result->aio_resultp.aio_errno;
- return_status = asynch_result->aio_resultp.aio_return;
+ ssize_t op_return = asynch_result->aio_resultp.aio_return;
// ****** from Sun man pages *********************
// Upon completion of the operation both aio_return and aio_errno
@@ -163,27 +162,26 @@ ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
// so the client may detect a change in state
// by initializing aio_return to this value.
- if (return_status == AIO_INPROGRESS || error_status == EINPROGRESS)
- {
- return_status = 0;
- return 0; // not completed
- }
+ if (error_status == EINPROGRESS || op_return == AIO_INPROGRESS)
+ return 0; // not completed
if (error_status == -1) // should never be
ACE_ERROR ((LM_ERROR,
- "%N:%l:(%P | %t)::%p\n",
- "ACE_SUN_Proactor::get_result_status:"
- "<aio_errno> has failed\n"));
+ ACE_LIB_TEXT ("%N:%l:(%P | %t)::%p\n"),
+ ACE_LIB_TEXT ("ACE_SUN_Proactor::get_result_status:")
+ ACE_LIB_TEXT ("<aio_errno> has failed\n")));
- if (return_status < 0)
+ if (op_return < 0)
{
- return_status = 0; // zero bytes transferred
+ transfer_count = 0; // zero bytes transferred
if (error_status == 0) // nonsense
ACE_ERROR ((LM_ERROR,
"%N:%l:(%P | %t)::%p\n",
"ACE_SUN_Proactor::get_result_status:"
"<aio_return> failed\n"));
}
+ else
+ transfer_count = ACE_static_cast (size_t, op_return);
return 1; // completed
}
@@ -191,18 +189,18 @@ ACE_SUN_Proactor::get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
ACE_POSIX_Asynch_Result *
ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
int &error_status,
- int &return_status)
+ size_t &transfer_count)
{
ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, mutex_, 0));
size_t ai;
error_status = -1;
- return_status = 0;
+ transfer_count = 0;
// we call find_completed_aio always with result != 0
for (ai = 0; ai < aiocb_list_max_size_; ai++)
- if (aiocb_list_[ai] !=0 && //check for non zero
+ if (aiocb_list_[ai] != 0 && //check for non zero
result == &aiocb_list_[ai]->aio_resultp)
break;
@@ -213,7 +211,7 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
if (this->get_result_status (asynch_result,
error_status,
- return_status) == 0)
+ transfer_count) == 0)
{ // should never be
ACE_ERROR ((LM_ERROR,
"%N:%l:(%P | %t)::%p\n",
@@ -222,9 +220,6 @@ ACE_SUN_Proactor::find_completed_aio (aio_result_t *result,
return 0;
}
- if (return_status < 0)
- return_status = 0; // zero bytes transferred
-
aiocb_list_[ai] = 0;
result_list_[ai] = 0;
aiocb_list_cur_size_--;
diff --git a/ace/SUN_Proactor.h b/ace/SUN_Proactor.h
index c1ddbaf0cbc..9a08cc9e7f4 100644
--- a/ace/SUN_Proactor.h
+++ b/ace/SUN_Proactor.h
@@ -104,12 +104,12 @@ protected:
/// Return: 1 - AIO completed , 0 - not completed yet
virtual int get_result_status (ACE_POSIX_Asynch_Result* asynch_result,
int &error_status,
- int &return_status);
+ size_t &transfer_count);
/// Extract the results of aio.
ACE_POSIX_Asynch_Result *find_completed_aio (aio_result_t *result,
int &error_status,
- int &return_status);
+ size_t &transfer_count);
/// From ACE_POSIX_AIOCB_Proactor.
/// Attempt to cancel running request
diff --git a/ace/WIN32_Asynch_IO.cpp b/ace/WIN32_Asynch_IO.cpp
index 3a14601626c..c60519ef015 100644
--- a/ace/WIN32_Asynch_IO.cpp
+++ b/ace/WIN32_Asynch_IO.cpp
@@ -374,12 +374,10 @@ ACE_WIN32_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
bytes_to_read = space;
if (bytes_to_read == 0)
- ACE_ERROR_RETURN
- ((LM_ERROR,
- ACE_LIB_TEXT ("ACE_WIN32_Asynch_Read_Stream::read:")
- ACE_LIB_TEXT ("Attempt to read 0 bytes or no space in the message block\n")),
- -1);
-
+ {
+ errno = ENOSPC;
+ return -1;
+ }
// Create the Asynch_Result.
ACE_WIN32_Asynch_Read_Stream_Result *result = 0;
diff --git a/tests/Proactor_Test.cpp b/tests/Proactor_Test.cpp
index a99e8fffd49..2f7a4e00650 100644
--- a/tests/Proactor_Test.cpp
+++ b/tests/Proactor_Test.cpp
@@ -313,7 +313,7 @@ MyTask::stop ()
if (this->proactor_ != 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("End Proactor event loop\n")));
+ ACE_TEXT (" (%t) Calling End Proactor event loop\n")));
ACE_Proactor::end_event_loop ();
}
@@ -360,6 +360,10 @@ public:
long get_total_w (void) { return this->total_w_; }
long get_total_r (void) { return this->total_r_; }
+ // This is called to pass the new connection's addresses.
+ virtual void addresses (const ACE_INET_Addr& peer,
+ const ACE_INET_Addr& local);
+
/// This is called after the new connection has been accepted.
virtual void open (ACE_HANDLE handle,
ACE_Message_Block &message_block);
@@ -391,12 +395,12 @@ private:
ACE_HANDLE handle_;
ACE_SYNCH_MUTEX lock_;
- long io_count_;
+ long io_count_; // Number of currently outstanding I/O requests
int flg_cancel_;
- size_t total_snd_;
- size_t total_rcv_;
- long total_w_;
- long total_r_;
+ size_t total_snd_; // Number of bytes successfully sent
+ size_t total_rcv_; // Number of bytes successfully received
+ long total_w_; // Number of write operations
+ long total_r_; // Number of read operations
};
class Acceptor : public ACE_Asynch_Acceptor<Receiver>
@@ -492,7 +496,8 @@ Acceptor::on_new_receiver (Receiver & rcvr)
this->sessions_++;
this->list_receivers_[rcvr.index_] = &rcvr;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"),
+ ACE_TEXT ("(%t) Acceptor: receiver %d up; now have %d.\n"),
+ rcvr.index_,
this->sessions_));
}
@@ -513,22 +518,9 @@ Acceptor::on_delete_receiver (Receiver & rcvr)
&& this->list_receivers_[rcvr.index_] == &rcvr)
this->list_receivers_[rcvr.index_] = 0;
- ACE_TCHAR bufs [256];
- ACE_TCHAR bufr [256];
-
- ACE_OS::sprintf (bufs, ACE_TEXT ("%d(%ld)"),
- rcvr.get_total_snd (),
- rcvr.get_total_w ());
-
- ACE_OS::sprintf (bufr, ACE_TEXT ("%d(%ld)"),
- rcvr.get_total_rcv (),
- rcvr.get_total_r ());
-
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Receiver::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
+ ACE_TEXT ("(%t) Acceptor: receiver %d gone; %d remain\n"),
rcvr.index_,
- bufs,
- bufr,
this->sessions_));
}
@@ -571,6 +563,41 @@ Receiver::Receiver (Acceptor * acceptor, int index)
Receiver::~Receiver (void)
{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Receiver %d dtor; %d sends (%d bytes); ")
+ ACE_TEXT ("%d recvs (%d bytes)\n"),
+ this->index_,
+ this->total_w_, this->total_snd_,
+ this->total_r_, this->total_rcv_));
+ if (this->io_count_ != 0)
+ ACE_ERROR ((LM_WARNING,
+ ACE_TEXT ("(%t) Receiver %d deleted with ")
+ ACE_TEXT ("%d I/O outstanding\n"),
+ this->index_,
+ this->io_count_));
+
+ // This test bounces data back and forth between Senders and Receivers.
+ // Therefore, if there was significantly more data in one direction, that's
+ // a problem. Remember, the byte counts are unsigned values.
+ int issue_data_warning = 0;
+ if (this->total_snd_ > this->total_rcv_)
+ {
+ if (this->total_rcv_ == 0)
+ issue_data_warning = 1;
+ else if (this->total_snd_ / this->total_rcv_ > 2)
+ issue_data_warning = 1;
+ }
+ else
+ {
+ if (this->total_snd_ == 0)
+ issue_data_warning = 1;
+ else if (this->total_rcv_ / this->total_snd_ > 2)
+ issue_data_warning = 1;
+ }
+ if (issue_data_warning)
+ ACE_DEBUG ((LM_WARNING,
+ ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
+
if (this->acceptor_ != 0)
this->acceptor_->on_delete_receiver (*this);
@@ -594,20 +621,44 @@ Receiver::cancel ()
void
+Receiver::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&)
+{
+ ACE_TCHAR str[256];
+ if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Receiver %d connection from %s\n"),
+ this->index_,
+ str));
+ else
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"),
+ this->index_,
+ ACE_TEXT ("addr_to_string")));
+ return;
+}
+
+
+void
Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
this->handle_ = handle;
+ int nodelay = 1;
+ ACE_SOCK_Stream option_setter (handle);
+ if (-1 == option_setter.set_option (SOL_SOCKET,
+ TCP_NODELAY,
+ &nodelay,
+ sizeof (nodelay))) // Don't buffer serial sends.
+ ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
if (this->ws_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open")));
else if (this->rs_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
else
this->initiate_read_stream ();
@@ -634,7 +685,7 @@ Receiver::initiate_read_stream (void)
{
mb->release ();
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")),
-1);
}
@@ -657,7 +708,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
{
mb.release ();
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")),
+ ACE_TEXT ("(%t) Receiver::ACE_Asynch_Write_Stream::write nbytes <0 ")),
-1);
}
@@ -665,7 +716,7 @@ Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
{
mb.release ();
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::write")),
-1);
}
@@ -693,7 +744,7 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
LogLocker log_lock;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** Receiver::handle_read_stream() SessionId = %d ****\n"),
+ ACE_TEXT ("(%t) **** Receiver %d: handle_read_stream() ****\n"),
this->index_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
@@ -730,6 +781,13 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("**** end of message ****************\n")));
}
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Receiver %d: read %d bytes ok\n"),
+ this->index_,
+ result.bytes_transferred ()));
+ }
if (result.error () == 0 && result.bytes_transferred () > 0)
{
@@ -770,7 +828,7 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
mb.rd_ptr (mb.rd_ptr () - result.bytes_transferred ());
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** Receiver::handle_write_stream() SessionId = %d ****\n"),
+ ACE_TEXT ("(%t) **** Receiver %d: handle_write_stream() ****\n"),
this->index_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
@@ -807,6 +865,13 @@ Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("**** end of message ****************\n")));
}
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Receiver %d: wrote %d bytes ok\n"),
+ this->index_,
+ result.bytes_transferred ()));
+ }
mb.release ();
@@ -848,6 +913,10 @@ public:
long get_total_w (void) { return this->total_w_; }
long get_total_r (void) { return this->total_r_; }
+ // This is called to pass the new connection's addresses.
+ virtual void addresses (const ACE_INET_Addr& peer,
+ const ACE_INET_Addr& local);
+
virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
// This is called when asynchronous reads from the socket complete
@@ -969,7 +1038,8 @@ Connector::on_new_sender (Sender &sndr)
this->sessions_++;
this->list_senders_[sndr.index_] = &sndr;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Sender::CTOR sessions_ = %d\n"),
+ ACE_TEXT ("(%t) Connector: sender %d up; now have %d.\n"),
+ sndr.index_,
this->sessions_));
}
@@ -989,22 +1059,9 @@ Connector::on_delete_sender (Sender &sndr)
&& this->list_senders_[sndr.index_] == &sndr)
this->list_senders_[sndr.index_] = 0;
- ACE_TCHAR bufs [256];
- ACE_TCHAR bufr [256];
-
- ACE_OS::sprintf (bufs, ACE_TEXT ("%d(%ld)"),
- sndr.get_total_snd (),
- sndr.get_total_w ());
-
- ACE_OS::sprintf (bufr, ACE_TEXT ("%d(%ld)"),
- sndr.get_total_rcv (),
- sndr.get_total_r ());
-
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Sender::~DTOR index=%d snd=%s rcv=%s sessions_=%d\n"),
+ ACE_TEXT ("(%t) Connector: sender %d gone; %d remain\n"),
sndr.index_,
- bufs,
- bufr,
this->sessions_));
}
@@ -1052,7 +1109,7 @@ Connector::start (const ACE_INET_Addr& addr, int num)
if (this->open (1, 0, 1) != 0)
{
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("(%t) %p\n"),
ACE_LIB_TEXT ("Connector::open failed")));
return rc;
}
@@ -1062,8 +1119,8 @@ Connector::start (const ACE_INET_Addr& addr, int num)
if (this->connect (addr) != 0)
{
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("%p\n"),
- ACE_LIB_TEXT ("Connector::connect failed")));
+ ACE_TEXT ("(%t) %p\n"),
+ ACE_TEXT ("Connector::connect failed")));
break;
}
}
@@ -1088,6 +1145,40 @@ Sender::Sender (Connector * connector, int index)
Sender::~Sender (void)
{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Sender %d dtor; %d sends (%d bytes); ")
+ ACE_TEXT ("%d recvs (%d bytes)\n"),
+ this->index_,
+ this->total_w_, this->total_snd_,
+ this->total_r_, this->total_rcv_));
+ if (this->io_count_ != 0)
+ ACE_ERROR ((LM_WARNING,
+ ACE_TEXT ("(%t) Sender %d deleted with %d I/O outstanding\n"),
+ this->index_,
+ this->io_count_));
+
+ // This test bounces data back and forth between Senders and Receivers.
+ // Therefore, if there was significantly more data in one direction, that's
+ // a problem. Remember, the byte counts are unsigned values.
+ int issue_data_warning = 0;
+ if (this->total_snd_ > this->total_rcv_)
+ {
+ if (this->total_rcv_ == 0)
+ issue_data_warning = 1;
+ else if (this->total_snd_ / this->total_rcv_ > 2)
+ issue_data_warning = 1;
+ }
+ else
+ {
+ if (this->total_snd_ == 0)
+ issue_data_warning = 1;
+ else if (this->total_rcv_ / this->total_snd_ > 2)
+ issue_data_warning = 1;
+ }
+ if (issue_data_warning)
+ ACE_DEBUG ((LM_WARNING,
+ ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));
+
if (this->connector_ != 0)
this->connector_->on_delete_sender (*this);
@@ -1113,22 +1204,46 @@ Sender::cancel ()
void
+Sender::addresses (const ACE_INET_Addr& /* peer */, const ACE_INET_Addr& local)
+{
+ ACE_TCHAR str[256];
+ if (0 == local.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Sender %d connected on %s\n"),
+ this->index_,
+ str));
+ else
+ ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"),
+ this->index_,
+ ACE_TEXT ("addr_to_string")));
+ return;
+}
+
+
+void
Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
{
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
this->handle_ = handle;
+ int nodelay = 1;
+ ACE_SOCK_Stream option_setter (handle);
+ if (option_setter.set_option (SOL_SOCKET,
+ TCP_NODELAY,
+ &nodelay,
+ sizeof (nodelay))) // Don't buffer serial sends.
+ ACE_ERROR ((LM_ERROR, "%p\n", "set_option"));
// Open ACE_Asynch_Write_Stream
if (this->ws_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Write_Stream::open")));
// Open ACE_Asynch_Read_Stream
else if (this->rs_.open (*this, this->handle_) == -1)
ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::open")));
else if (this->initiate_write_stream () == 0)
@@ -1185,7 +1300,7 @@ Sender::initiate_write_stream (void)
{
mb1->release ();
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
-1);
}
@@ -1203,7 +1318,7 @@ Sender::initiate_write_stream (void)
{
mb->release ();
ACE_ERROR_RETURN((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Stream::write")),
-1);
}
@@ -1267,7 +1382,7 @@ Sender::initiate_read_stream (void)
{
mb1->release ();
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::readv")),
-1);
}
@@ -1290,7 +1405,7 @@ Sender::initiate_read_stream (void)
{
mb->release ();
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("%p\n"),
+ ACE_TEXT ("(%t) %p\n"),
ACE_TEXT ("Sender::ACE_Asynch_Read_Stream::read")),
-1);
}
@@ -1316,7 +1431,7 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
LogLocker log_lock;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** Sender::handle_write_stream() SessionId = %d ****\n"),
+ ACE_TEXT ("(%t) **** Sender %d: handle_write_stream() ****\n"),
index_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
@@ -1392,6 +1507,13 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("**** end of message ****************\n")));
}
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Sender %d: wrote %d bytes ok\n"),
+ this->index_,
+ result.bytes_transferred ()));
+ }
mb.release ();
@@ -1399,9 +1521,11 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
this->total_snd_ += result.bytes_transferred ();
- if (duplex != 0 && // full duplex, continue write
- (this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
- this->initiate_write_stream ();
+ if (duplex != 0) // full duplex, continue write
+ {
+ if ((this->total_snd_- this->total_rcv_) < 1024*32 ) //flow control
+ this->initiate_write_stream ();
+ }
else // half-duplex read reply, after read we will start write
this->initiate_read_stream ();
}
@@ -1428,7 +1552,7 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
LogLocker log_lock;
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("**** Sender::handle_read_stream() SessionId = %d ****\n"),
+ ACE_TEXT ("(%t) **** Sender %d: handle_read_stream() ****\n"),
index_));
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("%s = %d\n"),
@@ -1487,6 +1611,13 @@ Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("**** end of message ****************\n")));
}
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%t) Sender %d: read %d bytes ok\n"),
+ this->index_,
+ result.bytes_transferred ()));
+ }
mb.release ();
@@ -1591,15 +1722,16 @@ parse_args (int argc, ACE_TCHAR *argv[])
max_aio_operations = 512; // POSIX Proactor params
#if defined (sun)
proactor_type = SUN; // Proactor type for SunOS
+ threads = 1; // aiosuspend() not MT Safe.
#else
proactor_type = DEFAULT; // Proactor type = default
-#endif
threads = 3; // size of Proactor thread pool
+#endif
#if defined(__sgi) || defined (ACE_LINUX_COMMON_H)
ACE_DEBUG (( LM_DEBUG,
"Weak AIO implementation, test will work with 1 client"));
- senders = 1; // number of senders
+ senders = 1; // number of senders
#else
senders = 20; // number of senders
#endif
@@ -1708,27 +1840,27 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
//Cancel all pending AIO on Connector and Senders
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Cancel Connector/Senders: sessions_=%d\n"),
+ ACE_TEXT ("(%t) Cancel Connector/Senders: sessions_=%d\n"),
connector.get_number_sessions ()
));
//connector.cancel_all ();
//Cancel all pending AIO on Acceptor And Receivers
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Cancel Acceptor/Receivers:sessions_=%d\n"),
+ ACE_TEXT ("(%t) Cancel Acceptor/Receivers:sessions_=%d\n"),
acceptor.get_number_sessions ()
));
//acceptor.cancel_all ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Stop Thread Pool Task\n")
+ ACE_TEXT ("(%t) Stop Thread Pool Task\n")
));
task1.stop ();
// As Proactor event loop now is inactive it is safe to destroy all
// Senders
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Stop Connector/Senders: sessions_=%d\n"),
+ ACE_TEXT ("(%t) Stop Connector/Senders: sessions_=%d\n"),
connector.get_number_sessions ()
));
connector.stop ();
@@ -1736,7 +1868,7 @@ ACE_TMAIN (int argc, ACE_TCHAR *argv[])
// As Proactor event loop now is inactive it is safe to destroy all
// Receivers
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("Stop Acceptor/Receivers:sessions_=%d\n"),
+ ACE_TEXT ("(%t) Stop Acceptor/Receivers:sessions_=%d\n"),
acceptor.get_number_sessions ()
));
acceptor.stop ();