diff options
Diffstat (limited to 'examples/Reactor/Proactor/test_proactor.cpp')
-rw-r--r-- | examples/Reactor/Proactor/test_proactor.cpp | 100 |
1 files changed, 36 insertions, 64 deletions
diff --git a/examples/Reactor/Proactor/test_proactor.cpp b/examples/Reactor/Proactor/test_proactor.cpp index f6ebed26c53..4582fb82888 100644 --- a/examples/Reactor/Proactor/test_proactor.cpp +++ b/examples/Reactor/Proactor/test_proactor.cpp @@ -34,7 +34,6 @@ static u_short port = ACE_DEFAULT_SERVER_PORT; static char *file = "test_proactor.cpp"; static char *dump_file = "output"; static int done = 0; -static int initial_read_size = BUFSIZ; class Receiver : public ACE_Service_Handler // @@ -130,18 +129,13 @@ Receiver::open (ACE_HANDLE handle, return; } - // Duplicate the message block so that we can keep it around - ACE_Message_Block &duplicate = *message_block.duplicate (); - - // Initial data (data which came with the AcceptEx call) - ACE_Asynch_Read_Stream::Result fake_result (*this, - this->handle_, - duplicate, - initial_read_size, - 0, - ACE_INVALID_HANDLE); - // This will call the callback - fake_result.complete (message_block.length (), 1, 0); + // Print any initial data which came with the AcceptEx call + message_block.rd_ptr ()[message_block.length ()] = '\0'; + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "Initial data", message_block.rd_ptr ())); + + // Initiate new read from the stream + if (this->initiate_read_stream () == -1) + return; } int @@ -165,7 +159,7 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n")); // Reset pointers - result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + result.message_block ().rd_ptr ()[result.message_block ().length ()] = '\0'; ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); @@ -182,7 +176,7 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) { // Successful read: Write the data to the file. if (this->wf_.write (result.message_block (), - result.bytes_transferred (), + result.message_block ().length (), this->file_offset_) == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write")); @@ -217,10 +211,6 @@ Receiver::handle_write_file (const ACE_Asynch_Write_File::Result &result) if (result.success ()) // Write successful: Increment file offset this->file_offset_ += result.bytes_transferred (); - - // This code is not robust enough to deal with short file writes - // (which hardly ever happen) ;-) - ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ()); } class Sender : public ACE_Handler @@ -351,6 +341,22 @@ Sender::open (const char *host, return 0; } +int +Sender::initiate_read_file (void) +{ + // Create Message_Block + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); + + // Inititiate an asynchronous read from the file + if (this->rf_.read (*mb, + mb->size () - 1, + this->file_offset_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::read"), -1); + + return 0; +} + int Sender::transmit_file (void) { @@ -410,28 +416,12 @@ Sender::handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result) done = 1; } -int -Sender::initiate_read_file (void) -{ - // Create Message_Block - ACE_Message_Block *mb = 0; - ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); - - // Inititiate an asynchronous read from the file - if (this->rf_.read (*mb, - mb->size () - 1, - this->file_offset_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::read"), -1); - - return 0; -} - void Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result) { ACE_DEBUG ((LM_DEBUG, "handle_read_file called\n")); - result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + result.message_block ().rd_ptr ()[result.message_block ().length ()] = '\0'; ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); @@ -449,7 +439,7 @@ Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result) // Read successful: increment offset and write data to network this->file_offset_ += result.bytes_transferred (); if (this->ws_.write (result.message_block (), - result.bytes_transferred ()) == -1) + result.message_block ().length ()) == -1) { ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write")); return; @@ -483,33 +473,15 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) ACE_DEBUG ((LM_DEBUG, "********************\n")); ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); - if (result.success ()) - { - // Partial write to socket - int unsent_data = result.bytes_to_write () - result.bytes_transferred (); - if (unsent_data != 0) - { - // Reset pointers - result.message_block ().rd_ptr (result.bytes_transferred ()); - - // Duplicate the message block and retry remaining data - if (this->ws_.write (*result.message_block ().duplicate (), - unsent_data) == -1) - { - ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write")); - return; - } - } - else if (!(this->file_size_ > this->file_offset_)) - { - this->stream_write_done_ = 1; - if (this->transmit_file_done_) - done = 1; - } - } - - // Release message block result.message_block ().release (); + + if (result.success ()) + if (!(this->file_size_ > this->file_offset_)) + { + this->stream_write_done_ = 1; + if (this->transmit_file_done_) + done = 1; + } } static int @@ -560,7 +532,7 @@ main (int argc, char *argv[]) if (host == 0) { if (acceptor.open (ACE_INET_Addr (port), - initial_read_size, + BUFSIZ, 1) == -1) return -1; } |