From 036c81e60d05de7d1613dbf9b29f3be7cf2ab5a1 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 25 Sep 1998 19:51:45 +0000 Subject: Implemented Asynch_Accept to work for AIO_CONTROL_BLOCKS strategy of completion notification. Defined an auxillary Accept_Handler called ACE_AIO_Accept_Handler in addition to the ACE_Asynch_Accept_Handler. ACE_AIO_Accept_Handler holds the notification pipe and does a read on it to handle the coming from Asynch_Accept_handler. THANKS to Doug and Irfan for suggesting this 'notification pipe' based implementation for AIO_CONTROL_BLOCKS strategy, so that Proactor will work on platforms where POSIX4 RT_Signals are broken (Solaris 2.6) --- ace/Asynch_IO.cpp | 1111 ++++++++++++++++++++++++++++------------------------- 1 file changed, 577 insertions(+), 534 deletions(-) (limited to 'ace/Asynch_IO.cpp') diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp index 192836fe56e..505c5f9b1ae 100644 --- a/ace/Asynch_IO.cpp +++ b/ace/Asynch_IO.cpp @@ -18,131 +18,8 @@ ACE_RCSID(ace, Asynch_IO, "$Id$") #include "ace/Asynch_IO.i" #endif /* __ACE_INLINE__ */ -#if defined (ACE_HAS_AIO_CALLS) -class ACE_Export ACE_Asynch_Accept_Handler : public ACE_Event_Handler -{ - // = TITLE - // For the POSIX implementation, this class takes care of doing - // Asynch_Accept. - // - // = DESCRIPTION - // -public: - ACE_Asynch_Accept_Handler (ACE_Reactor* reactor); - // Constructor. - - ~ACE_Asynch_Accept_Handler (void); - // Destructor. - - int register_accept_call (ACE_Asynch_Accept::Result* result); - // Register this call with the local handler. - - virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); - // Called when accept event comes up on the . - -private: - ACE_Asynch_Accept::Result* deregister_accept_call (void); - // Undo the things done when registering. - - ACE_Unbounded_Queue result_queue_; - // Queue of Result pointers that correspond to all the 's - // pending. - - ACE_Reactor* reactor_; - // Reactor used by the Asynch_Accept. We need this here to enable - // and disable the now and then, depending on whether any - // is pending or no. - - ACE_Thread_Mutex lock_; - // The lock to protect the result queue which is shared. The queue - // is updated by main thread in the register function call and - // through the auxillary thread in the deregister fun. So let us - // mutex it. - - ACE_Asynch_Accept_Handler (void); - // Default constructor shouldn't be called. Without a reactor, this - // class wont make sense. -}; -#endif /* ACE_HAS_AIO_CALLS*/ - -#if defined (ACE_HAS_AIO_CALLS) -class ACE_Export ACE_Asynch_Transmit_Handler : public ACE_Handler -{ - // = TITLE - // Auxillary handler for doing in - // Unix. internally uses this. - // - // = DESCRIPTION - // This is a helper class for implementing - // in Unix systems. -public: - ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result, - ACE_Proactor *proactor); - // Constructor. Result pointer will have all the information to do - // the file transmission (socket, file, application handler, bytes - // to write....) and the the pointer tells this class - // the that is being used by the - // Asynch_Transmit_Operation and the application. - - virtual ~ACE_Asynch_Transmit_Handler (void); - // Destructor. - - int transmit (void); - // Do the transmission. All the info to do the transmission is in - // the member. - -protected: - virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); - // This is called when asynchronous writes from the socket complete. - - virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result); - // This is called when asynchronous reads from the file complete. -private: - int initiate_read_file (void); - // Issue asynch read from the file. - - ACE_Asynch_Transmit_File::Result *result_; - // The asynch result pointer made from the initial transmit file - // request. - - ACE_Proactor *proactor_; - // The Proactor that is being used by the application handler and - // so the Asynch_Transmit_File. - - ACE_Asynch_Read_File rf_; - // To read from the file to be transmitted. - - ACE_Asynch_Write_Stream ws_; - // Write stream to write the header, trailer and the data. - - ACE_Message_Block *mb_; - // Message bloack used to do the txn. - enum ACT - { - HEADER_ACT = 1, - DATA_ACT = 2, - TRAILER_ACT = 3 - }; - - ACT header_act_; - ACT data_act_; - ACT trailer_act_; - // ACT to transmit header, data and trailer. - - size_t file_offset_; - // Current offset of the file being transmitted. - - size_t file_size_; - // Total size of the file. - size_t bytes_transferred_; - // Number of bytes transferred on the stream. - - size_t transmit_file_done_; - // Flag to indicate that the transmitting is over. -}; -#endif /* ACE_HAS_AIO_CALLS */ ACE_Asynch_Result::ACE_Asynch_Result (ACE_Handler &handler, const void* act, @@ -895,78 +772,312 @@ ACE_Asynch_Write_File::Result::complete (u_long bytes_transferred, // ************************************************************ #if defined (ACE_HAS_AIO_CALLS) -// We need accept handlers here. -ACE_Asynch_Accept::ACE_Asynch_Accept (void) - : accept_handler_ (0) +class ACE_Export ACE_Asynch_Accept_Handler : public ACE_Event_Handler { - ACE_NEW (this->accept_handler_, - ACE_Asynch_Accept_Handler (&this->reactor_)); -} -#else /* Not ACE_HAS_AIO_CALLS */ -ACE_Asynch_Accept::ACE_Asynch_Accept (void) + // = TITLE + // For the POSIX implementation, this class takes care of doing + // Asynch_Accept. + // + // = DESCRIPTION + // +public: + ACE_Asynch_Accept_Handler (ACE_Reactor* reactor, + ACE_Proactor* proactor); + // Constructor. Give the reactor so that it can activate/deactivate + // the handlers. Give also the proactor used here, so that the + // handler can send information through the notification pipe of the + // proactor, in case AIO_CONTROL_BLOCKS strategy is used. + + ~ACE_Asynch_Accept_Handler (void); + // Destructor. + + int register_accept_call (ACE_Asynch_Accept::Result* result); + // Register this call with the local handler. + + virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); + // Called when accept event comes up on the . + +private: + ACE_Asynch_Accept::Result* deregister_accept_call (void); + // Undo the things done when registering. + + ACE_Unbounded_Queue result_queue_; + // Queue of Result pointers that correspond to all the 's + // pending. + + ACE_Reactor* reactor_; + // Reactor used by the Asynch_Accept. We need this here to enable + // and disable the now and then, depending on whether any + // is pending or no. + + ACE_Thread_Mutex lock_; + // The lock to protect the result queue which is shared. The queue + // is updated by main thread in the register function call and + // through the auxillary thread in the deregister fun. So let us + // mutex it. + + ACE_Asynch_Accept_Handler (void); + // Default constructor shouldn't be called. Without a reactor, this + // class wont make sense. + + ACE_Proactor* proactor_; + // Proactor used by the Asynch_Accept class. +}; + +ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (ACE_Reactor* reactor, + ACE_Proactor* proactor) + : reactor_ (reactor), + proactor_ (proactor) +{ +} + +ACE_Asynch_Accept_Handler::~ACE_Asynch_Accept_Handler (void) { } -#endif /* ACE_HAS_AIO_CALLS */ -#if defined (ACE_HAS_AIO_CALLS) int -ACE_Asynch_Accept::open(ACE_Handler &handler, - ACE_HANDLE handle, - const void *completion_key, - ACE_Proactor *proactor) +ACE_Asynch_Accept_Handler::register_accept_call (ACE_Asynch_Accept::Result* result) { + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::register_accept_call called\n")); - ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::open called\n")); + // The queue is updated by main thread in the register function call and + // thru the auxillary thread in the deregister fun. So let us mutex + // it. + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); - // Common things to register for any Asynch Operation. We need to - // call the base class' method. - int result_open = this->ACE_Asynch_Operation::open (handler, - handle, - completion_key, - proactor); - if (result_open < 0) - return result_open; - - // Register the handle with the reactor. - this->reactor_.register_handler (this->handle_, - this->accept_handler_, - ACE_Event_Handler::ACCEPT_MASK); + // Insert this result to the queue. + int insert_result = this->result_queue_.enqueue_tail (result); + if (insert_result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:ACE_Asynch_Accept_Handler::register_accept_call failed\n"), + -1); - // Suspend the now. Enable only when the is issued - // by the application. - this->reactor_.suspend_handlers (); + // If this is the only item, then it means there the set was empty + // before. So enable the in the reactor. + if (this->result_queue_.size () == 1) + this->reactor_->resume_handlers (); - // Spawn the thread. It is the only thread we are going to have. It - // will do the on the reactor. - ACE_Thread_Manager::instance ()->spawn (ACE_Asynch_Accept::thread_function, - (void *)&this->reactor_); + return 0; } -#endif /* ACE_HAS_AIO_CALLS */ -int -ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, - u_long bytes_to_read, - ACE_HANDLE accept_handle, - const void *act) +ACE_Asynch_Accept::Result* +ACE_Asynch_Accept_Handler::deregister_accept_call (void) { -#if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) || (defined (ACE_HAS_AIO_CALLS)) - - // @@ Debugging. - ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::accept called\n")); + // The queue is updated by main thread in the register function call and + // thru the auxillary thread in the deregister fun. So let us mutex + // it. + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0); - // Common code for both WIN and POSIX. + // Get the first item (result ptr) from the Queue. + ACE_Asynch_Accept::Result* result = 0; + int return_dequeue = this->result_queue_.dequeue_head (result); + if (return_dequeue == -1) + return 0; + // Sanity check. + if (result == 0) + return 0; - // Sanity check: make sure that enough space has been allocated by - // the caller. - size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr); - size_t space_in_use = message_block.wr_ptr () - message_block.base (); - size_t total_size = message_block.size (); - size_t available_space = total_size - space_in_use; - size_t space_needed = bytes_to_read + 2 * address_size; - if (available_space < space_needed) - ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("Buffer too small\n")), -1); + // Disable the in the reactor if no 's are + // pending. + if (this->result_queue_.size () == 0) + this->reactor_->suspend_handlers (); -#if !defined (ACE_HAS_AIO_CALLS) + // Return the result pointer. + return result; +} + +int +ACE_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd) +{ + // An has been sensed on the . We should be + // able to just go ahead and do the now on this . This + // should be the same as the . + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::handle_input called\n")); + + // Deregister this info pertaining to this call. + ACE_Asynch_Accept::Result* result = this->deregister_accept_call (); + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, + "(%t):ACE_Asynch_Accept_Handler::handle_input : fd = [%d], Result->listen_handle = [%d]\n", + fd, + result->listen_handle ())); + + // There is not going to be any data read. So we can use the + // itself to take the as well as + // the size of that address. + + // We will have atleast 2 * (sizeof (sockaddr_in) + sizeof (sockaddr)). + size_t buffer_size = sizeof (sockaddr_in) + sizeof (sockaddr); + + // Parameters for the call. + int *address_size = (int *)result->message_block ().wr_ptr (); + *address_size = buffer_size; + + // Increment the wr_ptr. + result->message_block ().wr_ptr (sizeof (int)); + + // Issue now. + // @@ We shouldnt block here since we have already done poll/select + // thru reactor. But are we sure? + ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), + (struct sockaddr *) result->message_block ().wr_ptr (), + address_size); + if (new_handle == ACE_INVALID_HANDLE) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:(%P | %t):%p:\n", + " system call failed"), + -1); + + // Accept has completed. + + // Update the for the . + result->message_block ().wr_ptr (*address_size); + + // @@ Just debugging. + ACE_DEBUG ((LM_DEBUG, + "%N:%l:Address_size = [%d], New_handle = [%d]\n", + *address_size, new_handle)); + + // Store the new handle. + result->accept_handle_ = new_handle; + + // Notify the mail process about this completion + // Signal the main process about this completion or send the result + // pointer thru the notify pipe depending on what is Completion + // Notification Strategy. + switch (this->proactor_->posix_completion_strategy ()) + { + case ACE_Proactor::RT_SIGNALS: + // Queue a signal to the proactor. + { + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, + "ACE_Asynch_Accept_Handler::handle_input: RT_SIGNALS\n")); + + // Get this process id. + pid_t pid = ACE_OS::getpid (); + if (pid == (pid_t) -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:(%P | %t):%p:", + " failed\n"), + -1); + + // Set the signal information. + sigval value; + value.sival_ptr = (void *) result; + + // Queue the signal. + if (sigqueue (pid, ACE_SIG_AIO, value) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:(%P | %t):%p:", + " failed\n"), + -1); + } + break; + + case ACE_Proactor::AIO_CONTROL_BLOCKS: + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, + "ACE_Asynch_Accept_Handler::handle_input: AIO_CONTROL_BLOCKS\n")); + + // Send the Result through the notification pipe. + if (this->proactor_->notify_asynch_accept (result) == -1) + return -1; + break; + + default: + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:Unknow POSIX_COMPLETION_STRATEGY\n"), + -1); + } + + return 0; +} +#endif /* ACE_HAS_AIO_CALLS */ + +#if defined (ACE_HAS_AIO_CALLS) +// We need accept handlers here. +ACE_Asynch_Accept::ACE_Asynch_Accept (void) + : accept_handler_ (0) +{ + +} +#else /* Not ACE_HAS_AIO_CALLS */ +ACE_Asynch_Accept::ACE_Asynch_Accept (void) +{ +} +#endif /* ACE_HAS_AIO_CALLS */ + +#if defined (ACE_HAS_AIO_CALLS) +int +ACE_Asynch_Accept::open(ACE_Handler &handler, + ACE_HANDLE handle, + const void *completion_key, + ACE_Proactor *proactor) +{ + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::open called\n")); + + // Common things to register for any Asynch Operation. We need to + // call the base class' method. + int result_open = this->ACE_Asynch_Operation::open (handler, + handle, + completion_key, + proactor); + if (result_open < 0) + return result_open; + + // Init the Asynch_Accept_Handler now. Only now it can be inited, + // because it needs to keep Proactor also with it. + ACE_NEW_RETURN (this->accept_handler_, + ACE_Asynch_Accept_Handler (&this->reactor_, + this->proactor_), + -1); + + // Register the handle with the reactor. + this->reactor_.register_handler (this->handle_, + this->accept_handler_, + ACE_Event_Handler::ACCEPT_MASK); + + // Suspend the now. Enable only when the is issued + // by the application. + this->reactor_.suspend_handlers (); + + // Spawn the thread. It is the only thread we are going to have. It + // will do the on the reactor. + ACE_Thread_Manager::instance ()->spawn (ACE_Asynch_Accept::thread_function, + (void *)&this->reactor_); + + return 0; +} +#endif /* ACE_HAS_AIO_CALLS */ + +int +ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, + u_long bytes_to_read, + ACE_HANDLE accept_handle, + const void *act) +{ +#if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) || (defined (ACE_HAS_AIO_CALLS)) + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::accept called\n")); + + // Common code for both WIN and POSIX. + + // Sanity check: make sure that enough space has been allocated by + // the caller. + size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr); + size_t space_in_use = message_block.wr_ptr () - message_block.base (); + size_t total_size = message_block.size (); + size_t available_space = total_size - space_in_use; + size_t space_needed = bytes_to_read + 2 * address_size; + if (available_space < space_needed) + ACE_ERROR_RETURN ((LM_ERROR, ASYS_TEXT ("Buffer too small\n")), -1); + +#if !defined (ACE_HAS_AIO_CALLS) // WIN Specific. int close_accept_handle = 0; @@ -1142,149 +1253,314 @@ ACE_Asynch_Accept::Result::complete (u_long bytes_transferred, // ************************************************************ #if defined (ACE_HAS_AIO_CALLS) -ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (ACE_Reactor* reactor) - : reactor_ (reactor) +class ACE_Export ACE_Asynch_Transmit_Handler : public ACE_Handler { -} + // = TITLE + // Auxillary handler for doing in + // Unix. internally uses this. + // + // = DESCRIPTION + // This is a helper class for implementing + // in Unix systems. +public: + ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result, + ACE_Proactor *proactor); + // Constructor. Result pointer will have all the information to do + // the file transmission (socket, file, application handler, bytes + // to write....) and the the pointer tells this class + // the that is being used by the + // Asynch_Transmit_Operation and the application. + + virtual ~ACE_Asynch_Transmit_Handler (void); + // Destructor. -ACE_Asynch_Accept_Handler::~ACE_Asynch_Accept_Handler (void) -{ -} + int transmit (void); + // Do the transmission. All the info to do the transmission is in + // the member. -ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (void) -{ -} +protected: + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This is called when asynchronous writes from the socket complete. -int -ACE_Asynch_Accept_Handler::register_accept_call (ACE_Asynch_Accept::Result* result) -{ - ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::register_accept_call called\n")); + virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result); + // This is called when asynchronous reads from the file complete. +private: + int initiate_read_file (void); + // Issue asynch read from the file. - // The queue is updated by main thread in the register function call and - // thru the auxillary thread in the deregister fun. So let us mutex - // it. - ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); - - // Insert this result to the queue. - int insert_result = this->result_queue_.enqueue_tail (result); - if (insert_result == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:ACE_Asynch_Accept_Handler::register_accept_call failed\n"), - -1); - - // If this is the only item, then it means there the set was empty - // before. So enable the in the reactor. - if (this->result_queue_.size () == 1) - this->reactor_->resume_handlers (); -} + ACE_Asynch_Transmit_File::Result *result_; + // The asynch result pointer made from the initial transmit file + // request. -ACE_Asynch_Accept::Result* -ACE_Asynch_Accept_Handler::deregister_accept_call (void) -{ - // The queue is updated by main thread in the register function call and - // thru the auxillary thread in the deregister fun. So let us mutex - // it. - ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0); - - // Get the first item (result ptr) from the Queue. - ACE_Asynch_Accept::Result* result = 0; - int return_dequeue = this->result_queue_.dequeue_head (result); - if (return_dequeue == -1) - return 0; - // Sanity check. - if (result == 0) - return 0; + ACE_Proactor *proactor_; + // The Proactor that is being used by the application handler and + // so the Asynch_Transmit_File. - // Disable the in the reactor if no 's are - // pending. - if (this->result_queue_.size () == 0) - this->reactor_->suspend_handlers (); + ACE_Asynch_Read_File rf_; + // To read from the file to be transmitted. - // Return the result pointer. - return result; -} + ACE_Asynch_Write_Stream ws_; + // Write stream to write the header, trailer and the data. -int -ACE_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd) -{ - // An has been sensed on the . We should be - // able to just go ahead and do the now on this . This - // should be the same as the . + ACE_Message_Block *mb_; + // Message bloack used to do the txn. - // @@ Debugging. - ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::handle_input called\n")); - - // Deregister this info pertaining to this call. - ACE_Asynch_Accept::Result* result = this->deregister_accept_call (); - - // @@ Debugging. - ACE_DEBUG ((LM_DEBUG, - "(%t):ACE_Asynch_Accept_Handler::handle_input : fd = [%d], Result->listen_handle = [%d]\n", - fd, - result->listen_handle ())); + enum ACT + { + HEADER_ACT = 1, + DATA_ACT = 2, + TRAILER_ACT = 3 + }; - // There is not going to be any data read. So we can use the - // itself to take the as well as - // the size of that address. - - // We will have atleast 2 * (sizeof (sockaddr_in) + sizeof (sockaddr)). - size_t buffer_size = sizeof (sockaddr_in) + sizeof (sockaddr); - - // Parameters for the call. - int *address_size = (int *)result->message_block ().wr_ptr (); - *address_size = buffer_size; - - // Increment the wr_ptr. - result->message_block ().wr_ptr (sizeof (int)); + ACT header_act_; + ACT data_act_; + ACT trailer_act_; + // ACT to transmit header, data and trailer. + + size_t file_offset_; + // Current offset of the file being transmitted. + + size_t file_size_; + // Total size of the file. + + size_t bytes_transferred_; + // Number of bytes transferred on the stream. - // Issue now. - // @@ We shouldnt block here since we have already done poll/select - // thru reactor. But are we sure? - ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), - (struct sockaddr *) result->message_block ().wr_ptr (), - address_size); - if (new_handle == ACE_INVALID_HANDLE) + size_t transmit_file_done_; + // Flag to indicate that the transmitting is over. +}; + +// Constructor. +ACE_Asynch_Transmit_Handler::ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result, + ACE_Proactor *proactor) + : result_ (result), + proactor_ (proactor), + mb_ (0), + header_act_ (this->HEADER_ACT), + data_act_ (this->DATA_ACT), + trailer_act_ (this->TRAILER_ACT), + file_offset_ (result->offset ()), + file_size_ (0), + bytes_transferred_ (0), + transmit_file_done_ (0) +{ + // Allocate memory for the message block. + ACE_NEW (this->mb_, + ACE_Message_Block (this->result_->bytes_per_send () + + 1)); + // Init the file size. + file_size_ = ACE_OS::filesize (this->result_->file ()); +} + +// Destructor. +ACE_Asynch_Transmit_Handler::~ACE_Asynch_Transmit_Handler (void) +{ + delete result_; + delete mb_; +} + +// Do the transmission. +// Initiate transmitting the header. When that completes +// handle_write_stream will be called, there start transmitting the file. +int +ACE_Asynch_Transmit_Handler::transmit (void) +{ + // Open Asynch_Read_File. + if (this->rf_.open (*this, + this->result_->file ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p:\n", - " system call failed"), + "ACE_Asynch_Transmit_Handler:read_file open failed\n"), -1); - - // Accept has completed. - // Update the for the . - result->message_block ().wr_ptr (*address_size); - - // @@ Just debugging. - ACE_DEBUG ((LM_DEBUG, - "%N:%l:Address_size = [%d], New_handle = [%d]\n", - *address_size, new_handle)); + // Open Asynch_Write_Stream. + if (this->ws_.open (*this, + this->result_->socket ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "ACE_Asynch_Transmit_Handler:write_stream open failed\n"), + -1); - // Store the new handle. - result->accept_handle_ = new_handle; + // Transmit the header. + if (this->ws_.write (*this->result_->header_and_trailer ()->header (), + this->result_->header_and_trailer ()->header_bytes (), + (void *) &this->header_act_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"), + -1); - // Signal the main process about this completion. + // @@ We need to finish the transmitting, before returning to the + // user code. Otherwise may not be atmoic. User's other + // or on the same socket might damage the order of + // the current file transmission. + int error = 0; + while (!error && !this->transmit_file_done_) + error = this->proactor_->handle_events (); - // Get this process id. - pid_t pid = ACE_OS::getpid (); - if (pid == (pid_t) -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p\n", - " failed."), - -1); + if (!error && this->transmit_file_done_) + // No error, transmission done. + return 0; + else + return -1; +} + +void +ACE_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +{ + // Update bytes transferred so far. + this->bytes_transferred_ += result.bytes_transferred (); + + // Check the success parameter. + if (result.success () == 0) + { + ACE_ERROR ((LM_ERROR, + "Asynch_Transmit_File failed.\n")); + + ACE_SEH_TRY + { + this->result_->complete (this->bytes_transferred_, + 0, // Failure. + 0, // @@ Completion key. + 0); // @@ Error no. + } + ACE_SEH_FINALLY + { + // This is crucial to prevent memory leaks + delete this; + } + } - // Set the signal information. - sigval value; - value.sival_ptr = (void *) result; + // Write stream successful. - // Queue the signal. - if (sigqueue (pid, ACE_SIG_AIO, value) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p:\n", - " failed"), - -1); + // Partial write to socket. + int unsent_data = result.bytes_to_write () - result.bytes_transferred (); + if (unsent_data != 0) + { + // Reset pointers. + result.message_block ().rd_ptr (result.bytes_transferred ()); + + // Duplicate the message block and retry remaining data + if (this->ws_.write (*result.message_block ().duplicate (), + unsent_data, + result.act ()) == -1) + { + // @@ Handle this error. + ACE_ERROR ((LM_ERROR, + "Asynch_Transmit_Handler:write_stream failed\n")); + return; + } + + // @@ Handling *partial write* to a socket. Let us not continue + // further before this write finishes. Because proceeding with + // another read and then write might change the order of the + // file transmission, because partial write to the stream is + // always possible. + return; + } + + // Not a partial write. + + // Check ACT to see what was sent. + ACT act = *(ACT *) result.act (); + + switch (act) + { + case TRAILER_ACT: + // If it is the "trailer" that is just sent, then transmit file + // is complete. + ACE_SEH_TRY + { + this->result_->complete (this->bytes_transferred_, + 1, // @@ Success. + 0, // @@ Completion key. + 0); // @@ Errno. + } + ACE_SEH_FINALLY + { + transmit_file_done_ = 1; + delete this; + } + break; + + case HEADER_ACT: + case DATA_ACT: + // If header/data was sent, initiate the file data transmission. + if (this->initiate_read_file () == -1) + // @@ Handle this error. + ACE_ERROR ((LM_ERROR, + "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n")); + break; + + default: + // @@ Handle this error. + ACE_ERROR ((LM_ERROR, + "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n")); + } +} + +void +ACE_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result) +{ + // Failure. + if (result.success () == 0) + { + // + ACE_SEH_TRY + { + this->result_->complete (this->bytes_transferred_, + 0, // Failure. + 0, // @@ Completion key. + errno); // Error no. + } + ACE_SEH_FINALLY + { + delete this; + } + return; + } + + // Read successful. + if (result.bytes_transferred () == 0) + return; - return 0; -} + // Increment offset and write data to network. + this->file_offset_ += result.bytes_transferred (); + if (this->ws_.write (result.message_block (), + result.bytes_transferred (), + (void *)&this->data_act_) == -1) + { + // @@ Handle this error. + ACE_ERROR ((LM_ERROR, + "Error:ACE_Asynch_Transmit_File : write to the stream failed\n")); + return; + } +} + +int +ACE_Asynch_Transmit_Handler::initiate_read_file (void) +{ + // Is there something to read. + if (this->file_offset_ >= this->file_size_) + { + // File is sent. Send the trailer. + if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (), + this->result_->header_and_trailer ()->trailer_bytes (), + (void *)&this->trailer_act_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"), + -1); + return 0; + } + else + { + // Inititiate an asynchronous read from the file. + if (this->rf_.read (*this->mb_, + this->mb_->size () - 1, + this->file_offset_) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "Error:Asynch_Transmit_Handler::read from file failed\n"), + -1); + return 0; + } +} #endif /* ACE_HAS_AIO_CALLS */ // ************************************************************ @@ -1617,239 +1893,6 @@ ACE_Asynch_Transmit_File::Header_And_Trailer::transmit_buffers (void) } } -#if defined (ACE_HAS_AIO_CALLS) -// Constructor. -ACE_Asynch_Transmit_Handler::ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result, - ACE_Proactor *proactor) - : result_ (result), - proactor_ (proactor), - mb_ (0), - header_act_ (this->HEADER_ACT), - data_act_ (this->DATA_ACT), - trailer_act_ (this->TRAILER_ACT), - file_offset_ (result->offset ()), - file_size_ (0), - bytes_transferred_ (0), - transmit_file_done_ (0) -{ - // Allocate memory for the message block. - ACE_NEW (this->mb_, - ACE_Message_Block (this->result_->bytes_per_send () - + 1)); - // Init the file size. - file_size_ = ACE_OS::filesize (this->result_->file ()); -} - -// Destructor. -ACE_Asynch_Transmit_Handler::~ACE_Asynch_Transmit_Handler (void) -{ - delete result_; - delete mb_; -} - -// Do the transmission. -// Initiate transmitting the header. When that completes -// handle_write_stream will be called, there start transmitting the file. -int -ACE_Asynch_Transmit_Handler::transmit (void) -{ - // Open Asynch_Read_File. - if (this->rf_.open (*this, - this->result_->file ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "ACE_Asynch_Transmit_Handler:read_file open failed\n"), - -1); - - // Open Asynch_Write_Stream. - if (this->ws_.open (*this, - this->result_->socket ()) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "ACE_Asynch_Transmit_Handler:write_stream open failed\n"), - -1); - - // Transmit the header. - if (this->ws_.write (*this->result_->header_and_trailer ()->header (), - this->result_->header_and_trailer ()->header_bytes (), - (void *) &this->header_act_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"), - -1); - - // @@ We need to finish the transmitting, before returning to the - // user code. Otherwise may not be atmoic. User's other - // or on the same socket might damage the order of - // the current file transmission. - int error = 0; - while (!error && !this->transmit_file_done_) - error = this->proactor_->handle_events (); - - if (!error && this->transmit_file_done_) - // No error, transmission done. - return 0; - else - return -1; -} - -void -ACE_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) -{ - // Update bytes transferred so far. - this->bytes_transferred_ += result.bytes_transferred (); - - // Check the success parameter. - if (result.success () == 0) - { - ACE_ERROR ((LM_ERROR, - "Asynch_Transmit_File failed.\n")); - - ACE_SEH_TRY - { - this->result_->complete (this->bytes_transferred_, - 0, // Failure. - 0, // @@ Completion key. - 0); // @@ Error no. - } - ACE_SEH_FINALLY - { - // This is crucial to prevent memory leaks - delete this; - } - } - - // Write stream successful. - - // Partial write to socket. - int unsent_data = result.bytes_to_write () - result.bytes_transferred (); - if (unsent_data != 0) - { - // Reset pointers. - result.message_block ().rd_ptr (result.bytes_transferred ()); - - // Duplicate the message block and retry remaining data - if (this->ws_.write (*result.message_block ().duplicate (), - unsent_data, - result.act ()) == -1) - { - // @@ Handle this error. - ACE_ERROR ((LM_ERROR, - "Asynch_Transmit_Handler:write_stream failed\n")); - return; - } - - // @@ Handling *partial write* to a socket. Let us not continue - // further before this write finishes. Because proceeding with - // another read and then write might change the order of the - // file transmission, because partial write to the stream is - // always possible. - return; - } - - // Not a partial write. - - // Check ACT to see what was sent. - ACT act = *(ACT *) result.act (); - - switch (act) - { - case TRAILER_ACT: - // If it is the "trailer" that is just sent, then transmit file - // is complete. - ACE_SEH_TRY - { - this->result_->complete (this->bytes_transferred_, - 1, // @@ Success. - 0, // @@ Completion key. - 0); // @@ Errno. - } - ACE_SEH_FINALLY - { - transmit_file_done_ = 1; - delete this; - } - break; - - case HEADER_ACT: - case DATA_ACT: - // If header/data was sent, initiate the file data transmission. - if (this->initiate_read_file () == -1) - // @@ Handle this error. - ACE_ERROR ((LM_ERROR, - "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n")); - break; - - default: - // @@ Handle this error. - ACE_ERROR ((LM_ERROR, - "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n")); - } -} - -void -ACE_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result) -{ - // Failure. - if (result.success () == 0) - { - // - ACE_SEH_TRY - { - this->result_->complete (this->bytes_transferred_, - 0, // Failure. - 0, // @@ Completion key. - errno); // Error no. - } - ACE_SEH_FINALLY - { - delete this; - } - return; - } - - // Read successful. - if (result.bytes_transferred () == 0) - return; - - // Increment offset and write data to network. - this->file_offset_ += result.bytes_transferred (); - if (this->ws_.write (result.message_block (), - result.bytes_transferred (), - (void *)&this->data_act_) == -1) - { - // @@ Handle this error. - ACE_ERROR ((LM_ERROR, - "Error:ACE_Asynch_Transmit_File : write to the stream failed\n")); - return; - } -} - -int -ACE_Asynch_Transmit_Handler::initiate_read_file (void) -{ - // Is there something to read. - if (this->file_offset_ >= this->file_size_) - { - // File is sent. Send the trailer. - if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (), - this->result_->header_and_trailer ()->trailer_bytes (), - (void *)&this->trailer_act_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"), - -1); - return 0; - } - else - { - // Inititiate an asynchronous read from the file. - if (this->rf_.read (*this->mb_, - this->mb_->size () - 1, - this->file_offset_) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:Asynch_Transmit_Handler::read from file failed\n"), - -1); - return 0; - } -} -#endif /* ACE_HAS_AIO_CALLS */ // ************************************************************ -- cgit v1.2.1