diff options
author | Phil Mesnier <mesnier_p@ociweb.com> | 2006-04-26 17:12:48 +0000 |
---|---|---|
committer | Phil Mesnier <mesnier_p@ociweb.com> | 2006-04-26 17:12:48 +0000 |
commit | deaa7a203fb9a4db2f3b01eaf2995ccb87e5944d (patch) | |
tree | 05a08d17ed7b625832acb49896659fb0beda830b | |
parent | 4274f7f32351550bc6b3f8b346488f1b0a993bb4 (diff) | |
download | ATCD-deaa7a203fb9a4db2f3b01eaf2995ccb87e5944d.tar.gz |
ChangeLog tag: Wed Apr 26 16:42:45 UTC 2006 Phil Mesnier <mesnier_p@ociweb.com>
59 files changed, 1941 insertions, 262 deletions
diff --git a/TAO/ChangeLog b/TAO/ChangeLog index 3536cc3b41e..48ecb96913a 100644 --- a/TAO/ChangeLog +++ b/TAO/ChangeLog @@ -1,3 +1,140 @@ +Wed Apr 26 16:42:45 UTC 2006 Phil Mesnier <mesnier_p@ociweb.com> + + * docs/Options.html: + + Adding text for the new commandline options used to control the + parallel connect feature. + + * tao/Blocked_Connect_Strategy.h: + * tao/Blocked_Connect_Strategy.cpp: + * tao/CORBALOC_Parser.cpp: + * tao/Client_Strategy_Factory.h: + * tao/Client_Strategy_Factory.cpp: + * tao/Connect_Strategy.h: + * tao/Connect_Strategy.cpp: + * tao/Endpoint.h: + * tao/Endpoint.cpp: + * tao/IIOP_Connection_Handler.h: + * tao/IIOP_Connection_Handler.cpp: + * tao/IIOP_Connector.h: + * tao/IIOP_Connector.cpp: + * tao/IIOP_Endpoint.h: + * tao/IIOP_Endpoint.cpp: + * tao/IIOP_Profile.h: + * tao/IIOP_Profile.cpp: + * tao/Invocation_Endpoint_Selectors.h: + * tao/Invocation_Endpoint_Selectors.cpp: + * tao/LF_CH_Event.h: + * tao/LF_Connect_Strategy.h: + * tao/LF_Connect_Strategy.cpp: + * tao/LF_Event.h: + * tao/LF_Multi_Event.h: + * tao/LF_Multi_Event.cpp: + * tao/MProfile.h: + * tao/MProfile.i: + * tao/MProfile.cpp: + * tao/ORB_Core.cpp: + * tao/Profile.h: + * tao/Profile.cpp: + * tao/Profile_Transport_Resolver.h: + * tao/Profile_Transport_Resolver.cpp: + * tao/Reactive_Connect_Strategy.h: + * tao/Reactive_Connect_Strategy.cpp: + * tao/Transport.cpp: + * tao/Transport_Connector.h: + * tao/Transport_Connector.cpp: + * tao/Transport_Descriptor_Interface.h: + * tao/Transport_Descriptor_Interface.inl: + * tao/Transport_Descriptor_Interface.cpp: + * tao/default_client.h: + * tao/default_client.cpp: + * tao/params.h: + * tao/params.i: + * tao/params.cpp: + * tao/tao.mpc: + + These changes support a new technique for active connection + establishment when presented with a profile containing multiple + possible endpoints. This commit resolves bugzilla bug #2485. + + The technique in question is "parallel connects" meaning + attempting to connect to many endpoints simultaniously. It was + conceived as a way to deal with timeouts when the Invocation + Endpoint Selector would first try to connect to one or more + unreachable endpoints. If those endpoints were defined as IP + addresses (not hostnames) or as resolvable hostnames that + pointed to unreachable IP addresses, the connection + establishment would take potentially several minutes to time out + and eventually encounter a reachable endpoint. In the case of + shared profiles (those using TAG_ALTERNATE_IIOP_ENDPOINT) this + delay impacts every single invocation. + + This parallel connect feature (also referred to somewhat + inacurately as a strategy) avoids this by supplying all the + endpoints in a profile to the connector and letting it first + test to see if any are already cached and available, and if not, + to open connections to each and wait for a winner. When the + first connection completes, any pending connections are + terminated. + + In order to minimize the use of pending connections, an iterator + traverses the list of endpoints creating new connections and + also checking any existing connections for completion. If the + first endpoint happens to be reachable and the server responds + quickly enough, the client may not open any more connections. + + If the server does not respond immediately, a wait strategy is + entered. This wait strategy may be Reactive or Leader/Follower + based. In either case, a specal "multi event" type is used to + allow a single thread to wait on one of many connectors, and + then to clean up those that didn't finish in time. The parallel + connect feature is also available using blocking connects, but + the only advantage there is in checking the cache for all + endpoints in the profile, there is no performance gain during + actual connection establishment. + + The parallel connect strategy differs from another endpoint + selection optimization, available in + tao/Strategies/Optimized_Connection_Endpoint_Selector.*. That + strategy works by examining all profiles simultaniously, this + feature still treats separate profiles separately. This profile + separation is necessary to support Load Balancing and Fault + Tolerence. Also, this feature requires additional support to be + built into protocol specific connectors (IIOP is currently the + only protocol supporting parallel connects) whereas the other + feature works regardless of the protocol. + + As this is a new feature, it is disabled by default. Use the + -ORBUseParallelConnects option to enable its use. A second + option, -ORBParallelConnectDelay, is used to introduce a small + delay between the opening of new potential connections if the + server is particularly busy. This is useful to minimize the + impact on a busy server if more than one of the available + endpoints is reachable. Also, because this feature only focuses + on one profile at a time, the server must be run with + -ORBUseSharedProfiles enabled (it is disabled by default). + + * tests/Parallel_Connect_Strategy/Parallel_Connect_Strategy.mpc: + * tests/Parallel_Connect_Strategy/README: + * tests/Parallel_Connect_Strategy/Test.idl: + * tests/Parallel_Connect_Strategy/Test_i.h: + * tests/Parallel_Connect_Strategy/Test_i.cpp: + * tests/Parallel_Connect_Strategy/blocked.conf: + * tests/Parallel_Connect_Strategy/client.cpp: + * tests/Parallel_Connect_Strategy/reactive.conf: + * tests/Parallel_Connect_Strategy/run_test.pl: + * tests/Parallel_Connect_Strategy/server.cpp: + + This is a new test for the parallel connect feature. It works by + having the server open two endpoints, one aliased to something + unreachable. The client then uses different wait strategies to + make invocations on the server and records the time for + each. These tests also include counter-examples in which + parallel connects are not used, and these take several minutes + to run. On my Linux machine the timeout period is about 3 + minutes which causes the overall test to take about 9 minutes to + run. + Wed Apr 26 16:30:56 UTC 2006 Phil Mesnier <mesnier_p@ociweb.com> * tao/PortableServer/POAManagerFactory.cpp: diff --git a/TAO/docs/Options.html b/TAO/docs/Options.html index c5151b03d56..22197133429 100644 --- a/TAO/docs/Options.html +++ b/TAO/docs/Options.html @@ -531,11 +531,18 @@ is <code>0</code>. This option is disabled (<code>0</code>) by default.</td> Note: If none of the preferred interfaces apply to an outgoing connection then they will not be enforced. For this option to have any effect, therefore, the connection through a legal preferred interface must fail. - </tr> + </tr> <tr> <td><code>-ORBKeepalive</code> <em>boolean (0|1)</em></td> - <td><a name="-ORBKeepalive"></a>This option allows users to - specify that the SO_KEEPALIVE option is set on TCP sockets. + <td><a name="-ORBKeepalive"></a>This option allows users to specify + that the SO_KEEPALIVE option is set on TCP sockets used by IIOP. + The default is <code>0</code> (false). + </td> + </tr> + <tr> + <td><code>-ORBDontRoute</code> <em>boolean (0|1)</em></td> + <td><a name="-ORBDontRoute"></a>This option allows users to specify + that the SO_DONTROUTE option is set on TCP sockets used by IIOP. The default is <code>0</code> (false). </td> </tr> @@ -694,6 +701,16 @@ listen endpoints should be encoded into IORs when <code>-ORBUseIMR</code> is set. The default is true. </td> </tr> <tr> + <td><code>-ORBUseParallelConnects</code> <em>boolean (0|1)</em></td> + <td><a name="-ORBUseParallelConnects"></a>This option allows users to + specify the ORB attempt to connect simultaniously to all endpoints + listed in profiles, rather than stepping through individual endpoints, + trying and possibly failing, before moving on to the next. For this + feature to work, the server must be using shared profiles. + The default is <code>0</code> (false). + </td> + </tr> + <tr> <td><code>-ORBUseSharedProfile</code> <em>boolean (0|1)</em></td> <td><a name="-ORBUseSharedProfile"></a> This option allows multiple implicit or explicit endpoints to be @@ -707,6 +724,18 @@ is set. The default is true. </td> </td> </tr> <tr> + <td><code>-ORBParallelConnectDelay</code> <em>unsigned long msec</em></td> + <td><a name="-ORBParallelConnectDelay"></a>When using parallel + connection attempts, this option defines the number of milliseconds to + delay when polling previously started connection attempts. If a server + is likely to be busy, this client side option will help avoid creating + redundant connections that must be accepted, only to be closed a moment + later. However, if the first reachable endpoint is far down the list, + this option will increase the delay before that endpoint is reached. + The default is <code>0</code>. + </td> + </tr> + <tr> <td><code>-ORBPreferIPV6Interfaces</code> <em>boolean (0|1)</em></td> <td><a name="-ORBPreferIPV6Interfaces"></a> If option is <CODE>1</CODE> (true) it directs the default diff --git a/TAO/tao/Blocked_Connect_Strategy.cpp b/TAO/tao/Blocked_Connect_Strategy.cpp index 9317a128679..e5ee355a042 100644 --- a/TAO/tao/Blocked_Connect_Strategy.cpp +++ b/TAO/tao/Blocked_Connect_Strategy.cpp @@ -38,21 +38,13 @@ TAO_Blocked_Connect_Strategy::synch_options (ACE_Time_Value *timeout, } } - int -TAO_Blocked_Connect_Strategy::wait (TAO_Connection_Handler *, +TAO_Blocked_Connect_Strategy::wait_i (TAO_LF_Event *, + TAO_Transport *, ACE_Time_Value * ) { // We cannot wait for connection completion return -1; } -int -TAO_Blocked_Connect_Strategy::wait (TAO_Transport *, - ACE_Time_Value *) -{ - // We cannot wait for connection completion - return -1; -} - TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Blocked_Connect_Strategy.h b/TAO/tao/Blocked_Connect_Strategy.h index 9c547ebfc23..9eea4edaebc 100644 --- a/TAO/tao/Blocked_Connect_Strategy.h +++ b/TAO/tao/Blocked_Connect_Strategy.h @@ -45,11 +45,11 @@ public: virtual void synch_options (ACE_Time_Value *val, ACE_Synch_Options &opt); - virtual int wait (TAO_Connection_Handler *ch, +protected: + virtual int wait_i (TAO_LF_Event *ev, + TAO_Transport *t, ACE_Time_Value *val); - virtual int wait (TAO_Transport *t, - ACE_Time_Value *val); }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/CORBALOC_Parser.cpp b/TAO/tao/CORBALOC_Parser.cpp index 9192b99c8a4..d3b8fbb84a7 100644 --- a/TAO/tao/CORBALOC_Parser.cpp +++ b/TAO/tao/CORBALOC_Parser.cpp @@ -233,7 +233,8 @@ TAO_CORBALOC_Parser::parse_string (const char * ior, const char * str = full_ep.c_str(); endpoints[i].profile_->parse_string (str ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (CORBA::Object::_nil ()); - if (mprofile.give_profile(endpoints[i].profile_) != -1) + int share = orb->orb_core()->orb_params()->shared_profile(); + if (mprofile.give_profile(endpoints[i].profile_, share) != -1) endpoints[i].profile_ = 0; else { diff --git a/TAO/tao/Client_Strategy_Factory.cpp b/TAO/tao/Client_Strategy_Factory.cpp index f459955e2bf..f8cfb870b74 100644 --- a/TAO/tao/Client_Strategy_Factory.cpp +++ b/TAO/tao/Client_Strategy_Factory.cpp @@ -40,6 +40,12 @@ TAO_Client_Strategy_Factory::create_wait_strategy (TAO_Transport *) return 0; } +TAO_Client_Strategy_Factory::Connect_Strategy +TAO_Client_Strategy_Factory::connect_strategy (void) const +{ + return TAO_BLOCKED_CONNECT; +} + TAO_Connect_Strategy * TAO_Client_Strategy_Factory::create_connect_strategy (TAO_ORB_Core *) { diff --git a/TAO/tao/Client_Strategy_Factory.h b/TAO/tao/Client_Strategy_Factory.h index 3c63290ad4a..407e580b509 100644 --- a/TAO/tao/Client_Strategy_Factory.h +++ b/TAO/tao/Client_Strategy_Factory.h @@ -71,6 +71,15 @@ public: /// Create the correct client <asynch_connect> strategy. virtual TAO_Connect_Strategy *create_connect_strategy (TAO_ORB_Core *); + enum Connect_Strategy + { + TAO_BLOCKED_CONNECT, + TAO_REACTIVE_CONNECT, + TAO_LEADER_FOLLOWER_CONNECT + }; + /// Return the selected connection strategy option. + virtual Connect_Strategy connect_strategy (void) const; + /// Does the client allow any form of callback? virtual int allow_callback (void); diff --git a/TAO/tao/Connect_Strategy.cpp b/TAO/tao/Connect_Strategy.cpp index 2299f775d46..9a79bece571 100644 --- a/TAO/tao/Connect_Strategy.cpp +++ b/TAO/tao/Connect_Strategy.cpp @@ -1,4 +1,7 @@ #include "tao/Connect_Strategy.h" +#include "tao/Transport.h" +#include "tao/Connection_Handler.h" +#include "tao/LF_Multi_Event.h" ACE_RCSID (tao, Connect_Strategy, @@ -16,4 +19,41 @@ TAO_Connect_Strategy::~TAO_Connect_Strategy (void) { } +int +TAO_Connect_Strategy::wait (TAO_Connection_Handler *ch, + ACE_Time_Value *max_wait_time) +{ + if (ch == 0) + return -1; + + return this->wait_i (ch, ch->transport (),max_wait_time); +} + +int +TAO_Connect_Strategy::wait (TAO_Transport *t, + ACE_Time_Value *max_wait_time) +{ + // Basically the connection was EINPROGRESS, but before we could + // wait for it some other thread detected a failure and cleaned up + // the connection handler. + if (t == 0) + return -1; + + return this->wait_i (t->connection_handler(),t,max_wait_time); +} + +int +TAO_Connect_Strategy::wait (TAO_LF_Multi_Event *mev, + ACE_Time_Value *max_wait_time) +{ + return this->wait_i (mev, mev->base_transport(), max_wait_time); +} + +int +TAO_Connect_Strategy::poll (TAO_LF_Multi_Event *mev) +{ + ACE_Time_Value zero(ACE_Time_Value::zero); + return this->wait_i (mev, mev->base_transport(), &zero); +} + TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Connect_Strategy.h b/TAO/tao/Connect_Strategy.h index 1189a6cd171..4783039326a 100644 --- a/TAO/tao/Connect_Strategy.h +++ b/TAO/tao/Connect_Strategy.h @@ -34,6 +34,8 @@ class TAO_ORB_Core; class TAO_Connector; class TAO_Connection_Handler; class TAO_Transport; +class TAO_LF_Multi_Event; +class TAO_LF_Event; /** * @class TAO_Connect_Strategy @@ -72,13 +74,23 @@ public: /* If the connection establishment fails the state within the * connection handler is set appropriately. */ - virtual int wait (TAO_Connection_Handler *ch, - ACE_Time_Value *val) = 0; + int wait (TAO_Connection_Handler *ch, ACE_Time_Value *val); - virtual int wait (TAO_Transport *t, - ACE_Time_Value *val) = 0; + int wait (TAO_Transport *t, ACE_Time_Value *val); + + /// Wait for one of many connections to complete. Returns when one + /// succeeds or all fail. + int wait (TAO_LF_Multi_Event *ev, ACE_Time_Value *val); + + /// Do a quick check to see if any connections are complete. This + /// does the same as the wait with an explicit time value of 0. + int poll (TAO_LF_Multi_Event *ev); protected: + /// This is the method that does all the real interesting stuff. + virtual int wait_i (TAO_LF_Event *ev, + TAO_Transport *t, + ACE_Time_Value *val) = 0; /// Cached copy of the ORB core pointer TAO_ORB_Core * const orb_core_; diff --git a/TAO/tao/Endpoint.cpp b/TAO/tao/Endpoint.cpp index 75f61995341..6077a1b6b94 100644 --- a/TAO/tao/Endpoint.cpp +++ b/TAO/tao/Endpoint.cpp @@ -1,6 +1,7 @@ // $Id$ #include "tao/Endpoint.h" +#include "tao/ORB_Core.h" #if !defined (__ACE_INLINE__) #include "tao/Endpoint.i" @@ -16,6 +17,14 @@ TAO_Endpoint::~TAO_Endpoint (void) { } +TAO_Endpoint * +TAO_Endpoint::next_filtered (TAO_ORB_Core *, TAO_Endpoint *root) +{ + if (root == 0) + return this; + return this->next(); +} + TAO_END_VERSIONED_NAMESPACE_DECL //@@ TAO_ENDPOINT_SPL_METHODS_ADD_HOOK diff --git a/TAO/tao/Endpoint.h b/TAO/tao/Endpoint.h index b762595ef58..d89582e973e 100644 --- a/TAO/tao/Endpoint.h +++ b/TAO/tao/Endpoint.h @@ -29,6 +29,8 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL +class TAO_ORB_Core; + /* * Includes and forward decls for specializing TAO's * endpoint implementation. @@ -79,12 +81,29 @@ public: */ virtual CORBA::Boolean is_equivalent (const TAO_Endpoint *other_endpoint) = 0; - /// Endpoints can be stringed in a list. + /// Endpoints can be linked in a list. /** * @return The next endpoint in the list, if any. */ virtual TAO_Endpoint *next (void) = 0; + /** + * Return the next endpoint in the list, but use protocol-specific + * filtering to constrain the value. The orb core is needed to supply + * any sort of filter arguments, and the root endpoint is needed in case + * the algorithm needs to rewind. If the supplied root is 0, then this + * is assumed to be the candidate next endpoint. + * + * To use this, the caller starts off the change with root == 0. This + * is a bit of a violation in logic, a more correct implementation would + * accept this == 0 and a non-null root. + * To do iteration using next_filtered, do: + * for (TAO_Endpoint *ep = root_endpoint->next_filtered (orb_core, 0); + * ep != 0; + * ep = ep->next_filtered(orb_core, root_endpoint)) { } + */ + virtual TAO_Endpoint *next_filtered (TAO_ORB_Core *, TAO_Endpoint *root); + /// Return a string representation for the address. /** * The purpose of this method is to provide a general interface to diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index 502797c9585..d272769f3fd 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -169,8 +169,9 @@ TAO_IIOP_Connection_Handler::open (void*) if (TAO_debug_level > 2) ACE_DEBUG ((LM_DEBUG, ACE_TEXT("TAO (%P|%t) - IIOP_Connection_Handler::open, ") - ACE_TEXT("The local addr is <%s> \n"), - local_addr. get_host_addr ())); + ACE_TEXT("The local addr is <%s:%d> \n"), + local_addr.get_host_addr (), + local_addr.get_port_number())); if (local_addr == remote_addr) { @@ -478,6 +479,31 @@ TAO_IIOP_Connection_Handler::set_dscp_codepoint (CORBA::Boolean set_network_prio return 0; } + +void +TAO_IIOP_Connection_Handler::abort (void) +{ + struct linger lval; + lval.l_onoff = 1; + lval.l_linger = 0; + + if (this->peer ().set_option(SOL_SOCKET, + SO_LINGER, + (void*) &lval, + sizeof (lval)) == -1) + { + if (TAO_debug_level) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) Unable to set ") + ACE_TEXT ("SO_LINGER on %d\n"), + this->peer ().get_handle ())); + } + } +} + + + //@@ CONNECTION_HANDLER_SPL_COPY_HOOK_END /* * End copy hook diff --git a/TAO/tao/IIOP_Connection_Handler.h b/TAO/tao/IIOP_Connection_Handler.h index 08e2b7478af..16f647affa0 100644 --- a/TAO/tao/IIOP_Connection_Handler.h +++ b/TAO/tao/IIOP_Connection_Handler.h @@ -109,6 +109,11 @@ public: virtual int open_handler (void *); + /// This is used during a canceled connection attempt. Force the + /// SO_LINGER timeout to 0 so that when the peer is closed, it won't + /// hang around. + void abort (void); + protected: /// Constructor that could be used by the derived classes. diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp index 793fd0f7859..8756adac2bb 100644 --- a/TAO/tao/IIOP_Connector.cpp +++ b/TAO/tao/IIOP_Connector.cpp @@ -10,11 +10,14 @@ #include "tao/Connect_Strategy.h" #include "tao/Thread_Lane_Resources.h" #include "tao/Profile_Transport_Resolver.h" +#include "tao/Base_Transport_Property.h" #include "tao/Transport.h" #include "tao/Wait_Strategy.h" #include "tao/SystemException.h" +#include "tao/LF_Multi_Event.h" #include "ace/OS_NS_strings.h" #include "ace/OS_NS_string.h" +#include "ace/OS_NS_time.h" ACE_RCSID (tao, IIOP_Connector, @@ -22,6 +25,51 @@ ACE_RCSID (tao, TAO_BEGIN_VERSIONED_NAMESPACE_DECL + + +//----------------------------------------------------------------------------- + +/** + * @class TAO_Event_Handler_Array_var + * + * @brief Auto pointer like class for an array of Event Handlers. + * + * Used to manage lifecycle of handlers. This class calls + * ACE_Event_Handler::remove_reference() on each handler in its destructor + * This class started out life as a replacement for the ACE_Event_Handle_var + * but is now pared down to be very specific in its role.. + */ +class TAO_IIOP_Connection_Handler_Array_Guard +{ +public: + TAO_IIOP_Connection_Handler_Array_Guard (TAO_IIOP_Connection_Handler **p, unsigned count); + ~TAO_IIOP_Connection_Handler_Array_Guard (void); + +private: + /// Handler. + TAO_IIOP_Connection_Handler **ptr_; + unsigned count_; +}; + +TAO_IIOP_Connection_Handler_Array_Guard::TAO_IIOP_Connection_Handler_Array_Guard (TAO_IIOP_Connection_Handler **p, + unsigned count) + : ptr_ (p), + count_ (count) +{ +} + +TAO_IIOP_Connection_Handler_Array_Guard::~TAO_IIOP_Connection_Handler_Array_Guard (void) +{ + if (this->ptr_ != 0) + { + for (unsigned i = 0; i < this->count_; i++) + this->ptr_[i]->remove_reference (); + } +} + +//--------------------------------------------------------------------------- + + TAO_IIOP_Connector::~TAO_IIOP_Connector (void) { } @@ -81,6 +129,12 @@ TAO_IIOP_Connector::close (void) } int +TAO_IIOP_Connector::supports_parallel_connects(void) const +{ + return 1; +} + +int TAO_IIOP_Connector::set_validate_endpoint (TAO_Endpoint *endpoint) { TAO_IIOP_Endpoint *iiop_endpoint = @@ -105,8 +159,8 @@ TAO_IIOP_Connector::set_validate_endpoint (TAO_Endpoint *endpoint) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) IIOP connection failed.\n") - ACE_TEXT ("TAO (%P|%t) This is most likely ") + ACE_TEXT ("(%P|%t) IIOP connection failed.\n") + ACE_TEXT (" This is most likely ") ACE_TEXT ("due to a hostname lookup ") ACE_TEXT ("failure.\n"))); } @@ -122,12 +176,124 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, TAO_Transport_Descriptor_Interface &desc, ACE_Time_Value *timeout) { + TAO_IIOP_Connection_Handler *svc_handler = 0; TAO_IIOP_Endpoint *iiop_endpoint = - this->remote_endpoint (desc.endpoint ()); - + this->remote_endpoint (desc.endpoint()); + int result = -1; if (iiop_endpoint == 0) return 0; + result = this->begin_connection (svc_handler, r, iiop_endpoint, timeout); + + if (result == -1 && errno != EWOULDBLOCK) + { + // connect completed unsuccessfully + svc_handler->remove_reference(); + // Give users a clue to the problem. + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ") + ACE_TEXT("connection to <%s:%d> failed (%p)\n"), + ACE_TEXT_CHAR_TO_TCHAR (iiop_endpoint->host ()), + iiop_endpoint->port (), + ACE_TEXT("errno"))); + } + return 0; + } + TAO_IIOP_Connection_Handler **sh_ptr = &svc_handler; + TAO_IIOP_Endpoint **ep_ptr = &iiop_endpoint; + TAO_LF_Multi_Event mev; + mev.add_event(svc_handler); + return this->complete_connection (result, sh_ptr, ep_ptr, 1U, r, &mev, timeout); +} + +TAO_Transport * +TAO_IIOP_Connector::make_parallel_connection (TAO::Profile_Transport_Resolver *r, + TAO_Transport_Descriptor_Interface &desc, + ACE_Time_Value *timeout) +{ + TAO_Endpoint *root_ep = desc.endpoint(); + unsigned max_count = 1; + unsigned long ns_stagger = + this->orb_core()->orb_params()->parallel_connect_delay(); + unsigned long sec_stagger = ns_stagger/1000; + ns_stagger = (ns_stagger % 1000) * 1000000; + for (TAO_Endpoint *ep = root_ep->next_filtered (this->orb_core(),0); + ep != 0; + ep = ep->next_filtered(this->orb_core(),root_ep)) + max_count++; + + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::") + ACE_TEXT ("make_parallel_connection, ") + ACE_TEXT ("to %d endpoints\n"), max_count)); + TAO_IIOP_Endpoint **eplist = 0; + TAO_IIOP_Connection_Handler **shlist = 0; + ACE_NEW_RETURN (shlist,TAO_IIOP_Connection_Handler *[max_count], 0); + ACE_NEW_RETURN (eplist, TAO_IIOP_Endpoint *[max_count], 0); + + TAO_LF_Multi_Event mev; + int result = 0; + unsigned count = 0; + for (TAO_Endpoint *ep = root_ep->next_filtered (this->orb_core(),0); + ep != 0; + ep = ep->next_filtered(this->orb_core(),root_ep)) + { + eplist[count] = this->remote_endpoint (ep); + shlist[count] = 0; + result = this->begin_connection (shlist[count], + r, + eplist[count], + timeout); + + // The connection may fail because it is slow, or for other reasons. + // If it was an incomplete non-blocking connection, add it to the list + // to be waited on, otherwise remove the reference to the handler and + // move on to the next endpoint. + if (result == -1) + { + if (errno == EWOULDBLOCK) + { + mev.add_event(shlist[count++]); + if (ep->next() != 0) + { + struct timespec nsleep = {sec_stagger, ns_stagger}; + ACE_OS::nanosleep (&nsleep); + result = this->active_connect_strategy_->poll (&mev); + if (result != -1) + break; + } + } + else + { + shlist[count]->remove_reference(); // done bump the list count + } + continue; + } + + if (result != -1) // we have a winner! + { + count++; + break; // no waiting involved since a connection is completed + } + } + + TAO_Transport *winner = 0; + if (count > 0) // only complete if at least one pending or success + winner = this->complete_connection (result,shlist,eplist,count,r,&mev,timeout); + delete [] shlist; // reference reductions should have been done already + delete [] eplist; + return winner; +} + +int +TAO_IIOP_Connector::begin_connection (TAO_IIOP_Connection_Handler *&svc_handler, + TAO::Profile_Transport_Resolver *r, + TAO_IIOP_Endpoint *iiop_endpoint, + ACE_Time_Value *timeout) +{ const ACE_INET_Addr &remote_address = iiop_endpoint->object_addr (); @@ -146,8 +312,8 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, if (TAO_debug_level > 2) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - IIOP_Connector::make_connection, " - "to <%s:%d> which should %s\n", + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::begin_connection, ") + ACE_TEXT ("to <%s:%d> which should %s\n"), ACE_TEXT_CHAR_TO_TCHAR(iiop_endpoint->host()), iiop_endpoint->port(), r->blocked_connect () ? ACE_TEXT("block") : ACE_TEXT("nonblock"))); @@ -167,7 +333,7 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, timeout = &tmp_zero; } - TAO_IIOP_Connection_Handler *svc_handler = 0; + svc_handler = 0; int result = this->base_connector_.connect (svc_handler, @@ -193,62 +359,124 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, // once the connect() returns since this might be too late if // another thread pick up the completion and potentially deletes the // handler before we get a chance to increment the reference count. + return result; +} - // Make sure that we always do a remove_reference - ACE_Event_Handler_var svc_handler_auto_ptr (svc_handler); - - TAO_Transport *transport = svc_handler->transport (); - - if (result == -1) +TAO_Transport * +TAO_IIOP_Connector::complete_connection (int result, + TAO_IIOP_Connection_Handler **&sh_list, + TAO_IIOP_Endpoint **ep_list, + unsigned count, + TAO::Profile_Transport_Resolver *r, + TAO_LF_Multi_Event *mev, + ACE_Time_Value *timeout) +{ + // Make sure that we always do a remove_reference for every member + // of the list + TAO_IIOP_Connection_Handler_Array_Guard svc_handler_auto_ptr (sh_list,count); + TAO_Transport *transport = 0; + TAO_Transport **tlist = 0; + ACE_NEW_RETURN (tlist,TAO_Transport*[count],0); + + // populate the transport list + for (unsigned i = 0; i < count; i++) + tlist[i] = sh_list[i]->transport(); + + if (result != -1) + { + // We received a compeleted connection and 0 or more pending. + // the winner is the last member of the list, because the + // iterator stopped on a successful connect. + transport = tlist[count-1]; + } + else { - // No immediate result, wait for completion - if (errno == EWOULDBLOCK) + if (count == 1) { - // Try to wait until connection completion. Incase we block, then we - // get a connected transport or not. In case of non block we get - // a connected or not connected transport + transport = tlist[0]; if (!this->wait_for_connection_completion (r, transport, timeout)) { if (TAO_debug_level > 2) - ACE_ERROR ((LM_ERROR, "TAO (%P|%t) - IIOP_Connector::" - "make_connection, " - "wait for completion failed\n")); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::") + ACE_TEXT ("complete_connection, wait for completion ") + ACE_TEXT ("failed for 1 pending connect\n"))); } } else { - // Transport is not usable - transport = 0; + if (!this->wait_for_connection_completion (r, + transport, + tlist, + count, + mev, + timeout)) + { + if (TAO_debug_level > 2) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::") + ACE_TEXT ("complete_connection, wait for completion ") + ACE_TEXT ("failed for %d pending connects\n"), + count)); + } + } + } + // At this point, the connection has be successfully created + // connected or not connected, but we have a connection. + TAO_IIOP_Connection_Handler *svc_handler = 0; + TAO_IIOP_Endpoint *iiop_endpoint = 0; + + if (transport != 0) + { + for (unsigned i = 0; i < count; i++) + { + if (transport == tlist[i]) + { + svc_handler = sh_list[i]; + iiop_endpoint = ep_list[i]; + break; + } } } + + // Done with the transport list + delete [] tlist; + // In case of errors transport is zero if (transport == 0) { // Give users a clue to the problem. if (TAO_debug_level > 3) + { + for (unsigned i = 0; i < count; i++) ACE_DEBUG ((LM_ERROR, - "TAO (%P|%t) - IIOP_Connector::make_connection, " - "connection to <%s:%d> failed (%p)\n", - iiop_endpoint->host (), iiop_endpoint->port (), + ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ") + ACE_TEXT("connection to <%s:%d> failed (%p)\n"), + ACE_TEXT_CHAR_TO_TCHAR (ep_list[i]->host ()), + ep_list[i]->port (), ACE_TEXT("errno"))); + } return 0; } - // At this point, the connection has be successfully created - // connected or not connected, but we have a connection. + if (TAO_debug_level > 2) + { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - IIOP_Connector::make_connection, " - "new %s connection to <%s:%d> on Transport[%d]\n", - transport->is_connected() ? "connected" : "not connected", - iiop_endpoint->host (), + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::make_connection, ") + ACE_TEXT ("new %s connection to <%s:%d> on Transport[%d]\n"), + transport->is_connected() ? + ACE_TEXT("connected") : ACE_TEXT("not connected"), + ACE_TEXT_CHAR_TO_TCHAR (iiop_endpoint->host ()), iiop_endpoint->port (), svc_handler->peer ().get_handle ())); + } + TAO_Base_Transport_Property desc(iiop_endpoint,0); // Add the handler to Cache int retval = this->orb_core ()->lane_resources ().transport_cache ().cache_transport ( @@ -264,8 +492,8 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - IIOP_Connector::make_connection, " - "could not add the new connection to cache\n")); + ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ") + ACE_TEXT ("could not add new connection to cache\n"))); } return 0; @@ -285,9 +513,9 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - IIOP_Connector [%d]::make_connection, " - "could not register the transport " - "in the reactor.\n", + ACE_TEXT ("(%P|%t) IIOP_Connector [%d]::make_connection, ") + ACE_TEXT ("could not register the transport ") + ACE_TEXT ("in the reactor.\n"), transport->id ())); return 0; @@ -395,10 +623,15 @@ TAO_IIOP_Connector::cancel_svc_handler ( // Cancel from the connector if (handler) + { + handler->abort(); return this->base_connector_.cancel (handler); + } return -1; } + + //@@ TAO_CONNECTOR_SPL_COPY_HOOK_END TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/IIOP_Connector.h b/TAO/tao/IIOP_Connector.h index f465814dbc6..c1026010a09 100644 --- a/TAO/tao/IIOP_Connector.h +++ b/TAO/tao/IIOP_Connector.h @@ -90,12 +90,23 @@ public: //@@ TAO_CONNECTOR_SPL_PUBLIC_METHODS_COPY_HOOK_END protected: + /// A flag indicating the actual connector supports parallel + /// connection attempts. The base implementation always returns + /// 0. Override to return non-zero if parallel connection attempts + /// may be tried. + virtual int supports_parallel_connects (void) const; // = The TAO_Connector methods, please check the documentation on // Transport_Connector.h int set_validate_endpoint (TAO_Endpoint *ep); - TAO_Transport *make_connection (TAO::Profile_Transport_Resolver *r, + virtual TAO_Transport *make_connection ( + TAO::Profile_Transport_Resolver *r, + TAO_Transport_Descriptor_Interface &desc, + ACE_Time_Value *timeout = 0); + + virtual TAO_Transport *make_parallel_connection ( + TAO::Profile_Transport_Resolver *r, TAO_Transport_Descriptor_Interface &desc, ACE_Time_Value *timeout = 0); @@ -118,6 +129,26 @@ protected: const bool lite_flag_; private: + /// This is the first half of making a connection. Both make_connection + /// and make_parallel_connection will start out using begin_connection. + int begin_connection (TAO_IIOP_Connection_Handler *&svc_handler, + TAO::Profile_Transport_Resolver *r, + TAO_IIOP_Endpoint *endpoint, + ACE_Time_Value *timeout = 0); + + /// This is the second half of making a connection when several endpoints + /// are involved. This works with modified wait strategies to wait for one + /// of many transports, and when once completes it will cancel the rest. + /// The winning transport is returned. + TAO_Transport *complete_connection (int result, + TAO_IIOP_Connection_Handler **&sh_list, + TAO_IIOP_Endpoint **ep_list, + unsigned count, + TAO::Profile_Transport_Resolver *r, + TAO_LF_Multi_Event *mev, + ACE_Time_Value *timeout = 0); + + /// Return the remote endpoint, a helper function TAO_IIOP_Endpoint *remote_endpoint (TAO_Endpoint *ep); diff --git a/TAO/tao/IIOP_Endpoint.cpp b/TAO/tao/IIOP_Endpoint.cpp index a595be39d13..ac13a05101a 100644 --- a/TAO/tao/IIOP_Endpoint.cpp +++ b/TAO/tao/IIOP_Endpoint.cpp @@ -244,6 +244,70 @@ TAO_IIOP_Endpoint::next (void) } TAO_Endpoint * +TAO_IIOP_Endpoint::next_filtered (TAO_ORB_Core * orb_core, TAO_Endpoint *root) +{ + bool want_ipv6 = false; + bool ipv6_only = false; + bool prefer_ipv6 = false; +#if defined (ACE_HAS_IPV6) + want_ipv6 = 1; + ipv6_only = orb_core->orb_params()->connect_ipv6_only(); + prefer_ipv6 = orb_core->orb_params()->prefer_ipv6_interfaces(); +#else + ACE_UNUSED_ARG (orb_core); +#endif /* ACE_HAS_IPV6 */ + return + this->next_filtered_i (static_cast<TAO_IIOP_Endpoint *>(root), + ipv6_only, + prefer_ipv6, + want_ipv6); +} + +TAO_IIOP_Endpoint* +TAO_IIOP_Endpoint::next_filtered_i (TAO_IIOP_Endpoint *root, + bool ipv6_only, + bool prefer_ipv6, + bool want_ipv6) +{ + TAO_IIOP_Endpoint *candidate = (root == 0) ? this : next_; + +#if defined (ACE_HAS_IPV6) + if (ipv6_only) + { + if (candidate == 0 || candidate->is_ipv6_decimal()) + return candidate; + const ACE_INET_Addr &addr = candidate->object_addr (); + bool allowed = addr.get_type () == AF_INET6 && + !addr.is_ipv4_mapped_ipv6(); + + return allowed ? candidate : + candidate->next_filtered_i(root, ipv6_only, prefer_ipv6, true); + } + if (prefer_ipv6) + { + if (candidate == 0) + return !want_ipv6 ? candidate : + candidate->next_filtered_i(root, ipv6_only, prefer_ipv6, false); + + if (want_ipv6 == candidate->is_ipv6_decimal()) + return candidate; + + const ACE_INET_Addr &addr = candidate->object_addr (); + bool really_ipv6 = addr.get_type () == AF_INET6 && + !addr.is_ipv4_mapped_ipv6(); + return (want_ipv6 == really_ipv6) ? candidate : + candidate->next_filtered_i(root, ipv6_only, prefer_ipv6, want_ipv6); + } +#else + ACE_UNUSED_ARG (want_ipv6); + ACE_UNUSED_ARG (ipv6_only); + ACE_UNUSED_ARG (prefer_ipv6); +#endif + + return candidate; +} + +TAO_Endpoint * TAO_IIOP_Endpoint::duplicate (void) { TAO_IIOP_Endpoint *endpoint = 0; diff --git a/TAO/tao/IIOP_Endpoint.h b/TAO/tao/IIOP_Endpoint.h index 197c4f03a0e..fff92575b94 100644 --- a/TAO/tao/IIOP_Endpoint.h +++ b/TAO/tao/IIOP_Endpoint.h @@ -85,6 +85,23 @@ public: virtual TAO_Endpoint *next (void); + /** + * Return the next endpoint in the list, but use protocol-specific + * filtering to constrain the value. The orb core is needed to supply + * any sort of filter arguments, and the root endpoint is needed in case + * the algorithm needs to rewind. If the supplied root is 0, then this + * is assumed to be the candidate next endpoint. + * + * To use this, the caller starts off the change with root == 0. This + * is a bit of a violation in logic, a more correct implementation would + * accept this == 0 and a non-null root. + * To do iteration using next_filtered, do: + * for (TAO_Endpoint *ep = root_endpoint->next_filtered (orb_core, 0); + * ep != 0; + * ep = ep->next_filtered(orb_core, root_endpoint)) { } + */ + virtual TAO_Endpoint *next_filtered (TAO_ORB_Core *, TAO_Endpoint *root); + virtual int addr_to_string (char *buffer, size_t length); /// Makes a copy of @c this @@ -141,6 +158,11 @@ public: TAO_IIOP_Endpoint & operator= (const TAO_IIOP_Endpoint& other); private: + TAO_IIOP_Endpoint *next_filtered_i (TAO_IIOP_Endpoint *root, + bool ipv6_only, + bool prefer_ipv6, + bool want_ipv6); + //@@ TAO_ENDPOINT_SPL_PRIVATE_DATA_COPY_HOOK_START diff --git a/TAO/tao/IIOP_Profile.cpp b/TAO/tao/IIOP_Profile.cpp index a9b421046b4..4866b52a071 100644 --- a/TAO/tao/IIOP_Profile.cpp +++ b/TAO/tao/IIOP_Profile.cpp @@ -202,13 +202,16 @@ TAO_IIOP_Profile::parse_string_i (const char *ior { // A port number or port name was specified. CORBA::ULong length_port = okd - cp_pos - 1; - CORBA::String_var tmp = CORBA::string_alloc (length_port); ACE_OS::strncpy (tmp.inout (), cp_pos + 1, length_port); tmp[length_port] = '\0'; - - if (ACE_OS::strspn (tmp.in (), "1234567890") == length_port) + if (length_port == 0) + { + this->endpoint_.port_ = 2809; // default IIOP port for + // parsing corbaloc strings + } + else if (ACE_OS::strspn (tmp.in (), "1234567890") == length_port) { this->endpoint_.port_ = static_cast<CORBA::UShort> (ACE_OS::atoi (tmp.in ())); @@ -426,6 +429,18 @@ TAO_IIOP_Profile::remove_generic_endpoint (TAO_Endpoint *ep) this->remove_endpoint(dynamic_cast<TAO_IIOP_Endpoint *>(ep)); } +void +TAO_IIOP_Profile::add_generic_endpoint (TAO_Endpoint *endp) +{ + TAO_IIOP_Endpoint *iep = dynamic_cast<TAO_IIOP_Endpoint *>(endp); + if (iep != 0) + { + TAO_IIOP_Endpoint *clone; + ACE_NEW (clone, TAO_IIOP_Endpoint(*iep)); + this->add_endpoint(clone); + } +} + char * TAO_IIOP_Profile::to_string (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) { diff --git a/TAO/tao/IIOP_Profile.h b/TAO/tao/IIOP_Profile.h index 3ca09cb2dd7..03cf95694bd 100644 --- a/TAO/tao/IIOP_Profile.h +++ b/TAO/tao/IIOP_Profile.h @@ -84,7 +84,10 @@ public: */ void remove_endpoint (TAO_IIOP_Endpoint *endp); - void remove_generic_endpoint (TAO_Endpoint *ep); + virtual void remove_generic_endpoint (TAO_Endpoint *ep); + + /// Add an endpoint when the specific endpoint type is unknown + virtual void add_generic_endpoint (TAO_Endpoint *ep); //@@ TAO_PROFILE_SPL_PUBLIC_METHODS_COPY_HOOK_END diff --git a/TAO/tao/Invocation_Endpoint_Selectors.cpp b/TAO/tao/Invocation_Endpoint_Selectors.cpp index bdb493bd648..5221ee8d9c5 100644 --- a/TAO/tao/Invocation_Endpoint_Selectors.cpp +++ b/TAO/tao/Invocation_Endpoint_Selectors.cpp @@ -14,7 +14,7 @@ ACE_RCSID (tao, "$Id$") -TAO_BEGIN_VERSIONED_NAMESPACE_DECL + TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Invocation_Endpoint_Selector::~TAO_Invocation_Endpoint_Selector (void) { @@ -27,98 +27,10 @@ TAO_Default_Endpoint_Selector::~TAO_Default_Endpoint_Selector (void) } void -TAO_Default_Endpoint_Selector::select_endpoint ( - TAO::Profile_Transport_Resolver *r, - ACE_Time_Value *max_wait_time - ACE_ENV_ARG_DECL) +TAO_Default_Endpoint_Selector::select_endpoint (TAO::Profile_Transport_Resolver *r, + ACE_Time_Value *max_wait_time + ACE_ENV_ARG_DECL) { -#if defined (ACE_HAS_IPV6) - // See if we have IIOP selector preference parameters set. - // If so we need to walk a different path in selecting an endpoint. - if (r->stub ()->orb_core ()->orb_params ()->prefer_ipv6_interfaces () || - r->stub ()->orb_core ()->orb_params ()->connect_ipv6_only ()) - { - bool test_ipv6 = true; // first round try IPv6 - - for (unsigned test_count = 2; test_count > 0 ;--test_count) - { - do - { - r->profile (r->stub ()->profile_in_use ()); - - // Check whether we need to do a blocked wait or we have a - // non-blocked wait and we support that. If this is not the - // case we can't use this profile so try the next. - if (r->blocked_connect () || - (!r->blocked_connect () && - r->profile ()->supports_non_blocking_oneways ())) - { - const size_t endpoint_count = - r->profile ()->endpoint_count (); - - TAO_Endpoint *ep = - r->profile ()->endpoint (); - - for (size_t i = 0; i < endpoint_count; ++i) - { - // in case we're running the first round: - // try this endpoint if it either *not* an IIOP endpoint OR - // if it is an IIOP IPv6 endpoint. - // else - // only try when it's an IPv4 IIOP endpoint - bool try_ep = test_ipv6; - - if (r->profile ()->tag () == IOP::TAG_INTERNET_IOP) - { - TAO_IIOP_Endpoint *iep = - dynamic_cast<TAO_IIOP_Endpoint *> (ep); - if (!iep->is_ipv6_decimal ()) - { - const ACE_INET_Addr &addr = iep->object_addr (); - - if (test_ipv6) - try_ep = - addr.get_type () == AF_INET6 && - !addr.is_ipv4_mapped_ipv6(); - else - try_ep = - addr.get_type () == AF_INET || - (addr.get_type () == AF_INET6 && - addr.is_ipv4_mapped_ipv6()); - } - } - - if (try_ep) - { - TAO_Base_Transport_Property desc (ep); - const bool retval = - r->try_connect (&desc, - max_wait_time - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - - // Check if the connect has completed. - if (retval) - return; - } - - // Go to the next endpoint in this profile. - ep = ep->next (); - } - } - } - while (r->stub ()->next_profile_retry () != 0); - - // If we get here we did not find any suitable non-IPv4 endpoint so - // now try those if allowed - if (!r->stub ()->orb_core ()->orb_params ()->connect_ipv6_only ()) - test_ipv6 = false; - else - break; // Do not test remaining (if any) IPv4 endpoints - } - } - else -#endif /* ACE_HAS_IPV6 */ do { r->profile (r->stub ()->profile_in_use ()); @@ -127,18 +39,41 @@ TAO_Default_Endpoint_Selector::select_endpoint ( // non-blocked wait and we support that. If this is not the // case we can't use this profile so try the next. if (r->blocked_connect () || - (!r->blocked_connect () && r->profile ()->supports_non_blocking_oneways ())) + (!r->blocked_connect () && + r->profile ()->supports_non_blocking_oneways ())) { - size_t const endpoint_count = - r->profile ()->endpoint_count (); + if (r->profile ()->endpoint_count () > 1 && + r->use_parallel_connect()) + { + + TAO_Endpoint *ep = + r->profile ()->endpoint (); - TAO_Endpoint * ep = - r->profile ()->endpoint (); + TAO_Base_Transport_Property desc (ep); + bool success = + r->try_parallel_connect (&desc, + max_wait_time + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; - for (size_t i = 0; i < endpoint_count; ++i) + // Check if the connect has completed. + if (success) + return; + // The default implementation of try_parallel_connect returns + // a not supported errno. In this case, allow the ordinary + // connection mechanism to be tried. OTOH, if the connection + // attempt failed for some other reason, then abandon this + // profile and try the next one in the list. + else if (errno != ENOTSUP) + continue; + } + + for (TAO_Endpoint *ep = r->profile ()->first_filtered_endpoint (); + ep != 0; + ep = r->profile ()->next_filtered_endpoint (ep)) { TAO_Base_Transport_Property desc (ep); - bool const retval = + bool retval = r->try_connect (&desc, max_wait_time ACE_ENV_ARG_PARAMETER); @@ -147,14 +82,13 @@ TAO_Default_Endpoint_Selector::select_endpoint ( // Check if the connect has completed. if (retval) return; - - // Go to the next endpoint in this profile. - ep = ep->next (); } + } } while (r->stub ()->next_profile_retry () != 0); + // If we get here, we completely failed to find an endpoint selector // that we know how to use, so throw an exception. ACE_THROW (CORBA::TRANSIENT (CORBA::OMGVMCID | 2, diff --git a/TAO/tao/Invocation_Endpoint_Selectors.h b/TAO/tao/Invocation_Endpoint_Selectors.h index 29b5580d88a..bd2c5101889 100644 --- a/TAO/tao/Invocation_Endpoint_Selectors.h +++ b/TAO/tao/Invocation_Endpoint_Selectors.h @@ -99,6 +99,7 @@ public: virtual void select_endpoint (TAO::Profile_Transport_Resolver *r, ACE_Time_Value *val ACE_ENV_ARG_DECL); + }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/LF_CH_Event.h b/TAO/tao/LF_CH_Event.h index 2a92f9007ab..f9adcc5fb1b 100644 --- a/TAO/tao/LF_CH_Event.h +++ b/TAO/tao/LF_CH_Event.h @@ -23,6 +23,8 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL +class TAO_LF_Multi_Event; + /** * @class TAO_LF_CH_Event * @@ -35,6 +37,15 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL class TAO_Export TAO_LF_CH_Event: public TAO_LF_Event { public: + /** + * The TAO_LF_Multi_Event class is another specialization of + * TAO_LF_Event, used for aggregating many connection handlers into + * a single event object.. It requires friendship so that it can + * check the is_state_final() flag on each of its contained + * connection handlers. + */ + friend class TAO_LF_Multi_Event; + /// Constructor TAO_LF_CH_Event (void); diff --git a/TAO/tao/LF_Connect_Strategy.cpp b/TAO/tao/LF_Connect_Strategy.cpp index d1d71b550e3..bc401286334 100644 --- a/TAO/tao/LF_Connect_Strategy.cpp +++ b/TAO/tao/LF_Connect_Strategy.cpp @@ -1,4 +1,5 @@ #include "tao/LF_Connect_Strategy.h" +#include "tao/LF_Multi_Event.h" #include "tao/Connection_Handler.h" #include "tao/LF_Follower.h" #include "tao/Leader_Follower.h" @@ -41,41 +42,26 @@ TAO_LF_Connect_Strategy::synch_options (ACE_Time_Value *timeout, } int -TAO_LF_Connect_Strategy::wait (TAO_Connection_Handler *ch, +TAO_LF_Connect_Strategy::wait_i (TAO_LF_Event *ev, + TAO_Transport *transport, ACE_Time_Value *max_wait_time) { - ACE_ASSERT (ch != 0); - - return this->wait (ch->transport (), - max_wait_time); -} - -int -TAO_LF_Connect_Strategy::wait (TAO_Transport *transport, - ACE_Time_Value *max_wait_time) -{ - // Basically the connection was EINPROGRESS, but before we could - // wait for it some other thread detected a failure and cleaned up - // the connection handler. if (transport == 0) return -1; - TAO_Connection_Handler *ch = - transport->connection_handler (); - TAO_Leader_Follower &leader_follower = this->orb_core_->leader_follower (); int result = - leader_follower.wait_for_event (ch, + leader_follower.wait_for_event (ev, transport, max_wait_time); - // Set the result. - if (ch->error_detected () && result != -1) + if (ev->error_detected () && result != -1) result = -1; return result; } + TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/LF_Connect_Strategy.h b/TAO/tao/LF_Connect_Strategy.h index 4cff8ed7976..35e6e514046 100644 --- a/TAO/tao/LF_Connect_Strategy.h +++ b/TAO/tao/LF_Connect_Strategy.h @@ -28,7 +28,7 @@ ACE_END_VERSIONED_NAMESPACE_DECL TAO_BEGIN_VERSIONED_NAMESPACE_DECL -class TAO_Connector; +class TAO_LF_Event; /** * @class TAO_LF_Connect_Strategy @@ -53,13 +53,11 @@ public: virtual void synch_options (ACE_Time_Value *val, ACE_Synch_Options &opt); - virtual int wait (TAO_Connection_Handler *ch, +protected: + virtual int wait_i (TAO_LF_Event *ev, + TAO_Transport *t, ACE_Time_Value *val); - virtual int wait (TAO_Transport *ch, - ACE_Time_Value *val); - - }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/LF_Event.h b/TAO/tao/LF_Event.h index 678be63cb3c..473fc8a4868 100644 --- a/TAO/tao/LF_Event.h +++ b/TAO/tao/LF_Event.h @@ -65,12 +65,15 @@ public: * method is used to bind the waiting thread to the event, in order * to let the event signal any important state changes. * + * This is virtual to allow the LF_Multi_Event derived type share + * the follower with all the subordinate LF_CH_Events. + * * @return -1 if the LF_Event is already bound, 0 otherwise */ - int bind (TAO_LF_Follower *follower); + virtual int bind (TAO_LF_Follower *follower); /// Unbind the follower - int unbind (void); + virtual int unbind (void); //@{ /** @name State management diff --git a/TAO/tao/LF_Multi_Event.cpp b/TAO/tao/LF_Multi_Event.cpp new file mode 100644 index 00000000000..c0252cf1bab --- /dev/null +++ b/TAO/tao/LF_Multi_Event.cpp @@ -0,0 +1,118 @@ +#include "tao/LF_Multi_Event.h" +#include "tao/Connection_Handler.h" +#include "ace/OS_Memory.h" + +ACE_RCSID(tao, + LF_Multi_Event, + "$Id$") + +TAO_LF_Multi_Event::TAO_LF_Multi_Event (void) + : TAO_LF_Event (), + events_ (0), + winner_ (0) +{ +} + +TAO_LF_Multi_Event::~TAO_LF_Multi_Event (void) +{ + while (this->events_ != 0) + { + Event_Node *n = this->events_->next_; + delete this->events_; + this->events_ = n; + } +} + +int +TAO_LF_Multi_Event::bind (TAO_LF_Follower *follower) +{ + if (this->TAO_LF_Event::bind(follower) == -1) + { + return -1; + } + + for (Event_Node *n = this->events_; n != 0; n = n->next_) + if (n->ptr_->bind(follower) == -1) + { + return -1; + } + return 0; +} + +int +TAO_LF_Multi_Event::unbind (void) +{ + if (this->TAO_LF_Event::unbind() == -1) + { + return -1; + } + + for (Event_Node *n = this->events_; n != 0; n = n->next_) + if (n->ptr_->unbind() == -1) + { + return -1; + } + return 0; +} + + +void +TAO_LF_Multi_Event::add_event (TAO_Connection_Handler *ev) +{ + Event_Node *node = 0; + ACE_NEW (node, Event_Node); + node->next_ = this->events_; + node->ptr_ = ev; + + this->events_ = node; +} + +TAO_Connection_Handler* +TAO_LF_Multi_Event::winner (void) +{ + return this->winner_; +} + +TAO_Transport * +TAO_LF_Multi_Event::base_transport (void) +{ + return (this->events_ == 0) ? 0 : this->events_->ptr_->transport(); +} + +int +TAO_LF_Multi_Event::successful (void) const +{ + for (Event_Node *n = this->events_; n != 0; n = n->next_) + if (n->ptr_->successful() == 1) + { + this->winner_ = n->ptr_; + return 1; + } + return 0; +} + +int +TAO_LF_Multi_Event::error_detected (void) const +{ + int result = 1; + for (Event_Node *n = this->events_; n != 0; n = n->next_) + if (n->ptr_->error_detected () == 0) + result = 0; + return result; +} + +void +TAO_LF_Multi_Event::state_changed_i (int ) +{ + // no-op +} + +int +TAO_LF_Multi_Event::is_state_final (void) +{ + int result = 1; + for (Event_Node *n = this->events_; n != 0; n = n->next_) + if (!n->ptr_->is_state_final () == 0) + result = 0; + return result; +} diff --git a/TAO/tao/LF_Multi_Event.h b/TAO/tao/LF_Multi_Event.h new file mode 100644 index 00000000000..11727f8ab16 --- /dev/null +++ b/TAO/tao/LF_Multi_Event.h @@ -0,0 +1,97 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file LF_Multi_Event.h + * + * $Id$ + * + * @author Phil Mesnier <mesnier_p@ociweb.com> + */ +//============================================================================= + +#ifndef TAO_LF_MULTI_EVENT_H +#define TAO_LF_MULTI_EVENT_H + +#include /**/ "ace/pre.h" + +#include "tao/LF_Event.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +// Forward decls +class TAO_Connection_Handler; +class TAO_Transport; + +/** + * @class TAO_LF_Multi_Event + * + * @brief Use the Leader/Follower loop to wait for one specific event + * in the invocation path. + * + * Used by the parallel connection strategy for waiting on one of many + * connections. + */ +class TAO_Export TAO_LF_Multi_Event: public TAO_LF_Event +{ +public: + /// Constructor + TAO_LF_Multi_Event (void); + + /// Destructor + virtual ~TAO_LF_Multi_Event (void); + + /// propogate the follower to all the events in the collection. + virtual int bind (TAO_LF_Follower *follower); + + /// Unbind the follower from all the collected events. + virtual int unbind (void); + + /// Adds a handler to the collection + void add_event (TAO_Connection_Handler *ch); + + /// Returns the connection handler that caused the successful status + /// to be returned. + TAO_Connection_Handler *winner(void); + + /// Returns the transport associated with the first entry in the collection. + TAO_Transport *base_transport(void); + + //@{ + /// Return 1 if the condition was satisfied successfully, 0 if it + /// has not - This iterates over the list of attached events and + /// returns 1 if any of them return 1 from successful. + int successful (void) const; + + /// Return 1 if an error was detected while waiting for the + /// event - This iterates over the list of events and returns + /// 1 only if all of them return 1 from error_detected. + int error_detected (void) const; + + //@} +protected: + + /// Validate the state change + virtual void state_changed_i (int new_state); + + /// Check whether we have reached the final state.. + virtual int is_state_final (void); + +private: + + struct Event_Node { + TAO_Connection_Handler * ptr_; + Event_Node *next_; + }; + + struct Event_Node *events_; + + mutable TAO_Connection_Handler * winner_; + +}; + +#include /**/ "ace/post.h" + +#endif /* TAO_LF_Multi_EVENT_H */ diff --git a/TAO/tao/MProfile.cpp b/TAO/tao/MProfile.cpp index 59e25c0bfb7..94fb9e4dbf0 100644 --- a/TAO/tao/MProfile.cpp +++ b/TAO/tao/MProfile.cpp @@ -364,4 +364,19 @@ TAO_MProfile::policy_list (ACE_ENV_SINGLE_ARG_DECL) return ret_val; } +int +TAO_MProfile::give_shared_profile (TAO_Profile *pfile) +{ + for (unsigned i = 0; i < this->last_; i++) + if (pfile->tag() == this->pfiles_[i]->tag() && + pfile->compare_key(this->pfiles_[i])) + { + this->pfiles_[i]->add_generic_endpoint(pfile->endpoint()); + pfile->_decr_refcnt(); + return i; + } + return this->give_profile(pfile,0); +} + + TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/MProfile.h b/TAO/tao/MProfile.h index ba45b3a4a5d..7fef351e37c 100644 --- a/TAO/tao/MProfile.h +++ b/TAO/tao/MProfile.h @@ -140,7 +140,7 @@ public: /// Return the index of this entry or -1 if it can not be added. /// this object assumes ownership of this profile!! - int give_profile (TAO_Profile *pfile); + int give_profile (TAO_Profile *pfile, int share = 0); /// append the profiles in pfiles to this object. The count /// will be incremented on the individual profile objects. @@ -217,6 +217,12 @@ private: /// Helper method to implement the destructor void cleanup (void); + /// A helper to give_profile to be used when share is true. This + /// method is used primarily to help the corbaloc parser create a + /// single profile with multiple endpoints rather than constructing + /// multiple profiles with 1 endpoint per. + int give_shared_profile (TAO_Profile *pfile); + private: /** diff --git a/TAO/tao/MProfile.i b/TAO/tao/MProfile.i index b3ebb70abf5..2cf4d05d274 100644 --- a/TAO/tao/MProfile.i +++ b/TAO/tao/MProfile.i @@ -142,8 +142,10 @@ TAO_MProfile::rewind (void) current_ = 0; } ACE_INLINE int -TAO_MProfile::give_profile (TAO_Profile *pfile) +TAO_MProfile::give_profile (TAO_Profile *pfile, int share) { + if (share) + return this->give_shared_profile(pfile); // skip by the used slots if (last_ == size_) // full! return -1; diff --git a/TAO/tao/ORB_Core.cpp b/TAO/tao/ORB_Core.cpp index cd6c50da1c0..7ae1df67ec6 100644 --- a/TAO/tao/ORB_Core.cpp +++ b/TAO/tao/ORB_Core.cpp @@ -359,6 +359,8 @@ TAO_ORB_Core::init (int &argc, char *argv[] ACE_ENV_ARG_DECL) int linger = -1; + int use_parallel_connects = 0; + // Copy command line parameter not to use original. ACE_Argv_Type_Converter command_line (argc, argv); @@ -915,6 +917,20 @@ TAO_ORB_Core::init (int &argc, char *argv[] ACE_ENV_ARG_DECL) { negotiate_codesets = (ACE_OS::atoi (current_arg)); + + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter + (ACE_LIB_TEXT("-ORBUseParallelConnects")))) + { + use_parallel_connects = ACE_OS::atoi (current_arg); + arg_shifter.consume_arg (); + } + else if ((current_arg = arg_shifter.get_the_parameter + (ACE_LIB_TEXT("-ORBParallelConnectDelay")))) + { + this->orb_params ()->parallel_connect_delay + (ACE_OS::atoi (current_arg)); arg_shifter.consume_arg (); } else if (0 != (current_arg = arg_shifter.get_the_parameter @@ -1155,6 +1171,10 @@ TAO_ORB_Core::init (int &argc, char *argv[] ACE_ENV_ARG_DECL) this->orb_params ()->cache_incoming_by_dotted_decimal_address (no_server_side_name_lookups || dotted_decimal_addresses); + + this->orb_params ()->use_parallel_connects + (use_parallel_connects != 0); + this->orb_params ()->linger (linger); this->orb_params ()->nodelay (nodelay); this->orb_params ()->sock_keepalive (so_keepalive); diff --git a/TAO/tao/Profile.cpp b/TAO/tao/Profile.cpp index 1c4d2976c69..6807655a876 100644 --- a/TAO/tao/Profile.cpp +++ b/TAO/tao/Profile.cpp @@ -10,6 +10,7 @@ #include "tao/CDR.h" #include "tao/SystemException.h" #include "tao/PolicyC.h" +#include "tao/Endpoint.h" #include "ace/ACE.h" #include "ace/OS_NS_string.h" @@ -254,8 +255,8 @@ TAO_Profile::decode (TAO_InputCDR& cdr) encap_len)); } - // Decode any additional endpoints per profile. (At the present, - // only RTCORBA takes advantage of this feature.) + // Decode any additional endpoints per profile. This is used by RTCORBA + // and by IIOP when TAG_ALTERNATE_IIOP_ADDRESS components are present. if (this->decode_endpoints () == -1) { return -1; @@ -728,6 +729,36 @@ TAO_Profile::is_equivalent (const TAO_Profile *other) return result; } +CORBA::Boolean +TAO_Profile::compare_key (const TAO_Profile *other) const +{ + return (this->ref_object_key_ == other->ref_object_key_) || + ((this->ref_object_key_ != 0 && + other->ref_object_key_ != 0 && + this->ref_object_key_->object_key() == + other->ref_object_key_->object_key())); +} + +TAO_Endpoint * +TAO_Profile::first_filtered_endpoint (void) +{ + return this->endpoint()->next_filtered(this->orb_core_,0); +} + +TAO_Endpoint * +TAO_Profile::next_filtered_endpoint (TAO_Endpoint *source) +{ + if (source == 0) + return this->first_filtered_endpoint(); + return this->endpoint()->next_filtered(this->orb_core_,this->endpoint()); +} + +void +TAO_Profile::add_generic_endpoint (TAO_Endpoint *) +{ + // noop for the base type +} + TAO_Service_Callbacks::Profile_Equivalence TAO_Profile::is_equivalent_hook (const TAO_Profile *other) { @@ -904,6 +935,8 @@ TAO_Unknown_Profile::create_profile_body (TAO_OutputCDR &) const return; } + + // ************************************************************* // Operators for TAO_opaque encoding and decoding // ************************************************************* diff --git a/TAO/tao/Profile.h b/TAO/tao/Profile.h index 6382de6b44e..700b257c6ec 100644 --- a/TAO/tao/Profile.h +++ b/TAO/tao/Profile.h @@ -206,7 +206,7 @@ public: virtual int encode_alternate_endpoints (void); /** - * Return pointer to this profile's endpoint. If the profile + * Return a pointer to this profile's endpoint. If the profile * contains more than one endpoint, i.e., a list, the method returns * the head of the list. */ @@ -216,6 +216,19 @@ public: virtual CORBA::ULong endpoint_count (void) const = 0; /** + * Return the first endpoint in the list that matches some filtering + * constraint, such as IPv6 compatibility for IIOP endpoints. This + * method is implemented in terms of TAO_Endpoint;:next_filtered(). + */ + TAO_Endpoint *first_filtered_endpoint (void); + + /// Return the next filtered endpoint in the list after the one + /// passed in. This method is implemented in terms of + /// TAO_Endpoint;:next_filtered(). If the supplied source endpoint + /// is null, this returns the first filtered endpoint. + TAO_Endpoint *next_filtered_endpoint (TAO_Endpoint *source); + + /** * Remove the provided endpoint from the profile. Some * subclasses of TAO_Profile already have a protocol-specific * version of remove_endpoint, but this generic interface is @@ -226,6 +239,9 @@ public: */ virtual void remove_generic_endpoint (TAO_Endpoint *ep); + /// Add a protocol-agnostic endpoint + virtual void add_generic_endpoint (TAO_Endpoint *ep); + /// Verify profile equivalance. /** * Two profiles are equivalent if their tag, object_key, version @@ -238,6 +254,12 @@ public: */ CORBA::Boolean is_equivalent (const TAO_Profile* other_profile); + /** + * Compare the object key for this profile with that of + * another. This is weaker than is_equivalent + */ + CORBA::Boolean compare_key (const TAO_Profile *other) const; + /// Return a hash value for this object. virtual CORBA::ULong hash (CORBA::ULong max ACE_ENV_ARG_DECL) = 0; diff --git a/TAO/tao/Profile_Transport_Resolver.cpp b/TAO/tao/Profile_Transport_Resolver.cpp index 123dc6c1425..8f87bb6f619 100644 --- a/TAO/tao/Profile_Transport_Resolver.cpp +++ b/TAO/tao/Profile_Transport_Resolver.cpp @@ -14,6 +14,7 @@ #include "tao/Transport_Connector.h" #include "tao/Endpoint.h" #include "tao/SystemException.h" +#include "tao/Client_Strategy_Factory.h" #include "ace/Countdown_Time.h" @@ -119,7 +120,6 @@ namespace TAO } } - bool Profile_Transport_Resolver::try_connect ( TAO_Transport_Descriptor_Interface *desc, @@ -127,6 +127,28 @@ namespace TAO ACE_ENV_ARG_DECL ) { + return this->try_connect_i (desc,max_time_value,0 ACE_ENV_ARG_PARAMETER); + }; + + bool + Profile_Transport_Resolver::try_parallel_connect ( + TAO_Transport_Descriptor_Interface *desc, + ACE_Time_Value *max_time_value + ACE_ENV_ARG_DECL + ) + { + return this->try_connect_i (desc,max_time_value,1 ACE_ENV_ARG_PARAMETER); + }; + + + bool + Profile_Transport_Resolver::try_connect_i ( + TAO_Transport_Descriptor_Interface *desc, + ACE_Time_Value *max_time_value, + bool parallel + ACE_ENV_ARG_DECL + ) + { TAO_Connector_Registry *conn_reg = this->stub_->orb_core ()->connector_registry ( ACE_ENV_SINGLE_ARG_PARAMETER); @@ -147,28 +169,30 @@ namespace TAO bool const is_conn_timeout = this->get_connection_timeout (connection_timeout); + ACE_Time_Value *max_wait_time = + is_conn_timeout ? &connection_timeout : max_time_value; - ACE_Time_Value *max_wait_time = 0; - - if (is_conn_timeout == true) + if (parallel) { - max_wait_time = &connection_timeout; + this->transport_ = + conn_reg->get_connector (desc->endpoint ()->tag ())-> + parallel_connect (this, + desc, + max_wait_time + ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (false); } else { - max_wait_time = max_time_value; - } - - // Obtain a connection. this->transport_ = - conn_reg->get_connector (desc->endpoint ()->tag ())->connect ( - this, + conn_reg->get_connector (desc->endpoint ()->tag ())-> + connect (this, desc, max_wait_time ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (false); - + } // A timeout error occurred. // If the user has set a roundtrip timeout policy, throw a timeout // exception. Otherwise, just fall through and return false to @@ -193,6 +217,20 @@ namespace TAO } bool + Profile_Transport_Resolver::use_parallel_connect (void) const + { + TAO_ORB_Core *oc = this->stub_->orb_core(); + return (oc->orb_params()->use_parallel_connects() +#if 0 // it was decided that even with blocked connects + // parallel connects could be useful, at least for cache + // processing. + oc->client_factory()->connect_strategy() != + TAO_Client_Strategy_Factory::TAO_BLOCKED_CONNECT +#endif /* 0 */ + ); + } + + bool Profile_Transport_Resolver::get_connection_timeout ( ACE_Time_Value &max_wait_time) { diff --git a/TAO/tao/Profile_Transport_Resolver.h b/TAO/tao/Profile_Transport_Resolver.h index 5ec4889a76b..34cef016395 100644 --- a/TAO/tao/Profile_Transport_Resolver.h +++ b/TAO/tao/Profile_Transport_Resolver.h @@ -130,13 +130,27 @@ namespace TAO /// released back to the cache. void transport_released (void) const; - /// This is a callback method used by the endpoint selectors, to + /// This is a callback method used by the endpoint selectors to /// delegate the responsibility of reserving a transport from the - /// connection cache for this invocation. + /// connection cache for this invocation. When the descriptor + /// contains more than one endpoint (as part of a linked list) and + /// the parallel flag is true then the connector will look for a + /// connection on any of the endpoints if it supports that + /// behavior, otherwise an ENOTSUP errno will be set and the + /// method will return false. bool try_connect (TAO_Transport_Descriptor_Interface *desc, ACE_Time_Value *val ACE_ENV_ARG_DECL); + bool try_parallel_connect (TAO_Transport_Descriptor_Interface *desc, + ACE_Time_Value *val + ACE_ENV_ARG_DECL); + + /// This method wraps a call to the orb core to see if parallel + /// connection attempts are even desired. This is controlled by + /// the -ORBUseParallelConnects 1|0 commandline option. + bool use_parallel_connect (void) const; + /// Initialize the inconsistent policy list that this object has /// cached. void init_inconsistent_policies (ACE_ENV_SINGLE_ARG_DECL) @@ -152,6 +166,10 @@ namespace TAO bool get_connection_timeout (ACE_Time_Value &max_wait_time); private: + bool try_connect_i (TAO_Transport_Descriptor_Interface *desc, + ACE_Time_Value *val, + bool parallel + ACE_ENV_ARG_DECL); /// Target object mutable CORBA::Object *obj_; diff --git a/TAO/tao/Reactive_Connect_Strategy.cpp b/TAO/tao/Reactive_Connect_Strategy.cpp index 0883b3ee91c..17a9d2d642a 100644 --- a/TAO/tao/Reactive_Connect_Strategy.cpp +++ b/TAO/tao/Reactive_Connect_Strategy.cpp @@ -3,6 +3,7 @@ #include "tao/ORB_Core.h" #include "tao/debug.h" #include "tao/Transport.h" +#include "tao/LF_Multi_Event.h" #include "ace/Synch_Options.h" @@ -41,9 +42,14 @@ TAO_Reactive_Connect_Strategy::synch_options (ACE_Time_Value *timeout, } int -TAO_Reactive_Connect_Strategy::wait (TAO_Connection_Handler *ch, - ACE_Time_Value *max_wait_time) +TAO_Reactive_Connect_Strategy::wait_i (TAO_LF_Event *ev, + TAO_Transport *, + ACE_Time_Value * max_wait_time) { + int result = 0; + if (ev == 0) + return -1; + if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, @@ -51,12 +57,10 @@ TAO_Reactive_Connect_Strategy::wait (TAO_Connection_Handler *ch, ACE_TEXT ("connection completion - wait ()\n"))); } - int result = 0; - ACE_DECLARE_NEW_CORBA_ENV; ACE_TRY { - while (ch->keep_waiting ()) + while (ev->keep_waiting ()) { result = this->orb_core_->run (max_wait_time, 1 ACE_ENV_ARG_PARAMETER); @@ -84,7 +88,7 @@ TAO_Reactive_Connect_Strategy::wait (TAO_Connection_Handler *ch, ACE_ENDTRY; // Set the result. - if (ch->error_detected () && result != -1) + if (result != -1 && ev->error_detected ()) { result = -1; } @@ -92,15 +96,4 @@ TAO_Reactive_Connect_Strategy::wait (TAO_Connection_Handler *ch, return result; } -int -TAO_Reactive_Connect_Strategy::wait (TAO_Transport *t, - ACE_Time_Value *val) -{ - if (t == 0) - return -1; - - return this->wait (t->connection_handler (), - val); -} - TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Reactive_Connect_Strategy.h b/TAO/tao/Reactive_Connect_Strategy.h index 305b5e9576e..694bbb49560 100644 --- a/TAO/tao/Reactive_Connect_Strategy.h +++ b/TAO/tao/Reactive_Connect_Strategy.h @@ -28,8 +28,6 @@ ACE_END_VERSIONED_NAMESPACE_DECL TAO_BEGIN_VERSIONED_NAMESPACE_DECL -class TAO_Connector; - /** * @class TAO_Reactive_Connect_Strategy * @@ -53,12 +51,8 @@ public: virtual void synch_options (ACE_Time_Value *val, ACE_Synch_Options &opt); - virtual int wait (TAO_Connection_Handler *ch, - ACE_Time_Value *val); - - - virtual int wait (TAO_Transport *t, - ACE_Time_Value *val); +protected: + virtual int wait_i (TAO_LF_Event *ch, TAO_Transport *, ACE_Time_Value *val); }; diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index e4817c918de..7b730df42a9 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -1224,6 +1224,7 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, if (this->queue_message_i(message_block) == -1) { + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") ACE_TEXT ("cannot queue message for ") @@ -2335,7 +2336,6 @@ TAO_Transport::post_open (size_t id) ace_mon, *this->handler_lock_, false); - this->is_connected_ = true; } diff --git a/TAO/tao/Transport_Connector.cpp b/TAO/tao/Transport_Connector.cpp index 844e29defa6..f7a742a87f4 100644 --- a/TAO/tao/Transport_Connector.cpp +++ b/TAO/tao/Transport_Connector.cpp @@ -7,11 +7,14 @@ #include "tao/Thread_Lane_Resources.h" #include "tao/debug.h" #include "tao/Connect_Strategy.h" +#include "tao/LF_Multi_Event.h" #include "tao/Client_Strategy_Factory.h" #include "tao/Connection_Handler.h" #include "tao/Profile_Transport_Resolver.h" #include "tao/Wait_Strategy.h" #include "tao/SystemException.h" +#include "tao/Endpoint.h" +#include "tao/Base_Transport_Property.h" #include "ace/OS_NS_string.h" @@ -242,6 +245,81 @@ TAO_Connector::make_mprofile (const char *string, return 0; // Success } +int +TAO_Connector::supports_parallel_connects(void) const +{ + return 0; // by default, we don't support parallel connection attempts; +} + +TAO_Transport* +TAO_Connector::make_parallel_connection (TAO::Profile_Transport_Resolver *, + TAO_Transport_Descriptor_Interface &, + ACE_Time_Value *) +{ + return 0; +} + + +TAO_Transport* +TAO_Connector::parallel_connect (TAO::Profile_Transport_Resolver *r, + TAO_Transport_Descriptor_Interface *desc, + ACE_Time_Value *timeout + ACE_ENV_ARG_DECL_NOT_USED) +{ + if (this->supports_parallel_connects() == 0) + { + errno = ENOTSUP; + return 0; + } + + errno = 0; // need to clear errno to ensure a stale enotsup is not set + if (desc == 0) + return 0; + unsigned int endpoint_count = 0; + TAO_Endpoint *root_ep = desc->endpoint(); + for (TAO_Endpoint *ep = root_ep->next_filtered (this->orb_core(),0); + ep != 0; + ep = ep->next_filtered(this->orb_core(),root_ep)) + if (this->set_validate_endpoint (ep) == 0) + ++endpoint_count; + if (endpoint_count == 0) + return 0; + + TAO_Transport *base_transport = 0; + + TAO::Transport_Cache_Manager &tcm = + this->orb_core ()->lane_resources ().transport_cache (); + + // Iterate through the endpoints. Since find_transport takes a + // Transport Descriptor rather than an endpoint, we must create a + // local TDI for each endpoint. The first one found will be used. + for (TAO_Endpoint *ep = root_ep->next_filtered (this->orb_core(),0); + ep != 0; + ep = ep->next_filtered(this->orb_core(),root_ep)) + { + TAO_Base_Transport_Property desc2(ep,0); + if (tcm.find_transport (&desc2, + base_transport) == 0) + { + if (TAO_debug_level) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) TAO_Connector::parallel_connect: ") + ACE_TEXT ("found a transport [%d]\n"), + base_transport->id())); + return base_transport; + } + } + + // Now we have searched the cache on all endpoints and come up + // empty. We need to initiate connections on each of the + // endpoints. Presumably only one will have a route and will succeed, + // and the rest will fail. This requires the use of asynch + // connection establishment. Maybe a custom wait strategy is needed + // at this point to register several potential transports so that + // when one succeeds the rest are cancelled or closed. + + return this->make_parallel_connection (r,*desc,timeout); +} TAO_Transport* TAO_Connector::connect (TAO::Profile_Transport_Resolver *r, @@ -280,8 +358,8 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r, if (TAO_debug_level > 4) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport_Connector::connect, " - "opening Transport[%d] in TAO_CLIENT_ROLE\n", + ACE_TEXT("(%P|%t) Transport_Connector::connect, ") + ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"), t->id ())); // Call post connect hook. If the post_connect_hook () returns @@ -315,10 +393,13 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r, "TAO_UNSPECIFIED_ROLE" )); } - // If connected return.. + // If connected return. if (base_transport->is_connected ()) return base_transport; + // It it possible to get a transport from the cache that is not + // connected? If not, then the following code is bogus. We cannot + // wait for a connection to complete on a transport in the cache. if (!this->wait_for_connection_completion (r, base_transport, timeout)) @@ -432,10 +513,100 @@ TAO_Connector::wait_for_connection_completion ( } } + // Connection not ready yet but we can use this transport, if + // we need a connected one we will block later to make sure + // it is connected + return true; +} + +bool +TAO_Connector::wait_for_connection_completion ( + TAO::Profile_Transport_Resolver *r, + TAO_Transport *&the_winner, + TAO_Transport **transport, + unsigned int count, + TAO_LF_Multi_Event *mev, + ACE_Time_Value *timeout) +{ + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%P|%t) Transport_Connector::") + ACE_TEXT("wait_for_connection_completion, ") + ACE_TEXT("waiting for connection completion on ") + ACE_TEXT("%d transports, ["), + count)); + for (unsigned int i = 0; i < count; i++) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("%d%s"),transport[i]->id(), + (i < (count -1) ? ", " : "]\n"))); + } + + // If we don't need to block for a transport just set the timeout to + // be zero. + ACE_Time_Value tmp_zero (ACE_Time_Value::zero); + if (!r->blocked_connect ()) + { + timeout = &tmp_zero; + } + + int result = this->active_connect_strategy_->wait (mev,timeout); + the_winner = 0; + + if (result != -1) + { + the_winner = mev->winner()->transport(); + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%P|%t) Transport_Connector::") + ACE_TEXT("wait_for_connection_completion, ") + ACE_TEXT("transport [%d]\n"), + the_winner->id ())); + } + else if (errno == ETIME) + { + // this is the most difficult case. In this situation, there is no + // nominated by the Multi_Event. The best we can do is pick one of + // the pending connections. + // Of course, this shouldn't happen in any case, since the wait + // strategy is called with a timeout value of 0. + for (unsigned int i = 0; i < count; i++) + if (!transport[i]->connection_handler()->is_closed()) + { + the_winner = transport[i]; + break; + } + } + + // It is possible that we have more than one connection that happened + // to complete, or that none completed. Therefore we need to traverse + // the list and ensure that all of the losers are closed. + for (unsigned int i = 0; i < count; i++) + { + if (transport[i] != the_winner) + this->check_connection_closure (transport[i]->connection_handler()); + // since we are doing this on may connections, the result isn't + // particularly important. + } + + // In case of errors. + if (the_winner == 0) + { + // Report that making the connection failed, don't print errno + // because we touched the reactor and errno could be changed + if (TAO_debug_level > 2) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Transport_Connector::") + ACE_TEXT ("wait_for_connection_completion, failed\n") + )); + + return false; + } + // Fix for a subtle problem. What happens if we are supposed to do // blocked connect but the transport is NOT connected? Force close // the connections - if (r->blocked_connect () && !transport->is_connected ()) + if (r->blocked_connect () && !the_winner->is_connected ()) { if (TAO_debug_level > 2) ACE_DEBUG ((LM_DEBUG, @@ -446,8 +617,8 @@ TAO_Connector::wait_for_connection_completion ( // Forget the return value. We are busted anyway. Try our best // here. - (void) this->cancel_svc_handler (transport->connection_handler ()); - transport = 0; + (void) this->cancel_svc_handler (the_winner->connection_handler ()); + the_winner = 0; return false; } diff --git a/TAO/tao/Transport_Connector.h b/TAO/tao/Transport_Connector.h index e29d0b251b3..f457ee5ec9a 100644 --- a/TAO/tao/Transport_Connector.h +++ b/TAO/tao/Transport_Connector.h @@ -41,6 +41,7 @@ class TAO_ORB_Core; class TAO_Connect_Strategy; class TAO_Transport; class TAO_Connection_Handler; +class TAO_LF_Multi_Event; namespace TAO { @@ -121,6 +122,14 @@ public: ACE_Time_Value *timeout ACE_ENV_ARG_DECL); + /// A variation on connect that will try simultanious connections + /// on all endpoints listed in the desc. + virtual TAO_Transport* parallel_connect ( + TAO::Profile_Transport_Resolver *r, + TAO_Transport_Descriptor_Interface *desc, + ACE_Time_Value *timeout + ACE_ENV_ARG_DECL); + /// Create a profile for this protocol and initialize it based on the /// encapsulation in @a cdr virtual TAO_Profile *create_profile ( @@ -137,6 +146,10 @@ public: //@@ TAO_CONNECTOR_SPL_PUBLIC_METHODS_ADD_HOOK protected: + /// A flag indicating the actual connector supports parallel connection + /// attempts. The base implementation alwayse returns 0. Override to return + /// non-zero if parallel connection attempts may be tried. + virtual int supports_parallel_connects (void) const; /// Create a profile with a given endpoint. virtual TAO_Profile *make_profile (ACE_ENV_SINGLE_ARG_DECL) = 0; @@ -151,6 +164,13 @@ protected: TAO_Transport_Descriptor_Interface &desc, ACE_Time_Value *timeout) = 0; + /// Make a connection using - not a pure virtual since not all + /// protocols support this. + virtual TAO_Transport* make_parallel_connection ( + TAO::Profile_Transport_Resolver *r, + TAO_Transport_Descriptor_Interface &desc, + ACE_Time_Value *timeout); + /// Cancel the passed cvs handler from the connector virtual int cancel_svc_handler ( TAO_Connection_Handler *svc_handler) = 0; @@ -175,6 +195,17 @@ protected: TAO_Transport *&transport, ACE_Time_Value *timeout); + /// In the case of a parallel connection attempt, we take an array of + /// transports, and wait on any of them. When the first one completes, + /// the rest are closed. + virtual bool wait_for_connection_completion( + TAO::Profile_Transport_Resolver *r, + TAO_Transport *&the_winner, + TAO_Transport **transport, + unsigned int count, + TAO_LF_Multi_Event *mev, + ACE_Time_Value *timeout); + /// Set the ORB Core pointer void orb_core (TAO_ORB_Core *orb_core); diff --git a/TAO/tao/Transport_Descriptor_Interface.cpp b/TAO/tao/Transport_Descriptor_Interface.cpp index a58af5ac064..9db80555cf8 100644 --- a/TAO/tao/Transport_Descriptor_Interface.cpp +++ b/TAO/tao/Transport_Descriptor_Interface.cpp @@ -15,7 +15,7 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Transport_Descriptor_Interface::~TAO_Transport_Descriptor_Interface (void) { - if (this->endpoint_from_heap_) + if (this->release_) { delete this->endpoint_; } diff --git a/TAO/tao/Transport_Descriptor_Interface.h b/TAO/tao/Transport_Descriptor_Interface.h index 0934238d072..7cc891fecff 100644 --- a/TAO/tao/Transport_Descriptor_Interface.h +++ b/TAO/tao/Transport_Descriptor_Interface.h @@ -73,7 +73,7 @@ protected: /// Constructor TAO_Transport_Descriptor_Interface (TAO_Endpoint *endpoint, - CORBA::Boolean flag = 0); + CORBA::Boolean take_ownership = 0); /// The base property of the connection ie. the peer's endpoint TAO_Endpoint *endpoint_; @@ -83,7 +83,7 @@ protected: /// Is the endpoint allocated on the heap? If so, we will have to /// delete it when we destruct ourselves. - CORBA::Boolean endpoint_from_heap_; + CORBA::Boolean release_; }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Transport_Descriptor_Interface.inl b/TAO/tao/Transport_Descriptor_Interface.inl index 6d1bbcb6d56..4e667f946a9 100644 --- a/TAO/tao/Transport_Descriptor_Interface.inl +++ b/TAO/tao/Transport_Descriptor_Interface.inl @@ -7,10 +7,10 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE TAO_Transport_Descriptor_Interface::TAO_Transport_Descriptor_Interface ( TAO_Endpoint *endpoint, - CORBA::Boolean flag) + CORBA::Boolean take_ownership) : endpoint_ (endpoint), bidir_flag_ (false), - endpoint_from_heap_ (flag) + release_ (take_ownership) { } @@ -18,7 +18,7 @@ ACE_INLINE TAO_Transport_Descriptor_Interface::TAO_Transport_Descriptor_Interface (void) : endpoint_ (0), bidir_flag_ (false), - endpoint_from_heap_ (false) + release_ (false) { } diff --git a/TAO/tao/default_client.cpp b/TAO/tao/default_client.cpp index bc6d38cf6ab..6076024c440 100644 --- a/TAO/tao/default_client.cpp +++ b/TAO/tao/default_client.cpp @@ -331,6 +331,12 @@ TAO_Default_Client_Strategy_Factory::create_wait_strategy (TAO_Transport *transp return ws; } +TAO_Client_Strategy_Factory::Connect_Strategy +TAO_Default_Client_Strategy_Factory::connect_strategy (void) const +{ + return this->connect_strategy_; +} + TAO_Connect_Strategy * TAO_Default_Client_Strategy_Factory::create_connect_strategy (TAO_ORB_Core *orb_core) { diff --git a/TAO/tao/default_client.h b/TAO/tao/default_client.h index db6544d166d..e1660fb2041 100644 --- a/TAO/tao/default_client.h +++ b/TAO/tao/default_client.h @@ -62,6 +62,7 @@ public: virtual TAO_Connect_Strategy *create_connect_strategy (TAO_ORB_Core *); virtual ACE_Lock *create_ft_service_retention_id_lock (void); virtual bool use_cleanup_options (void) const; + virtual Connect_Strategy connect_strategy (void) const; protected: void report_option_value_error (const ACE_TCHAR* option_name, @@ -97,13 +98,6 @@ private: /// The wait-for-reply strategy. Wait_Strategy wait_strategy_; - enum Connect_Strategy - { - TAO_BLOCKED_CONNECT, - TAO_REACTIVE_CONNECT, - TAO_LEADER_FOLLOWER_CONNECT - }; - /// The connection initiation strategy. Connect_Strategy connect_strategy_; diff --git a/TAO/tao/params.cpp b/TAO/tao/params.cpp index a06608bedb2..afc01c91980 100644 --- a/TAO/tao/params.cpp +++ b/TAO/tao/params.cpp @@ -14,7 +14,6 @@ ACE_RCSID (tao, params, "$Id$") - TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_ORB_Parameters::TAO_ORB_Parameters (void) @@ -36,7 +35,9 @@ TAO_ORB_Parameters::TAO_ORB_Parameters (void) , sched_policy_ (THR_SCHED_DEFAULT) , scope_policy_ (THR_SCOPE_PROCESS) , single_read_optimization_ (1) - , shared_profile_ (0) + , shared_profile_ (1) + , use_parallel_connects_ (false) + , parallel_connect_delay_ (0) , pref_network_ () , disable_rt_collocation_resolver_ (false) , enforce_preferred_interfaces_ (false) diff --git a/TAO/tao/params.h b/TAO/tao/params.h index c65f9177e91..ba3df276a92 100644 --- a/TAO/tao/params.h +++ b/TAO/tao/params.h @@ -175,6 +175,16 @@ public: int shared_profile (void) const; void shared_profile (int x); + /// Want to use parallel connection attempts when profiles have multiple + /// endpoints. + bool use_parallel_connects(void) const; + void use_parallel_connects (bool x); + + /// The milliseconds delay used to stagger individual connection starts + /// when using parallel connects. + unsigned long parallel_connect_delay (void) const; + void parallel_connect_delay (unsigned long x); + /// Mutators and accessors for rt_collocation_resolver bool disable_rt_collocation_resolver (void) const; void disable_rt_collocation_resolver (bool); @@ -304,6 +314,16 @@ private: /// Shared Profile - Use the same profile for multiple endpoints int shared_profile_; + /// Use Parallel Connects - Try to connect to all endpoints in a + /// shared profile at once, use the first to complete. + int use_parallel_connects_; + + /// When using parallel connects, this delay is used to stagger connection + /// attempts. This gives a trade-off between opening more potential + /// connections than necessary vs increasing the potential time before + /// a good connection is discovered. Time is expressed in milliseconds. + unsigned long parallel_connect_delay_; + /// Preferred network interfaces as a string ACE_CString pref_network_; diff --git a/TAO/tao/params.i b/TAO/tao/params.i index d900d18a967..4109e06cb24 100644 --- a/TAO/tao/params.i +++ b/TAO/tao/params.i @@ -218,6 +218,30 @@ TAO_ORB_Parameters::single_read_optimization (int x) this->single_read_optimization_ = x; } +ACE_INLINE bool +TAO_ORB_Parameters::use_parallel_connects (void) const +{ + return this->use_parallel_connects_; +} + +ACE_INLINE void +TAO_ORB_Parameters::use_parallel_connects (bool x) +{ + this->use_parallel_connects_ = x; +} + +ACE_INLINE unsigned long +TAO_ORB_Parameters::parallel_connect_delay (void) const +{ + return this->parallel_connect_delay_; +} + +ACE_INLINE void +TAO_ORB_Parameters::parallel_connect_delay (unsigned long x) +{ + this->parallel_connect_delay_ = x; +} + ACE_INLINE int TAO_ORB_Parameters::shared_profile (void) const { diff --git a/TAO/tao/tao.mpc b/TAO/tao/tao.mpc index 8c3d1c34998..c514e95d123 100644 --- a/TAO/tao/tao.mpc +++ b/TAO/tao/tao.mpc @@ -113,6 +113,7 @@ project(TAO) : acelib, core, tao_output, taodefaults, pidl, extra_core { LF_Follower_Auto_Adder.cpp LF_Follower_Auto_Ptr.cpp LF_Invocation_Event.cpp + LF_Multi_Event.cpp LF_Strategy.cpp LF_Strategy_Complete.cpp LocalObject.cpp diff --git a/TAO/tests/Parallel_Connect_Strategy/Parallel_Connect_Strategy.mpc b/TAO/tests/Parallel_Connect_Strategy/Parallel_Connect_Strategy.mpc new file mode 100644 index 00000000000..29d26448590 --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/Parallel_Connect_Strategy.mpc @@ -0,0 +1,18 @@ +// -*- MPC -*- +// $Id$ + +project(*Server): taoserver, iortable { + idlflags += -Sa -St + Source_Files { + Test_i.cpp + server.cpp + } +} + +project(*Client): taoclient { + after += *Server + Source_Files { + TestC.cpp + client.cpp + } +} diff --git a/TAO/tests/Parallel_Connect_Strategy/README b/TAO/tests/Parallel_Connect_Strategy/README new file mode 100644 index 00000000000..75dac13ce01 --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/README @@ -0,0 +1,33 @@ +/** + +@page Parallel Connect Strategy Test README File + +This test is intended to demonstrate that the Parallel Connect +strategy improves performance of connection establishment regardless +of the wait strategy involved. + +The measurements are timed invocations based on the ACE_High_Res_Timer. + +Sample run_test.pl output: +LF wait strategy test +Starting invocation 1 - call completed in 6347 usec +Starting invocation 2 - call completed in 406 usec + +LF wait strategy, corbaloc test +Narrowing IOR - call completed in 5172 usec +Starting invocation 1 - call completed in 193 usec +Starting invocation 2 - call completed in 170 usec + +Reactive wait strategy test +Starting invocation 1 - call completed in 4469 usec +Starting invocation 2 - call completed in 361 usec + +Blocked wait strategy test +Starting invocation 1 - call completed in 189015027 usec +Starting invocation 2 - call completed in 408 usec + +No parallel connect test +Starting invocation 1 - call completed in 189014806 usec +Starting invocation 2 - call completed in 189012352 usec + +*/ diff --git a/TAO/tests/Parallel_Connect_Strategy/Test.idl b/TAO/tests/Parallel_Connect_Strategy/Test.idl new file mode 100644 index 00000000000..3c0976e106d --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/Test.idl @@ -0,0 +1,20 @@ +// +// $Id$ +// + +/// Put the interfaces in a module, to avoid global namespace pollution +module Test +{ + /// A very simple interface + interface Hello + { + /// Return a simple string + string get_string (); + + /// A method to shutdown the ORB + /** + * This method is used to simplify the test shutdown process + */ + oneway void shutdown (); + }; +}; diff --git a/TAO/tests/Parallel_Connect_Strategy/Test_i.cpp b/TAO/tests/Parallel_Connect_Strategy/Test_i.cpp new file mode 100644 index 00000000000..bdf487d4a54 --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/Test_i.cpp @@ -0,0 +1,23 @@ +// +// $Id$ +// +#include "Test_i.h" + +Hello::Hello (CORBA::ORB_ptr orb) + : orb_ (CORBA::ORB::_duplicate (orb)) +{ +} + +char * +Hello::get_string (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + return CORBA::string_dup ("Hello there!"); +} + +void +Hello::shutdown (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER); +} diff --git a/TAO/tests/Parallel_Connect_Strategy/Test_i.h b/TAO/tests/Parallel_Connect_Strategy/Test_i.h new file mode 100644 index 00000000000..0fbcc7a4ab6 --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/Test_i.h @@ -0,0 +1,35 @@ +// -*- C++ -*- + +// +// $Id$ +// + +#ifndef TEST_I_H +#define TEST_I_H +#include /**/ "ace/pre.h" + +#include "TestS.h" + +/// Implement the Test::Hello interface +class Hello + : public virtual POA_Test::Hello +{ +public: + /// Constructor + Hello (CORBA::ORB_ptr orb); + + // = The skeleton methods + virtual char * get_string (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + + virtual void shutdown (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + +private: + /// Use an ORB reference to conver strings to objects and shutdown + /// the application. + CORBA::ORB_var orb_; +}; + +#include /**/ "ace/post.h" +#endif /* TEST_I_H */ diff --git a/TAO/tests/Parallel_Connect_Strategy/blocked.conf b/TAO/tests/Parallel_Connect_Strategy/blocked.conf new file mode 100644 index 00000000000..2feddbe2302 --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/blocked.conf @@ -0,0 +1,3 @@ +# test for using the blocked connect strategy + +static Client_Strategy_Factory "-ORBConnectStrategy Blocked" diff --git a/TAO/tests/Parallel_Connect_Strategy/client.cpp b/TAO/tests/Parallel_Connect_Strategy/client.cpp new file mode 100644 index 00000000000..ee7747bb1ca --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/client.cpp @@ -0,0 +1,121 @@ +// $Id$ + +#include "TestC.h" +#include "ace/Get_Opt.h" +#include "ace/High_Res_Timer.h" + +ACE_RCSID(Hello, client, "$Id$") + +const char *ior = "file://test.ior"; +int kill_server = 0; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "k:x"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'k': + ior = get_opts.opt_arg (); + break; + case 'x': + kill_server = 1; + break; + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-k <ior> " + "-x " + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +int +main (int argc, char *argv[]) +{ + ACE_TRY_NEW_ENV + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) != 0) + return 1; + + CORBA::Object_var tmp = + orb->string_to_object(ior ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_High_Res_Timer hrt; + ACE_hrtime_t elapsed; + + ACE_DEBUG ((LM_DEBUG,"Narrowing IOR - ")); + hrt.start(); + + Test::Hello_var hello = + Test::Hello::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + hrt.stop(); + hrt.elapsed_microseconds (elapsed); + hrt.reset(); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("call completed in %d usec\n"), + elapsed )); + + if (CORBA::is_nil (hello.in ())) + { + ACE_ERROR_RETURN ((LM_DEBUG, + "Nil Test::Hello reference <%s>\n", + ior), + 1); + } + if (kill_server) + { + hello->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + else + { + ACE_DEBUG ((LM_DEBUG,"Starting invocation 1 - ")); + hrt.start(); + CORBA::String_var the_string = + hello->get_string (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + hrt.stop(); + hrt.elapsed_microseconds (elapsed); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("call completed in %d usec\n"), + elapsed )); + ACE_DEBUG ((LM_DEBUG,"Starting invocation 2 - ")); + hrt.reset(); + hrt.start(); + the_string = hello->get_string (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + hrt.stop(); + hrt.elapsed_microseconds (elapsed); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("call completed in %d usec\n"), + elapsed )); + } + orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception caught:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} diff --git a/TAO/tests/Parallel_Connect_Strategy/reactive.conf b/TAO/tests/Parallel_Connect_Strategy/reactive.conf new file mode 100644 index 00000000000..0317f1c624c --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/reactive.conf @@ -0,0 +1,3 @@ +# test for using the blocked connect strategy + +static Client_Strategy_Factory "-ORBConnectStrategy Reactive" diff --git a/TAO/tests/Parallel_Connect_Strategy/run_test.pl b/TAO/tests/Parallel_Connect_Strategy/run_test.pl new file mode 100755 index 00000000000..e663d69e42d --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/run_test.pl @@ -0,0 +1,97 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id$ +# -*- perl -*- + +use lib '../../../bin'; +use PerlACE::Run_Test; + +$iorfile = PerlACE::LocalFile ("altiiop.ior"); +unlink $iorfile; +$status = 0; +@bogus_eps = ("-orbendpoint iiop://localhost:10200/hostname_in_ior=126.0.0.123", + "-orbendpoint iiop://localhost:10202/hostname_in_ior=126.0.0.124"); +$valid_ep = "-orbendpoint iiop://localhost:10201"; + +$corbaloc = "corbaloc::126.0.0.123:10200,:localhost:10201,:126.0.0.124:10202/pcs_test"; + +$SV_ALT_IIOP = new PerlACE::Process ("server", "-ORBUseSharedProfile 1 -o $iorfile $bogus_eps[0] $valid_ep $bogus_eps[1]"); + +$CL_LF = new PerlACE::Process ("client", "-ORBuseParallelConnects 1 -k file://$iorfile"); +$CL_CORBALOC = new PerlACE::Process ("client", "-ORBuseParallelConnects 1 -k $corbaloc"); +$CL_Reactive = new PerlACE::Process ("client", "-ORBSvcConf reactive.conf -ORBuseParallelConnects 1 -k file://$iorfile"); +$CL_Blocked = new PerlACE::Process ("client", "-ORBSvcConf blocked.conf -ORBuseParallelConnects 1 -k file://$iorfile"); +$CL_None = new PerlACE::Process ("client", "-ORBuseParallelConnects 0 -k file://$iorfile"); +$CL_Shutdown = new PerlACE::Process ("client", "-ORBuseParallelConnects 1 -k file://$iorfile -x"); + +$SV_ALT_IIOP->Spawn (); + +if (PerlACE::waitforfile_timed ($iorfile, + $PerlACE::wait_interval_for_process_creation) == -1) { + print STDERR "ERROR: cannot find file <$iorfile>\n"; + $SV->Kill (); $SV->TimedWait (1); + exit 1; +} + +print "LF wait strategy test\n"; + +$client = $CL_LF->SpawnWaitKill (60); + +if ($client != 0) { + print STDERR "ERROR: client returned $client\n"; + $status = 1; +} + +print "\nLF wait strategy, corbaloc test\n"; + +$client = $CL_CORBALOC->SpawnWaitKill (60); + +if ($client != 0) { + print STDERR "ERROR: client returned $client\n"; + $status = 1; +} + +print "\nReactive wait strategy test\n"; + +$client = $CL_Reactive->SpawnWaitKill (60); + +if ($client != 0) { + print STDERR "ERROR: client returned $client\n"; + $status = 1; +} + +print "\nBlocked wait strategy test\n"; + +$client = $CL_Blocked->SpawnWaitKill (600); + +if ($client != 0) { + print STDERR "ERROR: client returned $client\n"; + $status = 1; +} + +print "\nNo parallel connect test\n"; + +$client = $CL_None->SpawnWaitKill (900); + +if ($client != 0) { + print STDERR "ERROR: client returned $client\n"; + $status = 1; +} + +$client = $CL_Shutdown->SpawnWaitKill (60); + +if ($client != 0) { + print STDERR "ERROR: client returned $client\n"; + $status = 1; +} + +$server = $SV_ALT_IIOP->WaitKill (60); + +if ($server != 0) { + print STDERR "ERROR: server returned $server\n"; + $status = 1; +} + +exit $status; diff --git a/TAO/tests/Parallel_Connect_Strategy/server.cpp b/TAO/tests/Parallel_Connect_Strategy/server.cpp new file mode 100644 index 00000000000..153aba883ff --- /dev/null +++ b/TAO/tests/Parallel_Connect_Strategy/server.cpp @@ -0,0 +1,136 @@ +// $Id$ + +#include "Test_i.h" +#include "ace/Get_Opt.h" +#include "ace/OS_NS_stdio.h" +#include "tao/IORTable/IORTable.h" + +ACE_RCSID (Hello, + server, + "$Id$") + +const char *ior_output_file = "test.ior"; + +int +parse_args (int argc, char *argv[]) +{ + ACE_Get_Opt get_opts (argc, argv, "o:"); + int c; + + while ((c = get_opts ()) != -1) + switch (c) + { + case 'o': + ior_output_file = get_opts.opt_arg (); + break; + + case '?': + default: + ACE_ERROR_RETURN ((LM_ERROR, + "usage: %s " + "-o <iorfile>" + "\n", + argv [0]), + -1); + } + // Indicates sucessful parsing of the command line + return 0; +} + +int +main (int argc, char *argv[]) +{ + ACE_TRY_NEW_ENV + { + CORBA::ORB_var orb = + CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + CORBA::Object_var poa_object = + orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + PortableServer::POA_var root_poa = + PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (root_poa.in ())) + ACE_ERROR_RETURN ((LM_ERROR, + " (%P|%t) Panic: nil RootPOA\n"), + 1); + + PortableServer::POAManager_var poa_manager = + root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (parse_args (argc, argv) != 0) + return 1; + + Hello *hello_impl; + ACE_NEW_RETURN (hello_impl, + Hello (orb.in ()), + 1); + PortableServer::ServantBase_var owner_transfer(hello_impl); + + Test::Hello_var hello = + hello_impl->_this (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + + CORBA::String_var ior = + orb->object_to_string (hello.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + // Output the IOR to the <ior_output_file> + FILE *output_file= ACE_OS::fopen (ior_output_file, "w"); + if (output_file == 0) + ACE_ERROR_RETURN ((LM_ERROR, + "Cannot open output file for writing IOR: %s", + ior_output_file), + 1); + ACE_OS::fprintf (output_file, "%s", ior.in ()); + ACE_OS::fclose (output_file); + + CORBA::Object_var table_object = + orb->resolve_initial_references ("IORTable" + ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + IORTable::Table_var adapter = + IORTable::Table::_narrow (table_object.in () ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + if (CORBA::is_nil (adapter.in ())) + { + ACE_ERROR ((LM_ERROR, "Nil IORTable\n")); + } + else + { + adapter->bind ("pcs_test", ior.in() ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + } + + poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + orb->run (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "(%P|%t) server - event loop finished\n")); + + root_poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + + orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + } + ACE_CATCHANY + { + ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, + "Exception caught:"); + return 1; + } + ACE_ENDTRY; + + return 0; +} |