summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Huston <shuston@riverace.com>2005-02-28 22:11:39 +0000
committerSteve Huston <shuston@riverace.com>2005-02-28 22:11:39 +0000
commit85cfcf17aeb34239fc26c02bbccfba6d48e532e9 (patch)
tree697ad7816271e34002c17f44e48df9597635202d
parentf1feefef931c8ede1f421846013c412a1261402d (diff)
downloadATCD-85cfcf17aeb34239fc26c02bbccfba6d48e532e9.tar.gz
ChangeLogTag:Mon Feb 28 17:10:41 2005 Steve Huston <shuston@riverace.com>
-rw-r--r--ChangeLog19
-rw-r--r--ace/Asynch_Pseudo_Task.cpp195
-rw-r--r--ace/Asynch_Pseudo_Task.h24
-rw-r--r--ace/POSIX_Asynch_IO.cpp605
-rw-r--r--ace/POSIX_Asynch_IO.h51
-rw-r--r--ace/WIN32_Asynch_IO.cpp304
-rw-r--r--ace/WIN32_Asynch_IO.h32
7 files changed, 402 insertions, 828 deletions
diff --git a/ChangeLog b/ChangeLog
index f5884744127..f787268aa15 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,20 @@
+Mon Feb 28 17:10:41 2005 Steve Huston <shuston@riverace.com>
+
+ * ace/Asynch_Pseudo_Task.{h cpp}: Removed all the flg_active_ and
+ finish locking stuff. Use the thr_count() value to tell if the
+ thread is running, and don't try to interlock cleanup activities
+ with other classes. It's messy and doesn't work right. There are
+ too many race conditions between closing handles and closing down
+ this object.
+ Corrected ACE_LIB_TEXT use instead of
+ ACE_TEXT, and added missing commas between some strings.
+
+ * ace/POSIX_Asynch_IO.{h cpp}:
+ * ace/WIN32_Asynch_IO.{h cpp}: Don't try to interlock against the
+ Asynch_Pseudo_Task. If it's going, it's going. Only hold the lock
+ around access to the connection/handle map since that's accessed
+ from the asynch pseudo task thread as well as the caller's.
+
Mon Feb 28 09:59:12 UTC 2005 Johnny Willemsen <jwillemsen@remedy.nl>
* include/makeinclude/platform_hpux_aCC.GNU:
@@ -12,7 +29,7 @@ Mon Feb 28 09:59:12 UTC 2005 Johnny Willemsen <jwillemsen@remedy.nl>
Mon Feb 28 11:10:58 2005 Boris Kolpackov <boris@kolpackov.net>
* protocols/ace/RMCast/Acknowledge.h: Made Acknowledge::Queue
- a friend of Acknowledge. Hopefully this will help Sun C++ 5.4.
+ a friend of Acknowledge. Hopefully this will help Sun C++ 5.4.
Sun Feb 27 08:51:23 2005 Douglas C. Schmidt <schmidt@cs.wustl.edu>
diff --git a/ace/Asynch_Pseudo_Task.cpp b/ace/Asynch_Pseudo_Task.cpp
index 9f46a098547..ce936471e52 100644
--- a/ace/Asynch_Pseudo_Task.cpp
+++ b/ace/Asynch_Pseudo_Task.cpp
@@ -8,98 +8,39 @@
ACE_RCSID(ace, Asynch_Pseudo_Task, "$Id$")
ACE_Asynch_Pseudo_Task::ACE_Asynch_Pseudo_Task()
- : flg_active_ (0),
- select_reactor_ (), // should be initialized before reactor_
- reactor_ (&select_reactor_, 0), // don't delete implementation
- token_ (select_reactor_.lock ()), // we can use reactor token
- finish_count_ (0)
+ : select_reactor_ (), // should be initialized before reactor_
+ reactor_ (&select_reactor_, 0) // don't delete implementation
{
}
ACE_Asynch_Pseudo_Task::~ACE_Asynch_Pseudo_Task()
{
- stop();
-}
-
-int
-ACE_Asynch_Pseudo_Task::is_active (void)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
- return flg_active_;
+ this->stop();
}
int
ACE_Asynch_Pseudo_Task::start (void)
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
-
- if (this->flg_active_)
- return 0;
-
if (this->reactor_.initialized () == 0)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_LIB_TEXT ("%N:%l:%p\n"),
- ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::start reactor is not initialized")),
+ ACE_LIB_TEXT ("start reactor is not initialized")),
-1);
-
- if (this->activate () != 0)
- return -1;
-
- this->flg_active_ = 1;
- return 0;
+ return this->activate () == -1 ? -1 : 0; // If started, return 0
}
int
ACE_Asynch_Pseudo_Task::stop (void)
{
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
-
- if (this->flg_active_ == 0) // already stopped
- return 0;
-
- reactor_.end_reactor_event_loop ();
- }
+ if (this->thr_count () == 0) // already stopped
+ return 0;
- if (-1 == this->wait ())
+ if (this->reactor_.end_reactor_event_loop () == -1)
return -1;
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
- this->flg_active_ = 0;
-
- this->reactor_.close ();
-
- while (this->finish_count_ > 0)
- {
- ACE_MT (ace_mon.release ());
- finish_event_.wait ();
-
- ACE_MT (ace_mon.acquire ());
- finish_event_.reset ();
- }
- }
-
- return 0;
-}
-
-int
-ACE_Asynch_Pseudo_Task::lock_finish (void)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
- finish_count_ ++;
- return 0;
-}
-
-int
-ACE_Asynch_Pseudo_Task::unlock_finish (void)
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
-
- --finish_count_;
- finish_event_.signal ();
-
+ this->wait ();
+ this->reactor_.close ();
return 0;
}
@@ -110,30 +51,17 @@ ACE_Asynch_Pseudo_Task::svc (void)
sigset_t RT_signals;
- if (sigemptyset (&RT_signals) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("Error:(%P | %t):%p\n"),
- ACE_LIB_TEXT ("sigemptyset failed")));
-
- int member = 0;
-
+ sigemptyset (&RT_signals);
for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
- {
- member = sigismember (& RT_signals , si);
- if (member == 1)
- {
- sigaddset (&RT_signals, si);
- }
- }
+ sigaddset (&RT_signals, si);
if (ACE_OS::pthread_sigmask (SIG_BLOCK, &RT_signals, 0) != 0)
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT ("Error:(%P | %t):%p\n"),
- ACE_LIB_TEXT ("pthread_sigmask failed")));
+ ACE_LIB_TEXT ("pthread_sigmask")));
#endif
reactor_.owner (ACE_Thread::self());
-
reactor_.run_reactor_event_loop ();
return 0;
@@ -147,36 +75,21 @@ ACE_Asynch_Pseudo_Task::register_io_handler (ACE_HANDLE handle,
ACE_Reactor_Mask mask,
int flg_suspend)
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
-
- if (this->flg_active_ == 0)
- {
- ACE_OS::last_error (ESHUTDOWN);
- return -1;
- }
-
// Register the handler with the reactor.
- int retval = this->reactor_.register_handler (handle, handler, mask);
-
- if (retval == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:%p\n"),
- ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::register_io_handler")),
- -1);
+ if (-1 == this->reactor_.register_handler (handle, handler, mask))
+ return -1;
- if (flg_suspend == 0 )
+ if (flg_suspend == 0)
return 0;
- // Suspend the <handle> now. Enable only when the <accept> is issued
+ // Suspend the handle now. Enable only when the accept is issued
// by the application.
- retval = this->reactor_.suspend_handler (handle);
-
- if (retval == -1)
+ if (this->reactor_.suspend_handler (handle) == -1)
{
ACE_ERROR
((LM_ERROR,
ACE_LIB_TEXT ("%N:%l:%p\n"),
- ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::register_io_handler (suspended)")));
+ ACE_LIB_TEXT ("register_io_handler (suspended)")));
this->reactor_.remove_handler (handle,
ACE_Event_Handler::ALL_EVENTS_MASK
| ACE_Event_Handler::DONT_CALL);
@@ -189,83 +102,27 @@ ACE_Asynch_Pseudo_Task::register_io_handler (ACE_HANDLE handle,
int
ACE_Asynch_Pseudo_Task::remove_io_handler (ACE_HANDLE handle)
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
-
- if (this->flg_active_ == 0)
- {
- ACE_OS::last_error (ESHUTDOWN);
- return -1;
- }
-
- int retval =
- this->reactor_.remove_handler (handle ,
- ACE_Event_Handler::ALL_EVENTS_MASK
- | ACE_Event_Handler::DONT_CALL);
- if (retval == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:%p\n")
- ACE_TEXT ("ACE_Asynch_Pseudo_Task::remove_io_handler")),
- -1);
-
- return 0;
+ return this->reactor_.remove_handler (handle ,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
}
int
ACE_Asynch_Pseudo_Task::remove_io_handler (ACE_Handle_Set &set)
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
-
- if (this->flg_active_ == 0)
- {
- ACE_OS::last_error (ESHUTDOWN);
- return -1;
- }
-
- int retval =
- this->reactor_.remove_handler (set,
- ACE_Event_Handler::ALL_EVENTS_MASK
- | ACE_Event_Handler::DONT_CALL);
- if (retval == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:%p\n")
- ACE_TEXT ("ACE_Asynch_Pseudo_Task::remove_io_handler")),
- -1);
-
- return 0;
+ return this->reactor_.remove_handler (set,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
}
int
ACE_Asynch_Pseudo_Task::suspend_io_handler (ACE_HANDLE handle)
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
-
- if (this->flg_active_ == 0)
- {
- ACE_OS::last_error (ESHUTDOWN);
- return -1;
- }
-
- int retval = this->reactor_.suspend_handler (handle);
-
- if (retval == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:%p\n")
- ACE_TEXT ("ACE_Asynch_Pseudo_Task::suspend_io_handler")),
- -1);
-
- return 0;
+ return this->reactor_.suspend_handler (handle);
}
int
ACE_Asynch_Pseudo_Task::resume_io_handler (ACE_HANDLE handle)
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
-
- if (this->flg_active_ == 0)
- {
- ACE_OS::last_error (ESHUTDOWN);
- return -1;
- }
-
return this->reactor_.resume_handler (handle);
}
diff --git a/ace/Asynch_Pseudo_Task.h b/ace/Asynch_Pseudo_Task.h
index 060bfa3223b..55313b00505 100644
--- a/ace/Asynch_Pseudo_Task.h
+++ b/ace/Asynch_Pseudo_Task.h
@@ -24,31 +24,21 @@
#include "ace/Reactor.h"
#include "ace/Select_Reactor.h"
#include "ace/Task.h"
-#include "ace/Manual_Event.h"
/**
* @class ACE_Asynch_Pseudo_Task
*
*/
-class ACE_Export ACE_Asynch_Pseudo_Task : public ACE_Task<ACE_SYNCH>
+class ACE_Export ACE_Asynch_Pseudo_Task : public ACE_Task<ACE_NULL_SYNCH>
{
- friend class ACE_POSIX_Asynch_Accept;
- friend class ACE_POSIX_Asynch_Connect;
- friend class ACE_WIN32_Asynch_Connect;
-
public:
-
ACE_Asynch_Pseudo_Task();
virtual ~ACE_Asynch_Pseudo_Task();
int start (void);
int stop (void);
- virtual int svc (void);
-
- int is_active (void);
-
int register_io_handler (ACE_HANDLE handle,
ACE_Event_Handler *handler,
ACE_Reactor_Mask mask,
@@ -60,21 +50,11 @@ public:
int suspend_io_handler (ACE_HANDLE handle);
protected:
-
- int lock_finish (void);
- int unlock_finish (void);
-
- int flg_active_;
+ virtual int svc (void);
ACE_Select_Reactor select_reactor_;
// should be initialized before reactor_
-
ACE_Reactor reactor_;
-
- ACE_Lock &token_;
-
- int finish_count_;
- ACE_Manual_Event finish_event_;
};
#include /**/ "ace/post.h"
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp
index 7acb9766f95..5b7416458b6 100644
--- a/ace/POSIX_Asynch_IO.cpp
+++ b/ace/POSIX_Asynch_IO.cpp
@@ -804,15 +804,14 @@ ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor * posix_pro
: ACE_Asynch_Operation_Impl (),
ACE_Asynch_Accept_Impl (),
ACE_POSIX_Asynch_Operation (posix_proactor),
- flg_open_ (0),
- task_lock_count_ (0)
+ flg_open_ (false)
{
}
ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void)
{
this->close ();
- this->reactor(0); // to avoid purge_pending_notifications
+ this->reactor (0); // to avoid purge_pending_notifications
}
ACE_HANDLE
@@ -836,47 +835,31 @@ ACE_POSIX_Asynch_Accept::open (ACE_Handler::Proxy_Ptr &handler_proxy,
{
ACE_TRACE ("ACE_POSIX_Asynch_Accept::open");
- int result=0;
-
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
// if we are already opened,
// we could not create a new handler without closing the previous
-
- if (this->flg_open_ != 0)
+ if (this->flg_open_)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::open:")
ACE_LIB_TEXT("acceptor already open \n")),
-1);
- result = ACE_POSIX_Asynch_Operation::open (handler_proxy,
- handle,
- completion_key,
- proactor);
- if (result == -1)
- return result;
-
- flg_open_ = 1;
+ if (-1 == ACE_POSIX_Asynch_Operation::open (handler_proxy,
+ handle,
+ completion_key,
+ proactor))
+ return -1;
- this->task_lock_count_++;
+ flg_open_ = true;
- // At this moment asynch_accept_task does not know about us,
- // so we can lock task's token with our lock_ locked.
- // In all other cases we should release our lock_ before
- // calling task's methods to avoid deadlock
ACE_Asynch_Pseudo_Task & task =
- this->posix_proactor()->get_asynch_pseudo_task();
-
- result = task.register_io_handler (this->get_handle(),
- this,
- ACE_Event_Handler::ACCEPT_MASK,
- 1); // suspend after register
-
- this->task_lock_count_-- ;
+ this->posix_proactor ()->get_asynch_pseudo_task ();
- if (result < 0)
+ if (-1 == task.register_io_handler (this->get_handle(),
+ this,
+ ACE_Event_Handler::ACCEPT_MASK,
+ 1)) // suspend after register
{
- this->flg_open_= 0;
+ this->flg_open_= false;
this->handle_ = ACE_INVALID_HANDLE;
return -1 ;
}
@@ -895,49 +878,48 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
{
ACE_TRACE ("ACE_POSIX_Asynch_Accept::accept");
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- if (this->flg_open_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept")
- ACE_LIB_TEXT("acceptor was not opened before\n")),
- -1);
+ if (!this->flg_open_)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept")
+ ACE_LIB_TEXT("acceptor was not opened before\n")),
+ -1);
- // Sanity check: make sure that enough space has been allocated by
- // the caller.
- size_t address_size = sizeof (sockaddr_in);
+ // Sanity check: make sure that enough space has been allocated by
+ // the caller.
+ size_t address_size = sizeof (sockaddr_in);
#if defined (ACE_HAS_IPV6)
- if (addr_family == AF_INET6)
- address_size = sizeof (sockaddr_in6);
+ if (addr_family == AF_INET6)
+ address_size = sizeof (sockaddr_in6);
#else
- ACE_UNUSED_ARG (addr_family);
+ ACE_UNUSED_ARG (addr_family);
#endif
- size_t available_space = message_block.space ();
- size_t space_needed = bytes_to_read + 2 * address_size;
-
- if (available_space < space_needed)
- {
- ACE_OS::last_error (ENOBUFS);
- return -1;
- }
-
- // Common code for both WIN and POSIX.
- // Create future Asynch_Accept_Result
- ACE_POSIX_Asynch_Accept_Result *result = 0;
- ACE_NEW_RETURN (result,
- ACE_POSIX_Asynch_Accept_Result (this->handler_proxy_,
- this->handle_,
- accept_handle,
- message_block,
- bytes_to_read,
- act,
- this->posix_proactor()->get_handle (),
- priority,
- signal_number),
+ size_t available_space = message_block.space ();
+ size_t space_needed = bytes_to_read + 2 * address_size;
+
+ if (available_space < space_needed)
+ {
+ ACE_OS::last_error (ENOBUFS);
+ return -1;
+ }
+
+ // Common code for both WIN and POSIX.
+ // Create future Asynch_Accept_Result
+ ACE_POSIX_Asynch_Accept_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Accept_Result (this->handler_proxy_,
+ this->handle_,
+ accept_handle,
+ message_block,
+ bytes_to_read,
+ act,
+ this->posix_proactor()->get_handle (),
+ priority,
+ signal_number),
-1);
- // Enqueue result
+ // Enqueue result
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
if (this->result_queue_.enqueue_tail (result) == -1)
{
ACE_ERROR ((LM_ERROR,
@@ -949,8 +931,6 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
if (this->result_queue_.size () > 1)
return 0;
-
- this->task_lock_count_ ++;
}
// If this is the only item, then it means there the set was empty
@@ -959,22 +939,7 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
ACE_Asynch_Pseudo_Task & task =
this->posix_proactor ()->get_asynch_pseudo_task ();
- int rc_task = task.resume_io_handler (this->get_handle());
-
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- this->task_lock_count_ --;
-
- if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN &&
- this->task_lock_count_ == 0) // task is closing
- task.unlock_finish ();
- }
-
- if (rc_task < 0)
- return -1;
-
- return 0;
+ return task.resume_io_handler (this->get_handle ());
}
//@@ New method cancel_uncompleted
@@ -1016,9 +981,9 @@ ACE_POSIX_Asynch_Accept::cancel_uncompleted (int flg_notify)
if (this->posix_proactor ()->post_completion (result) == -1)
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT("Error:(%P | %t):%p\n"),
+ ACE_LIB_TEXT("(%P | %t):%p\n"),
ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::")
- ACE_LIB_TEXT("cancel_uncompleted:<post_completion> failed")
+ ACE_LIB_TEXT("cancel_uncompleted")
));
}
}
@@ -1030,11 +995,9 @@ ACE_POSIX_Asynch_Accept::cancel (void)
{
ACE_TRACE ("ACE_POSIX_Asynch_Accept::cancel");
- //We are not really ACE_POSIX_Asynch_Operation
- //so we could not call ::aiocancel ()
- // or just write
- //return ACE_POSIX_Asynch_Operation::cancel ();
- //We delegate real cancelation to cancel_uncompleted (1)
+ // Since this is not a real POSIX asynch I/O operation, we can't
+ // call ::aiocancel () or ACE_POSIX_Asynch_Operation::cancel ().
+ // We delegate real cancelation to cancel_uncompleted (1)
int rc = -1 ; // ERRORS
@@ -1044,32 +1007,19 @@ ACE_POSIX_Asynch_Accept::cancel (void)
int num_cancelled = cancel_uncompleted (flg_open_);
if (num_cancelled == 0)
- rc = 1 ; // AIO_ALLDONE
+ rc = 1 ; // AIO_ALLDONE
else if (num_cancelled > 0)
- rc = 0 ; // AIO_CANCELED
-
- if (this->flg_open_ == 0)
- return rc ;
+ rc = 0 ; // AIO_CANCELED
- this->task_lock_count_++;
+ if (!this->flg_open_)
+ return rc ;
}
ACE_Asynch_Pseudo_Task & task =
this->posix_proactor ()->get_asynch_pseudo_task ();
- int rc_task = task.suspend_io_handler (this->get_handle());
-
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- this->task_lock_count_--;
-
- if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN &&
- this->task_lock_count_ == 0) // task is closing
- task.unlock_finish ();
- }
-
- return rc;
+ task.suspend_io_handler (this->get_handle());
+ return 0;
}
int
@@ -1092,47 +1042,33 @@ ACE_POSIX_Asynch_Accept::close ()
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
this->cancel_uncompleted (flg_open_);
+ }
- if (this->flg_open_ == 0)
- {
- if (this->handle_ != ACE_INVALID_HANDLE)
- {
- ACE_OS::closesocket (this->handle_);
- this->handle_ = ACE_INVALID_HANDLE;
- }
- return 0;
- }
-
- if (this->handle_ == ACE_INVALID_HANDLE)
+ if (!this->flg_open_)
+ {
+ if (this->handle_ != ACE_INVALID_HANDLE)
+ {
+ ACE_OS::closesocket (this->handle_);
+ this->handle_ = ACE_INVALID_HANDLE;
+ }
return 0;
+ }
- this->task_lock_count_++;
- }
+ if (this->handle_ == ACE_INVALID_HANDLE)
+ return 0;
ACE_Asynch_Pseudo_Task & task =
this->posix_proactor ()->get_asynch_pseudo_task ();
- int rc_task = task.remove_io_handler (this->get_handle ());
-
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- this->task_lock_count_--;
-
- if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN &&
- this->task_lock_count_ == 0) // task is closing
- task.unlock_finish ();
-
- if (this->handle_ != ACE_INVALID_HANDLE)
- {
- ACE_OS::closesocket (this->handle_);
- this->handle_ = ACE_INVALID_HANDLE;
- }
+ task.remove_io_handler (this->get_handle ());
+ if (this->handle_ != ACE_INVALID_HANDLE)
+ {
+ ACE_OS::closesocket (this->handle_);
+ this->handle_ = ACE_INVALID_HANDLE;
+ }
- this->flg_open_ = 0;
- }
+ this->flg_open_ = false;
return 0;
}
@@ -1144,27 +1080,14 @@ ACE_POSIX_Asynch_Accept::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
- // handle_close is called only in two cases:
+ // handle_close is called in two cases:
// 1. Pseudo task is closing (i.e. proactor destructor)
// 2. The listen handle is closed (we don't have exclusive access to this)
- //
- // In all other cases we deregister ourself
- // with ACE_Event_Handler::DONT_CALL mask
this->cancel_uncompleted (0);
- this->flg_open_ = 0;
+ this->flg_open_ = false;
this->handle_ = ACE_INVALID_HANDLE;
-
- // it means other thread is waiting for reactor token_
- if (this->task_lock_count_ > 0)
- {
- ACE_Asynch_Pseudo_Task & task =
- this->posix_proactor ()->get_asynch_pseudo_task ();
-
- task.lock_finish ();
- }
-
return 0;
}
@@ -1177,30 +1100,27 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */)
// able to just go ahead and do the <accept> now on this <fd>. This
// should be the same as the <listen_handle>.
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
-
ACE_POSIX_Asynch_Accept_Result* result = 0;
- // Deregister this info pertaining to this <accept> call.
- if (this->result_queue_.dequeue_head (result) != 0)
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"),
- ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input:")
- ACE_LIB_TEXT( " dequeueing failed")));
-
- // Disable the <handle> in the reactor if no <accept>'s are pending.
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
- // we allow the following sequence of locks :
- // reactor::token , then our mutex lock_
- // to avoid deadlock prohibited reverse sequence
+ // Deregister this info pertaining to this accept call.
+ if (this->result_queue_.dequeue_head (result) != 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"),
+ ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input:")
+ ACE_LIB_TEXT( " dequeueing failed")));
- if (this->result_queue_.size () == 0)
- {
- ACE_Asynch_Pseudo_Task & task =
- this->posix_proactor ()->get_asynch_pseudo_task ();
+ // Disable the handle in the reactor if no more accepts are pending.
+ if (this->result_queue_.size () == 0)
+ {
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
- task.suspend_io_handler (this->get_handle());
- }
+ task.suspend_io_handler (this->get_handle());
+ }
+ }
// Issue <accept> now.
// @@ We shouldnt block here since we have already done poll/select
@@ -1216,11 +1136,11 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */)
if (new_handle == ACE_INVALID_HANDLE)
{
- result->set_error(errno);
+ result->set_error (errno);
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"),
ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
- ACE_LIB_TEXT(" <accept> system call failed")));
+ ACE_LIB_TEXT("accept")));
// Notify client as usual, "AIO" finished with errors
}
@@ -1301,8 +1221,7 @@ ACE_POSIX_Asynch_Connect::ACE_POSIX_Asynch_Connect (ACE_POSIX_Proactor * posix_p
: ACE_Asynch_Operation_Impl (),
ACE_Asynch_Connect_Impl (),
ACE_POSIX_Asynch_Operation (posix_proactor),
- flg_open_ (0),
- task_lock_count_ (0)
+ flg_open_ (false)
{
}
@@ -1315,7 +1234,6 @@ ACE_POSIX_Asynch_Connect::~ACE_POSIX_Asynch_Connect (void)
ACE_HANDLE
ACE_POSIX_Asynch_Connect::get_handle (void) const
{
-
ACE_ASSERT (0);
return ACE_INVALID_HANDLE;
}
@@ -1334,16 +1252,8 @@ ACE_POSIX_Asynch_Connect::open (ACE_Handler::Proxy_Ptr &handler_proxy,
{
ACE_TRACE ("ACE_POSIX_Asynch_Connect::open");
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- // if we are already opened,
- // we could not create a new handler without closing the previous
-
- if (this->flg_open_ != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::open:")
- ACE_LIB_TEXT("connector already open \n")),
- -1);
+ if (this->flg_open_)
+ return -1;
//int result =
ACE_POSIX_Asynch_Operation::open (handler_proxy,
@@ -1355,7 +1265,7 @@ ACE_POSIX_Asynch_Connect::open (ACE_Handler::Proxy_Ptr &handler_proxy,
//if (result == -1)
// return result;
- this->flg_open_ = 1;
+ this->flg_open_ = true;
return 0;
}
@@ -1371,99 +1281,86 @@ ACE_POSIX_Asynch_Connect::connect (ACE_HANDLE connect_handle,
{
ACE_TRACE ("ACE_POSIX_Asynch_Connect::connect");
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+ if (this->flg_open_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect")
+ ACE_LIB_TEXT("connector was not opened before\n")),
+ -1);
- if (this->flg_open_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect")
- ACE_LIB_TEXT("connector was not opened before\n")),
- -1);
-
- // Common code for both WIN and POSIX.
- // Create future Asynch_Connect_Result
- ACE_POSIX_Asynch_Connect_Result *result = 0;
- ACE_NEW_RETURN (result,
- ACE_POSIX_Asynch_Connect_Result (this->handler_proxy_,
- connect_handle,
- act,
- this->posix_proactor ()->get_handle (),
- priority,
- signal_number),
- -1);
+ // Common code for both WIN and POSIX.
+ // Create future Asynch_Connect_Result
+ ACE_POSIX_Asynch_Connect_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Connect_Result (this->handler_proxy_,
+ connect_handle,
+ act,
+ this->posix_proactor ()->get_handle (),
+ priority,
+ signal_number),
+ -1);
- int rc = connect_i (result,
- remote_sap,
- local_sap,
- reuse_addr);
+ int rc = connect_i (result,
+ remote_sap,
+ local_sap,
+ reuse_addr);
- // update handle
- connect_handle = result->connect_handle ();
+ // update handle
+ connect_handle = result->connect_handle ();
- if (rc != 0)
- return post_result (result, 1);
+ if (rc != 0)
+ return post_result (result, true);
- // Enqueue result we will wait for completion
+ // Enqueue result we will wait for completion
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
if (this->result_map_.bind (connect_handle, result) == -1)
{
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect:")
- ACE_LIB_TEXT("result map binding failed\n")));
+ ACE_LIB_TEXT ("%N:%l:%p\n"),
+ ACE_LIB_TEXT ("ACE_POSIX_Asynch_Connect::connect:")
+ ACE_LIB_TEXT ("bind")));
result->set_error (EFAULT);
- return post_result (result, 1);
+ return post_result (result, true);
}
-
- this->task_lock_count_ ++;
}
ACE_Asynch_Pseudo_Task & task =
this->posix_proactor ()->get_asynch_pseudo_task ();
- int rc_task = task.register_io_handler (connect_handle,
- this,
- ACE_Event_Handler::CONNECT_MASK,
- 0); // not to suspend after register
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- this->task_lock_count_ --;
-
- int post_enable = 1;
-
- if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN &&
- this->task_lock_count_ == 0) // task is closing
- {
- post_enable = 0;
- task.unlock_finish ();
- }
-
- if (rc_task < 0)
+ rc = task.register_io_handler (connect_handle,
+ this,
+ ACE_Event_Handler::CONNECT_MASK,
+ 0); // don't suspend after register
+ if (rc < 0)
+ {
{
- ACE_POSIX_Asynch_Connect_Result *result = 0;
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
this->result_map_.unbind (connect_handle, result);
-
- if (result != 0)
- {
- result->set_error (EFAULT);
-
- return post_result (result, post_enable);
- }
}
- }
+ if (result != 0)
+ {
+ result->set_error (EFAULT);
+ this->post_result (result, true);
+ }
+ return -1;
+ }
+ else
+ result = 0;
+
return 0;
}
int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result * result,
- int post_enable)
+ bool post_enable)
{
- if (this->flg_open_ != 0 && post_enable != 0)
+ if (this->flg_open_ && post_enable != 0)
{
if (this->posix_proactor ()->post_completion (result) == 0)
- return 0 ;
+ return 0;
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT("Error:(%P | %t):%p\n"),
@@ -1481,7 +1378,7 @@ int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result * res
return -1;
}
-//@@ New method connect_i
+//connect_i
// return code :
// -1 errors before attempt to connect
// 0 connect started
@@ -1506,33 +1403,32 @@ ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result,
0);
// save it
result->connect_handle (handle);
-
if (handle == ACE_INVALID_HANDLE)
{
result->set_error (errno);
-
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
- ACE_LIB_TEXT(" ACE_OS::socket failed\n")),
- -1);
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
+ ACE_LIB_TEXT("socket")),
+ -1);
}
// Reuse the address
int one = 1;
- if (protocol_family != PF_UNIX &&
- reuse_addr != 0 &&
- ACE_OS::setsockopt (handle,
- SOL_SOCKET,
- SO_REUSEADDR,
- (const char*) &one,
- sizeof one) == -1 )
+ if (protocol_family != PF_UNIX &&
+ reuse_addr != 0 &&
+ ACE_OS::setsockopt (handle,
+ SOL_SOCKET,
+ SO_REUSEADDR,
+ (const char*) &one,
+ sizeof one) == -1 )
{
result->set_error (errno);
-
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
- ACE_LIB_TEXT(" ACE_OS::setsockopt failed\n")),
- -1);
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
+ ACE_LIB_TEXT("setsockopt")),
+ -1);
}
}
@@ -1544,11 +1440,11 @@ ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result,
if (ACE_OS::bind (handle, laddr, size) == -1)
{
result->set_error (errno);
-
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
- ACE_LIB_TEXT(" ACE_OS::bind failed\n")),
- -1);
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
+ ACE_LIB_TEXT("bind")),
+ -1);
}
}
@@ -1556,19 +1452,19 @@ ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result,
if (ACE::set_flags (handle, ACE_NONBLOCK) != 0)
{
result->set_error (errno);
-
ACE_ERROR_RETURN
((LM_ERROR,
- ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i, %p\n")
- ACE_LIB_TEXT("ACE::set_flags failed")),
+ ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n")
+ ACE_LIB_TEXT("set_flags")),
-1);
}
for (;;)
{
- int rc = ACE_OS::connect (handle,
- reinterpret_cast<sockaddr *> (remote_sap.get_addr ()),
- remote_sap.get_size ());
+ int rc = ACE_OS::connect
+ (handle,
+ reinterpret_cast<sockaddr *> (remote_sap.get_addr ()),
+ remote_sap.get_size ());
if (rc < 0) // failure
{
if (errno == EWOULDBLOCK || errno == EINPROGRESS)
@@ -1600,7 +1496,7 @@ ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result,
//
int
-ACE_POSIX_Asynch_Connect::cancel_uncompleted (int flg_notify,
+ACE_POSIX_Asynch_Connect::cancel_uncompleted (bool flg_notify,
ACE_Handle_Set & set)
{
ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel_uncompleted");
@@ -1634,47 +1530,30 @@ ACE_POSIX_Asynch_Connect::cancel (void)
{
ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel");
- //We are not really ACE_POSIX_Asynch_Operation
- //so we could not call ::aiocancel ()
- // or just write
- //return ACE_POSIX_Asynch_Operation::cancel ();
- //We delegate real cancelation to cancel_uncompleted (1)
+ // Since this is not a real asynch I/O operation, we can't just call
+ // ::aiocancel () or ACE_POSIX_Asynch_Operation::cancel ().
+ // Delegate real cancelation to cancel_uncompleted (1)
int rc = -1 ; // ERRORS
ACE_Handle_Set set;
-
+ int num_cancelled = 0;
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- int num_cancelled = cancel_uncompleted (flg_open_, set);
-
- if (num_cancelled == 0)
- rc = 1 ; // AIO_ALLDONE
- else if (num_cancelled > 0)
- rc = 0 ; // AIO_CANCELED
-
- if (this->flg_open_ == 0)
- return rc ;
-
- this->task_lock_count_++;
+ num_cancelled = cancel_uncompleted (flg_open_, set);
}
+ if (num_cancelled == 0)
+ rc = 1 ; // AIO_ALLDONE
+ else if (num_cancelled > 0)
+ rc = 0 ; // AIO_CANCELED
+
+ if (!this->flg_open_)
+ return rc ;
ACE_Asynch_Pseudo_Task & task =
this->posix_proactor ()->get_asynch_pseudo_task ();
- int rc_task = task.remove_io_handler (set);
-
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- this->task_lock_count_--;
-
- if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN &&
- this->task_lock_count_ == 0) // task is closing
- task.unlock_finish ();
- }
-
+ task.remove_io_handler (set);
return rc;
}
@@ -1684,67 +1563,39 @@ ACE_POSIX_Asynch_Connect::close (void)
ACE_TRACE ("ACE_POSIX_Asynch_Connect::close");
ACE_Handle_Set set ;
-
+ int num_cancelled = 0;
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- int num_cancelled = cancel_uncompleted (flg_open_, set);
-
- if (num_cancelled == 0 || this->flg_open_ == 0)
- {
- this->flg_open_ = 0;
- return 0;
- }
-
- this->task_lock_count_++;
+ num_cancelled = cancel_uncompleted (flg_open_, set);
}
+ if (num_cancelled == 0 || !this->flg_open_)
+ {
+ this->flg_open_ = false;
+ return 0;
+ }
+
ACE_Asynch_Pseudo_Task & task =
this->posix_proactor ()->get_asynch_pseudo_task ();
- int rc_task = task.remove_io_handler (set);
-
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- this->task_lock_count_--;
-
- if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN &&
- this->task_lock_count_ == 0) // task is closing
- task.unlock_finish ();
-
- this->flg_open_ = 0;
- }
+ task.remove_io_handler (set);
+ this->flg_open_ = false;
return 0;
}
int
-ACE_POSIX_Asynch_Connect::handle_exception (ACE_HANDLE fd)
-{
- ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_exception");
- return handle_input (fd);
-}
-
-int
-ACE_POSIX_Asynch_Connect::handle_input (ACE_HANDLE fd)
-{
- ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_input");
-
- return handle_input (fd);
-}
-
-int
ACE_POSIX_Asynch_Connect::handle_output (ACE_HANDLE fd)
{
ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_output");
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
-
ACE_POSIX_Asynch_Connect_Result* result = 0;
- if (this->result_map_.unbind (fd, result) != 0) // not found
- return -1;
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+ if (this->result_map_.unbind (fd, result) != 0) // not found
+ return -1;
+ }
int sockerror = 0 ;
int lsockerror = sizeof sockerror;
@@ -1774,36 +1625,18 @@ ACE_POSIX_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
{
ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_close");
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
-
ACE_Asynch_Pseudo_Task &task =
this->posix_proactor ()->get_asynch_pseudo_task ();
- if (task.is_active() == 0) // task is closing
- {
- if (this->flg_open_ !=0) // we are open
- {
- this->flg_open_ = 0;
-
- // it means other thread is waiting for reactor token_
- if (this->task_lock_count_ > 0)
- task.lock_finish ();
- }
-
- ACE_Handle_Set set;
- this->cancel_uncompleted (0, set);
-
- return 0;
- }
-
- // remove_io_handler() contains flag DONT_CALL
- // so it is save
task.remove_io_handler (fd);
ACE_POSIX_Asynch_Connect_Result* result = 0;
- if (this->result_map_.unbind (fd, result) != 0 ) // not found
- return -1;
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+ if (this->result_map_.unbind (fd, result) != 0) // not found
+ return -1;
+ }
result->set_bytes_transferred (0);
result->set_error (ECANCELED);
diff --git a/ace/POSIX_Asynch_IO.h b/ace/POSIX_Asynch_IO.h
index ebb4e9bdbb1..45bc94cc7dc 100644
--- a/ace/POSIX_Asynch_IO.h
+++ b/ace/POSIX_Asynch_IO.h
@@ -735,17 +735,9 @@ private:
/// on canceled AIO requests
int cancel_uncompleted (int flg_notify);
- /// 1 - Accept is registered in ACE_Asynch_Pseudo_Task
- /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task
- int flg_open_ ;
-
- /// To prevent ACE_Asynch_Pseudo_Task from deletion
- /// while we make a call to the ACE_Asynch_Pseudo_Task
- /// This is extra cost !!!
- /// we could avoid them if all applications will follow the rule:
- /// Proactor should be deleted only after deletion all
- /// AsynchOperation objects connected with it
- int task_lock_count_;
+ /// true - Accept is registered in ACE_Asynch_Pseudo_Task
+ /// false - Accept is deregisted in ACE_Asynch_Pseudo_Task
+ bool flg_open_ ;
/// Queue of Result pointers that correspond to all the pending
/// accept operations.
@@ -867,10 +859,10 @@ public:
void set_handle (ACE_HANDLE handle);
/// virtual from ACE_Event_Handler
- /// Called when accept event comes up on <listen_hanlde>
- int handle_input (ACE_HANDLE handle);
+ /// The default action on handle_input() and handle_exception is to
+ /// return -1. Since that's what we want to do, just reuse them.
+ /// handle_output(), however, is where successful connects are reported.
int handle_output (ACE_HANDLE handle);
- int handle_exception (ACE_HANDLE handle);
/// virtual from ACE_Event_Handler
int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) ;
@@ -881,31 +873,22 @@ private:
const ACE_Addr & local_sap,
int reuse_addr);
- int post_result (ACE_POSIX_Asynch_Connect_Result *result, int flg_post);
+ int post_result (ACE_POSIX_Asynch_Connect_Result *result, bool flg_post);
/// Cancel uncompleted connect operations.
/**
* @arg flg_notify Indicates whether or not we should send notification
- * about canceled accepts. If this is 0, don't send
- * notifications about canceled connects. If 1, notify
+ * about canceled accepts. If this is false, don't send
+ * notifications about canceled connects. If true, notify
* user about canceled connects according POSIX
* standards we should receive notifications on canceled
* AIO requests.
*/
- int cancel_uncompleted (int flg_notify, ACE_Handle_Set & set);
-
- int flg_open_ ;
- /// 1 - Connect is registered in ACE_Asynch_Pseudo_Task
- /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task
-
+ int cancel_uncompleted (bool flg_notify, ACE_Handle_Set &set);
- /// to prevent ACE_Asynch_Pseudo_Task from deletion
- /// while we make a call to the ACE_Asynch_Pseudo_Task
- /// This is extra cost !!!
- /// we could avoid them if all applications will follow the rule:
- /// Proactor should be deleted only after deletion all
- /// AsynchOperation objects connected with it
- int task_lock_count_;
+ bool flg_open_ ;
+ /// true - Connect is registered in ACE_Asynch_Pseudo_Task
+ /// false - Aceept is deregisted in ACE_Asynch_Pseudo_Task
typedef ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
MAP_MANAGER;
@@ -914,14 +897,12 @@ private:
typedef MAP_MANAGER::ITERATOR MAP_ITERATOR;
typedef MAP_MANAGER::ENTRY MAP_ENTRY;
- /// Map of Result pointers that correspond to all the <accept>'s
- /// pending.
+ /// Map of Result pointers that correspond to all the pending connects.
MAP_MANAGER result_map_;
- /// The lock to protect the result queue which is shared. The queue
+ /// The lock to protect the result map which is shared. The queue
/// is updated by main thread in the register function call and
- /// through the auxillary thread in the deregister fun. So let us
- /// mutex it.
+ /// through the auxillary thread in the asynch pseudo task.
ACE_SYNCH_MUTEX lock_;
};
diff --git a/ace/WIN32_Asynch_IO.cpp b/ace/WIN32_Asynch_IO.cpp
index 239d0c69388..5b5c2eb87f3 100644
--- a/ace/WIN32_Asynch_IO.cpp
+++ b/ace/WIN32_Asynch_IO.cpp
@@ -2352,13 +2352,11 @@ ACE_WIN32_Asynch_Connect::open (ACE_Handler::Proxy_Ptr &handler_proxy,
const void *completion_key,
ACE_Proactor *proactor)
{
- ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::open\n"));
-
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+ ACE_TRACE ("ACE_WIN32_Asynch_Connect::open");
// if we are already opened,
// we could not create a new handler without closing the previous
- if (this->flg_open_ != 0)
+ if (this->flg_open_)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::open:")
ACE_LIB_TEXT ("connector already open \n")),
@@ -2374,7 +2372,7 @@ ACE_WIN32_Asynch_Connect::open (ACE_Handler::Proxy_Ptr &handler_proxy,
//if (result == -1)
// return result;
- this->flg_open_ = 1;
+ this->flg_open_ = true;
return 0;
}
@@ -2388,98 +2386,78 @@ ACE_WIN32_Asynch_Connect::connect (ACE_HANDLE connect_handle,
int priority,
int signal_number)
{
- ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect\n"));
-
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+ ACE_TRACE ("ACE_WIN32_Asynch_Connect::connect");
- if (this->flg_open_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect")
- ACE_LIB_TEXT ("connector was not opened before\n")),
- -1);
+ if (!this->flg_open_)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect")
+ ACE_LIB_TEXT ("connector was not opened before\n")),
+ -1);
- // Common code for both WIN and WIN32.
- // Create future Asynch_Connect_Result
- ACE_WIN32_Asynch_Connect_Result *result = 0;
- ACE_NEW_RETURN (result,
- ACE_WIN32_Asynch_Connect_Result (this->handler_proxy_,
- connect_handle,
- act,
- this->win32_proactor_->get_handle (),
- priority,
- signal_number),
- -1);
+ // Common code for both WIN and WIN32.
+ // Create future Asynch_Connect_Result
+ ACE_WIN32_Asynch_Connect_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_WIN32_Asynch_Connect_Result (this->handler_proxy_,
+ connect_handle,
+ act,
+ this->win32_proactor_->get_handle (),
+ priority,
+ signal_number),
+ -1);
- int rc = connect_i (result,
- remote_sap,
- local_sap,
- reuse_addr);
+ int rc = connect_i (result,
+ remote_sap,
+ local_sap,
+ reuse_addr);
- // update handle
- connect_handle = result->connect_handle ();
+ // update handle
+ connect_handle = result->connect_handle ();
- if (rc != 0)
- return post_result (result, 1);
+ if (rc != 0)
+ return post_result (result, true);
- // Enqueue result we will wait for completion
+ // Enqueue result we will wait for completion
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
if (this->result_map_.bind (connect_handle, result) == -1)
{
ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect:")
- ACE_LIB_TEXT ("result map binding failed\n")));
+ ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect: %p\n"),
+ ACE_LIB_TEXT ("bind")));
result->set_error (EFAULT);
- return post_result (result, 1);
+ return post_result (result, true);
}
-
- this->task_lock_count_++;
}
ACE_Asynch_Pseudo_Task & task =
this->win32_proactor_->get_asynch_pseudo_task ();
- int rc_task = task.register_io_handler (connect_handle,
- this,
- ACE_Event_Handler::CONNECT_MASK,
- 0); // not to suspend after register
-
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- this->task_lock_count_--;
-
- int post_enable = 1;
-
- if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN &&
- this->task_lock_count_ == 0) // task is closing
- {
- post_enable = 0;
- task.unlock_finish ();
- }
-
- if (rc_task < 0)
+ if (-1 == task.register_io_handler (connect_handle,
+ this,
+ ACE_Event_Handler::CONNECT_MASK,
+ 0)) // not to suspend after register
+ {
+ result = 0;
{
- ACE_WIN32_Asynch_Connect_Result *result = 0;
-
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
this->result_map_.unbind (connect_handle, result);
-
- if (result != 0)
- {
- result->set_error (EFAULT);
-
- return post_result (result, post_enable);
- }
}
- }
+ if (result != 0)
+ {
+ result->set_error (EFAULT);
+ this->post_result (result, true);
+ }
+ }
return 0;
}
int ACE_WIN32_Asynch_Connect::post_result (ACE_WIN32_Asynch_Connect_Result * result,
- int post_enable)
+ bool post_enable)
{
- if (this->flg_open_ != 0 && post_enable != 0)
+ if (this->flg_open_ && post_enable)
{
if (this->win32_proactor_ ->post_completion (result) == 0)
return 0;
@@ -2500,7 +2478,7 @@ int ACE_WIN32_Asynch_Connect::post_result (ACE_WIN32_Asynch_Connect_Result * res
return -1;
}
-//@@ New method connect_i
+// connect_i
// return code :
// -1 errors before attempt to connect
// 0 connect started
@@ -2515,26 +2493,23 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result,
result->set_bytes_transferred (0);
ACE_HANDLE handle = result->connect_handle ();
-
if (handle == ACE_INVALID_HANDLE)
{
int protocol_family = remote_sap.get_type ();
-
handle = ACE_OS::socket (protocol_family,
SOCK_STREAM,
0);
// save it
result->connect_handle (handle);
-
if (handle == ACE_INVALID_HANDLE)
{
result->set_error (errno);
-
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ")
- ACE_LIB_TEXT (" ACE_OS::socket failed\n")),
- -1);
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect_i: %p\n"),
+ ACE_LIB_TEXT ("socket")),
+ -1);
}
// Reuse the address
@@ -2548,11 +2523,11 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result,
sizeof one) == -1)
{
result->set_error (errno);
-
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ")
- ACE_LIB_TEXT (" ACE_OS::setsockopt failed\n")),
- -1);
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect_i: %p\n"),
+ ACE_LIB_TEXT ("setsockopt")),
+ -1);
}
}
@@ -2560,35 +2535,34 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result,
{
sockaddr * laddr = reinterpret_cast<sockaddr *> (local_sap.get_addr ());
int size = local_sap.get_size ();
-
if (ACE_OS::bind (handle, laddr, size) == -1)
{
result->set_error (errno);
-
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: %p\n"),
- ACE_LIB_TEXT ("ACE_OS::bind")),
- -1);
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect_i: %p\n"),
+ ACE_LIB_TEXT ("bind")),
+ -1);
}
}
// set non blocking mode
-
if (ACE::set_flags (handle, ACE_NONBLOCK) != 0)
{
result->set_error (errno);
-
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ")
- ACE_LIB_TEXT (" ACE::set_flags failed\n")),
- -1);
+ ACE_ERROR_RETURN
+ ((LM_ERROR,
+ ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect_i: %p\n"),
+ ACE_LIB_TEXT ("set_flags")),
+ -1);
}
for (;;)
{
- int rc = ACE_OS::connect (handle,
- reinterpret_cast<sockaddr *> (remote_sap.get_addr ()),
- remote_sap.get_size ());
+ int rc = ACE_OS::connect
+ (handle,
+ reinterpret_cast<sockaddr *> (remote_sap.get_addr ()),
+ remote_sap.get_size ());
if (rc < 0) // failure
{
@@ -2607,7 +2581,7 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result,
}
-//@@ New method cancel_uncompleted
+// cancel_uncompleted
// It performs cancellation of all pending requests
//
// Parameter flg_notify can be
@@ -2620,14 +2594,14 @@ ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result,
//
int
-ACE_WIN32_Asynch_Connect::cancel_uncompleted (int flg_notify, ACE_Handle_Set & set)
+ACE_WIN32_Asynch_Connect::cancel_uncompleted (bool flg_notify,
+ ACE_Handle_Set &set)
{
- ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::cancel_uncompleted\n"));
+ ACE_TRACE ("ACE_WIN32_Asynch_Connect::cancel_uncompleted");
int retval = 0;
MAP_MANAGER::ITERATOR iter (result_map_);
-
MAP_MANAGER::ENTRY * me = 0;
set.reset ();
@@ -2652,118 +2626,83 @@ ACE_WIN32_Asynch_Connect::cancel_uncompleted (int flg_notify, ACE_Handle_Set & s
int
ACE_WIN32_Asynch_Connect::cancel (void)
{
- ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::cancel\n"));
-
- //We are not really ACE_WIN32_Asynch_Operation
- //so we could not call ::aiocancel ()
- // or just write
- //return ACE_WIN32_Asynch_Operation::cancel ();
- //We delegate real cancelation to cancel_uncompleted (1)
+ ACE_TRACE ("ACE_WIN32_Asynch_Connect::cancel");
int rc = -1 ; // ERRORS
ACE_Handle_Set set;
-
+ int num_cancelled = 0;
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
- int num_cancelled = cancel_uncompleted (flg_open_, set);
-
- if (num_cancelled == 0)
- rc = 1; // AIO_ALLDONE
- else if (num_cancelled > 0)
- rc = 0; // AIO_CANCELED
-
- if (this->flg_open_ == 0)
- return rc;
-
- this->task_lock_count_++;
+ num_cancelled = cancel_uncompleted (flg_open_, set);
}
+ if (num_cancelled == 0)
+ rc = 1; // AIO_ALLDONE
+ else if (num_cancelled > 0)
+ rc = 0; // AIO_CANCELED
+
+ if (!this->flg_open_)
+ return rc;
ACE_Asynch_Pseudo_Task & task =
this->win32_proactor_->get_asynch_pseudo_task ();
- int rc_task = task.remove_io_handler (set);
-
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- this->task_lock_count_--;
-
- if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN &&
- this->task_lock_count_ == 0) // task is closing
- task.unlock_finish ();
- }
-
+ task.remove_io_handler (set);
return rc;
}
int
ACE_WIN32_Asynch_Connect::close (void)
{
- ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::close\n"));
+ ACE_TRACE ("ACE_WIN32_Asynch_Connect::close");
ACE_Handle_Set set;
-
+ int num_cancelled = 0;
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
- int num_cancelled = cancel_uncompleted (flg_open_, set);
-
- if (num_cancelled == 0 || this->flg_open_ == 0)
- {
- this->flg_open_ = 0;
- return 0;
- }
-
- this->task_lock_count_++;
+ num_cancelled = cancel_uncompleted (flg_open_, set);
}
+ if (num_cancelled == 0 || this->flg_open_ == 0)
+ {
+ this->flg_open_ = false;
+ return 0;
+ }
ACE_Asynch_Pseudo_Task & task =
this->win32_proactor_->get_asynch_pseudo_task ();
- int rc_task = task.remove_io_handler (set);
-
- {
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
-
- this->task_lock_count_--;
-
- if (rc_task == -1 && ACE_OS::last_error () == ESHUTDOWN &&
- this->task_lock_count_ == 0) // task is closing
- task.unlock_finish ();
-
- this->flg_open_ = 0;
- }
-
+ task.remove_io_handler (set);
return 0;
}
int
ACE_WIN32_Asynch_Connect::handle_exception (ACE_HANDLE fd)
{
- ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_exception\n"));
+ ACE_TRACE ("ACE_WIN32_Asynch_Connect::handle_exception");
return handle_output (fd);
}
int
ACE_WIN32_Asynch_Connect::handle_input (ACE_HANDLE fd)
{
- ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_input\n"));
+ ACE_TRACE ("ACE_WIN32_Asynch_Connect::handle_input");
return handle_output (fd);
}
int
ACE_WIN32_Asynch_Connect::handle_output (ACE_HANDLE fd)
{
- ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_output\n"));
-
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+ ACE_TRACE ("ACE_WIN32_Asynch_Connect::handle_output");
ACE_WIN32_Asynch_Connect_Result* result = 0;
- if (this->result_map_.unbind (fd, result) != 0) // not found
- return -1;
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+ if (this->result_map_.unbind (fd, result) != 0) // not found
+ return -1;
+ }
int sockerror = 0 ;
int lsockerror = sizeof sockerror;
@@ -2791,38 +2730,19 @@ ACE_WIN32_Asynch_Connect::handle_output (ACE_HANDLE fd)
int
ACE_WIN32_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
{
- ACE_TRACE(ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_close\n"));
-
- ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+ ACE_TRACE ("ACE_WIN32_Asynch_Connect::handle_close");
ACE_Asynch_Pseudo_Task & task =
this->win32_proactor_->get_asynch_pseudo_task ();
-
- if (task.is_active () == 0) // task is closing
- {
- if (this->flg_open_ != 0) // we are open
- {
- this->flg_open_ = 0;
-
- // it means other thread is waiting for reactor token_
- if (this->task_lock_count_ > 0)
- task.lock_finish ();
- }
-
- ACE_Handle_Set set;
- this->cancel_uncompleted (0, set);
-
- return 0;
- }
-
- // remove_io_handler() contains flag DONT_CALL
- // so it is save
task.remove_io_handler (fd);
ACE_WIN32_Asynch_Connect_Result* result = 0;
- if (this->result_map_.unbind (fd, result) != 0) // not found
- return -1;
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+ if (this->result_map_.unbind (fd, result) != 0) // not found
+ return -1;
+ }
result->set_bytes_transferred (0);
result->set_error (ERROR_OPERATION_ABORTED);
diff --git a/ace/WIN32_Asynch_IO.h b/ace/WIN32_Asynch_IO.h
index 53e9f0ba873..3efd39417d2 100644
--- a/ace/WIN32_Asynch_IO.h
+++ b/ace/WIN32_Asynch_IO.h
@@ -1307,13 +1307,13 @@ private:
const ACE_Addr &local_sap,
int reuse_addr);
- int post_result (ACE_WIN32_Asynch_Connect_Result *result, int flg_post);
+ int post_result (ACE_WIN32_Asynch_Connect_Result *result, bool flg_post);
/// Cancel uncompleted connect operations.
/**
* @param flg_notify Indicates whether or not to send notification about
- * canceled connect operations. If 0, don't send
- * notifications. If 1, notify user about canceled
+ * canceled connect operations. If false, don't send
+ * notifications. If true, notify user about canceled
* connects.
* According WIN32 standards we should receive
* notifications on canceled AIO requests.
@@ -1323,36 +1323,22 @@ private:
* method. The contents of @a set are completely
* replaced.
*/
- int cancel_uncompleted (int flg_notify, ACE_Handle_Set & set);
+ int cancel_uncompleted (bool flg_notify, ACE_Handle_Set &set);
- /// 1 - Connect is registered in ACE_Asynch_Pseudo_Task
- /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task
- int flg_open_ ;
-
- /// To prevent ACE_Asynch_Pseudo_Task from deletion
- /// while we make a call to the ACE_Asynch_Pseudo_Task
- /// This is extra cost !!!
- /// we could avoid them if all applications will follow the rule:
- /// Proactor should be deleted only after deletion all
- /// AsynchOperation objects connected with it
- int task_lock_count_;
+ /// true - Connect is registered in ACE_Asynch_Pseudo_Task
+ /// false - Accept is deregisted in ACE_Asynch_Pseudo_Task
+ bool flg_open_ ;
typedef ACE_Map_Manager<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
MAP_MANAGER;
- // (Two) Deprecated typedefs. Use appropriate MAP_MANAGER traits
- // instead.
- typedef MAP_MANAGER::ITERATOR MAP_ITERATOR;
- typedef MAP_MANAGER::ENTRY MAP_ENTRY;
-
/// Map of Result pointers that correspond to all the <accept>'s
/// pending.
MAP_MANAGER result_map_;
- /// The lock to protect the result queue which is shared. The queue
+ /// The lock to protect the result map which is shared. The queue
/// is updated by main thread in the register function call and
- /// through the auxillary thread in the deregister fun. So let us
- /// mutex it.
+ /// through the auxillary thread in the asynch pseudo task.
ACE_SYNCH_MUTEX lock_;
};