summaryrefslogtreecommitdiff
path: root/TAO/tao/Transport_Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Transport_Connector.cpp')
-rw-r--r--TAO/tao/Transport_Connector.cpp421
1 files changed, 288 insertions, 133 deletions
diff --git a/TAO/tao/Transport_Connector.cpp b/TAO/tao/Transport_Connector.cpp
index a02d8c6070f..71841690ce3 100644
--- a/TAO/tao/Transport_Connector.cpp
+++ b/TAO/tao/Transport_Connector.cpp
@@ -27,47 +27,6 @@ ACE_RCSID (tao,
Connector,
"$Id$")
-namespace
-{
- class TransportCleanupGuard
- {
- public:
-
- TransportCleanupGuard (TAO_Transport *tp)
- : tp_ (tp)
- , awake_ (true)
- {
- }
-
- ~TransportCleanupGuard ()
- {
- if (this->awake_ && this->tp_)
- {
- // Purge from the connection cache. If we are not in the
- // cache, this does nothing.
- this->tp_->purge_entry ();
-
- // Close the handler.
- this->tp_->close_connection ();
-
- this->tp_->remove_reference ();
- }
- }
-
- /// Turn off the guard.
- void down ()
- {
- this->awake_ = false;
- }
-
- private:
-
- TAO_Transport * const tp_;
- bool awake_;
-
- };
-}
-
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
// Connector
@@ -322,8 +281,9 @@ TAO_Connector::parallel_connect (TAO::Profile_Transport_Resolver *r,
ep = ep->next_filtered(this->orb_core(),root_ep))
{
TAO_Base_Transport_Property desc2(ep,0);
- if (tcm.find_transport (&desc2,
- base_transport) == 0)
+ size_t busy_count = 0;
+ if (tcm.find_transport (&desc2, base_transport, busy_count) ==
+ TAO::Transport_Cache_Manager::CACHE_FOUND_AVAILABLE )
{
if (TAO_debug_level)
{
@@ -347,123 +307,293 @@ TAO_Connector::parallel_connect (TAO::Profile_Transport_Resolver *r,
return this->make_parallel_connection (r,*desc,timeout);
}
-TAO_Transport*
-TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
- TAO_Transport_Descriptor_Interface *desc,
- ACE_Time_Value *timeout)
+bool
+TAO_Connector::wait_for_transport(TAO::Profile_Transport_Resolver *r,
+ TAO_Transport *transport,
+ ACE_Time_Value *timeout,
+ bool force_wait)
{
- if (desc == 0 ||
- (this->set_validate_endpoint (desc->endpoint ()) == -1))
- return 0;
-
- TAO_Transport *base_transport = 0;
-
- TAO::Transport_Cache_Manager &tcm =
- this->orb_core ()->lane_resources ().transport_cache ();
-
- // Check the Cache first for connections
- // If transport found, reference count is incremented on assignment
- // @@todo: We need to send the timeout value to the cache registry
- // too. That should be the next step!
- if (tcm.find_transport (desc,
- base_transport) != 0)
+ if (transport->connection_handler ()->is_timeout ())
{
- // @@TODO: This is not the right place for this!
- // Purge connections (if necessary)
- tcm.purge ();
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ")
+ ACE_TEXT("transport [%d], Connection Timed out.\n"),
+ transport->id () ));
+ }
+ }
+ else if (transport->connection_handler()->is_closed ())
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ")
+ ACE_TEXT("transport [%d], Connection failed. (%d)\n"),
+ transport->id (),
+ errno));
+ }
- TAO_Transport* t = this->make_connection (r, *desc, timeout);
+ // purge from the connection cache. If we are not in the
+ // cache, this does nothing.
+ transport->purge_entry ();
- if (t == 0)
- return t;
+ // Close the handler.
+ transport->close_connection ();
- t->opened_as (TAO::TAO_CLIENT_ROLE);
+ }
- if (TAO_debug_level > 4)
+ if (transport->connection_handler ()->is_open ())
+ {
+ if (TAO_debug_level > 5)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT("TAO (%P|%t) - Transport_Connector::connect, ")
- ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"),
- t->id ()));
+ ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ")
+ ACE_TEXT("transport [%d], connection is open: no wait.\n"),
+ transport->id () ));
}
- // Call post connect hook. If the post_connect_hook () returns
- // false, just purge the entry.
- if (!t->post_connect_hook ())
- {
- (void) t->purge_entry ();
-
- // Call connect again
- return this->connect (r, desc, timeout);
- }
-
- return t;
+ return true;
}
+ else if (force_wait || r->blocked_connect ())
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ")
+ ACE_TEXT(" waiting on transport [%d]\n"),
+ transport->id () ));
+ }
+ int result = this->active_connect_strategy_->wait (transport, timeout);
+ if (result == -1 && errno == ETIME)
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ")
+ ACE_TEXT(" timeout while waiting on transport [%d]\n"),
+ transport->id () ));
+ }
+ }
+ else if(result == -1)
+ {
+ if (TAO_debug_level > 2)
+ {
+ static int complain10times = 10;
+ if (complain10times > 0)
+ {
+ complain10times -= 1;
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ")
+ ACE_TEXT(" unknown error waiting on transport [%d] (%d)\n"),
+ transport->id (),
+ errno));
+ }
+ }
+ // purge from the connection cache. If we are not in the
+ // cache, this does nothing.
+ transport->purge_entry ();
- if (TAO_debug_level > 4)
- {
- TAO::Connection_Role cr = base_transport->opened_as ();
+ // Close the handler.
+ transport->close_connection ();
+ }
+ else
+ {
+ if (TAO_debug_level > 5)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ")
+ ACE_TEXT("transport [%d], wait completed ok.\n"),
+ transport->id () ));
+ }
+ return true;
+ }
+ }
+ else
+ {
+ if (TAO_debug_level > 2)
+ {
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport_Connector::connect, "
- "got an existing %s Transport[%d] in role %s\n",
- base_transport->is_connected () ? "connected" : "unconnected",
- base_transport->id (),
- cr == TAO::TAO_SERVER_ROLE ? "TAO_SERVER_ROLE" :
- cr == TAO::TAO_CLIENT_ROLE ? "TAO_CLIENT_ROLE" :
- "TAO_UNSPECIFIED_ROLE" ));
+ ACE_TEXT("TAO (%P|%t) - TAO_Connector::wait_for_transport, ")
+ ACE_TEXT(" Connection not complete [%d]\n"),
+ transport->id () ));
}
+ transport->connection_handler ()->reset_state(
+ TAO_LF_Event::LFS_CONNECTION_WAIT);
- // If connected return.
- if (base_transport->is_connected ())
- return base_transport;
+ return true;
+ }
- // It it possible to get a transport from the cache that is not
- // connected? If not, then the following code is bogus. We cannot
- // wait for a connection to complete on a transport in the cache.
- //
- // (mesnier_p@ociweb.com) It is indeed possible to reach this point.
- // The AMI_Buffering test does. When using non-blocking connects and
- // the first request(s) are asynch and may be queued, the connection
- // establishment may not be completed by the time the invocation is
- // done with it. In that case it is up to a subsequent invocation to
- // handle the connection completion.
-
- TransportCleanupGuard tg(base_transport);
- if (!this->wait_for_connection_completion (r, base_transport, timeout))
- {
- if (TAO_debug_level > 2)
+ return false;
+}
+
+TAO_Transport*
+TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
+ TAO_Transport_Descriptor_Interface *desc,
+ ACE_Time_Value *timeout)
+{
+ if (desc == 0 ||
+ (this->set_validate_endpoint (desc->endpoint ()) == -1))
+ return 0;
+
+ TAO::Transport_Cache_Manager &tcm =
+ this->orb_core ()->lane_resources ().transport_cache ();
+ // Stay in this loop until we find:
+ // a usable connection, or a timeout happens
+ while (true)
+ {
+ // Find a connection in the cache
+ // If transport found, reference count is incremented on assignment
+ TAO_Transport *base_transport = 0;
+ size_t busy_count = 0;
+ TAO::Transport_Cache_Manager::Find_Result found =
+ tcm.find_transport (desc,
+ base_transport,
+ busy_count);
+
+ if (found == TAO::Transport_Cache_Manager::CACHE_FOUND_AVAILABLE )
{
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport_Connector::"
- "connect, "
- "wait for completion failed\n"));
+ // one last check before using the cached connection
+ if (base_transport->connection_handler()->error_detected ())
+ {
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) Transport_Connector::connect")
+ ACE_TEXT(" error in transport from cache\n")));
+ }
+ (void) base_transport->close_connection ();
+ (void) base_transport->purge_entry ();
+ base_transport = 0;
+ }
+ else
+ {
+ if (TAO_debug_level > 4)
+ {
+ TAO::Connection_Role cr = base_transport->opened_as ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::connect, "
+ "got an existing %s Transport[%d] in role %s\n",
+ base_transport->is_connected () ? "connected" : "unconnected",
+ base_transport->id (),
+ cr == TAO::TAO_SERVER_ROLE ? "TAO_SERVER_ROLE" :
+ cr == TAO::TAO_CLIENT_ROLE ? "TAO_CLIENT_ROLE" :
+ "TAO_UNSPECIFIED_ROLE" ));
+ }
+ return base_transport;
+ }
}
- return 0;
- }
-
- if (base_transport->is_connected () &&
- base_transport->wait_strategy ()->register_handler () == -1)
- {
- // Registration failures.
- if (TAO_debug_level > 0)
+ else if (found == TAO::Transport_Cache_Manager::CACHE_FOUND_CONNECTING )
{
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport_Connector [%d]::connect, "
- "could not register the transport "
- "in the reactor.\n",
- base_transport->id ()));
+ if (r->blocked_connect ())
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - Transport_Connector::waiting")
+ ACE_TEXT(" for connection on transport [%d]\n"),
+ base_transport->id ()));
+ }
+
+ // If wait_for_transport returns no errors, the base_transport
+ // points to the connection we wait for.
+ if (this->wait_for_transport (r, base_transport, timeout, false))
+ {
+ // be sure this transport is registered with the reactor
+ // before using it.
+ if (!base_transport->register_if_necessary ())
+ {
+ base_transport->remove_reference ();
+ return 0;
+ }
+ }
+ // In either success or failure cases of wait_for_transport, the
+ // ref counter in corresponding to the ref counter added by
+ // find_transport is decremented.
+ base_transport->remove_reference ();
+ }
+ else
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - Transport_Connector::non-blocking: returning unconnected transport [%d]\n"),
+ base_transport->id ()));
+ }
+
+ // return the transport in it's current, unconnected state
+ return base_transport;
+ }
+ }
+ else
+ {
+ // @todo: This is not the right place for this! (bugzilla 3023)
+ // Purge connections (if necessary)
+ tcm.purge ();
+ bool make_new_connection =
+ (found == TAO::Transport_Cache_Manager::CACHE_FOUND_NONE) ||
+ (found == TAO::Transport_Cache_Manager::CACHE_FOUND_BUSY
+ && this->new_connection_is_ok (busy_count));
+
+ if (make_new_connection)
+ {
+ // we aren't going to use the transport returned from the cache (if any)
+ if (base_transport != 0)
+ {
+ base_transport->remove_reference ();
+ }
+
+ base_transport = this->make_connection (r, *desc, timeout);
+ if (base_transport == 0)
+ {
+ return base_transport;
+ }
+
+ // Should this code be moved? If so, where to?
+ base_transport->opened_as (TAO::TAO_CLIENT_ROLE);
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - Transport_Connector::connect, ")
+ ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"),
+ base_transport->id ()));
+ }
+
+ // Call post connect hook. If the post_connect_hook () returns
+ // false, just purge the entry.
+ if (!base_transport->post_connect_hook ())
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - Post_connect_hook failed. ")
+ ACE_TEXT("Purging transport[%d]\n"),
+ base_transport->id ()));
+ }
+ (void) base_transport->purge_entry ();
+ }
+ // The new transport is in the cache. We'll pick it up from there
+ // next time thru this loop (using it from here causes more problems
+ // than it fixes due to the changes that allow a new connection to be
+ // re-used by a nested upcall before we get back here.)
+ base_transport->remove_reference ();
+ }
+ else // not making new connection
+ {
+ (void) this->wait_for_transport (r, base_transport, timeout, true);
+ base_transport->remove_reference ();
+ }
}
- return 0;
}
-
- tg.down ();
- return base_transport;
}
bool
TAO_Connector::wait_for_connection_completion (
TAO::Profile_Transport_Resolver *r,
+ TAO_Transport_Descriptor_Interface& desc,
TAO_Transport *&transport,
ACE_Time_Value *timeout)
{
@@ -516,6 +646,18 @@ TAO_Connector::wait_for_connection_completion (
}
else
{
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "caching connection before wait_for_connection_completion "
+ "%d = [%d]\n",
+ desc.hash(),
+ transport->id ()));
+ }
+ TAO::Transport_Cache_Manager &tcm =
+ this->orb_core ()->lane_resources ().transport_cache ();
+ tcm.cache_transport(&desc, transport, TAO::ENTRY_CONNECTING);
if (TAO_debug_level > 2)
{
ACE_DEBUG ((LM_DEBUG,
@@ -525,6 +667,7 @@ TAO_Connector::wait_for_connection_completion (
"[%d]\n",
transport->id ()));
}
+
result = this->active_connect_strategy_->wait (transport, timeout);
if (TAO_debug_level > 2)
@@ -721,6 +864,18 @@ TAO_Connector::create_connect_strategy (void)
return 0;
}
+bool
+TAO_Connector::new_connection_is_ok (size_t busy_count)
+{
+ if (orb_core_ == 0)
+ return true;
+
+ unsigned int mux_limit = orb_core_->resource_factory ()
+ ->max_muxed_connections ();
+
+ return mux_limit == 0 || busy_count < mux_limit;
+}
+
int
TAO_Connector::check_connection_closure (
TAO_Connection_Handler *connection_handler)