From 02b6be8e9fc42875d428cda382627512f6c04a53 Mon Sep 17 00:00:00 2001 From: Adam Mitz Date: Wed, 9 Aug 2006 15:39:56 +0000 Subject: importing initial work on this ticket into the subversion branch --- TAO/NEWS | 13 +- TAO/examples/Buffered_Oneways/client.cpp | 7 + .../orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp | 12 +- TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp | 11 +- .../PortableGroup/UIPMC_Connection_Handler.cpp | 4 +- TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp | 5 + .../orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp | 4 +- TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp | 16 +- TAO/tao/Asynch_Queued_Message.cpp | 35 +- TAO/tao/Asynch_Queued_Message.h | 15 + TAO/tao/Block_Flushing_Strategy.cpp | 9 +- TAO/tao/Connection_Handler.h | 5 +- TAO/tao/Connection_Handler.inl | 6 + TAO/tao/Flushing_Strategy.h | 6 + TAO/tao/IIOP_Connection_Handler.cpp | 4 +- TAO/tao/IIOP_Connector.cpp | 36 +- TAO/tao/Invocation_Adapter.cpp | 13 +- TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp | 11 +- TAO/tao/Messaging/Messaging_Policy_i.cpp | 7 +- TAO/tao/Profile_Transport_Resolver.cpp | 55 ++- TAO/tao/Queued_Message.cpp | 6 + TAO/tao/Queued_Message.h | 15 +- TAO/tao/Strategies/SCIOP_Connection_Handler.cpp | 4 +- TAO/tao/Strategies/SCIOP_Connector.cpp | 16 +- TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp | 4 +- TAO/tao/Strategies/UIOP_Connection_Handler.cpp | 4 +- TAO/tao/Strategies/UIOP_Connector.cpp | 15 +- TAO/tao/Synch_Invocation.cpp | 2 +- TAO/tao/Synch_Queued_Message.cpp | 5 +- TAO/tao/Transport.cpp | 141 ++++-- TAO/tao/Transport.h | 10 +- TAO/tao/Transport_Connector.cpp | 271 +++++++----- TAO/tao/Transport_Queueing_Strategies.cpp | 14 +- TAO/tests/AMI_Buffering/client.cpp | 30 +- TAO/tests/AMI_Buffering/run_buffer_size.pl | 2 +- TAO/tests/AMI_Buffering/run_message_count.pl | 2 +- TAO/tests/AMI_Buffering/run_test.pl | 2 +- TAO/tests/AMI_Buffering/run_timeout.pl | 2 +- TAO/tests/AMI_Buffering/run_timeout_reactive.pl | 2 +- TAO/tests/Oneway_Buffering/client.cpp | 39 +- TAO/tests/Oneway_Buffering/run_buffer_size.pl | 2 +- TAO/tests/Oneway_Buffering/run_message_count.pl | 2 +- TAO/tests/Oneway_Buffering/run_test.pl | 2 +- TAO/tests/Oneway_Buffering/run_timeout.pl | 2 +- TAO/tests/Oneway_Buffering/run_timeout_reactive.pl | 2 +- TAO/tests/Oneway_Timeouts/Test.idl | 4 + TAO/tests/Oneway_Timeouts/client.cpp | 488 +++++++++++++++++++++ TAO/tests/Oneway_Timeouts/run_test.pl | 392 +++++++++++++++++ TAO/tests/Oneway_Timeouts/server.cpp | 318 ++++++++++++++ TAO/tests/Oneway_Timeouts/test.mpc | 26 ++ .../Queued_Message_Test/Queued_Message_Test.cpp | 3 +- TAO/tests/Timed_Buffered_Oneways/client.cpp | 186 +++----- TAO/tests/Timed_Buffered_Oneways/run_test.pl | 14 +- TAO/tests/Timed_Buffered_Oneways/server.cpp | 2 +- TAO/tests/Timed_Buffered_Oneways/test.idl | 5 +- TAO/tests/Timed_Buffered_Oneways/test_i.cpp | 29 +- TAO/tests/Timed_Buffered_Oneways/test_i.h | 7 +- 57 files changed, 1901 insertions(+), 443 deletions(-) create mode 100644 TAO/tests/Oneway_Timeouts/Test.idl create mode 100644 TAO/tests/Oneway_Timeouts/client.cpp create mode 100755 TAO/tests/Oneway_Timeouts/run_test.pl create mode 100644 TAO/tests/Oneway_Timeouts/server.cpp create mode 100644 TAO/tests/Oneway_Timeouts/test.mpc diff --git a/TAO/NEWS b/TAO/NEWS index 3aa6c6defd3..2ef85cea403 100644 --- a/TAO/NEWS +++ b/TAO/NEWS @@ -27,7 +27,18 @@ USER VISIBLE CHANGES BETWEEN TAO-1.5.2 and TAO-1.5.3 . Fixed a compatibility problem between TAO and ORBs using Valuetype ID indirection, such as JacORB. This problem was inadvertently introduced - in 1.5.2 + in 1.5.2. + +. Oneway requests when using SYNC_NONE, SYNC_DELAYED_BUFFERING, or + SYNC_EAGER_BUFFERING will be queued until the connection is completed. + Connection completion occurs when one of the following occurs: + a) A one-way request that does NOT use one of the sync-scopes mentioned + above is sent via the same transport + b) A two-way request is sent via the same transport + c) orb->run() is called + Applications that do not currently do one of the above will no longer + establish a connection and therefore no data will be sent. + USER VISIBLE CHANGES BETWEEN TAO-1.5.1 and TAO-1.5.2 ==================================================== diff --git a/TAO/examples/Buffered_Oneways/client.cpp b/TAO/examples/Buffered_Oneways/client.cpp index 7851ec8638e..8f1469da973 100644 --- a/TAO/examples/Buffered_Oneways/client.cpp +++ b/TAO/examples/Buffered_Oneways/client.cpp @@ -230,6 +230,13 @@ main (int argc, char **argv) ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; + if (buffering_constraint.mode == TAO::BUFFER_FLUSH) + { + ACE_ERROR((LM_ERROR, "Error : Must specify a timeout, message" + " count, or message bytes constraint.\n")); + return 1; + } + // Setup the constraints. policy_current->set_policy_overrides (buffering_constraint_policy_list, CORBA::ADD_OVERRIDE diff --git a/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp b/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp index b53265cbfa5..638639dfb10 100755 --- a/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp +++ b/TAO/orbsvcs/orbsvcs/FaultTolerance/FT_ClientPolicy_i.cpp @@ -87,11 +87,9 @@ TAO_FT_Request_Duration_Policy::set_time_value (ACE_Time_Value &time_value) if (TAO_debug_level > 0) { - CORBA::ULong msecs = - static_cast (microseconds / 1000); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO_FT (%P|%t) - Timeout is <%u>\n"), - msecs)); + ACE_TEXT ("TAO_FT (%P|%t) - Timeout is <%dms>\n"), + time_value.msec ())); } } @@ -182,11 +180,9 @@ TAO_FT_Heart_Beat_Policy::set_time_value (ACE_Time_Value &time_value, if (TAO_debug_level > 0) { - CORBA::ULong msecs = - static_cast (microseconds / 1000); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO_FT (%P|%t) - Timeout is <%u>\n"), - msecs)); + ACE_TEXT ("TAO_FT (%P|%t) - Timeout is <%dms>\n"), + time_value.msec ())); } } diff --git a/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp b/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp index c2a2dde9735..720f93a2a2f 100644 --- a/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp +++ b/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Connector.cpp @@ -190,14 +190,9 @@ TAO::HTIOP::Connector::make_connection (TAO::Profile_Transport_Resolver *r, this->active_connect_strategy_->synch_options (timeout, synch_options); - // If we don't need to block for a transport just set the timeout to - // be zero. - ACE_Time_Value tmp_zero (ACE_Time_Value::zero); - if (!r->blocked_connect ()) - { - synch_options.timeout (ACE_Time_Value::zero); - timeout = &tmp_zero; - } + // The code used to set the timeout to zero, with the intent of + // polling the reactor for connection completion. However, the side-effect + // was to cause the connection to timeout immediately. // This is where we need to set the ACE::HTBP::Stream to the connection // handler. diff --git a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp index df2fbd006c7..dbe916788af 100644 --- a/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp +++ b/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Connection_Handler.cpp @@ -212,7 +212,9 @@ TAO_UIPMC_Connection_Handler::handle_timeout (const ACE_Time_Value &, // We don't use this upcall for I/O. This is only used by the // Connector to indicate that the connection timedout. Therefore, // we should call close(). - return this->close (); + int ret = this->close (); + this->reset_state (TAO_LF_Event::LFS_TIMEOUT); + return ret; } int diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp b/TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp index 51a3db9a486..61ae03facc1 100644 --- a/TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp +++ b/TAO/orbsvcs/orbsvcs/SSLIOP/IIOP_SSL_Connector.cpp @@ -215,6 +215,11 @@ TAO::IIOP_SSL_Connector::make_connection ( return 0; } + if (transport->connection_handler ()->keep_waiting ()) + { + svc_handler->add_reference (); + } + // At this point, the connection has be successfully connected. // #REFCOUNT# is one. if (TAO_debug_level > 2) diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp index 57a6397a818..9bf664bb848 100644 --- a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp +++ b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connection_Handler.cpp @@ -281,7 +281,9 @@ TAO::SSLIOP::Connection_Handler::handle_timeout (const ACE_Time_Value &, // We don't use this upcall for I/O. This is only used by the // Connector to indicate that the connection timedout. Therefore, // we should call close(). - return this->close (); + int ret = this->close (); + this->reset_state (TAO_LF_Event::LFS_TIMEOUT); + return ret; } int diff --git a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp index 3635555cb98..32174c6b252 100644 --- a/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp +++ b/TAO/orbsvcs/orbsvcs/SSLIOP/SSLIOP_Connector.cpp @@ -635,14 +635,9 @@ TAO::SSLIOP::Connector::ssliop_connect ( this->active_connect_strategy_->synch_options (max_wait_time, synch_options); - // If we don't need to block for a transport just set the timeout to - // be zero. - ACE_Time_Value tmp_zero (ACE_Time_Value::zero); - if (!resolver->blocked_connect ()) - { - synch_options.timeout (ACE_Time_Value::zero); - max_wait_time = &tmp_zero; - } + // The code used to set the timeout to zero, with the intent of + // polling the reactor for connection completion. However, the side-effect + // was to cause the connection to timeout immediately. // We obtain the transport in the variable. As we // know now that the connection is not available in Cache we can @@ -710,6 +705,11 @@ TAO::SSLIOP::Connector::ssliop_connect ( return 0; } + if (transport->connection_handler ()->keep_waiting ()) + { + svc_handler->add_reference (); + } + // At this point, the connection has be successfully connected. // #REFCOUNT# is one. if (TAO_debug_level > 2) diff --git a/TAO/tao/Asynch_Queued_Message.cpp b/TAO/tao/Asynch_Queued_Message.cpp index 7c6ec5020e8..cbdd57b7e08 100644 --- a/TAO/tao/Asynch_Queued_Message.cpp +++ b/TAO/tao/Asynch_Queued_Message.cpp @@ -8,7 +8,7 @@ #include "ace/Log_Msg.h" #include "ace/Message_Block.h" #include "ace/Malloc_Base.h" - +#include "ace/High_Res_Timer.h" ACE_RCSID (tao, Asynch_Queued_Message, @@ -19,12 +19,16 @@ TAO_BEGIN_VERSIONED_NAMESPACE_DECL TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message ( const ACE_Message_Block *contents, TAO_ORB_Core *oc, + ACE_Time_Value *timeout, ACE_Allocator *alloc, bool is_heap_allocated) : TAO_Queued_Message (oc, alloc, is_heap_allocated) , size_ (contents->total_length ()) , offset_ (0) + , abs_timeout_ (ACE_Time_Value::zero) { + if (timeout != 0 && *timeout != ACE_Time_Value::zero) + this->abs_timeout_ = ACE_High_Res_Timer::gettimeofday_hr () + *timeout; // @@ Use a pool for these guys!! ACE_NEW (this->buffer_, char[this->size_]); @@ -43,11 +47,13 @@ TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message ( TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf, TAO_ORB_Core *oc, size_t size, + const ACE_Time_Value &abs_timeout, ACE_Allocator *alloc) - : TAO_Queued_Message (oc, alloc) + : TAO_Queued_Message (oc, alloc, 0) , size_ (size) , offset_ (0) , buffer_ (buf) + , abs_timeout_ (abs_timeout) { } @@ -133,6 +139,7 @@ TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc) TAO_Asynch_Queued_Message (buf, this->orb_core_, sz, + this->abs_timeout_, alloc), 0); } @@ -150,7 +157,8 @@ TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc) ACE_NEW_RETURN (qm, TAO_Asynch_Queued_Message (buf, this->orb_core_, - sz), + sz, + this->abs_timeout_), 0); } @@ -180,7 +188,28 @@ TAO_Asynch_Queued_Message::destroy (void) delete this; } } +} +bool +TAO_Asynch_Queued_Message::is_expired (const ACE_Time_Value &now) const +{ + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Asynch_Queued_Message::is_expired - " + "Age of message is <%dms> this = %x.\n", + (now - this->abs_timeout_).msec (), this)); + if (this->offset_ > 0) + { + // This debug is for testing purposes! + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Asynch_Queued_Message::is_expired - " + "Can't expire message due to partial send. \n")); + return false; //never expire partial messages + } + if (this->abs_timeout_ > ACE_Time_Value::zero) + { + return this->abs_timeout_ < now; + } + return false; } TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Asynch_Queued_Message.h b/TAO/tao/Asynch_Queued_Message.h index 045f3dcd4fd..991a9f07063 100644 --- a/TAO/tao/Asynch_Queued_Message.h +++ b/TAO/tao/Asynch_Queued_Message.h @@ -17,12 +17,15 @@ #include "tao/Queued_Message.h" +#include "ace/Time_Value.h" + #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once #endif /* ACE_LACKS_PRAGMA_ONCE */ TAO_BEGIN_VERSIONED_NAMESPACE_DECL +class ACE_Message_Block; /** * @class TAO_Asynch_Queued_Message * @@ -39,6 +42,9 @@ public: * * @param alloc Allocator used for creating @c this object. * + * @param timeout The relative timeout after which this + * message should be expired. + * * @todo I'm almost sure this class will require a callback * interface for AMIs sent with SYNC_NONE policy. Those guys * need to hear when the connection timeouts or closes, but @@ -46,6 +52,7 @@ public: */ TAO_Asynch_Queued_Message (const ACE_Message_Block *contents, TAO_ORB_Core *oc, + ACE_Time_Value *timeout, ACE_Allocator *alloc = 0, bool is_heap_allocated = false); @@ -65,6 +72,7 @@ public: /// it here for the sake of uniformity. virtual TAO_Queued_Message *clone (ACE_Allocator *alloc); virtual void destroy (void); + virtual bool is_expired (const ACE_Time_Value &now) const; //@} protected: @@ -78,11 +86,14 @@ protected: * @param size The size of the buffer that is being handed * over. * + * @param abs_timeout The time after which this message should be expired. + * * @param alloc Allocator used for creating object. */ TAO_Asynch_Queued_Message (char *buf, TAO_ORB_Core *oc, size_t size, + const ACE_Time_Value &abs_timeout, ACE_Allocator *alloc = 0); private: /// The number of bytes in the buffer @@ -97,6 +108,10 @@ private: /// The buffer containing the complete message. char *buffer_; + + // Expiration time + ACE_Time_Value abs_timeout_; + }; TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Block_Flushing_Strategy.cpp b/TAO/tao/Block_Flushing_Strategy.cpp index 8e0282ecfdd..87082f19819 100644 --- a/TAO/tao/Block_Flushing_Strategy.cpp +++ b/TAO/tao/Block_Flushing_Strategy.cpp @@ -9,14 +9,9 @@ ACE_RCSID(tao, Block_Flushing_Strategy, "$Id$") TAO_BEGIN_VERSIONED_NAMESPACE_DECL int -TAO_Block_Flushing_Strategy::schedule_output (TAO_Transport *transport) +TAO_Block_Flushing_Strategy::schedule_output (TAO_Transport *) { - while (!transport->queue_is_empty_i ()) - { - if (transport->drain_queue_i () == -1) - return -1; - } - return 0; + return MUST_FLUSH; } int diff --git a/TAO/tao/Connection_Handler.h b/TAO/tao/Connection_Handler.h index e17ff6af08f..2342390dd5c 100644 --- a/TAO/tao/Connection_Handler.h +++ b/TAO/tao/Connection_Handler.h @@ -70,12 +70,15 @@ public: /// Set the underlying transport object void transport (TAO_Transport* transport); - /// Is the handler closed? + /// Is the handler closed or timed out? bool is_closed (void) const; /// Is the handler open? bool is_open (void) const; + /// Closed due to timeout? + bool is_timeout (void) const; + /// Is the handler in the process of being connected? bool is_connecting (void) const; diff --git a/TAO/tao/Connection_Handler.inl b/TAO/tao/Connection_Handler.inl index 5841cf3aebc..48e1b0b1b41 100644 --- a/TAO/tao/Connection_Handler.inl +++ b/TAO/tao/Connection_Handler.inl @@ -23,6 +23,12 @@ TAO_Connection_Handler::is_closed (void) const this->state_ == TAO_LF_Event::LFS_TIMEOUT); } +ACE_INLINE bool +TAO_Connection_Handler::is_timeout (void) const +{ + return (this->state_ == TAO_LF_Event::LFS_TIMEOUT); +} + ACE_INLINE bool TAO_Connection_Handler::is_open (void) const { diff --git a/TAO/tao/Flushing_Strategy.h b/TAO/tao/Flushing_Strategy.h index ead55aec8f2..61c9b51de10 100644 --- a/TAO/tao/Flushing_Strategy.h +++ b/TAO/tao/Flushing_Strategy.h @@ -56,7 +56,13 @@ public: /// Destructor virtual ~TAO_Flushing_Strategy (void); + enum SCHEDULE_OUTPUT_RETURN { MUST_FLUSH = -2 }; + /// Schedule the transport argument to be flushed + /// If -2 is returned then the caller must call one of + /// the flush_* methods. + /// If -1 is returned then there was an error. + /// If 0 is returned then the flush was scheduled successfully. virtual int schedule_output (TAO_Transport *transport) = 0; /// Cancel all scheduled output for the transport argument diff --git a/TAO/tao/IIOP_Connection_Handler.cpp b/TAO/tao/IIOP_Connection_Handler.cpp index 0f59383abe5..6a56bb9e0b9 100644 --- a/TAO/tao/IIOP_Connection_Handler.cpp +++ b/TAO/tao/IIOP_Connection_Handler.cpp @@ -322,7 +322,9 @@ TAO_IIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, // We don't use this upcall for I/O. This is only used by the // Connector to indicate that the connection timedout. Therefore, // we should call close(). - return this->close (); + int ret = this->close (); + this->reset_state (TAO_LF_Event::LFS_TIMEOUT); + return ret; } int diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp index ec2165a07e4..4d98fe433ff 100644 --- a/TAO/tao/IIOP_Connector.cpp +++ b/TAO/tao/IIOP_Connector.cpp @@ -60,6 +60,7 @@ TAO_IIOP_Connection_Handler_Array_Guard::TAO_IIOP_Connection_Handler_Array_Guard TAO_IIOP_Connection_Handler_Array_Guard::~TAO_IIOP_Connection_Handler_Array_Guard (void) { + ACE_Errno_Guard eguard (errno); if (this->ptr_ != 0) { for (unsigned i = 0; i < this->count_; i++) @@ -205,9 +206,10 @@ TAO_IIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, TAO_IIOP_Endpoint **ep_ptr = &iiop_endpoint; TAO_LF_Multi_Event mev; mev.add_event(svc_handler); - return this->complete_connection (result, desc, - sh_ptr, ep_ptr, - 1U, r, &mev, timeout); + TAO_Transport* tp = this->complete_connection (result, desc, + sh_ptr, ep_ptr, + 1U, r, &mev, timeout); + return tp; } TAO_Transport * @@ -327,14 +329,9 @@ TAO_IIOP_Connector::begin_connection (TAO_IIOP_Connection_Handler *&svc_handler, this->active_connect_strategy_->synch_options (timeout, synch_options); - // If we don't need to block for a transport just set the timeout to - // be zero. - ACE_Time_Value tmp_zero (ACE_Time_Value::zero); - if (!r->blocked_connect ()) - { - synch_options.timeout (ACE_Time_Value::zero); - timeout = &tmp_zero; - } + // The code used to set the timeout to zero, with the intent of + // polling the reactor for connection completion. However, the side-effect + // was to cause the connection to timeout immediately. svc_handler = 0; @@ -427,7 +424,7 @@ TAO_IIOP_Connector::complete_connection (int result, } } } - // At this point, the connection has be successfully created + // At this point, the connection has been successfully created // connected or not connected, but we have a connection. TAO_IIOP_Connection_Handler *svc_handler = 0; TAO_IIOP_Endpoint *iiop_endpoint = 0; @@ -461,14 +458,15 @@ TAO_IIOP_Connector::complete_connection (int result, if (TAO_debug_level > 3) { for (unsigned i = 0; i < count; i++) - ACE_DEBUG ((LM_ERROR, - ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection, ") - ACE_TEXT("connection to <%s:%d> failed (%p)\n"), - ACE_TEXT_CHAR_TO_TCHAR (ep_list[i]->host ()), - ep_list[i]->port (), - ACE_TEXT("errno"))); + { + ACE_DEBUG ((LM_ERROR, + ACE_TEXT ("(%P|%t) IIOP_Connector::make_connection,") + ACE_TEXT (" connection to <%s:%d> failed (%p)\n"), + ACE_TEXT_CHAR_TO_TCHAR (ep_list[i]->host ()), + ep_list[i]->port (), + ACE_TEXT("errno"))); + } } - return 0; } diff --git a/TAO/tao/Invocation_Adapter.cpp b/TAO/tao/Invocation_Adapter.cpp index 39ac727b340..17d91b4db07 100644 --- a/TAO/tao/Invocation_Adapter.cpp +++ b/TAO/tao/Invocation_Adapter.cpp @@ -12,6 +12,8 @@ #include "tao/Transport_Mux_Strategy.h" #include "tao/Collocation_Proxy_Broker.h" #include "tao/GIOP_Utils.h" +#include "tao/TAOC.h" + #if !defined (__ACE_INLINE__) # include "tao/Invocation_Adapter.inl" #endif /* __ACE_INLINE__ */ @@ -65,8 +67,6 @@ namespace TAO // Initial state TAO::Invocation_Status status = TAO_INVOKE_START; - ACE_Time_Value *max_wait_time = 0; - while (status == TAO_INVOKE_START || status == TAO_INVOKE_RESTART) { @@ -88,6 +88,7 @@ namespace TAO if (strat == TAO_CS_REMOTE_STRATEGY || strat == TAO_CS_LAST) { + ACE_Time_Value *max_wait_time = 0; status = this->invoke_remote_i (stub, details, @@ -262,12 +263,18 @@ namespace TAO (void) this->set_response_flags (stub, details); + CORBA::Octet rflags = details.response_flags (); + bool block_connect = + rflags != static_cast (Messaging::SYNC_NONE) + && rflags != static_cast (TAO::SYNC_EAGER_BUFFERING) + && rflags != static_cast (TAO::SYNC_DELAYED_BUFFERING); + // Create the resolver which will pick (or create) for us a // transport and a profile from the effective_target. Profile_Transport_Resolver resolver ( effective_target.in (), stub, - (details.response_flags () != Messaging::SYNC_NONE)); + block_connect); resolver.resolve (max_wait_time ACE_ENV_ARG_PARAMETER); diff --git a/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp b/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp index d17338aae9c..b621a85b375 100644 --- a/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp +++ b/TAO/tao/Messaging/Connection_Timeout_Policy_i.cpp @@ -103,11 +103,9 @@ TAO_ConnectionTimeoutPolicy::hook (TAO_ORB_Core *orb_core, if (TAO_debug_level > 0) { - CORBA::ULong msecs = - static_cast (microseconds / 1000); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"), - msecs)); + ACE_TEXT ("TAO (%P|%t) - Connect timeout is <%dms>\n"), + time_value.msec ())); } } ACE_CATCHANY @@ -188,10 +186,9 @@ TAO_ConnectionTimeoutPolicy::set_time_value (ACE_Time_Value &time_value) if (TAO_debug_level > 0) { - CORBA::ULong const msecs = time_value.msec (); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"), - msecs)); + ACE_TEXT ("TAO (%P|%t) - Connect timeout is <%dms>\n"), + time_value.msec ())); } } diff --git a/TAO/tao/Messaging/Messaging_Policy_i.cpp b/TAO/tao/Messaging/Messaging_Policy_i.cpp index 35d3d25a37f..39d4cc2b50a 100644 --- a/TAO/tao/Messaging/Messaging_Policy_i.cpp +++ b/TAO/tao/Messaging/Messaging_Policy_i.cpp @@ -107,7 +107,7 @@ TAO_RelativeRoundtripTimeoutPolicy::hook (TAO_ORB_Core *orb_core, if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"), + ACE_TEXT ("TAO (%P|%t) - Request timeout is <%dms>\n"), time_value.msec ())); } } @@ -189,10 +189,9 @@ TAO_RelativeRoundtripTimeoutPolicy::set_time_value (ACE_Time_Value &time_value) if (TAO_debug_level > 0) { - CORBA::ULong msecs = time_value.msec (); ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Timeout is <%u>\n"), - msecs)); + ACE_TEXT ("TAO (%P|%t) - Request timeout is <%dms>\n"), + time_value.msec ())); } } diff --git a/TAO/tao/Profile_Transport_Resolver.cpp b/TAO/tao/Profile_Transport_Resolver.cpp index 81d3a67f52d..30cc30341be 100644 --- a/TAO/tao/Profile_Transport_Resolver.cpp +++ b/TAO/tao/Profile_Transport_Resolver.cpp @@ -123,28 +123,28 @@ namespace TAO bool Profile_Transport_Resolver::try_connect ( TAO_Transport_Descriptor_Interface *desc, - ACE_Time_Value *max_time_value + ACE_Time_Value *timeout ACE_ENV_ARG_DECL ) { - return this->try_connect_i (desc,max_time_value,0 ACE_ENV_ARG_PARAMETER); + return this->try_connect_i (desc, timeout, 0 ACE_ENV_ARG_PARAMETER); } bool Profile_Transport_Resolver::try_parallel_connect ( TAO_Transport_Descriptor_Interface *desc, - ACE_Time_Value *max_time_value + ACE_Time_Value *timeout ACE_ENV_ARG_DECL ) { - return this->try_connect_i (desc,max_time_value,1 ACE_ENV_ARG_PARAMETER); + return this->try_connect_i (desc, timeout, 1 ACE_ENV_ARG_PARAMETER); } bool Profile_Transport_Resolver::try_connect_i ( TAO_Transport_Descriptor_Interface *desc, - ACE_Time_Value *max_time_value, + ACE_Time_Value *timeout, bool parallel ACE_ENV_ARG_DECL ) @@ -165,41 +165,40 @@ namespace TAO } ACE_Time_Value connection_timeout; + bool has_con_timeout = this->get_connection_timeout (connection_timeout); - bool const is_conn_timeout = - this->get_connection_timeout (connection_timeout); - - ACE_Time_Value *max_wait_time = - is_conn_timeout ? &connection_timeout : max_time_value; + if (has_con_timeout && !blocked_) + { + timeout = &connection_timeout; + } + else if (has_con_timeout) + { + if (timeout == 0 || connection_timeout < *timeout) + timeout = &connection_timeout; + } + else if (!blocked_) + { + timeout = 0; + } + TAO_Connector *con = conn_reg->get_connector (desc->endpoint ()->tag ()); + ACE_ASSERT(con != 0); if (parallel) { - this->transport_ = - conn_reg->get_connector (desc->endpoint ()->tag ())-> - parallel_connect (this, - desc, - max_wait_time - ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (false); + this->transport_ = con->parallel_connect (this, desc, timeout + ACE_ENV_ARG_PARAMETER); } else { - // Obtain a connection. - this->transport_ = - conn_reg->get_connector (desc->endpoint ()->tag ())-> - connect (this, - desc, - max_wait_time - ACE_ENV_ARG_PARAMETER); - ACE_CHECK_RETURN (false); + this->transport_ = con->connect (this, desc, timeout + ACE_ENV_ARG_PARAMETER); } + ACE_CHECK_RETURN (false); // A timeout error occurred. // If the user has set a roundtrip timeout policy, throw a timeout // exception. Otherwise, just fall through and return false to // look at the next endpoint. - if (this->transport_ == 0 - && is_conn_timeout == false - && errno == ETIME) + if (this->transport_ == 0 && errno == ETIME) { ACE_THROW_RETURN (CORBA::TIMEOUT ( CORBA::SystemException::_tao_minor_code ( diff --git a/TAO/tao/Queued_Message.cpp b/TAO/tao/Queued_Message.cpp index 8b4b73072c9..19cba1406a6 100644 --- a/TAO/tao/Queued_Message.cpp +++ b/TAO/tao/Queued_Message.cpp @@ -102,4 +102,10 @@ TAO_Queued_Message::push_front (TAO_Queued_Message *&head, } } +bool +TAO_Queued_Message::is_expired (const ACE_Time_Value &) const +{ + return false; +} + TAO_END_VERSIONED_NAMESPACE_DECL diff --git a/TAO/tao/Queued_Message.h b/TAO/tao/Queued_Message.h index 70defaa15cd..c8e8883fd11 100644 --- a/TAO/tao/Queued_Message.h +++ b/TAO/tao/Queued_Message.h @@ -17,6 +17,7 @@ #include "tao/LF_Invocation_Event.h" #include "ace/os_include/os_stddef.h" +#include "ace/Time_Value.h" #if !defined (ACE_LACKS_PRAGMA_ONCE) # pragma once @@ -76,8 +77,8 @@ class TAO_Export TAO_Queued_Message : public TAO_LF_Invocation_Event public: /// Constructor TAO_Queued_Message (TAO_ORB_Core *oc, - ACE_Allocator *alloc = 0, - bool is_heap_allocated = false); + ACE_Allocator *alloc, + bool is_heap_allocated); /// Destructor virtual ~TAO_Queued_Message (void); @@ -194,6 +195,16 @@ public: * a pool), they need to be reclaimed explicitly. */ virtual void destroy (void) = 0; + + /// Check for timeout + /** + * @param now Pass in the current time using + * ACE_High_Res_Timer::gettimeofday_hr(). + * This is a parameter in order to avoid calling gettimeofday_hr() inside + * of this method (which will be called in a tight loop). + * @return true if the relative roundtrip timeout has expired. + */ + virtual bool is_expired (const ACE_Time_Value &now) const; //@} protected: diff --git a/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp index dd83f2fac6e..5df628bcc5c 100644 --- a/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp @@ -250,7 +250,9 @@ TAO_SCIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, // We don't use this upcall for I/O. This is only used by the // Connector to indicate that the connection timedout. Therefore, // we should call close(). - return this->close (); + int ret = this->close (); + this->reset_state (TAO_LF_Event::LFS_TIMEOUT); + return ret; } int diff --git a/TAO/tao/Strategies/SCIOP_Connector.cpp b/TAO/tao/Strategies/SCIOP_Connector.cpp index 663bccdb60a..70f61e87ae0 100644 --- a/TAO/tao/Strategies/SCIOP_Connector.cpp +++ b/TAO/tao/Strategies/SCIOP_Connector.cpp @@ -172,14 +172,9 @@ TAO_SCIOP_Connector::make_connection_i (TAO::Profile_Transport_Resolver *r, this->active_connect_strategy_->synch_options (timeout, synch_options); - // If we don't need to block for a transport just set the timeout to - // be zero. - ACE_Time_Value tmp_zero (ACE_Time_Value::zero); - if (!r->blocked_connect()) - { - synch_options.timeout (ACE_Time_Value::zero); - timeout = &tmp_zero; - } + // The code used to set the timeout to zero, with the intent of + // polling the reactor for connection completion. However, the side-effect + // was to cause the connection to timeout immediately. TAO_SCIOP_Connection_Handler *svc_handler = 0; @@ -268,6 +263,11 @@ TAO_SCIOP_Connector::make_connection_i (TAO::Profile_Transport_Resolver *r, return 0; } + if (transport->connection_handler ()->keep_waiting ()) + { + svc_handler->add_reference (); + } + // At this point, the connection has be successfully connected. // #REFCOUNT# is one. if (TAO_debug_level > 2) diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp index 788407dfd7d..a584be7e368 100644 --- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp @@ -204,7 +204,9 @@ TAO_SHMIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, // We don't use this upcall for I/O. This is only used by the // Connector to indicate that the connection timedout. Therefore, // we should call close(). - return this->close (); + int ret = this->close (); + this->reset_state (TAO_LF_Event::LFS_TIMEOUT); + return ret; } int diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp index ada81ab9075..f7f754cb197 100644 --- a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp @@ -183,7 +183,9 @@ TAO_UIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, // We don't use this upcall for I/O. This is only used by the // Connector to indicate that the connection timedout. Therefore, // we should call close(). - return this->close (); + int ret = this->close (); + this->reset_state (TAO_LF_Event::LFS_TIMEOUT); + return ret; } int diff --git a/TAO/tao/Strategies/UIOP_Connector.cpp b/TAO/tao/Strategies/UIOP_Connector.cpp index efce00f17d2..3bbec64d87e 100644 --- a/TAO/tao/Strategies/UIOP_Connector.cpp +++ b/TAO/tao/Strategies/UIOP_Connector.cpp @@ -169,14 +169,9 @@ TAO_UIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, this->active_connect_strategy_->synch_options (max_wait_time, synch_options); - // If we don't need to block for a transport just set the timeout to - // be zero. - ACE_Time_Value tmp_zero (ACE_Time_Value::zero); - if (!r->blocked_connect ()) - { - synch_options.timeout (ACE_Time_Value::zero); - max_wait_time = &tmp_zero; - } + // The code used to set the timeout to zero, with the intent of + // polling the reactor for connection completion. However, the side-effect + // was to cause the connection to timeout immediately. TAO_UIOP_Connection_Handler *svc_handler = 0; @@ -248,6 +243,10 @@ TAO_UIOP_Connector::make_connection (TAO::Profile_Transport_Resolver *r, return 0; } + if (transport->connection_handler ()->keep_waiting ()) + { + svc_handler->add_reference (); + } // At this point, the connection has be successfully created // connected or not connected, but we have a connection. diff --git a/TAO/tao/Synch_Invocation.cpp b/TAO/tao/Synch_Invocation.cpp index bff4141a033..e38207cdbfa 100644 --- a/TAO/tao/Synch_Invocation.cpp +++ b/TAO/tao/Synch_Invocation.cpp @@ -772,7 +772,7 @@ namespace TAO "TAO (%P|%t) - Synch_Oneway_Invocation::" "remote_oneway, queueing message\n")); - if (transport->format_queue_message (cdr) != 0) + if (transport->format_queue_message (cdr, max_wait_time) != 0) s = TAO_INVOKE_FAILURE; } diff --git a/TAO/tao/Synch_Queued_Message.cpp b/TAO/tao/Synch_Queued_Message.cpp index 59fecb37311..afa14627a05 100644 --- a/TAO/tao/Synch_Queued_Message.cpp +++ b/TAO/tao/Synch_Queued_Message.cpp @@ -129,7 +129,8 @@ TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc) alloc->malloc (sizeof (TAO_Synch_Queued_Message))), TAO_Synch_Queued_Message (mb, this->orb_core_, - alloc), + alloc, + 0), 0); } else @@ -144,7 +145,7 @@ TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc) } ACE_NEW_RETURN (qm, - TAO_Synch_Queued_Message (mb, this->orb_core_), + TAO_Synch_Queued_Message (mb, this->orb_core_, 0, 0), 0); } diff --git a/TAO/tao/Transport.cpp b/TAO/tao/Transport.cpp index 7f3c7ebf4b2..b132d4ee612 100644 --- a/TAO/tao/Transport.cpp +++ b/TAO/tao/Transport.cpp @@ -27,6 +27,7 @@ #include "ace/OS_NS_stdio.h" #include "ace/Reactor.h" #include "ace/os_include/sys/os_uio.h" +#include "ace/High_Res_Timer.h" /* * Specialization hook to add include files from @@ -478,12 +479,13 @@ TAO_Transport::handle_output (void) } int -TAO_Transport::format_queue_message (TAO_OutputCDR &stream) +TAO_Transport::format_queue_message (TAO_OutputCDR &stream, + ACE_Time_Value *max_wait_time) { if (this->messaging_object ()->format_message (stream) != 0) return -1; - return this->queue_message_i (stream.begin()); + return this->queue_message_i (stream.begin (), max_wait_time); } int @@ -503,7 +505,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, size_t &bytes_transferred, ACE_Time_Value *) { - size_t const total_length = mb->total_length (); + const size_t total_length = mb->total_length (); // We are going to block, so there is no need to clone // the message block. @@ -512,7 +514,7 @@ TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb, synch_message.push_back (this->head_, this->tail_); - int const n = this->drain_queue_i (); + const int n = this->drain_queue_i (); if (n == -1) { @@ -541,14 +543,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, // We are going to block, so there is no need to clone // the message block. TAO_Synch_Queued_Message synch_message (mb, this->orb_core_); + const size_t message_length = synch_message.message_length (); - // Push synch_message on to the back of the queue. synch_message.push_back (this->head_, this->tail_); - int const n = - this->send_synch_message_helper_i (synch_message, - max_wait_time); - + const int n = this->send_synch_message_helper_i (synch_message, + 0 /*ignored*/); if (n == -1 || n == 1) { return n; @@ -558,18 +558,30 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, // if (max_wait_time != 0 && errno == ETIME) return -1; TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); - (void) flushing_strategy->schedule_output (this); + int result = flushing_strategy->schedule_output (this); + if (result == -1) + { + synch_message.remove_from_list (this->head_, this->tail_); + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") + ACE_TEXT ("send_synchronous_message_i, ") + ACE_TEXT ("error while scheduling flush - %m\n"), + this->id ())); + } + return -1; + } + + // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH, + // because we're always going to flush anyway. // Release the mutex, other threads may modify the queue as we // block for a long time writing out data. - int result; { typedef ACE_Reverse_Lock TAO_REVERSE_LOCK; TAO_REVERSE_LOCK reverse (*this->handler_lock_); - ACE_GUARD_RETURN (TAO_REVERSE_LOCK, - ace_mon, - reverse, - -1); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); result = flushing_strategy->flush_message (this, &synch_message, @@ -582,7 +594,8 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, if (errno == ETIME) { - if (this->head_ == &synch_message) + // If partially sent, then we must queue the remainder. + if (message_length != synch_message.message_length ()) { // This is a timeout, there is only one nasty case: the // message has been partially sent! We simply cannot take @@ -597,13 +610,12 @@ TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb, // and figuring out the request ID would be a bit of a // nightmare. // - - synch_message.remove_from_list (this->head_, this->tail_); TAO_Queued_Message *queued_message = 0; ACE_NEW_RETURN (queued_message, TAO_Asynch_Queued_Message ( synch_message.current_block (), this->orb_core_, + 0, // no timeout 0, 1), -1); @@ -682,6 +694,13 @@ TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb, msg->remove_from_list (this->head_, this->tail_); msg->destroy (); } + else if (result == TAO_Flushing_Strategy::MUST_FLUSH) + { + typedef ACE_Reverse_Lock TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + (void) flushing_strategy->flush_message(this, msg, 0); + } return 1; } @@ -797,7 +816,14 @@ TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */, TAO_Flushing_Strategy *flushing_strategy = this->orb_core ()->flushing_strategy (); - (void) flushing_strategy->schedule_output (this); + int result = flushing_strategy->schedule_output (this); + if (result == TAO_Flushing_Strategy::MUST_FLUSH) + { + typedef ACE_Reverse_Lock TAO_REVERSE_LOCK; + TAO_REVERSE_LOCK reverse (*this->handler_lock_); + ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1); + (void) flushing_strategy->flush_transport (this); + } } return 0; @@ -920,8 +946,29 @@ TAO_Transport::drain_queue_i (void) // call. this->sent_byte_count_ = 0; + // Avoid calling this expensive function each time through the loop. Instead + // we'll assume that the time is unlikely to change much during the loop. + // If we are forced to send in the loop then we'll recompute the time. + ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr (); + while (i != 0) { + if (i->is_expired (now)) + { + if (TAO_debug_level > 3) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t - Transport[%d]::drain_queue_i, ") + ACE_TEXT ("Discarding expired queued message.\n"), + this->id ())); + } + i->state_changed (TAO_LF_Event::LFS_TIMEOUT, + this->orb_core_->leader_follower ()); + i->remove_from_list (this->head_, this->tail_); + i->destroy (); + i = this->head_; + continue; + } // ... each element fills the iovector ... i->fill_iov (ACE_IOV_MAX, iovcnt, iov); @@ -933,6 +980,8 @@ TAO_Transport::drain_queue_i (void) int const retval = this->drain_queue_helper (iovcnt, iov); + now = ACE_High_Res_Timer::gettimeofday_hr (); + if (TAO_debug_level > 4) { ACE_DEBUG ((LM_DEBUG, @@ -999,11 +1048,19 @@ TAO_Transport::cleanup_queue_i () this->id ())); } + int byte_count = 0; + int msg_count = 0; + // Cleanup all messages while (this->head_ != 0) { TAO_Queued_Message *i = this->head_; + if (TAO_debug_level > 4) + { + byte_count += i->message_length(); + ++msg_count; + } // @@ This is a good point to insert a flag to indicate that a // CloseConnection message was successfully received. i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED, @@ -1012,7 +1069,15 @@ TAO_Transport::cleanup_queue_i () i->remove_from_list (this->head_, this->tail_); i->destroy (); - } + } + + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ") + ACE_TEXT ("discarded %d messages, %d bytes.\n"), + this->id (), msg_count, byte_count)); + } } void @@ -1232,6 +1297,10 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, return 0; } + // If it was partially sent, then we can't allow a timeout + if (byte_count > 0) + max_wait_time = 0; + if (TAO_debug_level > 6) { ACE_DEBUG ((LM_DEBUG, @@ -1262,14 +1331,16 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, this->id ())); } - if (this->queue_message_i(message_block) == -1) + if (this->queue_message_i (message_block, max_wait_time) == -1) { if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ") - ACE_TEXT ("cannot queue message for ") - ACE_TEXT (" - %m\n"), - this->id ())); + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - Transport[%d]::") + ACE_TEXT ("send_asynchronous_message_i, ") + ACE_TEXT ("cannot queue message for - %m\n"), + this->id ())); + } return -1; } @@ -1289,7 +1360,11 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, if (constraints_reached || try_sending_first) { - (void) flushing_strategy->schedule_output (this); + int result = flushing_strategy->schedule_output (this); + if (result == TAO_Flushing_Strategy::MUST_FLUSH) + { + must_flush = true; + } } if (must_flush) @@ -1305,12 +1380,14 @@ TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub, } int -TAO_Transport::queue_message_i(const ACE_Message_Block *message_block) +TAO_Transport::queue_message_i (const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) { TAO_Queued_Message *queued_message = 0; ACE_NEW_RETURN (queued_message, TAO_Asynch_Queued_Message (message_block, this->orb_core_, + max_wait_time, 0, 1), -1); @@ -2387,13 +2464,7 @@ TAO_Transport::post_open (size_t id) // If the wait strategy wants us to be registered with the reactor // then we do so. If registeration is required and it succeeds, // #REFCOUNT# becomes two. - if (this->wait_strategy ()->register_handler () == 0) - { - TAO_Flushing_Strategy *flushing_strategy = - this->orb_core ()->flushing_strategy (); - (void) flushing_strategy->schedule_output (this); - } - else + if (this->wait_strategy ()->register_handler () != 0) { // Registration failures. diff --git a/TAO/tao/Transport.h b/TAO/tao/Transport.h index a563cc42727..ea5623072c3 100644 --- a/TAO/tao/Transport.h +++ b/TAO/tao/Transport.h @@ -680,11 +680,17 @@ protected: ACE_Time_Value *max_wait_time); /// Queue a message for @a message_block - int queue_message_i (const ACE_Message_Block *message_block); + /// @param max_wait_time The maximum time that the operation can + /// block, used in the implementation of timeouts. + int queue_message_i (const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); public: /// Format and queue a message for @a stream - int format_queue_message (TAO_OutputCDR &stream); + /// @param max_wait_time The maximum time that the operation can + /// block, used in the implementation of timeouts. + int format_queue_message (TAO_OutputCDR &stream, + ACE_Time_Value *max_wait_time); /// Send a message block chain, int send_message_block_chain (const ACE_Message_Block *message_block, diff --git a/TAO/tao/Transport_Connector.cpp b/TAO/tao/Transport_Connector.cpp index 33313169d3c..24c1cd19a43 100644 --- a/TAO/tao/Transport_Connector.cpp +++ b/TAO/tao/Transport_Connector.cpp @@ -302,10 +302,12 @@ TAO_Connector::parallel_connect (TAO::Profile_Transport_Resolver *r, base_transport) == 0) { if (TAO_debug_level) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) TAO_Connector::parallel_connect: ") - ACE_TEXT ("found a transport [%d]\n"), - base_transport->id())); + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) TAO_Connector::parallel_connect: ") + ACE_TEXT ("found a transport [%d]\n"), + base_transport->id ())); + } return base_transport; } } @@ -357,10 +359,12 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r, t->opened_as (TAO::TAO_CLIENT_ROLE); if (TAO_debug_level > 4) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("(%P|%t) Transport_Connector::connect, ") - ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"), - t->id ())); + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("(%P|%t) Transport_Connector::connect, ") + ACE_TEXT("opening Transport[%d] in TAO_CLIENT_ROLE\n"), + t->id ())); + } // Call post connect hook. If the post_connect_hook () returns // false, just purge the entry. @@ -393,7 +397,6 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r, "TAO_UNSPECIFIED_ROLE" )); } - // If connected return. if (base_transport->is_connected ()) return base_transport; @@ -414,15 +417,17 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r, timeout)) { if (TAO_debug_level > 2) - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Transport_Connector::" - "connect, " - "wait for completion failed\n")); + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Transport_Connector::" + "connect, " + "wait for completion failed\n")); + } return 0; } if (base_transport->is_connected () && - base_transport->wait_strategy ()->register_handler () == 0) + base_transport->wait_strategy ()->register_handler () == -1) { // Registration failures. @@ -434,12 +439,13 @@ TAO_Connector::connect (TAO::Profile_Transport_Resolver *r, (void) base_transport->close_connection (); if (TAO_debug_level > 0) - ACE_ERROR ((LM_ERROR, - "TAO (%P|%t) - Transport_Connector [%d]::connect, " - "could not register the transport " - "in the reactor.\n", - base_transport->id ())); - + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Transport_Connector [%d]::connect, " + "could not register the transport " + "in the reactor.\n", + base_transport->id ())); + } return 0; } @@ -452,88 +458,121 @@ TAO_Connector::wait_for_connection_completion ( TAO_Transport *&transport, ACE_Time_Value *timeout) { - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport_Connector::" - "wait_for_connection_completion, " - "going to wait for connection completion on transport" - "[%d]\n", - transport->id ())); - // If we don't need to block for a transport just set the timeout to - // be zero. - ACE_Time_Value tmp_zero (ACE_Time_Value::zero); + int result = -1; if (!r->blocked_connect ()) { - timeout = &tmp_zero; - } - - // Wait until the connection is ready, when non-blocking we just do a wait - // with zero time - int result = - this->active_connect_strategy_->wait ( - transport, - timeout); - - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport_Connector::" - "wait_for_connection_completion, " - "transport [%d], wait done result = %d\n", - transport->id (), result)); - - // There are three possibilities when wait() returns: (a) - // connection succeeded; (b) connection failed; (c) wait() - // failed because of some other error. It is easy to deal with - // (a) and (b). (c) is tricky since the connection is still - // pending and may get completed by some other thread. The - // following method deals with (c). - - if (result == -1) - { - if (!r->blocked_connect () && errno == ETIME) + if (transport->connection_handler ()->is_open ()) { - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport_Connector::" - "wait_for_connection_completion, " - "transport [%d], timeout, resetting state.\n", - transport->id ())); - transport->connection_handler()-> - reset_state(TAO_LF_Event::LFS_CONNECTION_WAIT); - // If we did a non blocking connect, just ignore - // any timeout errors result = 0; } - else + else if (transport->connection_handler ()->is_timeout ()) { - // When we need to get a connected transport - result = - this->check_connection_closure ( - transport->connection_handler ()); + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Connector::" + "wait_for_connection_completion, " + "transport [%d], Connection timed out.\n", + transport->id ())); + } + result = -1; + errno = ETIME; } - - // In case of errors. - if (result == -1) + else if (transport->connection_handler ()->is_closed ()) { - // Report that making the connection failed, don't print errno - // because we touched the reactor and errno could be changed if (TAO_debug_level > 2) - ACE_ERROR ((LM_ERROR, + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Connector::" + "wait_for_connection_completion, " + "transport [%d], Connection failed. (%d) %p\n", + transport->id (), errno, "")); + } + result = -1; + } + else + { + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Connector::" + "wait_for_connection_completion, " + "transport [%d], Connection not complete.\n", + transport->id ())); + } + transport->connection_handler ()-> + reset_state (TAO_LF_Event::LFS_CONNECTION_WAIT); + result = 0; + } + } + else + { + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport_Connector::" "wait_for_connection_completion, " - "transport [%d], wait for completion failed\n", - transport->id())); - - - // Set transport to zero, it is not usable, and the reference - // count we added above was decremented by the base connector - // handling the connection failure. - transport = 0; + "going to wait for connection completion on transport" + "[%d]\n", + transport->id ())); + } + result = this->active_connect_strategy_->wait (transport, timeout); - return false; + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Connector::" + "wait_for_connection_completion, " + "transport [%d], wait done result = %d\n", + transport->id (), result)); } + // There are three possibilities when wait() returns: (a) + // connection succeeded; (b) connection failed; (c) wait() + // failed because of some other error. It is easy to deal with + // (a) and (b). (c) is tricky since the connection is still + // pending and may get completed by some other thread. The + // following code deals with (c). + + if (result == -1) + { + if (errno == ETIME) + { + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Connector::" + "wait_for_connection_completion, " + "transport [%d], Connection timed out.\n", + transport->id ())); + } + } + else + { + // The wait failed for some other reason. + // Report that making the connection failed, don't print errno + // because we touched the reactor and errno could be changed + if (TAO_debug_level > 2) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - Transport_Connector::" + "wait_for_connection_completion, " + "transport [%d], wait for completion failed (%d) %p\n", + transport->id (), errno, "")); + } + TAO_Connection_Handler *con = transport->connection_handler (); + result = this->check_connection_closure (con); + } + } } + if (result == -1) + { + // Set transport to zero, it is not usable, and the reference + // count we added above was decremented by the base connector + // handling the connection failure. + transport = 0; + return false; + } // Connection not ready yet but we can use this transport, if // we need a connected one we will block later to make sure // it is connected @@ -559,30 +598,32 @@ TAO_Connector::wait_for_connection_completion ( count)); for (unsigned int i = 0; i < count; i++) ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("%d%s"),transport[i]->id(), + ACE_TEXT("%d%s"),transport[i]->id (), (i < (count -1) ? ", " : "]\n"))); } - // If we don't need to block for a transport just set the timeout to - // be zero. - ACE_Time_Value tmp_zero (ACE_Time_Value::zero); - if (!r->blocked_connect ()) + int result = -1; + if (r->blocked_connect ()) { - timeout = &tmp_zero; + result = this->active_connect_strategy_->wait (mev, timeout); + the_winner = 0; + } + else + { + errno = ETIME; } - - int result = this->active_connect_strategy_->wait (mev,timeout); - the_winner = 0; if (result != -1) { the_winner = mev->winner()->transport(); if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT("(%P|%t) Transport_Connector::") - ACE_TEXT("wait_for_connection_completion, ") - ACE_TEXT("transport [%d]\n"), - the_winner->id ())); + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) Transport_Connector::") + ACE_TEXT ("wait_for_connection_completion, ") + ACE_TEXT ("transport [%d]\n"), + the_winner->id ())); + } } else if (errno == ETIME) { @@ -616,10 +657,12 @@ TAO_Connector::wait_for_connection_completion ( // Report that making the connection failed, don't print errno // because we touched the reactor and errno could be changed if (TAO_debug_level > 2) - ACE_ERROR ((LM_ERROR, - ACE_TEXT ("(%P|%t) Transport_Connector::") - ACE_TEXT ("wait_for_connection_completion, failed\n") - )); + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) Transport_Connector::") + ACE_TEXT ("wait_for_connection_completion, failed\n") + )); + } return false; } @@ -630,11 +673,13 @@ TAO_Connector::wait_for_connection_completion ( if (r->blocked_connect () && !the_winner->is_connected ()) { if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - Transport_Connector::" - "wait_for_connection_completion, " - "no connected transport for a blocked connection, " - "cancelling connections and reverting things \n")); + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - Transport_Connector::" + "wait_for_connection_completion, " + "no connected transport for a blocked connection, " + "cancelling connections and reverting things \n")); + } // Forget the return value. We are busted anyway. Try our best // here. @@ -643,9 +688,9 @@ TAO_Connector::wait_for_connection_completion ( return false; } - // Connection may not ready for SYNC_NONE cases but we can use this - // transport, if we need a connected one we will block later to make - // sure it is connected + // Connection may not ready for SYNC_NONE and SYNC_DELAYED_BUFFERING cases + // but we can use this transport, if we need a connected one we will poll + // later to make sure it is connected return true; } diff --git a/TAO/tao/Transport_Queueing_Strategies.cpp b/TAO/tao/Transport_Queueing_Strategies.cpp index 4de33f760dd..fbf6595904a 100644 --- a/TAO/tao/Transport_Queueing_Strategies.cpp +++ b/TAO/tao/Transport_Queueing_Strategies.cpp @@ -90,7 +90,7 @@ namespace TAO must_flush = false; set_timer = false; - TAO_Buffering_Constraint_Policy *buffering_constraint_policy = 0; + TAO::BufferingConstraint buffering_constraint; ACE_TRY_NEW_ENV { @@ -99,18 +99,18 @@ namespace TAO ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; - TAO::BufferingConstraintPolicy_var bcp = + TAO::BufferingConstraintPolicy_var bcpv = TAO::BufferingConstraintPolicy::_narrow (bcp_policy.in () ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; - buffering_constraint_policy = - dynamic_cast (bcp.in ()); - - if (buffering_constraint_policy == 0) + TAO_Buffering_Constraint_Policy* bcp = + dynamic_cast (bcpv.in ()); + if (bcp == 0) { return true; } + bcp->get_buffering_constraint (buffering_constraint); } ACE_CATCHANY { @@ -118,8 +118,6 @@ namespace TAO } ACE_ENDTRY; - TAO::BufferingConstraint buffering_constraint; - buffering_constraint_policy->get_buffering_constraint (buffering_constraint); if (buffering_constraint.mode == TAO::BUFFER_FLUSH) { diff --git a/TAO/tests/AMI_Buffering/client.cpp b/TAO/tests/AMI_Buffering/client.cpp index eb5819492ca..2e05c13f86e 100644 --- a/TAO/tests/AMI_Buffering/client.cpp +++ b/TAO/tests/AMI_Buffering/client.cpp @@ -12,7 +12,7 @@ ACE_RCSID(AMI_Buffering, client, "$Id$") const char *server_ior = "file://server.ior"; const char *admin_ior = "file://admin.ior"; -int iterations = 200; +int iterations = 20; int run_message_count_test = 0; int run_timeout_test = 0; @@ -20,12 +20,12 @@ int run_timeout_reactive_test = 0; int run_buffer_size_test = 0; const int PAYLOAD_LENGTH = 1024; -const int BUFFERED_MESSAGES_COUNT = 50; -const unsigned int TIMEOUT_MILLISECONDS = 25; -const int BUFFER_SIZE = 64 * PAYLOAD_LENGTH; +const int BUFFERED_MESSAGES_COUNT = 10; +const unsigned int TIMEOUT_MILLISECONDS = 50; +const int BUFFER_SIZE = 10 * PAYLOAD_LENGTH; /// Allow a larger timeout to occur due to scheduler differences -const unsigned int TIMEOUT_TOLERANCE = 20 * TIMEOUT_MILLISECONDS; +const unsigned int TIMEOUT_TOLERANCE = 4 * TIMEOUT_MILLISECONDS; /// Check that no more than 10% of the messages are not sent. const double LIVENESS_TOLERANCE = 0.9; @@ -393,6 +393,10 @@ run_liveness_test (CORBA::ORB_ptr orb, ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + // Once the system has sent enough messages we don't // expect it to fall too far behind, i.e. at least 90% of the // messages should be delivered.... @@ -491,6 +495,10 @@ run_message_count (CORBA::ORB_ptr orb, ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + CORBA::ULong iteration_count = send_count - initial_receive_count; if (receive_count != initial_receive_count) @@ -605,6 +613,10 @@ run_timeout (CORBA::ORB_ptr orb, ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start; if (receive_count != initial_receive_count) { @@ -720,8 +732,8 @@ run_timeout_reactive (CORBA::ORB_ptr orb, ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); - ACE_Time_Value sleep (0, 10000); - orb->run (sleep ACE_ENV_ARG_PARAMETER); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); ACE_CHECK_RETURN (-1); ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start; @@ -838,6 +850,10 @@ run_buffer_size (CORBA::ORB_ptr orb, ami_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + CORBA::ULong payload_delta = bytes_sent - initial_bytes_received; if (bytes_received != initial_bytes_received) diff --git a/TAO/tests/AMI_Buffering/run_buffer_size.pl b/TAO/tests/AMI_Buffering/run_buffer_size.pl index 6f3f961b9d0..d8d013e7528 100755 --- a/TAO/tests/AMI_Buffering/run_buffer_size.pl +++ b/TAO/tests/AMI_Buffering/run_buffer_size.pl @@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) { exit 1; } -$client = $CL->SpawnWaitKill (300); +$client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/AMI_Buffering/run_message_count.pl b/TAO/tests/AMI_Buffering/run_message_count.pl index 030af52a919..dcb2c43bea8 100755 --- a/TAO/tests/AMI_Buffering/run_message_count.pl +++ b/TAO/tests/AMI_Buffering/run_message_count.pl @@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) { exit 1; } -$client = $CL->SpawnWaitKill (300); +$client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/AMI_Buffering/run_test.pl b/TAO/tests/AMI_Buffering/run_test.pl index 9bd93d690f4..4e682196753 100755 --- a/TAO/tests/AMI_Buffering/run_test.pl +++ b/TAO/tests/AMI_Buffering/run_test.pl @@ -40,7 +40,7 @@ foreach $test_type ("-c", "-t", "-b") { exit 1; } - $client = $CL->SpawnWaitKill (300); + $client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/AMI_Buffering/run_timeout.pl b/TAO/tests/AMI_Buffering/run_timeout.pl index 5e93a904afb..eb3580c12c5 100755 --- a/TAO/tests/AMI_Buffering/run_timeout.pl +++ b/TAO/tests/AMI_Buffering/run_timeout.pl @@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) { exit 1; } -$client = $CL->SpawnWaitKill (600); +$client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/AMI_Buffering/run_timeout_reactive.pl b/TAO/tests/AMI_Buffering/run_timeout_reactive.pl index d5fd87fe65c..061a524f4a6 100755 --- a/TAO/tests/AMI_Buffering/run_timeout_reactive.pl +++ b/TAO/tests/AMI_Buffering/run_timeout_reactive.pl @@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) { exit 1; } -$client = $CL->SpawnWaitKill (300); +$client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/Oneway_Buffering/client.cpp b/TAO/tests/Oneway_Buffering/client.cpp index a0ab26bed3e..0c4383016ef 100644 --- a/TAO/tests/Oneway_Buffering/client.cpp +++ b/TAO/tests/Oneway_Buffering/client.cpp @@ -13,7 +13,7 @@ ACE_RCSID(Oneway_Buffering, client, "$Id$") const char *server_ior = "file://server.ior"; const char *admin_ior = "file://admin.ior"; -int iterations = 200; +int iterations = 20; int run_message_count_test = 0; int run_timeout_test = 0; @@ -21,9 +21,9 @@ int run_timeout_reactive_test = 0; int run_buffer_size_test = 0; const int PAYLOAD_LENGTH = 1024; -const int BUFFERED_MESSAGES_COUNT = 50; +const int BUFFERED_MESSAGES_COUNT = 10; const unsigned int TIMEOUT_MILLISECONDS = 50; -const int BUFFER_SIZE = 64 * PAYLOAD_LENGTH; +const int BUFFER_SIZE = 10 * PAYLOAD_LENGTH; /// Check that no more than 10% of the messages are not sent. const double LIVENESS_TOLERANCE = 0.9; @@ -316,7 +316,8 @@ sync_server (Test::Oneway_Buffering_ptr flusher } int -run_liveness_test (Test::Oneway_Buffering_ptr oneway_buffering, +run_liveness_test (CORBA::ORB_ptr orb, + Test::Oneway_Buffering_ptr oneway_buffering, Test::Oneway_Buffering_ptr flusher, Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin ACE_ENV_ARG_DECL) @@ -349,6 +350,10 @@ run_liveness_test (Test::Oneway_Buffering_ptr oneway_buffering, oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + // Once the system has sent enough messages we don't // expect it to fall too far behind, i.e. at least 90% of the // messages should be delivered.... @@ -436,6 +441,10 @@ run_message_count (CORBA::ORB_ptr orb, oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + CORBA::ULong iteration_count = send_count - initial_receive_count; if (receive_count != initial_receive_count) @@ -468,7 +477,8 @@ run_message_count (CORBA::ORB_ptr orb, } int liveness_test_failed = - run_liveness_test (oneway_buffering, + run_liveness_test (orb, + oneway_buffering, flusher.in (), oneway_buffering_admin ACE_ENV_ARG_PARAMETER); @@ -536,6 +546,10 @@ run_timeout (CORBA::ORB_ptr orb, oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start; if (receive_count != initial_receive_count) { @@ -568,7 +582,7 @@ run_timeout (CORBA::ORB_ptr orb, } int liveness_test_failed = - run_liveness_test (oneway_buffering, + run_liveness_test (orb, oneway_buffering, flusher.in (), oneway_buffering_admin ACE_ENV_ARG_PARAMETER); @@ -639,8 +653,9 @@ run_timeout_reactive (CORBA::ORB_ptr orb, oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); - ACE_Time_Value sleep (0, 10000); - orb->run (sleep ACE_ENV_ARG_PARAMETER); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start; @@ -675,7 +690,7 @@ run_timeout_reactive (CORBA::ORB_ptr orb, } int liveness_test_failed = - run_liveness_test (oneway_buffering, + run_liveness_test (orb, oneway_buffering, flusher.in (), oneway_buffering_admin ACE_ENV_ARG_PARAMETER); @@ -743,6 +758,10 @@ run_buffer_size (CORBA::ORB_ptr orb, oneway_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK_RETURN (-1); + ACE_Time_Value tv (0, 10 * 1000); + orb->run (tv ACE_ENV_ARG_PARAMETER); + ACE_CHECK_RETURN (-1); + CORBA::ULong payload_delta = bytes_sent - initial_bytes_received; if (bytes_received != initial_bytes_received) @@ -780,7 +799,7 @@ run_buffer_size (CORBA::ORB_ptr orb, } int liveness_test_failed = - run_liveness_test (oneway_buffering, + run_liveness_test (orb, oneway_buffering, flusher.in (), oneway_buffering_admin ACE_ENV_ARG_PARAMETER); diff --git a/TAO/tests/Oneway_Buffering/run_buffer_size.pl b/TAO/tests/Oneway_Buffering/run_buffer_size.pl index 3e8db4fdc66..4f3bd5c5037 100755 --- a/TAO/tests/Oneway_Buffering/run_buffer_size.pl +++ b/TAO/tests/Oneway_Buffering/run_buffer_size.pl @@ -45,7 +45,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) { exit 1; } -$client = $CL->SpawnWaitKill (300); +$client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/Oneway_Buffering/run_message_count.pl b/TAO/tests/Oneway_Buffering/run_message_count.pl index 949a61e5c62..35e68a43b1a 100755 --- a/TAO/tests/Oneway_Buffering/run_message_count.pl +++ b/TAO/tests/Oneway_Buffering/run_message_count.pl @@ -45,7 +45,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) { exit 1; } -$client = $CL->SpawnWaitKill (300); +$client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/Oneway_Buffering/run_test.pl b/TAO/tests/Oneway_Buffering/run_test.pl index b58853ec49c..39c47eb20e3 100755 --- a/TAO/tests/Oneway_Buffering/run_test.pl +++ b/TAO/tests/Oneway_Buffering/run_test.pl @@ -41,7 +41,7 @@ foreach $test_type ("-c", "-t", "-b", "-r") { exit 1; } - $client = $CL->SpawnWaitKill (300); + $client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/Oneway_Buffering/run_timeout.pl b/TAO/tests/Oneway_Buffering/run_timeout.pl index da28486e145..6518696565e 100755 --- a/TAO/tests/Oneway_Buffering/run_timeout.pl +++ b/TAO/tests/Oneway_Buffering/run_timeout.pl @@ -45,7 +45,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) { exit 1; } -$client = $CL->SpawnWaitKill (300); +$client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl b/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl index d5fd87fe65c..061a524f4a6 100755 --- a/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl +++ b/TAO/tests/Oneway_Buffering/run_timeout_reactive.pl @@ -44,7 +44,7 @@ if (PerlACE::waitforfile_timed ($server_iorfile, 10) == -1) { exit 1; } -$client = $CL->SpawnWaitKill (300); +$client = $CL->SpawnWaitKill (30); if ($client != 0) { print STDERR "ERROR: client returned $client\n"; diff --git a/TAO/tests/Oneway_Timeouts/Test.idl b/TAO/tests/Oneway_Timeouts/Test.idl new file mode 100644 index 00000000000..d076114e7c6 --- /dev/null +++ b/TAO/tests/Oneway_Timeouts/Test.idl @@ -0,0 +1,4 @@ +interface Tester { + oneway void test(in long id); + long test2(in long id); +}; diff --git a/TAO/tests/Oneway_Timeouts/client.cpp b/TAO/tests/Oneway_Timeouts/client.cpp new file mode 100644 index 00000000000..e928a1e232a --- /dev/null +++ b/TAO/tests/Oneway_Timeouts/client.cpp @@ -0,0 +1,488 @@ +#include "TestS.h" +#include "tao/Strategies/advanced_resource.h" +#include "tao/Messaging/Messaging.h" +#include "tao/AnyTypeCode/TAOA.h" +#include "tao/AnyTypeCode/Any.h" +#include "tao/IIOP_Connector.h" + +#include "ace/streams.h" +#include "ace/High_Res_Timer.h" +#include "ace/Arg_Shifter.h" + +using namespace CORBA; +using namespace PortableServer; + +namespace +{ + const char *non_existent_ior = "corbaloc:iiop:1.2@ociweb.com:12345/test"; + const int TIME_THRESHOLD = 50; //ms + + int request_timeout = 0; + Messaging::SyncScope sync_scope; + bool use_buf_constraints = false; + bool use_sync_scope = false; + int bc_mode = TAO::BUFFER_FLUSH; + int bc_count = 0; + int bc_bytes = 0; + int bc_timeout = 0; + int num_requests = 10; + int request_interval = 50; + int connect_timeout = 0; + int run_orb_delay = 0; + int run_orb_time = 0; + bool force_timeout = false; + // This will force a blocking connection before starting the test + // by sending the num_requests as a twoway. + bool force_connect = false; + bool use_sleep = false; + unsigned int max_request_time = 0; + bool use_twoway = false; + bool retry_transients = false; + bool retry_timeouts = false; + + void print_usage () + { + cout << "client [-request_timeout ms=0] [-connect_timeout ms=0] " + "[-request_interval ms=100]\n\t[-run_orb_delay ms=0] " + "[-run_orb_time ms=0] [-max_request_time ms=0]\n" + "\t[-num_requests n=10] [-use_twoway] [-retry_transients] " + "[-retry_timeouts]\n" + "\t[-use_sleep] [-force_timeout] [-force_connect] [-buffer_count n=0]\n" + "\t[-buffer_bytes n=0] [-buffer_timeout ms=0] [-sync delayed|eager|none]" + << endl; + } + + bool parse_command_line (int ac, char *av[]) + { + ACE_Arg_Shifter args (ac, av); + args.consume_arg (); + + while (args.is_anything_left ()) + { + if (args.cur_arg_strncasecmp ("-request_timeout") == 0) + { + args.consume_arg (); + request_timeout = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-connect_timeout") == 0) + { + args.consume_arg (); + connect_timeout = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-request_interval") == 0) + { + args.consume_arg (); + request_interval = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-run_orb_delay") == 0) + { + args.consume_arg (); + run_orb_delay = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-run_orb_time") == 0) + { + args.consume_arg (); + run_orb_time = ACE_OS::atoi(args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-max_request_time") == 0) + { + args.consume_arg (); + max_request_time = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-num_requests") == 0) + { + args.consume_arg (); + num_requests = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-use_twoway") == 0) + { + use_twoway = true; + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-retry_transients") == 0) + { + retry_transients = true; + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-retry_timeouts") == 0) + { + retry_timeouts = true; + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-use_sleep") == 0) + { + use_sleep = true; + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-force_timeout") == 0) + { + force_timeout = true; + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-force_connect") == 0) + { + force_connect = true; + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-buffer_count") == 0) + { + args.consume_arg (); + use_buf_constraints = true; + bc_count = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-buffer_bytes") == 0) + { + args.consume_arg (); + use_buf_constraints = true; + bc_bytes = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-buffer_timeout") == 0) + { + args.consume_arg (); + use_buf_constraints = true; + bc_timeout = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-sync") == 0) + { + args.consume_arg (); + if (args.cur_arg_strncasecmp ("delayed") == 0) + { + sync_scope = TAO::SYNC_DELAYED_BUFFERING; + use_sync_scope = true; + } + else if (args.cur_arg_strncasecmp ("eager") == 0) + { + sync_scope = TAO::SYNC_EAGER_BUFFERING; + use_sync_scope = true; + } + else if (args.cur_arg_strncasecmp ("none") == 0) + { + sync_scope = Messaging::SYNC_NONE; + use_sync_scope = true; + } + else + { + print_usage (); + return false; + } + + args.consume_arg (); + } + else + { + cerr << "Error: Unknown argument \"" + << args.get_current () << "\"" << endl; + print_usage (); + return false; + } + + } + + return true; + } + + + POA_ptr create_poa (ORB_ptr orb) + { + POA_var poa; + PolicyList pols; + Object_var obj = orb->resolve_initial_references ("RootPOA"); + POA_var root = POA::_narrow (obj.in ()); + ACE_ASSERT (! is_nil (root.in ())); + POAManager_var man = root->the_POAManager (); + poa = root->create_POA ("X", man, pols); + return poa._retn (); + } + + + Tester_ptr set_request_timeout (Tester_ptr tst, ORB_ptr orb) + { + if (request_timeout <= 0) + { + return Tester::_duplicate (tst); + } + + Any a; + a <<= static_cast (request_timeout * 10000); + PolicyList pols (1); + pols.length (1); + pols[0] = + orb->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, a); + Object_var obj = tst->_set_policy_overrides (pols, ADD_OVERRIDE); + pols[0]->destroy (); + return Tester::_unchecked_narrow (obj.in ()); + } + + + void set_connect_timeout (ORB_ptr orb) + { + if (connect_timeout <= 0) + return; + Object_var obj = orb->resolve_initial_references ("PolicyCurrent"); + PolicyCurrent_var policy_current = PolicyCurrent::_narrow (obj.in ()); + Any a; + a <<= static_cast (connect_timeout * 10000); + PolicyList pols (1); + pols.length (1); + pols[0] = orb->create_policy (TAO::CONNECTION_TIMEOUT_POLICY_TYPE, a); + policy_current->set_policy_overrides (pols, ADD_OVERRIDE); + pols[0]->destroy (); + } + + + void set_buffering (ORB_ptr orb) + { + Object_var obj = orb->resolve_initial_references ("PolicyCurrent"); + PolicyCurrent_var policy_current = PolicyCurrent::_narrow (obj.in ()); + PolicyList pols (1); + pols.length (1); + + if (use_sync_scope) + { + Any a; + a <<= sync_scope; + pols[0] = orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, a); + policy_current->set_policy_overrides (pols, ADD_OVERRIDE); + pols[0]->destroy (); + } + + + if (use_buf_constraints) + { + TAO::BufferingConstraint bc; + if (bc_count > 0) + { + bc_mode |= TAO::BUFFER_MESSAGE_COUNT; + } + + if (bc_bytes > 0) + { + bc_mode |= TAO::BUFFER_MESSAGE_BYTES; + } + + if (bc_timeout > 0) + { + bc_mode |= TAO::BUFFER_TIMEOUT; + } + + bc.mode = bc_mode; + bc.message_count = bc_count; + bc.message_bytes = bc_bytes; + bc.timeout = static_cast (bc_timeout * 10000); + Any a; + a <<= bc; + pols[0] = + orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE, a); + policy_current->set_policy_overrides (pols, ADD_OVERRIDE); + pols[0]->destroy (); + } + + } + +} + + +int main (int ac, char *av[]) +{ + + ACE_Time_Value before = ACE_High_Res_Timer::gettimeofday_hr (); + + int num_requests_sent = 0; + + try + { + ORB_var orb = ORB_init (ac, av); + + if (!parse_command_line (ac, av)) + { + return 1; + } + + + set_connect_timeout (orb.in ()); + set_buffering (orb.in ()); + + ACE_CString ior ("file://server.ior"); + if (force_timeout) + { + ior = non_existent_ior; + } + + Object_var obj = orb->string_to_object (ior.c_str ()); + + ACE_ASSERT (! is_nil (obj.in ())); + + Tester_var tmp_tester = Tester::_unchecked_narrow (obj.in ()); + + Tester_var tester = set_request_timeout (tmp_tester.in (), orb.in ()); + + ACE_ASSERT (! is_nil (tester.in ())); + + Long i = 0; + + if (force_connect) + { + tester->test2 (-2); + cout << "Connected..." << endl; + } + + + for (; i < num_requests; ++i) + { + before = ACE_High_Res_Timer::gettimeofday_hr (); + try + { + if (use_twoway) + { + tester->test2 (i); + } + else + { + tester->test (i); + } + + } + catch (CORBA::TRANSIENT&) + { + cerr << "Transient exception during test () invocation " << i << endl; + if (! retry_transients) + { + throw; + } + + } + catch (CORBA::TIMEOUT&) + { + cerr << "Timeout exception during test () invocation " << i << endl; + if (! retry_timeouts) + { + throw; + } + + } + + ++num_requests_sent; + + ACE_Time_Value after = ACE_High_Res_Timer::gettimeofday_hr (); + if (max_request_time > 0 && + (after - before).msec () > max_request_time) + { + cerr << "Error : test () took " << (after - before).msec () + << endl; + return 1; + } + + + cout << 'c' << i << endl; + if (request_interval > 0) + { + ACE_Time_Value tv (0, request_interval * 1000); + ACE_Time_Value done = tv + + ACE_High_Res_Timer::gettimeofday_hr (); + if (! use_sleep) + { + orb->run (tv); + } + else + { + ACE_OS::sleep (tv); + } + + while (ACE_High_Res_Timer::gettimeofday_hr () < done) + { + ACE_OS::sleep (0); + } + } + } + + + if (run_orb_delay > 0) + { + ACE_Time_Value tv (0, run_orb_delay * 1000); + ACE_OS::sleep (tv); + } + + + if (run_orb_time > 0) + { + ACE_Time_Value tv (0, run_orb_time * 1000); + orb->run (tv); + } + + + // Let the server know we're finished. + tester->test2 (-1); + + orb->shutdown (1); + + orb->destroy (); + + if (force_timeout) + { + cerr << "Error: Connection did not timeout." << endl; + return 1; + } + + + return 0; + + } + catch (CORBA::TIMEOUT &ex) + { + if (force_timeout) + { + ACE_Time_Value after = ACE_High_Res_Timer::gettimeofday_hr (); + long ms = (after - before).msec (); + if ( (use_twoway || !use_sync_scope) + && request_timeout > 0 + && request_timeout < connect_timeout) + { + connect_timeout = request_timeout; + } + else if (use_sync_scope && !use_sleep) + { + if (ms > TIME_THRESHOLD) + { + cerr << "Error: Buffered request took " << ms << endl; + return 1; + } + + ms = num_requests_sent * request_interval; + } + + if (std::abs (ms - connect_timeout) > TIME_THRESHOLD) + { + cerr << "Error: Timeout expected in " << connect_timeout + << "ms, but took " << ms << "ms" << endl; + return 1; + } + + return 0; + } + else + { + cerr << "Error: Unexpected timeout\n" << ex << endl; + } + + } + catch (Exception &ex) + { + cerr << "client: " << ex << endl; + cerr << "\nLast operation took " + << (ACE_High_Res_Timer::gettimeofday_hr () - before).msec () + << "ms" + << endl; + } + + return 1; +} diff --git a/TAO/tests/Oneway_Timeouts/run_test.pl b/TAO/tests/Oneway_Timeouts/run_test.pl new file mode 100755 index 00000000000..2330e7906be --- /dev/null +++ b/TAO/tests/Oneway_Timeouts/run_test.pl @@ -0,0 +1,392 @@ +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +# $Id: run_test.pl,v 1.7 2005/01/27 16:38:29 elliott_c Exp $ +# -*- perl -*- + +############################################################################### +my $ACE_ROOT = $ENV{ACE_ROOT}; + +if (!defined $ACE_ROOT) { + print "Error: ACE_ROOT not defined.\n"; + return 1; +} + +use lib "$ENV{ACE_ROOT}/bin"; +use PerlACE::Run_Test; +use File::Copy; + +use strict; + +my $srv_ior = PerlACE::LocalFile ("server.ior"); +my $CLI = new PerlACE::Process ("client"); +my $SRV = new PerlACE::Process ("server"); +my $SRV_PORT = PerlACE::random_port(); +my $SRV_ARGS = "-orbobjrefstyle url -orbendpoint iiop://:$SRV_PORT"; + +sub test_timeouts +{ + print "test_timeouts 1 testing...\n"; + $CLI->Arguments("-force_timeout -connect_timeout 200"); + my $ret = $CLI->SpawnWaitKill(5); + if ($ret != 0) { + return $ret; + } + print "test_timeouts 1 passed...\n"; + print "test_timeouts 2 testing...\n"; + # request timeout should override connect timeout + $CLI->Arguments("-force_timeout -request_timeout 100 -connect_timeout 200"); + my $ret = $CLI->SpawnWaitKill(5); + if ($ret != 0) { + return $ret; + } + print "test_timeouts 2 passed...\n"; + print "test_timeouts 3 testing...\n"; + $CLI->Arguments("-use_twoway -force_timeout -connect_timeout 200"); + my $ret = $CLI->SpawnWaitKill(5); + if ($ret != 0) { + return $ret; + } + print "test_timeouts 3 passed...\n"; + print "test_timeouts 4 testing...\n"; + # request timeout should override connect timeout + $CLI->Arguments("-use_twoway -force_timeout -request_timeout 200 -connect_timeout 1000"); + my $ret = $CLI->SpawnWaitKill(5); + if ($ret != 0) { + return $ret; + } + print "test_timeouts 4 passed...\n"; + print "test_timeouts 5 testing...\n"; + # request_timeout ignored for other sync_scopes + $CLI->Arguments("-sync none -force_timeout -request_timeout 100 -connect_timeout 200 -max_request_time 30"); + my $ret = $CLI->SpawnWaitKill(5); + if ($ret != 0) { + return $ret; + } + print "test_timeouts 5 passed...\n"; + print "test_timeouts 6 testing...\n"; + $CLI->Arguments("-sync eager -force_timeout -request_timeout 100 -connect_timeout 200 -max_request_time 30"); + my $ret = $CLI->SpawnWaitKill(5); + if ($ret != 0) { + return $ret; + } + print "test_timeouts 6 passed...\n"; + $CLI->Arguments("-sync delayed -force_timeout -request_timeout 100 -connect_timeout 200 -max_request_time 30"); + my $ret = $CLI->SpawnWaitKill(5); + if ($ret != 0) { + return $ret; + } + return $ret; +} + +sub test_buffering +{ + print "test_buffering 1 testing...\n"; + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_min 400"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + $CLI->Arguments("-sync none -max_request_time 30"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffering 1 passed...\n"; + print "test_buffering 2 testing...\n"; + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_min 400"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + $CLI->Arguments("-sync delayed -max_request_time 30"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffering 2 passed...\n"; + print "test_buffering 3 testing...\n"; + # Using sleep() instead of orb->run() for the interval + # should cause all requests to be sent at once. + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + $CLI->Arguments("-sync none -max_request_time 30 -use_sleep -run_orb_time 500"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffering 3 passed...\n"; + print "test_buffering 4 testing...\n"; + # Even delayed buffering will work this way, because the + # connection won't be established until the orb is run. + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + $CLI->Arguments("-sync delayed -max_request_time 30 -use_sleep -run_orb_time 500"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffering 4 passed...\n"; + print "test_buffering 5 testing...\n"; + # However, if we connect first, then delayed buffering will + # cause the data to be sent right away + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_min 400"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + $CLI->Arguments("-sync delayed -max_request_time 30 -use_sleep -run_orb_time 500 -force_connect"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffering 5 passed...\n"; + print "test_buffering 6 testing...\n"; + # Forcing the connection won't help sync_none, because it always buffers + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + $CLI->Arguments("-sync none -max_request_time 30 -use_sleep -run_orb_time 500 -force_connect"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffering 6 passed...\n"; + return 0; +} + +# Set a buffer count trigger and a request timeout so that a +# predictable number will be discarded. +sub test_buffer_count_timeout +{ + print "test_buffer_count_timeout testing...\n"; + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 2"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + $CLI->Arguments("-sync none -buffer_count 5 -max_request_time 30 -request_timeout 10"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffer_count_timeout passed...\n"; + return 0; +} + +sub test_buffer_bytes_timeout +{ + print "test_buffer_bytes_timeout testing...\n"; + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 3"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + $CLI->Arguments("-sync none -buffer_bytes 200 -max_request_time 30 -request_timeout 10"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffer_bytes_timeout passed...\n"; + return 0; +} + +sub test_buffer_timeout +{ + print "test_buffer_timeout 1 testing...\n"; + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50 -first_min 1000"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + # Must use run_orb_time so that the timer will fire, and to prevent sending the + # test_done twoway request which would flush the queue. + $CLI->Arguments("-sync none -buffer_timeout 1000 -max_request_time 30 -run_orb_time 1500"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffer_timeout 1 passed...\n"; + print "test_buffer_timeout 2 testing...\n"; + # delayed buffering should behave as above, because it will start out buffering + # due to the connection not being established. + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_max 50 -first_min 1000"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + # Must use run_orb_time so that the timer will fire, and to prevent sending the + # test_done twoway request which would flush the queue. + $CLI->Arguments("-sync delayed -buffer_timeout 1000 -max_request_time 30 -run_orb_time 1500"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffer_timeout 2 passed...\n"; + print "test_buffer_timeout 3 testing...\n"; + # delayed buffering will ignore constraints if the connection is forced + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 10 -elapsed_min 450 -first_max 50"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + # Must use run_orb_time so that the timer will fire, and to prevent sending the + # test_done twoway request which would flush the queue. + $CLI->Arguments("-sync delayed -force_connect -buffer_timeout 1000 -max_request_time 30 -run_orb_time 1500"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_buffer_timeout 3 passed...\n"; + return 0; +} + +# test sending one request with buffering timeout constraint. +sub test_one_request +{ + print "test_one_request testing...\n"; + unlink $srv_ior; + $SRV->Arguments("$SRV_ARGS -expected 1 -first_min 1000"); + if ($SRV->Spawn() != 0) { + return 1; + } + if (PerlACE::waitforfile_timed ($srv_ior, 2) != 0) { + print STDERR "Error: IOR not found.\n"; + return 1; + } + # Must use run_orb_time so that the timer will fire, and to prevent sending the + # test_done twoway request which would flush the queue. + $CLI->Arguments("-sync none -buffer_timeout 1000 -max_request_time 30 -run_orb_time 1500 -num_requests 1"); + if ($CLI->SpawnWaitKill(5) != 0) { + print STDERR "Error: Client failed.\n"; + return 1; + } + if ($SRV->WaitKill(5) != 0) { + print STDERR "Error: Server failed.\n"; + return 1; + } + print "test_one_request passed...\n"; + return 0; +} + +sub run_test +{ + my $ret = shift; + if ($ret != 0) { + exit $ret; + } +} + +unlink $srv_ior; + +run_test(test_timeouts()); +run_test(test_buffering()); +run_test(test_buffer_count_timeout()); +run_test(test_buffer_bytes_timeout()); +run_test(test_buffer_timeout()); +run_test(test_one_request()); + +# Regardless of the return value, ensure that the processes +# are terminated before exiting +$CLI->Kill(); +$SRV->Kill(); + +unlink $srv_ior; +exit 0; diff --git a/TAO/tests/Oneway_Timeouts/server.cpp b/TAO/tests/Oneway_Timeouts/server.cpp new file mode 100644 index 00000000000..ac9e6dadee2 --- /dev/null +++ b/TAO/tests/Oneway_Timeouts/server.cpp @@ -0,0 +1,318 @@ +#include "TestS.h" + +#include "tao/Strategies/advanced_resource.h" +#include "tao/Messaging/Messaging.h" +#include "tao/AnyTypeCode/TAOA.h" +#include "tao/AnyTypeCode/Any.h" + +#include "ace/streams.h" +#include "ace/High_Res_Timer.h" +#include "ace/Reactor.h" + +const int TIME_THRESHOLD = 50; //ms + +int activate_delay = 0000; +int run_delay = 00; +int request_delay = 00; +int abort_after = 0; +int num_expected = 0; +int elapsed_max = 0; +int elapsed_min = 0; +int first_min = 0; +int first_max = 0; + +class Tester_i + : public virtual POA_Tester + , public virtual ACE_Event_Handler +{ +public: + Tester_i (CORBA::ORB_ptr orb) + : orb_ (orb) + , id1_ (0) + , id2_ (0) + , count_ (0) + , failed_ (false) + { + this->start_ = ACE_High_Res_Timer::gettimeofday_hr (); + } + + virtual ~Tester_i () + { + } + + virtual void test (CORBA::Long id) + ACE_THROW_SPEC ((::CORBA::SystemException)) + { + testShared (id); + } + + virtual CORBA::Long test2 (CORBA::Long id) + ACE_THROW_SPEC ((::CORBA::SystemException)) + { + if (id == -2) + { + // Special id used to force a connect. Ignore. + this->start_ = ACE_High_Res_Timer::gettimeofday_hr (); + return id; + } + return testShared (id); + } + + int testShared (CORBA::Long id) + { + ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr (); + if (id == -1) + { + // Signals the end of a test run + if (num_expected > 0 && count_ != num_expected) + { + cerr << "Error: Expected " << num_expected + << ", but received " << count_ << endl; + this->failed_ = true; + } + long ms = (last_ - first_).msec (); + if (elapsed_max > 0 && ms > elapsed_max) + { + cerr << "Error: Expected < " << elapsed_max + << "ms, but was " << ms << "ms" << endl; + this->failed_ = true; + } + if (elapsed_min > 0 && ms < elapsed_min) + { + cerr << "Error: Expected > " << elapsed_min + << "ms, but was " << ms << "ms" << endl; + this->failed_ = true; + } + ms = (first_ - start_).msec (); + if (first_max > 0 && ms > first_max) + { + cerr << "Error: Expected first < " << first_max + << "ms, but was " << ms << "ms" << endl; + this->failed_ = true; + } + if (first_min > 0 && ms < first_min) + { + cerr << "Error: Expected first > " << first_min + << "ms, but was " << ms << "ms" << endl; + this->failed_ = true; + } + ACE_Time_Value timeout (0, 50 * 1000); + this->orb_->orb_core ()->reactor ()->schedule_timer (this, 0, timeout); + return id; + } + this->last_ = now; + if (id == 0) + { + this->first_ = now; + } + ++this->count_; + cout << 's' << id << endl; + if (abort_after > 0 && this->count_ >= abort_after) + { + cout << "\nAborting..." << endl; + ACE_OS::abort (); + } + if (request_delay > 0 && id == 0) + { + ACE_OS::sleep (ACE_Time_Value (0, 1000 * request_delay)); + } + return id; + } + + int handle_timeout (const ACE_Time_Value &, const void *) + { + this->orb_->shutdown (0); + return 0; + } + + bool failed () const { + return this->failed_; + } + +private: + CORBA::ORB_ptr orb_; + CORBA::Long id1_; + CORBA::Long id2_; + int count_; + bool failed_; + ACE_Time_Value start_; + ACE_Time_Value first_; + ACE_Time_Value last_; +}; + +#include "tao/Messaging/Messaging.h" +#include "tao/Strategies/advanced_resource.h" + +#include "ace/streams.h" +#include "ace/Log_Msg.h" +#include "ace/Arg_Shifter.h" + +using namespace CORBA; +using namespace PortableServer; + +namespace { + + void print_usage () + { + cout << "server [-activate_delay ms] [-run_delay ms] [-request_delay ms] " + "[-abort_after n]\n" + "\t[-expected n=0] [-elapsed_max ms=0] [-elapsed_min ms=0] " + "[-first_min ms=0]\n" + "\t[-first_max ms=0]\n" + "\tactivate_delay Millisecond delay before POAManager::activate.\n" + "\trun_delay Millisecond delay before ORB::run ().\n" + "\trequest_delay Millisecond delay within each servant request.\n" + "\tabort_after abort () after N requests.\n" << endl; + } + + bool parse_command_line (int ac, char *av[]) + { + ACE_Arg_Shifter args (ac, av); + args.consume_arg (); + while (args.is_anything_left ()) + { + if (args.cur_arg_strncasecmp ("-activate_delay") == 0) + { + args.consume_arg (); + activate_delay = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-run_delay") == 0) + { + args.consume_arg (); + run_delay = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-request_delay") == 0) + { + args.consume_arg (); + request_delay = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-expected") == 0) + { + args.consume_arg (); + num_expected = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-elapsed_max") == 0) + { + args.consume_arg (); + elapsed_max = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-elapsed_min") == 0) + { + args.consume_arg (); + elapsed_min = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-first_min") == 0) + { + args.consume_arg (); + first_min = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-first_max") == 0) + { + args.consume_arg (); + first_max = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else if (args.cur_arg_strncasecmp ("-abort_after") == 0) + { + args.consume_arg (); + abort_after = ACE_OS::atoi (args.get_current ()); + args.consume_arg (); + } + else + { + cerr << "Error: Unknown argument \"" + << args.get_current () << "\"" << endl; + print_usage (); + return false; + } + } + return true; + } + + void WriteIOR (const char *ior) + { + ofstream out ("server.ior"); + out << ior; + } + + POA_ptr create_poa (ORB_ptr orb) + { + PolicyList pols (2); + Object_var obj = orb->resolve_initial_references ("RootPOA"); + POA_var root = POA::_narrow (obj.in ()); + ACE_ASSERT (! is_nil (root.in ())); + pols.length (2); + pols[0] = root->create_id_assignment_policy (PortableServer::USER_ID); + pols[1] = root->create_lifespan_policy (PortableServer::PERSISTENT); + POAManager_var man = root->the_POAManager (); + POA_var poa = root->create_POA ("X", man.in (), pols); + return poa._retn (); + } + +} + +int main (int ac, char *av[]) +{ + try + { + ORB_var orb = ORB_init (ac, av); + + if (!parse_command_line (ac, av)) + { + return 1; + } + + POA_var poa = create_poa (orb.in ()); + ACE_ASSERT (! is_nil (poa.in ())); + + Tester_i svt (orb.in ()); + + ObjectId_var id = string_to_ObjectId ("tester"); + + poa->activate_object_with_id (id.in (), &svt); + Object_var obj = poa->id_to_reference (id.in ()); + String_var ior = orb->object_to_string (obj.in ()); + WriteIOR (ior.in ()); + + cout << "Servants registered and activated." << endl; + + if (activate_delay > 0) + { + ACE_OS::sleep (ACE_Time_Value (0, activate_delay * 1000)); + } + POAManager_var man = poa->the_POAManager (); + man->activate (); + + cout << "POAManager activated." << endl; + + if (run_delay > 0) + { + ACE_OS::sleep (ACE_Time_Value (0, run_delay * 1000)); + } + cout << "Running orb..." << endl; + + orb->run (); + + if (svt.failed ()) + { + return 1; + } + + return 0; + + } + catch (CORBA::Exception &ex) + { + ex._tao_print_exception ("server:"); + } + + return 1; + +} diff --git a/TAO/tests/Oneway_Timeouts/test.mpc b/TAO/tests/Oneway_Timeouts/test.mpc new file mode 100644 index 00000000000..bc999788463 --- /dev/null +++ b/TAO/tests/Oneway_Timeouts/test.mpc @@ -0,0 +1,26 @@ +project (*client) : taoexe, messaging, portableserver, strategies { + exename = client + after = *server + source_files { + client.cpp + TestC.cpp + } + header_files { + TestC.h + } + idl_files { + } +} + +project (*server) : taoexe, messaging, portableserver, strategies { + exename = server + source_files { + server.cpp + TestS.cpp + TestC.cpp + } + header_files { + TestS.h + TestC.h + } +} diff --git a/TAO/tests/Queued_Message_Test/Queued_Message_Test.cpp b/TAO/tests/Queued_Message_Test/Queued_Message_Test.cpp index 077d93b4596..2e2586f85e5 100644 --- a/TAO/tests/Queued_Message_Test/Queued_Message_Test.cpp +++ b/TAO/tests/Queued_Message_Test/Queued_Message_Test.cpp @@ -33,8 +33,7 @@ create_new_message (void) ACE_Message_Block mb (block_size); mb.wr_ptr (block_size); - return new TAO_Asynch_Queued_Message (&mb, TAO_ORB_Core_instance (), - 0, 1); + return new TAO_Asynch_Queued_Message (&mb, TAO_ORB_Core_instance (), 0, 0, 1); } /// Add a new message at the tail of the queue. diff --git a/TAO/tests/Timed_Buffered_Oneways/client.cpp b/TAO/tests/Timed_Buffered_Oneways/client.cpp index 9878f57d8b8..6593941db8b 100644 --- a/TAO/tests/Timed_Buffered_Oneways/client.cpp +++ b/TAO/tests/Timed_Buffered_Oneways/client.cpp @@ -117,123 +117,68 @@ parse_args (int argc, char **argv) return 0; } -void -setup_timeouts (CORBA::ORB_ptr orb - ACE_ENV_ARG_DECL) +test_ptr +setup_policies (CORBA::ORB_ptr orb, test_ptr object ACE_ENV_ARG_DECL) { - // Escape value. - if (timeout == -1) - return; - - // Obtain PolicyCurrent. - CORBA::Object_var object = orb->resolve_initial_references ("PolicyCurrent" - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - - // Narrow down to correct type. - CORBA::PolicyCurrent_var policy_current = - CORBA::PolicyCurrent::_narrow (object.in () - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - - TimeBase::TimeT rt_timeout = 10000 * timeout; - - CORBA::Any rt_timeout_any; - rt_timeout_any <<= rt_timeout; - - CORBA::PolicyList rt_timeout_policy_list (1); - rt_timeout_policy_list.length (1); - - rt_timeout_policy_list[0] = - orb->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, - rt_timeout_any - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - policy_current->set_policy_overrides (rt_timeout_policy_list, - CORBA::ADD_OVERRIDE - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - - rt_timeout_policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; -} - -void -setup_buffering_constraints (CORBA::ORB_ptr orb - ACE_ENV_ARG_DECL) -{ - // Obtain PolicyCurrent. - CORBA::Object_var object = orb->resolve_initial_references ("PolicyCurrent" - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - - // Narrow down to correct type. - CORBA::PolicyCurrent_var policy_current = - CORBA::PolicyCurrent::_narrow (object.in () - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; + test_var object_with_policy; + CORBA::PolicyList policy_list (1); + if (timeout == -1) + { + object_with_policy = test::_duplicate (object); + } + else + { + policy_list.length (1); + TimeBase::TimeT rt_timeout = 10000 * timeout; + CORBA::Any rt_timeout_any; + rt_timeout_any <<= rt_timeout; + policy_list[0] = + orb->create_policy (Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE, + rt_timeout_any + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + CORBA::Object_var object_temp = + object->_set_policy_overrides (policy_list, + CORBA::ADD_OVERRIDE + ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + object_with_policy = test::_narrow (object_temp ACE_ENV_ARG_PARAMETER); + ACE_CHECK; + + policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_CHECK; + } - // Setup the sync scope policy, i.e., the ORB will buffer oneways. Messaging::SyncScope sync = eager_buffering ? TAO::SYNC_EAGER_BUFFERING : TAO::SYNC_DELAYED_BUFFERING; - // Setup the sync scope any. CORBA::Any sync_any; sync_any <<= sync; - // Setup the sync scope policy list. - CORBA::PolicyList sync_policy_list (1); - sync_policy_list.length (1); - - // Setup the sync scope policy. - sync_policy_list[0] = + policy_list.length (1); + policy_list[0] = orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE, sync_any ACE_ENV_ARG_PARAMETER); ACE_CHECK; - // Setup the sync scope. - policy_current->set_policy_overrides (sync_policy_list, - CORBA::ADD_OVERRIDE - ACE_ENV_ARG_PARAMETER); + CORBA::Object_var object_temp = + object_with_policy->_set_policy_overrides (policy_list, + CORBA::ADD_OVERRIDE + ACE_ENV_ARG_PARAMETER); ACE_CHECK; - // We are now done with this policy. - sync_policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); + test_var object_with_two_policies = test::_narrow (object_temp + ACE_ENV_ARG_PARAMETER); ACE_CHECK; - // Flush buffers. - TAO::BufferingConstraint buffering_constraint; - buffering_constraint.mode = TAO::BUFFER_FLUSH; - buffering_constraint.message_count = 0; - buffering_constraint.message_bytes = 0; - buffering_constraint.timeout = 0; - - // Setup the buffering constraint any. - CORBA::Any buffering_constraint_any; - buffering_constraint_any <<= buffering_constraint; - - // Setup the buffering constraint policy list. - CORBA::PolicyList buffering_constraint_policy_list (1); - buffering_constraint_policy_list.length (1); - - // Setup the buffering constraint policy. - buffering_constraint_policy_list[0] = - orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE, - buffering_constraint_any - ACE_ENV_ARG_PARAMETER); + policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); ACE_CHECK; - // Setup the constraints. - policy_current->set_policy_overrides (buffering_constraint_policy_list, - CORBA::ADD_OVERRIDE - ACE_ENV_ARG_PARAMETER); - ACE_CHECK; - - // We are done with the policy. - buffering_constraint_policy_list[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER); - ACE_CHECK; + return object_with_two_policies._retn (); } int @@ -263,18 +208,14 @@ main (int argc, char **argv) ACE_TRY_CHECK; // Try to narrow the object reference to a reference. - test_var test_object = test::_narrow (object.in () - ACE_ENV_ARG_PARAMETER); - ACE_TRY_CHECK; - - // Setup buffering. - setup_buffering_constraints (orb.in () - ACE_ENV_ARG_PARAMETER); + test_var test_object_no_policy = test::_narrow (object.in () + ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; - // Setup timeout. - setup_timeouts (orb.in () - ACE_ENV_ARG_PARAMETER); + // Setup buffering and timeout + test_var test_object = setup_policies (orb.in (), + test_object_no_policy.in () + ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; test::data the_data (data_bytes); @@ -282,31 +223,44 @@ main (int argc, char **argv) for (CORBA::ULong i = 1; i <= iterations; ++i) { - ACE_DEBUG ((LM_DEBUG, - "client: Iteration %d @ %T\n", - i)); - + ACE_Time_Value start = ACE_OS::gettimeofday (); // Invoke the oneway method. test_object->method (i, + start.msec (), the_data, work ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; + ACE_Time_Value end = ACE_OS::gettimeofday (); + + ACE_DEBUG ((LM_DEBUG, + "client:\t%d took\t%dms\n", + i, (end - start).msec ())); + // Interval between successive calls. - ACE_Time_Value sleep_interval (0, - interval * 1000); + ACE_Time_Value sleep_interval (0, interval * 1000); - ACE_OS::sleep (sleep_interval); + // If we don't run the orb, then no data will be sent, and no + // connection will be made initially. + orb->run (sleep_interval ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; } - // Shutdown server. + test_object_no_policy->flush (ACE_ENV_SINGLE_ARG_PARAMETER); + ACE_TRY_CHECK; + + ACE_DEBUG ((LM_DEBUG, "client: Shutting down...\n")); if (shutdown_server) { - test_object->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER); + long now = ACE_OS::gettimeofday ().msec (); + test_object_no_policy->shutdown (now ACE_ENV_ARG_PARAMETER); ACE_TRY_CHECK; } + orb->shutdown (1 ACE_ENV_ARG_PARAMETER); + ACE_TRY_CHECK; + // Destroy the ORB. On some platforms, e.g., Win32, the socket // library is closed at the end of main(). This means that any // socket calls made after main() fail. Hence if we wait for diff --git a/TAO/tests/Timed_Buffered_Oneways/run_test.pl b/TAO/tests/Timed_Buffered_Oneways/run_test.pl index bf2638950c7..16ef5e32ab3 100755 --- a/TAO/tests/Timed_Buffered_Oneways/run_test.pl +++ b/TAO/tests/Timed_Buffered_Oneways/run_test.pl @@ -17,9 +17,13 @@ if (PerlACE::is_vxworks_test()) { $SV = new PerlACE::ProcessVX ("server", "-o server.ior"); } else { - $SV = new PerlACE::Process ("server", "-o $iorfile"); + $SV = new PerlACE::Process ("server", "-o $iorfile" +." -ORBDebugLevel 6 -ORBLogFile server.log" +); } -$CL = new PerlACE::Process ("client", "-k file://$iorfile -x"); +$CL = new PerlACE::Process ("client", "-k file://$iorfile -x" +." -ORBDebugLevel 6 -ORBLogFile client.log" +); $SV->Spawn (); @@ -30,16 +34,16 @@ if (PerlACE::waitforfile_timed ($iorfile, exit 1; } -$client = $CL->SpawnWaitKill (200); - +$client = $CL->SpawnWaitKill (120); if ($client != 0) { $time = localtime; print STDERR "ERROR: client returned $client at $time\n"; $status = 1; } -$server = $SV->WaitKill (100); +print "Client finished...\n"; +$server = $SV->WaitKill (10); if ($server != 0) { $time = localtime; print STDERR "ERROR: server returned $server at $time\n"; diff --git a/TAO/tests/Timed_Buffered_Oneways/server.cpp b/TAO/tests/Timed_Buffered_Oneways/server.cpp index 20af40930ea..19009369e80 100644 --- a/TAO/tests/Timed_Buffered_Oneways/server.cpp +++ b/TAO/tests/Timed_Buffered_Oneways/server.cpp @@ -101,7 +101,7 @@ main (int argc, char *argv[]) ACE_CATCHANY { ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, - "Exception caught:"); + "Server side exception caught:"); return -1; } ACE_ENDTRY; diff --git a/TAO/tests/Timed_Buffered_Oneways/test.idl b/TAO/tests/Timed_Buffered_Oneways/test.idl index ce512ae6fb2..3035dc7f5fd 100644 --- a/TAO/tests/Timed_Buffered_Oneways/test.idl +++ b/TAO/tests/Timed_Buffered_Oneways/test.idl @@ -6,8 +6,9 @@ interface test { typedef sequence data; oneway void method (in unsigned long request_number, + in long start_time, in data d, in unsigned long work); - - oneway void shutdown (); + void flush (); + oneway void shutdown (in long start_time); }; diff --git a/TAO/tests/Timed_Buffered_Oneways/test_i.cpp b/TAO/tests/Timed_Buffered_Oneways/test_i.cpp index 159c09067ec..54c6c393c88 100644 --- a/TAO/tests/Timed_Buffered_Oneways/test_i.cpp +++ b/TAO/tests/Timed_Buffered_Oneways/test_i.cpp @@ -2,6 +2,8 @@ #include "test_i.h" #include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_sys_time.h" +#include "ace/Time_Value.h" ACE_RCSID(Timed_Buffered_Oneways, test_i, "$Id$") @@ -12,27 +14,40 @@ test_i::test_i (CORBA::ORB_ptr orb) void test_i::method (CORBA::ULong request_number, + CORBA::Long start_time, const test::data &, CORBA::ULong work ACE_ENV_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException)) { + ACE_Time_Value start (0); + start.msec (start_time); ACE_DEBUG ((LM_DEBUG, - "server: Iteration %d @ %T\n", - request_number)); + "server:\t%d took\t%dms\n", + request_number, + (ACE_OS::gettimeofday () - start).msec ())); // Time required to process this request. is time units in // milli seconds. - ACE_Time_Value work_time (0, - work * 1000); + ACE_Time_Value work_time (0, work * 1000); ACE_OS::sleep (work_time); } void -test_i::shutdown (ACE_ENV_SINGLE_ARG_DECL) +test_i::flush (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException)) { - this->orb_->shutdown (0 - ACE_ENV_ARG_PARAMETER); +} + +void +test_i::shutdown (CORBA::Long start_time ACE_ENV_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)) +{ + ACE_Time_Value start (0); + start.msec (start_time); + ACE_DEBUG ((LM_DEBUG, "server: Shutting down... (%dms)\n", + (ACE_OS::gettimeofday() - start).msec ())); + this->orb_->shutdown (0 ACE_ENV_ARG_PARAMETER); + ACE_CHECK; } diff --git a/TAO/tests/Timed_Buffered_Oneways/test_i.h b/TAO/tests/Timed_Buffered_Oneways/test_i.h index 48b20f4057f..ef92af5e074 100644 --- a/TAO/tests/Timed_Buffered_Oneways/test_i.h +++ b/TAO/tests/Timed_Buffered_Oneways/test_i.h @@ -1,3 +1,4 @@ +// -*- C++ -*- // $Id$ // ============================================================================ @@ -29,12 +30,16 @@ public: // = The test interface methods. void method (CORBA::ULong request_number, + CORBA::Long start_time, const test::data &, CORBA::ULong work ACE_ENV_ARG_DECL_NOT_USED) ACE_THROW_SPEC ((CORBA::SystemException)); - void shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED) + void flush (ACE_ENV_SINGLE_ARG_DECL) + ACE_THROW_SPEC ((CORBA::SystemException)); + + void shutdown (CORBA::Long start_time ACE_ENV_ARG_DECL) ACE_THROW_SPEC ((CORBA::SystemException)); private: -- cgit v1.2.1