diff options
Diffstat (limited to 'ace/POSIX_Asynch_IO.cpp')
-rw-r--r-- | ace/POSIX_Asynch_IO.cpp | 114 |
1 files changed, 63 insertions, 51 deletions
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp index 6c3683ed4fb..f789eb9cdec 100644 --- a/ace/POSIX_Asynch_IO.cpp +++ b/ace/POSIX_Asynch_IO.cpp @@ -78,7 +78,6 @@ ACE_POSIX_Asynch_Result::signal_number (void) const { return this->aio_sigevent.sigev_signo; } - int ACE_POSIX_Asynch_Result::post_completion (ACE_Proactor_Impl *proactor_impl) { @@ -237,6 +236,12 @@ ACE_POSIX_SIG_Asynch_Operation::~ACE_POSIX_SIG_Asynch_Operation (void) { } +int +ACE_POSIX_SIG_Asynch_Operation::register_aio_with_proactor (ACE_POSIX_Asynch_Result *result) +{ + return this->posix_proactor ()->register_aio_with_proactor (result); +} + // ********************************************************************* u_long @@ -534,13 +539,12 @@ ACE_POSIX_SIG_Asynch_Read_Stream::shared_read (ACE_POSIX_Asynch_Read_Stream_Resu // We want queuing of RT signal to notify completion. result->aio_sigevent.sigev_notify = SIGEV_SIGNAL; result->aio_sigevent.sigev_signo = result->signal_number (); - - // Keep ACE_POSIX_Asynch_Result, the base class pointer in the - // signal value. - ACE_POSIX_Asynch_Result *base_result = result; - result->aio_sigevent.sigev_value.sival_ptr = ACE_reinterpret_cast (void *, - base_result); + result->aio_sigevent.sigev_value.sival_ptr = (void *) result; + // Register the real-time signal with the Proactor. + if (this->register_aio_with_proactor (result) == -1) + return -1; + // Fire off the aio read. if (aio_read (result) == -1) // Queueing failed. @@ -867,13 +871,12 @@ ACE_POSIX_SIG_Asynch_Write_Stream::shared_write (ACE_POSIX_Asynch_Write_Stream_R // We want queuing of RT signal to notify completion. result->aio_sigevent.sigev_notify = SIGEV_SIGNAL; result->aio_sigevent.sigev_signo = result->signal_number (); + result->aio_sigevent.sigev_value.sival_ptr = (void *) result; - // Keep ACE_POSIX_Asynch_Result, the base class pointer in the - // signal value. - ACE_POSIX_Asynch_Result *base_result = result; - result->aio_sigevent.sigev_value.sival_ptr = ACE_reinterpret_cast (void *, - base_result); - + // Register the real-time signal with the Proactor. + if (this->register_aio_with_proactor (result) == -1) + return -1; + // Fire off the aio write. if (aio_write (result) == -1) // Queueing failed. @@ -1725,12 +1728,12 @@ protected: ACE_POSIX_Proactor *posix_proactor_; // POSIX_Proactor. - + ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result*> result_queue_; // Queue of Result pointers that correspond to all the <accept>'s // pending. - ACE_SYNCH_MUTEX lock_; + ACE_Thread_Mutex lock_; // The lock to protect the result queue which is shared. The queue // is updated by main thread in the register function call and // through the auxillary thread in the deregister fun. So let us @@ -1836,7 +1839,7 @@ ACE_POSIX_Asynch_Accept_Handler::deregister_accept_call (void) // The queue is updated by main thread in the register function call and // thru the auxillary thread in the deregister fun. So let us mutex // it. - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0); + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, 0); // Get the first item (result ptr) from the Queue. ACE_POSIX_Asynch_Accept_Result* result = 0; @@ -1882,13 +1885,13 @@ ACE_POSIX_AIOCB_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Ac // The queue is updated by main thread in the register function call // and thru the auxillary thread in the deregister fun. So let us // mutex it. - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); return register_accept_call_i (result); } int -ACE_POSIX_AIOCB_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */) +ACE_POSIX_AIOCB_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd) { // An <accept> has been sensed on the <listen_handle>. We should be // able to just go ahead and do the <accept> now on this <fd>. This @@ -1949,17 +1952,18 @@ ACE_POSIX_SIG_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Acce // The queue is updated by main thread in the register function call // and thru the auxillary thread in the deregister fun. So let us // mutex it. - ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1); + ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); // Do the work. if (this->register_accept_call_i (result) == -1) return -1; - - return 0; + + // Also register the real-time signal. + return this->posix_proactor_->register_aio_with_proactor (result); } int -ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */) +ACE_POSIX_SIG_Asynch_Accept_Handler::handle_input (ACE_HANDLE fd) { // An <accept> has been sensed on the <listen_handle>. We should be // able to just go ahead and do the <accept> now on this <fd>. This @@ -2090,7 +2094,7 @@ ACE_POSIX_AIOCB_Asynch_Accept::open (ACE_Handler &handler, // Spawn the thread. It is the only thread we are going to have. It // will do the <handle_events> on the reactor. return_val = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_AIOCB_Asynch_Accept::thread_function, - ACE_reinterpret_cast (void *, &this->reactor_)); + (void *) &this->reactor_); if (return_val == -1) ACE_ERROR_RETURN ((LM_ERROR, "%N:%l:Thread_Manager::spawn failed\n"), @@ -2106,9 +2110,10 @@ ACE_POSIX_AIOCB_Asynch_Accept::~ACE_POSIX_AIOCB_Asynch_Accept (void) void* ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor) { + ACE_DEBUG ((LM_DEBUG, "ACE_Asynch_Accept::thread_function called\n")); + // Retrieve the reactor pointer from the argument. - ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *, - arg_reactor); + ACE_Reactor* reactor = (ACE_Reactor *) arg_reactor; // It should be valid Reactor, since we have a reactor_ ,e,ner we // are passing only that one here. @@ -2125,7 +2130,13 @@ ACE_POSIX_AIOCB_Asynch_Accept::thread_function (void* arg_reactor) while (result != -1) { result = reactor->handle_events (); + ACE_DEBUG ((LM_DEBUG, + "ACE_Asynch_Accept::Thread_Function : handle_events : result = [%d]\n", + result)); } + + ACE_DEBUG ((LM_DEBUG, "Exiting ACE_Asynch_Accept::thread_function \n")); + return 0; } @@ -2251,8 +2262,7 @@ void* ACE_POSIX_SIG_Asynch_Accept::thread_function (void* arg_reactor) { // Retrieve the reactor pointer from the argument. - ACE_Reactor* reactor = ACE_reinterpret_cast (ACE_Reactor *, - arg_reactor); + ACE_Reactor* reactor = (ACE_Reactor *) arg_reactor; if (reactor == 0) reactor = ACE_Reactor::instance (); @@ -2466,9 +2476,7 @@ class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler // = DESCRIPTION // // This is a helper class for implementing - // <ACE_POSIX_Asynch_Transmit_File> in Unix systems. This class - // abstracts out all the commonalities in the two different - // POSIX Transmit Handler implementations. + // <ACE_POSIX_Asynch_Transmit_File> in Unix systems. public: virtual ~ACE_POSIX_Asynch_Transmit_Handler (void); @@ -2670,7 +2678,7 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::transmit (void) // Transmit the header. if (this->ws_.write (*this->result_->header_and_trailer ()->header (), this->result_->header_and_trailer ()->header_bytes (), - ACE_reinterpret_cast (void *, &this->header_act_), + (void *) &this->header_act_, 0) == -1) ACE_ERROR_RETURN ((LM_ERROR, "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"), @@ -2687,25 +2695,32 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_W // Check the success parameter. if (result.success () == 0) { - // Failure. ACE_ERROR ((LM_ERROR, "Asynch_Transmit_File failed.\n")); - - ACE_SEH_TRY - { - this->result_->complete (this->bytes_transferred_, - 0, // Failure. - 0, // @@ Completion key. - 0); // @@ Error no. - } - ACE_SEH_FINALLY + + // Check the success parameter. + if (result.success () == 0) { - // This is crucial to prevent memory leaks. This deletes - // the result pointer also. - delete this; + // Failure. + ACE_ERROR ((LM_ERROR, + "Asynch_Transmit_File failed.\n")); + + ACE_SEH_TRY + { + this->result_->complete (this->bytes_transferred_, + 0, // Failure. + 0, // @@ Completion key. + 0); // @@ Error no. + } + ACE_SEH_FINALLY + { + // This is crucial to prevent memory leaks. This deletes + // the result pointer also. + delete this; + } } } - + // Write stream successful. // Partial write to socket. @@ -2739,7 +2754,7 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_W // Not a partial write. A full write. // Check ACT to see what was sent. - ACT act = * (ACT *) result.act (); + ACT act = *(ACT *) result.act (); switch (act) { @@ -2801,10 +2816,8 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read if (result.bytes_transferred () == 0) return; - // Increment offset. + // Increment offset and write data to network. this->file_offset_ += result.bytes_transferred (); - - // Write data to network. if (this->ws_.write (result.message_block (), result.bytes_transferred (), (void *)&this->data_act_, @@ -2905,8 +2918,7 @@ ACE_POSIX_SIG_Asynch_Transmit_Handler::transmit (void) // Transmit the header. if (this->ws_.write (*this->result_->header_and_trailer ()->header (), this->result_->header_and_trailer ()->header_bytes (), - ACE_reinterpret_cast (void *, - &this->header_act_), + (void *) &this->header_act_, this->result_->priority (), this->result_->signal_number ()) == -1) ACE_ERROR_RETURN ((LM_ERROR, |