diff options
author | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-18 23:12:59 +0000 |
---|---|---|
committer | alex <alex@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1998-09-18 23:12:59 +0000 |
commit | 7498dbf073a617d26bf8e96b4421eddd239a2559 (patch) | |
tree | c866e6c7d518af63c04747da5114564873ea497a /ace/Asynch_IO.cpp | |
parent | bfd5dd563b4fa392bd7614f6440969660e9b2eb4 (diff) | |
download | ATCD-7498dbf073a617d26bf8e96b4421eddd239a2559.tar.gz |
*** empty log message ***
Diffstat (limited to 'ace/Asynch_IO.cpp')
-rw-r--r-- | ace/Asynch_IO.cpp | 343 |
1 files changed, 299 insertions, 44 deletions
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp index 957d454079c..dbfabdc79d7 100644 --- a/ace/Asynch_IO.cpp +++ b/ace/Asynch_IO.cpp @@ -18,6 +18,132 @@ ACE_RCSID(ace, Asynch_IO, "$Id$") #include "ace/Asynch_IO.i" #endif /* __ACE_INLINE__ */ +#if defined (ACE_HAS_AIO_CALLS) +class ACE_Export ACE_Asynch_Accept_Handler : public ACE_Event_Handler +{ + // = TITLE + // For the POSIX implementation, this class takes care of doing + // Asynch_Accept. + // + // = DESCRIPTION + // +public: + ACE_Asynch_Accept_Handler (ACE_Reactor* reactor); + // Constructor. + + ~ACE_Asynch_Accept_Handler (void); + // Destructor. + + int register_accept_call (ACE_Asynch_Accept::Result* result); + // Register this <accept> call with the local handler. + + virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); + // Called when accept event comes up on the <listen_handle>. + +private: + ACE_Asynch_Accept::Result* deregister_accept_call (void); + // Undo the things done when registering. + + ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*> result_queue_; + // Queue of Result pointers that correspond to all the <accept>'s + // pending. + + ACE_Reactor* reactor_; + // Reactor used by the Asynch_Accept. We need this here to enable + // and disable the <handle> now and then, depending on whether any + // <accept> is pending or no. + + ACE_Thread_Mutex lock_; + // The lock to protect the result queue which is shared. The queue + // is updated by main thread in the register function call and + // through the auxillary thread in the deregister fun. So let us + // mutex it. + + ACE_Asynch_Accept_Handler (void); + // Default constructor shouldn't be called. Without a reactor, this + // class wont make sense. +}; +#endif /* ACE_HAS_AIO_CALLS*/ + +#if defined (ACE_HAS_AIO_CALLS) +class ACE_Export ACE_Asynch_Transmit_Handler : public ACE_Handler +{ + // = TITLE + // Auxillary handler for doing <Asynch_Transmit_File> in + // Unix. <ACE_Asynch_Transmit_File> internally uses this. + // + // = DESCRIPTION + // This is a helper class for implementing + // <ACE_Asynch_Transmit_File> in Unix systems. +public: + ACE_Asynch_Transmit_Handler (ACE_Asynch_Transmit_File::Result *result, + ACE_Proactor *proactor); + // Constructor. Result pointer will have all the information to do + // the file transmission (socket, file, application handler, bytes + // to write....) and the the <proactor> pointer tells this class + // the <proactor> that is being used by the + // Asynch_Transmit_Operation and the application. + + virtual ~ACE_Asynch_Transmit_Handler (void); + // Destructor. + + int transmit (void); + // Do the transmission. All the info to do the transmission is in + // the <result> member. + +protected: + virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); + // This is called when asynchronous writes from the socket complete. + + virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result); + // This is called when asynchronous reads from the file complete. +private: + int initiate_read_file (void); + // Issue asynch read from the file. + + ACE_Asynch_Transmit_File::Result *result_; + // The asynch result pointer made from the initial transmit file + // request. + + ACE_Proactor *proactor_; + // The Proactor that is being used by the application handler and + // so the Asynch_Transmit_File. + + ACE_Asynch_Read_File rf_; + // To read from the file to be transmitted. + + ACE_Asynch_Write_Stream ws_; + // Write stream to write the header, trailer and the data. + + ACE_Message_Block *mb_; + // Message bloack used to do the txn. + + enum ACT + { + HEADER_ACT = 1, + DATA_ACT = 2, + TRAILER_ACT = 3 + }; + + ACT header_act_; + ACT data_act_; + ACT trailer_act_; + // ACT to transmit header, data and trailer. + + size_t file_offset_; + // Current offset of the file being transmitted. + + size_t file_size_; + // Total size of the file. + + size_t bytes_transferred_; + // Number of bytes transferred on the stream. + + size_t transmit_file_done_; + // Flag to indicate that the transmitting is over. +}; +#endif /* ACE_HAS_AIO_CALLS */ + ACE_Asynch_Result::ACE_Asynch_Result (ACE_Handler &handler, const void* act, ACE_HANDLE event, @@ -138,13 +264,16 @@ ACE_Asynch_Operation::open (ACE_Handler &handler, } #if defined (ACE_HAS_AIO_CALLS) + // AIO stuff is present. So no registering. // @@ But something has to be done to associate completion key with // the handle provided. How about a HashTable of size "the number of // file descriptors that are possible in the system". ACE_UNUSED_ARG (completion_key); return 0; + #else /* ACE_HAS_AIO_CALLS */ + // Register with the <proactor>. return this->proactor_->register_handle (this->handle_, completion_key); @@ -155,8 +284,10 @@ int ACE_Asynch_Operation::cancel (void) { #if defined (ACE_HAS_AIO_CALLS) + // @@ <aio_cancel> will come here soon. return 0; + #elif (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) && (defined (_MSC_VER) && (_MSC_VER > 1020)) // All I/O operations that are canceled will complete with the error // ERROR_OPERATION_ABORTED. All completion notifications for the I/O @@ -764,7 +895,43 @@ ACE_Asynch_Write_File::Result::complete (u_long bytes_transferred, // ************************************************************ ACE_Asynch_Accept::ACE_Asynch_Accept (void) + : accept_handler_ (0) { + ACE_NEW (this->accept_handler_, + ACE_Asynch_Accept_Handler (&this->reactor_)); +} + +int +ACE_Asynch_Accept::open (ACE_Handler &handler, + ACE_HANDLE handle, + const void *completion_key, + ACE_Proactor *proactor) +{ + + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::open called\n")); + + // Common things to register for any Asynch Operation. We need to + // call the base class' <open> method. + int result_open = this->ACE_Asynch_Operation::open (handler, + handle, + completion_key, + proactor); + if (result_open < 0) + return result_open; + + // Register the handle with the reactor. + this->reactor_.register_handler (this->handle_, + this->accept_handler_, + ACE_Event_Handler::ACCEPT_MASK); + + // Suspend the <handle> now. Enable only when the <accept> is issued + // by the application. + this->reactor_.suspend_handlers (); + + // Spawn the thread. It is the only thread we are going to have. It + // will do the <handle_events> on the reactor. + ACE_Thread_Manager::instance ()->spawn (ACE_Asynch_Accept::thread_function, + (void *)&this->reactor_); } int @@ -774,7 +941,10 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, const void *act) { #if (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) || (defined (ACE_HAS_AIO_CALLS)) - + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::accept called\n")); + // Common code for both WIN and POSIX. // Sanity check: make sure that enough space has been allocated by @@ -807,6 +977,7 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, } #endif /* Not ACE_HAS_AIO_CALLS */ + // Common code for both WIN and POSIX. Result *result = 0; ACE_NEW_RETURN (result, Result (*this->handler_, @@ -821,23 +992,9 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, #if defined (ACE_HAS_AIO_CALLS) // Code specific to the POSIX Implementation. - // Make the auxillary class for doing accept and initiate - // accept. This class will be delete itself when the accept succeeds - // or some failures occur there. - ACE_Asynch_Accept_Handler *accept_handler = 0; - - ACE_NEW_RETURN (accept_handler, - ::ACE_Asynch_Accept_Handler (result), - -1); + // Register this <accept> call with the local handler. + this->accept_handler_->register_accept_call (result); - int return_val = accept_handler->activate (THR_NEW_LWP | - THR_DETACHED); - if (return_val == -1) - // Something went wrong. - ACE_ERROR_RETURN ((LM_ERROR, - "Error:%p:\n", - "Could not create thread to do Asynch_Accept"), - -1); return 0; #else /* Not ACE_HAS_AIO_CALLS */ // Code specific to WIN platforms. @@ -886,6 +1043,31 @@ ACE_Asynch_Accept::accept (ACE_Message_Block &message_block, #endif /* (defined (ACE_HAS_WINNT4) && (ACE_HAS_WINNT4 != 0)) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) || (defined (ACE_HAS_AIO_CALLS) */ } +void* +ACE_Asynch_Accept::thread_function (void* arg_reactor) +{ + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::thread_function called\n")); + + // Retrieve the reactor pointer from the argument. + ACE_Reactor* reactor = (ACE_Reactor *) arg_reactor; + + // For this reactor, this thread is the owner. + reactor->owner (ACE_OS::thr_self ()); + + // Handle events. + int result = 0; + while (result != -1) + { + result = reactor->handle_events (); + ACE_DEBUG ((LM_DEBUG, + "ACE_Asynch_Accept::Thread_Function : handle_events : result = [%d]\n", + result)); + } + + ACE_DEBUG ((LM_DEBUG, "Exiting ACE_Asynch_Accept::thread_function \n")); + return 0; +} + // ************************************************************ u_long @@ -949,22 +1131,86 @@ ACE_Asynch_Accept::Result::complete (u_long bytes_transferred, // ************************************************************ #if defined (ACE_HAS_AIO_CALLS) -ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (ACE_Asynch_Accept::Result *result) - : result_ (result) +ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (ACE_Reactor* reactor) + : reactor_ (reactor) { } ACE_Asynch_Accept_Handler::~ACE_Asynch_Accept_Handler (void) { - // We shouldnt delete the result_ inside, because the completion is - // going thru the proactor's handle_events, it will delete the - // result after the calling the application specific code. +} + +ACE_Asynch_Accept_Handler::ACE_Asynch_Accept_Handler (void) +{ +} + +int +ACE_Asynch_Accept_Handler::register_accept_call (ACE_Asynch_Accept::Result* result) +{ + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::register_accept_call called\n")); + + // The queue is updated by main thread in the register function call and + // thru the auxillary thread in the deregister fun. So let us mutex + // it. + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); + + // Insert this result to the queue. + int insert_result = this->result_queue_.enqueue_tail (result); + if (insert_result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "%N:%l:ACE_Asynch_Accept_Handler::register_accept_call failed\n"), + -1); + + // If this is the only item, then it means there the set was empty + // before. So enable the <handle> in the reactor. + if (this->result_queue_.size () == 1) + this->reactor_->resume_handlers (); +} + +ACE_Asynch_Accept::Result* +ACE_Asynch_Accept_Handler::deregister_accept_call (void) +{ + // The queue is updated by main thread in the register function call and + // thru the auxillary thread in the deregister fun. So let us mutex + // it. + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0); + + // Get the first item (result ptr) from the Queue. + ACE_Asynch_Accept::Result* result = 0; + int return_dequeue = this->result_queue_.dequeue_head (result); + if (return_dequeue == -1) + return 0; + // Sanity check. + if (result == 0) + return 0; + + // Disable the <handle> in the reactor if no <accept>'s are + // pending. + if (this->result_queue_.size () == 0) + this->reactor_->suspend_handlers (); + + // Return the result pointer. + return result; } int -ACE_Asynch_Accept_Handler::svc (void) +ACE_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd) { - // @@ Can this wait forever for accept? + // An <accept> has been sensed on the <listen_handle>. We should be + // able to just go ahead and do the <accept> now on this <fd>. This + // should be the same as the <listen_handle>. + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept_Handler::handle_input called\n")); + + // Deregister this info pertaining to this <accept> call. + ACE_Asynch_Accept::Result* result = this->deregister_accept_call (); + + // @@ Debugging. + ACE_DEBUG ((LM_DEBUG, + "(%t):ACE_Asynch_Accept_Handler::handle_input : fd = [%d], Result->listen_handle = [%d]\n", + fd, + result->listen_handle ())); // There is not going to be any data read. So we can use the // <message_block> itself to take the <remote_address> as well as @@ -974,20 +1220,17 @@ ACE_Asynch_Accept_Handler::svc (void) size_t buffer_size = sizeof (sockaddr_in) + sizeof (sockaddr); // Parameters for the <accept> call. - int *address_size = (int *)this->result_->message_block ().wr_ptr (); + int *address_size = (int *)result->message_block ().wr_ptr (); *address_size = buffer_size; // Increment the wr_ptr. - this->result_->message_block ().wr_ptr (sizeof (int)); - - // @@ Debugging. - ACE_DEBUG ((LM_DEBUG, - "Asynch_Accept_Handler::svc : Listen_handle = [%d]\n", - this->result_->listen_handle ())); + result->message_block ().wr_ptr (sizeof (int)); // Issue <accept> now. - ACE_HANDLE new_handle = ACE_OS::accept (this->result_->listen_handle (), - (struct sockaddr *) this->result_->message_block ().wr_ptr (), + // @@ We shouldnt block here since we have already done poll/select + // thru reactor. But are we sure? + ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), + (struct sockaddr *) result->message_block ().wr_ptr (), address_size); if (new_handle == ACE_INVALID_HANDLE) ACE_ERROR_RETURN ((LM_ERROR, @@ -998,13 +1241,15 @@ ACE_Asynch_Accept_Handler::svc (void) // Accept has completed. // Update the <wr_ptr> for the <message block>. - this->result_->message_block ().wr_ptr (*address_size); + result->message_block ().wr_ptr (*address_size); // @@ Just debugging. - ACE_DEBUG ((LM_DEBUG, "%N:%l:Address_size = [%d], New_handle = [%d]\n", *address_size, new_handle)); + ACE_DEBUG ((LM_DEBUG, + "%N:%l:Address_size = [%d], New_handle = [%d]\n", + *address_size, new_handle)); // Store the new handle. - this->result_->accept_handle_ = new_handle; + result->accept_handle_ = new_handle; // Signal the main process about this completion. @@ -1012,27 +1257,23 @@ ACE_Asynch_Accept_Handler::svc (void) pid_t pid = ACE_OS::getpid (); if (pid == (pid_t) -1) ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p:<getpid> failed.\n"), + "Error:(%P | %t):%p\n", + "<getpid> failed."), -1); // Set the signal information. sigval value; - value.sival_ptr = (void *) this->result_; + value.sival_ptr = (void *) result; // Queue the signal. if (sigqueue (pid, ACE_SIG_AIO, value) == -1) ACE_ERROR_RETURN ((LM_ERROR, "Error:(%P | %t):%p:\n", - "<sigqueue> failed.\n"), + "<sigqueue> failed"), -1); - // @@ Just debugging. - ACE_DEBUG ((LM_DEBUG, - "(%P | %t): Accept is done. Signal is queued. Exiting thread\n")); - return 0; } - #endif /* ACE_HAS_AIO_CALLS */ // ************************************************************ @@ -1718,4 +1959,18 @@ ACE_Service_Handler::open (ACE_HANDLE, { } +#if defined (ACE_HAS_AIO_CALLS) +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*>; +template class ACE_Node<ACE_Asynch_Accept::Result*>; +template class ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*>; +template class ACE_Unbounded_Queue_Iterator<ACE_Asynch_Accept::Result*>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*> +#pragma instantiate ACE_Node<ACE_Asynch_Accept::Result*> +#pragma instantiate ACE_Unbounded_Queue<ACE_Asynch_Accept::Result*> +#pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_Asynch_Accept::Result*> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ +#endif /* ACE_HAS_AIO_CALLS */ + #endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS*/ |