summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog-98b23
-rw-r--r--ace/Asynch_IO.cpp311
-rw-r--r--ace/Asynch_IO.h12
-rw-r--r--ace/OS.h36
-rw-r--r--ace/Proactor.cpp226
-rw-r--r--ace/Proactor.h161
-rw-r--r--ace/config-sunos5.6.h2
-rw-r--r--examples/Reactor/Proactor/Makefile40
-rw-r--r--examples/Reactor/Proactor/test_proactor_with_aio.cpp372
9 files changed, 984 insertions, 199 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b
index ffeeef839c6..d71b13aebfa 100644
--- a/ChangeLog-98b
+++ b/ChangeLog-98b
@@ -1,12 +1,31 @@
+Thu Jun 11 1998 Alexander Babu Arulanthu <alex@merengue.cs.wustl.edu>
+
+ * ace/OS.h: Added typedef's, #define's and #include's under the
+ definition ACE_HAS_AIO_CALLS for the proactor porting.
+
+ * ace/config-sunos5.6.h: Defined ACE_HAS_AIO_CALLS for
+ solaris2.6.
+
+ * ace/Asynch_IO.h:
+ * ace/Asynch_IO.cpp: Added aio call support for Asynch_Read_File. See
+ the code under #if defined (ACE_HAS_AIO_CALLS)
+
+ * ace/Proactor.h:
+ * ace/Proactor.cpp: Added support for AIO calls in Proactor. See
+ the code under #if defined (ACE_HAS_AIO_CALLS)
+
+ * examples/Reactor/Proactor/test_proactor_with_aio.cpp: The test
+ program for Proactor doing aio stuff.
+
Thu Jun 11 13:34:26 1998 Darrell Brunsch <brunsch@cs.wustl.edu>
* ace/OS.cpp: Made change to uname to use a different structure
- if certain Borland compilers are being used. Thanks to
+ if certain Borland compilers are being used. Thanks to
Valik Solorzano <valik@geodan.nl> for this fix.
Thu Jun 11 11:15:26 1998 Carlos O'Ryan <coryan@cs.wustl.edu>
- * ace/XtReactor.cpp:
+ * ace/XtReactor.cpp:
The notify_handler_ open method requires an extra argument.
Wed Jun 10 14:31:55 1998 David L. Levine <levine@cs.wustl.edu>
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp
index d0f76ff3325..a350e4ce366 100644
--- a/ace/Asynch_IO.cpp
+++ b/ace/Asynch_IO.cpp
@@ -4,13 +4,14 @@
#define ACE_BUILD_DLL
#include "ace/Asynch_IO.h"
-#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) \
+ || (defined (ACE_HAS_AIO_CALLS))
// This only works on Win32 platforms
#include "ace/Proactor.h"
#include "ace/Message_Block.h"
#include "ace/Service_Config.h"
-#include "ace/Inet_Addr.h"
+#include "ace/INET_Addr.h"
#if !defined (__ACE_INLINE__)
#include "ace/Asynch_IO.i"
@@ -28,7 +29,7 @@ ACE_Asynch_Result::ACE_Asynch_Result (ACE_Handler &handler,
completion_key_ (0),
error_ (0)
{
- // Set the OVERLAPPED structure
+ // Set the ACE_OVERLAPPED structure
this->Internal = 0;
this->InternalHigh = 0;
this->Offset = offset;
@@ -40,19 +41,19 @@ ACE_Asynch_Result::~ACE_Asynch_Result (void)
{
}
-u_long
+u_long
ACE_Asynch_Result::bytes_transferred (void) const
{
return this->bytes_transferred_;
}
-
+
const void *
ACE_Asynch_Result::act (void) const
{
return this->act_;
}
-int
+int
ACE_Asynch_Result::success (void) const
{
return this->success_;
@@ -64,25 +65,25 @@ ACE_Asynch_Result::completion_key (void) const
return this->completion_key_;
}
-u_long
+u_long
ACE_Asynch_Result::error (void) const
{
return this->error_;
}
-u_long
+u_long
ACE_Asynch_Result::offset (void) const
{
return this->Offset;
}
-u_long
+u_long
ACE_Asynch_Result::offset_high (void) const
{
return this->OffsetHigh;
}
-ACE_HANDLE
+ACE_HANDLE
ACE_Asynch_Result::event (void) const
{
return this->hEvent;
@@ -96,7 +97,7 @@ ACE_Asynch_Operation::ACE_Asynch_Operation (void)
{
}
-int
+int
ACE_Asynch_Operation::open (ACE_Handler &handler,
ACE_HANDLE handle,
const void *completion_key,
@@ -106,12 +107,15 @@ ACE_Asynch_Operation::open (ACE_Handler &handler,
this->handler_ = &handler;
this->handle_ = handle;
+ // @@
+ ACE_UNUSED_ARG (completion_key);
+
// Grab the handle from the <handler> if <handle> is invalid
if (this->handle_ == ACE_INVALID_HANDLE)
this->handle_ = this->handler_->handle ();
- if (this->handle_ == ACE_INVALID_HANDLE)
+ if (this->handle_ == ACE_INVALID_HANDLE)
return -1;
-
+
// If no proactor was passed
if (this->proactor_ == 0)
{
@@ -122,8 +126,13 @@ ACE_Asynch_Operation::open (ACE_Handler &handler,
this->proactor_ = ACE_Proactor::instance();
}
+#if !defined (ACE_HAS_AIO_CALLS)
// Register with the <proactor>
return this->proactor_->register_handle (this->handle_, completion_key);
+#else /* ACE_HAS_AIO_CALLS */
+ // AIO stuff is present. So no registering business.
+ return 1;
+#endif /* ACE_HAS_AIO_CALLS */
}
int
@@ -145,28 +154,65 @@ ACE_Asynch_Read_Stream::ACE_Asynch_Read_Stream (void)
{
}
-int
+int
ACE_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
u_long bytes_to_read,
const void *act)
{
// Create the Asynch_Result
Result *result = 0;
- ACE_NEW_RETURN (result,
+ ACE_NEW_RETURN (result,
Result (*this->handler_,
this->handle_,
message_block,
bytes_to_read,
act,
- this->proactor_->get_handle ()),
+ this->proactor_->get_handle ()),
-1);
return this->shared_read (result);
}
-int
+int
ACE_Asynch_Read_Stream::shared_read (ACE_Asynch_Read_Stream::Result *result)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ // Make a new AIOCB and issue aio_read, if queueing is possible
+ // store this with the Proactor, so that that can be used for
+ // aio_return6 and aio_error.
+ aiocb *aiocb_ptr;
+ ACE_NEW_RETURN (aiocb_ptr, aiocb, -1);
+
+ // Setup AIOCB.
+ // @@ Priority always 0?
+ // @@ Signal no, always?
+ aiocb_ptr->aio_fildes = result->handle ();
+ aiocb_ptr->aio_offset = result->Offset;
+ aiocb_ptr->aio_buf = result->message_block ().wr_ptr ();
+ aiocb_ptr->aio_nbytes = result->bytes_to_read ();
+ aiocb_ptr->aio_reqprio = 0;
+ aiocb_ptr->aio_sigevent.sigev_notify = SIGEV_NONE;
+ //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX;
+ aiocb_ptr->aio_sigevent.sigev_value.sival_ptr =
+ (void *) aiocb_ptr;
+
+ // Fire off the aio write.
+ if (aio_read (aiocb_ptr) < 0)
+ // Queuing failed.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Queueing faile for aio_read"),
+ -1);
+
+ // Success. Store the aiocb_ptr with Proactor.
+ if (this->proactor_->insert_to_aiocb_list (aiocb_ptr, result) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Fatal error : aio_read queuing suceeded, no place at aiocb_list"),
+ -1);
+
+ // Aio successfully issued and ptr stored.
+ return 1;
+
+#else /* ACE_HAS_AIO_CALLS */
u_long bytes_read;
// Initiate the read
@@ -197,6 +243,7 @@ ACE_Asynch_Read_Stream::shared_read (ACE_Asynch_Read_Stream::Result *result)
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("ReadFile")), -1);
}
+#endif /* ACE_HAS_AIO_CALLS */
}
// ************************************************************
@@ -208,13 +255,13 @@ ACE_Asynch_Read_Stream::Result::Result (ACE_Handler &handler,
const void* act,
ACE_HANDLE event)
: ACE_Asynch_Result (handler, act, event),
- handle_ (handle),
+ bytes_to_read_ (bytes_to_read),
message_block_ (message_block),
- bytes_to_read_ (bytes_to_read)
+ handle_ (handle)
{
}
-u_long
+u_long
ACE_Asynch_Read_Stream::Result::bytes_to_read (void) const
{
return this->bytes_to_read_;
@@ -226,14 +273,14 @@ ACE_Asynch_Read_Stream::Result::message_block (void) const
return this->message_block_;
}
-ACE_HANDLE
+ACE_HANDLE
ACE_Asynch_Read_Stream::Result::handle (void) const
{
return this->handle_;
}
-void
-ACE_Asynch_Read_Stream::Result::complete (u_long bytes_transferred,
+void
+ACE_Asynch_Read_Stream::Result::complete(u_long bytes_transferred,
int success,
const void *completion_key,
u_long error)
@@ -257,27 +304,31 @@ ACE_Asynch_Write_Stream::ACE_Asynch_Write_Stream (void)
{
}
-int
+int
ACE_Asynch_Write_Stream::write (ACE_Message_Block &message_block,
u_long bytes_to_write,
const void *act)
{
Result *result = 0;
- ACE_NEW_RETURN (result,
+ ACE_NEW_RETURN (result,
Result (*this->handler_,
this->handle_,
message_block,
bytes_to_write,
act,
- this->proactor_->get_handle ()),
+ this->proactor_->get_handle ()),
-1);
-
+
return this->shared_write (result);
}
-
-int
+
+int
ACE_Asynch_Write_Stream::shared_write (ACE_Asynch_Write_Stream::Result *result)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ ACE_UNUSED_ARG (result);
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
u_long bytes_written;
// Initiate the write
@@ -308,6 +359,7 @@ ACE_Asynch_Write_Stream::shared_write (ACE_Asynch_Write_Stream::Result *result)
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("WriteFile")), -1);
}
+#endif /* ACE_HAS_AIO_CALLS */
}
// ************************************************************
@@ -319,13 +371,13 @@ ACE_Asynch_Write_Stream::Result::Result (ACE_Handler &handler,
const void* act,
ACE_HANDLE event)
: ACE_Asynch_Result (handler, act, event),
- handle_ (handle),
+ bytes_to_write_ (bytes_to_write),
message_block_ (message_block),
- bytes_to_write_ (bytes_to_write)
+ handle_ (handle)
{
}
-u_long
+u_long
ACE_Asynch_Write_Stream::Result::bytes_to_write (void) const
{
return this->bytes_to_write_;
@@ -337,13 +389,13 @@ ACE_Asynch_Write_Stream::Result::message_block (void) const
return this->message_block_;
}
-ACE_HANDLE
+ACE_HANDLE
ACE_Asynch_Write_Stream::Result::handle (void) const
{
return this->handle_;
}
-void
+void
ACE_Asynch_Write_Stream::Result::complete (u_long bytes_transferred,
int success,
const void *completion_key,
@@ -364,7 +416,7 @@ ACE_Asynch_Write_Stream::Result::complete (u_long bytes_transferred,
// ************************************************************
-int
+int
ACE_Asynch_Read_File::read (ACE_Message_Block &message_block,
u_long bytes_to_read,
u_long offset,
@@ -372,7 +424,7 @@ ACE_Asynch_Read_File::read (ACE_Message_Block &message_block,
const void *act)
{
Result *result = 0;
- ACE_NEW_RETURN (result,
+ ACE_NEW_RETURN (result,
Result (*this->handler_,
this->handle_,
message_block,
@@ -380,9 +432,9 @@ ACE_Asynch_Read_File::read (ACE_Message_Block &message_block,
act,
offset,
offset_high,
- this->proactor_->get_handle ()),
+ this->proactor_->get_handle ()),
-1);
-
+
return this->shared_read (result);
}
@@ -402,13 +454,15 @@ ACE_Asynch_Read_File::Result::Result (ACE_Handler &handler,
this->OffsetHigh = offset_high;
}
-void
+void
ACE_Asynch_Read_File::Result::complete (u_long bytes_transferred,
int success,
const void *completion_key,
u_long error)
-{
- // Copy the data which was returned by GetQueuedCompletionStatus
+{
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Read_File::Result::complete\n"));
+
+ // Copy the data which was returned by GetQueuedCompletionStatus.
this->bytes_transferred_ = bytes_transferred;
this->success_ = success;
this->completion_key_ = completion_key;
@@ -417,13 +471,13 @@ ACE_Asynch_Read_File::Result::complete (u_long bytes_transferred,
// Appropriately move the pointers in the message block.
this->message_block_.wr_ptr (bytes_transferred);
- // Callback: notify <handler> of completion
+ // Callback: notify <handler> of completion.
this->handler_.handle_read_file (*this);
}
// ************************************************************
-int
+int
ACE_Asynch_Write_File::write (ACE_Message_Block &message_block,
u_long bytes_to_write,
u_long offset,
@@ -431,7 +485,7 @@ ACE_Asynch_Write_File::write (ACE_Message_Block &message_block,
const void *act)
{
Result *result = 0;
- ACE_NEW_RETURN (result,
+ ACE_NEW_RETURN (result,
Result (*this->handler_,
this->handle_,
message_block,
@@ -439,9 +493,9 @@ ACE_Asynch_Write_File::write (ACE_Message_Block &message_block,
act,
offset,
offset_high,
- this->proactor_->get_handle ()),
+ this->proactor_->get_handle ()),
-1);
-
+
return this->shared_write (result);
}
@@ -461,12 +515,12 @@ ACE_Asynch_Write_File::Result::Result (ACE_Handler &handler,
this->OffsetHigh = offset_high;
}
-void
+void
ACE_Asynch_Write_File::Result::complete (u_long bytes_transferred,
int success,
const void *completion_key,
u_long error)
-{
+{
// Copy the data which was returned by GetQueuedCompletionStatus
this->bytes_transferred_ = bytes_transferred;
this->success_ = success;
@@ -485,13 +539,20 @@ ACE_Asynch_Write_File::Result::complete (u_long bytes_transferred,
ACE_Asynch_Accept::ACE_Asynch_Accept (void)
{
}
-
-int
+
+int
ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
u_long bytes_to_read,
ACE_HANDLE accept_handle,
const void *act)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ ACE_UNUSED_ARG (message_block);
+ ACE_UNUSED_ARG (bytes_to_read);
+ ACE_UNUSED_ARG (accept_handle);
+ ACE_UNUSED_ARG (act);
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
#if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0))
// Sanity check: make sure that enough space has been allocated by the caller.
size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr);
@@ -501,7 +562,7 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
size_t space_needed = bytes_to_read + 2 * address_size;
if (available_space < space_needed)
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("Buffer too small\n")), -1);
-
+
int close_accept_handle = 0;
// If the <accept_handle> is invalid, we will create a new socket
if (accept_handle == ACE_INVALID_HANDLE)
@@ -512,21 +573,21 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
if (accept_handle == ACE_INVALID_HANDLE)
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("ACE_OS::socket")), -1);
else
- // Remember to close the socket down if failures occur.
+ // Remember to close the socket down if failures occur.
close_accept_handle = 1;
}
Result *result = 0;
- ACE_NEW_RETURN (result,
+ ACE_NEW_RETURN (result,
Result (*this->handler_,
this->handle_,
accept_handle,
message_block,
bytes_to_read,
act,
- this->proactor_->get_handle ()),
+ this->proactor_->get_handle ()),
-1);
-
+
u_long bytes_read;
// Initiate the accept
@@ -564,14 +625,15 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block,
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("ReadFile")), -1);
}
-#else
+#else /* ACE_HAS_WINNT4 ... */
ACE_NOTSUP_RETURN (-1);
#endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) */
+#endif /* ACE_HAS_AIO_CALLS */
}
-
+
// ************************************************************
-u_long
+u_long
ACE_Asynch_Accept::Result::bytes_to_read (void) const
{
return this->bytes_to_read_;
@@ -583,18 +645,18 @@ ACE_Asynch_Accept::Result::message_block (void) const
return this->message_block_;
}
-ACE_HANDLE
+ACE_HANDLE
ACE_Asynch_Accept::Result::listen_handle (void) const
{
return this->listen_handle_;
}
-
-ACE_HANDLE
+
+ACE_HANDLE
ACE_Asynch_Accept::Result::accept_handle (void) const
{
return this->accept_handle_;
}
-
+
ACE_Asynch_Accept::Result::Result (ACE_Handler &handler,
ACE_HANDLE listen_handle,
ACE_HANDLE accept_handle,
@@ -603,14 +665,14 @@ ACE_Asynch_Accept::Result::Result (ACE_Handler &handler,
const void* act,
ACE_HANDLE event)
: ACE_Asynch_Result (handler, act, event),
- listen_handle_ (listen_handle),
- accept_handle_ (accept_handle),
+ bytes_to_read_ (bytes_to_read),
message_block_ (message_block),
- bytes_to_read_ (bytes_to_read)
+ listen_handle_ (listen_handle),
+ accept_handle_ (accept_handle)
{
-}
-
-void
+}
+
+void
ACE_Asynch_Accept::Result::complete (u_long bytes_transferred,
int success,
const void *completion_key,
@@ -628,26 +690,38 @@ ACE_Asynch_Accept::Result::complete (u_long bytes_transferred,
// Callback: notify <handler> of completion
this->handler_.handle_accept (*this);
}
-
+
// ************************************************************
ACE_Asynch_Transmit_File::ACE_Asynch_Transmit_File (void)
{
}
-
-int
+
+int
ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
Header_And_Trailer *header_and_trailer,
- u_long bytes_to_write,
+ u_long bytes_to_write,
u_long offset,
u_long offset_high,
u_long bytes_per_send,
u_long flags,
const void *act)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ ACE_UNUSED_ARG (file);
+ ACE_UNUSED_ARG (header_and_trailer);
+ ACE_UNUSED_ARG (bytes_to_write);
+ ACE_UNUSED_ARG (offset);
+ ACE_UNUSED_ARG (offset_high);
+ ACE_UNUSED_ARG (bytes_per_send);
+ ACE_UNUSED_ARG (flags);
+ ACE_UNUSED_ARG (act);
+
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
#if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0))
Result *result = 0;
- ACE_NEW_RETURN (result,
+ ACE_NEW_RETURN (result,
Result (*this->handler_,
this->handle_,
file,
@@ -660,11 +734,11 @@ ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
act,
this->proactor_->get_handle ()),
-1);
-
- LPTRANSMIT_FILE_BUFFERS transmit_buffers = 0;
+
+ ACE_LPTRANSMIT_FILE_BUFFERS transmit_buffers = 0;
if (result->header_and_trailer () != 0)
transmit_buffers = result->header_and_trailer ()->transmit_buffers ();
-
+
// Initiate the transmit file
int initiate_result = ::TransmitFile ((SOCKET) result->socket (),
result->file (),
@@ -694,12 +768,13 @@ ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
delete result;
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("TransmitFile")), -1);
- }
-#else
+ }
+#else /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) */
ACE_NOTSUP_RETURN (-1);
#endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) */
+#endif /* ACE_HAS_AIO_CALLS */
}
-
+
// ************************************************************
ACE_HANDLE
@@ -708,7 +783,7 @@ ACE_Asynch_Transmit_File::Result::socket (void) const
return this->socket_;
}
-ACE_HANDLE
+ACE_HANDLE
ACE_Asynch_Transmit_File::Result::file (void) const
{
return this->file_;
@@ -719,25 +794,25 @@ ACE_Asynch_Transmit_File::Result::header_and_trailer (void) const
{
return this->header_and_trailer_;
}
-
-u_long
+
+u_long
ACE_Asynch_Transmit_File::Result::bytes_to_write (void) const
{
return this->bytes_to_write_;
}
-u_long
+u_long
ACE_Asynch_Transmit_File::Result::bytes_per_send (void) const
{
return this->bytes_per_send_;
}
-u_long
+u_long
ACE_Asynch_Transmit_File::Result::flags (void) const
{
return this->flags_;
}
-
+
ACE_Asynch_Transmit_File::Result::Result (ACE_Handler &handler,
ACE_HANDLE socket,
ACE_HANDLE file,
@@ -759,7 +834,7 @@ ACE_Asynch_Transmit_File::Result::Result (ACE_Handler &handler,
{
}
-void
+void
ACE_Asynch_Transmit_File::Result::complete (u_long bytes_transferred,
int success,
const void *completion_key,
@@ -780,16 +855,16 @@ ACE_Asynch_Transmit_File::Result::complete (u_long bytes_transferred,
ACE_Message_Block *header = this->header_and_trailer_->header ();
if (header != 0)
header->rd_ptr (this->header_and_trailer_->header_bytes ());
-
+
ACE_Message_Block *trailer = this->header_and_trailer_->trailer ();
if (trailer != 0)
- trailer->rd_ptr (this->header_and_trailer_->trailer_bytes ());
+ trailer->rd_ptr (this->header_and_trailer_->trailer_bytes ());
}
- */
+ */
// Callback: notify <handler> of completion
- this->handler_.handle_transmit_file (*this);
-}
+ this->handler_.handle_transmit_file (*this);
+}
// ************************************************************
@@ -822,19 +897,19 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::header (void) const
return this->header_;
}
-void
+void
ACE_Asynch_Transmit_File::Header_And_Trailer::header (ACE_Message_Block *message_block)
{
this->header_ = message_block;
}
-u_long
+u_long
ACE_Asynch_Transmit_File::Header_And_Trailer::header_bytes (void) const
{
return this->header_bytes_;
}
-
-void
+
+void
ACE_Asynch_Transmit_File::Header_And_Trailer::header_bytes (u_long bytes)
{
this->header_bytes_ = bytes;
@@ -846,34 +921,35 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::trailer (void) const
return this->trailer_;
}
-void
+void
ACE_Asynch_Transmit_File::Header_And_Trailer::trailer (ACE_Message_Block *message_block)
{
this->trailer_ = message_block;
}
-u_long
+u_long
ACE_Asynch_Transmit_File::Header_And_Trailer::trailer_bytes (void) const
{
return this->trailer_bytes_;
}
-void
+void
ACE_Asynch_Transmit_File::Header_And_Trailer::trailer_bytes (u_long bytes)
{
this->trailer_bytes_ = bytes;
}
-LPTRANSMIT_FILE_BUFFERS
-ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void)
+#if 0
+ACE_LPTRANSMIT_FILE_BUFFERS
+ACE_Asynch_Transmit_Fie::Header_And_Trailer::transmit_buffers (void)
{
// If both are zero, return zero
if (this->header_ == 0 && this->trailer_ == 0)
return 0;
- else
+ else
{
// Something is valid
-
+
// If header is valid, set the fields
if (this->header_ != 0)
{
@@ -884,7 +960,7 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void)
{
this->transmit_buffers_.Head = 0;
this->transmit_buffers_.HeadLength = 0;
- }
+ }
// If trailer is valid, set the fields
if (this->trailer_ != 0)
@@ -896,12 +972,13 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void)
{
this->transmit_buffers_.Tail = 0;
this->transmit_buffers_.TailLength = 0;
- }
+ }
// Return the transmit buffers
return &this->transmit_buffers_;
}
}
+#endif /* 0 */
// ************************************************************
@@ -925,19 +1002,19 @@ ACE_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
ACE_UNUSED_ARG (result);
}
-void
+void
ACE_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
ACE_UNUSED_ARG (result);
}
-void
+void
ACE_Handler::handle_accept (const ACE_Asynch_Accept::Result &result)
{
ACE_UNUSED_ARG (result);
}
-void
+void
ACE_Handler::handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result)
{
ACE_UNUSED_ARG (result);
@@ -949,14 +1026,14 @@ ACE_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result)
ACE_UNUSED_ARG (result);
}
-void
+void
ACE_Handler::handle_write_file (const ACE_Asynch_Write_File::Result &result)
{
ACE_UNUSED_ARG (result);
}
/*
-void
+void
ACE_Handler::handle_notify (const ACE_Asynch_Notify::Result &result)
{
ACE_UNUSED_ARG (result);
@@ -977,7 +1054,7 @@ ACE_Handler::proactor (void)
return this->proactor_;
}
-void
+void
ACE_Handler::proactor (ACE_Proactor *p)
{
this->proactor_ = p;
@@ -999,27 +1076,27 @@ ACE_Service_Handler::~ACE_Service_Handler (void)
{
}
-void
+void
ACE_Service_Handler::addresses (const ACE_INET_Addr &remote_address,
const ACE_INET_Addr &local_address)
{
- // Default behavior is to print out the addresses.
+ // Default behavior is to print out the addresses.
ASYS_TCHAR local_address_buf[BUFSIZ], remote_address_buf[BUFSIZ];
if (local_address.addr_to_string (local_address_buf, sizeof local_address_buf) == -1)
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("can't obtain local_address's address string")));
-
+
if (remote_address.addr_to_string (remote_address_buf, sizeof remote_address_buf) == -1)
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("can't obtain remote_address's address string")));
-
+
ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("On fd %d\n"), this->handle ()));
ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("local address %s\n"), local_address_buf));
- ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("remote address %s\n"), remote_address_buf));
+ ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("remote address %s\n"), remote_address_buf));
}
void
ACE_Service_Handler::open (ACE_HANDLE,
ACE_Message_Block &)
{
-}
+}
-#endif /* ACE_WIN32 */
+#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/
diff --git a/ace/Asynch_IO.h b/ace/Asynch_IO.h
index 4a3a34aac99..ba9f8370e04 100644
--- a/ace/Asynch_IO.h
+++ b/ace/Asynch_IO.h
@@ -27,7 +27,8 @@
#include "ace/OS.h"
-#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
+#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \
+ (defined (ACE_HAS_AIO_CALLS))
// Forward declarations
class ACE_Proactor;
@@ -35,7 +36,7 @@ class ACE_Handler;
class ACE_Message_Block;
class ACE_INET_Addr;
-class ACE_Export ACE_Asynch_Result : public OVERLAPPED
+class ACE_Export ACE_Asynch_Result : protected ACE_OVERLAPPED
{
// = TITLE
// An abstract class which adds information to the OVERLAPPED
@@ -724,7 +725,7 @@ public:
void trailer_bytes (u_long bytes);
// Size of the trailer data.
- LPTRANSMIT_FILE_BUFFERS transmit_buffers (void);
+ ACE_LPTRANSMIT_FILE_BUFFERS transmit_buffers (void);
// Conversion routine.
protected:
@@ -740,7 +741,7 @@ public:
u_long trailer_bytes_;
// Size of trailer data.
- TRANSMIT_FILE_BUFFERS transmit_buffers_;
+ ACE_TRANSMIT_FILE_BUFFERS transmit_buffers_;
// Target data structure.
};
};
@@ -801,7 +802,6 @@ public:
// Get the I/O handle used by this <handler>. This method will be
// called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is
// passed to <open>.
-
protected:
ACE_Proactor *proactor_;
// The proactor associated with this handler.
@@ -853,5 +853,5 @@ public:
#include "ace/Asynch_IO.i"
#endif /* __ACE_INLINE__ */
-#endif /* ACE_WIN32 */
+#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/
#endif /* ACE_ASYNCH_IO_H */
diff --git a/ace/OS.h b/ace/OS.h
index 0ef32ceaf71..82625bc9af5 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -3084,6 +3084,10 @@ typedef void (*__sighandler_t)(int); // keep Signal compilation happy
# include /**/ <unistd.h>
# endif /* ACE_LACKS_UNISTD_H */
+# if defined (ACE_HAS_AIO_CALLS)
+# include /**/ <aio.h>
+# endif /* ACE_HAS_AIO_CALLS */
+
# if !defined (ACE_LACKS_PARAM_H)
# include /**/ <sys/param.h>
# endif /* ACE_LACKS_PARAM_H */
@@ -5774,4 +5778,36 @@ ACE_Auto_Basic_Array_Ptr<char> (ACE_WString (WIDE_STRING).char_rep ()).get ()
# define ASYS_WIDE_STRING(ASCII_STRING) ASCII_STRING
#endif /* ACE_HAS_MOSTLY_UNICODE_APIS */
+#if defined (ACE_WIN32)
+typedef TRANSMIT_FILE_BUFFERS ACE_TRANSMIT_FILE_BUFFERS;
+typedef LPTRANSMIT_FILE_BUFFERS ACE_LPTRANSMIT_FILE_BUFFERS;
+typedef PTRANSMIT_FILE_BUFFERS ACE_PTRANSMIT_FILE_BUFFERS;
+
+#define ACE_INFINITE INFINITE
+#define ACE_STATUS_TIMEOUT STATUS_TIMEOUT
+#define ACE_WAIT_FAILED WAIT_FAILED
+#define ACE_WAIT_TIMEOUT WAIT_TIMEOUT
+
+#define ACE_FALSE FALSE
+#define ACE_TRUE TRUE
+#else /* ACE_WIN32 */
+struct ACE_TRANSMIT_FILE_BUFFERS
+{
+ void *Head;
+ size_t HeadLength;
+ void *Tail;
+ size_t TailLength;
+};
+typedef ACE_TRANSMIT_FILE_BUFFERS* ACE_PTRANSMIT_FILE_BUFFERS;
+typedef ACE_TRANSMIT_FILE_BUFFERS* ACE_LPTRANSMIT_FILE_BUFFERS;
+
+#define ACE_INFINITE LONG_MAX
+#define ACE_STATUS_TIMEOUT LONG_MAX
+#define ACE_WAIT_FAILED LONG_MAX
+#define ACE_WAIT_TIMEOUT LONG_MAX
+
+#define ACE_TRUE 1
+#define ACE_FALSE 0
+#endif /* ACE_WIN32 */
+
#endif /* ACE_OS_H */
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index ae41e67d811..52553a5a6f8 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -4,8 +4,10 @@
#define ACE_BUILD_DLL
#include "ace/Proactor.h"
-#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
-// This only works on Win32 platforms
+#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) \
+ || (defined (ACE_HAS_AIO_CALLS))
+// This only works on Win32 platforms and on Unix platforms with aio
+// calls.
#include "ace/Task_T.h"
#include "ace/Log_Msg.h"
@@ -89,13 +91,16 @@ ACE_Proactor_Timer_Handler::~ACE_Proactor_Timer_Handler (void)
int
ACE_Proactor_Timer_Handler::svc (void)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
u_long time;
ACE_Time_Value absolute_time;
for (; !this->shutting_down_;)
{
// default value
- time = INFINITE;
+ time = ACE_INFINITE;
// If the timer queue is not empty
if (!this->proactor_.timer_queue ()->is_empty ())
@@ -118,17 +123,18 @@ ACE_Proactor_Timer_Handler::svc (void)
time);
switch (result)
{
- case WAIT_TIMEOUT:
+ case ACE_WAIT_TIMEOUT:
// timeout: expire timers
this->proactor_.timer_queue ()->expire ();
break;
- case WAIT_FAILED:
+ case ACE_WAIT_FAILED:
// error
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("WaitForSingleObject")), -1);
}
}
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
ACE_Proactor_Handle_Timeout_Upcall::ACE_Proactor_Handle_Timeout_Upcall (void)
@@ -205,13 +211,36 @@ ACE_Proactor_Handle_Timeout_Upcall::proactor (ACE_Proactor &proactor)
ACE_Proactor::ACE_Proactor (size_t number_of_threads,
Timer_Queue *tq,
int used_with_reactor_event_loop)
- : completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!!
- number_of_threads_ (number_of_threads),
- timer_queue_ (0),
- delete_timer_queue_ (0),
- timer_handler_ (0),
- used_with_reactor_event_loop_ (used_with_reactor_event_loop)
-{
+ :
+#if defined (ACE_HAS_AIO_CALLS)
+ #if defined (AIO_LISTIO_MAX)
+ aiocb_list_max_size_ (AIO_LISTIO_MAX),
+ #else /* AIO_LISTIO_MAX */
+ aiocb_list_max_size_ (2),
+ #endif /* AIO_LISTIO_MAX */
+
+ aiocb_list_cur_size_ (0),
+#else /* ACE_HAS_AIO_CALLS */
+ completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!!
+#endif /* ACE_HAS_AIO_CALLS */
+
+ number_of_threads_ (number_of_threads),
+ timer_queue_ (0),
+ delete_timer_queue_ (0),
+ timer_handler_ (0),
+ used_with_reactor_event_loop_ (used_with_reactor_event_loop)
+{
+#if defined (ACE_HAS_AIO_CALLS)
+ // Init the array.
+ for (size_t ai = 0;
+ ai < this->aiocb_list_max_size_;
+ ai++)
+ {
+ aiocb_list_ [ai] = 0;
+ result_list_ [ai] = 0;
+ }
+ ACE_UNUSED_ARG (tq);
+#else /* ACE_HAS_AIO_CALLS */
// create the completion port
this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE,
this->completion_port_,
@@ -229,7 +258,7 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads,
// activate <timer_handler>
if (this->timer_handler_->activate (THR_NEW_LWP | THR_DETACHED) == -1)
ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p Could not create thread\n"), ASYS_TEXT ("Task::activate")));
-
+#endif /* ACE_HAS_AIO_CALLS */
}
ACE_Proactor *
@@ -350,6 +379,9 @@ ACE_Proactor::~ACE_Proactor (void)
int
ACE_Proactor::close (void)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
// Take care of the timer handler
if (this->timer_handler_)
{
@@ -374,12 +406,18 @@ ACE_Proactor::close (void)
}
else
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
int
ACE_Proactor::register_handle (ACE_HANDLE handle,
const void *completion_key)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ ACE_UNUSED_ARG (handle);
+ ACE_UNUSED_ARG (completion_key);
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
// No locking is needed here as no state changes
ACE_HANDLE cp = ::CreateIoCompletionPort (handle,
this->completion_port_,
@@ -394,6 +432,7 @@ ACE_Proactor::register_handle (ACE_HANDLE handle,
ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("CreateIoCompletionPort")), -1);
}
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
long
@@ -491,15 +530,22 @@ ACE_Proactor::handle_close (ACE_HANDLE handle,
return this->close ();
}
+
+// @@ get_handle () implementation.
ACE_HANDLE
ACE_Proactor::get_handle (void) const
{
+#if defined (ACE_HAS_AIO_CALLS)
+ return ACE_INVALID_HANDLE;
+#else /* ACE_HAS_AIO_CALLS */
if (this->used_with_reactor_event_loop_)
return this->event_.handle ();
else
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
+
int
ACE_Proactor::handle_events (ACE_Time_Value &wait_time)
{
@@ -511,13 +557,80 @@ ACE_Proactor::handle_events (ACE_Time_Value &wait_time)
int
ACE_Proactor::handle_events (void)
{
- return this->handle_events (INFINITE);
+ return this->handle_events (ACE_INFINITE);
}
int
ACE_Proactor::handle_events (unsigned long milli_seconds)
{
- OVERLAPPED *overlapped = 0;
+#if defined (ACE_HAS_AIO_CALLS)
+ // Is there any entries in the list.
+ if (this->aiocb_list_cur_size_ == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG, "No AIO pending"));
+ return 0;
+ }
+
+ // Wait for asynch operation to complete.
+ timespec timeout;
+ timeout.tv_sec = milli_seconds;
+ timeout.tv_nsec = 0;
+ if (aio_suspend (this->aiocb_list_,
+ this->aiocb_list_max_size_,
+ &timeout) < 0)
+ // If failure is coz of timeout, then return *0* but set errno
+ // appropriately. This is what the Win proactor does.
+ if (errno == EINTR)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p):aio_suspend"),
+ 0);
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p):aio_suspend"),
+ 0);
+
+ // Check which aio has finished.
+ size_t ai;
+ for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
+ // Analyze error and return values.
+ if (aio_error (aiocb_list_ [ai]) != EINPROGRESS)
+ {
+ if (aio_return (aiocb_list_ [ai]) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%p):AIO failed"),
+ -1);
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "An aio has finished\n"));
+ // This AIO is done.
+ break;
+ }
+ }
+ if (ai == this->aiocb_list_max_size_)
+ // Nothing completed.
+ return 0;
+
+ // Get the values for the completed aio.
+ size_t bytes_transferred = aiocb_list_[ai]->aio_nbytes;
+ void *completion_key = (void *)aiocb_list_[ai]->aio_sigevent.sigev_value.sival_ptr;
+ ACE_Asynch_Result *asynch_result = this->result_list_[ai];
+
+ // Invalidate entry in the aiocb list.
+ delete this->aiocb_list_[ai];
+ this->aiocb_list_[ai] = 0;
+ this->aiocb_list_cur_size_--;
+ this->result_list_[ai] = 0;
+
+ // Call the application code.
+ this->application_specific_code (asynch_result,
+ bytes_transferred,
+ ACE_TRUE,
+ completion_key,
+ 0);
+
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
+ ACE_OVERLAPPED *overlapped = 0;
u_long bytes_transferred = 0;
u_long completion_key = 0;
@@ -556,6 +669,7 @@ ACE_Proactor::handle_events (unsigned long milli_seconds)
errno);
}
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
void
@@ -583,6 +697,10 @@ ACE_Proactor::application_specific_code (ACE_Asynch_Result *asynch_result,
int
ACE_Proactor::post_completion (ACE_Asynch_Result *result)
{
+#if defined (ACE_HAS_AIO_CALLS)
+ ACE_UNUSED_ARG (result);
+ return 0;
+#else /* ACE_HAS_AIO_CALLS */
// Grab the event associated with the Proactor
HANDLE handle = this->get_handle ();
@@ -603,6 +721,7 @@ ACE_Proactor::post_completion (ACE_Asynch_Result *result)
}
return 0;
+#endif /* ACE_HAS_AIO_CALLS */
}
int
@@ -684,6 +803,81 @@ ACE_Proactor::Asynch_Timer::complete (u_long bytes_transferred,
this->handler_.handle_time_out (this->time_, this->act ());
}
+int
+ACE_Proactor::insert_to_aiocb_list (aiocb *aiocb_ptr,
+ ACE_Asynch_Result *result)
+{
+ // Is there any place?
+ if (this->aiocb_list_cur_size_ >= this->aiocb_list_max_size_)
+ return -1;
+
+ // Find the first free slot.
+ size_t ai;
+ for (ai = 0;
+ ai < this->aiocb_list_max_size_;
+ ai++)
+ if (this->aiocb_list_ [ai] == 0)
+ break;
+
+ if (ai == this->aiocb_list_max_size_)
+ return -1;
+
+ // Store the pointers.
+ this->aiocb_list_ [ai] = aiocb_ptr;
+ this->result_list_ [ai] = result;
+ this->aiocb_list_cur_size_ ++;
+ return 0;
+}
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Timer_Queue_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Queue_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_List_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_List_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Node_T<ACE_Handler *>;
+template class ACE_Unbounded_Set<ACE_Timer_Node_T<ACE_Handler *> *>;
+template class ACE_Unbounded_Set_Iterator<ACE_Timer_Node_T<ACE_Handler *> *>;
+template class ACE_Node <ACE_Timer_Node_T<ACE_Handler *> *>;
+template class ACE_Free_List<ACE_Timer_Node_T<ACE_Handler *> >;
+template class ACE_Locked_Free_List<ACE_Timer_Node_T<ACE_Handler *>, ACE_Null_Mutex>;
+template class ACE_Timer_Heap_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Heap_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Wheel_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+template class ACE_Timer_Wheel_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Timer_Queue_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Queue_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_List_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_List_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Heap_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Heap_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Wheel_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#pragma instantiate ACE_Timer_Wheel_Iterator_T<ACE_Handler *,\
+ ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
#else /* ACE_WIN32 */
ACE_Proactor *
@@ -731,4 +925,4 @@ ACE_Proactor::event_loop_done (void)
{
return sig_atomic_t (1);
}
-#endif /* ACE_WIN32 */
+#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/
diff --git a/ace/Proactor.h b/ace/Proactor.h
index f672335f37f..cfd3d555922 100644
--- a/ace/Proactor.h
+++ b/ace/Proactor.h
@@ -5,19 +5,20 @@
//
// = LIBRARY
// ace
-//
+//
// = FILENAME
// Proactor.h
//
// = AUTHOR
// Irfan Pyarali (irfan@cs.wustl.edu)
// Tim Harrison (harrison@cs.wustl.edu)
-//
+//
// ============================================================================
#if !defined (ACE_PROACTOR_H)
#define ACE_PROACTOR_H
+#include "ace/OS.h"
#include "ace/Asynch_IO.h"
#include "ace/Thread_Manager.h"
#include "ace/Event_Handler.h"
@@ -26,9 +27,13 @@
#include "ace/Timer_List.h"
#include "ace/Timer_Heap.h"
#include "ace/Timer_Wheel.h"
+#include "ace/Free_List.h"
-#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)
-// This only works on Win32 platforms
+
+#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \
+ (defined (ACE_HAS_AIO_CALLS))
+// This only works on Win32 platforms and on Unix platforms supporting
+// aio calls.
// Forward declarations.
class ACE_Asynch_Result;
@@ -37,7 +42,7 @@ class ACE_Proactor;
class ACE_Export ACE_Proactor_Handle_Timeout_Upcall
{
- // = TITLE
+ // = TITLE
// Functor for Timer_Queues.
//
// = DESCRIPTION
@@ -48,10 +53,10 @@ public:
// Proactor has special privileges
// Access needed to: proactor ()
- typedef ACE_Timer_Queue_T<ACE_Handler *,
- ACE_Proactor_Handle_Timeout_Upcall,
+ typedef ACE_Timer_Queue_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX> TIMER_QUEUE;
-
+
ACE_Proactor_Handle_Timeout_Upcall (void);
// Constructor
@@ -60,7 +65,7 @@ public:
const void *arg,
const ACE_Time_Value &cur_time);
// This method is called when the timer expires
-
+
int cancellation (TIMER_QUEUE &timer_queue,
ACE_Handler *handler);
// This method is called when the timer is canceled
@@ -83,7 +88,7 @@ class ACE_Export ACE_Proactor : public ACE_Event_Handler
{
// = TITLE
// A Proactor for asynchronous I/O events.
- //
+ //
// = DESCRIPTION
// A manager for the I/O completion port.
public:
@@ -91,47 +96,47 @@ public:
// Timer Handler has special privileges because
// Access needed to: thr_mgr_
- friend class ACE_Proactor_Handle_Timeout_Upcall;
+ friend class ACE_Proactor_Handle_Timeout_Upcall;
// Access needed to: Asynch_Timer, and completion_port_
// = Here are the typedefs that the <ACE_Proactor> uses.
- typedef ACE_Timer_Queue_T<ACE_Handler *,
- ACE_Proactor_Handle_Timeout_Upcall,
+ typedef ACE_Timer_Queue_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX> Timer_Queue;
- typedef ACE_Timer_Queue_Iterator_T<ACE_Handler *,
- ACE_Proactor_Handle_Timeout_Upcall,
+ typedef ACE_Timer_Queue_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX> Timer_Queue_Iterator;
- typedef ACE_Timer_List_T<ACE_Handler *,
- ACE_Proactor_Handle_Timeout_Upcall,
+ typedef ACE_Timer_List_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX> Timer_List;
- typedef ACE_Timer_List_Iterator_T<ACE_Handler *,
- ACE_Proactor_Handle_Timeout_Upcall,
+ typedef ACE_Timer_List_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX> Timer_List_Iterator;
- typedef ACE_Timer_Heap_T<ACE_Handler *,
- ACE_Proactor_Handle_Timeout_Upcall,
- ACE_SYNCH_RECURSIVE_MUTEX> Timer_Heap;
- typedef ACE_Timer_Heap_Iterator_T<ACE_Handler *,
- ACE_Proactor_Handle_Timeout_Upcall,
+ typedef ACE_Timer_Heap_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
+ ACE_SYNCH_RECURSIVE_MUTEX> Timer_Heap;
+ typedef ACE_Timer_Heap_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX> Timer_Heap_Iterator;
- typedef ACE_Timer_Wheel_T<ACE_Handler *,
- ACE_Proactor_Handle_Timeout_Upcall,
+ typedef ACE_Timer_Wheel_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX> Timer_Wheel;
- typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler *,
- ACE_Proactor_Handle_Timeout_Upcall,
+ typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler *,
+ ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX> Timer_Wheel_Iterator;
- ACE_Proactor (size_t number_of_threads = 0,
+ ACE_Proactor (size_t number_of_threads = 0,
Timer_Queue *tq = 0,
int used_with_reactor_event_loop = 0);
// A do nothing constructor.
-
+
virtual ~ACE_Proactor (void);
// Virtual destruction.
-
+
static ACE_Proactor *instance (size_t threads = 0);
// Get pointer to a process-wide <ACE_Proactor>. <threads> should
// be part of another method. It's only here because I'm just a
@@ -163,13 +168,14 @@ public:
virtual int close (void);
// Close the IO completion port
-
- virtual int register_handle (ACE_HANDLE handle,
+
+ virtual int register_handle (ACE_HANDLE handle,
const void *completion_key);
- // This method adds the <handle> to the I/O completion port
-
- // = Timer management.
- virtual long schedule_timer (ACE_Handler &handler,
+ // This method adds the <handle> to the I/O completion port. This
+ // function is a no-op function for Unix systems.
+
+ // = Timer management.
+ virtual long schedule_timer (ACE_Handler &handler,
const void *act,
const ACE_Time_Value &time);
// Schedule a <handler> that will expire after <time>. If it
@@ -182,14 +188,14 @@ public:
// with accidentally deleting the wrong timer. Returns -1 on
// failure (which is guaranteed never to be a valid <timer_id>.
- virtual long schedule_repeating_timer (ACE_Handler &handler,
+ virtual long schedule_repeating_timer (ACE_Handler &handler,
const void *act,
- const ACE_Time_Value &interval);
-
+ const ACE_Time_Value &interval);
+
// Same as above except <interval> it is used to reschedule the
// <handler> automatically.
-
- virtual long schedule_timer (ACE_Handler &handler,
+
+ virtual long schedule_timer (ACE_Handler &handler,
const void *act,
const ACE_Time_Value &time,
const ACE_Time_Value &interval);
@@ -201,7 +207,7 @@ public:
// Cancel all timers associated with this <handler>. Returns number
// of timers cancelled.
- virtual int cancel_timer (long timer_id,
+ virtual int cancel_timer (long timer_id,
const void **act = 0,
int dont_call_handle_close = 1);
// Cancel the single <ACE_Handler> that matches the <timer_id> value
@@ -215,9 +221,13 @@ public:
virtual int handle_events (ACE_Time_Value &wait_time);
// Dispatch a single set of events. If <wait_time> elapses before
// any events occur, return.
+ // Return 0 on success, non-zero (-1) on timeouts/errors and errno
+ // is set accordingly.
virtual int handle_events (void);
// Block indefinitely until at least one event is dispatched.
+ // Return 0 on success, non-zero (-1) on timeouts/errors and errno
+ // is set accordingly.
virtual int post_completion (ACE_Asynch_Result *result);
// Post a result to the completion port of the Proactor. If errors
@@ -243,9 +253,21 @@ public:
virtual ACE_HANDLE get_handle (void) const;
// Get the event handle.
-
+
+
+ int insert_to_aiocb_list (aiocb *aiocb_ptr, ACE_Asynch_Result *result);
+ // @@
+ // This call is for Unix aio_ calls.
+ // This method is used by ACE_Asynch_Operation to store some
+ // information with the Proactor.
+ // Inserting this aiocb_ptr to the array so that aio_return and
+ // aio_error can make use of that. Inserting result so that we can
+ // call the application back through complete.
+ // @@ Can array be full? That means, the aio issue is successful,
+ // but there are already AIO_LIST_AIO_MAX of calls pending. I will
+ // have to go for something other than arrays then.
+
protected:
-
virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
// Called when object is signaled by OS (either via UNIX signals or
// when a Win32 object becomes signaled).
@@ -261,11 +283,11 @@ protected:
u_long error);
// Protect against structured exceptions caused by user code when
// dispatching handles
-
+
virtual int handle_events (unsigned long milli_seconds);
// Dispatch a single set of events. If <milli_seconds> elapses
// before any events occur, return.
-
+
class ACE_Export Asynch_Timer : protected ACE_Asynch_Result
{
// = TITLE
@@ -280,28 +302,52 @@ protected:
Asynch_Timer (ACE_Handler &handler,
const void *act,
- const ACE_Time_Value &tv,
+ const ACE_Time_Value &tv,
ACE_HANDLE event = ACE_INVALID_HANDLE);
-
+
protected:
virtual void complete (u_long bytes_transferred,
int success,
const void *completion_key,
- u_long error = 0);
- // This method calls the <handler>'s handle_timeout method
-
+ u_long error = 0);
+ // This method calls the <handler>'s handle_timeout method
+
ACE_Time_Value time_;
// Time value requested by caller
- };
-
+ };
+
+
+
+#if defined (ACE_HAS_AIO_CALLS)
+ // Let us have an array ot keep track of the all the aio's issued
+ // currently. My intuition is to limit the array size to Maximum
+ // Aios that can be issued thru' a lio_list call.
+ // @@ AIO_LISTIO_MAX is something else in LynxOS!!!
+#if defined (AIO_LISTIO_MAX)
+ aiocb *aiocb_list_ [AIO_LISTIO_MAX];
+ ACE_Asynch_Result *result_list_ [AIO_LISTIO_MAX];
+#else /* AIO_LISTIO_MAX */
+ // Minimum is 2.
+ struct aiocb *aiocb_list_ [2];
+ ACE_Asynch_Result *result_list_ [2];
+#endif /* AIO_LIST_AIO_MAX */
+
+ size_t aiocb_list_max_size_;
+ // To maintain the maximum size of the array (list).
+
+ size_t aiocb_list_cur_size_;
+ // To maintain the current size of the array (list).
+
+#else /* ACE_HAS_AIO_CALLS */
ACE_HANDLE completion_port_;
- // Handle for the completion port
-
+ // Handle for the completion port.
+#endif /* ACE_HAS_AIO_CALLS */
+
size_t number_of_threads_;
// This number is passed to the CreatIOCompletionPort() system call
Timer_Queue *timer_queue_;
- // Timer Queue
+ // Timer Queue
int delete_timer_queue_;
// Flag on whether to delete the timer queue
@@ -370,4 +416,3 @@ public:
#endif /* ACE_WIN32 */
#endif /* ACE_PROACTOR_H */
-
diff --git a/ace/config-sunos5.6.h b/ace/config-sunos5.6.h
index c3e09e14673..0f260e66578 100644
--- a/ace/config-sunos5.6.h
+++ b/ace/config-sunos5.6.h
@@ -27,4 +27,6 @@
// SunOS 5.6 does support sched_get_priority_{min,max}
#undef ACE_THR_PRI_FIFO_DEF
+// SunOS 5.6 has AIO calls.
+#define ACE_HAS_AIO_CALLS
#endif /* ACE_CONFIG_H */
diff --git a/examples/Reactor/Proactor/Makefile b/examples/Reactor/Proactor/Makefile
new file mode 100644
index 00000000000..4423a939c4f
--- /dev/null
+++ b/examples/Reactor/Proactor/Makefile
@@ -0,0 +1,40 @@
+#----------------------------------------------------------------------------
+# $Id$
+#
+# Makefile for aio calls test program.
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Local macros
+#----------------------------------------------------------------------------
+
+BIN = test_proactor_with_aio
+LSRC = $(addsuffix .cpp,$(BIN))
+VLDLIBS = $(LDLIBS:%=%$(VAR))
+BUILD = $(VBIN)
+
+#----------------------------------------------------------------------------
+# Include macros and targets
+#----------------------------------------------------------------------------
+
+include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU
+include $(ACE_ROOT)/include/makeinclude/macros.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.common.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU
+include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
+include $(ACE_ROOT)/include/makeinclude/platform_macros.GNU
+#----------------------------------------------------------------------------
+# Local targets
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Dependencies
+#----------------------------------------------------------------------------
+
+# DO NOT DELETE THIS LINE -- g++dep uses it.
+# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
+
+
+
+# IF YOU PUT ANYTHING HERE IT WILL GO AWAY
diff --git a/examples/Reactor/Proactor/test_proactor_with_aio.cpp b/examples/Reactor/Proactor/test_proactor_with_aio.cpp
new file mode 100644
index 00000000000..26c09612a08
--- /dev/null
+++ b/examples/Reactor/Proactor/test_proactor_with_aio.cpp
@@ -0,0 +1,372 @@
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// examples
+//
+// = FILENAME
+// test_proactor_with_aio.cpp
+//
+// = DESCRIPTION
+// This program illustrates how the ACE_Proactor can be used to
+// implement an application that does various asynchronous
+// operations.
+//
+// = AUTHOR
+// Alexander Babu Arulanthu <alex@cs.wustl.edu>
+//
+// ============================================================================
+
+#include "ace/Service_Config.h"
+#include "ace/Proactor.h"
+#include "ace/Asynch_IO.h"
+#include "ace/Asynch_Acceptor.h"
+#include "ace/INET_Addr.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/SOCK_Acceptor.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Message_Block.h"
+#include "ace/Get_Opt.h"
+#include "ace/streams.h"
+
+static char *file = "test_proactor.cpp";
+static char *dump_file = "aio_out.log";
+static int done = 0;
+
+class Sender : public ACE_Handler
+{
+ // = TITLE
+ // Sender
+ //
+ // = DESCRIPTION
+ // The class will be created by main(). After connecting to the
+ // host, this class will then read data from a file and send it
+ // to the network connection.
+public:
+ Sender (void);
+ // Constructor.
+
+ ~Sender (void);
+ // Destructor.
+
+ int open (const char *host,
+ u_short port);
+ // ACE_HANDLE handle (void) const;
+
+protected:
+
+#if 0
+ // These methods are called by the freamwork
+ virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result);
+ // This is called when asynchronous transmit files complete.
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when asynchronous writes from the socket complete.
+#endif /* 0 */
+
+ virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
+ // This is called when asynchronous reads from the file complete.
+
+#if 0
+ virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
+ // This is called when asynchronous write to a file complete.
+#endif /* 0 */
+
+private:
+ //int transmit_file (void);
+
+ int initiate_read_file (void);
+ // Intiate read from the file.
+
+ //ACE_SOCK_Stream stream_;
+ // Network I/O handle
+
+ //ACE_Asynch_Write_Stream ws_;
+ // ws (write stream): for writing to the socket
+
+ ACE_Asynch_Read_File rf_;
+ // rf (read file): for reading from the file.
+
+ ACE_HANDLE input_file_;
+ // File to read from.
+
+ u_long file_offset_;
+ // Current file offset.
+
+ u_long file_size_;
+ // File size.
+
+ //ACE_Message_Block welcome_message_;
+ // Welcome message
+
+ //ACE_Asynch_Transmit_File::Header_And_Trailer header_and_trailer_;
+ // Header and trailer which goes with transmit_file
+
+ //int stream_write_done_;
+ //int transmit_file_done_;
+ // These flags help to determine when to close down the event loop
+};
+
+
+Sender::Sender (void)
+ : input_file_ (ACE_INVALID_HANDLE),
+ file_offset_ (0),
+ file_size_ (0)
+ //stream_write_done_ (0),
+ //transmit_file_done_ (0)
+{
+ // Moment of inspiration :-)
+ //static char *data = "Welcome to Irfan World! Irfan RULES here !!";
+ //this->welcome_message_.init (data, ACE_OS::strlen (data));
+ //this->welcome_message_.wr_ptr (ACE_OS::strlen (data));
+}
+
+Sender::~Sender (void)
+{
+ //this->stream_.close ();
+}
+
+#if 0
+ACE_HANDLE
+Sender::handle (void) const
+{
+ return this->stream_.get_handle ();
+}
+#endif /* 0 */
+
+int
+Sender::open (const char *host,
+ u_short port)
+{
+ // Initialize stuff.
+
+ // Open input file (in OVERLAPPED mode)
+ this->input_file_ = ACE_OS::open (file, GENERIC_READ | FILE_FLAG_OVERLAPPED);
+ if (this->input_file_ == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::open"), -1);
+
+ // Find file size
+ this->file_size_ = ACE_OS::filesize (this->input_file_);
+ ACE_DEBUG ((LM_DEBUG, "Input file size :%d\n", this->file_size_));
+#if 0
+ // Connect to remote host
+ ACE_INET_Addr address (port, host);
+ ACE_SOCK_Connector connector;
+ if (connector.connect (this->stream_,
+ address) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_SOCK_Connector::connect"), -1);
+
+ // Open ACE_Asynch_Write_Stream
+ if (this->ws_.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::open"), -1);
+#endif /* 0 */
+
+ // Open ACE_Asynch_Read_File
+ if (this->rf_.open (*this, this->input_file_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::open"), -1);
+
+ // Start an asynchronous transmit file
+ // if (this->transmit_file () == -1)
+ // return -1;
+
+ // Start an asynchronous read file
+ if (this->initiate_read_file () == -1)
+ return -1;
+
+ return 0;
+}
+
+#if 0
+int
+Sender::transmit_file (void)
+{
+ // Open file (in SEQUENTIAL_SCAN mode)
+ ACE_HANDLE file_handle = ACE_OS::open (file, GENERIC_READ | FILE_FLAG_SEQUENTIAL_SCAN);
+ if (file_handle == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::open"), -1);
+
+ // Open ACE_Asynch_Transmit_File
+ ACE_Asynch_Transmit_File tf;
+ if (tf.open (*this) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Transmit_File::open"), -1);
+
+ // Header and trailer data for the file
+ this->header_and_trailer_.header_and_trailer (&this->welcome_message_,
+ this->welcome_message_.length (),
+ &this->welcome_message_,
+ this->welcome_message_.length ());
+
+ // Starting position
+ cerr << "Staring position: " << ACE_OS::lseek (file_handle, 0L, SEEK_CUR) << endl;
+
+ // Send it
+ if (tf.transmit_file (file_handle,
+ &this->header_and_trailer_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Transmit_File::transmit_file"), -1);
+
+ return 0;
+}
+
+void
+Sender::handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG, "handle_transmit_file called\n"));
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "socket", result.socket ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "file", result.file ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_per_send", result.bytes_per_send ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+
+ // Ending position
+ cerr << "Ending position: " << ACE_OS::lseek (result.file (), 0L, SEEK_CUR) << endl;
+
+ // Done with file
+ ACE_OS::close (result.file ());
+
+ this->transmit_file_done_ = 1;
+ if (this->stream_write_done_)
+ done = 1;
+}
+#endif /* 0 */
+
+int
+Sender::initiate_read_file (void)
+{
+ // Create Message_Block
+ ACE_Message_Block *mb = 0;
+ ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1);
+
+ // Inititiate an asynchronous read from the file
+ if (this->rf_.read (*mb,
+ mb->size () - 1,
+ this->file_offset_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::read"), -1);
+
+ return 0;
+}
+
+void
+Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG, "handle_read_file called\n"));
+
+ result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+
+
+ if (result.success ())
+ {
+ // Read successful: increment offset and write data to network.
+ this->file_offset_ += result.bytes_transferred ();
+
+ // @@ Just print out the buffer.
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+
+#if 0
+ if (this->ws_.write (result.message_block (),
+ result.bytes_transferred ()) == -1)
+ {
+ ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write"));
+ return;
+ }
+#endif /* 0 */
+
+ if (this->file_size_ > this->file_offset_)
+ {
+ // Start an asynchronous read file
+ if (initiate_read_file () == -1)
+ return;
+ }
+ else
+ done = 1;
+ }
+}
+
+#if 0
+void
+Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
+
+ // Reset pointers
+ result.message_block ().rd_ptr (-result.bytes_transferred ());
+
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
+ ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
+ ACE_DEBUG ((LM_DEBUG, "********************\n"));
+ ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
+
+ if (result.success ())
+ {
+ // Partial write to socket
+ int unsent_data = result.bytes_to_write () - result.bytes_transferred ();
+ if (unsent_data != 0)
+ {
+ // Reset pointers
+ result.message_block ().rd_ptr (result.bytes_transferred ());
+
+ // Duplicate the message block and retry remaining data
+ if (this->ws_.write (*result.message_block ().duplicate (),
+ unsent_data) == -1)
+ {
+ ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write"));
+ return;
+ }
+ }
+ else if (!(this->file_size_ > this->file_offset_))
+ {
+ this->stream_write_done_ = 1;
+ if (this->transmit_file_done_)
+ done = 1;
+ }
+ }
+
+ // Release message block
+ result.message_block ().release ();
+}
+#endif /* 0 */
+
+int
+main (int argc, char *argv [])
+{
+ ACE_DEBUG ((LM_DEBUG, "test_proactor_with_aio\n"));
+
+ ACE_UNUSED_ARG (argc);
+ ACE_UNUSED_ARG (argv);
+ ACE_UNUSED_ARG (dump_file);
+
+ // Create a sender. It reads from a file.
+ Sender sender;
+
+ sender.open (0, 0);
+
+ int error = 0;
+ while (!error && !done)
+ // dispatch events
+ error = ACE_Proactor::instance ()->handle_events ();
+
+ return 0;
+}