diff options
author | schmidt <douglascraigschmidt@users.noreply.github.com> | 2002-03-28 15:01:21 +0000 |
---|---|---|
committer | schmidt <douglascraigschmidt@users.noreply.github.com> | 2002-03-28 15:01:21 +0000 |
commit | fdde942e7fc7504258fa14140e21422d4561f90e (patch) | |
tree | 678de7d60a0c2ab2897e4bbf0b872915a8634b50 /ace/POSIX_Asynch_IO.cpp | |
parent | 8cbbb383832e36927b4b1b145233a049d4696fcd (diff) | |
download | ATCD-fdde942e7fc7504258fa14140e21422d4561f90e.tar.gz |
ChangeLogTag:Thu Mar 28 06:15:22 2002 Alex Libman <AlexL@rumblegroup.com>
Diffstat (limited to 'ace/POSIX_Asynch_IO.cpp')
-rw-r--r-- | ace/POSIX_Asynch_IO.cpp | 1198 |
1 files changed, 659 insertions, 539 deletions
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp index 010200327b6..8e2347eed3c 100644 --- a/ace/POSIX_Asynch_IO.cpp +++ b/ace/POSIX_Asynch_IO.cpp @@ -181,37 +181,16 @@ ACE_POSIX_Asynch_Operation::open (ACE_Handler &handler, int ACE_POSIX_Asynch_Operation::cancel (void) { - ACE_Proactor *p = this->proactor () ; - - if (!p) - return -1; - - ACE_POSIX_Proactor * p_impl = ACE_dynamic_cast - (ACE_POSIX_Proactor *, - p->implementation ()); - if (!p_impl) + if (!posix_aiocb_proactor_) return -1; - // For ACE_SUN_Proactor and ACE_POSIX_AIOCB_Proactor - // and ACE_POSIX_SIG_Proactor now ! - // we should call cancel_aio (this->handle_) - // method to cancel correctly all deferred AIOs - - switch (p_impl->get_impl_type ()) + switch (posix_aiocb_proactor_->get_impl_type ()) { case ACE_POSIX_Proactor::PROACTOR_SUN: case ACE_POSIX_Proactor::PROACTOR_AIOCB: case ACE_POSIX_Proactor::PROACTOR_SIG: - { - ACE_POSIX_AIOCB_Proactor * p_impl_aiocb = ACE_dynamic_cast - (ACE_POSIX_AIOCB_Proactor *, - p_impl); - - if (! p_impl_aiocb) - return -1; - - return p_impl_aiocb->cancel_aio (this->handle_); - } + return posix_aiocb_proactor_->cancel_aio (this->handle_); + default: break; } @@ -225,41 +204,29 @@ ACE_POSIX_Asynch_Operation::proactor (void) const return this->proactor_; } -ACE_POSIX_Asynch_Operation::~ACE_POSIX_Asynch_Operation (void) -{ -} - -ACE_POSIX_Asynch_Operation::ACE_POSIX_Asynch_Operation (void) - : ACE_Asynch_Operation_Impl (), - handler_ (0), - handle_ (ACE_INVALID_HANDLE) -{ -} - -// ********************************************************************* - ACE_POSIX_AIOCB_Proactor * -ACE_POSIX_AIOCB_Asynch_Operation::posix_proactor (void) const +ACE_POSIX_Asynch_Operation::posix_proactor (void) const { return this->posix_aiocb_proactor_; } -ACE_POSIX_AIOCB_Asynch_Operation::ACE_POSIX_AIOCB_Asynch_Operation (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) - : ACE_Asynch_Operation_Impl (), - ACE_POSIX_Asynch_Operation (), - posix_aiocb_proactor_ (posix_aiocb_proactor) +int +ACE_POSIX_Asynch_Operation::register_and_start_aio (ACE_POSIX_Asynch_Result *result, + int op) { + return this->posix_proactor ()->register_and_start_aio (result, op); } -ACE_POSIX_AIOCB_Asynch_Operation::~ACE_POSIX_AIOCB_Asynch_Operation (void) +ACE_POSIX_Asynch_Operation::~ACE_POSIX_Asynch_Operation (void) { } -int -ACE_POSIX_AIOCB_Asynch_Operation::register_and_start_aio (ACE_POSIX_Asynch_Result *result, - int op) +ACE_POSIX_Asynch_Operation::ACE_POSIX_Asynch_Operation (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) + : ACE_Asynch_Operation_Impl (), + handler_ (0), + handle_ (ACE_INVALID_HANDLE), + posix_aiocb_proactor_ (posix_aiocb_proactor) { - return this->posix_proactor ()->register_and_start_aio (result, op); } // ********************************************************************* @@ -400,15 +367,15 @@ ACE_POSIX_Asynch_Read_Stream_Result::post_completion (ACE_Proactor_Impl *proacto // ************************************************************ -ACE_POSIX_AIOCB_Asynch_Read_Stream::ACE_POSIX_AIOCB_Asynch_Read_Stream (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) +ACE_POSIX_Asynch_Read_Stream::ACE_POSIX_Asynch_Read_Stream (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : ACE_Asynch_Operation_Impl (), ACE_Asynch_Read_Stream_Impl (), - ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor) + ACE_POSIX_Asynch_Operation (posix_aiocb_proactor) { } int -ACE_POSIX_AIOCB_Asynch_Read_Stream::read (ACE_Message_Block &message_block, +ACE_POSIX_Asynch_Read_Stream::read (ACE_Message_Block &message_block, u_long bytes_to_read, const void *act, int priority, @@ -427,13 +394,6 @@ ACE_POSIX_AIOCB_Asynch_Read_Stream::read (ACE_Message_Block &message_block, signal_number), -1); - // we do not need shared_read anymore - //ssize_t return_val = this->shared_read (result); - - // try to start read - // we will setup aio_sigevent later - // in ACE_POSIX_AIOCB/SIG_Proactor::register_and_start_aio () - ssize_t return_val = this->register_and_start_aio (result, 0); if (return_val == -1) @@ -442,26 +402,16 @@ ACE_POSIX_AIOCB_Asynch_Read_Stream::read (ACE_Message_Block &message_block, return return_val; } -ACE_POSIX_AIOCB_Asynch_Read_Stream::~ACE_POSIX_AIOCB_Asynch_Read_Stream (void) +ACE_POSIX_Asynch_Read_Stream::~ACE_POSIX_Asynch_Read_Stream (void) { } -//int -//ACE_POSIX_AIOCB_Asynch_Read_Stream::shared_read (ACE_POSIX_Asynch_Read_Stream_Result *result) -//{ -// -// result->aio_sigevent.sigev_notify = SIGEV_NONE; -// -// // try start read -// return register_and_start_aio (result, 0); -//} - // Methods belong to ACE_POSIX_Asynch_Operation base class. These // methods are defined here to avoid dominance warnings. They route // the call to the ACE_POSIX_Asynch_Operation base class. int -ACE_POSIX_AIOCB_Asynch_Read_Stream::open (ACE_Handler &handler, +ACE_POSIX_Asynch_Read_Stream::open (ACE_Handler &handler, ACE_HANDLE handle, const void *completion_key, ACE_Proactor *proactor) @@ -473,13 +423,13 @@ ACE_POSIX_AIOCB_Asynch_Read_Stream::open (ACE_Handler &handler, } int -ACE_POSIX_AIOCB_Asynch_Read_Stream::cancel (void) +ACE_POSIX_Asynch_Read_Stream::cancel (void) { return ACE_POSIX_Asynch_Operation::cancel (); } ACE_Proactor * -ACE_POSIX_AIOCB_Asynch_Read_Stream::proactor (void) const +ACE_POSIX_Asynch_Read_Stream::proactor (void) const { return ACE_POSIX_Asynch_Operation::proactor (); } @@ -623,15 +573,15 @@ ACE_POSIX_Asynch_Write_Stream_Result::post_completion (ACE_Proactor_Impl *proact // ********************************************************************* -ACE_POSIX_AIOCB_Asynch_Write_Stream::ACE_POSIX_AIOCB_Asynch_Write_Stream (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) +ACE_POSIX_Asynch_Write_Stream::ACE_POSIX_Asynch_Write_Stream (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : ACE_Asynch_Operation_Impl (), ACE_Asynch_Write_Stream_Impl (), - ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor) + ACE_POSIX_Asynch_Operation (posix_aiocb_proactor) { } int -ACE_POSIX_AIOCB_Asynch_Write_Stream::write (ACE_Message_Block &message_block, +ACE_POSIX_Asynch_Write_Stream::write (ACE_Message_Block &message_block, u_long bytes_to_write, const void *act, int priority, @@ -649,13 +599,6 @@ ACE_POSIX_AIOCB_Asynch_Write_Stream::write (ACE_Message_Block &message_block, signal_number), -1); - // we do not need shared_write anymore - //ssize_t return_val = this->shared_write (result); - - // try to start write - // we will setup aio_sigevent later - // in ACE_POSIX_AIOCB/SIG_Proactor::register_and_start_aio () - ssize_t return_val = this->register_and_start_aio (result, 1); if (return_val == -1) @@ -664,26 +607,17 @@ ACE_POSIX_AIOCB_Asynch_Write_Stream::write (ACE_Message_Block &message_block, return return_val; } -ACE_POSIX_AIOCB_Asynch_Write_Stream::~ACE_POSIX_AIOCB_Asynch_Write_Stream (void) +ACE_POSIX_Asynch_Write_Stream::~ACE_POSIX_Asynch_Write_Stream (void) { } -//int -//ACE_POSIX_AIOCB_Asynch_Write_Stream::shared_write (ACE_POSIX_Asynch_Write_Stream_Result *result) -//{ -// -// result->aio_sigevent.sigev_notify = SIGEV_NONE; -// -// // try start write -// return register_and_start_aio (result, 1); -//} // Methods belong to ACE_POSIX_Asynch_Operation base class. These // methods are defined here to avoid dominance warnings. They route // the call to the ACE_POSIX_Asynch_Operation base class. int -ACE_POSIX_AIOCB_Asynch_Write_Stream::open (ACE_Handler &handler, +ACE_POSIX_Asynch_Write_Stream::open (ACE_Handler &handler, ACE_HANDLE handle, const void *completion_key, ACE_Proactor *proactor) @@ -695,20 +629,19 @@ ACE_POSIX_AIOCB_Asynch_Write_Stream::open (ACE_Handler &handler, } int -ACE_POSIX_AIOCB_Asynch_Write_Stream::cancel (void) +ACE_POSIX_Asynch_Write_Stream::cancel (void) { return ACE_POSIX_Asynch_Operation::cancel (); } ACE_Proactor * -ACE_POSIX_AIOCB_Asynch_Write_Stream::proactor (void) const +ACE_POSIX_Asynch_Write_Stream::proactor (void) const { return ACE_POSIX_Asynch_Operation::proactor (); } // ********************************************************************* - ACE_POSIX_Asynch_Read_File_Result::ACE_POSIX_Asynch_Read_File_Result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, @@ -861,16 +794,16 @@ ACE_POSIX_Asynch_Read_File_Result::post_completion (ACE_Proactor_Impl *proactor) // ********************************************************************* -ACE_POSIX_AIOCB_Asynch_Read_File::ACE_POSIX_AIOCB_Asynch_Read_File (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) +ACE_POSIX_Asynch_Read_File::ACE_POSIX_Asynch_Read_File (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : ACE_Asynch_Operation_Impl (), ACE_Asynch_Read_Stream_Impl (), ACE_Asynch_Read_File_Impl (), - ACE_POSIX_AIOCB_Asynch_Read_Stream (posix_aiocb_proactor) + ACE_POSIX_Asynch_Read_Stream (posix_aiocb_proactor) { } int -ACE_POSIX_AIOCB_Asynch_Read_File::read (ACE_Message_Block &message_block, +ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block &message_block, u_long bytes_to_read, u_long offset, u_long offset_high, @@ -892,7 +825,6 @@ ACE_POSIX_AIOCB_Asynch_Read_File::read (ACE_Message_Block &message_block, signal_number), -1); - //ssize_t return_val = this->shared_read (result); ssize_t return_val = this->register_and_start_aio (result, 0); if (return_val == -1) @@ -901,18 +833,18 @@ ACE_POSIX_AIOCB_Asynch_Read_File::read (ACE_Message_Block &message_block, return return_val; } -ACE_POSIX_AIOCB_Asynch_Read_File::~ACE_POSIX_AIOCB_Asynch_Read_File (void) +ACE_POSIX_Asynch_Read_File::~ACE_POSIX_Asynch_Read_File (void) { } int -ACE_POSIX_AIOCB_Asynch_Read_File::read (ACE_Message_Block &message_block, +ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block &message_block, u_long bytes_to_read, const void *act, int priority, int signal_number) { - return ACE_POSIX_AIOCB_Asynch_Read_Stream::read (message_block, + return ACE_POSIX_Asynch_Read_Stream::read (message_block, bytes_to_read, act, priority, @@ -924,7 +856,7 @@ ACE_POSIX_AIOCB_Asynch_Read_File::read (ACE_Message_Block &message_block, // the call to the ACE_POSIX_Asynch_Operation base class. int -ACE_POSIX_AIOCB_Asynch_Read_File::open (ACE_Handler &handler, +ACE_POSIX_Asynch_Read_File::open (ACE_Handler &handler, ACE_HANDLE handle, const void *completion_key, ACE_Proactor *proactor) @@ -936,13 +868,13 @@ ACE_POSIX_AIOCB_Asynch_Read_File::open (ACE_Handler &handler, } int -ACE_POSIX_AIOCB_Asynch_Read_File::cancel (void) +ACE_POSIX_Asynch_Read_File::cancel (void) { return ACE_POSIX_Asynch_Operation::cancel (); } ACE_Proactor * -ACE_POSIX_AIOCB_Asynch_Read_File::proactor (void) const +ACE_POSIX_Asynch_Read_File::proactor (void) const { return ACE_POSIX_Asynch_Operation::proactor (); } @@ -1101,16 +1033,16 @@ ACE_POSIX_Asynch_Write_File_Result::post_completion (ACE_Proactor_Impl *proactor // ********************************************************************* -ACE_POSIX_AIOCB_Asynch_Write_File::ACE_POSIX_AIOCB_Asynch_Write_File (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) +ACE_POSIX_Asynch_Write_File::ACE_POSIX_Asynch_Write_File (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : ACE_Asynch_Operation_Impl (), ACE_Asynch_Write_Stream_Impl (), ACE_Asynch_Write_File_Impl (), - ACE_POSIX_AIOCB_Asynch_Write_Stream (posix_aiocb_proactor) + ACE_POSIX_Asynch_Write_Stream (posix_aiocb_proactor) { } int -ACE_POSIX_AIOCB_Asynch_Write_File::write (ACE_Message_Block &message_block, +ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block &message_block, u_long bytes_to_write, u_long offset, u_long offset_high, @@ -1132,7 +1064,6 @@ ACE_POSIX_AIOCB_Asynch_Write_File::write (ACE_Message_Block &message_block, signal_number), -1); - //ssize_t return_val = this->shared_write (result); ssize_t return_val = this->register_and_start_aio (result, 1); if (return_val == -1) @@ -1141,18 +1072,18 @@ ACE_POSIX_AIOCB_Asynch_Write_File::write (ACE_Message_Block &message_block, return return_val; } -ACE_POSIX_AIOCB_Asynch_Write_File::~ACE_POSIX_AIOCB_Asynch_Write_File (void) +ACE_POSIX_Asynch_Write_File::~ACE_POSIX_Asynch_Write_File (void) { } int -ACE_POSIX_AIOCB_Asynch_Write_File::write (ACE_Message_Block &message_block, +ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block &message_block, u_long bytes_to_write, const void *act, int priority, int signal_number) { - return ACE_POSIX_AIOCB_Asynch_Write_Stream::write (message_block, + return ACE_POSIX_Asynch_Write_Stream::write (message_block, bytes_to_write, act, priority, @@ -1164,7 +1095,7 @@ ACE_POSIX_AIOCB_Asynch_Write_File::write (ACE_Message_Block &message_block, // the call to the ACE_POSIX_Asynch_Operation base class. int -ACE_POSIX_AIOCB_Asynch_Write_File::open (ACE_Handler &handler, +ACE_POSIX_Asynch_Write_File::open (ACE_Handler &handler, ACE_HANDLE handle, const void *completion_key, ACE_Proactor *proactor) @@ -1176,13 +1107,13 @@ ACE_POSIX_AIOCB_Asynch_Write_File::open (ACE_Handler &handler, } int -ACE_POSIX_AIOCB_Asynch_Write_File::cancel (void) +ACE_POSIX_Asynch_Write_File::cancel (void) { return ACE_POSIX_Asynch_Operation::cancel (); } ACE_Proactor * -ACE_POSIX_AIOCB_Asynch_Write_File::proactor (void) const +ACE_POSIX_Asynch_Write_File::proactor (void) const { return ACE_POSIX_Asynch_Operation::proactor (); } @@ -1330,144 +1261,182 @@ ACE_POSIX_Asynch_Accept_Result::post_completion (ACE_Proactor_Impl *proactor) // ********************************************************************* -class ACE_Export ACE_POSIX_Asynch_Accept_Handler : public ACE_Event_Handler +ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_AIOCB_Proactor * posix_proactor) + : ACE_Asynch_Operation_Impl (), + ACE_Asynch_Accept_Impl (), + ACE_POSIX_Asynch_Operation (posix_proactor), + flg_open_ (0), + task_lock_count_ (0) { - // = TITLE - // For the POSIX implementation this class is common - // for all Proactors (AIOCB/SIG/SUN) - // - // = DESCRIPTION - // - -public: - ~ACE_POSIX_Asynch_Accept_Handler (void); - // Destructor. - - ACE_POSIX_Asynch_Accept_Handler (ACE_Reactor* reactor, - ACE_POSIX_Proactor *posix_proactor); - // Constructor. Give the reactor so that it can activate/deactivate - // the handlers. Give also the proactor used here, so that the - // handler can send the <POSIX_Asynch_Accept> result block through - // <post_completion>. - - int cancel_uncompleted (int flg_notify); - // flg_notify points whether or not we should send notification about - // canceled accepts - - - int register_accept_call (ACE_POSIX_Asynch_Accept_Result* result); - // Worker method for registering this <accept> call with the local - // handler. This method obtains lock to access the shared the queues. - - ACE_POSIX_Asynch_Accept_Result* deregister_accept_call (void); - // Method for deregistering. - - int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE); - // Called when accept event comes up on <listen_hanlde> +} -protected: - ACE_Reactor* reactor_; - // Reactor used by the Asynch_Accept. We need this here to enable - // and disable the <handle> now and then, depending on whether any - // <accept> is pending or no. - - ACE_POSIX_Proactor *posix_proactor_; - // POSIX_Proactor implementation. - - ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result*> result_queue_; - // Queue of Result pointers that correspond to all the <accept>'s - // pending. - - ACE_SYNCH_MUTEX lock_; - // The lock to protect the result queue which is shared. The queue - // is updated by main thread in the register function call and - // through the auxillary thread in the deregister fun. So let us - // mutex it. -}; +ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void) +{ + this->close (); +} -// ********************************************************************* +ACE_Proactor * +ACE_POSIX_Asynch_Accept::proactor (void) const +{ + return ACE_POSIX_Asynch_Operation::proactor (); +} -ACE_POSIX_Asynch_Accept_Handler::ACE_POSIX_Asynch_Accept_Handler (ACE_Reactor* reactor, - ACE_POSIX_Proactor *posix_proactor) - : reactor_ (reactor), - posix_proactor_ (posix_proactor) +ACE_HANDLE +ACE_POSIX_Asynch_Accept::get_handle (void) const { - ACE_TRACE ("ACE_POSIX_Asynch_Accept_Handler::ctor"); + return this->handle_; } -ACE_POSIX_Asynch_Accept_Handler::~ACE_POSIX_Asynch_Accept_Handler (void) +void +ACE_POSIX_Asynch_Accept::set_handle (ACE_HANDLE handle) { - ACE_TRACE ("ACE_POSIX_Asynch_Accept_Handler::~ACE_POSIX_Asynch_Accept_Handler"); + ACE_ASSERT (handle_ == ACE_INVALID_HANDLE); + this->handle_ = handle; } int -ACE_POSIX_Asynch_Accept_Handler::register_accept_call (ACE_POSIX_Asynch_Accept_Result* result) +ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler, + ACE_HANDLE handle, + const void *completion_key, + ACE_Proactor *proactor) { - // The queue is updated by main thread in the register function call - // and thru the auxillary thread in the deregister fun. So let us - // mutex it. + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::open\n")); + + int result = 0; + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - // Insert this result to the queue. - int insert_result = this->result_queue_.enqueue_tail (result); - if (insert_result == -1) + // If we are already opened, we could not create a new handler + // without closing the previous. + + if (this->flg_open_ != 0) ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:ACE_POSIX_Asynch_Accept_Handler::register_accept_call failed\n"), + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::open:") + ACE_LIB_TEXT("acceptor already open \n")), -1); + + result = ACE_POSIX_Asynch_Operation::open (handler, + handle, + completion_key, + proactor); + if (result == -1) + return result; - // If this is the only item, then it means there the set was empty - // before. So enable the <handle> in the reactor. - if (this->result_queue_.size () == 1) + flg_open_ = 1; + + task_lock_count_++; + + // At this moment asynch_accept_task does not know about us, so we + // can lock task's token with our lock_ locked. In all other cases + // we should release our lock_ before calling task's methods to + // avoid deadlock + + ACE_POSIX_Asynch_Accept_Task & task = + this->posix_proactor()->get_asynch_accept_task(); + + result = task.register_acceptor (this, ACE_Event_Handler::ACCEPT_MASK); + + task_lock_count_-- ; + + if (result < 0) { - int return_val = this->reactor_->resume_handler (result->listen_handle ()); - if (return_val == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:ACE_POSIX_Asynch_Accept_Handler::register_accept_call: " - "Reactor::resume_handler failed\n"), - -1); + + this->flg_open_= 0; + this->handle_ = ACE_INVALID_HANDLE; + + return -1 ; } return 0; } -// @@ We could have a queue where the <result> objects are arranged -// according to the priority. This will help us to demux the accept -// completions based on the priority. (Alex). - -ACE_POSIX_Asynch_Accept_Result * -ACE_POSIX_Asynch_Accept_Handler::deregister_accept_call (void) +int +ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, + u_long bytes_to_read, + ACE_HANDLE accept_handle, + const void *act, + int priority, + int signal_number) { - // The queue is updated by main thread in the register function call and - // thru the auxillary thread in the deregister fun. So let us mutex - // it. - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::accept\n") ); - // Get the first item (result ptr) from the Queue. - ACE_POSIX_Asynch_Accept_Result* result = 0; - if (this->result_queue_.dequeue_head (result) != 0) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t):%p\n", - "ACE_POSIX_Asynch_Accept_Handler::" - "deregister_accept_call:dequeueing failed"), - 0); + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - // ACE_ASSERT (result != 0); + if (this->flg_open_ == 0 ) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept") + ACE_LIB_TEXT("acceptor was not opened before\n")), + -1); - // Disable the <handle> in the reactor if no <accept>'s are pending. - if (this->result_queue_.size () == 0) - { - if (this->reactor_->suspend_handler (result->listen_handle ()) != 0) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t):%p\n", - "ACE_POSIX_Asynch_Accept_Handler::" - "deregister_accept_call:suspend handler failed"), - 0); - } + // Sanity check: make sure that enough space has been allocated by + // the caller. + size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr); + size_t space_in_use = message_block.wr_ptr () - message_block.base (); + size_t total_size = message_block.size (); + size_t available_space = total_size - space_in_use; + size_t space_needed = bytes_to_read + 2 * address_size; + + if (available_space < space_needed) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("Buffer too small\n")), + -1); + + // Common code for both WIN and POSIX. + // Create future Asynch_Accept_Result + ACE_POSIX_Asynch_Accept_Result *result = 0; + ACE_NEW_RETURN (result, + ACE_POSIX_Asynch_Accept_Result (*this->handler_, + this->handle_, + accept_handle, + message_block, + bytes_to_read, + act, + this->posix_proactor()->get_handle (), + priority, + signal_number), + -1); - // Return the result pointer. - return result; -} + // Enqueue result + if (this->result_queue_.enqueue_tail (result) == -1) + { + delete result; // to avoid memory leak + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept:") + ACE_LIB_TEXT("enqueue accept call failed\n")), + -1); + } + + if (this->result_queue_.size () > 1) + return 0; + + task_lock_count_ ++; + } + + // If this is the only item, then it means there the set was empty + // before. So enable the <handle> in the reactor. + + ACE_POSIX_Asynch_Accept_Task & task = + this->posix_proactor()->get_asynch_accept_task(); + + int rc_task = task.resume_acceptor (this); + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + task_lock_count_ --; + + if (rc_task == -2 && task_lock_count_ == 0) // task is closing + task.unlock_finish (); + + } + + if (rc_task < 0) + return -1; + + return 0; +} //@@ New method cancel_uncompleted // It performs cancellation of all pending requests @@ -1482,9 +1451,9 @@ ACE_POSIX_Asynch_Accept_Handler::deregister_accept_call (void) // int -ACE_POSIX_Asynch_Accept_Handler::cancel_uncompleted (int flg_notify) +ACE_POSIX_Asynch_Accept::cancel_uncompleted (int flg_notify) { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::cancel_uncompleted\n")); int retval = 0; @@ -1497,56 +1466,221 @@ ACE_POSIX_Asynch_Accept_Handler::cancel_uncompleted (int flg_notify) if (result == 0) break; - this->reactor_->suspend_handler (result->listen_handle ()); + if (this->flg_open_==0 || flg_notify == 0) //if we should not notify + delete result ; // we have to delete result + else //else notify as any cancelled AIO + { + // Store the new handle. + result->aio_fildes = ACE_INVALID_HANDLE ; + result->set_bytes_transferred (0); + result->set_error (ECANCELED); + + if (this->posix_proactor()->post_completion (result) == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::") + ACE_LIB_TEXT("cancel_uncompleted:<post_completion> failed") + )); + } + } + return retval; +} + +int +ACE_POSIX_Asynch_Accept::cancel (void) +{ + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::cancel\n")); + + //We are not really ACE_POSIX_Asynch_Operation + //so we could not call ::aiocancel () + // or just write + //return ACE_POSIX_Asynch_Operation::cancel (); + //We delegate real cancelation to cancel_uncompleted (1) + + int rc = -1 ; // ERRORS + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + int num_cancelled = cancel_uncompleted (flg_open_); + + if (num_cancelled == 0) + rc = 1 ; // AIO_ALLDONE + else if (num_cancelled > 0) + rc = 0 ; // AIO_CANCELED + + if (this->flg_open_ == 0) + return rc ; + + task_lock_count_ ++; + } + + ACE_POSIX_Asynch_Accept_Task & task = + this->posix_proactor()->get_asynch_accept_task(); - if (! flg_notify) //if we should not notify - delete result ; // we have to delete result - else //else notify as any cancelled AIO + int rc_task = task.suspend_acceptor (this); + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + task_lock_count_--; + + if (rc_task == -2 && task_lock_count_ == 0) // task is closing + task.unlock_finish (); + + } + + return rc; +} + +int +ACE_POSIX_Asynch_Accept::close () +{ + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::close\n")); + + // 1. It performs cancellation of all pending requests + // 2. Removes itself from Reactor (ACE_POSIX_Asynch_Accept_Task) + // 3. close the socket + // + // Parameter flg_notify can be + // 0 - don't send notifications about canceled accepts + // !0 - notify user about canceled accepts + // according POSIX standards we should receive notifications + // on canceled AIO requests + // + // Return codes : 0 - OK , + // -1 - Errors + + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + this->cancel_uncompleted (flg_open_); + + if (this->flg_open_ == 0) { - // Store the new handle. - result->aio_fildes = ACE_INVALID_HANDLE ; - result->set_bytes_transferred (0); - result->set_error (ECANCELED); + if (this->handle_ != ACE_INVALID_HANDLE) + { + ACE_OS::closesocket (this->handle_); + this->handle_=ACE_INVALID_HANDLE; + } + return 0; + } - if (this->posix_proactor_->post_completion (result) == -1) - ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%p\n", - "ACE_POSIX_Asynch_Accept_Handler::" - "cancel_uncompleted:<post_completion> failed")); + task_lock_count_++; + } + + ACE_POSIX_Asynch_Accept_Task & task = + this->posix_proactor()->get_asynch_accept_task(); + + int rc_task = task.remove_acceptor (this); + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + task_lock_count_--; + + if (rc_task == -2 && task_lock_count_ == 0) // task is closing + task.unlock_finish (); + + if (this->handle_ != ACE_INVALID_HANDLE) + { + ACE_OS::closesocket (this->handle_); + this->handle_=ACE_INVALID_HANDLE; } - } - return retval; + this->flg_open_=0; + } + + return 0; } +int +ACE_POSIX_Asynch_Accept::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) +{ + ACE_TRACE(ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_close\n")); + + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + + // handle_close is called only in one case : + // when Asynch_accept_task is closing (i.e. proactor destructor) + + // In all other cases we deregister ourself with + // ACE_Event_Handler::DONT_CALL mask + + this->cancel_uncompleted (0); + + this->flg_open_ = 0; + + // it means other thread is waiting for reactor token_ + if (task_lock_count_ > 0) + { + ACE_POSIX_Asynch_Accept_Task & task = + this->posix_proactor()->get_asynch_accept_task(); + + task.lock_finish (); + } + + return 0; +} int -ACE_POSIX_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */) +ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */) { + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input\n")); + // An <accept> has been sensed on the <listen_handle>. We should be // able to just go ahead and do the <accept> now on this <fd>. This // should be the same as the <listen_handle>. + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + + ACE_POSIX_Asynch_Accept_Result* result = 0; + // Deregister this info pertaining to this <accept> call. - ACE_POSIX_Asynch_Accept_Result* result = this->deregister_accept_call (); - if (result == 0) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:(%P | %t):%p\n", - "ACE_POSIX_AIOCB_Asynch_Accept_Handler::" - "handle_input:deregister_accept_call failed"), - -1); + if (this->result_queue_.dequeue_head (result) != 0) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"), + ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input:") + ACE_LIB_TEXT(" dequeueing failed") + )); + + // Disable the <handle> in the reactor if no <accept>'s are pending. + + // we allow the following sequence of locks : + // reactor::token , then our mutex lock_ + // to avoid deadlock prohibited reverse sequence + + if (this->result_queue_.size () == 0) + { + ACE_POSIX_Asynch_Accept_Task & task = + this->posix_proactor()->get_asynch_accept_task(); + + task.suspend_acceptor (this); + } // Issue <accept> now. // @@ We shouldnt block here since we have already done poll/select // thru reactor. But are we sure? - ACE_HANDLE new_handle = ACE_OS::accept (result->listen_handle (), 0, 0); + + ACE_HANDLE new_handle = ACE_OS::accept (this->handle_, 0, 0); + + + if (result == 0) // there is nobody to notify + { + ACE_OS::closesocket (new_handle); + return 0; + } + + if (new_handle == ACE_INVALID_HANDLE) { result->set_error(errno); ACE_ERROR ((LM_ERROR, - "%N:%l:(%P | %t):%p\n", - "ACE_POSIX_AIOCB_Asynch_Accept_Handler::" - "handle_input:<accept> system call failed")); + ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"), + ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ") + ACE_LIB_TEXT(" <accept> system call failed") + )); // Notify client as usual, "AIO" finished with errors } @@ -1556,217 +1690,130 @@ ACE_POSIX_Asynch_Accept_Handler::handle_input (ACE_HANDLE /* fd */) // Notify the main process about this completion // Send the Result through the notification pipe. - if (this->posix_proactor_->post_completion (result) == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "Error:(%P | %t):%p\n", - "ACE_POSIX_AIOCB_Asynch_Accept_Handler::" - "handle_input:<post_completion> failed"), - -1); - + if (this->posix_proactor()->post_completion (result) == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ") + ACE_LIB_TEXT(" <post_completion> failed") + )); + return 0; } - // ********************************************************************* -ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor *posix_proactor) - : ACE_Asynch_Operation_Impl (), - ACE_Asynch_Accept_Impl (), - ACE_POSIX_Asynch_Operation (), - accept_handler_ (0), - grp_id_(-1), //thread not spawn - posix_proactor_ (posix_proactor) //save concrete proactor impl. +ACE_POSIX_Asynch_Accept_Task::ACE_POSIX_Asynch_Accept_Task() + : flg_active_ (0), + select_reactor_(), // should be initialized before reactor_ + reactor_ (& select_reactor_, 0), // don't delete implementation + token_ (select_reactor_.lock()), // we can use reactor token + finish_count_(0) { } -int -ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, - u_long bytes_to_read, - ACE_HANDLE accept_handle, - const void *act, - int priority, - int signal_number) -{ - // Sanity check: make sure that enough space has been allocated by - // the caller. - size_t address_size = sizeof (sockaddr_in) + sizeof (sockaddr); - size_t space_in_use = message_block.wr_ptr () - message_block.base (); - size_t total_size = message_block.size (); - size_t available_space = total_size - space_in_use; - size_t space_needed = bytes_to_read + 2 * address_size; - if (available_space < space_needed) - ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("Buffer too small\n")), -1); - - // Common code for both WIN and POSIX. - ACE_POSIX_Asynch_Accept_Result *result = 0; - ACE_NEW_RETURN (result, - ACE_POSIX_Asynch_Accept_Result (*this->handler_, - this->handle_, - accept_handle, - message_block, - bytes_to_read, - act, - this->posix_proactor_->get_handle (), - priority, - signal_number), - -1); - - // Register this <accept> call with the local handler. - if (this->accept_handler_->register_accept_call (result) == -1) - return -1; - - return 0; +ACE_POSIX_Asynch_Accept_Task::~ACE_POSIX_Asynch_Accept_Task() +{ + stop(); } -int -ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler, - ACE_HANDLE handle, - const void *completion_key, - ACE_Proactor *proactor) +int +ACE_POSIX_Asynch_Accept_Task::start () { - // check for non zero accept_handler_ - // we could not create a new handler without closing the previous + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - if (this->accept_handler_ != 0) + if (this->flg_active_) ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:ACE_POSIX_Asynch_Accept::open:" - "accept_handler_ not null\n"), - -1); - - - int result = ACE_POSIX_Asynch_Operation::open (handler, - handle, - completion_key, - proactor); - if (result == -1) - return result; + ACE_LIB_TEXT ("%N:%l:%p\n"), + ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::start already started")), + -1); - // Init the Asynch_Accept_Handler now. It needs to keep Proactor - // also with it. - ACE_NEW_RETURN (this->accept_handler_, - ACE_POSIX_Asynch_Accept_Handler (&this->reactor_, - this->posix_proactor_), - -1); - - // Register the handle with the reactor. - int return_val = this->reactor_.register_handler (this->handle_, - this->accept_handler_, - ACE_Event_Handler::ACCEPT_MASK); - if (return_val == -1) + if (this->reactor_.initialized () == 0) ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:Reactor::register_handler failed\n"), - -1); + ACE_LIB_TEXT ("%N:%l:%p\n"), + ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::start reactor is not initialized")), + -1); - // Suspend the <handle> now. Enable only when the <accept> is issued - // by the application. - return_val = this->reactor_.suspend_handler (this->handle_); - if (return_val == -1) - ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:Reactor::suspend_handler failed\n"), - -1); - // Spawn the thread. It is the only thread we are going to have. It - // will do the <handle_events> on the reactor. - // save group id of the created thread - - grp_id_ = ACE_Thread_Manager::instance ()->spawn (ACE_POSIX_Asynch_Accept::thread_function, - ACE_reinterpret_cast (void *, &this->reactor_)); - if (grp_id_ == -1) + if (this->activate (THR_NEW_LWP | THR_JOINABLE, 1) != 0 ) ACE_ERROR_RETURN ((LM_ERROR, - "%N:%l:Thread_Manager::spawn failed\n"), - -1); + ACE_LIB_TEXT ("%N:%l:%p\n"), + ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::start failed")), + -1); + this->flg_active_ = 1; return 0; } -ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void) +int +ACE_POSIX_Asynch_Accept_Task::stop () { - this->close (0); // not send notifications to user -} + { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); -int -ACE_POSIX_Asynch_Accept::close (int flg_notify) -{ - // 1. It performs cancellation of all pending requests - // 2. Stops and waits for the thread we had created - // 3. Removes accept_handler_ from Reactor - // 4. Deletes accept_handler_ - // 5. close the socket - // - // Parameter flg_notify can be - // 0 - don't send notifications about canceled accepts - // !0 - notify user about canceled accepts - // according POSIX standards we should receive notifications - // on canceled AIO requests - // - // Return codes : 0 - OK , - // -1 - Errors + if (this->flg_active_ == 0 ) // already stopped + return 0; - if (this->accept_handler_) - this->accept_handler_->cancel_uncompleted (flg_notify); + reactor_.end_reactor_event_loop (); + } - //stop and wait for the thread + int rc = this->wait (); - if (grp_id_ != -1) - { - reactor_.end_reactor_event_loop (); + if (rc != 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:%p\n"), + ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::stop failed")), + -1); - if (ACE_Thread_Manager::instance ()->wait_grp (grp_id_) ==-1) - ACE_ERROR ((LM_ERROR, - "%N:%l:Thread_Manager::wait_grp failed\n")); - else - grp_id_ = -1; - } - //AL remove and destroy accept_handler_ + { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + this->flg_active_ = 0; - if (this->accept_handler_ != 0) - { - this->reactor_.remove_handler - (this->accept_handler_, - ACE_Event_Handler::ALL_EVENTS_MASK - | ACE_Event_Handler::DONT_CALL); - - delete this->accept_handler_ ; - this->accept_handler_ = 0 ; - } - - // It looks like a good place to close listen handle here. - // But I am not sure with compatibility with the old programs - // You can comment the closure of the socket + if (this->reactor_.initialized ()) + this->reactor_.close(); - if (this->handle_ != ACE_INVALID_HANDLE) - { - ACE_OS::closesocket (this->handle_); - this->handle_=ACE_INVALID_HANDLE; - } + while (finish_count_ > 0) + { + ace_mon.release (); + finish_event_.wait(); + ace_mon.acquire (); + finish_event_.reset (); + } + } + + return rc; +} + +int +ACE_POSIX_Asynch_Accept_Task::lock_finish () +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + finish_count_ ++; return 0; } -void * -ACE_POSIX_Asynch_Accept::thread_function (void *arg_reactor) +int +ACE_POSIX_Asynch_Accept_Task::unlock_finish () { - // Retrieve the reactor pointer from the argument. - ACE_Reactor *reactor = ACE_reinterpret_cast (ACE_Reactor *, - arg_reactor); + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - // It should be valid Reactor, since we have a reactor_ ,e,ner we - // are passing only that one here. - if (reactor == 0) - ACE_ERROR ((LM_ERROR, - "%n:%l:Invalid Reactor pointer passed to the thread_function\n", - 0)); + finish_count_ --; - // For this reactor, this thread is the owner. - reactor->owner (ACE_OS::thr_self ()); + finish_event_.signal (); + return 0; +} + +int +ACE_POSIX_Asynch_Accept_Task::svc () +{ sigset_t RT_signals; - if (sigemptyset ( & RT_signals ) == -1) + if (sigemptyset (& RT_signals) == -1) ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%p\n", - "sigemptyset failed")); + ACE_LIB_TEXT ("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT ("sigemptyset failed"))); int member = 0; @@ -1779,54 +1826,148 @@ ACE_POSIX_Asynch_Accept::thread_function (void *arg_reactor) } } - if (ACE_OS::pthread_sigmask ( SIG_BLOCK, & RT_signals, 0) != 0) + if (ACE_OS::pthread_sigmask (SIG_BLOCK, & RT_signals, 0) != 0) ACE_ERROR ((LM_ERROR, - "Error:(%P | %t):%p\n", - "pthread_sigmask failed")); + ACE_LIB_TEXT ("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT ("pthread_sigmask failed"))); - while (reactor->reactor_event_loop_done () == 0) - if (reactor->handle_events () == -1) - return ACE_reinterpret_cast (void *, -1); + + + reactor_.owner (ACE_Thread::self()); + + reactor_.run_reactor_event_loop (); return 0; } -// Methods belong to ACE_POSIX_Asynch_Operation base class. These -// methods are defined here to avoid dominance warnings. They route -// the call to the ACE_POSIX_Asynch_Operation base class. + int -ACE_POSIX_Asynch_Accept::cancel (void) +ACE_POSIX_Asynch_Accept_Task::register_acceptor (ACE_POSIX_Asynch_Accept * posix_accept, + ACE_Reactor_Mask mask) { - //We are not ACE_POSIX_Asynch_Operation - //so we could not call ::aiocancel () - //We delegate real cancelation to the accept_handler_ - // accept_handler_->cancel_uncompleted (1) + // Return codes : + // 0 success + // -1 reactor errors + // -2 task not active - //return ACE_POSIX_Asynch_Operation::cancel (); + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - if (this->accept_handler_ == 0) - return 1 ; // AIO_ALLDONE + if (this->flg_active_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::register_acceptor \n") + ACE_LIB_TEXT ("task not active \n")), + -2); + + // Register the handler with the reactor. + int retval = this->reactor_.register_handler (posix_accept->get_handle(), + posix_accept, + mask); + + if (retval == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::register_acceptor \n") + ACE_LIB_TEXT ("register_handler failed \n")), + -1); - //cancel with notifications as POSIX should do + // Suspend the <handle> now. Enable only when the <accept> is issued + // by the application. + retval = this->reactor_.suspend_handler (posix_accept->get_handle()); + if (retval == -1) + { + this->reactor_.remove_handler (posix_accept, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); + + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::register_acceptor \n") + ACE_LIB_TEXT ("suspend_handler failed \n")), + -1); + } - int retval = this->accept_handler_->cancel_uncompleted (1); + return 0; +} - //retval contains now the number of canceled requests - - if (retval == 0) - return 1 ; // AIO_ALLDONE +int +ACE_POSIX_Asynch_Accept_Task::remove_acceptor (ACE_POSIX_Asynch_Accept * posix_accept) +{ + // Return codes : + // 0 success + // -1 reactor errors + // -2 task not active - if (retval > 0) - return 0; // AIO_CANCELED + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - return -1; + if (this->flg_active_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::remove_acceptor \n") + ACE_LIB_TEXT ("task not active \n")), + -2); + + int retval = this->reactor_.remove_handler (posix_accept, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); + if (retval == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::remove_acceptor \n") + ACE_LIB_TEXT ("remove_handler failed \n")), + -1); + + return 0; } -ACE_Proactor * -ACE_POSIX_Asynch_Accept::proactor (void) const +int +ACE_POSIX_Asynch_Accept_Task::suspend_acceptor (ACE_POSIX_Asynch_Accept * posix_accept) { - return ACE_POSIX_Asynch_Operation::proactor (); + // Return codes : + // 0 success + // -1 reactor errors + // -2 task not active + + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + if (this->flg_active_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::suspend_acceptor \n") + ACE_LIB_TEXT ("task not active \n")), + -2); + + int retval = this->reactor_.suspend_handler (posix_accept); + + if (retval == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::suspend_acceptor \n") + ACE_LIB_TEXT ("suspend_handler failed \n")), + -1); + + return 0; +} + +int +ACE_POSIX_Asynch_Accept_Task::resume_acceptor (ACE_POSIX_Asynch_Accept * posix_accept) +{ + // Return codes : + // 0 success + // -1 reactor errors + // -2 task not active + + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + if (this->flg_active_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::resume_acceptor \n") + ACE_LIB_TEXT ("task not active \n")), + -2); + + int retval = this->reactor_.resume_handler (posix_accept); + + if (retval == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::resume_acceptor \n") + ACE_LIB_TEXT ("resume_handler failed \n")), + -1); + + return 0; } // ********************************************************************* @@ -2012,19 +2153,24 @@ class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler // = DESCRIPTION // // This is a helper class for implementing - // <ACE_POSIX_Asynch_Transmit_File> in Unix systems. This class - // abstracts out all the commonalities in the two different - // POSIX Transmit Handler implementations. + // <ACE_POSIX_Asynch_Transmit_File> in Unix systems. + // public: + ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor, + ACE_POSIX_Asynch_Transmit_File_Result *result); + // Constructor. Result pointer will have all the information to do + // the file transmission (socket, file, application handler, bytes + // to write). + virtual ~ACE_POSIX_Asynch_Transmit_Handler (void); // Destructor. + int transmit (void); + // Do the transmission. All the info to do the transmission is in + // the <result> member. + protected: - ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_Asynch_Transmit_File_Result *result); - // Constructor. Result pointer will have all the information to do - // the file transmission (socket, file, application handler, bytes - // to write). ACE_POSIX_Asynch_Transmit_File_Result *result_; // The asynch result pointer made from the initial transmit file @@ -2053,37 +2199,7 @@ protected: size_t bytes_transferred_; // Number of bytes transferred on the stream. -}; - -// ************************************************************ -class ACE_Export ACE_POSIX_AIOCB_Asynch_Transmit_Handler : public ACE_POSIX_Asynch_Transmit_Handler -{ - // = TITLE - // - // Auxillary handler for doing <Asynch_Transmit_File> in - // Unix. <ACE_POSIX_Asynch_Transmit_File> internally uses this. - // - // = DESCRIPTION - // - // This is a helper class for implementing - // <ACE_POSIX_Asynch_Transmit_File> in Unix systems. - -public: - ACE_POSIX_AIOCB_Asynch_Transmit_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor, - ACE_POSIX_Asynch_Transmit_File_Result *result); - // Constructor. Result pointer will have all the information to do - // the file transmission (socket, file, application handler, bytes - // to write). - - virtual ~ACE_POSIX_AIOCB_Asynch_Transmit_Handler (void); - // Destructor. - - int transmit (void); - // Do the transmission. All the info to do the transmission is in - // the <result> member. - -protected: virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result); // This is called when asynchronous writes from the socket complete. @@ -2093,17 +2209,19 @@ protected: int initiate_read_file (void); // Issue asynch read from the file. - ACE_POSIX_AIOCB_Asynch_Read_File rf_; + ACE_POSIX_Asynch_Read_File rf_; // To read from the file to be transmitted. - ACE_POSIX_AIOCB_Asynch_Write_Stream ws_; + ACE_POSIX_Asynch_Write_Stream ws_; // Write stream to write the header, trailer and the data. }; -// ********************************************************************* +// ************************************************************ // Constructor. -ACE_POSIX_Asynch_Transmit_Handler::ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_Asynch_Transmit_File_Result *result) +ACE_POSIX_Asynch_Transmit_Handler::ACE_POSIX_Asynch_Transmit_Handler + (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor, + ACE_POSIX_Asynch_Transmit_File_Result *result) : result_ (result), mb_ (0), header_act_ (this->HEADER_ACT), @@ -2111,7 +2229,9 @@ ACE_POSIX_Asynch_Transmit_Handler::ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_ trailer_act_ (this->TRAILER_ACT), file_offset_ (result->offset ()), file_size_ (0), - bytes_transferred_ (0) + bytes_transferred_ (0), + rf_ (posix_aiocb_proactor), + ws_ (posix_aiocb_proactor) { // Allocate memory for the message block. ACE_NEW (this->mb_, @@ -2128,25 +2248,12 @@ ACE_POSIX_Asynch_Transmit_Handler::~ACE_POSIX_Asynch_Transmit_Handler (void) mb_->release (); } -// ********************************************************************* - -ACE_POSIX_AIOCB_Asynch_Transmit_Handler::ACE_POSIX_AIOCB_Asynch_Transmit_Handler (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor, - ACE_POSIX_Asynch_Transmit_File_Result *result) - : ACE_POSIX_Asynch_Transmit_Handler (result), - rf_ (posix_aiocb_proactor), - ws_ (posix_aiocb_proactor) -{ -} - -ACE_POSIX_AIOCB_Asynch_Transmit_Handler::~ACE_POSIX_AIOCB_Asynch_Transmit_Handler (void) -{ -} // Do the transmission. // Initiate transmitting the header. When that completes // handle_write_stream will be called, there start transmitting the file. int -ACE_POSIX_AIOCB_Asynch_Transmit_Handler::transmit (void) +ACE_POSIX_Asynch_Transmit_Handler::transmit (void) { // No proactor is given for the <open>'s. Because we are using the // concrete implementations of the Asynch_Operations, and we have @@ -2183,7 +2290,7 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::transmit (void) } void -ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) +ACE_POSIX_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) { // Update bytes transferred so far. this->bytes_transferred_ += result.bytes_transferred (); @@ -2281,7 +2388,7 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_W } void -ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result) +ACE_POSIX_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result) { // Failure. if (result.success () == 0) @@ -2323,7 +2430,7 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read } int -ACE_POSIX_AIOCB_Asynch_Transmit_Handler::initiate_read_file (void) +ACE_POSIX_Asynch_Transmit_Handler::initiate_read_file (void) { // Is there something to read. if (this->file_offset_ >= this->file_size_) @@ -2364,15 +2471,15 @@ ACE_POSIX_AIOCB_Asynch_Transmit_Handler::initiate_read_file (void) // ********************************************************************* -ACE_POSIX_AIOCB_Asynch_Transmit_File::ACE_POSIX_AIOCB_Asynch_Transmit_File (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) +ACE_POSIX_Asynch_Transmit_File::ACE_POSIX_Asynch_Transmit_File (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : ACE_Asynch_Operation_Impl (), ACE_Asynch_Transmit_File_Impl (), - ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor) + ACE_POSIX_Asynch_Operation (posix_aiocb_proactor) { } int -ACE_POSIX_AIOCB_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, +ACE_POSIX_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer, u_long bytes_to_write, u_long offset, @@ -2389,7 +2496,7 @@ ACE_POSIX_AIOCB_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, if (file_size == -1) ACE_ERROR_RETURN ((LM_ERROR, "Error:%N:%l:%p\n", - "POSIX_AIOCB_Asynch_Transmit_File:filesize failed"), + "POSIX_Asynch_Transmit_File:filesize failed"), -1); if (bytes_to_write == 0) @@ -2427,10 +2534,10 @@ ACE_POSIX_AIOCB_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, -1); // Make the auxillary handler and initiate transmit. - ACE_POSIX_AIOCB_Asynch_Transmit_Handler *transmit_handler = 0; + ACE_POSIX_Asynch_Transmit_Handler *transmit_handler = 0; ACE_NEW_RETURN (transmit_handler, - ::ACE_POSIX_AIOCB_Asynch_Transmit_Handler (this->posix_proactor (), + ::ACE_POSIX_Asynch_Transmit_Handler (this->posix_proactor (), result), -1); @@ -2443,17 +2550,16 @@ ACE_POSIX_AIOCB_Asynch_Transmit_File::transmit_file (ACE_HANDLE file, return 0; } -ACE_POSIX_AIOCB_Asynch_Transmit_File::~ACE_POSIX_AIOCB_Asynch_Transmit_File (void) +ACE_POSIX_Asynch_Transmit_File::~ACE_POSIX_Asynch_Transmit_File (void) { } - // Methods belong to ACE_POSIX_Asynch_Operation base class. These // methods are defined here to avoid dominance warnings. They route the // call to the ACE_POSIX_Asynch_Operation base class. int -ACE_POSIX_AIOCB_Asynch_Transmit_File::open (ACE_Handler &handler, +ACE_POSIX_Asynch_Transmit_File::open (ACE_Handler &handler, ACE_HANDLE handle, const void *completion_key, ACE_Proactor *proactor) @@ -2465,13 +2571,13 @@ ACE_POSIX_AIOCB_Asynch_Transmit_File::open (ACE_Handler &handler, } int -ACE_POSIX_AIOCB_Asynch_Transmit_File::cancel (void) +ACE_POSIX_Asynch_Transmit_File::cancel (void) { return ACE_POSIX_Asynch_Operation::cancel (); } ACE_Proactor * -ACE_POSIX_AIOCB_Asynch_Transmit_File::proactor (void) const +ACE_POSIX_Asynch_Transmit_File::proactor (void) const { return ACE_POSIX_Asynch_Operation::proactor (); } @@ -2790,12 +2896,12 @@ ACE_POSIX_Asynch_Write_Dgram_Result::~ACE_POSIX_Asynch_Write_Dgram_Result (void) } /***************************************************************************/ -ACE_POSIX_AIOCB_Asynch_Read_Dgram::~ACE_POSIX_AIOCB_Asynch_Read_Dgram (void) +ACE_POSIX_Asynch_Read_Dgram::~ACE_POSIX_Asynch_Read_Dgram (void) { } ssize_t -ACE_POSIX_AIOCB_Asynch_Read_Dgram::recv (ACE_Message_Block *message_block, +ACE_POSIX_Asynch_Read_Dgram::recv (ACE_Message_Block *message_block, size_t &number_of_bytes_recvd, int flags, int protocol_family, @@ -2814,43 +2920,43 @@ ACE_POSIX_AIOCB_Asynch_Read_Dgram::recv (ACE_Message_Block *message_block, } int -ACE_POSIX_AIOCB_Asynch_Read_Dgram::open (ACE_Handler &handler, +ACE_POSIX_Asynch_Read_Dgram::open (ACE_Handler &handler, ACE_HANDLE handle, const void *completion_key, ACE_Proactor *proactor) { - return ACE_POSIX_AIOCB_Asynch_Operation::open (handler, + return ACE_POSIX_Asynch_Operation::open (handler, handle, completion_key, proactor); } int -ACE_POSIX_AIOCB_Asynch_Read_Dgram::cancel (void) +ACE_POSIX_Asynch_Read_Dgram::cancel (void) { - return ACE_POSIX_AIOCB_Asynch_Operation::cancel (); + return ACE_POSIX_Asynch_Operation::cancel (); } ACE_Proactor * -ACE_POSIX_AIOCB_Asynch_Read_Dgram::proactor (void) const +ACE_POSIX_Asynch_Read_Dgram::proactor (void) const { - return ACE_POSIX_AIOCB_Asynch_Operation::proactor (); + return ACE_POSIX_Asynch_Operation::proactor (); } -ACE_POSIX_AIOCB_Asynch_Read_Dgram::ACE_POSIX_AIOCB_Asynch_Read_Dgram (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) +ACE_POSIX_Asynch_Read_Dgram::ACE_POSIX_Asynch_Read_Dgram (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : ACE_Asynch_Operation_Impl (), ACE_Asynch_Read_Dgram_Impl (), - ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor) + ACE_POSIX_Asynch_Operation (posix_aiocb_proactor) { } //*************************************************************************** -ACE_POSIX_AIOCB_Asynch_Write_Dgram::~ACE_POSIX_AIOCB_Asynch_Write_Dgram (void) +ACE_POSIX_Asynch_Write_Dgram::~ACE_POSIX_Asynch_Write_Dgram (void) { } ssize_t -ACE_POSIX_AIOCB_Asynch_Write_Dgram::send (ACE_Message_Block *message_block, +ACE_POSIX_Asynch_Write_Dgram::send (ACE_Message_Block *message_block, size_t &number_of_bytes_sent, int flags, const ACE_Addr &addr, @@ -2869,7 +2975,7 @@ ACE_POSIX_AIOCB_Asynch_Write_Dgram::send (ACE_Message_Block *message_block, } int -ACE_POSIX_AIOCB_Asynch_Write_Dgram::open (ACE_Handler &handler, +ACE_POSIX_Asynch_Write_Dgram::open (ACE_Handler &handler, ACE_HANDLE handle, const void *completion_key, ACE_Proactor *proactor) @@ -2881,32 +2987,46 @@ ACE_POSIX_AIOCB_Asynch_Write_Dgram::open (ACE_Handler &handler, } int -ACE_POSIX_AIOCB_Asynch_Write_Dgram::cancel (void) +ACE_POSIX_Asynch_Write_Dgram::cancel (void) { return ACE_POSIX_Asynch_Operation::cancel (); } ACE_Proactor * -ACE_POSIX_AIOCB_Asynch_Write_Dgram::proactor (void) const +ACE_POSIX_Asynch_Write_Dgram::proactor (void) const { return ACE_POSIX_Asynch_Operation::proactor (); } -ACE_POSIX_AIOCB_Asynch_Write_Dgram::ACE_POSIX_AIOCB_Asynch_Write_Dgram (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) +ACE_POSIX_Asynch_Write_Dgram::ACE_POSIX_Asynch_Write_Dgram (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : ACE_Asynch_Operation_Impl (), ACE_Asynch_Write_Dgram_Impl (), - ACE_POSIX_AIOCB_Asynch_Operation (posix_aiocb_proactor) + ACE_POSIX_Asynch_Operation (posix_aiocb_proactor) { } #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *>; template class ACE_Node<ACE_POSIX_Asynch_Accept_Result *>; template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Accept_Result *>; + +template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *>; +template class ACE_Node<ACE_POSIX_Asynch_Result *>; +template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *>; + #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + #pragma instantiate ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *> #pragma instantiate ACE_Node<ACE_POSIX_Asynch_Accept_Result *> #pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Accept_Result *> + +#pragma instantiate ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *> +#pragma instantiate ACE_Node<ACE_POSIX_Asynch_Result *> +#pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *> + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + + #endif /* ACE_HAS_AIO_CALLS */ |