diff options
Diffstat (limited to 'TAO/tao')
-rw-r--r-- | TAO/tao/Connector_Registry.cpp | 6 | ||||
-rw-r--r-- | TAO/tao/Connector_Registry.h | 4 | ||||
-rw-r--r-- | TAO/tao/Exception.cpp | 9 | ||||
-rw-r--r-- | TAO/tao/GIOP.cpp | 24 | ||||
-rw-r--r-- | TAO/tao/GIOP.h | 3 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connector.cpp | 13 | ||||
-rw-r--r-- | TAO/tao/IIOP_Connector.h | 4 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.cpp | 47 | ||||
-rw-r--r-- | TAO/tao/IIOP_Transport.h | 6 | ||||
-rw-r--r-- | TAO/tao/Invocation.cpp | 151 | ||||
-rw-r--r-- | TAO/tao/Invocation.h | 5 | ||||
-rw-r--r-- | TAO/tao/Pluggable.cpp | 4 | ||||
-rw-r--r-- | TAO/tao/Pluggable.h | 8 | ||||
-rw-r--r-- | TAO/tao/Reply_Dispatcher.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/Transport_Mux_Strategy.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/Typecode_Constants.cpp | 2 | ||||
-rw-r--r-- | TAO/tao/UIOP_Connector.cpp | 13 | ||||
-rw-r--r-- | TAO/tao/UIOP_Connector.h | 4 | ||||
-rw-r--r-- | TAO/tao/UIOP_Transport.cpp | 45 | ||||
-rw-r--r-- | TAO/tao/UIOP_Transport.h | 6 | ||||
-rw-r--r-- | TAO/tao/Wait_Strategy.cpp | 71 | ||||
-rw-r--r-- | TAO/tao/Wait_Strategy.h | 25 | ||||
-rw-r--r-- | TAO/tao/corbafwd.h | 3 |
23 files changed, 323 insertions, 134 deletions
diff --git a/TAO/tao/Connector_Registry.cpp b/TAO/tao/Connector_Registry.cpp index af598dc18ae..454b8b9d7bf 100644 --- a/TAO/tao/Connector_Registry.cpp +++ b/TAO/tao/Connector_Registry.cpp @@ -115,16 +115,16 @@ TAO_Connector_Registry::preconnect (TAO_EndpointSet &preconnections) int TAO_Connector_Registry::connect (TAO_Profile *&profile, - TAO_Transport *&transport) + TAO_Transport *&transport, + ACE_Time_Value *max_wait_time) { - // Find the appropriate connector object TAO_Connector *connector = this->get_connector (profile->tag ()); if (connector == 0) return -1; - return connector->connect (profile, transport); + return connector->connect (profile, transport, max_wait_time); } int diff --git a/TAO/tao/Connector_Registry.h b/TAO/tao/Connector_Registry.h index 43d2eb4c9bc..12e182b8702 100644 --- a/TAO/tao/Connector_Registry.h +++ b/TAO/tao/Connector_Registry.h @@ -76,7 +76,9 @@ public: // For this list of preconnections call the connector specific // preconnect method for each preconnection. - int connect (TAO_Profile *&profile, TAO_Transport *&transport); + int connect (TAO_Profile *&profile, + TAO_Transport *&transport, + ACE_Time_Value *max_wait_time = 0); // This is where the transport protocol is selected based on some // policy. This member will call the connect member of the // TAO_Connector class which in turn will call the concrete diff --git a/TAO/tao/Exception.cpp b/TAO/tao/Exception.cpp index 87c9f611f17..8e1f7708d83 100644 --- a/TAO/tao/Exception.cpp +++ b/TAO/tao/Exception.cpp @@ -351,6 +351,15 @@ CORBA_SystemException::_info (void) const case TAO_MPROFILE_CREATION_ERROR: location = "error during MProfile creation"; break; + case TAO_TIMEOUT_CONNECT_MINOR_CODE: + location = "timeout during connect"; + break; + case TAO_TIMEOUT_SEND_MINOR_CODE: + location = "timeout during send"; + break; + case TAO_TIMEOUT_RECV_MINOR_CODE: + location = "timeout during recv"; + break; default: location = "unknown location"; } diff --git a/TAO/tao/GIOP.cpp b/TAO/tao/GIOP.cpp index 50fdc7e039a..e1468cfb851 100644 --- a/TAO/tao/GIOP.cpp +++ b/TAO/tao/GIOP.cpp @@ -67,8 +67,11 @@ ACE_RCSID(tao, GIOP, "$Id$") static const char *TAO_GIOP_Timeprobe_Description[] = { - "GIOP::send_request - start", - "GIOP::send_request - end", + "GIOP::send_message - start", + "GIOP::send_message - end", + + "GIOP::recv_message - start", + "GIOP::recv_message - end", "GIOP::read_buffer - start", "GIOP::read_buffer - end", @@ -80,8 +83,8 @@ static const char *TAO_GIOP_Timeprobe_Description[] = enum { // Timeprobe description table start key - TAO_GIOP_SEND_REQUEST_START = 100, - TAO_GIOP_SEND_REQUEST_END, + TAO_GIOP_SEND_MESSAGE_START = 100, + TAO_GIOP_SEND_MESSAGE_END, TAO_GIOP_RECV_MESSAGE_START, TAO_GIOP_RECV_MESSAGE_END, @@ -95,7 +98,7 @@ enum // Setup Timeprobes ACE_TIMEPROBE_EVENT_DESCRIPTIONS (TAO_GIOP_Timeprobe_Description, - TAO_GIOP_SEND_REQUEST_START); + TAO_GIOP_SEND_MESSAGE_START); #endif /* ACE_ENABLE_TIMEPROBES */ @@ -382,10 +385,11 @@ TAO_GIOP::write_locate_request_header (CORBA::ULong request_id, int TAO_GIOP::send_message (TAO_Transport *transport, TAO_OutputCDR &stream, - TAO_ORB_Core *orb_core) + TAO_ORB_Core *orb_core, + ACE_Time_Value *max_wait_time) { - TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_SEND_REQUEST_START); + TAO_FUNCTION_PP_TIMEPROBE (TAO_GIOP_SEND_MESSAGE_START); // Ptr to first buffer. char *buf = (char *) stream.buffer (); @@ -447,7 +451,7 @@ TAO_GIOP::send_message (TAO_Transport *transport, stream.length ()); // This guarantees to send all data (bytes) or return an error. - ssize_t n = transport->send (stream.begin ()); + ssize_t n = transport->send (stream.begin (), max_wait_time); if (n == -1) { @@ -455,7 +459,7 @@ TAO_GIOP::send_message (TAO_Transport *transport, ACE_DEBUG ((LM_DEBUG, "TAO: (%P|%t) closing conn %d after fault %p\n", transport->handle (), - "GIOP::send_request ()")); + "GIOP::send_message ()")); return -1; } @@ -465,7 +469,7 @@ TAO_GIOP::send_message (TAO_Transport *transport, { if (TAO_orbdebug) ACE_DEBUG ((LM_DEBUG, - "TAO: (%P|%t) GIOP::send_request () " + "TAO: (%P|%t) GIOP::send_message () " "EOF, closing conn %d\n", transport->handle())); return -1; diff --git a/TAO/tao/GIOP.h b/TAO/tao/GIOP.h index 42fbb6099c0..7582c0b3519 100644 --- a/TAO/tao/GIOP.h +++ b/TAO/tao/GIOP.h @@ -434,7 +434,8 @@ public: static int send_message (TAO_Transport *transport, TAO_OutputCDR &stream, - TAO_ORB_Core* orb_core); + TAO_ORB_Core* orb_core, + ACE_Time_Value *max_wait_time = 0); // Send message, returns TRUE if success, else FALSE. static void dump_msg (const char *label, diff --git a/TAO/tao/IIOP_Connector.cpp b/TAO/tao/IIOP_Connector.cpp index 5e198272ea4..16a595e06b9 100644 --- a/TAO/tao/IIOP_Connector.cpp +++ b/TAO/tao/IIOP_Connector.cpp @@ -104,7 +104,8 @@ TAO_IIOP_Connector::close (void) int TAO_IIOP_Connector::connect (TAO_Profile *profile, - TAO_Transport *& transport) + TAO_Transport *& transport, + ACE_Time_Value *max_wait_time) { if (profile->tag () != TAO_IOP_TAG_INTERNET_IOP) return -1; @@ -117,6 +118,13 @@ TAO_IIOP_Connector::connect (TAO_Profile *profile, const ACE_INET_Addr &oa = iiop_profile->object_addr (); + ACE_Synch_Options synch_options; + if (max_wait_time != 0) + { + synch_options.set (ACE_Synch_Options::USE_TIMEOUT, + *max_wait_time); + } + TAO_IIOP_Client_Connection_Handler* result; // the connect call will set the hint () stored in the Profile @@ -126,7 +134,8 @@ TAO_IIOP_Connector::connect (TAO_Profile *profile, errno = 0; if (this->base_connector_.connect (iiop_profile->hint (), result, - oa) == -1) + oa, + synch_options) == -1) { // Give users a clue to the problem. if (TAO_orbdebug) { diff --git a/TAO/tao/IIOP_Connector.h b/TAO/tao/IIOP_Connector.h index 1e5b3cee501..39265a10519 100644 --- a/TAO/tao/IIOP_Connector.h +++ b/TAO/tao/IIOP_Connector.h @@ -73,7 +73,9 @@ public: // Pluggable.h int open (TAO_ORB_Core *orb_core); int close (void); - int connect (TAO_Profile *profile, TAO_Transport *&transport); + int connect (TAO_Profile *profile, + TAO_Transport *&transport, + ACE_Time_Value *max_wait_time); int preconnect (const char *preconnections); TAO_Profile *create_profile (TAO_InputCDR& cdr); diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp index bd25a716223..71c44e58788 100644 --- a/TAO/tao/IIOP_Transport.cpp +++ b/TAO/tao/IIOP_Transport.cpp @@ -200,7 +200,8 @@ TAO_IIOP_Client_Transport::start_locate (TAO_ORB_Core *orb_core, int TAO_IIOP_Client_Transport::send_request (TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int two_way) + int two_way, + ACE_Time_Value *max_wait_time) { ACE_FUNCTION_TIMEPROBE (TAO_IIOP_CLIENT_TRANSPORT_SEND_REQUEST_START); @@ -210,7 +211,8 @@ TAO_IIOP_Client_Transport::send_request (TAO_ORB_Core *orb_core, return TAO_GIOP::send_message (this, stream, - orb_core); + orb_core, + max_wait_time); } // Return 0, when the reply is not read fully, 1 if it is read fully. @@ -293,11 +295,14 @@ TAO_IIOP_Client_Transport::handle_client_input (int /* block */) return -1; } - if (this->tms_->dispatch_reply (request_id, - reply_status, - message_state->giop_version, - reply_ctx, - message_state) != 0) + result = + this->tms_->dispatch_reply (request_id, + reply_status, + message_state->giop_version, + reply_ctx, + message_state); + + if (result == -1) { if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, @@ -308,12 +313,17 @@ TAO_IIOP_Client_Transport::handle_client_input (int /* block */) return -1; } + if (result == 0) + { + message_state->reset (); + return 0; + } + // This is a NOOP for the Exclusive request case, but it actually // destroys the stream in the muxed case. this->tms_->destroy_message_state (message_state); - // Return something to indicate the reply is received. - return 1; + return result; } int @@ -386,7 +396,8 @@ TAO_IIOP_Client_Transport::check_unexpected_data (void) // ********************************************************************* ssize_t -TAO_IIOP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *) +TAO_IIOP_Transport::send (const ACE_Message_Block *mblk, + ACE_Time_Value *max_wait_time) { TAO_FUNCTION_PP_TIMEPROBE (TAO_IIOP_TRANSPORT_SEND_START); @@ -417,9 +428,16 @@ TAO_IIOP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *) // we should set IOV_MAX to that limit. if (iovcnt == IOV_MAX) { - n = this->handler_->peer ().sendv_n ((const iovec *) iov, - iovcnt); - if (n < 1) + if (max_wait_time == 0) + n = this->handler_->peer ().sendv_n ((const iovec *) iov, + iovcnt); + else + n = ACE::writev (this->handler_->peer ().get_handle (), + (const iovec*)iov, + iovcnt, + max_wait_time); + + if (n <= 0) return n; nbytes += n; @@ -500,7 +518,8 @@ TAO_IIOP_Transport::recv (iovec *iov, int TAO_IIOP_Transport::send_request (TAO_ORB_Core * /* orb_core */, TAO_OutputCDR & /* stream */, - int /* twoway */) + int /* twoway */, + ACE_Time_Value * /* max_wait_time */) { return -1; } diff --git a/TAO/tao/IIOP_Transport.h b/TAO/tao/IIOP_Transport.h index d8780b16e67..80d51dd6abd 100644 --- a/TAO/tao/IIOP_Transport.h +++ b/TAO/tao/IIOP_Transport.h @@ -94,7 +94,8 @@ public: virtual int send_request (TAO_ORB_Core *orb_core , TAO_OutputCDR &stream, - int twoway); + int twoway, + ACE_Time_Value *max_wait_time); // Default action to be taken for send request. protected: @@ -146,7 +147,8 @@ public: int send_request (TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int twoway); + int twoway, + ACE_Time_Value *max_wait_time); // This is a bridge method for the connection handlers // <send_request> method. The connection handler is responsible for // concurrency strategies, typically using the leader-follower diff --git a/TAO/tao/Invocation.cpp b/TAO/tao/Invocation.cpp index a44048cf51c..cd43fe56264 100644 --- a/TAO/tao/Invocation.cpp +++ b/TAO/tao/Invocation.cpp @@ -79,7 +79,8 @@ TAO_GIOP_Invocation::TAO_GIOP_Invocation (TAO_Stub *stub, orb_core->to_iso8859 (), orb_core->to_unicode ()), orb_core_ (orb_core), - transport_ (0) + transport_ (0), + max_wait_time_ (0) { } @@ -141,28 +142,32 @@ TAO_GIOP_Invocation::start (CORBA::Environment &ACE_TRY_ENV) // So the invocation Object should handle policy decisions. #if defined (TAO_HAS_CORBA_MESSAGING) -#if 0 // @@ TODO implement once PP are merged in POA_Messaging::RelativeRoundtripTimeoutPolicy* timeout = this->stub_->relative_roundtrip_timeout (); - if (TAO_debug_level > 0) + // If max_wait_time is not zero then this is not the first attempt + // to send the request, the timeout value includes *all* those + // attempts. + if (this->max_wait_time_ == 0 + && timeout != 0) { - if (timeout == 0) - ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) Timeout is nil\n")); - else + TimeBase::TimeT microseconds = + timeout->relative_expiry (ACE_TRY_ENV) / 10; + ACE_CHECK; + this->max_wait_time_value_.set (microseconds / 1000000, + microseconds % 1000000); + this->max_wait_time_ = &this->max_wait_time_value_; + if (TAO_debug_level > 0) { - TimeBase::TimeT expiry = - timeout->relative_expiry (ACE_TRY_ENV); - ACE_CHECK; CORBA::ULong msecs = - ACE_static_cast(CORBA::ULong, expiry / 10000); + ACE_static_cast(CORBA::ULong, microseconds / 1000); ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) Timeout is <%u>\n", msecs)); } } -#endif /* 0 */ #endif /* TAO_HAS_CORBA_MESSAGING */ + ACE_Countdown_Time countdown (this->max_wait_time_); // Loop until a connection is established or there aren't any more // profiles to try. for (;;) @@ -175,10 +180,23 @@ TAO_GIOP_Invocation::start (CORBA::Environment &ACE_TRY_ENV) if (this->transport_ != 0) this->transport_->idle (); - int result = conn_reg->connect (this->profile_, this->transport_); + countdown.update (); + int result = conn_reg->connect (this->profile_, + this->transport_, + this->max_wait_time_); + countdown.update (); if (result == 0) break; + if (errno == ETIME) + { + ACE_THROW (CORBA::TIMEOUT ( + CORBA_SystemException::_tao_minor_code ( + TAO_TIMEOUT_CONNECT_MINOR_CODE, + errno), + CORBA::COMPLETED_NO)); + } + // Try moving to the next profile and starting over, if that // fails then we must raise the TRANSIENT exception. if (this->stub_->next_profile_retry () == 0) @@ -192,6 +210,8 @@ TAO_GIOP_Invocation::start (CORBA::Environment &ACE_TRY_ENV) // Obtain unique request id from the RMS. this->request_id_ = this->transport_->request_id (); + countdown.update (); + ACE_TIMEPROBE (TAO_GIOP_INVOCATION_START_REQUEST_HDR); } @@ -201,6 +221,7 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip, CORBA::Environment &ACE_TRY_ENV) ACE_THROW_SPEC ((CORBA::SystemException)) { + ACE_Countdown_Time countdown (this->max_wait_time_); if (this->transport_ == 0) ACE_THROW_RETURN (CORBA::INTERNAL (), @@ -212,10 +233,13 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip, // Even for oneways: with AMI it is possible to wait for a // response (empty) for oneways, just to make sure that they // arrive, there are policies to control that. + countdown.update (); int result = this->transport_->send_request (this->orb_core_, this->out_stream_, - is_roundtrip); + is_roundtrip, + this->max_wait_time_); + countdown.update (); // // @@ highly desirable to know whether we wrote _any_ data; if @@ -233,6 +257,15 @@ TAO_GIOP_Invocation::invoke (CORBA::Boolean is_roundtrip, if (result == -1) { + if (errno == ETIME) + { + ACE_THROW_RETURN (CORBA::TIMEOUT ( + CORBA_SystemException::_tao_minor_code ( + TAO_TIMEOUT_SEND_MINOR_CODE, + errno), + CORBA::COMPLETED_NO), + TAO_INVOKE_EXCEPTION); + } this->transport_->close_connection (); this->transport_ = 0; @@ -412,7 +445,8 @@ TAO_GIOP_Twoway_Invocation::invoke (CORBA::ExceptionList &exceptions, // failed, but the connection seems to be still // valid! // this->transport_->close_connection (); - ACE_THROW_RETURN (CORBA::MARSHAL (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_YES), + ACE_THROW_RETURN (CORBA::MARSHAL (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_YES), TAO_INVOKE_EXCEPTION); } @@ -442,7 +476,8 @@ TAO_GIOP_Twoway_Invocation::invoke (CORBA::ExceptionList &exceptions, CORBA_Exception *exception; ACE_NEW_THROW_EX (exception, CORBA_UnknownUserException (any), - CORBA::NO_MEMORY (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_YES)); + CORBA::NO_MEMORY (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_YES)); ACE_CHECK_RETURN (TAO_INVOKE_EXCEPTION); // @@ Think about a better way to raise the exception here, @@ -457,7 +492,8 @@ TAO_GIOP_Twoway_Invocation::invoke (CORBA::ExceptionList &exceptions, // @@ It would seem like if the remote exception is a // UserException we can assume that the request was // completed. - ACE_THROW_RETURN (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_YES), + ACE_THROW_RETURN (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_YES), TAO_INVOKE_EXCEPTION); } @@ -498,7 +534,8 @@ TAO_GIOP_Twoway_Invocation::invoke (TAO_Exception_Data *excepts, // failed, but the connection seems to be still // valid! // this->transport_->close_connection (); - ACE_THROW_RETURN (CORBA::MARSHAL (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_YES), + ACE_THROW_RETURN (CORBA::MARSHAL (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_YES), TAO_INVOKE_EXCEPTION); } @@ -517,7 +554,8 @@ TAO_GIOP_Twoway_Invocation::invoke (TAO_Exception_Data *excepts, CORBA::Exception_ptr exception = excepts[i].alloc (); if (exception == 0) - ACE_THROW_RETURN (CORBA::NO_MEMORY (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_YES), + ACE_THROW_RETURN (CORBA::NO_MEMORY (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_YES), TAO_INVOKE_EXCEPTION); this->inp_stream ().decode (exception->_type (), @@ -539,7 +577,8 @@ TAO_GIOP_Twoway_Invocation::invoke (TAO_Exception_Data *excepts, // If we couldn't find the right exception, report it as // CORBA::UNKNOWN. - ACE_THROW_RETURN (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_YES), + ACE_THROW_RETURN (CORBA::UNKNOWN (TAO_DEFAULT_MINOR_CODE, + CORBA::COMPLETED_YES), TAO_INVOKE_EXCEPTION); } @@ -576,21 +615,6 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) // there is only one client thread that ever uses this connection, // so most response messages are illegal. // - // THREADING NOTE: to make more efficient use of connection - // resources, we'd multiplex I/O on connections. For example, one - // thread would write its GIOP::Request (or GIOP::LocateRequest etc) - // message and block for the response, then another would do the - // same thing. When a response came back, it would be handed to the - // thread which requested it. - // - // Currently the connection manager doesn't support such fine - // grained connection locking, and also this server implementation - // wouldn't take advantage of that potential concurrency in requests - // either. There are often performance losses coming from - // fine-grained locks being used inappropriately; there's some - // evidence that locking at the level of requests loses on at least - // some platforms. - // // @@ In all MT environments, there's a cancellation point lurking // here; need to investigate. Client threads would frequently be // canceled sometime during recv_request ... the correct action to @@ -612,17 +636,46 @@ TAO_GIOP_Twoway_Invocation::invoke_i (CORBA::Environment &ACE_TRY_ENV) // Wait for the reply. - int reply_error = this->transport_->wait_for_reply (); + if (TAO_debug_level > 0 && this->max_wait_time_ != 0) + { + CORBA::ULong msecs = + this->max_wait_time_->msec (); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) Timeout on recv is <%u>\n", + msecs)); + } + int reply_error = + this->transport_->wait_for_reply (this->max_wait_time_); + if (TAO_debug_level > 0 && this->max_wait_time_ != 0) + { + CORBA::ULong msecs = + this->max_wait_time_->msec (); + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) Timeout after recv is <%u> status <%d>\n", + msecs, reply_error)); + } if (reply_error == -1) { + if (errno == ETIME) + { + // Just a timeout, don't close the connection or + // anything... + ACE_THROW_RETURN (CORBA::TIMEOUT ( + CORBA_SystemException::_tao_minor_code ( + TAO_TIMEOUT_SEND_MINOR_CODE, + errno), + CORBA::COMPLETED_NO), + TAO_INVOKE_EXCEPTION); + } + this->close_connection (); ACE_THROW_RETURN (CORBA::COMM_FAILURE ( - CORBA_SystemException::_tao_minor_code ( - TAO_INVOCATION_RECV_REQUEST_MINOR_CODE, - errno), - CORBA::COMPLETED_MAYBE), - TAO_INVOKE_EXCEPTION); + CORBA_SystemException::_tao_minor_code ( + TAO_INVOCATION_RECV_REQUEST_MINOR_CODE, + errno), + CORBA::COMPLETED_MAYBE), + TAO_INVOKE_EXCEPTION); } // @@ Alex: the old version of this had some error handling code, @@ -772,7 +825,8 @@ TAO_GIOP_Locate_Request_Invocation::invoke (CORBA::Environment &ACE_TRY_ENV) int result = this->transport_->send_request (this->orb_core_, this->out_stream_, - 1); + 1, + this->max_wait_time_); if (result == -1) @@ -801,10 +855,23 @@ TAO_GIOP_Locate_Request_Invocation::invoke (CORBA::Environment &ACE_TRY_ENV) // Wait for the reply. - int reply_error = this->transport_->wait_for_reply (); + int reply_error = + this->transport_->wait_for_reply (this->max_wait_time_); if (reply_error == -1) { + if (errno == ETIME) + { + // Just a timeout, don't close the connection or + // anything... + ACE_THROW_RETURN (CORBA::TIMEOUT ( + CORBA_SystemException::_tao_minor_code ( + TAO_TIMEOUT_SEND_MINOR_CODE, + errno), + CORBA::COMPLETED_NO), + TAO_INVOKE_EXCEPTION); + } + this->close_connection (); ACE_THROW_RETURN (CORBA::COMM_FAILURE (TAO_DEFAULT_MINOR_CODE, CORBA::COMPLETED_MAYBE), diff --git a/TAO/tao/Invocation.h b/TAO/tao/Invocation.h index b28d47f5650..90816bd6fc0 100644 --- a/TAO/tao/Invocation.h +++ b/TAO/tao/Invocation.h @@ -140,6 +140,11 @@ protected: TAO_Profile *profile_; // This invocation is using this transport, may change... + + ACE_Time_Value max_wait_time_value_; + ACE_Time_Value *max_wait_time_; + // The timeout remaining for this request, it is initialized in + // start() and updated as required. }; class TAO_Export TAO_GIOP_Twoway_Invocation : public TAO_GIOP_Invocation diff --git a/TAO/tao/Pluggable.cpp b/TAO/tao/Pluggable.cpp index 5800c7b3694..025aa870587 100644 --- a/TAO/tao/Pluggable.cpp +++ b/TAO/tao/Pluggable.cpp @@ -232,9 +232,9 @@ TAO_Transport::register_handler (void) } int -TAO_Transport::wait_for_reply (void) +TAO_Transport::wait_for_reply (ACE_Time_Value *max_wait_time) { - return this->ws_->wait (); + return this->ws_->wait (max_wait_time); } void diff --git a/TAO/tao/Pluggable.h b/TAO/tao/Pluggable.h index 946bc0c635e..a40ea1b0449 100644 --- a/TAO/tao/Pluggable.h +++ b/TAO/tao/Pluggable.h @@ -137,7 +137,8 @@ public: virtual int send_request (TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int twoway) = 0; + int twoway, + ACE_Time_Value *max_time_wait) = 0; // Default action to be taken for send request. // = Get and set methods for the ORB Core. @@ -166,7 +167,7 @@ public: TAO_Reply_Dispatcher *rd); // Bind the reply dispatcher with the TMS object. - virtual int wait_for_reply (void); + virtual int wait_for_reply (ACE_Time_Value *max_wait_time); // Wait for the reply depending on the strategy. virtual int handle_client_input (int block = 0); @@ -413,7 +414,8 @@ public: // Shutdown Connector bridge and concreate Connector. virtual int connect (TAO_Profile *profile, - TAO_Transport *&) = 0; + TAO_Transport *&, + ACE_Time_Value *max_wait_time) = 0; // To support pluggable we need to abstract away the connect() // method so it can be called from the GIOP code independant of the // actual transport protocol in use. diff --git a/TAO/tao/Reply_Dispatcher.cpp b/TAO/tao/Reply_Dispatcher.cpp index 9ea95c9c172..e819776faab 100644 --- a/TAO/tao/Reply_Dispatcher.cpp +++ b/TAO/tao/Reply_Dispatcher.cpp @@ -63,7 +63,7 @@ TAO_Synch_Reply_Dispatcher::dispatch_reply (CORBA::ULong reply_status, TAO_GIOP_ServiceContext* context_list = reply_ctx.get_buffer (1); this->reply_ctx_.replace (max, len, context_list, 1); - return 0; + return 1; } TAO_GIOP_Message_State * diff --git a/TAO/tao/Transport_Mux_Strategy.cpp b/TAO/tao/Transport_Mux_Strategy.cpp index f0bc8eeb9a9..647cef53e67 100644 --- a/TAO/tao/Transport_Mux_Strategy.cpp +++ b/TAO/tao/Transport_Mux_Strategy.cpp @@ -119,7 +119,7 @@ TAO_Exclusive_TMS::dispatch_reply (CORBA::ULong request_id, ACE_DEBUG ((LM_DEBUG, "TAO_Exclusive_TMS::dispatch_reply - <%d != %d>\n", this->request_id_, request_id)); - return -1; + return 0; } TAO_Reply_Dispatcher *rd = this->rd_; diff --git a/TAO/tao/Typecode_Constants.cpp b/TAO/tao/Typecode_Constants.cpp index a5613e25137..25dacc3b002 100644 --- a/TAO/tao/Typecode_Constants.cpp +++ b/TAO/tao/Typecode_Constants.cpp @@ -1897,6 +1897,8 @@ TAO_TypeCodes::fini (void) CORBA::release (CORBA::_tc_RepositoryId); + CORBA::release (CORBA::ORB::_tc_InvalidName); + CORBA::release (CORBA::_tc_ORBid); #if !defined (TAO_HAS_MINIMUM_CORBA) diff --git a/TAO/tao/UIOP_Connector.cpp b/TAO/tao/UIOP_Connector.cpp index f16c60f99f5..0d64982692a 100644 --- a/TAO/tao/UIOP_Connector.cpp +++ b/TAO/tao/UIOP_Connector.cpp @@ -109,7 +109,8 @@ TAO_UIOP_Connector::close (void) int TAO_UIOP_Connector::connect (TAO_Profile *profile, - TAO_Transport *& transport) + TAO_Transport *& transport, + ACE_Time_Value *max_wait_time) { if (profile->tag () != TAO_IOP_TAG_UNIX_IOP) return -1; @@ -122,6 +123,13 @@ TAO_UIOP_Connector::connect (TAO_Profile *profile, const ACE_UNIX_Addr &oa = uiop_profile->object_addr (); + ACE_Synch_Options synch_options; + if (max_wait_time != 0) + { + synch_options.set (ACE_Synch_Options::USE_TIMEOUT, + *max_wait_time); + } + TAO_UIOP_Client_Connection_Handler* result; // the connect call will set the hint () stored in the Profile @@ -130,7 +138,8 @@ TAO_UIOP_Connector::connect (TAO_Profile *profile, // affected. if (this->base_connector_.connect (uiop_profile->hint (), result, - oa) == -1) + oa, + synch_options) == -1) { // Give users a clue to the problem. if (TAO_orbdebug) { diff --git a/TAO/tao/UIOP_Connector.h b/TAO/tao/UIOP_Connector.h index f54e4edb020..5036124f74e 100644 --- a/TAO/tao/UIOP_Connector.h +++ b/TAO/tao/UIOP_Connector.h @@ -77,7 +77,9 @@ public: // Pluggable.h int open (TAO_ORB_Core *orb_core); int close (void); - int connect (TAO_Profile *profile, TAO_Transport *&transport); + int connect (TAO_Profile *profile, + TAO_Transport *&transport, + ACE_Time_Value *max_wait_time); int preconnect (const char *preconnections); TAO_Profile *create_profile (TAO_InputCDR& cdr); diff --git a/TAO/tao/UIOP_Transport.cpp b/TAO/tao/UIOP_Transport.cpp index 519416cdd99..a751392b2cd 100644 --- a/TAO/tao/UIOP_Transport.cpp +++ b/TAO/tao/UIOP_Transport.cpp @@ -200,7 +200,8 @@ TAO_UIOP_Client_Transport::start_locate (TAO_ORB_Core *orb_core, int TAO_UIOP_Client_Transport::send_request (TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int two_way) + int two_way, + ACE_Time_Value *max_wait_time) { ACE_FUNCTION_TIMEPROBE (TAO_UIOP_CLIENT_TRANSPORT_SEND_REQUEST_START); @@ -210,7 +211,8 @@ TAO_UIOP_Client_Transport::send_request (TAO_ORB_Core *orb_core, return TAO_GIOP::send_message (this, stream, - orb_core); + orb_core, + max_wait_time); } // Return 0, when the reply is not read fully, 1 if it is read fully. @@ -293,11 +295,14 @@ TAO_UIOP_Client_Transport::handle_client_input (int /* block */) return -1; } - if (this->tms_->dispatch_reply (request_id, - reply_status, - message_state->giop_version, - reply_ctx, - message_state) != 0) + result = + this->tms_->dispatch_reply (request_id, + reply_status, + message_state->giop_version, + reply_ctx, + message_state); + + if (result == -1) { if (TAO_debug_level > 0) ACE_ERROR ((LM_ERROR, @@ -308,12 +313,18 @@ TAO_UIOP_Client_Transport::handle_client_input (int /* block */) return -1; } + if (result == 0) + { + message_state->reset (); + return 0; + } + // This is a NOOP for the Exclusive request case, but it actually // destroys the stream in the muxed case. this->tms_->destroy_message_state (message_state); // Return something to indicate the reply is received. - return 1; + return result; } int @@ -386,7 +397,8 @@ TAO_UIOP_Client_Transport::check_unexpected_data (void) // **************************************************************** ssize_t -TAO_UIOP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *) +TAO_UIOP_Transport::send (const ACE_Message_Block *mblk, + ACE_Time_Value *max_time_wait) { TAO_FUNCTION_PP_TIMEPROBE (TAO_UIOP_TRANSPORT_SEND_START); @@ -417,9 +429,15 @@ TAO_UIOP_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *) // we should set IOV_MAX to that limit. if (iovcnt == IOV_MAX) { - n = this->handler_->peer ().sendv_n ((const iovec *) iov, - iovcnt); - if (n < 1) + if (max_time_wait == 0) + n = this->handler_->peer ().sendv_n ((const iovec *) iov, + iovcnt); + else + n = ACE::writev (this->handler_->peer ().get_handle (), + (const iovec*) iov, + iovcnt, + max_time_wait); + if (n <= 0) return n; nbytes += n; @@ -500,7 +518,8 @@ TAO_UIOP_Transport::recv (iovec *iov, int TAO_UIOP_Transport::send_request (TAO_ORB_Core * /* orb_core */, TAO_OutputCDR & /* stream */, - int /* twoway */) + int /* twoway */, + ACE_Time_Value * /* max_wait_time */) { return -1; } diff --git a/TAO/tao/UIOP_Transport.h b/TAO/tao/UIOP_Transport.h index 76d1a76d830..d94ef8ba884 100644 --- a/TAO/tao/UIOP_Transport.h +++ b/TAO/tao/UIOP_Transport.h @@ -97,7 +97,8 @@ public: virtual int send_request (TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int twoway); + int twoway, + ACE_Time_Value *max_wait_time); // Default action to be taken for send request. protected: @@ -149,7 +150,8 @@ public: int send_request (TAO_ORB_Core *orb_core, TAO_OutputCDR &stream, - int twoway); + int twoway, + ACE_Time_Value *max_wait_time); // This is a bridge method for the connection handlers // <send_request> method. The connection handler is responsible for // concurrency strategies, typically using the leader-follower diff --git a/TAO/tao/Wait_Strategy.cpp b/TAO/tao/Wait_Strategy.cpp index 5d07a29c36f..8f9288b9877 100644 --- a/TAO/tao/Wait_Strategy.cpp +++ b/TAO/tao/Wait_Strategy.cpp @@ -40,7 +40,7 @@ TAO_Wait_On_Reactor::~TAO_Wait_On_Reactor (void) } int -TAO_Wait_On_Reactor::wait (void) +TAO_Wait_On_Reactor::wait (ACE_Time_Value *max_wait_time) { // Reactor does not change inside the loop. ACE_Reactor* reactor = @@ -58,15 +58,32 @@ TAO_Wait_On_Reactor::wait (void) int result = 0; this->reply_received_ = 0; - while (this->reply_received_ == 0 && result >= 0) + while (this->reply_received_ == 0 && result > 0) { - result = reactor->handle_events (/* timeout */); + result = reactor->handle_events (max_wait_time); } if (result == -1 || this->reply_received_ == -1) return -1; - return 0; + // Return an error if there was a problem receiving the reply... + if (max_wait_time != 0) + { + if (this->reply_received_ != 1 + && *max_wait_time == ACE_Time_Value::zero) + { + result = -1; + errno = ETIME; + } + } + else + { + result = 0; + if (this->reply_received_ == -1) + result = -1; + } + + return result; } int @@ -173,7 +190,7 @@ TAO_Wait_On_Leader_Follower::sending_request (TAO_ORB_Core *orb_core, } int -TAO_Wait_On_Leader_Follower::wait (void) +TAO_Wait_On_Leader_Follower::wait (ACE_Time_Value *max_wait_time) { // Cache the ORB core, it won't change and is used multiple times // below: @@ -189,6 +206,8 @@ TAO_Wait_On_Leader_Follower::wait (void) leader_follower.set_client_thread (); + ACE_Countdown_Time countdown (max_wait_time); + // Check if there is a leader, but the leader is not us if (leader_follower.leader_available () && !leader_follower.is_leader_thread ()) @@ -217,10 +236,22 @@ TAO_Wait_On_Leader_Follower::wait (void) while (!this->reply_received_ && leader_follower.leader_available ()) { - if (cond == 0 || cond->wait () == -1) - return -1; + if (max_wait_time == 0) + { + if (cond == 0 || cond->wait () == -1) + return -1; + } + else + { + countdown.update (); + ACE_Time_Value tv = ACE_OS::gettimeofday (); + tv += *max_wait_time; + if (cond == 0 || cond->wait (&tv) == -1) + return -1; + } } + countdown.update (); if (leader_follower.remove_follower (cond) == -1) ACE_ERROR ((LM_ERROR, "TAO (%P|%t) TAO_Wait_On_Leader_Follower::wait - " @@ -266,7 +297,7 @@ TAO_Wait_On_Leader_Follower::wait (void) // This might increase the refcount of the leader. leader_follower.set_leader_thread (); - int result = 0; + int result = 1; { ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_SYNCH_MUTEX>, rev_mon, @@ -281,8 +312,8 @@ TAO_Wait_On_Leader_Follower::wait (void) //ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - wait (leader) on <%x>\n", //this->transport_)); - while (result >= 0 && this->reply_received_ == 0) - result = orb_core->reactor ()->handle_events (); + while (result > 0 && this->reply_received_ == 0) + result = orb_core->reactor ()->handle_events (max_wait_time); //ACE_DEBUG ((LM_DEBUG, "TAO (%P|%t) - done (leader) on <%x>\n", //this->transport_)); @@ -311,10 +342,22 @@ TAO_Wait_On_Leader_Follower::wait (void) -1); // Return an error if there was a problem receiving the reply... - result = 0; - if (this->reply_received_ == -1) + if (max_wait_time != 0) + { + if (this->reply_received_ != 1 + && *max_wait_time == ACE_Time_Value::zero) + { + result = -1; + errno = ETIME; + } + } + else { - result = -1; + result = 0; + if (this->reply_received_ == -1) + { + result = -1; + } } // Make us reusable @@ -449,7 +492,7 @@ TAO_Wait_On_Read::~TAO_Wait_On_Read (void) // Wait on the read operation. int -TAO_Wait_On_Read::wait (void) +TAO_Wait_On_Read::wait (ACE_Time_Value * /* max_wait_time */) { int received_reply = 0; while (received_reply == 0) diff --git a/TAO/tao/Wait_Strategy.h b/TAO/tao/Wait_Strategy.h index 83641b195d8..b436903f605 100644 --- a/TAO/tao/Wait_Strategy.h +++ b/TAO/tao/Wait_Strategy.h @@ -47,7 +47,7 @@ public: // variables because the reply may arrive *before* the user calls // wait. - virtual int wait (void) = 0; + virtual int wait (ACE_Time_Value *max_wait_time) = 0; // Base class virtual method. virtual int handle_input (void) = 0; @@ -83,15 +83,11 @@ public: virtual ~TAO_Wait_On_Reactor (void); // Destructor. - virtual int wait (void); - // Do the event loop of the Reactor. + // = Documented in TAO_Wait_Strategy. + virtual int wait (ACE_Time_Value *max_wait_time); virtual int handle_input (void); - // Handle the input. Delegate this job to Transport object. Before - // that suspend the handler in the Reactor. - virtual int register_handler (void); - // Register the handler with the Reactor. private: int reply_received_; @@ -121,11 +117,8 @@ public: virtual int sending_request (TAO_ORB_Core *orb_core, int two_way); - - virtual int wait (void); - + virtual int wait (ACE_Time_Value *max_wait_time); virtual int handle_input (void); - virtual int register_handler (void); protected: @@ -157,9 +150,8 @@ class TAO_Export TAO_Wait_On_Read : public TAO_Wait_Strategy { // = TITLE // - // Wait on receiving the reply. - // // = DESCRIPTION + // Simply block on read() to wait for the reply. // public: @@ -169,14 +161,9 @@ public: virtual ~TAO_Wait_On_Read (void); // Destructor. - virtual int wait (void); - // Wait on the read operation. - + virtual int wait (ACE_Time_Value *max_wait_time); virtual int handle_input (void); - // Handle the input. Delegate this job to Transport object. - virtual int register_handler (void); - // No-op. Return 0. }; #endif /* TAO_WAIT_STRATEGY_H */ diff --git a/TAO/tao/corbafwd.h b/TAO/tao/corbafwd.h index 92da2946800..29d3fb43235 100644 --- a/TAO/tao/corbafwd.h +++ b/TAO/tao/corbafwd.h @@ -969,6 +969,9 @@ TAO_NAMESPACE CORBA #define TAO_CONNECTOR_REGISTRY_NO_USABLE_PROTOCOL (0x08U << 4) #define TAO_NULL_POINTER_MINOR_CODE (0x09U << 4) #define TAO_MPROFILE_CREATION_ERROR (0x0AU << 4) +#define TAO_TIMEOUT_CONNECT_MINOR_CODE (0x0BU << 4) +#define TAO_TIMEOUT_SEND_MINOR_CODE (0x0CU << 4) +#define TAO_TIMEOUT_RECV_MINOR_CODE (0x0DU << 4) // errno encoding: bottom 4 bits. #define TAO_UNSPECIFIED_MINOR_CODE 0x0U |