From 796fa166f828decd8be9e24c503f3c4799cca23f Mon Sep 17 00:00:00 2001 From: irfan Date: Wed, 25 Jun 2003 01:58:29 +0000 Subject: ChangeLogTag: Tue Jun 24 21:46:13 2003 Irfan Pyarali --- TAO/tao/Strategies/DIOP_Connection_Handler.cpp | 49 ++++- TAO/tao/Strategies/DIOP_Connection_Handler.h | 6 + TAO/tao/Strategies/DIOP_Connector.cpp | 6 +- TAO/tao/Strategies/DIOP_Transport.cpp | 199 ++----------------- TAO/tao/Strategies/DIOP_Transport.h | 21 +- TAO/tao/Strategies/SCIOP_Connection_Handler.cpp | 49 ++++- TAO/tao/Strategies/SCIOP_Connection_Handler.h | 11 +- TAO/tao/Strategies/SCIOP_Connector.cpp | 241 +++++++++++++---------- TAO/tao/Strategies/SCIOP_Transport.cpp | 110 ++--------- TAO/tao/Strategies/SCIOP_Transport.h | 22 +-- TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp | 45 ++++- TAO/tao/Strategies/SHMIOP_Connection_Handler.h | 6 + TAO/tao/Strategies/SHMIOP_Connector.cpp | 100 ++++++---- TAO/tao/Strategies/SHMIOP_Transport.cpp | 54 +---- TAO/tao/Strategies/SHMIOP_Transport.h | 15 +- TAO/tao/Strategies/UIOP_Connection_Handler.cpp | 51 ++++- TAO/tao/Strategies/UIOP_Connector.cpp | 117 +++++++---- TAO/tao/Strategies/UIOP_Transport.cpp | 55 +----- TAO/tao/Strategies/UIOP_Transport.h | 15 +- 19 files changed, 551 insertions(+), 621 deletions(-) diff --git a/TAO/tao/Strategies/DIOP_Connection_Handler.cpp b/TAO/tao/Strategies/DIOP_Connection_Handler.cpp index 21be6692f27..c195a3570be 100644 --- a/TAO/tao/Strategies/DIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/DIOP_Connection_Handler.cpp @@ -52,12 +52,12 @@ TAO_DIOP_Connection_Handler::TAO_DIOP_Connection_Handler (TAO_ORB_Core *orb_core // store this pointer (indirectly increment ref count) this->transport (specific_transport); - TAO_Transport::release (specific_transport); } TAO_DIOP_Connection_Handler::~TAO_DIOP_Connection_Handler (void) { + delete this->transport (); this->udp_socket_.close (); } @@ -166,20 +166,57 @@ TAO_DIOP_Connection_Handler::close_connection (void) int TAO_DIOP_Connection_Handler::handle_input (ACE_HANDLE h) { - return this->handle_input_eh (h, this); + int result = + this->handle_input_eh (h, this); + + if (result == -1) + { + this->close_connection (); + return 0; + } + + return result; } int TAO_DIOP_Connection_Handler::handle_output (ACE_HANDLE handle) { - return this->handle_output_eh (handle, this); + int result = + this->handle_output_eh (handle, this); + + if (result == -1) + { + this->close_connection (); + return 0; + } + + return result; } int -TAO_DIOP_Connection_Handler::handle_close (ACE_HANDLE handle, - ACE_Reactor_Mask rm) +TAO_DIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, + const void *) { - return this->handle_close_eh (handle, rm, this); + // We don't use this upcall from the Reactor. However, we should + // override this since the base class returns -1 which will result + // in handle_close() getting called. + return 0; +} + +int +TAO_DIOP_Connection_Handler::handle_close (ACE_HANDLE, + ACE_Reactor_Mask) +{ + ACE_ASSERT (0); + return 0; +} + +int +TAO_DIOP_Connection_Handler::close (u_long) +{ + this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); + this->transport ()->remove_reference (); + return 0; } int diff --git a/TAO/tao/Strategies/DIOP_Connection_Handler.h b/TAO/tao/Strategies/DIOP_Connection_Handler.h index 4fbf5155a7a..e234b8e74b7 100644 --- a/TAO/tao/Strategies/DIOP_Connection_Handler.h +++ b/TAO/tao/Strategies/DIOP_Connection_Handler.h @@ -102,6 +102,10 @@ public: virtual int open_handler (void *); //@} + /// Close called by the Acceptor or Connector when connection + /// establishment fails. + int close (u_long = 0); + //@{ /** @name Event Handler overloads */ @@ -110,6 +114,8 @@ public: virtual int handle_input (ACE_HANDLE); virtual int handle_output (ACE_HANDLE); virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *act = 0); //@} /// Add ourselves to Cache. diff --git a/TAO/tao/Strategies/DIOP_Connector.cpp b/TAO/tao/Strategies/DIOP_Connector.cpp index c177e55406a..6eebb705664 100644 --- a/TAO/tao/Strategies/DIOP_Connector.cpp +++ b/TAO/tao/Strategies/DIOP_Connector.cpp @@ -72,7 +72,7 @@ TAO_DIOP_Connector::close (void) while (!iter.done ()) { - (*iter).int_id_->decr_refcount(); + (*iter).int_id_->remove_reference (); iter++; } @@ -156,8 +156,8 @@ TAO_DIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, } // @@ Michael: We do not use regular connection management. - - transport = TAO_Transport::_duplicate (svc_handler->transport ()); + svc_handler->add_reference (); + transport = svc_handler->transport (); return 0; } diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp index ea03fd05281..d4edd4c8c80 100644 --- a/TAO/tao/Strategies/DIOP_Transport.cpp +++ b/TAO/tao/Strategies/DIOP_Transport.cpp @@ -36,13 +36,6 @@ TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler, , connection_handler_ (handler) , messaging_object_ (0) { - if (connection_handler_ != 0) - { - // REFCNT: Matches one of - // TAO_Transport::connection_handler_close() or - // TAO_Transport::close_connection_shared. - this->connection_handler_->incr_refcount(); - } // @@ Michael: Set the input CDR size to ACE_MAX_DGRAM_SIZE so that // we read the whole UDP packet on a single read. if (flag) @@ -63,7 +56,6 @@ TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler, TAO_DIOP_Transport::~TAO_DIOP_Transport (void) { - ACE_ASSERT(this->connection_handler_ == 0); delete this->messaging_object_; } @@ -86,9 +78,9 @@ TAO_DIOP_Transport::messaging_object (void) } ssize_t -TAO_DIOP_Transport::send_i (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *) +TAO_DIOP_Transport::send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *) { const ACE_INET_Addr &addr = this->connection_handler_->addr (); @@ -109,9 +101,9 @@ TAO_DIOP_Transport::send_i (iovec *iov, int iovcnt, } ssize_t -TAO_DIOP_Transport::recv_i (char *buf, - size_t len, - const ACE_Time_Value * /* max_wait_time */) +TAO_DIOP_Transport::recv (char *buf, + size_t len, + const ACE_Time_Value * /* max_wait_time */) { ACE_INET_Addr from_addr; @@ -136,7 +128,7 @@ TAO_DIOP_Transport::recv_i (char *buf, ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - %p \n"), ACE_TEXT ("TAO - read message failure ") - ACE_TEXT ("recv_i () \n"))); + ACE_TEXT ("recv () \n"))); } // Error handling @@ -161,9 +153,9 @@ TAO_DIOP_Transport::recv_i (char *buf, } int -TAO_DIOP_Transport::handle_input_i (TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time, - int /*block*/) +TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time, + int /*block*/) { // If there are no messages then we can go ahead to read from the // handle for further reading.. @@ -236,7 +228,7 @@ TAO_DIOP_Transport::handle_input_i (TAO_Resume_Handle &rh, int -TAO_DIOP_Transport::register_handler_i (void) +TAO_DIOP_Transport::register_handler (void) { // @@ Michael: // @@ -318,173 +310,4 @@ TAO_DIOP_Transport::messaging_init (CORBA::Octet major, return 1; } -// @@ Frank: Hopefully DIOP doesn't need this -/* -int -TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) -{ - CORBA::Boolean byte_order; - if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0) - return -1; - - cdr.reset_byte_order (ACE_static_cast(int,byte_order)); - - DIOP::ListenPointList listen_list; - if ((cdr >> listen_list) == 0) - return -1; - - // As we have received a bidirectional information, set the flag to - // 1 - this->bidirectional_flag (1); - return this->connection_handler_->process_listen_point_list (listen_list); -} -*/ - - - -// @@ Frank: Hopefully DIOP doesn't need this -/* -void -TAO_DIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) -{ - - // Get a handle on to the acceptor registry - TAO_Acceptor_Registry * ar = - this->orb_core ()->acceptor_registry (); - - - // Get the first acceptor in the registry - TAO_AcceptorSetIterator acceptor = ar->begin (); - - DIOP::ListenPointList listen_point_list; - - for (; - acceptor != ar->end (); - acceptor++) - { - // Check whether it is a DIOP acceptor - if ((*acceptor)->tag () == TAO_TAG_UDP_PROFILE) - { - this->get_listen_point (listen_point_list, - *acceptor); - } - } - - // We have the ListenPointList at this point. Create a output CDR - // stream at this point - TAO_OutputCDR cdr; - - // Marshall the information into the stream - if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)== 0) - || (cdr << listen_point_list) == 0) - return; - - // Add this info in to the svc_list - opdetails.service_context ().set_context (IOP::BI_DIR_DIOP, - cdr); - - return; -} - - -int -TAO_DIOP_Transport::get_listen_point ( - DIOP::ListenPointList &listen_point_list, - TAO_Acceptor *acceptor) -{ - TAO_DIOP_Acceptor *iiop_acceptor = - ACE_dynamic_cast (TAO_DIOP_Acceptor *, - acceptor ); - - // Get the array of endpoints serviced by - const ACE_INET_Addr *endpoint_addr = - iiop_acceptor->endpoints (); - - // Get the count - size_t count = - iiop_acceptor->endpoint_count (); - - // Get the local address of the connection - ACE_INET_Addr local_addr; - - if (this->connection_handler_->peer ().get_local_addr (local_addr) - == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%P|%t) Could not resolve local host") - ACE_TEXT (" address in set_bidir_context_info () \n")), - -1); - } - - - // Note: Looks like there is no point in sending the list of - // endpoints on interfaces on which this connection has not - // been established. If this is wrong, please correct me. - char *local_interface = 0; - - // Get the hostname for the local address - if (iiop_acceptor->hostname (this->orb_core_, - local_addr, - local_interface) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%P|%t) Could not resolve local host") - ACE_TEXT (" name \n")), - -1); - } - - ACE_INET_Addr *tmp_addr = ACE_const_cast (ACE_INET_Addr *, - endpoint_addr); - - for (size_t index = 0; - index <= count; - index++) - { - // Get the listen point on that acceptor if it has the same - // interface on which this connection is established - char *acceptor_interface = 0; - - if (iiop_acceptor->hostname (this->orb_core_, - tmp_addr[index], - acceptor_interface) == -1) - continue; - - // @@ This is very bad for performance, but it is a one time - // affair - if (ACE_OS::strcmp (local_interface, - acceptor_interface) == 0) - { - // We have the connection and the acceptor endpoint on the - // same interface - DIOP::ListenPoint point; - point.host = CORBA::string_dup (local_interface); - point.port = endpoint_addr[index].get_port_number (); - - // Get the count of the number of elements - CORBA::ULong len = listen_point_list.length (); - - // Increase the length by 1 - listen_point_list.length (len + 1); - - // Add the new length to the list - listen_point_list[len] = point; - } - - // @@ This is bad.... - CORBA::string_free (acceptor_interface); - } - - CORBA::string_free (local_interface); - return 1; -} -*/ - -TAO_Connection_Handler * -TAO_DIOP_Transport::invalidate_event_handler_i (void) -{ - TAO_Connection_Handler * eh = this->connection_handler_; - this->connection_handler_ = 0; - return eh; -} - #endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */ diff --git a/TAO/tao/Strategies/DIOP_Transport.h b/TAO/tao/Strategies/DIOP_Transport.h index c47c60d1a93..1ffe7064ff2 100644 --- a/TAO/tao/Strategies/DIOP_Transport.h +++ b/TAO/tao/Strategies/DIOP_Transport.h @@ -63,9 +63,9 @@ public: ~TAO_DIOP_Transport (void); /// Look for the documentation in Transport.h. - virtual int handle_input_i (TAO_Resume_Handle &rh, - ACE_Time_Value *max_wait_time = 0, - int block = 0); + virtual int handle_input (TAO_Resume_Handle &rh, + ACE_Time_Value *max_wait_time = 0, + int block = 0); protected: /** @name Overridden Template Methods * @@ -75,21 +75,20 @@ protected: virtual ACE_Event_Handler * event_handler_i (void); virtual TAO_Connection_Handler *connection_handler_i (void); - virtual TAO_Connection_Handler * invalidate_event_handler_i (void); virtual TAO_Pluggable_Messaging *messaging_object (void); /// Write the complete Message_Block chain to the connection. - virtual ssize_t send_i (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *max_wait_time); + virtual ssize_t send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *max_wait_time); /// Read len bytes from into buf. - virtual ssize_t recv_i (char *buf, - size_t len, - const ACE_Time_Value *s = 0); + virtual ssize_t recv (char *buf, + size_t len, + const ACE_Time_Value *s = 0); - virtual int register_handler_i (void); + virtual int register_handler (void); ///@} public: diff --git a/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp index 187dae6aaa6..4e51c44e5f5 100644 --- a/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/SCIOP_Connection_Handler.cpp @@ -57,7 +57,6 @@ TAO_SCIOP_Connection_Handler::TAO_SCIOP_Connection_Handler (TAO_ORB_Core *orb_co // store this pointer (indirectly increment ref count) this->transport (specific_transport); - TAO_Transport::release (specific_transport); } TAO_SCIOP_Connection_Handler::TAO_SCIOP_Connection_Handler (TAO_ORB_Core *orb_core, @@ -73,6 +72,7 @@ TAO_SCIOP_Connection_Handler::TAO_SCIOP_Connection_Handler (TAO_ORB_Core *orb_co TAO_SCIOP_Connection_Handler::~TAO_SCIOP_Connection_Handler (void) { + delete this->transport (); } int @@ -174,20 +174,57 @@ TAO_SCIOP_Connection_Handler::close_connection (void) int TAO_SCIOP_Connection_Handler::handle_input (ACE_HANDLE h) { - return this->handle_input_eh (h, this); + int result = + this->handle_input_eh (h, this); + + if (result == -1) + { + this->close_connection (); + return 0; + } + + return result; } int TAO_SCIOP_Connection_Handler::handle_output (ACE_HANDLE handle) { - return this->handle_output_eh (handle, this); + int result = + this->handle_output_eh (handle, this); + + if (result == -1) + { + this->close_connection (); + return 0; + } + + return result; } int -TAO_SCIOP_Connection_Handler::handle_close (ACE_HANDLE handle, - ACE_Reactor_Mask rm) +TAO_SCIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, + const void *) { - return this->handle_close_eh (handle, rm, this); + // We don't use this upcall from the Reactor. However, we should + // override this since the base class returns -1 which will result + // in handle_close() getting called. + return 0; +} + +int +TAO_SCIOP_Connection_Handler::handle_close (ACE_HANDLE, + ACE_Reactor_Mask) +{ + ACE_ASSERT (0); + return 0; +} + +int +TAO_SCIOP_Connection_Handler::close (u_long) +{ + this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); + this->transport ()->remove_reference (); + return 0; } int diff --git a/TAO/tao/Strategies/SCIOP_Connection_Handler.h b/TAO/tao/Strategies/SCIOP_Connection_Handler.h index a54e7d1fa33..bfed0629fbb 100644 --- a/TAO/tao/Strategies/SCIOP_Connection_Handler.h +++ b/TAO/tao/Strategies/SCIOP_Connection_Handler.h @@ -95,8 +95,9 @@ public: */ -class TAO_Export TAO_SCIOP_Connection_Handler : public TAO_SCIOP_SVC_HANDLER, - public TAO_Connection_Handler +class TAO_Export TAO_SCIOP_Connection_Handler : + public TAO_SCIOP_SVC_HANDLER, + public TAO_Connection_Handler { public: @@ -120,6 +121,10 @@ public: virtual int open_handler (void *); //@} + /// Close called by the Acceptor or Connector when connection + /// establishment fails. + int close (u_long = 0); + //@{ /** @name Event Handler overloads */ @@ -128,6 +133,8 @@ public: virtual int handle_input (ACE_HANDLE); virtual int handle_output (ACE_HANDLE); virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *act = 0); //@} /// Add ourselves to Cache. diff --git a/TAO/tao/Strategies/SCIOP_Connector.cpp b/TAO/tao/Strategies/SCIOP_Connector.cpp index 2be650999a0..0177dce669d 100644 --- a/TAO/tao/Strategies/SCIOP_Connector.cpp +++ b/TAO/tao/Strategies/SCIOP_Connector.cpp @@ -157,124 +157,165 @@ TAO_SCIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, TAO_SCIOP_Endpoint *sciop_endpoint = this->remote_endpoint (desc->endpoint ()); - if (sciop_endpoint == 0) - return -1; - - const ACE_INET_Addr &remote_address = - sciop_endpoint->object_addr (); + if (sciop_endpoint == 0) + return -1; - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - SCIOP_Connector::make_connection, " - "to <%s:%d>\n", - sciop_endpoint->host(), sciop_endpoint->port())); + const ACE_INET_Addr &remote_address = + sciop_endpoint->object_addr (); - // Get the right synch options - ACE_Synch_Options synch_options; + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - SCIOP_Connector::make_connection, " + "to <%s:%d>\n", + sciop_endpoint->host(), sciop_endpoint->port())); - this->active_connect_strategy_->synch_options (max_wait_time, - synch_options); + // Get the right synch options + ACE_Synch_Options synch_options; - TAO_SCIOP_Connection_Handler *svc_handler = 0; + this->active_connect_strategy_->synch_options (max_wait_time, + synch_options); - // Active connect - int result = this->base_connector_.connect (svc_handler, - remote_address, - synch_options); + TAO_SCIOP_Connection_Handler *svc_handler = 0; - if (result == -1 && errno == EWOULDBLOCK) - { - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - SCIOP_Connector::make_connection, " - "going to wait for connection completion on local" - "handle [%d]\n", - svc_handler->get_handle ())); - - result = - this->active_connect_strategy_->wait (svc_handler, - max_wait_time); - - if (TAO_debug_level > 2) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - SCIOP_Connector::make_connection" - "wait done for handle[%d], result = %d\n", - svc_handler->get_handle (), result)); - } + // Connect. + int result = this->base_connector_.connect (svc_handler, + remote_address, + synch_options); - } - - int status = - svc_handler->is_finalized (); - - // Reduce the refcount to the svc_handler that we have. The - // increment to the handler is done in make_svc_handler (). Now - // that we dont need the reference to it anymore we can decrement - // the refcount whether the connection is successful ot not. - // REFCNT: Matches with TAO_Connect_Strategy<>::make_svc_handler() - long refcount = svc_handler->decr_refcount (); - - ACE_ASSERT (refcount >= 0); - - ACE_UNUSED_ARG (refcount); + // This call creates the service handler and bumps the #REFCOUNT# up + // one extra. There are three possibilities: (a) connection + // succeeds immediately - in this case, the #REFCOUNT# on the + // handler is two; (b) connection completion is pending - in this + // case, the #REFCOUNT# on the handler is also two; (c) connection + // fails immediately - in this case, the #REFCOUNT# on the handler + // is one since close() gets called on the handler. + // + // The extra reference count in + // TAO_Connect_Creation_Strategy::make_svc_handler() is needed in + // the case when connection completion is pending and we are going + // to wait on a variable in the handler to changes, signifying + // success or failure. Note, that this increment cannot be done + // once the connect() returns since this might be too late if + // another thread pick up the completion and potentially deletes the + // handler before we get a chance to increment the reference count. + + // No immediate result. Wait for completion. + if (result == -1 && errno == EWOULDBLOCK) + { + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - SCIOP_Connector::make_connection, " + "going to wait for connection completion on local" + "handle [%d]\n", + svc_handler->get_handle ())); + + // Wait for connection completion. + result = + this->active_connect_strategy_->wait (svc_handler, + max_wait_time); + + if (TAO_debug_level > 2) + { + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - SCIOP_Connector::make_connection" + "wait done for handle[%d], result = %d\n", + svc_handler->get_handle (), result)); + } + + // Check if the handler has been closed. + int closed = + svc_handler->is_finalized (); + + // In case of failure, check if the connection has been closed. + if (result == -1) + { + if (!closed) + { + // Handler has not been closed - close it now. This + // happens when there is a problem while waiting other + // than the connection failure. + svc_handler->close (); + } + } + } - if (result == -1) - { - // Give users a clue to the problem. - if (TAO_debug_level) - { - ACE_DEBUG ((LM_ERROR, - "TAO (%P|%t) - SCIOP_Connector::make_connection, " - "connection to <%s:%d> failed (%p)\n", - sciop_endpoint->host (), sciop_endpoint->port (), - "errno")); - } + // Irrespective of success or failure, remove the extra #REFCOUNT#. + svc_handler->remove_reference (); - (void) this->active_connect_strategy_->post_failed_connect (svc_handler, - status); + // In case of errors. + if (result == -1) + { + // Give users a clue to the problem. + if (TAO_debug_level) + { + ACE_DEBUG ((LM_ERROR, + "TAO (%P|%t) - SCIOP_Connector::make_connection, " + "connection to <%s:%d> failed (%p)\n", + sciop_endpoint->host (), sciop_endpoint->port (), + "errno")); + } + + return -1; + } - return -1; - } + // At this point, the connection has be successfully connected. + // #REFCOUNT# is one. + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - SCIOP_Connector::make_connection, " + "new connection to <%s:%d> on Transport[%d]\n", + sciop_endpoint->host (), sciop_endpoint->port (), + svc_handler->peer ().get_handle ())); + + TAO_Transport *transport = + svc_handler->transport (); + + // Add the handler to Cache + int retval = + this->orb_core ()->lane_resources ().transport_cache ().cache_transport (desc, + base_transport); + + // Failure in adding to cache. + if (retval != 0) + { + // Close the handler. + svc_handler->close (); - if (TAO_debug_level > 2) - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - SCIOP_Connector::make_connection, " - "new connection to <%s:%d> on Transport[%d]\n", - sciop_endpoint->host (), sciop_endpoint->port (), - svc_handler->peer ().get_handle ())); + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - SCIOP_Connector::make_connection, " + "could not add the new connection to cache\n")); + } - TAO_Transport *base_transport = - TAO_Transport::_duplicate (svc_handler->transport ()); + return -1; + } - // Add the handler to Cache - int retval = - this->orb_core ()->lane_resources ().transport_cache ().cache_transport (desc, - base_transport); + // Registration failures. + if (retval != 0) + { + // Purge from the connection cache. + transport->purge_entry (); - if (retval != 0 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - SCIOP_Connector::make_connection, " - "could not add the new connection to cache\n")); - } + // Close the handler. + svc_handler->close (); - // If the wait strategy wants us to be registered with the reactor - // then we do so. - retval = base_transport->wait_strategy ()->register_handler (); + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - SCIOP_Connector::make_connection, " + "could not register the new connection in the reactor\n")); + } - if (retval != 0 && TAO_debug_level > 0) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - SCIOP_Connector::make_connection, " - "could not register the new connection in the reactor\n")); - } + return -1; + } - // Handover the transport pointer to the Invocation class. - TAO_Transport *&transport = invocation->transport (); - transport = base_transport; + // Handover the transport pointer to the Invocation class. + TAO_Transport *&invocation_transport = + invocation->transport (); + invocation_transport = transport; - return 0; + return 0; } diff --git a/TAO/tao/Strategies/SCIOP_Transport.cpp b/TAO/tao/Strategies/SCIOP_Transport.cpp index 75cde3d8814..14eec2c17c2 100644 --- a/TAO/tao/Strategies/SCIOP_Transport.cpp +++ b/TAO/tao/Strategies/SCIOP_Transport.cpp @@ -41,13 +41,6 @@ TAO_SCIOP_Transport::TAO_SCIOP_Transport (TAO_SCIOP_Connection_Handler *handler, , connection_handler_ (handler) , messaging_object_ (0) { - if (connection_handler_ != 0) - { - // REFCNT: Matches one of - // TAO_Transport::connection_handler_close() or - // TAO_Transport::close_connection_shared. - this->connection_handler_->incr_refcount(); - } if (flag) { // Use the lite version of the protocol @@ -64,7 +57,6 @@ TAO_SCIOP_Transport::TAO_SCIOP_Transport (TAO_SCIOP_Connection_Handler *handler, TAO_SCIOP_Transport::~TAO_SCIOP_Transport (void) { - ACE_ASSERT(this->connection_handler_ == 0); delete this->messaging_object_; } @@ -87,9 +79,9 @@ TAO_SCIOP_Transport::messaging_object (void) } ssize_t -TAO_SCIOP_Transport::send_i (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *max_wait_time) +TAO_SCIOP_Transport::send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *max_wait_time) { ssize_t retval = this->connection_handler_->peer ().sendv (iov, iovcnt, max_wait_time); @@ -100,9 +92,9 @@ TAO_SCIOP_Transport::send_i (iovec *iov, int iovcnt, } ssize_t -TAO_SCIOP_Transport::recv_i (char *buf, - size_t len, - const ACE_Time_Value *max_wait_time) +TAO_SCIOP_Transport::recv (char *buf, + size_t len, + const ACE_Time_Value *max_wait_time) { ssize_t n = this->connection_handler_->peer ().recv (buf, len, @@ -115,7 +107,7 @@ TAO_SCIOP_Transport::recv_i (char *buf, errno != ETIME) { ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("TAO (%P|%t) - SCIOP_Transport[%d]::recv_i, ") + ACE_TEXT ("TAO (%P|%t) - SCIOP_Transport[%d]::recv, ") ACE_TEXT ("read failure - %m\n"), this->id ())); } @@ -142,32 +134,6 @@ TAO_SCIOP_Transport::recv_i (char *buf, return n; } -int -TAO_SCIOP_Transport::register_handler_i (void) -{ - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - SCIOP_Transport[%d]::register_handler\n", - this->id ())); - } - - ACE_Reactor *r = this->orb_core_->reactor (); - - if (r == this->connection_handler_->reactor ()) - return 0; - - // Set the flag in the Connection Handler and in the Wait Strategy - // @@Maybe we should set these flags after registering with the - // reactor. What if the registration fails??? - this->ws_->is_registered (1); - - // Register the handler with the reactor - return r->register_handler (this->connection_handler_, - ACE_Event_Handler::READ_MASK); -} - - int TAO_SCIOP_Transport::send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, @@ -184,12 +150,6 @@ TAO_SCIOP_Transport::send_request (TAO_Stub *stub, if (tph != 0) { - ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - - if (this->check_event_handler_i ("SCIOP_Transport::send_request") - == -1) - return -1; - const char protocol[] = "sciop"; const char * protocol_type = protocol; @@ -270,9 +230,6 @@ TAO_SCIOP_Transport::send_message_shared (TAO_Stub *stub, { ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); - if (this->check_event_handler_i ("SCIOP_Transport::send_message_shared") == -1) - return -1; - if (TAO_debug_level > 6) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - ") @@ -350,17 +307,6 @@ TAO_SCIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr) // 1 (i.e., non-originating side) this->bidirectional_flag (1); - // Just make sure that the connection handler is sane before we go - // head and do anything with it. - ACE_GUARD_RETURN (ACE_Lock, - ace_mon, - *this->handler_lock_, - -1); - - if (this->check_event_handler_i ("SCIOP_Transport::tear_listen_point_list") - == -1) - return -1; - return this->connection_handler_->process_listen_point_list (listen_list); } @@ -430,29 +376,17 @@ TAO_SCIOP_Transport::get_listen_point ( // Get the local address of the connection ACE_INET_Addr local_addr; - { - // Just make sure that the connection handler is sane before we go - // head and do anything with it. - ACE_GUARD_RETURN (ACE_Lock, - ace_mon, - *this->handler_lock_, - -1); - - if (this->check_event_handler_i ("IIOP_Transport::get_listen_point") - == -1) - return -1; - - if (this->connection_handler_->peer ().get_local_addr (local_addr) - == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - ACE_TEXT ("(%P|%t) Could not resolve local ") - ACE_TEXT ("host address in ") - ACE_TEXT ("get_listen_point()\n")), - -1); - } - } - + + if (this->connection_handler_->peer ().get_local_addr (local_addr) + == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Could not resolve local ") + ACE_TEXT ("host address in ") + ACE_TEXT ("get_listen_point()\n")), + -1); + } + // Note: Looks like there is no point in sending the list of // endpoints on interfaces on which this connection has not // been established. If this is wrong, please correct me. @@ -493,12 +427,4 @@ TAO_SCIOP_Transport::get_listen_point ( return 1; } -TAO_Connection_Handler * -TAO_SCIOP_Transport::invalidate_event_handler_i (void) -{ - TAO_Connection_Handler * eh = this->connection_handler_; - this->connection_handler_ = 0; - return eh; -} - #endif /* TAO_HAS_SCIOP == 1 */ diff --git a/TAO/tao/Strategies/SCIOP_Transport.h b/TAO/tao/Strategies/SCIOP_Transport.h index 18b9531436d..00c62944009 100644 --- a/TAO/tao/Strategies/SCIOP_Transport.h +++ b/TAO/tao/Strategies/SCIOP_Transport.h @@ -69,25 +69,22 @@ protected: //@{ virtual ACE_Event_Handler * event_handler_i (void); - virtual TAO_Connection_Handler * invalidate_event_handler_i (void); /// Access the underlying messaging object virtual TAO_Pluggable_Messaging *messaging_object (void); - virtual ssize_t send_i (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *timeout = 0); + virtual ssize_t send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *timeout = 0); - virtual ssize_t recv_i (char *buf, - size_t len, - const ACE_Time_Value *s = 0); - - virtual int register_handler_i (void); + virtual ssize_t recv (char *buf, + size_t len, + const ACE_Time_Value *s = 0); virtual int send_message_shared (TAO_Stub *stub, - int message_semantics, - const ACE_Message_Block *message_block, - ACE_Time_Value *max_wait_time); + int message_semantics, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time); public: @@ -150,4 +147,3 @@ private: #include "ace/post.h" #endif /* TAO_SCIOP_TRANSPORT_H */ - diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp index 6ff58729342..979a0dc9674 100644 --- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.cpp @@ -47,12 +47,12 @@ TAO_SHMIOP_Connection_Handler::TAO_SHMIOP_Connection_Handler (TAO_ORB_Core *orb_ // store this pointer (indirectly increment ref count) this->transport (specific_transport); - TAO_Transport::release (specific_transport); } TAO_SHMIOP_Connection_Handler::~TAO_SHMIOP_Connection_Handler (void) { + delete this->transport (); } int @@ -133,20 +133,57 @@ TAO_SHMIOP_Connection_Handler::close_connection (void) int TAO_SHMIOP_Connection_Handler::handle_input (ACE_HANDLE h) { - return this->handle_input_eh (h, this); + int result = + this->handle_input_eh (h, this); + + if (result == -1) + { + this->close_connection (); + return 0; + } + + return result; } int TAO_SHMIOP_Connection_Handler::handle_output (ACE_HANDLE handle) { - return this->handle_output_eh (handle, this); + int result = + this->handle_output_eh (handle, this); + + if (result == -1) + { + this->close_connection (); + return 0; + } + + return result; +} + +int +TAO_SHMIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + // We don't use this upcall from the Reactor. However, we should + // override this since the base class returns -1 which will result + // in handle_close() getting called. + return 0; } int TAO_SHMIOP_Connection_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask rm) { - return this->handle_close_eh (handle, rm, this); + ACE_ASSERT (0); + return 0; +} + +int +TAO_SHMIOP_Connection_Handler::close (u_long) +{ + this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); + this->transport ()->remove_reference (); + return 0; } int diff --git a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h index fd1d046a43b..12562a37adf 100644 --- a/TAO/tao/Strategies/SHMIOP_Connection_Handler.h +++ b/TAO/tao/Strategies/SHMIOP_Connection_Handler.h @@ -71,6 +71,10 @@ public: virtual int open_handler (void *); //@} + /// Close called by the Acceptor or Connector when connection + /// establishment fails. + int close (u_long = 0); + //@{ /** @name Event Handler overloads */ @@ -79,6 +83,8 @@ public: virtual int handle_input (ACE_HANDLE); virtual int handle_output (ACE_HANDLE); virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask); + virtual int handle_timeout (const ACE_Time_Value ¤t_time, + const void *act = 0); //@} /// Add ourselves to Cache. diff --git a/TAO/tao/Strategies/SHMIOP_Connector.cpp b/TAO/tao/Strategies/SHMIOP_Connector.cpp index 829d3ce0def..24f53525d68 100644 --- a/TAO/tao/Strategies/SHMIOP_Connector.cpp +++ b/TAO/tao/Strategies/SHMIOP_Connector.cpp @@ -133,7 +133,7 @@ TAO_SHMIOP_Connector::set_validate_endpoint (TAO_Endpoint *endpoint) if (TAO_debug_level > 0) { ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("TAO (%P|%t) IIOP connection failed.\n") + ACE_LIB_TEXT ("TAO (%P|%t) SHMIOP connection failed.\n") ACE_LIB_TEXT ("TAO (%P|%t) This is most likely ") ACE_LIB_TEXT ("due to a hostname lookup ") ACE_LIB_TEXT ("failure.\n"))); @@ -165,9 +165,6 @@ TAO_SHMIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, const ACE_INET_Addr &remote_address = shmiop_endpoint->object_addr (); - - TAO_SHMIOP_Connection_Handler *svc_handler = 0; - if (TAO_debug_level > 2) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) SHMIOP_Connector::connect ") @@ -179,27 +176,25 @@ TAO_SHMIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, this->active_connect_strategy_->synch_options (max_wait_time, synch_options); + TAO_SHMIOP_Connection_Handler *svc_handler = 0; + + // Connect. int result = this->base_connector_.connect (svc_handler, remote_address, synch_options); - int status = svc_handler->is_finalized (); - // Reduce the refcount to the svc_handler that we have. The - // increment to the handler is done in make_svc_handler (). Now - // that we dont need the reference to it anymore we can decrement - // the refcount whether the connection is successful ot not. - long refcount = svc_handler->decr_refcount (); - - ACE_ASSERT (refcount >= 0); - ACE_UNUSED_ARG (refcount); + // This call creates the service handler and bumps the #REFCOUNT# up + // one extra. There are two possibilities: (a) connection succeeds + // immediately - in this case, the #REFCOUNT# on the handler is two; + // (b) connection fails immediately - in this case, the #REFCOUNT# + // on the handler is one since close() gets called on the handler. + // We always use a blocking connection so the connection is never + // pending. - // = We dont do a wait since we know that we are doing a blocking - // connect - if (TAO_debug_level > 4) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) SHMIOP_Connector::connect ") - ACE_TEXT ("The result is <%d> \n"), result)); + // Irrespective of success or failure, remove the extra #REFCOUNT#. + svc_handler->remove_reference (); + // In case of errors. if (result == -1) { // Give users a clue to the problem. @@ -215,47 +210,74 @@ TAO_SHMIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, ACE_TEXT ("errno"))); } - (void) this->active_connect_strategy_->post_failed_connect (svc_handler, - status); - return -1; } - TAO_Transport *base_transport = - TAO_Transport::_duplicate (svc_handler->transport ()); + // At this point, the connection has be successfully connected. + // #REFCOUNT# is one. + if (TAO_debug_level > 2) + ACE_DEBUG ((LM_DEBUG, + "TAO (%P|%t) - SHMIOP_Connector::make_connection, " + "new connection to <%s:%d> on Transport[%d]\n", + shmiop_endpoint->host (), shmiop_endpoint->port (), + svc_handler->peer ().get_handle ())); + + TAO_Transport *transport = + svc_handler->transport (); // Add the handler to Cache int retval = this->orb_core ()->lane_resources ().transport_cache ().cache_transport (desc, - base_transport); + transport); - if (retval != 0 && TAO_debug_level > 0) + // Failure in adding to cache. + if (retval != 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) SHMIOP_Connector::connect ") - ACE_TEXT ("could not add the new connection to Cache \n"))); - } + // Close the handler. + svc_handler->close (); + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - SHMIOP_Connector::make_connection, " + "could not add the new connection to cache\n")); + } + + return -1; + } // If the wait strategy wants us to be registered with the reactor - // then we do so. - retval = base_transport->wait_strategy ()->register_handler (); + // then we do so. If registeration is required and it succeeds, + // #REFCOUNT# becomes two. + retval = transport->wait_strategy ()->register_handler (); - if (retval != 0 && TAO_debug_level > 0) + // Registration failures. + if (retval != 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("(%P|%t) IIOP_Connector::connect ") - ACE_LIB_TEXT ("could not add the new connection to reactor \n"))); + // Purge from the connection cache. + transport->purge_entry (); + + // Close the handler. + svc_handler->close (); + + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - SHMIOP_Connector::make_connection, " + "could not register the new connection in the reactor\n")); + } + + return -1; } // Handover the transport pointer to the Invocation class. - TAO_Transport *&transport = invocation->transport (); - transport = base_transport; + TAO_Transport *&invocation_transport = + invocation->transport (); + invocation_transport = transport; return 0; } - TAO_Profile * TAO_SHMIOP_Connector::create_profile (TAO_InputCDR& cdr) { diff --git a/TAO/tao/Strategies/SHMIOP_Transport.cpp b/TAO/tao/Strategies/SHMIOP_Transport.cpp index 4151467e50f..77ef4957f7c 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.cpp +++ b/TAO/tao/Strategies/SHMIOP_Transport.cpp @@ -34,13 +34,6 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl connection_handler_ (handler), messaging_object_ (0) { - if (connection_handler_ != 0) - { - // REFCNT: Matches one of - // TAO_Transport::connection_handler_close() or - // TAO_Transport::close_connection_shared. - this->connection_handler_->incr_refcount(); - } if (flag) { // Use the lite version of the protocol @@ -57,7 +50,6 @@ TAO_SHMIOP_Transport::TAO_SHMIOP_Transport (TAO_SHMIOP_Connection_Handler *handl TAO_SHMIOP_Transport::~TAO_SHMIOP_Transport (void) { - ACE_ASSERT(this->connection_handler_ == 0); delete this->messaging_object_; } @@ -81,9 +73,9 @@ TAO_SHMIOP_Transport::messaging_object (void) ssize_t -TAO_SHMIOP_Transport::send_i (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *max_wait_time) +TAO_SHMIOP_Transport::send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *max_wait_time) { bytes_transferred = 0; for (int i = 0; i < iovcnt; ++i) @@ -101,9 +93,9 @@ TAO_SHMIOP_Transport::send_i (iovec *iov, int iovcnt, } ssize_t -TAO_SHMIOP_Transport::recv_i (char *buf, - size_t len, - const ACE_Time_Value *max_wait_time) +TAO_SHMIOP_Transport::recv (char *buf, + size_t len, + const ACE_Time_Value *max_wait_time) { ssize_t n = 0; @@ -200,32 +192,6 @@ TAO_SHMIOP_Transport::consolidate_message (ACE_Message_Block &incoming, return this->process_parsed_messages (&pqd, rh); } - -int -TAO_SHMIOP_Transport::register_handler_i (void) -{ - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - SHMIOP_Transport::register_handler %d\n", - this->id ())); - } - // @@ It seems like this method should go away, the right reactor is - // picked at object creation time. - ACE_Reactor *r = this->orb_core_->reactor (); - - if (r == this->connection_handler_->reactor ()) - return 0; - - // Set the flag in the Connection Handler - this->ws_->is_registered (1); - - // Register the handler with the reactor - return r->register_handler (this->connection_handler_, - ACE_Event_Handler::READ_MASK); -} - - int TAO_SHMIOP_Transport::send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, @@ -292,12 +258,4 @@ TAO_SHMIOP_Transport::messaging_init (CORBA::Octet major, return 1; } -TAO_Connection_Handler * -TAO_SHMIOP_Transport::invalidate_event_handler_i (void) -{ - TAO_Connection_Handler * eh = this->connection_handler_; - this->connection_handler_ = 0; - return eh; -} - #endif /* TAO_HAS_SHMIOP && TAO_HAS_SHMIOP != 0 */ diff --git a/TAO/tao/Strategies/SHMIOP_Transport.h b/TAO/tao/Strategies/SHMIOP_Transport.h index b41197303ae..a0089bae8a0 100644 --- a/TAO/tao/Strategies/SHMIOP_Transport.h +++ b/TAO/tao/Strategies/SHMIOP_Transport.h @@ -69,26 +69,23 @@ protected: //@{ virtual ACE_Event_Handler * event_handler_i (void); virtual TAO_Connection_Handler *connection_handler_i (void); - virtual TAO_Connection_Handler * invalidate_event_handler_i (void); virtual TAO_Pluggable_Messaging *messaging_object (void); /// Write the complete Message_Block chain to the connection. - virtual ssize_t send_i (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *timeout = 0); + virtual ssize_t send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *timeout = 0); /// Read len bytes from into buf. - virtual ssize_t recv_i (char *buf, - size_t len, - const ACE_Time_Value *s = 0); + virtual ssize_t recv (char *buf, + size_t len, + const ACE_Time_Value *s = 0); virtual int consolidate_message (ACE_Message_Block &incoming, ssize_t missing_data, TAO_Resume_Handle &rh, ACE_Time_Value *max_wait_time); - virtual int register_handler_i (void); - //@} public: diff --git a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp index ca1b2f1b95a..9b7c278e6bb 100644 --- a/TAO/tao/Strategies/UIOP_Connection_Handler.cpp +++ b/TAO/tao/Strategies/UIOP_Connection_Handler.cpp @@ -47,17 +47,17 @@ TAO_UIOP_Connection_Handler::TAO_UIOP_Connection_Handler (TAO_ORB_Core *orb_core (TAO_UIOP_Properties *, arg)) { TAO_UIOP_Transport* specific_transport = 0; - ACE_NEW(specific_transport, - TAO_UIOP_Transport(this, orb_core, flag)); + ACE_NEW (specific_transport, + TAO_UIOP_Transport(this, orb_core, flag)); // store this pointer (indirectly increment ref count) - this->transport(specific_transport); - TAO_Transport::release (specific_transport); + this->transport (specific_transport); } TAO_UIOP_Connection_Handler::~TAO_UIOP_Connection_Handler (void) { + delete this->transport (); } int @@ -116,20 +116,57 @@ TAO_UIOP_Connection_Handler::close_connection (void) int TAO_UIOP_Connection_Handler::handle_input (ACE_HANDLE h) { - return this->handle_input_eh (h, this); + int result = + this->handle_input_eh (h, this); + + if (result == -1) + { + this->close_connection (); + return 0; + } + + return result; } int TAO_UIOP_Connection_Handler::handle_output (ACE_HANDLE handle) { - return this->handle_output_eh (handle, this); + int result = + this->handle_output_eh (handle, this); + + if (result == -1) + { + this->close_connection (); + return 0; + } + + return result; +} + +int +TAO_UIOP_Connection_Handler::handle_timeout (const ACE_Time_Value &, + const void *) +{ + // We don't use this upcall from the Reactor. However, we should + // override this since the base class returns -1 which will result + // in handle_close() getting called. + return 0; } int TAO_UIOP_Connection_Handler::handle_close (ACE_HANDLE handle, ACE_Reactor_Mask rm) { - return this->handle_close_eh (handle, rm, this); + ACE_ASSERT (0); + return 0; +} + +int +TAO_UIOP_Connection_Handler::close (u_long) +{ + this->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED); + this->transport ()->remove_reference (); + return 0; } int diff --git a/TAO/tao/Strategies/UIOP_Connector.cpp b/TAO/tao/Strategies/UIOP_Connector.cpp index 0881aba8f4d..5bdb548f2c0 100644 --- a/TAO/tao/Strategies/UIOP_Connector.cpp +++ b/TAO/tao/Strategies/UIOP_Connector.cpp @@ -155,6 +155,7 @@ TAO_UIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, ACE_TEXT ("(%P|%t) UIOP_Connector::connect ") ACE_TEXT ("making a new connection \n"))); + // Get the right synch options ACE_Synch_Options synch_options; this->active_connect_strategy_->synch_options (max_wait_time, @@ -162,11 +163,30 @@ TAO_UIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, TAO_UIOP_Connection_Handler *svc_handler = 0; + // Connect. int result = this->base_connector_.connect (svc_handler, remote_address, synch_options); + // This call creates the service handler and bumps the #REFCOUNT# up + // one extra. There are three possibilities: (a) connection + // succeeds immediately - in this case, the #REFCOUNT# on the + // handler is two; (b) connection completion is pending - in this + // case, the #REFCOUNT# on the handler is also two; (c) connection + // fails immediately - in this case, the #REFCOUNT# on the handler + // is one since close() gets called on the handler. + // + // The extra reference count in + // TAO_Connect_Creation_Strategy::make_svc_handler() is needed in + // the case when connection completion is pending and we are going + // to wait on a variable in the handler to changes, signifying + // success or failure. Note, that this increment cannot be done + // once the connect() returns since this might be too late if + // another thread pick up the completion and potentially deletes the + // handler before we get a chance to increment the reference count. + + // No immediate result. Wait for completion. if (result == -1 && errno == EWOULDBLOCK) { if (TAO_debug_level > 2) @@ -175,6 +195,8 @@ TAO_UIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, "going to wait for connection completion on local" "handle [%d]\n", svc_handler->get_handle ())); + + // Wait for connection completion. result = this->active_connect_strategy_->wait (svc_handler, max_wait_time); @@ -186,26 +208,28 @@ TAO_UIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, "wait done for handle[%d], result = %d\n", svc_handler->get_handle (), result)); } - } - - int status = - svc_handler->is_finalized (); + // Check if the handler has been closed. + int closed = + svc_handler->is_finalized (); - // Reduce the refcount to the svc_handler that we have. The - // increment to the handler is done in make_svc_handler (). Now - // that we dont need the reference to it anymore we can decrement - // the refcount whether the connection is successful ot not. - long refcount = svc_handler->decr_refcount (); - - ACE_ASSERT (refcount >= 0); - ACE_UNUSED_ARG (refcount); + // In case of failure, check if the connection has been closed. + if (result == -1) + { + if (!closed) + { + // Handler has not been closed - close it now. This + // happens when there is a problem while waiting other + // than the connection failure. + svc_handler->close (); + } + } + } - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) UIOP_Connector::connect ") - ACE_TEXT ("The result is <%d> \n"), result)); + // Irrespective of success or failure, remove the extra #REFCOUNT#. + svc_handler->remove_reference (); + // In case of errors. if (result == -1) { // Give users a clue to the problem. @@ -220,46 +244,67 @@ TAO_UIOP_Connector::make_connection (TAO_GIOP_Invocation *invocation, ACE_TEXT ("errno"))); } - (void) this->active_connect_strategy_->post_failed_connect (svc_handler, - status); return -1; } - TAO_Transport *base_transport = - TAO_Transport::_duplicate (svc_handler->transport ()); + // At this point, the connection has be successfully connected. + // #REFCOUNT# is one. + + TAO_Transport *transport = + svc_handler->transport (); // Add the handler to Cache int retval = this->orb_core ()->lane_resources ().transport_cache ().cache_transport (desc, - base_transport); - - if (retval != 0 && TAO_debug_level > 0) + transport); + // Failure in adding to cache. + if (retval != 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_TEXT ("(%P|%t) UIOP_Connector::connect ") - ACE_TEXT ("could not add the new connection to Cache \n"))); + // Close the handler. + svc_handler->close (); + + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) UIOP_Connector::connect ") + ACE_TEXT ("could not add the new connection to Cache \n"))); + } + + return -1; } // If the wait strategy wants us to be registered with the reactor - // then we do so. - retval = base_transport->wait_strategy ()->register_handler (); + // then we do so. If registeration is required and it succeeds, + // #REFCOUNT# becomes two. + retval = transport->wait_strategy ()->register_handler (); - if (retval != 0 && TAO_debug_level > 0) + // Registration failures. + if (retval != 0) { - ACE_DEBUG ((LM_DEBUG, - ACE_LIB_TEXT ("(%P|%t) IIOP_Connector::connect ") - ACE_LIB_TEXT ("could not add the new connection to reactor \n"))); + // Purge from the connection cache. + transport->purge_entry (); + + // Close the handler. + svc_handler->close (); + + if (TAO_debug_level > 0) + { + ACE_ERROR ((LM_ERROR, + "TAO (%P|%t) - UIOP_Connector::make_connection, " + "could not register the new connection in the reactor\n")); + } + + return -1; } // Handover the transport pointer to the Invocation class. - TAO_Transport *&transport = invocation->transport (); - transport = base_transport; + TAO_Transport *&invocation_transport = + invocation->transport (); + invocation_transport = transport; return 0; } - - TAO_Profile * TAO_UIOP_Connector::create_profile (TAO_InputCDR& cdr) { diff --git a/TAO/tao/Strategies/UIOP_Transport.cpp b/TAO/tao/Strategies/UIOP_Transport.cpp index 3270f94f012..b5cfaf044dd 100644 --- a/TAO/tao/Strategies/UIOP_Transport.cpp +++ b/TAO/tao/Strategies/UIOP_Transport.cpp @@ -33,13 +33,6 @@ TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Connection_Handler *handler, , connection_handler_ (handler) , messaging_object_ (0) { - if (connection_handler_ != 0) - { - // REFCNT: Matches one of - // TAO_Transport::connection_handler_close() or - // TAO_Transport::close_connection_shared. - this->connection_handler_->incr_refcount(); - } if (flag) { // Use the lite version of the protocol @@ -56,7 +49,6 @@ TAO_UIOP_Transport::TAO_UIOP_Transport (TAO_UIOP_Connection_Handler *handler, TAO_UIOP_Transport::~TAO_UIOP_Transport (void) { - ACE_ASSERT(this->connection_handler_ == 0); delete this->messaging_object_; } @@ -79,9 +71,9 @@ TAO_UIOP_Transport::messaging_object (void) } ssize_t -TAO_UIOP_Transport::send_i (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *max_wait_time) +TAO_UIOP_Transport::send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *max_wait_time) { ssize_t retval = this->connection_handler_->peer ().sendv (iov, iovcnt, max_wait_time); @@ -92,9 +84,9 @@ TAO_UIOP_Transport::send_i (iovec *iov, int iovcnt, } ssize_t -TAO_UIOP_Transport::recv_i (char *buf, - size_t len, - const ACE_Time_Value *max_wait_time) +TAO_UIOP_Transport::recv (char *buf, + size_t len, + const ACE_Time_Value *max_wait_time) { ssize_t n = this->connection_handler_->peer ().recv (buf, len, @@ -109,7 +101,7 @@ TAO_UIOP_Transport::recv_i (char *buf, ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - %p \n"), ACE_TEXT ("TAO - read message failure ") - ACE_TEXT ("recv_i () \n"))); + ACE_TEXT ("recv () \n"))); } // Error handling @@ -129,31 +121,6 @@ TAO_UIOP_Transport::recv_i (char *buf, return n; } -int -TAO_UIOP_Transport::register_handler_i (void) -{ - if (TAO_debug_level > 4) - { - ACE_DEBUG ((LM_DEBUG, - "TAO (%P|%t) - UIOP_Transport::register_handler %d\n", - this->id ())); - } - // @@ It seems like this method should go away, the right reactor is - // picked at object creation time. - ACE_Reactor *r = this->orb_core_->reactor (); - - if (r == this->connection_handler_->reactor ()) - return 0; - - // Set the flag in the Connection Handler - this->ws_->is_registered (1); - - // Register the handler with the reactor - return r->register_handler (this->connection_handler_, - ACE_Event_Handler::READ_MASK); -} - - int TAO_UIOP_Transport::send_request (TAO_Stub *stub, TAO_ORB_Core *orb_core, @@ -220,12 +187,4 @@ TAO_UIOP_Transport::messaging_init (CORBA::Octet major, return 1; } -TAO_Connection_Handler * -TAO_UIOP_Transport::invalidate_event_handler_i (void) -{ - TAO_Connection_Handler * eh = this->connection_handler_; - this->connection_handler_ = 0; - return eh; -} - #endif /* TAO_HAS_UIOP */ diff --git a/TAO/tao/Strategies/UIOP_Transport.h b/TAO/tao/Strategies/UIOP_Transport.h index cb504621570..1099d7e1bf0 100644 --- a/TAO/tao/Strategies/UIOP_Transport.h +++ b/TAO/tao/Strategies/UIOP_Transport.h @@ -68,20 +68,17 @@ protected: virtual ACE_Event_Handler * event_handler_i (void); virtual TAO_Connection_Handler *connection_handler_i (void); - virtual TAO_Connection_Handler * invalidate_event_handler_i (void); virtual TAO_Pluggable_Messaging *messaging_object (void); /// Write the complete Message_Block chain to the connection. - virtual ssize_t send_i (iovec *iov, int iovcnt, - size_t &bytes_transferred, - const ACE_Time_Value *timeout = 0); + virtual ssize_t send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *timeout = 0); /// Read len bytes from into buf. - virtual ssize_t recv_i (char *buf, - size_t len, - const ACE_Time_Value *s = 0); - - virtual int register_handler_i (void); + virtual ssize_t recv (char *buf, + size_t len, + const ACE_Time_Value *s = 0); public: /// @todo These methods IMHO should have more meaningful names. -- cgit v1.2.1