summaryrefslogtreecommitdiff
path: root/TAO/tao/Strategies/DIOP_Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Strategies/DIOP_Transport.cpp')
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp546
1 files changed, 546 insertions, 0 deletions
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
new file mode 100644
index 00000000000..28bfc2c927a
--- /dev/null
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -0,0 +1,546 @@
+// This may look like C, but it's really -*- C++ -*-
+// $Id$
+
+#include "DIOP_Transport.h"
+
+#if defined (TAO_HAS_DIOP) && (TAO_HAS_DIOP != 0)
+
+#include "DIOP_Connection_Handler.h"
+#include "DIOP_Acceptor.h"
+#include "DIOP_Profile.h"
+#include "tao/Acceptor_Registry.h"
+#include "tao/operation_details.h"
+#include "tao/Timeprobe.h"
+#include "tao/CDR.h"
+#include "tao/Transport_Mux_Strategy.h"
+#include "tao/Wait_Strategy.h"
+#include "tao/Sync_Strategies.h"
+#include "tao/Stub.h"
+#include "tao/ORB_Core.h"
+#include "tao/debug.h"
+#include "tao/GIOP_Message_Base.h"
+#include "tao/GIOP_Message_Lite.h"
+
+#if !defined (__ACE_INLINE__)
+# include "DIOP_Transport.i"
+#endif /* ! __ACE_INLINE__ */
+
+ACE_RCSID (tao, DIOP_Transport, "$Id$")
+
+TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
+ TAO_ORB_Core *orb_core,
+ CORBA::Boolean flag)
+ : TAO_Transport (TAO_TAG_UDP_PROFILE,
+ orb_core)
+ , connection_handler_ (handler)
+ , messaging_object_ (0)
+{
+ // @@ Michael: Set the input CDR size to ACE_MAX_DGRAM_SIZE so that
+ // we read the whole UDP packet on a single read.
+ if (flag)
+ {
+ // Use the lite version of the protocol
+ ACE_NEW (this->messaging_object_,
+ TAO_GIOP_Message_Lite (orb_core,
+ ACE_MAX_DGRAM_SIZE));
+ }
+ else
+ {
+ // Use the normal GIOP object
+ ACE_NEW (this->messaging_object_,
+ TAO_GIOP_Message_Base (orb_core,
+ ACE_MAX_DGRAM_SIZE));
+ }
+}
+
+TAO_DIOP_Transport::~TAO_DIOP_Transport (void)
+{
+ delete this->messaging_object_;
+}
+
+ACE_Event_Handler *
+TAO_DIOP_Transport::event_handler_i (void)
+{
+ return this->connection_handler_;
+}
+
+TAO_Pluggable_Messaging *
+TAO_DIOP_Transport::messaging_object (void)
+{
+ return this->messaging_object_;
+}
+
+ssize_t
+TAO_DIOP_Transport::send_i (iovec *iov, int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *)
+{
+ const ACE_INET_Addr &addr = this->connection_handler_->addr ();
+ ssize_t retval = this->connection_handler_->dgram ().send (iov,
+ iovcnt,
+ addr);
+ if (retval > 0)
+ bytes_transferred = retval;
+
+ // @@ Michael:
+ // Always return a positive number of bytes sent, as we do
+ // not handle sending errors in DIOP.
+ return 1;
+}
+
+ssize_t
+TAO_DIOP_Transport::recv_i (char *buf,
+ size_t len,
+ const ACE_Time_Value * /* max_wait_time */)
+{
+ ACE_INET_Addr from_addr;
+
+ ssize_t n = this->connection_handler_->dgram ().recv (buf,
+ len,
+ from_addr);
+
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_DIOP_Transport::recv_i: received %d bytes from %s:%d %d\n",
+ n,
+ from_addr.get_host_name (),
+ from_addr.get_port_number (),
+ errno));
+ }
+
+ // Remember the from addr to eventually use it as remote
+ // addr for the reply.
+ this->connection_handler_->addr (from_addr);
+
+ return n;
+}
+
+
+int
+TAO_DIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
+ int block)
+{
+ // Read the message of the socket
+ int result = this->messaging_object_->read_message (this,
+ block,
+ max_wait_time);
+
+ if (result == -1)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("DIOP_Transport::read_process_message, failure in read_message ()")));
+
+ this->tms_->connection_closed ();
+ return -1;
+ }
+ if (result < 2)
+ return result;
+
+ // Now we know that we have been able to read the complete message
+ // here.. We loop here to see whether we have read more than one
+ // message in our read.
+ // Set the result state
+ result = 1;
+
+ // See we use the reactor semantics again
+ while (result > 0)
+ {
+ result = this->process_message ();
+ }
+
+ return result;
+}
+
+
+int
+TAO_DIOP_Transport::register_handler_i (void)
+{
+ // @@ Michael:
+ //
+ // We do never register register the handler with the reactor
+ // as we never need to be informed about any incoming data,
+ // assuming we only use one-ways.
+ // If we would register and ICMP Messages would arrive, e.g
+ // due to a not reachable server, we would get informed - as this
+ // disturbs the general DIOP assumptions of not being
+ // interested in any network failures, we ignore ICMP messages.
+ return 0;
+ /*
+ // @@ It seems like this method should go away, the right reactor is
+ // picked at object creation time.
+ ACE_Reactor *r = this->orb_core_->reactor ();
+
+ if (r == this->connection_handler_->reactor ())
+ return 0;
+
+ // Set the flag in the Connection Handler
+ this->connection_handler_->is_registered (1);
+
+
+ // Register the handler with the reactor
+ return r->register_handler (this->connection_handler_,
+ ACE_Event_Handler::READ_MASK);
+ */
+}
+
+
+int
+TAO_DIOP_Transport::send_request (TAO_Stub *stub,
+ TAO_ORB_Core *orb_core,
+ TAO_OutputCDR &stream,
+ int two_way,
+ ACE_Time_Value *max_wait_time)
+{
+ if (this->ws_->sending_request (orb_core,
+ two_way) == -1)
+ return -1;
+
+ if (this->send_message (stream,
+ stub,
+ two_way,
+ max_wait_time) == -1)
+
+ return -1;
+
+ return this->idle_after_send ();
+}
+
+int
+TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
+ TAO_Stub *stub,
+ int twoway,
+ ACE_Time_Value *max_wait_time)
+{
+ // Format the message in the stream first
+ if (this->messaging_object_->format_message (stream) != 0)
+ return -1;
+
+ // Strictly speaking, should not need to loop here because the
+ // socket never gets set to a nonblocking mode ... some Linux
+ // versions seem to need it though. Leaving it costs little.
+
+ // This guarantees to send all data (bytes) or return an error.
+ ssize_t n = this->send_message_i (stub,
+ twoway,
+ stream.begin (),
+ max_wait_time);
+
+ if (n == -1)
+ {
+ if (TAO_debug_level)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
+ this->id (),
+ ACE_TEXT ("send_message ()\n")));
+
+ return -1;
+ }
+
+ return 1;
+}
+
+
+
+int
+TAO_DIOP_Transport::messaging_init (CORBA::Octet major,
+ CORBA::Octet minor)
+{
+ this->messaging_object_->init (major,
+ minor);
+ return 1;
+}
+
+// @@ Frank: Hopefully DIOP doesn't need this
+/*
+int
+TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr)
+{
+ CORBA::Boolean byte_order;
+ if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
+ return -1;
+
+ cdr.reset_byte_order (ACE_static_cast(int,byte_order));
+
+ DIOP::ListenPointList listen_list;
+ if ((cdr >> listen_list) == 0)
+ return -1;
+
+ // As we have received a bidirectional information, set the flag to
+ // 1
+ this->bidirectional_flag (1);
+ return this->connection_handler_->process_listen_point_list (listen_list);
+}
+*/
+
+int
+TAO_DIOP_Transport::process_message (void)
+{
+ // Check whether we have messages for processing
+ int retval =
+ this->messaging_object_->more_messages ();
+
+ if (retval <= 0)
+ return retval;
+
+ // Get the <message_type> that we have received
+ TAO_Pluggable_Message_Type t =
+ this->messaging_object_->message_type ();
+
+
+ int result = 0;
+ if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
+ {
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("Close Connection Message recd \n")));
+
+ this->tms_->connection_closed ();
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
+ {
+ if (this->messaging_object_->process_request_message (this,
+ this->orb_core ()) == -1)
+ return -1;
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
+ {
+ TAO_Pluggable_Reply_Params params (this->orb_core ());
+ if (this->messaging_object_->process_reply_message (params) == -1)
+ {
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p\n"),
+ ACE_TEXT ("DIOP_Transport::process_message, process_reply_message ()")));
+
+ this->messaging_object_->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+
+ result =
+ this->tms_->dispatch_reply (params);
+
+ // @@ Somehow it seems dangerous to reset the state *after*
+ // dispatching the request, what if another threads receives
+ // another reply in the same connection?
+ // My guess is that it works as follows:
+ // - For the exclusive case there can be no such thread.
+ // - The the muxed case each thread has its own message_state.
+ // I'm pretty sure this comment is right. Could somebody else
+ // please look at it and confirm my guess?
+
+ // @@ The above comment was found in the older versions of the
+ // code. The code was also written in such a way that, when
+ // the client thread on a call from handle_input () from the
+ // reactor a call would be made on the handle_client_input
+ // (). The implementation of handle_client_input () looked so
+ // flaky. It used to create a message state upon entry in to
+ // the function using the TMS and destroy that on exit. All
+ // this was fine _theoretically_ for multiple threads. But
+ // the flakiness was originating in the implementation of
+ // get_message_state () where we were creating message state
+ // only once and dishing it out for every thread till one of
+ // them destroy's it. So, it looked broken. That has been
+ // changed. Why?. To my knowledge, the reactor does not call
+ // handle_input () on two threads at the same time. So, IMHO
+ // that defeats the purpose of creating a message state for
+ // every thread. This is just my guess. If we run in to
+ // problems this place needs to be revisited. If someone else
+ // is going to take a look please contact bala@cs.wustl.edu
+ // for details on this-- Bala
+
+
+
+ if (result == -1)
+ {
+ // Something really critical happened, we will forget about
+ // every reply on this connection.
+ if (TAO_debug_level > 0)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) : DIOP_Transport::")
+ ACE_TEXT ("process_message - ")
+ ACE_TEXT ("dispatch reply failed\n")));
+
+ this->messaging_object_->reset ();
+ this->tms_->connection_closed ();
+ return -1;
+ }
+
+ if (result == 0)
+ {
+ this->messaging_object_->reset ();
+
+ // The reply dispatcher was no longer registered.
+ // This can happened when the request/reply
+ // times out.
+ // To throw away all registered reply handlers is
+ // not the right thing, as there might be just one
+ // old reply coming in and several valid new ones
+ // pending. If we would invoke <connection_closed>
+ // we would throw away also the valid ones.
+ //return 0;
+ }
+
+
+ // This is a NOOP for the Exclusive request case, but it actually
+ // destroys the stream in the muxed case.
+ //this->tms_->destroy_message_state (message_state);
+ }
+ else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
+ {
+ return -1;
+ }
+
+ return 1;
+}
+
+// @@ Frank: Hopefully DIOP doesn't need this
+/*
+void
+TAO_DIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails)
+{
+
+ // Get a handle on to the acceptor registry
+ TAO_Acceptor_Registry * ar =
+ this->orb_core ()->acceptor_registry ();
+
+
+ // Get the first acceptor in the registry
+ TAO_AcceptorSetIterator acceptor = ar->begin ();
+
+ DIOP::ListenPointList listen_point_list;
+
+ for (;
+ acceptor != ar->end ();
+ acceptor++)
+ {
+ // Check whether it is a DIOP acceptor
+ if ((*acceptor)->tag () == TAO_TAG_UDP_PROFILE)
+ {
+ this->get_listen_point (listen_point_list,
+ *acceptor);
+ }
+ }
+
+ // We have the ListenPointList at this point. Create a output CDR
+ // stream at this point
+ TAO_OutputCDR cdr;
+
+ // Marshall the information into the stream
+ if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)== 0)
+ || (cdr << listen_point_list) == 0)
+ return;
+
+ // Add this info in to the svc_list
+ opdetails.service_context ().set_context (IOP::BI_DIR_DIOP,
+ cdr);
+
+ return;
+}
+
+
+int
+TAO_DIOP_Transport::get_listen_point (
+ DIOP::ListenPointList &listen_point_list,
+ TAO_Acceptor *acceptor)
+{
+ TAO_DIOP_Acceptor *iiop_acceptor =
+ ACE_dynamic_cast (TAO_DIOP_Acceptor *,
+ acceptor );
+
+ // Get the array of endpoints serviced by <iiop_acceptor>
+ const ACE_INET_Addr *endpoint_addr =
+ iiop_acceptor->endpoints ();
+
+ // Get the count
+ size_t count =
+ iiop_acceptor->endpoint_count ();
+
+ // Get the local address of the connection
+ ACE_INET_Addr local_addr;
+
+ if (this->connection_handler_->peer ().get_local_addr (local_addr)
+ == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Could not resolve local host")
+ ACE_TEXT (" address in set_bidir_context_info () \n")),
+ -1);
+ }
+
+
+ // Note: Looks like there is no point in sending the list of
+ // endpoints on interfaces on which this connection has not
+ // been established. If this is wrong, please correct me.
+ char *local_interface = 0;
+
+ // Get the hostname for the local address
+ if (iiop_acceptor->hostname (this->orb_core_,
+ local_addr,
+ local_interface) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Could not resolve local host")
+ ACE_TEXT (" name \n")),
+ -1);
+ }
+
+ ACE_INET_Addr *tmp_addr = ACE_const_cast (ACE_INET_Addr *,
+ endpoint_addr);
+
+ for (size_t index = 0;
+ index <= count;
+ index++)
+ {
+ // Get the listen point on that acceptor if it has the same
+ // interface on which this connection is established
+ char *acceptor_interface = 0;
+
+ if (iiop_acceptor->hostname (this->orb_core_,
+ tmp_addr[index],
+ acceptor_interface) == -1)
+ continue;
+
+ // @@ This is very bad for performance, but it is a one time
+ // affair
+ if (ACE_OS::strcmp (local_interface,
+ acceptor_interface) == 0)
+ {
+ // We have the connection and the acceptor endpoint on the
+ // same interface
+ DIOP::ListenPoint point;
+ point.host = CORBA::string_dup (local_interface);
+ point.port = endpoint_addr[index].get_port_number ();
+
+ // Get the count of the number of elements
+ CORBA::ULong len = listen_point_list.length ();
+
+ // Increase the length by 1
+ listen_point_list.length (len + 1);
+
+ // Add the new length to the list
+ listen_point_list[len] = point;
+ }
+
+ // @@ This is bad....
+ CORBA::string_free (acceptor_interface);
+ }
+
+ CORBA::string_free (local_interface);
+ return 1;
+}
+*/
+
+void
+TAO_DIOP_Transport::transition_handler_state_i (void)
+{
+ this->connection_handler_ = 0;
+}
+
+#endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */