diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Transport.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Transport.cpp | 457 |
1 files changed, 457 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Transport.cpp b/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Transport.cpp new file mode 100644 index 00000000000..cef6698b219 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/HTIOP/HTIOP_Transport.cpp @@ -0,0 +1,457 @@ +// $Id$ + +#include "HTIOP_Transport.h" + +#include "HTIOP_Connection_Handler.h" +#include "HTIOP_Acceptor.h" +#include "HTIOP_Profile.h" +#include "ace/HTBP/HTBP_Session.h" + + +#include "tao/Acceptor_Registry.h" +#include "tao/Thread_Lane_Resources.h" +#include "tao/operation_details.h" +#include "tao/Timeprobe.h" +#include "tao/CDR.h" +#include "tao/Transport_Mux_Strategy.h" +#include "tao/Wait_Strategy.h" +#include "tao/Sync_Strategies.h" +#include "tao/Stub.h" +#include "tao/ORB_Core.h" +#include "tao/debug.h" +#include "tao/GIOP_Message_Base.h" +#include "tao/GIOP_Message_Lite.h" +#include "tao/Protocols_Hooks.h" +#include "tao/Adapter.h" + +ACE_RCSID (HTIOP, + TAO_HTIOP_Transport, + "$Id$") + +TAO::HTIOP::Transport::Transport (TAO::HTIOP::Connection_Handler *h, + TAO_ORB_Core *orb_core, + CORBA::Boolean flag) + : TAO_Transport (OCI_TAG_HTIOP_PROFILE, orb_core), + connection_handler_ (h), + messaging_object_ (0) +{ + if (flag) + { + // Use the lite version of the protocol + ACE_NEW (this->messaging_object_, + TAO_GIOP_Message_Lite (orb_core)); + } + else + { + // Use the normal GIOP object + ACE_NEW (this->messaging_object_, + TAO_GIOP_Message_Base (orb_core)); + } +} + +TAO::HTIOP::Transport::~Transport (void) +{ + delete this->messaging_object_; +} + +ACE_Event_Handler * +TAO::HTIOP::Transport::event_handler_i (void) +{ + return this->connection_handler_; +} + +TAO_Connection_Handler * +TAO::HTIOP::Transport::connection_handler_i (void) +{ + return this->connection_handler_; +} + +TAO_Pluggable_Messaging * +TAO::HTIOP::Transport::messaging_object (void) +{ + return this->messaging_object_; +} + +ssize_t +TAO::HTIOP::Transport::send (iovec *iov, int iovcnt, + size_t &bytes_transferred, + const ACE_Time_Value *max_wait_time) +{ + ACE_UNUSED_ARG (max_wait_time); + ssize_t retval = this->connection_handler_->peer ().sendv (iov, iovcnt, + max_wait_time); + if (retval > 0) + { + bytes_transferred = retval; + } + + return retval; +} + +ssize_t +TAO::HTIOP::Transport::recv (char *buf, + size_t len, + const ACE_Time_Value *max_wait_time) +{ + ssize_t n = this->connection_handler_->peer ().recv (buf, + len, + max_wait_time); + + // Do not print the error message if it is a timeout, which could + // occur in thread-per-connection. + if (n == -1 && + TAO_debug_level > 4 && + errno != ETIME) + { + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO::HTIOP::Transport[%d]::recv_i, ") + ACE_TEXT ("read failure - %m"), + this->id ())); + } + + // Error handling + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; + + + return -1; + } + + // Most of the errors handling is common for + // Now the message has been read + + // @@ What are the other error handling here?? + else if (n == 0) + { + return -1; + } + + return n; +} + +int +TAO::HTIOP::Transport::register_handler (void) +{ + if (TAO_debug_level > 4) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT("TAO (%P|%t) - TAO::HTIOP::Transport[%d]::register_handler\n"), + this->id ())); + } + + ACE_Reactor *r = this->orb_core_->reactor (); + if (r == this->connection_handler_->reactor ()) + return 0; + + // Set the flag in the Connection Handler and in the Wait Strategy + // @@Maybe we should set these flags after registering with the + // reactor. What if the registration fails??? + this->ws_->is_registered (1); + + ACE::HTBP::Session *s = this->connection_handler_->peer().session(); + if (s) + s->reactor(r); + ACE::HTBP::Channel *ch = s->inbound(); + return r->register_handler (ch->notifier(), + ACE_Event_Handler::READ_MASK); +} + + +int +TAO::HTIOP::Transport::send_request (TAO_Stub *stub, + TAO_ORB_Core *orb_core, + TAO_OutputCDR &stream, + int message_semantics, + ACE_Time_Value *max_wait_time) +{ + if (this->ws_->sending_request (orb_core, + message_semantics) == -1) + + return -1; + + if (this->send_message (stream, + stub, + message_semantics, + max_wait_time) == -1) + return -1; + + this->first_request_sent(); + + return this->idle_after_send (); +} + +int +TAO::HTIOP::Transport::send_message (TAO_OutputCDR &stream, + TAO_Stub *stub, + int message_semantics, + ACE_Time_Value *max_wait_time) +{ + // Format the message in the stream first + if (this->messaging_object_->format_message (stream) != 0) + return -1; + + // This guarantees to send all data (bytes) or return an error. + ssize_t n = this->send_message_shared (stub, + message_semantics, + stream.begin (), + max_wait_time); + + if (n == -1) + { + // Dont try to be smart and request for %p in the debug + // statement. If the event handler is destroyed the transport + // would return -1 with errno set to ENOENT. %p then would dump + // a core. %m would then be softer on this. + if (TAO_debug_level) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TAO (%P|%t) - TAO::HTIOP::Transport[%d]::send_message, ") + ACE_TEXT (" write failure - %m\n"), + this->id ())); + return -1; + } + + return 1; +} + +int +TAO::HTIOP::Transport::send_message_shared (TAO_Stub *stub, + int message_semantics, + const ACE_Message_Block *message_block, + ACE_Time_Value *max_wait_time) +{ + int r; + { + ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1); + + r = this->send_message_shared_i (stub, message_semantics, + message_block, max_wait_time); + } + + if (r == -1) + { + this->close_connection (); + } + + return r; +} + +int +TAO::HTIOP::Transport::generate_request_header (TAO_Operation_Details &opdetails, + TAO_Target_Specification &spec, + TAO_OutputCDR &msg) +{ + // Check whether we have a Bi Dir HTIOP policy set, whether the + // messaging objects are ready to handle bidirectional connections + // and also make sure that we have not recd. or sent any information + // regarding this before... + if (this->orb_core ()->bidir_giop_policy () && + this->messaging_object_->is_ready_for_bidirectional (msg) && + this->bidirectional_flag () < 0) + { + this->set_bidir_context_info (opdetails); + + // Set the flag to 0 (i.e., originating side) + this->bidirectional_flag (0); + } + + // Modify the request id if we have BiDirectional client/server + // setup + opdetails.modify_request_id (this->bidirectional_flag ()); + + + return TAO_Transport::generate_request_header (opdetails, + spec, + msg); +} + + +int +TAO::HTIOP::Transport::messaging_init (CORBA::Octet major, + CORBA::Octet minor) +{ + this->messaging_object_->init (major, + minor); + return 1; +} + +int +TAO::HTIOP::Transport::tear_listen_point_list (TAO_InputCDR &cdr) +{ + CORBA::Boolean byte_order; + if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0) + return -1; + + cdr.reset_byte_order (static_cast<int> (byte_order)); + + ::HTIOP::ListenPointList listen_list; + if ((cdr >> listen_list) == 0) + ACE_ERROR_RETURN ((LM_ERROR,"tear_listen_point_list: no list\n"),-1); + //return -1; + + // As we have received a bidirectional information, set the flag to + // 1 (i.e., non-originating side) + this->bidirectional_flag (1); + + // Just make sure that the connection handler is sane before we go + // head and do anything with it. + ACE_GUARD_RETURN (ACE_Lock, + ace_mon, + *this->handler_lock_, + -1); + + return this->connection_handler_->process_listen_point_list (listen_list); +} + +void +TAO::HTIOP::Transport::set_bidir_context_info (TAO_Operation_Details &opdetails) +{ + ACE_UNUSED_ARG (opdetails); + + // Get a handle to the acceptor registry + TAO_Acceptor_Registry &ar = + this->orb_core ()->lane_resources ().acceptor_registry (); + + // Get the first acceptor in the registry + TAO_AcceptorSetIterator acceptor = ar.begin (); + + ::HTIOP::ListenPointList listen_point_list; + + for (; + acceptor != ar.end (); + acceptor++) + { + // Check whether it is a HTIOP acceptor + if ((*acceptor)->tag () == OCI_TAG_HTIOP_PROFILE) + { + if (this->get_listen_point (listen_point_list, + *acceptor) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT("TAO (%P|%t) - TAO::HTIOP::Transport::set_bidir_info, "), + ACE_TEXT("error getting listen_point \n"))); + + return; + } + } + } + + // We have the ListenPointList at this point. Create a output CDR + // stream at this point + TAO_OutputCDR cdr; + + // Marshall the information into the stream + if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER) == 0) + || (cdr << listen_point_list) == 0) + return; + + // Add this info in to the svc_list + opdetails.request_service_context ().set_context (IOP::BI_DIR_IIOP, + cdr); + + return; +} + +int +TAO::HTIOP::Transport::get_listen_point (::HTIOP::ListenPointList &lp_list, + TAO_Acceptor *acceptor) +{ + TAO::HTIOP::Acceptor *htiop_acceptor = + dynamic_cast<TAO::HTIOP::Acceptor *> (acceptor ); + + // Get the array of endpoints serviced by TAO::HTIOP::Acceptor + const ACE::HTBP::Addr *endpoint_addr = + htiop_acceptor->endpoints (); + + // Get the endpoint count + size_t count = + htiop_acceptor->endpoint_count (); + + // Get the local address of the connection + ACE::HTBP::Addr local_addr; + { + // Just make sure that the connection handler is sane before we go + // head and do anything with it. + ACE_GUARD_RETURN (ACE_Lock, + ace_mon, + *this->handler_lock_, + -1); + + if (this->connection_handler_->peer ().get_local_addr (local_addr) + == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Could not resolve local ") + ACE_TEXT ("host address in ") + ACE_TEXT ("get_listen_point()\n")), + -1); + } + } + + if (local_addr.get_port_number() != 0) + { + + // Note: Looks like there is no point in sending the list of + // endpoints on interfaces on which this connection has not + // been established. If this is wrong, please correct me. + CORBA::String_var local_interface; + + // Get the hostname for the local address + if (htiop_acceptor->hostname (this->orb_core_, + local_addr, + local_interface.out ()) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) Could not resolve local host") + ACE_TEXT (" name \n")), + -1); + } + + for (size_t index = 0; + index != count; + index++) + { + if (local_addr.get_ip_address() + == endpoint_addr[index].get_ip_address()) + { + // Get the count of the number of elements + CORBA::ULong len = lp_list.length (); + + // Increase the length by 1 + lp_list.length (len + 1); + + // We have the connection and the acceptor endpoint on the + // same interface + ::HTIOP::ListenPoint &point = lp_list[len]; + point.host = CORBA::string_dup (local_interface.in ()); + point.port = endpoint_addr[index].get_port_number (); + } + } + } + else + { + // Only add a single listen point based on the htid in the addr + lp_list.length (1); + ::HTIOP::ListenPoint &point = lp_list[0]; + point.host = CORBA::string_dup (""); + point.port = 0; + point.htid = endpoint_addr[0].get_htid(); + } + return 1; +} + + +TAO_Connection_Handler * +TAO::HTIOP::Transport::invalidate_event_handler_i (void) +{ + TAO_Connection_Handler * eh = this->connection_handler_; + this->connection_handler_ = 0; + return eh; +} + + +#if defined ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT +template class HTIOP_Export ACE_Svc_Handler<ACE_HTBP_STREAM, ACE_NULL_SYNCH>; +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION_EXPORT */ |