summaryrefslogtreecommitdiff
path: root/TAO/tao
diff options
context:
space:
mode:
authorAdam Mitz <mitza-oci@users.noreply.github.com>2006-08-15 15:49:02 +0000
committerAdam Mitz <mitza-oci@users.noreply.github.com>2006-08-15 15:49:02 +0000
commit25a924e9e32dd1fe24047bfa66b293baef5e7648 (patch)
tree0a1115f1046fcc2c35626776f6d4ec85bcf69d6d /TAO/tao
parentdd79b6cb78943f5f29b6be3d20083d9c4b2f46cc (diff)
downloadATCD-25a924e9e32dd1fe24047bfa66b293baef5e7648.tar.gz
Tue Aug 15 14:56:35 UTC 2006 Adam Mitz <mitza@ociweb.com>
Diffstat (limited to 'TAO/tao')
-rw-r--r--TAO/tao/Asynch_Queued_Message.cpp27
-rw-r--r--TAO/tao/Asynch_Queued_Message.h15
-rw-r--r--TAO/tao/Block_Flushing_Strategy.cpp9
-rw-r--r--TAO/tao/Connection_Handler.h5
-rw-r--r--TAO/tao/Connection_Handler.inl6
-rw-r--r--TAO/tao/Flushing_Strategy.h6
-rw-r--r--TAO/tao/IIOP_Connection_Handler.cpp4
-rw-r--r--TAO/tao/IIOP_Connector.cpp36
-rw-r--r--TAO/tao/Invocation_Adapter.cpp13
-rw-r--r--TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp11
-rw-r--r--TAO/tao/Messaging/Messaging_Policy_i.cpp7
-rw-r--r--TAO/tao/Profile_Transport_Resolver.cpp55
-rw-r--r--TAO/tao/Queued_Message.cpp6
-rw-r--r--TAO/tao/Queued_Message.h15
-rw-r--r--TAO/tao/Strategies/SCIOP_Connection_Handler.cpp4
-rw-r--r--TAO/tao/Strategies/SCIOP_Connector.cpp16
-rw-r--r--TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp4
-rw-r--r--TAO/tao/Strategies/UIOP_Connection_Handler.cpp4
-rw-r--r--TAO/tao/Strategies/UIOP_Connector.cpp15
-rw-r--r--TAO/tao/Synch_Invocation.cpp2
-rw-r--r--TAO/tao/Synch_Queued_Message.cpp5
-rw-r--r--TAO/tao/Transport.cpp142
-rw-r--r--TAO/tao/Transport.h10
-rw-r--r--TAO/tao/Transport_Connector.cpp271
-rw-r--r--TAO/tao/Transport_Queueing_Strategies.cpp14
25 files changed, 447 insertions, 255 deletions
diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp
index 7c6ec5020e8..d21cc0a65ca 100644
--- a/TAO/tao/Asynch_Queued_Message.cpp
+++ b/TAO/tao/Asynch_Queued_Message.cpp
@@ -8,7 +8,7 @@
#include "ace/Log_Msg.h"
#include "ace/Message_Block.h"
#include "ace/Malloc_Base.h"
-
+#include "ace/High_Res_Timer.h"
ACE_RCSID (tao,
Asynch_Queued_Message,
@@ -19,12 +19,16 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (
const ACE_Message_Block *contents,
TAO_ORB_Core *oc,
+ ACE_Time_Value *timeout,
ACE_Allocator *alloc,
bool is_heap_allocated)
: TAO_Queued_Message (oc, alloc, is_heap_allocated)
, size_ (contents->total_length ())
, offset_ (0)
+ , abs_timeout_ (ACE_Time_Value::zero)
{
+ if (timeout != 0 && *timeout != ACE_Time_Value::zero)
+ this->abs_timeout_ = ACE_High_Res_Timer::gettimeofday_hr () + *timeout;
// @@ Use a pool for these guys!!
ACE_NEW (this->buffer_, char[this->size_]);
@@ -43,11 +47,13 @@ TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (
TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf,
TAO_ORB_Core *oc,
size_t size,
+ const ACE_Time_Value &abs_timeout,
ACE_Allocator *alloc)
- : TAO_Queued_Message (oc, alloc)
+ : TAO_Queued_Message (oc, alloc, 0)
, size_ (size)
, offset_ (0)
, buffer_ (buf)
+ , abs_timeout_ (abs_timeout)
{
}
@@ -133,6 +139,7 @@ TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
TAO_Asynch_Queued_Message (buf,
this->orb_core_,
sz,
+ this->abs_timeout_,
alloc),
0);
}
@@ -150,7 +157,8 @@ TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
ACE_NEW_RETURN (qm,
TAO_Asynch_Queued_Message (buf,
this->orb_core_,
- sz),
+ sz,
+ this->abs_timeout_),
0);
}
@@ -180,7 +188,20 @@ TAO_Asynch_Queued_Message::destroy (void)
delete this;
}
}
+}
+bool
+TAO_Asynch_Queued_Message::is_expired (const ACE_Time_Value &now) const
+{
+ if (this->abs_timeout_ > ACE_Time_Value::zero)
+ {
+ if (this->offset_ > 0)
+ {
+ return false; //never expire partial messages
+ }
+ return this->abs_timeout_ < now;
+ }
+ return false;
}
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h
index 045f3dcd4fd..991a9f07063 100644
--- a/TAO/tao/Asynch_Queued_Message.h
+++ b/TAO/tao/Asynch_Queued_Message.h
@@ -17,12 +17,15 @@
#include "tao/Queued_Message.h"
+#include "ace/Time_Value.h"
+
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+class ACE_Message_Block;
/**
* @class TAO_Asynch_Queued_Message
*
@@ -39,6 +42,9 @@ public:
*
* @param alloc Allocator used for creating @c this object.
*
+ * @param timeout The relative timeout after which this
+ * message should be expired.
+ *
* @todo I'm almost sure this class will require a callback
* interface for AMIs sent with SYNC_NONE policy. Those guys
* need to hear when the connection timeouts or closes, but
@@ -46,6 +52,7 @@ public:
*/
TAO_Asynch_Queued_Message (const ACE_Message_Block *contents,
TAO_ORB_Core *oc,
+ ACE_Time_Value *timeout,
ACE_Allocator *alloc = 0,
bool is_heap_allocated = false);
@@ -65,6 +72,7 @@ public:
/// it here for the sake of uniformity.
virtual TAO_Queued_Message *clone (ACE_Allocator *alloc);
virtual void destroy (void);
+ virtual bool is_expired (const ACE_Time_Value &now) const;
//@}
protected:
@@ -78,11 +86,14 @@ protected:
* @param size The size of the buffer <buf> that is being handed
* over.
*
+ * @param abs_timeout The time after which this message should be expired.
+ *
* @param alloc Allocator used for creating <this> object.
*/
TAO_Asynch_Queued_Message (char *buf,
TAO_ORB_Core *oc,
size_t size,
+ const ACE_Time_Value &abs_timeout,
ACE_Allocator *alloc = 0);
private:
/// The number of bytes in the buffer
@@ -97,6 +108,10 @@ private:
/// The buffer containing the complete message.
char *buffer_;
+
+ // Expiration time
+ ACE_Time_Value abs_timeout_;
+
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp
index 8e0282ecfdd..87082f19819 100644
--- a/TAO/tao/Block_Flushing_Strategy.cpp
+++ b/TAO/tao/Block_Flushing_Strategy.cpp
@@ -9,14 +9,9 @@ ACE_RCSID(tao, Block_Flushing_Strategy, "$Id$")
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
int
-TAO_Block_Flushing_Strategy::schedule_output (TAO_Transport *transport)
+TAO_Block_Flushing_Strategy::schedule_output (TAO_Transport *)
{
- while (!transport->queue_is_empty_i ())
- {
- if (transport->drain_queue_i () == -1)
- return -1;
- }
- return 0;
+ return MUST_FLUSH;
}
int
diff --git a/TAO/tao/Connection_Handler.h b/TAO/tao/Connection_Handler.h
index e17ff6af08f..2342390dd5c 100644
--- a/TAO/tao/Connection_Handler.h
+++ b/TAO/tao/Connection_Handler.h
@@ -70,12 +70,15 @@ public:
/// Set the underlying transport object
void transport (TAO_Transport* transport);
- /// Is the handler closed?
+ /// Is the handler closed or timed out?
bool is_closed (void) const;
/// Is the handler open?
bool is_open (void) const;
+ /// Closed due to timeout?
+ bool is_timeout (void) const;
+
/// Is the handler in the process of being connected?
bool is_connecting (void) const;
diff --git a/TAO/tao/Connection_Handler.inl b/TAO/tao/Connection_Handler.inl
index 5841cf3aebc..48e1b0b1b41 100644
--- a/TAO/tao/Connection_Handler.inl
+++ b/TAO/tao/Connection_Handler.inl
@@ -24,6 +24,12 @@ TAO_Connection_Handler::is_closed (void) const
}
ACE_INLINE bool
+TAO_Connection_Handler::is_timeout (void) const
+{
+ return (this->state_ == TAO_LF_Event::LFS_TIMEOUT);
+}
+
+ACE_INLINE bool
TAO_Connection_Handler::is_open (void) const
{
return this->state_ == TAO_LF_Event::LFS_SUCCESS;
diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h
index ead55aec8f2..61c9b51de10 100644
--- a/TAO/tao/Flushing_Strategy.h
+++ b/TAO/tao/Flushing_Strategy.h
@@ -56,7 +56,13 @@ public:
/// Destructor
virtual ~TAO_Flushing_Strategy (void);
+ enum SCHEDULE_OUTPUT_RETURN { MUST_FLUSH = -2 };
+
/// Schedule the transport argument to be flushed
+ /// If -2 is returned then the caller must call one of
+ /// the flush_* methods.
+ /// If -1 is returned then there was an error.
+ /// If 0 is returned then the flush was scheduled successfully.
virtual int schedule_output (TAO_Transport *transport) = 0;
/// Cancel all scheduled output for the transport argument
diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp
index 0f59383abe5..6a56bb9e0b9 100644
--- a/TAO/tao/IIOP_Connection_Handler.cpp
+++ b/TAO/tao/IIOP_Connection_Handler.cpp
@@ -322,7 +322,9 @@ TAO_IIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp
index ec2165a07e4..4d98fe433ff 100644
--- a/TAO/tao/IIOP_Connector.cpp
+++ b/TAO/tao/IIOP_Connector.cpp
@@ -60,6 +60,7 @@ TAO_IIOP_Connection_Handler_Array_Guard::TAO_IIOP_Connection_Handler_Array_Guard
TAO_IIOP_Connection_Handler_Array_Guard::~TAO_IIOP_Connection_Handler_Array_Guard (void)
{
+ ACE_Errno_Guard eguard (errno);
if (this->ptr_ != 0)
{
for (unsigned i = 0; i < this->count_; i++)
@@ -205,9 +206,10 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
TAO_IIOP_Endpoint **ep_ptr = &iiop_endpoint;
TAO_LF_Multi_Event mev;
mev.add_event(svc_handler);
- return this->complete_connection (result, desc,
- sh_ptr, ep_ptr,
- 1U, r, &mev, timeout);
+ TAO_Transport* tp = this->complete_connection (result, desc,
+ sh_ptr, ep_ptr,
+ 1U, r, &mev, timeout);
+ return tp;
}
TAO_Transport *
@@ -327,14 +329,9 @@ TAO_IIOP_Connector::begin_connection (TAO_IIOP_Connection_Handler *&svc_handler,
this->active_connect_strategy_->synch_options (timeout,
synch_options);
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!r->blocked_connect ())
- {
- synch_options.timeout (ACE_Time_Value::zero);
- timeout = &tmp_zero;
- }
+ // The code used to set the timeout to zero, with the intent of
+ // polling the reactor for connection completion. However, the side-effect
+ // was to cause the connection to timeout immediately.
svc_handler = 0;
@@ -427,7 +424,7 @@ TAO_IIOP_Connector::complete_connection (int result,
}
}
}
- // At this point, the connection has be successfully created
+ // At this point, the connection has been successfully created
// connected or not connected, but we have a connection.
TAO_IIOP_Connection_Handler *svc_handler = 0;
TAO_IIOP_Endpoint *iiop_endpoint = 0;
@@ -461,14 +458,15 @@ TAO_IIOP_Connector::complete_connection (int result,
if (TAO_debug_level > 3)
{
for (unsigned i = 0; i < count; i++)
- ACE_DEBUG ((LM_ERROR,
- ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ")
- ACE_TEXT("connection to <%s:%d> failed (%p)\n"),
- ACE_TEXT_CHAR_TO_TCHAR (ep_list[i]->host ()),
- ep_list[i]->port (),
- ACE_TEXT("errno")));
+ {
+ ACE_DEBUG ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection,")
+ ACE_TEXT (" connection to <%s:%d> failed (%p)\n"),
+ ACE_TEXT_CHAR_TO_TCHAR (ep_list[i]->host ()),
+ ep_list[i]->port (),
+ ACE_TEXT("errno")));
+ }
}
-
return 0;
}
diff --git a/TAO/tao/Invocation_Adapter.cpp b/TAO/tao/Invocation_Adapter.cpp
index b26bad9f3f1..bc715ad4bac 100644
--- a/TAO/tao/Invocation_Adapter.cpp
+++ b/TAO/tao/Invocation_Adapter.cpp
@@ -12,6 +12,8 @@
#include "tao/Transport_Mux_Strategy.h"
#include "tao/Collocation_Proxy_Broker.h"
#include "tao/GIOP_Utils.h"
+#include "tao/TAOC.h"
+
#if !defined (__ACE_INLINE__)
# include "tao/Invocation_Adapter.inl"
#endif /* __ACE_INLINE__ */
@@ -64,8 +66,6 @@ namespace TAO
// Initial state
TAO::Invocation_Status status = TAO_INVOKE_START;
- ACE_Time_Value *max_wait_time = 0;
-
while (status == TAO_INVOKE_START ||
status == TAO_INVOKE_RESTART)
{
@@ -87,6 +87,7 @@ namespace TAO
if (strat == TAO_CS_REMOTE_STRATEGY ||
strat == TAO_CS_LAST)
{
+ ACE_Time_Value *max_wait_time = 0;
status =
this->invoke_remote_i (stub,
details,
@@ -261,12 +262,18 @@ namespace TAO
(void) this->set_response_flags (stub,
details);
+ CORBA::Octet rflags = details.response_flags ();
+ bool block_connect =
+ rflags != static_cast<CORBA::Octet> (Messaging::SYNC_NONE)
+ && rflags != static_cast<CORBA::Octet> (TAO::SYNC_EAGER_BUFFERING)
+ && rflags != static_cast<CORBA::Octet> (TAO::SYNC_DELAYED_BUFFERING);
+
// Create the resolver which will pick (or create) for us a
// transport and a profile from the effective_target.
Profile_Transport_Resolver resolver (
effective_target.in (),
stub,
- (details.response_flags () != Messaging::SYNC_NONE));
+ block_connect);
resolver.resolve (max_wait_time
ACE_ENV_ARG_PARAMETER);
diff --git a/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp b/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp
index d17338aae9c..b621a85b375 100644
--- a/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp
+++ b/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp
@@ -103,11 +103,9 @@ TAO_ConnectionTimeoutPolicy::hook (TAO_ORB_Core *orb_core,
if (TAO_debug_level > 0)
{
- CORBA::ULong msecs =
- static_cast<CORBA::ULong> (microseconds / 1000);
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"),
- msecs));
+ ACE_TEXT ("TAO (%P|%t) - Connect timeout is <%dms>\n"),
+ time_value.msec ()));
}
}
ACE_CATCHANY
@@ -188,10 +186,9 @@ TAO_ConnectionTimeoutPolicy::set_time_value (ACE_Time_Value &time_value)
if (TAO_debug_level > 0)
{
- CORBA::ULong const msecs = time_value.msec ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"),
- msecs));
+ ACE_TEXT ("TAO (%P|%t) - Connect timeout is <%dms>\n"),
+ time_value.msec ()));
}
}
diff --git a/TAO/tao/Messaging/Messaging_Policy_i.cpp b/TAO/tao/Messaging/Messaging_Policy_i.cpp
index 35d3d25a37f..39d4cc2b50a 100644
--- a/TAO/tao/Messaging/Messaging_Policy_i.cpp
+++ b/TAO/tao/Messaging/Messaging_Policy_i.cpp
@@ -107,7 +107,7 @@ TAO_RelativeRoundtripTimeoutPolicy::hook (TAO_ORB_Core *orb_core,
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"),
+ ACE_TEXT ("TAO (%P|%t) - Request timeout is <%dms>\n"),
time_value.msec ()));
}
}
@@ -189,10 +189,9 @@ TAO_RelativeRoundtripTimeoutPolicy::set_time_value (ACE_Time_Value &time_value)
if (TAO_debug_level > 0)
{
- CORBA::ULong msecs = time_value.msec ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"),
- msecs));
+ ACE_TEXT ("TAO (%P|%t) - Request timeout is <%dms>\n"),
+ time_value.msec ()));
}
}
diff --git a/TAO/tao/Profile_Transport_Resolver.cpp b/TAO/tao/Profile_Transport_Resolver.cpp
index 81d3a67f52d..30cc30341be 100644
--- a/TAO/tao/Profile_Transport_Resolver.cpp
+++ b/TAO/tao/Profile_Transport_Resolver.cpp
@@ -123,28 +123,28 @@ namespace TAO
bool
Profile_Transport_Resolver::try_connect (
TAO_Transport_Descriptor_Interface *desc,
- ACE_Time_Value *max_time_value
+ ACE_Time_Value *timeout
ACE_ENV_ARG_DECL
)
{
- return this->try_connect_i (desc,max_time_value,0 ACE_ENV_ARG_PARAMETER);
+ return this->try_connect_i (desc, timeout, 0 ACE_ENV_ARG_PARAMETER);
}
bool
Profile_Transport_Resolver::try_parallel_connect (
TAO_Transport_Descriptor_Interface *desc,
- ACE_Time_Value *max_time_value
+ ACE_Time_Value *timeout
ACE_ENV_ARG_DECL
)
{
- return this->try_connect_i (desc,max_time_value,1 ACE_ENV_ARG_PARAMETER);
+ return this->try_connect_i (desc, timeout, 1 ACE_ENV_ARG_PARAMETER);
}
bool
Profile_Transport_Resolver::try_connect_i (
TAO_Transport_Descriptor_Interface *desc,
- ACE_Time_Value *max_time_value,
+ ACE_Time_Value *timeout,
bool parallel
ACE_ENV_ARG_DECL
)
@@ -165,41 +165,40 @@ namespace TAO
}
ACE_Time_Value connection_timeout;
+ bool has_con_timeout = this->get_connection_timeout (connection_timeout);
- bool const is_conn_timeout =
- this->get_connection_timeout (connection_timeout);
-
- ACE_Time_Value *max_wait_time =
- is_conn_timeout ? &connection_timeout : max_time_value;
+ if (has_con_timeout && !blocked_)
+ {
+ timeout = &connection_timeout;
+ }
+ else if (has_con_timeout)
+ {
+ if (timeout == 0 || connection_timeout < *timeout)
+ timeout = &connection_timeout;
+ }
+ else if (!blocked_)
+ {
+ timeout = 0;
+ }
+ TAO_Connector *con = conn_reg->get_connector (desc->endpoint ()->tag ());
+ ACE_ASSERT(con != 0);
if (parallel)
{
- this->transport_ =
- conn_reg->get_connector (desc->endpoint ()->tag ())->
- parallel_connect (this,
- desc,
- max_wait_time
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (false);
+ this->transport_ = con->parallel_connect (this, desc, timeout
+ ACE_ENV_ARG_PARAMETER);
}
else
{
- // Obtain a connection.
- this->transport_ =
- conn_reg->get_connector (desc->endpoint ()->tag ())->
- connect (this,
- desc,
- max_wait_time
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (false);
+ this->transport_ = con->connect (this, desc, timeout
+ ACE_ENV_ARG_PARAMETER);
}
+ ACE_CHECK_RETURN (false);
// A timeout error occurred.
// If the user has set a roundtrip timeout policy, throw a timeout
// exception. Otherwise, just fall through and return false to
// look at the next endpoint.
- if (this->transport_ == 0
- && is_conn_timeout == false
- && errno == ETIME)
+ if (this->transport_ == 0 && errno == ETIME)
{
ACE_THROW_RETURN (CORBA::TIMEOUT (
CORBA::SystemException::_tao_minor_code (
diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp
index 8b4b73072c9..19cba1406a6 100644
--- a/TAO/tao/Queued_Message.cpp
+++ b/TAO/tao/Queued_Message.cpp
@@ -102,4 +102,10 @@ TAO_Queued_Message::push_front (TAO_Queued_Message *&head,
}
}
+bool
+TAO_Queued_Message::is_expired (const ACE_Time_Value &) const
+{
+ return false;
+}
+
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
index 70defaa15cd..c8e8883fd11 100644
--- a/TAO/tao/Queued_Message.h
+++ b/TAO/tao/Queued_Message.h
@@ -17,6 +17,7 @@
#include "tao/LF_Invocation_Event.h"
#include "ace/os_include/os_stddef.h"
+#include "ace/Time_Value.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -76,8 +77,8 @@ class TAO_Export TAO_Queued_Message : public TAO_LF_Invocation_Event
public:
/// Constructor
TAO_Queued_Message (TAO_ORB_Core *oc,
- ACE_Allocator *alloc = 0,
- bool is_heap_allocated = false);
+ ACE_Allocator *alloc,
+ bool is_heap_allocated);
/// Destructor
virtual ~TAO_Queued_Message (void);
@@ -194,6 +195,16 @@ public:
* a pool), they need to be reclaimed explicitly.
*/
virtual void destroy (void) = 0;
+
+ /// Check for timeout
+ /**
+ * @param now Pass in the current time using
+ * ACE_High_Res_Timer::gettimeofday_hr().
+ * This is a parameter in order to avoid calling gettimeofday_hr() inside
+ * of this method (which will be called in a tight loop).
+ * @return true if the relative roundtrip timeout has expired.
+ */
+ virtual bool is_expired (const ACE_Time_Value &now) const;
//@}
protected:
diff --git a/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp
index dd83f2fac6e..5df628bcc5c 100644
--- a/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp
@@ -250,7 +250,9 @@ TAO_SCIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/tao/Strategies/SCIOP_Connector.cpp b/TAO/tao/Strategies/SCIOP_Connector.cpp
index 663bccdb60a..70f61e87ae0 100644
--- a/TAO/tao/Strategies/SCIOP_Connector.cpp
+++ b/TAO/tao/Strategies/SCIOP_Connector.cpp
@@ -172,14 +172,9 @@ TAO_SCIOP_Connector::make_connection_i (TAO::Profile_Transport_Resolver *r,
this->active_connect_strategy_->synch_options (timeout,
synch_options);
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!r->blocked_connect())
- {
- synch_options.timeout (ACE_Time_Value::zero);
- timeout = &tmp_zero;
- }
+ // The code used to set the timeout to zero, with the intent of
+ // polling the reactor for connection completion. However, the side-effect
+ // was to cause the connection to timeout immediately.
TAO_SCIOP_Connection_Handler *svc_handler = 0;
@@ -268,6 +263,11 @@ TAO_SCIOP_Connector::make_connection_i (TAO::Profile_Transport_Resolver *r,
return 0;
}
+ if (transport->connection_handler ()->keep_waiting ())
+ {
+ svc_handler->add_reference ();
+ }
+
// At this point, the connection has be successfully connected.
// #REFCOUNT# is one.
if (TAO_debug_level > 2)
diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
index 788407dfd7d..a584be7e368 100644
--- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
@@ -204,7 +204,9 @@ TAO_SHMIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
index ada81ab9075..f7f754cb197 100644
--- a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
@@ -183,7 +183,9 @@ TAO_UIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/tao/Strategies/UIOP_Connector.cpp b/TAO/tao/Strategies/UIOP_Connector.cpp
index efce00f17d2..3bbec64d87e 100644
--- a/TAO/tao/Strategies/UIOP_Connector.cpp
+++ b/TAO/tao/Strategies/UIOP_Connector.cpp
@@ -169,14 +169,9 @@ TAO_UIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
this->active_connect_strategy_->synch_options (max_wait_time,
synch_options);
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!r->blocked_connect ())
- {
- synch_options.timeout (ACE_Time_Value::zero);
- max_wait_time = &tmp_zero;
- }
+ // The code used to set the timeout to zero, with the intent of
+ // polling the reactor for connection completion. However, the side-effect
+ // was to cause the connection to timeout immediately.
TAO_UIOP_Connection_Handler *svc_handler = 0;
@@ -248,6 +243,10 @@ TAO_UIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
return 0;
}
+ if (transport->connection_handler ()->keep_waiting ())
+ {
+ svc_handler->add_reference ();
+ }
// At this point, the connection has be successfully created
// connected or not connected, but we have a connection.
diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp
index bff4141a033..e38207cdbfa 100644
--- a/TAO/tao/Synch_Invocation.cpp
+++ b/TAO/tao/Synch_Invocation.cpp
@@ -772,7 +772,7 @@ namespace TAO
"TAO (%P|%t) - Synch_Oneway_Invocation::"
"remote_oneway, queueing message\n"));
- if (transport->format_queue_message (cdr) != 0)
+ if (transport->format_queue_message (cdr, max_wait_time) != 0)
s = TAO_INVOKE_FAILURE;
}
diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp
index 59fecb37311..afa14627a05 100644
--- a/TAO/tao/Synch_Queued_Message.cpp
+++ b/TAO/tao/Synch_Queued_Message.cpp
@@ -129,7 +129,8 @@ TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc)
alloc->malloc (sizeof (TAO_Synch_Queued_Message))),
TAO_Synch_Queued_Message (mb,
this->orb_core_,
- alloc),
+ alloc,
+ 0),
0);
}
else
@@ -144,7 +145,7 @@ TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc)
}
ACE_NEW_RETURN (qm,
- TAO_Synch_Queued_Message (mb, this->orb_core_),
+ TAO_Synch_Queued_Message (mb, this->orb_core_, 0, 0),
0);
}
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 7f3c7ebf4b2..3fcb0b164a9 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -27,6 +27,7 @@
#include "ace/OS_NS_stdio.h"
#include "ace/Reactor.h"
#include "ace/os_include/sys/os_uio.h"
+#include "ace/High_Res_Timer.h"
/*
* Specialization hook to add include files from
@@ -478,12 +479,13 @@ TAO_Transport::handle_output (void)
}
int
-TAO_Transport::format_queue_message (TAO_OutputCDR &stream)
+TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
+ ACE_Time_Value *max_wait_time)
{
if (this->messaging_object ()->format_message (stream) != 0)
return -1;
- return this->queue_message_i (stream.begin());
+ return this->queue_message_i (stream.begin (), max_wait_time);
}
int
@@ -503,7 +505,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
size_t &bytes_transferred,
ACE_Time_Value *)
{
- size_t const total_length = mb->total_length ();
+ const size_t total_length = mb->total_length ();
// We are going to block, so there is no need to clone
// the message block.
@@ -512,7 +514,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
synch_message.push_back (this->head_, this->tail_);
- int const n = this->drain_queue_i ();
+ const int n = this->drain_queue_i ();
if (n == -1)
{
@@ -541,14 +543,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// We are going to block, so there is no need to clone
// the message block.
TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
+ const size_t message_length = synch_message.message_length ();
- // Push synch_message on to the back of the queue.
synch_message.push_back (this->head_, this->tail_);
- int const n =
- this->send_synch_message_helper_i (synch_message,
- max_wait_time);
-
+ const int n = this->send_synch_message_helper_i (synch_message,
+ 0 /*ignored*/);
if (n == -1 || n == 1)
{
return n;
@@ -558,18 +558,30 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// if (max_wait_time != 0 && errno == ETIME) return -1;
TAO_Flushing_Strategy *flushing_strategy =
this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
+ int result = flushing_strategy->schedule_output (this);
+ if (result == -1)
+ {
+ synch_message.remove_from_list (this->head_, this->tail_);
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
+ ACE_TEXT ("send_synchronous_message_i, ")
+ ACE_TEXT ("error while scheduling flush - %m\n"),
+ this->id ()));
+ }
+ return -1;
+ }
+
+ // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
+ // because we're always going to flush anyway.
// Release the mutex, other threads may modify the queue as we
// block for a long time writing out data.
- int result;
{
typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
TAO_REVERSE_LOCK reverse (*this->handler_lock_);
- ACE_GUARD_RETURN (TAO_REVERSE_LOCK,
- ace_mon,
- reverse,
- -1);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
result = flushing_strategy->flush_message (this,
&synch_message,
@@ -582,7 +594,8 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
if (errno == ETIME)
{
- if (this->head_ == &synch_message)
+ // If partially sent, then we must queue the remainder.
+ if (message_length != synch_message.message_length ())
{
// This is a timeout, there is only one nasty case: the
// message has been partially sent! We simply cannot take
@@ -597,13 +610,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// and figuring out the request ID would be a bit of a
// nightmare.
//
-
- synch_message.remove_from_list (this->head_, this->tail_);
TAO_Queued_Message *queued_message = 0;
ACE_NEW_RETURN (queued_message,
TAO_Asynch_Queued_Message (
synch_message.current_block (),
this->orb_core_,
+ 0, // no timeout
0,
1),
-1);
@@ -682,6 +694,13 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
msg->remove_from_list (this->head_, this->tail_);
msg->destroy ();
}
+ else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+ (void) flushing_strategy->flush_message(this, msg, 0);
+ }
return 1;
}
@@ -797,7 +816,14 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
TAO_Flushing_Strategy *flushing_strategy =
this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
+ int result = flushing_strategy->schedule_output (this);
+ if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+ (void) flushing_strategy->flush_transport (this);
+ }
}
return 0;
@@ -920,8 +946,30 @@ TAO_Transport::drain_queue_i (void)
// call.
this->sent_byte_count_ = 0;
+ // Avoid calling this expensive function each time through the loop. Instead
+ // we'll assume that the time is unlikely to change much during the loop.
+ // If we are forced to send in the loop then we'll recompute the time.
+ ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
+
while (i != 0)
{
+ if (i->is_expired (now))
+ {
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
+ ACE_TEXT ("Discarding expired queued message.\n"),
+ this->id ()));
+ }
+ TAO_Queued_Message *next = i->next ();
+ i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
+ this->orb_core_->leader_follower ());
+ i->remove_from_list (this->head_, this->tail_);
+ i->destroy ();
+ i = next;
+ continue;
+ }
// ... each element fills the iovector ...
i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
@@ -933,6 +981,8 @@ TAO_Transport::drain_queue_i (void)
int const retval =
this->drain_queue_helper (iovcnt, iov);
+ now = ACE_High_Res_Timer::gettimeofday_hr ();
+
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
@@ -999,11 +1049,19 @@ TAO_Transport::cleanup_queue_i ()
this->id ()));
}
+ int byte_count = 0;
+ int msg_count = 0;
+
// Cleanup all messages
while (this->head_ != 0)
{
TAO_Queued_Message *i = this->head_;
+ if (TAO_debug_level > 4)
+ {
+ byte_count += i->message_length();
+ ++msg_count;
+ }
// @@ This is a good point to insert a flag to indicate that a
// CloseConnection message was successfully received.
i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
@@ -1012,7 +1070,15 @@ TAO_Transport::cleanup_queue_i ()
i->remove_from_list (this->head_, this->tail_);
i->destroy ();
- }
+ }
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
+ ACE_TEXT ("discarded %d messages, %d bytes.\n"),
+ this->id (), msg_count, byte_count));
+ }
}
void
@@ -1232,6 +1298,10 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
return 0;
}
+ // If it was partially sent, then we can't allow a timeout
+ if (byte_count > 0)
+ max_wait_time = 0;
+
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
@@ -1262,14 +1332,16 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
this->id ()));
}
- if (this->queue_message_i(message_block) == -1)
+ if (this->queue_message_i (message_block, max_wait_time) == -1)
{
if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
- ACE_TEXT ("cannot queue message for ")
- ACE_TEXT (" - %m\n"),
- this->id ()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
+ ACE_TEXT ("send_asynchronous_message_i, ")
+ ACE_TEXT ("cannot queue message for - %m\n"),
+ this->id ()));
+ }
return -1;
}
@@ -1289,7 +1361,11 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
if (constraints_reached || try_sending_first)
{
- (void) flushing_strategy->schedule_output (this);
+ int result = flushing_strategy->schedule_output (this);
+ if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ must_flush = true;
+ }
}
if (must_flush)
@@ -1305,12 +1381,14 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
}
int
-TAO_Transport::queue_message_i(const ACE_Message_Block *message_block)
+TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
{
TAO_Queued_Message *queued_message = 0;
ACE_NEW_RETURN (queued_message,
TAO_Asynch_Queued_Message (message_block,
this->orb_core_,
+ max_wait_time,
0,
1),
-1);
@@ -2387,13 +2465,7 @@ TAO_Transport::post_open (size_t id)
// If the wait strategy wants us to be registered with the reactor
// then we do so. If registeration is required and it succeeds,
// #REFCOUNT# becomes two.
- if (this->wait_strategy ()->register_handler () == 0)
- {
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
- }
- else
+ if (this->wait_strategy ()->register_handler () != 0)
{
// Registration failures.
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index a563cc42727..ea5623072c3 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -680,11 +680,17 @@ protected:
ACE_Time_Value *max_wait_time);
/// Queue a message for @a message_block
- int queue_message_i (const ACE_Message_Block *message_block);
+ /// @param max_wait_time The maximum time that the operation can
+ /// block, used in the implementation of timeouts.
+ int queue_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
public:
/// Format and queue a message for @a stream
- int format_queue_message (TAO_OutputCDR &stream);
+ /// @param max_wait_time The maximum time that the operation can
+ /// block, used in the implementation of timeouts.
+ int format_queue_message (TAO_OutputCDR &stream,
+ ACE_Time_Value *max_wait_time);
/// Send a message block chain,
int send_message_block_chain (const ACE_Message_Block *message_block,
diff --git a/TAO/tao/Transport_Connector.cpp b/TAO/tao/Transport_Connector.cpp
index 33313169d3c..24c1cd19a43 100644
--- a/TAO/tao/Transport_Connector.cpp
+++ b/TAO/tao/Transport_Connector.cpp
@@ -302,10 +302,12 @@ TAO_Connector::parallel_connect (TAO::Profile_Transport_Resolver *r,
base_transport) == 0)
{
if (TAO_debug_level)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) TAO_Connector::parallel_connect: ")
- ACE_TEXT ("found a transport [%d]\n"),
- base_transport->id()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) TAO_Connector::parallel_connect: ")
+ ACE_TEXT ("found a transport [%d]\n"),
+ base_transport->id ()));
+ }
return base_transport;
}
}
@@ -357,10 +359,12 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
t->opened_as (TAO::TAO_CLIENT_ROLE);
if (TAO_debug_level > 4)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT("(%P|%t) Transport_Connector::connect, ")
- ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"),
- t->id ()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport_Connector::connect, ")
+ ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"),
+ t->id ()));
+ }
// Call post connect hook. If the post_connect_hook () returns
// false, just purge the entry.
@@ -393,7 +397,6 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
"TAO_UNSPECIFIED_ROLE" ));
}
-
// If connected return.
if (base_transport->is_connected ())
return base_transport;
@@ -414,15 +417,17 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
timeout))
{
if (TAO_debug_level > 2)
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport_Connector::"
- "connect, "
- "wait for completion failed\n"));
+ {
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - Transport_Connector::"
+ "connect, "
+ "wait for completion failed\n"));
+ }
return 0;
}
if (base_transport->is_connected () &&
- base_transport->wait_strategy ()->register_handler () == 0)
+ base_transport->wait_strategy ()->register_handler () == -1)
{
// Registration failures.
@@ -434,12 +439,13 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
(void) base_transport->close_connection ();
if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport_Connector [%d]::connect, "
- "could not register the transport "
- "in the reactor.\n",
- base_transport->id ()));
-
+ {
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - Transport_Connector [%d]::connect, "
+ "could not register the transport "
+ "in the reactor.\n",
+ base_transport->id ()));
+ }
return 0;
}
@@ -452,88 +458,121 @@ TAO_Connector::wait_for_connection_completion (
TAO_Transport *&transport,
ACE_Time_Value *timeout)
{
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport_Connector::"
- "wait_for_connection_completion, "
- "going to wait for connection completion on transport"
- "[%d]\n",
- transport->id ()));
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
+ int result = -1;
if (!r->blocked_connect ())
{
- timeout = &tmp_zero;
- }
-
- // Wait until the connection is ready, when non-blocking we just do a wait
- // with zero time
- int result =
- this->active_connect_strategy_->wait (
- transport,
- timeout);
-
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport_Connector::"
- "wait_for_connection_completion, "
- "transport [%d], wait done result = %d\n",
- transport->id (), result));
-
- // There are three possibilities when wait() returns: (a)
- // connection succeeded; (b) connection failed; (c) wait()
- // failed because of some other error. It is easy to deal with
- // (a) and (b). (c) is tricky since the connection is still
- // pending and may get completed by some other thread. The
- // following method deals with (c).
-
- if (result == -1)
- {
- if (!r->blocked_connect () && errno == ETIME)
+ if (transport->connection_handler ()->is_open ())
{
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport_Connector::"
- "wait_for_connection_completion, "
- "transport [%d], timeout, resetting state.\n",
- transport->id ()));
- transport->connection_handler()->
- reset_state(TAO_LF_Event::LFS_CONNECTION_WAIT);
- // If we did a non blocking connect, just ignore
- // any timeout errors
result = 0;
}
- else
+ else if (transport->connection_handler ()->is_timeout ())
{
- // When we need to get a connected transport
- result =
- this->check_connection_closure (
- transport->connection_handler ());
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], Connection timed out.\n",
+ transport->id ()));
+ }
+ result = -1;
+ errno = ETIME;
}
-
- // In case of errors.
- if (result == -1)
+ else if (transport->connection_handler ()->is_closed ())
{
- // Report that making the connection failed, don't print errno
- // because we touched the reactor and errno could be changed
if (TAO_debug_level > 2)
- ACE_ERROR ((LM_ERROR,
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], Connection failed. (%d) %p\n",
+ transport->id (), errno, ""));
+ }
+ result = -1;
+ }
+ else
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], Connection not complete.\n",
+ transport->id ()));
+ }
+ transport->connection_handler ()->
+ reset_state (TAO_LF_Event::LFS_CONNECTION_WAIT);
+ result = 0;
+ }
+ }
+ else
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport_Connector::"
"wait_for_connection_completion, "
- "transport [%d], wait for completion failed\n",
- transport->id()));
-
-
- // Set transport to zero, it is not usable, and the reference
- // count we added above was decremented by the base connector
- // handling the connection failure.
- transport = 0;
+ "going to wait for connection completion on transport"
+ "[%d]\n",
+ transport->id ()));
+ }
+ result = this->active_connect_strategy_->wait (transport, timeout);
- return false;
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], wait done result = %d\n",
+ transport->id (), result));
}
+ // There are three possibilities when wait() returns: (a)
+ // connection succeeded; (b) connection failed; (c) wait()
+ // failed because of some other error. It is easy to deal with
+ // (a) and (b). (c) is tricky since the connection is still
+ // pending and may get completed by some other thread. The
+ // following code deals with (c).
+
+ if (result == -1)
+ {
+ if (errno == ETIME)
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], Connection timed out.\n",
+ transport->id ()));
+ }
+ }
+ else
+ {
+ // The wait failed for some other reason.
+ // Report that making the connection failed, don't print errno
+ // because we touched the reactor and errno could be changed
+ if (TAO_debug_level > 2)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], wait for completion failed (%d) %p\n",
+ transport->id (), errno, ""));
+ }
+ TAO_Connection_Handler *con = transport->connection_handler ();
+ result = this->check_connection_closure (con);
+ }
+ }
}
+ if (result == -1)
+ {
+ // Set transport to zero, it is not usable, and the reference
+ // count we added above was decremented by the base connector
+ // handling the connection failure.
+ transport = 0;
+ return false;
+ }
// Connection not ready yet but we can use this transport, if
// we need a connected one we will block later to make sure
// it is connected
@@ -559,30 +598,32 @@ TAO_Connector::wait_for_connection_completion (
count));
for (unsigned int i = 0; i < count; i++)
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT("%d%s"),transport[i]->id(),
+ ACE_TEXT("%d%s"),transport[i]->id (),
(i < (count -1) ? ", " : "]\n")));
}
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!r->blocked_connect ())
+ int result = -1;
+ if (r->blocked_connect ())
{
- timeout = &tmp_zero;
+ result = this->active_connect_strategy_->wait (mev, timeout);
+ the_winner = 0;
+ }
+ else
+ {
+ errno = ETIME;
}
-
- int result = this->active_connect_strategy_->wait (mev,timeout);
- the_winner = 0;
if (result != -1)
{
the_winner = mev->winner()->transport();
if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT("(%P|%t) Transport_Connector::")
- ACE_TEXT("wait_for_connection_completion, ")
- ACE_TEXT("transport [%d]\n"),
- the_winner->id ()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Transport_Connector::")
+ ACE_TEXT ("wait_for_connection_completion, ")
+ ACE_TEXT ("transport [%d]\n"),
+ the_winner->id ()));
+ }
}
else if (errno == ETIME)
{
@@ -616,10 +657,12 @@ TAO_Connector::wait_for_connection_completion (
// Report that making the connection failed, don't print errno
// because we touched the reactor and errno could be changed
if (TAO_debug_level > 2)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%P|%t) Transport_Connector::")
- ACE_TEXT ("wait_for_connection_completion, failed\n")
- ));
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Transport_Connector::")
+ ACE_TEXT ("wait_for_connection_completion, failed\n")
+ ));
+ }
return false;
}
@@ -630,11 +673,13 @@ TAO_Connector::wait_for_connection_completion (
if (r->blocked_connect () && !the_winner->is_connected ())
{
if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport_Connector::"
- "wait_for_connection_completion, "
- "no connected transport for a blocked connection, "
- "cancelling connections and reverting things \n"));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "no connected transport for a blocked connection, "
+ "cancelling connections and reverting things \n"));
+ }
// Forget the return value. We are busted anyway. Try our best
// here.
@@ -643,9 +688,9 @@ TAO_Connector::wait_for_connection_completion (
return false;
}
- // Connection may not ready for SYNC_NONE cases but we can use this
- // transport, if we need a connected one we will block later to make
- // sure it is connected
+ // Connection may not ready for SYNC_NONE and SYNC_DELAYED_BUFFERING cases
+ // but we can use this transport, if we need a connected one we will poll
+ // later to make sure it is connected
return true;
}
diff --git a/TAO/tao/Transport_Queueing_Strategies.cpp b/TAO/tao/Transport_Queueing_Strategies.cpp
index 4de33f760dd..fbf6595904a 100644
--- a/TAO/tao/Transport_Queueing_Strategies.cpp
+++ b/TAO/tao/Transport_Queueing_Strategies.cpp
@@ -90,7 +90,7 @@ namespace TAO
must_flush = false;
set_timer = false;
- TAO_Buffering_Constraint_Policy *buffering_constraint_policy = 0;
+ TAO::BufferingConstraint buffering_constraint;
ACE_TRY_NEW_ENV
{
@@ -99,18 +99,18 @@ namespace TAO
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
- TAO::BufferingConstraintPolicy_var bcp =
+ TAO::BufferingConstraintPolicy_var bcpv =
TAO::BufferingConstraintPolicy::_narrow (bcp_policy.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
- buffering_constraint_policy =
- dynamic_cast<TAO_Buffering_Constraint_Policy *> (bcp.in ());
-
- if (buffering_constraint_policy == 0)
+ TAO_Buffering_Constraint_Policy* bcp =
+ dynamic_cast<TAO_Buffering_Constraint_Policy *> (bcpv.in ());
+ if (bcp == 0)
{
return true;
}
+ bcp->get_buffering_constraint (buffering_constraint);
}
ACE_CATCHANY
{
@@ -118,8 +118,6 @@ namespace TAO
}
ACE_ENDTRY;
- TAO::BufferingConstraint buffering_constraint;
- buffering_constraint_policy->get_buffering_constraint (buffering_constraint);
if (buffering_constraint.mode == TAO::BUFFER_FLUSH)
{