summaryrefslogtreecommitdiff
path: root/TAO/examples/PluggableUDP/DIOP/DIOP_Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/examples/PluggableUDP/DIOP/DIOP_Transport.cpp')
-rw-r--r--TAO/examples/PluggableUDP/DIOP/DIOP_Transport.cpp706
1 files changed, 0 insertions, 706 deletions
diff --git a/TAO/examples/PluggableUDP/DIOP/DIOP_Transport.cpp b/TAO/examples/PluggableUDP/DIOP/DIOP_Transport.cpp
deleted file mode 100644
index 1a3582d958f..00000000000
--- a/TAO/examples/PluggableUDP/DIOP/DIOP_Transport.cpp
+++ /dev/null
@@ -1,706 +0,0 @@
-// This may look like C, but it's really -*- C++ -*-
-// $Id$
-
-#include "DIOP_Transport.h"
-#include "DIOP_Connection_Handler.h"
-#include "DIOP_Acceptor.h"
-#include "DIOP_Profile.h"
-#include "tao/Acceptor_Registry.h"
-#include "tao/operation_details.h"
-#include "tao/Timeprobe.h"
-#include "tao/CDR.h"
-#include "tao/Transport_Mux_Strategy.h"
-#include "tao/Wait_Strategy.h"
-#include "tao/Sync_Strategies.h"
-#include "tao/Stub.h"
-#include "tao/ORB_Core.h"
-#include "tao/debug.h"
-#include "tao/GIOP_Message_Base.h"
-#include "tao/GIOP_Message_Lite.h"
-
-#if !defined (__ACE_INLINE__)
-# include "DIOP_Transport.i"
-#endif /* ! __ACE_INLINE__ */
-
-ACE_RCSID (tao, DIOP_Transport, "$Id$")
-
-TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
- TAO_ORB_Core *orb_core,
- CORBA::Boolean flag)
- : TAO_Transport (TAO_TAG_UDP_PROFILE,
- orb_core)
- , connection_handler_ (handler)
- , messaging_object_ (0)
- , local_buffer_ (4192)
-{
- // @@ Michael: Here we hardcoded the size of the buffer!
- if (flag)
- {
- // Use the lite version of the protocol
- ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Lite (orb_core));
- }
- else
- {
- // Use the normal GIOP object
- ACE_NEW (this->messaging_object_,
- TAO_GIOP_Message_Base (orb_core));
- }
-}
-
-TAO_DIOP_Transport::~TAO_DIOP_Transport (void)
-{
- delete this->messaging_object_;
-}
-
-TAO_DIOP_SVC_HANDLER *
-TAO_DIOP_Transport::service_handler (void)
-{
- return this->connection_handler_;
-}
-
-ACE_HANDLE
-TAO_DIOP_Transport::handle (void)
-{
- return this->connection_handler_->get_handle ();
-}
-
-ACE_Event_Handler *
-TAO_DIOP_Transport::event_handler (void)
-{
- return this->connection_handler_;
-}
-
-void
-TAO_DIOP_Transport::close_connection (void)
-{
- // Call handle close
- this->connection_handler_->handle_close ();
-
- // Purge the entry
- this->connection_handler_->purge_entry ();
-}
-
-int
-TAO_DIOP_Transport::idle (void)
-{
- // @@ Michael: We do not use this information
- //return this->connection_handler_->make_idle ();
- return 0;
-}
-
-ssize_t
-TAO_DIOP_Transport::send (const ACE_Message_Block *message_block,
- const ACE_Time_Value * /*max_wait_time*/,
- size_t *bt)
-{
- const ACE_INET_Addr &addr = this->connection_handler_->addr ();
-
- size_t temp;
- size_t &bytes_transferred = bt == 0 ? temp : *bt;
- bytes_transferred = 0;
-
- iovec iov[IOV_MAX];
- int iovcnt = 0;
-
- while (message_block != 0)
- {
- // Our current message block chain.
- const ACE_Message_Block *current_message_block = message_block;
-
- while (current_message_block != 0)
- {
- size_t current_message_block_length =
- current_message_block->length ();
-
- // Check if this block has any data to be sent.
- if (current_message_block_length > 0)
- {
- // Collect the data in the iovec.
- iov[iovcnt].iov_base = current_message_block->rd_ptr ();
- iov[iovcnt].iov_len = current_message_block_length;
-
- // Increment iovec counter.
- iovcnt++;
-
- // The buffer is full make a OS call. @@ TODO find a way to
- // find IOV_MAX for platforms that do not define it rather
- // than simply setting IOV_MAX to some arbitrary value such
- // as 16.
- if (iovcnt == IOV_MAX)
- {
- ssize_t current_transfer =
- this->connection_handler_->dgram ().send (iov,
- iovcnt,
- addr);
- // Errors.
- if (current_transfer == -1 || current_transfer == 0)
- return current_transfer;
-
- // Add to total bytes transferred.
- bytes_transferred += current_transfer;
-
- // Reset iovec counter.
- iovcnt = 0;
- }
- }
-
- // Select the next message block in the chain.
- current_message_block = current_message_block->cont ();
- }
-
- // Selection of the next message block chain.
- message_block = message_block->next ();
- }
-
- // Check for remaining buffers to be sent. This will happen when
- // IOV_MAX is not a multiple of the number of message blocks.
- if (iovcnt != 0)
- {
- ssize_t current_transfer =
- this->connection_handler_->dgram ().send (iov,
- iovcnt,
- addr);
- // Errors.
- if (current_transfer == -1 || current_transfer == 0)
- return current_transfer;
-
- // Add to total bytes transferred.
- bytes_transferred += current_transfer;
- }
-
- //ACE_DEBUG ((LM_DEBUG,
- // "Sent %d bytes to %s:%d\n",
- // bytes_transferred,
- // addr.get_host_name (),
- // addr.get_port_number ()));
-
- // Return total bytes transferred.
- return bytes_transferred;
-}
-
-ssize_t
-TAO_DIOP_Transport::recv (char *buf,
- size_t len,
- const ACE_Time_Value * /*max_wait_time*/)
-{
- ACE_INET_Addr from_addr;
-
- // We read always one datagram, the unread rest is buffered
- // if not immediately read.
- if (local_buffer_.length () == 0)
- {
- local_buffer_.crunch ();
- ssize_t n = this->connection_handler_->dgram ().recv (local_buffer_.wr_ptr (),
- local_buffer_.size (),
- from_addr);
- if (n == -1)
- return -1;
- else
- local_buffer_.wr_ptr (n);
-
- ACE_DEBUG ((LM_DEBUG,
- "Received %d bytes from %s:%d\n",
- n,
- this->connection_handler_->addr ().get_host_name (),
- this->connection_handler_->addr ().get_port_number ()));
- }
-
- if (local_buffer_.length () > 0)
- {
- ssize_t to_read = -1;
- if (len >= local_buffer_.length ())
- to_read = local_buffer_.length ();
- else
- to_read = len;
-
- ACE_OS::memcpy (buf, local_buffer_.rd_ptr (), to_read);
- local_buffer_.rd_ptr (to_read);
-
- ACE_DEBUG ((LM_DEBUG,
- "Read %d bytes\n",
- to_read));
- return to_read;
- }
-
- return -1;
-}
-
-
-int
-TAO_DIOP_Transport::read_process_message (ACE_Time_Value *max_wait_time,
- int block)
-{
- // Read the message of the socket
- int result = this->messaging_object_->read_message (this,
- block,
- max_wait_time);
-
- if (result == -1)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("DIOP_Transport::read_message, failure in read_message ()")));
-
- this->tms_->connection_closed ();
- return -1;
- }
- if (result == 0)
- return result;
-
- // Now we know that we have been able to read the complete message
- // here.. We loop here to see whether we have read more than one
- // message in our read.
- do
- {
- result = this->process_message ();
- }
- while (result > 1);
-
- return result;
-}
-
-
-int
-TAO_DIOP_Transport::register_handler (void)
-{
- // @@ It seems like this method should go away, the right reactor is
- // picked at object creation time.
- ACE_Reactor *r = this->orb_core_->reactor ();
-
- if (r == this->connection_handler_->reactor ())
- return 0;
-
- // About to be registered with the reactor, so bump the ref
- // count
- this->connection_handler_->incr_ref_count ();
-
- // Set the flag in the Connection Handler
- this->connection_handler_->is_registered (1);
-
-
- // Register the handler with the reactor
- return r->register_handler (this->connection_handler_,
- ACE_Event_Handler::READ_MASK);
-}
-
-
-int
-TAO_DIOP_Transport::send_request (TAO_Stub *stub,
- TAO_ORB_Core *orb_core,
- TAO_OutputCDR &stream,
- int two_way,
- ACE_Time_Value *max_wait_time)
-{
- if (this->ws_->sending_request (orb_core,
- two_way) == -1)
- return -1;
-
- if (this->send_message (stream,
- stub,
- two_way,
- max_wait_time) == -1)
-
- return -1;
-
- return this->idle_after_send ();
-}
-
-int
-TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
- TAO_Stub *stub,
- int twoway,
- ACE_Time_Value *max_wait_time)
-{
- // Format the message in the stream first
- if (this->messaging_object_->format_message (stream) != 0)
- return -1;
-
- // Strictly speaking, should not need to loop here because the
- // socket never gets set to a nonblocking mode ... some Linux
- // versions seem to need it though. Leaving it costs little.
-
- // This guarantees to send all data (bytes) or return an error.
- ssize_t n = this->send_or_buffer (stub,
- twoway,
- stream.begin (),
- max_wait_time);
-
- if (n == -1)
- {
- if (TAO_debug_level)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO: (%P|%t|%N|%l) closing conn %d after fault %p\n"),
- this->handle (),
- ACE_TEXT ("send_message ()\n")));
-
- return -1;
- }
-
- // EOF.
- if (n == 0)
- {
- if (TAO_debug_level)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO: (%P|%t|%N|%l) send_message () \n")
- ACE_TEXT ("EOF, closing conn %d\n"),
- this->handle()));
- return -1;
- }
-
- return 1;
-}
-
-
-void
-TAO_DIOP_Transport::start_request (TAO_ORB_Core * /*orb_core*/,
- TAO_Target_Specification & /*spec */,
- TAO_OutputCDR & /*output */,
- CORBA::Environment & /*ACE_TRY_ENV*/)
- ACE_THROW_SPEC ((CORBA::SystemException))
-{
- // @@ This method should NO longer be required..
-
- /* if (this->client_mesg_factory_->write_protocol_header
- (TAO_PLUGGABLE_MESSAGE_REQUEST,
- output) == 0)
- ACE_THROW (CORBA::MARSHAL ());*/
-}
-
-
-void
-TAO_DIOP_Transport::start_locate (TAO_ORB_Core * /*orb_core*/,
- TAO_Target_Specification &spec,
- TAO_Operation_Details &opdetails,
- TAO_OutputCDR &output,
- CORBA::Environment &ACE_TRY_ENV)
- ACE_THROW_SPEC ((CORBA::SystemException))
-{
- if (this->messaging_object_->generate_locate_request_header (opdetails,
- spec,
- output) == -1)
- ACE_THROW (CORBA::MARSHAL ());
-}
-
-
-CORBA::Boolean
-TAO_DIOP_Transport::send_request_header (TAO_Operation_Details &opdetails,
- TAO_Target_Specification &spec,
- TAO_OutputCDR &msg)
-{
-// @@ Frank: No Bi Dir DIOP yet..
-/*
- // Check whether we have a Bi Dir DIOP policy set, whether the
- // messaging objects are ready to handle bidirectional connections
- // and also make sure that we have not recd. or sent any information
- // regarding this before...
- if (this->orb_core ()->bidir_giop_policy () &&
- this->messaging_object_->is_ready_for_bidirectional () &&
- this->bidirectional_flag () < 0)
- {
- this->set_bidir_context_info (opdetails);
-
- // Set the flag to 0
- this->bidirectional_flag (0);
- }
-
- // Modify the request id if we have BiDirectional client/server
- // setup
- opdetails.modify_request_id (this->bidirectional_flag ());
-*/
- // We are going to pass on this request to the underlying messaging
- // layer. It should take care of this request
- if (this->messaging_object_->generate_request_header (opdetails,
- spec,
- msg) == -1)
- return 0;
-
- return 1;
-}
-
-int
-TAO_DIOP_Transport::messaging_init (CORBA::Octet major,
- CORBA::Octet minor)
-{
- this->messaging_object_->init (major,
- minor);
- return 1;
-}
-
-// @@ Frank: Hopefully DIOP doesn't need this
-/*
-int
-TAO_DIOP_Transport::tear_listen_point_list (TAO_InputCDR &cdr)
-{
- CORBA::Boolean byte_order;
- if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
- return -1;
-
- cdr.reset_byte_order (ACE_static_cast(int,byte_order));
-
- DIOP::ListenPointList listen_list;
- if ((cdr >> listen_list) == 0)
- return -1;
-
- // As we have received a bidirectional information, set the flag to
- // 1
- this->bidirectional_flag (1);
- return this->connection_handler_->process_listen_point_list (listen_list);
-}
-*/
-
-int
-TAO_DIOP_Transport::process_message (void)
-{
- // Get the <message_type> that we have received
- TAO_Pluggable_Message_Type t =
- this->messaging_object_->message_type ();
-
-
- int result = 0;
- if (t == TAO_PLUGGABLE_MESSAGE_CLOSECONNECTION)
- {
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("Close Connection Message recd \n")));
-
- this->tms_->connection_closed ();
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REQUEST)
- {
- if (this->messaging_object_->process_request_message (this,
- this->orb_core ()) == -1)
- return -1;
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_REPLY)
- {
- TAO_Pluggable_Reply_Params params (this->orb_core ());
- if (this->messaging_object_->process_reply_message (params) == -1)
- {
-
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("TAO (%P|%t) - %p\n"),
- ACE_TEXT ("DIOP_Transport::process_message, process_reply_message ()")));
-
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
-
- result =
- this->tms_->dispatch_reply (params);
-
- // @@ Somehow it seems dangerous to reset the state *after*
- // dispatching the request, what if another threads receives
- // another reply in the same connection?
- // My guess is that it works as follows:
- // - For the exclusive case there can be no such thread.
- // - The the muxed case each thread has its own message_state.
- // I'm pretty sure this comment is right. Could somebody else
- // please look at it and confirm my guess?
-
- // @@ The above comment was found in the older versions of the
- // code. The code was also written in such a way that, when
- // the client thread on a call from handle_input () from the
- // reactor a call would be made on the handle_client_input
- // (). The implementation of handle_client_input () looked so
- // flaky. It used to create a message state upon entry in to
- // the function using the TMS and destroy that on exit. All
- // this was fine _theoretically_ for multiple threads. But
- // the flakiness was originating in the implementation of
- // get_message_state () where we were creating message state
- // only once and dishing it out for every thread till one of
- // them destroy's it. So, it looked broken. That has been
- // changed. Why?. To my knowledge, the reactor does not call
- // handle_input () on two threads at the same time. So, IMHO
- // that defeats the purpose of creating a message state for
- // every thread. This is just my guess. If we run in to
- // problems this place needs to be revisited. If someone else
- // is going to take a look please contact bala@cs.wustl.edu
- // for details on this-- Bala
-
-
-
- if (result == -1)
- {
- // Something really critical happened, we will forget about
- // every reply on this connection.
- if (TAO_debug_level > 0)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("TAO (%P|%t) : DIOP_Client_Transport::")
- ACE_TEXT ("handle_client_input - ")
- ACE_TEXT ("dispatch reply failed\n")));
-
- this->messaging_object_->reset ();
- this->tms_->connection_closed ();
- return -1;
- }
-
- if (result == 0)
- {
- this->messaging_object_->reset ();
-
- // The reply dispatcher was no longer registered.
- // This can happened when the request/reply
- // times out.
- // To throw away all registered reply handlers is
- // not the right thing, as there might be just one
- // old reply coming in and several valid new ones
- // pending. If we would invoke <connection_closed>
- // we would throw away also the valid ones.
- //return 0;
- }
-
-
- // This is a NOOP for the Exclusive request case, but it actually
- // destroys the stream in the muxed case.
- //this->tms_->destroy_message_state (message_state);
- }
- else if (t == TAO_PLUGGABLE_MESSAGE_MESSAGERROR)
- {
- return -1;
- }
-
- return this->messaging_object_->more_messages ();
-}
-
-// @@ Frank: Hopefully DIOP doesn't need this
-/*
-void
-TAO_DIOP_Transport::set_bidir_context_info (TAO_Operation_Details &opdetails)
-{
-
- // Get a handle on to the acceptor registry
- TAO_Acceptor_Registry * ar =
- this->orb_core ()->acceptor_registry ();
-
-
- // Get the first acceptor in the registry
- TAO_AcceptorSetIterator acceptor = ar->begin ();
-
- DIOP::ListenPointList listen_point_list;
-
- for (;
- acceptor != ar->end ();
- acceptor++)
- {
- // Check whether it is a DIOP acceptor
- if ((*acceptor)->tag () == TAO_TAG_UDP_PROFILE)
- {
- this->get_listen_point (listen_point_list,
- *acceptor);
- }
- }
-
- // We have the ListenPointList at this point. Create a output CDR
- // stream at this point
- TAO_OutputCDR cdr;
-
- // Marshall the information into the stream
- if ((cdr << ACE_OutputCDR::from_boolean (TAO_ENCAP_BYTE_ORDER)== 0)
- || (cdr << listen_point_list) == 0)
- return;
-
- // Add this info in to the svc_list
- opdetails.service_context ().set_context (IOP::BI_DIR_DIOP,
- cdr);
-
- return;
-}
-
-
-int
-TAO_DIOP_Transport::get_listen_point (
- DIOP::ListenPointList &listen_point_list,
- TAO_Acceptor *acceptor)
-{
- TAO_DIOP_Acceptor *iiop_acceptor =
- ACE_dynamic_cast (TAO_DIOP_Acceptor *,
- acceptor );
-
- // Get the array of endpoints serviced by <iiop_acceptor>
- const ACE_INET_Addr *endpoint_addr =
- iiop_acceptor->endpoints ();
-
- // Get the count
- size_t count =
- iiop_acceptor->endpoint_count ();
-
- // Get the local address of the connection
- ACE_INET_Addr local_addr;
-
- if (this->connection_handler_->peer ().get_local_addr (local_addr)
- == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%P|%t) Could not resolve local host")
- ACE_TEXT (" address in set_bidir_context_info () \n")),
- -1);
- }
-
-
- // Note: Looks like there is no point in sending the list of
- // endpoints on interfaces on which this connection has not
- // been established. If this is wrong, please correct me.
- char *local_interface = 0;
-
- // Get the hostname for the local address
- if (iiop_acceptor->hostname (this->orb_core_,
- local_addr,
- local_interface) == -1)
- {
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%P|%t) Could not resolve local host")
- ACE_TEXT (" name \n")),
- -1);
- }
-
- ACE_INET_Addr *tmp_addr = ACE_const_cast (ACE_INET_Addr *,
- endpoint_addr);
-
- for (size_t index = 0;
- index <= count;
- index++)
- {
- // Get the listen point on that acceptor if it has the same
- // interface on which this connection is established
- char *acceptor_interface = 0;
-
- if (iiop_acceptor->hostname (this->orb_core_,
- tmp_addr[index],
- acceptor_interface) == -1)
- continue;
-
- // @@ This is very bad for performance, but it is a one time
- // affair
- if (ACE_OS::strcmp (local_interface,
- acceptor_interface) == 0)
- {
- // We have the connection and the acceptor endpoint on the
- // same interface
- DIOP::ListenPoint point;
- point.host = CORBA::string_dup (local_interface);
- point.port = endpoint_addr[index].get_port_number ();
-
- // Get the count of the number of elements
- CORBA::ULong len = listen_point_list.length ();
-
- // Increase the length by 1
- listen_point_list.length (len + 1);
-
- // Add the new length to the list
- listen_point_list[len] = point;
- }
-
- // @@ This is bad....
- CORBA::string_free (acceptor_interface);
- }
-
- CORBA::string_free (local_interface);
- return 1;
-}
-*/