summaryrefslogtreecommitdiff
path: root/TAO/tao/GIOP_Message_Base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/tao/GIOP_Message_Base.cpp')
-rw-r--r--TAO/tao/GIOP_Message_Base.cpp830
1 files changed, 700 insertions, 130 deletions
diff --git a/TAO/tao/GIOP_Message_Base.cpp b/TAO/tao/GIOP_Message_Base.cpp
index aa0efbb8b97..8d266a30a21 100644
--- a/TAO/tao/GIOP_Message_Base.cpp
+++ b/TAO/tao/GIOP_Message_Base.cpp
@@ -1,17 +1,17 @@
// $Id$
-#include "GIOP_Message_Base.h"
-#include "operation_details.h"
-#include "debug.h"
-#include "ORB_Core.h"
-#include "TAO_Server_Request.h"
-#include "GIOP_Message_Locate_Header.h"
-#include "Transport.h"
-#include "Transport_Mux_Strategy.h"
-#include "LF_Strategy.h"
-#include "Request_Dispatcher.h"
-#include "Codeset_Manager.h"
-#include "SystemException.h"
+#include "tao/GIOP_Message_Base.h"
+#include "tao/operation_details.h"
+#include "tao/debug.h"
+#include "tao/ORB_Core.h"
+#include "tao/TAO_Server_Request.h"
+#include "tao/GIOP_Message_Locate_Header.h"
+#include "tao/Transport.h"
+#include "tao/Transport_Mux_Strategy.h"
+#include "tao/LF_Strategy.h"
+#include "tao/Request_Dispatcher.h"
+#include "tao/Codeset_Manager.h"
+#include "tao/SystemException.h"
/*
* Hook to add additional include files during specializations.
@@ -22,22 +22,27 @@ ACE_RCSID (tao,
GIOP_Message_Base,
"$Id$")
-TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
- size_t /*input_cdr_size*/)
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core * orb_core,
+ TAO_Transport * transport,
+ size_t /* input_cdr_size */)
: orb_core_ (orb_core)
- , message_state_ ()
- , out_stream_ (this->buffer_,
- sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
- TAO_ENCAP_BYTE_ORDER,
- orb_core->output_cdr_buffer_allocator (),
- orb_core->output_cdr_dblock_allocator (),
- orb_core->output_cdr_msgblock_allocator (),
- orb_core->orb_params ()->cdr_memcpy_tradeoff (),
- TAO_DEF_GIOP_MAJOR,
- TAO_DEF_GIOP_MINOR)
+ , message_state_ ()
+ , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
+ , out_stream_ (this->buffer_,
+ sizeof this->buffer_, /* ACE_CDR::DEFAULT_BUFSIZE */
+ TAO_ENCAP_BYTE_ORDER,
+ orb_core->output_cdr_buffer_allocator (),
+ orb_core->output_cdr_dblock_allocator (),
+ orb_core->output_cdr_msgblock_allocator (),
+ orb_core->orb_params ()->cdr_memcpy_tradeoff (),
+ fragmentation_strategy_.get (),
+ TAO_DEF_GIOP_MAJOR,
+ TAO_DEF_GIOP_MINOR)
{
#if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
- ACE_OS::memset(buffer_, 0, sizeof (buffer_));
+ ACE_OS::memset (this->buffer_, 0, sizeof (buffer_));
#endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
}
@@ -196,7 +201,7 @@ TAO_GIOP_Message_Base::generate_reply_header (
ACE_TRY
{
// Now call the implementation for the rest of the header
- int result =
+ int const result =
generator_parser->write_reply_header (cdr,
params
ACE_ENV_ARG_PARAMETER);
@@ -216,7 +221,7 @@ TAO_GIOP_Message_Base::generate_reply_header (
{
if (TAO_debug_level > 4)
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
- "TAO_GIOP_Message_Base::generate_reply_header");
+ ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
return -1;
}
@@ -225,12 +230,39 @@ TAO_GIOP_Message_Base::generate_reply_header (
return 0;
}
-
int
-TAO_GIOP_Message_Base::read_message (TAO_Transport * /*transport*/,
- int /*block */,
- ACE_Time_Value * /*max_wait_time*/)
+TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
+ CORBA::ULong request_id)
{
+ // Get a parser for us
+ TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
+
+ CORBA::Octet major, minor;
+
+ cdr.get_version (major, minor);
+
+ // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
+ // supports them in 1.2 or better since GIOP 1.1 fragments do not
+ // have a fragment message header.
+ if (major == 1 && minor < 2)
+ return -1;
+
+ // Get the state information that we need to use
+ this->set_state (major,
+ minor,
+ generator_parser);
+
+ // Write the GIOP header first
+ if (!this->write_protocol_header (TAO_GIOP_FRAGMENT, cdr)
+ || !generator_parser->write_fragment_header (cdr, request_id))
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("(%P|%t) Error in writing GIOP header \n")));
+
+ return -1;
+ }
+
return 0;
}
@@ -238,11 +270,12 @@ int
TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
{
// Ptr to first buffer.
- char *buf = (char *) stream.buffer ();
+ char * buf = (char *) stream.buffer ();
+
+ this->set_giop_flags (stream);
// Length of all buffers.
- size_t total_len =
- stream.total_length ();
+ size_t const total_len = stream.total_length ();
// NOTE: Here would also be a fine place to calculate a digital
// signature for the message and place it into a preallocated slot
@@ -251,7 +284,7 @@ TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream)
// this particular environment and that isn't handled by the
// networking infrastructure (e.g., IPSEC).
- CORBA::ULong bodylen = static_cast <CORBA::ULong>
+ CORBA::ULong const bodylen = static_cast <CORBA::ULong>
(total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
#if !defined (ACE_ENABLE_SWAP_ON_WRITE)
@@ -307,6 +340,7 @@ TAO_GIOP_Message_Base::message_type (
case TAO_GIOP_LOCATEREPLY:
return TAO_PLUGGABLE_MESSAGE_LOCATEREPLY;
+
case TAO_GIOP_REPLY:
return TAO_PLUGGABLE_MESSAGE_REPLY;
@@ -317,13 +351,19 @@ TAO_GIOP_Message_Base::message_type (
return TAO_PLUGGABLE_MESSAGE_FRAGMENT;
case TAO_GIOP_MESSAGERROR:
+ return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
+
case TAO_GIOP_CANCELREQUEST:
- // Does it happen? why??
+ return TAO_PLUGGABLE_MESSAGE_CANCELREQUEST;
+
default:
+ if (TAO_debug_level > 0)
+ {
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("TAO (%P|%t) %N:%l message_type : ")
ACE_TEXT ("wrong message.\n")));
}
+ }
return TAO_PLUGGABLE_MESSAGE_MESSAGERROR;
}
@@ -331,31 +371,50 @@ TAO_GIOP_Message_Base::message_type (
int
TAO_GIOP_Message_Base::parse_incoming_messages (ACE_Message_Block &incoming)
{
+ this->message_state_.reset ();
+
return this->message_state_.parse_message_header (incoming);
}
-ssize_t
-TAO_GIOP_Message_Base::missing_data (ACE_Message_Block &incoming)
+int
+TAO_GIOP_Message_Base::parse_next_message (ACE_Message_Block &incoming,
+ TAO_Queued_Data &qd,
+ size_t &mesg_length)
{
- // Actual message size including the header..
- CORBA::ULong msg_size =
- this->message_state_.message_size ();
-
- size_t len = incoming.length ();
-
- // If we have too many messages or if we have less than even a size
- // of the GIOP header then ..
- if (len > msg_size ||
- len < TAO_GIOP_MESSAGE_HEADER_LEN)
+ if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
{
- return -1;
+ qd.missing_data_ = TAO_MISSING_DATA_UNDEFINED;
+
+ return 0; /* incomplete header */
}
- else if (len == msg_size)
- return 0;
+ else
+ {
+ TAO_GIOP_Message_State state;
- return msg_size - len;
-}
+ if (state.parse_message_header (incoming) == -1)
+ {
+ return -1;
+ }
+
+ const size_t message_size = state.message_size (); /* Header + Payload */
+
+ if (message_size > incoming.length ())
+ {
+ qd.missing_data_ = message_size - incoming.length ();
+ }
+ else
+ {
+ qd.missing_data_ = 0;
+ }
+
+ /* init out-parameters */
+ this->init_queued_data (&qd, state);
+ mesg_length = TAO_GIOP_MESSAGE_HEADER_LEN
+ + state.payload_size ();
+ return 1; /* complete header */
+ }
+}
int
TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
@@ -365,15 +424,45 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
{
if (incoming.length () > 0)
{
- // Make a node which has a message block of the size of
- // MESSAGE_HEADER_LEN.
- qd =
- this->make_queued_data (TAO_GIOP_MESSAGE_HEADER_LEN);
+ // Optimize memory usage, we dont know actual message size
+ // so far, but allocate enough space to hold small GIOP
+ // messages. This way we avoid expensive "grow" operation
+ // for small messages.
+ const size_t default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
+
+ // Make a node which has at least message block of the size
+ // of MESSAGE_HEADER_LEN.
+ const size_t buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
+ default_buf_size);
+
+ // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN
+
+ qd = this->make_queued_data (buf_size);
+
+ if (qd == 0)
+ {
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
+ ACE_TEXT ("out of memory\n")));
+ }
+ return -1;
+ }
qd->msg_block_->copy (incoming.rd_ptr (),
incoming.length ());
- qd->missing_data_ = -1;
+
+ incoming.rd_ptr (incoming.length ()); // consume all available data
+
+ qd->missing_data_ = TAO_MISSING_DATA_UNDEFINED;
+ }
+ else
+ {
+ // handle not initialized variables
+ qd = 0; // reset
}
+
return 0;
}
@@ -387,12 +476,26 @@ TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
qd = this->make_queued_data (copying_len);
+ if (qd == 0)
+ {
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
+ ACE_TEXT ("out of memory\n")));
+ }
+ return -1;
+ }
+
if (copying_len > incoming.length ())
{
qd->missing_data_ = copying_len - incoming.length ();
-
copying_len = incoming.length ();
}
+ else
+ {
+ qd->missing_data_ = 0;
+ }
qd->msg_block_->copy (incoming.rd_ptr (),
copying_len);
@@ -408,35 +511,71 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
ACE_Message_Block &incoming)
{
// Look to see whether we had atleast parsed the GIOP header ...
- if (qd->missing_data_ == -1)
+ if (qd->missing_data_ == TAO_MISSING_DATA_UNDEFINED)
{
// The data length that has been stuck in there during the last
// read ....
- size_t len =
+ size_t const len =
qd->msg_block_->length ();
+ // paranoid check
+ if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ // inconsistency - this code should have parsed the header
+ // so far
+ return -1;
+ }
+
// We know that we would have space for
// TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
// from the <incoming> into the message block in <qd>
- qd->msg_block_->copy (incoming.rd_ptr (),
- TAO_GIOP_MESSAGE_HEADER_LEN - len);
+ const size_t available = incoming.length ();
+ const size_t desired = TAO_GIOP_MESSAGE_HEADER_LEN - len;
+ const size_t n_copy = ace_min (available, desired);
+
+ // paranoid check, but would cause endless looping
+ if (n_copy == 0)
+ {
+ return -1;
+ }
+
+ if (qd->msg_block_->copy (incoming.rd_ptr (),
+ n_copy) == -1)
+ {
+ return -1;
+ }
// Move the rd_ptr () in the incoming message block..
- incoming.rd_ptr (TAO_GIOP_MESSAGE_HEADER_LEN - len);
+ incoming.rd_ptr (n_copy);
+
+ // verify sufficient data to parse GIOP header
+ if (qd->msg_block_->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
+ {
+ return 0; /* continue */
+ }
TAO_GIOP_Message_State state;
// Parse the message header now...
if (state.parse_message_header (*qd->msg_block_) == -1)
- return -1;
-
- // Now grow the message block so that we can copy the rest of
- // the data...
- if (qd->msg_block_->space () < state.message_size ())
{
- ACE_CDR::grow (qd->msg_block_,
- state.message_size ());
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
+ ACE_TEXT ("error parsing header\n") ));
+ }
+ return -1;
}
+ // Now grow the message block so that we can copy the rest of
+ // the data, the message_block must be able to hold complete message
+ if (ACE_CDR::grow (qd->msg_block_,
+ state.message_size ()) == -1) /* GIOP_Header + Payload */
+ {
+ // on mem-error get rid of context silently, try to avoid
+ // system calls that might allocate additional memory
+ return -1;
+ }
// Copy the pay load..
// Calculate the bytes that needs to be copied in the queue...
@@ -459,8 +598,11 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
// ..now we are set to copy the right amount of data to the
// node..
- qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len);
+ if (qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len) == -1)
+ {
+ return -1;
+ }
// Set the <rd_ptr> of the <incoming>..
incoming.rd_ptr (copy_len);
@@ -482,10 +624,19 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
copy_len = incoming.length ();
}
+ // paranoid check for endless-event-looping
+ if (copy_len == 0)
+ {
+ return -1;
+ }
+
// Copy the right amount of data in to the node...
// node..
- qd->msg_block_->copy (incoming.rd_ptr (),
- copy_len);
+ if (qd->msg_block_->copy (incoming.rd_ptr (),
+ copy_len) == -1)
+ {
+ return -1;
+ }
// Set the <rd_ptr> of the <incoming>..
qd->msg_block_->rd_ptr (copy_len);
@@ -495,16 +646,6 @@ TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
return 0;
}
-void
-TAO_GIOP_Message_Base::get_message_data (TAO_Queued_Data *qd)
-{
- // Get the message information
- this->init_queued_data (qd, this->message_state_);
-
- // Reset the message_state
- this->message_state_.reset ();
-}
-
int
TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
TAO_Queued_Data *qd)
@@ -544,12 +685,13 @@ TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
this->orb_core_->input_cdr_dblock_allocator (),
this->orb_core_->input_cdr_msgblock_allocator (),
this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
+ this->fragmentation_strategy_.get (),
qd->major_version_,
qd->minor_version_);
// Get the read and write positions before we steal data.
size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
- size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
+ size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
if (TAO_debug_level > 0)
@@ -642,7 +784,7 @@ TAO_GIOP_Message_Base::process_reply_message (
// Get the read and write positions before we steal data.
size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
- size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
+ size_t const wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
if (TAO_debug_level > 0)
@@ -704,8 +846,8 @@ TAO_GIOP_Message_Base::process_reply_message (
// every reply on this connection.
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
- "TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, "
- "dispatch reply failed\n",
+ ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_parsed_messages, ")
+ ACE_TEXT ("dispatch reply failed\n"),
params.transport_->id ()));
}
@@ -738,7 +880,8 @@ TAO_GIOP_Message_Base::generate_exception_reply (
// happened -> no hope, close connection.
// Close the handle.
- ACE_DEBUG ((LM_DEBUG,
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
ACE_TEXT ("generate_exception_reply ()\n")));
return -1;
@@ -754,34 +897,36 @@ TAO_GIOP_Message_Base::write_protocol_header (TAO_GIOP_Message_Type type,
TAO_OutputCDR &msg)
{
// Reset the message type
- // Reset the message type
msg.reset ();
CORBA::Octet header[12] =
- {
- // The following works on non-ASCII platforms, such as MVS (which
- // uses EBCDIC).
- 0x47, // 'G'
- 0x49, // 'I'
- 0x4f, // 'O'
- 0x50 // 'P'
- };
+ {
+ // The following works on non-ASCII platforms, such as MVS (which
+ // uses EBCDIC).
+ 0x47, // 'G'
+ 0x49, // 'I'
+ 0x4f, // 'O'
+ 0x50 // 'P'
+ };
CORBA::Octet major, minor = 0;
- msg.get_version (major, minor);
+
+ (void) msg.get_version (major, minor);
header[4] = major;
header[5] = minor;
- // We are putting the byte order. But at a later date if we support
- // fragmentation and when we want to use the other 6 bits in this
- // octet we can have a virtual function do this for us as the
- // version info , Bala
- header[6] = (TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
+ // "flags" octet, i.e. header[6] will be set up later when message
+ // is formatted by the transport.
+
+ header[7] = CORBA::Octet (type); // Message type
- header[7] = CORBA::Octet(type);
+ static ACE_CDR::ULong const header_size =
+ sizeof (header) / sizeof (header[0]);
- static int header_size = sizeof (header) / sizeof (header[0]);
+ // Fragmentation should not occur at this point since there are only
+ // 12 bytes in the stream, and fragmentation may only occur when
+ // the stream length >= 16.
msg.write_octet_array (header, header_size);
return msg.good_bit ();
@@ -830,11 +975,11 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
CORBA::Object_var forward_to;
/*
- * Hook to specialize request processing within TAO
+ * Hook to specialize request processing within TAO
* This hook will be replaced by specialized request
* processing implementation.
*/
-//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START
+//@@ TAO_DISPATCH_RESOLUTION_OPT_COMMENT_HOOK_START
// Do this before the reply is sent.
this->orb_core_->request_dispatcher ()->dispatch (
@@ -848,15 +993,28 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
if (!CORBA::is_nil (forward_to.in ()))
{
+ const CORBA::Boolean permanent_forward_condition =
+ this->orb_core_->is_permanent_forward_condition
+ (forward_to.in (),
+ request.request_service_context ());
+
// We should forward to another object...
TAO_Pluggable_Reply_Params_Base reply_params;
reply_params.request_id_ = request_id;
- reply_params.reply_status_ = TAO_GIOP_LOCATION_FORWARD;
+ reply_params.reply_status_ =
+ permanent_forward_condition
+ ? TAO_GIOP_LOCATION_FORWARD_PERM
+ : TAO_GIOP_LOCATION_FORWARD;
reply_params.svc_ctx_.length (0);
// Send back the reply service context.
reply_params.service_context_notowned (&request.reply_service_info ());
+ output.message_attributes (request_id,
+ 0,
+ TAO_Transport::TAO_REPLY,
+ 0);
+
// Make the GIOP header and Reply header
this->generate_reply_header (output,
reply_params);
@@ -871,6 +1029,8 @@ TAO_GIOP_Message_Base::process_request (TAO_Transport *transport,
return -1;
}
+ output.more_fragments (false);
+
int result = transport->send_message (output,
0,
TAO_Transport::TAO_REPLY);
@@ -1150,6 +1310,8 @@ TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
request.request_id (),
status_info);
+ output.more_fragments (false);
+
// Send the message
int result = transport->send_message (output,
0,
@@ -1323,8 +1485,8 @@ TAO_GIOP_Message_Base::
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
- "TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -"
- " connection already closed\n"));
+ ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::send_close_connection -")
+ ACE_TEXT (" connection already closed\n")));
return;
}
#endif
@@ -1345,14 +1507,14 @@ TAO_GIOP_Message_Base::
{
if (TAO_debug_level > 0)
ACE_ERROR ((LM_ERROR,
- "(%P|%t) error closing connection %u, errno = %d\n",
- transport->id (), errno));
+ ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
+ transport->id (), errno));
}
transport->close_connection ();
ACE_DEBUG ((LM_DEBUG,
- "(%P|%t) shut down transport, handle %d\n",
- transport-> id ()));
+ ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
+ transport-> id ()));
}
@@ -1389,6 +1551,8 @@ TAO_GIOP_Message_Base::send_reply_exception (
*x) == -1)
return -1;
+ output.more_fragments (false);
+
return transport->send_message (output,
0,
TAO_Transport::TAO_REPLY);
@@ -1434,7 +1598,8 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
char *tmp_id = 0;
if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REQUEST ||
- ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY)
+ ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_REPLY ||
+ ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == TAO_GIOP_FRAGMENT)
{
if (major == 1 && minor < 2)
{
@@ -1446,16 +1611,16 @@ TAO_GIOP_Message_Base::dump_msg (const char *label,
tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
}
#if !defined (ACE_DISABLE_SWAP_ON_READ)
- if (byte_order == TAO_ENCAP_BYTE_ORDER)
- {
- id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
- }
- else
- {
- ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
- }
+ if (byte_order == TAO_ENCAP_BYTE_ORDER)
+ {
+ id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
+ }
+ else
+ {
+ ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
+ }
#else
- id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
+ id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
#endif /* ACE_DISABLE_SWAP_ON_READ */
}
@@ -1520,6 +1685,17 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz)
TAO_Queued_Data::make_queued_data (
this->orb_core_->transport_message_buffer_allocator ());
+ if (qd == 0)
+ {
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
+ ACE_TEXT ("our of memory, failed to allocate queued data object\n")));
+ }
+ return 0; // NULL pointer
+ }
+
// @@todo: We have a similar method in Transport.cpp. Need to see how
// we can factor them out..
// Make a datablock for the size requested + something. The
@@ -1531,20 +1707,58 @@ TAO_GIOP_Message_Base::make_queued_data (size_t sz)
this->orb_core_->create_input_cdr_data_block (sz +
ACE_CDR::MAX_ALIGNMENT);
+ if (db == 0)
+ {
+ TAO_Queued_Data::release (qd);
+
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
+ ACE_TEXT ("out of memory, failed to allocate input data block of size %u\n"),
+ sz));
+ }
+ return 0; // NULL pointer
+ }
+
ACE_Allocator *alloc =
this->orb_core_->input_cdr_msgblock_allocator ();
+ if (alloc == 0)
+ {
+ if (TAO_debug_level >= 8)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("(%P|%t) - TAO_GIOP_Message_Base::make_queued_data,")
+ ACE_TEXT (" no ACE_Allocator defined\n")));
+ }
+ }
+
+
ACE_Message_Block mb (db,
0,
alloc);
ACE_Message_Block *new_mb = mb.duplicate ();
+ if (new_mb == 0)
+ {
+ TAO_Queued_Data::release (qd);
+ db->release();
+
+ if (TAO_debug_level > 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
+ ACE_TEXT ("out of memory, failed to allocate message block\n")));
+ }
+ return 0;
+ }
+
ACE_CDR::mb_align (new_mb);
qd->msg_block_ = new_mb;
-
return qd;
}
@@ -1570,13 +1784,369 @@ TAO_GIOP_Message_Base::fragment_header_length (CORBA::Octet major,
void
TAO_GIOP_Message_Base::init_queued_data (
- TAO_Queued_Data* qd,
- const TAO_GIOP_Message_State& state) const
+ TAO_Queued_Data* qd,
+ const TAO_GIOP_Message_State& state) const
{
qd->byte_order_ = state.byte_order_;
qd->major_version_ = state.giop_version_.major;
qd->minor_version_ = state.giop_version_.minor;
qd->more_fragments_ = state.more_fragments_;
- qd->request_id_ = state.request_id_;
qd->msg_type_ = this->message_type (state);
}
+
+int
+TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd, CORBA::ULong &request_id) const
+{
+ // Get a parser for us
+ TAO_GIOP_Message_Generator_Parser *generator_parser = 0;
+
+ // Get the state information that we need to use
+ this->set_state (qd->major_version_,
+ qd->minor_version_,
+ generator_parser);
+
+ // Get the read and write positions before we steal data.
+ size_t rd_pos = qd->msg_block_->rd_ptr () - qd->msg_block_->base ();
+ size_t wr_pos = qd->msg_block_->wr_ptr () - qd->msg_block_->base ();
+ rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
+
+ // Create a input CDR stream. We do the following
+ // 1 - If the incoming message block has a data block with a flag
+ // DONT_DELETE (for the data block) we create an input CDR
+ // stream the same way.
+ // 2 - If the incoming message block had a datablock from heap just
+ // use it by duplicating it and make the flag 0.
+ // NOTE: We use the same data block in which we read the message and
+ // we pass it on to the higher layers of the ORB. So we dont to any
+ // copies at all here. The same is also done in the higher layers.
+
+ ACE_Message_Block::Message_Flags flg = 0;
+ ACE_Data_Block *db = 0;
+
+ // Get the flag in the message block
+ flg = qd->msg_block_->self_flags ();
+
+ if (ACE_BIT_ENABLED (flg,
+ ACE_Message_Block::DONT_DELETE))
+ {
+ // Use the same datablock
+ db = qd->msg_block_->data_block ();
+ }
+ else
+ {
+ // Use a duplicated datablock as the datablock has come off the
+ // heap.
+ db = qd->msg_block_->data_block ()->duplicate ();
+ }
+
+
+ TAO_InputCDR input_cdr (db,
+ flg,
+ rd_pos,
+ wr_pos,
+ qd->byte_order_,
+ qd->major_version_,
+ qd->minor_version_,
+ this->orb_core_);
+
+ if (qd->major_version_ >= 1 &&
+ (qd->minor_version_ == 0 || qd->minor_version_ == 1))
+ {
+ if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY)
+ {
+ IOP::ServiceContextList service_context;
+
+ if ( ! (input_cdr >> service_context &&
+ input_cdr >> request_id) )
+ {
+ return -1;
+ }
+
+ return 0;
+ }
+ else if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
+ {
+ if ( ! (input_cdr >> request_id) )
+ {
+ return -1;
+ }
+
+ return 0;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+ else
+ {
+ if (qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REQUEST ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_REPLY ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_FRAGMENT ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_CANCELREQUEST ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREQUEST ||
+ qd->msg_type_ == TAO_PLUGGABLE_MESSAGE_LOCATEREPLY)
+ {
+ // Dealing with GIOP-1.2, the request-id is located directly behind the GIOP-Header.
+ // This is true for all message types that might be sent in form of fragments or cancel-requests.
+ if ( ! (input_cdr >> request_id) )
+ {
+ return -1;
+ }
+
+ return 0;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+
+ return -1;
+}
+
+/* @return -1 error, 0 ok, +1 outstanding fragments */
+int
+TAO_GIOP_Message_Base::consolidate_fragmented_message (TAO_Queued_Data *qd, TAO_Queued_Data *&msg)
+{
+ TAO::Incoming_Message_Stack reverse_stack;
+
+ TAO_Queued_Data *tail = 0;
+ TAO_Queued_Data *head = 0;
+
+ //
+ // CONSOLIDATE FRAGMENTED MESSAGE
+ //
+
+ // check for error-condition
+ if (qd == 0)
+ {
+ return -1;
+ }
+
+ if (qd->major_version_ == 1 && qd->minor_version_ == 0)
+ {
+ TAO_Queued_Data::release (qd);
+ return -1; // error: GIOP-1.0 does not support fragments
+ }
+
+ // If this is not the last fragment, push it onto stack for later processing
+ if (qd->more_fragments_)
+ {
+ this->fragment_stack_.push (qd);
+
+ msg = 0; // no consolidated message available yet
+ return 1; // status: more messages expected.
+ }
+
+ tail = qd; // init
+
+ // Add the current message block to the end of the chain
+ // after adjusting the read pointer to skip the header(s)
+ const size_t header_adjustment =
+ this->header_length () +
+ this->fragment_header_length (tail->major_version_,
+ tail->minor_version_);
+
+ if (tail->msg_block_->length () < header_adjustment)
+ {
+ // buffer length not sufficient
+ TAO_Queued_Data::release (qd);
+ return -1;
+ }
+
+ // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2
+ if (tail->major_version_ == 1 && tail->minor_version_ == 1)
+ {
+ // GIOP-1.1
+
+ while (this->fragment_stack_.pop (head) != -1)
+ {
+ if (head->more_fragments_ &&
+ head->major_version_ == 1 &&
+ head->minor_version_ == 1 &&
+ head->msg_block_->length () >= header_adjustment)
+ {
+ // adjust the read-pointer, skip the fragment header
+ tail->msg_block_->rd_ptr(header_adjustment);
+
+ head->msg_block_->cont (tail->msg_block_);
+
+ tail->msg_block_ = 0;
+
+ TAO_Queued_Data::release (tail);
+
+ tail = head;
+ }
+ else
+ {
+ reverse_stack.push (head);
+ }
+ }
+ }
+ else
+ {
+ // > GIOP-1.2
+
+ CORBA::ULong tmp_request_id = 0;
+ if (this->parse_request_id (tail, tmp_request_id) == -1)
+ {
+ return -1;
+ }
+
+ const CORBA::ULong request_id = tmp_request_id;
+
+ while (this->fragment_stack_.pop (head) != -1)
+ {
+ CORBA::ULong head_request_id = 0;
+ int parse_status = 0;
+
+ if (head->more_fragments_ &&
+ head->major_version_ >= 1 &&
+ head->minor_version_ >= 2 &&
+ head->msg_block_->length () >= header_adjustment &&
+ (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
+ request_id == head_request_id)
+ {
+ // adjust the read-pointer, skip the fragment header
+ tail->msg_block_->rd_ptr(header_adjustment);
+
+ head->msg_block_->cont (tail->msg_block_);
+
+ tail->msg_block_ = 0;
+
+ TAO_Queued_Data::release (tail);
+
+ tail = head;
+ }
+ else
+ {
+ if (parse_status == -1)
+ {
+ TAO_Queued_Data::release (head);
+ return -1;
+ }
+
+ reverse_stack.push (head);
+ }
+ }
+ }
+
+ // restore stack
+ while (reverse_stack.pop (head) != -1)
+ {
+ this->fragment_stack_.push (head);
+ }
+
+ if (tail->consolidate () == -1)
+ {
+ // memory allocation failed
+ TAO_Queued_Data::release (tail);
+ return -1;
+ }
+
+ // set out value
+ msg = tail;
+
+ return 0;
+}
+
+
+int
+TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
+{
+ // We must extract the specific request-id from message-buffer
+ // and remove all fragments from stack that match this request-id.
+
+ TAO::Incoming_Message_Stack reverse_stack;
+
+ CORBA::ULong cancel_request_id;
+
+ if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
+ {
+ return -1;
+ }
+
+ TAO_Queued_Data *head = 0;
+
+ // Revert stack
+ while (this->fragment_stack_.pop (head) != -1)
+ {
+ reverse_stack.push (head);
+ }
+
+ bool discard_all_GIOP11_messages = false;
+
+ // Now we are able to process message in order they have arrived.
+ // If the cancel_request_id matches to GIOP-1.1 message, all succeeding
+ // fragments belong to this message and must be discarded.
+ // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the
+ // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments
+ // having encoded the request id will be discarded.
+ while (reverse_stack.pop (head) != -1)
+ {
+ CORBA::ULong head_request_id;
+
+ if (head->major_version_ == 1 &&
+ head->minor_version_ <= 1 &&
+ head->msg_type_ != TAO_PLUGGABLE_MESSAGE_FRAGMENT && // GIOP11 fragment does not provide request id
+ this->parse_request_id (head, head_request_id) >= 0 &&
+ cancel_request_id == head_request_id)
+ {
+ TAO_Queued_Data::release (head);
+ discard_all_GIOP11_messages = true;
+ }
+ else if (head->major_version_ == 1 &&
+ head->minor_version_ <= 1 &&
+ discard_all_GIOP11_messages)
+ {
+ TAO_Queued_Data::release (head);
+ }
+ else if (head->major_version_ >= 1 &&
+ head->minor_version_ >= 2 &&
+ this->parse_request_id (head, head_request_id) >= 0 &&
+ cancel_request_id == head_request_id)
+ {
+ TAO_Queued_Data::release (head);
+ }
+ else
+ {
+ this->fragment_stack_.push (head);
+ }
+ }
+
+ return 0;
+}
+
+TAO_GIOP_Fragmentation_Strategy *
+TAO_GIOP_Message_Base::fragmentation_strategy (void)
+{
+ return this->fragmentation_strategy_.get ();
+}
+
+void
+TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
+{
+ CORBA::Octet * const buf =
+ reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
+
+ CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
+ CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
+
+ // Flags for the GIOP protocol header "flags" field.
+ CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
+
+ // Least significant bit: Byte order
+ ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
+
+ // Second least significant bit: More fragments
+ //
+ // Only supported in GIOP 1.1 or better.
+ if (!(major <= 1 && minor == 0))
+ ACE_SET_BITS (flags, msg.more_fragments () << 1);
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL