diff options
Diffstat (limited to 'TAO/tao/IIOP_Connector.cpp')
-rw-r--r-- | TAO/tao/IIOP_Connector.cpp | 307 |
1 files changed, 270 insertions, 37 deletions
diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp index 91cd5db0599..dd7aa107fe7 100644 --- a/TAO/tao/IIOP_Connector.cpp +++ b/TAO/tao/IIOP_Connector.cpp @@ -10,11 +10,14 @@ #include "tao/Connect_Strategy.h" #include "tao/Thread_Lane_Resources.h" #include "tao/Profile_Transport_Resolver.h" +#include "tao/Base_Transport_Property.h" #include "tao/Transport.h" #include "tao/Wait_Strategy.h" #include "tao/SystemException.h" +#include "tao/LF_Multi_Event.h" #include "ace/OS_NS_strings.h" #include "ace/OS_NS_string.h" +#include "ace/OS_NS_time.h" ACE_RCSID (tao, IIOP_Connector, @@ -22,6 +25,51 @@ ACE_RCSID (tao, TAO_BEGIN_VERSIONED_NAMESPACE_DECL + + +//----------------------------------------------------------------------------- + +/** + * @class TAO_Event_Handler_Array_var + * + * @brief Auto pointer like class for an array of Event Handlers. + * + * Used to manage lifecycle of handlers. This class calls + * ACE_Event_Handler::remove_reference() on each handler in its destructor + * This class started out life as a replacement for the ACE_Event_Handle_var + * but is now pared down to be very specific in its role.. + */ +class TAO_IIOP_Connection_Handler_Array_Guard +{ +public: + TAO_IIOP_Connection_Handler_Array_Guard (TAO_IIOP_Connection_Handler **p, unsigned count); + ~TAO_IIOP_Connection_Handler_Array_Guard (void); + +private: + /// Handler. + TAO_IIOP_Connection_Handler **ptr_; + unsigned count_; +}; + +TAO_IIOP_Connection_Handler_Array_Guard::TAO_IIOP_Connection_Handler_Array_Guard (TAO_IIOP_Connection_Handler **p, + unsigned count) + : ptr_ (p), + count_ (count) +{ +} + +TAO_IIOP_Connection_Handler_Array_Guard::~TAO_IIOP_Connection_Handler_Array_Guard (void) +{ + if (this->ptr_ != 0) + { + for (unsigned i = 0; i < this->count_; i++) + this->ptr_[i]->remove_reference (); + } +} + +//--------------------------------------------------------------------------- + + TAO_IIOP_Connector::~TAO_IIOP_Connector (void) { } @@ -81,6 +129,12 @@ TAO_IIOP_Connector::close (void) } int +TAO_IIOP_Connector::supports_parallel_connects(void) const +{ + return 1; +} + +int TAO_IIOP_Connector::set_validate_endpoint (TAO_Endpoint *endpoint) { TAO_IIOP_Endpoint *iiop_endpoint = @@ -105,8 +159,8 @@ TAO_IIOP_Connector::set_validate_endpoint (TAO_Endpoint *endpoint) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) IIOP connection failed.\n") - ACE_TEXT ("TAO (%P|%t) This is most likely ") + ACE_TEXT ("(%P|%t) IIOP connection failed.\n") + ACE_TEXT (" This is most likely ") ACE_TEXT ("due to a hostname lookup ") ACE_TEXT ("failure.\n"))); } @@ -122,12 +176,124 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, TAO_Transport_Descriptor_Interface &desc, ACE_Time_Value *timeout) { + TAO_IIOP_Connection_Handler *svc_handler = 0; TAO_IIOP_Endpoint *iiop_endpoint = - this->remote_endpoint (desc.endpoint ()); - + this->remote_endpoint (desc.endpoint()); + int result = -1; if (iiop_endpoint == 0) return 0; + result = this->begin_connection (svc_handler, r, iiop_endpoint, timeout); + + if (result == -1 && errno != EWOULDBLOCK) + { + // connect completed unsuccessfully + svc_handler->remove_reference(); + // Give users a clue to the problem. + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ") + ACE_TEXT("connection to <%s:%d> failed (%p)\n"), + ACE_TEXT_CHAR_TO_TCHAR (iiop_endpoint->host ()), + iiop_endpoint->port (), + ACE_TEXT("errno"))); + } + return 0; + } + TAO_IIOP_Connection_Handler **sh_ptr = &svc_handler; + TAO_IIOP_Endpoint **ep_ptr = &iiop_endpoint; + TAO_LF_Multi_Event mev; + mev.add_event(svc_handler); + return this->complete_connection (result, sh_ptr, ep_ptr, 1U, r, &mev, timeout); +} + +TAO_Transport * +TAO_IIOP_Connector::make_parallel_connection (TAO::Profile_Transport_Resolver *r, + TAO_Transport_Descriptor_Interface &desc, + ACE_Time_Value *timeout) +{ + TAO_Endpoint *root_ep = desc.endpoint(); + unsigned max_count = 1; + unsigned long ns_stagger = + this->orb_core()->orb_params()->parallel_connect_delay(); + unsigned long sec_stagger = ns_stagger/1000; + ns_stagger = (ns_stagger % 1000) * 1000000; + for (TAO_Endpoint *ep = root_ep->next_filtered (this->orb_core(),0); + ep != 0; + ep = ep->next_filtered(this->orb_core(),root_ep)) + max_count++; + + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::") + ACE_TEXT ("make_parallel_connection, ") + ACE_TEXT ("to %d endpoints\n"), max_count)); + TAO_IIOP_Endpoint **eplist = 0; + TAO_IIOP_Connection_Handler **shlist = 0; + ACE_NEW_RETURN (shlist,TAO_IIOP_Connection_Handler *[max_count], 0); + ACE_NEW_RETURN (eplist, TAO_IIOP_Endpoint *[max_count], 0); + + TAO_LF_Multi_Event mev; + int result = 0; + unsigned count = 0; + for (TAO_Endpoint *ep = root_ep->next_filtered (this->orb_core(),0); + ep != 0; + ep = ep->next_filtered(this->orb_core(),root_ep)) + { + eplist[count] = this->remote_endpoint (ep); + shlist[count] = 0; + result = this->begin_connection (shlist[count], + r, + eplist[count], + timeout); + + // The connection may fail because it is slow, or for other reasons. + // If it was an incomplete non-blocking connection, add it to the list + // to be waited on, otherwise remove the reference to the handler and + // move on to the next endpoint. + if (result == -1) + { + if (errno == EWOULDBLOCK) + { + mev.add_event(shlist[count++]); + if (ep->next() != 0) + { + struct timespec nsleep = {sec_stagger, ns_stagger}; + ACE_OS::nanosleep (&nsleep); + result = this->active_connect_strategy_->poll (&mev); + if (result != -1) + break; + } + } + else + { + shlist[count]->remove_reference(); // done bump the list count + } + continue; + } + + if (result != -1) // we have a winner! + { + count++; + break; // no waiting involved since a connection is completed + } + } + + TAO_Transport *winner = 0; + if (count > 0) // only complete if at least one pending or success + winner = this->complete_connection (result,shlist,eplist,count,r,&mev,timeout); + delete [] shlist; // reference reductions should have been done already + delete [] eplist; + return winner; +} + +int +TAO_IIOP_Connector::begin_connection (TAO_IIOP_Connection_Handler *&svc_handler, + TAO::Profile_Transport_Resolver *r, + TAO_IIOP_Endpoint *iiop_endpoint, + ACE_Time_Value *timeout) +{ const ACE_INET_Addr &remote_address = iiop_endpoint->object_addr (); @@ -146,8 +312,8 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, if (TAO_debug_level > 2) ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - IIOP_Connector::make_connection, " - "to <%s:%d> which should %s\n", + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::begin_connection, ") + ACE_TEXT ("to <%s:%d> which should %s\n"), ACE_TEXT_TO_TCHAR_IN(iiop_endpoint->host()), iiop_endpoint->port(), r->blocked_connect () ? ACE_TEXT("block") : ACE_TEXT("nonblock"))); @@ -167,7 +333,7 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, timeout = &tmp_zero; } - TAO_IIOP_Connection_Handler *svc_handler = 0; + svc_handler = 0; int result = this->base_connector_.connect (svc_handler, @@ -193,62 +359,124 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, // once the connect() returns since this might be too late if // another thread pick up the completion and potentially deletes the // handler before we get a chance to increment the reference count. + return result; +} - // Make sure that we always do a remove_reference - ACE_Event_Handler_var svc_handler_auto_ptr (svc_handler); - - TAO_Transport *transport = svc_handler->transport (); - - if (result == -1) +TAO_Transport * +TAO_IIOP_Connector::complete_connection (int result, + TAO_IIOP_Connection_Handler **&sh_list, + TAO_IIOP_Endpoint **ep_list, + unsigned count, + TAO::Profile_Transport_Resolver *r, + TAO_LF_Multi_Event *mev, + ACE_Time_Value *timeout) +{ + // Make sure that we always do a remove_reference for every member + // of the list + TAO_IIOP_Connection_Handler_Array_Guard svc_handler_auto_ptr (sh_list,count); + TAO_Transport *transport = 0; + TAO_Transport **tlist = 0; + ACE_NEW_RETURN (tlist,TAO_Transport*[count],0); + + // populate the transport list + for (unsigned i = 0; i < count; i++) + tlist[i] = sh_list[i]->transport(); + + if (result != -1) + { + // We received a compeleted connection and 0 or more pending. + // the winner is the last member of the list, because the + // iterator stopped on a successful connect. + transport = tlist[count-1]; + } + else { - // No immediate result, wait for completion - if (errno == EWOULDBLOCK) + if (count == 1) { - // Try to wait until connection completion. Incase we block, then we - // get a connected transport or not. In case of non block we get - // a connected or not connected transport + transport = tlist[0]; if (!this->wait_for_connection_completion (r, transport, timeout)) { if (TAO_debug_level > 2) - ACE_ERROR ((LM_ERROR, "TAO (%P|%t) - IIOP_Connector::" - "make_connection, " - "wait for completion failed\n")); + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::") + ACE_TEXT ("complete_connection, wait for completion ") + ACE_TEXT ("failed for 1 pending connect\n"))); } } else { - // Transport is not usable - transport = 0; + if (!this->wait_for_connection_completion (r, + transport, + tlist, + count, + mev, + timeout)) + { + if (TAO_debug_level > 2) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::") + ACE_TEXT ("complete_connection, wait for completion ") + ACE_TEXT ("failed for %d pending connects\n"), + count)); + } + } + } + // At this point, the connection has be successfully created + // connected or not connected, but we have a connection. + TAO_IIOP_Connection_Handler *svc_handler = 0; + TAO_IIOP_Endpoint *iiop_endpoint = 0; + + if (transport != 0) + { + for (unsigned i = 0; i < count; i++) + { + if (transport == tlist[i]) + { + svc_handler = sh_list[i]; + iiop_endpoint = ep_list[i]; + break; + } } } + + // Done with the transport list + delete [] tlist; + // In case of errors transport is zero if (transport == 0) { // Give users a clue to the problem. if (TAO_debug_level > 3) + { + for (unsigned i = 0; i < count; i++) ACE_DEBUG ((LM_ERROR, - "TAO (%P|%t) - IIOP_Connector::make_connection, " - "connection to <%s:%d> failed (%p)\n", - iiop_endpoint->host (), iiop_endpoint->port (), + ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ") + ACE_TEXT("connection to <%s:%d> failed (%p)\n"), + ACE_TEXT_CHAR_TO_TCHAR (ep_list[i]->host ()), + ep_list[i]->port (), ACE_TEXT("errno"))); + } return 0; } - // At this point, the connection has be successfully created - // connected or not connected, but we have a connection. + if (TAO_debug_level > 2) + { ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - IIOP_Connector::make_connection, " - "new %s connection to <%s:%d> on Transport[%d]\n", - transport->is_connected() ? "connected" : "not connected", - iiop_endpoint->host (), + ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::make_connection, ") + ACE_TEXT ("new %s connection to <%s:%d> on Transport[%d]\n"), + transport->is_connected() ? + ACE_TEXT("connected") : ACE_TEXT("not connected"), + ACE_TEXT_CHAR_TO_TCHAR (iiop_endpoint->host ()), iiop_endpoint->port (), svc_handler->peer ().get_handle ())); + } + TAO_Base_Transport_Property desc(iiop_endpoint,0); // Add the handler to Cache int retval = this->orb_core ()->lane_resources ().transport_cache ().cache_transport ( @@ -264,8 +492,8 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, if (TAO_debug_level > 0) { ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - IIOP_Connector::make_connection, " - "could not add the new connection to cache\n")); + ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ") + ACE_TEXT ("could not add new connection to cache\n"))); } return 0; @@ -285,9 +513,9 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - IIOP_Connector [%d]::make_connection, " - "could not register the transport " - "in the reactor.\n", + ACE_TEXT ("(%P|%t) IIOP_Connector [%d]::make_connection, ") + ACE_TEXT ("could not register the transport ") + ACE_TEXT ("in the reactor.\n"), transport->id ())); return 0; @@ -395,10 +623,15 @@ TAO_IIOP_Connector::cancel_svc_handler ( // Cancel from the connector if (handler) + { + handler->abort(); return this->base_connector_.cancel (handler); + } return -1; } + + //@@ TAO_CONNECTOR_SPL_COPY_HOOK_END TAO_END_VERSIONED_NAMESPACE_DECL |