summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Mitz <mitza-oci@users.noreply.github.com>2006-08-09 15:39:56 +0000
committerAdam Mitz <mitza-oci@users.noreply.github.com>2006-08-09 15:39:56 +0000
commit02b6be8e9fc42875d428cda382627512f6c04a53 (patch)
treedbf7b5750a89613bbe790660b096e99c30b8fed3
parent1168389f4bc5ac9ecb4c54042530ffea807e38fb (diff)
downloadATCD-02b6be8e9fc42875d428cda382627512f6c04a53.tar.gz
importing initial work on this ticket into the subversion branch
-rw-r--r--TAO/NEWS13
-rw-r--r--TAO/examples/Buffered_Oneways/client.cpp7
-rwxr-xr-xTAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp12
-rw-r--r--TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp11
-rw-r--r--TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp5
-rw-r--r--TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp16
-rw-r--r--TAO/tao/Asynch_Queued_Message.cpp35
-rw-r--r--TAO/tao/Asynch_Queued_Message.h15
-rw-r--r--TAO/tao/Block_Flushing_Strategy.cpp9
-rw-r--r--TAO/tao/Connection_Handler.h5
-rw-r--r--TAO/tao/Connection_Handler.inl6
-rw-r--r--TAO/tao/Flushing_Strategy.h6
-rw-r--r--TAO/tao/IIOP_Connection_Handler.cpp4
-rw-r--r--TAO/tao/IIOP_Connector.cpp36
-rw-r--r--TAO/tao/Invocation_Adapter.cpp13
-rw-r--r--TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp11
-rw-r--r--TAO/tao/Messaging/Messaging_Policy_i.cpp7
-rw-r--r--TAO/tao/Profile_Transport_Resolver.cpp55
-rw-r--r--TAO/tao/Queued_Message.cpp6
-rw-r--r--TAO/tao/Queued_Message.h15
-rw-r--r--TAO/tao/Strategies/SCIOP_Connection_Handler.cpp4
-rw-r--r--TAO/tao/Strategies/SCIOP_Connector.cpp16
-rw-r--r--TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp4
-rw-r--r--TAO/tao/Strategies/UIOP_Connection_Handler.cpp4
-rw-r--r--TAO/tao/Strategies/UIOP_Connector.cpp15
-rw-r--r--TAO/tao/Synch_Invocation.cpp2
-rw-r--r--TAO/tao/Synch_Queued_Message.cpp5
-rw-r--r--TAO/tao/Transport.cpp141
-rw-r--r--TAO/tao/Transport.h10
-rw-r--r--TAO/tao/Transport_Connector.cpp271
-rw-r--r--TAO/tao/Transport_Queueing_Strategies.cpp14
-rw-r--r--TAO/tests/AMI_Buffering/client.cpp30
-rwxr-xr-xTAO/tests/AMI_Buffering/run_buffer_size.pl2
-rwxr-xr-xTAO/tests/AMI_Buffering/run_message_count.pl2
-rwxr-xr-xTAO/tests/AMI_Buffering/run_test.pl2
-rwxr-xr-xTAO/tests/AMI_Buffering/run_timeout.pl2
-rwxr-xr-xTAO/tests/AMI_Buffering/run_timeout_reactive.pl2
-rw-r--r--TAO/tests/Oneway_Buffering/client.cpp39
-rwxr-xr-xTAO/tests/Oneway_Buffering/run_buffer_size.pl2
-rwxr-xr-xTAO/tests/Oneway_Buffering/run_message_count.pl2
-rwxr-xr-xTAO/tests/Oneway_Buffering/run_test.pl2
-rwxr-xr-xTAO/tests/Oneway_Buffering/run_timeout.pl2
-rwxr-xr-xTAO/tests/Oneway_Buffering/run_timeout_reactive.pl2
-rw-r--r--TAO/tests/Oneway_Timeouts/Test.idl4
-rw-r--r--TAO/tests/Oneway_Timeouts/client.cpp488
-rwxr-xr-xTAO/tests/Oneway_Timeouts/run_test.pl392
-rw-r--r--TAO/tests/Oneway_Timeouts/server.cpp318
-rw-r--r--TAO/tests/Oneway_Timeouts/test.mpc26
-rw-r--r--TAO/tests/Queued_Message_Test/Queued_Message_Test.cpp3
-rw-r--r--TAO/tests/Timed_Buffered_Oneways/client.cpp186
-rwxr-xr-xTAO/tests/Timed_Buffered_Oneways/run_test.pl14
-rw-r--r--TAO/tests/Timed_Buffered_Oneways/server.cpp2
-rw-r--r--TAO/tests/Timed_Buffered_Oneways/test.idl5
-rw-r--r--TAO/tests/Timed_Buffered_Oneways/test_i.cpp29
-rw-r--r--TAO/tests/Timed_Buffered_Oneways/test_i.h7
57 files changed, 1901 insertions, 443 deletions
diff --git a/TAO/NEWS b/TAO/NEWS
index 3aa6c6defd3..2ef85cea403 100644
--- a/TAO/NEWS
+++ b/TAO/NEWS
@@ -27,7 +27,18 @@ USER VISIBLE CHANGES BETWEEN TAO-1.5.2 and TAO-1.5.3
. Fixed a compatibility problem between TAO and ORBs using Valuetype ID
indirection, such as JacORB. This problem was inadvertently introduced
- in 1.5.2
+ in 1.5.2.
+
+. Oneway requests when using SYNC_NONE, SYNC_DELAYED_BUFFERING, or
+ SYNC_EAGER_BUFFERING will be queued until the connection is completed.
+ Connection completion occurs when one of the following occurs:
+ a) A one-way request that does NOT use one of the sync-scopes mentioned
+ above is sent via the same transport
+ b) A two-way request is sent via the same transport
+ c) orb->run() is called
+ Applications that do not currently do one of the above will no longer
+ establish a connection and therefore no data will be sent.
+
USER VISIBLE CHANGES BETWEEN TAO-1.5.1 and TAO-1.5.2
====================================================
diff --git a/TAO/examples/Buffered_Oneways/client.cpp b/TAO/examples/Buffered_Oneways/client.cpp
index 7851ec8638e..8f1469da973 100644
--- a/TAO/examples/Buffered_Oneways/client.cpp
+++ b/TAO/examples/Buffered_Oneways/client.cpp
@@ -230,6 +230,13 @@ main (int argc, char **argv)
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
+ if (buffering_constraint.mode == TAO::BUFFER_FLUSH)
+ {
+ ACE_ERROR((LM_ERROR, "Error : Must specify a timeout, message"
+ " count, or message bytes constraint.\n"));
+ return 1;
+ }
+
// Setup the constraints.
policy_current->set_policy_overrides (buffering_constraint_policy_list,
CORBA::ADD_OVERRIDE
diff --git a/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp b/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp
index b53265cbfa5..638639dfb10 100755
--- a/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp
+++ b/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp
@@ -87,11 +87,9 @@ TAO_FT_Request_Duration_Policy::set_time_value (ACE_Time_Value &time_value)
if (TAO_debug_level > 0)
{
- CORBA::ULong msecs =
- static_cast<CORBA::ULong> (microseconds / 1000);
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO_FT (%P|%t) - Timeout is <%u>\n"),
- msecs));
+ ACE_TEXT ("TAO_FT (%P|%t) - Timeout is <%dms>\n"),
+ time_value.msec ()));
}
}
@@ -182,11 +180,9 @@ TAO_FT_Heart_Beat_Policy::set_time_value (ACE_Time_Value &time_value,
if (TAO_debug_level > 0)
{
- CORBA::ULong msecs =
- static_cast<CORBA::ULong> (microseconds / 1000);
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO_FT (%P|%t) - Timeout is <%u>\n"),
- msecs));
+ ACE_TEXT ("TAO_FT (%P|%t) - Timeout is <%dms>\n"),
+ time_value.msec ()));
}
}
diff --git a/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp b/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp
index c2a2dde9735..720f93a2a2f 100644
--- a/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp
+++ b/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp
@@ -190,14 +190,9 @@ TAO::HTIOP::Connector::make_connection (TAO::Profile_Transport_Resolver *r,
this->active_connect_strategy_->synch_options (timeout,
synch_options);
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!r->blocked_connect ())
- {
- synch_options.timeout (ACE_Time_Value::zero);
- timeout = &tmp_zero;
- }
+ // The code used to set the timeout to zero, with the intent of
+ // polling the reactor for connection completion. However, the side-effect
+ // was to cause the connection to timeout immediately.
// This is where we need to set the ACE::HTBP::Stream to the connection
// handler.
diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp
index df2fbd006c7..dbe916788af 100644
--- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp
+++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp
@@ -212,7 +212,9 @@ TAO_UIPMC_Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp b/TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp
index 51a3db9a486..61ae03facc1 100644
--- a/TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp
+++ b/TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp
@@ -215,6 +215,11 @@ TAO::IIOP_SSL_Connector::make_connection (
return 0;
}
+ if (transport->connection_handler ()->keep_waiting ())
+ {
+ svc_handler->add_reference ();
+ }
+
// At this point, the connection has be successfully connected.
// #REFCOUNT# is one.
if (TAO_debug_level > 2)
diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp
index 57a6397a818..9bf664bb848 100644
--- a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp
+++ b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp
@@ -281,7 +281,9 @@ TAO::SSLIOP::Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp
index 3635555cb98..32174c6b252 100644
--- a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp
+++ b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp
@@ -635,14 +635,9 @@ TAO::SSLIOP::Connector::ssliop_connect (
this->active_connect_strategy_->synch_options (max_wait_time,
synch_options);
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!resolver->blocked_connect ())
- {
- synch_options.timeout (ACE_Time_Value::zero);
- max_wait_time = &tmp_zero;
- }
+ // The code used to set the timeout to zero, with the intent of
+ // polling the reactor for connection completion. However, the side-effect
+ // was to cause the connection to timeout immediately.
// We obtain the transport in the <svc_handler> variable. As we
// know now that the connection is not available in Cache we can
@@ -710,6 +705,11 @@ TAO::SSLIOP::Connector::ssliop_connect (
return 0;
}
+ if (transport->connection_handler ()->keep_waiting ())
+ {
+ svc_handler->add_reference ();
+ }
+
// At this point, the connection has be successfully connected.
// #REFCOUNT# is one.
if (TAO_debug_level > 2)
diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp
index 7c6ec5020e8..cbdd57b7e08 100644
--- a/TAO/tao/Asynch_Queued_Message.cpp
+++ b/TAO/tao/Asynch_Queued_Message.cpp
@@ -8,7 +8,7 @@
#include "ace/Log_Msg.h"
#include "ace/Message_Block.h"
#include "ace/Malloc_Base.h"
-
+#include "ace/High_Res_Timer.h"
ACE_RCSID (tao,
Asynch_Queued_Message,
@@ -19,12 +19,16 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL
TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (
const ACE_Message_Block *contents,
TAO_ORB_Core *oc,
+ ACE_Time_Value *timeout,
ACE_Allocator *alloc,
bool is_heap_allocated)
: TAO_Queued_Message (oc, alloc, is_heap_allocated)
, size_ (contents->total_length ())
, offset_ (0)
+ , abs_timeout_ (ACE_Time_Value::zero)
{
+ if (timeout != 0 && *timeout != ACE_Time_Value::zero)
+ this->abs_timeout_ = ACE_High_Res_Timer::gettimeofday_hr () + *timeout;
// @@ Use a pool for these guys!!
ACE_NEW (this->buffer_, char[this->size_]);
@@ -43,11 +47,13 @@ TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (
TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf,
TAO_ORB_Core *oc,
size_t size,
+ const ACE_Time_Value &abs_timeout,
ACE_Allocator *alloc)
- : TAO_Queued_Message (oc, alloc)
+ : TAO_Queued_Message (oc, alloc, 0)
, size_ (size)
, offset_ (0)
, buffer_ (buf)
+ , abs_timeout_ (abs_timeout)
{
}
@@ -133,6 +139,7 @@ TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
TAO_Asynch_Queued_Message (buf,
this->orb_core_,
sz,
+ this->abs_timeout_,
alloc),
0);
}
@@ -150,7 +157,8 @@ TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
ACE_NEW_RETURN (qm,
TAO_Asynch_Queued_Message (buf,
this->orb_core_,
- sz),
+ sz,
+ this->abs_timeout_),
0);
}
@@ -180,7 +188,28 @@ TAO_Asynch_Queued_Message::destroy (void)
delete this;
}
}
+}
+bool
+TAO_Asynch_Queued_Message::is_expired (const ACE_Time_Value &now) const
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Asynch_Queued_Message::is_expired - "
+ "Age of message is <%dms> this = %x.\n",
+ (now - this->abs_timeout_).msec (), this));
+ if (this->offset_ > 0)
+ {
+ // This debug is for testing purposes!
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Asynch_Queued_Message::is_expired - "
+ "Can't expire message due to partial send. \n"));
+ return false; //never expire partial messages
+ }
+ if (this->abs_timeout_ > ACE_Time_Value::zero)
+ {
+ return this->abs_timeout_ < now;
+ }
+ return false;
}
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h
index 045f3dcd4fd..991a9f07063 100644
--- a/TAO/tao/Asynch_Queued_Message.h
+++ b/TAO/tao/Asynch_Queued_Message.h
@@ -17,12 +17,15 @@
#include "tao/Queued_Message.h"
+#include "ace/Time_Value.h"
+
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+class ACE_Message_Block;
/**
* @class TAO_Asynch_Queued_Message
*
@@ -39,6 +42,9 @@ public:
*
* @param alloc Allocator used for creating @c this object.
*
+ * @param timeout The relative timeout after which this
+ * message should be expired.
+ *
* @todo I'm almost sure this class will require a callback
* interface for AMIs sent with SYNC_NONE policy. Those guys
* need to hear when the connection timeouts or closes, but
@@ -46,6 +52,7 @@ public:
*/
TAO_Asynch_Queued_Message (const ACE_Message_Block *contents,
TAO_ORB_Core *oc,
+ ACE_Time_Value *timeout,
ACE_Allocator *alloc = 0,
bool is_heap_allocated = false);
@@ -65,6 +72,7 @@ public:
/// it here for the sake of uniformity.
virtual TAO_Queued_Message *clone (ACE_Allocator *alloc);
virtual void destroy (void);
+ virtual bool is_expired (const ACE_Time_Value &now) const;
//@}
protected:
@@ -78,11 +86,14 @@ protected:
* @param size The size of the buffer <buf> that is being handed
* over.
*
+ * @param abs_timeout The time after which this message should be expired.
+ *
* @param alloc Allocator used for creating <this> object.
*/
TAO_Asynch_Queued_Message (char *buf,
TAO_ORB_Core *oc,
size_t size,
+ const ACE_Time_Value &abs_timeout,
ACE_Allocator *alloc = 0);
private:
/// The number of bytes in the buffer
@@ -97,6 +108,10 @@ private:
/// The buffer containing the complete message.
char *buffer_;
+
+ // Expiration time
+ ACE_Time_Value abs_timeout_;
+
};
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp
index 8e0282ecfdd..87082f19819 100644
--- a/TAO/tao/Block_Flushing_Strategy.cpp
+++ b/TAO/tao/Block_Flushing_Strategy.cpp
@@ -9,14 +9,9 @@ ACE_RCSID(tao, Block_Flushing_Strategy, "$Id$")
TAO_BEGIN_VERSIONED_NAMESPACE_DECL
int
-TAO_Block_Flushing_Strategy::schedule_output (TAO_Transport *transport)
+TAO_Block_Flushing_Strategy::schedule_output (TAO_Transport *)
{
- while (!transport->queue_is_empty_i ())
- {
- if (transport->drain_queue_i () == -1)
- return -1;
- }
- return 0;
+ return MUST_FLUSH;
}
int
diff --git a/TAO/tao/Connection_Handler.h b/TAO/tao/Connection_Handler.h
index e17ff6af08f..2342390dd5c 100644
--- a/TAO/tao/Connection_Handler.h
+++ b/TAO/tao/Connection_Handler.h
@@ -70,12 +70,15 @@ public:
/// Set the underlying transport object
void transport (TAO_Transport* transport);
- /// Is the handler closed?
+ /// Is the handler closed or timed out?
bool is_closed (void) const;
/// Is the handler open?
bool is_open (void) const;
+ /// Closed due to timeout?
+ bool is_timeout (void) const;
+
/// Is the handler in the process of being connected?
bool is_connecting (void) const;
diff --git a/TAO/tao/Connection_Handler.inl b/TAO/tao/Connection_Handler.inl
index 5841cf3aebc..48e1b0b1b41 100644
--- a/TAO/tao/Connection_Handler.inl
+++ b/TAO/tao/Connection_Handler.inl
@@ -24,6 +24,12 @@ TAO_Connection_Handler::is_closed (void) const
}
ACE_INLINE bool
+TAO_Connection_Handler::is_timeout (void) const
+{
+ return (this->state_ == TAO_LF_Event::LFS_TIMEOUT);
+}
+
+ACE_INLINE bool
TAO_Connection_Handler::is_open (void) const
{
return this->state_ == TAO_LF_Event::LFS_SUCCESS;
diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h
index ead55aec8f2..61c9b51de10 100644
--- a/TAO/tao/Flushing_Strategy.h
+++ b/TAO/tao/Flushing_Strategy.h
@@ -56,7 +56,13 @@ public:
/// Destructor
virtual ~TAO_Flushing_Strategy (void);
+ enum SCHEDULE_OUTPUT_RETURN { MUST_FLUSH = -2 };
+
/// Schedule the transport argument to be flushed
+ /// If -2 is returned then the caller must call one of
+ /// the flush_* methods.
+ /// If -1 is returned then there was an error.
+ /// If 0 is returned then the flush was scheduled successfully.
virtual int schedule_output (TAO_Transport *transport) = 0;
/// Cancel all scheduled output for the transport argument
diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp
index 0f59383abe5..6a56bb9e0b9 100644
--- a/TAO/tao/IIOP_Connection_Handler.cpp
+++ b/TAO/tao/IIOP_Connection_Handler.cpp
@@ -322,7 +322,9 @@ TAO_IIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp
index ec2165a07e4..4d98fe433ff 100644
--- a/TAO/tao/IIOP_Connector.cpp
+++ b/TAO/tao/IIOP_Connector.cpp
@@ -60,6 +60,7 @@ TAO_IIOP_Connection_Handler_Array_Guard::TAO_IIOP_Connection_Handler_Array_Guard
TAO_IIOP_Connection_Handler_Array_Guard::~TAO_IIOP_Connection_Handler_Array_Guard (void)
{
+ ACE_Errno_Guard eguard (errno);
if (this->ptr_ != 0)
{
for (unsigned i = 0; i < this->count_; i++)
@@ -205,9 +206,10 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
TAO_IIOP_Endpoint **ep_ptr = &iiop_endpoint;
TAO_LF_Multi_Event mev;
mev.add_event(svc_handler);
- return this->complete_connection (result, desc,
- sh_ptr, ep_ptr,
- 1U, r, &mev, timeout);
+ TAO_Transport* tp = this->complete_connection (result, desc,
+ sh_ptr, ep_ptr,
+ 1U, r, &mev, timeout);
+ return tp;
}
TAO_Transport *
@@ -327,14 +329,9 @@ TAO_IIOP_Connector::begin_connection (TAO_IIOP_Connection_Handler *&svc_handler,
this->active_connect_strategy_->synch_options (timeout,
synch_options);
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!r->blocked_connect ())
- {
- synch_options.timeout (ACE_Time_Value::zero);
- timeout = &tmp_zero;
- }
+ // The code used to set the timeout to zero, with the intent of
+ // polling the reactor for connection completion. However, the side-effect
+ // was to cause the connection to timeout immediately.
svc_handler = 0;
@@ -427,7 +424,7 @@ TAO_IIOP_Connector::complete_connection (int result,
}
}
}
- // At this point, the connection has be successfully created
+ // At this point, the connection has been successfully created
// connected or not connected, but we have a connection.
TAO_IIOP_Connection_Handler *svc_handler = 0;
TAO_IIOP_Endpoint *iiop_endpoint = 0;
@@ -461,14 +458,15 @@ TAO_IIOP_Connector::complete_connection (int result,
if (TAO_debug_level > 3)
{
for (unsigned i = 0; i < count; i++)
- ACE_DEBUG ((LM_ERROR,
- ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ")
- ACE_TEXT("connection to <%s:%d> failed (%p)\n"),
- ACE_TEXT_CHAR_TO_TCHAR (ep_list[i]->host ()),
- ep_list[i]->port (),
- ACE_TEXT("errno")));
+ {
+ ACE_DEBUG ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection,")
+ ACE_TEXT (" connection to <%s:%d> failed (%p)\n"),
+ ACE_TEXT_CHAR_TO_TCHAR (ep_list[i]->host ()),
+ ep_list[i]->port (),
+ ACE_TEXT("errno")));
+ }
}
-
return 0;
}
diff --git a/TAO/tao/Invocation_Adapter.cpp b/TAO/tao/Invocation_Adapter.cpp
index 39ac727b340..17d91b4db07 100644
--- a/TAO/tao/Invocation_Adapter.cpp
+++ b/TAO/tao/Invocation_Adapter.cpp
@@ -12,6 +12,8 @@
#include "tao/Transport_Mux_Strategy.h"
#include "tao/Collocation_Proxy_Broker.h"
#include "tao/GIOP_Utils.h"
+#include "tao/TAOC.h"
+
#if !defined (__ACE_INLINE__)
# include "tao/Invocation_Adapter.inl"
#endif /* __ACE_INLINE__ */
@@ -65,8 +67,6 @@ namespace TAO
// Initial state
TAO::Invocation_Status status = TAO_INVOKE_START;
- ACE_Time_Value *max_wait_time = 0;
-
while (status == TAO_INVOKE_START ||
status == TAO_INVOKE_RESTART)
{
@@ -88,6 +88,7 @@ namespace TAO
if (strat == TAO_CS_REMOTE_STRATEGY ||
strat == TAO_CS_LAST)
{
+ ACE_Time_Value *max_wait_time = 0;
status =
this->invoke_remote_i (stub,
details,
@@ -262,12 +263,18 @@ namespace TAO
(void) this->set_response_flags (stub,
details);
+ CORBA::Octet rflags = details.response_flags ();
+ bool block_connect =
+ rflags != static_cast<CORBA::Octet> (Messaging::SYNC_NONE)
+ && rflags != static_cast<CORBA::Octet> (TAO::SYNC_EAGER_BUFFERING)
+ && rflags != static_cast<CORBA::Octet> (TAO::SYNC_DELAYED_BUFFERING);
+
// Create the resolver which will pick (or create) for us a
// transport and a profile from the effective_target.
Profile_Transport_Resolver resolver (
effective_target.in (),
stub,
- (details.response_flags () != Messaging::SYNC_NONE));
+ block_connect);
resolver.resolve (max_wait_time
ACE_ENV_ARG_PARAMETER);
diff --git a/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp b/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp
index d17338aae9c..b621a85b375 100644
--- a/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp
+++ b/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp
@@ -103,11 +103,9 @@ TAO_ConnectionTimeoutPolicy::hook (TAO_ORB_Core *orb_core,
if (TAO_debug_level > 0)
{
- CORBA::ULong msecs =
- static_cast<CORBA::ULong> (microseconds / 1000);
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"),
- msecs));
+ ACE_TEXT ("TAO (%P|%t) - Connect timeout is <%dms>\n"),
+ time_value.msec ()));
}
}
ACE_CATCHANY
@@ -188,10 +186,9 @@ TAO_ConnectionTimeoutPolicy::set_time_value (ACE_Time_Value &time_value)
if (TAO_debug_level > 0)
{
- CORBA::ULong const msecs = time_value.msec ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"),
- msecs));
+ ACE_TEXT ("TAO (%P|%t) - Connect timeout is <%dms>\n"),
+ time_value.msec ()));
}
}
diff --git a/TAO/tao/Messaging/Messaging_Policy_i.cpp b/TAO/tao/Messaging/Messaging_Policy_i.cpp
index 35d3d25a37f..39d4cc2b50a 100644
--- a/TAO/tao/Messaging/Messaging_Policy_i.cpp
+++ b/TAO/tao/Messaging/Messaging_Policy_i.cpp
@@ -107,7 +107,7 @@ TAO_RelativeRoundtripTimeoutPolicy::hook (TAO_ORB_Core *orb_core,
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"),
+ ACE_TEXT ("TAO (%P|%t) - Request timeout is <%dms>\n"),
time_value.msec ()));
}
}
@@ -189,10 +189,9 @@ TAO_RelativeRoundtripTimeoutPolicy::set_time_value (ACE_Time_Value &time_value)
if (TAO_debug_level > 0)
{
- CORBA::ULong msecs = time_value.msec ();
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"),
- msecs));
+ ACE_TEXT ("TAO (%P|%t) - Request timeout is <%dms>\n"),
+ time_value.msec ()));
}
}
diff --git a/TAO/tao/Profile_Transport_Resolver.cpp b/TAO/tao/Profile_Transport_Resolver.cpp
index 81d3a67f52d..30cc30341be 100644
--- a/TAO/tao/Profile_Transport_Resolver.cpp
+++ b/TAO/tao/Profile_Transport_Resolver.cpp
@@ -123,28 +123,28 @@ namespace TAO
bool
Profile_Transport_Resolver::try_connect (
TAO_Transport_Descriptor_Interface *desc,
- ACE_Time_Value *max_time_value
+ ACE_Time_Value *timeout
ACE_ENV_ARG_DECL
)
{
- return this->try_connect_i (desc,max_time_value,0 ACE_ENV_ARG_PARAMETER);
+ return this->try_connect_i (desc, timeout, 0 ACE_ENV_ARG_PARAMETER);
}
bool
Profile_Transport_Resolver::try_parallel_connect (
TAO_Transport_Descriptor_Interface *desc,
- ACE_Time_Value *max_time_value
+ ACE_Time_Value *timeout
ACE_ENV_ARG_DECL
)
{
- return this->try_connect_i (desc,max_time_value,1 ACE_ENV_ARG_PARAMETER);
+ return this->try_connect_i (desc, timeout, 1 ACE_ENV_ARG_PARAMETER);
}
bool
Profile_Transport_Resolver::try_connect_i (
TAO_Transport_Descriptor_Interface *desc,
- ACE_Time_Value *max_time_value,
+ ACE_Time_Value *timeout,
bool parallel
ACE_ENV_ARG_DECL
)
@@ -165,41 +165,40 @@ namespace TAO
}
ACE_Time_Value connection_timeout;
+ bool has_con_timeout = this->get_connection_timeout (connection_timeout);
- bool const is_conn_timeout =
- this->get_connection_timeout (connection_timeout);
-
- ACE_Time_Value *max_wait_time =
- is_conn_timeout ? &connection_timeout : max_time_value;
+ if (has_con_timeout && !blocked_)
+ {
+ timeout = &connection_timeout;
+ }
+ else if (has_con_timeout)
+ {
+ if (timeout == 0 || connection_timeout < *timeout)
+ timeout = &connection_timeout;
+ }
+ else if (!blocked_)
+ {
+ timeout = 0;
+ }
+ TAO_Connector *con = conn_reg->get_connector (desc->endpoint ()->tag ());
+ ACE_ASSERT(con != 0);
if (parallel)
{
- this->transport_ =
- conn_reg->get_connector (desc->endpoint ()->tag ())->
- parallel_connect (this,
- desc,
- max_wait_time
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (false);
+ this->transport_ = con->parallel_connect (this, desc, timeout
+ ACE_ENV_ARG_PARAMETER);
}
else
{
- // Obtain a connection.
- this->transport_ =
- conn_reg->get_connector (desc->endpoint ()->tag ())->
- connect (this,
- desc,
- max_wait_time
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK_RETURN (false);
+ this->transport_ = con->connect (this, desc, timeout
+ ACE_ENV_ARG_PARAMETER);
}
+ ACE_CHECK_RETURN (false);
// A timeout error occurred.
// If the user has set a roundtrip timeout policy, throw a timeout
// exception. Otherwise, just fall through and return false to
// look at the next endpoint.
- if (this->transport_ == 0
- && is_conn_timeout == false
- && errno == ETIME)
+ if (this->transport_ == 0 && errno == ETIME)
{
ACE_THROW_RETURN (CORBA::TIMEOUT (
CORBA::SystemException::_tao_minor_code (
diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp
index 8b4b73072c9..19cba1406a6 100644
--- a/TAO/tao/Queued_Message.cpp
+++ b/TAO/tao/Queued_Message.cpp
@@ -102,4 +102,10 @@ TAO_Queued_Message::push_front (TAO_Queued_Message *&head,
}
}
+bool
+TAO_Queued_Message::is_expired (const ACE_Time_Value &) const
+{
+ return false;
+}
+
TAO_END_VERSIONED_NAMESPACE_DECL
diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h
index 70defaa15cd..c8e8883fd11 100644
--- a/TAO/tao/Queued_Message.h
+++ b/TAO/tao/Queued_Message.h
@@ -17,6 +17,7 @@
#include "tao/LF_Invocation_Event.h"
#include "ace/os_include/os_stddef.h"
+#include "ace/Time_Value.h"
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
@@ -76,8 +77,8 @@ class TAO_Export TAO_Queued_Message : public TAO_LF_Invocation_Event
public:
/// Constructor
TAO_Queued_Message (TAO_ORB_Core *oc,
- ACE_Allocator *alloc = 0,
- bool is_heap_allocated = false);
+ ACE_Allocator *alloc,
+ bool is_heap_allocated);
/// Destructor
virtual ~TAO_Queued_Message (void);
@@ -194,6 +195,16 @@ public:
* a pool), they need to be reclaimed explicitly.
*/
virtual void destroy (void) = 0;
+
+ /// Check for timeout
+ /**
+ * @param now Pass in the current time using
+ * ACE_High_Res_Timer::gettimeofday_hr().
+ * This is a parameter in order to avoid calling gettimeofday_hr() inside
+ * of this method (which will be called in a tight loop).
+ * @return true if the relative roundtrip timeout has expired.
+ */
+ virtual bool is_expired (const ACE_Time_Value &now) const;
//@}
protected:
diff --git a/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp
index dd83f2fac6e..5df628bcc5c 100644
--- a/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp
@@ -250,7 +250,9 @@ TAO_SCIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/tao/Strategies/SCIOP_Connector.cpp b/TAO/tao/Strategies/SCIOP_Connector.cpp
index 663bccdb60a..70f61e87ae0 100644
--- a/TAO/tao/Strategies/SCIOP_Connector.cpp
+++ b/TAO/tao/Strategies/SCIOP_Connector.cpp
@@ -172,14 +172,9 @@ TAO_SCIOP_Connector::make_connection_i (TAO::Profile_Transport_Resolver *r,
this->active_connect_strategy_->synch_options (timeout,
synch_options);
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!r->blocked_connect())
- {
- synch_options.timeout (ACE_Time_Value::zero);
- timeout = &tmp_zero;
- }
+ // The code used to set the timeout to zero, with the intent of
+ // polling the reactor for connection completion. However, the side-effect
+ // was to cause the connection to timeout immediately.
TAO_SCIOP_Connection_Handler *svc_handler = 0;
@@ -268,6 +263,11 @@ TAO_SCIOP_Connector::make_connection_i (TAO::Profile_Transport_Resolver *r,
return 0;
}
+ if (transport->connection_handler ()->keep_waiting ())
+ {
+ svc_handler->add_reference ();
+ }
+
// At this point, the connection has be successfully connected.
// #REFCOUNT# is one.
if (TAO_debug_level > 2)
diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
index 788407dfd7d..a584be7e368 100644
--- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp
@@ -204,7 +204,9 @@ TAO_SHMIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
index ada81ab9075..f7f754cb197 100644
--- a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
+++ b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp
@@ -183,7 +183,9 @@ TAO_UIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &,
// We don't use this upcall for I/O. This is only used by the
// Connector to indicate that the connection timedout. Therefore,
// we should call close().
- return this->close ();
+ int ret = this->close ();
+ this->reset_state (TAO_LF_Event::LFS_TIMEOUT);
+ return ret;
}
int
diff --git a/TAO/tao/Strategies/UIOP_Connector.cpp b/TAO/tao/Strategies/UIOP_Connector.cpp
index efce00f17d2..3bbec64d87e 100644
--- a/TAO/tao/Strategies/UIOP_Connector.cpp
+++ b/TAO/tao/Strategies/UIOP_Connector.cpp
@@ -169,14 +169,9 @@ TAO_UIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
this->active_connect_strategy_->synch_options (max_wait_time,
synch_options);
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!r->blocked_connect ())
- {
- synch_options.timeout (ACE_Time_Value::zero);
- max_wait_time = &tmp_zero;
- }
+ // The code used to set the timeout to zero, with the intent of
+ // polling the reactor for connection completion. However, the side-effect
+ // was to cause the connection to timeout immediately.
TAO_UIOP_Connection_Handler *svc_handler = 0;
@@ -248,6 +243,10 @@ TAO_UIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r,
return 0;
}
+ if (transport->connection_handler ()->keep_waiting ())
+ {
+ svc_handler->add_reference ();
+ }
// At this point, the connection has be successfully created
// connected or not connected, but we have a connection.
diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp
index bff4141a033..e38207cdbfa 100644
--- a/TAO/tao/Synch_Invocation.cpp
+++ b/TAO/tao/Synch_Invocation.cpp
@@ -772,7 +772,7 @@ namespace TAO
"TAO (%P|%t) - Synch_Oneway_Invocation::"
"remote_oneway, queueing message\n"));
- if (transport->format_queue_message (cdr) != 0)
+ if (transport->format_queue_message (cdr, max_wait_time) != 0)
s = TAO_INVOKE_FAILURE;
}
diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp
index 59fecb37311..afa14627a05 100644
--- a/TAO/tao/Synch_Queued_Message.cpp
+++ b/TAO/tao/Synch_Queued_Message.cpp
@@ -129,7 +129,8 @@ TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc)
alloc->malloc (sizeof (TAO_Synch_Queued_Message))),
TAO_Synch_Queued_Message (mb,
this->orb_core_,
- alloc),
+ alloc,
+ 0),
0);
}
else
@@ -144,7 +145,7 @@ TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc)
}
ACE_NEW_RETURN (qm,
- TAO_Synch_Queued_Message (mb, this->orb_core_),
+ TAO_Synch_Queued_Message (mb, this->orb_core_, 0, 0),
0);
}
diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp
index 7f3c7ebf4b2..b132d4ee612 100644
--- a/TAO/tao/Transport.cpp
+++ b/TAO/tao/Transport.cpp
@@ -27,6 +27,7 @@
#include "ace/OS_NS_stdio.h"
#include "ace/Reactor.h"
#include "ace/os_include/sys/os_uio.h"
+#include "ace/High_Res_Timer.h"
/*
* Specialization hook to add include files from
@@ -478,12 +479,13 @@ TAO_Transport::handle_output (void)
}
int
-TAO_Transport::format_queue_message (TAO_OutputCDR &stream)
+TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
+ ACE_Time_Value *max_wait_time)
{
if (this->messaging_object ()->format_message (stream) != 0)
return -1;
- return this->queue_message_i (stream.begin());
+ return this->queue_message_i (stream.begin (), max_wait_time);
}
int
@@ -503,7 +505,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
size_t &bytes_transferred,
ACE_Time_Value *)
{
- size_t const total_length = mb->total_length ();
+ const size_t total_length = mb->total_length ();
// We are going to block, so there is no need to clone
// the message block.
@@ -512,7 +514,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
synch_message.push_back (this->head_, this->tail_);
- int const n = this->drain_queue_i ();
+ const int n = this->drain_queue_i ();
if (n == -1)
{
@@ -541,14 +543,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// We are going to block, so there is no need to clone
// the message block.
TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
+ const size_t message_length = synch_message.message_length ();
- // Push synch_message on to the back of the queue.
synch_message.push_back (this->head_, this->tail_);
- int const n =
- this->send_synch_message_helper_i (synch_message,
- max_wait_time);
-
+ const int n = this->send_synch_message_helper_i (synch_message,
+ 0 /*ignored*/);
if (n == -1 || n == 1)
{
return n;
@@ -558,18 +558,30 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// if (max_wait_time != 0 && errno == ETIME) return -1;
TAO_Flushing_Strategy *flushing_strategy =
this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
+ int result = flushing_strategy->schedule_output (this);
+ if (result == -1)
+ {
+ synch_message.remove_from_list (this->head_, this->tail_);
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
+ ACE_TEXT ("send_synchronous_message_i, ")
+ ACE_TEXT ("error while scheduling flush - %m\n"),
+ this->id ()));
+ }
+ return -1;
+ }
+
+ // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
+ // because we're always going to flush anyway.
// Release the mutex, other threads may modify the queue as we
// block for a long time writing out data.
- int result;
{
typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
TAO_REVERSE_LOCK reverse (*this->handler_lock_);
- ACE_GUARD_RETURN (TAO_REVERSE_LOCK,
- ace_mon,
- reverse,
- -1);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
result = flushing_strategy->flush_message (this,
&synch_message,
@@ -582,7 +594,8 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
if (errno == ETIME)
{
- if (this->head_ == &synch_message)
+ // If partially sent, then we must queue the remainder.
+ if (message_length != synch_message.message_length ())
{
// This is a timeout, there is only one nasty case: the
// message has been partially sent! We simply cannot take
@@ -597,13 +610,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
// and figuring out the request ID would be a bit of a
// nightmare.
//
-
- synch_message.remove_from_list (this->head_, this->tail_);
TAO_Queued_Message *queued_message = 0;
ACE_NEW_RETURN (queued_message,
TAO_Asynch_Queued_Message (
synch_message.current_block (),
this->orb_core_,
+ 0, // no timeout
0,
1),
-1);
@@ -682,6 +694,13 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
msg->remove_from_list (this->head_, this->tail_);
msg->destroy ();
}
+ else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+ (void) flushing_strategy->flush_message(this, msg, 0);
+ }
return 1;
}
@@ -797,7 +816,14 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
TAO_Flushing_Strategy *flushing_strategy =
this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
+ int result = flushing_strategy->schedule_output (this);
+ if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
+ TAO_REVERSE_LOCK reverse (*this->handler_lock_);
+ ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
+ (void) flushing_strategy->flush_transport (this);
+ }
}
return 0;
@@ -920,8 +946,29 @@ TAO_Transport::drain_queue_i (void)
// call.
this->sent_byte_count_ = 0;
+ // Avoid calling this expensive function each time through the loop. Instead
+ // we'll assume that the time is unlikely to change much during the loop.
+ // If we are forced to send in the loop then we'll recompute the time.
+ ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
+
while (i != 0)
{
+ if (i->is_expired (now))
+ {
+ if (TAO_debug_level > 3)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ")
+ ACE_TEXT ("Discarding expired queued message.\n"),
+ this->id ()));
+ }
+ i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
+ this->orb_core_->leader_follower ());
+ i->remove_from_list (this->head_, this->tail_);
+ i->destroy ();
+ i = this->head_;
+ continue;
+ }
// ... each element fills the iovector ...
i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
@@ -933,6 +980,8 @@ TAO_Transport::drain_queue_i (void)
int const retval =
this->drain_queue_helper (iovcnt, iov);
+ now = ACE_High_Res_Timer::gettimeofday_hr ();
+
if (TAO_debug_level > 4)
{
ACE_DEBUG ((LM_DEBUG,
@@ -999,11 +1048,19 @@ TAO_Transport::cleanup_queue_i ()
this->id ()));
}
+ int byte_count = 0;
+ int msg_count = 0;
+
// Cleanup all messages
while (this->head_ != 0)
{
TAO_Queued_Message *i = this->head_;
+ if (TAO_debug_level > 4)
+ {
+ byte_count += i->message_length();
+ ++msg_count;
+ }
// @@ This is a good point to insert a flag to indicate that a
// CloseConnection message was successfully received.
i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
@@ -1012,7 +1069,15 @@ TAO_Transport::cleanup_queue_i ()
i->remove_from_list (this->head_, this->tail_);
i->destroy ();
- }
+ }
+
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
+ ACE_TEXT ("discarded %d messages, %d bytes.\n"),
+ this->id (), msg_count, byte_count));
+ }
}
void
@@ -1232,6 +1297,10 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
return 0;
}
+ // If it was partially sent, then we can't allow a timeout
+ if (byte_count > 0)
+ max_wait_time = 0;
+
if (TAO_debug_level > 6)
{
ACE_DEBUG ((LM_DEBUG,
@@ -1262,14 +1331,16 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
this->id ()));
}
- if (this->queue_message_i(message_block) == -1)
+ if (this->queue_message_i (message_block, max_wait_time) == -1)
{
if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
- ACE_TEXT ("cannot queue message for ")
- ACE_TEXT (" - %m\n"),
- this->id ()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
+ ACE_TEXT ("send_asynchronous_message_i, ")
+ ACE_TEXT ("cannot queue message for - %m\n"),
+ this->id ()));
+ }
return -1;
}
@@ -1289,7 +1360,11 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
if (constraints_reached || try_sending_first)
{
- (void) flushing_strategy->schedule_output (this);
+ int result = flushing_strategy->schedule_output (this);
+ if (result == TAO_Flushing_Strategy::MUST_FLUSH)
+ {
+ must_flush = true;
+ }
}
if (must_flush)
@@ -1305,12 +1380,14 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
}
int
-TAO_Transport::queue_message_i(const ACE_Message_Block *message_block)
+TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
{
TAO_Queued_Message *queued_message = 0;
ACE_NEW_RETURN (queued_message,
TAO_Asynch_Queued_Message (message_block,
this->orb_core_,
+ max_wait_time,
0,
1),
-1);
@@ -2387,13 +2464,7 @@ TAO_Transport::post_open (size_t id)
// If the wait strategy wants us to be registered with the reactor
// then we do so. If registeration is required and it succeeds,
// #REFCOUNT# becomes two.
- if (this->wait_strategy ()->register_handler () == 0)
- {
- TAO_Flushing_Strategy *flushing_strategy =
- this->orb_core ()->flushing_strategy ();
- (void) flushing_strategy->schedule_output (this);
- }
- else
+ if (this->wait_strategy ()->register_handler () != 0)
{
// Registration failures.
diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h
index a563cc42727..ea5623072c3 100644
--- a/TAO/tao/Transport.h
+++ b/TAO/tao/Transport.h
@@ -680,11 +680,17 @@ protected:
ACE_Time_Value *max_wait_time);
/// Queue a message for @a message_block
- int queue_message_i (const ACE_Message_Block *message_block);
+ /// @param max_wait_time The maximum time that the operation can
+ /// block, used in the implementation of timeouts.
+ int queue_message_i (const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time);
public:
/// Format and queue a message for @a stream
- int format_queue_message (TAO_OutputCDR &stream);
+ /// @param max_wait_time The maximum time that the operation can
+ /// block, used in the implementation of timeouts.
+ int format_queue_message (TAO_OutputCDR &stream,
+ ACE_Time_Value *max_wait_time);
/// Send a message block chain,
int send_message_block_chain (const ACE_Message_Block *message_block,
diff --git a/TAO/tao/Transport_Connector.cpp b/TAO/tao/Transport_Connector.cpp
index 33313169d3c..24c1cd19a43 100644
--- a/TAO/tao/Transport_Connector.cpp
+++ b/TAO/tao/Transport_Connector.cpp
@@ -302,10 +302,12 @@ TAO_Connector::parallel_connect (TAO::Profile_Transport_Resolver *r,
base_transport) == 0)
{
if (TAO_debug_level)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) TAO_Connector::parallel_connect: ")
- ACE_TEXT ("found a transport [%d]\n"),
- base_transport->id()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) TAO_Connector::parallel_connect: ")
+ ACE_TEXT ("found a transport [%d]\n"),
+ base_transport->id ()));
+ }
return base_transport;
}
}
@@ -357,10 +359,12 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
t->opened_as (TAO::TAO_CLIENT_ROLE);
if (TAO_debug_level > 4)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT("(%P|%t) Transport_Connector::connect, ")
- ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"),
- t->id ()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("(%P|%t) Transport_Connector::connect, ")
+ ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"),
+ t->id ()));
+ }
// Call post connect hook. If the post_connect_hook () returns
// false, just purge the entry.
@@ -393,7 +397,6 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
"TAO_UNSPECIFIED_ROLE" ));
}
-
// If connected return.
if (base_transport->is_connected ())
return base_transport;
@@ -414,15 +417,17 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
timeout))
{
if (TAO_debug_level > 2)
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport_Connector::"
- "connect, "
- "wait for completion failed\n"));
+ {
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - Transport_Connector::"
+ "connect, "
+ "wait for completion failed\n"));
+ }
return 0;
}
if (base_transport->is_connected () &&
- base_transport->wait_strategy ()->register_handler () == 0)
+ base_transport->wait_strategy ()->register_handler () == -1)
{
// Registration failures.
@@ -434,12 +439,13 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r,
(void) base_transport->close_connection ();
if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - Transport_Connector [%d]::connect, "
- "could not register the transport "
- "in the reactor.\n",
- base_transport->id ()));
-
+ {
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - Transport_Connector [%d]::connect, "
+ "could not register the transport "
+ "in the reactor.\n",
+ base_transport->id ()));
+ }
return 0;
}
@@ -452,88 +458,121 @@ TAO_Connector::wait_for_connection_completion (
TAO_Transport *&transport,
ACE_Time_Value *timeout)
{
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport_Connector::"
- "wait_for_connection_completion, "
- "going to wait for connection completion on transport"
- "[%d]\n",
- transport->id ()));
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
+ int result = -1;
if (!r->blocked_connect ())
{
- timeout = &tmp_zero;
- }
-
- // Wait until the connection is ready, when non-blocking we just do a wait
- // with zero time
- int result =
- this->active_connect_strategy_->wait (
- transport,
- timeout);
-
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport_Connector::"
- "wait_for_connection_completion, "
- "transport [%d], wait done result = %d\n",
- transport->id (), result));
-
- // There are three possibilities when wait() returns: (a)
- // connection succeeded; (b) connection failed; (c) wait()
- // failed because of some other error. It is easy to deal with
- // (a) and (b). (c) is tricky since the connection is still
- // pending and may get completed by some other thread. The
- // following method deals with (c).
-
- if (result == -1)
- {
- if (!r->blocked_connect () && errno == ETIME)
+ if (transport->connection_handler ()->is_open ())
{
- if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport_Connector::"
- "wait_for_connection_completion, "
- "transport [%d], timeout, resetting state.\n",
- transport->id ()));
- transport->connection_handler()->
- reset_state(TAO_LF_Event::LFS_CONNECTION_WAIT);
- // If we did a non blocking connect, just ignore
- // any timeout errors
result = 0;
}
- else
+ else if (transport->connection_handler ()->is_timeout ())
{
- // When we need to get a connected transport
- result =
- this->check_connection_closure (
- transport->connection_handler ());
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], Connection timed out.\n",
+ transport->id ()));
+ }
+ result = -1;
+ errno = ETIME;
}
-
- // In case of errors.
- if (result == -1)
+ else if (transport->connection_handler ()->is_closed ())
{
- // Report that making the connection failed, don't print errno
- // because we touched the reactor and errno could be changed
if (TAO_debug_level > 2)
- ACE_ERROR ((LM_ERROR,
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], Connection failed. (%d) %p\n",
+ transport->id (), errno, ""));
+ }
+ result = -1;
+ }
+ else
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], Connection not complete.\n",
+ transport->id ()));
+ }
+ transport->connection_handler ()->
+ reset_state (TAO_LF_Event::LFS_CONNECTION_WAIT);
+ result = 0;
+ }
+ }
+ else
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
"TAO (%P|%t) - Transport_Connector::"
"wait_for_connection_completion, "
- "transport [%d], wait for completion failed\n",
- transport->id()));
-
-
- // Set transport to zero, it is not usable, and the reference
- // count we added above was decremented by the base connector
- // handling the connection failure.
- transport = 0;
+ "going to wait for connection completion on transport"
+ "[%d]\n",
+ transport->id ()));
+ }
+ result = this->active_connect_strategy_->wait (transport, timeout);
- return false;
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], wait done result = %d\n",
+ transport->id (), result));
}
+ // There are three possibilities when wait() returns: (a)
+ // connection succeeded; (b) connection failed; (c) wait()
+ // failed because of some other error. It is easy to deal with
+ // (a) and (b). (c) is tricky since the connection is still
+ // pending and may get completed by some other thread. The
+ // following code deals with (c).
+
+ if (result == -1)
+ {
+ if (errno == ETIME)
+ {
+ if (TAO_debug_level > 2)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], Connection timed out.\n",
+ transport->id ()));
+ }
+ }
+ else
+ {
+ // The wait failed for some other reason.
+ // Report that making the connection failed, don't print errno
+ // because we touched the reactor and errno could be changed
+ if (TAO_debug_level > 2)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "transport [%d], wait for completion failed (%d) %p\n",
+ transport->id (), errno, ""));
+ }
+ TAO_Connection_Handler *con = transport->connection_handler ();
+ result = this->check_connection_closure (con);
+ }
+ }
}
+ if (result == -1)
+ {
+ // Set transport to zero, it is not usable, and the reference
+ // count we added above was decremented by the base connector
+ // handling the connection failure.
+ transport = 0;
+ return false;
+ }
// Connection not ready yet but we can use this transport, if
// we need a connected one we will block later to make sure
// it is connected
@@ -559,30 +598,32 @@ TAO_Connector::wait_for_connection_completion (
count));
for (unsigned int i = 0; i < count; i++)
ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT("%d%s"),transport[i]->id(),
+ ACE_TEXT("%d%s"),transport[i]->id (),
(i < (count -1) ? ", " : "]\n")));
}
- // If we don't need to block for a transport just set the timeout to
- // be zero.
- ACE_Time_Value tmp_zero (ACE_Time_Value::zero);
- if (!r->blocked_connect ())
+ int result = -1;
+ if (r->blocked_connect ())
{
- timeout = &tmp_zero;
+ result = this->active_connect_strategy_->wait (mev, timeout);
+ the_winner = 0;
+ }
+ else
+ {
+ errno = ETIME;
}
-
- int result = this->active_connect_strategy_->wait (mev,timeout);
- the_winner = 0;
if (result != -1)
{
the_winner = mev->winner()->transport();
if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT("(%P|%t) Transport_Connector::")
- ACE_TEXT("wait_for_connection_completion, ")
- ACE_TEXT("transport [%d]\n"),
- the_winner->id ()));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) Transport_Connector::")
+ ACE_TEXT ("wait_for_connection_completion, ")
+ ACE_TEXT ("transport [%d]\n"),
+ the_winner->id ()));
+ }
}
else if (errno == ETIME)
{
@@ -616,10 +657,12 @@ TAO_Connector::wait_for_connection_completion (
// Report that making the connection failed, don't print errno
// because we touched the reactor and errno could be changed
if (TAO_debug_level > 2)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%P|%t) Transport_Connector::")
- ACE_TEXT ("wait_for_connection_completion, failed\n")
- ));
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Transport_Connector::")
+ ACE_TEXT ("wait_for_connection_completion, failed\n")
+ ));
+ }
return false;
}
@@ -630,11 +673,13 @@ TAO_Connector::wait_for_connection_completion (
if (r->blocked_connect () && !the_winner->is_connected ())
{
if (TAO_debug_level > 2)
- ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) - Transport_Connector::"
- "wait_for_connection_completion, "
- "no connected transport for a blocked connection, "
- "cancelling connections and reverting things \n"));
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO (%P|%t) - Transport_Connector::"
+ "wait_for_connection_completion, "
+ "no connected transport for a blocked connection, "
+ "cancelling connections and reverting things \n"));
+ }
// Forget the return value. We are busted anyway. Try our best
// here.
@@ -643,9 +688,9 @@ TAO_Connector::wait_for_connection_completion (
return false;
}
- // Connection may not ready for SYNC_NONE cases but we can use this
- // transport, if we need a connected one we will block later to make
- // sure it is connected
+ // Connection may not ready for SYNC_NONE and SYNC_DELAYED_BUFFERING cases
+ // but we can use this transport, if we need a connected one we will poll
+ // later to make sure it is connected
return true;
}
diff --git a/TAO/tao/Transport_Queueing_Strategies.cpp b/TAO/tao/Transport_Queueing_Strategies.cpp
index 4de33f760dd..fbf6595904a 100644
--- a/TAO/tao/Transport_Queueing_Strategies.cpp
+++ b/TAO/tao/Transport_Queueing_Strategies.cpp
@@ -90,7 +90,7 @@ namespace TAO
must_flush = false;
set_timer = false;
- TAO_Buffering_Constraint_Policy *buffering_constraint_policy = 0;
+ TAO::BufferingConstraint buffering_constraint;
ACE_TRY_NEW_ENV
{
@@ -99,18 +99,18 @@ namespace TAO
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
- TAO::BufferingConstraintPolicy_var bcp =
+ TAO::BufferingConstraintPolicy_var bcpv =
TAO::BufferingConstraintPolicy::_narrow (bcp_policy.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
- buffering_constraint_policy =
- dynamic_cast<TAO_Buffering_Constraint_Policy *> (bcp.in ());
-
- if (buffering_constraint_policy == 0)
+ TAO_Buffering_Constraint_Policy* bcp =
+ dynamic_cast<TAO_Buffering_Constraint_Policy *> (bcpv.in ());
+ if (bcp == 0)
{
return true;
}
+ bcp->get_buffering_constraint (buffering_constraint);
}
ACE_CATCHANY
{
@@ -118,8 +118,6 @@ namespace TAO
}
ACE_ENDTRY;
- TAO::BufferingConstraint buffering_constraint;
- buffering_constraint_policy->get_buffering_constraint (buffering_constraint);
if (buffering_constraint.mode == TAO::BUFFER_FLUSH)
{
diff --git a/TAO/tests/AMI_Buffering/client.cpp b/TAO/tests/AMI_Buffering/client.cpp
index eb5819492ca..2e05c13f86e 100644
--- a/TAO/tests/AMI_Buffering/client.cpp
+++ b/TAO/tests/AMI_Buffering/client.cpp
@@ -12,7 +12,7 @@ ACE_RCSID(AMI_Buffering, client, "$Id$")
const char *server_ior = "file://server.ior";
const char *admin_ior = "file://admin.ior";
-int iterations = 200;
+int iterations = 20;
int run_message_count_test = 0;
int run_timeout_test = 0;
@@ -20,12 +20,12 @@ int run_timeout_reactive_test = 0;
int run_buffer_size_test = 0;
const int PAYLOAD_LENGTH = 1024;
-const int BUFFERED_MESSAGES_COUNT = 50;
-const unsigned int TIMEOUT_MILLISECONDS = 25;
-const int BUFFER_SIZE = 64 * PAYLOAD_LENGTH;
+const int BUFFERED_MESSAGES_COUNT = 10;
+const unsigned int TIMEOUT_MILLISECONDS = 50;
+const int BUFFER_SIZE = 10 * PAYLOAD_LENGTH;
/// Allow a larger timeout to occur due to scheduler differences
-const unsigned int TIMEOUT_TOLERANCE = 20 * TIMEOUT_MILLISECONDS;
+const unsigned int TIMEOUT_TOLERANCE = 4 * TIMEOUT_MILLISECONDS;
/// Check that no more than 10% of the messages are not sent.
const double LIVENESS_TOLERANCE = 0.9;
@@ -393,6 +393,10 @@ run_liveness_test (CORBA::ORB_ptr orb,
ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
// Once the system has sent enough messages we don't
// expect it to fall too far behind, i.e. at least 90% of the
// messages should be delivered....
@@ -491,6 +495,10 @@ run_message_count (CORBA::ORB_ptr orb,
ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
CORBA::ULong iteration_count =
send_count - initial_receive_count;
if (receive_count != initial_receive_count)
@@ -605,6 +613,10 @@ run_timeout (CORBA::ORB_ptr orb,
ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
if (receive_count != initial_receive_count)
{
@@ -720,8 +732,8 @@ run_timeout_reactive (CORBA::ORB_ptr orb,
ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
- ACE_Time_Value sleep (0, 10000);
- orb->run (sleep ACE_ENV_ARG_PARAMETER);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
@@ -838,6 +850,10 @@ run_buffer_size (CORBA::ORB_ptr orb,
ami_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
CORBA::ULong payload_delta =
bytes_sent - initial_bytes_received;
if (bytes_received != initial_bytes_received)
diff --git a/TAO/tests/AMI_Buffering/run_buffer_size.pl b/TAO/tests/AMI_Buffering/run_buffer_size.pl
index 6f3f961b9d0..d8d013e7528 100755
--- a/TAO/tests/AMI_Buffering/run_buffer_size.pl
+++ b/TAO/tests/AMI_Buffering/run_buffer_size.pl
@@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) {
exit 1;
}
-$client = $CL->SpawnWaitKill (300);
+$client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/AMI_Buffering/run_message_count.pl b/TAO/tests/AMI_Buffering/run_message_count.pl
index 030af52a919..dcb2c43bea8 100755
--- a/TAO/tests/AMI_Buffering/run_message_count.pl
+++ b/TAO/tests/AMI_Buffering/run_message_count.pl
@@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) {
exit 1;
}
-$client = $CL->SpawnWaitKill (300);
+$client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/AMI_Buffering/run_test.pl b/TAO/tests/AMI_Buffering/run_test.pl
index 9bd93d690f4..4e682196753 100755
--- a/TAO/tests/AMI_Buffering/run_test.pl
+++ b/TAO/tests/AMI_Buffering/run_test.pl
@@ -40,7 +40,7 @@ foreach $test_type ("-c", "-t", "-b") {
exit 1;
}
- $client = $CL->SpawnWaitKill (300);
+ $client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/AMI_Buffering/run_timeout.pl b/TAO/tests/AMI_Buffering/run_timeout.pl
index 5e93a904afb..eb3580c12c5 100755
--- a/TAO/tests/AMI_Buffering/run_timeout.pl
+++ b/TAO/tests/AMI_Buffering/run_timeout.pl
@@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) {
exit 1;
}
-$client = $CL->SpawnWaitKill (600);
+$client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/AMI_Buffering/run_timeout_reactive.pl b/TAO/tests/AMI_Buffering/run_timeout_reactive.pl
index d5fd87fe65c..061a524f4a6 100755
--- a/TAO/tests/AMI_Buffering/run_timeout_reactive.pl
+++ b/TAO/tests/AMI_Buffering/run_timeout_reactive.pl
@@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) {
exit 1;
}
-$client = $CL->SpawnWaitKill (300);
+$client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/Oneway_Buffering/client.cpp b/TAO/tests/Oneway_Buffering/client.cpp
index a0ab26bed3e..0c4383016ef 100644
--- a/TAO/tests/Oneway_Buffering/client.cpp
+++ b/TAO/tests/Oneway_Buffering/client.cpp
@@ -13,7 +13,7 @@ ACE_RCSID(Oneway_Buffering, client, "$Id$")
const char *server_ior = "file://server.ior";
const char *admin_ior = "file://admin.ior";
-int iterations = 200;
+int iterations = 20;
int run_message_count_test = 0;
int run_timeout_test = 0;
@@ -21,9 +21,9 @@ int run_timeout_reactive_test = 0;
int run_buffer_size_test = 0;
const int PAYLOAD_LENGTH = 1024;
-const int BUFFERED_MESSAGES_COUNT = 50;
+const int BUFFERED_MESSAGES_COUNT = 10;
const unsigned int TIMEOUT_MILLISECONDS = 50;
-const int BUFFER_SIZE = 64 * PAYLOAD_LENGTH;
+const int BUFFER_SIZE = 10 * PAYLOAD_LENGTH;
/// Check that no more than 10% of the messages are not sent.
const double LIVENESS_TOLERANCE = 0.9;
@@ -316,7 +316,8 @@ sync_server (Test::Oneway_Buffering_ptr flusher
}
int
-run_liveness_test (Test::Oneway_Buffering_ptr oneway_buffering,
+run_liveness_test (CORBA::ORB_ptr orb,
+ Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_ptr flusher,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL)
@@ -349,6 +350,10 @@ run_liveness_test (Test::Oneway_Buffering_ptr oneway_buffering,
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
// Once the system has sent enough messages we don't
// expect it to fall too far behind, i.e. at least 90% of the
// messages should be delivered....
@@ -436,6 +441,10 @@ run_message_count (CORBA::ORB_ptr orb,
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
CORBA::ULong iteration_count =
send_count - initial_receive_count;
if (receive_count != initial_receive_count)
@@ -468,7 +477,8 @@ run_message_count (CORBA::ORB_ptr orb,
}
int liveness_test_failed =
- run_liveness_test (oneway_buffering,
+ run_liveness_test (orb,
+ oneway_buffering,
flusher.in (),
oneway_buffering_admin
ACE_ENV_ARG_PARAMETER);
@@ -536,6 +546,10 @@ run_timeout (CORBA::ORB_ptr orb,
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
if (receive_count != initial_receive_count)
{
@@ -568,7 +582,7 @@ run_timeout (CORBA::ORB_ptr orb,
}
int liveness_test_failed =
- run_liveness_test (oneway_buffering,
+ run_liveness_test (orb, oneway_buffering,
flusher.in (),
oneway_buffering_admin
ACE_ENV_ARG_PARAMETER);
@@ -639,8 +653,9 @@ run_timeout_reactive (CORBA::ORB_ptr orb,
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
- ACE_Time_Value sleep (0, 10000);
- orb->run (sleep ACE_ENV_ARG_PARAMETER);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+
ACE_CHECK_RETURN (-1);
ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
@@ -675,7 +690,7 @@ run_timeout_reactive (CORBA::ORB_ptr orb,
}
int liveness_test_failed =
- run_liveness_test (oneway_buffering,
+ run_liveness_test (orb, oneway_buffering,
flusher.in (),
oneway_buffering_admin
ACE_ENV_ARG_PARAMETER);
@@ -743,6 +758,10 @@ run_buffer_size (CORBA::ORB_ptr orb,
oneway_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
+ ACE_Time_Value tv (0, 10 * 1000);
+ orb->run (tv ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK_RETURN (-1);
+
CORBA::ULong payload_delta =
bytes_sent - initial_bytes_received;
if (bytes_received != initial_bytes_received)
@@ -780,7 +799,7 @@ run_buffer_size (CORBA::ORB_ptr orb,
}
int liveness_test_failed =
- run_liveness_test (oneway_buffering,
+ run_liveness_test (orb, oneway_buffering,
flusher.in (),
oneway_buffering_admin
ACE_ENV_ARG_PARAMETER);
diff --git a/TAO/tests/Oneway_Buffering/run_buffer_size.pl b/TAO/tests/Oneway_Buffering/run_buffer_size.pl
index 3e8db4fdc66..4f3bd5c5037 100755
--- a/TAO/tests/Oneway_Buffering/run_buffer_size.pl
+++ b/TAO/tests/Oneway_Buffering/run_buffer_size.pl
@@ -45,7 +45,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) {
exit 1;
}
-$client = $CL->SpawnWaitKill (300);
+$client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/Oneway_Buffering/run_message_count.pl b/TAO/tests/Oneway_Buffering/run_message_count.pl
index 949a61e5c62..35e68a43b1a 100755
--- a/TAO/tests/Oneway_Buffering/run_message_count.pl
+++ b/TAO/tests/Oneway_Buffering/run_message_count.pl
@@ -45,7 +45,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) {
exit 1;
}
-$client = $CL->SpawnWaitKill (300);
+$client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/Oneway_Buffering/run_test.pl b/TAO/tests/Oneway_Buffering/run_test.pl
index b58853ec49c..39c47eb20e3 100755
--- a/TAO/tests/Oneway_Buffering/run_test.pl
+++ b/TAO/tests/Oneway_Buffering/run_test.pl
@@ -41,7 +41,7 @@ foreach $test_type ("-c", "-t", "-b", "-r") {
exit 1;
}
- $client = $CL->SpawnWaitKill (300);
+ $client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/Oneway_Buffering/run_timeout.pl b/TAO/tests/Oneway_Buffering/run_timeout.pl
index da28486e145..6518696565e 100755
--- a/TAO/tests/Oneway_Buffering/run_timeout.pl
+++ b/TAO/tests/Oneway_Buffering/run_timeout.pl
@@ -45,7 +45,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) {
exit 1;
}
-$client = $CL->SpawnWaitKill (300);
+$client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl b/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl
index d5fd87fe65c..061a524f4a6 100755
--- a/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl
+++ b/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl
@@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) {
exit 1;
}
-$client = $CL->SpawnWaitKill (300);
+$client = $CL->SpawnWaitKill (30);
if ($client != 0) {
print STDERR "ERROR: client returned $client\n";
diff --git a/TAO/tests/Oneway_Timeouts/Test.idl b/TAO/tests/Oneway_Timeouts/Test.idl
new file mode 100644
index 00000000000..d076114e7c6
--- /dev/null
+++ b/TAO/tests/Oneway_Timeouts/Test.idl
@@ -0,0 +1,4 @@
+interface Tester {
+ oneway void test(in long id);
+ long test2(in long id);
+};
diff --git a/TAO/tests/Oneway_Timeouts/client.cpp b/TAO/tests/Oneway_Timeouts/client.cpp
new file mode 100644
index 00000000000..e928a1e232a
--- /dev/null
+++ b/TAO/tests/Oneway_Timeouts/client.cpp
@@ -0,0 +1,488 @@
+#include "TestS.h"
+#include "tao/Strategies/advanced_resource.h"
+#include "tao/Messaging/Messaging.h"
+#include "tao/AnyTypeCode/TAOA.h"
+#include "tao/AnyTypeCode/Any.h"
+#include "tao/IIOP_Connector.h"
+
+#include "ace/streams.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Arg_Shifter.h"
+
+using namespace CORBA;
+using namespace PortableServer;
+
+namespace
+{
+ const char *non_existent_ior = "corbaloc:iiop:1.2@ociweb.com:12345/test";
+ const int TIME_THRESHOLD = 50; //ms
+
+ int request_timeout = 0;
+ Messaging::SyncScope sync_scope;
+ bool use_buf_constraints = false;
+ bool use_sync_scope = false;
+ int bc_mode = TAO::BUFFER_FLUSH;
+ int bc_count = 0;
+ int bc_bytes = 0;
+ int bc_timeout = 0;
+ int num_requests = 10;
+ int request_interval = 50;
+ int connect_timeout = 0;
+ int run_orb_delay = 0;
+ int run_orb_time = 0;
+ bool force_timeout = false;
+ // This will force a blocking connection before starting the test
+ // by sending the num_requests as a twoway.
+ bool force_connect = false;
+ bool use_sleep = false;
+ unsigned int max_request_time = 0;
+ bool use_twoway = false;
+ bool retry_transients = false;
+ bool retry_timeouts = false;
+
+ void print_usage ()
+ {
+ cout << "client [-request_timeout ms=0] [-connect_timeout ms=0] "
+ "[-request_interval ms=100]\n\t[-run_orb_delay ms=0] "
+ "[-run_orb_time ms=0] [-max_request_time ms=0]\n"
+ "\t[-num_requests n=10] [-use_twoway] [-retry_transients] "
+ "[-retry_timeouts]\n"
+ "\t[-use_sleep] [-force_timeout] [-force_connect] [-buffer_count n=0]\n"
+ "\t[-buffer_bytes n=0] [-buffer_timeout ms=0] [-sync delayed|eager|none]"
+ << endl;
+ }
+
+ bool parse_command_line (int ac, char *av[])
+ {
+ ACE_Arg_Shifter args (ac, av);
+ args.consume_arg ();
+
+ while (args.is_anything_left ())
+ {
+ if (args.cur_arg_strncasecmp ("-request_timeout") == 0)
+ {
+ args.consume_arg ();
+ request_timeout = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-connect_timeout") == 0)
+ {
+ args.consume_arg ();
+ connect_timeout = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-request_interval") == 0)
+ {
+ args.consume_arg ();
+ request_interval = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-run_orb_delay") == 0)
+ {
+ args.consume_arg ();
+ run_orb_delay = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-run_orb_time") == 0)
+ {
+ args.consume_arg ();
+ run_orb_time = ACE_OS::atoi(args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-max_request_time") == 0)
+ {
+ args.consume_arg ();
+ max_request_time = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-num_requests") == 0)
+ {
+ args.consume_arg ();
+ num_requests = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-use_twoway") == 0)
+ {
+ use_twoway = true;
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-retry_transients") == 0)
+ {
+ retry_transients = true;
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-retry_timeouts") == 0)
+ {
+ retry_timeouts = true;
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-use_sleep") == 0)
+ {
+ use_sleep = true;
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-force_timeout") == 0)
+ {
+ force_timeout = true;
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-force_connect") == 0)
+ {
+ force_connect = true;
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-buffer_count") == 0)
+ {
+ args.consume_arg ();
+ use_buf_constraints = true;
+ bc_count = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-buffer_bytes") == 0)
+ {
+ args.consume_arg ();
+ use_buf_constraints = true;
+ bc_bytes = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-buffer_timeout") == 0)
+ {
+ args.consume_arg ();
+ use_buf_constraints = true;
+ bc_timeout = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-sync") == 0)
+ {
+ args.consume_arg ();
+ if (args.cur_arg_strncasecmp ("delayed") == 0)
+ {
+ sync_scope = TAO::SYNC_DELAYED_BUFFERING;
+ use_sync_scope = true;
+ }
+ else if (args.cur_arg_strncasecmp ("eager") == 0)
+ {
+ sync_scope = TAO::SYNC_EAGER_BUFFERING;
+ use_sync_scope = true;
+ }
+ else if (args.cur_arg_strncasecmp ("none") == 0)
+ {
+ sync_scope = Messaging::SYNC_NONE;
+ use_sync_scope = true;
+ }
+ else
+ {
+ print_usage ();
+ return false;
+ }
+
+ args.consume_arg ();
+ }
+ else
+ {
+ cerr << "Error: Unknown argument \""
+ << args.get_current () << "\"" << endl;
+ print_usage ();
+ return false;
+ }
+
+ }
+
+ return true;
+ }
+
+
+ POA_ptr create_poa (ORB_ptr orb)
+ {
+ POA_var poa;
+ PolicyList pols;
+ Object_var obj = orb->resolve_initial_references ("RootPOA");
+ POA_var root = POA::_narrow (obj.in ());
+ ACE_ASSERT (! is_nil (root.in ()));
+ POAManager_var man = root->the_POAManager ();
+ poa = root->create_POA ("X", man, pols);
+ return poa._retn ();
+ }
+
+
+ Tester_ptr set_request_timeout (Tester_ptr tst, ORB_ptr orb)
+ {
+ if (request_timeout <= 0)
+ {
+ return Tester::_duplicate (tst);
+ }
+
+ Any a;
+ a <<= static_cast<TimeBase::TimeT> (request_timeout * 10000);
+ PolicyList pols (1);
+ pols.length (1);
+ pols[0] =
+ orb->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, a);
+ Object_var obj = tst->_set_policy_overrides (pols, ADD_OVERRIDE);
+ pols[0]->destroy ();
+ return Tester::_unchecked_narrow (obj.in ());
+ }
+
+
+ void set_connect_timeout (ORB_ptr orb)
+ {
+ if (connect_timeout <= 0)
+ return;
+ Object_var obj = orb->resolve_initial_references ("PolicyCurrent");
+ PolicyCurrent_var policy_current = PolicyCurrent::_narrow (obj.in ());
+ Any a;
+ a <<= static_cast<TimeBase::TimeT> (connect_timeout * 10000);
+ PolicyList pols (1);
+ pols.length (1);
+ pols[0] = orb->create_policy (TAO::CONNECTION_TIMEOUT_POLICY_TYPE, a);
+ policy_current->set_policy_overrides (pols, ADD_OVERRIDE);
+ pols[0]->destroy ();
+ }
+
+
+ void set_buffering (ORB_ptr orb)
+ {
+ Object_var obj = orb->resolve_initial_references ("PolicyCurrent");
+ PolicyCurrent_var policy_current = PolicyCurrent::_narrow (obj.in ());
+ PolicyList pols (1);
+ pols.length (1);
+
+ if (use_sync_scope)
+ {
+ Any a;
+ a <<= sync_scope;
+ pols[0] = orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, a);
+ policy_current->set_policy_overrides (pols, ADD_OVERRIDE);
+ pols[0]->destroy ();
+ }
+
+
+ if (use_buf_constraints)
+ {
+ TAO::BufferingConstraint bc;
+ if (bc_count > 0)
+ {
+ bc_mode |= TAO::BUFFER_MESSAGE_COUNT;
+ }
+
+ if (bc_bytes > 0)
+ {
+ bc_mode |= TAO::BUFFER_MESSAGE_BYTES;
+ }
+
+ if (bc_timeout > 0)
+ {
+ bc_mode |= TAO::BUFFER_TIMEOUT;
+ }
+
+ bc.mode = bc_mode;
+ bc.message_count = bc_count;
+ bc.message_bytes = bc_bytes;
+ bc.timeout = static_cast<TimeBase::TimeT> (bc_timeout * 10000);
+ Any a;
+ a <<= bc;
+ pols[0] =
+ orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE, a);
+ policy_current->set_policy_overrides (pols, ADD_OVERRIDE);
+ pols[0]->destroy ();
+ }
+
+ }
+
+}
+
+
+int main (int ac, char *av[])
+{
+
+ ACE_Time_Value before = ACE_High_Res_Timer::gettimeofday_hr ();
+
+ int num_requests_sent = 0;
+
+ try
+ {
+ ORB_var orb = ORB_init (ac, av);
+
+ if (!parse_command_line (ac, av))
+ {
+ return 1;
+ }
+
+
+ set_connect_timeout (orb.in ());
+ set_buffering (orb.in ());
+
+ ACE_CString ior ("file://server.ior");
+ if (force_timeout)
+ {
+ ior = non_existent_ior;
+ }
+
+ Object_var obj = orb->string_to_object (ior.c_str ());
+
+ ACE_ASSERT (! is_nil (obj.in ()));
+
+ Tester_var tmp_tester = Tester::_unchecked_narrow (obj.in ());
+
+ Tester_var tester = set_request_timeout (tmp_tester.in (), orb.in ());
+
+ ACE_ASSERT (! is_nil (tester.in ()));
+
+ Long i = 0;
+
+ if (force_connect)
+ {
+ tester->test2 (-2);
+ cout << "Connected..." << endl;
+ }
+
+
+ for (; i < num_requests; ++i)
+ {
+ before = ACE_High_Res_Timer::gettimeofday_hr ();
+ try
+ {
+ if (use_twoway)
+ {
+ tester->test2 (i);
+ }
+ else
+ {
+ tester->test (i);
+ }
+
+ }
+ catch (CORBA::TRANSIENT&)
+ {
+ cerr << "Transient exception during test () invocation " << i << endl;
+ if (! retry_transients)
+ {
+ throw;
+ }
+
+ }
+ catch (CORBA::TIMEOUT&)
+ {
+ cerr << "Timeout exception during test () invocation " << i << endl;
+ if (! retry_timeouts)
+ {
+ throw;
+ }
+
+ }
+
+ ++num_requests_sent;
+
+ ACE_Time_Value after = ACE_High_Res_Timer::gettimeofday_hr ();
+ if (max_request_time > 0 &&
+ (after - before).msec () > max_request_time)
+ {
+ cerr << "Error : test () took " << (after - before).msec ()
+ << endl;
+ return 1;
+ }
+
+
+ cout << 'c' << i << endl;
+ if (request_interval > 0)
+ {
+ ACE_Time_Value tv (0, request_interval * 1000);
+ ACE_Time_Value done = tv +
+ ACE_High_Res_Timer::gettimeofday_hr ();
+ if (! use_sleep)
+ {
+ orb->run (tv);
+ }
+ else
+ {
+ ACE_OS::sleep (tv);
+ }
+
+ while (ACE_High_Res_Timer::gettimeofday_hr () < done)
+ {
+ ACE_OS::sleep (0);
+ }
+ }
+ }
+
+
+ if (run_orb_delay > 0)
+ {
+ ACE_Time_Value tv (0, run_orb_delay * 1000);
+ ACE_OS::sleep (tv);
+ }
+
+
+ if (run_orb_time > 0)
+ {
+ ACE_Time_Value tv (0, run_orb_time * 1000);
+ orb->run (tv);
+ }
+
+
+ // Let the server know we're finished.
+ tester->test2 (-1);
+
+ orb->shutdown (1);
+
+ orb->destroy ();
+
+ if (force_timeout)
+ {
+ cerr << "Error: Connection did not timeout." << endl;
+ return 1;
+ }
+
+
+ return 0;
+
+ }
+ catch (CORBA::TIMEOUT &ex)
+ {
+ if (force_timeout)
+ {
+ ACE_Time_Value after = ACE_High_Res_Timer::gettimeofday_hr ();
+ long ms = (after - before).msec ();
+ if ( (use_twoway || !use_sync_scope)
+ && request_timeout > 0
+ && request_timeout < connect_timeout)
+ {
+ connect_timeout = request_timeout;
+ }
+ else if (use_sync_scope && !use_sleep)
+ {
+ if (ms > TIME_THRESHOLD)
+ {
+ cerr << "Error: Buffered request took " << ms << endl;
+ return 1;
+ }
+
+ ms = num_requests_sent * request_interval;
+ }
+
+ if (std::abs (ms - connect_timeout) > TIME_THRESHOLD)
+ {
+ cerr << "Error: Timeout expected in " << connect_timeout
+ << "ms, but took " << ms << "ms" << endl;
+ return 1;
+ }
+
+ return 0;
+ }
+ else
+ {
+ cerr << "Error: Unexpected timeout\n" << ex << endl;
+ }
+
+ }
+ catch (Exception &ex)
+ {
+ cerr << "client: " << ex << endl;
+ cerr << "\nLast operation took "
+ << (ACE_High_Res_Timer::gettimeofday_hr () - before).msec ()
+ << "ms"
+ << endl;
+ }
+
+ return 1;
+}
diff --git a/TAO/tests/Oneway_Timeouts/run_test.pl b/TAO/tests/Oneway_Timeouts/run_test.pl
new file mode 100755
index 00000000000..2330e7906be
--- /dev/null
+++ b/TAO/tests/Oneway_Timeouts/run_test.pl
@@ -0,0 +1,392 @@
+eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}'
+ & eval 'exec perl -S $0 $argv:q'
+ if 0;
+
+# $Id: run_test.pl,v 1.7 2005/01/27 16:38:29 elliott_c Exp $
+# -*- perl -*-
+
+###############################################################################
+my $ACE_ROOT = $ENV{ACE_ROOT};
+
+if (!defined $ACE_ROOT) {
+ print "Error: ACE_ROOT not defined.\n";
+ return 1;
+}
+
+use lib "$ENV{ACE_ROOT}/bin";
+use PerlACE::Run_Test;
+use File::Copy;
+
+use strict;
+
+my $srv_ior = PerlACE::LocalFile ("server.ior");
+my $CLI = new PerlACE::Process ("client");
+my $SRV = new PerlACE::Process ("server");
+my $SRV_PORT = PerlACE::random_port();
+my $SRV_ARGS = "-orbobjrefstyle url -orbendpoint iiop://:$SRV_PORT";
+
+sub test_timeouts
+{
+ print "test_timeouts 1 testing...\n";
+ $CLI->Arguments("-force_timeout -connect_timeout 200");
+ my $ret = $CLI->SpawnWaitKill(5);
+ if ($ret != 0) {
+ return $ret;
+ }
+ print "test_timeouts 1 passed...\n";
+ print "test_timeouts 2 testing...\n";
+ # request timeout should override connect timeout
+ $CLI->Arguments("-force_timeout -request_timeout 100 -connect_timeout 200");
+ my $ret = $CLI->SpawnWaitKill(5);
+ if ($ret != 0) {
+ return $ret;
+ }
+ print "test_timeouts 2 passed...\n";
+ print "test_timeouts 3 testing...\n";
+ $CLI->Arguments("-use_twoway -force_timeout -connect_timeout 200");
+ my $ret = $CLI->SpawnWaitKill(5);
+ if ($ret != 0) {
+ return $ret;
+ }
+ print "test_timeouts 3 passed...\n";
+ print "test_timeouts 4 testing...\n";
+ # request timeout should override connect timeout
+ $CLI->Arguments("-use_twoway -force_timeout -request_timeout 200 -connect_timeout 1000");
+ my $ret = $CLI->SpawnWaitKill(5);
+ if ($ret != 0) {
+ return $ret;
+ }
+ print "test_timeouts 4 passed...\n";
+ print "test_timeouts 5 testing...\n";
+ # request_timeout ignored for other sync_scopes
+ $CLI->Arguments("-sync none -force_timeout -request_timeout 100 -connect_timeout 200 -max_request_time 30");
+ my $ret = $CLI->SpawnWaitKill(5);
+ if ($ret != 0) {
+ return $ret;
+ }
+ print "test_timeouts 5 passed...\n";
+ print "test_timeouts 6 testing...\n";
+ $CLI->Arguments("-sync eager -force_timeout -request_timeout 100 -connect_timeout 200 -max_request_time 30");
+ my $ret = $CLI->SpawnWaitKill(5);
+ if ($ret != 0) {
+ return $ret;
+ }
+ print "test_timeouts 6 passed...\n";
+ $CLI->Arguments("-sync delayed -force_timeout -request_timeout 100 -connect_timeout 200 -max_request_time 30");
+ my $ret = $CLI->SpawnWaitKill(5);
+ if ($ret != 0) {
+ return $ret;
+ }
+ return $ret;
+}
+
+sub test_buffering
+{
+ print "test_buffering 1 testing...\n";
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_min 400");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ $CLI->Arguments("-sync none -max_request_time 30");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffering 1 passed...\n";
+ print "test_buffering 2 testing...\n";
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_min 400");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ $CLI->Arguments("-sync delayed -max_request_time 30");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffering 2 passed...\n";
+ print "test_buffering 3 testing...\n";
+ # Using sleep() instead of orb->run() for the interval
+ # should cause all requests to be sent at once.
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ $CLI->Arguments("-sync none -max_request_time 30 -use_sleep -run_orb_time 500");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffering 3 passed...\n";
+ print "test_buffering 4 testing...\n";
+ # Even delayed buffering will work this way, because the
+ # connection won't be established until the orb is run.
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ $CLI->Arguments("-sync delayed -max_request_time 30 -use_sleep -run_orb_time 500");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffering 4 passed...\n";
+ print "test_buffering 5 testing...\n";
+ # However, if we connect first, then delayed buffering will
+ # cause the data to be sent right away
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_min 400");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ $CLI->Arguments("-sync delayed -max_request_time 30 -use_sleep -run_orb_time 500 -force_connect");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffering 5 passed...\n";
+ print "test_buffering 6 testing...\n";
+ # Forcing the connection won't help sync_none, because it always buffers
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ $CLI->Arguments("-sync none -max_request_time 30 -use_sleep -run_orb_time 500 -force_connect");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffering 6 passed...\n";
+ return 0;
+}
+
+# Set a buffer count trigger and a request timeout so that a
+# predictable number will be discarded.
+sub test_buffer_count_timeout
+{
+ print "test_buffer_count_timeout testing...\n";
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 2");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ $CLI->Arguments("-sync none -buffer_count 5 -max_request_time 30 -request_timeout 10");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffer_count_timeout passed...\n";
+ return 0;
+}
+
+sub test_buffer_bytes_timeout
+{
+ print "test_buffer_bytes_timeout testing...\n";
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 3");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ $CLI->Arguments("-sync none -buffer_bytes 200 -max_request_time 30 -request_timeout 10");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffer_bytes_timeout passed...\n";
+ return 0;
+}
+
+sub test_buffer_timeout
+{
+ print "test_buffer_timeout 1 testing...\n";
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50 -first_min 1000");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ # Must use run_orb_time so that the timer will fire, and to prevent sending the
+ # test_done twoway request which would flush the queue.
+ $CLI->Arguments("-sync none -buffer_timeout 1000 -max_request_time 30 -run_orb_time 1500");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffer_timeout 1 passed...\n";
+ print "test_buffer_timeout 2 testing...\n";
+ # delayed buffering should behave as above, because it will start out buffering
+ # due to the connection not being established.
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50 -first_min 1000");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ # Must use run_orb_time so that the timer will fire, and to prevent sending the
+ # test_done twoway request which would flush the queue.
+ $CLI->Arguments("-sync delayed -buffer_timeout 1000 -max_request_time 30 -run_orb_time 1500");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffer_timeout 2 passed...\n";
+ print "test_buffer_timeout 3 testing...\n";
+ # delayed buffering will ignore constraints if the connection is forced
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_min 450 -first_max 50");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ # Must use run_orb_time so that the timer will fire, and to prevent sending the
+ # test_done twoway request which would flush the queue.
+ $CLI->Arguments("-sync delayed -force_connect -buffer_timeout 1000 -max_request_time 30 -run_orb_time 1500");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_buffer_timeout 3 passed...\n";
+ return 0;
+}
+
+# test sending one request with buffering timeout constraint.
+sub test_one_request
+{
+ print "test_one_request testing...\n";
+ unlink $srv_ior;
+ $SRV->Arguments("$SRV_ARGS -expected 1 -first_min 1000");
+ if ($SRV->Spawn() != 0) {
+ return 1;
+ }
+ if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) {
+ print STDERR "Error: IOR not found.\n";
+ return 1;
+ }
+ # Must use run_orb_time so that the timer will fire, and to prevent sending the
+ # test_done twoway request which would flush the queue.
+ $CLI->Arguments("-sync none -buffer_timeout 1000 -max_request_time 30 -run_orb_time 1500 -num_requests 1");
+ if ($CLI->SpawnWaitKill(5) != 0) {
+ print STDERR "Error: Client failed.\n";
+ return 1;
+ }
+ if ($SRV->WaitKill(5) != 0) {
+ print STDERR "Error: Server failed.\n";
+ return 1;
+ }
+ print "test_one_request passed...\n";
+ return 0;
+}
+
+sub run_test
+{
+ my $ret = shift;
+ if ($ret != 0) {
+ exit $ret;
+ }
+}
+
+unlink $srv_ior;
+
+run_test(test_timeouts());
+run_test(test_buffering());
+run_test(test_buffer_count_timeout());
+run_test(test_buffer_bytes_timeout());
+run_test(test_buffer_timeout());
+run_test(test_one_request());
+
+# Regardless of the return value, ensure that the processes
+# are terminated before exiting
+$CLI->Kill();
+$SRV->Kill();
+
+unlink $srv_ior;
+exit 0;
diff --git a/TAO/tests/Oneway_Timeouts/server.cpp b/TAO/tests/Oneway_Timeouts/server.cpp
new file mode 100644
index 00000000000..ac9e6dadee2
--- /dev/null
+++ b/TAO/tests/Oneway_Timeouts/server.cpp
@@ -0,0 +1,318 @@
+#include "TestS.h"
+
+#include "tao/Strategies/advanced_resource.h"
+#include "tao/Messaging/Messaging.h"
+#include "tao/AnyTypeCode/TAOA.h"
+#include "tao/AnyTypeCode/Any.h"
+
+#include "ace/streams.h"
+#include "ace/High_Res_Timer.h"
+#include "ace/Reactor.h"
+
+const int TIME_THRESHOLD = 50; //ms
+
+int activate_delay = 0000;
+int run_delay = 00;
+int request_delay = 00;
+int abort_after = 0;
+int num_expected = 0;
+int elapsed_max = 0;
+int elapsed_min = 0;
+int first_min = 0;
+int first_max = 0;
+
+class Tester_i
+ : public virtual POA_Tester
+ , public virtual ACE_Event_Handler
+{
+public:
+ Tester_i (CORBA::ORB_ptr orb)
+ : orb_ (orb)
+ , id1_ (0)
+ , id2_ (0)
+ , count_ (0)
+ , failed_ (false)
+ {
+ this->start_ = ACE_High_Res_Timer::gettimeofday_hr ();
+ }
+
+ virtual ~Tester_i ()
+ {
+ }
+
+ virtual void test (CORBA::Long id)
+ ACE_THROW_SPEC ((::CORBA::SystemException))
+ {
+ testShared (id);
+ }
+
+ virtual CORBA::Long test2 (CORBA::Long id)
+ ACE_THROW_SPEC ((::CORBA::SystemException))
+ {
+ if (id == -2)
+ {
+ // Special id used to force a connect. Ignore.
+ this->start_ = ACE_High_Res_Timer::gettimeofday_hr ();
+ return id;
+ }
+ return testShared (id);
+ }
+
+ int testShared (CORBA::Long id)
+ {
+ ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
+ if (id == -1)
+ {
+ // Signals the end of a test run
+ if (num_expected > 0 && count_ != num_expected)
+ {
+ cerr << "Error: Expected " << num_expected
+ << ", but received " << count_ << endl;
+ this->failed_ = true;
+ }
+ long ms = (last_ - first_).msec ();
+ if (elapsed_max > 0 && ms > elapsed_max)
+ {
+ cerr << "Error: Expected < " << elapsed_max
+ << "ms, but was " << ms << "ms" << endl;
+ this->failed_ = true;
+ }
+ if (elapsed_min > 0 && ms < elapsed_min)
+ {
+ cerr << "Error: Expected > " << elapsed_min
+ << "ms, but was " << ms << "ms" << endl;
+ this->failed_ = true;
+ }
+ ms = (first_ - start_).msec ();
+ if (first_max > 0 && ms > first_max)
+ {
+ cerr << "Error: Expected first < " << first_max
+ << "ms, but was " << ms << "ms" << endl;
+ this->failed_ = true;
+ }
+ if (first_min > 0 && ms < first_min)
+ {
+ cerr << "Error: Expected first > " << first_min
+ << "ms, but was " << ms << "ms" << endl;
+ this->failed_ = true;
+ }
+ ACE_Time_Value timeout (0, 50 * 1000);
+ this->orb_->orb_core ()->reactor ()->schedule_timer (this, 0, timeout);
+ return id;
+ }
+ this->last_ = now;
+ if (id == 0)
+ {
+ this->first_ = now;
+ }
+ ++this->count_;
+ cout << 's' << id << endl;
+ if (abort_after > 0 && this->count_ >= abort_after)
+ {
+ cout << "\nAborting..." << endl;
+ ACE_OS::abort ();
+ }
+ if (request_delay > 0 && id == 0)
+ {
+ ACE_OS::sleep (ACE_Time_Value (0, 1000 * request_delay));
+ }
+ return id;
+ }
+
+ int handle_timeout (const ACE_Time_Value &, const void *)
+ {
+ this->orb_->shutdown (0);
+ return 0;
+ }
+
+ bool failed () const {
+ return this->failed_;
+ }
+
+private:
+ CORBA::ORB_ptr orb_;
+ CORBA::Long id1_;
+ CORBA::Long id2_;
+ int count_;
+ bool failed_;
+ ACE_Time_Value start_;
+ ACE_Time_Value first_;
+ ACE_Time_Value last_;
+};
+
+#include "tao/Messaging/Messaging.h"
+#include "tao/Strategies/advanced_resource.h"
+
+#include "ace/streams.h"
+#include "ace/Log_Msg.h"
+#include "ace/Arg_Shifter.h"
+
+using namespace CORBA;
+using namespace PortableServer;
+
+namespace {
+
+ void print_usage ()
+ {
+ cout << "server [-activate_delay ms] [-run_delay ms] [-request_delay ms] "
+ "[-abort_after n]\n"
+ "\t[-expected n=0] [-elapsed_max ms=0] [-elapsed_min ms=0] "
+ "[-first_min ms=0]\n"
+ "\t[-first_max ms=0]\n"
+ "\tactivate_delay Millisecond delay before POAManager::activate.\n"
+ "\trun_delay Millisecond delay before ORB::run ().\n"
+ "\trequest_delay Millisecond delay within each servant request.\n"
+ "\tabort_after abort () after N requests.\n" << endl;
+ }
+
+ bool parse_command_line (int ac, char *av[])
+ {
+ ACE_Arg_Shifter args (ac, av);
+ args.consume_arg ();
+ while (args.is_anything_left ())
+ {
+ if (args.cur_arg_strncasecmp ("-activate_delay") == 0)
+ {
+ args.consume_arg ();
+ activate_delay = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-run_delay") == 0)
+ {
+ args.consume_arg ();
+ run_delay = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-request_delay") == 0)
+ {
+ args.consume_arg ();
+ request_delay = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-expected") == 0)
+ {
+ args.consume_arg ();
+ num_expected = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-elapsed_max") == 0)
+ {
+ args.consume_arg ();
+ elapsed_max = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-elapsed_min") == 0)
+ {
+ args.consume_arg ();
+ elapsed_min = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-first_min") == 0)
+ {
+ args.consume_arg ();
+ first_min = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-first_max") == 0)
+ {
+ args.consume_arg ();
+ first_max = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else if (args.cur_arg_strncasecmp ("-abort_after") == 0)
+ {
+ args.consume_arg ();
+ abort_after = ACE_OS::atoi (args.get_current ());
+ args.consume_arg ();
+ }
+ else
+ {
+ cerr << "Error: Unknown argument \""
+ << args.get_current () << "\"" << endl;
+ print_usage ();
+ return false;
+ }
+ }
+ return true;
+ }
+
+ void WriteIOR (const char *ior)
+ {
+ ofstream out ("server.ior");
+ out << ior;
+ }
+
+ POA_ptr create_poa (ORB_ptr orb)
+ {
+ PolicyList pols (2);
+ Object_var obj = orb->resolve_initial_references ("RootPOA");
+ POA_var root = POA::_narrow (obj.in ());
+ ACE_ASSERT (! is_nil (root.in ()));
+ pols.length (2);
+ pols[0] = root->create_id_assignment_policy (PortableServer::USER_ID);
+ pols[1] = root->create_lifespan_policy (PortableServer::PERSISTENT);
+ POAManager_var man = root->the_POAManager ();
+ POA_var poa = root->create_POA ("X", man.in (), pols);
+ return poa._retn ();
+ }
+
+}
+
+int main (int ac, char *av[])
+{
+ try
+ {
+ ORB_var orb = ORB_init (ac, av);
+
+ if (!parse_command_line (ac, av))
+ {
+ return 1;
+ }
+
+ POA_var poa = create_poa (orb.in ());
+ ACE_ASSERT (! is_nil (poa.in ()));
+
+ Tester_i svt (orb.in ());
+
+ ObjectId_var id = string_to_ObjectId ("tester");
+
+ poa->activate_object_with_id (id.in (), &svt);
+ Object_var obj = poa->id_to_reference (id.in ());
+ String_var ior = orb->object_to_string (obj.in ());
+ WriteIOR (ior.in ());
+
+ cout << "Servants registered and activated." << endl;
+
+ if (activate_delay > 0)
+ {
+ ACE_OS::sleep (ACE_Time_Value (0, activate_delay * 1000));
+ }
+ POAManager_var man = poa->the_POAManager ();
+ man->activate ();
+
+ cout << "POAManager activated." << endl;
+
+ if (run_delay > 0)
+ {
+ ACE_OS::sleep (ACE_Time_Value (0, run_delay * 1000));
+ }
+ cout << "Running orb..." << endl;
+
+ orb->run ();
+
+ if (svt.failed ())
+ {
+ return 1;
+ }
+
+ return 0;
+
+ }
+ catch (CORBA::Exception &ex)
+ {
+ ex._tao_print_exception ("server:");
+ }
+
+ return 1;
+
+}
diff --git a/TAO/tests/Oneway_Timeouts/test.mpc b/TAO/tests/Oneway_Timeouts/test.mpc
new file mode 100644
index 00000000000..bc999788463
--- /dev/null
+++ b/TAO/tests/Oneway_Timeouts/test.mpc
@@ -0,0 +1,26 @@
+project (*client) : taoexe, messaging, portableserver, strategies {
+ exename = client
+ after = *server
+ source_files {
+ client.cpp
+ TestC.cpp
+ }
+ header_files {
+ TestC.h
+ }
+ idl_files {
+ }
+}
+
+project (*server) : taoexe, messaging, portableserver, strategies {
+ exename = server
+ source_files {
+ server.cpp
+ TestS.cpp
+ TestC.cpp
+ }
+ header_files {
+ TestS.h
+ TestC.h
+ }
+}
diff --git a/TAO/tests/Queued_Message_Test/Queued_Message_Test.cpp b/TAO/tests/Queued_Message_Test/Queued_Message_Test.cpp
index 077d93b4596..2e2586f85e5 100644
--- a/TAO/tests/Queued_Message_Test/Queued_Message_Test.cpp
+++ b/TAO/tests/Queued_Message_Test/Queued_Message_Test.cpp
@@ -33,8 +33,7 @@ create_new_message (void)
ACE_Message_Block mb (block_size);
mb.wr_ptr (block_size);
- return new TAO_Asynch_Queued_Message (&mb, TAO_ORB_Core_instance (),
- 0, 1);
+ return new TAO_Asynch_Queued_Message (&mb, TAO_ORB_Core_instance (), 0, 0, 1);
}
/// Add a new message at the tail of the queue.
diff --git a/TAO/tests/Timed_Buffered_Oneways/client.cpp b/TAO/tests/Timed_Buffered_Oneways/client.cpp
index 9878f57d8b8..6593941db8b 100644
--- a/TAO/tests/Timed_Buffered_Oneways/client.cpp
+++ b/TAO/tests/Timed_Buffered_Oneways/client.cpp
@@ -117,123 +117,68 @@ parse_args (int argc, char **argv)
return 0;
}
-void
-setup_timeouts (CORBA::ORB_ptr orb
- ACE_ENV_ARG_DECL)
+test_ptr
+setup_policies (CORBA::ORB_ptr orb, test_ptr object ACE_ENV_ARG_DECL)
{
- // Escape value.
- if (timeout == -1)
- return;
-
- // Obtain PolicyCurrent.
- CORBA::Object_var object = orb->resolve_initial_references ("PolicyCurrent"
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- // Narrow down to correct type.
- CORBA::PolicyCurrent_var policy_current =
- CORBA::PolicyCurrent::_narrow (object.in ()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- TimeBase::TimeT rt_timeout = 10000 * timeout;
-
- CORBA::Any rt_timeout_any;
- rt_timeout_any <<= rt_timeout;
-
- CORBA::PolicyList rt_timeout_policy_list (1);
- rt_timeout_policy_list.length (1);
-
- rt_timeout_policy_list[0] =
- orb->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
- rt_timeout_any
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
- policy_current->set_policy_overrides (rt_timeout_policy_list,
- CORBA::ADD_OVERRIDE
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- rt_timeout_policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
-}
-
-void
-setup_buffering_constraints (CORBA::ORB_ptr orb
- ACE_ENV_ARG_DECL)
-{
- // Obtain PolicyCurrent.
- CORBA::Object_var object = orb->resolve_initial_references ("PolicyCurrent"
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- // Narrow down to correct type.
- CORBA::PolicyCurrent_var policy_current =
- CORBA::PolicyCurrent::_narrow (object.in ()
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
+ test_var object_with_policy;
+ CORBA::PolicyList policy_list (1);
+ if (timeout == -1)
+ {
+ object_with_policy = test::_duplicate (object);
+ }
+ else
+ {
+ policy_list.length (1);
+ TimeBase::TimeT rt_timeout = 10000 * timeout;
+ CORBA::Any rt_timeout_any;
+ rt_timeout_any <<= rt_timeout;
+ policy_list[0] =
+ orb->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
+ rt_timeout_any
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ CORBA::Object_var object_temp =
+ object->_set_policy_overrides (policy_list,
+ CORBA::ADD_OVERRIDE
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ object_with_policy = test::_narrow (object_temp ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+
+ policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_CHECK;
+ }
- // Setup the sync scope policy, i.e., the ORB will buffer oneways.
Messaging::SyncScope sync =
eager_buffering ? TAO::SYNC_EAGER_BUFFERING : TAO::SYNC_DELAYED_BUFFERING;
- // Setup the sync scope any.
CORBA::Any sync_any;
sync_any <<= sync;
- // Setup the sync scope policy list.
- CORBA::PolicyList sync_policy_list (1);
- sync_policy_list.length (1);
-
- // Setup the sync scope policy.
- sync_policy_list[0] =
+ policy_list.length (1);
+ policy_list[0] =
orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
sync_any
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- // Setup the sync scope.
- policy_current->set_policy_overrides (sync_policy_list,
- CORBA::ADD_OVERRIDE
- ACE_ENV_ARG_PARAMETER);
+ CORBA::Object_var object_temp =
+ object_with_policy->_set_policy_overrides (policy_list,
+ CORBA::ADD_OVERRIDE
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- // We are now done with this policy.
- sync_policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
+ test_var object_with_two_policies = test::_narrow (object_temp
+ ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
- // Flush buffers.
- TAO::BufferingConstraint buffering_constraint;
- buffering_constraint.mode = TAO::BUFFER_FLUSH;
- buffering_constraint.message_count = 0;
- buffering_constraint.message_bytes = 0;
- buffering_constraint.timeout = 0;
-
- // Setup the buffering constraint any.
- CORBA::Any buffering_constraint_any;
- buffering_constraint_any <<= buffering_constraint;
-
- // Setup the buffering constraint policy list.
- CORBA::PolicyList buffering_constraint_policy_list (1);
- buffering_constraint_policy_list.length (1);
-
- // Setup the buffering constraint policy.
- buffering_constraint_policy_list[0] =
- orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE,
- buffering_constraint_any
- ACE_ENV_ARG_PARAMETER);
+ policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
- // Setup the constraints.
- policy_current->set_policy_overrides (buffering_constraint_policy_list,
- CORBA::ADD_OVERRIDE
- ACE_ENV_ARG_PARAMETER);
- ACE_CHECK;
-
- // We are done with the policy.
- buffering_constraint_policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
- ACE_CHECK;
+ return object_with_two_policies._retn ();
}
int
@@ -263,18 +208,14 @@ main (int argc, char **argv)
ACE_TRY_CHECK;
// Try to narrow the object reference to a <test> reference.
- test_var test_object = test::_narrow (object.in ()
- ACE_ENV_ARG_PARAMETER);
- ACE_TRY_CHECK;
-
- // Setup buffering.
- setup_buffering_constraints (orb.in ()
- ACE_ENV_ARG_PARAMETER);
+ test_var test_object_no_policy = test::_narrow (object.in ()
+ ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
- // Setup timeout.
- setup_timeouts (orb.in ()
- ACE_ENV_ARG_PARAMETER);
+ // Setup buffering and timeout
+ test_var test_object = setup_policies (orb.in (),
+ test_object_no_policy.in ()
+ ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
test::data the_data (data_bytes);
@@ -282,31 +223,44 @@ main (int argc, char **argv)
for (CORBA::ULong i = 1; i <= iterations; ++i)
{
- ACE_DEBUG ((LM_DEBUG,
- "client: Iteration %d @ %T\n",
- i));
-
+ ACE_Time_Value start = ACE_OS::gettimeofday ();
// Invoke the oneway method.
test_object->method (i,
+ start.msec (),
the_data,
work
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
+ ACE_Time_Value end = ACE_OS::gettimeofday ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "client:\t%d took\t%dms\n",
+ i, (end - start).msec ()));
+
// Interval between successive calls.
- ACE_Time_Value sleep_interval (0,
- interval * 1000);
+ ACE_Time_Value sleep_interval (0, interval * 1000);
- ACE_OS::sleep (sleep_interval);
+ // If we don't run the orb, then no data will be sent, and no
+ // connection will be made initially.
+ orb->run (sleep_interval ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
}
- // Shutdown server.
+ test_object_no_policy->flush (ACE_ENV_SINGLE_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
+ ACE_DEBUG ((LM_DEBUG, "client: Shutting down...\n"));
if (shutdown_server)
{
- test_object->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
+ long now = ACE_OS::gettimeofday ().msec ();
+ test_object_no_policy->shutdown (now ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
+ orb->shutdown (1 ACE_ENV_ARG_PARAMETER);
+ ACE_TRY_CHECK;
+
// Destroy the ORB. On some platforms, e.g., Win32, the socket
// library is closed at the end of main(). This means that any
// socket calls made after main() fail. Hence if we wait for
diff --git a/TAO/tests/Timed_Buffered_Oneways/run_test.pl b/TAO/tests/Timed_Buffered_Oneways/run_test.pl
index bf2638950c7..16ef5e32ab3 100755
--- a/TAO/tests/Timed_Buffered_Oneways/run_test.pl
+++ b/TAO/tests/Timed_Buffered_Oneways/run_test.pl
@@ -17,9 +17,13 @@ if (PerlACE::is_vxworks_test()) {
$SV = new PerlACE::ProcessVX ("server", "-o server.ior");
}
else {
- $SV = new PerlACE::Process ("server", "-o $iorfile");
+ $SV = new PerlACE::Process ("server", "-o $iorfile"
+." -ORBDebugLevel 6 -ORBLogFile server.log"
+);
}
-$CL = new PerlACE::Process ("client", "-k file://$iorfile -x");
+$CL = new PerlACE::Process ("client", "-k file://$iorfile -x"
+." -ORBDebugLevel 6 -ORBLogFile client.log"
+);
$SV->Spawn ();
@@ -30,16 +34,16 @@ if (PerlACE::waitforfile_timed ($iorfile,
exit 1;
}
-$client = $CL->SpawnWaitKill (200);
-
+$client = $CL->SpawnWaitKill (120);
if ($client != 0) {
$time = localtime;
print STDERR "ERROR: client returned $client at $time\n";
$status = 1;
}
-$server = $SV->WaitKill (100);
+print "Client finished...\n";
+$server = $SV->WaitKill (10);
if ($server != 0) {
$time = localtime;
print STDERR "ERROR: server returned $server at $time\n";
diff --git a/TAO/tests/Timed_Buffered_Oneways/server.cpp b/TAO/tests/Timed_Buffered_Oneways/server.cpp
index 20af40930ea..19009369e80 100644
--- a/TAO/tests/Timed_Buffered_Oneways/server.cpp
+++ b/TAO/tests/Timed_Buffered_Oneways/server.cpp
@@ -101,7 +101,7 @@ main (int argc, char *argv[])
ACE_CATCHANY
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
- "Exception caught:");
+ "Server side exception caught:");
return -1;
}
ACE_ENDTRY;
diff --git a/TAO/tests/Timed_Buffered_Oneways/test.idl b/TAO/tests/Timed_Buffered_Oneways/test.idl
index ce512ae6fb2..3035dc7f5fd 100644
--- a/TAO/tests/Timed_Buffered_Oneways/test.idl
+++ b/TAO/tests/Timed_Buffered_Oneways/test.idl
@@ -6,8 +6,9 @@ interface test
{
typedef sequence<octet> data;
oneway void method (in unsigned long request_number,
+ in long start_time,
in data d,
in unsigned long work);
-
- oneway void shutdown ();
+ void flush ();
+ oneway void shutdown (in long start_time);
};
diff --git a/TAO/tests/Timed_Buffered_Oneways/test_i.cpp b/TAO/tests/Timed_Buffered_Oneways/test_i.cpp
index 159c09067ec..54c6c393c88 100644
--- a/TAO/tests/Timed_Buffered_Oneways/test_i.cpp
+++ b/TAO/tests/Timed_Buffered_Oneways/test_i.cpp
@@ -2,6 +2,8 @@
#include "test_i.h"
#include "ace/OS_NS_unistd.h"
+#include "ace/OS_NS_sys_time.h"
+#include "ace/Time_Value.h"
ACE_RCSID(Timed_Buffered_Oneways, test_i, "$Id$")
@@ -12,27 +14,40 @@ test_i::test_i (CORBA::ORB_ptr orb)
void
test_i::method (CORBA::ULong request_number,
+ CORBA::Long start_time,
const test::data &,
CORBA::ULong work
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
+ ACE_Time_Value start (0);
+ start.msec (start_time);
ACE_DEBUG ((LM_DEBUG,
- "server: Iteration %d @ %T\n",
- request_number));
+ "server:\t%d took\t%dms\n",
+ request_number,
+ (ACE_OS::gettimeofday () - start).msec ()));
// Time required to process this request. <work> is time units in
// milli seconds.
- ACE_Time_Value work_time (0,
- work * 1000);
+ ACE_Time_Value work_time (0, work * 1000);
ACE_OS::sleep (work_time);
}
void
-test_i::shutdown (ACE_ENV_SINGLE_ARG_DECL)
+test_i::flush (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException))
{
- this->orb_->shutdown (0
- ACE_ENV_ARG_PARAMETER);
+}
+
+void
+test_i::shutdown (CORBA::Long start_time ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ ACE_Time_Value start (0);
+ start.msec (start_time);
+ ACE_DEBUG ((LM_DEBUG, "server: Shutting down... (%dms)\n",
+ (ACE_OS::gettimeofday() - start).msec ()));
+ this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
}
diff --git a/TAO/tests/Timed_Buffered_Oneways/test_i.h b/TAO/tests/Timed_Buffered_Oneways/test_i.h
index 48b20f4057f..ef92af5e074 100644
--- a/TAO/tests/Timed_Buffered_Oneways/test_i.h
+++ b/TAO/tests/Timed_Buffered_Oneways/test_i.h
@@ -1,3 +1,4 @@
+// -*- C++ -*-
// $Id$
// ============================================================================
@@ -29,12 +30,16 @@ public:
// = The test interface methods.
void method (CORBA::ULong request_number,
+ CORBA::Long start_time,
const test::data &,
CORBA::ULong work
ACE_ENV_ARG_DECL_NOT_USED)
ACE_THROW_SPEC ((CORBA::SystemException));
- void shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
+ void flush (ACE_ENV_SINGLE_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException));
+
+ void shutdown (CORBA::Long start_time ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((CORBA::SystemException));
private: