summaryrefslogtreecommitdiff
path: root/TAO/tao/Strategies/DIOP_Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/Strategies/DIOP_Transport.cpp')
-rw-r--r--TAO/tao/Strategies/DIOP_Transport.cpp355
1 files changed, 355 insertions, 0 deletions
diff --git a/TAO/tao/Strategies/DIOP_Transport.cpp b/TAO/tao/Strategies/DIOP_Transport.cpp
new file mode 100644
index 00000000000..22de0661c8a
--- /dev/null
+++ b/TAO/tao/Strategies/DIOP_Transport.cpp
@@ -0,0 +1,355 @@
+// $Id$
+
+#include "tao/Strategies/DIOP_Transport.h"
+
+#if defined (TAO_HAS_DIOP) && (TAO_HAS_DIOP != 0)
+
+#include "tao/Strategies/DIOP_Connection_Handler.h"
+#include "tao/Strategies/DIOP_Acceptor.h"
+#include "tao/Strategies/DIOP_Profile.h"
+#include "tao/Acceptor_Registry.h"
+#include "tao/operation_details.h"
+#include "tao/Timeprobe.h"
+#include "tao/CDR.h"
+#include "tao/Transport_Mux_Strategy.h"
+#include "tao/Wait_Strategy.h"
+#include "tao/Stub.h"
+#include "tao/ORB_Core.h"
+#include "tao/debug.h"
+#include "tao/Resume_Handle.h"
+#include "tao/GIOP_Message_Base.h"
+#include "tao/GIOP_Message_Lite.h"
+
+ACE_RCSID (tao, DIOP_Transport, "$Id$")
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+TAO_DIOP_Transport::TAO_DIOP_Transport (TAO_DIOP_Connection_Handler *handler,
+ TAO_ORB_Core *orb_core,
+ CORBA::Boolean flag)
+ : TAO_Transport (TAO_TAG_DIOP_PROFILE,
+ orb_core)
+ , connection_handler_ (handler)
+ , messaging_object_ (0)
+{
+/*
+ * Hook to customize the messaging object when the concrete messaging
+ * object is known a priori. In this case, the flag is ignored.
+ */
+//@@ MESSAGING_SPL_COMMENT_HOOK_START
+
+ // @@ Michael: Set the input CDR size to ACE_MAX_DGRAM_SIZE so that
+ // we read the whole UDP packet on a single read.
+ if (flag)
+ {
+ // Use the lite version of the protocol
+ ACE_NEW (this->messaging_object_,
+ TAO_GIOP_Message_Lite (orb_core,
+ ACE_MAX_DGRAM_SIZE));
+ }
+ else
+ {
+ // Use the normal GIOP object
+ ACE_NEW (this->messaging_object_,
+ TAO_GIOP_Message_Base (orb_core,
+ this,
+ ACE_MAX_DGRAM_SIZE));
+ }
+
+//@@ MESSAGING_SPL_COMMENT_HOOK_END
+
+}
+
+TAO_DIOP_Transport::~TAO_DIOP_Transport (void)
+{
+ delete this->messaging_object_;
+}
+
+ACE_Event_Handler *
+TAO_DIOP_Transport::event_handler_i (void)
+{
+ return this->connection_handler_;
+}
+
+TAO_Connection_Handler *
+TAO_DIOP_Transport::connection_handler_i (void)
+{
+ return this->connection_handler_;
+}
+
+TAO_Pluggable_Messaging *
+TAO_DIOP_Transport::messaging_object (void)
+{
+ return this->messaging_object_;
+}
+
+ssize_t
+TAO_DIOP_Transport::send (iovec *iov, int iovcnt,
+ size_t &bytes_transferred,
+ const ACE_Time_Value *)
+{
+ const ACE_INET_Addr &addr = this->connection_handler_->addr ();
+
+ ssize_t bytes_to_send = 0;
+ for (int i = 0; i < iovcnt; i++)
+ bytes_to_send += iov[i].iov_len;
+
+ this->connection_handler_->dgram ().send (iov,
+ iovcnt,
+ addr);
+ // @@ Michael:
+ // Always return a positive number of bytes sent, as we do
+ // not handle sending errors in DIOP.
+
+ bytes_transferred = bytes_to_send;
+
+ return 1;
+}
+
+ssize_t
+TAO_DIOP_Transport::recv (char *buf,
+ size_t len,
+ const ACE_Time_Value * /* max_wait_time */)
+{
+ ACE_INET_Addr from_addr;
+
+ ssize_t n = this->connection_handler_->dgram ().recv (buf,
+ len,
+ from_addr);
+
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_DIOP_Transport::recv_i: received %d bytes from %s:%d %d\n",
+ n,
+ ACE_TEXT_CHAR_TO_TCHAR (from_addr.get_host_name ()),
+ from_addr.get_port_number (),
+ errno));
+ }
+
+ // Most of the errors handling is common for
+ // Now the message has been read
+ if (n == -1 && TAO_debug_level > 4)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO (%P|%t) - %p \n"),
+ ACE_TEXT ("TAO - read message failure ")
+ ACE_TEXT ("recv () \n")));
+ }
+
+ // Error handling
+ if (n == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ return 0;
+
+ return -1;
+ }
+ // @@ What are the other error handling here??
+ else if (n == 0)
+ {
+ return -1;
+ }
+
+ // Remember the from addr to eventually use it as remote
+ // addr for the reply.
+ this->connection_handler_->addr (from_addr);
+
+ return n;
+}
+
+int
+TAO_DIOP_Transport::handle_input (TAO_Resume_Handle &rh,
+ ACE_Time_Value *max_wait_time,
+ int /*block*/)
+{
+ // If there are no messages then we can go ahead to read from the
+ // handle for further reading..
+
+ // The buffer on the stack which will be used to hold the input
+ // messages
+ char buf [ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT];
+
+#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
+ (void) ACE_OS::memset (buf,
+ '\0',
+ sizeof buf);
+#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
+
+ // Create a data block
+ ACE_Data_Block db (sizeof (buf),
+ ACE_Message_Block::MB_DATA,
+ buf,
+ this->orb_core_->input_cdr_buffer_allocator (),
+ this->orb_core_->locking_strategy (),
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->input_cdr_dblock_allocator ());
+
+ // Create a message block
+ ACE_Message_Block message_block (&db,
+ ACE_Message_Block::DONT_DELETE,
+ this->orb_core_->input_cdr_msgblock_allocator ());
+
+
+ // Align the message block
+ ACE_CDR::mb_align (&message_block);
+
+
+ // Read the message into the message block that we have created on
+ // the stack.
+ ssize_t n = this->recv (message_block.rd_ptr (),
+ message_block.space (),
+ max_wait_time);
+
+ // If there is an error return to the reactor..
+ if (n <= 0)
+ {
+ if (n == -1)
+ this->tms_->connection_closed ();
+
+ return n;
+ }
+
+ // Set the write pointer in the stack buffer
+ message_block.wr_ptr (n);
+
+ // Make a node of the message block..
+ TAO_Queued_Data qd (&message_block);
+ size_t mesg_length;
+
+ // Parse the incoming message for validity. The check needs to be
+ // performed by the messaging objects.
+ if (this->messaging_object ()->parse_next_message (message_block,
+ qd,
+ mesg_length) == -1)
+ return -1;
+
+ if (qd.missing_data_ == TAO_MISSING_DATA_UNDEFINED)
+ {
+ // parse/marshal error
+ return -1;
+ }
+
+ if (message_block.length () > mesg_length)
+ {
+ // we read too much data
+ return -1;
+ }
+
+ // NOTE: We are not performing any queueing nor any checking for
+ // missing data. We are assuming that ALL the data would be got in a
+ // single read.
+
+ // Process the message
+ return this->process_parsed_messages (&qd, rh);
+}
+
+
+int
+TAO_DIOP_Transport::register_handler (void)
+{
+ // @@ Michael:
+ //
+ // We do never register register the handler with the reactor
+ // as we never need to be informed about any incoming data,
+ // assuming we only use one-ways.
+ // If we would register and ICMP Messages would arrive, e.g
+ // due to a not reachable server, we would get informed - as this
+ // disturbs the general DIOP assumptions of not being
+ // interested in any network failures, we ignore ICMP messages.
+ return 0;
+}
+
+
+int
+TAO_DIOP_Transport::send_request (TAO_Stub *stub,
+ TAO_ORB_Core *orb_core,
+ TAO_OutputCDR &stream,
+ int message_semantics,
+ ACE_Time_Value *max_wait_time)
+{
+ if (this->ws_->sending_request (orb_core,
+ message_semantics) == -1)
+ return -1;
+
+ if (this->send_message (stream,
+ stub,
+ message_semantics,
+ max_wait_time) == -1)
+
+ return -1;
+
+ this->first_request_sent();
+
+ return 0;
+}
+
+int
+TAO_DIOP_Transport::send_message (TAO_OutputCDR &stream,
+ TAO_Stub *stub,
+ int message_semantics,
+ ACE_Time_Value *max_wait_time)
+{
+ // Format the message in the stream first
+ if (this->messaging_object_->format_message (stream) != 0)
+ return -1;
+
+ // Strictly speaking, should not need to loop here because the
+ // socket never gets set to a nonblocking mode ... some Linux
+ // versions seem to need it though. Leaving it costs little.
+
+ // This guarantees to send all data (bytes) or return an error.
+ ssize_t n = this->send_message_shared (stub,
+ message_semantics,
+ stream.begin (),
+ max_wait_time);
+
+ if (n == -1)
+ {
+ if (TAO_debug_level)
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TAO: (%P|%t|%N|%l) closing transport %d after fault %p\n"),
+ this->id (),
+ ACE_TEXT ("send_message ()\n")));
+
+ return -1;
+ }
+
+ return 1;
+}
+
+int
+TAO_DIOP_Transport::send_message_shared (TAO_Stub *stub,
+ int message_semantics,
+ const ACE_Message_Block *message_block,
+ ACE_Time_Value *max_wait_time)
+{
+ int result;
+
+ {
+ ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
+
+ result =
+ this->send_message_shared_i (stub, message_semantics,
+ message_block, max_wait_time);
+ }
+
+ if (result == -1)
+ {
+ this->close_connection ();
+ }
+
+ return result;
+}
+
+int
+TAO_DIOP_Transport::messaging_init (CORBA::Octet major,
+ CORBA::Octet minor)
+{
+ this->messaging_object_->init (major, minor);
+ return 1;
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL
+
+#endif /* TAO_HAS_DIOP && TAO_HAS_DIOP != 0 */