diff options
author | Steve Huston <shuston@riverace.com> | 2002-04-16 22:57:11 +0000 |
---|---|---|
committer | Steve Huston <shuston@riverace.com> | 2002-04-16 22:57:11 +0000 |
commit | f97a745c44cb491dc1760c7f18dc42af5680b4be (patch) | |
tree | 65ede0b735a5470c152d5355013f89974c0430f5 | |
parent | 0b97fde98e72ccf1364dc54aa14385de769f0af7 (diff) | |
download | ATCD-f97a745c44cb491dc1760c7f18dc42af5680b4be.tar.gz |
ChangeLogTag:Tue Apr 16 18:42:39 2002 Steve Huston <shuston@riverace.com>
-rw-r--r-- | ChangeLog | 51 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-02a | 51 | ||||
-rw-r--r-- | ChangeLogs/ChangeLog-03a | 51 | ||||
-rw-r--r-- | ace/Asynch_Connector.cpp | 297 | ||||
-rw-r--r-- | ace/Asynch_Connector.h | 145 | ||||
-rw-r--r-- | ace/Asynch_IO.cpp | 105 | ||||
-rw-r--r-- | ace/Asynch_IO.h | 101 | ||||
-rw-r--r-- | ace/Asynch_IO_Impl.cpp | 8 | ||||
-rw-r--r-- | ace/Asynch_IO_Impl.h | 53 | ||||
-rw-r--r-- | ace/Asynch_IO_Impl.i | 13 | ||||
-rw-r--r-- | ace/Asynch_Pseudo_Task.cpp | 313 | ||||
-rw-r--r-- | ace/Asynch_Pseudo_Task.h | 79 | ||||
-rw-r--r-- | ace/Makefile | 2 | ||||
-rw-r--r-- | ace/POSIX_Asynch_IO.cpp | 934 | ||||
-rw-r--r-- | ace/POSIX_Asynch_IO.h | 317 | ||||
-rw-r--r-- | ace/POSIX_Proactor.cpp | 52 | ||||
-rw-r--r-- | ace/POSIX_Proactor.h | 27 | ||||
-rw-r--r-- | ace/Proactor.cpp | 23 | ||||
-rw-r--r-- | ace/Proactor.h | 19 | ||||
-rw-r--r-- | ace/Proactor_Impl.h | 12 | ||||
-rw-r--r-- | ace/SUN_Proactor.cpp | 4 | ||||
-rw-r--r-- | ace/WIN32_Asynch_IO.cpp | 668 | ||||
-rw-r--r-- | ace/WIN32_Asynch_IO.h | 226 | ||||
-rw-r--r-- | ace/WIN32_Proactor.cpp | 46 | ||||
-rw-r--r-- | ace/WIN32_Proactor.h | 20 |
25 files changed, 3226 insertions, 391 deletions
diff --git a/ChangeLog b/ChangeLog index fd57322c2db..4ff9024aebe 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,44 @@ +Tue Apr 16 18:42:39 2002 Steve Huston <shuston@riverace.com> + + New feature, ACE_Asynch_Connect, contributed by Alex Libman + <alibman@ihug.com.au>. Allows asynchronous connect using the + ACE Proactor framework. The new classes follow the same arrangement + as the existing ACE_Asynch_Accept framework. + + * ace/Asynch_Connector.{h cpp}: New files + + * ace/Asynch_IO.{h cpp}: Added new ACE_Asynch_Connect class and + its result. Added new method, ACE_Handler::handle_connect(), to + handle completion of asynch connect operations. + + * ace/Asynch_IO_Impl.{h i cpp}: Added new classes + ACE_Asynch_Connect_Impl and ACE_Asynch_Connect_Result_Impl. + + * ace/Asynch_Pseudo_Task.{h cpp}: Generalized task that handles + asynch emulation where needed, for example, asynch accept/connect. + Replaces the ACE_Asynch_Accept_Task and used for both accept/connect. + + * ace/POSIX_Asynch_IO.{h cpp}: Removed ACE_POSIX_Asynch_Accept_Task + (subsumed by ACE_Asynch_Pseudo_Task, above) and add the + ACE_POSIX_Asynch_Connect and its Result class. + + * ace/POSIX_Proactor.{h cpp}: Added asynch connect plumbing. + + * ace/Proactor.{h cpp}: Added asynch connect support methods. + + * ace/Proactor_Impl.h: Added create_asynch_connect[_result] methods. + + * ace/SUN_Proactor.cpp: Change from asynch_accept_task to + asynch_pseudo_task. + + * ace/WIN32_Asynch_IO.{h cpp}: Add new ACE_WIN32_Asynch_Connect and + _Result. + + * ace/WIN32_Proactor.{h cpp}: Added new create_asynch_connect() and + create_asynch_connect_result() methods. + + * ace/Makefile: Added Asynch_Connector, Asynch_Pseudo_Task + Tue Apr 16 11:49:00 2002 Ossama Othman <ossama@uci.edu> * ace/Service_Templates.cpp: @@ -77,10 +118,10 @@ Mon Apr 15 16:20:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl> - Make it possible to get/set the timer queue based on pointers, the get method with a & is still available, but is marked as deprecated - Make the thr_id method const - - When the timer queue is created by the ACE_Thread_Timer_Queue_Adapter - then it is also deleted, if it is passed or set afterwards it isn't - deleted by ACE_Thread_Timer_Queue_Adapter (just like in the - ACE_Reactor). + - When the timer queue is created by the + ACE_Thread_Timer_Queue_Adapter then it is also deleted, if it is + passed or set afterwards it isn't deleted by + ACE_Thread_Timer_Queue_Adapter (just like in the ACE_Reactor). Mon Apr 15 10:22:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl> @@ -192,7 +233,7 @@ Fri Apr 12 19:15:39 2002 Steve Huston <shuston@riverace.com> * ace/Asynch_Acceptor.cpp (parse_address): Set the entire address (address and port) instead of just the IP address part. - Thanks to Alex Libman <AlexL@rumblegroup.com> for this fix. + Thanks to Alex Libman <alibman@ihug.com.au> for this fix. Fri Apr 12 18:00:41 UTC 2002 Don Hinton <dhinton@ieee.org> diff --git a/ChangeLogs/ChangeLog-02a b/ChangeLogs/ChangeLog-02a index fd57322c2db..4ff9024aebe 100644 --- a/ChangeLogs/ChangeLog-02a +++ b/ChangeLogs/ChangeLog-02a @@ -1,3 +1,44 @@ +Tue Apr 16 18:42:39 2002 Steve Huston <shuston@riverace.com> + + New feature, ACE_Asynch_Connect, contributed by Alex Libman + <alibman@ihug.com.au>. Allows asynchronous connect using the + ACE Proactor framework. The new classes follow the same arrangement + as the existing ACE_Asynch_Accept framework. + + * ace/Asynch_Connector.{h cpp}: New files + + * ace/Asynch_IO.{h cpp}: Added new ACE_Asynch_Connect class and + its result. Added new method, ACE_Handler::handle_connect(), to + handle completion of asynch connect operations. + + * ace/Asynch_IO_Impl.{h i cpp}: Added new classes + ACE_Asynch_Connect_Impl and ACE_Asynch_Connect_Result_Impl. + + * ace/Asynch_Pseudo_Task.{h cpp}: Generalized task that handles + asynch emulation where needed, for example, asynch accept/connect. + Replaces the ACE_Asynch_Accept_Task and used for both accept/connect. + + * ace/POSIX_Asynch_IO.{h cpp}: Removed ACE_POSIX_Asynch_Accept_Task + (subsumed by ACE_Asynch_Pseudo_Task, above) and add the + ACE_POSIX_Asynch_Connect and its Result class. + + * ace/POSIX_Proactor.{h cpp}: Added asynch connect plumbing. + + * ace/Proactor.{h cpp}: Added asynch connect support methods. + + * ace/Proactor_Impl.h: Added create_asynch_connect[_result] methods. + + * ace/SUN_Proactor.cpp: Change from asynch_accept_task to + asynch_pseudo_task. + + * ace/WIN32_Asynch_IO.{h cpp}: Add new ACE_WIN32_Asynch_Connect and + _Result. + + * ace/WIN32_Proactor.{h cpp}: Added new create_asynch_connect() and + create_asynch_connect_result() methods. + + * ace/Makefile: Added Asynch_Connector, Asynch_Pseudo_Task + Tue Apr 16 11:49:00 2002 Ossama Othman <ossama@uci.edu> * ace/Service_Templates.cpp: @@ -77,10 +118,10 @@ Mon Apr 15 16:20:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl> - Make it possible to get/set the timer queue based on pointers, the get method with a & is still available, but is marked as deprecated - Make the thr_id method const - - When the timer queue is created by the ACE_Thread_Timer_Queue_Adapter - then it is also deleted, if it is passed or set afterwards it isn't - deleted by ACE_Thread_Timer_Queue_Adapter (just like in the - ACE_Reactor). + - When the timer queue is created by the + ACE_Thread_Timer_Queue_Adapter then it is also deleted, if it is + passed or set afterwards it isn't deleted by + ACE_Thread_Timer_Queue_Adapter (just like in the ACE_Reactor). Mon Apr 15 10:22:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl> @@ -192,7 +233,7 @@ Fri Apr 12 19:15:39 2002 Steve Huston <shuston@riverace.com> * ace/Asynch_Acceptor.cpp (parse_address): Set the entire address (address and port) instead of just the IP address part. - Thanks to Alex Libman <AlexL@rumblegroup.com> for this fix. + Thanks to Alex Libman <alibman@ihug.com.au> for this fix. Fri Apr 12 18:00:41 UTC 2002 Don Hinton <dhinton@ieee.org> diff --git a/ChangeLogs/ChangeLog-03a b/ChangeLogs/ChangeLog-03a index fd57322c2db..4ff9024aebe 100644 --- a/ChangeLogs/ChangeLog-03a +++ b/ChangeLogs/ChangeLog-03a @@ -1,3 +1,44 @@ +Tue Apr 16 18:42:39 2002 Steve Huston <shuston@riverace.com> + + New feature, ACE_Asynch_Connect, contributed by Alex Libman + <alibman@ihug.com.au>. Allows asynchronous connect using the + ACE Proactor framework. The new classes follow the same arrangement + as the existing ACE_Asynch_Accept framework. + + * ace/Asynch_Connector.{h cpp}: New files + + * ace/Asynch_IO.{h cpp}: Added new ACE_Asynch_Connect class and + its result. Added new method, ACE_Handler::handle_connect(), to + handle completion of asynch connect operations. + + * ace/Asynch_IO_Impl.{h i cpp}: Added new classes + ACE_Asynch_Connect_Impl and ACE_Asynch_Connect_Result_Impl. + + * ace/Asynch_Pseudo_Task.{h cpp}: Generalized task that handles + asynch emulation where needed, for example, asynch accept/connect. + Replaces the ACE_Asynch_Accept_Task and used for both accept/connect. + + * ace/POSIX_Asynch_IO.{h cpp}: Removed ACE_POSIX_Asynch_Accept_Task + (subsumed by ACE_Asynch_Pseudo_Task, above) and add the + ACE_POSIX_Asynch_Connect and its Result class. + + * ace/POSIX_Proactor.{h cpp}: Added asynch connect plumbing. + + * ace/Proactor.{h cpp}: Added asynch connect support methods. + + * ace/Proactor_Impl.h: Added create_asynch_connect[_result] methods. + + * ace/SUN_Proactor.cpp: Change from asynch_accept_task to + asynch_pseudo_task. + + * ace/WIN32_Asynch_IO.{h cpp}: Add new ACE_WIN32_Asynch_Connect and + _Result. + + * ace/WIN32_Proactor.{h cpp}: Added new create_asynch_connect() and + create_asynch_connect_result() methods. + + * ace/Makefile: Added Asynch_Connector, Asynch_Pseudo_Task + Tue Apr 16 11:49:00 2002 Ossama Othman <ossama@uci.edu> * ace/Service_Templates.cpp: @@ -77,10 +118,10 @@ Mon Apr 15 16:20:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl> - Make it possible to get/set the timer queue based on pointers, the get method with a & is still available, but is marked as deprecated - Make the thr_id method const - - When the timer queue is created by the ACE_Thread_Timer_Queue_Adapter - then it is also deleted, if it is passed or set afterwards it isn't - deleted by ACE_Thread_Timer_Queue_Adapter (just like in the - ACE_Reactor). + - When the timer queue is created by the + ACE_Thread_Timer_Queue_Adapter then it is also deleted, if it is + passed or set afterwards it isn't deleted by + ACE_Thread_Timer_Queue_Adapter (just like in the ACE_Reactor). Mon Apr 15 10:22:12 UTC 2002 Johnny Willemsen <jwillemsen@remedy.nl> @@ -192,7 +233,7 @@ Fri Apr 12 19:15:39 2002 Steve Huston <shuston@riverace.com> * ace/Asynch_Acceptor.cpp (parse_address): Set the entire address (address and port) instead of just the IP address part. - Thanks to Alex Libman <AlexL@rumblegroup.com> for this fix. + Thanks to Alex Libman <alibman@ihug.com.au> for this fix. Fri Apr 12 18:00:41 UTC 2002 Don Hinton <dhinton@ieee.org> diff --git a/ace/Asynch_Connector.cpp b/ace/Asynch_Connector.cpp new file mode 100644 index 00000000000..190a230f053 --- /dev/null +++ b/ace/Asynch_Connector.cpp @@ -0,0 +1,297 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef ACE_ASYNCH_CONNECTOR_C +#define ACE_ASYNCH_CONNECTOR_C + +#include "ace/Asynch_Connector.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +ACE_RCSID(ace, Asynch_Connector, "$Id$") + +#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) +// This only works on platforms that support async I/O. + +#include "ace/Message_Block.h" +#include "ace/INET_Addr.h" + +template <class HANDLER> +const ACE_INET_Addr +ACE_Asynch_Connector<HANDLER>::inet_addr_any_ = ACE_INET_Addr ( (unsigned short) 0, ACE_UINT32 (INADDR_ANY) ); + + +template <class HANDLER> +ACE_Asynch_Connector<HANDLER>::ACE_Asynch_Connector (void) + : pass_addresses_ (0), + validate_new_connection_ (0) +{ +} + +template <class HANDLER> +ACE_Asynch_Connector<HANDLER>::~ACE_Asynch_Connector (void) +{ + //this->asynch_connect_.close (); +} + +template <class HANDLER> int +ACE_Asynch_Connector<HANDLER>::open (int pass_addresses, + ACE_Proactor *proactor, + int validate_new_connection) +{ + this->proactor (proactor); + this->pass_addresses_ = pass_addresses; + this->validate_new_connection_ = validate_new_connection; + + // Initialize the ACE_Asynch_Connect + if (this->asynch_connect_.open (*this, + ACE_INVALID_HANDLE, + 0, + this->proactor ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Connect::open")), + -1); + return 0; +} + +template <class HANDLER> int +ACE_Asynch_Connector<HANDLER>::connect (const ACE_INET_Addr & remote_sap, + const ACE_INET_Addr & local_sap, + int reuse_addr, + const void *act) +{ + // Initiate asynchronous connect + if (this->asynch_connect_.connect (ACE_INVALID_HANDLE, + remote_sap, + local_sap, + reuse_addr, + act) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Connect::connect")), + -1); + return 0; +} + +template <class HANDLER> void +ACE_Asynch_Connector<HANDLER>::handle_connect (const ACE_Asynch_Connect::Result &result) +{ + // Variable for error tracking + int error = 0; + + // If the asynchronous connect fails. + if (!result.success () || + result.connect_handle () == ACE_INVALID_HANDLE) + { + error = 1; + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : Invalid handle"))); + } + + if (result.error () != 0) + { + error = 1; + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : result error=%d\n"), + ACE_static_cast (int, result.error ()))); + } + + // set blocking mode + if (!error && + ACE::clr_flags (result.connect_handle (), ACE_NONBLOCK) != 0) + { + error = 1; + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : Set blocking mode"))); + } + + // Parse the addresses. + ACE_INET_Addr local_address; + ACE_INET_Addr remote_address; + if (!error && + (this->validate_new_connection_ || this->pass_addresses_)) + this->parse_address (result, + remote_address, + local_address); + + // Validate remote address + if (!error && + this->validate_new_connection_ && + this->validate_new_connection (remote_address) == -1) + { + error = 1; + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : Address validation failed"))); + } + + HANDLER *new_handler = 0; + if (!error) + { + // The Template method + new_handler = this->make_handler (); + if (new_handler == 0) + { + error = 1; + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Connector::handle_connect : Making of new handler failed"))); + } + } + + // If no errors + if (!error) + { + // Update the Proactor. + new_handler->proactor (this->proactor ()); + + // Pass the addresses + if (this->pass_addresses_) + new_handler->addresses (remote_address, + local_address); + + // Pass the ACT + if (result.act () != 0) + new_handler->act (result.act ()); + + // Set up the handler's new handle value + new_handler->handle (result.connect_handle ()); + + ACE_Message_Block mb; + + // Initiate the handler with empty message block; + new_handler->open (result.connect_handle (), mb); + } + + // On failure, no choice but to close the socket + if (error && + result.connect_handle() != ACE_INVALID_HANDLE) + ACE_OS::closesocket (result.connect_handle ()); +} + +template <class HANDLER> int +ACE_Asynch_Connector<HANDLER>::validate_new_connection (const ACE_INET_Addr &remote_address) +{ + ACE_UNUSED_ARG (remote_address); + + // Default implemenation always validates the remote address. + return 0; +} + +template <class HANDLER> int +ACE_Asynch_Connector<HANDLER>::cancel (void) +{ + return this->asynch_connect_.cancel (); +} + +template <class HANDLER> void +ACE_Asynch_Connector<HANDLER>::parse_address (const ACE_Asynch_Connect::Result &result, + ACE_INET_Addr &remote_address, + ACE_INET_Addr &local_address) +{ + // Getting the addresses. + sockaddr_in local_addr; + sockaddr_in remote_addr; + + // Get the length. + int local_size = sizeof (local_addr); + int remote_size = sizeof (remote_addr); + + // Get the local address. + if (ACE_OS::getsockname (result.connect_handle (), + ACE_reinterpret_cast (sockaddr *, + &local_addr), + &local_size) < 0) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("%p\n"), + ACE_LIB_TEXT("ACE_Asynch_Connector::<getsockname> failed"))); + + // Get the remote address. + if (ACE_OS::getpeername (result.connect_handle (), + ACE_reinterpret_cast (sockaddr *, + &remote_addr), + &remote_size) < 0) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("%p\n"), + ACE_LIB_TEXT("ACE_Asynch_Connector::<getpeername> failed"))); + + // Set the addresses. + local_address.set (&local_addr, local_size); + remote_address.set (&remote_addr, remote_size); + +#if 0 + // @@ Just debugging. + char local_address_buf [BUFSIZ]; + char remote_address_buf [BUFSIZ]; + + if (local_address.addr_to_string (local_address_buf, + sizeof local_address_buf) == -1) + ACE_ERROR ((LM_ERROR, + "Error:%p:can't obtain local_address's address string")); + + ACE_DEBUG ((LM_DEBUG, + "ACE_Asynch_Connector<HANDLER>::parse_address : "\ + "Local address %s\n", + local_address_buf)); + + if (remote_address.addr_to_string (remote_address_buf, + sizeof remote_address_buf) == -1) + ACE_ERROR ((LM_ERROR, + "Error:%p:can't obtain remote_address's address string")); + + ACE_DEBUG ((LM_DEBUG, + "ACE_Asynch_Connector<HANDLER>::parse_address : "\ + "Remote address %s\n", + remote_address_buf)); +#endif /* 0 */ + + return; +} + + +template <class HANDLER> ACE_Asynch_Connect & +ACE_Asynch_Connector<HANDLER>::asynch_connect (void) +{ + return this->asynch_connect_; +} + +template <class HANDLER> HANDLER * +ACE_Asynch_Connector<HANDLER>::make_handler (void) +{ + // Default behavior + HANDLER *handler = 0; + ACE_NEW_RETURN (handler, HANDLER, 0); + return handler; +} + +template <class HANDLER> int +ACE_Asynch_Connector<HANDLER>::pass_addresses (void) const +{ + return this->pass_addresses_; +} + +template <class HANDLER> void +ACE_Asynch_Connector<HANDLER>::pass_addresses (int new_value) +{ + this->pass_addresses_ = new_value; +} + +template <class HANDLER> int +ACE_Asynch_Connector<HANDLER>::validate_new_connection (void) const +{ + return this->validate_new_connection_; +} + +template <class HANDLER> void +ACE_Asynch_Connector<HANDLER>::validate_new_connection (int new_value) +{ + this->validate_new_connection_ = new_value; +} + +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS */ +#endif /* ACE_ASYNCH_CONNECTOR_C */ diff --git a/ace/Asynch_Connector.h b/ace/Asynch_Connector.h new file mode 100644 index 00000000000..e652f2ee7f6 --- /dev/null +++ b/ace/Asynch_Connector.h @@ -0,0 +1,145 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file Asynch_Connector.h + * + * $Id$ + * + * @author Alexander Libman <alibman@ihug.com.au> + */ +//============================================================================= + +#ifndef ACE_ASYNCH_CONNECTOR_H +#define ACE_ASYNCH_CONNECTOR_H +#include "ace/pre.h" + +#include "ace/OS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#if defined (ACE_WIN32) || defined (ACE_HAS_AIO_CALLS) +// This only works on platforms that support async i/o. + +#include "ace/Asynch_IO.h" +#include "ace/INET_Addr.h" + +// Forward declarations +class ACE_Message_Block; + +/** + * @class ACE_Asynch_Connector + * + * @brief This class is an example of the Connector pattern. This class + * will establish new connections and create new HANDLER objects to handle + * the new connections. + * + * Unlike the ACE_Connector, however, this class is designed to + * be used asynchronously with the ACE Proactor framework. + */ + +template <class HANDLER> +class ACE_Asynch_Connector : public ACE_Handler +{ +public: + + static const ACE_INET_Addr inet_addr_any_; + + /// A do nothing constructor. + ACE_Asynch_Connector (void); + + /// Virtual destruction + virtual ~ACE_Asynch_Connector (void); + + /** + * This opens asynch connector + */ + virtual int open (int pass_addresses = 0, + ACE_Proactor *proactor = 0, + int validate_new_connection = 0); + + /// This initiates a new asynchronous connect + virtual int connect (const ACE_INET_Addr & remote_sap, + const ACE_INET_Addr & local_sap = inet_addr_any_, + int reuse_addr = 1, + const void *act = 0); + + /** + * This cancels all pending accepts operations that were issued by + * the calling thread. + * + * @note On Windows, this method does not cancel connect operations + * issued by other threads. + * + * @note On POSIX, delegates cancelation to ACE_POSIX_Asynch_Connect. + */ + virtual int cancel (void); + + /** + * Template method for address validation. + * + * Default implemenation always validates the remote address. + */ + virtual int validate_new_connection (const ACE_INET_Addr &remote_address); + + // + // These are low level tweaking methods + // + + /// Set and get flag that indicates if parsing and passing of + /// addresses to the service_handler is necessary. + virtual int pass_addresses (void) const; + virtual void pass_addresses (int new_value); + + /// Set and get flag that indicates if address validation is + /// required. + virtual int validate_new_connection (void) const; + virtual void validate_new_connection (int new_value); + +protected: + + /// This is called when an outstanding accept completes. + virtual void handle_connect (const ACE_Asynch_Connect::Result &result); + + + /// This parses the address from read buffer. + void parse_address (const ACE_Asynch_Connect::Result &result, + ACE_INET_Addr &remote_address, + ACE_INET_Addr &local_address); + + /// Return the asynch Connect object. + ACE_Asynch_Connect & asynch_connect (void); + + /** + * This is the template method used to create new handler. + * Subclasses must overwrite this method if a new handler creation + * strategy is required. + */ + virtual HANDLER *make_handler (void); + +private: + + /// Asynch_Connect used to make life easier :-) + ACE_Asynch_Connect asynch_connect_; + + /// Flag that indicates if parsing of addresses is necessary. + int pass_addresses_; + + /// Flag that indicates if address validation is required. + int validate_new_connection_; + +}; + +#if defined (ACE_TEMPLATES_REQUIRE_SOURCE) +#include "ace/Asynch_Connector.cpp" +#endif /* ACE_TEMPLATES_REQUIRE_SOURCE */ + +#if defined (ACE_TEMPLATES_REQUIRE_PRAGMA) +#pragma implementation ("Asynch_Connector.cpp") +#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ + +#endif /* ACE_WIN32 || ACE_HAS_AIO_CALLS */ +#include "ace/post.h" +#endif /* ACE_ASYNCH_CONNECTOR_H */ diff --git a/ace/Asynch_IO.cpp b/ace/Asynch_IO.cpp index 4fa2b269a90..06e5a952f5b 100644 --- a/ace/Asynch_IO.cpp +++ b/ace/Asynch_IO.cpp @@ -661,6 +661,106 @@ ACE_Asynch_Accept::Result::implementation (void) const return this->implementation_; } + + +// ********************************************************************* + +ACE_Asynch_Connect::ACE_Asynch_Connect (void) + : implementation_ (0) +{ +} + +ACE_Asynch_Connect::~ACE_Asynch_Connect (void) +{ +} + +int +ACE_Asynch_Connect::open (ACE_Handler &handler, + ACE_HANDLE handle, + const void *completion_key, + ACE_Proactor *proactor) +{ + // Get a proactor for/from the user. + proactor = this->get_proactor (proactor, handler); + + // Delete the old implementation. + delete this->implementation_; + this->implementation_ = 0; + + // Now let us get the implementation initialized. + ACE_Asynch_Connect_Impl *implementation = proactor->create_asynch_connect (); + if (implementation == 0) + return -1; + + // Set the implementation class + this->implementation (implementation); + + // Call the <open> method of the base class. + return ACE_Asynch_Operation::open (handler, + handle, + completion_key, + proactor); +} + +int +ACE_Asynch_Connect::connect (ACE_HANDLE connect_handle, + const ACE_Addr & remote_sap, + const ACE_Addr & local_sap, + int reuse_addr, + const void *act, + int priority, + int signal_number) +{ + return this->implementation ()->connect (connect_handle, + remote_sap, + local_sap, + reuse_addr, + act, + priority, + signal_number); +} + +ACE_Asynch_Connect_Impl * +ACE_Asynch_Connect::implementation (void) const +{ + return this->implementation_; +} + +void +ACE_Asynch_Connect::implementation (ACE_Asynch_Connect_Impl *implementation) +{ + this->implementation_ = implementation; + // Set the implementation in the base class. + ACE_Asynch_Operation::implementation (implementation); +} + +// ************************************************************ + +ACE_Asynch_Connect::Result::Result (ACE_Asynch_Connect_Result_Impl *implementation) + : ACE_Asynch_Result (implementation), + implementation_ (implementation) +{ +} + +ACE_Asynch_Connect::Result::~Result (void) +{ + // Proactor will delete the implementation when the <complete> call + // completes. +} + +ACE_HANDLE +ACE_Asynch_Connect::Result::connect_handle (void) const +{ + return this->implementation ()->connect_handle (); +} + + +ACE_Asynch_Connect_Result_Impl * +ACE_Asynch_Connect::Result::implementation (void) const +{ + return this->implementation_; +} + // ************************************************************ ACE_Asynch_Transmit_File::ACE_Asynch_Transmit_File (void) @@ -950,6 +1050,11 @@ ACE_Handler::handle_accept (const ACE_Asynch_Accept::Result & /* result */) } void +ACE_Handler::handle_connect (const ACE_Asynch_Connect::Result & /* result */) +{ +} + +void ACE_Handler::handle_transmit_file (const ACE_Asynch_Transmit_File::Result & /* result */) { } diff --git a/ace/Asynch_IO.h b/ace/Asynch_IO.h index 96a6f99dc2b..6976955fc84 100644 --- a/ace/Asynch_IO.h +++ b/ace/Asynch_IO.h @@ -19,6 +19,7 @@ * @author Tim Harrison <harrison@cs.wustl.edu> * @author Alexander Babu Arulanthu <alex@cs.wustl.edu> * @author Roger Tragin <r.tragin@computer.org> + * @author Alexander Libman <alibman@ihug.com.au> */ //============================================================================= @@ -41,6 +42,7 @@ class ACE_Proactor; class ACE_Handler; class ACE_Message_Block; class ACE_INET_Addr; +class ACE_Addr; // Forward declarations class ACE_Asynch_Result_Impl; @@ -822,6 +824,102 @@ public: ACE_Asynch_Accept_Result_Impl *implementation_; }; }; +// Forward declarations +class ACE_Asynch_Connect_Result_Impl; +class ACE_Asynch_Connect_Impl; + +/** + * @class ACE_Asynch_Connect + * + * @brief This class is a factory for starting off asynchronous connects + * This class forwards all methods to its implementation class. + * + * Once @c open is called, multiple asynchronous connect operationss can + * started using this class. A ACE_Asynch_Connect::Result will + * be passed back to the associated ACE_Handler when the asynchronous connect + * completes through the ACE_Handler::handle_connect() callback. + */ +class ACE_Export ACE_Asynch_Connect : public ACE_Asynch_Operation +{ + +public: + /// A do nothing constructor. + ACE_Asynch_Connect (void); + + /// Destructor. + virtual ~ACE_Asynch_Connect (void); + + /** + * Initializes the factory with information which will be used with + * each asynchronous call. + * + * @note @arg handle is ignored and should be @c ACE_INVALID_HANDLE. + */ + int open (ACE_Handler &handler, + ACE_HANDLE handle = ACE_INVALID_HANDLE, + const void *completion_key = 0, + ACE_Proactor *proactor = 0); + + /** + * This starts off an asynchronous Connect. + */ + int connect (ACE_HANDLE connect_handle, + const ACE_Addr & remote_sap, + const ACE_Addr & local_sap, + int reuse_addr, + const void *act=0, + int priority = 0, + int signal_number = ACE_SIGRTMIN); + + /// Return the underlying implementation class. + ACE_Asynch_Connect_Impl *implementation (void) const; + +protected: + /// Set the implementation class. + void implementation (ACE_Asynch_Connect_Impl *implementation); + + /// Delegation/implementation class that all methods will be + /// forwarded to. + ACE_Asynch_Connect_Impl *implementation_; + +public: +/** + * @class Result + * + * @brief This is that class which will be passed back to the + * handler when the asynchronous connect completes. + * + * This class has all the information necessary for the + * handler to uniquely identify the completion of the + * asynchronous connect. + */ + class ACE_Export Result : public ACE_Asynch_Result + { + + /// The concrete implementation result classes only construct this + /// class. + friend class ACE_POSIX_Asynch_Connect_Result; + friend class ACE_WIN32_Asynch_Connect_Result; + + public: + + /// I/O handle for the connection. + ACE_HANDLE connect_handle (void) const; + + /// Get the implementation. + ACE_Asynch_Connect_Result_Impl *implementation (void) const; + + protected: + /// Contructor. Implementation will not be deleted. + Result (ACE_Asynch_Connect_Result_Impl *implementation); + + /// Destructor. + virtual ~Result (void); + + /// Impelmentation class. + ACE_Asynch_Connect_Result_Impl *implementation_; + }; +}; // Forward declarations class ACE_Asynch_Transmit_File_Result_Impl; @@ -1345,6 +1443,9 @@ public: /// This method will be called when an asynchronous accept completes. virtual void handle_accept (const ACE_Asynch_Accept::Result &result); + /// This method will be called when an asynchronous connect completes. + virtual void handle_connect (const ACE_Asynch_Connect::Result &result); + /// This method will be called when an asynchronous transmit file /// completes. virtual void handle_transmit_file (const ACE_Asynch_Transmit_File::Result &result); diff --git a/ace/Asynch_IO_Impl.cpp b/ace/Asynch_IO_Impl.cpp index e3796fead53..8f26d82ae4b 100644 --- a/ace/Asynch_IO_Impl.cpp +++ b/ace/Asynch_IO_Impl.cpp @@ -55,10 +55,18 @@ ACE_Asynch_Accept_Result_Impl::~ACE_Asynch_Accept_Result_Impl (void) { } +ACE_Asynch_Connect_Result_Impl::~ACE_Asynch_Connect_Result_Impl (void) +{ +} + ACE_Asynch_Accept_Impl::~ACE_Asynch_Accept_Impl (void) { } +ACE_Asynch_Connect_Impl::~ACE_Asynch_Connect_Impl (void) +{ +} + ACE_Asynch_Transmit_File_Impl::~ACE_Asynch_Transmit_File_Impl (void) { } diff --git a/ace/Asynch_IO_Impl.h b/ace/Asynch_IO_Impl.h index b49a827d8ba..8fcee5b7668 100644 --- a/ace/Asynch_IO_Impl.h +++ b/ace/Asynch_IO_Impl.h @@ -16,6 +16,7 @@ * @author Tim Harrison (harrison@cs.wustl.edu) * @author Alexander Babu Arulanthu <alex@cs.wustl.edu> * @author Roger Tragin <r.tragin@computer.org> + * @author Alexander Libman <alibman@ihug.com.au> */ //============================================================================= @@ -439,6 +440,58 @@ protected: ACE_Asynch_Accept_Result_Impl (void); }; + +/** + * @class ACE_Asynch_Connect_Impl + * + * @brief Abstract base class for all the concrete implementation + * classes that provide different implementations for the + * ACE_Asynch_Connect. + * + */ +class ACE_Export ACE_Asynch_Connect_Impl : public virtual ACE_Asynch_Operation_Impl +{ +public: + virtual ~ACE_Asynch_Connect_Impl (void); + + /** + * This starts off an asynchronous connect + */ + virtual int connect (ACE_HANDLE connect_handle, + const ACE_Addr & remote_sap, + const ACE_Addr & local_sap, + int reuse_addr, + const void *act, + int priority, + int signal_number) = 0; + +protected: + /// Do-nothing constructor. + ACE_Asynch_Connect_Impl (void); +}; + +/** + * @class ACE_Asynch_Connect_Result_Impl + * + * @brief Abstract base class for all the concrete implementation + * classes that provide different implementations for the + * ACE_Asynch_Connect. + * + */ +class ACE_Export ACE_Asynch_Connect_Result_Impl : public virtual ACE_Asynch_Result_Impl +{ +public: + virtual ~ACE_Asynch_Connect_Result_Impl (void); + + /// I/O handle for the connection. + virtual ACE_HANDLE connect_handle (void) const = 0; + +protected: + /// Do-nothing constructor. + ACE_Asynch_Connect_Result_Impl (void); +}; + + /** * @class ACE_Asynch_Transmit_File_Impl * diff --git a/ace/Asynch_IO_Impl.i b/ace/Asynch_IO_Impl.i index a6c36925943..3d61fae7942 100644 --- a/ace/Asynch_IO_Impl.i +++ b/ace/Asynch_IO_Impl.i @@ -75,6 +75,19 @@ ACE_Asynch_Accept_Result_Impl::ACE_Asynch_Accept_Result_Impl (void) } ACE_INLINE +ACE_Asynch_Connect_Impl::ACE_Asynch_Connect_Impl (void) + : ACE_Asynch_Operation_Impl () +{ +} + +ACE_INLINE +ACE_Asynch_Connect_Result_Impl::ACE_Asynch_Connect_Result_Impl (void) + : ACE_Asynch_Result_Impl () +{ +} + + +ACE_INLINE ACE_Asynch_Transmit_File_Impl::ACE_Asynch_Transmit_File_Impl (void) : ACE_Asynch_Operation_Impl () { diff --git a/ace/Asynch_Pseudo_Task.cpp b/ace/Asynch_Pseudo_Task.cpp new file mode 100644 index 00000000000..b00450da517 --- /dev/null +++ b/ace/Asynch_Pseudo_Task.cpp @@ -0,0 +1,313 @@ +// $Id$ + +#include "ace/Asynch_Pseudo_Task.h" + +ACE_RCSID(ace, Asynch_Pseudo_Task, "$Id$") + +ACE_Asynch_Pseudo_Task::ACE_Asynch_Pseudo_Task() + : flg_active_ (0), + select_reactor_ (), // should be initialized before reactor_ + reactor_ (&select_reactor_, 0), // don't delete implementation + token_ (select_reactor_.lock ()), // we can use reactor token + finish_count_ (0) +{ +} + +ACE_Asynch_Pseudo_Task::~ACE_Asynch_Pseudo_Task() +{ + stop(); +} + +int +ACE_Asynch_Pseudo_Task::is_active (void) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + return flg_active_; +} + +int +ACE_Asynch_Pseudo_Task::start (void) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + if (this->flg_active_) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::start already started")), + -1); + + if (this->reactor_.initialized () == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::start reactor is not initialized")), + -1); + + + if (this->activate (THR_NEW_LWP | THR_JOINABLE, 1) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::start failed")), + -1); + + this->flg_active_ = 1; + return 0; +} + +int +ACE_Asynch_Pseudo_Task::stop (void) +{ + { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + if (this->flg_active_ == 0) // already stopped + return 0; + + reactor_.end_reactor_event_loop (); + } + + int rc = this->wait (); + + if (rc != 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:%p\n"), + ACE_LIB_TEXT ("ACE_Asynch_Pseudo_Task::stop failed")), + -1); + + { + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + this->flg_active_ = 0; + + if (this->reactor_.initialized ()) + this->reactor_.close (); + + while (finish_count_ > 0) + { + ace_mon.release (); + finish_event_.wait (); + + ace_mon.acquire (); + finish_event_.reset (); + } + } + + return rc; +} + +int +ACE_Asynch_Pseudo_Task::lock_finish (void) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + finish_count_ ++; + return 0; +} + +int +ACE_Asynch_Pseudo_Task::unlock_finish (void) +{ + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + --finish_count_; + finish_event_.signal (); + + return 0; +} + +int +ACE_Asynch_Pseudo_Task::svc (void) +{ +#if !defined (ACE_WIN32) + + sigset_t RT_signals; + + if (sigemptyset (&RT_signals) == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT ("sigemptyset failed"))); + + int member = 0; + + for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++) + { + member = sigismember (& RT_signals , si); + if (member == 1) + { + sigaddset (&RT_signals, si); + } + } + + if (ACE_OS::pthread_sigmask (SIG_BLOCK, &RT_signals, 0) != 0) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT ("pthread_sigmask failed"))); +#endif + + reactor_.owner (ACE_Thread::self()); + + reactor_.run_reactor_event_loop (); + + return 0; +} + + + +int +ACE_Asynch_Pseudo_Task::register_io_handler (ACE_HANDLE handle, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask, + int flg_suspend) +{ + // Return codes : + // 0 success + // -1 reactor errors + // -2 task not active + + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + if (this->flg_active_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::register_io_handler \n") + ACE_LIB_TEXT ("task not active \n")), + -2); + + // Register the handler with the reactor. + int retval = this->reactor_.register_handler (handle, handler, mask); + + if (retval == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::register_io_handler \n") + ACE_LIB_TEXT ("register_handler failed \n")), + -1); + + if (flg_suspend == 0 ) + return 0; + + // Suspend the <handle> now. Enable only when the <accept> is issued + // by the application. + retval = this->reactor_.suspend_handler (handle); + + if (retval == -1) + { + this->reactor_.remove_handler (handle, + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); + + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::register_io_handler \n") + ACE_LIB_TEXT ("suspend_handler failed \n")), + -1); + } + + return 0; +} + +int +ACE_Asynch_Pseudo_Task::remove_io_handler (ACE_HANDLE handle) +{ + // Return codes : + // 0 success + // -1 reactor errors + // -2 task not active + + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + if (this->flg_active_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::remove_io_handler \n") + ACE_LIB_TEXT ("task not active \n")), + -2); + + int retval = + this->reactor_.remove_handler (handle , + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); + if (retval == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::remove_io_handler \n") + ACE_LIB_TEXT ("remove_handler failed \n")), + -1); + + return 0; +} + +int +ACE_Asynch_Pseudo_Task::remove_io_handler (ACE_Handle_Set &set) +{ + // Return codes : + // 0 success + // -1 reactor errors + // -2 task not active + + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + if (this->flg_active_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::remove_io_handler \n") + ACE_LIB_TEXT ("task not active \n")), + -2); + + int retval = + this->reactor_.remove_handler (set , + ACE_Event_Handler::ALL_EVENTS_MASK + | ACE_Event_Handler::DONT_CALL); + if (retval == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::remove_io_handler \n") + ACE_LIB_TEXT ("remove_handler failed \n")), + -1); + + return 0; +} + +int +ACE_Asynch_Pseudo_Task::suspend_io_handler (ACE_HANDLE handle) +{ + // Return codes : + // 0 success + // -1 reactor errors + // -2 task not active + + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + if (this->flg_active_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::suspend_io_handler \n") + ACE_LIB_TEXT ("task not active \n")), + -2); + + int retval = this->reactor_.suspend_handler (handle); + + if (retval == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::suspend_io_handler \n") + ACE_LIB_TEXT ("suspend_handler failed \n")), + -1); + + return 0; +} + +int +ACE_Asynch_Pseudo_Task::resume_io_handler (ACE_HANDLE handle) +{ + // Return codes : + // 0 success + // -1 reactor errors + // -2 task not active + + ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + + if (this->flg_active_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::resume_io_handler \n") + ACE_LIB_TEXT ("task not active \n")), + -2); + + int retval = this->reactor_.resume_handler (handle); + + if (retval == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_Asynch_Pseudo_Task::resume_io_handler \n") + ACE_LIB_TEXT ("resume_handler failed \n")), + -1); + + return 0; +} diff --git a/ace/Asynch_Pseudo_Task.h b/ace/Asynch_Pseudo_Task.h new file mode 100644 index 00000000000..66d1866a664 --- /dev/null +++ b/ace/Asynch_Pseudo_Task.h @@ -0,0 +1,79 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file Asynch_Pseudo_Task.h + * + * $Id$ + * + * @author Alexander Libman <alibman@ihug.com.au> + */ +//============================================================================= + +#ifndef ACE_ASYNCH_PSEUDO_TASK_H +#define ACE_ASYNCH_PSEUDO_TASK_H +#include "ace/pre.h" + +#include "ace/OS.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +#pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Reactor.h" +#include "ace/Select_Reactor.h" +#include "ace/Task.h" + + +/** + * @class ACE_Asynch_Pseudo_Task + * + */ +class ACE_Export ACE_Asynch_Pseudo_Task : public ACE_Task<ACE_SYNCH> +{ + friend class ACE_POSIX_Asynch_Accept; + friend class ACE_POSIX_Asynch_Connect; + friend class ACE_WIN32_Asynch_Connect; + +public: + + ACE_Asynch_Pseudo_Task(); + virtual ~ACE_Asynch_Pseudo_Task(); + + int start (void); + int stop (void); + + virtual int svc (void); + + int is_active (void); + + int register_io_handler (ACE_HANDLE handle, + ACE_Event_Handler *handler, + ACE_Reactor_Mask mask, + int flg_suspend); + + int remove_io_handler (ACE_HANDLE handle); + int remove_io_handler (ACE_Handle_Set &set); + int resume_io_handler (ACE_HANDLE handle); + int suspend_io_handler (ACE_HANDLE handle); + +protected: + + int lock_finish (void); + int unlock_finish (void); + + int flg_active_; + + ACE_Select_Reactor select_reactor_; + // should be initialized before reactor_ + + ACE_Reactor reactor_; + + ACE_Lock &token_; + + int finish_count_; + ACE_Manual_Event finish_event_; +}; + +#include "ace/post.h" +#endif /* ACE_ASYNCH_PSEUDO_TASK_H */ diff --git a/ace/Makefile b/ace/Makefile index c3bf0d7fd43..9ebf7fce847 100644 --- a/ace/Makefile +++ b/ace/Makefile @@ -109,6 +109,7 @@ DEMUX_FILES = \ CONNECTION_FILES = \ Asynch_IO \ Asynch_IO_Impl \ + Asynch_Pseudo_Task \ POSIX_Asynch_IO \ WIN32_Asynch_IO SOCKETS_FILES = \ @@ -252,6 +253,7 @@ TEMPLATE_FILES = \ Unbounded_Set \ Unbounded_Queue \ Asynch_Acceptor \ + Asynch_Connector \ Auto_IncDec_T \ Auto_Ptr \ Based_Pointer_T \ diff --git a/ace/POSIX_Asynch_IO.cpp b/ace/POSIX_Asynch_IO.cpp index dad2e902dd6..4195d339830 100644 --- a/ace/POSIX_Asynch_IO.cpp +++ b/ace/POSIX_Asynch_IO.cpp @@ -8,7 +8,7 @@ #include "ace/Proactor.h" #include "ace/Message_Block.h" #include "ace/INET_Addr.h" -#include "ace/Task_T.h" +#include "ace/Asynch_Pseudo_Task.h" #include "ace/POSIX_Proactor.h" #if !defined (__ACE_INLINE__) @@ -1120,6 +1120,7 @@ ACE_POSIX_Asynch_Write_File::proactor (void) const // ********************************************************************* + u_long ACE_POSIX_Asynch_Accept_Result::bytes_to_read (void) const { @@ -1273,6 +1274,7 @@ ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_AIOCB_Proactor * pos ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept (void) { this->close (); + this->reactor(0); // to avoid purge_pending_notifications } ACE_Proactor * @@ -1302,19 +1304,19 @@ ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler, { ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::open\n")); - int result = 0; + int result=0; ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - // If we are already opened, we could not create a new handler - // without closing the previous. + // if we are already opened, + // we could not create a new handler without closing the previous if (this->flg_open_ != 0) ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::open:") ACE_LIB_TEXT("acceptor already open \n")), -1); - + result = ACE_POSIX_Asynch_Operation::open (handler, handle, completion_key, @@ -1326,24 +1328,24 @@ ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler, task_lock_count_++; - // At this moment asynch_accept_task does not know about us, so we - // can lock task's token with our lock_ locked. In all other cases - // we should release our lock_ before calling task's methods to - // avoid deadlock - - ACE_POSIX_Asynch_Accept_Task & task = - this->posix_proactor()->get_asynch_accept_task(); - - result = task.register_acceptor (this, ACE_Event_Handler::ACCEPT_MASK); + // At this moment asynch_accept_task does not know about us, + // so we can lock task's token with our lock_ locked. + // In all other cases we should release our lock_ before + // calling task's methods to avoid deadlock + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor()->get_asynch_pseudo_task(); + + result = task.register_io_handler (this->get_handle(), + this, + ACE_Event_Handler::ACCEPT_MASK, + 1); // suspend after register task_lock_count_-- ; if (result < 0) { - this->flg_open_= 0; this->handle_ = ACE_INVALID_HANDLE; - return -1 ; } @@ -1352,22 +1354,22 @@ ACE_POSIX_Asynch_Accept::open (ACE_Handler &handler, int ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, - u_long bytes_to_read, - ACE_HANDLE accept_handle, - const void *act, - int priority, - int signal_number) + u_long bytes_to_read, + ACE_HANDLE accept_handle, + const void *act, + int priority, + int signal_number) { - ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::accept\n") ); + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::accept\n")); { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - if (this->flg_open_ == 0 ) + if (this->flg_open_ == 0) ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept") - ACE_LIB_TEXT("acceptor was not opened before\n")), - -1); + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept") + ACE_LIB_TEXT("acceptor was not opened before\n")), + -1); // Sanity check: make sure that enough space has been allocated by // the caller. @@ -1380,7 +1382,7 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, if (available_space < space_needed) ACE_ERROR_RETURN ((LM_ERROR, ACE_LIB_TEXT ("Buffer too small\n")), - -1); + -1); // Common code for both WIN and POSIX. // Create future Asynch_Accept_Result @@ -1400,14 +1402,14 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, // Enqueue result if (this->result_queue_.enqueue_tail (result) == -1) { - delete result; // to avoid memory leak + delete result; // to avoid memory leak - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept:") - ACE_LIB_TEXT("enqueue accept call failed\n")), - -1); + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept:") + ACE_LIB_TEXT("enqueue accept call failed\n")), + -1); } - + if (this->result_queue_.size () > 1) return 0; @@ -1417,10 +1419,10 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, // If this is the only item, then it means there the set was empty // before. So enable the <handle> in the reactor. - ACE_POSIX_Asynch_Accept_Task & task = - this->posix_proactor()->get_asynch_accept_task(); + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor ()->get_asynch_pseudo_task (); - int rc_task = task.resume_acceptor (this); + int rc_task = task.resume_io_handler (this->get_handle()); { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); @@ -1428,8 +1430,7 @@ ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block, task_lock_count_ --; if (rc_task == -2 && task_lock_count_ == 0) // task is closing - task.unlock_finish (); - + task.unlock_finish (); } if (rc_task < 0) @@ -1458,31 +1459,31 @@ ACE_POSIX_Asynch_Accept::cancel_uncompleted (int flg_notify) int retval = 0; for (; ; retval++) - { - ACE_POSIX_Asynch_Accept_Result* result = 0; - - this->result_queue_.dequeue_head (result); + { + ACE_POSIX_Asynch_Accept_Result* result = 0; - if (result == 0) - break; + this->result_queue_.dequeue_head (result); - if (this->flg_open_==0 || flg_notify == 0) //if we should not notify - delete result ; // we have to delete result - else //else notify as any cancelled AIO - { - // Store the new handle. - result->aio_fildes = ACE_INVALID_HANDLE ; - result->set_bytes_transferred (0); - result->set_error (ECANCELED); + if (result == 0) + break; - if (this->posix_proactor()->post_completion (result) == -1) - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT("Error:(%P | %t):%p\n"), - ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::") - ACE_LIB_TEXT("cancel_uncompleted:<post_completion> failed") - )); + if (this->flg_open_ == 0 || flg_notify == 0) //if we should not notify + delete result ; // we have to delete result + else //else notify as any cancelled AIO + { + // Store the new handle. + result->aio_fildes = ACE_INVALID_HANDLE ; + result->set_bytes_transferred (0); + result->set_error (ECANCELED); + + if (this->posix_proactor ()->post_completion (result) == -1) + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::") + ACE_LIB_TEXT("cancel_uncompleted:<post_completion> failed") + )); + } } - } return retval; } @@ -1497,7 +1498,7 @@ ACE_POSIX_Asynch_Accept::cancel (void) //return ACE_POSIX_Asynch_Operation::cancel (); //We delegate real cancelation to cancel_uncompleted (1) - int rc = -1 ; // ERRORS + int rc = -1 ; // ERRORS { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); @@ -1511,14 +1512,14 @@ ACE_POSIX_Asynch_Accept::cancel (void) if (this->flg_open_ == 0) return rc ; - - task_lock_count_ ++; + + task_lock_count_++; } - ACE_POSIX_Asynch_Accept_Task & task = - this->posix_proactor()->get_asynch_accept_task(); - - int rc_task = task.suspend_acceptor (this); + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor ()->get_asynch_pseudo_task (); + + int rc_task = task.suspend_io_handler (this->get_handle()); { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); @@ -1527,7 +1528,6 @@ ACE_POSIX_Asynch_Accept::cancel (void) if (rc_task == -2 && task_lock_count_ == 0) // task is closing task.unlock_finish (); - } return rc; @@ -1539,7 +1539,7 @@ ACE_POSIX_Asynch_Accept::close () ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::close\n")); // 1. It performs cancellation of all pending requests - // 2. Removes itself from Reactor (ACE_POSIX_Asynch_Accept_Task) + // 2. Removes itself from Reactor ( ACE_Asynch_Pseudo_Task) // 3. close the socket // // Parameter flg_notify can be @@ -1551,7 +1551,6 @@ ACE_POSIX_Asynch_Accept::close () // Return codes : 0 - OK , // -1 - Errors - { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); @@ -1562,7 +1561,7 @@ ACE_POSIX_Asynch_Accept::close () if (this->handle_ != ACE_INVALID_HANDLE) { ACE_OS::closesocket (this->handle_); - this->handle_=ACE_INVALID_HANDLE; + this->handle_ = ACE_INVALID_HANDLE; } return 0; } @@ -1570,10 +1569,10 @@ ACE_POSIX_Asynch_Accept::close () task_lock_count_++; } - ACE_POSIX_Asynch_Accept_Task & task = - this->posix_proactor()->get_asynch_accept_task(); + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor ()->get_asynch_pseudo_task (); - int rc_task = task.remove_acceptor (this); + int rc_task = task.remove_io_handler (this->get_handle ()); { ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); @@ -1581,15 +1580,15 @@ ACE_POSIX_Asynch_Accept::close () task_lock_count_--; if (rc_task == -2 && task_lock_count_ == 0) // task is closing - task.unlock_finish (); - + task.unlock_finish (); + if (this->handle_ != ACE_INVALID_HANDLE) { ACE_OS::closesocket (this->handle_); - this->handle_=ACE_INVALID_HANDLE; + this->handle_ = ACE_INVALID_HANDLE; } - this->flg_open_=0; + this->flg_open_ = 0; } return 0; @@ -1600,26 +1599,26 @@ ACE_POSIX_Asynch_Accept::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close { ACE_UNUSED_ARG (handle); ACE_UNUSED_ARG (close_mask); - - ACE_TRACE(ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_close\n")); + + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_close\n")); ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); // handle_close is called only in one case : - // when Asynch_accept_task is closing (i.e. proactor destructor) + // when Asynch_accept_task is closing ( i.e. proactor destructor ) + + // In all other cases we deregister ourself + // with ACE_Event_Handler::DONT_CALL mask - // In all other cases we deregister ourself with - // ACE_Event_Handler::DONT_CALL mask - this->cancel_uncompleted (0); this->flg_open_ = 0; // it means other thread is waiting for reactor token_ - if (task_lock_count_ > 0) + if (task_lock_count_ > 0) { - ACE_POSIX_Asynch_Accept_Task & task = - this->posix_proactor()->get_asynch_accept_task(); + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor ()->get_asynch_pseudo_task (); task.lock_finish (); } @@ -1645,8 +1644,7 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */) ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"), ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input:") - ACE_LIB_TEXT(" dequeueing failed") - )); + ACE_LIB_TEXT( " dequeueing failed"))); // Disable the <handle> in the reactor if no <accept>'s are pending. @@ -1656,10 +1654,10 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */) if (this->result_queue_.size () == 0) { - ACE_POSIX_Asynch_Accept_Task & task = - this->posix_proactor()->get_asynch_accept_task(); - - task.suspend_acceptor (this); + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor ()->get_asynch_pseudo_task (); + + task.suspend_io_handler (this->get_handle()); } // Issue <accept> now. @@ -1668,13 +1666,11 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */) ACE_HANDLE new_handle = ACE_OS::accept (this->handle_, 0, 0); - if (result == 0) // there is nobody to notify { ACE_OS::closesocket (new_handle); return 0; } - if (new_handle == ACE_INVALID_HANDLE) { @@ -1682,8 +1678,7 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */) ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT("%N:%l:(%P | %t):%p\n"), ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ") - ACE_LIB_TEXT(" <accept> system call failed") - )); + ACE_LIB_TEXT(" <accept> system call failed"))); // Notify client as usual, "AIO" finished with errors } @@ -1693,282 +1688,654 @@ ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */) // Notify the main process about this completion // Send the Result through the notification pipe. - if (this->posix_proactor()->post_completion (result) == -1) + if (this->posix_proactor ()->post_completion (result) == -1) ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT("Error:(%P | %t):%p\n"), ACE_LIB_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ") - ACE_LIB_TEXT(" <post_completion> failed") - )); - + ACE_LIB_TEXT(" <post_completion> failed"))); + return 0; } // ********************************************************************* -ACE_POSIX_Asynch_Accept_Task::ACE_POSIX_Asynch_Accept_Task() - : flg_active_ (0), - select_reactor_(), // should be initialized before reactor_ - reactor_ (& select_reactor_, 0), // don't delete implementation - token_ (select_reactor_.lock()), // we can use reactor token - finish_count_(0) +ACE_HANDLE +ACE_POSIX_Asynch_Connect_Result::connect_handle (void) const { + return this->aio_fildes; } -ACE_POSIX_Asynch_Accept_Task::~ACE_POSIX_Asynch_Accept_Task() +void ACE_POSIX_Asynch_Connect_Result::connect_handle (ACE_HANDLE handle) { - stop(); + this->aio_fildes = handle; } -int -ACE_POSIX_Asynch_Accept_Task::start () + +ACE_POSIX_Asynch_Connect_Result::ACE_POSIX_Asynch_Connect_Result (ACE_Handler &handler, + ACE_HANDLE connect_handle, + const void* act, + ACE_HANDLE event, + int priority, + int signal_number) + + : ACE_Asynch_Result_Impl (), + ACE_Asynch_Connect_Result_Impl (), + ACE_POSIX_Asynch_Result (handler, act, event, 0, 0, priority, signal_number) { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + this->aio_fildes = connect_handle; + this->aio_nbytes = 0; +} - if (this->flg_active_) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:%p\n"), - ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::start already started")), - -1); +void +ACE_POSIX_Asynch_Connect_Result::complete (u_long bytes_transferred, + int success, + const void *completion_key, + u_long error) +{ + // Copy the data. + this->bytes_transferred_ = bytes_transferred; + this->success_ = success; + this->completion_key_ = completion_key; + this->error_ = error; - if (this->reactor_.initialized () == 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:%p\n"), - ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::start reactor is not initialized")), - -1); + // Create the interface result class. + ACE_Asynch_Connect::Result result (this); + + // Call the application handler. + this->handler_.handle_connect (result); +} + +ACE_POSIX_Asynch_Connect_Result::~ACE_POSIX_Asynch_Connect_Result (void) +{ +} + +// Base class operations. These operations are here to kill dominance +// warnings. These methods call the base class methods. + +u_long +ACE_POSIX_Asynch_Connect_Result::bytes_transferred (void) const +{ + return ACE_POSIX_Asynch_Result::bytes_transferred (); +} +const void * +ACE_POSIX_Asynch_Connect_Result::act (void) const +{ + return ACE_POSIX_Asynch_Result::act (); +} - if (this->activate (THR_NEW_LWP | THR_JOINABLE, 1) != 0 ) +int +ACE_POSIX_Asynch_Connect_Result::success (void) const +{ + return ACE_POSIX_Asynch_Result::success (); +} + +const void * +ACE_POSIX_Asynch_Connect_Result::completion_key (void) const +{ + return ACE_POSIX_Asynch_Result::completion_key (); +} + +u_long +ACE_POSIX_Asynch_Connect_Result::error (void) const +{ + return ACE_POSIX_Asynch_Result::error (); +} + +ACE_HANDLE +ACE_POSIX_Asynch_Connect_Result::event (void) const +{ + return ACE_POSIX_Asynch_Result::event (); +} + +u_long +ACE_POSIX_Asynch_Connect_Result::offset (void) const +{ + return ACE_POSIX_Asynch_Result::offset (); +} + +u_long +ACE_POSIX_Asynch_Connect_Result::offset_high (void) const +{ + return ACE_POSIX_Asynch_Result::offset_high (); +} + +int +ACE_POSIX_Asynch_Connect_Result::priority (void) const +{ + return ACE_POSIX_Asynch_Result::priority (); +} + +int +ACE_POSIX_Asynch_Connect_Result::signal_number (void) const +{ + return ACE_POSIX_Asynch_Result::signal_number (); +} + +int +ACE_POSIX_Asynch_Connect_Result::post_completion (ACE_Proactor_Impl *proactor) +{ + return ACE_POSIX_Asynch_Result::post_completion (proactor); +} + +// ********************************************************************* + +ACE_POSIX_Asynch_Connect::ACE_POSIX_Asynch_Connect (ACE_POSIX_AIOCB_Proactor * posix_proactor) + : ACE_Asynch_Operation_Impl (), + ACE_Asynch_Connect_Impl (), + ACE_POSIX_Asynch_Operation (posix_proactor), + flg_open_ (0), + task_lock_count_ (0) +{ +} + +ACE_POSIX_Asynch_Connect::~ACE_POSIX_Asynch_Connect (void) +{ + this->close (); + this->reactor(0); // to avoid purge_pending_notifications +} + +ACE_Proactor * +ACE_POSIX_Asynch_Connect::proactor (void) const +{ + return ACE_POSIX_Asynch_Operation::proactor (); +} + +ACE_HANDLE +ACE_POSIX_Asynch_Connect::get_handle (void) const +{ + + ACE_ASSERT (0); + return ACE_INVALID_HANDLE; +} + +void +ACE_POSIX_Asynch_Connect::set_handle (ACE_HANDLE handle) +{ + ACE_ASSERT (0) ; +} + +int +ACE_POSIX_Asynch_Connect::open (ACE_Handler &handler, + ACE_HANDLE handle, + const void *completion_key, + ACE_Proactor *proactor) +{ + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::open\n")); + + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + // if we are already opened, + // we could not create a new handler without closing the previous + + if (this->flg_open_ != 0) ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:%p\n"), - ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::start failed")), - -1); + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::open:") + ACE_LIB_TEXT("connector already open \n")), + -1); + + //int result = + ACE_POSIX_Asynch_Operation::open (handler, + handle, + completion_key, + proactor); + + // Ignore result as we pass ACE_INVALID_HANDLE + //if (result == -1) + // return result; + + this->flg_open_ = 1; - this->flg_active_ = 1; return 0; } -int -ACE_POSIX_Asynch_Accept_Task::stop () +int +ACE_POSIX_Asynch_Connect::connect (ACE_HANDLE connect_handle, + const ACE_Addr & remote_sap, + const ACE_Addr & local_sap, + int reuse_addr, + const void *act, + int priority, + int signal_number) { + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::connect\n")); + { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - if (this->flg_active_ == 0 ) // already stopped - return 0; + if (this->flg_open_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect") + ACE_LIB_TEXT("connector was not opened before\n")), + -1); - reactor_.end_reactor_event_loop (); - } + // Common code for both WIN and POSIX. + // Create future Asynch_Connect_Result + ACE_POSIX_Asynch_Connect_Result *result = 0; + ACE_NEW_RETURN (result, + ACE_POSIX_Asynch_Connect_Result (*this->handler_, + connect_handle, + act, + this->posix_proactor ()->get_handle (), + priority, + signal_number), + -1); - int rc = this->wait (); + int rc = connect_i (result, + remote_sap, + local_sap, + reuse_addr); - if (rc != 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:%p\n"), - ACE_LIB_TEXT ("ACE_POSIX_Asynch_Accept_Task::stop failed")), - -1); + // update handle + connect_handle = result->connect_handle (); + + if (rc != 0) + return post_result (result, 1); + + // Enqueue result we will wait for completion + + if (this->result_map_.bind (connect_handle, result) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect:") + ACE_LIB_TEXT("result map binding failed\n"))); + + result->set_error (EFAULT); + return post_result (result, 1); + } + + task_lock_count_ ++; + } + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor ()->get_asynch_pseudo_task (); + int rc_task = task.register_io_handler (connect_handle, + this, + ACE_Event_Handler::CONNECT_MASK, + 0); // not to suspend after register { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - this->flg_active_ = 0; + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + this->task_lock_count_ --; - if (this->reactor_.initialized ()) - this->reactor_.close(); + int post_enable = 1; + + if (rc_task == -2 && task_lock_count_ == 0) // task is closing + { + post_enable = 0; + task.unlock_finish (); + } - while (finish_count_ > 0) + if (rc_task < 0) { - ace_mon.release (); - finish_event_.wait(); + ACE_POSIX_Asynch_Connect_Result *result = 0; + + this->result_map_.unbind (connect_handle, result); + + if (result != 0) + { + result->set_error (EFAULT); - ace_mon.acquire (); - finish_event_.reset (); + return post_result (result, post_enable); + } } } - return rc; -} - -int -ACE_POSIX_Asynch_Accept_Task::lock_finish () -{ - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); - finish_count_ ++; return 0; } -int -ACE_POSIX_Asynch_Accept_Task::unlock_finish () +int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result * result, + int post_enable) { - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + if (this->flg_open_ != 0 && post_enable != 0) + { + if (this->posix_proactor ()->post_completion (result) == 0) + return 0 ; - finish_count_ --; + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::post_result: ") + ACE_LIB_TEXT(" <post_completion> failed"))); + } - finish_event_.signal (); + ACE_HANDLE handle = result->connect_handle (); - return 0; + if (handle != ACE_INVALID_HANDLE) + ACE_OS::closesocket (handle); + + delete result; + + return -1; } +//@@ New method connect_i +// return code : +// -1 errors before attempt to connect +// 0 connect started +// 1 connect finished ( may be unsuccessfully) + int -ACE_POSIX_Asynch_Accept_Task::svc () +ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result, + const ACE_Addr & remote_sap, + const ACE_Addr & local_sap, + int reuse_addr) { - sigset_t RT_signals; + result->set_bytes_transferred (0); - if (sigemptyset (& RT_signals) == -1) - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("Error:(%P | %t):%p\n"), - ACE_LIB_TEXT ("sigemptyset failed"))); + ACE_HANDLE handle = result->connect_handle (); + + if (handle == ACE_INVALID_HANDLE) + { + int protocol_family = remote_sap.get_type (); + + handle = ACE_OS::socket (protocol_family, + SOCK_STREAM, + 0); + // save it + result->connect_handle (handle); + + if (handle == ACE_INVALID_HANDLE) + { + result->set_error (errno); - int member = 0; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ") + ACE_LIB_TEXT(" ACE_OS::socket failed\n")), + -1); + } + + // Reuse the address + int one = 1; + if (protocol_family != PF_UNIX && + reuse_addr != 0 && + ACE_OS::setsockopt (handle, + SOL_SOCKET, + SO_REUSEADDR, + (const char*) &one, + sizeof one) == -1 ) + { + result->set_error (errno); - for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ") + ACE_LIB_TEXT(" ACE_OS::setsockopt failed\n")), + -1); + } + } + + if (local_sap != ACE_Addr::sap_any) { - member = sigismember (& RT_signals , si); - if (member == 1) + sockaddr * laddr = ACE_reinterpret_cast (sockaddr *, + local_sap.get_addr ()); + size_t size = local_sap.get_size (); + + if (ACE_OS::bind (handle, laddr, size) == -1) { - sigaddset (& RT_signals, si); + result->set_error (errno); + + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ") + ACE_LIB_TEXT(" ACE_OS::bind failed\n")), + -1); } } - if (ACE_OS::pthread_sigmask (SIG_BLOCK, & RT_signals, 0) != 0) - ACE_ERROR ((LM_ERROR, - ACE_LIB_TEXT ("Error:(%P | %t):%p\n"), - ACE_LIB_TEXT ("pthread_sigmask failed"))); + // set non blocking mode + if (ACE::set_flags (handle, ACE_NONBLOCK) != 0) + { + result->set_error (errno); + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect_i: ") + ACE_LIB_TEXT(" ACE::set_flags failed\n")), + -1); + } + for (;;) + { + int rc = ACE_OS::connect (handle, + ACE_reinterpret_cast (sockaddr *, + remote_sap.get_addr ()), + remote_sap.get_size ()); + if (rc < 0) // failure + { + if (errno == EWOULDBLOCK || errno == EINPROGRESS) + return 0; // connect started - reactor_.owner (ACE_Thread::self()); + if (errno == EINTR) + continue; - reactor_.run_reactor_event_loop (); + result->set_error (errno); + } - return 0; -} + return 1 ; // connect finished + } + ACE_NOTREACHED (return 0); +} +//@@ New method cancel_uncompleted +// It performs cancellation of all pending requests +// +// Parameter flg_notify can be +// 0 - don't send notifications about canceled accepts +// !0 - notify user about canceled accepts +// according POSIX standards we should receive notifications +// on canceled AIO requests +// +// Return value : number of cancelled requests +// + int -ACE_POSIX_Asynch_Accept_Task::register_acceptor (ACE_POSIX_Asynch_Accept * posix_accept, - ACE_Reactor_Mask mask) +ACE_POSIX_Asynch_Connect::cancel_uncompleted (int flg_notify, + ACE_Handle_Set & set) { - // Return codes : - // 0 success - // -1 reactor errors - // -2 task not active + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::cancel_uncompleted\n")); - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + int retval = 0; - if (this->flg_active_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::register_acceptor \n") - ACE_LIB_TEXT ("task not active \n")), - -2); - - // Register the handler with the reactor. - int retval = this->reactor_.register_handler (posix_accept->get_handle(), - posix_accept, - mask); - - if (retval == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::register_acceptor \n") - ACE_LIB_TEXT ("register_handler failed \n")), - -1); + MAP_ITERATOR iter (result_map_); + MAP_ENTRY * me = 0; + + set.reset (); - // Suspend the <handle> now. Enable only when the <accept> is issued - // by the application. - retval = this->reactor_.suspend_handler (posix_accept->get_handle()); - if (retval == -1) + for (; iter.next (me) != 0; retval++ , iter.advance ()) { - this->reactor_.remove_handler (posix_accept, - ACE_Event_Handler::ALL_EVENTS_MASK - | ACE_Event_Handler::DONT_CALL); - - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::register_acceptor \n") - ACE_LIB_TEXT ("suspend_handler failed \n")), - -1); + ACE_HANDLE handle = me->ext_id_; + ACE_POSIX_Asynch_Connect_Result* result = me->int_id_ ; + + set.set_bit (handle); + + result->set_bytes_transferred (0); + result->set_error (ECANCELED); + this->post_result (result, flg_notify); } - return 0; + result_map_.unbind_all (); + + return retval; } int -ACE_POSIX_Asynch_Accept_Task::remove_acceptor (ACE_POSIX_Asynch_Accept * posix_accept) +ACE_POSIX_Asynch_Connect::cancel (void) { - // Return codes : - // 0 success - // -1 reactor errors - // -2 task not active + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::cancel\n")); - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + //We are not really ACE_POSIX_Asynch_Operation + //so we could not call ::aiocancel () + // or just write + //return ACE_POSIX_Asynch_Operation::cancel (); + //We delegate real cancelation to cancel_uncompleted (1) - if (this->flg_active_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::remove_acceptor \n") - ACE_LIB_TEXT ("task not active \n")), - -2); - - int retval = this->reactor_.remove_handler (posix_accept, - ACE_Event_Handler::ALL_EVENTS_MASK - | ACE_Event_Handler::DONT_CALL); - if (retval == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::remove_acceptor \n") - ACE_LIB_TEXT ("remove_handler failed \n")), - -1); + int rc = -1 ; // ERRORS - return 0; + ACE_Handle_Set set; + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + int num_cancelled = cancel_uncompleted (flg_open_, set); + + if (num_cancelled == 0) + rc = 1 ; // AIO_ALLDONE + else if (num_cancelled > 0) + rc = 0 ; // AIO_CANCELED + + if (this->flg_open_ == 0) + return rc ; + + this->task_lock_count_++; + } + + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor ()->get_asynch_pseudo_task (); + + int rc_task = task.remove_io_handler (set); + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + this->task_lock_count_--; + + if (rc_task == -2 && task_lock_count_ == 0) // task is closing + task.unlock_finish (); + } + + return rc; } int -ACE_POSIX_Asynch_Accept_Task::suspend_acceptor (ACE_POSIX_Asynch_Accept * posix_accept) +ACE_POSIX_Asynch_Connect::close (void) { - // Return codes : - // 0 success - // -1 reactor errors - // -2 task not active + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::close\n")); - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); + ACE_Handle_Set set ; - if (this->flg_active_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::suspend_acceptor \n") - ACE_LIB_TEXT ("task not active \n")), - -2); - - int retval = this->reactor_.suspend_handler (posix_accept); + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); - if (retval == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::suspend_acceptor \n") - ACE_LIB_TEXT ("suspend_handler failed \n")), - -1); + int num_cancelled = cancel_uncompleted (flg_open_, set); + + if (num_cancelled == 0 || this->flg_open_ == 0) + { + this->flg_open_ = 0; + return 0; + } + + this->task_lock_count_++; + } + + ACE_Asynch_Pseudo_Task & task = + this->posix_proactor ()->get_asynch_pseudo_task (); + + int rc_task = task.remove_io_handler (set); + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + this->task_lock_count_--; + + if (rc_task == -2 && task_lock_count_ == 0) // task is closing + task.unlock_finish (); + + this->flg_open_ = 0; + } return 0; } int -ACE_POSIX_Asynch_Accept_Task::resume_acceptor (ACE_POSIX_Asynch_Accept * posix_accept) +ACE_POSIX_Asynch_Connect::handle_exception (ACE_HANDLE fd) { - // Return codes : - // 0 success - // -1 reactor errors - // -2 task not active + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::handle_exception\n")); + return handle_input (fd); +} - ACE_MT (ACE_GUARD_RETURN (ACE_Lock, ace_mon, this->token_, -1)); +int +ACE_POSIX_Asynch_Connect::handle_input (ACE_HANDLE fd) +{ + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::handle_input\n")); - if (this->flg_active_ == 0) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::resume_acceptor \n") - ACE_LIB_TEXT ("task not active \n")), - -2); - - int retval = this->reactor_.resume_handler (posix_accept); + return handle_input (fd); +} - if (retval == -1) - ACE_ERROR_RETURN ((LM_ERROR, - ACE_LIB_TEXT ("%N:%l:ACE_POSIX_Asynch_Accept_Task::resume_acceptor \n") - ACE_LIB_TEXT ("resume_handler failed \n")), - -1); +int +ACE_POSIX_Asynch_Connect::handle_output (ACE_HANDLE fd) +{ + ACE_TRACE (ACE_LIB_TEXT("ACE_POSIX_Asynch_Connect::handle_output\n")); + + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + + ACE_POSIX_Asynch_Connect_Result* result = 0; + + if (this->result_map_.unbind (fd, result) != 0) // not found + return -1; + + int sockerror = 0 ; + int lsockerror = sizeof sockerror; + + ACE_OS::getsockopt (fd, + SOL_SOCKET, + SO_ERROR, + (char*) &sockerror, + &lsockerror); + + result->set_bytes_transferred (0); + result->set_error (sockerror); + this->post_result (result, this->flg_open_); + + return -1; + + //ACE_Asynch_Pseudo_Task & task = + // this->posix_proactor()->get_asynch_pseudo_task(); + + //task.remove_io_handler ( fd ); + + //return 0; +} + + +int +ACE_POSIX_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask) +{ + ACE_TRACE (ACE_LIB_TEXT ("ACE_POSIX_Asynch_Connect::handle_close\n")); + + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + + ACE_Asynch_Pseudo_Task &task = + this->posix_proactor ()->get_asynch_pseudo_task (); + + if (task.is_active() == 0) // task is closing + { + if (this->flg_open_ !=0) // we are open + { + this->flg_open_ = 0; + + // it means other thread is waiting for reactor token_ + if (task_lock_count_ > 0) + task.lock_finish (); + } + + ACE_Handle_Set set; + this->cancel_uncompleted (0, set); + + return 0; + } + + // remove_io_handler() contains flag DONT_CALL + // so it is save + task.remove_io_handler (fd); + + ACE_POSIX_Asynch_Connect_Result* result = 0; + + if (this->result_map_.unbind (fd, result) != 0 ) // not found + return -1; + + result->set_bytes_transferred (0); + result->set_error (ECANCELED); + this->post_result (result, this->flg_open_); return 0; } @@ -2302,6 +2669,7 @@ ACE_POSIX_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_S if (result.success () == 0) { // Failure. + ACE_ERROR ((LM_ERROR, "Asynch_Transmit_File failed.\n")); @@ -3008,7 +3376,6 @@ ACE_POSIX_Asynch_Write_Dgram::ACE_POSIX_Asynch_Write_Dgram (ACE_POSIX_AIOCB_Proa { } - #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *>; @@ -3019,6 +3386,14 @@ template class ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *>; template class ACE_Node<ACE_POSIX_Asynch_Result *>; template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *>; +template class ACE_Map_Entry<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *>; +template class ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Const_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; + #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result *> @@ -3029,6 +3404,15 @@ template class ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *>; #pragma instantiate ACE_Node<ACE_POSIX_Asynch_Result *> #pragma instantiate ACE_Unbounded_Queue_Iterator<ACE_POSIX_Asynch_Result *> +#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *> +#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Const_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> + + #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/POSIX_Asynch_IO.h b/ace/POSIX_Asynch_IO.h index f00dcd79db3..132a4075ca4 100644 --- a/ace/POSIX_Asynch_IO.h +++ b/ace/POSIX_Asynch_IO.h @@ -29,22 +29,22 @@ #if defined (ACE_HAS_AIO_CALLS) #include "ace/OS.h" + #include "ace/Asynch_IO_Impl.h" -#include "ace/Reactor.h" -#include "ace/Select_Reactor.h" #include "ace/Unbounded_Queue.h" -#include "ace/Task.h" +#include "ace/Map_Manager.h" // Forward declarations class ACE_POSIX_SIG_Proactor; class ACE_POSIX_AIOCB_Proactor; class ACE_Proactor_Impl; +class ACE_Handle_Set; /** * @class ACE_POSIX_Asynch_Result * - * This class provides concrete implementation for <ACE_Asynch_Result> - * for POSIX4 platforms. This class extends <aiocb> and makes it more + * This class provides concrete implementation for ACE_Asynch_Result + * for POSIX4 platforms. This class extends @c aiocb and makes it more * useful. */ class ACE_Export ACE_POSIX_Asynch_Result : public virtual ACE_Asynch_Result_Impl, @@ -495,7 +495,7 @@ protected: * @class ACE_POSIX_Asynch_Write_Stream * * @brief This class implements <ACE_Asynch_Write_Stream> for - * all POSIX implementations of Proactor. + * all POSIX implementations of ACE_Proactor. */ class ACE_Export ACE_POSIX_Asynch_Write_Stream : public virtual ACE_Asynch_Write_Stream_Impl, public ACE_POSIX_Asynch_Operation @@ -623,6 +623,8 @@ public: int post_completion (ACE_Proactor_Impl *proactor); protected: + /// Constructor is protected since creation is limited to + /// ACE_Asynch_Read_File factory. ACE_POSIX_Asynch_Read_File_Result (ACE_Handler &handler, ACE_HANDLE handle, ACE_Message_Block &message_block, @@ -633,8 +635,6 @@ protected: ACE_HANDLE event, int priority, int signal_number); - // Constructor is protected since creation is limited to - // ACE_Asynch_Read_File factory. /// ACE_Proactor will call this method when the read completes. virtual void complete (u_long bytes_transferred, @@ -924,7 +924,6 @@ class ACE_Export ACE_POSIX_Asynch_Accept_Result : public virtual ACE_Asynch_Acce { /// Factory classes will have special permissions. friend class ACE_POSIX_Asynch_Accept; - friend class ACE_POSIX_Asynch_Accept_Handler; /// The Proactor constructs the Result class for faking results. friend class ACE_POSIX_Proactor; @@ -1082,31 +1081,30 @@ public: int priority, int signal_number = 0); - /** - * Cancel all pending pseudo-asynchronus requests - * Behavior as usual AIO request - */ + * Cancel all pending pseudo-asynchronus requests + * Behavior as usual AIO request + */ int cancel (void); /** - * Close performs cancellation of all pending requests - * and closure the listen handle - */ - int close (void); + * Close performs cancellation of all pending requests + * and closure the listen handle + */ + int close (); - /// virtual from ACE_Event_Hanlder + /// virtual from ACE_Event_Handler ACE_HANDLE get_handle (void) const; - /// virtual from ACE_Event_Hanlder + /// virtual from ACE_Event_Handler void set_handle (ACE_HANDLE handle); - /// virtual from ACE_Event_Hanlder - /// Called when accept event comes up on <listen_hanlde> + /// virtual from ACE_Event_Handler + /// Called when accept event comes up on <listen_handle> int handle_input (ACE_HANDLE handle); - /// virtual from ACE_Event_Hanlder - int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) ; + /// virtual from ACE_Event_Handler + int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask); // = Methods belong to ACE_POSIX_Asynch_Operation base class. These // methods are defined here to avoid dominace warnings. They route @@ -1114,7 +1112,6 @@ public: /// Return the underlying proactor. ACE_Proactor* proactor (void) const; - private: int cancel_uncompleted (int flg_notify); // flg_notify points whether or not we should send notification about @@ -1126,69 +1123,253 @@ private: // on canceled AIO requests int flg_open_ ; - /// 1 - Accept is registered in ACE_POSIX_Asynch_Accept_Task - /// 0 - Aceept is deregisted in ACE_POSIX_Asynch_Accept_Task + /// 1 - Accept is registered in ACE_Asynch_Pseudo_Task + /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task - - /// to prevent ACE_POSIX_Asynch_Accept_Task from deletion - /// while we make a call to the ACE_POSIX_Asynch_Accept_Task + /// to prevent ACE_Asynch_Pseudo_Task from deletion + /// while we make a call to the ACE_Asynch_Pseudo_Task /// This is extra cost !!! /// we could avoid them if all applications will follow the rule: /// Proactor should be deleted only after deletion all - /// AsynchOperation objects connected with it + /// AsynchOperation objects connected with it int task_lock_count_; - /// Queue of Result pointers that correspond to all the <accept>'s - /// pending. + /// Queue of Result pointers that correspond to all the pending + /// accept operations. ACE_Unbounded_Queue<ACE_POSIX_Asynch_Accept_Result*> result_queue_; - /// The lock to protect the result queue which is shared. The queue + /// The lock to protect the result queue which is shared. The queue /// is updated by main thread in the register function call and - /// through the auxillary thread in the deregister fun. So let us + /// through the auxillary thread in the deregister fun. So let us /// mutex it. ACE_SYNCH_MUTEX lock_; }; /** - * @class ACE_POSIX_Asynch_Accept_Task + * @class ACE_POSIX_Asynch_Connect_Result * + * @brief This is that class which will be passed back to the + * completion handler when the asynchronous connect completes. + * + * This class has all the information necessary for a + * completion handler to uniquely identify the completion of the + * asynchronous connect. */ -class ACE_Export ACE_POSIX_Asynch_Accept_Task : public ACE_Task<ACE_MT_SYNCH> +class ACE_Export ACE_POSIX_Asynch_Connect_Result : public virtual ACE_Asynch_Connect_Result_Impl, + public ACE_POSIX_Asynch_Result { - friend class ACE_POSIX_Asynch_Accept; + /// Factory classes will have special permissions. + friend class ACE_POSIX_Asynch_Connect; + + /// The Proactor constructs the Result class for faking results. + friend class ACE_POSIX_Proactor; + public: - ACE_POSIX_Asynch_Accept_Task (void); - virtual ~ACE_POSIX_Asynch_Accept_Task (void); + /// I/O handle for the connection. + ACE_HANDLE connect_handle (void) const; - int start (void); - int stop (void); + // = Base class operations. These operations are here to kill + // dominance warnings. These methods call the base class methods. + + /// Number of bytes transferred by the operation. + u_long bytes_transferred (void) const; - virtual int svc (void); + /// ACT associated with the operation. + const void *act (void) const; - int register_acceptor (ACE_POSIX_Asynch_Accept *posix_accept, - ACE_Reactor_Mask mask); - int remove_acceptor (ACE_POSIX_Asynch_Accept *posix_accept); - int resume_acceptor (ACE_POSIX_Asynch_Accept *posix_accept); - int suspend_acceptor (ACE_POSIX_Asynch_Accept *posix_accept); + /// Did the operation succeed? + int success (void) const; + + /** + * This is the ACT associated with the handle on which the + * Asynch_Operation takes place. + * + * @note This is not implemented for POSIX4 platforms. + */ + const void *completion_key (void) const; + + /// Error value if the operation fail. + u_long error (void) const; + + /// This returns ACE_INVALID_HANDLE on POSIX4 platforms. + ACE_HANDLE event (void) const; + + /** + * This really make sense only when doing file I/O. + * + * @note On POSIX4-Unix, @c offset_high should be supported using + * @c aiocb64. + */ + u_long offset (void) const; + u_long offset_high (void) const; + + /// The priority of the asynchronous operation. + int priority (void) const; + + /** + * POSIX4 realtime signal number to be used for the + * operation. The signal number ranges from @c SIGRTMIN to @c SIGRTMAX. + * By default, SIGRTMIN is used to issue AIO calls. + * + * @note This is a no-op on non-POSIX4 systems and returns 0. + */ + int signal_number (void) const; + + /// Post this object to the Proactor. + int post_completion (ACE_Proactor_Impl *proactor); protected: - int lock_finish (void); - int unlock_finish (void); + /// Constructor is protected since creation is limited to + /// ACE_Asynch_Connect factory. + ACE_POSIX_Asynch_Connect_Result (ACE_Handler &handler, + ACE_HANDLE connect_handle, + const void* act, + ACE_HANDLE event, + int priority, + int signal_number); - int flg_active_ ; + /// ACE_Proactor will call this method when the accept completes. + virtual void complete (u_long bytes_transferred, + int success, + const void *completion_key, + u_long error); - ACE_Select_Reactor select_reactor_; - // should be initialized before reactor_ + /// Destructor. + virtual ~ACE_POSIX_Asynch_Connect_Result (void); - ACE_Reactor reactor_; + // aiocb::aio_filedes + // I/O handle for the new connection. + void connect_handle (ACE_HANDLE handle); +}; - ACE_Lock &token_; - int finish_count_; - ACE_Manual_Event finish_event_; +/** + * @class ACE_POSIX_Asynch_Connect + * + */ +class ACE_Export ACE_POSIX_Asynch_Connect : + public virtual ACE_Asynch_Connect_Impl, + public ACE_POSIX_Asynch_Operation, + public ACE_Event_Handler +{ +public: + + /// Constructor. + ACE_POSIX_Asynch_Connect (ACE_POSIX_AIOCB_Proactor * posix_aiocb_proactor); + + /// Destructor. + virtual ~ACE_POSIX_Asynch_Connect (void); + + /** + * This belongs to ACE_POSIX_Asynch_Operation. We forward + * this call to that method. We have put this here to avoid the + * compiler warnings. + */ + int open (ACE_Handler &handler, + ACE_HANDLE handle, + const void *completion_key, + ACE_Proactor *proactor = 0); + + /** + * This starts off an asynchronous connect. + * + * @arg connect_handle will be used for the connect call. If + * ACE_INVALID_HANDLE is specified, a new + * handle will be created. + */ + int connect (ACE_HANDLE connect_handle, + const ACE_Addr &remote_sap, + const ACE_Addr &local_sap, + int reuse_addr, + const void *act, + int priority, + int signal_number = 0); + + /** + * Cancel all pending pseudo-asynchronus requests + * Behavior as usual AIO request + */ + int cancel (void); + + /** + * Close performs cancellation of all pending requests. + */ + int close (void); + + /// virtual from ACE_Event_Handler + ACE_HANDLE get_handle (void) const; + + /// virtual from ACE_Event_Handler + void set_handle (ACE_HANDLE handle); + + /// virtual from ACE_Event_Handler + /// Called when accept event comes up on <listen_hanlde> + int handle_input (ACE_HANDLE handle); + int handle_output (ACE_HANDLE handle); + int handle_exception (ACE_HANDLE handle); + + /// virtual from ACE_Event_Handler + int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) ; + + // = Methods belong to ACE_POSIX_Asynch_Operation base class. These + // methods are defined here to avoid dominace warnings. They route + // the call to the ACE_POSIX_Asynch_Operation base class. + + /// Return the underlying proactor. + ACE_Proactor* proactor (void) const; + +private: + int connect_i (ACE_POSIX_Asynch_Connect_Result *result, + const ACE_Addr & remote_sap, + const ACE_Addr & local_sap, + int reuse_addr); + + int post_result (ACE_POSIX_Asynch_Connect_Result *result, int flg_post); + + /// Cancel uncompleted connect operations. + /** + * @arg flg_notify Indicates whether or not we should send notification + * about canceled accepts. If this is 0, don't send + * notifications about canceled connects. If 1, notify + * user about canceled connects according POSIX + * standards we should receive notifications on canceled + * AIO requests. + */ + int cancel_uncompleted (int flg_notify, ACE_Handle_Set & set); + + int flg_open_ ; + /// 1 - Connect is registered in ACE_Asynch_Pseudo_Task + /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task + + + /// to prevent ACE_Asynch_Pseudo_Task from deletion + /// while we make a call to the ACE_Asynch_Pseudo_Task + /// This is extra cost !!! + /// we could avoid them if all applications will follow the rule: + /// Proactor should be deleted only after deletion all + /// AsynchOperation objects connected with it + int task_lock_count_; + + typedef ACE_Map_Manager<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> + MAP_MANAGER; + typedef ACE_Map_Iterator<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> + MAP_ITERATOR; + typedef ACE_Map_Entry<ACE_HANDLE, ACE_POSIX_Asynch_Connect_Result *> + MAP_ENTRY; + + MAP_MANAGER result_map_; + // Map of Result pointers that correspond to all the <accept>'s + // pending. + + /// The lock to protect the result queue which is shared. The queue + /// is updated by main thread in the register function call and + /// through the auxillary thread in the deregister fun. So let us + /// mutex it. + ACE_SYNCH_MUTEX lock_; }; + /** * @class ACE_POSIX_Asynch_Transmit_File_Result * @@ -1207,7 +1388,7 @@ class ACE_Export ACE_POSIX_Asynch_Transmit_File_Result : public virtual ACE_Asyn /// Handlers do all the job. friend class ACE_POSIX_Asynch_Transmit_Handler; - + /// The Proactor constructs the Result class for faking results. friend class ACE_POSIX_Proactor; @@ -1471,7 +1652,7 @@ protected: * @class ACE_POSIX__Asynch_Write_Dgram_Result * * @brief This is class provides concrete implementation for - * ACE_Asynch_Write_Dgram::Result class. + * ACE_Asynch_Write_Dgram::Result class. */ class ACE_Export ACE_POSIX_Asynch_Write_Dgram_Result : public virtual ACE_Asynch_Write_Dgram_Result_Impl, public ACE_POSIX_Asynch_Result @@ -1541,14 +1722,14 @@ protected: /// Constructor is protected since creation is limited to /// ACE_Asynch_Write_Stream factory. ACE_POSIX_Asynch_Write_Dgram_Result (ACE_Handler &handler, - ACE_HANDLE handle, - ACE_Message_Block *message_block, - size_t bytes_to_write, - int flags, - const void* act, - ACE_HANDLE event, - int priority, - int signal_number); + ACE_HANDLE handle, + ACE_Message_Block *message_block, + size_t bytes_to_write, + int flags, + const void* act, + ACE_HANDLE event, + int priority, + int signal_number); /// ACE_Proactor will call this method when the write completes. virtual void complete (u_long bytes_transferred, @@ -1597,7 +1778,7 @@ public: /// Destructor virtual ~ACE_POSIX_Asynch_Write_Dgram (void); - /** This starts off an asynchronous send. Upto + /** This starts off an asynchronous send. Up to * <message_block->total_length()> will be sent. <message_block>'s * <rd_ptr> will be updated to reflect the sent bytes if the send operation * is successful completed. @@ -1659,7 +1840,7 @@ protected: * @class ACE_POSIX_Asynch_Read_Dgram_Result * * @brief This is class provides concrete implementation for - * ACE_Asynch_Read_Dgram::Result class. + * ACE_Asynch_Read_Dgram::Result class. */ class ACE_Export ACE_POSIX_Asynch_Read_Dgram_Result : public virtual ACE_Asynch_Read_Dgram_Result_Impl, public virtual ACE_POSIX_Asynch_Result diff --git a/ace/POSIX_Proactor.cpp b/ace/POSIX_Proactor.cpp index 3004e21e10b..381d7521303 100644 --- a/ace/POSIX_Proactor.cpp +++ b/ace/POSIX_Proactor.cpp @@ -316,6 +316,26 @@ ACE_POSIX_Proactor::create_asynch_accept_result (ACE_Handler &handler, return implementation; } +ACE_Asynch_Connect_Result_Impl * +ACE_POSIX_Proactor::create_asynch_connect_result (ACE_Handler &handler, + ACE_HANDLE connect_handle, + const void* act, + ACE_HANDLE event, + int priority, + int signal_number) +{ + ACE_Asynch_Connect_Result_Impl *implementation; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Connect_Result (handler, + connect_handle, + act, + event, + priority, + signal_number), + 0); + return implementation; +} + ACE_Asynch_Transmit_File_Result_Impl * ACE_POSIX_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler, ACE_HANDLE socket, @@ -494,7 +514,7 @@ private: ACE_Pipe pipe_; // Pipe for the communication between Proactor and the - // Asynch_Accept. + // Asynch_Accept/Asynch_Connect and other post_completions ACE_POSIX_Asynch_Read_Stream read_stream_; // To do asynch_read on the pipe. @@ -505,7 +525,7 @@ private: ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor) : posix_aiocb_proactor_ (posix_aiocb_proactor), - message_block_ (sizeof (ACE_POSIX_Asynch_Accept_Result *)), + message_block_ (sizeof (2)), read_stream_ (posix_aiocb_proactor) { // Open the pipe. @@ -620,7 +640,8 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations) // start pseudo-asynchronous accept task // one per all future acceptors - this->accept_task_.start (); + this->get_asynch_pseudo_task().start (); + } // Special protected constructor for ACE_SUN_Proactor @@ -661,7 +682,8 @@ ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations, ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void) { // stop asynch accept task - this->get_asynch_accept_task().stop (); + this->get_asynch_pseudo_task().stop (); + delete_notify_manager (); @@ -745,8 +767,8 @@ void ACE_POSIX_AIOCB_Proactor::check_max_aio_num () void ACE_POSIX_AIOCB_Proactor::create_notify_manager (void) { - // Accept Handler for aio_accept. Remember! this issues a Asynch_Read - // on the notify pipe for doing the Asynch_Accept. + // Remember! this issues a Asynch_Read + // on the notify pipe for doing the Asynch_Accept/Connect. if (aiocb_notify_pipe_manager_ == 0) ACE_NEW (aiocb_notify_pipe_manager_, @@ -939,7 +961,17 @@ ACE_POSIX_AIOCB_Proactor::create_asynch_accept (void) ACE_NEW_RETURN (implementation, ACE_POSIX_Asynch_Accept (this), 0); - //was ACE_POSIX_AIOCB_Asynch_Accept (this) + + return implementation; +} + +ACE_Asynch_Connect_Impl * +ACE_POSIX_AIOCB_Proactor::create_asynch_connect (void) +{ + ACE_Asynch_Connect_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_POSIX_Asynch_Connect (this), + 0); return implementation; } @@ -1485,7 +1517,7 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations) // but we should start pseudo-asynchronous accept task // one per all future acceptors - this->accept_task_.start (); + this->get_asynch_pseudo_task().start (); return; } @@ -1532,14 +1564,14 @@ ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set, // but we should start pseudo-asynchronous accept task // one per all future acceptors - this->accept_task_.start (); + this->get_asynch_pseudo_task().start (); return; } ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void) { // stop asynch accept task - this->get_asynch_accept_task().stop (); + this->get_asynch_pseudo_task().stop (); // @@ Enable the masked signals again. } diff --git a/ace/POSIX_Proactor.h b/ace/POSIX_Proactor.h index 5331f3abb3d..8781218633f 100644 --- a/ace/POSIX_Proactor.h +++ b/ace/POSIX_Proactor.h @@ -31,6 +31,7 @@ #include "ace/Free_List.h" #include "ace/Pipe.h" #include "ace/POSIX_Asynch_IO.h" +#include "ace/Asynch_Pseudo_Task.h" #define ACE_AIO_MAX_SIZE 2048 #define ACE_AIO_DEFAULT_SIZE 1024 @@ -51,13 +52,6 @@ */ class ACE_Export ACE_POSIX_Proactor : public ACE_Proactor_Impl { - /** - * For <POSIX_SIG_Asynch_Accept> operation, this handler class does - * the actual work, has to register the real-time signal with the - * Proactor. - */ - friend class ACE_POSIX_SIG_Asynch_Accept_Handler; - public: enum Proactor_Type { @@ -194,6 +188,13 @@ public: int priority = 0, int signal_number = ACE_SIGRTMIN); + virtual ACE_Asynch_Connect_Result_Impl *create_asynch_connect_result (ACE_Handler & handler, + ACE_HANDLE connect_handle, + const void *act, + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); + virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler, ACE_HANDLE socket, ACE_HANDLE file, @@ -249,7 +250,6 @@ protected: // Forward declarations. class ACE_AIOCB_Notify_Pipe_Manager; -class ACE_POSIX_Accept_Task; /** * @class ACE_POSIX_AIOCB_Proactor @@ -268,6 +268,7 @@ class ACE_Export ACE_POSIX_AIOCB_Proactor : public ACE_POSIX_Proactor /// Proactor which is necessary in the AIOCB strategy. friend class ACE_POSIX_Asynch_Operation; friend class ACE_POSIX_Asynch_Accept; + friend class ACE_POSIX_Asynch_Connect; public: @@ -317,6 +318,8 @@ public: virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void); + virtual ACE_Asynch_Connect_Impl *create_asynch_connect (void); + virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void); /** @@ -339,8 +342,9 @@ protected: ACE_POSIX_AIOCB_Proactor (size_t nmaxop, ACE_POSIX_Proactor::Proactor_Type ptype); - /// Task to process pseudo-asynchronous accept - ACE_POSIX_Asynch_Accept_Task &get_asynch_accept_task (void); + + /// Task to process pseudo-asynchronous operations + ACE_Asynch_Pseudo_Task & get_asynch_pseudo_task(); /// Call these methods from derived class when virtual table is /// built. @@ -442,7 +446,7 @@ protected: ACE_Unbounded_Queue<ACE_POSIX_Asynch_Result *> result_queue_; /// Task to process pseudo-asynchronous accept - ACE_POSIX_Asynch_Accept_Task accept_task_; + ACE_Asynch_Pseudo_Task pseudo_task_; }; /** @@ -563,6 +567,7 @@ protected: */ class ACE_Export ACE_POSIX_Asynch_Timer : public ACE_POSIX_Asynch_Result { + /// The factory method for this class is with the POSIX_Proactor /// class. friend class ACE_POSIX_Proactor; diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index 6a87d59fe4a..c75fa578d93 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -772,6 +772,12 @@ ACE_Proactor::create_asynch_accept (void) return this->implementation ()->create_asynch_accept (); } +ACE_Asynch_Connect_Impl * +ACE_Proactor::create_asynch_connect (void) +{ + return this->implementation ()->create_asynch_connect (); +} + ACE_Asynch_Transmit_File_Impl * ACE_Proactor::create_asynch_transmit_file (void) { @@ -939,6 +945,23 @@ ACE_Proactor::create_asynch_accept_result (ACE_Handler &handler, signal_number); } +ACE_Asynch_Connect_Result_Impl * +ACE_Proactor::create_asynch_connect_result (ACE_Handler &handler, + ACE_HANDLE connect_handle, + const void* act, + ACE_HANDLE event, + int priority, + int signal_number) + +{ + return this->implementation ()->create_asynch_connect_result (handler, + connect_handle, + act, + event, + priority, + signal_number); +} + ACE_Asynch_Transmit_File_Result_Impl * ACE_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler, ACE_HANDLE socket, diff --git a/ace/Proactor.h b/ace/Proactor.h index b3b55257586..65cc7cef5ee 100644 --- a/ace/Proactor.h +++ b/ace/Proactor.h @@ -1,4 +1,4 @@ -// -*- C++ -*- +/* -*- C++ -*- */ //============================================================================= /** @@ -9,7 +9,7 @@ * @author Irfan Pyarali <irfan@cs.wustl.edu> * @author Tim Harrison <harrison@cs.wustl.edu> * @author Alexander Babu Arulanthu <alex@cs.wustl.edu> - * @author Alexander Libman <alibman@@ihug.com.au> + * @author Alexander Libman <alibman@ihug.com.au> */ //============================================================================= @@ -381,6 +381,9 @@ public: /// Create the correct implementation class for doing Asynch_Accept. virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void); + /// Create the correct implementation class for doing Asynch_Connect. + virtual ACE_Asynch_Connect_Impl *create_asynch_connect (void); + /// Create the correct implementation class for doing /// Asynch_Transmit_File. virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void); @@ -489,6 +492,15 @@ public: int priority = 0, int signal_number = ACE_SIGRTMIN); + /// Create the correct implementation class for ACE_Asynch_Connect::Result + virtual ACE_Asynch_Connect_Result_Impl * + create_asynch_connect_result (ACE_Handler &handler, + ACE_HANDLE connect_handle, + const void* act, + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN); + /// Create the correct implementation class for /// ACE_Asynch_Transmit_File::Result. virtual ACE_Asynch_Transmit_File_Result_Impl * @@ -513,8 +525,7 @@ public: * Timer object with a meaningful signal number, choosing the * largest signal number from the signal mask of the Proactor. */ - virtual ACE_Asynch_Result_Impl *create_asynch_timer ( - ACE_Handler &handler, + virtual ACE_Asynch_Result_Impl *create_asynch_timer (ACE_Handler &handler, const void *act, const ACE_Time_Value &tv, ACE_HANDLE event = ACE_INVALID_HANDLE, diff --git a/ace/Proactor_Impl.h b/ace/Proactor_Impl.h index 4b129de07b1..d5de76dbb0a 100644 --- a/ace/Proactor_Impl.h +++ b/ace/Proactor_Impl.h @@ -7,6 +7,7 @@ * $Id$ * * @author Alexander Babu Arulanthu <alex@cs.wustl.edu> + * @author Alexander Libman <alibman@ihug.com.au> */ //============================================================================= @@ -102,6 +103,9 @@ public: /// Create the correct implementation class for doing Asynch_Accept. virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void) = 0; + /// Create the correct implementation class for doing Asynch_Connect. + virtual ACE_Asynch_Connect_Impl *create_asynch_connect (void) = 0; + /// Create the correct implementation class for doing Asynch_Transmit_File. virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void) = 0; @@ -197,6 +201,14 @@ public: int priority = 0, int signal_number = ACE_SIGRTMIN) = 0; + /// Create the correct implementation class for ACE_Asynch_Connect::Result. + virtual ACE_Asynch_Connect_Result_Impl *create_asynch_connect_result (ACE_Handler &handler, + ACE_HANDLE connect_handle, + const void* act, + ACE_HANDLE event = ACE_INVALID_HANDLE, + int priority = 0, + int signal_number = ACE_SIGRTMIN) = 0; + /// Create the correct implementation class for ACE_Asynch_Transmit_File::Result. virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler, ACE_HANDLE socket, diff --git a/ace/SUN_Proactor.cpp b/ace/SUN_Proactor.cpp index f406393c747..5d674493565 100644 --- a/ace/SUN_Proactor.cpp +++ b/ace/SUN_Proactor.cpp @@ -24,14 +24,14 @@ ACE_SUN_Proactor::ACE_SUN_Proactor (size_t max_aio_operations) // we should start pseudo-asynchronous accept task // one per all future acceptors - this->accept_task_.start (); + this->get_asynch_pseudo_task ().start (); } // Destructor. ACE_SUN_Proactor::~ACE_SUN_Proactor (void) { // stop asynch accept task - this->get_asynch_accept_task().stop (); + this->get_asynch_pseudo_task ().stop (); // to provide correct virtual calls delete_notify_manager (); diff --git a/ace/WIN32_Asynch_IO.cpp b/ace/WIN32_Asynch_IO.cpp index 69d504e3792..cbeb6a4e629 100644 --- a/ace/WIN32_Asynch_IO.cpp +++ b/ace/WIN32_Asynch_IO.cpp @@ -1430,6 +1430,651 @@ ACE_WIN32_Asynch_Accept::proactor (void) const return ACE_WIN32_Asynch_Operation::proactor (); } +// ********************************************************************* + +ACE_HANDLE +ACE_WIN32_Asynch_Connect_Result::connect_handle (void) const +{ + return this->connect_handle_; +} + +void ACE_WIN32_Asynch_Connect_Result::connect_handle ( ACE_HANDLE handle ) +{ + this->connect_handle_ = handle; +} + + +ACE_WIN32_Asynch_Connect_Result::ACE_WIN32_Asynch_Connect_Result + (ACE_Handler &handler, + ACE_HANDLE connect_handle, + const void* act, + ACE_HANDLE event, + int priority, + int signal_number) + : ACE_Asynch_Result_Impl (), + ACE_Asynch_Connect_Result_Impl (), + ACE_WIN32_Asynch_Result (handler, act, event, 0, 0, priority, signal_number), + connect_handle_ ( connect_handle ) +{ + ; +} + +void +ACE_WIN32_Asynch_Connect_Result::complete (u_long bytes_transferred, + int success, + const void *completion_key, + u_long error) +{ + // Copy the data. + this->bytes_transferred_ = bytes_transferred; + this->success_ = success; + this->completion_key_ = completion_key; + this->error_ = error; + + // Create the interface result class. + ACE_Asynch_Connect::Result result (this); + + // Call the application handler. + this->handler_.handle_connect (result); +} + +ACE_WIN32_Asynch_Connect_Result::~ACE_WIN32_Asynch_Connect_Result (void) +{ +} + +// Base class operations. These operations are here to kill dominance +// warnings. These methods call the base class methods. + +u_long +ACE_WIN32_Asynch_Connect_Result::bytes_transferred (void) const +{ + return ACE_WIN32_Asynch_Result::bytes_transferred (); +} + +const void * +ACE_WIN32_Asynch_Connect_Result::act (void) const +{ + return ACE_WIN32_Asynch_Result::act (); +} + +int +ACE_WIN32_Asynch_Connect_Result::success (void) const +{ + return ACE_WIN32_Asynch_Result::success (); +} + +const void * +ACE_WIN32_Asynch_Connect_Result::completion_key (void) const +{ + return ACE_WIN32_Asynch_Result::completion_key (); +} + +u_long +ACE_WIN32_Asynch_Connect_Result::error (void) const +{ + return ACE_WIN32_Asynch_Result::error (); +} + +ACE_HANDLE +ACE_WIN32_Asynch_Connect_Result::event (void) const +{ + return ACE_WIN32_Asynch_Result::event (); +} + +u_long +ACE_WIN32_Asynch_Connect_Result::offset (void) const +{ + return ACE_WIN32_Asynch_Result::offset (); +} + +u_long +ACE_WIN32_Asynch_Connect_Result::offset_high (void) const +{ + return ACE_WIN32_Asynch_Result::offset_high (); +} + +int +ACE_WIN32_Asynch_Connect_Result::priority (void) const +{ + return ACE_WIN32_Asynch_Result::priority (); +} + +int +ACE_WIN32_Asynch_Connect_Result::signal_number (void) const +{ + return ACE_WIN32_Asynch_Result::signal_number (); +} + +int +ACE_WIN32_Asynch_Connect_Result::post_completion (ACE_Proactor_Impl *proactor) +{ + return ACE_WIN32_Asynch_Result::post_completion (proactor); +} + +// ********************************************************************* + +ACE_WIN32_Asynch_Connect::ACE_WIN32_Asynch_Connect (ACE_WIN32_Proactor * win32_proactor) + : ACE_Asynch_Operation_Impl (), + ACE_Asynch_Connect_Impl (), + ACE_WIN32_Asynch_Operation (win32_proactor), + flg_open_ (0), + task_lock_count_ (0) +{ +} + +ACE_WIN32_Asynch_Connect::~ACE_WIN32_Asynch_Connect (void) +{ + this->close (); + this->reactor (0); // to avoid purge_pending_notifications +} + +ACE_Proactor * +ACE_WIN32_Asynch_Connect::proactor (void) const +{ + return ACE_WIN32_Asynch_Operation::proactor (); +} + +ACE_HANDLE +ACE_WIN32_Asynch_Connect::get_handle (void) const +{ + + ACE_ASSERT (0); + return ACE_INVALID_HANDLE; +} + +void +ACE_WIN32_Asynch_Connect::set_handle (ACE_HANDLE handle) +{ + ACE_ASSERT (0) ; +} + +int +ACE_WIN32_Asynch_Connect::open (ACE_Handler &handler, + ACE_HANDLE, + const void *completion_key, + ACE_Proactor *proactor) +{ + ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::open\n")); + + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + // if we are already opened, + // we could not create a new handler without closing the previous + if (this->flg_open_ != 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::open:") + ACE_LIB_TEXT ("connector already open \n")), + -1); + + //int result = + ACE_WIN32_Asynch_Operation::open (handler, + ACE_INVALID_HANDLE, + completion_key, + proactor); + + // Ignore result as we pass ACE_INVALID_HANDLE + //if (result == -1) + // return result; + + this->flg_open_ = 1; + + return 0; +} + +int +ACE_WIN32_Asynch_Connect::connect (ACE_HANDLE connect_handle, + const ACE_Addr & remote_sap, + const ACE_Addr & local_sap, + int reuse_addr, + const void *act, + int priority, + int signal_number) +{ + ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::connect\n")); + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + if (this->flg_open_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect") + ACE_LIB_TEXT ("connector was not opened before\n")), + -1); + + // Common code for both WIN and WIN32. + // Create future Asynch_Connect_Result + ACE_WIN32_Asynch_Connect_Result *result = 0; + ACE_NEW_RETURN (result, + ACE_WIN32_Asynch_Connect_Result (*this->handler_, + connect_handle, + act, + this->win32_proactor_->get_handle (), + priority, + signal_number), + -1); + + int rc = connect_i (result, + remote_sap, + local_sap, + reuse_addr); + + // update handle + connect_handle = result->connect_handle (); + + if (rc != 0) + return post_result (result, 1); + + // Enqueue result we will wait for completion + + if (this->result_map_.bind (connect_handle, result) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect:") + ACE_LIB_TEXT ("result map binding failed\n"))); + result->set_error (EFAULT); + return post_result (result, 1); + } + + this->task_lock_count_++; + } + + ACE_Asynch_Pseudo_Task & task = + this->win32_proactor_->get_asynch_pseudo_task (); + + int rc_task = task.register_io_handler (connect_handle, + this, + ACE_Event_Handler::CONNECT_MASK, + 0); // not to suspend after register + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + this->task_lock_count_--; + + int post_enable = 1; + + if (rc_task == -2 && task_lock_count_ == 0) // task is closing + { + post_enable = 0; + task.unlock_finish (); + } + + if (rc_task < 0) + { + ACE_WIN32_Asynch_Connect_Result *result = 0; + + this->result_map_.unbind (connect_handle, result); + + if (result != 0) + { + result->set_error (EFAULT); + + return post_result (result, post_enable); + } + } + } + + return 0; +} + +int ACE_WIN32_Asynch_Connect::post_result (ACE_WIN32_Asynch_Connect_Result * result, + int post_enable) +{ + if (this->flg_open_ != 0 && post_enable != 0) + { + if (this->win32_proactor_ ->post_completion (result) == 0) + return 0; + + ACE_ERROR ((LM_ERROR, + ACE_LIB_TEXT ("Error:(%P | %t):%p\n"), + ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::post_result: ") + ACE_LIB_TEXT (" <post_completion> failed"))); + } + + ACE_HANDLE handle = result->connect_handle (); + + if (handle != ACE_INVALID_HANDLE) + ACE_OS::closesocket (handle); + + delete result; + + return -1; +} + +//@@ New method connect_i +// return code : +// -1 errors before attempt to connect +// 0 connect started +// 1 connect finished ( may be unsuccessfully) + +int +ACE_WIN32_Asynch_Connect::connect_i (ACE_WIN32_Asynch_Connect_Result *result, + const ACE_Addr & remote_sap, + const ACE_Addr & local_sap, + int reuse_addr) +{ + result->set_bytes_transferred (0); + + ACE_HANDLE handle = result->connect_handle (); + + if (handle == ACE_INVALID_HANDLE) + { + int protocol_family = remote_sap.get_type (); + + handle = ACE_OS::socket (protocol_family, + SOCK_STREAM, + 0); + + // save it + result->connect_handle (handle); + + if (handle == ACE_INVALID_HANDLE) + { + result->set_error (errno); + + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ") + ACE_LIB_TEXT (" ACE_OS::socket failed\n")), + -1); + } + + // Reuse the address + int one = 1; + if (protocol_family != PF_UNIX && + reuse_addr != 0 && + ACE_OS::setsockopt (handle, + SOL_SOCKET, + SO_REUSEADDR, + (const char*) &one, + sizeof one) == -1) + { + result->set_error (errno); + + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ") + ACE_LIB_TEXT (" ACE_OS::setsockopt failed\n")), + -1); + } + } + + if (local_sap != ACE_Addr::sap_any) + { + sockaddr * laddr = ACE_reinterpret_cast (sockaddr *, + local_sap.get_addr ()); + size_t size = local_sap.get_size (); + + if (ACE_OS::bind (handle, laddr, size) == -1) + { + result->set_error (errno); + + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ") + ACE_LIB_TEXT (" ACE_OS::bind failed\n")), + -1); + } + } + + // set non blocking mode + + if (ACE::set_flags (handle, ACE_NONBLOCK) != 0) + { + result->set_error (errno); + + ACE_ERROR_RETURN ((LM_ERROR, + ACE_LIB_TEXT ("%N:%l:ACE_WIN32_Asynch_Connect::connect_i: ") + ACE_LIB_TEXT (" ACE::set_flags failed\n")), + -1); + } + + for (;;) + { + int rc = ACE_OS::connect (handle, + ACE_reinterpret_cast (sockaddr *, + remote_sap.get_addr ()), + remote_sap.get_size ()); + + if (rc < 0) // failure + { + if (errno == EWOULDBLOCK || errno == EINPROGRESS) + return 0; // connect started + + if (errno == EINTR) + continue; + + result->set_error (errno); + } + return 1 ; // connect finished + } + + ACE_NOTREACHED (return 0); +} + + +//@@ New method cancel_uncompleted +// It performs cancellation of all pending requests +// +// Parameter flg_notify can be +// 0 - don't send notifications about canceled accepts +// !0 - notify user about canceled accepts +// according WIN32 standards we should receive notifications +// on canceled AIO requests +// +// Return value : number of cancelled requests +// + +int +ACE_WIN32_Asynch_Connect::cancel_uncompleted (int flg_notify, ACE_Handle_Set & set) +{ + ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::cancel_uncompleted\n")); + + int retval = 0; + + MAP_ITERATOR iter (result_map_); + + MAP_ENTRY * me = 0; + + set.reset (); + + for (; iter.next (me) != 0; retval++, iter.advance ()) + { + ACE_HANDLE handle = me->ext_id_; + ACE_WIN32_Asynch_Connect_Result* result = me->int_id_ ; + + set.set_bit (handle); + + result->set_bytes_transferred (0); + result->set_error (ERROR_OPERATION_ABORTED); + this->post_result (result, flg_notify); + } + + result_map_.unbind_all (); + + return retval; +} + +int +ACE_WIN32_Asynch_Connect::cancel (void) +{ + ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::cancel\n")); + + //We are not really ACE_WIN32_Asynch_Operation + //so we could not call ::aiocancel () + // or just write + //return ACE_WIN32_Asynch_Operation::cancel (); + //We delegate real cancelation to cancel_uncompleted (1) + + int rc = -1 ; // ERRORS + + ACE_Handle_Set set; + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + int num_cancelled = cancel_uncompleted (flg_open_, set); + + if (num_cancelled == 0) + rc = 1; // AIO_ALLDONE + else if (num_cancelled > 0) + rc = 0; // AIO_CANCELED + + if (this->flg_open_ == 0) + return rc; + + this->task_lock_count_++; + } + + ACE_Asynch_Pseudo_Task & task = + this->win32_proactor_->get_asynch_pseudo_task (); + + int rc_task = task.remove_io_handler (set); + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + this->task_lock_count_--; + + if (rc_task == -2 && task_lock_count_ == 0) // task is closing + task.unlock_finish (); + } + + return rc; +} + +int +ACE_WIN32_Asynch_Connect::close (void) +{ + ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::close\n")); + + ACE_Handle_Set set; + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + int num_cancelled = cancel_uncompleted (flg_open_, set); + + if (num_cancelled == 0 || this->flg_open_ == 0) + { + this->flg_open_ = 0; + return 0; + } + + this->task_lock_count_++; + } + + ACE_Asynch_Pseudo_Task & task = + this->win32_proactor_->get_asynch_pseudo_task (); + + int rc_task = task.remove_io_handler (set); + + { + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1)); + + this->task_lock_count_--; + + if (rc_task == -2 && task_lock_count_ == 0) // task is closing + task.unlock_finish (); + + this->flg_open_ = 0; + } + + return 0; +} + +int +ACE_WIN32_Asynch_Connect::handle_exception (ACE_HANDLE fd) +{ + ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_exception\n")); + return handle_output (fd); +} + +int +ACE_WIN32_Asynch_Connect::handle_input (ACE_HANDLE fd) +{ + ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_input\n")); + return handle_output (fd); +} + +int +ACE_WIN32_Asynch_Connect::handle_output (ACE_HANDLE fd) +{ + ACE_TRACE (ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_output\n")); + + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + + ACE_WIN32_Asynch_Connect_Result* result = 0; + + if (this->result_map_.unbind (fd, result) != 0) // not found + return -1; + + int sockerror = 0 ; + int lsockerror = sizeof sockerror; + + ACE_OS::getsockopt (fd, + SOL_SOCKET, + SO_ERROR, + (char*) & sockerror, + & lsockerror); + + result->set_bytes_transferred (0); + result->set_error (sockerror); + this->post_result (result, this->flg_open_); + + //ACE_Asynch_Pseudo_Task & task = + // this->win32_proactor_->get_asynch_pseudo_task(); + + // remove_io_handler() contains flag DONT_CALL + //task.remove_io_handler ( fd ); + + //return 0; + return -1; +} + + +int +ACE_WIN32_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask) +{ + ACE_TRACE(ACE_LIB_TEXT ("ACE_WIN32_Asynch_Connect::handle_close\n")); + + ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0)); + + ACE_Asynch_Pseudo_Task & task = + this->win32_proactor_->get_asynch_pseudo_task (); + + if (task.is_active () == 0) // task is closing + { + if (this->flg_open_ != 0) // we are open + { + this->flg_open_ = 0; + + // it means other thread is waiting for reactor token_ + if (this->task_lock_count_ > 0) + task.lock_finish (); + } + + ACE_Handle_Set set; + this->cancel_uncompleted (0, set); + + return 0; + } + + // remove_io_handler() contains flag DONT_CALL + // so it is save + task.remove_io_handler (fd); + + ACE_WIN32_Asynch_Connect_Result* result = 0; + + if (this->result_map_.unbind (fd, result) != 0) // not found + return -1; + + result->set_bytes_transferred (0); + result->set_error (ERROR_OPERATION_ABORTED); + this->post_result (result, this->flg_open_); + + return 0; +} + +// ********************************************************************* + ACE_HANDLE ACE_WIN32_Asynch_Transmit_File_Result::socket (void) const { @@ -2317,4 +2962,27 @@ ACE_WIN32_Asynch_Write_Dgram::ACE_WIN32_Asynch_Write_Dgram (ACE_WIN32_Proactor * { } +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Map_Entry<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *>; +template class ACE_Map_Manager<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Iterator_Base<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Const_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; +template class ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX>; + +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate ACE_Map_Entry<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *> +#pragma instantiate ACE_Map_Manager<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Iterator_Base<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Const_Iterator_Base<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Const_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> +#pragma instantiate ACE_Map_Reverse_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> + + +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ + #endif /* ACE_WIN32 || ACE_HAS_WINCE */ diff --git a/ace/WIN32_Asynch_IO.h b/ace/WIN32_Asynch_IO.h index 1d99a0e61b8..6b59020df0b 100644 --- a/ace/WIN32_Asynch_IO.h +++ b/ace/WIN32_Asynch_IO.h @@ -9,15 +9,16 @@ * * These classes only works on Win32 platforms. * - * The implementation of <ACE_Asynch_Transmit_File> and - * <ACE_Asynch_Accept> are only supported if ACE_HAS_WINSOCK2 is - * defined or you are on WinNT 4.0 or higher. + * The implementation of ACE_Asynch_Transmit_File, + * ACE_Asynch_Accept, and ACE_Asynch_Connect are only supported if + * ACE_HAS_WINSOCK2 is defined or you are on WinNT 4.0 or higher. * * * @author Irfan Pyarali <irfan@cs.wustl.edu> * @author Tim Harrison <harrison@cs.wustl.edu> * @author Alexander Babu Arulanthu <alex@cs.wustl.edu> * @author Roger Tragin <r.tragin@computer.org> + * @author Alexander Libman <alibman@ihug.com.au> */ //============================================================================= @@ -36,6 +37,9 @@ #include "ace/OS.h" #include "ace/Asynch_IO_Impl.h" #include "ace/Addr.h" +#include "ace/Event_Handler.h" + +#include "ace/Map_Manager.h" // Forward declaration class ACE_WIN32_Proactor; @@ -1044,6 +1048,222 @@ public: }; /** + * @class ACE_WIN32_Asynch_Connect_Result + * + * @brief This is that class which will be passed back to the + * completion handler when the asynchronous connect completes. + * + * This class has all the information necessary for the + * completion handler to uniquiely identify the completion of the + * asynchronous connect. + */ +class ACE_Export ACE_WIN32_Asynch_Connect_Result : public virtual ACE_Asynch_Connect_Result_Impl, + public ACE_WIN32_Asynch_Result +{ + /// Factory classes will have special permissions. + friend class ACE_WIN32_Asynch_Connect; + + /// The Proactor constructs the Result class for faking results. + friend class ACE_WIN32_Proactor; + +public: + + /// I/O handle for the connection. + ACE_HANDLE connect_handle (void) const; + + // = Base class operations. These operations are here to kill some + // warnings. These methods call the base class methods. + + /// Number of bytes transferred by the operation. + u_long bytes_transferred (void) const; + + /// ACT associated with the operation. + const void *act (void) const; + + /// Did the operation succeed? + int success (void) const; + + /** + * Returns the ACT associated with the handle when it was + * registered with the I/O completion port. This ACT is not the + * same as the ACT associated with the asynchronous operation. + */ + const void *completion_key (void) const; + + /// Error value if the operation fail. + u_long error (void) const; + + /// Event associated with the OVERLAPPED structure. + ACE_HANDLE event (void) const; + + /// This really make sense only when doing file I/O. + u_long offset (void) const; + + /// Offset_high associated with the OVERLAPPED structure. + u_long offset_high (void) const; + + /// The priority of the asynchronous operation. Currently, this is + /// not supported on Win32. + int priority (void) const; + + /// No-op. Returns 0. + int signal_number (void) const; + + /// Post this object to the Proactor's completion port. + int post_completion (ACE_Proactor_Impl *proactor); + +protected: + /// Constructor is protected since creation is limited to + /// ACE_Asynch_Connect factory. + ACE_WIN32_Asynch_Connect_Result (ACE_Handler &handler, + ACE_HANDLE connect_handle, + const void* act, + ACE_HANDLE event, + int priority, + int signal_number); + + /// ACE_Proactor will call this method when the accept completes. + virtual void complete (u_long bytes_transferred, + int success, + const void *completion_key, + u_long error); + + /// Destructor. + virtual ~ACE_WIN32_Asynch_Connect_Result (void); + + /// Set the I/O handle for the new connection. + void connect_handle (ACE_HANDLE handle); + + ACE_HANDLE connect_handle_; +}; + + +/** + * @class ACE_WIN32_Asynch_Connect + */ +class ACE_Export ACE_WIN32_Asynch_Connect : + public virtual ACE_Asynch_Connect_Impl, + public ACE_WIN32_Asynch_Operation, + public ACE_Event_Handler +{ +public: + + /// Constructor. + ACE_WIN32_Asynch_Connect (ACE_WIN32_Proactor * win32_proactor); + + /// Destructor. + virtual ~ACE_WIN32_Asynch_Connect (void); + + /** + * This open belongs to ACE_WIN32_Asynch_Operation. We forward + * this call to that method. We have put this here to avoid the + * compiler warnings. + */ + int open (ACE_Handler &handler, + ACE_HANDLE handle, + const void *completion_key, + ACE_Proactor *proactor = 0); + + /** + * Start an asynchronous connect. + * + * @arg connect_handle handle to use for the connect. If the value + * ACE_INVALID_HANDLE, a new handle will be created. + * + * @retval 0 Success + * @retval -1 Error + */ + int connect (ACE_HANDLE connect_handle, + const ACE_Addr &remote_sap, + const ACE_Addr &local_sap, + int reuse_addr, + const void *act, + int priority, + int signal_number = 0); + + /** + * Cancel all pending pseudo-asynchronus requests + * Behavior as usual AIO request + */ + int cancel (void); + + /** + * Close performs cancellation of all pending requests + * and close the connect handle + */ + int close (void); + + /// virtual from ACE_Event_Handler + ACE_HANDLE get_handle (void) const; + + /// virtual from ACE_Event_Handler + void set_handle (ACE_HANDLE handle); + + /// virtual from ACE_Event_Handler + int handle_input ( ACE_HANDLE handle); + int handle_output ( ACE_HANDLE handle); + int handle_exception ( ACE_HANDLE handle); + + /// virtual from ACE_Event_Handler + int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask) ; + + // = Methods belong to ACE_WIN32_Asynch_Operation base class. These + // methods are defined here to avoid dominace warnings. They route + // the call to the ACE_WIN32_Asynch_Operation base class. + /// Return the underlying proactor. + ACE_Proactor* proactor (void) const; + +private: + int connect_i (ACE_WIN32_Asynch_Connect_Result *result, + const ACE_Addr &remote_sap, + const ACE_Addr &local_sap, + int reuse_addr); + + int post_result (ACE_WIN32_Asynch_Connect_Result *result, int flg_post); + + /// Cancel uncompleted connect operations. + /** + * @arg flg_notify Indicates whether or not to send notification about + * canceled connect operations. If 0, don't send + * notifications. If 1, notify user about canceled + * connects. + * According WIN32 standards we should receive + * notifications on canceled AIO requests. + */ + int cancel_uncompleted (int flg_notify, ACE_Handle_Set & set); + + int flg_open_ ; + /// 1 - Connect is registered in ACE_Asynch_Pseudo_Task + /// 0 - Aceept is deregisted in ACE_Asynch_Pseudo_Task + + + /// to prevent ACE_Asynch_Pseudo_Task from deletion + /// while we make a call to the ACE_Asynch_Pseudo_Task + /// This is extra cost !!! + /// we could avoid them if all applications will follow the rule: + /// Proactor should be deleted only after deletion all + /// AsynchOperation objects connected with it + int task_lock_count_; + + typedef ACE_Map_Manager<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> + MAP_MANAGER; + typedef ACE_Map_Iterator<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *, ACE_SYNCH_NULL_MUTEX> + MAP_ITERATOR; + typedef ACE_Map_Entry<ACE_HANDLE, ACE_WIN32_Asynch_Connect_Result *> + MAP_ENTRY; + + MAP_MANAGER result_map_; + // Map of Result pointers that correspond to all the <accept>'s + // pending. + + ACE_SYNCH_MUTEX lock_; + // The lock to protect the result queue which is shared. The queue + // is updated by main thread in the register function call and + // through the auxillary thread in the deregister fun. So let us + // mutex it. +}; + +/** * @class ACE_WIN32_Asynch_Transmit_File_Result * * diff --git a/ace/WIN32_Proactor.cpp b/ace/WIN32_Proactor.cpp index ec89c97f5b0..a26c164430b 100644 --- a/ace/WIN32_Proactor.cpp +++ b/ace/WIN32_Proactor.cpp @@ -52,13 +52,23 @@ ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads, ACE_ERROR ((LM_ERROR, ACE_LIB_TEXT ("%p\n"), ACE_LIB_TEXT ("CreateIoCompletionPort"))); + + this->get_asynch_pseudo_task ().start (); } ACE_WIN32_Proactor::~ACE_WIN32_Proactor (void) { + this->get_asynch_pseudo_task ().stop (); + this->close (); } +ACE_Asynch_Pseudo_Task & +ACE_WIN32_Proactor::get_asynch_pseudo_task () +{ + return this->pseudo_task_; +} + int ACE_WIN32_Proactor::close (void) { @@ -66,7 +76,7 @@ ACE_WIN32_Proactor::close (void) if (this->completion_port_ != 0) { // To avoid memory leaks we should delete all results from queue. - + for (;;) { ACE_OVERLAPPED *overlapped = 0; @@ -74,7 +84,7 @@ ACE_WIN32_Proactor::close (void) u_long completion_key = 0; // Get the next asynchronous operation that completes - BOOL res = ::GetQueuedCompletionStatus + BOOL res = ::GetQueuedCompletionStatus (this->completion_port_, &bytes_transferred, &completion_key, @@ -84,7 +94,7 @@ ACE_WIN32_Proactor::close (void) if (overlapped == 0) break; - ACE_WIN32_Asynch_Result *asynch_result = + ACE_WIN32_Asynch_Result *asynch_result = (ACE_WIN32_Asynch_Result *) overlapped; delete asynch_result; @@ -196,6 +206,16 @@ ACE_WIN32_Proactor::create_asynch_accept (void) return implementation; } +ACE_Asynch_Connect_Impl * +ACE_WIN32_Proactor::create_asynch_connect (void) +{ + ACE_Asynch_Connect_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_WIN32_Asynch_Connect (this), + 0); + return implementation; +} + ACE_Asynch_Transmit_File_Impl * ACE_WIN32_Proactor::create_asynch_transmit_file (void) { @@ -390,6 +410,26 @@ ACE_WIN32_Proactor::create_asynch_accept_result (ACE_Handler &handler, return implementation; } +ACE_Asynch_Connect_Result_Impl * +ACE_WIN32_Proactor::create_asynch_connect_result (ACE_Handler & handler, + ACE_HANDLE connect_handle, + const void *act, + ACE_HANDLE event, + int priority , + int signal_number) +{ + ACE_Asynch_Connect_Result_Impl *implementation = 0; + ACE_NEW_RETURN (implementation, + ACE_WIN32_Asynch_Connect_Result (handler, + connect_handle, + act, + event, + priority, + signal_number), + 0); + return implementation; +} + ACE_Asynch_Transmit_File_Result_Impl * ACE_WIN32_Proactor::create_asynch_transmit_file_result (ACE_Handler &handler, ACE_HANDLE socket, diff --git a/ace/WIN32_Proactor.h b/ace/WIN32_Proactor.h index 1784ead2c8a..c363292f954 100644 --- a/ace/WIN32_Proactor.h +++ b/ace/WIN32_Proactor.h @@ -10,6 +10,7 @@ * @author Tim Harrison (harrison@cs.wustl.edu) * @author Alexander Babu Arulanthu <alex@cs.wustl.edu> * @author Roger Tragin <r.tragin@computer.org> + * @author Alexander Libman <alibman@ihug.com.au> */ //============================================================================= @@ -31,6 +32,7 @@ #include "ace/Event_Handler.h" #include "ace/Proactor_Impl.h" +#include "ace/Asynch_Pseudo_Task.h" // Forward declarations. class ACE_WIN32_Asynch_Result; @@ -47,6 +49,8 @@ class ACE_WIN32_Proactor_Timer_Handler; */ class ACE_Export ACE_WIN32_Proactor : public ACE_Proactor_Impl { + friend class ACE_WIN32_Asynch_Connect; + public: /// A do nothing constructor. ACE_WIN32_Proactor (size_t number_of_threads = 0, @@ -110,6 +114,7 @@ public: virtual ACE_Asynch_Read_Dgram_Impl *create_asynch_read_dgram (void); virtual ACE_Asynch_Write_Dgram_Impl *create_asynch_write_dgram (void); virtual ACE_Asynch_Accept_Impl *create_asynch_accept (void); + virtual ACE_Asynch_Connect_Impl *create_asynch_connect (void); virtual ACE_Asynch_Transmit_File_Impl *create_asynch_transmit_file (void); // Methods used to create Asynch_IO_Result objects. We create the right @@ -188,6 +193,14 @@ public: int priority, int signal_number = 0); + virtual ACE_Asynch_Connect_Result_Impl *create_asynch_connect_result (ACE_Handler & handler, + ACE_HANDLE connect_handle, + const void *act, + ACE_HANDLE event, + int priority, + int signal_number = 0); + + virtual ACE_Asynch_Transmit_File_Result_Impl *create_asynch_transmit_file_result (ACE_Handler &handler, ACE_HANDLE socket, ACE_HANDLE file, @@ -212,6 +225,9 @@ public: int signal_number = 0); protected: + /// Task to process pseudo-asynchronous operations + ACE_Asynch_Pseudo_Task & get_asynch_pseudo_task (void); + /// Called when object is signaled by OS (either via UNIX signals or /// when a Win32 object becomes signaled). virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0); @@ -261,6 +277,10 @@ protected: /// Handler to handle the wakeups. This works in conjunction with the /// <ACE_Proactor::run_event_loop>. ACE_Handler wakeup_handler_; + + /// Pseudo-task for asynch connect ( NT/2000) + /// In future should removed in XP with ConnectEx support + ACE_Asynch_Pseudo_Task pseudo_task_; }; /** |