summaryrefslogtreecommitdiff
path: root/TAO/tao/IIOP_Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/IIOP_Transport.cpp')
-rw-r--r--TAO/tao/IIOP_Transport.cpp519
1 files changed, 519 insertions, 0 deletions
diff --git a/TAO/tao/IIOP_Transport.cpp b/TAO/tao/IIOP_Transport.cpp
new file mode 100644
index 00000000000..9ca5e3de92c
--- /dev/null
+++ b/TAO/tao/IIOP_Transport.cpp
@@ -0,0 +1,519 @@
+#include "tao/IIOP_Transport.h"
+
+#if defined (TAO_HAS_IIOP) && (TAO_HAS_IIOP != 0)
+
+#include "tao/IIOP_Acceptor.h"
+#include "tao/IIOPC.h"
+#include "tao/Acceptor_Registry.h"
+#include "tao/operation_details.h"
+#include "tao/Wait_Strategy.h"
+#include "tao/debug.h"
+#include "tao/GIOP_Message_Base.h"
+#include "tao/Protocols_Hooks.h"
+#include "tao/ORB_Core.h"
+#include "tao/Thread_Lane_Resources.h"
+#include "tao/Transport_Mux_Strategy.h"
+#include "tao/MMAP_Allocator.h"
+
+#include "ace/OS_NS_sys_sendfile.h"
+
+ACE_RCSID (tao,
+ IIOP_Transport,
+ "$Id$")
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+TAO_IIOP_Transport::TAO_IIOP_Transport (TAO_IIOP_Connection_Handler *handler,
+ TAO_ORB_Core *orb_core,
+ CORBA::Boolean )
+ : TAO_Transport (IOP::TAG_INTERNET_IOP,
+ orb_core)
+ , connection_handler_ (handler)
+ , messaging_object_ (0)
+{
+ // Use the normal GIOP object
+ ACE_NEW (this->messaging_object_,
+ TAO_GIOP_Message_Base (orb_core, this));
+}
+
+TAO_IIOP_Transport::~TAO_IIOP_Transport (void)
+{
+ delete this->messaging_object_;
+}
+
+/*
+ * Hook to copy over all concrete implementations
+ * of Transport class from this class to the base
+ * class as a part of the specialization.
+ * All enhancements to the IIOP_Transport
+ * class, i.e., addition of new concrete non virtual
+ * methods should be added within this hook.
+ */
+
+//@@ TAO_TRANSPORT_SPL_COPY_HOOK_START
+ACE_Event_Handler *
+TAO_IIOP_Transport::event_handler_i (void)
+{
+ return this->connection_handler_;
+}
+
+TAO_Connection_Handler *
+TAO_IIOP_Transport::connection_handler_i (void)
+{
+ return this->connection_handler_;
+}
+
+TAO_Pluggable_Messaging *
+TAO_IIOP_Transport::messaging_object (void)
+{
+ return this->messaging_object_;
+}
+
+ssize_t
+TAO_IIOP_Transport::send (iovec *iov, int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *max_wait_time)
+{
+ ssize_t const retval =
+ this->connection_handler_->peer ().sendv (iov,
+ iovcnt,
+ max_wait_time);
+
+ if (retval > 0)
+ bytes_transferred = retval;
+ else
+ {
+ if (TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::send, ")
+ ACE_TEXT ("send failure - %m (errno: %d)\n"),
+ this->id (), errno));
+ }
+ }
+
+ return retval;
+}
+
+#if TAO_HAS_SENDFILE == 1
+ssize_t
+TAO_IIOP_Transport::sendfile (TAO_MMAP_Allocator * allocator,
+ iovec * iov,
+ int iovcnt,
+ size_t &bytes_transferred,
+ ACE_Time_Value const * timeout)
+{
+ // @@ We should probably set the TCP_CORK socket option to minimize
+ // network operations. It may also be useful to adjust the
+ // socket send buffer size accordingly.
+
+ // If we don't have an allocator, fallback to the regular way of sending
+ // data
+ if (allocator == 0)
+ return this->send (iov, iovcnt, bytes_transferred, timeout);
+
+ // We can only use sendfile when all data is coming from the mmap allocator,
+ // if not, we just fallback to to the regular way of sending data
+ iovec * const off_check_begin = iov;
+ iovec * const off_check_end = iov + iovcnt;
+ for (iovec * index = off_check_begin; index != off_check_end; ++index)
+ {
+ if (-1 == allocator->offset (index->iov_base))
+ return this->send (iov, iovcnt, bytes_transferred, timeout);
+ }
+
+ ssize_t retval = -1;
+
+ ACE_HANDLE const in_fd = allocator->handle ();
+
+ if (in_fd == ACE_INVALID_HANDLE)
+ return retval;
+
+ ACE_HANDLE const out_fd =
+ this->connection_handler_->peer ().get_handle ();
+
+ iovec * const begin = iov;
+ iovec * const end = iov + iovcnt;
+ for (iovec * i = begin; i != end; ++i)
+ {
+ off_t offset = allocator->offset (i->iov_base);
+
+ if (timeout)
+ {
+ int val = 0;
+ if (ACE::enter_send_timedwait (out_fd, timeout, val) == -1)
+ return retval;
+ else
+ {
+ retval =
+ ACE_OS::sendfile (out_fd, in_fd, &offset, i->iov_len);
+ ACE::restore_non_blocking_mode (out_fd, val);
+
+ }
+ }
+ else
+ {
+ retval = ACE_OS::sendfile (out_fd, in_fd, &offset, i->iov_len);
+ }
+
+ if (retval <= 0) // Report errors below.
+ break;
+
+ bytes_transferred += static_cast<size_t> (retval);
+ }
+
+ if (retval <= 0 && TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::sendfile, ")
+ ACE_TEXT ("sendfile failure - %m (errno: %d)\n"),
+ this->id (),
+ errno));
+ }
+
+ return retval;
+}
+#endif /* TAO_HAS_SENDFILE==1 */
+
+ssize_t
+TAO_IIOP_Transport::recv (char *buf,
+ size_t len,
+ const ACE_Time_Value *max_wait_time)
+{
+ ssize_t const n = this->connection_handler_->peer ().recv (buf,
+ len,
+ max_wait_time);
+
+ // Do not print the error message if it is a timeout, which could
+ // occur in thread-per-connection.
+ if (n == -1 &&
+ TAO_debug_level > 4 &&
+ errno != ETIME)
+ {
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::recv, ")
+ ACE_TEXT ("read failure - %m\n"),
+ this->id ()));
+ }
+
+ // Error handling
+ if (n == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ return -1;
+ }
+
+ // Most of the errors handling is common for
+ // Now the message has been read
+
+ // @@ What are the other error handling here??
+ else if (n == 0)
+ {
+ return -1;
+ }
+
+ return n;
+}
+
+int
+TAO_IIOP_Transport::send_request (TAO_Stub *stub,
+ TAO_ORB_Core *orb_core,
+ TAO_OutputCDR &stream,
+ int message_semantics,
+ ACE_Time_Value *max_wait_time)
+{
+ if (this->ws_->sending_request (orb_core,
+ message_semantics) == -1)
+
+ return -1;
+
+ if (this->send_message (stream,
+ stub,
+ message_semantics,
+ max_wait_time) == -1)
+ return -1;
+
+ this->first_request_sent();
+
+ return 0;
+}
+
+int
+TAO_IIOP_Transport::send_message (TAO_OutputCDR &stream,
+ TAO_Stub *stub,
+ int message_semantics,
+ ACE_Time_Value *max_wait_time)
+{
+ // Format the message in the stream first
+ if (this->messaging_object_->format_message (stream) != 0)
+ return -1;
+
+ // This guarantees to send all data (bytes) or return an error.
+ ssize_t const n = this->send_message_shared (stub,
+ message_semantics,
+ stream.begin (),
+ max_wait_time);
+
+ if (n == -1)
+ {
+ // Dont try to be smart and request for %p in the debug
+ // statement. If the event handler is destroyed the transport
+ // would return -1 with errno set to ENOENT. %p then would dump
+ // a core. %m would then be softer on this.
+ if (TAO_debug_level)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Transport[%d]::send_message, ")
+ ACE_TEXT ("write failure - %m\n"),
+ this->id ()));
+ return -1;
+ }
+
+ return 1;
+}
+
+int
+TAO_IIOP_Transport::send_message_shared (
+ TAO_Stub *stub,
+ int message_semantics,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
+{
+ int r;
+
+ {
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
+
+ r = this->send_message_shared_i (stub, message_semantics,
+ message_block, max_wait_time);
+ }
+
+ if (r == -1)
+ {
+ this->close_connection ();
+ }
+
+ return r;
+}
+
+int
+TAO_IIOP_Transport::generate_request_header (TAO_Operation_Details &opdetails,
+ TAO_Target_Specification &spec,
+ TAO_OutputCDR &msg)
+{
+ // Check whether we have a Bi Dir IIOP policy set, whether the
+ // messaging objects are ready to handle bidirectional connections
+ // and also make sure that we have not recd. or sent any information
+ // regarding this before...
+ if (this->orb_core ()->bidir_giop_policy () &&
+ this->messaging_object_->is_ready_for_bidirectional (msg) &&
+ this->bidirectional_flag () < 0)
+ {
+ this->set_bidir_context_info (opdetails);
+
+ // Set the flag to 1 (i.e., originating side)
+ this->bidirectional_flag (1);
+
+ // At the moment we enable BiDIR giop we have to get a new
+ // request id to make sure that we follow the even/odd rule
+ // for request id's. We only need to do this when enabled
+ // it, after that the Transport Mux Strategy will make sure
+ // that the rule is followed
+ opdetails.request_id (this->tms ()->request_id ());
+ }
+
+ return TAO_Transport::generate_request_header (opdetails,
+ spec,
+ msg);
+}
+
+int
+TAO_IIOP_Transport::messaging_init (CORBA::Octet major,
+ CORBA::Octet minor)
+{
+ this->messaging_object_->init (major, minor);
+
+ return 1;
+}
+
+int
+TAO_IIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr)
+{
+ CORBA::Boolean byte_order;
+ if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
+ return -1;
+
+ cdr.reset_byte_order (static_cast<int> (byte_order));
+
+ IIOP::ListenPointList listen_list;
+ if ((cdr >> listen_list) == 0)
+ return -1;
+
+ // As we have received a bidirectional information, set the flag to
+ // 0 (i.e., non-originating side)
+ this->bidirectional_flag (0);
+
+ return this->connection_handler_->process_listen_point_list (listen_list);
+}
+
+void
+TAO_IIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails)
+{
+ // Get a handle to the acceptor registry
+ TAO_Acceptor_Registry &ar =
+ this->orb_core ()->lane_resources ().acceptor_registry ();
+
+ IIOP::ListenPointList listen_point_list;
+
+ const TAO_AcceptorSetIterator end = ar.end ();
+
+ for (TAO_AcceptorSetIterator acceptor = ar.begin ();
+ acceptor != end;
+ ++acceptor)
+ {
+ // Check whether it is an IIOP acceptor
+ if ((*acceptor)->tag () == IOP::TAG_INTERNET_IOP)
+ {
+ if (this->get_listen_point (listen_point_list, *acceptor) == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - IIOP_Transport::set_bidir_context_info, "
+ "error getting listen_point\n"));
+
+ return;
+ }
+ }
+ }
+
+ if (listen_point_list.length () == 0)
+ {
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ "TAO (%P|%t) - IIOP_Transport::set_bidir_context_info, "
+ "listen_point list is empty, client should send a list "
+ "with at least one point\n"));
+
+ return;
+ }
+
+ // We have the ListenPointList at this point. Create a output CDR
+ // stream at this point
+ TAO_OutputCDR cdr;
+
+ // Marshal the information into the stream
+ if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER) == 0)
+ || (cdr << listen_point_list) == 0)
+ return;
+
+ // Add this info in to the svc_list
+ opdetails.request_service_context ().set_context (IOP::BI_DIR_IIOP,
+ cdr);
+
+ return;
+}
+
+int
+TAO_IIOP_Transport::get_listen_point (
+ IIOP::ListenPointList &listen_point_list,
+ TAO_Acceptor *acceptor)
+{
+ TAO_IIOP_Acceptor *iiop_acceptor =
+ dynamic_cast<TAO_IIOP_Acceptor *> (acceptor);
+
+ if (iiop_acceptor == 0)
+ return -1;
+
+ // Get the array of endpoints serviced by TAO_IIOP_Acceptor
+ const ACE_INET_Addr *endpoint_addr =
+ iiop_acceptor->endpoints ();
+
+ // Get the endpoint count
+ size_t const count =
+ iiop_acceptor->endpoint_count ();
+
+ // Get the local address of the connection
+ ACE_INET_Addr local_addr;
+
+ if (this->connection_handler_->peer ().get_local_addr (local_addr)
+ == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Transport::get_listen_point, ")
+ ACE_TEXT ("could not resolve local host address\n")),
+ -1);
+ }
+
+ // Note: Looks like there is no point in sending the list of
+ // endpoints on interfaces on which this connection has not
+ // been established. If this is wrong, please correct me.
+ CORBA::String_var local_interface;
+
+ // Get the hostname for the local address
+ if (iiop_acceptor->hostname (this->orb_core_,
+ local_addr,
+ local_interface.out ()) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - IIOP_Transport::get_listen_point, ")
+ ACE_TEXT ("could not resolve local host name\n")),
+ -1);
+ }
+#if defined (ACE_HAS_IPV6)
+ // If this is an IPv6 decimal linklocal address containing a scopeid than
+ // remove the scopeid from the information being sent.
+ const char *cp_scope = 0;
+ if (local_addr.get_type () == PF_INET6 &&
+ (cp_scope = ACE_OS::strchr (local_interface.in (), '%')) != 0)
+ {
+ CORBA::ULong len = cp_scope - local_interface.in ();
+ local_interface[len] = '\0';
+ }
+#endif /* ACE_HAS_IPV6 */
+
+ for (size_t index = 0;
+ index < count;
+ ++index)
+ {
+ // Make sure port numbers are equal so the following comparison
+ // only concerns the IP(v4/v6) address.
+ local_addr.set_port_number (endpoint_addr[index].get_port_number ());
+
+ if (local_addr == endpoint_addr[index])
+ {
+ // Get the count of the number of elements
+ const CORBA::ULong len = listen_point_list.length ();
+
+ // Increase the length by 1
+ listen_point_list.length (len + 1);
+
+ // We have the connection and the acceptor endpoint on the
+ // same interface
+ IIOP::ListenPoint & point = listen_point_list[len];
+ point.host = CORBA::string_dup (local_interface.in ());
+ point.port = endpoint_addr[index].get_port_number ();
+
+ if (TAO_debug_level >= 5)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT("TAO (%P|%t) - Listen_Point_List[%d] = <%s:%d>\n"),
+ len,
+ point.host.in (),
+ point.port));
+ }
+
+ }
+ }
+
+ return 1;
+}
+//@@ TAO_TRANSPORT_SPL_COPY_HOOK_END
+/*
+ * End of copy hook.
+ */
+
+TAO_END_VERSIONED_NAMESPACE_DECL
+
+#endif /* TAO_HAS_IIOP && TAO_HAS_IIOP != 0 */