summaryrefslogtreecommitdiff
path: root/ace/Asynch_IO.cpp
diff options
context:
space:
mode:
authoralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-25 19:51:45 +0000
committeralex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1998-09-25 19:51:45 +0000
commit036c81e60d05de7d1613dbf9b29f3be7cf2ab5a1 (patch)
tree0a8a701211e88221ae2bcce719c8e72f2eef55bd /ace/Asynch_IO.cpp
parent447c0d71a34fc01455b1cfeff5971d8b209701ce (diff)
downloadATCD-036c81e60d05de7d1613dbf9b29f3be7cf2ab5a1.tar.gz
Implemented Asynch_Accept to work for AIO_CONTROL_BLOCKS strategy of
completion notification. Defined an auxillary Accept_Handler called ACE_AIO_Accept_Handler in addition to the ACE_Asynch_Accept_Handler. ACE_AIO_Accept_Handler holds the notification pipe and does a read on it to handle the <ACE_Asynch_Accept::Result> coming from Asynch_Accept_handler. THANKS to Doug and Irfan for suggesting this 'notification pipe' based implementation for AIO_CONTROL_BLOCKS strategy, so that Proactor will work on platforms where POSIX4 RT_Signals are broken (Solaris 2.6)
Diffstat (limited to 'ace/Asynch_IO.cpp')
-rw-r--r--ace/Asynch_IO.cpp995
1 files changed, 519 insertions, 476 deletions
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp
index 192836fe56e..505c5f9b1ae 100644
--- a/ace/Asynch_IO.cpp
+++ b/ace/Asynch_IO.cpp
@@ -18,131 +18,8 @@ ACE_RCSID(ace, Asynch_IO, "$Id$")
#include "ace/Asynch_IO.i"
#endif /* __ACE_INLINE__ */
-#if defined (ACE_HAS_AIO_CALLS)
-class ACE_Export ACE_Asynch_Accept_Handler : public ACE_Event_Handler
-{
- // = TITLE
- // For the POSIX implementation, this class takes care of doing
- // Asynch_Accept.
- //
- // = DESCRIPTION
- //
-public:
- ACE_Asynch_Accept_Handler (ACE_Reactor* reactor);
- // Constructor.
-
- ~ACE_Asynch_Accept_Handler (void);
- // Destructor.
-
- int register_accept_call (ACE_Asynch_Accept::Result* result);
- // Register this <accept> call with the local handler.
-
- virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
- // Called when accept event comes up on the <listen_handle>.
-
-private:
- ACE_Asynch_Accept::Result* deregister_accept_call (void);
- // Undo the things done when registering.
-
- ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*> result_queue_;
- // Queue of Result pointers that correspond to all the <accept>'s
- // pending.
-
- ACE_Reactor* reactor_;
- // Reactor used by the Asynch_Accept. We need this here to enable
- // and disable the <handle> now and then, depending on whether any
- // <accept> is pending or no.
-
- ACE_Thread_Mutex lock_;
- // The lock to protect the result queue which is shared. The queue
- // is updated by main thread in the register function call and
- // through the auxillary thread in the deregister fun. So let us
- // mutex it.
-
- ACE_Asynch_Accept_Handler (void);
- // Default constructor shouldn't be called. Without a reactor, this
- // class wont make sense.
-};
-#endif /* ACE_HAS_AIO_CALLS*/
-
-#if defined (ACE_HAS_AIO_CALLS)
-class ACE_Export ACE_Asynch_Transmit_Handler : public ACE_Handler
-{
- // = TITLE
- // Auxillary handler for doing <Asynch_Transmit_File> in
- // Unix. <ACE_Asynch_Transmit_File> internally uses this.
- //
- // = DESCRIPTION
- // This is a helper class for implementing
- // <ACE_Asynch_Transmit_File> in Unix systems.
-public:
- ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result,
- ACE_Proactor *proactor);
- // Constructor. Result pointer will have all the information to do
- // the file transmission (socket, file, application handler, bytes
- // to write....) and the the <proactor> pointer tells this class
- // the <proactor> that is being used by the
- // Asynch_Transmit_Operation and the application.
-
- virtual ~ACE_Asynch_Transmit_Handler (void);
- // Destructor.
-
- int transmit (void);
- // Do the transmission. All the info to do the transmission is in
- // the <result> member.
-
-protected:
- virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
- // This is called when asynchronous writes from the socket complete.
-
- virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
- // This is called when asynchronous reads from the file complete.
-private:
- int initiate_read_file (void);
- // Issue asynch read from the file.
-
- ACE_Asynch_Transmit_File::Result *result_;
- // The asynch result pointer made from the initial transmit file
- // request.
-
- ACE_Proactor *proactor_;
- // The Proactor that is being used by the application handler and
- // so the Asynch_Transmit_File.
-
- ACE_Asynch_Read_File rf_;
- // To read from the file to be transmitted.
- ACE_Asynch_Write_Stream ws_;
- // Write stream to write the header, trailer and the data.
- ACE_Message_Block *mb_;
- // Message bloack used to do the txn.
-
- enum ACT
- {
- HEADER_ACT = 1,
- DATA_ACT = 2,
- TRAILER_ACT = 3
- };
-
- ACT header_act_;
- ACT data_act_;
- ACT trailer_act_;
- // ACT to transmit header, data and trailer.
-
- size_t file_offset_;
- // Current offset of the file being transmitted.
-
- size_t file_size_;
- // Total size of the file.
-
- size_t bytes_transferred_;
- // Number of bytes transferred on the stream.
-
- size_t transmit_file_done_;
- // Flag to indicate that the transmitting is over.
-};
-#endif /* ACE_HAS_AIO_CALLS */
ACE_Asynch_Result::ACE_Asynch_Result (ACE_Handler &handler,
const void* act,
@@ -895,12 +772,238 @@ ACE_Asynch_Write_File::Result::complete (u_long bytes_transferred,
// ************************************************************
#if defined (ACE_HAS_AIO_CALLS)
+class ACE_Export ACE_Asynch_Accept_Handler : public ACE_Event_Handler
+{
+ // = TITLE
+ // For the POSIX implementation, this class takes care of doing
+ // Asynch_Accept.
+ //
+ // = DESCRIPTION
+ //
+public:
+ ACE_Asynch_Accept_Handler (ACE_Reactor* reactor,
+ ACE_Proactor* proactor);
+ // Constructor. Give the reactor so that it can activate/deactivate
+ // the handlers. Give also the proactor used here, so that the
+ // handler can send information through the notification pipe of the
+ // proactor, in case AIO_CONTROL_BLOCKS strategy is used.
+
+ ~ACE_Asynch_Accept_Handler (void);
+ // Destructor.
+
+ int register_accept_call (ACE_Asynch_Accept::Result* result);
+ // Register this <accept> call with the local handler.
+
+ virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);
+ // Called when accept event comes up on the <listen_handle>.
+
+private:
+ ACE_Asynch_Accept::Result* deregister_accept_call (void);
+ // Undo the things done when registering.
+
+ ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*> result_queue_;
+ // Queue of Result pointers that correspond to all the <accept>'s
+ // pending.
+
+ ACE_Reactor* reactor_;
+ // Reactor used by the Asynch_Accept. We need this here to enable
+ // and disable the <handle> now and then, depending on whether any
+ // <accept> is pending or no.
+
+ ACE_Thread_Mutex lock_;
+ // The lock to protect the result queue which is shared. The queue
+ // is updated by main thread in the register function call and
+ // through the auxillary thread in the deregister fun. So let us
+ // mutex it.
+
+ ACE_Asynch_Accept_Handler (void);
+ // Default constructor shouldn't be called. Without a reactor, this
+ // class wont make sense.
+
+ ACE_Proactor* proactor_;
+ // Proactor used by the Asynch_Accept class.
+};
+
+ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (ACE_Reactor* reactor,
+ ACE_Proactor* proactor)
+ : reactor_ (reactor),
+ proactor_ (proactor)
+{
+}
+
+ACE_Asynch_Accept_Handler::~ACE_Asynch_Accept_Handler (void)
+{
+}
+
+int
+ACE_Asynch_Accept_Handler::register_accept_call (ACE_Asynch_Accept::Result* result)
+{
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::register_accept_call called\n"));
+
+ // The queue is updated by main thread in the register function call and
+ // thru the auxillary thread in the deregister fun. So let us mutex
+ // it.
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+
+ // Insert this result to the queue.
+ int insert_result = this->result_queue_.enqueue_tail (result);
+ if (insert_result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:ACE_Asynch_Accept_Handler::register_accept_call failed\n"),
+ -1);
+
+ // If this is the only item, then it means there the set was empty
+ // before. So enable the <handle> in the reactor.
+ if (this->result_queue_.size () == 1)
+ this->reactor_->resume_handlers ();
+
+ return 0;
+}
+
+ACE_Asynch_Accept::Result*
+ACE_Asynch_Accept_Handler::deregister_accept_call (void)
+{
+ // The queue is updated by main thread in the register function call and
+ // thru the auxillary thread in the deregister fun. So let us mutex
+ // it.
+ ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0);
+
+ // Get the first item (result ptr) from the Queue.
+ ACE_Asynch_Accept::Result* result = 0;
+ int return_dequeue = this->result_queue_.dequeue_head (result);
+ if (return_dequeue == -1)
+ return 0;
+ // Sanity check.
+ if (result == 0)
+ return 0;
+
+ // Disable the <handle> in the reactor if no <accept>'s are
+ // pending.
+ if (this->result_queue_.size () == 0)
+ this->reactor_->suspend_handlers ();
+
+ // Return the result pointer.
+ return result;
+}
+
+int
+ACE_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd)
+{
+ // An <accept> has been sensed on the <listen_handle>. We should be
+ // able to just go ahead and do the <accept> now on this <fd>. This
+ // should be the same as the <listen_handle>.
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::handle_input called\n"));
+
+ // Deregister this info pertaining to this <accept> call.
+ ACE_Asynch_Accept::Result* result = this->deregister_accept_call ();
+
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t):ACE_Asynch_Accept_Handler::handle_input : fd = [%d], Result->listen_handle = [%d]\n",
+ fd,
+ result->listen_handle ()));
+
+ // There is not going to be any data read. So we can use the
+ // <message_block> itself to take the <remote_address> as well as
+ // the size of that address.
+
+ // We will have atleast 2 * (sizeof (sockaddr_in) + sizeof (sockaddr)).
+ size_t buffer_size = sizeof (sockaddr_in) + sizeof (sockaddr);
+
+ // Parameters for the <accept> call.
+ int *address_size = (int *)result->message_block ().wr_ptr ();
+ *address_size = buffer_size;
+
+ // Increment the wr_ptr.
+ result->message_block ().wr_ptr (sizeof (int));
+
+ // Issue <accept> now.
+ // @@ We shouldnt block here since we have already done poll/select
+ // thru reactor. But are we sure?
+ ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (),
+ (struct sockaddr *) result->message_block ().wr_ptr (),
+ address_size);
+ if (new_handle == ACE_INVALID_HANDLE)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p:\n",
+ "<accept> system call failed"),
+ -1);
+
+ // Accept has completed.
+
+ // Update the <wr_ptr> for the <message block>.
+ result->message_block ().wr_ptr (*address_size);
+
+ // @@ Just debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "%N:%l:Address_size = [%d], New_handle = [%d]\n",
+ *address_size, new_handle));
+
+ // Store the new handle.
+ result->accept_handle_ = new_handle;
+
+ // Notify the mail process about this completion
+ // Signal the main process about this completion or send the result
+ // pointer thru the notify pipe depending on what is Completion
+ // Notification Strategy.
+ switch (this->proactor_->posix_completion_strategy ())
+ {
+ case ACE_Proactor::RT_SIGNALS:
+ // Queue a signal to the proactor.
+ {
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Accept_Handler::handle_input: RT_SIGNALS\n"));
+
+ // Get this process id.
+ pid_t pid = ACE_OS::getpid ();
+ if (pid == (pid_t) -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p:",
+ "<getpid> failed\n"),
+ -1);
+
+ // Set the signal information.
+ sigval value;
+ value.sival_ptr = (void *) result;
+
+ // Queue the signal.
+ if (sigqueue (pid, ACE_SIG_AIO, value) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:(%P | %t):%p:",
+ "<sigqueue> failed\n"),
+ -1);
+ }
+ break;
+
+ case ACE_Proactor::AIO_CONTROL_BLOCKS:
+ // @@ Debugging.
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Accept_Handler::handle_input: AIO_CONTROL_BLOCKS\n"));
+
+ // Send the Result through the notification pipe.
+ if (this->proactor_->notify_asynch_accept (result) == -1)
+ return -1;
+ break;
+
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "%N:%l:Unknow POSIX_COMPLETION_STRATEGY\n"),
+ -1);
+ }
+
+ return 0;
+}
+#endif /* ACE_HAS_AIO_CALLS */
+
+#if defined (ACE_HAS_AIO_CALLS)
// We need accept handlers here.
ACE_Asynch_Accept::ACE_Asynch_Accept (void)
: accept_handler_ (0)
{
- ACE_NEW (this->accept_handler_,
- ACE_Asynch_Accept_Handler (&this->reactor_));
+
}
#else /* Not ACE_HAS_AIO_CALLS */
ACE_Asynch_Accept::ACE_Asynch_Accept (void)
@@ -915,7 +1018,6 @@ ACE_Asynch_Accept::open(ACE_Handler &handler,
const void *completion_key,
ACE_Proactor *proactor)
{
-
ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::open called\n"));
// Common things to register for any Asynch Operation. We need to
@@ -927,6 +1029,13 @@ ACE_Asynch_Accept::open(ACE_Handler &handler,
if (result_open < 0)
return result_open;
+ // Init the Asynch_Accept_Handler now. Only now it can be inited,
+ // because it needs to keep Proactor also with it.
+ ACE_NEW_RETURN (this->accept_handler_,
+ ACE_Asynch_Accept_Handler (&this->reactor_,
+ this->proactor_),
+ -1);
+
// Register the handle with the reactor.
this->reactor_.register_handler (this->handle_,
this->accept_handler_,
@@ -940,6 +1049,8 @@ ACE_Asynch_Accept::open(ACE_Handler &handler,
// will do the <handle_events> on the reactor.
ACE_Thread_Manager::instance ()->spawn (ACE_Asynch_Accept::thread_function,
(void *)&this->reactor_);
+
+ return 0;
}
#endif /* ACE_HAS_AIO_CALLS */
@@ -1142,149 +1253,314 @@ ACE_Asynch_Accept::Result::complete (u_long bytes_transferred,
// ************************************************************
#if defined (ACE_HAS_AIO_CALLS)
-ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (ACE_Reactor* reactor)
- : reactor_ (reactor)
+class ACE_Export ACE_Asynch_Transmit_Handler : public ACE_Handler
{
-}
+ // = TITLE
+ // Auxillary handler for doing <Asynch_Transmit_File> in
+ // Unix. <ACE_Asynch_Transmit_File> internally uses this.
+ //
+ // = DESCRIPTION
+ // This is a helper class for implementing
+ // <ACE_Asynch_Transmit_File> in Unix systems.
+public:
+ ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result,
+ ACE_Proactor *proactor);
+ // Constructor. Result pointer will have all the information to do
+ // the file transmission (socket, file, application handler, bytes
+ // to write....) and the the <proactor> pointer tells this class
+ // the <proactor> that is being used by the
+ // Asynch_Transmit_Operation and the application.
+
+ virtual ~ACE_Asynch_Transmit_Handler (void);
+ // Destructor.
-ACE_Asynch_Accept_Handler::~ACE_Asynch_Accept_Handler (void)
+ int transmit (void);
+ // Do the transmission. All the info to do the transmission is in
+ // the <result> member.
+
+protected:
+ virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
+ // This is called when asynchronous writes from the socket complete.
+
+ virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
+ // This is called when asynchronous reads from the file complete.
+private:
+ int initiate_read_file (void);
+ // Issue asynch read from the file.
+
+ ACE_Asynch_Transmit_File::Result *result_;
+ // The asynch result pointer made from the initial transmit file
+ // request.
+
+ ACE_Proactor *proactor_;
+ // The Proactor that is being used by the application handler and
+ // so the Asynch_Transmit_File.
+
+ ACE_Asynch_Read_File rf_;
+ // To read from the file to be transmitted.
+
+ ACE_Asynch_Write_Stream ws_;
+ // Write stream to write the header, trailer and the data.
+
+ ACE_Message_Block *mb_;
+ // Message bloack used to do the txn.
+
+ enum ACT
+ {
+ HEADER_ACT = 1,
+ DATA_ACT = 2,
+ TRAILER_ACT = 3
+ };
+
+ ACT header_act_;
+ ACT data_act_;
+ ACT trailer_act_;
+ // ACT to transmit header, data and trailer.
+
+ size_t file_offset_;
+ // Current offset of the file being transmitted.
+
+ size_t file_size_;
+ // Total size of the file.
+
+ size_t bytes_transferred_;
+ // Number of bytes transferred on the stream.
+
+ size_t transmit_file_done_;
+ // Flag to indicate that the transmitting is over.
+};
+
+// Constructor.
+ACE_Asynch_Transmit_Handler::ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result,
+ ACE_Proactor *proactor)
+ : result_ (result),
+ proactor_ (proactor),
+ mb_ (0),
+ header_act_ (this->HEADER_ACT),
+ data_act_ (this->DATA_ACT),
+ trailer_act_ (this->TRAILER_ACT),
+ file_offset_ (result->offset ()),
+ file_size_ (0),
+ bytes_transferred_ (0),
+ transmit_file_done_ (0)
{
+ // Allocate memory for the message block.
+ ACE_NEW (this->mb_,
+ ACE_Message_Block (this->result_->bytes_per_send ()
+ + 1));
+ // Init the file size.
+ file_size_ = ACE_OS::filesize (this->result_->file ());
}
-ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (void)
+// Destructor.
+ACE_Asynch_Transmit_Handler::~ACE_Asynch_Transmit_Handler (void)
{
+ delete result_;
+ delete mb_;
}
+// Do the transmission.
+// Initiate transmitting the header. When that completes
+// handle_write_stream will be called, there start transmitting the file.
int
-ACE_Asynch_Accept_Handler::register_accept_call (ACE_Asynch_Accept::Result* result)
+ACE_Asynch_Transmit_Handler::transmit (void)
{
- ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::register_accept_call called\n"));
+ // Open Asynch_Read_File.
+ if (this->rf_.open (*this,
+ this->result_->file ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE_Asynch_Transmit_Handler:read_file open failed\n"),
+ -1);
- // The queue is updated by main thread in the register function call and
- // thru the auxillary thread in the deregister fun. So let us mutex
- // it.
- ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1);
+ // Open Asynch_Write_Stream.
+ if (this->ws_.open (*this,
+ this->result_->socket ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "ACE_Asynch_Transmit_Handler:write_stream open failed\n"),
+ -1);
- // Insert this result to the queue.
- int insert_result = this->result_queue_.enqueue_tail (result);
- if (insert_result == -1)
+ // Transmit the header.
+ if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
+ this->result_->header_and_trailer ()->header_bytes (),
+ (void *) &this->header_act_) == -1)
ACE_ERROR_RETURN ((LM_ERROR,
- "%N:%l:ACE_Asynch_Accept_Handler::register_accept_call failed\n"),
- -1);
+ "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
+ -1);
- // If this is the only item, then it means there the set was empty
- // before. So enable the <handle> in the reactor.
- if (this->result_queue_.size () == 1)
- this->reactor_->resume_handlers ();
+ // @@ We need to finish the transmitting, before returning to the
+ // user code. Otherwise <transmit> may not be atmoic. User's other
+ // <read> or <write> on the same socket might damage the order of
+ // the current file transmission.
+ int error = 0;
+ while (!error && !this->transmit_file_done_)
+ error = this->proactor_->handle_events ();
+
+ if (!error && this->transmit_file_done_)
+ // No error, transmission done.
+ return 0;
+ else
+ return -1;
}
-ACE_Asynch_Accept::Result*
-ACE_Asynch_Accept_Handler::deregister_accept_call (void)
+void
+ACE_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
- // The queue is updated by main thread in the register function call and
- // thru the auxillary thread in the deregister fun. So let us mutex
- // it.
- ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0);
+ // Update bytes transferred so far.
+ this->bytes_transferred_ += result.bytes_transferred ();
+
+ // Check the success parameter.
+ if (result.success () == 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "Asynch_Transmit_File failed.\n"));
+
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ 0); // @@ Error no.
+ }
+ ACE_SEH_FINALLY
+ {
+ // This is crucial to prevent memory leaks
+ delete this;
+ }
+ }
- // Get the first item (result ptr) from the Queue.
- ACE_Asynch_Accept::Result* result = 0;
- int return_dequeue = this->result_queue_.dequeue_head (result);
- if (return_dequeue == -1)
- return 0;
- // Sanity check.
- if (result == 0)
- return 0;
+ // Write stream successful.
- // Disable the <handle> in the reactor if no <accept>'s are
- // pending.
- if (this->result_queue_.size () == 0)
- this->reactor_->suspend_handlers ();
+ // Partial write to socket.
+ int unsent_data = result.bytes_to_write () - result.bytes_transferred ();
+ if (unsent_data != 0)
+ {
+ // Reset pointers.
+ result.message_block ().rd_ptr (result.bytes_transferred ());
+
+ // Duplicate the message block and retry remaining data
+ if (this->ws_.write (*result.message_block ().duplicate (),
+ unsent_data,
+ result.act ()) == -1)
+ {
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Asynch_Transmit_Handler:write_stream failed\n"));
+ return;
+ }
- // Return the result pointer.
- return result;
+ // @@ Handling *partial write* to a socket. Let us not continue
+ // further before this write finishes. Because proceeding with
+ // another read and then write might change the order of the
+ // file transmission, because partial write to the stream is
+ // always possible.
+ return;
+ }
+
+ // Not a partial write.
+
+ // Check ACT to see what was sent.
+ ACT act = *(ACT *) result.act ();
+
+ switch (act)
+ {
+ case TRAILER_ACT:
+ // If it is the "trailer" that is just sent, then transmit file
+ // is complete.
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 1, // @@ Success.
+ 0, // @@ Completion key.
+ 0); // @@ Errno.
+ }
+ ACE_SEH_FINALLY
+ {
+ transmit_file_done_ = 1;
+ delete this;
+ }
+ break;
+
+ case HEADER_ACT:
+ case DATA_ACT:
+ // If header/data was sent, initiate the file data transmission.
+ if (this->initiate_read_file () == -1)
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n"));
+ break;
+
+ default:
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n"));
+ }
}
-int
-ACE_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd)
+void
+ACE_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result)
{
- // An <accept> has been sensed on the <listen_handle>. We should be
- // able to just go ahead and do the <accept> now on this <fd>. This
- // should be the same as the <listen_handle>.
-
- // @@ Debugging.
- ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::handle_input called\n"));
-
- // Deregister this info pertaining to this <accept> call.
- ACE_Asynch_Accept::Result* result = this->deregister_accept_call ();
-
- // @@ Debugging.
- ACE_DEBUG ((LM_DEBUG,
- "(%t):ACE_Asynch_Accept_Handler::handle_input : fd = [%d], Result->listen_handle = [%d]\n",
- fd,
- result->listen_handle ()));
+ // Failure.
+ if (result.success () == 0)
+ {
+ //
+ ACE_SEH_TRY
+ {
+ this->result_->complete (this->bytes_transferred_,
+ 0, // Failure.
+ 0, // @@ Completion key.
+ errno); // Error no.
+ }
+ ACE_SEH_FINALLY
+ {
+ delete this;
+ }
+ return;
+ }
- // There is not going to be any data read. So we can use the
- // <message_block> itself to take the <remote_address> as well as
- // the size of that address.
-
- // We will have atleast 2 * (sizeof (sockaddr_in) + sizeof (sockaddr)).
- size_t buffer_size = sizeof (sockaddr_in) + sizeof (sockaddr);
-
- // Parameters for the <accept> call.
- int *address_size = (int *)result->message_block ().wr_ptr ();
- *address_size = buffer_size;
-
- // Increment the wr_ptr.
- result->message_block ().wr_ptr (sizeof (int));
-
- // Issue <accept> now.
- // @@ We shouldnt block here since we have already done poll/select
- // thru reactor. But are we sure?
- ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (),
- (struct sockaddr *) result->message_block ().wr_ptr (),
- address_size);
- if (new_handle == ACE_INVALID_HANDLE)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:(%P | %t):%p:\n",
- "<accept> system call failed"),
- -1);
+ // Read successful.
+ if (result.bytes_transferred () == 0)
+ return;
- // Accept has completed.
+ // Increment offset and write data to network.
+ this->file_offset_ += result.bytes_transferred ();
+ if (this->ws_.write (result.message_block (),
+ result.bytes_transferred (),
+ (void *)&this->data_act_) == -1)
+ {
+ // @@ Handle this error.
+ ACE_ERROR ((LM_ERROR,
+ "Error:ACE_Asynch_Transmit_File : write to the stream failed\n"));
+ return;
+ }
+}
- // Update the <wr_ptr> for the <message block>.
- result->message_block ().wr_ptr (*address_size);
-
- // @@ Just debugging.
- ACE_DEBUG ((LM_DEBUG,
- "%N:%l:Address_size = [%d], New_handle = [%d]\n",
- *address_size, new_handle));
-
- // Store the new handle.
- result->accept_handle_ = new_handle;
-
- // Signal the main process about this completion.
-
- // Get this process id.
- pid_t pid = ACE_OS::getpid ();
- if (pid == (pid_t) -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:(%P | %t):%p\n",
- "<getpid> failed."),
- -1);
-
- // Set the signal information.
- sigval value;
- value.sival_ptr = (void *) result;
-
- // Queue the signal.
- if (sigqueue (pid, ACE_SIG_AIO, value) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:(%P | %t):%p:\n",
- "<sigqueue> failed"),
- -1);
-
- return 0;
-}
+int
+ACE_Asynch_Transmit_Handler::initiate_read_file (void)
+{
+ // Is there something to read.
+ if (this->file_offset_ >= this->file_size_)
+ {
+ // File is sent. Send the trailer.
+ if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (),
+ this->result_->header_and_trailer ()->trailer_bytes (),
+ (void *)&this->trailer_act_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"),
+ -1);
+ return 0;
+ }
+ else
+ {
+ // Inititiate an asynchronous read from the file.
+ if (this->rf_.read (*this->mb_,
+ this->mb_->size () - 1,
+ this->file_offset_) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "Error:Asynch_Transmit_Handler::read from file failed\n"),
+ -1);
+ return 0;
+ }
+}
#endif /* ACE_HAS_AIO_CALLS */
// ************************************************************
@@ -1617,239 +1893,6 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void)
}
}
-#if defined (ACE_HAS_AIO_CALLS)
-// Constructor.
-ACE_Asynch_Transmit_Handler::ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result,
- ACE_Proactor *proactor)
- : result_ (result),
- proactor_ (proactor),
- mb_ (0),
- header_act_ (this->HEADER_ACT),
- data_act_ (this->DATA_ACT),
- trailer_act_ (this->TRAILER_ACT),
- file_offset_ (result->offset ()),
- file_size_ (0),
- bytes_transferred_ (0),
- transmit_file_done_ (0)
-{
- // Allocate memory for the message block.
- ACE_NEW (this->mb_,
- ACE_Message_Block (this->result_->bytes_per_send ()
- + 1));
- // Init the file size.
- file_size_ = ACE_OS::filesize (this->result_->file ());
-}
-
-// Destructor.
-ACE_Asynch_Transmit_Handler::~ACE_Asynch_Transmit_Handler (void)
-{
- delete result_;
- delete mb_;
-}
-
-// Do the transmission.
-// Initiate transmitting the header. When that completes
-// handle_write_stream will be called, there start transmitting the file.
-int
-ACE_Asynch_Transmit_Handler::transmit (void)
-{
- // Open Asynch_Read_File.
- if (this->rf_.open (*this,
- this->result_->file ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "ACE_Asynch_Transmit_Handler:read_file open failed\n"),
- -1);
-
- // Open Asynch_Write_Stream.
- if (this->ws_.open (*this,
- this->result_->socket ()) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "ACE_Asynch_Transmit_Handler:write_stream open failed\n"),
- -1);
-
- // Transmit the header.
- if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
- this->result_->header_and_trailer ()->header_bytes (),
- (void *) &this->header_act_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
- -1);
-
- // @@ We need to finish the transmitting, before returning to the
- // user code. Otherwise <transmit> may not be atmoic. User's other
- // <read> or <write> on the same socket might damage the order of
- // the current file transmission.
- int error = 0;
- while (!error && !this->transmit_file_done_)
- error = this->proactor_->handle_events ();
-
- if (!error && this->transmit_file_done_)
- // No error, transmission done.
- return 0;
- else
- return -1;
-}
-
-void
-ACE_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
-{
- // Update bytes transferred so far.
- this->bytes_transferred_ += result.bytes_transferred ();
-
- // Check the success parameter.
- if (result.success () == 0)
- {
- ACE_ERROR ((LM_ERROR,
- "Asynch_Transmit_File failed.\n"));
-
- ACE_SEH_TRY
- {
- this->result_->complete (this->bytes_transferred_,
- 0, // Failure.
- 0, // @@ Completion key.
- 0); // @@ Error no.
- }
- ACE_SEH_FINALLY
- {
- // This is crucial to prevent memory leaks
- delete this;
- }
- }
-
- // Write stream successful.
-
- // Partial write to socket.
- int unsent_data = result.bytes_to_write () - result.bytes_transferred ();
- if (unsent_data != 0)
- {
- // Reset pointers.
- result.message_block ().rd_ptr (result.bytes_transferred ());
-
- // Duplicate the message block and retry remaining data
- if (this->ws_.write (*result.message_block ().duplicate (),
- unsent_data,
- result.act ()) == -1)
- {
- // @@ Handle this error.
- ACE_ERROR ((LM_ERROR,
- "Asynch_Transmit_Handler:write_stream failed\n"));
- return;
- }
-
- // @@ Handling *partial write* to a socket. Let us not continue
- // further before this write finishes. Because proceeding with
- // another read and then write might change the order of the
- // file transmission, because partial write to the stream is
- // always possible.
- return;
- }
-
- // Not a partial write.
-
- // Check ACT to see what was sent.
- ACT act = *(ACT *) result.act ();
-
- switch (act)
- {
- case TRAILER_ACT:
- // If it is the "trailer" that is just sent, then transmit file
- // is complete.
- ACE_SEH_TRY
- {
- this->result_->complete (this->bytes_transferred_,
- 1, // @@ Success.
- 0, // @@ Completion key.
- 0); // @@ Errno.
- }
- ACE_SEH_FINALLY
- {
- transmit_file_done_ = 1;
- delete this;
- }
- break;
-
- case HEADER_ACT:
- case DATA_ACT:
- // If header/data was sent, initiate the file data transmission.
- if (this->initiate_read_file () == -1)
- // @@ Handle this error.
- ACE_ERROR ((LM_ERROR,
- "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n"));
- break;
-
- default:
- // @@ Handle this error.
- ACE_ERROR ((LM_ERROR,
- "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n"));
- }
-}
-
-void
-ACE_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result)
-{
- // Failure.
- if (result.success () == 0)
- {
- //
- ACE_SEH_TRY
- {
- this->result_->complete (this->bytes_transferred_,
- 0, // Failure.
- 0, // @@ Completion key.
- errno); // Error no.
- }
- ACE_SEH_FINALLY
- {
- delete this;
- }
- return;
- }
-
- // Read successful.
- if (result.bytes_transferred () == 0)
- return;
-
- // Increment offset and write data to network.
- this->file_offset_ += result.bytes_transferred ();
- if (this->ws_.write (result.message_block (),
- result.bytes_transferred (),
- (void *)&this->data_act_) == -1)
- {
- // @@ Handle this error.
- ACE_ERROR ((LM_ERROR,
- "Error:ACE_Asynch_Transmit_File : write to the stream failed\n"));
- return;
- }
-}
-
-int
-ACE_Asynch_Transmit_Handler::initiate_read_file (void)
-{
- // Is there something to read.
- if (this->file_offset_ >= this->file_size_)
- {
- // File is sent. Send the trailer.
- if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (),
- this->result_->header_and_trailer ()->trailer_bytes (),
- (void *)&this->trailer_act_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"),
- -1);
- return 0;
- }
- else
- {
- // Inititiate an asynchronous read from the file.
- if (this->rf_.read (*this->mb_,
- this->mb_->size () - 1,
- this->file_offset_) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "Error:Asynch_Transmit_Handler::read from file failed\n"),
- -1);
- return 0;
- }
-}
-#endif /* ACE_HAS_AIO_CALLS */
// ************************************************************