summaryrefslogtreecommitdiff
path: root/examples/Reactor/Proactor/test_proactor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'examples/Reactor/Proactor/test_proactor.cpp')
-rw-r--r--examples/Reactor/Proactor/test_proactor.cpp100
1 files changed, 36 insertions, 64 deletions
diff --git a/examples/Reactor/Proactor/test_proactor.cpp b/examples/Reactor/Proactor/test_proactor.cpp
index f6ebed26c53..4582fb82888 100644
--- a/examples/Reactor/Proactor/test_proactor.cpp
+++ b/examples/Reactor/Proactor/test_proactor.cpp
@@ -34,7 +34,6 @@ static u_short port = ACE_DEFAULT_SERVER_PORT;
static char *file = "test_proactor.cpp";
static char *dump_file = "output";
static int done = 0;
-static int initial_read_size = BUFSIZ;
class Receiver : public ACE_Service_Handler
//
@@ -130,18 +129,13 @@ Receiver::open (ACE_HANDLE handle,
return;
}
- // Duplicate the message block so that we can keep it around
- ACE_Message_Block &duplicate = *message_block.duplicate ();
-
- // Initial data (data which came with the AcceptEx call)
- ACE_Asynch_Read_Stream::Result fake_result (*this,
- this->handle_,
- duplicate,
- initial_read_size,
- 0,
- ACE_INVALID_HANDLE);
- // This will call the callback
- fake_result.complete (message_block.length (), 1, 0);
+ // Print any initial data which came with the AcceptEx call
+ message_block.rd_ptr ()[message_block.length ()] = '\0';
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "Initial data", message_block.rd_ptr ()));
+
+ // Initiate new read from the stream
+ if (this->initiate_read_stream () == -1)
+ return;
}
int
@@ -165,7 +159,7 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n"));
// Reset pointers
- result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+ result.message_block ().rd_ptr ()[result.message_block ().length ()] = '\0';
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
@@ -182,7 +176,7 @@ Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
// Successful read: Write the data to the file.
if (this->wf_.write (result.message_block (),
- result.bytes_transferred (),
+ result.message_block ().length (),
this->file_offset_) == -1)
{
ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_File::write"));
@@ -217,10 +211,6 @@ Receiver::handle_write_file (const ACE_Asynch_Write_File::Result &result)
if (result.success ())
// Write successful: Increment file offset
this->file_offset_ += result.bytes_transferred ();
-
- // This code is not robust enough to deal with short file writes
- // (which hardly ever happen) ;-)
- ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
}
class Sender : public ACE_Handler
@@ -351,6 +341,22 @@ Sender::open (const char *host,
return 0;
}
+int
+Sender::initiate_read_file (void)
+{
+ // Create Message_Block
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1);
+
+ // Inititiate an asynchronous read from the file
+ if (this->rf_.read (*mb,
+ mb->size () - 1,
+ this->file_offset_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::read"), -1);
+
+ return 0;
+}
+
int
Sender::transmit_file (void)
{
@@ -410,28 +416,12 @@ Sender::handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result)
done = 1;
}
-int
-Sender::initiate_read_file (void)
-{
- // Create Message_Block
- ACE_Message_Block *mb = 0;
- ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1);
-
- // Inititiate an asynchronous read from the file
- if (this->rf_.read (*mb,
- mb->size () - 1,
- this->file_offset_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::read"), -1);
-
- return 0;
-}
-
void
Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result)
{
ACE_DEBUG ((LM_DEBUG, "handle_read_file called\n"));
- result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+ result.message_block ().rd_ptr ()[result.message_block ().length ()] = '\0';
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
@@ -449,7 +439,7 @@ Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result)
// Read successful: increment offset and write data to network
this->file_offset_ += result.bytes_transferred ();
if (this->ws_.write (result.message_block (),
- result.bytes_transferred ()) == -1)
+ result.message_block ().length ()) == -1)
{
ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write"));
return;
@@ -483,33 +473,15 @@ Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
ACE_DEBUG ((LM_DEBUG, "********************\n"));
ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
- if (result.success ())
- {
- // Partial write to socket
- int unsent_data = result.bytes_to_write () - result.bytes_transferred ();
- if (unsent_data != 0)
- {
- // Reset pointers
- result.message_block ().rd_ptr (result.bytes_transferred ());
-
- // Duplicate the message block and retry remaining data
- if (this->ws_.write (*result.message_block ().duplicate (),
- unsent_data) == -1)
- {
- ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write"));
- return;
- }
- }
- else if (!(this->file_size_ > this->file_offset_))
- {
- this->stream_write_done_ = 1;
- if (this->transmit_file_done_)
- done = 1;
- }
- }
-
- // Release message block
result.message_block ().release ();
+
+ if (result.success ())
+ if (!(this->file_size_ > this->file_offset_))
+ {
+ this->stream_write_done_ = 1;
+ if (this->transmit_file_done_)
+ done = 1;
+ }
}
static int
@@ -560,7 +532,7 @@ main (int argc, char *argv[])
if (host == 0)
{
if (acceptor.open (ACE_INET_Addr (port),
- initial_read_size,
+ BUFSIZ,
1) == -1)
return -1;
}