diff options
Diffstat (limited to 'TAO/tao/Transport_Connector.cpp')
-rw-r--r-- | TAO/tao/Transport_Connector.cpp | 421 |
1 files changed, 288 insertions, 133 deletions
diff --git a/TAO/tao/Transport_Connector.cpp b/TAO/tao/Transport_Connector.cpp index a02d8c6070f..71841690ce3 100644 --- a/TAO/tao/Transport_Connector.cpp +++ b/TAO/tao/Transport_Connector.cpp @@ -27,47 +27,6 @@ ACE_RCSID (tao, Connector, "$Id$") -namespace -{ - class TransportCleanupGuard - { - public: - - TransportCleanupGuard (TAO_Transport *tp) - : tp_ (tp) - , awake_ (true) - { - } - - ~TransportCleanupGuard () - { - if (this->awake_ && this->tp_) - { - // Purge from the connection cache. If we are not in the - // cache, this does nothing. - this->tp_->purge_entry (); - - // Close the handler. - this->tp_->close_connection (); - - this->tp_->remove_reference (); - } - } - - /// Turn off the guard. - void down () - { - this->awake_ = false; - } - - private: - - TAO_Transport * const tp_; - bool awake_; - - }; -} - TAO_BEGIN_VERSIONED_NAMESPACE_DECL // Connector @@ -322,8 +281,9 @@ TAO_Connector::parallel_connect (TAO::Profile_Transport_Resolver *r, ep = ep->next_filtered(this->orb_core(),root_ep)) { TAO_Base_Transport_Property desc2(ep,0); - if (tcm.find_transport (&desc2, - base_transport) == 0) + size_t busy_count = 0; + if (tcm.find_transport (&desc2, base_transport, busy_count) == + TAO::Transport_Cache_Manager::CACHE_FOUND_AVAILABLE ) { if (TAO_debug_level) { @@ -347,123 +307,293 @@ TAO_Connector::parallel_connect (TAO::Profile_Transport_Resolver *r, return this->make_parallel_connection (r,*desc,timeout); } -TAO_Transport* -TAO_Connector::connect (TAO::Profile_Transport_Resolver *r, - TAO_Transport_Descriptor_Interface *desc, - ACE_Time_Value *timeout) +bool +TAO_Connector::wait_for_transport(TAO::Profile_Transport_Resolver *r, + TAO_Transport *transport, + ACE_Time_Value *timeout, + bool force_wait) { - if (desc == 0 || - (this->set_validate_endpoint (desc->endpoint ()) == -1)) - return 0; - - TAO_Transport *base_transport = 0; - - TAO::Transport_Cache_Manager &tcm = - this->orb_core ()->lane_resources ().transport_cache (); - - // Check the Cache first for connections - // If transport found, reference count is incremented on assignment - // @@todo: We need to send the timeout value to the cache registry - // too. That should be the next step! - if (tcm.find_transport (desc, - base_transport) != 0) + if (transport->connection_handler ()->is_timeout ()) { - // @@TODO: This is not the right place for this! - // Purge connections (if necessary) - tcm.purge (); + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ") + ACE_TEXT("transport [%d], Connection Timed out.\n"), + transport->id () )); + } + } + else if (transport->connection_handler()->is_closed ()) + { + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ") + ACE_TEXT("transport [%d], Connection failed. (%d)\n"), + transport->id (), + errno)); + } - TAO_Transport* t = this->make_connection (r, *desc, timeout); + // purge from the connection cache. If we are not in the + // cache, this does nothing. + transport->purge_entry (); - if (t == 0) - return t; + // Close the handler. + transport->close_connection (); - t->opened_as (TAO::TAO_CLIENT_ROLE); + } - if (TAO_debug_level > 4) + if (transport->connection_handler ()->is_open ()) + { + if (TAO_debug_level > 5) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("TAO (%P|%t) - Transport_Connector::connect, ") - ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"), - t->id ())); + ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ") + ACE_TEXT("transport [%d], connection is open: no wait.\n"), + transport->id () )); } - // Call post connect hook. If the post_connect_hook () returns - // false, just purge the entry. - if (!t->post_connect_hook ()) - { - (void) t->purge_entry (); - - // Call connect again - return this->connect (r, desc, timeout); - } - - return t; + return true; } + else if (force_wait || r->blocked_connect ()) + { + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ") + ACE_TEXT(" waiting on transport [%d]\n"), + transport->id () )); + } + int result = this->active_connect_strategy_->wait (transport, timeout); + if (result == -1 && errno == ETIME) + { + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ") + ACE_TEXT(" timeout while waiting on transport [%d]\n"), + transport->id () )); + } + } + else if(result == -1) + { + if (TAO_debug_level > 2) + { + static int complain10times = 10; + if (complain10times > 0) + { + complain10times -= 1; + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ") + ACE_TEXT(" unknown error waiting on transport [%d] (%d)\n"), + transport->id (), + errno)); + } + } + // purge from the connection cache. If we are not in the + // cache, this does nothing. + transport->purge_entry (); - if (TAO_debug_level > 4) - { - TAO::Connection_Role cr = base_transport->opened_as (); + // Close the handler. + transport->close_connection (); + } + else + { + if (TAO_debug_level > 5) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ") + ACE_TEXT("transport [%d], wait completed ok.\n"), + transport->id () )); + } + return true; + } + } + else + { + if (TAO_debug_level > 2) + { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport_Connector::connect, " - "got an existing %s Transport[%d] in role %s\n", - base_transport->is_connected () ? "connected" : "unconnected", - base_transport->id (), - cr == TAO::TAO_SERVER_ROLE ? "TAO_SERVER_ROLE" : - cr == TAO::TAO_CLIENT_ROLE ? "TAO_CLIENT_ROLE" : - "TAO_UNSPECIFIED_ROLE" )); + ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ") + ACE_TEXT(" Connection not complete [%d]\n"), + transport->id () )); } + transport->connection_handler ()->reset_state( + TAO_LF_Event::LFS_CONNECTION_WAIT); - // If connected return. - if (base_transport->is_connected ()) - return base_transport; + return true; + } - // It it possible to get a transport from the cache that is not - // connected? If not, then the following code is bogus. We cannot - // wait for a connection to complete on a transport in the cache. - // - // (mesnier_p@ociweb.com) It is indeed possible to reach this point. - // The AMI_Buffering test does. When using non-blocking connects and - // the first request(s) are asynch and may be queued, the connection - // establishment may not be completed by the time the invocation is - // done with it. In that case it is up to a subsequent invocation to - // handle the connection completion. - - TransportCleanupGuard tg(base_transport); - if (!this->wait_for_connection_completion (r, base_transport, timeout)) - { - if (TAO_debug_level > 2) + return false; +} + +TAO_Transport* +TAO_Connector::connect (TAO::Profile_Transport_Resolver *r, + TAO_Transport_Descriptor_Interface *desc, + ACE_Time_Value *timeout) +{ + if (desc == 0 || + (this->set_validate_endpoint (desc->endpoint ()) == -1)) + return 0; + + TAO::Transport_Cache_Manager &tcm = + this->orb_core ()->lane_resources ().transport_cache (); + // Stay in this loop until we find: + // a usable connection, or a timeout happens + while (true) + { + // Find a connection in the cache + // If transport found, reference count is incremented on assignment + TAO_Transport *base_transport = 0; + size_t busy_count = 0; + TAO::Transport_Cache_Manager::Find_Result found = + tcm.find_transport (desc, + base_transport, + busy_count); + + if (found == TAO::Transport_Cache_Manager::CACHE_FOUND_AVAILABLE ) { - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Transport_Connector::" - "connect, " - "wait for completion failed\n")); + // one last check before using the cached connection + if (base_transport->connection_handler()->error_detected ()) + { + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) Transport_Connector::connect") + ACE_TEXT(" error in transport from cache\n"))); + } + (void) base_transport->close_connection (); + (void) base_transport->purge_entry (); + base_transport = 0; + } + else + { + if (TAO_debug_level > 4) + { + TAO::Connection_Role cr = base_transport->opened_as (); + + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Connector::connect, " + "got an existing %s Transport[%d] in role %s\n", + base_transport->is_connected () ? "connected" : "unconnected", + base_transport->id (), + cr == TAO::TAO_SERVER_ROLE ? "TAO_SERVER_ROLE" : + cr == TAO::TAO_CLIENT_ROLE ? "TAO_CLIENT_ROLE" : + "TAO_UNSPECIFIED_ROLE" )); + } + return base_transport; + } } - return 0; - } - - if (base_transport->is_connected () && - base_transport->wait_strategy ()->register_handler () == -1) - { - // Registration failures. - if (TAO_debug_level > 0) + else if (found == TAO::Transport_Cache_Manager::CACHE_FOUND_CONNECTING ) { - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Transport_Connector [%d]::connect, " - "could not register the transport " - "in the reactor.\n", - base_transport->id ())); + if (r->blocked_connect ()) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Connector::waiting") + ACE_TEXT(" for connection on transport [%d]\n"), + base_transport->id ())); + } + + // If wait_for_transport returns no errors, the base_transport + // points to the connection we wait for. + if (this->wait_for_transport (r, base_transport, timeout, false)) + { + // be sure this transport is registered with the reactor + // before using it. + if (!base_transport->register_if_necessary ()) + { + base_transport->remove_reference (); + return 0; + } + } + // In either success or failure cases of wait_for_transport, the + // ref counter in corresponding to the ref counter added by + // find_transport is decremented. + base_transport->remove_reference (); + } + else + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Connector::non-blocking: returning unconnected transport [%d]\n"), + base_transport->id ())); + } + + // return the transport in it's current, unconnected state + return base_transport; + } + } + else + { + // @todo: This is not the right place for this! (bugzilla 3023) + // Purge connections (if necessary) + tcm.purge (); + bool make_new_connection = + (found == TAO::Transport_Cache_Manager::CACHE_FOUND_NONE) || + (found == TAO::Transport_Cache_Manager::CACHE_FOUND_BUSY + && this->new_connection_is_ok (busy_count)); + + if (make_new_connection) + { + // we aren't going to use the transport returned from the cache (if any) + if (base_transport != 0) + { + base_transport->remove_reference (); + } + + base_transport = this->make_connection (r, *desc, timeout); + if (base_transport == 0) + { + return base_transport; + } + + // Should this code be moved? If so, where to? + base_transport->opened_as (TAO::TAO_CLIENT_ROLE); + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Transport_Connector::connect, ") + ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"), + base_transport->id ())); + } + + // Call post connect hook. If the post_connect_hook () returns + // false, just purge the entry. + if (!base_transport->post_connect_hook ()) + { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - Post_connect_hook failed. ") + ACE_TEXT("Purging transport[%d]\n"), + base_transport->id ())); + } + (void) base_transport->purge_entry (); + } + // The new transport is in the cache. We'll pick it up from there + // next time thru this loop (using it from here causes more problems + // than it fixes due to the changes that allow a new connection to be + // re-used by a nested upcall before we get back here.) + base_transport->remove_reference (); + } + else // not making new connection + { + (void) this->wait_for_transport (r, base_transport, timeout, true); + base_transport->remove_reference (); + } } - return 0; } - - tg.down (); - return base_transport; } bool TAO_Connector::wait_for_connection_completion ( TAO::Profile_Transport_Resolver *r, + TAO_Transport_Descriptor_Interface& desc, TAO_Transport *&transport, ACE_Time_Value *timeout) { @@ -516,6 +646,18 @@ TAO_Connector::wait_for_connection_completion ( } else { + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Connector::" + "caching connection before wait_for_connection_completion " + "%d = [%d]\n", + desc.hash(), + transport->id ())); + } + TAO::Transport_Cache_Manager &tcm = + this->orb_core ()->lane_resources ().transport_cache (); + tcm.cache_transport(&desc, transport, TAO::ENTRY_CONNECTING); if (TAO_debug_level > 2) { ACE_DEBUG ((LM_DEBUG, @@ -525,6 +667,7 @@ TAO_Connector::wait_for_connection_completion ( "[%d]\n", transport->id ())); } + result = this->active_connect_strategy_->wait (transport, timeout); if (TAO_debug_level > 2) @@ -721,6 +864,18 @@ TAO_Connector::create_connect_strategy (void) return 0; } +bool +TAO_Connector::new_connection_is_ok (size_t busy_count) +{ + if (orb_core_ == 0) + return true; + + unsigned int mux_limit = orb_core_->resource_factory () + ->max_muxed_connections (); + + return mux_limit == 0 || busy_count < mux_limit; +} + int TAO_Connector::check_connection_closure ( TAO_Connection_Handler *connection_handler) |