summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Huston <shuston@riverace.com>2002-04-16 22:57:11 +0000
committerSteve Huston <shuston@riverace.com>2002-04-16 22:57:11 +0000
commitf97a745c44cb491dc1760c7f18dc42af5680b4be (patch)
tree65ede0b735a5470c152d5355013f89974c0430f5
parent0b97fde98e72ccf1364dc54aa14385de769f0af7 (diff)
downloadATCD-f97a745c44cb491dc1760c7f18dc42af5680b4be.tar.gz
ChangeLogTag:Tue Apr 16 18:42:39 2002 Steve Huston <shuston@riverace.com>
-rw-r--r--ChangeLog51
-rw-r--r--ChangeLogs/ChangeLog-02a51
-rw-r--r--ChangeLogs/ChangeLog-03a51
-rw-r--r--ace/Asynch_Connector.cpp297
-rw-r--r--ace/Asynch_Connector.h145
-rw-r--r--ace/Asynch_IO.cpp105
-rw-r--r--ace/Asynch_IO.h101
-rw-r--r--ace/Asynch_IO_Impl.cpp8
-rw-r--r--ace/Asynch_IO_Impl.h53
-rw-r--r--ace/Asynch_IO_Impl.i13
-rw-r--r--ace/Asynch_Pseudo_Task.cpp313
-rw-r--r--ace/Asynch_Pseudo_Task.h79
-rw-r--r--ace/Makefile2
-rw-r--r--ace/POSIX_Asynch_IO.cpp934
-rw-r--r--ace/POSIX_Asynch_IO.h317
-rw-r--r--ace/POSIX_Proactor.cpp52
-rw-r--r--ace/POSIX_Proactor.h27
-rw-r--r--ace/Proactor.cpp23
-rw-r--r--ace/Proactor.h19
-rw-r--r--ace/Proactor_Impl.h12
-rw-r--r--ace/SUN_Proactor.cpp4
-rw-r--r--ace/WIN32_Asynch_IO.cpp668
-rw-r--r--ace/WIN32_Asynch_IO.h226
-rw-r--r--ace/WIN32_Proactor.cpp46
-rw-r--r--ace/WIN32_Proactor.h20
25 files changed, 3226 insertions, 391 deletions
diff --git a/ChangeLog b/ChangeLog
index fd57322c2db..4ff9024aebe 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,44 @@
+Tue Apr 16 18:42:39 2002 Steve Huston <shuston@riverace.com>
+
+ New feature, ACE_Asynch_Connect, contributed by Alex Libman
+ <alibman@ihug.com.au>. Allows asynchronous connect using the
+ ACE Proactor framework. The new classes follow the same arrangement
+ as the existing ACE_Asynch_Accept framework.
+
+ * ace/Asynch_Connector.{h cpp}: New files
+
+ * ace/Asynch_IO.{h cpp}: Added new ACE_Asynch_Connect class and
+ its result. Added new method, ACE_Handler::handle_connect(), to
+ handle completion of asynch connect operations.
+
+ * ace/Asynch_IO_Impl.{h i cpp}: Added new classes
+ ACE_Asynch_Connect_Impl and ACE_Asynch_Connect_Result_Impl.
+
+ * ace/Asynch_Pseudo_Task.{h cpp}: Generalized task that handles
+ asynch emulation where needed, for example, asynch accept/connect.
+ Replaces the ACE_Asynch_Accept_Task and used for both accept/connect.
+
+ * ace/POSIX_Asynch_IO.{h cpp}: Removed ACE_POSIX_Asynch_Accept_Task
+ (subsumed by ACE_Asynch_Pseudo_Task, above) and add the
+ ACE_POSIX_Asynch_Connect and its Result class.
+
+ * ace/POSIX_Proactor.{h cpp}: Added asynch connect plumbing.
+
+ * ace/Proactor.{h cpp}: Added asynch connect support methods.
+
+ * ace/Proactor_Impl.h: Added create_asynch_connect[_result] methods.
+
+ * ace/SUN_Proactor.cpp: Change from asynch_accept_task to
+ asynch_pseudo_task.
+
+ * ace/WIN32_Asynch_IO.{h cpp}: Add new ACE_WIN32_Asynch_Connect and
+ _Result.
+
+ * ace/WIN32_Proactor.{h cpp}: Added new create_asynch_connect() and
+ create_asynch_connect_result() methods.
+
+ * ace/Makefile: Added Asynch_Connector, Asynch_Pseudo_Task
+
Tue Apr 16 11:49:00 2002 Ossama Othman <ossama@uci.edu>
* ace/Service_Templates.cpp:
@@ -77,10 +118,10 @@ Mon Apr 15 16:20:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl>
- Make it possible to get/set the timer queue based on pointers, the
get method with a & is still available, but is marked as deprecated
- Make the thr_id method const
- - When the timer queue is created by the ACE_Thread_Timer_Queue_Adapter
- then it is also deleted, if it is passed or set afterwards it isn't
- deleted by ACE_Thread_Timer_Queue_Adapter (just like in the
- ACE_Reactor).
+ - When the timer queue is created by the
+ ACE_Thread_Timer_Queue_Adapter then it is also deleted, if it is
+ passed or set afterwards it isn't deleted by
+ ACE_Thread_Timer_Queue_Adapter (just like in the ACE_Reactor).
Mon Apr 15 10:22:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl>
@@ -192,7 +233,7 @@ Fri Apr 12 19:15:39 2002 Steve Huston <shuston@riverace.com>
* ace/Asynch_Acceptor.cpp (parse_address): Set the entire address
(address and port) instead of just the IP address part.
- Thanks to Alex Libman <AlexL@rumblegroup.com> for this fix.
+ Thanks to Alex Libman <alibman@ihug.com.au> for this fix.
Fri Apr 12 18:00:41 UTC 2002 Don Hinton <dhinton@ieee.org>
diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a
index fd57322c2db..4ff9024aebe 100644
--- a/ChangeLogs/ChangeLog-02a
+++ b/ChangeLogs/ChangeLog-02a
@@ -1,3 +1,44 @@
+Tue Apr 16 18:42:39 2002 Steve Huston <shuston@riverace.com>
+
+ New feature, ACE_Asynch_Connect, contributed by Alex Libman
+ <alibman@ihug.com.au>. Allows asynchronous connect using the
+ ACE Proactor framework. The new classes follow the same arrangement
+ as the existing ACE_Asynch_Accept framework.
+
+ * ace/Asynch_Connector.{h cpp}: New files
+
+ * ace/Asynch_IO.{h cpp}: Added new ACE_Asynch_Connect class and
+ its result. Added new method, ACE_Handler::handle_connect(), to
+ handle completion of asynch connect operations.
+
+ * ace/Asynch_IO_Impl.{h i cpp}: Added new classes
+ ACE_Asynch_Connect_Impl and ACE_Asynch_Connect_Result_Impl.
+
+ * ace/Asynch_Pseudo_Task.{h cpp}: Generalized task that handles
+ asynch emulation where needed, for example, asynch accept/connect.
+ Replaces the ACE_Asynch_Accept_Task and used for both accept/connect.
+
+ * ace/POSIX_Asynch_IO.{h cpp}: Removed ACE_POSIX_Asynch_Accept_Task
+ (subsumed by ACE_Asynch_Pseudo_Task, above) and add the
+ ACE_POSIX_Asynch_Connect and its Result class.
+
+ * ace/POSIX_Proactor.{h cpp}: Added asynch connect plumbing.
+
+ * ace/Proactor.{h cpp}: Added asynch connect support methods.
+
+ * ace/Proactor_Impl.h: Added create_asynch_connect[_result] methods.
+
+ * ace/SUN_Proactor.cpp: Change from asynch_accept_task to
+ asynch_pseudo_task.
+
+ * ace/WIN32_Asynch_IO.{h cpp}: Add new ACE_WIN32_Asynch_Connect and
+ _Result.
+
+ * ace/WIN32_Proactor.{h cpp}: Added new create_asynch_connect() and
+ create_asynch_connect_result() methods.
+
+ * ace/Makefile: Added Asynch_Connector, Asynch_Pseudo_Task
+
Tue Apr 16 11:49:00 2002 Ossama Othman <ossama@uci.edu>
* ace/Service_Templates.cpp:
@@ -77,10 +118,10 @@ Mon Apr 15 16:20:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl>
- Make it possible to get/set the timer queue based on pointers, the
get method with a & is still available, but is marked as deprecated
- Make the thr_id method const
- - When the timer queue is created by the ACE_Thread_Timer_Queue_Adapter
- then it is also deleted, if it is passed or set afterwards it isn't
- deleted by ACE_Thread_Timer_Queue_Adapter (just like in the
- ACE_Reactor).
+ - When the timer queue is created by the
+ ACE_Thread_Timer_Queue_Adapter then it is also deleted, if it is
+ passed or set afterwards it isn't deleted by
+ ACE_Thread_Timer_Queue_Adapter (just like in the ACE_Reactor).
Mon Apr 15 10:22:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl>
@@ -192,7 +233,7 @@ Fri Apr 12 19:15:39 2002 Steve Huston <shuston@riverace.com>
* ace/Asynch_Acceptor.cpp (parse_address): Set the entire address
(address and port) instead of just the IP address part.
- Thanks to Alex Libman <AlexL@rumblegroup.com> for this fix.
+ Thanks to Alex Libman <alibman@ihug.com.au> for this fix.
Fri Apr 12 18:00:41 UTC 2002 Don Hinton <dhinton@ieee.org>
diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a
index fd57322c2db..4ff9024aebe 100644
--- a/ChangeLogs/ChangeLog-03a
+++ b/ChangeLogs/ChangeLog-03a
@@ -1,3 +1,44 @@
+Tue Apr 16 18:42:39 2002 Steve Huston <shuston@riverace.com>
+
+ New feature, ACE_Asynch_Connect, contributed by Alex Libman
+ <alibman@ihug.com.au>. Allows asynchronous connect using the
+ ACE Proactor framework. The new classes follow the same arrangement
+ as the existing ACE_Asynch_Accept framework.
+
+ * ace/Asynch_Connector.{h cpp}: New files
+
+ * ace/Asynch_IO.{h cpp}: Added new ACE_Asynch_Connect class and
+ its result. Added new method, ACE_Handler::handle_connect(), to
+ handle completion of asynch connect operations.
+
+ * ace/Asynch_IO_Impl.{h i cpp}: Added new classes
+ ACE_Asynch_Connect_Impl and ACE_Asynch_Connect_Result_Impl.
+
+ * ace/Asynch_Pseudo_Task.{h cpp}: Generalized task that handles
+ asynch emulation where needed, for example, asynch accept/connect.
+ Replaces the ACE_Asynch_Accept_Task and used for both accept/connect.
+
+ * ace/POSIX_Asynch_IO.{h cpp}: Removed ACE_POSIX_Asynch_Accept_Task
+ (subsumed by ACE_Asynch_Pseudo_Task, above) and add the
+ ACE_POSIX_Asynch_Connect and its Result class.
+
+ * ace/POSIX_Proactor.{h cpp}: Added asynch connect plumbing.
+
+ * ace/Proactor.{h cpp}: Added asynch connect support methods.
+
+ * ace/Proactor_Impl.h: Added create_asynch_connect[_result] methods.
+
+ * ace/SUN_Proactor.cpp: Change from asynch_accept_task to
+ asynch_pseudo_task.
+
+ * ace/WIN32_Asynch_IO.{h cpp}: Add new ACE_WIN32_Asynch_Connect and
+ _Result.
+
+ * ace/WIN32_Proactor.{h cpp}: Added new create_asynch_connect() and
+ create_asynch_connect_result() methods.
+
+ * ace/Makefile: Added Asynch_Connector, Asynch_Pseudo_Task
+
Tue Apr 16 11:49:00 2002 Ossama Othman <ossama@uci.edu>
* ace/Service_Templates.cpp:
@@ -77,10 +118,10 @@ Mon Apr 15 16:20:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl>
- Make it possible to get/set the timer queue based on pointers, the
get method with a & is still available, but is marked as deprecated
- Make the thr_id method const
- - When the timer queue is created by the ACE_Thread_Timer_Queue_Adapter
- then it is also deleted, if it is passed or set afterwards it isn't
- deleted by ACE_Thread_Timer_Queue_Adapter (just like in the
- ACE_Reactor).
+ - When the timer queue is created by the
+ ACE_Thread_Timer_Queue_Adapter then it is also deleted, if it is
+ passed or set afterwards it isn't deleted by
+ ACE_Thread_Timer_Queue_Adapter (just like in the ACE_Reactor).
Mon Apr 15 10:22:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl>
@@ -192,7 +233,7 @@ Fri Apr 12 19:15:39 2002 Steve Huston <shuston@riverace.com>
* ace/Asynch_Acceptor.cpp (parse_address): Set the entire address
(address and port) instead of just the IP address part.
- Thanks to Alex Libman <AlexL@rumblegroup.com> for this fix.
+ Thanks to Alex Libman <alibman@ihug.com.au> for this fix.
Fri Apr 12 18:00:41 UTC 2002 Don Hinton <dhinton@ieee.org>
diff --git a/ace/Asynch_Connector.cpp b/ace/Asynch_Connector.cpp
new file mode 100644
index 00000000000..190a230f053
--- /dev/null
+++ b/ace/Asynch_Connector.cpp
@@ -0,0 +1,297 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef ACE_ASYNCH_CONNECTOR_C
+#define ACE_ASYNCH_CONNECTOR_C
+
+#include "ace/Asynch_Connector.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+ACE_RCSID(ace, Asynch_Connector, "$Id$")
+
+#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS)
+// This only works on platforms that support async I/O.
+
+#include "ace/Message_Block.h"
+#include "ace/INET_Addr.h"
+
+template <class HANDLER>
+const ACE_INET_Addr
+ACE_Asynch_Connector<HANDLER>::inet_addr_any_ = ACE_INET_Addr ( (unsigned short) 0, ACE_UINT32 (INADDR_ANY) );
+
+
+template <class HANDLER>
+ACE_Asynch_Connector<HANDLER>::ACE_Asynch_Connector (void)
+ : pass_addresses_ (0),
+ validate_new_connection_ (0)
+{
+}
+
+template <class HANDLER>
+ACE_Asynch_Connector<HANDLER>::~ACE_Asynch_Connector (void)
+{
+ //this->asynch_connect_.close ();
+}
+
+template <class HANDLER> int
+ACE_Asynch_Connector<HANDLER>::open (int pass_addresses,
+ ACE_Proactor *proactor,
+ int validate_new_connection)
+{
+ this->proactor (proactor);
+ this->pass_addresses_ = pass_addresses;
+ this->validate_new_connection_ = validate_new_connection;
+
+ // Initialize the ACE_Asynch_Connect
+ if (this->asynch_connect_.open (*this,
+ ACE_INVALID_HANDLE,
+ 0,
+ this->proactor ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Connect::open")),
+ -1);
+ return 0;
+}
+
+template <class HANDLER> int
+ACE_Asynch_Connector<HANDLER>::connect (const ACE_INET_Addr & remote_sap,
+ const ACE_INET_Addr & local_sap,
+ int reuse_addr,
+ const void *act)
+{
+ // Initiate asynchronous connect
+ if (this->asynch_connect_.connect (ACE_INVALID_HANDLE,
+ remote_sap,
+ local_sap,
+ reuse_addr,
+ act) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Connect::connect")),
+ -1);
+ return 0;
+}
+
+template <class HANDLER> void
+ACE_Asynch_Connector<HANDLER>::handle_connect (const ACE_Asynch_Connect::Result &result)
+{
+ // Variable for error tracking
+ int error = 0;
+
+ // If the asynchronous connect fails.
+ if (!result.success () ||
+ result.connect_handle () == ACE_INVALID_HANDLE)
+ {
+ error = 1;
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : Invalid handle")));
+ }
+
+ if (result.error () != 0)
+ {
+ error = 1;
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : result error=%d\n"),
+ ACE_static_cast (int, result.error ())));
+ }
+
+ // set blocking mode
+ if (!error &&
+ ACE::clr_flags (result.connect_handle (), ACE_NONBLOCK) != 0)
+ {
+ error = 1;
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : Set blocking mode")));
+ }
+
+ // Parse the addresses.
+ ACE_INET_Addr local_address;
+ ACE_INET_Addr remote_address;
+ if (!error &&
+ (this->validate_new_connection_ || this->pass_addresses_))
+ this->parse_address (result,
+ remote_address,
+ local_address);
+
+ // Validate remote address
+ if (!error &&
+ this->validate_new_connection_ &&
+ this->validate_new_connection (remote_address) == -1)
+ {
+ error = 1;
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : Address validation failed")));
+ }
+
+ HANDLER *new_handler = 0;
+ if (!error)
+ {
+ // The Template method
+ new_handler = this->make_handler ();
+ if (new_handler == 0)
+ {
+ error = 1;
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : Making of new handler failed")));
+ }
+ }
+
+ // If no errors
+ if (!error)
+ {
+ // Update the Proactor.
+ new_handler->proactor (this->proactor ());
+
+ // Pass the addresses
+ if (this->pass_addresses_)
+ new_handler->addresses (remote_address,
+ local_address);
+
+ // Pass the ACT
+ if (result.act () != 0)
+ new_handler->act (result.act ());
+
+ // Set up the handler's new handle value
+ new_handler->handle (result.connect_handle ());
+
+ ACE_Message_Block mb;
+
+ // Initiate the handler with empty message block;
+ new_handler->open (result.connect_handle (), mb);
+ }
+
+ // On failure, no choice but to close the socket
+ if (error &&
+ result.connect_handle() != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (result.connect_handle ());
+}
+
+template <class HANDLER> int
+ACE_Asynch_Connector<HANDLER>::validate_new_connection (const ACE_INET_Addr &remote_address)
+{
+ ACE_UNUSED_ARG (remote_address);
+
+ // Default implemenation always validates the remote address.
+ return 0;
+}
+
+template <class HANDLER> int
+ACE_Asynch_Connector<HANDLER>::cancel (void)
+{
+ return this->asynch_connect_.cancel ();
+}
+
+template <class HANDLER> void
+ACE_Asynch_Connector<HANDLER>::parse_address (const ACE_Asynch_Connect::Result &result,
+ ACE_INET_Addr &remote_address,
+ ACE_INET_Addr &local_address)
+{
+ // Getting the addresses.
+ sockaddr_in local_addr;
+ sockaddr_in remote_addr;
+
+ // Get the length.
+ int local_size = sizeof (local_addr);
+ int remote_size = sizeof (remote_addr);
+
+ // Get the local address.
+ if (ACE_OS::getsockname (result.connect_handle (),
+ ACE_reinterpret_cast (sockaddr *,
+ &local_addr),
+ &local_size) < 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT("%p\n"),
+ ACE_LIB_TEXT("ACE_Asynch_Connector::<getsockname> failed")));
+
+ // Get the remote address.
+ if (ACE_OS::getpeername (result.connect_handle (),
+ ACE_reinterpret_cast (sockaddr *,
+ &remote_addr),
+ &remote_size) < 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT("%p\n"),
+ ACE_LIB_TEXT("ACE_Asynch_Connector::<getpeername> failed")));
+
+ // Set the addresses.
+ local_address.set (&local_addr, local_size);
+ remote_address.set (&remote_addr, remote_size);
+
+#if 0
+ // @@ Just debugging.
+ char local_address_buf [BUFSIZ];
+ char remote_address_buf [BUFSIZ];
+
+ if (local_address.addr_to_string (local_address_buf,
+ sizeof local_address_buf) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "Error:%p:can't obtain local_address's address string"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Connector<HANDLER>::parse_address : "\
+ "Local address %s\n",
+ local_address_buf));
+
+ if (remote_address.addr_to_string (remote_address_buf,
+ sizeof remote_address_buf) == -1)
+ ACE_ERROR ((LM_ERROR,
+ "Error:%p:can't obtain remote_address's address string"));
+
+ ACE_DEBUG ((LM_DEBUG,
+ "ACE_Asynch_Connector<HANDLER>::parse_address : "\
+ "Remote address %s\n",
+ remote_address_buf));
+#endif /* 0 */
+
+ return;
+}
+
+
+template <class HANDLER> ACE_Asynch_Connect &
+ACE_Asynch_Connector<HANDLER>::asynch_connect (void)
+{
+ return this->asynch_connect_;
+}
+
+template <class HANDLER> HANDLER *
+ACE_Asynch_Connector<HANDLER>::make_handler (void)
+{
+ // Default behavior
+ HANDLER *handler = 0;
+ ACE_NEW_RETURN (handler, HANDLER, 0);
+ return handler;
+}
+
+template <class HANDLER> int
+ACE_Asynch_Connector<HANDLER>::pass_addresses (void) const
+{
+ return this->pass_addresses_;
+}
+
+template <class HANDLER> void
+ACE_Asynch_Connector<HANDLER>::pass_addresses (int new_value)
+{
+ this->pass_addresses_ = new_value;
+}
+
+template <class HANDLER> int
+ACE_Asynch_Connector<HANDLER>::validate_new_connection (void) const
+{
+ return this->validate_new_connection_;
+}
+
+template <class HANDLER> void
+ACE_Asynch_Connector<HANDLER>::validate_new_connection (int new_value)
+{
+ this->validate_new_connection_ = new_value;
+}
+
+#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS */
+#endif /* ACE_ASYNCH_CONNECTOR_C */
diff --git a/ace/Asynch_Connector.h b/ace/Asynch_Connector.h
new file mode 100644
index 00000000000..e652f2ee7f6
--- /dev/null
+++ b/ace/Asynch_Connector.h
@@ -0,0 +1,145 @@
+/* -*- C++ -*- */
+
+//=============================================================================
+/**
+ * @file Asynch_Connector.h
+ *
+ * $Id$
+ *
+ * @author Alexander Libman <alibman@ihug.com.au>
+ */
+//=============================================================================
+
+#ifndef ACE_ASYNCH_CONNECTOR_H
+#define ACE_ASYNCH_CONNECTOR_H
+#include "ace/pre.h"
+
+#include "ace/OS.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+# pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS)
+// This only works on platforms that support async i/o.
+
+#include "ace/Asynch_IO.h"
+#include "ace/INET_Addr.h"
+
+// Forward declarations
+class ACE_Message_Block;
+
+/**
+ * @class ACE_Asynch_Connector
+ *
+ * @brief This class is an example of the Connector pattern. This class
+ * will establish new connections and create new HANDLER objects to handle
+ * the new connections.
+ *
+ * Unlike the ACE_Connector, however, this class is designed to
+ * be used asynchronously with the ACE Proactor framework.
+ */
+
+template <class HANDLER>
+class ACE_Asynch_Connector : public ACE_Handler
+{
+public:
+
+ static const ACE_INET_Addr inet_addr_any_;
+
+ /// A do nothing constructor.
+ ACE_Asynch_Connector (void);
+
+ /// Virtual destruction
+ virtual ~ACE_Asynch_Connector (void);
+
+ /**
+ * This opens asynch connector
+ */
+ virtual int open (int pass_addresses = 0,
+ ACE_Proactor *proactor = 0,
+ int validate_new_connection = 0);
+
+ /// This initiates a new asynchronous connect
+ virtual int connect (const ACE_INET_Addr & remote_sap,
+ const ACE_INET_Addr & local_sap = inet_addr_any_,
+ int reuse_addr = 1,
+ const void *act = 0);
+
+ /**
+ * This cancels all pending accepts operations that were issued by
+ * the calling thread.
+ *
+ * @note On Windows, this method does not cancel connect operations
+ * issued by other threads.
+ *
+ * @note On POSIX, delegates cancelation to ACE_POSIX_Asynch_Connect.
+ */
+ virtual int cancel (void);
+
+ /**
+ * Template method for address validation.
+ *
+ * Default implemenation always validates the remote address.
+ */
+ virtual int validate_new_connection (const ACE_INET_Addr &remote_address);
+
+ //
+ // These are low level tweaking methods
+ //
+
+ /// Set and get flag that indicates if parsing and passing of
+ /// addresses to the service_handler is necessary.
+ virtual int pass_addresses (void) const;
+ virtual void pass_addresses (int new_value);
+
+ /// Set and get flag that indicates if address validation is
+ /// required.
+ virtual int validate_new_connection (void) const;
+ virtual void validate_new_connection (int new_value);
+
+protected:
+
+ /// This is called when an outstanding accept completes.
+ virtual void handle_connect (const ACE_Asynch_Connect::Result &result);
+
+
+ /// This parses the address from read buffer.
+ void parse_address (const ACE_Asynch_Connect::Result &result,
+ ACE_INET_Addr &remote_address,
+ ACE_INET_Addr &local_address);
+
+ /// Return the asynch Connect object.
+ ACE_Asynch_Connect & asynch_connect (void);
+
+ /**
+ * This is the template method used to create new handler.
+ * Subclasses must overwrite this method if a new handler creation
+ * strategy is required.
+ */
+ virtual HANDLER *make_handler (void);
+
+private:
+
+ /// Asynch_Connect used to make life easier :-)
+ ACE_Asynch_Connect asynch_connect_;
+
+ /// Flag that indicates if parsing of addresses is necessary.
+ int pass_addresses_;
+
+ /// Flag that indicates if address validation is required.
+ int validate_new_connection_;
+
+};
+
+#if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
+#include "ace/Asynch_Connector.cpp"
+#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
+
+#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA)
+#pragma implementation ("Asynch_Connector.cpp")
+#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
+
+#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS */
+#include "ace/post.h"
+#endif /* ACE_ASYNCH_CONNECTOR_H */
diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp
index 4fa2b269a90..06e5a952f5b 100644
--- a/ace/Asynch_IO.cpp
+++ b/ace/Asynch_IO.cpp
@@ -661,6 +661,106 @@ ACE_Asynch_Accept::Result::implementation (void) const
return this->implementation_;
}
+
+
+// *********************************************************************
+
+ACE_Asynch_Connect::ACE_Asynch_Connect (void)
+ : implementation_ (0)
+{
+}
+
+ACE_Asynch_Connect::~ACE_Asynch_Connect (void)
+{
+}
+
+int
+ACE_Asynch_Connect::open (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ const void *completion_key,
+ ACE_Proactor *proactor)
+{
+ // Get a proactor for/from the user.
+ proactor = this->get_proactor (proactor, handler);
+
+ // Delete the old implementation.
+ delete this->implementation_;
+ this->implementation_ = 0;
+
+ // Now let us get the implementation initialized.
+ ACE_Asynch_Connect_Impl *implementation = proactor->create_asynch_connect ();
+ if (implementation == 0)
+ return -1;
+
+ // Set the implementation class
+ this->implementation (implementation);
+
+ // Call the <open> method of the base class.
+ return ACE_Asynch_Operation::open (handler,
+ handle,
+ completion_key,
+ proactor);
+}
+
+int
+ACE_Asynch_Connect::connect (ACE_HANDLE connect_handle,
+ const ACE_Addr & remote_sap,
+ const ACE_Addr & local_sap,
+ int reuse_addr,
+ const void *act,
+ int priority,
+ int signal_number)
+{
+ return this->implementation ()->connect (connect_handle,
+ remote_sap,
+ local_sap,
+ reuse_addr,
+ act,
+ priority,
+ signal_number);
+}
+
+ACE_Asynch_Connect_Impl *
+ACE_Asynch_Connect::implementation (void) const
+{
+ return this->implementation_;
+}
+
+void
+ACE_Asynch_Connect::implementation (ACE_Asynch_Connect_Impl *implementation)
+{
+ this->implementation_ = implementation;
+ // Set the implementation in the base class.
+ ACE_Asynch_Operation::implementation (implementation);
+}
+
+// ************************************************************
+
+ACE_Asynch_Connect::Result::Result (ACE_Asynch_Connect_Result_Impl *implementation)
+ : ACE_Asynch_Result (implementation),
+ implementation_ (implementation)
+{
+}
+
+ACE_Asynch_Connect::Result::~Result (void)
+{
+ // Proactor will delete the implementation when the <complete> call
+ // completes.
+}
+
+ACE_HANDLE
+ACE_Asynch_Connect::Result::connect_handle (void) const
+{
+ return this->implementation ()->connect_handle ();
+}
+
+
+ACE_Asynch_Connect_Result_Impl *
+ACE_Asynch_Connect::Result::implementation (void) const
+{
+ return this->implementation_;
+}
+
// ************************************************************
ACE_Asynch_Transmit_File::ACE_Asynch_Transmit_File (void)
@@ -950,6 +1050,11 @@ ACE_Handler::handle_accept (const ACE_Asynch_Accept::Result & /* result */)
}
void
+ACE_Handler::handle_connect (const ACE_Asynch_Connect::Result & /* result */)
+{
+}
+
+void
ACE_Handler::handle_transmit_file (const ACE_Asynch_Transmit_File::Result & /* result */)
{
}
diff --git a/ace/Asynch_IO.h b/ace/Asynch_IO.h
index 96a6f99dc2b..6976955fc84 100644
--- a/ace/Asynch_IO.h
+++ b/ace/Asynch_IO.h
@@ -19,6 +19,7 @@
* @author Tim Harrison <harrison@cs.wustl.edu>
* @author Alexander Babu Arulanthu <alex@cs.wustl.edu>
* @author Roger Tragin <r.tragin@computer.org>
+ * @author Alexander Libman <alibman@ihug.com.au>
*/
//=============================================================================
@@ -41,6 +42,7 @@ class ACE_Proactor;
class ACE_Handler;
class ACE_Message_Block;
class ACE_INET_Addr;
+class ACE_Addr;
// Forward declarations
class ACE_Asynch_Result_Impl;
@@ -822,6 +824,102 @@ public:
ACE_Asynch_Accept_Result_Impl *implementation_;
};
};
+// Forward declarations
+class ACE_Asynch_Connect_Result_Impl;
+class ACE_Asynch_Connect_Impl;
+
+/**
+ * @class ACE_Asynch_Connect
+ *
+ * @brief This class is a factory for starting off asynchronous connects
+ * This class forwards all methods to its implementation class.
+ *
+ * Once @c open is called, multiple asynchronous connect operationss can
+ * started using this class. A ACE_Asynch_Connect::Result will
+ * be passed back to the associated ACE_Handler when the asynchronous connect
+ * completes through the ACE_Handler::handle_connect() callback.
+ */
+class ACE_Export ACE_Asynch_Connect : public ACE_Asynch_Operation
+{
+
+public:
+ /// A do nothing constructor.
+ ACE_Asynch_Connect (void);
+
+ /// Destructor.
+ virtual ~ACE_Asynch_Connect (void);
+
+ /**
+ * Initializes the factory with information which will be used with
+ * each asynchronous call.
+ *
+ * @note @arg handle is ignored and should be @c ACE_INVALID_HANDLE.
+ */
+ int open (ACE_Handler &handler,
+ ACE_HANDLE handle = ACE_INVALID_HANDLE,
+ const void *completion_key = 0,
+ ACE_Proactor *proactor = 0);
+
+ /**
+ * This starts off an asynchronous Connect.
+ */
+ int connect (ACE_HANDLE connect_handle,
+ const ACE_Addr & remote_sap,
+ const ACE_Addr & local_sap,
+ int reuse_addr,
+ const void *act=0,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
+
+ /// Return the underlying implementation class.
+ ACE_Asynch_Connect_Impl *implementation (void) const;
+
+protected:
+ /// Set the implementation class.
+ void implementation (ACE_Asynch_Connect_Impl *implementation);
+
+ /// Delegation/implementation class that all methods will be
+ /// forwarded to.
+ ACE_Asynch_Connect_Impl *implementation_;
+
+public:
+/**
+ * @class Result
+ *
+ * @brief This is that class which will be passed back to the
+ * handler when the asynchronous connect completes.
+ *
+ * This class has all the information necessary for the
+ * handler to uniquely identify the completion of the
+ * asynchronous connect.
+ */
+ class ACE_Export Result : public ACE_Asynch_Result
+ {
+
+ /// The concrete implementation result classes only construct this
+ /// class.
+ friend class ACE_POSIX_Asynch_Connect_Result;
+ friend class ACE_WIN32_Asynch_Connect_Result;
+
+ public:
+
+ /// I/O handle for the connection.
+ ACE_HANDLE connect_handle (void) const;
+
+ /// Get the implementation.
+ ACE_Asynch_Connect_Result_Impl *implementation (void) const;
+
+ protected:
+ /// Contructor. Implementation will not be deleted.
+ Result (ACE_Asynch_Connect_Result_Impl *implementation);
+
+ /// Destructor.
+ virtual ~Result (void);
+
+ /// Impelmentation class.
+ ACE_Asynch_Connect_Result_Impl *implementation_;
+ };
+};
// Forward declarations
class ACE_Asynch_Transmit_File_Result_Impl;
@@ -1345,6 +1443,9 @@ public:
/// This method will be called when an asynchronous accept completes.
virtual void handle_accept (const ACE_Asynch_Accept::Result &result);
+ /// This method will be called when an asynchronous connect completes.
+ virtual void handle_connect (const ACE_Asynch_Connect::Result &result);
+
/// This method will be called when an asynchronous transmit file
/// completes.
virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result);
diff --git a/ace/Asynch_IO_Impl.cpp b/ace/Asynch_IO_Impl.cpp
index e3796fead53..8f26d82ae4b 100644
--- a/ace/Asynch_IO_Impl.cpp
+++ b/ace/Asynch_IO_Impl.cpp
@@ -55,10 +55,18 @@ ACE_Asynch_Accept_Result_Impl::~ACE_Asynch_Accept_Result_Impl (void)
{
}
+ACE_Asynch_Connect_Result_Impl::~ACE_Asynch_Connect_Result_Impl (void)
+{
+}
+
ACE_Asynch_Accept_Impl::~ACE_Asynch_Accept_Impl (void)
{
}
+ACE_Asynch_Connect_Impl::~ACE_Asynch_Connect_Impl (void)
+{
+}
+
ACE_Asynch_Transmit_File_Impl::~ACE_Asynch_Transmit_File_Impl (void)
{
}
diff --git a/ace/Asynch_IO_Impl.h b/ace/Asynch_IO_Impl.h
index b49a827d8ba..8fcee5b7668 100644
--- a/ace/Asynch_IO_Impl.h
+++ b/ace/Asynch_IO_Impl.h
@@ -16,6 +16,7 @@
* @author Tim Harrison (harrison@cs.wustl.edu)
* @author Alexander Babu Arulanthu <alex@cs.wustl.edu>
* @author Roger Tragin <r.tragin@computer.org>
+ * @author Alexander Libman <alibman@ihug.com.au>
*/
//=============================================================================
@@ -439,6 +440,58 @@ protected:
ACE_Asynch_Accept_Result_Impl (void);
};
+
+/**
+ * @class ACE_Asynch_Connect_Impl
+ *
+ * @brief Abstract base class for all the concrete implementation
+ * classes that provide different implementations for the
+ * ACE_Asynch_Connect.
+ *
+ */
+class ACE_Export ACE_Asynch_Connect_Impl : public virtual ACE_Asynch_Operation_Impl
+{
+public:
+ virtual ~ACE_Asynch_Connect_Impl (void);
+
+ /**
+ * This starts off an asynchronous connect
+ */
+ virtual int connect (ACE_HANDLE connect_handle,
+ const ACE_Addr & remote_sap,
+ const ACE_Addr & local_sap,
+ int reuse_addr,
+ const void *act,
+ int priority,
+ int signal_number) = 0;
+
+protected:
+ /// Do-nothing constructor.
+ ACE_Asynch_Connect_Impl (void);
+};
+
+/**
+ * @class ACE_Asynch_Connect_Result_Impl
+ *
+ * @brief Abstract base class for all the concrete implementation
+ * classes that provide different implementations for the
+ * ACE_Asynch_Connect.
+ *
+ */
+class ACE_Export ACE_Asynch_Connect_Result_Impl : public virtual ACE_Asynch_Result_Impl
+{
+public:
+ virtual ~ACE_Asynch_Connect_Result_Impl (void);
+
+ /// I/O handle for the connection.
+ virtual ACE_HANDLE connect_handle (void) const = 0;
+
+protected:
+ /// Do-nothing constructor.
+ ACE_Asynch_Connect_Result_Impl (void);
+};
+
+
/**
* @class ACE_Asynch_Transmit_File_Impl
*
diff --git a/ace/Asynch_IO_Impl.i b/ace/Asynch_IO_Impl.i
index a6c36925943..3d61fae7942 100644
--- a/ace/Asynch_IO_Impl.i
+++ b/ace/Asynch_IO_Impl.i
@@ -75,6 +75,19 @@ ACE_Asynch_Accept_Result_Impl::ACE_Asynch_Accept_Result_Impl (void)
}
ACE_INLINE
+ACE_Asynch_Connect_Impl::ACE_Asynch_Connect_Impl (void)
+ : ACE_Asynch_Operation_Impl ()
+{
+}
+
+ACE_INLINE
+ACE_Asynch_Connect_Result_Impl::ACE_Asynch_Connect_Result_Impl (void)
+ : ACE_Asynch_Result_Impl ()
+{
+}
+
+
+ACE_INLINE
ACE_Asynch_Transmit_File_Impl::ACE_Asynch_Transmit_File_Impl (void)
: ACE_Asynch_Operation_Impl ()
{
diff --git a/ace/Asynch_Pseudo_Task.cpp b/ace/Asynch_Pseudo_Task.cpp
new file mode 100644
index 00000000000..b00450da517
--- /dev/null
+++ b/ace/Asynch_Pseudo_Task.cpp
@@ -0,0 +1,313 @@
+// $Id$
+
+#include "ace/Asynch_Pseudo_Task.h"
+
+ACE_RCSID(ace, Asynch_Pseudo_Task, "$Id$")
+
+ACE_Asynch_Pseudo_Task::ACE_Asynch_Pseudo_Task()
+ : flg_active_ (0),
+ select_reactor_ (), // should be initialized before reactor_
+ reactor_ (&select_reactor_, 0), // don't delete implementation
+ token_ (select_reactor_.lock ()), // we can use reactor token
+ finish_count_ (0)
+{
+}
+
+ACE_Asynch_Pseudo_Task::~ACE_Asynch_Pseudo_Task()
+{
+ stop();
+}
+
+int
+ACE_Asynch_Pseudo_Task::is_active (void)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+ return flg_active_;
+}
+
+int
+ACE_Asynch_Pseudo_Task::start (void)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+
+ if (this->flg_active_)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::start already started")),
+ -1);
+
+ if (this->reactor_.initialized () == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::start reactor is not initialized")),
+ -1);
+
+
+ if (this->activate (THR_NEW_LWP | THR_JOINABLE, 1) != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::start failed")),
+ -1);
+
+ this->flg_active_ = 1;
+ return 0;
+}
+
+int
+ACE_Asynch_Pseudo_Task::stop (void)
+{
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+
+ if (this->flg_active_ == 0) // already stopped
+ return 0;
+
+ reactor_.end_reactor_event_loop ();
+ }
+
+ int rc = this->wait ();
+
+ if (rc != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:%p\n"),
+ ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::stop failed")),
+ -1);
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+ this->flg_active_ = 0;
+
+ if (this->reactor_.initialized ())
+ this->reactor_.close ();
+
+ while (finish_count_ > 0)
+ {
+ ace_mon.release ();
+ finish_event_.wait ();
+
+ ace_mon.acquire ();
+ finish_event_.reset ();
+ }
+ }
+
+ return rc;
+}
+
+int
+ACE_Asynch_Pseudo_Task::lock_finish (void)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+ finish_count_ ++;
+ return 0;
+}
+
+int
+ACE_Asynch_Pseudo_Task::unlock_finish (void)
+{
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+
+ --finish_count_;
+ finish_event_.signal ();
+
+ return 0;
+}
+
+int
+ACE_Asynch_Pseudo_Task::svc (void)
+{
+#if !defined (ACE_WIN32)
+
+ sigset_t RT_signals;
+
+ if (sigemptyset (&RT_signals) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("Error:(%P | %t):%p\n"),
+ ACE_LIB_TEXT ("sigemptyset failed")));
+
+ int member = 0;
+
+ for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
+ {
+ member = sigismember (& RT_signals , si);
+ if (member == 1)
+ {
+ sigaddset (&RT_signals, si);
+ }
+ }
+
+ if (ACE_OS::pthread_sigmask (SIG_BLOCK, &RT_signals, 0) != 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("Error:(%P | %t):%p\n"),
+ ACE_LIB_TEXT ("pthread_sigmask failed")));
+#endif
+
+ reactor_.owner (ACE_Thread::self());
+
+ reactor_.run_reactor_event_loop ();
+
+ return 0;
+}
+
+
+
+int
+ACE_Asynch_Pseudo_Task::register_io_handler (ACE_HANDLE handle,
+ ACE_Event_Handler *handler,
+ ACE_Reactor_Mask mask,
+ int flg_suspend)
+{
+ // Return codes :
+ // 0 success
+ // -1 reactor errors
+ // -2 task not active
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+
+ if (this->flg_active_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::register_io_handler \n")
+ ACE_LIB_TEXT ("task not active \n")),
+ -2);
+
+ // Register the handler with the reactor.
+ int retval = this->reactor_.register_handler (handle, handler, mask);
+
+ if (retval == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::register_io_handler \n")
+ ACE_LIB_TEXT ("register_handler failed \n")),
+ -1);
+
+ if (flg_suspend == 0 )
+ return 0;
+
+ // Suspend the <handle> now. Enable only when the <accept> is issued
+ // by the application.
+ retval = this->reactor_.suspend_handler (handle);
+
+ if (retval == -1)
+ {
+ this->reactor_.remove_handler (handle,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::register_io_handler \n")
+ ACE_LIB_TEXT ("suspend_handler failed \n")),
+ -1);
+ }
+
+ return 0;
+}
+
+int
+ACE_Asynch_Pseudo_Task::remove_io_handler (ACE_HANDLE handle)
+{
+ // Return codes :
+ // 0 success
+ // -1 reactor errors
+ // -2 task not active
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+
+ if (this->flg_active_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::remove_io_handler \n")
+ ACE_LIB_TEXT ("task not active \n")),
+ -2);
+
+ int retval =
+ this->reactor_.remove_handler (handle ,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
+ if (retval == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::remove_io_handler \n")
+ ACE_LIB_TEXT ("remove_handler failed \n")),
+ -1);
+
+ return 0;
+}
+
+int
+ACE_Asynch_Pseudo_Task::remove_io_handler (ACE_Handle_Set &set)
+{
+ // Return codes :
+ // 0 success
+ // -1 reactor errors
+ // -2 task not active
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+
+ if (this->flg_active_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::remove_io_handler \n")
+ ACE_LIB_TEXT ("task not active \n")),
+ -2);
+
+ int retval =
+ this->reactor_.remove_handler (set ,
+ ACE_Event_Handler::ALL_EVENTS_MASK
+ | ACE_Event_Handler::DONT_CALL);
+ if (retval == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::remove_io_handler \n")
+ ACE_LIB_TEXT ("remove_handler failed \n")),
+ -1);
+
+ return 0;
+}
+
+int
+ACE_Asynch_Pseudo_Task::suspend_io_handler (ACE_HANDLE handle)
+{
+ // Return codes :
+ // 0 success
+ // -1 reactor errors
+ // -2 task not active
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+
+ if (this->flg_active_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::suspend_io_handler \n")
+ ACE_LIB_TEXT ("task not active \n")),
+ -2);
+
+ int retval = this->reactor_.suspend_handler (handle);
+
+ if (retval == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::suspend_io_handler \n")
+ ACE_LIB_TEXT ("suspend_handler failed \n")),
+ -1);
+
+ return 0;
+}
+
+int
+ACE_Asynch_Pseudo_Task::resume_io_handler (ACE_HANDLE handle)
+{
+ // Return codes :
+ // 0 success
+ // -1 reactor errors
+ // -2 task not active
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+
+ if (this->flg_active_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::resume_io_handler \n")
+ ACE_LIB_TEXT ("task not active \n")),
+ -2);
+
+ int retval = this->reactor_.resume_handler (handle);
+
+ if (retval == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::resume_io_handler \n")
+ ACE_LIB_TEXT ("resume_handler failed \n")),
+ -1);
+
+ return 0;
+}
diff --git a/ace/Asynch_Pseudo_Task.h b/ace/Asynch_Pseudo_Task.h
new file mode 100644
index 00000000000..66d1866a664
--- /dev/null
+++ b/ace/Asynch_Pseudo_Task.h
@@ -0,0 +1,79 @@
+/* -*- C++ -*- */
+
+//=============================================================================
+/**
+ * @file Asynch_Pseudo_Task.h
+ *
+ * $Id$
+ *
+ * @author Alexander Libman <alibman@ihug.com.au>
+ */
+//=============================================================================
+
+#ifndef ACE_ASYNCH_PSEUDO_TASK_H
+#define ACE_ASYNCH_PSEUDO_TASK_H
+#include "ace/pre.h"
+
+#include "ace/OS.h"
+
+#if !defined (ACE_LACKS_PRAGMA_ONCE)
+#pragma once
+#endif /* ACE_LACKS_PRAGMA_ONCE */
+
+#include "ace/Reactor.h"
+#include "ace/Select_Reactor.h"
+#include "ace/Task.h"
+
+
+/**
+ * @class ACE_Asynch_Pseudo_Task
+ *
+ */
+class ACE_Export ACE_Asynch_Pseudo_Task : public ACE_Task<ACE_SYNCH>
+{
+ friend class ACE_POSIX_Asynch_Accept;
+ friend class ACE_POSIX_Asynch_Connect;
+ friend class ACE_WIN32_Asynch_Connect;
+
+public:
+
+ ACE_Asynch_Pseudo_Task();
+ virtual ~ACE_Asynch_Pseudo_Task();
+
+ int start (void);
+ int stop (void);
+
+ virtual int svc (void);
+
+ int is_active (void);
+
+ int register_io_handler (ACE_HANDLE handle,
+ ACE_Event_Handler *handler,
+ ACE_Reactor_Mask mask,
+ int flg_suspend);
+
+ int remove_io_handler (ACE_HANDLE handle);
+ int remove_io_handler (ACE_Handle_Set &set);
+ int resume_io_handler (ACE_HANDLE handle);
+ int suspend_io_handler (ACE_HANDLE handle);
+
+protected:
+
+ int lock_finish (void);
+ int unlock_finish (void);
+
+ int flg_active_;
+
+ ACE_Select_Reactor select_reactor_;
+ // should be initialized before reactor_
+
+ ACE_Reactor reactor_;
+
+ ACE_Lock &token_;
+
+ int finish_count_;
+ ACE_Manual_Event finish_event_;
+};
+
+#include "ace/post.h"
+#endif /* ACE_ASYNCH_PSEUDO_TASK_H */
diff --git a/ace/Makefile b/ace/Makefile
index c3bf0d7fd43..9ebf7fce847 100644
--- a/ace/Makefile
+++ b/ace/Makefile
@@ -109,6 +109,7 @@ DEMUX_FILES = \
CONNECTION_FILES = \
Asynch_IO \
Asynch_IO_Impl \
+ Asynch_Pseudo_Task \
POSIX_Asynch_IO \
WIN32_Asynch_IO
SOCKETS_FILES = \
@@ -252,6 +253,7 @@ TEMPLATE_FILES = \
Unbounded_Set \
Unbounded_Queue \
Asynch_Acceptor \
+ Asynch_Connector \
Auto_IncDec_T \
Auto_Ptr \
Based_Pointer_T \
diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp
index dad2e902dd6..4195d339830 100644
--- a/ace/POSIX_Asynch_IO.cpp
+++ b/ace/POSIX_Asynch_IO.cpp
@@ -8,7 +8,7 @@
#include "ace/Proactor.h"
#include "ace/Message_Block.h"
#include "ace/INET_Addr.h"
-#include "ace/Task_T.h"
+#include "ace/Asynch_Pseudo_Task.h"
#include "ace/POSIX_Proactor.h"
#if !defined (__ACE_INLINE__)
@@ -1120,6 +1120,7 @@ ACE_POSIX_Asynch_Write_File::proactor (void) const
// *********************************************************************
+
u_long
ACE_POSIX_Asynch_Accept_Result::bytes_to_read (void) const
{
@@ -1273,6 +1274,7 @@ ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_AIOCB_Proactor * pos
ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void)
{
this->close ();
+ this->reactor(0); // to avoid purge_pending_notifications
}
ACE_Proactor *
@@ -1302,19 +1304,19 @@ ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler,
{
ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::open\n"));
- int result = 0;
+ int result=0;
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
- // If we are already opened, we could not create a new handler
- // without closing the previous.
+ // if we are already opened,
+ // we could not create a new handler without closing the previous
if (this->flg_open_ != 0)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::open:")
ACE_LIB_TEXT("acceptor already open \n")),
-1);
-
+
result = ACE_POSIX_Asynch_Operation::open (handler,
handle,
completion_key,
@@ -1326,24 +1328,24 @@ ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler,
task_lock_count_++;
- // At this moment asynch_accept_task does not know about us, so we
- // can lock task's token with our lock_ locked. In all other cases
- // we should release our lock_ before calling task's methods to
- // avoid deadlock
-
- ACE_POSIX_Asynch_Accept_Task & task =
- this->posix_proactor()->get_asynch_accept_task();
-
- result = task.register_acceptor (this, ACE_Event_Handler::ACCEPT_MASK);
+ // At this moment asynch_accept_task does not know about us,
+ // so we can lock task's token with our lock_ locked.
+ // In all other cases we should release our lock_ before
+ // calling task's methods to avoid deadlock
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor()->get_asynch_pseudo_task();
+
+ result = task.register_io_handler (this->get_handle(),
+ this,
+ ACE_Event_Handler::ACCEPT_MASK,
+ 1); // suspend after register
task_lock_count_-- ;
if (result < 0)
{
-
this->flg_open_= 0;
this->handle_ = ACE_INVALID_HANDLE;
-
return -1 ;
}
@@ -1352,22 +1354,22 @@ ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler,
int
ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
- u_long bytes_to_read,
- ACE_HANDLE accept_handle,
- const void *act,
- int priority,
- int signal_number)
+ u_long bytes_to_read,
+ ACE_HANDLE accept_handle,
+ const void *act,
+ int priority,
+ int signal_number)
{
- ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::accept\n") );
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::accept\n"));
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
- if (this->flg_open_ == 0 )
+ if (this->flg_open_ == 0)
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept")
- ACE_LIB_TEXT("acceptor was not opened before\n")),
- -1);
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept")
+ ACE_LIB_TEXT("acceptor was not opened before\n")),
+ -1);
// Sanity check: make sure that enough space has been allocated by
// the caller.
@@ -1380,7 +1382,7 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
if (available_space < space_needed)
ACE_ERROR_RETURN ((LM_ERROR,
ACE_LIB_TEXT ("Buffer too small\n")),
- -1);
+ -1);
// Common code for both WIN and POSIX.
// Create future Asynch_Accept_Result
@@ -1400,14 +1402,14 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
// Enqueue result
if (this->result_queue_.enqueue_tail (result) == -1)
{
- delete result; // to avoid memory leak
+ delete result; // to avoid memory leak
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept:")
- ACE_LIB_TEXT("enqueue accept call failed\n")),
- -1);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept:")
+ ACE_LIB_TEXT("enqueue accept call failed\n")),
+ -1);
}
-
+
if (this->result_queue_.size () > 1)
return 0;
@@ -1417,10 +1419,10 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
// If this is the only item, then it means there the set was empty
// before. So enable the <handle> in the reactor.
- ACE_POSIX_Asynch_Accept_Task & task =
- this->posix_proactor()->get_asynch_accept_task();
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
- int rc_task = task.resume_acceptor (this);
+ int rc_task = task.resume_io_handler (this->get_handle());
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
@@ -1428,8 +1430,7 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
task_lock_count_ --;
if (rc_task == -2 && task_lock_count_ == 0) // task is closing
- task.unlock_finish ();
-
+ task.unlock_finish ();
}
if (rc_task < 0)
@@ -1458,31 +1459,31 @@ ACE_POSIX_Asynch_Accept::cancel_uncompleted (int flg_notify)
int retval = 0;
for (; ; retval++)
- {
- ACE_POSIX_Asynch_Accept_Result* result = 0;
-
- this->result_queue_.dequeue_head (result);
+ {
+ ACE_POSIX_Asynch_Accept_Result* result = 0;
- if (result == 0)
- break;
+ this->result_queue_.dequeue_head (result);
- if (this->flg_open_==0 || flg_notify == 0) //if we should not notify
- delete result ; // we have to delete result
- else //else notify as any cancelled AIO
- {
- // Store the new handle.
- result->aio_fildes = ACE_INVALID_HANDLE ;
- result->set_bytes_transferred (0);
- result->set_error (ECANCELED);
+ if (result == 0)
+ break;
- if (this->posix_proactor()->post_completion (result) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT("Error:(%P | %t):%p\n"),
- ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::")
- ACE_LIB_TEXT("cancel_uncompleted:<post_completion> failed")
- ));
+ if (this->flg_open_ == 0 || flg_notify == 0) //if we should not notify
+ delete result ; // we have to delete result
+ else //else notify as any cancelled AIO
+ {
+ // Store the new handle.
+ result->aio_fildes = ACE_INVALID_HANDLE ;
+ result->set_bytes_transferred (0);
+ result->set_error (ECANCELED);
+
+ if (this->posix_proactor ()->post_completion (result) == -1)
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT("Error:(%P | %t):%p\n"),
+ ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::")
+ ACE_LIB_TEXT("cancel_uncompleted:<post_completion> failed")
+ ));
+ }
}
- }
return retval;
}
@@ -1497,7 +1498,7 @@ ACE_POSIX_Asynch_Accept::cancel (void)
//return ACE_POSIX_Asynch_Operation::cancel ();
//We delegate real cancelation to cancel_uncompleted (1)
- int rc = -1 ; // ERRORS
+ int rc = -1 ; // ERRORS
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
@@ -1511,14 +1512,14 @@ ACE_POSIX_Asynch_Accept::cancel (void)
if (this->flg_open_ == 0)
return rc ;
-
- task_lock_count_ ++;
+
+ task_lock_count_++;
}
- ACE_POSIX_Asynch_Accept_Task & task =
- this->posix_proactor()->get_asynch_accept_task();
-
- int rc_task = task.suspend_acceptor (this);
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
+
+ int rc_task = task.suspend_io_handler (this->get_handle());
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
@@ -1527,7 +1528,6 @@ ACE_POSIX_Asynch_Accept::cancel (void)
if (rc_task == -2 && task_lock_count_ == 0) // task is closing
task.unlock_finish ();
-
}
return rc;
@@ -1539,7 +1539,7 @@ ACE_POSIX_Asynch_Accept::close ()
ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::close\n"));
// 1. It performs cancellation of all pending requests
- // 2. Removes itself from Reactor (ACE_POSIX_Asynch_Accept_Task)
+ // 2. Removes itself from Reactor ( ACE_Asynch_Pseudo_Task)
// 3. close the socket
//
// Parameter flg_notify can be
@@ -1551,7 +1551,6 @@ ACE_POSIX_Asynch_Accept::close ()
// Return codes : 0 - OK ,
// -1 - Errors
-
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
@@ -1562,7 +1561,7 @@ ACE_POSIX_Asynch_Accept::close ()
if (this->handle_ != ACE_INVALID_HANDLE)
{
ACE_OS::closesocket (this->handle_);
- this->handle_=ACE_INVALID_HANDLE;
+ this->handle_ = ACE_INVALID_HANDLE;
}
return 0;
}
@@ -1570,10 +1569,10 @@ ACE_POSIX_Asynch_Accept::close ()
task_lock_count_++;
}
- ACE_POSIX_Asynch_Accept_Task & task =
- this->posix_proactor()->get_asynch_accept_task();
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
- int rc_task = task.remove_acceptor (this);
+ int rc_task = task.remove_io_handler (this->get_handle ());
{
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
@@ -1581,15 +1580,15 @@ ACE_POSIX_Asynch_Accept::close ()
task_lock_count_--;
if (rc_task == -2 && task_lock_count_ == 0) // task is closing
- task.unlock_finish ();
-
+ task.unlock_finish ();
+
if (this->handle_ != ACE_INVALID_HANDLE)
{
ACE_OS::closesocket (this->handle_);
- this->handle_=ACE_INVALID_HANDLE;
+ this->handle_ = ACE_INVALID_HANDLE;
}
- this->flg_open_=0;
+ this->flg_open_ = 0;
}
return 0;
@@ -1600,26 +1599,26 @@ ACE_POSIX_Asynch_Accept::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close
{
ACE_UNUSED_ARG (handle);
ACE_UNUSED_ARG (close_mask);
-
- ACE_TRACE(ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_close\n"));
+
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_close\n"));
ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
// handle_close is called only in one case :
- // when Asynch_accept_task is closing (i.e. proactor destructor)
+ // when Asynch_accept_task is closing ( i.e. proactor destructor )
+
+ // In all other cases we deregister ourself
+ // with ACE_Event_Handler::DONT_CALL mask
- // In all other cases we deregister ourself with
- // ACE_Event_Handler::DONT_CALL mask
-
this->cancel_uncompleted (0);
this->flg_open_ = 0;
// it means other thread is waiting for reactor token_
- if (task_lock_count_ > 0)
+ if (task_lock_count_ > 0)
{
- ACE_POSIX_Asynch_Accept_Task & task =
- this->posix_proactor()->get_asynch_accept_task();
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
task.lock_finish ();
}
@@ -1645,8 +1644,7 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */)
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"),
ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input:")
- ACE_LIB_TEXT(" dequeueing failed")
- ));
+ ACE_LIB_TEXT( " dequeueing failed")));
// Disable the <handle> in the reactor if no <accept>'s are pending.
@@ -1656,10 +1654,10 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */)
if (this->result_queue_.size () == 0)
{
- ACE_POSIX_Asynch_Accept_Task & task =
- this->posix_proactor()->get_asynch_accept_task();
-
- task.suspend_acceptor (this);
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
+
+ task.suspend_io_handler (this->get_handle());
}
// Issue <accept> now.
@@ -1668,13 +1666,11 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */)
ACE_HANDLE new_handle = ACE_OS::accept (this->handle_, 0, 0);
-
if (result == 0) // there is nobody to notify
{
ACE_OS::closesocket (new_handle);
return 0;
}
-
if (new_handle == ACE_INVALID_HANDLE)
{
@@ -1682,8 +1678,7 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */)
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"),
ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
- ACE_LIB_TEXT(" <accept> system call failed")
- ));
+ ACE_LIB_TEXT(" <accept> system call failed")));
// Notify client as usual, "AIO" finished with errors
}
@@ -1693,282 +1688,654 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */)
// Notify the main process about this completion
// Send the Result through the notification pipe.
- if (this->posix_proactor()->post_completion (result) == -1)
+ if (this->posix_proactor ()->post_completion (result) == -1)
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT("Error:(%P | %t):%p\n"),
ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
- ACE_LIB_TEXT(" <post_completion> failed")
- ));
-
+ ACE_LIB_TEXT(" <post_completion> failed")));
+
return 0;
}
// *********************************************************************
-ACE_POSIX_Asynch_Accept_Task::ACE_POSIX_Asynch_Accept_Task()
- : flg_active_ (0),
- select_reactor_(), // should be initialized before reactor_
- reactor_ (& select_reactor_, 0), // don't delete implementation
- token_ (select_reactor_.lock()), // we can use reactor token
- finish_count_(0)
+ACE_HANDLE
+ACE_POSIX_Asynch_Connect_Result::connect_handle (void) const
{
+ return this->aio_fildes;
}
-ACE_POSIX_Asynch_Accept_Task::~ACE_POSIX_Asynch_Accept_Task()
+void ACE_POSIX_Asynch_Connect_Result::connect_handle (ACE_HANDLE handle)
{
- stop();
+ this->aio_fildes = handle;
}
-int
-ACE_POSIX_Asynch_Accept_Task::start ()
+
+ACE_POSIX_Asynch_Connect_Result::ACE_POSIX_Asynch_Connect_Result (ACE_Handler &handler,
+ ACE_HANDLE connect_handle,
+ const void* act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number)
+
+ : ACE_Asynch_Result_Impl (),
+ ACE_Asynch_Connect_Result_Impl (),
+ ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number)
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+ this->aio_fildes = connect_handle;
+ this->aio_nbytes = 0;
+}
- if (this->flg_active_)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:%p\n"),
- ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::start already started")),
- -1);
+void
+ACE_POSIX_Asynch_Connect_Result::complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error)
+{
+ // Copy the data.
+ this->bytes_transferred_ = bytes_transferred;
+ this->success_ = success;
+ this->completion_key_ = completion_key;
+ this->error_ = error;
- if (this->reactor_.initialized () == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:%p\n"),
- ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::start reactor is not initialized")),
- -1);
+ // Create the interface result class.
+ ACE_Asynch_Connect::Result result (this);
+
+ // Call the application handler.
+ this->handler_.handle_connect (result);
+}
+
+ACE_POSIX_Asynch_Connect_Result::~ACE_POSIX_Asynch_Connect_Result (void)
+{
+}
+
+// Base class operations. These operations are here to kill dominance
+// warnings. These methods call the base class methods.
+
+u_long
+ACE_POSIX_Asynch_Connect_Result::bytes_transferred (void) const
+{
+ return ACE_POSIX_Asynch_Result::bytes_transferred ();
+}
+const void *
+ACE_POSIX_Asynch_Connect_Result::act (void) const
+{
+ return ACE_POSIX_Asynch_Result::act ();
+}
- if (this->activate (THR_NEW_LWP | THR_JOINABLE, 1) != 0 )
+int
+ACE_POSIX_Asynch_Connect_Result::success (void) const
+{
+ return ACE_POSIX_Asynch_Result::success ();
+}
+
+const void *
+ACE_POSIX_Asynch_Connect_Result::completion_key (void) const
+{
+ return ACE_POSIX_Asynch_Result::completion_key ();
+}
+
+u_long
+ACE_POSIX_Asynch_Connect_Result::error (void) const
+{
+ return ACE_POSIX_Asynch_Result::error ();
+}
+
+ACE_HANDLE
+ACE_POSIX_Asynch_Connect_Result::event (void) const
+{
+ return ACE_POSIX_Asynch_Result::event ();
+}
+
+u_long
+ACE_POSIX_Asynch_Connect_Result::offset (void) const
+{
+ return ACE_POSIX_Asynch_Result::offset ();
+}
+
+u_long
+ACE_POSIX_Asynch_Connect_Result::offset_high (void) const
+{
+ return ACE_POSIX_Asynch_Result::offset_high ();
+}
+
+int
+ACE_POSIX_Asynch_Connect_Result::priority (void) const
+{
+ return ACE_POSIX_Asynch_Result::priority ();
+}
+
+int
+ACE_POSIX_Asynch_Connect_Result::signal_number (void) const
+{
+ return ACE_POSIX_Asynch_Result::signal_number ();
+}
+
+int
+ACE_POSIX_Asynch_Connect_Result::post_completion (ACE_Proactor_Impl *proactor)
+{
+ return ACE_POSIX_Asynch_Result::post_completion (proactor);
+}
+
+// *********************************************************************
+
+ACE_POSIX_Asynch_Connect::ACE_POSIX_Asynch_Connect (ACE_POSIX_AIOCB_Proactor * posix_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Connect_Impl (),
+ ACE_POSIX_Asynch_Operation (posix_proactor),
+ flg_open_ (0),
+ task_lock_count_ (0)
+{
+}
+
+ACE_POSIX_Asynch_Connect::~ACE_POSIX_Asynch_Connect (void)
+{
+ this->close ();
+ this->reactor(0); // to avoid purge_pending_notifications
+}
+
+ACE_Proactor *
+ACE_POSIX_Asynch_Connect::proactor (void) const
+{
+ return ACE_POSIX_Asynch_Operation::proactor ();
+}
+
+ACE_HANDLE
+ACE_POSIX_Asynch_Connect::get_handle (void) const
+{
+
+ ACE_ASSERT (0);
+ return ACE_INVALID_HANDLE;
+}
+
+void
+ACE_POSIX_Asynch_Connect::set_handle (ACE_HANDLE handle)
+{
+ ACE_ASSERT (0) ;
+}
+
+int
+ACE_POSIX_Asynch_Connect::open (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ const void *completion_key,
+ ACE_Proactor *proactor)
+{
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::open\n"));
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ // if we are already opened,
+ // we could not create a new handler without closing the previous
+
+ if (this->flg_open_ != 0)
ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:%p\n"),
- ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::start failed")),
- -1);
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::open:")
+ ACE_LIB_TEXT("connector already open \n")),
+ -1);
+
+ //int result =
+ ACE_POSIX_Asynch_Operation::open (handler,
+ handle,
+ completion_key,
+ proactor);
+
+ // Ignore result as we pass ACE_INVALID_HANDLE
+ //if (result == -1)
+ // return result;
+
+ this->flg_open_ = 1;
- this->flg_active_ = 1;
return 0;
}
-int
-ACE_POSIX_Asynch_Accept_Task::stop ()
+int
+ACE_POSIX_Asynch_Connect::connect (ACE_HANDLE connect_handle,
+ const ACE_Addr & remote_sap,
+ const ACE_Addr & local_sap,
+ int reuse_addr,
+ const void *act,
+ int priority,
+ int signal_number)
{
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect\n"));
+
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
- if (this->flg_active_ == 0 ) // already stopped
- return 0;
+ if (this->flg_open_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect")
+ ACE_LIB_TEXT("connector was not opened before\n")),
+ -1);
- reactor_.end_reactor_event_loop ();
- }
+ // Common code for both WIN and POSIX.
+ // Create future Asynch_Connect_Result
+ ACE_POSIX_Asynch_Connect_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_POSIX_Asynch_Connect_Result (*this->handler_,
+ connect_handle,
+ act,
+ this->posix_proactor ()->get_handle (),
+ priority,
+ signal_number),
+ -1);
- int rc = this->wait ();
+ int rc = connect_i (result,
+ remote_sap,
+ local_sap,
+ reuse_addr);
- if (rc != 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:%p\n"),
- ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::stop failed")),
- -1);
+ // update handle
+ connect_handle = result->connect_handle ();
+
+ if (rc != 0)
+ return post_result (result, 1);
+
+ // Enqueue result we will wait for completion
+
+ if (this->result_map_.bind (connect_handle, result) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect:")
+ ACE_LIB_TEXT("result map binding failed\n")));
+
+ result->set_error (EFAULT);
+ return post_result (result, 1);
+ }
+
+ task_lock_count_ ++;
+ }
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
+ int rc_task = task.register_io_handler (connect_handle,
+ this,
+ ACE_Event_Handler::CONNECT_MASK,
+ 0); // not to suspend after register
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
- this->flg_active_ = 0;
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ this->task_lock_count_ --;
- if (this->reactor_.initialized ())
- this->reactor_.close();
+ int post_enable = 1;
+
+ if (rc_task == -2 && task_lock_count_ == 0) // task is closing
+ {
+ post_enable = 0;
+ task.unlock_finish ();
+ }
- while (finish_count_ > 0)
+ if (rc_task < 0)
{
- ace_mon.release ();
- finish_event_.wait();
+ ACE_POSIX_Asynch_Connect_Result *result = 0;
+
+ this->result_map_.unbind (connect_handle, result);
+
+ if (result != 0)
+ {
+ result->set_error (EFAULT);
- ace_mon.acquire ();
- finish_event_.reset ();
+ return post_result (result, post_enable);
+ }
}
}
- return rc;
-}
-
-int
-ACE_POSIX_Asynch_Accept_Task::lock_finish ()
-{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
- finish_count_ ++;
return 0;
}
-int
-ACE_POSIX_Asynch_Accept_Task::unlock_finish ()
+int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result * result,
+ int post_enable)
{
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+ if (this->flg_open_ != 0 && post_enable != 0)
+ {
+ if (this->posix_proactor ()->post_completion (result) == 0)
+ return 0 ;
- finish_count_ --;
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT("Error:(%P | %t):%p\n"),
+ ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::post_result: ")
+ ACE_LIB_TEXT(" <post_completion> failed")));
+ }
- finish_event_.signal ();
+ ACE_HANDLE handle = result->connect_handle ();
- return 0;
+ if (handle != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (handle);
+
+ delete result;
+
+ return -1;
}
+//@@ New method connect_i
+// return code :
+// -1 errors before attempt to connect
+// 0 connect started
+// 1 connect finished ( may be unsuccessfully)
+
int
-ACE_POSIX_Asynch_Accept_Task::svc ()
+ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result,
+ const ACE_Addr & remote_sap,
+ const ACE_Addr & local_sap,
+ int reuse_addr)
{
- sigset_t RT_signals;
+ result->set_bytes_transferred (0);
- if (sigemptyset (& RT_signals) == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("Error:(%P | %t):%p\n"),
- ACE_LIB_TEXT ("sigemptyset failed")));
+ ACE_HANDLE handle = result->connect_handle ();
+
+ if (handle == ACE_INVALID_HANDLE)
+ {
+ int protocol_family = remote_sap.get_type ();
+
+ handle = ACE_OS::socket (protocol_family,
+ SOCK_STREAM,
+ 0);
+ // save it
+ result->connect_handle (handle);
+
+ if (handle == ACE_INVALID_HANDLE)
+ {
+ result->set_error (errno);
- int member = 0;
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
+ ACE_LIB_TEXT(" ACE_OS::socket failed\n")),
+ -1);
+ }
+
+ // Reuse the address
+ int one = 1;
+ if (protocol_family != PF_UNIX &&
+ reuse_addr != 0 &&
+ ACE_OS::setsockopt (handle,
+ SOL_SOCKET,
+ SO_REUSEADDR,
+ (const char*) &one,
+ sizeof one) == -1 )
+ {
+ result->set_error (errno);
- for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
+ ACE_LIB_TEXT(" ACE_OS::setsockopt failed\n")),
+ -1);
+ }
+ }
+
+ if (local_sap != ACE_Addr::sap_any)
{
- member = sigismember (& RT_signals , si);
- if (member == 1)
+ sockaddr * laddr = ACE_reinterpret_cast (sockaddr *,
+ local_sap.get_addr ());
+ size_t size = local_sap.get_size ();
+
+ if (ACE_OS::bind (handle, laddr, size) == -1)
{
- sigaddset (& RT_signals, si);
+ result->set_error (errno);
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
+ ACE_LIB_TEXT(" ACE_OS::bind failed\n")),
+ -1);
}
}
- if (ACE_OS::pthread_sigmask (SIG_BLOCK, & RT_signals, 0) != 0)
- ACE_ERROR ((LM_ERROR,
- ACE_LIB_TEXT ("Error:(%P | %t):%p\n"),
- ACE_LIB_TEXT ("pthread_sigmask failed")));
+ // set non blocking mode
+ if (ACE::set_flags (handle, ACE_NONBLOCK) != 0)
+ {
+ result->set_error (errno);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ")
+ ACE_LIB_TEXT(" ACE::set_flags failed\n")),
+ -1);
+ }
+ for (;;)
+ {
+ int rc = ACE_OS::connect (handle,
+ ACE_reinterpret_cast (sockaddr *,
+ remote_sap.get_addr ()),
+ remote_sap.get_size ());
+ if (rc < 0) // failure
+ {
+ if (errno == EWOULDBLOCK || errno == EINPROGRESS)
+ return 0; // connect started
- reactor_.owner (ACE_Thread::self());
+ if (errno == EINTR)
+ continue;
- reactor_.run_reactor_event_loop ();
+ result->set_error (errno);
+ }
- return 0;
-}
+ return 1 ; // connect finished
+ }
+ ACE_NOTREACHED (return 0);
+}
+//@@ New method cancel_uncompleted
+// It performs cancellation of all pending requests
+//
+// Parameter flg_notify can be
+// 0 - don't send notifications about canceled accepts
+// !0 - notify user about canceled accepts
+// according POSIX standards we should receive notifications
+// on canceled AIO requests
+//
+// Return value : number of cancelled requests
+//
+
int
-ACE_POSIX_Asynch_Accept_Task::register_acceptor (ACE_POSIX_Asynch_Accept * posix_accept,
- ACE_Reactor_Mask mask)
+ACE_POSIX_Asynch_Connect::cancel_uncompleted (int flg_notify,
+ ACE_Handle_Set & set)
{
- // Return codes :
- // 0 success
- // -1 reactor errors
- // -2 task not active
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::cancel_uncompleted\n"));
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+ int retval = 0;
- if (this->flg_active_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::register_acceptor \n")
- ACE_LIB_TEXT ("task not active \n")),
- -2);
-
- // Register the handler with the reactor.
- int retval = this->reactor_.register_handler (posix_accept->get_handle(),
- posix_accept,
- mask);
-
- if (retval == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::register_acceptor \n")
- ACE_LIB_TEXT ("register_handler failed \n")),
- -1);
+ MAP_ITERATOR iter (result_map_);
+ MAP_ENTRY * me = 0;
+
+ set.reset ();
- // Suspend the <handle> now. Enable only when the <accept> is issued
- // by the application.
- retval = this->reactor_.suspend_handler (posix_accept->get_handle());
- if (retval == -1)
+ for (; iter.next (me) != 0; retval++ , iter.advance ())
{
- this->reactor_.remove_handler (posix_accept,
- ACE_Event_Handler::ALL_EVENTS_MASK
- | ACE_Event_Handler::DONT_CALL);
-
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::register_acceptor \n")
- ACE_LIB_TEXT ("suspend_handler failed \n")),
- -1);
+ ACE_HANDLE handle = me->ext_id_;
+ ACE_POSIX_Asynch_Connect_Result* result = me->int_id_ ;
+
+ set.set_bit (handle);
+
+ result->set_bytes_transferred (0);
+ result->set_error (ECANCELED);
+ this->post_result (result, flg_notify);
}
- return 0;
+ result_map_.unbind_all ();
+
+ return retval;
}
int
-ACE_POSIX_Asynch_Accept_Task::remove_acceptor (ACE_POSIX_Asynch_Accept * posix_accept)
+ACE_POSIX_Asynch_Connect::cancel (void)
{
- // Return codes :
- // 0 success
- // -1 reactor errors
- // -2 task not active
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::cancel\n"));
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+ //We are not really ACE_POSIX_Asynch_Operation
+ //so we could not call ::aiocancel ()
+ // or just write
+ //return ACE_POSIX_Asynch_Operation::cancel ();
+ //We delegate real cancelation to cancel_uncompleted (1)
- if (this->flg_active_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::remove_acceptor \n")
- ACE_LIB_TEXT ("task not active \n")),
- -2);
-
- int retval = this->reactor_.remove_handler (posix_accept,
- ACE_Event_Handler::ALL_EVENTS_MASK
- | ACE_Event_Handler::DONT_CALL);
- if (retval == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::remove_acceptor \n")
- ACE_LIB_TEXT ("remove_handler failed \n")),
- -1);
+ int rc = -1 ; // ERRORS
- return 0;
+ ACE_Handle_Set set;
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ int num_cancelled = cancel_uncompleted (flg_open_, set);
+
+ if (num_cancelled == 0)
+ rc = 1 ; // AIO_ALLDONE
+ else if (num_cancelled > 0)
+ rc = 0 ; // AIO_CANCELED
+
+ if (this->flg_open_ == 0)
+ return rc ;
+
+ this->task_lock_count_++;
+ }
+
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
+
+ int rc_task = task.remove_io_handler (set);
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ this->task_lock_count_--;
+
+ if (rc_task == -2 && task_lock_count_ == 0) // task is closing
+ task.unlock_finish ();
+ }
+
+ return rc;
}
int
-ACE_POSIX_Asynch_Accept_Task::suspend_acceptor (ACE_POSIX_Asynch_Accept * posix_accept)
+ACE_POSIX_Asynch_Connect::close (void)
{
- // Return codes :
- // 0 success
- // -1 reactor errors
- // -2 task not active
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::close\n"));
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+ ACE_Handle_Set set ;
- if (this->flg_active_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::suspend_acceptor \n")
- ACE_LIB_TEXT ("task not active \n")),
- -2);
-
- int retval = this->reactor_.suspend_handler (posix_accept);
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
- if (retval == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::suspend_acceptor \n")
- ACE_LIB_TEXT ("suspend_handler failed \n")),
- -1);
+ int num_cancelled = cancel_uncompleted (flg_open_, set);
+
+ if (num_cancelled == 0 || this->flg_open_ == 0)
+ {
+ this->flg_open_ = 0;
+ return 0;
+ }
+
+ this->task_lock_count_++;
+ }
+
+ ACE_Asynch_Pseudo_Task & task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
+
+ int rc_task = task.remove_io_handler (set);
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ this->task_lock_count_--;
+
+ if (rc_task == -2 && task_lock_count_ == 0) // task is closing
+ task.unlock_finish ();
+
+ this->flg_open_ = 0;
+ }
return 0;
}
int
-ACE_POSIX_Asynch_Accept_Task::resume_acceptor (ACE_POSIX_Asynch_Accept * posix_accept)
+ACE_POSIX_Asynch_Connect::handle_exception (ACE_HANDLE fd)
{
- // Return codes :
- // 0 success
- // -1 reactor errors
- // -2 task not active
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::handle_exception\n"));
+ return handle_input (fd);
+}
- ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1));
+int
+ACE_POSIX_Asynch_Connect::handle_input (ACE_HANDLE fd)
+{
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::handle_input\n"));
- if (this->flg_active_ == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::resume_acceptor \n")
- ACE_LIB_TEXT ("task not active \n")),
- -2);
-
- int retval = this->reactor_.resume_handler (posix_accept);
+ return handle_input (fd);
+}
- if (retval == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::resume_acceptor \n")
- ACE_LIB_TEXT ("resume_handler failed \n")),
- -1);
+int
+ACE_POSIX_Asynch_Connect::handle_output (ACE_HANDLE fd)
+{
+ ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::handle_output\n"));
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+
+ ACE_POSIX_Asynch_Connect_Result* result = 0;
+
+ if (this->result_map_.unbind (fd, result) != 0) // not found
+ return -1;
+
+ int sockerror = 0 ;
+ int lsockerror = sizeof sockerror;
+
+ ACE_OS::getsockopt (fd,
+ SOL_SOCKET,
+ SO_ERROR,
+ (char*) &sockerror,
+ &lsockerror);
+
+ result->set_bytes_transferred (0);
+ result->set_error (sockerror);
+ this->post_result (result, this->flg_open_);
+
+ return -1;
+
+ //ACE_Asynch_Pseudo_Task & task =
+ // this->posix_proactor()->get_asynch_pseudo_task();
+
+ //task.remove_io_handler ( fd );
+
+ //return 0;
+}
+
+
+int
+ACE_POSIX_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask)
+{
+ ACE_TRACE (ACE_LIB_TEXT ("ACE_POSIX_Asynch_Connect::handle_close\n"));
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+
+ ACE_Asynch_Pseudo_Task &task =
+ this->posix_proactor ()->get_asynch_pseudo_task ();
+
+ if (task.is_active() == 0) // task is closing
+ {
+ if (this->flg_open_ !=0) // we are open
+ {
+ this->flg_open_ = 0;
+
+ // it means other thread is waiting for reactor token_
+ if (task_lock_count_ > 0)
+ task.lock_finish ();
+ }
+
+ ACE_Handle_Set set;
+ this->cancel_uncompleted (0, set);
+
+ return 0;
+ }
+
+ // remove_io_handler() contains flag DONT_CALL
+ // so it is save
+ task.remove_io_handler (fd);
+
+ ACE_POSIX_Asynch_Connect_Result* result = 0;
+
+ if (this->result_map_.unbind (fd, result) != 0 ) // not found
+ return -1;
+
+ result->set_bytes_transferred (0);
+ result->set_error (ECANCELED);
+ this->post_result (result, this->flg_open_);
return 0;
}
@@ -2302,6 +2669,7 @@ ACE_POSIX_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_S
if (result.success () == 0)
{
// Failure.
+
ACE_ERROR ((LM_ERROR,
"Asynch_Transmit_File failed.\n"));
@@ -3008,7 +3376,6 @@ ACE_POSIX_Asynch_Write_Dgram::ACE_POSIX_Asynch_Write_Dgram (ACE_POSIX_AIOCB_Proa
{
}
-
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *>;
@@ -3019,6 +3386,14 @@ template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *>;
template class ACE_Node<ACE_POSIX_Asynch_Result *>;
template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *>;
+template class ACE_Map_Entry<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *>;
+template class ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Const_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *>
@@ -3029,6 +3404,15 @@ template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *>;
#pragma instantiate ACE_Node<ACE_POSIX_Asynch_Result *>
#pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *>
+#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *>
+#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Const_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+
+
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/POSIX_Asynch_IO.h b/ace/POSIX_Asynch_IO.h
index f00dcd79db3..132a4075ca4 100644
--- a/ace/POSIX_Asynch_IO.h
+++ b/ace/POSIX_Asynch_IO.h
@@ -29,22 +29,22 @@
#if defined (ACE_HAS_AIO_CALLS)
#include "ace/OS.h"
+
#include "ace/Asynch_IO_Impl.h"
-#include "ace/Reactor.h"
-#include "ace/Select_Reactor.h"
#include "ace/Unbounded_Queue.h"
-#include "ace/Task.h"
+#include "ace/Map_Manager.h"
// Forward declarations
class ACE_POSIX_SIG_Proactor;
class ACE_POSIX_AIOCB_Proactor;
class ACE_Proactor_Impl;
+class ACE_Handle_Set;
/**
* @class ACE_POSIX_Asynch_Result
*
- * This class provides concrete implementation for <ACE_Asynch_Result>
- * for POSIX4 platforms. This class extends <aiocb> and makes it more
+ * This class provides concrete implementation for ACE_Asynch_Result
+ * for POSIX4 platforms. This class extends @c aiocb and makes it more
* useful.
*/
class ACE_Export ACE_POSIX_Asynch_Result : public virtual ACE_Asynch_Result_Impl,
@@ -495,7 +495,7 @@ protected:
* @class ACE_POSIX_Asynch_Write_Stream
*
* @brief This class implements <ACE_Asynch_Write_Stream> for
- * all POSIX implementations of Proactor.
+ * all POSIX implementations of ACE_Proactor.
*/
class ACE_Export ACE_POSIX_Asynch_Write_Stream : public virtual ACE_Asynch_Write_Stream_Impl,
public ACE_POSIX_Asynch_Operation
@@ -623,6 +623,8 @@ public:
int post_completion (ACE_Proactor_Impl *proactor);
protected:
+ /// Constructor is protected since creation is limited to
+ /// ACE_Asynch_Read_File factory.
ACE_POSIX_Asynch_Read_File_Result (ACE_Handler &handler,
ACE_HANDLE handle,
ACE_Message_Block &message_block,
@@ -633,8 +635,6 @@ protected:
ACE_HANDLE event,
int priority,
int signal_number);
- // Constructor is protected since creation is limited to
- // ACE_Asynch_Read_File factory.
/// ACE_Proactor will call this method when the read completes.
virtual void complete (u_long bytes_transferred,
@@ -924,7 +924,6 @@ class ACE_Export ACE_POSIX_Asynch_Accept_Result : public virtual ACE_Asynch_Acce
{
/// Factory classes will have special permissions.
friend class ACE_POSIX_Asynch_Accept;
- friend class ACE_POSIX_Asynch_Accept_Handler;
/// The Proactor constructs the Result class for faking results.
friend class ACE_POSIX_Proactor;
@@ -1082,31 +1081,30 @@ public:
int priority,
int signal_number = 0);
-
/**
- * Cancel all pending pseudo-asynchronus requests
- * Behavior as usual AIO request
- */
+ * Cancel all pending pseudo-asynchronus requests
+ * Behavior as usual AIO request
+ */
int cancel (void);
/**
- * Close performs cancellation of all pending requests
- * and closure the listen handle
- */
- int close (void);
+ * Close performs cancellation of all pending requests
+ * and closure the listen handle
+ */
+ int close ();
- /// virtual from ACE_Event_Hanlder
+ /// virtual from ACE_Event_Handler
ACE_HANDLE get_handle (void) const;
- /// virtual from ACE_Event_Hanlder
+ /// virtual from ACE_Event_Handler
void set_handle (ACE_HANDLE handle);
- /// virtual from ACE_Event_Hanlder
- /// Called when accept event comes up on <listen_hanlde>
+ /// virtual from ACE_Event_Handler
+ /// Called when accept event comes up on <listen_handle>
int handle_input (ACE_HANDLE handle);
- /// virtual from ACE_Event_Hanlder
- int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) ;
+ /// virtual from ACE_Event_Handler
+ int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
// = Methods belong to ACE_POSIX_Asynch_Operation base class. These
// methods are defined here to avoid dominace warnings. They route
@@ -1114,7 +1112,6 @@ public:
/// Return the underlying proactor.
ACE_Proactor* proactor (void) const;
-
private:
int cancel_uncompleted (int flg_notify);
// flg_notify points whether or not we should send notification about
@@ -1126,69 +1123,253 @@ private:
// on canceled AIO requests
int flg_open_ ;
- /// 1 - Accept is registered in ACE_POSIX_Asynch_Accept_Task
- /// 0 - Aceept is deregisted in ACE_POSIX_Asynch_Accept_Task
+ /// 1 - Accept is registered in ACE_Asynch_Pseudo_Task
+ /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task
-
- /// to prevent ACE_POSIX_Asynch_Accept_Task from deletion
- /// while we make a call to the ACE_POSIX_Asynch_Accept_Task
+ /// to prevent ACE_Asynch_Pseudo_Task from deletion
+ /// while we make a call to the ACE_Asynch_Pseudo_Task
/// This is extra cost !!!
/// we could avoid them if all applications will follow the rule:
/// Proactor should be deleted only after deletion all
- /// AsynchOperation objects connected with it
+ /// AsynchOperation objects connected with it
int task_lock_count_;
- /// Queue of Result pointers that correspond to all the <accept>'s
- /// pending.
+ /// Queue of Result pointers that correspond to all the pending
+ /// accept operations.
ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result*> result_queue_;
- /// The lock to protect the result queue which is shared. The queue
+ /// The lock to protect the result queue which is shared. The queue
/// is updated by main thread in the register function call and
- /// through the auxillary thread in the deregister fun. So let us
+ /// through the auxillary thread in the deregister fun. So let us
/// mutex it.
ACE_SYNCH_MUTEX lock_;
};
/**
- * @class ACE_POSIX_Asynch_Accept_Task
+ * @class ACE_POSIX_Asynch_Connect_Result
*
+ * @brief This is that class which will be passed back to the
+ * completion handler when the asynchronous connect completes.
+ *
+ * This class has all the information necessary for a
+ * completion handler to uniquely identify the completion of the
+ * asynchronous connect.
*/
-class ACE_Export ACE_POSIX_Asynch_Accept_Task : public ACE_Task<ACE_MT_SYNCH>
+class ACE_Export ACE_POSIX_Asynch_Connect_Result : public virtual ACE_Asynch_Connect_Result_Impl,
+ public ACE_POSIX_Asynch_Result
{
- friend class ACE_POSIX_Asynch_Accept;
+ /// Factory classes will have special permissions.
+ friend class ACE_POSIX_Asynch_Connect;
+
+ /// The Proactor constructs the Result class for faking results.
+ friend class ACE_POSIX_Proactor;
+
public:
- ACE_POSIX_Asynch_Accept_Task (void);
- virtual ~ACE_POSIX_Asynch_Accept_Task (void);
+ /// I/O handle for the connection.
+ ACE_HANDLE connect_handle (void) const;
- int start (void);
- int stop (void);
+ // = Base class operations. These operations are here to kill
+ // dominance warnings. These methods call the base class methods.
+
+ /// Number of bytes transferred by the operation.
+ u_long bytes_transferred (void) const;
- virtual int svc (void);
+ /// ACT associated with the operation.
+ const void *act (void) const;
- int register_acceptor (ACE_POSIX_Asynch_Accept *posix_accept,
- ACE_Reactor_Mask mask);
- int remove_acceptor (ACE_POSIX_Asynch_Accept *posix_accept);
- int resume_acceptor (ACE_POSIX_Asynch_Accept *posix_accept);
- int suspend_acceptor (ACE_POSIX_Asynch_Accept *posix_accept);
+ /// Did the operation succeed?
+ int success (void) const;
+
+ /**
+ * This is the ACT associated with the handle on which the
+ * Asynch_Operation takes place.
+ *
+ * @note This is not implemented for POSIX4 platforms.
+ */
+ const void *completion_key (void) const;
+
+ /// Error value if the operation fail.
+ u_long error (void) const;
+
+ /// This returns ACE_INVALID_HANDLE on POSIX4 platforms.
+ ACE_HANDLE event (void) const;
+
+ /**
+ * This really make sense only when doing file I/O.
+ *
+ * @note On POSIX4-Unix, @c offset_high should be supported using
+ * @c aiocb64.
+ */
+ u_long offset (void) const;
+ u_long offset_high (void) const;
+
+ /// The priority of the asynchronous operation.
+ int priority (void) const;
+
+ /**
+ * POSIX4 realtime signal number to be used for the
+ * operation. The signal number ranges from @c SIGRTMIN to @c SIGRTMAX.
+ * By default, SIGRTMIN is used to issue AIO calls.
+ *
+ * @note This is a no-op on non-POSIX4 systems and returns 0.
+ */
+ int signal_number (void) const;
+
+ /// Post this object to the Proactor.
+ int post_completion (ACE_Proactor_Impl *proactor);
protected:
- int lock_finish (void);
- int unlock_finish (void);
+ /// Constructor is protected since creation is limited to
+ /// ACE_Asynch_Connect factory.
+ ACE_POSIX_Asynch_Connect_Result (ACE_Handler &handler,
+ ACE_HANDLE connect_handle,
+ const void* act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number);
- int flg_active_ ;
+ /// ACE_Proactor will call this method when the accept completes.
+ virtual void complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error);
- ACE_Select_Reactor select_reactor_;
- // should be initialized before reactor_
+ /// Destructor.
+ virtual ~ACE_POSIX_Asynch_Connect_Result (void);
- ACE_Reactor reactor_;
+ // aiocb::aio_filedes
+ // I/O handle for the new connection.
+ void connect_handle (ACE_HANDLE handle);
+};
- ACE_Lock &token_;
- int finish_count_;
- ACE_Manual_Event finish_event_;
+/**
+ * @class ACE_POSIX_Asynch_Connect
+ *
+ */
+class ACE_Export ACE_POSIX_Asynch_Connect :
+ public virtual ACE_Asynch_Connect_Impl,
+ public ACE_POSIX_Asynch_Operation,
+ public ACE_Event_Handler
+{
+public:
+
+ /// Constructor.
+ ACE_POSIX_Asynch_Connect (ACE_POSIX_AIOCB_Proactor * posix_aiocb_proactor);
+
+ /// Destructor.
+ virtual ~ACE_POSIX_Asynch_Connect (void);
+
+ /**
+ * This belongs to ACE_POSIX_Asynch_Operation. We forward
+ * this call to that method. We have put this here to avoid the
+ * compiler warnings.
+ */
+ int open (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ const void *completion_key,
+ ACE_Proactor *proactor = 0);
+
+ /**
+ * This starts off an asynchronous connect.
+ *
+ * @arg connect_handle will be used for the connect call. If
+ * ACE_INVALID_HANDLE is specified, a new
+ * handle will be created.
+ */
+ int connect (ACE_HANDLE connect_handle,
+ const ACE_Addr &remote_sap,
+ const ACE_Addr &local_sap,
+ int reuse_addr,
+ const void *act,
+ int priority,
+ int signal_number = 0);
+
+ /**
+ * Cancel all pending pseudo-asynchronus requests
+ * Behavior as usual AIO request
+ */
+ int cancel (void);
+
+ /**
+ * Close performs cancellation of all pending requests.
+ */
+ int close (void);
+
+ /// virtual from ACE_Event_Handler
+ ACE_HANDLE get_handle (void) const;
+
+ /// virtual from ACE_Event_Handler
+ void set_handle (ACE_HANDLE handle);
+
+ /// virtual from ACE_Event_Handler
+ /// Called when accept event comes up on <listen_hanlde>
+ int handle_input (ACE_HANDLE handle);
+ int handle_output (ACE_HANDLE handle);
+ int handle_exception (ACE_HANDLE handle);
+
+ /// virtual from ACE_Event_Handler
+ int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) ;
+
+ // = Methods belong to ACE_POSIX_Asynch_Operation base class. These
+ // methods are defined here to avoid dominace warnings. They route
+ // the call to the ACE_POSIX_Asynch_Operation base class.
+
+ /// Return the underlying proactor.
+ ACE_Proactor* proactor (void) const;
+
+private:
+ int connect_i (ACE_POSIX_Asynch_Connect_Result *result,
+ const ACE_Addr & remote_sap,
+ const ACE_Addr & local_sap,
+ int reuse_addr);
+
+ int post_result (ACE_POSIX_Asynch_Connect_Result *result, int flg_post);
+
+ /// Cancel uncompleted connect operations.
+ /**
+ * @arg flg_notify Indicates whether or not we should send notification
+ * about canceled accepts. If this is 0, don't send
+ * notifications about canceled connects. If 1, notify
+ * user about canceled connects according POSIX
+ * standards we should receive notifications on canceled
+ * AIO requests.
+ */
+ int cancel_uncompleted (int flg_notify, ACE_Handle_Set & set);
+
+ int flg_open_ ;
+ /// 1 - Connect is registered in ACE_Asynch_Pseudo_Task
+ /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task
+
+
+ /// to prevent ACE_Asynch_Pseudo_Task from deletion
+ /// while we make a call to the ACE_Asynch_Pseudo_Task
+ /// This is extra cost !!!
+ /// we could avoid them if all applications will follow the rule:
+ /// Proactor should be deleted only after deletion all
+ /// AsynchOperation objects connected with it
+ int task_lock_count_;
+
+ typedef ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+ MAP_MANAGER;
+ typedef ACE_Map_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+ MAP_ITERATOR;
+ typedef ACE_Map_Entry<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *>
+ MAP_ENTRY;
+
+ MAP_MANAGER result_map_;
+ // Map of Result pointers that correspond to all the <accept>'s
+ // pending.
+
+ /// The lock to protect the result queue which is shared. The queue
+ /// is updated by main thread in the register function call and
+ /// through the auxillary thread in the deregister fun. So let us
+ /// mutex it.
+ ACE_SYNCH_MUTEX lock_;
};
+
/**
* @class ACE_POSIX_Asynch_Transmit_File_Result
*
@@ -1207,7 +1388,7 @@ class ACE_Export ACE_POSIX_Asynch_Transmit_File_Result : public virtual ACE_Asyn
/// Handlers do all the job.
friend class ACE_POSIX_Asynch_Transmit_Handler;
-
+
/// The Proactor constructs the Result class for faking results.
friend class ACE_POSIX_Proactor;
@@ -1471,7 +1652,7 @@ protected:
* @class ACE_POSIX__Asynch_Write_Dgram_Result
*
* @brief This is class provides concrete implementation for
- * ACE_Asynch_Write_Dgram::Result class.
+ * ACE_Asynch_Write_Dgram::Result class.
*/
class ACE_Export ACE_POSIX_Asynch_Write_Dgram_Result : public virtual ACE_Asynch_Write_Dgram_Result_Impl,
public ACE_POSIX_Asynch_Result
@@ -1541,14 +1722,14 @@ protected:
/// Constructor is protected since creation is limited to
/// ACE_Asynch_Write_Stream factory.
ACE_POSIX_Asynch_Write_Dgram_Result (ACE_Handler &handler,
- ACE_HANDLE handle,
- ACE_Message_Block *message_block,
- size_t bytes_to_write,
- int flags,
- const void* act,
- ACE_HANDLE event,
- int priority,
- int signal_number);
+ ACE_HANDLE handle,
+ ACE_Message_Block *message_block,
+ size_t bytes_to_write,
+ int flags,
+ const void* act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number);
/// ACE_Proactor will call this method when the write completes.
virtual void complete (u_long bytes_transferred,
@@ -1597,7 +1778,7 @@ public:
/// Destructor
virtual ~ACE_POSIX_Asynch_Write_Dgram (void);
- /** This starts off an asynchronous send. Upto
+ /** This starts off an asynchronous send. Up to
* <message_block->total_length()> will be sent. <message_block>'s
* <rd_ptr> will be updated to reflect the sent bytes if the send operation
* is successful completed.
@@ -1659,7 +1840,7 @@ protected:
* @class ACE_POSIX_Asynch_Read_Dgram_Result
*
* @brief This is class provides concrete implementation for
- * ACE_Asynch_Read_Dgram::Result class.
+ * ACE_Asynch_Read_Dgram::Result class.
*/
class ACE_Export ACE_POSIX_Asynch_Read_Dgram_Result : public virtual ACE_Asynch_Read_Dgram_Result_Impl,
public virtual ACE_POSIX_Asynch_Result
diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp
index 3004e21e10b..381d7521303 100644
--- a/ace/POSIX_Proactor.cpp
+++ b/ace/POSIX_Proactor.cpp
@@ -316,6 +316,26 @@ ACE_POSIX_Proactor::create_asynch_accept_result (ACE_Handler &handler,
return implementation;
}
+ACE_Asynch_Connect_Result_Impl *
+ACE_POSIX_Proactor::create_asynch_connect_result (ACE_Handler &handler,
+ ACE_HANDLE connect_handle,
+ const void* act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number)
+{
+ ACE_Asynch_Connect_Result_Impl *implementation;
+ ACE_NEW_RETURN (implementation,
+ ACE_POSIX_Asynch_Connect_Result (handler,
+ connect_handle,
+ act,
+ event,
+ priority,
+ signal_number),
+ 0);
+ return implementation;
+}
+
ACE_Asynch_Transmit_File_Result_Impl *
ACE_POSIX_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
ACE_HANDLE socket,
@@ -494,7 +514,7 @@ private:
ACE_Pipe pipe_;
// Pipe for the communication between Proactor and the
- // Asynch_Accept.
+ // Asynch_Accept/Asynch_Connect and other post_completions
ACE_POSIX_Asynch_Read_Stream read_stream_;
// To do asynch_read on the pipe.
@@ -505,7 +525,7 @@ private:
ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
: posix_aiocb_proactor_ (posix_aiocb_proactor),
- message_block_ (sizeof (ACE_POSIX_Asynch_Accept_Result *)),
+ message_block_ (sizeof (2)),
read_stream_ (posix_aiocb_proactor)
{
// Open the pipe.
@@ -620,7 +640,8 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
// start pseudo-asynchronous accept task
// one per all future acceptors
- this->accept_task_.start ();
+ this->get_asynch_pseudo_task().start ();
+
}
// Special protected constructor for ACE_SUN_Proactor
@@ -661,7 +682,8 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
{
// stop asynch accept task
- this->get_asynch_accept_task().stop ();
+ this->get_asynch_pseudo_task().stop ();
+
delete_notify_manager ();
@@ -745,8 +767,8 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
void
ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
{
- // Accept Handler for aio_accept. Remember! this issues a Asynch_Read
- // on the notify pipe for doing the Asynch_Accept.
+ // Remember! this issues a Asynch_Read
+ // on the notify pipe for doing the Asynch_Accept/Connect.
if (aiocb_notify_pipe_manager_ == 0)
ACE_NEW (aiocb_notify_pipe_manager_,
@@ -939,7 +961,17 @@ ACE_POSIX_AIOCB_Proactor::create_asynch_accept (void)
ACE_NEW_RETURN (implementation,
ACE_POSIX_Asynch_Accept (this),
0);
- //was ACE_POSIX_AIOCB_Asynch_Accept (this)
+
+ return implementation;
+}
+
+ACE_Asynch_Connect_Impl *
+ACE_POSIX_AIOCB_Proactor::create_asynch_connect (void)
+{
+ ACE_Asynch_Connect_Impl *implementation = 0;
+ ACE_NEW_RETURN (implementation,
+ ACE_POSIX_Asynch_Connect (this),
+ 0);
return implementation;
}
@@ -1485,7 +1517,7 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations)
// but we should start pseudo-asynchronous accept task
// one per all future acceptors
- this->accept_task_.start ();
+ this->get_asynch_pseudo_task().start ();
return;
}
@@ -1532,14 +1564,14 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
// but we should start pseudo-asynchronous accept task
// one per all future acceptors
- this->accept_task_.start ();
+ this->get_asynch_pseudo_task().start ();
return;
}
ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
{
// stop asynch accept task
- this->get_asynch_accept_task().stop ();
+ this->get_asynch_pseudo_task().stop ();
// @@ Enable the masked signals again.
}
diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h
index 5331f3abb3d..8781218633f 100644
--- a/ace/POSIX_Proactor.h
+++ b/ace/POSIX_Proactor.h
@@ -31,6 +31,7 @@
#include "ace/Free_List.h"
#include "ace/Pipe.h"
#include "ace/POSIX_Asynch_IO.h"
+#include "ace/Asynch_Pseudo_Task.h"
#define ACE_AIO_MAX_SIZE 2048
#define ACE_AIO_DEFAULT_SIZE 1024
@@ -51,13 +52,6 @@
*/
class ACE_Export ACE_POSIX_Proactor : public ACE_Proactor_Impl
{
- /**
- * For <POSIX_SIG_Asynch_Accept> operation, this handler class does
- * the actual work, has to register the real-time signal with the
- * Proactor.
- */
- friend class ACE_POSIX_SIG_Asynch_Accept_Handler;
-
public:
enum Proactor_Type
{
@@ -194,6 +188,13 @@ public:
int priority = 0,
int signal_number = ACE_SIGRTMIN);
+ virtual ACE_Asynch_Connect_Result_Impl *create_asynch_connect_result (ACE_Handler & handler,
+ ACE_HANDLE connect_handle,
+ const void *act,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
+
virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler,
ACE_HANDLE socket,
ACE_HANDLE file,
@@ -249,7 +250,6 @@ protected:
// Forward declarations.
class ACE_AIOCB_Notify_Pipe_Manager;
-class ACE_POSIX_Accept_Task;
/**
* @class ACE_POSIX_AIOCB_Proactor
@@ -268,6 +268,7 @@ class ACE_Export ACE_POSIX_AIOCB_Proactor : public ACE_POSIX_Proactor
/// Proactor which is necessary in the AIOCB strategy.
friend class ACE_POSIX_Asynch_Operation;
friend class ACE_POSIX_Asynch_Accept;
+ friend class ACE_POSIX_Asynch_Connect;
public:
@@ -317,6 +318,8 @@ public:
virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void);
+ virtual ACE_Asynch_Connect_Impl *create_asynch_connect (void);
+
virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void);
/**
@@ -339,8 +342,9 @@ protected:
ACE_POSIX_AIOCB_Proactor (size_t nmaxop,
ACE_POSIX_Proactor::Proactor_Type ptype);
- /// Task to process pseudo-asynchronous accept
- ACE_POSIX_Asynch_Accept_Task &get_asynch_accept_task (void);
+
+ /// Task to process pseudo-asynchronous operations
+ ACE_Asynch_Pseudo_Task & get_asynch_pseudo_task();
/// Call these methods from derived class when virtual table is
/// built.
@@ -442,7 +446,7 @@ protected:
ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *> result_queue_;
/// Task to process pseudo-asynchronous accept
- ACE_POSIX_Asynch_Accept_Task accept_task_;
+ ACE_Asynch_Pseudo_Task pseudo_task_;
};
/**
@@ -563,6 +567,7 @@ protected:
*/
class ACE_Export ACE_POSIX_Asynch_Timer : public ACE_POSIX_Asynch_Result
{
+
/// The factory method for this class is with the POSIX_Proactor
/// class.
friend class ACE_POSIX_Proactor;
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 6a87d59fe4a..c75fa578d93 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -772,6 +772,12 @@ ACE_Proactor::create_asynch_accept (void)
return this->implementation ()->create_asynch_accept ();
}
+ACE_Asynch_Connect_Impl *
+ACE_Proactor::create_asynch_connect (void)
+{
+ return this->implementation ()->create_asynch_connect ();
+}
+
ACE_Asynch_Transmit_File_Impl *
ACE_Proactor::create_asynch_transmit_file (void)
{
@@ -939,6 +945,23 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler,
signal_number);
}
+ACE_Asynch_Connect_Result_Impl *
+ACE_Proactor::create_asynch_connect_result (ACE_Handler &handler,
+ ACE_HANDLE connect_handle,
+ const void* act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number)
+
+{
+ return this->implementation ()->create_asynch_connect_result (handler,
+ connect_handle,
+ act,
+ event,
+ priority,
+ signal_number);
+}
+
ACE_Asynch_Transmit_File_Result_Impl *
ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
ACE_HANDLE socket,
diff --git a/ace/Proactor.h b/ace/Proactor.h
index b3b55257586..65cc7cef5ee 100644
--- a/ace/Proactor.h
+++ b/ace/Proactor.h
@@ -1,4 +1,4 @@
-// -*- C++ -*-
+/* -*- C++ -*- */
//=============================================================================
/**
@@ -9,7 +9,7 @@
* @author Irfan Pyarali <irfan@cs.wustl.edu>
* @author Tim Harrison <harrison@cs.wustl.edu>
* @author Alexander Babu Arulanthu <alex@cs.wustl.edu>
- * @author Alexander Libman <alibman@@ihug.com.au>
+ * @author Alexander Libman <alibman@ihug.com.au>
*/
//=============================================================================
@@ -381,6 +381,9 @@ public:
/// Create the correct implementation class for doing Asynch_Accept.
virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void);
+ /// Create the correct implementation class for doing Asynch_Connect.
+ virtual ACE_Asynch_Connect_Impl *create_asynch_connect (void);
+
/// Create the correct implementation class for doing
/// Asynch_Transmit_File.
virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void);
@@ -489,6 +492,15 @@ public:
int priority = 0,
int signal_number = ACE_SIGRTMIN);
+ /// Create the correct implementation class for ACE_Asynch_Connect::Result
+ virtual ACE_Asynch_Connect_Result_Impl *
+ create_asynch_connect_result (ACE_Handler &handler,
+ ACE_HANDLE connect_handle,
+ const void* act,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN);
+
/// Create the correct implementation class for
/// ACE_Asynch_Transmit_File::Result.
virtual ACE_Asynch_Transmit_File_Result_Impl *
@@ -513,8 +525,7 @@ public:
* Timer object with a meaningful signal number, choosing the
* largest signal number from the signal mask of the Proactor.
*/
- virtual ACE_Asynch_Result_Impl *create_asynch_timer (
- ACE_Handler &handler,
+ virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler,
const void *act,
const ACE_Time_Value &tv,
ACE_HANDLE event = ACE_INVALID_HANDLE,
diff --git a/ace/Proactor_Impl.h b/ace/Proactor_Impl.h
index 4b129de07b1..d5de76dbb0a 100644
--- a/ace/Proactor_Impl.h
+++ b/ace/Proactor_Impl.h
@@ -7,6 +7,7 @@
* $Id$
*
* @author Alexander Babu Arulanthu <alex@cs.wustl.edu>
+ * @author Alexander Libman <alibman@ihug.com.au>
*/
//=============================================================================
@@ -102,6 +103,9 @@ public:
/// Create the correct implementation class for doing Asynch_Accept.
virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void) = 0;
+ /// Create the correct implementation class for doing Asynch_Connect.
+ virtual ACE_Asynch_Connect_Impl *create_asynch_connect (void) = 0;
+
/// Create the correct implementation class for doing Asynch_Transmit_File.
virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void) = 0;
@@ -197,6 +201,14 @@ public:
int priority = 0,
int signal_number = ACE_SIGRTMIN) = 0;
+ /// Create the correct implementation class for ACE_Asynch_Connect::Result.
+ virtual ACE_Asynch_Connect_Result_Impl *create_asynch_connect_result (ACE_Handler &handler,
+ ACE_HANDLE connect_handle,
+ const void* act,
+ ACE_HANDLE event = ACE_INVALID_HANDLE,
+ int priority = 0,
+ int signal_number = ACE_SIGRTMIN) = 0;
+
/// Create the correct implementation class for ACE_Asynch_Transmit_File::Result.
virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler,
ACE_HANDLE socket,
diff --git a/ace/SUN_Proactor.cpp b/ace/SUN_Proactor.cpp
index f406393c747..5d674493565 100644
--- a/ace/SUN_Proactor.cpp
+++ b/ace/SUN_Proactor.cpp
@@ -24,14 +24,14 @@ ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations)
// we should start pseudo-asynchronous accept task
// one per all future acceptors
- this->accept_task_.start ();
+ this->get_asynch_pseudo_task ().start ();
}
// Destructor.
ACE_SUN_Proactor::~ACE_SUN_Proactor (void)
{
// stop asynch accept task
- this->get_asynch_accept_task().stop ();
+ this->get_asynch_pseudo_task ().stop ();
// to provide correct virtual calls
delete_notify_manager ();
diff --git a/ace/WIN32_Asynch_IO.cpp b/ace/WIN32_Asynch_IO.cpp
index 69d504e3792..cbeb6a4e629 100644
--- a/ace/WIN32_Asynch_IO.cpp
+++ b/ace/WIN32_Asynch_IO.cpp
@@ -1430,6 +1430,651 @@ ACE_WIN32_Asynch_Accept::proactor (void) const
return ACE_WIN32_Asynch_Operation::proactor ();
}
+// *********************************************************************
+
+ACE_HANDLE
+ACE_WIN32_Asynch_Connect_Result::connect_handle (void) const
+{
+ return this->connect_handle_;
+}
+
+void ACE_WIN32_Asynch_Connect_Result::connect_handle ( ACE_HANDLE handle )
+{
+ this->connect_handle_ = handle;
+}
+
+
+ACE_WIN32_Asynch_Connect_Result::ACE_WIN32_Asynch_Connect_Result
+ (ACE_Handler &handler,
+ ACE_HANDLE connect_handle,
+ const void* act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number)
+ : ACE_Asynch_Result_Impl (),
+ ACE_Asynch_Connect_Result_Impl (),
+ ACE_WIN32_Asynch_Result (handler, act, event, 0, 0, priority, signal_number),
+ connect_handle_ ( connect_handle )
+{
+ ;
+}
+
+void
+ACE_WIN32_Asynch_Connect_Result::complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error)
+{
+ // Copy the data.
+ this->bytes_transferred_ = bytes_transferred;
+ this->success_ = success;
+ this->completion_key_ = completion_key;
+ this->error_ = error;
+
+ // Create the interface result class.
+ ACE_Asynch_Connect::Result result (this);
+
+ // Call the application handler.
+ this->handler_.handle_connect (result);
+}
+
+ACE_WIN32_Asynch_Connect_Result::~ACE_WIN32_Asynch_Connect_Result (void)
+{
+}
+
+// Base class operations. These operations are here to kill dominance
+// warnings. These methods call the base class methods.
+
+u_long
+ACE_WIN32_Asynch_Connect_Result::bytes_transferred (void) const
+{
+ return ACE_WIN32_Asynch_Result::bytes_transferred ();
+}
+
+const void *
+ACE_WIN32_Asynch_Connect_Result::act (void) const
+{
+ return ACE_WIN32_Asynch_Result::act ();
+}
+
+int
+ACE_WIN32_Asynch_Connect_Result::success (void) const
+{
+ return ACE_WIN32_Asynch_Result::success ();
+}
+
+const void *
+ACE_WIN32_Asynch_Connect_Result::completion_key (void) const
+{
+ return ACE_WIN32_Asynch_Result::completion_key ();
+}
+
+u_long
+ACE_WIN32_Asynch_Connect_Result::error (void) const
+{
+ return ACE_WIN32_Asynch_Result::error ();
+}
+
+ACE_HANDLE
+ACE_WIN32_Asynch_Connect_Result::event (void) const
+{
+ return ACE_WIN32_Asynch_Result::event ();
+}
+
+u_long
+ACE_WIN32_Asynch_Connect_Result::offset (void) const
+{
+ return ACE_WIN32_Asynch_Result::offset ();
+}
+
+u_long
+ACE_WIN32_Asynch_Connect_Result::offset_high (void) const
+{
+ return ACE_WIN32_Asynch_Result::offset_high ();
+}
+
+int
+ACE_WIN32_Asynch_Connect_Result::priority (void) const
+{
+ return ACE_WIN32_Asynch_Result::priority ();
+}
+
+int
+ACE_WIN32_Asynch_Connect_Result::signal_number (void) const
+{
+ return ACE_WIN32_Asynch_Result::signal_number ();
+}
+
+int
+ACE_WIN32_Asynch_Connect_Result::post_completion (ACE_Proactor_Impl *proactor)
+{
+ return ACE_WIN32_Asynch_Result::post_completion (proactor);
+}
+
+// *********************************************************************
+
+ACE_WIN32_Asynch_Connect::ACE_WIN32_Asynch_Connect (ACE_WIN32_Proactor * win32_proactor)
+ : ACE_Asynch_Operation_Impl (),
+ ACE_Asynch_Connect_Impl (),
+ ACE_WIN32_Asynch_Operation (win32_proactor),
+ flg_open_ (0),
+ task_lock_count_ (0)
+{
+}
+
+ACE_WIN32_Asynch_Connect::~ACE_WIN32_Asynch_Connect (void)
+{
+ this->close ();
+ this->reactor (0); // to avoid purge_pending_notifications
+}
+
+ACE_Proactor *
+ACE_WIN32_Asynch_Connect::proactor (void) const
+{
+ return ACE_WIN32_Asynch_Operation::proactor ();
+}
+
+ACE_HANDLE
+ACE_WIN32_Asynch_Connect::get_handle (void) const
+{
+
+ ACE_ASSERT (0);
+ return ACE_INVALID_HANDLE;
+}
+
+void
+ACE_WIN32_Asynch_Connect::set_handle (ACE_HANDLE handle)
+{
+ ACE_ASSERT (0) ;
+}
+
+int
+ACE_WIN32_Asynch_Connect::open (ACE_Handler &handler,
+ ACE_HANDLE,
+ const void *completion_key,
+ ACE_Proactor *proactor)
+{
+ ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::open\n"));
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ // if we are already opened,
+ // we could not create a new handler without closing the previous
+ if (this->flg_open_ != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::open:")
+ ACE_LIB_TEXT ("connector already open \n")),
+ -1);
+
+ //int result =
+ ACE_WIN32_Asynch_Operation::open (handler,
+ ACE_INVALID_HANDLE,
+ completion_key,
+ proactor);
+
+ // Ignore result as we pass ACE_INVALID_HANDLE
+ //if (result == -1)
+ // return result;
+
+ this->flg_open_ = 1;
+
+ return 0;
+}
+
+int
+ACE_WIN32_Asynch_Connect::connect (ACE_HANDLE connect_handle,
+ const ACE_Addr & remote_sap,
+ const ACE_Addr & local_sap,
+ int reuse_addr,
+ const void *act,
+ int priority,
+ int signal_number)
+{
+ ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect\n"));
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ if (this->flg_open_ == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect")
+ ACE_LIB_TEXT ("connector was not opened before\n")),
+ -1);
+
+ // Common code for both WIN and WIN32.
+ // Create future Asynch_Connect_Result
+ ACE_WIN32_Asynch_Connect_Result *result = 0;
+ ACE_NEW_RETURN (result,
+ ACE_WIN32_Asynch_Connect_Result (*this->handler_,
+ connect_handle,
+ act,
+ this->win32_proactor_->get_handle (),
+ priority,
+ signal_number),
+ -1);
+
+ int rc = connect_i (result,
+ remote_sap,
+ local_sap,
+ reuse_addr);
+
+ // update handle
+ connect_handle = result->connect_handle ();
+
+ if (rc != 0)
+ return post_result (result, 1);
+
+ // Enqueue result we will wait for completion
+
+ if (this->result_map_.bind (connect_handle, result) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect:")
+ ACE_LIB_TEXT ("result map binding failed\n")));
+ result->set_error (EFAULT);
+ return post_result (result, 1);
+ }
+
+ this->task_lock_count_++;
+ }
+
+ ACE_Asynch_Pseudo_Task & task =
+ this->win32_proactor_->get_asynch_pseudo_task ();
+
+ int rc_task = task.register_io_handler (connect_handle,
+ this,
+ ACE_Event_Handler::CONNECT_MASK,
+ 0); // not to suspend after register
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ this->task_lock_count_--;
+
+ int post_enable = 1;
+
+ if (rc_task == -2 && task_lock_count_ == 0) // task is closing
+ {
+ post_enable = 0;
+ task.unlock_finish ();
+ }
+
+ if (rc_task < 0)
+ {
+ ACE_WIN32_Asynch_Connect_Result *result = 0;
+
+ this->result_map_.unbind (connect_handle, result);
+
+ if (result != 0)
+ {
+ result->set_error (EFAULT);
+
+ return post_result (result, post_enable);
+ }
+ }
+ }
+
+ return 0;
+}
+
+int ACE_WIN32_Asynch_Connect::post_result (ACE_WIN32_Asynch_Connect_Result * result,
+ int post_enable)
+{
+ if (this->flg_open_ != 0 && post_enable != 0)
+ {
+ if (this->win32_proactor_ ->post_completion (result) == 0)
+ return 0;
+
+ ACE_ERROR ((LM_ERROR,
+ ACE_LIB_TEXT ("Error:(%P | %t):%p\n"),
+ ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::post_result: ")
+ ACE_LIB_TEXT (" <post_completion> failed")));
+ }
+
+ ACE_HANDLE handle = result->connect_handle ();
+
+ if (handle != ACE_INVALID_HANDLE)
+ ACE_OS::closesocket (handle);
+
+ delete result;
+
+ return -1;
+}
+
+//@@ New method connect_i
+// return code :
+// -1 errors before attempt to connect
+// 0 connect started
+// 1 connect finished ( may be unsuccessfully)
+
+int
+ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result,
+ const ACE_Addr & remote_sap,
+ const ACE_Addr & local_sap,
+ int reuse_addr)
+{
+ result->set_bytes_transferred (0);
+
+ ACE_HANDLE handle = result->connect_handle ();
+
+ if (handle == ACE_INVALID_HANDLE)
+ {
+ int protocol_family = remote_sap.get_type ();
+
+ handle = ACE_OS::socket (protocol_family,
+ SOCK_STREAM,
+ 0);
+
+ // save it
+ result->connect_handle (handle);
+
+ if (handle == ACE_INVALID_HANDLE)
+ {
+ result->set_error (errno);
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ")
+ ACE_LIB_TEXT (" ACE_OS::socket failed\n")),
+ -1);
+ }
+
+ // Reuse the address
+ int one = 1;
+ if (protocol_family != PF_UNIX &&
+ reuse_addr != 0 &&
+ ACE_OS::setsockopt (handle,
+ SOL_SOCKET,
+ SO_REUSEADDR,
+ (const char*) &one,
+ sizeof one) == -1)
+ {
+ result->set_error (errno);
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ")
+ ACE_LIB_TEXT (" ACE_OS::setsockopt failed\n")),
+ -1);
+ }
+ }
+
+ if (local_sap != ACE_Addr::sap_any)
+ {
+ sockaddr * laddr = ACE_reinterpret_cast (sockaddr *,
+ local_sap.get_addr ());
+ size_t size = local_sap.get_size ();
+
+ if (ACE_OS::bind (handle, laddr, size) == -1)
+ {
+ result->set_error (errno);
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ")
+ ACE_LIB_TEXT (" ACE_OS::bind failed\n")),
+ -1);
+ }
+ }
+
+ // set non blocking mode
+
+ if (ACE::set_flags (handle, ACE_NONBLOCK) != 0)
+ {
+ result->set_error (errno);
+
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ")
+ ACE_LIB_TEXT (" ACE::set_flags failed\n")),
+ -1);
+ }
+
+ for (;;)
+ {
+ int rc = ACE_OS::connect (handle,
+ ACE_reinterpret_cast (sockaddr *,
+ remote_sap.get_addr ()),
+ remote_sap.get_size ());
+
+ if (rc < 0) // failure
+ {
+ if (errno == EWOULDBLOCK || errno == EINPROGRESS)
+ return 0; // connect started
+
+ if (errno == EINTR)
+ continue;
+
+ result->set_error (errno);
+ }
+ return 1 ; // connect finished
+ }
+
+ ACE_NOTREACHED (return 0);
+}
+
+
+//@@ New method cancel_uncompleted
+// It performs cancellation of all pending requests
+//
+// Parameter flg_notify can be
+// 0 - don't send notifications about canceled accepts
+// !0 - notify user about canceled accepts
+// according WIN32 standards we should receive notifications
+// on canceled AIO requests
+//
+// Return value : number of cancelled requests
+//
+
+int
+ACE_WIN32_Asynch_Connect::cancel_uncompleted (int flg_notify, ACE_Handle_Set & set)
+{
+ ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::cancel_uncompleted\n"));
+
+ int retval = 0;
+
+ MAP_ITERATOR iter (result_map_);
+
+ MAP_ENTRY * me = 0;
+
+ set.reset ();
+
+ for (; iter.next (me) != 0; retval++, iter.advance ())
+ {
+ ACE_HANDLE handle = me->ext_id_;
+ ACE_WIN32_Asynch_Connect_Result* result = me->int_id_ ;
+
+ set.set_bit (handle);
+
+ result->set_bytes_transferred (0);
+ result->set_error (ERROR_OPERATION_ABORTED);
+ this->post_result (result, flg_notify);
+ }
+
+ result_map_.unbind_all ();
+
+ return retval;
+}
+
+int
+ACE_WIN32_Asynch_Connect::cancel (void)
+{
+ ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::cancel\n"));
+
+ //We are not really ACE_WIN32_Asynch_Operation
+ //so we could not call ::aiocancel ()
+ // or just write
+ //return ACE_WIN32_Asynch_Operation::cancel ();
+ //We delegate real cancelation to cancel_uncompleted (1)
+
+ int rc = -1 ; // ERRORS
+
+ ACE_Handle_Set set;
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ int num_cancelled = cancel_uncompleted (flg_open_, set);
+
+ if (num_cancelled == 0)
+ rc = 1; // AIO_ALLDONE
+ else if (num_cancelled > 0)
+ rc = 0; // AIO_CANCELED
+
+ if (this->flg_open_ == 0)
+ return rc;
+
+ this->task_lock_count_++;
+ }
+
+ ACE_Asynch_Pseudo_Task & task =
+ this->win32_proactor_->get_asynch_pseudo_task ();
+
+ int rc_task = task.remove_io_handler (set);
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ this->task_lock_count_--;
+
+ if (rc_task == -2 && task_lock_count_ == 0) // task is closing
+ task.unlock_finish ();
+ }
+
+ return rc;
+}
+
+int
+ACE_WIN32_Asynch_Connect::close (void)
+{
+ ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::close\n"));
+
+ ACE_Handle_Set set;
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ int num_cancelled = cancel_uncompleted (flg_open_, set);
+
+ if (num_cancelled == 0 || this->flg_open_ == 0)
+ {
+ this->flg_open_ = 0;
+ return 0;
+ }
+
+ this->task_lock_count_++;
+ }
+
+ ACE_Asynch_Pseudo_Task & task =
+ this->win32_proactor_->get_asynch_pseudo_task ();
+
+ int rc_task = task.remove_io_handler (set);
+
+ {
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
+
+ this->task_lock_count_--;
+
+ if (rc_task == -2 && task_lock_count_ == 0) // task is closing
+ task.unlock_finish ();
+
+ this->flg_open_ = 0;
+ }
+
+ return 0;
+}
+
+int
+ACE_WIN32_Asynch_Connect::handle_exception (ACE_HANDLE fd)
+{
+ ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_exception\n"));
+ return handle_output (fd);
+}
+
+int
+ACE_WIN32_Asynch_Connect::handle_input (ACE_HANDLE fd)
+{
+ ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_input\n"));
+ return handle_output (fd);
+}
+
+int
+ACE_WIN32_Asynch_Connect::handle_output (ACE_HANDLE fd)
+{
+ ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_output\n"));
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+
+ ACE_WIN32_Asynch_Connect_Result* result = 0;
+
+ if (this->result_map_.unbind (fd, result) != 0) // not found
+ return -1;
+
+ int sockerror = 0 ;
+ int lsockerror = sizeof sockerror;
+
+ ACE_OS::getsockopt (fd,
+ SOL_SOCKET,
+ SO_ERROR,
+ (char*) & sockerror,
+ & lsockerror);
+
+ result->set_bytes_transferred (0);
+ result->set_error (sockerror);
+ this->post_result (result, this->flg_open_);
+
+ //ACE_Asynch_Pseudo_Task & task =
+ // this->win32_proactor_->get_asynch_pseudo_task();
+
+ // remove_io_handler() contains flag DONT_CALL
+ //task.remove_io_handler ( fd );
+
+ //return 0;
+ return -1;
+}
+
+
+int
+ACE_WIN32_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask)
+{
+ ACE_TRACE(ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_close\n"));
+
+ ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
+
+ ACE_Asynch_Pseudo_Task & task =
+ this->win32_proactor_->get_asynch_pseudo_task ();
+
+ if (task.is_active () == 0) // task is closing
+ {
+ if (this->flg_open_ != 0) // we are open
+ {
+ this->flg_open_ = 0;
+
+ // it means other thread is waiting for reactor token_
+ if (this->task_lock_count_ > 0)
+ task.lock_finish ();
+ }
+
+ ACE_Handle_Set set;
+ this->cancel_uncompleted (0, set);
+
+ return 0;
+ }
+
+ // remove_io_handler() contains flag DONT_CALL
+ // so it is save
+ task.remove_io_handler (fd);
+
+ ACE_WIN32_Asynch_Connect_Result* result = 0;
+
+ if (this->result_map_.unbind (fd, result) != 0) // not found
+ return -1;
+
+ result->set_bytes_transferred (0);
+ result->set_error (ERROR_OPERATION_ABORTED);
+ this->post_result (result, this->flg_open_);
+
+ return 0;
+}
+
+// *********************************************************************
+
ACE_HANDLE
ACE_WIN32_Asynch_Transmit_File_Result::socket (void) const
{
@@ -2317,4 +2962,27 @@ ACE_WIN32_Asynch_Write_Dgram::ACE_WIN32_Asynch_Write_Dgram (ACE_WIN32_Proactor *
{
}
+#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
+
+template class ACE_Map_Entry<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *>;
+template class ACE_Map_Manager<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Const_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>;
+
+#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
+
+#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *>
+#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Const_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+
+
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
+
#endif /* ACE_WIN32 || ACE_HAS_WINCE */
diff --git a/ace/WIN32_Asynch_IO.h b/ace/WIN32_Asynch_IO.h
index 1d99a0e61b8..6b59020df0b 100644
--- a/ace/WIN32_Asynch_IO.h
+++ b/ace/WIN32_Asynch_IO.h
@@ -9,15 +9,16 @@
*
* These classes only works on Win32 platforms.
*
- * The implementation of <ACE_Asynch_Transmit_File> and
- * <ACE_Asynch_Accept> are only supported if ACE_HAS_WINSOCK2 is
- * defined or you are on WinNT 4.0 or higher.
+ * The implementation of ACE_Asynch_Transmit_File,
+ * ACE_Asynch_Accept, and ACE_Asynch_Connect are only supported if
+ * ACE_HAS_WINSOCK2 is defined or you are on WinNT 4.0 or higher.
*
*
* @author Irfan Pyarali <irfan@cs.wustl.edu>
* @author Tim Harrison <harrison@cs.wustl.edu>
* @author Alexander Babu Arulanthu <alex@cs.wustl.edu>
* @author Roger Tragin <r.tragin@computer.org>
+ * @author Alexander Libman <alibman@ihug.com.au>
*/
//=============================================================================
@@ -36,6 +37,9 @@
#include "ace/OS.h"
#include "ace/Asynch_IO_Impl.h"
#include "ace/Addr.h"
+#include "ace/Event_Handler.h"
+
+#include "ace/Map_Manager.h"
// Forward declaration
class ACE_WIN32_Proactor;
@@ -1044,6 +1048,222 @@ public:
};
/**
+ * @class ACE_WIN32_Asynch_Connect_Result
+ *
+ * @brief This is that class which will be passed back to the
+ * completion handler when the asynchronous connect completes.
+ *
+ * This class has all the information necessary for the
+ * completion handler to uniquiely identify the completion of the
+ * asynchronous connect.
+ */
+class ACE_Export ACE_WIN32_Asynch_Connect_Result : public virtual ACE_Asynch_Connect_Result_Impl,
+ public ACE_WIN32_Asynch_Result
+{
+ /// Factory classes will have special permissions.
+ friend class ACE_WIN32_Asynch_Connect;
+
+ /// The Proactor constructs the Result class for faking results.
+ friend class ACE_WIN32_Proactor;
+
+public:
+
+ /// I/O handle for the connection.
+ ACE_HANDLE connect_handle (void) const;
+
+ // = Base class operations. These operations are here to kill some
+ // warnings. These methods call the base class methods.
+
+ /// Number of bytes transferred by the operation.
+ u_long bytes_transferred (void) const;
+
+ /// ACT associated with the operation.
+ const void *act (void) const;
+
+ /// Did the operation succeed?
+ int success (void) const;
+
+ /**
+ * Returns the ACT associated with the handle when it was
+ * registered with the I/O completion port. This ACT is not the
+ * same as the ACT associated with the asynchronous operation.
+ */
+ const void *completion_key (void) const;
+
+ /// Error value if the operation fail.
+ u_long error (void) const;
+
+ /// Event associated with the OVERLAPPED structure.
+ ACE_HANDLE event (void) const;
+
+ /// This really make sense only when doing file I/O.
+ u_long offset (void) const;
+
+ /// Offset_high associated with the OVERLAPPED structure.
+ u_long offset_high (void) const;
+
+ /// The priority of the asynchronous operation. Currently, this is
+ /// not supported on Win32.
+ int priority (void) const;
+
+ /// No-op. Returns 0.
+ int signal_number (void) const;
+
+ /// Post this object to the Proactor's completion port.
+ int post_completion (ACE_Proactor_Impl *proactor);
+
+protected:
+ /// Constructor is protected since creation is limited to
+ /// ACE_Asynch_Connect factory.
+ ACE_WIN32_Asynch_Connect_Result (ACE_Handler &handler,
+ ACE_HANDLE connect_handle,
+ const void* act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number);
+
+ /// ACE_Proactor will call this method when the accept completes.
+ virtual void complete (u_long bytes_transferred,
+ int success,
+ const void *completion_key,
+ u_long error);
+
+ /// Destructor.
+ virtual ~ACE_WIN32_Asynch_Connect_Result (void);
+
+ /// Set the I/O handle for the new connection.
+ void connect_handle (ACE_HANDLE handle);
+
+ ACE_HANDLE connect_handle_;
+};
+
+
+/**
+ * @class ACE_WIN32_Asynch_Connect
+ */
+class ACE_Export ACE_WIN32_Asynch_Connect :
+ public virtual ACE_Asynch_Connect_Impl,
+ public ACE_WIN32_Asynch_Operation,
+ public ACE_Event_Handler
+{
+public:
+
+ /// Constructor.
+ ACE_WIN32_Asynch_Connect (ACE_WIN32_Proactor * win32_proactor);
+
+ /// Destructor.
+ virtual ~ACE_WIN32_Asynch_Connect (void);
+
+ /**
+ * This open belongs to ACE_WIN32_Asynch_Operation. We forward
+ * this call to that method. We have put this here to avoid the
+ * compiler warnings.
+ */
+ int open (ACE_Handler &handler,
+ ACE_HANDLE handle,
+ const void *completion_key,
+ ACE_Proactor *proactor = 0);
+
+ /**
+ * Start an asynchronous connect.
+ *
+ * @arg connect_handle handle to use for the connect. If the value
+ * ACE_INVALID_HANDLE, a new handle will be created.
+ *
+ * @retval 0 Success
+ * @retval -1 Error
+ */
+ int connect (ACE_HANDLE connect_handle,
+ const ACE_Addr &remote_sap,
+ const ACE_Addr &local_sap,
+ int reuse_addr,
+ const void *act,
+ int priority,
+ int signal_number = 0);
+
+ /**
+ * Cancel all pending pseudo-asynchronus requests
+ * Behavior as usual AIO request
+ */
+ int cancel (void);
+
+ /**
+ * Close performs cancellation of all pending requests
+ * and close the connect handle
+ */
+ int close (void);
+
+ /// virtual from ACE_Event_Handler
+ ACE_HANDLE get_handle (void) const;
+
+ /// virtual from ACE_Event_Handler
+ void set_handle (ACE_HANDLE handle);
+
+ /// virtual from ACE_Event_Handler
+ int handle_input ( ACE_HANDLE handle);
+ int handle_output ( ACE_HANDLE handle);
+ int handle_exception ( ACE_HANDLE handle);
+
+ /// virtual from ACE_Event_Handler
+ int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) ;
+
+ // = Methods belong to ACE_WIN32_Asynch_Operation base class. These
+ // methods are defined here to avoid dominace warnings. They route
+ // the call to the ACE_WIN32_Asynch_Operation base class.
+ /// Return the underlying proactor.
+ ACE_Proactor* proactor (void) const;
+
+private:
+ int connect_i (ACE_WIN32_Asynch_Connect_Result *result,
+ const ACE_Addr &remote_sap,
+ const ACE_Addr &local_sap,
+ int reuse_addr);
+
+ int post_result (ACE_WIN32_Asynch_Connect_Result *result, int flg_post);
+
+ /// Cancel uncompleted connect operations.
+ /**
+ * @arg flg_notify Indicates whether or not to send notification about
+ * canceled connect operations. If 0, don't send
+ * notifications. If 1, notify user about canceled
+ * connects.
+ * According WIN32 standards we should receive
+ * notifications on canceled AIO requests.
+ */
+ int cancel_uncompleted (int flg_notify, ACE_Handle_Set & set);
+
+ int flg_open_ ;
+ /// 1 - Connect is registered in ACE_Asynch_Pseudo_Task
+ /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task
+
+
+ /// to prevent ACE_Asynch_Pseudo_Task from deletion
+ /// while we make a call to the ACE_Asynch_Pseudo_Task
+ /// This is extra cost !!!
+ /// we could avoid them if all applications will follow the rule:
+ /// Proactor should be deleted only after deletion all
+ /// AsynchOperation objects connected with it
+ int task_lock_count_;
+
+ typedef ACE_Map_Manager<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+ MAP_MANAGER;
+ typedef ACE_Map_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>
+ MAP_ITERATOR;
+ typedef ACE_Map_Entry<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *>
+ MAP_ENTRY;
+
+ MAP_MANAGER result_map_;
+ // Map of Result pointers that correspond to all the <accept>'s
+ // pending.
+
+ ACE_SYNCH_MUTEX lock_;
+ // The lock to protect the result queue which is shared. The queue
+ // is updated by main thread in the register function call and
+ // through the auxillary thread in the deregister fun. So let us
+ // mutex it.
+};
+
+/**
* @class ACE_WIN32_Asynch_Transmit_File_Result
*
*
diff --git a/ace/WIN32_Proactor.cpp b/ace/WIN32_Proactor.cpp
index ec89c97f5b0..a26c164430b 100644
--- a/ace/WIN32_Proactor.cpp
+++ b/ace/WIN32_Proactor.cpp
@@ -52,13 +52,23 @@ ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads,
ACE_ERROR ((LM_ERROR,
ACE_LIB_TEXT ("%p\n"),
ACE_LIB_TEXT ("CreateIoCompletionPort")));
+
+ this->get_asynch_pseudo_task ().start ();
}
ACE_WIN32_Proactor::~ACE_WIN32_Proactor (void)
{
+ this->get_asynch_pseudo_task ().stop ();
+
this->close ();
}
+ACE_Asynch_Pseudo_Task &
+ACE_WIN32_Proactor::get_asynch_pseudo_task ()
+{
+ return this->pseudo_task_;
+}
+
int
ACE_WIN32_Proactor::close (void)
{
@@ -66,7 +76,7 @@ ACE_WIN32_Proactor::close (void)
if (this->completion_port_ != 0)
{
// To avoid memory leaks we should delete all results from queue.
-
+
for (;;)
{
ACE_OVERLAPPED *overlapped = 0;
@@ -74,7 +84,7 @@ ACE_WIN32_Proactor::close (void)
u_long completion_key = 0;
// Get the next asynchronous operation that completes
- BOOL res = ::GetQueuedCompletionStatus
+ BOOL res = ::GetQueuedCompletionStatus
(this->completion_port_,
&bytes_transferred,
&completion_key,
@@ -84,7 +94,7 @@ ACE_WIN32_Proactor::close (void)
if (overlapped == 0)
break;
- ACE_WIN32_Asynch_Result *asynch_result =
+ ACE_WIN32_Asynch_Result *asynch_result =
(ACE_WIN32_Asynch_Result *) overlapped;
delete asynch_result;
@@ -196,6 +206,16 @@ ACE_WIN32_Proactor::create_asynch_accept (void)
return implementation;
}
+ACE_Asynch_Connect_Impl *
+ACE_WIN32_Proactor::create_asynch_connect (void)
+{
+ ACE_Asynch_Connect_Impl *implementation = 0;
+ ACE_NEW_RETURN (implementation,
+ ACE_WIN32_Asynch_Connect (this),
+ 0);
+ return implementation;
+}
+
ACE_Asynch_Transmit_File_Impl *
ACE_WIN32_Proactor::create_asynch_transmit_file (void)
{
@@ -390,6 +410,26 @@ ACE_WIN32_Proactor::create_asynch_accept_result (ACE_Handler &handler,
return implementation;
}
+ACE_Asynch_Connect_Result_Impl *
+ACE_WIN32_Proactor::create_asynch_connect_result (ACE_Handler & handler,
+ ACE_HANDLE connect_handle,
+ const void *act,
+ ACE_HANDLE event,
+ int priority ,
+ int signal_number)
+{
+ ACE_Asynch_Connect_Result_Impl *implementation = 0;
+ ACE_NEW_RETURN (implementation,
+ ACE_WIN32_Asynch_Connect_Result (handler,
+ connect_handle,
+ act,
+ event,
+ priority,
+ signal_number),
+ 0);
+ return implementation;
+}
+
ACE_Asynch_Transmit_File_Result_Impl *
ACE_WIN32_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler,
ACE_HANDLE socket,
diff --git a/ace/WIN32_Proactor.h b/ace/WIN32_Proactor.h
index 1784ead2c8a..c363292f954 100644
--- a/ace/WIN32_Proactor.h
+++ b/ace/WIN32_Proactor.h
@@ -10,6 +10,7 @@
* @author Tim Harrison (harrison@cs.wustl.edu)
* @author Alexander Babu Arulanthu <alex@cs.wustl.edu>
* @author Roger Tragin <r.tragin@computer.org>
+ * @author Alexander Libman <alibman@ihug.com.au>
*/
//=============================================================================
@@ -31,6 +32,7 @@
#include "ace/Event_Handler.h"
#include "ace/Proactor_Impl.h"
+#include "ace/Asynch_Pseudo_Task.h"
// Forward declarations.
class ACE_WIN32_Asynch_Result;
@@ -47,6 +49,8 @@ class ACE_WIN32_Proactor_Timer_Handler;
*/
class ACE_Export ACE_WIN32_Proactor : public ACE_Proactor_Impl
{
+ friend class ACE_WIN32_Asynch_Connect;
+
public:
/// A do nothing constructor.
ACE_WIN32_Proactor (size_t number_of_threads = 0,
@@ -110,6 +114,7 @@ public:
virtual ACE_Asynch_Read_Dgram_Impl *create_asynch_read_dgram (void);
virtual ACE_Asynch_Write_Dgram_Impl *create_asynch_write_dgram (void);
virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void);
+ virtual ACE_Asynch_Connect_Impl *create_asynch_connect (void);
virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void);
// Methods used to create Asynch_IO_Result objects. We create the right
@@ -188,6 +193,14 @@ public:
int priority,
int signal_number = 0);
+ virtual ACE_Asynch_Connect_Result_Impl *create_asynch_connect_result (ACE_Handler & handler,
+ ACE_HANDLE connect_handle,
+ const void *act,
+ ACE_HANDLE event,
+ int priority,
+ int signal_number = 0);
+
+
virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler,
ACE_HANDLE socket,
ACE_HANDLE file,
@@ -212,6 +225,9 @@ public:
int signal_number = 0);
protected:
+ /// Task to process pseudo-asynchronous operations
+ ACE_Asynch_Pseudo_Task & get_asynch_pseudo_task (void);
+
/// Called when object is signaled by OS (either via UNIX signals or
/// when a Win32 object becomes signaled).
virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
@@ -261,6 +277,10 @@ protected:
/// Handler to handle the wakeups. This works in conjunction with the
/// <ACE_Proactor::run_event_loop>.
ACE_Handler wakeup_handler_;
+
+ /// Pseudo-task for asynch connect ( NT/2000)
+ /// In future should removed in XP with ConnectEx support
+ ACE_Asynch_Pseudo_Task pseudo_task_;
};
/**