summaryrefslogtreecommitdiff
path: root/ace/POSIX_Asynch_IO.cpp
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-02-16 06:33:51 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-02-16 06:33:51 +0000
commitb9bfea61002ead754d36c0d389dbcdae1f3356fe (patch)
treea9528d7338a82c07700f18c89a4bff371915946a /ace/POSIX_Asynch_IO.cpp
parent8afaac21b14b5892151a4796d69734f07aff0219 (diff)
downloadATCD-b9bfea61002ead754d36c0d389dbcdae1f3356fe.tar.gz
Applied Bridge pattern to the POSIX implementation of the Proactor
code. ACE_POSIX_AIOCB_Proactor works fine on Solaris 2.6. ACE_POSIX_SIG_Proactor works on LynxOS. Take a look at the tests at the $(ACE_ROOT)/examples/Reactor/Proactor/ and the README.
Diffstat (limited to 'ace/POSIX_Asynch_IO.cpp')
-rw-r--r--ace/POSIX_Asynch_IO.cpp2475
1 files changed, 2475 insertions, 0 deletions
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp
new file mode 100644
index 00000000000..826f56818d5
--- /dev/null
+++ b/ace/POSIX_Asynch_IO.cpp
@@ -0,0 +1,2475 @@
+// $Id$
+
+#include "ace/POSIX_Asynch_IO.h"
+
+#if defined (ACE_HAS_AIO_CALLS)
+
+
+#include "ace/Proactor.h"
+#include "ace/Message_Block.h"
+#include "ace/Service_Config.h"
+#include "ace/INET_Addr.h"
+#include "ace/Task_T.h"
+#include "ace/POSIX_Proactor.h"
+
+#if !defined (__ACE_INLINE__)
+#include "ace/POSIX_Asynch_IO.i"
+#endif /* __ACE_INLINE__ */
+
+u_long
+ACE_POSIX_Asynch_Result::bytes_transferred (void) const
+{
+ return this->AIO_SYSRETURN;
+}
+
+const void *
+ACE_POSIX_Asynch_Result::act (void) const
+{
+ return this->act_;
+}
+
+int
+ACE_POSIX_Asynch_Result::success (void) const
+{
+ return this->success_;
+}
+
+const void *
+ACE_POSIX_Asynch_Result::completion_key (void) const
+{
+ return this->completion_key_;
+}
+
+u_long
+ACE_POSIX_Asynch_Result::error (void) const
+{
+ return this->AIO_SYSERRNO;
+}
+
+ACE_HANDLE
+ACE_POSIX_Asynch_Result::event (void) const
+{
+ return ACE_INVALID_HANDLE;
+}
+
+u_long
+ACE_POSIX_Asynch_Result::offset (void) const
+{
+ return this->aio_offset;
+}
+
+u_long
+ACE_POSIX_Asynch_Result::offset_high (void) const
+{
+ //
+ // @@ Support aiocb64??
+ //
+ ACE_NOTSUP_RETURN (0);
+}
+
+int
+ACE_POSIX_Asynch_Result::priority (void) const
+{
+ return this->aio_reqprio;
+}
+
+ACE_POSIX_Asynch_Result::~ACE_POSIX_Asynch_Result (void)
+{
+}
+
+ACE_POSIX_Asynch_Result::ACE_POSIX_Asynch_Result (ACE_Handler &handler,
+ const void* act,
+ ACE_HANDLE event,
+ u_long offset,
+ u_long offset_high,
+ int priority)
+ : ACE_Asynch_Result_Impl (),
+ aiocb (),
+ handler_ (handler),
+ act_ (act),
+ success_ (0),
+ completion_key_ (0)
+{
+ aio_offset = offset;
+ aio_reqprio = priority;
+
+ // Event is not used on POSIX.
+ ACE_UNUSED_ARG (event);
+
+ //
+ // @@ Support offset_high with aiocb64.
+ //
+ ACE_UNUSED_ARG (offset_high);
+
+ // Other fields in the <aiocb> will be initialized by the
+ // subclasses.
+}
+
+// ****************************************************************
+
+int
+ACE_POSIX_Asynch_Operation::open (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ const void *completion_key,
+ ACE_Proactor *proactor)
+{
+ this->proactor_ = proactor;
+ this->handler_ = &handler;
+ this->handle_ = handle;
+
+ // Grab the handle from the <handler> if <handle> is invalid
+ if (this->handle_ == ACE_INVALID_HANDLE)
+ this->handle_ = this->handler_->handle ();
+ if (this->handle_ == ACE_INVALID_HANDLE)
+ return -1;
+
+#if 0
+ // @@ If <proactor> is 0, let us not bother about getting this
+ // Proactor, we have already got the specific implementation
+ // Proactor.
+
+ // If no proactor was passed
+ if (this->proactor_ == 0)
+ {
+ // Grab the proactor from the <Service_Config> if
+ // <handler->proactor> is zero
+ this->proactor_ = this->handler_->proactor ();
+ if (this->proactor_ == 0)
+ this->proactor_ = ACE_Proactor::instance();
+ }
+#endif /* 0 */
+
+ // AIO stuff is present. So no registering.
+ ACE_UNUSED_ARG (completion_key);
+ return 0;
+}
+
+int
+ACE_POSIX_Asynch_Operation::cancel (void)
+{
+ // @@ Not implemented.
+ ACE_NOTSUP_RETURN (0);
+}
+
+ACE_Proactor *
+ACE_POSIX_Asynch_Operation::proactor (void) const
+{
+ return this->proactor_;
+}
+
+ACE_POSIX_Asynch_Operation::~ACE_POSIX_Asynch_Operation (void)
+{
+}
+
+ACE_POSIX_Asynch_Operation::ACE_POSIX_Asynch_Operation (void)
+ : ACE_Asynch_Operation_Impl (),
+ handler_ (0),
+ handle_ (ACE_INVALID_HANDLE)
+{
+}
+
+// *********************************************************************
+
+ACE_POSIX_AIOCB_Asynch_Operation::ACE_POSIX_AIOCB_Asynch_Operation (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_POSIX_Asynch_Operation (),
+ posix_aiocb_proactor_ (posix_aiocb_proactor)
+{
+}
+
+ACE_POSIX_AIOCB_Asynch_Operation::~ACE_POSIX_AIOCB_Asynch_Operation (void)
+{
+}
+
+// If the ptr is o, just check whether there is any slot free and
+// return 0 if yes, else return -1. If a valid ptr is passed, keep it
+// in a free slot.
+int
+ACE_POSIX_AIOCB_Asynch_Operation::register_aio_with_proactor (aiocb *aiocb_ptr)
+{
+ return this->posix_aiocb_proactor_->register_aio_with_proactor (aiocb_ptr);
+}
+
+// *********************************************************************
+
+ACE_POSIX_SIG_Asynch_Operation::ACE_POSIX_SIG_Asynch_Operation (ACE_POSIX_SIG_Proactor *posix_sig_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_POSIX_Asynch_Operation (),
+ posix_sig_proactor_ (posix_sig_proactor)
+{
+}
+
+ACE_POSIX_SIG_Asynch_Operation::~ACE_POSIX_SIG_Asynch_Operation (void)
+{
+}
+
+// *********************************************************************
+
+u_long
+ACE_POSIX_Asynch_Read_Stream_Result::bytes_to_read (void) const
+{
+ return this->aio_nbytes;
+}
+
+ACE_Message_Block &
+ACE_POSIX_Asynch_Read_Stream_Result::message_block (void) const
+{
+ return this->message_block_;
+}
+
+ACE_HANDLE
+ACE_POSIX_Asynch_Read_Stream_Result::handle (void) const
+{
+ return this->aio_fildes;
+}
+
+ACE_POSIX_Asynch_Read_Stream_Result::ACE_POSIX_Asynch_Read_Stream_Result (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ const void* act,
+ ACE_HANDLE event,
+ int priority)
+ : ACE_Asynch_Result_Impl (),
+ ACE_Asynch_Read_Stream_Result_Impl (),
+ ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority),
+ message_block_ (message_block)
+{
+ this->aio_fildes = handle;
+ this->aio_buf = message_block.wr_ptr ();
+ this->aio_nbytes = bytes_to_read;
+ ACE_UNUSED_ARG (event);
+}
+
+void
+ACE_POSIX_Asynch_Read_Stream_Result::complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error)
+{
+ // <bytes_transferred> is availble in the aiocb.
+ ACE_UNUSED_ARG (bytes_transferred);
+
+ this->success_ = success;
+ this->completion_key_ = completion_key;
+
+ // <errno> is available in the aiocb.
+ ACE_UNUSED_ARG (error);
+
+ // Appropriately move the pointers in the message block.
+ this->message_block_.wr_ptr (bytes_transferred);
+
+ // Create the interface result class.
+ ACE_Asynch_Read_Stream::Result result (this);
+
+ // Call the application handler.
+ this->handler_.handle_read_stream (result);
+}
+
+ACE_POSIX_Asynch_Read_Stream_Result::~ACE_POSIX_Asynch_Read_Stream_Result (void)
+{
+}
+
+// ************************************************************
+
+ACE_POSIX_AIOCB_Asynch_Read_Stream::ACE_POSIX_AIOCB_Asynch_Read_Stream (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Read_Stream_Impl (),
+ ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ const void *act,
+ int priority)
+{
+ // Create the Asynch_Result.
+ ACE_POSIX_Asynch_Read_Stream_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Read_Stream_Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_read,
+ act,
+ this->posix_aiocb_proactor_->get_handle (),
+ priority),
+ -1);
+
+ ssize_t return_val = this->shared_read (result);
+ if (return_val == -1)
+ delete result;
+ return return_val;
+}
+
+ACE_POSIX_AIOCB_Asynch_Read_Stream::~ACE_POSIX_AIOCB_Asynch_Read_Stream (void)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Read_Stream::shared_read (ACE_POSIX_Asynch_Read_Stream_Result *result)
+{
+ // AIO_CONTROL_BLOCKS strategy.
+
+ // Store this <result> with the proactor.
+
+ // Make sure there is space in the aiocb list.
+ if (this->register_aio_with_proactor (0) == -1)
+ {
+ // @@ Set errno to EAGAIN so that applications will know this as
+ // a queueing failure and try again this aio_read it they want.
+ errno = EAGAIN;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Fatal error:%N:%l:%p\n",
+ "AIOContol Block Array is full!!!. Didnt issue the aio call"),
+ -1);
+ }
+
+ // Setup AIOCB.
+
+ // We are not making use of the RT signal queueing in this
+ // strategy.
+ result->aio_sigevent.sigev_notify = SIGEV_NONE;
+
+ // Fire off the aio write.
+ if (aio_read (result) == -1)
+ // Queueing failed.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "Asynch_Read_Stream: aio_read queueing failed"),
+ -1);
+
+ // <aio_read> successfully issued. Store the aiocb_ptr with
+ // proactor.
+ aiocb *aiocb_ptr = (aiocb *) result;
+ if (this->register_aio_with_proactor (aiocb_ptr) == -1)
+ // This shouldn't happen anyway, since we have already checked for
+ // availability of free slots.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Fatal error:%N:%l:%p\n",
+ "AIOContol Block Array is full!!!. Didnt issue the aio call"),
+ -1);
+
+ // <aio_read> successfully issued and ptr stored.
+ return 0;
+}
+
+// *********************************************************************
+
+ACE_POSIX_SIG_Asynch_Read_Stream::ACE_POSIX_SIG_Asynch_Read_Stream (ACE_POSIX_SIG_Proactor *posix_sig_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Read_Stream_Impl (),
+ ACE_POSIX_SIG_Asynch_Operation (posix_sig_proactor)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ const void *act,
+ int priority)
+{
+ // Create the Asynch_Result.
+ ACE_POSIX_Asynch_Read_Stream_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Read_Stream_Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_read,
+ act,
+ this->posix_sig_proactor_->get_handle (),
+ priority),
+ -1);
+
+ ssize_t return_val = this->shared_read (result);
+ if (return_val == -1)
+ delete result;
+ return return_val;
+}
+
+ACE_POSIX_SIG_Asynch_Read_Stream::~ACE_POSIX_SIG_Asynch_Read_Stream (void)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Read_Stream::shared_read (ACE_POSIX_Asynch_Read_Stream_Result *result)
+{
+ // Setup AIOCB.
+
+ // We want queuing of RT signal to notify completion.
+ result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+ result->aio_sigevent.sigev_signo = ACE_SIG_AIO;
+ result->aio_sigevent.sigev_value.sival_ptr = (void *) result;
+
+
+ // Fire off the aio write.
+ if (aio_read (result) == -1)
+ // Queueing failed.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%N%l:%p\n",
+ "Asynch_Read_Stream: aio_read queueing failed"),
+ -1);
+ return 0;
+}
+
+// ****************************************************************
+
+u_long
+ACE_POSIX_Asynch_Write_Stream_Result::bytes_to_write (void) const
+{
+ return this->aio_nbytes;
+}
+
+ACE_Message_Block &
+ACE_POSIX_Asynch_Write_Stream_Result::message_block (void) const
+{
+ return this->message_block_;
+}
+
+ACE_HANDLE
+ACE_POSIX_Asynch_Write_Stream_Result::handle (void) const
+{
+ return this->aio_fildes;
+}
+
+ACE_POSIX_Asynch_Write_Stream_Result::ACE_POSIX_Asynch_Write_Stream_Result (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ ACE_Message_Block &message_block,
+ u_long bytes_to_write,
+ const void* act,
+ ACE_HANDLE event,
+ int priority)
+ : ACE_Asynch_Result_Impl (),
+ ACE_Asynch_Write_Stream_Result_Impl (),
+ ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority),
+ message_block_ (message_block)
+{
+ this->aio_fildes = handle;
+ this->aio_buf = message_block.rd_ptr ();
+ this->aio_nbytes = bytes_to_write;
+ ACE_UNUSED_ARG (event);
+}
+
+void
+ACE_POSIX_Asynch_Write_Stream_Result::complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error)
+{
+ // Get all the data copied.
+
+ // <bytes_transferred> is availble in the aiocb.
+ ACE_UNUSED_ARG (bytes_transferred);
+
+ this->success_ = success;
+ this->completion_key_ = completion_key;
+
+ // <errno> is available in the aiocb.
+ ACE_UNUSED_ARG (error);
+
+ // Appropriately move the pointers in the message block.
+ this->message_block_.rd_ptr (bytes_transferred);
+
+ // Create the interface result class.
+ ACE_Asynch_Write_Stream::Result result (this);
+
+ // Call the application handler.
+ this->handler_.handle_write_stream (result);
+}
+
+ACE_POSIX_Asynch_Write_Stream_Result::~ACE_POSIX_Asynch_Write_Stream_Result (void)
+{
+}
+
+// *********************************************************************
+
+ACE_POSIX_AIOCB_Asynch_Write_Stream::ACE_POSIX_AIOCB_Asynch_Write_Stream (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Write_Stream_Impl (),
+ ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Write_Stream::write (ACE_Message_Block &message_block,
+ u_long bytes_to_write,
+ const void *act,
+ int priority)
+{
+ ACE_POSIX_Asynch_Write_Stream_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Write_Stream_Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_write,
+ act,
+ this->posix_aiocb_proactor_->get_handle (),
+ priority),
+ -1);
+
+ ssize_t return_val = this->shared_write (result);
+ if (return_val == -1)
+ delete result;
+ return return_val;
+}
+
+ACE_POSIX_AIOCB_Asynch_Write_Stream::~ACE_POSIX_AIOCB_Asynch_Write_Stream (void)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Write_Stream::shared_write (ACE_POSIX_Asynch_Write_Stream_Result *result)
+{
+ // AIO_CONTROL_BLOCKS strategy.
+
+ // Make sure there is space in the aiocb list.
+ if (this->register_aio_with_proactor (0) == -1)
+ {
+ // @@ Set errno to EAGAIN so that applications will know this as
+ // a queueing failure and try again this aio_read it they want.
+ errno = EAGAIN;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Fatal error:%N:%l:%p\n",
+ "AIOContol Block Array is full!!!. Didnt issue the aio call"),
+ -1);
+ }
+
+ // Setup AIOCB.
+ result->aio_sigevent.sigev_notify = SIGEV_NONE;
+
+ // Fire off the aio write.
+ if (aio_write (result) == -1)
+ // Queueing failed.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "Asynch_Write_Stream: aio_write queueing failed"),
+ -1);
+
+ // Success. Store the aiocb_ptr with Proactor.
+ if (this->register_aio_with_proactor (result) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Fatal error:%N:%l:%p\n",
+ "AIOContol Block Array is full!!!"),
+ -1);
+
+ // Aio successfully issued.
+ return 0;
+}
+
+// *********************************************************************
+
+ACE_POSIX_SIG_Asynch_Write_Stream::ACE_POSIX_SIG_Asynch_Write_Stream (ACE_POSIX_SIG_Proactor *posix_sig_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Write_Stream_Impl (),
+ ACE_POSIX_SIG_Asynch_Operation (posix_sig_proactor)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Write_Stream::write (ACE_Message_Block &message_block,
+ u_long bytes_to_write,
+ const void *act,
+ int priority)
+{
+ ACE_POSIX_Asynch_Write_Stream_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Write_Stream_Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_write,
+ act,
+ this->posix_sig_proactor_->get_handle (),
+ priority),
+ -1);
+
+ ssize_t return_val = this->shared_write (result);
+ if (return_val == -1)
+ delete result;
+ return return_val;
+}
+
+ACE_POSIX_SIG_Asynch_Write_Stream::~ACE_POSIX_SIG_Asynch_Write_Stream (void)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Write_Stream::shared_write (ACE_POSIX_Asynch_Write_Stream_Result *result)
+{
+ // Setup AIOCB.
+
+ // We want queuing of RT signal to notify completion.
+ result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+ result->aio_sigevent.sigev_signo = ACE_SIG_AIO;
+ result->aio_sigevent.sigev_value.sival_ptr = (void *) result;
+
+ // Fire off the aio write.
+ if (aio_write (result) == -1)
+ // Queueing failed.
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "Asynch_Write_Stream: aio_write queueing failed"),
+ -1);
+ return 0;
+}
+
+// *************************************************************
+
+ACE_POSIX_Asynch_Read_File_Result::ACE_POSIX_Asynch_Read_File_Result (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ const void* act,
+ u_long offset,
+ u_long offset_high,
+ ACE_HANDLE event,
+ int priority)
+ : ACE_Asynch_Result_Impl (),
+ ACE_Asynch_Read_Stream_Result_Impl (),
+ ACE_Asynch_Read_File_Result_Impl (),
+ ACE_POSIX_Asynch_Read_Stream_Result (handler,
+ handle,
+ message_block,
+ bytes_to_read,
+ act,
+ event,
+ priority)
+{
+ this->aio_offset = offset;
+ //
+ // @@ Use aiocb64??
+ //
+ ACE_UNUSED_ARG (offset_high);
+}
+
+void
+ACE_POSIX_Asynch_Read_File_Result::complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error)
+{
+ // Copy all the data.
+
+ // <bytes_transferred> is availble in the aiocb.
+ ACE_UNUSED_ARG (bytes_transferred);
+
+ this->success_ = success;
+ this->completion_key_ = completion_key;
+
+ // <errno> is available in the aiocb.
+ ACE_UNUSED_ARG (error);
+
+ // Appropriately move the pointers in the message block.
+ this->message_block_.wr_ptr (bytes_transferred);
+
+ // Create the interface result class.
+ ACE_Asynch_Read_File::Result result (this);
+
+ // Call the application handler.
+ this->handler_.handle_read_file (result);
+}
+
+ACE_POSIX_Asynch_Read_File_Result::~ACE_POSIX_Asynch_Read_File_Result (void)
+{
+}
+
+// *********************************************************************
+
+ACE_POSIX_AIOCB_Asynch_Read_File::ACE_POSIX_AIOCB_Asynch_Read_File (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Read_Stream_Impl (),
+ ACE_Asynch_Read_File_Impl (),
+ ACE_POSIX_AIOCB_Asynch_Read_Stream (posix_aiocb_proactor)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Read_File::read (ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ u_long offset,
+ u_long offset_high,
+ const void *act,
+ int priority)
+{
+ ACE_POSIX_Asynch_Read_File_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Read_File_Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_read,
+ act,
+ offset,
+ offset_high,
+ this->posix_aiocb_proactor_->get_handle (),
+ priority),
+ -1);
+
+ return this->shared_read (result);
+}
+
+ACE_POSIX_AIOCB_Asynch_Read_File::~ACE_POSIX_AIOCB_Asynch_Read_File (void)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Read_File::read (ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ const void *act,
+ int priority)
+{
+ return ACE_POSIX_AIOCB_Asynch_Read_Stream::read (message_block,
+ bytes_to_read,
+ act,
+ priority);
+}
+
+// ************************************************************
+
+ACE_POSIX_SIG_Asynch_Read_File::ACE_POSIX_SIG_Asynch_Read_File (ACE_POSIX_SIG_Proactor *posix_sig_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Read_Stream_Impl (),
+ ACE_Asynch_Read_File_Impl (),
+ ACE_POSIX_SIG_Asynch_Read_Stream (posix_sig_proactor)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Read_File::read (ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ u_long offset,
+ u_long offset_high,
+ const void *act,
+ int priority)
+{
+ ACE_POSIX_Asynch_Read_File_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Read_File_Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_read,
+ act,
+ offset,
+ offset_high,
+ this->posix_sig_proactor_->get_handle (),
+ priority),
+ -1);
+
+ return this->shared_read (result);
+}
+
+int
+ACE_POSIX_SIG_Asynch_Read_File::read (ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ const void *act,
+ int priority)
+{
+ return ACE_POSIX_SIG_Asynch_Read_Stream::read (message_block,
+ bytes_to_read,
+ act,
+ priority);
+}
+
+ACE_POSIX_SIG_Asynch_Read_File::~ACE_POSIX_SIG_Asynch_Read_File (void)
+{
+}
+
+// ************************************************************
+
+ACE_POSIX_Asynch_Write_File_Result::ACE_POSIX_Asynch_Write_File_Result (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ ACE_Message_Block &message_block,
+ u_long bytes_to_write,
+ const void* act,
+ u_long offset,
+ u_long offset_high,
+ ACE_HANDLE event,
+ int priority)
+ : ACE_Asynch_Result_Impl (),
+ ACE_Asynch_Write_Stream_Result_Impl (),
+ ACE_Asynch_Write_File_Result_Impl (),
+ ACE_POSIX_Asynch_Write_Stream_Result (handler,
+ handle,
+ message_block,
+ bytes_to_write,
+ act,
+ event,
+ priority)
+{
+ this->aio_offset = offset;
+ //
+ // @@ Support offset_high with aiocb64.
+ //
+ ACE_UNUSED_ARG (offset_high);
+}
+
+void
+ACE_POSIX_Asynch_Write_File_Result::complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error)
+{
+ // Copy the data.
+
+ // <bytes_transferred> is available in <AIO_SYSRETURN>
+ ACE_UNUSED_ARG (bytes_transferred);
+
+ this->success_ = success;
+ this->completion_key_ = completion_key;
+
+ // <error> is available in <aio_resultp.aio_error>
+ ACE_UNUSED_ARG (error);
+
+ // Appropriately move the pointers in the message block.
+ this->message_block_.rd_ptr (bytes_transferred);
+
+ // Create the interface result class.
+ ACE_Asynch_Write_File::Result result (this);
+
+ // Call the application handler.
+ this->handler_.handle_write_file (result);
+}
+
+ACE_POSIX_Asynch_Write_File_Result::~ACE_POSIX_Asynch_Write_File_Result (void)
+{
+}
+
+// *********************************************************************
+
+ACE_POSIX_AIOCB_Asynch_Write_File::ACE_POSIX_AIOCB_Asynch_Write_File (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Write_Stream_Impl (),
+ ACE_Asynch_Write_File_Impl (),
+ ACE_POSIX_AIOCB_Asynch_Write_Stream (posix_aiocb_proactor)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Write_File::write (ACE_Message_Block &message_block,
+ u_long bytes_to_write,
+ u_long offset,
+ u_long offset_high,
+ const void *act,
+ int priority)
+{
+ ACE_POSIX_Asynch_Write_File_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Write_File_Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_write,
+ act,
+ offset,
+ offset_high,
+ this->posix_aiocb_proactor_->get_handle (),
+ priority),
+ -1);
+
+ return this->shared_write (result);
+}
+
+ACE_POSIX_AIOCB_Asynch_Write_File::~ACE_POSIX_AIOCB_Asynch_Write_File (void)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Write_File::write (ACE_Message_Block &message_block,
+ u_long bytes_to_write,
+ const void *act,
+ int priority)
+{
+ return ACE_POSIX_AIOCB_Asynch_Write_Stream::write (message_block,
+ bytes_to_write,
+ act,
+ priority);
+}
+
+// *********************************************************************
+
+ACE_POSIX_SIG_Asynch_Write_File::ACE_POSIX_SIG_Asynch_Write_File (ACE_POSIX_SIG_Proactor *posix_sig_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Write_Stream_Impl (),
+ ACE_Asynch_Write_File_Impl (),
+ ACE_POSIX_SIG_Asynch_Write_Stream (posix_sig_proactor)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Write_File::write (ACE_Message_Block &message_block,
+ u_long bytes_to_write,
+ u_long offset,
+ u_long offset_high,
+ const void *act,
+ int priority)
+{
+ ACE_POSIX_Asynch_Write_File_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Write_File_Result (*this->handler_,
+ this->handle_,
+ message_block,
+ bytes_to_write,
+ act,
+ offset,
+ offset_high,
+ this->posix_sig_proactor_->get_handle (),
+ priority),
+ -1);
+
+ return this->shared_write (result);
+}
+
+ACE_POSIX_SIG_Asynch_Write_File::~ACE_POSIX_SIG_Asynch_Write_File (void)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Write_File::write (ACE_Message_Block &message_block,
+ u_long bytes_to_write,
+ const void *act,
+ int priority)
+{
+ return ACE_POSIX_SIG_Asynch_Write_Stream::write (message_block,
+ bytes_to_write,
+ act,
+ priority);
+}
+
+// *********************************************************************
+
+u_long
+ACE_POSIX_Asynch_Accept_Result::bytes_to_read (void) const
+{
+ return this->aio_nbytes;
+}
+
+ACE_Message_Block &
+ACE_POSIX_Asynch_Accept_Result::message_block (void) const
+{
+ return this->message_block_;
+}
+
+ACE_HANDLE
+ACE_POSIX_Asynch_Accept_Result::listen_handle (void) const
+{
+ return this->listen_handle_;
+}
+
+ACE_HANDLE
+ACE_POSIX_Asynch_Accept_Result::accept_handle (void) const
+{
+ return this->aio_fildes;
+}
+
+ACE_POSIX_Asynch_Accept_Result::ACE_POSIX_Asynch_Accept_Result (ACE_Handler &handler,
+ ACE_HANDLE listen_handle,
+ ACE_HANDLE accept_handle,
+ ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ const void* act,
+ ACE_HANDLE event,
+ int priority)
+
+ : ACE_Asynch_Result_Impl (),
+ ACE_Asynch_Accept_Result_Impl (),
+ ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority),
+ message_block_ (message_block),
+ listen_handle_ (listen_handle)
+{
+ this->aio_fildes = accept_handle;
+ this->aio_nbytes = bytes_to_read;
+}
+
+void
+ACE_POSIX_Asynch_Accept_Result::complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error)
+{
+ // Copy the data.
+ this->AIO_SYSRETURN = bytes_transferred;
+ this->success_ = success;
+ this->completion_key_ = completion_key;
+ this->AIO_SYSERRNO = error;
+
+ // Appropriately move the pointers in the message block.
+ this->message_block_.wr_ptr (bytes_transferred);
+
+ // Create the interface result class.
+ ACE_Asynch_Accept::Result result (this);
+
+ // Call the application handler.
+ this->handler_.handle_accept (result);
+}
+
+ACE_POSIX_Asynch_Accept_Result::~ACE_POSIX_Asynch_Accept_Result (void)
+{
+}
+
+// *********************************************************************
+
+class ACE_Export ACE_POSIX_Asynch_Accept_Handler : public ACE_Event_Handler
+{
+ // = TITLE
+ // For the POSIX implementation, this class takes care of doing
+ // Asynch_Accept.
+ //
+ // = DESCRIPTION
+ //
+public:
+ ACE_POSIX_Asynch_Accept_Handler (ACE_Reactor* reactor);
+ // Constructor. Give the reactor so that it can activate/deactivate
+ // the handlers. Give also the proactor used here, so that the
+ // handler can send information through the notification pipe of the
+ // proactor, in case AIO_CONTROL_BLOCKS strategy is used.
+
+ ~ACE_POSIX_Asynch_Accept_Handler (void);
+ // Destructor.
+
+ int register_accept_call (ACE_POSIX_Asynch_Accept_Result* result);
+ // Register this <accept> call with the local handler.
+
+ // virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE) = 0;
+ // Called when accept event comes up on the <listen_handle>.
+ // This is defined in ACE_Handler, the derived
+ // Asynch_Accept_Handler's will define this again.
+
+protected:
+ ACE_POSIX_Asynch_Accept_Result* deregister_accept_call (void);
+ // Undo the things done when registering.
+
+ ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result*> result_queue_;
+ // Queue of Result pointers that correspond to all the <accept>'s
+ // pending.
+
+ ACE_Reactor* reactor_;
+ // Reactor used by the Asynch_Accept. We need this here to enable
+ // and disable the <handle> now and then, depending on whether any
+ // <accept> is pending or no.
+
+ ACE_Thread_Mutex lock_;
+ // The lock to protect the result queue which is shared. The queue
+ // is updated by main thread in the register function call and
+ // through the auxillary thread in the deregister fun. So let us
+ // mutex it.
+};
+
+ACE_POSIX_Asynch_Accept_Handler::ACE_POSIX_Asynch_Accept_Handler (ACE_Reactor* reactor)
+ : reactor_ (reactor)
+{
+}
+
+ACE_POSIX_Asynch_Accept_Handler::~ACE_POSIX_Asynch_Accept_Handler (void)
+{
+}
+
+int
+ACE_POSIX_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Accept_Result* result)
+{
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::register_accept_call called\n"));
+
+ // The queue is updated by main thread in the register function call
+ // and thru the auxillary thread in the deregister fun. So let us
+ // mutex it.
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+
+ // Insert this result to the queue.
+ int insert_result = this->result_queue_.enqueue_tail (result);
+ if (insert_result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:ACE_Asynch_Accept_Handler::register_accept_call failed\n"),
+ -1);
+
+ // If this is the only item, then it means there the set was empty
+ // before. So enable the <handle> in the reactor.
+ if (this->result_queue_.size () == 1)
+ {
+ int return_val = this->reactor_->resume_handler (result->listen_handle ());
+ ACE_DEBUG ((LM_DEBUG, "%N:%l:return_val = %d\n", return_val));
+ if (return_val == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:Reactor::resume_handler failed\n"),
+ -1);
+ }
+
+ return 0;
+}
+
+ACE_POSIX_Asynch_Accept_Result *
+ACE_POSIX_Asynch_Accept_Handler::deregister_accept_call (void)
+{
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::deregister_accept_call\n"));
+
+ // The queue is updated by main thread in the register function call and
+ // thru the auxillary thread in the deregister fun. So let us mutex
+ // it.
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0);
+
+ // Get the first item (result ptr) from the Queue.
+ ACE_POSIX_Asynch_Accept_Result* result = 0;
+ int return_dequeue = this->result_queue_.dequeue_head (result);
+ if (return_dequeue == -1)
+ return 0;
+
+ ACE_ASSERT (result != 0);
+
+ // Disable the <handle> in the reactor if no <accept>'s are pending.
+ if (this->result_queue_.size () == 0)
+ {
+ int return_val = this->reactor_->suspend_handler (result->listen_handle ());
+ if (return_val != 0)
+ return 0;
+ }
+
+ // Return the result pointer.
+ return result;
+}
+
+// *********************************************************************
+
+class ACE_Export ACE_POSIX_AIOCB_Asynch_Accept_Handler : public ACE_POSIX_Asynch_Accept_Handler
+{
+ // = TITLE
+ // For the POSIX implementation, this class takes care of doing
+ // Asynch_Accept.
+ //
+ // = DESCRIPTION
+ //
+public:
+ ACE_POSIX_AIOCB_Asynch_Accept_Handler (ACE_Reactor *reactor,
+ ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
+ // Constructor. Give the reactor so that it can activate/deactivate
+ // the handlers. Give also the proactor used here, so that the
+ // handler can send information through the notification pipe of the
+ // proactor, in case AIO_CONTROL_BLOCKS strategy is used.
+
+ ~ACE_POSIX_AIOCB_Asynch_Accept_Handler (void);
+ // Destructor.
+
+ virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
+ // Called when accept event comes up on the <listen_handle>.
+
+private:
+ ACE_POSIX_AIOCB_Proactor* posix_aiocb_proactor_;
+ // Proactor used by the Asynch_Accept class.
+};
+
+ACE_POSIX_AIOCB_Asynch_Accept_Handler::ACE_POSIX_AIOCB_Asynch_Accept_Handler (ACE_Reactor* reactor,
+ ACE_POSIX_AIOCB_Proactor* posix_aiocb_proactor)
+ : ACE_POSIX_Asynch_Accept_Handler (reactor),
+ posix_aiocb_proactor_ (posix_aiocb_proactor)
+{
+}
+
+ACE_POSIX_AIOCB_Asynch_Accept_Handler::~ACE_POSIX_AIOCB_Asynch_Accept_Handler (void)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd)
+{
+ // An <accept> has been sensed on the <listen_handle>. We should be
+ // able to just go ahead and do the <accept> now on this <fd>. This
+ // should be the same as the <listen_handle>.
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG, "ACE_POSIX_AIOCB_Asynch_Accept_Handler::handle_input called\n"));
+
+ // Deregister this info pertaining to this <accept> call.
+ ACE_POSIX_Asynch_Accept_Result* result = this->deregister_accept_call ();
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "(%P:%t):ACE_Asynch_Accept_Handler::handle_input : fd = [%d], Result->listen_handle = [%d]\n",
+ fd,
+ result->listen_handle ()));
+
+ // Issue <accept> now.
+ // @@ We shouldnt block here since we have already done poll/select
+ // thru reactor. But are we sure?
+ ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), 0, 0);
+ if (new_handle == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "<accept> system call failed"),
+ -1);
+
+ // Accept has completed.
+
+ // Store the new handle.
+ result->aio_fildes = new_handle;
+
+ // Notify the main process about this completion
+ // Send the result pointer thru the notify pipe depending on what is Completion
+ // Notification Strategy.
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Accept_Handler::handle_input: AIO_CONTROL_BLOCKS\n"));
+
+ // Send the Result through the notification pipe.
+ if (this->posix_aiocb_proactor_->notify_asynch_accept (result) == -1)
+ return -1;
+
+ return 0;
+}
+
+// *********************************************************************
+
+ACE_POSIX_AIOCB_Asynch_Accept::ACE_POSIX_AIOCB_Asynch_Accept (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Accept_Impl (),
+ ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor),
+ accept_handler_ (0)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Accept::accept (ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ ACE_HANDLE accept_handle,
+ const void *act,
+ int priority)
+{
+ // Sanity check: make sure that enough space has been allocated by
+ // the caller.
+ size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr);
+ size_t space_in_use = message_block.wr_ptr () - message_block.base ();
+ size_t total_size = message_block.size ();
+ size_t available_space = total_size - space_in_use;
+ size_t space_needed = bytes_to_read + 2 * address_size;
+ if (available_space < space_needed)
+ ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("Buffer too small\n")), -1);
+
+ // Common code for both WIN and POSIX.
+ ACE_POSIX_Asynch_Accept_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Accept_Result (*this->handler_,
+ this->handle_,
+ accept_handle,
+ message_block,
+ bytes_to_read,
+ act,
+ this->posix_aiocb_proactor_->get_handle (),
+ priority),
+ -1);
+
+ // Register this <accept> call with the local handler.
+ this->accept_handler_->register_accept_call (result);
+
+ return 0;
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ const void *completion_key,
+ ACE_Proactor *proactor)
+{
+ int result = ACE_POSIX_Asynch_Operation::open (handler,
+ handle,
+ completion_key,
+ proactor);
+ if (result == -1)
+ return result;
+
+ // Init the Asynch_Accept_Handler now. It needs to keep Proactor
+ // also with it.
+ ACE_NEW_RETURN (this->accept_handler_,
+ ACE_POSIX_AIOCB_Asynch_Accept_Handler (&this->reactor_,
+ this->posix_aiocb_proactor_),
+ -1);
+
+ // Register the handle with the reactor.
+ int return_val = this->reactor_.register_handler (this->handle_,
+ this->accept_handler_,
+ ACE_Event_Handler::ACCEPT_MASK);
+ if (return_val == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:Reactor::register_handler failed\n"),
+ -1);
+
+ // Suspend the <handle> now. Enable only when the <accept> is issued
+ // by the application.
+ return_val = this->reactor_.suspend_handler (this->handle_);
+ if (return_val == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:Reactor::suspend_handler failed\n"),
+ -1);
+
+ // Spawn the thread. It is the only thread we are going to have. It
+ // will do the <handle_events> on the reactor.
+ return_val = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_AIOCB_Asynch_Accept::thread_function,
+ (void *) &this->reactor_);
+ if (return_val == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:Thread_Manager::spawn failed\n"),
+ -1);
+
+ return 0;
+}
+
+ACE_POSIX_AIOCB_Asynch_Accept::~ACE_POSIX_AIOCB_Asynch_Accept (void)
+{
+}
+
+void*
+ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor)
+{
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::thread_function called\n"));
+
+ // Retrieve the reactor pointer from the argument.
+ ACE_Reactor* reactor = (ACE_Reactor *) arg_reactor;
+
+ // It should be valid Reactor, since we have a reactor_ ,e,ner we
+ // are passing only that one here.
+ if (reactor == 0)
+ ACE_ERROR ((LM_ERROR,
+ "%n:%l:Invalid Reactor pointer passed to the thread_function\n",
+ 0));
+
+ // For this reactor, this thread is the owner.
+ reactor->owner (ACE_OS::thr_self ());
+
+ // Handle events.
+ int result = 0;
+ while (result != -1)
+ {
+ result = reactor->handle_events ();
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Accept::Thread_Function : handle_events : result = [%d]\n",
+ result));
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Exiting ACE_Asynch_Accept::thread_function \n"));
+
+ return 0;
+}
+
+// *********************************************************************
+
+class ACE_Export ACE_POSIX_SIG_Asynch_Accept_Handler : public ACE_POSIX_Asynch_Accept_Handler
+{
+ // = TITLE
+ // For the POSIX implementation, this class takes care of doing
+ // Asynch_Accept.
+ //
+ // = DESCRIPTION
+ //
+public:
+ ACE_POSIX_SIG_Asynch_Accept_Handler (ACE_Reactor* reactor);
+ // Constructor. Give the reactor so that it can activate/deactivate
+ // the handlers. Give also the proactor used here, so that the
+ // handler can send information through the notification pipe of the
+ // proactor, in case AIO_CONTROL_BLOCKS strategy is used.
+
+ ~ACE_POSIX_SIG_Asynch_Accept_Handler (void);
+ // Destructor.
+
+ virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
+ // Called when accept event comes up on the <listen_handle>.
+};
+
+ACE_POSIX_SIG_Asynch_Accept_Handler::ACE_POSIX_SIG_Asynch_Accept_Handler (ACE_Reactor* reactor)
+ : ACE_POSIX_Asynch_Accept_Handler (reactor)
+{
+}
+
+ACE_POSIX_SIG_Asynch_Accept_Handler::~ACE_POSIX_SIG_Asynch_Accept_Handler (void)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd)
+{
+ // An <accept> has been sensed on the <listen_handle>. We should be
+ // able to just go ahead and do the <accept> now on this <fd>. This
+ // should be the same as the <listen_handle>.
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG, "ACE_POSIX_AIOCB_Asynch_Accept_Handler::handle_input called\n"));
+
+ // Deregister this info pertaining to this <accept> call.
+ ACE_POSIX_Asynch_Accept_Result* result = this->deregister_accept_call ();
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t):ACE_Asynch_Accept_Handler::handle_input : fd = [%d], Result->listen_handle = [%d]\n",
+ fd,
+ result->listen_handle ()));
+
+ // Issue <accept> now.
+ // @@ We shouldnt block here since we have already done poll/select
+ // thru reactor. But are we sure?
+ ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), 0, 0);
+ if (new_handle == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p\n",
+ "<accept> system call failed"),
+ -1);
+
+ // Accept has completed.
+
+ // Store the new handle.
+ result->aio_fildes = new_handle;
+
+ // Notify the mail process about this completion.
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Accept_Handler::handle_input: RT_SIGNALS\n"));
+
+ // Get this process id.
+ pid_t pid = ACE_OS::getpid ();
+ if (pid == (pid_t) -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p",
+ "<getpid> failed\n"),
+ -1);
+
+ // Set the signal information.
+ sigval value;
+ value.sival_ptr = (void *) result;
+
+ // Queue the signal.
+ if (sigqueue (pid, ACE_SIG_AIO, value) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p",
+ "<sigqueue> failed\n"),
+ -1);
+}
+
+// *********************************************************************
+
+ACE_POSIX_SIG_Asynch_Accept::ACE_POSIX_SIG_Asynch_Accept (ACE_POSIX_SIG_Proactor *posix_sig_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Accept_Impl (),
+ ACE_POSIX_SIG_Asynch_Operation (posix_sig_proactor),
+ accept_handler_ (0)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Accept::accept (ACE_Message_Block &message_block,
+ u_long bytes_to_read,
+ ACE_HANDLE accept_handle,
+ const void *act,
+ int priority)
+{
+ // Sanity check: make sure that enough space has been allocated by
+ // the caller.
+ size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr);
+ size_t space_in_use = message_block.wr_ptr () - message_block.base ();
+ size_t total_size = message_block.size ();
+ size_t available_space = total_size - space_in_use;
+ size_t space_needed = bytes_to_read + 2 * address_size;
+ if (available_space < space_needed)
+ ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("Buffer too small\n")), -1);
+
+ // Common code for both WIN and POSIX.
+ ACE_POSIX_Asynch_Accept_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Accept_Result (*this->handler_,
+ this->handle_,
+ accept_handle,
+ message_block,
+ bytes_to_read,
+ act,
+ this->posix_sig_proactor_->get_handle (),
+ priority),
+ -1);
+
+ // Register this <accept> call with the local handler.
+ this->accept_handler_->register_accept_call (result);
+
+ return 0;
+}
+
+
+int
+ACE_POSIX_SIG_Asynch_Accept::open (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ const void *completion_key,
+ ACE_Proactor *proactor)
+{
+ int result = ACE_POSIX_Asynch_Operation::open (handler,
+ handle,
+ completion_key,
+ proactor);
+ if (result == -1)
+ return result;
+
+ // Init the Asynch_Accept_Handler now. It needs to keep Proactor
+ // also with it.
+ ACE_NEW_RETURN (this->accept_handler_,
+ ACE_POSIX_SIG_Asynch_Accept_Handler (&this->reactor_),
+ -1);
+
+ // Register the handle with the reactor.
+ int return_val = this->reactor_.register_handler (this->handle_,
+ this->accept_handler_,
+ ACE_Event_Handler::ACCEPT_MASK);
+ if (return_val == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:Reactor::register_handler failed\n"),
+ -1);
+
+ // Suspend the <handle> now. Enable only when the <accept> is issued
+ // by the application.
+ return_val = this->reactor_.suspend_handler (this->handle_);
+ if (return_val == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:Reactor::suspend_handler failed\n"),
+ -1);
+
+ // Spawn the thread. It is the only thread we are going to have. It
+ // will do the <handle_events> on the reactor.
+ return_val = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_SIG_Asynch_Accept::thread_function,
+ (void *)&this->reactor_);
+ if (return_val == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:Thread_Manager::spawn failed\n"),
+ -1);
+
+ return 0;
+}
+
+ACE_POSIX_SIG_Asynch_Accept::~ACE_POSIX_SIG_Asynch_Accept (void)
+{
+}
+
+void*
+ACE_POSIX_SIG_Asynch_Accept::thread_function (void* arg_reactor)
+{
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::thread_function called\n"));
+
+ // Retrieve the reactor pointer from the argument.
+ ACE_Reactor* reactor = (ACE_Reactor *) arg_reactor;
+ if (reactor == 0)
+ reactor = ACE_Reactor::instance ();
+
+ // For this reactor, this thread is the owner.
+ reactor->owner (ACE_OS::thr_self ());
+
+ // Handle events.
+ int result = 0;
+ while (result != -1)
+ {
+ result = reactor->handle_events ();
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Accept::Thread_Function : handle_events : result = [%d]\n",
+ result));
+ }
+
+ ACE_DEBUG ((LM_DEBUG, "Exiting ACE_Asynch_Accept::thread_function \n"));
+
+ return 0;
+}
+
+// *********************************************************************
+
+ACE_HANDLE
+ACE_POSIX_Asynch_Transmit_File_Result::socket (void) const
+{
+ return this->socket_;
+}
+
+ACE_HANDLE
+ACE_POSIX_Asynch_Transmit_File_Result::file (void) const
+{
+ return this->aio_fildes;
+}
+
+ACE_Asynch_Transmit_File::Header_And_Trailer *
+ACE_POSIX_Asynch_Transmit_File_Result::header_and_trailer (void) const
+{
+ return this->header_and_trailer_;
+}
+
+u_long
+ACE_POSIX_Asynch_Transmit_File_Result::bytes_to_write (void) const
+{
+ return this->aio_nbytes;
+}
+
+u_long
+ACE_POSIX_Asynch_Transmit_File_Result::bytes_per_send (void) const
+{
+ return this->bytes_per_send_;
+}
+
+u_long
+ACE_POSIX_Asynch_Transmit_File_Result::flags (void) const
+{
+ return this->flags_;
+}
+
+ACE_POSIX_Asynch_Transmit_File_Result::ACE_POSIX_Asynch_Transmit_File_Result (ACE_Handler &handler,
+ ACE_HANDLE socket,
+ ACE_HANDLE file,
+ ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
+ u_long bytes_to_write,
+ u_long offset,
+ u_long offset_high,
+ u_long bytes_per_send,
+ u_long flags,
+ const void *act,
+ ACE_HANDLE event,
+ int priority)
+ : ACE_Asynch_Result_Impl (),
+ ACE_Asynch_Transmit_File_Result_Impl (),
+ ACE_POSIX_Asynch_Result (handler, act, event, offset, offset_high, priority),
+ socket_ (socket),
+ header_and_trailer_ (header_and_trailer),
+ bytes_per_send_ (bytes_per_send),
+ flags_ (flags)
+{
+ this->aio_fildes = file;
+ this->aio_nbytes = bytes_to_write;
+}
+
+void
+ACE_POSIX_Asynch_Transmit_File_Result::complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error)
+{
+ // Copy the data.
+ this->AIO_SYSRETURN = bytes_transferred;
+ this->success_ = success;
+ this->completion_key_ = completion_key;
+ this->AIO_SYSERRNO = error;
+
+ // We will not do this because (a) the header and trailer blocks may
+ // be the same message_blocks and (b) in cases of failures we have
+ // no idea how much of what (header, data, trailer) was sent.
+ /*
+ if (this->success_ && this->header_and_trailer_ != 0)
+ {
+ ACE_Message_Block *header = this->header_and_trailer_->header ();
+ if (header != 0)
+ header->rd_ptr (this->header_and_trailer_->header_bytes ());
+
+ ACE_Message_Block *trailer = this->header_and_trailer_->trailer ();
+ if (trailer != 0)
+ trailer->rd_ptr (this->header_and_trailer_->trailer_bytes ());
+ }
+ */
+
+ // Create the interface result class.
+ ACE_Asynch_Transmit_File::Result result (this);
+
+ // Call the application handler.
+ this->handler_.handle_transmit_file (result);
+}
+
+ACE_POSIX_Asynch_Transmit_File_Result::~ACE_POSIX_Asynch_Transmit_File_Result (void)
+{
+}
+
+// *********************************************************************
+
+class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler
+{
+ // = TITLE
+ //
+ // Auxillary handler for doing <Asynch_Transmit_File> in
+ // Unix. <ACE_POSIX_Asynch_Transmit_File> internally uses this.
+ //
+ // = DESCRIPTION
+ //
+ // This is a helper class for implementing
+ // <ACE_POSIX_Asynch_Transmit_File> in Unix systems.
+
+public:
+ virtual ~ACE_POSIX_Asynch_Transmit_Handler (void);
+ // Destructor.
+
+protected:
+ ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_Asynch_Transmit_File_Result *result);
+ // Constructor. Result pointer will have all the information to do
+ // the file transmission (socket, file, application handler, bytes
+ // to write).
+
+ ACE_POSIX_Asynch_Transmit_File_Result *result_;
+ // The asynch result pointer made from the initial transmit file
+ // request.
+
+ ACE_Message_Block *mb_;
+ // Message bloack used to do the transmission.
+
+ enum ACT
+ {
+ HEADER_ACT = 1,
+ DATA_ACT = 2,
+ TRAILER_ACT = 3
+ };
+
+ ACT header_act_;
+ ACT data_act_;
+ ACT trailer_act_;
+ // ACT to transmit header, data and trailer.
+
+ size_t file_offset_;
+ // Current offset of the file being transmitted.
+
+ size_t file_size_;
+ // Total size of the file.
+
+ size_t bytes_transferred_;
+ // Number of bytes transferred on the stream.
+};
+
+// ************************************************************
+
+class ACE_Export ACE_POSIX_AIOCB_Asynch_Transmit_Handler : public ACE_POSIX_Asynch_Transmit_Handler
+{
+ // = TITLE
+ //
+ // Auxillary handler for doing <Asynch_Transmit_File> in
+ // Unix. <ACE_POSIX_Asynch_Transmit_File> internally uses this.
+ //
+ // = DESCRIPTION
+ //
+ // This is a helper class for implementing
+ // <ACE_POSIX_Asynch_Transmit_File> in Unix systems.
+
+public:
+ ACE_POSIX_AIOCB_Asynch_Transmit_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor,
+ ACE_POSIX_Asynch_Transmit_File_Result *result);
+ // Constructor. Result pointer will have all the information to do
+ // the file transmission (socket, file, application handler, bytes
+ // to write).
+
+ virtual ~ACE_POSIX_AIOCB_Asynch_Transmit_Handler (void);
+ // Destructor.
+
+ int transmit (void);
+ // Do the transmission. All the info to do the transmission is in
+ // the <result> member.
+
+protected:
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when asynchronous writes from the socket complete.
+
+ virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
+ // This is called when asynchronous reads from the file complete.
+
+ int initiate_read_file (void);
+ // Issue asynch read from the file.
+
+ ACE_POSIX_AIOCB_Asynch_Read_File rf_;
+ // To read from the file to be transmitted.
+
+ ACE_POSIX_AIOCB_Asynch_Write_Stream ws_;
+ // Write stream to write the header, trailer and the data.
+};
+
+// *********************************************************************
+
+class ACE_Export ACE_POSIX_SIG_Asynch_Transmit_Handler : public ACE_POSIX_Asynch_Transmit_Handler
+{
+ // = TITLE
+ // Auxillary handler for doing <Asynch_Transmit_File> in
+ // Unix. <ACE_POSIX_Asynch_Transmit_File> internally uses this.
+ //
+ // = DESCRIPTION
+ // This is a helper class for implementing
+ // <ACE_POSIX_Asynch_Transmit_File> in Unix systems.
+public:
+ ACE_POSIX_SIG_Asynch_Transmit_Handler (ACE_POSIX_SIG_Proactor *proactor,
+ ACE_POSIX_Asynch_Transmit_File_Result *result);
+ // Constructor. Result pointer will have all the information to do
+ // the file transmission (socket, file, application handler, bytes
+ // to write).
+
+ virtual ~ACE_POSIX_SIG_Asynch_Transmit_Handler (void);
+ // Destructor.
+
+ int transmit (void);
+ // Do the transmission. All the info to do the transmission is in
+ // the <result> member.
+
+protected:
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when asynchronous writes from the socket complete.
+
+ virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
+ // This is called when asynchronous reads from the file complete.
+
+ int initiate_read_file (void);
+ // Issue asynch read from the file.
+
+ ACE_POSIX_SIG_Asynch_Read_File rf_;
+ // To read from the file to be transmitted.
+
+ ACE_POSIX_SIG_Asynch_Write_Stream ws_;
+ // Write stream to write the header, trailer and the data.
+};
+
+// *********************************************************************
+
+// Constructor.
+ACE_POSIX_Asynch_Transmit_Handler::ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_Asynch_Transmit_File_Result *result)
+ : result_ (result),
+ mb_ (0),
+ header_act_ (this->HEADER_ACT),
+ data_act_ (this->DATA_ACT),
+ trailer_act_ (this->TRAILER_ACT),
+ file_offset_ (result->offset ()),
+ file_size_ (0),
+ bytes_transferred_ (0)
+{
+ // Allocate memory for the message block.
+ ACE_NEW (this->mb_,
+ ACE_Message_Block (this->result_->bytes_per_send ()
+ + 1));
+ // Init the file size.
+ file_size_ = ACE_OS::filesize (this->result_->file ());
+}
+
+// Destructor.
+ACE_POSIX_Asynch_Transmit_Handler::~ACE_POSIX_Asynch_Transmit_Handler (void)
+{
+ delete result_;
+ mb_->release ();
+}
+
+// *********************************************************************
+
+ACE_POSIX_AIOCB_Asynch_Transmit_Handler::ACE_POSIX_AIOCB_Asynch_Transmit_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor,
+ ACE_POSIX_Asynch_Transmit_File_Result *result)
+ : ACE_POSIX_Asynch_Transmit_Handler (result),
+ rf_ (posix_aiocb_proactor),
+ ws_ (posix_aiocb_proactor)
+{
+}
+
+ACE_POSIX_AIOCB_Asynch_Transmit_Handler::~ACE_POSIX_AIOCB_Asynch_Transmit_Handler (void)
+{
+}
+
+// Do the transmission.
+// Initiate transmitting the header. When that completes
+// handle_write_stream will be called, there start transmitting the file.
+int
+ACE_POSIX_AIOCB_Asynch_Transmit_Handler::transmit (void)
+{
+ // No proactor is given for the <open>'s. Because we are using the
+ // concrete implementations of the Asynch_Operations, and we have
+ // already given them the specific proactor, so they wont need the
+ // general <proactor> interface pointer.
+
+ // Open Asynch_Read_File.
+ if (this->rf_.open (*this,
+ this->result_->file (),
+ 0,
+ 0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE_Asynch_Transmit_Handler:read_file open failed\n"),
+ -1);
+
+ // Open Asynch_Write_Stream.
+ if (this->ws_.open (*this,
+ this->result_->socket (),
+ 0,
+ 0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE_Asynch_Transmit_Handler:write_stream open failed\n"),
+ -1);
+
+ // Transmit the header.
+ if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
+ this->result_->header_and_trailer ()->header_bytes (),
+ (void *) &this->header_act_,
+ 0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
+ -1);
+ return 0;
+}
+
+void
+ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ // Update bytes transferred so far.
+ this->bytes_transferred_ += result.bytes_transferred ();
+
+ // Check the success parameter.
+ if (result.success () == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Asynch_Transmit_File failed.\n"));
+
+ // Check the success parameter.
+ if (result.success () == 0)
+ {
+ // Failure.
+ ACE_ERROR ((LM_ERROR,
+ "Asynch_Transmit_File failed.\n"));
+
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ 0); // @@ Error no.
+ }
+ ACE_SEH_FINALLY
+ {
+ // This is crucial to prevent memory leaks. This deletes
+ // the result pointer also.
+ delete this;
+ }
+ }
+ }
+
+ // Write stream successful.
+
+ // Partial write to socket.
+ int unsent_data = result.bytes_to_write () - result.bytes_transferred ();
+ if (unsent_data != 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "%N:%l:Partial write to socket: Asynch_write called again\n"));
+
+ // Duplicate the message block and retry remaining data
+ if (this->ws_.write (*result.message_block ().duplicate (),
+ unsent_data,
+ result.act (),
+ result_->priority ()) == -1)
+ {
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Asynch_Transmit_Handler:write_stream failed\n"));
+ return;
+ }
+
+ // @@ Handling *partial write* to a socket. Let us not continue
+ // further before this write finishes. Because proceeding with
+ // another read and then write might change the order of the
+ // file transmission, because partial write to the stream is
+ // always possible.
+ return;
+ }
+
+ // Not a partial write. A full write.
+
+ // Check ACT to see what was sent.
+ ACT act = *(ACT *) result.act ();
+
+ switch (act)
+ {
+ case TRAILER_ACT:
+ // If it is the "trailer" that is just sent, then transmit file
+ // is complete.
+ // Call the application handler.
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 1, // @@ Success.
+ 0, // @@ Completion key.
+ 0); // @@ Errno.
+ }
+ ACE_SEH_FINALLY
+ {
+ delete this;
+ }
+ break;
+
+ case HEADER_ACT:
+ case DATA_ACT:
+ // If header/data was sent, initiate the file data transmission.
+ if (this->initiate_read_file () == -1)
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n"));
+ break;
+
+ default:
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n"));
+ }
+}
+
+void
+ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result)
+{
+ // Failure.
+ if (result.success () == 0)
+ {
+ //
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ errno); // Error no.
+ }
+ ACE_SEH_FINALLY
+ {
+ delete this;
+ }
+ return;
+ }
+
+ // Read successful.
+ if (result.bytes_transferred () == 0)
+ return;
+
+ // Increment offset and write data to network.
+ this->file_offset_ += result.bytes_transferred ();
+ if (this->ws_.write (result.message_block (),
+ result.bytes_transferred (),
+ (void *)&this->data_act_,
+ result_->priority ()) == -1)
+ {
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Error:ACE_Asynch_Transmit_File : write to the stream failed\n"));
+ return;
+ }
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Transmit_Handler::initiate_read_file (void)
+{
+ // Is there something to read.
+ if (this->file_offset_ >= this->file_size_)
+ {
+ // File is sent. Send the trailer.
+ if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (),
+ this->result_->header_and_trailer ()->trailer_bytes (),
+ (void *)&this->trailer_act_,
+ this->result_->priority ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"),
+ -1);
+ return 0;
+ }
+ else
+ {
+ // @@ Is this right??
+ // Previous reads and writes are over. For the new read, adjust
+ // the wr_ptr and the rd_ptr to the beginning.
+ this->mb_->rd_ptr (this->mb_->base ());
+ this->mb_->wr_ptr (this->mb_->base ());
+
+ // Inititiate an asynchronous read from the file.
+ if (this->rf_.read (*this->mb_,
+ this->mb_->size () - 1,
+ this->file_offset_,
+ 0, // @@ offset_high !!! if aiocb64 is used.
+ 0, // Act
+ this->result_->priority ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:Asynch_Transmit_Handler::read from file failed\n"),
+ -1);
+ return 0;
+ }
+}
+
+// *********************************************************************
+
+ACE_POSIX_SIG_Asynch_Transmit_Handler::ACE_POSIX_SIG_Asynch_Transmit_Handler (ACE_POSIX_SIG_Proactor *posix_sig_proactor,
+ ACE_POSIX_Asynch_Transmit_File_Result *result)
+ : ACE_POSIX_Asynch_Transmit_Handler (result),
+ rf_ (posix_sig_proactor),
+ ws_ (posix_sig_proactor)
+{
+}
+
+ACE_POSIX_SIG_Asynch_Transmit_Handler::~ACE_POSIX_SIG_Asynch_Transmit_Handler (void)
+{
+}
+
+// Do the transmission.
+// Initiate transmitting the header. When that completes
+// handle_write_stream will be called, there start transmitting the file.
+int
+ACE_POSIX_SIG_Asynch_Transmit_Handler::transmit (void)
+{
+ // The Proactor given for the <open>'s is not going to be
+ // used. Because we are using the
+ // concrete implementations of the Asynch_Operations, and we have
+ // already given them the specific proactor, so they wont need the
+ // general <proactor> interface pointer.
+
+ // Open Asynch_Read_File.
+ if (this->rf_.open (*this,
+ this->result_->file (),
+ this->result_->completion_key (), // Completion key
+ 0) // Proactor
+ == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE_Asynch_Transmit_Handler:read_file open failed\n"),
+ -1);
+
+ // Open Asynch_Write_Stream.
+ if (this->ws_.open (*this,
+ this->result_->socket (),
+ this->result_->completion_key (), // Completion key
+ 0) // Proactor
+ == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE_Asynch_Transmit_Handler:write_stream open failed\n"),
+ -1);
+
+ // Transmit the header.
+ if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
+ this->result_->header_and_trailer ()->header_bytes (),
+ (void *) &this->header_act_,
+ 0) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
+ -1);
+ return 0;
+}
+
+void
+ACE_POSIX_SIG_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
+{
+ // Update bytes transferred so far.
+ this->bytes_transferred_ += result.bytes_transferred ();
+
+ // Check the success parameter.
+ if (result.success () == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Asynch_Transmit_File failed.\n"));
+
+ // Check the success parameter.
+ if (result.success () == 0)
+ {
+ // Failure.
+ ACE_ERROR ((LM_ERROR,
+ "Asynch_Transmit_File failed.\n"));
+
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ 0); // @@ Error no.
+ }
+ ACE_SEH_FINALLY
+ {
+ // This is crucial to prevent memory leaks. This deletes
+ // the result pointer also.
+ delete this;
+ }
+ }
+ }
+
+ // Write stream successful.
+
+ // Partial write to socket.
+ int unsent_data = result.bytes_to_write () - result.bytes_transferred ();
+ if (unsent_data != 0)
+ {
+ // Reset pointers.
+ result.message_block ().rd_ptr (result.bytes_transferred ());
+
+ // Duplicate the message block and retry remaining data
+ if (this->ws_.write (*result.message_block ().duplicate (),
+ unsent_data,
+ result.act (),
+ result.priority ()) == -1)
+ {
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Asynch_Transmit_Handler:write_stream failed\n"));
+ return;
+ }
+
+ // @@ Handling *partial write* to a socket. Let us not continue
+ // further before this write finishes. Because proceeding with
+ // another read and then write might change the order of the
+ // file transmission, because partial write to the stream is
+ // always possible.
+ return;
+ }
+
+ // Not a partial write. A full write.
+
+ // Check ACT to see what was sent.
+ ACT act = *(ACT *) result.act ();
+
+ switch (act)
+ {
+ case TRAILER_ACT:
+ // If it is the "trailer" that is just sent, then transmit file
+ // is complete.
+ // Call the application handler.
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 1, // @@ Success.
+ 0, // @@ Completion key.
+ 0); // @@ Errno.
+ }
+ ACE_SEH_FINALLY
+ {
+ delete this;
+ }
+ break;
+
+ case HEADER_ACT:
+ case DATA_ACT:
+ // If header/data was sent, initiate the file data transmission.
+ if (this->initiate_read_file () == -1)
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n"));
+ break;
+
+ default:
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n"));
+ }
+}
+
+void
+ACE_POSIX_SIG_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result)
+{
+ // Failure.
+ if (result.success () == 0)
+ {
+ //
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ errno); // Error no.
+ }
+ ACE_SEH_FINALLY
+ {
+ delete this;
+ }
+ return;
+ }
+
+ // Read successful.
+ if (result.bytes_transferred () == 0)
+ return;
+
+ // Increment offset and write data to network.
+ this->file_offset_ += result.bytes_transferred ();
+ if (this->ws_.write (result.message_block (),
+ result.bytes_transferred (),
+ (void *)&this->data_act_,
+ result.priority ()) == -1)
+ {
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Error:ACE_Asynch_Transmit_File : write to the stream failed\n"));
+ return;
+ }
+}
+
+int
+ACE_POSIX_SIG_Asynch_Transmit_Handler::initiate_read_file (void)
+{
+ // Is there something to read.
+ if (this->file_offset_ >= this->file_size_)
+ {
+ // File is sent. Send the trailer.
+ if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (),
+ this->result_->header_and_trailer ()->trailer_bytes (),
+ (void *)&this->trailer_act_,
+ this->result_->priority ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"),
+ -1);
+ return 0;
+ }
+ else
+ {
+ // Inititiate an asynchronous read from the file.
+ if (this->rf_.read (*this->mb_,
+ this->mb_->size () - 1,
+ this->file_offset_,
+ 0, // @@, offset_high, not implemented.
+ 0, // ACT
+ this->result_->priority ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:Asynch_Transmit_Handler::read from file failed\n"),
+ -1);
+ return 0;
+ }
+}
+
+// *********************************************************************
+
+ACE_POSIX_AIOCB_Asynch_Transmit_File::ACE_POSIX_AIOCB_Asynch_Transmit_File (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Transmit_File_Impl (),
+ ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor)
+{
+}
+
+int
+ACE_POSIX_AIOCB_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
+ ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
+ u_long bytes_to_write,
+ u_long offset,
+ u_long offset_high,
+ u_long bytes_per_send,
+ u_long flags,
+ const void *act,
+ int priority)
+{
+ // Adjust these parameters if there are default values specified.
+ ssize_t file_size = ACE_OS::filesize (file);
+
+ if (file_size == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ ":Asynch_Transmit_File:Couldnt know the file size"),
+ -1);
+
+ if (bytes_to_write == 0)
+ bytes_to_write = file_size;
+
+ if (offset > (size_t) file_size)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "Asynch_Transmit_File:File size is less than offset"),
+ -1);
+
+ if (offset != 0)
+ bytes_to_write = file_size - offset + 1;
+
+ if (bytes_per_send == 0)
+ bytes_per_send = bytes_to_write;
+
+ // Configure the result parameter.
+ ACE_POSIX_Asynch_Transmit_File_Result *result = 0;
+
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Transmit_File_Result (*this->handler_,
+ this->handle_,
+ file,
+ header_and_trailer,
+ bytes_to_write,
+ offset,
+ offset_high,
+ bytes_per_send,
+ flags,
+ act,
+ this->posix_aiocb_proactor_->get_handle (),
+ priority),
+ -1);
+
+ // Make the auxillary handler and initiate transmit.
+ ACE_POSIX_AIOCB_Asynch_Transmit_Handler *transmit_handler = 0;
+
+ ACE_NEW_RETURN (transmit_handler,
+ ::ACE_POSIX_AIOCB_Asynch_Transmit_Handler (this->posix_aiocb_proactor_, result),
+ -1);
+
+ ssize_t return_val = transmit_handler->transmit ();
+
+ if (return_val == -1)
+ // This deletes the <result> in it too.
+ delete transmit_handler;
+
+ return 0;
+}
+
+ACE_POSIX_AIOCB_Asynch_Transmit_File::~ACE_POSIX_AIOCB_Asynch_Transmit_File (void)
+{
+}
+
+// *********************************************************************
+
+ACE_POSIX_SIG_Asynch_Transmit_File::ACE_POSIX_SIG_Asynch_Transmit_File (ACE_POSIX_SIG_Proactor *posix_sig_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Transmit_File_Impl (),
+ ACE_POSIX_SIG_Asynch_Operation (posix_sig_proactor)
+{
+}
+
+int
+ACE_POSIX_SIG_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
+ ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
+ u_long bytes_to_write,
+ u_long offset,
+ u_long offset_high,
+ u_long bytes_per_send,
+ u_long flags,
+ const void *act,
+ int priority)
+{
+ // Adjust these parameters if there are default values specified.
+ ssize_t file_size = ACE_OS::filesize (file);
+
+ if (file_size == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ ":Asynch_Transmit_File:Couldnt know the file size"),
+ -1);
+
+ if (bytes_to_write == 0)
+ bytes_to_write = file_size;
+
+ if (offset > (size_t) file_size)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:%p\n",
+ "Asynch_Transmit_File:File size is less than offset"),
+ -1);
+
+ if (offset != 0)
+ bytes_to_write = file_size - offset + 1;
+
+ if (bytes_per_send == 0)
+ bytes_per_send = bytes_to_write;
+
+ // Configure the result parameter.
+ ACE_POSIX_Asynch_Transmit_File_Result *result = 0;
+
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Transmit_File_Result (*this->handler_,
+ this->handle_,
+ file,
+ header_and_trailer,
+ bytes_to_write,
+ offset,
+ offset_high,
+ bytes_per_send,
+ flags,
+ act,
+ this->posix_sig_proactor_->get_handle (),
+ priority),
+ -1);
+
+ // Make the auxillary handler and initiate transmit.
+ ACE_POSIX_SIG_Asynch_Transmit_Handler *transmit_handler = 0;
+
+ ACE_NEW_RETURN (transmit_handler,
+ ::ACE_POSIX_SIG_Asynch_Transmit_Handler (this->posix_sig_proactor_, result),
+ -1);
+
+ ssize_t return_val = transmit_handler->transmit ();
+
+ if (return_val == -1)
+ // This deletes the <result> in it too.
+ delete transmit_handler;
+
+ return 0;
+}
+
+ACE_POSIX_SIG_Asynch_Transmit_File::~ACE_POSIX_SIG_Asynch_Transmit_File (void)
+{
+}
+
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *>;
+template class ACE_Node<ACE_POSIX_Asynch_Accept_Result *>;
+template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Accept_Result *>;
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+#pragma instantiate ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *>
+#pragma instantiate ACE_Node<ACE_POSIX_Asynch_Accept_Result *>
+#pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Accept_Result *>
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+#endif /* ACE_HAS_AIO_CALLS */