diff options
-rw-r--r-- | ChangeLog-98b | 23 | ||||
-rw-r--r-- | ace/Asynch_IO.cpp | 311 | ||||
-rw-r--r-- | ace/Asynch_IO.h | 12 | ||||
-rw-r--r-- | ace/OS.h | 36 | ||||
-rw-r--r-- | ace/Proactor.cpp | 226 | ||||
-rw-r--r-- | ace/Proactor.h | 161 | ||||
-rw-r--r-- | ace/config-sunos5.6.h | 2 | ||||
-rw-r--r-- | examples/Reactor/Proactor/Makefile | 40 | ||||
-rw-r--r-- | examples/Reactor/Proactor/test_proactor_with_aio.cpp | 372 |
9 files changed, 984 insertions, 199 deletions
diff --git a/ChangeLog-98b b/ChangeLog-98b index ffeeef839c6..d71b13aebfa 100644 --- a/ChangeLog-98b +++ b/ChangeLog-98b @@ -1,12 +1,31 @@ +Thu Jun 11 1998 Alexander Babu Arulanthu <alex@merengue.cs.wustl.edu> + + * ace/OS.h: Added typedef's, #define's and #include's under the + definition ACE_HAS_AIO_CALLS for the proactor porting. + + * ace/config-sunos5.6.h: Defined ACE_HAS_AIO_CALLS for + solaris2.6. + + * ace/Asynch_IO.h: + * ace/Asynch_IO.cpp: Added aio call support for Asynch_Read_File. See + the code under #if defined (ACE_HAS_AIO_CALLS) + + * ace/Proactor.h: + * ace/Proactor.cpp: Added support for AIO calls in Proactor. See + the code under #if defined (ACE_HAS_AIO_CALLS) + + * examples/Reactor/Proactor/test_proactor_with_aio.cpp: The test + program for Proactor doing aio stuff. + Thu Jun 11 13:34:26 1998 Darrell Brunsch <brunsch@cs.wustl.edu> * ace/OS.cpp: Made change to uname to use a different structure - if certain Borland compilers are being used. Thanks to + if certain Borland compilers are being used. Thanks to Valik Solorzano <valik@geodan.nl> for this fix. Thu Jun 11 11:15:26 1998 Carlos O'Ryan <coryan@cs.wustl.edu> - * ace/XtReactor.cpp: + * ace/XtReactor.cpp: The notify_handler_ open method requires an extra argument. Wed Jun 10 14:31:55 1998 David L. Levine <levine@cs.wustl.edu> diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp index d0f76ff3325..a350e4ce366 100644 --- a/ace/Asynch_IO.cpp +++ b/ace/Asynch_IO.cpp @@ -4,13 +4,14 @@ #define ACE_BUILD_DLL #include "ace/Asynch_IO.h" -#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) \ + || (defined (ACE_HAS_AIO_CALLS)) // This only works on Win32 platforms #include "ace/Proactor.h" #include "ace/Message_Block.h" #include "ace/Service_Config.h" -#include "ace/Inet_Addr.h" +#include "ace/INET_Addr.h" #if !defined (__ACE_INLINE__) #include "ace/Asynch_IO.i" @@ -28,7 +29,7 @@ ACE_Asynch_Result::ACE_Asynch_Result (ACE_Handler &handler, completion_key_ (0), error_ (0) { - // Set the OVERLAPPED structure + // Set the ACE_OVERLAPPED structure this->Internal = 0; this->InternalHigh = 0; this->Offset = offset; @@ -40,19 +41,19 @@ ACE_Asynch_Result::~ACE_Asynch_Result (void) { } -u_long +u_long ACE_Asynch_Result::bytes_transferred (void) const { return this->bytes_transferred_; } - + const void * ACE_Asynch_Result::act (void) const { return this->act_; } -int +int ACE_Asynch_Result::success (void) const { return this->success_; @@ -64,25 +65,25 @@ ACE_Asynch_Result::completion_key (void) const return this->completion_key_; } -u_long +u_long ACE_Asynch_Result::error (void) const { return this->error_; } -u_long +u_long ACE_Asynch_Result::offset (void) const { return this->Offset; } -u_long +u_long ACE_Asynch_Result::offset_high (void) const { return this->OffsetHigh; } -ACE_HANDLE +ACE_HANDLE ACE_Asynch_Result::event (void) const { return this->hEvent; @@ -96,7 +97,7 @@ ACE_Asynch_Operation::ACE_Asynch_Operation (void) { } -int +int ACE_Asynch_Operation::open (ACE_Handler &handler, ACE_HANDLE handle, const void *completion_key, @@ -106,12 +107,15 @@ ACE_Asynch_Operation::open (ACE_Handler &handler, this->handler_ = &handler; this->handle_ = handle; + // @@ + ACE_UNUSED_ARG (completion_key); + // Grab the handle from the <handler> if <handle> is invalid if (this->handle_ == ACE_INVALID_HANDLE) this->handle_ = this->handler_->handle (); - if (this->handle_ == ACE_INVALID_HANDLE) + if (this->handle_ == ACE_INVALID_HANDLE) return -1; - + // If no proactor was passed if (this->proactor_ == 0) { @@ -122,8 +126,13 @@ ACE_Asynch_Operation::open (ACE_Handler &handler, this->proactor_ = ACE_Proactor::instance(); } +#if !defined (ACE_HAS_AIO_CALLS) // Register with the <proactor> return this->proactor_->register_handle (this->handle_, completion_key); +#else /* ACE_HAS_AIO_CALLS */ + // AIO stuff is present. So no registering business. + return 1; +#endif /* ACE_HAS_AIO_CALLS */ } int @@ -145,28 +154,65 @@ ACE_Asynch_Read_Stream::ACE_Asynch_Read_Stream (void) { } -int +int ACE_Asynch_Read_Stream::read (ACE_Message_Block &message_block, u_long bytes_to_read, const void *act) { // Create the Asynch_Result Result *result = 0; - ACE_NEW_RETURN (result, + ACE_NEW_RETURN (result, Result (*this->handler_, this->handle_, message_block, bytes_to_read, act, - this->proactor_->get_handle ()), + this->proactor_->get_handle ()), -1); return this->shared_read (result); } -int +int ACE_Asynch_Read_Stream::shared_read (ACE_Asynch_Read_Stream::Result *result) { +#if defined (ACE_HAS_AIO_CALLS) + // Make a new AIOCB and issue aio_read, if queueing is possible + // store this with the Proactor, so that that can be used for + // aio_return6 and aio_error. + aiocb *aiocb_ptr; + ACE_NEW_RETURN (aiocb_ptr, aiocb, -1); + + // Setup AIOCB. + // @@ Priority always 0? + // @@ Signal no, always? + aiocb_ptr->aio_fildes = result->handle (); + aiocb_ptr->aio_offset = result->Offset; + aiocb_ptr->aio_buf = result->message_block ().wr_ptr (); + aiocb_ptr->aio_nbytes = result->bytes_to_read (); + aiocb_ptr->aio_reqprio = 0; + aiocb_ptr->aio_sigevent.sigev_notify = SIGEV_NONE; + //this->this->aiocb_.aio_sigevent.sigev_signo = SIGRTMAX; + aiocb_ptr->aio_sigevent.sigev_value.sival_ptr = + (void *) aiocb_ptr; + + // Fire off the aio write. + if (aio_read (aiocb_ptr) < 0) + // Queuing failed. + ACE_ERROR_RETURN ((LM_ERROR, + "Queueing faile for aio_read"), + -1); + + // Success. Store the aiocb_ptr with Proactor. + if (this->proactor_->insert_to_aiocb_list (aiocb_ptr, result) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Fatal error : aio_read queuing suceeded, no place at aiocb_list"), + -1); + + // Aio successfully issued and ptr stored. + return 1; + +#else /* ACE_HAS_AIO_CALLS */ u_long bytes_read; // Initiate the read @@ -197,6 +243,7 @@ ACE_Asynch_Read_Stream::shared_read (ACE_Asynch_Read_Stream::Result *result) ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("ReadFile")), -1); } +#endif /* ACE_HAS_AIO_CALLS */ } // ************************************************************ @@ -208,13 +255,13 @@ ACE_Asynch_Read_Stream::Result::Result (ACE_Handler &handler, const void* act, ACE_HANDLE event) : ACE_Asynch_Result (handler, act, event), - handle_ (handle), + bytes_to_read_ (bytes_to_read), message_block_ (message_block), - bytes_to_read_ (bytes_to_read) + handle_ (handle) { } -u_long +u_long ACE_Asynch_Read_Stream::Result::bytes_to_read (void) const { return this->bytes_to_read_; @@ -226,14 +273,14 @@ ACE_Asynch_Read_Stream::Result::message_block (void) const return this->message_block_; } -ACE_HANDLE +ACE_HANDLE ACE_Asynch_Read_Stream::Result::handle (void) const { return this->handle_; } -void -ACE_Asynch_Read_Stream::Result::complete (u_long bytes_transferred, +void +ACE_Asynch_Read_Stream::Result::complete(u_long bytes_transferred, int success, const void *completion_key, u_long error) @@ -257,27 +304,31 @@ ACE_Asynch_Write_Stream::ACE_Asynch_Write_Stream (void) { } -int +int ACE_Asynch_Write_Stream::write (ACE_Message_Block &message_block, u_long bytes_to_write, const void *act) { Result *result = 0; - ACE_NEW_RETURN (result, + ACE_NEW_RETURN (result, Result (*this->handler_, this->handle_, message_block, bytes_to_write, act, - this->proactor_->get_handle ()), + this->proactor_->get_handle ()), -1); - + return this->shared_write (result); } - -int + +int ACE_Asynch_Write_Stream::shared_write (ACE_Asynch_Write_Stream::Result *result) { +#if defined (ACE_HAS_AIO_CALLS) + ACE_UNUSED_ARG (result); + return 0; +#else /* ACE_HAS_AIO_CALLS */ u_long bytes_written; // Initiate the write @@ -308,6 +359,7 @@ ACE_Asynch_Write_Stream::shared_write (ACE_Asynch_Write_Stream::Result *result) ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("WriteFile")), -1); } +#endif /* ACE_HAS_AIO_CALLS */ } // ************************************************************ @@ -319,13 +371,13 @@ ACE_Asynch_Write_Stream::Result::Result (ACE_Handler &handler, const void* act, ACE_HANDLE event) : ACE_Asynch_Result (handler, act, event), - handle_ (handle), + bytes_to_write_ (bytes_to_write), message_block_ (message_block), - bytes_to_write_ (bytes_to_write) + handle_ (handle) { } -u_long +u_long ACE_Asynch_Write_Stream::Result::bytes_to_write (void) const { return this->bytes_to_write_; @@ -337,13 +389,13 @@ ACE_Asynch_Write_Stream::Result::message_block (void) const return this->message_block_; } -ACE_HANDLE +ACE_HANDLE ACE_Asynch_Write_Stream::Result::handle (void) const { return this->handle_; } -void +void ACE_Asynch_Write_Stream::Result::complete (u_long bytes_transferred, int success, const void *completion_key, @@ -364,7 +416,7 @@ ACE_Asynch_Write_Stream::Result::complete (u_long bytes_transferred, // ************************************************************ -int +int ACE_Asynch_Read_File::read (ACE_Message_Block &message_block, u_long bytes_to_read, u_long offset, @@ -372,7 +424,7 @@ ACE_Asynch_Read_File::read (ACE_Message_Block &message_block, const void *act) { Result *result = 0; - ACE_NEW_RETURN (result, + ACE_NEW_RETURN (result, Result (*this->handler_, this->handle_, message_block, @@ -380,9 +432,9 @@ ACE_Asynch_Read_File::read (ACE_Message_Block &message_block, act, offset, offset_high, - this->proactor_->get_handle ()), + this->proactor_->get_handle ()), -1); - + return this->shared_read (result); } @@ -402,13 +454,15 @@ ACE_Asynch_Read_File::Result::Result (ACE_Handler &handler, this->OffsetHigh = offset_high; } -void +void ACE_Asynch_Read_File::Result::complete (u_long bytes_transferred, int success, const void *completion_key, u_long error) -{ - // Copy the data which was returned by GetQueuedCompletionStatus +{ + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Read_File::Result::complete\n")); + + // Copy the data which was returned by GetQueuedCompletionStatus. this->bytes_transferred_ = bytes_transferred; this->success_ = success; this->completion_key_ = completion_key; @@ -417,13 +471,13 @@ ACE_Asynch_Read_File::Result::complete (u_long bytes_transferred, // Appropriately move the pointers in the message block. this->message_block_.wr_ptr (bytes_transferred); - // Callback: notify <handler> of completion + // Callback: notify <handler> of completion. this->handler_.handle_read_file (*this); } // ************************************************************ -int +int ACE_Asynch_Write_File::write (ACE_Message_Block &message_block, u_long bytes_to_write, u_long offset, @@ -431,7 +485,7 @@ ACE_Asynch_Write_File::write (ACE_Message_Block &message_block, const void *act) { Result *result = 0; - ACE_NEW_RETURN (result, + ACE_NEW_RETURN (result, Result (*this->handler_, this->handle_, message_block, @@ -439,9 +493,9 @@ ACE_Asynch_Write_File::write (ACE_Message_Block &message_block, act, offset, offset_high, - this->proactor_->get_handle ()), + this->proactor_->get_handle ()), -1); - + return this->shared_write (result); } @@ -461,12 +515,12 @@ ACE_Asynch_Write_File::Result::Result (ACE_Handler &handler, this->OffsetHigh = offset_high; } -void +void ACE_Asynch_Write_File::Result::complete (u_long bytes_transferred, int success, const void *completion_key, u_long error) -{ +{ // Copy the data which was returned by GetQueuedCompletionStatus this->bytes_transferred_ = bytes_transferred; this->success_ = success; @@ -485,13 +539,20 @@ ACE_Asynch_Write_File::Result::complete (u_long bytes_transferred, ACE_Asynch_Accept::ACE_Asynch_Accept (void) { } - -int + +int ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, u_long bytes_to_read, ACE_HANDLE accept_handle, const void *act) { +#if defined (ACE_HAS_AIO_CALLS) + ACE_UNUSED_ARG (message_block); + ACE_UNUSED_ARG (bytes_to_read); + ACE_UNUSED_ARG (accept_handle); + ACE_UNUSED_ARG (act); + return 0; +#else /* ACE_HAS_AIO_CALLS */ #if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) // Sanity check: make sure that enough space has been allocated by the caller. size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr); @@ -501,7 +562,7 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, size_t space_needed = bytes_to_read + 2 * address_size; if (available_space < space_needed) ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("Buffer too small\n")), -1); - + int close_accept_handle = 0; // If the <accept_handle> is invalid, we will create a new socket if (accept_handle == ACE_INVALID_HANDLE) @@ -512,21 +573,21 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, if (accept_handle == ACE_INVALID_HANDLE) ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("ACE_OS::socket")), -1); else - // Remember to close the socket down if failures occur. + // Remember to close the socket down if failures occur. close_accept_handle = 1; } Result *result = 0; - ACE_NEW_RETURN (result, + ACE_NEW_RETURN (result, Result (*this->handler_, this->handle_, accept_handle, message_block, bytes_to_read, act, - this->proactor_->get_handle ()), + this->proactor_->get_handle ()), -1); - + u_long bytes_read; // Initiate the accept @@ -564,14 +625,15 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("ReadFile")), -1); } -#else +#else /* ACE_HAS_WINNT4 ... */ ACE_NOTSUP_RETURN (-1); #endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) */ +#endif /* ACE_HAS_AIO_CALLS */ } - + // ************************************************************ -u_long +u_long ACE_Asynch_Accept::Result::bytes_to_read (void) const { return this->bytes_to_read_; @@ -583,18 +645,18 @@ ACE_Asynch_Accept::Result::message_block (void) const return this->message_block_; } -ACE_HANDLE +ACE_HANDLE ACE_Asynch_Accept::Result::listen_handle (void) const { return this->listen_handle_; } - -ACE_HANDLE + +ACE_HANDLE ACE_Asynch_Accept::Result::accept_handle (void) const { return this->accept_handle_; } - + ACE_Asynch_Accept::Result::Result (ACE_Handler &handler, ACE_HANDLE listen_handle, ACE_HANDLE accept_handle, @@ -603,14 +665,14 @@ ACE_Asynch_Accept::Result::Result (ACE_Handler &handler, const void* act, ACE_HANDLE event) : ACE_Asynch_Result (handler, act, event), - listen_handle_ (listen_handle), - accept_handle_ (accept_handle), + bytes_to_read_ (bytes_to_read), message_block_ (message_block), - bytes_to_read_ (bytes_to_read) + listen_handle_ (listen_handle), + accept_handle_ (accept_handle) { -} - -void +} + +void ACE_Asynch_Accept::Result::complete (u_long bytes_transferred, int success, const void *completion_key, @@ -628,26 +690,38 @@ ACE_Asynch_Accept::Result::complete (u_long bytes_transferred, // Callback: notify <handler> of completion this->handler_.handle_accept (*this); } - + // ************************************************************ ACE_Asynch_Transmit_File::ACE_Asynch_Transmit_File (void) { } - -int + +int ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, Header_And_Trailer *header_and_trailer, - u_long bytes_to_write, + u_long bytes_to_write, u_long offset, u_long offset_high, u_long bytes_per_send, u_long flags, const void *act) { +#if defined (ACE_HAS_AIO_CALLS) + ACE_UNUSED_ARG (file); + ACE_UNUSED_ARG (header_and_trailer); + ACE_UNUSED_ARG (bytes_to_write); + ACE_UNUSED_ARG (offset); + ACE_UNUSED_ARG (offset_high); + ACE_UNUSED_ARG (bytes_per_send); + ACE_UNUSED_ARG (flags); + ACE_UNUSED_ARG (act); + + return 0; +#else /* ACE_HAS_AIO_CALLS */ #if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) Result *result = 0; - ACE_NEW_RETURN (result, + ACE_NEW_RETURN (result, Result (*this->handler_, this->handle_, file, @@ -660,11 +734,11 @@ ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, act, this->proactor_->get_handle ()), -1); - - LPTRANSMIT_FILE_BUFFERS transmit_buffers = 0; + + ACE_LPTRANSMIT_FILE_BUFFERS transmit_buffers = 0; if (result->header_and_trailer () != 0) transmit_buffers = result->header_and_trailer ()->transmit_buffers (); - + // Initiate the transmit file int initiate_result = ::TransmitFile ((SOCKET) result->socket (), result->file (), @@ -694,12 +768,13 @@ ACE_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, delete result; ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("TransmitFile")), -1); - } -#else + } +#else /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) */ ACE_NOTSUP_RETURN (-1); #endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) */ +#endif /* ACE_HAS_AIO_CALLS */ } - + // ************************************************************ ACE_HANDLE @@ -708,7 +783,7 @@ ACE_Asynch_Transmit_File::Result::socket (void) const return this->socket_; } -ACE_HANDLE +ACE_HANDLE ACE_Asynch_Transmit_File::Result::file (void) const { return this->file_; @@ -719,25 +794,25 @@ ACE_Asynch_Transmit_File::Result::header_and_trailer (void) const { return this->header_and_trailer_; } - -u_long + +u_long ACE_Asynch_Transmit_File::Result::bytes_to_write (void) const { return this->bytes_to_write_; } -u_long +u_long ACE_Asynch_Transmit_File::Result::bytes_per_send (void) const { return this->bytes_per_send_; } -u_long +u_long ACE_Asynch_Transmit_File::Result::flags (void) const { return this->flags_; } - + ACE_Asynch_Transmit_File::Result::Result (ACE_Handler &handler, ACE_HANDLE socket, ACE_HANDLE file, @@ -759,7 +834,7 @@ ACE_Asynch_Transmit_File::Result::Result (ACE_Handler &handler, { } -void +void ACE_Asynch_Transmit_File::Result::complete (u_long bytes_transferred, int success, const void *completion_key, @@ -780,16 +855,16 @@ ACE_Asynch_Transmit_File::Result::complete (u_long bytes_transferred, ACE_Message_Block *header = this->header_and_trailer_->header (); if (header != 0) header->rd_ptr (this->header_and_trailer_->header_bytes ()); - + ACE_Message_Block *trailer = this->header_and_trailer_->trailer (); if (trailer != 0) - trailer->rd_ptr (this->header_and_trailer_->trailer_bytes ()); + trailer->rd_ptr (this->header_and_trailer_->trailer_bytes ()); } - */ + */ // Callback: notify <handler> of completion - this->handler_.handle_transmit_file (*this); -} + this->handler_.handle_transmit_file (*this); +} // ************************************************************ @@ -822,19 +897,19 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::header (void) const return this->header_; } -void +void ACE_Asynch_Transmit_File::Header_And_Trailer::header (ACE_Message_Block *message_block) { this->header_ = message_block; } -u_long +u_long ACE_Asynch_Transmit_File::Header_And_Trailer::header_bytes (void) const { return this->header_bytes_; } - -void + +void ACE_Asynch_Transmit_File::Header_And_Trailer::header_bytes (u_long bytes) { this->header_bytes_ = bytes; @@ -846,34 +921,35 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::trailer (void) const return this->trailer_; } -void +void ACE_Asynch_Transmit_File::Header_And_Trailer::trailer (ACE_Message_Block *message_block) { this->trailer_ = message_block; } -u_long +u_long ACE_Asynch_Transmit_File::Header_And_Trailer::trailer_bytes (void) const { return this->trailer_bytes_; } -void +void ACE_Asynch_Transmit_File::Header_And_Trailer::trailer_bytes (u_long bytes) { this->trailer_bytes_ = bytes; } -LPTRANSMIT_FILE_BUFFERS -ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void) +#if 0 +ACE_LPTRANSMIT_FILE_BUFFERS +ACE_Asynch_Transmit_Fie::Header_And_Trailer::transmit_buffers (void) { // If both are zero, return zero if (this->header_ == 0 && this->trailer_ == 0) return 0; - else + else { // Something is valid - + // If header is valid, set the fields if (this->header_ != 0) { @@ -884,7 +960,7 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void) { this->transmit_buffers_.Head = 0; this->transmit_buffers_.HeadLength = 0; - } + } // If trailer is valid, set the fields if (this->trailer_ != 0) @@ -896,12 +972,13 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void) { this->transmit_buffers_.Tail = 0; this->transmit_buffers_.TailLength = 0; - } + } // Return the transmit buffers return &this->transmit_buffers_; } } +#endif /* 0 */ // ************************************************************ @@ -925,19 +1002,19 @@ ACE_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) ACE_UNUSED_ARG (result); } -void +void ACE_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { ACE_UNUSED_ARG (result); } -void +void ACE_Handler::handle_accept (const ACE_Asynch_Accept::Result &result) { ACE_UNUSED_ARG (result); } -void +void ACE_Handler::handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result) { ACE_UNUSED_ARG (result); @@ -949,14 +1026,14 @@ ACE_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result) ACE_UNUSED_ARG (result); } -void +void ACE_Handler::handle_write_file (const ACE_Asynch_Write_File::Result &result) { ACE_UNUSED_ARG (result); } /* -void +void ACE_Handler::handle_notify (const ACE_Asynch_Notify::Result &result) { ACE_UNUSED_ARG (result); @@ -977,7 +1054,7 @@ ACE_Handler::proactor (void) return this->proactor_; } -void +void ACE_Handler::proactor (ACE_Proactor *p) { this->proactor_ = p; @@ -999,27 +1076,27 @@ ACE_Service_Handler::~ACE_Service_Handler (void) { } -void +void ACE_Service_Handler::addresses (const ACE_INET_Addr &remote_address, const ACE_INET_Addr &local_address) { - // Default behavior is to print out the addresses. + // Default behavior is to print out the addresses. ASYS_TCHAR local_address_buf[BUFSIZ], remote_address_buf[BUFSIZ]; if (local_address.addr_to_string (local_address_buf, sizeof local_address_buf) == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("can't obtain local_address's address string"))); - + if (remote_address.addr_to_string (remote_address_buf, sizeof remote_address_buf) == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("can't obtain remote_address's address string"))); - + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("On fd %d\n"), this->handle ())); ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("local address %s\n"), local_address_buf)); - ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("remote address %s\n"), remote_address_buf)); + ACE_DEBUG ((LM_DEBUG, ASYS_TEXT ("remote address %s\n"), remote_address_buf)); } void ACE_Service_Handler::open (ACE_HANDLE, ACE_Message_Block &) { -} +} -#endif /* ACE_WIN32 */ +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/ diff --git a/ace/Asynch_IO.h b/ace/Asynch_IO.h index 4a3a34aac99..ba9f8370e04 100644 --- a/ace/Asynch_IO.h +++ b/ace/Asynch_IO.h @@ -27,7 +27,8 @@ #include "ace/OS.h" -#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \ + (defined (ACE_HAS_AIO_CALLS)) // Forward declarations class ACE_Proactor; @@ -35,7 +36,7 @@ class ACE_Handler; class ACE_Message_Block; class ACE_INET_Addr; -class ACE_Export ACE_Asynch_Result : public OVERLAPPED +class ACE_Export ACE_Asynch_Result : protected ACE_OVERLAPPED { // = TITLE // An abstract class which adds information to the OVERLAPPED @@ -724,7 +725,7 @@ public: void trailer_bytes (u_long bytes); // Size of the trailer data. - LPTRANSMIT_FILE_BUFFERS transmit_buffers (void); + ACE_LPTRANSMIT_FILE_BUFFERS transmit_buffers (void); // Conversion routine. protected: @@ -740,7 +741,7 @@ public: u_long trailer_bytes_; // Size of trailer data. - TRANSMIT_FILE_BUFFERS transmit_buffers_; + ACE_TRANSMIT_FILE_BUFFERS transmit_buffers_; // Target data structure. }; }; @@ -801,7 +802,6 @@ public: // Get the I/O handle used by this <handler>. This method will be // called by the ACE_Asynch_* classes when an ACE_INVALID_HANDLE is // passed to <open>. - protected: ACE_Proactor *proactor_; // The proactor associated with this handler. @@ -853,5 +853,5 @@ public: #include "ace/Asynch_IO.i" #endif /* __ACE_INLINE__ */ -#endif /* ACE_WIN32 */ +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/ #endif /* ACE_ASYNCH_IO_H */ @@ -3084,6 +3084,10 @@ typedef void (*__sighandler_t)(int); // keep Signal compilation happy # include /**/ <unistd.h> # endif /* ACE_LACKS_UNISTD_H */ +# if defined (ACE_HAS_AIO_CALLS) +# include /**/ <aio.h> +# endif /* ACE_HAS_AIO_CALLS */ + # if !defined (ACE_LACKS_PARAM_H) # include /**/ <sys/param.h> # endif /* ACE_LACKS_PARAM_H */ @@ -5774,4 +5778,36 @@ ACE_Auto_Basic_Array_Ptr<char> (ACE_WString (WIDE_STRING).char_rep ()).get () # define ASYS_WIDE_STRING(ASCII_STRING) ASCII_STRING #endif /* ACE_HAS_MOSTLY_UNICODE_APIS */ +#if defined (ACE_WIN32) +typedef TRANSMIT_FILE_BUFFERS ACE_TRANSMIT_FILE_BUFFERS; +typedef LPTRANSMIT_FILE_BUFFERS ACE_LPTRANSMIT_FILE_BUFFERS; +typedef PTRANSMIT_FILE_BUFFERS ACE_PTRANSMIT_FILE_BUFFERS; + +#define ACE_INFINITE INFINITE +#define ACE_STATUS_TIMEOUT STATUS_TIMEOUT +#define ACE_WAIT_FAILED WAIT_FAILED +#define ACE_WAIT_TIMEOUT WAIT_TIMEOUT + +#define ACE_FALSE FALSE +#define ACE_TRUE TRUE +#else /* ACE_WIN32 */ +struct ACE_TRANSMIT_FILE_BUFFERS +{ + void *Head; + size_t HeadLength; + void *Tail; + size_t TailLength; +}; +typedef ACE_TRANSMIT_FILE_BUFFERS* ACE_PTRANSMIT_FILE_BUFFERS; +typedef ACE_TRANSMIT_FILE_BUFFERS* ACE_LPTRANSMIT_FILE_BUFFERS; + +#define ACE_INFINITE LONG_MAX +#define ACE_STATUS_TIMEOUT LONG_MAX +#define ACE_WAIT_FAILED LONG_MAX +#define ACE_WAIT_TIMEOUT LONG_MAX + +#define ACE_TRUE 1 +#define ACE_FALSE 0 +#endif /* ACE_WIN32 */ + #endif /* ACE_OS_H */ diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index ae41e67d811..52553a5a6f8 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -4,8 +4,10 @@ #define ACE_BUILD_DLL #include "ace/Proactor.h" -#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) -// This only works on Win32 platforms +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) \ + || (defined (ACE_HAS_AIO_CALLS)) +// This only works on Win32 platforms and on Unix platforms with aio +// calls. #include "ace/Task_T.h" #include "ace/Log_Msg.h" @@ -89,13 +91,16 @@ ACE_Proactor_Timer_Handler::~ACE_Proactor_Timer_Handler (void) int ACE_Proactor_Timer_Handler::svc (void) { +#if defined (ACE_HAS_AIO_CALLS) + return 0; +#else /* ACE_HAS_AIO_CALLS */ u_long time; ACE_Time_Value absolute_time; for (; !this->shutting_down_;) { // default value - time = INFINITE; + time = ACE_INFINITE; // If the timer queue is not empty if (!this->proactor_.timer_queue ()->is_empty ()) @@ -118,17 +123,18 @@ ACE_Proactor_Timer_Handler::svc (void) time); switch (result) { - case WAIT_TIMEOUT: + case ACE_WAIT_TIMEOUT: // timeout: expire timers this->proactor_.timer_queue ()->expire (); break; - case WAIT_FAILED: + case ACE_WAIT_FAILED: // error ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("WaitForSingleObject")), -1); } } return 0; +#endif /* ACE_HAS_AIO_CALLS */ } ACE_Proactor_Handle_Timeout_Upcall::ACE_Proactor_Handle_Timeout_Upcall (void) @@ -205,13 +211,36 @@ ACE_Proactor_Handle_Timeout_Upcall::proactor (ACE_Proactor &proactor) ACE_Proactor::ACE_Proactor (size_t number_of_threads, Timer_Queue *tq, int used_with_reactor_event_loop) - : completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!! - number_of_threads_ (number_of_threads), - timer_queue_ (0), - delete_timer_queue_ (0), - timer_handler_ (0), - used_with_reactor_event_loop_ (used_with_reactor_event_loop) -{ + : +#if defined (ACE_HAS_AIO_CALLS) + #if defined (AIO_LISTIO_MAX) + aiocb_list_max_size_ (AIO_LISTIO_MAX), + #else /* AIO_LISTIO_MAX */ + aiocb_list_max_size_ (2), + #endif /* AIO_LISTIO_MAX */ + + aiocb_list_cur_size_ (0), +#else /* ACE_HAS_AIO_CALLS */ + completion_port_ (0), // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE!!!! +#endif /* ACE_HAS_AIO_CALLS */ + + number_of_threads_ (number_of_threads), + timer_queue_ (0), + delete_timer_queue_ (0), + timer_handler_ (0), + used_with_reactor_event_loop_ (used_with_reactor_event_loop) +{ +#if defined (ACE_HAS_AIO_CALLS) + // Init the array. + for (size_t ai = 0; + ai < this->aiocb_list_max_size_; + ai++) + { + aiocb_list_ [ai] = 0; + result_list_ [ai] = 0; + } + ACE_UNUSED_ARG (tq); +#else /* ACE_HAS_AIO_CALLS */ // create the completion port this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE, this->completion_port_, @@ -229,7 +258,7 @@ ACE_Proactor::ACE_Proactor (size_t number_of_threads, // activate <timer_handler> if (this->timer_handler_->activate (THR_NEW_LWP | THR_DETACHED) == -1) ACE_ERROR ((LM_ERROR, ASYS_TEXT ("%p Could not create thread\n"), ASYS_TEXT ("Task::activate"))); - +#endif /* ACE_HAS_AIO_CALLS */ } ACE_Proactor * @@ -350,6 +379,9 @@ ACE_Proactor::~ACE_Proactor (void) int ACE_Proactor::close (void) { +#if defined (ACE_HAS_AIO_CALLS) + return 0; +#else /* ACE_HAS_AIO_CALLS */ // Take care of the timer handler if (this->timer_handler_) { @@ -374,12 +406,18 @@ ACE_Proactor::close (void) } else return 0; +#endif /* ACE_HAS_AIO_CALLS */ } int ACE_Proactor::register_handle (ACE_HANDLE handle, const void *completion_key) { +#if defined (ACE_HAS_AIO_CALLS) + ACE_UNUSED_ARG (handle); + ACE_UNUSED_ARG (completion_key); + return 0; +#else /* ACE_HAS_AIO_CALLS */ // No locking is needed here as no state changes ACE_HANDLE cp = ::CreateIoCompletionPort (handle, this->completion_port_, @@ -394,6 +432,7 @@ ACE_Proactor::register_handle (ACE_HANDLE handle, ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("%p\n"), ASYS_TEXT ("CreateIoCompletionPort")), -1); } return 0; +#endif /* ACE_HAS_AIO_CALLS */ } long @@ -491,15 +530,22 @@ ACE_Proactor::handle_close (ACE_HANDLE handle, return this->close (); } + +// @@ get_handle () implementation. ACE_HANDLE ACE_Proactor::get_handle (void) const { +#if defined (ACE_HAS_AIO_CALLS) + return ACE_INVALID_HANDLE; +#else /* ACE_HAS_AIO_CALLS */ if (this->used_with_reactor_event_loop_) return this->event_.handle (); else return 0; +#endif /* ACE_HAS_AIO_CALLS */ } + int ACE_Proactor::handle_events (ACE_Time_Value &wait_time) { @@ -511,13 +557,80 @@ ACE_Proactor::handle_events (ACE_Time_Value &wait_time) int ACE_Proactor::handle_events (void) { - return this->handle_events (INFINITE); + return this->handle_events (ACE_INFINITE); } int ACE_Proactor::handle_events (unsigned long milli_seconds) { - OVERLAPPED *overlapped = 0; +#if defined (ACE_HAS_AIO_CALLS) + // Is there any entries in the list. + if (this->aiocb_list_cur_size_ == 0) + { + ACE_DEBUG ((LM_DEBUG, "No AIO pending")); + return 0; + } + + // Wait for asynch operation to complete. + timespec timeout; + timeout.tv_sec = milli_seconds; + timeout.tv_nsec = 0; + if (aio_suspend (this->aiocb_list_, + this->aiocb_list_max_size_, + &timeout) < 0) + // If failure is coz of timeout, then return *0* but set errno + // appropriately. This is what the Win proactor does. + if (errno == EINTR) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p):aio_suspend"), + 0); + else + ACE_ERROR_RETURN ((LM_ERROR, + "(%p):aio_suspend"), + 0); + + // Check which aio has finished. + size_t ai; + for (ai = 0; ai < this->aiocb_list_max_size_; ai++) + // Analyze error and return values. + if (aio_error (aiocb_list_ [ai]) != EINPROGRESS) + { + if (aio_return (aiocb_list_ [ai]) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "(%p):AIO failed"), + -1); + else + { + ACE_DEBUG ((LM_DEBUG, "An aio has finished\n")); + // This AIO is done. + break; + } + } + if (ai == this->aiocb_list_max_size_) + // Nothing completed. + return 0; + + // Get the values for the completed aio. + size_t bytes_transferred = aiocb_list_[ai]->aio_nbytes; + void *completion_key = (void *)aiocb_list_[ai]->aio_sigevent.sigev_value.sival_ptr; + ACE_Asynch_Result *asynch_result = this->result_list_[ai]; + + // Invalidate entry in the aiocb list. + delete this->aiocb_list_[ai]; + this->aiocb_list_[ai] = 0; + this->aiocb_list_cur_size_--; + this->result_list_[ai] = 0; + + // Call the application code. + this->application_specific_code (asynch_result, + bytes_transferred, + ACE_TRUE, + completion_key, + 0); + + return 0; +#else /* ACE_HAS_AIO_CALLS */ + ACE_OVERLAPPED *overlapped = 0; u_long bytes_transferred = 0; u_long completion_key = 0; @@ -556,6 +669,7 @@ ACE_Proactor::handle_events (unsigned long milli_seconds) errno); } return 0; +#endif /* ACE_HAS_AIO_CALLS */ } void @@ -583,6 +697,10 @@ ACE_Proactor::application_specific_code (ACE_Asynch_Result *asynch_result, int ACE_Proactor::post_completion (ACE_Asynch_Result *result) { +#if defined (ACE_HAS_AIO_CALLS) + ACE_UNUSED_ARG (result); + return 0; +#else /* ACE_HAS_AIO_CALLS */ // Grab the event associated with the Proactor HANDLE handle = this->get_handle (); @@ -603,6 +721,7 @@ ACE_Proactor::post_completion (ACE_Asynch_Result *result) } return 0; +#endif /* ACE_HAS_AIO_CALLS */ } int @@ -684,6 +803,81 @@ ACE_Proactor::Asynch_Timer::complete (u_long bytes_transferred, this->handler_.handle_time_out (this->time_, this->act ()); } +int +ACE_Proactor::insert_to_aiocb_list (aiocb *aiocb_ptr, + ACE_Asynch_Result *result) +{ + // Is there any place? + if (this->aiocb_list_cur_size_ >= this->aiocb_list_max_size_) + return -1; + + // Find the first free slot. + size_t ai; + for (ai = 0; + ai < this->aiocb_list_max_size_; + ai++) + if (this->aiocb_list_ [ai] == 0) + break; + + if (ai == this->aiocb_list_max_size_) + return -1; + + // Store the pointers. + this->aiocb_list_ [ai] = aiocb_ptr; + this->result_list_ [ai] = result; + this->aiocb_list_cur_size_ ++; + return 0; +} +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Timer_Queue_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Queue_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_List_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_List_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Node_T<ACE_Handler *>; +template class ACE_Unbounded_Set<ACE_Timer_Node_T<ACE_Handler *> *>; +template class ACE_Unbounded_Set_Iterator<ACE_Timer_Node_T<ACE_Handler *> *>; +template class ACE_Node <ACE_Timer_Node_T<ACE_Handler *> *>; +template class ACE_Free_List<ACE_Timer_Node_T<ACE_Handler *> >; +template class ACE_Locked_Free_List<ACE_Timer_Node_T<ACE_Handler *>, ACE_Null_Mutex>; +template class ACE_Timer_Heap_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Heap_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Wheel_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +template class ACE_Timer_Wheel_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Timer_Queue_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Queue_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_List_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_List_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Heap_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Heap_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Wheel_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#pragma instantiate ACE_Timer_Wheel_Iterator_T<ACE_Handler *,\ + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + #else /* ACE_WIN32 */ ACE_Proactor * @@ -731,4 +925,4 @@ ACE_Proactor::event_loop_done (void) { return sig_atomic_t (1); } -#endif /* ACE_WIN32 */ +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/ diff --git a/ace/Proactor.h b/ace/Proactor.h index f672335f37f..cfd3d555922 100644 --- a/ace/Proactor.h +++ b/ace/Proactor.h @@ -5,19 +5,20 @@ // // = LIBRARY // ace -// +// // = FILENAME // Proactor.h // // = AUTHOR // Irfan Pyarali (irfan@cs.wustl.edu) // Tim Harrison (harrison@cs.wustl.edu) -// +// // ============================================================================ #if !defined (ACE_PROACTOR_H) #define ACE_PROACTOR_H +#include "ace/OS.h" #include "ace/Asynch_IO.h" #include "ace/Thread_Manager.h" #include "ace/Event_Handler.h" @@ -26,9 +27,13 @@ #include "ace/Timer_List.h" #include "ace/Timer_Heap.h" #include "ace/Timer_Wheel.h" +#include "ace/Free_List.h" -#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) -// This only works on Win32 platforms + +#if (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE)) || \ + (defined (ACE_HAS_AIO_CALLS)) +// This only works on Win32 platforms and on Unix platforms supporting +// aio calls. // Forward declarations. class ACE_Asynch_Result; @@ -37,7 +42,7 @@ class ACE_Proactor; class ACE_Export ACE_Proactor_Handle_Timeout_Upcall { - // = TITLE + // = TITLE // Functor for Timer_Queues. // // = DESCRIPTION @@ -48,10 +53,10 @@ public: // Proactor has special privileges // Access needed to: proactor () - typedef ACE_Timer_Queue_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, + typedef ACE_Timer_Queue_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> TIMER_QUEUE; - + ACE_Proactor_Handle_Timeout_Upcall (void); // Constructor @@ -60,7 +65,7 @@ public: const void *arg, const ACE_Time_Value &cur_time); // This method is called when the timer expires - + int cancellation (TIMER_QUEUE &timer_queue, ACE_Handler *handler); // This method is called when the timer is canceled @@ -83,7 +88,7 @@ class ACE_Export ACE_Proactor : public ACE_Event_Handler { // = TITLE // A Proactor for asynchronous I/O events. - // + // // = DESCRIPTION // A manager for the I/O completion port. public: @@ -91,47 +96,47 @@ public: // Timer Handler has special privileges because // Access needed to: thr_mgr_ - friend class ACE_Proactor_Handle_Timeout_Upcall; + friend class ACE_Proactor_Handle_Timeout_Upcall; // Access needed to: Asynch_Timer, and completion_port_ // = Here are the typedefs that the <ACE_Proactor> uses. - typedef ACE_Timer_Queue_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, + typedef ACE_Timer_Queue_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> Timer_Queue; - typedef ACE_Timer_Queue_Iterator_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, + typedef ACE_Timer_Queue_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> Timer_Queue_Iterator; - typedef ACE_Timer_List_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, + typedef ACE_Timer_List_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> Timer_List; - typedef ACE_Timer_List_Iterator_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, + typedef ACE_Timer_List_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> Timer_List_Iterator; - typedef ACE_Timer_Heap_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, - ACE_SYNCH_RECURSIVE_MUTEX> Timer_Heap; - typedef ACE_Timer_Heap_Iterator_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, + typedef ACE_Timer_Heap_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, + ACE_SYNCH_RECURSIVE_MUTEX> Timer_Heap; + typedef ACE_Timer_Heap_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> Timer_Heap_Iterator; - typedef ACE_Timer_Wheel_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, + typedef ACE_Timer_Wheel_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> Timer_Wheel; - typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler *, - ACE_Proactor_Handle_Timeout_Upcall, + typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler *, + ACE_Proactor_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX> Timer_Wheel_Iterator; - ACE_Proactor (size_t number_of_threads = 0, + ACE_Proactor (size_t number_of_threads = 0, Timer_Queue *tq = 0, int used_with_reactor_event_loop = 0); // A do nothing constructor. - + virtual ~ACE_Proactor (void); // Virtual destruction. - + static ACE_Proactor *instance (size_t threads = 0); // Get pointer to a process-wide <ACE_Proactor>. <threads> should // be part of another method. It's only here because I'm just a @@ -163,13 +168,14 @@ public: virtual int close (void); // Close the IO completion port - - virtual int register_handle (ACE_HANDLE handle, + + virtual int register_handle (ACE_HANDLE handle, const void *completion_key); - // This method adds the <handle> to the I/O completion port - - // = Timer management. - virtual long schedule_timer (ACE_Handler &handler, + // This method adds the <handle> to the I/O completion port. This + // function is a no-op function for Unix systems. + + // = Timer management. + virtual long schedule_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &time); // Schedule a <handler> that will expire after <time>. If it @@ -182,14 +188,14 @@ public: // with accidentally deleting the wrong timer. Returns -1 on // failure (which is guaranteed never to be a valid <timer_id>. - virtual long schedule_repeating_timer (ACE_Handler &handler, + virtual long schedule_repeating_timer (ACE_Handler &handler, const void *act, - const ACE_Time_Value &interval); - + const ACE_Time_Value &interval); + // Same as above except <interval> it is used to reschedule the // <handler> automatically. - - virtual long schedule_timer (ACE_Handler &handler, + + virtual long schedule_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &time, const ACE_Time_Value &interval); @@ -201,7 +207,7 @@ public: // Cancel all timers associated with this <handler>. Returns number // of timers cancelled. - virtual int cancel_timer (long timer_id, + virtual int cancel_timer (long timer_id, const void **act = 0, int dont_call_handle_close = 1); // Cancel the single <ACE_Handler> that matches the <timer_id> value @@ -215,9 +221,13 @@ public: virtual int handle_events (ACE_Time_Value &wait_time); // Dispatch a single set of events. If <wait_time> elapses before // any events occur, return. + // Return 0 on success, non-zero (-1) on timeouts/errors and errno + // is set accordingly. virtual int handle_events (void); // Block indefinitely until at least one event is dispatched. + // Return 0 on success, non-zero (-1) on timeouts/errors and errno + // is set accordingly. virtual int post_completion (ACE_Asynch_Result *result); // Post a result to the completion port of the Proactor. If errors @@ -243,9 +253,21 @@ public: virtual ACE_HANDLE get_handle (void) const; // Get the event handle. - + + + int insert_to_aiocb_list (aiocb *aiocb_ptr, ACE_Asynch_Result *result); + // @@ + // This call is for Unix aio_ calls. + // This method is used by ACE_Asynch_Operation to store some + // information with the Proactor. + // Inserting this aiocb_ptr to the array so that aio_return and + // aio_error can make use of that. Inserting result so that we can + // call the application back through complete. + // @@ Can array be full? That means, the aio issue is successful, + // but there are already AIO_LIST_AIO_MAX of calls pending. I will + // have to go for something other than arrays then. + protected: - virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); // Called when object is signaled by OS (either via UNIX signals or // when a Win32 object becomes signaled). @@ -261,11 +283,11 @@ protected: u_long error); // Protect against structured exceptions caused by user code when // dispatching handles - + virtual int handle_events (unsigned long milli_seconds); // Dispatch a single set of events. If <milli_seconds> elapses // before any events occur, return. - + class ACE_Export Asynch_Timer : protected ACE_Asynch_Result { // = TITLE @@ -280,28 +302,52 @@ protected: Asynch_Timer (ACE_Handler &handler, const void *act, - const ACE_Time_Value &tv, + const ACE_Time_Value &tv, ACE_HANDLE event = ACE_INVALID_HANDLE); - + protected: virtual void complete (u_long bytes_transferred, int success, const void *completion_key, - u_long error = 0); - // This method calls the <handler>'s handle_timeout method - + u_long error = 0); + // This method calls the <handler>'s handle_timeout method + ACE_Time_Value time_; // Time value requested by caller - }; - + }; + + + +#if defined (ACE_HAS_AIO_CALLS) + // Let us have an array ot keep track of the all the aio's issued + // currently. My intuition is to limit the array size to Maximum + // Aios that can be issued thru' a lio_list call. + // @@ AIO_LISTIO_MAX is something else in LynxOS!!! +#if defined (AIO_LISTIO_MAX) + aiocb *aiocb_list_ [AIO_LISTIO_MAX]; + ACE_Asynch_Result *result_list_ [AIO_LISTIO_MAX]; +#else /* AIO_LISTIO_MAX */ + // Minimum is 2. + struct aiocb *aiocb_list_ [2]; + ACE_Asynch_Result *result_list_ [2]; +#endif /* AIO_LIST_AIO_MAX */ + + size_t aiocb_list_max_size_; + // To maintain the maximum size of the array (list). + + size_t aiocb_list_cur_size_; + // To maintain the current size of the array (list). + +#else /* ACE_HAS_AIO_CALLS */ ACE_HANDLE completion_port_; - // Handle for the completion port - + // Handle for the completion port. +#endif /* ACE_HAS_AIO_CALLS */ + size_t number_of_threads_; // This number is passed to the CreatIOCompletionPort() system call Timer_Queue *timer_queue_; - // Timer Queue + // Timer Queue int delete_timer_queue_; // Flag on whether to delete the timer queue @@ -370,4 +416,3 @@ public: #endif /* ACE_WIN32 */ #endif /* ACE_PROACTOR_H */ - diff --git a/ace/config-sunos5.6.h b/ace/config-sunos5.6.h index c3e09e14673..0f260e66578 100644 --- a/ace/config-sunos5.6.h +++ b/ace/config-sunos5.6.h @@ -27,4 +27,6 @@ // SunOS 5.6 does support sched_get_priority_{min,max} #undef ACE_THR_PRI_FIFO_DEF +// SunOS 5.6 has AIO calls. +#define ACE_HAS_AIO_CALLS #endif /* ACE_CONFIG_H */ diff --git a/examples/Reactor/Proactor/Makefile b/examples/Reactor/Proactor/Makefile new file mode 100644 index 00000000000..4423a939c4f --- /dev/null +++ b/examples/Reactor/Proactor/Makefile @@ -0,0 +1,40 @@ +#---------------------------------------------------------------------------- +# $Id$ +# +# Makefile for aio calls test program. +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Local macros +#---------------------------------------------------------------------------- + +BIN = test_proactor_with_aio +LSRC = $(addsuffix .cpp,$(BIN)) +VLDLIBS = $(LDLIBS:%=%$(VAR)) +BUILD = $(VBIN) + +#---------------------------------------------------------------------------- +# Include macros and targets +#---------------------------------------------------------------------------- + +include $(ACE_ROOT)/include/makeinclude/wrapper_macros.GNU +include $(ACE_ROOT)/include/makeinclude/macros.GNU +include $(ACE_ROOT)/include/makeinclude/rules.common.GNU +include $(ACE_ROOT)/include/makeinclude/rules.nonested.GNU +include $(ACE_ROOT)/include/makeinclude/rules.bin.GNU +include $(ACE_ROOT)/include/makeinclude/rules.local.GNU +include $(ACE_ROOT)/include/makeinclude/platform_macros.GNU +#---------------------------------------------------------------------------- +# Local targets +#---------------------------------------------------------------------------- + +#---------------------------------------------------------------------------- +# Dependencies +#---------------------------------------------------------------------------- + +# DO NOT DELETE THIS LINE -- g++dep uses it. +# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. + + + +# IF YOU PUT ANYTHING HERE IT WILL GO AWAY diff --git a/examples/Reactor/Proactor/test_proactor_with_aio.cpp b/examples/Reactor/Proactor/test_proactor_with_aio.cpp new file mode 100644 index 00000000000..26c09612a08 --- /dev/null +++ b/examples/Reactor/Proactor/test_proactor_with_aio.cpp @@ -0,0 +1,372 @@ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// examples +// +// = FILENAME +// test_proactor_with_aio.cpp +// +// = DESCRIPTION +// This program illustrates how the ACE_Proactor can be used to +// implement an application that does various asynchronous +// operations. +// +// = AUTHOR +// Alexander Babu Arulanthu <alex@cs.wustl.edu> +// +// ============================================================================ + +#include "ace/Service_Config.h" +#include "ace/Proactor.h" +#include "ace/Asynch_IO.h" +#include "ace/Asynch_Acceptor.h" +#include "ace/INET_Addr.h" +#include "ace/SOCK_Connector.h" +#include "ace/SOCK_Acceptor.h" +#include "ace/SOCK_Stream.h" +#include "ace/Message_Block.h" +#include "ace/Get_Opt.h" +#include "ace/streams.h" + +static char *file = "test_proactor.cpp"; +static char *dump_file = "aio_out.log"; +static int done = 0; + +class Sender : public ACE_Handler +{ + // = TITLE + // Sender + // + // = DESCRIPTION + // The class will be created by main(). After connecting to the + // host, this class will then read data from a file and send it + // to the network connection. +public: + Sender (void); + // Constructor. + + ~Sender (void); + // Destructor. + + int open (const char *host, + u_short port); + // ACE_HANDLE handle (void) const; + +protected: + +#if 0 + // These methods are called by the freamwork + virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result); + // This is called when asynchronous transmit files complete. + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This is called when asynchronous writes from the socket complete. +#endif /* 0 */ + + virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result); + // This is called when asynchronous reads from the file complete. + +#if 0 + virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result); + // This is called when asynchronous write to a file complete. +#endif /* 0 */ + +private: + //int transmit_file (void); + + int initiate_read_file (void); + // Intiate read from the file. + + //ACE_SOCK_Stream stream_; + // Network I/O handle + + //ACE_Asynch_Write_Stream ws_; + // ws (write stream): for writing to the socket + + ACE_Asynch_Read_File rf_; + // rf (read file): for reading from the file. + + ACE_HANDLE input_file_; + // File to read from. + + u_long file_offset_; + // Current file offset. + + u_long file_size_; + // File size. + + //ACE_Message_Block welcome_message_; + // Welcome message + + //ACE_Asynch_Transmit_File::Header_And_Trailer header_and_trailer_; + // Header and trailer which goes with transmit_file + + //int stream_write_done_; + //int transmit_file_done_; + // These flags help to determine when to close down the event loop +}; + + +Sender::Sender (void) + : input_file_ (ACE_INVALID_HANDLE), + file_offset_ (0), + file_size_ (0) + //stream_write_done_ (0), + //transmit_file_done_ (0) +{ + // Moment of inspiration :-) + //static char *data = "Welcome to Irfan World! Irfan RULES here !!"; + //this->welcome_message_.init (data, ACE_OS::strlen (data)); + //this->welcome_message_.wr_ptr (ACE_OS::strlen (data)); +} + +Sender::~Sender (void) +{ + //this->stream_.close (); +} + +#if 0 +ACE_HANDLE +Sender::handle (void) const +{ + return this->stream_.get_handle (); +} +#endif /* 0 */ + +int +Sender::open (const char *host, + u_short port) +{ + // Initialize stuff. + + // Open input file (in OVERLAPPED mode) + this->input_file_ = ACE_OS::open (file, GENERIC_READ | FILE_FLAG_OVERLAPPED); + if (this->input_file_ == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::open"), -1); + + // Find file size + this->file_size_ = ACE_OS::filesize (this->input_file_); + ACE_DEBUG ((LM_DEBUG, "Input file size :%d\n", this->file_size_)); +#if 0 + // Connect to remote host + ACE_INET_Addr address (port, host); + ACE_SOCK_Connector connector; + if (connector.connect (this->stream_, + address) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_SOCK_Connector::connect"), -1); + + // Open ACE_Asynch_Write_Stream + if (this->ws_.open (*this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::open"), -1); +#endif /* 0 */ + + // Open ACE_Asynch_Read_File + if (this->rf_.open (*this, this->input_file_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::open"), -1); + + // Start an asynchronous transmit file + // if (this->transmit_file () == -1) + // return -1; + + // Start an asynchronous read file + if (this->initiate_read_file () == -1) + return -1; + + return 0; +} + +#if 0 +int +Sender::transmit_file (void) +{ + // Open file (in SEQUENTIAL_SCAN mode) + ACE_HANDLE file_handle = ACE_OS::open (file, GENERIC_READ | FILE_FLAG_SEQUENTIAL_SCAN); + if (file_handle == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_OS::open"), -1); + + // Open ACE_Asynch_Transmit_File + ACE_Asynch_Transmit_File tf; + if (tf.open (*this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Transmit_File::open"), -1); + + // Header and trailer data for the file + this->header_and_trailer_.header_and_trailer (&this->welcome_message_, + this->welcome_message_.length (), + &this->welcome_message_, + this->welcome_message_.length ()); + + // Starting position + cerr << "Staring position: " << ACE_OS::lseek (file_handle, 0L, SEEK_CUR) << endl; + + // Send it + if (tf.transmit_file (file_handle, + &this->header_and_trailer_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Transmit_File::transmit_file"), -1); + + return 0; +} + +void +Sender::handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, "handle_transmit_file called\n")); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "socket", result.socket ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "file", result.file ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_per_send", result.bytes_per_send ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + + // Ending position + cerr << "Ending position: " << ACE_OS::lseek (result.file (), 0L, SEEK_CUR) << endl; + + // Done with file + ACE_OS::close (result.file ()); + + this->transmit_file_done_ = 1; + if (this->stream_write_done_) + done = 1; +} +#endif /* 0 */ + +int +Sender::initiate_read_file (void) +{ + // Create Message_Block + ACE_Message_Block *mb = 0; + ACE_NEW_RETURN (mb, ACE_Message_Block (BUFSIZ + 1), -1); + + // Inititiate an asynchronous read from the file + if (this->rf_.read (*mb, + mb->size () - 1, + this->file_offset_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "ACE_Asynch_Read_File::read"), -1); + + return 0; +} + +void +Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, "handle_read_file called\n")); + + result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0'; + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + + + if (result.success ()) + { + // Read successful: increment offset and write data to network. + this->file_offset_ += result.bytes_transferred (); + + // @@ Just print out the buffer. + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); + +#if 0 + if (this->ws_.write (result.message_block (), + result.bytes_transferred ()) == -1) + { + ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write")); + return; + } +#endif /* 0 */ + + if (this->file_size_ > this->file_offset_) + { + // Start an asynchronous read file + if (initiate_read_file () == -1) + return; + } + else + done = 1; + } +} + +#if 0 +void +Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n")); + + // Reset pointers + result.message_block ().rd_ptr (-result.bytes_transferred ()); + + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ())); + ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ())); + ACE_DEBUG ((LM_DEBUG, "********************\n")); + ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ())); + + if (result.success ()) + { + // Partial write to socket + int unsent_data = result.bytes_to_write () - result.bytes_transferred (); + if (unsent_data != 0) + { + // Reset pointers + result.message_block ().rd_ptr (result.bytes_transferred ()); + + // Duplicate the message block and retry remaining data + if (this->ws_.write (*result.message_block ().duplicate (), + unsent_data) == -1) + { + ACE_ERROR ((LM_ERROR, "%p\n", "ACE_Asynch_Write_Stream::write")); + return; + } + } + else if (!(this->file_size_ > this->file_offset_)) + { + this->stream_write_done_ = 1; + if (this->transmit_file_done_) + done = 1; + } + } + + // Release message block + result.message_block ().release (); +} +#endif /* 0 */ + +int +main (int argc, char *argv []) +{ + ACE_DEBUG ((LM_DEBUG, "test_proactor_with_aio\n")); + + ACE_UNUSED_ARG (argc); + ACE_UNUSED_ARG (argv); + ACE_UNUSED_ARG (dump_file); + + // Create a sender. It reads from a file. + Sender sender; + + sender.open (0, 0); + + int error = 0; + while (!error && !done) + // dispatch events + error = ACE_Proactor::instance ()->handle_events (); + + return 0; +} |