diff options
author | Steve Huston <shuston@riverace.com> | 2005-02-28 22:11:39 +0000 |
---|---|---|
committer | Steve Huston <shuston@riverace.com> | 2005-02-28 22:11:39 +0000 |
commit | 85cfcf17aeb34239fc26c02bbccfba6d48e532e9 (patch) | |
tree | 697ad7816271e34002c17f44e48df9597635202d | |
parent | f1feefef931c8ede1f421846013c412a1261402d (diff) | |
download | ATCD-85cfcf17aeb34239fc26c02bbccfba6d48e532e9.tar.gz |
ChangeLogTag:Mon Feb 28 17:10:41 2005 Steve Huston <shuston@riverace.com>
-rw-r--r-- | ChangeLog | 19 | ||||
-rw-r--r-- | ace/Asynch_Pseudo_Task.cpp | 195 | ||||
-rw-r--r-- | ace/Asynch_Pseudo_Task.h | 24 | ||||
-rw-r--r-- | ace/POSIX_Asynch_IO.cpp | 605 | ||||
-rw-r--r-- | ace/POSIX_Asynch_IO.h | 51 | ||||
-rw-r--r-- | ace/WIN32_Asynch_IO.cpp | 304 | ||||
-rw-r--r-- | ace/WIN32_Asynch_IO.h | 32 |
7 files changed, 402 insertions, 828 deletions
diff --git a/ChangeLog b/ChangeLog index f5884744127..f787268aa15 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,20 @@ +Mon Feb 28 17:10:41 2005 Steve Huston <shuston@riverace.com> + + * ace/Asynch_Pseudo_Task.{h cpp}: Removed all the flg_active_ and + finish locking stuff. Use the thr_count() value to tell if the + thread is running, and don't try to interlock cleanup activities + with other classes. It's messy and doesn't work right. There are + too many race conditions between closing handles and closing down + this object. + Corrected ACE_LIB_TEXT use instead of + ACE_TEXT, and added missing commas between some strings. + + * ace/POSIX_Asynch_IO.{h cpp}: + * ace/WIN32_Asynch_IO.{h cpp}: Don't try to interlock against the + Asynch_Pseudo_Task. If it's going, it's going. Only hold the lock + around access to the connection/handle map since that's accessed + from the asynch pseudo task thread as well as the caller's. + Mon Feb 28 09:59:12 UTC 2005 Johnny Willemsen <jwillemsen@remedy.nl> * include/makeinclude/platform_hpux_aCC.GNU: @@ -12,7 +29,7 @@ Mon Feb 28 09:59:12 UTC 2005 Johnny Willemsen <jwillemsen@remedy.nl> Mon Feb 28 11:10:58 2005 Boris Kolpackov <boris@kolpackov.net> * protocols/ace/RMCast/Acknowledge.h: Made Acknowledge::Queue - a friend of Acknowledge. Hopefully this will help Sun C++ 5.4. + a friend of Acknowledge. Hopefully this will help Sun C++ 5.4. Sun Feb 27 08:51:23 2005 Douglas C. Schmidt <schmidt@cs.wustl.edu> diff --git a/ace/Asynch_Pseudo_Task.cpp b/ace/Asynch_Pseudo_Task.cpp index 9f46a098547..ce936471e52 100644 --- a/ace/Asynch_Pseudo_Task.cpp +++ b/ace/Asynch_Pseudo_Task.cpp @@ -8,98 +8,39 @@ ACE_RCSID(ace, Asynch_Pseudo_Task, "$Id$") ACE_Asynch_Pseudo_Task::ACE_Asynch_Pseudo_Task() - : flg_active_ (0), - select_reactor_ (), // should be initialized before reactor_ - reactor_ (&select_reactor_, 0), // don't delete implementation - token_ (select_reactor_.lock ()), // we can use reactor token - finish_count_ (0) + : select_reactor_ (), // should be initialized before reactor_ + reactor_ (&select_reactor_, 0) // don't delete implementation { } ACE_Asynch_Pseudo_Task::~ACE_Asynch_Pseudo_Task() { - stop(); -} - -int -ACE_Asynch_Pseudo_Task::is_active (void) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - return flg_active_; + this->stop(); } int ACE_Asynch_Pseudo_Task::start (void) { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - - if (this->flg_active_) - return 0; - if (this->reactor_.initialized () == 0) ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("%N:%l:%p\n"), - ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::start reactor is not initialized")), + ACE_LIB_TEXT ("start reactor is not initialized")), -1); - - if (this->activate () != 0) - return -1; - - this->flg_active_ = 1; - return 0; + return this->activate () == -1 ? -1 : 0; // If started, return 0 } int ACE_Asynch_Pseudo_Task::stop (void) { - { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - - if (this->flg_active_ == 0) // already stopped - return 0; - - reactor_.end_reactor_event_loop (); - } + if (this->thr_count () == 0) // already stopped + return 0; - if (-1 == this->wait ()) + if (this->reactor_.end_reactor_event_loop () == -1) return -1; - { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - this->flg_active_ = 0; - - this->reactor_.close (); - - while (this->finish_count_ > 0) - { - ACE_MT (ace_mon.release ()); - finish_event_.wait (); - - ACE_MT (ace_mon.acquire ()); - finish_event_.reset (); - } - } - - return 0; -} - -int -ACE_Asynch_Pseudo_Task::lock_finish (void) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - finish_count_ ++; - return 0; -} - -int -ACE_Asynch_Pseudo_Task::unlock_finish (void) -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - - --finish_count_; - finish_event_.signal (); - + this->wait (); + this->reactor_.close (); return 0; } @@ -110,30 +51,17 @@ ACE_Asynch_Pseudo_Task::svc (void) sigset_t RT_signals; - if (sigemptyset (&RT_signals) == -1) - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("Error:(%P | %t):%p\n"), - ACE_LIB_TEXT ("sigemptyset failed"))); - - int member = 0; - + sigemptyset (&RT_signals); for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++) - { - member = sigismember (& RT_signals , si); - if (member == 1) - { - sigaddset (&RT_signals, si); - } - } + sigaddset (&RT_signals, si); if (ACE_OS::pthread_sigmask (SIG_BLOCK, &RT_signals, 0) != 0) ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("Error:(%P | %t):%p\n"), - ACE_LIB_TEXT ("pthread_sigmask failed"))); + ACE_LIB_TEXT ("pthread_sigmask"))); #endif reactor_.owner (ACE_Thread::self()); - reactor_.run_reactor_event_loop (); return 0; @@ -147,36 +75,21 @@ ACE_Asynch_Pseudo_Task::register_io_handler (ACE_HANDLE handle, ACE_Reactor_Mask mask, int flg_suspend) { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - - if (this->flg_active_ == 0) - { - ACE_OS::last_error (ESHUTDOWN); - return -1; - } - // Register the handler with the reactor. - int retval = this->reactor_.register_handler (handle, handler, mask); - - if (retval == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:%p\n"), - ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::register_io_handler")), - -1); + if (-1 == this->reactor_.register_handler (handle, handler, mask)) + return -1; - if (flg_suspend == 0 ) + if (flg_suspend == 0) return 0; - // Suspend the <handle> now. Enable only when the <accept> is issued + // Suspend the handle now. Enable only when the accept is issued // by the application. - retval = this->reactor_.suspend_handler (handle); - - if (retval == -1) + if (this->reactor_.suspend_handler (handle) == -1) { ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("%N:%l:%p\n"), - ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::register_io_handler (suspended)"))); + ACE_LIB_TEXT ("register_io_handler (suspended)"))); this->reactor_.remove_handler (handle, ACE_Event_Handler::ALL_EVENTS_MASK | ACE_Event_Handler::DONT_CALL); @@ -189,83 +102,27 @@ ACE_Asynch_Pseudo_Task::register_io_handler (ACE_HANDLE handle, int ACE_Asynch_Pseudo_Task::remove_io_handler (ACE_HANDLE handle) { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - - if (this->flg_active_ == 0) - { - ACE_OS::last_error (ESHUTDOWN); - return -1; - } - - int retval = - this->reactor_.remove_handler (handle , - ACE_Event_Handler::ALL_EVENTS_MASK - | ACE_Event_Handler::DONT_CALL); - if (retval == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:%p\n") - ACE_TEXT ("ACE_Asynch_Pseudo_Task::remove_io_handler")), - -1); - - return 0; + return this->reactor_.remove_handler (handle , + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); } int ACE_Asynch_Pseudo_Task::remove_io_handler (ACE_Handle_Set &set) { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - - if (this->flg_active_ == 0) - { - ACE_OS::last_error (ESHUTDOWN); - return -1; - } - - int retval = - this->reactor_.remove_handler (set, - ACE_Event_Handler::ALL_EVENTS_MASK - | ACE_Event_Handler::DONT_CALL); - if (retval == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:%p\n") - ACE_TEXT ("ACE_Asynch_Pseudo_Task::remove_io_handler")), - -1); - - return 0; + return this->reactor_.remove_handler (set, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); } int ACE_Asynch_Pseudo_Task::suspend_io_handler (ACE_HANDLE handle) { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - - if (this->flg_active_ == 0) - { - ACE_OS::last_error (ESHUTDOWN); - return -1; - } - - int retval = this->reactor_.suspend_handler (handle); - - if (retval == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:%p\n") - ACE_TEXT ("ACE_Asynch_Pseudo_Task::suspend_io_handler")), - -1); - - return 0; + return this->reactor_.suspend_handler (handle); } int ACE_Asynch_Pseudo_Task::resume_io_handler (ACE_HANDLE handle) { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - - if (this->flg_active_ == 0) - { - ACE_OS::last_error (ESHUTDOWN); - return -1; - } - return this->reactor_.resume_handler (handle); } diff --git a/ace/Asynch_Pseudo_Task.h b/ace/Asynch_Pseudo_Task.h index 060bfa3223b..55313b00505 100644 --- a/ace/Asynch_Pseudo_Task.h +++ b/ace/Asynch_Pseudo_Task.h @@ -24,31 +24,21 @@ #include "ace/Reactor.h" #include "ace/Select_Reactor.h" #include "ace/Task.h" -#include "ace/Manual_Event.h" /** * @class ACE_Asynch_Pseudo_Task * */ -class ACE_Export ACE_Asynch_Pseudo_Task : public ACE_Task<ACE_SYNCH> +class ACE_Export ACE_Asynch_Pseudo_Task : public ACE_Task<ACE_NULL_SYNCH> { - friend class ACE_POSIX_Asynch_Accept; - friend class ACE_POSIX_Asynch_Connect; - friend class ACE_WIN32_Asynch_Connect; - public: - ACE_Asynch_Pseudo_Task(); virtual ~ACE_Asynch_Pseudo_Task(); int start (void); int stop (void); - virtual int svc (void); - - int is_active (void); - int register_io_handler (ACE_HANDLE handle, ACE_Event_Handler *handler, ACE_Reactor_Mask mask, @@ -60,21 +50,11 @@ public: int suspend_io_handler (ACE_HANDLE handle); protected: - - int lock_finish (void); - int unlock_finish (void); - - int flg_active_; + virtual int svc (void); ACE_Select_Reactor select_reactor_; // should be initialized before reactor_ - ACE_Reactor reactor_; - - ACE_Lock &token_; - - int finish_count_; - ACE_Manual_Event finish_event_; }; #include /**/ "ace/post.h" diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp index 7acb9766f95..5b7416458b6 100644 --- a/ace/POSIX_Asynch_IO.cpp +++ b/ace/POSIX_Asynch_IO.cpp @@ -804,15 +804,14 @@ ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor * posix_pro : ACE_Asynch_Operation_Impl (), ACE_Asynch_Accept_Impl (), ACE_POSIX_Asynch_Operation (posix_proactor), - flg_open_ (0), - task_lock_count_ (0) + flg_open_ (false) { } ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void) { this->close (); - this->reactor(0); // to avoid purge_pending_notifications + this->reactor (0); // to avoid purge_pending_notifications } ACE_HANDLE @@ -836,47 +835,31 @@ ACE_POSIX_Asynch_Accept::open (ACE_Handler::Proxy_Ptr &handler_proxy, { ACE_TRACE ("ACE_POSIX_Asynch_Accept::open"); - int result=0; - - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - // if we are already opened, // we could not create a new handler without closing the previous - - if (this->flg_open_ != 0) + if (this->flg_open_) ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::open:") ACE_LIB_TEXT("acceptor already open \n")), -1); - result = ACE_POSIX_Asynch_Operation::open (handler_proxy, - handle, - completion_key, - proactor); - if (result == -1) - return result; - - flg_open_ = 1; + if (-1 == ACE_POSIX_Asynch_Operation::open (handler_proxy, + handle, + completion_key, + proactor)) + return -1; - this->task_lock_count_++; + flg_open_ = true; - // At this moment asynch_accept_task does not know about us, - // so we can lock task's token with our lock_ locked. - // In all other cases we should release our lock_ before - // calling task's methods to avoid deadlock ACE_Asynch_Pseudo_Task & task = - this->posix_proactor()->get_asynch_pseudo_task(); - - result = task.register_io_handler (this->get_handle(), - this, - ACE_Event_Handler::ACCEPT_MASK, - 1); // suspend after register - - this->task_lock_count_-- ; + this->posix_proactor ()->get_asynch_pseudo_task (); - if (result < 0) + if (-1 == task.register_io_handler (this->get_handle(), + this, + ACE_Event_Handler::ACCEPT_MASK, + 1)) // suspend after register { - this->flg_open_= 0; + this->flg_open_= false; this->handle_ = ACE_INVALID_HANDLE; return -1 ; } @@ -895,49 +878,48 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, { ACE_TRACE ("ACE_POSIX_Asynch_Accept::accept"); - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - if (this->flg_open_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept") - ACE_LIB_TEXT("acceptor was not opened before\n")), - -1); + if (!this->flg_open_) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept") + ACE_LIB_TEXT("acceptor was not opened before\n")), + -1); - // Sanity check: make sure that enough space has been allocated by - // the caller. - size_t address_size = sizeof (sockaddr_in); + // Sanity check: make sure that enough space has been allocated by + // the caller. + size_t address_size = sizeof (sockaddr_in); #if defined (ACE_HAS_IPV6) - if (addr_family == AF_INET6) - address_size = sizeof (sockaddr_in6); + if (addr_family == AF_INET6) + address_size = sizeof (sockaddr_in6); #else - ACE_UNUSED_ARG (addr_family); + ACE_UNUSED_ARG (addr_family); #endif - size_t available_space = message_block.space (); - size_t space_needed = bytes_to_read + 2 * address_size; - - if (available_space < space_needed) - { - ACE_OS::last_error (ENOBUFS); - return -1; - } - - // Common code for both WIN and POSIX. - // Create future Asynch_Accept_Result - ACE_POSIX_Asynch_Accept_Result *result = 0; - ACE_NEW_RETURN (result, - ACE_POSIX_Asynch_Accept_Result (this->handler_proxy_, - this->handle_, - accept_handle, - message_block, - bytes_to_read, - act, - this->posix_proactor()->get_handle (), - priority, - signal_number), + size_t available_space = message_block.space (); + size_t space_needed = bytes_to_read + 2 * address_size; + + if (available_space < space_needed) + { + ACE_OS::last_error (ENOBUFS); + return -1; + } + + // Common code for both WIN and POSIX. + // Create future Asynch_Accept_Result + ACE_POSIX_Asynch_Accept_Result *result = 0; + ACE_NEW_RETURN (result, + ACE_POSIX_Asynch_Accept_Result (this->handler_proxy_, + this->handle_, + accept_handle, + message_block, + bytes_to_read, + act, + this->posix_proactor()->get_handle (), + priority, + signal_number), -1); - // Enqueue result + // Enqueue result + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); if (this->result_queue_.enqueue_tail (result) == -1) { ACE_ERROR ((LM_ERROR, @@ -949,8 +931,6 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, if (this->result_queue_.size () > 1) return 0; - - this->task_lock_count_ ++; } // If this is the only item, then it means there the set was empty @@ -959,22 +939,7 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, ACE_Asynch_Pseudo_Task & task = this->posix_proactor ()->get_asynch_pseudo_task (); - int rc_task = task.resume_io_handler (this->get_handle()); - - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - this->task_lock_count_ --; - - if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN && - this->task_lock_count_ == 0) // task is closing - task.unlock_finish (); - } - - if (rc_task < 0) - return -1; - - return 0; + return task.resume_io_handler (this->get_handle ()); } //@@ New method cancel_uncompleted @@ -1016,9 +981,9 @@ ACE_POSIX_Asynch_Accept::cancel_uncompleted (int flg_notify) if (this->posix_proactor ()->post_completion (result) == -1) ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT("(%P | %t):%p\n"), ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::") - ACE_LIB_TEXT("cancel_uncompleted:<post_completion> failed") + ACE_LIB_TEXT("cancel_uncompleted") )); } } @@ -1030,11 +995,9 @@ ACE_POSIX_Asynch_Accept::cancel (void) { ACE_TRACE ("ACE_POSIX_Asynch_Accept::cancel"); - //We are not really ACE_POSIX_Asynch_Operation - //so we could not call ::aiocancel () - // or just write - //return ACE_POSIX_Asynch_Operation::cancel (); - //We delegate real cancelation to cancel_uncompleted (1) + // Since this is not a real POSIX asynch I/O operation, we can't + // call ::aiocancel () or ACE_POSIX_Asynch_Operation::cancel (). + // We delegate real cancelation to cancel_uncompleted (1) int rc = -1 ; // ERRORS @@ -1044,32 +1007,19 @@ ACE_POSIX_Asynch_Accept::cancel (void) int num_cancelled = cancel_uncompleted (flg_open_); if (num_cancelled == 0) - rc = 1 ; // AIO_ALLDONE + rc = 1 ; // AIO_ALLDONE else if (num_cancelled > 0) - rc = 0 ; // AIO_CANCELED - - if (this->flg_open_ == 0) - return rc ; + rc = 0 ; // AIO_CANCELED - this->task_lock_count_++; + if (!this->flg_open_) + return rc ; } ACE_Asynch_Pseudo_Task & task = this->posix_proactor ()->get_asynch_pseudo_task (); - int rc_task = task.suspend_io_handler (this->get_handle()); - - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - this->task_lock_count_--; - - if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN && - this->task_lock_count_ == 0) // task is closing - task.unlock_finish (); - } - - return rc; + task.suspend_io_handler (this->get_handle()); + return 0; } int @@ -1092,47 +1042,33 @@ ACE_POSIX_Asynch_Accept::close () { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - this->cancel_uncompleted (flg_open_); + } - if (this->flg_open_ == 0) - { - if (this->handle_ != ACE_INVALID_HANDLE) - { - ACE_OS::closesocket (this->handle_); - this->handle_ = ACE_INVALID_HANDLE; - } - return 0; - } - - if (this->handle_ == ACE_INVALID_HANDLE) + if (!this->flg_open_) + { + if (this->handle_ != ACE_INVALID_HANDLE) + { + ACE_OS::closesocket (this->handle_); + this->handle_ = ACE_INVALID_HANDLE; + } return 0; + } - this->task_lock_count_++; - } + if (this->handle_ == ACE_INVALID_HANDLE) + return 0; ACE_Asynch_Pseudo_Task & task = this->posix_proactor ()->get_asynch_pseudo_task (); - int rc_task = task.remove_io_handler (this->get_handle ()); - - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - this->task_lock_count_--; - - if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN && - this->task_lock_count_ == 0) // task is closing - task.unlock_finish (); - - if (this->handle_ != ACE_INVALID_HANDLE) - { - ACE_OS::closesocket (this->handle_); - this->handle_ = ACE_INVALID_HANDLE; - } + task.remove_io_handler (this->get_handle ()); + if (this->handle_ != ACE_INVALID_HANDLE) + { + ACE_OS::closesocket (this->handle_); + this->handle_ = ACE_INVALID_HANDLE; + } - this->flg_open_ = 0; - } + this->flg_open_ = false; return 0; } @@ -1144,27 +1080,14 @@ ACE_POSIX_Asynch_Accept::handle_close (ACE_HANDLE, ACE_Reactor_Mask) ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); - // handle_close is called only in two cases: + // handle_close is called in two cases: // 1. Pseudo task is closing (i.e. proactor destructor) // 2. The listen handle is closed (we don't have exclusive access to this) - // - // In all other cases we deregister ourself - // with ACE_Event_Handler::DONT_CALL mask this->cancel_uncompleted (0); - this->flg_open_ = 0; + this->flg_open_ = false; this->handle_ = ACE_INVALID_HANDLE; - - // it means other thread is waiting for reactor token_ - if (this->task_lock_count_ > 0) - { - ACE_Asynch_Pseudo_Task & task = - this->posix_proactor ()->get_asynch_pseudo_task (); - - task.lock_finish (); - } - return 0; } @@ -1177,30 +1100,27 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */) // able to just go ahead and do the <accept> now on this <fd>. This // should be the same as the <listen_handle>. - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); - ACE_POSIX_Asynch_Accept_Result* result = 0; - // Deregister this info pertaining to this <accept> call. - if (this->result_queue_.dequeue_head (result) != 0) - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"), - ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input:") - ACE_LIB_TEXT( " dequeueing failed"))); - - // Disable the <handle> in the reactor if no <accept>'s are pending. + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); - // we allow the following sequence of locks : - // reactor::token , then our mutex lock_ - // to avoid deadlock prohibited reverse sequence + // Deregister this info pertaining to this accept call. + if (this->result_queue_.dequeue_head (result) != 0) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"), + ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input:") + ACE_LIB_TEXT( " dequeueing failed"))); - if (this->result_queue_.size () == 0) - { - ACE_Asynch_Pseudo_Task & task = - this->posix_proactor ()->get_asynch_pseudo_task (); + // Disable the handle in the reactor if no more accepts are pending. + if (this->result_queue_.size () == 0) + { + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor ()->get_asynch_pseudo_task (); - task.suspend_io_handler (this->get_handle()); - } + task.suspend_io_handler (this->get_handle()); + } + } // Issue <accept> now. // @@ We shouldnt block here since we have already done poll/select @@ -1216,11 +1136,11 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */) if (new_handle == ACE_INVALID_HANDLE) { - result->set_error(errno); + result->set_error (errno); ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"), ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ") - ACE_LIB_TEXT(" <accept> system call failed"))); + ACE_LIB_TEXT("accept"))); // Notify client as usual, "AIO" finished with errors } @@ -1301,8 +1221,7 @@ ACE_POSIX_Asynch_Connect::ACE_POSIX_Asynch_Connect (ACE_POSIX_Proactor * posix_p : ACE_Asynch_Operation_Impl (), ACE_Asynch_Connect_Impl (), ACE_POSIX_Asynch_Operation (posix_proactor), - flg_open_ (0), - task_lock_count_ (0) + flg_open_ (false) { } @@ -1315,7 +1234,6 @@ ACE_POSIX_Asynch_Connect::~ACE_POSIX_Asynch_Connect (void) ACE_HANDLE ACE_POSIX_Asynch_Connect::get_handle (void) const { - ACE_ASSERT (0); return ACE_INVALID_HANDLE; } @@ -1334,16 +1252,8 @@ ACE_POSIX_Asynch_Connect::open (ACE_Handler::Proxy_Ptr &handler_proxy, { ACE_TRACE ("ACE_POSIX_Asynch_Connect::open"); - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - // if we are already opened, - // we could not create a new handler without closing the previous - - if (this->flg_open_ != 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::open:") - ACE_LIB_TEXT("connector already open \n")), - -1); + if (this->flg_open_) + return -1; //int result = ACE_POSIX_Asynch_Operation::open (handler_proxy, @@ -1355,7 +1265,7 @@ ACE_POSIX_Asynch_Connect::open (ACE_Handler::Proxy_Ptr &handler_proxy, //if (result == -1) // return result; - this->flg_open_ = 1; + this->flg_open_ = true; return 0; } @@ -1371,99 +1281,86 @@ ACE_POSIX_Asynch_Connect::connect (ACE_HANDLE connect_handle, { ACE_TRACE ("ACE_POSIX_Asynch_Connect::connect"); - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + if (this->flg_open_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect") + ACE_LIB_TEXT("connector was not opened before\n")), + -1); - if (this->flg_open_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect") - ACE_LIB_TEXT("connector was not opened before\n")), - -1); - - // Common code for both WIN and POSIX. - // Create future Asynch_Connect_Result - ACE_POSIX_Asynch_Connect_Result *result = 0; - ACE_NEW_RETURN (result, - ACE_POSIX_Asynch_Connect_Result (this->handler_proxy_, - connect_handle, - act, - this->posix_proactor ()->get_handle (), - priority, - signal_number), - -1); + // Common code for both WIN and POSIX. + // Create future Asynch_Connect_Result + ACE_POSIX_Asynch_Connect_Result *result = 0; + ACE_NEW_RETURN (result, + ACE_POSIX_Asynch_Connect_Result (this->handler_proxy_, + connect_handle, + act, + this->posix_proactor ()->get_handle (), + priority, + signal_number), + -1); - int rc = connect_i (result, - remote_sap, - local_sap, - reuse_addr); + int rc = connect_i (result, + remote_sap, + local_sap, + reuse_addr); - // update handle - connect_handle = result->connect_handle (); + // update handle + connect_handle = result->connect_handle (); - if (rc != 0) - return post_result (result, 1); + if (rc != 0) + return post_result (result, true); - // Enqueue result we will wait for completion + // Enqueue result we will wait for completion + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); if (this->result_map_.bind (connect_handle, result) == -1) { ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect:") - ACE_LIB_TEXT("result map binding failed\n"))); + ACE_LIB_TEXT ("%N:%l:%p\n"), + ACE_LIB_TEXT ("ACE_POSIX_Asynch_Connect::connect:") + ACE_LIB_TEXT ("bind"))); result->set_error (EFAULT); - return post_result (result, 1); + return post_result (result, true); } - - this->task_lock_count_ ++; } ACE_Asynch_Pseudo_Task & task = this->posix_proactor ()->get_asynch_pseudo_task (); - int rc_task = task.register_io_handler (connect_handle, - this, - ACE_Event_Handler::CONNECT_MASK, - 0); // not to suspend after register - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - this->task_lock_count_ --; - - int post_enable = 1; - - if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN && - this->task_lock_count_ == 0) // task is closing - { - post_enable = 0; - task.unlock_finish (); - } - - if (rc_task < 0) + rc = task.register_io_handler (connect_handle, + this, + ACE_Event_Handler::CONNECT_MASK, + 0); // don't suspend after register + if (rc < 0) + { { - ACE_POSIX_Asynch_Connect_Result *result = 0; + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); this->result_map_.unbind (connect_handle, result); - - if (result != 0) - { - result->set_error (EFAULT); - - return post_result (result, post_enable); - } } - } + if (result != 0) + { + result->set_error (EFAULT); + this->post_result (result, true); + } + return -1; + } + else + result = 0; + return 0; } int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result * result, - int post_enable) + bool post_enable) { - if (this->flg_open_ != 0 && post_enable != 0) + if (this->flg_open_ && post_enable != 0) { if (this->posix_proactor ()->post_completion (result) == 0) - return 0 ; + return 0; ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT("Error:(%P | %t):%p\n"), @@ -1481,7 +1378,7 @@ int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result * res return -1; } -//@@ New method connect_i +//connect_i // return code : // -1 errors before attempt to connect // 0 connect started @@ -1506,33 +1403,32 @@ ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result, 0); // save it result->connect_handle (handle); - if (handle == ACE_INVALID_HANDLE) { result->set_error (errno); - - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ") - ACE_LIB_TEXT(" ACE_OS::socket failed\n")), - -1); + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"), + ACE_LIB_TEXT("socket")), + -1); } // Reuse the address int one = 1; - if (protocol_family != PF_UNIX && - reuse_addr != 0 && - ACE_OS::setsockopt (handle, - SOL_SOCKET, - SO_REUSEADDR, - (const char*) &one, - sizeof one) == -1 ) + if (protocol_family != PF_UNIX && + reuse_addr != 0 && + ACE_OS::setsockopt (handle, + SOL_SOCKET, + SO_REUSEADDR, + (const char*) &one, + sizeof one) == -1 ) { result->set_error (errno); - - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ") - ACE_LIB_TEXT(" ACE_OS::setsockopt failed\n")), - -1); + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"), + ACE_LIB_TEXT("setsockopt")), + -1); } } @@ -1544,11 +1440,11 @@ ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result, if (ACE_OS::bind (handle, laddr, size) == -1) { result->set_error (errno); - - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ") - ACE_LIB_TEXT(" ACE_OS::bind failed\n")), - -1); + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"), + ACE_LIB_TEXT("bind")), + -1); } } @@ -1556,19 +1452,19 @@ ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result, if (ACE::set_flags (handle, ACE_NONBLOCK) != 0) { result->set_error (errno); - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i, %p\n") - ACE_LIB_TEXT("ACE::set_flags failed")), + ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n") + ACE_LIB_TEXT("set_flags")), -1); } for (;;) { - int rc = ACE_OS::connect (handle, - reinterpret_cast<sockaddr *> (remote_sap.get_addr ()), - remote_sap.get_size ()); + int rc = ACE_OS::connect + (handle, + reinterpret_cast<sockaddr *> (remote_sap.get_addr ()), + remote_sap.get_size ()); if (rc < 0) // failure { if (errno == EWOULDBLOCK || errno == EINPROGRESS) @@ -1600,7 +1496,7 @@ ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result, // int -ACE_POSIX_Asynch_Connect::cancel_uncompleted (int flg_notify, +ACE_POSIX_Asynch_Connect::cancel_uncompleted (bool flg_notify, ACE_Handle_Set & set) { ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel_uncompleted"); @@ -1634,47 +1530,30 @@ ACE_POSIX_Asynch_Connect::cancel (void) { ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel"); - //We are not really ACE_POSIX_Asynch_Operation - //so we could not call ::aiocancel () - // or just write - //return ACE_POSIX_Asynch_Operation::cancel (); - //We delegate real cancelation to cancel_uncompleted (1) + // Since this is not a real asynch I/O operation, we can't just call + // ::aiocancel () or ACE_POSIX_Asynch_Operation::cancel (). + // Delegate real cancelation to cancel_uncompleted (1) int rc = -1 ; // ERRORS ACE_Handle_Set set; - + int num_cancelled = 0; { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - int num_cancelled = cancel_uncompleted (flg_open_, set); - - if (num_cancelled == 0) - rc = 1 ; // AIO_ALLDONE - else if (num_cancelled > 0) - rc = 0 ; // AIO_CANCELED - - if (this->flg_open_ == 0) - return rc ; - - this->task_lock_count_++; + num_cancelled = cancel_uncompleted (flg_open_, set); } + if (num_cancelled == 0) + rc = 1 ; // AIO_ALLDONE + else if (num_cancelled > 0) + rc = 0 ; // AIO_CANCELED + + if (!this->flg_open_) + return rc ; ACE_Asynch_Pseudo_Task & task = this->posix_proactor ()->get_asynch_pseudo_task (); - int rc_task = task.remove_io_handler (set); - - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - this->task_lock_count_--; - - if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN && - this->task_lock_count_ == 0) // task is closing - task.unlock_finish (); - } - + task.remove_io_handler (set); return rc; } @@ -1684,67 +1563,39 @@ ACE_POSIX_Asynch_Connect::close (void) ACE_TRACE ("ACE_POSIX_Asynch_Connect::close"); ACE_Handle_Set set ; - + int num_cancelled = 0; { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - int num_cancelled = cancel_uncompleted (flg_open_, set); - - if (num_cancelled == 0 || this->flg_open_ == 0) - { - this->flg_open_ = 0; - return 0; - } - - this->task_lock_count_++; + num_cancelled = cancel_uncompleted (flg_open_, set); } + if (num_cancelled == 0 || !this->flg_open_) + { + this->flg_open_ = false; + return 0; + } + ACE_Asynch_Pseudo_Task & task = this->posix_proactor ()->get_asynch_pseudo_task (); - int rc_task = task.remove_io_handler (set); - - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - this->task_lock_count_--; - - if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN && - this->task_lock_count_ == 0) // task is closing - task.unlock_finish (); - - this->flg_open_ = 0; - } + task.remove_io_handler (set); + this->flg_open_ = false; return 0; } int -ACE_POSIX_Asynch_Connect::handle_exception (ACE_HANDLE fd) -{ - ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_exception"); - return handle_input (fd); -} - -int -ACE_POSIX_Asynch_Connect::handle_input (ACE_HANDLE fd) -{ - ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_input"); - - return handle_input (fd); -} - -int ACE_POSIX_Asynch_Connect::handle_output (ACE_HANDLE fd) { ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_output"); - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); - ACE_POSIX_Asynch_Connect_Result* result = 0; - if (this->result_map_.unbind (fd, result) != 0) // not found - return -1; + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + if (this->result_map_.unbind (fd, result) != 0) // not found + return -1; + } int sockerror = 0 ; int lsockerror = sizeof sockerror; @@ -1774,36 +1625,18 @@ ACE_POSIX_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask) { ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_close"); - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); - ACE_Asynch_Pseudo_Task &task = this->posix_proactor ()->get_asynch_pseudo_task (); - if (task.is_active() == 0) // task is closing - { - if (this->flg_open_ !=0) // we are open - { - this->flg_open_ = 0; - - // it means other thread is waiting for reactor token_ - if (this->task_lock_count_ > 0) - task.lock_finish (); - } - - ACE_Handle_Set set; - this->cancel_uncompleted (0, set); - - return 0; - } - - // remove_io_handler() contains flag DONT_CALL - // so it is save task.remove_io_handler (fd); ACE_POSIX_Asynch_Connect_Result* result = 0; - if (this->result_map_.unbind (fd, result) != 0 ) // not found - return -1; + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + if (this->result_map_.unbind (fd, result) != 0) // not found + return -1; + } result->set_bytes_transferred (0); result->set_error (ECANCELED); diff --git a/ace/POSIX_Asynch_IO.h b/ace/POSIX_Asynch_IO.h index ebb4e9bdbb1..45bc94cc7dc 100644 --- a/ace/POSIX_Asynch_IO.h +++ b/ace/POSIX_Asynch_IO.h @@ -735,17 +735,9 @@ private: /// on canceled AIO requests int cancel_uncompleted (int flg_notify); - /// 1 - Accept is registered in ACE_Asynch_Pseudo_Task - /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task - int flg_open_ ; - - /// To prevent ACE_Asynch_Pseudo_Task from deletion - /// while we make a call to the ACE_Asynch_Pseudo_Task - /// This is extra cost !!! - /// we could avoid them if all applications will follow the rule: - /// Proactor should be deleted only after deletion all - /// AsynchOperation objects connected with it - int task_lock_count_; + /// true - Accept is registered in ACE_Asynch_Pseudo_Task + /// false - Accept is deregisted in ACE_Asynch_Pseudo_Task + bool flg_open_ ; /// Queue of Result pointers that correspond to all the pending /// accept operations. @@ -867,10 +859,10 @@ public: void set_handle (ACE_HANDLE handle); /// virtual from ACE_Event_Handler - /// Called when accept event comes up on <listen_hanlde> - int handle_input (ACE_HANDLE handle); + /// The default action on handle_input() and handle_exception is to + /// return -1. Since that's what we want to do, just reuse them. + /// handle_output(), however, is where successful connects are reported. int handle_output (ACE_HANDLE handle); - int handle_exception (ACE_HANDLE handle); /// virtual from ACE_Event_Handler int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) ; @@ -881,31 +873,22 @@ private: const ACE_Addr & local_sap, int reuse_addr); - int post_result (ACE_POSIX_Asynch_Connect_Result *result, int flg_post); + int post_result (ACE_POSIX_Asynch_Connect_Result *result, bool flg_post); /// Cancel uncompleted connect operations. /** * @arg flg_notify Indicates whether or not we should send notification - * about canceled accepts. If this is 0, don't send - * notifications about canceled connects. If 1, notify + * about canceled accepts. If this is false, don't send + * notifications about canceled connects. If true, notify * user about canceled connects according POSIX * standards we should receive notifications on canceled * AIO requests. */ - int cancel_uncompleted (int flg_notify, ACE_Handle_Set & set); - - int flg_open_ ; - /// 1 - Connect is registered in ACE_Asynch_Pseudo_Task - /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task - + int cancel_uncompleted (bool flg_notify, ACE_Handle_Set &set); - /// to prevent ACE_Asynch_Pseudo_Task from deletion - /// while we make a call to the ACE_Asynch_Pseudo_Task - /// This is extra cost !!! - /// we could avoid them if all applications will follow the rule: - /// Proactor should be deleted only after deletion all - /// AsynchOperation objects connected with it - int task_lock_count_; + bool flg_open_ ; + /// true - Connect is registered in ACE_Asynch_Pseudo_Task + /// false - Aceept is deregisted in ACE_Asynch_Pseudo_Task typedef ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> MAP_MANAGER; @@ -914,14 +897,12 @@ private: typedef MAP_MANAGER::ITERATOR MAP_ITERATOR; typedef MAP_MANAGER::ENTRY MAP_ENTRY; - /// Map of Result pointers that correspond to all the <accept>'s - /// pending. + /// Map of Result pointers that correspond to all the pending connects. MAP_MANAGER result_map_; - /// The lock to protect the result queue which is shared. The queue + /// The lock to protect the result map which is shared. The queue /// is updated by main thread in the register function call and - /// through the auxillary thread in the deregister fun. So let us - /// mutex it. + /// through the auxillary thread in the asynch pseudo task. ACE_SYNCH_MUTEX lock_; }; diff --git a/ace/WIN32_Asynch_IO.cpp b/ace/WIN32_Asynch_IO.cpp index 239d0c69388..5b5c2eb87f3 100644 --- a/ace/WIN32_Asynch_IO.cpp +++ b/ace/WIN32_Asynch_IO.cpp @@ -2352,13 +2352,11 @@ ACE_WIN32_Asynch_Connect::open (ACE_Handler::Proxy_Ptr &handler_proxy, const void *completion_key, ACE_Proactor *proactor) { - ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::open\n")); - - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + ACE_TRACE ("ACE_WIN32_Asynch_Connect::open"); // if we are already opened, // we could not create a new handler without closing the previous - if (this->flg_open_ != 0) + if (this->flg_open_) ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::open:") ACE_LIB_TEXT ("connector already open \n")), @@ -2374,7 +2372,7 @@ ACE_WIN32_Asynch_Connect::open (ACE_Handler::Proxy_Ptr &handler_proxy, //if (result == -1) // return result; - this->flg_open_ = 1; + this->flg_open_ = true; return 0; } @@ -2388,98 +2386,78 @@ ACE_WIN32_Asynch_Connect::connect (ACE_HANDLE connect_handle, int priority, int signal_number) { - ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect\n")); - - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + ACE_TRACE ("ACE_WIN32_Asynch_Connect::connect"); - if (this->flg_open_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect") - ACE_LIB_TEXT ("connector was not opened before\n")), - -1); + if (!this->flg_open_) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect") + ACE_LIB_TEXT ("connector was not opened before\n")), + -1); - // Common code for both WIN and WIN32. - // Create future Asynch_Connect_Result - ACE_WIN32_Asynch_Connect_Result *result = 0; - ACE_NEW_RETURN (result, - ACE_WIN32_Asynch_Connect_Result (this->handler_proxy_, - connect_handle, - act, - this->win32_proactor_->get_handle (), - priority, - signal_number), - -1); + // Common code for both WIN and WIN32. + // Create future Asynch_Connect_Result + ACE_WIN32_Asynch_Connect_Result *result = 0; + ACE_NEW_RETURN (result, + ACE_WIN32_Asynch_Connect_Result (this->handler_proxy_, + connect_handle, + act, + this->win32_proactor_->get_handle (), + priority, + signal_number), + -1); - int rc = connect_i (result, - remote_sap, - local_sap, - reuse_addr); + int rc = connect_i (result, + remote_sap, + local_sap, + reuse_addr); - // update handle - connect_handle = result->connect_handle (); + // update handle + connect_handle = result->connect_handle (); - if (rc != 0) - return post_result (result, 1); + if (rc != 0) + return post_result (result, true); - // Enqueue result we will wait for completion + // Enqueue result we will wait for completion + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); if (this->result_map_.bind (connect_handle, result) == -1) { ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect:") - ACE_LIB_TEXT ("result map binding failed\n"))); + ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect: %p\n"), + ACE_LIB_TEXT ("bind"))); result->set_error (EFAULT); - return post_result (result, 1); + return post_result (result, true); } - - this->task_lock_count_++; } ACE_Asynch_Pseudo_Task & task = this->win32_proactor_->get_asynch_pseudo_task (); - int rc_task = task.register_io_handler (connect_handle, - this, - ACE_Event_Handler::CONNECT_MASK, - 0); // not to suspend after register - - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - this->task_lock_count_--; - - int post_enable = 1; - - if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN && - this->task_lock_count_ == 0) // task is closing - { - post_enable = 0; - task.unlock_finish (); - } - - if (rc_task < 0) + if (-1 == task.register_io_handler (connect_handle, + this, + ACE_Event_Handler::CONNECT_MASK, + 0)) // not to suspend after register + { + result = 0; { - ACE_WIN32_Asynch_Connect_Result *result = 0; - + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); this->result_map_.unbind (connect_handle, result); - - if (result != 0) - { - result->set_error (EFAULT); - - return post_result (result, post_enable); - } } - } + if (result != 0) + { + result->set_error (EFAULT); + this->post_result (result, true); + } + } return 0; } int ACE_WIN32_Asynch_Connect::post_result (ACE_WIN32_Asynch_Connect_Result * result, - int post_enable) + bool post_enable) { - if (this->flg_open_ != 0 && post_enable != 0) + if (this->flg_open_ && post_enable) { if (this->win32_proactor_ ->post_completion (result) == 0) return 0; @@ -2500,7 +2478,7 @@ int ACE_WIN32_Asynch_Connect::post_result (ACE_WIN32_Asynch_Connect_Result * res return -1; } -//@@ New method connect_i +// connect_i // return code : // -1 errors before attempt to connect // 0 connect started @@ -2515,26 +2493,23 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result, result->set_bytes_transferred (0); ACE_HANDLE handle = result->connect_handle (); - if (handle == ACE_INVALID_HANDLE) { int protocol_family = remote_sap.get_type (); - handle = ACE_OS::socket (protocol_family, SOCK_STREAM, 0); // save it result->connect_handle (handle); - if (handle == ACE_INVALID_HANDLE) { result->set_error (errno); - - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ") - ACE_LIB_TEXT (" ACE_OS::socket failed\n")), - -1); + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect_i: %p\n"), + ACE_LIB_TEXT ("socket")), + -1); } // Reuse the address @@ -2548,11 +2523,11 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result, sizeof one) == -1) { result->set_error (errno); - - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ") - ACE_LIB_TEXT (" ACE_OS::setsockopt failed\n")), - -1); + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect_i: %p\n"), + ACE_LIB_TEXT ("setsockopt")), + -1); } } @@ -2560,35 +2535,34 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result, { sockaddr * laddr = reinterpret_cast<sockaddr *> (local_sap.get_addr ()); int size = local_sap.get_size (); - if (ACE_OS::bind (handle, laddr, size) == -1) { result->set_error (errno); - - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: %p\n"), - ACE_LIB_TEXT ("ACE_OS::bind")), - -1); + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect_i: %p\n"), + ACE_LIB_TEXT ("bind")), + -1); } } // set non blocking mode - if (ACE::set_flags (handle, ACE_NONBLOCK) != 0) { result->set_error (errno); - - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ") - ACE_LIB_TEXT (" ACE::set_flags failed\n")), - -1); + ACE_ERROR_RETURN + ((LM_ERROR, + ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect_i: %p\n"), + ACE_LIB_TEXT ("set_flags")), + -1); } for (;;) { - int rc = ACE_OS::connect (handle, - reinterpret_cast<sockaddr *> (remote_sap.get_addr ()), - remote_sap.get_size ()); + int rc = ACE_OS::connect + (handle, + reinterpret_cast<sockaddr *> (remote_sap.get_addr ()), + remote_sap.get_size ()); if (rc < 0) // failure { @@ -2607,7 +2581,7 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result, } -//@@ New method cancel_uncompleted +// cancel_uncompleted // It performs cancellation of all pending requests // // Parameter flg_notify can be @@ -2620,14 +2594,14 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result, // int -ACE_WIN32_Asynch_Connect::cancel_uncompleted (int flg_notify, ACE_Handle_Set & set) +ACE_WIN32_Asynch_Connect::cancel_uncompleted (bool flg_notify, + ACE_Handle_Set &set) { - ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::cancel_uncompleted\n")); + ACE_TRACE ("ACE_WIN32_Asynch_Connect::cancel_uncompleted"); int retval = 0; MAP_MANAGER::ITERATOR iter (result_map_); - MAP_MANAGER::ENTRY * me = 0; set.reset (); @@ -2652,118 +2626,83 @@ ACE_WIN32_Asynch_Connect::cancel_uncompleted (int flg_notify, ACE_Handle_Set & s int ACE_WIN32_Asynch_Connect::cancel (void) { - ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::cancel\n")); - - //We are not really ACE_WIN32_Asynch_Operation - //so we could not call ::aiocancel () - // or just write - //return ACE_WIN32_Asynch_Operation::cancel (); - //We delegate real cancelation to cancel_uncompleted (1) + ACE_TRACE ("ACE_WIN32_Asynch_Connect::cancel"); int rc = -1 ; // ERRORS ACE_Handle_Set set; - + int num_cancelled = 0; { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - int num_cancelled = cancel_uncompleted (flg_open_, set); - - if (num_cancelled == 0) - rc = 1; // AIO_ALLDONE - else if (num_cancelled > 0) - rc = 0; // AIO_CANCELED - - if (this->flg_open_ == 0) - return rc; - - this->task_lock_count_++; + num_cancelled = cancel_uncompleted (flg_open_, set); } + if (num_cancelled == 0) + rc = 1; // AIO_ALLDONE + else if (num_cancelled > 0) + rc = 0; // AIO_CANCELED + + if (!this->flg_open_) + return rc; ACE_Asynch_Pseudo_Task & task = this->win32_proactor_->get_asynch_pseudo_task (); - int rc_task = task.remove_io_handler (set); - - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - this->task_lock_count_--; - - if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN && - this->task_lock_count_ == 0) // task is closing - task.unlock_finish (); - } - + task.remove_io_handler (set); return rc; } int ACE_WIN32_Asynch_Connect::close (void) { - ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::close\n")); + ACE_TRACE ("ACE_WIN32_Asynch_Connect::close"); ACE_Handle_Set set; - + int num_cancelled = 0; { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - int num_cancelled = cancel_uncompleted (flg_open_, set); - - if (num_cancelled == 0 || this->flg_open_ == 0) - { - this->flg_open_ = 0; - return 0; - } - - this->task_lock_count_++; + num_cancelled = cancel_uncompleted (flg_open_, set); } + if (num_cancelled == 0 || this->flg_open_ == 0) + { + this->flg_open_ = false; + return 0; + } ACE_Asynch_Pseudo_Task & task = this->win32_proactor_->get_asynch_pseudo_task (); - int rc_task = task.remove_io_handler (set); - - { - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - - this->task_lock_count_--; - - if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN && - this->task_lock_count_ == 0) // task is closing - task.unlock_finish (); - - this->flg_open_ = 0; - } - + task.remove_io_handler (set); return 0; } int ACE_WIN32_Asynch_Connect::handle_exception (ACE_HANDLE fd) { - ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_exception\n")); + ACE_TRACE ("ACE_WIN32_Asynch_Connect::handle_exception"); return handle_output (fd); } int ACE_WIN32_Asynch_Connect::handle_input (ACE_HANDLE fd) { - ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_input\n")); + ACE_TRACE ("ACE_WIN32_Asynch_Connect::handle_input"); return handle_output (fd); } int ACE_WIN32_Asynch_Connect::handle_output (ACE_HANDLE fd) { - ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_output\n")); - - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + ACE_TRACE ("ACE_WIN32_Asynch_Connect::handle_output"); ACE_WIN32_Asynch_Connect_Result* result = 0; - if (this->result_map_.unbind (fd, result) != 0) // not found - return -1; + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + if (this->result_map_.unbind (fd, result) != 0) // not found + return -1; + } int sockerror = 0 ; int lsockerror = sizeof sockerror; @@ -2791,38 +2730,19 @@ ACE_WIN32_Asynch_Connect::handle_output (ACE_HANDLE fd) int ACE_WIN32_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask) { - ACE_TRACE(ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_close\n")); - - ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + ACE_TRACE ("ACE_WIN32_Asynch_Connect::handle_close"); ACE_Asynch_Pseudo_Task & task = this->win32_proactor_->get_asynch_pseudo_task (); - - if (task.is_active () == 0) // task is closing - { - if (this->flg_open_ != 0) // we are open - { - this->flg_open_ = 0; - - // it means other thread is waiting for reactor token_ - if (this->task_lock_count_ > 0) - task.lock_finish (); - } - - ACE_Handle_Set set; - this->cancel_uncompleted (0, set); - - return 0; - } - - // remove_io_handler() contains flag DONT_CALL - // so it is save task.remove_io_handler (fd); ACE_WIN32_Asynch_Connect_Result* result = 0; - if (this->result_map_.unbind (fd, result) != 0) // not found - return -1; + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + if (this->result_map_.unbind (fd, result) != 0) // not found + return -1; + } result->set_bytes_transferred (0); result->set_error (ERROR_OPERATION_ABORTED); diff --git a/ace/WIN32_Asynch_IO.h b/ace/WIN32_Asynch_IO.h index 53e9f0ba873..3efd39417d2 100644 --- a/ace/WIN32_Asynch_IO.h +++ b/ace/WIN32_Asynch_IO.h @@ -1307,13 +1307,13 @@ private: const ACE_Addr &local_sap, int reuse_addr); - int post_result (ACE_WIN32_Asynch_Connect_Result *result, int flg_post); + int post_result (ACE_WIN32_Asynch_Connect_Result *result, bool flg_post); /// Cancel uncompleted connect operations. /** * @param flg_notify Indicates whether or not to send notification about - * canceled connect operations. If 0, don't send - * notifications. If 1, notify user about canceled + * canceled connect operations. If false, don't send + * notifications. If true, notify user about canceled * connects. * According WIN32 standards we should receive * notifications on canceled AIO requests. @@ -1323,36 +1323,22 @@ private: * method. The contents of @a set are completely * replaced. */ - int cancel_uncompleted (int flg_notify, ACE_Handle_Set & set); + int cancel_uncompleted (bool flg_notify, ACE_Handle_Set &set); - /// 1 - Connect is registered in ACE_Asynch_Pseudo_Task - /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task - int flg_open_ ; - - /// To prevent ACE_Asynch_Pseudo_Task from deletion - /// while we make a call to the ACE_Asynch_Pseudo_Task - /// This is extra cost !!! - /// we could avoid them if all applications will follow the rule: - /// Proactor should be deleted only after deletion all - /// AsynchOperation objects connected with it - int task_lock_count_; + /// true - Connect is registered in ACE_Asynch_Pseudo_Task + /// false - Accept is deregisted in ACE_Asynch_Pseudo_Task + bool flg_open_ ; typedef ACE_Map_Manager<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> MAP_MANAGER; - // (Two) Deprecated typedefs. Use appropriate MAP_MANAGER traits - // instead. - typedef MAP_MANAGER::ITERATOR MAP_ITERATOR; - typedef MAP_MANAGER::ENTRY MAP_ENTRY; - /// Map of Result pointers that correspond to all the <accept>'s /// pending. MAP_MANAGER result_map_; - /// The lock to protect the result queue which is shared. The queue + /// The lock to protect the result map which is shared. The queue /// is updated by main thread in the register function call and - /// through the auxillary thread in the deregister fun. So let us - /// mutex it. + /// through the auxillary thread in the asynch pseudo task. ACE_SYNCH_MUTEX lock_; }; |