summaryrefslogtreecommitdiff
path: root/TAO/tao/IIOP_Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/IIOP_Connector.cpp')
-rw-r--r--TAO/tao/IIOP_Connector.cpp307
1 files changed, 270 insertions, 37 deletions
diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp
index 91cd5db0599..dd7aa107fe7 100644
--- a/TAO/tao/IIOP_Connector.cpp
+++ b/TAO/tao/IIOP_Connector.cpp
@@ -10,11 +10,14 @@
#include "tao/Connect_Strategy.h"
#include "tao/Thread_Lane_Resources.h"
#include "tao/Profile_Transport_Resolver.h"
+#include "tao/Base_Transport_Property.h"
#include "tao/Transport.h"
#include "tao/Wait_Strategy.h"
#include "tao/SystemException.h"
+#include "tao/LF_Multi_Event.h"
#include "ace/OS_NS_strings.h"
#include "ace/OS_NS_string.h"
+#include "ace/OS_NS_time.h"
ACE_RCSID (tao,
IIOP_Connector,
@@ -22,6 +25,51 @@ ACE_RCSID (tao,
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+
+//-----------------------------------------------------------------------------
+
+/**
+ * @class TAO_Event_Handler_Array_var
+ *
+ * @brief Auto pointer like class for an array of Event Handlers.
+ *
+ * Used to manage lifecycle of handlers. This class calls
+ * ACE_Event_Handler::remove_reference() on each handler in its destructor
+ * This class started out life as a replacement for the ACE_Event_Handle_var
+ * but is now pared down to be very specific in its role..
+ */
+class TAO_IIOP_Connection_Handler_Array_Guard
+{
+public:
+ TAO_IIOP_Connection_Handler_Array_Guard (TAO_IIOP_Connection_Handler **p, unsigned count);
+ ~TAO_IIOP_Connection_Handler_Array_Guard (void);
+
+private:
+ /// Handler.
+ TAO_IIOP_Connection_Handler **ptr_;
+ unsigned count_;
+};
+
+TAO_IIOP_Connection_Handler_Array_Guard::TAO_IIOP_Connection_Handler_Array_Guard (TAO_IIOP_Connection_Handler **p,
+ unsigned count)
+ : ptr_ (p),
+ count_ (count)
+{
+}
+
+TAO_IIOP_Connection_Handler_Array_Guard::~TAO_IIOP_Connection_Handler_Array_Guard (void)
+{
+ if (this->ptr_ != 0)
+ {
+ for (unsigned i = 0; i < this->count_; i++)
+ this->ptr_[i]->remove_reference ();
+ }
+}
+
+//---------------------------------------------------------------------------
+
+
TAO_IIOP_Connector::~TAO_IIOP_Connector (void)
{
}
@@ -81,6 +129,12 @@ TAO_IIOP_Connector::close (void)
}
int
+TAO_IIOP_Connector::supports_parallel_connects(void) const
+{
+ return 1;
+}
+
+int
TAO_IIOP_Connector::set_validate_endpoint (TAO_Endpoint *endpoint)
{
TAO_IIOP_Endpoint *iiop_endpoint =
@@ -105,8 +159,8 @@ TAO_IIOP_Connector::set_validate_endpoint (TAO_Endpoint *endpoint)
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) IIOP connection failed.\n")
- ACE_TEXT ("TAO (%P|%t) This is most likely ")
+ ACE_TEXT ("(%P|%t) IIOP connection failed.\n")
+ ACE_TEXT (" This is most likely ")
ACE_TEXT ("due to a hostname lookup ")
ACE_TEXT ("failure.\n")));
}
@@ -122,12 +176,124 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
TAO_Transport_Descriptor_Interface &desc,
ACE_Time_Value *timeout)
{
+ TAO_IIOP_Connection_Handler *svc_handler = 0;
TAO_IIOP_Endpoint *iiop_endpoint =
- this->remote_endpoint (desc.endpoint ());
-
+ this->remote_endpoint (desc.endpoint());
+ int result = -1;
if (iiop_endpoint == 0)
return 0;
+ result = this->begin_connection (svc_handler, r, iiop_endpoint, timeout);
+
+ if (result == -1 && errno != EWOULDBLOCK)
+ {
+ // connect completed unsuccessfully
+ svc_handler->remove_reference();
+ // Give users a clue to the problem.
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ")
+ ACE_TEXT("connection to <%s:%d> failed (%p)\n"),
+ ACE_TEXT_CHAR_TO_TCHAR (iiop_endpoint->host ()),
+ iiop_endpoint->port (),
+ ACE_TEXT("errno")));
+ }
+ return 0;
+ }
+ TAO_IIOP_Connection_Handler **sh_ptr = &svc_handler;
+ TAO_IIOP_Endpoint **ep_ptr = &iiop_endpoint;
+ TAO_LF_Multi_Event mev;
+ mev.add_event(svc_handler);
+ return this->complete_connection (result, sh_ptr, ep_ptr, 1U, r, &mev, timeout);
+}
+
+TAO_Transport *
+TAO_IIOP_Connector::make_parallel_connection (TAO::Profile_Transport_Resolver *r,
+ TAO_Transport_Descriptor_Interface &desc,
+ ACE_Time_Value *timeout)
+{
+ TAO_Endpoint *root_ep = desc.endpoint();
+ unsigned max_count = 1;
+ unsigned long ns_stagger =
+ this->orb_core()->orb_params()->parallel_connect_delay();
+ unsigned long sec_stagger = ns_stagger/1000;
+ ns_stagger = (ns_stagger % 1000) * 1000000;
+ for (TAO_Endpoint *ep = root_ep->next_filtered (this->orb_core(),0);
+ ep != 0;
+ ep = ep->next_filtered(this->orb_core(),root_ep))
+ max_count++;
+
+ if (TAO_debug_level > 2)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::")
+ ACE_TEXT ("make_parallel_connection, ")
+ ACE_TEXT ("to %d endpoints\n"), max_count));
+ TAO_IIOP_Endpoint **eplist = 0;
+ TAO_IIOP_Connection_Handler **shlist = 0;
+ ACE_NEW_RETURN (shlist,TAO_IIOP_Connection_Handler *[max_count], 0);
+ ACE_NEW_RETURN (eplist, TAO_IIOP_Endpoint *[max_count], 0);
+
+ TAO_LF_Multi_Event mev;
+ int result = 0;
+ unsigned count = 0;
+ for (TAO_Endpoint *ep = root_ep->next_filtered (this->orb_core(),0);
+ ep != 0;
+ ep = ep->next_filtered(this->orb_core(),root_ep))
+ {
+ eplist[count] = this->remote_endpoint (ep);
+ shlist[count] = 0;
+ result = this->begin_connection (shlist[count],
+ r,
+ eplist[count],
+ timeout);
+
+ // The connection may fail because it is slow, or for other reasons.
+ // If it was an incomplete non-blocking connection, add it to the list
+ // to be waited on, otherwise remove the reference to the handler and
+ // move on to the next endpoint.
+ if (result == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ {
+ mev.add_event(shlist[count++]);
+ if (ep->next() != 0)
+ {
+ struct timespec nsleep = {sec_stagger, ns_stagger};
+ ACE_OS::nanosleep (&nsleep);
+ result = this->active_connect_strategy_->poll (&mev);
+ if (result != -1)
+ break;
+ }
+ }
+ else
+ {
+ shlist[count]->remove_reference(); // done bump the list count
+ }
+ continue;
+ }
+
+ if (result != -1) // we have a winner!
+ {
+ count++;
+ break; // no waiting involved since a connection is completed
+ }
+ }
+
+ TAO_Transport *winner = 0;
+ if (count > 0) // only complete if at least one pending or success
+ winner = this->complete_connection (result,shlist,eplist,count,r,&mev,timeout);
+ delete [] shlist; // reference reductions should have been done already
+ delete [] eplist;
+ return winner;
+}
+
+int
+TAO_IIOP_Connector::begin_connection (TAO_IIOP_Connection_Handler *&svc_handler,
+ TAO::Profile_Transport_Resolver *r,
+ TAO_IIOP_Endpoint *iiop_endpoint,
+ ACE_Time_Value *timeout)
+{
const ACE_INET_Addr &remote_address =
iiop_endpoint->object_addr ();
@@ -146,8 +312,8 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
if (TAO_debug_level > 2)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - IIOP_Connector::make_connection, "
- "to <%s:%d> which should %s\n",
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::begin_connection, ")
+ ACE_TEXT ("to <%s:%d> which should %s\n"),
ACE_TEXT_TO_TCHAR_IN(iiop_endpoint->host()),
iiop_endpoint->port(),
r->blocked_connect () ? ACE_TEXT("block") : ACE_TEXT("nonblock")));
@@ -167,7 +333,7 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
timeout = &tmp_zero;
}
- TAO_IIOP_Connection_Handler *svc_handler = 0;
+ svc_handler = 0;
int result =
this->base_connector_.connect (svc_handler,
@@ -193,62 +359,124 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
// once the connect() returns since this might be too late if
// another thread pick up the completion and potentially deletes the
// handler before we get a chance to increment the reference count.
+ return result;
+}
- // Make sure that we always do a remove_reference
- ACE_Event_Handler_var svc_handler_auto_ptr (svc_handler);
-
- TAO_Transport *transport = svc_handler->transport ();
-
- if (result == -1)
+TAO_Transport *
+TAO_IIOP_Connector::complete_connection (int result,
+ TAO_IIOP_Connection_Handler **&sh_list,
+ TAO_IIOP_Endpoint **ep_list,
+ unsigned count,
+ TAO::Profile_Transport_Resolver *r,
+ TAO_LF_Multi_Event *mev,
+ ACE_Time_Value *timeout)
+{
+ // Make sure that we always do a remove_reference for every member
+ // of the list
+ TAO_IIOP_Connection_Handler_Array_Guard svc_handler_auto_ptr (sh_list,count);
+ TAO_Transport *transport = 0;
+ TAO_Transport **tlist = 0;
+ ACE_NEW_RETURN (tlist,TAO_Transport*[count],0);
+
+ // populate the transport list
+ for (unsigned i = 0; i < count; i++)
+ tlist[i] = sh_list[i]->transport();
+
+ if (result != -1)
+ {
+ // We received a compeleted connection and 0 or more pending.
+ // the winner is the last member of the list, because the
+ // iterator stopped on a successful connect.
+ transport = tlist[count-1];
+ }
+ else
{
- // No immediate result, wait for completion
- if (errno == EWOULDBLOCK)
+ if (count == 1)
{
- // Try to wait until connection completion. Incase we block, then we
- // get a connected transport or not. In case of non block we get
- // a connected or not connected transport
+ transport = tlist[0];
if (!this->wait_for_connection_completion (r,
transport,
timeout))
{
if (TAO_debug_level > 2)
- ACE_ERROR ((LM_ERROR, "TAO (%P|%t) - IIOP_Connector::"
- "make_connection, "
- "wait for completion failed\n"));
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::")
+ ACE_TEXT ("complete_connection, wait for completion ")
+ ACE_TEXT ("failed for 1 pending connect\n")));
}
}
else
{
- // Transport is not usable
- transport = 0;
+ if (!this->wait_for_connection_completion (r,
+ transport,
+ tlist,
+ count,
+ mev,
+ timeout))
+ {
+ if (TAO_debug_level > 2)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::")
+ ACE_TEXT ("complete_connection, wait for completion ")
+ ACE_TEXT ("failed for %d pending connects\n"),
+ count));
+ }
+ }
+ }
+ // At this point, the connection has be successfully created
+ // connected or not connected, but we have a connection.
+ TAO_IIOP_Connection_Handler *svc_handler = 0;
+ TAO_IIOP_Endpoint *iiop_endpoint = 0;
+
+ if (transport != 0)
+ {
+ for (unsigned i = 0; i < count; i++)
+ {
+ if (transport == tlist[i])
+ {
+ svc_handler = sh_list[i];
+ iiop_endpoint = ep_list[i];
+ break;
+ }
}
}
+
+ // Done with the transport list
+ delete [] tlist;
+
// In case of errors transport is zero
if (transport == 0)
{
// Give users a clue to the problem.
if (TAO_debug_level > 3)
+ {
+ for (unsigned i = 0; i < count; i++)
ACE_DEBUG ((LM_ERROR,
- "TAO (%P|%t) - IIOP_Connector::make_connection, "
- "connection to <%s:%d> failed (%p)\n",
- iiop_endpoint->host (), iiop_endpoint->port (),
+ ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ")
+ ACE_TEXT("connection to <%s:%d> failed (%p)\n"),
+ ACE_TEXT_CHAR_TO_TCHAR (ep_list[i]->host ()),
+ ep_list[i]->port (),
ACE_TEXT("errno")));
+ }
return 0;
}
- // At this point, the connection has be successfully created
- // connected or not connected, but we have a connection.
+
if (TAO_debug_level > 2)
+ {
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - IIOP_Connector::make_connection, "
- "new %s connection to <%s:%d> on Transport[%d]\n",
- transport->is_connected() ? "connected" : "not connected",
- iiop_endpoint->host (),
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Connector::make_connection, ")
+ ACE_TEXT ("new %s connection to <%s:%d> on Transport[%d]\n"),
+ transport->is_connected() ?
+ ACE_TEXT("connected") : ACE_TEXT("not connected"),
+ ACE_TEXT_CHAR_TO_TCHAR (iiop_endpoint->host ()),
iiop_endpoint->port (),
svc_handler->peer ().get_handle ()));
+ }
+ TAO_Base_Transport_Property desc(iiop_endpoint,0);
// Add the handler to Cache
int retval =
this->orb_core ()->lane_resources ().transport_cache ().cache_transport (
@@ -264,8 +492,8 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
if (TAO_debug_level > 0)
{
ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - IIOP_Connector::make_connection, "
- "could not add the new connection to cache\n"));
+ ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ")
+ ACE_TEXT ("could not add new connection to cache\n")));
}
return 0;
@@ -285,9 +513,9 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - IIOP_Connector [%d]::make_connection, "
- "could not register the transport "
- "in the reactor.\n",
+ ACE_TEXT ("(%P|%t) IIOP_Connector [%d]::make_connection, ")
+ ACE_TEXT ("could not register the transport ")
+ ACE_TEXT ("in the reactor.\n"),
transport->id ()));
return 0;
@@ -395,10 +623,15 @@ TAO_IIOP_Connector::cancel_svc_handler (
// Cancel from the connector
if (handler)
+ {
+ handler->abort();
return this->base_connector_.cancel (handler);
+ }
return -1;
}
+
+
//@@ TAO_CONNECTOR_SPL_COPY_HOOK_END
TAO_END_VERSIONED_NAMESPACE_DECL